1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::collections::HashMap;
use std::convert::Infallible;
use std::future::Future;
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::thread::available_parallelism;

use lsp_types::notification::{self, Notification};
use pin_project_lite::pin_project;
use tokio::sync::oneshot;
use tower_layer::Layer;
use tower_service::Service;

use crate::{
    AnyEvent, AnyNotification, AnyRequest, ErrorCode, JsonValue, LspService, RequestId,
    ResponseError, Result,
};

pub struct Concurrency<S> {
    service: S,
    max_concurrency: NonZeroUsize,
    ongoing: HashMap<RequestId, oneshot::Sender<Infallible>>,
}

impl<S: LspService> Service<AnyRequest> for Concurrency<S> {
    type Response = JsonValue;
    type Error = ResponseError;
    type Future = ResponseFuture<S::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.ongoing.retain(|_, tx| tx.poll_closed(cx).is_pending());
        if self.ongoing.len() < self.max_concurrency.get() {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

    fn call(&mut self, req: AnyRequest) -> Self::Future {
        assert!(self.ongoing.len() < self.max_concurrency.get(), "Not ready");
        let (cancel_tx, cancel_rx) = oneshot::channel();
        self.ongoing.insert(req.id.clone(), cancel_tx);
        let fut = self.service.call(req);
        ResponseFuture { fut, cancel_rx }
    }
}

pin_project! {
    pub struct ResponseFuture<Fut> {
        #[pin]
        fut: Fut,
        cancel_rx: oneshot::Receiver<Infallible>,
    }
}

impl<Fut: Future<Output = Result<JsonValue, ResponseError>>> Future for ResponseFuture<Fut> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        if let Poll::Ready(ret) = this.fut.poll(cx) {
            return Poll::Ready(ret);
        }
        match ready!(Pin::new(this.cancel_rx).poll(cx)) {
            Ok(never) => match never {},
            Err(_) => Poll::Ready(Err(ResponseError {
                code: ErrorCode::REQUEST_CANCELLED,
                message: "Client cancelled the request".into(),
                data: None,
            })),
        }
    }
}

impl<S: LspService> LspService for Concurrency<S> {
    fn notify(&mut self, notif: AnyNotification) -> ControlFlow<Result<()>> {
        if notif.method == notification::Cancel::METHOD {
            if let Ok(params) = serde_json::from_value::<lsp_types::CancelParams>(notif.params) {
                self.ongoing.remove(&params.id);
            }
            return ControlFlow::Continue(());
        }
        self.service.notify(notif)
    }

    fn emit(&mut self, event: AnyEvent) -> ControlFlow<Result<()>> {
        self.service.emit(event)
    }
}

#[derive(Clone, Debug)]
#[must_use]
pub struct ConcurrencyBuilder {
    max_concurrency: NonZeroUsize,
}

impl Default for ConcurrencyBuilder {
    fn default() -> Self {
        Self::new(available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap()))
    }
}

impl ConcurrencyBuilder {
    pub fn new(max_concurrency: NonZeroUsize) -> Self {
        Self { max_concurrency }
    }
}

pub type ConcurrencyLayer = ConcurrencyBuilder;

impl<S> Layer<S> for ConcurrencyBuilder {
    type Service = Concurrency<S>;

    fn layer(&self, inner: S) -> Self::Service {
        Concurrency {
            service: inner,
            max_concurrency: self.max_concurrency,
            ongoing: HashMap::with_capacity(self.max_concurrency.get()),
        }
    }
}