exocore-store 0.1.9

Store / indexation layer of Exocore (Distributed applications framework)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
use std::{
    collections::{HashMap, HashSet},
    rc::Rc,
};

use chrono::{DateTime, Utc};
use exocore_chain::{block::BlockOffset, operation::OperationId};
use exocore_core::time::ConsistentTimestamp;
use exocore_protos::{
    reflect::{FieldId, MutableReflectMessage, ReflectMessage},
    registry::Registry,
    store::{Projection, Trait, TraitDetails},
};
use itertools::Itertools;

use super::super::mutation_index::{MutationMetadata, MutationType, PutTraitMetadata};
use crate::{entity::TraitId, error::Error, query::ResultHash};

/// Number of inactive operations (mutations that are not in the active set
/// anymore) an entity must exceed before `EntityAggregator::should_collect`
/// reports it as a garbage collection candidate.
const GC_INACTIVE_OPS_THRESHOLD: usize = 10;

/// Aggregates mutations metadata of an entity retrieved from the mutations
/// index. Once merged, only the latest / active mutations are remaining, and
/// can then be fetched from the chain.
///
/// Operations are merged in the order they got committed to the chain, and
/// then by operation id within a block. If an old operation gets committed
/// after a newer operation, the old operation gets discarded to prevent
/// inconsistency.
pub struct EntityAggregator {
    /// Id of the entity, copied from the last mutation that got merged (empty
    /// if there was no mutation).
    pub entity_id: String,

    /// Aggregators of the traits of the entity, by trait id.
    pub traits: HashMap<TraitId, TraitAggregator>,

    /// Ids of operations that are still active (ex: were not overridden by
    /// another mutation).
    pub active_operations: HashSet<OperationId>,

    /// Total number of mutations that got merged, including the discarded and
    /// overridden ones.
    pub mutation_count: usize,

    /// Hash of the ids of the operations of the entity.
    pub hash: ResultHash,

    /// Date of the creation of the entity, based on put trait creation date OR
    /// first operation.
    pub creation_date: Option<DateTime<Utc>>,

    /// Date of the modification of the entity, based on put trait modification
    /// date OR last operation.
    pub modification_date: Option<DateTime<Utc>>,

    /// Date of the deletion of the entity if all traits are deleted once all
    /// mutations are applied.
    pub deletion_date: Option<DateTime<Utc>>,

    /// Highest id among the operations that have affected the entity
    /// (discarded operations are ignored). Default operation id if the entity
    /// had no such operation.
    pub last_operation_id: OperationId,

    /// Offset of the block of the last applied operation that is committed
    /// to the chain.
    pub last_block_offset: Option<BlockOffset>,

    /// At least one of the applied operations has no block offset yet (is in
    /// the pending store).
    pub in_pending: bool,

    /// Indicates that a deletion affecting the entity is in the pending index.
    /// This inhibits any further garbage collection until it hits the chain
    /// index.
    pub pending_deletion: bool,
}

