bucky_objects/objects/object_map/
path_env.rs

1use super::access::OpEnvPathAccess;
2use super::cache::*;
3use super::iterator::*;
4use super::lock::*;
5use super::path::*;
6use super::root::ObjectMapRootHolder;
7use crate::*;
8
9use async_std::sync::Mutex as AsyncMutex;
10use once_cell::sync::OnceCell;
11use std::sync::{Arc, RwLock};
12
13//#[derive(Clone)]
14
/// Snapshot bound to an op_env: the root id it was created from plus the
/// `ObjectMapPath` through which all path operations are performed.
pub struct ObjectMapPathSnapshot {
    /// The root recorded as the initial state. It is rewritten after a
    /// successful root update (see `ObjectMapPathOpEnv::update_root`).
    root: RwLock<ObjectId>,

    /// All path-based operations are implemented through this object,
    /// including updates of the root.
    path: ObjectMapPath,
}
22
/// Path-based operation environment.
///
/// Every root shares one large read cache, while each op_env owns an
/// independent write cache that is submitted at commit time.
pub struct ObjectMapPathOpEnv {
    /// Every op_env under a root has its own unique sid.
    sid: u64,

    /// The root this op_env belongs to.
    root_holder: ObjectMapRootHolder,

    /// Lazily bound snapshot; it is created on first use by `path_snapshot`.
    path: OnceCell<ObjectMapPathSnapshot>,

    /// One global lock manager is shared by everything under the same root.
    lock: ObjectMapPathLock,

    /// Env-level cache.
    cache: ObjectMapOpEnvCacheRef,

    /// Write lock that keeps write operations sequential.
    write_lock: AsyncMutex<()>,

    /// Access-permission checker; when `None` no permission checks are made.
    access: Option<OpEnvPathAccess>,
}
45
46impl Drop for ObjectMapPathOpEnv {
47    fn drop(&mut self) {
48        async_std::task::block_on(self.unlock());
49    }
50}
51
52impl ObjectMapPathOpEnv {
53    pub(crate) fn new(
54        sid: u64,
55        root_holder: &ObjectMapRootHolder,
56        lock: &ObjectMapPathLock,
57        root_cache: &ObjectMapRootCacheRef,
58        access: Option<OpEnvPathAccess>,
59    ) -> Self {
60        debug!("new path_op_env: sid={},", sid);
61        let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());
62
63        Self {
64            sid,
65            root_holder: root_holder.clone(),
66            path: OnceCell::new(),
67            cache,
68            lock: lock.clone(),
69            write_lock: AsyncMutex::new(()),
70            access,
71        }
72    }
73
74    // 获取快照,如果不存在则会创建
75    fn path_snapshot(&self) -> &ObjectMapPathSnapshot {
76        self.path.get_or_init(|| {
77            // 记录当前root的快照
78            let root = self.root_holder.get_current_root();
79            info!(
80                "path_op_env bind root snapshot: sid={}, root={}",
81                self.sid, root
82            );
83            let path = ObjectMapPath::new(root.clone(), self.cache.clone(), true);
84
85            ObjectMapPathSnapshot {
86                root: RwLock::new(root),
87                path,
88            }
89        })
90    }
91
    /// Returns a reference to this env's own cache.
    pub fn cache(&self) -> &ObjectMapOpEnvCacheRef {
        &self.cache
    }
95
    /// Returns the sid this env was created with.
    pub fn sid(&self) -> u64 {
        self.sid
    }
