cyfs_base/objects/object_map/
diff.rs

1use super::cache::*;
2use super::iterator::*;
3use super::object_map::*;
4use crate::*;
5
6use std::collections::VecDeque;
7
8/*
9pub const OBJECT_MAP_DIFF_ADDED_KEY: &str = "added";
10pub const OBJECT_MAP_DIFF_ALTERED_KEY: &str = "altered";
11pub const OBJECT_MAP_DIFF_PREV_KEY: &str = "prev";
12pub const OBJECT_MAP_DIFF_REMOVED_KEY: &str = "removed";
13pub const OBJECT_MAP_DIFF_DIFF_KEY: &str = "diff";
14*/
15
/// A unit of diff work queued on `ObjectMapDiff::pending_diffs` and consumed by `calc_diff`.
enum ObjectMapPendingDiff {
    /// `(prev, next)` pair dispatched to `diff_recursive`.
    SubAlter((ObjectId, ObjectId)),
    /// `(prev, next)` pair dispatched to `diff_hub_simple`.
    Alter((ObjectId, ObjectId)),
    /// Object dispatched to `diff_all` with the `Add` action.
    Add(ObjectId),
    /// Object dispatched to `diff_all` with the `Remove` action.
    Remove(ObjectId),
}
22
/// A diff entry waiting to be written into the result ObjectMap by `deal_with_pending_result`.
enum ObjectMapPendingResult {
    /// `(action, key, prev, value)` for map content. `prev` is `Some` only for `Alter`;
    /// for `Add` and `Remove` the affected value is carried in `value`.
    Map((ObjectMapDiffAction, String, Option<ObjectId>, ObjectId)),
    /// `(action, value)` for set content; only `Add` and `Remove` are handled.
    Set((ObjectMapDiffAction, ObjectId)),
}
/*
Only ObjectMaps of the same type can be diffed against each other.
ObjectMaps of different types must be diffed at the parent level.
*/

/// State of one diff computation: the queues of pending work and the
/// ObjectMap that accumulates the diff entries.
pub struct ObjectMapDiff {
    owner: Option<ObjectId>,
    dec_id: Option<ObjectId>,

    // A diff is only possible between two ObjectMaps with the same content type
    content_type: ObjectMapSimpleContentType,

    // Whether to recursively expand the diff of altered children
    expand_altered: bool,

    cache: ObjectMapOpEnvCacheRef,

    // The ObjectMap that stores the diff result
    // Map -> DiffMap, Set -> DiffSet
    result: Option<ObjectMap>,

    // Queue of diffs waiting to be processed asynchronously
    pending_diffs: VecDeque<ObjectMapPendingDiff>,

