hyperi_rustlib/transport/memory/
mod.rs1mod token;
31
32pub use token::MemoryToken;
33
34use super::error::{TransportError, TransportResult};
35use super::traits::{TransportBase, TransportReceiver, TransportSender};
36use super::types::{Message, PayloadFormat, SendResult};
37use serde::{Deserialize, Serialize};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use tokio::sync::mpsc;
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MemoryConfig {
45 #[serde(default = "default_buffer_size")]
47 pub buffer_size: usize,
48
49 #[serde(default)]
51 pub recv_timeout_ms: u64,
52
53 #[serde(default)]
55 pub filters_in: Vec<super::filter::FilterRule>,
56
57 #[serde(default)]
59 pub filters_out: Vec<super::filter::FilterRule>,
60}
61
/// Serde fallback for `MemoryConfig::buffer_size`: 1000 queued messages.
fn default_buffer_size() -> usize {
    const DEFAULT_BUFFER_SIZE: usize = 1_000;
    DEFAULT_BUFFER_SIZE
}
65
66impl Default for MemoryConfig {
67 fn default() -> Self {
68 Self {
69 buffer_size: default_buffer_size(),
70 recv_timeout_ms: 0,
71 filters_in: Vec::new(),
72 filters_out: Vec::new(),
73 }
74 }
75}
76
/// Envelope travelling through the channel between senders and `recv`.
struct InternalMessage {
    /// Value taken from the transport's sequence counter at send time.
    seq: u64,
    /// Wall-clock send time, in milliseconds since the Unix epoch (UTC).
    timestamp_ms: i64,
    /// Optional message key.
    key: Option<Arc<str>>,
    /// Raw message bytes.
    payload: Vec<u8>,
}
84
85pub struct MemoryTransport {
89 sender: mpsc::Sender<InternalMessage>,
90 receiver: tokio::sync::Mutex<mpsc::Receiver<InternalMessage>>,
91 sequence: AtomicU64,
92 committed_seq: AtomicU64,
93 closed: AtomicBool,
94 recv_timeout_ms: u64,
95 filter_engine: super::filter::TransportFilterEngine,
96 filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
99}
100
101impl MemoryTransport {
102 pub fn new(config: &MemoryConfig) -> super::error::TransportResult<Self> {
113 let (sender, receiver) = mpsc::channel(config.buffer_size);
114 let filter_engine = super::filter::TransportFilterEngine::new(
115 &config.filters_in,
116 &config.filters_out,
117 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
118 )?;
119 Ok(Self {
120 sender,
121 receiver: tokio::sync::Mutex::new(receiver),
122 sequence: AtomicU64::new(0),
123 committed_seq: AtomicU64::new(0),
124 closed: AtomicBool::new(false),
125 recv_timeout_ms: config.recv_timeout_ms,
126 filter_engine,
127 filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
128 })
129 }
130
131 #[must_use]
136 pub fn sender(&self) -> MemorySender<'_> {
137 MemorySender {
138 sender: self.sender.clone(),
139 sequence: &self.sequence,
140 }
141 }
142
143 pub async fn inject(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
149 if self.closed.load(Ordering::Relaxed) {
150 return Err(TransportError::Closed);
151 }
152
153 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
154 let timestamp_ms = chrono::Utc::now().timestamp_millis();
155
156 let msg = InternalMessage {
157 key: key.map(Arc::from),
158 payload,
159 seq,
160 timestamp_ms,
161 };
162
163 self.sender
164 .send(msg)
165 .await
166 .map_err(|_| TransportError::Send("channel closed".into()))
167 }
168
169 #[must_use]
171 pub fn committed_sequence(&self) -> u64 {
172 self.committed_seq.load(Ordering::Relaxed)
173 }
174}
175
176pub struct MemorySender<'a> {
178 sender: mpsc::Sender<InternalMessage>,
179 sequence: &'a AtomicU64,
180}
181
182impl MemorySender<'_> {
183 pub async fn send(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
189 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
190 let timestamp_ms = chrono::Utc::now().timestamp_millis();
191
192 let msg = InternalMessage {
193 key: key.map(Arc::from),
194 payload,
195 seq,
196 timestamp_ms,
197 };
198
199 self.sender
200 .send(msg)
201 .await
202 .map_err(|_| TransportError::Send("channel closed".into()))
203 }
204}
205
206impl TransportBase for MemoryTransport {
207 async fn close(&self) -> TransportResult<()> {
208 self.closed.store(true, Ordering::Relaxed);
209 Ok(())
210 }
211
212 fn is_healthy(&self) -> bool {
213 !self.closed.load(Ordering::Relaxed)
214 }
215
216 fn name(&self) -> &'static str {
217 "memory"
218 }
219}
220
221impl TransportSender for MemoryTransport {
222 async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
223 if self.closed.load(Ordering::Relaxed) {
224 return SendResult::Fatal(TransportError::Closed);
225 }
226
227 if self.filter_engine.has_outbound_filters() {
229 match self.filter_engine.apply_outbound(payload) {
230 super::filter::FilterDisposition::Pass => {}
231 super::filter::FilterDisposition::Drop => return SendResult::Ok,
232 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
233 }
234 }
235
236 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
237 let timestamp_ms = chrono::Utc::now().timestamp_millis();
238
239 let msg = InternalMessage {
240 key: Some(Arc::from(key)),
241 payload: payload.to_vec(),
242 seq,
243 timestamp_ms,
244 };
245
246 match self.sender.try_send(msg) {
247 Ok(()) => SendResult::Ok,
248 Err(mpsc::error::TrySendError::Full(_)) => SendResult::Backpressured,
249 Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Fatal(TransportError::Closed),
250 }
251 }
252}
253
254impl TransportReceiver for MemoryTransport {
255 type Token = MemoryToken;
256
257 async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
258 if self.closed.load(Ordering::Relaxed) {
259 return Err(TransportError::Closed);
260 }
261
262 let mut receiver = self.receiver.lock().await;
263 let mut messages = Vec::with_capacity(max.min(100));
264
265 for _ in 0..max {
266 let result = if self.recv_timeout_ms == 0 {
267 match receiver.try_recv() {
268 Ok(msg) => Some(msg),
269 Err(mpsc::error::TryRecvError::Empty) => break,
270 Err(mpsc::error::TryRecvError::Disconnected) => {
271 return Err(TransportError::Closed);
272 }
273 }
274 } else if messages.is_empty() {
275 match tokio::time::timeout(
276 std::time::Duration::from_millis(self.recv_timeout_ms),
277 receiver.recv(),
278 )
279 .await
280 {
281 Ok(Some(msg)) => Some(msg),
282 Ok(None) => return Err(TransportError::Closed),
283 Err(_) => break,
284 }
285 } else {
286 match receiver.try_recv() {
287 Ok(msg) => Some(msg),
288 Err(_) => break,
289 }
290 };
291
292 if let Some(internal) = result {
293 let format = PayloadFormat::detect(&internal.payload);
294 messages.push(Message {
295 key: internal.key,
296 payload: internal.payload,
297 token: MemoryToken { seq: internal.seq },
298 timestamp_ms: Some(internal.timestamp_ms),
299 format,
300 });
301 }
302 }
303
304 if self.filter_engine.has_inbound_filters() {
306 let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
307 messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
308 super::filter::FilterDisposition::Pass => true,
309 super::filter::FilterDisposition::Drop => false,
310 super::filter::FilterDisposition::Dlq => {
311 staged_dlq.push(super::filter::FilteredDlqEntry {
312 payload: msg.payload.clone(),
313 key: msg.key.clone(),
314 reason: "transport filter".to_string(),
315 });
316 false
317 }
318 });
319 if !staged_dlq.is_empty() {
320 self.filtered_dlq_buffer.lock().extend(staged_dlq);
321 }
322 }
323
324 Ok(messages)
325 }
326
327 fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
328 std::mem::take(&mut *self.filtered_dlq_buffer.lock())
329 }
330
331 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
332 if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
333 let _ = self.committed_seq.fetch_max(max_seq, Ordering::Relaxed);
334 }
335 Ok(())
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a transport from `config`, panicking on construction failure.
    fn transport_with(config: &MemoryConfig) -> MemoryTransport {
        MemoryTransport::new(config).expect("memory transport with valid config must construct")
    }

    #[tokio::test]
    async fn send_and_receive() {
        let transport = transport_with(&MemoryConfig::default());

        let result = transport.send("test-key", b"hello world").await;
        assert!(result.is_ok());

        let messages = transport.recv(10).await.unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].key.as_deref(), Some("test-key"));
        assert_eq!(messages[0].payload, b"hello world");
    }

    #[tokio::test]
    async fn inject_messages() {
        let transport = transport_with(&MemoryConfig::default());

        transport
            .inject(Some("key1"), b"msg1".to_vec())
            .await
            .unwrap();
        transport
            .inject(Some("key2"), b"msg2".to_vec())
            .await
            .unwrap();

        let messages = transport.recv(10).await.unwrap();
        assert_eq!(messages.len(), 2);
    }

    #[tokio::test]
    async fn commit_advances_sequence() {
        let transport = transport_with(&MemoryConfig::default());

        // Two messages are needed: the first sequence number is 0, which is
        // also the initial committed value, so committing a single message
        // could never make this test fail.
        transport.inject(None, b"first".to_vec()).await.unwrap();
        transport.inject(None, b"second".to_vec()).await.unwrap();

        let messages = transport.recv(2).await.unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(transport.committed_sequence(), 0);

        let tokens: Vec<_> = messages.iter().map(|m| m.token).collect();
        transport.commit(&tokens).await.unwrap();

        assert_eq!(transport.committed_sequence(), 1);
    }

    #[tokio::test]
    async fn close_prevents_operations() {
        let transport = transport_with(&MemoryConfig::default());

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport.send("key", b"data").await;
        assert!(result.is_fatal());

        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn backpressure_on_full_channel() {
        let config = MemoryConfig {
            buffer_size: 1,
            recv_timeout_ms: 0,
            ..Default::default()
        };
        let transport = transport_with(&config);

        let result1 = transport.send("key", b"msg1").await;
        assert!(result1.is_ok());

        let result2 = transport.send("key", b"msg2").await;
        assert!(result2.is_backpressured());
    }
}