99
100    // 调用次方法会导致path快照被绑定,所以如果需要lock,那么需要按照create_op_env->lock->访问其它方法的次序操作
101    pub fn root(&self) -> ObjectId {
102        self.path_snapshot().root.read().unwrap().to_owned()
103    }
104
105    fn path(&self) -> &ObjectMapPath {
106        &self.path_snapshot().path
107    }
108
109    pub async fn lock_path(
110        &self,
111        path_list: Vec<String>,
112        duration_in_millsecs: u64,
113        as_try: bool,
114    ) -> BuckyResult<()> {
115        info!(
116            "path_op_env lock_path: sid={}, path_list={:?}, duration_in_millsecs={}",
117            self.sid, path_list, duration_in_millsecs
118        );
119
120        // First check access permissions!
121        if let Some(access) = &self.access {
122            access.check_full_path_list(&path_list, RequestOpType::Write)?;
123        }
124
125        let mut req_list = vec![];
126        let expired = if duration_in_millsecs > 0 {
127            let now = bucky_time_now();
128            if duration_in_millsecs < (u64::MAX - now) / 1000 {
129                now + duration_in_millsecs * 1000
130            } else {
131                duration_in_millsecs
132            }
133        } else {
134            0
135        };
136
137        for path in path_list {
138            let req = PathLockRequest {
139                path,
140                sid: self.sid,
141                expired,
142            };
143
144            req_list.push(req);
145        }
146
147        if as_try {
148            self.lock.try_lock_list(req_list).await
149        } else {
150            self.lock.lock_list(req_list).await;
151            Ok(())
152        }
153    }
154
155    // list
156    pub async fn list(&self, path: &str) -> BuckyResult<ObjectMapContentList> {
157        // First check access permissions!
158        if let Some(access) = &self.access {
159            access.check_full_path(path, RequestOpType::Read)?;
160        }
161        self.path().list(path).await
162    }
163
164    // metadata
165    pub async fn metadata(&self, path: &str) -> BuckyResult<ObjectMapMetaData> {
166        // First check access permissions!
167        if let Some(access) = &self.access {
168            access.check_full_path(path, RequestOpType::Read)?;
169        }
170        self.path().metadata(path).await
171    }
172
173    // map path methods
174    pub async fn get_by_path(&self, full_path: &str) -> BuckyResult<Option<ObjectId>> {
175        // First check access permissions!
176        if let Some(access) = &self.access {
177            access.check_full_path(full_path, RequestOpType::Read)?;
178        }
179
180        self.path().get_by_path(full_path).await
181    }
182
183    pub async fn create_new_with_path(
184        &self,
185        full_path: &str,
186        content_type: ObjectMapSimpleContentType,
187    ) -> BuckyResult<()> {
188        info!(
189            "op_path_env create_new_with_path: sid={}, path={}, content_type={:?}",
190            self.sid, full_path, content_type,
191        );
192
193        // First check access permissions!
194        if let Some(access) = &self.access {
195            access.check_full_path(full_path, RequestOpType::Write)?;
196        }
197
198        let _write_lock = self.write_lock.lock().await;
199        self.lock.try_enter_path(full_path, self.sid).await?;
200        self.path()
201            .create_new_with_path(full_path, content_type)
202            .await
203    }
204
205    pub async fn insert_with_path(&self, full_path: &str, value: &ObjectId) -> BuckyResult<()> {
206        info!(
207            "op_path_env insert_with_path: sid={}, full_path={}, value={}",
208            self.sid, full_path, value
209        );
210
211        // First check access permissions!
212        if let Some(access) = &self.access {
213            access.check_full_path(full_path, RequestOpType::Write)?;
214        }
215
216        let _write_lock = self.write_lock.lock().await;
217        self.lock.try_enter_path(full_path, self.sid).await?;
218        self.path().insert_with_path(full_path, value).await
219    }
220
221    pub async fn set_with_path(
222        &self,
223        full_path: &str,
224        value: &ObjectId,
225        prev_value: &Option<ObjectId>,
226        auto_insert: bool,
227    ) -> BuckyResult<Option<ObjectId>> {
228        info!(
229            "op_path_env set_with_path: sid={}, full_path={}, value={}, prev_value={:?}, auto_insert={}",
230             self.sid, full_path, value, prev_value, auto_insert,
231        );
232
233        // First check access permissions!
234        if let Some(access) = &self.access {
235            access.check_full_path(full_path, RequestOpType::Write)?;
236        }
237
238        let _write_lock = self.write_lock.lock().await;
239        self.lock.try_enter_path(full_path, self.sid).await?;
240        self.path()
241            .set_with_path(full_path, value, prev_value, auto_insert)
242            .await
243    }
244
245    pub async fn remove_with_path(
246        &self,
247        full_path: &str,
248        prev_value: &Option<ObjectId>,
249    ) -> BuckyResult<Option<ObjectId>> {
250        info!(
251            "op_path_env remove_with_path: sid={}, full_path={}, prev_value={:?}",
252            self.sid, full_path, prev_value,
253        );
254
255        // First check access permissions!
256        if let Some(access) = &self.access {
257            access.check_full_path(full_path, RequestOpType::Write)?;
258        }
259
260        let _write_lock = self.write_lock.lock().await;
261        self.lock.try_enter_path(full_path, self.sid).await?;
262        self.path().remove_with_path(full_path, prev_value).await
263    }
264
265    // map origin methods
266    pub async fn get_by_key(&self, path: &str, key: &str) -> BuckyResult<Option<ObjectId>> {
267        // First check access permissions!
268        if let Some(access) = &self.access {
269            access.check_path_key(path, key, RequestOpType::Read)?;
270        }
271
272        self.path().get_by_key(path, key).await
273    }
274
275    pub async fn create_new(
276        &self,
277        path: &str,
278        key: &str,
279        content_type: ObjectMapSimpleContentType,
280    ) -> BuckyResult<()> {
281        info!(
282            "op_path_env create_new: sid={}, path={}, key={}, content_type={:?}",
283            self.sid, path, key, content_type,
284        );
285
286        // First check access permissions!
287        if let Some(access) = &self.access {
288            access.check_path_key(path, key, RequestOpType::Write)?;
289        }
290
291        let _write_lock = self.write_lock.lock().await;
292        self.lock
293            .try_enter_path_and_key(path, key, self.sid)
294            .await?;
295
296        self.path().create_new(path, key, content_type).await
297    }
298
299    pub async fn insert_with_key(
300        &self,
301        path: &str,
302        key: &str,
303        value: &ObjectId,
304    ) -> BuckyResult<()> {
305        info!(
306            "op_path_env insert_with_key: sid={}, path={}, key={}, value={}",
307            self.sid, path, key, value
308        );
309
310        // First check access permissions!
311        if let Some(access) = &self.access {
312            access.check_path_key(path, key, RequestOpType::Write)?;
313        }
314
315        let _write_lock = self.write_lock.lock().await;
316        self.lock
317            .try_enter_path_and_key(path, key, self.sid)
318            .await?;
319        self.path().insert_with_key(path, key, value).await
320    }
321
322    pub async fn set_with_key(
323        &self,
324        path: &str,
325        key: &str,
326        value: &ObjectId,
327        prev_value: &Option<ObjectId>,
328        auto_insert: bool,
329    ) -> BuckyResult<Option<ObjectId>> {
330        info!(
331            "op_path_env set_with_key: sid={}, path={}, key={}, value={}, prev_value={:?}, auto_insert={}",
332             self.sid, path, key, value, prev_value, auto_insert,
333        );
334
335        // First check access permissions!
336        if let Some(access) = &self.access {
337            access.check_path_key(path, key, RequestOpType::Write)?;
338        }
339
340        let _write_lock = self.write_lock.lock().await;
341        self.lock
342            .try_enter_path_and_key(path, key, self.sid)
343            .await?;
344        self.path()
345            .set_with_key(path, key, value, prev_value, auto_insert)
346            .await
347    }
348
349    pub async fn remove_with_key(
350        &self,
351        path: &str,
352        key: &str,
353        prev_value: &Option<ObjectId>,
354    ) -> BuckyResult<Option<ObjectId>> {
355        info!(
356            "op_path_env remove_with_key: sid={}, path={}, key={}, prev_value={:?}",
357            self.sid, path, key, prev_value,
358        );
359
360        // First check access permissions!
361        if let Some(access) = &self.access {
362            access.check_path_key(path, key, RequestOpType::Write)?;
363        }
364
365        let _write_lock = self.write_lock.lock().await;
366        self.lock
367            .try_enter_path_and_key(path, key, self.sid)
368            .await?;
369        self.path().remove_with_key(path, key, prev_value).await
370    }
371
372    // set methods
373    pub async fn contains(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
374        // First check access permissions!
375        if let Some(access) = &self.access {
376            access.check_full_path(path, RequestOpType::Read)?;
377        }
378
379        self.path().contains(path, object_id).await
380    }
381
382    pub async fn insert(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
383        info!(
384            "op_path_env insert: sid={}, path={}, value={}",
385            self.sid, path, object_id,
386        );
387
388        // First check access permissions!
389        if let Some(access) = &self.access {
390            access.check_full_path(path, RequestOpType::Write)?;
391        }
392
393        let _write_lock = self.write_lock.lock().await;
394        self.lock.try_enter_path(path, self.sid).await?;
395        self.path().insert(path, object_id).await
396    }
397
398    pub async fn remove(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
399        info!(
400            "op_path_env remove: sid={}, path={}, value={}",
401            self.sid, path, object_id,
402        );
403
404        // First check access permissions!
405        if let Some(access) = &self.access {
406            access.check_full_path(path, RequestOpType::Write)?;
407        }
408
409        let _write_lock = self.write_lock.lock().await;
410        self.lock.try_enter_path(path, self.sid).await?;
411        self.path().remove(path, object_id).await
412    }
413
    /// Pushes this env's pending changes to the root holder and returns the
    /// resulting root id.
    ///
    /// If the path's root equals the snapshot root, nothing was written and
    /// the function returns early. Otherwise a closure is handed to
    /// `root_holder.update_root`; the closure receives the holder's root and
    /// either replays the op list on top of it (root changed externally) or
    /// just clears the op list (root unchanged). It then refreshes the
    /// snapshot root and commits the cache.
    async fn update_root(&self) -> BuckyResult<ObjectId> {
        // First determine whether any write operation happened, since that
        // is what would make path.root change.
        let new_root = self.path().root();
        let current_root = self.root();
        if new_root == current_root {
            info!(
                "op env commit but root not changed! sid={}, root={}",
                self.sid, current_root
            );
            return Ok(new_root);
        }

        let this = &self;
        let update = |root: ObjectId| async move {
            // When the transaction is committed there are two cases:
            // 1. The root did not change externally: submit the staged operations to
            //    the noc directly and switch the root to the path's latest root.
            // 2. The root changed externally: update the path's root to the latest
            //    state and try to commit in transaction mode; on success, switch the
            //    root to the path's latest root.
            if root != current_root {
                info!("path_op_env commit but root changed, now will redo op list! sid={}, current_root={}, new_root={}", 
                    this.sid, current_root, root);

                this.cache.abort();

                // The root was modified externally, so the op_list has to be redone.
                this.path().update_root(root.clone(), &new_root)?;

                info!(
                    "will commit op list on root changed: {} -> {}",
                    current_root, root
                );
                this.path().commit_op_list().await?;
            } else {
                // The root did not change while the env was operating, so the
                // op_list is not executed again.
                info!("will clear op list because root not changed during the operations: {}", root);
                this.path().clear_op_list();
            }

            // update current op_env's snapshot
            let new_root = this.path().root();
            *this.path_snapshot().root.write().unwrap() = new_root.clone();

            // Submit all pending objects to the noc. A gc failure is only
            // logged; it does not abort the commit.
            if let Err(e) = this.cache.gc(false, &new_root).await {
                error!("path env's cache gc error! root={}, {}", root, e);
            }

            this.cache.commit().await?;

            Ok(new_root)
        };

        let new_root = self.root_holder.update_root(Box::new(update)).await?;

        Ok(new_root)
    }
