Skip to main content

hyperi_rustlib/transport/
redis_transport.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/redis_transport.rs
3// Purpose:   Redis/Valkey Streams transport
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Redis Streams Transport
10//!
11//! Lightweight pub/sub transport using Redis (or Valkey) Streams.
12//! Uses `XADD` for send, `XREADGROUP` for receive, and `XACK` for commit.
13//!
14//! ## Send
15//!
16//! Appends payload bytes to a named stream via `XADD`. Optionally caps
17//! the stream length with `MAXLEN ~` for approximate trimming.
18//!
19//! ## Receive
20//!
21//! Reads from a consumer group via `XREADGROUP` with blocking. Creates
22//! the consumer group on first use if it does not exist.
23//!
24//! ## Commit
25//!
26//! Acknowledges processed entries via `XACK` so they are not re-delivered
27//! to other consumers in the same group.
28//!
29//! ## Example
30//!
31//! ```rust,ignore
32//! use hyperi_rustlib::transport::redis_transport::{RedisTransport, RedisTransportConfig};
33//!
34//! let config = RedisTransportConfig {
35//!     stream: Some("events".into()),
36//!     ..Default::default()
37//! };
38//! let transport = RedisTransport::new(&config).await?;
39//! transport.send("events", b"{\"msg\":\"hello\"}").await;
40//! ```
41
42use super::error::{TransportError, TransportResult};
43use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
44use super::types::{Message, PayloadFormat, SendResult};
45use redis::AsyncCommands;
46use redis::streams::{StreamMaxlen, StreamReadOptions, StreamReadReply};
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49use std::sync::atomic::{AtomicBool, Ordering};
50use tokio::sync::Mutex;
51
52/// Commit token for Redis Streams transport.
53///
54/// Contains the stream name and entry ID needed for `XACK`.
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct RedisToken {
57    /// Stream name the entry belongs to.
58    pub stream: Arc<str>,
59    /// Redis stream entry ID (e.g. "1711432800000-0").
60    pub entry_id: String,
61}
62
63impl CommitToken for RedisToken {}
64
65impl std::fmt::Display for RedisToken {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "redis:{}:{}", self.stream, self.entry_id)
68    }
69}
70
71fn default_url() -> String {
72    "redis://127.0.0.1:6379".into()
73}
74
75fn default_group() -> String {
76    "dfe".into()
77}
78
79fn default_consumer() -> String {
80    "consumer-1".into()
81}
82
83fn default_block_ms() -> usize {
84    5000
85}
86
87/// Configuration for Redis/Valkey Streams transport.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct RedisTransportConfig {
90    /// Redis/Valkey connection URL.
91    ///
92    /// Supports `redis://`, `rediss://` (TLS), and `unix://` schemes.
93    /// Default: `"redis://127.0.0.1:6379"`.
94    #[serde(default = "default_url")]
95    pub url: String,
96
97    /// Stream name for send/receive.
98    ///
99    /// Used as default when key is empty in `send()`.
100    #[serde(default)]
101    pub stream: Option<String>,
102
103    /// Consumer group name. Default: `"dfe"`.
104    #[serde(default = "default_group")]
105    pub group: String,
106
107    /// Consumer name within group. Default: hostname or `"consumer-1"`.
108    #[serde(default = "default_consumer")]
109    pub consumer: String,
110
111    /// Maximum stream length (approximate via `MAXLEN ~`).
112    ///
113    /// `None` means unlimited growth.
114    #[serde(default)]
115    pub max_stream_len: Option<usize>,
116
117    /// Block timeout in milliseconds for `XREADGROUP`. Default: 5000.
118    #[serde(default = "default_block_ms")]
119    pub block_ms: usize,
120
121    /// Inbound message filters (applied on recv before caller sees messages).
122    #[serde(default)]
123    pub filters_in: Vec<super::filter::FilterRule>,
124
125    /// Outbound message filters (applied on send before transport dispatches).
126    #[serde(default)]
127    pub filters_out: Vec<super::filter::FilterRule>,
128}
129
130impl Default for RedisTransportConfig {
131    fn default() -> Self {
132        Self {
133            url: default_url(),
134            stream: None,
135            group: default_group(),
136            consumer: default_consumer(),
137            max_stream_len: None,
138            block_ms: default_block_ms(),
139            filters_in: Vec::new(),
140            filters_out: Vec::new(),
141        }
142    }
143}
144
145impl RedisTransportConfig {
146    /// Load from the config cascade under the `transport.redis` key.
147    #[must_use]
148    pub fn from_cascade() -> Self {
149        #[cfg(feature = "config")]
150        {
151            if let Some(cfg) = crate::config::try_get()
152                && let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.redis")
153            {
154                return tc;
155            }
156        }
157        Self::default()
158    }
159}
160
161/// Redis/Valkey Streams transport.
162///
163/// Supports both send (`XADD`) and receive (`XREADGROUP`) operations.
164/// Works with both Redis and Valkey (same wire protocol).
165pub struct RedisTransport {
166    conn: Mutex<redis::aio::MultiplexedConnection>,
167    config: RedisTransportConfig,
168    closed: Arc<AtomicBool>,
169    /// Whether the consumer group has been ensured for a given stream.
170    group_created: Mutex<std::collections::HashSet<String>>,
171    /// Transport-level message filter engine.
172    filter_engine: super::filter::TransportFilterEngine,
173    /// Buffer for messages staged to DLQ by inbound filters.
174    /// Drained by `take_filtered_dlq_entries()`.
175    filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
176}
177
178impl RedisTransport {
179    /// Create a new Redis Streams transport.
180    ///
181    /// Connects to the Redis server and prepares for stream operations.
182    /// The consumer group is created lazily on first `recv()` call.
183    ///
184    /// # Errors
185    ///
186    /// Returns error if the URL is invalid or connection fails.
187    pub async fn new(config: &RedisTransportConfig) -> TransportResult<Self> {
188        let client = redis::Client::open(config.url.as_str()).map_err(|e| {
189            TransportError::Config(format!("invalid Redis URL '{}': {e}", config.url))
190        })?;
191
192        let conn = client
193            .get_multiplexed_async_connection()
194            .await
195            .map_err(|e| {
196                TransportError::Connection(format!(
197                    "failed to connect to Redis at '{}': {e}",
198                    config.url
199                ))
200            })?;
201
202        #[cfg(feature = "logger")]
203        tracing::info!(
204            url = %config.url,
205            stream = ?config.stream,
206            group = %config.group,
207            "Redis transport opened"
208        );
209
210        let filter_engine = super::filter::TransportFilterEngine::new(
211            &config.filters_in,
212            &config.filters_out,
213            &crate::transport::filter::TransportFilterTierConfig::default(),
214        )?;
215
216        let closed = Arc::new(AtomicBool::new(false));
217
218        #[cfg(feature = "health")]
219        {
220            let h = Arc::clone(&closed);
221            crate::health::HealthRegistry::register("transport:redis", move || {
222                if h.load(Ordering::Relaxed) {
223                    crate::health::HealthStatus::Unhealthy
224                } else {
225                    crate::health::HealthStatus::Healthy
226                }
227            });
228        }
229
230        Ok(Self {
231            conn: Mutex::new(conn),
232            config: config.clone(),
233            closed,
234            group_created: Mutex::new(std::collections::HashSet::new()),
235            filter_engine,
236            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
237        })
238    }
239
240    /// Resolve the stream name: use `key` if non-empty, else fall back to config.
241    fn resolve_stream<'a>(&'a self, key: &'a str) -> Result<&'a str, TransportError> {
242        if !key.is_empty() {
243            return Ok(key);
244        }
245        self.config.stream.as_deref().ok_or_else(|| {
246            TransportError::Config(
247                "no stream name: key is empty and config.stream is not set".into(),
248            )
249        })
250    }
251
252    /// Ensure the consumer group exists for the given stream.
253    ///
254    /// Uses `XGROUP CREATE ... MKSTREAM` so the stream is created if absent.
255    /// Idempotent: tracks which streams have been initialised and only
256    /// issues the command once per stream per transport instance.
257    async fn ensure_group(&self, stream: &str) -> TransportResult<()> {
258        {
259            let created = self.group_created.lock().await;
260            if created.contains(stream) {
261                return Ok(());
262            }
263        }
264
265        let mut conn = self.conn.lock().await;
266        let result: redis::RedisResult<()> = conn
267            .xgroup_create_mkstream(stream, &self.config.group, "0")
268            .await;
269
270        match result {
271            Ok(()) => {}
272            Err(e) => {
273                // "BUSYGROUP Consumer Group name already exists" is not an error
274                let msg = e.to_string();
275                if !msg.contains("BUSYGROUP") {
276                    return Err(TransportError::Connection(format!(
277                        "failed to create consumer group '{}' on stream '{stream}': {e}",
278                        self.config.group
279                    )));
280                }
281            }
282        }
283
284        self.group_created.lock().await.insert(stream.to_string());
285        Ok(())
286    }
287}
288
289impl TransportBase for RedisTransport {
290    async fn close(&self) -> TransportResult<()> {
291        self.closed.store(true, Ordering::Relaxed);
292        Ok(())
293    }
294
295    fn is_healthy(&self) -> bool {
296        !self.closed.load(Ordering::Relaxed)
297    }
298
299    fn name(&self) -> &'static str {
300        "redis"
301    }
302}
303
304impl TransportSender for RedisTransport {
305    async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
306        if self.closed.load(Ordering::Relaxed) {
307            return SendResult::Fatal(TransportError::Closed);
308        }
309
310        // Outbound filter check
311        if self.filter_engine.has_outbound_filters() {
312            match self.filter_engine.apply_outbound(payload) {
313                super::filter::FilterDisposition::Pass => {}
314                super::filter::FilterDisposition::Drop => return SendResult::Ok,
315                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
316            }
317        }
318
319        let stream = match self.resolve_stream(key) {
320            Ok(s) => s.to_string(),
321            Err(e) => return SendResult::Fatal(e),
322        };
323
324        let mut conn = self.conn.lock().await;
325
326        let result: redis::RedisResult<String> = if let Some(max_len) = self.config.max_stream_len {
327            conn.xadd_maxlen(
328                &stream,
329                StreamMaxlen::Approx(max_len),
330                "*",
331                &[("payload", payload)],
332            )
333            .await
334        } else {
335            conn.xadd(&stream, "*", &[("payload", payload)]).await
336        };
337
338        match result {
339            Ok(_entry_id) => {
340                #[cfg(feature = "logger")]
341                tracing::debug!(stream = %stream, "Redis transport: XADD sent");
342
343                #[cfg(feature = "metrics")]
344                metrics::counter!("dfe_transport_sent_total", "transport" => "redis").increment(1);
345
346                SendResult::Ok
347            }
348            Err(e) => {
349                #[cfg(feature = "logger")]
350                tracing::warn!(error = %e, stream = %stream, "Redis transport: XADD error");
351
352                SendResult::Fatal(TransportError::Send(format!(
353                    "XADD to stream '{stream}' failed: {e}"
354                )))
355            }
356        }
357    }
358}
359
360impl TransportReceiver for RedisTransport {
361    type Token = RedisToken;
362
363    async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
364        if self.closed.load(Ordering::Relaxed) {
365            return Err(TransportError::Closed);
366        }
367
368        let stream_name = self
369            .config
370            .stream
371            .as_deref()
372            .ok_or_else(|| TransportError::Config("config.stream must be set for recv()".into()))?
373            .to_string();
374
375        self.ensure_group(&stream_name).await?;
376
377        let opts = StreamReadOptions::default()
378            .group(&self.config.group, &self.config.consumer)
379            .count(max)
380            .block(self.config.block_ms);
381
382        let mut conn = self.conn.lock().await;
383
384        // ">" means only new (undelivered) messages
385        let reply: StreamReadReply = conn
386            .xread_options(&[&stream_name], &[">"], &opts)
387            .await
388            .map_err(|e| {
389                #[cfg(feature = "logger")]
390                tracing::warn!(error = %e, stream = %stream_name, "Redis transport: XREADGROUP error");
391
392                TransportError::Recv(format!("XREADGROUP on stream '{stream_name}' failed: {e}"))
393            })?;
394
395        let stream_arc: Arc<str> = Arc::from(stream_name.as_str());
396        let mut messages = Vec::new();
397
398        for stream_key in &reply.keys {
399            for stream_id in &stream_key.ids {
400                // Extract the "payload" field from the entry
401                let payload_bytes: Option<Vec<u8>> = stream_id
402                    .map
403                    .get("payload")
404                    .and_then(|v| redis::from_redis_value(v.clone()).ok());
405
406                let payload = payload_bytes.unwrap_or_default();
407                let format = PayloadFormat::detect(&payload);
408                let timestamp_ms = parse_entry_timestamp(&stream_id.id);
409
410                messages.push(Message {
411                    key: Some(Arc::clone(&stream_arc)),
412                    payload,
413                    token: RedisToken {
414                        stream: Arc::clone(&stream_arc),
415                        entry_id: stream_id.id.clone(),
416                    },
417                    timestamp_ms,
418                    format,
419                });
420            }
421        }
422
423        // Apply inbound filters: drop messages, stage DLQ entries
424        if self.filter_engine.has_inbound_filters() {
425            let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
426            messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
427                super::filter::FilterDisposition::Pass => true,
428                super::filter::FilterDisposition::Drop => false,
429                super::filter::FilterDisposition::Dlq => {
430                    staged_dlq.push(super::filter::FilteredDlqEntry {
431                        payload: msg.payload.clone(),
432                        key: msg.key.clone(),
433                        reason: "transport filter".to_string(),
434                    });
435                    false
436                }
437            });
438            if !staged_dlq.is_empty() {
439                self.filtered_dlq_buffer.lock().extend(staged_dlq);
440            }
441        }
442
443        #[cfg(feature = "logger")]
444        if !messages.is_empty() {
445            tracing::debug!(
446                messages = messages.len(),
447                "Redis transport: XREADGROUP received"
448            );
449        }
450
451        #[cfg(feature = "metrics")]
452        if !messages.is_empty() {
453            metrics::counter!("dfe_transport_received_total", "transport" => "redis")
454                .increment(messages.len() as u64);
455        }
456
457        Ok(messages)
458    }
459
460    fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
461        std::mem::take(&mut *self.filtered_dlq_buffer.lock())
462    }
463
464    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
465        if tokens.is_empty() {
466            return Ok(());
467        }
468
469        // Group tokens by stream name for batch XACK
470        let mut by_stream: std::collections::HashMap<&str, Vec<&str>> =
471            std::collections::HashMap::new();
472        for token in tokens {
473            by_stream
474                .entry(&token.stream)
475                .or_default()
476                .push(&token.entry_id);
477        }
478
479        let mut conn = self.conn.lock().await;
480
481        for (stream, ids) in &by_stream {
482            let id_refs: &[&str] = ids;
483            let _acked: i32 = conn
484                .xack(*stream, &self.config.group, id_refs)
485                .await
486                .map_err(|e| {
487                    #[cfg(feature = "logger")]
488                    tracing::warn!(error = %e, stream = %stream, "Redis transport: XACK error");
489
490                    TransportError::Commit(format!("XACK on stream '{stream}' failed: {e}"))
491                })?;
492        }
493
494        #[cfg(feature = "logger")]
495        tracing::debug!(count = tokens.len(), "Redis transport: XACK committed");
496
497        Ok(())
498    }
499}
500
501/// Parse millisecond timestamp from a Redis stream entry ID.
502///
503/// Entry IDs have the format `<millisecondsTime>-<sequenceNumber>`.
504fn parse_entry_timestamp(entry_id: &str) -> Option<i64> {
505    entry_id
506        .split_once('-')
507        .and_then(|(ms_str, _)| ms_str.parse::<i64>().ok())
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513
514    #[test]
515    fn token_display() {
516        let token = RedisToken {
517            stream: Arc::from("my_stream"),
518            entry_id: "1711432800000-0".into(),
519        };
520        assert_eq!(format!("{token}"), "redis:my_stream:1711432800000-0");
521    }
522
523    #[test]
524    fn token_clone() {
525        let token = RedisToken {
526            stream: Arc::from("s1"),
527            entry_id: "100-0".into(),
528        };
529        let cloned = token.clone();
530        assert_eq!(token, cloned);
531    }
532
533    #[test]
534    fn config_defaults() {
535        let config = RedisTransportConfig::default();
536        assert_eq!(config.url, "redis://127.0.0.1:6379");
537        assert!(config.stream.is_none());
538        assert_eq!(config.group, "dfe");
539        assert!(config.max_stream_len.is_none());
540        assert_eq!(config.block_ms, 5000);
541    }
542
543    #[test]
544    fn config_deserialise_minimal() {
545        let yaml = r"
546url: redis://myhost:6380
547stream: events
548";
549        let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
550        assert_eq!(config.url, "redis://myhost:6380");
551        assert_eq!(config.stream.as_deref(), Some("events"));
552        // Defaults should be applied
553        assert_eq!(config.group, "dfe");
554        assert_eq!(config.block_ms, 5000);
555    }
556
557    #[test]
558    fn config_deserialise_full() {
559        let yaml = r"
560url: rediss://secure.redis.io:6380
561stream: audit_log
562group: my_group
563consumer: worker-3
564max_stream_len: 100000
565block_ms: 2000
566";
567        let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
568        assert_eq!(config.url, "rediss://secure.redis.io:6380");
569        assert_eq!(config.stream.as_deref(), Some("audit_log"));
570        assert_eq!(config.group, "my_group");
571        assert_eq!(config.consumer, "worker-3");
572        assert_eq!(config.max_stream_len, Some(100_000));
573        assert_eq!(config.block_ms, 2000);
574    }
575
576    #[test]
577    fn parse_entry_timestamp_valid() {
578        assert_eq!(
579            parse_entry_timestamp("1711432800000-0"),
580            Some(1_711_432_800_000)
581        );
582        assert_eq!(parse_entry_timestamp("0-0"), Some(0));
583    }
584
585    #[test]
586    fn parse_entry_timestamp_invalid() {
587        assert_eq!(parse_entry_timestamp("not-a-number"), None);
588        assert_eq!(parse_entry_timestamp(""), None);
589    }
590
591    #[test]
592    fn resolve_stream_uses_key_when_non_empty() {
593        let config = RedisTransportConfig {
594            stream: Some("default_stream".into()),
595            ..Default::default()
596        };
597        // Cannot call resolve_stream without a transport instance, so test
598        // the logic inline: non-empty key takes precedence.
599        let key = "override_stream";
600        let resolved = if key.is_empty() {
601            config.stream.as_deref().unwrap_or("")
602        } else {
603            key
604        };
605        assert_eq!(resolved, "override_stream");
606    }
607
608    #[test]
609    fn resolve_stream_falls_back_to_config() {
610        let config = RedisTransportConfig {
611            stream: Some("default_stream".into()),
612            ..Default::default()
613        };
614        let key = "";
615        let resolved = if key.is_empty() {
616            config.stream.as_deref().unwrap_or("")
617        } else {
618            key
619        };
620        assert_eq!(resolved, "default_stream");
621    }
622
623    // Integration test: requires a running Redis instance.
624    // Run with: REDIS_URL=redis://localhost:6379 cargo nextest run redis_integration
625    #[tokio::test]
626    async fn redis_integration_xadd_xreadgroup_xack() {
627        let Ok(url) = std::env::var("REDIS_URL") else {
628            eprintln!("Skipping: REDIS_URL not set");
629            return;
630        };
631
632        let stream = format!("test_stream_{}", chrono::Utc::now().timestamp_millis());
633        let group = "test_group";
634        let consumer = "test_consumer";
635
636        let config = RedisTransportConfig {
637            url: url.clone(),
638            stream: Some(stream.clone()),
639            group: group.into(),
640            consumer: consumer.into(),
641            max_stream_len: Some(1000),
642            block_ms: 1000,
643            ..Default::default()
644        };
645
646        let transport = RedisTransport::new(&config).await.unwrap();
647
648        // Send two messages
649        let r1 = transport.send("", b"{\"n\":1}").await;
650        assert!(r1.is_ok(), "first send should succeed");
651
652        let r2 = transport.send("", b"{\"n\":2}").await;
653        assert!(r2.is_ok(), "second send should succeed");
654
655        // Receive messages
656        let messages = transport.recv(10).await.unwrap();
657        assert_eq!(messages.len(), 2, "should receive 2 messages");
658        assert_eq!(messages[0].payload, b"{\"n\":1}");
659        assert_eq!(messages[1].payload, b"{\"n\":2}");
660
661        // Commit (XACK)
662        let tokens: Vec<_> = messages.iter().map(|m| m.token.clone()).collect();
663        transport.commit(&tokens).await.unwrap();
664
665        // After commit, no new messages should be available
666        let more = transport.recv(10).await.unwrap();
667        assert!(more.is_empty(), "no more messages after commit");
668
669        // Clean up: delete the test stream
670        let mut conn = transport.conn.lock().await;
671        let _: redis::RedisResult<()> =
672            redis::cmd("DEL").arg(&stream).query_async(&mut *conn).await;
673
674        transport.close().await.unwrap();
675        assert!(!transport.is_healthy());
676    }
677}