1#[cfg(feature = "dap")]
4mod dap_srv;
5
6#[cfg(feature = "lsp")]
7mod lsp_srv;
8
9use core::fmt;
10use std::any::Any;
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::sync::{Arc, Weak};
15use std::time::Instant;
16
17use futures::future::MaybeDone;
18use parking_lot::Mutex;
19use serde::Serialize;
20use serde_json::{from_value, Value as JsonValue};
21
22#[cfg(feature = "lsp")]
23use crate::lsp::{Notification, Request};
24use crate::msg::*;
25use crate::req_queue;
26use crate::*;
27
28type ImmutPath = Arc<Path>;
29
30pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
32pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
34pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
36pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
38pub type ScheduledResult = LspResult<Option<()>>;
44
45pub type ConnectionTx = TConnectionTx<Message>;
47pub type ConnectionRx = TConnectionRx<Message>;
49
50#[derive(Debug, Clone)]
52pub struct TConnectionTx<M> {
53 pub event: crossbeam_channel::Sender<Event>,
55 pub lsp: crossbeam_channel::Sender<Message>,
57 pub(crate) marker: std::marker::PhantomData<M>,
58}
59
60#[derive(Debug, Clone)]
62pub struct TConnectionRx<M> {
63 pub event: crossbeam_channel::Receiver<Event>,
65 pub lsp: crossbeam_channel::Receiver<Message>,
67 pub(crate) marker: std::marker::PhantomData<M>,
68}
69
70impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
71 pub(crate) fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
73 crossbeam_channel::select_biased! {
74 recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
75 recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
76 }
77 }
78}
79
80pub(crate) enum EventOrMessage<M> {
82 Evt(Event),
83 Msg(M),
84}
85
86pub struct Connection<M> {
88 pub sender: TConnectionTx<M>,
90 pub receiver: TConnectionRx<M>,
92}
93
94impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
95 fn from(conn: Connection<Message>) -> Self {
96 Self {
97 sender: TConnectionTx {
98 event: conn.sender.event,
99 lsp: conn.sender.lsp,
100 marker: std::marker::PhantomData,
101 },
102 receiver: TConnectionRx {
103 event: conn.receiver.event,
104 lsp: conn.receiver.lsp,
105 marker: std::marker::PhantomData,
106 },
107 }
108 }
109}
110
111impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
112 fn from(conn: TConnectionTx<M>) -> Self {
113 Self {
114 event: conn.event,
115 lsp: conn.lsp,
116 marker: std::marker::PhantomData,
117 }
118 }
119}
120
121type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;
122
123pub struct TypedLspClient<S> {
125 client: LspClient,
126 caster: AnyCaster<S>,
127}
128
129impl<S> TypedLspClient<S> {
130 pub fn to_untyped(self) -> LspClient {
132 self.client
133 }
134}
135
136impl<S: 'static> TypedLspClient<S> {
137 pub fn untyped(&self) -> &LspClient {
139 &self.client
140 }
141
142 pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
144 let caster = self.caster.clone();
145 TypedLspClient {
146 client: self.client.clone(),
147 caster: Arc::new(move |s| f(caster(s))),
148 }
149 }
150
151 pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
153 let Some(sender) = self.sender.upgrade() else {
154 log::warn!("failed to send request: connection closed");
155 return;
156 };
157
158 let Err(res) = sender.event.send(Box::new(event)) else {
159 return;
160 };
161 log::warn!("failed to send event: {res:?}");
162 }
163}
164
165impl<S> Clone for TypedLspClient<S> {
166 fn clone(&self) -> Self {
167 Self {
168 client: self.client.clone(),
169 caster: self.caster.clone(),
170 }
171 }
172}
173
174impl<S> std::ops::Deref for TypedLspClient<S> {
175 type Target = LspClient;
176
177 fn deref(&self) -> &Self::Target {
178 &self.client
179 }
180}
181
182#[derive(Debug, Clone)]
185pub struct LspClientRoot {
186 weak: LspClient,
187 _strong: Arc<ConnectionTx>,
188}
189
190impl LspClientRoot {
191 pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
193 handle: tokio::runtime::Handle,
194 sender: TConnectionTx<M>,
195 ) -> Self {
196 let _strong = Arc::new(sender.into());
197 let weak = LspClient {
198 handle,
199 msg_kind: M::get_message_kind(),
200 sender: Arc::downgrade(&_strong),
201 req_queue: Arc::new(Mutex::new(ReqQueue::default())),
202 };
203 Self { weak, _strong }
204 }
205
206 pub fn weak(&self) -> LspClient {
208 self.weak.clone()
209 }
210}
211
212type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
213type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;
214
215#[derive(Debug, Clone)]
217pub struct LspClient {
218 pub handle: tokio::runtime::Handle,
220
221 pub(crate) msg_kind: MessageKind,
222 pub(crate) sender: Weak<ConnectionTx>,
223 pub(crate) req_queue: Arc<Mutex<ReqQueue>>,
224}
225
226impl LspClient {
227 pub fn untyped(&self) -> &Self {
229 self
230 }
231
232 pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
234 TypedLspClient {
235 client: self.clone(),
236 caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
237 }
238 }
239
240 pub fn has_pending_requests(&self) -> bool {
242 self.req_queue.lock().incoming.has_pending()
243 }
244
245 pub fn begin_panic(&self) {
247 self.req_queue.lock().begin_panic();
248 }
249
250 pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
252 let Some(sender) = self.sender.upgrade() else {
253 log::warn!("failed to send request: connection closed");
254 return;
255 };
256
257 if let Err(res) = sender.event.send(Box::new(event)) {
258 log::warn!("failed to send event: {res:?}");
259 }
260 }
261
262 #[cfg(feature = "lsp")]
264 pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
265 let mut req_queue = self.req_queue.lock();
266 let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
267 log::warn!("received response for unknown request");
268 return;
269 };
270 drop(req_queue);
271 handler(service, response.into())
272 }
273
274 #[cfg(feature = "dap")]
276 pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
277 let mut req_queue = self.req_queue.lock();
278 let Some(handler) = req_queue
279 .outgoing
280 .complete((response.request_seq as i32).into())
282 else {
283 log::warn!("received response for unknown request");
284 return;
285 };
286 drop(req_queue);
287 handler(service, response.into())
288 }
289
290 pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
292 let mut req_queue = self.req_queue.lock();
293 self.start_request(id, method);
294 req_queue
295 .incoming
296 .register(id.clone(), (method.to_owned(), received_at));
297 }
298
299 pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
301 let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
302 self.respond_any_result(id, result);
303 }
304
305 fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
306 let req_id = id.clone();
307 let msg: Message = match (self.msg_kind, result) {
308 #[cfg(feature = "lsp")]
309 (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
310 #[cfg(feature = "lsp")]
311 (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
312 #[cfg(feature = "dap")]
313 (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
314 #[cfg(feature = "dap")]
315 (MessageKind::Dap, Err(e)) => {
316 dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
317 }
318 };
319
320 self.respond(req_id, msg);
321 }
322
323 pub fn respond(&self, id: RequestId, response: Message) {
325 let mut req_queue = self.req_queue.lock();
326 let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
327 return;
328 };
329
330 self.stop_request(&id, &method, received_at);
331
332 let Some(sender) = self.sender.upgrade() else {
333 log::warn!("failed to send response ({method}, {id}): connection closed");
334 return;
335 };
336 if let Err(res) = sender.lsp.send(response) {
337 log::warn!("failed to send response ({method}, {id}): {res:?}");
338 }
339 }
340}
341
342impl LspClient {
343 pub fn schedule<T: Serialize + 'static>(
345 &self,
346 req_id: RequestId,
347 resp: SchedulableResponse<T>,
348 ) -> ScheduledResult {
349 let resp = resp?;
350
351 use futures::future::MaybeDone::*;
352 match resp {
353 Done(output) => {
354 self.respond_result(req_id, output);
355 }
356 Future(fut) => {
357 let client = self.clone();
358 let req_id = req_id.clone();
359 self.handle.spawn(async move {
360 client.respond_result(req_id, fut.await);
361 });
362 }
363 Gone => {
364 log::warn!("response for request({req_id:?}) already taken");
365 }
366 };
367
368 Ok(Some(()))
369 }
370
371 pub(crate) fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
374 match resp {
375 Ok(Some(())) => {}
377 _ => self.respond_result(req_id, resp),
379 }
380 }
381}
382
383impl LspClient {
384 fn start_request(&self, req_id: &RequestId, method: &str) {
385 log::info!("handling {method} - ({req_id})");
386 }
387
388 fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
389 let duration = received_at.elapsed();
390 log::info!("handled {method} - ({req_id}) in {duration:0.2?}");
391 }
392
393 fn start_notification(&self, method: &str) {
394 log::info!("notifying {method}");
395 }
396
397 fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
398 let request_duration = received_at.elapsed();
399 if let Err(err) = result {
400 log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
401 } else {
402 log::info!("notify {method} succeeded in {request_duration:0.2?}");
403 }
404 }
405}
406
407type AsyncHandler<S, T, R> = fn(srv: &mut S, args: T) -> SchedulableResponse<R>;
408type RawHandler<S, T> = fn(srv: &mut S, req_id: RequestId, args: T) -> ScheduledResult;
409type BoxPureHandler<S, T> = Box<dyn Fn(&mut S, T) -> LspResult<()>>;
410type BoxHandler<S, T> = Box<dyn Fn(&mut S, &LspClient, RequestId, T) -> ScheduledResult>;
411type ExecuteCmdMap<S> = HashMap<&'static str, BoxHandler<S, Vec<JsonValue>>>;
412type RegularCmdMap<S> = HashMap<&'static str, BoxHandler<S, JsonValue>>;
413type NotifyCmdMap<S> = HashMap<&'static str, BoxPureHandler<S, JsonValue>>;
414type ResourceMap<S> = HashMap<ImmutPath, BoxHandler<S, Vec<JsonValue>>>;
415type MayInitBoxHandler<A, S, T> =
416 Box<dyn for<'a> Fn(ServiceState<'a, A, S>, &LspClient, T) -> anyhow::Result<()>>;
417type EventMap<A, S> = HashMap<core::any::TypeId, MayInitBoxHandler<A, S, Event>>;
418
419pub trait Initializer {
421 type I: for<'de> serde::Deserialize<'de>;
423 type S;
425
426 fn initialize(self, req: Self::I) -> (Self::S, AnySchedulableResponse);
430}
431
432#[cfg(feature = "lsp")]
434pub type LspBuilder<Args> = LsBuilder<LspMessage, Args>;
435#[cfg(feature = "dap")]
437pub type DapBuilder<Args> = LsBuilder<DapMessage, Args>;
438
439pub struct LsBuilder<M, Args: Initializer> {
441 pub args: Args,
443 pub client: LspClient,
445 pub events: EventMap<Args, Args::S>,
447 pub command_handlers: ExecuteCmdMap<Args::S>,
449 pub notif_handlers: NotifyCmdMap<Args::S>,
451 pub req_handlers: RegularCmdMap<Args::S>,
453 pub resource_handlers: ResourceMap<Args::S>,
455 _marker: std::marker::PhantomData<M>,
456}
457
458impl<M, Args: Initializer> LsBuilder<M, Args>
459where
460 Args::S: 'static,
461{
462 pub fn new(args: Args, client: LspClient) -> Self {
464 Self {
465 args,
466 client,
467 events: EventMap::new(),
468 command_handlers: ExecuteCmdMap::new(),
469 notif_handlers: NotifyCmdMap::new(),
470 req_handlers: RegularCmdMap::new(),
471 resource_handlers: ResourceMap::new(),
472 _marker: std::marker::PhantomData,
473 }
474 }
475
476 pub fn with_event<T: std::any::Any>(
478 mut self,
479 ins: &T,
480 handler: impl for<'a> Fn(ServiceState<'a, Args, Args::S>, T) -> anyhow::Result<()> + 'static,
481 ) -> Self {
482 self.events.insert(
483 ins.type_id(),
484 Box::new(move |s, _client, req| handler(s, *req.downcast().unwrap())),
485 );
486 self
487 }
488
489 pub fn with_resource_(
491 mut self,
492 path: ImmutPath,
493 handler: RawHandler<Args::S, Vec<JsonValue>>,
494 ) -> Self {
495 self.resource_handlers.insert(path, raw_to_boxed(handler));
496 self
497 }
498
499 pub fn with_resource(
501 mut self,
502 path: &'static str,
503 handler: fn(&mut Args::S, Vec<JsonValue>) -> AnySchedulableResponse,
504 ) -> Self {
505 self.resource_handlers.insert(
506 Path::new(path).into(),
507 Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
508 );
509 self
510 }
511
512 pub fn build(self) -> LsDriver<M, Args> {
514 LsDriver {
515 state: State::Uninitialized(Some(Box::new(self.args))),
516 events: self.events,
517 client: self.client,
518 commands: self.command_handlers,
519 notifications: self.notif_handlers,
520 requests: self.req_handlers,
521 resources: self.resource_handlers,
522 _marker: std::marker::PhantomData,
523 }
524 }
525}
526
527pub enum ServiceState<'a, A, S> {
529 Uninitialized(Option<&'a mut A>),
531 Ready(&'a mut S),
533}
534
535impl<A, S> ServiceState<'_, A, S> {
536 pub fn ready(&mut self) -> Option<&mut S> {
538 match self {
539 ServiceState::Ready(s) => Some(s),
540 _ => None,
541 }
542 }
543}
544
545#[derive(Debug, Clone, PartialEq, Eq)]
546#[allow(dead_code)]
547enum State<Args, S> {
548 Uninitialized(Option<Box<Args>>),
549 Initializing(S),
550 Ready(S),
551 ShuttingDown,
552}
553
554impl<Args, S> State<Args, S> {
555 fn opt(&self) -> Option<&S> {
556 match &self {
557 State::Ready(s) => Some(s),
558 _ => None,
559 }
560 }
561
562 fn opt_mut(&mut self) -> Option<&mut S> {
563 match self {
564 State::Ready(s) => Some(s),
565 _ => None,
566 }
567 }
568}
569
570pub struct LsDriver<M, Args: Initializer> {
572 state: State<Args, Args::S>,
574 pub client: LspClient,
576
577 pub events: EventMap<Args, Args::S>,
580 pub commands: ExecuteCmdMap<Args::S>,
582 pub notifications: NotifyCmdMap<Args::S>,
584 pub requests: RegularCmdMap<Args::S>,
586 pub resources: ResourceMap<Args::S>,
588 _marker: std::marker::PhantomData<M>,
589}
590
591impl<M, Args: Initializer> LsDriver<M, Args> {
592 pub fn state(&self) -> Option<&Args::S> {
594 self.state.opt()
595 }
596
597 pub fn state_mut(&mut self) -> Option<&mut Args::S> {
599 self.state.opt_mut()
600 }
601
602 pub fn ready(&mut self, params: Args::I) -> AnySchedulableResponse {
604 let args = match &mut self.state {
605 State::Uninitialized(args) => args,
606 _ => return just_result(Err(invalid_request("server is already initialized"))),
607 };
608
609 let args = args.take().expect("already initialized");
610 let (s, res) = args.initialize(params);
611 self.state = State::Ready(s);
612
613 res
614 }
615
616 pub fn get_resources(&mut self, req_id: RequestId, args: Vec<JsonValue>) -> ScheduledResult {
619 let s = self.state.opt_mut().ok_or_else(not_initialized)?;
620
621 let path =
622 from_value::<PathBuf>(args[0].clone()).map_err(|e| invalid_params(e.to_string()))?;
623
624 let Some(handler) = self.resources.get(path.as_path()) else {
625 log::error!("asked for unknown resource: {path:?}");
626 return Err(method_not_found());
627 };
628
629 handler(s, &self.client, req_id, args)
631 }
632}
633
634pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
636 Ok(futures::future::MaybeDone::Done(Ok(res)))
637}
638
639pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
641 Ok(futures::future::MaybeDone::Done(res))
642}
643
644pub fn just_future<T, E>(
646 fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
647) -> Result<ResponseFuture<Result<T, E>>, E> {
648 Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
649}
650
651pub fn invalid_params(msg: impl fmt::Display) -> ResponseError {
653 resp_err(ErrorCode::InvalidParams, msg)
654}
655
656pub fn internal_error(msg: impl fmt::Display) -> ResponseError {
658 resp_err(ErrorCode::InternalError, msg)
659}
660
661pub fn not_initialized() -> ResponseError {
663 resp_err(ErrorCode::ServerNotInitialized, "not initialized yet")
664}
665
666pub fn method_not_found() -> ResponseError {
668 resp_err(ErrorCode::MethodNotFound, "method not found")
669}
670
671pub fn invalid_request(msg: impl fmt::Display) -> ResponseError {
673 resp_err(ErrorCode::InvalidRequest, msg)
674}
675
676fn from_json<T: serde::de::DeserializeOwned>(json: JsonValue) -> LspResult<T> {
677 serde_json::from_value(json).map_err(invalid_request)
678}
679
680fn raw_to_boxed<S: 'static, T: 'static>(handler: RawHandler<S, T>) -> BoxHandler<S, T> {
681 Box::new(move |s, _client, req_id, req| handler(s, req_id, req))
682}
683
684fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError {
685 ResponseError {
686 code: code as i32,
687 message: msg.to_string(),
688 data: None,
689 }
690}