dittolive-ditto 4.11.5

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
use core::{fmt::Debug, hash::Hash};
use std::{
    collections::HashSet,
    sync::{mpsc, Arc, Mutex},
};

use dittolive_ditto::dql::{Diff, DiffMove, Differ, QueryResultItem};
use serde_json::json;
use static_assertions::{assert_impl_all, assert_not_impl_any};

mod common;

// === Public API trait impls ===

assert_impl_all!(DiffMove: Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Send, Sync, Sized);

assert_impl_all!(Diff: Debug, Clone, Eq, PartialEq, Default, Send, Sync, Sized);
assert_not_impl_any!(Diff: Ord, Hash); // semver hazards

assert_impl_all!(Differ: Debug, Default, Send, Sync, Sized);
assert_not_impl_any!(Differ: Eq, Ord, Hash); // semver hazards

#[test]
fn differ_initializes_new_instance_with_defaults() {
    let _differ = Differ::new();
}

#[test]
fn diff_returns_diff_between_an_empty_array_and_passed_in_items_on_first_call() {
    let items = vec![
        json!({"_id": "1", "value": "A"}),
        json!({"_id": "2", "value": "B"}),
        json!({"_id": "3", "value": "C"}),
    ]
    .into_iter()
    .map(|jsvalue| QueryResultItem::unstable_try_from_serde_json_value(jsvalue).unwrap());

    let differ = Differ::new();
    let initial_diff = differ.diff(items);
    let insertions: HashSet<usize> = HashSet::from([0, 1, 2]);

    assert_eq!(initial_diff.insertions, insertions);
}

#[test]
fn diff_returns_diffs_for_a_series_of_simple_updates() {
    let items1 = vec![json!({"_id": "1", "value": "A"})]
        .into_iter()
        .map(|jsvalue| QueryResultItem::unstable_try_from_serde_json_value(jsvalue).unwrap());

    let differ = Differ::new();
    let diff1 = differ.diff(items1);
    let mut expected_diff1 = Diff::empty();
    expected_diff1.insertions = HashSet::from_iter([0]);
    assert_eq!(diff1, expected_diff1);

    let items2 = vec![
        json!({"_id": "1", "value": "A"}),
        json!({"_id": "2", "value": "B"}),
        json!({"_id": "3", "value": "C"}),
    ]
    .into_iter()
    .map(|jsvalue| QueryResultItem::unstable_try_from_serde_json_value(jsvalue).unwrap());

    let diff2 = differ.diff(items2);
    let mut expected_diff2 = Diff::empty();
    expected_diff2.insertions = HashSet::from_iter([1, 2]);
    assert_eq!(diff2, expected_diff2);

    let items3 = vec![
        json!({"_id": "1", "value": "A"}),
        json!({"_id": "2", "value": "B.updated"}),
        json!({"_id": "3", "value": "C"}),
    ]
    .into_iter()
    .map(|jsvalue| QueryResultItem::unstable_try_from_serde_json_value(jsvalue).unwrap());

    let diff3 = differ.diff(items3);
    let mut expected_diff3 = Diff::empty();
    expected_diff3.updates = HashSet::from_iter([1]);
    assert_eq!(diff3, expected_diff3);

    let items4 = vec![
        json!({"_id": "3", "value": "C.moved_up"}),
        json!({"_id": "2", "value": "B.updated"}),
    ]
    .into_iter()
    .map(|jsvalue| QueryResultItem::unstable_try_from_serde_json_value(jsvalue).unwrap());

    let diff4 = differ.diff(items4);
    let mut expected_diff4 = Diff::empty();
    expected_diff4.deletions = HashSet::from_iter([0]);
    expected_diff4.updates = HashSet::from_iter([0]);
    expected_diff4.moves = HashSet::from_iter([(2, 0), (1, 1)].map(Into::into));
    assert_eq!(diff4, expected_diff4);
}

