use std::{borrow::Cow, sync::Arc};
use log::{debug, error, warn};
use crate::{
CollectionId, Href,
base::{CreateItemOptions, ItemVersion, Storage},
disco::DiscoveredCollection,
sync::{
operation::{DeleteItem, StatusAction, StatusOnly, StatusWrite, StorageWrite, WriteItem},
ordering::CompletionDroppedError,
},
};
use super::{
error::SyncError,
mapping::ResolvedMapping,
operation::{CollectionOp, ItemOp, Operation, PropertyOp, PropertyOpKind},
plan::PlanError,
status::{MappingUid, Side, StatusDatabase, StatusError},
};
pub struct Executor {
storage_a: Arc<dyn Storage>,
storage_b: Arc<dyn Storage>,
status: Arc<StatusDatabase>,
concurrency: usize,
}
impl Executor {
pub fn new(
storage_a: Arc<dyn Storage>,
storage_b: Arc<dyn Storage>,
status: Arc<StatusDatabase>,
) -> Executor {
Executor {
storage_a,
storage_b,
status,
concurrency: 4,
}
}
#[must_use]
pub fn with_concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency;
self
}
async fn create_item(
&self,
storage: &dyn Storage,
collection: &Href,
data: &crate::base::Item,
resource_name: Option<Href>,
op: Operation,
) -> Result<ItemVersion, SyncError> {
if let Some(name) = resource_name {
let opts = CreateItemOptions {
resource_name: Some(name),
};
match storage.create_item(collection, data, opts).await {
Ok(v) => return Ok(v),
Err(e)
if e.kind == crate::ErrorKind::Exists
|| e.kind == crate::ErrorKind::InvalidInput =>
{
warn!("Creation with preferred name failed: {e}");
}
Err(e) => return Err(SyncError::new(op, ExecutionError::Storage(e))),
}
}
match storage
.create_item(collection, data, CreateItemOptions::default())
.await
{
Ok(v) => Ok(v),
Err(e) => Err(SyncError::new(op, ExecutionError::Storage(e))),
}
}
async fn update_item(
&self,
storage: &dyn Storage,
href: &Href,
etag: &crate::Etag,
data: &crate::base::Item,
op: Operation,
) -> Result<crate::Etag, SyncError> {
match storage.update_item(href, etag, data).await {
Ok(etag) => Ok(etag),
Err(e) => Err(SyncError::new(op, ExecutionError::Storage(e))),
}
}
pub async fn execute_operation(
&self,
operation: Operation,
) -> Result<Result<(), SyncError>, StatusError> {
match operation {
Operation::FlushStaleMappings { stale_uids } => {
self.status.flush_stale_mappings(stale_uids).map(Ok)
}
Operation::Collection(op) => self.execute_collection_op(op).await,
Operation::Item(op) => self.execute_item_op(op).await,
Operation::Property(op) => self.execute_property_op(op).await,
}
}
async fn execute_collection_op(
&self,
op: CollectionOp,
) -> Result<Result<(), SyncError>, StatusError> {
match &op {
CollectionOp::SaveToStatus {
mapping,
completion,
} => {
let uid = self
.status
.get_or_add_collection(mapping.a().href(), mapping.b().href())?;
if completion.complete(uid).is_err() {
error!("Failed to signal completion; all waiters dropped!?");
}
Ok(Ok(()))
}
CollectionOp::CreateInOne {
mapping,
side,
completion,
} => {
let storage_a = self.storage_a.as_ref();
let storage_b = self.storage_b.as_ref();
let storage = match side {
Side::A => storage_a,
Side::B => storage_b,
};
match create_collection(storage, &self.status, mapping, *side).await? {
Ok(uid) => {
if completion.complete(uid).is_err() {
error!("Failed to signal completion; all waiters dropped!?");
}
Ok(Ok(()))
}
Err(err) => Ok(Err(SyncError::new(Operation::Collection(op), err))),
}
}
CollectionOp::CreateInBoth {
mapping,
completion,
} => {
match create_both_collections(
self.storage_a.as_ref(),
self.storage_b.as_ref(),
mapping,
&self.status,
)
.await?
{
Ok(uid) => {
if completion.complete(uid).is_err() {
error!("Failed to signal completion; all waiters dropped!?");
}
Ok(Ok(()))
}
Err(err) => Ok(Err(SyncError::new(Operation::Collection(op), err))),
}
}
CollectionOp::Delete {
mapping,
mapping_uid,
side,
wait_for_items,
} => {
wait_for_items.wait().await;
let (storage, href) = match side {
Side::A => (self.storage_a.as_ref(), mapping.a().href()),
Side::B => (self.storage_b.as_ref(), mapping.b().href()),
};
if let Err(err) =
delete_collection(href, &self.status, storage, *mapping_uid).await?
{
return Ok(Err(SyncError::new(Operation::Collection(op), err)));
}
Ok(Ok(()))
}
CollectionOp::StoreSyncToken {
mapping_uid,
side,
token,
wait_for_items,
} => {
let resolved_uid = match mapping_uid.clone().resolve().await {
Ok(uid) => uid,
Err(err) => {
return Ok(Err(SyncError::new(Operation::Collection(op), err.into())));
}
};
wait_for_items.wait().await;
self.status.set_sync_token(resolved_uid, *side, token)?;
Ok(Ok(()))
}
}
}
async fn execute_item_op(&self, mut op: ItemOp) -> Result<Result<(), SyncError>, StatusError> {
debug!("Executing item operation for UID: {:?}", op.uid());
let _on_complete = match &mut op {
ItemOp::StatusOnly(data) => data.on_complete.take(),
ItemOp::Write(data) => data.on_complete.take(),
ItemOp::Delete(data) => data.on_complete.take(),
ItemOp::Conflict { on_complete, .. } => on_complete.take(),
};
match op {
ItemOp::StatusOnly(data) => self.execute_status_action(data).await,
ItemOp::Write(data) => self.execute_write(data).await,
ItemOp::Delete(data) => self.execute_delete(data).await,
ItemOp::Conflict {
info,
mapping,
mapping_uid,
on_complete,
} => Ok(Err(SyncError::new(
Operation::Item(ItemOp::Conflict {
info,
mapping,
mapping_uid,
on_complete,
}),
ExecutionError::Conflict,
))),
}
}
async fn execute_property_op(
&self,
mut op: PropertyOp,
) -> Result<Result<(), SyncError>, StatusError> {
let _on_complete = op.on_complete.take();
if matches!(op.kind, PropertyOpKind::Conflict { .. }) {
return Ok(Err(SyncError::new(
Operation::Property(op),
ExecutionError::Conflict,
)));
}
let href_a = op.mapping.a().href();
let href_b = op.mapping.b().href();
let mapping_uid = match op.mapping_uid.clone().resolve().await {
Ok(uid) => uid,
Err(err) => {
return Ok(Err(SyncError::new(Operation::Property(op), err.into())));
}
};
let storage_a = self.storage_a.as_ref();
let storage_b = self.storage_b.as_ref();
match &op.kind {
PropertyOpKind::Write { value, side } => {
let (storage, href) = match side {
Side::A => (storage_a, href_a),
Side::B => (storage_b, href_b),
};
if let Err(err) = storage.set_property(href, op.property, value).await {
return Ok(Err(SyncError::new(Operation::Property(op), err.into())));
}
}
PropertyOpKind::Delete { side } => {
let (storage, href) = match side {
Side::A => (storage_a, href_a),
Side::B => (storage_b, href_b),
};
if let Err(err) = storage.unset_property(href, op.property).await {
return Ok(Err(SyncError::new(Operation::Property(op), err.into())));
}
}
_ => {}
}
match &op.kind {
PropertyOpKind::Write { value, .. } | PropertyOpKind::UpdateStatus { value } => {
self.status
.set_property(mapping_uid, op.property.name(), value)?;
Ok(Ok(()))
}
PropertyOpKind::Delete { .. } | PropertyOpKind::ClearStatus => {
self.status
.delete_property(mapping_uid, op.property.name())?;
Ok(Ok(()))
}
PropertyOpKind::Conflict { .. } => unreachable!(),
}
}
async fn execute_status_action(
&self,
data: StatusOnly,
) -> Result<Result<(), SyncError>, StatusError> {
let mapping_uid = match data.mapping_uid.clone().resolve().await {
Ok(uid) => uid,
Err(err) => {
return Ok(Err(SyncError::new(
Operation::Item(ItemOp::StatusOnly(data)),
err.into(),
)));
}
};
match data.action {
StatusAction::Insert {
uid,
hash,
versions,
} => self
.status
.insert_item(mapping_uid, &uid, &hash, &versions.a, &versions.b)
.map(Ok),
StatusAction::Update { hash, old, new } => self
.status
.update_item(&hash, &old.a, &old.b, &new.a, &new.b)
.map(Ok),
StatusAction::Clear { uid } => self.status.delete_item(mapping_uid, &uid).map(Ok),
}
}
async fn execute_write(&self, data: WriteItem) -> Result<Result<(), SyncError>, StatusError> {
let (source_storage, target_storage) = match data.target_side {
Side::A => (self.storage_b.as_ref(), self.storage_a.as_ref()),
Side::B => (self.storage_a.as_ref(), self.storage_b.as_ref()),
};
debug!(
"Writing item to {} from {}",
data.target_side, data.source.state.version.href
);
let item_data = match &data.source.data {
Some(d) => Cow::Borrowed(d),
None => match source_storage
.get_item(&data.source.state.version.href)
.await
{
Ok((item, _etag)) => Cow::Owned(item),
Err(e) => {
return Ok(Err(SyncError::new(
Operation::Item(data.clone().into()),
ExecutionError::Storage(e),
)));
}
},
};
let source_ver = data.source.state.version.clone();
let mapping_uid = match data.mapping_uid.clone().resolve().await {
Ok(uid) => uid,
Err(err) => {
return Ok(Err(SyncError::new(
Operation::Item(data.into()),
err.into(),
)));
}
};
let op = Operation::Item(data.clone().into());
let target_ver = match data.storage_write {
StorageWrite::Create {
collection,
resource_name,
} => match self
.create_item(
target_storage,
&collection,
&item_data,
resource_name,
op.clone(),
)
.await
{
Ok(v) => Some(v),
Err(e) => return Ok(Err(e)),
},
StorageWrite::Update { target } => match self
.update_item(
target_storage,
&target.href,
&target.etag,
&item_data,
op.clone(),
)
.await
{
Ok(etag) => Some(ItemVersion::new(target.href, etag)),
Err(e) => return Ok(Err(e)),
},
};
let Some(target_ver) = target_ver else {
return Ok(Ok(()));
};
let (a, b) = ordered_versions(data.target_side, &target_ver, &source_ver);
match data.status_write {
StatusWrite::Insert => self
.status
.insert_item(mapping_uid, &item_data.ident(), &item_data.hash(), a, b)
.map(Ok),
StatusWrite::Update { ref old } => self
.status
.update_item(&item_data.hash(), &old.a, &old.b, a, b)
.map(Ok),
}
}
async fn execute_delete(&self, data: DeleteItem) -> Result<Result<(), SyncError>, StatusError> {
let storage = match data.side {
Side::A => self.storage_a.as_ref(),
Side::B => self.storage_b.as_ref(),
};
debug!("Deleting from {}: {}", data.side, data.target.href);
match storage
.delete_item(&data.target.href, &data.target.etag)
.await
{
Ok(()) => {
let mapping_uid = match data.mapping_uid.clone().resolve().await {
Ok(uid) => uid,
Err(err) => {
return Ok(Err(SyncError::new(
Operation::Item(data.into()),
err.into(),
)));
}
};
self.status.delete_item(mapping_uid, &data.uid).map(Ok)
}
Err(err) => Ok(Err(SyncError::new(
Operation::Item(data.into()),
ExecutionError::Storage(err),
))),
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum ExecutionError {
#[error(transparent)]
Storage(#[from] crate::Error),
#[error("created collection {1} on side {0} has unexpected id, it has: {2:?}")]
IdMismatch(Side, Href, Option<CollectionId>),
#[error("planning error: {0}")]
Planning(#[from] PlanError),
#[error("dependant collection task failed")]
DependantFailed(#[from] CompletionDroppedError),
#[error("resource is in conflict")]
Conflict,
}
pub type ExecutionResult = Result<Result<(), ExecutionError>, StatusError>;
async fn delete_collection(
href: &Href,
status: &StatusDatabase,
storage: &dyn Storage,
mapping_uid: MappingUid,
) -> ExecutionResult {
match storage.delete_collection(href).await {
Ok(()) => Ok(Ok(status.remove_collection(mapping_uid)?)),
Err(err) => Ok(Err(ExecutionError::Storage(err))),
}
}
async fn create_collection(
storage: &dyn Storage,
status: &StatusDatabase,
mapping: &ResolvedMapping,
side: Side,
) -> Result<Result<MappingUid, ExecutionError>, StatusError> {
let (target, existing) = match side {
Side::A => (mapping.a(), mapping.b()),
Side::B => (mapping.b(), mapping.a()),
};
let new = match storage.create_collection(target.href()).await {
Ok(c) => c,
Err(err) => return Ok(Err(ExecutionError::Storage(err))),
};
if new.href() != target.href() {
warn!(
"Created collection has href {}, expected {}.",
new.href(),
target.href()
);
}
if let Err(err) = check_id_matches_expected(target.id(), storage, new.href(), side).await {
return Ok(Err(err));
}
let mapping_uid = match side {
Side::A => status.add_collection(new.href(), existing.href()),
Side::B => status.add_collection(existing.href(), new.href()),
}?;
Ok(Ok(mapping_uid))
}
async fn create_both_collections(
storage_a: &dyn Storage,
storage_b: &dyn Storage,
mapping: &ResolvedMapping,
status: &StatusDatabase,
) -> Result<Result<MappingUid, ExecutionError>, StatusError> {
let href_a = mapping.a().href();
let href_b = mapping.b().href();
let id_a = mapping.a().id();
let id_b = mapping.b().id();
let new_a = match storage_a.create_collection(href_a).await {
Ok(c) => c,
Err(err) => return Ok(Err(ExecutionError::Storage(err))),
};
if let Err(err) = check_id_matches_expected(id_a, storage_a, new_a.href(), Side::A).await {
return Ok(Err(err));
}
let new_b = match storage_b.create_collection(href_b).await {
Ok(c) => c,
Err(err) => return Ok(Err(ExecutionError::Storage(err))),
};
if let Err(err) = check_id_matches_expected(id_b, storage_b, new_b.href(), Side::B).await {
return Ok(Err(err));
}
Ok(Ok(status.get_or_add_collection(href_a, href_b)?))
}
fn ordered_versions<'a>(
target_side: Side,
target_ver: &'a ItemVersion,
source_ver: &'a ItemVersion,
) -> (&'a ItemVersion, &'a ItemVersion) {
match target_side {
Side::A => (target_ver, source_ver),
Side::B => (source_ver, target_ver),
}
}
async fn check_id_matches_expected(
expected_id: Option<&CollectionId>,
storage: &dyn Storage,
collection: &Href,
side: Side,
) -> Result<(), ExecutionError> {
if let Some(expected_id) = expected_id {
let disco = storage.discover_collections().await?;
let created = disco.collections().iter().find(|c| c.href() == collection);
let created_id = created.map(DiscoveredCollection::id);
if created_id != Some(expected_id) {
return Err(ExecutionError::IdMismatch(
side,
collection.clone(),
created_id.cloned(),
));
}
}
Ok(())
}