Skip to main content

hyperi_rustlib/transport/memory/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/memory/mod.rs
3// Purpose:   In-memory transport using tokio channels
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Memory Transport
10//!
11//! In-memory transport backed by tokio channels. It exists primarily as a
12//! **no-mocks test substrate**: a REAL `Transport` implementation (send / recv
13//! / commit / DLQ / `WorkBatch`, with Kafka-style CUMULATIVE commit semantics)
14//! so the engine, driver, filter, routed and factory test suites can exercise
15//! the actual transport trait end-to-end -- fast, deterministic, in-process --
16//! WITHOUT standing up a Kafka or Redis broker. Per the HyperI no-mocks policy
17//! we run tests against this real backend rather than mocking the trait.
18//!
19//! It doubles as a **same-process loopback** for local dev / demos
20//! (`transport: memory` via the factory).
21//!
22//! NOT a production backend: no persistence, same-process only (sender and
23//! receiver are tied to one instance), and no cross-pod surface -- so it is
24//! also not horizontally autoscalable (it has no lag / backpressure signal).
25//!
26//! ## Example
27//!
28//! ```rust,ignore
29//! use hyperi_rustlib::transport::{MemoryTransport, MemoryConfig, Transport};
30//!
31//! let config = MemoryConfig::default();
32//! let transport = MemoryTransport::new(&config).expect("memory transport with valid config must construct");
33//!
34//! // In tests, you can also get a sender handle
35//! let sender = transport.sender();
//! sender.send(None, b"test payload".to_vec()).await?;
37//!
38//! let records = transport.recv(10).await?.records;
39//! assert_eq!(records.len(), 1);
40//! ```
41
42mod token;
43
44pub use token::MemoryToken;
45
46use super::error::{TransportError, TransportResult};
47use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
48use super::types::{Message, PayloadFormat, SendResult};
49use super::work_batch::WorkBatch;
50use serde::{Deserialize, Serialize};
51use std::sync::Arc;
52use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
53use tokio::sync::mpsc;
54
55/// Configuration for memory transport.
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct MemoryConfig {
58    /// Channel buffer size.
59    #[serde(default = "default_buffer_size")]
60    pub buffer_size: usize,
61
62    /// Receive timeout in milliseconds (0 = no wait, return immediately).
63    #[serde(default)]
64    pub recv_timeout_ms: u64,
65
66    /// Inbound message filters (applied on recv before caller sees messages).
67    #[serde(default)]
68    pub filters_in: Vec<super::filter::FilterRule>,
69
70    /// Outbound message filters (applied on send before transport dispatches).
71    #[serde(default)]
72    pub filters_out: Vec<super::filter::FilterRule>,
73}
74
/// Default channel capacity, used both by serde and by `MemoryConfig::default`.
fn default_buffer_size() -> usize {
    const DEFAULT_BUFFER_SIZE: usize = 1000;
    DEFAULT_BUFFER_SIZE
}
78
79impl Default for MemoryConfig {
80    fn default() -> Self {
81        Self {
82            buffer_size: default_buffer_size(),
83            recv_timeout_ms: 0,
84            filters_in: Vec::new(),
85            filters_out: Vec::new(),
86        }
87    }
88}
89
90/// Internal message type for the channel.
91struct InternalMessage {
92    key: Option<Arc<str>>,
93    payload: bytes::Bytes,
94    seq: u64,
95    timestamp_ms: i64,
96}
97
98/// In-memory transport using tokio channels.
99///
100/// Primarily for unit testing - no persistence, same-process only.
101pub struct MemoryTransport {
102    sender: mpsc::Sender<InternalMessage>,
103    receiver: tokio::sync::Mutex<mpsc::Receiver<InternalMessage>>,
104    sequence: AtomicU64,
105    committed_seq: AtomicU64,
106    closed: AtomicBool,
107    recv_timeout_ms: u64,
108    filter_engine: super::filter::TransportFilterEngine,
109}
110
111impl MemoryTransport {
112    /// Create a new memory transport.
113    ///
114    /// # Errors
115    ///
116    /// Returns [`TransportError`] when any inbound/outbound filter rule
117    /// fails to compile. Previously this produced a `tracing::warn!` and
118    /// silently substituted an empty filter engine; that fail-open
119    /// behaviour hid real misconfiguration (a filter that should have
120    /// blocked traffic would instead let every message through), so the
121    /// constructor now propagates the error to the caller.
122    pub fn new(config: &MemoryConfig) -> super::error::TransportResult<Self> {
123        let (sender, receiver) = mpsc::channel(config.buffer_size);
124        let filter_engine = super::filter::TransportFilterEngine::new(
125            &config.filters_in,
126            &config.filters_out,
127            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
128        )?;
129        Ok(Self {
130            sender,
131            receiver: tokio::sync::Mutex::new(receiver),
132            sequence: AtomicU64::new(0),
133            committed_seq: AtomicU64::new(0),
134            closed: AtomicBool::new(false),
135            recv_timeout_ms: config.recv_timeout_ms,
136            filter_engine,
137        })
138    }
139
140    /// Get a sender handle for injecting test messages.
141    ///
142    /// This is useful in tests to send messages without going through
143    /// the Transport trait.
144    #[must_use]
145    pub fn sender(&self) -> MemorySender<'_> {
146        MemorySender {
147            sender: self.sender.clone(),
148            sequence: &self.sequence,
149        }
150    }
151
152    /// Send a message directly (bypasses Transport trait).
153    ///
154    /// # Errors
155    ///
156    /// Returns error if the channel is full or closed.
157    pub async fn inject(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
158        if self.closed.load(Ordering::Relaxed) {
159            return Err(TransportError::Closed);
160        }
161
162        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
163        let timestamp_ms = chrono::Utc::now().timestamp_millis();
164
165        let msg = InternalMessage {
166            key: key.map(Arc::from),
167            payload: payload.into(),
168            seq,
169            timestamp_ms,
170        };
171
172        self.sender
173            .send(msg)
174            .await
175            .map_err(|_| TransportError::Send("channel closed".into()))
176    }
177
178    /// Get the current committed sequence number.
179    #[must_use]
180    pub fn committed_sequence(&self) -> u64 {
181        self.committed_seq.load(Ordering::Relaxed)
182    }
183}
184
185/// Sender handle for injecting test messages.
186pub struct MemorySender<'a> {
187    sender: mpsc::Sender<InternalMessage>,
188    sequence: &'a AtomicU64,
189}
190
191impl MemorySender<'_> {
192    /// Send a payload with optional key.
193    ///
194    /// # Errors
195    ///
196    /// Returns error if the channel is full or closed.
197    pub async fn send(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
198        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
199        let timestamp_ms = chrono::Utc::now().timestamp_millis();
200
201        let msg = InternalMessage {
202            key: key.map(Arc::from),
203            payload: payload.into(),
204            seq,
205            timestamp_ms,
206        };
207
208        self.sender
209            .send(msg)
210            .await
211            .map_err(|_| TransportError::Send("channel closed".into()))
212    }
213}
214
215impl TransportBase for MemoryTransport {
216    async fn close(&self) -> TransportResult<()> {
217        self.closed.store(true, Ordering::Relaxed);
218        Ok(())
219    }
220
221    fn is_healthy(&self) -> bool {
222        !self.closed.load(Ordering::Relaxed)
223    }
224
225    fn name(&self) -> &'static str {
226        "memory"
227    }
228}
229
230impl TransportSender for MemoryTransport {
231    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
232        if self.closed.load(Ordering::Relaxed) {
233            return SendResult::Fatal(TransportError::Closed);
234        }
235
236        // Outbound filter check
237        if self.filter_engine.has_outbound_filters() {
238            match self.filter_engine.apply_outbound(&payload) {
239                super::filter::FilterDisposition::Pass => {}
240                super::filter::FilterDisposition::Drop => return SendResult::Ok,
241                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
242            }
243        }
244
245        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
246        let timestamp_ms = chrono::Utc::now().timestamp_millis();
247
248        let msg = InternalMessage {
249            key: Some(Arc::from(key)),
250            payload,
251            seq,
252            timestamp_ms,
253        };
254
255        match self.sender.try_send(msg) {
256            Ok(()) => SendResult::Ok,
257            Err(mpsc::error::TrySendError::Full(_)) => SendResult::Backpressured,
258            Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Fatal(TransportError::Closed),
259        }
260    }
261}
262
263impl TransportReceiver for MemoryTransport {
264    type Token = MemoryToken;
265
266    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
267        if self.closed.load(Ordering::Relaxed) {
268            return Err(TransportError::Closed);
269        }
270
271        let mut receiver = self.receiver.lock().await;
272        let mut messages = Vec::with_capacity(max.min(100));
273
274        for _ in 0..max {
275            let result = if self.recv_timeout_ms == 0 {
276                match receiver.try_recv() {
277                    Ok(msg) => Some(msg),
278                    Err(mpsc::error::TryRecvError::Empty) => break,
279                    Err(mpsc::error::TryRecvError::Disconnected) => {
280                        return Err(TransportError::Closed);
281                    }
282                }
283            } else if messages.is_empty() {
284                match tokio::time::timeout(
285                    std::time::Duration::from_millis(self.recv_timeout_ms),
286                    receiver.recv(),
287                )
288                .await
289                {
290                    Ok(Some(msg)) => Some(msg),
291                    Ok(None) => return Err(TransportError::Closed),
292                    Err(_) => break,
293                }
294            } else {
295                match receiver.try_recv() {
296                    Ok(msg) => Some(msg),
297                    Err(_) => break,
298                }
299            };
300
301            if let Some(internal) = result {
302                let payload = internal.payload;
303                let format = PayloadFormat::detect(&payload);
304                messages.push(Message {
305                    key: internal.key,
306                    payload,
307                    token: MemoryToken { seq: internal.seq },
308                    timestamp_ms: Some(internal.timestamp_ms),
309                    format,
310                });
311            }
312        }
313
314        // Apply inbound filters via the shared partition helper; DLQ entries
315        // are returned in the RecvBatch for the caller to route onward.
316        let batch = self.filter_engine.partition_batch(
317            messages,
318            |m| m.payload.as_ref(),
319            |m| m.key.clone(),
320            |m| m.token,
321        );
322        let messages = batch.messages;
323        let dlq_entries = batch.dlq_entries;
324        let filtered_tokens = batch.filtered_tokens;
325
326        Ok(RecvBatch {
327            messages,
328            dlq_entries,
329            filtered_tokens,
330        }
331        .into())
332    }
333
334    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
335        if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
336            let _ = self.committed_seq.fetch_max(max_seq, Ordering::Relaxed);
337        }
338        Ok(())
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through the trait's `send` comes back from `recv`
    /// with its key and payload intact.
    #[tokio::test]
    async fn send_and_receive() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        // Send a message
        let result = transport
            .send("test-key", bytes::Bytes::from_static(b"hello world"))
            .await;
        assert!(result.is_ok());

        // Receive it
        let records = transport.recv(10).await.unwrap().records;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].key.as_deref(), Some("test-key"));
        assert_eq!(records[0].payload.as_ref(), b"hello world");
    }

    /// MemoryTransport does NOT override `send_batch`, so this exercises the
    /// trait's per-record default fallback (Task 0.7c): every record is sent
    /// individually via `send`, using its own key, and all arrive intact.
    #[tokio::test]
    async fn send_batch_default_fallback_sends_each_record() {
        use super::super::work_batch::{Record, RecordMeta};

        let transport = MemoryTransport::new(&MemoryConfig::default())
            .expect("memory transport with valid config must construct");

        let records: Vec<Record> = (0..3)
            .map(|i| Record {
                payload: bytes::Bytes::from(format!(r#"{{"id":{i}}}"#)),
                key: Some(Arc::from(format!("k{i}").as_str())),
                headers: Vec::new(),
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Json,
                },
            })
            .collect();

        // Default fallback: one send per record, returns Ok for the whole block.
        let result = transport.send_batch(&records).await;
        assert!(
            result.is_ok(),
            "default send_batch must succeed: {result:?}"
        );

        // All three records loop back through recv with keys + payloads intact.
        let got = transport.recv(10).await.unwrap().records;
        assert_eq!(got.len(), 3, "every record in the block was sent");
        assert_eq!(got[0].key.as_deref(), Some("k0"));
        assert_eq!(got[0].payload.as_ref(), br#"{"id":0}"#);
        assert_eq!(got[2].key.as_deref(), Some("k2"));
        assert_eq!(got[2].payload.as_ref(), br#"{"id":2}"#);
    }

    /// The default `send_batch` short-circuits on the first non-Ok result so the
    /// caller retries the unconfirmed remainder (at-least-once). A closed
    /// transport makes every `send` Fatal, so a non-empty block returns Fatal.
    #[tokio::test]
    async fn send_batch_default_short_circuits_on_error() {
        use super::super::work_batch::{Record, RecordMeta};

        let transport = MemoryTransport::new(&MemoryConfig::default())
            .expect("memory transport with valid config must construct");
        transport.close().await.unwrap();

        let records = vec![Record {
            payload: bytes::Bytes::from_static(b"{}"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        }];

        let result = transport.send_batch(&records).await;
        assert!(
            result.is_fatal(),
            "closed transport must surface the send failure, got {result:?}"
        );

        // An empty block is a trivial Ok (nothing to send).
        assert!(transport.send_batch(&[]).await.is_ok());
    }

    /// Messages pushed with `inject` are visible to `recv`.
    #[tokio::test]
    async fn inject_messages() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        // Inject test messages
        transport
            .inject(Some("key1"), b"msg1".to_vec())
            .await
            .unwrap();
        transport
            .inject(Some("key2"), b"msg2".to_vec())
            .await
            .unwrap();

        // Receive them
        let records = transport.recv(10).await.unwrap().records;
        assert_eq!(records.len(), 2);
    }

    /// Committing a batch moves the committed sequence forward, and a later
    /// commit of an older token does not move it back (cumulative commit).
    #[tokio::test]
    async fn commit_advances_sequence() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");

        // Sequence numbers start at 0 and so does the committed value, so
        // committing only the first message could not be told apart from
        // "commit did nothing". Use two messages (seq 0 and 1) so a real
        // advance is observable.
        transport.inject(None, b"msg0".to_vec()).await.unwrap();
        transport.inject(None, b"msg1".to_vec()).await.unwrap();
        assert_eq!(transport.committed_sequence(), 0);

        let batch = transport.recv(2).await.unwrap();
        assert_eq!(batch.records.len(), 2);

        // Commit the messages via the batch's commit tokens.
        transport.commit(&batch.commit_tokens).await.unwrap();

        // Verify committed sequence advanced to the highest token.
        assert_eq!(transport.committed_sequence(), 1);

        // A stale commit must not regress the committed sequence.
        transport.commit(&[MemoryToken { seq: 0 }]).await.unwrap();
        assert_eq!(transport.committed_sequence(), 1);

        // An empty commit is a no-op.
        transport.commit(&[]).await.unwrap();
        assert_eq!(transport.committed_sequence(), 1);
    }

    /// After `close`, the transport reports unhealthy and both `send` and
    /// `recv` fail.
    #[tokio::test]
    async fn close_prevents_operations() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        // Send should fail
        let result = transport
            .send("key", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());

        // Recv should fail
        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    /// With a one-slot buffer, the second un-drained `send` reports
    /// backpressure instead of waiting.
    #[tokio::test]
    async fn backpressure_on_full_channel() {
        let config = MemoryConfig {
            buffer_size: 1,
            recv_timeout_ms: 0,
            ..Default::default()
        };
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");

        // Fill the channel
        let result1 = transport
            .send("key", bytes::Bytes::from_static(b"msg1"))
            .await;
        assert!(result1.is_ok());

        // Next send should backpressure
        let result2 = transport
            .send("key", bytes::Bytes::from_static(b"msg2"))
            .await;
        assert!(result2.is_backpressured());
    }
}