lsp_server_tokio/request_queue.rs
1//! Request queue types for tracking pending LSP requests.
2//!
3//! This module provides types for tracking request-response correlation in both
4//! directions of LSP communication:
5//!
6//! - [`IncomingRequests`] - Tracks requests received from clients (server needs to send responses)
7//! - [`OutgoingRequests`] - Tracks requests sent to clients (server is waiting for responses)
8//! - [`RequestQueue`] - Combines both for complete request lifecycle management
9//!
10//! # Usage Pattern
11//!
12//! ```
13//! use lsp_server_tokio::{RequestQueue, RequestId};
14//! use tokio_util::sync::CancellationToken;
15//!
16//! // Create a queue with custom metadata types
17//! let mut queue: RequestQueue<String, String> = RequestQueue::new();
18//!
19//! // Track an incoming request (from client) with a cancellation token
20//! let request_id: RequestId = 1.into();
21//! let token = CancellationToken::new();
22//! queue.incoming.register(request_id.clone(), "handler_context".to_string(), token);
23//! assert!(queue.incoming.is_pending(&request_id));
24//!
25//! // When ready to respond, complete the request
26//! let metadata = queue.incoming.complete(&request_id);
27//! assert_eq!(metadata, Some("handler_context".to_string()));
28//! ```
29//!
30//! # Server-Initiated Requests
31//!
32//! ```
33//! use lsp_server_tokio::{RequestQueue, RequestId};
34//!
35//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
36//! let mut queue: RequestQueue<(), String> = RequestQueue::new();
37//!
38//! // Register an outgoing request (to client) and get a receiver
39//! let request_id: RequestId = 42.into();
40//! let rx = queue.outgoing.register(request_id.clone());
41//!
42//! // When response arrives, complete the request
43//! let sent = queue.outgoing.complete(&request_id, "response data".to_string());
44//! assert!(sent);
45//!
46//! // The receiver gets the response
47//! let response = rx.await.unwrap();
48//! assert_eq!(response, "response data");
49//! # });
50//! ```
51
52use std::collections::HashMap;
53use tokio::sync::oneshot;
54use tokio_util::sync::CancellationToken;
55
56use crate::request_id::RequestId;
57
/// JSON-RPC method name of the LSP cancellation notification (`$/cancelRequest`).
///
/// Compare an incoming notification's method against this constant to decide
/// whether its params should be handed to [`parse_cancel_params`].
pub const CANCEL_REQUEST_METHOD: &str = "$/cancelRequest";
60
61/// Parses the request ID from $/cancelRequest notification params.
62///
63/// According to the LSP specification, $/cancelRequest has params:
64/// ```json
65/// { "id": number | string }
66/// ```
67///
68/// Returns `None` if params are missing, malformed, or the ID is not
69/// a valid integer or string.
70///
71/// # Example
72///
73/// ```
74/// use lsp_server_tokio::parse_cancel_params;
75/// use serde_json::json;
76///
77/// // Integer ID
78/// let params = Some(json!({"id": 42}));
79/// let id = parse_cancel_params(¶ms);
80/// assert_eq!(id, Some(42.into()));
81///
82/// // String ID
83/// let params = Some(json!({"id": "request-abc"}));
84/// let id = parse_cancel_params(¶ms);
85/// assert_eq!(id, Some("request-abc".into()));
86///
87/// // Missing params
88/// let id = parse_cancel_params(&None);
89/// assert!(id.is_none());
90/// ```
91#[must_use]
92pub fn parse_cancel_params(params: &Option<serde_json::Value>) -> Option<RequestId> {
93 let params = params.as_ref()?;
94 let id_value = params.get("id")?;
95
96 match id_value {
97 serde_json::Value::Number(n) => n
98 .as_i64()
99 .and_then(|i| i32::try_from(i).ok())
100 .map(RequestId::Integer),
101 serde_json::Value::String(s) => Some(RequestId::String(s.clone())),
102 _ => None,
103 }
104}
105
106/// Tracks requests received from clients (incoming to the server).
107///
108/// When the server receives a request, it registers the request ID along with
109/// any metadata needed to process the response. When the server is ready to
110/// send a response, it completes the request to retrieve the metadata.
111///
112/// Each request is also associated with a [`CancellationToken`] that can be
113/// used to signal cancellation (e.g., when receiving `$/cancelRequest`).
114///
115/// The generic parameter `I` represents user-defined metadata associated with
116/// each incoming request (e.g., handler context, timing info, request origin).
117///
118/// # Example
119///
120/// ```
121/// use lsp_server_tokio::{IncomingRequests, RequestId};
122/// use tokio_util::sync::CancellationToken;
123///
124/// let mut incoming: IncomingRequests<String> = IncomingRequests::new();
125///
126/// // Register a request with metadata and cancellation token
127/// let token1 = CancellationToken::new();
128/// let token2 = CancellationToken::new();
129/// incoming.register(1.into(), "textDocument/hover".to_string(), token1);
130/// incoming.register(2.into(), "textDocument/completion".to_string(), token2);
131///
132/// assert_eq!(incoming.pending_count(), 2);
133/// assert!(incoming.is_pending(&1.into()));
134///
135/// // Cancel a request
136/// incoming.cancel(&2.into());
137///
138/// // Complete request and get metadata back
139/// let method = incoming.complete(&1.into());
140/// assert_eq!(method, Some("textDocument/hover".to_string()));
141/// assert_eq!(incoming.pending_count(), 1);
142/// ```
143#[derive(Debug)]
144pub struct IncomingRequests<I> {
145 pending: HashMap<RequestId, (I, CancellationToken)>,
146}
147
148impl<I> IncomingRequests<I> {
149 /// Creates a new empty incoming request tracker.
150 #[must_use]
151 pub fn new() -> Self {
152 Self {
153 pending: HashMap::new(),
154 }
155 }
156
157 /// Registers an incoming request with associated metadata and cancellation token.
158 ///
159 /// The metadata can be any user-defined type that you want to associate
160 /// with this request until it's completed. The cancellation token can be
161 /// used to signal request cancellation to async handlers.
162 pub fn register(&mut self, id: RequestId, data: I, token: CancellationToken) {
163 self.pending.insert(id, (data, token));
164 }
165
166 /// Completes an incoming request, removing it from tracking and returning the metadata.
167 ///
168 /// Returns `Some(metadata)` if the request was pending, `None` otherwise.
169 /// The cancellation token is dropped when the request is completed.
170 pub fn complete(&mut self, id: &RequestId) -> Option<I> {
171 self.pending.remove(id).map(|(data, _)| data)
172 }
173
174 /// Returns `true` if the request is currently pending.
175 #[must_use]
176 pub fn is_pending(&self, id: &RequestId) -> bool {
177 self.pending.contains_key(id)
178 }
179
180 /// Cancels a pending request by triggering its cancellation token.
181 ///
182 /// Returns `true` if the request was found and cancelled, `false` if the
183 /// request ID was not pending. Note that cancelling an already-cancelled
184 /// token is a no-op.
185 #[must_use]
186 pub fn cancel(&self, id: &RequestId) -> bool {
187 if let Some((_, token)) = self.pending.get(id) {
188 token.cancel();
189 true
190 } else {
191 false
192 }
193 }
194
195 /// Returns a clone of the cancellation token for a pending request.
196 ///
197 /// Returns `None` if the request is not pending. The returned token
198 /// can be passed to async handlers for cooperative cancellation.
199 #[must_use]
200 pub fn get_token(&self, id: &RequestId) -> Option<CancellationToken> {
201 self.pending.get(id).map(|(_, token)| token.clone())
202 }
203
204 /// Returns the number of currently pending requests.
205 #[must_use]
206 pub fn pending_count(&self) -> usize {
207 self.pending.len()
208 }
209}
210
211impl<I> Default for IncomingRequests<I> {
212 fn default() -> Self {
213 Self::new()
214 }
215}
216
217/// Tracks requests sent to clients (outgoing from the server).
218///
219/// When the server sends a request to the client, it registers the request ID
220/// and receives a oneshot receiver. When the client's response arrives, the
221/// server completes the request, sending the response to the waiting receiver.
222///
223/// The generic parameter `O` represents the response type that will be delivered
224/// when the request completes.
225///
226/// # Example
227///
228/// ```
229/// use lsp_server_tokio::{OutgoingRequests, RequestId};
230///
231/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
232/// let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();
233///
234/// // Register an outgoing request
235/// let rx = outgoing.register(1.into());
236/// assert!(outgoing.is_pending(&1.into()));
237///
238/// // Simulate receiving a response
239/// let completed = outgoing.complete(&1.into(), "result".to_string());
240/// assert!(completed);
241///
242/// // Receiver gets the response
243/// let result = rx.await.unwrap();
244/// assert_eq!(result, "result");
245/// # });
246/// ```
247#[derive(Debug)]
248pub struct OutgoingRequests<O> {
249 pending: HashMap<RequestId, oneshot::Sender<O>>,
250}
251
252impl<O> OutgoingRequests<O> {
253 /// Creates a new empty outgoing request tracker.
254 #[must_use]
255 pub fn new() -> Self {
256 Self {
257 pending: HashMap::new(),
258 }
259 }
260
261 /// Registers an outgoing request and returns a receiver for the response.
262 ///
263 /// The returned receiver will receive the response value when [`complete`](Self::complete)
264 /// is called with a matching ID. If the request is cancelled via [`cancel`](Self::cancel),
265 /// the receiver will return a `RecvError`.
266 pub fn register(&mut self, id: RequestId) -> oneshot::Receiver<O> {
267 let (tx, rx) = oneshot::channel();
268 self.pending.insert(id, tx);
269 rx
270 }
271
272 /// Completes an outgoing request by sending the response to the waiting receiver.
273 ///
274 /// Returns `true` if the request was pending and the response was sent,
275 /// `false` if the request was not found.
276 ///
277 /// Note: This returns `true` even if the receiver was dropped (the response is
278 /// still considered "completed" from the queue's perspective).
279 pub fn complete(&mut self, id: &RequestId, response: O) -> bool {
280 if let Some(tx) = self.pending.remove(id) {
281 // Ignore send error - receiver may have been dropped
282 let _ = tx.send(response);
283 true
284 } else {
285 false
286 }
287 }
288
289 /// Cancels an outgoing request without sending a response.
290 ///
291 /// The sender is dropped, causing the receiver to return `RecvError`.
292 ///
293 /// Returns `true` if the request was pending, `false` otherwise.
294 pub fn cancel(&mut self, id: &RequestId) -> bool {
295 self.pending.remove(id).is_some()
296 }
297
298 /// Returns `true` if the request is currently pending.
299 #[must_use]
300 pub fn is_pending(&self, id: &RequestId) -> bool {
301 self.pending.contains_key(id)
302 }
303
304 /// Returns the number of currently pending requests.
305 #[must_use]
306 pub fn pending_count(&self) -> usize {
307 self.pending.len()
308 }
309}
310
311impl<O> Default for OutgoingRequests<O> {
312 fn default() -> Self {
313 Self::new()
314 }
315}
316
317/// Combined request queue tracking both incoming and outgoing requests.
318///
319/// This is the primary type for managing LSP request-response correlation.
320/// It provides separate tracking for:
321///
322/// - `incoming`: Requests received from clients that need responses
323/// - `outgoing`: Requests sent to clients that are awaiting responses
324///
325/// # Type Parameters
326///
327/// - `I`: Metadata type for incoming requests (e.g., handler context)
328/// - `O`: Response type for outgoing requests
329///
330/// # Example
331///
332/// ```
333/// use lsp_server_tokio::{RequestQueue, RequestId};
334/// use tokio_util::sync::CancellationToken;
335///
336/// // Create a queue for a server that tracks method names for incoming
337/// // requests and expects JSON responses for outgoing requests
338/// let mut queue: RequestQueue<String, serde_json::Value> = RequestQueue::new();
339///
340/// // Track incoming request with cancellation token
341/// let token = CancellationToken::new();
342/// queue.incoming.register(1.into(), "textDocument/hover".to_string(), token);
343///
344/// // Operations on incoming don't affect outgoing
345/// assert_eq!(queue.incoming.pending_count(), 1);
346/// assert_eq!(queue.outgoing.pending_count(), 0);
347/// ```
348#[derive(Debug)]
349pub struct RequestQueue<I, O> {
350 /// Tracker for requests received from clients.
351 pub incoming: IncomingRequests<I>,
352 /// Tracker for requests sent to clients.
353 pub outgoing: OutgoingRequests<O>,
354}
355
356impl<I, O> RequestQueue<I, O> {
357 /// Creates a new empty request queue.
358 #[must_use]
359 pub fn new() -> Self {
360 Self {
361 incoming: IncomingRequests::new(),
362 outgoing: OutgoingRequests::new(),
363 }
364 }
365}
366
367impl<I, O> Default for RequestQueue<I, O> {
368 fn default() -> Self {
369 Self::new()
370 }
371}
372
// Unit tests for the request trackers and for `parse_cancel_params`.
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_util::sync::CancellationToken;

    // ============== IncomingRequests Tests ==============

    #[test]
    fn incoming_register_and_complete() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), "metadata".to_string(), token);
        let data = incoming.complete(&1.into());

        assert_eq!(data, Some("metadata".to_string()));
        assert!(!incoming.is_pending(&1.into()));
    }

    #[test]
    fn incoming_complete_unknown_returns_none() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();

        let data = incoming.complete(&999.into());
        assert_eq!(data, None);
    }

    #[test]
    fn incoming_is_pending() {
        let mut incoming: IncomingRequests<()> = IncomingRequests::new();

        assert!(!incoming.is_pending(&1.into()));

        let token = CancellationToken::new();
        incoming.register(1.into(), (), token);
        assert!(incoming.is_pending(&1.into()));

        incoming.complete(&1.into());
        assert!(!incoming.is_pending(&1.into()));
    }

    #[test]
    fn incoming_pending_count() {
        let mut incoming: IncomingRequests<i32> = IncomingRequests::new();

        assert_eq!(incoming.pending_count(), 0);

        let token1 = CancellationToken::new();
        incoming.register(1.into(), 100, token1);
        assert_eq!(incoming.pending_count(), 1);

        let token2 = CancellationToken::new();
        incoming.register(2.into(), 200, token2);
        assert_eq!(incoming.pending_count(), 2);

        incoming.complete(&1.into());
        assert_eq!(incoming.pending_count(), 1);

        incoming.complete(&2.into());
        assert_eq!(incoming.pending_count(), 0);
    }

    #[test]
    fn incoming_default() {
        let incoming: IncomingRequests<()> = IncomingRequests::default();
        assert_eq!(incoming.pending_count(), 0);
    }

    #[test]
    fn incoming_cancel_triggers_token() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();
        let token_clone = token.clone();

        incoming.register(1.into(), "data".to_string(), token);

        // Cancel the request
        assert!(incoming.cancel(&1.into()));

        // Token should be cancelled
        assert!(token_clone.is_cancelled());
    }

    #[test]
    fn incoming_cancel_unknown_returns_false() {
        let incoming: IncomingRequests<()> = IncomingRequests::new();
        assert!(!incoming.cancel(&999.into()));
    }

    #[test]
    fn incoming_cancel_idempotent() {
        let mut incoming: IncomingRequests<()> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), (), token);

        // Cancel twice - both should succeed
        assert!(incoming.cancel(&1.into()));
        assert!(incoming.cancel(&1.into())); // Still returns true, request still pending
    }

    #[test]
    fn incoming_get_token_returns_clone() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let original_token = CancellationToken::new();

        incoming.register(1.into(), "data".to_string(), original_token.clone());

        // Get the token
        let retrieved = incoming.get_token(&1.into());
        assert!(retrieved.is_some());

        // Cancel via retrieved token
        retrieved.unwrap().cancel();

        // Original should also be cancelled (they're the same underlying token)
        assert!(original_token.is_cancelled());
    }

    #[test]
    fn incoming_get_token_unknown_returns_none() {
        let incoming: IncomingRequests<()> = IncomingRequests::new();
        assert!(incoming.get_token(&999.into()).is_none());
    }

    #[test]
    fn incoming_complete_after_cancel_returns_data() {
        let mut incoming: IncomingRequests<String> = IncomingRequests::new();
        let token = CancellationToken::new();

        incoming.register(1.into(), "cancelled_data".to_string(), token);

        // Cancel first
        let _ = incoming.cancel(&1.into());

        // Complete should still return the data
        let data = incoming.complete(&1.into());
        assert_eq!(data, Some("cancelled_data".to_string()));
    }

    // ============== OutgoingRequests Tests ==============

    #[tokio::test]
    async fn outgoing_register_and_complete() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();
        let rx = outgoing.register(1.into());

        assert!(outgoing.complete(&1.into(), "response".to_string()));
        assert_eq!(rx.await.unwrap(), "response");
    }

    #[test]
    fn outgoing_complete_unknown_returns_false() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();

        let result = outgoing.complete(&999.into(), "response".to_string());
        assert!(!result);
    }

    #[tokio::test]
    async fn outgoing_cancel_drops_sender() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();
        let rx = outgoing.register(1.into());

        assert!(outgoing.cancel(&1.into()));
        assert!(!outgoing.is_pending(&1.into()));

        // Receiver should get an error since sender was dropped
        assert!(rx.await.is_err());
    }

    #[test]
    fn outgoing_cancel_unknown_returns_false() {
        let mut outgoing: OutgoingRequests<String> = OutgoingRequests::new();

        assert!(!outgoing.cancel(&999.into()));
    }

    #[test]
    fn outgoing_is_pending() {
        let mut outgoing: OutgoingRequests<()> = OutgoingRequests::new();

        assert!(!outgoing.is_pending(&1.into()));

        let _rx = outgoing.register(1.into());
        assert!(outgoing.is_pending(&1.into()));

        outgoing.complete(&1.into(), ());
        assert!(!outgoing.is_pending(&1.into()));
    }

    #[test]
    fn outgoing_pending_count() {
        let mut outgoing: OutgoingRequests<i32> = OutgoingRequests::new();

        assert_eq!(outgoing.pending_count(), 0);

        let _rx1 = outgoing.register(1.into());
        assert_eq!(outgoing.pending_count(), 1);

        let _rx2 = outgoing.register(2.into());
        assert_eq!(outgoing.pending_count(), 2);

        outgoing.complete(&1.into(), 100);
        assert_eq!(outgoing.pending_count(), 1);

        outgoing.cancel(&2.into());
        assert_eq!(outgoing.pending_count(), 0);
    }

    #[test]
    fn outgoing_default() {
        let outgoing: OutgoingRequests<()> = OutgoingRequests::default();
        assert_eq!(outgoing.pending_count(), 0);
    }

    // ============== RequestQueue Tests ==============

    #[test]
    fn queue_new_creates_empty() {
        let queue: RequestQueue<(), ()> = RequestQueue::new();

        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 0);
    }

    #[test]
    fn queue_incoming_outgoing_independent() {
        let mut queue: RequestQueue<String, String> = RequestQueue::new();

        // Register on incoming
        let token = CancellationToken::new();
        queue
            .incoming
            .register(1.into(), "incoming".to_string(), token);
        assert_eq!(queue.incoming.pending_count(), 1);
        assert_eq!(queue.outgoing.pending_count(), 0);

        // Register on outgoing
        let _rx = queue.outgoing.register(2.into());
        assert_eq!(queue.incoming.pending_count(), 1);
        assert_eq!(queue.outgoing.pending_count(), 1);

        // Complete incoming doesn't affect outgoing
        queue.incoming.complete(&1.into());
        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 1);
    }

    #[test]
    fn queue_default() {
        let queue: RequestQueue<(), ()> = RequestQueue::default();
        assert_eq!(queue.incoming.pending_count(), 0);
        assert_eq!(queue.outgoing.pending_count(), 0);
    }

    #[test]
    fn queue_with_string_request_id() {
        let mut queue: RequestQueue<i32, i32> = RequestQueue::new();

        let str_id: RequestId = "abc-123".into();
        let token = CancellationToken::new();
        queue.incoming.register(str_id.clone(), 42, token);

        assert!(queue.incoming.is_pending(&str_id));
        assert_eq!(queue.incoming.complete(&str_id), Some(42));
    }

    // ============== parse_cancel_params Tests ==============

    use super::parse_cancel_params;

    #[test]
    fn parse_cancel_params_integer_id() {
        let params = Some(serde_json::json!({"id": 42}));
        let id = parse_cancel_params(&params);
        assert_eq!(id, Some(RequestId::Integer(42)));
    }

    #[test]
    fn parse_cancel_params_string_id() {
        let params = Some(serde_json::json!({"id": "request-123"}));
        let id = parse_cancel_params(&params);
        assert_eq!(id, Some(RequestId::String("request-123".to_string())));
    }

    #[test]
    fn parse_cancel_params_missing_params() {
        let id = parse_cancel_params(&None);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_missing_id_field() {
        let params = Some(serde_json::json!({"other": "field"}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_invalid_id_type() {
        // A boolean is neither an integer nor a string, so it is rejected.
        let params = Some(serde_json::json!({"id": true}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }

    #[test]
    fn parse_cancel_params_null_id() {
        let params = Some(serde_json::json!({"id": null}));
        let id = parse_cancel_params(&params);
        assert!(id.is_none());
    }
}