bucky_objects/objects/object_map/
path.rs

1use super::cache::*;
2use super::check::*;
3use super::iterator::*;
4use super::object_map::*;
5use super::op::*;
6use crate::*;
7
8use std::sync::{Arc, Mutex};
9
// A path-managed collection of ObjectMaps: all of them share one root, and each
// level of sub-path corresponds to one ObjectMap.
pub struct ObjectMapPath {
    // Id of the current root; swapped by `update_root` once a write has been flushed.
    root: Arc<Mutex<ObjectId>>,
    // Cache through which the ObjectMaps along a path are loaded and stored.
    obj_map_cache: ObjectMapOpEnvCacheRef,

    // Buffers all write operations; `Some` only when transactions were enabled in `new`.
    write_ops: Option<ObjectMapOpList>,
}
18
// One node of a resolved path: the ObjectMap at that level together with the key
// under which it is stored in its parent (`None` for the root node).
struct ObjectMapPathSeg {
    obj_map: ObjectMap,
    seg: Option<String>,
}
23
24impl std::fmt::Debug for ObjectMapPathSeg {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        write!(f, "[{:?},{:?}]", self.seg, self.obj_map.cached_object_id())
27    }
28}
29
30impl ObjectMapPath {
31    pub fn new(
32        root: ObjectId,
33        obj_map_cache: ObjectMapOpEnvCacheRef,
34        enable_transaction: bool,
35    ) -> Self {
36        Self {
37            root: Arc::new(Mutex::new(root)),
38            obj_map_cache,
39            write_ops: if enable_transaction {
40                Some(ObjectMapOpList::new())
41            } else {
42                None
43            },
44        }
45    }
46
47    // 获取当前的root
48    pub fn root(&self) -> ObjectId {
49        self.root.lock().unwrap().clone()
50    }
51
52    pub fn update_root(&self, root_id: ObjectId, prev_id: &ObjectId) -> BuckyResult<()> {
53        let mut root = self.root.lock().unwrap();
54        if *root != *prev_id {
55            let msg = format!(
56                "update root but unmatch! current={}, prev={}, new={}",
57                *root, prev_id, root_id
58            );
59            error!("{}", msg);
60            return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
61        }
62
63        info!("objectmap path root updated! {} -> {}", *root, root_id);
64        *root = root_id;
65        Ok(())
66    }
67
68    async fn get_root(&self) -> BuckyResult<ObjectMapRef> {
69        let root_id = self.root();
70        let ret = self.obj_map_cache.get_object_map(&root_id).await?;
71        if ret.is_none() {
72            let msg = format!("load root object but not found! id={}", root_id);
73            error!("{}", msg);
74            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
75        }
76
77        Ok(ret.unwrap())
78    }
79
80    /*
81    /a/b/ -> /a/b
82    / -> /
83    /a/b?arg=xxx -> /a/b
84    */
85    fn fix_path(path: &str) -> BuckyResult<&str> {
86        let path = path.trim();
87        if path == "/" {
88            return Ok(path);
89        }
90
91        // Remove the query params
92        let path = match path.rsplit_once('?') {
93            Some((path, _)) => path,
94            None => path,
95        };
96
97        // The / at the end needs to be removed
98        let path_ret = path.trim_end_matches("/");
99        if !path_ret.starts_with("/") {
100            let msg = format!("invalid objectmap path format! path={}", path);
101            error!("{}", msg);
102            return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
103        }
104
105        Ok(path_ret)
106    }
107
108    // 获取path对应的obj_map叶子节点
109    async fn get_object_map(&self, path: &str) -> BuckyResult<Option<ObjectMapRef>> {
110        let mut current = self.get_root().await?;
111
112        let path = Self::fix_path(path)?;
113        // Check if is root path
114        if path == "/" {
115            return Ok(Some(current));
116        }
117
118        // 依次获取每级子路径
119        let parts = path.split("/").skip(1);
120        for part in parts {
121            ObjectMapChecker::check_key_value(part)?;
122
123            let sub = current
124                .lock()
125                .await
126                .get_or_create_child_object_map(
127                    &self.obj_map_cache,
128                    part,
129                    ObjectMapSimpleContentType::Map,
130                    ObjectMapCreateStrategy::NotCreate,
131                    None,
132                )
133                .await
134                .map_err(|e| {
135                    let msg = format!(
136                        "get object by path error! path={}, part={}, {}",
137                        path, part, e
138                    );
139                    error!("{}", msg);
140                    BuckyError::new(e.code(), msg)
141                })?;
142
143            if sub.is_none() {
144                let msg = format!(
145                    "get object by path but not found! path={}, part={}",
146                    path, part
147                );
148                warn!("{}", msg);
149                return Ok(None);
150            }
151
152            current = sub.unwrap();
153            debug!(
154                "get objectmap path seg: {}={:?}",
155                part,
156                current.lock().await.cached_object_id()
157            );
158        }
159
160        Ok(Some(current))
161    }
162
163    // 以/开头的路径
164    async fn create_object_map(
165        &self,
166        path: &str,
167        content_type: ObjectMapSimpleContentType,
168        auto_create: ObjectMapCreateStrategy,
169    ) -> BuckyResult<Option<Vec<ObjectMapPathSeg>>> {
170        let root = self.get_root().await?;
171        let current = root.lock().await.clone();
172
173        let path = Self::fix_path(path)?;
174
175        let root_seg = ObjectMapPathSeg {
176            obj_map: current,
177            seg: None,
178        };
179
180        let mut obj_list = vec![root_seg];
181
182        // 判断是不是root
183        if path == "/" {
184            trace!("object map path list: path={}, list={:?}", path, obj_list);
185            return Ok(Some(obj_list));
186        }
187
188        // 依次获取每级子路径
189        let parts: Vec<&str> = path.split("/").skip(1).collect();
190        for (index, &part) in parts.iter().enumerate() {
191            ObjectMapChecker::check_key_value(part)?;
192
193            let is_last_part = index == parts.len() - 1;
194            // 最后一级使用目标类型, 中间子目录统一使用map
195            let content_type = if is_last_part {
196                content_type.clone()
197            } else {
198                ObjectMapSimpleContentType::Map
199            };
200
201            let create_strategy = match auto_create {
202                ObjectMapCreateStrategy::CreateIfNotExists => {
203                    ObjectMapCreateStrategy::CreateIfNotExists
204                }
205                ObjectMapCreateStrategy::NotCreate => ObjectMapCreateStrategy::NotCreate,
206                ObjectMapCreateStrategy::CreateNew => {
207                    // only use createNew for the last seg
208                    if is_last_part {
209                        ObjectMapCreateStrategy::CreateNew
210                    } else {
211                        ObjectMapCreateStrategy::CreateIfNotExists
212                    }
213                }
214            };
215
216            let sub = obj_list
217                .last_mut()
218                .unwrap()
219                .obj_map
220                .get_or_create_child_object_map(&self.obj_map_cache, part, content_type, create_strategy, None)
221                .await
222                .map_err(|e| {
223                    let msg = format!(
224                        "get or create object by path error! path={}, part={}, create_strategy={:?}, {}",
225                        path, part, create_strategy, e
226                    );
227                    error!("{}", msg);
228                    BuckyError::new(e.code(), msg)
229                })?;
230
231            if sub.is_none() {
232                let msg = format!(
233                    "get object by path but not found! path={}, part={}",
234                    path, part
235                );
236                warn!("{}", msg);
237                return Ok(None);
238            }
239
240            // 可能涉及到修改操作,所以路径上的objectmap都clone一份
241            let current = sub.unwrap().lock().await.clone();
242            let current_seq = ObjectMapPathSeg {
243                obj_map: current,
244                seg: Some(part.to_owned()),
245            };
246
247            obj_list.push(current_seq);
248        }
249
250        debug!("object map path list: path={}, list={:?}", path, obj_list);
251
252        Ok(Some(obj_list))
253    }
254
255    async fn update_path_obj_map_list(
256        &self,
257        mut obj_map_list: Vec<ObjectMapPathSeg>,
258    ) -> BuckyResult<Vec<(ObjectMap, ObjectId)>> {
259        assert!(!obj_map_list.is_empty());
260
261        let mut current_obj_map = obj_map_list.pop().unwrap();
262        let mut new_obj_map_list = vec![];
263
264        // 更新路径上的所有obj_map
265        loop {
266            // 刷新当前obj_map的id
267            let prev_id = current_obj_map.obj_map.cached_object_id().unwrap();
268            let current_id = current_obj_map.obj_map.flush_id();
269            assert_ne!(prev_id, current_id);
270
271            trace!(
272                "update objectmap path seg: seg={:?}, {} -> {}",
273                current_obj_map.seg, prev_id, current_id
274            );
275
276            // 更新此段的obj_map(id发生了变化)
277            new_obj_map_list.push((current_obj_map.obj_map, prev_id.clone()));
278
279            if obj_map_list.is_empty() {
280                break;
281            }
282
283            // 如果存在父一级,那么需要更新到父级obj_map
284            let seg = current_obj_map.seg.unwrap();
285            assert!(seg.len() > 0);
286
287            let mut parent_obj_map = obj_map_list.pop().unwrap();
288            parent_obj_map
289                .obj_map
290                .set_with_key(
291                    &self.obj_map_cache,
292                    &seg,
293                    &current_id,
294                    &Some(prev_id),
295                    false,
296                )
297                .await
298                .map_err(|e| e)?;
299
300            current_obj_map = parent_obj_map;
301        }
302
303        Ok(new_obj_map_list)
304    }
305
306    // 新的path刷新到缓存,并更新root
307    fn flush_path_obj_map_list(&self, obj_map_list: Vec<(ObjectMap, ObjectId)>) -> BuckyResult<()> {
308        let count = obj_map_list.len();
309
310        // 从叶子节点向根节点依次更新,最后更新root
311        for (index, (obj_map, prev_id)) in obj_map_list.into_iter().enumerate() {
312            // 最新的map必须已经计算过id了
313            let current_id = obj_map.cached_object_id().unwrap();
314            assert_ne!(current_id, prev_id);
315
316            self.obj_map_cache
317                .put_object_map(&current_id, obj_map, None)?;
318
319            if index + 1 == count {
320                self.update_root(current_id, &prev_id)?;
321            }
322
323            // TODO 如果之前的object在pending区,那么尝试移除
324            // 这里不能简单的移除,因为可能别的path操作里面长生的新路径存在相同对象id,会引用,这里移除会导致后续的查找失败
325            // 需要依赖统一的GC逻辑
326            // remove_list.insert(prev_id);
327        }
328
329        Ok(())
330    }
331
332    pub async fn metadata(&self, path: &str) -> BuckyResult<ObjectMapMetaData> {
333        let ret = self.get_object_map(path).await?;
334        if ret.is_none() {
335            let msg = format!("get value from path but objectmap not found! path={}", path);
336            warn!("{}", msg);
337            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
338        }
339
340        let ret = ret.unwrap();
341        let obj = ret.lock().await;
342        Ok(obj.metadata())
343    }
344
345    pub async fn list(&self, path: &str) -> BuckyResult<ObjectMapContentList> {
346        let ret = self.get_object_map(path).await?;
347        if ret.is_none() {
348            let msg = format!("get value from path but objectmap not found! path={}", path);
349            warn!("{}", msg);
350            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
351        }
352
353        let item = ret.unwrap();
354        let obj = item.lock().await;
355        let mut list = ObjectMapContentList::new(obj.count() as usize);
356        obj.list(&self.obj_map_cache, &mut list).await?;
357        Ok(list)
358    }
359
360    pub fn parse_path_allow_empty_key(full_path: &str) -> BuckyResult<(&str, &str)> {
361        let full_path = Self::fix_path(full_path)?;
362
363        if full_path == "/" {
364            return Ok((full_path, ""));
365        }
366
367        let mut path_segs: Vec<&str> = full_path.split("/").collect();
368        if path_segs.len() < 2 {
369            let msg = format!("invalid objectmap full path: {}", full_path);
370            error!("{}", msg);
371            return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
372        }
373
374        let key = path_segs.pop().unwrap();
375        let trim_len = if path_segs.len() > 1 {
376            key.len() + 1
377        } else {
378            key.len()
379        };
380
381        let path = &full_path[..(full_path.len() - trim_len)];
382        if path.len() == 0 {
383            let msg = format!("invalid objectmap full path: {}", full_path);
384            error!("{}", msg);
385            return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
386        }
387
388        Ok((path, key))
389    }
390
391    // map methods with full_path
392    // key should not been empty string
393    // 用来解析全路径,提取path和key
394    /*
395    /a -> / + a
396    /a/b/ -> /a + b
397    / -> Err
398    */
399    pub fn parse_full_path(full_path: &str) -> BuckyResult<(&str, &str)> {
400        let (path, key) = Self::parse_path_allow_empty_key(full_path)?;
401
402        let full_path = Self::fix_path(full_path)?;
403
404        if key.len() == 0 {
405            let msg = format!("invalid objectmap full path: {}", full_path);
406            error!("{}", msg);
407            return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
408        }
409
410        Ok((path, key))
411    }
412
413    pub async fn get_by_path(&self, full_path: &str) -> BuckyResult<Option<ObjectId>> {
414        let (path, key) = Self::parse_path_allow_empty_key(full_path)?;
415
416        self.get_by_key(path, key).await
417    }
418
419    pub async fn create_new_with_path(
420        &self,
421        full_path: &str,
422        content_type: ObjectMapSimpleContentType,
423    ) -> BuckyResult<()> {
424        let (path, key) = Self::parse_full_path(full_path)?;
425
426        self.create_new(path, key, content_type).await
427    }
428
429    pub async fn insert_with_path(&self, full_path: &str, value: &ObjectId) -> BuckyResult<()> {
430        let (path, key) = Self::parse_full_path(full_path)?;
431
432        self.insert_with_key(path, key, value).await
433    }
434
435    pub async fn set_with_path(
436        &self,
437        full_path: &str,
438        value: &ObjectId,
439        prev_value: &Option<ObjectId>,
440        auto_insert: bool,
441    ) -> BuckyResult<Option<ObjectId>> {
442        let (path, key) = Self::parse_full_path(full_path)?;
443
444        self.set_with_key(path, key, value, prev_value, auto_insert)
445            .await
446    }
447
448    pub async fn remove_with_path(
449        &self,
450        full_path: &str,
451        prev_value: &Option<ObjectId>,
452    ) -> BuckyResult<Option<ObjectId>> {
453        let (path, key) = Self::parse_full_path(full_path)?;
454
455        self.remove_with_key(path, key, prev_value).await
456    }
457
458    // map methods
459    pub async fn create_new(
460        &self,
461        path: &str,
462        key: &str,
463        content_type: ObjectMapSimpleContentType,
464    ) -> BuckyResult<()> {
465        // 创建事务
466        let param = CreateNewParam {
467            key: key.to_owned(),
468            content_type,
469        };
470        let op_data = CreateNewOpData {
471            path: path.to_owned(),
472            param,
473            state: None,
474        };
475
476        let ret = self.create_new_op(&op_data).await?;
477
478        // insert不需要保存状态,只要插入成功,那么状态就认为是一致的
479
480        if let Some(write_ops) = &self.write_ops {
481            write_ops.append_op(ObjectMapWriteOp::CreateNew(op_data));
482        }
483
484        Ok(ret)
485    }
486
487    async fn create_new_op(&self, op_data: &CreateNewOpData) -> BuckyResult<()> {
488        // 首先获取路径上的所有ObjectMap(空目录自动创建)
489        let ret = self
490            .create_object_map(
491                &op_data.path,
492                ObjectMapSimpleContentType::Map,
493                ObjectMapCreateStrategy::CreateIfNotExists,
494            )
495            .await?;
496        let mut obj_map_list = ret.unwrap();
497        assert!(obj_map_list.len() > 0);
498
499        // create_new不需要保存旧值,因为如果存在旧值,那么会直接失败;只有为空才可以创建成功
500        obj_map_list
501            .last_mut()
502            .unwrap()
503            .obj_map
504            .get_or_create_child_object_map(
505                &self.obj_map_cache,
506                &op_data.param.key,
507                op_data.param.content_type,
508                ObjectMapCreateStrategy::CreateNew,
509                None,
510            )
511            .await?;
512
513        let list = self.update_path_obj_map_list(obj_map_list).await?;
514        self.flush_path_obj_map_list(list)?;
515
516        Ok(())
517    }
518
519    pub async fn get_by_key(&self, path: &str, key: &str) -> BuckyResult<Option<ObjectId>> {
520        let ret = self.get_object_map(path).await?;
521        if ret.is_none() {
522            info!(
523                "get value from path but objectmap not found! path={}, key={}",
524                path, key
525            );
526            return Ok(None);
527        }
528
529        // without key, return the path last node
530        if key.len() == 0 {
531            let obj_map = ret.as_ref().unwrap().lock().await;
532            return Ok(obj_map.cached_object_id());
533        }
534
535        let ret = ret.unwrap();
536        let obj_map = ret.lock().await;
537        obj_map.get_by_key(&self.obj_map_cache, key).await
538    }
539
540    pub async fn insert_with_key(
541        &self,
542        path: &str,
543        key: &str,
544        value: &ObjectId,
545    ) -> BuckyResult<()> {
546        // 创建事务
547        let param = InsertWithKeyParam {
548            key: key.to_owned(),
549            value: value.to_owned(),
550        };
551        let op_data = InsertWithKeyOpData {
552            path: path.to_owned(),
553            param,
554            state: None,
555        };
556
557        let ret = self.insert_with_key_op(&op_data).await?;
558
559        // insert不需要保存状态,只要插入成功,那么状态就认为是一致的
560
561        if let Some(write_ops) = &self.write_ops {
562            write_ops.append_op(ObjectMapWriteOp::InsertWithKey(op_data));
563        }
564
565        Ok(ret)
566    }
567
568    async fn insert_with_key_op(&self, op_data: &InsertWithKeyOpData) -> BuckyResult<()> {
569        // 首先获取路径上的所有ObjectMap(空目录自动创建)
570        let ret = self
571            .create_object_map(
572                &op_data.path,
573                ObjectMapSimpleContentType::Map,
574                ObjectMapCreateStrategy::CreateIfNotExists,
575            )
576            .await?;
577        let mut obj_map_list = ret.unwrap();
578        assert!(obj_map_list.len() > 0);
579
580        // insert_with_key不需要保存旧值,因为如果存在旧值,那么会直接失败;只有为空才可以插入成功
581        obj_map_list
582            .last_mut()
583            .unwrap()
584            .obj_map
585            .insert_with_key(
586                &self.obj_map_cache,
587                &op_data.param.key,
588                &op_data.param.value,
589            )
590            .await?;
591
592        let list = self.update_path_obj_map_list(obj_map_list).await?;
593        self.flush_path_obj_map_list(list)?;
594
595        Ok(())
596    }
597
598    pub async fn set_with_key(
599        &self,
600        path: &str,
601        key: &str,
602        value: &ObjectId,
603        prev_value: &Option<ObjectId>,
604        auto_insert: bool,
605    ) -> BuckyResult<Option<ObjectId>> {
606        // 创建事务
607        let param = SetWithKeyParam {
608            key: key.to_owned(),
609            value: value.to_owned(),
610            prev_value: prev_value.to_owned(),
611            auto_insert,
612        };
613
614        let mut op_data = SetWithKeyOpData {
615            path: path.to_owned(),
616            param,
617            state: None,
618        };
619
620        let ret = self.set_with_key_op(&op_data).await?;
621
622        // 保存状态
623        if let Some(write_ops) = &self.write_ops {
624            let state = ObjectMapKeyState { value: ret.clone() };
625            op_data.state = Some(state);
626
627            write_ops.append_op(ObjectMapWriteOp::SetWithKey(op_data));
628        }
629
630        Ok(ret)
631    }
632
633    async fn set_with_key_op(&self, op_data: &SetWithKeyOpData) -> BuckyResult<Option<ObjectId>> {
634        // 首先获取路径上的所有ObjectMap(空目录自动创建)
635
636        let create_strategy = if op_data.param.auto_insert {
637            ObjectMapCreateStrategy::CreateIfNotExists
638        } else {
639            ObjectMapCreateStrategy::NotCreate
640        };
641
642        let obj_map_list = self
643            .create_object_map(
644                &op_data.path,
645                ObjectMapSimpleContentType::Map,
646                create_strategy,
647            )
648            .await?;
649        if obj_map_list.is_none() {
650            // 如果auto_insert=false,并且路径不存在,那么直接返回Err(NotFound)
651            let msg = format!(
652                "set_with_key but path not found! path={}, value={}",
653                op_data.path, op_data.param.value,
654            );
655            error!("{}", msg);
656            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
657        }
658
659        let mut obj_map_list = obj_map_list.unwrap();
660        assert!(obj_map_list.len() > 0);
661
662        // set_with_key存在以下几种情况:
663        // 1. 当前slot为空,auto_insert=false,那么直接返回Err(NotFound)
664        // 2. 当前slot为空,auto_insert=true,那么操作成功,返回Ok(None)
665        // 3. 当前slot不为空,prev_value=None, 那么操作成功,返回Ok(prev_value)
666        // 4. 当前slot不为空, prev_value!=None, 那么只有当前value和prev_value匹配,才成功,并且返回当前值;否则返回Err(Unmatch)
667        let ret = obj_map_list
668            .last_mut()
669            .unwrap()
670            .obj_map
671            .set_with_key(
672                &self.obj_map_cache,
673                &op_data.param.key,
674                &op_data.param.value,
675                &op_data.param.prev_value,
676                op_data.param.auto_insert,
677            )
678            .await?;
679
680        // 判断状态是否一致
681        if let Some(state) = &op_data.state {
682            if ret != state.value {
683                let msg = format!(
684                    "set_with_key with path commit but state conflict! op_data={:?}, ret={:?}",
685                    op_data, ret,
686                );
687                warn!("{}", msg);
688                return Err(BuckyError::new(BuckyErrorCode::Conflict, msg));
689            }
690        }
691
692        if ret != Some(op_data.param.value) {
693            let list = self.update_path_obj_map_list(obj_map_list).await?;
694            self.flush_path_obj_map_list(list)?;
695        }
696
697        Ok(ret)
698    }
699
700    pub async fn remove_with_key(
701        &self,
702        path: &str,
703        key: &str,
704        prev_value: &Option<ObjectId>,
705    ) -> BuckyResult<Option<ObjectId>> {
706        // 创建事务
707        let param = RemoveWithKeyParam {
708            key: key.to_owned(),
709            prev_value: prev_value.to_owned(),
710        };
711        let mut op_data = RemoveWithKeyOpData {
712            path: path.to_owned(),
713            param,
714            state: None,
715        };
716
717        let ret = self.remove_with_key_op(&op_data).await?;
718
719        // 保存状态
720        if let Some(write_ops) = &self.write_ops {
721            let state = ObjectMapKeyState { value: ret.clone() };
722            op_data.state = Some(state);
723
724            write_ops.append_op(ObjectMapWriteOp::RemoveWithKey(op_data));
725        }
726
727        Ok(ret)
728    }
729
730    async fn remove_with_key_op(
731        &self,
732        op_data: &RemoveWithKeyOpData,
733    ) -> BuckyResult<Option<ObjectId>> {
734        let (ret, obj_map_list) = loop {
735            let ret = self
736                .create_object_map(
737                    &op_data.path,
738                    ObjectMapSimpleContentType::Map,
739                    ObjectMapCreateStrategy::NotCreate,
740                )
741                .await?;
742
743            // 所在目录不存在,那么直接返回不存在即可
744            if ret.is_none() {
745                debug!(
746                    "objectmap path remove_with_key but path not found! root={}, path={}, key={}",
747                    self.root(),
748                    op_data.path,
749                    op_data.param.key,
750                );
751
752                break (None, None);
753            }
754
755            let mut obj_map_list = ret.unwrap();
756            assert!(obj_map_list.len() > 0);
757
758            // 发起真正的remove操作
759            let ret = obj_map_list
760                .last_mut()
761                .unwrap()
762                .obj_map
763                .remove_with_key(
764                    &self.obj_map_cache,
765                    &op_data.param.key,
766                    &op_data.param.prev_value,
767                )
768                .await?;
769
770            info!(
771                "objectmap path remove_with_key success! root={}, path={}, key={}, value={:?}",
772                self.root(),
773                op_data.path,
774                op_data.param.key,
775                ret
776            );
777            break (ret, Some(obj_map_list));
778        };
779
780        // 判断状态是否一致
781        if let Some(state) = &op_data.state {
782            if ret != state.value {
783                let msg = format!(
784                    "remove_with_key from path commit but state conflict! op_data={:?}, ret={:?}",
785                    op_data, ret,
786                );
787                warn!("{}", msg);
788                return Err(BuckyError::new(BuckyErrorCode::Conflict, msg));
789            }
790        }
791
792        if ret.is_none() {
793            return Ok(None);
794        }
795
796        // 内容改变了,需要更新整个路径
797        let list = self.update_path_obj_map_list(obj_map_list.unwrap()).await?;
798        self.flush_path_obj_map_list(list)?;
799
800        Ok(ret)
801    }
802
803    // set methods
804    pub async fn contains(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
805        let ret = self.get_object_map(path).await?;
806
807        if ret.is_none() {
808            let msg = format!(
809                "contains from path but objectmap not found! path={}, value={}",
810                path, object_id,
811            );
812            error!("{}", msg);
813            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
814        }
815
816        let ret = ret.unwrap();
817        let obj_map = ret.lock().await;
818        obj_map.contains(&self.obj_map_cache, object_id).await
819    }
820
821    pub async fn insert(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
822        // 创建事务
823        let param = InsertParam {
824            value: object_id.to_owned(),
825        };
826        let mut op_data = InsertOpData {
827            path: path.to_owned(),
828            param,
829            state: None,
830        };
831
832        let ret = self.insert_op(&op_data).await?;
833
834        // 保存现有状态
835        if let Some(write_ops) = &self.write_ops {
836            op_data.state = Some(ret);
837
838            write_ops.append_op(ObjectMapWriteOp::Insert(op_data));
839        }
840
841        Ok(ret)
842    }
843
844    async fn insert_op(&self, op_data: &InsertOpData) -> BuckyResult<bool> {
845        let obj_map_list = self
846            .create_object_map(
847                &op_data.path,
848                ObjectMapSimpleContentType::Set,
849                ObjectMapCreateStrategy::CreateIfNotExists,
850            )
851            .await?;
852
853        let mut obj_map_list = obj_map_list.unwrap();
854        assert!(obj_map_list.len() > 0);
855
856        // 发起真正的insert操作
857        let ret = obj_map_list
858            .last_mut()
859            .unwrap()
860            .obj_map
861            .insert(&self.obj_map_cache, &op_data.param.value)
862            .await?;
863        // 如果事务是带状态的,那么需要校验一次状态
864        if let Some(state) = &op_data.state {
865            if *state != ret {
866                let msg = format!(
867                    "insert to path commit but state conflict! op_data={:?}",
868                    op_data,
869                );
870                warn!("{}", msg);
871                return Err(BuckyError::new(BuckyErrorCode::Conflict, msg));
872            }
873        }
874
875        // 值不存在,插入成功,需要更新路径
876        if ret {
877            // 内容改变了,需要更新整个路径
878            let list = self.update_path_obj_map_list(obj_map_list).await?;
879            self.flush_path_obj_map_list(list)?;
880        }
881
882        Ok(ret)
883    }
884
885    pub async fn remove(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
886        // 创建事务
887        let param = RemoveParam {
888            value: object_id.to_owned(),
889        };
890        let mut op_data = RemoveOpData {
891            path: path.to_owned(),
892            param,
893            state: None,
894        };
895
896        let ret = self.remove_op(&op_data).await?;
897
898        // 保存状态
899        if let Some(write_ops) = &self.write_ops {
900            op_data.state = Some(ret);
901
902            write_ops.append_op(ObjectMapWriteOp::Remove(op_data));
903        }
904
905        Ok(ret)
906    }
907
908    async fn remove_op(&self, op_data: &RemoveOpData) -> BuckyResult<bool> {
909        let ret = self
910            .create_object_map(
911                &op_data.path,
912                ObjectMapSimpleContentType::Set,
913                ObjectMapCreateStrategy::NotCreate,
914            )
915            .await?;
916
917        // 所在目录不存在,那么直接返回错误
918        if ret.is_none() {
919            let msg = format!(
920                "remove but path not found! path={}, value={}",
921                op_data.path, op_data.param.value,
922            );
923            error!("{}", msg);
924            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
925        }
926
927        let mut obj_map_list = ret.unwrap();
928        assert!(obj_map_list.len() > 0);
929
930        // 发起真正的remove操作
931        let ret = obj_map_list
932            .last_mut()
933            .unwrap()
934            .obj_map
935            .remove(&self.obj_map_cache, &op_data.param.value)
936            .await?;
937
938        // 如果事务是带状态的,那么需要校验一次状态
939        if let Some(state) = &op_data.state {
940            if *state != ret {
941                let msg = format!(
942                    "remove from path commit but state conflict! op_data={:?}",
943                    op_data,
944                );
945                warn!("{}", msg);
946                return Err(BuckyError::new(BuckyErrorCode::Conflict, msg));
947            }
948        }
949
950        if ret {
951            // 内容改变了,需要更新整个路径
952            let list = self.update_path_obj_map_list(obj_map_list).await?;
953            self.flush_path_obj_map_list(list)?;
954        }
955
956        Ok(ret)
957    }
958
959    pub fn clear_op_list(&self) {
960        if let Some(write_ops) = &self.write_ops {
961            let _ = write_ops.fetch_all();
962        }
963    }
964
965    // 提交操作列表,用以实现事务的commit
966    pub async fn commit_op_list(&self) -> BuckyResult<()> {
967        let op_list = self.write_ops.as_ref().unwrap().fetch_all();
968
969        for op_data in op_list {
970            self.commit_op(op_data).await?;
971        }
972
973        Ok(())
974    }
975
976    async fn commit_op(&self, op: ObjectMapWriteOp) -> BuckyResult<()> {
977        match op {
978            ObjectMapWriteOp::CreateNew(op_data) => {
979                self.create_new_op(&op_data).await?;
980            }
981            ObjectMapWriteOp::InsertWithKey(op_data) => {
982                self.insert_with_key_op(&op_data).await?;
983            }
984            ObjectMapWriteOp::SetWithKey(op_data) => {
985                self.set_with_key_op(&op_data).await?;
986            }
987            ObjectMapWriteOp::RemoveWithKey(op_data) => {
988                self.remove_with_key_op(&op_data).await?;
989            }
990
991            ObjectMapWriteOp::Insert(op_data) => {
992                self.insert_op(&op_data).await?;
993            }
994            ObjectMapWriteOp::Remove(op_data) => {
995                self.remove_op(&op_data).await?;
996            }
997        }
998
999        Ok(())
1000    }
1001}
1002
#[cfg(test)]
mod test_path {
    use super::super::cache::*;
    use super::super::path_iterator::*;
    use super::*;

