1#[cfg(feature = "dap")]
4pub mod dap;
5#[cfg(feature = "lsp")]
6pub mod lsp;
7pub mod req_queue;
8pub mod transport;
9
10pub use error::*;
11pub use msg::*;
12pub use server::*;
13
14mod error;
15mod msg;
16mod server;
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::{Arc, Weak};
21use std::time::Instant;
22
23use futures::future::MaybeDone;
24use parking_lot::Mutex;
25use serde::Serialize;
26use serde_json::Value as JsonValue;
27
28type Event = Box<dyn Any + Send>;
29
30#[derive(Debug, Clone)]
32pub struct TConnectionTx<M> {
33 pub event: crossbeam_channel::Sender<Event>,
35 pub lsp: crossbeam_channel::Sender<Message>,
37 marker: std::marker::PhantomData<M>,
38}
39
40#[derive(Debug, Clone)]
42pub struct TConnectionRx<M> {
43 pub event: crossbeam_channel::Receiver<Event>,
45 pub lsp: crossbeam_channel::Receiver<Message>,
47 marker: std::marker::PhantomData<M>,
48}
49
50impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
51 fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
53 crossbeam_channel::select_biased! {
54 recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
55 recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
56 }
57 }
58}
59
60pub type ConnectionTx = TConnectionTx<Message>;
62pub type ConnectionRx = TConnectionRx<Message>;
64
65enum EventOrMessage<M> {
67 Evt(Event),
68 Msg(M),
69}
70
71pub struct Connection<M> {
73 pub sender: TConnectionTx<M>,
75 pub receiver: TConnectionRx<M>,
77}
78
79impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
80 fn from(conn: Connection<Message>) -> Self {
81 Self {
82 sender: TConnectionTx {
83 event: conn.sender.event,
84 lsp: conn.sender.lsp,
85 marker: std::marker::PhantomData,
86 },
87 receiver: TConnectionRx {
88 event: conn.receiver.event,
89 lsp: conn.receiver.lsp,
90 marker: std::marker::PhantomData,
91 },
92 }
93 }
94}
95
96impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
97 fn from(conn: TConnectionTx<M>) -> Self {
98 Self {
99 event: conn.event,
100 lsp: conn.lsp,
101 marker: std::marker::PhantomData,
102 }
103 }
104}
105
106pub use msg::ResponseError;
108pub type LspResult<T> = Result<T, ResponseError>;
110pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
112pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
114pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
116pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
118pub type ScheduledResult = LspResult<Option<()>>;
124
125pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
127 Ok(futures::future::MaybeDone::Done(Ok(res)))
128}
129pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
131 Ok(futures::future::MaybeDone::Done(res))
132}
133pub fn just_future<T, E>(
135 fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
136) -> Result<ResponseFuture<Result<T, E>>, E> {
137 Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
138}
139
140type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;
141
142pub struct TypedLspClient<S> {
144 client: LspClient,
145 caster: AnyCaster<S>,
146}
147
148impl<S> TypedLspClient<S> {
149 pub fn to_untyped(self) -> LspClient {
151 self.client
152 }
153}
154
155impl<S: 'static> TypedLspClient<S> {
156 pub fn untyped(&self) -> &LspClient {
158 &self.client
159 }
160
161 pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
163 let caster = self.caster.clone();
164 TypedLspClient {
165 client: self.client.clone(),
166 caster: Arc::new(move |s| f(caster(s))),
167 }
168 }
169
170 pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
172 let Some(sender) = self.sender.upgrade() else {
173 log::warn!("failed to send request: connection closed");
174 return;
175 };
176
177 let Err(res) = sender.event.send(Box::new(event)) else {
178 return;
179 };
180 log::warn!("failed to send event: {res:?}");
181 }
182}
183
184impl<S> Clone for TypedLspClient<S> {
185 fn clone(&self) -> Self {
186 Self {
187 client: self.client.clone(),
188 caster: self.caster.clone(),
189 }
190 }
191}
192
193impl<S> std::ops::Deref for TypedLspClient<S> {
194 type Target = LspClient;
195
196 fn deref(&self) -> &Self::Target {
197 &self.client
198 }
199}
200
201#[derive(Debug, Clone)]
204pub struct LspClientRoot {
205 weak: LspClient,
206 _strong: Arc<ConnectionTx>,
207}
208
209impl LspClientRoot {
210 pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
212 handle: tokio::runtime::Handle,
213 sender: TConnectionTx<M>,
214 ) -> Self {
215 let _strong = Arc::new(sender.into());
216 let weak = LspClient {
217 handle,
218 msg_kind: M::get_message_kind(),
219 sender: Arc::downgrade(&_strong),
220 req_queue: Arc::new(Mutex::new(ReqQueue::default())),
221 };
222 Self { weak, _strong }
223 }
224
225 pub fn weak(&self) -> LspClient {
227 self.weak.clone()
228 }
229}
230
231type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
232type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;
233
234#[derive(Debug, Clone)]
236pub struct LspClient {
237 pub handle: tokio::runtime::Handle,
239
240 msg_kind: MessageKind,
241 sender: Weak<ConnectionTx>,
242 req_queue: Arc<Mutex<ReqQueue>>,
243}
244
245impl LspClient {
246 pub fn untyped(&self) -> &Self {
248 self
249 }
250
251 pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
253 TypedLspClient {
254 client: self.clone(),
255 caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
256 }
257 }
258
259 pub fn has_pending_requests(&self) -> bool {
261 self.req_queue.lock().incoming.has_pending()
262 }
263
264 pub fn begin_panic(&self) {
266 self.req_queue.lock().begin_panic();
267 }
268
269 pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
271 let Some(sender) = self.sender.upgrade() else {
272 log::warn!("failed to send request: connection closed");
273 return;
274 };
275
276 if let Err(res) = sender.event.send(Box::new(event)) {
277 log::warn!("failed to send event: {res:?}");
278 }
279 }
280
281 #[cfg(feature = "lsp")]
283 pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
284 let mut req_queue = self.req_queue.lock();
285 let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
286 log::warn!("received response for unknown request");
287 return;
288 };
289 drop(req_queue);
290 handler(service, response.into())
291 }
292
293 #[cfg(feature = "dap")]
295 pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
296 let mut req_queue = self.req_queue.lock();
297 let Some(handler) = req_queue
298 .outgoing
299 .complete((response.request_seq as i32).into())
301 else {
302 log::warn!("received response for unknown request");
303 return;
304 };
305 drop(req_queue);
306 handler(service, response.into())
307 }
308
309 pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
311 let mut req_queue = self.req_queue.lock();
312 self.start_request(id, method);
313 req_queue
314 .incoming
315 .register(id.clone(), (method.to_owned(), received_at));
316 }
317
318 pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
320 let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
321 self.respond_any_result(id, result);
322 }
323
324 fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
325 let req_id = id.clone();
326 let msg: Message = match (self.msg_kind, result) {
327 #[cfg(feature = "lsp")]
328 (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
329 #[cfg(feature = "lsp")]
330 (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
331 #[cfg(feature = "dap")]
332 (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
333 #[cfg(feature = "dap")]
334 (MessageKind::Dap, Err(e)) => {
335 dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
336 }
337 };
338
339 self.respond(req_id, msg);
340 }
341
342 pub fn respond(&self, id: RequestId, response: Message) {
344 let mut req_queue = self.req_queue.lock();
345 let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
346 return;
347 };
348
349 self.stop_request(&id, &method, received_at);
350
351 let Some(sender) = self.sender.upgrade() else {
352 log::warn!("failed to send response ({method}, {id}): connection closed");
353 return;
354 };
355 if let Err(res) = sender.lsp.send(response) {
356 log::warn!("failed to send response ({method}, {id}): {res:?}");
357 }
358 }
359}
360
361impl LspClient {
362 pub fn schedule<T: Serialize + 'static>(
364 &self,
365 req_id: RequestId,
366 resp: SchedulableResponse<T>,
367 ) -> ScheduledResult {
368 let resp = resp?;
369
370 use futures::future::MaybeDone::*;
371 match resp {
372 Done(output) => {
373 self.respond_result(req_id, output);
374 }
375 Future(fut) => {
376 let client = self.clone();
377 let req_id = req_id.clone();
378 self.handle.spawn(async move {
379 client.respond_result(req_id, fut.await);
380 });
381 }
382 Gone => {
383 log::warn!("response for request({req_id:?}) already taken");
384 }
385 };
386
387 Ok(Some(()))
388 }
389
390 fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
392 match resp {
393 Ok(Some(())) => {}
395 _ => self.respond_result(req_id, resp),
397 }
398 }
399}
400
401impl LspClient {
402 fn start_request(&self, req_id: &RequestId, method: &str) {
403 log::info!("handling {method} - ({req_id})");
404 }
405
406 fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
407 let duration = received_at.elapsed();
408 log::info!("handled {method} - ({req_id}) in {duration:0.2?}");
409 }
410
411 fn start_notification(&self, method: &str) {
412 log::info!("notifying {method}");
413 }
414
415 fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
416 let request_duration = received_at.elapsed();
417 if let Err(err) = result {
418 log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
419 } else {
420 log::info!("notify {method} succeeded in {request_duration:0.2?}");
421 }
422 }
423}