// cyphr_storage/file.rs
//! File-based storage backend using JSONL format.
//!
//! Stores each principal's entries in a separate `.jsonl` file,
//! with one commit bundle per line (commit-based format).
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};

use cyphr::state::PrincipalGenesis;

use crate::{CommitEntry, Entry, EntryError, QueryOpts, Store};
11
/// File-based storage backend.
///
/// Stores each principal's entries in a JSONL file named `<pr_b64>.jsonl`
/// within the configured base directory.
///
/// # Example
///
/// ```no_run
/// use cyphr_storage::FileStore;
///
/// let store = FileStore::new("/var/data/cyphr");
/// ```
pub struct FileStore {
    // Root directory holding one `<pr_b64>.jsonl` file per principal;
    // created lazily on first write (see `ensure_dir`).
    base_dir: PathBuf,
}
27
28impl FileStore {
29    /// Create a new file store with the given base directory.
30    ///
31    /// The directory will be created if it doesn't exist.
32    pub fn new(base_dir: impl AsRef<Path>) -> Self {
33        Self {
34            base_dir: base_dir.as_ref().to_path_buf(),
35        }
36    }
37
38    /// Get the file path for a principal's entry log.
39    ///
40    /// # Errors
41    ///
42    /// Returns `FileStoreError::EmptyDigest` if the PrincipalGenesis has no variants.
43    fn path_for(&self, pr: &PrincipalGenesis) -> Result<PathBuf, FileStoreError> {
44        use coz::base64ct::{Base64UrlUnpadded, Encoding};
45        // Extract the first variant for the filename
46        let pr_bytes = pr
47            .as_multihash()
48            .first_variant()
49            .map_err(|e| FileStoreError::EmptyDigest(e.to_string()))?;
50        let filename = format!("{}.jsonl", Base64UrlUnpadded::encode_string(pr_bytes));
51        Ok(self.base_dir.join(filename))
52    }
53
54    /// Ensure the base directory exists.
55    fn ensure_dir(&self) -> Result<(), FileStoreError> {
56        if !self.base_dir.exists() {
57            fs::create_dir_all(&self.base_dir).map_err(FileStoreError::Io)?;
58        }
59        Ok(())
60    }
61}
62
63impl Store for FileStore {
64    type Error = FileStoreError;
65
66    fn append_entry(&self, pr: &PrincipalGenesis, entry: &Entry) -> Result<(), Self::Error> {
67        self.ensure_dir()?;
68        let path = self.path_for(pr)?;
69
70        let mut file = OpenOptions::new()
71            .create(true)
72            .append(true)
73            .open(&path)
74            .map_err(FileStoreError::Io)?;
75
76        // Write the original JSON bytes (no re-serialization)
77        writeln!(file, "{}", entry.raw_json()).map_err(FileStoreError::Io)?;
78
79        Ok(())
80    }
81
82    fn get_entries(&self, pr: &PrincipalGenesis) -> Result<Vec<Entry>, Self::Error> {
83        let path = self.path_for(pr)?;
84
85        if !path.exists() {
86            return Ok(vec![]);
87        }
88
89        let file = File::open(&path).map_err(FileStoreError::Io)?;
90        let reader = BufReader::new(file);
91        let mut entries = Vec::new();
92
93        for (line_num, line) in reader.lines().enumerate() {
94            let line = line.map_err(FileStoreError::Io)?;
95            if line.trim().is_empty() {
96                continue;
97            }
98
99            // CRITICAL: Use from_json to preserve original bytes for czd computation
100            let entry = Entry::from_json(line).map_err(|e| FileStoreError::Entry {
101                line: line_num + 1,
102                source: e,
103            })?;
104
105            entries.push(entry);
106        }
107
108        Ok(entries)
109    }
110
111    fn get_entries_range(
112        &self,
113        pr: &PrincipalGenesis,
114        opts: &QueryOpts,
115    ) -> Result<Vec<Entry>, Self::Error> {
116        let mut entries = self.get_entries(pr)?;
117
118        // Apply time filters
119        if let Some(after) = opts.after {
120            entries.retain(|e| e.now > after);
121        }
122        if let Some(before) = opts.before {
123            entries.retain(|e| e.now < before);
124        }
125
126        // Apply limit
127        if let Some(limit) = opts.limit {
128            entries.truncate(limit);
129        }
130
131        Ok(entries)
132    }
133
134    fn exists(&self, pr: &PrincipalGenesis) -> Result<bool, Self::Error> {
135        Ok(self.path_for(pr)?.exists())
136    }
137}
138
139// ============================================================================
140// Commit-Based Storage Methods
141// ============================================================================
142
143impl FileStore {
144    /// Append a commit bundle to the principal's log.
145    ///
146    /// Each commit is stored as a single JSON line containing:
147    /// - `cozies`: Array of coz entries
148    /// - `commit_id`: Commit ID (base64url)
149    /// - `as`: Auth State (base64url)
150    /// - `sr`: State Root (base64url)
151    /// - `ps`: Principal State (base64url)
152    pub fn append_commit(
153        &self,
154        pr: &PrincipalGenesis,
155        commit: &CommitEntry,
156    ) -> Result<(), FileStoreError> {
157        self.ensure_dir()?;
158        let path = self.path_for(pr)?;
159
160        let mut file = OpenOptions::new()
161            .create(true)
162            .append(true)
163            .open(&path)
164            .map_err(FileStoreError::Io)?;
165
166        // Serialize and write as a single line
167        let json = serde_json::to_string(commit).map_err(FileStoreError::Json)?;
168        writeln!(file, "{}", json).map_err(FileStoreError::Io)?;
169
170        Ok(())
171    }
172
173    /// Get all commits from the principal's log.
174    ///
175    /// Parses each line as a `CommitEntry` with embedded state digests.
176    pub fn get_commits(&self, pr: &PrincipalGenesis) -> Result<Vec<CommitEntry>, FileStoreError> {
177        let path = self.path_for(pr)?;
178
179        if !path.exists() {
180            return Ok(vec![]);
181        }
182
183        let file = File::open(&path).map_err(FileStoreError::Io)?;
184        let reader = BufReader::new(file);
185        let mut commits = Vec::new();
186
187        for (line_num, line) in reader.lines().enumerate() {
188            let line = line.map_err(FileStoreError::Io)?;
189            if line.trim().is_empty() {
190                continue;
191            }
192
193            let commit: CommitEntry =
194                serde_json::from_str(&line).map_err(|e| FileStoreError::ParseLine {
195                    line: line_num + 1,
196                    source: e,
197                })?;
198
199            commits.push(commit);
200        }
201
202        Ok(commits)
203    }
204}
205
206/// Errors from the file-based storage backend.
/// Errors from the file-based storage backend.
#[derive(Debug, thiserror::Error)]
pub enum FileStoreError {
    /// I/O error reading or writing files.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// JSON serialization error.
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),

    /// Error parsing a line in the JSONL file.
    #[error("failed to parse line {line}: {source}")]
    ParseLine {
        /// 1-based line number within the `.jsonl` file.
        line: usize,
        /// Underlying serde_json deserialization failure.
        #[source]
        source: serde_json::Error,
    },

    /// Error extracting entry data.
    #[error("invalid entry at line {line}: {source}")]
    Entry {
        /// 1-based line number within the `.jsonl` file.
        line: usize,
        /// Underlying entry-construction failure.
        #[source]
        source: EntryError,
    },
    /// State digest is empty.
    #[error("empty state digest: {0}")]
    EmptyDigest(String),
}
236
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::temp_dir;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Build a store rooted at a unique temp directory (pid + test name +
    /// sub-second nanos) so concurrently running tests never collide.
    /// Returns the directory so the test can clean it up.
    fn temp_store(test_name: &str) -> (FileStore, PathBuf) {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .subsec_nanos();
        let unique = format!(
            "cyphr_test_{}_{}_{}",
            std::process::id(),
            test_name,
            nanos
        );
        let dir = temp_dir().join(unique);
        (FileStore::new(&dir), dir)
    }

    #[test]
    fn test_exists_empty() {
        let (store, dir) = temp_store("exists_empty");
        let pr = PrincipalGenesis::from_bytes(vec![1, 2, 3, 4]);

        // A principal with no appended entries has no log file.
        assert!(!store.exists(&pr).unwrap());

        // Cleanup
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_append_and_get() {
        let (store, dir) = temp_store("append_and_get");
        let pr = PrincipalGenesis::from_bytes(vec![1, 2, 3, 4]);

        let raw = r#"{"pay":{"now":1234567890,"typ":"test/action"},"sig":"test"}"#;
        let entry = Entry::from_json(raw.to_string()).unwrap();

        store.append_entry(&pr, &entry).unwrap();
        assert!(store.exists(&pr).unwrap());

        // Round-trip: the single appended entry comes back with its timestamp.
        let loaded = store.get_entries(&pr).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].now, 1234567890);

        // Cleanup
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_get_entries_range() {
        let (store, dir) = temp_store("entries_range");
        let pr = PrincipalGenesis::from_bytes(vec![1, 2, 3, 4]);

        // Add entries with timestamps 100, 200, 300, 400, 500.
        for ts in [100, 200, 300, 400, 500] {
            let json = format!(
                r#"{{"pay":{{"now":{},"typ":"test"}},"sig":"test"}}"#,
                ts
            );
            store.append_entry(&pr, &Entry::from_json(json).unwrap()).unwrap();
        }

        // Both bounds are exclusive: after 150 keeps 200..=500,
        // before 450 keeps ..=400, so the window is {200, 300, 400}.
        let opts = QueryOpts {
            after: Some(150),
            before: Some(450),
            limit: None,
        };
        let window = store.get_entries_range(&pr, &opts).unwrap();
        assert_eq!(window.len(), 3);
        for (got, want) in window.iter().zip([200, 300, 400]) {
            assert_eq!(got.now, want);
        }

        // Cleanup
        let _ = fs::remove_dir_all(&dir);
    }
}
323}