// vstorage 0.7.0
//
// Common API for various icalendar/vcard storages.
// Documentation
// Copyright 2023-2026 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: EUPL-1.2

//! Stream-based plan generation.

use std::sync::Arc;

use futures_util::stream::{Stream, StreamExt};

use crate::sync::{
    analysis::ResolvedMapping,
    conflict::ConflictResolver,
    declare::StoragePair,
    operation::{ItemOp, Operation, PropertyOp, PropertyOpKind},
    status::{MappingUid, StatusDatabase},
};

use super::{PlanError, collection::CollectionGenerator};

/// Internal state for the Plan stream implementation.
/// Internal state for the Plan stream implementation.
///
/// Drives plan generation one operation at a time: a pending stale-mapping
/// flush is emitted first, then each resolved mapping is expanded into a
/// [`CollectionGenerator`] whose operations are drained before moving on to
/// the next mapping.
pub(super) struct PlanStreamState {
    // The two storages being synchronised.
    pub(super) pair: StoragePair,
    // Status database from previous syncs, if any; passed down as
    // `Option<&StatusDatabase>` via `as_deref`.
    pub(super) status: Option<Arc<StatusDatabase>>,
    // Resolved collection mappings still to be expanded, consumed in order.
    pub(super) mappings: std::vec::IntoIter<Arc<ResolvedMapping>>,
    // Generator for the collection currently being drained, if any.
    pub(super) current_collection: Option<CollectionGenerator>,
    // Stale mapping UIDs to flush before anything else; `Some` only until
    // the initial `FlushStaleMappings` operation has been emitted.
    pub(super) stale_mappings: Option<Vec<MappingUid>>,
}

impl PlanStreamState {
    pub(super) async fn next_operation(
        mut self,
    ) -> Option<(Result<Operation, PlanError>, PlanStreamState)> {
        if let Some(stale_uids) = self.stale_mappings.take() {
            return Some((Ok(Operation::FlushStaleMappings { stale_uids }), self));
        }

        loop {
            if let Some(ref mut collection) = self.current_collection {
                if let Some(result) = collection
                    .next_operation(&self.pair, self.status.as_deref())
                    .await
                {
                    return Some((result, self));
                }
                // Current collection is exhausted, move to next.
                self.current_collection = None;
            }

            if let Some(mapping) = self.mappings.next() {
                // No current collection or it's exhausted, try the next one.

                match CollectionGenerator::new(&self.pair, mapping, self.status.as_deref()).await {
                    Ok(Some(collection_plan)) => {
                        self.current_collection = Some(collection_plan);
                        // Next loop will yield operation from this collection.
                    }
                    Ok(None) => {} // Collection skipped.
                    Err(e) => return Some((Err(e), self)),
                }
            } else {
                // No more mappings, stream is exhausted
                return None;
            }
        }
    }
}

/// Wrap a stream of operations to automatically resolve conflicts.
///
/// Every conflict operation in `operations` is replaced by whatever the
/// given [`ConflictResolver`] decides; all other items (including errors)
/// pass through untouched.
pub fn resolve_conflicts<S, R>(
    operations: S,
    resolver: R,
) -> impl Stream<Item = Result<Operation, PlanError>>
where
    S: Stream<Item = Result<Operation, PlanError>>,
    R: ConflictResolver + Clone + 'static,
{
    operations.then(move |result| {
        // Each async step needs its own handle on the resolver.
        let resolver = resolver.clone();
        async move {
            // Upstream errors are forwarded as-is.
            let operation = result?;
            match operation {
                Operation::Item(ItemOp::Conflict {
                    info, mapping_uid, ..
                }) => {
                    let uid = mapping_uid.resolve().await.map_err(PlanError::from)?;
                    Ok(Operation::Item(resolver.resolve_item(info, uid)))
                }
                Operation::Property(PropertyOp {
                    kind: PropertyOpKind::Conflict { value_a, value_b },
                    property,
                    mapping,
                    mapping_uid,
                    ..
                }) => {
                    let uid = mapping_uid.resolve().await.map_err(PlanError::from)?;
                    Ok(Operation::Property(
                        resolver.resolve_property(property, value_a, value_b, mapping, uid),
                    ))
                }
                // Non-conflict operations pass through unchanged.
                other => Ok(other),
            }
        }
    })
}

#[cfg(test)]
mod test {
    use std::{str::FromStr, sync::Arc};

    use futures_util::stream::StreamExt;
    use tempfile::Builder;

    use crate::{
        CollectionId, ItemKind,
        sync::{
            declare::{OnEmpty, StoragePair, SyncedCollection},
            operation::{CollectionOp, ItemOp, Operation},
            plan::Plan,
            status::Side,
        },
        vdir::VdirStorage,
    };

    /// A pair with no collection mappings must produce an empty plan stream.
    #[tokio::test]
    async fn test_stream_no_mappings() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();

        let storage_a = Arc::new(
            VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        let storage_b = Arc::from(
            VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );

        // This sync would be a no-op, but it's not "wrong".
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone());
        let stream = Plan::new(pair, None).await.unwrap();

