use core::{fmt::Debug, hash::Hash};
use std::{
collections::HashSet,
sync::{mpsc, Arc, Mutex},
};
use dittolive_ditto::dql::{Diff, DiffMove, Differ, QueryResultItem};
use serde_json::json;
use static_assertions::{assert_impl_all, assert_not_impl_any};
mod common;
// Compile-time checks (via the `static_assertions` crate) that pin down which
// traits the diffing types do and do not implement. A failing check is a build
// error for this test crate, not a runtime test failure.

// `DiffMove` must implement every trait listed here, including `Copy`, `Hash` and `Ord`.
assert_impl_all!(DiffMove: Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Send, Sync, Sized);
// `Diff` must be comparable for equality, clonable and default-constructible...
assert_impl_all!(Diff: Debug, Clone, Eq, PartialEq, Default, Send, Sync, Sized);
// ...but must implement neither `Ord` nor `Hash`.
assert_not_impl_any!(Diff: Ord, Hash);
// `Differ` must be `Debug`, default-constructible and shareable across threads...
assert_impl_all!(Differ: Debug, Default, Send, Sync, Sized);
// ...but must implement none of `Eq`, `Ord` or `Hash`.
assert_not_impl_any!(Differ: Eq, Ord, Hash);
/// Smoke test: calling `Differ::new` must complete without panicking.
#[test]
fn differ_initializes_new_instance_with_defaults() {
    // Only the constructor call is exercised; the instance itself is not inspected.
    let _ = Differ::new();
}
/// The first `diff` call on a fresh `Differ` is expected to report every
/// passed-in item as an insertion at its own index.
#[test]
fn diff_returns_diff_between_an_empty_array_and_passed_in_items_on_first_call() {
    // Turns a JSON literal into a query result item, panicking on failure.
    let to_item = |value: serde_json::Value| {
        QueryResultItem::unstable_try_from_serde_json_value(value).unwrap()
    };

    let first_batch = [
        json!({"_id": "1", "value": "A"}),
        json!({"_id": "2", "value": "B"}),
        json!({"_id": "3", "value": "C"}),
    ]
    .into_iter()
    .map(to_item);

    let differ = Differ::new();
    let diff = differ.diff(first_batch);

    // All three indices show up as insertions.
    assert_eq!(diff.insertions, HashSet::<usize>::from([0, 1, 2]));
}
/// Drives a single `Differ` through four consecutive result sets and checks the
/// complete `Diff` produced at each step: an initial insertion, two appended
/// items, an in-place value change, and finally a removal combined with a
/// reordering and another value change.
#[test]
fn diff_returns_diffs_for_a_series_of_simple_updates() {
    // Turns a JSON literal into a query result item, panicking on failure.
    let to_item = |value: serde_json::Value| {
        QueryResultItem::unstable_try_from_serde_json_value(value).unwrap()
    };
    let differ = Differ::new();

    // Step 1: a single document arrives.
    let step_one = [json!({"_id": "1", "value": "A"})]
        .into_iter()
        .map(to_item);
    let actual_one = differ.diff(step_one);
    let mut wanted_one = Diff::empty();
    wanted_one.insertions = HashSet::from([0]);
    assert_eq!(actual_one, wanted_one);

    // Step 2: two more documents are appended after the first one.
    let step_two = [
        json!({"_id": "1", "value": "A"}),
        json!({"_id": "2", "value": "B"}),
        json!({"_id": "3", "value": "C"}),
    ]
    .into_iter()
    .map(to_item);
    let actual_two = differ.diff(step_two);
    let mut wanted_two = Diff::empty();
    wanted_two.insertions = HashSet::from([1, 2]);
    assert_eq!(actual_two, wanted_two);

    // Step 3: the document at index 1 changes its value but keeps its position.
    let step_three = [
        json!({"_id": "1", "value": "A"}),
        json!({"_id": "2", "value": "B.updated"}),
        json!({"_id": "3", "value": "C"}),
    ]
    .into_iter()
    .map(to_item);
    let actual_three = differ.diff(step_three);
    let mut wanted_three = Diff::empty();
    wanted_three.updates = HashSet::from([1]);
    assert_eq!(actual_three, wanted_three);

    // Step 4: document "1" disappears, document "3" moves to the front with a
    // new value, and document "2" stays at index 1.
    let step_four = [
        json!({"_id": "3", "value": "C.moved_up"}),
        json!({"_id": "2", "value": "B.updated"}),
    ]
    .into_iter()
    .map(to_item);
    let actual_four = differ.diff(step_four);
    let mut wanted_four = Diff::empty();
    wanted_four.deletions = HashSet::from([0]);
    wanted_four.updates = HashSet::from([0]);
    wanted_four.moves = HashSet::from([(2, 0), (1, 1)].map(Into::into));
    assert_eq!(actual_four, wanted_four);
}
/// Diffs a ten-item result set against a follow-up set that removes one item,
/// appends one, changes one value and relocates one item, then checks each
/// category of the resulting `Diff` separately.
#[test]
fn diff_returns_diff_between_previous_and_passed_in_items_with_default_ids() {
    // Turns a JSON literal into a query result item, panicking on failure.
    let to_item = |value: serde_json::Value| {
        QueryResultItem::unstable_try_from_serde_json_value(value).unwrap()
    };

    let previous = [
        json!({"_id": "0", "value": "A"}),
        json!({"_id": "1", "value": "B"}),
        json!({"_id": "2", "value": "C"}),
        json!({"_id": "3", "value": "D"}),
        json!({"_id": "4", "value": "E"}),
        json!({"_id": "5", "value": "F"}),
        json!({"_id": "6", "value": "G"}),
        json!({"_id": "7", "value": "H"}),
        json!({"_id": "8", "value": "I"}),
        json!({"_id": "9", "value": "J"}),
    ]
    .into_iter()
    .map(to_item);

    // Relative to `previous`: "7" is gone, "10" is new, "4" has a new value and
    // "1" now sits after "4".
    let current = [
        json!({"_id": "0", "value": "A"}),
        json!({"_id": "2", "value": "C"}),
        json!({"_id": "3", "value": "D"}),
        json!({"_id": "4", "value": "X"}),
        json!({"_id": "1", "value": "B"}),
        json!({"_id": "5", "value": "F"}),
        json!({"_id": "6", "value": "G"}),
        json!({"_id": "8", "value": "I"}),
        json!({"_id": "9", "value": "J"}),
        json!({"_id": "10", "value": "X"}),
    ]
    .into_iter()
    .map(to_item);

    let differ = Differ::new();
    // Prime the differ with the old state; that first diff is not under test.
    let _ = differ.diff(previous);
    let diff = differ.diff(current);

    assert_eq!(diff.insertions, HashSet::from([9]));
    assert_eq!(diff.deletions, HashSet::from([7]));
    assert_eq!(diff.updates, HashSet::from([3]));
    assert_eq!(
        diff.moves,
        HashSet::from([(2, 1), (3, 2), (4, 3), (1, 4)].map(Into::into))
    );
}
/// Feeds every result delivered to a store observer through one shared
/// `Differ` and checks the `Diff` recorded for each of the first four callback
/// invocations: the initial (empty) result, an insert of three documents, an
/// update that changes a document's sort key, and a delete.
///
/// Each wait for a callback is bounded by `CALLBACK_TIMEOUT`, so a callback
/// that never fires fails the test instead of blocking it indefinitely.
#[tokio::test]
async fn diffing_on_a_rolling_basis_across_multiple_store_observer_callback_calls(
) -> anyhow::Result<()> {
    // Upper bound for a single observer callback to arrive. The senders live
    // inside the observer closure, so without a bound a missing callback would
    // leave `recv` blocked forever rather than failing the test.
    const CALLBACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

    let ditto = common::get_ditto(None).unwrap();
    ditto.disable_sync_with_v3().unwrap();
    let store = ditto.store();

    // One channel per expected callback invocation, used purely as a signal.
    let (tx_fired_initial_expectation, rx_fired_initial_expectation) = mpsc::channel();
    let (tx_fired_first_update, rx_fired_first_update) = mpsc::channel();
    let (tx_fired_second_update, rx_fired_second_update) = mpsc::channel();
    let (tx_fired_third_update, rx_fired_third_update) = mpsc::channel();

    // A random collection name keeps this test's documents separate from other data.
    let collection_name = uuid::Uuid::new_v4().to_string();

    let differ = Arc::new(Mutex::new(Differ::new()));
    // Diffs recorded by the observer callback, in invocation order.
    let store_observation_handler_calls = Arc::new(Mutex::new(Vec::new()));
    let differ_clone = Arc::clone(&differ);
    let store_observation_handler_calls_clone = Arc::clone(&store_observation_handler_calls);

    let observer = store
        .register_observer_v2(
            format!("SELECT * from `{}` ORDER BY a ASC", collection_name),
            move |query_result| {
                let diff = differ_clone.lock().unwrap().diff(&query_result);
                let mut calls = store_observation_handler_calls_clone.lock().unwrap();
                calls.push(diff);
                // Signal the test body which invocation has just been recorded.
                match calls.len() {
                    1 => tx_fired_initial_expectation.send(true).unwrap(),
                    2 => tx_fired_first_update.send(true).unwrap(),
                    3 => tx_fired_second_update.send(true).unwrap(),
                    4 => tx_fired_third_update.send(true).unwrap(),
                    _ => {}
                }
            },
        )
        .unwrap();

    // Invocation 1: nothing has been inserted yet, so the diff is expected to be empty.
    assert!(rx_fired_initial_expectation
        .recv_timeout(CALLBACK_TIMEOUT)
        .expect("observer callback did not fire for the initial result"));
    let diff_initial = store_observation_handler_calls.lock().unwrap()[0].clone();
    let expected_initial_diff = Diff::empty();
    assert_eq!(diff_initial, expected_initial_diff);

    store
        .execute_v2((
            format!(
                "INSERT INTO `{}` DOCUMENTS (:doc1), (:doc2), (:doc3)",
                collection_name
            ),
            json!({
                "doc1" : {"_id" : "1", "a" : "A"},
                "doc2" : {"_id" : "2", "a" : "B"},
                "doc3" : {"_id" : "3", "a" : "C"}
            }),
        ))
        .await?;

    // Invocation 2: the three new documents appear as insertions.
    assert!(rx_fired_first_update
        .recv_timeout(CALLBACK_TIMEOUT)
        .expect("observer callback did not fire after the insert"));
    let diff_first_update = store_observation_handler_calls.lock().unwrap()[1].clone();
    let mut expected_first_update_diff = Diff::empty();
    expected_first_update_diff.insertions = HashSet::from([0, 1, 2]);
    assert_eq!(diff_first_update, expected_first_update_diff);

    store
        .execute_v2(format!(
            "UPDATE COLLECTION `{}` SET a = 'Z' WHERE _id = '1'",
            collection_name
        ))
        .await?;

    // Invocation 3: the query orders by `a`, so changing document "1" from 'A'
    // to 'Z' is expected to move it from index 0 to index 2 and shift the
    // other two documents up by one.
    assert!(rx_fired_second_update
        .recv_timeout(CALLBACK_TIMEOUT)
        .expect("observer callback did not fire after the update"));
    let diff_second_update = store_observation_handler_calls.lock().unwrap()[2].clone();
    let mut expected_second_update_diff = Diff::empty();
    expected_second_update_diff.updates = HashSet::from([2]);
    expected_second_update_diff.moves = HashSet::from([(1, 0), (2, 1), (0, 2)].map(Into::into));
    assert_eq!(diff_second_update, expected_second_update_diff);

    store
        .execute_v2(format!("DELETE FROM `{}` WHERE _id = '2'", collection_name))
        .await?;

    // Invocation 4: document "2" was at index 0 after the previous step, so its
    // removal is expected as a deletion at index 0.
    assert!(rx_fired_third_update
        .recv_timeout(CALLBACK_TIMEOUT)
        .expect("observer callback did not fire after the delete"));
    let diff_third_update = store_observation_handler_calls.lock().unwrap()[3].clone();
    let mut expected_third_update_diff = Diff::empty();
    expected_third_update_diff.deletions = HashSet::from([0]);
    assert_eq!(diff_third_update, expected_third_update_diff);

    observer.cancel();
    Ok(())
}