vstorage 0.8.1

Common API for various icalendar/vcard storages.
Documentation
// Copyright 2023-2026 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: EUPL-1.2

//! Collection operation generation.

use std::collections::{HashMap, hash_map};
use std::sync::Arc;

use log::warn;

use crate::sync::items::SideState;
use crate::sync::mapping::ResolvedMapping;
use crate::sync::{
    declare::{OnDelete, OnEmpty, StoragePair},
    mode::Mode,
    operation::{CollectionOp, MappingUidSource, Operation, PropertyOp},
    ordering::{DeletionBarrier, completion_pair},
    status::{MappingUid, Side, StatusDatabase},
};

use super::{
    PlanError, items::items_for_collection, property::load_and_create_property_operations,
};

/// State for a single collection's operation generation.
///
/// Each field acts as implicit phase state: once consumed (via `Option::take()` or iterator
/// exhaustion), subsequent calls to [`Self::next_operation`] fall through to later phases.
///
/// [`Self::next_operation`] walks the phases in this order: collection create/save, items,
/// properties, sync tokens, collection deletion.
pub(super) struct CollectionGenerator {
    // Sync mode; consulted for every item decision and handed to property planning.
    mode: Arc<dyn Mode>,
    // Collection operation state.
    //
    // `collection_op` is the create/save operation yielded before anything else (if any).
    collection_op: Option<CollectionOp>,
    // Deletion of the collection, yielded after every other phase. While this is still
    // present, property operations are skipped.
    collection_deletion: Option<CollectionOp>,
    // Where operations obtain the mapping UID from; cloned into item and sync-token ops.
    mapping_uid_source: MappingUidSource,
    // Present when the collection is being deleted or when a new sync token must be stored.
    // Passed to item/property planning; sync-token ops take a wait handle from it.
    deletion_barrier: Option<DeletionBarrier>,
    // Item data (buffered, algorithmically required).
    //
    // Keyed by item UID; the value holds the state on side A and on side B respectively.
    // Both are `None` for UIDs known only to the status database.
    items_by_uid: hash_map::IntoIter<String, (Option<SideState>, Option<SideState>)>,
    // Property data (lazily loaded).
    //
    // `None` until the property phase is first reached.
    properties: Option<std::vec::IntoIter<PropertyOp>>,
    // Mapping for this collection.
    pub(super) mapping: Arc<ResolvedMapping>,
    // Sync tokens from incremental sync.
    //
    // Each is consumed when its `StoreSyncToken` operation is yielded.
    pub(super) new_sync_token_a: Option<String>,
    pub(super) new_sync_token_b: Option<String>,
}

impl CollectionGenerator {
    /// Build the generator for one collection, loading everything planning needs up front.
    ///
    /// The items of both sides are listed concurrently, then merged by UID together with the
    /// UIDs recorded in the status database (when one is available and already knows this
    /// mapping).
    ///
    /// Returns `Ok(None)` when the collection must be left alone: the status database has
    /// entries for it, `pair.on_empty` is [`OnEmpty::Skip`], and exactly one side has no
    /// items. A warning is logged in that case.
    ///
    /// # Errors
    ///
    /// Fails if the status database cannot be queried or if listing either side fails.
    #[allow(clippy::too_many_lines)]
    pub(super) async fn new(
        pair: &StoragePair,
        mapping: Arc<ResolvedMapping>,
        status: Option<&StatusDatabase>,
    ) -> Result<Option<CollectionGenerator>, PlanError> {
        // Without a status database there is no previously known mapping.
        let mapping_uid = match status {
            Some(db) => db.get_mapping_uid(mapping.a().href(), mapping.b().href())?,
            None => None,
        };

        let (listing_a, listing_b) = tokio::try_join!(
            items_for_collection(
                status,
                pair.storage_a().as_ref(),
                mapping.a().href(),
                Side::A,
                mapping_uid
            ),
            items_for_collection(
                status,
                pair.storage_b().as_ref(),
                mapping.b().href(),
                Side::B,
                mapping_uid
            ),
        )?;

        let new_sync_token_a = listing_a.new_sync_token;
        let new_sync_token_b = listing_b.new_sync_token;
        let items_a = listing_a.items;
        let items_b = listing_b.items;

        let status_uids = if let (Some(db), Some(known_mapping)) = (status, mapping_uid) {
            let known = db.all_uids(known_mapping)?;
            if !known.is_empty() && pair.on_empty == OnEmpty::Skip {
                // Previously synced items exist and emptying must not be propagated: bail
                // out when exactly one side has lost all of its items.
                match (items_a.is_empty(), items_b.is_empty()) {
                    (true, false) => {
                        warn!(
                            "Collection {} has been emptied on storage a.",
                            mapping.alias
                        );
                        return Ok(None);
                    }
                    (false, true) => {
                        warn!(
                            "Collection {} has been emptied on storage b.",
                            mapping.alias
                        );
                        return Ok(None);
                    }
                    _ => {}
                }
            }
            known
        } else {
            Vec::new()
        };

        let plan = determine_collection_operation(
            mapping.a.exists,
            mapping.b.exists,
            mapping_uid,
            pair.on_delete,
            Arc::clone(&mapping),
        );

        // Sync tokens may only be stored once every other operation of this collection is
        // done, so a barrier is required whenever a token is present.
        let deletion_barrier = plan.deletion_barrier.or_else(|| {
            (new_sync_token_a.is_some() || new_sync_token_b.is_some()).then(DeletionBarrier::new)
        });

        // Merging into a map keyed by UID deduplicates across both sides and status.
        let mut merged: HashMap<String, (Option<SideState>, Option<SideState>)> = HashMap::new();
        for state in items_a {
            let slot = merged.entry(state.state().uid.clone()).or_default();
            slot.0 = Some(state);
        }
        for state in items_b {
            let slot = merged.entry(state.state().uid.clone()).or_default();
            slot.1 = Some(state);
        }
        for uid in status_uids {
            merged.entry(uid).or_default();
        }

        Ok(Some(CollectionGenerator {
            mode: pair.mode.clone(),
            collection_op: plan.op,
            collection_deletion: plan.deletion,
            mapping_uid_source: plan.mapping_uid_source,
            deletion_barrier,
            items_by_uid: merged.into_iter(),
            properties: None,
            mapping,
            new_sync_token_a,
            new_sync_token_b,
        }))
    }

