1use super::object_map::{ObjectMap, ObjectMapRef};
2use super::visitor::*;
3use crate::*;
4
5use async_std::sync::Mutex as AsyncMutex;
6use std::any::Any;
7use std::collections::{hash_map::Entry, HashMap};
8use std::sync::{Arc, Mutex};
9
/// An [`ObjectMap`] bundled with the [`AccessString`] stored alongside it.
#[derive(Clone)]
pub struct ObjectMapCacheItem {
    /// The object map itself.
    pub object: ObjectMap,
    /// Access string associated with `object`.
    pub access: AccessString,
}
21
22#[async_trait::async_trait]
24pub trait ObjectMapNOCCache: Send + Sync {
25 async fn exists(&self, dec: Option<ObjectId>, object_id: &ObjectId) -> BuckyResult<bool>;
26
27 async fn get_object_map(
28 &self,
29 dec: Option<ObjectId>,
30 object_id: &ObjectId,
31 ) -> BuckyResult<Option<ObjectMap>> {
32 self.get_object_map_ex(dec, object_id)
33 .await
34 .map(|ret| ret.map(|v| v.object))
35 }
36
37 async fn get_object_map_ex(
38 &self,
39 dec: Option<ObjectId>,
40 object_id: &ObjectId,
41 ) -> BuckyResult<Option<ObjectMapCacheItem>>;
42
43 async fn put_object_map(
44 &self,
45 dec: Option<ObjectId>,
46 object_id: ObjectId,
47 object: ObjectMap,
48 access: Option<AccessString>,
49 ) -> BuckyResult<()>;
50}
51
/// Shared, dynamically dispatched handle to an [`ObjectMapNOCCache`] implementation.
pub type ObjectMapNOCCacheRef = Arc<Box<dyn ObjectMapNOCCache>>;
53
/// [`ObjectMapNOCCache`] implementation that keeps every object in an
/// in-process `HashMap`.
pub(crate) struct ObjectMapMemoryNOCCache {
    // All stored objects, keyed by object id. The `dec` argument of the trait
    // methods is not part of the key.
    cache: Mutex<HashMap<ObjectId, ObjectMapCacheItem>>,
}
58
59impl ObjectMapMemoryNOCCache {
60 pub fn new() -> ObjectMapNOCCacheRef {
61 let ret = Self {
62 cache: Mutex::new(HashMap::new()),
63 };
64
65 Arc::new(Box::new(ret))
66 }
67}
68
69#[async_trait::async_trait]
70impl ObjectMapNOCCache for ObjectMapMemoryNOCCache {
71 async fn exists(&self, dec_id: Option<ObjectId>, object_id: &ObjectId) -> BuckyResult<bool> {
72 info!(
73 "[memory_noc] exists object: dec={:?}, {}",
74 dec_id, object_id
75 );
76
77 Ok(self.cache.lock().unwrap().contains_key(object_id))
78 }
79
80 async fn get_object_map_ex(
81 &self,
82 dec_id: Option<ObjectId>,
83 object_id: &ObjectId,
84 ) -> BuckyResult<Option<ObjectMapCacheItem>> {
85 info!("[memory_noc] load object: dec={:?}, {}", dec_id, object_id);
86
87 let cache = self.cache.lock().unwrap();
88 Ok(cache.get(object_id).cloned())
89 }
90
91 async fn put_object_map(
92 &self,
93 dec_id: Option<ObjectId>,
94 object_id: ObjectId,
95 object: ObjectMap,
96 access: Option<AccessString>,
97 ) -> BuckyResult<()> {
98 info!(
99 "[memory_noc] save object: dec={:?}, {}, {:?}",
100 dec_id, object_id, access
101 );
102
103 let item = ObjectMapCacheItem {
104 object,
105 access: access.unwrap_or(AccessString::default()),
106 };
107
108 {
109 let mut cache = self.cache.lock().unwrap();
110 cache.insert(object_id, item);
111 }
112
113 Ok(())
114 }
115}
116
117#[async_trait::async_trait]
120pub trait ObjectMapRootCache: Send + Sync {
121 async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool>;
122
123 async fn get_object_map(&self, object_id: &ObjectId) -> BuckyResult<Option<ObjectMapRef>> {
124 self.get_object_map_ex(object_id)
125 .await
126 .map(|ret| ret.map(|v| v.object))
127 }
128
129 async fn get_object_map_ex(
130 &self,
131 object_id: &ObjectId,
132 ) -> BuckyResult<Option<ObjectMapRefCacheItem>>;
133
134 async fn put_object_map(
135 &self,
136 object_id: ObjectId,
137 object: ObjectMap,
138 access: Option<AccessString>,
139 ) -> BuckyResult<()>;
140}
141
/// Shared, dynamically dispatched handle to an [`ObjectMapRootCache`] implementation.
pub type ObjectMapRootCacheRef = Arc<Box<dyn ObjectMapRootCache>>;
143
/// A shared [`ObjectMapRef`] handle bundled with its [`AccessString`].
///
/// Cloning the item clones the handle, not the underlying object map.
#[derive(Clone)]
pub struct ObjectMapRefCacheItem {
    /// Shared handle to the object map.
    pub object: ObjectMapRef,
    /// Access string associated with `object`.
    pub access: AccessString,
}
149
/// [`ObjectMapRootCache`] implementation that keeps recently loaded objects in
/// a time-expiring LRU cache and falls back to a NOC store on a miss.
pub struct ObjectMapRootMemoryCache {
    // Passed as the `dec` argument on every call into `noc`.
    dec_id: Option<ObjectId>,

