endr 0.9.0

endr: append-only replicated objects
Documentation
use std::{collections::HashSet, time::Duration};

use futures::{Future, FutureExt, StreamExt};
use mofo::Mofo;
use serde_json::json;
use endr::{Diff, Node, ObjectState, SetItem, MemoryStorageBackend};

fn timeout<T, F: Future<Output = T>>(f: F) -> impl Future<Output = T> {
    tokio::time::timeout(Duration::from_millis(100), f).map(|r| r.expect("Timed out"))
}

#[tokio::test]
async fn can_create_append_and_read_a_log() {
    let node = Node::new(Mofo::new(), MemoryStorageBackend::new());

    let (id, write_access) = node.create_log::<()>(None).await;

    node.append_to_log(id, &write_access, litl::Val::number(1.0)).await;

    match &*timeout(node.load_object(id)).await.borrow() {
        ObjectState::Log(log) => {
            assert_eq!(log.entries, vec![litl::Val::number(1.0)]);
        }
        _ => panic!("Expected a log"),
    }
}

#[tokio::test]
async fn can_create_insert_into_and_read_a_set() {
    let node = Node::new(Mofo::new(), MemoryStorageBackend::new());

    let (id, write_access) = node.create_set::<()>(None).await;

    let data1 = json!([1, 2, 3]);
    let data2 = json!([4, 5, 6]);

    let first_item_id = node
        .insert_into_set(id, &write_access, SetItem::new(data1.clone(), None))
        .await
        .unwrap();
    let _ = node
        .insert_into_set(
            id,
            &write_access,
            SetItem::new(data2.clone(), Some(first_item_id)),
        )
        .await
        .unwrap();

    match &*timeout(node.load_object(id)).await.borrow() {
        ObjectState::Set(set) => {
            assert!(set.header.is_some());
            assert_eq!(
                set.items
                    .values_ordered().iter()
                    .map(|item| item.data.clone())
                    .collect::<HashSet<_>>(),
                [data1, data2]
                    .iter()
                    .map(|d| litl::to_val(d).unwrap())
                    .collect()
            );
        }
        _ => panic!("Expected a set"),
    }
}

#[tokio::test]
async fn can_create_a_blob() {
    let node = Node::new(Mofo::new(), MemoryStorageBackend::new());

    let data = json!([1, 2, 3]);
    let id = node.create_blob(data.clone()).await;

    match &*timeout(node.load_object(id)).await.borrow() {
        ObjectState::Blob(blob) => {
            assert_eq!(blob.data.as_ref().unwrap(), &litl::to_val(&data).unwrap());
        }
        _ => panic!("Expected a blob"),
    }
}

#[tokio::test]
async fn listeners_get_notified_on_local_log_appends() {
    let node = Node::new(Mofo::new(), MemoryStorageBackend::new());

    let (id, write_access) = node.create_log::<()>(None).await;

    let mut diffs = node.diffs(id, "test".to_owned());

    match timeout(diffs.next()).await.unwrap() {
        Diff::Log(diff) => {
            assert!(diff.header.is_some());
            assert_eq!(diff.new_entries, Vec::<litl::Val>::new());
        }
        _ => panic!("Expected a log diff"),
    }

    node.append_to_log(id, &write_access, litl::Val::number(1.0)).await;

    match timeout(diffs.next()).await.unwrap() {
        Diff::Log(diff) => {
            assert_eq!(diff.new_entries, vec![litl::Val::number(1.0)]);
        }
        _ => panic!("Expected a log diff"),
    }
}

#[tokio::test]
async fn listeners_get_notified_with_initial_state() {
    let node = Node::new(Mofo::new(), MemoryStorageBackend::new());

    let (id, write_access) = node.create_log::<()>(None).await;

    node.append_to_log(id, &write_access, litl::Val::number(1.0)).await;
    node.append_to_log(id, &write_access, litl::Val::number(2.0)).await;

    let mut diffs = node.diffs(id, "test".to_owned());

    match timeout(diffs.next()).await.unwrap() {
        Diff::Log(diff) => {
            assert!(diff.header.is_some());
            assert_eq!(diff.new_entries, vec![litl::Val::number(1.0), litl::Val::number(2.0)]);
        }
        _ => panic!("Expected a log diff"),
    }

    node.append_to_log(id, &write_access, litl::Val::number(3.0)).await;

    match timeout(diffs.next()).await.unwrap() {
        Diff::Log(diff) => {
            assert_eq!(diff.new_entries, vec![litl::Val::number(3.0)]);
        }
        _ => panic!("Expected a log diff"),
    }
}