1use ave_common::SchemaType;
2use borsh::{BorshDeserialize, BorshSerialize};
3use rand::rng;
4use rand::seq::IteratorRandom;
5use serde::{Deserialize, Serialize};
6use std::collections::{BTreeMap, HashSet};
7use std::fmt::Debug;
8use std::slice;
9use tracing::error;
10
11use ave_actors::{
12 Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler,
13 PersistentActor, Store, StoreCommand, StoreResponse,
14};
15
16use ave_common::identity::{DigestIdentifier, PublicKey};
17
18use crate::governance::model::Quorum;
19use crate::governance::role_register::{
20 RoleDataRegister, RoleRegister, RoleRegisterMessage, RoleRegisterResponse,
21 SearchRole,
22};
23use crate::governance::subject_register::{
24 SubjectRegister, SubjectRegisterMessage,
25};
26use crate::governance::witnesses_register::{
27 WitnessesRegister, WitnessesRegisterMessage, WitnessesRegisterResponse,
28};
29use crate::request::manager::{
30 RebootType, RequestManager, RequestManagerMessage,
31};
32use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
33use std::ops::Bound::{Included, Unbounded};
34
35pub mod contract;
36pub mod node;
37pub mod subject;
38
39pub fn check_quorum_signers(
40 signers: &HashSet<PublicKey>,
41 quorum: &Quorum,
42 workers: &HashSet<PublicKey>,
43) -> bool {
44 signers.is_subset(workers)
45 && quorum.check_quorum(workers.len() as u32, signers.len() as u32)
46}
47
48pub async fn get_actual_roles_register<A>(
49 ctx: &mut ActorContext<A>,
50 governance_id: &DigestIdentifier,
51 evaluation: SearchRole,
52 approval: bool,
53 version: u64,
54) -> Result<(RoleDataRegister, Option<RoleDataRegister>), ActorError>
55where
56 A: Actor + Handler<A>,
57{
58 let path =
59 ActorPath::from(format!("/user/node/{}/role_register", governance_id));
60 let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
61
62 let response = actor
63 .ask(RoleRegisterMessage::SearchActualRoles {
64 version,
65 evaluation,
66 approval,
67 })
68 .await?;
69
70 match response {
71 RoleRegisterResponse::ActualRoles {
72 evaluation,
73 approval,
74 } => Ok((evaluation, approval)),
75 _ => Err(ActorError::UnexpectedResponse {
76 path,
77 expected: "RolesRegisterResponse::ActualRoles".to_string(),
78 }),
79 }
80}
81
82pub async fn get_validation_roles_register<A>(
83 ctx: &mut ActorContext<A>,
84 governance_id: &DigestIdentifier,
85 search: SearchRole,
86 version: u64,
87) -> Result<RoleDataRegister, ActorError>
88where
89 A: Actor + Handler<A>,
90{
91 let path =
92 ActorPath::from(format!("/user/node/{}/role_register", governance_id));
93 let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
94
95 let response = actor
96 .ask(RoleRegisterMessage::SearchValidators { search, version })
97 .await?;
98
99 match response {
100 RoleRegisterResponse::Validation(validation) => Ok(validation),
101 _ => Err(ActorError::UnexpectedResponse {
102 path,
103 expected: "RolesRegisterResponse::Validation".to_string(),
104 }),
105 }
106}
107
108pub async fn check_subject_creation<A>(
109 ctx: &mut ActorContext<A>,
110 governance_id: &DigestIdentifier,
111 creator: PublicKey,
112 gov_version: u64,
113 namespace: String,
114 schema_id: SchemaType,
115) -> Result<(), ActorError>
116where
117 A: Actor + Handler<A>,
118{
119 let actor_path = ActorPath::from(format!(
120 "/user/node/{}/subject_register",
121 governance_id
122 ));
123
124 let actor: ActorRef<SubjectRegister> =
125 ctx.system().get_actor(&actor_path).await.map_err(|_| {
126 ActorError::Functional {
127 description: "Governance has not been found".to_string(),
128 }
129 })?;
130
131 let _response = actor
132 .ask(SubjectRegisterMessage::Check {
133 creator,
134 gov_version,
135 namespace,
136 schema_id,
137 })
138 .await?;
139
140 Ok(())
141}
142
143pub async fn check_witness_access<A>(
144 ctx: &mut ActorContext<A>,
145 governance_id: &DigestIdentifier,
146 subject_id: &DigestIdentifier,
147 node: PublicKey,
148 namespace: String,
149 schema_id: SchemaType,
150) -> Result<Option<u64>, ActorError>
151where
152 A: Actor + Handler<A>,
153{
154 let actor_path = ActorPath::from(format!(
155 "/user/node/{}/witnesses_register",
156 governance_id
157 ));
158
159 let actor: ActorRef<WitnessesRegister> =
160 ctx.system().get_actor(&actor_path).await?;
161
162 let response = actor
163 .ask(WitnessesRegisterMessage::Access {
164 subject_id: subject_id.to_owned(),
165 node,
166 namespace,
167 schema_id,
168 })
169 .await?;
170
171 match response {
172 WitnessesRegisterResponse::Access { sn } => Ok(sn),
173 _ => Err(ActorError::UnexpectedResponse {
174 path: actor_path,
175 expected: "WitnessesRegisterResponse::Access { sn }".to_string(),
176 }),
177 }
178}
179
180#[derive(
181 Clone,
182 Copy,
183 Debug,
184 PartialEq,
185 Eq,
186 Serialize,
187 Deserialize,
188 BorshDeserialize,
189 BorshSerialize,
190)]
191pub struct Interval {
192 pub lo: u64,
193 pub hi: u64, }
195
196impl Interval {
197 pub const fn new(a: u64, b: u64) -> Self {
198 if a <= b {
199 Self { lo: a, hi: b }
200 } else {
201 Self { lo: b, hi: a }
202 }
203 }
204
205 pub const fn contains(&self, value: u64) -> bool {
206 value >= self.lo && value <= self.hi
207 }
208}
209
210#[derive(
211 Default,
212 Debug,
213 Clone,
214 Serialize,
215 Deserialize,
216 BorshDeserialize,
217 BorshSerialize,
218)]
219pub struct IntervalSet {
220 intervals: Vec<Interval>,
222}
223
224impl IntervalSet {
225 pub const fn new() -> Self {
226 Self {
227 intervals: Vec::new(),
228 }
229 }
230
231 pub fn contains(&self, x: u64) -> bool {
233 if self.intervals.is_empty() {
234 return false;
235 }
236
237 match self.intervals.binary_search_by(|iv| iv.lo.cmp(&x)) {
238 Ok(_) => true, Err(pos) => {
240 if pos == 0 {
241 return false; }
243 let iv = self.intervals[pos - 1];
244 iv.hi >= x
245 }
246 }
247 }
248
249 pub fn insert(&mut self, mut iv: Interval) {
251 let mut i = match self.intervals.binary_search_by(|x| x.lo.cmp(&iv.lo))
253 {
254 Ok(pos) | Err(pos) => pos,
255 };
256
257 if i > 0 && self.intervals[i - 1].hi >= iv.lo {
259 i -= 1;
260 }
261
262 while i < self.intervals.len() && self.intervals[i].lo <= iv.hi {
264 let cur = self.intervals[i];
265 iv.lo = iv.lo.min(cur.lo);
266 iv.hi = iv.hi.max(cur.hi);
267 self.intervals.remove(i); }
269
270 self.intervals.insert(i, iv);
271 }
272
273 pub fn max_covered_in(&self, ql: u64, qh: u64) -> Option<u64> {
275 let (ql, qh) = if ql <= qh { (ql, qh) } else { (qh, ql) };
276 if self.intervals.is_empty() {
277 return None;
278 }
279
280 let idx = match self.intervals.binary_search_by(|iv| iv.lo.cmp(&qh)) {
282 Ok(pos) => pos, Err(pos) => {
284 if pos == 0 {
285 return None;
286 }
287 pos - 1
288 }
289 };
290
291 let iv = self.intervals[idx];
292 if iv.hi >= ql {
294 Some(iv.hi.min(qh))
295 } else {
296 None
297 }
298 }
299
300 pub fn as_slice(&self) -> &[Interval] {
301 &self.intervals
302 }
303
304 pub fn iter(&self) -> slice::Iter<'_, Interval> {
305 self.intervals.iter()
306 }
307
308 pub fn iter_mut(&mut self) -> slice::IterMut<'_, Interval> {
309 self.intervals.iter_mut()
310 }
311}
312
313impl<'a> IntoIterator for &'a IntervalSet {
314 type Item = &'a Interval;
315 type IntoIter = slice::Iter<'a, Interval>;
316
317 fn into_iter(self) -> Self::IntoIter {
318 self.intervals.iter()
319 }
320}
321
322impl<'a> IntoIterator for &'a mut IntervalSet {
323 type Item = &'a mut Interval;
324 type IntoIter = slice::IterMut<'a, Interval>;
325
326 fn into_iter(self) -> Self::IntoIter {
327 self.intervals.iter_mut()
328 }
329}
330
331impl IntoIterator for IntervalSet {
332 type Item = Interval;
333 type IntoIter = std::vec::IntoIter<Interval>;
334
335 fn into_iter(self) -> Self::IntoIter {
336 self.intervals.into_iter()
337 }
338}
339
340#[derive(
341 Debug,
342 Clone,
343 Serialize,
344 Deserialize,
345 Default,
346 BorshDeserialize,
347 BorshSerialize,
348)]
349pub struct CeilingMap<T> {
350 inner: BTreeMap<u64, T>,
351}
352
353impl<T> CeilingMap<T>
354where
355 T: Debug + Clone + Serialize,
356{
357 pub const fn new() -> Self {
358 Self {
359 inner: BTreeMap::new(),
360 }
361 }
362
363 pub fn last(&self) -> Option<(&u64, &T)> {
364 self.inner.last_key_value()
365 }
366
367 pub fn insert(&mut self, key: u64, value: T) {
368 self.inner.insert(key, value);
369 }
370
371 pub fn range_with_predecessor(
372 &self,
373 lower: u64,
374 upper: u64,
375 ) -> Vec<(u64, T)> {
376 let mut out: Vec<(u64, T)> = Vec::new();
377
378 if let Some((key, value)) = self.inner.range(..lower).next_back() {
379 out.push((*key, value.clone()));
380 }
381
382 for (key, value) in
383 self.inner.range((Included(&lower), Included(&upper)))
384 {
385 out.push((*key, value.clone()));
386 }
387
388 out
389 }
390
391 pub fn get_prev_or_equal(&self, key: u64) -> Option<T> {
392 self.inner
393 .range((Unbounded, Included(&key)))
394 .next_back()
395 .map(|x| x.1.clone())
396 }
397}
398
399pub async fn send_to_tracking<A>(
400 ctx: &mut ActorContext<A>,
401 message: RequestTrackingMessage,
402) -> Result<(), ActorError>
403where
404 A: Actor + Handler<A>,
405{
406 let tracking_path = ActorPath::from("/user/request/tracking");
407 let tracking_actor = ctx
408 .system()
409 .get_actor::<RequestTracking>(&tracking_path)
410 .await?;
411 tracking_actor.tell(message).await
412}
413
414pub async fn emit_fail<A>(
415 ctx: &mut ActorContext<A>,
416 error: ActorError,
417) -> ActorError
418where
419 A: Actor + Handler<A>,
420{
421 error!("Falling, error: {}, actor: {}", error, ctx.path());
422 if let Err(_e) = ctx.emit_fail(error.clone()).await {
423 ctx.system().crash_system();
424 };
425 error
426}
427
428pub fn take_random_signers(
429 signers: HashSet<PublicKey>,
430 quantity: usize,
431) -> (HashSet<PublicKey>, HashSet<PublicKey>) {
432 if quantity == signers.len() {
433 return (signers, HashSet::new());
434 }
435
436 let mut rng = rng();
437
438 let random_signers: HashSet<PublicKey> = signers
439 .iter()
440 .sample(&mut rng, quantity)
441 .into_iter()
442 .cloned()
443 .collect();
444
445 let signers = signers
446 .difference(&random_signers)
447 .cloned()
448 .collect::<HashSet<PublicKey>>();
449
450 (random_signers, signers)
451}
452
453pub async fn send_reboot_to_req<A>(
454 ctx: &mut ActorContext<A>,
455 request_id: DigestIdentifier,
456 governance_id: DigestIdentifier,
457 reboot_type: RebootType,
458) -> Result<(), ActorError>
459where
460 A: Actor + Handler<A>,
461{
462 let req_actor = ctx.get_parent::<RequestManager>().await?;
463 req_actor
464 .tell(RequestManagerMessage::Reboot {
465 governance_id,
466 reboot_type,
467 request_id,
468 })
469 .await
470}
471
472pub async fn abort_req<A>(
473 ctx: &mut ActorContext<A>,
474 request_id: DigestIdentifier,
475 who: PublicKey,
476 reason: String,
477 sn: u64,
478) -> Result<(), ActorError>
479where
480 A: Actor + Handler<A>,
481{
482 let req_actor = ctx.get_parent::<RequestManager>().await?;
483 req_actor
484 .tell(RequestManagerMessage::Abort {
485 request_id,
486 who,
487 reason,
488 sn,
489 })
490 .await
491}
492
493pub async fn purge_storage<A>(
494 ctx: &mut ActorContext<A>,
495) -> Result<(), ActorError>
496where
497 A: PersistentActor,
498 A::Event: BorshSerialize + BorshDeserialize,
499{
500 let store = ctx.get_child::<Store<A>>("store").await?;
501 let _response = store.ask(StoreCommand::Purge).await?;
502
503 Ok(())
504}
505
506pub async fn get_last_event<A>(
507 ctx: &mut ActorContext<A>,
508) -> Result<Option<A::Event>, ActorError>
509where
510 A: PersistentActor,
511 A::Event: BorshSerialize + BorshDeserialize,
512{
513 let store = ctx.get_child::<Store<A>>("store").await?;
514 let response = store.ask(StoreCommand::LastEvent).await?;
515
516 match response {
517 StoreResponse::LastEvent(event) => Ok(event),
518 _ => Err(ActorError::UnexpectedResponse {
519 path: ActorPath::from(format!("{}/store", ctx.path())),
520 expected: "StoreResponse::LastEvent".to_owned(),
521 }),
522 }
523}
524
525pub async fn get_n_events<A>(
526 ctx: &mut ActorContext<A>,
527 last_sn: u64,
528 quantity: u64,
529) -> Result<Vec<A::Event>, ActorError>
530where
531 A: PersistentActor,
532 A::Event: BorshSerialize + BorshDeserialize,
533{
534 let store = ctx.get_child::<Store<A>>("store").await?;
535 let response = store
536 .ask(StoreCommand::GetEvents {
537 from: last_sn,
538 to: last_sn + quantity,
539 })
540 .await?;
541
542 match response {
543 StoreResponse::Events(events) => Ok(events),
544 _ => Err(ActorError::UnexpectedResponse {
545 path: ActorPath::from(format!("{}/store", ctx.path())),
546 expected: "StoreResponse::Events".to_owned(),
547 }),
548 }
549}