    // Entries waiting to be added to the result ObjectMap
    pending_results: VecDeque<ObjectMapPendingResult>,
}
54
/// The kind of change a single diff entry describes.
///
/// The enum is fieldless, so it is `Copy`: callers can pass it by value
/// without an explicit `.clone()` (existing `.clone()` calls keep working).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ObjectMapDiffAction {
    /// The entry exists only in the newer object.
    Add,
    /// The entry exists in both objects with different values.
    Alter,
    /// The entry exists only in the older object.
    Remove,
}
61
// The final diff of the leaf nodes of two ObjectMaps always takes one of the following two forms:
// 1. A diff between two SimpleContents
// 2. A diff between a SimpleContent and a HubContent
65
66impl ObjectMapDiff {
67    pub fn new(
68        owner: Option<ObjectId>,
69        dec_id: Option<ObjectId>,
70        cache: ObjectMapOpEnvCacheRef,
71        content_type: ObjectMapSimpleContentType,
72        expand_altered: bool,
73    ) -> Self {
74        let result = ObjectMap::new(
75            content_type.get_diff_type().unwrap(),
76            owner.clone(),
77            dec_id.clone(),
78        )
79        .build();
80
81        Self {
82            owner,
83            dec_id,
84            cache,
85            content_type,
86            expand_altered,
87
88            result: Some(result),
89
90            pending_diffs: VecDeque::new(),
91            pending_results: VecDeque::new(),
92        }
93    }
94
95    // 计算两个对象的diff,id不能相同
96    #[async_recursion::async_recursion]
97    pub async fn diff_objects(
98        cache: &ObjectMapOpEnvCacheRef,
99        prev: &ObjectId,
100        next: &ObjectId,
101        expand_altered: bool,
102    ) -> BuckyResult<ObjectId> {
103        if *prev == *next {
104            let msg = format!("diff object but with id the same! id={}", prev);
105            error!("{}", msg);
106            return Err(BuckyError::new(BuckyErrorCode::InvalidParam, msg));
107        }
108
109        let prev_obj = cache.get_object_map(prev).await?;
110        if prev_obj.is_none() {
111            let msg = format!("diff object but not found! prev={}", prev);
112            error!("{}", msg);
113            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
114        }
115
116        let next_obj = cache.get_object_map(next).await?;
117        if next_obj.is_none() {
118            let msg = format!("diff object but not found! altered={}", next);
119            error!("{}", msg);
120            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
121        }
122
123        let prev_obj = prev_obj.unwrap();
124        let next_obj = next_obj.unwrap();
125
126        let mut diff = {
127            let prev_obj = prev_obj.lock().await;
128            let next_obj = next_obj.lock().await;
129
130            if prev_obj.content_type() != next_obj.content_type() {
131                let msg = format!(
132                    "diff object but content_type not match! prev={}, next={}",
133                    prev, next
134                );
135                error!("{}", msg);
136                return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
137            }
138
139            ObjectMapDiff::new(
140                prev_obj.desc().owner().to_owned(),
141                prev_obj.desc().dec_id().to_owned(),
142                cache.clone(),
143                prev_obj.content_type(),
144                expand_altered,
145            )
146        };
147
148        // 把根ObjectMap放置到pending列表
149        diff.pend_async_sub_alter(&prev, &next);
150
151        // 计算diff
152        diff.calc_diff().await?;
153
154        // 转换成diff_object
155        diff.into_object_map().await
156    }
157
158    async fn into_object_map(&mut self) -> BuckyResult<ObjectId> {
159        let diff_object = self.result.take().unwrap();
160
161        let id = diff_object.flush_id();
162        self.cache.put_object_map(&id, diff_object, None)?;
163
164        Ok(id)
165    }
166
167    pub(super) fn map_alter_item(
168        &mut self,
169        key: &str,
170        prev: impl IntoObjectMapContentItem,
171        value: impl IntoObjectMapContentItem,
172    ) {
173        assert_eq!(self.content_type, ObjectMapSimpleContentType::Map);
174
175        let prev = prev.into_content(None).into_set_item();
176        let (key, value) = value.into_content(Some(key)).into_map_item();
177
178        self.pending_results.push_back(ObjectMapPendingResult::Map((
179            ObjectMapDiffAction::Alter,
180            key,
181            Some(prev),
182            value,
183        )));
184    }
185
186    pub(super) fn map_item(
187        &mut self,
188        action: ObjectMapDiffAction,
189        key: &str,
190        value: impl IntoObjectMapContentItem,
191    ) {
192        assert_eq!(self.content_type, ObjectMapSimpleContentType::Map);
193        assert!(action != ObjectMapDiffAction::Alter);
194
195        let (key, value) = value.into_content(Some(key)).into_map_item();
196
197        self.pending_results
198            .push_back(ObjectMapPendingResult::Map((action, key, None, value)));
199    }
200
201    pub(super) fn set_item(
202        &mut self,
203        action: ObjectMapDiffAction,
204        value: impl IntoObjectMapContentItem,
205    ) {
206        assert_eq!(self.content_type, ObjectMapSimpleContentType::Set);
207
208        let value = value.into_content(None).into_set_item();
209        self.pending_results
210            .push_back(ObjectMapPendingResult::Set((action, value)));
211    }
212
213    async fn deal_with_pending_result(&mut self) -> BuckyResult<()> {
214        let cache = self.cache.clone();
215        loop {
216            let item = self.pending_results.pop_front();
217            if item.is_none() {
218                break;
219            }
220
221            let item = item.unwrap();
222            match item {
223                ObjectMapPendingResult::Map((action, key, prev, value)) => {
224                    let item = match action {
225                        ObjectMapDiffAction::Alter => {
226                            assert!(prev.is_some());
227
228                            // 如果需要展开,那么这里异步的计算
229                            let diff = if self.expand_altered {
230                                let prev = prev.as_ref().unwrap();
231
232                                if prev.obj_type_code() == ObjectTypeCode::ObjectMap && value.obj_type_code() == ObjectTypeCode::ObjectMap {
233                                    let cache = cache.clone();
234                                    let prev = prev.to_owned();
235                                    let value = value.clone();
236
237                                    info!("will calc sub diff: prev={}, altered={}", prev, value);
238                                    let diff_id = async_std::task::spawn(async move {
239                                        Self::diff_objects(&cache, &prev, &value, true).await
240                                    }).await?;
241
242                                    Some(diff_id)
243                                } else {
244                                    None
245                                }
246                            } else {
247                                None
248                            };
249
250                            ObjectMapDiffMapItem {
251                                prev,
252                                altered: Some(value),
253                                diff,
254                            }
255                        }
256                        ObjectMapDiffAction::Add => {
257                            assert!(prev.is_none());
258
259                            ObjectMapDiffMapItem {
260                                prev: None,
261                                altered: Some(value),
262                                diff: None,
263                            }
264                        }
265                        ObjectMapDiffAction::Remove => {
266                            assert!(prev.is_none());
267
268                            ObjectMapDiffMapItem {
269                                prev: Some(value),
270                                altered: None,
271                                diff: None,
272                            }
273                        }
274                    };
275
276                    self.result
277                        .as_mut()
278                        .unwrap()
279                        .diff_insert_with_key(&cache, &key, &item)
280                        .await?;
281                }
282                ObjectMapPendingResult::Set((action, value)) => {
283                    let item = match action {
284                        ObjectMapDiffAction::Add => ObjectMapDiffSetItem {
285                            prev: None,
286                            altered: Some(value),
287                        },
288                        ObjectMapDiffAction::Remove => ObjectMapDiffSetItem {
289                            prev: Some(value),
290                            altered: None,
291                        },
292                        _ => {
293                            unreachable!();
294                        }
295                    };
296
297                    self.result
298                        .as_mut()
299                        .unwrap()
300                        .diff_insert(&cache, &item)
301                        .await?;
302                }
303            }
304        }
305
306        Ok(())
307    }
308
309    // 添加一个pending的diff
310    pub(crate) fn pend_async_sub_alter(&mut self, prev: &ObjectId, next: &ObjectId) {
311        self.pending_diffs
312            .push_back(ObjectMapPendingDiff::SubAlter((
313                prev.to_owned(),
314                next.to_owned(),
315            )));
316    }
317
318    pub(crate) fn pend_async_alter(&mut self, prev: ObjectId, next: ObjectId) {
319        self.pending_diffs.push_back(ObjectMapPendingDiff::Alter((
320            prev.to_owned(),
321            next.to_owned(),
322        )));
323    }
324
325    pub(crate) fn pend_async_add(&mut self, value: &ObjectId) {
326        self.pending_diffs
327            .push_back(ObjectMapPendingDiff::Add(value.to_owned()));
328    }
329
330    pub(crate) fn pend_async_remove(&mut self, value: &ObjectId) {
331        self.pending_diffs
332            .push_back(ObjectMapPendingDiff::Remove(value.to_owned()));
333    }
334
335    async fn calc_diff(&mut self) -> BuckyResult<()> {
336        loop {
337            if self.pending_diffs.is_empty() {
338                break;
339            }
340
341            let pending = self.pending_diffs.pop_front().unwrap();
342            match pending {
343                ObjectMapPendingDiff::SubAlter((prev, next)) => {
344                    self.diff_recursive(&prev, &next).await?;
345                }
346                ObjectMapPendingDiff::Alter((prev, next)) => {
347                    self.diff_hub_simple(&prev, &next).await?;
348                }
349                ObjectMapPendingDiff::Add(value) => {
350                    self.diff_all(ObjectMapDiffAction::Add, &value).await?;
351                }
352                ObjectMapPendingDiff::Remove(value) => {
353                    self.diff_all(ObjectMapDiffAction::Remove, &value).await?;
354                }
355            }
356        }
357
358        self.deal_with_pending_result().await?;
359
360        Ok(())
361    }
362
363    // 将整个ObjectMap/SubObjectMap添加到目标列表,通过遍历算法获取所有元素
364    async fn diff_all(
365        &mut self,
366        action: ObjectMapDiffAction,
367        target: &ObjectId,
368    ) -> BuckyResult<()> {
369        assert!(action == ObjectMapDiffAction::Add || action == ObjectMapDiffAction::Remove);
370
371        let target_obj = self.cache.get_object_map(target).await?;
372
373        if target_obj.is_none() {
374            let msg = format!("diff all but not found! target={}", target);
375            error!("{}", msg);
376            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
377        }
378
379        let target_obj = target_obj.unwrap();
380
381        let mut it = ObjectMapBindIterator::new_with_target(target_obj, self.cache.clone()).await;
382
383        while !it.is_end() {
384            // 处理已经缓存的结果
385            self.deal_with_pending_result().await?;
386
387            let list = it.next(32).await?;
388            for item in list.list {
389                match item {
390                    ObjectMapContentItem::Map((key, value)) => {
391                        self.map_item(action.clone(), &key, value);
392                    }
393                    ObjectMapContentItem::Set(value) => {
394                        self.set_item(action.clone(), value);
395                    }
396                    _ => unreachable!(),
397                }
398            }
399        }
400
401        Ok(())
402    }
403
404    async fn diff_recursive(&mut self, prev: &ObjectId, next: &ObjectId) -> BuckyResult<()> {
405        let prev = self.cache.get_object_map(prev).await?;
406        let next = self.cache.get_object_map(next).await?;
407
408        if prev.is_none() || next.is_none() {
409            let msg = format!(
410                "recursive diff object but not found! prev={:?}, next={:?}",
411                prev, next
412            );
413            error!("{}", msg);
414            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
415        }
416
417        let prev = prev.unwrap();
418        let next = next.unwrap();
419        let prev_obj = prev.lock().await;
420        let next_obj = next.lock().await;
421        prev_obj.diff(&next_obj, self);
422
423        Ok(())
424    }
425
426    // hub和simple模式的两个ObjectMap求diff,通过遍历的方法
427    async fn diff_hub_simple(&mut self, prev: &ObjectId, next: &ObjectId) -> BuckyResult<()> {
428        let prev = self.cache.get_object_map(prev).await?;
429        let next = self.cache.get_object_map(next).await?;
430
431        if prev.is_none() || next.is_none() {
432            let msg = format!(
433                "diff object but not found! prev={:?}, next={:?}",
434                prev, next
435            );
436            error!("{}", msg);
437            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
438        }
439
440        let prev = prev.unwrap();
441        let next = next.unwrap();
442
443        // 遍历查找修改和移除的元素
444        let mut it = ObjectMapBindIterator::new_with_target(prev.clone(), self.cache.clone()).await;
445        while !it.is_end() {
446            // 处理已经缓存的结果
447            self.deal_with_pending_result().await?;
448
449            let list = it.next(32).await?;
450            for item in list.list {
451                let next_obj = next.lock().await;
452
453                match item {
454                    ObjectMapContentItem::Map((key, value)) => {
455                        if let Some(next_value) = next_obj.get_by_key(&self.cache, &key).await? {
456                            if value != next_value {
457                                self.map_alter_item(&key, value, next_value);
458                            }
459                        } else {
460                            self.map_item(ObjectMapDiffAction::Remove, &key, value);
461                        }
462                    }
463                    ObjectMapContentItem::Set(value) => {
464                        if !next_obj.contains(&self.cache, &value).await? {
465                            self.set_item(ObjectMapDiffAction::Remove, value);
466                        }
467                    }
468                    _ => unreachable!(),
469                }
470            }
471        }
472
473        let mut it = ObjectMapBindIterator::new_with_target(next, self.cache.clone()).await;
474        while !it.is_end() {
475            // 处理已经缓存的结果
476            self.deal_with_pending_result().await?;
477
478            let list = it.next(32).await?;
479            for item in list.list {
480                let prev_obj = prev.lock().await;
481
482                match item {
483                    ObjectMapContentItem::Map((key, value)) => {
484                        if let None = prev_obj.get_by_key(&self.cache, &key).await? {
485                            self.map_item(ObjectMapDiffAction::Add, &key, value);
486                        }
487                    }
488                    ObjectMapContentItem::Set(value) => {
489                        if !prev_obj.contains(&self.cache, &value).await? {
490                            self.set_item(ObjectMapDiffAction::Add, value);
491                        }
492                    }
493                    _ => unreachable!(),
494                }
495            }
496        }
497
498        Ok(())
499    }
500
501    // dest - source = diff => source + diff = dest
502    pub async fn apply_diff(
503        cache: &ObjectMapOpEnvCacheRef,
504        source_id: &ObjectId,
505        diff_id: &ObjectId,
506    ) -> BuckyResult<ObjectId> {
507        assert_ne!(source_id, diff_id);
508
509        let source = cache.get_object_map(source_id).await?;
510        if source.is_none() {
511            let msg = format!("source object not found! target={}", source_id);
512            error!("{}", msg);
513            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
514        }
515        let source = source.unwrap();
516
517        let diff = cache.get_object_map(diff_id).await?;
518        if diff.is_none() {
519            let msg = format!("diff object not found! target={}", diff_id);
520            error!("{}", msg);
521            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
522        }
523        let diff = diff.unwrap();
524
525        let source = source.lock().await;
526
527        // 检查类型是否匹配
528        let content_type = source.content_type();
529        let diff_content_type = diff.lock().await.content_type();
530        if !content_type.is_diff_match(&diff_content_type) {
531            let msg = format!(
532                "apply diff with unmatched objectmap content type: source={:?}, diff={:?}",
533                content_type, diff_content_type
534            );
535            error!("{}", msg);
536            return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
537        }
538
539        // source需要clone一份再修改
540        let mut source = source.clone();
541
542        let mut it = ObjectMapBindIterator::new_with_target(diff, cache.clone()).await;
543        while !it.is_end() {
544            let list = it.next(32).await?;
545            for item in list.list {
546                match item {
547                    ObjectMapContentItem::DiffMap((key, value)) => {
548                       if value.prev.is_none() {
549                            // add action
550                            // FIXME 如果出现了错误项,是否继续?
551                            if value.altered.is_none() {
552                                error!("invalid diffmap content item: {}", value);
553                                continue;
554                            }
555                            debug!("will apply diff added item: {}={}", key, value);
556
557                            let altered = value.altered.unwrap();
558                            // 这里使用set_with_key而不是insert_with_key,是为了处理已经存在情况下的容错
559                            let ret = source
560                                .set_with_key(&cache, &key, &altered, &None, true)
561                                .await?;
562                            if ret.is_some() {
563                                error!("apply added diff but key/value already exists! key={}, added={}, current={:?}", key, altered, ret);
564                            }
565                        } else if value.altered.is_none() {
566                            // remove action
567                            debug!("will apply diff removed item: {}={}", key, value);
568
569                            // FIXME 这里是否需要对当前状态进行检查?
570                            let prev = value.prev.unwrap();
571                            let ret = source.remove_with_key(&cache, &key, &None).await?;
572                            if ret != Some(prev) {
573                                error!("apply removed diff but key/value not match! key={}, removed={}, current={:?}", key, prev, ret);
574                            }
575                        } else {
576                            // altered action
577                            let prev = value.prev.as_ref().unwrap();
578                            let altered = value.altered.as_ref().unwrap();
579                            debug!("will apply diff altered item: {}={}", key, value);
580
581                            // FIXME 这里是否需要对当前状态进行检查?
582                            let ret = source
583                                .set_with_key(&cache, &key, &altered, &None, false)
584                                .await?;
585                            if ret != Some(*prev) {
586                                error!(
587                                    "apply altered diff but prev value not the same! key={}, prev={}, altered={}, current={:?}",
588                                    key, prev, altered, ret,
589                                );
590                            }
591
592                            if value.diff.is_some() {
593                                if value.diff.as_ref().unwrap().obj_type_code() == ObjectTypeCode::ObjectMap {
594                                    debug!("will apply diff recursive: {}={}", key, value);
595                                    Self::apply_diff_recursive(cache, value).await?;
596                                } else {
597                                    // allow some user defined diff object for leaf node
598                                    debug!("ignore diff object type: {}={}", key, value);
599                                }
600                            }
601                        }
602                    }
603                    ObjectMapContentItem::DiffSet(value) => {
604                        if value.prev.is_some() {
605                            debug!("will apply removed diff: {}", value);
606
607                            let prev = value.prev.unwrap();
608                            let ret = source.remove(&cache, &prev).await?;
609                            if !ret {
610                                error!("apply removed diff but not found! value={}", prev);
611                            }
612                        } else if value.altered.is_some() {
613                            debug!("will apply added diff: {}", value);
614
615                            let altered = value.altered.unwrap();
616                            let ret = source.insert(&cache, &altered).await?;
617                            if !ret {
618                                error!("apply added diff but already exists! value={}", altered);
619                            }
620                        } else {
621                            // FIXME 如果出现了错误项,是否继续?
622                            error!("invalid diffmap content item: {}", value);
623                            continue;
624                        }
625                    }
626                    _ => {
627                        unreachable!();
628                    }
629                }
630            }
631        }
632
633        // 刷新id
634        let id = source.flush_id();
635
636        // 如果和源对象一致
637        if id == *source_id {
638            warn!(
639                "apply diff to source object but not changed! source={}, diff={}",
640                source_id, diff_id
641            );
642            return Ok(id);
643        }
644
645        info!(
646            "apply diff to source object success! source={}, diff={}, result={}",
647            source_id, diff_id, id
648        );
649        cache.put_object_map(&id, source, None)?;
650
651        Ok(id)
652    }
653
654    #[async_recursion::async_recursion]
655    async fn apply_diff_recursive(
656        cache: &ObjectMapOpEnvCacheRef,
657        diff_item: ObjectMapDiffMapItem,
658    ) -> BuckyResult<ObjectId> {
659        let prev_id = diff_item.prev.unwrap();
660        let diff_id = diff_item.diff.unwrap();
661
662        let prev_code = prev_id.obj_type_code();
663        let diff_code = diff_id.obj_type_code();
664
665        if prev_code != ObjectTypeCode::ObjectMap || diff_code != ObjectTypeCode::ObjectMap {
666            let msg = format!("apply diff but recursive but invalid objectmap object type! prev={}, diff={}, prev_code={:?}, diff_code={:?}",
667                prev_id, diff_id, prev_code, diff_code);
668            error!("{}", msg);
669            return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
670        }
671
672        let cache = cache.clone();
673        async_std::task::spawn(async move {
674            let new_id = Self::apply_diff(&cache, &prev_id, &diff_id).await?;
675
676            /*
677            // for debug
678            let root = cache.get_object_map(&diff_item.altered.as_ref().unwrap()).await.unwrap();
679            let root = root.unwrap();
680            {
681                let obj = root.lock().await;
682                let id = obj.flush_id_without_cache();
683                assert_eq!(id, *diff_item.altered.as_ref().unwrap());
684            }
685            let mut it = ObjectMapPathIterator::new(root, cache.clone()).await;
686            while !it.is_end() {
687                let list = it.next(1).await.unwrap();
688                info!("altered list: {} {:?}", 1, list.list);
689            }
690
691
692            let root = cache.get_object_map(&new_id).await.unwrap();
693            let root = root.unwrap();
694            {
695                let obj = root.lock().await;
696                let id = obj.flush_id_without_cache();
697                assert_eq!(id, new_id);
698            }
699            let mut it = ObjectMapPathIterator::new(root, cache.clone()).await;
700            while !it.is_end() {
701                let list = it.next(1).await.unwrap();
702                info!("result list: {} {:?}", 1, list.list);
703            }
704            */
705
706            // 校验结果是否一致
707            if Some(new_id) != diff_item.altered {
708                let msg = format!("apply diff but got unmatch result: prev={}, diff={}, expect={:?}, got={}",
709                    prev_id, diff_id, diff_item.altered, new_id);
710                error!("{}", msg);
711                return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
712            }
713
714            Ok(new_id)
715        })
716        .await
717    }
718
719    // 对diff的alter内容进行展开,返回一个新的对象(如果有实际的展开内容)
720    async fn expand_altered(
721        cache: &ObjectMapOpEnvCacheRef,
722        diff_id: &ObjectId,
723    ) -> BuckyResult<ObjectId> {
724        let diff = cache.get_object_map(diff_id).await?;
725        if diff.is_none() {
726            let msg = format!("diff object not found! target={}", diff_id);
727            error!("{}", msg);
728            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
729        }
730        let diff = diff.unwrap();
731
732        let mut it = ObjectMapBindIterator::new_with_target(diff.clone(), cache.clone()).await;
733
734        // 用以保存展开后的alered对象
735        let mut expanded_altered = None;
736
737        while !it.is_end() {
738            let list = it.next(32).await?;
739            for item in list.list {
740                match item {
741                    ObjectMapContentItem::DiffMap((key, item)) => {
742                        if item.prev.is_none() || item.altered.is_none() {
743                            continue;
744                        }
745                        assert!(item.diff.is_none());
746
747                        let prev = item.prev.as_ref().unwrap();
748                        let altered = item.altered.as_ref().unwrap();
749
750                        info!(
751                            "will expand child diff: key={}, prev={}, altered={}",
752                            key, prev, altered
753                        );
754
755                        // prev和altered必须都是ObjectMap,才可以继续递归的diff
756                        let diff_id = Self::diff_and_expand_recursive(cache, prev, altered).await?;
757                        if diff_id != *altered {
758                            // 增加diff字段,替换现有的项
759                            if expanded_altered.is_none() {
760                                // 把diff clone一份
761                                expanded_altered = Some(diff.lock().await.clone());
762                            }
763
764                            let mut expanded_item = item.clone();
765                            expanded_item.diff = Some(diff_id);
766
767                            expanded_altered
768                                .as_mut()
769                                .unwrap()
770                                .diff_set_with_key(cache, &key, &expanded_item, &Some(item), false)
771                                .await?;
772                        }
773                    }
774                    ObjectMapContentItem::DiffSet(_) => {
775                        // set类型不需要展开
776                    }
777                    _ => {
778                        error!("expand altered but with unmatch objectmap content type: diff={}, content_type={:?}", diff_id, item.content_type());
779                    }
780                }
781            }
782        }
783
784        if let Some(expanded_altered) = expanded_altered {
785            let new_diff_id = expanded_altered.flush_id();
786            assert_ne!(new_diff_id, *diff_id);
787            cache.put_object_map(&new_diff_id, expanded_altered, None)?;
788
789            Ok(new_diff_id)
790        } else {
791            Ok(diff_id.clone())
792        }
793    }
794
795    #[async_recursion::async_recursion]
796    async fn diff_and_expand_recursive(
797        cache: &ObjectMapOpEnvCacheRef,
798        prev_id: &ObjectId,
799        altered_id: &ObjectId,
800    ) -> BuckyResult<ObjectId> {
801        let prev_code = prev_id.obj_type_code();
802        let altered_code = altered_id.obj_type_code();
803
804        if prev_code != ObjectTypeCode::ObjectMap || altered_code != ObjectTypeCode::ObjectMap {
805            return Ok(altered_id.to_owned());
806        }
807
808        let cache = cache.clone();
809        let prev_id = prev_id.to_owned();
810        let altered_id = altered_id.to_owned();
811        async_std::task::spawn(async move {
812            let diff_id = Self::diff_objects(&cache, &prev_id, &altered_id, false).await?;
813            Self::expand_altered(&cache, &diff_id).await
814        })
815        .await
816    }
817
818    // 打印一个diff的内容
819    pub async fn dump_diff(cache: &ObjectMapOpEnvCacheRef, diff_id: &ObjectId) -> BuckyResult<()> {
820        let diff = cache.get_object_map(diff_id).await?;
821
822        if diff.is_none() {
823            let msg = format!("diff object not found! target={}", diff_id);
824            error!("{}", msg);
825            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
826        }
827
828        let diff = diff.unwrap();
829
830        info!("will dump diff objectmap: {}", diff_id);
831
832        let mut it = ObjectMapBindIterator::new_with_target(diff, cache.clone()).await;
833
834        let mut index = 0;
835        while !it.is_end() {
836            let list = it.next(8).await?;
837            for item in list.list {
838                match item {
839                    ObjectMapContentItem::Map((key, value)) => {
840                        info!("[{}] map item: {}={}", index, key, value);
841                        index += 1;
842                    }
843                    ObjectMapContentItem::DiffMap((key, value)) => {
844
845                        info!("[{}] map diff item: {:?}, {}={}", index, value.action(), key, value);
846                        index += 1;
847                    }
848                    ObjectMapContentItem::Set(value) => {
849                        info!("[{}] set item: {}", index, value);
850                        index += 1;
851                    }
852                    ObjectMapContentItem::DiffSet(value) => {
853                        info!("[{}] set diff item: {:?}, {}", index, value.action(), value);
854                        index += 1;
855                    }
856                }
857            }
858        }
859
860        info!("end dump diff objectmap: {}", diff_id);
861
862        Ok(())
863    }
864}
865
866
#[cfg(test)]
mod test {
    use super::super::cache::*;
    use super::super::object_map::*;
    use super::*;

