Skip to main content

hyperi_rustlib/transport/memory/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/memory/mod.rs
3// Purpose:   In-memory transport using tokio channels
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Memory Transport
10//!
11//! In-memory transport using tokio channels for unit testing.
12//! No persistence, same-process only.
13//!
14//! ## Example
15//!
16//! ```rust,ignore
17//! use hyperi_rustlib::transport::{MemoryTransport, MemoryConfig, Transport};
18//!
19//! let config = MemoryConfig::default();
20//! let transport = MemoryTransport::new(&config).expect("memory transport with valid config must construct");
21//!
22//! // In tests, you can also get a sender handle
23//! let sender = transport.sender();
24//! sender.send(b"test payload".to_vec()).await?;
25//!
26//! let records = transport.recv(10).await?.records;
27//! assert_eq!(records.len(), 1);
28//! ```
29
30mod token;
31
32pub use token::MemoryToken;
33
34use super::error::{TransportError, TransportResult};
35use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
36use super::types::{Message, PayloadFormat, SendResult};
37use super::work_batch::WorkBatch;
38use serde::{Deserialize, Serialize};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
41use tokio::sync::mpsc;
42
43/// Configuration for memory transport.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct MemoryConfig {
46    /// Channel buffer size.
47    #[serde(default = "default_buffer_size")]
48    pub buffer_size: usize,
49
50    /// Receive timeout in milliseconds (0 = no wait, return immediately).
51    #[serde(default)]
52    pub recv_timeout_ms: u64,
53
54    /// Inbound message filters (applied on recv before caller sees messages).
55    #[serde(default)]
56    pub filters_in: Vec<super::filter::FilterRule>,
57
58    /// Outbound message filters (applied on send before transport dispatches).
59    #[serde(default)]
60    pub filters_out: Vec<super::filter::FilterRule>,
61}
62
63fn default_buffer_size() -> usize {
64    1000
65}
66
67impl Default for MemoryConfig {
68    fn default() -> Self {
69        Self {
70            buffer_size: default_buffer_size(),
71            recv_timeout_ms: 0,
72            filters_in: Vec::new(),
73            filters_out: Vec::new(),
74        }
75    }
76}
77
78/// Internal message type for the channel.
79struct InternalMessage {
80    key: Option<Arc<str>>,
81    payload: Vec<u8>,
82    seq: u64,
83    timestamp_ms: i64,
84}
85
86/// In-memory transport using tokio channels.
87///
88/// Primarily for unit testing - no persistence, same-process only.
89pub struct MemoryTransport {
90    sender: mpsc::Sender<InternalMessage>,
91    receiver: tokio::sync::Mutex<mpsc::Receiver<InternalMessage>>,
92    sequence: AtomicU64,
93    committed_seq: AtomicU64,
94    closed: AtomicBool,
95    recv_timeout_ms: u64,
96    filter_engine: super::filter::TransportFilterEngine,
97}
98
99impl MemoryTransport {
100    /// Create a new memory transport.
101    ///
102    /// # Errors
103    ///
104    /// Returns [`TransportError`] when any inbound/outbound filter rule
105    /// fails to compile. Previously this produced a `tracing::warn!` and
106    /// silently substituted an empty filter engine; that fail-open
107    /// behaviour hid real misconfiguration (a filter that should have
108    /// blocked traffic would instead let every message through), so the
109    /// constructor now propagates the error to the caller.
110    pub fn new(config: &MemoryConfig) -> super::error::TransportResult<Self> {
111        let (sender, receiver) = mpsc::channel(config.buffer_size);
112        let filter_engine = super::filter::TransportFilterEngine::new(
113            &config.filters_in,
114            &config.filters_out,
115            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
116        )?;
117        Ok(Self {
118            sender,
119            receiver: tokio::sync::Mutex::new(receiver),
120            sequence: AtomicU64::new(0),
121            committed_seq: AtomicU64::new(0),
122            closed: AtomicBool::new(false),
123            recv_timeout_ms: config.recv_timeout_ms,
124            filter_engine,
125        })
126    }
127
128    /// Get a sender handle for injecting test messages.
129    ///
130    /// This is useful in tests to send messages without going through
131    /// the Transport trait.
132    #[must_use]
133    pub fn sender(&self) -> MemorySender<'_> {
134        MemorySender {
135            sender: self.sender.clone(),
136            sequence: &self.sequence,
137        }
138    }
139
140    /// Send a message directly (bypasses Transport trait).
141    ///
142    /// # Errors
143    ///
144    /// Returns error if the channel is full or closed.
145    pub async fn inject(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
146        if self.closed.load(Ordering::Relaxed) {
147            return Err(TransportError::Closed);
148        }
149
150        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
151        let timestamp_ms = chrono::Utc::now().timestamp_millis();
152
153        let msg = InternalMessage {
154            key: key.map(Arc::from),
155            payload,
156            seq,
157            timestamp_ms,
158        };
159
160        self.sender
161            .send(msg)
162            .await
163            .map_err(|_| TransportError::Send("channel closed".into()))
164    }
165
166    /// Get the current committed sequence number.
167    #[must_use]
168    pub fn committed_sequence(&self) -> u64 {
169        self.committed_seq.load(Ordering::Relaxed)
170    }
171}
172
173/// Sender handle for injecting test messages.
174pub struct MemorySender<'a> {
175    sender: mpsc::Sender<InternalMessage>,
176    sequence: &'a AtomicU64,
177}
178
179impl MemorySender<'_> {
180    /// Send a payload with optional key.
181    ///
182    /// # Errors
183    ///
184    /// Returns error if the channel is full or closed.
185    pub async fn send(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
186        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
187        let timestamp_ms = chrono::Utc::now().timestamp_millis();
188
189        let msg = InternalMessage {
190            key: key.map(Arc::from),
191            payload,
192            seq,
193            timestamp_ms,
194        };
195
196        self.sender
197            .send(msg)
198            .await
199            .map_err(|_| TransportError::Send("channel closed".into()))
200    }
201}
202
203impl TransportBase for MemoryTransport {
204    async fn close(&self) -> TransportResult<()> {
205        self.closed.store(true, Ordering::Relaxed);
206        Ok(())
207    }
208
209    fn is_healthy(&self) -> bool {
210        !self.closed.load(Ordering::Relaxed)
211    }
212
213    fn name(&self) -> &'static str {
214        "memory"
215    }
216}
217
218impl TransportSender for MemoryTransport {
219    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
220        if self.closed.load(Ordering::Relaxed) {
221            return SendResult::Fatal(TransportError::Closed);
222        }
223
224        // Outbound filter check
225        if self.filter_engine.has_outbound_filters() {
226            match self.filter_engine.apply_outbound(&payload) {
227                super::filter::FilterDisposition::Pass => {}
228                super::filter::FilterDisposition::Drop => return SendResult::Ok,
229                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
230            }
231        }
232
233        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
234        let timestamp_ms = chrono::Utc::now().timestamp_millis();
235
236        let msg = InternalMessage {
237            key: Some(Arc::from(key)),
238            payload: payload.to_vec(),
239            seq,
240            timestamp_ms,
241        };
242
243        match self.sender.try_send(msg) {
244            Ok(()) => SendResult::Ok,
245            Err(mpsc::error::TrySendError::Full(_)) => SendResult::Backpressured,
246            Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Fatal(TransportError::Closed),
247        }
248    }
249}
250
251impl TransportReceiver for MemoryTransport {
252    type Token = MemoryToken;
253
254    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
255        if self.closed.load(Ordering::Relaxed) {
256            return Err(TransportError::Closed);
257        }
258
259        let mut receiver = self.receiver.lock().await;
260        let mut messages = Vec::with_capacity(max.min(100));
261
262        for _ in 0..max {
263            let result = if self.recv_timeout_ms == 0 {
264                match receiver.try_recv() {
265                    Ok(msg) => Some(msg),
266                    Err(mpsc::error::TryRecvError::Empty) => break,
267                    Err(mpsc::error::TryRecvError::Disconnected) => {
268                        return Err(TransportError::Closed);
269                    }
270                }
271            } else if messages.is_empty() {
272                match tokio::time::timeout(
273                    std::time::Duration::from_millis(self.recv_timeout_ms),
274                    receiver.recv(),
275                )
276                .await
277                {
278                    Ok(Some(msg)) => Some(msg),
279                    Ok(None) => return Err(TransportError::Closed),
280                    Err(_) => break,
281                }
282            } else {
283                match receiver.try_recv() {
284                    Ok(msg) => Some(msg),
285                    Err(_) => break,
286                }
287            };
288
289            if let Some(internal) = result {
290                let payload: bytes::Bytes = internal.payload.into();
291                let format = PayloadFormat::detect(&payload);
292                messages.push(Message {
293                    key: internal.key,
294                    payload,
295                    token: MemoryToken { seq: internal.seq },
296                    timestamp_ms: Some(internal.timestamp_ms),
297                    format,
298                });
299            }
300        }
301
302        // Apply inbound filters via the shared partition helper; DLQ entries
303        // are returned in the RecvBatch for the caller to route onward.
304        let batch =
305            self.filter_engine
306                .partition_batch(messages, |m| m.payload.as_ref(), |m| m.key.clone());
307        let messages = batch.messages;
308        let dlq_entries = batch.dlq_entries;
309
310        Ok(RecvBatch {
311            messages,
312            dlq_entries,
313        }
314        .into())
315    }
316
317    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
318        if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
319            let _ = self.committed_seq.fetch_max(max_seq, Ordering::Relaxed);
320        }
321        Ok(())
322    }
323}
324
325#[cfg(test)]
326mod tests {
327    use super::*;
328
329    #[tokio::test]
330    async fn send_and_receive() {
331        let config = MemoryConfig::default();
332        let transport = MemoryTransport::new(&config)
333            .expect("memory transport with valid config must construct");
334        // Send a message
335        let result = transport
336            .send("test-key", bytes::Bytes::from_static(b"hello world"))
337            .await;
338        assert!(result.is_ok());
339
340        // Receive it
341        let records = transport.recv(10).await.unwrap().records;
342        assert_eq!(records.len(), 1);
343        assert_eq!(records[0].key.as_deref(), Some("test-key"));
344        assert_eq!(records[0].payload.as_ref(), b"hello world");
345    }
346
347    /// MemoryTransport does NOT override `send_batch`, so this exercises the
348    /// trait's per-record default fallback (Task 0.7c): every record is sent
349    /// individually via `send`, using its own key, and all arrive intact.
350    #[tokio::test]
351    async fn send_batch_default_fallback_sends_each_record() {
352        use super::super::work_batch::{Record, RecordMeta};
353
354        let transport = MemoryTransport::new(&MemoryConfig::default())
355            .expect("memory transport with valid config must construct");
356
357        let records: Vec<Record> = (0..3)
358            .map(|i| Record {
359                payload: bytes::Bytes::from(format!(r#"{{"id":{i}}}"#)),
360                key: Some(Arc::from(format!("k{i}").as_str())),
361                headers: Vec::new(),
362                metadata: RecordMeta {
363                    timestamp_ms: None,
364                    format: PayloadFormat::Json,
365                },
366            })
367            .collect();
368
369        // Default fallback: one send per record, returns Ok for the whole block.
370        let result = transport.send_batch(&records).await;
371        assert!(
372            result.is_ok(),
373            "default send_batch must succeed: {result:?}"
374        );
375
376        // All three records loop back through recv with keys + payloads intact.
377        let got = transport.recv(10).await.unwrap().records;
378        assert_eq!(got.len(), 3, "every record in the block was sent");
379        assert_eq!(got[0].key.as_deref(), Some("k0"));
380        assert_eq!(got[0].payload.as_ref(), br#"{"id":0}"#);
381        assert_eq!(got[2].key.as_deref(), Some("k2"));
382        assert_eq!(got[2].payload.as_ref(), br#"{"id":2}"#);
383    }
384
385    /// The default `send_batch` short-circuits on the first non-Ok result so the
386    /// caller retries the unconfirmed remainder (at-least-once). A closed
387    /// transport makes every `send` Fatal, so a non-empty block returns Fatal.
388    #[tokio::test]
389    async fn send_batch_default_short_circuits_on_error() {
390        use super::super::work_batch::{Record, RecordMeta};
391
392        let transport = MemoryTransport::new(&MemoryConfig::default())
393            .expect("memory transport with valid config must construct");
394        transport.close().await.unwrap();
395
396        let records = vec![Record {
397            payload: bytes::Bytes::from_static(b"{}"),
398            key: None,
399            headers: Vec::new(),
400            metadata: RecordMeta {
401                timestamp_ms: None,
402                format: PayloadFormat::Json,
403            },
404        }];
405
406        let result = transport.send_batch(&records).await;
407        assert!(
408            result.is_fatal(),
409            "closed transport must surface the send failure, got {result:?}"
410        );
411
412        // An empty block is a trivial Ok (nothing to send).
413        assert!(transport.send_batch(&[]).await.is_ok());
414    }
415
416    #[tokio::test]
417    async fn inject_messages() {
418        let config = MemoryConfig::default();
419        let transport = MemoryTransport::new(&config)
420            .expect("memory transport with valid config must construct");
421        // Inject test messages
422        transport
423            .inject(Some("key1"), b"msg1".to_vec())
424            .await
425            .unwrap();
426        transport
427            .inject(Some("key2"), b"msg2".to_vec())
428            .await
429            .unwrap();
430
431        // Receive them
432        let records = transport.recv(10).await.unwrap().records;
433        assert_eq!(records.len(), 2);
434    }
435
436    #[tokio::test]
437    async fn commit_advances_sequence() {
438        let config = MemoryConfig::default();
439        let transport = MemoryTransport::new(&config)
440            .expect("memory transport with valid config must construct");
441        transport.inject(None, b"msg".to_vec()).await.unwrap();
442        let batch = transport.recv(1).await.unwrap();
443
444        // Commit the message via the batch's commit tokens.
445        transport.commit(&batch.commit_tokens).await.unwrap();
446
447        // Verify committed sequence advanced
448        assert_eq!(transport.committed_sequence(), 0);
449    }
450
451    #[tokio::test]
452    async fn close_prevents_operations() {
453        let config = MemoryConfig::default();
454        let transport = MemoryTransport::new(&config)
455            .expect("memory transport with valid config must construct");
456        transport.close().await.unwrap();
457        assert!(!transport.is_healthy());
458
459        // Send should fail
460        let result = transport
461            .send("key", bytes::Bytes::from_static(b"data"))
462            .await;
463        assert!(result.is_fatal());
464
465        // Recv should fail
466        let result = transport.recv(1).await;
467        assert!(result.is_err());
468    }
469
470    #[tokio::test]
471    async fn backpressure_on_full_channel() {
472        let config = MemoryConfig {
473            buffer_size: 1,
474            recv_timeout_ms: 0,
475            ..Default::default()
476        };
477        let transport = MemoryTransport::new(&config)
478            .expect("memory transport with valid config must construct");
479
480        // Fill the channel
481        let result1 = transport
482            .send("key", bytes::Bytes::from_static(b"msg1"))
483            .await;
484        assert!(result1.is_ok());
485
486        // Next send should backpressure
487        let result2 = transport
488            .send("key", bytes::Bytes::from_static(b"msg2"))
489            .await;
490        assert!(result2.is_backpressured());
491    }
492}