Skip to main content

nodedb_cluster/
raft_storage.rs

1//! Persistent Raft log storage backed by redb.
2//!
3//! Implements `nodedb_raft::LogStorage` with ACID durability via redb.
4//! Each Raft group gets its own redb file at `{data_dir}/raft/group-{id}.redb`.
5//!
6//! Layout:
7//! - `ENTRIES` table: key = log index (u64 big-endian), value = MessagePack LogEntry
8//! - `META` table: key = name (&str), value = MessagePack-encoded metadata
9//!   - "hard_state" → HardState { current_term, voted_for }
10//!   - "snapshot_index" → u64
11//!   - "snapshot_term" → u64
12
13use std::path::Path;
14
15use redb::{Database, ReadableTable, TableDefinition};
16use tracing::{debug, info};
17
18use nodedb_raft::message::LogEntry;
19use nodedb_raft::state::HardState;
20use nodedb_raft::storage::LogStorage;
21
22/// Log entries: key = index (big-endian u64 for sorted iteration), value = MessagePack bytes.
23const ENTRIES: TableDefinition<&[u8], &[u8]> = TableDefinition::new("raft.entries");
24
25/// Metadata: key = name, value = MessagePack bytes.
26const META: TableDefinition<&str, &[u8]> = TableDefinition::new("raft.meta");
27
28const KEY_HARD_STATE: &str = "hard_state";
29const KEY_SNAPSHOT_INDEX: &str = "snapshot_index";
30const KEY_SNAPSHOT_TERM: &str = "snapshot_term";
31
32/// Persistent Raft log storage backed by redb.
33pub struct RedbLogStorage {
34    db: Database,
35}
36
37impl RedbLogStorage {
38    /// Open or create storage at the given path.
39    pub fn open(path: &Path) -> crate::Result<Self> {
40        if let Some(parent) = path.parent() {
41            std::fs::create_dir_all(parent).map_err(|e| crate::ClusterError::Storage {
42                detail: format!("create raft storage dir: {e}"),
43            })?;
44        }
45
46        let db = Database::create(path).map_err(|e| crate::ClusterError::Storage {
47            detail: format!("open raft storage: {e}"),
48        })?;
49
50        // Ensure tables exist.
51        let write_txn = db.begin_write().map_err(|e| crate::ClusterError::Storage {
52            detail: format!("init raft tables: {e}"),
53        })?;
54        {
55            write_txn
56                .open_table(ENTRIES)
57                .map_err(|e| crate::ClusterError::Storage {
58                    detail: format!("create entries table: {e}"),
59                })?;
60            write_txn
61                .open_table(META)
62                .map_err(|e| crate::ClusterError::Storage {
63                    detail: format!("create meta table: {e}"),
64                })?;
65        }
66        write_txn
67            .commit()
68            .map_err(|e| crate::ClusterError::Storage {
69                detail: format!("commit raft init: {e}"),
70            })?;
71
72        info!(path = %path.display(), "raft log storage opened");
73
74        Ok(Self { db })
75    }
76}
77
/// Encodes a Raft log index as the 8-byte key used by the `ENTRIES` table.
///
/// Big-endian byte order makes byte-wise comparison of keys agree with
/// numeric comparison of indices, so range scans visit entries in log order.
fn index_key(index: u64) -> [u8; 8] {
    u64::to_be_bytes(index)
}
81
82impl LogStorage for RedbLogStorage {
83    fn append(&mut self, entries: &[LogEntry]) -> nodedb_raft::error::Result<()> {
84        if entries.is_empty() {
85            return Ok(());
86        }
87
88        let write_txn =
89            self.db
90                .begin_write()
91                .map_err(|e| nodedb_raft::error::RaftError::Storage {
92                    detail: format!("write txn: {e}"),
93                })?;
94        {
95            let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
96                nodedb_raft::error::RaftError::Storage {
97                    detail: format!("open entries: {e}"),
98                }
99            })?;
100
101            for entry in entries {
102                let key = index_key(entry.index);
103                let value = zerompk::to_msgpack_vec(entry).map_err(|e| {
104                    nodedb_raft::error::RaftError::Storage {
105                        detail: format!("serialize entry: {e}"),
106                    }
107                })?;
108                table
109                    .insert(key.as_slice(), value.as_slice())
110                    .map_err(|e| nodedb_raft::error::RaftError::Storage {
111                        detail: format!("insert entry: {e}"),
112                    })?;
113            }
114        }
115        write_txn
116            .commit()
117            .map_err(|e| nodedb_raft::error::RaftError::Storage {
118                detail: format!("commit append: {e}"),
119            })?;
120
121        debug!(count = entries.len(), "raft log appended");
122        Ok(())
123    }
124
125    fn truncate(&mut self, index: u64) -> nodedb_raft::error::Result<()> {
126        let write_txn =
127            self.db
128                .begin_write()
129                .map_err(|e| nodedb_raft::error::RaftError::Storage {
130                    detail: format!("write txn: {e}"),
131                })?;
132        {
133            let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
134                nodedb_raft::error::RaftError::Storage {
135                    detail: format!("open entries: {e}"),
136                }
137            })?;
138
139            // Collect keys >= index to remove.
140            let start = index_key(index);
141            let keys_to_remove: Vec<[u8; 8]> = table
142                .range(start.as_slice()..)
143                .map_err(|e| nodedb_raft::error::RaftError::Storage {
144                    detail: format!("range: {e}"),
145                })?
146                .filter_map(|r| {
147                    r.ok().map(|(k, _)| {
148                        let mut buf = [0u8; 8];
149                        buf.copy_from_slice(k.value());
150                        buf
151                    })
152                })
153                .collect();
154
155            for key in &keys_to_remove {
156                table.remove(key.as_slice()).map_err(|e| {
157                    nodedb_raft::error::RaftError::Storage {
158                        detail: format!("remove: {e}"),
159                    }
160                })?;
161            }
162        }
163        write_txn
164            .commit()
165            .map_err(|e| nodedb_raft::error::RaftError::Storage {
166                detail: format!("commit truncate: {e}"),
167            })?;
168
169        debug!(from_index = index, "raft log truncated");
170        Ok(())
171    }
172
173    fn load_entries_after(&self, snapshot_index: u64) -> nodedb_raft::error::Result<Vec<LogEntry>> {
174        let read_txn =
175            self.db
176                .begin_read()
177                .map_err(|e| nodedb_raft::error::RaftError::Storage {
178                    detail: format!("read txn: {e}"),
179                })?;
180        let table =
181            read_txn
182                .open_table(ENTRIES)
183                .map_err(|e| nodedb_raft::error::RaftError::Storage {
184                    detail: format!("open entries: {e}"),
185                })?;
186
187        // Start after snapshot_index (exclusive).
188        let start = index_key(snapshot_index + 1);
189        let mut entries = Vec::new();
190
191        for result in
192            table
193                .range(start.as_slice()..)
194                .map_err(|e| nodedb_raft::error::RaftError::Storage {
195                    detail: format!("range: {e}"),
196                })?
197        {
198            let (_, value) = result.map_err(|e| nodedb_raft::error::RaftError::Storage {
199                detail: format!("entry read: {e}"),
200            })?;
201            let entry: LogEntry = zerompk::from_msgpack(value.value()).map_err(|e| {
202                nodedb_raft::error::RaftError::Storage {
203                    detail: format!("deserialize entry: {e}"),
204                }
205            })?;
206            entries.push(entry);
207        }
208
209        debug!(
210            count = entries.len(),
211            after = snapshot_index,
212            "raft log loaded"
213        );
214        Ok(entries)
215    }
216
217    fn compact(&mut self, index: u64, term: u64) -> nodedb_raft::error::Result<()> {
218        let write_txn =
219            self.db
220                .begin_write()
221                .map_err(|e| nodedb_raft::error::RaftError::Storage {
222                    detail: format!("write txn: {e}"),
223                })?;
224        {
225            // Remove entries <= index.
226            let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
227                nodedb_raft::error::RaftError::Storage {
228                    detail: format!("open entries: {e}"),
229                }
230            })?;
231
232            let end = index_key(index + 1);
233            let keys_to_remove: Vec<[u8; 8]> = table
234                .range(..end.as_slice())
235                .map_err(|e| nodedb_raft::error::RaftError::Storage {
236                    detail: format!("range: {e}"),
237                })?
238                .filter_map(|r| {
239                    r.ok().map(|(k, _)| {
240                        let mut buf = [0u8; 8];
241                        buf.copy_from_slice(k.value());
242                        buf
243                    })
244                })
245                .collect();
246
247            for key in &keys_to_remove {
248                table.remove(key.as_slice()).map_err(|e| {
249                    nodedb_raft::error::RaftError::Storage {
250                        detail: format!("remove: {e}"),
251                    }
252                })?;
253            }
254
255            // Save snapshot metadata.
256            let mut meta =
257                write_txn
258                    .open_table(META)
259                    .map_err(|e| nodedb_raft::error::RaftError::Storage {
260                        detail: format!("open meta: {e}"),
261                    })?;
262
263            let idx_bytes = zerompk::to_msgpack_vec(&index).map_err(|e| {
264                nodedb_raft::error::RaftError::Storage {
265                    detail: format!("serialize: {e}"),
266                }
267            })?;
268            let term_bytes = zerompk::to_msgpack_vec(&term).map_err(|e| {
269                nodedb_raft::error::RaftError::Storage {
270                    detail: format!("serialize: {e}"),
271                }
272            })?;
273
274            meta.insert(KEY_SNAPSHOT_INDEX, idx_bytes.as_slice())
275                .map_err(|e| nodedb_raft::error::RaftError::Storage {
276                    detail: format!("insert meta: {e}"),
277                })?;
278            meta.insert(KEY_SNAPSHOT_TERM, term_bytes.as_slice())
279                .map_err(|e| nodedb_raft::error::RaftError::Storage {
280                    detail: format!("insert meta: {e}"),
281                })?;
282        }
283        write_txn
284            .commit()
285            .map_err(|e| nodedb_raft::error::RaftError::Storage {
286                detail: format!("commit compact: {e}"),
287            })?;
288
289        debug!(index, term, "raft log compacted");
290        Ok(())
291    }
292
293    fn snapshot_metadata(&self) -> (u64, u64) {
294        let Ok(read_txn) = self.db.begin_read() else {
295            return (0, 0);
296        };
297        let Ok(table) = read_txn.open_table(META) else {
298            return (0, 0);
299        };
300
301        let index = table
302            .get(KEY_SNAPSHOT_INDEX)
303            .ok()
304            .flatten()
305            .and_then(|v| zerompk::from_msgpack::<u64>(v.value()).ok())
306            .unwrap_or(0);
307        let term = table
308            .get(KEY_SNAPSHOT_TERM)
309            .ok()
310            .flatten()
311            .and_then(|v| zerompk::from_msgpack::<u64>(v.value()).ok())
312            .unwrap_or(0);
313
314        (index, term)
315    }
316
317    fn save_hard_state(&mut self, state: &HardState) -> nodedb_raft::error::Result<()> {
318        let write_txn =
319            self.db
320                .begin_write()
321                .map_err(|e| nodedb_raft::error::RaftError::Storage {
322                    detail: format!("write txn: {e}"),
323                })?;
324        {
325            let mut table =
326                write_txn
327                    .open_table(META)
328                    .map_err(|e| nodedb_raft::error::RaftError::Storage {
329                        detail: format!("open meta: {e}"),
330                    })?;
331
332            let bytes = zerompk::to_msgpack_vec(state).map_err(|e| {
333                nodedb_raft::error::RaftError::Storage {
334                    detail: format!("serialize: {e}"),
335                }
336            })?;
337            table
338                .insert(KEY_HARD_STATE, bytes.as_slice())
339                .map_err(|e| nodedb_raft::error::RaftError::Storage {
340                    detail: format!("insert: {e}"),
341                })?;
342        }
343        write_txn
344            .commit()
345            .map_err(|e| nodedb_raft::error::RaftError::Storage {
346                detail: format!("commit: {e}"),
347            })?;
348
349        debug!(
350            term = state.current_term,
351            voted_for = state.voted_for,
352            "raft hard state saved"
353        );
354        Ok(())
355    }
356
357    fn load_hard_state(&self) -> nodedb_raft::error::Result<HardState> {
358        let read_txn =
359            self.db
360                .begin_read()
361                .map_err(|e| nodedb_raft::error::RaftError::Storage {
362                    detail: format!("read txn: {e}"),
363                })?;
364        let table =
365            read_txn
366                .open_table(META)
367                .map_err(|e| nodedb_raft::error::RaftError::Storage {
368                    detail: format!("open meta: {e}"),
369                })?;
370
371        match table.get(KEY_HARD_STATE) {
372            Ok(Some(value)) => {
373                let state: HardState = zerompk::from_msgpack(value.value()).map_err(|e| {
374                    nodedb_raft::error::RaftError::Storage {
375                        detail: format!("deserialize: {e}"),
376                    }
377                })?;
378                Ok(state)
379            }
380            Ok(None) => Ok(HardState::default()),
381            Err(e) => Err(nodedb_raft::error::RaftError::Storage {
382                detail: format!("get hard state: {e}"),
383            }),
384        }
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens storage on a fresh file inside a new temporary directory.
    ///
    /// The `TempDir` guard is returned with the storage and must be kept
    /// alive for as long as the storage is in use.
    fn open_temp() -> (RedbLogStorage, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let storage = RedbLogStorage::open(&dir.path().join("test-raft.redb")).unwrap();
        (storage, dir)
    }

    /// Shorthand constructor for a [`LogEntry`].
    fn entry(term: u64, index: u64, data: &[u8]) -> LogEntry {
        LogEntry {
            term,
            index,
            data: data.to_vec(),
        }
    }

    /// Appends empty-payload entries `1..=last` in `term`, one `append` call
    /// per entry.
    fn append_one_by_one(s: &mut RedbLogStorage, term: u64, last: u64) {
        for index in 1..=last {
            s.append(&[entry(term, index, &[])]).unwrap();
        }
    }

    #[test]
    fn append_and_load() {
        let (mut s, _dir) = open_temp();
        s.append(&[
            entry(1, 1, b"cmd-a"),
            entry(1, 2, b"cmd-b"),
            entry(2, 3, b"cmd-c"),
        ])
        .unwrap();

        let loaded = s.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 3);
        assert_eq!(loaded[0].data, b"cmd-a");
        assert_eq!(loaded[2].term, 2);
    }

    #[test]
    fn fresh_storage_has_defaults() {
        let (s, _dir) = open_temp();
        assert!(s.load_entries_after(0).unwrap().is_empty());
        assert_eq!(s.snapshot_metadata(), (0, 0));
        assert_eq!(s.load_hard_state().unwrap(), HardState::default());
    }

    #[test]
    fn truncate_removes_tail() {
        let (mut s, _dir) = open_temp();
        append_one_by_one(&mut s, 1, 5);
        s.truncate(3).unwrap();
        let loaded = s.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded.last().unwrap().index, 2);
    }

    #[test]
    fn truncate_past_end_keeps_everything() {
        let (mut s, _dir) = open_temp();
        append_one_by_one(&mut s, 1, 3);
        s.truncate(10).unwrap();
        assert_eq!(s.load_entries_after(0).unwrap().len(), 3);
    }

    #[test]
    fn compact_removes_prefix() {
        let (mut s, _dir) = open_temp();
        append_one_by_one(&mut s, 1, 10);
        s.compact(5, 1).unwrap();
        assert_eq!(s.snapshot_metadata(), (5, 1));
        let loaded = s.load_entries_after(5).unwrap();
        assert_eq!(loaded.len(), 5);
        assert_eq!(loaded[0].index, 6);
    }

    #[test]
    fn hard_state_roundtrip() {
        let (mut s, _dir) = open_temp();
        let hs = HardState {
            current_term: 7,
            voted_for: 3,
        };
        s.save_hard_state(&hs).unwrap();
        assert_eq!(s.load_hard_state().unwrap(), hs);
    }

    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("reopen-raft.redb");

        {
            let mut s = RedbLogStorage::open(&path).unwrap();
            s.append(&[entry(1, 1, b"durable")]).unwrap();
            s.save_hard_state(&HardState {
                current_term: 3,
                voted_for: 1,
            })
            .unwrap();
        }

        let s = RedbLogStorage::open(&path).unwrap();
        let loaded = s.load_entries_after(0).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].data, b"durable");
        assert_eq!(s.load_hard_state().unwrap().current_term, 3);
    }

    #[test]
    fn snapshot_metadata_survives_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("reopen-snapshot.redb");

        {
            let mut s = RedbLogStorage::open(&path).unwrap();
            append_one_by_one(&mut s, 2, 4);
            s.compact(3, 2).unwrap();
        }

        let s = RedbLogStorage::open(&path).unwrap();
        assert_eq!(s.snapshot_metadata(), (3, 2));
        let loaded = s.load_entries_after(3).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].index, 4);
    }
}