    use std::str::FromStr;

    // Id string stored under every key of the maps built by `gen_next` and `gen_next2`.
    const NEXT_VALUE_ID: &str = "95RvaS5anntyAoRUBi48vQoivWzX95M8xm4rkB93DdSt";

    /// Builds a `Map` ObjectMap holding `value` under the key `test_map_<i>` for every
    /// `i` in `indexes`, stores the map in `cache` and returns its id.
    async fn gen_map(
        cache: &ObjectMapOpEnvCacheRef,
        indexes: std::ops::Range<i32>,
        value: &ObjectId,
    ) -> ObjectId {
        let owner = ObjectId::default();
        let mut map = ObjectMap::new(
            ObjectMapSimpleContentType::Map,
            Some(owner.clone()),
            Some(owner.clone()),
        )
        .no_create_time()
        .build();

        for i in indexes {
            // Keys are zero-padded to at least three digits.
            let key = format!("test_map_{:0>3}", i);
            info!("begin insert_with_key: {}", key);
            map.insert_with_key(cache, &key, value).await.unwrap();
            info!("end insert_with_key: {}", key);
        }

        let id = map.flush_id();
        cache.put_object_map(&id, map, None).unwrap();

        id
    }

    /// "Previous" state: indexes 0..1000, every value is the default `ObjectId`.
    async fn gen_prev(cache: &ObjectMapOpEnvCacheRef) -> ObjectId {
        let value = ObjectId::default();
        gen_map(cache, 0..1000, &value).await
    }

