lsp_server_tokio/connection.rs
1//! Connection abstraction for bidirectional LSP message communication.
2//!
3//! This module provides the [`Connection`] type which is the primary entry point
4//! for LSP communication. It wraps a transport and provides:
5//!
6//! - Split sender/receiver halves for async message passing
7//! - Request queue for tracking pending requests in both directions
8//! - Lifecycle state management for LSP initialization and shutdown
9//!
10//! # Overview
11//!
12//! A [`Connection`] is constructed from any `AsyncRead + AsyncWrite` stream and
13//! provides a [`send()`](Connection::send) method for outbound messages and a public
14//! [`receiver_mut()`](Connection::receiver_mut) method for inbound messages. It also contains
15//! a [`RequestQueue`] for tracking pending requests.
16//!
17//! # Examples
18//!
19//! ## Basic usage with duplex streams
20//!
21//! ```
22//! use futures::{SinkExt, StreamExt};
23//! use lsp_server_tokio::{Connection, Message, Request};
24//!
25//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
26//! let (client_stream, server_stream) = tokio::io::duplex(4096);
27//!
28//! // Create connections
29//! let mut client: Connection<_, ()> = Connection::new(client_stream);
30//! let mut server: Connection<_, ()> = Connection::new(server_stream);
31//!
32//! // Send a request from client
33//! let request = Message::Request(Request::new(1, "textDocument/hover", None));
34//! client.send(request).unwrap();
35//!
36//! // Receive on server
37//! let msg = server.receiver_mut().next().await.unwrap().unwrap();
38//! assert!(msg.is_request());
39//! # });
40//! ```
41//!
42//! ## With request queue tracking
43//!
44//! ```
45//! use lsp_server_tokio::{Connection, RequestQueue};
46//!
47//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
48//! let (stream, _) = tokio::io::duplex(4096);
49//!
50//! // Create connection with typed request queue
51//! let mut conn: Connection<_, String> = Connection::new(stream);
52//!
53//! // Track an incoming request with a cancellation token
54//! use tokio_util::sync::CancellationToken;
55//! let token = CancellationToken::new();
56//! conn.request_queue.incoming.register(1.into(), "textDocument/hover".to_string(), token);
57//! assert!(conn.request_queue.incoming.is_pending(&1.into()));
58//! # });
59//! ```
60//!
61//! ## Stdio for LSP servers
62//!
63//! ```no_run
64//! use lsp_server_tokio::Connection;
65//!
66//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
67//! // Create a connection over stdin/stdout for typical LSP server usage
68//! let conn = Connection::stdio();
69//! // Use conn.send() to write responses, conn.receiver_mut() to read requests
70//! # });
71//! ```
72
73use std::io;
74use std::pin::Pin;
75use std::task::{Context, Poll};
76
77use futures::stream::SplitStream;
78use futures::{SinkExt, StreamExt};
79use pin_project_lite::pin_project;
80use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Stdin, Stdout};
81use tokio::sync::mpsc;
82use tokio_util::sync::CancellationToken;
83
84use crate::client_sender::ResponseMap;
85use crate::lifecycle::{ExitCode, LifecycleState, ProtocolError};
86use crate::ClientSender;
87use crate::{transport, Message, RequestQueue, Transport};
88
89/// Spawns a background task that drains messages from the channel into the transport sink.
90///
91/// Returns the sender half of the channel and a cancellation token that is
92/// triggered when the drain task exits (e.g., because the transport is broken).
93fn spawn_drain_task<T>(
94 mut sender: futures::stream::SplitSink<Transport<T>, Message>,
95) -> (mpsc::UnboundedSender<Message>, CancellationToken)
96where
97 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
98{
99 let (tx, mut rx) = mpsc::unbounded_channel();
100 let drain_alive = CancellationToken::new();
101 let drain_guard = drain_alive.clone().drop_guard();
102
103 tokio::spawn(async move {
104 let _guard = drain_guard;
105 while let Some(message) = rx.recv().await {
106 if sender.send(message).await.is_err() {
107 return;
108 }
109 }
110 });
111
112 (tx, drain_alive)
113}
114
115/// A [`Connection`] backed by [`StdioTransport`].
116///
117/// This alias is useful when building stdio-based servers that want to use
118/// custom metadata types for request tracking without repeating the full
119/// `Connection<StdioTransport, I>` generic.
120///
121/// # Examples
122///
123/// ```no_run
124/// use lsp_server_tokio::{StdioTransport, Connection, StdioConnection};
125///
126/// let conn: StdioConnection<String> = Connection::new(StdioTransport::new());
127/// ```
// NOTE(review): `StdioTransport` is not among the imports visible at the top of
// this file — presumably it is defined later in this module; confirm.
128pub type StdioConnection<I = ()> = Connection<StdioTransport, I>;
129
130pin_project! {
131 /// The receiver half of an LSP connection.
132 ///
133 /// An opaque stream of inbound LSP messages. Implements
134 /// [`Stream<Item = Result<Message, io::Error>>`](futures::Stream) so you can
135 /// use it with [`StreamExt::next()`](futures::StreamExt::next).
136 ///
137 /// Obtain one via [`Connection::into_receiver`] or [`Connection::receiver_mut`].
138 ///
139 /// # Example
140 ///
141 /// ```
142 /// use futures::StreamExt;
143 /// use lsp_server_tokio::Connection;
144 ///
145 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
146 /// let (stream, _) = tokio::io::duplex(4096);
147 /// let mut conn: Connection<_, ()> = Connection::new(stream);
148 ///
149 /// // Poll the next inbound message
150 /// // let msg = conn.receiver_mut().next().await;
151 /// # });
152 /// ```
153 pub struct MessageStream<T> {
 // Structurally pinned so that `project()` hands out a pinned reference to
 // the inner stream, which the `Stream` impl polls directly.
154 #[pin]
155 inner: SplitStream<Transport<T>>,
156 }
157}
158
159impl<T> MessageStream<T> {
160 fn new(inner: SplitStream<Transport<T>>) -> Self {
161 Self { inner }
162 }
163}
164
165impl<T: AsyncRead + AsyncWrite> futures::Stream for MessageStream<T> {
166 type Item = Result<Message, io::Error>;
167
168 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169 self.project().inner.poll_next(cx)
170 }
171}
172
173/// A bidirectional LSP connection with split sender/receiver and request tracking.
174///
175/// `Connection` is the primary type for LSP communication. It provides:
176///
177/// - [`send()`](Connection::send): Send outbound messages (non-blocking, channel-based)
178/// - [`receiver_mut()`](Connection::receiver_mut): A stream for receiving inbound messages
179/// - [`request_queue`](Connection::request_queue): Tracking for pending requests
180///
181/// At construction, a background drain task is spawned to forward messages from
182/// an internal channel to the underlying transport. This means [`send()`](Connection::send)
183/// never blocks on I/O — it only enqueues the message.
184///
185/// # Type Parameters
186///
187/// - `T`: The underlying I/O stream type (`AsyncRead + AsyncWrite`)
188/// - `I`: Metadata type for incoming requests (default: `()`)
189///
190/// # Examples
191///
192/// ## Basic construction
193///
194/// ```
195/// use lsp_server_tokio::Connection;
196///
197/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
198/// let (stream, _) = tokio::io::duplex(4096);
199///
200/// // Simple connection with unit metadata types
201/// let conn: Connection<_, ()> = Connection::new(stream);
202/// # });
203/// ```
204///
205/// ## With custom metadata types
206///
207/// ```
208/// use lsp_server_tokio::Connection;
209///
210/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
211/// let (stream, _) = tokio::io::duplex(4096);
212///
213/// // Connection whose incoming-request metadata is a `String` (e.g. the method name)
214/// let conn: Connection<_, String> = Connection::new(stream);
215/// # });
216/// ```
217pub struct Connection<T, I = ()>
218where
219 T: AsyncRead + AsyncWrite,
220{
221 /// Channel for sending outbound messages to the background drain task.
222 ///
223 /// The drain task forwards messages to the underlying transport sink.
224 /// This channel is always available — no `Option`, no "taken" state.
 /// Clones of this sender are handed to every [`ClientSender`] created by
 /// [`client_sender()`](Connection::client_sender).
225 outbound_tx: mpsc::UnboundedSender<Message>,
226
227 /// The receiver half for inbound messages.
228 ///
229 /// Use [`receiver_mut()`](Connection::receiver_mut) to access the stream,
230 /// or [`into_receiver()`](Connection::into_receiver) to take ownership.
231 receiver: MessageStream<T>,
232
233 /// Request queue for tracking pending incoming and outgoing requests.
234 ///
235 /// - `request_queue.incoming`: Track requests you've received and need to respond to
236 /// - `request_queue.outgoing`: Track requests you've sent and are awaiting responses for
237 pub request_queue: RequestQueue<I>,
238
239 /// The current lifecycle state of the connection.
 ///
 /// Starts at `LifecycleState::default()`; set to `Initializing` by
 /// `initialize_start` and to `Running` by `initialize_finish`.
240 lifecycle_state: LifecycleState,
241
242 /// Cancellation token that is triggered when shutdown is requested.
 ///
 /// Each incoming request registered by `route()` receives a child of this token.
243 shutdown_token: CancellationToken,
244
245 /// Shared response routing state used by [`ClientSender`], if enabled.
 ///
 /// `None` until the first call to [`client_sender()`](Connection::client_sender).
246 response_map: Option<ResponseMap>,
247
248 /// Cancellation token that is triggered when the background drain task exits.
249 ///
250 /// The drain task holds a [`DropGuard`] for this token. When the task exits
251 /// (e.g., because the transport is broken), the token is cancelled, which
252 /// allows [`ClientSender::request`] to detect disconnection instead of
253 /// hanging forever waiting for a response that will never arrive.
254 drain_alive: CancellationToken,
255}
256
257impl<T, I> Connection<T, I>
258where
259 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
260{
261 /// Creates a new connection from an async I/O stream.
262 ///
263 /// This constructor wraps the stream with [`crate::LspCodec`] for Content-Length
264 /// framing, splits it into sender/receiver halves, and spawns a background
265 /// drain task for outbound messages.
266 ///
267 /// # Arguments
268 ///
269 /// * `io` - Any async I/O stream implementing `AsyncRead + AsyncWrite + Unpin + Send + 'static`
270 ///
271 /// # Example
272 ///
273 /// ```
274 /// use lsp_server_tokio::Connection;
275 ///
276 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
277 /// let (stream, _) = tokio::io::duplex(4096);
278 /// let conn: Connection<_, ()> = Connection::new(stream);
279 /// # });
280 /// ```
281 pub fn new(io: T) -> Self {
282 let transport = transport(io);
283 Self::from_transport(transport)
284 }
285
286 /// Creates a new connection from an existing transport.
287 ///
288 /// This is useful when you already have a [`Transport`] and want to
289 /// upgrade it to a full `Connection` with request tracking.
290 ///
291 /// # Arguments
292 ///
293 /// * `transport` - An existing LSP transport
294 ///
295 /// # Example
296 ///
297 /// ```
298 /// use lsp_server_tokio::{transport, Connection};
299 ///
300 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
301 /// let (stream, _) = tokio::io::duplex(4096);
302 /// let transport = transport(stream);
303 /// let conn: Connection<_, ()> = Connection::from_transport(transport);
304 /// # });
305 /// ```
306 pub fn from_transport(transport: Transport<T>) -> Self {
307 let (sender, receiver) = transport.split();
308 let (tx, drain_alive) = spawn_drain_task(sender);
309
310 Self {
311 outbound_tx: tx,
312 receiver: MessageStream::new(receiver),
313 request_queue: RequestQueue::new(),
314 lifecycle_state: LifecycleState::default(),
315 shutdown_token: CancellationToken::new(),
316 response_map: None,
317 drain_alive,
318 }
319 }
320
321 /// Creates a new connection with a pre-existing request queue.
322 ///
323 /// This constructor allows you to provide your own [`RequestQueue`], which
324 /// is useful for:
325 /// - Testing with pre-populated request state
326 /// - Migrating state between connections
327 /// - Using custom metadata types
328 ///
329 /// # Arguments
330 ///
331 /// * `io` - Any async I/O stream implementing `AsyncRead + AsyncWrite`
332 /// * `request_queue` - A pre-existing request queue
333 ///
334 /// # Example
335 ///
336 /// ```
337 /// use lsp_server_tokio::{Connection, RequestQueue};
338 ///
339 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
340 /// let (stream, _) = tokio::io::duplex(4096);
341 ///
342 /// // Create a queue with some pre-registered requests
343 /// use tokio_util::sync::CancellationToken;
344 /// let mut queue: RequestQueue<u32> = RequestQueue::new();
345 /// let token = CancellationToken::new();
346 /// queue.incoming.register(1.into(), 100, token);
347 ///
348 /// let conn = Connection::with_request_queue(stream, queue);
349 /// assert!(conn.request_queue.incoming.is_pending(&1.into()));
350 /// # });
351 /// ```
352 pub fn with_request_queue(io: T, request_queue: RequestQueue<I>) -> Self {
353 let transport = transport(io);
354 let (sender, receiver) = transport.split();
355 let (tx, drain_alive) = spawn_drain_task(sender);
356
357 Self {
358 outbound_tx: tx,
359 receiver: MessageStream::new(receiver),
360 request_queue,
361 lifecycle_state: LifecycleState::default(),
362 shutdown_token: CancellationToken::new(),
363 response_map: None,
364 drain_alive,
365 }
366 }
367
368 /// Returns the current lifecycle state.
369 #[must_use]
370 pub fn lifecycle_state(&self) -> LifecycleState {
371 self.lifecycle_state
372 }
373
374 /// Returns a token that is cancelled when shutdown is requested.
375 ///
376 /// Use this to gracefully stop background tasks when the server is
377 /// shutting down.
378 ///
379 /// # Example
380 ///
381 /// ```no_run
382 /// use lsp_server_tokio::Connection;
383 /// use tokio_util::sync::CancellationToken;
384 ///
385 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
386 /// let (stream, _) = tokio::io::duplex(4096);
387 /// let conn: Connection<_, ()> = Connection::new(stream);
388 ///
389 /// let token = conn.shutdown_token();
390 /// tokio::spawn(async move {
391 /// loop {
392 /// tokio::select! {
393 /// _ = token.cancelled() => {
394 /// // Clean up and exit
395 /// break;
396 /// }
397 /// // ... other work ...
398 /// }
399 /// }
400 /// });
401 /// # });
402 /// ```
403 #[must_use]
404 pub fn shutdown_token(&self) -> CancellationToken {
405 self.shutdown_token.clone()
406 }
407
408 /// Returns a future that completes when shutdown is requested.
409 ///
410 /// This is equivalent to `self.shutdown_token().cancelled()` but more
411 /// convenient for simple use cases.
412 pub fn on_shutdown(&self) -> impl std::future::Future<Output = ()> + '_ {
413 self.shutdown_token.cancelled()
414 }
415}
416
417// Methods that only use the channel (no transport bounds needed beyond the struct constraint)
418impl<T, I> Connection<T, I>
419where
420 T: AsyncRead + AsyncWrite,
421{
422 /// Returns a mutable reference to the inbound message stream.
423 ///
424 /// Use with [`StreamExt::next()`](futures::StreamExt::next) to receive
425 /// messages from the peer.
426 ///
427 /// # Example
428 ///
429 /// ```
430 /// use futures::StreamExt;
431 /// use lsp_server_tokio::Connection;
432 ///
433 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
434 /// let (stream, _) = tokio::io::duplex(4096);
435 /// let mut conn: Connection<_, ()> = Connection::new(stream);
436 ///
437 /// // Poll the next inbound message
438 /// // let msg = conn.receiver_mut().next().await;
439 /// # });
440 /// ```
441 pub fn receiver_mut(&mut self) -> &mut MessageStream<T> {
442 &mut self.receiver
443 }
444
445 /// Consumes the connection and returns the inbound message stream.
446 ///
447 /// This is useful when you need to move the stream into a separate task.
448 ///
449 /// # Example
450 ///
451 /// ```
452 /// use futures::StreamExt;
453 /// use lsp_server_tokio::Connection;
454 ///
455 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
456 /// let (stream, _) = tokio::io::duplex(4096);
457 /// let conn: Connection<_, ()> = Connection::new(stream);
458 ///
459 /// let mut receiver = conn.into_receiver();
460 /// // tokio::spawn(async move { while let Some(msg) = receiver.next().await { ... } });
461 /// # });
462 /// ```
463 #[must_use]
464 pub fn into_receiver(self) -> MessageStream<T> {
465 self.receiver
466 }
467
468 /// Sends a message through the connection.
469 ///
470 /// Messages are forwarded to the background drain task which writes them
471 /// to the underlying transport. This method never blocks on I/O — it only
472 /// enqueues the message on an unbounded channel.
473 ///
474 /// # Errors
475 ///
476 /// Returns an error if the background drain task has exited (connection closed).
477 ///
478 /// # Example
479 ///
480 /// ```
481 /// use lsp_server_tokio::{Connection, Message, Request};
482 ///
483 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
484 /// let (client_stream, _server_stream) = tokio::io::duplex(4096);
485 /// let conn: Connection<_, ()> = Connection::new(client_stream);
486 ///
487 /// let request = Message::Request(Request::new(1, "test", None));
488 /// conn.send(request).unwrap();
489 /// # });
490 /// ```
491 pub fn send(&self, message: Message) -> Result<(), io::Error> {
492 self.outbound_tx
493 .send(message)
494 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "connection closed"))
495 }
496}
497
498// Message routing methods
499impl<T, I> Connection<T, I>
500where
501 T: AsyncRead + AsyncWrite,
502 I: Default,
503{
504 /// Routes an incoming message, delivering responses to pending outgoing requests.
505 ///
506 /// Call this method for each message received from the transport to classify it
507 /// and automatically deliver responses to their corresponding outgoing request receivers.
508 ///
509 /// For [`IncomingMessage::Request`](crate::IncomingMessage::Request) variants, the request
510 /// is automatically registered with a [`CancellationToken`] that is:
511 /// - Cancelled when `$/cancelRequest` is received for this request ID
512 /// - Cancelled when the connection shuts down
513 ///
514 /// # Returns
515 ///
516 /// - [`IncomingMessage::Request`](crate::IncomingMessage::Request) - Handle the request and send a response
517 /// - [`IncomingMessage::Notification`](crate::IncomingMessage::Notification) - Handle the notification
518 /// - [`IncomingMessage::ResponseRouted`](crate::IncomingMessage::ResponseRouted) - Response was delivered to awaiting receiver
519 /// - [`IncomingMessage::ResponseUnknown`](crate::IncomingMessage::ResponseUnknown) - No pending request for this response ID
520 ///
521 /// # Example
522 ///
523 /// ```no_run
524 /// use lsp_server_tokio::{Connection, Message, IncomingMessage, Response};
525 /// use futures::StreamExt;
526 ///
527 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
528 /// let (stream, _) = tokio::io::duplex(4096);
529 /// let mut conn: Connection<_, ()> = Connection::new(stream);
530 ///
531 /// while let Some(Ok(msg)) = conn.receiver_mut().next().await {
532 /// match conn.route(msg) {
533 /// IncomingMessage::Request(req, token) => {
534 /// // Handle request with cooperative cancellation
535 /// println!("Request: {}", req.method);
536 /// // Use token.cancelled().await in select! for cancellation
537 /// // After handling, call conn.request_queue.incoming.complete(&req.id)
538 /// }
539 /// IncomingMessage::Notification(notif) => {
540 /// // Handle notification
541 /// println!("Notification: {}", notif.method);
542 /// }
543 /// IncomingMessage::CancelHandled => {
544 /// // `$/cancelRequest` was handled by route()
545 /// }
546 /// IncomingMessage::ResponseRouted => {
547 /// // Response delivered to awaiting task
548 /// }
549 /// IncomingMessage::ResponseUnknown(resp) => {
550 /// // Log unexpected response
551 /// eprintln!("Unknown response: {:?}", resp.id);
552 /// }
553 /// _ => {}
554 /// }
555 /// }
556 /// # });
557 /// ```
558 pub fn route(&mut self, message: Message) -> crate::IncomingMessage {
559 match message {
560 Message::Request(req) => {
561 let token = self.shutdown_token.child_token();
562 self.request_queue
563 .incoming
564 .register(req.id.clone(), I::default(), token.clone());
565 crate::IncomingMessage::Request(req, token)
566 }
567 Message::Notification(notif) => {
568 if notif.method == crate::request_queue::CANCEL_REQUEST_METHOD {
569 if let Some(id) = crate::parse_cancel_params(¬if.params) {
570 let _ = self.request_queue.incoming.cancel(&id);
571 crate::IncomingMessage::CancelHandled
572 } else {
573 // Malformed cancel — let consumer handle it
574 crate::IncomingMessage::Notification(notif)
575 }
576 } else {
577 crate::IncomingMessage::Notification(notif)
578 }
579 }
580 Message::Response(resp) => {
581 if let Some(id) = resp.id.clone() {
582 // Try ClientSender's response map first (avoids cloning unless needed)
583 if let Some(response_map) = self.response_map.as_ref() {
584 if response_map.contains(&id) && response_map.deliver(&id, resp.clone()) {
585 return crate::IncomingMessage::ResponseRouted;
586 }
587 }
588
589 // Check if there's a pending request for this response
590 if self.request_queue.outgoing.is_pending(&id) {
591 // Complete the request - this sends the response to the awaiting receiver
592 self.request_queue.outgoing.complete(&id, resp);
593 crate::IncomingMessage::ResponseRouted
594 } else {
595 // No pending request for this ID
596 crate::IncomingMessage::ResponseUnknown(resp)
597 }
598 } else {
599 // Response with null ID (parse error response)
600 crate::IncomingMessage::ResponseUnknown(resp)
601 }
602 }
603 }
604 }
605
606 /// Cancels an incoming request by request ID.
607 ///
608 /// This is a convenience method that cancels the [`CancellationToken`] for a registered
609 /// incoming request. Use this when you receive a `$/cancelRequest` notification.
610 ///
611 /// # Arguments
612 ///
613 /// * `id` - The request ID to cancel
614 ///
615 /// # Returns
616 ///
617 /// `true` if the request was found and cancelled, `false` if not found.
618 ///
619 /// # Example
620 ///
621 /// ```
622 /// use lsp_server_tokio::Connection;
623 ///
624 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
625 /// let (stream, _) = tokio::io::duplex(4096);
626 /// let mut conn: Connection<_, ()> = Connection::new(stream);
627 ///
628 /// // After receiving $/cancelRequest for ID 42:
629 /// let was_cancelled = conn.cancel_incoming(42);
630 /// # });
631 /// ```
632 pub fn cancel_incoming(&mut self, id: impl Into<crate::RequestId>) -> bool {
633 self.request_queue.incoming.cancel(&id.into())
634 }
635
636 /// Returns a cloneable [`ClientSender`] for sending requests, responses,
637 /// and notifications.
638 ///
639 /// The first call initializes the response routing infrastructure.
640 /// Subsequent calls return a new handle to the same underlying channel.
641 ///
642 /// # Example
643 ///
644 /// ```
645 /// use lsp_server_tokio::{Connection, Response};
646 ///
647 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
648 /// let (stream, _) = tokio::io::duplex(4096);
649 /// let mut conn: Connection<_, ()> = Connection::new(stream);
650 ///
651 /// let sender1 = conn.client_sender();
652 /// let sender2 = conn.client_sender(); // returns another handle
653 /// # });
654 /// ```
655 #[must_use]
656 pub fn client_sender(&mut self) -> ClientSender {
657 if let Some(ref response_map) = self.response_map {
658 return ClientSender::new(
659 self.outbound_tx.clone(),
660 response_map.clone(),
661 self.drain_alive.clone(),
662 );
663 }
664
665 let response_map = ResponseMap::new();
666 self.response_map = Some(response_map.clone());
667 ClientSender::new(
668 self.outbound_tx.clone(),
669 response_map,
670 self.drain_alive.clone(),
671 )
672 }
673}
674
675// Outgoing request cancellation methods
676impl<T, I> Connection<T, I>
677where
678 T: AsyncRead + AsyncWrite,
679{
680 /// Cancels an outgoing request by sending $/cancelRequest and removing it from the queue.
681 ///
682 /// This method is for cancelling requests that **this connection sent** (outgoing requests).
683 /// For incoming request cancellation, use [`route()`](Self::route) which handles
684 /// `$/cancelRequest` notifications automatically.
685 ///
686 /// # Behavior
687 ///
688 /// 1. Sends a `$/cancelRequest` notification with the given request ID
689 /// 2. Removes the request from the outgoing queue (the awaiting receiver will get `RecvError`)
690 ///
691 /// # Returns
692 ///
693 /// - `Ok(true)` if the request was pending and cancelled
694 /// - `Ok(false)` if the request was not found in the queue
695 /// - `Err` if sending the notification failed
696 ///
697 /// # Errors
698 ///
699 /// Returns an error if the cancellation notification cannot be sent to the peer.
700 ///
701 /// # Example
702 ///
703 /// ```
704 /// use lsp_server_tokio::{Connection, Response};
705 /// use futures::SinkExt;
706 ///
707 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
708 /// let (client_stream, server_stream) = tokio::io::duplex(4096);
709 /// let mut conn: Connection<_, ()> = Connection::new(client_stream);
710 ///
711 /// // Register an outgoing request
712 /// let rx = conn.request_queue.outgoing.register(42.into());
713 ///
714 /// // Cancel it
715 /// let was_pending = conn.cancel(42).unwrap();
716 /// assert!(was_pending);
717 ///
718 /// // The receiver will get an error
719 /// assert!(rx.await.is_err());
720 /// # });
721 /// ```
722 pub fn cancel(&mut self, id: impl Into<crate::RequestId>) -> Result<bool, std::io::Error> {
723 use crate::request_queue::CANCEL_REQUEST_METHOD;
724
725 let id = id.into();
726
727 // Build the $/cancelRequest notification
728 let notification =
729 crate::Notification::new(CANCEL_REQUEST_METHOD, Some(serde_json::json!({"id": id})));
730
731 // Send the notification
732 self.send(Message::Notification(notification))?;
733
734 // Cancel in the queue (returns true if was pending)
735 Ok(self.request_queue.outgoing.cancel(&id))
736 }
737}
738
739// Lifecycle management methods
740impl<T, I> Connection<T, I>
741where
742 T: AsyncRead + AsyncWrite,
743{
744 /// Waits for the initialize request from the client.
745 ///
746 /// This method blocks until an initialize request is received, rejecting
747 /// any other requests with `ServerNotInitialized` error and dropping
748 /// notifications (except exit which disconnects).
749 ///
750 /// Returns the request ID and params for the initialize request.
751 /// You must call [`initialize_finish()`](Self::initialize_finish) with the
752 /// same ID to complete the handshake.
753 ///
754 /// # Errors
755 ///
756 /// - [`ProtocolError::Disconnected`] if the connection is closed or exit notification received
757 /// - [`ProtocolError::Io`] if an I/O error occurs
758 ///
759 /// # Example
760 ///
761 /// ```no_run
762 /// use lsp_server_tokio::Connection;
763 ///
764 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
765 /// let (stream, _) = tokio::io::duplex(4096);
766 /// let mut conn: Connection<_, ()> = Connection::new(stream);
767 ///
768 /// let (id, params) = conn.initialize_start().await.unwrap();
769 /// // Process params, build the full InitializeResult payload...
770 /// let capabilities = serde_json::json!({"textDocumentSync": 1});
771 /// let result = serde_json::json!({
772 /// "capabilities": capabilities,
773 /// "serverInfo": {"name": "my-server", "version": "0.1.0"}
774 /// });
775 /// conn.initialize_finish(id, result).await.unwrap();
776 /// # });
777 /// ```
778 pub async fn initialize_start(
779 &mut self,
780 ) -> Result<(crate::RequestId, serde_json::Value), ProtocolError> {
781 loop {
782 match self.receiver_mut().next().await {
783 Some(Ok(Message::Request(req))) => {
784 if req.method == "initialize" {
785 self.lifecycle_state = LifecycleState::Initializing;
786 return Ok((req.id, req.params.unwrap_or(serde_json::Value::Null)));
787 }
788
789 // Reject non-initialize requests with ServerNotInitialized
790 let error = crate::ResponseError::new(
791 crate::ErrorCode::ServerNotInitialized,
792 "Server not yet initialized",
793 );
794 let response = Message::Response(crate::Response::err(req.id, error));
795 if let Err(e) = self.send(response) {
796 return Err(ProtocolError::Io(e));
797 }
798 // Continue waiting for initialize
799 }
800 Some(Ok(Message::Notification(notif))) => {
801 if notif.method == "exit" {
802 return Err(ProtocolError::Disconnected);
803 }
804 // Drop other notifications silently
805 }
806 Some(Ok(Message::Response(_))) => {
807 // Unexpected response, ignore
808 }
809 Some(Err(e)) => {
810 return Err(ProtocolError::Io(e));
811 }
812 None => {
813 return Err(ProtocolError::Disconnected);
814 }
815 }
816 }
817 }
818
819 /// Completes the initialization handshake.
820 ///
821 /// Sends the `InitializeResult` response and waits for the initialized
822 /// notification from the client. After this returns `Ok(())`, the
823 /// connection is in Running state and ready for normal operation.
824 ///
825 /// # Arguments
826 ///
827 /// * `id` - The request ID from [`initialize_start()`](Self::initialize_start)
828 /// * `initialize_result` - The full `InitializeResult` value as JSON, sent
829 /// verbatim as the response result. Must include `capabilities` and may
830 /// include `serverInfo` or any other fields defined by the LSP spec.
831 ///
832 /// # Errors
833 ///
834 /// - [`ProtocolError::Disconnected`] if the connection is closed
835 /// - [`ProtocolError::InitializeTimeout`] if the client does not send
836 /// `initialized` within 60 seconds
837 /// - [`ProtocolError::Io`] if an I/O error occurs
838 ///
839 /// # Example
840 ///
841 /// ```no_run
842 /// use lsp_server_tokio::Connection;
843 ///
844 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
845 /// let (stream, _) = tokio::io::duplex(4096);
846 /// let mut conn: Connection<_, ()> = Connection::new(stream);
847 ///
848 /// let (id, _params) = conn.initialize_start().await.unwrap();
849 /// let result = serde_json::json!({
850 /// "capabilities": {"textDocumentSync": 1},
851 /// "serverInfo": {"name": "my-server", "version": "0.1.0"}
852 /// });
853 /// conn.initialize_finish(id, result).await.unwrap();
854 ///
855 /// assert!(conn.is_running());
856 /// # });
857 /// ```
858 pub async fn initialize_finish(
859 &mut self,
860 id: crate::RequestId,
861 initialize_result: serde_json::Value,
862 ) -> Result<(), ProtocolError> {
863 use std::time::Duration;
864
865 // Send the response with the full InitializeResult verbatim
866 let response = Message::Response(crate::Response::ok(id, initialize_result));
867 if let Err(e) = self.send(response) {
868 return Err(ProtocolError::Io(e));
869 }
870
871 tokio::time::timeout(Duration::from_mins(1), async {
872 loop {
873 match self.receiver_mut().next().await {
874 Some(Ok(Message::Notification(notif))) => {
875 if notif.method == "initialized" {
876 self.lifecycle_state = LifecycleState::Running;
877 return Ok(());
878 }
879 // Drop other notifications silently
880 }
881 Some(Ok(Message::Request(req))) => {
882 // Still initializing, reject with ServerNotInitialized
883 let error = crate::ResponseError::new(
884 crate::ErrorCode::ServerNotInitialized,
885 "Server not yet initialized",
886 );
887 let response = Message::Response(crate::Response::err(req.id, error));
888 if let Err(e) = self.send(response) {
889 return Err(ProtocolError::Io(e));
890 }
891 }
892 Some(Ok(Message::Response(_))) => {
893 // Ignore unexpected responses
894 }
895 Some(Err(e)) => {
896 return Err(ProtocolError::Io(e));
897 }
898 None => {
899 return Err(ProtocolError::Disconnected);
900 }
901 }
902 }
903 })
904 .await
905 .map_err(|_| ProtocolError::InitializeTimeout)?
906 }
907
908 /// Performs complete LSP initialization handshake.
909 ///
910 /// This is a convenience method that calls [`initialize_start()`](Self::initialize_start)
911 /// followed by [`initialize_finish()`](Self::initialize_finish).
912 /// Returns the initialize params from the client.
913 ///
914 /// Unlike [`initialize_finish()`](Self::initialize_finish) which takes a full
915 /// `InitializeResult`, this method takes just the server capabilities and
916 /// wraps them in `{"capabilities": ...}` automatically.
917 ///
918 /// # Arguments
919 ///
920 /// * `server_capabilities` - The server's capabilities as JSON (will be
921 /// wrapped in `{"capabilities": server_capabilities}`)
922 ///
923 /// # Errors
924 ///
925 /// - [`ProtocolError::Disconnected`] if the connection is closed
926 /// - [`ProtocolError::Io`] if an I/O error occurs
927 ///
928 /// # Example
929 ///
930 /// ```no_run
931 /// use lsp_server_tokio::Connection;
932 ///
933 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
934 /// let (stream, _) = tokio::io::duplex(4096);
935 /// let mut conn: Connection<_, ()> = Connection::new(stream);
936 ///
937 /// let capabilities = serde_json::json!({"textDocumentSync": 1});
938 /// let client_params = conn.initialize(capabilities).await.unwrap();
939 /// println!("Client capabilities: {}", client_params);
940 /// # });
941 /// ```
942 pub async fn initialize(
943 &mut self,
944 server_capabilities: serde_json::Value,
945 ) -> Result<serde_json::Value, ProtocolError> {
946 let (id, params) = self.initialize_start().await?;
947 let initialize_result = serde_json::json!({
948 "capabilities": server_capabilities
949 });
950 self.initialize_finish(id, initialize_result).await?;
951 Ok(params)
952 }
953
954 /// Handles a shutdown request.
955 ///
956 /// Transitions to `ShuttingDown` state, cancels the shutdown token,
957 /// and sends a null response. After this, only exit notification
958 /// should be received.
959 ///
960 /// # Arguments
961 ///
962 /// * `id` - The request ID of the shutdown request
963 ///
964 /// # Errors
965 ///
966 /// - [`ProtocolError::Io`] if sending the response fails
967 ///
968 /// # Example
969 ///
970 /// ```no_run
971 /// use lsp_server_tokio::{Connection, Message};
972 /// use futures::StreamExt;
973 ///
974 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
975 /// let (stream, _) = tokio::io::duplex(4096);
976 /// let mut conn: Connection<_, ()> = Connection::new(stream);
977 ///
978 /// // ... after initialization ...
979 /// // When shutdown request is received:
980 /// // if let Message::Request(req) = msg && req.method == "shutdown" {
981 /// // conn.handle_shutdown(req.id).unwrap();
982 /// // assert!(conn.is_shutting_down());
983 /// // }
984 /// # });
985 /// ```
986 pub fn handle_shutdown(&mut self, id: crate::RequestId) -> Result<(), ProtocolError> {
987 // Cancel shutdown token first to notify waiting tasks
988 self.shutdown_token.cancel();
989
990 // Transition state
991 self.lifecycle_state = LifecycleState::ShuttingDown;
992
993 // Send null response
994 let response = Message::Response(crate::Response::ok(id, serde_json::Value::Null));
995 if let Err(e) = self.send(response) {
996 return Err(ProtocolError::Io(e));
997 }
998
999 Ok(())
1000 }
1001
1002 /// Handles the exit notification.
1003 ///
1004 /// Returns [`ExitCode::Success`] (exit code 0) if shutdown was received first,
1005 /// or [`ExitCode::Error`] (exit code 1) if exit came without shutdown.
1006 ///
1007 /// # Example
1008 ///
1009 /// ```
1010 /// use lsp_server_tokio::{Connection, ExitCode, LifecycleState};
1011 ///
1012 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
1013 /// let (stream, _) = tokio::io::duplex(4096);
1014 /// let mut conn: Connection<_, ()> = Connection::new(stream);
1015 ///
1016 /// // Exit without shutdown - dirty exit
1017 /// let code = conn.handle_exit();
1018 /// assert_eq!(code, ExitCode::Error);
1019 /// # });
1020 /// ```
1021 pub fn handle_exit(&mut self) -> ExitCode {
1022 let was_shutting_down = self.lifecycle_state == LifecycleState::ShuttingDown;
1023 self.lifecycle_state = LifecycleState::Exited;
1024
1025 if was_shutting_down {
1026 ExitCode::Success
1027 } else {
1028 ExitCode::Error
1029 }
1030 }
1031
1032 /// Returns true if the connection is in Running state.
1033 ///
1034 /// The connection is in Running state after successful initialization
1035 /// and before shutdown is requested.
1036 #[must_use]
1037 pub fn is_running(&self) -> bool {
1038 self.lifecycle_state == LifecycleState::Running
1039 }
1040
1041 /// Returns true if shutdown has been requested.
1042 ///
1043 /// After shutdown, the server should only expect the exit notification.
1044 #[must_use]
1045 pub fn is_shutting_down(&self) -> bool {
1046 self.lifecycle_state == LifecycleState::ShuttingDown
1047 }
1048}
1049
1050pin_project! {
1051 /// A combined stdin/stdout stream for LSP server communication.
1052 ///
1053 /// This type wraps tokio's [`Stdin`] and [`Stdout`] into a single type that
1054 /// implements both [`AsyncRead`] and [`AsyncWrite`], suitable for use with
1055 /// [`Connection`].
1056 ///
1057 /// This is typically used internally by [`Connection::stdio()`] and doesn't
1058 /// need to be constructed directly.
1059 pub struct StdioTransport {
1060 #[pin]
1061 stdin: Stdin,
1062 #[pin]
1063 stdout: Stdout,
1064 }
1065}
1066
1067impl StdioTransport {
1068 /// Creates a new stdio transport from stdin and stdout.
1069 #[must_use]
1070 pub fn new() -> Self {
1071 Self {
1072 stdin: tokio::io::stdin(),
1073 stdout: tokio::io::stdout(),
1074 }
1075 }
1076}
1077
1078impl Default for StdioTransport {
1079 fn default() -> Self {
1080 Self::new()
1081 }
1082}
1083
1084impl AsyncRead for StdioTransport {
1085 fn poll_read(
1086 self: Pin<&mut Self>,
1087 cx: &mut Context<'_>,
1088 buf: &mut ReadBuf<'_>,
1089 ) -> Poll<io::Result<()>> {
1090 self.project().stdin.poll_read(cx, buf)
1091 }
1092}
1093
1094impl AsyncWrite for StdioTransport {
1095 fn poll_write(
1096 self: Pin<&mut Self>,
1097 cx: &mut Context<'_>,
1098 buf: &[u8],
1099 ) -> Poll<io::Result<usize>> {
1100 self.project().stdout.poll_write(cx, buf)
1101 }
1102
1103 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1104 self.project().stdout.poll_flush(cx)
1105 }
1106
1107 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1108 self.project().stdout.poll_shutdown(cx)
1109 }
1110}
1111
1112impl Connection<StdioTransport, ()> {
1113 /// Creates a connection using stdin for reading and stdout for writing.
1114 ///
1115 /// This is the typical constructor for LSP servers that communicate
1116 /// over standard I/O. Uses unit types `()` for request queue metadata.
1117 ///
1118 /// # Example
1119 ///
1120 /// ```no_run
1121 /// use futures::StreamExt;
1122 /// use lsp_server_tokio::{Connection, Message};
1123 ///
1124 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
1125 /// let mut conn = Connection::stdio();
1126 ///
1127 /// // Process incoming messages
1128 /// while let Some(result) = conn.receiver_mut().next().await {
1129 /// match result {
1130 /// Ok(Message::Request(req)) => {
1131 /// println!("Received request: {}", req.method);
1132 /// // Handle request...
1133 /// }
1134 /// Ok(Message::Notification(notif)) => {
1135 /// println!("Received notification: {}", notif.method);
1136 /// }
1137 /// Ok(Message::Response(resp)) => {
1138 /// println!("Received response for: {:?}", resp.id);
1139 /// }
1140 /// Err(e) => {
1141 /// eprintln!("Error: {}", e);
1142 /// break;
1143 /// }
1144 /// }
1145 /// }
1146 /// # });
1147 /// ```
1148 ///
1149 /// # Note
1150 ///
1151 /// This constructor uses unit types `()` for both incoming and outgoing
1152 /// request metadata. If you need custom metadata types, use
1153 /// [`Connection::new()`] with a [`StdioTransport`] directly:
1154 ///
1155 /// ```no_run
1156 /// use lsp_server_tokio::{Connection, StdioTransport};
1157 ///
1158 /// let conn: Connection<StdioTransport, String> = Connection::new(StdioTransport::new());
1159 /// ```
1160 #[must_use]
1161 pub fn stdio() -> Self {
1162 Self::new(StdioTransport::new())
1163 }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169 use crate::{Request, Response, StdioConnection};
1170 use serde_json::json;
1171 use std::time::Duration;
1172
1173 #[tokio::test]
1174 async fn connection_from_duplex_test() {
1175 let (client_stream, server_stream) = tokio::io::duplex(4096);
1176 let client: Connection<_, ()> = Connection::new(client_stream);
1177 let mut server: Connection<_, ()> = Connection::new(server_stream);
1178
1179 // Send request from client
1180 let request = Message::Request(Request::new(1, "test", None));
1181 client.send(request).unwrap();
1182
1183 // Receive on server
1184 let received = server.receiver_mut().next().await.unwrap().unwrap();
1185 assert!(received.is_request());
1186 if let Message::Request(req) = received {
1187 assert_eq!(req.method, "test");
1188 assert_eq!(req.id, 1.into());
1189 } else {
1190 panic!("Expected Request");
1191 }
1192 }
1193
1194 #[tokio::test]
1195 async fn connection_bidirectional_test() {
1196 let (client_stream, server_stream) = tokio::io::duplex(4096);
1197 let mut client: Connection<_, ()> = Connection::new(client_stream);
1198 let mut server: Connection<_, ()> = Connection::new(server_stream);
1199
1200 // Client sends request
1201 let request = Message::Request(Request::new(
1202 1,
1203 "textDocument/hover",
1204 Some(json!({
1205 "textDocument": {"uri": "file:///test.rs"},
1206 "position": {"line": 10, "character": 5}
1207 })),
1208 ));
1209 client.send(request).unwrap();
1210
1211 // Server receives request
1212 let received = server.receiver_mut().next().await.unwrap().unwrap();
1213 assert!(received.is_request());
1214
1215 // Server sends response
1216 let response = Message::Response(Response::ok(
1217 1,
1218 json!({
1219 "contents": "fn main()"
1220 }),
1221 ));
1222 server.send(response).unwrap();
1223
1224 // Client receives response
1225 let received = client.receiver_mut().next().await.unwrap().unwrap();
1226 assert!(received.is_response());
1227 if let Message::Response(resp) = received {
1228 assert_eq!(resp.id, Some(1.into()));
1229 assert!(resp.result().is_some());
1230 } else {
1231 panic!("Expected Response");
1232 }
1233 }
1234
1235 #[tokio::test]
1236 async fn connection_from_transport_test() {
1237 let (client_stream, server_stream) = tokio::io::duplex(4096);
1238
1239 // Create transports manually
1240 let client_transport = transport(client_stream);
1241 let server_transport = transport(server_stream);
1242
1243 // Create connections from transports
1244 let client: Connection<_, ()> = Connection::from_transport(client_transport);
1245 let mut server: Connection<_, ()> = Connection::from_transport(server_transport);
1246
1247 // Verify functionality
1248 let request = Message::Request(Request::new(42, "test", None));
1249 client.send(request).unwrap();
1250
1251 let received = server.receiver_mut().next().await.unwrap().unwrap();
1252 assert!(received.is_request());
1253 if let Message::Request(req) = received {
1254 assert_eq!(req.id, 42.into());
1255 }
1256 }
1257
1258 #[tokio::test]
1259 async fn connection_multiple_messages_test() {
1260 let (client_stream, server_stream) = tokio::io::duplex(4096);
1261 let client: Connection<_, ()> = Connection::new(client_stream);
1262 let mut server: Connection<_, ()> = Connection::new(server_stream);
1263
1264 // Send 3 messages in sequence
1265 let msg1 = Message::Request(Request::new(1, "first", None));
1266 let msg2 = Message::Request(Request::new(2, "second", None));
1267 let msg3 = Message::Request(Request::new(3, "third", None));
1268
1269 client.send(msg1).unwrap();
1270 client.send(msg2).unwrap();
1271 client.send(msg3).unwrap();
1272
1273 // Receive all 3 - order must be preserved
1274 let recv1 = server.receiver_mut().next().await.unwrap().unwrap();
1275 let recv2 = server.receiver_mut().next().await.unwrap().unwrap();
1276 let recv3 = server.receiver_mut().next().await.unwrap().unwrap();
1277
1278 if let Message::Request(r) = recv1 {
1279 assert_eq!(r.method, "first");
1280 assert_eq!(r.id, 1.into());
1281 } else {
1282 panic!("Expected request");
1283 }
1284
1285 if let Message::Request(r) = recv2 {
1286 assert_eq!(r.method, "second");
1287 assert_eq!(r.id, 2.into());
1288 } else {
1289 panic!("Expected request");
1290 }
1291
1292 if let Message::Request(r) = recv3 {
1293 assert_eq!(r.method, "third");
1294 assert_eq!(r.id, 3.into());
1295 } else {
1296 panic!("Expected request");
1297 }
1298 }
1299
1300 #[tokio::test]
1301 async fn sender_receiver_independent_test() {
1302 let (client_stream, server_stream) = tokio::io::duplex(4096);
1303 let client: Connection<_, ()> = Connection::new(client_stream);
1304 let server: Connection<_, ()> = Connection::new(server_stream);
1305
1306 // Send from one task, receive from another
1307 let send_task =
1308 tokio::spawn(
1309 async move { client.send(Message::Request(Request::new(1, "test", None))) },
1310 );
1311
1312 let mut server_receiver = server.receiver;
1313 let recv_task = tokio::spawn(async move { server_receiver.next().await });
1314
1315 let (send_result, recv_result) = tokio::join!(send_task, recv_task);
1316 send_result.unwrap().unwrap();
1317 assert!(recv_result.unwrap().unwrap().unwrap().is_request());
1318 }
1319
1320 #[tokio::test]
1321 async fn connection_has_request_queue_test() {
1322 let (stream, _) = tokio::io::duplex(4096);
1323 let mut conn: Connection<_, String> = Connection::new(stream);
1324
1325 // Use request queue to track an incoming request
1326 let token = CancellationToken::new();
1327 conn.request_queue
1328 .incoming
1329 .register(1.into(), "handler_data".to_string(), token);
1330 assert!(conn.request_queue.incoming.is_pending(&1.into()));
1331
1332 // Complete it
1333 let data = conn.request_queue.incoming.complete(&1.into());
1334 assert_eq!(data, Some("handler_data".to_string()));
1335 }
1336
1337 #[tokio::test]
1338 async fn connection_with_request_queue_test() {
1339 let (stream, _) = tokio::io::duplex(4096);
1340 let mut queue: RequestQueue<u32> = RequestQueue::new();
1341 let token = CancellationToken::new();
1342 queue.incoming.register(42.into(), 100, token);
1343
1344 let conn = Connection::with_request_queue(stream, queue);
1345 assert!(conn.request_queue.incoming.is_pending(&42.into()));
1346 }
1347
1348 #[tokio::test]
1349 async fn connection_outgoing_request_queue_test() {
1350 let (stream, _) = tokio::io::duplex(4096);
1351 let mut conn: Connection<_, ()> = Connection::new(stream);
1352
1353 // Register an outgoing request
1354 let rx = conn.request_queue.outgoing.register(1.into());
1355 assert!(conn.request_queue.outgoing.is_pending(&1.into()));
1356
1357 // Complete it with a response
1358 let completed = conn.request_queue.outgoing.complete(
1359 &1.into(),
1360 Response::ok(1, serde_json::json!("response data")),
1361 );
1362 assert!(completed);
1363
1364 // Receiver gets the response
1365 let response = rx.await.unwrap();
1366 assert_eq!(response.id, Some(1.into()));
1367 assert_eq!(
1368 response.result().cloned(),
1369 Some(serde_json::json!("response data"))
1370 );
1371 }
1372
// Note: Connection::stdio() needs the real stdin/stdout, so it cannot be
// exercised in unit tests; it will be tested in E2E tests in Phase 9.
// Constructing a StdioTransport, however, can be checked here.
#[test]
fn stdio_transport_constructible() {
    // Construction only: no I/O is attempted, since that would need real stdio.
    let _via_new = StdioTransport::new();
    let _via_default = <StdioTransport as Default>::default();
}
1383
1384 // =========================================================================
1385 // Lifecycle Tests
1386 // =========================================================================
1387
1388 use crate::{ExitCode, LifecycleState, Notification, ProtocolError};
1389
1390 #[tokio::test]
1391 async fn test_initialize_handshake() {
1392 let (client_stream, server_stream) = tokio::io::duplex(4096);
1393 let mut client: Connection<_, ()> = Connection::new(client_stream);
1394 let mut server: Connection<_, ()> = Connection::new(server_stream);
1395
1396 // Client sends initialize request
1397 let init_params = json!({"processId": 1234, "capabilities": {}});
1398 let init_request =
1399 Message::Request(Request::new(1, "initialize", Some(init_params.clone())));
1400 client.send(init_request).unwrap();
1401
1402 // Server waits for initialize
1403 let (id, params) = server.initialize_start().await.unwrap();
1404 assert_eq!(id, 1.into());
1405 assert_eq!(params["processId"], 1234);
1406 assert_eq!(server.lifecycle_state(), LifecycleState::Initializing);
1407
1408 // Spawn task to handle server's initialize_finish
1409 let server_task = tokio::spawn(async move {
1410 let result = json!({"capabilities": {"textDocumentSync": 1}});
1411 server.initialize_finish(id, result).await.unwrap();
1412 server
1413 });
1414
1415 // Client receives InitializeResult
1416 let response = client.receiver_mut().next().await.unwrap().unwrap();
1417 assert!(response.is_response());
1418 if let Message::Response(resp) = response {
1419 assert_eq!(resp.id, Some(1.into()));
1420 assert!(resp.result().is_some());
1421 let result = resp.into_result().unwrap();
1422 assert_eq!(result["capabilities"]["textDocumentSync"], 1);
1423 }
1424
1425 // Client sends initialized notification
1426 let initialized = Message::Notification(Notification::new("initialized", None));
1427 client.send(initialized).unwrap();
1428
1429 // Server's initialize_finish completes
1430 let server = server_task.await.unwrap();
1431 assert!(server.is_running());
1432 assert_eq!(server.lifecycle_state(), LifecycleState::Running);
1433 }
1434
1435 #[tokio::test]
1436 async fn stdio_connection_alias_constructs_with_custom_metadata() {
1437 let conn: StdioConnection<String> = Connection::new(StdioTransport::new());
1438
1439 assert_eq!(conn.lifecycle_state(), LifecycleState::Uninitialized);
1440 assert!(!conn.request_queue.incoming.is_pending(&1.into()));
1441 assert!(!conn.request_queue.outgoing.is_pending(&1.into()));
1442 }
1443
1444 #[tokio::test]
1445 async fn test_initialize_rejects_non_init_requests() {
1446 let (client_stream, server_stream) = tokio::io::duplex(4096);
1447 let mut client: Connection<_, ()> = Connection::new(client_stream);
1448 let mut server: Connection<_, ()> = Connection::new(server_stream);
1449
1450 // Client sends a non-initialize request first
1451 let hover_request = Message::Request(Request::new(1, "textDocument/hover", None));
1452 client.send(hover_request).unwrap();
1453
1454 // Spawn server's initialize_start
1455 let server_task = tokio::spawn(async move {
1456 server.initialize_start().await.unwrap();
1457 server
1458 });
1459
1460 // Client receives ServerNotInitialized error
1461 let response = client.receiver_mut().next().await.unwrap().unwrap();
1462 assert!(response.is_response());
1463 if let Message::Response(resp) = response {
1464 assert_eq!(resp.id, Some(1.into()));
1465 assert!(resp.error().is_some());
1466 let error = resp.into_error().unwrap();
1467 assert_eq!(error.code, crate::ErrorCode::ServerNotInitialized as i32);
1468 }
1469
1470 // Now client sends initialize - should be accepted
1471 let init_request = Message::Request(Request::new(2, "initialize", None));
1472 client.send(init_request).unwrap();
1473
1474 let server = server_task.await.unwrap();
1475 assert_eq!(server.lifecycle_state(), LifecycleState::Initializing);
1476 }
1477
1478 #[tokio::test(start_paused = true)]
1479 async fn test_initialize_finish_times_out_without_initialized() {
1480 let (client_stream, server_stream) = tokio::io::duplex(4096);
1481 let mut client: Connection<_, ()> = Connection::new(client_stream);
1482 let mut server: Connection<_, ()> = Connection::new(server_stream);
1483
1484 client
1485 .send(Message::Request(Request::new(1, "initialize", None)))
1486 .unwrap();
1487
1488 let (id, _params) = server.initialize_start().await.unwrap();
1489
1490 let server_task = tokio::spawn(async move {
1491 server
1492 .initialize_finish(id, json!({"capabilities": {}}))
1493 .await
1494 });
1495
1496 let response = client.receiver_mut().next().await.unwrap().unwrap();
1497 assert!(response.is_response());
1498
1499 tokio::time::advance(Duration::from_secs(61)).await;
1500
1501 let result = server_task.await.unwrap();
1502 assert!(matches!(result, Err(ProtocolError::InitializeTimeout)));
1503 }
1504
1505 #[tokio::test]
1506 async fn test_initialize_drops_notifications() {
1507 let (client_stream, server_stream) = tokio::io::duplex(4096);
1508 let client: Connection<_, ()> = Connection::new(client_stream);
1509 let mut server: Connection<_, ()> = Connection::new(server_stream);
1510
1511 // Client sends random notification before init
1512 let random_notif = Message::Notification(Notification::new("textDocument/didOpen", None));
1513 client.send(random_notif).unwrap();
1514
1515 // Client sends initialize request
1516 let init_request = Message::Request(Request::new(1, "initialize", None));
1517 client.send(init_request).unwrap();
1518
1519 // Server's initialize_start should skip notification and find initialize
1520 let (id, _params) = server.initialize_start().await.unwrap();
1521 assert_eq!(id, 1.into());
1522 assert_eq!(server.lifecycle_state(), LifecycleState::Initializing);
1523 }
1524
1525 #[tokio::test]
1526 async fn test_exit_during_init_disconnects() {
1527 let (client_stream, server_stream) = tokio::io::duplex(4096);
1528 let client: Connection<_, ()> = Connection::new(client_stream);
1529 let mut server: Connection<_, ()> = Connection::new(server_stream);
1530
1531 // Client sends exit notification instead of initialize
1532 let exit_notif = Message::Notification(Notification::new("exit", None));
1533 client.send(exit_notif).unwrap();
1534
1535 // Server's initialize_start should return Disconnected
1536 let result = server.initialize_start().await;
1537 assert!(matches!(result, Err(ProtocolError::Disconnected)));
1538 }
1539
1540 #[tokio::test]
1541 async fn test_shutdown_then_exit() {
1542 let (client_stream, server_stream) = tokio::io::duplex(4096);
1543 let mut client: Connection<_, ()> = Connection::new(client_stream);
1544 let mut server: Connection<_, ()> = Connection::new(server_stream);
1545
1546 // Complete initialization
1547 let init_request = Message::Request(Request::new(1, "initialize", None));
1548 client.send(init_request).unwrap();
1549
1550 let (id, _params) = server.initialize_start().await.unwrap();
1551
1552 let server_task = tokio::spawn(async move {
1553 server
1554 .initialize_finish(id, json!({"capabilities": {}}))
1555 .await
1556 .unwrap();
1557 server
1558 });
1559
1560 let _ = client.receiver_mut().next().await; // Receive InitializeResult
1561 let initialized = Message::Notification(Notification::new("initialized", None));
1562 client.send(initialized).unwrap();
1563
1564 let mut server = server_task.await.unwrap();
1565 assert!(server.is_running());
1566
1567 // Client sends shutdown request
1568 let shutdown_request = Message::Request(Request::new(2, "shutdown", None));
1569 client.send(shutdown_request).unwrap();
1570
1571 // Server receives and handles shutdown
1572 let msg = server.receiver_mut().next().await.unwrap().unwrap();
1573 if let Message::Request(req) = msg {
1574 assert_eq!(req.method, "shutdown");
1575 server.handle_shutdown(req.id).unwrap();
1576 } else {
1577 panic!("Expected shutdown request");
1578 }
1579
1580 // Verify shutdown state
1581 assert!(server.is_shutting_down());
1582 assert!(server.shutdown_token().is_cancelled());
1583
1584 // Client receives null response
1585 let response = client.receiver_mut().next().await.unwrap().unwrap();
1586 if let Message::Response(resp) = response {
1587 assert_eq!(resp.id, Some(2.into()));
1588 assert_eq!(resp.result().cloned(), Some(serde_json::Value::Null));
1589 }
1590
1591 // Client sends exit
1592 // Server handles exit
1593 let exit_code = server.handle_exit();
1594 assert_eq!(exit_code, ExitCode::Success);
1595 assert_eq!(server.lifecycle_state(), LifecycleState::Exited);
1596 }
1597
1598 #[tokio::test]
1599 async fn test_exit_without_shutdown() {
1600 let (client_stream, server_stream) = tokio::io::duplex(4096);
1601 let mut client: Connection<_, ()> = Connection::new(client_stream);
1602 let mut server: Connection<_, ()> = Connection::new(server_stream);
1603
1604 // Complete initialization
1605 let init_request = Message::Request(Request::new(1, "initialize", None));
1606 client.send(init_request).unwrap();
1607
1608 let (id, _params) = server.initialize_start().await.unwrap();
1609
1610 let server_task = tokio::spawn(async move {
1611 server
1612 .initialize_finish(id, json!({"capabilities": {}}))
1613 .await
1614 .unwrap();
1615 server
1616 });
1617
1618 let _ = client.receiver_mut().next().await;
1619 let initialized = Message::Notification(Notification::new("initialized", None));
1620 client.send(initialized).unwrap();
1621
1622 let mut server = server_task.await.unwrap();
1623 assert!(server.is_running());
1624
1625 // Server receives exit without shutdown - dirty exit
1626 let exit_code = server.handle_exit();
1627 assert_eq!(exit_code, ExitCode::Error);
1628 assert_eq!(server.lifecycle_state(), LifecycleState::Exited);
1629 }
1630
1631 #[tokio::test]
1632 async fn test_on_shutdown_future() {
1633 let (client_stream, server_stream) = tokio::io::duplex(4096);
1634 let mut client: Connection<_, ()> = Connection::new(client_stream);
1635 let mut server: Connection<_, ()> = Connection::new(server_stream);
1636
1637 // Complete initialization
1638 let init_request = Message::Request(Request::new(1, "initialize", None));
1639 client.send(init_request).unwrap();
1640
1641 let (id, _params) = server.initialize_start().await.unwrap();
1642
1643 let server_task = tokio::spawn(async move {
1644 server
1645 .initialize_finish(id, json!({"capabilities": {}}))
1646 .await
1647 .unwrap();
1648 server
1649 });
1650
1651 let _ = client.receiver_mut().next().await;
1652 let initialized = Message::Notification(Notification::new("initialized", None));
1653 client.send(initialized).unwrap();
1654
1655 let mut server = server_task.await.unwrap();
1656
1657 // Spawn a task waiting on shutdown
1658 let token = server.shutdown_token();
1659 let wait_task = tokio::spawn(async move {
1660 token.cancelled().await;
1661 "shutdown received"
1662 });
1663
1664 // Send shutdown
1665 let shutdown_request = Message::Request(Request::new(2, "shutdown", None));
1666 client.send(shutdown_request).unwrap();
1667
1668 let msg = server.receiver_mut().next().await.unwrap().unwrap();
1669 if let Message::Request(req) = msg {
1670 server.handle_shutdown(req.id).unwrap();
1671 }
1672
1673 // The wait task should complete now
1674 let result = tokio::time::timeout(std::time::Duration::from_millis(100), wait_task)
1675 .await
1676 .expect("wait task should complete quickly")
1677 .unwrap();
1678
1679 assert_eq!(result, "shutdown received");
1680 }
1681
1682 // =========================================================================
1683 // Routing Tests
1684 // =========================================================================
1685
1686 use crate::IncomingMessage;
1687
1688 #[tokio::test]
1689 async fn route_request_returns_incoming_request() {
1690 let (stream, _) = tokio::io::duplex(4096);
1691 let mut conn: Connection<_, ()> = Connection::new(stream);
1692
1693 let request = Request::new(42, "textDocument/hover", Some(json!({"line": 10})));
1694 let message = Message::Request(request);
1695
1696 let result = conn.route(message);
1697 match result {
1698 IncomingMessage::Request(req, token) => {
1699 assert_eq!(req.id, 42.into());
1700 assert_eq!(req.method, "textDocument/hover");
1701 // Token should not be cancelled yet
1702 assert!(!token.is_cancelled());
1703 // Request should be auto-registered
1704 assert!(conn.request_queue.incoming.is_pending(&42.into()));
1705 }
1706 _ => panic!("Expected IncomingMessage::Request"),
1707 }
1708 }
1709
1710 #[tokio::test]
1711 async fn route_notification_returns_incoming_notification() {
1712 let (stream, _) = tokio::io::duplex(4096);
1713 let mut conn: Connection<_, ()> = Connection::new(stream);
1714
1715 let notification = Notification::new(
1716 "textDocument/didOpen",
1717 Some(json!({"uri": "file:///test.rs"})),
1718 );
1719 let message = Message::Notification(notification);
1720
1721 let result = conn.route(message);
1722 match result {
1723 IncomingMessage::Notification(notif) => {
1724 assert_eq!(notif.method, "textDocument/didOpen");
1725 }
1726 _ => panic!("Expected IncomingMessage::Notification"),
1727 }
1728 }
1729
1730 #[tokio::test]
1731 async fn route_response_to_pending_outgoing_request() {
1732 let (stream, _) = tokio::io::duplex(4096);
1733 let mut conn: Connection<_, ()> = Connection::new(stream);
1734
1735 // Register an outgoing request
1736 let rx = conn.request_queue.outgoing.register(42.into());
1737
1738 // Create a matching response
1739 let response = Response::ok(42, json!({"result": "success"}));
1740 let message = Message::Response(response);
1741
1742 // Route it
1743 let result = conn.route(message);
1744 assert!(
1745 matches!(result, IncomingMessage::ResponseRouted),
1746 "Expected ResponseRouted, got {result:?}"
1747 );
1748
1749 // Verify receiver got the response
1750 let received = rx.await.expect("Should receive response");
1751 assert_eq!(received.id, Some(42.into()));
1752 assert!(received.result().is_some());
1753 assert_eq!(received.into_result().unwrap()["result"], "success");
1754 }
1755
1756 #[tokio::test]
1757 async fn route_response_for_unknown_id_returns_response_unknown() {
1758 let (stream, _) = tokio::io::duplex(4096);
1759 let mut conn: Connection<_, ()> = Connection::new(stream);
1760
1761 // Create a response for an ID that was never registered
1762 let response = Response::ok(999, json!({"unexpected": true}));
1763 let message = Message::Response(response);
1764
1765 let result = conn.route(message);
1766 match result {
1767 IncomingMessage::ResponseUnknown(resp) => {
1768 assert_eq!(resp.id, Some(999.into()));
1769 }
1770 _ => panic!("Expected IncomingMessage::ResponseUnknown"),
1771 }
1772 }
1773
1774 #[tokio::test]
1775 async fn route_response_with_null_id_returns_response_unknown() {
1776 let (stream, _) = tokio::io::duplex(4096);
1777 let mut conn: Connection<_, ()> = Connection::new(stream);
1778
1779 // Create a parse error response (null id)
1780 let response = Response::parse_error(crate::ResponseError::new(
1781 crate::ErrorCode::ParseError,
1782 "Parse error",
1783 ));
1784 let message = Message::Response(response);
1785
1786 let result = conn.route(message);
1787 match result {
1788 IncomingMessage::ResponseUnknown(resp) => {
1789 assert!(
1790 resp.id.is_none(),
1791 "Expected null id for parse error response"
1792 );
1793 assert!(resp.error().is_some());
1794 }
1795 _ => panic!("Expected IncomingMessage::ResponseUnknown"),
1796 }
1797 }
1798
1799 #[tokio::test]
1800 async fn route_response_with_string_id() {
1801 let (stream, _) = tokio::io::duplex(4096);
1802 let mut conn: Connection<_, ()> = Connection::new(stream);
1803
1804 // Register with string ID
1805 let rx = conn.request_queue.outgoing.register("request-abc".into());
1806
1807 // Create matching response
1808 let response = Response::ok("request-abc", json!(null));
1809 let message = Message::Response(response);
1810
1811 let result = conn.route(message);
1812 assert!(matches!(result, IncomingMessage::ResponseRouted));
1813
1814 let received = rx.await.expect("Should receive response");
1815 assert_eq!(
1816 received.id,
1817 Some(crate::RequestId::String("request-abc".to_string()))
1818 );
1819 }
1820
1821 #[tokio::test]
1822 async fn route_multiple_responses_to_different_requests() {
1823 let (stream, _) = tokio::io::duplex(4096);
1824 let mut conn: Connection<_, ()> = Connection::new(stream);
1825
1826 // Register multiple outgoing requests
1827 let rx1 = conn.request_queue.outgoing.register(1.into());
1828 let rx2 = conn.request_queue.outgoing.register(2.into());
1829 let rx3 = conn.request_queue.outgoing.register(3.into());
1830
1831 // Route responses out of order
1832 let result2 = conn.route(Message::Response(Response::ok(2, json!("second"))));
1833 assert!(matches!(result2, IncomingMessage::ResponseRouted));
1834
1835 let result1 = conn.route(Message::Response(Response::ok(1, json!("first"))));
1836 assert!(matches!(result1, IncomingMessage::ResponseRouted));
1837
1838 let result3 = conn.route(Message::Response(Response::ok(3, json!("third"))));
1839 assert!(matches!(result3, IncomingMessage::ResponseRouted));
1840
1841 // Verify all receivers got correct responses
1842 let resp1 = rx1.await.unwrap();
1843 assert_eq!(resp1.into_result().unwrap(), json!("first"));
1844
1845 let resp2 = rx2.await.unwrap();
1846 assert_eq!(resp2.into_result().unwrap(), json!("second"));
1847
1848 let resp3 = rx3.await.unwrap();
1849 assert_eq!(resp3.into_result().unwrap(), json!("third"));
1850 }
1851
1852 #[tokio::test]
1853 async fn route_error_response_to_pending_request() {
1854 let (stream, _) = tokio::io::duplex(4096);
1855 let mut conn: Connection<_, ()> = Connection::new(stream);
1856
1857 // Register an outgoing request (we won't await it in this sync test)
1858 let _rx = conn.request_queue.outgoing.register(42.into());
1859
1860 // Route an error response
1861 let response = Response::err(
1862 42,
1863 crate::ResponseError::new(crate::ErrorCode::MethodNotFound, "Not found"),
1864 );
1865 let message = Message::Response(response);
1866
1867 let result = conn.route(message);
1868 assert!(matches!(result, IncomingMessage::ResponseRouted));
1869 }
1870
1871 // =========================================================================
1872 // Cancellation Tests
1873 // =========================================================================
1874
1875 #[tokio::test]
1876 async fn route_cancel_request_returns_cancel_handled_and_cancels_pending() {
1877 let (stream, _) = tokio::io::duplex(4096);
1878 let mut conn: Connection<_, ()> = Connection::new(stream);
1879
1880 // Register a request via route
1881 let request = Request::new(42, "test", None);
1882 let routed = conn.route(Message::Request(request));
1883 let IncomingMessage::Request(_, token) = routed else {
1884 panic!("expected Request");
1885 };
1886 assert!(!token.is_cancelled());
1887
1888 // Send $/cancelRequest via route
1889 let cancel = Notification::new("$/cancelRequest", Some(json!({"id": 42})));
1890 let result = conn.route(Message::Notification(cancel));
1891 assert!(matches!(result, IncomingMessage::CancelHandled));
1892 assert!(token.is_cancelled());
1893 }
1894
1895 #[tokio::test]
1896 async fn route_cancel_request_unknown_id_still_returns_cancel_handled() {
1897 let (stream, _) = tokio::io::duplex(4096);
1898 let mut conn: Connection<_, ()> = Connection::new(stream);
1899
1900 // Cancel a request that was never registered — still CancelHandled
1901 let cancel = Notification::new("$/cancelRequest", Some(json!({"id": 99})));
1902 let result = conn.route(Message::Notification(cancel));
1903 assert!(matches!(result, IncomingMessage::CancelHandled));
1904 }
1905
1906 #[tokio::test]
1907 async fn route_regular_notification_not_affected() {
1908 let (stream, _) = tokio::io::duplex(4096);
1909 let mut conn: Connection<_, ()> = Connection::new(stream);
1910
1911 let notif = Notification::new("textDocument/didOpen", None);
1912 let result = conn.route(Message::Notification(notif));
1913 assert!(matches!(result, IncomingMessage::Notification(_)));
1914 }
1915
1916 #[tokio::test]
1917 async fn cancellation_propagates_to_spawned_handler() {
1918 let (stream, _) = tokio::io::duplex(4096);
1919 let mut conn: Connection<_, ()> = Connection::new(stream);
1920
1921 // Register a request via route
1922 let request = Request::new(1, "test", None);
1923 let result = conn.route(Message::Request(request));
1924 let IncomingMessage::Request(_, token) = result else {
1925 panic!("Expected IncomingMessage::Request")
1926 };
1927
1928 // Spawn a handler that waits for cancellation
1929 let handle = tokio::spawn(async move {
1930 token.cancelled().await;
1931 "cancelled"
1932 });
1933
1934 // Cancel the request
1935 let _ = conn.request_queue.incoming.cancel(&1.into());
1936
1937 // Handler should complete quickly
1938 let result = tokio::time::timeout(std::time::Duration::from_millis(100), handle)
1939 .await
1940 .expect("Handler should complete quickly")
1941 .unwrap();
1942
1943 assert_eq!(result, "cancelled");
1944 }
1945
1946 #[tokio::test]
1947 async fn route_request_auto_registers_and_cancellation_works() {
1948 let (stream, _) = tokio::io::duplex(4096);
1949 let mut conn: Connection<_, ()> = Connection::new(stream);
1950
1951 // Route a request
1952 let request = Request::new(42, "test", None);
1953 let result = conn.route(Message::Request(request));
1954
1955 let IncomingMessage::Request(_, token) = result else {
1956 panic!("Expected IncomingMessage::Request")
1957 };
1958
1959 // Token should not be cancelled
1960 assert!(!token.is_cancelled());
1961
1962 // Cancel via cancel_incoming
1963 let was_cancelled = conn.cancel_incoming(42);
1964 assert!(was_cancelled);
1965
1966 // Token should now be cancelled
1967 assert!(token.is_cancelled());
1968 }
1969
1970 // =========================================================================
1971 // Outgoing Cancel Tests
1972 // =========================================================================
1973
1974 #[tokio::test]
1975 async fn test_cancel_outgoing_request() {
1976 let (client_stream, server_stream) = tokio::io::duplex(4096);
1977 let mut client: Connection<_, ()> = Connection::new(client_stream);
1978 let mut server: Connection<_, ()> = Connection::new(server_stream);
1979
1980 // Register an outgoing request on client
1981 let rx = client.request_queue.outgoing.register(42.into());
1982
1983 // Cancel it - should send notification and remove from queue
1984 let was_pending = client.cancel(42).unwrap();
1985 assert!(was_pending);
1986 assert!(!client.request_queue.outgoing.is_pending(&42.into()));
1987
1988 // Server should receive the $/cancelRequest notification
1989 let msg = server.receiver_mut().next().await.unwrap().unwrap();
1990 assert!(msg.is_notification());
1991 if let Message::Notification(notif) = msg {
1992 assert_eq!(notif.method, "$/cancelRequest");
1993 assert_eq!(notif.params.unwrap()["id"], 42);
1994 } else {
1995 panic!("Expected notification");
1996 }
1997
1998 // The receiver should get an error (sender dropped)
1999 assert!(rx.await.is_err());
2000 }
2001
2002 #[tokio::test]
2003 async fn test_cancel_unknown_outgoing_request() {
2004 let (client_stream, _server_stream) = tokio::io::duplex(4096);
2005 let mut client: Connection<_, ()> = Connection::new(client_stream);
2006
2007 // Cancel a request that was never registered
2008 let was_pending = client.cancel(999).unwrap();
2009 assert!(!was_pending);
2010 }
2011
2012 #[tokio::test]
2013 async fn test_cancel_with_string_id() {
2014 let (client_stream, server_stream) = tokio::io::duplex(4096);
2015 let mut client: Connection<_, ()> = Connection::new(client_stream);
2016 let mut server: Connection<_, ()> = Connection::new(server_stream);
2017
2018 // Register with string ID
2019 let rx = client.request_queue.outgoing.register("req-abc".into());
2020
2021 // Cancel it
2022 let was_pending = client.cancel("req-abc").unwrap();
2023 assert!(was_pending);
2024
2025 // Server should receive the notification with string ID
2026 let msg = server.receiver_mut().next().await.unwrap().unwrap();
2027 if let Message::Notification(notif) = msg {
2028 assert_eq!(notif.params.unwrap()["id"], "req-abc");
2029 } else {
2030 panic!("Expected notification");
2031 }
2032
2033 assert!(rx.await.is_err());
2034 }
2035
2036 // =========================================================================
2037 // ClientSender Integration Tests
2038 // =========================================================================
2039
2040 #[tokio::test]
2041 async fn client_sender_messages_arrive_on_receiver() {
2042 let (client_stream, server_stream) = tokio::io::duplex(4096);
2043 let mut server: Connection<_, ()> = Connection::new(server_stream);
2044 let mut client: Connection<_, ()> = Connection::new(client_stream);
2045
2046 let sender = server.client_sender();
2047
2048 // Notification via ClientSender should arrive on client's receiver
2049 sender
2050 .notify(
2051 "window/logMessage",
2052 Some(json!({"type": 3, "message": "hello"})),
2053 )
2054 .unwrap();
2055
2056 let msg = client.receiver_mut().next().await.unwrap().unwrap();
2057 assert!(msg.is_notification());
2058 if let Message::Notification(notif) = msg {
2059 assert_eq!(notif.method, "window/logMessage");
2060 assert_eq!(notif.params.unwrap()["message"], "hello");
2061 }
2062 }
2063
2064 #[tokio::test]
2065 async fn client_sender_request_routed_through_response_map() {
2066 let (client_stream, server_stream) = tokio::io::duplex(4096);
2067 let mut server: Connection<_, ()> = Connection::new(server_stream);
2068 let mut client: Connection<_, ()> = Connection::new(client_stream);
2069
2070 let sender = server.client_sender();
2071
2072 // Spawn request task
2073 let sender_clone = sender.clone();
2074 let req_task = tokio::spawn(async move {
2075 sender_clone
2076 .request(
2077 "client/registerCapability",
2078 Some(json!({"registrations": []})),
2079 )
2080 .await
2081 });
2082
2083 // Client receives the request
2084 let msg = client.receiver_mut().next().await.unwrap().unwrap();
2085 let req_id = if let Message::Request(req) = msg {
2086 assert_eq!(req.method, "client/registerCapability");
2087 req.id.clone()
2088 } else {
2089 panic!("Expected Request");
2090 };
2091
2092 // Client sends response back
2093 client
2094 .send(Message::Response(Response::ok(req_id.clone(), json!(null))))
2095 .unwrap();
2096
2097 // Server reads the response and routes it through response_map
2098 let resp_msg = server.receiver_mut().next().await.unwrap().unwrap();
2099 let routed = server.route(resp_msg);
2100 assert!(
2101 matches!(routed, IncomingMessage::ResponseRouted),
2102 "Expected ResponseRouted, got {routed:?}"
2103 );
2104
2105 // The request task should complete with the response
2106 let result = tokio::time::timeout(Duration::from_secs(1), req_task)
2107 .await
2108 .expect("request should complete")
2109 .unwrap()
2110 .unwrap();
2111 assert_eq!(result.id, Some(req_id));
2112 }
2113
/// When the same request id is awaited both by a `ClientSender::request` call
/// and by a receiver registered on the outgoing queue, a routed response must
/// complete the `ClientSender` request and leave the outgoing-queue receiver
/// without a value.
#[tokio::test]
async fn route_prefers_response_map_over_outgoing_queue() {
    // The peer half (`_server_stream`) is bound to a name so it stays open for
    // the whole test.
    let (client_stream, _server_stream) = tokio::io::duplex(4096);
    let mut server: Connection<_, ()> = Connection::new(client_stream);

    let sender = server.client_sender();

    // Register the same ID in both response_map (via ClientSender) and outgoing queue
    // NOTE(review): this relies on the ClientSender assigning id 1 to its first
    // request — confirm against the ClientSender id allocation.
    let mut outgoing_rx = server.request_queue.outgoing.register(1.into());

    let sender_clone = sender.clone();
    let req_task = tokio::spawn(async move { sender_clone.request("test", None).await });

    // Give the request task time to register in the response_map
    // NOTE(review): two yields are presumably enough for the spawned task to run
    // up to its first suspension point on the test runtime — confirm if this
    // test ever becomes flaky.
    tokio::task::yield_now().await;
    tokio::task::yield_now().await;

    // Route a response for id=1 — should go to response_map, not outgoing queue
    let response = Response::ok(1, json!("via-response-map"));
    let result = server.route(Message::Response(response));
    assert!(matches!(result, IncomingMessage::ResponseRouted));

    // The request task should get the response
    let resp = tokio::time::timeout(Duration::from_secs(1), req_task)
        .await
        .expect("should complete")
        .unwrap()
        .unwrap();
    assert_eq!(resp.result().cloned(), Some(json!("via-response-map")));

    // outgoing queue receiver should NOT have received anything (still pending)
    assert!(outgoing_rx.try_recv().is_err());
}
2147
2148 #[tokio::test]
2149 async fn send_works_before_and_after_client_sender() {
2150 let (client_stream, server_stream) = tokio::io::duplex(4096);
2151 let server: Connection<_, ()> = Connection::new(server_stream);
2152 let mut client: Connection<_, ()> = Connection::new(client_stream);
2153
2154 // send works before client_sender
2155 server
2156 .send(Message::Notification(Notification::new(
2157 "test/before",
2158 None,
2159 )))
2160 .unwrap();
2161
2162 let msg = client.receiver_mut().next().await.unwrap().unwrap();
2163 if let Message::Notification(notif) = msg {
2164 assert_eq!(notif.method, "test/before");
2165 }
2166 }
2167
2168 #[tokio::test]
2169 async fn client_sender_can_be_called_multiple_times() {
2170 let (stream, _) = tokio::io::duplex(4096);
2171 let mut conn: Connection<_, ()> = Connection::new(stream);
2172
2173 let sender1 = conn.client_sender();
2174 let sender2 = conn.client_sender(); // should NOT panic
2175 drop(sender1);
2176 drop(sender2);
2177 }
2178
2179 #[tokio::test]
2180 async fn send_works_after_client_sender() {
2181 let (client_stream, server_stream) = tokio::io::duplex(4096);
2182 let mut server: Connection<_, ()> = Connection::new(server_stream);
2183 let mut client: Connection<_, ()> = Connection::new(client_stream);
2184
2185 let _sender = server.client_sender();
2186
2187 // send() should still work after client_sender()
2188 server
2189 .send(Message::Notification(Notification::new("test/ping", None)))
2190 .unwrap();
2191
2192 let msg = client.receiver_mut().next().await.unwrap().unwrap();
2193 assert!(msg.is_notification());
2194 if let Message::Notification(notif) = msg {
2195 assert_eq!(notif.method, "test/ping");
2196 }
2197 }
2198}