dbx_core/engine/
serialization.rs1use crate::error::{DbxError, DbxResult};
9use sha2::{Digest, Sha256};
10use std::collections::HashMap;
11use std::fs;
12use std::path::PathBuf;
13use std::sync::{Arc, RwLock};
14
/// Callback that encodes raw bytes into their serialized representation.
///
/// Wrapped in `Arc` and bounded by `Send + Sync` so a single registration
/// can be shared across threads.
pub type SerializeFn = Arc<dyn Fn(&[u8]) -> DbxResult<Vec<u8>> + Send + Sync>;

/// Callback that decodes serialized bytes back into their raw representation.
pub type DeserializeFn = Arc<dyn Fn(&[u8]) -> DbxResult<Vec<u8>> + Send + Sync>;
/// Thread-safe registry mapping type names to byte-level serialize/deserialize
/// callbacks, with helper methods for zstd compression and SHA-256 checksums.
pub struct SerializationRegistry {
    // Registered serializers, keyed by type name.
    serializers: Arc<RwLock<HashMap<String, SerializeFn>>>,

    // Registered deserializers, keyed by type name.
    deserializers: Arc<RwLock<HashMap<String, DeserializeFn>>>,
}

30impl SerializationRegistry {
31 pub fn new() -> Self {
33 Self {
34 serializers: Arc::new(RwLock::new(HashMap::new())),
35 deserializers: Arc::new(RwLock::new(HashMap::new())),
36 }
37 }
38
39 pub fn register_serializer(&self, type_name: String, serializer: SerializeFn) {
41 self.serializers
42 .write()
43 .unwrap()
44 .insert(type_name, serializer);
45 }
46
47 pub fn register_deserializer(&self, type_name: String, deserializer: DeserializeFn) {
49 self.deserializers
50 .write()
51 .unwrap()
52 .insert(type_name, deserializer);
53 }
54
55 pub fn serialize(&self, type_name: &str, data: &[u8]) -> DbxResult<Vec<u8>> {
57 let serializers = self.serializers.read().unwrap();
58 let serializer = serializers.get(type_name).ok_or_else(|| {
59 DbxError::Serialization(format!("No serializer registered for type '{}'", type_name))
60 })?;
61
62 serializer(data)
63 }
64
65 pub fn deserialize(&self, type_name: &str, data: &[u8]) -> DbxResult<Vec<u8>> {
67 let deserializers = self.deserializers.read().unwrap();
68 let deserializer = deserializers.get(type_name).ok_or_else(|| {
69 DbxError::Serialization(format!(
70 "No deserializer registered for type '{}'",
71 type_name
72 ))
73 })?;
74
75 deserializer(data)
76 }
77
78 pub fn registered_types(&self) -> Vec<String> {
80 self.serializers.read().unwrap().keys().cloned().collect()
81 }
82
83 pub fn compress(&self, data: &[u8], level: i32) -> DbxResult<Vec<u8>> {
85 zstd::encode_all(data, level)
86 .map_err(|e| DbxError::Serialization(format!("Compression failed: {}", e)))
87 }
88
89 pub fn decompress(&self, data: &[u8]) -> DbxResult<Vec<u8>> {
91 zstd::decode_all(data)
92 .map_err(|e| DbxError::Serialization(format!("Decompression failed: {}", e)))
93 }
94
95 pub fn checksum(&self, data: &[u8]) -> Vec<u8> {
97 let mut hasher = Sha256::new();
98 hasher.update(data);
99 hasher.finalize().to_vec()
100 }
101
102 pub fn verify_checksum(&self, data: &[u8], expected_checksum: &[u8]) -> bool {
104 let actual_checksum = self.checksum(data);
105 actual_checksum == expected_checksum
106 }
107}
108
109impl Default for SerializationRegistry {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
/// Two-tier byte cache: a size-bounded in-memory L1 tier backed by an
/// on-disk L2 tier (one file per key under `l2_cache_dir`).
pub struct TwoLevelCache {
    // L1 tier: key -> value, held entirely in memory.
    l1_cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,

    // Maximum total bytes of values allowed in L1.
    l1_max_size: usize,

    // Running total of value bytes currently stored in L1.
    l1_current_size: Arc<RwLock<usize>>,

    // Directory holding L2 spill files ("<key>.bin").
    l2_cache_dir: PathBuf,
}
129
130impl TwoLevelCache {
131 pub fn new(l1_max_size: usize, l2_cache_dir: PathBuf) -> Self {
133 Self {
134 l1_cache: Arc::new(RwLock::new(HashMap::new())),
135 l1_max_size,
136 l1_current_size: Arc::new(RwLock::new(0)),
137 l2_cache_dir,
138 }
139 }
140
141 pub fn put(&self, key: String, value: Vec<u8>) -> DbxResult<()> {
143 let value_size = value.len();
144
145 let mut current_size = self.l1_current_size.write().unwrap();
147 if *current_size + value_size <= self.l1_max_size {
148 self.l1_cache
149 .write()
150 .unwrap()
151 .insert(key.clone(), value.clone());
152 *current_size += value_size;
153 } else {
154 drop(current_size);
156 self.put_l2(&key, &value)?;
157 }
158
159 Ok(())
160 }
161
162 pub fn get(&self, key: &str) -> DbxResult<Option<Vec<u8>>> {
164 if let Some(value) = self.l1_cache.read().unwrap().get(key) {
166 return Ok(Some(value.clone()));
167 }
168
169 self.get_l2(key)
171 }
172
173 fn put_l2(&self, key: &str, value: &[u8]) -> DbxResult<()> {
175 fs::create_dir_all(&self.l2_cache_dir)?;
177
178 let file_path = self.l2_cache_dir.join(format!("{}.bin", key));
180
181 fs::write(file_path, value)?;
183
184 Ok(())
185 }
186
187 fn get_l2(&self, key: &str) -> DbxResult<Option<Vec<u8>>> {
189 let file_path = self.l2_cache_dir.join(format!("{}.bin", key));
190
191 if !file_path.exists() {
192 return Ok(None);
193 }
194
195 let data = fs::read(file_path)?;
196 Ok(Some(data))
197 }
198
199 pub fn clear(&self) -> DbxResult<()> {
201 self.l1_cache.write().unwrap().clear();
203 *self.l1_current_size.write().unwrap() = 0;
204
205 if self.l2_cache_dir.exists() {
207 fs::remove_dir_all(&self.l2_cache_dir)?;
208 }
209
210 Ok(())
211 }
212
213 pub fn l1_size(&self) -> usize {
215 *self.l1_current_size.read().unwrap()
216 }
217
218 pub fn l1_count(&self) -> usize {
220 self.l1_cache.read().unwrap().len()
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips bytes through a pass-through serializer/deserializer pair
    /// and checks the type shows up in `registered_types`.
    #[test]
    fn test_serialization_registry() {
        let registry = SerializationRegistry::new();

        let passthrough_ser: SerializeFn = Arc::new(|bytes| Ok(bytes.to_vec()));
        let passthrough_de: DeserializeFn = Arc::new(|bytes| Ok(bytes.to_vec()));
        registry.register_serializer("test_type".to_string(), passthrough_ser);
        registry.register_deserializer("test_type".to_string(), passthrough_de);

        let payload = b"hello world";

        let encoded = registry.serialize("test_type", payload).unwrap();
        assert_eq!(encoded, payload);

        let decoded = registry.deserialize("test_type", &encoded).unwrap();
        assert_eq!(decoded, payload);

        assert!(registry
            .registered_types()
            .contains(&"test_type".to_string()));
    }

    /// A small value should land in (and be served from) the L1 tier.
    #[test]
    fn test_two_level_cache_l1() {
        let cache = TwoLevelCache::new(1024, PathBuf::from("target/test_cache_l1"));

        let key = "test_key".to_string();
        let value = b"test_value".to_vec();
        cache.put(key.clone(), value.clone()).unwrap();

        assert_eq!(cache.get(&key).unwrap(), Some(value));
        assert!(cache.l1_size() > 0);
        assert_eq!(cache.l1_count(), 1);

        let _ = cache.clear();
    }

    /// A value larger than the L1 budget should spill to L2 and still be
    /// retrievable.
    #[test]
    fn test_two_level_cache_l2() {
        let cache = TwoLevelCache::new(10, PathBuf::from("target/test_cache_l2"));

        let key = "large_key".to_string();
        let value = vec![0u8; 100];
        cache.put(key.clone(), value.clone()).unwrap();

        assert_eq!(cache.get(&key).unwrap(), Some(value));

        let _ = cache.clear();
    }

    /// `clear` empties L1 (size and count) and drops spilled entries.
    #[test]
    fn test_two_level_cache_clear() {
        let cache = TwoLevelCache::new(1024, PathBuf::from("target/test_cache_clear"));

        cache.put("key1".to_string(), b"value1".to_vec()).unwrap();
        cache.put("key2".to_string(), b"value2".to_vec()).unwrap();

        cache.clear().unwrap();

        assert_eq!(cache.l1_size(), 0);
        assert_eq!(cache.l1_count(), 0);
        assert_eq!(cache.get("key1").unwrap(), None);
    }
}