1#[cfg(feature = "dap")]
4mod dap_srv;
5
6#[cfg(feature = "lsp")]
7mod lsp_srv;
8
9use core::fmt;
10use std::any::Any;
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::sync::{Arc, Weak};
15use std::time::Instant;
16
17use futures::future::MaybeDone;
18use parking_lot::Mutex;
19use serde::Serialize;
20use serde_json::{from_value, Value as JsonValue};
21
22#[cfg(feature = "lsp")]
23use crate::lsp::{Notification, Request};
24use crate::msg::*;
25use crate::req_queue;
26use crate::*;
27
28type ImmutPath = Arc<Path>;
29
30pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
32pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
34pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
36pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
38pub type ScheduledResult = LspResult<Option<()>>;
44
45pub type ConnectionTx = TConnectionTx<Message>;
47pub type ConnectionRx = TConnectionRx<Message>;
49
50#[derive(Debug, Clone)]
52pub struct TConnectionTx<M> {
53 pub event: crossbeam_channel::Sender<Event>,
55 pub lsp: crossbeam_channel::Sender<Message>,
57 pub(crate) marker: std::marker::PhantomData<M>,
58}
59
60#[derive(Debug, Clone)]
62pub struct TConnectionRx<M> {
63 pub event: crossbeam_channel::Receiver<Event>,
65 pub lsp: crossbeam_channel::Receiver<Message>,
67 pub(crate) marker: std::marker::PhantomData<M>,
68}
69
70impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
71 pub(crate) fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
73 crossbeam_channel::select_biased! {
74 recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
75 recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
76 }
77 }
78}
79
80pub(crate) enum EventOrMessage<M> {
82 Evt(Event),
83 Msg(M),
84}
85
86pub struct Connection<M> {
88 pub sender: TConnectionTx<M>,
90 pub receiver: TConnectionRx<M>,
92}
93
94impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
95 fn from(conn: Connection<Message>) -> Self {
96 Self {
97 sender: TConnectionTx {
98 event: conn.sender.event,
99 lsp: conn.sender.lsp,
100 marker: std::marker::PhantomData,
101 },
102 receiver: TConnectionRx {
103 event: conn.receiver.event,
104 lsp: conn.receiver.lsp,
105 marker: std::marker::PhantomData,
106 },
107 }
108 }
109}
110
111impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
112 fn from(conn: TConnectionTx<M>) -> Self {
113 Self {
114 event: conn.event,
115 lsp: conn.lsp,
116 marker: std::marker::PhantomData,
117 }
118 }
119}
120
121type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;
122
123pub struct TypedLspClient<S> {
125 client: LspClient,
126 caster: AnyCaster<S>,
127}
128
129impl<S> TypedLspClient<S> {
130 pub fn to_untyped(self) -> LspClient {
132 self.client
133 }
134}
135
136impl<S: 'static> TypedLspClient<S> {
137 pub fn untyped(&self) -> &LspClient {
139 &self.client
140 }
141
142 pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
144 let caster = self.caster.clone();
145 TypedLspClient {
146 client: self.client.clone(),
147 caster: Arc::new(move |s| f(caster(s))),
148 }
149 }
150
151 pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
153 let Some(sender) = self.sender.upgrade() else {
154 log::warn!("failed to send request: connection closed");
155 return;
156 };
157
158 let Err(res) = sender.event.send(Box::new(event)) else {
159 return;
160 };
161 log::warn!("failed to send event: {res:?}");
162 }
163}
164
165impl<S> Clone for TypedLspClient<S> {
166 fn clone(&self) -> Self {
167 Self {
168 client: self.client.clone(),
169 caster: self.caster.clone(),
170 }
171 }
172}
173
174impl<S> std::ops::Deref for TypedLspClient<S> {
175 type Target = LspClient;
176
177 fn deref(&self) -> &Self::Target {
178 &self.client
179 }
180}
181
182#[derive(Debug, Clone)]
185pub struct LspClientRoot {
186 weak: LspClient,
187 _strong: Arc<ConnectionTx>,
188}
189
190impl LspClientRoot {
191 pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
193 handle: tokio::runtime::Handle,
194 sender: TConnectionTx<M>,
195 ) -> Self {
196 let _strong = Arc::new(sender.into());
197 let weak = LspClient {
198 handle,
199 msg_kind: M::get_message_kind(),
200 sender: Arc::downgrade(&_strong),
201 req_queue: Arc::new(Mutex::new(ReqQueue::default())),
202 };
203 Self { weak, _strong }
204 }
205
206 pub fn weak(&self) -> LspClient {
208 self.weak.clone()
209 }
210}
211
212type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
213type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;
214
215#[derive(Debug, Clone)]
217pub struct LspClient {
218 pub handle: tokio::runtime::Handle,
220
221 pub(crate) msg_kind: MessageKind,
222 pub(crate) sender: Weak<ConnectionTx>,
223 pub(crate) req_queue: Arc<Mutex<ReqQueue>>,
224}
225
226impl LspClient {
227 pub fn untyped(&self) -> &Self {
229 self
230 }
231
232 pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
234 TypedLspClient {
235 client: self.clone(),
236 caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
237 }
238 }
239
240 pub fn has_pending_requests(&self) -> bool {
242 self.req_queue.lock().incoming.has_pending()
243 }
244
245 pub fn begin_panic(&self) {
247 self.req_queue.lock().begin_panic();
248 }
249
250 pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
252 let Some(sender) = self.sender.upgrade() else {
253 log::warn!("failed to send request: connection closed");
254 return;
255 };
256
257 if let Err(res) = sender.event.send(Box::new(event)) {
258 log::warn!("failed to send event: {res:?}");
259 }
260 }
261
262 #[cfg(feature = "lsp")]
264 pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
265 let mut req_queue = self.req_queue.lock();
266 let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
267 log::warn!("received response for unknown request");
268 return;
269 };
270 drop(req_queue);
271 handler(service, response.into())
272 }
273
274 #[cfg(feature = "dap")]
276 pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
277 let mut req_queue = self.req_queue.lock();
278 let Some(handler) = req_queue
279 .outgoing
280 .complete((response.request_seq as i32).into())
282 else {
283 log::warn!("received response for unknown request");
284 return;
285 };
286 drop(req_queue);
287 handler(service, response.into())
288 }
289
290 pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
292 let mut req_queue = self.req_queue.lock();
293 self.start_request(id, method);
294 req_queue
295 .incoming
296 .register(id.clone(), (method.to_owned(), received_at));
297 }
298
299 pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
301 let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
302 self.respond_any_result(id, result);
303 }
304
305 fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
306 let req_id = id.clone();
307 let msg: Message = match (self.msg_kind, result) {
308 #[cfg(feature = "lsp")]
309 (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
310 #[cfg(feature = "lsp")]
311 (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
312 #[cfg(feature = "dap")]
313 (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
314 #[cfg(feature = "dap")]
315 (MessageKind::Dap, Err(e)) => {
316 dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
317 }
318 };
319
320 self.respond(req_id, msg);
321 }
322
323 pub fn respond(&self, id: RequestId, response: Message) {
325 let mut req_queue = self.req_queue.lock();
326 let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
327 return;
328 };
329
330 self.stop_request(&id, &method, received_at);
331
332 let Some(sender) = self.sender.upgrade() else {
333 log::warn!("failed to send response ({method}, {id}): connection closed");
334 return;
335 };
336 if let Err(res) = sender.lsp.send(response) {
337 log::warn!("failed to send response ({method}, {id}): {res:?}");
338 }
339 }
340}
341
342impl LspClient {
343 pub fn schedule<T: Serialize + 'static>(
345 &self,
346 req_id: RequestId,
347 resp: SchedulableResponse<T>,
348 ) -> ScheduledResult {
349 let resp = resp?;
350
351 use futures::future::MaybeDone::*;
352 match resp {
353 Done(output) => {
354 self.respond_result(req_id, output);
355 }
356 Future(fut) => {
357 let client = self.clone();
358 let req_id = req_id.clone();
359 self.handle.spawn(async move {
360 client.respond_result(req_id, fut.await);
361 });
362 }
363 Gone => {
364 log::warn!("response for request({req_id:?}) already taken");
365 }
366 };
367
368 Ok(Some(()))
369 }
370
371 pub(crate) fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
373 match resp {
374 Ok(Some(())) => {}
376 _ => self.respond_result(req_id, resp),
378 }
379 }
380}
381
382impl LspClient {
383 fn start_request(&self, req_id: &RequestId, method: &str) {
384 log::info!("handling {method} - ({req_id})");
385 }
386
387 fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
388 let duration = received_at.elapsed();
389 log::info!("handled {method} - ({req_id}) in {duration:0.2?}");
390 }
391
392 fn start_notification(&self, method: &str) {
393 log::info!("notifying {method}");
394 }
395
396 fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
397 let request_duration = received_at.elapsed();
398 if let Err(err) = result {
399 log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
400 } else {
401 log::info!("notify {method} succeeded in {request_duration:0.2?}");
402 }
403 }
404}
405
406type AsyncHandler<S, T, R> = fn(srv: &mut S, args: T) -> SchedulableResponse<R>;
407type RawHandler<S, T> = fn(srv: &mut S, req_id: RequestId, args: T) -> ScheduledResult;
408type BoxPureHandler<S, T> = Box<dyn Fn(&mut S, T) -> LspResult<()>>;
409type BoxHandler<S, T> = Box<dyn Fn(&mut S, &LspClient, RequestId, T) -> ScheduledResult>;
410type ExecuteCmdMap<S> = HashMap<&'static str, BoxHandler<S, Vec<JsonValue>>>;
411type RegularCmdMap<S> = HashMap<&'static str, BoxHandler<S, JsonValue>>;
412type NotifyCmdMap<S> = HashMap<&'static str, BoxPureHandler<S, JsonValue>>;
413type ResourceMap<S> = HashMap<ImmutPath, BoxHandler<S, Vec<JsonValue>>>;
414type MayInitBoxHandler<A, S, T> =
415 Box<dyn for<'a> Fn(ServiceState<'a, A, S>, &LspClient, T) -> anyhow::Result<()>>;
416type EventMap<A, S> = HashMap<core::any::TypeId, MayInitBoxHandler<A, S, Event>>;
417
418pub trait Initializer {
420 type I: for<'de> serde::Deserialize<'de>;
422 type S;
424
425 fn initialize(self, req: Self::I) -> (Self::S, AnySchedulableResponse);
429}
430
431#[cfg(feature = "lsp")]
433pub type LspBuilder<Args> = LsBuilder<LspMessage, Args>;
434#[cfg(feature = "dap")]
436pub type DapBuilder<Args> = LsBuilder<DapMessage, Args>;
437
438pub struct LsBuilder<M, Args: Initializer> {
440 pub args: Args,
442 pub client: LspClient,
444 pub events: EventMap<Args, Args::S>,
446 pub command_handlers: ExecuteCmdMap<Args::S>,
448 pub notif_handlers: NotifyCmdMap<Args::S>,
450 pub req_handlers: RegularCmdMap<Args::S>,
452 pub resource_handlers: ResourceMap<Args::S>,
454 _marker: std::marker::PhantomData<M>,
455}
456
457impl<M, Args: Initializer> LsBuilder<M, Args>
458where
459 Args::S: 'static,
460{
461 pub fn new(args: Args, client: LspClient) -> Self {
463 Self {
464 args,
465 client,
466 events: EventMap::new(),
467 command_handlers: ExecuteCmdMap::new(),
468 notif_handlers: NotifyCmdMap::new(),
469 req_handlers: RegularCmdMap::new(),
470 resource_handlers: ResourceMap::new(),
471 _marker: std::marker::PhantomData,
472 }
473 }
474
475 pub fn with_event<T: std::any::Any>(
477 mut self,
478 ins: &T,
479 handler: impl for<'a> Fn(ServiceState<'a, Args, Args::S>, T) -> anyhow::Result<()> + 'static,
480 ) -> Self {
481 self.events.insert(
482 ins.type_id(),
483 Box::new(move |s, _client, req| handler(s, *req.downcast().unwrap())),
484 );
485 self
486 }
487
488 pub fn with_resource_(
490 mut self,
491 path: ImmutPath,
492 handler: RawHandler<Args::S, Vec<JsonValue>>,
493 ) -> Self {
494 self.resource_handlers.insert(path, raw_to_boxed(handler));
495 self
496 }
497
498 pub fn with_resource(
500 mut self,
501 path: &'static str,
502 handler: fn(&mut Args::S, Vec<JsonValue>) -> AnySchedulableResponse,
503 ) -> Self {
504 self.resource_handlers.insert(
505 Path::new(path).into(),
506 Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
507 );
508 self
509 }
510
511 pub fn build(self) -> LsDriver<M, Args> {
513 LsDriver {
514 state: State::Uninitialized(Some(Box::new(self.args))),
515 events: self.events,
516 client: self.client,
517 commands: self.command_handlers,
518 notifications: self.notif_handlers,
519 requests: self.req_handlers,
520 resources: self.resource_handlers,
521 _marker: std::marker::PhantomData,
522 }
523 }
524}
525
526pub enum ServiceState<'a, A, S> {
528 Uninitialized(Option<&'a mut A>),
530 Ready(&'a mut S),
532}
533
534impl<A, S> ServiceState<'_, A, S> {
535 pub fn ready(&mut self) -> Option<&mut S> {
537 match self {
538 ServiceState::Ready(s) => Some(s),
539 _ => None,
540 }
541 }
542}
543
544#[derive(Debug, Clone, PartialEq, Eq)]
545#[allow(dead_code)]
546enum State<Args, S> {
547 Uninitialized(Option<Box<Args>>),
548 Initializing(S),
549 Ready(S),
550 ShuttingDown,
551}
552
553impl<Args, S> State<Args, S> {
554 fn opt(&self) -> Option<&S> {
555 match &self {
556 State::Ready(s) => Some(s),
557 _ => None,
558 }
559 }
560
561 fn opt_mut(&mut self) -> Option<&mut S> {
562 match self {
563 State::Ready(s) => Some(s),
564 _ => None,
565 }
566 }
567}
568
569pub struct LsDriver<M, Args: Initializer> {
571 state: State<Args, Args::S>,
573 pub client: LspClient,
575
576 pub events: EventMap<Args, Args::S>,
579 pub commands: ExecuteCmdMap<Args::S>,
581 pub notifications: NotifyCmdMap<Args::S>,
583 pub requests: RegularCmdMap<Args::S>,
585 pub resources: ResourceMap<Args::S>,
587 _marker: std::marker::PhantomData<M>,
588}
589
590impl<M, Args: Initializer> LsDriver<M, Args> {
591 pub fn state(&self) -> Option<&Args::S> {
593 self.state.opt()
594 }
595
596 pub fn state_mut(&mut self) -> Option<&mut Args::S> {
598 self.state.opt_mut()
599 }
600
601 pub fn ready(&mut self, params: Args::I) -> AnySchedulableResponse {
603 let args = match &mut self.state {
604 State::Uninitialized(args) => args,
605 _ => return just_result(Err(invalid_request("server is already initialized"))),
606 };
607
608 let args = args.take().expect("already initialized");
609 let (s, res) = args.initialize(params);
610 self.state = State::Ready(s);
611
612 res
613 }
614
615 pub fn get_resources(&mut self, req_id: RequestId, args: Vec<JsonValue>) -> ScheduledResult {
618 let s = self.state.opt_mut().ok_or_else(not_initialized)?;
619
620 let path =
621 from_value::<PathBuf>(args[0].clone()).map_err(|e| invalid_params(e.to_string()))?;
622
623 let Some(handler) = self.resources.get(path.as_path()) else {
624 log::error!("asked for unknown resource: {path:?}");
625 return Err(method_not_found());
626 };
627
628 handler(s, &self.client, req_id, args)
630 }
631}
632
633pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
635 Ok(futures::future::MaybeDone::Done(Ok(res)))
636}
637
638pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
640 Ok(futures::future::MaybeDone::Done(res))
641}
642
643pub fn just_future<T, E>(
645 fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
646) -> Result<ResponseFuture<Result<T, E>>, E> {
647 Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
648}
649
650pub fn invalid_params(msg: impl fmt::Display) -> ResponseError {
652 resp_err(ErrorCode::InvalidParams, msg)
653}
654
655pub fn internal_error(msg: impl fmt::Display) -> ResponseError {
657 resp_err(ErrorCode::InternalError, msg)
658}
659
660pub fn not_initialized() -> ResponseError {
662 resp_err(ErrorCode::ServerNotInitialized, "not initialized yet")
663}
664
665pub fn method_not_found() -> ResponseError {
667 resp_err(ErrorCode::MethodNotFound, "method not found")
668}
669
670pub fn invalid_request(msg: impl fmt::Display) -> ResponseError {
672 resp_err(ErrorCode::InvalidRequest, msg)
673}
674
675fn from_json<T: serde::de::DeserializeOwned>(json: JsonValue) -> LspResult<T> {
676 serde_json::from_value(json).map_err(invalid_request)
677}
678
679fn raw_to_boxed<S: 'static, T: 'static>(handler: RawHandler<S, T>) -> BoxHandler<S, T> {
680 Box::new(move |s, _client, req_id, req| handler(s, req_id, req))
681}
682
683fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError {
684 ResponseError {
685 code: code as i32,
686 message: msg.to_string(),
687 data: None,
688 }
689}