// bucky_objects/objects/object_map/isolate_path_env.rs

1use super::access::OpEnvPathAccess;
2use super::cache::*;
3use super::iterator::*;
4use super::path::*;
5use super::root::ObjectMapRootHolder;
6use crate::*;
7
8use async_std::sync::Mutex as AsyncMutex;
9use once_cell::sync::OnceCell;
10use std::sync::Arc;
11use bucky_error::{BuckyError, BuckyErrorCode, BuckyResult};
12
13// 每个root共享一个大的读cache,每个op_env都有独立的写cache,在commit时候提交
14pub struct ObjectMapIsolatePathOpEnv {
15    // each op_env under root has the unique SID
16    sid: u64,
17
18    // The root current op_env's belonging to
19    root_holder: ObjectMapRootHolder,
20
21    path: OnceCell<ObjectMapPath>,
22
23    // the cache owned by current op-env
24    cache: ObjectMapOpEnvCacheRef,
25
26    // Write locks, ensure order writing
27    write_lock: AsyncMutex<()>,
28
29    // Permission related
30    access: Option<OpEnvPathAccess>,
31}
32
33impl ObjectMapIsolatePathOpEnv {
34    pub(crate) fn new(
35        sid: u64,
36        root_holder: &ObjectMapRootHolder,
37        root_cache: &ObjectMapRootCacheRef,
38        access: Option<OpEnvPathAccess>,
39    ) -> Self {
40        debug!("new isolate_path_op_env: sid={},", sid);
41        let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());
42
43        Self {
44            sid,
45            root_holder: root_holder.clone(),
46            path: OnceCell::new(),
47            cache,
48            write_lock: AsyncMutex::new(()),
49            access,
50        }
51    }
52
53    fn init_path(&self, root: ObjectId) -> BuckyResult<()> {
54        let path = ObjectMapPath::new(root.clone(), self.cache.clone(), false);
55        if let Err(_) = self.path.set(path) {
56            let msg = format!(
57                "isolate_path_op_env has been initialized already! current root={}",
58                self.path.get().unwrap().root()
59            );
60            error!("{}", msg);
61            return Err(BuckyError::new(BuckyErrorCode::AlreadyExists, msg));
62        }
63
64        Ok(())
65    }
66
67    fn path(&self) -> BuckyResult<&ObjectMapPath> {
68        self.path.get().ok_or(BuckyError::new(
69            BuckyErrorCode::ErrorState,
70            "isolate_path_op_env has not been initialized yet!",
71        ))
72    }
73
74    // init methods
75    pub async fn create_new(&self, content_type: ObjectMapSimpleContentType, owner: Option<ObjectId>, dec_id: Option<ObjectId>,) -> BuckyResult<()> {
76        let obj = ObjectMap::new(
77            content_type.clone(),
78            owner,
79            dec_id,
80        )
81        .no_create_time()
82        .build();
83        let id = obj.flush_id();
84        info!(
85            "create new objectmap for ioslate_path_op_env: content_type={:?}, id={}",
86            content_type, id
87        );
88
89        self.cache.put_object_map(&id, obj, None)?;
90        self.init_path(id)
91    }
92
93    pub async fn load(&self, obj_map_id: &ObjectId) -> BuckyResult<()> {
94        let ret = self.cache.get_object_map(obj_map_id).await?;
95        if ret.is_none() {
96            let msg = format!(
97                "load ioslate_path_op_env object_id but not found! id={}",
98                obj_map_id,
99            );
100            error!("{}", msg);
101            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
102        }
103
104        debug!("load objectmap for ioslate_path_op_env: id={}", obj_map_id,);
105        self.init_path(obj_map_id.to_owned())
106    }
107
108    pub async fn load_by_path(&self, full_path: &str) -> BuckyResult<()> {
109        let (path, key) = ObjectMapPath::parse_path_allow_empty_key(full_path)?;
110
111        self.load_by_key(path, key).await
112    }
113
114    pub async fn load_with_inner_path(
115        &self,
116        obj_map_id: &ObjectId,
117        inner_path: Option<String>,
118    ) -> BuckyResult<()> {
119        let value = match &inner_path {
120            Some(inner_path) if inner_path.len() > 0 => {
121                let object_path = ObjectMapPath::new(obj_map_id.clone(), self.cache.clone(), false);
122                let value = object_path.get_by_path(&inner_path).await?;
123                if value.is_none() {
124                    let msg = format!(
125                        "load ioslate_path_op_env with inner_path but not found! root={}, inner_path={}",
126                        obj_map_id, inner_path,
127                    );
128                    error!("{}", msg);
129                    return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
130                }
131
132                value.unwrap()
133            }
134            _ => obj_map_id.to_owned(),
135        };
136
137        info!(
138            "will load ioslate_path_op_env with inner_path! root={}, inner_path={:?}, target={}",
139            obj_map_id, inner_path, value,
140        );
141
142        self.load(&value).await
143    }
144
145    // Load object_map on the specified path
146    // The root object cannot use single_op_env to operate directly, so at least one key must be specified!
147    pub async fn load_by_key(&self, path: &str, key: &str) -> BuckyResult<()> {
148        // First check access permissions!
149        if let Some(access) = &self.access {
150            access.check_path_key(path, key, RequestOpType::Read)?;
151        }
152
153        let root = self.root_holder.get_current_root();
154
155        let value = if key.len() > 0 {
156            let object_path = ObjectMapPath::new(root.clone(), self.cache.clone(), false);
157            let value = object_path.get_by_key(path, key).await?;
158            if value.is_none() {
159                let msg = format!(
160                    "load ioslate_path_op_env by path but not found! root={}, path={}, key={}",
161                    root, path, key
162                );
163                error!("{}", msg);
164                return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
165            }
166
167            value.unwrap()
168        } else {
169            assert_eq!(path, "/");
170            root
171        };
172
173        info!(
174            "will load ioslate_path_op_env by path! root={}, path={}, key={}, value={}",
175            root, path, key, value
176        );
177
178        self.load(&value).await
179    }
180
181    pub fn cache(&self) -> &ObjectMapOpEnvCacheRef {
182        &self.cache
183    }
184
185    pub fn sid(&self) -> u64 {
186        self.sid
187    }
188
189    // Calling this method will cause the path snapshot to be bound, so if a lock is needed,
190    // you should follow the sequence of create_op_env -> lock -> access other methods for operations.
191    pub fn root(&self) -> Option<ObjectId> {
192        self.path.get().map(|path| path.root())
193    }
194
195    // list
196    pub async fn list(&self, path: &str) -> BuckyResult<ObjectMapContentList> {
197        self.path()?.list(path).await
198    }
199
200    // metadata
201    pub async fn metadata(&self, path: &str) -> BuckyResult<ObjectMapMetaData> {
202        self.path()?.metadata(path).await
203    }
204
205    // map path methods
206    pub async fn get_by_path(&self, full_path: &str) -> BuckyResult<Option<ObjectId>> {
207        self.path()?.get_by_path(full_path).await
208    }
209
210    pub async fn create_new_with_path(
211        &self,
212        full_path: &str,
213        content_type: ObjectMapSimpleContentType,
214    ) -> BuckyResult<()> {
215        info!(
216            "op_path_env create_new_with_path: sid={}, path={}, content_type={:?}",
217            self.sid, full_path, content_type,
218        );
219
220        let _write_lock = self.write_lock.lock().await;
221        self.path()?
222            .create_new_with_path(full_path, content_type)
223            .await
224    }
225
226    pub async fn insert_with_path(&self, full_path: &str, value: &ObjectId) -> BuckyResult<()> {
227        info!(
228            "op_path_env insert_with_path: sid={}, full_path={}, value={}",
229            self.sid, full_path, value
230        );
231
232        let _write_lock = self.write_lock.lock().await;
233        self.path()?.insert_with_path(full_path, value).await
234    }
235
236    pub async fn set_with_path(
237        &self,
238        full_path: &str,
239        value: &ObjectId,
240        prev_value: &Option<ObjectId>,
241        auto_insert: bool,
242    ) -> BuckyResult<Option<ObjectId>> {
243        info!(
244            "op_path_env set_with_path: sid={}, full_path={}, value={}, prev_value={:?}, auto_insert={}",
245             self.sid, full_path, value, prev_value, auto_insert,
246        );
247
248        let _write_lock = self.write_lock.lock().await;
249        self.path()?
250            .set_with_path(full_path, value, prev_value, auto_insert)
251            .await
252    }
253
254    pub async fn remove_with_path(
255        &self,
256        full_path: &str,
257        prev_value: &Option<ObjectId>,
258    ) -> BuckyResult<Option<ObjectId>> {
259        info!(
260            "op_path_env remove_with_path: sid={}, full_path={}, prev_value={:?}",
261            self.sid, full_path, prev_value,
262        );
263
264        let _write_lock = self.write_lock.lock().await;
265        self.path()?.remove_with_path(full_path, prev_value).await
266    }
267
268    // map origin methods
269    pub async fn get_by_key(&self, path: &str, key: &str) -> BuckyResult<Option<ObjectId>> {
270        self.path()?.get_by_key(path, key).await
271    }
272
273    pub async fn create_new_with_key(
274        &self,
275        path: &str,
276        key: &str,
277        content_type: ObjectMapSimpleContentType,
278    ) -> BuckyResult<()> {
279        info!(
280            "op_path_env create_new: sid={}, path={}, key={}, content_type={:?}",
281            self.sid, path, key, content_type,
282        );
283
284        let _write_lock = self.write_lock.lock().await;
285        self.path()?.create_new(path, key, content_type).await
286    }
287
288    pub async fn insert_with_key(
289        &self,
290        path: &str,
291        key: &str,
292        value: &ObjectId,
293    ) -> BuckyResult<()> {
294        info!(
295            "op_path_env insert_with_key: sid={}, path={}, key={}, value={}",
296            self.sid, path, key, value
297        );
298
299        let _write_lock = self.write_lock.lock().await;
300        self.path()?.insert_with_key(path, key, value).await
301    }
302
303    pub async fn set_with_key(
304        &self,
305        path: &str,
306        key: &str,
307        value: &ObjectId,
308        prev_value: &Option<ObjectId>,
309        auto_insert: bool,
310    ) -> BuckyResult<Option<ObjectId>> {
311        info!(
312            "op_path_env set_with_key: sid={}, path={}, key={}, value={}, prev_value={:?}, auto_insert={}",
313             self.sid, path, key, value, prev_value, auto_insert,
314        );
315
316        let _write_lock = self.write_lock.lock().await;
317        self.path()?
318            .set_with_key(path, key, value, prev_value, auto_insert)
319            .await
320    }
321
322    pub async fn remove_with_key(
323        &self,
324        path: &str,
325        key: &str,
326        prev_value: &Option<ObjectId>,
327    ) -> BuckyResult<Option<ObjectId>> {
328        info!(
329            "op_path_env remove_with_key: sid={}, path={}, key={}, prev_value={:?}",
330            self.sid, path, key, prev_value,
331        );
332
333        let _write_lock = self.write_lock.lock().await;
334        self.path()?.remove_with_key(path, key, prev_value).await
335    }
336
337    // set methods
338    pub async fn contains(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
339        self.path()?.contains(path, object_id).await
340    }
341
342    pub async fn insert(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
343        info!(
344            "op_path_env insert: sid={}, path={}, value={}",
345            self.sid, path, object_id,
346        );
347
348        let _write_lock = self.write_lock.lock().await;
349        self.path()?.insert(path, object_id).await
350    }
351
352    pub async fn remove(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
353        info!(
354            "op_path_env remove: sid={}, path={}, value={}",
355            self.sid, path, object_id,
356        );
357
358        let _write_lock = self.write_lock.lock().await;
359        self.path()?.remove(path, object_id).await
360    }
361
362    pub async fn update(&self) -> BuckyResult<ObjectId> {
363        let _write_lock = self.write_lock.lock().await;
364
365        // First gc temporary objects that are generated
366        let root = self.path()?.root();
367        if let Err(e) = self.cache.gc(false, &root).await {
368            error!("path env's cache gc error! root={}, {}", root, e);
369        }
370
371        // Save all result objects to noc
372        self.cache.commit().await?;
373
374        Ok(root)
375    }
376
377    // Commit operation, can only be called once
378    // Return the newest root id if commit success!
379    pub async fn commit(self) -> BuckyResult<ObjectId> {
380        self.update().await
381    }
382
383    pub fn abort(self) -> BuckyResult<()> {
384        info!("will abort isolate_path_op_env: sid={}", self.sid);
385
386        // Relase the pending objects in cache
387        self.cache.abort();
388
389        Ok(())
390    }
391}
392
393#[derive(Clone)]
394pub struct ObjectMapIsolatePathOpEnvRef(Arc<ObjectMapIsolatePathOpEnv>);
395
396impl ObjectMapIsolatePathOpEnvRef {
397    pub fn new(env: ObjectMapIsolatePathOpEnv) -> Self {
398        Self(Arc::new(env))
399    }
400
401    fn into_raw(self) -> BuckyResult<ObjectMapIsolatePathOpEnv> {
402        let sid = self.sid();
403        let env = Arc::try_unwrap(self.0).map_err(|this| {
404            let msg = format!(
405                "path_op_env's ref_count is more than one! sid={}, ref={}",
406                sid,
407                Arc::strong_count(&this)
408            );
409            error!("{}", msg);
410            BuckyError::new(BuckyErrorCode::ErrorState, msg)
411        })?;
412
413        Ok(env)
414    }
415
416    pub fn is_dropable(&self) -> bool {
417        Arc::strong_count(&self.0) == 1
418    }
419
420    pub async fn commit(self) -> BuckyResult<ObjectId> {
421        let env = self.into_raw()?;
422
423        env.commit().await
424    }
425
426    pub fn abort(self) -> BuckyResult<()> {
427        let env = self.into_raw()?;
428
429        env.abort()
430    }
431}
432
433impl std::ops::Deref for ObjectMapIsolatePathOpEnvRef {
434    type Target = Arc<ObjectMapIsolatePathOpEnv>;
435    fn deref(&self) -> &Arc<ObjectMapIsolatePathOpEnv> {
436        &self.0
437    }
438}