mod collection;
pub mod incremental;
pub(crate) mod items;
mod mapping;
use std::{pin::Pin, sync::Arc, task::Context, task::Poll};
use futures_util::Stream;
use tokio::sync::mpsc;
use crate::{
Href,
sync::{
Side,
declare::StoragePair,
mapping::ResolvedMapping,
operation::Operation,
ordering::CompletionDroppedError,
plan::{
collection::{GenerateError, generate_collection_operations},
mapping::create_mappings_for_pair,
},
status::{FindStaleMappingsError, MappingUid, StatusDatabase, StatusError},
},
};
pub use incremental::IncrementalPlan;
#[derive(thiserror::Error, Debug)]
pub enum PlanError {
#[error("Conflicting mappings on side {0} for href {1}.")]
ConflictingMappings(Side, Href),
#[error("Discovery failed for storage A: {0}")]
DiscoveryFailedA(#[source] crate::Error),
#[error("Discovery failed for storage B: {0}")]
DiscoveryFailedB(#[source] crate::Error),
#[error("Interacting with underlying storage: {0}")]
Storage(#[from] crate::Error),
#[error("Querying status database: {0}")]
StatusDb(#[from] StatusError),
#[error("Finding stale mappings: {0}")]
FindStaleMappings(#[from] FindStaleMappingsError),
#[error("Completion handle dropped before signaling: {0}")]
CompletionDropped(#[from] CompletionDroppedError),
}
pub struct Plan(mpsc::Receiver<Result<Operation, PlanError>>);
impl Plan {
pub async fn new(
pair: StoragePair,
status: Option<Arc<StatusDatabase>>,
) -> Result<Plan, PlanError> {
let mappings = create_mappings_for_pair(&pair).await?;
let rx = Plan::receiver(pair, status, mappings)?;
Ok(Plan(rx))
}
pub(self) fn receiver(
pair: StoragePair,
status: Option<Arc<StatusDatabase>>,
mappings: Vec<Arc<ResolvedMapping>>,
) -> Result<mpsc::Receiver<Result<Operation, PlanError>>, PlanError> {
let mut stale_mappings = None;
if let Some(ref status) = status {
let mut active_uids = Vec::new();
for mapping in &mappings {
if let Ok(Some(uid)) =
status.get_mapping_uid(mapping.a().href(), mapping.b().href())
{
active_uids.push(uid);
}
}
let stale = status.find_stale_mappings(active_uids.into_iter())?;
if !stale.is_empty() {
stale_mappings = Some(stale);
}
}
let (tx, rx) = mpsc::channel(4);
tokio::spawn(run_plan_generator(
tx,
pair,
status,
mappings,
stale_mappings,
));
Ok(rx)
}
}
async fn run_plan_generator(
tx: mpsc::Sender<Result<Operation, PlanError>>,
pair: StoragePair,
status: Option<Arc<StatusDatabase>>,
mappings: Vec<Arc<ResolvedMapping>>,
stale_mappings: Option<Vec<MappingUid>>,
) {
if let Some(stale_uids) = stale_mappings {
let op = Operation::FlushStaleMappings { stale_uids };
if tx.send(Ok(op)).await.is_err() {
return;
}
}
for mapping in mappings {
match generate_collection_operations(&tx, &pair, mapping, status.as_deref()).await {
Ok(()) => {} Err(GenerateError::Plan(e)) => {
if tx.send(Err(e)).await.is_err() {
return;
}
}
Err(GenerateError::ChannelClosed) => return,
}
}
}
impl Stream for Plan {
type Item = Result<Operation, PlanError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_recv(cx)
}
}
#[cfg(test)]
mod test {
use std::{str::FromStr, sync::Arc};
use tempfile::Builder;
use crate::{
CollectionId, ItemKind,
base::Storage,
sync::declare::{CollectionDescription, StoragePair, SyncedCollection},
vdir::VdirStorage,
};
use super::{PlanError, create_mappings_for_pair};
#[tokio::test]
async fn test_plan_duplicate_mapping() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let collection = CollectionId::from_str("test").unwrap();
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_mapping(SyncedCollection::direct(collection.clone()))
.with_mapping(SyncedCollection::direct(collection));
let err = create_mappings_for_pair(&pair).await.unwrap_err();
assert!(matches!(err, PlanError::ConflictingMappings(..)));
}
#[tokio::test]
async fn test_plan_conflicting_mapping() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let collection = CollectionId::from_str("test").unwrap();
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_mapping(SyncedCollection::direct(collection.clone()))
.with_mapping(SyncedCollection::Mapped {
alias: "test".to_string(),
a: CollectionDescription::Id { id: collection },
b: CollectionDescription::Id {
id: CollectionId::from_str("test_2").unwrap(),
},
});
let err = create_mappings_for_pair(&pair).await.unwrap_err();
assert!(matches!(err, PlanError::ConflictingMappings(..)));
}
#[tokio::test]
async fn test_plan_same_from_both_sides() {
let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
let storage_a = Arc::new(
VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
let storage_b = Arc::from(
VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar),
);
std::fs::create_dir(dir_a.path().join("one")).unwrap();
std::fs::create_dir(dir_b.path().join("one")).unwrap();
let disco = storage_a.discover_collections().await.unwrap();
assert_eq!(disco.collections().len(), 1);
let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
.with_all_from_a()
.with_all_from_b();
let mappings = create_mappings_for_pair(&pair).await.unwrap();
assert_eq!(mappings.len(), 1);
}
}