1use std::path::Path;
14
15use redb::{Database, ReadableTable, TableDefinition};
16use tracing::{debug, info};
17
18use nodedb_raft::message::LogEntry;
19use nodedb_raft::state::HardState;
20use nodedb_raft::storage::LogStorage;
21
/// redb table holding the raft log. Keys are the 8-byte big-endian entry index produced by
/// `index_key`; values are the MessagePack-encoded `LogEntry`.
const ENTRIES: TableDefinition<&[u8], &[u8]> = TableDefinition::new("raft.entries");

/// redb table holding raft metadata blobs, addressed by the `KEY_*` string keys below.
const META: TableDefinition<&str, &[u8]> = TableDefinition::new("raft.meta");

/// `META` key under which the MessagePack-encoded `HardState` is stored.
const KEY_HARD_STATE: &str = "hard_state";
/// `META` key holding the index recorded by the most recent `compact` call.
const KEY_SNAPSHOT_INDEX: &str = "snapshot_index";
/// `META` key holding the term recorded by the most recent `compact` call.
const KEY_SNAPSHOT_TERM: &str = "snapshot_term";
31
/// Raft log storage backed by a single redb [`Database`].
///
/// Log entries are kept in the `raft.entries` table; the hard state and the snapshot
/// index/term are kept in the `raft.meta` table.
pub struct RedbLogStorage {
    /// Handle to the redb database created or opened by `RedbLogStorage::open`.
    db: Database,
}
36
37impl RedbLogStorage {
38 pub fn open(path: &Path) -> crate::Result<Self> {
40 if let Some(parent) = path.parent() {
41 std::fs::create_dir_all(parent).map_err(|e| crate::ClusterError::Storage {
42 detail: format!("create raft storage dir: {e}"),
43 })?;
44 }
45
46 let db = Database::create(path).map_err(|e| crate::ClusterError::Storage {
47 detail: format!("open raft storage: {e}"),
48 })?;
49
50 let write_txn = db.begin_write().map_err(|e| crate::ClusterError::Storage {
52 detail: format!("init raft tables: {e}"),
53 })?;
54 {
55 write_txn
56 .open_table(ENTRIES)
57 .map_err(|e| crate::ClusterError::Storage {
58 detail: format!("create entries table: {e}"),
59 })?;
60 write_txn
61 .open_table(META)
62 .map_err(|e| crate::ClusterError::Storage {
63 detail: format!("create meta table: {e}"),
64 })?;
65 }
66 write_txn
67 .commit()
68 .map_err(|e| crate::ClusterError::Storage {
69 detail: format!("commit raft init: {e}"),
70 })?;
71
72 info!(path = %path.display(), "raft log storage opened");
73
74 Ok(Self { db })
75 }
76}
77
/// Encodes a log index as its 8-byte big-endian representation.
///
/// Big-endian is used so that comparing two keys byte by byte gives the same order as
/// comparing the indexes numerically.
fn index_key(index: u64) -> [u8; 8] {
    u64::to_be_bytes(index)
}
81
82impl LogStorage for RedbLogStorage {
83 fn append(&mut self, entries: &[LogEntry]) -> nodedb_raft::error::Result<()> {
84 if entries.is_empty() {
85 return Ok(());
86 }
87
88 let write_txn =
89 self.db
90 .begin_write()
91 .map_err(|e| nodedb_raft::error::RaftError::Storage {
92 detail: format!("write txn: {e}"),
93 })?;
94 {
95 let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
96 nodedb_raft::error::RaftError::Storage {
97 detail: format!("open entries: {e}"),
98 }
99 })?;
100
101 for entry in entries {
102 let key = index_key(entry.index);
103 let value = zerompk::to_msgpack_vec(entry).map_err(|e| {
104 nodedb_raft::error::RaftError::Storage {
105 detail: format!("serialize entry: {e}"),
106 }
107 })?;
108 table
109 .insert(key.as_slice(), value.as_slice())
110 .map_err(|e| nodedb_raft::error::RaftError::Storage {
111 detail: format!("insert entry: {e}"),
112 })?;
113 }
114 }
115 write_txn
116 .commit()
117 .map_err(|e| nodedb_raft::error::RaftError::Storage {
118 detail: format!("commit append: {e}"),
119 })?;
120
121 debug!(count = entries.len(), "raft log appended");
122 Ok(())
123 }
124
125 fn truncate(&mut self, index: u64) -> nodedb_raft::error::Result<()> {
126 let write_txn =
127 self.db
128 .begin_write()
129 .map_err(|e| nodedb_raft::error::RaftError::Storage {
130 detail: format!("write txn: {e}"),
131 })?;
132 {
133 let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
134 nodedb_raft::error::RaftError::Storage {
135 detail: format!("open entries: {e}"),
136 }
137 })?;
138
139 let start = index_key(index);
141 let keys_to_remove: Vec<[u8; 8]> = table
142 .range(start.as_slice()..)
143 .map_err(|e| nodedb_raft::error::RaftError::Storage {
144 detail: format!("range: {e}"),
145 })?
146 .filter_map(|r| {
147 r.ok().map(|(k, _)| {
148 let mut buf = [0u8; 8];
149 buf.copy_from_slice(k.value());
150 buf
151 })
152 })
153 .collect();
154
155 for key in &keys_to_remove {
156 table.remove(key.as_slice()).map_err(|e| {
157 nodedb_raft::error::RaftError::Storage {
158 detail: format!("remove: {e}"),
159 }
160 })?;
161 }
162 }
163 write_txn
164 .commit()
165 .map_err(|e| nodedb_raft::error::RaftError::Storage {
166 detail: format!("commit truncate: {e}"),
167 })?;
168
169 debug!(from_index = index, "raft log truncated");
170 Ok(())
171 }
172
173 fn load_entries_after(&self, snapshot_index: u64) -> nodedb_raft::error::Result<Vec<LogEntry>> {
174 let read_txn =
175 self.db
176 .begin_read()
177 .map_err(|e| nodedb_raft::error::RaftError::Storage {
178 detail: format!("read txn: {e}"),
179 })?;
180 let table =
181 read_txn
182 .open_table(ENTRIES)
183 .map_err(|e| nodedb_raft::error::RaftError::Storage {
184 detail: format!("open entries: {e}"),
185 })?;
186
187 let start = index_key(snapshot_index + 1);
189 let mut entries = Vec::new();
190
191 for result in
192 table
193 .range(start.as_slice()..)
194 .map_err(|e| nodedb_raft::error::RaftError::Storage {
195 detail: format!("range: {e}"),
196 })?
197 {
198 let (_, value) = result.map_err(|e| nodedb_raft::error::RaftError::Storage {
199 detail: format!("entry read: {e}"),
200 })?;
201 let entry: LogEntry = zerompk::from_msgpack(value.value()).map_err(|e| {
202 nodedb_raft::error::RaftError::Storage {
203 detail: format!("deserialize entry: {e}"),
204 }
205 })?;
206 entries.push(entry);
207 }
208
209 debug!(
210 count = entries.len(),
211 after = snapshot_index,
212 "raft log loaded"
213 );
214 Ok(entries)
215 }
216
217 fn compact(&mut self, index: u64, term: u64) -> nodedb_raft::error::Result<()> {
218 let write_txn =
219 self.db
220 .begin_write()
221 .map_err(|e| nodedb_raft::error::RaftError::Storage {
222 detail: format!("write txn: {e}"),
223 })?;
224 {
225 let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
227 nodedb_raft::error::RaftError::Storage {
228 detail: format!("open entries: {e}"),
229 }
230 })?;
231
232 let end = index_key(index + 1);
233 let keys_to_remove: Vec<[u8; 8]> = table
234 .range(..end.as_slice())
235 .map_err(|e| nodedb_raft::error::RaftError::Storage {
236 detail: format!("range: {e}"),
237 })?
238 .filter_map(|r| {
239 r.ok().map(|(k, _)| {
240 let mut buf = [0u8; 8];
241 buf.copy_from_slice(k.value());
242 buf
243 })
244 })
245 .collect();
246
247 for key in &keys_to_remove {
248 table.remove(key.as_slice()).map_err(|e| {
249 nodedb_raft::error::RaftError::Storage {
250 detail: format!("remove: {e}"),
251 }
252 })?;
253 }
254
255 let mut meta =
257 write_txn
258 .open_table(META)
259 .map_err(|e| nodedb_raft::error::RaftError::Storage {
260 detail: format!("open meta: {e}"),
261 })?;
262
263 let idx_bytes = zerompk::to_msgpack_vec(&index).map_err(|e| {
264 nodedb_raft::error::RaftError::Storage {
265 detail: format!("serialize: {e}"),
266 }
267 })?;
268 let term_bytes = zerompk::to_msgpack_vec(&term).map_err(|e| {
269 nodedb_raft::error::RaftError::Storage {
270 detail: format!("serialize: {e}"),
271 }
272 })?;
273
274 meta.insert(KEY_SNAPSHOT_INDEX, idx_bytes.as_slice())
275 .map_err(|e| nodedb_raft::error::RaftError::Storage {
276 detail: format!("insert meta: {e}"),
277 })?;
278 meta.insert(KEY_SNAPSHOT_TERM, term_bytes.as_slice())
279 .map_err(|e| nodedb_raft::error::RaftError::Storage {
280 detail: format!("insert meta: {e}"),
281 })?;
282 }
283 write_txn
284 .commit()
285 .map_err(|e| nodedb_raft::error::RaftError::Storage {
286 detail: format!("commit compact: {e}"),
287 })?;
288
289 debug!(index, term, "raft log compacted");
290 Ok(())
291 }
292
293 fn snapshot_metadata(&self) -> (u64, u64) {
294 let Ok(read_txn) = self.db.begin_read() else {
295 return (0, 0);
296 };
297 let Ok(table) = read_txn.open_table(META) else {
298 return (0, 0);
299 };
300
301 let index = table
302 .get(KEY_SNAPSHOT_INDEX)
303 .ok()
304 .flatten()
305 .and_then(|v| zerompk::from_msgpack::<u64>(v.value()).ok())
306 .unwrap_or(0);
307 let term = table
308 .get(KEY_SNAPSHOT_TERM)
309 .ok()
310 .flatten()
311 .and_then(|v| zerompk::from_msgpack::<u64>(v.value()).ok())
312 .unwrap_or(0);
313
314 (index, term)
315 }
316
317 fn save_hard_state(&mut self, state: &HardState) -> nodedb_raft::error::Result<()> {
318 let write_txn =
319 self.db
320 .begin_write()
321 .map_err(|e| nodedb_raft::error::RaftError::Storage {
322 detail: format!("write txn: {e}"),
323 })?;
324 {
325 let mut table =
326 write_txn
327 .open_table(META)
328 .map_err(|e| nodedb_raft::error::RaftError::Storage {
329 detail: format!("open meta: {e}"),
330 })?;
331
332 let bytes = zerompk::to_msgpack_vec(state).map_err(|e| {
333 nodedb_raft::error::RaftError::Storage {
334 detail: format!("serialize: {e}"),
335 }
336 })?;
337 table
338 .insert(KEY_HARD_STATE, bytes.as_slice())
339 .map_err(|e| nodedb_raft::error::RaftError::Storage {
340 detail: format!("insert: {e}"),
341 })?;
342 }
343 write_txn
344 .commit()
345 .map_err(|e| nodedb_raft::error::RaftError::Storage {
346 detail: format!("commit: {e}"),
347 })?;
348
349 debug!(
350 term = state.current_term,
351 voted_for = state.voted_for,
352 "raft hard state saved"
353 );
354 Ok(())
355 }
356
357 fn load_hard_state(&self) -> nodedb_raft::error::Result<HardState> {
358 let read_txn =
359 self.db
360 .begin_read()
361 .map_err(|e| nodedb_raft::error::RaftError::Storage {
362 detail: format!("read txn: {e}"),
363 })?;
364 let table =
365 read_txn
366 .open_table(META)
367 .map_err(|e| nodedb_raft::error::RaftError::Storage {
368 detail: format!("open meta: {e}"),
369 })?;
370
371 match table.get(KEY_HARD_STATE) {
372 Ok(Some(value)) => {
373 let state: HardState = zerompk::from_msgpack(value.value()).map_err(|e| {
374 nodedb_raft::error::RaftError::Storage {
375 detail: format!("deserialize: {e}"),
376 }
377 })?;
378 Ok(state)
379 }
380 Ok(None) => Ok(HardState::default()),
381 Err(e) => Err(nodedb_raft::error::RaftError::Storage {
382 detail: format!("get hard state: {e}"),
383 }),
384 }
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a storage inside a fresh temporary directory.
    ///
    /// The `TempDir` handle is returned as well so the caller keeps the directory alive for
    /// the duration of the test.
    fn open_temp() -> (RedbLogStorage, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test-raft.redb");
        let storage = RedbLogStorage::open(&db_path).unwrap();
        (storage, tmp)
    }

    /// Appends empty term-1 entries with indexes `1..=last`, one `append` call per entry.
    fn append_blank_entries(storage: &mut RedbLogStorage, last: u64) {
        for index in 1..=last {
            let blank = LogEntry {
                term: 1,
                index,
                data: Vec::new(),
            };
            storage.append(&[blank]).unwrap();
        }
    }

    /// Entries appended in one batch come back complete and in index order.
    #[test]
    fn append_and_load() {
        let (mut storage, _tmp) = open_temp();
        let batch = [
            LogEntry {
                term: 1,
                index: 1,
                data: b"cmd-a".to_vec(),
            },
            LogEntry {
                term: 1,
                index: 2,
                data: b"cmd-b".to_vec(),
            },
            LogEntry {
                term: 2,
                index: 3,
                data: b"cmd-c".to_vec(),
            },
        ];
        storage.append(&batch).unwrap();

        let loaded = storage.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 3);
        assert_eq!(loaded[0].data, b"cmd-a");
        assert_eq!(loaded[2].term, 2);
    }

    /// `truncate(3)` drops indexes 3.. and keeps 1 and 2.
    #[test]
    fn truncate_removes_tail() {
        let (mut storage, _tmp) = open_temp();
        append_blank_entries(&mut storage, 5);

        storage.truncate(3).unwrap();

        let loaded = storage.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded.last().unwrap().index, 2);
    }

    /// `compact(5, 1)` records the snapshot metadata and leaves indexes 6..=10.
    #[test]
    fn compact_removes_prefix() {
        let (mut storage, _tmp) = open_temp();
        append_blank_entries(&mut storage, 10);

        storage.compact(5, 1).unwrap();

        assert_eq!(storage.snapshot_metadata(), (5, 1));
        let loaded = storage.load_entries_after(5).unwrap();
        assert_eq!(loaded.len(), 5);
        assert_eq!(loaded[0].index, 6);
    }

    /// A saved hard state is read back unchanged.
    #[test]
    fn hard_state_roundtrip() {
        let (mut storage, _tmp) = open_temp();
        let saved = HardState {
            current_term: 7,
            voted_for: 3,
        };
        storage.save_hard_state(&saved).unwrap();

        let loaded = storage.load_hard_state().unwrap();
        assert_eq!(loaded, saved);
    }

    /// Entries and hard state written by one handle are visible after reopening the file.
    #[test]
    fn survives_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("reopen-raft.redb");

        let mut writer = RedbLogStorage::open(&db_path).unwrap();
        writer
            .append(&[LogEntry {
                term: 1,
                index: 1,
                data: b"durable".to_vec(),
            }])
            .unwrap();
        writer
            .save_hard_state(&HardState {
                current_term: 3,
                voted_for: 1,
            })
            .unwrap();
        // Close the first handle before the same file is opened again.
        drop(writer);

        let reader = RedbLogStorage::open(&db_path).unwrap();
        let loaded = reader.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].data, b"durable");
        let hard_state = reader.load_hard_state().unwrap();
        assert_eq!(hard_state.current_term, 3);
    }
}
501}