impl EntityAggregator {
    /// Builds the aggregated view of an entity from the metadata of its
    /// mutations.
    ///
    /// Mutations are first sorted by commit order (see
    /// `sort_mutations_commit_time`) and then applied one by one:
    /// * a trait put or a trait tombstone is discarded when its operation id
    ///   is lower than the last operation applied on the same trait;
    /// * an entity tombstone is discarded when its operation id is lower than
    ///   the highest operation id applied so far, and otherwise marks every
    ///   known trait as deleted;
    /// * a pending deletion only sets `pending_deletion`.
    ///
    /// Every mutation, even a discarded one, is counted in `mutation_count`
    /// and contributes to `hash`.
    ///
    /// The current implementation always returns `Ok`.
    pub fn new<I>(mutations_metadata: I) -> Result<EntityAggregator, Error>
    where
        I: Iterator<Item = MutationMetadata>,
    {
        let ordered_mutations_metadata = sort_mutations_commit_time(mutations_metadata);

        let mut entity_id = String::new();
        let mut entity_creation_date = None;
        let mut entity_modification_date = None;
        let mut entity_deletion_date = None;

        let hasher = result_hasher();
        let mut digest = hasher.digest();
        let mut traits = HashMap::<TraitId, TraitAggregator>::new();
        let mut active_operation_ids = HashSet::<OperationId>::new();
        let mut last_operation_id = None;
        let mut last_block_offset = None;
        let mut in_pending = false;
        let mut pending_deletion = false;
        let mut mutation_count = 0;

        for current_mutation in ordered_mutations_metadata {
            // counted before any discard check, so that discarded mutations are
            // part of the total too
            mutation_count += 1;

            let current_operation_id = current_mutation.operation_id;
            let current_operation_time =
                ConsistentTimestamp::from(current_operation_id).to_datetime();
            let current_block_offset = current_mutation.block_offset;

            // hashing operations instead of traits content allow invalidating results as
            // soon as one operation is made since we can't guarantee anything
            digest.update(&current_operation_id.to_ne_bytes());

            entity_id = current_mutation.entity_id.clone();

            match &current_mutation.mutation_type {
                MutationType::TraitPut(put_trait) => {
                    let agg = TraitAggregator::get_for_trait(&mut traits, &put_trait.trait_id);

                    if let Some(last_operation_id) = agg.last_operation_id {
                        // discard the new mutation if it happened before the last mutation, but got
                        // committed late, to prevent inconsistency
                        if current_operation_id < last_operation_id {
                            continue;
                        }

                        // the previous operation on this trait is overridden by this one
                        active_operation_ids.remove(&last_operation_id);
                    }

                    agg.push_put_mutation(current_mutation);
                    active_operation_ids.insert(current_operation_id);

                    update_if_older(&mut entity_creation_date, agg.creation_date);
                    update_if_newer(&mut entity_modification_date, agg.modification_date);
                    // a put on any trait means the entity is not deleted anymore
                    entity_deletion_date = None;
                }
                MutationType::TraitTombstone(trait_id) => {
                    let agg = TraitAggregator::get_for_trait(&mut traits, trait_id);

                    if let Some(last_operation_id) = agg.last_operation_id {
                        // discard the new mutation if it happened before the last mutation, but got
                        // committed late, to prevent inconsistency
                        if current_operation_id < last_operation_id {
                            continue;
                        }

                        // the previous operation on this trait is overridden by this one
                        active_operation_ids.remove(&last_operation_id);
                    }

                    active_operation_ids.insert(current_operation_id);
                    agg.push_delete_mutation(current_operation_id);

                    update_if_newer(&mut entity_modification_date, Some(current_operation_time));

                    // the entity is considered deleted once every known trait is deleted
                    if TraitAggregator::all_deleted(&traits) {
                        entity_creation_date = None;
                        entity_modification_date = None;
                        entity_deletion_date = Some(current_operation_time);
                    }
                }
                MutationType::EntityTombstone => {
                    if let Some(latest_operation_id) = last_operation_id {
                        // discard the new mutation if it happened before the latest operation, but
                        // got committed late, to prevent inconsistency
                        if current_operation_id < latest_operation_id {
                            continue;
                        }
                    }

                    for aggregated_trait in traits.values_mut() {
                        aggregated_trait.push_delete_mutation(current_operation_id);
                    }

                    // the tombstone overrides every operation that was active so far
                    active_operation_ids.clear();
                    active_operation_ids.insert(current_operation_id);

                    entity_creation_date = None;
                    entity_modification_date = None;
                    entity_deletion_date = Some(current_operation_time);
                }
                // only flags the entity, no trait or date is touched
                MutationType::PendingDeletion => pending_deletion = true,
            }

            // the bookkeeping below is skipped for discarded mutations since they
            // `continue` out of the match above
            if current_operation_id > last_operation_id.unwrap_or(std::u64::MIN) {
                last_operation_id = Some(current_operation_id);
            }

            if let Some(block_offset) = current_block_offset {
                // no need to check if current block is after since mutations are ordered by
                // block offset
                last_block_offset = Some(block_offset);
            } else {
                // no block offset: the operation is not committed to a block yet
                in_pending = true;
            }
        }

        Ok(EntityAggregator {
            entity_id,
            traits,
            active_operations: active_operation_ids,
            mutation_count,
            hash: digest.finalize(),
            creation_date: entity_creation_date,
            modification_date: entity_modification_date,
            deletion_date: entity_deletion_date,
            last_operation_id: last_operation_id.unwrap_or_default(),
            last_block_offset,
            in_pending,
            pending_deletion,
        })
    }

    /// Tags every trait with the first projection of a query that matches the
    /// type of its last put mutation.
    ///
    /// Projections allow returning only a subset of the traits, or only a part
    /// of their data. This method only records which projection applies; the
    /// fields themselves are filtered by `project_trait_fields` once the trait
    /// is retrieved. Traits without any put mutation, or that no projection
    /// matches, are left untouched. Nothing happens if `projections` is empty.
    pub fn annotate_projections(&mut self, projections: &[Projection]) {
        if projections.is_empty() {
            return;
        }

        // projections are shared between all the traits they match
        let shared_projections: Vec<Rc<Projection>> =
            projections.iter().cloned().map(Rc::new).collect();

        for trait_agg in self.traits.values_mut() {
            let matching = match trait_agg.last_put_mutation() {
                Some((_mutation, put)) => {
                    let trait_type = put.trait_type.as_deref();
                    shared_projections
                        .iter()
                        .find(|candidate| projection_matches_trait(trait_type, candidate))
                        .map(Rc::clone)
                }
                None => None,
            };

            if let Some(projection) = matching {
                trait_agg.projection = Some(projection);
            }
        }
    }

