lsp_server_tokio/request_queue.rs
1//! Request queue types for tracking pending LSP requests.
2//!
3//! This module provides types for tracking request-response correlation in both
4//! directions of LSP communication:
5//!
6//! - [`IncomingRequests`] - Tracks requests received from clients (server needs to send responses)
7//! - [`OutgoingRequests`] - Tracks requests sent to clients (server is waiting for responses)
8//! - [`RequestQueue`] - Combines both for complete request lifecycle management
9//!
10//! # Usage Pattern
11//!
12//! ```
13//! use lsp_server_tokio::{RequestQueue, RequestId};
14//! use tokio_util::sync::CancellationToken;
15//!
16//! // Create a queue with custom metadata types
17//! let mut queue: RequestQueue<String> = RequestQueue::new();
18//!
19//! // Track an incoming request (from client) with a cancellation token
20//! let request_id: RequestId = 1.into();
21//! let token = CancellationToken::new();
22//! queue.incoming.register(request_id.clone(), "handler_context".to_string(), token);
23//! assert!(queue.incoming.is_pending(&request_id));
24//!
25//! // When ready to respond, complete the request
26//! let metadata = queue.incoming.complete(&request_id);
27//! assert_eq!(metadata, Some("handler_context".to_string()));
28//! ```
29//!
30//! # Server-Initiated Requests
31//!
32//! ```
33//! use lsp_server_tokio::{RequestQueue, RequestId, Response};
34//! use serde_json::json;
35//!
36//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
37//! let mut queue: RequestQueue<()> = RequestQueue::new();
38//!
39//! // Register an outgoing request (to client) and get a receiver
40//! let request_id: RequestId = 42.into();
41//! let rx = queue.outgoing.register(request_id.clone());
42//!
43//! // When response arrives, complete the request
44//! let response = Response::ok(42, json!({"result": "ok"}));
45//! let sent = queue.outgoing.complete(&request_id, response);
46//! assert!(sent);
47//!
48//! // The receiver gets the response
49//! let received = rx.await.unwrap();
50//! assert_eq!(received.id, Some(42.into()));
51//! # });
52//! ```
53
54use std::collections::HashMap;
55use tokio::sync::oneshot;
56use tokio_util::sync::CancellationToken;
57
58use crate::request_id::RequestId;
59use crate::Response;
60
/// JSON-RPC method name of the LSP cancellation notification, `$/cancelRequest`.
///
/// Compare the method of an incoming notification against this constant to
/// recognise a cancellation; the ID of the request being cancelled can then be
/// extracted from the notification params with [`parse_cancel_params`].
pub const CANCEL_REQUEST_METHOD: &str = "$/cancelRequest";
63
64/// Parses the request ID from $/cancelRequest notification params.
65///
66/// According to the LSP specification, $/cancelRequest has params:
67/// ```json
68/// { "id": number | string }
69/// ```
70///
71/// Returns `None` if params are missing, malformed, or the ID is not
72/// a valid integer or string.
73///
74/// # Example
75///
76/// ```
77/// use lsp_server_tokio::parse_cancel_params;
78/// use serde_json::json;
79///
80/// // Integer ID
81/// let params = Some(json!({"id": 42}));
82/// let id = parse_cancel_params(¶ms);
83/// assert_eq!(id, Some(42.into()));
84///
85/// // String ID
86/// let params = Some(json!({"id": "request-abc"}));
87/// let id = parse_cancel_params(¶ms);
88/// assert_eq!(id, Some("request-abc".into()));
89///
90/// // Missing params
91/// let id = parse_cancel_params(&None);
92/// assert!(id.is_none());
93/// ```
94#[must_use]
95pub fn parse_cancel_params(params: &Option<serde_json::Value>) -> Option<RequestId> {
96 let params = params.as_ref()?;
97 let id_value = params.get("id")?;
98
99 match id_value {
100 serde_json::Value::Number(n) => n
101 .as_i64()
102 .and_then(|i| i32::try_from(i).ok())
103 .map(RequestId::Integer),
104 serde_json::Value::String(s) => Some(RequestId::String(s.clone())),
105 _ => None,
106 }
107}
108
/// Tracks requests received from clients (incoming to the server).
///
/// When the server receives a request, it registers the request ID along with
/// any metadata needed to process the response. When the server is ready to
/// send a response, it completes the request to retrieve the metadata.
///
/// Each request is also associated with a [`CancellationToken`] that can be
/// used to signal cancellation (e.g., when receiving `$/cancelRequest`).
/// Cancelling a request only triggers its token; the entry stays pending until
/// it is completed.
///
/// The generic parameter `I` represents user-defined metadata associated with
/// each incoming request (e.g., handler context, timing info, request origin).
///
/// # Example
///
/// ```
/// use lsp_server_tokio::{IncomingRequests, RequestId};
/// use tokio_util::sync::CancellationToken;
///
/// let mut incoming: IncomingRequests<String> = IncomingRequests::new();
///
/// // Register a request with metadata and cancellation token
/// let token1 = CancellationToken::new();
/// let token2 = CancellationToken::new();
/// incoming.register(1.into(), "textDocument/hover".to_string(), token1);
/// incoming.register(2.into(), "textDocument/completion".to_string(), token2);
///
/// assert_eq!(incoming.pending_count(), 2);
/// assert!(incoming.is_pending(&1.into()));
///
/// // Cancel a request; it remains pending until it is completed
/// assert!(incoming.cancel(&2.into()));
/// assert!(incoming.is_pending(&2.into()));
///
/// // Complete request and get metadata back
/// let method = incoming.complete(&1.into());
/// assert_eq!(method, Some("textDocument/hover".to_string()));
/// assert_eq!(incoming.pending_count(), 1);
/// ```
#[derive(Debug)]
pub struct IncomingRequests<I> {
    /// Pending requests keyed by ID, each stored with its metadata and
    /// cancellation token.
    pending: HashMap<RequestId, (I, CancellationToken)>,
}
150
151impl<I> IncomingRequests<I> {
152 /// Creates a new empty incoming request tracker.
153 #[must_use]
154 pub fn new() -> Self {
155 Self {
156 pending: HashMap::new(),
157 }
158 }
159
160 /// Registers an incoming request with associated metadata and cancellation token.
161 ///
162 /// The metadata can be any user-defined type that you want to associate
163 /// with this request until it's completed. The cancellation token can be
164 /// used to signal request cancellation to async handlers.
165 pub fn register(&mut self, id: RequestId, data: I, token: CancellationToken) {
166 self.pending.insert(id, (data, token));
167 }
168
169 /// Completes an incoming request, removing it from tracking and returning the metadata.
170 ///
171 /// Returns `Some(metadata)` if the request was pending, `None` otherwise.
172 /// The cancellation token is dropped when the request is completed.
173 pub fn complete(&mut self, id: &RequestId) -> Option<I> {
174 self.pending.remove(id).map(|(data, _)| data)
175 }
176
177 /// Returns `true` if the request is currently pending.
178 #[must_use]
179 pub fn is_pending(&self, id: &RequestId) -> bool {
180 self.pending.contains_key(id)
181 }
182
183 /// Cancels a pending request by triggering its cancellation token.
184 ///
185 /// Returns `true` if the request was found and cancelled, `false` if the
186 /// request ID was not pending. Note that cancelling an already-cancelled
187 /// token is a no-op.
188 #[must_use]
189 pub fn cancel(&self, id: &RequestId) -> bool {
190 if let Some((_, token)) = self.pending.get(id) {
191 token.cancel();
192 true
193 } else {
194 false
195 }
196 }
197
198 /// Returns a clone of the cancellation token for a pending request.
199 ///
200 /// Returns `None` if the request is not pending. The returned token
201 /// can be passed to async handlers for cooperative cancellation.
202 #[must_use]
203 pub fn get_token(&self, id: &RequestId) -> Option<CancellationToken> {
204 self.pending.get(id).map(|(_, token)| token.clone())
205 }
206
207 /// Returns the number of currently pending requests.
208 #[must_use]
209 pub fn pending_count(&self) -> usize {
210 self.pending.len()
211 }
212}
213
214impl<I> Default for IncomingRequests<I> {
215 fn default() -> Self {
216 Self::new()
217 }
218}
219
/// Tracks requests sent to clients (outgoing from the server).
///
/// When the server sends a request to the client, it registers the request ID
/// and receives a oneshot receiver. When the client's response arrives, the
/// server completes the request, sending the response to the waiting receiver.
/// A request can also be cancelled, which drops the sender so the receiver
/// resolves to an error instead of a response.
///
/// Responses are always [`Response`](crate::Response) — the standard JSON-RPC response type.
///
/// # Example
///
/// ```
/// use lsp_server_tokio::{OutgoingRequests, RequestId, Response};
/// use serde_json::json;
///
/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
/// let mut outgoing = OutgoingRequests::new();
///
/// // Register an outgoing request
/// let rx = outgoing.register(1.into());
/// assert!(outgoing.is_pending(&1.into()));
///
/// // Simulate receiving a response
/// let response = Response::ok(1, json!({"result": "success"}));
/// let completed = outgoing.complete(&1.into(), response);
/// assert!(completed);
///
/// // Receiver gets the response
/// let result = rx.await.unwrap();
/// assert_eq!(result.id, Some(1.into()));
/// # });
/// ```
#[derive(Debug)]
pub struct OutgoingRequests {
    /// Pending requests keyed by ID, each holding the sending half of the
    /// oneshot channel on which the response is delivered.
    pending: HashMap<RequestId, oneshot::Sender<Response>>,
}
255
256impl OutgoingRequests {
257 /// Creates a new empty outgoing request tracker.
258 #[must_use]
259 pub fn new() -> Self {
260 Self {
261 pending: HashMap::new(),
262 }
263 }
264
265 /// Registers an outgoing request and returns a receiver for the response.
266 ///
267 /// The returned receiver will receive the response value when [`complete`](Self::complete)
268 /// is called with a matching ID. If the request is cancelled via [`cancel`](Self::cancel),
269 /// the receiver will return a `RecvError`.
270 pub fn register(&mut self, id: RequestId) -> oneshot::Receiver<Response> {
271 let (tx, rx) = oneshot::channel();
272 self.pending.insert(id, tx);
273 rx
274 }
275
276 /// Completes an outgoing request by sending the response to the waiting receiver.
277 ///
278 /// Returns `true` if the request was pending and the response was sent,
279 /// `false` if the request was not found.
280 ///
281 /// Note: This returns `true` even if the receiver was dropped (the response is
282 /// still considered "completed" from the queue's perspective).
283 pub fn complete(&mut self, id: &RequestId, response: Response) -> bool {
284 if let Some(tx) = self.pending.remove(id) {
285 // Ignore send error - receiver may have been dropped
286 let _ = tx.send(response);
287 true
288 } else {
289 false
290 }
291 }
292
293 /// Cancels an outgoing request without sending a response.
294 ///
295 /// The sender is dropped, causing the receiver to return `RecvError`.
296 ///
297 /// Returns `true` if the request was pending, `false` otherwise.
298 pub fn cancel(&mut self, id: &RequestId) -> bool {
299 self.pending.remove(id).is_some()
300 }
301
302 /// Returns `true` if the request is currently pending.
303 #[must_use]
304 pub fn is_pending(&self, id: &RequestId) -> bool {
305 self.pending.contains_key(id)
306 }
307
308 /// Returns the number of currently pending requests.
309 #[must_use]
310 pub fn pending_count(&self) -> usize {
311 self.pending.len()
312 }
313}
314
315impl Default for OutgoingRequests {
316 fn default() -> Self {
317 Self::new()
318 }
319}
320
/// Combined request queue tracking both incoming and outgoing requests.
///
/// This is the primary type for managing LSP request-response correlation.
/// It provides separate tracking for:
///
/// - `incoming`: Requests received from clients that need responses
/// - `outgoing`: Requests sent to clients that are awaiting responses
///
/// Both trackers are public fields and are used directly; the queue itself
/// only offers construction.
///
/// # Type Parameters
///
/// - `I`: Metadata type for incoming requests (e.g., handler context)
///
/// # Example
///
/// ```
/// use lsp_server_tokio::{RequestQueue, RequestId};
/// use tokio_util::sync::CancellationToken;
///
/// // Create a queue for a server that tracks method names for incoming
/// // requests and expects JSON responses for outgoing requests
/// let mut queue: RequestQueue<String> = RequestQueue::new();
///
/// // Track incoming request with cancellation token
/// let token = CancellationToken::new();
/// queue.incoming.register(1.into(), "textDocument/hover".to_string(), token);
///
/// // Operations on incoming don't affect outgoing
/// assert_eq!(queue.incoming.pending_count(), 1);
/// assert_eq!(queue.outgoing.pending_count(), 0);
/// ```
#[derive(Debug)]
pub struct RequestQueue<I> {
    /// Tracker for requests received from clients.
    pub incoming: IncomingRequests<I>,
    /// Tracker for requests sent to clients.
    pub outgoing: OutgoingRequests,
}
358
359impl<I> RequestQueue<I> {
360 /// Creates a new empty request queue.
361 #[must_use]
362 pub fn new() -> Self {
363 Self {
364 incoming: IncomingRequests::new(),
365 outgoing: OutgoingRequests::new(),
366 }
367 }
368}
369
370impl<I> Default for RequestQueue<I> {
371 fn default() -> Self {
372 Self::new()
373 }
374}
375
/// Unit tests for the request trackers and for `$/cancelRequest` param parsing.
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_util::sync::CancellationToken;

    // ============== IncomingRequests Tests ==============

    #[test]
    fn incoming_register_and_complete() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), "metadata".to_string(), token);
        let data = incoming.complete(&1.into());

        assert_eq!(data, Some("metadata".to_string()));
        assert!(!incoming.is_pending(&1.into()));
    }

    #[test]
    fn incoming_complete_unknown_returns_none() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();

        let data = incoming.complete(&999.into());
        assert_eq!(data, None);
    }

    #[test]
    fn incoming_is_pending() {
        let mut incoming: IncomingRequests<()> = IncomingRequests::new();

        assert!(!incoming.is_pending(&1.into()));

        let token = CancellationToken::new();
        incoming.register(1.into(), (), token);
        assert!(incoming.is_pending(&1.into()));

        incoming.complete(&1.into());
        assert!(!incoming.is_pending(&1.into()));
    }

    #[test]
    fn incoming_pending_count() {
        let mut incoming: IncomingRequests<i32> = IncomingRequests::new();

        assert_eq!(incoming.pending_count(), 0);

        let token1 = CancellationToken::new();
        incoming.register(1.into(), 100, token1);
        assert_eq!(incoming.pending_count(), 1);

        let token2 = CancellationToken::new();
        incoming.register(2.into(), 200, token2);
        assert_eq!(incoming.pending_count(), 2);

        incoming.complete(&1.into());
        assert_eq!(incoming.pending_count(), 1);

        incoming.complete(&2.into());
        assert_eq!(incoming.pending_count(), 0);
    }

    #[test]
    fn incoming_default() {
        let incoming: IncomingRequests<()> = IncomingRequests::default();
        assert_eq!(incoming.pending_count(), 0);
    }

    #[test]
    fn incoming_cancel_triggers_token() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();
        let token_clone = token.clone();

        incoming.register(1.into(), "data".to_string(), token);

        // Cancel the request
        assert!(incoming.cancel(&1.into()));

        // Token should be cancelled
        assert!(token_clone.is_cancelled());
    }

    #[test]
    fn incoming_cancel_unknown_returns_false() {
        let incoming: IncomingRequests<()> = IncomingRequests::new();
        assert!(!incoming.cancel(&999.into()));
    }

    #[test]
    fn incoming_cancel_idempotent() {
        let mut incoming: IncomingRequests<()> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), (), token);

        // Cancel twice - both should succeed
        assert!(incoming.cancel(&1.into()));
        assert!(incoming.cancel(&1.into())); // Still returns true, request still pending
    }

    #[test]
    fn incoming_get_token_returns_clone() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let original_token = CancellationToken::new();

        incoming.register(1.into(), "data".to_string(), original_token.clone());

        // Get the token
        let retrieved = incoming.get_token(&1.into());
        assert!(retrieved.is_some());

        // Cancel via retrieved token
        retrieved.unwrap().cancel();

        // Original should also be cancelled (they're the same underlying token)
        assert!(original_token.is_cancelled());
    }

    #[test]
    fn incoming_get_token_unknown_returns_none() {
        let incoming: IncomingRequests<()> = IncomingRequests::new();
        assert!(incoming.get_token(&999.into()).is_none());
    }

    #[test]
    fn incoming_complete_after_cancel_returns_data() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), "cancelled_data".to_string(), token);

        // Cancel first
        let _ = incoming.cancel(&1.into());

        // Complete should still return the data
        let data = incoming.complete(&1.into());
        assert_eq!(data, Some("cancelled_data".to_string()));
    }

    // ============== OutgoingRequests Tests ==============

    #[tokio::test]
    async fn outgoing_register_and_complete() {
        let mut outgoing = OutgoingRequests::new();
        let rx = outgoing.register(1.into());

        let response = Response::ok(1, serde_json::json!("response"));
        assert!(outgoing.complete(&1.into(), response.clone()));
        assert_eq!(rx.await.unwrap().id, response.id);
    }

    #[test]
    fn outgoing_complete_unknown_returns_false() {
        let mut outgoing = OutgoingRequests::new();

        let result = outgoing.complete(&999.into(), Response::ok(999, serde_json::json!(null)));
        assert!(!result);
    }

    #[tokio::test]
    async fn outgoing_cancel_drops_sender() {
        let mut outgoing = OutgoingRequests::new();
        let rx = outgoing.register(1.into());

        assert!(outgoing.cancel(&1.into()));
        assert!(!outgoing.is_pending(&1.into()));

        // Receiver should get an error since sender was dropped
        assert!(rx.await.is_err());
    }

    #[test]
    fn outgoing_cancel_unknown_returns_false() {
        let mut outgoing = OutgoingRequests::new();

        assert!(!outgoing.cancel(&999.into()));
    }

    #[test]
    fn outgoing_is_pending() {
        let mut outgoing = OutgoingRequests::new();

        assert!(!outgoing.is_pending(&1.into()));

        let _rx = outgoing.register(1.into());
        assert!(outgoing.is_pending(&1.into()));

        outgoing.complete(&1.into(), Response::ok(1, serde_json::json!(null)));
        assert!(!outgoing.is_pending(&1.into()));
    }

    #[test]
    fn outgoing_pending_count() {
        let mut outgoing = OutgoingRequests::new();

        assert_eq!(outgoing.pending_count(), 0);

        let _rx1 = outgoing.register(1.into());
        assert_eq!(outgoing.pending_count(), 1);

        let _rx2 = outgoing.register(2.into());
        assert_eq!(outgoing.pending_count(), 2);

        outgoing.complete(&1.into(), Response::ok(1, serde_json::json!(null)));
        assert_eq!(outgoing.pending_count(), 1);

        outgoing.cancel(&2.into());
        assert_eq!(outgoing.pending_count(), 0);
    }

    #[test]
    fn outgoing_default() {
        let outgoing = OutgoingRequests::default();
        assert_eq!(outgoing.pending_count(), 0);
    }

    // ============== RequestQueue Tests ==============

    #[test]
    fn queue_new_creates_empty() {
        let queue: RequestQueue<()> = RequestQueue::new();

        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 0);
    }

    #[test]
    fn queue_incoming_outgoing_independent() {
        let mut queue: RequestQueue<String> = RequestQueue::new();

        // Register on incoming
        let token = CancellationToken::new();
        queue
            .incoming
            .register(1.into(), "incoming".to_string(), token);
        assert_eq!(queue.incoming.pending_count(), 1);
        assert_eq!(queue.outgoing.pending_count(), 0);

        // Register on outgoing
        let _rx = queue.outgoing.register(2.into());
        assert_eq!(queue.incoming.pending_count(), 1);
        assert_eq!(queue.outgoing.pending_count(), 1);

        // Complete incoming doesn't affect outgoing
        queue.incoming.complete(&1.into());
        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 1);
    }

    #[test]
    fn queue_default() {
        let queue: RequestQueue<()> = RequestQueue::default();
        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 0);
    }

    #[test]
    fn queue_with_string_request_id() {
        let mut queue: RequestQueue<i32> = RequestQueue::new();

        let str_id: RequestId = "abc-123".into();
        let token = CancellationToken::new();
        queue.incoming.register(str_id.clone(), 42, token);

        assert!(queue.incoming.is_pending(&str_id));
        assert_eq!(queue.incoming.complete(&str_id), Some(42));
    }

    // ============== parse_cancel_params Tests ==============
    // `parse_cancel_params` is already in scope through `use super::*`.

    #[test]
    fn parse_cancel_params_integer_id() {
        let params = Some(serde_json::json!({"id": 42}));
        let id = parse_cancel_params(&params);
        assert_eq!(id, Some(RequestId::Integer(42)));
    }

    #[test]
    fn parse_cancel_params_string_id() {
        let params = Some(serde_json::json!({"id": "request-123"}));
        let id = parse_cancel_params(&params);
        assert_eq!(id, Some(RequestId::String("request-123".to_string())));
    }

    #[test]
    fn parse_cancel_params_missing_params() {
        let id = parse_cancel_params(&None);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_missing_id_field() {
        let params = Some(serde_json::json!({"other": "field"}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_invalid_id_type() {
        let params = Some(serde_json::json!({"id": true}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_null_id() {
        let params = Some(serde_json::json!({"id": null}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }
}
689}