bucky_objects/objects/object_map/
single_env.rs

1use super::access::OpEnvPathAccess;
2use super::cache::*;
3use super::iterator::*;
4use super::object_map::*;
5use super::path::*;
6use crate::*;
7
8use async_std::sync::Mutex as AsyncMutex;
9use once_cell::sync::OnceCell;
10use std::sync::Arc;
11
12pub struct ObjectMapSingleOpEnv {
13    sid: u64,
14
15    // 所属dec的root
16    root_holder: ObjectMapRootHolder,
17
18    // 操作的目标object_map
19    root: AsyncMutex<Option<ObjectMap>>,
20
21    // env级别的cache
22    cache: ObjectMapOpEnvCacheRef,
23
24    iterator: OnceCell<AsyncMutex<ObjectMapIterator>>,
25
26    // 权限相关
27    access: Option<OpEnvPathAccess>,
28}
29
30impl ObjectMapSingleOpEnv {
31    pub(crate) fn new(
32        sid: u64,
33        root_holder: &ObjectMapRootHolder,
34        root_cache: &ObjectMapRootCacheRef,
35        access: Option<OpEnvPathAccess>,
36    ) -> Self {
37        let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());
38
39        Self {
40            sid,
41            root_holder: root_holder.clone(),
42            root: AsyncMutex::new(None),
43            cache,
44            iterator: OnceCell::new(),
45            access,
46        }
47    }
48
49    pub fn sid(&self) -> u64 {
50        self.sid
51    }
52
53    // 获取当前操作的object_map id,需要注意在commit之前都是快照模式,id不会更新
54    pub async fn get_current_root(&self) -> Option<ObjectId> {
55        let ret = self.root.lock().await;
56        ret.as_ref().map(|v| v.cached_object_id().unwrap())
57    }
58
59    async fn set_root(&self, obj_map: ObjectMap) -> BuckyResult<()> {
60        let mut current = self.root.lock().await;
61        if current.is_some() {
62            let msg = format!(
63                "single op_env root already been set! id={}",
64                current.as_ref().unwrap().cached_object_id().unwrap()
65            );
66            error!("{}", msg);
67            return Err(BuckyError::new(BuckyErrorCode::AlreadyExists, msg));
68        }
69
70        info!(
71            "single op_env root init: id={}",
72            obj_map.cached_object_id().unwrap()
73        );
74
75        *current = Some(obj_map);
76
77        Ok(())
78    }
79
80    // 创建一个新的object_map
81    pub async fn create_new(&self, content_type: ObjectMapSimpleContentType, owner: Option<ObjectId>, dec_id: Option<ObjectId>,) -> BuckyResult<()> {
82        let obj = ObjectMap::new(
83            content_type.clone(),
84            owner,
85            dec_id,
86        )
87        .no_create_time()
88        .build();
89        let id = obj.flush_id();
90        info!(
91            "create new objectmap for single op_env: content_type={:?}, id={}",
92            content_type, id
93        );
94
95        self.set_root(obj).await?;
96
97        Ok(())
98    }
99
100    // 加载一个已有的object_map
101    pub async fn load(&self, obj_map_id: &ObjectId) -> BuckyResult<()> {
102        let ret = self.cache.get_object_map(obj_map_id).await?;
103        if ret.is_none() {
104            let msg = format!(
105                "load single op_env object_id but not found! id={}",
106                obj_map_id,
107            );
108            error!("{}", msg);
109            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
110        }
111
112        debug!("load objectmap for single op_env: id={}", obj_map_id,);
113
114        // 拷贝一份用以后续的修改操作
115        let obj_map = ret.unwrap().lock().await.clone();
116        self.set_root(obj_map).await?;
117
118        Ok(())
119    }
120
121    pub async fn load_by_path(&self, full_path: &str) -> BuckyResult<()> {
122        let (path, key) = ObjectMapPath::parse_path_allow_empty_key(full_path)?;
123
124        self.load_by_key(path, key).await
125    }
126
127    pub async fn load_with_inner_path(&self, obj_map_id: &ObjectId, inner_path: Option<String>) -> BuckyResult<()> {
128        let value = match &inner_path {
129            Some(inner_path) if inner_path.len() > 0  => {
130                let object_path = ObjectMapPath::new(obj_map_id.clone(), self.cache.clone(), false);
131                let value = object_path.get_by_path(&inner_path).await?;
132                if value.is_none() {
133                    let msg = format!(
134                        "load single_op_env with inner_path but not found! root={}, inner_path={}",
135                        obj_map_id, inner_path,
136                    );
137                    warn!("{}", msg);
138                    return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
139                }
140    
141                value.unwrap()
142            }
143            _ => {
144                obj_map_id.to_owned()
145            }
146        };
147
148        info!(
149            "will load single_op_env with inner_path! root={}, inner_path={:?}, target={}",
150            obj_map_id, inner_path, value,
151        );
152
153        self.load(&value).await
154    }
155
156    // 加载指定路径上的object_map
157    // root不能使用single_op_env直接操作,所以必须至少要指定一个key
158    pub async fn load_by_key(&self, path: &str, key: &str) -> BuckyResult<()> {
159        // First check access permissions!
160        if let Some(access) = &self.access {
161            access.check_path_key(path, key, RequestOpType::Read)?;
162        }
163
164        let root = self.root_holder.get_current_root();
165
166        let value = if key.len() > 0 {
167            let object_path = ObjectMapPath::new(root.clone(), self.cache.clone(), false);
168            let value = object_path.get_by_key(path, key).await?;
169            if value.is_none() {
170                let msg = format!(
171                    "load single_op_env by path but not found! root={}, path={}, key={}",
172                    root, path, key
173                );
174                warn!("{}", msg);
175                return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
176            }
177
178            value.unwrap()
179        } else {
180            assert_eq!(path, "/");
181            root
182        };
183
184        info!(
185            "will load single_op_env by path! root={}, path={}, key={}, value={}",
186            root, path, key, value
187        );
188
189        self.load(&value).await
190    }
191
192    // list
193    pub async fn list(&self) -> BuckyResult<ObjectMapContentList> {
194        let ret = self.root.lock().await;
195        if ret.is_none() {
196            let msg = format!("single op_env root not been init yet! sid={}", self.sid);
197            error!("{}", msg);
198            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
199        }
200
201        let obj = ret.as_ref().unwrap();
202        let mut list = ObjectMapContentList::new(obj.count() as usize);
203        ret.as_ref().unwrap().list(&self.cache, &mut list).await?;
204
205        Ok(list)
206    }
207
208    // iterator
209    pub async fn next(&self, step: usize) -> BuckyResult<ObjectMapContentList> {
210        let ret = self.root.lock().await;
211        if ret.is_none() {
212            let msg = format!("single op_env root not been init yet! sid={}", self.sid);
213            error!("{}", msg);
214            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
215        }
216
217        let obj = ret.as_ref().unwrap();
218
219        let iterator = self.iterator.get_or_init(|| {
220            let ret = ObjectMapIterator::new(false, &obj, self.cache.clone());
221            AsyncMutex::new(ret)
222        });
223
224        let mut it = iterator.lock().await;
225        it.next(&obj, step).await
226    }
227
228    // reset the iterator
229    pub async fn reset(&self) {
230        if self.iterator.get().is_none() {
231            return;
232        }
233
234        let ret = self.root.lock().await;
235        if ret.is_none() {
236            let msg = format!("single op_env root not been init yet! sid={}", self.sid);
237            error!("{}", msg);
238            return;
239        }
240
241        let obj = ret.as_ref().unwrap();
242
243        let ret = self.iterator.get();
244        if ret.is_none() {
245            return;
246        }
247
248        let new_it = ObjectMapIterator::new(false, &obj, self.cache.clone());
249
250        info!(
251            "will reset single op_env iterator: root={}",
252            obj.cached_object_id().unwrap()
253        );
254
255        let iterator = ret.unwrap();
256        *iterator.lock().await = new_it;
257    }
258
259    pub async fn metadata(&self) -> BuckyResult<ObjectMapMetaData> {
260        let ret = self.root.lock().await;
261        if ret.is_none() {
262            let msg = format!("single op_env root not been init yet! sid={}", self.sid);
263            error!("{}", msg);
264            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
265        }
266
267        let obj = ret.as_ref().unwrap();
268        Ok(obj.metadata())
269    }
270
271    // map methods
272    pub async fn get_by_key(&self, key: &str) -> BuckyResult<Option<ObjectId>> {
273        let ret = self.root.lock().await;
274        if ret.is_none() {
275            let msg = format!("single op_env root not been init yet! sid={}", self.sid);
276            error!("{}", msg);
277            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
278        }
279
280        ret.as_ref().unwrap().get_by_key(&self.cache, key).await
281    }
282
283    pub async fn insert_with_key(&self, key: &str, value: &ObjectId) -> BuckyResult<()> {
284        let mut ret = self.root.lock().await;
285        if ret.is_none() {
286            let msg = format!(
287                "single op_env root not been init yet! key={}, value={}",
288                key, value
289            );
290            error!("{}", msg);
291            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
292        }
293
294        ret.as_mut()
295            .unwrap()
296            .insert_with_key(&self.cache, key, value)
297            .await
298    }
299
300    pub async fn set_with_key(
301        &self,
302        key: &str,
303        value: &ObjectId,
304        prev_value: &Option<ObjectId>,
305        auto_insert: bool,
306    ) -> BuckyResult<Option<ObjectId>> {
307        let mut ret = self.root.lock().await;
308        if ret.is_none() {
309            let msg = format!(
310                "single op_env root not been init yet! sid={}, key={}, value={}",
311                self.sid, key, value
312            );
313            error!("{}", msg);
314            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
315        }
316
317        ret.as_mut()
318            .unwrap()
319            .set_with_key(&self.cache, key, value, prev_value, auto_insert)
320            .await
321    }
322
323    pub async fn remove_with_key(
324        &self,
325        key: &str,
326        prev_value: &Option<ObjectId>,
327    ) -> BuckyResult<Option<ObjectId>> {
328        let mut ret = self.root.lock().await;
329        if ret.is_none() {
330            let msg = format!(
331                "single op_env root not been init yet! sid={}, key={}",
332                self.sid, key
333            );
334            error!("{}", msg);
335            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
336        }
337
338        ret.as_mut()
339            .unwrap()
340            .remove_with_key(&self.cache, key, prev_value)
341            .await
342    }
343
344    // set methods
345    pub async fn contains(&self, object_id: &ObjectId) -> BuckyResult<bool> {
346        let ret = self.root.lock().await;
347        if ret.is_none() {
348            let msg = format!(
349                "single op_env root not been init yet! sid={}, value={}",
350                self.sid, object_id
351            );
352            error!("{}", msg);
353            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
354        }
355
356        ret.as_ref().unwrap().contains(&self.cache, object_id).await
357    }
358
359    pub async fn insert(&self, object_id: &ObjectId) -> BuckyResult<bool> {
360        let mut ret = self.root.lock().await;
361        if ret.is_none() {
362            let msg = format!(
363                "single op_env root not been init yet! sid={}, value={}",
364                self.sid, object_id
365            );
366            error!("{}", msg);
367            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
368        }
369
370        ret.as_mut().unwrap().insert(&self.cache, object_id).await
371    }
372
373    pub async fn remove(&self, object_id: &ObjectId) -> BuckyResult<bool> {
374        let mut ret = self.root.lock().await;
375        if ret.is_none() {
376            let msg = format!(
377                "single op_env root not been init yet! sid={}, value={}",
378                self.sid, object_id
379            );
380            error!("{}", msg);
381            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
382        }
383
384        ret.as_mut().unwrap().remove(&self.cache, object_id).await
385    }
386
387    async fn update_root(&self, finish: bool) -> BuckyResult<ObjectId> {
388        let mut root_slot = self.root.lock().await;
389        if root_slot.is_none() {
390            let msg = format!(
391                "update root error, single op_env root not been init yet! sid={}",
392                self.sid
393            );
394            error!("{}", msg);
395            return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
396        }
397
398        let root = root_slot.as_ref().unwrap();
399        let object_id = root.cached_object_id().unwrap();
400        let new_id = root.flush_id();
401        if object_id == new_id {
402            info!(
403                "single op_env update root but object_id unchanged! id={}",
404                object_id
405            );
406            return Ok(new_id);
407        }
408
409        // 发生改变了,需要提交到noc
410        info!(
411            "single op_env root object changed! sid={}, {} => {}",
412            self.sid, object_id, new_id
413        );
414
415        let root = if finish {
416            root_slot.take().unwrap()
417        } else {
418            root.clone()
419        };
420
421        self.cache.put_object_map(&new_id, root, None)?;
422
423        if let Err(e) = self.cache.gc(true, &new_id).await {
424            error!("single env's cache gc error! root={}, {}", new_id, e);
425        }
426
427        self.cache.commit().await?;
428
429        info!(
430            "single op_env update root success! sid={}, root=={}",
431            self.sid, new_id
432        );
433        Ok(new_id)
434    }
435
436    pub async fn update(&self) -> BuckyResult<ObjectId> {
437        self.update_root(false).await
438    }
439
440    pub async fn commit(self) -> BuckyResult<ObjectId> {
441        self.update_root(true).await
442    }
443
444    pub fn abort(self) -> BuckyResult<()> {
445        info!("will abort single_op_env: sid={}", self.sid);
446        self.cache.abort();
447
448        Ok(())
449    }
450}
451
452#[derive(Clone)]
453pub struct ObjectMapSingleOpEnvRef(Arc<ObjectMapSingleOpEnv>);
454
455impl ObjectMapSingleOpEnvRef {
456    pub fn new(env: ObjectMapSingleOpEnv) -> Self {
457        Self(Arc::new(env))
458    }
459
460    fn into_raw(self) -> BuckyResult<ObjectMapSingleOpEnv> {
461        let sid = self.sid();
462        let env = Arc::try_unwrap(self.0).map_err(|this| {
463            let msg = format!(
464                "single_op_env's ref_count is more than one! sid={}, ref={}",
465                sid,
466                Arc::strong_count(&this)
467            );
468            error!("{}", msg);
469            BuckyError::new(BuckyErrorCode::ErrorState, msg)
470        })?;
471
472        Ok(env)
473    }
474
475    pub fn is_dropable(&self) -> bool {
476        Arc::strong_count(&self.0) == 1
477    }
478
479    pub async fn commit(self) -> BuckyResult<ObjectId> {
480        let env = self.into_raw()?;
481
482        env.commit().await
483    }
484
485    pub fn abort(self) -> BuckyResult<()> {
486        let env = self.into_raw()?;
487
488        env.abort()
489    }
490}
491
492impl std::ops::Deref for ObjectMapSingleOpEnvRef {
493    type Target = Arc<ObjectMapSingleOpEnv>;
494    fn deref(&self) -> &Arc<ObjectMapSingleOpEnv> {
495        &self.0
496    }
497}