Skip to main content

faucet_core/
replication.rs

1//! Incremental replication support.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::cmp::Ordering;
7
8/// Determines how records are replicated from the source.
9#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
10#[serde(tag = "type")]
11pub enum ReplicationMethod {
12    /// All records are fetched on every run (default).
13    #[default]
14    FullTable,
15    /// Only records where the `replication_key` field is strictly greater than
16    /// the stored bookmark (`start_replication_value`) are kept.
17    Incremental,
18}
19
20/// Filter `records` to only those where `record[key] > start`.
21///
22/// Records that are missing the key are excluded.
23/// String values are compared lexicographically (ISO-8601 dates compare correctly this way).
24/// Numeric values are compared as `f64`.
25pub fn filter_incremental(records: Vec<Value>, key: &str, start: &Value) -> Vec<Value> {
26    records
27        .into_iter()
28        .filter(|r| r.get(key).is_some_and(|v| json_gt(v, start)))
29        .collect()
30}
31
32/// Return the maximum value of `record[key]` across all records, if any.
33pub fn max_replication_value<'a>(records: &'a [Value], key: &str) -> Option<&'a Value> {
34    records
35        .iter()
36        .filter_map(|r| r.get(key))
37        .max_by(|a, b| json_compare(a, b))
38}
39
40pub(crate) fn json_compare(a: &Value, b: &Value) -> Ordering {
41    match (a, b) {
42        (Value::Number(an), Value::Number(bn)) => {
43            let af = an.as_f64().unwrap_or(f64::NEG_INFINITY);
44            let bf = bn.as_f64().unwrap_or(f64::NEG_INFINITY);
45            af.partial_cmp(&bf).unwrap_or(Ordering::Equal)
46        }
47        (Value::String(as_), Value::String(bs)) => as_.cmp(bs),
48        _ => Ordering::Equal,
49    }
50}
51
52fn json_gt(a: &Value, b: &Value) -> bool {
53    json_compare(a, b) == Ordering::Greater
54}
55
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The `id` of every record, in order, so each test can assert on the
    /// whole surviving set in a single comparison.
    fn ids(records: &[Value]) -> Vec<Value> {
        records.iter().map(|record| record["id"].clone()).collect()
    }

    #[test]
    fn test_filter_incremental_strings() {
        let records = vec![
            json!({"id": 1, "updated_at": "2024-01-01"}),
            json!({"id": 2, "updated_at": "2024-06-01"}),
            json!({"id": 3, "updated_at": "2024-12-01"}),
        ];
        let kept = filter_incremental(records, "updated_at", &json!("2024-06-01"));
        assert_eq!(ids(&kept), vec![json!(3)]);
    }

    #[test]
    fn test_filter_incremental_numbers() {
        let records = vec![
            json!({"id": 1, "seq": 100}),
            json!({"id": 2, "seq": 200}),
            json!({"id": 3, "seq": 300}),
        ];
        let kept = filter_incremental(records, "seq", &json!(150));
        assert_eq!(ids(&kept), vec![json!(2), json!(3)]);
    }

    #[test]
    fn test_filter_incremental_missing_key_excluded() {
        let records = vec![
            json!({"id": 1}),
            json!({"id": 2, "updated_at": "2024-12-01"}),
        ];
        let kept = filter_incremental(records, "updated_at", &json!("2024-01-01"));
        assert_eq!(ids(&kept), vec![json!(2)]);
    }

    #[test]
    fn test_filter_incremental_equal_excluded() {
        let records = vec![
            json!({"id": 1, "updated_at": "2024-06-01"}),
            json!({"id": 2, "updated_at": "2024-06-02"}),
        ];
        let kept = filter_incremental(records, "updated_at", &json!("2024-06-01"));
        assert_eq!(ids(&kept), vec![json!(2)]);
    }

    #[test]
    fn test_max_replication_value_strings() {
        let records = vec![
            json!({"updated_at": "2024-01-01"}),
            json!({"updated_at": "2024-12-01"}),
            json!({"updated_at": "2024-06-01"}),
        ];
        assert_eq!(
            max_replication_value(&records, "updated_at"),
            Some(&json!("2024-12-01"))
        );
    }

    #[test]
    fn test_max_replication_value_numbers() {
        let records = vec![json!({"seq": 5}), json!({"seq": 10}), json!({"seq": 3})];
        assert_eq!(max_replication_value(&records, "seq"), Some(&json!(10)));
    }

    #[test]
    fn test_max_replication_value_empty() {
        let records: Vec<Value> = Vec::new();
        assert_eq!(max_replication_value(&records, "updated_at"), None);
    }
}