    // Backing store: consulted on a cache miss and written by `put_object_map`.
    noc: ObjectMapNOCCacheRef,

    // Objects loaded from `noc`, wrapped in shared handles.
    cache: Arc<Mutex<lru_time_cache::LruCache<ObjectId, ObjectMapRefCacheItem>>>,
}
160
161impl ObjectMapRootMemoryCache {
162 pub fn new(
163 dec_id: Option<ObjectId>,
164 noc: ObjectMapNOCCacheRef,
165 timeout_in_secs: u64,
166 capacity: usize,
167 ) -> Self {
168 let cache = lru_time_cache::LruCache::with_expiry_duration_and_capacity(
169 std::time::Duration::from_secs(timeout_in_secs),
170 capacity,
171 );
172
173 Self {
174 dec_id,
175 noc,
176 cache: Arc::new(Mutex::new(cache)),
177 }
178 }
179
180 pub fn new_ref(
181 dec_id: Option<ObjectId>,
182 noc: ObjectMapNOCCacheRef,
183 timeout_in_secs: u64,
184 capacity: usize,
185 ) -> ObjectMapRootCacheRef {
186 Arc::new(Box::new(Self::new(dec_id, noc, timeout_in_secs, capacity)))
187 }
188
189 pub fn new_default_ref(
190 dec_id: Option<ObjectId>,
191 noc: ObjectMapNOCCacheRef,
192 ) -> ObjectMapRootCacheRef {
193 Arc::new(Box::new(Self::new(dec_id, noc, 60 * 5, 1024)))
194 }
195
196 async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool> {
197 if self.cache.lock().unwrap().contains_key(object_id) {
198 return Ok(true);
199 }
200
201 self.noc.exists(self.dec_id.clone(), object_id).await
202 }
203
204 async fn get_object_map_ex(
205 &self,
206 object_id: &ObjectId,
207 ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
208 let item = self.get_object_map_impl(object_id).await?;
209
210 if let Some(item) = &item {
212 let current = item.object.lock().await;
213 let real_id = current.flush_id_without_cache();
214 assert_eq!(real_id, *object_id);
215
216 let current_id = current.cached_object_id();
217 assert_eq!(current_id, Some(real_id));
218 }
219
220 Ok(item)
221 }
222
223 async fn get_object_map_impl(
224 &self,
225 object_id: &ObjectId,
226 ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
227 if let Some(v) = self.cache.lock().unwrap().get(object_id) {
229 return Ok(Some(v.to_owned()));
230 }
231
232 let ret = self
234 .noc
235 .get_object_map_ex(self.dec_id.clone(), object_id)
236 .await?;
237 if ret.is_none() {
238 return Ok(None);
239 }
240
241 let item = ret.unwrap();
242
243 let object = Arc::new(AsyncMutex::new(item.object));
244
245 let item = ObjectMapRefCacheItem {
246 object,
247 access: item.access,
248 };
249
250 {
252 let mut cache = self.cache.lock().unwrap();
253 if let Some(_) = cache.insert(object_id.to_owned(), item.clone()) {
254 warn!(
255 "insert objectmap to memory cache but already exists! id={}",
256 object_id
257 );
258 }
259 }
260
261 Ok(Some(item))
262 }
263
264 async fn put_object_map(
265 &self,
266 object_id: ObjectId,
267 object: ObjectMap,
268 access: Option<AccessString>,
269 ) -> BuckyResult<()> {
270 assert_eq!(Some(object_id), object.cached_object_id());
274 let current_id = object.flush_id();
275 assert_eq!(object_id, current_id);
276 let real_id = object.flush_id_without_cache();
277 assert_eq!(real_id, current_id);
278
279 self.noc
280 .put_object_map(self.dec_id.clone(), object_id, object, access)
281 .await
282 }
283}
284
285#[async_trait::async_trait]
286impl ObjectMapRootCache for ObjectMapRootMemoryCache {
287 async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool> {
288 Self::exists(&self, object_id).await
289 }
290
291 async fn get_object_map_ex(
292 &self,
293 object_id: &ObjectId,
294 ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
295 Self::get_object_map_ex(&self, object_id).await
296 }
297
298 async fn put_object_map(
299 &self,
300 object_id: ObjectId,
301 object: ObjectMap,
302 access: Option<AccessString>,
303 ) -> BuckyResult<()> {
304 Self::put_object_map(&self, object_id, object, access).await
305 }
306}
307
308#[async_trait::async_trait]
311pub trait ObjectMapOpEnvCache: Send + Sync {
312 async fn get_object_map(&self, object_id: &ObjectId) -> BuckyResult<Option<ObjectMapRef>> {
314 let ret = self.get_object_map_ex(object_id).await?;
315 Ok(ret.map(|v| v.object))
316 }
317
318 async fn get_object_map_ex(
319 &self,
320 object_id: &ObjectId,
321 ) -> BuckyResult<Option<ObjectMapRefCacheItem>>;
322
323 async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool>;
325
326 fn put_object_map(
328 &self,
329 object_id: &ObjectId,
330 object: ObjectMap,
331 access: Option<AccessString>,
332 ) -> BuckyResult<ObjectMapRef>;
333
334 fn remove_object_map(&self, object_id: &ObjectId) -> BuckyResult<ObjectMapRef>;
336
337 async fn commit(&self) -> BuckyResult<()>;
339
340 async fn gc(&self, single: bool, target: &ObjectId) -> BuckyResult<()>;
342
343 fn abort(&self);
345}
346
/// Shared, dynamically dispatched handle to an [`ObjectMapOpEnvCache`] implementation.
pub type ObjectMapOpEnvCacheRef = Arc<Box<dyn ObjectMapOpEnvCache>>;
348
/// One not-yet-committed object held in the pending list.
struct ObjectMapPendingItem {
    // Mark set by the GC traversal when it reaches this item; `gc` keeps only
    // the marked items.
    is_touched: bool,
    // Shared handle to the pending object map.
    item: ObjectMapRef,
    // Access string to store with the object on commit.
    access: AccessString,
}
354
/// The set of pending (not yet committed) object maps of one op-env cache.
struct ObjectMapOpEnvMemoryCachePendingList {
    // Pending objects keyed by object id.
    pending: HashMap<ObjectId, ObjectMapPendingItem>,
}
359
360impl ObjectMapOpEnvMemoryCachePendingList {
361 pub fn new() -> Self {
362 Self {
363 pending: HashMap::new(),
364 }
365 }
366
367 fn exists(&mut self, object_id: &ObjectId) -> bool {
368 self.pending.contains_key(object_id)
369 }
370
371 fn get_object_map(
372 &mut self,
373 object_id: &ObjectId,
374 ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
375 if let Some(v) = self.pending.get(object_id) {
376 let item = ObjectMapRefCacheItem {
377 object: v.item.clone(),
378 access: v.access.clone(),
379 };
380
381 return Ok(Some(item));
382 }
383
384 Ok(None)
386 }
387
388 fn remove_object_map(&mut self, object_id: &ObjectId) -> BuckyResult<ObjectMapRef> {
389 match self.pending.remove(object_id) {
390 Some(ret) => {
391 info!("will remove pending objectmap from cache! id={}", object_id);
392 Ok(ret.item)
393 }
394 None => {
395 let msg = format!(
396 "remove pending objectmap from cache but not found! id={}",
397 object_id
398 );
399 error!("{}", msg);
400 Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
401 }
402 }
403 }
404
405 fn put_object_map(
406 &mut self,
407 object_id: &ObjectId,
408 object: ObjectMap,
409 access: Option<AccessString>,
410 ) -> BuckyResult<ObjectMapRef> {
411 let object = Arc::new(AsyncMutex::new(object));
412
413 match self.pending.entry(object_id.to_owned()) {
414 Entry::Occupied(mut o) => {
415 warn!(
416 "insert object to objectmap memory cache but already exists! id={}, access={:?}",
417 object_id, access,
418 );
419 let v = ObjectMapPendingItem {
420 is_touched: false,
421 item: object.clone(),
422 access: access.unwrap_or(AccessString::default()),
423 };
424 o.insert(v);
425 }
426 Entry::Vacant(v) => {
427 debug!(
428 "insert object to objectmap memory cache! id={}, access={:?}",
429 object_id, access
430 );
431 let item = ObjectMapPendingItem {
432 is_touched: false,
433 item: object.clone(),
434 access: access.unwrap_or(AccessString::default()),
435 };
436 v.insert(item);
437 }
438 };
439
440 Ok(object)
441 }
442
443 async fn commit(
444 pending: HashMap<ObjectId, ObjectMapPendingItem>,
445 root_cache: ObjectMapRootCacheRef,
446 ) -> BuckyResult<()> {
447 let count = pending.len();
448 for (object_id, value) in pending {
449 let obj = value.item;
450
451 assert!(Arc::strong_count(&obj) == 1);
452 let obj = Arc::try_unwrap(obj).unwrap();
453 let obj = obj.into_inner();
454
455 if let Err(e) = root_cache
456 .put_object_map(object_id.clone(), obj, Some(value.access))
457 .await
458 {
459 let msg = format!("commit pending objectmap error! obj={}, {}", object_id, e);
460 error!("{}", msg);
461 return Err(e);
462 }
463 }
464
465 info!("commit all pending objectmap success! count={}", count);
466 Ok(())
467 }
468}
469
/// [`ObjectMapOpEnvCache`] implementation that keeps pending objects in memory
/// in front of a root cache.
pub struct ObjectMapOpEnvMemoryCache {
    // Consulted when an object is not pending; receives the objects on commit.
    root_cache: ObjectMapRootCacheRef,