    /// Whether the entity should be analysed for garbage collection because it
    /// has inactive / overridden operations.
    ///
    /// Returns `true` when the number of mutations that are not active anymore
    /// is strictly greater than `GC_INACTIVE_OPS_THRESHOLD`.
    pub fn should_collect(&self) -> bool {
        // both counters are public fields: saturate instead of underflowing if
        // they ever get out of sync
        let inactive_operations = self
            .mutation_count
            .saturating_sub(self.active_operations.len());

        inactive_operations > GC_INACTIVE_OPS_THRESHOLD
    }
}

/// Aggregates mutations metadata of an entity's trait retrieved from the
/// mutations index. Once merged, only the latest / active mutations are
/// remaining, and can then be fetched from the chain.
#[derive(Default)]
pub struct TraitAggregator {
    /// Put mutations that got applied on the trait, in the order they were
    /// applied.
    pub put_mutations: Vec<MutationMetadata>,

    /// Id of the last operation (put or deletion) applied on the trait.
    pub last_operation_id: Option<OperationId>,

    /// Oldest creation date seen across put mutations, where each put
    /// contributes its explicit creation date or else its operation time.
    /// Reset when the trait gets deleted.
    pub creation_date: Option<DateTime<Utc>>,

    /// Newest modification date seen across put mutations, where each put
    /// contributes its explicit modification date or else, if the trait
    /// already had a creation date, its operation time. Reset when the trait
    /// gets deleted.
    pub modification_date: Option<DateTime<Utc>>,

    /// Time of the operation that deleted the trait. Cleared by a subsequent
    /// put.
    pub deletion_date: Option<DateTime<Utc>>,

    /// Projection of the query that applies to this trait, if any (see
    /// `EntityAggregator::annotate_projections`).
    pub projection: Option<Rc<Projection>>,

    /// Number of put and delete mutations applied on the trait.
    pub mutation_count: usize,
}

impl TraitAggregator {
    /// Returns the aggregator of the given trait, inserting an empty one first
    /// if the trait is not in the map yet.
    fn get_for_trait<'t>(
        traits: &'t mut HashMap<TraitId, TraitAggregator>,
        trait_id: &str,
    ) -> &'t mut TraitAggregator {
        // the key only gets allocated when the trait is missing
        let is_new_trait = traits.get(trait_id).is_none();
        if is_new_trait {
            traits.insert(trait_id.to_owned(), TraitAggregator::default());
        }

        traits.get_mut(trait_id).unwrap()
    }

    /// Whether every trait of the map has a deletion date (vacuously true for
    /// an empty map).
    fn all_deleted(traits: &HashMap<TraitId, TraitAggregator>) -> bool {
        let any_alive = traits
            .values()
            .any(|trait_agg| trait_agg.deletion_date.is_none());

        !any_alive
    }

    /// Applies a put mutation on the trait: refreshes its dates, clears any
    /// deletion date and records the mutation as the last operation.
    ///
    /// Mutations of any other type are ignored.
    fn push_put_mutation(&mut self, mutation: MutationMetadata) {
        let operation_id = mutation.operation_id;
        let operation_time = ConsistentTimestamp::from(operation_id).to_datetime();

        // copy the dates out so that the mutation can be moved into the list below
        let (explicit_creation, explicit_modification) = match &mutation.mutation_type {
            MutationType::TraitPut(put) => (put.creation_date, put.modification_date),
            _ => return,
        };

        // without explicit date, a put only counts as a modification if the trait
        // already existed (had a creation date) before this mutation
        let modification_date =
            explicit_modification.or_else(|| self.creation_date.map(|_| operation_time));
        update_if_newer(&mut self.modification_date, modification_date);

        let creation_date = explicit_creation.unwrap_or(operation_time);
        update_if_older(&mut self.creation_date, Some(creation_date));

        self.deletion_date = None;
        self.last_operation_id = Some(operation_id);
        self.mutation_count += 1;
        self.put_mutations.push(mutation);
    }

    /// Marks the trait as deleted by the given operation: creation and
    /// modification dates are reset, and the deletion date becomes the time of
    /// the operation.
    fn push_delete_mutation(&mut self, operation_id: OperationId) {
        let deleted_at = ConsistentTimestamp::from(operation_id).to_datetime();

        self.last_operation_id = Some(operation_id);
        self.mutation_count += 1;

        self.deletion_date = Some(deleted_at);
        self.modification_date = None;
        self.creation_date = None;
    }

    /// Returns the most recently pushed put mutation along with its put
    /// metadata, or `None` if the trait has no put mutation.
    pub fn last_put_mutation(&self) -> Option<(&MutationMetadata, &PutTraitMetadata)> {
        self.put_mutations
            .last()
            .and_then(|mutation| match &mutation.mutation_type {
                MutationType::TraitPut(put) => Some((mutation, put)),
                _ => None,
            })
    }
}

