use std::{borrow::Cow, sync::Arc};
use futures_util::stream::{Stream, StreamExt, TryStreamExt};
use log::{debug, error, warn};
use crate::{
CollectionId, Href,
base::{CreateItemOptions, ItemVersion, Storage},
disco::DiscoveredCollection,
sync::{
operation::{DeleteItem, StatusOnly, StatusWrite, StorageWrite, WriteItem},
ordering::CompletionDroppedError,
},
};
use super::{
analysis::{ResolvedMapping, StatusAction},
error::SyncError,
operation::{CollectionOp, ItemOp, Operation, PropertyOp, PropertyOpKind},
plan::PlanError,
status::{MappingUid, Side, StatusDatabase, StatusError},
};
/// Internal error carried through the operation stream in `Executor::execute_stream`.
///
/// It unifies the two ways the stream can stop early so that both can flow through
/// a single `try_for_each_concurrent` call.
enum StreamError {
/// The incoming operation stream yielded a planning error.
Planning(PlanError),
/// Executing an operation returned a status database error.
Status(StatusError),
}
impl From<StatusError> for StreamError {
fn from(e: StatusError) -> Self {
StreamError::Status(e)
}
}
/// Executes synchronisation operations against two storages and a status database.
pub struct Executor {
/// Callback invoked for every error that is reported without aborting execution
/// (for example storage failures and conflicts).
on_error: Box<dyn Fn(SyncError) + Send + Sync>,
/// Limit handed to `try_for_each_concurrent` in `execute_stream`.
concurrency: usize,
}
impl Executor {
pub fn new(on_error: impl Fn(SyncError) + Send + Sync + 'static) -> Executor {
Executor {
on_error: Box::new(on_error),
concurrency: 4,
}
}
#[must_use]
pub fn with_concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency;
self
}
/// Creates `data` as a new item inside `collection`.
///
/// When `resource_name` is given, creation is first attempted with that name. If
/// that attempt fails with `Exists` or `InvalidInput`, a warning is logged and
/// creation is retried with default options. Any other failure is reported
/// through `on_error` (together with `op`) and `None` is returned.
///
/// Returns the version of the created item, or `None` if an error was reported.
async fn create_item(
    &self,
    storage: &dyn Storage,
    collection: &Href,
    data: &crate::base::Item,
    resource_name: Option<Href>,
    op: Operation,
) -> Option<ItemVersion> {
    if let Some(preferred) = resource_name {
        let named = CreateItemOptions {
            resource_name: Some(preferred),
        };
        match storage.create_item(collection, data, named).await {
            Ok(version) => return Some(version),
            // The preferred name is only a hint: fall through to the unnamed attempt.
            Err(e)
                if matches!(
                    e.kind,
                    crate::ErrorKind::Exists | crate::ErrorKind::InvalidInput
                ) =>
            {
                warn!("Creation with preferred name failed: {e}");
            }
            Err(e) => {
                (self.on_error)(SyncError::new(op, ExecutionError::Storage(e)));
                return None;
            }
        }
    }

    // Either no name was requested, or the requested name was rejected.
    storage
        .create_item(collection, data, CreateItemOptions::default())
        .await
        .map_err(|e| (self.on_error)(SyncError::new(op, ExecutionError::Storage(e))))
        .ok()
}
/// Replaces the item at `href` (expected to be at version `etag`) with `data`.
///
/// Returns the new etag on success. On failure the error is reported through
/// `on_error` (together with `op`) and `None` is returned.
async fn update_item(
    &self,
    storage: &dyn Storage,
    href: &Href,
    etag: &crate::Etag,
    data: &crate::base::Item,
    op: Operation,
) -> Option<crate::Etag> {
    storage
        .update_item(href, etag, data)
        .await
        .map_err(|e| (self.on_error)(SyncError::new(op, ExecutionError::Storage(e))))
        .ok()
}
/// Executes every operation yielded by `operations`, running up to
/// `self.concurrency` of them concurrently.
///
/// Execution stops at the first planning error yielded by the stream or the first
/// status database error returned by an operation. Other failures are delivered
/// to the `on_error` callback by the individual operations and do not stop the run.
///
/// # Errors
///
/// - `Err(StatusError)` if an operation failed on the status database.
/// - `Ok(Err(ExecutionError::Planning(_)))` if the stream yielded a planning error.
pub async fn execute_stream(
    &self,
    storage_a: Arc<dyn Storage>,
    storage_b: Arc<dyn Storage>,
    operations: impl Stream<Item = Result<Operation, PlanError>>,
    status: &StatusDatabase,
) -> ExecutionResult {
    // Plain references, so that every spawned future can share the storages.
    let a_storage: &dyn Storage = &*storage_a;
    let b_storage: &dyn Storage = &*storage_b;

    let outcome = operations
        .map(|planned| planned.map_err(StreamError::Planning))
        .try_for_each_concurrent(self.concurrency, |operation| async {
            self.execute_operation(operation, a_storage, b_storage, status)
                .await
                .map_err(StreamError::from)
        })
        .await;

    match outcome {
        Ok(()) => Ok(Ok(())),
        Err(StreamError::Status(status_err)) => Err(status_err),
        Err(StreamError::Planning(plan_err)) => Ok(Err(ExecutionError::Planning(plan_err))),
    }
}
/// Dispatches `operation` to the handler for its kind.
///
/// # Errors
///
/// Returns whatever status database error the selected handler returns.
async fn execute_operation(
    &self,
    operation: Operation,
    storage_a: &dyn Storage,
    storage_b: &dyn Storage,
    status: &StatusDatabase,
) -> Result<(), StatusError> {
    match operation {
        Operation::Item(item_op) => {
            self.execute_item_op(item_op, storage_a, storage_b, status)
                .await
        }
        Operation::Property(property_op) => {
            self.execute_property_op(property_op, storage_a, storage_b, status)
                .await
        }
        Operation::Collection(collection_op) => {
            self.execute_collection_op(collection_op, storage_a, storage_b, status)
                .await
        }
        // The only kind which touches nothing but the status database.
        Operation::FlushStaleMappings { stale_uids } => status.flush_stale_mappings(stale_uids),
    }
}
/// Executes a single collection-level operation.
///
/// Storage failures, id mismatches and unresolved mapping UIDs are reported through
/// `on_error` and still yield `Ok(())`.
///
/// # Errors
///
/// Returns `Err` only when the status database fails.
async fn execute_collection_op(
&self,
op: CollectionOp,
storage_a: &dyn Storage,
storage_b: &dyn Storage,
status: &StatusDatabase,
) -> Result<(), StatusError> {
// Matched by reference so that the whole `op` can still be moved into a
// `SyncError` from inside an arm.
match &op {
CollectionOp::SaveToStatus {
mapping,
completion,
} => {
// No storage access here: only the status database is updated.
let uid = status.get_or_add_collection(mapping.a().href(), mapping.b().href())?;
// Publish the mapping UID to whoever waits on this completion.
if completion.complete(uid).is_err() {
error!("Failed to signal completion; all waiters dropped!?");
}
Ok(())
}
CollectionOp::CreateInOne {
mapping,
side,
completion,
} => {
let storage = match side {
Side::A => storage_a,
Side::B => storage_b,
};
// `?` propagates status database errors; the inner result carries
// storage-level failures.
match create_collection(storage, status, mapping, *side).await? {
Ok(uid) => {
if completion.complete(uid).is_err() {
error!("Failed to signal completion; all waiters dropped!?");
}
Ok(())
}
Err(err) => {
// `completion` is never completed on this path.
(self.on_error)(SyncError::new(Operation::Collection(op), err));
Ok(())
}
}
}
CollectionOp::CreateInBoth {
mapping,
completion,
} => match create_both_collections(storage_a, storage_b, mapping, status).await? {
Ok(uid) => {
if completion.complete(uid).is_err() {
error!("Failed to signal completion; all waiters dropped!?");
}
Ok(())
}
Err(err) => {
// `completion` is never completed on this path.
(self.on_error)(SyncError::new(Operation::Collection(op), err));
Ok(())
}
},
CollectionOp::Delete {
mapping,
mapping_uid,
side,
wait_for_items,
} => {
// Wait before deleting the collection.
// NOTE(review): presumably this waits for the item operations of this
// collection to finish; confirm against the planner.
wait_for_items.wait().await;
let (storage, href) = match side {
Side::A => (storage_a, mapping.a().href()),
Side::B => (storage_b, mapping.b().href()),
};
if let Err(err) = delete_collection(href, status, storage, *mapping_uid).await? {
(self.on_error)(SyncError::new(Operation::Collection(op), err));
}
Ok(())
}
CollectionOp::StoreSyncToken {
mapping_uid,
side,
token,
wait_for_items,
} => {
// The mapping UID may not be known yet; resolve it first.
let resolved_uid = match mapping_uid.clone().resolve().await {
Ok(uid) => uid,
Err(err) => {
(self.on_error)(SyncError::new(Operation::Collection(op), err.into()));
return Ok(());
}
};
// The token is stored only after the wait has finished.
wait_for_items.wait().await;
status.set_sync_token(resolved_uid, *side, token)?;
Ok(())
}
}
}
/// Executes a single item-level operation.
///
/// Conflicts are not resolved here: they are reported through `on_error` with
/// `ExecutionError::Conflict`.
///
/// # Errors
///
/// Returns any status database error returned by the selected handler.
async fn execute_item_op(
    &self,
    mut op: ItemOp,
    storage_a: &dyn Storage,
    storage_b: &dyn Storage,
    status: &StatusDatabase,
) -> Result<(), StatusError> {
    debug!("Executing item operation for UID: {:?}", op.uid());

    // Move `on_complete` out of the operation and keep it in a local, so that it
    // is dropped only when this function returns.
    let on_complete_slot = match &mut op {
        ItemOp::StatusOnly(inner) => &mut inner.on_complete,
        ItemOp::Write(inner) => &mut inner.on_complete,
        ItemOp::Delete(inner) => &mut inner.on_complete,
        ItemOp::Conflict { on_complete, .. } => on_complete,
    };
    let _on_complete = on_complete_slot.take();

    match op {
        ItemOp::StatusOnly(status_only) => self.execute_status_action(status_only, status).await,
        ItemOp::Write(write) => {
            self.execute_write(write, storage_a, storage_b, status)
                .await
        }
        ItemOp::Delete(delete) => {
            self.execute_delete(delete, storage_a, storage_b, status)
                .await
        }
        conflict @ ItemOp::Conflict { .. } => {
            (self.on_error)(SyncError::new(
                Operation::Item(conflict),
                ExecutionError::Conflict,
            ));
            Ok(())
        }
    }
}
/// Executes a single property operation.
///
/// `Write` and `Delete` are first applied to the storage on the given side; the
/// result is then mirrored into the status database. `UpdateStatus` and
/// `ClearStatus` touch only the status database.
///
/// Failures to resolve the mapping UID or to write to the storage are reported
/// through `on_error` and yield `Ok(())`.
///
/// # Errors
///
/// Returns `Err` only when the status database fails.
async fn execute_property_op(
    &self,
    mut op: PropertyOp,
    storage_a: &dyn Storage,
    storage_b: &dyn Storage,
    status: &StatusDatabase,
) -> Result<(), StatusError> {
    // Kept in a local so that it is dropped only when this function returns.
    let _on_complete = op.on_complete.take();

    // NOTE(review): property conflicts are only logged, whereas item conflicts in
    // `execute_item_op` are handed to `on_error`. Confirm this is intentional.
    if matches!(op.kind, PropertyOpKind::Conflict { .. }) {
        error!("Conflict for property {}. Skipping.", op.property.name());
        return Ok(());
    }

    let (href_a, href_b) = (op.mapping.a().href(), op.mapping.b().href());
    let mapping_uid = match op.mapping_uid.clone().resolve().await {
        Ok(uid) => uid,
        Err(dropped) => {
            (self.on_error)(SyncError::new(Operation::Property(op), dropped.into()));
            return Ok(());
        }
    };

    // Step 1: apply the change to a storage, if this kind of operation needs it.
    let stored = match &op.kind {
        PropertyOpKind::Write { value, side } => {
            let (storage, href) = match side {
                Side::A => (storage_a, href_a),
                Side::B => (storage_b, href_b),
            };
            storage.set_property(href, op.property, value).await
        }
        PropertyOpKind::Delete { side } => {
            let (storage, href) = match side {
                Side::A => (storage_a, href_a),
                Side::B => (storage_b, href_b),
            };
            storage.unset_property(href, op.property).await
        }
        PropertyOpKind::UpdateStatus { .. }
        | PropertyOpKind::ClearStatus
        | PropertyOpKind::Conflict { .. } => Ok(()),
    };
    if let Err(err) = stored {
        // The status database is left untouched when the storage rejected the change.
        (self.on_error)(SyncError::new(Operation::Property(op), err.into()));
        return Ok(());
    }

    // Step 2: record the outcome in the status database.
    match &op.kind {
        PropertyOpKind::Write { value, .. } | PropertyOpKind::UpdateStatus { value } => {
            status.set_property(mapping_uid, href_a, href_b, op.property.name(), value)
        }
        PropertyOpKind::Delete { .. } | PropertyOpKind::ClearStatus => {
            status.delete_property(mapping_uid, href_a, href_b, op.property.name())
        }
        // Conflicts returned early above.
        PropertyOpKind::Conflict { .. } => unreachable!(),
    }
}
/// Applies an operation which only changes the status database.
///
/// If the mapping UID cannot be resolved, the error is reported through `on_error`
/// and `Ok(())` is returned without touching the database.
///
/// # Errors
///
/// Returns `Err` when the status database fails.
async fn execute_status_action(
    &self,
    data: StatusOnly,
    status: &'_ StatusDatabase,
) -> Result<(), StatusError> {
    let resolved = data.mapping_uid.clone().resolve().await;
    let mapping_uid = match resolved {
        Ok(uid) => uid,
        Err(dropped) => {
            let failed_op = Operation::Item(ItemOp::StatusOnly(data));
            (self.on_error)(SyncError::new(failed_op, dropped.into()));
            return Ok(());
        }
    };

    match data.action {
        StatusAction::Clear { uid } => status.delete_item(mapping_uid, &uid),
        StatusAction::Update { hash, old, new } => {
            status.update_item(&hash, &old.a, &old.b, &new.a, &new.b)
        }
        StatusAction::Insert {
            uid,
            hash,
            versions,
        } => status.insert_item(mapping_uid, &uid, &hash, &versions.a, &versions.b),
    }
}
/// Writes one item to the target side and records the resulting versions in the
/// status database.
///
/// Failures to fetch the item, to resolve the mapping UID, or to write to the target
/// storage are reported through `on_error` and yield `Ok(())`.
///
/// # Errors
///
/// Returns `Err` only when the status database fails.
async fn execute_write(
&self,
data: WriteItem,
storage_a: &dyn Storage,
storage_b: &dyn Storage,
status: &StatusDatabase,
) -> Result<(), StatusError> {
// The source is the side opposite to `target_side`.
let (source_storage, target_storage) = match data.target_side {
Side::A => (storage_b, storage_a),
Side::B => (storage_a, storage_b),
};
debug!(
"Writing item to {} from {}",
data.target_side, data.source.state.version.href
);
// Use the item carried by the operation when there is one; otherwise fetch it
// from the source storage.
let item_data = match &data.source.data {
Some(d) => Cow::Borrowed(d),
None => match source_storage
.get_item(&data.source.state.version.href)
.await
{
Ok((item, _etag)) => Cow::Owned(item),
Err(e) => {
(self.on_error)(SyncError::new(
Operation::Item(data.clone().into()),
ExecutionError::Storage(e),
));
return Ok(());
}
},
};
let source_ver = data.source.to_item_ver();
// The mapping UID is resolved before anything is written to the target storage.
let mapping_uid = match data.mapping_uid.clone().resolve().await {
Ok(uid) => uid,
Err(err) => {
(self.on_error)(SyncError::new(Operation::Item(data.into()), err.into()));
return Ok(());
}
};
// `create_item` / `update_item` take the operation by value for error reporting,
// while `data` is still needed below; hence the clone.
let op = Operation::Item(data.clone().into());
let target_ver = match data.storage_write {
StorageWrite::Create {
collection,
resource_name,
} => {
self.create_item(target_storage, &collection, &item_data, resource_name, op)
.await
}
StorageWrite::Update { target } => self
.update_item(target_storage, &target.href, &target.etag, &item_data, op)
.await
// An update keeps the href; only the etag is new.
.map(|etag| ItemVersion::new(target.href, etag)),
};
// `None` means the write failed and has already been reported via `on_error`.
let Some(target_ver) = target_ver else {
return Ok(());
};
// Put the two versions into (side A, side B) order for the status database.
let (a, b) = ordered_versions(data.target_side, &target_ver, &source_ver);
match data.status_write {
StatusWrite::Insert => {
status.insert_item(mapping_uid, &item_data.ident(), &item_data.hash(), a, b)
}
StatusWrite::Update { ref old } => {
status.update_item(&item_data.hash(), &old.a, &old.b, a, b)
}
}
}
/// Deletes one item from the storage on `data.side` and removes it from the status
/// database.
///
/// The mapping UID is resolved *before* the storage is touched, in the same order
/// as `execute_write`. If the UID cannot be resolved, nothing is deleted, so a
/// storage is never modified when the matching status record cannot be updated.
///
/// Failures to resolve the mapping UID or to delete from the storage are reported
/// through `on_error` and yield `Ok(())`.
///
/// # Errors
///
/// Returns `Err` only when the status database fails.
async fn execute_delete(
    &self,
    data: DeleteItem,
    storage_a: &dyn Storage,
    storage_b: &dyn Storage,
    status: &StatusDatabase,
) -> Result<(), StatusError> {
    // Resolve first: without a mapping UID the status record cannot be removed,
    // and the deletion on the storage cannot be undone.
    let mapping_uid = match data.mapping_uid.clone().resolve().await {
        Ok(uid) => uid,
        Err(err) => {
            (self.on_error)(SyncError::new(Operation::Item(data.into()), err.into()));
            return Ok(());
        }
    };

    let storage = match data.side {
        Side::A => storage_a,
        Side::B => storage_b,
    };
    debug!("Deleting from {}: {}", data.side, data.target.href);

    if let Err(err) = storage
        .delete_item(&data.target.href, &data.target.etag)
        .await
    {
        (self.on_error)(SyncError::new(
            Operation::Item(data.into()),
            ExecutionError::Storage(err),
        ));
        return Ok(());
    }

    status.delete_item(mapping_uid, &data.uid)
}
}
/// Errors which can occur while executing operations, other than status database
/// errors (those are reported separately as `StatusError`).
#[derive(thiserror::Error, Debug)]
pub enum ExecutionError {
/// A storage returned an error.
#[error(transparent)]
Storage(#[from] crate::Error),
/// A collection was created, but discovery did not report the expected id for it.
///
/// Fields: the side, the href of the created collection, and the id that was
/// found (`None` if no collection with that href was discovered).
#[error("created collection {1} on side {0} does not have the expected id, it has: {2:?}")]
IdMismatch(Side, Href, Option<CollectionId>),
/// The stream of planned operations yielded an error.
#[error("planning error: {0}")]
Planning(#[from] PlanError),
/// A mapping UID this operation depends on could not be resolved.
#[error("dependant collection task failed")]
DependantFailed(#[from] CompletionDroppedError),
/// The resource is in conflict; no attempt is made to resolve it here.
#[error("resource is in conflict")]
Conflict,
}
/// Two-level result of executing operations.
///
/// The outer `Err` carries a status database error; the inner `Err` carries an
/// [`ExecutionError`] (storage, planning, and similar failures).
pub type ExecutionResult = Result<Result<(), ExecutionError>, StatusError>;
/// Deletes the collection at `href` from `storage` and then removes the mapping
/// `mapping_uid` from the status database.
///
/// The status database is only touched if the storage deletion succeeded.
///
/// # Errors
///
/// The inner `Err` carries a storage failure, the outer `Err` a status database
/// failure.
async fn delete_collection(
    href: &Href,
    status: &StatusDatabase,
    storage: &dyn Storage,
    mapping_uid: MappingUid,
) -> ExecutionResult {
    if let Err(err) = storage.delete_collection(href).await {
        return Ok(Err(ExecutionError::Storage(err)));
    }
    status.remove_collection(mapping_uid)?;
    Ok(Ok(()))
}
/// Creates the collection which is missing on `side` and registers the pair in the
/// status database.
///
/// A warning is logged if the storage created the collection under a different href
/// than requested; the href returned by the storage is the one that gets recorded.
///
/// # Errors
///
/// The inner `Err` carries a storage failure or an id mismatch, the outer `Err` a
/// status database failure.
async fn create_collection(
    storage: &dyn Storage,
    status: &StatusDatabase,
    mapping: &ResolvedMapping,
    side: Side,
) -> Result<Result<MappingUid, ExecutionError>, StatusError> {
    // `target` is the collection to create, `existing` its counterpart on the other side.
    let (target, existing) = match side {
        Side::A => (mapping.a(), mapping.b()),
        Side::B => (mapping.b(), mapping.a()),
    };

    let created = match storage
        .create_collection(target.href())
        .await
        .map_err(ExecutionError::Storage)
    {
        Ok(collection) => collection,
        Err(err) => return Ok(Err(err)),
    };
    let created_href = created.href();
    if created_href != target.href() {
        warn!(
            "Created collection has href {}, expected {}.",
            created_href,
            target.href()
        );
    }

    if let Err(mismatch) =
        check_id_matches_expected(target.id(), storage, created_href, side).await
    {
        return Ok(Err(mismatch));
    }

    // The status database always takes hrefs in (side A, side B) order.
    let (href_a, href_b) = match side {
        Side::A => (created_href, existing.href()),
        Side::B => (existing.href(), created_href),
    };
    let mapping_uid = status.add_collection(href_a, href_b)?;
    Ok(Ok(mapping_uid))
}
/// Creates the mapped collection on both sides (A first, then B) and registers the
/// pair in the status database.
///
/// The first failing step aborts the sequence; nothing created by an earlier step
/// is rolled back.
///
/// # Errors
///
/// The inner `Err` carries a storage failure or an id mismatch, the outer `Err` a
/// status database failure.
async fn create_both_collections(
    storage_a: &dyn Storage,
    storage_b: &dyn Storage,
    mapping: &ResolvedMapping,
    status: &StatusDatabase,
) -> Result<Result<MappingUid, ExecutionError>, StatusError> {
    let (side_a, side_b) = (mapping.a(), mapping.b());
    let (href_a, href_b) = (side_a.href(), side_b.href());

    // All storage-level steps can only fail with an `ExecutionError`, so they are
    // grouped to let `?` short-circuit on the first failure.
    let created = async {
        let new_a = storage_a.create_collection(href_a).await?;
        check_id_matches_expected(side_a.id(), storage_a, new_a.href(), Side::A).await?;
        let new_b = storage_b.create_collection(href_b).await?;
        check_id_matches_expected(side_b.id(), storage_b, new_b.href(), Side::B).await?;
        Ok::<(), ExecutionError>(())
    }
    .await;

    // NOTE(review): the *requested* hrefs are recorded here, whereas
    // `create_collection` records the href returned by the storage. Confirm this
    // difference is intended.
    match created {
        Ok(()) => Ok(Ok(status.get_or_add_collection(href_a, href_b)?)),
        Err(err) => Ok(Err(err)),
    }
}
/// Arranges the two versions of a written item as a `(side A, side B)` pair.
///
/// `target_ver` belongs to `target_side`; `source_ver` belongs to the other side.
fn ordered_versions<'a>(
    target_side: Side,
    target_ver: &'a ItemVersion,
    source_ver: &'a ItemVersion,
) -> (&'a ItemVersion, &'a ItemVersion) {
    if matches!(target_side, Side::A) {
        (target_ver, source_ver)
    } else {
        (source_ver, target_ver)
    }
}
/// Verifies that the collection at href `collection` is discovered with `expected_id`.
///
/// Nothing is checked (and no discovery is run) when `expected_id` is `None`.
///
/// # Errors
///
/// - `ExecutionError::Storage` if discovery fails.
/// - `ExecutionError::IdMismatch` if the collection is not discovered, or is
///   discovered with a different id.
async fn check_id_matches_expected(
    expected_id: Option<&CollectionId>,
    storage: &dyn Storage,
    collection: &Href,
    side: Side,
) -> Result<(), ExecutionError> {
    let Some(wanted) = expected_id else {
        return Ok(());
    };

    let disco = storage.discover_collections().await?;
    let found_id = disco
        .collections()
        .iter()
        .find(|candidate| candidate.href() == collection)
        .map(DiscoveredCollection::id);

    if found_id == Some(wanted) {
        Ok(())
    } else {
        Err(ExecutionError::IdMismatch(
            side,
            collection.clone(),
            found_id.cloned(),
        ))
    }
}