Skip to main content

nklave_storage/
log.rs

1//! Append-only decision log
2//!
3//! Records all signing decisions for audit and state recovery
4
5use nklave_core::state::integrity::DecisionRecord;
6use std::fs::{File, OpenOptions};
7use std::io::{BufRead, BufReader, BufWriter, Write};
8use std::path::{Path, PathBuf};
9use thiserror::Error;
10
/// Append-only decision log
///
/// Bundles the log file's location, a buffered handle used for appending,
/// and the sequence number of the most recently recorded decision.
#[derive(Debug)]
pub struct DecisionLog {
    /// Location of the log file; kept so the file can be re-read on replay.
    path: PathBuf,
    /// Buffered writer over the log file, which is opened in append mode.
    writer: BufWriter<File>,
    /// Sequence number of the last record in the log; 0 when the log is empty.
    last_sequence: u64,
}
17
18impl DecisionLog {
19    /// Open or create a decision log at the given path
20    pub fn open(path: impl AsRef<Path>) -> Result<Self, LogError> {
21        let path = path.as_ref().to_path_buf();
22
23        // Determine last sequence by reading existing log
24        let last_sequence = if path.exists() {
25            Self::read_last_sequence(&path)?
26        } else {
27            0
28        };
29
30        // Open for appending
31        let file = OpenOptions::new()
32            .create(true)
33            .append(true)
34            .open(&path)
35            .map_err(|e| LogError::Io(e.to_string()))?;
36
37        let writer = BufWriter::new(file);
38
39        Ok(Self {
40            path,
41            writer,
42            last_sequence,
43        })
44    }
45
46    /// Read the last sequence number from an existing log
47    fn read_last_sequence(path: &Path) -> Result<u64, LogError> {
48        let file = File::open(path).map_err(|e| LogError::Io(e.to_string()))?;
49        let reader = BufReader::new(file);
50
51        let mut last_sequence = 0u64;
52
53        for line in reader.lines() {
54            let line = line.map_err(|e| LogError::Io(e.to_string()))?;
55            if line.is_empty() {
56                continue;
57            }
58
59            let record: DecisionRecord =
60                serde_json::from_str(&line).map_err(|e| LogError::Parse(e.to_string()))?;
61
62            last_sequence = record.sequence;
63        }
64
65        Ok(last_sequence)
66    }
67
68    /// Append a decision record to the log
69    pub fn append(&mut self, record: &DecisionRecord) -> Result<(), LogError> {
70        // Verify sequence continuity
71        if record.sequence != self.last_sequence + 1 {
72            return Err(LogError::SequenceGap {
73                expected: self.last_sequence + 1,
74                actual: record.sequence,
75            });
76        }
77
78        // Serialize and write
79        let json = serde_json::to_string(record).map_err(|e| LogError::Serialize(e.to_string()))?;
80
81        writeln!(self.writer, "{}", json).map_err(|e| LogError::Io(e.to_string()))?;
82
83        // Flush to ensure durability
84        self.writer
85            .flush()
86            .map_err(|e| LogError::Io(e.to_string()))?;
87
88        self.last_sequence = record.sequence;
89
90        Ok(())
91    }
92
93    /// Get the last recorded sequence number
94    pub fn last_sequence(&self) -> u64 {
95        self.last_sequence
96    }
97
98    /// Replay all records from the log
99    pub fn replay(&self) -> Result<Vec<DecisionRecord>, LogError> {
100        let file = File::open(&self.path).map_err(|e| LogError::Io(e.to_string()))?;
101        let reader = BufReader::new(file);
102
103        let mut records = Vec::new();
104
105        for line in reader.lines() {
106            let line = line.map_err(|e| LogError::Io(e.to_string()))?;
107            if line.is_empty() {
108                continue;
109            }
110
111            let record: DecisionRecord =
112                serde_json::from_str(&line).map_err(|e| LogError::Parse(e.to_string()))?;
113
114            records.push(record);
115        }
116
117        Ok(records)
118    }
119
120    /// Replay records starting from a specific sequence
121    pub fn replay_from(&self, start_sequence: u64) -> Result<Vec<DecisionRecord>, LogError> {
122        let records = self.replay()?;
123        Ok(records
124            .into_iter()
125            .filter(|r| r.sequence >= start_sequence)
126            .collect())
127    }
128
129    /// Sync the log to disk
130    pub fn sync(&mut self) -> Result<(), LogError> {
131        self.writer
132            .flush()
133            .map_err(|e| LogError::Io(e.to_string()))?;
134        self.writer
135            .get_ref()
136            .sync_all()
137            .map_err(|e| LogError::Io(e.to_string()))?;
138        Ok(())
139    }
140}
141
142/// Errors related to the decision log
143#[derive(Debug, Error)]
144pub enum LogError {
145    #[error("I/O error: {0}")]
146    Io(String),
147
148    #[error("Parse error: {0}")]
149    Parse(String),
150
151    #[error("Serialization error: {0}")]
152    Serialize(String),
153
154    #[error("Sequence gap: expected {expected}, got {actual}")]
155    SequenceGap { expected: u64, actual: u64 },
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use nklave_core::policy::types::{PolicyDecision, SigningType};
    use tempfile::TempDir;

    /// Build an allowed block-proposal record with zeroed keys and hashes,
    /// differing only in its sequence number.
    fn make_record(seq: u64) -> DecisionRecord {
        DecisionRecord {
            sequence: seq,
            timestamp: 1234567890,
            validator_pubkey: [0u8; 48],
            request_type: SigningType::BlockProposal,
            decision: PolicyDecision::Allow,
            signing_root: [0u8; 32],
            prev_state_hash: [0u8; 32],
            signing_context: None,
        }
    }

    /// A fresh log starts at 0 and advances by one per append.
    #[test]
    fn test_log_create_and_append() {
        let dir = TempDir::new().unwrap();
        let log_path = dir.path().join("decisions.log");

        let mut log = DecisionLog::open(&log_path).unwrap();
        assert_eq!(log.last_sequence(), 0);

        log.append(&make_record(1)).unwrap();
        assert_eq!(log.last_sequence(), 1);

        log.append(&make_record(2)).unwrap();
        assert_eq!(log.last_sequence(), 2);
    }

    /// Reopening a log recovers the counter and replays records in order.
    #[test]
    fn test_log_replay() {
        let dir = TempDir::new().unwrap();
        let log_path = dir.path().join("decisions.log");

        {
            let mut log = DecisionLog::open(&log_path).unwrap();
            log.append(&make_record(1)).unwrap();
            log.append(&make_record(2)).unwrap();
            log.append(&make_record(3)).unwrap();
        }

        // Reopen and replay
        let log = DecisionLog::open(&log_path).unwrap();
        assert_eq!(log.last_sequence(), 3);

        let records = log.replay().unwrap();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].sequence, 1);
        assert_eq!(records[2].sequence, 3);
    }

    /// Out-of-order appends are rejected with the exact expected/actual pair
    /// and leave both the counter and the file contents untouched.
    #[test]
    fn test_log_sequence_gap() {
        let dir = TempDir::new().unwrap();
        let log_path = dir.path().join("decisions.log");

        let mut log = DecisionLog::open(&log_path).unwrap();
        log.append(&make_record(1)).unwrap();

        // Skipping ahead is rejected
        let result = log.append(&make_record(5));
        assert!(matches!(
            result,
            Err(LogError::SequenceGap {
                expected: 2,
                actual: 5
            })
        ));

        // Re-submitting an already recorded sequence is rejected too
        let result = log.append(&make_record(1));
        assert!(matches!(
            result,
            Err(LogError::SequenceGap {
                expected: 2,
                actual: 1
            })
        ));

        // Rejected appends must not change state
        assert_eq!(log.last_sequence(), 1);
        assert_eq!(log.replay().unwrap().len(), 1);

        // The correct successor is still accepted afterwards
        log.append(&make_record(2)).unwrap();
        assert_eq!(log.last_sequence(), 2);
    }

    /// `replay_from` keeps only records at or after the requested sequence.
    #[test]
    fn test_log_replay_from() {
        let dir = TempDir::new().unwrap();
        let log_path = dir.path().join("decisions.log");

        let mut log = DecisionLog::open(&log_path).unwrap();
        for seq in 1..=4 {
            log.append(&make_record(seq)).unwrap();
        }

        let tail: Vec<u64> = log
            .replay_from(3)
            .unwrap()
            .iter()
            .map(|r| r.sequence)
            .collect();
        assert_eq!(tail, vec![3, 4]);

        assert_eq!(log.replay_from(0).unwrap().len(), 4);
        assert!(log.replay_from(5).unwrap().is_empty());
    }
}