Skip to main content

hyperi_rustlib/transport/
redis_transport.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/redis_transport.rs
3// Purpose:   Redis/Valkey Streams transport
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Redis Streams Transport
10//!
11//! Lightweight pub/sub transport using Redis (or Valkey) Streams.
12//! Uses `XADD` for send, `XREADGROUP` for receive, and `XACK` for commit.
13//!
14//! ## Send
15//!
16//! Appends payload bytes to a named stream via `XADD`. Optionally caps
17//! the stream length with `MAXLEN ~` for approximate trimming.
18//!
19//! ## Receive
20//!
21//! Reads from a consumer group via `XREADGROUP` with blocking. Creates
22//! the consumer group on first use if it does not exist.
23//!
24//! ## Commit
25//!
26//! Acknowledges processed entries via `XACK` so they are not re-delivered
27//! to other consumers in the same group.
28//!
29//! ## Example
30//!
31//! ```rust,ignore
32//! use hyperi_rustlib::transport::redis_transport::{RedisTransport, RedisTransportConfig};
33//!
34//! let config = RedisTransportConfig {
35//!     stream: Some("events".into()),
36//!     ..Default::default()
37//! };
38//! let transport = RedisTransport::new(&config).await?;
39//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
40//! ```
41
42use super::error::{TransportError, TransportResult};
43use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
44use super::types::{Message, PayloadFormat, SendResult};
45use redis::AsyncCommands;
46use redis::streams::{StreamMaxlen, StreamReadOptions, StreamReadReply};
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49use std::sync::atomic::{AtomicBool, Ordering};
50use tokio::sync::Mutex;
51
52/// Commit token for Redis Streams transport.
53///
54/// Contains the stream name and entry ID needed for `XACK`.
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct RedisToken {
57    /// Stream name the entry belongs to.
58    pub stream: Arc<str>,
59    /// Redis stream entry ID (e.g. "1711432800000-0").
60    pub entry_id: String,
61}
62
63impl CommitToken for RedisToken {}
64
65impl std::fmt::Display for RedisToken {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "redis:{}:{}", self.stream, self.entry_id)
68    }
69}
70
71fn default_url() -> String {
72    "redis://127.0.0.1:6379".into()
73}
74
75fn default_group() -> String {
76    "dfe".into()
77}
78
79fn default_consumer() -> String {
80    "consumer-1".into()
81}
82
83fn default_block_ms() -> usize {
84    5000
85}
86
87/// Configuration for Redis/Valkey Streams transport.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct RedisTransportConfig {
90    /// Redis/Valkey connection URL.
91    ///
92    /// Supports `redis://`, `rediss://` (TLS), and `unix://` schemes.
93    /// Default: `"redis://127.0.0.1:6379"`.
94    #[serde(default = "default_url")]
95    pub url: String,
96
97    /// Stream name for send/receive.
98    ///
99    /// Used as default when key is empty in `send()`.
100    #[serde(default)]
101    pub stream: Option<String>,
102
103    /// Consumer group name. Default: `"dfe"`.
104    #[serde(default = "default_group")]
105    pub group: String,
106
107    /// Consumer name within group. Default: hostname or `"consumer-1"`.
108    #[serde(default = "default_consumer")]
109    pub consumer: String,
110
111    /// Maximum stream length (approximate via `MAXLEN ~`).
112    ///
113    /// `None` means unlimited growth.
114    #[serde(default)]
115    pub max_stream_len: Option<usize>,
116
117    /// Block timeout in milliseconds for `XREADGROUP`. Default: 5000.
118    #[serde(default = "default_block_ms")]
119    pub block_ms: usize,
120
121    /// Inbound message filters (applied on recv before caller sees messages).
122    #[serde(default)]
123    pub filters_in: Vec<super::filter::FilterRule>,
124
125    /// Outbound message filters (applied on send before transport dispatches).
126    #[serde(default)]
127    pub filters_out: Vec<super::filter::FilterRule>,
128}
129
130impl Default for RedisTransportConfig {
131    fn default() -> Self {
132        Self {
133            url: default_url(),
134            stream: None,
135            group: default_group(),
136            consumer: default_consumer(),
137            max_stream_len: None,
138            block_ms: default_block_ms(),
139            filters_in: Vec::new(),
140            filters_out: Vec::new(),
141        }
142    }
143}
144
145impl RedisTransportConfig {
146    /// Load from the config cascade under the `transport.redis` key.
147    #[must_use]
148    pub fn from_cascade() -> Self {
149        <Self as super::traits::FromCascade>::from_cascade_key("transport.redis")
150    }
151}
152
153/// Redis/Valkey Streams transport.
154///
155/// Supports both send (`XADD`) and receive (`XREADGROUP`) operations.
156/// Works with both Redis and Valkey (same wire protocol).
157pub struct RedisTransport {
158    conn: Mutex<redis::aio::MultiplexedConnection>,
159    config: RedisTransportConfig,
160    closed: Arc<AtomicBool>,
161    /// Whether the consumer group has been ensured for a given stream.
162    group_created: Mutex<std::collections::HashSet<String>>,
163    /// Transport-level message filter engine.
164    filter_engine: super::filter::TransportFilterEngine,
165}
166
167impl RedisTransport {
168    /// Create a new Redis Streams transport.
169    ///
170    /// Connects to the Redis server and prepares for stream operations.
171    /// The consumer group is created lazily on first `recv()` call.
172    ///
173    /// # Errors
174    ///
175    /// Returns error if the URL is invalid or connection fails.
176    pub async fn new(config: &RedisTransportConfig) -> TransportResult<Self> {
177        let client = redis::Client::open(config.url.as_str()).map_err(|e| {
178            TransportError::Config(format!("invalid Redis URL '{}': {e}", config.url))
179        })?;
180
181        let conn = client
182            .get_multiplexed_async_connection()
183            .await
184            .map_err(|e| {
185                TransportError::Connection(format!(
186                    "failed to connect to Redis at '{}': {e}",
187                    config.url
188                ))
189            })?;
190
191        #[cfg(feature = "logger")]
192        tracing::info!(
193            url = %config.url,
194            stream = ?config.stream,
195            group = %config.group,
196            "Redis transport opened"
197        );
198
199        let filter_engine = super::filter::TransportFilterEngine::new(
200            &config.filters_in,
201            &config.filters_out,
202            &crate::transport::filter::TransportFilterTierConfig::default(),
203        )?;
204
205        let closed = Arc::new(AtomicBool::new(false));
206
207        #[cfg(feature = "health")]
208        {
209            let h = Arc::clone(&closed);
210            crate::health::HealthRegistry::register("transport:redis", move || {
211                if h.load(Ordering::Relaxed) {
212                    crate::health::HealthStatus::Unhealthy
213                } else {
214                    crate::health::HealthStatus::Healthy
215                }
216            });
217        }
218
219        Ok(Self {
220            conn: Mutex::new(conn),
221            config: config.clone(),
222            closed,
223            group_created: Mutex::new(std::collections::HashSet::new()),
224            filter_engine,
225        })
226    }
227
228    /// Resolve the stream name: use `key` if non-empty, else fall back to config.
229    fn resolve_stream<'a>(&'a self, key: &'a str) -> Result<&'a str, TransportError> {
230        if !key.is_empty() {
231            return Ok(key);
232        }
233        self.config.stream.as_deref().ok_or_else(|| {
234            TransportError::Config(
235                "no stream name: key is empty and config.stream is not set".into(),
236            )
237        })
238    }
239
240    /// Ensure the consumer group exists for the given stream.
241    ///
242    /// Uses `XGROUP CREATE ... MKSTREAM` so the stream is created if absent.
243    /// Idempotent: tracks which streams have been initialised and only
244    /// issues the command once per stream per transport instance.
245    async fn ensure_group(&self, stream: &str) -> TransportResult<()> {
246        {
247            let created = self.group_created.lock().await;
248            if created.contains(stream) {
249                return Ok(());
250            }
251        }
252
253        let mut conn = self.conn.lock().await;
254        let result: redis::RedisResult<()> = conn
255            .xgroup_create_mkstream(stream, &self.config.group, "0")
256            .await;
257
258        match result {
259            Ok(()) => {}
260            Err(e) => {
261                // "BUSYGROUP Consumer Group name already exists" is not an error
262                let msg = e.to_string();
263                if !msg.contains("BUSYGROUP") {
264                    return Err(TransportError::Connection(format!(
265                        "failed to create consumer group '{}' on stream '{stream}': {e}",
266                        self.config.group
267                    )));
268                }
269            }
270        }
271
272        self.group_created.lock().await.insert(stream.to_string());
273        Ok(())
274    }
275}
276
277impl TransportBase for RedisTransport {
278    async fn close(&self) -> TransportResult<()> {
279        self.closed.store(true, Ordering::Relaxed);
280        Ok(())
281    }
282
283    fn is_healthy(&self) -> bool {
284        !self.closed.load(Ordering::Relaxed)
285    }
286
287    fn name(&self) -> &'static str {
288        "redis"
289    }
290}
291
292impl TransportSender for RedisTransport {
293    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
294        if self.closed.load(Ordering::Relaxed) {
295            return SendResult::Fatal(TransportError::Closed);
296        }
297
298        // Outbound filter check
299        if self.filter_engine.has_outbound_filters() {
300            match self.filter_engine.apply_outbound(&payload) {
301                super::filter::FilterDisposition::Pass => {}
302                super::filter::FilterDisposition::Drop => return SendResult::Ok,
303                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
304            }
305        }
306
307        let stream = match self.resolve_stream(key) {
308            Ok(s) => s.to_string(),
309            Err(e) => return SendResult::Fatal(e),
310        };
311
312        let mut conn = self.conn.lock().await;
313
314        let result: redis::RedisResult<String> = if let Some(max_len) = self.config.max_stream_len {
315            conn.xadd_maxlen(
316                &stream,
317                StreamMaxlen::Approx(max_len),
318                "*",
319                &[("payload", payload.as_ref())],
320            )
321            .await
322        } else {
323            conn.xadd(&stream, "*", &[("payload", payload.as_ref())])
324                .await
325        };
326
327        match result {
328            Ok(_entry_id) => {
329                #[cfg(feature = "logger")]
330                tracing::debug!(stream = %stream, "Redis transport: XADD sent");
331
332                #[cfg(feature = "metrics")]
333                metrics::counter!("dfe_transport_sent_total", "transport" => "redis").increment(1);
334
335                SendResult::Ok
336            }
337            Err(e) => {
338                #[cfg(feature = "logger")]
339                tracing::warn!(error = %e, stream = %stream, "Redis transport: XADD error");
340
341                SendResult::Fatal(TransportError::Send(format!(
342                    "XADD to stream '{stream}' failed: {e}"
343                )))
344            }
345        }
346    }
347}
348
349impl TransportReceiver for RedisTransport {
350    type Token = RedisToken;
351
352    async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
353        if self.closed.load(Ordering::Relaxed) {
354            return Err(TransportError::Closed);
355        }
356
357        let stream_name = self
358            .config
359            .stream
360            .as_deref()
361            .ok_or_else(|| TransportError::Config("config.stream must be set for recv()".into()))?
362            .to_string();
363
364        self.ensure_group(&stream_name).await?;
365
366        let opts = StreamReadOptions::default()
367            .group(&self.config.group, &self.config.consumer)
368            .count(max)
369            .block(self.config.block_ms);
370
371        let mut conn = self.conn.lock().await;
372
373        // ">" means only new (undelivered) messages
374        let reply: StreamReadReply = conn
375            .xread_options(&[&stream_name], &[">"], &opts)
376            .await
377            .map_err(|e| {
378                #[cfg(feature = "logger")]
379                tracing::warn!(error = %e, stream = %stream_name, "Redis transport: XREADGROUP error");
380
381                TransportError::Recv(format!("XREADGROUP on stream '{stream_name}' failed: {e}"))
382            })?;
383
384        let stream_arc: Arc<str> = Arc::from(stream_name.as_str());
385        let mut messages = Vec::new();
386
387        for stream_key in &reply.keys {
388            for stream_id in &stream_key.ids {
389                // Extract the "payload" field from the entry
390                let payload_bytes: Option<Vec<u8>> = stream_id
391                    .map
392                    .get("payload")
393                    .and_then(|v| redis::from_redis_value(v.clone()).ok());
394
395                let payload = payload_bytes.unwrap_or_default();
396                let format = PayloadFormat::detect(&payload);
397                let timestamp_ms = parse_entry_timestamp(&stream_id.id);
398
399                messages.push(Message {
400                    key: Some(Arc::clone(&stream_arc)),
401                    payload,
402                    token: RedisToken {
403                        stream: Arc::clone(&stream_arc),
404                        entry_id: stream_id.id.clone(),
405                    },
406                    timestamp_ms,
407                    format,
408                });
409            }
410        }
411
412        // Apply inbound filters via the shared partition helper; DLQ entries
413        // are returned in the RecvBatch for the caller to route onward.
414        let batch = self.filter_engine.partition_batch(
415            messages,
416            |m| m.payload.as_slice(),
417            |m| m.key.clone(),
418        );
419        let messages = batch.messages;
420        let dlq_entries = batch.dlq_entries;
421
422        #[cfg(feature = "logger")]
423        if !messages.is_empty() {
424            tracing::debug!(
425                messages = messages.len(),
426                "Redis transport: XREADGROUP received"
427            );
428        }
429
430        #[cfg(feature = "metrics")]
431        if !messages.is_empty() {
432            metrics::counter!("dfe_transport_received_total", "transport" => "redis")
433                .increment(messages.len() as u64);
434        }
435
436        Ok(RecvBatch {
437            messages,
438            dlq_entries,
439        })
440    }
441
442    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
443        if tokens.is_empty() {
444            return Ok(());
445        }
446
447        // Group tokens by stream name for batch XACK
448        let mut by_stream: std::collections::HashMap<&str, Vec<&str>> =
449            std::collections::HashMap::new();
450        for token in tokens {
451            by_stream
452                .entry(&token.stream)
453                .or_default()
454                .push(&token.entry_id);
455        }
456
457        let mut conn = self.conn.lock().await;
458
459        for (stream, ids) in &by_stream {
460            let id_refs: &[&str] = ids;
461            let _acked: i32 = conn
462                .xack(*stream, &self.config.group, id_refs)
463                .await
464                .map_err(|e| {
465                    #[cfg(feature = "logger")]
466                    tracing::warn!(error = %e, stream = %stream, "Redis transport: XACK error");
467
468                    TransportError::Commit(format!("XACK on stream '{stream}' failed: {e}"))
469                })?;
470        }
471
472        #[cfg(feature = "logger")]
473        tracing::debug!(count = tokens.len(), "Redis transport: XACK committed");
474
475        Ok(())
476    }
477}
478
479/// Parse millisecond timestamp from a Redis stream entry ID.
480///
481/// Entry IDs have the format `<millisecondsTime>-<sequenceNumber>`.
482fn parse_entry_timestamp(entry_id: &str) -> Option<i64> {
483    entry_id
484        .split_once('-')
485        .and_then(|(ms_str, _)| ms_str.parse::<i64>().ok())
486}
487
488impl super::traits::FromCascade for RedisTransportConfig {}
489
490#[cfg(test)]
491mod tests {
492    use super::*;
493
494    #[test]
495    fn token_display() {
496        let token = RedisToken {
497            stream: Arc::from("my_stream"),
498            entry_id: "1711432800000-0".into(),
499        };
500        assert_eq!(format!("{token}"), "redis:my_stream:1711432800000-0");
501    }
502
503    #[test]
504    fn token_clone() {
505        let token = RedisToken {
506            stream: Arc::from("s1"),
507            entry_id: "100-0".into(),
508        };
509        let cloned = token.clone();
510        assert_eq!(token, cloned);
511    }
512
513    #[test]
514    fn config_defaults() {
515        let config = RedisTransportConfig::default();
516        assert_eq!(config.url, "redis://127.0.0.1:6379");
517        assert!(config.stream.is_none());
518        assert_eq!(config.group, "dfe");
519        assert!(config.max_stream_len.is_none());
520        assert_eq!(config.block_ms, 5000);
521    }
522
523    #[test]
524    fn config_deserialise_minimal() {
525        let yaml = r"
526url: redis://myhost:6380
527stream: events
528";
529        let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
530        assert_eq!(config.url, "redis://myhost:6380");
531        assert_eq!(config.stream.as_deref(), Some("events"));
532        // Defaults should be applied
533        assert_eq!(config.group, "dfe");
534        assert_eq!(config.block_ms, 5000);
535    }
536
537    #[test]
538    fn config_deserialise_full() {
539        let yaml = r"
540url: rediss://secure.redis.io:6380
541stream: audit_log
542group: my_group
543consumer: worker-3
544max_stream_len: 100000
545block_ms: 2000
546";
547        let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
548        assert_eq!(config.url, "rediss://secure.redis.io:6380");
549        assert_eq!(config.stream.as_deref(), Some("audit_log"));
550        assert_eq!(config.group, "my_group");
551        assert_eq!(config.consumer, "worker-3");
552        assert_eq!(config.max_stream_len, Some(100_000));
553        assert_eq!(config.block_ms, 2000);
554    }
555
556    #[test]
557    fn parse_entry_timestamp_valid() {
558        assert_eq!(
559            parse_entry_timestamp("1711432800000-0"),
560            Some(1_711_432_800_000)
561        );
562        assert_eq!(parse_entry_timestamp("0-0"), Some(0));
563    }
564
565    #[test]
566    fn parse_entry_timestamp_invalid() {
567        assert_eq!(parse_entry_timestamp("not-a-number"), None);
568        assert_eq!(parse_entry_timestamp(""), None);
569    }
570
571    #[test]
572    fn resolve_stream_uses_key_when_non_empty() {
573        let config = RedisTransportConfig {
574            stream: Some("default_stream".into()),
575            ..Default::default()
576        };
577        // Cannot call resolve_stream without a transport instance, so test
578        // the logic inline: non-empty key takes precedence.
579        let key = "override_stream";
580        let resolved = if key.is_empty() {
581            config.stream.as_deref().unwrap_or("")
582        } else {
583            key
584        };
585        assert_eq!(resolved, "override_stream");
586    }
587
588    #[test]
589    fn resolve_stream_falls_back_to_config() {
590        let config = RedisTransportConfig {
591            stream: Some("default_stream".into()),
592            ..Default::default()
593        };
594        let key = "";
595        let resolved = if key.is_empty() {
596            config.stream.as_deref().unwrap_or("")
597        } else {
598            key
599        };
600        assert_eq!(resolved, "default_stream");
601    }
602
603    // Integration test: requires a running Redis instance.
604    // Run with: REDIS_URL=redis://localhost:6379 cargo nextest run redis_integration
605    #[tokio::test]
606    async fn redis_integration_xadd_xreadgroup_xack() {
607        let Ok(url) = std::env::var("REDIS_URL") else {
608            eprintln!("Skipping: REDIS_URL not set");
609            return;
610        };
611
612        let stream = format!("test_stream_{}", chrono::Utc::now().timestamp_millis());
613        let group = "test_group";
614        let consumer = "test_consumer";
615
616        let config = RedisTransportConfig {
617            url: url.clone(),
618            stream: Some(stream.clone()),
619            group: group.into(),
620            consumer: consumer.into(),
621            max_stream_len: Some(1000),
622            block_ms: 1000,
623            ..Default::default()
624        };
625
626        let transport = RedisTransport::new(&config).await.unwrap();
627
628        // Send two messages
629        let r1 = transport
630            .send("", bytes::Bytes::from_static(b"{\"n\":1}"))
631            .await;
632        assert!(r1.is_ok(), "first send should succeed");
633
634        let r2 = transport
635            .send("", bytes::Bytes::from_static(b"{\"n\":2}"))
636            .await;
637        assert!(r2.is_ok(), "second send should succeed");
638
639        // Receive messages
640        let messages = transport.recv(10).await.unwrap().messages;
641        assert_eq!(messages.len(), 2, "should receive 2 messages");
642        assert_eq!(messages[0].payload, b"{\"n\":1}");
643        assert_eq!(messages[1].payload, b"{\"n\":2}");
644
645        // Commit (XACK)
646        let tokens: Vec<_> = messages.iter().map(|m| m.token.clone()).collect();
647        transport.commit(&tokens).await.unwrap();
648
649        // After commit, no new messages should be available
650        let more = transport.recv(10).await.unwrap().messages;
651        assert!(more.is_empty(), "no more messages after commit");
652
653        // Clean up: delete the test stream
654        let mut conn = transport.conn.lock().await;
655        let _: redis::RedisResult<()> =
656            redis::cmd("DEL").arg(&stream).query_async(&mut *conn).await;
657
658        transport.close().await.unwrap();
659        assert!(!transport.is_healthy());
660    }
661}