Skip to main content

lsp_server_tokio/
connection.rs

1//! Connection abstraction for bidirectional LSP message communication.
2//!
3//! This module provides the [`Connection`] type which is the primary entry point
4//! for LSP communication. It wraps a transport and provides:
5//!
6//! - Split sender/receiver halves for async message passing
7//! - Request queue for tracking pending requests in both directions
8//! - Lifecycle state management for LSP initialization and shutdown
9//!
10//! # Overview
11//!
12//! A [`Connection`] is constructed from any `AsyncRead + AsyncWrite` stream and
13//! provides a [`send()`](Connection::send) method for outbound messages and a public
14//! [`receiver_mut()`](Connection::receiver_mut) method for inbound messages. It also contains
15//! a [`RequestQueue`] for tracking pending requests.
16//!
17//! # Examples
18//!
19//! ## Basic usage with duplex streams
20//!
21//! ```
22//! use futures::{SinkExt, StreamExt};
23//! use lsp_server_tokio::{Connection, Message, Request};
24//!
25//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
26//! let (client_stream, server_stream) = tokio::io::duplex(4096);
27//!
28//! // Create connections
29//! let mut client: Connection<_, ()> = Connection::new(client_stream);
30//! let mut server: Connection<_, ()> = Connection::new(server_stream);
31//!
32//! // Send a request from client
33//! let request = Message::Request(Request::new(1, "textDocument/hover", None));
34//! client.send(request).unwrap();
35//!
36//! // Receive on server
37//! let msg = server.receiver_mut().next().await.unwrap().unwrap();
38//! assert!(msg.is_request());
39//! # });
40//! ```
41//!
42//! ## With request queue tracking
43//!
44//! ```
45//! use lsp_server_tokio::{Connection, RequestQueue};
46//!
47//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
48//! let (stream, _) = tokio::io::duplex(4096);
49//!
50//! // Create connection with typed request queue
51//! let mut conn: Connection<_, String> = Connection::new(stream);
52//!
53//! // Track an incoming request with a cancellation token
54//! use tokio_util::sync::CancellationToken;
55//! let token = CancellationToken::new();
56//! conn.request_queue.incoming.register(1.into(), "textDocument/hover".to_string(), token);
57//! assert!(conn.request_queue.incoming.is_pending(&1.into()));
58//! # });
59//! ```
60//!
61//! ## Stdio for LSP servers
62//!
63//! ```no_run
64//! use lsp_server_tokio::Connection;
65//!
66//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
67//! // Create a connection over stdin/stdout for typical LSP server usage
68//! let conn = Connection::stdio();
69//! // Use conn.send() to write responses, conn.receiver_mut() to read requests
70//! # });
71//! ```
72
73use std::io;
74use std::pin::Pin;
75use std::task::{Context, Poll};
76
77use futures::stream::SplitStream;
78use futures::{SinkExt, StreamExt};
79use pin_project_lite::pin_project;
80use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Stdin, Stdout};
81use tokio::sync::mpsc;
82use tokio_util::sync::CancellationToken;
83
84use crate::client_sender::ResponseMap;
85use crate::lifecycle::{ExitCode, LifecycleState, ProtocolError};
86use crate::ClientSender;
87use crate::{transport, Message, RequestQueue, Transport};
88
89/// Spawns a background task that drains messages from the channel into the transport sink.
90///
91/// Returns the sender half of the channel and a cancellation token that is
92/// triggered when the drain task exits (e.g., because the transport is broken).
93fn spawn_drain_task<T>(
94    mut sender: futures::stream::SplitSink<Transport<T>, Message>,
95) -> (mpsc::UnboundedSender<Message>, CancellationToken)
96where
97    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
98{
99    let (tx, mut rx) = mpsc::unbounded_channel();
100    let drain_alive = CancellationToken::new();
101    let drain_guard = drain_alive.clone().drop_guard();
102
103    tokio::spawn(async move {
104        let _guard = drain_guard;
105        while let Some(message) = rx.recv().await {
106            if sender.send(message).await.is_err() {
107                return;
108            }
109        }
110    });
111
112    (tx, drain_alive)
113}
114
115/// A [`Connection`] backed by [`StdioTransport`].
116///
117/// This alias is useful when building stdio-based servers that want to use
118/// custom metadata types for request tracking without repeating the full
119/// `Connection<StdioTransport, I>` generic.
120///
121/// # Examples
122///
123/// ```no_run
124/// use lsp_server_tokio::{StdioTransport, Connection, StdioConnection};
125///
126/// let conn: StdioConnection<String> = Connection::new(StdioTransport::new());
127/// ```
128pub type StdioConnection<I = ()> = Connection<StdioTransport, I>;
129
130pin_project! {
131    /// The receiver half of an LSP connection.
132    ///
133    /// An opaque stream of inbound LSP messages. Implements
134    /// [`Stream<Item = Result<Message, io::Error>>`](futures::Stream) so you can
135    /// use it with [`StreamExt::next()`](futures::StreamExt::next).
136    ///
137    /// Obtain one via [`Connection::into_receiver`] or [`Connection::receiver_mut`].
138    ///
139    /// # Example
140    ///
141    /// ```
142    /// use futures::StreamExt;
143    /// use lsp_server_tokio::Connection;
144    ///
145    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
146    /// let (stream, _) = tokio::io::duplex(4096);
147    /// let mut conn: Connection<_, ()> = Connection::new(stream);
148    ///
149    /// // Poll the next inbound message
150    /// // let msg = conn.receiver_mut().next().await;
151    /// # });
152    /// ```
153    pub struct MessageStream<T> {
154        #[pin]
155        inner: SplitStream<Transport<T>>,
156    }
157}
158
159impl<T> MessageStream<T> {
160    fn new(inner: SplitStream<Transport<T>>) -> Self {
161        Self { inner }
162    }
163}
164
165impl<T: AsyncRead + AsyncWrite> futures::Stream for MessageStream<T> {
166    type Item = Result<Message, io::Error>;
167
168    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169        self.project().inner.poll_next(cx)
170    }
171}
172
173/// A bidirectional LSP connection with split sender/receiver and request tracking.
174///
175/// `Connection` is the primary type for LSP communication. It provides:
176///
177/// - [`send()`](Connection::send): Send outbound messages (non-blocking, channel-based)
178/// - [`receiver_mut()`](Connection::receiver_mut): A stream for receiving inbound messages
179/// - [`request_queue`](Connection::request_queue): Tracking for pending requests
180///
181/// At construction, a background drain task is spawned to forward messages from
182/// an internal channel to the underlying transport. This means [`send()`](Connection::send)
183/// never blocks on I/O — it only enqueues the message.
184///
185/// # Type Parameters
186///
187/// - `T`: The underlying I/O stream type (`AsyncRead + AsyncWrite`)
188/// - `I`: Metadata type for incoming requests (default: `()`)
189///
190/// # Examples
191///
192/// ## Basic construction
193///
194/// ```
195/// use lsp_server_tokio::Connection;
196///
197/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
198/// let (stream, _) = tokio::io::duplex(4096);
199///
200/// // Simple connection with unit metadata types
201/// let conn: Connection<_, ()> = Connection::new(stream);
202/// # });
203/// ```
204///
205/// ## With custom metadata types
206///
207/// ```
208/// use lsp_server_tokio::Connection;
209///
210/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
211/// let (stream, _) = tokio::io::duplex(4096);
212///
213/// // Connection tracking method names for incoming, JSON values for outgoing
214/// let conn: Connection<_, String> = Connection::new(stream);
215/// # });
216/// ```
217pub struct Connection<T, I = ()>
218where
219    T: AsyncRead + AsyncWrite,
220{
221    /// Channel for sending outbound messages to the background drain task.
222    ///
223    /// The drain task forwards messages to the underlying transport sink.
224    /// This channel is always available — no `Option`, no "taken" state.
225    outbound_tx: mpsc::UnboundedSender<Message>,
226
227    /// The receiver half for inbound messages.
228    ///
229    /// Use [`receiver_mut()`](Connection::receiver_mut) to access the stream,
230    /// or [`into_receiver()`](Connection::into_receiver) to take ownership.
231    receiver: MessageStream<T>,
232
233    /// Request queue for tracking pending incoming and outgoing requests.
234    ///
235    /// - `request_queue.incoming`: Track requests you've received and need to respond to
236    /// - `request_queue.outgoing`: Track requests you've sent and are awaiting responses for
237    pub request_queue: RequestQueue<I>,
238
239    /// The current lifecycle state of the connection.
240    lifecycle_state: LifecycleState,
241
242    /// Cancellation token that is triggered when shutdown is requested.
243    shutdown_token: CancellationToken,
244
245    /// Shared response routing state used by [`ClientSender`], if enabled.
246    response_map: Option<ResponseMap>,
247
248    /// Cancellation token that is triggered when the background drain task exits.
249    ///
250    /// The drain task holds a [`DropGuard`] for this token. When the task exits
251    /// (e.g., because the transport is broken), the token is cancelled, which
252    /// allows [`ClientSender::request`] to detect disconnection instead of
253    /// hanging forever waiting for a response that will never arrive.
254    drain_alive: CancellationToken,
255}
256
257impl<T, I> Connection<T, I>
258where
259    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
260{
261    /// Creates a new connection from an async I/O stream.
262    ///
263    /// This constructor wraps the stream with [`crate::LspCodec`] for Content-Length
264    /// framing, splits it into sender/receiver halves, and spawns a background
265    /// drain task for outbound messages.
266    ///
267    /// # Arguments
268    ///
269    /// * `io` - Any async I/O stream implementing `AsyncRead + AsyncWrite + Unpin + Send + 'static`
270    ///
271    /// # Example
272    ///
273    /// ```
274    /// use lsp_server_tokio::Connection;
275    ///
276    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
277    /// let (stream, _) = tokio::io::duplex(4096);
278    /// let conn: Connection<_, ()> = Connection::new(stream);
279    /// # });
280    /// ```
281    pub fn new(io: T) -> Self {
282        let transport = transport(io);
283        Self::from_transport(transport)
284    }
285
286    /// Creates a new connection from an existing transport.
287    ///
288    /// This is useful when you already have a [`Transport`] and want to
289    /// upgrade it to a full `Connection` with request tracking.
290    ///
291    /// # Arguments
292    ///
293    /// * `transport` - An existing LSP transport
294    ///
295    /// # Example
296    ///
297    /// ```
298    /// use lsp_server_tokio::{transport, Connection};
299    ///
300    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
301    /// let (stream, _) = tokio::io::duplex(4096);
302    /// let transport = transport(stream);
303    /// let conn: Connection<_, ()> = Connection::from_transport(transport);
304    /// # });
305    /// ```
306    pub fn from_transport(transport: Transport<T>) -> Self {
307        let (sender, receiver) = transport.split();
308        let (tx, drain_alive) = spawn_drain_task(sender);
309
310        Self {
311            outbound_tx: tx,
312            receiver: MessageStream::new(receiver),
313            request_queue: RequestQueue::new(),
314            lifecycle_state: LifecycleState::default(),
315            shutdown_token: CancellationToken::new(),
316            response_map: None,
317            drain_alive,
318        }
319    }
320
321    /// Creates a new connection with a pre-existing request queue.
322    ///
323    /// This constructor allows you to provide your own [`RequestQueue`], which
324    /// is useful for:
325    /// - Testing with pre-populated request state
326    /// - Migrating state between connections
327    /// - Using custom metadata types
328    ///
329    /// # Arguments
330    ///
331    /// * `io` - Any async I/O stream implementing `AsyncRead + AsyncWrite`
332    /// * `request_queue` - A pre-existing request queue
333    ///
334    /// # Example
335    ///
336    /// ```
337    /// use lsp_server_tokio::{Connection, RequestQueue};
338    ///
339    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
340    /// let (stream, _) = tokio::io::duplex(4096);
341    ///
342    /// // Create a queue with some pre-registered requests
343    /// use tokio_util::sync::CancellationToken;
344    /// let mut queue: RequestQueue<u32> = RequestQueue::new();
345    /// let token = CancellationToken::new();
346    /// queue.incoming.register(1.into(), 100, token);
347    ///
348    /// let conn = Connection::with_request_queue(stream, queue);
349    /// assert!(conn.request_queue.incoming.is_pending(&1.into()));
350    /// # });
351    /// ```
352    pub fn with_request_queue(io: T, request_queue: RequestQueue<I>) -> Self {
353        let transport = transport(io);
354        let (sender, receiver) = transport.split();
355        let (tx, drain_alive) = spawn_drain_task(sender);
356
357        Self {
358            outbound_tx: tx,
359            receiver: MessageStream::new(receiver),
360            request_queue,
361            lifecycle_state: LifecycleState::default(),
362            shutdown_token: CancellationToken::new(),
363            response_map: None,
364            drain_alive,
365        }
366    }
367
368    /// Returns the current lifecycle state.
369    #[must_use]
370    pub fn lifecycle_state(&self) -> LifecycleState {
371        self.lifecycle_state
372    }
373
374    /// Returns a token that is cancelled when shutdown is requested.
375    ///
376    /// Use this to gracefully stop background tasks when the server is
377    /// shutting down.
378    ///
379    /// # Example
380    ///
381    /// ```no_run
382    /// use lsp_server_tokio::Connection;
383    /// use tokio_util::sync::CancellationToken;
384    ///
385    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
386    /// let (stream, _) = tokio::io::duplex(4096);
387    /// let conn: Connection<_, ()> = Connection::new(stream);
388    ///
389    /// let token = conn.shutdown_token();
390    /// tokio::spawn(async move {
391    ///     loop {
392    ///         tokio::select! {
393    ///             _ = token.cancelled() => {
394    ///                 // Clean up and exit
395    ///                 break;
396    ///             }
397    ///             // ... other work ...
398    ///         }
399    ///     }
400    /// });
401    /// # });
402    /// ```
403    #[must_use]
404    pub fn shutdown_token(&self) -> CancellationToken {
405        self.shutdown_token.clone()
406    }
407
408    /// Returns a future that completes when shutdown is requested.
409    ///
410    /// This is equivalent to `self.shutdown_token().cancelled()` but more
411    /// convenient for simple use cases.
412    pub fn on_shutdown(&self) -> impl std::future::Future<Output = ()> + '_ {
413        self.shutdown_token.cancelled()
414    }
415}
416
417// Methods that only use the channel (no transport bounds needed beyond the struct constraint)
418impl<T, I> Connection<T, I>
419where
420    T: AsyncRead + AsyncWrite,
421{
422    /// Returns a mutable reference to the inbound message stream.
423    ///
424    /// Use with [`StreamExt::next()`](futures::StreamExt::next) to receive
425    /// messages from the peer.
426    ///
427    /// # Example
428    ///
429    /// ```
430    /// use futures::StreamExt;
431    /// use lsp_server_tokio::Connection;
432    ///
433    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
434    /// let (stream, _) = tokio::io::duplex(4096);
435    /// let mut conn: Connection<_, ()> = Connection::new(stream);
436    ///
437    /// // Poll the next inbound message
438    /// // let msg = conn.receiver_mut().next().await;
439    /// # });
440    /// ```
441    pub fn receiver_mut(&mut self) -> &mut MessageStream<T> {
442        &mut self.receiver
443    }
444
445    /// Consumes the connection and returns the inbound message stream.
446    ///
447    /// This is useful when you need to move the stream into a separate task.
448    ///
449    /// # Example
450    ///
451    /// ```
452    /// use futures::StreamExt;
453    /// use lsp_server_tokio::Connection;
454    ///
455    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
456    /// let (stream, _) = tokio::io::duplex(4096);
457    /// let conn: Connection<_, ()> = Connection::new(stream);
458    ///
459    /// let mut receiver = conn.into_receiver();
460    /// // tokio::spawn(async move { while let Some(msg) = receiver.next().await { ... } });
461    /// # });
462    /// ```
463    #[must_use]
464    pub fn into_receiver(self) -> MessageStream<T> {
465        self.receiver
466    }
467
468    /// Sends a message through the connection.
469    ///
470    /// Messages are forwarded to the background drain task which writes them
471    /// to the underlying transport. This method never blocks on I/O — it only
472    /// enqueues the message on an unbounded channel.
473    ///
474    /// # Errors
475    ///
476    /// Returns an error if the background drain task has exited (connection closed).
477    ///
478    /// # Example
479    ///
480    /// ```
481    /// use lsp_server_tokio::{Connection, Message, Request};
482    ///
483    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
484    /// let (client_stream, _server_stream) = tokio::io::duplex(4096);
485    /// let conn: Connection<_, ()> = Connection::new(client_stream);
486    ///
487    /// let request = Message::Request(Request::new(1, "test", None));
488    /// conn.send(request).unwrap();
489    /// # });
490    /// ```
491    pub fn send(&self, message: Message) -> Result<(), io::Error> {
492        self.outbound_tx
493            .send(message)
494            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "connection closed"))
495    }
496}
497
498// Message routing methods
499impl<T, I> Connection<T, I>
500where
501    T: AsyncRead + AsyncWrite,
502    I: Default,
503{
504    /// Routes an incoming message, delivering responses to pending outgoing requests.
505    ///
506    /// Call this method for each message received from the transport to classify it
507    /// and automatically deliver responses to their corresponding outgoing request receivers.
508    ///
509    /// For [`IncomingMessage::Request`](crate::IncomingMessage::Request) variants, the request
510    /// is automatically registered with a [`CancellationToken`] that is:
511    /// - Cancelled when `$/cancelRequest` is received for this request ID
512    /// - Cancelled when the connection shuts down
513    ///
514    /// # Returns
515    ///
516    /// - [`IncomingMessage::Request`](crate::IncomingMessage::Request) - Handle the request and send a response
517    /// - [`IncomingMessage::Notification`](crate::IncomingMessage::Notification) - Handle the notification
518    /// - [`IncomingMessage::ResponseRouted`](crate::IncomingMessage::ResponseRouted) - Response was delivered to awaiting receiver
519    /// - [`IncomingMessage::ResponseUnknown`](crate::IncomingMessage::ResponseUnknown) - No pending request for this response ID
520    ///
521    /// # Example
522    ///
523    /// ```no_run
524    /// use lsp_server_tokio::{Connection, Message, IncomingMessage, Response};
525    /// use futures::StreamExt;
526    ///
527    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
528    /// let (stream, _) = tokio::io::duplex(4096);
529    /// let mut conn: Connection<_, ()> = Connection::new(stream);
530    ///
531    /// while let Some(Ok(msg)) = conn.receiver_mut().next().await {
532    ///     match conn.route(msg) {
533    ///         IncomingMessage::Request(req, token) => {
534    ///             // Handle request with cooperative cancellation
535    ///             println!("Request: {}", req.method);
536    ///             // Use token.cancelled().await in select! for cancellation
537    ///             // After handling, call conn.request_queue.incoming.complete(&req.id)
538    ///         }
539    ///         IncomingMessage::Notification(notif) => {
540    ///             // Handle notification
541    ///             println!("Notification: {}", notif.method);
542    ///         }
543    ///         IncomingMessage::CancelHandled => {
544    ///             // `$/cancelRequest` was handled by route()
545    ///         }
546    ///         IncomingMessage::ResponseRouted => {
547    ///             // Response delivered to awaiting task
548    ///         }
549    ///         IncomingMessage::ResponseUnknown(resp) => {
550    ///             // Log unexpected response
551    ///             eprintln!("Unknown response: {:?}", resp.id);
552    ///         }
553    ///         _ => {}
554    ///     }
555    /// }
556    /// # });
557    /// ```
558    pub fn route(&mut self, message: Message) -> crate::IncomingMessage {
559        match message {
560            Message::Request(req) => {
561                let token = self.shutdown_token.child_token();
562                self.request_queue
563                    .incoming
564                    .register(req.id.clone(), I::default(), token.clone());
565                crate::IncomingMessage::Request(req, token)
566            }
567            Message::Notification(notif) => {
568                if notif.method == crate::request_queue::CANCEL_REQUEST_METHOD {
569                    if let Some(id) = crate::parse_cancel_params(&notif.params) {
570                        let _ = self.request_queue.incoming.cancel(&id);
571                        crate::IncomingMessage::CancelHandled
572                    } else {
573                        // Malformed cancel — let consumer handle it
574                        crate::IncomingMessage::Notification(notif)
575                    }
576                } else {
577                    crate::IncomingMessage::Notification(notif)
578                }
579            }
580            Message::Response(resp) => {
581                if let Some(id) = resp.id.clone() {
582                    // Try ClientSender's response map first (avoids cloning unless needed)
583                    if let Some(response_map) = self.response_map.as_ref() {
584                        if response_map.contains(&id) && response_map.deliver(&id, resp.clone()) {
585                            return crate::IncomingMessage::ResponseRouted;
586                        }
587                    }
588
589                    // Check if there's a pending request for this response
590                    if self.request_queue.outgoing.is_pending(&id) {
591                        // Complete the request - this sends the response to the awaiting receiver
592                        self.request_queue.outgoing.complete(&id, resp);
593                        crate::IncomingMessage::ResponseRouted
594                    } else {
595                        // No pending request for this ID
596                        crate::IncomingMessage::ResponseUnknown(resp)
597                    }
598                } else {
599                    // Response with null ID (parse error response)
600                    crate::IncomingMessage::ResponseUnknown(resp)
601                }
602            }
603        }
604    }
605
606    /// Cancels an incoming request by request ID.
607    ///
608    /// This is a convenience method that cancels the [`CancellationToken`] for a registered
609    /// incoming request. Use this when you receive a `$/cancelRequest` notification.
610    ///
611    /// # Arguments
612    ///
613    /// * `id` - The request ID to cancel
614    ///
615    /// # Returns
616    ///
617    /// `true` if the request was found and cancelled, `false` if not found.
618    ///
619    /// # Example
620    ///
621    /// ```
622    /// use lsp_server_tokio::Connection;
623    ///
624    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
625    /// let (stream, _) = tokio::io::duplex(4096);
626    /// let mut conn: Connection<_, ()> = Connection::new(stream);
627    ///
628    /// // After receiving $/cancelRequest for ID 42:
629    /// let was_cancelled = conn.cancel_incoming(42);
630    /// # });
631    /// ```
632    pub fn cancel_incoming(&mut self, id: impl Into<crate::RequestId>) -> bool {
633        self.request_queue.incoming.cancel(&id.into())
634    }
635
636    /// Returns a cloneable [`ClientSender`] for sending requests, responses,
637    /// and notifications.
638    ///
639    /// The first call initializes the response routing infrastructure.
640    /// Subsequent calls return a new handle to the same underlying channel.
641    ///
642    /// # Example
643    ///
644    /// ```
645    /// use lsp_server_tokio::{Connection, Response};
646    ///
647    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
648    /// let (stream, _) = tokio::io::duplex(4096);
649    /// let mut conn: Connection<_, ()> = Connection::new(stream);
650    ///
651    /// let sender1 = conn.client_sender();
652    /// let sender2 = conn.client_sender(); // returns another handle
653    /// # });
654    /// ```
655    #[must_use]
656    pub fn client_sender(&mut self) -> ClientSender {
657        if let Some(ref response_map) = self.response_map {
658            return ClientSender::new(
659                self.outbound_tx.clone(),
660                response_map.clone(),
661                self.drain_alive.clone(),
662            );
663        }
664
665        let response_map = ResponseMap::new();
666        self.response_map = Some(response_map.clone());
667        ClientSender::new(
668            self.outbound_tx.clone(),
669            response_map,
670            self.drain_alive.clone(),
671        )
672    }
673}
674
675// Outgoing request cancellation methods
676impl<T, I> Connection<T, I>
677where
678    T: AsyncRead + AsyncWrite,
679{
680    /// Cancels an outgoing request by sending $/cancelRequest and removing it from the queue.
681    ///
682    /// This method is for cancelling requests that **this connection sent** (outgoing requests).
683    /// For incoming request cancellation, use [`route()`](Self::route) which handles
684    /// `$/cancelRequest` notifications automatically.
685    ///
686    /// # Behavior
687    ///
688    /// 1. Sends a `$/cancelRequest` notification with the given request ID
689    /// 2. Removes the request from the outgoing queue (the awaiting receiver will get `RecvError`)
690    ///
691    /// # Returns
692    ///
693    /// - `Ok(true)` if the request was pending and cancelled
694    /// - `Ok(false)` if the request was not found in the queue
695    /// - `Err` if sending the notification failed
696    ///
697    /// # Errors
698    ///
699    /// Returns an error if the cancellation notification cannot be sent to the peer.
700    ///
701    /// # Example
702    ///
703    /// ```
704    /// use lsp_server_tokio::{Connection, Response};
705    /// use futures::SinkExt;
706    ///
707    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
708    /// let (client_stream, server_stream) = tokio::io::duplex(4096);
709    /// let mut conn: Connection<_, ()> = Connection::new(client_stream);
710    ///
711    /// // Register an outgoing request
712    /// let rx = conn.request_queue.outgoing.register(42.into());
713    ///
714    /// // Cancel it
715    /// let was_pending = conn.cancel(42).unwrap();
716    /// assert!(was_pending);
717    ///
718    /// // The receiver will get an error
719    /// assert!(rx.await.is_err());
720    /// # });
721    /// ```
722    pub fn cancel(&mut self, id: impl Into<crate::RequestId>) -> Result<bool, std::io::Error> {
723        use crate::request_queue::CANCEL_REQUEST_METHOD;
724
725        let id = id.into();
726
727        // Build the $/cancelRequest notification
728        let notification =
729            crate::Notification::new(CANCEL_REQUEST_METHOD, Some(serde_json::json!({"id": id})));
730
731        // Send the notification
732        self.send(Message::Notification(notification))?;
733
734        // Cancel in the queue (returns true if was pending)
735        Ok(self.request_queue.outgoing.cancel(&id))
736    }
737}
738
739// Lifecycle management methods
740impl<T, I> Connection<T, I>
741where
742    T: AsyncRead + AsyncWrite,
743{
744    /// Waits for the initialize request from the client.
745    ///
746    /// This method blocks until an initialize request is received, rejecting
747    /// any other requests with `ServerNotInitialized` error and dropping
748    /// notifications (except exit which disconnects).
749    ///
750    /// Returns the request ID and params for the initialize request.
751    /// You must call [`initialize_finish()`](Self::initialize_finish) with the
752    /// same ID to complete the handshake.
753    ///
754    /// # Errors
755    ///
756    /// - [`ProtocolError::Disconnected`] if the connection is closed or exit notification received
757    /// - [`ProtocolError::Io`] if an I/O error occurs
758    ///
759    /// # Example
760    ///
761    /// ```no_run
762    /// use lsp_server_tokio::Connection;
763    ///
764    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
765    /// let (stream, _) = tokio::io::duplex(4096);
766    /// let mut conn: Connection<_, ()> = Connection::new(stream);
767    ///
768    /// let (id, params) = conn.initialize_start().await.unwrap();
769    /// // Process params, build the full InitializeResult payload...
770    /// let capabilities = serde_json::json!({"textDocumentSync": 1});
771    /// let result = serde_json::json!({
772    ///     "capabilities": capabilities,
773    ///     "serverInfo": {"name": "my-server", "version": "0.1.0"}
774    /// });
775    /// conn.initialize_finish(id, result).await.unwrap();
776    /// # });
777    /// ```
778    pub async fn initialize_start(
779        &mut self,
780    ) -> Result<(crate::RequestId, serde_json::Value), ProtocolError> {
781        loop {
782            match self.receiver_mut().next().await {
783                Some(Ok(Message::Request(req))) => {
784                    if req.method == "initialize" {
785                        self.lifecycle_state = LifecycleState::Initializing;
786                        return Ok((req.id, req.params.unwrap_or(serde_json::Value::Null)));
787                    }
788
789                    // Reject non-initialize requests with ServerNotInitialized
790                    let error = crate::ResponseError::new(
791                        crate::ErrorCode::ServerNotInitialized,
792                        "Server not yet initialized",
793                    );
794                    let response = Message::Response(crate::Response::err(req.id, error));
795                    if let Err(e) = self.send(response) {
796                        return Err(ProtocolError::Io(e));
797                    }
798                    // Continue waiting for initialize
799                }
800                Some(Ok(Message::Notification(notif))) => {
801                    if notif.method == "exit" {
802                        return Err(ProtocolError::Disconnected);
803                    }
804                    // Drop other notifications silently
805                }
806                Some(Ok(Message::Response(_))) => {
807                    // Unexpected response, ignore
808                }
809                Some(Err(e)) => {
810                    return Err(ProtocolError::Io(e));
811                }
812                None => {
813                    return Err(ProtocolError::Disconnected);
814                }
815            }
816        }
817    }
818
819    /// Completes the initialization handshake.
820    ///
821    /// Sends the `InitializeResult` response and waits for the initialized
822    /// notification from the client. After this returns `Ok(())`, the
823    /// connection is in Running state and ready for normal operation.
824    ///
825    /// # Arguments
826    ///
827    /// * `id` - The request ID from [`initialize_start()`](Self::initialize_start)
828    /// * `initialize_result` - The full `InitializeResult` value as JSON, sent
829    ///   verbatim as the response result. Must include `capabilities` and may
830    ///   include `serverInfo` or any other fields defined by the LSP spec.
831    ///
832    /// # Errors
833    ///
834    /// - [`ProtocolError::Disconnected`] if the connection is closed
835    /// - [`ProtocolError::InitializeTimeout`] if the client does not send
836    ///   `initialized` within 60 seconds
837    /// - [`ProtocolError::Io`] if an I/O error occurs
838    ///
839    /// # Example
840    ///
841    /// ```no_run
842    /// use lsp_server_tokio::Connection;
843    ///
844    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
845    /// let (stream, _) = tokio::io::duplex(4096);
846    /// let mut conn: Connection<_, ()> = Connection::new(stream);
847    ///
848    /// let (id, _params) = conn.initialize_start().await.unwrap();
849    /// let result = serde_json::json!({
850    ///     "capabilities": {"textDocumentSync": 1},
851    ///     "serverInfo": {"name": "my-server", "version": "0.1.0"}
852    /// });
853    /// conn.initialize_finish(id, result).await.unwrap();
854    ///
855    /// assert!(conn.is_running());
856    /// # });
857    /// ```
858    pub async fn initialize_finish(
859        &mut self,
860        id: crate::RequestId,
861        initialize_result: serde_json::Value,
862    ) -> Result<(), ProtocolError> {
863        use std::time::Duration;
864
865        // Send the response with the full InitializeResult verbatim
866        let response = Message::Response(crate::Response::ok(id, initialize_result));
867        if let Err(e) = self.send(response) {
868            return Err(ProtocolError::Io(e));
869        }
870
871        tokio::time::timeout(Duration::from_mins(1), async {
872            loop {
873                match self.receiver_mut().next().await {
874                    Some(Ok(Message::Notification(notif))) => {
875                        if notif.method == "initialized" {
876                            self.lifecycle_state = LifecycleState::Running;
877                            return Ok(());
878                        }
879                        // Drop other notifications silently
880                    }
881                    Some(Ok(Message::Request(req))) => {
882                        // Still initializing, reject with ServerNotInitialized
883                        let error = crate::ResponseError::new(
884                            crate::ErrorCode::ServerNotInitialized,
885                            "Server not yet initialized",
886                        );
887                        let response = Message::Response(crate::Response::err(req.id, error));
888                        if let Err(e) = self.send(response) {
889                            return Err(ProtocolError::Io(e));
890                        }
891                    }
892                    Some(Ok(Message::Response(_))) => {
893                        // Ignore unexpected responses
894                    }
895                    Some(Err(e)) => {
896                        return Err(ProtocolError::Io(e));
897                    }
898                    None => {
899                        return Err(ProtocolError::Disconnected);
900                    }
901                }
902            }
903        })
904        .await
905        .map_err(|_| ProtocolError::InitializeTimeout)?
906    }
907
908    /// Performs complete LSP initialization handshake.
909    ///
910    /// This is a convenience method that calls [`initialize_start()`](Self::initialize_start)
911    /// followed by [`initialize_finish()`](Self::initialize_finish).
912    /// Returns the initialize params from the client.
913    ///
914    /// Unlike [`initialize_finish()`](Self::initialize_finish) which takes a full
915    /// `InitializeResult`, this method takes just the server capabilities and
916    /// wraps them in `{"capabilities": ...}` automatically.
917    ///
918    /// # Arguments
919    ///
920    /// * `server_capabilities` - The server's capabilities as JSON (will be
921    ///   wrapped in `{"capabilities": server_capabilities}`)
922    ///
923    /// # Errors
924    ///
925    /// - [`ProtocolError::Disconnected`] if the connection is closed
926    /// - [`ProtocolError::Io`] if an I/O error occurs
927    ///
928    /// # Example
929    ///
930    /// ```no_run
931    /// use lsp_server_tokio::Connection;
932    ///
933    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
934    /// let (stream, _) = tokio::io::duplex(4096);
935    /// let mut conn: Connection<_, ()> = Connection::new(stream);
936    ///
937    /// let capabilities = serde_json::json!({"textDocumentSync": 1});
938    /// let client_params = conn.initialize(capabilities).await.unwrap();
939    /// println!("Client capabilities: {}", client_params);
940    /// # });
941    /// ```
942    pub async fn initialize(
943        &mut self,
944        server_capabilities: serde_json::Value,
945    ) -> Result<serde_json::Value, ProtocolError> {
946        let (id, params) = self.initialize_start().await?;
947        let initialize_result = serde_json::json!({
948            "capabilities": server_capabilities
949        });
950        self.initialize_finish(id, initialize_result).await?;
951        Ok(params)
952    }
953
954    /// Handles a shutdown request.
955    ///
956    /// Transitions to `ShuttingDown` state, cancels the shutdown token,
957    /// and sends a null response. After this, only exit notification
958    /// should be received.
959    ///
960    /// # Arguments
961    ///
962    /// * `id` - The request ID of the shutdown request
963    ///
964    /// # Errors
965    ///
966    /// - [`ProtocolError::Io`] if sending the response fails
967    ///
968    /// # Example
969    ///
970    /// ```no_run
971    /// use lsp_server_tokio::{Connection, Message};
972    /// use futures::StreamExt;
973    ///
974    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
975    /// let (stream, _) = tokio::io::duplex(4096);
976    /// let mut conn: Connection<_, ()> = Connection::new(stream);
977    ///
978    /// // ... after initialization ...
979    /// // When shutdown request is received:
980    /// // if let Message::Request(req) = msg && req.method == "shutdown" {
981    /// //     conn.handle_shutdown(req.id).unwrap();
982    /// //     assert!(conn.is_shutting_down());
983    /// // }
984    /// # });
985    /// ```
986    pub fn handle_shutdown(&mut self, id: crate::RequestId) -> Result<(), ProtocolError> {
987        // Cancel shutdown token first to notify waiting tasks
988        self.shutdown_token.cancel();
989
990        // Transition state
991        self.lifecycle_state = LifecycleState::ShuttingDown;
992
993        // Send null response
994        let response = Message::Response(crate::Response::ok(id, serde_json::Value::Null));
995        if let Err(e) = self.send(response) {
996            return Err(ProtocolError::Io(e));
997        }
998
999        Ok(())
1000    }
1001
1002    /// Handles the exit notification.
1003    ///
1004    /// Returns [`ExitCode::Success`] (exit code 0) if shutdown was received first,
1005    /// or [`ExitCode::Error`] (exit code 1) if exit came without shutdown.
1006    ///
1007    /// # Example
1008    ///
1009    /// ```
1010    /// use lsp_server_tokio::{Connection, ExitCode, LifecycleState};
1011    ///
1012    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
1013    /// let (stream, _) = tokio::io::duplex(4096);
1014    /// let mut conn: Connection<_, ()> = Connection::new(stream);
1015    ///
1016    /// // Exit without shutdown - dirty exit
1017    /// let code = conn.handle_exit();
1018    /// assert_eq!(code, ExitCode::Error);
1019    /// # });
1020    /// ```
1021    pub fn handle_exit(&mut self) -> ExitCode {
1022        let was_shutting_down = self.lifecycle_state == LifecycleState::ShuttingDown;
1023        self.lifecycle_state = LifecycleState::Exited;
1024
1025        if was_shutting_down {
1026            ExitCode::Success
1027        } else {
1028            ExitCode::Error
1029        }
1030    }
1031
1032    /// Returns true if the connection is in Running state.
1033    ///
1034    /// The connection is in Running state after successful initialization
1035    /// and before shutdown is requested.
1036    #[must_use]
1037    pub fn is_running(&self) -> bool {
1038        self.lifecycle_state == LifecycleState::Running
1039    }
1040
1041    /// Returns true if shutdown has been requested.
1042    ///
1043    /// After shutdown, the server should only expect the exit notification.
1044    #[must_use]
1045    pub fn is_shutting_down(&self) -> bool {
1046        self.lifecycle_state == LifecycleState::ShuttingDown
1047    }
1048}
1049
1050pin_project! {
1051    /// A combined stdin/stdout stream for LSP server communication.
1052    ///
1053    /// This type wraps tokio's [`Stdin`] and [`Stdout`] into a single type that
1054    /// implements both [`AsyncRead`] and [`AsyncWrite`], suitable for use with
1055    /// [`Connection`].
1056    ///
1057    /// This is typically used internally by [`Connection::stdio()`] and doesn't
1058    /// need to be constructed directly.
1059    pub struct StdioTransport {
1060        #[pin]
1061        stdin: Stdin,
1062        #[pin]
1063        stdout: Stdout,
1064    }
1065}
1066
1067impl StdioTransport {
1068    /// Creates a new stdio transport from stdin and stdout.
1069    #[must_use]
1070    pub fn new() -> Self {
1071        Self {
1072            stdin: tokio::io::stdin(),
1073            stdout: tokio::io::stdout(),
1074        }
1075    }
1076}
1077
1078impl Default for StdioTransport {
1079    fn default() -> Self {
1080        Self::new()
1081    }
1082}
1083
1084impl AsyncRead for StdioTransport {
1085    fn poll_read(
1086        self: Pin<&mut Self>,
1087        cx: &mut Context<'_>,
1088        buf: &mut ReadBuf<'_>,
1089    ) -> Poll<io::Result<()>> {
1090        self.project().stdin.poll_read(cx, buf)
1091    }
1092}
1093
1094impl AsyncWrite for StdioTransport {
1095    fn poll_write(
1096        self: Pin<&mut Self>,
1097        cx: &mut Context<'_>,
1098        buf: &[u8],
1099    ) -> Poll<io::Result<usize>> {
1100        self.project().stdout.poll_write(cx, buf)
1101    }
1102
1103    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1104        self.project().stdout.poll_flush(cx)
1105    }
1106
1107    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1108        self.project().stdout.poll_shutdown(cx)
1109    }
1110}
1111
1112impl Connection<StdioTransport, ()> {
1113    /// Creates a connection using stdin for reading and stdout for writing.
1114    ///
1115    /// This is the typical constructor for LSP servers that communicate
1116    /// over standard I/O. Uses unit types `()` for request queue metadata.
1117    ///
1118    /// # Example
1119    ///
1120    /// ```no_run
1121    /// use futures::StreamExt;
1122    /// use lsp_server_tokio::{Connection, Message};
1123    ///
1124    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
1125    /// let mut conn = Connection::stdio();
1126    ///
1127    /// // Process incoming messages
1128    /// while let Some(result) = conn.receiver_mut().next().await {
1129    ///     match result {
1130    ///         Ok(Message::Request(req)) => {
1131    ///             println!("Received request: {}", req.method);
1132    ///             // Handle request...
1133    ///         }
1134    ///         Ok(Message::Notification(notif)) => {
1135    ///             println!("Received notification: {}", notif.method);
1136    ///         }
1137    ///         Ok(Message::Response(resp)) => {
1138    ///             println!("Received response for: {:?}", resp.id);
1139    ///         }
1140    ///         Err(e) => {
1141    ///             eprintln!("Error: {}", e);
1142    ///             break;
1143    ///         }
1144    ///     }
1145    /// }
1146    /// # });
1147    /// ```
1148    ///
1149    /// # Note
1150    ///
1151    /// This constructor uses unit types `()` for both incoming and outgoing
1152    /// request metadata. If you need custom metadata types, use
1153    /// [`Connection::new()`] with a [`StdioTransport`] directly:
1154    ///
1155    /// ```no_run
1156    /// use lsp_server_tokio::{Connection, StdioTransport};
1157    ///
1158    /// let conn: Connection<StdioTransport, String> = Connection::new(StdioTransport::new());
1159    /// ```
1160    #[must_use]
1161    pub fn stdio() -> Self {
1162        Self::new(StdioTransport::new())
1163    }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use super::*;
1169    use crate::{Request, Response, StdioConnection};
1170    use serde_json::json;
1171    use std::time::Duration;
1172
1173    #[tokio::test]
1174    async fn connection_from_duplex_test() {
1175        let (client_stream, server_stream) = tokio::io::duplex(4096);
1176        let client: Connection<_, ()> = Connection::new(client_stream);
1177        let mut server: Connection<_, ()> = Connection::new(server_stream);
1178
1179        // Send request from client
1180        let request = Message::Request(Request::new(1, "test", None));
1181        client.send(request).unwrap();
1182
1183        // Receive on server
1184        let received = server.receiver_mut().next().await.unwrap().unwrap();
1185        assert!(received.is_request());
1186        if let Message::Request(req) = received {
1187            assert_eq!(req.method, "test");
1188            assert_eq!(req.id, 1.into());
1189        } else {
1190            panic!("Expected Request");
1191        }
1192    }
1193
1194    #[tokio::test]
1195    async fn connection_bidirectional_test() {
1196        let (client_stream, server_stream) = tokio::io::duplex(4096);
1197        let mut client: Connection<_, ()> = Connection::new(client_stream);
1198        let mut server: Connection<_, ()> = Connection::new(server_stream);
1199
1200        // Client sends request
1201        let request = Message::Request(Request::new(
1202            1,
1203            "textDocument/hover",
1204            Some(json!({
1205                "textDocument": {"uri": "file:///test.rs"},
1206                "position": {"line": 10, "character": 5}
1207            })),
1208        ));
1209        client.send(request).unwrap();
1210
1211        // Server receives request
1212        let received = server.receiver_mut().next().await.unwrap().unwrap();
1213        assert!(received.is_request());
1214
1215        // Server sends response
1216        let response = Message::Response(Response::ok(
1217            1,
1218            json!({
1219                "contents": "fn main()"
1220            }),
1221        ));
1222        server.send(response).unwrap();
1223
1224        // Client receives response
1225        let received = client.receiver_mut().next().await.unwrap().unwrap();
1226        assert!(received.is_response());
1227        if let Message::Response(resp) = received {
1228            assert_eq!(resp.id, Some(1.into()));
1229            assert!(resp.result().is_some());
1230        } else {
1231            panic!("Expected Response");
1232        }
1233    }
1234
1235    #[tokio::test]
1236    async fn connection_from_transport_test() {
1237        let (client_stream, server_stream) = tokio::io::duplex(4096);
1238
1239        // Create transports manually
1240        let client_transport = transport(client_stream);
1241        let server_transport = transport(server_stream);
1242
1243        // Create connections from transports
1244        let client: Connection<_, ()> = Connection::from_transport(client_transport);
1245        let mut server: Connection<_, ()> = Connection::from_transport(server_transport);
1246
1247        // Verify functionality
1248        let request = Message::Request(Request::new(42, "test", None));
1249        client.send(request).unwrap();
1250
1251        let received = server.receiver_mut().next().await.unwrap().unwrap();
1252        assert!(received.is_request());
1253        if let Message::Request(req) = received {
1254            assert_eq!(req.id, 42.into());
1255        }
1256    }
1257
1258    #[tokio::test]
1259    async fn connection_multiple_messages_test() {
1260        let (client_stream, server_stream) = tokio::io::duplex(4096);
1261        let client: Connection<_, ()> = Connection::new(client_stream);
1262        let mut server: Connection<_, ()> = Connection::new(server_stream);
1263
1264        // Send 3 messages in sequence
1265        let msg1 = Message::Request(Request::new(1, "first", None));
1266        let msg2 = Message::Request(Request::new(2, "second", None));
1267        let msg3 = Message::Request(Request::new(3, "third", None));
1268
1269        client.send(msg1).unwrap();
1270        client.send(msg2).unwrap();
1271        client.send(msg3).unwrap();
1272
1273        // Receive all 3 - order must be preserved
1274        let recv1 = server.receiver_mut().next().await.unwrap().unwrap();
1275        let recv2 = server.receiver_mut().next().await.unwrap().unwrap();
1276        let recv3 = server.receiver_mut().next().await.unwrap().unwrap();
1277
1278        if let Message::Request(r) = recv1 {
1279            assert_eq!(r.method, "first");
1280            assert_eq!(r.id, 1.into());
1281        } else {
1282            panic!("Expected request");
1283        }
1284
1285        if let Message::Request(r) = recv2 {
1286            assert_eq!(r.method, "second");
1287            assert_eq!(r.id, 2.into());
1288        } else {
1289            panic!("Expected request");
1290        }
1291
1292        if let Message::Request(r) = recv3 {
1293            assert_eq!(r.method, "third");
1294            assert_eq!(r.id, 3.into());
1295        } else {
1296            panic!("Expected request");
1297        }
1298    }
1299
1300    #[tokio::test]
1301    async fn sender_receiver_independent_test() {
1302        let (client_stream, server_stream) = tokio::io::duplex(4096);
1303        let client: Connection<_, ()> = Connection::new(client_stream);
1304        let server: Connection<_, ()> = Connection::new(server_stream);
1305
1306        // Send from one task, receive from another
1307        let send_task =
1308            tokio::spawn(
1309                async move { client.send(Message::Request(Request::new(1, "test", None))) },
1310            );
1311
1312        let mut server_receiver = server.receiver;
1313        let recv_task = tokio::spawn(async move { server_receiver.next().await });
1314
1315        let (send_result, recv_result) = tokio::join!(send_task, recv_task);
1316        send_result.unwrap().unwrap();
1317        assert!(recv_result.unwrap().unwrap().unwrap().is_request());
1318    }
1319
1320    #[tokio::test]
1321    async fn connection_has_request_queue_test() {
1322        let (stream, _) = tokio::io::duplex(4096);
1323        let mut conn: Connection<_, String> = Connection::new(stream);
1324
1325        // Use request queue to track an incoming request
1326        let token = CancellationToken::new();
1327        conn.request_queue
1328            .incoming
1329            .register(1.into(), "handler_data".to_string(), token);
1330        assert!(conn.request_queue.incoming.is_pending(&1.into()));
1331
1332        // Complete it
1333        let data = conn.request_queue.incoming.complete(&1.into());
1334        assert_eq!(data, Some("handler_data".to_string()));
1335    }
1336
1337    #[tokio::test]
1338    async fn connection_with_request_queue_test() {
1339        let (stream, _) = tokio::io::duplex(4096);
1340        let mut queue: RequestQueue<u32> = RequestQueue::new();
1341        let token = CancellationToken::new();
1342        queue.incoming.register(42.into(), 100, token);
1343
1344        let conn = Connection::with_request_queue(stream, queue);
1345        assert!(conn.request_queue.incoming.is_pending(&42.into()));
1346    }
1347
1348    #[tokio::test]
1349    async fn connection_outgoing_request_queue_test() {
1350        let (stream, _) = tokio::io::duplex(4096);
1351        let mut conn: Connection<_, ()> = Connection::new(stream);
1352
1353        // Register an outgoing request
1354        let rx = conn.request_queue.outgoing.register(1.into());
1355        assert!(conn.request_queue.outgoing.is_pending(&1.into()));
1356
1357        // Complete it with a response
1358        let completed = conn.request_queue.outgoing.complete(
1359            &1.into(),
1360            Response::ok(1, serde_json::json!("response data")),
1361        );
1362        assert!(completed);
1363
1364        // Receiver gets the response
1365        let response = rx.await.unwrap();
1366        assert_eq!(response.id, Some(1.into()));
1367        assert_eq!(
1368            response.result().cloned(),
1369            Some(serde_json::json!("response data"))
1370        );
1371    }
1372
1373    // Note: Connection::stdio() cannot be tested in unit tests as it requires
1374    // actual stdin/stdout. It will be tested in E2E tests in Phase 9.
1375    // However, we can test that StdioTransport can be constructed.
1376    #[test]
1377    fn stdio_transport_constructible() {
1378        // Just verify it compiles and can be constructed
1379        // We can't actually test I/O without real stdio
1380        let _transport = StdioTransport::new();
1381        let _transport_default = StdioTransport::default();
1382    }
1383
1384    // =========================================================================
1385    // Lifecycle Tests
1386    // =========================================================================
1387
1388    use crate::{ExitCode, LifecycleState, Notification, ProtocolError};
1389
1390    #[tokio::test]
1391    async fn test_initialize_handshake() {
1392        let (client_stream, server_stream) = tokio::io::duplex(4096);
1393        let mut client: Connection<_, ()> = Connection::new(client_stream);
1394        let mut server: Connection<_, ()> = Connection::new(server_stream);
1395
1396        // Client sends initialize request
1397        let init_params = json!({"processId": 1234, "capabilities": {}});
1398        let init_request =
1399            Message::Request(Request::new(1, "initialize", Some(init_params.clone())));
1400        client.send(init_request).unwrap();
1401
1402        // Server waits for initialize
1403        let (id, params) = server.initialize_start().await.unwrap();
1404        assert_eq!(id, 1.into());
1405        assert_eq!(params["processId"], 1234);
1406        assert_eq!(server.lifecycle_state(), LifecycleState::Initializing);
1407
1408        // Spawn task to handle server's initialize_finish
1409        let server_task = tokio::spawn(async move {
1410            let result = json!({"capabilities": {"textDocumentSync": 1}});
1411            server.initialize_finish(id, result).await.unwrap();
1412            server
1413        });
1414
1415        // Client receives InitializeResult
1416        let response = client.receiver_mut().next().await.unwrap().unwrap();
1417        assert!(response.is_response());
1418        if let Message::Response(resp) = response {
1419            assert_eq!(resp.id, Some(1.into()));
1420            assert!(resp.result().is_some());
1421            let result = resp.into_result().unwrap();
1422            assert_eq!(result["capabilities"]["textDocumentSync"], 1);
1423        }
1424
1425        // Client sends initialized notification
1426        let initialized = Message::Notification(Notification::new("initialized", None));
1427        client.send(initialized).unwrap();
1428
1429        // Server's initialize_finish completes
1430        let server = server_task.await.unwrap();
1431        assert!(server.is_running());
1432        assert_eq!(server.lifecycle_state(), LifecycleState::Running);
1433    }
1434
1435    #[tokio::test]
1436    async fn stdio_connection_alias_constructs_with_custom_metadata() {
1437        let conn: StdioConnection<String> = Connection::new(StdioTransport::new());
1438
1439        assert_eq!(conn.lifecycle_state(), LifecycleState::Uninitialized);
1440        assert!(!conn.request_queue.incoming.is_pending(&1.into()));
1441        assert!(!conn.request_queue.outgoing.is_pending(&1.into()));
1442    }
1443
1444    #[tokio::test]
1445    async fn test_initialize_rejects_non_init_requests() {
1446        let (client_stream, server_stream) = tokio::io::duplex(4096);
1447        let mut client: Connection<_, ()> = Connection::new(client_stream);
1448        let mut server: Connection<_, ()> = Connection::new(server_stream);
1449
1450        // Client sends a non-initialize request first
1451        let hover_request = Message::Request(Request::new(1, "textDocument/hover", None));
1452        client.send(hover_request).unwrap();
1453
1454        // Spawn server's initialize_start
1455        let server_task = tokio::spawn(async move {
1456            server.initialize_start().await.unwrap();
1457            server
1458        });
1459
1460        // Client receives ServerNotInitialized error
1461        let response = client.receiver_mut().next().await.unwrap().unwrap();
1462        assert!(response.is_response());
1463        if let Message::Response(resp) = response {
1464            assert_eq!(resp.id, Some(1.into()));
1465            assert!(resp.error().is_some());
1466            let error = resp.into_error().unwrap();
1467            assert_eq!(error.code, crate::ErrorCode::ServerNotInitialized as i32);
1468        }
1469
1470        // Now client sends initialize - should be accepted
1471        let init_request = Message::Request(Request::new(2, "initialize", None));
1472        client.send(init_request).unwrap();
1473
1474        let server = server_task.await.unwrap();
1475        assert_eq!(server.lifecycle_state(), LifecycleState::Initializing);
1476    }
1477
1478    #[tokio::test(start_paused = true)]
1479    async fn test_initialize_finish_times_out_without_initialized() {
1480        let (client_stream, server_stream) = tokio::io::duplex(4096);
1481        let mut client: Connection<_, ()> = Connection::new(client_stream);
1482        let mut server: Connection<_, ()> = Connection::new(server_stream);
1483
1484        client
1485            .send(Message::Request(Request::new(1, "initialize", None)))
1486            .unwrap();
1487
1488        let (id, _params) = server.initialize_start().await.unwrap();
1489
1490        let server_task = tokio::spawn(async move {
1491            server
1492                .initialize_finish(id, json!({"capabilities": {}}))
1493                .await
1494        });
1495
1496        let response = client.receiver_mut().next().await.unwrap().unwrap();
1497        assert!(response.is_response());
1498
1499        tokio::time::advance(Duration::from_secs(61)).await;
1500
1501        let result = server_task.await.unwrap();
1502        assert!(matches!(result, Err(ProtocolError::InitializeTimeout)));
1503    }
1504
1505    #[tokio::test]
1506    async fn test_initialize_drops_notifications() {
1507        let (client_stream, server_stream) = tokio::io::duplex(4096);
1508        let client: Connection<_, ()> = Connection::new(client_stream);
1509        let mut server: Connection<_, ()> = Connection::new(server_stream);
1510
1511        // Client sends random notification before init
1512        let random_notif = Message::Notification(Notification::new("textDocument/didOpen", None));
1513        client.send(random_notif).unwrap();
1514
1515        // Client sends initialize request
1516        let init_request = Message::Request(Request::new(1, "initialize", None));
1517        client.send(init_request).unwrap();
1518
1519        // Server's initialize_start should skip notification and find initialize
1520        let (id, _params) = server.initialize_start().await.unwrap();
1521        assert_eq!(id, 1.into());
1522        assert_eq!(server.lifecycle_state(), LifecycleState::Initializing);
1523    }
1524
1525    #[tokio::test]
1526    async fn test_exit_during_init_disconnects() {
1527        let (client_stream, server_stream) = tokio::io::duplex(4096);
1528        let client: Connection<_, ()> = Connection::new(client_stream);
1529        let mut server: Connection<_, ()> = Connection::new(server_stream);
1530
1531        // Client sends exit notification instead of initialize
1532        let exit_notif = Message::Notification(Notification::new("exit", None));
1533        client.send(exit_notif).unwrap();
1534
1535        // Server's initialize_start should return Disconnected
1536        let result = server.initialize_start().await;
1537        assert!(matches!(result, Err(ProtocolError::Disconnected)));
1538    }
1539
1540    #[tokio::test]
1541    async fn test_shutdown_then_exit() {
1542        let (client_stream, server_stream) = tokio::io::duplex(4096);
1543        let mut client: Connection<_, ()> = Connection::new(client_stream);
1544        let mut server: Connection<_, ()> = Connection::new(server_stream);
1545
1546        // Complete initialization
1547        let init_request = Message::Request(Request::new(1, "initialize", None));
1548        client.send(init_request).unwrap();
1549
1550        let (id, _params) = server.initialize_start().await.unwrap();
1551
1552        let server_task = tokio::spawn(async move {
1553            server
1554                .initialize_finish(id, json!({"capabilities": {}}))
1555                .await
1556                .unwrap();
1557            server
1558        });
1559
1560        let _ = client.receiver_mut().next().await; // Receive InitializeResult
1561        let initialized = Message::Notification(Notification::new("initialized", None));
1562        client.send(initialized).unwrap();
1563
1564        let mut server = server_task.await.unwrap();
1565        assert!(server.is_running());
1566
1567        // Client sends shutdown request
1568        let shutdown_request = Message::Request(Request::new(2, "shutdown", None));
1569        client.send(shutdown_request).unwrap();
1570
1571        // Server receives and handles shutdown
1572        let msg = server.receiver_mut().next().await.unwrap().unwrap();
1573        if let Message::Request(req) = msg {
1574            assert_eq!(req.method, "shutdown");
1575            server.handle_shutdown(req.id).unwrap();
1576        } else {
1577            panic!("Expected shutdown request");
1578        }
1579
1580        // Verify shutdown state
1581        assert!(server.is_shutting_down());
1582        assert!(server.shutdown_token().is_cancelled());
1583
1584        // Client receives null response
1585        let response = client.receiver_mut().next().await.unwrap().unwrap();
1586        if let Message::Response(resp) = response {
1587            assert_eq!(resp.id, Some(2.into()));
1588            assert_eq!(resp.result().cloned(), Some(serde_json::Value::Null));
1589        }
1590
1591        // Client sends exit
1592        // Server handles exit
1593        let exit_code = server.handle_exit();
1594        assert_eq!(exit_code, ExitCode::Success);
1595        assert_eq!(server.lifecycle_state(), LifecycleState::Exited);
1596    }
1597
1598    #[tokio::test]
1599    async fn test_exit_without_shutdown() {
1600        let (client_stream, server_stream) = tokio::io::duplex(4096);
1601        let mut client: Connection<_, ()> = Connection::new(client_stream);
1602        let mut server: Connection<_, ()> = Connection::new(server_stream);
1603
1604        // Complete initialization
1605        let init_request = Message::Request(Request::new(1, "initialize", None));
1606        client.send(init_request).unwrap();
1607
1608        let (id, _params) = server.initialize_start().await.unwrap();
1609
1610        let server_task = tokio::spawn(async move {
1611            server
1612                .initialize_finish(id, json!({"capabilities": {}}))
1613                .await
1614                .unwrap();
1615            server
1616        });
1617
1618        let _ = client.receiver_mut().next().await;
1619        let initialized = Message::Notification(Notification::new("initialized", None));
1620        client.send(initialized).unwrap();
1621
1622        let mut server = server_task.await.unwrap();
1623        assert!(server.is_running());
1624
1625        // Server receives exit without shutdown - dirty exit
1626        let exit_code = server.handle_exit();
1627        assert_eq!(exit_code, ExitCode::Error);
1628        assert_eq!(server.lifecycle_state(), LifecycleState::Exited);
1629    }
1630
1631    #[tokio::test]
1632    async fn test_on_shutdown_future() {
1633        let (client_stream, server_stream) = tokio::io::duplex(4096);
1634        let mut client: Connection<_, ()> = Connection::new(client_stream);
1635        let mut server: Connection<_, ()> = Connection::new(server_stream);
1636
1637        // Complete initialization
1638        let init_request = Message::Request(Request::new(1, "initialize", None));
1639        client.send(init_request).unwrap();
1640
1641        let (id, _params) = server.initialize_start().await.unwrap();
1642
1643        let server_task = tokio::spawn(async move {
1644            server
1645                .initialize_finish(id, json!({"capabilities": {}}))
1646                .await
1647                .unwrap();
1648            server
1649        });
1650
1651        let _ = client.receiver_mut().next().await;
1652        let initialized = Message::Notification(Notification::new("initialized", None));
1653        client.send(initialized).unwrap();
1654
1655        let mut server = server_task.await.unwrap();
1656
1657        // Spawn a task waiting on shutdown
1658        let token = server.shutdown_token();
1659        let wait_task = tokio::spawn(async move {
1660            token.cancelled().await;
1661            "shutdown received"
1662        });
1663
1664        // Send shutdown
1665        let shutdown_request = Message::Request(Request::new(2, "shutdown", None));
1666        client.send(shutdown_request).unwrap();
1667
1668        let msg = server.receiver_mut().next().await.unwrap().unwrap();
1669        if let Message::Request(req) = msg {
1670            server.handle_shutdown(req.id).unwrap();
1671        }
1672
1673        // The wait task should complete now
1674        let result = tokio::time::timeout(std::time::Duration::from_millis(100), wait_task)
1675            .await
1676            .expect("wait task should complete quickly")
1677            .unwrap();
1678
1679        assert_eq!(result, "shutdown received");
1680    }
1681
1682    // =========================================================================
1683    // Routing Tests
1684    // =========================================================================
1685
1686    use crate::IncomingMessage;
1687
1688    #[tokio::test]
1689    async fn route_request_returns_incoming_request() {
1690        let (stream, _) = tokio::io::duplex(4096);
1691        let mut conn: Connection<_, ()> = Connection::new(stream);
1692
1693        let request = Request::new(42, "textDocument/hover", Some(json!({"line": 10})));
1694        let message = Message::Request(request);
1695
1696        let result = conn.route(message);
1697        match result {
1698            IncomingMessage::Request(req, token) => {
1699                assert_eq!(req.id, 42.into());
1700                assert_eq!(req.method, "textDocument/hover");
1701                // Token should not be cancelled yet
1702                assert!(!token.is_cancelled());
1703                // Request should be auto-registered
1704                assert!(conn.request_queue.incoming.is_pending(&42.into()));
1705            }
1706            _ => panic!("Expected IncomingMessage::Request"),
1707        }
1708    }
1709
1710    #[tokio::test]
1711    async fn route_notification_returns_incoming_notification() {
1712        let (stream, _) = tokio::io::duplex(4096);
1713        let mut conn: Connection<_, ()> = Connection::new(stream);
1714
1715        let notification = Notification::new(
1716            "textDocument/didOpen",
1717            Some(json!({"uri": "file:///test.rs"})),
1718        );
1719        let message = Message::Notification(notification);
1720
1721        let result = conn.route(message);
1722        match result {
1723            IncomingMessage::Notification(notif) => {
1724                assert_eq!(notif.method, "textDocument/didOpen");
1725            }
1726            _ => panic!("Expected IncomingMessage::Notification"),
1727        }
1728    }
1729
1730    #[tokio::test]
1731    async fn route_response_to_pending_outgoing_request() {
1732        let (stream, _) = tokio::io::duplex(4096);
1733        let mut conn: Connection<_, ()> = Connection::new(stream);
1734
1735        // Register an outgoing request
1736        let rx = conn.request_queue.outgoing.register(42.into());
1737
1738        // Create a matching response
1739        let response = Response::ok(42, json!({"result": "success"}));
1740        let message = Message::Response(response);
1741
1742        // Route it
1743        let result = conn.route(message);
1744        assert!(
1745            matches!(result, IncomingMessage::ResponseRouted),
1746            "Expected ResponseRouted, got {result:?}"
1747        );
1748
1749        // Verify receiver got the response
1750        let received = rx.await.expect("Should receive response");
1751        assert_eq!(received.id, Some(42.into()));
1752        assert!(received.result().is_some());
1753        assert_eq!(received.into_result().unwrap()["result"], "success");
1754    }
1755
1756    #[tokio::test]
1757    async fn route_response_for_unknown_id_returns_response_unknown() {
1758        let (stream, _) = tokio::io::duplex(4096);
1759        let mut conn: Connection<_, ()> = Connection::new(stream);
1760
1761        // Create a response for an ID that was never registered
1762        let response = Response::ok(999, json!({"unexpected": true}));
1763        let message = Message::Response(response);
1764
1765        let result = conn.route(message);
1766        match result {
1767            IncomingMessage::ResponseUnknown(resp) => {
1768                assert_eq!(resp.id, Some(999.into()));
1769            }
1770            _ => panic!("Expected IncomingMessage::ResponseUnknown"),
1771        }
1772    }
1773
1774    #[tokio::test]
1775    async fn route_response_with_null_id_returns_response_unknown() {
1776        let (stream, _) = tokio::io::duplex(4096);
1777        let mut conn: Connection<_, ()> = Connection::new(stream);
1778
1779        // Create a parse error response (null id)
1780        let response = Response::parse_error(crate::ResponseError::new(
1781            crate::ErrorCode::ParseError,
1782            "Parse error",
1783        ));
1784        let message = Message::Response(response);
1785
1786        let result = conn.route(message);
1787        match result {
1788            IncomingMessage::ResponseUnknown(resp) => {
1789                assert!(
1790                    resp.id.is_none(),
1791                    "Expected null id for parse error response"
1792                );
1793                assert!(resp.error().is_some());
1794            }
1795            _ => panic!("Expected IncomingMessage::ResponseUnknown"),
1796        }
1797    }
1798
1799    #[tokio::test]
1800    async fn route_response_with_string_id() {
1801        let (stream, _) = tokio::io::duplex(4096);
1802        let mut conn: Connection<_, ()> = Connection::new(stream);
1803
1804        // Register with string ID
1805        let rx = conn.request_queue.outgoing.register("request-abc".into());
1806
1807        // Create matching response
1808        let response = Response::ok("request-abc", json!(null));
1809        let message = Message::Response(response);
1810
1811        let result = conn.route(message);
1812        assert!(matches!(result, IncomingMessage::ResponseRouted));
1813
1814        let received = rx.await.expect("Should receive response");
1815        assert_eq!(
1816            received.id,
1817            Some(crate::RequestId::String("request-abc".to_string()))
1818        );
1819    }
1820
1821    #[tokio::test]
1822    async fn route_multiple_responses_to_different_requests() {
1823        let (stream, _) = tokio::io::duplex(4096);
1824        let mut conn: Connection<_, ()> = Connection::new(stream);
1825
1826        // Register multiple outgoing requests
1827        let rx1 = conn.request_queue.outgoing.register(1.into());
1828        let rx2 = conn.request_queue.outgoing.register(2.into());
1829        let rx3 = conn.request_queue.outgoing.register(3.into());
1830
1831        // Route responses out of order
1832        let result2 = conn.route(Message::Response(Response::ok(2, json!("second"))));
1833        assert!(matches!(result2, IncomingMessage::ResponseRouted));
1834
1835        let result1 = conn.route(Message::Response(Response::ok(1, json!("first"))));
1836        assert!(matches!(result1, IncomingMessage::ResponseRouted));
1837
1838        let result3 = conn.route(Message::Response(Response::ok(3, json!("third"))));
1839        assert!(matches!(result3, IncomingMessage::ResponseRouted));
1840
1841        // Verify all receivers got correct responses
1842        let resp1 = rx1.await.unwrap();
1843        assert_eq!(resp1.into_result().unwrap(), json!("first"));
1844
1845        let resp2 = rx2.await.unwrap();
1846        assert_eq!(resp2.into_result().unwrap(), json!("second"));
1847
1848        let resp3 = rx3.await.unwrap();
1849        assert_eq!(resp3.into_result().unwrap(), json!("third"));
1850    }
1851
1852    #[tokio::test]
1853    async fn route_error_response_to_pending_request() {
1854        let (stream, _) = tokio::io::duplex(4096);
1855        let mut conn: Connection<_, ()> = Connection::new(stream);
1856
1857        // Register an outgoing request (we won't await it in this sync test)
1858        let _rx = conn.request_queue.outgoing.register(42.into());
1859
1860        // Route an error response
1861        let response = Response::err(
1862            42,
1863            crate::ResponseError::new(crate::ErrorCode::MethodNotFound, "Not found"),
1864        );
1865        let message = Message::Response(response);
1866
1867        let result = conn.route(message);
1868        assert!(matches!(result, IncomingMessage::ResponseRouted));
1869    }
1870
1871    // =========================================================================
1872    // Cancellation Tests
1873    // =========================================================================
1874
1875    #[tokio::test]
1876    async fn route_cancel_request_returns_cancel_handled_and_cancels_pending() {
1877        let (stream, _) = tokio::io::duplex(4096);
1878        let mut conn: Connection<_, ()> = Connection::new(stream);
1879
1880        // Register a request via route
1881        let request = Request::new(42, "test", None);
1882        let routed = conn.route(Message::Request(request));
1883        let IncomingMessage::Request(_, token) = routed else {
1884            panic!("expected Request");
1885        };
1886        assert!(!token.is_cancelled());
1887
1888        // Send $/cancelRequest via route
1889        let cancel = Notification::new("$/cancelRequest", Some(json!({"id": 42})));
1890        let result = conn.route(Message::Notification(cancel));
1891        assert!(matches!(result, IncomingMessage::CancelHandled));
1892        assert!(token.is_cancelled());
1893    }
1894
1895    #[tokio::test]
1896    async fn route_cancel_request_unknown_id_still_returns_cancel_handled() {
1897        let (stream, _) = tokio::io::duplex(4096);
1898        let mut conn: Connection<_, ()> = Connection::new(stream);
1899
1900        // Cancel a request that was never registered — still CancelHandled
1901        let cancel = Notification::new("$/cancelRequest", Some(json!({"id": 99})));
1902        let result = conn.route(Message::Notification(cancel));
1903        assert!(matches!(result, IncomingMessage::CancelHandled));
1904    }
1905
1906    #[tokio::test]
1907    async fn route_regular_notification_not_affected() {
1908        let (stream, _) = tokio::io::duplex(4096);
1909        let mut conn: Connection<_, ()> = Connection::new(stream);
1910
1911        let notif = Notification::new("textDocument/didOpen", None);
1912        let result = conn.route(Message::Notification(notif));
1913        assert!(matches!(result, IncomingMessage::Notification(_)));
1914    }
1915
1916    #[tokio::test]
1917    async fn cancellation_propagates_to_spawned_handler() {
1918        let (stream, _) = tokio::io::duplex(4096);
1919        let mut conn: Connection<_, ()> = Connection::new(stream);
1920
1921        // Register a request via route
1922        let request = Request::new(1, "test", None);
1923        let result = conn.route(Message::Request(request));
1924        let IncomingMessage::Request(_, token) = result else {
1925            panic!("Expected IncomingMessage::Request")
1926        };
1927
1928        // Spawn a handler that waits for cancellation
1929        let handle = tokio::spawn(async move {
1930            token.cancelled().await;
1931            "cancelled"
1932        });
1933
1934        // Cancel the request
1935        let _ = conn.request_queue.incoming.cancel(&1.into());
1936
1937        // Handler should complete quickly
1938        let result = tokio::time::timeout(std::time::Duration::from_millis(100), handle)
1939            .await
1940            .expect("Handler should complete quickly")
1941            .unwrap();
1942
1943        assert_eq!(result, "cancelled");
1944    }
1945
1946    #[tokio::test]
1947    async fn route_request_auto_registers_and_cancellation_works() {
1948        let (stream, _) = tokio::io::duplex(4096);
1949        let mut conn: Connection<_, ()> = Connection::new(stream);
1950
1951        // Route a request
1952        let request = Request::new(42, "test", None);
1953        let result = conn.route(Message::Request(request));
1954
1955        let IncomingMessage::Request(_, token) = result else {
1956            panic!("Expected IncomingMessage::Request")
1957        };
1958
1959        // Token should not be cancelled
1960        assert!(!token.is_cancelled());
1961
1962        // Cancel via cancel_incoming
1963        let was_cancelled = conn.cancel_incoming(42);
1964        assert!(was_cancelled);
1965
1966        // Token should now be cancelled
1967        assert!(token.is_cancelled());
1968    }
1969
1970    // =========================================================================
1971    // Outgoing Cancel Tests
1972    // =========================================================================
1973
1974    #[tokio::test]
1975    async fn test_cancel_outgoing_request() {
1976        let (client_stream, server_stream) = tokio::io::duplex(4096);
1977        let mut client: Connection<_, ()> = Connection::new(client_stream);
1978        let mut server: Connection<_, ()> = Connection::new(server_stream);
1979
1980        // Register an outgoing request on client
1981        let rx = client.request_queue.outgoing.register(42.into());
1982
1983        // Cancel it - should send notification and remove from queue
1984        let was_pending = client.cancel(42).unwrap();
1985        assert!(was_pending);
1986        assert!(!client.request_queue.outgoing.is_pending(&42.into()));
1987
1988        // Server should receive the $/cancelRequest notification
1989        let msg = server.receiver_mut().next().await.unwrap().unwrap();
1990        assert!(msg.is_notification());
1991        if let Message::Notification(notif) = msg {
1992            assert_eq!(notif.method, "$/cancelRequest");
1993            assert_eq!(notif.params.unwrap()["id"], 42);
1994        } else {
1995            panic!("Expected notification");
1996        }
1997
1998        // The receiver should get an error (sender dropped)
1999        assert!(rx.await.is_err());
2000    }
2001
2002    #[tokio::test]
2003    async fn test_cancel_unknown_outgoing_request() {
2004        let (client_stream, _server_stream) = tokio::io::duplex(4096);
2005        let mut client: Connection<_, ()> = Connection::new(client_stream);
2006
2007        // Cancel a request that was never registered
2008        let was_pending = client.cancel(999).unwrap();
2009        assert!(!was_pending);
2010    }
2011
2012    #[tokio::test]
2013    async fn test_cancel_with_string_id() {
2014        let (client_stream, server_stream) = tokio::io::duplex(4096);
2015        let mut client: Connection<_, ()> = Connection::new(client_stream);
2016        let mut server: Connection<_, ()> = Connection::new(server_stream);
2017
2018        // Register with string ID
2019        let rx = client.request_queue.outgoing.register("req-abc".into());
2020
2021        // Cancel it
2022        let was_pending = client.cancel("req-abc").unwrap();
2023        assert!(was_pending);
2024
2025        // Server should receive the notification with string ID
2026        let msg = server.receiver_mut().next().await.unwrap().unwrap();
2027        if let Message::Notification(notif) = msg {
2028            assert_eq!(notif.params.unwrap()["id"], "req-abc");
2029        } else {
2030            panic!("Expected notification");
2031        }
2032
2033        assert!(rx.await.is_err());
2034    }
2035
2036    // =========================================================================
2037    // ClientSender Integration Tests
2038    // =========================================================================
2039
2040    #[tokio::test]
2041    async fn client_sender_messages_arrive_on_receiver() {
2042        let (client_stream, server_stream) = tokio::io::duplex(4096);
2043        let mut server: Connection<_, ()> = Connection::new(server_stream);
2044        let mut client: Connection<_, ()> = Connection::new(client_stream);
2045
2046        let sender = server.client_sender();
2047
2048        // Notification via ClientSender should arrive on client's receiver
2049        sender
2050            .notify(
2051                "window/logMessage",
2052                Some(json!({"type": 3, "message": "hello"})),
2053            )
2054            .unwrap();
2055
2056        let msg = client.receiver_mut().next().await.unwrap().unwrap();
2057        assert!(msg.is_notification());
2058        if let Message::Notification(notif) = msg {
2059            assert_eq!(notif.method, "window/logMessage");
2060            assert_eq!(notif.params.unwrap()["message"], "hello");
2061        }
2062    }
2063
2064    #[tokio::test]
2065    async fn client_sender_request_routed_through_response_map() {
2066        let (client_stream, server_stream) = tokio::io::duplex(4096);
2067        let mut server: Connection<_, ()> = Connection::new(server_stream);
2068        let mut client: Connection<_, ()> = Connection::new(client_stream);
2069
2070        let sender = server.client_sender();
2071
2072        // Spawn request task
2073        let sender_clone = sender.clone();
2074        let req_task = tokio::spawn(async move {
2075            sender_clone
2076                .request(
2077                    "client/registerCapability",
2078                    Some(json!({"registrations": []})),
2079                )
2080                .await
2081        });
2082
2083        // Client receives the request
2084        let msg = client.receiver_mut().next().await.unwrap().unwrap();
2085        let req_id = if let Message::Request(req) = msg {
2086            assert_eq!(req.method, "client/registerCapability");
2087            req.id.clone()
2088        } else {
2089            panic!("Expected Request");
2090        };
2091
2092        // Client sends response back
2093        client
2094            .send(Message::Response(Response::ok(req_id.clone(), json!(null))))
2095            .unwrap();
2096
2097        // Server reads the response and routes it through response_map
2098        let resp_msg = server.receiver_mut().next().await.unwrap().unwrap();
2099        let routed = server.route(resp_msg);
2100        assert!(
2101            matches!(routed, IncomingMessage::ResponseRouted),
2102            "Expected ResponseRouted, got {routed:?}"
2103        );
2104
2105        // The request task should complete with the response
2106        let result = tokio::time::timeout(Duration::from_secs(1), req_task)
2107            .await
2108            .expect("request should complete")
2109            .unwrap()
2110            .unwrap();
2111        assert_eq!(result.id, Some(req_id));
2112    }
2113
2114    #[tokio::test]
2115    async fn route_prefers_response_map_over_outgoing_queue() {
2116        let (client_stream, _server_stream) = tokio::io::duplex(4096);
2117        let mut server: Connection<_, ()> = Connection::new(client_stream);
2118
2119        let sender = server.client_sender();
2120
2121        // Register the same ID in both response_map (via ClientSender) and outgoing queue
2122        let mut outgoing_rx = server.request_queue.outgoing.register(1.into());
2123
2124        let sender_clone = sender.clone();
2125        let req_task = tokio::spawn(async move { sender_clone.request("test", None).await });
2126
2127        // Give the request task time to register in the response_map
2128        tokio::task::yield_now().await;
2129        tokio::task::yield_now().await;
2130
2131        // Route a response for id=1 — should go to response_map, not outgoing queue
2132        let response = Response::ok(1, json!("via-response-map"));
2133        let result = server.route(Message::Response(response));
2134        assert!(matches!(result, IncomingMessage::ResponseRouted));
2135
2136        // The request task should get the response
2137        let resp = tokio::time::timeout(Duration::from_secs(1), req_task)
2138            .await
2139            .expect("should complete")
2140            .unwrap()
2141            .unwrap();
2142        assert_eq!(resp.result().cloned(), Some(json!("via-response-map")));
2143
2144        // outgoing queue receiver should NOT have received anything (still pending)
2145        assert!(outgoing_rx.try_recv().is_err());
2146    }
2147
2148    #[tokio::test]
2149    async fn send_works_before_and_after_client_sender() {
2150        let (client_stream, server_stream) = tokio::io::duplex(4096);
2151        let server: Connection<_, ()> = Connection::new(server_stream);
2152        let mut client: Connection<_, ()> = Connection::new(client_stream);
2153
2154        // send works before client_sender
2155        server
2156            .send(Message::Notification(Notification::new(
2157                "test/before",
2158                None,
2159            )))
2160            .unwrap();
2161
2162        let msg = client.receiver_mut().next().await.unwrap().unwrap();
2163        if let Message::Notification(notif) = msg {
2164            assert_eq!(notif.method, "test/before");
2165        }
2166    }
2167
2168    #[tokio::test]
2169    async fn client_sender_can_be_called_multiple_times() {
2170        let (stream, _) = tokio::io::duplex(4096);
2171        let mut conn: Connection<_, ()> = Connection::new(stream);
2172
2173        let sender1 = conn.client_sender();
2174        let sender2 = conn.client_sender(); // should NOT panic
2175        drop(sender1);
2176        drop(sender2);
2177    }
2178
2179    #[tokio::test]
2180    async fn send_works_after_client_sender() {
2181        let (client_stream, server_stream) = tokio::io::duplex(4096);
2182        let mut server: Connection<_, ()> = Connection::new(server_stream);
2183        let mut client: Connection<_, ()> = Connection::new(client_stream);
2184
2185        let _sender = server.client_sender();
2186
2187        // send() should still work after client_sender()
2188        server
2189            .send(Message::Notification(Notification::new("test/ping", None)))
2190            .unwrap();
2191
2192        let msg = client.receiver_mut().next().await.unwrap().unwrap();
2193        assert!(msg.is_notification());
2194        if let Message::Notification(notif) = msg {
2195            assert_eq!(notif.method, "test/ping");
2196        }
2197    }
2198}