// vstorage 0.6.0
//
// Common API for various icalendar/vcard storages.
// Documentation
// Copyright 2023-2026 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: EUPL-1.2

//! Plan for a synchronisation.
//!
//! Handles the generation of synchronisation operations.
//! See also: [`crate::sync::analysis`].

mod collection;
pub(crate) mod items;
mod mapping;
mod property;
mod stream;

use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};

use futures_util::Stream;

use crate::{
    Href,
    sync::{
        operation::Operation,
        ordering::CompletionDroppedError,
        status::{FindStaleMappingsError, Side, StatusDatabase, StatusError},
    },
};

use self::{mapping::create_mappings_for_pair, stream::PlanStreamState};
use super::declare::StoragePair;

pub use stream::resolve_conflicts;

/// Error that occurs when planning synchronisation operations.
///
/// Variants marked with `#[from]` are produced automatically by the `?` operator from their
/// underlying error type.
#[derive(thiserror::Error, Debug)]
pub enum PlanError {
    /// Conflicting mappings have been defined.
    ///
    /// Two (or more) collections on one side would be synchronised to the same collection on the
    /// other side. The `Side` and `Href` parameters refer to the collection that has multiple
    /// counterparts.
    #[error("Conflicting mappings on side {0} for href {1}.")]
    ConflictingMappings(Side, Href),

