faucet_core/
replication.rs1use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::cmp::Ordering;
7
8#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
10#[serde(tag = "type")]
11pub enum ReplicationMethod {
12 #[default]
14 FullTable,
15 Incremental,
18}
19
20pub fn filter_incremental(records: Vec<Value>, key: &str, start: &Value) -> Vec<Value> {
26 records
27 .into_iter()
28 .filter(|r| r.get(key).is_some_and(|v| json_gt(v, start)))
29 .collect()
30}
31
32pub fn max_replication_value<'a>(records: &'a [Value], key: &str) -> Option<&'a Value> {
34 records
35 .iter()
36 .filter_map(|r| r.get(key))
37 .max_by(|a, b| json_compare(a, b))
38}
39
40pub(crate) fn json_compare(a: &Value, b: &Value) -> Ordering {
41 match (a, b) {
42 (Value::Number(an), Value::Number(bn)) => {
43 let af = an.as_f64().unwrap_or(f64::NEG_INFINITY);
44 let bf = bn.as_f64().unwrap_or(f64::NEG_INFINITY);
45 af.partial_cmp(&bf).unwrap_or(Ordering::Equal)
46 }
47 (Value::String(as_), Value::String(bs)) => as_.cmp(bs),
48 _ => Ordering::Equal,
49 }
50}
51
52fn json_gt(a: &Value, b: &Value) -> bool {
53 json_compare(a, b) == Ordering::Greater
54}
55
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Extracts the integer `id` field of every record, in order.
    fn ids(records: &[Value]) -> Vec<i64> {
        records
            .iter()
            .map(|record| record["id"].as_i64().expect("test record has an integer id"))
            .collect()
    }

    // String keys: only values strictly after the start value survive.
    #[test]
    fn test_filter_incremental_strings() {
        let input = vec![
            json!({"id": 1, "updated_at": "2024-01-01"}),
            json!({"id": 2, "updated_at": "2024-06-01"}),
            json!({"id": 3, "updated_at": "2024-12-01"}),
        ];
        let kept = filter_incremental(input, "updated_at", &json!("2024-06-01"));
        assert_eq!(ids(&kept), vec![3]);
    }

    // Numeric keys: survivors keep their original order.
    #[test]
    fn test_filter_incremental_numbers() {
        let input = vec![
            json!({"id": 1, "seq": 100}),
            json!({"id": 2, "seq": 200}),
            json!({"id": 3, "seq": 300}),
        ];
        let kept = filter_incremental(input, "seq", &json!(150));
        assert_eq!(ids(&kept), vec![2, 3]);
    }

    // A record without the replication key is dropped.
    #[test]
    fn test_filter_incremental_missing_key_excluded() {
        let input = vec![
            json!({"id": 1}),
            json!({"id": 2, "updated_at": "2024-12-01"}),
        ];
        let kept = filter_incremental(input, "updated_at", &json!("2024-01-01"));
        assert_eq!(ids(&kept), vec![2]);
    }

    // A record whose key equals the start value is dropped (strict comparison).
    #[test]
    fn test_filter_incremental_equal_excluded() {
        let input = vec![
            json!({"id": 1, "updated_at": "2024-06-01"}),
            json!({"id": 2, "updated_at": "2024-06-02"}),
        ];
        let kept = filter_incremental(input, "updated_at", &json!("2024-06-01"));
        assert_eq!(ids(&kept), vec![2]);
    }

    #[test]
    fn test_max_replication_value_strings() {
        let input = vec![
            json!({"updated_at": "2024-01-01"}),
            json!({"updated_at": "2024-12-01"}),
            json!({"updated_at": "2024-06-01"}),
        ];
        let expected = json!("2024-12-01");
        assert_eq!(max_replication_value(&input, "updated_at"), Some(&expected));
    }

    #[test]
    fn test_max_replication_value_numbers() {
        let input = vec![json!({"seq": 5}), json!({"seq": 10}), json!({"seq": 3})];
        let expected = json!(10);
        assert_eq!(max_replication_value(&input, "seq"), Some(&expected));
    }

    #[test]
    fn test_max_replication_value_empty() {
        let input: Vec<Value> = Vec::new();
        assert_eq!(max_replication_value(&input, "updated_at"), None);
    }
}