// faucet_core/replication.rs

1//! Incremental replication support.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::cmp::Ordering;
7
// NOTE(review): this type derives `JsonSchema`, and schemars' derive is
// understood to surface `///` text as schema descriptions — so the doc
// comments below are presumably part of the generated schema, not just
// rustdoc. Confirm against the schemars version in use before rewording them.
//
// `#[serde(tag = "type")]` makes this an internally tagged enum: the variant
// name is carried in a `"type"` field of the serialized object.
/// Determines how records are replicated from the source.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum ReplicationMethod {
    /// All records are fetched on every run (default).
    #[default]
    FullTable,
    /// Only records where the `replication_key` field is strictly greater than
    /// the stored bookmark (`start_replication_value`) are kept.
    Incremental,
}
19
20/// Filter `records` to only those where `record[key] > start`.
21///
22/// Records missing the key are excluded. Strings compare lexicographically
23/// (ISO-8601 dates compare correctly this way); integers compare exactly
24/// (no `f64` precision loss); floats compare as `f64`.
25///
26/// If a record's key value is a *different JSON type* than `start` (e.g. a
27/// numeric key against a string bookmark), the comparison is not meaningful;
28/// rather than silently dropping the record — which is data loss (#78/#27) —
29/// it is **kept** and a warning is logged.
30pub fn filter_incremental(records: Vec<Value>, key: &str, start: &Value) -> Vec<Value> {
31    records
32        .into_iter()
33        .filter(|r| match r.get(key) {
34            None => false,
35            Some(v) if type_rank(v) != type_rank(start) => {
36                tracing::warn!(
37                    key,
38                    "incremental replication: record key type does not match the bookmark \
39                     type; keeping the record to avoid silently dropping data"
40                );
41                true
42            }
43            Some(v) => json_gt(v, start),
44        })
45        .collect()
46}
47
48/// Return the maximum value of `record[key]` across all records, if any.
49pub fn max_replication_value<'a>(records: &'a [Value], key: &str) -> Option<&'a Value> {
50    records
51        .iter()
52        .filter_map(|r| r.get(key))
53        .max_by(|a, b| json_compare(a, b))
54}
55
56/// Return the larger of two replication values using the same ordering as
57/// [`max_replication_value`] (string lexicographic, numeric for numbers,
58/// falling back to `a` on type mismatch).
59pub fn max_value(a: Value, b: Value) -> Value {
60    match json_compare(&a, &b) {
61        Ordering::Less => b,
62        _ => a,
63    }
64}
65
66/// Type-rank for a total ordering across JSON value kinds, so comparisons of
67/// differing types are deterministic instead of collapsing to `Equal`.
68fn type_rank(v: &Value) -> u8 {
69    match v {
70        Value::Null => 0,
71        Value::Bool(_) => 1,
72        Value::Number(_) => 2,
73        Value::String(_) => 3,
74        Value::Array(_) => 4,
75        Value::Object(_) => 5,
76    }
77}
78
79/// Exact integer view of a JSON number (`i64` or `u64`), widened to `i128` so
80/// both halves of the range compare without `f64` precision loss. `None` for
81/// non-integral (floating) numbers.
82fn number_as_i128(n: &serde_json::Number) -> Option<i128> {
83    n.as_i64()
84        .map(i128::from)
85        .or_else(|| n.as_u64().map(i128::from))
86}
87
88/// Total ordering over JSON values used for replication bookmarks.
89///
90/// - Numbers: compared exactly as `i128` when both are integral (so cursors
91///   above 2^53 don't lose precision); otherwise as `f64`, with NaN ordered
92///   last.
93/// - Same-type scalars/containers: natural ordering (strings lexicographic,
94///   bools `false < true`, arrays element-wise, objects by serialized form).
95/// - Different types: ordered by [`type_rank`] so the result is always total.
96pub(crate) fn json_compare(a: &Value, b: &Value) -> Ordering {
97    match (a, b) {
98        (Value::Number(an), Value::Number(bn)) => {
99            match (number_as_i128(an), number_as_i128(bn)) {
100                (Some(ai), Some(bi)) => ai.cmp(&bi),
101                _ => {
102                    let af = an.as_f64().unwrap_or(f64::NAN);
103                    let bf = bn.as_f64().unwrap_or(f64::NAN);
104                    af.partial_cmp(&bf).unwrap_or_else(|| {
105                        // At least one NaN — order NaN last, deterministically.
106                        match (af.is_nan(), bf.is_nan()) {
107                            (false, true) => Ordering::Less,
108                            (true, false) => Ordering::Greater,
109                            _ => Ordering::Equal,
110                        }
111                    })
112                }
113            }
114        }
115        (Value::String(x), Value::String(y)) => x.cmp(y),
116        (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
117        (Value::Null, Value::Null) => Ordering::Equal,
118        (Value::Array(x), Value::Array(y)) => {
119            for (xi, yi) in x.iter().zip(y.iter()) {
120                let c = json_compare(xi, yi);
121                if c != Ordering::Equal {
122                    return c;
123                }
124            }
125            x.len().cmp(&y.len())
126        }
127        // Objects have no natural order; use the serialized form for a stable
128        // total order (objects as replication keys are pathological).
129        (Value::Object(_), Value::Object(_)) => a.to_string().cmp(&b.to_string()),
130        // Different JSON types — order by type rank so comparison is total.
131        _ => type_rank(a).cmp(&type_rank(b)),
132    }
133}
134
135fn json_gt(a: &Value, b: &Value) -> bool {
136    json_compare(a, b) == Ordering::Greater
137}
138
#[cfg(test)]
mod tests {
    //! Unit tests for incremental filtering and bookmark selection.

    use super::*;
    use serde_json::json;

    #[test]
    fn test_filter_incremental_strings() {
        let records = vec![
            json!({"id": 1, "updated_at": "2024-01-01"}),
            json!({"id": 2, "updated_at": "2024-06-01"}),
            json!({"id": 3, "updated_at": "2024-12-01"}),
        ];
        let start = json!("2024-06-01");
        let filtered = filter_incremental(records, "updated_at", &start);
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0]["id"], 3);
    }

    #[test]
    fn test_filter_incremental_numbers() {
        let records = vec![
            json!({"id": 1, "seq": 100}),
            json!({"id": 2, "seq": 200}),
            json!({"id": 3, "seq": 300}),
        ];
        let start = json!(150);
        let filtered = filter_incremental(records, "seq", &start);
        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0]["id"], 2);
        assert_eq!(filtered[1]["id"], 3);
    }

    #[test]
    fn test_filter_incremental_missing_key_excluded() {
        let records = vec![
            json!({"id": 1}),
            json!({"id": 2, "updated_at": "2024-12-01"}),
        ];
        let start = json!("2024-01-01");
        let filtered = filter_incremental(records, "updated_at", &start);
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0]["id"], 2);
    }

    #[test]
    fn test_filter_incremental_equal_excluded() {
        // The comparison is strict: a record equal to the bookmark was
        // already replicated and must not be returned again.
        let records = vec![
            json!({"id": 1, "updated_at": "2024-06-01"}),
            json!({"id": 2, "updated_at": "2024-06-02"}),
        ];
        let start = json!("2024-06-01");
        let filtered = filter_incremental(records, "updated_at", &start);
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0]["id"], 2);
    }

    #[test]
    fn test_max_replication_value_strings() {
        let records = vec![
            json!({"updated_at": "2024-01-01"}),
            json!({"updated_at": "2024-12-01"}),
            json!({"updated_at": "2024-06-01"}),
        ];
        let max = max_replication_value(&records, "updated_at").unwrap();
        assert_eq!(max, &json!("2024-12-01"));
    }

    #[test]
    fn test_max_replication_value_numbers() {
        let records = vec![json!({"seq": 5}), json!({"seq": 10}), json!({"seq": 3})];
        let max = max_replication_value(&records, "seq").unwrap();
        assert_eq!(max, &json!(10));
    }

    #[test]
    fn test_max_replication_value_empty() {
        let records: Vec<Value> = vec![];
        assert!(max_replication_value(&records, "updated_at").is_none());
    }

    #[test]
    fn test_max_value_picks_larger_string() {
        assert_eq!(
            max_value(json!("2024-01-01"), json!("2024-06-01")),
            json!("2024-06-01")
        );
    }

    #[test]
    fn test_max_value_picks_larger_number() {
        assert_eq!(max_value(json!(5), json!(10)), json!(10));
    }

    #[test]
    fn test_max_value_orders_by_type_rank_on_type_mismatch() {
        // String outranks Number in the total type-rank ordering, so the
        // string wins regardless of which argument position it is in.
        // Checking both orders distinguishes rank-based ordering from a
        // naive "always return `a` on mismatch" implementation.
        assert_eq!(max_value(json!("string"), json!(5)), json!("string"));
        assert_eq!(max_value(json!(5), json!("string")), json!("string"));
    }

    #[test]
    fn filter_incremental_keeps_large_integer_beyond_f64_precision() {
        // Regression for #78/#27: integer cursors above 2^53 lose precision
        // when compared as f64, so a genuinely-greater value compared Equal
        // and was silently dropped.
        let two_pow_53 = 9_007_199_254_740_992_i64; // 2^53
        let records = vec![
            json!({"id": 1, "seq": two_pow_53 + 1}),
            json!({"id": 2, "seq": two_pow_53 + 2}),
        ];
        let start = json!(two_pow_53);
        let filtered = filter_incremental(records, "seq", &start);
        assert_eq!(
            filtered.len(),
            2,
            "both values are strictly greater than 2^53"
        );
    }

    #[test]
    fn json_compare_distinguishes_large_integers() {
        let a = json!(9_007_199_254_740_993_i64); // 2^53 + 1
        let b = json!(9_007_199_254_740_992_i64); // 2^53
        assert_eq!(json_compare(&a, &b), Ordering::Greater);
    }

    #[test]
    fn filter_incremental_keeps_records_on_type_mismatch() {
        // Regression for #78/#27: a bookmark/key type mismatch must not be
        // silently treated as "not greater" and the record dropped — that is
        // data loss. Keep the record instead.
        let records = vec![json!({"id": 1, "seq": 20_240_701})];
        let start = json!("2024-06-01"); // string bookmark vs numeric key
        let filtered = filter_incremental(records, "seq", &start);
        assert_eq!(filtered.len(), 1, "type mismatch must not silently drop");
    }
}