    /// Discovering collections on storage A failed.
    ///
    /// The underlying storage error is exposed as the error source.
    #[error("Discovery failed for storage A: {0}")]
    DiscoveryFailedA(#[source] crate::Error),

    /// Discovering collections on storage B failed.
    ///
    /// The underlying storage error is exposed as the error source.
    #[error("Discovery failed for storage B: {0}")]
    DiscoveryFailedB(#[source] crate::Error),

    /// An error occurred while interacting with a storage.
    #[error("Interacting with underlying storage: {0}")]
    Storage(#[from] crate::Error),

    /// An error occurred while reading the status database.
    #[error("Querying status database: {0}")]
    StatusDb(#[from] StatusError),

    /// An error occurred while querying the status database for stale mappings.
    #[error("Finding stale mappings: {0}")]
    FindStaleMappings(#[from] FindStaleMappingsError),

    /// A completion handle was dropped before signalling.
    #[error("Completion handle dropped before signaling: {0}")]
    CompletionDropped(#[from] CompletionDroppedError),
}

// Type alias for readability.
//
// Boxed future computing a single step of a `Plan`. It resolves to `None` when there is nothing
// left to yield, or to the next item of the stream together with the `PlanStreamState` from which
// the following step is computed.
type NextFuture = Pin<
    Box<
        dyn Future<Output = Option<(Result<Operation, PlanError>, PlanStreamState)>>
            + Send
            + 'static,
    >,
>;

/// Stream generating actions that would synchronise a pair of storages.
///
/// In order to inspect the stream or export it into a human-readable format, collect it into
/// memory first.
///
/// Operations are streamed in order:
///
/// 1. Collection operations (create/update).
/// 2. Item operations.
/// 3. Property operations.
/// 4. Collection deletions (after items cleared).
///
/// It is safe to run operations concurrently; operations contain handles to ensure dependent
/// operations wait for others before running.
pub struct Plan {
    // State from which the next operation is computed. `None` while a step is in flight (the
    // state has been moved into `pending`) and once the stream is exhausted.
    state: Option<PlanStreamState>,
    // Future computing the next operation, if one is currently in flight.
    pending: Option<NextFuture>,
}

impl Plan {
    /// Create a new plan for a given storage pair.
    ///
    /// Resolves the collection mappings for `pair`. When a status database is provided, the UIDs
    /// of all resolved mappings known to it are collected and handed to
    /// `StatusDatabase::find_stale_mappings`; a non-empty result is kept in the stream state.
    ///
    /// # Errors
    ///
    /// Returns an error if collection discovery or mapping resolution fails, or if querying the
    /// status database (for mapping UIDs or for stale mappings) fails.
    pub async fn new(
        pair: StoragePair,
        status: Option<Arc<StatusDatabase>>,
    ) -> Result<Plan, PlanError> {
        let mappings = create_mappings_for_pair(&pair).await?;

        let mut stale_mappings = None;
        if let Some(ref status) = status {
            let mut active_uids = Vec::new();
            for mapping in &mappings {
                // Propagate lookup failures instead of ignoring them: a mapping whose UID could
                // not be read would be missing from the active set passed to
                // `find_stale_mappings` below, and may then be reported as stale.
                // `Ok(None)` means that the status database has no UID for this mapping.
                if let Some(uid) =
                    status.get_mapping_uid(mapping.a().href(), mapping.b().href())?
                {
                    active_uids.push(uid);
                }
            }

            let stale = status.find_stale_mappings(active_uids.into_iter())?;
            if !stale.is_empty() {
                stale_mappings = Some(stale);
            }
        }

        let state = PlanStreamState {
            pair,
            status,
            mappings: mappings.into_iter(),
            current_collection: None,
            stale_mappings,
        };

        Ok(Plan {
            state: Some(state),
            pending: None,
        })
    }
}

impl Stream for Plan {
    type Item = Result<Operation, PlanError>;

    /// Yield the next planned operation.
    ///
    /// Each step is computed by a boxed future created from the current stream state. The future
    /// is kept across calls until it completes; on completion it hands back the state for the
    /// following step, or signals that the stream has ended.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let plan = self.as_mut().get_mut();

        // Resume the step already in flight, or start a new one from the saved state.
        let step = match plan.pending.as_mut() {
            Some(in_flight) => in_flight,
            None => {
                let Some(state) = plan.state.take() else {
                    // Neither a step in flight nor a state to start one from: done.
                    return Poll::Ready(None);
                };
                plan.pending.insert(Box::pin(state.next_operation()))
            }
        };

        let Poll::Ready(outcome) = step.as_mut().poll(cx) else {
            // The step stays stored in `pending` and is polled again on the next call.
            return Poll::Pending;
        };

        plan.pending = None;
        match outcome {
            Some((result, next_state)) => {
                plan.state = Some(next_state);
                Poll::Ready(Some(result))
            }
            None => {
                plan.state = None;
                Poll::Ready(None)
            }
        }
    }
}

// Tests for mapping resolution via `create_mappings_for_pair`, using vdir storages backed by
// temporary directories.
#[cfg(test)]
mod test {
    use std::{str::FromStr, sync::Arc};

    use tempfile::Builder;

    use crate::{
        CollectionId, ItemKind,
        base::Storage,
        sync::declare::{CollectionDescription, StoragePair, SyncedCollection},
        vdir::VdirStorage,
    };

    use super::{PlanError, create_mappings_for_pair};

    /// Declaring the same direct mapping twice must be rejected as conflicting.
    #[tokio::test]
    async fn test_plan_duplicate_mapping() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();

        let storage_a = Arc::new(
            VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        let storage_b = Arc::from(
            VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );

        // Duplicate mapping: the same collection id is declared twice.
        let collection = CollectionId::from_str("test").unwrap();
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_mapping(SyncedCollection::direct(collection.clone()))
            .with_mapping(SyncedCollection::direct(collection));

        let err = create_mappings_for_pair(&pair).await.unwrap_err();
        assert!(matches!(err, PlanError::ConflictingMappings(..)));
    }

    /// Mapping one collection on side A to two different collections on side B must be rejected
    /// as conflicting.
    #[tokio::test]
    async fn test_plan_conflicting_mapping() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();

        let storage_a = Arc::new(
            VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        let storage_b = Arc::from(
            VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        // Collection `test` on side A is declared twice with different counterparts on side B:
        // once directly (`test`) and once mapped to `test_2`.
        let collection = CollectionId::from_str("test").unwrap();
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_mapping(SyncedCollection::direct(collection.clone()))
            .with_mapping(SyncedCollection::Mapped {
                alias: "test".to_string(),
                a: CollectionDescription::Id { id: collection },
                b: CollectionDescription::Id {
                    id: CollectionId::from_str("test_2").unwrap(),
                },
            });

        let err = create_mappings_for_pair(&pair).await.unwrap_err();
        assert!(matches!(err, PlanError::ConflictingMappings(..)));
    }

    /// Discovering from both sides must not duplicate a collection that exists on both.
    #[tokio::test]
    async fn test_plan_same_from_both_sides() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();

        let storage_a = Arc::new(
            VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        let storage_b = Arc::from(
            VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        // `from_a` and `from_b` with collection existing on both sides.
        // This particular scenario is special-cased.
        std::fs::create_dir(dir_a.path().join("one")).unwrap();
        std::fs::create_dir(dir_b.path().join("one")).unwrap();

        // Sanity check: the directory created above is discovered as a single collection.
        let disco = storage_a.discover_collections().await.unwrap();
        assert_eq!(disco.collections().len(), 1);

        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_all_from_a()
            .with_all_from_b();

        let mappings = create_mappings_for_pair(&pair).await.unwrap();
        // When the same collection exists on both sides and we discover from both,
        // it should result in one mapping (not duplicated).
        assert_eq!(mappings.len(), 1);
    }
}