Skip to main content

lsp_server_tokio/
request_queue.rs

1//! Request queue types for tracking pending LSP requests.
2//!
3//! This module provides types for tracking request-response correlation in both
4//! directions of LSP communication:
5//!
6//! - [`IncomingRequests`] - Tracks requests received from clients (server needs to send responses)
7//! - [`OutgoingRequests`] - Tracks requests sent to clients (server is waiting for responses)
8//! - [`RequestQueue`] - Combines both for complete request lifecycle management
9//!
10//! # Usage Pattern
11//!
12//! ```
13//! use lsp_server_tokio::{RequestQueue, RequestId};
14//! use tokio_util::sync::CancellationToken;
15//!
//! // Create a queue with a custom metadata type for incoming requests
17//! let mut queue: RequestQueue<String> = RequestQueue::new();
18//!
19//! // Track an incoming request (from client) with a cancellation token
20//! let request_id: RequestId = 1.into();
21//! let token = CancellationToken::new();
22//! queue.incoming.register(request_id.clone(), "handler_context".to_string(), token);
23//! assert!(queue.incoming.is_pending(&request_id));
24//!
25//! // When ready to respond, complete the request
26//! let metadata = queue.incoming.complete(&request_id);
27//! assert_eq!(metadata, Some("handler_context".to_string()));
28//! ```
29//!
30//! # Server-Initiated Requests
31//!
32//! ```
33//! use lsp_server_tokio::{RequestQueue, RequestId, Response};
34//! use serde_json::json;
35//!
36//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
37//! let mut queue: RequestQueue<()> = RequestQueue::new();
38//!
39//! // Register an outgoing request (to client) and get a receiver
40//! let request_id: RequestId = 42.into();
41//! let rx = queue.outgoing.register(request_id.clone());
42//!
43//! // When response arrives, complete the request
44//! let response = Response::ok(42, json!({"result": "ok"}));
45//! let sent = queue.outgoing.complete(&request_id, response);
46//! assert!(sent);
47//!
48//! // The receiver gets the response
49//! let received = rx.await.unwrap();
50//! assert_eq!(received.id, Some(42.into()));
51//! # });
52//! ```
53
54use std::collections::HashMap;
55use tokio::sync::oneshot;
56use tokio_util::sync::CancellationToken;
57
58use crate::request_id::RequestId;
59use crate::Response;
60
/// JSON-RPC method name of the LSP cancel notification (`$/cancelRequest`).
///
/// Compare an incoming notification's method against this constant to decide
/// whether its params should be handed to [`parse_cancel_params`].
pub const CANCEL_REQUEST_METHOD: &str = "$/cancelRequest";
63
64/// Parses the request ID from $/cancelRequest notification params.
65///
66/// According to the LSP specification, $/cancelRequest has params:
67/// ```json
68/// { "id": number | string }
69/// ```
70///
71/// Returns `None` if params are missing, malformed, or the ID is not
72/// a valid integer or string.
73///
74/// # Example
75///
76/// ```
77/// use lsp_server_tokio::parse_cancel_params;
78/// use serde_json::json;
79///
80/// // Integer ID
81/// let params = Some(json!({"id": 42}));
82/// let id = parse_cancel_params(&params);
83/// assert_eq!(id, Some(42.into()));
84///
85/// // String ID
86/// let params = Some(json!({"id": "request-abc"}));
87/// let id = parse_cancel_params(&params);
88/// assert_eq!(id, Some("request-abc".into()));
89///
90/// // Missing params
91/// let id = parse_cancel_params(&None);
92/// assert!(id.is_none());
93/// ```
94#[must_use]
95pub fn parse_cancel_params(params: &Option<serde_json::Value>) -> Option<RequestId> {
96    let params = params.as_ref()?;
97    let id_value = params.get("id")?;
98
99    match id_value {
100        serde_json::Value::Number(n) => n
101            .as_i64()
102            .and_then(|i| i32::try_from(i).ok())
103            .map(RequestId::Integer),
104        serde_json::Value::String(s) => Some(RequestId::String(s.clone())),
105        _ => None,
106    }
107}
108
109/// Tracks requests received from clients (incoming to the server).
110///
111/// When the server receives a request, it registers the request ID along with
112/// any metadata needed to process the response. When the server is ready to
113/// send a response, it completes the request to retrieve the metadata.
114///
115/// Each request is also associated with a [`CancellationToken`] that can be
116/// used to signal cancellation (e.g., when receiving `$/cancelRequest`).
117///
118/// The generic parameter `I` represents user-defined metadata associated with
119/// each incoming request (e.g., handler context, timing info, request origin).
120///
121/// # Example
122///
123/// ```
124/// use lsp_server_tokio::{IncomingRequests, RequestId};
125/// use tokio_util::sync::CancellationToken;
126///
127/// let mut incoming: IncomingRequests<String> = IncomingRequests::new();
128///
129/// // Register a request with metadata and cancellation token
130/// let token1 = CancellationToken::new();
131/// let token2 = CancellationToken::new();
132/// incoming.register(1.into(), "textDocument/hover".to_string(), token1);
133/// incoming.register(2.into(), "textDocument/completion".to_string(), token2);
134///
135/// assert_eq!(incoming.pending_count(), 2);
136/// assert!(incoming.is_pending(&1.into()));
137///
138/// // Cancel a request
139/// incoming.cancel(&2.into());
140///
141/// // Complete request and get metadata back
142/// let method = incoming.complete(&1.into());
143/// assert_eq!(method, Some("textDocument/hover".to_string()));
144/// assert_eq!(incoming.pending_count(), 1);
145/// ```
146#[derive(Debug)]
147pub struct IncomingRequests<I> {
148    pending: HashMap<RequestId, (I, CancellationToken)>,
149}
150
151impl<I> IncomingRequests<I> {
152    /// Creates a new empty incoming request tracker.
153    #[must_use]
154    pub fn new() -> Self {
155        Self {
156            pending: HashMap::new(),
157        }
158    }
159
160    /// Registers an incoming request with associated metadata and cancellation token.
161    ///
162    /// The metadata can be any user-defined type that you want to associate
163    /// with this request until it's completed. The cancellation token can be
164    /// used to signal request cancellation to async handlers.
165    pub fn register(&mut self, id: RequestId, data: I, token: CancellationToken) {
166        self.pending.insert(id, (data, token));
167    }
168
169    /// Completes an incoming request, removing it from tracking and returning the metadata.
170    ///
171    /// Returns `Some(metadata)` if the request was pending, `None` otherwise.
172    /// The cancellation token is dropped when the request is completed.
173    pub fn complete(&mut self, id: &RequestId) -> Option<I> {
174        self.pending.remove(id).map(|(data, _)| data)
175    }
176
177    /// Returns `true` if the request is currently pending.
178    #[must_use]
179    pub fn is_pending(&self, id: &RequestId) -> bool {
180        self.pending.contains_key(id)
181    }
182
183    /// Cancels a pending request by triggering its cancellation token.
184    ///
185    /// Returns `true` if the request was found and cancelled, `false` if the
186    /// request ID was not pending. Note that cancelling an already-cancelled
187    /// token is a no-op.
188    #[must_use]
189    pub fn cancel(&self, id: &RequestId) -> bool {
190        if let Some((_, token)) = self.pending.get(id) {
191            token.cancel();
192            true
193        } else {
194            false
195        }
196    }
197
198    /// Returns a clone of the cancellation token for a pending request.
199    ///
200    /// Returns `None` if the request is not pending. The returned token
201    /// can be passed to async handlers for cooperative cancellation.
202    #[must_use]
203    pub fn get_token(&self, id: &RequestId) -> Option<CancellationToken> {
204        self.pending.get(id).map(|(_, token)| token.clone())
205    }
206
207    /// Returns the number of currently pending requests.
208    #[must_use]
209    pub fn pending_count(&self) -> usize {
210        self.pending.len()
211    }
212}
213
214impl<I> Default for IncomingRequests<I> {
215    fn default() -> Self {
216        Self::new()
217    }
218}
219
220/// Tracks requests sent to clients (outgoing from the server).
221///
222/// When the server sends a request to the client, it registers the request ID
223/// and receives a oneshot receiver. When the client's response arrives, the
224/// server completes the request, sending the response to the waiting receiver.
225///
226/// Responses are always [`Response`](crate::Response) — the standard JSON-RPC response type.
227///
228/// # Example
229///
230/// ```
231/// use lsp_server_tokio::{OutgoingRequests, RequestId, Response};
232/// use serde_json::json;
233///
234/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
235/// let mut outgoing = OutgoingRequests::new();
236///
237/// // Register an outgoing request
238/// let rx = outgoing.register(1.into());
239/// assert!(outgoing.is_pending(&1.into()));
240///
241/// // Simulate receiving a response
242/// let response = Response::ok(1, json!({"result": "success"}));
243/// let completed = outgoing.complete(&1.into(), response);
244/// assert!(completed);
245///
246/// // Receiver gets the response
247/// let result = rx.await.unwrap();
248/// assert_eq!(result.id, Some(1.into()));
249/// # });
250/// ```
251#[derive(Debug)]
252pub struct OutgoingRequests {
253    pending: HashMap<RequestId, oneshot::Sender<Response>>,
254}
255
256impl OutgoingRequests {
257    /// Creates a new empty outgoing request tracker.
258    #[must_use]
259    pub fn new() -> Self {
260        Self {
261            pending: HashMap::new(),
262        }
263    }
264
265    /// Registers an outgoing request and returns a receiver for the response.
266    ///
267    /// The returned receiver will receive the response value when [`complete`](Self::complete)
268    /// is called with a matching ID. If the request is cancelled via [`cancel`](Self::cancel),
269    /// the receiver will return a `RecvError`.
270    pub fn register(&mut self, id: RequestId) -> oneshot::Receiver<Response> {
271        let (tx, rx) = oneshot::channel();
272        self.pending.insert(id, tx);
273        rx
274    }
275
276    /// Completes an outgoing request by sending the response to the waiting receiver.
277    ///
278    /// Returns `true` if the request was pending and the response was sent,
279    /// `false` if the request was not found.
280    ///
281    /// Note: This returns `true` even if the receiver was dropped (the response is
282    /// still considered "completed" from the queue's perspective).
283    pub fn complete(&mut self, id: &RequestId, response: Response) -> bool {
284        if let Some(tx) = self.pending.remove(id) {
285            // Ignore send error - receiver may have been dropped
286            let _ = tx.send(response);
287            true
288        } else {
289            false
290        }
291    }
292
293    /// Cancels an outgoing request without sending a response.
294    ///
295    /// The sender is dropped, causing the receiver to return `RecvError`.
296    ///
297    /// Returns `true` if the request was pending, `false` otherwise.
298    pub fn cancel(&mut self, id: &RequestId) -> bool {
299        self.pending.remove(id).is_some()
300    }
301
302    /// Returns `true` if the request is currently pending.
303    #[must_use]
304    pub fn is_pending(&self, id: &RequestId) -> bool {
305        self.pending.contains_key(id)
306    }
307
308    /// Returns the number of currently pending requests.
309    #[must_use]
310    pub fn pending_count(&self) -> usize {
311        self.pending.len()
312    }
313}
314
315impl Default for OutgoingRequests {
316    fn default() -> Self {
317        Self::new()
318    }
319}
320
321/// Combined request queue tracking both incoming and outgoing requests.
322///
323/// This is the primary type for managing LSP request-response correlation.
324/// It provides separate tracking for:
325///
326/// - `incoming`: Requests received from clients that need responses
327/// - `outgoing`: Requests sent to clients that are awaiting responses
328///
329/// # Type Parameters
330///
331/// - `I`: Metadata type for incoming requests (e.g., handler context)
332///
333/// # Example
334///
335/// ```
336/// use lsp_server_tokio::{RequestQueue, RequestId};
337/// use tokio_util::sync::CancellationToken;
338///
339/// // Create a queue for a server that tracks method names for incoming
340/// // requests and expects JSON responses for outgoing requests
341/// let mut queue: RequestQueue<String> = RequestQueue::new();
342///
343/// // Track incoming request with cancellation token
344/// let token = CancellationToken::new();
345/// queue.incoming.register(1.into(), "textDocument/hover".to_string(), token);
346///
347/// // Operations on incoming don't affect outgoing
348/// assert_eq!(queue.incoming.pending_count(), 1);
349/// assert_eq!(queue.outgoing.pending_count(), 0);
350/// ```
351#[derive(Debug)]
352pub struct RequestQueue<I> {
353    /// Tracker for requests received from clients.
354    pub incoming: IncomingRequests<I>,
355    /// Tracker for requests sent to clients.
356    pub outgoing: OutgoingRequests,
357}
358
359impl<I> RequestQueue<I> {
360    /// Creates a new empty request queue.
361    #[must_use]
362    pub fn new() -> Self {
363        Self {
364            incoming: IncomingRequests::new(),
365            outgoing: OutgoingRequests::new(),
366        }
367    }
368}
369
370impl<I> Default for RequestQueue<I> {
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_util::sync::CancellationToken;

    // ============== IncomingRequests Tests ==============

    /// Completing a registered request returns its metadata and untracks it.
    #[test]
    fn incoming_register_and_complete() {
        let id: RequestId = 1.into();
        let mut tracker: IncomingRequests<String> = IncomingRequests::new();

        tracker.register(id.clone(), "metadata".to_string(), CancellationToken::new());
        let returned = tracker.complete(&id);

        assert_eq!(returned, Some("metadata".to_string()));
        assert!(!tracker.is_pending(&id));
    }

    /// Completing an ID that was never registered yields nothing.
    #[test]
    fn incoming_complete_unknown_returns_none() {
        let mut tracker: IncomingRequests<String> = IncomingRequests::new();

        let returned = tracker.complete(&999.into());
        assert_eq!(returned, None);
    }

    /// `is_pending` follows the register -> complete lifecycle.
    #[test]
    fn incoming_is_pending() {
        let id: RequestId = 1.into();
        let mut tracker: IncomingRequests<()> = IncomingRequests::new();

        assert!(!tracker.is_pending(&id));

        tracker.register(id.clone(), (), CancellationToken::new());
        assert!(tracker.is_pending(&id));

        tracker.complete(&id);
        assert!(!tracker.is_pending(&id));
    }

    /// `pending_count` grows on register and shrinks on complete.
    #[test]
    fn incoming_pending_count() {
        let mut tracker: IncomingRequests<i32> = IncomingRequests::new();

        assert_eq!(tracker.pending_count(), 0);

        tracker.register(1.into(), 100, CancellationToken::new());
        assert_eq!(tracker.pending_count(), 1);

        tracker.register(2.into(), 200, CancellationToken::new());
        assert_eq!(tracker.pending_count(), 2);

        tracker.complete(&1.into());
        assert_eq!(tracker.pending_count(), 1);

        tracker.complete(&2.into());
        assert_eq!(tracker.pending_count(), 0);
    }

    /// `Default` produces an empty tracker.
    #[test]
    fn incoming_default() {
        let tracker = IncomingRequests::<()>::default();
        assert_eq!(tracker.pending_count(), 0);
    }

    /// Cancelling a request is visible through every clone of its token.
    #[test]
    fn incoming_cancel_triggers_token() {
        let mut tracker: IncomingRequests<String> = IncomingRequests::new();
        let registered = CancellationToken::new();
        let observer = registered.clone();

        tracker.register(1.into(), "data".to_string(), registered);

        // Cancel the request
        assert!(tracker.cancel(&1.into()));

        // The clone kept outside the tracker must observe the cancellation
        assert!(observer.is_cancelled());
    }

    /// Cancelling an unknown ID reports `false`.
    #[test]
    fn incoming_cancel_unknown_returns_false() {
        let tracker: IncomingRequests<()> = IncomingRequests::new();
        assert!(!tracker.cancel(&999.into()));
    }

    /// Cancelling twice keeps returning `true` because the request stays pending.
    #[test]
    fn incoming_cancel_idempotent() {
        let id: RequestId = 1.into();
        let mut tracker: IncomingRequests<()> = IncomingRequests::new();

        tracker.register(id.clone(), (), CancellationToken::new());

        assert!(tracker.cancel(&id));
        assert!(tracker.cancel(&id)); // Still returns true, request still pending
    }

    /// The token returned by `get_token` shares state with the registered one.
    #[test]
    fn incoming_get_token_returns_clone() {
        let mut tracker: IncomingRequests<String> = IncomingRequests::new();
        let original = CancellationToken::new();

        tracker.register(1.into(), "data".to_string(), original.clone());

        // Fetch the stored token
        let fetched = tracker.get_token(&1.into());
        assert!(fetched.is_some());

        // Cancel through the fetched handle
        fetched.unwrap().cancel();

        // The original handle sees it too (same underlying token)
        assert!(original.is_cancelled());
    }

    /// `get_token` yields nothing for an unknown ID.
    #[test]
    fn incoming_get_token_unknown_returns_none() {
        let tracker: IncomingRequests<()> = IncomingRequests::new();
        assert!(tracker.get_token(&999.into()).is_none());
    }

    /// A cancelled request can still be completed and returns its metadata.
    #[test]
    fn incoming_complete_after_cancel_returns_data() {
        let id: RequestId = 1.into();
        let mut tracker: IncomingRequests<String> = IncomingRequests::new();

        tracker.register(id.clone(), "cancelled_data".to_string(), CancellationToken::new());

        // Cancel first
        let _ = tracker.cancel(&id);

        // Completing afterwards still hands the metadata back
        let returned = tracker.complete(&id);
        assert_eq!(returned, Some("cancelled_data".to_string()));
    }

    // ============== OutgoingRequests Tests ==============

    /// A completed request delivers the response to its receiver.
    #[tokio::test]
    async fn outgoing_register_and_complete() {
        let mut tracker = OutgoingRequests::new();
        let receiver = tracker.register(1.into());

        let response = Response::ok(1, serde_json::json!("response"));
        assert!(tracker.complete(&1.into(), response.clone()));

        let delivered = receiver.await.unwrap();
        assert_eq!(delivered.id, response.id);
    }

    /// Completing an unknown ID reports `false`.
    #[test]
    fn outgoing_complete_unknown_returns_false() {
        let mut tracker = OutgoingRequests::new();

        let response = Response::ok(999, serde_json::json!(null));
        let completed = tracker.complete(&999.into(), response);
        assert!(!completed);
    }

    /// Cancelling untracks the request and makes the receiver fail.
    #[tokio::test]
    async fn outgoing_cancel_drops_sender() {
        let id: RequestId = 1.into();
        let mut tracker = OutgoingRequests::new();
        let receiver = tracker.register(id.clone());

        assert!(tracker.cancel(&id));
        assert!(!tracker.is_pending(&id));

        // The sender is gone, so awaiting the receiver yields an error
        assert!(receiver.await.is_err());
    }

    /// Cancelling an unknown ID reports `false`.
    #[test]
    fn outgoing_cancel_unknown_returns_false() {
        let mut tracker = OutgoingRequests::new();

        assert!(!tracker.cancel(&999.into()));
    }

    /// `is_pending` follows the register -> complete lifecycle.
    #[test]
    fn outgoing_is_pending() {
        let id: RequestId = 1.into();
        let mut tracker = OutgoingRequests::new();

        assert!(!tracker.is_pending(&id));

        let _receiver = tracker.register(id.clone());
        assert!(tracker.is_pending(&id));

        tracker.complete(&id, Response::ok(1, serde_json::json!(null)));
        assert!(!tracker.is_pending(&id));
    }

    /// `pending_count` shrinks on both complete and cancel.
    #[test]
    fn outgoing_pending_count() {
        let mut tracker = OutgoingRequests::new();

        assert_eq!(tracker.pending_count(), 0);

        let _first = tracker.register(1.into());
        assert_eq!(tracker.pending_count(), 1);

        let _second = tracker.register(2.into());
        assert_eq!(tracker.pending_count(), 2);

        tracker.complete(&1.into(), Response::ok(1, serde_json::json!(null)));
        assert_eq!(tracker.pending_count(), 1);

        tracker.cancel(&2.into());
        assert_eq!(tracker.pending_count(), 0);
    }

    /// `Default` produces an empty tracker.
    #[test]
    fn outgoing_default() {
        let tracker = OutgoingRequests::default();
        assert_eq!(tracker.pending_count(), 0);
    }

    // ============== RequestQueue Tests ==============

    /// A fresh queue has nothing pending in either direction.
    #[test]
    fn queue_new_creates_empty() {
        let queue: RequestQueue<()> = RequestQueue::new();

        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 0);
    }

    /// Operations on one tracker never change the other tracker's count.
    #[test]
    fn queue_incoming_outgoing_independent() {
        let mut queue: RequestQueue<String> = RequestQueue::new();

        // Register on incoming
        queue
            .incoming
            .register(1.into(), "incoming".to_string(), CancellationToken::new());
        assert_eq!(queue.incoming.pending_count(), 1);
        assert_eq!(queue.outgoing.pending_count(), 0);

        // Register on outgoing
        let _receiver = queue.outgoing.register(2.into());
        assert_eq!(queue.incoming.pending_count(), 1);
        assert_eq!(queue.outgoing.pending_count(), 1);

        // Completing the incoming request leaves outgoing untouched
        queue.incoming.complete(&1.into());
        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 1);
    }

    /// `Default` produces an empty queue.
    #[test]
    fn queue_default() {
        let queue = RequestQueue::<()>::default();
        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 0);
    }

    /// String request IDs are tracked just like integer ones.
    #[test]
    fn queue_with_string_request_id() {
        let mut queue: RequestQueue<i32> = RequestQueue::new();

        let text_id: RequestId = "abc-123".into();
        queue
            .incoming
            .register(text_id.clone(), 42, CancellationToken::new());

        assert!(queue.incoming.is_pending(&text_id));
        assert_eq!(queue.incoming.complete(&text_id), Some(42));
    }

    // ============== parse_cancel_params Tests ==============

    use super::parse_cancel_params;

    /// A numeric `id` becomes an integer request ID.
    #[test]
    fn parse_cancel_params_integer_id() {
        let params = Some(serde_json::json!({"id": 42}));
        let parsed = parse_cancel_params(&params);
        assert_eq!(parsed, Some(RequestId::Integer(42)));
    }

    /// A string `id` becomes a string request ID.
    #[test]
    fn parse_cancel_params_string_id() {
        let params = Some(serde_json::json!({"id": "request-123"}));
        let parsed = parse_cancel_params(&params);
        assert_eq!(parsed, Some(RequestId::String("request-123".to_string())));
    }

    /// Absent params yield no ID.
    #[test]
    fn parse_cancel_params_missing_params() {
        let parsed = parse_cancel_params(&None);
        assert!(parsed.is_none());
    }

    /// Params without an `id` member yield no ID.
    #[test]
    fn parse_cancel_params_missing_id_field() {
        let params = Some(serde_json::json!({"other": "field"}));
        let parsed = parse_cancel_params(&params);
        assert!(parsed.is_none());
    }

    /// A boolean `id` is rejected.
    #[test]
    fn parse_cancel_params_invalid_id_type() {
        let params = Some(serde_json::json!({"id": true}));
        let parsed = parse_cancel_params(&params);
        assert!(parsed.is_none());
    }

    /// A `null` `id` is rejected.
    #[test]
    fn parse_cancel_params_null_id() {
        let params = Some(serde_json::json!({"id": null}));
        let parsed = parse_cancel_params(&params);
        assert!(parsed.is_none());
    }
}
689}