        let operations: Vec<_> = stream.collect::<Vec<_>>().await;
        assert_eq!(operations.len(), 0);
    }

    /// A single direct mapping between two empty storages must still emit
    /// a collection operation first.
    #[tokio::test]
    async fn test_stream_simple_mapping() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();

        let storage_a = Arc::new(
            VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        let storage_b = Arc::from(
            VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );

        // This sync has a single direct collection mapping.
        let collection = CollectionId::from_str("test").unwrap();
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_mapping(SyncedCollection::direct(collection));

        let stream = Plan::new(pair, None).await.unwrap();

        let operations: Vec<_> = stream.collect::<Vec<_>>().await;
        assert!(!operations.is_empty());

        // Collection-level operations must precede item-level ones, so the
        // very first operation is a collection operation.
        let first = operations.first().unwrap();
        assert!(matches!(first, Ok(Operation::Collection(_))));
    }

    /// When a collection exists only on side A, the plan must create it on
    /// side B *before* writing any of its items there.
    #[tokio::test]
    async fn test_collection_creation_before_items() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();

        let storage_a = Arc::new(
            VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        let storage_b = Arc::from(
            VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );

        // Create collection on side A with an item
        let collection_id = CollectionId::from_str("test").unwrap();
        let collection_path = dir_a.path().join("test");
        std::fs::create_dir(&collection_path).unwrap();

        // Add an item to the collection
        std::fs::write(
            collection_path.join("item.ics"),
            [
                "BEGIN:VCALENDAR",
                "VERSION:2.0",
                "BEGIN:VEVENT",
                "UID:test-item",
                "DTSTART:20240101T120000Z",
                "END:VEVENT",
                "END:VCALENDAR",
                "",
            ]
            .join("\r\n"),
        )
        .unwrap();

        // Collection doesn't exist on side B, so it needs to be created
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_mapping(SyncedCollection::direct(collection_id));

        let stream = Plan::new(pair, None).await.unwrap();
        let operations: Vec<_> = stream.collect::<Vec<_>>().await;

        // Find positions of collection creation and item operations
        let mut collection_create_pos = None;
        let mut first_item_pos = None;

        for (i, op_result) in operations.iter().enumerate() {
            if let Ok(op) = op_result {
                match op {
                    Operation::Collection(
                        CollectionOp::CreateInOne { .. } | CollectionOp::CreateInBoth { .. },
                    ) => {
                        // `replace` doubles as a uniqueness check: a second
                        // creation event would return `Some` and fail here.
                        assert!(
                            collection_create_pos.replace(i).is_none(),
                            "more than one collection creation event"
                        );
                    }
                    Operation::Item(ItemOp::Write(w)) if w.target_side == Side::B => {
                        assert!(
                            first_item_pos.replace(i).is_none(),
                            "more than one item write event"
                        );
                    }
                    _ => {}
                }
            }
        }

        // Both events must be present, and creation must come first.
        let coll_pos = collection_create_pos.unwrap();
        let item_pos = first_item_pos.unwrap();
        assert!(coll_pos < item_pos);
    }

    /// When a previously-synced collection disappears from one side, the
    /// plan must delete its items from the other side *before* deleting the
    /// collection itself.
    #[tokio::test]
    async fn test_collection_deletion_after_items() {
        use crate::sync::status::StatusDatabase;

        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
        let db_path = Builder::new().prefix("vstorage").tempdir().unwrap();

        let storage_a = Arc::new(
            VdirStorage::builder(dir_a.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        let storage_b = Arc::from(
            VdirStorage::builder(dir_b.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );

        // Seed side B with a collection containing one item.
        let collection_id = CollectionId::from_str("test").unwrap();
        let collection_path = dir_b.path().join("test");
        std::fs::create_dir(&collection_path).unwrap();

        std::fs::write(
            collection_path.join("item.ics"),
            [
                "BEGIN:VCALENDAR",
                "VERSION:2.0",
                "BEGIN:VEVENT",
                "UID:test-item",
                "DTSTART:20240101T120000Z",
                "END:VEVENT",
                "END:VCALENDAR",
                "",
            ]
            .join("\r\n"),
        )
        .unwrap();

        // Create status database and record the collection + item as previously synced.
        let status_path = db_path.path().join("status.db");
        let status = Arc::new(StatusDatabase::open_or_create(&status_path).unwrap());

        // Set up the pair with the collection mapping.
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_mapping(SyncedCollection::direct(collection_id.clone()))
            .on_empty(OnEmpty::Sync);

        // First sync: establish the collection and item in status database
        {
            use crate::sync::execute::Executor;
            let stream = Plan::new(pair.clone(), Some(status.clone())).await.unwrap();
            let operations: Vec<_> = stream.collect::<Vec<_>>().await;

            Executor::new(|_| {})
                .execute_stream(
                    storage_a.clone(),
                    storage_b.clone(),
                    futures_util::stream::iter(operations),
                    &status,
                )
                .await
                .unwrap()
                .unwrap();
        }

        // Simulate out-of-band deletion from side B only.
        std::fs::remove_dir_all(&collection_path).unwrap();

        // Second sync: should delete items first, then collection.
        let stream = Plan::new(pair, Some(status)).await.unwrap();
        let operations: Vec<_> = stream.collect::<Vec<_>>().await;

        let mut first_item_delete_pos = None;
        let mut collection_delete_pos = None;

        // Record only the first occurrence of each event kind.
        for (i, op_result) in operations.iter().enumerate() {
            if let Ok(op) = op_result {
                match op {
                    Operation::Item(ItemOp::Delete(d)) if d.side == Side::A => {
                        if first_item_delete_pos.is_none() {
                            first_item_delete_pos = Some(i);
                        }
                    }
                    Operation::Collection(CollectionOp::Delete { .. }) => {
                        if collection_delete_pos.is_none() {
                            collection_delete_pos = Some(i);
                        }
                    }
                    _ => {}
                }
            }
        }

        let item_pos = first_item_delete_pos.unwrap();
        let coll_pos = collection_delete_pos.unwrap();
        assert!(
            item_pos < coll_pos,
            "Item deletion (pos {item_pos}) must come before collection deletion (pos {coll_pos})"
        );
    }
}