1use std::path::Path;
15
16use eml::{AlgorithmMetas, Epochs, Storage};
17use fjall::{Database, Keyspace, KeyspaceCreateOptions};
18
19#[derive(Debug, thiserror::Error)]
21pub enum FjallStorageError {
22 #[error("Fjall database error: {0}")]
24 Fjall(#[from] fjall::Error),
25
26 #[error("I/O error: {0}")]
28 Io(#[from] std::io::Error),
29
30 #[error("Epoch metadata corruption: {0}")]
32 MetadataCorruption(String),
33}
34
35#[derive(Clone)]
39pub struct FjallStorage {
40 db: Database,
41 leaves: Keyspace,
42 nodes: Keyspace,
43 metadata: Keyspace,
44}
45
46impl FjallStorage {
47 pub fn open(path: &Path) -> Result<Self, FjallStorageError> {
54 let db = Database::builder(path).open()?;
55 Self::with_database(db)
56 }
57
58 pub fn with_database(db: Database) -> Result<Self, FjallStorageError> {
67 let leaves = db.keyspace("eml_leaves", || KeyspaceCreateOptions::default())?;
68 let nodes = db.keyspace("eml_nodes", || KeyspaceCreateOptions::default())?;
69 let metadata = db.keyspace("eml_metadata", || KeyspaceCreateOptions::default())?;
70
71 Ok(Self {
72 db,
73 leaves,
74 nodes,
75 metadata,
76 })
77 }
78}
79
80impl Storage for FjallStorage {
81 type Error = FjallStorageError;
82
83 async fn store_leaf(&mut self, index: u64, data: &[u8]) -> Result<(), Self::Error> {
84 let key = index.to_be_bytes();
85 self.leaves.insert(key, data)?;
86 Ok(())
87 }
88
89 async fn get_leaf(&self, index: u64) -> Result<Vec<u8>, Self::Error> {
90 let key = index.to_be_bytes();
91 let value = self.leaves.get(key)?;
92 match value {
93 Some(bytes) => Ok(bytes.to_vec()),
94 None => Err(FjallStorageError::Io(std::io::Error::new(
95 std::io::ErrorKind::NotFound,
96 format!("leaf index {index} not found"),
97 ))),
98 }
99 }
100
101 async fn len(&self) -> u64 {
102 if let Some(guard) = self.leaves.iter().rev().next() {
106 if let Ok(key_bytes) = guard.key() {
107 if key_bytes.len() == 8 {
108 let index = u64::from_be_bytes(key_bytes.as_ref().try_into().unwrap());
109 return index + 1;
110 }
111 }
112 }
113 0
114 }
115
116 async fn store_node(
117 &mut self,
118 alg_id: u64,
119 left: u64,
120 height: usize,
121 hash: &[u8],
122 ) -> Result<(), Self::Error> {
123 let mut key = [0u8; 24];
124 key[0..8].copy_from_slice(&alg_id.to_be_bytes());
125 key[8..16].copy_from_slice(&left.to_be_bytes());
126 key[16..24].copy_from_slice(&(height as u64).to_be_bytes());
127
128 self.nodes.insert(key, hash)?;
129 Ok(())
130 }
131
132 async fn get_node(
133 &self,
134 alg_id: u64,
135 left: u64,
136 height: usize,
137 ) -> Result<Option<Vec<u8>>, Self::Error> {
138 let mut key = [0u8; 24];
139 key[0..8].copy_from_slice(&alg_id.to_be_bytes());
140 key[8..16].copy_from_slice(&left.to_be_bytes());
141 key[16..24].copy_from_slice(&(height as u64).to_be_bytes());
142
143 let value = self.nodes.get(key)?;
144 Ok(value.map(|bytes| bytes.to_vec()))
145 }
146
147 async fn store_algorithm_meta(
148 &mut self,
149 alg_id: u64,
150 epochs: &[(u64, u64)],
151 ) -> Result<(), Self::Error> {
152 let key = alg_id.to_be_bytes();
153 let value = serialize_epochs(epochs);
154 self.metadata.insert(key, value)?;
155 Ok(())
156 }
157
158 async fn load_algorithm_metas(&self) -> Result<AlgorithmMetas, Self::Error> {
159 let mut metas = Vec::new();
160 for guard in self.metadata.iter() {
162 let (key_bytes, value_bytes) = guard.into_inner()?;
163
164 let alg_id = u64::from_be_bytes(key_bytes.as_ref().try_into().map_err(|_| {
165 FjallStorageError::MetadataCorruption("Invalid key length".to_string())
166 })?);
167
168 let epochs = deserialize_epochs(value_bytes.as_ref())
169 .map_err(FjallStorageError::MetadataCorruption)?;
170
171 metas.push((alg_id, epochs));
172 }
173 Ok(metas)
174 }
175
176 async fn write_batch(
177 &mut self,
178 leaves: &[(u64, &[u8])],
179 nodes: &[(u64, u64, usize, &[u8])],
180 ) -> Result<(), Self::Error> {
181 let mut batch = self.db.batch();
183
184 for &(index, data) in leaves {
185 let key = index.to_be_bytes();
186 batch.insert(&self.leaves, key, data);
187 }
188
189 for &(alg_id, left, height, hash) in nodes {
190 let mut key = [0u8; 24];
191 key[0..8].copy_from_slice(&alg_id.to_be_bytes());
192 key[8..16].copy_from_slice(&left.to_be_bytes());
193 key[16..24].copy_from_slice(&(height as u64).to_be_bytes());
194 batch.insert(&self.nodes, key, hash);
195 }
196
197 batch.commit()?;
199 Ok(())
200 }
201}
202
/// Encodes a list of `(start, end)` pairs as a flat byte string.
///
/// Each pair contributes 16 bytes: `start` followed by `end`, both as
/// big-endian `u64`. An empty slice encodes to an empty vector.
fn serialize_epochs(epochs: &[(u64, u64)]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(epochs.len() * 16);
    encoded.extend(
        epochs
            .iter()
            .flat_map(|&(start, end)| [start.to_be_bytes(), end.to_be_bytes()])
            .flatten(),
    );
    encoded
}
212
213fn deserialize_epochs(bytes: &[u8]) -> Result<Epochs, String> {
215 if bytes.len() % 16 != 0 {
216 return Err(format!("Invalid metadata length: {}", bytes.len()));
217 }
218 let mut epochs = Vec::with_capacity(bytes.len() / 16);
219 for chunk in bytes.chunks_exact(16) {
220 let start = u64::from_be_bytes(chunk[0..8].try_into().unwrap());
221 let end = u64::from_be_bytes(chunk[8..16].try_into().unwrap());
222 epochs.push((start, end));
223 }
224 Ok(epochs)
225}
226
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;

    /// A batch that is dropped without being committed must leave no leaf and
    /// no node behind in the store.
    #[tokio::test]
    async fn test_atomic_batch_abort() {
        let tmp = tempdir().unwrap();
        let storage = FjallStorage::open(tmp.path()).unwrap();

        // A freshly opened store holds no leaves.
        assert_eq!(storage.len().await, 0);

        // Keys use the same layout the storage implementation writes:
        // leaf index 0, and node (alg_id = 99, left = 0, height = 0).
        let leaf_key = 0u64.to_be_bytes();
        let mut node_key = [0u8; 24];
        node_key[0..8].copy_from_slice(&99u64.to_be_bytes());

        // Stage writes in a batch, then discard it without calling `commit`.
        let mut pending = storage.db.batch();
        pending.insert(&storage.leaves, leaf_key, b"should_not_exist");
        pending.insert(&storage.nodes, node_key, b"should_not_exist_node");
        drop(pending);

        // Nothing staged in the discarded batch may be visible.
        assert_eq!(storage.len().await, 0);
        assert!(storage.get_leaf(0).await.is_err());
        assert!(storage.get_node(99, 0, 0).await.unwrap().is_none());
    }
}