cyfs_base/objects/object_map/
root.rs1use crate::*;
2
3use async_std::sync::Mutex as AsyncMutex;
4use std::future::Future;
5
6use std::sync::{Arc, RwLock};
7
8#[async_trait::async_trait]
9pub trait ObjectMapRootEvent: Sync + Send + 'static {
10 async fn root_updated(
11 &self,
12 dec_id: &Option<ObjectId>,
13 new_root_id: ObjectId,
14 prev_id: ObjectId,
15 ) -> BuckyResult<()>;
16}
17
18pub type ObjectMapRootEventRef = Arc<Box<dyn ObjectMapRootEvent>>;
19
20#[derive(Clone)]
21pub struct ObjectMapRootHolder {
22 dec_id: Option<ObjectId>,
23
24 root: Arc<RwLock<ObjectId>>,
26 update_lock: Arc<AsyncMutex<()>>,
27 event: ObjectMapRootEventRef,
28}
29
30impl ObjectMapRootHolder {
31 pub fn new(dec_id: Option<ObjectId>, root: ObjectId, event: ObjectMapRootEventRef) -> Self {
32 Self {
33 dec_id,
34 root: Arc::new(RwLock::new(root)),
35 update_lock: Arc::new(AsyncMutex::new(())),
36 event,
37 }
38 }
39
40 pub fn get_current_root(&self) -> ObjectId {
41 self.root.read().unwrap().clone()
42 }
43
44 pub async fn direct_reload_root(&self, new_root_id: ObjectId) {
46 let _update_lock = self.update_lock.lock().await;
47 let mut current = self.root.write().unwrap();
48
49 info!(
50 "reload objectmap root holder's root! dec={:?}, current={}, new={}",
51 self.dec_id, *current, new_root_id
52 );
53 *current = new_root_id;
54 }
55
56 pub async fn update_root<F, Fut>(&self, update_root_fn: F) -> BuckyResult<ObjectId>
58 where
59 F: FnOnce(ObjectId) -> Fut,
60 Fut: Future<Output = BuckyResult<ObjectId>>,
61 {
62 let _update_lock = self.update_lock.lock().await;
63 let root = self.get_current_root();
64 let new_root = update_root_fn(root.clone()).await?;
65 if new_root != root {
66 info!("will update root holder: {} -> {}", root, new_root);
67
68 if let Err(e) = self
70 .event
71 .root_updated(&self.dec_id, new_root.clone(), root.clone())
72 .await
73 {
74 error!(
75 "root update event notify error! {} -> {}, {}",
76 root, new_root, e
77 );
78
79 return Err(e);
80 }
81
82 {
85 let mut current = self.root.write().unwrap();
86 assert_eq!(*current, root);
87 *current = new_root.clone();
88 }
89
90 info!("root updated! {} -> {}", root, new_root);
91 }
92
93 Ok(new_root)
94 }
95}