bucky_objects/objects/object_map/
cache.rs

1use super::object_map::{ObjectMap, ObjectMapRef};
2use super::visitor::*;
3use crate::*;
4
5use async_std::sync::Mutex as AsyncMutex;
6use std::any::Any;
7use std::collections::{hash_map::Entry, HashMap};
8use std::sync::{Arc, Mutex};
9
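/*
Cache design for ObjectMap
Each op_env has its own write cache, which stages every write operation and flushes them to the lower-level cache on commit
Each root shares a single read cache
*/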
15
16#[derive(Clone)]
17pub struct ObjectMapCacheItem {
18    pub object: ObjectMap,
19    pub access: AccessString,
20}
21
22// objectmap的依赖的noc接口,实现了object的最终保存和加载
23#[async_trait::async_trait]
24pub trait ObjectMapNOCCache: Send + Sync {
25    async fn exists(&self, dec: Option<ObjectId>, object_id: &ObjectId) -> BuckyResult<bool>;
26
27    async fn get_object_map(
28        &self,
29        dec: Option<ObjectId>,
30        object_id: &ObjectId,
31    ) -> BuckyResult<Option<ObjectMap>> {
32        self.get_object_map_ex(dec, object_id)
33            .await
34            .map(|ret| ret.map(|v| v.object))
35    }
36
37    async fn get_object_map_ex(
38        &self,
39        dec: Option<ObjectId>,
40        object_id: &ObjectId,
41    ) -> BuckyResult<Option<ObjectMapCacheItem>>;
42
43    async fn put_object_map(
44        &self,
45        dec: Option<ObjectId>,
46        object_id: ObjectId,
47        object: ObjectMap,
48        access: Option<AccessString>,
49    ) -> BuckyResult<()>;
50}
51
52pub type ObjectMapNOCCacheRef = Arc<Box<dyn ObjectMapNOCCache>>;
53
54// 简单的内存版本的noc
55pub(crate) struct ObjectMapMemoryNOCCache {
56    cache: Mutex<HashMap<ObjectId, ObjectMapCacheItem>>,
57}
58
59impl ObjectMapMemoryNOCCache {
60    pub fn new() -> ObjectMapNOCCacheRef {
61        let ret = Self {
62            cache: Mutex::new(HashMap::new()),
63        };
64
65        Arc::new(Box::new(ret))
66    }
67}
68
69#[async_trait::async_trait]
70impl ObjectMapNOCCache for ObjectMapMemoryNOCCache {
71    async fn exists(&self, dec_id: Option<ObjectId>, object_id: &ObjectId) -> BuckyResult<bool> {
72        info!(
73            "[memory_noc] exists object: dec={:?}, {}",
74            dec_id, object_id
75        );
76
77        Ok(self.cache.lock().unwrap().contains_key(object_id))
78    }
79
80    async fn get_object_map_ex(
81        &self,
82        dec_id: Option<ObjectId>,
83        object_id: &ObjectId,
84    ) -> BuckyResult<Option<ObjectMapCacheItem>> {
85        info!("[memory_noc] load object: dec={:?}, {}", dec_id, object_id);
86
87        let cache = self.cache.lock().unwrap();
88        Ok(cache.get(object_id).cloned())
89    }
90
91    async fn put_object_map(
92        &self,
93        dec_id: Option<ObjectId>,
94        object_id: ObjectId,
95        object: ObjectMap,
96        access: Option<AccessString>,
97    ) -> BuckyResult<()> {
98        info!(
99            "[memory_noc] save object: dec={:?}, {}, {:?}",
100            dec_id, object_id, access
101        );
102
103        let item = ObjectMapCacheItem {
104            object,
105            access: access.unwrap_or(AccessString::default()),
106        };
107
108        {
109            let mut cache = self.cache.lock().unwrap();
110            cache.insert(object_id, item);
111        }
112
113        Ok(())
114    }
115}
116
117//////////////////////////////////////////////////////////////////////////////////////////////////
118///  同一个root共享的一个cache
119#[async_trait::async_trait]
120pub trait ObjectMapRootCache: Send + Sync {
121    async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool>;
122
123    async fn get_object_map(&self, object_id: &ObjectId) -> BuckyResult<Option<ObjectMapRef>> {
124        self.get_object_map_ex(object_id)
125            .await
126            .map(|ret| ret.map(|v| v.object))
127    }
128
129    async fn get_object_map_ex(
130        &self,
131        object_id: &ObjectId,
132    ) -> BuckyResult<Option<ObjectMapRefCacheItem>>;
133
134    async fn put_object_map(
135        &self,
136        object_id: ObjectId,
137        object: ObjectMap,
138        access: Option<AccessString>,
139    ) -> BuckyResult<()>;
140}
141
142pub type ObjectMapRootCacheRef = Arc<Box<dyn ObjectMapRootCache>>;
143
144#[derive(Clone)]
145pub struct ObjectMapRefCacheItem {
146    pub object: ObjectMapRef,
147    pub access: AccessString,
148}
149
150pub struct ObjectMapRootMemoryCache {
151    // None for system dec
152    dec_id: Option<ObjectId>,
153
154    // 依赖的底层缓存,一般是noc层的缓存
155    noc: ObjectMapNOCCacheRef,
156
157    // 用来缓存从noc加载的objectmap和subs
158    cache: Arc<Mutex<lru_time_cache::LruCache<ObjectId, ObjectMapRefCacheItem>>>,
159}
160
161impl ObjectMapRootMemoryCache {
162    pub fn new(
163        dec_id: Option<ObjectId>,
164        noc: ObjectMapNOCCacheRef,
165        timeout_in_secs: u64,
166        capacity: usize,
167    ) -> Self {
168        let cache = lru_time_cache::LruCache::with_expiry_duration_and_capacity(
169            std::time::Duration::from_secs(timeout_in_secs),
170            capacity,
171        );
172
173        Self {
174            dec_id,
175            noc,
176            cache: Arc::new(Mutex::new(cache)),
177        }
178    }
179
180    pub fn new_ref(
181        dec_id: Option<ObjectId>,
182        noc: ObjectMapNOCCacheRef,
183        timeout_in_secs: u64,
184        capacity: usize,
185    ) -> ObjectMapRootCacheRef {
186        Arc::new(Box::new(Self::new(dec_id, noc, timeout_in_secs, capacity)))
187    }
188
189    pub fn new_default_ref(
190        dec_id: Option<ObjectId>,
191        noc: ObjectMapNOCCacheRef,
192    ) -> ObjectMapRootCacheRef {
193        Arc::new(Box::new(Self::new(dec_id, noc, 60 * 5, 1024)))
194    }
195
196    async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool> {
197        if self.cache.lock().unwrap().contains_key(object_id) {
198            return Ok(true);
199        }
200
201        self.noc.exists(self.dec_id.clone(), object_id).await
202    }
203
204    async fn get_object_map_ex(
205        &self,
206        object_id: &ObjectId,
207    ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
208        let item = self.get_object_map_impl(object_id).await?;
209
210        // FIXME 校验一次id是否是最新的
211        if let Some(item) = &item {
212            let current = item.object.lock().await;
213            let real_id = current.flush_id_without_cache();
214            assert_eq!(real_id, *object_id);
215
216            let current_id = current.cached_object_id();
217            assert_eq!(current_id, Some(real_id));
218        }
219
220        Ok(item)
221    }
222
223    async fn get_object_map_impl(
224        &self,
225        object_id: &ObjectId,
226    ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
227        // 首先查看缓存
228        if let Some(v) = self.cache.lock().unwrap().get(object_id) {
229            return Ok(Some(v.to_owned()));
230        }
231
232        // 最后尝试从noc加载
233        let ret = self
234            .noc
235            .get_object_map_ex(self.dec_id.clone(), object_id)
236            .await?;
237        if ret.is_none() {
238            return Ok(None);
239        }
240
241        let item = ret.unwrap();
242
243        let object = Arc::new(AsyncMutex::new(item.object));
244
245        let item = ObjectMapRefCacheItem {
246            object,
247            access: item.access,
248        };
249
250        // 缓存
251        {
252            let mut cache = self.cache.lock().unwrap();
253            if let Some(_) = cache.insert(object_id.to_owned(), item.clone()) {
254                warn!(
255                    "insert objectmap to memory cache but already exists! id={}",
256                    object_id
257                );
258            }
259        }
260
261        Ok(Some(item))
262    }
263
264    async fn put_object_map(
265        &self,
266        object_id: ObjectId,
267        object: ObjectMap,
268        access: Option<AccessString>,
269    ) -> BuckyResult<()> {
270        // TODO 这里是否需要更新缓存?
271
272        // FIXME 校验一次id是否是最新的
273        assert_eq!(Some(object_id), object.cached_object_id());
274        let current_id = object.flush_id();
275        assert_eq!(object_id, current_id);
276        let real_id = object.flush_id_without_cache();
277        assert_eq!(real_id, current_id);
278
279        self.noc
280            .put_object_map(self.dec_id.clone(), object_id, object, access)
281            .await
282    }
283}
284
285#[async_trait::async_trait]
286impl ObjectMapRootCache for ObjectMapRootMemoryCache {
287    async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool> {
288        Self::exists(&self, object_id).await
289    }
290
291    async fn get_object_map_ex(
292        &self,
293        object_id: &ObjectId,
294    ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
295        Self::get_object_map_ex(&self, object_id).await
296    }
297
298    async fn put_object_map(
299        &self,
300        object_id: ObjectId,
301        object: ObjectMap,
302        access: Option<AccessString>,
303    ) -> BuckyResult<()> {
304        Self::put_object_map(&self, object_id, object, access).await
305    }
306}
307
308/////////////////////////////////////////////////////////////////////////////////////////////////
309/// ObjectMap op_env操作粒度的cache
310#[async_trait::async_trait]
311pub trait ObjectMapOpEnvCache: Send + Sync {
312    // from pending list and lower cache
313    async fn get_object_map(&self, object_id: &ObjectId) -> BuckyResult<Option<ObjectMapRef>> {
314        let ret = self.get_object_map_ex(object_id).await?;
315        Ok(ret.map(|v| v.object))
316    }
317
318    async fn get_object_map_ex(
319        &self,
320        object_id: &ObjectId,
321    ) -> BuckyResult<Option<ObjectMapRefCacheItem>>;
322
323    // check if target object exists, in cache and in lower cache, not only for ObjectMap
324    async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool>;
325
326    // 同步的put,放置到暂存区
327    fn put_object_map(
328        &self,
329        object_id: &ObjectId,
330        object: ObjectMap,
331        access: Option<AccessString>,
332    ) -> BuckyResult<ObjectMapRef>;
333
334    // 从暂存区移除先前已经提交的操作(put_sub之后,commit之前)
335    fn remove_object_map(&self, object_id: &ObjectId) -> BuckyResult<ObjectMapRef>;
336
337    // 提交所有的暂存操作到下一级缓存/存储
338    async fn commit(&self) -> BuckyResult<()>;
339
340    // gc before commit, clear all untouchable objects from target
341    async fn gc(&self, single: bool, target: &ObjectId) -> BuckyResult<()>;
342
343    // 清除所有待提交的对象
344    fn abort(&self);
345}
346
347pub type ObjectMapOpEnvCacheRef = Arc<Box<dyn ObjectMapOpEnvCache>>;
348
349struct ObjectMapPendingItem {
350    is_touched: bool,
351    item: ObjectMapRef,
352    access: AccessString,
353}
354
355// 写操作缓存
356struct ObjectMapOpEnvMemoryCachePendingList {
357    pending: HashMap<ObjectId, ObjectMapPendingItem>,
358}
359
360impl ObjectMapOpEnvMemoryCachePendingList {
361    pub fn new() -> Self {
362        Self {
363            pending: HashMap::new(),
364        }
365    }
366
367    fn exists(&mut self, object_id: &ObjectId) -> bool {
368        self.pending.contains_key(object_id)
369    }
370
371    fn get_object_map(
372        &mut self,
373        object_id: &ObjectId,
374    ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
375        if let Some(v) = self.pending.get(object_id) {
376            let item = ObjectMapRefCacheItem {
377                object: v.item.clone(),
378                access: v.access.clone(),
379            };
380
381            return Ok(Some(item));
382        }
383
384        // 缺页错误
385        Ok(None)
386    }
387
388    fn remove_object_map(&mut self, object_id: &ObjectId) -> BuckyResult<ObjectMapRef> {
389        match self.pending.remove(object_id) {
390            Some(ret) => {
391                info!("will remove pending objectmap from cache! id={}", object_id);
392                Ok(ret.item)
393            }
394            None => {
395                let msg = format!(
396                    "remove pending objectmap from cache but not found! id={}",
397                    object_id
398                );
399                error!("{}", msg);
400                Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
401            }
402        }
403    }
404
405    fn put_object_map(
406        &mut self,
407        object_id: &ObjectId,
408        object: ObjectMap,
409        access: Option<AccessString>,
410    ) -> BuckyResult<ObjectMapRef> {
411        let object = Arc::new(AsyncMutex::new(object));
412
413        match self.pending.entry(object_id.to_owned()) {
414            Entry::Occupied(mut o) => {
415                warn!(
416                    "insert object to objectmap memory cache but already exists! id={}, access={:?}",
417                    object_id, access,
418                );
419                let v = ObjectMapPendingItem {
420                    is_touched: false,
421                    item: object.clone(),
422                    access: access.unwrap_or(AccessString::default()),
423                };
424                o.insert(v);
425            }
426            Entry::Vacant(v) => {
427                debug!(
428                    "insert object to objectmap memory cache! id={}, access={:?}",
429                    object_id, access
430                );
431                let item = ObjectMapPendingItem {
432                    is_touched: false,
433                    item: object.clone(),
434                    access: access.unwrap_or(AccessString::default()),
435                };
436                v.insert(item);
437            }
438        };
439
440        Ok(object)
441    }
442
443    async fn commit(
444        pending: HashMap<ObjectId, ObjectMapPendingItem>,
445        root_cache: ObjectMapRootCacheRef,
446    ) -> BuckyResult<()> {
447        let count = pending.len();
448        for (object_id, value) in pending {
449            let obj = value.item;
450
451            assert!(Arc::strong_count(&obj) == 1);
452            let obj = Arc::try_unwrap(obj).unwrap();
453            let obj = obj.into_inner();
454
455            if let Err(e) = root_cache
456                .put_object_map(object_id.clone(), obj, Some(value.access))
457                .await
458            {
459                let msg = format!("commit pending objectmap error! obj={}, {}", object_id, e);
460                error!("{}", msg);
461                return Err(e);
462            }
463        }
464
465        info!("commit all pending objectmap success! count={}", count);
466        Ok(())
467    }
468}
469
470pub struct ObjectMapOpEnvMemoryCache {
471    // 依赖的底层root缓存
472    root_cache: ObjectMapRootCacheRef,
473
474    pending: Mutex<ObjectMapOpEnvMemoryCachePendingList>,
475}
476
477impl ObjectMapOpEnvMemoryCache {
478    pub fn new(root_cache: ObjectMapRootCacheRef) -> Self {
479        Self {
480            root_cache,
481            pending: Mutex::new(ObjectMapOpEnvMemoryCachePendingList::new()),
482        }
483    }
484
485    pub fn new_ref(root_cache: ObjectMapRootCacheRef) -> ObjectMapOpEnvCacheRef {
486        Arc::new(Box::new(Self::new(root_cache)))
487    }
488}
489
490#[async_trait::async_trait]
491impl ObjectMapOpEnvCache for ObjectMapOpEnvMemoryCache {
492    async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool> {
493        if self.pending.lock().unwrap().exists(object_id) {
494            return Ok(true);
495        }
496
497        self.root_cache.exists(object_id).await
498    }
499
500    async fn get_object_map_ex(
501        &self,
502        object_id: &ObjectId,
503    ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
504        // 首先从当前写缓存里面查找
505        if let Some(obj) = self.pending.lock().unwrap().get_object_map(object_id)? {
506            return Ok(Some(obj));
507        }
508
509        // 最后尝试从root_cache加载
510        self.root_cache.get_object_map_ex(object_id).await
511    }
512
513    fn put_object_map(
514        &self,
515        object_id: &ObjectId,
516        object: ObjectMap,
517        access: Option<AccessString>,
518    ) -> BuckyResult<ObjectMapRef> {
519        self.pending
520            .lock()
521            .unwrap()
522            .put_object_map(object_id, object, access)
523    }
524
525    fn remove_object_map(&self, object_id: &ObjectId) -> BuckyResult<ObjectMapRef> {
526        self.pending.lock().unwrap().remove_object_map(object_id)
527    }
528
529    async fn commit(&self) -> BuckyResult<()> {
530        // FIXME 这里提交操作会清空所有的pending对象,所以只能操作一次
531        let mut pending = HashMap::new();
532        {
533            let mut inner = self.pending.lock().unwrap();
534            std::mem::swap(&mut inner.pending, &mut pending);
535        }
536        ObjectMapOpEnvMemoryCachePendingList::commit(pending, self.root_cache.clone()).await
537    }
538
539    fn abort(&self) {
540        self.pending.lock().unwrap().pending.clear();
541    }
542
543    async fn gc(&self, single: bool, target: &ObjectId) -> BuckyResult<()> {
544        let mut pending = HashMap::new();
545        {
546            let mut inner = self.pending.lock().unwrap();
547            std::mem::swap(&mut inner.pending, &mut pending);
548        }
549
550        let prev_count = pending.len();
551
552        /*
553        let mut total = 0;
554        for (key, value) in pending.iter() {
555            let len = key.raw_measure(&None).unwrap() + value.item.lock().await.raw_measure(&None).unwrap();
556            total += len;
557        }
558        */
559
560        let gc = ObjectMapOpEnvMemoryCacheGC::new(pending);
561        let result = if single {
562            gc.single_gc(target).await?
563        } else {
564            gc.path_gc(target).await?
565        };
566
567        let mut result: HashMap<ObjectId, ObjectMapPendingItem> = result
568            .into_iter()
569            .filter(|item| item.1.is_touched)
570            .collect();
571
572        info!(
573            "gc for target single={}, target={}, {} -> {}",
574            single,
575            target,
576            prev_count,
577            result.len()
578        );
579
580        {
581            let mut inner = self.pending.lock().unwrap();
582            std::mem::swap(&mut inner.pending, &mut result);
583        }
584
585        Ok(())
586    }
587}
588
589struct ObjectMapOpEnvMemoryCacheGC {
590    pending: HashMap<ObjectId, ObjectMapPendingItem>,
591}
592
593impl ObjectMapOpEnvMemoryCacheGC {
594    pub fn new(pending: HashMap<ObjectId, ObjectMapPendingItem>) -> Self {
595        Self { pending }
596    }
597
598    fn touch_item(&mut self, id: &ObjectId) {
599        debug!("gc touch item: {}", id);
600        if id.is_data() {
601            return;
602        }
603
604        if let Some(v) = self.pending.get_mut(id) {
605            v.is_touched = true;
606        }
607    }
608
609    pub async fn single_gc(
610        mut self,
611        target: &ObjectId,
612    ) -> BuckyResult<HashMap<ObjectId, ObjectMapPendingItem>> {
613        self.touch_item(target);
614
615        let mut visitor = ObjectMapFullVisitor::new(Box::new(self));
616        visitor.visit(target).await?;
617
618        let loader = visitor.into_provider();
619        let this = loader.into_any().downcast::<Self>().unwrap();
620        Ok(this.pending)
621    }
622
623    pub async fn path_gc(
624        mut self,
625        root: &ObjectId,
626    ) -> BuckyResult<HashMap<ObjectId, ObjectMapPendingItem>> {
627        // first touch the root
628        self.touch_item(root);
629
630        let mut visitor = ObjectMapPathVisitor::new(Box::new(self));
631        visitor.visit(root).await?;
632
633        let visitor = visitor.into_provider();
634        let this = visitor.into_any().downcast::<Self>().unwrap();
635        Ok(this.pending)
636    }
637}
638
639#[async_trait::async_trait]
640impl ObjectMapVisitLoader for ObjectMapOpEnvMemoryCacheGC {
641    fn into_any(self: Box<Self>) -> Box<dyn Any> {
642        self
643    }
644
645    async fn get_object_map(&mut self, id: &ObjectId) -> BuckyResult<Option<ObjectMapRef>> {
646        if let Some(v) = self.pending.get(id) {
647            return Ok(Some(v.item.clone()));
648        }
649
650        debug!("object not exists: {}", id);
651        // 缺页错误
652        Ok(None)
653    }
654}
655
656#[async_trait::async_trait]
657impl ObjectMapVisitor for ObjectMapOpEnvMemoryCacheGC {
658    async fn visit_hub_item(&mut self, item: &ObjectId) -> BuckyResult<()> {
659        self.touch_item(&item);
660
661        Ok(())
662    }
663
664    async fn visit_map_item(&mut self, _key: &str, item: &ObjectId) -> BuckyResult<()> {
665        self.touch_item(&item);
666
667        Ok(())
668    }
669
670    async fn visit_set_item(&mut self, item: &ObjectId) -> BuckyResult<()> {
671        self.touch_item(&item);
672
673        Ok(())
674    }
675
676    async fn visit_diff_map_item(
677        &mut self,
678        _key: &str,
679        item: &ObjectMapDiffMapItem,
680    ) -> BuckyResult<()> {
681        if let Some(id) = &item.diff {
682            self.touch_item(&id);
683        }
684
685        Ok(())
686    }
687
688    async fn visit_diff_set_item(&mut self, _item: &ObjectMapDiffSetItem) -> BuckyResult<()> {
689        Ok(())
690    }
691}
692
693impl ObjectMapVisitorProvider for ObjectMapOpEnvMemoryCacheGC {}