rapace_transport_mem/
lib.rs1use std::sync::Arc;
20
21use parking_lot::Mutex;
22use rapace_core::{
23 DecodeError, EncodeCtx, EncodeError, Frame, FrameView, MsgDescHot, Transport, TransportError,
24};
25use tokio::sync::mpsc;
26
/// Number of frames each direction's bounded `mpsc` channel can buffer.
///
/// Passed to `mpsc::channel` for both channels created by
/// `InProcTransport::pair`.
const CHANNEL_CAPACITY: usize = 64;
29
30pub struct InProcTransport {
35 inner: Arc<InProcInner>,
36}
37
38struct InProcInner {
39 tx: mpsc::Sender<Frame>,
41 rx: tokio::sync::Mutex<mpsc::Receiver<Frame>>,
43 last_frame: Mutex<Option<Frame>>,
46 closed: std::sync::atomic::AtomicBool,
48}
49
50impl InProcTransport {
51 pub fn pair() -> (Self, Self) {
55 let (tx_a, rx_a) = mpsc::channel(CHANNEL_CAPACITY);
56 let (tx_b, rx_b) = mpsc::channel(CHANNEL_CAPACITY);
57
58 let inner_a = Arc::new(InProcInner {
59 tx: tx_b, rx: tokio::sync::Mutex::new(rx_a),
61 last_frame: Mutex::new(None),
62 closed: std::sync::atomic::AtomicBool::new(false),
63 });
64
65 let inner_b = Arc::new(InProcInner {
66 tx: tx_a, rx: tokio::sync::Mutex::new(rx_b),
68 last_frame: Mutex::new(None),
69 closed: std::sync::atomic::AtomicBool::new(false),
70 });
71
72 (Self { inner: inner_a }, Self { inner: inner_b })
73 }
74
75 pub fn is_closed(&self) -> bool {
77 self.inner.closed.load(std::sync::atomic::Ordering::Acquire)
78 }
79}
80
81impl Transport for InProcTransport {
82 async fn send_frame(&self, frame: &Frame) -> Result<(), TransportError> {
83 if self.is_closed() {
84 return Err(TransportError::Closed);
85 }
86
87 self.inner
89 .tx
90 .send(frame.clone())
91 .await
92 .map_err(|_| TransportError::Closed)
93 }
94
95 async fn recv_frame(&self) -> Result<FrameView<'_>, TransportError> {
96 if self.is_closed() {
97 return Err(TransportError::Closed);
98 }
99
100 let frame = {
102 let mut rx = self.inner.rx.lock().await;
103 rx.recv().await.ok_or(TransportError::Closed)?
104 };
105
106 {
108 let mut last = self.inner.last_frame.lock();
109 *last = Some(frame);
110 }
111
112 let last = self.inner.last_frame.lock();
118 let frame_ref = last.as_ref().unwrap();
119
120 let desc_ptr = &frame_ref.desc as *const MsgDescHot;
125 let payload_ptr = frame_ref.payload().as_ptr();
126 let payload_len = frame_ref.payload().len();
127
128 let desc: &MsgDescHot = unsafe { &*desc_ptr };
132 let payload: &[u8] = unsafe { std::slice::from_raw_parts(payload_ptr, payload_len) };
133
134 Ok(FrameView::new(desc, payload))
135 }
136
137 fn encoder(&self) -> Box<dyn EncodeCtx + '_> {
138 Box::new(InProcEncoder::new())
139 }
140
141 async fn close(&self) -> Result<(), TransportError> {
142 self.inner
143 .closed
144 .store(true, std::sync::atomic::Ordering::Release);
145 Ok(())
146 }
147}
148
149pub struct InProcEncoder {
153 desc: MsgDescHot,
154 payload: Vec<u8>,
155}
156
157impl InProcEncoder {
158 fn new() -> Self {
159 Self {
160 desc: MsgDescHot::new(),
161 payload: Vec::new(),
162 }
163 }
164
165 pub fn set_desc(&mut self, desc: MsgDescHot) {
167 self.desc = desc;
168 }
169}
170
171impl EncodeCtx for InProcEncoder {
172 fn encode_bytes(&mut self, bytes: &[u8]) -> Result<(), EncodeError> {
173 self.payload.extend_from_slice(bytes);
174 Ok(())
175 }
176
177 fn finish(self: Box<Self>) -> Result<Frame, EncodeError> {
178 Ok(Frame::with_payload(self.desc, self.payload))
179 }
180}
181
/// Decoder that reads from a borrowed byte slice.
pub struct InProcDecoder<'a> {
    /// The bytes being decoded.
    data: &'a [u8],
    /// Offset into `data` of the first byte not yet consumed.
    pos: usize,
}
187
188impl<'a> InProcDecoder<'a> {
189 pub fn new(data: &'a [u8]) -> Self {
191 Self { data, pos: 0 }
192 }
193}
194
195impl<'a> rapace_core::DecodeCtx<'a> for InProcDecoder<'a> {
196 fn decode_bytes(&mut self) -> Result<&'a [u8], DecodeError> {
197 let result = &self.data[self.pos..];
198 self.pos = self.data.len();
199 Ok(result)
200 }
201
202 fn remaining(&self) -> &'a [u8] {
203 &self.data[self.pos..]
204 }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use rapace_core::FrameFlags;

    /// Both endpoints of a new pair report themselves as open.
    #[tokio::test]
    async fn test_pair_creation() {
        let (first, second) = InProcTransport::pair();

        assert!(!first.is_closed());
        assert!(!second.is_closed());
    }

    /// A frame built with `with_inline_payload` arrives on the other endpoint
    /// with its descriptor fields and payload bytes intact.
    #[tokio::test]
    async fn test_send_recv_inline() {
        let (sender, receiver) = InProcTransport::pair();

        let mut desc = MsgDescHot::new();
        desc.msg_id = 1;
        desc.channel_id = 1;
        desc.method_id = 42;
        desc.flags = FrameFlags::DATA;
        let outgoing = Frame::with_inline_payload(desc, b"hello").unwrap();

        sender.send_frame(&outgoing).await.unwrap();
        let received = receiver.recv_frame().await.unwrap();

        assert_eq!(received.desc.msg_id, 1);
        assert_eq!(received.desc.channel_id, 1);
        assert_eq!(received.desc.method_id, 42);
        assert_eq!(received.payload, b"hello");
    }

    /// A frame built with `with_payload` from a 1000-byte vector arrives with
    /// the same message id and payload length.
    #[tokio::test]
    async fn test_send_recv_external_payload() {
        let (sender, receiver) = InProcTransport::pair();

        let mut desc = MsgDescHot::new();
        desc.msg_id = 2;
        desc.flags = FrameFlags::DATA;
        let outgoing = Frame::with_payload(desc, vec![0u8; 1000]);

        sender.send_frame(&outgoing).await.unwrap();
        let received = receiver.recv_frame().await.unwrap();

        assert_eq!(received.desc.msg_id, 2);
        assert_eq!(received.payload.len(), 1000);
    }

    /// Each endpoint can send to the other, and each receives the frame the
    /// other one sent.
    #[tokio::test]
    async fn test_bidirectional() {
        let (a, b) = InProcTransport::pair();

        // A -> B
        let mut desc_from_a = MsgDescHot::new();
        desc_from_a.msg_id = 1;
        let from_a = Frame::with_inline_payload(desc_from_a, b"from A").unwrap();
        a.send_frame(&from_a).await.unwrap();

        // B -> A
        let mut desc_from_b = MsgDescHot::new();
        desc_from_b.msg_id = 2;
        let from_b = Frame::with_inline_payload(desc_from_b, b"from B").unwrap();
        b.send_frame(&from_b).await.unwrap();

        let seen_by_b = b.recv_frame().await.unwrap();
        assert_eq!(seen_by_b.payload, b"from A");

        let seen_by_a = a.recv_frame().await.unwrap();
        assert_eq!(seen_by_a.payload, b"from B");
    }

    /// After `close`, the endpoint reports closed and `send_frame` fails with
    /// `TransportError::Closed`.
    #[tokio::test]
    async fn test_close() {
        let (endpoint, _peer) = InProcTransport::pair();

        endpoint.close().await.unwrap();
        assert!(endpoint.is_closed());

        let frame = Frame::new(MsgDescHot::new());
        let outcome = endpoint.send_frame(&frame).await;
        assert!(matches!(outcome, Err(TransportError::Closed)));
    }

    /// Bytes fed to the transport's encoder come back as the finished
    /// frame's payload.
    #[tokio::test]
    async fn test_encoder() {
        let (endpoint, _peer) = InProcTransport::pair();

        let mut enc = endpoint.encoder();
        enc.encode_bytes(b"test data").unwrap();
        let built = enc.finish().unwrap();

        assert_eq!(built.payload(), b"test data");
    }
}
311
#[cfg(test)]
mod conformance_tests {
    use super::*;
    use rapace_testkit::{TestError, TransportFactory};
    use std::sync::Once;