/// Creates the CRC-64 (ECMA-182) hasher that is used to hash the operation
/// ids of an entity.
pub fn result_hasher() -> crc::Crc<u64> {
    let algorithm: &'static crc::Algorithm<u64> = &crc::CRC_64_ECMA_182;
    crc::Crc::<u64>::new(algorithm)
}

/// Checks if a projection specified in a query matches the given trait type.
///
/// A projection without any package matches every trait, even one without a
/// type. Otherwise, the trait needs a type matching at least one package:
/// exactly if the package ends with `$` (ex: `some.Type$`), or by prefix.
fn projection_matches_trait(trait_type: Option<&str>, projection: &Projection) -> bool {
    let packages = &projection.package;
    if packages.is_empty() {
        return true;
    }

    match trait_type {
        None => false,
        Some(trait_type) => packages.iter().any(|package| {
            // `strip_suffix` is only `Some` when the package ends with `$`
            let exact_match = package.strip_suffix('$') == Some(trait_type);
            exact_match || trait_type.starts_with(package.as_str())
        }),
    }
}

/// Applies a projection on the message of a trait so that only the requested
/// fields are kept.
///
/// A field is kept if its id is listed in the projection, or if it belongs to
/// one of the projection's field groups. If at least one field gets cleared,
/// the message is re-encoded and the trait is flagged as
/// `TraitDetails::Partial`. The trait is left untouched if it has no message
/// or if the projection lists neither fields nor groups.
///
/// Returns an error if the message cannot be decoded using the registry, or
/// cannot be encoded back.
pub fn project_trait_fields(
    registry: &Registry,
    trt: &mut Trait,
    projection: &Projection,
) -> Result<(), Error> {
    let any_msg = match trt.message.as_ref() {
        Some(msg) => msg,
        None => return Ok(()),
    };

    // early exit if nothing to project since unmarshal+marshal is costly
    let projects_fields = !projection.field_ids.is_empty();
    let projects_groups = !projection.field_group_ids.is_empty();
    if !projects_fields && !projects_groups {
        return Ok(());
    }

    let mut dyn_msg = exocore_protos::reflect::from_prost_any(registry, any_msg)?;

    let kept_fields: HashSet<FieldId> = projection.field_ids.iter().copied().collect();
    let kept_groups: HashSet<FieldId> = projection.field_group_ids.iter().copied().collect();

    // collect ids first since fields cannot be cleared while iterating over them
    let cleared_fields: Vec<FieldId> = dyn_msg
        .fields()
        .iter()
        .filter_map(|(field_id, field)| {
            let kept = kept_fields.contains(field_id)
                || field.groups.iter().any(|gid| kept_groups.contains(gid));

            if kept {
                None
            } else {
                Some(*field_id)
            }
        })
        .collect();

    if cleared_fields.is_empty() {
        return Ok(());
    }

    for field_id in cleared_fields {
        // best effort: a field that cannot be cleared is left as is
        let _ = dyn_msg.clear_field_value(field_id);
    }
    trt.message = Some(dyn_msg.encode_to_prost_any()?);
    trt.details = TraitDetails::Partial.into();

    Ok(())
}

/// Orders mutations by the time they got committed: by block offset first,
/// then by operation id. Mutations without a block offset (still pending) come
/// after all the committed ones.
pub fn sort_mutations_commit_time<M: Iterator<Item = MutationMetadata>>(
    mutations: M,
) -> impl Iterator<Item = MutationMetadata> {
    mutations.sorted_by_key(|mutation| {
        let commit_offset = match mutation.block_offset {
            Some(offset) => offset,
            // pending mutations sort last
            None => u64::MAX,
        };

        (commit_offset, mutation.operation_id)
    })
}

/// Replaces `current` with `new` if `current` is not set yet, or if `new` is
/// a strictly more recent date. A `None` candidate never replaces a date that
/// is already set.
fn update_if_newer(current: &mut Option<DateTime<Utc>>, new: Option<DateTime<Utc>>) {
    let should_replace = match (current.as_ref(), new.as_ref()) {
        (None, _) => true,
        (Some(existing), Some(candidate)) => candidate > existing,
        (Some(_), None) => false,
    };

    if should_replace {
        *current = new;
    }
}