    /// "Next" state: indexes 500..1500, every value is `NEXT_VALUE_ID`.
    async fn gen_next(cache: &ObjectMapOpEnvCacheRef) -> ObjectId {
        let value = ObjectId::from_str(NEXT_VALUE_ID).unwrap();
        gen_map(cache, 500..1500, &value).await
    }

    /// Alternative "next" state with the single index 0 mapped to `NEXT_VALUE_ID`.
    /// No test in this module calls it.
    async fn gen_next2(cache: &ObjectMapOpEnvCacheRef) -> ObjectId {
        let value = ObjectId::from_str(NEXT_VALUE_ID).unwrap();
        gen_map(cache, 0..1, &value).await
    }

    /// Diffs prev against next, dumps the diff, applies it to prev and checks that the
    /// outcome has the same id as next.
    async fn test_diff() {
        let noc = ObjectMapMemoryNOCCache::new();
        let root_cache = ObjectMapRootMemoryCache::new_default_ref(None, noc);
        let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());

        let prev_id = gen_prev(&cache).await;
        let next_id = gen_next(&cache).await;

        let diff_id = ObjectMapDiff::diff_objects(&cache, &prev_id, &next_id, false)
            .await
            .unwrap();
        ObjectMapDiff::dump_diff(&cache, &diff_id).await.unwrap();

