use std::collections::HashMap;
use std::sync::Arc;
use log::warn;
use crate::ErrorKind;
use crate::property::Property;
use crate::sync::items::SideState;
use crate::sync::mapping::ResolvedMapping;
use crate::sync::operation::PropertyOp;
use crate::sync::{Mode, Side};
use crate::sync::{
declare::{OnDelete, OnEmpty, StoragePair},
operation::{CollectionOp, MappingUidSource, Operation},
ordering::{DeletionBarrier, completion_pair},
status::{MappingUid, StatusDatabase, StatusError},
};
use tokio::sync::mpsc::Sender;
use super::{PlanError, items::items_for_collection};
/// Reasons why generating the operations for a collection stops early.
#[derive(Debug, thiserror::Error)]
pub(super) enum GenerateError {
/// A planning error, forwarded transparently. `StatusError`s are converted into this variant
/// as well.
#[error(transparent)]
Plan(#[from] PlanError),
/// An operation could not be sent because the receiving end of the channel was dropped.
#[error("receiver dropped")]
ChannelClosed,
}
impl From<StatusError> for GenerateError {
fn from(e: StatusError) -> Self {
GenerateError::Plan(e.into())
}
}
/// Forwards one planned operation to the consumer of the plan.
///
/// # Errors
///
/// Returns [`GenerateError::ChannelClosed`] when the send fails, i.e. the receiving half of the
/// channel is gone.
async fn send_op(
    tx: &Sender<Result<Operation, PlanError>>,
    op: Operation,
) -> Result<(), GenerateError> {
    if tx.send(Ok(op)).await.is_err() {
        return Err(GenerateError::ChannelClosed);
    }
    Ok(())
}
/// Plans the synchronisation of one collection pair and streams the resulting operations to `tx`.
///
/// Operations are sent in this order:
///
/// 1. the collection operation chosen by `determine_collection_operation`, if any;
/// 2. one operation per item uid for which `pair.mode` decides an action;
/// 3. property operations, unless a collection deletion is planned;
/// 4. a `StoreSyncToken` operation for each side that reported a new sync token;
/// 5. the collection deletion, if any.
///
/// Nothing at all is sent when `pair.on_empty` is `OnEmpty::Skip`, the status database knows at
/// least one uid for this mapping, and exactly one of the two sides has no items; a warning is
/// logged instead.
///
/// # Errors
///
/// Returns [`GenerateError::ChannelClosed`] if the receiver has been dropped, and
/// [`GenerateError::Plan`] if `items_for_collection`, a status database query or
/// `generate_property_operations` fails.
#[allow(clippy::too_many_lines)]
pub(super) async fn generate_collection_operations(
    tx: &Sender<Result<Operation, PlanError>>,
    pair: &StoragePair,
    mapping: Arc<ResolvedMapping>,
    status: Option<&StatusDatabase>,
) -> Result<(), GenerateError> {
    // A previously recorded mapping can only be looked up when a status database is present.
    let mapping_uid = match status {
        Some(db) => db.get_mapping_uid(mapping.a().href(), mapping.b().href())?,
        None => None,
    };

    // Both sides are queried concurrently; the first failure aborts planning.
    let (listing_a, listing_b) = tokio::try_join!(
        items_for_collection(
            status,
            pair.storage_a().as_ref(),
            mapping.a().href(),
            Side::A,
            mapping_uid
        ),
        items_for_collection(
            status,
            pair.storage_b().as_ref(),
            mapping.b().href(),
            Side::B,
            mapping_uid
        ),
    )?;
    let exists_a = listing_a.exists;
    let exists_b = listing_b.exists;
    let token_a = listing_a.new_sync_token;
    let token_b = listing_b.new_sync_token;
    let items_a = listing_a.items;
    let items_b = listing_b.items;

    // Uids recorded in the status database, so that items missing on both sides are still
    // visited below.
    let status_uids = if let (Some(db), Some(known_mapping)) = (status, mapping_uid) {
        let known_uids = db.all_uids(known_mapping)?;
        if !known_uids.is_empty() && pair.on_empty == OnEmpty::Skip {
            // Exactly one side being empty is treated as "emptied" and skipped entirely.
            match (items_a.is_empty(), items_b.is_empty()) {
                (true, false) => {
                    warn!(
                        "Collection {} has been emptied on storage a.",
                        mapping.alias
                    );
                    return Ok(());
                }
                (false, true) => {
                    warn!(
                        "Collection {} has been emptied on storage b.",
                        mapping.alias
                    );
                    return Ok(());
                }
                _ => {}
            }
        }
        known_uids
    } else {
        Vec::new()
    };

    let CollectionPlan {
        op: initial_op,
        deletion: final_deletion,
        deletion_barrier: planned_barrier,
        mapping_uid_source: uid_source,
    } = determine_collection_operation(
        exists_a,
        exists_b,
        mapping_uid,
        pair.on_delete,
        mapping.clone(),
    );

    // Storing a sync token also waits on a barrier, so create one if the plan did not supply it.
    let has_new_token = token_a.is_some() || token_b.is_some();
    let barrier = planned_barrier.or_else(|| has_new_token.then(DeletionBarrier::new));

    // Group the state of each item by uid: (side a, side b).
    let mut sides_by_uid: HashMap<String, (Option<SideState>, Option<SideState>)> =
        HashMap::new();
    for item in items_a {
        let slot = sides_by_uid.entry(item.state().uid.clone()).or_default();
        slot.0 = Some(item);
    }
    for item in items_b {
        let slot = sides_by_uid.entry(item.state().uid.clone()).or_default();
        slot.1 = Some(item);
    }
    for uid in status_uids {
        sides_by_uid.entry(uid).or_default();
    }

    if let Some(collection_op) = initial_op {
        send_op(tx, Operation::Collection(collection_op)).await?;
    }

    for (uid, (state_a, state_b)) in sides_by_uid {
        // The previously recorded hash is only available when the mapping uid is already known.
        let known_hash = if let (Some(db), Some(known_mapping)) = (status, uid_source.immediate())
        {
            db.get_item_hash_by_uid(known_mapping, &uid)?
        } else {
            None
        };
        let Some(item_op) = pair.mode.decide_item_action(
            state_a,
            state_b,
            known_hash,
            &mapping,
            uid_source.clone(),
            barrier.as_ref(),
        ) else {
            continue;
        };
        send_op(tx, Operation::Item(item_op)).await?;
    }

    // Properties are not planned for a collection that is about to be deleted.
    if final_deletion.is_none() {
        let property_ops = generate_property_operations(
            &mapping,
            &uid_source,
            pair,
            status,
            barrier.as_ref(),
            &*pair.mode,
        )
        .await?;
        for property_op in property_ops {
            send_op(tx, Operation::Property(property_op)).await?;
        }
    }

    if let Some(barrier) = barrier.as_ref() {
        for (side, token) in [(Side::A, token_a), (Side::B, token_b)] {
            let Some(token) = token else {
                continue;
            };
            let store_token = Operation::Collection(CollectionOp::StoreSyncToken {
                mapping_uid: uid_source.clone(),
                side,
                token,
                wait_for_items: barrier.wait_handle(),
            });
            send_op(tx, store_token).await?;
        }
    }

    // The deletion goes last, after every other operation for this collection.
    if let Some(deletion_op) = final_deletion {
        send_op(tx, Operation::Collection(deletion_op)).await?;
    }
    Ok(())
}
/// Collection-level outcome of planning, as returned by `determine_collection_operation`.
struct CollectionPlan {
/// Collection operation that is sent before any item operation, if one is needed.
op: Option<CollectionOp>,
/// Collection deletion that is sent after all other operations; when set, no property
/// operations are generated.
deletion: Option<CollectionOp>,
/// Barrier created together with `deletion`; its wait handle is stored in the deletion
/// operation, and the barrier itself is handed to item and property planning.
deletion_barrier: Option<DeletionBarrier>,
/// Where the mapping uid comes from: `Immediate` when it is already known, `Deferred` when
/// it is tied to the completion of `op`.
mapping_uid_source: MappingUidSource,
}
#[allow(clippy::too_many_lines)]
fn determine_collection_operation(
a_exists: bool,
b_exists: bool,
mapping_uid: Option<MappingUid>,
on_delete: OnDelete,
mapping: Arc<ResolvedMapping>,
) -> CollectionPlan {
match (a_exists, b_exists, mapping_uid) {
(false, false, _) => {
let (completion, wait) = completion_pair();
CollectionPlan {
op: Some(CollectionOp::CreateInBoth {
mapping,
completion,
}),
deletion: None,
deletion_barrier: None,
mapping_uid_source: MappingUidSource::Deferred(wait),
}
}
(true, true, None) => {
let (completion, wait) = completion_pair();
CollectionPlan {
op: Some(CollectionOp::SaveToStatus {
mapping,
completion,
}),
deletion: None,
deletion_barrier: None,
mapping_uid_source: MappingUidSource::Deferred(wait),
}
}
(true, true, Some(m)) => CollectionPlan {
op: None,
deletion: None,
deletion_barrier: None,
mapping_uid_source: MappingUidSource::Immediate(m),
},
(false, true, Some(m)) => {
if on_delete == OnDelete::Skip {
CollectionPlan {
op: None,
deletion: None,
deletion_barrier: None,
mapping_uid_source: MappingUidSource::Immediate(m),
}
} else {
let barrier = DeletionBarrier::new();
let wait_handle = barrier.wait_handle();
CollectionPlan {
op: None,
deletion: Some(CollectionOp::Delete {
mapping,
mapping_uid: m,
side: Side::B,
wait_for_items: wait_handle,
}),
deletion_barrier: Some(barrier),
mapping_uid_source: MappingUidSource::Immediate(m),
}
}
}
(false, true, None) => {
let (completion, wait) = completion_pair();
CollectionPlan {
op: Some(CollectionOp::CreateInOne {
mapping,
side: Side::A,
completion,
}),
deletion: None,
deletion_barrier: None,
mapping_uid_source: MappingUidSource::Deferred(wait),
}
}
(true, false, None) => {
let (completion, wait) = completion_pair();
CollectionPlan {
op: Some(CollectionOp::CreateInOne {
mapping,
side: Side::B,
completion,
}),
deletion: None,
deletion_barrier: None,
mapping_uid_source: MappingUidSource::Deferred(wait),
}
}
(true, false, Some(m)) => {
if on_delete == OnDelete::Skip {
CollectionPlan {
op: None,
deletion: None,
deletion_barrier: None,
mapping_uid_source: MappingUidSource::Immediate(m),
}
} else {
let barrier = DeletionBarrier::new();
let wait_handle = barrier.wait_handle();
CollectionPlan {
op: None,
deletion: Some(CollectionOp::Delete {
mapping,
mapping_uid: m,
side: Side::A,
wait_for_items: wait_handle,
}),
deletion_barrier: Some(barrier),
mapping_uid_source: MappingUidSource::Immediate(m),
}
}
}
}
}
/// Plans property operations for one collection pair.
///
/// The properties of both storages are listed concurrently and merged, per property, with the
/// values recorded in the status database (only consulted when `status` is present and
/// `mapping_uid` is `Immediate`). Each merged entry is passed to `mode.decide_property_action`,
/// and every operation it returns is collected.
///
/// # Errors
///
/// Returns a [`PlanError`] if listing properties fails with a kind other than
/// `ErrorKind::DoesNotExist` or `ErrorKind::Unsupported` (those two yield an empty list), or if
/// the status database query fails.
async fn generate_property_operations(
    mapping: &Arc<ResolvedMapping>,
    mapping_uid: &MappingUidSource,
    pair: &StoragePair,
    status: Option<&StatusDatabase>,
    deletion_completion: Option<&DeletionBarrier>,
    mode: &dyn Mode,
) -> Result<Vec<PropertyOp>, PlanError> {
    let (listed_a, listed_b) = match tokio::try_join!(
        pair.storage_a().list_properties(mapping.a().href()),
        pair.storage_b().list_properties(mapping.b().href()),
    ) {
        Ok(listed) => listed,
        // These two kinds are not treated as failures: there is simply nothing to plan.
        Err(error)
            if matches!(
                error.kind,
                ErrorKind::DoesNotExist | ErrorKind::Unsupported
            ) =>
        {
            return Ok(Vec::new());
        }
        Err(error) => return Err(error.into()),
    };

    let recorded = if let (Some(db), MappingUidSource::Immediate(uid)) = (status, mapping_uid) {
        db.list_properties_for_collection(*uid)?
    } else {
        Vec::new()
    };

    // Per property: (value on a, value on b, value in the status database).
    let mut merged: HashMap<Property, (Option<String>, Option<String>, Option<String>)> =
        HashMap::new();
    for entry in listed_a {
        merged.entry(entry.property).or_default().0 = Some(entry.value);
    }
    for entry in listed_b {
        merged.entry(entry.property).or_default().1 = Some(entry.value);
    }
    for entry in recorded {
        merged.entry(entry.property).or_default().2 = Some(entry.value);
    }

    let operations = merged
        .into_iter()
        .filter_map(|(property, (value_a, value_b, value_status))| {
            mode.decide_property_action(
                property,
                value_a,
                value_b,
                value_status,
                mapping,
                mapping_uid,
                deletion_completion,
            )
        })
        .collect();
    Ok(operations)
}