hyperi_rustlib/transport/
redis_transport.rs1use super::error::{TransportError, TransportResult};
43use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
44use super::types::{Message, PayloadFormat, SendResult};
45use super::work_batch::WorkBatch;
46use redis::AsyncCommands;
47use redis::streams::{StreamMaxlen, StreamReadOptions, StreamReadReply};
48use serde::{Deserialize, Serialize};
49use std::sync::Arc;
50use std::sync::atomic::{AtomicBool, Ordering};
51use tokio::sync::Mutex;
52
53#[derive(Debug, Clone, PartialEq, Eq, Hash)]
57pub struct RedisToken {
58 pub stream: Arc<str>,
60 pub entry_id: String,
62}
63
64impl CommitToken for RedisToken {}
65
66impl std::fmt::Display for RedisToken {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 write!(f, "redis:{}:{}", self.stream, self.entry_id)
69 }
70}
71
72fn default_url() -> String {
73 "redis://127.0.0.1:6379".into()
74}
75
76fn default_group() -> String {
77 "dfe".into()
78}
79
80fn default_consumer() -> String {
81 "consumer-1".into()
82}
83
84fn default_block_ms() -> usize {
85 5000
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct RedisTransportConfig {
91 #[serde(default = "default_url")]
96 pub url: String,
97
98 #[serde(default)]
102 pub stream: Option<String>,
103
104 #[serde(default = "default_group")]
106 pub group: String,
107
108 #[serde(default = "default_consumer")]
110 pub consumer: String,
111
112 #[serde(default)]
116 pub max_stream_len: Option<usize>,
117
118 #[serde(default = "default_block_ms")]
120 pub block_ms: usize,
121
122 #[serde(default)]
124 pub filters_in: Vec<super::filter::FilterRule>,
125
126 #[serde(default)]
128 pub filters_out: Vec<super::filter::FilterRule>,
129}
130
131impl Default for RedisTransportConfig {
132 fn default() -> Self {
133 Self {
134 url: default_url(),
135 stream: None,
136 group: default_group(),
137 consumer: default_consumer(),
138 max_stream_len: None,
139 block_ms: default_block_ms(),
140 filters_in: Vec::new(),
141 filters_out: Vec::new(),
142 }
143 }
144}
145
146impl RedisTransportConfig {
147 #[must_use]
149 pub fn from_cascade() -> Self {
150 <Self as super::traits::FromCascade>::from_cascade_key("transport.redis")
151 }
152}
153
154pub struct RedisTransport {
159 conn: Mutex<redis::aio::MultiplexedConnection>,
160 config: RedisTransportConfig,
161 closed: Arc<AtomicBool>,
162 group_created: Mutex<std::collections::HashSet<String>>,
164 filter_engine: super::filter::TransportFilterEngine,
166}
167
168impl RedisTransport {
169 pub async fn new(config: &RedisTransportConfig) -> TransportResult<Self> {
178 let client = redis::Client::open(config.url.as_str()).map_err(|e| {
179 TransportError::Config(format!("invalid Redis URL '{}': {e}", config.url))
180 })?;
181
182 let conn = client
183 .get_multiplexed_async_connection()
184 .await
185 .map_err(|e| {
186 TransportError::Connection(format!(
187 "failed to connect to Redis at '{}': {e}",
188 config.url
189 ))
190 })?;
191
192 #[cfg(feature = "logger")]
193 tracing::info!(
194 url = %config.url,
195 stream = ?config.stream,
196 group = %config.group,
197 "Redis transport opened"
198 );
199
200 let filter_engine = super::filter::TransportFilterEngine::new(
201 &config.filters_in,
202 &config.filters_out,
203 &crate::transport::filter::TransportFilterTierConfig::default(),
204 )?;
205
206 let closed = Arc::new(AtomicBool::new(false));
207
208 #[cfg(feature = "health")]
209 {
210 let h = Arc::clone(&closed);
211 crate::health::HealthRegistry::register("transport:redis", move || {
212 if h.load(Ordering::Relaxed) {
213 crate::health::HealthStatus::Unhealthy
214 } else {
215 crate::health::HealthStatus::Healthy
216 }
217 });
218 }
219
220 Ok(Self {
221 conn: Mutex::new(conn),
222 config: config.clone(),
223 closed,
224 group_created: Mutex::new(std::collections::HashSet::new()),
225 filter_engine,
226 })
227 }
228
229 fn resolve_stream<'a>(&'a self, key: &'a str) -> Result<&'a str, TransportError> {
231 if !key.is_empty() {
232 return Ok(key);
233 }
234 self.config.stream.as_deref().ok_or_else(|| {
235 TransportError::Config(
236 "no stream name: key is empty and config.stream is not set".into(),
237 )
238 })
239 }
240
241 async fn ensure_group(&self, stream: &str) -> TransportResult<()> {
247 {
248 let created = self.group_created.lock().await;
249 if created.contains(stream) {
250 return Ok(());
251 }
252 }
253
254 let mut conn = self.conn.lock().await;
255 let result: redis::RedisResult<()> = conn
256 .xgroup_create_mkstream(stream, &self.config.group, "0")
257 .await;
258
259 match result {
260 Ok(()) => {}
261 Err(e) => {
262 let msg = e.to_string();
264 if !msg.contains("BUSYGROUP") {
265 return Err(TransportError::Connection(format!(
266 "failed to create consumer group '{}' on stream '{stream}': {e}",
267 self.config.group
268 )));
269 }
270 }
271 }
272
273 self.group_created.lock().await.insert(stream.to_string());
274 Ok(())
275 }
276}
277
278impl TransportBase for RedisTransport {
279 async fn close(&self) -> TransportResult<()> {
280 self.closed.store(true, Ordering::Relaxed);
281 Ok(())
282 }
283
284 fn is_healthy(&self) -> bool {
285 !self.closed.load(Ordering::Relaxed)
286 }
287
288 fn name(&self) -> &'static str {
289 "redis"
290 }
291}
292
293impl TransportSender for RedisTransport {
294 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
295 if self.closed.load(Ordering::Relaxed) {
296 return SendResult::Fatal(TransportError::Closed);
297 }
298
299 if self.filter_engine.has_outbound_filters() {
301 match self.filter_engine.apply_outbound(&payload) {
302 super::filter::FilterDisposition::Pass => {}
303 super::filter::FilterDisposition::Drop => return SendResult::Ok,
304 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
305 }
306 }
307
308 let stream = match self.resolve_stream(key) {
309 Ok(s) => s.to_string(),
310 Err(e) => return SendResult::Fatal(e),
311 };
312
313 let mut conn = self.conn.lock().await;
314
315 let result: redis::RedisResult<String> = if let Some(max_len) = self.config.max_stream_len {
316 conn.xadd_maxlen(
317 &stream,
318 StreamMaxlen::Approx(max_len),
319 "*",
320 &[("payload", payload.as_ref())],
321 )
322 .await
323 } else {
324 conn.xadd(&stream, "*", &[("payload", payload.as_ref())])
325 .await
326 };
327
328 match result {
329 Ok(_entry_id) => {
330 #[cfg(feature = "logger")]
331 tracing::debug!(stream = %stream, "Redis transport: XADD sent");
332
333 #[cfg(feature = "metrics")]
334 metrics::counter!("dfe_transport_sent_total", "transport" => "redis").increment(1);
335
336 SendResult::Ok
337 }
338 Err(e) => {
339 #[cfg(feature = "logger")]
340 tracing::warn!(error = %e, stream = %stream, "Redis transport: XADD error");
341
342 SendResult::Fatal(TransportError::Send(format!(
343 "XADD to stream '{stream}' failed: {e}"
344 )))
345 }
346 }
347 }
348}
349
350impl TransportReceiver for RedisTransport {
351 type Token = RedisToken;
352
353 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
354 if self.closed.load(Ordering::Relaxed) {
355 return Err(TransportError::Closed);
356 }
357
358 let stream_name = self
359 .config
360 .stream
361 .as_deref()
362 .ok_or_else(|| TransportError::Config("config.stream must be set for recv()".into()))?
363 .to_string();
364
365 self.ensure_group(&stream_name).await?;
366
367 let opts = StreamReadOptions::default()
368 .group(&self.config.group, &self.config.consumer)
369 .count(max)
370 .block(self.config.block_ms);
371
372 let mut conn = self.conn.lock().await;
373
374 let reply: StreamReadReply = conn
376 .xread_options(&[&stream_name], &[">"], &opts)
377 .await
378 .map_err(|e| {
379 #[cfg(feature = "logger")]
380 tracing::warn!(error = %e, stream = %stream_name, "Redis transport: XREADGROUP error");
381
382 TransportError::Recv(format!("XREADGROUP on stream '{stream_name}' failed: {e}"))
383 })?;
384
385 let stream_arc: Arc<str> = Arc::from(stream_name.as_str());
386 let mut messages = Vec::new();
387
388 for stream_key in &reply.keys {
389 for stream_id in &stream_key.ids {
390 let payload_bytes: Option<Vec<u8>> = stream_id
392 .map
393 .get("payload")
394 .and_then(|v| redis::from_redis_value(v.clone()).ok());
395
396 let payload: bytes::Bytes = payload_bytes.unwrap_or_default().into();
397 let format = PayloadFormat::detect(&payload);
398 let timestamp_ms = parse_entry_timestamp(&stream_id.id);
399
400 messages.push(Message {
401 key: Some(Arc::clone(&stream_arc)),
402 payload,
403 token: RedisToken {
404 stream: Arc::clone(&stream_arc),
405 entry_id: stream_id.id.clone(),
406 },
407 timestamp_ms,
408 format,
409 });
410 }
411 }
412
413 let batch = self.filter_engine.partition_batch(
416 messages,
417 |m| m.payload.as_ref(),
418 |m| m.key.clone(),
419 |m| m.token.clone(),
420 );
421 let messages = batch.messages;
422 let dlq_entries = batch.dlq_entries;
423 let filtered_tokens = batch.filtered_tokens;
424
425 #[cfg(feature = "logger")]
426 if !messages.is_empty() {
427 tracing::debug!(
428 messages = messages.len(),
429 "Redis transport: XREADGROUP received"
430 );
431 }
432
433 #[cfg(feature = "metrics")]
434 if !messages.is_empty() {
435 metrics::counter!("dfe_transport_received_total", "transport" => "redis")
436 .increment(messages.len() as u64);
437 }
438
439 Ok(RecvBatch {
440 messages,
441 dlq_entries,
442 filtered_tokens,
443 }
444 .into())
445 }
446
447 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
448 if tokens.is_empty() {
449 return Ok(());
450 }
451
452 let mut by_stream: std::collections::HashMap<&str, Vec<&str>> =
454 std::collections::HashMap::new();
455 for token in tokens {
456 by_stream
457 .entry(&token.stream)
458 .or_default()
459 .push(&token.entry_id);
460 }
461
462 let mut conn = self.conn.lock().await;
463
464 for (stream, ids) in &by_stream {
465 let id_refs: &[&str] = ids;
466 let _acked: i32 = conn
467 .xack(*stream, &self.config.group, id_refs)
468 .await
469 .map_err(|e| {
470 #[cfg(feature = "logger")]
471 tracing::warn!(error = %e, stream = %stream, "Redis transport: XACK error");
472
473 TransportError::Commit(format!("XACK on stream '{stream}' failed: {e}"))
474 })?;
475 }
476
477 #[cfg(feature = "logger")]
478 tracing::debug!(count = tokens.len(), "Redis transport: XACK committed");
479
480 Ok(())
481 }
482}
483
484fn parse_entry_timestamp(entry_id: &str) -> Option<i64> {
488 entry_id
489 .split_once('-')
490 .and_then(|(ms_str, _)| ms_str.parse::<i64>().ok())
491}
492
493impl super::traits::FromCascade for RedisTransportConfig {}
494
495#[cfg(test)]
496mod tests {
497 use super::*;
498
499 #[test]
500 fn token_display() {
501 let token = RedisToken {
502 stream: Arc::from("my_stream"),
503 entry_id: "1711432800000-0".into(),
504 };
505 assert_eq!(format!("{token}"), "redis:my_stream:1711432800000-0");
506 }
507
508 #[test]
509 fn token_clone() {
510 let token = RedisToken {
511 stream: Arc::from("s1"),
512 entry_id: "100-0".into(),
513 };
514 let cloned = token.clone();
515 assert_eq!(token, cloned);
516 }
517
518 #[test]
519 fn config_defaults() {
520 let config = RedisTransportConfig::default();
521 assert_eq!(config.url, "redis://127.0.0.1:6379");
522 assert!(config.stream.is_none());
523 assert_eq!(config.group, "dfe");
524 assert!(config.max_stream_len.is_none());
525 assert_eq!(config.block_ms, 5000);
526 }
527
528 #[test]
529 fn config_deserialise_minimal() {
530 let yaml = r"
531url: redis://myhost:6380
532stream: events
533";
534 let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
535 assert_eq!(config.url, "redis://myhost:6380");
536 assert_eq!(config.stream.as_deref(), Some("events"));
537 assert_eq!(config.group, "dfe");
539 assert_eq!(config.block_ms, 5000);
540 }
541
542 #[test]
543 fn config_deserialise_full() {
544 let yaml = r"
545url: rediss://secure.redis.io:6380
546stream: audit_log
547group: my_group
548consumer: worker-3
549max_stream_len: 100000
550block_ms: 2000
551";
552 let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
553 assert_eq!(config.url, "rediss://secure.redis.io:6380");
554 assert_eq!(config.stream.as_deref(), Some("audit_log"));
555 assert_eq!(config.group, "my_group");
556 assert_eq!(config.consumer, "worker-3");
557 assert_eq!(config.max_stream_len, Some(100_000));
558 assert_eq!(config.block_ms, 2000);
559 }
560
561 #[test]
562 fn parse_entry_timestamp_valid() {
563 assert_eq!(
564 parse_entry_timestamp("1711432800000-0"),
565 Some(1_711_432_800_000)
566 );
567 assert_eq!(parse_entry_timestamp("0-0"), Some(0));
568 }
569
570 #[test]
571 fn parse_entry_timestamp_invalid() {
572 assert_eq!(parse_entry_timestamp("not-a-number"), None);
573 assert_eq!(parse_entry_timestamp(""), None);
574 }
575
576 #[test]
577 fn resolve_stream_uses_key_when_non_empty() {
578 let config = RedisTransportConfig {
579 stream: Some("default_stream".into()),
580 ..Default::default()
581 };
582 let key = "override_stream";
585 let resolved = if key.is_empty() {
586 config.stream.as_deref().unwrap_or("")
587 } else {
588 key
589 };
590 assert_eq!(resolved, "override_stream");
591 }
592
593 #[test]
594 fn resolve_stream_falls_back_to_config() {
595 let config = RedisTransportConfig {
596 stream: Some("default_stream".into()),
597 ..Default::default()
598 };
599 let key = "";
600 let resolved = if key.is_empty() {
601 config.stream.as_deref().unwrap_or("")
602 } else {
603 key
604 };
605 assert_eq!(resolved, "default_stream");
606 }
607
608 #[tokio::test]
611 async fn redis_integration_xadd_xreadgroup_xack() {
612 let Ok(url) = std::env::var("REDIS_URL") else {
613 eprintln!("Skipping: REDIS_URL not set");
614 return;
615 };
616
617 let stream = format!("test_stream_{}", chrono::Utc::now().timestamp_millis());
618 let group = "test_group";
619 let consumer = "test_consumer";
620
621 let config = RedisTransportConfig {
622 url: url.clone(),
623 stream: Some(stream.clone()),
624 group: group.into(),
625 consumer: consumer.into(),
626 max_stream_len: Some(1000),
627 block_ms: 1000,
628 ..Default::default()
629 };
630
631 let transport = RedisTransport::new(&config).await.unwrap();
632
633 let r1 = transport
635 .send("", bytes::Bytes::from_static(b"{\"n\":1}"))
636 .await;
637 assert!(r1.is_ok(), "first send should succeed");
638
639 let r2 = transport
640 .send("", bytes::Bytes::from_static(b"{\"n\":2}"))
641 .await;
642 assert!(r2.is_ok(), "second send should succeed");
643
644 let batch = transport.recv(10).await.unwrap();
646 assert_eq!(batch.records.len(), 2, "should receive 2 records");
647 assert_eq!(batch.records[0].payload.as_ref(), b"{\"n\":1}");
648 assert_eq!(batch.records[1].payload.as_ref(), b"{\"n\":2}");
649
650 transport.commit(&batch.commit_tokens).await.unwrap();
652
653 let more = transport.recv(10).await.unwrap().records;
655 assert!(more.is_empty(), "no more messages after commit");
656
657 let mut conn = transport.conn.lock().await;
659 let _: redis::RedisResult<()> =
660 redis::cmd("DEL").arg(&stream).query_async(&mut *conn).await;
661
662 transport.close().await.unwrap();
663 assert!(!transport.is_healthy());
664 }
665}