mod collection;
pub(crate) mod items;
mod mapping;
mod property;
mod stream;
use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};
use futures_util::Stream;
use crate::{
Href,
sync::{
operation::Operation,
ordering::CompletionDroppedError,
status::{FindStaleMappingsError, Side, StatusDatabase, StatusError},
},
};
use self::{mapping::create_mappings_for_pair, stream::PlanStreamState};
use super::declare::StoragePair;
pub use stream::resolve_conflicts;
/// Errors that can occur while creating a [`Plan`] or while streaming operations from it.
#[derive(thiserror::Error, Debug)]
pub enum PlanError {
/// Conflicting mappings were found for the given href on the given side.
#[error("Conflicting mappings on side {0} for href {1}.")]
ConflictingMappings(Side, Href),
/// Discovery failed for storage A; the underlying error is kept as the source.
#[error("Discovery failed for storage A: {0}")]
DiscoveryFailedA(#[source] crate::Error),
/// Discovery failed for storage B; the underlying error is kept as the source.
#[error("Discovery failed for storage B: {0}")]
DiscoveryFailedB(#[source] crate::Error),
/// Error while interacting with an underlying storage.
///
/// This is the variant that `?` produces from a bare `crate::Error`.
#[error("Interacting with underlying storage: {0}")]
Storage(#[from] crate::Error),
/// Error while querying the status database.
#[error("Querying status database: {0}")]
StatusDb(#[from] StatusError),
/// Error while finding stale mappings.
#[error("Finding stale mappings: {0}")]
FindStaleMappings(#[from] FindStaleMappingsError),
/// A completion handle was dropped before it signalled.
#[error("Completion handle dropped before signaling: {0}")]
CompletionDropped(#[from] CompletionDroppedError),
}
/// Boxed, pinned future for a single step of a [`Plan`].
///
/// It resolves to `None`, or to the next item (`Result<Operation, PlanError>`) paired
/// with a `PlanStreamState`. The `Send + 'static` bounds are part of the alias.
type NextFuture = Pin<
Box<
dyn Future<Output = Option<(Result<Operation, PlanError>, PlanStreamState)>>
+ Send
+ 'static,
>,
>;
/// A synchronisation plan, consumed as a [`Stream`] of `Result<Operation, PlanError>`.
pub struct Plan {
/// Stream state. Taken (left as `None`) while a step future is in flight, and put
/// back when that future yields a new state.
state: Option<PlanStreamState>,
/// Future for the step currently in flight, if any.
pending: Option<NextFuture>,
}
impl Plan {
pub async fn new(
pair: StoragePair,
status: Option<Arc<StatusDatabase>>,
) -> Result<Plan, PlanError> {
let mappings = create_mappings_for_pair(&pair).await?;
let mut stale_mappings = None;
if let Some(ref status) = status {
let mut active_uids = Vec::new();
for mapping in &mappings {
if let Ok(Some(uid)) =
status.get_mapping_uid(mapping.a().href(), mapping.b().href())
{
active_uids.push(uid);
}
}
let stale = status.find_stale_mappings(active_uids.into_iter())?;
if !stale.is_empty() {
stale_mappings = Some(stale);
}
}
let state = PlanStreamState {
pair,
status,
mappings: mappings.into_iter(),
current_collection: None,
stale_mappings,
};
Ok(Plan {
state: Some(state),
pending: None,
})
}
}
impl Stream for Plan {
    type Item = Result<Operation, PlanError>;

    /// Polls the step currently in flight, starting one first if necessary.
    ///
    /// When no future is pending, the stored state is taken and turned into a new step
    /// future via `next_operation`. If there is neither a pending future nor a stored
    /// state, the stream is finished and `Poll::Ready(None)` is returned.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let plan = self.get_mut();

        // Reuse the in-flight future, or start the next step from the stored state.
        let in_flight = match plan.pending {
            Some(ref mut fut) => fut,
            None => match plan.state.take() {
                Some(state) => plan.pending.insert(Box::pin(state.next_operation())),
                None => return Poll::Ready(None),
            },
        };

        match in_flight.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(step) => {
                // The future has completed and must not be polled again.
                plan.pending = None;
                match step {
                    Some((item, next_state)) => {
                        plan.state = Some(next_state);
                        Poll::Ready(Some(item))
                    }
                    None => {
                        plan.state = None;
                        Poll::Ready(None)
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod test {
    use std::{str::FromStr, sync::Arc};

    use tempfile::Builder;

    use crate::{
        CollectionId, ItemKind,
        base::Storage,
        sync::declare::{CollectionDescription, StoragePair, SyncedCollection},
        vdir::VdirStorage,
    };

    use super::{PlanError, create_mappings_for_pair};

    /// Builds a calendar vdir storage rooted in a fresh temporary directory.
    ///
    /// The directory handle is returned alongside the storage so that the caller keeps
    /// it alive for the duration of the test.
    fn temp_calendar_storage() -> (tempfile::TempDir, Arc<VdirStorage>) {
        let dir = Builder::new().prefix("vstorage").tempdir().unwrap();
        let storage = VdirStorage::builder(dir.path().to_path_buf().try_into().unwrap())
            .unwrap()
            .build(ItemKind::Calendar);
        (dir, Arc::new(storage))
    }

    /// Declaring the same direct mapping twice is rejected as conflicting.
    #[tokio::test]
    async fn test_plan_duplicate_mapping() {
        let (_dir_a, storage_a) = temp_calendar_storage();
        let (_dir_b, storage_b) = temp_calendar_storage();

        let id = CollectionId::from_str("test").unwrap();
        let first = SyncedCollection::direct(id.clone());
        let second = SyncedCollection::direct(id);
        let pair = StoragePair::new(storage_a, storage_b)
            .with_mapping(first)
            .with_mapping(second);

        let err = create_mappings_for_pair(&pair).await.unwrap_err();
        assert!(matches!(err, PlanError::ConflictingMappings(..)));
    }

    /// A direct mapping and an explicit mapping that share the collection on side A
    /// are rejected as conflicting.
    #[tokio::test]
    async fn test_plan_conflicting_mapping() {
        let (_dir_a, storage_a) = temp_calendar_storage();
        let (_dir_b, storage_b) = temp_calendar_storage();

        let id = CollectionId::from_str("test").unwrap();
        let direct = SyncedCollection::direct(id.clone());
        let mapped = SyncedCollection::Mapped {
            alias: "test".to_string(),
            a: CollectionDescription::Id { id },
            b: CollectionDescription::Id {
                id: CollectionId::from_str("test_2").unwrap(),
            },
        };
        let pair = StoragePair::new(storage_a, storage_b)
            .with_mapping(direct)
            .with_mapping(mapped);

        let err = create_mappings_for_pair(&pair).await.unwrap_err();
        assert!(matches!(err, PlanError::ConflictingMappings(..)));
    }

    /// A collection present on both sides yields a single mapping when all collections
    /// from both A and B are requested.
    #[tokio::test]
    async fn test_plan_same_from_both_sides() {
        let (dir_a, storage_a) = temp_calendar_storage();
        let (dir_b, storage_b) = temp_calendar_storage();

        for root in [dir_a.path(), dir_b.path()] {
            std::fs::create_dir(root.join("one")).unwrap();
        }

        let disco = storage_a.discover_collections().await.unwrap();
        assert_eq!(disco.collections().len(), 1);

        let pair = StoragePair::new(storage_a, storage_b)
            .with_all_from_a()
            .with_all_from_b();

        let mappings = create_mappings_for_pair(&pair).await.unwrap();
        assert_eq!(mappings.len(), 1);
    }
}