use std::collections::HashSet;
use log::debug;
use crate::{
ErrorKind, Href,
base::{CollectionChanges, Storage},
sync::{
analysis::SideState,
status::{ItemState, MappingUid, Side, StatusDatabase},
},
};
use super::PlanError;
#[derive(Default)]
pub(crate) struct ItemsForCollectionResult {
pub items: Vec<SideState>,
pub new_sync_token: Option<String>,
}
pub(crate) async fn items_for_collection(
status: Option<&StatusDatabase>,
storage: &dyn Storage,
collection: &Href,
side: Side,
mapping_uid: Option<MappingUid>,
) -> Result<ItemsForCollectionResult, PlanError> {
debug!("Resolving state for collection: {collection}.");
let stored_token = match (status, mapping_uid) {
(Some(s), Some(m)) => s.get_sync_token(m, side)?,
_ => None,
};
match storage
.changed_since(collection, stored_token.as_deref())
.await
{
Ok(changes) => {
debug!(
"changed_since: {} changed, {} deleted (bootstrap={})",
changes.changed.len(),
changes.deleted.len(),
stored_token.is_none()
);
return items_via_synctoken(status, storage, mapping_uid, side, changes).await;
}
Err(err) if err.kind == ErrorKind::Unsupported => {
debug!("Storage does not support changed_since, using list_items.");
}
Err(err) if err.kind == ErrorKind::Unavailable => {
debug!("Server does not support sync tokens, using list_items.");
}
Err(err) => return Err(err.into()),
}
items_via_list(status, storage, collection, side, mapping_uid).await
}
async fn items_via_list(
status: Option<&StatusDatabase>,
storage: &dyn Storage,
collection: &Href,
side: Side,
mapping_uid: Option<MappingUid>,
) -> Result<ItemsForCollectionResult, PlanError> {
let mut result = ItemsForCollectionResult::default();
let prefetched = if let Some(status) = status {
let mut to_prefetch = Vec::new();
let listed_items = match storage.list_items(collection).await {
Ok(i) => i,
Err(err) if err.kind == ErrorKind::DoesNotExist => {
return Ok(result);
}
Err(err) => return Err(err.into()),
};
for item_ver in listed_items {
if let Some(m) = mapping_uid
&& let Some(from_status) = status.get_item_by_href(side, &item_ver.href, m)?
&& from_status.etag(side) == &item_ver.etag
{
let version = match side {
Side::A => &from_status.a,
Side::B => &from_status.b,
};
result.items.push(SideState::unchanged(ItemState {
version: version.clone(),
uid: from_status.uid.clone(),
hash: from_status.hash.clone(),
}));
continue;
}
to_prefetch.push(item_ver.href);
}
let to_prefetch = to_prefetch.iter().map(String::as_str).collect::<Vec<_>>();
storage.get_many_items(&to_prefetch).await?
} else {
match storage.get_all_items(collection).await {
Ok(i) => i,
Err(err) if err.kind == ErrorKind::DoesNotExist => Vec::with_capacity(0),
Err(err) => return Err(PlanError::from(err)),
}
};
let prefetched = prefetched.into_iter().map(SideState::from_fetched);
result.items.extend(prefetched);
Ok(result)
}
async fn items_via_synctoken(
status: Option<&StatusDatabase>,
storage: &dyn Storage,
mapping_uid: Option<MappingUid>,
side: Side,
changes: CollectionChanges,
) -> Result<ItemsForCollectionResult, PlanError> {
let mut result = ItemsForCollectionResult {
items: Vec::new(),
new_sync_token: changes.new_state,
};
if !changes.changed.is_empty() {
let hrefs: Vec<&str> = changes.changed.iter().map(Href::as_str).collect();
let fetched = storage.get_many_items(&hrefs).await?;
result
.items
.extend(fetched.into_iter().map(SideState::from_fetched));
}
if let (Some(status), Some(mapping_uid)) = (status, mapping_uid) {
let changed_or_deleted: HashSet<&str> = changes
.deleted
.iter()
.chain(changes.changed.iter())
.map(Href::as_str)
.collect();
for item_state in status.get_all_items(mapping_uid, side)? {
if !changed_or_deleted.contains(item_state.version.href.as_str()) {
result
.items
.push(SideState::Unchanged { state: item_state });
}
}
}
Ok(result)
}