// hyperi_rustlib/transport/memory/mod.rs
mod token;
31
32pub use token::MemoryToken;
33
34use super::error::{TransportError, TransportResult};
35use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
36use super::types::{Message, PayloadFormat, SendResult};
37use super::work_batch::WorkBatch;
38use serde::{Deserialize, Serialize};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
41use tokio::sync::mpsc;
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct MemoryConfig {
46 #[serde(default = "default_buffer_size")]
48 pub buffer_size: usize,
49
50 #[serde(default)]
52 pub recv_timeout_ms: u64,
53
54 #[serde(default)]
56 pub filters_in: Vec<super::filter::FilterRule>,
57
58 #[serde(default)]
60 pub filters_out: Vec<super::filter::FilterRule>,
61}
62
/// Serde default for `MemoryConfig::buffer_size`.
fn default_buffer_size() -> usize {
    const DEFAULT_BUFFER_SIZE: usize = 1_000;
    DEFAULT_BUFFER_SIZE
}
66
67impl Default for MemoryConfig {
68 fn default() -> Self {
69 Self {
70 buffer_size: default_buffer_size(),
71 recv_timeout_ms: 0,
72 filters_in: Vec::new(),
73 filters_out: Vec::new(),
74 }
75 }
76}
77
/// A message as it travels through the channel, before `recv` turns it into a
/// `Message` carrying a `MemoryToken`.
struct InternalMessage {
    /// Sequence number taken from the transport's counter when the message was
    /// queued; becomes the commit token on receive.
    seq: u64,
    /// Wall-clock time, in milliseconds, captured when the message was queued.
    timestamp_ms: i64,
    /// Optional message key.
    key: Option<Arc<str>>,
    /// Raw message bytes.
    payload: Vec<u8>,
}
85
86pub struct MemoryTransport {
90 sender: mpsc::Sender<InternalMessage>,
91 receiver: tokio::sync::Mutex<mpsc::Receiver<InternalMessage>>,
92 sequence: AtomicU64,
93 committed_seq: AtomicU64,
94 closed: AtomicBool,
95 recv_timeout_ms: u64,
96 filter_engine: super::filter::TransportFilterEngine,
97}
98
99impl MemoryTransport {
100 pub fn new(config: &MemoryConfig) -> super::error::TransportResult<Self> {
111 let (sender, receiver) = mpsc::channel(config.buffer_size);
112 let filter_engine = super::filter::TransportFilterEngine::new(
113 &config.filters_in,
114 &config.filters_out,
115 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
116 )?;
117 Ok(Self {
118 sender,
119 receiver: tokio::sync::Mutex::new(receiver),
120 sequence: AtomicU64::new(0),
121 committed_seq: AtomicU64::new(0),
122 closed: AtomicBool::new(false),
123 recv_timeout_ms: config.recv_timeout_ms,
124 filter_engine,
125 })
126 }
127
128 #[must_use]
133 pub fn sender(&self) -> MemorySender<'_> {
134 MemorySender {
135 sender: self.sender.clone(),
136 sequence: &self.sequence,
137 }
138 }
139
140 pub async fn inject(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
146 if self.closed.load(Ordering::Relaxed) {
147 return Err(TransportError::Closed);
148 }
149
150 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
151 let timestamp_ms = chrono::Utc::now().timestamp_millis();
152
153 let msg = InternalMessage {
154 key: key.map(Arc::from),
155 payload,
156 seq,
157 timestamp_ms,
158 };
159
160 self.sender
161 .send(msg)
162 .await
163 .map_err(|_| TransportError::Send("channel closed".into()))
164 }
165
166 #[must_use]
168 pub fn committed_sequence(&self) -> u64 {
169 self.committed_seq.load(Ordering::Relaxed)
170 }
171}
172
173pub struct MemorySender<'a> {
175 sender: mpsc::Sender<InternalMessage>,
176 sequence: &'a AtomicU64,
177}
178
179impl MemorySender<'_> {
180 pub async fn send(&self, key: Option<&str>, payload: Vec<u8>) -> TransportResult<()> {
186 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
187 let timestamp_ms = chrono::Utc::now().timestamp_millis();
188
189 let msg = InternalMessage {
190 key: key.map(Arc::from),
191 payload,
192 seq,
193 timestamp_ms,
194 };
195
196 self.sender
197 .send(msg)
198 .await
199 .map_err(|_| TransportError::Send("channel closed".into()))
200 }
201}
202
203impl TransportBase for MemoryTransport {
204 async fn close(&self) -> TransportResult<()> {
205 self.closed.store(true, Ordering::Relaxed);
206 Ok(())
207 }
208
209 fn is_healthy(&self) -> bool {
210 !self.closed.load(Ordering::Relaxed)
211 }
212
213 fn name(&self) -> &'static str {
214 "memory"
215 }
216}
217
218impl TransportSender for MemoryTransport {
219 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
220 if self.closed.load(Ordering::Relaxed) {
221 return SendResult::Fatal(TransportError::Closed);
222 }
223
224 if self.filter_engine.has_outbound_filters() {
226 match self.filter_engine.apply_outbound(&payload) {
227 super::filter::FilterDisposition::Pass => {}
228 super::filter::FilterDisposition::Drop => return SendResult::Ok,
229 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
230 }
231 }
232
233 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
234 let timestamp_ms = chrono::Utc::now().timestamp_millis();
235
236 let msg = InternalMessage {
237 key: Some(Arc::from(key)),
238 payload: payload.to_vec(),
239 seq,
240 timestamp_ms,
241 };
242
243 match self.sender.try_send(msg) {
244 Ok(()) => SendResult::Ok,
245 Err(mpsc::error::TrySendError::Full(_)) => SendResult::Backpressured,
246 Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Fatal(TransportError::Closed),
247 }
248 }
249}
250
251impl TransportReceiver for MemoryTransport {
252 type Token = MemoryToken;
253
254 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
255 if self.closed.load(Ordering::Relaxed) {
256 return Err(TransportError::Closed);
257 }
258
259 let mut receiver = self.receiver.lock().await;
260 let mut messages = Vec::with_capacity(max.min(100));
261
262 for _ in 0..max {
263 let result = if self.recv_timeout_ms == 0 {
264 match receiver.try_recv() {
265 Ok(msg) => Some(msg),
266 Err(mpsc::error::TryRecvError::Empty) => break,
267 Err(mpsc::error::TryRecvError::Disconnected) => {
268 return Err(TransportError::Closed);
269 }
270 }
271 } else if messages.is_empty() {
272 match tokio::time::timeout(
273 std::time::Duration::from_millis(self.recv_timeout_ms),
274 receiver.recv(),
275 )
276 .await
277 {
278 Ok(Some(msg)) => Some(msg),
279 Ok(None) => return Err(TransportError::Closed),
280 Err(_) => break,
281 }
282 } else {
283 match receiver.try_recv() {
284 Ok(msg) => Some(msg),
285 Err(_) => break,
286 }
287 };
288
289 if let Some(internal) = result {
290 let payload: bytes::Bytes = internal.payload.into();
291 let format = PayloadFormat::detect(&payload);
292 messages.push(Message {
293 key: internal.key,
294 payload,
295 token: MemoryToken { seq: internal.seq },
296 timestamp_ms: Some(internal.timestamp_ms),
297 format,
298 });
299 }
300 }
301
302 let batch =
305 self.filter_engine
306 .partition_batch(messages, |m| m.payload.as_ref(), |m| m.key.clone());
307 let messages = batch.messages;
308 let dlq_entries = batch.dlq_entries;
309
310 Ok(RecvBatch {
311 messages,
312 dlq_entries,
313 }
314 .into())
315 }
316
317 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
318 if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
319 let _ = self.committed_seq.fetch_max(max_seq, Ordering::Relaxed);
320 }
321 Ok(())
322 }
323}
324
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through the trait `send` comes back from `recv` intact.
    #[tokio::test]
    async fn send_and_receive() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        let result = transport
            .send("test-key", bytes::Bytes::from_static(b"hello world"))
            .await;
        assert!(result.is_ok());

        let records = transport.recv(10).await.unwrap().records;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].key.as_deref(), Some("test-key"));
        assert_eq!(records[0].payload.as_ref(), b"hello world");
    }

    /// `send_batch` delivers every record of the batch, in order.
    #[tokio::test]
    async fn send_batch_default_fallback_sends_each_record() {
        use super::super::work_batch::{Record, RecordMeta};

        let transport = MemoryTransport::new(&MemoryConfig::default())
            .expect("memory transport with valid config must construct");

        let records: Vec<Record> = (0..3)
            .map(|i| Record {
                payload: bytes::Bytes::from(format!(r#"{{"id":{i}}}"#)),
                key: Some(Arc::from(format!("k{i}").as_str())),
                headers: Vec::new(),
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Json,
                },
            })
            .collect();

        let result = transport.send_batch(&records).await;
        assert!(
            result.is_ok(),
            "default send_batch must succeed: {result:?}"
        );

        let got = transport.recv(10).await.unwrap().records;
        assert_eq!(got.len(), 3, "every record in the block was sent");
        assert_eq!(got[0].key.as_deref(), Some("k0"));
        assert_eq!(got[0].payload.as_ref(), br#"{"id":0}"#);
        assert_eq!(got[2].key.as_deref(), Some("k2"));
        assert_eq!(got[2].payload.as_ref(), br#"{"id":2}"#);
    }

    /// `send_batch` on a closed transport reports the failure; an empty batch
    /// still succeeds because nothing is sent.
    #[tokio::test]
    async fn send_batch_default_short_circuits_on_error() {
        use super::super::work_batch::{Record, RecordMeta};

        let transport = MemoryTransport::new(&MemoryConfig::default())
            .expect("memory transport with valid config must construct");
        transport.close().await.unwrap();

        let records = vec![Record {
            payload: bytes::Bytes::from_static(b"{}"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        }];

        let result = transport.send_batch(&records).await;
        assert!(
            result.is_fatal(),
            "closed transport must surface the send failure, got {result:?}"
        );

        assert!(transport.send_batch(&[]).await.is_ok());
    }

    /// Messages queued with `inject` are visible to `recv`.
    #[tokio::test]
    async fn inject_messages() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        transport
            .inject(Some("key1"), b"msg1".to_vec())
            .await
            .unwrap();
        transport
            .inject(Some("key2"), b"msg2".to_vec())
            .await
            .unwrap();

        let records = transport.recv(10).await.unwrap().records;
        assert_eq!(records.len(), 2);
    }

    /// Committing a batch moves the committed sequence forward.
    #[tokio::test]
    async fn commit_advances_sequence() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        // Two messages receive sequence numbers 0 and 1. Committing only
        // sequence 0 would be indistinguishable from the initial value of 0,
        // so the batch must reach sequence 1 for the assertion to mean anything.
        transport.inject(None, b"msg1".to_vec()).await.unwrap();
        transport.inject(None, b"msg2".to_vec()).await.unwrap();
        let batch = transport.recv(2).await.unwrap();

        assert_eq!(transport.committed_sequence(), 0);

        transport.commit(&batch.commit_tokens).await.unwrap();

        assert_eq!(transport.committed_sequence(), 1);
    }

    /// After `close`, the transport is unhealthy and rejects send and recv.
    #[tokio::test]
    async fn close_prevents_operations() {
        let config = MemoryConfig::default();
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport
            .send("key", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());

        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    /// A full channel reports backpressure instead of blocking.
    #[tokio::test]
    async fn backpressure_on_full_channel() {
        let config = MemoryConfig {
            buffer_size: 1,
            recv_timeout_ms: 0,
            ..Default::default()
        };
        let transport = MemoryTransport::new(&config)
            .expect("memory transport with valid config must construct");

        let result1 = transport
            .send("key", bytes::Bytes::from_static(b"msg1"))
            .await;
        assert!(result1.is_ok());

        let result2 = transport
            .send("key", bytes::Bytes::from_static(b"msg2"))
            .await;
        assert!(result2.is_backpressured());
    }
}