rapace_transport_mem/
lib.rs

1//! rapace-transport-mem: In-process transport for rapace.
2//!
3//! This is the **semantic reference** implementation. All other transports
4//! must behave identically to this one. If behavior differs, the other
5//! transport has a bug.
6//!
7//! # Characteristics
8//!
9//! - Frames are passed through async channels (no serialization)
10//! - Real Rust lifetimes for in-process calls
11//! - Full RPC semantics (channels, deadlines, cancellation)
12//!
13//! # Usage
14//!
15//! ```ignore
16//! let (client_transport, server_transport) = InProcTransport::pair();
17//! ```
18
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22use rapace_core::{
23    DecodeError, EncodeCtx, EncodeError, Frame, FrameView, MsgDescHot, Transport, TransportError,
24};
25use tokio::sync::mpsc;
26
/// Capacity passed to `mpsc::channel` for each direction of an in-proc pair.
const CHANNEL_CAPACITY: usize = 64;
29
30/// In-process transport implementation.
31///
32/// This transport passes frames through async channels without serialization.
33/// It serves as the semantic reference for correct RPC behavior.
34pub struct InProcTransport {
35    inner: Arc<InProcInner>,
36}
37
38struct InProcInner {
39    /// Channel to send frames to the peer.
40    tx: mpsc::Sender<Frame>,
41    /// Channel to receive frames from the peer.
42    rx: tokio::sync::Mutex<mpsc::Receiver<Frame>>,
43    /// Most recently received frame (for FrameView lifetime).
44    /// Using parking_lot for sync access in recv_frame.
45    last_frame: Mutex<Option<Frame>>,
46    /// Whether the transport is closed.
47    closed: std::sync::atomic::AtomicBool,
48}
49
50impl InProcTransport {
51    /// Create a connected pair of in-proc transports.
52    ///
53    /// Returns (A, B) where frames sent on A are received on B and vice versa.
54    pub fn pair() -> (Self, Self) {
55        let (tx_a, rx_a) = mpsc::channel(CHANNEL_CAPACITY);
56        let (tx_b, rx_b) = mpsc::channel(CHANNEL_CAPACITY);
57
58        let inner_a = Arc::new(InProcInner {
59            tx: tx_b, // A sends to B's receiver
60            rx: tokio::sync::Mutex::new(rx_a),
61            last_frame: Mutex::new(None),
62            closed: std::sync::atomic::AtomicBool::new(false),
63        });
64
65        let inner_b = Arc::new(InProcInner {
66            tx: tx_a, // B sends to A's receiver
67            rx: tokio::sync::Mutex::new(rx_b),
68            last_frame: Mutex::new(None),
69            closed: std::sync::atomic::AtomicBool::new(false),
70        });
71
72        (Self { inner: inner_a }, Self { inner: inner_b })
73    }
74
75    /// Check if the transport is closed.
76    pub fn is_closed(&self) -> bool {
77        self.inner.closed.load(std::sync::atomic::Ordering::Acquire)
78    }
79}
80
81impl Transport for InProcTransport {
82    async fn send_frame(&self, frame: &Frame) -> Result<(), TransportError> {
83        if self.is_closed() {
84            return Err(TransportError::Closed);
85        }
86
87        // Clone the frame for sending (in-proc still needs ownership transfer)
88        self.inner
89            .tx
90            .send(frame.clone())
91            .await
92            .map_err(|_| TransportError::Closed)
93    }
94
95    async fn recv_frame(&self) -> Result<FrameView<'_>, TransportError> {
96        if self.is_closed() {
97            return Err(TransportError::Closed);
98        }
99
100        // First, receive the frame
101        let frame = {
102            let mut rx = self.inner.rx.lock().await;
103            rx.recv().await.ok_or(TransportError::Closed)?
104        };
105
106        // Store in last_frame
107        {
108            let mut last = self.inner.last_frame.lock();
109            *last = Some(frame);
110        }
111
112        // Now borrow from last_frame with lifetime tied to &self
113        // SAFETY: last_frame lives as long as self.inner which lives as long as self.
114        // The MutexGuard is dropped but the data remains in the Arc.
115        // Caller must not call recv_frame again before dropping the FrameView,
116        // which is enforced by the &self borrow in the return type.
117        let last = self.inner.last_frame.lock();
118        let frame_ref = last.as_ref().unwrap();
119
120        // We need to extend the lifetime. This is sound because:
121        // 1. The frame is stored in self.inner.last_frame
122        // 2. self.inner is Arc'd and lives as long as self
123        // 3. The returned FrameView borrows &self, preventing another recv_frame call
124        let desc_ptr = &frame_ref.desc as *const MsgDescHot;
125        let payload_ptr = frame_ref.payload().as_ptr();
126        let payload_len = frame_ref.payload().len();
127
128        // SAFETY: Extending lifetime is safe because:
129        // - Data lives in Arc<InProcInner> which outlives 'self
130        // - FrameView borrows &'self, preventing concurrent recv_frame
131        let desc: &MsgDescHot = unsafe { &*desc_ptr };
132        let payload: &[u8] = unsafe { std::slice::from_raw_parts(payload_ptr, payload_len) };
133
134        Ok(FrameView::new(desc, payload))
135    }
136
137    fn encoder(&self) -> Box<dyn EncodeCtx + '_> {
138        Box::new(InProcEncoder::new())
139    }
140
141    async fn close(&self) -> Result<(), TransportError> {
142        self.inner
143            .closed
144            .store(true, std::sync::atomic::Ordering::Release);
145        Ok(())
146    }
147}
148
149/// Encoder for in-proc transport.
150///
151/// Simply accumulates bytes into a Vec since no serialization is needed.
152pub struct InProcEncoder {
153    desc: MsgDescHot,
154    payload: Vec<u8>,
155}
156
157impl InProcEncoder {
158    fn new() -> Self {
159        Self {
160            desc: MsgDescHot::new(),
161            payload: Vec::new(),
162        }
163    }
164
165    /// Set the descriptor for this frame.
166    pub fn set_desc(&mut self, desc: MsgDescHot) {
167        self.desc = desc;
168    }
169}
170
171impl EncodeCtx for InProcEncoder {
172    fn encode_bytes(&mut self, bytes: &[u8]) -> Result<(), EncodeError> {
173        self.payload.extend_from_slice(bytes);
174        Ok(())
175    }
176
177    fn finish(self: Box<Self>) -> Result<Frame, EncodeError> {
178        Ok(Frame::with_payload(self.desc, self.payload))
179    }
180}
181
/// Decode context used by the in-proc transport: a cursor over a borrowed
/// byte slice.
pub struct InProcDecoder<'a> {
    /// The bytes being decoded.
    data: &'a [u8],
    /// Offset into `data` where the not-yet-consumed bytes begin.
    pos: usize,
}