    use std::str::FromStr;

    // Logs the listing that `ObjectMapPath::list` returns for `path`.
    async fn dump_path(item: &ObjectMapPath, path: &str) {
        let listing = item.list(path).await.unwrap();
        info!("dump path={} as follows:", path);
        info!("{}", listing);
    }

    // Dumps every level from the root down to "/a/b/c".
    async fn dump_levels(item: &ObjectMapPath) {
        for level in &["/", "/a", "/a/b", "/a/b/c"] {
            dump_path(item, level).await;
        }
    }

    // Exercises insert / get / set / remove / create_new on a nested path.
    async fn test_path1(path: &ObjectMapPath) {
        const DIR: &str = "/a/b/c";
        const PARENT: &str = "/a/b";
        const KEY: &str = "x1";
        const FULL: &str = "/a/b/c/x1";

        let first = ObjectId::from_str("5aSixgPg3hDa1oU9eAtRcKTyVKg5X2bVXWPVhk3U5c7G").unwrap();
        let second = ObjectId::from_str("5aSixgPCivmQfASRbjAvBiwgxhU8LrNtYtC2D6Lis2NQ").unwrap();

        path.insert_with_key("/", KEY, &first).await.unwrap();

        // Nothing exists below /a/b/c yet, by key or by full path.
        let found = path.get_by_key(DIR, KEY).await.unwrap();
        assert_eq!(found, None);

        let found = path.get_by_path(FULL).await.unwrap();
        assert_eq!(found, None);

        path.insert_with_key(DIR, KEY, &first).await.unwrap();

        // Inserting the same entry through its full path must be rejected.
        let err = path.insert_with_path(FULL, &first).await.unwrap_err();
        assert_eq!(err.code(), BuckyErrorCode::AlreadyExists);

        let found = path.get_by_key(DIR, KEY).await.unwrap();
        assert_eq!(found, Some(first));
        let found = path.get_by_path(FULL).await.unwrap();
        assert_eq!(found, Some(first));

        dump_levels(path).await;
        let found = path.get_by_key(DIR, KEY).await.unwrap();
        assert_eq!(found, Some(first));

        // Inserting a key that already exists returns an error.
        let err = path.insert_with_key(DIR, KEY, &first).await.unwrap_err();
        assert_eq!(err.code(), BuckyErrorCode::AlreadyExists);

        // set_with_key: a wrong previous value is rejected ...
        let err = path
            .set_with_key(DIR, KEY, &second, &Some(second), false)
            .await
            .unwrap_err();
        assert_eq!(err.code(), BuckyErrorCode::Unmatch);

        // ... while the matching previous value is accepted and handed back.
        let previous = path
            .set_with_key(DIR, KEY, &second, &Some(first), false)
            .await
            .unwrap();
        assert_eq!(previous, Some(first));

        // Removal: a stale previous value is rejected ...
        let err = path
            .remove_with_key(DIR, KEY, &Some(first))
            .await
            .unwrap_err();
        assert_eq!(err.code(), BuckyErrorCode::Unmatch);

        // ... and an unconditional removal returns the current value.
        let removed = path.remove_with_key(DIR, KEY, &None).await.unwrap();
        assert_eq!(removed, Some(second));

        // set_with_key again: without auto-insert the missing key is an error.
        let err = path
            .set_with_key(DIR, KEY, &second, &None, false)
            .await
            .unwrap_err();
        assert_eq!(err.code(), BuckyErrorCode::NotFound);

        // With auto-insert enabled x1 is created.
        let previous = path
            .set_with_key(DIR, KEY, &second, &None, true)
            .await
            .unwrap();
        assert_eq!(previous, None);

        let found = path.get_by_key(DIR, KEY).await.unwrap();
        assert_eq!(found, Some(second));

        let removed = path.remove_with_key(DIR, KEY, &None).await.unwrap();
        assert_eq!(removed, Some(second));

        let found = path.get_by_key(DIR, KEY).await.unwrap();
        assert_eq!(found, None);

        // The sub map "c" itself is still registered under /a/b.
        let c_id = path.get_by_key(PARENT, "c").await.unwrap().unwrap();
        info!("/a/b/c={}", c_id);

        dump_levels(path).await;

        let removed = path.remove_with_key(PARENT, "c", &None).await.unwrap();
        assert_eq!(removed, Some(c_id));

        let found = path.get_by_key(DIR, KEY).await.unwrap();
        assert_eq!(found, None);

        let found = path.get_by_key(PARENT, "c").await.unwrap();
        assert_eq!(found, None);

        let found = path.get_by_key(DIR, KEY).await.unwrap();
        assert_eq!(found, None);

        let found = path.get_by_path("/").await.unwrap();
        assert!(found.is_some());

        path.create_new(PARENT, "c", ObjectMapSimpleContentType::Set)
            .await
            .unwrap();

        // Creating the same entry again fails, no matter how often it is retried.
        for _ in 0..2 {
            match path
                .create_new(PARENT, "c", ObjectMapSimpleContentType::Set)
                .await
            {
                Err(e) => assert_eq!(e.code(), BuckyErrorCode::AlreadyExists),
                Ok(_) => unreachable!(),
            }
        }

        let found = path.get_by_key(PARENT, "c").await.unwrap();
        assert!(found.is_some());
    }