469
470    pub async fn update(&self) -> BuckyResult<ObjectId> {
471        let _write_lock = self.write_lock.lock().await;
472        self.update_root().await
473    }
474
475    // 提交操作,只可以调用一次
476    // 提交成功,返回最新的root id
477    pub async fn commit(self) -> BuckyResult<ObjectId> {
478        self.update_root().await
479    }
480
481    // 释当前session持有的所有lock
482    async fn unlock(&self) {
483        let req = PathUnlockRequest {
484            path: None,
485            sid: self.sid,
486        };
487
488        self.lock.unlock(req).await.unwrap();
489    }
490
491    pub fn abort(self) -> BuckyResult<()> {
492        info!("will abort path_op_env: sid={}", self.sid);
493
494        // 释放cache里面的pending
495        self.cache.abort();
496
497        Ok(())
498    }
499}
500
/// Cloneable handle that wraps an `ObjectMapPathOpEnv` in an `Arc`.
#[derive(Clone)]
pub struct ObjectMapPathOpEnvRef(Arc<ObjectMapPathOpEnv>);
503
504impl ObjectMapPathOpEnvRef {
505    pub fn new(env: ObjectMapPathOpEnv) -> Self {
506        Self(Arc::new(env))
507    }
508
509    fn into_raw(self) -> BuckyResult<ObjectMapPathOpEnv> {
510        let sid = self.sid();
511        let env = Arc::try_unwrap(self.0).map_err(|this| {
512            let msg = format!(
513                "path_op_env's ref_count is more than one! sid={}, ref={}",
514                sid,
515                Arc::strong_count(&this)
516            );
517            error!("{}", msg);
518            BuckyError::new(BuckyErrorCode::ErrorState, msg)
519        })?;
520
521        Ok(env)
522    }
523
524    pub fn is_dropable(&self) -> bool {
525        Arc::strong_count(&self.0) == 1
526    }
527
528    pub async fn commit(self) -> BuckyResult<ObjectId> {
529        let env = self.into_raw()?;
530
531        env.commit().await
532    }
533
534    pub fn abort(self) -> BuckyResult<()> {
535        let env = self.into_raw()?;
536
537        env.abort()
538    }
539}
540
541impl std::ops::Deref for ObjectMapPathOpEnvRef {
542    type Target = Arc<ObjectMapPathOpEnv>;
543    fn deref(&self) -> &Arc<ObjectMapPathOpEnv> {
544        &self.0
545    }
546}