        let new_next_id = ObjectMapDiff::apply_diff(&cache, &prev_id, &diff_id)
            .await
            .unwrap();

        assert_eq!(new_next_id, next_id);
    }

    #[test]
    fn test() {
        crate::init_simple_log("test-object-map-diff", Some("debug"));
        async_std::task::block_on(test_diff());
    }
}
991
#[cfg(test)]
mod test_path_diff {
    use super::super::cache::*;
    use super::super::object_map::*;
    use super::super::path::*;
    use super::super::path_iterator::*;
    use super::*;

    use std::str::FromStr;

    /// Creates an empty `Map` ObjectMap, stores it in `cache` and returns its id.
    fn put_empty_root(cache: &ObjectMapOpEnvCacheRef) -> ObjectId {
        let owner = ObjectId::default();
        let root = ObjectMap::new(
            ObjectMapSimpleContentType::Map,
            Some(owner.clone()),
            Some(owner.clone()),
        )
        .no_create_time()
        .build();

        let root_id = root.flush_id();
        cache.put_object_map(&root_id, root, None).unwrap();
        info!("new root: {}", root_id);

        root_id
    }

    /// Loads the ObjectMap `root_id` from `cache` and logs what a path iterator yields,
    /// requesting one item per step.
    async fn log_all_paths(cache: &ObjectMapOpEnvCacheRef, root_id: &ObjectId) {
        let root = cache.get_object_map(root_id).await.unwrap().unwrap();
        let mut it = ObjectMapPathIterator::new(
            root,
            cache.clone(),
            ObjectMapPathIteratorOption::default(),
        )
        .await;

        while !it.is_end() {
            let list = it.next(1).await.unwrap();
            info!("list: {} {:?}", 1, list.list);
        }
    }

