1use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::cmp::Ordering;
7
// Selects how a stream is replicated.
//
// With `#[serde(tag = "type")]` the value is internally tagged: the variant
// name is carried under the `"type"` key when (de)serialized.
//
// NOTE(review): plain `//` comments are used here on purpose. schemars copies
// `///` doc comments into the derived JSON schema as descriptions, so
// converting these would change the generated `JsonSchema` output — confirm
// that is wanted before doing so.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum ReplicationMethod {
    // The `Default` value of this enum (marked with `#[default]`).
    // Presumably means every record is re-read on each run — TODO confirm
    // against the callers.
    #[default]
    FullTable,
    // Presumably means only records past a stored bookmark are read (see
    // `filter_incremental` in this file) — TODO confirm against the callers.
    Incremental,
}
19
20pub fn filter_incremental(records: Vec<Value>, key: &str, start: &Value) -> Vec<Value> {
31 records
32 .into_iter()
33 .filter(|r| match r.get(key) {
34 None => false,
35 Some(v) if type_rank(v) != type_rank(start) => {
36 tracing::warn!(
37 key,
38 "incremental replication: record key type does not match the bookmark \
39 type; keeping the record to avoid silently dropping data"
40 );
41 true
42 }
43 Some(v) => json_gt(v, start),
44 })
45 .collect()
46}
47
48pub fn max_replication_value<'a>(records: &'a [Value], key: &str) -> Option<&'a Value> {
50 records
51 .iter()
52 .filter_map(|r| r.get(key))
53 .max_by(|a, b| json_compare(a, b))
54}
55
56pub fn max_value(a: Value, b: Value) -> Value {
60 match json_compare(&a, &b) {
61 Ordering::Less => b,
62 _ => a,
63 }
64}
65
66fn type_rank(v: &Value) -> u8 {
69 match v {
70 Value::Null => 0,
71 Value::Bool(_) => 1,
72 Value::Number(_) => 2,
73 Value::String(_) => 3,
74 Value::Array(_) => 4,
75 Value::Object(_) => 5,
76 }
77}
78
79fn number_as_i128(n: &serde_json::Number) -> Option<i128> {
83 n.as_i64()
84 .map(i128::from)
85 .or_else(|| n.as_u64().map(i128::from))
86}
87
88pub(crate) fn json_compare(a: &Value, b: &Value) -> Ordering {
97 match (a, b) {
98 (Value::Number(an), Value::Number(bn)) => {
99 match (number_as_i128(an), number_as_i128(bn)) {
100 (Some(ai), Some(bi)) => ai.cmp(&bi),
101 _ => {
102 let af = an.as_f64().unwrap_or(f64::NAN);
103 let bf = bn.as_f64().unwrap_or(f64::NAN);
104 af.partial_cmp(&bf).unwrap_or_else(|| {
105 match (af.is_nan(), bf.is_nan()) {
107 (false, true) => Ordering::Less,
108 (true, false) => Ordering::Greater,
109 _ => Ordering::Equal,
110 }
111 })
112 }
113 }
114 }
115 (Value::String(x), Value::String(y)) => x.cmp(y),
116 (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
117 (Value::Null, Value::Null) => Ordering::Equal,
118 (Value::Array(x), Value::Array(y)) => {
119 for (xi, yi) in x.iter().zip(y.iter()) {
120 let c = json_compare(xi, yi);
121 if c != Ordering::Equal {
122 return c;
123 }
124 }
125 x.len().cmp(&y.len())
126 }
127 (Value::Object(_), Value::Object(_)) => a.to_string().cmp(&b.to_string()),
130 _ => type_rank(a).cmp(&type_rank(b)),
132 }
133}
134
135fn json_gt(a: &Value, b: &Value) -> bool {
136 json_compare(a, b) == Ordering::Greater
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// 2^53; the next integer up (2^53 + 1) has no exact `f64` representation.
    const TWO_POW_53: i64 = 9_007_199_254_740_992;

    /// Collects the integer `id` of every record, in order.
    fn ids(records: &[Value]) -> Vec<i64> {
        records
            .iter()
            .map(|record| record["id"].as_i64().expect("record has an integer id"))
            .collect()
    }

    #[test]
    fn test_filter_incremental_strings() {
        let bookmark = json!("2024-06-01");
        let input = vec![
            json!({"id": 1, "updated_at": "2024-01-01"}),
            json!({"id": 2, "updated_at": "2024-06-01"}),
            json!({"id": 3, "updated_at": "2024-12-01"}),
        ];

        let kept = filter_incremental(input, "updated_at", &bookmark);

        assert_eq!(ids(&kept), [3]);
    }

    #[test]
    fn test_filter_incremental_numbers() {
        let bookmark = json!(150);
        let input = vec![
            json!({"id": 1, "seq": 100}),
            json!({"id": 2, "seq": 200}),
            json!({"id": 3, "seq": 300}),
        ];

        let kept = filter_incremental(input, "seq", &bookmark);

        assert_eq!(ids(&kept), [2, 3]);
    }

    #[test]
    fn test_filter_incremental_missing_key_excluded() {
        let bookmark = json!("2024-01-01");
        let input = vec![
            json!({"id": 1}),
            json!({"id": 2, "updated_at": "2024-12-01"}),
        ];

        let kept = filter_incremental(input, "updated_at", &bookmark);

        assert_eq!(ids(&kept), [2]);
    }

    #[test]
    fn test_filter_incremental_equal_excluded() {
        let bookmark = json!("2024-06-01");
        let input = vec![
            json!({"id": 1, "updated_at": "2024-06-01"}),
            json!({"id": 2, "updated_at": "2024-06-02"}),
        ];

        let kept = filter_incremental(input, "updated_at", &bookmark);

        assert_eq!(ids(&kept), [2]);
    }

    #[test]
    fn test_max_replication_value_strings() {
        let input = vec![
            json!({"updated_at": "2024-01-01"}),
            json!({"updated_at": "2024-12-01"}),
            json!({"updated_at": "2024-06-01"}),
        ];

        let largest = max_replication_value(&input, "updated_at").unwrap();

        assert_eq!(largest, &json!("2024-12-01"));
    }

    #[test]
    fn test_max_replication_value_numbers() {
        let input = vec![json!({"seq": 5}), json!({"seq": 10}), json!({"seq": 3})];

        let largest = max_replication_value(&input, "seq").unwrap();

        assert_eq!(largest, &json!(10));
    }

    #[test]
    fn test_max_replication_value_empty() {
        let input: Vec<Value> = Vec::new();

        assert!(max_replication_value(&input, "updated_at").is_none());
    }

    #[test]
    fn test_max_value_picks_larger_string() {
        let winner = max_value(json!("2024-01-01"), json!("2024-06-01"));

        assert_eq!(winner, json!("2024-06-01"));
    }

    #[test]
    fn test_max_value_picks_larger_number() {
        let winner = max_value(json!(5), json!(10));

        assert_eq!(winner, json!(10));
    }

    #[test]
    fn test_max_value_returns_a_on_type_mismatch() {
        // A string ranks above a number in the cross-type ordering, so the
        // first argument is not "less" and is returned.
        let winner = max_value(json!("string"), json!(5));

        assert_eq!(winner, json!("string"));
    }

    #[test]
    fn filter_incremental_keeps_large_integer_beyond_f64_precision() {
        let bookmark = json!(TWO_POW_53);
        let input = vec![
            json!({"id": 1, "seq": TWO_POW_53 + 1}),
            json!({"id": 2, "seq": TWO_POW_53 + 2}),
        ];

        let kept = filter_incremental(input, "seq", &bookmark);

        assert_eq!(
            kept.len(),
            2,
            "both values are strictly greater than 2^53"
        );
    }

    #[test]
    fn json_compare_distinguishes_large_integers() {
        let larger = json!(TWO_POW_53 + 1);
        let smaller = json!(TWO_POW_53);

        assert_eq!(json_compare(&larger, &smaller), Ordering::Greater);
    }

    #[test]
    fn filter_incremental_keeps_records_on_type_mismatch() {
        // Numeric key value against a string bookmark.
        let bookmark = json!("2024-06-01");
        let input = vec![json!({"id": 1, "seq": 20_240_701})];

        let kept = filter_incremental(input, "seq", &bookmark);

        assert_eq!(kept.len(), 1, "type mismatch must not silently drop");
    }
}