exocore_store/local/
store.rs

1use std::{
2    pin::Pin,
3    sync::{Arc, Mutex, RwLock, Weak},
4    task::{Context, Poll},
5    time::Instant,
6};
7
8use async_trait::async_trait;
9use exocore_chain::engine::Event;
10use exocore_core::{
11    cell::Cell,
12    futures::{interval, spawn_blocking, BatchingStream},
13    time::{AtomicInstant, Clock},
14    utils::handle_set::{Handle, HandleSet},
15};
16use exocore_protos::{
17    generated::exocore_store::{
18        entity_mutation::Mutation, entity_query, EntityQuery, EntityResults, MutationRequest,
19        MutationResult,
20    },
21    prost::Message,
22    store::OperationsPredicate,
23};
24use futures::{
25    channel::{mpsc, oneshot},
26    prelude::*,
27};
28
29use super::{entity_index::EntityIndex, StoreConfig};
30use crate::{
31    error::Error,
32    local::{mutation_tracker::MutationTracker, watched_queries::WatchedQueries},
33    mutation::MutationRequestLike,
34    query::WatchToken,
35};
36
37/// Locally persisted entities store allowing mutation and queries on entities
38/// and their traits.
39///
40/// It forwards mutation requests to the chain engine, receives back chain
41/// events that get indexed by the entities index. Queries are executed by the
42/// entities index.
43pub struct Store<CS, PS>
44where
45    CS: exocore_chain::chain::ChainStore,
46    PS: exocore_chain::pending::PendingStore,
47{
48    config: StoreConfig,
49    handle_set: HandleSet,
50    incoming_queries_receiver: mpsc::Receiver<QueryRequest>,
51    inner: Arc<RwLock<Inner<CS, PS>>>,
52}
53
54impl<CS, PS> Store<CS, PS>
55where
56    CS: exocore_chain::chain::ChainStore,
57    PS: exocore_chain::pending::PendingStore,
58{
59    pub fn new(
60        config: StoreConfig,
61        cell: Cell,
62        clock: Clock,
63        chain_handle: exocore_chain::engine::EngineHandle<CS, PS>,
64        index: EntityIndex<CS, PS>,
65    ) -> Result<Store<CS, PS>, Error> {
66        let (incoming_queries_sender, incoming_queries_receiver) =
67            mpsc::channel::<QueryRequest>(config.query_channel_size);
68
69        let inner = Arc::new(RwLock::new(Inner {
70            config,
71            cell,
72            clock,
73            index,
74            watched_queries: WatchedQueries::new(),
75            mutation_tracker: MutationTracker::new(config),
76            incoming_queries_sender,
77            chain_handle,
78        }));
79
80        Ok(Store {
81            config,
82            handle_set: HandleSet::new(),
83            incoming_queries_receiver,
84            inner,
85        })
86    }
87
88    pub fn get_handle(&self) -> StoreHandle<CS, PS> {
89        StoreHandle {
90            config: self.config,
91            handle: self.handle_set.get_handle(),
92            inner: Arc::downgrade(&self.inner),
93        }
94    }
95
96    pub async fn run(self) -> Result<(), Error> {
97        let config = self.config;
98
99        // Incoming queries execution
100        let incoming_queries_receiver = self.incoming_queries_receiver;
101        let weak_inner1 = Arc::downgrade(&self.inner);
102        let weak_inner2 = Arc::downgrade(&self.inner);
103        let last_user_query_shared = Arc::new(AtomicInstant::new());
104        let last_user_query = last_user_query_shared.clone();
105        let queries_executor = async move {
106            let mut executed_queries = incoming_queries_receiver
107                .map(move |query_request: QueryRequest| {
108                    if !query_request.query.programmatic {
109                        last_user_query.update_now();
110                    }
111
112                    let weak_inner = weak_inner1.clone();
113                    async move {
114                        let result =
115                            Inner::execute_query_async(weak_inner, query_request.query.clone())
116                                .await;
117                        (result, query_request)
118                    }
119                })
120                .buffer_unordered(config.query_parallelism);
121
122            while let Some((result, query_request)) = executed_queries.next().await {
123                let inner = weak_inner2.upgrade().ok_or(Error::Dropped)?;
124                let inner = inner.read()?;
125
126                if let Err(err) = &result {
127                    warn!("Error executing query: err={}", err);
128                }
129
130                let should_reply = match (&query_request.sender, &result) {
131                    (QueryRequestSender::WatchedQuery(_sender, watch_token), Ok(result)) => inner
132                        .watched_queries
133                        .update_query_results(*watch_token, result),
134
135                    (QueryRequestSender::WatchedQuery(_, watch_token), Err(_err)) => {
136                        inner.watched_queries.unwatch_query(*watch_token);
137                        true
138                    }
139
140                    (QueryRequestSender::Query(_), _result) => true,
141                };
142
143                if should_reply {
144                    query_request.reply(result);
145                }
146            }
147
148            Ok::<(), Error>(())
149        };
150
151        // Schedules chain engine events stream
152        let mut events_stream = {
153            let mut inner = self.inner.write()?;
154            let events = inner.chain_handle.take_events_stream()?;
155
156            // batching stream consumes all available events from stream without waiting
157            BatchingStream::new(events, config.chain_events_batch_size)
158        };
159        let (mut watch_check_sender, mut watch_check_receiver) = mpsc::channel(1);
160        let weak_inner = Arc::downgrade(&self.inner);
161        let chain_events_handler = async move {
162            while let Some(events) = events_stream.next().await {
163                match Inner::handle_chain_engine_events(&weak_inner, events).await {
164                    Ok(indexed_operations) if indexed_operations > 0 => {
165                        // notify query watching. if it's full, it's guaranteed that it will catch
166                        // those changes on next iteration
167                        let _ = watch_check_sender.try_send(());
168                    }
169                    Err(err) => {
170                        error!("Error handling chain engine event: {}", err);
171                        if err.is_fatal() {
172                            return Err(err);
173                        }
174                    }
175                    Ok(_) => {}
176                }
177            }
178            Ok(())
179        };
180
181        // Checks if watched queries have their results changed
182        let weak_inner = Arc::downgrade(&self.inner);
183        let watched_queries_checker = async move {
184            while watch_check_receiver.next().await.is_some() {
185                let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
186                let mut inner = inner.write()?;
187
188                let queries = inner.watched_queries.queries();
189                if queries.is_empty() {
190                    continue;
191                }
192
193                debug!(
194                    "Store may have changed because of new events. Checking for {} queries",
195                    queries.len()
196                );
197                for watched_query in queries {
198                    let send_result = inner.incoming_queries_sender.try_send(QueryRequest {
199                        query: watched_query.query,
200                        sender: QueryRequestSender::WatchedQuery(
201                            watched_query.sender.clone(),
202                            watched_query.token,
203                        ),
204                    });
205
206                    if let Err(err) = send_result {
207                        error!(
208                            "Error sending to watched query. Removing it from queries: {}",
209                            err
210                        );
211                        inner.watched_queries.unwatch_query(watched_query.token);
212                    }
213                }
214            }
215
216            Ok::<(), Error>(())
217        };
218
219        // Index chain at interval when idling
220        let last_user_query = last_user_query_shared.clone();
221        let weak_inner = Arc::downgrade(&self.inner);
222        let chain_indexer = async move {
223            let Some(interval_dur) = self.config.chain_index_deferred_interval else {
224                future::pending::<()>().await; // deferred disabled, wait forever
225                return Ok(());
226            };
227
228            let mut chain_index_interval = interval(interval_dur);
229            let mut last_index_attempt = Instant::now();
230            loop {
231                chain_index_interval.tick().await;
232
233                if last_user_query.elapsed() < self.config.chain_index_deferred_query_interval
234                    && last_index_attempt.elapsed() < self.config.chain_index_deferred_max_interval
235                {
236                    debug!(
237                        "Skipping indexing because of recent query (last_query={:?} last_index_attempt={:?})",
238                        last_user_query.elapsed(),
239                        last_index_attempt.elapsed(),
240                    );
241                    continue;
242                }
243
244                let weak_inner = weak_inner.clone();
245                spawn_blocking(move || {
246                    let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
247                    let mut inner = inner.write()?;
248
249                    let affected_operations = inner.index.maybe_index_chain_blocks()?;
250                    if !affected_operations.is_empty() {
251                        inner
252                            .mutation_tracker
253                            .handle_indexed_operations(affected_operations.as_slice());
254                    }
255
256                    Ok::<(), Error>(())
257                })
258                .await
259                .map_err(|err| {
260                    Error::Other(anyhow!("Couldn't launch indexation operation: {:?}", err))
261                })??;
262
263                last_index_attempt = Instant::now();
264            }
265
266            // types the async block
267            #[allow(unreachable_code)]
268            Ok::<(), Error>(())
269        };
270
271        // Runs a garbage collection pass on entity index, which will only get executed
272        // if entities to be collected got flagged in a previous search query and added
273        // to the garbage collector queue.
274        // See [GarbageCollector](super::entity_index::gc::GarbageCollector)
275        let mut gc_interval = interval(config.garbage_collect_interval);
276        let weak_inner = Arc::downgrade(&self.inner);
277        let garbage_collector = async move {
278            loop {
279                gc_interval.tick().await;
280
281                let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
282                let inner = inner.read()?;
283                match inner.index.run_garbage_collector() {
284                    Ok(deletions) => {
285                        let deletion_request = MutationRequest {
286                            mutations: deletions,
287                            ..Default::default()
288                        };
289
290                        if let Err(err) = inner.handle_mutation_request(deletion_request) {
291                            error!("Error executing mutations from garbage collection: {}", err);
292                        }
293                    }
294                    Err(err) => {
295                        error!("Error running garbage collection: {}", err);
296                        if err.is_fatal() {
297                            return Err(err);
298                        }
299                    }
300                }
301            }
302
303            // types the async block
304            #[allow(unreachable_code)]
305            Ok::<(), Error>(())
306        };
307
308        info!("Entity store started");
309
310        futures::select! {
311            _ = self.handle_set.on_handles_dropped().fuse() => {},
312            _ = queries_executor.fuse() => {},
313            _ = chain_events_handler.fuse() => {},
314            _ = watched_queries_checker.fuse() => {},
315            _ = chain_indexer.fuse() => {},
316            _ = garbage_collector.fuse() => {},
317        }
318
319        Ok(())
320    }
321}
322
323struct Inner<CS, PS>
324where
325    CS: exocore_chain::chain::ChainStore,
326    PS: exocore_chain::pending::PendingStore,
327{
328    config: StoreConfig,
329    cell: Cell,
330    clock: Clock,
331    index: EntityIndex<CS, PS>,
332    watched_queries: WatchedQueries,
333    mutation_tracker: MutationTracker,
334    incoming_queries_sender: mpsc::Sender<QueryRequest>,
335    chain_handle: exocore_chain::engine::EngineHandle<CS, PS>,
336}
337
338impl<CS, PS> Inner<CS, PS>
339where
340    CS: exocore_chain::chain::ChainStore,
341    PS: exocore_chain::pending::PendingStore,
342{
343    fn handle_mutation_request(
344        &self,
345        mut request: MutationRequest,
346    ) -> Result<oneshot::Receiver<Result<MutationResult, Error>>, Error> {
347        let (sender, receiver) = oneshot::channel();
348
349        let mut operation_ids = Vec::new();
350        let mut last_entity_id = None;
351        for mutation in &mut request.mutations {
352            if let Some(Mutation::Test(_mutation)) = &mutation.mutation {
353                return Err(Error::ProtoFieldExpected("mutation"));
354            }
355
356            // if mutation doesn't have an entity id, generate one
357            if mutation.entity_id.is_empty() {
358                if request.common_entity_id && last_entity_id.is_some() {
359                    mutation.entity_id = last_entity_id.unwrap_or_default();
360                } else {
361                    mutation.entity_id = exocore_core::utils::id::generate_prefixed_id("et");
362                }
363            }
364            last_entity_id = Some(mutation.entity_id.clone());
365
366            // if a put mutation doesn't have a trait id, generate one
367            if let Some(Mutation::PutTrait(put_mutation)) = &mut mutation.mutation {
368                if let Some(trt) = &mut put_mutation.r#trait {
369                    if trt.id.is_empty() {
370                        trt.id = exocore_core::utils::id::generate_prefixed_id("trt");
371                    }
372                }
373            }
374
375            let encoded = mutation.encode_to_vec();
376            let operation_id = self.chain_handle.write_entry_operation(&encoded)?;
377
378            operation_ids.push(operation_id);
379        }
380
381        if (request.wait_indexed || request.return_entities) && !request.mutations.is_empty() {
382            self.mutation_tracker.track_request(operation_ids, sender);
383        } else {
384            let _ = sender.send(Ok(MutationResult {
385                operation_ids,
386                ..Default::default()
387            }));
388        }
389
390        Ok(receiver)
391    }
392
393    async fn execute_query_async(
394        weak_inner: Weak<RwLock<Inner<CS, PS>>>,
395        query: Box<EntityQuery>,
396    ) -> Result<EntityResults, Error> {
397        let result = spawn_blocking(move || {
398            let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
399            let inner = inner.read()?;
400
401            inner.index.search(query)
402        })
403        .await;
404
405        result.map_err(|err| anyhow!("Couldn't launch blocking query: {}", err))?
406    }
407
408    async fn handle_chain_engine_events(
409        weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
410        events: Vec<exocore_chain::engine::Event>,
411    ) -> Result<usize, Error> {
412        let weak_inner = weak_inner.clone();
413
414        let indexed_operations = spawn_blocking(move || -> Result<usize, Error> {
415            let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
416            let mut inner = inner.write()?;
417
418            // if deferred commit is enabled, we don't forward new chain block events to the
419            // entity index
420            let deferred_commit = inner.config.chain_index_deferred_interval.is_some();
421            let filtered_events = events
422                .into_iter()
423                .filter(|event| !deferred_commit || !matches!(event, Event::NewChainBlock(_)));
424
425            let (affected_operations, indexed_operations) =
426                inner.index.handle_chain_engine_events(filtered_events)?;
427            inner
428                .mutation_tracker
429                .handle_indexed_operations(affected_operations.as_slice());
430
431            Ok(indexed_operations)
432        })
433        .await;
434
435        indexed_operations.map_err(|err| anyhow!("Couldn't launch blocking query: {}", err))?
436    }
437}
438
439/// Handle to the store, allowing communication to the store asynchronously
440pub struct StoreHandle<CS, PS>
441where
442    CS: exocore_chain::chain::ChainStore,
443    PS: exocore_chain::pending::PendingStore,
444{
445    config: StoreConfig,
446    handle: Handle,
447    inner: Weak<RwLock<Inner<CS, PS>>>,
448}
449
450impl<CS, PS> StoreHandle<CS, PS>
451where
452    CS: exocore_chain::chain::ChainStore,
453    PS: exocore_chain::pending::PendingStore,
454{
455    pub async fn on_start(&self) {
456        self.handle.on_set_started().await
457    }
458
459    #[cfg(test)]
460    pub(crate) fn watched_queries(&self) -> Vec<WatchToken> {
461        let inner = self.inner.upgrade().unwrap();
462        let inner = inner.read().unwrap();
463
464        let mut tokens = Vec::new();
465        for query in inner.watched_queries.queries() {
466            tokens.push(query.token);
467        }
468
469        tokens
470    }
471
472    #[cfg(test)]
473    pub(crate) fn clear_watched_queries(&self) {
474        let inner = self.inner.upgrade().unwrap();
475        let inner = inner.read().unwrap();
476
477        inner.watched_queries.clear();
478    }
479}
480
481#[async_trait]
482impl<CS, PS> crate::store::Store for StoreHandle<CS, PS>
483where
484    CS: exocore_chain::chain::ChainStore,
485    PS: exocore_chain::pending::PendingStore,
486{
487    type WatchedQueryStream = WatchedQueryStream<CS, PS>;
488
489    async fn mutate<M: Into<MutationRequestLike> + Send>(
490        &self,
491        request: M,
492    ) -> Result<MutationResult, Error> {
493        let inner = self.inner.upgrade().ok_or(Error::Dropped)?;
494
495        let request = request.into().0;
496        let return_entities = request.return_entities;
497
498        let mutation_future = {
499            let inner = inner.read().map_err(|_| Error::Dropped)?;
500            inner.handle_mutation_request(request)?
501        };
502
503        let mut mutation_result = mutation_future.await.map_err(|_err| Error::Cancelled)??;
504
505        if return_entities {
506            let query_result = self
507                .query(EntityQuery {
508                    predicate: Some(entity_query::Predicate::Operations(OperationsPredicate {
509                        operation_ids: mutation_result.operation_ids.clone(),
510                    })),
511                    ..Default::default()
512                })
513                .await?;
514
515            mutation_result.entities = query_result
516                .entities
517                .into_iter()
518                .flat_map(|e| e.entity)
519                .collect();
520        }
521
522        Ok(mutation_result)
523    }
524
525    async fn query(&self, query: EntityQuery) -> Result<EntityResults, Error> {
526        let inner = self.inner.upgrade().ok_or(Error::Dropped)?;
527
528        let receiver = {
529            let mut inner = inner.write().map_err(|_| Error::Dropped)?;
530
531            // ok to dismiss send as sender end will be dropped in case of an error and
532            // consumer will be notified by channel being closed
533            let (sender, receiver) = oneshot::channel();
534            let _ = inner.incoming_queries_sender.try_send(QueryRequest {
535                query: Box::new(query),
536                sender: QueryRequestSender::Query(sender),
537            });
538
539            receiver
540        };
541
542        receiver.await.map_err(|_err| Error::Cancelled)?
543    }
544
545    fn watched_query(&self, mut query: EntityQuery) -> Result<Self::WatchedQueryStream, Error> {
546        let inner = self.inner.upgrade().ok_or(Error::Dropped)?;
547        let mut inner = inner.write().map_err(|_| Error::Dropped)?;
548
549        let mut watch_token = query.watch_token;
550        if watch_token == 0 {
551            watch_token = inner.clock.consistent_time(inner.cell.local_node()).into();
552            query.watch_token = watch_token;
553        }
554
555        let (sender, receiver) = mpsc::channel(self.config.handle_watch_query_channel_size);
556        let sender = Arc::new(Mutex::new(sender));
557
558        inner
559            .watched_queries
560            .track_query(watch_token, &query, sender.clone());
561
562        // ok to dismiss send as sender end will be dropped in case of an error and
563        // consumer will be notified by channel being closed
564        let _ = inner.incoming_queries_sender.try_send(QueryRequest {
565            query: Box::new(query),
566            sender: QueryRequestSender::WatchedQuery(sender, watch_token),
567        });
568
569        Ok(WatchedQueryStream {
570            watch_token,
571            inner: self.inner.clone(),
572            receiver,
573        })
574    }
575}
576
577impl<CS, PS> Clone for StoreHandle<CS, PS>
578where
579    CS: exocore_chain::chain::ChainStore,
580    PS: exocore_chain::pending::PendingStore,
581{
582    fn clone(&self) -> StoreHandle<CS, PS> {
583        StoreHandle {
584            config: self.config,
585            handle: self.handle.clone(),
586            inner: self.inner.clone(),
587        }
588    }
589}
590
591/// A query received through the `watched_query` method that needs to be watched
592/// and notified when new changes happen to the store that would affects its
593/// results.
594pub struct WatchedQueryStream<CS, PS>
595where
596    CS: exocore_chain::chain::ChainStore,
597    PS: exocore_chain::pending::PendingStore,
598{
599    watch_token: WatchToken,
600    inner: Weak<RwLock<Inner<CS, PS>>>,
601    receiver: mpsc::Receiver<Result<EntityResults, Error>>,
602}
603
604impl<CS, PS> futures::Stream for WatchedQueryStream<CS, PS>
605where
606    CS: exocore_chain::chain::ChainStore,
607    PS: exocore_chain::pending::PendingStore,
608{
609    type Item = Result<EntityResults, Error>;
610
611    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
612        self.receiver.poll_next_unpin(cx)
613    }
614}
615
616impl<CS, PS> Drop for WatchedQueryStream<CS, PS>
617where
618    CS: exocore_chain::chain::ChainStore,
619    PS: exocore_chain::pending::PendingStore,
620{
621    fn drop(&mut self) {
622        if let Some(inner) = self.inner.upgrade() {
623            if let Ok(inner) = inner.read() {
624                inner.watched_queries.unwatch_query(self.watch_token);
625            }
626        }
627    }
628}
629
630/// Incoming query to be executed, or re-scheduled watched query to be
631/// re-executed.
632pub(crate) struct QueryRequest {
633    pub query: Box<EntityQuery>,
634    pub sender: QueryRequestSender,
635}
636
637pub(crate) enum QueryRequestSender {
638    Query(oneshot::Sender<Result<EntityResults, Error>>),
639    WatchedQuery(
640        Arc<Mutex<mpsc::Sender<Result<EntityResults, Error>>>>,
641        WatchToken,
642    ),
643}
644
645impl QueryRequest {
646    fn reply(mut self, result: Result<EntityResults, Error>) {
647        match self.sender {
648            QueryRequestSender::Query(sender) => {
649                let _ = sender.send(result);
650            }
651            QueryRequestSender::WatchedQuery(ref mut sender, _token) => {
652                if let Ok(mut sender) = sender.lock() {
653                    let _ = sender.try_send(result);
654                }
655            }
656        }
657    }
658}
659
660#[cfg(test)]
661pub mod tests {
662    use std::time::Duration;
663
664    use exocore_core::{
665        futures::sleep,
666        tests_utils::{async_expect_eventually_fallible, async_test_retry},
667    };
668    use exocore_protos::{prost::ProstAnyPackMessageExt, store::Trait, test::TestMessage};
669    use futures::executor::block_on_stream;
670
671    use super::{super::TestStore, *};
672    use crate::{
673        local::{entity_index::gc::GarbageCollectorConfig, EntityIndexConfig},
674        mutation::MutationBuilder,
675        query::QueryBuilder,
676        store::Store,
677    };
678
679    #[tokio::test(flavor = "multi_thread")]
680    async fn store_mutate_query_via_handle() -> anyhow::Result<()> {
681        let mut test_store = TestStore::new().await?;
682        test_store.start_store().await?;
683
684        let mutation = test_store.create_put_contact_mutation("entry1", "trt1", "Hello World");
685        let result = test_store.mutate(mutation).await?;
686        assert_eq!(result.entities.len(), 0);
687        test_store
688            .cluster
689            .wait_operation_committed(0, result.operation_ids[0]);
690
691        let query = QueryBuilder::matches("hello").build();
692        let results = test_store.query(query).await?;
693        assert_eq!(results.entities.len(), 1);
694
695        Ok(())
696    }
697
698    #[tokio::test(flavor = "multi_thread")]
699    async fn store_mutate_return_entities() -> anyhow::Result<()> {
700        let mut test_store = TestStore::new().await?;
701        test_store.start_store().await?;
702
703        let mutation = test_store
704            .create_put_contact_mutation("entry1", "trt1", "Hello World")
705            .return_entities();
706        let result = test_store.mutate(mutation).await?;
707        assert_eq!(result.entities.len(), 1);
708
709        let entity = &result.entities[0];
710        assert_eq!(entity.id, "entry1");
711
712        Ok(())
713    }
714
715    #[tokio::test(flavor = "multi_thread")]
716    async fn store_mutate_generate_ids() -> anyhow::Result<()> {
717        let mut test_store = TestStore::new().await?;
718        test_store.start_store().await?;
719
720        let test_msg = TestMessage::default().pack_to_any().unwrap();
721
722        {
723            // generate entity and trait ids if none are given
724            let mutation = test_store
725                .create_put_contact_mutation("", "trt1", "Hello World")
726                .return_entities();
727            let result = test_store.mutate(mutation).await?;
728            assert_eq!(result.entities.len(), 1);
729            assert_ne!("", result.entities[0].id);
730            assert_ne!("", result.entities[0].traits[0].id);
731        }
732
733        {
734            // use same entity id for both mutations if requested to be common ids
735            let mutation = MutationBuilder::new()
736                .put_trait(
737                    "".to_string(),
738                    Trait {
739                        id: "".to_string(),
740                        message: Some(test_msg.clone()),
741                        ..Default::default()
742                    },
743                )
744                .put_trait(
745                    "".to_string(),
746                    Trait {
747                        id: "".to_string(),
748                        message: Some(test_msg),
749                        ..Default::default()
750                    },
751                )
752                .use_common_entity_id()
753                .return_entities();
754            let result = test_store.mutate(mutation).await?;
755
756            // should have created 1 entity with same id, with 2 different traits
757            assert_eq!(result.entities.len(), 1);
758            assert_ne!("", result.entities[0].id);
759            assert_eq!(result.entities[0].traits.len(), 2);
760            assert_ne!("", result.entities[0].traits[0].id);
761            assert_ne!("", result.entities[0].traits[1].id);
762        }
763
764        Ok(())
765    }
766
767    #[tokio::test(flavor = "multi_thread")]
768    async fn query_error_propagating() -> anyhow::Result<()> {
769        let mut test_store = TestStore::new().await?;
770        test_store.start_store().await?;
771
772        let query = QueryBuilder::test(false).build();
773        assert!(test_store.query(query).await.is_err());
774
775        Ok(())
776    }
777
778    #[tokio::test(flavor = "multi_thread")]
779    async fn mutation_error_propagating() -> anyhow::Result<()> {
780        let mut test_store = TestStore::new().await?;
781        test_store.start_store().await?;
782
783        let mutation = MutationBuilder::new().fail_mutation("entity_id".to_string());
784        assert!(test_store.mutate(mutation).await.is_err());
785
786        Ok(())
787    }
788
789    #[tokio::test(flavor = "multi_thread")]
790    async fn watched_query() -> anyhow::Result<()> {
791        let mut test_store = TestStore::new().await?;
792        test_store.start_store().await?;
793
794        let query = QueryBuilder::matches("hello").build();
795        let mut stream = block_on_stream(test_store.store_handle.watched_query(query)?);
796
797        let result = stream.next().unwrap();
798        assert_eq!(result.unwrap().entities.len(), 0);
799
800        let mutation = test_store.create_put_contact_mutation("entry1", "trt1", "Hello World");
801        let response = test_store.mutate(mutation).await?;
802        test_store
803            .cluster
804            .wait_operation_committed(0, response.operation_ids[0]);
805
806        let result = stream.next().unwrap();
807        assert_eq!(result.unwrap().entities.len(), 1);
808
809        let mutation =
810            test_store.create_put_contact_mutation("entry2", "contact2", "Something else");
811        let response = test_store.mutate(mutation).await?;
812        test_store
813            .cluster
814            .wait_operation_committed(0, response.operation_ids[0]);
815
816        let mut stream = stream.into_inner();
817        let delay = sleep(Duration::from_secs(1));
818
819        futures::select! {
820            _ = stream.next().fuse() => {
821                panic!("Got result, should have timed out");
822            },
823            _ = delay.fuse() => {
824                // alright
825            }
826        }
827
828        Ok(())
829    }
830
831    #[tokio::test(flavor = "multi_thread")]
832    async fn watched_query_failure() -> anyhow::Result<()> {
833        let mut test_store = TestStore::new().await?;
834        test_store.start_store().await?;
835
836        let query = QueryBuilder::test(false).build();
837        let mut stream = block_on_stream(test_store.store_handle.watched_query(query)?);
838
839        let result = stream.next().unwrap();
840        assert!(result.is_err());
841
842        Ok(())
843    }
844
845    #[tokio::test(flavor = "multi_thread")]
846    async fn garbage_collection() -> anyhow::Result<()> {
847        async_test_retry(|| async {
848            let store_config = StoreConfig {
849                garbage_collect_interval: Duration::from_millis(300),
850                chain_index_deferred_interval: None, // commit as blocks get committed
851                ..Default::default()
852            };
853            let index_config = EntityIndexConfig {
854                chain_index_min_depth: 0, // index in chain as soon as a block is committed
855                chain_index_depth_leeway: 0, // for tests, we want to index as soon as possible
856                chain_index_in_memory: true,
857                garbage_collector: GarbageCollectorConfig {
858                    deleted_entity_collection: Duration::from_millis(100),
859                    min_operation_age: Duration::from_nanos(1),
860                    ..Default::default()
861                },
862                ..TestStore::test_index_config()
863            };
864
865            let mut test_store = TestStore::new_with_config(store_config, index_config).await?;
866            test_store.start_store().await?;
867
868            {
869                // create an entity and then delete it
870                let mutation =
871                    test_store.create_put_contact_mutation("entity1", "trt1", "Hello entity1");
872                test_store.mutate(mutation).await?;
873
874                let delete = MutationBuilder::new()
875                    .delete_entity("entity1")
876                    .return_entities()
877                    .build();
878                let resp = test_store.mutate(delete).await?;
879                test_store
880                    .cluster
881                    .wait_operation_committed(0, resp.operation_ids[0]);
882            }
883
884            // entity should eventually be completely deleted
885            let store_handle = test_store.store_handle.clone();
886            let ent2_mut = test_store
887                .create_put_contact_mutation("entity2", "trt1", "Hello")
888                .build();
889            async_expect_eventually_fallible(|| async {
890                let query = QueryBuilder::with_id("entity1").include_deleted().build();
891                let res = store_handle.query(query).await?;
892                let is_deleted = res.entities.is_empty();
893
894                if !is_deleted {
895                    // if not yet deleted, we create a new mutation on another entity to make sure
896                    // entity deletion is in chain
897                    store_handle.mutate(ent2_mut.clone()).await.unwrap();
898                    Err(anyhow!("entities still not deleted"))
899                } else {
900                    Ok(())
901                }
902            })
903            .await?;
904
905            Ok(())
906        })
907        .await
908    }
909}