    /// Whether the property phase must be skipped.
    ///
    /// This is the case while a collection deletion is still pending.
    fn skip_properties(&self) -> bool {
        self.collection_deletion.is_some()
    }
}

/// Decision about what operations are needed for a collection.
///
/// Produced by [`determine_collection_operation`] and unpacked by
/// [`CollectionGenerator::new`].
struct CollectionPlan {
    /// Create/save operation to run before any item operation.
    ///
    /// None if no-op or in case of deletion.
    op: Option<CollectionOp>,
    /// Operation deleting the collection from the side where it still exists.
    ///
    /// None unless this is a deletion scenario.
    deletion: Option<CollectionOp>,
    /// Barrier for coordinating deletion with item/property operations.
    /// None unless this is a deletion scenario.
    ///
    /// When present, `deletion` holds a wait handle obtained from this barrier.
    deletion_barrier: Option<DeletionBarrier>,
    /// Source of mapping UID for items in this collection.
    ///
    /// `Immediate` when the plan reuses the mapping UID from the status database; `Deferred`
    /// when `op` has to complete first.
    mapping_uid_source: MappingUidSource,
}

/// Determine the collection operation plan based on current state.
#[allow(clippy::too_many_lines)]
fn determine_collection_operation(
    a_exists: bool,
    b_exists: bool,
    mapping_uid: Option<MappingUid>,
    on_delete: OnDelete,
    mapping: Arc<ResolvedMapping>,
) -> CollectionPlan {
    match (a_exists, b_exists, mapping_uid) {
        // Both missing:  create on both sides.
        (false, false, _) => {
            let (completion, wait) = completion_pair();
            CollectionPlan {
                op: Some(CollectionOp::CreateInBoth {
                    mapping,
                    completion,
                }),
                deletion: None,
                deletion_barrier: None,
                mapping_uid_source: MappingUidSource::Deferred(wait),
            }
        }
        // Both exist and are new: save to status.
        (true, true, None) => {
            let (completion, wait) = completion_pair();
            CollectionPlan {
                op: Some(CollectionOp::SaveToStatus {
                    mapping,
                    completion,
                }),
                deletion: None,
                deletion_barrier: None,
                mapping_uid_source: MappingUidSource::Deferred(wait),
            }
        }
        // Both exist, already in status: noop.
        (true, true, Some(m)) => CollectionPlan {
            op: None,
            deletion: None,
            deletion_barrier: None,
            mapping_uid_source: MappingUidSource::Immediate(m),
        },

        // Deleted from A (exists in B): delete from B.
        (false, true, Some(m)) => {
            if on_delete == OnDelete::Skip {
                CollectionPlan {
                    op: None,
                    deletion: None,
                    deletion_barrier: None,
                    mapping_uid_source: MappingUidSource::Immediate(m),
                }
            } else {
                let barrier = DeletionBarrier::new();
                let wait_handle = barrier.wait_handle();

                CollectionPlan {
                    op: None,
                    deletion: Some(CollectionOp::Delete {
                        mapping,
                        mapping_uid: m,
                        side: Side::B,
                        wait_for_items: wait_handle,
                    }),
                    deletion_barrier: Some(barrier),
                    mapping_uid_source: MappingUidSource::Immediate(m),
                }
            }
        }
        // New in B only: create in A.
        (false, true, None) => {
            let (completion, wait) = completion_pair();
            CollectionPlan {
                op: Some(CollectionOp::CreateInOne {
                    mapping,
                    side: Side::A,
                    completion,
                }),
                deletion: None,
                deletion_barrier: None,
                mapping_uid_source: MappingUidSource::Deferred(wait),
            }
        }
        // New in A only: create in B.
        (true, false, None) => {
            let (completion, wait) = completion_pair();
            CollectionPlan {
                op: Some(CollectionOp::CreateInOne {
                    mapping,
                    side: Side::B,
                    completion,
                }),
                deletion: None,
                deletion_barrier: None,
                mapping_uid_source: MappingUidSource::Deferred(wait),
            }
        }
        // Deleted from B (exists in A): delete from A.
        (true, false, Some(m)) => {
            if on_delete == OnDelete::Skip {
                CollectionPlan {
                    op: None,
                    deletion: None,
                    deletion_barrier: None,
                    mapping_uid_source: MappingUidSource::Immediate(m),
                }
            } else {
                let barrier = DeletionBarrier::new();
                let wait_handle = barrier.wait_handle();

                CollectionPlan {
                    op: None,
                    deletion: Some(CollectionOp::Delete {
                        mapping,
                        mapping_uid: m,
                        side: Side::A,
                        wait_for_items: wait_handle,
                    }),
                    deletion_barrier: Some(barrier),
                    mapping_uid_source: MappingUidSource::Immediate(m),
                }
            }
        }
    }
}

