1use crate::{CacheConfig, CachedStorage, GitObject, ObjectId, ObjectStore, Result, StorageError};
7use parking_lot::RwLock;
8use std::collections::HashSet;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12#[derive(Debug, Clone)]
14pub struct HybridConfig {
15 pub hot_max_objects: usize,
17 pub hot_max_bytes: usize,
19 pub cache_config: CacheConfig,
21 pub promote_threshold: u32,
23 pub migration_interval_secs: u64,
25}
26
27impl Default for HybridConfig {
28 fn default() -> Self {
29 Self {
30 hot_max_objects: 10_000,
31 hot_max_bytes: 512 * 1024 * 1024, cache_config: CacheConfig::default(),
33 promote_threshold: 3,
34 migration_interval_secs: 60,
35 }
36 }
37}
38
39#[derive(Debug, Default)]
41struct AccessTracker {
42 counts: RwLock<std::collections::HashMap<ObjectId, u32>>,
44 total_accesses: AtomicU64,
46}
47
48impl AccessTracker {
49 fn record_access(&self, id: &ObjectId) -> u32 {
50 self.total_accesses.fetch_add(1, Ordering::Relaxed);
51 let mut counts = self.counts.write();
52 let count = counts.entry(*id).or_insert(0);
53 *count += 1;
54 *count
55 }
56
57 fn get_count(&self, id: &ObjectId) -> u32 {
58 self.counts.read().get(id).copied().unwrap_or(0)
59 }
60
61 fn reset(&self, id: &ObjectId) {
62 self.counts.write().remove(id);
63 }
64}
65
66pub struct HybridStorage<C> {
68 hot: Arc<ObjectStore>,
70 cold: Arc<C>,
72 cache: CachedStorage<Arc<C>>,
74 hot_objects: RwLock<HashSet<ObjectId>>,
76 hot_size: AtomicU64,
78 tracker: AccessTracker,
80 config: HybridConfig,
82 stats: HybridStats,
84}
85
86#[derive(Debug, Default)]
88struct HybridStats {
89 hot_hits: AtomicU64,
90 hot_misses: AtomicU64,
91 promotions: AtomicU64,
92 demotions: AtomicU64,
93}
94
95impl<C> HybridStorage<C>
96where
97 C: crate::traits::ObjectStoreBackend + Send + Sync + 'static,
98{
99 pub fn new(cold: C, config: HybridConfig) -> Self {
101 let cold = Arc::new(cold);
102 let cache = CachedStorage::new(Arc::clone(&cold), config.cache_config.clone());
103
104 Self {
105 hot: Arc::new(ObjectStore::new()),
106 cold,
107 cache,
108 hot_objects: RwLock::new(HashSet::new()),
109 hot_size: AtomicU64::new(0),
110 tracker: AccessTracker::default(),
111 config,
112 stats: HybridStats::default(),
113 }
114 }
115
116 pub fn with_defaults(cold: C) -> Self {
118 Self::new(cold, HybridConfig::default())
119 }
120
121 pub fn get(&self, id: &ObjectId) -> Result<Option<GitObject>> {
123 let access_count = self.tracker.record_access(id);
125
126 if self.hot_objects.read().contains(id) {
128 self.stats.hot_hits.fetch_add(1, Ordering::Relaxed);
129 match self.hot.get(id) {
130 Ok(obj) => return Ok(Some(obj)),
131 Err(StorageError::ObjectNotFound(_)) => {
132 }
134 Err(e) => return Err(e),
135 }
136 }
137
138 self.stats.hot_misses.fetch_add(1, Ordering::Relaxed);
139
140 let result = self.cache.get(id)?;
142
143 if let Some(ref obj) = result {
145 if access_count >= self.config.promote_threshold {
146 self.try_promote(obj.clone());
147 }
148 }
149
150 Ok(result)
151 }
152
153 pub fn put(&self, object: GitObject) -> Result<ObjectId> {
155 let size = object.data.len() as u64;
156 let id = object.id;
157
158 self.cold.put(object.clone())?;
160
161 if self.can_add_to_hot(size) {
163 self.hot.put(object);
164 self.hot_objects.write().insert(id);
165 self.hot_size.fetch_add(size, Ordering::Relaxed);
166 }
167
168 Ok(id)
169 }
170
171 pub fn contains(&self, id: &ObjectId) -> Result<bool> {
173 if self.hot_objects.read().contains(id) {
174 return Ok(true);
175 }
176 self.cold.contains(id)
177 }
178
179 pub fn delete(&self, id: &ObjectId) -> Result<bool> {
181 if self.hot_objects.write().remove(id) {
183 if let Ok(obj) = self.hot.get(id) {
184 self.hot_size
185 .fetch_sub(obj.data.len() as u64, Ordering::Relaxed);
186 }
187 }
188
189 self.cache.invalidate(id);
191
192 self.cold.delete(id)
194 }
195
196 pub fn len(&self) -> Result<usize> {
198 self.cold.len()
199 }
200
201 pub fn is_empty(&self) -> Result<bool> {
203 self.cold.is_empty()
204 }
205
206 pub fn list_objects(&self) -> Result<Vec<ObjectId>> {
208 self.cold.list_objects()
209 }
210
211 pub fn flush(&self) -> Result<()> {
213 self.cold.flush()
215 }
216
217 fn can_add_to_hot(&self, size: u64) -> bool {
219 let current_size = self.hot_size.load(Ordering::Relaxed);
220 let current_count = self.hot_objects.read().len();
221
222 current_count < self.config.hot_max_objects
223 && current_size + size <= self.config.hot_max_bytes as u64
224 }
225
226 fn try_promote(&self, object: GitObject) {
228 let size = object.data.len() as u64;
229 let id = object.id;
230
231 while !self.can_add_to_hot(size) {
233 if !self.evict_one() {
234 return; }
236 }
237
238 self.hot.put(object);
239 self.hot_objects.write().insert(id);
240 self.hot_size.fetch_add(size, Ordering::Relaxed);
241 self.stats.promotions.fetch_add(1, Ordering::Relaxed);
242 }
243
244 fn evict_one(&self) -> bool {
246 let hot_objects = self.hot_objects.read();
247 if hot_objects.is_empty() {
248 return false;
249 }
250
251 let victim = hot_objects
253 .iter()
254 .min_by_key(|id| self.tracker.get_count(id))
255 .copied();
256
257 drop(hot_objects);
258
259 if let Some(victim_id) = victim {
260 if let Ok(obj) = self.hot.get(&victim_id) {
261 let size = obj.data.len() as u64;
262 self.hot_objects.write().remove(&victim_id);
263 self.hot_size.fetch_sub(size, Ordering::Relaxed);
264 self.tracker.reset(&victim_id);
265 self.stats.demotions.fetch_add(1, Ordering::Relaxed);
266 return true;
267 }
268 }
269
270 false
271 }
272
273 pub fn stats(&self) -> HybridStatsSnapshot {
275 HybridStatsSnapshot {
276 hot_objects: self.hot_objects.read().len(),
277 hot_size_bytes: self.hot_size.load(Ordering::Relaxed),
278 hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
279 hot_misses: self.stats.hot_misses.load(Ordering::Relaxed),
280 promotions: self.stats.promotions.load(Ordering::Relaxed),
281 demotions: self.stats.demotions.load(Ordering::Relaxed),
282 cache_stats: self.cache.stats(),
283 }
284 }
285}
286
287#[derive(Debug, Clone)]
289pub struct HybridStatsSnapshot {
290 pub hot_objects: usize,
291 pub hot_size_bytes: u64,
292 pub hot_hits: u64,
293 pub hot_misses: u64,
294 pub promotions: u64,
295 pub demotions: u64,
296 pub cache_stats: crate::CacheStats,
297}
298
299impl HybridStatsSnapshot {
300 pub fn hot_hit_ratio(&self) -> f64 {
302 let total = self.hot_hits + self.hot_misses;
303 if total == 0 {
304 0.0
305 } else {
306 self.hot_hits as f64 / total as f64
307 }
308 }
309}
310
311impl<C> crate::traits::ObjectStoreBackend for HybridStorage<C>
313where
314 C: crate::traits::ObjectStoreBackend + Send + Sync + 'static,
315{
316 fn put(&self, object: GitObject) -> Result<ObjectId> {
317 HybridStorage::put(self, object)
318 }
319
320 fn get(&self, id: &ObjectId) -> Result<Option<GitObject>> {
321 HybridStorage::get(self, id)
322 }
323
324 fn contains(&self, id: &ObjectId) -> Result<bool> {
325 HybridStorage::contains(self, id)
326 }
327
328 fn delete(&self, id: &ObjectId) -> Result<bool> {
329 HybridStorage::delete(self, id)
330 }
331
332 fn len(&self) -> Result<usize> {
333 HybridStorage::len(self)
334 }
335
336 fn list_objects(&self) -> Result<Vec<ObjectId>> {
337 HybridStorage::list_objects(self)
338 }
339
340 fn flush(&self) -> Result<()> {
341 HybridStorage::flush(self)
342 }
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348 use crate::traits::ObjectStoreBackend;
349
350 struct MemoryCold {
352 store: ObjectStore,
353 }
354
355 impl MemoryCold {
356 fn new() -> Self {
357 Self {
358 store: ObjectStore::new(),
359 }
360 }
361 }
362
363 impl crate::traits::ObjectStoreBackend for MemoryCold {
364 fn put(&self, object: GitObject) -> Result<ObjectId> {
365 Ok(self.store.put(object))
366 }
367
368 fn get(&self, id: &ObjectId) -> Result<Option<GitObject>> {
369 match self.store.get(id) {
370 Ok(obj) => Ok(Some(obj)),
371 Err(StorageError::ObjectNotFound(_)) => Ok(None),
372 Err(e) => Err(e),
373 }
374 }
375
376 fn contains(&self, id: &ObjectId) -> Result<bool> {
377 Ok(self.store.contains(id))
378 }
379
380 fn delete(&self, _id: &ObjectId) -> Result<bool> {
381 Ok(false)
382 }
383
384 fn len(&self) -> Result<usize> {
385 Ok(self.store.len())
386 }
387
388 fn list_objects(&self) -> Result<Vec<ObjectId>> {
389 Ok(self.store.list_objects())
390 }
391 }
392
393 #[test]
394 fn test_hybrid_put_get() {
395 let cold = MemoryCold::new();
396 let hybrid = HybridStorage::with_defaults(cold);
397
398 let obj = GitObject::blob(b"test data".to_vec());
399 let id = hybrid.put(obj.clone()).unwrap();
400
401 let retrieved = hybrid.get(&id).unwrap().unwrap();
402 assert_eq!(retrieved.id, obj.id);
403 }
404
405 #[test]
406 fn test_hot_storage_hit() {
407 let cold = MemoryCold::new();
408 let hybrid = HybridStorage::with_defaults(cold);
409
410 let obj = GitObject::blob(b"hot data".to_vec());
411 let id = hybrid.put(obj).unwrap();
412
413 assert!(hybrid.hot_objects.read().contains(&id));
415
416 hybrid.get(&id).unwrap();
418
419 let stats = hybrid.stats();
420 assert_eq!(stats.hot_hits, 1);
421 }
422
423 #[test]
424 fn test_promotion() {
425 let config = HybridConfig {
426 promote_threshold: 2,
427 ..Default::default()
428 };
429 let cold = MemoryCold::new();
430 let hybrid = HybridStorage::new(cold, config);
431
432 let obj = GitObject::blob(b"promote me".to_vec());
434 hybrid.cold.put(obj.clone()).unwrap();
435
436 hybrid.get(&obj.id).unwrap();
438 assert!(!hybrid.hot_objects.read().contains(&obj.id));
439
440 hybrid.get(&obj.id).unwrap();
442 assert!(hybrid.hot_objects.read().contains(&obj.id));
443
444 let stats = hybrid.stats();
445 assert_eq!(stats.promotions, 1);
446 }
447
448 #[test]
449 fn test_eviction() {
450 let config = HybridConfig {
451 hot_max_objects: 2,
452 hot_max_bytes: 30, ..Default::default()
454 };
455 let cold = MemoryCold::new();
456 let hybrid = HybridStorage::new(cold, config);
457
458 for i in 0..3 {
460 let obj = GitObject::blob(format!("data-{}-pad", i).into_bytes());
461 hybrid.put(obj).unwrap();
462 }
463
464 assert!(hybrid.hot_objects.read().len() <= 2);
466 }
467
468 #[test]
469 fn test_stats() {
470 let cold = MemoryCold::new();
471 let hybrid = HybridStorage::with_defaults(cold);
472
473 let obj = GitObject::blob(b"test".to_vec());
474 let id = hybrid.put(obj).unwrap();
475
476 for _ in 0..5 {
478 hybrid.get(&id).unwrap();
479 }
480
481 let stats = hybrid.stats();
482 assert!(stats.hot_objects > 0);
483 assert!(stats.hot_hits > 0);
484 }
485}