/// Replaces `current` with `new` if `current` is not set yet, or if `new` is
/// a strictly older date.
///
/// A `None` candidate never replaces a date that is already set. A plain
/// `new < *current` comparison would do so, since `None` orders before any
/// `Some(_)`, and would silently wipe the date.
fn update_if_older(current: &mut Option<DateTime<Utc>>, new: Option<DateTime<Utc>>) {
    let should_replace = match (current.as_ref(), new.as_ref()) {
        (None, _) => true,
        (Some(existing), Some(candidate)) => candidate < existing,
        // keep the known date rather than erasing it
        (Some(_), None) => false,
    };

    if should_replace {
        *current = new;
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use exocore_chain::block::BlockOffset;
    use exocore_protos::{
        prost::{Message, ProstAnyPackMessageExt},
        reflect::FieldGroupId,
        store::OrderingValue,
        test::TestMessage,
    };

    use super::*;
    use crate::ordering::OrderingValueWrapper;

    const TYPE1: &str = "exocore.test.TestMessage";
    const TYPE2: &str = "exocore.test.TestMessage2";

    #[test]
    fn mutations_ordering() {
        let t1 = "t1".to_string();
        let t2 = "t2".to_string();
        let t3 = "t3".to_string();

        let mutations = vec![
            mock_put_trait(&t1, TYPE1, Some(2), 3, None, None),
            mock_put_trait(&t3, TYPE1, Some(1), 0, None, None),
            mock_put_trait(&t1, TYPE1, Some(1), 2, None, None),
            mock_put_trait(&t2, TYPE1, Some(3), 5, None, None),
            mock_put_trait(&t3, TYPE1, None, 6, None, None),
            mock_put_trait(&t2, TYPE1, Some(1), 1, None, None),
            mock_put_trait(&t2, TYPE1, Some(2), 4, None, None),
        ];

        let em = EntityAggregator::new(mutations.into_iter()).unwrap();
        assert_eq!(em.traits.get(&t1).unwrap().last_operation_id, Some(3));
        assert_eq!(em.traits.get(&t2).unwrap().last_operation_id, Some(5));
        assert_eq!(em.traits.get(&t3).unwrap().last_operation_id, Some(6));
        assert_eq!(em.active_operations.len(), 3);
        assert!(em.active_operations.contains(&3));
        assert!(em.active_operations.contains(&5));
        assert!(em.active_operations.contains(&6));
    }

    #[test]
    fn put_trait_conflict() {
        let t1 = "t1".to_string();

        // operation 2 got committed before operation 1
        let mutations = vec![
            mock_put_trait(&t1, TYPE1, Some(1), 2, None, None),
            mock_put_trait(&t1, TYPE1, Some(2), 1, None, None),
        ];

        // operation 1 should be discarded, and only operation 2 active
        let em = EntityAggregator::new(mutations.into_iter()).unwrap();
        assert_eq!(em.traits.get(&t1).unwrap().last_operation_id, Some(2));
        assert!(em.active_operations.contains(&2));
    }

    #[test]
    fn delete_trait() {
        let t1 = "t1".to_string();

        let mutations = vec![
            mock_put_trait(&t1, TYPE1, Some(1), 1, None, None),
            mock_delete_trait(&t1, Some(2), 2),
        ];

        let em = EntityAggregator::new(mutations.into_iter()).unwrap();
        assert!(em.deletion_date.is_some());
        assert!(em.traits.get(&t1).unwrap().deletion_date.is_some());
        assert!(em.active_operations.contains(&2));
    }

    #[test]
    fn delete_trait_conflict() {
        let t1 = "t1".to_string();

        // delete operation 1 got committed after operation 2
        let mutations = vec![
            mock_put_trait(&t1, TYPE1, Some(1), 2, None, None),
            mock_delete_trait(&t1, Some(2), 1),
        ];

        // delete operation should be discarded since an operation happened on the trait
        // before it got committed
        let em = EntityAggregator::new(mutations.into_iter()).unwrap();
        assert_eq!(em.traits.get(&t1).unwrap().last_operation_id, Some(2));
        assert!(em.active_operations.contains(&2))
    }

    #[test]
    fn delete_entity() {
        let t1 = "t1".to_string();

        let mutations = vec![
            mock_put_trait(&t1, TYPE1, Some(1), 1, None, None),
            mock_delete_entity(Some(2), 2),
        ];

        let em = EntityAggregator::new(mutations.into_iter()).unwrap();
        assert!(em.deletion_date.is_some());
        assert!(em.traits.get(&t1).unwrap().deletion_date.is_some());
        assert!(em.active_operations.contains(&2));
    }

    #[test]
    fn mutations_delete_entity_conflict() {
        let t1 = "t1".to_string();

        // delete entity operation got committed after an newer operation
        let mutations = vec![
            mock_delete_entity(Some(1), 2),
            mock_put_trait(&t1, TYPE1, Some(2), 1, None, None),
        ];

        let em = EntityAggregator::new(mutations.into_iter()).unwrap();
        assert_eq!(em.traits.get(&t1).unwrap().last_operation_id, Some(1));
        assert!(em.active_operations.contains(&1));
    }

    #[test]
    fn trait_dates() {
        let t1 = "t1".to_string();

        let merge_mutations = |mutations: Vec<MutationMetadata>| -> TraitAggregator {
            let mut em = EntityAggregator::new(mutations.into_iter()).unwrap();
            em.traits.remove(&t1).unwrap()
        };

        fn assert_dates(
            agg: &TraitAggregator,
            creation: Option<OperationId>,
            modification: Option<OperationId>,
            deletion: Option<OperationId>,
        ) {
            assert_eq!(
                agg.creation_date,
                creation.map(|c| ConsistentTimestamp::from(c).to_datetime()),
            );
            assert_eq!(
                agg.modification_date,
                modification.map(|m| ConsistentTimestamp::from(m).to_datetime()),
            );
            assert_eq!(
                agg.deletion_date,
                deletion.map(|m| ConsistentTimestamp::from(m).to_datetime()),
            );
        }

        {
            // if no dates specified, creation date is based on first operation
            let agg = merge_mutations(vec![mock_put_trait(&t1, TYPE1, Some(1), 1, None, None)]);
            assert_dates(&agg, Some(1), None, None);
        }

        {
            // if no dates specified, modification date is based on last operation
            let agg = merge_mutations(vec![
                mock_put_trait(&t1, TYPE1, Some(1), 1, None, None),
                mock_put_trait(&t1, TYPE1, Some(2), 2, None, None),
            ]);
            assert_dates(&agg, Some(1), Some(2), None);
        }

        {
            // oldest specified creation date has priority
            let agg = merge_mutations(vec![
                mock_put_trait(&t1, TYPE1, Some(1), 5, None, None),
                mock_put_trait(&t1, TYPE1, Some(2), 6, Some(1), None),
            ]);
            assert_dates(&agg, Some(1), Some(6), None);
        }

        {
            // last operation always override older specified modification date
            let agg = merge_mutations(vec![
                mock_put_trait(&t1, TYPE1, Some(1), 5, None, Some(2)),
                mock_put_trait(&t1, TYPE1, Some(2), 6, None, None),
                mock_put_trait(&t1, TYPE1, Some(2), 7, None, None),
            ]);
            assert_dates(&agg, Some(5), Some(7), None);
        }

        {
            // deleting trait should reset dates & mark as deleted
            let agg = merge_mutations(vec![
                mock_put_trait(&t1, TYPE1, Some(1), 5, None, Some(2)),
                mock_delete_trait(&t1, Some(2), 6),
            ]);
            assert_dates(&agg, None, None, Some(6));
        }

        {
            // deleting entity should reset dates & mark as deleted
            let agg = merge_mutations(vec![
                mock_put_trait(&t1, TYPE1, Some(1), 5, None, Some(2)),
                mock_delete_entity(Some(2), 6),
            ]);
            assert_dates(&agg, None, None, Some(6));
        }
    }

    #[test]
    fn entity_dates() {
        let t1 = "t1".to_string();
        let t2 = "t2".to_string();

        fn assert_dates(
            agg: &EntityAggregator,
            creation: Option<OperationId>,
            modification: Option<OperationId>,
            deletion: Option<OperationId>,
        ) {
            assert_eq!(
                agg.creation_date,
                creation.map(|c| ConsistentTimestamp::from(c).to_datetime()),
            );
            assert_eq!(
                agg.modification_date,
                modification.map(|m| ConsistentTimestamp::from(m).to_datetime()),
            );
            assert_eq!(
                agg.deletion_date,
                deletion.map(|m| ConsistentTimestamp::from(m).to_datetime()),
            );
        }

        {
            // creation date based on operation
            let mutations = vec![mock_put_trait(&t1, TYPE1, Some(2), 1, None, None)];
            let em = EntityAggregator::new(mutations.into_iter()).unwrap();
            assert_dates(&em, Some(1), None, None);
        }

        {
            // explicit dates
            let mutations = vec![mock_put_trait(&t1, TYPE1, Some(2), 10, Some(1), Some(2))];
            let em = EntityAggregator::new(mutations.into_iter()).unwrap();
            assert_dates(&em, Some(1), Some(2), None);
        }

        {
            // multiple mutations operations based dates
            let mutations = vec![
                mock_put_trait(&t1, TYPE1, Some(2), 10, None, None),
                mock_put_trait(&t1, TYPE1, Some(3), 11, None, None),
            ];
            let em = EntityAggregator::new(mutations.into_iter()).unwrap();
            assert_dates(&em, Some(10), Some(11), None);
        }

        {
            // trait deletion counts as modification
            let mutations = vec![
                mock_put_trait(&t1, TYPE1, Some(2), 10, None, None),
                mock_put_trait(&t2, TYPE1, Some(2), 10, None, None),
                mock_delete_trait(&t1, Some(3), 11),
            ];
            let em = EntityAggregator::new(mutations.into_iter()).unwrap();
            assert_dates(&em, Some(10), Some(11), None);
        }

        {
            // deleting all traits should mark entity as deleted
            let mutations = vec![
                mock_put_trait(&t1, TYPE1, Some(2), 10, None, None),
                mock_delete_trait(&t1, Some(3), 11),
            ];
            let em = EntityAggregator::new(mutations.into_iter()).unwrap();
            assert_dates(&em, None, None, Some(11));
        }
        {
            // deleting entity should mark entity as deleted
            let mutations = vec![
                mock_put_trait(&t1, TYPE1, Some(2), 10, None, None),
                mock_delete_entity(Some(2), 11),
            ];
            let em = EntityAggregator::new(mutations.into_iter()).unwrap();
            assert_dates(&em, None, None, Some(11));
        }

        {
            // entity deletion resets dates
            let mutations = vec![
                mock_put_trait(&t1, TYPE1, Some(2), 1, Some(10), Some(11)),
                mock_delete_entity(Some(2), 2),
                mock_put_trait(&t1, TYPE1, Some(3), 3, Some(20), Some(21)),
            ];
            let em = EntityAggregator::new(mutations.into_iter()).unwrap();
            assert_dates(&em, Some(20), Some(21), None);
        }
    }

    #[test]
    fn test_projection_matches_trait() {
        {
            // prefix match
            let proj1 = Projection {
                package: vec!["some.message".to_string()],
                ..Default::default()
            };

            assert!(projection_matches_trait(Some("some.message.Type"), &proj1));
            assert!(!projection_matches_trait(None, &proj1));
            assert!(!projection_matches_trait(
                Some("other.message.Type2"),
                &proj1
            ));
        }

        {
            // match all
            let proj2 = Projection {
                package: vec![],
                ..Default::default()
            };
            assert!(projection_matches_trait(Some("some.message.Type"), &proj2));
            assert!(projection_matches_trait(None, &proj2));
            assert!(projection_matches_trait(
                Some("other.message.Type2"),
                &proj2
            ));
        }

        {
            // exact match
            let proj1 = Projection {
                package: vec!["some.message.Type$".to_string()],
                ..Default::default()
            };

            assert!(projection_matches_trait(Some("some.message.Type"), &proj1));
            assert!(!projection_matches_trait(
                Some("some.message.Type2"),
                &proj1
            ));
        }
    }

    /// Checks that `annotate_projections` attaches to each trait the projection
    /// whose package pattern matches the trait's type.
    #[test]
    fn traits_projection_annotation() {
        let first_trait = "t1";
        let second_trait = "t2";

        /// Asserts that the projection annotated on `trait_id` lists `id` as its
        /// first field id. Panics if the trait or its projection is missing.
        fn assert_projection_id(em: &EntityAggregator, trait_id: &str, id: u32) {
            let annotated = em
                .traits
                .get(trait_id)
                .unwrap()
                .projection
                .as_ref()
                .unwrap();
            assert_eq!(annotated.field_ids[0], id);
        }

        // Both scenarios start from the same two put mutations, one per trait.
        let new_aggregator = || {
            let puts = vec![
                mock_put_trait(first_trait, TYPE1, Some(1), 1, None, None),
                mock_put_trait(second_trait, TYPE2, Some(1), 2, None, None),
            ];
            EntityAggregator::new(puts.into_iter()).unwrap()
        };

        {
            // prefix match: a single projection is expected to be annotated on both traits
            let projections = [Projection {
                package: vec!["exocore.test".to_string()],
                field_ids: vec![1],
                ..Default::default()
            }];

            let mut aggregator = new_aggregator();
            aggregator.annotate_projections(&projections);

            assert_projection_id(&aggregator, first_trait, 1);
            assert_projection_id(&aggregator, second_trait, 1);
        }

        {
            // exact match & catch-all: the `$`-terminated pattern targets TYPE1 only,
            // while the projection without any package picks up the remaining trait
            let exact_type1 = Projection {
                package: vec![format!("{}$", TYPE1)],
                field_ids: vec![1],
                ..Default::default()
            };
            let catch_all = Projection {
                field_ids: vec![2],
                ..Default::default()
            };
            let projections = [exact_type1, catch_all];

            let mut aggregator = new_aggregator();
            aggregator.annotate_projections(&projections);

            assert_projection_id(&aggregator, first_trait, 1);
            assert_projection_id(&aggregator, second_trait, 2);
        }
    }

    /// Checks that `project_trait_fields` keeps only the fields selected by id or
    /// by group id, and reports whether the resulting trait is full or partial.
    #[test]
    fn traits_projection() {
        let registry = Registry::new_with_exocore_types();

        let full_msg = TestMessage {
            string1: "string1".to_owned(),
            string2: "string2".to_owned(),
            grouped1: "grouped1".to_owned(),
            grouped2: "grouped2".to_owned(),
            ..Default::default()
        };

        // Projects a fresh trait wrapping `full_msg` with the given field and group
        // ids, then returns the decoded message along with the trait's details.
        let project = |field_ids: Vec<FieldId>, field_group_ids: Vec<FieldGroupId>| {
            let projection = Projection {
                field_ids,
                field_group_ids,
                ..Default::default()
            };
            let mut projected = Trait {
                message: Some(full_msg.pack_to_any().unwrap()),
                ..Default::default()
            };

            project_trait_fields(&registry, &mut projected, &projection).unwrap();

            let packed = projected.message.as_ref().unwrap();
            let decoded = TestMessage::decode(packed.value.as_slice()).unwrap();
            let details = TraitDetails::from_i32(projected.details).unwrap();

            (decoded, details)
        };

        // No field and no group selected: the message comes back untouched.
        assert_eq!(
            project(vec![], vec![]),
            (full_msg.clone(), TraitDetails::Full)
        );

        // Single field id.
        let only_string1 = TestMessage {
            string1: "string1".to_owned(),
            ..Default::default()
        };
        assert_eq!(
            project(vec![1], vec![]),
            (only_string1, TraitDetails::Partial)
        );

        // Two field ids.
        let both_strings = TestMessage {
            string1: "string1".to_owned(),
            string2: "string2".to_owned(),
            ..Default::default()
        };
        assert_eq!(
            project(vec![1, 2], vec![]),
            (both_strings, TraitDetails::Partial)
        );

        // One field id combined with group 1.
        let string2_and_group1 = TestMessage {
            string2: "string2".to_owned(),
            grouped1: "grouped1".to_owned(),
            grouped2: "grouped2".to_owned(),
            ..Default::default()
        };
        assert_eq!(
            project(vec![2], vec![1]),
            (string2_and_group1, TraitDetails::Partial)
        );

        // Group 2 alone.
        let only_group2 = TestMessage {
            grouped2: "grouped2".to_owned(),
            ..Default::default()
        };
        assert_eq!(
            project(vec![], vec![2]),
            (only_group2, TraitDetails::Partial)
        );
    }

    /// Builds a `MutationMetadata` describing a trait put, for use in tests.
    ///
    /// The creation and modification dates are derived from the optional
    /// operation ids through `ConsistentTimestamp`, and stay `None` when the
    /// corresponding id is absent. The entity id is left empty and the sort value
    /// is a default ordering value flagged as ignored and reversed.
    pub fn mock_put_trait<I, T>(
        trait_id: I,
        trait_type: T,
        block_offset: Option<BlockOffset>,
        operation_id: OperationId,
        created_time_op_id: Option<OperationId>,
        modification_time_op_id: Option<OperationId>,
    ) -> MutationMetadata
    where
        I: Into<String>,
        T: Into<String>,
    {
        // Shared conversion from an operation id to its date.
        let to_datetime = |op_id: OperationId| ConsistentTimestamp::from(op_id).to_datetime();

        let put_metadata = PutTraitMetadata {
            trait_id: trait_id.into(),
            trait_type: Some(trait_type.into()),
            creation_date: created_time_op_id.map(to_datetime),
            modification_date: modification_time_op_id.map(to_datetime),
        };

        let sort_value = OrderingValueWrapper {
            value: OrderingValue::default(),
            ignore: true,
            reverse: true,
        };

        MutationMetadata {
            operation_id,
            block_offset,
            entity_id: String::new(),
            mutation_type: MutationType::TraitPut(put_metadata),
            sort_value,
        }
    }

    /// Builds a `MutationMetadata` carrying a tombstone for the given trait, for
    /// use in tests. The entity id is left empty and the sort value is a default
    /// ordering value flagged as ignored and reversed.
    pub fn mock_delete_trait<T: Into<String>>(
        trait_id: T,
        block_offset: Option<BlockOffset>,
        operation_id: OperationId,
    ) -> MutationMetadata {
        let mutation_type = MutationType::TraitTombstone(trait_id.into());
        let sort_value = OrderingValueWrapper {
            value: OrderingValue::default(),
            ignore: true,
            reverse: true,
        };

        MutationMetadata {
            operation_id,
            block_offset,
            entity_id: String::new(),
            mutation_type,
            sort_value,
        }
    }

    /// Builds a `MutationMetadata` carrying an entity tombstone, for use in
    /// tests. The entity id is left empty and the sort value is a default
    /// ordering value flagged as ignored and reversed.
    pub fn mock_delete_entity(
        block_offset: Option<BlockOffset>,
        operation_id: OperationId,
    ) -> MutationMetadata {
        let sort_value = OrderingValueWrapper {
            value: OrderingValue::default(),
            ignore: true,
            reverse: true,
        };

        MutationMetadata {
            operation_id,
            block_offset,
            entity_id: String::new(),
            mutation_type: MutationType::EntityTombstone,
            sort_value,
        }
    }

    /// Builds a `MutationMetadata` flagged as a pending deletion, for use in
    /// tests. The entity id is left empty and the sort value is a default
    /// ordering value flagged as ignored and reversed.
    pub fn mock_pending_delete(
        block_offset: Option<BlockOffset>,
        operation_id: OperationId,
    ) -> MutationMetadata {
        let sort_value = OrderingValueWrapper {
            value: OrderingValue::default(),
            ignore: true,
            reverse: true,
        };

        MutationMetadata {
            operation_id,
            block_offset,
            entity_id: String::new(),
            mutation_type: MutationType::PendingDeletion,
            sort_value,
        }
    }
}