Skip to main content

net/adapter/
mod.rs

1//! Adapter trait and implementations for durable event storage.
2//!
3//! Adapters provide the persistence layer for the event bus. They receive
4//! batches of events from the ingestion core and store them durably.
5//!
6//! # Adapter Contract
7//!
8//! Adapters must:
9//! - Append batches in received order
10//! - Never block ingestion indefinitely
11//! - Fail fast on internal errors
12//! - Be idempotent under retry
13//! - Preserve per-shard FIFO order
14//! - NOT allocate memory per-event (only per-batch or static)
15//!
16//! # Available Adapters
17//!
18//! - `NoopAdapter`: Discards events (for testing/benchmarking)
19//! - `RedisAdapter`: Redis Streams backend (requires `redis` feature)
20//! - `JetStreamAdapter`: NATS JetStream backend (requires `jetstream` feature)
21//! - `NetAdapter`: High-performance UDP transport (requires `net` feature)
22
23mod dedup_state;
24mod noop;
25#[cfg(feature = "redis")]
26mod redis_dedup;
27
28pub use dedup_state::PersistentProducerNonce;
29#[cfg(feature = "redis")]
30pub use redis_dedup::RedisStreamDedup;
31
32#[cfg(feature = "redis")]
33mod redis;
34
35#[cfg(feature = "jetstream")]
36mod jetstream;
37
38#[cfg(feature = "net")]
39pub mod net;
40
41pub use noop::NoopAdapter;
42
43#[cfg(feature = "redis")]
44pub use self::redis::RedisAdapter;
45
46#[cfg(feature = "jetstream")]
47pub use self::jetstream::JetStreamAdapter;
48
49#[cfg(feature = "net")]
50pub use self::net::{NetAdapter, NetAdapterConfig};
51
52use async_trait::async_trait;
53
54use crate::error::AdapterError;
55use crate::event::{Batch, StoredEvent};
56
/// Strip the userinfo (`user:password@`) from a connection URL for safe
/// logging / `Debug` output.
///
/// Returns `Cow::Borrowed` when no redaction is needed, so the common
/// no-credentials path is allocation-free. Otherwise it returns an owned
/// string in which the entire userinfo is replaced by `[REDACTED]`. The
/// `@`, host, port, path, query and fragment are kept verbatim.
///
/// Both adapter init logs and `Debug` impls previously emitted
/// `config.url` verbatim. A misconfigured operator who put the
/// password in the URL (the canonical Redis / NATS shape) would
/// leak it into every log sink the application uses.
///
/// Scoping rules:
/// - The authority starts right after `"://"` and ends at the first
///   `/`, `?` or `#` (or at the end of the string). An `@` after that
///   point belongs to the path/query/fragment and is never treated as
///   userinfo.
/// - Userinfo is everything in the authority before its *rightmost*
///   `@`. Using the rightmost `@` means a password containing an
///   unencoded `@` is still fully redacted.
/// - URLs without `"://"`, or whose authority contains no `@`, are
///   returned unchanged.
#[must_use]
#[cfg(any(feature = "redis", feature = "jetstream"))]
pub(crate) fn redact_url(url: &str) -> std::borrow::Cow<'_, str> {
    const MASK: &str = "[REDACTED]";

    let Some(scheme_end) = url.find("://") else {
        return std::borrow::Cow::Borrowed(url);
    };
    let after_scheme = scheme_end + "://".len();
    // Only scan within the authority component: the first path ('/'),
    // query ('?') or fragment ('#') delimiter terminates it.
    let authority_end = url[after_scheme..]
        .find(['/', '?', '#'])
        .map_or(url.len(), |i| after_scheme + i);
    let authority = &url[after_scheme..authority_end];
    // Split at the LAST '@' in the authority, not the first. Pre-fix
    // `find('@')` left the password tail visible: e.g.
    // `nats://admin:p@ss@nats.svc:4222` redacted to
    // `nats://[REDACTED]@ss@nats.svc:4222`, leaking `ss`.
    let Some(at_pos) = authority.rfind('@') else {
        return std::borrow::Cow::Borrowed(url);
    };
    // Byte offset of that '@' within `url`. Everything from it onward
    // (host, port, path, query, fragment) is kept as-is.
    let keep_from = after_scheme + at_pos;
    // `concat` sizes the output exactly. The previous
    // `with_capacity(url.len())` under-reserved whenever the userinfo was
    // shorter than the mask (e.g. `rediss://:t@host`).
    std::borrow::Cow::Owned([&url[..after_scheme], MASK, &url[keep_from..]].concat())
}
98
99/// Result of polling a single shard.
100#[derive(Debug, Clone)]
101pub struct ShardPollResult {
102    /// Events retrieved from the shard.
103    pub events: Vec<StoredEvent>,
104    /// Cursor for the next poll (backend-specific).
105    /// None if no events were returned.
106    pub next_id: Option<String>,
107    /// True if there are more events available.
108    pub has_more: bool,
109}
110
111impl ShardPollResult {
112    /// Create an empty poll result.
113    pub fn empty() -> Self {
114        Self {
115            events: Vec::new(),
116            next_id: None,
117            has_more: false,
118        }
119    }
120}
121
122/// Adapter trait for durable event storage.
123///
124/// # Memory Allocation Constraint
125///
126/// Adapters **MUST NOT** allocate memory per-event. Allowed allocations:
127/// - Per-batch buffer allocation (reusable)
128/// - Static/pooled buffers
129/// - Connection resources
130///
131/// Forbidden:
132/// - `Vec::push` per event in hot path
133/// - String allocation per event
134/// - Any heap allocation scaling with event count
135#[async_trait]
136pub trait Adapter: Send + Sync {
137    /// Initialize the adapter.
138    ///
139    /// Called once before any other methods. Use this to establish
140    /// connections, validate configuration, etc.
141    async fn init(&mut self) -> Result<(), AdapterError>;
142
143    /// Process a batch of events.
144    ///
145    /// The adapter must persist all events in the batch atomically
146    /// (all or nothing). Events must be stored in order within the batch.
147    ///
148    /// # Errors
149    ///
150    /// - `AdapterError::Transient`: Temporary failure, retry is safe
151    /// - `AdapterError::Fatal`: Unrecoverable error, adapter is broken
152    /// - `AdapterError::Backpressure`: Backend overloaded, slow down
153    async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError>;
154
155    /// Force flush any buffered data.
156    ///
157    /// Some adapters may buffer writes for efficiency. This method
158    /// forces all buffered data to be persisted.
159    async fn flush(&self) -> Result<(), AdapterError>;
160
161    /// Gracefully shut down the adapter.
162    ///
163    /// This should flush any pending data and close connections.
164    async fn shutdown(&self) -> Result<(), AdapterError>;
165
166    /// Poll events from a single shard.
167    ///
168    /// # Parameters
169    ///
170    /// - `shard_id`: The shard to poll
171    /// - `from_id`: Start cursor (exclusive). None means from the beginning.
172    /// - `limit`: Maximum number of events to return
173    ///
174    /// # Returns
175    ///
176    /// A `ShardPollResult` containing the events and pagination info.
177    async fn poll_shard(
178        &self,
179        shard_id: u16,
180        from_id: Option<&str>,
181        limit: usize,
182    ) -> Result<ShardPollResult, AdapterError>;
183
184    /// Get the adapter name (for logging/metrics).
185    fn name(&self) -> &'static str;
186
187    /// Check if the adapter is healthy.
188    ///
189    /// Returns true if the adapter can accept batches.
190    async fn is_healthy(&self) -> bool {
191        true
192    }
193}
194
195/// Wrapper to make `Box<dyn Adapter>` implement Adapter.
196#[async_trait]
197impl Adapter for Box<dyn Adapter> {
198    async fn init(&mut self) -> Result<(), AdapterError> {
199        (**self).init().await
200    }
201
202    async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
203        (**self).on_batch(batch).await
204    }
205
206    async fn flush(&self) -> Result<(), AdapterError> {
207        (**self).flush().await
208    }
209
210    async fn shutdown(&self) -> Result<(), AdapterError> {
211        (**self).shutdown().await
212    }
213
214    async fn poll_shard(
215        &self,
216        shard_id: u16,
217        from_id: Option<&str>,
218        limit: usize,
219    ) -> Result<ShardPollResult, AdapterError> {
220        (**self).poll_shard(shard_id, from_id, limit).await
221    }
222
223    fn name(&self) -> &'static str {
224        (**self).name()
225    }
226
227    async fn is_healthy(&self) -> bool {
228        (**self).is_healthy().await
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::InternalEvent;
    use serde_json::json;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_noop_adapter() {
        let mut adapter = NoopAdapter::new();
        adapter.init().await.unwrap();

        let events = vec![
            InternalEvent::from_value(json!({"test": 1}), 1, 0),
            InternalEvent::from_value(json!({"test": 2}), 2, 0),
        ];
        let batch = Batch::new(0, events, 0);

        adapter.on_batch(batch).await.unwrap();
        adapter.flush().await.unwrap();

        // Noop adapter doesn't store anything
        let result = adapter.poll_shard(0, None, 10).await.unwrap();
        assert!(result.events.is_empty());

        adapter.shutdown().await.unwrap();
    }

    #[test]
    fn test_shard_poll_result_empty() {
        let result = ShardPollResult::empty();
        assert!(result.events.is_empty());
        assert!(result.next_id.is_none());
        assert!(!result.has_more);
    }

    #[test]
    fn test_shard_poll_result_debug() {
        let result = ShardPollResult::empty();
        let debug = format!("{result:?}");
        assert!(debug.contains("ShardPollResult"));
    }

    #[test]
    fn test_shard_poll_result_clone() {
        let mut result = ShardPollResult::empty();
        result.next_id = Some("cursor".to_string());
        result.has_more = true;

        let cloned = result.clone();
        assert_eq!(cloned.next_id, Some("cursor".to_string()));
        assert!(cloned.has_more);
    }

    #[tokio::test]
    async fn test_noop_adapter_name() {
        let adapter = NoopAdapter::new();
        assert_eq!(adapter.name(), "noop");
    }

    #[tokio::test]
    async fn test_noop_adapter_is_healthy() {
        let mut adapter = NoopAdapter::new();
        // Not healthy before init
        assert!(!adapter.is_healthy().await);
        // Healthy after init
        adapter.init().await.unwrap();
        assert!(adapter.is_healthy().await);
    }

    #[tokio::test]
    async fn test_boxed_adapter() {
        let mut adapter: Box<dyn Adapter> = Box::new(NoopAdapter::new());

        // Test all trait methods through Box
        adapter.init().await.unwrap();
        assert_eq!(adapter.name(), "noop");
        assert!(adapter.is_healthy().await);

        let events = vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)];
        let batch = Batch::new(0, events, 0);
        adapter.on_batch(batch).await.unwrap();

        adapter.flush().await.unwrap();

        let result = adapter.poll_shard(0, None, 10).await.unwrap();
        assert!(result.events.is_empty());

        adapter.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_arc_adapter() {
        let mut adapter = NoopAdapter::new();
        adapter.init().await.unwrap();

        let adapter: Arc<dyn Adapter> = Arc::new(adapter);

        // Test methods through Arc
        assert_eq!(adapter.name(), "noop");
        assert!(adapter.is_healthy().await);

        let events = vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)];
        let batch = Batch::new(0, events, 0);
        adapter.on_batch(batch).await.unwrap();

        adapter.flush().await.unwrap();
        adapter.shutdown().await.unwrap();
    }

    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_strips_userinfo() {
        assert_eq!(
            redact_url("redis://user:secret@redis.example.com:6379"),
            "redis://[REDACTED]@redis.example.com:6379"
        );
        assert_eq!(
            redact_url("nats://admin:p@ss@nats.svc:4222/path?foo=1"),
            // Password contains an unencoded '@' — userinfo terminates
            // at the LAST `@` in the authority, so the full
            // `admin:p@ss` redacts. Pre-fix used `find('@')` and
            // emitted `nats://[REDACTED]@ss@nats.svc:4222/...`,
            // leaking the password tail `ss`.
            "nats://[REDACTED]@nats.svc:4222/path?foo=1"
        );
        assert_eq!(
            redact_url("rediss://:tokenonly@host:6379"),
            "rediss://[REDACTED]@host:6379"
        );
    }

    /// Regression: `redact_url` must split on the LAST `@` in the
    /// authority, not the first. Pre-fix `find('@')` left any
    /// password suffix after an unencoded inner `@` visible in
    /// every log line — exactly the leak the redactor is here to
    /// prevent.
    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_splits_on_last_at_in_authority() {
        // Password with an unencoded `@` mid-string.
        assert_eq!(
            redact_url("redis://user:p@ss:word@redis.svc:6379"),
            "redis://[REDACTED]@redis.svc:6379",
            "the entire userinfo `user:p@ss:word` must redact, not just the first segment",
        );
        // Username + multi-`@` password.
        assert_eq!(
            redact_url("nats://op:a@b@c@nats.svc:4222"),
            "nats://[REDACTED]@nats.svc:4222",
        );
        // Authority terminator (first `/` after `://`) bounds the
        // search — a `@` inside the path must NOT be the split
        // point. Without scoping to the authority, the path's `@`
        // would steal the rfind result and the userinfo would
        // leak through.
        assert_eq!(
            redact_url("https://user:p@ss@host.example/foo@bar"),
            "https://[REDACTED]@host.example/foo@bar",
        );
    }

    /// The authority also ends at `?` (query) and `#` (fragment), not
    /// only at `/`. An `@` after either delimiter is not userinfo and
    /// must pass through untouched, while userinfo in front of them is
    /// still redacted.
    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_authority_ends_at_query_and_fragment() {
        assert_eq!(
            redact_url("redis://redis.svc:6379?client=a@b"),
            "redis://redis.svc:6379?client=a@b"
        );
        assert_eq!(
            redact_url("nats://nats.svc:4222#a@b"),
            "nats://nats.svc:4222#a@b"
        );
        assert_eq!(
            redact_url("redis://user:secret@redis.svc:6379?db=1#x@y"),
            "redis://[REDACTED]@redis.svc:6379?db=1#x@y"
        );
    }

    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_passthrough_when_no_userinfo() {
        assert_eq!(
            redact_url("redis://redis.svc:6379"),
            "redis://redis.svc:6379"
        );
        assert_eq!(redact_url("nats://nats.svc:4222"), "nats://nats.svc:4222");
        // '@' in the path / query is not userinfo — must not redact.
        assert_eq!(
            redact_url("https://example.com/path/@handle"),
            "https://example.com/path/@handle"
        );
        // The documented no-credentials fast path must not allocate.
        assert!(matches!(
            redact_url("redis://redis.svc:6379"),
            std::borrow::Cow::Borrowed(_)
        ));
    }
}