1use std::collections::HashMap;
4use std::path::Path;
5
6use redb::{Database, ReadableTable, TableDefinition};
7use tracing::debug;
8
9use orca_core::config::ServiceConfig;
10use orca_core::types::NodeStatus;
11
12use super::types::{Assignment, NodeEntry, RaftEntry, RaftSnapshot};
13
14pub(super) const NODES: TableDefinition<u64, &[u8]> = TableDefinition::new("nodes");
15pub(super) const SERVICES: TableDefinition<&str, &[u8]> = TableDefinition::new("services");
16pub(super) const ASSIGNMENTS: TableDefinition<&str, &[u8]> = TableDefinition::new("assignments");
17
18pub struct ClusterStore {
20 pub(super) db: Database,
21}
22
23impl ClusterStore {
24 pub fn open(path: &Path) -> anyhow::Result<Self> {
26 let db = Database::create(path)?;
27 let tx = db.begin_write()?;
29 {
30 let _ = tx.open_table(NODES)?;
31 let _ = tx.open_table(SERVICES)?;
32 let _ = tx.open_table(ASSIGNMENTS)?;
33 }
34 tx.commit()?;
35 Ok(Self { db })
36 }
37
38 pub fn apply(&self, entry: &RaftEntry) -> anyhow::Result<()> {
40 match entry {
41 RaftEntry::RegisterNode {
42 node_id,
43 address,
44 labels,
45 } => {
46 let node = NodeEntry {
47 node_id: *node_id,
48 address: address.clone(),
49 labels: labels.clone(),
50 status: NodeStatus::Ready,
51 resources: None,
52 };
53 self.set_node(*node_id, &node)?;
54 debug!("Registered node {node_id} at {address}");
55 }
56 RaftEntry::DeregisterNode { node_id } => {
57 self.remove_node(*node_id)?;
58 debug!("Deregistered node {node_id}");
59 }
60 RaftEntry::SetService(config) => {
61 self.set_service(&config.name, config)?;
62 debug!("Set service {}", config.name);
63 }
64 RaftEntry::RemoveService(name) => {
65 self.remove_service(name)?;
66 debug!("Removed service {name}");
67 }
68 RaftEntry::AssignWorkload {
69 service,
70 replica_idx,
71 node_id,
72 } => {
73 let mut assignments = self.get_assignments(service)?;
74 assignments.push(Assignment {
75 service: service.clone(),
76 replica_idx: *replica_idx,
77 node_id: *node_id,
78 });
79 self.set_assignments(service, &assignments)?;
80 }
81 RaftEntry::UnassignWorkload {
82 service,
83 replica_idx,
84 node_id,
85 } => {
86 let mut assignments = self.get_assignments(service)?;
87 assignments.retain(|a| !(a.replica_idx == *replica_idx && a.node_id == *node_id));
88 self.set_assignments(service, &assignments)?;
89 }
90 RaftEntry::UpdateNodeStatus {
91 node_id,
92 status,
93 resources,
94 } => {
95 if let Ok(Some(mut node)) = self.get_node(*node_id) {
96 node.status = *status;
97 node.resources = Some(resources.clone());
98 self.set_node(*node_id, &node)?;
99 }
100 }
101 }
102 Ok(())
103 }
104
105 pub fn get_node(&self, id: u64) -> anyhow::Result<Option<NodeEntry>> {
107 let tx = self.db.begin_read()?;
108 let table = tx.open_table(NODES)?;
109 match table.get(id)? {
110 Some(val) => Ok(Some(serde_json::from_slice(val.value())?)),
111 None => Ok(None),
112 }
113 }
114
115 fn set_node(&self, id: u64, node: &NodeEntry) -> anyhow::Result<()> {
116 let data = serde_json::to_vec(node)?;
117 let tx = self.db.begin_write()?;
118 {
119 let mut table = tx.open_table(NODES)?;
120 table.insert(id, data.as_slice())?;
121 }
122 tx.commit()?;
123 Ok(())
124 }
125
126 fn remove_node(&self, id: u64) -> anyhow::Result<()> {
127 let tx = self.db.begin_write()?;
128 {
129 let mut table = tx.open_table(NODES)?;
130 table.remove(id)?;
131 }
132 tx.commit()?;
133 Ok(())
134 }
135
136 pub fn get_all_nodes(&self) -> anyhow::Result<HashMap<u64, NodeEntry>> {
138 let tx = self.db.begin_read()?;
139 let table = tx.open_table(NODES)?;
140 let mut nodes = HashMap::new();
141 for entry in table.iter()? {
142 let (k, v) = entry?;
143 let node: NodeEntry = serde_json::from_slice(v.value())?;
144 nodes.insert(k.value(), node);
145 }
146 Ok(nodes)
147 }
148
149 pub fn get_service(&self, name: &str) -> anyhow::Result<Option<ServiceConfig>> {
151 let tx = self.db.begin_read()?;
152 let table = tx.open_table(SERVICES)?;
153 match table.get(name)? {
154 Some(val) => Ok(Some(serde_json::from_slice(val.value())?)),
155 None => Ok(None),
156 }
157 }
158
159 pub fn set_service(&self, name: &str, config: &ServiceConfig) -> anyhow::Result<()> {
160 let data = serde_json::to_vec(config)?;
161 let tx = self.db.begin_write()?;
162 {
163 let mut table = tx.open_table(SERVICES)?;
164 table.insert(name, data.as_slice())?;
165 }
166 tx.commit()?;
167 Ok(())
168 }
169
170 pub fn remove_service(&self, name: &str) -> anyhow::Result<()> {
171 let tx = self.db.begin_write()?;
172 {
173 let mut table = tx.open_table(SERVICES)?;
174 table.remove(name)?;
175 }
176 tx.commit()?;
177 Ok(())
178 }
179
180 pub fn get_all_services(&self) -> anyhow::Result<HashMap<String, ServiceConfig>> {
182 let tx = self.db.begin_read()?;
183 let table = tx.open_table(SERVICES)?;
184 let mut services = HashMap::new();
185 for entry in table.iter()? {
186 let (k, v) = entry?;
187 let config: ServiceConfig = serde_json::from_slice(v.value())?;
188 services.insert(k.value().to_string(), config);
189 }
190 Ok(services)
191 }
192
193 pub fn get_assignments(&self, service: &str) -> anyhow::Result<Vec<Assignment>> {
195 let tx = self.db.begin_read()?;
196 let table = tx.open_table(ASSIGNMENTS)?;
197 match table.get(service)? {
198 Some(val) => Ok(serde_json::from_slice(val.value())?),
199 None => Ok(Vec::new()),
200 }
201 }
202
203 fn set_assignments(&self, service: &str, assignments: &[Assignment]) -> anyhow::Result<()> {
204 let data = serde_json::to_vec(assignments)?;
205 let tx = self.db.begin_write()?;
206 {
207 let mut table = tx.open_table(ASSIGNMENTS)?;
208 table.insert(service, data.as_slice())?;
209 }
210 tx.commit()?;
211 Ok(())
212 }
213
214 pub fn snapshot(&self) -> anyhow::Result<RaftSnapshot> {
216 Ok(RaftSnapshot {
217 nodes: self.get_all_nodes()?,
218 services: self.get_all_services()?,
219 assignments: self.get_all_assignments()?,
220 })
221 }
222
223 pub fn get_all_assignments(&self) -> anyhow::Result<Vec<Assignment>> {
225 let tx = self.db.begin_read()?;
226 let table = tx.open_table(ASSIGNMENTS)?;
227 let mut all = Vec::new();
228 for entry in table.iter()? {
229 let (_, v) = entry?;
230 let assignments: Vec<Assignment> = serde_json::from_slice(v.value())?;
231 all.extend(assignments);
232 }
233 Ok(all)
234 }
235}
236
237#[cfg(test)]
238#[path = "kv_tests.rs"]
239mod tests;