#[test]
fn diff_returns_diff_between_previous_and_passed_in_items_with_default_ids() {
    let items_old = vec![
        json!({"_id": "0", "value": "A"}),
        json!({"_id": "1", "value": "B"}), // Will move to 4.
        json!({"_id": "2", "value": "C"}),
        json!({"_id": "3", "value": "D"}),
        json!({"_id": "4", "value": "E"}), // Will be updated.
        json!({"_id": "5", "value": "F"}),
        json!({"_id": "6", "value": "G"}),
        json!({"_id": "7", "value": "H"}), // Will be deleted.
        json!({"_id": "8", "value": "I"}),
        json!({"_id": "9", "value": "J"}),
    ]
    .into_iter()
    .map(|jsvalue| QueryResultItem::unstable_try_from_serde_json_value(jsvalue).unwrap());

    let items_new = vec![
        json!({"_id": "0", "value": "A"}),
        json!({"_id": "2", "value": "C"}),
        json!({"_id": "3", "value": "D"}),
        json!({"_id": "4", "value": "X"}),
        json!({"_id": "1", "value": "B"}),
        json!({"_id": "5", "value": "F"}),
        json!({"_id": "6", "value": "G"}),
        json!({"_id": "8", "value": "I"}),
        json!({"_id": "9", "value": "J"}),
        json!({"_id": "10", "value": "X"}),
    ]
    .into_iter()
    .map(|jsvalue| QueryResultItem::unstable_try_from_serde_json_value(jsvalue).unwrap());

    let differ = Differ::new();
    differ.diff(items_old); // Ignore initial diff
    let diff = differ.diff(items_new);

    let expected_insertions = HashSet::from([9]);
    let expected_deletions = HashSet::from([7]);
    let expected_updates = HashSet::from([3]);
    let expected_moves = HashSet::from([(2, 1), (3, 2), (4, 3), (1, 4)].map(Into::into));

    assert_eq!(diff.insertions, expected_insertions);
    assert_eq!(diff.deletions, expected_deletions);
    assert_eq!(diff.updates, expected_updates);
    assert_eq!(diff.moves, expected_moves);
}

#[tokio::test]
async fn diffing_on_a_rolling_basis_across_multiple_store_observer_callback_calls(
) -> anyhow::Result<()> {
    let ditto = common::get_ditto(None).unwrap();
    ditto.disable_sync_with_v3().unwrap();
    let store = ditto.store();

    let (tx_fired_initial_expectation, rx_fired_initial_expectation) = mpsc::channel();
    let (tx_fired_first_update, rx_fired_first_update) = mpsc::channel();
    let (tx_fired_second_update, rx_fired_second_update) = mpsc::channel();
    let (tx_fired_third_update, rx_fired_third_update) = mpsc::channel();

    let collection_name = uuid::Uuid::new_v4().to_string();
    let differ = Arc::new(Mutex::new(Differ::new()));
    let store_observation_handler_calls = Arc::new(Mutex::new(Vec::new()));

    let differ_clone = Arc::clone(&differ);
    let store_observation_handler_calls_clone = Arc::clone(&store_observation_handler_calls);
    let observer = store
        .register_observer_v2(
            format!("SELECT * from `{}` ORDER BY a ASC", collection_name),
            move |query_result| {
                let diff = differ_clone.lock().unwrap().diff(&query_result);
                let mut calls = store_observation_handler_calls_clone.lock().unwrap();
                calls.push(diff);
                match calls.len() {
                    1 => tx_fired_initial_expectation.send(true).unwrap(),
                    2 => tx_fired_first_update.send(true).unwrap(),
                    3 => tx_fired_second_update.send(true).unwrap(),
                    4 => tx_fired_third_update.send(true).unwrap(),
                    _ => {}
                }
            },
        )
        .unwrap();

    assert!(rx_fired_initial_expectation.recv().unwrap());
    let diff_initial = store_observation_handler_calls.lock().unwrap()[0].clone();
    let expected_initial_diff = Diff::empty();
    assert_eq!(diff_initial, expected_initial_diff);

    store
        .execute_v2((
            format!(
                "INSERT INTO `{}` DOCUMENTS (:doc1), (:doc2), (:doc3)",
                collection_name
            ),
            json!({
                         "doc1" : {"_id" : "1", "a" : "A"},
                         "doc2" : {"_id" : "2", "a" : "B"},
                         "doc3" : {"_id" : "3", "a" : "C"}
            }),
        ))
        .await?;

    // Wait for the first update
    assert!(rx_fired_first_update.recv().unwrap());
    let diff_first_update = store_observation_handler_calls.lock().unwrap()[1].clone();
    let mut expected_first_update_diff = Diff::empty();
    expected_first_update_diff.insertions = HashSet::from([0, 1, 2]);
    assert_eq!(diff_first_update, expected_first_update_diff);

    store
        .execute_v2(format!(
            "UPDATE COLLECTION `{}` SET a = 'Z' WHERE _id = '1'",
            collection_name
        ))
        .await?;

    assert!(rx_fired_second_update.recv().unwrap());
    let diff_second_update = store_observation_handler_calls.lock().unwrap()[2].clone();
    let mut expected_second_update_diff = Diff::empty();
    expected_second_update_diff.updates = HashSet::from([2]);
    expected_second_update_diff.moves = HashSet::from([(1, 0), (2, 1), (0, 2)].map(Into::into));
    assert_eq!(diff_second_update, expected_second_update_diff);

    store
        .execute_v2(format!("DELETE FROM `{}` WHERE _id = '2'", collection_name))
        .await?;

    assert!(rx_fired_third_update.recv().unwrap());
    let diff_third_update = store_observation_handler_calls.lock().unwrap()[3].clone();
    let mut expected_third_update_diff = Diff::empty();
    expected_third_update_diff.deletions = HashSet::from([0]);
    assert_eq!(diff_third_update, expected_third_update_diff);

    observer.cancel();

    Ok(())
}