    /// Guards the one-time installation of the tracing subscriber.
    static INIT: Once = Once::new();

    /// Installs a DEBUG-level tracing subscriber that writes to the test
    /// writer. Safe to call repeatedly; only the first call has any effect.
    fn init_tracing() {
        INIT.call_once(|| {
            let filter = tracing_subscriber::EnvFilter::from_default_env()
                .add_directive(tracing::Level::DEBUG.into());

            tracing_subscriber::fmt()
                .with_env_filter(filter)
                .with_test_writer()
                .init();
        });
    }

    /// Factory handing the testkit a connected pair of in-process endpoints.
    struct InProcFactory;

    impl TransportFactory for InProcFactory {
        type Transport = InProcTransport;

        async fn connect_pair() -> Result<(Self::Transport, Self::Transport), TestError> {
            Ok(InProcTransport::pair())
        }
    }

    /// Generates a `#[tokio::test]` named `$name` that awaits
    /// `rapace_testkit::$runner` instantiated with `InProcFactory`.
    macro_rules! conformance_test {
        ($name:ident => $runner:ident) => {
            #[tokio::test]
            async fn $name() {
                rapace_testkit::$runner::<InProcFactory>().await;
            }
        };
    }

    // The only test that installs the tracing subscriber first.
    #[tokio::test]
    async fn unary_happy_path() {
        init_tracing();
        rapace_testkit::run_unary_happy_path::<InProcFactory>().await;
    }

