hyperi_rustlib/transport/memory/
mod.rs1mod token;
31
32pub use token::MemoryToken;
33
34use super::error::{TransportError, TransportResult};
35use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
36use super::types::{Message, PayloadFormat, SendResult};
37use super::work_batch::WorkBatch;
38use serde::{Deserialize, Serialize};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
41use tokio::sync::mpsc;
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct MemoryConfig {
46 #[serde(default = "default_buffer_size")]
48 pub buffer_size: usize,
49
50 #[serde(default)]
52 pub recv_timeout_ms: u64,
53
54 #[serde(default)]
56 pub filters_in: Vec<super::filter::FilterRule>,
57
58 #[serde(default)]
60 pub filters_out: Vec<super::filter::FilterRule>,
61}
62
63fn default_buffer_size() -> usize {
64 1000
65}
66
67impl Default for MemoryConfig {
68 fn default() -> Self {
69 Self {
70 buffer_size: default_buffer_size(),
71 recv_timeout_ms: 0,
72 filters_in: Vec::new(),
73 filters_out: Vec::new(),
74 }
75 }
76}
77
78struct InternalMessage {
80 key: Option<Arc<str>>,
81 payload: bytes::Bytes,
82 seq: u64,
83 timestamp_ms: i64,
84}
85
86pub struct MemoryTransport {
90 sender: mpsc::Sender<InternalMessage>,
91 receiver: tokio::sync::Mutex<mpsc::Receiver<InternalMessage>>,
92 sequence: AtomicU64,
93 committed_seq: AtomicU64,
94 closed: AtomicBool,
95 recv_timeout_ms: u64,
96 filter_engine: super::filter::TransportFilterEngine,
97}
98
99impl MemoryTransport {
100 pub fn new(config: &MemoryConfig) -> super::error::TransportResult<Self> {
111 let (sender, receiver) = mpsc::channel(config.buffer_size);
112 let filter_engine = super::filter::TransportFilterEngine::new(
113 &config.filters_in,
114 &config.filters_out,
115 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
116 )?;
117 Ok(Self {
118 sender,
119 receiver: tokio::sync::Mutex::new(receiver),
120 sequence: AtomicU64::new(0),
121 committed_seq: AtomicU64::new(0),
122 closed: AtomicBool::new(false),
123 recv_timeout_ms: config.recv_timeout_ms,
124 filter_engine,
125 })
126 }
127
128 #[must_use]
133 pub fn sender(&self) -> MemorySender<'_> {
134 MemorySender {
135 sender: self.sender.clone(),
136 sequence: &self.sequence,
137 }
138 }
139
140 pub async fn inject(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
146 if self.closed.load(Ordering::Relaxed) {
147 return Err(TransportError::Closed);
148 }
149
150 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
151 let timestamp_ms = chrono::Utc::now().timestamp_millis();
152
153 let msg = InternalMessage {
154 key: key.map(Arc::from),
155 payload: payload.into(),
156 seq,
157 timestamp_ms,
158 };
159
160 self.sender
161 .send(msg)
162 .await
163 .map_err(|_| TransportError::Send("channel closed".into()))
164 }
165
166 #[must_use]
168 pub fn committed_sequence(&self) -> u64 {
169 self.committed_seq.load(Ordering::Relaxed)
170 }
171}
172
173pub struct MemorySender<'a> {
175 sender: mpsc::Sender<InternalMessage>,
176 sequence: &'a AtomicU64,
177}
178
179impl MemorySender<'_> {
180 pub async fn send(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
186 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
187 let timestamp_ms = chrono::Utc::now().timestamp_millis();
188
189 let msg = InternalMessage {
190 key: key.map(Arc::from),
191 payload: payload.into(),
192 seq,
193 timestamp_ms,
194 };
195
196 self.sender
197 .send(msg)
198 .await
199 .map_err(|_| TransportError::Send("channel closed".into()))
200 }
201}
202
203impl TransportBase for MemoryTransport {
204 async fn close(&self) -> TransportResult<()> {
205 self.closed.store(true, Ordering::Relaxed);
206 Ok(())
207 }
208
209 fn is_healthy(&self) -> bool {
210 !self.closed.load(Ordering::Relaxed)
211 }
212
213 fn name(&self) -> &'static str {
214 "memory"
215 }
216}
217
218impl TransportSender for MemoryTransport {
219 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
220 if self.closed.load(Ordering::Relaxed) {
221 return SendResult::Fatal(TransportError::Closed);
222 }
223
224 if self.filter_engine.has_outbound_filters() {
226 match self.filter_engine.apply_outbound(&payload) {
227 super::filter::FilterDisposition::Pass => {}
228 super::filter::FilterDisposition::Drop => return SendResult::Ok,
229 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
230 }
231 }
232
233 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
234 let timestamp_ms = chrono::Utc::now().timestamp_millis();
235
236 let msg = InternalMessage {
237 key: Some(Arc::from(key)),
238 payload,
239 seq,
240 timestamp_ms,
241 };
242
243 match self.sender.try_send(msg) {
244 Ok(()) => SendResult::Ok,
245 Err(mpsc::error::TrySendError::Full(_)) => SendResult::Backpressured,
246 Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Fatal(TransportError::Closed),
247 }
248 }
249}
250
251impl TransportReceiver for MemoryTransport {
252 type Token = MemoryToken;
253
254 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
255 if self.closed.load(Ordering::Relaxed) {
256 return Err(TransportError::Closed);
257 }
258
259 let mut receiver = self.receiver.lock().await;
260 let mut messages = Vec::with_capacity(max.min(100));
261
262 for _ in 0..max {
263 let result = if self.recv_timeout_ms == 0 {
264 match receiver.try_recv() {
265 Ok(msg) => Some(msg),
266 Err(mpsc::error::TryRecvError::Empty) => break,
267 Err(mpsc::error::TryRecvError::Disconnected) => {
268 return Err(TransportError::Closed);
269 }
270 }
271 } else if messages.is_empty() {
272 match tokio::time::timeout(
273 std::time::Duration::from_millis(self.recv_timeout_ms),
274 receiver.recv(),
275 )
276 .await
277 {
278 Ok(Some(msg)) => Some(msg),
279 Ok(None) => return Err(TransportError::Closed),
280 Err(_) => break,
281 }
282 } else {
283 match receiver.try_recv() {
284 Ok(msg) => Some(msg),
285 Err(_) => break,
286 }
287 };
288
289 if let Some(internal) = result {
290 let payload = internal.payload;
291 let format = PayloadFormat::detect(&payload);
292 messages.push(Message {
293 key: internal.key,
294 payload,
295 token: MemoryToken { seq: internal.seq },
296 timestamp_ms: Some(internal.timestamp_ms),
297 format,
298 });
299 }
300 }
301
302 let batch = self.filter_engine.partition_batch(
305 messages,
306 |m| m.payload.as_ref(),
307 |m| m.key.clone(),
308 |m| m.token,
309 );
310 let messages = batch.messages;
311 let dlq_entries = batch.dlq_entries;
312 let filtered_tokens = batch.filtered_tokens;
313
314 Ok(RecvBatch {
315 messages,
316 dlq_entries,
317 filtered_tokens,
318 }
319 .into())
320 }
321
322 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
323 if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
324 let _ = self.committed_seq.fetch_max(max_seq, Ordering::Relaxed);
325 }
326 Ok(())
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333
334 #[tokio::test]
335 async fn send_and_receive() {
336 let config = MemoryConfig::default();
337 let transport = MemoryTransport::new(&config)
338 .expect("memory transport with valid config must construct");
339 let result = transport
341 .send("test-key", bytes::Bytes::from_static(b"hello world"))
342 .await;
343 assert!(result.is_ok());
344
345 let records = transport.recv(10).await.unwrap().records;
347 assert_eq!(records.len(), 1);
348 assert_eq!(records[0].key.as_deref(), Some("test-key"));
349 assert_eq!(records[0].payload.as_ref(), b"hello world");
350 }
351
352 #[tokio::test]
356 async fn send_batch_default_fallback_sends_each_record() {
357 use super::super::work_batch::{Record, RecordMeta};
358
359 let transport = MemoryTransport::new(&MemoryConfig::default())
360 .expect("memory transport with valid config must construct");
361
362 let records: Vec<Record> = (0..3)
363 .map(|i| Record {
364 payload: bytes::Bytes::from(format!(r#"{{"id":{i}}}"#)),
365 key: Some(Arc::from(format!("k{i}").as_str())),
366 headers: Vec::new(),
367 metadata: RecordMeta {
368 timestamp_ms: None,
369 format: PayloadFormat::Json,
370 },
371 })
372 .collect();
373
374 let result = transport.send_batch(&records).await;
376 assert!(
377 result.is_ok(),
378 "default send_batch must succeed: {result:?}"
379 );
380
381 let got = transport.recv(10).await.unwrap().records;
383 assert_eq!(got.len(), 3, "every record in the block was sent");
384 assert_eq!(got[0].key.as_deref(), Some("k0"));
385 assert_eq!(got[0].payload.as_ref(), br#"{"id":0}"#);
386 assert_eq!(got[2].key.as_deref(), Some("k2"));
387 assert_eq!(got[2].payload.as_ref(), br#"{"id":2}"#);
388 }
389
390 #[tokio::test]
394 async fn send_batch_default_short_circuits_on_error() {
395 use super::super::work_batch::{Record, RecordMeta};
396
397 let transport = MemoryTransport::new(&MemoryConfig::default())
398 .expect("memory transport with valid config must construct");
399 transport.close().await.unwrap();
400
401 let records = vec![Record {
402 payload: bytes::Bytes::from_static(b"{}"),
403 key: None,
404 headers: Vec::new(),
405 metadata: RecordMeta {
406 timestamp_ms: None,
407 format: PayloadFormat::Json,
408 },
409 }];
410
411 let result = transport.send_batch(&records).await;
412 assert!(
413 result.is_fatal(),
414 "closed transport must surface the send failure, got {result:?}"
415 );
416
417 assert!(transport.send_batch(&[]).await.is_ok());
419 }
420
421 #[tokio::test]
422 async fn inject_messages() {
423 let config = MemoryConfig::default();
424 let transport = MemoryTransport::new(&config)
425 .expect("memory transport with valid config must construct");
426 transport
428 .inject(Some("key1"), b"msg1".to_vec())
429 .await
430 .unwrap();
431 transport
432 .inject(Some("key2"), b"msg2".to_vec())
433 .await
434 .unwrap();
435
436 let records = transport.recv(10).await.unwrap().records;
438 assert_eq!(records.len(), 2);
439 }
440
441 #[tokio::test]
442 async fn commit_advances_sequence() {
443 let config = MemoryConfig::default();
444 let transport = MemoryTransport::new(&config)
445 .expect("memory transport with valid config must construct");
446 transport.inject(None, b"msg".to_vec()).await.unwrap();
447 let batch = transport.recv(1).await.unwrap();
448
449 transport.commit(&batch.commit_tokens).await.unwrap();
451
452 assert_eq!(transport.committed_sequence(), 0);
454 }
455
456 #[tokio::test]
457 async fn close_prevents_operations() {
458 let config = MemoryConfig::default();
459 let transport = MemoryTransport::new(&config)
460 .expect("memory transport with valid config must construct");
461 transport.close().await.unwrap();
462 assert!(!transport.is_healthy());
463
464 let result = transport
466 .send("key", bytes::Bytes::from_static(b"data"))
467 .await;
468 assert!(result.is_fatal());
469
470 let result = transport.recv(1).await;
472 assert!(result.is_err());
473 }
474
475 #[tokio::test]
476 async fn backpressure_on_full_channel() {
477 let config = MemoryConfig {
478 buffer_size: 1,
479 recv_timeout_ms: 0,
480 ..Default::default()
481 };
482 let transport = MemoryTransport::new(&config)
483 .expect("memory transport with valid config must construct");
484
485 let result1 = transport
487 .send("key", bytes::Bytes::from_static(b"msg1"))
488 .await;
489 assert!(result1.is_ok());
490
491 let result2 = transport
493 .send("key", bytes::Bytes::from_static(b"msg2"))
494 .await;
495 assert!(result2.is_backpressured());
496 }
497}