hyperi_rustlib/transport/
redis_transport.rs1use super::error::{TransportError, TransportResult};
43use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
44use super::types::{Message, PayloadFormat, SendResult};
45use redis::AsyncCommands;
46use redis::streams::{StreamMaxlen, StreamReadOptions, StreamReadReply};
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49use std::sync::atomic::{AtomicBool, Ordering};
50use tokio::sync::Mutex;
51
52#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct RedisToken {
57 pub stream: Arc<str>,
59 pub entry_id: String,
61}
62
63impl CommitToken for RedisToken {}
64
65impl std::fmt::Display for RedisToken {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 write!(f, "redis:{}:{}", self.stream, self.entry_id)
68 }
69}
70
71fn default_url() -> String {
72 "redis://127.0.0.1:6379".into()
73}
74
75fn default_group() -> String {
76 "dfe".into()
77}
78
79fn default_consumer() -> String {
80 "consumer-1".into()
81}
82
83fn default_block_ms() -> usize {
84 5000
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct RedisTransportConfig {
90 #[serde(default = "default_url")]
95 pub url: String,
96
97 #[serde(default)]
101 pub stream: Option<String>,
102
103 #[serde(default = "default_group")]
105 pub group: String,
106
107 #[serde(default = "default_consumer")]
109 pub consumer: String,
110
111 #[serde(default)]
115 pub max_stream_len: Option<usize>,
116
117 #[serde(default = "default_block_ms")]
119 pub block_ms: usize,
120
121 #[serde(default)]
123 pub filters_in: Vec<super::filter::FilterRule>,
124
125 #[serde(default)]
127 pub filters_out: Vec<super::filter::FilterRule>,
128}
129
130impl Default for RedisTransportConfig {
131 fn default() -> Self {
132 Self {
133 url: default_url(),
134 stream: None,
135 group: default_group(),
136 consumer: default_consumer(),
137 max_stream_len: None,
138 block_ms: default_block_ms(),
139 filters_in: Vec::new(),
140 filters_out: Vec::new(),
141 }
142 }
143}
144
145impl RedisTransportConfig {
146 #[must_use]
148 pub fn from_cascade() -> Self {
149 <Self as super::traits::FromCascade>::from_cascade_key("transport.redis")
150 }
151}
152
153pub struct RedisTransport {
158 conn: Mutex<redis::aio::MultiplexedConnection>,
159 config: RedisTransportConfig,
160 closed: Arc<AtomicBool>,
161 group_created: Mutex<std::collections::HashSet<String>>,
163 filter_engine: super::filter::TransportFilterEngine,
165}
166
167impl RedisTransport {
168 pub async fn new(config: &RedisTransportConfig) -> TransportResult<Self> {
177 let client = redis::Client::open(config.url.as_str()).map_err(|e| {
178 TransportError::Config(format!("invalid Redis URL '{}': {e}", config.url))
179 })?;
180
181 let conn = client
182 .get_multiplexed_async_connection()
183 .await
184 .map_err(|e| {
185 TransportError::Connection(format!(
186 "failed to connect to Redis at '{}': {e}",
187 config.url
188 ))
189 })?;
190
191 #[cfg(feature = "logger")]
192 tracing::info!(
193 url = %config.url,
194 stream = ?config.stream,
195 group = %config.group,
196 "Redis transport opened"
197 );
198
199 let filter_engine = super::filter::TransportFilterEngine::new(
200 &config.filters_in,
201 &config.filters_out,
202 &crate::transport::filter::TransportFilterTierConfig::default(),
203 )?;
204
205 let closed = Arc::new(AtomicBool::new(false));
206
207 #[cfg(feature = "health")]
208 {
209 let h = Arc::clone(&closed);
210 crate::health::HealthRegistry::register("transport:redis", move || {
211 if h.load(Ordering::Relaxed) {
212 crate::health::HealthStatus::Unhealthy
213 } else {
214 crate::health::HealthStatus::Healthy
215 }
216 });
217 }
218
219 Ok(Self {
220 conn: Mutex::new(conn),
221 config: config.clone(),
222 closed,
223 group_created: Mutex::new(std::collections::HashSet::new()),
224 filter_engine,
225 })
226 }
227
228 fn resolve_stream<'a>(&'a self, key: &'a str) -> Result<&'a str, TransportError> {
230 if !key.is_empty() {
231 return Ok(key);
232 }
233 self.config.stream.as_deref().ok_or_else(|| {
234 TransportError::Config(
235 "no stream name: key is empty and config.stream is not set".into(),
236 )
237 })
238 }
239
240 async fn ensure_group(&self, stream: &str) -> TransportResult<()> {
246 {
247 let created = self.group_created.lock().await;
248 if created.contains(stream) {
249 return Ok(());
250 }
251 }
252
253 let mut conn = self.conn.lock().await;
254 let result: redis::RedisResult<()> = conn
255 .xgroup_create_mkstream(stream, &self.config.group, "0")
256 .await;
257
258 match result {
259 Ok(()) => {}
260 Err(e) => {
261 let msg = e.to_string();
263 if !msg.contains("BUSYGROUP") {
264 return Err(TransportError::Connection(format!(
265 "failed to create consumer group '{}' on stream '{stream}': {e}",
266 self.config.group
267 )));
268 }
269 }
270 }
271
272 self.group_created.lock().await.insert(stream.to_string());
273 Ok(())
274 }
275}
276
277impl TransportBase for RedisTransport {
278 async fn close(&self) -> TransportResult<()> {
279 self.closed.store(true, Ordering::Relaxed);
280 Ok(())
281 }
282
283 fn is_healthy(&self) -> bool {
284 !self.closed.load(Ordering::Relaxed)
285 }
286
287 fn name(&self) -> &'static str {
288 "redis"
289 }
290}
291
292impl TransportSender for RedisTransport {
293 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
294 if self.closed.load(Ordering::Relaxed) {
295 return SendResult::Fatal(TransportError::Closed);
296 }
297
298 if self.filter_engine.has_outbound_filters() {
300 match self.filter_engine.apply_outbound(&payload) {
301 super::filter::FilterDisposition::Pass => {}
302 super::filter::FilterDisposition::Drop => return SendResult::Ok,
303 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
304 }
305 }
306
307 let stream = match self.resolve_stream(key) {
308 Ok(s) => s.to_string(),
309 Err(e) => return SendResult::Fatal(e),
310 };
311
312 let mut conn = self.conn.lock().await;
313
314 let result: redis::RedisResult<String> = if let Some(max_len) = self.config.max_stream_len {
315 conn.xadd_maxlen(
316 &stream,
317 StreamMaxlen::Approx(max_len),
318 "*",
319 &[("payload", payload.as_ref())],
320 )
321 .await
322 } else {
323 conn.xadd(&stream, "*", &[("payload", payload.as_ref())])
324 .await
325 };
326
327 match result {
328 Ok(_entry_id) => {
329 #[cfg(feature = "logger")]
330 tracing::debug!(stream = %stream, "Redis transport: XADD sent");
331
332 #[cfg(feature = "metrics")]
333 metrics::counter!("dfe_transport_sent_total", "transport" => "redis").increment(1);
334
335 SendResult::Ok
336 }
337 Err(e) => {
338 #[cfg(feature = "logger")]
339 tracing::warn!(error = %e, stream = %stream, "Redis transport: XADD error");
340
341 SendResult::Fatal(TransportError::Send(format!(
342 "XADD to stream '{stream}' failed: {e}"
343 )))
344 }
345 }
346 }
347}
348
349impl TransportReceiver for RedisTransport {
350 type Token = RedisToken;
351
352 async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
353 if self.closed.load(Ordering::Relaxed) {
354 return Err(TransportError::Closed);
355 }
356
357 let stream_name = self
358 .config
359 .stream
360 .as_deref()
361 .ok_or_else(|| TransportError::Config("config.stream must be set for recv()".into()))?
362 .to_string();
363
364 self.ensure_group(&stream_name).await?;
365
366 let opts = StreamReadOptions::default()
367 .group(&self.config.group, &self.config.consumer)
368 .count(max)
369 .block(self.config.block_ms);
370
371 let mut conn = self.conn.lock().await;
372
373 let reply: StreamReadReply = conn
375 .xread_options(&[&stream_name], &[">"], &opts)
376 .await
377 .map_err(|e| {
378 #[cfg(feature = "logger")]
379 tracing::warn!(error = %e, stream = %stream_name, "Redis transport: XREADGROUP error");
380
381 TransportError::Recv(format!("XREADGROUP on stream '{stream_name}' failed: {e}"))
382 })?;
383
384 let stream_arc: Arc<str> = Arc::from(stream_name.as_str());
385 let mut messages = Vec::new();
386
387 for stream_key in &reply.keys {
388 for stream_id in &stream_key.ids {
389 let payload_bytes: Option<Vec<u8>> = stream_id
391 .map
392 .get("payload")
393 .and_then(|v| redis::from_redis_value(v.clone()).ok());
394
395 let payload = payload_bytes.unwrap_or_default();
396 let format = PayloadFormat::detect(&payload);
397 let timestamp_ms = parse_entry_timestamp(&stream_id.id);
398
399 messages.push(Message {
400 key: Some(Arc::clone(&stream_arc)),
401 payload,
402 token: RedisToken {
403 stream: Arc::clone(&stream_arc),
404 entry_id: stream_id.id.clone(),
405 },
406 timestamp_ms,
407 format,
408 });
409 }
410 }
411
412 let batch = self.filter_engine.partition_batch(
415 messages,
416 |m| m.payload.as_slice(),
417 |m| m.key.clone(),
418 );
419 let messages = batch.messages;
420 let dlq_entries = batch.dlq_entries;
421
422 #[cfg(feature = "logger")]
423 if !messages.is_empty() {
424 tracing::debug!(
425 messages = messages.len(),
426 "Redis transport: XREADGROUP received"
427 );
428 }
429
430 #[cfg(feature = "metrics")]
431 if !messages.is_empty() {
432 metrics::counter!("dfe_transport_received_total", "transport" => "redis")
433 .increment(messages.len() as u64);
434 }
435
436 Ok(RecvBatch {
437 messages,
438 dlq_entries,
439 })
440 }
441
442 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
443 if tokens.is_empty() {
444 return Ok(());
445 }
446
447 let mut by_stream: std::collections::HashMap<&str, Vec<&str>> =
449 std::collections::HashMap::new();
450 for token in tokens {
451 by_stream
452 .entry(&token.stream)
453 .or_default()
454 .push(&token.entry_id);
455 }
456
457 let mut conn = self.conn.lock().await;
458
459 for (stream, ids) in &by_stream {
460 let id_refs: &[&str] = ids;
461 let _acked: i32 = conn
462 .xack(*stream, &self.config.group, id_refs)
463 .await
464 .map_err(|e| {
465 #[cfg(feature = "logger")]
466 tracing::warn!(error = %e, stream = %stream, "Redis transport: XACK error");
467
468 TransportError::Commit(format!("XACK on stream '{stream}' failed: {e}"))
469 })?;
470 }
471
472 #[cfg(feature = "logger")]
473 tracing::debug!(count = tokens.len(), "Redis transport: XACK committed");
474
475 Ok(())
476 }
477}
478
479fn parse_entry_timestamp(entry_id: &str) -> Option<i64> {
483 entry_id
484 .split_once('-')
485 .and_then(|(ms_str, _)| ms_str.parse::<i64>().ok())
486}
487
488impl super::traits::FromCascade for RedisTransportConfig {}
489
490#[cfg(test)]
491mod tests {
492 use super::*;
493
494 #[test]
495 fn token_display() {
496 let token = RedisToken {
497 stream: Arc::from("my_stream"),
498 entry_id: "1711432800000-0".into(),
499 };
500 assert_eq!(format!("{token}"), "redis:my_stream:1711432800000-0");
501 }
502
503 #[test]
504 fn token_clone() {
505 let token = RedisToken {
506 stream: Arc::from("s1"),
507 entry_id: "100-0".into(),
508 };
509 let cloned = token.clone();
510 assert_eq!(token, cloned);
511 }
512
513 #[test]
514 fn config_defaults() {
515 let config = RedisTransportConfig::default();
516 assert_eq!(config.url, "redis://127.0.0.1:6379");
517 assert!(config.stream.is_none());
518 assert_eq!(config.group, "dfe");
519 assert!(config.max_stream_len.is_none());
520 assert_eq!(config.block_ms, 5000);
521 }
522
523 #[test]
524 fn config_deserialise_minimal() {
525 let yaml = r"
526url: redis://myhost:6380
527stream: events
528";
529 let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
530 assert_eq!(config.url, "redis://myhost:6380");
531 assert_eq!(config.stream.as_deref(), Some("events"));
532 assert_eq!(config.group, "dfe");
534 assert_eq!(config.block_ms, 5000);
535 }
536
537 #[test]
538 fn config_deserialise_full() {
539 let yaml = r"
540url: rediss://secure.redis.io:6380
541stream: audit_log
542group: my_group
543consumer: worker-3
544max_stream_len: 100000
545block_ms: 2000
546";
547 let config: RedisTransportConfig = serde_yaml_ng::from_str(yaml).unwrap();
548 assert_eq!(config.url, "rediss://secure.redis.io:6380");
549 assert_eq!(config.stream.as_deref(), Some("audit_log"));
550 assert_eq!(config.group, "my_group");
551 assert_eq!(config.consumer, "worker-3");
552 assert_eq!(config.max_stream_len, Some(100_000));
553 assert_eq!(config.block_ms, 2000);
554 }
555
556 #[test]
557 fn parse_entry_timestamp_valid() {
558 assert_eq!(
559 parse_entry_timestamp("1711432800000-0"),
560 Some(1_711_432_800_000)
561 );
562 assert_eq!(parse_entry_timestamp("0-0"), Some(0));
563 }
564
565 #[test]
566 fn parse_entry_timestamp_invalid() {
567 assert_eq!(parse_entry_timestamp("not-a-number"), None);
568 assert_eq!(parse_entry_timestamp(""), None);
569 }
570
571 #[test]
572 fn resolve_stream_uses_key_when_non_empty() {
573 let config = RedisTransportConfig {
574 stream: Some("default_stream".into()),
575 ..Default::default()
576 };
577 let key = "override_stream";
580 let resolved = if key.is_empty() {
581 config.stream.as_deref().unwrap_or("")
582 } else {
583 key
584 };
585 assert_eq!(resolved, "override_stream");
586 }
587
588 #[test]
589 fn resolve_stream_falls_back_to_config() {
590 let config = RedisTransportConfig {
591 stream: Some("default_stream".into()),
592 ..Default::default()
593 };
594 let key = "";
595 let resolved = if key.is_empty() {
596 config.stream.as_deref().unwrap_or("")
597 } else {
598 key
599 };
600 assert_eq!(resolved, "default_stream");
601 }
602
603 #[tokio::test]
606 async fn redis_integration_xadd_xreadgroup_xack() {
607 let Ok(url) = std::env::var("REDIS_URL") else {
608 eprintln!("Skipping: REDIS_URL not set");
609 return;
610 };
611
612 let stream = format!("test_stream_{}", chrono::Utc::now().timestamp_millis());
613 let group = "test_group";
614 let consumer = "test_consumer";
615
616 let config = RedisTransportConfig {
617 url: url.clone(),
618 stream: Some(stream.clone()),
619 group: group.into(),
620 consumer: consumer.into(),
621 max_stream_len: Some(1000),
622 block_ms: 1000,
623 ..Default::default()
624 };
625
626 let transport = RedisTransport::new(&config).await.unwrap();
627
628 let r1 = transport
630 .send("", bytes::Bytes::from_static(b"{\"n\":1}"))
631 .await;
632 assert!(r1.is_ok(), "first send should succeed");
633
634 let r2 = transport
635 .send("", bytes::Bytes::from_static(b"{\"n\":2}"))
636 .await;
637 assert!(r2.is_ok(), "second send should succeed");
638
639 let messages = transport.recv(10).await.unwrap().messages;
641 assert_eq!(messages.len(), 2, "should receive 2 messages");
642 assert_eq!(messages[0].payload, b"{\"n\":1}");
643 assert_eq!(messages[1].payload, b"{\"n\":2}");
644
645 let tokens: Vec<_> = messages.iter().map(|m| m.token.clone()).collect();
647 transport.commit(&tokens).await.unwrap();
648
649 let more = transport.recv(10).await.unwrap().messages;
651 assert!(more.is_empty(), "no more messages after commit");
652
653 let mut conn = transport.conn.lock().await;
655 let _: redis::RedisResult<()> =
656 redis::cmd("DEL").arg(&stream).query_async(&mut *conn).await;
657
658 transport.close().await.unwrap();
659 assert!(!transport.is_healthy());
660 }
661}