Skip to main content

hyperi_rustlib/transport/memory/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/memory/mod.rs
3// Purpose:   In-memory transport using tokio channels
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Memory Transport
10//!
11//! In-memory transport using tokio channels for unit testing.
12//! No persistence, same-process only.
13//!
14//! ## Example
15//!
16//! ```rust,ignore
17//! use hyperi_rustlib::transport::{MemoryTransport, MemoryConfig, Transport};
18//!
19//! let config = MemoryConfig::default();
20//! let transport = MemoryTransport::new(&config).expect("memory transport with valid config must construct");
21//!
22//! // In tests, you can also get a sender handle
23//! let sender = transport.sender();
//! sender.send(None, b"test payload".to_vec()).await?;
25//!
26//! let records = transport.recv(10).await?.records;
27//! assert_eq!(records.len(), 1);
28//! ```
29
30mod token;
31
32pub use token::MemoryToken;
33
34use super::error::{TransportError, TransportResult};
35use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
36use super::types::{Message, PayloadFormat, SendResult};
37use super::work_batch::WorkBatch;
38use serde::{Deserialize, Serialize};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
41use tokio::sync::mpsc;
42
43/// Configuration for memory transport.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct MemoryConfig {
46    /// Channel buffer size.
47    #[serde(default = "default_buffer_size")]
48    pub buffer_size: usize,
49
50    /// Receive timeout in milliseconds (0 = no wait, return immediately).
51    #[serde(default)]
52    pub recv_timeout_ms: u64,
53
54    /// Inbound message filters (applied on recv before caller sees messages).
55    #[serde(default)]
56    pub filters_in: Vec<super::filter::FilterRule>,
57
58    /// Outbound message filters (applied on send before transport dispatches).
59    #[serde(default)]
60    pub filters_out: Vec<super::filter::FilterRule>,
61}
62
/// Serde fallback for `MemoryConfig::buffer_size`: room for 1000 messages.
fn default_buffer_size() -> usize {
    1_000
}
66
67impl Default for MemoryConfig {
68    fn default() -> Self {
69        Self {
70            buffer_size: default_buffer_size(),
71            recv_timeout_ms: 0,
72            filters_in: Vec::new(),
73            filters_out: Vec::new(),
74        }
75    }
76}
77
78/// Internal message type for the channel.
79struct InternalMessage {
80    key: Option<Arc<str>>,
81    payload: bytes::Bytes,
82    seq: u64,
83    timestamp_ms: i64,
84}
85
86/// In-memory transport using tokio channels.
87///
88/// Primarily for unit testing - no persistence, same-process only.
89pub struct MemoryTransport {
90    sender: mpsc::Sender<InternalMessage>,
91    receiver: tokio::sync::Mutex<mpsc::Receiver<InternalMessage>>,
92    sequence: AtomicU64,
93    committed_seq: AtomicU64,
94    closed: AtomicBool,
95    recv_timeout_ms: u64,
96    filter_engine: super::filter::TransportFilterEngine,
97}
98
99impl MemoryTransport {
100    /// Create a new memory transport.
101    ///
102    /// # Errors
103    ///
104    /// Returns [`TransportError`] when any inbound/outbound filter rule
105    /// fails to compile. Previously this produced a `tracing::warn!` and
106    /// silently substituted an empty filter engine; that fail-open
107    /// behaviour hid real misconfiguration (a filter that should have
108    /// blocked traffic would instead let every message through), so the
109    /// constructor now propagates the error to the caller.
110    pub fn new(config: &MemoryConfig) -> super::error::TransportResult<Self> {
111        let (sender, receiver) = mpsc::channel(config.buffer_size);
112        let filter_engine = super::filter::TransportFilterEngine::new(
113            &config.filters_in,
114            &config.filters_out,
115            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
116        )?;
117        Ok(Self {
118            sender,
119            receiver: tokio::sync::Mutex::new(receiver),
120            sequence: AtomicU64::new(0),
121            committed_seq: AtomicU64::new(0),
122            closed: AtomicBool::new(false),
123            recv_timeout_ms: config.recv_timeout_ms,
124            filter_engine,
125        })
126    }
127
128    /// Get a sender handle for injecting test messages.
129    ///
130    /// This is useful in tests to send messages without going through
131    /// the Transport trait.
132    #[must_use]
133    pub fn sender(&self) -> MemorySender<'_> {
134        MemorySender {
135            sender: self.sender.clone(),
136            sequence: &self.sequence,
137        }
138    }
139
140    /// Send a message directly (bypasses Transport trait).
141    ///
142    /// # Errors
143    ///
144    /// Returns error if the channel is full or closed.
145    pub async fn inject(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
146        if self.closed.load(Ordering::Relaxed) {
147            return Err(TransportError::Closed);
148        }
149
150        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
151        let timestamp_ms = chrono::Utc::now().timestamp_millis();
152
153        let msg = InternalMessage {
154            key: key.map(Arc::from),
155            payload: payload.into(),
156            seq,
157            timestamp_ms,
158        };
159
160        self.sender
161            .send(msg)
162            .await
163            .map_err(|_| TransportError::Send("channel closed".into()))
164    }
165
166    /// Get the current committed sequence number.
167    #[must_use]
168    pub fn committed_sequence(&self) -> u64 {
169        self.committed_seq.load(Ordering::Relaxed)
170    }
171}
172
173/// Sender handle for injecting test messages.
174pub struct MemorySender<'a> {
175    sender: mpsc::Sender<InternalMessage>,
176    sequence: &'a AtomicU64,
177}
178
179impl MemorySender<'_> {
180    /// Send a payload with optional key.
181    ///
182    /// # Errors
183    ///
184    /// Returns error if the channel is full or closed.
185    pub async fn send(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
186        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
187        let timestamp_ms = chrono::Utc::now().timestamp_millis();
188
189        let msg = InternalMessage {
190            key: key.map(Arc::from),
191            payload: payload.into(),
192            seq,
193            timestamp_ms,
194        };
195
196        self.sender
197            .send(msg)
198            .await
199            .map_err(|_| TransportError::Send("channel closed".into()))
200    }
201}
202
203impl TransportBase for MemoryTransport {
204    async fn close(&self) -> TransportResult<()> {
205        self.closed.store(true, Ordering::Relaxed);
206        Ok(())
207    }
208
209    fn is_healthy(&self) -> bool {
210        !self.closed.load(Ordering::Relaxed)
211    }
212
213    fn name(&self) -> &'static str {
214        "memory"
215    }
216}
217
218impl TransportSender for MemoryTransport {
219    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
220        if self.closed.load(Ordering::Relaxed) {
221            return SendResult::Fatal(TransportError::Closed);
222        }
223
224        // Outbound filter check
225        if self.filter_engine.has_outbound_filters() {
226            match self.filter_engine.apply_outbound(&payload) {
227                super::filter::FilterDisposition::Pass => {}
228                super::filter::FilterDisposition::Drop => return SendResult::Ok,
229                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
230            }
231        }
232
233        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
234        let timestamp_ms = chrono::Utc::now().timestamp_millis();
235
236        let msg = InternalMessage {
237            key: Some(Arc::from(key)),
238            payload,
239            seq,
240            timestamp_ms,
241        };
242
243        match self.sender.try_send(msg) {
244            Ok(()) => SendResult::Ok,
245            Err(mpsc::error::TrySendError::Full(_)) => SendResult::Backpressured,
246            Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Fatal(TransportError::Closed),
247        }
248    }
249}
250
251impl TransportReceiver for MemoryTransport {
252    type Token = MemoryToken;
253
254    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
255        if self.closed.load(Ordering::Relaxed) {
256            return Err(TransportError::Closed);
257        }
258
259        let mut receiver = self.receiver.lock().await;
260        let mut messages = Vec::with_capacity(max.min(100));
261
262        for _ in 0..max {
263            let result = if self.recv_timeout_ms == 0 {
264                match receiver.try_recv() {
265                    Ok(msg) => Some(msg),
266                    Err(mpsc::error::TryRecvError::Empty) => break,
267                    Err(mpsc::error::TryRecvError::Disconnected) => {
268                        return Err(TransportError::Closed);
269                    }
270                }
271            } else if messages.is_empty() {
272                match tokio::time::timeout(
273                    std::time::Duration::from_millis(self.recv_timeout_ms),
274                    receiver.recv(),
275                )
276                .await
277                {
278                    Ok(Some(msg)) => Some(msg),
279                    Ok(None) => return Err(TransportError::Closed),
280                    Err(_) => break,
281                }
282            } else {
283                match receiver.try_recv() {
284                    Ok(msg) => Some(msg),
285                    Err(_) => break,
286                }
287            };
288
289            if let Some(internal) = result {
290                let payload = internal.payload;
291                let format = PayloadFormat::detect(&payload);
292                messages.push(Message {
293                    key: internal.key,
294                    payload,
295                    token: MemoryToken { seq: internal.seq },
296                    timestamp_ms: Some(internal.timestamp_ms),
297                    format,
298                });
299            }
300        }
301
302        // Apply inbound filters via the shared partition helper; DLQ entries
303        // are returned in the RecvBatch for the caller to route onward.
304        let batch = self.filter_engine.partition_batch(
305            messages,
306            |m| m.payload.as_ref(),
307            |m| m.key.clone(),
308            |m| m.token,
309        );
310        let messages = batch.messages;
311        let dlq_entries = batch.dlq_entries;
312        let filtered_tokens = batch.filtered_tokens;
313
314        Ok(RecvBatch {
315            messages,
316            dlq_entries,
317            filtered_tokens,
318        }
319        .into())
320    }
321
322    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
323        if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
324            let _ = self.committed_seq.fetch_max(max_seq, Ordering::Relaxed);
325        }
326        Ok(())
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through the trait comes back from `recv` with its key
    /// and payload intact.
    #[tokio::test]
    async fn send_and_receive() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        // Send a message
        let result = transport
            .send("test-key", bytes::Bytes::from_static(b"hello world"))
            .await;
        assert!(result.is_ok());

        // Receive it
        let records = transport.recv(10).await.unwrap().records;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].key.as_deref(), Some("test-key"));
        assert_eq!(records[0].payload.as_ref(), b"hello world");
    }

    /// MemoryTransport does NOT override `send_batch`, so this exercises the
    /// trait's per-record default fallback (Task 0.7c): every record is sent
    /// individually via `send`, using its own key, and all arrive intact.
    #[tokio::test]
    async fn send_batch_default_fallback_sends_each_record() {
        use super::super::work_batch::{Record, RecordMeta};

        let transport = MemoryTransport::new(&MemoryConfig::default())
            .expect("memory transport with valid config must construct");

        let records: Vec<Record> = (0..3)
            .map(|i| Record {
                payload: bytes::Bytes::from(format!(r#"{{"id":{i}}}"#)),
                key: Some(Arc::from(format!("k{i}").as_str())),
                headers: Vec::new(),
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Json,
                },
            })
            .collect();

        // Default fallback: one send per record, returns Ok for the whole block.
        let result = transport.send_batch(&records).await;
        assert!(
            result.is_ok(),
            "default send_batch must succeed: {result:?}"
        );

        // All three records loop back through recv with keys + payloads intact.
        let got = transport.recv(10).await.unwrap().records;
        assert_eq!(got.len(), 3, "every record in the block was sent");
        assert_eq!(got[0].key.as_deref(), Some("k0"));
        assert_eq!(got[0].payload.as_ref(), br#"{"id":0}"#);
        assert_eq!(got[2].key.as_deref(), Some("k2"));
        assert_eq!(got[2].payload.as_ref(), br#"{"id":2}"#);
    }

    /// The default `send_batch` short-circuits on the first non-Ok result so the
    /// caller retries the unconfirmed remainder (at-least-once). A closed
    /// transport makes every `send` Fatal, so a non-empty block returns Fatal.
    #[tokio::test]
    async fn send_batch_default_short_circuits_on_error() {
        use super::super::work_batch::{Record, RecordMeta};

        let transport = MemoryTransport::new(&MemoryConfig::default())
            .expect("memory transport with valid config must construct");
        transport.close().await.unwrap();

        let records = vec![Record {
            payload: bytes::Bytes::from_static(b"{}"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        }];

        let result = transport.send_batch(&records).await;
        assert!(
            result.is_fatal(),
            "closed transport must surface the send failure, got {result:?}"
        );

        // An empty block is a trivial Ok (nothing to send).
        assert!(transport.send_batch(&[]).await.is_ok());
    }

    /// Messages pushed with `inject` are visible to `recv`.
    #[tokio::test]
    async fn inject_messages() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        // Inject test messages
        transport
            .inject(Some("key1"), b"msg1".to_vec())
            .await
            .unwrap();
        transport
            .inject(Some("key2"), b"msg2".to_vec())
            .await
            .unwrap();

        // Receive them
        let records = transport.recv(10).await.unwrap().records;
        assert_eq!(records.len(), 2);
    }

    /// Committing a batch moves the committed sequence to the highest
    /// sequence number in that batch.
    #[tokio::test]
    async fn commit_advances_sequence() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");

        // Two messages receive sequence numbers 0 and 1. Committing only
        // sequence 0 would leave the counter at its initial value of 0 and
        // could not distinguish a working commit from a no-op.
        transport.inject(None, b"first".to_vec()).await.unwrap();
        transport.inject(None, b"second".to_vec()).await.unwrap();
        assert_eq!(transport.committed_sequence(), 0);

        let batch = transport.recv(2).await.unwrap();
        assert_eq!(batch.records.len(), 2);

        // Commit the messages via the batch's commit tokens.
        transport.commit(&batch.commit_tokens).await.unwrap();

        // Verify committed sequence advanced past its initial value.
        assert_eq!(transport.committed_sequence(), 1);
    }

    /// After `close`, the transport is unhealthy and both `send` and `recv`
    /// fail.
    #[tokio::test]
    async fn close_prevents_operations() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        // Send should fail
        let result = transport
            .send("key", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());

        // Recv should fail
        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    /// With a one-slot buffer, the second un-received send reports
    /// backpressure instead of waiting.
    #[tokio::test]
    async fn backpressure_on_full_channel() {
        let config = MemoryConfig {
            buffer_size: 1,
            recv_timeout_ms: 0,
            ..Default::default()
        };
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");

        // Fill the channel
        let result1 = transport
            .send("key", bytes::Bytes::from_static(b"msg1"))
            .await;
        assert!(result1.is_ok());

        // Next send should backpressure
        let result2 = transport
            .send("key", bytes::Bytes::from_static(b"msg2"))
            .await;
        assert!(result2.is_backpressured());
    }
}