Skip to main content

lsp_server_tokio/
request_queue.rs

1//! Request queue types for tracking pending LSP requests.
2//!
3//! This module provides types for tracking request-response correlation in both
4//! directions of LSP communication:
5//!
6//! - [`IncomingRequests`] - Tracks requests received from clients (server needs to send responses)
7//! - [`OutgoingRequests`] - Tracks requests sent to clients (server is waiting for responses)
8//! - [`RequestQueue`] - Combines both for complete request lifecycle management
9//!
10//! # Usage Pattern
11//!
12//! ```
13//! use lsp_server_tokio::{RequestQueue, RequestId};
14//! use tokio_util::sync::CancellationToken;
15//!
16//! // Create a queue with custom metadata types
17//! let mut queue: RequestQueue<String, String> = RequestQueue::new();
18//!
19//! // Track an incoming request (from client) with a cancellation token
20//! let request_id: RequestId = 1.into();
21//! let token = CancellationToken::new();
22//! queue.incoming.register(request_id.clone(), "handler_context".to_string(), token);
23//! assert!(queue.incoming.is_pending(&request_id));
24//!
25//! // When ready to respond, complete the request
26//! let metadata = queue.incoming.complete(&request_id);
27//! assert_eq!(metadata, Some("handler_context".to_string()));
28//! ```
29//!
30//! # Server-Initiated Requests
31//!
32//! ```
33//! use lsp_server_tokio::{RequestQueue, RequestId};
34//!
35//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
36//! let mut queue: RequestQueue<(), String> = RequestQueue::new();
37//!
38//! // Register an outgoing request (to client) and get a receiver
39//! let request_id: RequestId = 42.into();
40//! let rx = queue.outgoing.register(request_id.clone());
41//!
42//! // When response arrives, complete the request
43//! let sent = queue.outgoing.complete(&request_id, "response data".to_string());
44//! assert!(sent);
45//!
46//! // The receiver gets the response
47//! let response = rx.await.unwrap();
48//! assert_eq!(response, "response data");
49//! # });
50//! ```
51
52use std::collections::HashMap;
53use tokio::sync::oneshot;
54use tokio_util::sync::CancellationToken;
55
56use crate::request_id::RequestId;
57
/// JSON-RPC method name of the LSP notification that asks for a request to be cancelled.
pub const CANCEL_REQUEST_METHOD: &str = "$/cancelRequest";
60
61/// Parses the request ID from $/cancelRequest notification params.
62///
63/// According to the LSP specification, $/cancelRequest has params:
64/// ```json
65/// { "id": number | string }
66/// ```
67///
68/// Returns `None` if params are missing, malformed, or the ID is not
69/// a valid integer or string.
70///
71/// # Example
72///
73/// ```
74/// use lsp_server_tokio::parse_cancel_params;
75/// use serde_json::json;
76///
77/// // Integer ID
78/// let params = Some(json!({"id": 42}));
79/// let id = parse_cancel_params(&params);
80/// assert_eq!(id, Some(42.into()));
81///
82/// // String ID
83/// let params = Some(json!({"id": "request-abc"}));
84/// let id = parse_cancel_params(&params);
85/// assert_eq!(id, Some("request-abc".into()));
86///
87/// // Missing params
88/// let id = parse_cancel_params(&None);
89/// assert!(id.is_none());
90/// ```
91#[must_use]
92pub fn parse_cancel_params(params: &Option<serde_json::Value>) -> Option<RequestId> {
93    let params = params.as_ref()?;
94    let id_value = params.get("id")?;
95
96    match id_value {
97        serde_json::Value::Number(n) => n
98            .as_i64()
99            .and_then(|i| i32::try_from(i).ok())
100            .map(RequestId::Integer),
101        serde_json::Value::String(s) => Some(RequestId::String(s.clone())),
102        _ => None,
103    }
104}
105
106/// Tracks requests received from clients (incoming to the server).
107///
108/// When the server receives a request, it registers the request ID along with
109/// any metadata needed to process the response. When the server is ready to
110/// send a response, it completes the request to retrieve the metadata.
111///
112/// Each request is also associated with a [`CancellationToken`] that can be
113/// used to signal cancellation (e.g., when receiving `$/cancelRequest`).
114///
115/// The generic parameter `I` represents user-defined metadata associated with
116/// each incoming request (e.g., handler context, timing info, request origin).
117///
118/// # Example
119///
120/// ```
121/// use lsp_server_tokio::{IncomingRequests, RequestId};
122/// use tokio_util::sync::CancellationToken;
123///
124/// let mut incoming: IncomingRequests<String> = IncomingRequests::new();
125///
126/// // Register a request with metadata and cancellation token
127/// let token1 = CancellationToken::new();
128/// let token2 = CancellationToken::new();
129/// incoming.register(1.into(), "textDocument/hover".to_string(), token1);
130/// incoming.register(2.into(), "textDocument/completion".to_string(), token2);
131///
132/// assert_eq!(incoming.pending_count(), 2);
133/// assert!(incoming.is_pending(&1.into()));
134///
135/// // Cancel a request
136/// incoming.cancel(&2.into());
137///
138/// // Complete request and get metadata back
139/// let method = incoming.complete(&1.into());
140/// assert_eq!(method, Some("textDocument/hover".to_string()));
141/// assert_eq!(incoming.pending_count(), 1);
142/// ```
143#[derive(Debug)]
144pub struct IncomingRequests<I> {
145    pending: HashMap<RequestId, (I, CancellationToken)>,
146}
147
148impl<I> IncomingRequests<I> {
149    /// Creates a new empty incoming request tracker.
150    #[must_use]
151    pub fn new() -> Self {
152        Self {
153            pending: HashMap::new(),
154        }
155    }
156
157    /// Registers an incoming request with associated metadata and cancellation token.
158    ///
159    /// The metadata can be any user-defined type that you want to associate
160    /// with this request until it's completed. The cancellation token can be
161    /// used to signal request cancellation to async handlers.
162    pub fn register(&mut self, id: RequestId, data: I, token: CancellationToken) {
163        self.pending.insert(id, (data, token));
164    }
165
166    /// Completes an incoming request, removing it from tracking and returning the metadata.
167    ///
168    /// Returns `Some(metadata)` if the request was pending, `None` otherwise.
169    /// The cancellation token is dropped when the request is completed.
170    pub fn complete(&mut self, id: &RequestId) -> Option<I> {
171        self.pending.remove(id).map(|(data, _)| data)
172    }
173
174    /// Returns `true` if the request is currently pending.
175    #[must_use]
176    pub fn is_pending(&self, id: &RequestId) -> bool {
177        self.pending.contains_key(id)
178    }
179
180    /// Cancels a pending request by triggering its cancellation token.
181    ///
182    /// Returns `true` if the request was found and cancelled, `false` if the
183    /// request ID was not pending. Note that cancelling an already-cancelled
184    /// token is a no-op.
185    #[must_use]
186    pub fn cancel(&self, id: &RequestId) -> bool {
187        if let Some((_, token)) = self.pending.get(id) {
188            token.cancel();
189            true
190        } else {
191            false
192        }
193    }
194
195    /// Returns a clone of the cancellation token for a pending request.
196    ///
197    /// Returns `None` if the request is not pending. The returned token
198    /// can be passed to async handlers for cooperative cancellation.
199    #[must_use]
200    pub fn get_token(&self, id: &RequestId) -> Option<CancellationToken> {
201        self.pending.get(id).map(|(_, token)| token.clone())
202    }
203
204    /// Returns the number of currently pending requests.
205    #[must_use]
206    pub fn pending_count(&self) -> usize {
207        self.pending.len()
208    }
209}
210
211impl<I> Default for IncomingRequests<I> {
212    fn default() -> Self {
213        Self::new()
214    }
215}
216
217/// Tracks requests sent to clients (outgoing from the server).
218///
219/// When the server sends a request to the client, it registers the request ID
220/// and receives a oneshot receiver. When the client's response arrives, the
221/// server completes the request, sending the response to the waiting receiver.
222///
223/// The generic parameter `O` represents the response type that will be delivered
224/// when the request completes.
225///
226/// # Example
227///
228/// ```
229/// use lsp_server_tokio::{OutgoingRequests, RequestId};
230///
231/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
232/// let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();
233///
234/// // Register an outgoing request
235/// let rx = outgoing.register(1.into());
236/// assert!(outgoing.is_pending(&1.into()));
237///
238/// // Simulate receiving a response
239/// let completed = outgoing.complete(&1.into(), "result".to_string());
240/// assert!(completed);
241///
242/// // Receiver gets the response
243/// let result = rx.await.unwrap();
244/// assert_eq!(result, "result");
245/// # });
246/// ```
247#[derive(Debug)]
248pub struct OutgoingRequests<O> {
249    pending: HashMap<RequestId, oneshot::Sender<O>>,
250}
251
252impl<O> OutgoingRequests<O> {
253    /// Creates a new empty outgoing request tracker.
254    #[must_use]
255    pub fn new() -> Self {
256        Self {
257            pending: HashMap::new(),
258        }
259    }
260
261    /// Registers an outgoing request and returns a receiver for the response.
262    ///
263    /// The returned receiver will receive the response value when [`complete`](Self::complete)
264    /// is called with a matching ID. If the request is cancelled via [`cancel`](Self::cancel),
265    /// the receiver will return a `RecvError`.
266    pub fn register(&mut self, id: RequestId) -> oneshot::Receiver<O> {
267        let (tx, rx) = oneshot::channel();
268        self.pending.insert(id, tx);
269        rx
270    }
271
272    /// Completes an outgoing request by sending the response to the waiting receiver.
273    ///
274    /// Returns `true` if the request was pending and the response was sent,
275    /// `false` if the request was not found.
276    ///
277    /// Note: This returns `true` even if the receiver was dropped (the response is
278    /// still considered "completed" from the queue's perspective).
279    pub fn complete(&mut self, id: &RequestId, response: O) -> bool {
280        if let Some(tx) = self.pending.remove(id) {
281            // Ignore send error - receiver may have been dropped
282            let _ = tx.send(response);
283            true
284        } else {
285            false
286        }
287    }
288
289    /// Cancels an outgoing request without sending a response.
290    ///
291    /// The sender is dropped, causing the receiver to return `RecvError`.
292    ///
293    /// Returns `true` if the request was pending, `false` otherwise.
294    pub fn cancel(&mut self, id: &RequestId) -> bool {
295        self.pending.remove(id).is_some()
296    }
297
298    /// Returns `true` if the request is currently pending.
299    #[must_use]
300    pub fn is_pending(&self, id: &RequestId) -> bool {
301        self.pending.contains_key(id)
302    }
303
304    /// Returns the number of currently pending requests.
305    #[must_use]
306    pub fn pending_count(&self) -> usize {
307        self.pending.len()
308    }
309}
310
311impl<O> Default for OutgoingRequests<O> {
312    fn default() -> Self {
313        Self::new()
314    }
315}
316
317/// Combined request queue tracking both incoming and outgoing requests.
318///
319/// This is the primary type for managing LSP request-response correlation.
320/// It provides separate tracking for:
321///
322/// - `incoming`: Requests received from clients that need responses
323/// - `outgoing`: Requests sent to clients that are awaiting responses
324///
325/// # Type Parameters
326///
327/// - `I`: Metadata type for incoming requests (e.g., handler context)
328/// - `O`: Response type for outgoing requests
329///
330/// # Example
331///
332/// ```
333/// use lsp_server_tokio::{RequestQueue, RequestId};
334/// use tokio_util::sync::CancellationToken;
335///
336/// // Create a queue for a server that tracks method names for incoming
337/// // requests and expects JSON responses for outgoing requests
338/// let mut queue: RequestQueue<String, serde_json::Value> = RequestQueue::new();
339///
340/// // Track incoming request with cancellation token
341/// let token = CancellationToken::new();
342/// queue.incoming.register(1.into(), "textDocument/hover".to_string(), token);
343///
344/// // Operations on incoming don't affect outgoing
345/// assert_eq!(queue.incoming.pending_count(), 1);
346/// assert_eq!(queue.outgoing.pending_count(), 0);
347/// ```
348#[derive(Debug)]
349pub struct RequestQueue<I, O> {
350    /// Tracker for requests received from clients.
351    pub incoming: IncomingRequests<I>,
352    /// Tracker for requests sent to clients.
353    pub outgoing: OutgoingRequests<O>,
354}
355
356impl<I, O> RequestQueue<I, O> {
357    /// Creates a new empty request queue.
358    #[must_use]
359    pub fn new() -> Self {
360        Self {
361            incoming: IncomingRequests::new(),
362            outgoing: OutgoingRequests::new(),
363        }
364    }
365}
366
367impl<I, O> Default for RequestQueue<I, O> {
368    fn default() -> Self {
369        Self::new()
370    }
371}
372
/// Unit tests for the request trackers and for `parse_cancel_params`.
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_util::sync::CancellationToken;

    // ============== IncomingRequests Tests ==============

    #[test]
    fn incoming_register_and_complete() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), "metadata".to_string(), token);
        let data = incoming.complete(&1.into());

        assert_eq!(data, Some("metadata".to_string()));
        assert!(!incoming.is_pending(&1.into()));
    }

    #[test]
    fn incoming_complete_unknown_returns_none() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();

        let data = incoming.complete(&999.into());
        assert_eq!(data, None);
    }

    #[test]
    fn incoming_is_pending() {
        let mut incoming: IncomingRequests<()> = IncomingRequests::new();

        assert!(!incoming.is_pending(&1.into()));

        let token = CancellationToken::new();
        incoming.register(1.into(), (), token);
        assert!(incoming.is_pending(&1.into()));

        incoming.complete(&1.into());
        assert!(!incoming.is_pending(&1.into()));
    }

    #[test]
    fn incoming_pending_count() {
        let mut incoming: IncomingRequests<i32> = IncomingRequests::new();

        assert_eq!(incoming.pending_count(), 0);

        let token1 = CancellationToken::new();
        incoming.register(1.into(), 100, token1);
        assert_eq!(incoming.pending_count(), 1);

        let token2 = CancellationToken::new();
        incoming.register(2.into(), 200, token2);
        assert_eq!(incoming.pending_count(), 2);

        incoming.complete(&1.into());
        assert_eq!(incoming.pending_count(), 1);

        incoming.complete(&2.into());
        assert_eq!(incoming.pending_count(), 0);
    }

    #[test]
    fn incoming_register_same_id_replaces_entry() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();

        incoming.register(1.into(), "first".to_string(), CancellationToken::new());
        incoming.register(1.into(), "second".to_string(), CancellationToken::new());

        // Only the most recent registration is tracked for the ID
        assert_eq!(incoming.pending_count(), 1);
        assert_eq!(incoming.complete(&1.into()), Some("second".to_string()));
        assert_eq!(incoming.pending_count(), 0);
    }

    #[test]
    fn incoming_default() {
        let incoming: IncomingRequests<()> = IncomingRequests::default();
        assert_eq!(incoming.pending_count(), 0);
    }

    #[test]
    fn incoming_cancel_triggers_token() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();
        let token_clone = token.clone();

        incoming.register(1.into(), "data".to_string(), token);

        // Cancel the request
        assert!(incoming.cancel(&1.into()));

        // Token should be cancelled
        assert!(token_clone.is_cancelled());
    }

    #[test]
    fn incoming_cancel_unknown_returns_false() {
        let incoming: IncomingRequests<()> = IncomingRequests::new();
        assert!(!incoming.cancel(&999.into()));
    }

    #[test]
    fn incoming_cancel_idempotent() {
        let mut incoming: IncomingRequests<()> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), (), token);

        // Cancel twice - both should succeed
        assert!(incoming.cancel(&1.into()));
        assert!(incoming.cancel(&1.into())); // Still returns true, request still pending
    }

    #[test]
    fn incoming_get_token_returns_clone() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let original_token = CancellationToken::new();

        incoming.register(1.into(), "data".to_string(), original_token.clone());

        // Get the token
        let retrieved = incoming.get_token(&1.into());
        assert!(retrieved.is_some());

        // Cancel via retrieved token
        retrieved.unwrap().cancel();

        // Original should also be cancelled (they're the same underlying token)
        assert!(original_token.is_cancelled());
    }

    #[test]
    fn incoming_get_token_unknown_returns_none() {
        let incoming: IncomingRequests<()> = IncomingRequests::new();
        assert!(incoming.get_token(&999.into()).is_none());
    }

    #[test]
    fn incoming_complete_after_cancel_returns_data() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), "cancelled_data".to_string(), token);

        // Cancel first
        let _ = incoming.cancel(&1.into());

        // Complete should still return the data
        let data = incoming.complete(&1.into());
        assert_eq!(data, Some("cancelled_data".to_string()));
    }

    // ============== OutgoingRequests Tests ==============

    #[tokio::test]
    async fn outgoing_register_and_complete() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();
        let rx = outgoing.register(1.into());

        assert!(outgoing.complete(&1.into(), "response".to_string()));
        assert_eq!(rx.await.unwrap(), "response");
    }

    #[test]
    fn outgoing_complete_unknown_returns_false() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();

        let result = outgoing.complete(&999.into(), "response".to_string());
        assert!(!result);
    }

    #[test]
    fn outgoing_complete_with_dropped_receiver_returns_true() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();
        let rx = outgoing.register(1.into());
        drop(rx);

        // The request still counts as completed even though nobody is listening
        assert!(outgoing.complete(&1.into(), "response".to_string()));
        assert!(!outgoing.is_pending(&1.into()));
    }

    #[tokio::test]
    async fn outgoing_register_same_id_drops_previous_sender() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();
        let first_rx = outgoing.register(1.into());
        let second_rx = outgoing.register(1.into());

        // The first sender was replaced, so its receiver can never get a value
        assert_eq!(outgoing.pending_count(), 1);
        assert!(first_rx.await.is_err());

        // The response is routed to the most recent registration
        assert!(outgoing.complete(&1.into(), "response".to_string()));
        assert_eq!(second_rx.await.unwrap(), "response");
    }

    #[tokio::test]
    async fn outgoing_cancel_drops_sender() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();
        let rx = outgoing.register(1.into());

        assert!(outgoing.cancel(&1.into()));
        assert!(!outgoing.is_pending(&1.into()));

        // Receiver should get an error since sender was dropped
        assert!(rx.await.is_err());
    }

    #[test]
    fn outgoing_cancel_unknown_returns_false() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();

        assert!(!outgoing.cancel(&999.into()));
    }

    #[test]
    fn outgoing_is_pending() {
        let mut outgoing: OutgoingRequests<()> = OutgoingRequests::new();

        assert!(!outgoing.is_pending(&1.into()));

        let _rx = outgoing.register(1.into());
        assert!(outgoing.is_pending(&1.into()));

        outgoing.complete(&1.into(), ());
        assert!(!outgoing.is_pending(&1.into()));
    }

    #[test]
    fn outgoing_pending_count() {
        let mut outgoing: OutgoingRequests<i32> = OutgoingRequests::new();

        assert_eq!(outgoing.pending_count(), 0);

        let _rx1 = outgoing.register(1.into());
        assert_eq!(outgoing.pending_count(), 1);

        let _rx2 = outgoing.register(2.into());
        assert_eq!(outgoing.pending_count(), 2);

        outgoing.complete(&1.into(), 100);
        assert_eq!(outgoing.pending_count(), 1);

        outgoing.cancel(&2.into());
        assert_eq!(outgoing.pending_count(), 0);
    }

    #[test]
    fn outgoing_default() {
        let outgoing: OutgoingRequests<()> = OutgoingRequests::default();
        assert_eq!(outgoing.pending_count(), 0);
    }

    // ============== RequestQueue Tests ==============

    #[test]
    fn queue_new_creates_empty() {
        let queue: RequestQueue<(), ()> = RequestQueue::new();

        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 0);
    }

    #[test]
    fn queue_incoming_outgoing_independent() {
        let mut queue: RequestQueue<String, String> = RequestQueue::new();

        // Register on incoming
        let token = CancellationToken::new();
        queue
            .incoming
            .register(1.into(), "incoming".to_string(), token);
        assert_eq!(queue.incoming.pending_count(), 1);
        assert_eq!(queue.outgoing.pending_count(), 0);

        // Register on outgoing
        let _rx = queue.outgoing.register(2.into());
        assert_eq!(queue.incoming.pending_count(), 1);
        assert_eq!(queue.outgoing.pending_count(), 1);

        // Complete incoming doesn't affect outgoing
        queue.incoming.complete(&1.into());
        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 1);
    }

    #[test]
    fn queue_default() {
        let queue: RequestQueue<(), ()> = RequestQueue::default();
        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 0);
    }

    #[test]
    fn queue_with_string_request_id() {
        let mut queue: RequestQueue<i32, i32> = RequestQueue::new();

        let str_id: RequestId = "abc-123".into();
        let token = CancellationToken::new();
        queue.incoming.register(str_id.clone(), 42, token);

        assert!(queue.incoming.is_pending(&str_id));
        assert_eq!(queue.incoming.complete(&str_id), Some(42));
    }

    // ============== parse_cancel_params Tests ==============

    #[test]
    fn parse_cancel_params_integer_id() {
        let params = Some(serde_json::json!({"id": 42}));
        let id = parse_cancel_params(&params);
        assert_eq!(id, Some(RequestId::Integer(42)));
    }

    #[test]
    fn parse_cancel_params_negative_integer_id() {
        let params = Some(serde_json::json!({"id": -7}));
        let id = parse_cancel_params(&params);
        assert_eq!(id, Some(RequestId::Integer(-7)));
    }

    #[test]
    fn parse_cancel_params_i32_boundaries() {
        let params = Some(serde_json::json!({"id": i32::MAX}));
        assert_eq!(
            parse_cancel_params(&params),
            Some(RequestId::Integer(i32::MAX))
        );

        let params = Some(serde_json::json!({"id": i32::MIN}));
        assert_eq!(
            parse_cancel_params(&params),
            Some(RequestId::Integer(i32::MIN))
        );
    }

    #[test]
    fn parse_cancel_params_out_of_range_integer() {
        // One past either end of the i32 range must be rejected, not truncated
        let too_large = i64::from(i32::MAX) + 1;
        let params = Some(serde_json::json!({"id": too_large}));
        assert!(parse_cancel_params(&params).is_none());

        let too_small = i64::from(i32::MIN) - 1;
        let params = Some(serde_json::json!({"id": too_small}));
        assert!(parse_cancel_params(&params).is_none());
    }

    #[test]
    fn parse_cancel_params_fractional_id() {
        let params = Some(serde_json::json!({"id": 1.5}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_string_id() {
        let params = Some(serde_json::json!({"id": "request-123"}));
        let id = parse_cancel_params(&params);
        assert_eq!(id, Some(RequestId::String("request-123".to_string())));
    }

    #[test]
    fn parse_cancel_params_missing_params() {
        let id = parse_cancel_params(&None);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_non_object_params() {
        // Params that are not a JSON object have no "id" member to look up
        let params = Some(serde_json::json!([42]));
        assert!(parse_cancel_params(&params).is_none());

        let params = Some(serde_json::json!("id"));
        assert!(parse_cancel_params(&params).is_none());
    }

    #[test]
    fn parse_cancel_params_missing_id_field() {
        let params = Some(serde_json::json!({"other": "field"}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_invalid_id_type() {
        let params = Some(serde_json::json!({"id": true}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_null_id() {
        let params = Some(serde_json::json!({"id": null}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }
}