Skip to main content

orca_control/store/
kv.rs

1//! Key-value cluster store backed by redb.
2
3use std::collections::HashMap;
4use std::path::Path;
5
6use redb::{Database, ReadableTable, TableDefinition};
7use tracing::debug;
8
9use orca_core::config::ServiceConfig;
10use orca_core::types::NodeStatus;
11
12use super::types::{Assignment, NodeEntry, RaftEntry, RaftSnapshot};
13
14pub(super) const NODES: TableDefinition<u64, &[u8]> = TableDefinition::new("nodes");
15pub(super) const SERVICES: TableDefinition<&str, &[u8]> = TableDefinition::new("services");
16pub(super) const ASSIGNMENTS: TableDefinition<&str, &[u8]> = TableDefinition::new("assignments");
17
18/// Persistent cluster state store.
19pub struct ClusterStore {
20    pub(super) db: Database,
21}
22
23impl ClusterStore {
24    /// Open or create a store at the given path.
25    pub fn open(path: &Path) -> anyhow::Result<Self> {
26        let db = Database::create(path)?;
27        // Ensure tables exist
28        let tx = db.begin_write()?;
29        {
30            let _ = tx.open_table(NODES)?;
31            let _ = tx.open_table(SERVICES)?;
32            let _ = tx.open_table(ASSIGNMENTS)?;
33        }
34        tx.commit()?;
35        Ok(Self { db })
36    }
37
38    /// Apply a Raft log entry to the store.
39    pub fn apply(&self, entry: &RaftEntry) -> anyhow::Result<()> {
40        match entry {
41            RaftEntry::RegisterNode {
42                node_id,
43                address,
44                labels,
45            } => {
46                let node = NodeEntry {
47                    node_id: *node_id,
48                    address: address.clone(),
49                    labels: labels.clone(),
50                    status: NodeStatus::Ready,
51                    resources: None,
52                };
53                self.set_node(*node_id, &node)?;
54                debug!("Registered node {node_id} at {address}");
55            }
56            RaftEntry::DeregisterNode { node_id } => {
57                self.remove_node(*node_id)?;
58                debug!("Deregistered node {node_id}");
59            }
60            RaftEntry::SetService(config) => {
61                self.set_service(&config.name, config)?;
62                debug!("Set service {}", config.name);
63            }
64            RaftEntry::RemoveService(name) => {
65                self.remove_service(name)?;
66                debug!("Removed service {name}");
67            }
68            RaftEntry::AssignWorkload {
69                service,
70                replica_idx,
71                node_id,
72            } => {
73                let mut assignments = self.get_assignments(service)?;
74                assignments.push(Assignment {
75                    service: service.clone(),
76                    replica_idx: *replica_idx,
77                    node_id: *node_id,
78                });
79                self.set_assignments(service, &assignments)?;
80            }
81            RaftEntry::UnassignWorkload {
82                service,
83                replica_idx,
84                node_id,
85            } => {
86                let mut assignments = self.get_assignments(service)?;
87                assignments.retain(|a| !(a.replica_idx == *replica_idx && a.node_id == *node_id));
88                self.set_assignments(service, &assignments)?;
89            }
90            RaftEntry::UpdateNodeStatus {
91                node_id,
92                status,
93                resources,
94            } => {
95                if let Ok(Some(mut node)) = self.get_node(*node_id) {
96                    node.status = *status;
97                    node.resources = Some(resources.clone());
98                    self.set_node(*node_id, &node)?;
99                }
100            }
101        }
102        Ok(())
103    }
104
105    /// Get a node by ID.
106    pub fn get_node(&self, id: u64) -> anyhow::Result<Option<NodeEntry>> {
107        let tx = self.db.begin_read()?;
108        let table = tx.open_table(NODES)?;
109        match table.get(id)? {
110            Some(val) => Ok(Some(serde_json::from_slice(val.value())?)),
111            None => Ok(None),
112        }
113    }
114
115    fn set_node(&self, id: u64, node: &NodeEntry) -> anyhow::Result<()> {
116        let data = serde_json::to_vec(node)?;
117        let tx = self.db.begin_write()?;
118        {
119            let mut table = tx.open_table(NODES)?;
120            table.insert(id, data.as_slice())?;
121        }
122        tx.commit()?;
123        Ok(())
124    }
125
126    fn remove_node(&self, id: u64) -> anyhow::Result<()> {
127        let tx = self.db.begin_write()?;
128        {
129            let mut table = tx.open_table(NODES)?;
130            table.remove(id)?;
131        }
132        tx.commit()?;
133        Ok(())
134    }
135
136    /// Get all registered nodes.
137    pub fn get_all_nodes(&self) -> anyhow::Result<HashMap<u64, NodeEntry>> {
138        let tx = self.db.begin_read()?;
139        let table = tx.open_table(NODES)?;
140        let mut nodes = HashMap::new();
141        for entry in table.iter()? {
142            let (k, v) = entry?;
143            let node: NodeEntry = serde_json::from_slice(v.value())?;
144            nodes.insert(k.value(), node);
145        }
146        Ok(nodes)
147    }
148
149    /// Get a service config by name.
150    pub fn get_service(&self, name: &str) -> anyhow::Result<Option<ServiceConfig>> {
151        let tx = self.db.begin_read()?;
152        let table = tx.open_table(SERVICES)?;
153        match table.get(name)? {
154            Some(val) => Ok(Some(serde_json::from_slice(val.value())?)),
155            None => Ok(None),
156        }
157    }
158
159    pub fn set_service(&self, name: &str, config: &ServiceConfig) -> anyhow::Result<()> {
160        let data = serde_json::to_vec(config)?;
161        let tx = self.db.begin_write()?;
162        {
163            let mut table = tx.open_table(SERVICES)?;
164            table.insert(name, data.as_slice())?;
165        }
166        tx.commit()?;
167        Ok(())
168    }
169
170    pub fn remove_service(&self, name: &str) -> anyhow::Result<()> {
171        let tx = self.db.begin_write()?;
172        {
173            let mut table = tx.open_table(SERVICES)?;
174            table.remove(name)?;
175        }
176        tx.commit()?;
177        Ok(())
178    }
179
180    /// Get all service configs.
181    pub fn get_all_services(&self) -> anyhow::Result<HashMap<String, ServiceConfig>> {
182        let tx = self.db.begin_read()?;
183        let table = tx.open_table(SERVICES)?;
184        let mut services = HashMap::new();
185        for entry in table.iter()? {
186            let (k, v) = entry?;
187            let config: ServiceConfig = serde_json::from_slice(v.value())?;
188            services.insert(k.value().to_string(), config);
189        }
190        Ok(services)
191    }
192
193    /// Get assignments for a service.
194    pub fn get_assignments(&self, service: &str) -> anyhow::Result<Vec<Assignment>> {
195        let tx = self.db.begin_read()?;
196        let table = tx.open_table(ASSIGNMENTS)?;
197        match table.get(service)? {
198            Some(val) => Ok(serde_json::from_slice(val.value())?),
199            None => Ok(Vec::new()),
200        }
201    }
202
203    fn set_assignments(&self, service: &str, assignments: &[Assignment]) -> anyhow::Result<()> {
204        let data = serde_json::to_vec(assignments)?;
205        let tx = self.db.begin_write()?;
206        {
207            let mut table = tx.open_table(ASSIGNMENTS)?;
208            table.insert(service, data.as_slice())?;
209        }
210        tx.commit()?;
211        Ok(())
212    }
213
214    /// Take a full snapshot of the store.
215    pub fn snapshot(&self) -> anyhow::Result<RaftSnapshot> {
216        Ok(RaftSnapshot {
217            nodes: self.get_all_nodes()?,
218            services: self.get_all_services()?,
219            assignments: self.get_all_assignments()?,
220        })
221    }
222
223    /// Get all assignments across all services.
224    pub fn get_all_assignments(&self) -> anyhow::Result<Vec<Assignment>> {
225        let tx = self.db.begin_read()?;
226        let table = tx.open_table(ASSIGNMENTS)?;
227        let mut all = Vec::new();
228        for entry in table.iter()? {
229            let (_, v) = entry?;
230            let assignments: Vec<Assignment> = serde_json::from_slice(v.value())?;
231            all.extend(assignments);
232        }
233        Ok(all)
234    }
235}
236
237#[cfg(test)]
238#[path = "kv_tests.rs"]
239mod tests;