1use std::path::Path;
16
17use redb::{Database, ReadableTable, TableDefinition};
18use tracing::{debug, info};
19
20use nodedb_raft::message::LogEntry;
21use nodedb_raft::state::HardState;
22use nodedb_raft::storage::LogStorage;
23
24use crate::wire_version::envelope::{decode_versioned, encode_versioned};
25
/// Raft log entries, keyed by `index_key(entry.index)` (the 8-byte big-endian
/// log index) and valued by the bytes produced by `encode_versioned`.
const ENTRIES: TableDefinition<&[u8], &[u8]> = TableDefinition::new("raft.entries");

/// Named metadata records (see the `KEY_*` constants below); every value is
/// msgpack-encoded with `zerompk`.
const META: TableDefinition<&str, &[u8]> = TableDefinition::new("raft.meta");

/// `META` key holding the msgpack-encoded `HardState`.
const KEY_HARD_STATE: &str = "hard_state";
/// `META` key holding the index passed to the most recent `compact` call.
const KEY_SNAPSHOT_INDEX: &str = "snapshot_index";
/// `META` key holding the term passed to the most recent `compact` call.
const KEY_SNAPSHOT_TERM: &str = "snapshot_term";
35
/// Durable Raft log storage backed by a single redb database file.
///
/// Implements `LogStorage`: log entries live in the `raft.entries` table, while
/// the hard state and snapshot (compaction) metadata live in `raft.meta`.
pub struct RedbLogStorage {
    /// Handle to the underlying redb database. Each storage operation opens its
    /// own read or write transaction on it.
    db: Database,
}
40
41impl RedbLogStorage {
42 pub fn open(path: &Path) -> crate::Result<Self> {
44 if let Some(parent) = path.parent() {
45 std::fs::create_dir_all(parent).map_err(|e| crate::ClusterError::Storage {
46 detail: format!("create raft storage dir: {e}"),
47 })?;
48 }
49
50 let db = Database::create(path).map_err(|e| crate::ClusterError::Storage {
51 detail: format!("open raft storage: {e}"),
52 })?;
53
54 let write_txn = db.begin_write().map_err(|e| crate::ClusterError::Storage {
56 detail: format!("init raft tables: {e}"),
57 })?;
58 {
59 write_txn
60 .open_table(ENTRIES)
61 .map_err(|e| crate::ClusterError::Storage {
62 detail: format!("create entries table: {e}"),
63 })?;
64 write_txn
65 .open_table(META)
66 .map_err(|e| crate::ClusterError::Storage {
67 detail: format!("create meta table: {e}"),
68 })?;
69 }
70 write_txn
71 .commit()
72 .map_err(|e| crate::ClusterError::Storage {
73 detail: format!("commit raft init: {e}"),
74 })?;
75
76 info!(path = %path.display(), "raft log storage opened");
77
78 Ok(Self { db })
79 }
80}
81
/// Encodes a Raft log index as the 8-byte key used in the `raft.entries` table.
///
/// The encoding is big-endian, so lexicographic byte order matches numeric
/// order. The range scans over the entries table depend on this property.
fn index_key(index: u64) -> [u8; 8] {
    u64::to_be_bytes(index)
}
85
86impl LogStorage for RedbLogStorage {
87 fn append(&mut self, entries: &[LogEntry]) -> nodedb_raft::error::Result<()> {
88 if entries.is_empty() {
89 return Ok(());
90 }
91
92 let write_txn =
93 self.db
94 .begin_write()
95 .map_err(|e| nodedb_raft::error::RaftError::Storage {
96 detail: format!("write txn: {e}"),
97 })?;
98 {
99 let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
100 nodedb_raft::error::RaftError::Storage {
101 detail: format!("open entries: {e}"),
102 }
103 })?;
104
105 for entry in entries {
106 let key = index_key(entry.index);
107 let value = encode_versioned(entry).map_err(|e| {
108 nodedb_raft::error::RaftError::Storage {
109 detail: format!("serialize entry: {e}"),
110 }
111 })?;
112 table
113 .insert(key.as_slice(), value.as_slice())
114 .map_err(|e| nodedb_raft::error::RaftError::Storage {
115 detail: format!("insert entry: {e}"),
116 })?;
117 }
118 }
119 write_txn
120 .commit()
121 .map_err(|e| nodedb_raft::error::RaftError::Storage {
122 detail: format!("commit append: {e}"),
123 })?;
124
125 debug!(count = entries.len(), "raft log appended");
126 Ok(())
127 }
128
129 fn truncate(&mut self, index: u64) -> nodedb_raft::error::Result<()> {
130 let write_txn =
131 self.db
132 .begin_write()
133 .map_err(|e| nodedb_raft::error::RaftError::Storage {
134 detail: format!("write txn: {e}"),
135 })?;
136 {
137 let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
138 nodedb_raft::error::RaftError::Storage {
139 detail: format!("open entries: {e}"),
140 }
141 })?;
142
143 let start = index_key(index);
145 let keys_to_remove: Vec<[u8; 8]> = table
146 .range(start.as_slice()..)
147 .map_err(|e| nodedb_raft::error::RaftError::Storage {
148 detail: format!("range: {e}"),
149 })?
150 .filter_map(|r| {
151 r.ok().map(|(k, _)| {
152 let mut buf = [0u8; 8];
153 buf.copy_from_slice(k.value());
154 buf
155 })
156 })
157 .collect();
158
159 for key in &keys_to_remove {
160 table.remove(key.as_slice()).map_err(|e| {
161 nodedb_raft::error::RaftError::Storage {
162 detail: format!("remove: {e}"),
163 }
164 })?;
165 }
166 }
167 write_txn
168 .commit()
169 .map_err(|e| nodedb_raft::error::RaftError::Storage {
170 detail: format!("commit truncate: {e}"),
171 })?;
172
173 debug!(from_index = index, "raft log truncated");
174 Ok(())
175 }
176
177 fn load_entries_after(&self, snapshot_index: u64) -> nodedb_raft::error::Result<Vec<LogEntry>> {
178 let read_txn =
179 self.db
180 .begin_read()
181 .map_err(|e| nodedb_raft::error::RaftError::Storage {
182 detail: format!("read txn: {e}"),
183 })?;
184 let table =
185 read_txn
186 .open_table(ENTRIES)
187 .map_err(|e| nodedb_raft::error::RaftError::Storage {
188 detail: format!("open entries: {e}"),
189 })?;
190
191 let start = index_key(snapshot_index + 1);
193 let mut entries = Vec::new();
194
195 for result in
196 table
197 .range(start.as_slice()..)
198 .map_err(|e| nodedb_raft::error::RaftError::Storage {
199 detail: format!("range: {e}"),
200 })?
201 {
202 let (_, value) = result.map_err(|e| nodedb_raft::error::RaftError::Storage {
203 detail: format!("entry read: {e}"),
204 })?;
205 let entry: LogEntry = decode_versioned(value.value()).map_err(|e| {
206 nodedb_raft::error::RaftError::Storage {
207 detail: format!("deserialize entry: {e}"),
208 }
209 })?;
210 entries.push(entry);
211 }
212
213 debug!(
214 count = entries.len(),
215 after = snapshot_index,
216 "raft log loaded"
217 );
218 Ok(entries)
219 }
220
221 fn compact(&mut self, index: u64, term: u64) -> nodedb_raft::error::Result<()> {
222 let write_txn =
223 self.db
224 .begin_write()
225 .map_err(|e| nodedb_raft::error::RaftError::Storage {
226 detail: format!("write txn: {e}"),
227 })?;
228 {
229 let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
231 nodedb_raft::error::RaftError::Storage {
232 detail: format!("open entries: {e}"),
233 }
234 })?;
235
236 let end = index_key(index + 1);
237 let keys_to_remove: Vec<[u8; 8]> = table
238 .range(..end.as_slice())
239 .map_err(|e| nodedb_raft::error::RaftError::Storage {
240 detail: format!("range: {e}"),
241 })?
242 .filter_map(|r| {
243 r.ok().map(|(k, _)| {
244 let mut buf = [0u8; 8];
245 buf.copy_from_slice(k.value());
246 buf
247 })
248 })
249 .collect();
250
251 for key in &keys_to_remove {
252 table.remove(key.as_slice()).map_err(|e| {
253 nodedb_raft::error::RaftError::Storage {
254 detail: format!("remove: {e}"),
255 }
256 })?;
257 }
258
259 let mut meta =
261 write_txn
262 .open_table(META)
263 .map_err(|e| nodedb_raft::error::RaftError::Storage {
264 detail: format!("open meta: {e}"),
265 })?;
266
267 let idx_bytes = zerompk::to_msgpack_vec(&index).map_err(|e| {
268 nodedb_raft::error::RaftError::Storage {
269 detail: format!("serialize: {e}"),
270 }
271 })?;
272 let term_bytes = zerompk::to_msgpack_vec(&term).map_err(|e| {
273 nodedb_raft::error::RaftError::Storage {
274 detail: format!("serialize: {e}"),
275 }
276 })?;
277
278 meta.insert(KEY_SNAPSHOT_INDEX, idx_bytes.as_slice())
279 .map_err(|e| nodedb_raft::error::RaftError::Storage {
280 detail: format!("insert meta: {e}"),
281 })?;
282 meta.insert(KEY_SNAPSHOT_TERM, term_bytes.as_slice())
283 .map_err(|e| nodedb_raft::error::RaftError::Storage {
284 detail: format!("insert meta: {e}"),
285 })?;
286 }
287 write_txn
288 .commit()
289 .map_err(|e| nodedb_raft::error::RaftError::Storage {
290 detail: format!("commit compact: {e}"),
291 })?;
292
293 debug!(index, term, "raft log compacted");
294 Ok(())
295 }
296
297 fn snapshot_metadata(&self) -> (u64, u64) {
298 let Ok(read_txn) = self.db.begin_read() else {
299 return (0, 0);
300 };
301 let Ok(table) = read_txn.open_table(META) else {
302 return (0, 0);
303 };
304
305 let index = table
306 .get(KEY_SNAPSHOT_INDEX)
307 .ok()
308 .flatten()
309 .and_then(|v| zerompk::from_msgpack::<u64>(v.value()).ok())
310 .unwrap_or(0);
311 let term = table
312 .get(KEY_SNAPSHOT_TERM)
313 .ok()
314 .flatten()
315 .and_then(|v| zerompk::from_msgpack::<u64>(v.value()).ok())
316 .unwrap_or(0);
317
318 (index, term)
319 }
320
321 fn save_hard_state(&mut self, state: &HardState) -> nodedb_raft::error::Result<()> {
322 let write_txn =
323 self.db
324 .begin_write()
325 .map_err(|e| nodedb_raft::error::RaftError::Storage {
326 detail: format!("write txn: {e}"),
327 })?;
328 {
329 let mut table =
330 write_txn
331 .open_table(META)
332 .map_err(|e| nodedb_raft::error::RaftError::Storage {
333 detail: format!("open meta: {e}"),
334 })?;
335
336 let bytes = zerompk::to_msgpack_vec(state).map_err(|e| {
337 nodedb_raft::error::RaftError::Storage {
338 detail: format!("serialize: {e}"),
339 }
340 })?;
341 table
342 .insert(KEY_HARD_STATE, bytes.as_slice())
343 .map_err(|e| nodedb_raft::error::RaftError::Storage {
344 detail: format!("insert: {e}"),
345 })?;
346 }
347 write_txn
348 .commit()
349 .map_err(|e| nodedb_raft::error::RaftError::Storage {
350 detail: format!("commit: {e}"),
351 })?;
352
353 debug!(
354 term = state.current_term,
355 voted_for = state.voted_for,
356 "raft hard state saved"
357 );
358 Ok(())
359 }
360
361 fn load_hard_state(&self) -> nodedb_raft::error::Result<HardState> {
362 let read_txn =
363 self.db
364 .begin_read()
365 .map_err(|e| nodedb_raft::error::RaftError::Storage {
366 detail: format!("read txn: {e}"),
367 })?;
368 let table =
369 read_txn
370 .open_table(META)
371 .map_err(|e| nodedb_raft::error::RaftError::Storage {
372 detail: format!("open meta: {e}"),
373 })?;
374
375 match table.get(KEY_HARD_STATE) {
376 Ok(Some(value)) => {
377 let state: HardState = zerompk::from_msgpack(value.value()).map_err(|e| {
378 nodedb_raft::error::RaftError::Storage {
379 detail: format!("deserialize: {e}"),
380 }
381 })?;
382 Ok(state)
383 }
384 Ok(None) => Ok(HardState::default()),
385 Err(e) => Err(nodedb_raft::error::RaftError::Storage {
386 detail: format!("get hard state: {e}"),
387 }),
388 }
389 }
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh storage backed by `test-raft.redb` in a new temp directory.
    ///
    /// The `TempDir` is returned alongside the storage so the caller keeps it
    /// alive for the duration of the test; dropping it removes the directory.
    fn open_temp() -> (RedbLogStorage, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test-raft.redb");
        let storage = RedbLogStorage::open(&path).unwrap();
        (storage, dir)
    }

    /// A single batched append of three entries reads back in index order, with
    /// payloads and terms intact.
    #[test]
    fn append_and_load() {
        let (mut s, _dir) = open_temp();
        let entries = vec![
            LogEntry {
                term: 1,
                index: 1,
                data: b"cmd-a".to_vec(),
            },
            LogEntry {
                term: 1,
                index: 2,
                data: b"cmd-b".to_vec(),
            },
            LogEntry {
                term: 2,
                index: 3,
                data: b"cmd-c".to_vec(),
            },
        ];
        s.append(&entries).unwrap();

        let loaded = s.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 3);
        assert_eq!(loaded[0].data, b"cmd-a");
        assert_eq!(loaded[2].term, 2);
    }

    /// `truncate(3)` drops index 3 and everything after it, leaving indices 1..=2.
    #[test]
    fn truncate_removes_tail() {
        let (mut s, _dir) = open_temp();
        // One append call per entry, so each entry is committed in its own transaction.
        for i in 1..=5 {
            s.append(&[LogEntry {
                term: 1,
                index: i,
                data: vec![],
            }])
            .unwrap();
        }
        s.truncate(3).unwrap();
        let loaded = s.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded.last().unwrap().index, 2);
    }

    /// `compact(5, 1)` records `(5, 1)` as the snapshot metadata and leaves
    /// entries 6..=10 readable.
    #[test]
    fn compact_removes_prefix() {
        let (mut s, _dir) = open_temp();
        for i in 1..=10 {
            s.append(&[LogEntry {
                term: 1,
                index: i,
                data: vec![],
            }])
            .unwrap();
        }
        s.compact(5, 1).unwrap();
        assert_eq!(s.snapshot_metadata(), (5, 1));
        let loaded = s.load_entries_after(5).unwrap();
        assert_eq!(loaded.len(), 5);
        assert_eq!(loaded[0].index, 6);
    }

    /// A `HardState` written with `save_hard_state` reads back unchanged.
    #[test]
    fn hard_state_roundtrip() {
        let (mut s, _dir) = open_temp();
        let hs = HardState {
            current_term: 7,
            voted_for: 3,
        };
        s.save_hard_state(&hs).unwrap();
        let loaded = s.load_hard_state().unwrap();
        assert_eq!(loaded, hs);
    }

    /// An entry stored through the versioned envelope (`append`) decodes back
    /// to an identical `LogEntry`.
    #[test]
    fn versioned_roundtrip() {
        let (mut s, _dir) = open_temp();
        let entry = LogEntry {
            term: 3,
            index: 1,
            data: b"versioned-payload".to_vec(),
        };
        s.append(std::slice::from_ref(&entry)).unwrap();
        let loaded = s.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0], entry);
    }

    /// A stored row whose envelope declares an unsupported wire version must
    /// surface as a `Storage` error mentioning "deserialize", rather than being
    /// skipped or decoded.
    #[test]
    fn version_mismatch_returns_error() {
        let (s, _dir) = open_temp();

        // A valid msgpack payload for a LogEntry at index 77.
        let inner = zerompk::to_msgpack_vec(&LogEntry {
            term: 1,
            index: 77,
            data: vec![],
        })
        .unwrap();

        // Hand-built envelope around that payload. NOTE(review): this layout
        // (marker byte, big-endian u16 version, big-endian u32 payload length,
        // payload) is assumed to mirror `wire_version::envelope`; confirm there
        // if the envelope format changes.
        let mut bad_bytes = Vec::new();
        // Leading marker byte (0xc1 is never used by msgpack itself).
        bad_bytes.push(0xc1u8);
        // Version 9999, assumed not to be a supported wire version.
        bad_bytes.extend_from_slice(&9999u16.to_be_bytes());
        bad_bytes.extend_from_slice(&(inner.len() as u32).to_be_bytes());
        bad_bytes.extend_from_slice(&inner);

        // Write the raw bytes directly, bypassing `append` / `encode_versioned`.
        let key = index_key(77);
        let write_txn = s.db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(ENTRIES).unwrap();
            table.insert(key.as_slice(), bad_bytes.as_slice()).unwrap();
        }
        write_txn.commit().unwrap();

        let err = s.load_entries_after(76).unwrap_err();
        match err {
            nodedb_raft::error::RaftError::Storage { detail } => {
                assert!(
                    detail.contains("deserialize"),
                    "expected 'deserialize' in detail: {detail}"
                );
            }
            other => panic!("expected Storage error, got: {other}"),
        }
    }

    /// Entries and hard state written before the storage is dropped are still
    /// readable after reopening the same file.
    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("reopen-raft.redb");

        // The inner scope drops the first handle, which releases the database
        // file before it is reopened.
        {
            let mut s = RedbLogStorage::open(&path).unwrap();
            s.append(&[LogEntry {
                term: 1,
                index: 1,
                data: b"durable".to_vec(),
            }])
            .unwrap();
            s.save_hard_state(&HardState {
                current_term: 3,
                voted_for: 1,
            })
            .unwrap();
        }

        let s = RedbLogStorage::open(&path).unwrap();
        let loaded = s.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].data, b"durable");
        let hs = s.load_hard_state().unwrap();
        assert_eq!(hs.current_term, 3);
    }
}