use std::sync::Arc;
use futures_util::{Stream, StreamExt as _};
use crate::{
Href,
property::Property,
sync::{
Side,
items::ItemWithData,
mapping::ResolvedMapping,
operation::{
ItemOp, MappingUidSource, Operation, PropertyOp, PropertyOpKind, StatusWrite,
StorageWrite, WriteItem,
},
plan::PlanError,
status::{MappingUid, StatusVersions},
},
};
pub fn resolve_conflicts<S, R>(
operations: S,
resolver: R,
) -> impl Stream<Item = Result<Operation, PlanError>>
where
S: Stream<Item = Result<Operation, PlanError>>,
R: ConflictResolver + Clone + 'static,
{
operations.then(move |result| {
let resolver = resolver.clone();
async move {
match result {
Ok(operation) => match operation {
Operation::Item(ItemOp::Conflict {
info, mapping_uid, ..
}) => match mapping_uid.resolve().await {
Ok(uid) => Ok(Operation::Item(resolver.resolve_item(info, uid))),
Err(err) => Err(PlanError::from(err)),
},
Operation::Property(PropertyOp {
kind: PropertyOpKind::Conflict { value_a, value_b },
property,
mapping,
mapping_uid,
..
}) => match mapping_uid.resolve().await {
Ok(uid) => Ok(Operation::Property(
resolver.resolve_property(property, value_a, value_b, mapping, uid),
)),
Err(err) => Err(PlanError::from(err)),
},
op => Ok(op),
},
Err(e) => Err(e),
}
}
})
}
#[derive(PartialEq, Debug, Clone)]
pub struct ConflictInfo {
pub a: ItemWithData,
pub b: ItemWithData,
pub old: Option<StatusVersions>,
pub collection_a: Href,
pub collection_b: Href,
}
pub trait ConflictResolver: Send + Sync {
fn resolve_item(&self, conflict: ConflictInfo, mapping_uid: MappingUid) -> ItemOp;
fn resolve_property(
&self,
property: Property,
value_a: String,
value_b: String,
mapping: Arc<ResolvedMapping>,
mapping_uid: MappingUid,
) -> PropertyOp;
}
#[derive(Debug, Clone, Copy)]
pub struct KeepSideResolver(pub Side);
impl ConflictResolver for KeepSideResolver {
fn resolve_item(&self, conflict: ConflictInfo, mapping_uid: MappingUid) -> ItemOp {
let (source, target_version, target_side) = match self.0 {
Side::A => (conflict.a, conflict.b.state.version, Side::B),
Side::B => (conflict.b, conflict.a.state.version, Side::A),
};
let (storage_write, status_write) = match conflict.old {
Some(old) => (
StorageWrite::Update {
target: old.for_side(target_side).clone(),
},
StatusWrite::Update { old },
),
None => (
StorageWrite::Update {
target: target_version,
},
StatusWrite::Insert,
),
};
ItemOp::Write(WriteItem {
source: source.into(),
target_side,
storage_write,
status_write,
mapping_uid: MappingUidSource::Immediate(mapping_uid),
on_complete: None,
})
}
fn resolve_property(
&self,
property: Property,
value_a: String,
value_b: String,
mapping: Arc<ResolvedMapping>,
mapping_uid: MappingUid,
) -> PropertyOp {
let (value, side) = match self.0 {
Side::A => (value_a, Side::B),
Side::B => (value_b, Side::A),
};
PropertyOp {
property,
mapping,
mapping_uid: MappingUidSource::Immediate(mapping_uid),
on_complete: None,
kind: PropertyOpKind::Write { value, side },
}
}
}
#[cfg(test)]
mod test {
use std::{str::FromStr, sync::Arc};
use futures_util::stream::StreamExt;
use tempfile::Builder;
use crate::{
CollectionId, ItemKind,
sync::{
Side,
declare::{OnEmpty, StoragePair, SyncedCollection},
operation::{CollectionOp, ItemOp, Operation},
plan::Plan,
},
vdir::VdirStorage,
};
#[tokio::test]
async fn test_stream_no_mappings() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let pair = StoragePair::new(storage_a.clone(), storage_b.clone());
let stream = Plan::new(pair, None).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
assert_eq!(operations.len(), 0);
}
#[tokio::test]
async fn test_stream_simple_mapping() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let collection = CollectionId::from_str("test").unwrap();
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_mapping(SyncedCollection::direct(collection));
let stream = Plan::new(pair, None).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
assert!(!operations.is_empty());
let first = operations.first().unwrap();
assert!(matches!(first, Ok(Operation::Collection(_))));
}
#[tokio::test]
async fn test_collection_creation_before_items() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let collection_id = CollectionId::from_str("test").unwrap();
let collection_path = dir_a.path().join("test");
std::fs::create_dir(&collection_path).unwrap();
std::fs::write(
collection_path.join("item.ics"),
[
"BEGIN:VCALENDAR",
"VERSION:2.0",
"BEGIN:VEVENT",
"UID:test-item",
"DTSTART:20240101T120000Z",
"END:VEVENT",
"END:VCALENDAR",
"",
]
.join("\r\n"),
)
.unwrap();
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_mapping(SyncedCollection::direct(collection_id));
let stream = Plan::new(pair, None).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
let mut collection_create_pos = None;
let mut first_item_pos = None;
for (i, op_result) in operations.iter().enumerate() {
if let Ok(op) = op_result {
match op {
Operation::Collection(
CollectionOp::CreateInOne { .. } | CollectionOp::CreateInBoth { .. },
) => {
assert!(
collection_create_pos.replace(i).is_none(),
"more than one collection creation event"
);
}
Operation::Item(ItemOp::Write(w)) if w.target_side == Side::B => {
assert!(
first_item_pos.replace(i).is_none(),
"more than one item write event"
);
}
_ => {}
}
}
}
let coll_pos = collection_create_pos.unwrap();
let item_pos = first_item_pos.unwrap();
assert!(coll_pos < item_pos);
}
#[tokio::test]
async fn test_collection_deletion_after_items() {
use crate::sync::status::StatusDatabase;
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let db_path = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let collection_id = CollectionId::from_str("test").unwrap();
let collection_path = dir_b.path().join("test");
std::fs::create_dir(&collection_path).unwrap();
std::fs::write(
collection_path.join("item.ics"),
[
"BEGIN:VCALENDAR",
"VERSION:2.0",
"BEGIN:VEVENT",
"UID:test-item",
"DTSTART:20240101T120000Z",
"END:VEVENT",
"END:VCALENDAR",
"",
]
.join("\r\n"),
)
.unwrap();
let status_path = db_path.path().join("status.db");
let status = Arc::new(StatusDatabase::open_or_create(&status_path).unwrap());
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_mapping(SyncedCollection::direct(collection_id.clone()))
.on_empty(OnEmpty::Sync);
{
use crate::sync::execute::Executor;
let mut stream = Plan::new(pair.clone(), Some(status.clone())).await.unwrap();
let executor = Executor::new(storage_a.clone(), storage_b.clone(), status.clone());
while let Some(op_result) = stream.next().await {
let op = op_result.unwrap();
executor.execute_operation(op).await.unwrap().unwrap();
}
}
std::fs::remove_dir_all(&collection_path).unwrap();
let stream = Plan::new(pair, Some(status)).await.unwrap();
let operations: Vec<_> = stream.collect::<Vec<_>>().await;
let mut first_item_delete_pos = None;
let mut collection_delete_pos = None;
for (i, op_result) in operations.iter().enumerate() {
if let Ok(op) = op_result {
match op {
Operation::Item(ItemOp::Delete(d))
if d.side == Side::A && first_item_delete_pos.is_none() =>
{
first_item_delete_pos = Some(i);
}
Operation::Collection(CollectionOp::Delete { .. })
if collection_delete_pos.is_none() =>
{
collection_delete_pos = Some(i);
}
_ => {}
}
}
}
let item_pos = first_item_delete_pos.unwrap();
let coll_pos = collection_delete_pos.unwrap();
assert!(
item_pos < coll_pos,
"Item deletion (pos {item_pos}) must come before collection deletion (pos {coll_pos})"
);
}
}