use std::sync::Arc;
use log::warn;
use crate::{
property::Property,
sync::{
items::{ItemWithData, SideState},
mapping::ResolvedMapping,
mode::Mode,
operation::StatusAction,
status::{ItemPairStatus, StatusVersions},
},
};
use super::{
conflict::ConflictInfo,
operation::{
DeleteItem, ItemOp, MappingUidSource, PropertyOp, PropertyOpKind, StatusOnly, StatusWrite,
StorageWrite, WriteItem, resource_name,
},
ordering::{DeletionBarrier, DeletionCompletionHandle},
status::{MappingUid, Side},
};
/// Synchronization mode that plans changes in both directions.
///
/// The `Mode` implementation for this type may target either `Side::A` or `Side::B` with
/// writes and deletions, and reports a conflict when both sides hold differing data that
/// cannot be reconciled from the previously recorded status.
pub struct TwoWaySync;
impl Mode for TwoWaySync {
/// Plans the operation for a single item.
///
/// `previous` is the status recorded for this item pair, if any:
///
/// - With a recorded status, planning is delegated to `plan_existing_item`, which may
///   return `None` when there is nothing to do. `mapping_uid_source` is not consulted on
///   this path.
/// - Without one, the item is treated as new and planned by `plan_new_item`, which
///   always yields an operation.
///
/// When `on_complete` is given, a completion handle is taken from the barrier up front and
/// handed to whichever planner runs.
fn decide_item_action(
    &self,
    side_a: Option<SideState>,
    side_b: Option<SideState>,
    previous: Option<ItemPairStatus>,
    mapping: &Arc<ResolvedMapping>,
    mapping_uid_source: MappingUidSource,
    on_complete: Option<&DeletionBarrier>,
) -> Option<ItemOp> {
    let handle = on_complete.map(DeletionBarrier::completion_handle);

    if let Some(status) = previous {
        return Self::plan_existing_item(side_a, side_b, status, mapping, handle);
    }

    let op = Self::plan_new_item(side_a, side_b, mapping, mapping_uid_source, handle);
    Some(op)
}
/// Plans the operation for a single property.
///
/// `value_a` / `value_b` are the current values on each side and `previous` is the
/// previously known value. The outcome is:
///
/// | side A | side B | previous | result                                           |
/// |--------|--------|----------|--------------------------------------------------|
/// | absent | absent | known    | `ClearStatus`                                    |
/// | set    | absent | unknown  | `Write` A's value to side B                      |
/// | absent | set    | unknown  | `Write` B's value to side A                      |
/// | set    | absent | known    | `Delete` on side A                               |
/// | absent | set    | known    | `Delete` on side B                               |
/// | equal  | equal  | same     | nothing (`None`)                                 |
/// | equal  | equal  | other    | `UpdateStatus` with the shared value             |
/// | differ | differ | == A     | `Write` B's value to side A                      |
/// | differ | differ | == B     | `Write` A's value to side B                      |
/// | differ | differ | other    | `Conflict`                                       |
///
/// # Panics
///
/// Panics if the property is absent on both sides and has no previous value.
fn decide_property_action(
    &self,
    property: Property,
    value_a: Option<String>,
    value_b: Option<String>,
    previous: Option<String>,
    mapping: &Arc<ResolvedMapping>,
    mapping_uid: MappingUid,
    on_complete: Option<&DeletionBarrier>,
) -> Option<PropertyOp> {
    let on_complete = on_complete.map(DeletionBarrier::completion_handle);
    let mapping = mapping.clone();
    let mapping_uid = MappingUidSource::Immediate(mapping_uid);

    let kind = match (value_a, value_b, previous) {
        (None, None, None) => unreachable!("property must exist somewhere"),
        (None, None, Some(_)) => PropertyOpKind::ClearStatus,
        // Present on exactly one side and never seen before: copy it across.
        (Some(value), None, None) => PropertyOpKind::Write {
            value,
            side: Side::B,
        },
        (None, Some(value), None) => PropertyOpKind::Write {
            value,
            side: Side::A,
        },
        // Present on exactly one side but previously known: remove the remaining copy.
        (Some(_), None, Some(_)) => PropertyOpKind::Delete { side: Side::A },
        (None, Some(_), Some(_)) => PropertyOpKind::Delete { side: Side::B },
        (Some(current_a), Some(current_b), previous) => {
            let baseline = previous.as_deref();
            let sides_agree = current_a == current_b;
            let a_unchanged = baseline == Some(current_a.as_str());
            let b_unchanged = baseline == Some(current_b.as_str());

            match (sides_agree, a_unchanged, b_unchanged) {
                // Both sides and the previous value already agree.
                (true, true, _) => return None,
                (true, false, _) => PropertyOpKind::UpdateStatus { value: current_a },
                // Only side B moved away from the previous value.
                (false, true, _) => PropertyOpKind::Write {
                    value: current_b,
                    side: Side::A,
                },
                // Only side A moved away from the previous value.
                (false, false, true) => PropertyOpKind::Write {
                    value: current_a,
                    side: Side::B,
                },
                (false, false, false) => PropertyOpKind::Conflict {
                    value_a: current_a,
                    value_b: current_b,
                },
            }
        }
    };

    Some(PropertyOp {
        property,
        mapping,
        mapping_uid,
        on_complete,
        kind,
    })
}
}
impl TwoWaySync {
/// Plans the operation for an item that has no recorded status.
///
/// - Present on one side only: the item is written to the other side as a newly created
///   resource in that side's collection, named via `resource_name` from the source href,
///   and a status entry is inserted.
/// - Present on both sides with equal hashes: only a status entry is inserted.
/// - Present on both sides with different hashes: a conflict is reported with no common
///   ancestor (`old: None`).
///
/// # Panics
///
/// Panics if the item is absent on both sides, or if either side is
/// `SideState::Unchanged`.
fn plan_new_item(
    side_a: Option<SideState>,
    side_b: Option<SideState>,
    mapping: &Arc<ResolvedMapping>,
    mapping_uid_source: MappingUidSource,
    on_complete: Option<DeletionCompletionHandle>,
) -> ItemOp {
    match (side_a, side_b) {
        (Some(SideState::Unchanged { .. }), _) | (_, Some(SideState::Unchanged { .. })) => {
            unreachable!("new item cannot have Unchanged state")
        }
        (None, None) => {
            unreachable!("item must exist on at least one side")
        }
        (Some(only_a @ SideState::Changed { .. }), None) => {
            let resource = resource_name(&only_a.state().version.href);
            let storage_write = StorageWrite::Create {
                collection: mapping.b().href().clone(),
                resource_name: Some(resource),
            };
            ItemOp::Write(WriteItem {
                source: only_a.into(),
                target_side: Side::B,
                storage_write,
                status_write: StatusWrite::Insert,
                mapping_uid: mapping_uid_source,
                on_complete,
            })
        }
        (None, Some(only_b @ SideState::Changed { .. })) => {
            let resource = resource_name(&only_b.state().version.href);
            let storage_write = StorageWrite::Create {
                collection: mapping.a().href().clone(),
                resource_name: Some(resource),
            };
            ItemOp::Write(WriteItem {
                source: only_b.into(),
                target_side: Side::A,
                storage_write,
                status_write: StatusWrite::Insert,
                mapping_uid: mapping_uid_source,
                on_complete,
            })
        }
        (
            Some(SideState::Changed {
                state: state_a,
                data: data_a,
            }),
            Some(SideState::Changed {
                state: state_b,
                data: data_b,
            }),
        ) => {
            if state_a.hash != state_b.hash {
                // Same item appeared independently on both sides with different content.
                let info = ConflictInfo {
                    a: ItemWithData {
                        state: state_a,
                        data: data_a,
                    },
                    b: ItemWithData {
                        state: state_b,
                        data: data_b,
                    },
                    old: None,
                    collection_a: mapping.a().href().clone(),
                    collection_b: mapping.b().href().clone(),
                };
                return ItemOp::Conflict {
                    info,
                    mapping: mapping.clone(),
                    mapping_uid: mapping_uid_source,
                    on_complete,
                };
            }

            // Identical content on both sides: nothing to transfer, just record it.
            ItemOp::StatusOnly(StatusOnly {
                action: StatusAction::Insert {
                    uid: state_a.uid.clone(),
                    hash: state_a.hash.clone(),
                    versions: StatusVersions {
                        a: state_a.version,
                        b: state_b.version,
                    },
                },
                mapping_uid: mapping_uid_source,
                on_complete,
            })
        }
    }
}
/// Plans the operation for an item that already has a recorded status.
///
/// - Absent on both sides: only the status entry is cleared.
/// - Present on exactly one side: delegated to `plan_only_one_side`.
/// - Present on both sides: delegated to `plan_both`, which may return `None`.
fn plan_existing_item(
    side_a: Option<SideState>,
    side_b: Option<SideState>,
    status: ItemPairStatus,
    mapping: &Arc<ResolvedMapping>,
    on_complete: Option<DeletionCompletionHandle>,
) -> Option<ItemOp> {
    let (present, present_side) = match (side_a, side_b) {
        (Some(a), Some(b)) => return Self::plan_both(a, b, status, mapping, on_complete),
        (Some(a), None) => (a, Side::A),
        (None, Some(b)) => (b, Side::B),
        (None, None) => {
            return Some(ItemOp::StatusOnly(StatusOnly {
                action: StatusAction::Clear { uid: status.uid },
                mapping_uid: MappingUidSource::Immediate(status.mapping_uid),
                on_complete,
            }));
        }
    };

    Some(Self::plan_only_one_side(
        present,
        present_side,
        status,
        mapping,
        on_complete,
    ))
}
/// Plans the operation for a previously recorded item that now exists on one side only.
///
/// `present` is the state on `present_side`; the item is missing on the opposite side.
///
/// - If the present state is `SideState::Changed` and its hash differs from the recorded
///   one, a warning is logged and the item is re-created on the side where it is missing,
///   updating the existing status entry.
/// - Otherwise the remaining copy is deleted from `present_side`.
fn plan_only_one_side(
    present: SideState,
    present_side: Side,
    status: ItemPairStatus,
    mapping: &Arc<ResolvedMapping>,
    on_complete: Option<DeletionCompletionHandle>,
) -> ItemOp {
    let mapping_uid = MappingUidSource::Immediate(status.mapping_uid);
    let absent_side = present_side.opposite();

    let content_changed =
        matches!(present, SideState::Changed { .. }) && present.state().hash != status.hash;

    if !content_changed {
        // The surviving copy carries nothing new, so propagate the deletion to it.
        let state = match present {
            SideState::Changed { state, .. } | SideState::Unchanged { state } => state,
        };
        return ItemOp::Delete(DeleteItem {
            uid: state.uid.clone(),
            target: state.version,
            side: present_side,
            mapping_uid,
            on_complete,
        });
    }

    warn!(
        "Item deleted in {} but changed in {}: {}.",
        absent_side,
        present_side,
        present.state().uid
    );

    let resource = resource_name(&present.state().version.href);
    ItemOp::Write(WriteItem {
        source: present.into(),
        target_side: absent_side,
        storage_write: StorageWrite::Create {
            collection: mapping.for_side(absent_side).href().clone(),
            resource_name: Some(resource),
        },
        status_write: StatusWrite::Update { old: status.into() },
        mapping_uid,
        on_complete,
    })
}
/// Plans the operation for a previously recorded item that exists on both sides.
///
/// - Both unchanged: returns `None` when the versions match the recorded ones; otherwise
///   only the status entry is updated to the current versions.
/// - Changed on one side only: the changed side is written over the other side's recorded
///   version and the status entry is updated.
/// - Changed on both sides: equal hashes lead to a status-only update; differing hashes
///   are reported as a conflict carrying the recorded versions as `old`.
fn plan_both(
    a: SideState,
    b: SideState,
    status: ItemPairStatus,
    mapping: &Arc<ResolvedMapping>,
    on_complete: Option<DeletionCompletionHandle>,
) -> Option<ItemOp> {
    let mapping_uid = MappingUidSource::Immediate(status.mapping_uid);
    let old: StatusVersions = status.into();

    let op = match (a, b) {
        (SideState::Unchanged { state: state_a }, SideState::Unchanged { state: state_b }) => {
            let versions_current = state_a.version == old.a && state_b.version == old.b;
            if versions_current {
                return None;
            }
            ItemOp::StatusOnly(StatusOnly {
                action: StatusAction::Update {
                    hash: state_a.hash.clone(),
                    old,
                    new: StatusVersions {
                        a: state_a.version,
                        b: state_b.version,
                    },
                },
                mapping_uid,
                on_complete,
            })
        }
        (changed_a @ SideState::Changed { .. }, SideState::Unchanged { .. }) => {
            // Overwrite the version of side B that the status last recorded.
            let storage_write = StorageWrite::Update {
                target: old.b.clone(),
            };
            ItemOp::Write(WriteItem {
                source: changed_a.into(),
                target_side: Side::B,
                storage_write,
                status_write: StatusWrite::Update { old },
                mapping_uid,
                on_complete,
            })
        }
        (SideState::Unchanged { .. }, changed_b @ SideState::Changed { .. }) => {
            // Overwrite the version of side A that the status last recorded.
            let storage_write = StorageWrite::Update {
                target: old.a.clone(),
            };
            ItemOp::Write(WriteItem {
                source: changed_b.into(),
                target_side: Side::A,
                storage_write,
                status_write: StatusWrite::Update { old },
                mapping_uid,
                on_complete,
            })
        }
        (
            SideState::Changed {
                state: state_a,
                data: data_a,
            },
            SideState::Changed {
                state: state_b,
                data: data_b,
            },
        ) => {
            if state_a.hash != state_b.hash {
                ItemOp::Conflict {
                    info: ConflictInfo {
                        a: ItemWithData {
                            state: state_a,
                            data: data_a,
                        },
                        b: ItemWithData {
                            state: state_b,
                            data: data_b,
                        },
                        old: Some(old),
                        collection_a: mapping.a().href().clone(),
                        collection_b: mapping.b().href().clone(),
                    },
                    mapping: mapping.clone(),
                    mapping_uid,
                    on_complete,
                }
            } else {
                // Both sides converged on the same content; only the status needs updating.
                ItemOp::StatusOnly(StatusOnly {
                    action: StatusAction::Update {
                        hash: state_a.hash.clone(),
                        old,
                        new: StatusVersions {
                            a: state_a.version,
                            b: state_b.version,
                        },
                    },
                    mapping_uid,
                    on_complete,
                })
            }
        }
    };

    Some(op)
}
}