1use ave_common::SchemaType;
2use borsh::{BorshDeserialize, BorshSerialize};
3use rand::rng;
4use rand::seq::IteratorRandom;
5use serde::{Deserialize, Serialize};
6use std::collections::{BTreeMap, BTreeSet, HashSet};
7use std::fmt::Debug;
8use std::slice;
9use tracing::error;
10
11use ave_actors::{
12 Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler,
13 PersistentActor, Store, StoreCommand, StoreResponse,
14};
15
16use ave_common::identity::{DigestIdentifier, PublicKey};
17
18use crate::governance::model::Quorum;
19use crate::governance::role_register::{
20 CurrentValidationRoles, RoleDataRegister, RoleRegister,
21 RoleRegisterMessage, RoleRegisterResponse, SearchRole,
22};
23use crate::governance::subject_register::{
24 SubjectRegister, SubjectRegisterMessage,
25};
26use crate::governance::witnesses_register::{
27 WitnessesRegister, WitnessesRegisterMessage, WitnessesRegisterResponse,
28};
29use crate::request::manager::{
30 RebootType, RequestManager, RequestManagerMessage,
31};
32use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
33use std::ops::Bound::{Included, Unbounded};
34
35pub mod contract;
36pub mod distribution_plan;
37pub mod node;
38pub mod subject;
39pub mod viewpoints;
40
41pub fn check_quorum_signers(
42 signers: &HashSet<PublicKey>,
43 quorum: &Quorum,
44 workers: &HashSet<PublicKey>,
45) -> bool {
46 signers.is_subset(workers)
47 && quorum.check_quorum(workers.len() as u32, signers.len() as u32)
48}
49
50pub async fn get_actual_roles_register<A>(
51 ctx: &mut ActorContext<A>,
52 governance_id: &DigestIdentifier,
53 evaluation: SearchRole,
54 approval: bool,
55 version: u64,
56) -> Result<(RoleDataRegister, Option<RoleDataRegister>), ActorError>
57where
58 A: Actor + Handler<A>,
59{
60 let path = ActorPath::from(format!(
61 "/user/node/subject_manager/{}/role_register",
62 governance_id
63 ));
64 let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
65
66 let response = actor
67 .ask(RoleRegisterMessage::SearchActualRoles {
68 version,
69 evaluation,
70 approval,
71 })
72 .await?;
73
74 match response {
75 RoleRegisterResponse::ActualRoles {
76 evaluation,
77 approval,
78 } => Ok((evaluation, approval)),
79 _ => Err(ActorError::UnexpectedResponse {
80 path,
81 expected: "RolesRegisterResponse::ActualRoles".to_string(),
82 }),
83 }
84}
85
86pub async fn get_validation_roles_register<A>(
87 ctx: &mut ActorContext<A>,
88 governance_id: &DigestIdentifier,
89 search: SearchRole,
90 version: u64,
91) -> Result<RoleDataRegister, ActorError>
92where
93 A: Actor + Handler<A>,
94{
95 let path = ActorPath::from(format!(
96 "/user/node/subject_manager/{}/role_register",
97 governance_id
98 ));
99 let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
100
101 let response = actor
102 .ask(RoleRegisterMessage::SearchValidators { search, version })
103 .await?;
104
105 match response {
106 RoleRegisterResponse::Validation(validation) => Ok(validation),
107 _ => Err(ActorError::UnexpectedResponse {
108 path,
109 expected: "RolesRegisterResponse::Validation".to_string(),
110 }),
111 }
112}
113
114pub async fn get_current_validation_roles_register<A>(
115 ctx: &mut ActorContext<A>,
116 governance_id: &DigestIdentifier,
117 schema_id: SchemaType,
118) -> Result<CurrentValidationRoles, ActorError>
119where
120 A: Actor + Handler<A>,
121{
122 let path = ActorPath::from(format!(
123 "/user/node/subject_manager/{}/role_register",
124 governance_id
125 ));
126 let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
127
128 let response = actor
129 .ask(RoleRegisterMessage::GetCurrentValidationRoles { schema_id })
130 .await?;
131
132 match response {
133 RoleRegisterResponse::CurrentValidationRoles(roles) => Ok(roles),
134 _ => Err(ActorError::UnexpectedResponse {
135 path,
136 expected: "RolesRegisterResponse::CurrentValidationRoles"
137 .to_string(),
138 }),
139 }
140}
141
142pub async fn check_subject_creation<A>(
143 ctx: &mut ActorContext<A>,
144 governance_id: &DigestIdentifier,
145 creator: PublicKey,
146 gov_version: u64,
147 namespace: String,
148 schema_id: SchemaType,
149) -> Result<(), ActorError>
150where
151 A: Actor + Handler<A>,
152{
153 let actor_path = ActorPath::from(format!(
154 "/user/node/subject_manager/{}/subject_register",
155 governance_id
156 ));
157
158 let actor: ActorRef<SubjectRegister> =
159 ctx.system().get_actor(&actor_path).await.map_err(|_| {
160 ActorError::Functional {
161 description: "Governance has not been found".to_string(),
162 }
163 })?;
164
165 let _response = actor
166 .ask(SubjectRegisterMessage::Check {
167 creator,
168 gov_version,
169 namespace,
170 schema_id,
171 })
172 .await?;
173
174 Ok(())
175}
176
177pub async fn check_witness_access<A>(
178 ctx: &mut ActorContext<A>,
179 governance_id: &DigestIdentifier,
180 subject_id: &DigestIdentifier,
181 node: PublicKey,
182 namespace: String,
183 schema_id: SchemaType,
184) -> Result<Option<u64>, ActorError>
185where
186 A: Actor + Handler<A>,
187{
188 let actor_path = ActorPath::from(format!(
189 "/user/node/subject_manager/{}/witnesses_register",
190 governance_id
191 ));
192
193 let actor: ActorRef<WitnessesRegister> =
194 ctx.system().get_actor(&actor_path).await?;
195
196 let response = actor
197 .ask(WitnessesRegisterMessage::Access {
198 subject_id: subject_id.to_owned(),
199 node,
200 namespace,
201 schema_id,
202 })
203 .await?;
204
205 match response {
206 WitnessesRegisterResponse::Access { sn } => Ok(sn),
207 _ => Err(ActorError::UnexpectedResponse {
208 path: actor_path,
209 expected: "WitnessesRegisterResponse::Access { sn }".to_string(),
210 }),
211 }
212}
213
214#[derive(
215 Clone,
216 Copy,
217 Debug,
218 PartialEq,
219 Eq,
220 Serialize,
221 Deserialize,
222 BorshDeserialize,
223 BorshSerialize,
224)]
225pub struct Interval {
226 pub lo: u64,
227 pub hi: u64, }
229
230impl Interval {
231 pub const fn new(a: u64, b: u64) -> Self {
232 if a <= b {
233 Self { lo: a, hi: b }
234 } else {
235 Self { lo: b, hi: a }
236 }
237 }
238
239 pub const fn contains(&self, value: u64) -> bool {
240 value >= self.lo && value <= self.hi
241 }
242}
243
244#[derive(
245 Default,
246 Debug,
247 Clone,
248 Serialize,
249 Deserialize,
250 BorshDeserialize,
251 BorshSerialize,
252)]
253pub struct IntervalSet {
254 intervals: Vec<Interval>,
256}
257
258impl IntervalSet {
259 pub const fn new() -> Self {
260 Self {
261 intervals: Vec::new(),
262 }
263 }
264
265 pub fn contains(&self, x: u64) -> bool {
267 if self.intervals.is_empty() {
268 return false;
269 }
270
271 match self.intervals.binary_search_by(|iv| iv.lo.cmp(&x)) {
272 Ok(_) => true, Err(pos) => {
274 if pos == 0 {
275 return false; }
277 let iv = self.intervals[pos - 1];
278 iv.hi >= x
279 }
280 }
281 }
282
283 pub fn insert(&mut self, mut iv: Interval) {
285 let mut i = match self.intervals.binary_search_by(|x| x.lo.cmp(&iv.lo))
287 {
288 Ok(pos) | Err(pos) => pos,
289 };
290
291 if i > 0 && self.intervals[i - 1].hi >= iv.lo {
293 i -= 1;
294 }
295
296 while i < self.intervals.len() && self.intervals[i].lo <= iv.hi {
298 let cur = self.intervals[i];
299 iv.lo = iv.lo.min(cur.lo);
300 iv.hi = iv.hi.max(cur.hi);
301 self.intervals.remove(i); }
303
304 self.intervals.insert(i, iv);
305 }
306
307 pub fn max_covered_in(&self, ql: u64, qh: u64) -> Option<u64> {
309 let (ql, qh) = if ql <= qh { (ql, qh) } else { (qh, ql) };
310 if self.intervals.is_empty() {
311 return None;
312 }
313
314 let idx = match self.intervals.binary_search_by(|iv| iv.lo.cmp(&qh)) {
316 Ok(pos) => pos, Err(pos) => {
318 if pos == 0 {
319 return None;
320 }
321 pos - 1
322 }
323 };
324
325 let iv = self.intervals[idx];
326 if iv.hi >= ql {
328 Some(iv.hi.min(qh))
329 } else {
330 None
331 }
332 }
333
334 pub fn as_slice(&self) -> &[Interval] {
335 &self.intervals
336 }
337
338 pub fn iter(&self) -> slice::Iter<'_, Interval> {
339 self.intervals.iter()
340 }
341
342 pub fn iter_mut(&mut self) -> slice::IterMut<'_, Interval> {
343 self.intervals.iter_mut()
344 }
345}
346
347#[derive(
348 Debug,
349 Clone,
350 Copy,
351 PartialEq,
352 Eq,
353 Serialize,
354 Deserialize,
355 BorshDeserialize,
356 BorshSerialize,
357)]
358pub enum TrackerVisibilityMode {
359 Full,
360 Opaque,
361}
362
363#[derive(
364 Debug,
365 Clone,
366 PartialEq,
367 Eq,
368 Serialize,
369 Deserialize,
370 BorshDeserialize,
371 BorshSerialize,
372)]
373pub enum TrackerStoredVisibility {
374 Full,
375 Only(BTreeSet<String>),
376 None,
377}
378
379#[derive(
380 Debug,
381 Clone,
382 PartialEq,
383 Eq,
384 Serialize,
385 Deserialize,
386 BorshDeserialize,
387 BorshSerialize,
388)]
389pub struct TrackerStoredVisibilityRange {
390 pub from_sn: u64,
391 pub to_sn: Option<u64>,
392 pub visibility: TrackerStoredVisibility,
393}
394
395#[derive(Debug, Clone, Copy)]
396pub struct TrackerStoredVisibilitySpan<'a> {
397 pub interval: Interval,
398 pub visibility: &'a TrackerStoredVisibility,
399}
400
401#[derive(
402 Debug,
403 Clone,
404 PartialEq,
405 Eq,
406 Serialize,
407 Deserialize,
408 BorshDeserialize,
409 BorshSerialize,
410)]
411pub enum TrackerEventVisibility {
412 NonFact,
413 Fact(BTreeSet<String>),
414}
415
416#[derive(
417 Debug,
418 Clone,
419 PartialEq,
420 Eq,
421 Serialize,
422 Deserialize,
423 BorshDeserialize,
424 BorshSerialize,
425)]
426pub struct TrackerEventVisibilityRange {
427 pub from_sn: u64,
428 pub to_sn: Option<u64>,
429 pub visibility: TrackerEventVisibility,
430}
431
432#[derive(Debug, Clone, Copy)]
433pub struct TrackerEventVisibilitySpan<'a> {
434 pub interval: Interval,
435 pub visibility: &'a TrackerEventVisibility,
436}
437
438#[derive(
439 Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
440)]
441pub struct TrackerVisibilityState {
442 pub mode: TrackerVisibilityMode,
443 pub stored_ranges: Vec<TrackerStoredVisibilityRange>,
448 pub event_ranges: Vec<TrackerEventVisibilityRange>,
453}
454
/// Iterator over the stored-visibility ranges, clipped to a window of
/// sequence numbers. Created by `TrackerVisibilityState::iter_stored`.
pub struct TrackerStoredVisibilityIter<'a> {
    /// All stored ranges of the state being iterated.
    ranges: &'a [TrackerStoredVisibilityRange],
    /// Position in `ranges` of the next range to examine.
    index: usize,
    /// Lower end of the requested window.
    from_sn: u64,
    /// Upper end of the requested window.
    to_sn: u64,
}
461
/// Iterator over the event-visibility ranges, clipped to a window of
/// sequence numbers. Created by `TrackerVisibilityState::iter_events`.
pub struct TrackerEventVisibilityIter<'a> {
    /// All event ranges of the state being iterated.
    ranges: &'a [TrackerEventVisibilityRange],
    /// Position in `ranges` of the next range to examine.
    index: usize,
    /// Lower end of the requested window.
    from_sn: u64,
    /// Upper end of the requested window.
    to_sn: u64,
}
468
469impl Default for TrackerVisibilityState {
470 fn default() -> Self {
471 Self {
472 mode: TrackerVisibilityMode::Full,
473 stored_ranges: Vec::new(),
474 event_ranges: Vec::new(),
475 }
476 }
477}
478
479impl TrackerVisibilityState {
480 pub const fn is_full(&self) -> bool {
481 matches!(self.mode, TrackerVisibilityMode::Full)
482 }
483
484 pub const fn set_mode(&mut self, mode: TrackerVisibilityMode) {
485 self.mode = mode;
486 }
487
488 pub fn record_event(
489 &mut self,
490 sn: u64,
491 stored_visibility: TrackerStoredVisibility,
492 event_visibility: TrackerEventVisibility,
493 ) {
494 self.push_stored(sn, stored_visibility);
495 self.push_event(sn, event_visibility);
496 }
497
498 fn push_stored(&mut self, sn: u64, visibility: TrackerStoredVisibility) {
499 if let Some(last) = self.stored_ranges.last_mut() {
500 if last.visibility == visibility {
501 return;
502 }
503
504 if last.to_sn.is_none() {
505 last.to_sn = Some(sn.saturating_sub(1));
506 }
507 }
508
509 self.stored_ranges.push(TrackerStoredVisibilityRange {
510 from_sn: sn,
511 to_sn: None,
512 visibility,
513 });
514 }
515
516 fn push_event(&mut self, sn: u64, visibility: TrackerEventVisibility) {
517 if let Some(last) = self.event_ranges.last_mut() {
518 if last.visibility == visibility {
519 return;
520 }
521
522 if last.to_sn.is_none() {
523 last.to_sn = Some(sn.saturating_sub(1));
524 }
525 }
526
527 self.event_ranges.push(TrackerEventVisibilityRange {
528 from_sn: sn,
529 to_sn: None,
530 visibility,
531 });
532 }
533
534 fn first_overlapping_stored_index(&self, from_sn: u64) -> Option<usize> {
535 if self.stored_ranges.is_empty() {
536 return None;
537 }
538
539 match self
540 .stored_ranges
541 .binary_search_by(|range| range.from_sn.cmp(&from_sn))
542 {
543 Ok(index) => Some(index),
544 Err(pos) => {
545 if pos == 0 {
546 return Some(0);
547 }
548
549 let index = pos - 1;
550 if self.stored_ranges[index].contains(from_sn) {
551 Some(index)
552 } else if pos < self.stored_ranges.len() {
553 Some(pos)
554 } else {
555 None
556 }
557 }
558 }
559 }
560
561 fn first_overlapping_event_index(&self, from_sn: u64) -> Option<usize> {
562 if self.event_ranges.is_empty() {
563 return None;
564 }
565
566 match self
567 .event_ranges
568 .binary_search_by(|range| range.from_sn.cmp(&from_sn))
569 {
570 Ok(index) => Some(index),
571 Err(pos) => {
572 if pos == 0 {
573 return Some(0);
574 }
575
576 let index = pos - 1;
577 if self.event_ranges[index].contains(from_sn) {
578 Some(index)
579 } else if pos < self.event_ranges.len() {
580 Some(pos)
581 } else {
582 None
583 }
584 }
585 }
586 }
587
588 pub fn iter_stored(
589 &self,
590 from_sn: u64,
591 to_sn: u64,
592 ) -> TrackerStoredVisibilityIter<'_> {
593 let index = self
594 .first_overlapping_stored_index(from_sn)
595 .unwrap_or(self.stored_ranges.len());
596
597 TrackerStoredVisibilityIter {
598 ranges: &self.stored_ranges,
599 index,
600 from_sn,
601 to_sn,
602 }
603 }
604
605 pub fn iter_events(
606 &self,
607 from_sn: u64,
608 to_sn: u64,
609 ) -> TrackerEventVisibilityIter<'_> {
610 let index = self
611 .first_overlapping_event_index(from_sn)
612 .unwrap_or(self.event_ranges.len());
613
614 TrackerEventVisibilityIter {
615 ranges: &self.event_ranges,
616 index,
617 from_sn,
618 to_sn,
619 }
620 }
621}
622
623impl<'a> Iterator for TrackerStoredVisibilityIter<'a> {
624 type Item = TrackerStoredVisibilitySpan<'a>;
625
626 fn next(&mut self) -> Option<Self::Item> {
627 let range = self.ranges.get(self.index)?;
628 if range.from_sn > self.to_sn {
629 return None;
630 }
631
632 self.index += 1;
633
634 let lo = range.from_sn.max(self.from_sn);
635 let hi = range.to_sn.unwrap_or(self.to_sn).min(self.to_sn);
636 if hi < lo {
637 return self.next();
638 }
639
640 Some(TrackerStoredVisibilitySpan {
641 interval: Interval::new(lo, hi),
642 visibility: &range.visibility,
643 })
644 }
645}
646
647impl<'a> Iterator for TrackerEventVisibilityIter<'a> {
648 type Item = TrackerEventVisibilitySpan<'a>;
649
650 fn next(&mut self) -> Option<Self::Item> {
651 let range = self.ranges.get(self.index)?;
652 if range.from_sn > self.to_sn {
653 return None;
654 }
655
656 self.index += 1;
657
658 let lo = range.from_sn.max(self.from_sn);
659 let hi = range.to_sn.unwrap_or(self.to_sn).min(self.to_sn);
660 if hi < lo {
661 return self.next();
662 }
663
664 Some(TrackerEventVisibilitySpan {
665 interval: Interval::new(lo, hi),
666 visibility: &range.visibility,
667 })
668 }
669}
670
671impl TrackerStoredVisibilityRange {
672 fn contains(&self, sn: u64) -> bool {
673 self.from_sn <= sn && self.to_sn.is_none_or(|to_sn| sn <= to_sn)
674 }
675}
676
677impl TrackerEventVisibilityRange {
678 fn contains(&self, sn: u64) -> bool {
679 self.from_sn <= sn && self.to_sn.is_none_or(|to_sn| sn <= to_sn)
680 }
681}
682
683impl<'a> IntoIterator for &'a IntervalSet {
684 type Item = &'a Interval;
685 type IntoIter = slice::Iter<'a, Interval>;
686
687 fn into_iter(self) -> Self::IntoIter {
688 self.intervals.iter()
689 }
690}
691
692impl<'a> IntoIterator for &'a mut IntervalSet {
693 type Item = &'a mut Interval;
694 type IntoIter = slice::IterMut<'a, Interval>;
695
696 fn into_iter(self) -> Self::IntoIter {
697 self.intervals.iter_mut()
698 }
699}
700
701impl IntoIterator for IntervalSet {
702 type Item = Interval;
703 type IntoIter = std::vec::IntoIter<Interval>;
704
705 fn into_iter(self) -> Self::IntoIter {
706 self.intervals.into_iter()
707 }
708}
709
710#[derive(
711 Debug,
712 Clone,
713 Serialize,
714 Deserialize,
715 Default,
716 BorshDeserialize,
717 BorshSerialize,
718)]
719pub struct CeilingMap<T> {
720 inner: BTreeMap<u64, T>,
721}
722
723impl<T> CeilingMap<T>
724where
725 T: Debug + Clone + Serialize,
726{
727 pub const fn new() -> Self {
728 Self {
729 inner: BTreeMap::new(),
730 }
731 }
732
733 pub fn last(&self) -> Option<(&u64, &T)> {
734 self.inner.last_key_value()
735 }
736
737 pub fn insert(&mut self, key: u64, value: T) {
738 self.inner.insert(key, value);
739 }
740
741 pub fn iter(&self) -> impl Iterator<Item = (&u64, &T)> {
742 self.inner.iter()
743 }
744
745 pub fn range_with_predecessor(
746 &self,
747 lower: u64,
748 upper: u64,
749 ) -> Vec<(u64, T)> {
750 let mut out: Vec<(u64, T)> = Vec::new();
751
752 if let Some((key, value)) = self.inner.range(..lower).next_back() {
753 out.push((*key, value.clone()));
754 }
755
756 for (key, value) in
757 self.inner.range((Included(&lower), Included(&upper)))
758 {
759 out.push((*key, value.clone()));
760 }
761
762 out
763 }
764
765 pub fn get_prev_or_equal(&self, key: u64) -> Option<T> {
766 self.inner
767 .range((Unbounded, Included(&key)))
768 .next_back()
769 .map(|x| x.1.clone())
770 }
771}
772
773pub async fn send_to_tracking<A>(
774 ctx: &mut ActorContext<A>,
775 message: RequestTrackingMessage,
776) -> Result<(), ActorError>
777where
778 A: Actor + Handler<A>,
779{
780 let tracking_path = ActorPath::from("/user/request/tracking");
781 let tracking_actor = ctx
782 .system()
783 .get_actor::<RequestTracking>(&tracking_path)
784 .await?;
785 tracking_actor.tell(message).await
786}
787
788pub async fn emit_fail<A>(
789 ctx: &mut ActorContext<A>,
790 error: ActorError,
791) -> ActorError
792where
793 A: Actor + Handler<A>,
794{
795 error!("Falling, error: {}, actor: {}", error, ctx.path());
796 if let Err(_e) = ctx.emit_fail(error.clone()).await {
797 ctx.system().crash_system();
798 };
799 error
800}
801
802pub fn take_random_signers(
803 signers: HashSet<PublicKey>,
804 quantity: usize,
805) -> (HashSet<PublicKey>, HashSet<PublicKey>) {
806 if quantity == signers.len() {
807 return (signers, HashSet::new());
808 }
809
810 let mut rng = rng();
811
812 let random_signers: HashSet<PublicKey> = signers
813 .iter()
814 .sample(&mut rng, quantity)
815 .into_iter()
816 .cloned()
817 .collect();
818
819 let signers = signers
820 .difference(&random_signers)
821 .cloned()
822 .collect::<HashSet<PublicKey>>();
823
824 (random_signers, signers)
825}
826
827pub async fn send_reboot_to_req<A>(
828 ctx: &mut ActorContext<A>,
829 request_id: DigestIdentifier,
830 governance_id: DigestIdentifier,
831 reboot_type: RebootType,
832) -> Result<(), ActorError>
833where
834 A: Actor + Handler<A>,
835{
836 let req_actor = ctx.get_parent::<RequestManager>().await?;
837 req_actor
838 .tell(RequestManagerMessage::Reboot {
839 governance_id,
840 reboot_type,
841 request_id,
842 })
843 .await
844}
845
846pub async fn abort_req<A>(
847 ctx: &mut ActorContext<A>,
848 request_id: DigestIdentifier,
849 who: PublicKey,
850 reason: String,
851 sn: u64,
852) -> Result<(), ActorError>
853where
854 A: Actor + Handler<A>,
855{
856 let req_actor = ctx.get_parent::<RequestManager>().await?;
857 req_actor
858 .tell(RequestManagerMessage::Abort {
859 request_id,
860 who,
861 reason,
862 sn,
863 })
864 .await
865}
866
867pub async fn purge_storage<A>(
868 ctx: &mut ActorContext<A>,
869) -> Result<(), ActorError>
870where
871 A: PersistentActor,
872 A::Event: BorshSerialize + BorshDeserialize,
873{
874 let store = ctx.get_child::<Store<A>>("store").await?;
875 let _response = store.ask(StoreCommand::Purge).await?;
876
877 Ok(())
878}
879
880pub async fn get_last_event<A>(
881 ctx: &mut ActorContext<A>,
882) -> Result<Option<A::Event>, ActorError>
883where
884 A: PersistentActor,
885 A::Event: BorshSerialize + BorshDeserialize,
886{
887 let store = ctx.get_child::<Store<A>>("store").await?;
888 let response = store.ask(StoreCommand::LastEvent).await?;
889
890 match response {
891 StoreResponse::LastEvent(event) => Ok(event),
892 _ => Err(ActorError::UnexpectedResponse {
893 path: ActorPath::from(format!("{}/store", ctx.path())),
894 expected: "StoreResponse::LastEvent".to_owned(),
895 }),
896 }
897}
898
899pub async fn get_n_events<A>(
900 ctx: &mut ActorContext<A>,
901 last_sn: u64,
902 quantity: u64,
903) -> Result<Vec<A::Event>, ActorError>
904where
905 A: PersistentActor,
906 A::Event: BorshSerialize + BorshDeserialize,
907{
908 let store = ctx.get_child::<Store<A>>("store").await?;
909 let response = store
910 .ask(StoreCommand::GetEvents {
911 from: last_sn,
912 to: last_sn + quantity,
913 })
914 .await?;
915
916 match response {
917 StoreResponse::Events(events) => Ok(events),
918 _ => Err(ActorError::UnexpectedResponse {
919 path: ActorPath::from(format!("{}/store", ctx.path())),
920 expected: "StoreResponse::Events".to_owned(),
921 }),
922 }
923}