Skip to main content

fastmcp_transport/
lib.rs

1//! Transport layer for FastMCP.
2//!
3//! This crate provides transport implementations for MCP communication:
4//! - **Stdio**: Standard input/output (primary transport)
5//! - **SSE**: Server-Sent Events (HTTP-based streaming)
6//! - **WebSocket**: Bidirectional web sockets
7//!
8//! # Transport Design
9//!
10//! Transports are designed around asupersync's principles:
11//!
12//! - **Cancel-correctness**: All operations check cancellation via `Cx::checkpoint()`
13//! - **Two-phase sends**: Use reserve/commit pattern to prevent message loss
14//! - **Budget awareness**: Operations respect the request's budget constraints
15//!
16//! # Wire Format
17//!
18//! MCP uses newline-delimited JSON (NDJSON) for message framing:
19//! - Each message is a single line of JSON
20//! - Messages are separated by `\n`
21//! - UTF-8 encoding is required
22//!
23//! # Role in the System
24//!
25//! `fastmcp-transport` is the **I/O boundary** for FastMCP. It is deliberately
26//! protocol-agnostic: transports move `JsonRpcMessage` values in and out while
27//! the server/client layers handle semantics. This keeps transport
28//! implementations small, testable, and reusable.
29//!
30//! If you need to add a new transport (for example, QUIC or a custom IPC),
31//! this is the crate to extend.
32
33#![forbid(unsafe_code)]
34#![allow(dead_code)]
35
36mod async_io;
37mod codec;
38pub mod event_store;
39pub mod http;
40pub mod memory;
41pub mod sse;
42mod stdio;
43pub mod websocket;
44
45pub use async_io::{AsyncLineReader, AsyncStdin, AsyncStdout};
46
47pub use codec::{Codec, CodecError};
48pub use stdio::{AsyncStdioTransport, StdioTransport};
49
50use asupersync::Cx;
51use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
52
53/// Transport trait for cancel-correct message passing.
54///
55/// All transports must integrate with asupersync's capability context (`Cx`)
56/// for cancellation checking and budget enforcement.
57///
58/// # Cancel-Safety
59///
60/// Implementations should:
61/// - Call `cx.checkpoint()` before blocking operations
62/// - Use two-phase patterns (reserve/commit) where applicable
63/// - Respect budget constraints from the context
64///
65/// # Example
66///
67/// ```ignore
68/// impl Transport for MyTransport {
69///     fn send(&mut self, cx: &Cx, msg: &JsonRpcMessage) -> Result<(), TransportError> {
70///         cx.checkpoint()?;  // Check for cancellation
71///         let bytes = self.codec.encode(msg)?;
72///         self.write_all(&bytes)?;
73///         Ok(())
74///     }
75/// }
76/// ```
77pub trait Transport {
78    /// Send a JSON-RPC message through this transport.
79    ///
80    /// # Cancel-Safety
81    ///
82    /// This operation checks for cancellation before sending.
83    /// If cancelled, the message is not sent.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if the transport is closed, an I/O error occurs,
88    /// or the request has been cancelled.
89    fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError>;
90
91    /// Receive the next JSON-RPC message from this transport.
92    ///
93    /// # Cancel-Safety
94    ///
95    /// This operation checks for cancellation while waiting for data.
96    /// If cancelled, returns `TransportError::Cancelled`.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the transport is closed, an I/O error occurs,
101    /// or the request has been cancelled.
102    fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError>;
103
104    /// Send a request through this transport.
105    ///
106    /// Convenience method that wraps a request in a message.
107    fn send_request(&mut self, cx: &Cx, request: &JsonRpcRequest) -> Result<(), TransportError> {
108        self.send(cx, &JsonRpcMessage::Request(request.clone()))
109    }
110
111    /// Send a response through this transport.
112    ///
113    /// Convenience method that wraps a response in a message.
114    fn send_response(&mut self, cx: &Cx, response: &JsonRpcResponse) -> Result<(), TransportError> {
115        self.send(cx, &JsonRpcMessage::Response(response.clone()))
116    }
117
118    /// Close the transport gracefully.
119    ///
120    /// This flushes any pending data and releases resources.
121    fn close(&mut self) -> Result<(), TransportError>;
122}
123
124/// Transport error types.
125#[derive(Debug)]
126pub enum TransportError {
127    /// I/O error during read or write.
128    Io(std::io::Error),
129    /// Transport was closed (EOF or explicit close).
130    Closed,
131    /// Codec error (JSON parsing or encoding).
132    Codec(CodecError),
133    /// Connection timeout.
134    Timeout,
135    /// Request was cancelled.
136    Cancelled,
137}
138
139impl TransportError {
140    /// Returns true if this is a cancellation error.
141    #[must_use]
142    pub fn is_cancelled(&self) -> bool {
143        matches!(self, TransportError::Cancelled)
144    }
145
146    /// Returns true if this is an EOF/closed condition.
147    #[must_use]
148    pub fn is_closed(&self) -> bool {
149        matches!(self, TransportError::Closed)
150    }
151}
152
153impl std::fmt::Display for TransportError {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        match self {
156            TransportError::Io(e) => write!(f, "I/O error: {e}"),
157            TransportError::Closed => write!(f, "Transport closed"),
158            TransportError::Codec(e) => write!(f, "Codec error: {e}"),
159            TransportError::Timeout => write!(f, "Connection timeout"),
160            TransportError::Cancelled => write!(f, "Request cancelled"),
161        }
162    }
163}
164
165impl std::error::Error for TransportError {
166    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
167        match self {
168            TransportError::Io(e) => Some(e),
169            TransportError::Codec(e) => Some(e),
170            _ => None,
171        }
172    }
173}
174
175impl From<std::io::Error> for TransportError {
176    fn from(err: std::io::Error) -> Self {
177        TransportError::Io(err)
178    }
179}
180
181impl From<CodecError> for TransportError {
182    fn from(err: CodecError) -> Self {
183        TransportError::Codec(err)
184    }
185}
186
187// =============================================================================
188// Two-Phase Send Protocol
189// =============================================================================
190
191/// A permit for sending a message via two-phase commit.
192///
193/// This implements the reserve/commit pattern for cancel-safe message sending:
194/// 1. **Reserve**: Allocate the permit (cancellable)
195/// 2. **Commit**: Send the message (infallible after reserve)
196///
197/// # Cancel-Safety
198///
199/// The reservation phase is the cancellation point. Once you have a permit,
200/// the send will complete. This ensures no message loss on cancellation:
201///
202/// ```ignore
203/// // Cancel-safe pattern:
204/// let permit = transport.reserve_send(cx)?;  // Can be cancelled here
205/// permit.send(message);                       // Always succeeds
206/// ```
207///
208/// # Example
209///
210/// ```ignore
211/// use fastmcp_transport::{AsyncStdioTransport, TwoPhaseTransport};
212/// use asupersync::Cx;
213///
214/// let mut transport = AsyncStdioTransport::new();
215/// let cx = Cx::for_testing();
216///
217/// // Reserve a send slot (cancellable)
218/// let permit = transport.reserve_send(&cx)?;
219///
220/// // At this point, we're committed - send is infallible
221/// permit.send(&JsonRpcMessage::Request(request));
222/// ```
223pub struct SendPermit<'a, W: std::io::Write> {
224    writer: &'a mut W,
225    codec: &'a Codec,
226}
227
228impl<'a, W: std::io::Write> SendPermit<'a, W> {
229    /// Creates a new send permit.
230    ///
231    /// This is an internal constructor. Use `TwoPhaseTransport::reserve_send()`
232    /// to obtain a permit.
233    fn new(writer: &'a mut W, codec: &'a Codec) -> Self {
234        Self { writer, codec }
235    }
236
237    /// Commits the send by writing the message.
238    ///
239    /// This method is synchronous and, from the protocol's perspective,
240    /// infallible after reservation. I/O errors are returned but the
241    /// reservation is consumed regardless.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if the underlying write fails. However, the permit
246    /// is consumed and the reservation is released.
247    pub fn send(self, message: &JsonRpcMessage) -> Result<(), TransportError> {
248        let bytes = match message {
249            JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
250            JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
251        };
252
253        self.writer.write_all(&bytes)?;
254        self.writer.flush()?;
255        Ok(())
256    }
257
258    /// Commits the send by writing a request.
259    ///
260    /// Convenience method for sending a request directly.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if the underlying write fails.
265    pub fn send_request(self, request: &JsonRpcRequest) -> Result<(), TransportError> {
266        let bytes = self.codec.encode_request(request)?;
267        self.writer.write_all(&bytes)?;
268        self.writer.flush()?;
269        Ok(())
270    }
271
272    /// Commits the send by writing a response.
273    ///
274    /// Convenience method for sending a response directly.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the underlying write fails.
279    pub fn send_response(self, response: &JsonRpcResponse) -> Result<(), TransportError> {
280        let bytes = self.codec.encode_response(response)?;
281        self.writer.write_all(&bytes)?;
282        self.writer.flush()?;
283        Ok(())
284    }
285}
286
287/// Extension trait for two-phase send operations.
288///
289/// This trait adds the reserve/commit pattern to transports. The pattern
290/// ensures cancel-safety by making the reservation the cancellation point:
291///
292/// - **Reserve phase**: Check cancellation, allocate resources
293/// - **Commit phase**: Actually send (synchronous, infallible from protocol perspective)
294///
295/// # Why Two-Phase?
296///
297/// Without two-phase:
298/// ```ignore
299/// // BROKEN: Can lose messages on cancel
300/// async fn bad_send(cx: &Cx, msg: Message) {
301///     let serialized = serialize(&msg);  // Work done
302///     cx.checkpoint()?;                   // Cancel here = lost message!
303///     writer.write(&serialized).await;
304/// }
305/// ```
306///
307/// With two-phase:
308/// ```ignore
309/// // CORRECT: Either fully sent or not started
310/// async fn good_send(cx: &Cx, msg: Message) {
311///     let permit = transport.reserve_send(cx)?;  // Cancel here = no work lost
312///     // After reserve, commit is synchronous and infallible
313///     permit.send(msg);
314/// }
315/// ```
316pub trait TwoPhaseTransport: Transport {
317    /// The writer type for permits.
318    type Writer: std::io::Write;
319
320    /// Reserve a send slot.
321    ///
322    /// This is the cancellation point for sends. If this succeeds, the
323    /// subsequent `permit.send()` will complete.
324    ///
325    /// # Errors
326    ///
327    /// Returns `TransportError::Cancelled` if the request has been cancelled.
328    fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError>;
329}
330
331#[cfg(test)]
332mod tests {
333    use super::{Codec, CodecError, SendPermit, Transport, TransportError, TwoPhaseTransport};
334    use asupersync::Cx;
335    use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, RequestId};
336    use std::error::Error;
337
338    #[derive(Default)]
339    struct RecordingTransport {
340        sent: Vec<JsonRpcMessage>,
341        closed: bool,
342    }
343
344    impl Transport for RecordingTransport {
345        fn send(&mut self, _cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
346            self.sent.push(message.clone());
347            Ok(())
348        }
349
350        fn recv(&mut self, _cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
351            Err(TransportError::Closed)
352        }
353
354        fn close(&mut self) -> Result<(), TransportError> {
355            self.closed = true;
356            Ok(())
357        }
358    }
359
360    struct TwoPhaseFixture {
361        writer: Vec<u8>,
362        codec: Codec,
363    }
364
365    impl Default for TwoPhaseFixture {
366        fn default() -> Self {
367            Self {
368                writer: Vec::new(),
369                codec: Codec::new(),
370            }
371        }
372    }
373
374    impl Transport for TwoPhaseFixture {
375        fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
376            let permit = self.reserve_send(cx)?;
377            permit.send(message)
378        }
379
380        fn recv(&mut self, _cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
381            Err(TransportError::Closed)
382        }
383
384        fn close(&mut self) -> Result<(), TransportError> {
385            Ok(())
386        }
387    }
388
389    impl TwoPhaseTransport for TwoPhaseFixture {
390        type Writer = Vec<u8>;
391
392        fn reserve_send(
393            &mut self,
394            cx: &Cx,
395        ) -> Result<SendPermit<'_, Self::Writer>, TransportError> {
396            if cx.is_cancel_requested() {
397                return Err(TransportError::Cancelled);
398            }
399
400            Ok(SendPermit::new(&mut self.writer, &self.codec))
401        }
402    }
403
404    fn test_error(message: &str) -> Box<dyn Error> {
405        std::io::Error::other(message).into()
406    }
407
408    fn require(condition: bool, message: &str) -> Result<(), Box<dyn Error>> {
409        if condition {
410            Ok(())
411        } else {
412            Err(test_error(message))
413        }
414    }
415
416    #[test]
417    fn transport_error_predicates_match_variants() -> Result<(), Box<dyn Error>> {
418        require(
419            TransportError::Cancelled.is_cancelled(),
420            "cancelled flag mismatch",
421        )?;
422        require(
423            !TransportError::Timeout.is_cancelled(),
424            "timeout should not be cancelled",
425        )?;
426        require(TransportError::Closed.is_closed(), "closed flag mismatch")?;
427        require(
428            !TransportError::Timeout.is_closed(),
429            "timeout should not be closed",
430        )?;
431        Ok(())
432    }
433
434    #[test]
435    fn transport_error_display_and_source_are_exposed() -> Result<(), Box<dyn Error>> {
436        let io_error = std::io::Error::other("write failed");
437        let io_transport_error = TransportError::Io(io_error);
438        require(
439            io_transport_error.to_string() == "I/O error: write failed",
440            "io display mismatch",
441        )?;
442        require(
443            io_transport_error.source().is_some(),
444            "io source should exist",
445        )?;
446
447        let json_error = match serde_json::from_str::<serde_json::Value>("not json") {
448            Err(err) => err,
449            Ok(_) => return Err(test_error("invalid json unexpectedly parsed")),
450        };
451        let codec_error = CodecError::from(json_error);
452        let codec_transport_error = TransportError::Codec(codec_error);
453        require(
454            codec_transport_error
455                .to_string()
456                .starts_with("Codec error: JSON error:"),
457            "codec display mismatch",
458        )?;
459        require(
460            codec_transport_error.source().is_some(),
461            "codec source should exist",
462        )?;
463
464        require(
465            TransportError::Timeout.source().is_none(),
466            "timeout should not have source",
467        )?;
468        require(
469            TransportError::Closed.source().is_none(),
470            "closed should not have source",
471        )?;
472        require(
473            TransportError::Cancelled.source().is_none(),
474            "cancelled should not have source",
475        )?;
476        Ok(())
477    }
478
479    #[test]
480    fn transport_error_from_conversions_wrap_underlying_types() -> Result<(), Box<dyn Error>> {
481        let io_transport_error = TransportError::from(std::io::Error::other("socket closed"));
482        require(
483            matches!(io_transport_error, TransportError::Io(_)),
484            "io conversion mismatch",
485        )?;
486
487        let json_error = match serde_json::from_str::<serde_json::Value>("bad json") {
488            Err(err) => err,
489            Ok(_) => return Err(test_error("invalid json unexpectedly parsed")),
490        };
491        let codec_transport_error = TransportError::from(CodecError::from(json_error));
492        require(
493            matches!(codec_transport_error, TransportError::Codec(_)),
494            "codec conversion mismatch",
495        )?;
496        Ok(())
497    }
498
499    #[test]
500    fn send_request_wraps_request_message() -> Result<(), Box<dyn Error>> {
501        let mut transport = RecordingTransport::default();
502        let cx = Cx::for_testing();
503        let request = JsonRpcRequest::new("tools/list", None, 7i64);
504
505        transport.send_request(&cx, &request)?;
506
507        require(transport.sent.len() == 1, "expected one sent message")?;
508        match &transport.sent[0] {
509            JsonRpcMessage::Request(req) => {
510                require(req.method == "tools/list", "request method mismatch")?;
511                require(
512                    req.id == Some(RequestId::Number(7)),
513                    "request id mismatch for wrapped message",
514                )?;
515            }
516            JsonRpcMessage::Response(_) => {
517                return Err(test_error("expected request message"));
518            }
519        }
520        Ok(())
521    }
522
523    #[test]
524    fn send_response_wraps_response_message() -> Result<(), Box<dyn Error>> {
525        let mut transport = RecordingTransport::default();
526        let cx = Cx::for_testing();
527        let response = JsonRpcResponse::success(
528            RequestId::Number(9),
529            serde_json::json!({"server": "fastmcp"}),
530        );
531
532        transport.send_response(&cx, &response)?;
533
534        require(transport.sent.len() == 1, "expected one sent message")?;
535        match &transport.sent[0] {
536            JsonRpcMessage::Response(resp) => {
537                require(
538                    resp.id == Some(RequestId::Number(9)),
539                    "response id mismatch for wrapped message",
540                )?;
541            }
542            JsonRpcMessage::Request(_) => {
543                return Err(test_error("expected response message"));
544            }
545        }
546        Ok(())
547    }
548
549    #[test]
550    fn send_permit_writes_request_bytes() -> Result<(), Box<dyn Error>> {
551        let cx = Cx::for_testing();
552        let mut fixture = TwoPhaseFixture::default();
553        let request = JsonRpcRequest::new("resources/list", None, 11i64);
554
555        let permit = fixture.reserve_send(&cx)?;
556        permit.send_request(&request)?;
557
558        let mut decode_codec = Codec::new();
559        let messages = decode_codec.decode(&fixture.writer)?;
560        require(messages.len() == 1, "expected one decoded message")?;
561        match &messages[0] {
562            JsonRpcMessage::Request(req) => {
563                require(
564                    req.method == "resources/list",
565                    "decoded request method mismatch",
566                )?;
567                require(
568                    req.id == Some(RequestId::Number(11)),
569                    "decoded request id mismatch",
570                )?;
571            }
572            JsonRpcMessage::Response(_) => {
573                return Err(test_error("expected request message"));
574            }
575        }
576        Ok(())
577    }
578
579    #[test]
580    fn send_permit_writes_response_bytes() -> Result<(), Box<dyn Error>> {
581        let cx = Cx::for_testing();
582        let mut fixture = TwoPhaseFixture::default();
583        let response =
584            JsonRpcResponse::success(RequestId::Number(22), serde_json::json!({"status": "ok"}));
585
586        let permit = fixture.reserve_send(&cx)?;
587        permit.send_response(&response)?;
588
589        let mut decode_codec = Codec::new();
590        let messages = decode_codec.decode(&fixture.writer)?;
591        require(messages.len() == 1, "expected one decoded message")?;
592        match &messages[0] {
593            JsonRpcMessage::Response(resp) => {
594                require(
595                    resp.id == Some(RequestId::Number(22)),
596                    "decoded response id mismatch",
597                )?;
598            }
599            JsonRpcMessage::Request(_) => {
600                return Err(test_error("expected response message"));
601            }
602        }
603        Ok(())
604    }
605
606    #[test]
607    fn reserve_send_returns_cancelled_when_context_is_cancelled() -> Result<(), Box<dyn Error>> {
608        let cx = Cx::for_testing();
609        cx.set_cancel_requested(true);
610        let mut fixture = TwoPhaseFixture::default();
611
612        let result = fixture.reserve_send(&cx);
613
614        match result {
615            Err(TransportError::Cancelled) => Ok(()),
616            _ => Err(test_error("reserve_send should return cancelled")),
617        }
618    }
619
620    #[test]
621    fn recording_transport_close() {
622        let mut transport = RecordingTransport::default();
623        assert!(!transport.closed);
624        transport.close().unwrap();
625        assert!(transport.closed);
626    }
627
628    #[test]
629    fn recording_transport_recv_returns_closed() {
630        let mut transport = RecordingTransport::default();
631        let cx = Cx::for_testing();
632        let result = transport.recv(&cx);
633        assert!(matches!(result, Err(TransportError::Closed)));
634    }
635
636    #[test]
637    fn transport_error_display_all_variants() {
638        assert_eq!(TransportError::Closed.to_string(), "Transport closed");
639        assert_eq!(TransportError::Timeout.to_string(), "Connection timeout");
640        assert_eq!(TransportError::Cancelled.to_string(), "Request cancelled");
641    }
642
643    #[test]
644    fn transport_error_is_cancelled_false_for_other_variants() {
645        assert!(!TransportError::Closed.is_cancelled());
646        assert!(!TransportError::Io(std::io::Error::other("err")).is_cancelled());
647        assert!(!TransportError::Codec(CodecError::MessageTooLarge(1)).is_cancelled());
648    }
649
650    #[test]
651    fn transport_error_is_closed_false_for_other_variants() {
652        assert!(!TransportError::Cancelled.is_closed());
653        assert!(!TransportError::Timeout.is_closed());
654        assert!(!TransportError::Io(std::io::Error::other("err")).is_closed());
655        assert!(!TransportError::Codec(CodecError::MessageTooLarge(1)).is_closed());
656    }
657
658    #[test]
659    fn send_permit_sends_request_as_message() -> Result<(), Box<dyn Error>> {
660        let cx = Cx::for_testing();
661        let mut fixture = TwoPhaseFixture::default();
662        let request = JsonRpcRequest::new("tools/call", None, 1i64);
663
664        let permit = fixture.reserve_send(&cx)?;
665        permit.send(&JsonRpcMessage::Request(request))?;
666
667        let mut decode_codec = Codec::new();
668        let messages = decode_codec.decode(&fixture.writer)?;
669        require(messages.len() == 1, "expected one decoded message")?;
670        match &messages[0] {
671            JsonRpcMessage::Request(req) => {
672                require(req.method == "tools/call", "method mismatch")?;
673            }
674            _ => return Err(test_error("expected request")),
675        }
676        Ok(())
677    }
678
679    #[test]
680    fn send_permit_sends_response_as_message() -> Result<(), Box<dyn Error>> {
681        let cx = Cx::for_testing();
682        let mut fixture = TwoPhaseFixture::default();
683        let response =
684            JsonRpcResponse::success(RequestId::Number(5), serde_json::json!({"ok": true}));
685
686        let permit = fixture.reserve_send(&cx)?;
687        permit.send(&JsonRpcMessage::Response(response))?;
688
689        let mut decode_codec = Codec::new();
690        let messages = decode_codec.decode(&fixture.writer)?;
691        require(messages.len() == 1, "expected one decoded message")?;
692        assert!(matches!(&messages[0], JsonRpcMessage::Response(_)));
693        Ok(())
694    }
695
696    #[test]
697    fn send_multiple_messages_via_transport() {
698        let mut transport = RecordingTransport::default();
699        let cx = Cx::for_testing();
700
701        for i in 0..5 {
702            let request = JsonRpcRequest::new(format!("method/{i}"), None, i as i64);
703            transport.send_request(&cx, &request).unwrap();
704        }
705
706        assert_eq!(transport.sent.len(), 5);
707        for (i, msg) in transport.sent.iter().enumerate() {
708            if let JsonRpcMessage::Request(req) = msg {
709                assert_eq!(req.method, format!("method/{i}"));
710            } else {
711                panic!("expected request at index {i}");
712            }
713        }
714    }
715
716    #[test]
717    fn two_phase_fixture_send_via_transport_trait() -> Result<(), Box<dyn Error>> {
718        let cx = Cx::for_testing();
719        let mut fixture = TwoPhaseFixture::default();
720        let request = JsonRpcRequest::new("test/method", None, 42i64);
721
722        // Use the Transport::send method which delegates to reserve_send
723        fixture.send(&cx, &JsonRpcMessage::Request(request))?;
724
725        let mut decode_codec = Codec::new();
726        let messages = decode_codec.decode(&fixture.writer)?;
727        require(messages.len() == 1, "expected one decoded message")?;
728        Ok(())
729    }
730
731    #[test]
732    fn two_phase_fixture_close_succeeds() {
733        let mut fixture = TwoPhaseFixture::default();
734        assert!(fixture.close().is_ok());
735    }
736
737    #[test]
738    fn two_phase_multiple_sends() -> Result<(), Box<dyn Error>> {
739        let cx = Cx::for_testing();
740        let mut fixture = TwoPhaseFixture::default();
741
742        // Send multiple messages through two-phase
743        for i in 0..3 {
744            let permit = fixture.reserve_send(&cx)?;
745            let request = JsonRpcRequest::new(format!("method/{i}"), None, i as i64);
746            permit.send_request(&request)?;
747        }
748
749        let mut decode_codec = Codec::new();
750        let messages = decode_codec.decode(&fixture.writer)?;
751        require(messages.len() == 3, "expected three decoded messages")?;
752        Ok(())
753    }
754
755    #[test]
756    fn send_permit_notification_without_id() -> Result<(), Box<dyn Error>> {
757        let cx = Cx::for_testing();
758        let mut fixture = TwoPhaseFixture::default();
759        let notification = JsonRpcRequest::notification("notifications/progress", None);
760
761        let permit = fixture.reserve_send(&cx)?;
762        permit.send_request(&notification)?;
763
764        let mut decode_codec = Codec::new();
765        let messages = decode_codec.decode(&fixture.writer)?;
766        require(messages.len() == 1, "expected one decoded message")?;
767        if let JsonRpcMessage::Request(req) = &messages[0] {
768            require(req.id.is_none(), "notification should have no id")?;
769        }
770        Ok(())
771    }
772}