hyperi_rustlib/transport/memory/
mod.rs1mod token;
43
44pub use token::MemoryToken;
45
46use super::error::{TransportError, TransportResult};
47use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
48use super::types::{Message, PayloadFormat, SendResult};
49use super::work_batch::WorkBatch;
50use serde::{Deserialize, Serialize};
51use std::sync::Arc;
52use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
53use tokio::sync::mpsc;
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct MemoryConfig {
58 #[serde(default = "default_buffer_size")]
60 pub buffer_size: usize,
61
62 #[serde(default)]
64 pub recv_timeout_ms: u64,
65
66 #[serde(default)]
68 pub filters_in: Vec<super::filter::FilterRule>,
69
70 #[serde(default)]
72 pub filters_out: Vec<super::filter::FilterRule>,
73}
74
/// Serde default for `MemoryConfig::buffer_size`.
fn default_buffer_size() -> usize {
    const DEFAULT_BUFFER_SIZE: usize = 1_000;
    DEFAULT_BUFFER_SIZE
}
78
79impl Default for MemoryConfig {
80 fn default() -> Self {
81 Self {
82 buffer_size: default_buffer_size(),
83 recv_timeout_ms: 0,
84 filters_in: Vec::new(),
85 filters_out: Vec::new(),
86 }
87 }
88}
89
90struct InternalMessage {
92 key: Option<Arc<str>>,
93 payload: bytes::Bytes,
94 seq: u64,
95 timestamp_ms: i64,
96}
97
98pub struct MemoryTransport {
102 sender: mpsc::Sender<InternalMessage>,
103 receiver: tokio::sync::Mutex<mpsc::Receiver<InternalMessage>>,
104 sequence: AtomicU64,
105 committed_seq: AtomicU64,
106 closed: AtomicBool,
107 recv_timeout_ms: u64,
108 filter_engine: super::filter::TransportFilterEngine,
109}
110
111impl MemoryTransport {
112 pub fn new(config: &MemoryConfig) -> super::error::TransportResult<Self> {
123 let (sender, receiver) = mpsc::channel(config.buffer_size);
124 let filter_engine = super::filter::TransportFilterEngine::new(
125 &config.filters_in,
126 &config.filters_out,
127 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
128 )?;
129 Ok(Self {
130 sender,
131 receiver: tokio::sync::Mutex::new(receiver),
132 sequence: AtomicU64::new(0),
133 committed_seq: AtomicU64::new(0),
134 closed: AtomicBool::new(false),
135 recv_timeout_ms: config.recv_timeout_ms,
136 filter_engine,
137 })
138 }
139
140 #[must_use]
145 pub fn sender(&self) -> MemorySender<'_> {
146 MemorySender {
147 sender: self.sender.clone(),
148 sequence: &self.sequence,
149 }
150 }
151
152 pub async fn inject(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
158 if self.closed.load(Ordering::Relaxed) {
159 return Err(TransportError::Closed);
160 }
161
162 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
163 let timestamp_ms = chrono::Utc::now().timestamp_millis();
164
165 let msg = InternalMessage {
166 key: key.map(Arc::from),
167 payload: payload.into(),
168 seq,
169 timestamp_ms,
170 };
171
172 self.sender
173 .send(msg)
174 .await
175 .map_err(|_| TransportError::Send("channel closed".into()))
176 }
177
178 #[must_use]
180 pub fn committed_sequence(&self) -> u64 {
181 self.committed_seq.load(Ordering::Relaxed)
182 }
183}
184
185pub struct MemorySender<'a> {
187 sender: mpsc::Sender<InternalMessage>,
188 sequence: &'a AtomicU64,
189}
190
191impl MemorySender<'_> {
192 pub async fn send(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
198 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
199 let timestamp_ms = chrono::Utc::now().timestamp_millis();
200
201 let msg = InternalMessage {
202 key: key.map(Arc::from),
203 payload: payload.into(),
204 seq,
205 timestamp_ms,
206 };
207
208 self.sender
209 .send(msg)
210 .await
211 .map_err(|_| TransportError::Send("channel closed".into()))
212 }
213}
214
215impl TransportBase for MemoryTransport {
216 async fn close(&self) -> TransportResult<()> {
217 self.closed.store(true, Ordering::Relaxed);
218 Ok(())
219 }
220
221 fn is_healthy(&self) -> bool {
222 !self.closed.load(Ordering::Relaxed)
223 }
224
225 fn name(&self) -> &'static str {
226 "memory"
227 }
228}
229
230impl TransportSender for MemoryTransport {
231 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
232 if self.closed.load(Ordering::Relaxed) {
233 return SendResult::Fatal(TransportError::Closed);
234 }
235
236 if self.filter_engine.has_outbound_filters() {
238 match self.filter_engine.apply_outbound(&payload) {
239 super::filter::FilterDisposition::Pass => {}
240 super::filter::FilterDisposition::Drop => return SendResult::Ok,
241 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
242 }
243 }
244
245 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
246 let timestamp_ms = chrono::Utc::now().timestamp_millis();
247
248 let msg = InternalMessage {
249 key: Some(Arc::from(key)),
250 payload,
251 seq,
252 timestamp_ms,
253 };
254
255 match self.sender.try_send(msg) {
256 Ok(()) => SendResult::Ok,
257 Err(mpsc::error::TrySendError::Full(_)) => SendResult::Backpressured,
258 Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Fatal(TransportError::Closed),
259 }
260 }
261}
262
263impl TransportReceiver for MemoryTransport {
264 type Token = MemoryToken;
265
266 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
267 if self.closed.load(Ordering::Relaxed) {
268 return Err(TransportError::Closed);
269 }
270
271 let mut receiver = self.receiver.lock().await;
272 let mut messages = Vec::with_capacity(max.min(100));
273
274 for _ in 0..max {
275 let result = if self.recv_timeout_ms == 0 {
276 match receiver.try_recv() {
277 Ok(msg) => Some(msg),
278 Err(mpsc::error::TryRecvError::Empty) => break,
279 Err(mpsc::error::TryRecvError::Disconnected) => {
280 return Err(TransportError::Closed);
281 }
282 }
283 } else if messages.is_empty() {
284 match tokio::time::timeout(
285 std::time::Duration::from_millis(self.recv_timeout_ms),
286 receiver.recv(),
287 )
288 .await
289 {
290 Ok(Some(msg)) => Some(msg),
291 Ok(None) => return Err(TransportError::Closed),
292 Err(_) => break,
293 }
294 } else {
295 match receiver.try_recv() {
296 Ok(msg) => Some(msg),
297 Err(_) => break,
298 }
299 };
300
301 if let Some(internal) = result {
302 let payload = internal.payload;
303 let format = PayloadFormat::detect(&payload);
304 messages.push(Message {
305 key: internal.key,
306 payload,
307 token: MemoryToken { seq: internal.seq },
308 timestamp_ms: Some(internal.timestamp_ms),
309 format,
310 });
311 }
312 }
313
314 let batch = self.filter_engine.partition_batch(
317 messages,
318 |m| m.payload.as_ref(),
319 |m| m.key.clone(),
320 |m| m.token,
321 );
322 let messages = batch.messages;
323 let dlq_entries = batch.dlq_entries;
324 let filtered_tokens = batch.filtered_tokens;
325
326 Ok(RecvBatch {
327 messages,
328 dlq_entries,
329 filtered_tokens,
330 }
331 .into())
332 }
333
334 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
335 if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
336 let _ = self.committed_seq.fetch_max(max_seq, Ordering::Relaxed);
337 }
338 Ok(())
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::super::work_batch::{Record, RecordMeta};
    use super::*;

    /// Builds a transport from `config`, panicking if construction fails.
    fn transport_with(config: &MemoryConfig) -> MemoryTransport {
        MemoryTransport::new(config).expect("memory transport with valid config must construct")
    }

    /// Builds a transport from the default configuration.
    fn default_transport() -> MemoryTransport {
        transport_with(&MemoryConfig::default())
    }

    /// Builds a JSON-tagged record with no headers and no timestamp.
    fn json_record(key: Option<&str>, payload: bytes::Bytes) -> Record {
        Record {
            payload,
            key: key.map(Arc::from),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        }
    }

    #[tokio::test]
    async fn send_and_receive() {
        let transport = default_transport();

        let outcome = transport
            .send("test-key", bytes::Bytes::from_static(b"hello world"))
            .await;
        assert!(outcome.is_ok());

        let records = transport.recv(10).await.unwrap().records;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].key.as_deref(), Some("test-key"));
        assert_eq!(records[0].payload.as_ref(), b"hello world");
    }

    #[tokio::test]
    async fn send_batch_default_fallback_sends_each_record() {
        let transport = default_transport();

        let records: Vec<Record> = (0..3)
            .map(|i| {
                json_record(
                    Some(format!("k{i}").as_str()),
                    bytes::Bytes::from(format!(r#"{{"id":{i}}}"#)),
                )
            })
            .collect();

        let result = transport.send_batch(&records).await;
        assert!(
            result.is_ok(),
            "default send_batch must succeed: {result:?}"
        );

        let got = transport.recv(10).await.unwrap().records;
        assert_eq!(got.len(), 3, "every record in the block was sent");
        assert_eq!(got[0].key.as_deref(), Some("k0"));
        assert_eq!(got[0].payload.as_ref(), br#"{"id":0}"#);
        assert_eq!(got[2].key.as_deref(), Some("k2"));
        assert_eq!(got[2].payload.as_ref(), br#"{"id":2}"#);
    }

    #[tokio::test]
    async fn send_batch_default_short_circuits_on_error() {
        let transport = default_transport();
        transport.close().await.unwrap();

        let records = vec![json_record(None, bytes::Bytes::from_static(b"{}"))];

        let result = transport.send_batch(&records).await;
        assert!(
            result.is_fatal(),
            "closed transport must surface the send failure, got {result:?}"
        );

        // An empty batch has nothing to send, so it succeeds even when closed.
        assert!(transport.send_batch(&[]).await.is_ok());
    }

    #[tokio::test]
    async fn inject_messages() {
        let transport = default_transport();

        transport
            .inject(Some("key1"), b"msg1".to_vec())
            .await
            .unwrap();
        transport
            .inject(Some("key2"), b"msg2".to_vec())
            .await
            .unwrap();

        let records = transport.recv(10).await.unwrap().records;
        assert_eq!(records.len(), 2);
    }

    #[tokio::test]
    async fn commit_advances_sequence() {
        let transport = default_transport();

        // The first message carries sequence 0, which is also the initial
        // committed value, so this assertion alone cannot detect a no-op commit.
        transport.inject(None, b"msg".to_vec()).await.unwrap();
        let batch = transport.recv(1).await.unwrap();
        transport.commit(&batch.commit_tokens).await.unwrap();
        assert_eq!(transport.committed_sequence(), 0);

        // The second message carries sequence 1; committing it must visibly
        // move the committed sequence.
        transport.inject(None, b"msg".to_vec()).await.unwrap();
        let batch = transport.recv(1).await.unwrap();
        transport.commit(&batch.commit_tokens).await.unwrap();
        assert_eq!(transport.committed_sequence(), 1);
    }

    #[tokio::test]
    async fn close_prevents_operations() {
        let transport = default_transport();
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let sent = transport
            .send("key", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(sent.is_fatal());

        let received = transport.recv(1).await;
        assert!(received.is_err());
    }

    #[tokio::test]
    async fn backpressure_on_full_channel() {
        let transport = transport_with(&MemoryConfig {
            buffer_size: 1,
            recv_timeout_ms: 0,
            ..Default::default()
        });

        // The single buffer slot accepts the first message.
        let first = transport
            .send("key", bytes::Bytes::from_static(b"msg1"))
            .await;
        assert!(first.is_ok());

        // Nothing has been received, so the second send finds the buffer full.
        let second = transport
            .send("key", bytes::Bytes::from_static(b"msg2"))
            .await;
        assert!(second.is_backpressured());
    }
}