faucet_stream/
replication.rs1use serde_json::Value;
4use std::cmp::Ordering;
5
/// Strategy used when replicating records from a source.
///
/// The default is [`ReplicationMethod::FullTable`]. `Eq` and `Hash` are derived
/// in addition to `PartialEq` so the method can be used as a map/set key and in
/// generic code that requires total equality.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum ReplicationMethod {
    /// Default method.
    // NOTE(review): presumably every record is re-read on each run — confirm
    // against the code that consumes this enum.
    #[default]
    FullTable,
    /// Non-default method.
    // NOTE(review): presumably only records newer than a stored replication
    // value are kept — confirm against the code that consumes this enum.
    Incremental,
}
16
17pub fn filter_incremental(records: Vec<Value>, key: &str, start: &Value) -> Vec<Value> {
23 records
24 .into_iter()
25 .filter(|r| r.get(key).is_some_and(|v| json_gt(v, start)))
26 .collect()
27}
28
29pub fn max_replication_value<'a>(records: &'a [Value], key: &str) -> Option<&'a Value> {
31 records
32 .iter()
33 .filter_map(|r| r.get(key))
34 .max_by(|a, b| json_compare(a, b))
35}
36
37pub(crate) fn json_compare(a: &Value, b: &Value) -> Ordering {
38 match (a, b) {
39 (Value::Number(an), Value::Number(bn)) => {
40 let af = an.as_f64().unwrap_or(f64::NEG_INFINITY);
41 let bf = bn.as_f64().unwrap_or(f64::NEG_INFINITY);
42 af.partial_cmp(&bf).unwrap_or(Ordering::Equal)
43 }
44 (Value::String(as_), Value::String(bs)) => as_.cmp(bs),
45 _ => Ordering::Equal,
46 }
47}
48
49fn json_gt(a: &Value, b: &Value) -> bool {
50 json_compare(a, b) == Ordering::Greater
51}
52
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Returns the `id` field of each record, in order, so a test can check
    /// both how many records survived and which ones in a single comparison.
    fn ids_of(records: &[Value]) -> Vec<Value> {
        records.iter().map(|record| record["id"].clone()).collect()
    }

    #[test]
    fn test_filter_incremental_strings() {
        let bookmark = json!("2024-06-01");
        let input = vec![
            json!({"id": 1, "updated_at": "2024-01-01"}),
            json!({"id": 2, "updated_at": "2024-06-01"}),
            json!({"id": 3, "updated_at": "2024-12-01"}),
        ];

        let kept = filter_incremental(input, "updated_at", &bookmark);

        assert_eq!(ids_of(&kept), vec![json!(3)]);
    }

    #[test]
    fn test_filter_incremental_numbers() {
        let bookmark = json!(150);
        let input = vec![
            json!({"id": 1, "seq": 100}),
            json!({"id": 2, "seq": 200}),
            json!({"id": 3, "seq": 300}),
        ];

        let kept = filter_incremental(input, "seq", &bookmark);

        assert_eq!(ids_of(&kept), vec![json!(2), json!(3)]);
    }

    #[test]
    fn test_filter_incremental_missing_key_excluded() {
        let bookmark = json!("2024-01-01");
        let input = vec![
            json!({"id": 1}),
            json!({"id": 2, "updated_at": "2024-12-01"}),
        ];

        let kept = filter_incremental(input, "updated_at", &bookmark);

        assert_eq!(ids_of(&kept), vec![json!(2)]);
    }

    #[test]
    fn test_filter_incremental_equal_excluded() {
        let bookmark = json!("2024-06-01");
        let input = vec![
            json!({"id": 1, "updated_at": "2024-06-01"}),
            json!({"id": 2, "updated_at": "2024-06-02"}),
        ];

        let kept = filter_incremental(input, "updated_at", &bookmark);

        assert_eq!(ids_of(&kept), vec![json!(2)]);
    }

    #[test]
    fn test_max_replication_value_strings() {
        let input = vec![
            json!({"updated_at": "2024-01-01"}),
            json!({"updated_at": "2024-12-01"}),
            json!({"updated_at": "2024-06-01"}),
        ];

        let newest = max_replication_value(&input, "updated_at");

        assert_eq!(newest, Some(&json!("2024-12-01")));
    }

    #[test]
    fn test_max_replication_value_numbers() {
        let input = vec![json!({"seq": 5}), json!({"seq": 10}), json!({"seq": 3})];

        let highest = max_replication_value(&input, "seq");

        assert_eq!(highest, Some(&json!(10)));
    }

    #[test]
    fn test_max_replication_value_empty() {
        let input: Vec<Value> = Vec::new();

        assert_eq!(max_replication_value(&input, "updated_at"), None);
    }
}