Skip to main content

hyperi_rustlib/transport/
redis_transport.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/redis_transport.rs
3// Purpose:   Redis/Valkey Streams transport
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Redis Streams Transport
10//!
11//! Lightweight pub/sub transport using Redis (or Valkey) Streams.
12//! Uses `XADD` for send, `XREADGROUP` for receive, and `XACK` for commit.
13//!
14//! ## Send
15//!
16//! Appends payload bytes to a named stream via `XADD`. Optionally caps
17//! the stream length with `MAXLEN ~` for approximate trimming.
18//!
19//! ## Receive
20//!
21//! Reads from a consumer group via `XREADGROUP` with blocking. Creates
22//! the consumer group on first use if it does not exist.
23//!
24//! ## Commit
25//!
26//! Acknowledges processed entries via `XACK` so they are not re-delivered
27//! to other consumers in the same group.
28//!
29//! ## Example
30//!
31//! ```rust,ignore
32//! use hyperi_rustlib::transport::redis_transport::{RedisTransport, RedisTransportConfig};
33//!
34//! let config = RedisTransportConfig {
35//!     stream: Some("events".into()),
36//!     ..Default::default()
37//! };
38//! let transport = RedisTransport::new(&config).await?;
39//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
40//! ```
41
42use super::error::{TransportError, TransportResult};
43use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
44use super::types::{Message, PayloadFormat, SendResult};
45use super::work_batch::WorkBatch;
46use redis::AsyncCommands;
47use redis::streams::{StreamMaxlen, StreamReadOptions, StreamReadReply};
48use serde::{Deserialize, Serialize};
49use std::sync::Arc;
50use std::sync::atomic::{AtomicBool, Ordering};
51use tokio::sync::Mutex;
52
53/// Commit token for Redis Streams transport.
54///
55/// Contains the stream name and entry ID needed for `XACK`.
56#[derive(Debug, Clone, PartialEq, Eq, Hash)]
57pub struct RedisToken {
58    /// Stream name the entry belongs to.
59    pub stream: Arc<str>,
60    /// Redis stream entry ID (e.g. "1711432800000-0").
61    pub entry_id: String,
62}
63
64impl CommitToken for RedisToken {}
65
66impl std::fmt::Display for RedisToken {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        write!(f, "redis:{}:{}", self.stream, self.entry_id)
69    }
70}
71
72fn default_url() -> String {
73    "redis://127.0.0.1:6379".into()
74}
75
76fn default_group() -> String {
77    "dfe".into()
78}
79
80fn default_consumer() -> String {
81    "consumer-1".into()
82}
83
84fn default_block_ms() -> usize {
85    5000
86}
87
88/// Configuration for Redis/Valkey Streams transport.
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct RedisTransportConfig {
91    /// Redis/Valkey connection URL.
92    ///
93    /// Supports `redis://`, `rediss://` (TLS), and `unix://` schemes.
94    /// Default: `"redis://127.0.0.1:6379"`.
95    #[serde(default = "default_url")]
96    pub url: String,
97
98    /// Stream name for send/receive.
99    ///
100    /// Used as default when key is empty in `send()`.
101    #[serde(default)]
102    pub stream: Option<String>,
103
104    /// Consumer group name. Default: `"dfe"`.
105    #[serde(default = "default_group")]
106    pub group: String,
107
108    /// Consumer name within group. Default: hostname or `"consumer-1"`.
109    #[serde(default = "default_consumer")]
110    pub consumer: String,
111
112    /// Maximum stream length (approximate via `MAXLEN ~`).
113    ///
114    /// `None` means unlimited growth.
115    #[serde(default)]
116    pub max_stream_len: Option<usize>,
117
118    /// Block timeout in milliseconds for `XREADGROUP`. Default: 5000.
119    #[serde(default = "default_block_ms")]
120    pub block_ms: usize,
121
122    /// Inbound message filters (applied on recv before caller sees messages).
123    #[serde(default)]
124    pub filters_in: Vec<super::filter::FilterRule>,
125
126    /// Outbound message filters (applied on send before transport dispatches).
127    #[serde(default)]
128    pub filters_out: Vec<super::filter::FilterRule>,
129}
130
131impl Default for RedisTransportConfig {
132    fn default() -> Self {
133        Self {
134            url: default_url(),
135            stream: None,
136            group: default_group(),
137            consumer: default_consumer(),
138            max_stream_len: None,
139            block_ms: default_block_ms(),
140            filters_in: Vec::new(),
141            filters_out: Vec::new(),
142        }
143    }
144}
145
146impl RedisTransportConfig {
147    /// Load from the config cascade under the `transport.redis` key.
148    #[must_use]
149    pub fn from_cascade() -> Self {
150        <Self as super::traits::FromCascade>::from_cascade_key("transport.redis")
151    }
152}
153
154/// Redis/Valkey Streams transport.
155///
156/// Supports both send (`XADD`) and receive (`XREADGROUP`) operations.
157/// Works with both Redis and Valkey (same wire protocol).
158pub struct RedisTransport {
159    conn: Mutex<redis::aio::MultiplexedConnection>,
160    config: RedisTransportConfig,
161    closed: Arc<AtomicBool>,
162    /// Whether the consumer group has been ensured for a given stream.
163    group_created: Mutex<std::collections::HashSet<String>>,
164    /// Transport-level message filter engine.
165    filter_engine: super::filter::TransportFilterEngine,
166}
167
168impl RedisTransport {
169    /// Create a new Redis Streams transport.
170    ///
171    /// Connects to the Redis server and prepares for stream operations.
172    /// The consumer group is created lazily on first `recv()` call.
173    ///
174    /// # Errors
175    ///
176    /// Returns error if the URL is invalid or connection fails.
177    pub async fn new(config: &RedisTransportConfig) -> TransportResult<Self> {
178        let client = redis::Client::open(config.url.as_str()).map_err(|e| {
179            TransportError::Config(format!("invalid Redis URL '{}': {e}", config.url))
180        })?;
181
182        let conn = client
183            .get_multiplexed_async_connection()
184            .await
185            .map_err(|e| {
186                TransportError::Connection(format!(
187                    "failed to connect to Redis at '{}': {e}",
188                    config.url
189                ))
190            })?;
191
192        #[cfg(feature = "logger")]
193        tracing::info!(
194            url = %config.url,
195            stream = ?config.stream,
196            group = %config.group,
197            "Redis transport opened"
198        );
199
200        let filter_engine = super::filter::TransportFilterEngine::new(
201            &config.filters_in,
202            &config.filters_out,
203            &crate::transport::filter::TransportFilterTierConfig::default(),
204        )?;
205
206        let closed = Arc::new(AtomicBool::new(false));
207
208        #[cfg(feature = "health")]
209        {
210            let h = Arc::clone(&closed);
211            crate::health::HealthRegistry::register("transport:redis", move || {
212                if h.load(Ordering::Relaxed) {
213                    crate::health::HealthStatus::Unhealthy
214                } else {
215                    crate::health::HealthStatus::Healthy
216                }
217            });
218        }
219
220        Ok(Self {
221            conn: Mutex::new(conn),
222            config: config.clone(),
223            closed,
224            group_created: Mutex::new(std::collections::HashSet::new()),
225            filter_engine,
226        })
227    }
228
229    /// Resolve the stream name: use `key` if non-empty, else fall back to config.
230    fn resolve_stream<'a>(&'a self, key: &'a str) -> Result<&'a str, TransportError> {
231        if !key.is_empty() {
232            return Ok(key);
233        }
234        self.config.stream.as_deref().ok_or_else(|| {
235            TransportError::Config(
236                "no stream name: key is empty and config.stream is not set".into(),
237            )
238        })
239    }
240
241    /// Ensure the consumer group exists for the given stream.
242    ///
243    /// Uses `XGROUP CREATE ... MKSTREAM` so the stream is created if absent.
244    /// Idempotent: tracks which streams have been initialised and only
245    /// issues the command once per stream per transport instance.
246    async fn ensure_group(&self, stream: &str) -> TransportResult<()> {
247        {
248            let created = self.group_created.lock().await;
249            if created.contains(stream) {
250                return Ok(());
251            }
252        }
253
254        let mut conn = self.conn.lock().await;
255        let result: redis::RedisResult<()> = conn
256            .xgroup_create_mkstream(stream, &self.config.group, "0")
257            .await;
258
259        match result {
260            Ok(()) => {}
261            Err(e) => {
262                // "BUSYGROUP Consumer Group name already exists" is not an error
263                let msg = e.to_string();
264                if !msg.contains("BUSYGROUP") {
265                    return Err(TransportError::Connection(format!(
266                        "failed to create consumer group '{}' on stream '{stream}': {e}",
267                        self.config.group
268                    )));
269                }
270            }
271        }
272
273        self.group_created.lock().await.insert(stream.to_string());
274        Ok(())
275    }
276}
277
278impl TransportBase for RedisTransport {
279    async fn close(&self) -> TransportResult<()> {
280        self.closed.store(true, Ordering::Relaxed);
281        Ok(())
282    }
283
284    fn is_healthy(&self) -> bool {
285        !self.closed.load(Ordering::Relaxed)
286    }
287
288    fn name(&self) -> &'static str {
289        "redis"
290    }
291}
292
293impl TransportSender for RedisTransport {
294    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
295        if self.closed.load(Ordering::Relaxed) {
296            return SendResult::Fatal(TransportError::Closed);
297        }
298
299        // Outbound filter check
300        if self.filter_engine.has_outbound_filters() {
301            match self.filter_engine.apply_outbound(&payload) {
302                super::filter::FilterDisposition::Pass => {}
303                super::filter::FilterDisposition::Drop => return SendResult::Ok,
304                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
305            }
306        }
307
308        let stream = match self.resolve_stream(key) {
309            Ok(s) => s.to_string(),
310            Err(e) => return SendResult::Fatal(e),
311        };
312
313        let mut conn = self.conn.lock().await;
314
315        let result: redis::RedisResult<String> = if let Some(max_len) = self.config.max_stream_len {
316            conn.xadd_maxlen(
317                &stream,
318                StreamMaxlen::Approx(max_len),
319                "*",
320                &[("payload", payload.as_ref())],
321            )
322            .await
323        } else {
324            conn.xadd(&stream, "*", &[("payload", payload.as_ref())])
325                .await
326        };
327
328        match result {
329            Ok(_entry_id) => {
330                #[cfg(feature = "logger")]
331                tracing::debug!(stream = %stream, "Redis transport: XADD sent");
332
333                #[cfg(feature = "metrics")]
334                metrics::counter!("dfe_transport_sent_total", "transport" => "redis").increment(1);
335
336                SendResult::Ok
337            }
338            Err(e) => {
339                #[cfg(feature = "logger")]
340                tracing::warn!(error = %e, stream = %stream, "Redis transport: XADD error");
341
342                SendResult::Fatal(TransportError::Send(format!(
343                    "XADD to stream '{stream}' failed: {e}"
344                )))
345            }
346        }
347    }
348}
349
350impl TransportReceiver for RedisTransport {
351    type Token = RedisToken;
352
353    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
354        if self.closed.load(Ordering::Relaxed) {
355            return Err(TransportError::Closed);
356        }
357
358        let stream_name = self
359            .config
360            .stream
361            .as_deref()
362            .ok_or_else(|| TransportError::Config("config.stream must be set for recv()".into()))?
363            .to_string();
364
365        self.ensure_group(&stream_name).await?;
366
367        let opts = StreamReadOptions::default()
368            .group(&self.config.group, &self.config.consumer)
369            .count(max)
370            .block(self.config.block_ms);
371
372        let mut conn = self.conn.lock().await;
373
374        // ">" means only new (undelivered) messages
375        let reply: StreamReadReply = conn
376            .xread_options(&[&stream_name], &[">"], &opts)
377            .await
378            .map_err(|e| {
379                #[cfg(feature = "logger")]
380                tracing::warn!(error = %e, stream = %stream_name, "Redis transport: XREADGROUP error");
381
382                TransportError::Recv(format!("XREADGROUP on stream '{stream_name}' failed: {e}"))
383            })?;
384
385        let stream_arc: Arc<str> = Arc::from(stream_name.as_str());
386        let mut messages = Vec::new();
387
388        for stream_key in &reply.keys {
389            for stream_id in &stream_key.ids {
390                // Extract the "payload" field from the entry
391                let payload_bytes: Option<Vec<u8>> = stream_id
392                    .map
393                    .get("payload")
394                    .and_then(|v| redis::from_redis_value(v.clone()).ok());
395
396                let payload: bytes::Bytes = payload_bytes.unwrap_or_default().into();
397                let format = PayloadFormat::detect(&payload);
398                let timestamp_ms = parse_entry_timestamp(&stream_id.id);
399
400                messages.push(Message {
401                    key: Some(Arc::clone(&stream_arc)),
402                    payload,
403                    token: RedisToken {
404                        stream: Arc::clone(&stream_arc),
405                        entry_id: stream_id.id.clone(),
406                    },
407                    timestamp_ms,
408                    format,
409                });
410            }
411        }
412
413        // Apply inbound filters via the shared partition helper; DLQ entries
414        // are returned in the RecvBatch for the caller to route onward.
415        let batch = self.filter_engine.partition_batch(
416            messages,
417            |m| m.payload.as_ref(),
418            |m| m.key.clone(),
419            |m| m.token.clone(),
420        );
421        let messages = batch.messages;
422        let dlq_entries = batch.dlq_entries;
423        let filtered_tokens = batch.filtered_tokens;
424
425        #[cfg(feature = "logger")]
426        if !messages.is_empty() {
427            tracing::debug!(
428                messages = messages.len(),
429                "Redis transport: XREADGROUP received"
430            );
431        }
432
433        #[cfg(feature = "metrics")]
434        if !messages.is_empty() {
435            metrics::counter!("dfe_transport_received_total", "transport" => "redis")
436                .increment(messages.len() as u64);
437        }
438
439        Ok(RecvBatch {
440            messages,
441            dlq_entries,
442            filtered_tokens,
443        }
444        .into())
445    }
446
447    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
448        if tokens.is_empty() {
449            return Ok(());
450        }
451
452        // Group tokens by stream name for batch XACK
453        let mut by_stream: std::collections::HashMap<&str, Vec<&str>> =
454            std::collections::HashMap::new();
455        for token in tokens {
456            by_stream
457                .entry(&token.stream)
458                .or_default()
459                .push(&token.entry_id);
460        }
461
462        let mut conn = self.conn.lock().await;
463
464        for (stream, ids) in &by_stream {
465            let id_refs: &[&str] = ids;
466            let _acked: i32 = conn
467                .xack(*stream, &self.config.group, id_refs)
468                .await
469                .map_err(|e| {
470                    #[cfg(feature = "logger")]
471                    tracing::warn!(error = %e, stream = %stream, "Redis transport: XACK error");
472
473                    TransportError::Commit(format!("XACK on stream '{stream}' failed: {e}"))
474                })?;
475        }
476
477        #[cfg(feature = "logger")]
478        tracing::debug!(count = tokens.len(), "Redis transport: XACK committed");
479
480        Ok(())
481    }
482}
483
484/// Parse millisecond timestamp from a Redis stream entry ID.
485///
486/// Entry IDs have the format `<millisecondsTime>-<sequenceNumber>`.
487fn parse_entry_timestamp(entry_id: &str) -> Option<i64> {
488    entry_id
489        .split_once('-')
490        .and_then(|(ms_str, _)| ms_str.parse::<i64>().ok())
491}
492
493impl super::traits::FromCascade for RedisTransportConfig {}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498
499    #[test]
500    fn token_display() {
501        let token = RedisToken {
502            stream: Arc::from("my_stream"),
503            entry_id: "1711432800000-0".into(),
504        };
505        assert_eq!(format!("{token}"), "redis:my_stream:1711432800000-0");
506    }
507
508    #[test]
509    fn token_clone() {
510        let token = RedisToken {
511            stream: Arc::from("s1"),
512            entry_id: "100-0".into(),
513        };
514        let cloned = token.clone();
515        assert_eq!(token, cloned);
516    }
517
518    #[test]
519    fn config_defaults() {
520        let config = RedisTransportConfig::default();
521        assert_eq!(config.url, "redis://127.0.0.1:6379");
522        assert!(config.stream.is_none());
523        assert_eq!(config.group, "dfe");
524        assert!(config.max_stream_len.is_none());
525        assert_eq!(config.block_ms, 5000);
526    }
527
528    #[test]
529    fn config_deserialise_minimal() {
530        let yaml = r"
531url: redis://myhost:6380
532stream: events
533";
534        let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
535        assert_eq!(config.url, "redis://myhost:6380");
536        assert_eq!(config.stream.as_deref(), Some("events"));
537        // Defaults should be applied
538        assert_eq!(config.group, "dfe");
539        assert_eq!(config.block_ms, 5000);
540    }
541
542    #[test]
543    fn config_deserialise_full() {
544        let yaml = r"
545url: rediss://secure.redis.io:6380
546stream: audit_log
547group: my_group
548consumer: worker-3
549max_stream_len: 100000
550block_ms: 2000
551";
552        let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
553        assert_eq!(config.url, "rediss://secure.redis.io:6380");
554        assert_eq!(config.stream.as_deref(), Some("audit_log"));
555        assert_eq!(config.group, "my_group");
556        assert_eq!(config.consumer, "worker-3");
557        assert_eq!(config.max_stream_len, Some(100_000));
558        assert_eq!(config.block_ms, 2000);
559    }
560
561    #[test]
562    fn parse_entry_timestamp_valid() {
563        assert_eq!(
564            parse_entry_timestamp("1711432800000-0"),
565            Some(1_711_432_800_000)
566        );
567        assert_eq!(parse_entry_timestamp("0-0"), Some(0));
568    }
569
570    #[test]
571    fn parse_entry_timestamp_invalid() {
572        assert_eq!(parse_entry_timestamp("not-a-number"), None);
573        assert_eq!(parse_entry_timestamp(""), None);
574    }
575
576    #[test]
577    fn resolve_stream_uses_key_when_non_empty() {
578        let config = RedisTransportConfig {
579            stream: Some("default_stream".into()),
580            ..Default::default()
581        };
582        // Cannot call resolve_stream without a transport instance, so test
583        // the logic inline: non-empty key takes precedence.
584        let key = "override_stream";
585        let resolved = if key.is_empty() {
586            config.stream.as_deref().unwrap_or("")
587        } else {
588            key
589        };
590        assert_eq!(resolved, "override_stream");
591    }
592
593    #[test]
594    fn resolve_stream_falls_back_to_config() {
595        let config = RedisTransportConfig {
596            stream: Some("default_stream".into()),
597            ..Default::default()
598        };
599        let key = "";
600        let resolved = if key.is_empty() {
601            config.stream.as_deref().unwrap_or("")
602        } else {
603            key
604        };
605        assert_eq!(resolved, "default_stream");
606    }
607
608    // Integration test: requires a running Redis instance.
609    // Run with: REDIS_URL=redis://localhost:6379 cargo nextest run redis_integration
610    #[tokio::test]
611    async fn redis_integration_xadd_xreadgroup_xack() {
612        let Ok(url) = std::env::var("REDIS_URL") else {
613            eprintln!("Skipping: REDIS_URL not set");
614            return;
615        };
616
617        let stream = format!("test_stream_{}", chrono::Utc::now().timestamp_millis());
618        let group = "test_group";
619        let consumer = "test_consumer";
620
621        let config = RedisTransportConfig {
622            url: url.clone(),
623            stream: Some(stream.clone()),
624            group: group.into(),
625            consumer: consumer.into(),
626            max_stream_len: Some(1000),
627            block_ms: 1000,
628            ..Default::default()
629        };
630
631        let transport = RedisTransport::new(&config).await.unwrap();
632
633        // Send two messages
634        let r1 = transport
635            .send("", bytes::Bytes::from_static(b"{\"n\":1}"))
636            .await;
637        assert!(r1.is_ok(), "first send should succeed");
638
639        let r2 = transport
640            .send("", bytes::Bytes::from_static(b"{\"n\":2}"))
641            .await;
642        assert!(r2.is_ok(), "second send should succeed");
643
644        // Receive messages
645        let batch = transport.recv(10).await.unwrap();
646        assert_eq!(batch.records.len(), 2, "should receive 2 records");
647        assert_eq!(batch.records[0].payload.as_ref(), b"{\"n\":1}");
648        assert_eq!(batch.records[1].payload.as_ref(), b"{\"n\":2}");
649
650        // Commit (XACK) via the batch's commit tokens.
651        transport.commit(&batch.commit_tokens).await.unwrap();
652
653        // After commit, no new messages should be available
654        let more = transport.recv(10).await.unwrap().records;
655        assert!(more.is_empty(), "no more messages after commit");
656
657        // Clean up: delete the test stream
658        let mut conn = transport.conn.lock().await;
659        let _: redis::RedisResult<()> =
660            redis::cmd("DEL").arg(&stream).query_async(&mut *conn).await;
661
662        transport.close().await.unwrap();
663        assert!(!transport.is_healthy());
664    }
665}