use std::{collections::HashSet, time::Duration};
use futures::{Future, FutureExt, StreamExt};
use mofo::Mofo;
use serde_json::json;
use endr::{Diff, Node, ObjectState, SetItem, MemoryStorageBackend};
fn timeout<T, F: Future<Output = T>>(f: F) -> impl Future<Output = T> {
tokio::time::timeout(Duration::from_millis(100), f).map(|r| r.expect("Timed out"))
}
#[tokio::test]
async fn can_create_append_and_read_a_log() {
let node = Node::new(Mofo::new(), MemoryStorageBackend::new());
let (id, write_access) = node.create_log::<()>(None).await;
node.append_to_log(id, &write_access, litl::Val::number(1.0)).await;
match &*timeout(node.load_object(id)).await.borrow() {
ObjectState::Log(log) => {
assert_eq!(log.entries, vec![litl::Val::number(1.0)]);
}
_ => panic!("Expected a log"),
}
}
#[tokio::test]
async fn can_create_insert_into_and_read_a_set() {
let node = Node::new(Mofo::new(), MemoryStorageBackend::new());
let (id, write_access) = node.create_set::<()>(None).await;
let data1 = json!([1, 2, 3]);
let data2 = json!([4, 5, 6]);
let first_item_id = node
.insert_into_set(id, &write_access, SetItem::new(data1.clone(), None))
.await
.unwrap();
let _ = node
.insert_into_set(
id,
&write_access,
SetItem::new(data2.clone(), Some(first_item_id)),
)
.await
.unwrap();
match &*timeout(node.load_object(id)).await.borrow() {
ObjectState::Set(set) => {
assert!(set.header.is_some());
assert_eq!(
set.items
.values_ordered().iter()
.map(|item| item.data.clone())
.collect::<HashSet<_>>(),
[data1, data2]
.iter()
.map(|d| litl::to_val(d).unwrap())
.collect()
);
}
_ => panic!("Expected a set"),
}
}
#[tokio::test]
async fn can_create_a_blob() {
let node = Node::new(Mofo::new(), MemoryStorageBackend::new());
let data = json!([1, 2, 3]);
let id = node.create_blob(data.clone()).await;
match &*timeout(node.load_object(id)).await.borrow() {
ObjectState::Blob(blob) => {
assert_eq!(blob.data.as_ref().unwrap(), &litl::to_val(&data).unwrap());
}
_ => panic!("Expected a blob"),
}
}
#[tokio::test]
async fn listeners_get_notified_on_local_log_appends() {
let node = Node::new(Mofo::new(), MemoryStorageBackend::new());
let (id, write_access) = node.create_log::<()>(None).await;
let mut diffs = node.diffs(id, "test".to_owned());
match timeout(diffs.next()).await.unwrap() {
Diff::Log(diff) => {
assert!(diff.header.is_some());
assert_eq!(diff.new_entries, Vec::<litl::Val>::new());
}
_ => panic!("Expected a log diff"),
}
node.append_to_log(id, &write_access, litl::Val::number(1.0)).await;
match timeout(diffs.next()).await.unwrap() {
Diff::Log(diff) => {
assert_eq!(diff.new_entries, vec![litl::Val::number(1.0)]);
}
_ => panic!("Expected a log diff"),
}
}
#[tokio::test]
async fn listeners_get_notified_with_initial_state() {
let node = Node::new(Mofo::new(), MemoryStorageBackend::new());
let (id, write_access) = node.create_log::<()>(None).await;
node.append_to_log(id, &write_access, litl::Val::number(1.0)).await;
node.append_to_log(id, &write_access, litl::Val::number(2.0)).await;
let mut diffs = node.diffs(id, "test".to_owned());
match timeout(diffs.next()).await.unwrap() {
Diff::Log(diff) => {
assert!(diff.header.is_some());
assert_eq!(diff.new_entries, vec![litl::Val::number(1.0), litl::Val::number(2.0)]);
}
_ => panic!("Expected a log diff"),
}
node.append_to_log(id, &write_access, litl::Val::number(3.0)).await;
match timeout(diffs.next()).await.unwrap() {
Diff::Log(diff) => {
assert_eq!(diff.new_entries, vec![litl::Val::number(3.0)]);
}
_ => panic!("Expected a log diff"),
}
}