1mod dedup_state;
24mod noop;
25#[cfg(feature = "redis")]
26mod redis_dedup;
27
28pub use dedup_state::PersistentProducerNonce;
29#[cfg(feature = "redis")]
30pub use redis_dedup::RedisStreamDedup;
31
32#[cfg(feature = "redis")]
33mod redis;
34
35#[cfg(feature = "jetstream")]
36mod jetstream;
37
38#[cfg(feature = "net")]
39pub mod net;
40
41pub use noop::NoopAdapter;
42
43#[cfg(feature = "redis")]
44pub use self::redis::RedisAdapter;
45
46#[cfg(feature = "jetstream")]
47pub use self::jetstream::JetStreamAdapter;
48
49#[cfg(feature = "net")]
50pub use self::net::{NetAdapter, NetAdapterConfig};
51
52use std::sync::Arc;
53
54use async_trait::async_trait;
55
56use crate::error::AdapterError;
57use crate::event::{Batch, StoredEvent};
58
59#[must_use]
70#[cfg(any(feature = "redis", feature = "jetstream"))]
71pub(crate) fn redact_url(url: &str) -> std::borrow::Cow<'_, str> {
72 let Some(scheme_end) = url.find("://") else {
73 return std::borrow::Cow::Borrowed(url);
74 };
75 let after_scheme = scheme_end + 3;
76 let authority_end = url[after_scheme..]
79 .find(['/', '?', '#'])
80 .map_or(url.len(), |i| after_scheme + i);
81 let authority = &url[after_scheme..authority_end];
82 let Some(at_pos) = authority.rfind('@') else {
91 return std::borrow::Cow::Borrowed(url);
92 };
93 let mut redacted = String::with_capacity(url.len());
94 redacted.push_str(&url[..after_scheme]);
95 redacted.push_str("[REDACTED]");
96 redacted.push_str(&authority[at_pos..]);
97 redacted.push_str(&url[authority_end..]);
98 std::borrow::Cow::Owned(redacted)
99}
100
101#[derive(Debug, Clone)]
103pub struct ShardPollResult {
104 pub events: Vec<StoredEvent>,
106 pub next_id: Option<String>,
109 pub has_more: bool,
111}
112
113impl ShardPollResult {
114 pub fn empty() -> Self {
116 Self {
117 events: Vec::new(),
118 next_id: None,
119 has_more: false,
120 }
121 }
122}
123
124#[async_trait]
138pub trait Adapter: Send + Sync {
139 async fn init(&mut self) -> Result<(), AdapterError>;
144
145 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError>;
165
166 async fn flush(&self) -> Result<(), AdapterError>;
171
172 async fn shutdown(&self) -> Result<(), AdapterError>;
176
177 async fn poll_shard(
189 &self,
190 shard_id: u16,
191 from_id: Option<&str>,
192 limit: usize,
193 ) -> Result<ShardPollResult, AdapterError>;
194
195 fn name(&self) -> &'static str;
197
198 async fn is_healthy(&self) -> bool {
202 true
203 }
204}
205
206#[async_trait]
208impl Adapter for Box<dyn Adapter> {
209 async fn init(&mut self) -> Result<(), AdapterError> {
210 (**self).init().await
211 }
212
213 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
214 (**self).on_batch(batch).await
215 }
216
217 async fn flush(&self) -> Result<(), AdapterError> {
218 (**self).flush().await
219 }
220
221 async fn shutdown(&self) -> Result<(), AdapterError> {
222 (**self).shutdown().await
223 }
224
225 async fn poll_shard(
226 &self,
227 shard_id: u16,
228 from_id: Option<&str>,
229 limit: usize,
230 ) -> Result<ShardPollResult, AdapterError> {
231 (**self).poll_shard(shard_id, from_id, limit).await
232 }
233
234 fn name(&self) -> &'static str {
235 (**self).name()
236 }
237
238 async fn is_healthy(&self) -> bool {
239 (**self).is_healthy().await
240 }
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246 use crate::event::InternalEvent;
247 use serde_json::json;
248 use std::sync::Arc;
249
250 #[tokio::test]
251 async fn test_noop_adapter() {
252 let mut adapter = NoopAdapter::new();
253 adapter.init().await.unwrap();
254
255 let events = vec![
256 InternalEvent::from_value(json!({"test": 1}), 1, 0),
257 InternalEvent::from_value(json!({"test": 2}), 2, 0),
258 ];
259 let batch = Batch::new(0, events, 0);
260
261 adapter.on_batch(Arc::new(batch)).await.unwrap();
262 adapter.flush().await.unwrap();
263
264 let result = adapter.poll_shard(0, None, 10).await.unwrap();
266 assert!(result.events.is_empty());
267
268 adapter.shutdown().await.unwrap();
269 }
270
271 #[test]
272 fn test_shard_poll_result_empty() {
273 let result = ShardPollResult::empty();
274 assert!(result.events.is_empty());
275 assert!(result.next_id.is_none());
276 assert!(!result.has_more);
277 }
278
279 #[test]
280 fn test_shard_poll_result_debug() {
281 let result = ShardPollResult::empty();
282 let debug = format!("{:?}", result);
283 assert!(debug.contains("ShardPollResult"));
284 }
285
286 #[test]
287 fn test_shard_poll_result_clone() {
288 let mut result = ShardPollResult::empty();
289 result.next_id = Some("cursor".to_string());
290 result.has_more = true;
291
292 let cloned = result.clone();
293 assert_eq!(cloned.next_id, Some("cursor".to_string()));
294 assert!(cloned.has_more);
295 }
296
297 #[tokio::test]
298 async fn test_noop_adapter_name() {
299 let adapter = NoopAdapter::new();
300 assert_eq!(adapter.name(), "noop");
301 }
302
303 #[tokio::test]
304 async fn test_noop_adapter_is_healthy() {
305 let mut adapter = NoopAdapter::new();
306 assert!(!adapter.is_healthy().await);
308 adapter.init().await.unwrap();
310 assert!(adapter.is_healthy().await);
311 }
312
313 #[tokio::test]
314 async fn test_boxed_adapter() {
315 let mut adapter: Box<dyn Adapter> = Box::new(NoopAdapter::new());
316
317 adapter.init().await.unwrap();
319 assert_eq!(adapter.name(), "noop");
320 assert!(adapter.is_healthy().await);
321
322 let events = vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)];
323 let batch = Batch::new(0, events, 0);
324 adapter.on_batch(Arc::new(batch)).await.unwrap();
325
326 adapter.flush().await.unwrap();
327
328 let result = adapter.poll_shard(0, None, 10).await.unwrap();
329 assert!(result.events.is_empty());
330
331 adapter.shutdown().await.unwrap();
332 }
333
334 #[tokio::test]
335 async fn test_arc_adapter() {
336 let mut adapter = NoopAdapter::new();
337 adapter.init().await.unwrap();
338
339 let adapter: Arc<dyn Adapter> = Arc::new(adapter);
340
341 assert_eq!(adapter.name(), "noop");
343 assert!(adapter.is_healthy().await);
344
345 let events = vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)];
346 let batch = Batch::new(0, events, 0);
347 adapter.on_batch(Arc::new(batch)).await.unwrap();
348
349 adapter.flush().await.unwrap();
350 adapter.shutdown().await.unwrap();
351 }
352
353 #[cfg(any(feature = "redis", feature = "jetstream"))]
354 #[test]
355 fn redact_url_strips_userinfo() {
356 assert_eq!(
357 redact_url("redis://user:secret@redis.example.com:6379"),
358 "redis://[REDACTED]@redis.example.com:6379"
359 );
360 assert_eq!(
361 redact_url("nats://admin:p@ss@nats.svc:4222/path?foo=1"),
362 "nats://[REDACTED]@nats.svc:4222/path?foo=1"
369 );
370 assert_eq!(
371 redact_url("rediss://:tokenonly@host:6379"),
372 "rediss://[REDACTED]@host:6379"
373 );
374 }
375
376 #[cfg(any(feature = "redis", feature = "jetstream"))]
382 #[test]
383 fn redact_url_splits_on_last_at_in_authority() {
384 assert_eq!(
386 redact_url("redis://user:p@ss:word@redis.svc:6379"),
387 "redis://[REDACTED]@redis.svc:6379",
388 "the entire userinfo `user:p@ss:word` must redact, not just the first segment",
389 );
390 assert_eq!(
392 redact_url("nats://op:a@b@c@nats.svc:4222"),
393 "nats://[REDACTED]@nats.svc:4222",
394 );
395 assert_eq!(
401 redact_url("https://user:p@ss@host.example/foo@bar"),
402 "https://[REDACTED]@host.example/foo@bar",
403 );
404 }
405
406 #[cfg(any(feature = "redis", feature = "jetstream"))]
407 #[test]
408 fn redact_url_passthrough_when_no_userinfo() {
409 assert_eq!(
410 redact_url("redis://redis.svc:6379"),
411 "redis://redis.svc:6379"
412 );
413 assert_eq!(redact_url("nats://nats.svc:4222"), "nats://nats.svc:4222");
414 assert_eq!(
416 redact_url("https://example.com/path/@handle"),
417 "https://example.com/path/@handle"
418 );
419 }
420}