1use std::fs;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use anyhow::{bail, Context, Result};
16use serde::{Deserialize, Serialize};
17use blake2::{Blake2b512, Digest};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct Node {
22 pub id: String,
24 pub coll: String,
26 pub seq: u64,
28 pub data: serde_json::Value,
30 pub prev: Option<String>,
32 pub caused_by: Vec<String>,
34 pub ts: f64,
36 pub valid_from: Option<String>,
38 pub valid_to: Option<String>,
40 #[serde(skip_serializing_if = "String::is_empty", default)]
42 pub hash: String,
43}
44
45#[derive(Clone)]
48pub struct Dek(pub [u8; 32]);
49
50impl Dek {
51 pub fn from_tmk(tmk: &[u8; 32], salt: &[u8]) -> Self {
52 use sha2::{Sha256, Digest as _};
53 let mut h = Sha256::new();
54 h.update(tmk);
55 h.update(salt);
56 let result = h.finalize();
57 let mut key = [0u8; 32];
58 key.copy_from_slice(&result[..32]);
59 Dek(key)
60 }
61}
62
63fn blake2b(data: &[u8]) -> String {
64 let mut h = Blake2b512::new();
65 h.update(data);
66 hex::encode(&h.finalize()[..32]) }
68
69fn encrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
70 use aes_gcm::{Aes256Gcm, KeyInit, aead::{Aead, OsRng, rand_core::RngCore}};
71 let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
72 let mut nonce_bytes = [0u8; 12];
73 OsRng.fill_bytes(&mut nonce_bytes);
74 let nonce = aes_gcm::Nonce::from(nonce_bytes);
75 let ciphertext = cipher.encrypt(&nonce, data)
76 .map_err(|e| anyhow::anyhow!("encrypt failed: {:?}", e))?;
77 let mut out = nonce_bytes.to_vec();
79 out.extend_from_slice(&ciphertext);
80 Ok(out)
81}
82
83fn decrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
84 use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
85 if data.len() < 12 { bail!("ciphertext too short"); }
86 let (nonce_bytes, ciphertext) = data.split_at(12);
87 let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
88 let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
89 cipher.decrypt(nonce, ciphertext)
90 .map_err(|e| anyhow::anyhow!("decrypt failed: {:?}", e))
91}
92
93pub struct ObjectStore {
95 root: PathBuf,
96 dek: Option<Dek>,
97 mem: Option<Arc<dashmap::DashMap<String, Vec<u8>>>>,
99}
100
101impl ObjectStore {
102 pub fn new(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
103 let root = db_root.join("objects");
104 fs::create_dir_all(&root)
105 .context("create objects/ dir")?;
106 Ok(Self { root, dek, mem: None })
107 }
108
109 pub fn in_memory() -> Self {
111 Self {
112 root: PathBuf::from(":memory:"),
113 dek: None,
114 mem: Some(Arc::new(dashmap::DashMap::new())),
115 }
116 }
117
118 pub fn write(&self, node: &mut Node) -> Result<String> {
120 let raw = serde_json::to_vec(node)?;
121 let content = match &self.dek {
122 Some(dek) => encrypt(&raw, dek)?,
123 None => raw,
124 };
125 let hash = blake2b(&content);
126
127 if let Some(ref mem) = self.mem {
128 mem.entry(hash.clone()).or_insert_with(|| content);
130 } else {
131 let dir = self.root.join(&hash[..2]);
133 fs::create_dir_all(&dir)?;
134 let path = dir.join(&hash[2..]);
135 if !path.exists() {
136 let tmp = path.with_extension("tmp");
137 fs::write(&tmp, &content)?;
138 fs::rename(&tmp, &path)?;
139 }
140 }
141 node.hash = hash.clone();
142 Ok(hash)
143 }
144
145 pub fn read(&self, hash: &str) -> Result<Node> {
147 if hash.len() < 3 {
148 anyhow::bail!("invalid object hash (too short): {:?}", hash);
149 }
150
151 let content: Vec<u8> = if let Some(ref mem) = self.mem {
152 mem.get(hash)
153 .map(|v| v.clone())
154 .ok_or_else(|| anyhow::anyhow!("object not found in memory: {}", hash))?
155 } else {
156 let path = self.root.join(&hash[..2]).join(&hash[2..]);
157 let c = fs::read(&path).with_context(|| format!("read object {}", hash))?;
158 let actual = blake2b(&c);
160 if actual != hash {
161 bail!("object {} tampered: expected {} got {}", hash, hash, actual);
162 }
163 c
164 };
165
166 let raw = match &self.dek {
167 Some(dek) => decrypt(&content, dek)?,
168 None => content,
169 };
170 let mut node: Node = serde_json::from_slice(&raw)
171 .context("deserialize node")?;
172 if node.hash.is_empty() {
173 node.hash = hash.to_string();
174 }
175 Ok(node)
176 }
177
178 pub fn all_hashes(&self) -> Box<dyn Iterator<Item = String> + '_> {
180 if let Some(ref mem) = self.mem {
182 let hashes: Vec<String> = mem.iter().map(|e| e.key().clone()).collect();
183 return Box::new(hashes.into_iter());
184 }
185 let root = self.root.clone();
187 Box::new(fs::read_dir(&root)
188 .into_iter()
189 .flatten()
190 .filter_map(|e| e.ok())
191 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
192 .flat_map(move |prefix_dir| {
193 let prefix = prefix_dir.file_name().to_string_lossy().to_string();
194 fs::read_dir(prefix_dir.path())
195 .into_iter()
196 .flatten()
197 .filter_map(|e| e.ok())
198 .filter_map(move |e| {
199 let name = e.file_name().to_string_lossy().to_string();
200 if name.ends_with(".tmp") { return None; }
201 Some(format!("{}{}", prefix, name))
202 })
203 }))
204 }
205
206 pub fn verify_all(&self) -> (usize, Vec<String>) {
208 use rayon::prelude::*;
209 let hashes: Vec<String> = self.all_hashes().collect();
210 let results: Vec<(bool, String)> = hashes.par_iter().map(|h| {
211 (self.read(h).is_ok(), h.clone())
212 }).collect();
213 let ok = results.iter().filter(|(ok, _)| *ok).count();
214 let bad: Vec<String> = results.into_iter()
215 .filter(|(ok, _)| !*ok)
216 .map(|(_, h)| h)
217 .collect();
218 (ok, bad)
219 }
220}
221
#[cfg(test)]
mod tests {
    //! Unit tests for `ObjectStore`, covering the on-disk, in-memory and
    //! encrypted code paths.
    use super::*;
    use tempfile::tempdir;

