// eml_storage_fjall/lib.rs

//! Fjall-backed persistence implementation for the Epoch Merkle Log (EML).
//!
//! This crate provides [`FjallStorage`], a production-grade implementation of the EML
//! [`Storage`] trait. It stores leaves, internal node hashes, and algorithm metadata
//! in dedicated Fjall keyspaces inside a shared database.
//!
//! # Architecture
//!
//! `FjallStorage` manages three distinct keyspaces (partitions):
//! - `"eml_leaves"`: maps a leaf index (`u64`) to the raw payload bytes.
//! - `"eml_nodes"`: maps tree node coordinates `(alg_id, left, height)` to hash digests.
//! - `"eml_metadata"`: maps an algorithm ID (`u64`) to its serialized active epoch ranges.
13
14use std::path::Path;
15
16use eml::{AlgorithmMetas, Epochs, Storage};
17use fjall::{Database, Keyspace, KeyspaceCreateOptions};
18
19/// Error type for [`FjallStorage`] operations.
20#[derive(Debug, thiserror::Error)]
21pub enum FjallStorageError {
22    /// An error occurred in the underlying Fjall engine.
23    #[error("Fjall database error: {0}")]
24    Fjall(#[from] fjall::Error),
25
26    /// An I/O error occurred.
27    #[error("I/O error: {0}")]
28    Io(#[from] std::io::Error),
29
30    /// Serialized epoch data is malformed or corrupted.
31    #[error("Epoch metadata corruption: {0}")]
32    MetadataCorruption(String),
33}
34
35/// A production-grade EML storage backend backed by a Fjall database.
36///
37/// Clones of `FjallStorage` share the same underlying database handle.
38#[derive(Clone)]
39pub struct FjallStorage {
40    db: Database,
41    leaves: Keyspace,
42    nodes: Keyspace,
43    metadata: Keyspace,
44}
45
46impl FjallStorage {
47    /// Open or create a new EML storage database at the specified directory path.
48    ///
49    /// # Errors
50    ///
51    /// Returns a [`FjallStorageError`] if the directory cannot be created or the database
52    /// fails to initialize.
53    pub fn open(path: &Path) -> Result<Self, FjallStorageError> {
54        let db = Database::builder(path).open()?;
55        Self::with_database(db)
56    }
57
58    /// Initialize EML storage keyspaces using an existing, shared Fjall database.
59    ///
60    /// This constructor allows reuse of a single database instance (e.g., sharing it
61    /// with a concurrent content-addressed blob store).
62    ///
63    /// # Errors
64    ///
65    /// Returns a [`FjallStorageError`] if keyspace initialization fails.
66    pub fn with_database(db: Database) -> Result<Self, FjallStorageError> {
67        let leaves = db.keyspace("eml_leaves", || KeyspaceCreateOptions::default())?;
68        let nodes = db.keyspace("eml_nodes", || KeyspaceCreateOptions::default())?;
69        let metadata = db.keyspace("eml_metadata", || KeyspaceCreateOptions::default())?;
70
71        Ok(Self {
72            db,
73            leaves,
74            nodes,
75            metadata,
76        })
77    }
78}
79
80impl Storage for FjallStorage {
81    type Error = FjallStorageError;
82
83    async fn store_leaf(&mut self, index: u64, data: &[u8]) -> Result<(), Self::Error> {
84        let key = index.to_be_bytes();
85        self.leaves.insert(key, data)?;
86        Ok(())
87    }
88
89    async fn get_leaf(&self, index: u64) -> Result<Vec<u8>, Self::Error> {
90        let key = index.to_be_bytes();
91        let value = self.leaves.get(key)?;
92        match value {
93            Some(bytes) => Ok(bytes.to_vec()),
94            None => Err(FjallStorageError::Io(std::io::Error::new(
95                std::io::ErrorKind::NotFound,
96                format!("leaf index {index} not found"),
97            ))),
98        }
99    }
100
101    async fn len(&self) -> u64 {
102        // Since leaf indices are written sequentially (0, 1, 2...) as big-endian u64,
103        // the last key in the partition is the highest index.
104        // Seeking to the end of the keyspace is O(1) in Fjall/LSM.
105        if let Some(guard) = self.leaves.iter().rev().next() {
106            if let Ok(key_bytes) = guard.key() {
107                if key_bytes.len() == 8 {
108                    let index = u64::from_be_bytes(key_bytes.as_ref().try_into().unwrap());
109                    return index + 1;
110                }
111            }
112        }
113        0
114    }
115
116    async fn store_node(
117        &mut self,
118        alg_id: u64,
119        left: u64,
120        height: usize,
121        hash: &[u8],
122    ) -> Result<(), Self::Error> {
123        let mut key = [0u8; 24];
124        key[0..8].copy_from_slice(&alg_id.to_be_bytes());
125        key[8..16].copy_from_slice(&left.to_be_bytes());
126        key[16..24].copy_from_slice(&(height as u64).to_be_bytes());
127
128        self.nodes.insert(key, hash)?;
129        Ok(())
130    }
131
132    async fn get_node(
133        &self,
134        alg_id: u64,
135        left: u64,
136        height: usize,
137    ) -> Result<Option<Vec<u8>>, Self::Error> {
138        let mut key = [0u8; 24];
139        key[0..8].copy_from_slice(&alg_id.to_be_bytes());
140        key[8..16].copy_from_slice(&left.to_be_bytes());
141        key[16..24].copy_from_slice(&(height as u64).to_be_bytes());
142
143        let value = self.nodes.get(key)?;
144        Ok(value.map(|bytes| bytes.to_vec()))
145    }
146
147    async fn store_algorithm_meta(
148        &mut self,
149        alg_id: u64,
150        epochs: &[(u64, u64)],
151    ) -> Result<(), Self::Error> {
152        let key = alg_id.to_be_bytes();
153        let value = serialize_epochs(epochs);
154        self.metadata.insert(key, value)?;
155        Ok(())
156    }
157
158    async fn load_algorithm_metas(&self) -> Result<AlgorithmMetas, Self::Error> {
159        let mut metas = Vec::new();
160        // Iterate through all entries in the metadata keyspace.
161        for guard in self.metadata.iter() {
162            let (key_bytes, value_bytes) = guard.into_inner()?;
163
164            let alg_id = u64::from_be_bytes(key_bytes.as_ref().try_into().map_err(|_| {
165                FjallStorageError::MetadataCorruption("Invalid key length".to_string())
166            })?);
167
168            let epochs = deserialize_epochs(value_bytes.as_ref())
169                .map_err(FjallStorageError::MetadataCorruption)?;
170
171            metas.push((alg_id, epochs));
172        }
173        Ok(metas)
174    }
175
176    async fn write_batch(
177        &mut self,
178        leaves: &[(u64, &[u8])],
179        nodes: &[(u64, u64, usize, &[u8])],
180    ) -> Result<(), Self::Error> {
181        // Implement atomic batch write across both leaf and node keyspaces
182        let mut batch = self.db.batch();
183
184        for &(index, data) in leaves {
185            let key = index.to_be_bytes();
186            batch.insert(&self.leaves, key, data);
187        }
188
189        for &(alg_id, left, height, hash) in nodes {
190            let mut key = [0u8; 24];
191            key[0..8].copy_from_slice(&alg_id.to_be_bytes());
192            key[8..16].copy_from_slice(&left.to_be_bytes());
193            key[16..24].copy_from_slice(&(height as u64).to_be_bytes());
194            batch.insert(&self.nodes, key, hash);
195        }
196
197        // Commit the batch atomically
198        batch.commit()?;
199        Ok(())
200    }
201}
202
/// Encode epoch ranges as a flat byte string.
///
/// Each `(start, end)` pair becomes 16 bytes: `start` then `end`, both big-endian `u64`.
/// Pairs are written in input order; an empty slice yields an empty vector.
fn serialize_epochs(epochs: &[(u64, u64)]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(epochs.len() * 16);
    encoded.extend(
        epochs
            .iter()
            .flat_map(|&(start, end)| [start.to_be_bytes(), end.to_be_bytes()])
            .flatten(),
    );
    encoded
}
212
213/// Helper to deserialize active algorithm epochs from bytes.
214fn deserialize_epochs(bytes: &[u8]) -> Result<Epochs, String> {
215    if bytes.len() % 16 != 0 {
216        return Err(format!("Invalid metadata length: {}", bytes.len()));
217    }
218    let mut epochs = Vec::with_capacity(bytes.len() / 16);
219    for chunk in bytes.chunks_exact(16) {
220        let start = u64::from_be_bytes(chunk[0..8].try_into().unwrap());
221        let end = u64::from_be_bytes(chunk[8..16].try_into().unwrap());
222        epochs.push((start, end));
223    }
224    Ok(epochs)
225}
226
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;

    /// A write batch that is dropped without `commit()` must leave both the leaf and the
    /// node keyspace untouched.
    #[tokio::test]
    async fn test_atomic_batch_abort() {
        let tmp = tempdir().unwrap();
        let store = FjallStorage::open(tmp.path()).unwrap();

        // A freshly opened store reports no leaves.
        assert_eq!(store.len().await, 0);

        // Keys for leaf 0 and for node (alg_id = 99, left = 0, height = 0).
        let leaf_key = 0u64.to_be_bytes();
        let mut node_key = [0u8; 24];
        node_key[0..8].copy_from_slice(&99u64.to_be_bytes());

        // Stage both writes, then discard the batch without committing it.
        let mut pending = store.db.batch();
        pending.insert(&store.leaves, leaf_key, b"should_not_exist");
        pending.insert(&store.nodes, node_key, b"should_not_exist_node");
        drop(pending);

        // Neither staged entry may be observable afterwards.
        assert_eq!(store.len().await, 0);
        assert!(store.get_leaf(0).await.is_err());
        assert!(store.get_node(99, 0, 0).await.unwrap().is_none());
    }
}