1use super::cache::*;
2use super::check::*;
3use super::iterator::*;
4use super::object_map::*;
5use super::op::*;
6use crate::*;
7
8use std::sync::{Arc, Mutex};
9
10pub struct ObjectMapPath {
12 root: Arc<Mutex<ObjectId>>,
13 obj_map_cache: ObjectMapOpEnvCacheRef,
14
15 write_ops: Option<ObjectMapOpList>,
17}
18
19struct ObjectMapPathSeg {
20 obj_map: ObjectMap,
21 seg: Option<String>,
22}
23
24impl std::fmt::Debug for ObjectMapPathSeg {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 write!(f, "[{:?},{:?}]", self.seg, self.obj_map.cached_object_id())
27 }
28}
29
30impl ObjectMapPath {
31 pub fn new(
32 root: ObjectId,
33 obj_map_cache: ObjectMapOpEnvCacheRef,
34 enable_transaction: bool,
35 ) -> Self {
36 Self {
37 root: Arc::new(Mutex::new(root)),
38 obj_map_cache,
39 write_ops: if enable_transaction {
40 Some(ObjectMapOpList::new())
41 } else {
42 None
43 },
44 }
45 }
46
47 pub fn root(&self) -> ObjectId {
49 self.root.lock().unwrap().clone()
50 }
51
52 pub fn update_root(&self, root_id: ObjectId, prev_id: &ObjectId) -> BuckyResult<()> {
53 let mut root = self.root.lock().unwrap();
54 if *root != *prev_id {
55 let msg = format!(
56 "update root but unmatch! current={}, prev={}, new={}",
57 *root, prev_id, root_id
58 );
59 error!("{}", msg);
60 return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
61 }
62
63 info!("objectmap path root updated! {} -> {}", *root, root_id);
64 *root = root_id;
65 Ok(())
66 }
67
68 async fn get_root(&self) -> BuckyResult<ObjectMapRef> {
69 let root_id = self.root();
70 let ret = self.obj_map_cache.get_object_map(&root_id).await?;
71 if ret.is_none() {
72 let msg = format!("load root object but not found! id={}", root_id);
73 error!("{}", msg);
74 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
75 }
76
77 Ok(ret.unwrap())
78 }
79
80 fn fix_path(path: &str) -> BuckyResult<&str> {
86 let path = path.trim();
87 if path == "/" {
88 return Ok(path);
89 }
90
91 let path = match path.rsplit_once('?') {
93 Some((path, _)) => path,
94 None => path,
95 };
96
97 let path_ret = path.trim_end_matches("/");
99 if !path_ret.starts_with("/") {
100 let msg = format!("invalid objectmap path format! path={}", path);
101 error!("{}", msg);
102 return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
103 }
104
105 Ok(path_ret)
106 }
107
108 async fn get_object_map(&self, path: &str) -> BuckyResult<Option<ObjectMapRef>> {
110 let mut current = self.get_root().await?;
111
112 let path = Self::fix_path(path)?;
113 if path == "/" {
115 return Ok(Some(current));
116 }
117
118 let parts = path.split("/").skip(1);
120 for part in parts {
121 ObjectMapChecker::check_key_value(part)?;
122
123 let sub = current
124 .lock()
125 .await
126 .get_or_create_child_object_map(
127 &self.obj_map_cache,
128 part,
129 ObjectMapSimpleContentType::Map,
130 ObjectMapCreateStrategy::NotCreate,
131 None,
132 )
133 .await
134 .map_err(|e| {
135 let msg = format!(
136 "get object by path error! path={}, part={}, {}",
137 path, part, e
138 );
139 error!("{}", msg);
140 BuckyError::new(e.code(), msg)
141 })?;
142
143 if sub.is_none() {
144 let msg = format!(
145 "get object by path but not found! path={}, part={}",
146 path, part
147 );
148 warn!("{}", msg);
149 return Ok(None);
150 }
151
152 current = sub.unwrap();
153 debug!(
154 "get objectmap path seg: {}={:?}",
155 part,
156 current.lock().await.cached_object_id()
157 );
158 }
159
160 Ok(Some(current))
161 }
162
163 async fn create_object_map(
165 &self,
166 path: &str,
167 content_type: ObjectMapSimpleContentType,
168 auto_create: ObjectMapCreateStrategy,
169 ) -> BuckyResult<Option<Vec<ObjectMapPathSeg>>> {
170 let root = self.get_root().await?;
171 let current = root.lock().await.clone();
172
173 let path = Self::fix_path(path)?;
174
175 let root_seg = ObjectMapPathSeg {
176 obj_map: current,
177 seg: None,
178 };
179
180 let mut obj_list = vec![root_seg];
181
182 if path == "/" {
184 trace!("object map path list: path={}, list={:?}", path, obj_list);
185 return Ok(Some(obj_list));
186 }
187
188 let parts: Vec<&str> = path.split("/").skip(1).collect();
190 for (index, &part) in parts.iter().enumerate() {
191 ObjectMapChecker::check_key_value(part)?;
192
193 let is_last_part = index == parts.len() - 1;
194 let content_type = if is_last_part {
196 content_type.clone()
197 } else {
198 ObjectMapSimpleContentType::Map
199 };
200
201 let create_strategy = match auto_create {
202 ObjectMapCreateStrategy::CreateIfNotExists => {
203 ObjectMapCreateStrategy::CreateIfNotExists
204 }
205 ObjectMapCreateStrategy::NotCreate => ObjectMapCreateStrategy::NotCreate,
206 ObjectMapCreateStrategy::CreateNew => {
207 if is_last_part {
209 ObjectMapCreateStrategy::CreateNew
210 } else {
211 ObjectMapCreateStrategy::CreateIfNotExists
212 }
213 }
214 };
215
216 let sub = obj_list
217 .last_mut()
218 .unwrap()
219 .obj_map
220 .get_or_create_child_object_map(&self.obj_map_cache, part, content_type, create_strategy, None)
221 .await
222 .map_err(|e| {
223 let msg = format!(
224 "get or create object by path error! path={}, part={}, create_strategy={:?}, {}",
225 path, part, create_strategy, e
226 );
227 error!("{}", msg);
228 BuckyError::new(e.code(), msg)
229 })?;
230
231 if sub.is_none() {
232 let msg = format!(
233 "get object by path but not found! path={}, part={}",
234 path, part
235 );
236 warn!("{}", msg);
237 return Ok(None);
238 }
239
240 let current = sub.unwrap().lock().await.clone();
242 let current_seq = ObjectMapPathSeg {
243 obj_map: current,
244 seg: Some(part.to_owned()),
245 };
246
247 obj_list.push(current_seq);
248 }
249
250 debug!("object map path list: path={}, list={:?}", path, obj_list);
251
252 Ok(Some(obj_list))
253 }
254
255 async fn update_path_obj_map_list(
256 &self,
257 mut obj_map_list: Vec<ObjectMapPathSeg>,
258 ) -> BuckyResult<Vec<(ObjectMap, ObjectId)>> {
259 assert!(!obj_map_list.is_empty());
260
261 let mut current_obj_map = obj_map_list.pop().unwrap();
262 let mut new_obj_map_list = vec![];
263
264 loop {
266 let prev_id = current_obj_map.obj_map.cached_object_id().unwrap();
268 let current_id = current_obj_map.obj_map.flush_id();
269 assert_ne!(prev_id, current_id);
270
271 trace!(
272 "update objectmap path seg: seg={:?}, {} -> {}",
273 current_obj_map.seg, prev_id, current_id
274 );
275
276 new_obj_map_list.push((current_obj_map.obj_map, prev_id.clone()));
278
279 if obj_map_list.is_empty() {
280 break;
281 }
282
283 let seg = current_obj_map.seg.unwrap();
285 assert!(seg.len() > 0);
286
287 let mut parent_obj_map = obj_map_list.pop().unwrap();
288 parent_obj_map
289 .obj_map
290 .set_with_key(
291 &self.obj_map_cache,
292 &seg,
293 ¤t_id,
294 &Some(prev_id),
295 false,
296 )
297 .await
298 .map_err(|e| e)?;
299
300 current_obj_map = parent_obj_map;
301 }
302
303 Ok(new_obj_map_list)
304 }
305
306 fn flush_path_obj_map_list(&self, obj_map_list: Vec<(ObjectMap, ObjectId)>) -> BuckyResult<()> {
308 let count = obj_map_list.len();
309
310 for (index, (obj_map, prev_id)) in obj_map_list.into_iter().enumerate() {
312 let current_id = obj_map.cached_object_id().unwrap();
314 assert_ne!(current_id, prev_id);
315
316 self.obj_map_cache
317 .put_object_map(¤t_id, obj_map, None)?;
318
319 if index + 1 == count {
320 self.update_root(current_id, &prev_id)?;
321 }
322
323 }
328
329 Ok(())
330 }
331
332 pub async fn metadata(&self, path: &str) -> BuckyResult<ObjectMapMetaData> {
333 let ret = self.get_object_map(path).await?;
334 if ret.is_none() {
335 let msg = format!("get value from path but objectmap not found! path={}", path);
336 warn!("{}", msg);
337 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
338 }
339
340 let ret = ret.unwrap();
341 let obj = ret.lock().await;
342 Ok(obj.metadata())
343 }
344
345 pub async fn list(&self, path: &str) -> BuckyResult<ObjectMapContentList> {
346 let ret = self.get_object_map(path).await?;
347 if ret.is_none() {
348 let msg = format!("get value from path but objectmap not found! path={}", path);
349 warn!("{}", msg);
350 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
351 }
352
353 let item = ret.unwrap();
354 let obj = item.lock().await;
355 let mut list = ObjectMapContentList::new(obj.count() as usize);
356 obj.list(&self.obj_map_cache, &mut list).await?;
357 Ok(list)
358 }
359
360 pub fn parse_path_allow_empty_key(full_path: &str) -> BuckyResult<(&str, &str)> {
361 let full_path = Self::fix_path(full_path)?;
362
363 if full_path == "/" {
364 return Ok((full_path, ""));
365 }
366
367 let mut path_segs: Vec<&str> = full_path.split("/").collect();
368 if path_segs.len() < 2 {
369 let msg = format!("invalid objectmap full path: {}", full_path);
370 error!("{}", msg);
371 return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
372 }
373
374 let key = path_segs.pop().unwrap();
375 let trim_len = if path_segs.len() > 1 {
376 key.len() + 1
377 } else {
378 key.len()
379 };
380
381 let path = &full_path[..(full_path.len() - trim_len)];
382 if path.len() == 0 {
383 let msg = format!("invalid objectmap full path: {}", full_path);
384 error!("{}", msg);
385 return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
386 }
387
388 Ok((path, key))
389 }
390
391 pub fn parse_full_path(full_path: &str) -> BuckyResult<(&str, &str)> {
400 let (path, key) = Self::parse_path_allow_empty_key(full_path)?;
401
402 let full_path = Self::fix_path(full_path)?;
403
404 if key.len() == 0 {
405 let msg = format!("invalid objectmap full path: {}", full_path);
406 error!("{}", msg);
407 return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
408 }
409
410 Ok((path, key))
411 }
412
413 pub async fn get_by_path(&self, full_path: &str) -> BuckyResult<Option<ObjectId>> {
414 let (path, key) = Self::parse_path_allow_empty_key(full_path)?;
415
416 self.get_by_key(path, key).await
417 }
418
419 pub async fn create_new_with_path(
420 &self,
421 full_path: &str,
422 content_type: ObjectMapSimpleContentType,
423 ) -> BuckyResult<()> {
424 let (path, key) = Self::parse_full_path(full_path)?;
425
426 self.create_new(path, key, content_type).await
427 }
428
429 pub async fn insert_with_path(&self, full_path: &str, value: &ObjectId) -> BuckyResult<()> {
430 let (path, key) = Self::parse_full_path(full_path)?;
431
432 self.insert_with_key(path, key, value).await
433 }
434
435 pub async fn set_with_path(
436 &self,
437 full_path: &str,
438 value: &ObjectId,
439 prev_value: &Option<ObjectId>,
440 auto_insert: bool,
441 ) -> BuckyResult<Option<ObjectId>> {
442 let (path, key) = Self::parse_full_path(full_path)?;
443
444 self.set_with_key(path, key, value, prev_value, auto_insert)
445 .await
446 }
447
448 pub async fn remove_with_path(
449 &self,
450 full_path: &str,
451 prev_value: &Option<ObjectId>,
452 ) -> BuckyResult<Option<ObjectId>> {
453 let (path, key) = Self::parse_full_path(full_path)?;
454
455 self.remove_with_key(path, key, prev_value).await
456 }
457
458 pub async fn create_new(
460 &self,
461 path: &str,
462 key: &str,
463 content_type: ObjectMapSimpleContentType,
464 ) -> BuckyResult<()> {
465 let param = CreateNewParam {
467 key: key.to_owned(),
468 content_type,
469 };
470 let op_data = CreateNewOpData {
471 path: path.to_owned(),
472 param,
473 state: None,
474 };
475
476 let ret = self.create_new_op(&op_data).await?;
477
478 if let Some(write_ops) = &self.write_ops {
481 write_ops.append_op(ObjectMapWriteOp::CreateNew(op_data));
482 }
483
484 Ok(ret)
485 }
486
487 async fn create_new_op(&self, op_data: &CreateNewOpData) -> BuckyResult<()> {
488 let ret = self
490 .create_object_map(
491 &op_data.path,
492 ObjectMapSimpleContentType::Map,
493 ObjectMapCreateStrategy::CreateIfNotExists,
494 )
495 .await?;
496 let mut obj_map_list = ret.unwrap();
497 assert!(obj_map_list.len() > 0);
498
499 obj_map_list
501 .last_mut()
502 .unwrap()
503 .obj_map
504 .get_or_create_child_object_map(
505 &self.obj_map_cache,
506 &op_data.param.key,
507 op_data.param.content_type,
508 ObjectMapCreateStrategy::CreateNew,
509 None,
510 )
511 .await?;
512
513 let list = self.update_path_obj_map_list(obj_map_list).await?;
514 self.flush_path_obj_map_list(list)?;
515
516 Ok(())
517 }
518
519 pub async fn get_by_key(&self, path: &str, key: &str) -> BuckyResult<Option<ObjectId>> {
520 let ret = self.get_object_map(path).await?;
521 if ret.is_none() {
522 info!(
523 "get value from path but objectmap not found! path={}, key={}",
524 path, key
525 );
526 return Ok(None);
527 }
528
529 if key.len() == 0 {
531 let obj_map = ret.as_ref().unwrap().lock().await;
532 return Ok(obj_map.cached_object_id());
533 }
534
535 let ret = ret.unwrap();
536 let obj_map = ret.lock().await;
537 obj_map.get_by_key(&self.obj_map_cache, key).await
538 }
539
540 pub async fn insert_with_key(
541 &self,
542 path: &str,
543 key: &str,
544 value: &ObjectId,
545 ) -> BuckyResult<()> {
546 let param = InsertWithKeyParam {
548 key: key.to_owned(),
549 value: value.to_owned(),
550 };
551 let op_data = InsertWithKeyOpData {
552 path: path.to_owned(),
553 param,
554 state: None,
555 };
556
557 let ret = self.insert_with_key_op(&op_data).await?;
558
559 if let Some(write_ops) = &self.write_ops {
562 write_ops.append_op(ObjectMapWriteOp::InsertWithKey(op_data));
563 }
564
565 Ok(ret)
566 }
567
568 async fn insert_with_key_op(&self, op_data: &InsertWithKeyOpData) -> BuckyResult<()> {
569 let ret = self
571 .create_object_map(
572 &op_data.path,
573 ObjectMapSimpleContentType::Map,
574 ObjectMapCreateStrategy::CreateIfNotExists,
575 )
576 .await?;
577 let mut obj_map_list = ret.unwrap();
578 assert!(obj_map_list.len() > 0);
579
580 obj_map_list
582 .last_mut()
583 .unwrap()
584 .obj_map
585 .insert_with_key(
586 &self.obj_map_cache,
587 &op_data.param.key,
588 &op_data.param.value,
589 )
590 .await?;
591
592 let list = self.update_path_obj_map_list(obj_map_list).await?;
593 self.flush_path_obj_map_list(list)?;
594
595 Ok(())
596 }
597
598 pub async fn set_with_key(
599 &self,
600 path: &str,
601 key: &str,
602 value: &ObjectId,
603 prev_value: &Option<ObjectId>,
604 auto_insert: bool,
605 ) -> BuckyResult<Option<ObjectId>> {
606 let param = SetWithKeyParam {
608 key: key.to_owned(),
609 value: value.to_owned(),
610 prev_value: prev_value.to_owned(),
611 auto_insert,
612 };
613
614 let mut op_data = SetWithKeyOpData {
615 path: path.to_owned(),
616 param,
617 state: None,
618 };
619
620 let ret = self.set_with_key_op(&op_data).await?;
621
622 if let Some(write_ops) = &self.write_ops {
624 let state = ObjectMapKeyState { value: ret.clone() };
625 op_data.state = Some(state);
626
627 write_ops.append_op(ObjectMapWriteOp::SetWithKey(op_data));
628 }
629
630 Ok(ret)
631 }
632
633 async fn set_with_key_op(&self, op_data: &SetWithKeyOpData) -> BuckyResult<Option<ObjectId>> {
634 let create_strategy = if op_data.param.auto_insert {
637 ObjectMapCreateStrategy::CreateIfNotExists
638 } else {
639 ObjectMapCreateStrategy::NotCreate
640 };
641
642 let obj_map_list = self
643 .create_object_map(
644 &op_data.path,
645 ObjectMapSimpleContentType::Map,
646 create_strategy,
647 )
648 .await?;
649 if obj_map_list.is_none() {
650 let msg = format!(
652 "set_with_key but path not found! path={}, value={}",
653 op_data.path, op_data.param.value,
654 );
655 error!("{}", msg);
656 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
657 }
658
659 let mut obj_map_list = obj_map_list.unwrap();
660 assert!(obj_map_list.len() > 0);
661
662 let ret = obj_map_list
668 .last_mut()
669 .unwrap()
670 .obj_map
671 .set_with_key(
672 &self.obj_map_cache,
673 &op_data.param.key,
674 &op_data.param.value,
675 &op_data.param.prev_value,
676 op_data.param.auto_insert,
677 )
678 .await?;
679
680 if let Some(state) = &op_data.state {
682 if ret != state.value {
683 let msg = format!(
684 "set_with_key with path commit but state conflict! op_data={:?}, ret={:?}",
685 op_data, ret,
686 );
687 warn!("{}", msg);
688 return Err(BuckyError::new(BuckyErrorCode::Conflict, msg));
689 }
690 }
691
692 if ret != Some(op_data.param.value) {
693 let list = self.update_path_obj_map_list(obj_map_list).await?;
694 self.flush_path_obj_map_list(list)?;
695 }
696
697 Ok(ret)
698 }
699
700 pub async fn remove_with_key(
701 &self,
702 path: &str,
703 key: &str,
704 prev_value: &Option<ObjectId>,
705 ) -> BuckyResult<Option<ObjectId>> {
706 let param = RemoveWithKeyParam {
708 key: key.to_owned(),
709 prev_value: prev_value.to_owned(),
710 };
711 let mut op_data = RemoveWithKeyOpData {
712 path: path.to_owned(),
713 param,
714 state: None,
715 };
716
717 let ret = self.remove_with_key_op(&op_data).await?;
718
719 if let Some(write_ops) = &self.write_ops {
721 let state = ObjectMapKeyState { value: ret.clone() };
722 op_data.state = Some(state);
723
724 write_ops.append_op(ObjectMapWriteOp::RemoveWithKey(op_data));
725 }
726
727 Ok(ret)
728 }
729
730 async fn remove_with_key_op(
731 &self,
732 op_data: &RemoveWithKeyOpData,
733 ) -> BuckyResult<Option<ObjectId>> {
734 let (ret, obj_map_list) = loop {
735 let ret = self
736 .create_object_map(
737 &op_data.path,
738 ObjectMapSimpleContentType::Map,
739 ObjectMapCreateStrategy::NotCreate,
740 )
741 .await?;
742
743 if ret.is_none() {
745 debug!(
746 "objectmap path remove_with_key but path not found! root={}, path={}, key={}",
747 self.root(),
748 op_data.path,
749 op_data.param.key,
750 );
751
752 break (None, None);
753 }
754
755 let mut obj_map_list = ret.unwrap();
756 assert!(obj_map_list.len() > 0);
757
758 let ret = obj_map_list
760 .last_mut()
761 .unwrap()
762 .obj_map
763 .remove_with_key(
764 &self.obj_map_cache,
765 &op_data.param.key,
766 &op_data.param.prev_value,
767 )
768 .await?;
769
770 info!(
771 "objectmap path remove_with_key success! root={}, path={}, key={}, value={:?}",
772 self.root(),
773 op_data.path,
774 op_data.param.key,
775 ret
776 );
777 break (ret, Some(obj_map_list));
778 };
779
780 if let Some(state) = &op_data.state {
782 if ret != state.value {
783 let msg = format!(
784 "remove_with_key from path commit but state conflict! op_data={:?}, ret={:?}",
785 op_data, ret,
786 );
787 warn!("{}", msg);
788 return Err(BuckyError::new(BuckyErrorCode::Conflict, msg));
789 }
790 }
791
792 if ret.is_none() {
793 return Ok(None);
794 }
795
796 let list = self.update_path_obj_map_list(obj_map_list.unwrap()).await?;
798 self.flush_path_obj_map_list(list)?;
799
800 Ok(ret)
801 }
802
803 pub async fn contains(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
805 let ret = self.get_object_map(path).await?;
806
807 if ret.is_none() {
808 let msg = format!(
809 "contains from path but objectmap not found! path={}, value={}",
810 path, object_id,
811 );
812 error!("{}", msg);
813 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
814 }
815
816 let ret = ret.unwrap();
817 let obj_map = ret.lock().await;
818 obj_map.contains(&self.obj_map_cache, object_id).await
819 }
820
821 pub async fn insert(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
822 let param = InsertParam {
824 value: object_id.to_owned(),
825 };
826 let mut op_data = InsertOpData {
827 path: path.to_owned(),
828 param,
829 state: None,
830 };
831
832 let ret = self.insert_op(&op_data).await?;
833
834 if let Some(write_ops) = &self.write_ops {
836 op_data.state = Some(ret);
837
838 write_ops.append_op(ObjectMapWriteOp::Insert(op_data));
839 }
840
841 Ok(ret)
842 }
843
844 async fn insert_op(&self, op_data: &InsertOpData) -> BuckyResult<bool> {
845 let obj_map_list = self
846 .create_object_map(
847 &op_data.path,
848 ObjectMapSimpleContentType::Set,
849 ObjectMapCreateStrategy::CreateIfNotExists,
850 )
851 .await?;
852
853 let mut obj_map_list = obj_map_list.unwrap();
854 assert!(obj_map_list.len() > 0);
855
856 let ret = obj_map_list
858 .last_mut()
859 .unwrap()
860 .obj_map
861 .insert(&self.obj_map_cache, &op_data.param.value)
862 .await?;
863 if let Some(state) = &op_data.state {
865 if *state != ret {
866 let msg = format!(
867 "insert to path commit but state conflict! op_data={:?}",
868 op_data,
869 );
870 warn!("{}", msg);
871 return Err(BuckyError::new(BuckyErrorCode::Conflict, msg));
872 }
873 }
874
875 if ret {
877 let list = self.update_path_obj_map_list(obj_map_list).await?;
879 self.flush_path_obj_map_list(list)?;
880 }
881
882 Ok(ret)
883 }
884
885 pub async fn remove(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
886 let param = RemoveParam {
888 value: object_id.to_owned(),
889 };
890 let mut op_data = RemoveOpData {
891 path: path.to_owned(),
892 param,
893 state: None,
894 };
895
896 let ret = self.remove_op(&op_data).await?;
897
898 if let Some(write_ops) = &self.write_ops {
900 op_data.state = Some(ret);
901
902 write_ops.append_op(ObjectMapWriteOp::Remove(op_data));
903 }
904
905 Ok(ret)
906 }
907
908 async fn remove_op(&self, op_data: &RemoveOpData) -> BuckyResult<bool> {
909 let ret = self
910 .create_object_map(
911 &op_data.path,
912 ObjectMapSimpleContentType::Set,
913 ObjectMapCreateStrategy::NotCreate,
914 )
915 .await?;
916
917 if ret.is_none() {
919 let msg = format!(
920 "remove but path not found! path={}, value={}",
921 op_data.path, op_data.param.value,
922 );
923 error!("{}", msg);
924 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
925 }
926
927 let mut obj_map_list = ret.unwrap();
928 assert!(obj_map_list.len() > 0);
929
930 let ret = obj_map_list
932 .last_mut()
933 .unwrap()
934 .obj_map
935 .remove(&self.obj_map_cache, &op_data.param.value)
936 .await?;
937
938 if let Some(state) = &op_data.state {
940 if *state != ret {
941 let msg = format!(
942 "remove from path commit but state conflict! op_data={:?}",
943 op_data,
944 );
945 warn!("{}", msg);
946 return Err(BuckyError::new(BuckyErrorCode::Conflict, msg));
947 }
948 }
949
950 if ret {
951 let list = self.update_path_obj_map_list(obj_map_list).await?;
953 self.flush_path_obj_map_list(list)?;
954 }
955
956 Ok(ret)
957 }
958
959 pub fn clear_op_list(&self) {
960 if let Some(write_ops) = &self.write_ops {
961 let _ = write_ops.fetch_all();
962 }
963 }
964
965 pub async fn commit_op_list(&self) -> BuckyResult<()> {
967 let op_list = self.write_ops.as_ref().unwrap().fetch_all();
968
969 for op_data in op_list {
970 self.commit_op(op_data).await?;
971 }
972
973 Ok(())
974 }
975
976 async fn commit_op(&self, op: ObjectMapWriteOp) -> BuckyResult<()> {
977 match op {
978 ObjectMapWriteOp::CreateNew(op_data) => {
979 self.create_new_op(&op_data).await?;
980 }
981 ObjectMapWriteOp::InsertWithKey(op_data) => {
982 self.insert_with_key_op(&op_data).await?;
983 }
984 ObjectMapWriteOp::SetWithKey(op_data) => {
985 self.set_with_key_op(&op_data).await?;
986 }
987 ObjectMapWriteOp::RemoveWithKey(op_data) => {
988 self.remove_with_key_op(&op_data).await?;
989 }
990
991 ObjectMapWriteOp::Insert(op_data) => {
992 self.insert_op(&op_data).await?;
993 }
994 ObjectMapWriteOp::Remove(op_data) => {
995 self.remove_op(&op_data).await?;
996 }
997 }
998
999 Ok(())
1000 }
1001}
1002
1003#[cfg(test)]
1004mod test_path {
1005 use super::super::cache::*;
1006 use super::super::path_iterator::*;
1007 use super::*;
1008
1009 use std::str::FromStr;
1010
1011 async fn dump_path(item: &ObjectMapPath, path: &str) {
1012 let list = item.list(path).await.unwrap();
1013 info!("dump path={} as follows:", path);
1014 info!("{}", list);
1015 }
1016
1017 async fn test_path1(path: &ObjectMapPath) {
1018 let x1_value = ObjectId::from_str("5aSixgPg3hDa1oU9eAtRcKTyVKg5X2bVXWPVhk3U5c7G").unwrap();
1019 let x1_value2 = ObjectId::from_str("5aSixgPCivmQfASRbjAvBiwgxhU8LrNtYtC2D6Lis2NQ").unwrap();
1020
1021 path.insert_with_key("/", "x1", &x1_value).await.unwrap();
1022
1023 let ret = path.get_by_key("/a/b/c", "x1").await.unwrap();
1024 assert!(ret.is_none());
1025
1026 let ret = path.get_by_path("/a/b/c/x1").await.unwrap();
1027 assert!(ret.is_none());
1028
1029 path.insert_with_key("/a/b/c", "x1", &x1_value)
1030 .await
1031 .unwrap();
1032 let ret = path.insert_with_path("/a/b/c/x1", &x1_value).await;
1033 let e = ret.unwrap_err();
1034 assert_eq!(e.code(), BuckyErrorCode::AlreadyExists);
1035
1036 let ret = path.get_by_key("/a/b/c", "x1").await.unwrap();
1037 assert_eq!(ret, Some(x1_value));
1038 let ret = path.get_by_path("/a/b/c/x1").await.unwrap();
1039 assert_eq!(ret, Some(x1_value));
1040
1041 dump_path(path, "/").await;
1042 dump_path(path, "/a").await;
1043 dump_path(path, "/a/b").await;
1044 dump_path(path, "/a/b/c").await;
1045 let ret = path.get_by_key("/a/b/c", "x1").await.unwrap();
1046 assert_eq!(ret, Some(x1_value));
1047
1048 let ret = path.insert_with_key("/a/b/c", "x1", &x1_value).await;
1050 let err = ret.unwrap_err();
1051 assert_eq!(err.code(), BuckyErrorCode::AlreadyExists);
1052
1053 let ret = path
1055 .set_with_key("/a/b/c", "x1", &x1_value2, &Some(x1_value2), false)
1056 .await;
1057 assert!(ret.is_err());
1058 let err = ret.unwrap_err();
1059 assert_eq!(err.code(), BuckyErrorCode::Unmatch);
1060
1061 let ret = path
1062 .set_with_key("/a/b/c", "x1", &x1_value2, &Some(x1_value), false)
1063 .await
1064 .unwrap();
1065 assert_eq!(ret, Some(x1_value));
1066
1067 let ret = path.remove_with_key("/a/b/c", "x1", &Some(x1_value)).await;
1069 assert!(ret.is_err());
1070 let err = ret.unwrap_err();
1071 assert_eq!(err.code(), BuckyErrorCode::Unmatch);
1072
1073 let ret = path.remove_with_key("/a/b/c", "x1", &None).await.unwrap();
1074 assert_eq!(ret, Some(x1_value2));
1075
1076 let ret = path
1078 .set_with_key("/a/b/c", "x1", &x1_value2, &None, false)
1079 .await;
1080 assert!(ret.is_err());
1081 let err = ret.unwrap_err();
1082 assert_eq!(err.code(), BuckyErrorCode::NotFound);
1083
1084 let ret = path
1086 .set_with_key("/a/b/c", "x1", &x1_value2, &None, true)
1087 .await
1088 .unwrap();
1089 assert_eq!(ret, None);
1090
1091 let ret = path.get_by_key("/a/b/c", "x1").await.unwrap();
1092 assert_eq!(ret, Some(x1_value2));
1093
1094 let ret = path.remove_with_key("/a/b/c", "x1", &None).await.unwrap();
1095 assert_eq!(ret, Some(x1_value2));
1096
1097 let ret = path.get_by_key("/a/b/c", "x1").await.unwrap();
1098 assert!(ret.is_none());
1099
1100 let ret = path.get_by_key("/a/b", "c").await.unwrap();
1101 assert!(ret.is_some());
1102 let c_id = ret.unwrap();
1103 info!("/a/b/c={}", c_id);
1104
1105 dump_path(path, "/").await;
1106 dump_path(path, "/a").await;
1107 dump_path(path, "/a/b").await;
1108 dump_path(path, "/a/b/c").await;
1109
1110 let ret = path.remove_with_key("/a/b", "c", &None).await.unwrap();
1111 assert_eq!(ret, Some(c_id));
1112
1113 let ret = path.get_by_key("/a/b/c", "x1").await.unwrap();
1114 assert!(ret.is_none());
1115
1116 let ret = path.get_by_key("/a/b", "c").await.unwrap();
1117 assert!(ret.is_none());
1118
1119 let ret = path.get_by_key("/a/b/c", "x1").await.unwrap();
1120 assert!(ret.is_none());
1121
1122 let ret = path.get_by_path("/").await.unwrap();
1123 assert!(ret.is_some());
1124
1125 path.create_new("/a/b", "c", ObjectMapSimpleContentType::Set)
1126 .await
1127 .unwrap();
1128 if let Err(e) = path
1129 .create_new("/a/b", "c", ObjectMapSimpleContentType::Set)
1130 .await
1131 {
1132 assert!(e.code() == BuckyErrorCode::AlreadyExists);
1133 } else {
1134 unreachable!();
1135 }
1136 if let Err(e) = path
1137 .create_new("/a/b", "c", ObjectMapSimpleContentType::Set)
1138 .await
1139 {
1140 assert!(e.code() == BuckyErrorCode::AlreadyExists);
1141 } else {
1142 unreachable!();
1143 }
1144
1145 let ret = path.get_by_key("/a/b", "c").await.unwrap();
1146 assert!(ret.is_some());
1147 }
1148
1149 async fn test_path() {
1150 let noc = ObjectMapMemoryNOCCache::new();
1151 let root_cache = ObjectMapRootMemoryCache::new_default_ref(None, noc);
1152 let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());
1153
1154 let owner = ObjectId::default();
1156 let root = ObjectMap::new(
1157 ObjectMapSimpleContentType::Map,
1158 Some(owner.clone()),
1159 Some(owner.clone()),
1160 )
1161 .no_create_time()
1162 .build();
1163 let root_id = root.flush_id();
1164 cache.put_object_map(&root_id, root, None).unwrap();
1165 info!("new root: {}", root_id);
1166
1167 let path = ObjectMapPath::new(root_id.clone(), cache.clone(), true);
1168 test_path1(&path).await;
1169
1170 let opt = ObjectMapPathIteratorOption::new(true, true);
1171 let root = path.root();
1172 let root_obj = cache.get_object_map(&root).await.unwrap();
1173 let mut it =
1174 ObjectMapPathIterator::new(root_obj.unwrap(), cache.clone(), opt.clone()).await;
1175 while !it.is_end() {
1176 let list = it.next(5).await.unwrap();
1177 info!("list: {} {:?}", 1, list.list);
1178 }
1179
1180 let root_id = path.root();
1181 info!("result root: {}", root_id);
1182
1183 cache.gc(false, &root_id).await.unwrap();
1184
1185 let root_obj = cache.get_object_map(&root_id).await.unwrap();
1186 let mut it =
1187 ObjectMapPathIterator::new(root_obj.unwrap(), cache.clone(), opt.clone()).await;
1188 while !it.is_end() {
1189 let list = it.next(5).await.unwrap();
1190 info!("list: {} {:?}", 1, list.list);
1191 }
1192 }
1193
/// `parse_full_path` rejects the root path and splits everything else into
/// `(parent path, key)`, ignoring a trailing slash.
#[test]
fn test_full_path() {
    ObjectMapPath::parse_full_path("/").unwrap_err();

    // (input, expected parent path, expected key)
    let cases = [
        ("/a", "/", "a"),
        ("/a/", "/", "a"),
        ("/a/b", "/a", "b"),
        ("/eeee/eeee", "/eeee", "eeee"),
        ("/eeee/eeee/", "/eeee", "eeee"),
    ];

    for &(input, expected_path, expected_key) in cases.iter() {
        let (path, key) = ObjectMapPath::parse_full_path(input).unwrap();
        assert_eq!(path, expected_path);
        assert_eq!(key, expected_key);
    }
}
1217
/// Entry point: sets up logging, then runs the path parsing checks and the
/// async path scenario to completion.
#[test]
fn test() {
    crate::init_simple_log("test-object-map-path", Some("debug"));

    test_full_path();

    let scenario = test_path();
    async_std::task::block_on(scenario);
}
1226}