neleus_db/
object_store.rs1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::{Result, anyhow};
5use serde::{Serialize, de::DeserializeOwned};
6
7use crate::canonical::{from_cbor, to_cbor};
8use crate::cas::CasStore;
9use crate::compression;
10use crate::encryption::EncryptionRuntime;
11use crate::hash::{Hash, hash_typed};
12
13#[derive(Clone, Debug)]
14pub struct ObjectStore {
15 cas: CasStore,
16 verify_on_read: bool,
17 compress: bool,
18 encryption: Option<Arc<EncryptionRuntime>>,
19}
20
21impl ObjectStore {
22 pub fn new(root: impl Into<PathBuf>) -> Self {
23 Self {
24 cas: CasStore::new(root),
25 verify_on_read: false,
26 compress: false,
27 encryption: None,
28 }
29 }
30
31 pub fn with_options(root: impl Into<PathBuf>, verify_on_read: bool) -> Self {
32 Self::with_runtime_options(root, verify_on_read, false, None)
33 }
34
35 pub fn with_runtime_options(
36 root: impl Into<PathBuf>,
37 verify_on_read: bool,
38 compress: bool,
39 encryption: Option<Arc<EncryptionRuntime>>,
40 ) -> Self {
41 Self {
42 cas: CasStore::new(root),
43 verify_on_read,
44 compress,
45 encryption,
46 }
47 }
48
49 pub fn verify_on_read(&self) -> bool {
50 self.verify_on_read
51 }
52
53 pub fn compress(&self) -> bool {
54 self.compress
55 }
56
57 pub fn ensure_dir(&self) -> Result<()> {
58 self.cas.ensure_dir()
59 }
60
61 pub fn put_typed_bytes(&self, tag: &[u8], bytes: &[u8]) -> Result<Hash> {
62 let hash = hash_typed(tag, bytes);
64 let after_compress: Vec<u8> = if self.compress {
65 compression::compress(bytes)?
66 } else {
67 bytes.to_vec()
68 };
69 let stored = match &self.encryption {
70 Some(runtime) => runtime.encrypt(&after_compress)?,
71 None => after_compress,
72 };
73 self.cas.put_existing_hash(hash, &stored)?;
74 Ok(hash)
75 }
76
77 pub fn get_bytes(&self, hash: Hash) -> Result<Vec<u8>> {
78 let raw = self.cas.get(hash)?;
79 let after_decrypt = match &self.encryption {
80 Some(runtime) => runtime.decrypt(&raw)?,
81 None => raw,
82 };
83 Ok(compression::decompress_if_compressed(&after_decrypt)?.into_owned())
84 }
85
86 pub fn get_typed_bytes(&self, tag: &[u8], hash: Hash) -> Result<Vec<u8>> {
87 let raw = self.cas.get(hash)?;
88 let after_decrypt = match &self.encryption {
89 Some(runtime) => runtime.decrypt(&raw)?,
90 None => raw,
91 };
92 let bytes = compression::decompress_if_compressed(&after_decrypt)?;
93 if self.verify_on_read {
94 let computed = hash_typed(tag, &bytes);
95 if computed != hash {
96 return Err(anyhow!(
97 "object hash mismatch for {} (computed {})",
98 hash,
99 computed
100 ));
101 }
102 }
103 Ok(bytes.into_owned())
104 }
105
106 pub fn exists(&self, hash: Hash) -> bool {
107 self.cas.exists(hash)
108 }
109
110 pub fn put_serialized<T: Serialize>(&self, tag: &[u8], value: &T) -> Result<Hash> {
111 let bytes = to_cbor(value)?;
112 self.put_typed_bytes(tag, &bytes)
113 }
114
115 pub fn get_deserialized<T: DeserializeOwned>(&self, hash: Hash) -> Result<T> {
116 let bytes = self.get_bytes(hash)?;
117 from_cbor(&bytes)
118 }
119
120 pub fn get_deserialized_typed<T: DeserializeOwned>(&self, tag: &[u8], hash: Hash) -> Result<T> {
121 let bytes = self.get_typed_bytes(tag, hash)?;
122 from_cbor(&bytes)
123 }
124}
125
#[cfg(test)]
mod tests {
    use std::fs;

    use serde::{Deserialize, Serialize};
    use tempfile::TempDir;

    use super::*;
    use crate::hash::hash_typed;

    /// Minimal serializable payload shared by the tests below.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct Obj {
        x: u32,
    }

    /// Builds a default-configured store inside `dir` and prepares its
    /// directory. The caller keeps `dir` alive for the test's duration.
    fn default_store(dir: &TempDir) -> ObjectStore {
        let store = ObjectStore::new(dir.path());
        store.ensure_dir().unwrap();
        store
    }

    /// A value written with `put_serialized` can be read back typed.
    #[test]
    fn object_store_serialized_roundtrip() {
        let tmp = TempDir::new().unwrap();
        let store = default_store(&tmp);

        let written = Obj { x: 7 };
        let hash = store.put_serialized(b"manifest:", &written).unwrap();

        let restored: Obj = store.get_deserialized_typed(b"manifest:", hash).unwrap();
        assert_eq!(restored.x, 7);
    }

    /// Storing the same value twice under the same tag yields the same hash.
    #[test]
    fn object_store_is_deterministic_for_same_object() {
        let tmp = TempDir::new().unwrap();
        let store = default_store(&tmp);

        let first = store.put_serialized(b"manifest:", &Obj { x: 1 }).unwrap();
        let second = store.put_serialized(b"manifest:", &Obj { x: 1 }).unwrap();
        assert_eq!(first, second);
    }

    /// Identical bytes stored under different tags get different hashes.
    #[test]
    fn object_store_domain_separator_changes_hash() {
        let tmp = TempDir::new().unwrap();
        let store = default_store(&tmp);

        let payload = crate::canonical::to_cbor(&Obj { x: 1 }).unwrap();
        let as_manifest = store.put_typed_bytes(b"manifest:", &payload).unwrap();
        let as_commit = store.put_typed_bytes(b"commit:", &payload).unwrap();
        assert_ne!(as_manifest, as_commit);
    }

    /// With verification enabled, a typed read of an overwritten object fails.
    #[test]
    fn typed_read_verification_detects_corruption() {
        let tmp = TempDir::new().unwrap();
        let store = ObjectStore::with_options(tmp.path(), true);
        store.ensure_dir().unwrap();

        let hash = store.put_serialized(b"manifest:", &Obj { x: 7 }).unwrap();

        // Overwrite the stored object behind the store's back.
        let cas = CasStore::new(tmp.path());
        let object_path = cas.path_for(hash);
        fs::write(object_path, b"tampered").unwrap();

        let outcome = store.get_deserialized_typed::<Obj>(b"manifest:", hash);
        assert!(outcome.is_err());
    }

    /// The hash returned by a typed write equals `hash_typed` of tag + bytes.
    #[test]
    fn typed_hash_matches_expected() {
        let tmp = TempDir::new().unwrap();
        let store = default_store(&tmp);

        let payload = crate::canonical::to_cbor(&Obj { x: 9 }).unwrap();
        let expected = hash_typed(b"commit:", &payload);

        let actual = store.put_typed_bytes(b"commit:", &payload).unwrap();
        assert_eq!(actual, expected);
    }
}