1use std::collections::HashMap;
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use parking_lot::Mutex;
9
10use thiserror::Error;
11use tokio::sync::{mpsc, oneshot};
12use tokio_util::sync::CancellationToken;
13
14use crate::{Message, Notification, ProtocolError, Request, RequestId, Response};
15
/// Table of outgoing requests that are still waiting for a response.
///
/// Each pending [`RequestId`] maps to the oneshot sender used to hand the
/// matching [`Response`] to whoever is awaiting it. The map lives behind an
/// `Arc<Mutex<..>>`, so clones of a `ResponseMap` share the same entries.
#[derive(Debug, Clone, Default)]
pub(crate) struct ResponseMap {
    // Shared, lock-protected storage: request id -> sender for its response.
    pending: Arc<Mutex<HashMap<RequestId, oneshot::Sender<Response>>>>,
}
21
22impl ResponseMap {
23 pub(crate) fn new() -> Self {
24 Self::default()
25 }
26
27 #[cfg(test)]
28 pub(crate) fn register(&self, id: RequestId) -> oneshot::Receiver<Response> {
29 let (tx, rx) = oneshot::channel();
30 self.pending.lock().insert(id, tx);
31 rx
32 }
33
34 pub(crate) fn try_register(&self, id: RequestId) -> Option<oneshot::Receiver<Response>> {
35 let (tx, rx) = oneshot::channel();
36 let mut pending = self.pending.lock();
37 if pending.contains_key(&id) {
38 return None;
39 }
40
41 pending.insert(id, tx);
42 Some(rx)
43 }
44
45 pub(crate) fn contains(&self, id: &RequestId) -> bool {
46 self.pending.lock().contains_key(id)
47 }
48
49 pub(crate) fn deliver(&self, id: &RequestId, response: Response) -> bool {
50 if let Some(tx) = self.pending.lock().remove(id) {
51 let _ = tx.send(response);
52 true
53 } else {
54 false
55 }
56 }
57
58 pub(crate) fn cancel(&self, id: &RequestId) -> bool {
59 self.pending.lock().remove(id).is_some()
60 }
61}
62
/// Drop guard tying a pending-request entry to the lifetime of the future
/// that awaits it.
///
/// While `id` is `Some`, dropping the guard cancels that id in
/// `response_map`; once disarmed (`id == None`) dropping it does nothing.
#[derive(Debug)]
struct PendingResponseGuard {
    // Handle to the shared map the entry was registered in.
    response_map: ResponseMap,
    // The id to cancel on drop; `None` once the guard has been disarmed.
    id: Option<RequestId>,
}
68
69impl PendingResponseGuard {
70 fn new(response_map: ResponseMap, id: RequestId) -> Self {
71 Self {
72 response_map,
73 id: Some(id),
74 }
75 }
76
77 fn disarm(&mut self) {
78 self.id = None;
79 }
80}
81
82impl Drop for PendingResponseGuard {
83 fn drop(&mut self) {
84 if let Some(id) = self.id.take() {
85 let _ = self.response_map.cancel(&id);
86 }
87 }
88}
89
/// Error returned when a message cannot be queued because the outgoing
/// channel has been closed.
#[derive(Debug, Clone, Error)]
#[error("connection closed")]
pub struct SendError;
94
/// Cloneable handle for sending notifications, responses and requests to the
/// peer.
///
/// Clones share the outgoing channel, the pending-response map and the
/// request id counter.
#[derive(Debug, Clone)]
pub struct ClientSender {
    // Unbounded queue of outgoing messages.
    tx: mpsc::UnboundedSender<Message>,
    // Pending requests awaiting a response, shared with the router.
    response_map: ResponseMap,
    // Source of integer request ids; shared across clones via `Arc`.
    id_counter: Arc<AtomicI64>,
    // Cancelled to fail in-flight requests with `Disconnected`.
    // NOTE(review): presumably tracks the liveness of the outgoing drain task — confirm.
    drain_alive: CancellationToken,
}
105
106impl ClientSender {
107 pub(crate) fn new(
108 tx: mpsc::UnboundedSender<Message>,
109 response_map: ResponseMap,
110 drain_alive: CancellationToken,
111 ) -> Self {
112 Self {
113 tx,
114 response_map,
115 id_counter: Arc::new(AtomicI64::new(1)),
116 drain_alive,
117 }
118 }
119
120 pub fn notify(&self, method: &str, params: Option<serde_json::Value>) -> Result<(), SendError> {
126 let notification = Notification::new(method, params);
127 self.tx
128 .send(Message::Notification(notification))
129 .map_err(|_| SendError)
130 }
131
132 pub fn respond(&self, response: Response) -> Result<(), SendError> {
138 self.tx
139 .send(Message::Response(response))
140 .map_err(|_| SendError)
141 }
142
143 pub async fn request(
150 &self,
151 method: &str,
152 params: Option<serde_json::Value>,
153 ) -> Result<Response, ProtocolError> {
154 let (id, rx) = self.reserve_request_slot().map_err(ProtocolError::Io)?;
155 self.send_registered_request(id, method, params, rx).await
156 }
157
158 pub async fn request_timeout(
165 &self,
166 method: &str,
167 params: Option<serde_json::Value>,
168 timeout: Duration,
169 ) -> Result<Response, ProtocolError> {
170 tokio::time::timeout(timeout, self.request(method, params))
171 .await
172 .map_err(|_| ProtocolError::RequestTimeout)?
173 }
174
175 fn next_id(&self) -> std::io::Result<RequestId> {
186 let id = self
187 .id_counter
188 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
189 Some(if current >= i64::from(i32::MAX) {
190 1
191 } else {
192 current + 1
193 })
194 })
195 .map_err(|current| {
196 std::io::Error::other(format!(
197 "request id counter update failed unexpectedly (current value: {current})"
198 ))
199 })?;
200
201 let id = i32::try_from(id).map_err(|e| {
202 std::io::Error::other(format!("generated request id out of i32 range: {e}"))
203 })?;
204
205 Ok(RequestId::Integer(id))
206 }
207
208 fn reserve_request_slot(&self) -> std::io::Result<(RequestId, oneshot::Receiver<Response>)> {
209 const MAX_RETRIES: usize = 1000;
210 for _ in 0..MAX_RETRIES {
211 let id = self.next_id()?;
212 if let Some(rx) = self.response_map.try_register(id.clone()) {
213 return Ok((id, rx));
214 }
215 }
216 Err(std::io::Error::other(
217 "failed to reserve a request slot after 1000 attempts",
218 ))
219 }
220
221 #[cfg(test)]
222 async fn send_request_with_id(
223 &self,
224 id: RequestId,
225 method: &str,
226 params: Option<serde_json::Value>,
227 ) -> Result<Response, ProtocolError> {
228 let rx = self.response_map.register(id.clone());
229 self.send_registered_request(id, method, params, rx).await
230 }
231
232 async fn send_registered_request(
233 &self,
234 id: RequestId,
235 method: &str,
236 params: Option<serde_json::Value>,
237 rx: oneshot::Receiver<Response>,
238 ) -> Result<Response, ProtocolError> {
239 let mut cleanup = PendingResponseGuard::new(self.response_map.clone(), id.clone());
240 let request = Request::new(id.clone(), method, params);
241
242 if self.tx.send(Message::Request(request)).is_err() {
243 return Err(ProtocolError::Disconnected);
244 }
245
246 tokio::select! {
247 result = rx => {
248 match result {
249 Ok(response) => {
250 cleanup.disarm();
251 Ok(response)
252 }
253 Err(_) => Err(ProtocolError::Disconnected),
254 }
255 }
256 () = self.drain_alive.cancelled() => {
257 Err(ProtocolError::Disconnected)
258 }
259 }
260 }
261}
262
// Tests for `ClientSender` and `ResponseMap`. Most tests wire a "server" and a
// "client" `Connection` together over an in-memory duplex stream, send through
// the server's `ClientSender`, and read what arrives on the client side.
#[cfg(test)]
mod tests {
    use super::*;