impl<'a> InProcDecoder<'a> {
    /// Wraps `data` in a decoder positioned at the start of the slice.
    pub fn new(data: &'a [u8]) -> Self {
        InProcDecoder { pos: 0, data }
    }
}
194
195impl<'a> rapace_core::DecodeCtx<'a> for InProcDecoder<'a> {
196    fn decode_bytes(&mut self) -> Result<&'a [u8], DecodeError> {
197        let result = &self.data[self.pos..];
198        self.pos = self.data.len();
199        Ok(result)
200    }
201
202    fn remaining(&self) -> &'a [u8] {
203        &self.data[self.pos..]
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use rapace_core::FrameFlags;

    /// Both endpoints of a new pair start out open.
    #[tokio::test]
    async fn test_pair_creation() {
        let (left, right) = InProcTransport::pair();
        for endpoint in [&left, &right] {
            assert!(!endpoint.is_closed());
        }
    }

    /// A frame with an inline payload arrives with descriptor and bytes intact.
    #[tokio::test]
    async fn test_send_recv_inline() {
        let (sender, receiver) = InProcTransport::pair();

        let desc = {
            let mut d = MsgDescHot::new();
            d.msg_id = 1;
            d.channel_id = 1;
            d.method_id = 42;
            d.flags = FrameFlags::DATA;
            d
        };
        let outgoing = Frame::with_inline_payload(desc, b"hello").unwrap();

        sender.send_frame(&outgoing).await.unwrap();

        let received = receiver.recv_frame().await.unwrap();
        assert_eq!(received.desc.msg_id, 1);
        assert_eq!(received.desc.channel_id, 1);
        assert_eq!(received.desc.method_id, 42);
        assert_eq!(received.payload, b"hello");
    }

    /// A payload too large to be stored inline is delivered at full length.
    #[tokio::test]
    async fn test_send_recv_external_payload() {
        let (sender, receiver) = InProcTransport::pair();

        let desc = {
            let mut d = MsgDescHot::new();
            d.msg_id = 2;
            d.flags = FrameFlags::DATA;
            d
        };
        // 1000 zero bytes: larger than the inline capacity.
        let outgoing = Frame::with_payload(desc, vec![0u8; 1000]);

        sender.send_frame(&outgoing).await.unwrap();

        let received = receiver.recv_frame().await.unwrap();
        assert_eq!(received.desc.msg_id, 2);
        assert_eq!(received.payload.len(), 1000);
    }

    /// Each endpoint can send and receive independently of the other.
    #[tokio::test]
    async fn test_bidirectional() {
        let (a, b) = InProcTransport::pair();

        // Queue one frame in each direction before reading anything.
        let to_b = {
            let mut d = MsgDescHot::new();
            d.msg_id = 1;
            Frame::with_inline_payload(d, b"from A").unwrap()
        };
        a.send_frame(&to_b).await.unwrap();

        let to_a = {
            let mut d = MsgDescHot::new();
            d.msg_id = 2;
            Frame::with_inline_payload(d, b"from B").unwrap()
        };
        b.send_frame(&to_a).await.unwrap();

        // Now drain both sides.
        let seen_by_b = b.recv_frame().await.unwrap();
        assert_eq!(seen_by_b.payload, b"from A");

        let seen_by_a = a.recv_frame().await.unwrap();
        assert_eq!(seen_by_a.payload, b"from B");
    }

    /// After `close`, the endpoint reports closed and refuses to send.
    #[tokio::test]
    async fn test_close() {
        let (a, _b) = InProcTransport::pair();

        a.close().await.unwrap();
        assert!(a.is_closed());

        let frame = Frame::new(MsgDescHot::new());
        let outcome = a.send_frame(&frame).await;
        assert!(matches!(outcome, Err(TransportError::Closed)));
    }

    /// Bytes fed to the encoder come back as the finished frame's payload.
    #[tokio::test]
    async fn test_encoder() {
        let (a, _b) = InProcTransport::pair();

        let mut enc = a.encoder();
        enc.encode_bytes(b"test data").unwrap();
        let built = enc.finish().unwrap();

        assert_eq!(built.payload(), b"test data");
    }
}
311
/// Conformance suite from rapace-testkit, run against the in-proc transport.
#[cfg(test)]
mod conformance_tests {
    use super::*;
    use rapace_testkit::{TestError, TransportFactory};
    use std::sync::Once;

