1use crate::error::FaucetError;
4use async_trait::async_trait;
5use serde_json::Value;
6
7#[async_trait]
9pub trait Source: Send + Sync {
10 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError>;
12
13 async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
22 let records = self.fetch_all().await?;
23 Ok((records, None))
24 }
25
26 fn config_schema(&self) -> Value {
34 serde_json::json!({"type": "object", "properties": {}})
35 }
36}
37
38#[async_trait]
40pub trait Sink: Send + Sync {
41 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError>;
45
46 async fn flush(&self) -> Result<(), FaucetError> {
51 Ok(())
52 }
53
54 fn config_schema(&self) -> Value {
62 serde_json::json!({"type": "object", "properties": {}})
63 }
64}
65
66#[cfg(test)]
67mod tests {
68 use super::*;
69 use serde_json::json;
70
71 struct MockSource {
74 records: Vec<Value>,
75 }
76
77 #[async_trait]
78 impl Source for MockSource {
79 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
80 Ok(self.records.clone())
81 }
82 }
83
84 struct IncrementalSource {
85 records: Vec<Value>,
86 bookmark: Value,
87 }
88
89 #[async_trait]
90 impl Source for IncrementalSource {
91 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
92 Ok(self.records.clone())
93 }
94
95 async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
96 Ok((self.records.clone(), Some(self.bookmark.clone())))
97 }
98 }
99
100 struct FailingSource;
101
102 #[async_trait]
103 impl Source for FailingSource {
104 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
105 Err(FaucetError::Auth("no credentials".into()))
106 }
107 }
108
109 struct MockSink {
112 written: std::sync::Mutex<Vec<Value>>,
113 }
114
115 impl MockSink {
116 fn new() -> Self {
117 Self {
118 written: std::sync::Mutex::new(Vec::new()),
119 }
120 }
121 }
122
123 #[async_trait]
124 impl Sink for MockSink {
125 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
126 let mut w = self.written.lock().unwrap();
127 w.extend(records.iter().cloned());
128 Ok(records.len())
129 }
130 }
131
132 struct FailingSink;
133
134 #[async_trait]
135 impl Sink for FailingSink {
136 async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
137 Err(FaucetError::Sink("write failed".into()))
138 }
139 }
140
141 #[tokio::test]
144 async fn source_fetch_all_returns_records() {
145 let source = MockSource {
146 records: vec![json!({"id": 1}), json!({"id": 2})],
147 };
148 let records = source.fetch_all().await.unwrap();
149 assert_eq!(records.len(), 2);
150 assert_eq!(records[0]["id"], 1);
151 }
152
153 #[tokio::test]
154 async fn source_fetch_all_empty() {
155 let source = MockSource { records: vec![] };
156 let records = source.fetch_all().await.unwrap();
157 assert!(records.is_empty());
158 }
159
160 #[tokio::test]
161 async fn source_default_incremental_returns_none_bookmark() {
162 let source = MockSource {
163 records: vec![json!({"id": 1})],
164 };
165 let (records, bookmark) = source.fetch_all_incremental().await.unwrap();
166 assert_eq!(records.len(), 1);
167 assert!(bookmark.is_none());
168 }
169
170 #[tokio::test]
171 async fn source_custom_incremental_returns_bookmark() {
172 let source = IncrementalSource {
173 records: vec![json!({"id": 1})],
174 bookmark: json!("2024-12-01"),
175 };
176 let (records, bookmark) = source.fetch_all_incremental().await.unwrap();
177 assert_eq!(records.len(), 1);
178 assert_eq!(bookmark, Some(json!("2024-12-01")));
179 }
180
181 #[tokio::test]
182 async fn source_error_propagates() {
183 let source = FailingSource;
184 let result = source.fetch_all().await;
185 assert!(result.is_err());
186 assert!(matches!(result, Err(FaucetError::Auth(_))));
187 }
188
189 #[tokio::test]
190 async fn source_as_trait_object() {
191 let source: Box<dyn Source> = Box::new(MockSource {
192 records: vec![json!({"id": 42})],
193 });
194 let records = source.fetch_all().await.unwrap();
195 assert_eq!(records[0]["id"], 42);
196 }
197
198 #[tokio::test]
201 async fn sink_write_batch_returns_count() {
202 let sink = MockSink::new();
203 let records = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
204 let count = sink.write_batch(&records).await.unwrap();
205 assert_eq!(count, 3);
206 }
207
208 #[tokio::test]
209 async fn sink_write_batch_empty() {
210 let sink = MockSink::new();
211 let count = sink.write_batch(&[]).await.unwrap();
212 assert_eq!(count, 0);
213 }
214
215 #[tokio::test]
216 async fn sink_accumulates_records() {
217 let sink = MockSink::new();
218 sink.write_batch(&[json!({"a": 1})]).await.unwrap();
219 sink.write_batch(&[json!({"b": 2})]).await.unwrap();
220 let written = sink.written.lock().unwrap();
221 assert_eq!(written.len(), 2);
222 }
223
224 #[tokio::test]
225 async fn sink_default_flush_is_noop() {
226 let sink = MockSink::new();
227 assert!(sink.flush().await.is_ok());
228 }
229
230 #[tokio::test]
231 async fn sink_error_propagates() {
232 let sink = FailingSink;
233 let result = sink.write_batch(&[json!({"id": 1})]).await;
234 assert!(result.is_err());
235 assert!(matches!(result, Err(FaucetError::Sink(_))));
236 }
237
238 #[tokio::test]
239 async fn sink_as_trait_object() {
240 let sink: Box<dyn Sink> = Box::new(MockSink::new());
241 let count = sink.write_batch(&[json!({"id": 1})]).await.unwrap();
242 assert_eq!(count, 1);
243 }
244}