    use futures::future::join_all;
    use futures::StreamExt;
    use serde_json::json;

    use crate::{Connection, IncomingMessage};

    // Compile-time helper: only instantiable for types that are
    // `Clone + Send + Sync`.
    fn assert_clone_send_sync<T: Clone + Send + Sync>() {}

    // `ClientSender` must be cloneable and shareable across threads.
    #[test]
    fn client_sender_is_clone_send_sync() {
        assert_clone_send_sync::<ClientSender>();
    }

    // A notification sent through the sender arrives on the peer with the same
    // method and params.
    #[tokio::test]
    async fn client_sender_notify_sends_notification() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        sender
            .notify("window/logMessage", Some(json!({"type": 4})))
            .unwrap();

        match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Notification(notification) => {
                assert_eq!(notification.method, "window/logMessage");
                assert_eq!(notification.params, Some(json!({"type": 4})));
            }
            other => panic!("expected notification, got {other:?}"),
        }
    }

    // A response sent through the sender arrives on the peer with the same id
    // and result.
    #[tokio::test]
    async fn client_sender_respond_sends_response() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        sender.respond(Response::ok(7, json!("ok"))).unwrap();

        match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Response(response) => {
                assert_eq!(response.id, Some(7.into()));
                assert_eq!(response.result().cloned(), Some(json!("ok")));
            }
            other => panic!("expected response, got {other:?}"),
        }
    }

    // Automatically assigned ids start at 1 and increase by one per request.
    #[tokio::test]
    async fn client_sender_request_auto_id() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let first_sender = sender.clone();
        let first_task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
            tokio::spawn(async move {
                first_sender
                    .request("first", None::<serde_json::Value>)
                    .await
            });

        // Wait for the first request to arrive before issuing the second, so
        // the id order is deterministic.
        let first_id = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => {
                assert_eq!(request.method, "first");
                request.id
            }
            other => panic!("expected request, got {other:?}"),
        };
        assert_eq!(first_id, RequestId::Integer(1));

        let second_sender = sender.clone();
        let second_task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
            tokio::spawn(async move {
                second_sender
                    .request("second", None::<serde_json::Value>)
                    .await
            });

        let second_id = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => {
                assert_eq!(request.method, "second");
                request.id
            }
            other => panic!("expected request, got {other:?}"),
        };
        assert_eq!(second_id, RequestId::Integer(2));

        // Route a response for each id back through the server connection.
        assert!(matches!(
            server.route(Message::Response(Response::ok(
                first_id.clone(),
                json!("first")
            ))),
            IncomingMessage::ResponseRouted
        ));
        assert!(matches!(
            server.route(Message::Response(Response::ok(
                second_id.clone(),
                json!("second")
            ))),
            IncomingMessage::ResponseRouted
        ));

        assert_eq!(first_task.await.unwrap().unwrap().id, Some(first_id));
        assert_eq!(second_task.await.unwrap().unwrap().id, Some(second_id));
    }

    // A routed response resolves the awaiting `request` future with the same
    // id and payload.
    #[tokio::test]
    async fn client_sender_request_gets_response() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
            tokio::spawn(async move {
                sender
                    .request("workspace/configuration", Some(json!({"items": []})))
                    .await
            });

        let id = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => {
                assert_eq!(request.method, "workspace/configuration");
                request.id
            }
            other => panic!("expected request, got {other:?}"),
        };

        let routed = server.route(Message::Response(Response::ok(
            id.clone(),
            json!({"settings": []}),
        )));
        assert!(matches!(routed, IncomingMessage::ResponseRouted));

        let response = task.await.unwrap().unwrap();
        assert_eq!(response.id, Some(id));
        assert_eq!(response.result().cloned(), Some(json!({"settings": []})));
    }

    // Several requests in flight at once each receive their own response, even
    // when the responses are routed in a different order than the requests.
    #[tokio::test]
    async fn client_sender_concurrent_requests() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let methods = ["first", "second", "third"];
        let tasks = methods.into_iter().map(|method| {
            let sender = sender.clone();
            let method = method.to_string();
            tokio::spawn(async move {
                let response = sender
                    .request(&method, None::<serde_json::Value>)
                    .await
                    .unwrap();
                (method, response)
            })
        });
        let tasks = join_all(tasks);

        let mut requests = Vec::new();
        for _ in 0..3 {
            match client.receiver_mut().next().await.unwrap().unwrap() {
                Message::Request(request) => requests.push(request),
                other => panic!("expected request, got {other:?}"),
            }
        }

        // Answer out of order; each response echoes its request's method name.
        for request in [
            requests[2].clone(),
            requests[0].clone(),
            requests[1].clone(),
        ] {
            assert!(matches!(
                server.route(Message::Response(Response::ok(
                    request.id.clone(),
                    json!(request.method.clone()),
                ))),
                IncomingMessage::ResponseRouted
            ));
        }

        let results = tasks.await;
        for result in results {
            let (method, response) = result.unwrap();
            assert_eq!(response.result().cloned(), Some(json!(method)));
        }
    }

    // With no response ever routed, `request_timeout` fails with
    // `RequestTimeout` once the (paused, manually advanced) clock passes the
    // deadline.
    #[tokio::test(start_paused = true)]
    async fn client_sender_request_timeout() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let _client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
            tokio::spawn(async move {
                sender
                    .request_timeout(
                        "workspace/configuration",
                        None::<serde_json::Value>,
                        Duration::from_secs(5),
                    )
                    .await
            });

        // Give the spawned task a chance to run before moving the clock.
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_secs(5)).await;

        let result = task.await.unwrap();
        assert!(matches!(result, Err(ProtocolError::RequestTimeout)));
    }

    // After the peer connection is dropped, a request resolves to
    // `Disconnected` instead of hanging (bounded by a 1 s safety timeout).
    #[tokio::test]
    async fn client_sender_request_disconnected() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        drop(client);
        tokio::task::yield_now().await;

        let result = tokio::time::timeout(
            Duration::from_secs(1),
            sender.request("test", None::<serde_json::Value>),
        )
        .await
        .unwrap();
        assert!(matches!(result, Err(ProtocolError::Disconnected)));
    }

    // After the peer connection is dropped, `notify` eventually starts
    // returning an error (polled until it does, bounded by 1 s).
    #[tokio::test]
    async fn client_sender_notify_after_close() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        drop(client);
        // The first send after the drop may still succeed; its result is ignored.
        let _ = sender.notify("window/logMessage", None);

        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if sender.notify("window/logMessage", None).is_err() {
                    break;
                }
                tokio::task::yield_now().await;
            }
        })
        .await
        .unwrap();
    }

    // Clones share the id counter and the response map: two clones get ids 1
    // and 2 and each receives its own response.
    #[tokio::test]
    async fn client_sender_multiple_clones_share_state() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();
        let sender_clone = sender.clone();

        let first_task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
            tokio::spawn(async move { sender.request("first", None::<serde_json::Value>).await });
        let second_task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
            tokio::spawn(async move {
                sender_clone
                    .request("second", None::<serde_json::Value>)
                    .await
            });

        let first_request = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request,
            other => panic!("expected request, got {other:?}"),
        };
        let second_request = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request,
            other => panic!("expected request, got {other:?}"),
        };

        assert_eq!(first_request.id, RequestId::Integer(1));
        assert_eq!(second_request.id, RequestId::Integer(2));

        // Each response carries the method name of the request it answers.
        assert!(matches!(
            server.route(Message::Response(Response::ok(
                first_request.id.clone(),
                json!(first_request.method.clone()),
            ))),
            IncomingMessage::ResponseRouted
        ));
        assert!(matches!(
            server.route(Message::Response(Response::ok(
                second_request.id.clone(),
                json!(second_request.method.clone()),
            ))),
            IncomingMessage::ResponseRouted
        ));

        assert_eq!(
            first_task.await.unwrap().unwrap().into_result(),
            Some(json!("first"))
        );
        assert_eq!(
            second_task.await.unwrap().unwrap().into_result(),
            Some(json!("second"))
        );
    }

    // `Connection::route` reports `ResponseRouted` for a response whose id is
    // pending in the response map, and the awaiting request receives it.
    #[tokio::test]
    async fn route_delivers_to_response_map() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
            tokio::spawn(async move { sender.request("test", None::<serde_json::Value>).await });

        let request_id = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request.id,
            other => panic!("expected request, got {other:?}"),
        };

        let routed = server.route(Message::Response(Response::ok(
            request_id.clone(),
            json!(true),
        )));
        assert!(matches!(routed, IncomingMessage::ResponseRouted));

        let response = task.await.unwrap().unwrap();
        assert_eq!(response.id, Some(request_id));
    }

    // When the same id is pending both in the response map and in the
    // connection's outgoing request queue, the response goes to the response
    // map and the outgoing-queue entry stays pending.
    #[tokio::test]
    async fn route_response_map_takes_priority() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let task: tokio::task::JoinHandle<Result<Response, ProtocolError>> =
            tokio::spawn(async move { sender.request("test", None::<serde_json::Value>).await });

        let request_id = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request.id,
            other => panic!("expected request, got {other:?}"),
        };
        // Register the same id in the outgoing queue to create the conflict.
        let _outgoing_rx = server.request_queue.outgoing.register(request_id.clone());

        let routed = server.route(Message::Response(Response::ok(
            request_id.clone(),
            json!("response-map"),
        )));
        assert!(matches!(routed, IncomingMessage::ResponseRouted));
        assert!(server.request_queue.outgoing.is_pending(&request_id));

        let response = task.await.unwrap().unwrap();
        assert_eq!(response.result().cloned(), Some(json!("response-map")));
    }

    // After a request times out its pending entry is gone, so a late response
    // surfaces as `ResponseUnknown` rather than being silently consumed.
    #[tokio::test]
    async fn client_sender_request_timeout_does_not_swallow_late_response() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let task = tokio::spawn(async move {
            sender
                .request_timeout(
                    "late-response",
                    None::<serde_json::Value>,
                    Duration::from_millis(10),
                )
                .await
        });

        let request_id = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request.id,
            other => panic!("expected request, got {other:?}"),
        };

        let result = task.await.unwrap();
        assert!(matches!(result, Err(ProtocolError::RequestTimeout)));

        // The response arrives only after the requester has given up.
        let routed = server.route(Message::Response(Response::ok(
            request_id.clone(),
            json!(true),
        )));
        match routed {
            IncomingMessage::ResponseUnknown(response) => {
                assert_eq!(response.id, Some(request_id));
            }
            other => panic!("late response should not be swallowed after timeout, got {other:?}"),
        }
    }

    // Aborting the task that awaits a request also releases its pending entry,
    // so a late response surfaces as `ResponseUnknown`.
    #[tokio::test]
    async fn client_sender_aborted_request_does_not_swallow_late_response() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let task =
            tokio::spawn(async move { sender.request("aborted", None::<serde_json::Value>).await });

        let request_id = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request.id,
            other => panic!("expected request, got {other:?}"),
        };

        // Abort and wait for the task to finish so its future has been dropped.
        task.abort();
        let _ = task.await;

        let routed = server.route(Message::Response(Response::ok(
            request_id.clone(),
            json!(true),
        )));
        match routed {
            IncomingMessage::ResponseUnknown(response) => {
                assert_eq!(response.id, Some(request_id));
            }
            other => panic!("late response should not be swallowed after abort, got {other:?}"),
        }
    }

    // When the id counter wraps around while id 1 is still pending, the
    // wrapped request must skip id 1 instead of reusing it.
    #[tokio::test]
    async fn client_sender_wraparound_does_not_reuse_pending_id() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        // Occupy id 1 explicitly and keep it pending.
        let first_sender = sender.clone();
        let first_task = tokio::spawn(async move {
            first_sender
                .send_request_with_id(RequestId::Integer(1), "first", None::<serde_json::Value>)
                .await
        });

        let first_request = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request,
            other => panic!("expected request, got {other:?}"),
        };
        assert_eq!(first_request.id, RequestId::Integer(1));

        // Force the counter to its maximum so the request after next wraps.
        sender
            .id_counter
            .store(i64::from(i32::MAX), Ordering::Relaxed);

        let max_sender = sender.clone();
        let max_task =
            tokio::spawn(async move { max_sender.request("max", None::<serde_json::Value>).await });
        let wrapped_sender = sender.clone();
        let wrapped_task = tokio::spawn(async move {
            wrapped_sender
                .request("wrapped", None::<serde_json::Value>)
                .await
        });

        let second_request = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request,
            other => panic!("expected request, got {other:?}"),
        };
        let third_request = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request,
            other => panic!("expected request, got {other:?}"),
        };

        // All three in-flight requests must carry distinct ids.
        assert_ne!(second_request.id, first_request.id);
        assert_ne!(
            third_request.id, first_request.id,
            "wrapped request reused an active pending id"
        );
        assert_ne!(second_request.id, third_request.id);

        assert!(matches!(
            server.route(Message::Response(Response::ok(
                first_request.id.clone(),
                json!("first")
            ))),
            IncomingMessage::ResponseRouted
        ));
        assert!(matches!(
            server.route(Message::Response(Response::ok(
                second_request.id.clone(),
                json!("second")
            ))),
            IncomingMessage::ResponseRouted
        ));
        assert!(matches!(
            server.route(Message::Response(Response::ok(
                third_request.id.clone(),
                json!("third")
            ))),
            IncomingMessage::ResponseRouted
        ));

        assert_eq!(
            first_task.await.unwrap().unwrap().into_result(),
            Some(json!("first"))
        );
        assert_eq!(
            max_task.await.unwrap().unwrap().into_result(),
            Some(json!("second"))
        );
        assert_eq!(
            wrapped_task.await.unwrap().unwrap().into_result(),
            Some(json!("third"))
        );
    }

    // One request timing out must not disturb another request that is still
    // pending: the live one still gets its response, and only the timed-out
    // id is reported as unknown afterwards.
    #[tokio::test]
    async fn client_sender_timeout_preserves_other_pending_requests() {
        let (client_stream, server_stream) = tokio::io::duplex(4096);
        let mut server: Connection<_, ()> = Connection::new(server_stream);
        let mut client: Connection<_, ()> = Connection::new(client_stream);
        let sender = server.client_sender();

        let timeout_sender = sender.clone();
        let timeout_task = tokio::spawn(async move {
            timeout_sender
                .request_timeout(
                    "timeout",
                    None::<serde_json::Value>,
                    Duration::from_millis(10),
                )
                .await
        });
        let ok_sender = sender.clone();
        let ok_task =
            tokio::spawn(async move { ok_sender.request("ok", None::<serde_json::Value>).await });

        let first_request = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request,
            other => panic!("expected request, got {other:?}"),
        };
        let second_request = match client.receiver_mut().next().await.unwrap().unwrap() {
            Message::Request(request) => request,
            other => panic!("expected request, got {other:?}"),
        };

        // The two tasks may be scheduled in either order; sort the requests by
        // method name instead of arrival order.
        let (timed_out, live_request) = if first_request.method == "timeout" {
            (first_request, second_request)
        } else {
            (second_request, first_request)
        };

        let result = timeout_task.await.unwrap();
        assert!(matches!(result, Err(ProtocolError::RequestTimeout)));

        assert!(matches!(
            server.route(Message::Response(Response::ok(
                live_request.id.clone(),
                json!("ok")
            ))),
            IncomingMessage::ResponseRouted
        ));
        assert_eq!(
            ok_task.await.unwrap().unwrap().result().cloned(),
            Some(json!("ok"))
        );

        let routed = server.route(Message::Response(Response::ok(
            timed_out.id.clone(),
            json!("late"),
        )));
        assert!(matches!(routed, IncomingMessage::ResponseUnknown(_)));
    }

    // `cancel` and `deliver` both return `false` for an id that was never
    // registered.
    #[tokio::test]
    async fn response_map_cancel_ignores_unknown_id() {
        let response_map = ResponseMap::new();

        assert!(!response_map.cancel(&RequestId::Integer(404)));
        assert!(!response_map.deliver(&RequestId::Integer(404), Response::ok(404, json!(null))));
    }
}