1#[cfg(feature = "dap")]
4mod dap_srv;
5
6#[cfg(feature = "lsp")]
7mod lsp_srv;
8
9use core::fmt;
10use std::any::Any;
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::sync::{Arc, Weak};
15use std::time::Instant;
16
17use futures::future::MaybeDone;
18use parking_lot::Mutex;
19use serde::Serialize;
20use serde_json::{from_value, Value as JsonValue};
21
22#[cfg(feature = "lsp")]
23use crate::lsp::{Notification, Request};
24use crate::msg::*;
25use crate::req_queue;
26use crate::*;
27
28type ImmutPath = Arc<Path>;
29
30pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
32pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
34pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
36pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
38pub type ScheduledResult = LspResult<Option<()>>;
44
45pub type ConnectionTx = TConnectionTx<Message>;
47pub type ConnectionRx = TConnectionRx<Message>;
49
50#[derive(Debug, Clone)]
52pub struct TConnectionTx<M> {
53 pub event: crossbeam_channel::Sender<Event>,
55 pub lsp: crossbeam_channel::Sender<Message>,
57 pub(crate) marker: std::marker::PhantomData<M>,
58}
59
60#[derive(Debug, Clone)]
62pub struct TConnectionRx<M> {
63 pub event: crossbeam_channel::Receiver<Event>,
65 pub lsp: crossbeam_channel::Receiver<Message>,
67 pub(crate) marker: std::marker::PhantomData<M>,
68}
69
70impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
71 pub(crate) fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
73 crossbeam_channel::select_biased! {
74 recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
75 recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
76 }
77 }
78}
79
80pub(crate) enum EventOrMessage<M> {
82 Evt(Event),
83 Msg(M),
84}
85
86pub struct Connection<M> {
88 pub sender: TConnectionTx<M>,
90 pub receiver: TConnectionRx<M>,
92}
93
94impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
95 fn from(conn: Connection<Message>) -> Self {
96 Self {
97 sender: TConnectionTx {
98 event: conn.sender.event,
99 lsp: conn.sender.lsp,
100 marker: std::marker::PhantomData,
101 },
102 receiver: TConnectionRx {
103 event: conn.receiver.event,
104 lsp: conn.receiver.lsp,
105 marker: std::marker::PhantomData,
106 },
107 }
108 }
109}
110
111impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
112 fn from(conn: TConnectionTx<M>) -> Self {
113 Self {
114 event: conn.event,
115 lsp: conn.lsp,
116 marker: std::marker::PhantomData,
117 }
118 }
119}
120
121type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;
122
123pub struct TypedLspClient<S> {
125 client: LspClient,
126 caster: AnyCaster<S>,
127}
128
129impl<S> TypedLspClient<S> {
130 pub fn to_untyped(self) -> LspClient {
132 self.client
133 }
134}
135
136impl<S: 'static> TypedLspClient<S> {
137 pub fn untyped(&self) -> &LspClient {
139 &self.client
140 }
141
142 pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
144 let caster = self.caster.clone();
145 TypedLspClient {
146 client: self.client.clone(),
147 caster: Arc::new(move |s| f(caster(s))),
148 }
149 }
150
151 pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
153 let Some(sender) = self.sender.upgrade() else {
154 log::warn!("failed to send request: connection closed");
155 return;
156 };
157
158 let Err(res) = sender.event.send(Box::new(event)) else {
159 return;
160 };
161 log::warn!("failed to send event: {res:?}");
162 }
163}
164
165impl<S> Clone for TypedLspClient<S> {
166 fn clone(&self) -> Self {
167 Self {
168 client: self.client.clone(),
169 caster: self.caster.clone(),
170 }
171 }
172}
173
174impl<S> std::ops::Deref for TypedLspClient<S> {
175 type Target = LspClient;
176
177 fn deref(&self) -> &Self::Target {
178 &self.client
179 }
180}
181
182#[derive(Debug, Clone)]
185pub struct LspClientRoot {
186 weak: LspClient,
187 _strong: Arc<ConnectionTx>,
188}
189
190impl LspClientRoot {
191 pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
193 handle: tokio::runtime::Handle,
194 sender: TConnectionTx<M>,
195 ) -> Self {
196 let _strong = Arc::new(sender.into());
197 let weak = LspClient {
198 handle,
199 msg_kind: M::get_message_kind(),
200 sender: Arc::downgrade(&_strong),
201 req_queue: Arc::new(Mutex::new(ReqQueue::default())),
202
203 hook: Arc::new(()),
204 };
205 Self { weak, _strong }
206 }
207
208 pub fn with_hook(mut self, hook: Arc<dyn LsHook>) -> Self {
210 self.weak.hook = hook;
211 self
212 }
213
214 pub fn weak(&self) -> LspClient {
216 self.weak.clone()
217 }
218}
219
220type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
221type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;
222
223#[derive(Debug, Clone)]
225pub struct LspClient {
226 pub handle: tokio::runtime::Handle,
228
229 pub(crate) msg_kind: MessageKind,
230 pub(crate) sender: Weak<ConnectionTx>,
231 pub(crate) req_queue: Arc<Mutex<ReqQueue>>,
232
233 pub(crate) hook: Arc<dyn LsHook>,
234}
235
236impl LspClient {
237 pub fn untyped(&self) -> &Self {
239 self
240 }
241
242 pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
244 TypedLspClient {
245 client: self.clone(),
246 caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
247 }
248 }
249
250 pub fn has_pending_requests(&self) -> bool {
252 self.req_queue.lock().incoming.has_pending()
253 }
254
255 pub fn begin_panic(&self) {
257 self.req_queue.lock().begin_panic();
258 }
259
260 pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
262 let Some(sender) = self.sender.upgrade() else {
263 log::warn!("failed to send request: connection closed");
264 return;
265 };
266
267 if let Err(res) = sender.event.send(Box::new(event)) {
268 log::warn!("failed to send event: {res:?}");
269 }
270 }
271
272 #[cfg(feature = "lsp")]
274 pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
275 let mut req_queue = self.req_queue.lock();
276 let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
277 log::warn!("received response for unknown request");
278 return;
279 };
280 drop(req_queue);
281 handler(service, response.into())
282 }
283
284 #[cfg(feature = "dap")]
286 pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
287 let mut req_queue = self.req_queue.lock();
288 let Some(handler) = req_queue
289 .outgoing
290 .complete((response.request_seq as i32).into())
292 else {
293 log::warn!("received response for unknown request");
294 return;
295 };
296 drop(req_queue);
297 handler(service, response.into())
298 }
299
300 pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
302 let mut req_queue = self.req_queue.lock();
303 self.hook.start_request(id, method);
304 req_queue
305 .incoming
306 .register(id.clone(), (method.to_owned(), received_at));
307 }
308
309 pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
311 let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
312 self.respond_any_result(id, result);
313 }
314
315 fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
316 let req_id = id.clone();
317 let msg: Message = match (self.msg_kind, result) {
318 #[cfg(feature = "lsp")]
319 (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
320 #[cfg(feature = "lsp")]
321 (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
322 #[cfg(feature = "dap")]
323 (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
324 #[cfg(feature = "dap")]
325 (MessageKind::Dap, Err(e)) => {
326 dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
327 }
328 };
329
330 self.respond(req_id, msg);
331 }
332
333 pub fn respond(&self, id: RequestId, response: Message) {
335 let mut req_queue = self.req_queue.lock();
336 let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
337 return;
338 };
339
340 self.hook.stop_request(&id, &method, received_at);
341
342 let Some(sender) = self.sender.upgrade() else {
343 log::warn!("failed to send response ({method}, {id}): connection closed");
344 return;
345 };
346 if let Err(res) = sender.lsp.send(response) {
347 log::warn!("failed to send response ({method}, {id}): {res:?}");
348 }
349 }
350}
351
352impl LspClient {
353 pub fn schedule<T: Serialize + 'static>(
355 &self,
356 req_id: RequestId,
357 resp: SchedulableResponse<T>,
358 ) -> ScheduledResult {
359 let resp = resp?;
360
361 use futures::future::MaybeDone::*;
362 match resp {
363 Done(output) => {
364 self.respond_result(req_id, output);
365 }
366 Future(fut) => {
367 let client = self.clone();
368 let req_id = req_id.clone();
369 self.handle.spawn(async move {
370 client.respond_result(req_id, fut.await);
371 });
372 }
373 Gone => {
374 log::warn!("response for request({req_id:?}) already taken");
375 }
376 };
377
378 Ok(Some(()))
379 }
380
381 pub(crate) fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
384 match resp {
385 Ok(Some(())) => {}
387 _ => self.respond_result(req_id, resp),
389 }
390 }
391}
392
393pub trait LsHook: fmt::Debug + Send + Sync {
395 fn start_request(&self, req_id: &RequestId, method: &str);
397 fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant);
399 fn start_notification(&self, method: &str);
401 fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>);
403}
404
405impl LsHook for () {
406 fn start_request(&self, req_id: &RequestId, method: &str) {
407 log::info!("handling {method} - ({req_id})");
408 }
409
410 fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
411 let duration = received_at.elapsed();
412 log::info!("handled {method} - ({req_id}) in {duration:0.2?}");
413 }
414
415 fn start_notification(&self, method: &str) {
416 log::info!("notifying {method}");
417 }
418
419 fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
420 let request_duration = received_at.elapsed();
421 if let Err(err) = result {
422 log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
423 } else {
424 log::info!("notify {method} succeeded in {request_duration:0.2?}");
425 }
426 }
427}
428
429type AsyncHandler<S, T, R> = fn(srv: &mut S, args: T) -> SchedulableResponse<R>;
430type RawHandler<S, T> = fn(srv: &mut S, req_id: RequestId, args: T) -> ScheduledResult;
431type BoxPureHandler<S, T> = Box<dyn Fn(&mut S, T) -> LspResult<()>>;
432type BoxHandler<S, T> = Box<dyn Fn(&mut S, &LspClient, RequestId, T) -> ScheduledResult>;
433type ExecuteCmdMap<S> = HashMap<&'static str, BoxHandler<S, Vec<JsonValue>>>;
434type RegularCmdMap<S> = HashMap<&'static str, BoxHandler<S, JsonValue>>;
435type NotifyCmdMap<S> = HashMap<&'static str, BoxPureHandler<S, JsonValue>>;
436type ResourceMap<S> = HashMap<ImmutPath, BoxHandler<S, Vec<JsonValue>>>;
437type MayInitBoxHandler<A, S, T> =
438 Box<dyn for<'a> Fn(ServiceState<'a, A, S>, &LspClient, T) -> anyhow::Result<()>>;
439type EventMap<A, S> = HashMap<core::any::TypeId, MayInitBoxHandler<A, S, Event>>;
440
441pub trait Initializer {
443 type I: for<'de> serde::Deserialize<'de>;
445 type S;
447
448 fn initialize(self, req: Self::I) -> (Self::S, AnySchedulableResponse);
452}
453
454#[cfg(feature = "lsp")]
456pub type LspBuilder<Args> = LsBuilder<LspMessage, Args>;
457#[cfg(feature = "dap")]
459pub type DapBuilder<Args> = LsBuilder<DapMessage, Args>;
460
461pub struct LsBuilder<M, Args: Initializer> {
463 pub args: Args,
465 pub client: LspClient,
467 pub events: EventMap<Args, Args::S>,
469 pub command_handlers: ExecuteCmdMap<Args::S>,
471 pub notif_handlers: NotifyCmdMap<Args::S>,
473 pub req_handlers: RegularCmdMap<Args::S>,
475 pub resource_handlers: ResourceMap<Args::S>,
477 _marker: std::marker::PhantomData<M>,
478}
479
480impl<M, Args: Initializer> LsBuilder<M, Args>
481where
482 Args::S: 'static,
483{
484 pub fn new(args: Args, client: LspClient) -> Self {
486 Self {
487 args,
488 client,
489 events: EventMap::new(),
490 command_handlers: ExecuteCmdMap::new(),
491 notif_handlers: NotifyCmdMap::new(),
492 req_handlers: RegularCmdMap::new(),
493 resource_handlers: ResourceMap::new(),
494 _marker: std::marker::PhantomData,
495 }
496 }
497
498 pub fn with_event<T: std::any::Any>(
500 mut self,
501 ins: &T,
502 handler: impl for<'a> Fn(ServiceState<'a, Args, Args::S>, T) -> anyhow::Result<()> + 'static,
503 ) -> Self {
504 self.events.insert(
505 ins.type_id(),
506 Box::new(move |s, _client, req| handler(s, *req.downcast().unwrap())),
507 );
508 self
509 }
510
511 pub fn with_resource_(
513 mut self,
514 path: ImmutPath,
515 handler: RawHandler<Args::S, Vec<JsonValue>>,
516 ) -> Self {
517 self.resource_handlers.insert(path, raw_to_boxed(handler));
518 self
519 }
520
521 pub fn with_resource(
523 mut self,
524 path: &'static str,
525 handler: fn(&mut Args::S, Vec<JsonValue>) -> AnySchedulableResponse,
526 ) -> Self {
527 self.resource_handlers.insert(
528 Path::new(path).into(),
529 Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
530 );
531 self
532 }
533
534 pub fn build(self) -> LsDriver<M, Args> {
536 LsDriver {
537 state: State::Uninitialized(Some(Box::new(self.args))),
538 events: self.events,
539 client: self.client,
540 commands: self.command_handlers,
541 notifications: self.notif_handlers,
542 requests: self.req_handlers,
543 resources: self.resource_handlers,
544 _marker: std::marker::PhantomData,
545 }
546 }
547}
548
549pub enum ServiceState<'a, A, S> {
551 Uninitialized(Option<&'a mut A>),
553 Ready(&'a mut S),
555}
556
557impl<A, S> ServiceState<'_, A, S> {
558 pub fn ready(&mut self) -> Option<&mut S> {
560 match self {
561 ServiceState::Ready(s) => Some(s),
562 _ => None,
563 }
564 }
565}
566
567#[derive(Debug, Clone, PartialEq, Eq)]
568#[allow(dead_code)]
569enum State<Args, S> {
570 Uninitialized(Option<Box<Args>>),
571 Initializing(S),
572 Ready(S),
573 ShuttingDown,
574}
575
576impl<Args, S> State<Args, S> {
577 fn opt(&self) -> Option<&S> {
578 match &self {
579 State::Ready(s) => Some(s),
580 _ => None,
581 }
582 }
583
584 fn opt_mut(&mut self) -> Option<&mut S> {
585 match self {
586 State::Ready(s) => Some(s),
587 _ => None,
588 }
589 }
590}
591
592pub struct LsDriver<M, Args: Initializer> {
594 state: State<Args, Args::S>,
596 pub client: LspClient,
598
599 pub events: EventMap<Args, Args::S>,
602 pub commands: ExecuteCmdMap<Args::S>,
604 pub notifications: NotifyCmdMap<Args::S>,
606 pub requests: RegularCmdMap<Args::S>,
608 pub resources: ResourceMap<Args::S>,
610 _marker: std::marker::PhantomData<M>,
611}
612
613impl<M, Args: Initializer> LsDriver<M, Args> {
614 pub fn state(&self) -> Option<&Args::S> {
616 self.state.opt()
617 }
618
619 pub fn state_mut(&mut self) -> Option<&mut Args::S> {
621 self.state.opt_mut()
622 }
623
624 pub fn ready(&mut self, params: Args::I) -> AnySchedulableResponse {
626 let args = match &mut self.state {
627 State::Uninitialized(args) => args,
628 _ => return just_result(Err(invalid_request("server is already initialized"))),
629 };
630
631 let args = args.take().expect("already initialized");
632 let (s, res) = args.initialize(params);
633 self.state = State::Ready(s);
634
635 res
636 }
637
638 pub fn get_resources(&mut self, req_id: RequestId, args: Vec<JsonValue>) -> ScheduledResult {
641 let s = self.state.opt_mut().ok_or_else(not_initialized)?;
642
643 let path =
644 from_value::<PathBuf>(args[0].clone()).map_err(|e| invalid_params(e.to_string()))?;
645
646 let Some(handler) = self.resources.get(path.as_path()) else {
647 log::error!("asked for unknown resource: {path:?}");
648 return Err(method_not_found());
649 };
650
651 handler(s, &self.client, req_id, args)
653 }
654}
655
656pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
658 Ok(futures::future::MaybeDone::Done(Ok(res)))
659}
660
661pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
663 Ok(futures::future::MaybeDone::Done(res))
664}
665
666pub fn just_future<T, E>(
668 fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
669) -> Result<ResponseFuture<Result<T, E>>, E> {
670 Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
671}
672
673pub fn invalid_params(msg: impl fmt::Display) -> ResponseError {
675 resp_err(ErrorCode::InvalidParams, msg)
676}
677
678pub fn internal_error(msg: impl fmt::Display) -> ResponseError {
680 resp_err(ErrorCode::InternalError, msg)
681}
682
683pub fn not_initialized() -> ResponseError {
685 resp_err(ErrorCode::ServerNotInitialized, "not initialized yet")
686}
687
688pub fn method_not_found() -> ResponseError {
690 resp_err(ErrorCode::MethodNotFound, "method not found")
691}
692
693pub fn invalid_request(msg: impl fmt::Display) -> ResponseError {
695 resp_err(ErrorCode::InvalidRequest, msg)
696}
697
698fn from_json<T: serde::de::DeserializeOwned>(json: JsonValue) -> LspResult<T> {
699 serde_json::from_value(json).map_err(invalid_request)
700}
701
702fn raw_to_boxed<S: 'static, T: 'static>(handler: RawHandler<S, T>) -> BoxHandler<S, T> {
703 Box::new(move |s, _client, req_id, req| handler(s, req_id, req))
704}
705
706fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError {
707 ResponseError {
708 code: code as i32,
709 message: msg.to_string(),
710 data: None,
711 }
712}