noatun 0.1.3

Noatun is an in-process, distributed database with materialized view support.
Documentation
use crate::database::DatabaseSettings;
use crate::distributor::{Distributor, DistributorMessage, EphemeralNodeId, Neighborhood};
use crate::mini_pather::MiniPather;
use crate::noatun_instant::Instant;
use crate::tests::CounterMessage;
use crate::{set_test_epoch, Database, MessageId, NoatunTime};
use arcshift::ArcShift;
use datetime_literal::datetime;
use std::iter::once;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;

fn create_app<'a>(
    msgs: impl IntoIterator<
        Item = (
            impl Into<NoatunTime>,
            &'a [NoatunTime],
            i32,
            u32,
            bool, /*local*/
        ),
    >,
) -> Database<CounterMessage> {
    let mut db: Database<CounterMessage> = Database::create_in_memory(
        10000,
        DatabaseSettings {
            mock_time: Some(datetime!(2021-01-01 Z).into()),
            ..Default::default()
        },
    )
    .unwrap();
    let mut sess = db.begin_session_mut().unwrap();

    for (id, parents, inc1, set1, local) in msgs {
        let id: NoatunTime = id.into();
        sess.append_single(
            &CounterMessage {
                id: MessageId::from_parts_for_test(id, 0),
                parent: parents
                    .iter()
                    .copied()
                    .map(|x| MessageId::from_parts_for_test(x, 0))
                    .collect(),
                inc1,
                set1,
            }
            .wrap(sess.current_cutoff_time().unwrap()),
            local,
        )
        .unwrap();
    }
    drop(sess);
    db
}

#[test]
fn test_distributor() {
    set_test_epoch(Instant::now());
    let mut app1 = create_app([(datetime!(2021-01-01 Z), [].as_slice(), 1, 0, true)]);
    let mut app2 = create_app([(datetime!(2021-01-02 Z), [].as_slice(), 1, 0, true)]);

    let mut dist1 = Distributor::new(
        Duration::from_secs(5),
        ArcShift::new(EphemeralNodeId::new(1)),
        Instant::now(),
        None,
        true,
    );
    let mut dist2 = Distributor::new(
        Duration::from_secs(5),
        ArcShift::new(EphemeralNodeId::new(2)),
        Instant::now(),
        None,
        true,
    );

    dist1.neighborhood =
        Neighborhood::new(Instant::now(), Arc::new(RwLock::new(MiniPather::new(1))));
    dist2.neighborhood =
        Neighborhood::new(Instant::now(), Arc::new(RwLock::new(MiniPather::new(2))));
    dist1
        .neighborhood
        .get_insert_peer(EphemeralNodeId::new(2), Instant::now());

    let sess1 = app1.begin_session().unwrap();
    let mut msg1 = dist1.get_periodic_message(&sess1, Instant::now()).unwrap();
    assert_eq!(msg1.len(), 1, "no resync is in progress");
    let msg1 = msg1.pop().unwrap();

    println!("dist1 sent: {msg1:?}");
    let mut result = dist2
        .receive_message2(&mut app2, once(msg1), Instant::now())
        .unwrap();

    println!("dist2 sent: {result:?}");

    insta::assert_debug_snapshot!(result);
    assert_eq!(result.len(), 1);

    let mut result = dist1
        .receive_message2(&mut app1, once(result.pop().unwrap()), Instant::now())
        .unwrap();
    println!("dist1 sent: {result:?}");
    insta::assert_debug_snapshot!(result);
    assert_eq!(result.len(), 1);

    let mut result = dist2
        .receive_message2(&mut app2, once(result.pop().unwrap()), Instant::now())
        .unwrap();
    println!("dist2 sent: {result:?}");
    insta::assert_debug_snapshot!(result);

    let mut result = dist1
        .receive_message2(&mut app1, once(result.pop().unwrap()), Instant::now())
        .unwrap();
    println!("dist1 sent: {result:?}");
    assert!(matches!(
        &result[0],
        DistributorMessage::Message {
            demand_ack: false,
            ..
        }
    ));
    assert_eq!(result.len(), 1);

    let _result = dist2
        .receive_message2(&mut app2, once(result.pop().unwrap()), Instant::now())
        .unwrap();
    let sess2 = app2.begin_session().unwrap();
    println!("App2 all msgs: {:?}", sess2.get_all_message_ids().unwrap());
    println!("App2 update heads: {:?}", sess2.get_update_heads());

    insta::assert_debug_snapshot!(sess2.get_all_message_ids().unwrap());
    insta::assert_debug_snapshot!(sess2.get_update_heads());
}