Skip to main content

net/adapter/
mod.rs

1//! Adapter trait and implementations for durable event storage.
2//!
3//! Adapters provide the persistence layer for the event bus. They receive
4//! batches of events from the ingestion core and store them durably.
5//!
6//! # Adapter Contract
7//!
8//! Adapters must:
9//! - Append batches in received order
10//! - Never block ingestion indefinitely
11//! - Fail fast on internal errors
12//! - Be idempotent under retry
13//! - Preserve per-shard FIFO order
14//! - NOT allocate memory per-event (only per-batch or static)
15//!
16//! # Available Adapters
17//!
18//! - `NoopAdapter`: Discards events (for testing/benchmarking)
19//! - `RedisAdapter`: Redis Streams backend (requires `redis` feature)
20//! - `JetStreamAdapter`: NATS JetStream backend (requires `jetstream` feature)
21//! - `NetAdapter`: High-performance UDP transport (requires `net` feature)
22
23mod dedup_state;
24mod noop;
25#[cfg(feature = "redis")]
26mod redis_dedup;
27
28pub use dedup_state::PersistentProducerNonce;
29#[cfg(feature = "redis")]
30pub use redis_dedup::RedisStreamDedup;
31
32#[cfg(feature = "redis")]
33mod redis;
34
35#[cfg(feature = "jetstream")]
36mod jetstream;
37
38#[cfg(feature = "net")]
39pub mod net;
40
41pub use noop::NoopAdapter;
42
43#[cfg(feature = "redis")]
44pub use self::redis::RedisAdapter;
45
46#[cfg(feature = "jetstream")]
47pub use self::jetstream::JetStreamAdapter;
48
49#[cfg(feature = "net")]
50pub use self::net::{NetAdapter, NetAdapterConfig};
51
52use std::sync::Arc;
53
54use async_trait::async_trait;
55
56use crate::error::AdapterError;
57use crate::event::{Batch, StoredEvent};
58
/// Strip `user:password@` from a connection URL for safe logging /
/// `Debug` output. Returns a `Cow::Borrowed` when no redaction is
/// needed so the common no-credentials path is allocation-free.
///
/// Both adapter init logs and `Debug` impls previously emitted
/// `config.url` verbatim. A misconfigured operator who put the
/// password in the URL (the canonical Redis / NATS shape) would
/// leak it into every log sink the application uses. Redaction is
/// based on the URI spec: userinfo is the substring between
/// `"://"` and the last `'@'`, scoped to the authority component.
69#[must_use]
70#[cfg(any(feature = "redis", feature = "jetstream"))]
pub(crate) fn redact_url(url: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    // Without a `://` separator there is no authority component to
    // inspect, so the input is handed back untouched (no allocation).
    let Some((scheme, remainder)) = url.split_once("://") else {
        return Cow::Borrowed(url);
    };

    // The authority stops at the first path (`/`), query (`?`) or
    // fragment (`#`) delimiter. Everything from that delimiter onward
    // is copied through verbatim, so an `@` appearing there is never
    // mistaken for the userinfo terminator.
    let authority_len = remainder
        .find(['/', '?', '#'])
        .unwrap_or(remainder.len());
    let (authority, trailer) = remainder.split_at(authority_len);

    // Userinfo is cut at the RIGHTMOST `@` of the authority. Cutting
    // at the first one would leave the tail of a password containing
    // an unencoded `@` visible, e.g. `admin:p@ss@host` would still
    // show `ss`.
    match authority.rfind('@') {
        // No `@` in the authority: nothing to redact.
        None => Cow::Borrowed(url),
        // `authority[at..]` keeps the `@` itself followed by host/port.
        Some(at) => Cow::Owned(
            [scheme, "://", "[REDACTED]", &authority[at..], trailer].concat(),
        ),
    }
}
100
101/// Result of polling a single shard.
102#[derive(Debug, Clone)]
103pub struct ShardPollResult {
104    /// Events retrieved from the shard.
105    pub events: Vec<StoredEvent>,
106    /// Cursor for the next poll (backend-specific).
107    /// None if no events were returned.
108    pub next_id: Option<String>,
109    /// True if there are more events available.
110    pub has_more: bool,
111}
112
113impl ShardPollResult {
114    /// Create an empty poll result.
115    pub fn empty() -> Self {
116        Self {
117            events: Vec::new(),
118            next_id: None,
119            has_more: false,
120        }
121    }
122}
123
124/// Adapter trait for durable event storage.
125///
126/// # Memory Allocation Constraint
127///
128/// Adapters **MUST NOT** allocate memory per-event. Allowed allocations:
129/// - Per-batch buffer allocation (reusable)
130/// - Static/pooled buffers
131/// - Connection resources
132///
133/// Forbidden:
134/// - `Vec::push` per event in hot path
135/// - String allocation per event
136/// - Any heap allocation scaling with event count
137#[async_trait]
138pub trait Adapter: Send + Sync {
139    /// Initialize the adapter.
140    ///
141    /// Called once before any other methods. Use this to establish
142    /// connections, validate configuration, etc.
143    async fn init(&mut self) -> Result<(), AdapterError>;
144
145    /// Process a batch of events.
146    ///
147    /// The adapter must persist all events in the batch atomically
148    /// (all or nothing). Events must be stored in order within the batch.
149    ///
150    /// `batch` is passed as `Arc<Batch>` so the dispatch retry loop
151    /// can clone cheaply (refcount bump) instead of deep-cloning the
152    /// events `Vec` on every attempt — the common path is `retries
153    /// == 0` so the prior `batch.clone()` was almost always wasted.
154    /// Implementations that only read events (the overwhelming
155    /// majority) pay nothing for the wrap; the one that genuinely
156    /// consumes can `Arc::try_unwrap` and fall back to clone on
157    /// contention.
158    ///
159    /// # Errors
160    ///
161    /// - `AdapterError::Transient`: Temporary failure, retry is safe
162    /// - `AdapterError::Fatal`: Unrecoverable error, adapter is broken
163    /// - `AdapterError::Backpressure`: Backend overloaded, slow down
164    async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError>;
165
166    /// Force flush any buffered data.
167    ///
168    /// Some adapters may buffer writes for efficiency. This method
169    /// forces all buffered data to be persisted.
170    async fn flush(&self) -> Result<(), AdapterError>;
171
172    /// Gracefully shut down the adapter.
173    ///
174    /// This should flush any pending data and close connections.
175    async fn shutdown(&self) -> Result<(), AdapterError>;
176
177    /// Poll events from a single shard.
178    ///
179    /// # Parameters
180    ///
181    /// - `shard_id`: The shard to poll
182    /// - `from_id`: Start cursor (exclusive). None means from the beginning.
183    /// - `limit`: Maximum number of events to return
184    ///
185    /// # Returns
186    ///
187    /// A `ShardPollResult` containing the events and pagination info.
188    async fn poll_shard(
189        &self,
190        shard_id: u16,
191        from_id: Option<&str>,
192        limit: usize,
193    ) -> Result<ShardPollResult, AdapterError>;
194
195    /// Get the adapter name (for logging/metrics).
196    fn name(&self) -> &'static str;
197
198    /// Check if the adapter is healthy.
199    ///
200    /// Returns true if the adapter can accept batches.
201    async fn is_healthy(&self) -> bool {
202        true
203    }
204}
205
206/// Wrapper to make `Box<dyn Adapter>` implement Adapter.
207#[async_trait]
208impl Adapter for Box<dyn Adapter> {
209    async fn init(&mut self) -> Result<(), AdapterError> {
210        (**self).init().await
211    }
212
213    async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
214        (**self).on_batch(batch).await
215    }
216
217    async fn flush(&self) -> Result<(), AdapterError> {
218        (**self).flush().await
219    }
220
221    async fn shutdown(&self) -> Result<(), AdapterError> {
222        (**self).shutdown().await
223    }
224
225    async fn poll_shard(
226        &self,
227        shard_id: u16,
228        from_id: Option<&str>,
229        limit: usize,
230    ) -> Result<ShardPollResult, AdapterError> {
231        (**self).poll_shard(shard_id, from_id, limit).await
232    }
233
234    fn name(&self) -> &'static str {
235        (**self).name()
236    }
237
238    async fn is_healthy(&self) -> bool {
239        (**self).is_healthy().await
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::InternalEvent;
    use serde_json::json;
    use std::sync::Arc;

    /// Drives the no-op adapter through init, write, flush, poll and
    /// shutdown.
    #[tokio::test]
    async fn test_noop_adapter() {
        let mut noop = NoopAdapter::new();
        noop.init().await.unwrap();

        let batch = Arc::new(Batch::new(
            0,
            vec![
                InternalEvent::from_value(json!({"test": 1}), 1, 0),
                InternalEvent::from_value(json!({"test": 2}), 2, 0),
            ],
            0,
        ));
        noop.on_batch(batch).await.unwrap();
        noop.flush().await.unwrap();

        // The no-op adapter keeps nothing, so a poll comes back empty.
        let polled = noop.poll_shard(0, None, 10).await.unwrap();
        assert!(polled.events.is_empty());

        noop.shutdown().await.unwrap();
    }

    /// `empty()` yields no events, no cursor, and `has_more == false`.
    #[test]
    fn test_shard_poll_result_empty() {
        let ShardPollResult {
            events,
            next_id,
            has_more,
        } = ShardPollResult::empty();
        assert!(events.is_empty());
        assert!(next_id.is_none());
        assert!(!has_more);
    }

    /// The derived `Debug` output names the type.
    #[test]
    fn test_shard_poll_result_debug() {
        let rendered = format!("{:?}", ShardPollResult::empty());
        assert!(rendered.contains("ShardPollResult"));
    }

    /// Cloning carries the cursor and the `has_more` flag across.
    #[test]
    fn test_shard_poll_result_clone() {
        let original = ShardPollResult {
            next_id: Some(String::from("cursor")),
            has_more: true,
            ..ShardPollResult::empty()
        };

        let copy = original.clone();
        assert_eq!(copy.next_id.as_deref(), Some("cursor"));
        assert!(copy.has_more);
    }

    #[tokio::test]
    async fn test_noop_adapter_name() {
        assert_eq!(NoopAdapter::new().name(), "noop");
    }

    /// Health flips from false to true across `init`.
    #[tokio::test]
    async fn test_noop_adapter_is_healthy() {
        let mut noop = NoopAdapter::new();

        let healthy_before_init = noop.is_healthy().await;
        assert!(!healthy_before_init);

        noop.init().await.unwrap();
        let healthy_after_init = noop.is_healthy().await;
        assert!(healthy_after_init);
    }

    /// Exercises every trait method through a `Box<dyn Adapter>`.
    #[tokio::test]
    async fn test_boxed_adapter() {
        let mut boxed: Box<dyn Adapter> = Box::new(NoopAdapter::new());

        boxed.init().await.unwrap();
        assert_eq!(boxed.name(), "noop");
        assert!(boxed.is_healthy().await);

        let batch = Arc::new(Batch::new(
            0,
            vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)],
            0,
        ));
        boxed.on_batch(batch).await.unwrap();

        boxed.flush().await.unwrap();

        let polled = boxed.poll_shard(0, None, 10).await.unwrap();
        assert!(polled.events.is_empty());

        boxed.shutdown().await.unwrap();
    }

    /// Exercises the `&self` methods through an `Arc<dyn Adapter>`.
    #[tokio::test]
    async fn test_arc_adapter() {
        // `init` needs `&mut self`, so it runs before the adapter is
        // moved into the `Arc`.
        let shared: Arc<dyn Adapter> = {
            let mut noop = NoopAdapter::new();
            noop.init().await.unwrap();
            Arc::new(noop)
        };

        assert_eq!(shared.name(), "noop");
        assert!(shared.is_healthy().await);

        let batch = Arc::new(Batch::new(
            0,
            vec![InternalEvent::from_value(json!({"test": 1}), 1, 0)],
            0,
        ));
        shared.on_batch(batch).await.unwrap();

        shared.flush().await.unwrap();
        shared.shutdown().await.unwrap();
    }

    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_strips_userinfo() {
        // (input, expected) pairs.
        let cases = [
            (
                "redis://user:secret@redis.example.com:6379",
                "redis://[REDACTED]@redis.example.com:6379",
            ),
            // The password holds an unencoded '@': the userinfo ends
            // at the LAST `@` of the authority, so all of
            // `admin:p@ss` is redacted. Splitting on the first `@`
            // would emit `nats://[REDACTED]@ss@nats.svc:4222/...`
            // and leak the password tail `ss`.
            (
                "nats://admin:p@ss@nats.svc:4222/path?foo=1",
                "nats://[REDACTED]@nats.svc:4222/path?foo=1",
            ),
            (
                "rediss://:tokenonly@host:6379",
                "rediss://[REDACTED]@host:6379",
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(redact_url(input), expected);
        }
    }

    /// Regression: `redact_url` has to split on the LAST `@` of the
    /// authority, not the first. Splitting on the first leaves any
    /// password suffix after an unencoded inner `@` visible in every
    /// log line, which is the very leak the redactor exists to stop.
    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_splits_on_last_at_in_authority() {
        // Unencoded `@` in the middle of the password.
        assert_eq!(
            redact_url("redis://user:p@ss:word@redis.svc:6379"),
            "redis://[REDACTED]@redis.svc:6379",
            "the entire userinfo `user:p@ss:word` must redact, not just the first segment",
        );

        let cases = [
            // Username followed by a password with several `@`.
            (
                "nats://op:a@b@c@nats.svc:4222",
                "nats://[REDACTED]@nats.svc:4222",
            ),
            // The first `/` after `://` ends the authority and so
            // bounds the search: an `@` inside the path must NOT
            // become the split point. Were the search not scoped to
            // the authority, the path's `@` would win the `rfind`
            // and the userinfo would leak through.
            (
                "https://user:p@ss@host.example/foo@bar",
                "https://[REDACTED]@host.example/foo@bar",
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(redact_url(input), expected);
        }
    }

    #[cfg(any(feature = "redis", feature = "jetstream"))]
    #[test]
    fn redact_url_passthrough_when_no_userinfo() {
        let unchanged = [
            "redis://redis.svc:6379",
            "nats://nats.svc:4222",
            // An '@' in the path / query is not userinfo and must be
            // left alone.
            "https://example.com/path/@handle",
        ];
        for url in unchanged {
            assert_eq!(redact_url(url), url);
        }
    }
}