1use std::collections::{HashMap, HashSet};
31use std::hash::Hash;
32
33use super::merge::Merge;
34use super::{OrSet, Timestamp};
35
/// A single operation that can be applied to an [`OrSet`] via `OrSet::apply`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrSetOp<T> {
    /// Record an add of the value under the given tag.
    Add(T, Timestamp),
    /// Tombstone the value for each of the listed (observed) tags.
    Remove(T, Vec<Timestamp>),
}
48
49impl<T: Hash + Eq + Clone> OrSet<T> {
50 #[must_use]
52 pub fn new() -> Self {
53 Self {
54 elements: HashSet::new(),
55 tombstone: HashSet::new(),
56 }
57 }
58
59 pub fn add(&mut self, value: T, tag: Timestamp) {
65 self.elements.insert((value, tag));
66 }
67
68 pub fn remove(&mut self, value: &T) -> Vec<Timestamp>
76 where
77 T: Eq + Hash,
78 {
79 let active_tags: Vec<Timestamp> = self
81 .elements
82 .iter()
83 .filter(|(v, _)| v == value)
84 .map(|(_, ts)| ts.clone())
85 .collect();
86
87 for tag in &active_tags {
89 self.tombstone.insert((value.clone(), tag.clone()));
90 }
91
92 active_tags
93 }
94
95 pub fn remove_specific(&mut self, value: &T, tags: &[Timestamp])
99 where
100 T: Eq + Hash,
101 {
102 for tag in tags {
103 self.tombstone.insert((value.clone(), tag.clone()));
104 }
105 }
106
107 pub fn contains(&self, value: &T) -> bool {
112 self.elements
113 .iter()
114 .any(|(v, ts)| v == value && !self.tombstone.contains(&(value.clone(), ts.clone())))
115 }
116
117 #[must_use]
121 pub fn values(&self) -> HashSet<&T> {
122 let mut result = HashSet::new();
123 for (value, tag) in &self.elements {
124 if !self.tombstone.contains(&(value.clone(), tag.clone())) {
125 result.insert(value);
126 }
127 }
128 result
129 }
130
131 #[must_use]
133 pub fn len(&self) -> usize {
134 self.values().len()
135 }
136
137 #[must_use]
139 pub fn is_empty(&self) -> bool {
140 self.len() == 0
141 }
142
143 pub fn active_tags(&self, value: &T) -> Vec<&Timestamp> {
145 self.elements
146 .iter()
147 .filter(|(v, ts)| v == value && !self.tombstone.contains(&(value.clone(), ts.clone())))
148 .map(|(_, ts)| ts)
149 .collect()
150 }
151
152 pub fn apply(&mut self, op: OrSetOp<T>) {
154 match op {
155 OrSetOp::Add(value, tag) => {
156 self.add(value, tag);
157 }
158 OrSetOp::Remove(value, observed_tags) => {
159 for tag in observed_tags {
161 self.tombstone.insert((value.clone(), tag));
162 }
163 }
164 }
165 }
166}
167
/// The default set is the empty set produced by `OrSet::new`.
impl<T: Hash + Eq + Clone> Default for OrSet<T> {
    /// Delegates to `Self::new()`.
    fn default() -> Self {
        Self::new()
    }
}
173
174impl<T: Eq + Hash + Clone> Merge for OrSet<T> {
187 fn merge(&mut self, other: Self) {
188 self.elements.extend(other.elements);
189 self.tombstone.extend(other.tombstone);
190 }
191}
192
193use crate::dag::graph::EventDag;
198use crate::dag::replay::{DivergentReplay, ReplayError, replay_divergent};
199use crate::event::Event;
200use crate::event::data::{AssignAction, EventData};
201use crate::event::types::EventType;
202
/// Selects which set-valued field is derived from an event stream.
#[derive(Debug, Clone)]
pub enum OrSetField {
    /// Derived from update events whose field is `"labels"`.
    Labels,
    /// Derived from assign / unassign events.
    Assignees,
    /// Derived from link / unlink events of type `"blocks"` or `"blocked_by"`.
    BlockedBy,
    /// Derived from link / unlink events of type `"related_to"` or `"related"`.
    RelatedTo,
}
219
220pub fn ops_from_events(
239 events: &[Event],
240 field: &OrSetField,
241 base_state: Option<&OrSet<String>>,
242 dag: Option<&EventDag>,
243) -> Vec<OrSetOp<String>> {
244 let mut current_set = base_state.map_or_else(OrSet::new, Clone::clone);
245 let mut tag_map: HashMap<Timestamp, String> = HashMap::new();
246 let mut ops = Vec::new();
247
248 for event in events {
249 let tag = event_to_timestamp(event);
250 if let Some(op) = dispatch_event(event, field, &tag, dag, &tag_map, &mut current_set) {
251 if let OrSetOp::Add(_, ref t) = op {
252 tag_map.insert(t.clone(), event.event_hash.clone());
253 }
254 ops.push(op);
255 }
256 }
257
258 ops
259}
260
261fn dispatch_event(
263 event: &Event,
264 field: &OrSetField,
265 tag: &Timestamp,
266 dag: Option<&EventDag>,
267 tag_map: &HashMap<Timestamp, String>,
268 current_set: &mut OrSet<String>,
269) -> Option<OrSetOp<String>> {
270 match field {
271 OrSetField::Assignees => handle_assignee(event, tag, dag, tag_map, current_set),
272 OrSetField::Labels => handle_label(event, tag, dag, tag_map, current_set),
273 OrSetField::BlockedBy => handle_link(
274 event,
275 tag,
276 dag,
277 tag_map,
278 current_set,
279 &["blocks", "blocked_by"],
280 ),
281 OrSetField::RelatedTo => handle_link(
282 event,
283 tag,
284 dag,
285 tag_map,
286 current_set,
287 &["related_to", "related"],
288 ),
289 }
290}
291
292fn visible_tags(
294 candidates: Vec<&Timestamp>,
295 event: &Event,
296 dag: Option<&EventDag>,
297 tag_map: &HashMap<Timestamp, String>,
298) -> Vec<Timestamp> {
299 candidates
300 .into_iter()
301 .filter(|t| {
302 dag.is_none_or(|dag_ref| {
303 tag_map
304 .get(t)
305 .is_none_or(|tag_hash| dag_ref.is_ancestor(tag_hash, &event.event_hash))
306 })
307 })
308 .cloned()
309 .collect()
310}
311
312fn record_add(value: String, tag: &Timestamp, current_set: &mut OrSet<String>) -> OrSetOp<String> {
314 let op = OrSetOp::Add(value.clone(), tag.clone());
315 current_set.add(value, tag.clone());
316 op
317}
318
319fn record_remove(
321 value: String,
322 event: &Event,
323 dag: Option<&EventDag>,
324 tag_map: &HashMap<Timestamp, String>,
325 current_set: &mut OrSet<String>,
326) -> OrSetOp<String> {
327 let candidate_tags = current_set.active_tags(&value);
328 let observed_tags = visible_tags(candidate_tags, event, dag, tag_map);
329 current_set.remove_specific(&value, &observed_tags);
330 OrSetOp::Remove(value, observed_tags)
331}
332
333fn handle_assignee(
334 event: &Event,
335 tag: &Timestamp,
336 dag: Option<&EventDag>,
337 tag_map: &HashMap<Timestamp, String>,
338 current_set: &mut OrSet<String>,
339) -> Option<OrSetOp<String>> {
340 if let EventData::Assign(data) = &event.data {
341 return match data.action {
342 AssignAction::Assign => Some(record_add(data.agent.clone(), tag, current_set)),
343 AssignAction::Unassign => Some(record_remove(
344 data.agent.clone(),
345 event,
346 dag,
347 tag_map,
348 current_set,
349 )),
350 };
351 }
352 None
353}
354
355fn handle_label(
356 event: &Event,
357 tag: &Timestamp,
358 dag: Option<&EventDag>,
359 tag_map: &HashMap<Timestamp, String>,
360 current_set: &mut OrSet<String>,
361) -> Option<OrSetOp<String>> {
362 if event.event_type != EventType::Update {
363 return None;
364 }
365 let EventData::Update(data) = &event.data else {
366 return None;
367 };
368 if data.field != "labels" {
369 return None;
370 }
371 let obj = data.value.as_object()?;
372 let action_str = obj.get("action")?.as_str().unwrap_or("");
373 let label_str = obj.get("label")?.as_str().unwrap_or("").to_string();
374
375 match action_str {
376 "add" => Some(record_add(label_str, tag, current_set)),
377 "remove" => Some(record_remove(label_str, event, dag, tag_map, current_set)),
378 _ => None,
379 }
380}
381
382fn handle_link(
383 event: &Event,
384 tag: &Timestamp,
385 dag: Option<&EventDag>,
386 tag_map: &HashMap<Timestamp, String>,
387 current_set: &mut OrSet<String>,
388 link_types: &[&str],
389) -> Option<OrSetOp<String>> {
390 match event.event_type {
391 EventType::Link => {
392 if let EventData::Link(data) = &event.data
393 && link_types.contains(&data.link_type.as_str())
394 {
395 return Some(record_add(data.target.clone(), tag, current_set));
396 }
397 }
398 EventType::Unlink => {
399 if let EventData::Unlink(data) = &event.data {
400 let matches = data
401 .link_type
402 .as_ref()
403 .is_none_or(|lt| link_types.contains(<.as_str()));
404 if matches {
405 return Some(record_remove(
406 data.target.clone(),
407 event,
408 dag,
409 tag_map,
410 current_set,
411 ));
412 }
413 }
414 }
415 _ => {}
416 }
417 None
418}
419
420#[must_use]
425pub fn materialize_from_events(events: &[Event], field: &OrSetField) -> OrSet<String> {
426 let ops = ops_from_events(events, field, None, None);
427 let mut set = OrSet::new();
428 for op in ops {
429 set.apply(op);
430 }
431 set
432}
433
434pub fn materialize_from_replay(
454 dag: &EventDag,
455 tip_a: &str,
456 tip_b: &str,
457 base_state: &OrSet<String>,
458 field: &OrSetField,
459) -> Result<OrSet<String>, ReplayError> {
460 let replay = replay_divergent(dag, tip_a, tip_b)?;
461 Ok(apply_replay(base_state, &replay, field, Some(dag)))
462}
463
464#[must_use]
469pub fn apply_replay(
470 base_state: &OrSet<String>,
471 replay: &DivergentReplay,
472 field: &OrSetField,
473 dag: Option<&EventDag>,
474) -> OrSet<String> {
475 let mut result = base_state.clone();
477
478 let ops = ops_from_events(&replay.merged, field, Some(base_state), dag);
482 for op in ops {
483 result.apply(op);
484 }
485
486 result
487}
488
489fn event_to_timestamp(event: &Event) -> Timestamp {
494 use chrono::TimeZone;
495
496 let epoch_secs = event.wall_ts_us / 1_000_000;
497 let subsec_nanos = u32::try_from((event.wall_ts_us % 1_000_000) * 1_000).unwrap_or(0);
498 let wall = chrono::Utc.timestamp_opt(epoch_secs, subsec_nanos).unwrap();
499
500 let actor = hash_str_to_u64(&event.agent);
502
503 let event_hash_u64 = hash_str_to_u64(&event.event_hash);
505
506 let itc = event.wall_ts_us.cast_unsigned();
508
509 Timestamp {
510 wall,
511 actor,
512 event_hash: event_hash_u64,
513 itc,
514 }
515}
516
/// Hashes `s` to a `u64` with the standard library's default hasher.
///
/// NOTE(review): `DefaultHasher`'s algorithm is documented as unspecified and
/// may change between Rust releases — confirm these values are never persisted
/// or compared across binaries built with different toolchains.
fn hash_str_to_u64(s: &str) -> u64 {
    use std::hash::{BuildHasher, BuildHasherDefault};

    type StdHasher = std::collections::hash_map::DefaultHasher;

    // `hash_one` builds a fresh default hasher, feeds it `s`, and finishes it.
    BuildHasherDefault::<StdHasher>::default().hash_one(s)
}
524
// Unit tests for the OR-Set operations, its merge behaviour, and the
// derivation of operations from events.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::data::{AssignAction, AssignData, EventData};
    use crate::event::{Event, EventType};
    use chrono::{TimeZone, Utc};
    use std::collections::BTreeMap;

    /// Builds a tag from whole seconds, an actor id and an event-hash value;
    /// `itc` mirrors `wall_secs`.
    fn make_tag(wall_secs: i64, actor: u64, event_hash: u64) -> Timestamp {
        Timestamp {
            wall: Utc.timestamp_opt(wall_secs, 0).unwrap(),
            actor,
            event_hash,
            itc: wall_secs as u64,
        }
    }

    // ---- Basic operations ----

    /// A freshly created set reports empty through every accessor.
    #[test]
    fn new_orset_is_empty() {
        let set: OrSet<String> = OrSet::new();
        assert!(set.is_empty());
        assert_eq!(set.len(), 0);
        assert!(set.values().is_empty());
    }

    /// Adding one value makes only that value present.
    #[test]
    fn add_single_element() {
        let mut set: OrSet<String> = OrSet::new();
        let tag = make_tag(1, 1, 100);
        set.add("alice".into(), tag);

        assert!(set.contains(&"alice".into()));
        assert!(!set.contains(&"bob".into()));
        assert_eq!(set.len(), 1);
    }

    /// Distinct values are all present and counted.
    #[test]
    fn add_multiple_elements() {
        let mut set: OrSet<String> = OrSet::new();
        set.add("alice".into(), make_tag(1, 1, 100));
        set.add("bob".into(), make_tag(2, 1, 101));
        set.add("charlie".into(), make_tag(3, 1, 102));

        assert_eq!(set.len(), 3);
        assert!(set.contains(&"alice".into()));
        assert!(set.contains(&"bob".into()));
        assert!(set.contains(&"charlie".into()));
    }

    /// The same value under two tags counts once but keeps both tags active.
    #[test]
    fn add_same_element_twice_with_different_tags() {
        let mut set: OrSet<String> = OrSet::new();
        set.add("alice".into(), make_tag(1, 1, 100));
        set.add("alice".into(), make_tag(2, 2, 101));

        assert_eq!(set.len(), 1);
        assert!(set.contains(&"alice".into()));
        assert_eq!(set.active_tags(&"alice".into()).len(), 2);
    }

    /// Removing a present value returns its tag and empties the set.
    #[test]
    fn remove_element() {
        let mut set: OrSet<String> = OrSet::new();
        set.add("alice".into(), make_tag(1, 1, 100));
        assert!(set.contains(&"alice".into()));

        let removed = set.remove(&"alice".into());
        assert_eq!(removed.len(), 1);
        assert!(!set.contains(&"alice".into()));
        assert!(set.is_empty());
    }

    /// Removing a value that was never added returns no tags.
    #[test]
    fn remove_nonexistent_element() {
        let mut set: OrSet<String> = OrSet::new();
        let removed = set.remove(&"alice".into());
        assert!(removed.is_empty());
        assert!(set.is_empty());
    }

    /// A value can be re-added under a new tag after being removed.
    #[test]
    fn add_remove_add_cycle() {
        let mut set: OrSet<String> = OrSet::new();

        set.add("alice".into(), make_tag(1, 1, 100));
        assert!(set.contains(&"alice".into()));

        set.remove(&"alice".into());
        assert!(!set.contains(&"alice".into()));

        set.add("alice".into(), make_tag(3, 1, 102));
        assert!(set.contains(&"alice".into()));
        assert_eq!(set.active_tags(&"alice".into()).len(), 1);
    }

    /// Repeated add/remove cycles never leave a stale entry visible.
    #[test]
    fn multiple_add_remove_cycles() {
        let mut set: OrSet<String> = OrSet::new();

        for i in 0..5 {
            set.add("x".into(), make_tag(i * 2, 1, (i * 2) as u64));
            assert!(set.contains(&"x".into()));
            set.remove(&"x".into());
            assert!(!set.contains(&"x".into()));
        }

        set.add("x".into(), make_tag(100, 1, 999));
        assert!(set.contains(&"x".into()));
        assert_eq!(set.len(), 1);
    }

    // ---- Concurrent behaviour ----

    /// One replica removes while another re-adds under a new tag: after
    /// merging in either order the value is present.
    #[test]
    fn concurrent_add_remove_add_wins() {
        let tag1 = make_tag(1, 1, 100);
        let mut base: OrSet<String> = OrSet::new();
        base.add("x".into(), tag1.clone());

        let mut agent_a = base.clone();
        agent_a.remove(&"x".into());
        assert!(!agent_a.contains(&"x".into()));

        let mut agent_b = base.clone();
        let tag2 = make_tag(2, 2, 200);
        agent_b.add("x".into(), tag2.clone());

        let mut merged_ab = agent_a.clone();
        merged_ab.merge(agent_b.clone());
        assert!(
            merged_ab.contains(&"x".into()),
            "add-wins: concurrent add should survive remove"
        );

        let mut merged_ba = agent_b.clone();
        merged_ba.merge(agent_a.clone());
        assert!(
            merged_ba.contains(&"x".into()),
            "add-wins: merge must be commutative"
        );
    }

    /// Two replicas adding the same value keep both tags after merge.
    #[test]
    fn concurrent_adds_both_present() {
        let mut agent_a: OrSet<String> = OrSet::new();
        agent_a.add("x".into(), make_tag(1, 1, 100));

        let mut agent_b: OrSet<String> = OrSet::new();
        agent_b.add("x".into(), make_tag(1, 2, 200));

        let mut merged = agent_a.clone();
        merged.merge(agent_b);

        assert!(merged.contains(&"x".into()));
        assert_eq!(merged.active_tags(&"x".into()).len(), 2);
    }

    /// A remove issued after the add on the same replica removes the value.
    #[test]
    fn causal_remove_after_add_element_absent() {
        let mut set: OrSet<String> = OrSet::new();
        set.add("x".into(), make_tag(1, 1, 100));
        set.remove(&"x".into());

        assert!(!set.contains(&"x".into()));
    }

    // ---- Merge properties ----

    /// Merging a into b equals merging b into a.
    #[test]
    fn merge_commutative() {
        let mut a: OrSet<u32> = OrSet::new();
        a.add(1, make_tag(1, 1, 100));
        a.add(2, make_tag(2, 1, 101));

        let mut b: OrSet<u32> = OrSet::new();
        b.add(2, make_tag(3, 2, 200));
        b.add(3, make_tag(4, 2, 201));

        let mut ab = a.clone();
        ab.merge(b.clone());

        let mut ba = b.clone();
        ba.merge(a.clone());

        assert_eq!(ab, ba);
    }

    /// (a merge b) merge c equals a merge (b merge c).
    #[test]
    fn merge_associative() {
        let mut a: OrSet<u32> = OrSet::new();
        a.add(1, make_tag(1, 1, 100));

        let mut b: OrSet<u32> = OrSet::new();
        b.add(2, make_tag(2, 2, 200));

        let mut c: OrSet<u32> = OrSet::new();
        c.add(3, make_tag(3, 3, 300));

        let mut ab_c = a.clone();
        ab_c.merge(b.clone());
        ab_c.merge(c.clone());

        let mut bc = b.clone();
        bc.merge(c.clone());
        let mut a_bc = a.clone();
        a_bc.merge(bc);

        assert_eq!(ab_c, a_bc);
    }

    /// Merging a set with a copy of itself changes nothing.
    #[test]
    fn merge_idempotent() {
        let mut a: OrSet<u32> = OrSet::new();
        a.add(1, make_tag(1, 1, 100));
        a.add(2, make_tag(2, 1, 101));
        a.remove(&1);

        let before = a.clone();
        a.merge(before.clone());
        assert_eq!(a, before);
    }

    /// Merging two empty sets yields an empty set.
    #[test]
    fn merge_empty_sets() {
        let a: OrSet<String> = OrSet::new();
        let b: OrSet<String> = OrSet::new();

        let mut merged = a.clone();
        merged.merge(b);

        assert!(merged.is_empty());
    }

    /// Merging with an empty set leaves the set unchanged.
    #[test]
    fn merge_with_empty_is_identity() {
        let mut a: OrSet<u32> = OrSet::new();
        a.add(1, make_tag(1, 1, 100));
        a.add(2, make_tag(2, 1, 101));

        let before = a.clone();
        a.merge(OrSet::new());
        assert_eq!(a, before);
    }

    // ---- Multi-replica scenarios ----

    /// Two replicas remove and one re-adds: the re-add survives the merge.
    #[test]
    fn three_way_concurrent_add_remove() {
        let tag1 = make_tag(1, 0, 1);
        let mut base: OrSet<String> = OrSet::new();
        base.add("x".into(), tag1.clone());

        let mut a = base.clone();
        a.remove(&"x".into());

        let mut b = base.clone();
        b.add("x".into(), make_tag(2, 2, 200));

        let mut c = base.clone();
        c.remove(&"x".into());

        let mut result = a.clone();
        result.merge(b.clone());
        result.merge(c.clone());

        assert!(
            result.contains(&"x".into()),
            "B's concurrent add should win over A and C's removes"
        );
    }

    /// Both replicas remove then re-add: the merge keeps both new tags.
    #[test]
    fn remove_then_concurrent_re_adds() {
        let tag1 = make_tag(1, 0, 1);
        let mut base: OrSet<String> = OrSet::new();
        base.add("x".into(), tag1.clone());

        let mut a = base.clone();
        a.remove(&"x".into());
        a.add("x".into(), make_tag(3, 1, 300));

        let mut b = base.clone();
        b.remove(&"x".into());
        b.add("x".into(), make_tag(4, 2, 400));

        let mut merged = a.clone();
        merged.merge(b.clone());

        assert!(merged.contains(&"x".into()));
        assert_eq!(merged.active_tags(&"x".into()).len(), 2);
    }

    /// Independent removes and adds on different values all take effect.
    #[test]
    fn mixed_elements_concurrent_ops() {
        let mut base: OrSet<String> = OrSet::new();
        base.add("a".into(), make_tag(1, 0, 1));
        base.add("b".into(), make_tag(2, 0, 2));

        let mut s1 = base.clone();
        s1.remove(&"a".into());
        s1.add("c".into(), make_tag(3, 1, 100));

        let mut s2 = base.clone();
        s2.remove(&"b".into());
        s2.add("d".into(), make_tag(4, 2, 200));

        let mut merged = s1.clone();
        merged.merge(s2);

        assert!(!merged.contains(&"a".into()));

        assert!(!merged.contains(&"b".into()));

        assert!(merged.contains(&"c".into()));
        assert!(merged.contains(&"d".into()));
    }

    // ---- Applying operations ----

    /// Applying an `Add` op makes the value present.
    #[test]
    fn apply_add_op() {
        let mut set: OrSet<String> = OrSet::new();
        let tag = make_tag(1, 1, 100);
        set.apply(OrSetOp::Add("alice".into(), tag));

        assert!(set.contains(&"alice".into()));
    }

    /// Applying a `Remove` op that lists the only tag removes the value.
    #[test]
    fn apply_remove_op_with_observed_tags() {
        let mut set: OrSet<String> = OrSet::new();
        let tag = make_tag(1, 1, 100);
        set.add("alice".into(), tag.clone());

        set.apply(OrSetOp::Remove("alice".into(), vec![tag]));
        assert!(!set.contains(&"alice".into()));
    }

    /// A `Remove` op listing only one of two tags leaves the value present.
    #[test]
    fn apply_remove_with_unobserved_tag_survives() {
        let mut set: OrSet<String> = OrSet::new();
        let tag1 = make_tag(1, 1, 100);
        let tag2 = make_tag(2, 2, 200);
        set.add("alice".into(), tag1.clone());
        set.add("alice".into(), tag2.clone());

        set.apply(OrSetOp::Remove("alice".into(), vec![tag1]));

        assert!(set.contains(&"alice".into()));
        assert_eq!(set.active_tags(&"alice".into()).len(), 1);
    }

    // ---- Accessors ----

    /// `values` returns only the values that are still present.
    #[test]
    fn values_returns_distinct_present_elements() {
        let mut set: OrSet<String> = OrSet::new();
        set.add("a".into(), make_tag(1, 1, 100));
        set.add("b".into(), make_tag(2, 1, 101));
        set.add("c".into(), make_tag(3, 1, 102));
        set.remove(&"b".into());

        let vals = set.values();
        assert_eq!(vals.len(), 2);
        assert!(vals.contains(&"a".to_string()));
        assert!(vals.contains(&"c".to_string()));
        assert!(!vals.contains(&"b".to_string()));
    }

    // ---- Deriving operations from events ----

    /// An unassign event removes an assignee that exists only in the
    /// supplied base state.
    #[test]
    fn remove_base_state_item_succeeds() {
        let base_tag = make_tag(1, 1, 100);
        let mut base_state: OrSet<String> = OrSet::new();
        base_state.add("alice".into(), base_tag.clone());

        let event = Event {
            wall_ts_us: 2000,
            agent: "agent".into(),
            itc: "itc".into(),
            parents: vec![],
            event_type: EventType::Assign,
            item_id: crate::model::item_id::ItemId::new_unchecked("bn-test"),
            data: EventData::Assign(AssignData {
                agent: "alice".into(),
                action: AssignAction::Unassign,
                extra: BTreeMap::new(),
            }),
            event_hash: "hash".into(),
        };

        let ops = ops_from_events(&[event], &OrSetField::Assignees, Some(&base_state), None);

        let mut result = base_state.clone();
        for op in ops {
            result.apply(op);
        }

        assert!(
            !result.contains(&"alice".into()),
            "Alice should be removed when base_state is provided"
        );
    }

    /// With a DAG in which the add and the remove are siblings under the same
    /// root, the remove does not observe the add's tag, so the add survives.
    #[test]
    fn concurrent_remove_respects_dag_visibility() {
        let root_hash = "root";
        let add_hash = "add_hash";
        let remove_hash = "remove_hash";

        let mut dag = EventDag::new();
        // Placeholder create events used only to give the DAG its shape.
        let make_evt = |hash: &str, parents: Vec<&str>| Event {
            wall_ts_us: 0,
            agent: "a".into(),
            itc: "i".into(),
            parents: parents.iter().map(|s| s.to_string()).collect(),
            event_type: EventType::Create,
            item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
            data: EventData::Create(crate::event::data::CreateData {
                title: "".into(),
                kind: crate::model::item::Kind::Task,
                size: None,
                urgency: crate::model::item::Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            event_hash: hash.into(),
        };

        dag.insert(make_evt(root_hash, vec![]));
        dag.insert(make_evt(add_hash, vec![root_hash]));
        dag.insert(make_evt(remove_hash, vec![root_hash]));

        let add_event = Event {
            wall_ts_us: 1000,
            agent: "alice".into(),
            itc: "1".into(),
            parents: vec![root_hash.into()],
            event_type: EventType::Assign,
            item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
            data: EventData::Assign(AssignData {
                agent: "alice".into(),
                action: AssignAction::Assign,
                extra: BTreeMap::new(),
            }),
            event_hash: add_hash.into(),
        };

        let remove_event = Event {
            wall_ts_us: 2000,
            agent: "bob".into(),
            itc: "2".into(),
            parents: vec![root_hash.into()],
            event_type: EventType::Assign,
            item_id: crate::model::item_id::ItemId::new_unchecked("bn"),
            data: EventData::Assign(AssignData {
                agent: "alice".into(),
                action: AssignAction::Unassign,
                extra: BTreeMap::new(),
            }),
            event_hash: remove_hash.into(),
        };

        let events = vec![add_event, remove_event];

        let ops = ops_from_events(
            &events,
            &OrSetField::Assignees,
            None,
            Some(&dag),
        );

        let mut set = OrSet::new();
        for op in ops {
            set.apply(op);
        }

        assert!(
            set.contains(&"alice".into()),
            "Concurrent Add should survive Remove"
        );
    }
}