vstorage 0.7.0

Common API for various icalendar/vcard storages.
Documentation
// Copyright 2023-2026 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: EUPL-1.2

//! Item fetching logic.

use std::collections::HashSet;

use log::debug;

use crate::{
    ErrorKind, Href,
    base::{CollectionChanges, Storage},
    sync::{
        analysis::SideState,
        status::{ItemState, MappingUid, Side, StatusDatabase},
    },
};

use super::PlanError;

/// Result of fetching items for a collection.
#[derive(Default)]
pub(crate) struct ItemsForCollectionResult {
    /// Items found on this side.
    pub items: Vec<SideState>,
    /// New sync token if incremental sync is available.
    pub new_sync_token: Option<String>,
}

// Fetching ===================================================================

/// Returns the state of all items for a collection as [`SideState`].
///
/// Loads item state from storage, using the status database to optimize fetching
/// by skipping items that haven't changed according to their etags.
///
/// For unchanged items (etag matches status), `data` shall be `None`.
/// For changed or new items, `data` shall be `Some(item)`.
///
/// Uses [`Storage::changed_since`] if available.
// XXX: Items missing in the results set are interpreted as "deleted".
// XXX: With sync-token: we need to manually include unchanged items from the status.
// XXX: Without sync-token: unchanged items are naturally listed.
pub(crate) async fn items_for_collection(
    status: Option<&StatusDatabase>,
    storage: &dyn Storage,
    collection: &Href,
    side: Side,
    mapping_uid: Option<MappingUid>,
) -> Result<ItemsForCollectionResult, PlanError> {
    debug!("Resolving state for collection: {collection}.");

    let stored_token = match (status, mapping_uid) {
        (Some(s), Some(m)) => s.get_sync_token(m, side)?,
        _ => None,
    };

    match storage
        .changed_since(collection, stored_token.as_deref())
        .await
    {
        Ok(changes) => {
            debug!(
                "changed_since: {} changed, {} deleted (bootstrap={})",
                changes.changed.len(),
                changes.deleted.len(),
                stored_token.is_none()
            );
            return items_via_synctoken(status, storage, mapping_uid, side, changes).await;
        }
        Err(err) if err.kind == ErrorKind::Unsupported => {
            debug!("Storage does not support changed_since, using list_items.");
        }
        Err(err) if err.kind == ErrorKind::Unavailable => {
            debug!("Server does not support sync tokens, using list_items.");
        }
        Err(err) => return Err(err.into()),
    }

    // Fallback: sync token not supported.
    items_via_list(status, storage, collection, side, mapping_uid).await
}

/// Fetch items using standard [`Storage::list_items`] + `ETag` comparison.
async fn items_via_list(
    status: Option<&StatusDatabase>,
    storage: &dyn Storage,
    collection: &Href,
    side: Side,
    mapping_uid: Option<MappingUid>,
) -> Result<ItemsForCollectionResult, PlanError> {
    let mut result = ItemsForCollectionResult::default();

    let prefetched = if let Some(status) = status {
        let mut to_prefetch = Vec::new();

        let listed_items = match storage.list_items(collection).await {
            Ok(i) => i,
            Err(err) if err.kind == ErrorKind::DoesNotExist => {
                return Ok(result);
            }
            Err(err) => return Err(err.into()),
        };

        for item_ver in listed_items {
            if let Some(m) = mapping_uid
                && let Some(from_status) = status.get_item_by_href(side, &item_ver.href, m)?
                && from_status.etag(side) == &item_ver.etag
            {
                // Unchanged - use hash from status, no data
                let version = match side {
                    Side::A => &from_status.a,
                    Side::B => &from_status.b,
                };
                result.items.push(SideState::unchanged(ItemState {
                    version: version.clone(),
                    uid: from_status.uid.clone(),
                    hash: from_status.hash.clone(),
                }));
                continue;
            }
            to_prefetch.push(item_ver.href);
        }

        let to_prefetch = to_prefetch.iter().map(String::as_str).collect::<Vec<_>>();
        storage.get_many_items(&to_prefetch).await?
    } else {
        match storage.get_all_items(collection).await {
            Ok(i) => i,
            Err(err) if err.kind == ErrorKind::DoesNotExist => Vec::with_capacity(0),
            Err(err) => return Err(PlanError::from(err)),
        }
    };

    let prefetched = prefetched.into_iter().map(SideState::from_fetched);
    result.items.extend(prefetched);

    Ok(result)
}

/// Fetch items using incremental sync via `changed_since`.
async fn items_via_synctoken(
    status: Option<&StatusDatabase>,
    storage: &dyn Storage,
    mapping_uid: Option<MappingUid>,
    side: Side,
    changes: CollectionChanges,
) -> Result<ItemsForCollectionResult, PlanError> {
    let mut result = ItemsForCollectionResult {
        items: Vec::new(),
        new_sync_token: changes.new_state,
    };

    if !changes.changed.is_empty() {
        let hrefs: Vec<&str> = changes.changed.iter().map(Href::as_str).collect();
        let fetched = storage.get_many_items(&hrefs).await?;
        result
            .items
            .extend(fetched.into_iter().map(SideState::from_fetched));
    }
    // Deleted items are omitted from the return value.

    // Items not changed (nor deleted) need to be in the result set.
    if let (Some(status), Some(mapping_uid)) = (status, mapping_uid) {
        let changed_or_deleted: HashSet<&str> = changes
            .deleted
            .iter()
            .chain(changes.changed.iter())
            .map(Href::as_str)
            .collect();

        for item_state in status.get_all_items(mapping_uid, side)? {
            if !changed_or_deleted.contains(item_state.version.href.as_str()) {
                result
                    .items
                    .push(SideState::Unchanged { state: item_state });
            }
        }
    }

    Ok(result)
}