Skip to main content

nodedb_cluster/
raft_storage.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Persistent Raft log storage backed by redb.
4//!
5//! Implements `nodedb_raft::LogStorage` with ACID durability via redb.
6//! Each Raft group gets its own redb file at `{data_dir}/raft/group-{id}.redb`.
7//!
8//! Layout:
//! - `ENTRIES` table: key = log index (u64 big-endian), value = versioned-envelope bytes encoding a `LogEntry`
10//! - `META` table: key = name (&str), value = MessagePack-encoded metadata
11//!   - "hard_state" → HardState { current_term, voted_for }
12//!   - "snapshot_index" → u64
13//!   - "snapshot_term" → u64
14
15use std::path::Path;
16
17use redb::{Database, ReadableTable, TableDefinition};
18use tracing::{debug, info};
19
20use nodedb_raft::message::LogEntry;
21use nodedb_raft::state::HardState;
22use nodedb_raft::storage::LogStorage;
23
24use crate::wire_version::envelope::{decode_versioned, encode_versioned};
25
26/// Log entries: key = index (big-endian u64 for sorted iteration), value = versioned-envelope bytes.
27const ENTRIES: TableDefinition<&[u8], &[u8]> = TableDefinition::new("raft.entries");
28
29/// Metadata: key = name, value = MessagePack bytes.
30const META: TableDefinition<&str, &[u8]> = TableDefinition::new("raft.meta");
31
32const KEY_HARD_STATE: &str = "hard_state";
33const KEY_SNAPSHOT_INDEX: &str = "snapshot_index";
34const KEY_SNAPSHOT_TERM: &str = "snapshot_term";
35
36/// Persistent Raft log storage backed by redb.
37pub struct RedbLogStorage {
38    db: Database,
39}
40
41impl RedbLogStorage {
42    /// Open or create storage at the given path.
43    pub fn open(path: &Path) -> crate::Result<Self> {
44        if let Some(parent) = path.parent() {
45            std::fs::create_dir_all(parent).map_err(|e| crate::ClusterError::Storage {
46                detail: format!("create raft storage dir: {e}"),
47            })?;
48        }
49
50        let db = Database::create(path).map_err(|e| crate::ClusterError::Storage {
51            detail: format!("open raft storage: {e}"),
52        })?;
53
54        // Ensure tables exist.
55        let write_txn = db.begin_write().map_err(|e| crate::ClusterError::Storage {
56            detail: format!("init raft tables: {e}"),
57        })?;
58        {
59            write_txn
60                .open_table(ENTRIES)
61                .map_err(|e| crate::ClusterError::Storage {
62                    detail: format!("create entries table: {e}"),
63                })?;
64            write_txn
65                .open_table(META)
66                .map_err(|e| crate::ClusterError::Storage {
67                    detail: format!("create meta table: {e}"),
68                })?;
69        }
70        write_txn
71            .commit()
72            .map_err(|e| crate::ClusterError::Storage {
73                detail: format!("commit raft init: {e}"),
74            })?;
75
76        info!(path = %path.display(), "raft log storage opened");
77
78        Ok(Self { db })
79    }
80}
81
/// Encodes a log index as the 8-byte big-endian key used in the entries table.
///
/// Big-endian encoding makes byte-wise key comparison agree with numeric
/// comparison of the indices.
fn index_key(index: u64) -> [u8; 8] {
    u64::to_be_bytes(index)
}
85
86impl LogStorage for RedbLogStorage {
87    fn append(&mut self, entries: &[LogEntry]) -> nodedb_raft::error::Result<()> {
88        if entries.is_empty() {
89            return Ok(());
90        }
91
92        let write_txn =
93            self.db
94                .begin_write()
95                .map_err(|e| nodedb_raft::error::RaftError::Storage {
96                    detail: format!("write txn: {e}"),
97                })?;
98        {
99            let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
100                nodedb_raft::error::RaftError::Storage {
101                    detail: format!("open entries: {e}"),
102                }
103            })?;
104
105            for entry in entries {
106                let key = index_key(entry.index);
107                let value = encode_versioned(entry).map_err(|e| {
108                    nodedb_raft::error::RaftError::Storage {
109                        detail: format!("serialize entry: {e}"),
110                    }
111                })?;
112                table
113                    .insert(key.as_slice(), value.as_slice())
114                    .map_err(|e| nodedb_raft::error::RaftError::Storage {
115                        detail: format!("insert entry: {e}"),
116                    })?;
117            }
118        }
119        write_txn
120            .commit()
121            .map_err(|e| nodedb_raft::error::RaftError::Storage {
122                detail: format!("commit append: {e}"),
123            })?;
124
125        debug!(count = entries.len(), "raft log appended");
126        Ok(())
127    }
128
129    fn truncate(&mut self, index: u64) -> nodedb_raft::error::Result<()> {
130        let write_txn =
131            self.db
132                .begin_write()
133                .map_err(|e| nodedb_raft::error::RaftError::Storage {
134                    detail: format!("write txn: {e}"),
135                })?;
136        {
137            let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
138                nodedb_raft::error::RaftError::Storage {
139                    detail: format!("open entries: {e}"),
140                }
141            })?;
142
143            // Collect keys >= index to remove.
144            let start = index_key(index);
145            let keys_to_remove: Vec<[u8; 8]> = table
146                .range(start.as_slice()..)
147                .map_err(|e| nodedb_raft::error::RaftError::Storage {
148                    detail: format!("range: {e}"),
149                })?
150                .filter_map(|r| {
151                    r.ok().map(|(k, _)| {
152                        let mut buf = [0u8; 8];
153                        buf.copy_from_slice(k.value());
154                        buf
155                    })
156                })
157                .collect();
158
159            for key in &keys_to_remove {
160                table.remove(key.as_slice()).map_err(|e| {
161                    nodedb_raft::error::RaftError::Storage {
162                        detail: format!("remove: {e}"),
163                    }
164                })?;
165            }
166        }
167        write_txn
168            .commit()
169            .map_err(|e| nodedb_raft::error::RaftError::Storage {
170                detail: format!("commit truncate: {e}"),
171            })?;
172
173        debug!(from_index = index, "raft log truncated");
174        Ok(())
175    }
176
177    fn load_entries_after(&self, snapshot_index: u64) -> nodedb_raft::error::Result<Vec<LogEntry>> {
178        let read_txn =
179            self.db
180                .begin_read()
181                .map_err(|e| nodedb_raft::error::RaftError::Storage {
182                    detail: format!("read txn: {e}"),
183                })?;
184        let table =
185            read_txn
186                .open_table(ENTRIES)
187                .map_err(|e| nodedb_raft::error::RaftError::Storage {
188                    detail: format!("open entries: {e}"),
189                })?;
190
191        // Start after snapshot_index (exclusive).
192        let start = index_key(snapshot_index + 1);
193        let mut entries = Vec::new();
194
195        for result in
196            table
197                .range(start.as_slice()..)
198                .map_err(|e| nodedb_raft::error::RaftError::Storage {
199                    detail: format!("range: {e}"),
200                })?
201        {
202            let (_, value) = result.map_err(|e| nodedb_raft::error::RaftError::Storage {
203                detail: format!("entry read: {e}"),
204            })?;
205            let entry: LogEntry = decode_versioned(value.value()).map_err(|e| {
206                nodedb_raft::error::RaftError::Storage {
207                    detail: format!("deserialize entry: {e}"),
208                }
209            })?;
210            entries.push(entry);
211        }
212
213        debug!(
214            count = entries.len(),
215            after = snapshot_index,
216            "raft log loaded"
217        );
218        Ok(entries)
219    }
220
221    fn compact(&mut self, index: u64, term: u64) -> nodedb_raft::error::Result<()> {
222        let write_txn =
223            self.db
224                .begin_write()
225                .map_err(|e| nodedb_raft::error::RaftError::Storage {
226                    detail: format!("write txn: {e}"),
227                })?;
228        {
229            // Remove entries <= index.
230            let mut table = write_txn.open_table(ENTRIES).map_err(|e| {
231                nodedb_raft::error::RaftError::Storage {
232                    detail: format!("open entries: {e}"),
233                }
234            })?;
235
236            let end = index_key(index + 1);
237            let keys_to_remove: Vec<[u8; 8]> = table
238                .range(..end.as_slice())
239                .map_err(|e| nodedb_raft::error::RaftError::Storage {
240                    detail: format!("range: {e}"),
241                })?
242                .filter_map(|r| {
243                    r.ok().map(|(k, _)| {
244                        let mut buf = [0u8; 8];
245                        buf.copy_from_slice(k.value());
246                        buf
247                    })
248                })
249                .collect();
250
251            for key in &keys_to_remove {
252                table.remove(key.as_slice()).map_err(|e| {
253                    nodedb_raft::error::RaftError::Storage {
254                        detail: format!("remove: {e}"),
255                    }
256                })?;
257            }
258
259            // Save snapshot metadata.
260            let mut meta =
261                write_txn
262                    .open_table(META)
263                    .map_err(|e| nodedb_raft::error::RaftError::Storage {
264                        detail: format!("open meta: {e}"),
265                    })?;
266
267            let idx_bytes = zerompk::to_msgpack_vec(&index).map_err(|e| {
268                nodedb_raft::error::RaftError::Storage {
269                    detail: format!("serialize: {e}"),
270                }
271            })?;
272            let term_bytes = zerompk::to_msgpack_vec(&term).map_err(|e| {
273                nodedb_raft::error::RaftError::Storage {
274                    detail: format!("serialize: {e}"),
275                }
276            })?;
277
278            meta.insert(KEY_SNAPSHOT_INDEX, idx_bytes.as_slice())
279                .map_err(|e| nodedb_raft::error::RaftError::Storage {
280                    detail: format!("insert meta: {e}"),
281                })?;
282            meta.insert(KEY_SNAPSHOT_TERM, term_bytes.as_slice())
283                .map_err(|e| nodedb_raft::error::RaftError::Storage {
284                    detail: format!("insert meta: {e}"),
285                })?;
286        }
287        write_txn
288            .commit()
289            .map_err(|e| nodedb_raft::error::RaftError::Storage {
290                detail: format!("commit compact: {e}"),
291            })?;
292
293        debug!(index, term, "raft log compacted");
294        Ok(())
295    }
296
297    fn snapshot_metadata(&self) -> (u64, u64) {
298        let Ok(read_txn) = self.db.begin_read() else {
299            return (0, 0);
300        };
301        let Ok(table) = read_txn.open_table(META) else {
302            return (0, 0);
303        };
304
305        let index = table
306            .get(KEY_SNAPSHOT_INDEX)
307            .ok()
308            .flatten()
309            .and_then(|v| zerompk::from_msgpack::<u64>(v.value()).ok())
310            .unwrap_or(0);
311        let term = table
312            .get(KEY_SNAPSHOT_TERM)
313            .ok()
314            .flatten()
315            .and_then(|v| zerompk::from_msgpack::<u64>(v.value()).ok())
316            .unwrap_or(0);
317
318        (index, term)
319    }
320
321    fn save_hard_state(&mut self, state: &HardState) -> nodedb_raft::error::Result<()> {
322        let write_txn =
323            self.db
324                .begin_write()
325                .map_err(|e| nodedb_raft::error::RaftError::Storage {
326                    detail: format!("write txn: {e}"),
327                })?;
328        {
329            let mut table =
330                write_txn
331                    .open_table(META)
332                    .map_err(|e| nodedb_raft::error::RaftError::Storage {
333                        detail: format!("open meta: {e}"),
334                    })?;
335
336            let bytes = zerompk::to_msgpack_vec(state).map_err(|e| {
337                nodedb_raft::error::RaftError::Storage {
338                    detail: format!("serialize: {e}"),
339                }
340            })?;
341            table
342                .insert(KEY_HARD_STATE, bytes.as_slice())
343                .map_err(|e| nodedb_raft::error::RaftError::Storage {
344                    detail: format!("insert: {e}"),
345                })?;
346        }
347        write_txn
348            .commit()
349            .map_err(|e| nodedb_raft::error::RaftError::Storage {
350                detail: format!("commit: {e}"),
351            })?;
352
353        debug!(
354            term = state.current_term,
355            voted_for = state.voted_for,
356            "raft hard state saved"
357        );
358        Ok(())
359    }
360
361    fn load_hard_state(&self) -> nodedb_raft::error::Result<HardState> {
362        let read_txn =
363            self.db
364                .begin_read()
365                .map_err(|e| nodedb_raft::error::RaftError::Storage {
366                    detail: format!("read txn: {e}"),
367                })?;
368        let table =
369            read_txn
370                .open_table(META)
371                .map_err(|e| nodedb_raft::error::RaftError::Storage {
372                    detail: format!("open meta: {e}"),
373                })?;
374
375        match table.get(KEY_HARD_STATE) {
376            Ok(Some(value)) => {
377                let state: HardState = zerompk::from_msgpack(value.value()).map_err(|e| {
378                    nodedb_raft::error::RaftError::Storage {
379                        detail: format!("deserialize: {e}"),
380                    }
381                })?;
382                Ok(state)
383            }
384            Ok(None) => Ok(HardState::default()),
385            Err(e) => Err(nodedb_raft::error::RaftError::Storage {
386                detail: format!("get hard state: {e}"),
387            }),
388        }
389    }
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a storage instance inside a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the storage so the directory is
    /// kept alive for as long as the caller holds it.
    fn open_temp() -> (RedbLogStorage, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let storage = RedbLogStorage::open(&tmp.path().join("test-raft.redb")).unwrap();
        (storage, tmp)
    }

    /// Entries appended in one batch are all returned by a load from index 0.
    #[test]
    fn append_and_load() {
        let (mut storage, _tmp) = open_temp();
        let first = LogEntry {
            term: 1,
            index: 1,
            data: b"cmd-a".to_vec(),
        };
        let second = LogEntry {
            term: 1,
            index: 2,
            data: b"cmd-b".to_vec(),
        };
        let third = LogEntry {
            term: 2,
            index: 3,
            data: b"cmd-c".to_vec(),
        };
        storage.append(&[first, second, third]).unwrap();

        let got = storage.load_entries_after(0).unwrap();
        assert_eq!(got.len(), 3);
        assert_eq!(got[0].data, b"cmd-a");
        assert_eq!(got[2].term, 2);
    }

    /// Truncating at index 3 keeps only entries 1 and 2.
    #[test]
    fn truncate_removes_tail() {
        let (mut storage, _tmp) = open_temp();
        for index in 1..=5 {
            let entry = LogEntry {
                term: 1,
                index,
                data: Vec::new(),
            };
            storage.append(std::slice::from_ref(&entry)).unwrap();
        }

        storage.truncate(3).unwrap();

        let got = storage.load_entries_after(0).unwrap();
        assert_eq!(got.len(), 2);
        assert_eq!(got.last().unwrap().index, 2);
    }

    /// Compacting at index 5 records the snapshot metadata and leaves 6..=10.
    #[test]
    fn compact_removes_prefix() {
        let (mut storage, _tmp) = open_temp();
        for index in 1..=10 {
            let entry = LogEntry {
                term: 1,
                index,
                data: Vec::new(),
            };
            storage.append(std::slice::from_ref(&entry)).unwrap();
        }

        storage.compact(5, 1).unwrap();

        assert_eq!(storage.snapshot_metadata(), (5, 1));
        let got = storage.load_entries_after(5).unwrap();
        assert_eq!(got.len(), 5);
        assert_eq!(got[0].index, 6);
    }

    /// A saved hard state is read back unchanged.
    #[test]
    fn hard_state_roundtrip() {
        let (mut storage, _tmp) = open_temp();
        let expected = HardState {
            current_term: 7,
            voted_for: 3,
        };

        storage.save_hard_state(&expected).unwrap();

        assert_eq!(storage.load_hard_state().unwrap(), expected);
    }

    /// LogEntry written and read back via the versioned envelope.
    #[test]
    fn versioned_roundtrip() {
        let (mut storage, _tmp) = open_temp();
        let original = LogEntry {
            term: 3,
            index: 1,
            data: b"versioned-payload".to_vec(),
        };

        storage.append(std::slice::from_ref(&original)).unwrap();

        let got = storage.load_entries_after(0).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0], original);
    }

    /// Bytes with the envelope marker but an unknown future version must
    /// return a Storage error, not silently decode as v1.
    #[test]
    fn version_mismatch_returns_error() {
        let (storage, _tmp) = open_temp();

        // Payload that will be wrapped in an envelope with version 9999.
        let payload = zerompk::to_msgpack_vec(&LogEntry {
            term: 1,
            index: 77,
            data: vec![],
        })
        .unwrap();

        // Hand-built envelope: marker + version(9999) + inner_len + inner.
        let envelope: Vec<u8> = [
            [0xc1u8].as_slice(), // ENVELOPE_MARKER
            9999u16.to_be_bytes().as_slice(),
            (payload.len() as u32).to_be_bytes().as_slice(),
            payload.as_slice(),
        ]
        .concat();

        // Write the raw bytes directly, bypassing `append`.
        let txn = storage.db.begin_write().unwrap();
        {
            let mut entries = txn.open_table(ENTRIES).unwrap();
            entries
                .insert(index_key(77).as_slice(), envelope.as_slice())
                .unwrap();
        }
        txn.commit().unwrap();

        match storage.load_entries_after(76).unwrap_err() {
            nodedb_raft::error::RaftError::Storage { detail } => {
                assert!(
                    detail.contains("deserialize"),
                    "expected 'deserialize' in detail: {detail}"
                );
            }
            other => panic!("expected Storage error, got: {other}"),
        }
    }

    /// Data written before the storage is dropped is visible after reopening
    /// the same file.
    #[test]
    fn survives_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("reopen-raft.redb");

        {
            let mut writer = RedbLogStorage::open(&db_path).unwrap();
            let entry = LogEntry {
                term: 1,
                index: 1,
                data: b"durable".to_vec(),
            };
            writer.append(std::slice::from_ref(&entry)).unwrap();
            let hard_state = HardState {
                current_term: 3,
                voted_for: 1,
            };
            writer.save_hard_state(&hard_state).unwrap();
        }

        let reader = RedbLogStorage::open(&db_path).unwrap();
        let got = reader.load_entries_after(0).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].data, b"durable");
        assert_eq!(reader.load_hard_state().unwrap().current_term, 3);
    }
}