    // Builds an in-memory cache with an empty root, runs `test_path1` on it and walks the
    // resulting tree before and after a gc pass.
    async fn test_path() {
        let noc = ObjectMapMemoryNOCCache::new();
        let root_cache = ObjectMapRootMemoryCache::new_default_ref(None, noc);
        let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());

        // Create an empty ObjectMap that serves as the root.
        let owner = ObjectId::default();
        let empty_root = ObjectMap::new(
            ObjectMapSimpleContentType::Map,
            Some(owner.clone()),
            Some(owner.clone()),
        )
        .no_create_time()
        .build();
        let initial_root_id = empty_root.flush_id();
        cache
            .put_object_map(&initial_root_id, empty_root, None)
            .unwrap();
        info!("new root: {}", initial_root_id);

        let path = ObjectMapPath::new(initial_root_id.clone(), cache.clone(), true);
        test_path1(&path).await;

        let opt = ObjectMapPathIteratorOption::new(true, true);

        // First walk: iterate the tree reachable from the current root.
        let current_root = path.root();
        let root_obj = cache.get_object_map(&current_root).await.unwrap();
        let mut walker =
            ObjectMapPathIterator::new(root_obj.unwrap(), cache.clone(), opt.clone()).await;
        loop {
            if walker.is_end() {
                break;
            }
            let page = walker.next(5).await.unwrap();
            info!("list: {} {:?}", 1, page.list);
        }

