exocore-store 0.1.26

Store / indexation layer of Exocore (Distributed applications framework)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
use std::{
    borrow::Borrow,
    path::{Path, PathBuf},
    rc::Rc,
    sync::Arc,
};

use exocore_chain::{
    block::{BlockHeight, BlockOffset},
    chain,
    engine::Event,
    operation::{Operation, OperationId},
    pending, EngineHandle, EngineOperationStatus,
};
use exocore_core::{
    cell::FullCell,
    time::{Clock, Instant},
};
use exocore_protos::{
    generated::exocore_store::{
        entity_mutation::Mutation, EntityMutation, EntityQuery, EntityResults, Trait,
    },
    prost::{Message, ProstDateTimeExt},
    registry::Registry,
    store::Projection,
};
use gc::GarbageCollector;
use itertools::Itertools;

use super::mutation_index::{IndexOperation, MutationIndex, MutationMetadata};
use crate::error::Error;

mod config;
pub use config::*;

mod aggregator;
pub use aggregator::*;

pub mod gc;

mod searcher;

pub mod iterator;

#[cfg(test)]
pub(crate) mod test_index;
#[cfg(test)]
mod tests;

/// Manages and indexes entities and their traits stored in the chain and
/// pending store of the chain layer. The index accepts mutations from the chain
/// layer through its event stream, and manages both indices to be consistent.
///
/// The chain index is persisted on disk, while the pending store index is an
/// in-memory index. Since the persistence in the chain is not definitive until
/// blocks and their operations (entity mutations) are stored at a certain
/// depth, a part of the chain is actually indexed in the in-memory index.
/// Once they reach a certain depth, they are persisted in the chain index.
pub struct EntityIndex<CS, PS>
where
    CS: chain::ChainStore,
    PS: pending::PendingStore,
{
    // Configuration of both indices, the indexation depths and the garbage
    // collector.
    config: EntityIndexConfig,
    // In-memory index of mutations that aren't in the chain index yet. It gets
    // replaced by a fresh index on every pending re-indexation.
    pending_index: MutationIndex,
    // Directory (`chain` under the cell's store directory) used by the chain
    // index.
    chain_index_dir: PathBuf,
    // Index of chain mutations; memory-mapped from `chain_index_dir` unless
    // `chain_index_in_memory` is set in the configuration.
    chain_index: MutationIndex,
    // Cached offset of the last block indexed in the chain index. When `None`,
    // the offset is queried from the chain index itself.
    chain_index_last_block: Option<BlockOffset>,
    // Cell to which the index belongs; source of schemas and store directory.
    full_cell: FullCell,
    // Handle to the chain engine, used to read chain & pending operations.
    chain_handle: EngineHandle<CS, PS>,
    // Garbage collector to which entities get flagged during searches.
    gc: GarbageCollector,
}

