1mod dedup_state;
24mod noop;
25#[cfg(feature = "redis")]
26mod redis_dedup;
27
28pub use dedup_state::PersistentProducerNonce;
29#[cfg(feature = "redis")]
30pub use redis_dedup::RedisStreamDedup;
31
32#[cfg(feature = "redis")]
33mod redis;
34
35#[cfg(feature = "jetstream")]
36mod jetstream;
37
38#[cfg(feature = "net")]
39pub mod net;
40
41pub use noop::NoopAdapter;
42
43#[cfg(feature = "redis")]
44pub use self::redis::RedisAdapter;
45
46#[cfg(feature = "jetstream")]
47pub use self::jetstream::JetStreamAdapter;
48
49#[cfg(feature = "net")]
50pub use self::net::{NetAdapter, NetAdapterConfig};
51
52use async_trait::async_trait;
53
54use crate::error::AdapterError;
55use crate::event::{Batch, StoredEvent};
56
57#[must_use]
68#[cfg(any(feature = "redis", feature = "jetstream"))]
/// Masks the userinfo (`user:password@`) part of a connection URL so the
/// result can be logged without exposing credentials.
///
/// The authority is the text after `://` up to the first `/`, `?`, or `#`
/// (or the end of the string). Everything before the *last* `@` inside that
/// authority is replaced with `[REDACTED]`, so a password that itself
/// contains `@` is masked in full, while `@` characters in the path or query
/// are left alone.
///
/// If that authority carries no userinfo (or the input has no `://` at all),
/// the input is re-examined as a bare authority (`user:pass@host:port`), so
/// scheme-less connection strings are masked instead of passing through.
/// For a well-formed `scheme://…` URL this fallback is a no-op: the text
/// before the first `/` is just `scheme:` and never contains `@`.
///
/// Returns `Cow::Borrowed(url)` when nothing needs masking, so the common
/// case does not allocate.
pub(crate) fn redact_url(url: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    /// Rebuilds `url` with the userinfo of the authority starting at
    /// `authority_start` replaced by the mask; `None` if it has no `@`.
    fn mask_userinfo(url: &str, authority_start: usize) -> Option<String> {
        const MASK: &str = "[REDACTED]";

        let authority_end = url[authority_start..]
            .find(['/', '?', '#'])
            .map_or(url.len(), |offset| authority_start + offset);
        let authority = &url[authority_start..authority_end];
        // Split on the LAST `@`: earlier ones belong to the password.
        let at_pos = authority.rfind('@')?;

        // The mask may be longer than the userinfo it replaces, so reserve
        // enough that the pushes below never reallocate.
        let mut redacted = String::with_capacity(url.len() + MASK.len());
        redacted.push_str(&url[..authority_start]);
        redacted.push_str(MASK);
        redacted.push_str(&authority[at_pos..]);
        redacted.push_str(&url[authority_end..]);
        Some(redacted)
    }

    url.find("://")
        .map(|scheme_end| scheme_end + "://".len())
        .and_then(|after_scheme| mask_userinfo(url, after_scheme))
        // Fail closed: no userinfo after a scheme, so try the bare-authority form.
        .or_else(|| mask_userinfo(url, 0))
        .map_or(Cow::Borrowed(url), Cow::Owned)
}
98
99#[derive(Debug, Clone)]
101pub struct ShardPollResult {
102 pub events: Vec<StoredEvent>,
104 pub next_id: Option<String>,
107 pub has_more: bool,
109}
110
111impl ShardPollResult {
112 pub fn empty() -> Self {
114 Self {
115 events: Vec::new(),
116 next_id: None,
117 has_more: false,
118 }
119 }
120}
121
122#[async_trait]
136pub trait Adapter: Send + Sync {
137 async fn init(&mut self) -> Result<(), AdapterError>;
142
143 async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError>;
154
155 async fn flush(&self) -> Result<(), AdapterError>;
160
161 async fn shutdown(&self) -> Result<(), AdapterError>;
165
166 async fn poll_shard(
178 &self,
179 shard_id: u16,
180 from_id: Option<&str>,
181 limit: usize,
182 ) -> Result<ShardPollResult, AdapterError>;
183
184 fn name(&self) -> &'static str;
186
187 async fn is_healthy(&self) -> bool {
191 true
192 }
193}
194
195#[async_trait]
197impl Adapter for Box<dyn Adapter> {
198 async fn init(&mut self) -> Result<(), AdapterError> {
199 (**self).init().await
200 }
201
202 async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
203 (**self).on_batch(batch).await
204 }
205
206 async fn flush(&self) -> Result<(), AdapterError> {
207 (**self).flush().await
208 }
209
210 async fn shutdown(&self) -> Result<(), AdapterError> {
211 (**self).shutdown().await
212 }
213
214 async fn poll_shard(
215 &self,
216 shard_id: u16,
217 from_id: Option<&str>,
218 limit: usize,
219 ) -> Result<ShardPollResult, AdapterError> {
220 (**self).poll_shard(shard_id, from_id, limit).await
221 }
222
223 fn name(&self) -> &'static str {
224 (**self).name()
225 }
226
227 async fn is_healthy(&self) -> bool {
228 (**self).is_healthy().await
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::InternalEvent;
    use serde_json::json;
    use std::sync::Arc;

    /// Builds the one-event batch shared by the boxed and `Arc` adapter tests.
    fn single_event_batch() -> Batch {
        Batch::new(
            0,
            vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)],
            0,
        )
    }

    /// Full lifecycle against the no-op adapter: init, write, flush, poll, shutdown.
    #[tokio::test]
    async fn test_noop_adapter() {
        let mut noop = NoopAdapter::new();
        noop.init().await.unwrap();

        let batch = Batch::new(
            0,
            vec![
                InternalEvent::from_value(json!({"test": 1}), 1, 0),
                InternalEvent::from_value(json!({"test": 2}), 2, 0),
            ],
            0,
        );
        noop.on_batch(batch).await.unwrap();
        noop.flush().await.unwrap();

        let polled = noop.poll_shard(0, None, 10).await.unwrap();
        assert!(polled.events.is_empty());

        noop.shutdown().await.unwrap();
    }

    #[test]
    fn test_shard_poll_result_empty() {
        let ShardPollResult {
            events,
            next_id,
            has_more,
        } = ShardPollResult::empty();

        assert!(events.is_empty());
        assert!(next_id.is_none());
        assert!(!has_more);
    }

    #[test]
    fn test_shard_poll_result_debug() {
        let rendered = format!("{:?}", ShardPollResult::empty());
        assert!(rendered.contains("ShardPollResult"));
    }

    #[test]
    fn test_shard_poll_result_clone() {
        let original = ShardPollResult {
            next_id: Some("cursor".to_string()),
            has_more: true,
            ..ShardPollResult::empty()
        };

        let copy = original.clone();
        assert_eq!(copy.next_id.as_deref(), Some("cursor"));
        assert!(copy.has_more);
    }

    #[tokio::test]
    async fn test_noop_adapter_name() {
        let noop = NoopAdapter::new();
        assert_eq!(noop.name(), "noop");
    }

    /// Health flips from unhealthy to healthy across `init`.
    #[tokio::test]
    async fn test_noop_adapter_is_healthy() {
        let mut noop = NoopAdapter::new();
        assert!(!noop.is_healthy().await);

        noop.init().await.unwrap();
        assert!(noop.is_healthy().await);
    }

    /// Exercises every trait method through the `Box<dyn Adapter>` forwarding impl.
    #[tokio::test]
    async fn test_boxed_adapter() {
        let mut boxed: Box<dyn Adapter> = Box::new(NoopAdapter::new());

        boxed.init().await.unwrap();
        assert_eq!(boxed.name(), "noop");
        assert!(boxed.is_healthy().await);

        boxed.on_batch(single_event_batch()).await.unwrap();
        boxed.flush().await.unwrap();

        let polled = boxed.poll_shard(0, None, 10).await.unwrap();
        assert!(polled.events.is_empty());

        boxed.shutdown().await.unwrap();
    }

    /// An adapter initialized up front stays usable behind `Arc<dyn Adapter>`.
    #[tokio::test]
    async fn test_arc_adapter() {
        let mut noop = NoopAdapter::new();
        noop.init().await.unwrap();

        let shared: Arc<dyn Adapter> = Arc::new(noop);

        assert_eq!(shared.name(), "noop");
        assert!(shared.is_healthy().await);

        shared.on_batch(single_event_batch()).await.unwrap();
        shared.flush().await.unwrap();
        shared.shutdown().await.unwrap();
    }

    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_strips_userinfo() {
        let cases = [
            (
                "redis://user:secret@redis.example.com:6379",
                "redis://[REDACTED]@redis.example.com:6379",
            ),
            (
                "nats://admin:p@ss@nats.svc:4222/path?foo=1",
                "nats://[REDACTED]@nats.svc:4222/path?foo=1",
            ),
            (
                "rediss://:tokenonly@host:6379",
                "rediss://[REDACTED]@host:6379",
            ),
        ];

        for (raw, expected) in cases {
            assert_eq!(redact_url(raw), expected);
        }
    }

    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_splits_on_last_at_in_authority() {
        // Passwords may contain `@`; the split must happen on the last one.
        assert_eq!(
            redact_url("redis://user:p@ss:word@redis.svc:6379"),
            "redis://[REDACTED]@redis.svc:6379",
            "the entire userinfo `user:p@ss:word` must redact, not just the first segment",
        );

        let cases = [
            (
                "nats://op:a@b@c@nats.svc:4222",
                "nats://[REDACTED]@nats.svc:4222",
            ),
            // An `@` in the path must not be mistaken for the userinfo separator.
            (
                "https://user:p@ss@host.example/foo@bar",
                "https://[REDACTED]@host.example/foo@bar",
            ),
        ];

        for (raw, expected) in cases {
            assert_eq!(redact_url(raw), expected);
        }
    }

    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_passthrough_when_no_userinfo() {
        let untouched = [
            "redis://redis.svc:6379",
            "nats://nats.svc:4222",
            "https://example.com/path/@handle",
        ];

        for url in untouched {
            assert_eq!(redact_url(url), url);
        }
    }
}