1use std::{
2 pin::Pin,
3 sync::{Arc, Mutex, RwLock, Weak},
4 task::{Context, Poll},
5 time::Instant,
6};
7
8use async_trait::async_trait;
9use exocore_chain::engine::Event;
10use exocore_core::{
11 cell::Cell,
12 futures::{interval, spawn_blocking, BatchingStream},
13 time::{AtomicInstant, Clock},
14 utils::handle_set::{Handle, HandleSet},
15};
16use exocore_protos::{
17 generated::exocore_store::{
18 entity_mutation::Mutation, entity_query, EntityQuery, EntityResults, MutationRequest,
19 MutationResult,
20 },
21 prost::Message,
22 store::OperationsPredicate,
23};
24use futures::{
25 channel::{mpsc, oneshot},
26 prelude::*,
27};
28
29use super::{entity_index::EntityIndex, StoreConfig};
30use crate::{
31 error::Error,
32 local::{mutation_tracker::MutationTracker, watched_queries::WatchedQueries},
33 mutation::MutationRequestLike,
34 query::WatchToken,
35};
36
37pub struct Store<CS, PS>
44where
45 CS: exocore_chain::chain::ChainStore,
46 PS: exocore_chain::pending::PendingStore,
47{
48 config: StoreConfig,
49 handle_set: HandleSet,
50 incoming_queries_receiver: mpsc::Receiver<QueryRequest>,
51 inner: Arc<RwLock<Inner<CS, PS>>>,
52}
53
54impl<CS, PS> Store<CS, PS>
55where
56 CS: exocore_chain::chain::ChainStore,
57 PS: exocore_chain::pending::PendingStore,
58{
59 pub fn new(
60 config: StoreConfig,
61 cell: Cell,
62 clock: Clock,
63 chain_handle: exocore_chain::engine::EngineHandle<CS, PS>,
64 index: EntityIndex<CS, PS>,
65 ) -> Result<Store<CS, PS>, Error> {
66 let (incoming_queries_sender, incoming_queries_receiver) =
67 mpsc::channel::<QueryRequest>(config.query_channel_size);
68
69 let inner = Arc::new(RwLock::new(Inner {
70 config,
71 cell,
72 clock,
73 index,
74 watched_queries: WatchedQueries::new(),
75 mutation_tracker: MutationTracker::new(config),
76 incoming_queries_sender,
77 chain_handle,
78 }));
79
80 Ok(Store {
81 config,
82 handle_set: HandleSet::new(),
83 incoming_queries_receiver,
84 inner,
85 })
86 }
87
88 pub fn get_handle(&self) -> StoreHandle<CS, PS> {
89 StoreHandle {
90 config: self.config,
91 handle: self.handle_set.get_handle(),
92 inner: Arc::downgrade(&self.inner),
93 }
94 }
95
96 pub async fn run(self) -> Result<(), Error> {
97 let config = self.config;
98
99 let incoming_queries_receiver = self.incoming_queries_receiver;
101 let weak_inner1 = Arc::downgrade(&self.inner);
102 let weak_inner2 = Arc::downgrade(&self.inner);
103 let last_user_query_shared = Arc::new(AtomicInstant::new());
104 let last_user_query = last_user_query_shared.clone();
105 let queries_executor = async move {
106 let mut executed_queries = incoming_queries_receiver
107 .map(move |query_request: QueryRequest| {
108 if !query_request.query.programmatic {
109 last_user_query.update_now();
110 }
111
112 let weak_inner = weak_inner1.clone();
113 async move {
114 let result =
115 Inner::execute_query_async(weak_inner, query_request.query.clone())
116 .await;
117 (result, query_request)
118 }
119 })
120 .buffer_unordered(config.query_parallelism);
121
122 while let Some((result, query_request)) = executed_queries.next().await {
123 let inner = weak_inner2.upgrade().ok_or(Error::Dropped)?;
124 let inner = inner.read()?;
125
126 if let Err(err) = &result {
127 warn!("Error executing query: err={}", err);
128 }
129
130 let should_reply = match (&query_request.sender, &result) {
131 (QueryRequestSender::WatchedQuery(_sender, watch_token), Ok(result)) => inner
132 .watched_queries
133 .update_query_results(*watch_token, result),
134
135 (QueryRequestSender::WatchedQuery(_, watch_token), Err(_err)) => {
136 inner.watched_queries.unwatch_query(*watch_token);
137 true
138 }
139
140 (QueryRequestSender::Query(_), _result) => true,
141 };
142
143 if should_reply {
144 query_request.reply(result);
145 }
146 }
147
148 Ok::<(), Error>(())
149 };
150
151 let mut events_stream = {
153 let mut inner = self.inner.write()?;
154 let events = inner.chain_handle.take_events_stream()?;
155
156 BatchingStream::new(events, config.chain_events_batch_size)
158 };
159 let (mut watch_check_sender, mut watch_check_receiver) = mpsc::channel(1);
160 let weak_inner = Arc::downgrade(&self.inner);
161 let chain_events_handler = async move {
162 while let Some(events) = events_stream.next().await {
163 match Inner::handle_chain_engine_events(&weak_inner, events).await {
164 Ok(indexed_operations) if indexed_operations > 0 => {
165 let _ = watch_check_sender.try_send(());
168 }
169 Err(err) => {
170 error!("Error handling chain engine event: {}", err);
171 if err.is_fatal() {
172 return Err(err);
173 }
174 }
175 Ok(_) => {}
176 }
177 }
178 Ok(())
179 };
180
181 let weak_inner = Arc::downgrade(&self.inner);
183 let watched_queries_checker = async move {
184 while watch_check_receiver.next().await.is_some() {
185 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
186 let mut inner = inner.write()?;
187
188 let queries = inner.watched_queries.queries();
189 if queries.is_empty() {
190 continue;
191 }
192
193 debug!(
194 "Store may have changed because of new events. Checking for {} queries",
195 queries.len()
196 );
197 for watched_query in queries {
198 let send_result = inner.incoming_queries_sender.try_send(QueryRequest {
199 query: watched_query.query,
200 sender: QueryRequestSender::WatchedQuery(
201 watched_query.sender.clone(),
202 watched_query.token,
203 ),
204 });
205
206 if let Err(err) = send_result {
207 error!(
208 "Error sending to watched query. Removing it from queries: {}",
209 err
210 );
211 inner.watched_queries.unwatch_query(watched_query.token);
212 }
213 }
214 }
215
216 Ok::<(), Error>(())
217 };
218
219 let last_user_query = last_user_query_shared.clone();
221 let weak_inner = Arc::downgrade(&self.inner);
222 let chain_indexer = async move {
223 let Some(interval_dur) = self.config.chain_index_deferred_interval else {
224 future::pending::<()>().await; return Ok(());
226 };
227
228 let mut chain_index_interval = interval(interval_dur);
229 let mut last_index_attempt = Instant::now();
230 loop {
231 chain_index_interval.tick().await;
232
233 if last_user_query.elapsed() < self.config.chain_index_deferred_query_interval
234 && last_index_attempt.elapsed() < self.config.chain_index_deferred_max_interval
235 {
236 debug!(
237 "Skipping indexing because of recent query (last_query={:?} last_index_attempt={:?})",
238 last_user_query.elapsed(),
239 last_index_attempt.elapsed(),
240 );
241 continue;
242 }
243
244 let weak_inner = weak_inner.clone();
245 spawn_blocking(move || {
246 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
247 let mut inner = inner.write()?;
248
249 let affected_operations = inner.index.maybe_index_chain_blocks()?;
250 if !affected_operations.is_empty() {
251 inner
252 .mutation_tracker
253 .handle_indexed_operations(affected_operations.as_slice());
254 }
255
256 Ok::<(), Error>(())
257 })
258 .await
259 .map_err(|err| {
260 Error::Other(anyhow!("Couldn't launch indexation operation: {:?}", err))
261 })??;
262
263 last_index_attempt = Instant::now();
264 }
265
266 #[allow(unreachable_code)]
268 Ok::<(), Error>(())
269 };
270
271 let mut gc_interval = interval(config.garbage_collect_interval);
276 let weak_inner = Arc::downgrade(&self.inner);
277 let garbage_collector = async move {
278 loop {
279 gc_interval.tick().await;
280
281 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
282 let inner = inner.read()?;
283 match inner.index.run_garbage_collector() {
284 Ok(deletions) => {
285 let deletion_request = MutationRequest {
286 mutations: deletions,
287 ..Default::default()
288 };
289
290 if let Err(err) = inner.handle_mutation_request(deletion_request) {
291 error!("Error executing mutations from garbage collection: {}", err);
292 }
293 }
294 Err(err) => {
295 error!("Error running garbage collection: {}", err);
296 if err.is_fatal() {
297 return Err(err);
298 }
299 }
300 }
301 }
302
303 #[allow(unreachable_code)]
305 Ok::<(), Error>(())
306 };
307
308 info!("Entity store started");
309
310 futures::select! {
311 _ = self.handle_set.on_handles_dropped().fuse() => {},
312 _ = queries_executor.fuse() => {},
313 _ = chain_events_handler.fuse() => {},
314 _ = watched_queries_checker.fuse() => {},
315 _ = chain_indexer.fuse() => {},
316 _ = garbage_collector.fuse() => {},
317 }
318
319 Ok(())
320 }
321}
322
323struct Inner<CS, PS>
324where
325 CS: exocore_chain::chain::ChainStore,
326 PS: exocore_chain::pending::PendingStore,
327{
328 config: StoreConfig,
329 cell: Cell,
330 clock: Clock,
331 index: EntityIndex<CS, PS>,
332 watched_queries: WatchedQueries,
333 mutation_tracker: MutationTracker,
334 incoming_queries_sender: mpsc::Sender<QueryRequest>,
335 chain_handle: exocore_chain::engine::EngineHandle<CS, PS>,
336}
337
338impl<CS, PS> Inner<CS, PS>
339where
340 CS: exocore_chain::chain::ChainStore,
341 PS: exocore_chain::pending::PendingStore,
342{
343 fn handle_mutation_request(
344 &self,
345 mut request: MutationRequest,
346 ) -> Result<oneshot::Receiver<Result<MutationResult, Error>>, Error> {
347 let (sender, receiver) = oneshot::channel();
348
349 let mut operation_ids = Vec::new();
350 let mut last_entity_id = None;
351 for mutation in &mut request.mutations {
352 if let Some(Mutation::Test(_mutation)) = &mutation.mutation {
353 return Err(Error::ProtoFieldExpected("mutation"));
354 }
355
356 if mutation.entity_id.is_empty() {
358 if request.common_entity_id && last_entity_id.is_some() {
359 mutation.entity_id = last_entity_id.unwrap_or_default();
360 } else {
361 mutation.entity_id = exocore_core::utils::id::generate_prefixed_id("et");
362 }
363 }
364 last_entity_id = Some(mutation.entity_id.clone());
365
366 if let Some(Mutation::PutTrait(put_mutation)) = &mut mutation.mutation {
368 if let Some(trt) = &mut put_mutation.r#trait {
369 if trt.id.is_empty() {
370 trt.id = exocore_core::utils::id::generate_prefixed_id("trt");
371 }
372 }
373 }
374
375 let encoded = mutation.encode_to_vec();
376 let operation_id = self.chain_handle.write_entry_operation(&encoded)?;
377
378 operation_ids.push(operation_id);
379 }
380
381 if (request.wait_indexed || request.return_entities) && !request.mutations.is_empty() {
382 self.mutation_tracker.track_request(operation_ids, sender);
383 } else {
384 let _ = sender.send(Ok(MutationResult {
385 operation_ids,
386 ..Default::default()
387 }));
388 }
389
390 Ok(receiver)
391 }
392
393 async fn execute_query_async(
394 weak_inner: Weak<RwLock<Inner<CS, PS>>>,
395 query: Box<EntityQuery>,
396 ) -> Result<EntityResults, Error> {
397 let result = spawn_blocking(move || {
398 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
399 let inner = inner.read()?;
400
401 inner.index.search(query)
402 })
403 .await;
404
405 result.map_err(|err| anyhow!("Couldn't launch blocking query: {}", err))?
406 }
407
408 async fn handle_chain_engine_events(
409 weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
410 events: Vec<exocore_chain::engine::Event>,
411 ) -> Result<usize, Error> {
412 let weak_inner = weak_inner.clone();
413
414 let indexed_operations = spawn_blocking(move || -> Result<usize, Error> {
415 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
416 let mut inner = inner.write()?;
417
418 let deferred_commit = inner.config.chain_index_deferred_interval.is_some();
421 let filtered_events = events
422 .into_iter()
423 .filter(|event| !deferred_commit || !matches!(event, Event::NewChainBlock(_)));
424
425 let (affected_operations, indexed_operations) =
426 inner.index.handle_chain_engine_events(filtered_events)?;
427 inner
428 .mutation_tracker
429 .handle_indexed_operations(affected_operations.as_slice());
430
431 Ok(indexed_operations)
432 })
433 .await;
434
435 indexed_operations.map_err(|err| anyhow!("Couldn't launch blocking query: {}", err))?
436 }
437}
438
439pub struct StoreHandle<CS, PS>
441where
442 CS: exocore_chain::chain::ChainStore,
443 PS: exocore_chain::pending::PendingStore,
444{
445 config: StoreConfig,
446 handle: Handle,
447 inner: Weak<RwLock<Inner<CS, PS>>>,
448}
449
450impl<CS, PS> StoreHandle<CS, PS>
451where
452 CS: exocore_chain::chain::ChainStore,
453 PS: exocore_chain::pending::PendingStore,
454{
455 pub async fn on_start(&self) {
456 self.handle.on_set_started().await
457 }
458
459 #[cfg(test)]
460 pub(crate) fn watched_queries(&self) -> Vec<WatchToken> {
461 let inner = self.inner.upgrade().unwrap();
462 let inner = inner.read().unwrap();
463
464 let mut tokens = Vec::new();
465 for query in inner.watched_queries.queries() {
466 tokens.push(query.token);
467 }
468
469 tokens
470 }
471
472 #[cfg(test)]
473 pub(crate) fn clear_watched_queries(&self) {
474 let inner = self.inner.upgrade().unwrap();
475 let inner = inner.read().unwrap();
476
477 inner.watched_queries.clear();
478 }
479}
480
481#[async_trait]
482impl<CS, PS> crate::store::Store for StoreHandle<CS, PS>
483where
484 CS: exocore_chain::chain::ChainStore,
485 PS: exocore_chain::pending::PendingStore,
486{
487 type WatchedQueryStream = WatchedQueryStream<CS, PS>;
488
489 async fn mutate<M: Into<MutationRequestLike> + Send>(
490 &self,
491 request: M,
492 ) -> Result<MutationResult, Error> {
493 let inner = self.inner.upgrade().ok_or(Error::Dropped)?;
494
495 let request = request.into().0;
496 let return_entities = request.return_entities;
497
498 let mutation_future = {
499 let inner = inner.read().map_err(|_| Error::Dropped)?;
500 inner.handle_mutation_request(request)?
501 };
502
503 let mut mutation_result = mutation_future.await.map_err(|_err| Error::Cancelled)??;
504
505 if return_entities {
506 let query_result = self
507 .query(EntityQuery {
508 predicate: Some(entity_query::Predicate::Operations(OperationsPredicate {
509 operation_ids: mutation_result.operation_ids.clone(),
510 })),
511 ..Default::default()
512 })
513 .await?;
514
515 mutation_result.entities = query_result
516 .entities
517 .into_iter()
518 .flat_map(|e| e.entity)
519 .collect();
520 }
521
522 Ok(mutation_result)
523 }
524
525 async fn query(&self, query: EntityQuery) -> Result<EntityResults, Error> {
526 let inner = self.inner.upgrade().ok_or(Error::Dropped)?;
527
528 let receiver = {
529 let mut inner = inner.write().map_err(|_| Error::Dropped)?;
530
531 let (sender, receiver) = oneshot::channel();
534 let _ = inner.incoming_queries_sender.try_send(QueryRequest {
535 query: Box::new(query),
536 sender: QueryRequestSender::Query(sender),
537 });
538
539 receiver
540 };
541
542 receiver.await.map_err(|_err| Error::Cancelled)?
543 }
544
545 fn watched_query(&self, mut query: EntityQuery) -> Result<Self::WatchedQueryStream, Error> {
546 let inner = self.inner.upgrade().ok_or(Error::Dropped)?;
547 let mut inner = inner.write().map_err(|_| Error::Dropped)?;
548
549 let mut watch_token = query.watch_token;
550 if watch_token == 0 {
551 watch_token = inner.clock.consistent_time(inner.cell.local_node()).into();
552 query.watch_token = watch_token;
553 }
554
555 let (sender, receiver) = mpsc::channel(self.config.handle_watch_query_channel_size);
556 let sender = Arc::new(Mutex::new(sender));
557
558 inner
559 .watched_queries
560 .track_query(watch_token, &query, sender.clone());
561
562 let _ = inner.incoming_queries_sender.try_send(QueryRequest {
565 query: Box::new(query),
566 sender: QueryRequestSender::WatchedQuery(sender, watch_token),
567 });
568
569 Ok(WatchedQueryStream {
570 watch_token,
571 inner: self.inner.clone(),
572 receiver,
573 })
574 }
575}
576
577impl<CS, PS> Clone for StoreHandle<CS, PS>
578where
579 CS: exocore_chain::chain::ChainStore,
580 PS: exocore_chain::pending::PendingStore,
581{
582 fn clone(&self) -> StoreHandle<CS, PS> {
583 StoreHandle {
584 config: self.config,
585 handle: self.handle.clone(),
586 inner: self.inner.clone(),
587 }
588 }
589}
590
591pub struct WatchedQueryStream<CS, PS>
595where
596 CS: exocore_chain::chain::ChainStore,
597 PS: exocore_chain::pending::PendingStore,
598{
599 watch_token: WatchToken,
600 inner: Weak<RwLock<Inner<CS, PS>>>,
601 receiver: mpsc::Receiver<Result<EntityResults, Error>>,
602}
603
604impl<CS, PS> futures::Stream for WatchedQueryStream<CS, PS>
605where
606 CS: exocore_chain::chain::ChainStore,
607 PS: exocore_chain::pending::PendingStore,
608{
609 type Item = Result<EntityResults, Error>;
610
611 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
612 self.receiver.poll_next_unpin(cx)
613 }
614}
615
616impl<CS, PS> Drop for WatchedQueryStream<CS, PS>
617where
618 CS: exocore_chain::chain::ChainStore,
619 PS: exocore_chain::pending::PendingStore,
620{
621 fn drop(&mut self) {
622 if let Some(inner) = self.inner.upgrade() {
623 if let Ok(inner) = inner.read() {
624 inner.watched_queries.unwatch_query(self.watch_token);
625 }
626 }
627 }
628}
629
630pub(crate) struct QueryRequest {
633 pub query: Box<EntityQuery>,
634 pub sender: QueryRequestSender,
635}
636
637pub(crate) enum QueryRequestSender {
638 Query(oneshot::Sender<Result<EntityResults, Error>>),
639 WatchedQuery(
640 Arc<Mutex<mpsc::Sender<Result<EntityResults, Error>>>>,
641 WatchToken,
642 ),
643}
644
645impl QueryRequest {
646 fn reply(mut self, result: Result<EntityResults, Error>) {
647 match self.sender {
648 QueryRequestSender::Query(sender) => {
649 let _ = sender.send(result);
650 }
651 QueryRequestSender::WatchedQuery(ref mut sender, _token) => {
652 if let Ok(mut sender) = sender.lock() {
653 let _ = sender.try_send(result);
654 }
655 }
656 }
657 }
658}
659
660#[cfg(test)]
661pub mod tests {
662 use std::time::Duration;
663
664 use exocore_core::{
665 futures::sleep,
666 tests_utils::{async_expect_eventually_fallible, async_test_retry},
667 };
668 use exocore_protos::{prost::ProstAnyPackMessageExt, store::Trait, test::TestMessage};
669 use futures::executor::block_on_stream;
670
671 use super::{super::TestStore, *};
672 use crate::{
673 local::{entity_index::gc::GarbageCollectorConfig, EntityIndexConfig},
674 mutation::MutationBuilder,
675 query::QueryBuilder,
676 store::Store,
677 };
678
679 #[tokio::test(flavor = "multi_thread")]
680 async fn store_mutate_query_via_handle() -> anyhow::Result<()> {
681 let mut test_store = TestStore::new().await?;
682 test_store.start_store().await?;
683
684 let mutation = test_store.create_put_contact_mutation("entry1", "trt1", "Hello World");
685 let result = test_store.mutate(mutation).await?;
686 assert_eq!(result.entities.len(), 0);
687 test_store
688 .cluster
689 .wait_operation_committed(0, result.operation_ids[0]);
690
691 let query = QueryBuilder::matches("hello").build();
692 let results = test_store.query(query).await?;
693 assert_eq!(results.entities.len(), 1);
694
695 Ok(())
696 }
697
698 #[tokio::test(flavor = "multi_thread")]
699 async fn store_mutate_return_entities() -> anyhow::Result<()> {
700 let mut test_store = TestStore::new().await?;
701 test_store.start_store().await?;
702
703 let mutation = test_store
704 .create_put_contact_mutation("entry1", "trt1", "Hello World")
705 .return_entities();
706 let result = test_store.mutate(mutation).await?;
707 assert_eq!(result.entities.len(), 1);
708
709 let entity = &result.entities[0];
710 assert_eq!(entity.id, "entry1");
711
712 Ok(())
713 }
714
715 #[tokio::test(flavor = "multi_thread")]
716 async fn store_mutate_generate_ids() -> anyhow::Result<()> {
717 let mut test_store = TestStore::new().await?;
718 test_store.start_store().await?;
719
720 let test_msg = TestMessage::default().pack_to_any().unwrap();
721
722 {
723 let mutation = test_store
725 .create_put_contact_mutation("", "trt1", "Hello World")
726 .return_entities();
727 let result = test_store.mutate(mutation).await?;
728 assert_eq!(result.entities.len(), 1);
729 assert_ne!("", result.entities[0].id);
730 assert_ne!("", result.entities[0].traits[0].id);
731 }
732
733 {
734 let mutation = MutationBuilder::new()
736 .put_trait(
737 "".to_string(),
738 Trait {
739 id: "".to_string(),
740 message: Some(test_msg.clone()),
741 ..Default::default()
742 },
743 )
744 .put_trait(
745 "".to_string(),
746 Trait {
747 id: "".to_string(),
748 message: Some(test_msg),
749 ..Default::default()
750 },
751 )
752 .use_common_entity_id()
753 .return_entities();
754 let result = test_store.mutate(mutation).await?;
755
756 assert_eq!(result.entities.len(), 1);
758 assert_ne!("", result.entities[0].id);
759 assert_eq!(result.entities[0].traits.len(), 2);
760 assert_ne!("", result.entities[0].traits[0].id);
761 assert_ne!("", result.entities[0].traits[1].id);
762 }
763
764 Ok(())
765 }
766
767 #[tokio::test(flavor = "multi_thread")]
768 async fn query_error_propagating() -> anyhow::Result<()> {
769 let mut test_store = TestStore::new().await?;
770 test_store.start_store().await?;
771
772 let query = QueryBuilder::test(false).build();
773 assert!(test_store.query(query).await.is_err());
774
775 Ok(())
776 }
777
778 #[tokio::test(flavor = "multi_thread")]
779 async fn mutation_error_propagating() -> anyhow::Result<()> {
780 let mut test_store = TestStore::new().await?;
781 test_store.start_store().await?;
782
783 let mutation = MutationBuilder::new().fail_mutation("entity_id".to_string());
784 assert!(test_store.mutate(mutation).await.is_err());
785
786 Ok(())
787 }
788
789 #[tokio::test(flavor = "multi_thread")]
790 async fn watched_query() -> anyhow::Result<()> {
791 let mut test_store = TestStore::new().await?;
792 test_store.start_store().await?;
793
794 let query = QueryBuilder::matches("hello").build();
795 let mut stream = block_on_stream(test_store.store_handle.watched_query(query)?);
796
797 let result = stream.next().unwrap();
798 assert_eq!(result.unwrap().entities.len(), 0);
799
800 let mutation = test_store.create_put_contact_mutation("entry1", "trt1", "Hello World");
801 let response = test_store.mutate(mutation).await?;
802 test_store
803 .cluster
804 .wait_operation_committed(0, response.operation_ids[0]);
805
806 let result = stream.next().unwrap();
807 assert_eq!(result.unwrap().entities.len(), 1);
808
809 let mutation =
810 test_store.create_put_contact_mutation("entry2", "contact2", "Something else");
811 let response = test_store.mutate(mutation).await?;
812 test_store
813 .cluster
814 .wait_operation_committed(0, response.operation_ids[0]);
815
816 let mut stream = stream.into_inner();
817 let delay = sleep(Duration::from_secs(1));
818
819 futures::select! {
820 _ = stream.next().fuse() => {
821 panic!("Got result, should have timed out");
822 },
823 _ = delay.fuse() => {
824 }
826 }
827
828 Ok(())
829 }
830
831 #[tokio::test(flavor = "multi_thread")]
832 async fn watched_query_failure() -> anyhow::Result<()> {
833 let mut test_store = TestStore::new().await?;
834 test_store.start_store().await?;
835
836 let query = QueryBuilder::test(false).build();
837 let mut stream = block_on_stream(test_store.store_handle.watched_query(query)?);
838
839 let result = stream.next().unwrap();
840 assert!(result.is_err());
841
842 Ok(())
843 }
844
845 #[tokio::test(flavor = "multi_thread")]
846 async fn garbage_collection() -> anyhow::Result<()> {
847 async_test_retry(|| async {
848 let store_config = StoreConfig {
849 garbage_collect_interval: Duration::from_millis(300),
850 chain_index_deferred_interval: None, ..Default::default()
852 };
853 let index_config = EntityIndexConfig {
854 chain_index_min_depth: 0, chain_index_depth_leeway: 0, chain_index_in_memory: true,
857 garbage_collector: GarbageCollectorConfig {
858 deleted_entity_collection: Duration::from_millis(100),
859 min_operation_age: Duration::from_nanos(1),
860 ..Default::default()
861 },
862 ..TestStore::test_index_config()
863 };
864
865 let mut test_store = TestStore::new_with_config(store_config, index_config).await?;
866 test_store.start_store().await?;
867
868 {
869 let mutation =
871 test_store.create_put_contact_mutation("entity1", "trt1", "Hello entity1");
872 test_store.mutate(mutation).await?;
873
874 let delete = MutationBuilder::new()
875 .delete_entity("entity1")
876 .return_entities()
877 .build();
878 let resp = test_store.mutate(delete).await?;
879 test_store
880 .cluster
881 .wait_operation_committed(0, resp.operation_ids[0]);
882 }
883
884 let store_handle = test_store.store_handle.clone();
886 let ent2_mut = test_store
887 .create_put_contact_mutation("entity2", "trt1", "Hello")
888 .build();
889 async_expect_eventually_fallible(|| async {
890 let query = QueryBuilder::with_id("entity1").include_deleted().build();
891 let res = store_handle.query(query).await?;
892 let is_deleted = res.entities.is_empty();
893
894 if !is_deleted {
895 store_handle.mutate(ent2_mut.clone()).await.unwrap();
898 Err(anyhow!("entities still not deleted"))
899 } else {
900 Ok(())
901 }
902 })
903 .await?;
904
905 Ok(())
906 })
907 .await
908 }
909}