impl CollectionGenerator {
    /// Generate the next operation from a collection's state.
    ///
    /// Returns `Some(operation)` if there's a next operation for this collection,
    /// or `None` if the collection is exhausted.
    ///
    /// Each phase's data is consumed as it is yielded (`Option::take()`, iterator exhaustion),
    /// so subsequent calls naturally fall through to the next phase.
    pub(super) async fn next_operation(
        &mut self,
        pair: &StoragePair,
        status: Option<&StatusDatabase>,
    ) -> Option<Result<Operation, PlanError>> {
        // Collection creation/save op.
        if let Some(op) = self.collection_op.take() {
            return Some(Ok(Operation::Collection(op)));
        }

        // Item ops (one per call, iterator resumes where it left off).
        for (uid, (side_a, side_b)) in self.items_by_uid.by_ref() {
            let previous = match (status, self.mapping_uid_source.immediate()) {
                (Some(s), Some(m)) => match s.get_item_hash_by_uid(m, &uid) {
                    Ok(prev) => prev,
                    Err(e) => return Some(Err(e.into())),
                },
                _ => None,
            };

            if let Some(item_op) = self.mode.decide_item_action(
                side_a,
                side_b,
                previous,
                &self.mapping,
                self.mapping_uid_source.clone(),
                self.deletion_barrier.as_ref(),
            ) {
                return Some(Ok(Operation::Item(item_op)));
            }
        }

        // Property ops (lazily loaded on first entry, then iterated).
        if !self.skip_properties() {
            if self.properties.is_none() {
                match load_and_create_property_operations(
                    &self.mapping,
                    self.mapping_uid_source.immediate(),
                    pair,
                    status,
                    self.deletion_barrier.as_ref(),
                    &*self.mode,
                )
                .await
                {
                    Ok(props) => self.properties = Some(props.into_iter()),
                    Err(e) => return Some(Err(e)),
                }
            }

            if let Some(op) = self.properties.as_mut().and_then(Iterator::next) {
                return Some(Ok(Operation::Property(op)));
            }
        }

        // Sync token ops (barrier is guaranteed to exist when tokens are present).
        if let Some(barrier) = self.deletion_barrier.as_ref() {
            if let Some(token) = self.new_sync_token_a.take() {
                return Some(Ok(Operation::Collection(CollectionOp::StoreSyncToken {
                    mapping_uid: self.mapping_uid_source.clone(),
                    side: Side::A,
                    token,
                    wait_for_items: barrier.wait_handle(),
                })));
            }
            if let Some(token) = self.new_sync_token_b.take() {
                return Some(Ok(Operation::Collection(CollectionOp::StoreSyncToken {
                    mapping_uid: self.mapping_uid_source.clone(),
                    side: Side::B,
                    token,
                    wait_for_items: barrier.wait_handle(),
                })));
            }
        }

        // Collection deletion.
        if let Some(op) = self.collection_deletion.take() {
            return Some(Ok(Operation::Collection(op)));
        }

        None
    }
}