impl<CS, PS> EntityIndex<CS, PS>
where
    CS: chain::ChainStore,
    PS: pending::PendingStore,
{
    /// Opens the entities index of the given cell, creating it if it doesn't
    /// exist yet.
    ///
    /// The pending index is always created in memory, while the chain index
    /// lives in the `chain` sub-directory of the cell's store directory (the
    /// directory is created if missing). If the chain index is empty but the
    /// chain already contains blocks, the chain gets fully re-indexed. The
    /// pending store is always re-indexed before returning.
    ///
    /// # Panics
    ///
    /// Panics if the cell's store directory isn't an OS directory.
    pub fn open_or_create(
        cell: FullCell,
        config: EntityIndexConfig,
        chain_handle: EngineHandle<CS, PS>,
        clock: Clock,
    ) -> Result<EntityIndex<CS, PS>, Error> {
        let mut pending = MutationIndex::create_in_memory(
            config.pending_index_config,
            cell.cell().schemas().clone(),
        )?;
        pending.set_full_text_boost(config.pending_index_boost);

        // the chain index directory may not exist on first opening
        let index_dir = cell
            .cell()
            .store_directory()
            .as_os_path()
            .expect("Expected cell to be in an OS directory")
            .join("chain");
        if !index_dir.exists() {
            std::fs::create_dir_all(&index_dir)?;
        }

        let chain = Self::create_chain_index(config, cell.cell().schemas(), &index_dir)?;
        let mut index = EntityIndex {
            config,
            pending_index: pending,
            chain_index_dir: index_dir,
            chain_index: chain,
            chain_index_last_block: None,
            full_cell: cell,
            chain_handle,
            gc: GarbageCollector::new(config.garbage_collector, clock),
        };

        // an empty chain index on a non-empty chain means we need to catch up
        let chain_has_blocks = index.chain_handle.get_chain_last_block_info()?.is_some();
        let chain_index_is_empty = index.chain_index.highest_indexed_block()?.is_none();
        if chain_index_is_empty && chain_has_blocks {
            index.reindex_chain()?;
        }

        index.reindex_pending()?;

        Ok(index)
    }

    /// Handles a single event coming from the chain layer.
    ///
    /// Convenience wrapper around `handle_chain_engine_events` for callers
    /// that have only one event at hand. Returns the affected operations and
    /// the number of index operations applied.
    pub fn handle_chain_engine_event(
        &mut self,
        event: Event,
    ) -> Result<(Vec<OperationId>, usize), Error> {
        let single_event = Some(event).into_iter();
        self.handle_chain_engine_events(single_event)
    }

    /// Handles events coming from the chain layer. These events allow keeping
    /// the index consistent with the chain layer, up to the consistency
    /// guarantees that the layer offers. Returns operations that have been
    /// involved and the number of index operations applied.
    ///
    /// Since the events stream is buffered, we may receive a discontinuity if
    /// the chain layer couldn't send us an event. In that case, we re-index
    /// the pending index since we can't guarantee that we didn't lose an
    /// event.
    ///
    /// Consecutive `NewPendingOperation` events are batched and applied to the
    /// pending index at once, either when another kind of event is encountered
    /// or when the iterator is exhausted.
    ///
    /// # Errors
    ///
    /// Returns `Error::Fatal` if the chain diverged at or before the last block
    /// that got indexed in the chain index, and propagates any error from the
    /// indices or the chain layer.
    pub fn handle_chain_engine_events<E>(
        &mut self,
        events: E,
    ) -> Result<(Vec<OperationId>, usize), Error>
    where
        E: Iterator<Item = Event>,
    {
        let mut index_operations_count = 0;
        let mut affected_operations = Vec::new();

        let mut batched_operations = Vec::new();
        for event in events {
            // We collect new pending operations so that we can apply them in batch. As soon
            // as we hit another kind of events, we apply collected events at
            // once and then continue.
            if let Event::NewPendingOperation(op_id) = event {
                batched_operations.push(op_id);
                affected_operations.push(op_id);
                continue;
            } else if !batched_operations.is_empty() {
                // flush the batch before handling the event so that ordering is preserved
                let current_operations = std::mem::take(&mut batched_operations);
                index_operations_count +=
                    self.handle_chain_pending_operations(current_operations.into_iter())?;
            }

            match event {
                Event::Started => {
                    info!("Chain engine is ready, indexing pending store & chain");
                    // NOTE(review): the count returned here isn't added to
                    // `index_operations_count`, unlike for `NewChainBlock`.
                    self.index_chain_new_blocks(Some(&mut affected_operations))?;
                    self.reindex_pending()?;
                }
                Event::StreamDiscontinuity => {
                    warn!("Got a stream discontinuity. Forcing re-indexation of pending...");
                    self.reindex_pending()?;
                }
                Event::NewChainBlock(block_offset) => {
                    debug!(
                        "Got new block at offset {}, checking if we can index a new block",
                        block_offset
                    );
                    index_operations_count +=
                        self.index_chain_new_blocks(Some(&mut affected_operations))?;
                }
                Event::ChainDiverged(diverged_block_offset) => {
                    let highest_indexed_block = self.chain_index.highest_indexed_block()?;
                    warn!(
                        "Chain has diverged at offset={}. Highest indexed block at = {:?}",
                        diverged_block_offset, highest_indexed_block
                    );

                    if let Some(last_indexed_offset) = highest_indexed_block {
                        if last_indexed_offset < diverged_block_offset {
                            // since we only index blocks that have a certain depth, and therefore
                            // higher probability of being definitive,
                            // if we have a divergence, we can just re-index
                            // the pending store which should still contain operations that are in
                            // our invalid chain
                            warn!(
                                "Divergence is after last indexed offset, we only re-index pending"
                            );
                            self.reindex_pending()?;
                        } else {
                            // if we are here, we indexed a block from the chain that isn't valid
                            // anymore since we are deleting traits that
                            // got deleted from the actual index, there is no
                            // way to rollback to the diverged offset, and will require a re-index.
                            // this can be prevented by tweaking the
                            // `EntitiesIndexConfig`.`chain_index_min_depth` value
                            return Err(Error::Fatal(anyhow!(
                                "Chain has diverged at an offset={}, which is before last indexed block at offset {}",
                                diverged_block_offset, last_indexed_offset
                            )));
                        }
                    } else {
                        // nothing is indexed in the chain index yet, a full re-index is safe
                        warn!("Diverged with an empty chain index. Re-indexing...");
                        self.reindex_chain()?;
                    }
                }
                // handled by the batching branch at the top of the loop
                Event::NewPendingOperation(_op_id) => unreachable!(),
            }
        }

        // flush pending operations that were batched at the end of the stream
        if !batched_operations.is_empty() {
            index_operations_count +=
                self.handle_chain_pending_operations(batched_operations.into_iter())?;
        }

        Ok((affected_operations, index_operations_count))
    }

    /// Indexes new chain blocks into the chain index if any of them can be
    /// indexed, and returns the ids of the chain operations that were
    /// visited while doing so.
    pub fn maybe_index_chain_blocks(&mut self) -> Result<Vec<OperationId>, Error> {
        let mut touched_operations = Vec::new();
        self.index_chain_new_blocks(Some(&mut touched_operations))
            .map(|_applied_count| touched_operations)
    }

    pub fn search<Q: Borrow<EntityQuery>>(&self, query: Q) -> Result<EntityResults, Error> {
        let searcher = searcher::Searcher::new(
            &self.chain_index,
            &self.pending_index,
            &self.gc,
            |cache, entity_id, projections| {
                self.fetch_and_cache_entity_mutations_metadata(cache, entity_id, projections)
            },
            |entity_results, include_deleted| {
                self.populate_results_traits(entity_results, include_deleted)
            },
            query.borrow(),
        );

        searcher.search()
    }

    /// Runs a garbage collection pass on entities that got flagged and
    /// returns the deletion mutations generated by the collector.
    ///
    /// Nothing is collected (an empty list is returned) as long as no block
    /// has been indexed in the chain index.
    pub fn run_garbage_collector(&self) -> Result<Vec<EntityMutation>, Error> {
        let chain_is_indexed = self
            .last_chain_indexed_block()
            .map_err(|err| anyhow!("Couldn't get last chain indexed block: {}", err))?
            .is_some();

        // we can only run GC from chain index
        if !chain_is_indexed {
            return Ok(Vec::new());
        }

        Ok(self
            .gc
            .run(|entity_id| self.fetch_entity_mutations_metadata(entity_id)))
    }

    /// Creates the chain index according to the configuration: an in-memory
    /// index when `chain_index_in_memory` is set, otherwise a memory-mapped
    /// index opened from (or created in) `chain_index_dir`.
    fn create_chain_index<P: AsRef<Path>>(
        config: EntityIndexConfig,
        schemas: &Arc<Registry>,
        chain_index_dir: P,
    ) -> Result<MutationIndex, Error> {
        let schemas = schemas.clone();

        if config.chain_index_in_memory {
            return MutationIndex::create_in_memory(config.chain_index_config, schemas);
        }

        MutationIndex::open_or_create_mmap(
            config.chain_index_config,
            schemas,
            chain_index_dir.as_ref(),
        )
    }

    /// Re-indexes the pending store completely, along the last few blocks of
    /// the chain (see `EntitiesIndexConfig`.`chain_index_min_depth`) that
    /// are not considered definitive yet.
    ///
    /// The current pending index is discarded and replaced by a fresh
    /// in-memory index, which then receives:
    ///  * chain operations committed after the last block of the chain index
    ///  * operations of the pending store that are still pending
    fn reindex_pending(&mut self) -> Result<(), Error> {
        let mut fresh_index = MutationIndex::create_in_memory(
            self.config.pending_index_config,
            self.full_cell.cell().schemas().clone(),
        )?;
        fresh_index.set_full_text_boost(self.config.pending_index_boost);
        self.pending_index = fresh_index;

        let indexed_offset = match self.last_chain_indexed_block()? {
            Some((offset, _height)) => offset,
            None => 0,
        };

        info!(
            "Clearing & re-indexing pending index. last_chain_indexed_offset={}",
            indexed_offset
        );

        // operations of the pending store that haven't been committed to the chain
        // yet; committed ones are taken from the chain itself below
        let still_pending = self
            .chain_handle
            .get_pending_operations(..)?
            .into_iter()
            .filter(|op| matches!(op.status, EngineOperationStatus::Pending));

        // operations from chain that have not been indexed to the chain index yet
        let unindexed_chain = self
            .chain_handle
            .get_chain_operations(Some(indexed_offset))
            .filter(move |op| {
                matches!(
                    op.status,
                    EngineOperationStatus::Committed(offset, _height) if offset > indexed_offset
                )
            });

        let index_operations = unindexed_chain
            .chain(still_pending)
            .flat_map(IndexOperation::from_pending_engine_operation);
        self.pending_index.apply_operations(index_operations)?;

        Ok(())
    }

    /// Re-indexes the chain index completely.
    ///
    /// The chain index directory is wiped and re-created, a new chain index is
    /// created based on configuration, and all chain blocks that are deep
    /// enough are indexed in it. The pending index is re-indexed afterward
    /// since it depends on the last block of the chain index.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from the directory wipe, as well as errors from
    /// the indices and the chain layer.
    fn reindex_chain(&mut self) -> Result<(), Error> {
        info!("Clearing & reindexing chain index");

        // create temporary in-memory to wipe directory
        self.chain_index = MutationIndex::create_in_memory(
            self.config.pending_index_config,
            self.full_cell.cell().schemas().clone(),
        )?;

        // The cached offset described the index that we are discarding. If we kept
        // it, indexing below would resume from that stale offset instead of
        // rebuilding the wiped index from the beginning of the chain.
        self.chain_index_last_block = None;

        // remove and re-create data dir
        std::fs::remove_dir_all(&self.chain_index_dir)?;
        std::fs::create_dir_all(&self.chain_index_dir)?;

        // re-create index, and force re-index of chain
        self.chain_index = Self::create_chain_index(
            self.config,
            self.full_cell.cell().schemas(),
            &self.chain_index_dir,
        )?;
        self.index_chain_new_blocks(None)?;

        self.reindex_pending()?;

        Ok(())
    }

    /// Checks if we need to index any new block in the chain.
    /// Blocks don't get indexed as soon as they appear in the chain so that we
    /// don't need to revert them from the chain index since there wouldn't
    /// be an "easy" way to revert them from the chain index (Tantivy doesn't
    /// support deletion revert).
    ///
    /// The latest blocks that aren't considered definitive are kept in the
    /// pending store, and deletions are actually implemented using tombstones
    /// in the pending store. If a trait gets deleted from the chain, the
    /// tombstone in the in-memory index will be used to remove it from
    /// the results.
    ///
    /// If `affected_operations` is provided, the id of every chain operation
    /// visited from the last indexed block is pushed to it, whether or not the
    /// operation was deep enough to be indexed.
    ///
    /// Returns the number of operations that got deleted from the pending
    /// index because they are now in the chain index.
    ///
    /// # Errors
    ///
    /// Fails if the chain has no block, or if the chain layer or one of the
    /// indices fails.
    fn index_chain_new_blocks(
        &mut self,
        affected_operations: Option<&mut Vec<OperationId>>,
    ) -> Result<usize, Error> {
        let (_last_chain_block_offset, last_chain_block_height) = self
            .chain_handle
            .get_chain_last_block_info()?
            .ok_or_else(|| anyhow!("Tried to index chain, but it had no blocks in it"))?;

        let chain_index_min_depth = self.config.chain_index_min_depth;
        let chain_index_depth_leeway = self.config.chain_index_depth_leeway;
        let last_indexed_block = self.last_chain_indexed_block()?;
        let offset_from = last_indexed_block.map(|(offset, _height)| offset);
        if let Some((_last_indexed_offset, last_indexed_height)) = last_indexed_block {
            // The chain may be shorter than what we indexed (ex: after a divergence).
            // Saturate instead of underflowing, as done for operations depth below.
            let depth = last_chain_block_height.saturating_sub(last_indexed_height);
            if depth < chain_index_min_depth || depth < chain_index_depth_leeway {
                debug!(
                    "No need to index new blocks to chain index. last_chain_block_height={} last_indexed_block_height={} depth={} min_depth={} leeway={}",
                    last_chain_block_height, last_indexed_height, depth, chain_index_min_depth, chain_index_depth_leeway,
                );
                return Ok(0);
            }
        }

        let pending_index_empty = self.pending_index.highest_indexed_block()?.is_none();

        let mut pending_index_mutations = Vec::new();
        let mut new_highest_block_offset: Option<BlockOffset> = None;
        let mut affected_operations_ref = affected_operations;

        // The iterator below is lazy: its closures only run (and only fill
        // `pending_index_mutations` / `new_highest_block_offset`) while the chain
        // index consumes it in `apply_operations`.
        let operations = self.chain_handle.get_chain_operations(offset_from);
        let chain_index_mutations = operations
            .flat_map(|operation| {
                if let Some(affected_operations) = affected_operations_ref.as_mut() {
                    affected_operations.push(operation.operation_id);
                }

                if let EngineOperationStatus::Committed(offset, height) = operation.status {
                    Some((offset, height, operation))
                } else {
                    None
                }
            })
            .filter(|(offset, height, _engine_operation)| {
                // make sure that this operation belongs to the chain index by making sure its
                // depth is below the configured chain_index_min_depth.
                *offset > offset_from.unwrap_or(0)
                    && last_chain_block_height.saturating_sub(*height) >= chain_index_min_depth
            })
            .flat_map(|(offset, _height, engine_operation)| {
                let operation_id = engine_operation.operation_id;
                let (index_ops, entity_id) =
                    IndexOperation::from_chain_engine_operation(engine_operation, offset);

                if !pending_index_empty {
                    // delete from pending index if it's not already empty
                    pending_index_mutations.push(IndexOperation::DeleteEntityOperation(
                        entity_id,
                        operation_id,
                    ));
                }

                // take note of the latest block that we indexed in chain
                if Some(offset) > new_highest_block_offset {
                    new_highest_block_offset = Some(offset);
                }

                index_ops
            });

        let before_apply = Instant::now();
        self.chain_index.apply_operations(chain_index_mutations)?;

        // Keep the cached offset in sync with what got indexed in the chain index,
        // even if nothing had to be deleted from the pending index. Otherwise, a
        // previously cached offset would go stale and the same blocks would be
        // indexed again on next call.
        if let Some(new_highest_block_offset) = new_highest_block_offset {
            self.chain_index_last_block = Some(new_highest_block_offset);
        }

        let index_operations_count = pending_index_mutations.len();
        if index_operations_count > 0 {
            self.pending_index
                .apply_operations(pending_index_mutations.into_iter())?;

            info!(
                "Indexed in chain, and deleted from pending {} operations (from offset {:?}) in {:?}. New chain index last offset is {:?}.",
                index_operations_count,
                offset_from,
                before_apply.elapsed(),
                new_highest_block_offset
            );
        }

        Ok(index_operations_count)
    }

    /// Returns the offset and height of the last block that got indexed in
    /// the chain index, or `None` if no block has been indexed yet.
    ///
    /// The cached offset is used when available; otherwise the chain index is
    /// queried for its highest indexed block. The block's info is then fetched
    /// from the chain layer.
    fn last_chain_indexed_block(&self) -> Result<Option<(BlockOffset, BlockHeight)>, Error> {
        let last_offset = match self.chain_index_last_block {
            Some(cached_offset) => Some(cached_offset),
            None => self.chain_index.highest_indexed_block()?,
        };

        let Some(offset) = last_offset else {
            return Ok(None);
        };

        Ok(self.chain_handle.get_chain_block_info(offset)?)
    }

    /// Indexes into the pending index the pending store operations announced
    /// by the chain layer.
    ///
    /// Operations that cannot be found or fetched from the pending store are
    /// logged and skipped. All index operations are gathered first and then
    /// applied to the pending index at once.
    ///
    /// Returns number of operations applied on the mutation index.
    fn handle_chain_pending_operations<O>(&mut self, operations_id: O) -> Result<usize, Error>
    where
        O: Iterator<Item = OperationId>,
    {
        let mut mutations = Vec::new();

        for op_id in operations_id {
            let index_operations = match self.chain_handle.get_pending_operation(op_id) {
                Ok(Some(op)) => IndexOperation::from_pending_engine_operation(op),
                Ok(None) => {
                    error!(
                        "An event from chain layer contained a pending operation that wasn't found: operation_id={}",
                        op_id
                    );
                    smallvec![]
                }
                Err(err) => {
                    error!(
                        "An event from chain layer contained that couldn't be fetched from pending operation: {}",
                        err
                    );
                    smallvec![]
                }
            };

            mutations.extend(index_operations);
        }

        self.pending_index.apply_operations(mutations.into_iter())
    }

    /// Builds the full entity (test helper): aggregates its mutations from
    /// both indices, fetches its non-deleted traits from the chain layer and
    /// copies the aggregated dates and last operation id on the entity.
    #[cfg(test)]
    fn fetch_entity(
        &self,
        entity_id: &str,
    ) -> Result<exocore_protos::generated::exocore_store::Entity, Error> {
        use exocore_protos::generated::exocore_store::Entity;

        let aggregated = self.fetch_aggregated_entity_mutations(entity_id)?;
        let traits = self.fetch_entity_traits(&aggregated, false);

        let creation_date = aggregated.creation_date.map(|date| date.to_proto_timestamp());
        let modification_date = aggregated
            .modification_date
            .map(|date| date.to_proto_timestamp());
        let deletion_date = aggregated.deletion_date.map(|date| date.to_proto_timestamp());

        Ok(Entity {
            id: entity_id.to_owned(),
            traits,
            creation_date,
            modification_date,
            deletion_date,
            last_operation_id: aggregated.last_operation_id,
        })
    }

    /// Fetches the mutations metadata indexed for this entity id in both the
    /// pending and chain indices, and feeds them to an `EntityAggregator`.
    pub(super) fn fetch_aggregated_entity_mutations(
        &self,
        entity_id: &str,
    ) -> Result<EntityAggregator, Error> {
        self.fetch_entity_mutations_metadata(entity_id)
            .map(EntityAggregator::new)
    }

    /// Fetches the mutations metadata indexed for this entity id in both the
    /// pending and chain indices.
    ///
    /// Mutations are returned sorted by block offset (mutations without a
    /// block offset come last), then by operation id. Consecutive mutations
    /// with the same operation id are collapsed into one.
    pub(super) fn fetch_entity_mutations_metadata(
        &self,
        entity_id: &str,
    ) -> Result<impl Iterator<Item = MutationMetadata>, Error> {
        let pending_mutations = self.pending_index.fetch_entity_mutations(entity_id)?.mutations;
        let chain_mutations = self.chain_index.fetch_entity_mutations(entity_id)?.mutations;

        // order in which mutations got committed: block offset first (pending
        // mutations have none and are pushed at the end), then operation id
        let commit_order = |mutation: &MutationMetadata| {
            (
                mutation.block_offset.unwrap_or(u64::MAX),
                mutation.operation_id,
            )
        };

        let ordered = pending_mutations
            .into_iter()
            .chain(chain_mutations)
            .sorted_by_key(commit_order);

        // a mutation may temporarily be in both pending & chain indices
        Ok(ordered.dedup_by(|left, right| left.operation_id == right.operation_id))
    }

    /// Returns the aggregated mutations metadata of an entity, using the
    /// given cache when the entity is already in it.
    ///
    /// On a cache miss, the metadata is fetched from the indices, annotated
    /// with the projections (if any) and inserted in the cache. Returns `None`
    /// after logging an error if the metadata couldn't be fetched.
    fn fetch_and_cache_entity_mutations_metadata(
        &self,
        cache: &mut searcher::EntityMetaCache,
        entity_id: &str,
        projections: &[Projection],
    ) -> Option<Rc<EntityAggregator>> {
        if let Some(cached) = cache.get(entity_id) {
            return Some(cached.clone());
        }

        let mut aggregated = match self.fetch_aggregated_entity_mutations(entity_id) {
            Ok(aggregated) => aggregated,
            Err(err) => {
                error!(
                    "Error fetching mutations metadata for entity_id={} from indices: {}",
                    entity_id, err
                );
                return None;
            }
        };

        if !projections.is_empty() {
            aggregated.annotate_projections(projections);
        }

        let aggregated = Rc::new(aggregated);
        cache.insert(entity_id.to_string(), aggregated.clone());

        Some(aggregated)
    }

    /// Fills the traits of each search result by fetching them from the chain
    /// layer.
    ///
    /// Entities whose aggregated mutations report that they should be
    /// collected are handed to the garbage collector along the way.
    fn populate_results_traits(
        &self,
        entity_results: &mut Vec<searcher::SearchResult>,
        include_deleted: bool,
    ) {
        for result in entity_results.iter_mut() {
            if result.mutations.should_collect() {
                self.gc.maybe_flag_for_collection(&result.mutations);
            }

            let fetched_traits = self.fetch_entity_traits(&result.mutations, include_deleted);
            match result.proto.entity.as_mut() {
                Some(entity) => entity.traits = fetched_traits,
                None => {}
            }
        }
    }

    /// Fetches traits data from chain layer.
    ///
    /// For each aggregated trait of the entity, the operation of its last put
    /// mutation is fetched from the chain layer and decoded into the trait. A
    /// trait is left out of the results if:
    ///  * its projection asks to skip it
    ///  * it is deleted and `include_deleted` is false
    ///  * it has no put mutation
    ///  * its operation cannot be fetched, belongs to another entity or isn't
    ///    a put trait mutation (an error is logged for the first two cases)
    ///
    /// Dates and last operation id of returned traits come from the aggregated
    /// metadata.
    fn fetch_entity_traits(
        &self,
        entity_mutations: &EntityAggregator,
        include_deleted: bool,
    ) -> Vec<Trait> {
        entity_mutations
            .traits
            .iter()
            .filter_map(|(trait_id, agg)| {
                // projection may exclude the trait completely
                if let Some(projection) = &agg.projection {
                    if projection.skip {
                        return None;
                    }
                }

                if agg.deletion_date.is_some() && !include_deleted {
                    return None;
                }

                // trait's data is carried by its last put mutation
                let (mut_metadata, _put_mut_metadata) = agg.last_put_mutation()?;

                let mutation = self.fetch_chain_mutation_operation(
                    mut_metadata.operation_id,
                    mut_metadata.block_offset,
                );
                let mutation = match mutation {
                    Ok(Some(mutation)) => mutation,
                    // either an error or an operation that couldn't be found / decoded
                    other => {
                        error!(
                            "Couldn't fetch operation_id={} for entity_id={}: {:?}",
                            mut_metadata.operation_id, mut_metadata.entity_id, other
                        );
                        return None;
                    }
                };

                // sanity check: the operation should be a mutation of the requested entity
                if mutation.entity_id != entity_mutations.entity_id {
                    error!(
                        "Fetched from chain operation {} that didn't belong to entity {}, but entity {}",
                        mut_metadata.operation_id, entity_mutations.entity_id, mutation.entity_id
                    );
                    return None;
                }

                // only a put mutation that actually carries a trait yields a result
                let mut trt = match mutation.mutation? {
                    Mutation::PutTrait(put_mut) => put_mut.r#trait,
                    Mutation::DeleteTrait(_)
                    | Mutation::DeleteEntity(_)
                    | Mutation::DeleteOperations(_)
                    | Mutation::Test(_) => return None,
                }?;

                if let Some(projection) = &agg.projection {
                    let res = project_trait_fields(
                        self.full_cell.cell().schemas().as_ref(),
                        &mut trt,
                        projection.as_ref(),
                    );

                    // a failed projection is only logged, the trait is still returned
                    if let Err(err) = res {
                        error!(
                            "Couldn't run projection on trait_id={} of entity_id={}: {:?}",
                            trait_id, mut_metadata.entity_id, err,
                        );
                    }
                }

                // update the trait's dates that got merged from metadata
                trt.creation_date = agg.creation_date.map(|d| d.to_proto_timestamp());
                trt.modification_date = agg.modification_date.map(|d| d.to_proto_timestamp());
                trt.deletion_date = agg.deletion_date.map(|d| d.to_proto_timestamp());
                trt.last_operation_id = agg.last_operation_id.unwrap_or_default();

                Some(trt)
            })
            .collect()
    }

    /// Fetches an operation from the chain layer and decodes the entity
    /// mutation that it contains.
    ///
    /// When a block offset is given, the operation is read from that block of
    /// the chain; otherwise it is looked up by its id only. Returns `Ok(None)`
    /// if the operation doesn't exist or doesn't carry entry data.
    ///
    /// # Errors
    ///
    /// Fails if the chain layer fails or if the mutation cannot be decoded.
    fn fetch_chain_mutation_operation(
        &self,
        operation_id: OperationId,
        block_offset: Option<BlockOffset>,
    ) -> Result<Option<EntityMutation>, Error> {
        let fetched = match block_offset {
            Some(offset) => self.chain_handle.get_chain_operation(offset, operation_id)?,
            None => self.chain_handle.get_operation(operation_id)?,
        };

        let Some(operation) = fetched else {
            return Ok(None);
        };

        match operation.as_entry_data() {
            Ok(data) => Ok(Some(EntityMutation::decode(data)?)),
            Err(_) => Ok(None),
        }
    }
}