use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::cmp::Ordering;
// Selects how a stream is replicated.
//
// Serialized with serde's internally tagged representation: the variant name
// is stored under a `type` field (e.g. `{"type": "FullTable"}`).
//
// NOTE: plain `//` comments are used here on purpose. schemars copies `///`
// doc comments into the generated JSON schema as descriptions, so switching
// to doc comments would change the schema output.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum ReplicationMethod {
// The `Default` variant. Presumably means every record is re-read on each
// run; the exact semantics are defined by callers outside this block.
#[default]
FullTable,
// Presumably pairs with `filter_incremental`, which keeps only records whose
// replication key is strictly greater than a bookmark; confirm against callers.
Incremental,
}
/// Keeps only the records that are newer than the bookmark `start`.
///
/// For each record the value stored under `key` is looked up and handled as
/// follows:
///
/// * no value under `key`: the record is dropped;
/// * the value has a different JSON type than `start`: the record is kept and
///   a warning is logged, so a type change never silently loses data;
/// * otherwise the record is kept only when its value is strictly greater
///   than `start` (a value equal to the bookmark is dropped).
///
/// The relative order of the surviving records is unchanged.
pub fn filter_incremental(records: Vec<Value>, key: &str, start: &Value) -> Vec<Value> {
    // The bookmark does not change while filtering, so classify it once.
    let bookmark_rank = type_rank(start);

    let mut kept = records;
    kept.retain(|record| {
        let candidate = match record.get(key) {
            Some(value) => value,
            None => return false,
        };

        if type_rank(candidate) == bookmark_rank {
            return json_gt(candidate, start);
        }

        tracing::warn!(
            key,
            "incremental replication: record key type does not match the bookmark \
             type; keeping the record to avoid silently dropping data"
        );
        true
    });
    kept
}
pub fn max_replication_value<'a>(records: &'a [Value], key: &str) -> Option<&'a Value> {
records
.iter()
.filter_map(|r| r.get(key))
.max_by(|a, b| json_compare(a, b))
}
/// Returns the greater of two JSON values according to `json_compare`.
///
/// `b` is returned only when `a` is strictly less than `b`; when the two
/// compare as equal, `a` is returned.
pub fn max_value(a: Value, b: Value) -> Value {
    if json_compare(&a, &b) == Ordering::Less {
        b
    } else {
        a
    }
}
/// Maps a JSON value to a small integer identifying its type.
///
/// The resulting order is null < bool < number < string < array < object.
/// It is used both to detect type mismatches and to order values of
/// different types.
fn type_rank(v: &Value) -> u8 {
    match *v {
        Value::Object(..) => 5,
        Value::Array(..) => 4,
        Value::String(..) => 3,
        Value::Number(..) => 2,
        Value::Bool(..) => 1,
        Value::Null => 0,
    }
}
/// Widens an integral JSON number to `i128` so that the full `i64` and `u64`
/// ranges can be compared exactly.
///
/// Returns `None` when the number is representable as neither `i64` nor
/// `u64`.
fn number_as_i128(n: &serde_json::Number) -> Option<i128> {
    if let Some(signed) = n.as_i64() {
        return Some(i128::from(signed));
    }
    n.as_u64().map(i128::from)
}
/// Orders two JSON values.
///
/// Values of the same type are compared as follows:
///
/// * numbers: exactly as integers when both sides are integral, otherwise as
///   `f64`; a side with no comparable `f64` value is treated as NaN, which
///   sorts above every other number and equal to another NaN;
/// * strings and booleans: by their natural ordering;
/// * arrays: element by element, then by length;
/// * objects: by comparing their serialized text;
/// * null: always equal to null.
///
/// Values of different types are ordered by `type_rank`.
pub(crate) fn json_compare(a: &Value, b: &Value) -> Ordering {
    match (a, b) {
        (Value::Null, Value::Null) => Ordering::Equal,
        (Value::Bool(lhs), Value::Bool(rhs)) => lhs.cmp(rhs),
        (Value::String(lhs), Value::String(rhs)) => lhs.cmp(rhs),
        (Value::Number(lhs), Value::Number(rhs)) => {
            // Integer path first: it stays exact beyond f64's 53-bit mantissa.
            if let (Some(left), Some(right)) = (number_as_i128(lhs), number_as_i128(rhs)) {
                return left.cmp(&right);
            }

            let left = lhs.as_f64().unwrap_or(f64::NAN);
            let right = rhs.as_f64().unwrap_or(f64::NAN);
            match left.partial_cmp(&right) {
                Some(ordering) => ordering,
                // `partial_cmp` only fails when a NaN is involved.
                None if left.is_nan() && !right.is_nan() => Ordering::Greater,
                None if right.is_nan() && !left.is_nan() => Ordering::Less,
                None => Ordering::Equal,
            }
        }
        (Value::Array(lhs), Value::Array(rhs)) => lhs
            .iter()
            .zip(rhs)
            .map(|(left, right)| json_compare(left, right))
            .find(|ordering| *ordering != Ordering::Equal)
            // Every shared position is equal: the shorter array sorts first.
            .unwrap_or_else(|| lhs.len().cmp(&rhs.len())),
        (Value::Object(_), Value::Object(_)) => a.to_string().cmp(&b.to_string()),
        _ => type_rank(a).cmp(&type_rank(b)),
    }
}
/// Reports whether `a` is strictly greater than `b` under `json_compare`.
fn json_gt(a: &Value, b: &Value) -> bool {
    matches!(json_compare(a, b), Ordering::Greater)
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Collects the `id` field of every record, in order.
    fn ids(records: &[Value]) -> Vec<Option<i64>> {
        records.iter().map(|record| record["id"].as_i64()).collect()
    }

    /// String keys: only records strictly after the bookmark survive.
    #[test]
    fn test_filter_incremental_strings() {
        let bookmark = json!("2024-06-01");
        let rows = vec![
            json!({"id": 1, "updated_at": "2024-01-01"}),
            json!({"id": 2, "updated_at": "2024-06-01"}),
            json!({"id": 3, "updated_at": "2024-12-01"}),
        ];

        let kept = filter_incremental(rows, "updated_at", &bookmark);

        assert_eq!(ids(&kept), vec![Some(3)]);
    }

    /// Numeric keys: both records above the bookmark survive, in input order.
    #[test]
    fn test_filter_incremental_numbers() {
        let bookmark = json!(150);
        let rows = vec![
            json!({"id": 1, "seq": 100}),
            json!({"id": 2, "seq": 200}),
            json!({"id": 3, "seq": 300}),
        ];

        let kept = filter_incremental(rows, "seq", &bookmark);

        assert_eq!(ids(&kept), vec![Some(2), Some(3)]);
    }

    /// A record without the replication key is dropped.
    #[test]
    fn test_filter_incremental_missing_key_excluded() {
        let bookmark = json!("2024-01-01");
        let rows = vec![
            json!({"id": 1}),
            json!({"id": 2, "updated_at": "2024-12-01"}),
        ];

        let kept = filter_incremental(rows, "updated_at", &bookmark);

        assert_eq!(ids(&kept), vec![Some(2)]);
    }

    /// A record whose key equals the bookmark is dropped (strict comparison).
    #[test]
    fn test_filter_incremental_equal_excluded() {
        let bookmark = json!("2024-06-01");
        let rows = vec![
            json!({"id": 1, "updated_at": "2024-06-01"}),
            json!({"id": 2, "updated_at": "2024-06-02"}),
        ];

        let kept = filter_incremental(rows, "updated_at", &bookmark);

        assert_eq!(ids(&kept), vec![Some(2)]);
    }

    /// The maximum string key is found regardless of record order.
    #[test]
    fn test_max_replication_value_strings() {
        let rows = vec![
            json!({"updated_at": "2024-01-01"}),
            json!({"updated_at": "2024-12-01"}),
            json!({"updated_at": "2024-06-01"}),
        ];

        assert_eq!(
            max_replication_value(&rows, "updated_at"),
            Some(&json!("2024-12-01"))
        );
    }

    /// The maximum numeric key is found regardless of record order.
    #[test]
    fn test_max_replication_value_numbers() {
        let rows = vec![json!({"seq": 5}), json!({"seq": 10}), json!({"seq": 3})];

        assert_eq!(max_replication_value(&rows, "seq"), Some(&json!(10)));
    }

    /// No records means no maximum.
    #[test]
    fn test_max_replication_value_empty() {
        let rows: Vec<Value> = Vec::new();

        assert!(max_replication_value(&rows, "updated_at").is_none());
    }

    #[test]
    fn test_max_value_picks_larger_string() {
        let earlier = json!("2024-01-01");
        let later = json!("2024-06-01");

        assert_eq!(max_value(earlier, later), json!("2024-06-01"));
    }

    #[test]
    fn test_max_value_picks_larger_number() {
        let smaller = json!(5);
        let larger = json!(10);

        assert_eq!(max_value(smaller, larger), json!(10));
    }

    /// `a` wins here because a string outranks a number in `type_rank`, not
    /// merely because the types differ.
    #[test]
    fn test_max_value_returns_a_on_type_mismatch() {
        let text = json!("string");
        let number = json!(5);

        assert_eq!(max_value(text, number), json!("string"));
    }

    /// Integers above 2^53 must be compared exactly, not through `f64`.
    #[test]
    fn filter_incremental_keeps_large_integer_beyond_f64_precision() {
        let two_pow_53 = 1_i64 << 53;
        let bookmark = json!(two_pow_53);
        let rows = vec![
            json!({"id": 1, "seq": two_pow_53 + 1}),
            json!({"id": 2, "seq": two_pow_53 + 2}),
        ];

        let kept = filter_incremental(rows, "seq", &bookmark);

        assert_eq!(
            kept.len(),
            2,
            "both values are strictly greater than 2^53"
        );
    }

    /// 2^53 + 1 and 2^53 collapse to the same `f64`; the comparison must
    /// still tell them apart.
    #[test]
    fn json_compare_distinguishes_large_integers() {
        let two_pow_53 = 1_i64 << 53;
        let larger = json!(two_pow_53 + 1);
        let smaller = json!(two_pow_53);

        assert_eq!(json_compare(&larger, &smaller), Ordering::Greater);
    }

    /// A numeric key checked against a string bookmark is kept, not dropped.
    #[test]
    fn filter_incremental_keeps_records_on_type_mismatch() {
        let bookmark = json!("2024-06-01");
        let rows = vec![json!({"id": 1, "seq": 20_240_701})];

        let kept = filter_incremental(rows, "seq", &bookmark);

        assert_eq!(kept.len(), 1, "type mismatch must not silently drop");
    }
}