1use ave_common::SchemaType;
2use borsh::{BorshDeserialize, BorshSerialize};
3use rand::rng;
4use rand::seq::IteratorRandom;
5use serde::{Deserialize, Serialize};
6use std::collections::{BTreeMap, HashSet};
7use std::fmt::Debug;
8use std::slice;
9use tracing::error;
10
11use ave_actors::{
12 Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler,
13 PersistentActor, Store, StoreCommand, StoreResponse,
14};
15
16use ave_common::identity::{DigestIdentifier, PublicKey};
17
18use crate::governance::model::Quorum;
19use crate::governance::role_register::{
20 CurrentValidationRoles, RoleDataRegister, RoleRegister,
21 RoleRegisterMessage, RoleRegisterResponse, SearchRole,
22};
23use crate::governance::subject_register::{
24 SubjectRegister, SubjectRegisterMessage,
25};
26use crate::governance::witnesses_register::{
27 WitnessesRegister, WitnessesRegisterMessage, WitnessesRegisterResponse,
28};
29use crate::request::manager::{
30 RebootType, RequestManager, RequestManagerMessage,
31};
32use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
33use std::ops::Bound::{Included, Unbounded};
34
35pub mod contract;
36pub mod node;
37pub mod subject;
38
39pub fn check_quorum_signers(
40 signers: &HashSet<PublicKey>,
41 quorum: &Quorum,
42 workers: &HashSet<PublicKey>,
43) -> bool {
44 signers.is_subset(workers)
45 && quorum.check_quorum(workers.len() as u32, signers.len() as u32)
46}
47
48pub async fn get_actual_roles_register<A>(
49 ctx: &mut ActorContext<A>,
50 governance_id: &DigestIdentifier,
51 evaluation: SearchRole,
52 approval: bool,
53 version: u64,
54) -> Result<(RoleDataRegister, Option<RoleDataRegister>), ActorError>
55where
56 A: Actor + Handler<A>,
57{
58 let path = ActorPath::from(format!(
59 "/user/node/subject_manager/{}/role_register",
60 governance_id
61 ));
62 let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
63
64 let response = actor
65 .ask(RoleRegisterMessage::SearchActualRoles {
66 version,
67 evaluation,
68 approval,
69 })
70 .await?;
71
72 match response {
73 RoleRegisterResponse::ActualRoles {
74 evaluation,
75 approval,
76 } => Ok((evaluation, approval)),
77 _ => Err(ActorError::UnexpectedResponse {
78 path,
79 expected: "RolesRegisterResponse::ActualRoles".to_string(),
80 }),
81 }
82}
83
84pub async fn get_validation_roles_register<A>(
85 ctx: &mut ActorContext<A>,
86 governance_id: &DigestIdentifier,
87 search: SearchRole,
88 version: u64,
89) -> Result<RoleDataRegister, ActorError>
90where
91 A: Actor + Handler<A>,
92{
93 let path = ActorPath::from(format!(
94 "/user/node/subject_manager/{}/role_register",
95 governance_id
96 ));
97 let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
98
99 let response = actor
100 .ask(RoleRegisterMessage::SearchValidators { search, version })
101 .await?;
102
103 match response {
104 RoleRegisterResponse::Validation(validation) => Ok(validation),
105 _ => Err(ActorError::UnexpectedResponse {
106 path,
107 expected: "RolesRegisterResponse::Validation".to_string(),
108 }),
109 }
110}
111
112pub async fn get_current_validation_roles_register<A>(
113 ctx: &mut ActorContext<A>,
114 governance_id: &DigestIdentifier,
115 schema_id: SchemaType,
116) -> Result<CurrentValidationRoles, ActorError>
117where
118 A: Actor + Handler<A>,
119{
120 let path = ActorPath::from(format!(
121 "/user/node/subject_manager/{}/role_register",
122 governance_id
123 ));
124 let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
125
126 let response = actor
127 .ask(RoleRegisterMessage::GetCurrentValidationRoles { schema_id })
128 .await?;
129
130 match response {
131 RoleRegisterResponse::CurrentValidationRoles(roles) => Ok(roles),
132 _ => Err(ActorError::UnexpectedResponse {
133 path,
134 expected: "RolesRegisterResponse::CurrentValidationRoles"
135 .to_string(),
136 }),
137 }
138}
139
140pub async fn check_subject_creation<A>(
141 ctx: &mut ActorContext<A>,
142 governance_id: &DigestIdentifier,
143 creator: PublicKey,
144 gov_version: u64,
145 namespace: String,
146 schema_id: SchemaType,
147) -> Result<(), ActorError>
148where
149 A: Actor + Handler<A>,
150{
151 let actor_path = ActorPath::from(format!(
152 "/user/node/subject_manager/{}/subject_register",
153 governance_id
154 ));
155
156 let actor: ActorRef<SubjectRegister> =
157 ctx.system().get_actor(&actor_path).await.map_err(|_| {
158 ActorError::Functional {
159 description: "Governance has not been found".to_string(),
160 }
161 })?;
162
163 let _response = actor
164 .ask(SubjectRegisterMessage::Check {
165 creator,
166 gov_version,
167 namespace,
168 schema_id,
169 })
170 .await?;
171
172 Ok(())
173}
174
175pub async fn check_witness_access<A>(
176 ctx: &mut ActorContext<A>,
177 governance_id: &DigestIdentifier,
178 subject_id: &DigestIdentifier,
179 node: PublicKey,
180 namespace: String,
181 schema_id: SchemaType,
182) -> Result<Option<u64>, ActorError>
183where
184 A: Actor + Handler<A>,
185{
186 let actor_path = ActorPath::from(format!(
187 "/user/node/subject_manager/{}/witnesses_register",
188 governance_id
189 ));
190
191 let actor: ActorRef<WitnessesRegister> =
192 ctx.system().get_actor(&actor_path).await?;
193
194 let response = actor
195 .ask(WitnessesRegisterMessage::Access {
196 subject_id: subject_id.to_owned(),
197 node,
198 namespace,
199 schema_id,
200 })
201 .await?;
202
203 match response {
204 WitnessesRegisterResponse::Access { sn } => Ok(sn),
205 _ => Err(ActorError::UnexpectedResponse {
206 path: actor_path,
207 expected: "WitnessesRegisterResponse::Access { sn }".to_string(),
208 }),
209 }
210}
211
212#[derive(
213 Clone,
214 Copy,
215 Debug,
216 PartialEq,
217 Eq,
218 Serialize,
219 Deserialize,
220 BorshDeserialize,
221 BorshSerialize,
222)]
223pub struct Interval {
224 pub lo: u64,
225 pub hi: u64, }
227
228impl Interval {
229 pub const fn new(a: u64, b: u64) -> Self {
230 if a <= b {
231 Self { lo: a, hi: b }
232 } else {
233 Self { lo: b, hi: a }
234 }
235 }
236
237 pub const fn contains(&self, value: u64) -> bool {
238 value >= self.lo && value <= self.hi
239 }
240}
241
242#[derive(
243 Default,
244 Debug,
245 Clone,
246 Serialize,
247 Deserialize,
248 BorshDeserialize,
249 BorshSerialize,
250)]
251pub struct IntervalSet {
252 intervals: Vec<Interval>,
254}
255
256impl IntervalSet {
257 pub const fn new() -> Self {
258 Self {
259 intervals: Vec::new(),
260 }
261 }
262
263 pub fn contains(&self, x: u64) -> bool {
265 if self.intervals.is_empty() {
266 return false;
267 }
268
269 match self.intervals.binary_search_by(|iv| iv.lo.cmp(&x)) {
270 Ok(_) => true, Err(pos) => {
272 if pos == 0 {
273 return false; }
275 let iv = self.intervals[pos - 1];
276 iv.hi >= x
277 }
278 }
279 }
280
281 pub fn insert(&mut self, mut iv: Interval) {
283 let mut i = match self.intervals.binary_search_by(|x| x.lo.cmp(&iv.lo))
285 {
286 Ok(pos) | Err(pos) => pos,
287 };
288
289 if i > 0 && self.intervals[i - 1].hi >= iv.lo {
291 i -= 1;
292 }
293
294 while i < self.intervals.len() && self.intervals[i].lo <= iv.hi {
296 let cur = self.intervals[i];
297 iv.lo = iv.lo.min(cur.lo);
298 iv.hi = iv.hi.max(cur.hi);
299 self.intervals.remove(i); }
301
302 self.intervals.insert(i, iv);
303 }
304
305 pub fn max_covered_in(&self, ql: u64, qh: u64) -> Option<u64> {
307 let (ql, qh) = if ql <= qh { (ql, qh) } else { (qh, ql) };
308 if self.intervals.is_empty() {
309 return None;
310 }
311
312 let idx = match self.intervals.binary_search_by(|iv| iv.lo.cmp(&qh)) {
314 Ok(pos) => pos, Err(pos) => {
316 if pos == 0 {
317 return None;
318 }
319 pos - 1
320 }
321 };
322
323 let iv = self.intervals[idx];
324 if iv.hi >= ql {
326 Some(iv.hi.min(qh))
327 } else {
328 None
329 }
330 }
331
332 pub fn as_slice(&self) -> &[Interval] {
333 &self.intervals
334 }
335
336 pub fn iter(&self) -> slice::Iter<'_, Interval> {
337 self.intervals.iter()
338 }
339
340 pub fn iter_mut(&mut self) -> slice::IterMut<'_, Interval> {
341 self.intervals.iter_mut()
342 }
343}
344
345impl<'a> IntoIterator for &'a IntervalSet {
346 type Item = &'a Interval;
347 type IntoIter = slice::Iter<'a, Interval>;
348
349 fn into_iter(self) -> Self::IntoIter {
350 self.intervals.iter()
351 }
352}
353
354impl<'a> IntoIterator for &'a mut IntervalSet {
355 type Item = &'a mut Interval;
356 type IntoIter = slice::IterMut<'a, Interval>;
357
358 fn into_iter(self) -> Self::IntoIter {
359 self.intervals.iter_mut()
360 }
361}
362
363impl IntoIterator for IntervalSet {
364 type Item = Interval;
365 type IntoIter = std::vec::IntoIter<Interval>;
366
367 fn into_iter(self) -> Self::IntoIter {
368 self.intervals.into_iter()
369 }
370}
371
372#[derive(
373 Debug,
374 Clone,
375 Serialize,
376 Deserialize,
377 Default,
378 BorshDeserialize,
379 BorshSerialize,
380)]
381pub struct CeilingMap<T> {
382 inner: BTreeMap<u64, T>,
383}
384
385impl<T> CeilingMap<T>
386where
387 T: Debug + Clone + Serialize,
388{
389 pub const fn new() -> Self {
390 Self {
391 inner: BTreeMap::new(),
392 }
393 }
394
395 pub fn last(&self) -> Option<(&u64, &T)> {
396 self.inner.last_key_value()
397 }
398
399 pub fn insert(&mut self, key: u64, value: T) {
400 self.inner.insert(key, value);
401 }
402
403 pub fn range_with_predecessor(
404 &self,
405 lower: u64,
406 upper: u64,
407 ) -> Vec<(u64, T)> {
408 let mut out: Vec<(u64, T)> = Vec::new();
409
410 if let Some((key, value)) = self.inner.range(..lower).next_back() {
411 out.push((*key, value.clone()));
412 }
413
414 for (key, value) in
415 self.inner.range((Included(&lower), Included(&upper)))
416 {
417 out.push((*key, value.clone()));
418 }
419
420 out
421 }
422
423 pub fn get_prev_or_equal(&self, key: u64) -> Option<T> {
424 self.inner
425 .range((Unbounded, Included(&key)))
426 .next_back()
427 .map(|x| x.1.clone())
428 }
429}
430
431pub async fn send_to_tracking<A>(
432 ctx: &mut ActorContext<A>,
433 message: RequestTrackingMessage,
434) -> Result<(), ActorError>
435where
436 A: Actor + Handler<A>,
437{
438 let tracking_path = ActorPath::from("/user/request/tracking");
439 let tracking_actor = ctx
440 .system()
441 .get_actor::<RequestTracking>(&tracking_path)
442 .await?;
443 tracking_actor.tell(message).await
444}
445
446pub async fn emit_fail<A>(
447 ctx: &mut ActorContext<A>,
448 error: ActorError,
449) -> ActorError
450where
451 A: Actor + Handler<A>,
452{
453 error!("Falling, error: {}, actor: {}", error, ctx.path());
454 if let Err(_e) = ctx.emit_fail(error.clone()).await {
455 ctx.system().crash_system();
456 };
457 error
458}
459
460pub fn take_random_signers(
461 signers: HashSet<PublicKey>,
462 quantity: usize,
463) -> (HashSet<PublicKey>, HashSet<PublicKey>) {
464 if quantity == signers.len() {
465 return (signers, HashSet::new());
466 }
467
468 let mut rng = rng();
469
470 let random_signers: HashSet<PublicKey> = signers
471 .iter()
472 .sample(&mut rng, quantity)
473 .into_iter()
474 .cloned()
475 .collect();
476
477 let signers = signers
478 .difference(&random_signers)
479 .cloned()
480 .collect::<HashSet<PublicKey>>();
481
482 (random_signers, signers)
483}
484
485pub async fn send_reboot_to_req<A>(
486 ctx: &mut ActorContext<A>,
487 request_id: DigestIdentifier,
488 governance_id: DigestIdentifier,
489 reboot_type: RebootType,
490) -> Result<(), ActorError>
491where
492 A: Actor + Handler<A>,
493{
494 let req_actor = ctx.get_parent::<RequestManager>().await?;
495 req_actor
496 .tell(RequestManagerMessage::Reboot {
497 governance_id,
498 reboot_type,
499 request_id,
500 })
501 .await
502}
503
504pub async fn abort_req<A>(
505 ctx: &mut ActorContext<A>,
506 request_id: DigestIdentifier,
507 who: PublicKey,
508 reason: String,
509 sn: u64,
510) -> Result<(), ActorError>
511where
512 A: Actor + Handler<A>,
513{
514 let req_actor = ctx.get_parent::<RequestManager>().await?;
515 req_actor
516 .tell(RequestManagerMessage::Abort {
517 request_id,
518 who,
519 reason,
520 sn,
521 })
522 .await
523}
524
525pub async fn purge_storage<A>(
526 ctx: &mut ActorContext<A>,
527) -> Result<(), ActorError>
528where
529 A: PersistentActor,
530 A::Event: BorshSerialize + BorshDeserialize,
531{
532 let store = ctx.get_child::<Store<A>>("store").await?;
533 let _response = store.ask(StoreCommand::Purge).await?;
534
535 Ok(())
536}
537
538pub async fn get_last_event<A>(
539 ctx: &mut ActorContext<A>,
540) -> Result<Option<A::Event>, ActorError>
541where
542 A: PersistentActor,
543 A::Event: BorshSerialize + BorshDeserialize,
544{
545 let store = ctx.get_child::<Store<A>>("store").await?;
546 let response = store.ask(StoreCommand::LastEvent).await?;
547
548 match response {
549 StoreResponse::LastEvent(event) => Ok(event),
550 _ => Err(ActorError::UnexpectedResponse {
551 path: ActorPath::from(format!("{}/store", ctx.path())),
552 expected: "StoreResponse::LastEvent".to_owned(),
553 }),
554 }
555}
556
557pub async fn get_n_events<A>(
558 ctx: &mut ActorContext<A>,
559 last_sn: u64,
560 quantity: u64,
561) -> Result<Vec<A::Event>, ActorError>
562where
563 A: PersistentActor,
564 A::Event: BorshSerialize + BorshDeserialize,
565{
566 let store = ctx.get_child::<Store<A>>("store").await?;
567 let response = store
568 .ask(StoreCommand::GetEvents {
569 from: last_sn,
570 to: last_sn + quantity,
571 })
572 .await?;
573
574 match response {
575 StoreResponse::Events(events) => Ok(events),
576 _ => Err(ActorError::UnexpectedResponse {
577 path: ActorPath::from(format!("{}/store", ctx.path())),
578 expected: "StoreResponse::Events".to_owned(),
579 }),
580 }
581}