Skip to main content

hyperi_rustlib/transport/memory/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/memory/mod.rs
3// Purpose:   In-memory transport using tokio channels
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Memory Transport
10//!
11//! In-memory transport using tokio channels for unit testing.
12//! No persistence, same-process only.
13//!
14//! ## Example
15//!
16//! ```rust,ignore
17//! use hyperi_rustlib::transport::{MemoryTransport, MemoryConfig, Transport};
18//!
19//! let config = MemoryConfig::default();
20//! let transport = MemoryTransport::new(&config).expect("memory transport with valid config must construct");
21//!
22//! // In tests, you can also get a sender handle
23//! let sender = transport.sender();
24//! sender.send(None, b"test payload".to_vec()).await?;
25//!
26//! let messages = transport.recv(10).await?;
27//! assert_eq!(messages.len(), 1);
28//! ```
29
30mod token;
31
32pub use token::MemoryToken;
33
34use super::error::{TransportError, TransportResult};
35use super::traits::{TransportBase, TransportReceiver, TransportSender};
36use super::types::{Message, PayloadFormat, SendResult};
37use serde::{Deserialize, Serialize};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use tokio::sync::mpsc;
41
42/// Configuration for memory transport.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MemoryConfig {
45    /// Channel buffer size.
46    #[serde(default = "default_buffer_size")]
47    pub buffer_size: usize,
48
49    /// Receive timeout in milliseconds (0 = no wait, return immediately).
50    #[serde(default)]
51    pub recv_timeout_ms: u64,
52
53    /// Inbound message filters (applied on recv before caller sees messages).
54    #[serde(default)]
55    pub filters_in: Vec<super::filter::FilterRule>,
56
57    /// Outbound message filters (applied on send before transport dispatches).
58    #[serde(default)]
59    pub filters_out: Vec<super::filter::FilterRule>,
60}
61
/// Channel capacity used when `buffer_size` is absent from the config.
fn default_buffer_size() -> usize {
    1_000
}
65
66impl Default for MemoryConfig {
67    fn default() -> Self {
68        Self {
69            buffer_size: default_buffer_size(),
70            recv_timeout_ms: 0,
71            filters_in: Vec::new(),
72            filters_out: Vec::new(),
73        }
74    }
75}
76
/// Envelope that travels through the channel between the send and receive
/// sides of the transport.
struct InternalMessage {
    /// Optional routing key supplied by the producer.
    key: Option<Arc<str>>,
    /// Raw message bytes.
    payload: Vec<u8>,
    /// Sequence number taken from the transport's counter when the message
    /// was built; becomes the `MemoryToken` on receive.
    seq: u64,
    /// UTC wall-clock time, in milliseconds, captured when the message was
    /// built.
    timestamp_ms: i64,
}
84
85/// In-memory transport using tokio channels.
86///
87/// Primarily for unit testing - no persistence, same-process only.
88pub struct MemoryTransport {
89    sender: mpsc::Sender<InternalMessage>,
90    receiver: tokio::sync::Mutex<mpsc::Receiver<InternalMessage>>,
91    sequence: AtomicU64,
92    committed_seq: AtomicU64,
93    closed: AtomicBool,
94    recv_timeout_ms: u64,
95    filter_engine: super::filter::TransportFilterEngine,
96    /// Buffer for messages staged to DLQ by inbound filters.
97    /// Drained by `take_filtered_dlq_entries()`.
98    filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
99}
100
101impl MemoryTransport {
102    /// Create a new memory transport.
103    ///
104    /// # Errors
105    ///
106    /// Returns [`TransportError`] when any inbound/outbound filter rule
107    /// fails to compile. Previously this produced a `tracing::warn!` and
108    /// silently substituted an empty filter engine; that fail-open
109    /// behaviour hid real misconfiguration (a filter that should have
110    /// blocked traffic would instead let every message through), so the
111    /// constructor now propagates the error to the caller.
112    pub fn new(config: &MemoryConfig) -> super::error::TransportResult<Self> {
113        let (sender, receiver) = mpsc::channel(config.buffer_size);
114        let filter_engine = super::filter::TransportFilterEngine::new(
115            &config.filters_in,
116            &config.filters_out,
117            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
118        )?;
119        Ok(Self {
120            sender,
121            receiver: tokio::sync::Mutex::new(receiver),
122            sequence: AtomicU64::new(0),
123            committed_seq: AtomicU64::new(0),
124            closed: AtomicBool::new(false),
125            recv_timeout_ms: config.recv_timeout_ms,
126            filter_engine,
127            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
128        })
129    }
130
131    /// Get a sender handle for injecting test messages.
132    ///
133    /// This is useful in tests to send messages without going through
134    /// the Transport trait.
135    #[must_use]
136    pub fn sender(&self) -> MemorySender<'_> {
137        MemorySender {
138            sender: self.sender.clone(),
139            sequence: &self.sequence,
140        }
141    }
142
143    /// Send a message directly (bypasses Transport trait).
144    ///
145    /// # Errors
146    ///
147    /// Returns error if the channel is full or closed.
148    pub async fn inject(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
149        if self.closed.load(Ordering::Relaxed) {
150            return Err(TransportError::Closed);
151        }
152
153        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
154        let timestamp_ms = chrono::Utc::now().timestamp_millis();
155
156        let msg = InternalMessage {
157            key: key.map(Arc::from),
158            payload,
159            seq,
160            timestamp_ms,
161        };
162
163        self.sender
164            .send(msg)
165            .await
166            .map_err(|_| TransportError::Send("channel closed".into()))
167    }
168
169    /// Get the current committed sequence number.
170    #[must_use]
171    pub fn committed_sequence(&self) -> u64 {
172        self.committed_seq.load(Ordering::Relaxed)
173    }
174}
175
176/// Sender handle for injecting test messages.
177pub struct MemorySender<'a> {
178    sender: mpsc::Sender<InternalMessage>,
179    sequence: &'a AtomicU64,
180}
181
182impl MemorySender<'_> {
183    /// Send a payload with optional key.
184    ///
185    /// # Errors
186    ///
187    /// Returns error if the channel is full or closed.
188    pub async fn send(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
189        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
190        let timestamp_ms = chrono::Utc::now().timestamp_millis();
191
192        let msg = InternalMessage {
193            key: key.map(Arc::from),
194            payload,
195            seq,
196            timestamp_ms,
197        };
198
199        self.sender
200            .send(msg)
201            .await
202            .map_err(|_| TransportError::Send("channel closed".into()))
203    }
204}
205
206impl TransportBase for MemoryTransport {
207    async fn close(&self) -> TransportResult<()> {
208        self.closed.store(true, Ordering::Relaxed);
209        Ok(())
210    }
211
212    fn is_healthy(&self) -> bool {
213        !self.closed.load(Ordering::Relaxed)
214    }
215
216    fn name(&self) -> &'static str {
217        "memory"
218    }
219}
220
221impl TransportSender for MemoryTransport {
222    async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
223        if self.closed.load(Ordering::Relaxed) {
224            return SendResult::Fatal(TransportError::Closed);
225        }
226
227        // Outbound filter check
228        if self.filter_engine.has_outbound_filters() {
229            match self.filter_engine.apply_outbound(payload) {
230                super::filter::FilterDisposition::Pass => {}
231                super::filter::FilterDisposition::Drop => return SendResult::Ok,
232                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
233            }
234        }
235
236        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
237        let timestamp_ms = chrono::Utc::now().timestamp_millis();
238
239        let msg = InternalMessage {
240            key: Some(Arc::from(key)),
241            payload: payload.to_vec(),
242            seq,
243            timestamp_ms,
244        };
245
246        match self.sender.try_send(msg) {
247            Ok(()) => SendResult::Ok,
248            Err(mpsc::error::TrySendError::Full(_)) => SendResult::Backpressured,
249            Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Fatal(TransportError::Closed),
250        }
251    }
252}
253
254impl TransportReceiver for MemoryTransport {
255    type Token = MemoryToken;
256
257    async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
258        if self.closed.load(Ordering::Relaxed) {
259            return Err(TransportError::Closed);
260        }
261
262        let mut receiver = self.receiver.lock().await;
263        let mut messages = Vec::with_capacity(max.min(100));
264
265        for _ in 0..max {
266            let result = if self.recv_timeout_ms == 0 {
267                match receiver.try_recv() {
268                    Ok(msg) => Some(msg),
269                    Err(mpsc::error::TryRecvError::Empty) => break,
270                    Err(mpsc::error::TryRecvError::Disconnected) => {
271                        return Err(TransportError::Closed);
272                    }
273                }
274            } else if messages.is_empty() {
275                match tokio::time::timeout(
276                    std::time::Duration::from_millis(self.recv_timeout_ms),
277                    receiver.recv(),
278                )
279                .await
280                {
281                    Ok(Some(msg)) => Some(msg),
282                    Ok(None) => return Err(TransportError::Closed),
283                    Err(_) => break,
284                }
285            } else {
286                match receiver.try_recv() {
287                    Ok(msg) => Some(msg),
288                    Err(_) => break,
289                }
290            };
291
292            if let Some(internal) = result {
293                let format = PayloadFormat::detect(&internal.payload);
294                messages.push(Message {
295                    key: internal.key,
296                    payload: internal.payload,
297                    token: MemoryToken { seq: internal.seq },
298                    timestamp_ms: Some(internal.timestamp_ms),
299                    format,
300                });
301            }
302        }
303
304        // Apply inbound filters: drop messages, stage DLQ entries
305        if self.filter_engine.has_inbound_filters() {
306            let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
307            messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
308                super::filter::FilterDisposition::Pass => true,
309                super::filter::FilterDisposition::Drop => false,
310                super::filter::FilterDisposition::Dlq => {
311                    staged_dlq.push(super::filter::FilteredDlqEntry {
312                        payload: msg.payload.clone(),
313                        key: msg.key.clone(),
314                        reason: "transport filter".to_string(),
315                    });
316                    false
317                }
318            });
319            if !staged_dlq.is_empty() {
320                self.filtered_dlq_buffer.lock().extend(staged_dlq);
321            }
322        }
323
324        Ok(messages)
325    }
326
327    fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
328        std::mem::take(&mut *self.filtered_dlq_buffer.lock())
329    }
330
331    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
332        if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
333            let _ = self.committed_seq.fetch_max(max_seq, Ordering::Relaxed);
334        }
335        Ok(())
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through the trait comes back with its key and payload.
    #[tokio::test]
    async fn send_and_receive() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        // Send a message
        let result = transport.send("test-key", b"hello world").await;
        assert!(result.is_ok());

        // Receive it
        let messages = transport.recv(10).await.unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].key.as_deref(), Some("test-key"));
        assert_eq!(messages[0].payload, b"hello world");
    }

    /// Messages pushed with `inject` are visible to `recv`.
    #[tokio::test]
    async fn inject_messages() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        // Inject test messages
        transport
            .inject(Some("key1"), b"msg1".to_vec())
            .await
            .unwrap();
        transport
            .inject(Some("key2"), b"msg2".to_vec())
            .await
            .unwrap();

        // Receive them
        let messages = transport.recv(10).await.unwrap();
        assert_eq!(messages.len(), 2);
    }

    /// Committing a batch moves the committed sequence to the batch's
    /// highest sequence number.
    #[tokio::test]
    async fn commit_advances_sequence() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");

        // Two messages are needed: the first one has sequence 0, which is
        // also the initial committed value, so committing it alone cannot
        // show that `commit` did anything.
        transport.inject(None, b"msg0".to_vec()).await.unwrap();
        transport.inject(None, b"msg1".to_vec()).await.unwrap();
        let messages = transport.recv(2).await.unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(transport.committed_sequence(), 0);

        // Commit the messages
        let tokens: Vec<_> = messages.iter().map(|m| m.token).collect();
        transport.commit(&tokens).await.unwrap();

        // Verify committed sequence advanced to the second message
        assert_eq!(transport.committed_sequence(), 1);
    }

    /// After `close`, the transport is unhealthy and rejects send and recv.
    #[tokio::test]
    async fn close_prevents_operations() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        // Send should fail
        let result = transport.send("key", b"data").await;
        assert!(result.is_fatal());

        // Recv should fail
        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    /// A full channel makes the trait `send` report backpressure instead of
    /// waiting.
    #[tokio::test]
    async fn backpressure_on_full_channel() {
        let config = MemoryConfig {
            buffer_size: 1,
            recv_timeout_ms: 0,
            ..Default::default()
        };
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");

        // Fill the channel
        let result1 = transport.send("key", b"msg1").await;
        assert!(result1.is_ok());

        // Next send should backpressure
        let result2 = transport.send("key", b"msg2").await;
        assert!(result2.is_backpressured());
    }
}