mod common;
use std::sync::Arc;
use tempfile::{TempDir, tempdir};
use futures_util::{StreamExt, TryStreamExt};
use vstorage::{
ItemKind,
base::{CreateItemOptions, Storage},
property::Property,
sync::{
Side,
declare::{OnEmpty, StoragePair, SyncedCollection},
execute::Executor,
operation::{ItemOp, Operation, PropertyOp, PropertyOpKind},
plan::{Plan, incremental::IncrementalPlan},
status::StatusDatabase,
},
vdir::VdirStorage,
watch::{Event, EventKind},
};
use common::minimal_icalendar_with_uid;
/// Shared fixture for the incremental-plan tests.
///
/// Creates two vdir calendar storages in fresh temporary directories, adds a
/// `cal` collection holding the same item to each, and then drives a full
/// [`Plan`] to completion through an [`Executor`] backed by an in-memory
/// status database.
///
/// Returns both storages, the status database, the storage pair, and the two
/// `TempDir` handles (callers bind them so they stay alive for the whole
/// test).
///
/// # Panics
///
/// Panics if any step fails; every fallible call is unwrapped.
async fn setup_synced_pair() -> (
    Arc<VdirStorage>,
    Arc<VdirStorage>,
    Arc<StatusDatabase>,
    StoragePair,
    TempDir,
    TempDir,
) {
    /// Opens a calendar vdir storage rooted at the given temporary directory.
    fn open_calendar_vdir(dir: &TempDir) -> Arc<VdirStorage> {
        let builder = VdirStorage::builder(dir.path().to_path_buf().try_into().unwrap()).unwrap();
        Arc::new(builder.build(ItemKind::Calendar))
    }

    let dir_a = tempdir().unwrap();
    let dir_b = tempdir().unwrap();
    let storage_a = open_calendar_vdir(&dir_a);
    let storage_b = open_calendar_vdir(&dir_b);

    storage_a.create_collection("cal").await.unwrap();
    storage_b.create_collection("cal").await.unwrap();

    // The same item is written to both sides before the initial sync runs.
    let seed = minimal_icalendar_with_uid("test-event-uid", "Test Event")
        .unwrap()
        .into();
    let options = CreateItemOptions::default();
    storage_a
        .create_item("cal", &seed, options.clone())
        .await
        .unwrap();
    storage_b.create_item("cal", &seed, options).await.unwrap();

    let mapping = SyncedCollection::direct("cal".parse().unwrap());
    let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
        .with_mapping(mapping)
        .on_empty(OnEmpty::Sync);

    let status = Arc::new(StatusDatabase::open_or_create(":memory:").unwrap());
    let mut full_plan = Plan::new(pair.clone(), Some(status.clone())).await.unwrap();
    let executor = Executor::new(storage_a.clone(), storage_b.clone(), status.clone());

    // Execute every planned operation; both the plan entry and the execution
    // result (a nested result, hence the double unwrap) must succeed.
    while let Some(planned) = full_plan.next().await {
        let operation = planned.unwrap();
        executor
            .execute_operation(operation)
            .await
            .unwrap()
            .unwrap();
    }

    (storage_a, storage_b, status, pair, dir_a, dir_b)
}
/// An incremental plan built from an empty event list yields no operations.
#[tokio::test]
async fn empty_events_yields_no_operations() {
    let (_storage_a, _storage_b, status, pair, _dir_a, _dir_b) = setup_synced_pair().await;

    let no_events = vec![];
    let mut plan = IncrementalPlan::new(pair, Some(status), no_events)
        .await
        .unwrap();

    assert!(plan.next().await.is_none());
}
/// After the item on side A is updated, a storage-level event for side A
/// produces exactly one operation: an item write targeting side B.
#[tokio::test]
async fn storage_event_updates_item() {
    let (storage_a, _storage_b, status, pair, _dir_a, _dir_b) = setup_synced_pair().await;

    // Overwrite the first item on side A, using its current etag.
    let existing = storage_a.get_all_items("cal").await.unwrap();
    let original = &existing[0];
    let replacement = minimal_icalendar_with_uid("test-event-uid", "Modified Event")
        .unwrap()
        .into();
    storage_a
        .update_item(&original.href, &original.etag, &replacement)
        .await
        .unwrap();

    let stream = IncrementalPlan::new(pair, Some(status), vec![(Event::Storage, Side::A)])
        .await
        .unwrap();
    let ops: Vec<Operation> = stream.try_collect().await.unwrap();

    assert_eq!(ops.len(), 1, "Expected exactly one op; got: {:?}", ops);
    assert!(matches!(
        &ops[0],
        Operation::Item(ItemOp::Write(w)) if w.target_side == Side::B
    ));
}
/// After the item on side A is updated, a change event for the `cal`
/// collection on side A produces exactly one operation: an item write
/// targeting side B.
///
/// NOTE(review): the test name suggests an item-level event, but the change
/// is announced via `Event::Collection` — confirm this is intended.
#[tokio::test]
async fn item_change_yields_write_op() {
    let (storage_a, _storage_b, status, pair, _dir_a, _dir_b) = setup_synced_pair().await;

    // Overwrite the first item on side A, using its current etag.
    let existing = storage_a.get_all_items("cal").await.unwrap();
    let original = &existing[0];
    let replacement = minimal_icalendar_with_uid("test-event-uid", "Modified Event")
        .unwrap()
        .into();
    storage_a
        .update_item(&original.href, &original.etag, &replacement)
        .await
        .unwrap();

    let collection_changed = Event::Collection("cal".into(), EventKind::Change);
    let stream = IncrementalPlan::new(pair, Some(status), vec![(collection_changed, Side::A)])
        .await
        .unwrap();
    let ops: Vec<Operation> = stream.try_collect().await.unwrap();

    assert_eq!(ops.len(), 1, "Expected exactly one op; got: {:?}", ops);
    assert!(matches!(
        &ops[0],
        Operation::Item(ItemOp::Write(w)) if w.target_side == Side::B
    ));
}
/// After the item on side A is deleted, an item-level delete event for its
/// href on side A produces exactly one operation: an item delete on side B.
#[tokio::test]
async fn item_delete_yields_delete_op() {
    let (storage_a, _storage_b, status, pair, _dir_a, _dir_b) = setup_synced_pair().await;

    // Remove the first item from side A, using its current etag.
    let existing = storage_a.get_all_items("cal").await.unwrap();
    let removed = &existing[0];
    storage_a
        .delete_item(&removed.href, &removed.etag)
        .await
        .unwrap();

    let item_deleted = Event::Item("cal".into(), removed.href.clone(), EventKind::Delete);
    let stream = IncrementalPlan::new(pair, Some(status), vec![(item_deleted, Side::A)])
        .await
        .unwrap();
    let ops: Vec<Operation> = stream.try_collect().await.unwrap();

    assert_eq!(ops.len(), 1, "Expected exactly one op; got: {:?}", ops);
    assert!(matches!(
        &ops[0],
        Operation::Item(ItemOp::Delete(d)) if d.side == Side::B
    ));
}
/// A collection change event with no modification made after the fixture's
/// initial sync yields no operations.
#[tokio::test]
async fn unchanged_collection_yields_no_ops() {
    let (_storage_a, _storage_b, status, pair, _dir_a, _dir_b) = setup_synced_pair().await;

    let collection_changed = Event::Collection("cal".into(), EventKind::Change);
    let mut plan = IncrementalPlan::new(pair, Some(status), vec![(collection_changed, Side::A)])
        .await
        .unwrap();

    assert!(plan.next().await.is_none());
}
/// A property change event for `DisplayName` on side A produces exactly one
/// operation: a property write of `"Work"` to side B.
///
/// The test also deletes an item on side A without announcing it through any
/// event. Because exactly one operation is asserted, the test additionally
/// checks that no item operation is emitted for that unannounced deletion.
#[tokio::test]
async fn property_event_yields_property_op() {
    let (storage_a, _storage_b, status, pair, _dir_a, _dir_b) = setup_synced_pair().await;

    storage_a
        .set_property("cal", Property::DisplayName, "Work")
        .await
        .unwrap();

    // Unannounced item deletion on the same side; no event is sent for it.
    let existing = storage_a.get_all_items("cal").await.unwrap();
    let removed = &existing[0];
    storage_a
        .delete_item(&removed.href, &removed.etag)
        .await
        .unwrap();

    let property_changed = Event::Property("cal".into(), Property::DisplayName, EventKind::Change);
    let stream = IncrementalPlan::new(pair, Some(status), vec![(property_changed, Side::A)])
        .await
        .unwrap();
    let ops: Vec<Operation> = stream.try_collect().await.unwrap();

    assert_eq!(ops.len(), 1, "Expected exactly one op; got: {:?}", ops);
    assert!(matches!(
        &ops[0],
        Operation::Property(PropertyOp {
            property: Property::DisplayName,
            kind: PropertyOpKind::Write { side: Side::B, value },
            ..
        }) if value == "Work"
    ));
}
/// A collection change event, with no property event alongside it, is enough
/// to pick up a `DisplayName` change on side A: exactly one operation is
/// produced, a property write of `"Work"` to side B.
#[tokio::test]
async fn collection_event_alone_syncs_property_changes() {
    let (storage_a, _storage_b, status, pair, _dir_a, _dir_b) = setup_synced_pair().await;

    storage_a
        .set_property("cal", Property::DisplayName, "Work")
        .await
        .unwrap();

    let collection_changed = Event::Collection("cal".into(), EventKind::Change);
    let stream = IncrementalPlan::new(pair, Some(status), vec![(collection_changed, Side::A)])
        .await
        .unwrap();
    let ops: Vec<Operation> = stream.try_collect().await.unwrap();

    assert_eq!(
        ops.len(),
        1,
        "Expected exactly one operation; got: {:?}",
        ops
    );
    assert!(matches!(
        &ops[0],
        Operation::Property(PropertyOp {
            property: Property::DisplayName,
            kind: PropertyOpKind::Write { side: Side::B, value },
            ..
        }) if value == "Work"
    ));
}
/// Two identical `DisplayName` change events for side A collapse into a
/// single operation: one property write of `"Work"` to side B.
#[tokio::test]
async fn duplicate_property_events_emit_one_operation() {
    let (storage_a, _storage_b, status, pair, _dir_a, _dir_b) = setup_synced_pair().await;

    storage_a
        .set_property("cal", Property::DisplayName, "Work")
        .await
        .unwrap();

    // Build the same (event, side) entry twice.
    let display_name_changed = || {
        (
            Event::Property("cal".into(), Property::DisplayName, EventKind::Change),
            Side::A,
        )
    };
    let events = vec![display_name_changed(), display_name_changed()];

    let stream = IncrementalPlan::new(pair, Some(status), events)
        .await
        .unwrap();
    let ops: Vec<Operation> = stream.try_collect().await.unwrap();

    assert_eq!(ops.len(), 1, "Expected exactly one op; got: {:?}", ops);
    assert!(matches!(
        &ops[0],
        Operation::Property(PropertyOp {
            property: Property::DisplayName,
            kind: PropertyOpKind::Write { side: Side::B, value },
            ..
        }) if value == "Work"
    ));
}