    /// Guards the one-time installation of the tracing subscriber.
    static INIT: Once = Once::new();

    /// Installs a DEBUG-level tracing subscriber that writes to the test
    /// writer; calls after the first are no-ops.
    fn init_tracing() {
        INIT.call_once(|| {
            let filter = tracing_subscriber::EnvFilter::from_default_env()
                .add_directive(tracing::Level::DEBUG.into());
            tracing_subscriber::fmt()
                .with_env_filter(filter)
                .with_test_writer()
                .init();
        });
    }

    /// Factory handing the testkit connected in-proc transport pairs.
    struct InProcFactory;

    impl TransportFactory for InProcFactory {
        type Transport = InProcTransport;

        async fn connect_pair() -> Result<(Self::Transport, Self::Transport), TestError> {
            Ok(InProcTransport::pair())
        }
    }

    /// Expands each `test_name => testkit_runner` entry into a `#[tokio::test]`
    /// that awaits `rapace_testkit::testkit_runner::<InProcFactory>()`.
    macro_rules! conformance_tests {
        ($($name:ident => $runner:ident),* $(,)?) => {
            $(
                #[tokio::test]
                async fn $name() {
                    rapace_testkit::$runner::<InProcFactory>().await;
                }
            )*
        };
    }

    // Written out by hand because it is the only test that enables tracing.
    #[tokio::test]
    async fn unary_happy_path() {
        init_tracing();
        rapace_testkit::run_unary_happy_path::<InProcFactory>().await;
    }

    conformance_tests! {
        unary_multiple_calls => run_unary_multiple_calls,
        ping_pong => run_ping_pong,
        deadline_success => run_deadline_success,
        deadline_exceeded => run_deadline_exceeded,
        cancellation => run_cancellation,
        credit_grant => run_credit_grant,
        error_response => run_error_response,

        // Session-level tests (semantic enforcement)
        session_credit_exhaustion => run_session_credit_exhaustion,
        session_cancelled_channel_drop => run_session_cancelled_channel_drop,
        session_cancel_control_frame => run_session_cancel_control_frame,
        session_grant_credits_control_frame => run_session_grant_credits_control_frame,
        session_deadline_check => run_session_deadline_check,

        // Streaming tests
        server_streaming_happy_path => run_server_streaming_happy_path,
        client_streaming_happy_path => run_client_streaming_happy_path,
        bidirectional_streaming => run_bidirectional_streaming,
        streaming_cancellation => run_streaming_cancellation,

        // Macro-generated streaming tests
        macro_server_streaming => run_macro_server_streaming,

        // Large blob tests
        large_blob_echo => run_large_blob_echo,
        large_blob_transform => run_large_blob_transform,
        large_blob_checksum => run_large_blob_checksum,
    }
}