1use petgraph::graphmap::DiGraphMap;
5use petgraph::visit::{IntoNodeIdentifiers, Topo};
6use std::collections::{HashMap, HashSet};
7use std::{fmt::Debug, marker::PhantomData};
8
9use crate::graph::{concurrent_bubbles, split_bubble};
10use crate::group::crdt::{GroupCrdtInnerError, apply_remove_unsafe};
11use crate::group::{
12 AuthorityGraphs, GroupAction, GroupControlMessage, GroupCrdtInnerState, GroupMember,
13 apply_action,
14};
15use crate::traits::{Conditions, IdentityHandle, Operation, OperationId, Resolver};
16
17#[derive(Clone, Debug, Default)]
52pub struct StrongRemove<ID, OP, C, M> {
53 _phantom: PhantomData<(ID, OP, C, M)>,
54}
55
56impl<ID, OP, C, M> Resolver<ID, OP, C, M> for StrongRemove<ID, OP, C, M>
57where
58 ID: IdentityHandle + Ord,
59 OP: OperationId + Ord,
60 C: Conditions,
61 M: Clone + Operation<ID, OP, GroupControlMessage<ID, C>>,
62{
63 type State = GroupCrdtInnerState<ID, OP, C, M>;
64 type Error = GroupCrdtInnerError<OP>;
65
66 fn rebuild_required(y: &Self::State, operation: &M) -> Result<bool, Self::Error> {
68 let dependencies = operation.dependencies().into_iter().collect();
69 Ok(y.heads() != dependencies)
70 }
71
72 fn process(mut y: Self::State) -> Result<Self::State, Self::Error> {
75 y.ignore = HashSet::default();
77 y.mutual_removes = HashSet::default();
78
79 let bubbles = concurrent_bubbles(&y.graph);
84 let mut processed_bubbles = Vec::new();
85
86 let mut topo = Topo::new(&y.graph);
87 while let Some(operation_id) = topo.next(&y.graph) {
88 let bubble = bubbles.iter().find(|bubble| bubble.contains(&operation_id));
89
90 let Some(bubble) = bubble else {
91 y = StrongRemove::apply_operation(y, operation_id);
92 continue;
93 };
94
95 if processed_bubbles.iter().any(|b| b == &bubble) {
96 continue;
97 };
98
99 y = StrongRemove::process_bubble(y, bubble);
100 processed_bubbles.push(bubble);
101 }
102
103 Ok(y)
104 }
105}
106
107impl<ID, OP, C, M> StrongRemove<ID, OP, C, M>
108where
109 ID: IdentityHandle + Ord,
110 OP: OperationId + Ord,
111 C: Conditions,
112 M: Clone + Operation<ID, OP, GroupControlMessage<ID, C>>,
113{
114 fn process_bubble(
120 mut y: GroupCrdtInnerState<ID, OP, C, M>,
121 bubble: &HashSet<OP>,
122 ) -> GroupCrdtInnerState<ID, OP, C, M> {
123 let bubble_graph = {
125 let non_bubble_operations: Vec<_> = y
126 .graph
127 .node_identifiers()
128 .filter(|n| !bubble.contains(n))
129 .collect();
130
131 let mut bubble_graph = y.graph.clone();
132 for node_id in non_bubble_operations {
133 bubble_graph.remove_node(node_id);
134 }
135 bubble_graph
136 };
137
138 let mut authority_graphs = Self::build_authority_graphs(&y.operations, &bubble_graph);
140 y = Self::compute_filter(y, bubble, &mut authority_graphs);
141
142 let mut topo = Topo::new(&bubble_graph);
149 while let Some(operation_id) = topo.next(&bubble_graph) {
150 y = Self::apply_operation(y, operation_id);
151 }
152 y
153 }
154
155 fn is_mutual_remove(operation: &M, authority_graphs: &mut AuthorityGraphs<ID, OP>) -> bool {
157 let removed = removed_or_demoted_manager(operation);
158 let added = added_or_promoted_manager(operation);
159 if removed.is_none() && added.is_none() {
160 return false;
161 }
162
163 let group_id = operation.payload().group_id();
164 authority_graphs.is_cycle(&group_id, &operation.id())
165 }
166
167 fn is_readd(group_id: ID, removed: ID, operation: &M) -> bool {
169 if group_id != operation.payload().group_id() {
170 return false;
171 }
172
173 let GroupAction::Add { member: added, .. } = &operation.payload().action else {
174 return false;
175 };
176
177 added.id() == removed
178 }
179
180 fn is_removed(group_id: ID, removed: ID, operation: &M) -> bool {
182 if group_id != operation.payload().group_id() {
183 return false;
184 }
185
186 operation.author() == removed
187 }
188
189 fn compute_filter(
191 mut y: GroupCrdtInnerState<ID, OP, C, M>,
192 bubble: &HashSet<OP>,
193 authority_graphs: &mut AuthorityGraphs<ID, OP>,
194 ) -> GroupCrdtInnerState<ID, OP, C, M> {
195 let mut filter = HashSet::new();
197
198 let mut mutual_removes = HashSet::new();
201
202 for operation_id in bubble {
204 let operation = y
205 .operations
206 .get(operation_id)
207 .expect("all operations present in map");
208
209 let Some(removed) = removed_or_demoted_manager(operation) else {
211 continue;
212 };
213
214 if Self::is_mutual_remove(operation, authority_graphs) {
216 mutual_removes.insert(*operation_id);
217 }
218
219 let group_id = operation.payload().group_id();
221 let (mut concurrent, ..) = split_bubble(&y.graph, bubble, *operation_id);
222 concurrent.retain(|id| {
223 let concurrent_operation =
224 y.operations.get(id).expect("all operations present in map");
225
226 let is_readd = Self::is_readd(group_id, removed, concurrent_operation);
228 let is_removed = Self::is_removed(group_id, removed, concurrent_operation);
230 is_removed || is_readd
231 });
232
233 filter.extend(concurrent.iter());
234 }
235
236 y.ignore = filter;
237 y.mutual_removes = mutual_removes;
238 y
239 }
240
241 fn build_authority_graphs(
243 operations: &HashMap<OP, M>,
244 bubble_graph: &DiGraphMap<OP, ()>,
245 ) -> AuthorityGraphs<ID, OP> {
246 let mut authority_graphs = AuthorityGraphs::new(bubble_graph.clone());
247
248 for id in bubble_graph.nodes() {
250 let operation = operations.get(&id).expect("all operations present in map");
251 let author = operation.author();
252 let group_id = operation.payload().group_id();
253
254 if let Some(removed) = removed_or_demoted_manager(operation) {
256 authority_graphs.add_removal(group_id, author, removed, id);
257 };
258
259 if let Some(added) = added_or_promoted_manager(operation) {
261 authority_graphs.add_delegation(group_id, author, added, id);
262 };
263 }
264
265 authority_graphs
266 }
267
268 fn apply_operation(
270 mut y: GroupCrdtInnerState<ID, OP, C, M>,
271 operation_id: OP,
272 ) -> GroupCrdtInnerState<ID, OP, C, M> {
273 let operation = y
274 .operations
275 .get(&operation_id)
276 .expect("all processed operations exist");
277
278 let dependencies = HashSet::from_iter(operation.dependencies().clone());
279
280 let mut groups_y = y
281 .state_at(&dependencies)
282 .expect("all state objects to exist");
283
284 groups_y = if !y.mutual_removes.contains(&operation_id) {
285 apply_action(
286 groups_y,
287 operation.payload().group_id(),
288 operation.id(),
289 operation.author(),
290 &operation.payload().action,
291 &y.ignore,
292 )
293 .state()
294 .to_owned()
295 } else {
296 let Some(removed) = removed_or_demoted_manager(operation) else {
297 unreachable!();
298 };
299
300 apply_remove_unsafe(
303 groups_y,
304 operation.payload().group_id(),
305 GroupMember::Individual(removed),
306 )
307 };
308 y.states.insert(operation.id(), groups_y);
309 y
310 }
311}
312
313fn removed_or_demoted_manager<ID, OP, C, M>(operation: &M) -> Option<ID>
316where
317 ID: IdentityHandle + Ord,
318 OP: OperationId + Ord,
319 C: Conditions,
320 M: Clone + Operation<ID, OP, GroupControlMessage<ID, C>>,
321{
322 let action = operation.payload().action;
323 if let GroupAction::Remove { member: removed } = action {
324 return Some(removed.id());
325 }
326
327 if let GroupAction::Demote {
328 member: demoted,
329 access,
330 } = action
331 && !access.is_manage()
332 {
333 return Some(demoted.id());
334 }
335
336 None
337}
338
339fn added_or_promoted_manager<ID, OP, C, M>(operation: &M) -> Option<ID>
342where
343 ID: IdentityHandle + Ord,
344 OP: OperationId + Ord,
345 C: Conditions,
346 M: Clone + Operation<ID, OP, GroupControlMessage<ID, C>>,
347{
348 let action = operation.payload().action;
349 if let GroupAction::Add {
350 member: added,
351 access,
352 } = &action
353 && access.is_manage()
354 {
355 return Some(added.id());
356 }
357
358 if let GroupAction::Promote {
359 member: promoted,
360 access,
361 } = action
362 && access.is_manage()
363 {
364 return Some(promoted.id());
365 }
366
367 None
368}
369
370#[cfg(test)]
371mod tests {
372 use crate::Access;
373 use crate::group::GroupMember;
374 use crate::test_utils::no_ord::TestGroupState;
375 use crate::test_utils::{
376 MemberId, add_member, assert_members, create_group, demote_member, remove_member, sync,
377 };
378
379 use super::*;
380
381 const G0: char = '0';
382 const G1: char = '1';
383
384 const ALICE: char = 'A';
385 const BOB: char = 'B';
386 const CLAIRE: char = 'C';
387 const DAVE: char = 'D';
388 const EVE: char = 'E';
389 const FRANK: char = 'F';
390 const GRACE: char = 'G';
391
392 #[test]
393 fn mutual_removal_filter() {
394 let y = TestGroupState::new(());
405
406 let op0 = create_group(
408 ALICE,
409 0,
410 G1,
411 vec![
412 (GroupMember::Individual(ALICE), Access::manage()),
413 (GroupMember::Individual(BOB), Access::manage()),
414 (GroupMember::Individual(CLAIRE), Access::manage()),
415 ],
416 vec![],
417 );
418
419 let op1 = remove_member(ALICE, 1, G1, GroupMember::Individual(BOB), vec![op0.id()]);
421
422 let op2 = remove_member(BOB, 2, G1, GroupMember::Individual(ALICE), vec![op0.id()]);
424
425 let expected = vec![(CLAIRE, Access::manage())];
426 let y_i = sync(y, &[op0, op1, op2]);
427 assert_members(&y_i, G1, &expected);
428
429 assert_eq!(y_i.inner.ignore.len(), 2);
430 assert_eq!(y_i.inner.mutual_removes.len(), 2);
431 }
432
433 #[test]
434 fn mutual_remove_cycles_detected() {
435 let y = TestGroupState::new(());
448
449 let op0 = create_group(
451 ALICE,
452 0,
453 G1,
454 vec![
455 (GroupMember::Individual(ALICE), Access::manage()),
456 (GroupMember::Individual(BOB), Access::manage()),
457 (GroupMember::Individual(CLAIRE), Access::manage()),
458 ],
459 vec![],
460 );
461
462 let op1 = remove_member(ALICE, 1, G1, GroupMember::Individual(BOB), vec![op0.id()]);
464
465 let op2 = remove_member(BOB, 2, G1, GroupMember::Individual(CLAIRE), vec![op0.id()]);
467
468 let op3 = remove_member(
470 CLAIRE,
471 3,
472 G1,
473 GroupMember::Individual(ALICE),
474 vec![op0.id()],
475 );
476
477 let expected: Vec<(MemberId, Access)> = vec![];
478
479 let y_i = sync(y, &[op0, op1, op2, op3]);
480 assert_members(&y_i, G1, &expected);
481
482 assert_eq!(y_i.inner.ignore.len(), 3);
483 assert_eq!(y_i.inner.mutual_removes.len(), 3);
484 }
485
486 #[test]
487 fn mutual_remove_cycle_with_delegation() {
488 let y = TestGroupState::new(());
504
505 let op0 = create_group(
507 ALICE,
508 0,
509 G1,
510 vec![
511 (GroupMember::Individual(ALICE), Access::manage()),
512 (GroupMember::Individual(BOB), Access::manage()),
513 (GroupMember::Individual(CLAIRE), Access::manage()),
514 ],
515 vec![],
516 );
517
518 let op1 = remove_member(ALICE, 1, G1, GroupMember::Individual(BOB), vec![op0.id()]);
520
521 let op2 = remove_member(BOB, 2, G1, GroupMember::Individual(CLAIRE), vec![op0.id()]);
523
524 let op3 = add_member(
526 CLAIRE,
527 3,
528 G1,
529 GroupMember::Individual(DAVE),
530 Access::manage(),
531 vec![op0.id()],
532 );
533
534 let op4 = remove_member(DAVE, 4, G1, GroupMember::Individual(ALICE), vec![op3.id()]);
536
537 let expected: Vec<(MemberId, Access)> = vec![]; let y_i = sync(y, &[op0, op1, op2, op3, op4]);
540 assert_members(&y_i, G1, &expected);
541 assert_eq!(y_i.inner.ignore.len(), 3);
542 assert_eq!(y_i.inner.mutual_removes.len(), 3);
543 }
544
545 #[test]
546 fn demote_remove_filter() {
547 let y = TestGroupState::new(());
558
559 let op0 = create_group(
561 ALICE,
562 0,
563 G1,
564 vec![
565 (GroupMember::Individual(ALICE), Access::manage()),
566 (GroupMember::Individual(BOB), Access::manage()),
567 (GroupMember::Individual(CLAIRE), Access::manage()),
568 ],
569 vec![],
570 );
571
572 let op1 = demote_member(
574 ALICE,
575 1,
576 G1,
577 GroupMember::Individual(BOB),
578 Access::write(),
579 vec![op0.id()],
580 );
581
582 let op2 = remove_member(BOB, 2, G1, GroupMember::Individual(CLAIRE), vec![op0.id()]);
584
585 let expected = vec![
586 (ALICE, Access::manage()),
587 (BOB, Access::write()),
588 (CLAIRE, Access::manage()),
589 ];
590
591 let y_final = sync(y, &[op0, op1, op2]);
592 assert_members(&y_final, G1, &expected);
593 }
594
595 #[test]
596 fn demote_add_filter() {
597 let y = TestGroupState::new(());
608
609 let op0 = create_group(
611 ALICE,
612 0,
613 G1,
614 vec![
615 (GroupMember::Individual(ALICE), Access::manage()),
616 (GroupMember::Individual(BOB), Access::manage()),
617 (GroupMember::Individual(CLAIRE), Access::manage()),
618 ],
619 vec![],
620 );
621
622 let op1 = demote_member(
624 ALICE,
625 1,
626 G1,
627 GroupMember::Individual(BOB),
628 Access::write(),
629 vec![op0.id()],
630 );
631
632 let op2 = add_member(
634 BOB,
635 2,
636 G1,
637 GroupMember::Individual(DAVE),
638 Access::read(),
639 vec![op0.id()],
640 );
641
642 let expected = vec![
643 (ALICE, Access::manage()),
644 (BOB, Access::write()),
645 (CLAIRE, Access::manage()),
646 ];
647
648 let y_i = sync(y, &[op0, op1, op2]);
649 assert_members(&y_i, G1, &expected);
650 }
651
652 #[test]
653 fn remove_dependencies_filter() {
654 let y = TestGroupState::new(());
669
670 let op0 = create_group(
672 ALICE,
673 0,
674 G1,
675 vec![
676 (GroupMember::Individual(ALICE), Access::manage()),
677 (GroupMember::Individual(BOB), Access::manage()),
678 ],
679 vec![],
680 );
681
682 let op1 = remove_member(ALICE, 1, G1, GroupMember::Individual(BOB), vec![op0.id()]);
684
685 let op2 = add_member(
687 BOB,
688 2,
689 G1,
690 GroupMember::Individual(CLAIRE),
691 Access::manage(),
692 vec![op0.id()],
693 );
694
695 let op3 = add_member(
697 CLAIRE,
698 3,
699 G1,
700 GroupMember::Individual(DAVE),
701 Access::read(),
702 vec![op2.id()],
703 );
704
705 let expected_members = vec![(ALICE, Access::manage())];
707 let y_i = sync(y, &[op0, op1, op2, op3]);
708 assert_members(&y_i, G1, &expected_members);
709 }
710
711 #[test]
712 fn remove_readd_dependencies_filter() {
713 let y = TestGroupState::new(());
731
732 let op0 = create_group(
734 ALICE,
735 0,
736 G1,
737 vec![
738 (GroupMember::Individual(ALICE), Access::manage()),
739 (GroupMember::Individual(BOB), Access::manage()),
740 (GroupMember::Individual(CLAIRE), Access::manage()),
741 ],
742 vec![],
743 );
744
745 let op1 = remove_member(ALICE, 1, G1, GroupMember::Individual(BOB), vec![op0.id()]);
747
748 let op2 = add_member(
750 ALICE,
751 2,
752 G1,
753 GroupMember::Individual(BOB),
754 Access::manage(),
755 vec![op1.id()],
756 );
757
758 let op3 = add_member(
760 BOB,
761 3,
762 G1,
763 GroupMember::Individual(DAVE),
764 Access::read(),
765 vec![op0.id()],
766 );
767
768 let op4 = add_member(
770 BOB,
771 4,
772 G1,
773 GroupMember::Individual(EVE),
774 Access::read(),
775 vec![op2.id(), op3.id()],
776 );
777
778 let expected = vec![
779 (ALICE, Access::manage()),
780 (BOB, Access::manage()),
781 (CLAIRE, Access::manage()),
782 (EVE, Access::read()),
783 ];
784
785 let y_final = sync(y, &[op0, op1, op2, op3, op4]);
786 assert_members(&y_final, G1, &expected);
787 }
788
789 #[test]
790 fn two_bubbles() {
791 let y = TestGroupState::new(());
815
816 let op0 = create_group(
818 ALICE,
819 0,
820 G0,
821 vec![
822 (GroupMember::Individual(ALICE), Access::manage()),
823 (GroupMember::Individual(BOB), Access::manage()),
824 ],
825 vec![],
826 );
827
828 let op1 = remove_member(ALICE, 1, G0, GroupMember::Individual(BOB), vec![op0.id()]);
830
831 let op2 = add_member(
833 BOB,
834 2,
835 G0,
836 GroupMember::Individual(CLAIRE),
837 Access::read(),
838 vec![op0.id()],
839 );
840
841 let op3 = add_member(
843 ALICE,
844 3,
845 G0,
846 GroupMember::Individual(DAVE),
847 Access::manage(),
848 vec![op1.id(), op2.id()],
849 );
850
851 let op4 = add_member(
853 DAVE,
854 4,
855 G0,
856 GroupMember::Individual(EVE),
857 Access::read(),
858 vec![op3.id()],
859 );
860
861 let op5 = add_member(
863 ALICE,
864 5,
865 G0,
866 GroupMember::Individual(FRANK),
867 Access::manage(),
868 vec![op4.id()],
869 );
870
871 let op6 = add_member(
873 FRANK,
874 6,
875 G0,
876 GroupMember::Individual(GRACE),
877 Access::read(),
878 vec![op5.id()],
879 );
880
881 let op7 = remove_member(DAVE, 7, G0, GroupMember::Individual(ALICE), vec![op4.id()]);
883
884 let expected_members = vec![(DAVE, Access::manage()), (EVE, Access::read())];
885 let y_i = sync(y, &[op0, op1, op2, op3, op4, op5, op6, op7]);
886 assert_members(&y_i, G0, &expected_members);
887 }
888
889 #[test]
890 fn concurrent_readds_filtered() {
891 let y = TestGroupState::new(());
906
907 let op0 = create_group(
909 ALICE,
910 0,
911 G1,
912 vec![
913 (GroupMember::Individual(ALICE), Access::manage()),
914 (GroupMember::Individual(BOB), Access::manage()),
915 (GroupMember::Individual(CLAIRE), Access::manage()),
916 ],
917 vec![],
918 );
919
920 let op1 = remove_member(ALICE, 1, G1, GroupMember::Individual(BOB), vec![op0.id()]);
922
923 let op2 = add_member(
925 ALICE,
926 2,
927 G1,
928 GroupMember::Individual(BOB),
929 Access::manage(),
930 vec![op1.id()],
931 );
932
933 let op3 = remove_member(CLAIRE, 3, G1, GroupMember::Individual(BOB), vec![op0.id()]);
935
936 let expected = vec![(ALICE, Access::manage()), (CLAIRE, Access::manage())];
937
938 let y_final = sync(y, &[op0, op1, op2, op3]);
939 assert_members(&y_final, G1, &expected);
940 }
941
942 #[test]
943 fn filter_only_concurrent_operations() {
944 let y = TestGroupState::new(());
945
946 let op0 = create_group(
948 ALICE,
949 0,
950 G1,
951 vec![
952 (GroupMember::Individual(ALICE), Access::manage()),
953 (GroupMember::Individual(BOB), Access::manage()),
954 ],
955 vec![],
956 );
957
958 let op1 = add_member(
960 BOB,
961 1,
962 G1,
963 GroupMember::Individual(CLAIRE),
964 Access::read(),
965 vec![op0.id()],
966 );
967
968 let op2 = demote_member(
970 ALICE,
971 2,
972 G1,
973 GroupMember::Individual(BOB),
974 Access::write(),
975 vec![op1.id()],
976 );
977
978 let op3 = add_member(
980 BOB,
981 3,
982 G1,
983 GroupMember::Individual(DAVE),
984 Access::read(),
985 vec![op1.id()],
986 );
987
988 let expected = vec![
989 (ALICE, Access::manage()),
990 (BOB, Access::write()),
991 (CLAIRE, Access::read()),
992 ];
993 let y_i = sync(y, &[op0, op1, op2, op3]);
994 assert_members(&y_i, G1, &expected);
995 }
996}