1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use crate::*;
use async_std::sync::Mutex as AsyncMutex;
use std::future::Future;
use std::sync::{Arc, RwLock};
#[async_trait::async_trait]
pub trait ObjectMapRootEvent: Sync + Send + 'static {
async fn root_updated(
&self,
dec_id: &Option<ObjectId>,
new_root_id: ObjectId,
prev_id: ObjectId,
) -> BuckyResult<()>;
}
pub type ObjectMapRootEventRef = Arc<Box<dyn ObjectMapRootEvent>>;
#[derive(Clone)]
pub struct ObjectMapRootHolder {
dec_id: Option<ObjectId>,
root: Arc<RwLock<ObjectId>>,
update_lock: Arc<AsyncMutex<()>>,
event: ObjectMapRootEventRef,
}
impl ObjectMapRootHolder {
pub fn new(dec_id: Option<ObjectId>, root: ObjectId, event: ObjectMapRootEventRef) -> Self {
Self {
dec_id,
root: Arc::new(RwLock::new(root)),
update_lock: Arc::new(AsyncMutex::new(())),
event,
}
}
pub fn get_current_root(&self) -> ObjectId {
self.root.read().unwrap().clone()
}
pub async fn direct_reload_root(&self, new_root_id: ObjectId) {
let _update_lock = self.update_lock.lock().await;
let mut current = self.root.write().unwrap();
info!(
"reload objectmap root holder's root! dec={:?}, current={}, new={}",
self.dec_id, *current, new_root_id
);
*current = new_root_id;
}
pub async fn update_root<F, Fut>(&self, update_root_fn: F) -> BuckyResult<ObjectId>
where
F: FnOnce(ObjectId) -> Fut,
Fut: Future<Output = BuckyResult<ObjectId>>,
{
let _update_lock = self.update_lock.lock().await;
let root = self.get_current_root();
let new_root = update_root_fn(root.clone()).await?;
if new_root != root {
info!("will update root holder: {} -> {}", root, new_root);
if let Err(e) = self
.event
.root_updated(&self.dec_id, new_root.clone(), root.clone())
.await
{
error!(
"root update event notify error! {} -> {}, {}",
root, new_root, e
);
return Err(e);
}
{
let mut current = self.root.write().unwrap();
assert_eq!(*current, root);
*current = new_root.clone();
}
info!("root updated! {} -> {}", root, new_root);
}
Ok(new_root)
}
}