hyperi_rustlib/transport/
redis_transport.rs1use super::error::{TransportError, TransportResult};
43use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
44use super::types::{Message, PayloadFormat, SendResult};
45use redis::AsyncCommands;
46use redis::streams::{StreamMaxlen, StreamReadOptions, StreamReadReply};
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49use std::sync::atomic::{AtomicBool, Ordering};
50use tokio::sync::Mutex;
51
/// Commit token identifying a single entry of a Redis stream.
///
/// `recv` produces one token per delivered entry; `commit` groups tokens by
/// `stream` and acknowledges their `entry_id`s with `XACK`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RedisToken {
    /// Name of the stream the entry was read from (shared, cheap to clone).
    pub stream: Arc<str>,
    /// Entry id exactly as reported by Redis for the entry.
    pub entry_id: String,
}
62
// Marks `RedisToken` as a commit token; the impl supplies no items of its own.
impl CommitToken for RedisToken {}
64
65impl std::fmt::Display for RedisToken {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 write!(f, "redis:{}:{}", self.stream, self.entry_id)
68 }
69}
70
/// Serde default for `RedisTransportConfig::url`: a Redis URL pointing at 127.0.0.1:6379.
fn default_url() -> String {
    String::from("redis://127.0.0.1:6379")
}
74
/// Serde default for `RedisTransportConfig::group` (the consumer group name).
fn default_group() -> String {
    String::from("dfe")
}
78
/// Serde default for `RedisTransportConfig::consumer` (the consumer name within the group).
fn default_consumer() -> String {
    String::from("consumer-1")
}
82
/// Serde default for `RedisTransportConfig::block_ms`, the value `recv` passes
/// as the block option of its stream read.
fn default_block_ms() -> usize {
    5_000
}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct RedisTransportConfig {
90 #[serde(default = "default_url")]
95 pub url: String,
96
97 #[serde(default)]
101 pub stream: Option<String>,
102
103 #[serde(default = "default_group")]
105 pub group: String,
106
107 #[serde(default = "default_consumer")]
109 pub consumer: String,
110
111 #[serde(default)]
115 pub max_stream_len: Option<usize>,
116
117 #[serde(default = "default_block_ms")]
119 pub block_ms: usize,
120
121 #[serde(default)]
123 pub filters_in: Vec<super::filter::FilterRule>,
124
125 #[serde(default)]
127 pub filters_out: Vec<super::filter::FilterRule>,
128}
129
130impl Default for RedisTransportConfig {
131 fn default() -> Self {
132 Self {
133 url: default_url(),
134 stream: None,
135 group: default_group(),
136 consumer: default_consumer(),
137 max_stream_len: None,
138 block_ms: default_block_ms(),
139 filters_in: Vec::new(),
140 filters_out: Vec::new(),
141 }
142 }
143}
144
145impl RedisTransportConfig {
146 #[must_use]
148 pub fn from_cascade() -> Self {
149 #[cfg(feature = "config")]
150 {
151 if let Some(cfg) = crate::config::try_get()
152 && let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.redis")
153 {
154 return tc;
155 }
156 }
157 Self::default()
158 }
159}
160
161pub struct RedisTransport {
166 conn: Mutex<redis::aio::MultiplexedConnection>,
167 config: RedisTransportConfig,
168 closed: Arc<AtomicBool>,
169 group_created: Mutex<std::collections::HashSet<String>>,
171 filter_engine: super::filter::TransportFilterEngine,
173 filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
176}
177
178impl RedisTransport {
179 pub async fn new(config: &RedisTransportConfig) -> TransportResult<Self> {
188 let client = redis::Client::open(config.url.as_str()).map_err(|e| {
189 TransportError::Config(format!("invalid Redis URL '{}': {e}", config.url))
190 })?;
191
192 let conn = client
193 .get_multiplexed_async_connection()
194 .await
195 .map_err(|e| {
196 TransportError::Connection(format!(
197 "failed to connect to Redis at '{}': {e}",
198 config.url
199 ))
200 })?;
201
202 #[cfg(feature = "logger")]
203 tracing::info!(
204 url = %config.url,
205 stream = ?config.stream,
206 group = %config.group,
207 "Redis transport opened"
208 );
209
210 let filter_engine = super::filter::TransportFilterEngine::new(
211 &config.filters_in,
212 &config.filters_out,
213 &crate::transport::filter::TransportFilterTierConfig::default(),
214 )?;
215
216 let closed = Arc::new(AtomicBool::new(false));
217
218 #[cfg(feature = "health")]
219 {
220 let h = Arc::clone(&closed);
221 crate::health::HealthRegistry::register("transport:redis", move || {
222 if h.load(Ordering::Relaxed) {
223 crate::health::HealthStatus::Unhealthy
224 } else {
225 crate::health::HealthStatus::Healthy
226 }
227 });
228 }
229
230 Ok(Self {
231 conn: Mutex::new(conn),
232 config: config.clone(),
233 closed,
234 group_created: Mutex::new(std::collections::HashSet::new()),
235 filter_engine,
236 filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
237 })
238 }
239
240 fn resolve_stream<'a>(&'a self, key: &'a str) -> Result<&'a str, TransportError> {
242 if !key.is_empty() {
243 return Ok(key);
244 }
245 self.config.stream.as_deref().ok_or_else(|| {
246 TransportError::Config(
247 "no stream name: key is empty and config.stream is not set".into(),
248 )
249 })
250 }
251
252 async fn ensure_group(&self, stream: &str) -> TransportResult<()> {
258 {
259 let created = self.group_created.lock().await;
260 if created.contains(stream) {
261 return Ok(());
262 }
263 }
264
265 let mut conn = self.conn.lock().await;
266 let result: redis::RedisResult<()> = conn
267 .xgroup_create_mkstream(stream, &self.config.group, "0")
268 .await;
269
270 match result {
271 Ok(()) => {}
272 Err(e) => {
273 let msg = e.to_string();
275 if !msg.contains("BUSYGROUP") {
276 return Err(TransportError::Connection(format!(
277 "failed to create consumer group '{}' on stream '{stream}': {e}",
278 self.config.group
279 )));
280 }
281 }
282 }
283
284 self.group_created.lock().await.insert(stream.to_string());
285 Ok(())
286 }
287}
288
289impl TransportBase for RedisTransport {
290 async fn close(&self) -> TransportResult<()> {
291 self.closed.store(true, Ordering::Relaxed);
292 Ok(())
293 }
294
295 fn is_healthy(&self) -> bool {
296 !self.closed.load(Ordering::Relaxed)
297 }
298
299 fn name(&self) -> &'static str {
300 "redis"
301 }
302}
303
304impl TransportSender for RedisTransport {
305 async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
306 if self.closed.load(Ordering::Relaxed) {
307 return SendResult::Fatal(TransportError::Closed);
308 }
309
310 if self.filter_engine.has_outbound_filters() {
312 match self.filter_engine.apply_outbound(payload) {
313 super::filter::FilterDisposition::Pass => {}
314 super::filter::FilterDisposition::Drop => return SendResult::Ok,
315 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
316 }
317 }
318
319 let stream = match self.resolve_stream(key) {
320 Ok(s) => s.to_string(),
321 Err(e) => return SendResult::Fatal(e),
322 };
323
324 let mut conn = self.conn.lock().await;
325
326 let result: redis::RedisResult<String> = if let Some(max_len) = self.config.max_stream_len {
327 conn.xadd_maxlen(
328 &stream,
329 StreamMaxlen::Approx(max_len),
330 "*",
331 &[("payload", payload)],
332 )
333 .await
334 } else {
335 conn.xadd(&stream, "*", &[("payload", payload)]).await
336 };
337
338 match result {
339 Ok(_entry_id) => {
340 #[cfg(feature = "logger")]
341 tracing::debug!(stream = %stream, "Redis transport: XADD sent");
342
343 #[cfg(feature = "metrics")]
344 metrics::counter!("dfe_transport_sent_total", "transport" => "redis").increment(1);
345
346 SendResult::Ok
347 }
348 Err(e) => {
349 #[cfg(feature = "logger")]
350 tracing::warn!(error = %e, stream = %stream, "Redis transport: XADD error");
351
352 SendResult::Fatal(TransportError::Send(format!(
353 "XADD to stream '{stream}' failed: {e}"
354 )))
355 }
356 }
357 }
358}
359
360impl TransportReceiver for RedisTransport {
361 type Token = RedisToken;
362
363 async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
364 if self.closed.load(Ordering::Relaxed) {
365 return Err(TransportError::Closed);
366 }
367
368 let stream_name = self
369 .config
370 .stream
371 .as_deref()
372 .ok_or_else(|| TransportError::Config("config.stream must be set for recv()".into()))?
373 .to_string();
374
375 self.ensure_group(&stream_name).await?;
376
377 let opts = StreamReadOptions::default()
378 .group(&self.config.group, &self.config.consumer)
379 .count(max)
380 .block(self.config.block_ms);
381
382 let mut conn = self.conn.lock().await;
383
384 let reply: StreamReadReply = conn
386 .xread_options(&[&stream_name], &[">"], &opts)
387 .await
388 .map_err(|e| {
389 #[cfg(feature = "logger")]
390 tracing::warn!(error = %e, stream = %stream_name, "Redis transport: XREADGROUP error");
391
392 TransportError::Recv(format!("XREADGROUP on stream '{stream_name}' failed: {e}"))
393 })?;
394
395 let stream_arc: Arc<str> = Arc::from(stream_name.as_str());
396 let mut messages = Vec::new();
397
398 for stream_key in &reply.keys {
399 for stream_id in &stream_key.ids {
400 let payload_bytes: Option<Vec<u8>> = stream_id
402 .map
403 .get("payload")
404 .and_then(|v| redis::from_redis_value(v.clone()).ok());
405
406 let payload = payload_bytes.unwrap_or_default();
407 let format = PayloadFormat::detect(&payload);
408 let timestamp_ms = parse_entry_timestamp(&stream_id.id);
409
410 messages.push(Message {
411 key: Some(Arc::clone(&stream_arc)),
412 payload,
413 token: RedisToken {
414 stream: Arc::clone(&stream_arc),
415 entry_id: stream_id.id.clone(),
416 },
417 timestamp_ms,
418 format,
419 });
420 }
421 }
422
423 if self.filter_engine.has_inbound_filters() {
425 let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
426 messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
427 super::filter::FilterDisposition::Pass => true,
428 super::filter::FilterDisposition::Drop => false,
429 super::filter::FilterDisposition::Dlq => {
430 staged_dlq.push(super::filter::FilteredDlqEntry {
431 payload: msg.payload.clone(),
432 key: msg.key.clone(),
433 reason: "transport filter".to_string(),
434 });
435 false
436 }
437 });
438 if !staged_dlq.is_empty() {
439 self.filtered_dlq_buffer.lock().extend(staged_dlq);
440 }
441 }
442
443 #[cfg(feature = "logger")]
444 if !messages.is_empty() {
445 tracing::debug!(
446 messages = messages.len(),
447 "Redis transport: XREADGROUP received"
448 );
449 }
450
451 #[cfg(feature = "metrics")]
452 if !messages.is_empty() {
453 metrics::counter!("dfe_transport_received_total", "transport" => "redis")
454 .increment(messages.len() as u64);
455 }
456
457 Ok(messages)
458 }
459
460 fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
461 std::mem::take(&mut *self.filtered_dlq_buffer.lock())
462 }
463
464 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
465 if tokens.is_empty() {
466 return Ok(());
467 }
468
469 let mut by_stream: std::collections::HashMap<&str, Vec<&str>> =
471 std::collections::HashMap::new();
472 for token in tokens {
473 by_stream
474 .entry(&token.stream)
475 .or_default()
476 .push(&token.entry_id);
477 }
478
479 let mut conn = self.conn.lock().await;
480
481 for (stream, ids) in &by_stream {
482 let id_refs: &[&str] = ids;
483 let _acked: i32 = conn
484 .xack(*stream, &self.config.group, id_refs)
485 .await
486 .map_err(|e| {
487 #[cfg(feature = "logger")]
488 tracing::warn!(error = %e, stream = %stream, "Redis transport: XACK error");
489
490 TransportError::Commit(format!("XACK on stream '{stream}' failed: {e}"))
491 })?;
492 }
493
494 #[cfg(feature = "logger")]
495 tracing::debug!(count = tokens.len(), "Redis transport: XACK committed");
496
497 Ok(())
498 }
499}
500
/// Extracts the leading number of a stream entry id.
///
/// The id is split at its first `-`; the part before it is parsed as `i64`.
/// Returns `None` when the id contains no `-` or that part is not a valid
/// `i64`.
fn parse_entry_timestamp(entry_id: &str) -> Option<i64> {
    let (millis, _sequence) = entry_id.split_once('-')?;
    millis.parse().ok()
}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a token for `stream` / `entry_id`.
    fn make_token(stream: &str, entry_id: &str) -> RedisToken {
        RedisToken {
            stream: Arc::from(stream),
            entry_id: entry_id.into(),
        }
    }

    /// Config with only `stream` set; everything else is default.
    fn config_with_stream(stream: &str) -> RedisTransportConfig {
        RedisTransportConfig {
            stream: Some(stream.into()),
            ..Default::default()
        }
    }

    /// Mirrors the key-or-configured-stream choice made when sending. The
    /// real method lives on `RedisTransport`, which cannot be constructed
    /// without a live Redis connection.
    fn pick_stream<'a>(config: &'a RedisTransportConfig, key: &'a str) -> &'a str {
        if key.is_empty() {
            config.stream.as_deref().unwrap_or("")
        } else {
            key
        }
    }

    #[test]
    fn token_display() {
        let token = make_token("my_stream", "1711432800000-0");
        assert_eq!(format!("{token}"), "redis:my_stream:1711432800000-0");
    }

    #[test]
    fn token_clone() {
        let token = make_token("s1", "100-0");
        let cloned = token.clone();
        assert_eq!(token, cloned);
    }

    #[test]
    fn config_defaults() {
        let config = RedisTransportConfig::default();
        assert_eq!(config.url, "redis://127.0.0.1:6379");
        assert!(config.stream.is_none());
        assert_eq!(config.group, "dfe");
        assert!(config.max_stream_len.is_none());
        assert_eq!(config.block_ms, 5000);
    }

    #[test]
    fn config_deserialise_minimal() {
        let yaml = r"
url: redis://myhost:6380
stream: events
";
        let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(config.url, "redis://myhost:6380");
        assert_eq!(config.stream.as_deref(), Some("events"));
        // Fields absent from the document fall back to their defaults.
        assert_eq!(config.group, "dfe");
        assert_eq!(config.block_ms, 5000);
    }

    #[test]
    fn config_deserialise_full() {
        let yaml = r"
url: rediss://secure.redis.io:6380
stream: audit_log
group: my_group
consumer: worker-3
max_stream_len: 100000
block_ms: 2000
";
        let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(config.url, "rediss://secure.redis.io:6380");
        assert_eq!(config.stream.as_deref(), Some("audit_log"));
        assert_eq!(config.group, "my_group");
        assert_eq!(config.consumer, "worker-3");
        assert_eq!(config.max_stream_len, Some(100_000));
        assert_eq!(config.block_ms, 2000);
    }

    #[test]
    fn parse_entry_timestamp_valid() {
        assert_eq!(
            parse_entry_timestamp("1711432800000-0"),
            Some(1_711_432_800_000)
        );
        assert_eq!(parse_entry_timestamp("0-0"), Some(0));
    }

    #[test]
    fn parse_entry_timestamp_invalid() {
        assert_eq!(parse_entry_timestamp("not-a-number"), None);
        assert_eq!(parse_entry_timestamp(""), None);
    }

    #[test]
    fn resolve_stream_uses_key_when_non_empty() {
        let config = config_with_stream("default_stream");
        assert_eq!(pick_stream(&config, "override_stream"), "override_stream");
    }

    #[test]
    fn resolve_stream_falls_back_to_config() {
        let config = config_with_stream("default_stream");
        assert_eq!(pick_stream(&config, ""), "default_stream");
    }

    /// End-to-end round trip against a real server; skipped unless
    /// `REDIS_URL` is set.
    #[tokio::test]
    async fn redis_integration_xadd_xreadgroup_xack() {
        let Ok(url) = std::env::var("REDIS_URL") else {
            eprintln!("Skipping: REDIS_URL not set");
            return;
        };

        // Timestamped stream name keeps runs from colliding with each other.
        let stream = format!("test_stream_{}", chrono::Utc::now().timestamp_millis());

        let config = RedisTransportConfig {
            url: url.clone(),
            stream: Some(stream.clone()),
            group: "test_group".into(),
            consumer: "test_consumer".into(),
            max_stream_len: Some(1000),
            block_ms: 1000,
            ..Default::default()
        };

        let transport = RedisTransport::new(&config).await.unwrap();

        // An empty key sends to the configured stream.
        let r1 = transport.send("", b"{\"n\":1}").await;
        assert!(r1.is_ok(), "first send should succeed");

        let r2 = transport.send("", b"{\"n\":2}").await;
        assert!(r2.is_ok(), "second send should succeed");

        let messages = transport.recv(10).await.unwrap();
        assert_eq!(messages.len(), 2, "should receive 2 messages");
        assert_eq!(messages[0].payload, b"{\"n\":1}");
        assert_eq!(messages[1].payload, b"{\"n\":2}");

        let tokens: Vec<_> = messages.iter().map(|m| m.token.clone()).collect();
        transport.commit(&tokens).await.unwrap();

        let more = transport.recv(10).await.unwrap();
        assert!(more.is_empty(), "no more messages after commit");

        // Best-effort cleanup of the temporary stream; the result is ignored.
        let mut conn = transport.conn.lock().await;
        let _: redis::RedisResult<()> =
            redis::cmd("DEL").arg(&stream).query_async(&mut *conn).await;

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }
}