1use async_trait::async_trait;
9use std::sync::atomic::{AtomicU64, Ordering};
10
11use crate::adapter::{Adapter, ShardPollResult};
12use crate::error::AdapterError;
13use crate::event::Batch;
14
15#[derive(Debug, Default)]
22pub struct NoopAdapter {
23 batches_received: AtomicU64,
25 events_received: AtomicU64,
27 initialized: std::sync::atomic::AtomicBool,
29}
30
31impl NoopAdapter {
32 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn batches_received(&self) -> u64 {
39 self.batches_received.load(Ordering::Relaxed)
40 }
41
42 pub fn events_received(&self) -> u64 {
44 self.events_received.load(Ordering::Relaxed)
45 }
46
47 pub fn reset(&self) {
49 self.batches_received.store(0, Ordering::Relaxed);
50 self.events_received.store(0, Ordering::Relaxed);
51 }
52}
53
54#[async_trait]
55impl Adapter for NoopAdapter {
56 async fn init(&mut self) -> Result<(), AdapterError> {
57 self.initialized
58 .store(true, std::sync::atomic::Ordering::Release);
59 Ok(())
60 }
61
62 async fn on_batch(&self, batch: std::sync::Arc<Batch>) -> Result<(), AdapterError> {
63 self.batches_received.fetch_add(1, Ordering::Relaxed);
65 self.events_received
66 .fetch_add(batch.len() as u64, Ordering::Relaxed);
67 Ok(())
68 }
69
70 async fn flush(&self) -> Result<(), AdapterError> {
71 Ok(())
73 }
74
75 async fn shutdown(&self) -> Result<(), AdapterError> {
76 Ok(())
78 }
79
80 async fn poll_shard(
81 &self,
82 _shard_id: u16,
83 _from_id: Option<&str>,
84 _limit: usize,
85 ) -> Result<ShardPollResult, AdapterError> {
86 Ok(ShardPollResult::empty())
88 }
89
90 fn name(&self) -> &'static str {
91 "noop"
92 }
93
94 async fn is_healthy(&self) -> bool {
95 self.initialized.load(std::sync::atomic::Ordering::Acquire)
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use super::*;
102 use crate::event::InternalEvent;
103 use serde_json::json;
104
105 #[tokio::test]
106 async fn test_noop_counts() {
107 let mut adapter = NoopAdapter::new();
108 adapter.init().await.unwrap();
109
110 assert_eq!(adapter.batches_received(), 0);
111 assert_eq!(adapter.events_received(), 0);
112
113 let events = vec![
114 InternalEvent::from_value(json!({"a": 1}), 1, 0),
115 InternalEvent::from_value(json!({"a": 2}), 2, 0),
116 InternalEvent::from_value(json!({"a": 3}), 3, 0),
117 ];
118 let batch = Batch::new(0, events, 0);
119
120 adapter.on_batch(std::sync::Arc::new(batch)).await.unwrap();
121
122 assert_eq!(adapter.batches_received(), 1);
123 assert_eq!(adapter.events_received(), 3);
124
125 adapter.reset();
126 assert_eq!(adapter.batches_received(), 0);
127 assert_eq!(adapter.events_received(), 0);
128 }
129
130 #[tokio::test]
131 async fn test_noop_poll_empty() {
132 let mut adapter = NoopAdapter::new();
133 adapter.init().await.unwrap();
134
135 let result = adapter.poll_shard(0, None, 100).await.unwrap();
136 assert!(result.events.is_empty());
137 assert!(!result.has_more);
138 assert!(result.next_id.is_none());
139 }
140}