    /// First tree: `/a/b/c`, `/a/b/d` and `/a/z`, all holding the same value.
    /// `/a/x` is not inserted here; `gen_path2` does insert it.
    async fn gen_path1(cache: &ObjectMapOpEnvCacheRef, root_id: &ObjectId) -> ObjectId {
        // Swap in `95RvaS5anntyAoRUBi48vQoivWzX95M8xm4rkB93DdSt` to test the case where an
        // objectmap cannot be expanded any further because it does not exist.
        let x_value = ObjectId::from_str("5aSixgPg3hDa1oU9eAtRcKTyVKg5X2bVXWPVhk3U5c7G").unwrap();

        let path = ObjectMapPath::new(root_id.clone(), cache.clone(), false);
        path.insert_with_path("/a/b/c", &x_value).await.unwrap();
        path.insert_with_path("/a/b/d", &x_value).await.unwrap();
        path.insert("/a/z", &x_value).await.unwrap();

        path.root()
    }

    /// Second tree: the paths of `gen_path1` plus `/a/x`, `/a/z1` and `/a/z2`, holding a
    /// different value.
    async fn gen_path2(cache: &ObjectMapOpEnvCacheRef, root_id: &ObjectId) -> ObjectId {
        // Alternative value: `95RvaS5aZKKM8ghTYmsTyhSEWD4pAmALoUSJx1yNxSx5`.
        let x_value = ObjectId::from_str("5aSixgPCivmQfASRbjAvBiwgxhU8LrNtYtC2D6Lis2NQ").unwrap();

        let path = ObjectMapPath::new(root_id.clone(), cache.clone(), false);
        path.insert_with_path("/a/b/c", &x_value).await.unwrap();
        path.insert_with_path("/a/b/d", &x_value).await.unwrap();
        path.insert_with_path("/a/x", &x_value).await.unwrap();
        path.insert("/a/z", &x_value).await.unwrap();
        path.insert("/a/z1", &x_value).await.unwrap();
        path.insert("/a/z2", &x_value).await.unwrap();

        path.root()
    }