    conformance_test!(unary_multiple_calls => run_unary_multiple_calls);
    conformance_test!(ping_pong => run_ping_pong);
    conformance_test!(deadline_success => run_deadline_success);
    conformance_test!(deadline_exceeded => run_deadline_exceeded);
    conformance_test!(cancellation => run_cancellation);
    conformance_test!(credit_grant => run_credit_grant);
    conformance_test!(error_response => run_error_response);

    // `run_session_*` runners.
    conformance_test!(session_credit_exhaustion => run_session_credit_exhaustion);
    conformance_test!(session_cancelled_channel_drop => run_session_cancelled_channel_drop);
    conformance_test!(session_cancel_control_frame => run_session_cancel_control_frame);
    conformance_test!(session_grant_credits_control_frame => run_session_grant_credits_control_frame);
    conformance_test!(session_deadline_check => run_session_deadline_check);

    // Streaming runners.
    conformance_test!(server_streaming_happy_path => run_server_streaming_happy_path);
    conformance_test!(client_streaming_happy_path => run_client_streaming_happy_path);
    conformance_test!(bidirectional_streaming => run_bidirectional_streaming);
    conformance_test!(streaming_cancellation => run_streaming_cancellation);
    conformance_test!(macro_server_streaming => run_macro_server_streaming);

    // `run_large_blob_*` runners.
    conformance_test!(large_blob_echo => run_large_blob_echo);
    conformance_test!(large_blob_transform => run_large_blob_transform);
    conformance_test!(large_blob_checksum => run_large_blob_checksum);
}