1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
use std::collections::HashSet;
use super::Result;
use sos_sdk::{
commit::CommitRelationship,
events::{ChangeAction, ChangeEvent, ChangeNotification},
vault::VaultRef,
};
use crate::client::provider::StorageProvider;
/// Respond to a change notification.
///
/// Every event in the notification is first translated into a
/// [`ChangeAction`]: a vault creation becomes `Create`, a vault
/// deletion becomes `Remove` and any other event becomes a `Pull`
/// for the vault identified by the notification. Duplicate actions
/// are collapsed because they are gathered into a set.
///
/// The actions are then applied to `provider`:
///
/// * `Pull`: if the vault is known locally and the root of its head
///   commit differs from the root in the notification's proof, the
///   status is queried; when the local tree is behind a normal pull
///   is performed, when the trees have diverged a forced pull is
///   performed only if the notification contains an `UpdateVault`
///   event.
/// * `Remove`: if the vault is known locally its local cache is
///   removed.
/// * `Create`: if the vault is not known locally it is added to
///   the local cache.
///
/// Any other combination of action and local state is ignored.
///
/// Returns the set of actions derived from the notification.
///
/// # Errors
///
/// Returns `NoRootCommit` when the provider has no commit tree for
/// a vault that should be pulled, and propagates any error produced
/// by the provider operations.
pub async fn handle_change(
    provider: &mut (impl StorageProvider + Send + Sync + 'static),
    change: ChangeNotification,
) -> Result<HashSet<ChangeAction>> {
    // Translate the events into a de-duplicated set of actions
    let actions: HashSet<ChangeAction> = change
        .changes()
        .iter()
        .map(|event| match event {
            ChangeEvent::CreateVault(summary) => {
                ChangeAction::Create(summary.clone())
            }
            ChangeEvent::DeleteVault => {
                ChangeAction::Remove(*change.vault_id())
            }
            _ => ChangeAction::Pull(*change.vault_id()),
        })
        .collect();

    // Apply each action against the current local state
    for action in &actions {
        // The lookup is repeated for every action and the result is
        // cloned so that no borrow of `provider` is held while the
        // action runs
        let local = provider
            .state()
            .find_vault(&VaultRef::Id(*change.vault_id()))
            .cloned();

        match (local.as_ref(), action) {
            (Some(summary), ChangeAction::Pull(_)) => {
                let commit_tree = provider
                    .commit_tree(summary)
                    .ok_or(sos_sdk::Error::NoRootCommit)?;
                let head = commit_tree.head()?;

                tracing::debug!(
                    vault_id = ?summary.id(),
                    change_root = ?change.proof().root_hex(),
                    root = ?head.root_hex(),
                    "handle_change");

                // Differing roots suggest the change was made
                // elsewhere so we should try to sync with the server
                let roots_differ = change.proof().root() != head.root();
                if roots_differ {
                    let (relationship, _) = provider.status(summary).await?;

                    // `Some(force)` when a pull is needed, `None` otherwise
                    let force = match relationship {
                        CommitRelationship::Behind(_, _) => Some(false),
                        CommitRelationship::Diverged(_) => {
                            // A force pull is only used to stay in
                            // sync when the other node reported an
                            // update to the entire vault
                            let vault_updated = change
                                .changes()
                                .iter()
                                .any(|event| *event == ChangeEvent::UpdateVault);
                            if vault_updated {
                                Some(true)
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(force) = force {
                        provider.pull(summary, force).await?;
                    }
                }
            }
            (Some(summary), ChangeAction::Remove(_)) => {
                provider.remove_local_cache(summary)?;
            }
            (None, ChangeAction::Create(summary)) => {
                provider.add_local_cache(summary.clone()).await?;
            }
            _ => {}
        }
    }

    Ok(actions)
}