    /// Diffs the two trees with altered children expanded, then applies the diff to the
    /// first tree and checks that the outcome has the same id as the second tree.
    async fn test1(cache: &ObjectMapOpEnvCacheRef) {
        // Create an empty objectmap to serve as the root.
        let root_id = put_empty_root(cache);

        let path1 = gen_path1(cache, &root_id).await;
        let path2 = gen_path2(cache, &root_id).await;

        let diff_id = ObjectMapDiff::diff_objects(cache, &path1, &path2, true)
            .await
            .unwrap();
        ObjectMapDiff::dump_diff(cache, &diff_id).await.unwrap();

        // Test iterating over the diff.
        log_all_paths(cache, &diff_id).await;

        // Test apply.
        let new_2 = ObjectMapDiff::apply_diff(cache, &path1, &diff_id)
            .await
            .unwrap();
        log_all_paths(cache, &new_2).await;

        assert_eq!(new_2, path2);
    }

    /// Sets up in-memory caches and runs `test1` against them.
    async fn test_diff() {
        let noc = ObjectMapMemoryNOCCache::new();
        let root_cache = ObjectMapRootMemoryCache::new_default_ref(None, noc);
        let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());

        test1(&cache).await;
    }

    #[test]
    fn test() {
        crate::init_simple_log("test-object-map-path-diff", Some("debug"));
        async_std::task::block_on(test_diff());
    }
}