Skip to main content

orca_control/store/
restore.rs

1//! Snapshot restore logic for `ClusterStore`.
2
3use std::collections::HashMap;
4
5use tracing::debug;
6
7use super::kv::{ASSIGNMENTS, ClusterStore, NODES, SERVICES};
8use super::types::{Assignment, RaftSnapshot};
9
10impl ClusterStore {
11    /// Restore the store from a snapshot, replacing all data atomically.
12    pub fn restore_from_snapshot(&self, snap: &RaftSnapshot) -> anyhow::Result<()> {
13        let tx = self.db.begin_write()?;
14        let mut nt = tx.open_table(NODES)?;
15        while nt.pop_last()?.is_some() {}
16        for (id, n) in &snap.nodes {
17            nt.insert(*id, serde_json::to_vec(n)?.as_slice())?;
18        }
19        drop(nt);
20        let mut st = tx.open_table(SERVICES)?;
21        while st.pop_last()?.is_some() {}
22        for (name, c) in &snap.services {
23            st.insert(name.as_str(), serde_json::to_vec(c)?.as_slice())?;
24        }
25        drop(st);
26        let mut at = tx.open_table(ASSIGNMENTS)?;
27        while at.pop_last()?.is_some() {}
28        let mut by_svc: HashMap<&str, Vec<&Assignment>> = HashMap::new();
29        for a in &snap.assignments {
30            by_svc.entry(&a.service).or_default().push(a);
31        }
32        for (s, v) in &by_svc {
33            at.insert(*s, serde_json::to_vec(&v)?.as_slice())?;
34        }
35        drop(at);
36        tx.commit()?;
37        debug!("Restored store from snapshot");
38        Ok(())
39    }
40}