//! Handling of change notifications for a client storage provider.

use std::collections::HashSet;

use super::Result;

use sos_sdk::{
    commit::CommitRelationship,
    events::{ChangeAction, ChangeEvent, ChangeNotification},
    vault::VaultRef,
};

use crate::client::provider::StorageProvider;

/// Apply a change notification to the local storage provider.
///
/// Every event carried by the notification is first mapped to a
/// [`ChangeAction`]: vault creation becomes `Create`, vault deletion
/// becomes `Remove` and any other event becomes `Pull`. The
/// de-duplicated actions are then applied to `provider` and finally
/// handed back to the caller.
///
/// # Errors
///
/// Fails with `sos_sdk::Error::NoRootCommit` when a pull is requested
/// for a vault for which the provider has no commit tree, and
/// propagates any error reported by the provider while reading the
/// tree head, computing the status, pulling, or updating the local
/// cache.
pub async fn handle_change(
    provider: &mut (impl StorageProvider + Send + Sync + 'static),
    change: ChangeNotification,
) -> Result<HashSet<ChangeAction>> {
    let vault_id = *change.vault_id();

    // Translate the events into the set of actions to perform
    let pending: HashSet<ChangeAction> = change
        .changes()
        .iter()
        .map(|event| match event {
            ChangeEvent::CreateVault(summary) => {
                ChangeAction::Create(summary.clone())
            }
            ChangeEvent::DeleteVault => ChangeAction::Remove(vault_id),
            _ => ChangeAction::Pull(vault_id),
        })
        .collect();

    // NOTE(review): a `HashSet` has no defined iteration order, so
    // when one notification yields several actions the order in
    // which they are applied below is not fixed.
    for action in &pending {
        // Looked up on every pass rather than once up front; an
        // earlier action may have added or removed the cache entry.
        let existing = provider
            .state()
            .find_vault(&VaultRef::Id(vault_id))
            .cloned();

        match (&existing, action) {
            (Some(summary), ChangeAction::Pull(_)) => {
                let commits = provider
                    .commit_tree(summary)
                    .ok_or(sos_sdk::Error::NoRootCommit)?;

                let local_head = commits.head()?;

                tracing::debug!(
                    vault_id = ?summary.id(),
                    change_root = ?change.proof().root_hex(),
                    root = ?local_head.root_hex(),
                    "handle_change");

                // Roots already agree so there is nothing to fetch
                if change.proof().root() == local_head.root() {
                    continue;
                }

                // The change appears to come from elsewhere; decide
                // whether (and how) to pull from the server
                let (status, _) = provider.status(summary).await?;

                let force = match status {
                    CommitRelationship::Behind(_, _) => Some(false),
                    // Diverged trees are only overwritten when the
                    // other side reported an update of the whole
                    // vault, in which case a force pull is needed
                    CommitRelationship::Diverged(_)
                        if change
                            .changes()
                            .iter()
                            .any(|ev| *ev == ChangeEvent::UpdateVault) =>
                    {
                        Some(true)
                    }
                    _ => None,
                };

                if let Some(force) = force {
                    provider.pull(summary, force).await?;
                }
            }
            (Some(summary), ChangeAction::Remove(_)) => {
                provider.remove_local_cache(summary)?;
            }
            // Only create the local cache when the vault is not
            // already known to the provider
            (None, ChangeAction::Create(created)) => {
                provider.add_local_cache(created.clone()).await?;
            }
            _ => {}
        }
    }

    Ok(pending)
}