Skip to main content

faucet_core/
traits.rs

1//! Shared traits for faucet sources and sinks.
2
3use crate::error::FaucetError;
4use async_trait::async_trait;
5use serde_json::Value;
6
7/// A source fetches records from an external system.
8#[async_trait]
9pub trait Source: Send + Sync {
10    /// Fetch all records.
11    async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError>;
12
13    /// Fetch all records with incremental replication support.
14    ///
15    /// Returns the records and an optional bookmark value. The bookmark
16    /// should be persisted by the caller and passed back on the next run
17    /// to resume from where the previous run left off.
18    ///
19    /// The default implementation delegates to [`fetch_all`](Self::fetch_all)
20    /// and returns `None` for the bookmark.
21    async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
22        let records = self.fetch_all().await?;
23        Ok((records, None))
24    }
25
26    /// Return a JSON Schema describing the configuration this source accepts.
27    ///
28    /// The schema is auto-generated from the config struct using `schemars`.
29    /// Callers can inspect it to discover required fields, types, defaults,
30    /// and descriptions before constructing the source.
31    ///
32    /// The default returns an empty object schema.
33    fn config_schema(&self) -> Value {
34        serde_json::json!({"type": "object", "properties": {}})
35    }
36}
37
38/// A sink writes records to an external system.
39#[async_trait]
40pub trait Sink: Send + Sync {
41    /// Write a batch of records to the destination.
42    ///
43    /// Returns the number of records successfully written.
44    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError>;
45
46    /// Flush any buffered data to the destination.
47    ///
48    /// The default implementation is a no-op (suitable for sinks that
49    /// write immediately in `write_batch`).
50    async fn flush(&self) -> Result<(), FaucetError> {
51        Ok(())
52    }
53
54    /// Return a JSON Schema describing the configuration this sink accepts.
55    ///
56    /// The schema is auto-generated from the config struct using `schemars`.
57    /// Callers can inspect it to discover required fields, types, defaults,
58    /// and descriptions before constructing the sink.
59    ///
60    /// The default returns an empty object schema.
61    fn config_schema(&self) -> Value {
62        serde_json::json!({"type": "object", "properties": {}})
63    }
64}
65
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Mutex;

    // ── Test doubles: sources ───────────────────────────────────────────────

    /// Source that returns a copy of its canned records and relies on every
    /// default method of the trait.
    struct MockSource {
        records: Vec<Value>,
    }

    #[async_trait]
    impl Source for MockSource {
        async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.to_vec())
        }
    }

    /// Source that overrides the incremental fetch to report a bookmark.
    struct IncrementalSource {
        records: Vec<Value>,
        bookmark: Value,
    }

    #[async_trait]
    impl Source for IncrementalSource {
        async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.to_vec())
        }

        async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
            let records = self.records.clone();
            let bookmark = Some(self.bookmark.clone());
            Ok((records, bookmark))
        }
    }

    /// Source whose fetch always fails with an auth error.
    struct FailingSource;

    #[async_trait]
    impl Source for FailingSource {
        async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
            Err(FaucetError::Auth("no credentials".into()))
        }
    }

    // ── Test doubles: sinks ─────────────────────────────────────────────────

    /// Sink that records everything it is asked to write.
    #[derive(Default)]
    struct MockSink {
        written: Mutex<Vec<Value>>,
    }

    impl MockSink {
        /// Create a sink with nothing written yet.
        fn new() -> Self {
            Self::default()
        }
    }

    #[async_trait]
    impl Sink for MockSink {
        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
            // The guard is a temporary, so the lock is released at the end
            // of this statement.
            self.written.lock().unwrap().extend_from_slice(records);
            Ok(records.len())
        }
    }

    /// Sink whose writes always fail.
    struct FailingSink;

    #[async_trait]
    impl Sink for FailingSink {
        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
            Err(FaucetError::Sink("write failed".into()))
        }
    }

    // ── Source behaviour ────────────────────────────────────────────────────

    #[tokio::test]
    async fn source_fetch_all_returns_records() {
        let canned = vec![json!({"id": 1}), json!({"id": 2})];
        let source = MockSource { records: canned };

        let fetched = source.fetch_all().await.unwrap();

        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0]["id"], 1);
    }

    #[tokio::test]
    async fn source_fetch_all_empty() {
        let source = MockSource { records: vec![] };

        let fetched = source.fetch_all().await.unwrap();

        assert!(fetched.is_empty());
    }

    #[tokio::test]
    async fn source_default_incremental_returns_none_bookmark() {
        let canned = vec![json!({"id": 1})];
        let source = MockSource { records: canned };

        let (fetched, bookmark) = source.fetch_all_incremental().await.unwrap();

        assert_eq!(fetched.len(), 1);
        assert!(bookmark.is_none());
    }

    #[tokio::test]
    async fn source_custom_incremental_returns_bookmark() {
        let source = IncrementalSource {
            records: vec![json!({"id": 1})],
            bookmark: json!("2024-12-01"),
        };

        let (fetched, bookmark) = source.fetch_all_incremental().await.unwrap();

        assert_eq!(fetched.len(), 1);
        assert_eq!(bookmark, Some(json!("2024-12-01")));
    }

    #[tokio::test]
    async fn source_error_propagates() {
        let outcome = FailingSource.fetch_all().await;

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Auth(_))));
    }

    #[tokio::test]
    async fn source_as_trait_object() {
        let canned = vec![json!({"id": 42})];
        let boxed: Box<dyn Source> = Box::new(MockSource { records: canned });

        let fetched = boxed.fetch_all().await.unwrap();

        assert_eq!(fetched[0]["id"], 42);
    }

    // ── Sink behaviour ──────────────────────────────────────────────────────

    #[tokio::test]
    async fn sink_write_batch_returns_count() {
        let sink = MockSink::new();
        let batch = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];

        let written = sink.write_batch(&batch).await.unwrap();

        assert_eq!(written, 3);
    }

    #[tokio::test]
    async fn sink_write_batch_empty() {
        let sink = MockSink::new();

        let written = sink.write_batch(&[]).await.unwrap();

        assert_eq!(written, 0);
    }

    #[tokio::test]
    async fn sink_accumulates_records() {
        let sink = MockSink::new();

        sink.write_batch(&[json!({"a": 1})]).await.unwrap();
        sink.write_batch(&[json!({"b": 2})]).await.unwrap();

        assert_eq!(sink.written.lock().unwrap().len(), 2);
    }

    #[tokio::test]
    async fn sink_default_flush_is_noop() {
        let sink = MockSink::new();

        let outcome = sink.flush().await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn sink_error_propagates() {
        let outcome = FailingSink.write_batch(&[json!({"id": 1})]).await;

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn sink_as_trait_object() {
        let boxed: Box<dyn Sink> = Box::new(MockSink::new());

        let written = boxed.write_batch(&[json!({"id": 1})]).await.unwrap();

        assert_eq!(written, 1);
    }
}