vstorage 0.10.0

Common API for various icalendar/vcard storages.
Documentation
// Copyright 2026 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: EUPL-1.2

//! Incremental plan generation from change events.

use std::{pin::Pin, sync::Arc, task::Context, task::Poll};

use futures_util::Stream;
use log::warn;
use tokio::sync::mpsc;

use crate::{
    ErrorKind, Href,
    property::Property,
    sync::{
        Side,
        declare::StoragePair,
        mapping::ResolvedMapping,
        operation::{MappingUidSource, Operation, PropertyOp},
        plan::{
            Plan, PlanError,
            collection::{GenerateError, generate_collection_operations},
            create_mappings_for_pair,
        },
        status::StatusDatabase,
    },
    watch::Event,
};

/// Stream generating synchronisation operations from a set of change events.
///
/// Processes only collections affected by the provided events, and does not check collections which
/// are known to be in sync.
pub struct IncrementalPlan(mpsc::Receiver<Result<Operation, PlanError>>);

impl IncrementalPlan {
    /// Create new incremental plan from a set of events.
    ///
    /// # Errors
    ///
    /// Returns an error if collection discovery or mapping resolution fails.
    pub async fn new(
        pair: StoragePair,
        status: Option<Arc<StatusDatabase>>,
        events: impl IntoIterator<Item = (Event, Side)>,
    ) -> Result<IncrementalPlan, PlanError> {
        let mappings = create_mappings_for_pair(&pair).await?;
        let mut affected = Vec::new();
        let mut properties: Vec<(Arc<ResolvedMapping>, Property)> = Vec::new();
        let mut has_storage_event = false;

        for (event, side) in events {
            match event {
                Event::Storage => {
                    has_storage_event = true;
                    break;
                }
                Event::Collection(col, _) | Event::Item(col, ..) => {
                    if let Some(mapping) = find_mapping(&mappings, side, &col) {
                        push_affected(&mut affected, mapping);
                        // Don't perform separate property-only sync.
                        properties.retain(|(m, _)| {
                            m.for_side(side).href() != mapping.for_side(side).href()
                        });
                    } else {
                        warn!("Event for unknown collection; falling back to full sync.");
                        has_storage_event = true;
                        break;
                    }
                }
                Event::Property(col, property, _) => {
                    if let Some(mapping) = find_mapping(&mappings, side, &col) {
                        // Skip if full-collection sync is pending.
                        if affected
                            .iter()
                            .any(|m| m.for_side(side).href() == mapping.for_side(side).href())
                        {
                            continue;
                        }
                        // Skip if property already pending.
                        if properties.iter().any(|(m, p)| {
                            m.for_side(side).href() == mapping.for_side(side).href()
                                && *p == property
                        }) {
                            continue;
                        }
                        properties.push((mapping.clone(), property));
                    } else {
                        warn!("Property event for unknown collection '{col}'. Ignoring.");
                    }
                }
            }
        }

        if has_storage_event {
            let rx = Plan::receiver(pair, status, mappings)?;
            return Ok(IncrementalPlan(rx));
        }

        let (tx, rx) = mpsc::channel(4);
        tokio::spawn(run_generator(tx, pair, status, affected, properties));

        Ok(IncrementalPlan(rx))
    }
}

/// Find the mapping with the given href.
fn find_mapping<'a>(
    mappings: &'a [Arc<ResolvedMapping>],
    side: Side,
    href: &Href,
) -> Option<&'a Arc<ResolvedMapping>> {
    mappings.iter().find(|m| m.for_side(side).href() == href)
}

/// Register another affected mapping, avoiding duplicates.
fn push_affected(affected: &mut Vec<Arc<ResolvedMapping>>, mapping: &Arc<ResolvedMapping>) {
    // Hint: StoragePair cannot have overlapping mappings.
    if !affected
        .iter()
        .any(|m| m.a().href() == mapping.a().href() || m.b().href() == mapping.b().href())
    {
        affected.push(mapping.clone());
    }
}

/// Background task for [`IncrementalPlan`].
async fn run_generator(
    tx: mpsc::Sender<Result<Operation, PlanError>>,
    pair: StoragePair,
    status: Option<Arc<StatusDatabase>>,
    mappings: Vec<Arc<ResolvedMapping>>,
    properties: Vec<(Arc<ResolvedMapping>, Property)>,
) {
    for mapping in mappings {
        match generate_collection_operations(&tx, &pair, mapping, status.as_deref()).await {
            Ok(()) => {}
            Err(GenerateError::Plan(e)) => {
                if tx.send(Err(e)).await.is_err() {
                    return;
                }
            }
            Err(GenerateError::ChannelClosed) => return,
        }
    }

    for (mapping, property) in properties {
        match generate_single_property_op(&mapping, property, &pair, status.as_deref()).await {
            Ok(Some(op)) => {
                if tx.send(Ok(Operation::Property(op))).await.is_err() {
                    return;
                }
            }
            Ok(None) => {}
            Err(e) => {
                if tx.send(Err(e)).await.is_err() {
                    return;
                }
            }
        }
    }
}

async fn generate_single_property_op(
    mapping: &Arc<ResolvedMapping>,
    property: Property,
    pair: &StoragePair,
    status: Option<&StatusDatabase>,
) -> Result<Option<PropertyOp>, PlanError> {
    let Some(status) = status else {
        warn!("Property sync: unknown collection '{}'.", mapping.alias());
        return Ok(None);
    };
    let Ok(Some(mapping_uid)) = status.get_mapping_uid(mapping.a().href(), mapping.b().href())
    else {
        warn!("Property sync: no mapping_uid for '{}'.", mapping.alias());
        return Ok(None);
    };

    let value_a = match pair
        .storage_a()
        .get_property(mapping.a().href(), property)
        .await
    {
        Ok(v) => v,
        Err(e) if e.kind == ErrorKind::Unsupported || e.kind == ErrorKind::DoesNotExist => {
            return Ok(None);
        }
        Err(e) => return Err(e.into()),
    };
    let value_b = match pair
        .storage_b()
        .get_property(mapping.b().href(), property)
        .await
    {
        Ok(v) => v,
        Err(e) if e.kind == ErrorKind::Unsupported || e.kind == ErrorKind::DoesNotExist => {
            return Ok(None);
        }
        Err(e) => return Err(e.into()),
    };

    let previous = status.get_property(mapping_uid, property)?;

    Ok(pair.mode.decide_property_action(
        property,
        value_a,
        value_b,
        previous,
        mapping,
        &MappingUidSource::Immediate(mapping_uid),
        None,
    ))
}

impl Stream for IncrementalPlan {
    type Item = Result<Operation, PlanError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.0.poll_recv(cx)
    }
}