Skip to main content

faucet_stream/
replication.rs

1//! Incremental replication support.
2
3use serde_json::Value;
4use std::cmp::Ordering;
5
/// Strategy that decides which source records are replicated on a run.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum ReplicationMethod {
    /// Re-fetch every record on every run. This is the default strategy.
    #[default]
    FullTable,
    /// Keep only records whose `replication_key` field is strictly greater than
    /// the stored bookmark (`start_replication_value`).
    Incremental,
}
16
17/// Filter `records` to only those where `record[key] > start`.
18///
19/// Records that are missing the key are excluded.
20/// String values are compared lexicographically (ISO-8601 dates compare correctly this way).
21/// Numeric values are compared as `f64`.
22pub fn filter_incremental(records: Vec<Value>, key: &str, start: &Value) -> Vec<Value> {
23    records
24        .into_iter()
25        .filter(|r| r.get(key).is_some_and(|v| json_gt(v, start)))
26        .collect()
27}
28
29/// Return the maximum value of `record[key]` across all records, if any.
30pub fn max_replication_value<'a>(records: &'a [Value], key: &str) -> Option<&'a Value> {
31    records
32        .iter()
33        .filter_map(|r| r.get(key))
34        .max_by(|a, b| json_compare(a, b))
35}
36
37pub(crate) fn json_compare(a: &Value, b: &Value) -> Ordering {
38    match (a, b) {
39        (Value::Number(an), Value::Number(bn)) => {
40            let af = an.as_f64().unwrap_or(f64::NEG_INFINITY);
41            let bf = bn.as_f64().unwrap_or(f64::NEG_INFINITY);
42            af.partial_cmp(&bf).unwrap_or(Ordering::Equal)
43        }
44        (Value::String(as_), Value::String(bs)) => as_.cmp(bs),
45        _ => Ordering::Equal,
46    }
47}
48
49fn json_gt(a: &Value, b: &Value) -> bool {
50    json_compare(a, b) == Ordering::Greater
51}
52
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Collect the `id` field of each record, preserving order.
    fn ids(records: &[Value]) -> Vec<Value> {
        records.iter().map(|record| record["id"].clone()).collect()
    }

    #[test]
    fn test_filter_incremental_strings() {
        let bookmark = json!("2024-06-01");
        let kept = filter_incremental(
            vec![
                json!({"id": 1, "updated_at": "2024-01-01"}),
                json!({"id": 2, "updated_at": "2024-06-01"}),
                json!({"id": 3, "updated_at": "2024-12-01"}),
            ],
            "updated_at",
            &bookmark,
        );
        assert_eq!(ids(&kept), vec![json!(3)]);
    }

    #[test]
    fn test_filter_incremental_numbers() {
        let bookmark = json!(150);
        let kept = filter_incremental(
            vec![
                json!({"id": 1, "seq": 100}),
                json!({"id": 2, "seq": 200}),
                json!({"id": 3, "seq": 300}),
            ],
            "seq",
            &bookmark,
        );
        assert_eq!(ids(&kept), vec![json!(2), json!(3)]);
    }

    #[test]
    fn test_filter_incremental_missing_key_excluded() {
        let bookmark = json!("2024-01-01");
        let kept = filter_incremental(
            vec![
                json!({"id": 1}),
                json!({"id": 2, "updated_at": "2024-12-01"}),
            ],
            "updated_at",
            &bookmark,
        );
        assert_eq!(ids(&kept), vec![json!(2)]);
    }

    #[test]
    fn test_filter_incremental_equal_excluded() {
        let bookmark = json!("2024-06-01");
        let kept = filter_incremental(
            vec![
                json!({"id": 1, "updated_at": "2024-06-01"}),
                json!({"id": 2, "updated_at": "2024-06-02"}),
            ],
            "updated_at",
            &bookmark,
        );
        assert_eq!(ids(&kept), vec![json!(2)]);
    }

    #[test]
    fn test_max_replication_value_strings() {
        let records = [
            json!({"updated_at": "2024-01-01"}),
            json!({"updated_at": "2024-12-01"}),
            json!({"updated_at": "2024-06-01"}),
        ];
        assert_eq!(
            max_replication_value(&records, "updated_at"),
            Some(&json!("2024-12-01"))
        );
    }

    #[test]
    fn test_max_replication_value_numbers() {
        let records = [json!({"seq": 5}), json!({"seq": 10}), json!({"seq": 3})];
        assert_eq!(max_replication_value(&records, "seq"), Some(&json!(10)));
    }

    #[test]
    fn test_max_replication_value_empty() {
        let records: Vec<Value> = Vec::new();
        assert!(max_replication_value(&records, "updated_at").is_none());
    }
}