use std::sync::Arc;
use log::debug;
use crate::{
ErrorKind, Href,
base::Storage,
disco::{DiscoveredCollection, Discovery},
sync::{
analysis::{ResolvedCollection, ResolvedMapping},
declare::{CollectionDescription, StoragePair, SyncedCollection},
status::Side,
},
};
use super::PlanError;
impl ResolvedMapping {
pub(crate) async fn from_declared_mapping(
declared: &SyncedCollection,
storage_a: &dyn Storage,
storage_b: &dyn Storage,
disco_a: &Discovery,
disco_b: &Discovery,
) -> Result<Self, crate::Error> {
match declared {
SyncedCollection::Direct { description } => Ok(ResolvedMapping {
alias: description.alias(),
a: ResolvedCollection::from_declaration(description, disco_a, storage_a).await?,
b: ResolvedCollection::from_declaration(description, disco_b, storage_b).await?,
}),
SyncedCollection::Mapped { a, b, alias } => Ok(ResolvedMapping {
alias: alias.clone(),
a: ResolvedCollection::from_declaration(a, disco_a, storage_a).await?,
b: ResolvedCollection::from_declaration(b, disco_b, storage_b).await?,
}),
}
}
}
impl ResolvedCollection {
async fn from_declaration(
declared: &CollectionDescription,
discovery: &Discovery,
storage: &dyn Storage,
) -> Result<ResolvedCollection, crate::Error> {
match declared {
CollectionDescription::Id { id } => {
if let Some(collection) = discovery.find_collection_by_id(id) {
Ok(ResolvedCollection {
id: Some(id.clone()),
href: collection.href().to_string(),
exists: true,
})
} else {
Ok(ResolvedCollection {
id: Some(id.clone()),
href: storage.href_for_collection_id(id)?,
exists: false,
})
}
}
CollectionDescription::Href { href } => {
let id = discovery
.collections()
.iter()
.find(|c| c.href() == *href)
.map(|c| c.id().clone());
let exists = id.is_some() || collection_exists(storage, href).await?;
Ok(ResolvedCollection {
href: href.clone(),
id,
exists,
})
}
}
}
}
async fn collection_exists(storage: &dyn Storage, href: &str) -> Result<bool, crate::Error> {
match storage.list_items(href).await {
Ok(_) => Ok(true),
Err(e) => {
if e.kind == ErrorKind::DoesNotExist || e.kind == ErrorKind::AccessDenied {
Ok(false)
} else {
Err(e)
}
}
}
}
fn resolve_mapping_counterpart(
source_collection: &DiscoveredCollection,
target_discovery: &Discovery,
target_storage: &dyn Storage,
) -> Result<ResolvedCollection, PlanError> {
let id = source_collection.id();
match target_discovery.find_collection_by_id(id) {
Some(c) => Ok(ResolvedCollection {
href: c.href().to_string(),
id: Some(id.clone()),
exists: true,
}),
None => Ok(ResolvedCollection {
id: Some(id.clone()),
href: target_storage.href_for_collection_id(id)?,
exists: false,
}),
}
}
pub(crate) async fn create_mappings_for_pair(
pair: &StoragePair,
) -> Result<Vec<Arc<ResolvedMapping>>, PlanError> {
let mut mappings = Vec::<Arc<ResolvedMapping>>::with_capacity(pair.mappings.len());
let (disco_a, disco_b) = tokio::join!(
pair.storage_a.discover_collections(),
pair.storage_b.discover_collections(),
);
let disco_a = disco_a.map_err(PlanError::DiscoveryFailedA)?;
let disco_b = disco_b.map_err(PlanError::DiscoveryFailedA)?;
for mapping in pair.mappings.iter() {
mappings.push(Arc::new(
ResolvedMapping::from_declared_mapping(
mapping,
pair.storage_a.as_ref(),
pair.storage_b.as_ref(),
&disco_a,
&disco_b,
)
.await?,
));
}
if pair.all_from_a {
debug!("Adding collections from a");
mappings.reserve(disco_a.collection_count());
for collection in disco_a.collections() {
mappings.push(Arc::new(ResolvedMapping {
alias: format!("id:{}", collection.id()),
a: ResolvedCollection {
href: collection.href().to_string(),
id: Some(collection.id().clone()),
exists: true,
},
b: resolve_mapping_counterpart(collection, &disco_b, pair.storage_b().as_ref())?,
}));
}
}
if pair.all_from_b {
debug!("Adding collections from b");
mappings.reserve(disco_b.collection_count());
for collection in disco_b.collections() {
let mapping = Arc::new(ResolvedMapping {
alias: format!("id:{}", collection.id()),
a: resolve_mapping_counterpart(collection, &disco_a, pair.storage_a().as_ref())?,
b: ResolvedCollection {
href: collection.href().to_string(),
id: Some(collection.id().clone()),
exists: true,
},
});
if mappings.iter().any(|m| m.as_ref() == mapping.as_ref()) {
debug!("Skipping mapping; already present.");
} else {
mappings.push(mapping);
}
}
}
check_for_duplicate_mappings(&mappings)?;
Ok(mappings)
}
fn check_for_duplicate_mappings(mappings: &[Arc<ResolvedMapping>]) -> Result<(), PlanError> {
let mut seen = Vec::<(&Href, &Href)>::new();
for mapping in mappings {
if let Some(conflict) = seen.iter().find_map(|s| {
if s.0 == &mapping.a.href {
Some(PlanError::ConflictingMappings(Side::A, s.0.clone()))
} else if s.1 == &mapping.b.href {
Some(PlanError::ConflictingMappings(Side::B, s.1.clone()))
} else {
None
}
}) {
return Err(conflict);
}
seen.push((&mapping.a.href, &mapping.b.href));
}
Ok(())
}