    // Objects put into this cache that have not been committed yet.
    pending: Mutex<ObjectMapOpEnvMemoryCachePendingList>,
}
476
477impl ObjectMapOpEnvMemoryCache {
478 pub fn new(root_cache: ObjectMapRootCacheRef) -> Self {
479 Self {
480 root_cache,
481 pending: Mutex::new(ObjectMapOpEnvMemoryCachePendingList::new()),
482 }
483 }
484
485 pub fn new_ref(root_cache: ObjectMapRootCacheRef) -> ObjectMapOpEnvCacheRef {
486 Arc::new(Box::new(Self::new(root_cache)))
487 }
488}
489
490#[async_trait::async_trait]
491impl ObjectMapOpEnvCache for ObjectMapOpEnvMemoryCache {
492 async fn exists(&self, object_id: &ObjectId) -> BuckyResult<bool> {
493 if self.pending.lock().unwrap().exists(object_id) {
494 return Ok(true);
495 }
496
497 self.root_cache.exists(object_id).await
498 }
499
500 async fn get_object_map_ex(
501 &self,
502 object_id: &ObjectId,
503 ) -> BuckyResult<Option<ObjectMapRefCacheItem>> {
504 if let Some(obj) = self.pending.lock().unwrap().get_object_map(object_id)? {
506 return Ok(Some(obj));
507 }
508
509 self.root_cache.get_object_map_ex(object_id).await
511 }
512
513 fn put_object_map(
514 &self,
515 object_id: &ObjectId,
516 object: ObjectMap,
517 access: Option<AccessString>,
518 ) -> BuckyResult<ObjectMapRef> {
519 self.pending
520 .lock()
521 .unwrap()
522 .put_object_map(object_id, object, access)
523 }
524
525 fn remove_object_map(&self, object_id: &ObjectId) -> BuckyResult<ObjectMapRef> {
526 self.pending.lock().unwrap().remove_object_map(object_id)
527 }
528
529 async fn commit(&self) -> BuckyResult<()> {
530 let mut pending = HashMap::new();
532 {
533 let mut inner = self.pending.lock().unwrap();
534 std::mem::swap(&mut inner.pending, &mut pending);
535 }
536 ObjectMapOpEnvMemoryCachePendingList::commit(pending, self.root_cache.clone()).await
537 }
538
539 fn abort(&self) {
540 self.pending.lock().unwrap().pending.clear();
541 }
542
543 async fn gc(&self, single: bool, target: &ObjectId) -> BuckyResult<()> {
544 let mut pending = HashMap::new();
545 {
546 let mut inner = self.pending.lock().unwrap();
547 std::mem::swap(&mut inner.pending, &mut pending);
548 }
549
550 let prev_count = pending.len();
551
552 let gc = ObjectMapOpEnvMemoryCacheGC::new(pending);
561 let result = if single {
562 gc.single_gc(target).await?
563 } else {
564 gc.path_gc(target).await?
565 };
566
567 let mut result: HashMap<ObjectId, ObjectMapPendingItem> = result
568 .into_iter()
569 .filter(|item| item.1.is_touched)
570 .collect();
571
572 info!(
573 "gc for target single={}, target={}, {} -> {}",
574 single,
575 target,
576 prev_count,
577 result.len()
578 );
579
580 {
581 let mut inner = self.pending.lock().unwrap();
582 std::mem::swap(&mut inner.pending, &mut result);
583 }
584
585 Ok(())
586 }
587}
588
/// Marking pass over a pending set: acts as both loader and visitor for the
/// object map traversal and marks every pending item the traversal reports.
struct ObjectMapOpEnvMemoryCacheGC {
    // The pending set being collected; owned for the duration of the pass.
    pending: HashMap<ObjectId, ObjectMapPendingItem>,
}
592
593impl ObjectMapOpEnvMemoryCacheGC {
594 pub fn new(pending: HashMap<ObjectId, ObjectMapPendingItem>) -> Self {
595 Self { pending }
596 }
597
598 fn touch_item(&mut self, id: &ObjectId) {
599 debug!("gc touch item: {}", id);
600 if id.is_data() {
601 return;
602 }
603
604 if let Some(v) = self.pending.get_mut(id) {
605 v.is_touched = true;
606 }
607 }
608
609 pub async fn single_gc(
610 mut self,
611 target: &ObjectId,
612 ) -> BuckyResult<HashMap<ObjectId, ObjectMapPendingItem>> {
613 self.touch_item(target);
614
615 let mut visitor = ObjectMapFullVisitor::new(Box::new(self));
616 visitor.visit(target).await?;
617
618 let loader = visitor.into_provider();
619 let this = loader.into_any().downcast::<Self>().unwrap();
620 Ok(this.pending)
621 }
622
623 pub async fn path_gc(
624 mut self,
625 root: &ObjectId,
626 ) -> BuckyResult<HashMap<ObjectId, ObjectMapPendingItem>> {
627 self.touch_item(root);
629
630 let mut visitor = ObjectMapPathVisitor::new(Box::new(self));
631 visitor.visit(root).await?;
632
633 let visitor = visitor.into_provider();
634 let this = visitor.into_any().downcast::<Self>().unwrap();
635 Ok(this.pending)
636 }
637}
638
639#[async_trait::async_trait]
640impl ObjectMapVisitLoader for ObjectMapOpEnvMemoryCacheGC {
641 fn into_any(self: Box<Self>) -> Box<dyn Any> {
642 self
643 }
644
645 async fn get_object_map(&mut self, id: &ObjectId) -> BuckyResult<Option<ObjectMapRef>> {
646 if let Some(v) = self.pending.get(id) {
647 return Ok(Some(v.item.clone()));
648 }
649
650 debug!("object not exists: {}", id);
651 Ok(None)
653 }
654}
655
656#[async_trait::async_trait]
657impl ObjectMapVisitor for ObjectMapOpEnvMemoryCacheGC {
658 async fn visit_hub_item(&mut self, item: &ObjectId) -> BuckyResult<()> {
659 self.touch_item(&item);
660
661 Ok(())
662 }
663
664 async fn visit_map_item(&mut self, _key: &str, item: &ObjectId) -> BuckyResult<()> {
665 self.touch_item(&item);
666
667 Ok(())
668 }
669
670 async fn visit_set_item(&mut self, item: &ObjectId) -> BuckyResult<()> {
671 self.touch_item(&item);
672
673 Ok(())
674 }
675
676 async fn visit_diff_map_item(
677 &mut self,
678 _key: &str,
679 item: &ObjectMapDiffMapItem,
680 ) -> BuckyResult<()> {
681 if let Some(id) = &item.diff {
682 self.touch_item(&id);
683 }
684
685 Ok(())
686 }
687
688 async fn visit_diff_set_item(&mut self, _item: &ObjectMapDiffSetItem) -> BuckyResult<()> {
689 Ok(())
690 }
691}
692
// Empty impl: lets the GC be boxed and handed to the visitors as their provider.
impl ObjectMapVisitorProvider for ObjectMapOpEnvMemoryCacheGC {}