Skip to main content

net/adapter/
noop.rs

1//! No-op adapter for testing and benchmarking.
2//!
3//! This adapter discards all events. Useful for:
4//! - Benchmarking ingestion throughput without backend overhead
5//! - Testing the event bus without a real backend
6//! - Development and prototyping
7
8use async_trait::async_trait;
9use std::sync::atomic::{AtomicU64, Ordering};
10
11use crate::adapter::{Adapter, ShardPollResult};
12use crate::error::AdapterError;
13use crate::event::Batch;
14
15/// No-op adapter that discards all events.
16///
17/// This adapter is useful for:
18/// - Measuring pure ingestion throughput
19/// - Testing without a backend
20/// - Development/prototyping
21#[derive(Debug, Default)]
22pub struct NoopAdapter {
23    /// Count of batches received (for testing).
24    batches_received: AtomicU64,
25    /// Count of events received (for testing).
26    events_received: AtomicU64,
27    /// Whether the adapter has been initialized.
28    initialized: std::sync::atomic::AtomicBool,
29}
30
31impl NoopAdapter {
32    /// Create a new no-op adapter.
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    /// Get the number of batches received.
38    pub fn batches_received(&self) -> u64 {
39        self.batches_received.load(Ordering::Relaxed)
40    }
41
42    /// Get the number of events received.
43    pub fn events_received(&self) -> u64 {
44        self.events_received.load(Ordering::Relaxed)
45    }
46
47    /// Reset counters.
48    pub fn reset(&self) {
49        self.batches_received.store(0, Ordering::Relaxed);
50        self.events_received.store(0, Ordering::Relaxed);
51    }
52}
53
54#[async_trait]
55impl Adapter for NoopAdapter {
56    async fn init(&mut self) -> Result<(), AdapterError> {
57        self.initialized
58            .store(true, std::sync::atomic::Ordering::Release);
59        Ok(())
60    }
61
62    async fn on_batch(&self, batch: std::sync::Arc<Batch>) -> Result<(), AdapterError> {
63        // Just count, don't store
64        self.batches_received.fetch_add(1, Ordering::Relaxed);
65        self.events_received
66            .fetch_add(batch.len() as u64, Ordering::Relaxed);
67        Ok(())
68    }
69
70    async fn flush(&self) -> Result<(), AdapterError> {
71        // Nothing to flush
72        Ok(())
73    }
74
75    async fn shutdown(&self) -> Result<(), AdapterError> {
76        // Nothing to clean up
77        Ok(())
78    }
79
80    async fn poll_shard(
81        &self,
82        _shard_id: u16,
83        _from_id: Option<&str>,
84        _limit: usize,
85    ) -> Result<ShardPollResult, AdapterError> {
86        // No events stored
87        Ok(ShardPollResult::empty())
88    }
89
90    fn name(&self) -> &'static str {
91        "noop"
92    }
93
94    async fn is_healthy(&self) -> bool {
95        self.initialized.load(std::sync::atomic::Ordering::Acquire)
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102    use crate::event::InternalEvent;
103    use serde_json::json;
104
105    #[tokio::test]
106    async fn test_noop_counts() {
107        let mut adapter = NoopAdapter::new();
108        adapter.init().await.unwrap();
109
110        assert_eq!(adapter.batches_received(), 0);
111        assert_eq!(adapter.events_received(), 0);
112
113        let events = vec![
114            InternalEvent::from_value(json!({"a": 1}), 1, 0),
115            InternalEvent::from_value(json!({"a": 2}), 2, 0),
116            InternalEvent::from_value(json!({"a": 3}), 3, 0),
117        ];
118        let batch = Batch::new(0, events, 0);
119
120        adapter.on_batch(std::sync::Arc::new(batch)).await.unwrap();
121
122        assert_eq!(adapter.batches_received(), 1);
123        assert_eq!(adapter.events_received(), 3);
124
125        adapter.reset();
126        assert_eq!(adapter.batches_received(), 0);
127        assert_eq!(adapter.events_received(), 0);
128    }
129
130    #[tokio::test]
131    async fn test_noop_poll_empty() {
132        let mut adapter = NoopAdapter::new();
133        adapter.init().await.unwrap();
134
135        let result = adapter.poll_shard(0, None, 100).await.unwrap();
136        assert!(result.events.is_empty());
137        assert!(!result.has_more);
138        assert!(result.next_id.is_none());
139    }
140}