        let final_root_id = path.root();
        info!("result root: {}", final_root_id);

        cache.gc(false, &final_root_id).await.unwrap();

        // Second walk: the tree must still be iterable after the gc call.
        let root_obj = cache.get_object_map(&final_root_id).await.unwrap();
        let mut walker =
            ObjectMapPathIterator::new(root_obj.unwrap(), cache.clone(), opt.clone()).await;
        loop {
            if walker.is_end() {
                break;
            }
            let page = walker.next(5).await.unwrap();
            info!("list: {} {:?}", 1, page.list);
        }
    }

    #[test]
    fn test_full_path() {
        // Parsing the bare root must fail.
        ObjectMapPath::parse_full_path("/").unwrap_err();

        // (full path, expected parent path, expected key)
        let cases = [
            ("/a", "/", "a"),
            ("/a/", "/", "a"),
            ("/a/b", "/a", "b"),
            ("/eeee/eeee", "/eeee", "eeee"),
            ("/eeee/eeee/", "/eeee", "eeee"),
        ];

        for &(full, want_path, want_key) in cases.iter() {
            let (path, key) = ObjectMapPath::parse_full_path(full).unwrap();
            assert_eq!(path, want_path);
            assert_eq!(key, want_key);
        }
    }

    #[test]
    fn test() {
        crate::init_simple_log("test-object-map-path", Some("debug"));
        test_full_path();
        async_std::task::block_on(test_path());
    }
}