Skip to main content

lsp_server_tokio/
client_sender.rs

1//! Cloneable client communication handle with response routing.
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use parking_lot::Mutex;
9
10use thiserror::Error;
11use tokio::sync::{mpsc, oneshot};
12use tokio_util::sync::CancellationToken;
13
14use crate::{Message, Notification, ProtocolError, Request, RequestId, Response};
15
16/// Shared map for routing responses to waiting callers.
17#[derive(Debug, Clone, Default)]
18pub(crate) struct ResponseMap {
19    pending: Arc<Mutex<HashMap<RequestId, oneshot::Sender<Response>>>>,
20}
21
22impl ResponseMap {
23    pub(crate) fn new() -> Self {
24        Self::default()
25    }
26
27    #[cfg(test)]
28    pub(crate) fn register(&self, id: RequestId) -> oneshot::Receiver<Response> {
29        let (tx, rx) = oneshot::channel();
30        self.pending.lock().insert(id, tx);
31        rx
32    }
33
34    pub(crate) fn try_register(&self, id: RequestId) -> Option<oneshot::Receiver<Response>> {
35        let (tx, rx) = oneshot::channel();
36        let mut pending = self.pending.lock();
37        if pending.contains_key(&id) {
38            return None;
39        }
40
41        pending.insert(id, tx);
42        Some(rx)
43    }
44
45    pub(crate) fn contains(&self, id: &RequestId) -> bool {
46        self.pending.lock().contains_key(id)
47    }
48
49    pub(crate) fn deliver(&self, id: &RequestId, response: Response) -> bool {
50        if let Some(tx) = self.pending.lock().remove(id) {
51            let _ = tx.send(response);
52            true
53        } else {
54            false
55        }
56    }
57
58    pub(crate) fn cancel(&self, id: &RequestId) -> bool {
59        self.pending.lock().remove(id).is_some()
60    }
61}
62
63#[derive(Debug)]
64struct PendingResponseGuard {
65    response_map: ResponseMap,
66    id: Option<RequestId>,
67}
68
69impl PendingResponseGuard {
70    fn new(response_map: ResponseMap, id: RequestId) -> Self {
71        Self {
72            response_map,
73            id: Some(id),
74        }
75    }
76
77    fn disarm(&mut self) {
78        self.id = None;
79    }
80}
81
82impl Drop for PendingResponseGuard {
83    fn drop(&mut self) {
84        if let Some(id) = self.id.take() {
85            let _ = self.response_map.cancel(&id);
86        }
87    }
88}
89
90/// Error returned when the connection is closed and messages can no longer be queued.
91#[derive(Debug, Clone, Error)]
92#[error("connection closed")]
93pub struct SendError;
94
95/// A cloneable, non-blocking handle for server→client communication.
96#[derive(Debug, Clone)]
97pub struct ClientSender {
98    tx: mpsc::UnboundedSender<Message>,
99    response_map: ResponseMap,
100    id_counter: Arc<AtomicI64>,
101    /// Token that is cancelled when the background drain task exits.
102    /// Used to detect disconnection in `request()` instead of hanging forever.
103    drain_alive: CancellationToken,
104}
105
106impl ClientSender {
107    pub(crate) fn new(
108        tx: mpsc::UnboundedSender<Message>,
109        response_map: ResponseMap,
110        drain_alive: CancellationToken,
111    ) -> Self {
112        Self {
113            tx,
114            response_map,
115            id_counter: Arc::new(AtomicI64::new(1)),
116            drain_alive,
117        }
118    }
119
120    /// Queues a notification for delivery to the client.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`SendError`] if the connection has been closed.
125    pub fn notify(&self, method: &str, params: Option<serde_json::Value>) -> Result<(), SendError> {
126        let notification = Notification::new(method, params);
127        self.tx
128            .send(Message::Notification(notification))
129            .map_err(|_| SendError)
130    }
131
132    /// Queues a response for delivery to the client.
133    ///
134    /// # Errors
135    ///
136    /// Returns [`SendError`] if the connection has been closed.
137    pub fn respond(&self, response: Response) -> Result<(), SendError> {
138        self.tx
139            .send(Message::Response(response))
140            .map_err(|_| SendError)
141    }
142
143    /// Sends a request with an auto-generated ID and waits for the response.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`ProtocolError::Disconnected`] if the connection is closed before
148    /// a response arrives.
149    pub async fn request(
150        &self,
151        method: &str,
152        params: Option<serde_json::Value>,
153    ) -> Result<Response, ProtocolError> {
154        let (id, rx) = self.reserve_request_slot().map_err(ProtocolError::Io)?;
155        self.send_registered_request(id, method, params, rx).await
156    }
157
158    /// Sends a request and returns a timeout error if no response arrives in time.
159    ///
160    /// # Errors
161    ///
162    /// Returns [`ProtocolError::RequestTimeout`] if the timeout elapses, or
163    /// [`ProtocolError::Disconnected`] if the connection is closed.
164    pub async fn request_timeout(
165        &self,
166        method: &str,
167        params: Option<serde_json::Value>,
168        timeout: Duration,
169    ) -> Result<Response, ProtocolError> {
170        tokio::time::timeout(timeout, self.request(method, params))
171            .await
172            .map_err(|_| ProtocolError::RequestTimeout)?
173    }
174
175    /// Generates the next unique request ID.
176    ///
177    /// IDs start at 1 (skipping 0 to avoid ambiguity with some clients that
178    /// treat 0 as "no ID") and wrap back to 1 after reaching `i32::MAX`.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the atomic counter update fails or the generated
183    /// value is out of `i32` range (both conditions are theoretically
184    /// unreachable with the current wrapping logic).
185    fn next_id(&self) -> std::io::Result<RequestId> {
186        let id = self
187            .id_counter
188            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
189                Some(if current >= i64::from(i32::MAX) {
190                    1
191                } else {
192                    current + 1
193                })
194            })
195            .map_err(|current| {
196                std::io::Error::other(format!(
197                    "request id counter update failed unexpectedly (current value: {current})"
198                ))
199            })?;
200
201        let id = i32::try_from(id).map_err(|e| {
202            std::io::Error::other(format!("generated request id out of i32 range: {e}"))
203        })?;
204
205        Ok(RequestId::Integer(id))
206    }
207
208    fn reserve_request_slot(&self) -> std::io::Result<(RequestId, oneshot::Receiver<Response>)> {
209        const MAX_RETRIES: usize = 1000;
210        for _ in 0..MAX_RETRIES {
211            let id = self.next_id()?;
212            if let Some(rx) = self.response_map.try_register(id.clone()) {
213                return Ok((id, rx));
214            }
215        }
216        Err(std::io::Error::other(
217            "failed to reserve a request slot after 1000 attempts",
218        ))
219    }
220
221    #[cfg(test)]
222    async fn send_request_with_id(
223        &self,
224        id: RequestId,
225        method: &str,
226        params: Option<serde_json::Value>,
227    ) -> Result<Response, ProtocolError> {
228        let rx = self.response_map.register(id.clone());
229        self.send_registered_request(id, method, params, rx).await
230    }
231
232    async fn send_registered_request(
233        &self,
234        id: RequestId,
235        method: &str,
236        params: Option<serde_json::Value>,
237        rx: oneshot::Receiver<Response>,
238    ) -> Result<Response, ProtocolError> {
239        let mut cleanup = PendingResponseGuard::new(self.response_map.clone(), id.clone());
240        let request = Request::new(id.clone(), method, params);
241
242        if self.tx.send(Message::Request(request)).is_err() {
243            return Err(ProtocolError::Disconnected);
244        }
245
246        tokio::select! {
247            result = rx => {
248                match result {
249                    Ok(response) => {
250                        cleanup.disarm();
251                        Ok(response)
252                    }
253                    Err(_) => Err(ProtocolError::Disconnected),
254                }
255            }
256            () = self.drain_alive.cancelled() => {
257                Err(ProtocolError::Disconnected)
258            }
259        }
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266
267    use futures::future::join_all;
268    use futures::StreamExt;
269    use serde_json::json;
270
271    use crate::{Connection, IncomingMessage};
272
273    fn assert_clone_send_sync<T: Clone + Send + Sync>() {}
274
275    #[test]
276    fn client_sender_is_clone_send_sync() {
277        assert_clone_send_sync::<ClientSender>();
278    }
279
280    #[tokio::test]
281    async fn client_sender_notify_sends_notification() {
282        let (client_stream, server_stream) = tokio::io::duplex(4096);
283        let mut server: Connection<_, ()> = Connection::new(server_stream);
284        let mut client: Connection<_, ()> = Connection::new(client_stream);
285        let sender = server.client_sender();
286
287        sender
288            .notify("window/logMessage", Some(json!({"type": 4})))
289            .unwrap();
290
291        match client.receiver_mut().next().await.unwrap().unwrap() {
292            Message::Notification(notification) => {
293                assert_eq!(notification.method, "window/logMessage");
294                assert_eq!(notification.params, Some(json!({"type": 4})));
295            }
296            other => panic!("expected notification, got {other:?}"),
297        }
298    }
299
300    #[tokio::test]
301    async fn client_sender_respond_sends_response() {
302        let (client_stream, server_stream) = tokio::io::duplex(4096);
303        let mut server: Connection<_, ()> = Connection::new(server_stream);
304        let mut client: Connection<_, ()> = Connection::new(client_stream);
305        let sender = server.client_sender();
306
307        sender.respond(Response::ok(7, json!("ok"))).unwrap();
308
309        match client.receiver_mut().next().await.unwrap().unwrap() {
310            Message::Response(response) => {
311                assert_eq!(response.id, Some(7.into()));
312                assert_eq!(response.result().cloned(), Some(json!("ok")));
313            }
314            other => panic!("expected response, got {other:?}"),
315        }
316    }
317
318    #[tokio::test]
319    async fn client_sender_request_auto_id() {
320        let (client_stream, server_stream) = tokio::io::duplex(4096);
321        let mut server: Connection<_, ()> = Connection::new(server_stream);
322        let mut client: Connection<_, ()> = Connection::new(client_stream);
323        let sender = server.client_sender();
324
325        let first_sender = sender.clone();
326        let first_task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
327            tokio::spawn(async move {
328                first_sender
329                    .request("first", None::<serde_json::Value>)
330                    .await
331            });
332
333        let first_id = match client.receiver_mut().next().await.unwrap().unwrap() {
334            Message::Request(request) => {
335                assert_eq!(request.method, "first");
336                request.id
337            }
338            other => panic!("expected request, got {other:?}"),
339        };
340        assert_eq!(first_id, RequestId::Integer(1));
341
342        let second_sender = sender.clone();
343        let second_task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
344            tokio::spawn(async move {
345                second_sender
346                    .request("second", None::<serde_json::Value>)
347                    .await
348            });
349
350        let second_id = match client.receiver_mut().next().await.unwrap().unwrap() {
351            Message::Request(request) => {
352                assert_eq!(request.method, "second");
353                request.id
354            }
355            other => panic!("expected request, got {other:?}"),
356        };
357        assert_eq!(second_id, RequestId::Integer(2));
358
359        assert!(matches!(
360            server.route(Message::Response(Response::ok(
361                first_id.clone(),
362                json!("first")
363            ))),
364            IncomingMessage::ResponseRouted
365        ));
366        assert!(matches!(
367            server.route(Message::Response(Response::ok(
368                second_id.clone(),
369                json!("second")
370            ))),
371            IncomingMessage::ResponseRouted
372        ));
373
374        assert_eq!(first_task.await.unwrap().unwrap().id, Some(first_id));
375        assert_eq!(second_task.await.unwrap().unwrap().id, Some(second_id));
376    }
377
378    #[tokio::test]
379    async fn client_sender_request_gets_response() {
380        let (client_stream, server_stream) = tokio::io::duplex(4096);
381        let mut server: Connection<_, ()> = Connection::new(server_stream);
382        let mut client: Connection<_, ()> = Connection::new(client_stream);
383        let sender = server.client_sender();
384
385        let task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
386            tokio::spawn(async move {
387                sender
388                    .request("workspace/configuration", Some(json!({"items": []})))
389                    .await
390            });
391
392        let id = match client.receiver_mut().next().await.unwrap().unwrap() {
393            Message::Request(request) => {
394                assert_eq!(request.method, "workspace/configuration");
395                request.id
396            }
397            other => panic!("expected request, got {other:?}"),
398        };
399
400        let routed = server.route(Message::Response(Response::ok(
401            id.clone(),
402            json!({"settings": []}),
403        )));
404        assert!(matches!(routed, IncomingMessage::ResponseRouted));
405
406        let response = task.await.unwrap().unwrap();
407        assert_eq!(response.id, Some(id));
408        assert_eq!(response.result().cloned(), Some(json!({"settings": []})));
409    }
410
411    #[tokio::test]
412    async fn client_sender_concurrent_requests() {
413        let (client_stream, server_stream) = tokio::io::duplex(4096);
414        let mut server: Connection<_, ()> = Connection::new(server_stream);
415        let mut client: Connection<_, ()> = Connection::new(client_stream);
416        let sender = server.client_sender();
417
418        let methods = ["first", "second", "third"];
419        let tasks = methods.into_iter().map(|method| {
420            let sender = sender.clone();
421            let method = method.to_string();
422            tokio::spawn(async move {
423                let response = sender
424                    .request(&method, None::<serde_json::Value>)
425                    .await
426                    .unwrap();
427                (method, response)
428            })
429        });
430        let tasks = join_all(tasks);
431
432        let mut requests = Vec::new();
433        for _ in 0..3 {
434            match client.receiver_mut().next().await.unwrap().unwrap() {
435                Message::Request(request) => requests.push(request),
436                other => panic!("expected request, got {other:?}"),
437            }
438        }
439
440        for request in [
441            requests[2].clone(),
442            requests[0].clone(),
443            requests[1].clone(),
444        ] {
445            assert!(matches!(
446                server.route(Message::Response(Response::ok(
447                    request.id.clone(),
448                    json!(request.method.clone()),
449                ))),
450                IncomingMessage::ResponseRouted
451            ));
452        }
453
454        let results = tasks.await;
455        for result in results {
456            let (method, response) = result.unwrap();
457            assert_eq!(response.result().cloned(), Some(json!(method)));
458        }
459    }
460
461    #[tokio::test(start_paused = true)]
462    async fn client_sender_request_timeout() {
463        let (client_stream, server_stream) = tokio::io::duplex(4096);
464        let mut server: Connection<_, ()> = Connection::new(server_stream);
465        let _client: Connection<_, ()> = Connection::new(client_stream);
466        let sender = server.client_sender();
467
468        let task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
469            tokio::spawn(async move {
470                sender
471                    .request_timeout(
472                        "workspace/configuration",
473                        None::<serde_json::Value>,
474                        Duration::from_secs(5),
475                    )
476                    .await
477            });
478
479        tokio::task::yield_now().await;
480        tokio::time::advance(Duration::from_secs(5)).await;
481
482        let result = task.await.unwrap();
483        assert!(matches!(result, Err(ProtocolError::RequestTimeout)));
484    }
485
486    #[tokio::test]
487    async fn client_sender_request_disconnected() {
488        let (client_stream, server_stream) = tokio::io::duplex(4096);
489        let mut server: Connection<_, ()> = Connection::new(server_stream);
490        let client: Connection<_, ()> = Connection::new(client_stream);
491        let sender = server.client_sender();
492
493        drop(client);
494        // Give the runtime a chance to process the duplex close.
495        // The client's drain task needs to observe the channel close and exit.
496        tokio::task::yield_now().await;
497
498        let result = tokio::time::timeout(
499            Duration::from_secs(1),
500            sender.request("test", None::<serde_json::Value>),
501        )
502        .await
503        .unwrap();
504        assert!(matches!(result, Err(ProtocolError::Disconnected)));
505    }
506
507    #[tokio::test]
508    async fn client_sender_notify_after_close() {
509        let (client_stream, server_stream) = tokio::io::duplex(4096);
510        let mut server: Connection<_, ()> = Connection::new(server_stream);
511        let client: Connection<_, ()> = Connection::new(client_stream);
512        let sender = server.client_sender();
513
514        drop(client);
515        let _ = sender.notify("window/logMessage", None);
516
517        tokio::time::timeout(Duration::from_secs(1), async {
518            loop {
519                if sender.notify("window/logMessage", None).is_err() {
520                    break;
521                }
522                tokio::task::yield_now().await;
523            }
524        })
525        .await
526        .unwrap();
527    }
528
529    #[tokio::test]
530    async fn client_sender_multiple_clones_share_state() {
531        let (client_stream, server_stream) = tokio::io::duplex(4096);
532        let mut server: Connection<_, ()> = Connection::new(server_stream);
533        let mut client: Connection<_, ()> = Connection::new(client_stream);
534        let sender = server.client_sender();
535        let sender_clone = sender.clone();
536
537        let first_task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
538            tokio::spawn(async move { sender.request("first", None::<serde_json::Value>).await });
539        let second_task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
540            tokio::spawn(async move {
541                sender_clone
542                    .request("second", None::<serde_json::Value>)
543                    .await
544            });
545
546        let first_request = match client.receiver_mut().next().await.unwrap().unwrap() {
547            Message::Request(request) => request,
548            other => panic!("expected request, got {other:?}"),
549        };
550        let second_request = match client.receiver_mut().next().await.unwrap().unwrap() {
551            Message::Request(request) => request,
552            other => panic!("expected request, got {other:?}"),
553        };
554
555        assert_eq!(first_request.id, RequestId::Integer(1));
556        assert_eq!(second_request.id, RequestId::Integer(2));
557
558        assert!(matches!(
559            server.route(Message::Response(Response::ok(
560                first_request.id.clone(),
561                json!(first_request.method.clone()),
562            ))),
563            IncomingMessage::ResponseRouted
564        ));
565        assert!(matches!(
566            server.route(Message::Response(Response::ok(
567                second_request.id.clone(),
568                json!(second_request.method.clone()),
569            ))),
570            IncomingMessage::ResponseRouted
571        ));
572
573        assert_eq!(
574            first_task.await.unwrap().unwrap().into_result(),
575            Some(json!("first"))
576        );
577        assert_eq!(
578            second_task.await.unwrap().unwrap().into_result(),
579            Some(json!("second"))
580        );
581    }
582
583    #[tokio::test]
584    async fn route_delivers_to_response_map() {
585        let (client_stream, server_stream) = tokio::io::duplex(4096);
586        let mut server: Connection<_, ()> = Connection::new(server_stream);
587        let mut client: Connection<_, ()> = Connection::new(client_stream);
588        let sender = server.client_sender();
589
590        let task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
591            tokio::spawn(async move { sender.request("test", None::<serde_json::Value>).await });
592
593        let request_id = match client.receiver_mut().next().await.unwrap().unwrap() {
594            Message::Request(request) => request.id,
595            other => panic!("expected request, got {other:?}"),
596        };
597
598        let routed = server.route(Message::Response(Response::ok(
599            request_id.clone(),
600            json!(true),
601        )));
602        assert!(matches!(routed, IncomingMessage::ResponseRouted));
603
604        let response = task.await.unwrap().unwrap();
605        assert_eq!(response.id, Some(request_id));
606    }
607
608    #[tokio::test]
609    async fn route_response_map_takes_priority() {
610        let (client_stream, server_stream) = tokio::io::duplex(4096);
611        let mut server: Connection<_, ()> = Connection::new(server_stream);
612        let mut client: Connection<_, ()> = Connection::new(client_stream);
613        let sender = server.client_sender();
614
615        let task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
616            tokio::spawn(async move { sender.request("test", None::<serde_json::Value>).await });
617
618        let request_id = match client.receiver_mut().next().await.unwrap().unwrap() {
619            Message::Request(request) => request.id,
620            other => panic!("expected request, got {other:?}"),
621        };
622        let _outgoing_rx = server.request_queue.outgoing.register(request_id.clone());
623
624        let routed = server.route(Message::Response(Response::ok(
625            request_id.clone(),
626            json!("response-map"),
627        )));
628        assert!(matches!(routed, IncomingMessage::ResponseRouted));
629        assert!(server.request_queue.outgoing.is_pending(&request_id));
630
631        let response = task.await.unwrap().unwrap();
632        assert_eq!(response.result().cloned(), Some(json!("response-map")));
633    }
634
635    #[tokio::test]
636    async fn client_sender_request_timeout_does_not_swallow_late_response() {
637        let (client_stream, server_stream) = tokio::io::duplex(4096);
638        let mut server: Connection<_, ()> = Connection::new(server_stream);
639        let mut client: Connection<_, ()> = Connection::new(client_stream);
640        let sender = server.client_sender();
641
642        let task = tokio::spawn(async move {
643            sender
644                .request_timeout(
645                    "late-response",
646                    None::<serde_json::Value>,
647                    Duration::from_millis(10),
648                )
649                .await
650        });
651
652        let request_id = match client.receiver_mut().next().await.unwrap().unwrap() {
653            Message::Request(request) => request.id,
654            other => panic!("expected request, got {other:?}"),
655        };
656
657        let result = task.await.unwrap();
658        assert!(matches!(result, Err(ProtocolError::RequestTimeout)));
659
660        let routed = server.route(Message::Response(Response::ok(
661            request_id.clone(),
662            json!(true),
663        )));
664        match routed {
665            IncomingMessage::ResponseUnknown(response) => {
666                assert_eq!(response.id, Some(request_id));
667            }
668            other => panic!("late response should not be swallowed after timeout, got {other:?}"),
669        }
670    }
671
672    #[tokio::test]
673    async fn client_sender_aborted_request_does_not_swallow_late_response() {
674        let (client_stream, server_stream) = tokio::io::duplex(4096);
675        let mut server: Connection<_, ()> = Connection::new(server_stream);
676        let mut client: Connection<_, ()> = Connection::new(client_stream);
677        let sender = server.client_sender();
678
679        let task =
680            tokio::spawn(async move { sender.request("aborted", None::<serde_json::Value>).await });
681
682        let request_id = match client.receiver_mut().next().await.unwrap().unwrap() {
683            Message::Request(request) => request.id,
684            other => panic!("expected request, got {other:?}"),
685        };
686
687        task.abort();
688        let _ = task.await;
689
690        let routed = server.route(Message::Response(Response::ok(
691            request_id.clone(),
692            json!(true),
693        )));
694        match routed {
695            IncomingMessage::ResponseUnknown(response) => {
696                assert_eq!(response.id, Some(request_id));
697            }
698            other => panic!("late response should not be swallowed after abort, got {other:?}"),
699        }
700    }
701
702    #[tokio::test]
703    async fn client_sender_wraparound_does_not_reuse_pending_id() {
704        let (client_stream, server_stream) = tokio::io::duplex(4096);
705        let mut server: Connection<_, ()> = Connection::new(server_stream);
706        let mut client: Connection<_, ()> = Connection::new(client_stream);
707        let sender = server.client_sender();
708
709        let first_sender = sender.clone();
710        let first_task = tokio::spawn(async move {
711            first_sender
712                .send_request_with_id(RequestId::Integer(1), "first", None::<serde_json::Value>)
713                .await
714        });
715
716        let first_request = match client.receiver_mut().next().await.unwrap().unwrap() {
717            Message::Request(request) => request,
718            other => panic!("expected request, got {other:?}"),
719        };
720        assert_eq!(first_request.id, RequestId::Integer(1));
721
722        sender
723            .id_counter
724            .store(i64::from(i32::MAX), Ordering::Relaxed);
725
726        let max_sender = sender.clone();
727        let max_task =
728            tokio::spawn(async move { max_sender.request("max", None::<serde_json::Value>).await });
729        let wrapped_sender = sender.clone();
730        let wrapped_task = tokio::spawn(async move {
731            wrapped_sender
732                .request("wrapped", None::<serde_json::Value>)
733                .await
734        });
735
736        let second_request = match client.receiver_mut().next().await.unwrap().unwrap() {
737            Message::Request(request) => request,
738            other => panic!("expected request, got {other:?}"),
739        };
740        let third_request = match client.receiver_mut().next().await.unwrap().unwrap() {
741            Message::Request(request) => request,
742            other => panic!("expected request, got {other:?}"),
743        };
744
745        assert_ne!(second_request.id, first_request.id);
746        assert_ne!(
747            third_request.id, first_request.id,
748            "wrapped request reused an active pending id"
749        );
750        assert_ne!(second_request.id, third_request.id);
751
752        assert!(matches!(
753            server.route(Message::Response(Response::ok(
754                first_request.id.clone(),
755                json!("first")
756            ))),
757            IncomingMessage::ResponseRouted
758        ));
759        assert!(matches!(
760            server.route(Message::Response(Response::ok(
761                second_request.id.clone(),
762                json!("second")
763            ))),
764            IncomingMessage::ResponseRouted
765        ));
766        assert!(matches!(
767            server.route(Message::Response(Response::ok(
768                third_request.id.clone(),
769                json!("third")
770            ))),
771            IncomingMessage::ResponseRouted
772        ));
773
774        assert_eq!(
775            first_task.await.unwrap().unwrap().into_result(),
776            Some(json!("first"))
777        );
778        assert_eq!(
779            max_task.await.unwrap().unwrap().into_result(),
780            Some(json!("second"))
781        );
782        assert_eq!(
783            wrapped_task.await.unwrap().unwrap().into_result(),
784            Some(json!("third"))
785        );
786    }
787
788    #[tokio::test]
789    async fn client_sender_timeout_preserves_other_pending_requests() {
790        let (client_stream, server_stream) = tokio::io::duplex(4096);
791        let mut server: Connection<_, ()> = Connection::new(server_stream);
792        let mut client: Connection<_, ()> = Connection::new(client_stream);
793        let sender = server.client_sender();
794
795        let timeout_sender = sender.clone();
796        let timeout_task = tokio::spawn(async move {
797            timeout_sender
798                .request_timeout(
799                    "timeout",
800                    None::<serde_json::Value>,
801                    Duration::from_millis(10),
802                )
803                .await
804        });
805        let ok_sender = sender.clone();
806        let ok_task =
807            tokio::spawn(async move { ok_sender.request("ok", None::<serde_json::Value>).await });
808
809        let first_request = match client.receiver_mut().next().await.unwrap().unwrap() {
810            Message::Request(request) => request,
811            other => panic!("expected request, got {other:?}"),
812        };
813        let second_request = match client.receiver_mut().next().await.unwrap().unwrap() {
814            Message::Request(request) => request,
815            other => panic!("expected request, got {other:?}"),
816        };
817
818        let (timed_out, live_request) = if first_request.method == "timeout" {
819            (first_request, second_request)
820        } else {
821            (second_request, first_request)
822        };
823
824        let result = timeout_task.await.unwrap();
825        assert!(matches!(result, Err(ProtocolError::RequestTimeout)));
826
827        assert!(matches!(
828            server.route(Message::Response(Response::ok(
829                live_request.id.clone(),
830                json!("ok")
831            ))),
832            IncomingMessage::ResponseRouted
833        ));
834        assert_eq!(
835            ok_task.await.unwrap().unwrap().result().cloned(),
836            Some(json!("ok"))
837        );
838
839        let routed = server.route(Message::Response(Response::ok(
840            timed_out.id.clone(),
841            json!("late"),
842        )));
843        assert!(matches!(routed, IncomingMessage::ResponseUnknown(_)));
844    }
845
846    #[tokio::test]
847    async fn response_map_cancel_ignores_unknown_id() {
848        let response_map = ResponseMap::new();
849
850        assert!(!response_map.cancel(&RequestId::Integer(404)));
851        assert!(!response_map.deliver(&RequestId::Integer(404), Response::ok(404, json!(null))));
852    }
853}