mod dedup_state;
mod noop;
#[cfg(feature = "redis")]
mod redis_dedup;
pub use dedup_state::PersistentProducerNonce;
#[cfg(feature = "redis")]
pub use redis_dedup::RedisStreamDedup;
#[cfg(feature = "redis")]
mod redis;
#[cfg(feature = "jetstream")]
mod jetstream;
#[cfg(feature = "net")]
pub mod net;
pub use noop::NoopAdapter;
#[cfg(feature = "redis")]
pub use self::redis::RedisAdapter;
#[cfg(feature = "jetstream")]
pub use self::jetstream::JetStreamAdapter;
#[cfg(feature = "net")]
pub use self::net::{NetAdapter, NetAdapterConfig};
use std::sync::Arc;
use async_trait::async_trait;
use crate::error::AdapterError;
use crate::event::{Batch, StoredEvent};
#[must_use]
#[cfg(any(feature = "redis", feature = "jetstream"))]
pub(crate) fn redact_url(url: &str) -> std::borrow::Cow<'_, str> {
let Some(scheme_end) = url.find("://") else {
return std::borrow::Cow::Borrowed(url);
};
let after_scheme = scheme_end + 3;
let authority_end = url[after_scheme..]
.find(['/', '?', '#'])
.map_or(url.len(), |i| after_scheme + i);
let authority = &url[after_scheme..authority_end];
let Some(at_pos) = authority.rfind('@') else {
return std::borrow::Cow::Borrowed(url);
};
let mut redacted = String::with_capacity(url.len());
redacted.push_str(&url[..after_scheme]);
redacted.push_str("[REDACTED]");
redacted.push_str(&authority[at_pos..]);
redacted.push_str(&url[authority_end..]);
std::borrow::Cow::Owned(redacted)
}
#[derive(Debug, Clone)]
pub struct ShardPollResult {
pub events: Vec<StoredEvent>,
pub next_id: Option<String>,
pub has_more: bool,
}
impl ShardPollResult {
pub fn empty() -> Self {
Self {
events: Vec::new(),
next_id: None,
has_more: false,
}
}
}
#[async_trait]
pub trait Adapter: Send + Sync {
async fn init(&mut self) -> Result<(), AdapterError>;
async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError>;
async fn flush(&self) -> Result<(), AdapterError>;
async fn shutdown(&self) -> Result<(), AdapterError>;
async fn poll_shard(
&self,
shard_id: u16,
from_id: Option<&str>,
limit: usize,
) -> Result<ShardPollResult, AdapterError>;
fn name(&self) -> &'static str;
async fn is_healthy(&self) -> bool {
true
}
}
#[async_trait]
impl Adapter for Box<dyn Adapter> {
async fn init(&mut self) -> Result<(), AdapterError> {
(**self).init().await
}
async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
(**self).on_batch(batch).await
}
async fn flush(&self) -> Result<(), AdapterError> {
(**self).flush().await
}
async fn shutdown(&self) -> Result<(), AdapterError> {
(**self).shutdown().await
}
async fn poll_shard(
&self,
shard_id: u16,
from_id: Option<&str>,
limit: usize,
) -> Result<ShardPollResult, AdapterError> {
(**self).poll_shard(shard_id, from_id, limit).await
}
fn name(&self) -> &'static str {
(**self).name()
}
async fn is_healthy(&self) -> bool {
(**self).is_healthy().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::InternalEvent;
use serde_json::json;
use std::sync::Arc;
#[tokio::test]
async fn test_noop_adapter() {
let mut adapter = NoopAdapter::new();
adapter.init().await.unwrap();
let events = vec![
InternalEvent::from_value(json!({"test": 1}), 1, 0),
InternalEvent::from_value(json!({"test": 2}), 2, 0),
];
let batch = Batch::new(0, events, 0);
adapter.on_batch(Arc::new(batch)).await.unwrap();
adapter.flush().await.unwrap();
let result = adapter.poll_shard(0, None, 10).await.unwrap();
assert!(result.events.is_empty());
adapter.shutdown().await.unwrap();
}
#[test]
fn test_shard_poll_result_empty() {
let result = ShardPollResult::empty();
assert!(result.events.is_empty());
assert!(result.next_id.is_none());
assert!(!result.has_more);
}
#[test]
fn test_shard_poll_result_debug() {
let result = ShardPollResult::empty();
let debug = format!("{:?}", result);
assert!(debug.contains("ShardPollResult"));
}
#[test]
fn test_shard_poll_result_clone() {
let mut result = ShardPollResult::empty();
result.next_id = Some("cursor".to_string());
result.has_more = true;
let cloned = result.clone();
assert_eq!(cloned.next_id, Some("cursor".to_string()));
assert!(cloned.has_more);
}
#[tokio::test]
async fn test_noop_adapter_name() {
let adapter = NoopAdapter::new();
assert_eq!(adapter.name(), "noop");
}
#[tokio::test]
async fn test_noop_adapter_is_healthy() {
let mut adapter = NoopAdapter::new();
assert!(!adapter.is_healthy().await);
adapter.init().await.unwrap();
assert!(adapter.is_healthy().await);
}
#[tokio::test]
async fn test_boxed_adapter() {
let mut adapter: Box<dyn Adapter> = Box::new(NoopAdapter::new());
adapter.init().await.unwrap();
assert_eq!(adapter.name(), "noop");
assert!(adapter.is_healthy().await);
let events = vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)];
let batch = Batch::new(0, events, 0);
adapter.on_batch(Arc::new(batch)).await.unwrap();
adapter.flush().await.unwrap();
let result = adapter.poll_shard(0, None, 10).await.unwrap();
assert!(result.events.is_empty());
adapter.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_arc_adapter() {
let mut adapter = NoopAdapter::new();
adapter.init().await.unwrap();
let adapter: Arc<dyn Adapter> = Arc::new(adapter);
assert_eq!(adapter.name(), "noop");
assert!(adapter.is_healthy().await);
let events = vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)];
let batch = Batch::new(0, events, 0);
adapter.on_batch(Arc::new(batch)).await.unwrap();
adapter.flush().await.unwrap();
adapter.shutdown().await.unwrap();
}
#[cfg(any(feature = "redis", feature = "jetstream"))]
#[test]
fn redact_url_strips_userinfo() {
assert_eq!(
redact_url("redis://user:secret@redis.example.com:6379"),
"redis://[REDACTED]@redis.example.com:6379"
);
assert_eq!(
redact_url("nats://admin:p@ss@nats.svc:4222/path?foo=1"),
"nats://[REDACTED]@nats.svc:4222/path?foo=1"
);
assert_eq!(
redact_url("rediss://:tokenonly@host:6379"),
"rediss://[REDACTED]@host:6379"
);
}
#[cfg(any(feature = "redis", feature = "jetstream"))]
#[test]
fn redact_url_splits_on_last_at_in_authority() {
assert_eq!(
redact_url("redis://user:p@ss:word@redis.svc:6379"),
"redis://[REDACTED]@redis.svc:6379",
"the entire userinfo `user:p@ss:word` must redact, not just the first segment",
);
assert_eq!(
redact_url("nats://op:a@b@c@nats.svc:4222"),
"nats://[REDACTED]@nats.svc:4222",
);
assert_eq!(
redact_url("https://user:p@ss@host.example/foo@bar"),
"https://[REDACTED]@host.example/foo@bar",
);
}
#[cfg(any(feature = "redis", feature = "jetstream"))]
#[test]
fn redact_url_passthrough_when_no_userinfo() {
assert_eq!(
redact_url("redis://redis.svc:6379"),
"redis://redis.svc:6379"
);
assert_eq!(redact_url("nats://nats.svc:4222"), "nats://nats.svc:4222");
assert_eq!(
redact_url("https://example.com/path/@handle"),
"https://example.com/path/@handle"
);
}
}