    /// Builds a small deterministic node for tests.
    fn make_node(id: &str, coll: &str, seq: u64) -> Node {
        Node {
            id: id.to_string(),
            coll: coll.to_string(),
            seq,
            data: serde_json::json!({"height": seq, "hash": "0000abc"}),
            prev: None,
            caused_by: vec![],
            ts: 1718400000.0,
            valid_from: None,
            valid_to: None,
            hash: String::new(),
        }
    }

    /// On-disk location of `hash` inside a store opened on `db_root`.
    fn object_path(db_root: &Path, hash: &str) -> PathBuf {
        db_root.join("objects").join(&hash[..2]).join(&hash[2..])
    }

    #[test]
    fn write_read_roundtrip() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        assert_eq!(hash.len(), 64);
        let read_back = store.read(&hash).unwrap();
        assert_eq!(read_back.id, "1");
        assert_eq!(read_back.coll, "blocks");
    }

    #[test]
    fn write_is_idempotent() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let h1 = store.write(&mut node).unwrap();
        let h2 = store.write(&mut node).unwrap();
        assert_eq!(h1, h2);
    }

    #[test]
    fn tamper_detected() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        let path = object_path(dir.path(), &hash);
        let mut content = fs::read(&path).unwrap();
        content[10] ^= 0xff;
        fs::write(&path, content).unwrap();
        assert!(store.read(&hash).is_err());
    }

    #[test]
    fn in_memory_roundtrip() {
        let store = ObjectStore::in_memory();
        let mut node = make_node("7", "blocks", 7);
        let hash = store.write(&mut node).unwrap();
        assert_eq!(node.hash, hash);
        let read_back = store.read(&hash).unwrap();
        assert_eq!(read_back.id, "7");
        assert_eq!(read_back.hash, hash);
        assert_eq!(store.all_hashes().collect::<Vec<_>>(), vec![hash.clone()]);
        assert!(store.read(&"0".repeat(64)).is_err());
    }

    #[test]
    fn encrypted_roundtrip_hides_plaintext() {
        let dir = tempdir().unwrap();
        let dek = Dek::from_tmk(&[7u8; 32], b"salt");
        let store = ObjectStore::new(dir.path(), Some(dek)).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        let on_disk = fs::read(object_path(dir.path(), &hash)).unwrap();
        assert!(!on_disk.windows(b"blocks".len()).any(|w| w == b"blocks"));
        let read_back = store.read(&hash).unwrap();
        assert_eq!(read_back.coll, "blocks");
    }

    #[test]
    fn wrong_key_cannot_read() {
        let dir = tempdir().unwrap();
        let writer =
            ObjectStore::new(dir.path(), Some(Dek::from_tmk(&[1u8; 32], b"salt"))).unwrap();
        let hash = writer.write(&mut make_node("1", "blocks", 1)).unwrap();
        let reader =
            ObjectStore::new(dir.path(), Some(Dek::from_tmk(&[2u8; 32], b"salt"))).unwrap();
        assert!(reader.read(&hash).is_err());
    }

    #[test]
    fn verify_all_reports_tampered_objects() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let good = store.write(&mut make_node("1", "blocks", 1)).unwrap();
        let bad = store.write(&mut make_node("2", "blocks", 2)).unwrap();
        let path = object_path(dir.path(), &bad);
        let mut content = fs::read(&path).unwrap();
        content[0] ^= 0xff;
        fs::write(&path, content).unwrap();
        let (ok, failed) = store.verify_all();
        assert_eq!(ok, 1);
        assert_eq!(failed, vec![bad]);
        assert!(store.read(&good).is_ok());
    }

    #[test]
    fn rejects_malformed_hashes() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        assert!(store.read("ab").is_err());
        assert!(store.read("ab/../../etc/passwd").is_err());
    }
}