cyfs_base/objects/object_map/
root.rs

1use crate::*;
2
3use async_std::sync::Mutex as AsyncMutex;
4use std::future::Future;
5
6use std::sync::{Arc, RwLock};
7
8#[async_trait::async_trait]
9pub trait ObjectMapRootEvent: Sync + Send + 'static {
10    async fn root_updated(
11        &self,
12        dec_id: &Option<ObjectId>,
13        new_root_id: ObjectId,
14        prev_id: ObjectId,
15    ) -> BuckyResult<()>;
16}
17
18pub type ObjectMapRootEventRef = Arc<Box<dyn ObjectMapRootEvent>>;
19
20#[derive(Clone)]
21pub struct ObjectMapRootHolder {
22    dec_id: Option<ObjectId>,
23
24    // 当前的读写锁,只有在持有update_lock情况下,才可以更新
25    root: Arc<RwLock<ObjectId>>,
26    update_lock: Arc<AsyncMutex<()>>,
27    event: ObjectMapRootEventRef,
28}
29
30impl ObjectMapRootHolder {
31    pub fn new(dec_id: Option<ObjectId>, root: ObjectId, event: ObjectMapRootEventRef) -> Self {
32        Self {
33            dec_id,
34            root: Arc::new(RwLock::new(root)),
35            update_lock: Arc::new(AsyncMutex::new(())),
36            event,
37        }
38    }
39
40    pub fn get_current_root(&self) -> ObjectId {
41        self.root.read().unwrap().clone()
42    }
43
44    // direct set the root_state without notify event
45    pub async fn direct_reload_root(&self, new_root_id: ObjectId) {
46        let _update_lock = self.update_lock.lock().await;
47        let mut current = self.root.write().unwrap();
48
49        info!(
50            "reload objectmap root holder's root! dec={:?}, current={}, new={}",
51            self.dec_id, *current, new_root_id
52        );
53        *current = new_root_id;
54    }
55
56    // 尝试更新root,同一个root同一时刻只能有一个操作在进行,通过异步锁来保证
57    pub async fn update_root<F, Fut>(&self, update_root_fn: F) -> BuckyResult<ObjectId>
58    where
59        F: FnOnce(ObjectId) -> Fut,
60        Fut: Future<Output = BuckyResult<ObjectId>>,
61    {
62        let _update_lock = self.update_lock.lock().await;
63        let root = self.get_current_root();
64        let new_root = update_root_fn(root.clone()).await?;
65        if new_root != root {
66            info!("will update root holder: {} -> {}", root, new_root);
67
68            // 必须先触发事件,通知上层更新全局状态
69            if let Err(e) = self
70                .event
71                .root_updated(&self.dec_id, new_root.clone(), root.clone())
72                .await
73            {
74                error!(
75                    "root update event notify error! {} -> {}, {}",
76                    root, new_root, e
77                );
78
79                return Err(e);
80            }
81
82            // 触发事件成功后,才可以更新root-holder
83            // 避免这两个操作之间,新的root-holder被使用但全局根状态由于没更新导致的各种异常
84            {
85                let mut current = self.root.write().unwrap();
86                assert_eq!(*current, root);
87                *current = new_root.clone();
88            }
89
90            info!("root updated! {} -> {}", root, new_root);
91        }
92
93        Ok(new_root)
94    }
95}