use std::sync::Arc;
use futures_util::stream::{Stream, StreamExt};
use crate::sync::{
analysis::ResolvedMapping,
conflict::ConflictResolver,
declare::StoragePair,
operation::{ItemOp, Operation, PropertyOp, PropertyOpKind},
status::{MappingUid, StatusDatabase},
};
use super::{PlanError, collection::CollectionGenerator};
pub(super) struct PlanStreamState {
pub(super) pair: StoragePair,
pub(super) status: Option<Arc<StatusDatabase>>,
pub(super) mappings: std::vec::IntoIter<Arc<ResolvedMapping>>,
pub(super) current_collection: Option<CollectionGenerator>,
pub(super) stale_mappings: Option<Vec<MappingUid>>,
}
impl PlanStreamState {
pub(super) async fn next_operation(
mut self,
) -> Option<(Result<Operation, PlanError>, PlanStreamState)> {
if let Some(stale_uids) = self.stale_mappings.take() {
return Some((Ok(Operation::FlushStaleMappings { stale_uids }), self));
}
loop {
if let Some(ref mut collection) = self.current_collection {
if let Some(result) = collection
.next_operation(&self.pair, self.status.as_deref())
.await
{
return Some((result, self));
}
self.current_collection = None;
}
if let Some(mapping) = self.mappings.next() {
match CollectionGenerator::new(&self.pair, mapping, self.status.as_deref()).await {
Ok(Some(collection_plan)) => {
self.current_collection = Some(collection_plan);
}
Ok(None) => {} Err(e) => return Some((Err(e), self)),
}
} else {
return None;
}
}
}
}
pub fn resolve_conflicts<S, R>(
operations: S,
resolver: R,
) -> impl Stream<Item = Result<Operation, PlanError>>
where
S: Stream<Item = Result<Operation, PlanError>>,
R: ConflictResolver + Clone + 'static,
{
operations.then(move |result| {
let resolver = resolver.clone();
async move {
match result {
Ok(operation) => match operation {
Operation::Item(ItemOp::Conflict {
info, mapping_uid, ..
}) => match mapping_uid.resolve().await {
Ok(uid) => Ok(Operation::Item(resolver.resolve_item(info, uid))),
Err(err) => Err(PlanError::from(err)),
},
Operation::Property(PropertyOp {
kind: PropertyOpKind::Conflict { value_a, value_b },
property,
mapping,
mapping_uid,
..
}) => match mapping_uid.resolve().await {
Ok(uid) => Ok(Operation::Property(
resolver.resolve_property(property, value_a, value_b, mapping, uid),
)),
Err(err) => Err(PlanError::from(err)),
},
op => Ok(op),
},
Err(e) => Err(e),
}
}
})
}
#[cfg(test)]
mod test {
use std::{str::FromStr, sync::Arc};
use futures_util::stream::StreamExt;
use tempfile::Builder;
use crate::{
CollectionId, ItemKind,
sync::{
declare::{OnEmpty, StoragePair, SyncedCollection},
operation::{CollectionOp, ItemOp, Operation},
plan::Plan,
status::Side,
},
vdir::VdirStorage,
};
#[tokio::test]
async fn test_stream_no_mappings() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let pair = StoragePair::new(storage_a.clone(), storage_b.clone());
let stream = Plan::new(pair, None).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
assert_eq!(operations.len(), 0);
}
#[tokio::test]
async fn test_stream_simple_mapping() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let collection = CollectionId::from_str("test").unwrap();
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_mapping(SyncedCollection::direct(collection));
let stream = Plan::new(pair, None).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
assert!(!operations.is_empty());
let first = operations.first().unwrap();
assert!(matches!(first, Ok(Operation::Collection(_))));
}
#[tokio::test]
async fn test_collection_creation_before_items() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let collection_id = CollectionId::from_str("test").unwrap();
let collection_path = dir_a.path().join("test");
std::fs::create_dir(&collection_path).unwrap();
std::fs::write(
collection_path.join("item.ics"),
[
"BEGIN:VCALENDAR",
"VERSION:2.0",
"BEGIN:VEVENT",
"UID:test-item",
"DTSTART:20240101T120000Z",
"END:VEVENT",
"END:VCALENDAR",
"",
]
.join("\r\n"),
)
.unwrap();
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_mapping(SyncedCollection::direct(collection_id));
let stream = Plan::new(pair, None).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
let mut collection_create_pos = None;
let mut first_item_pos = None;
for (i, op_result) in operations.iter().enumerate() {
if let Ok(op) = op_result {
match op {
Operation::Collection(
CollectionOp::CreateInOne { .. } | CollectionOp::CreateInBoth { .. },
) => {
assert!(
collection_create_pos.replace(i).is_none(),
"more than one collection creation event"
);
}
Operation::Item(ItemOp::Write(w)) if w.target_side == Side::B => {
assert!(
first_item_pos.replace(i).is_none(),
"more than one item write event"
);
}
_ => {}
}
}
}
let coll_pos = collection_create_pos.unwrap();
let item_pos = first_item_pos.unwrap();
assert!(coll_pos < item_pos);
}
#[tokio::test]
async fn test_collection_deletion_after_items() {
use crate::sync::status::StatusDatabase;
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let db_path = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let collection_id = CollectionId::from_str("test").unwrap();
let collection_path = dir_b.path().join("test");
std::fs::create_dir(&collection_path).unwrap();
std::fs::write(
collection_path.join("item.ics"),
[
"BEGIN:VCALENDAR",
"VERSION:2.0",
"BEGIN:VEVENT",
"UID:test-item",
"DTSTART:20240101T120000Z",
"END:VEVENT",
"END:VCALENDAR",
"",
]
.join("\r\n"),
)
.unwrap();
let status_path = db_path.path().join("status.db");
let status = Arc::new(StatusDatabase::open_or_create(&status_path).unwrap());
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_mapping(SyncedCollection::direct(collection_id.clone()))
.on_empty(OnEmpty::Sync);
{
use crate::sync::execute::Executor;
let stream = Plan::new(pair.clone(), Some(status.clone())).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
Executor::new(|_| {})
.execute_stream(
storage_a.clone(),
storage_b.clone(),
futures_util::stream::iter(operations),
&status,
)
.await
.unwrap()
.unwrap();
}
std::fs::remove_dir_all(&collection_path).unwrap();
let stream = Plan::new(pair, Some(status)).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
let mut first_item_delete_pos = None;
let mut collection_delete_pos = None;
for (i, op_result) in operations.iter().enumerate() {
if let Ok(op) = op_result {
match op {
Operation::Item(ItemOp::Delete(d)) if d.side == Side::A => {
if first_item_delete_pos.is_none() {
first_item_delete_pos = Some(i);
}
}
Operation::Collection(CollectionOp::Delete { .. }) => {
if collection_delete_pos.is_none() {
collection_delete_pos = Some(i);
}
}
_ => {}
}
}
}
let item_pos = first_item_delete_pos.unwrap();
let coll_pos = collection_delete_pos.unwrap();
assert!(
item_pos < coll_pos,
"Item deletion (pos {item_pos}) must come before collection deletion (pos {coll_pos})"
);
}
}