Skip to main content

ave_core/model/common/
mod.rs

1use ave_common::SchemaType;
2use borsh::{BorshDeserialize, BorshSerialize};
3use rand::rng;
4use rand::seq::IteratorRandom;
5use serde::{Deserialize, Serialize};
6use std::collections::{BTreeMap, HashSet};
7use std::fmt::Debug;
8use std::slice;
9use tracing::error;
10
11use ave_actors::{
12    Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler,
13    PersistentActor, Store, StoreCommand, StoreResponse,
14};
15
16use ave_common::identity::{DigestIdentifier, PublicKey};
17
18use crate::governance::model::Quorum;
19use crate::governance::role_register::{
20    CurrentValidationRoles, RoleDataRegister, RoleRegister,
21    RoleRegisterMessage, RoleRegisterResponse, SearchRole,
22};
23use crate::governance::subject_register::{
24    SubjectRegister, SubjectRegisterMessage,
25};
26use crate::governance::witnesses_register::{
27    WitnessesRegister, WitnessesRegisterMessage, WitnessesRegisterResponse,
28};
29use crate::request::manager::{
30    RebootType, RequestManager, RequestManagerMessage,
31};
32use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
33use std::ops::Bound::{Included, Unbounded};
34
35pub mod contract;
36pub mod node;
37pub mod subject;
38
39pub fn check_quorum_signers(
40    signers: &HashSet<PublicKey>,
41    quorum: &Quorum,
42    workers: &HashSet<PublicKey>,
43) -> bool {
44    signers.is_subset(workers)
45        && quorum.check_quorum(workers.len() as u32, signers.len() as u32)
46}
47
48pub async fn get_actual_roles_register<A>(
49    ctx: &mut ActorContext<A>,
50    governance_id: &DigestIdentifier,
51    evaluation: SearchRole,
52    approval: bool,
53    version: u64,
54) -> Result<(RoleDataRegister, Option<RoleDataRegister>), ActorError>
55where
56    A: Actor + Handler<A>,
57{
58    let path = ActorPath::from(format!(
59        "/user/node/subject_manager/{}/role_register",
60        governance_id
61    ));
62    let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
63
64    let response = actor
65        .ask(RoleRegisterMessage::SearchActualRoles {
66            version,
67            evaluation,
68            approval,
69        })
70        .await?;
71
72    match response {
73        RoleRegisterResponse::ActualRoles {
74            evaluation,
75            approval,
76        } => Ok((evaluation, approval)),
77        _ => Err(ActorError::UnexpectedResponse {
78            path,
79            expected: "RolesRegisterResponse::ActualRoles".to_string(),
80        }),
81    }
82}
83
84pub async fn get_validation_roles_register<A>(
85    ctx: &mut ActorContext<A>,
86    governance_id: &DigestIdentifier,
87    search: SearchRole,
88    version: u64,
89) -> Result<RoleDataRegister, ActorError>
90where
91    A: Actor + Handler<A>,
92{
93    let path = ActorPath::from(format!(
94        "/user/node/subject_manager/{}/role_register",
95        governance_id
96    ));
97    let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
98
99    let response = actor
100        .ask(RoleRegisterMessage::SearchValidators { search, version })
101        .await?;
102
103    match response {
104        RoleRegisterResponse::Validation(validation) => Ok(validation),
105        _ => Err(ActorError::UnexpectedResponse {
106            path,
107            expected: "RolesRegisterResponse::Validation".to_string(),
108        }),
109    }
110}
111
112pub async fn get_current_validation_roles_register<A>(
113    ctx: &mut ActorContext<A>,
114    governance_id: &DigestIdentifier,
115    schema_id: SchemaType,
116) -> Result<CurrentValidationRoles, ActorError>
117where
118    A: Actor + Handler<A>,
119{
120    let path = ActorPath::from(format!(
121        "/user/node/subject_manager/{}/role_register",
122        governance_id
123    ));
124    let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
125
126    let response = actor
127        .ask(RoleRegisterMessage::GetCurrentValidationRoles { schema_id })
128        .await?;
129
130    match response {
131        RoleRegisterResponse::CurrentValidationRoles(roles) => Ok(roles),
132        _ => Err(ActorError::UnexpectedResponse {
133            path,
134            expected: "RolesRegisterResponse::CurrentValidationRoles"
135                .to_string(),
136        }),
137    }
138}
139
140pub async fn check_subject_creation<A>(
141    ctx: &mut ActorContext<A>,
142    governance_id: &DigestIdentifier,
143    creator: PublicKey,
144    gov_version: u64,
145    namespace: String,
146    schema_id: SchemaType,
147) -> Result<(), ActorError>
148where
149    A: Actor + Handler<A>,
150{
151    let actor_path = ActorPath::from(format!(
152        "/user/node/subject_manager/{}/subject_register",
153        governance_id
154    ));
155
156    let actor: ActorRef<SubjectRegister> =
157        ctx.system().get_actor(&actor_path).await.map_err(|_| {
158            ActorError::Functional {
159                description: "Governance has not been found".to_string(),
160            }
161        })?;
162
163    let _response = actor
164        .ask(SubjectRegisterMessage::Check {
165            creator,
166            gov_version,
167            namespace,
168            schema_id,
169        })
170        .await?;
171
172    Ok(())
173}
174
175pub async fn check_witness_access<A>(
176    ctx: &mut ActorContext<A>,
177    governance_id: &DigestIdentifier,
178    subject_id: &DigestIdentifier,
179    node: PublicKey,
180    namespace: String,
181    schema_id: SchemaType,
182) -> Result<Option<u64>, ActorError>
183where
184    A: Actor + Handler<A>,
185{
186    let actor_path = ActorPath::from(format!(
187        "/user/node/subject_manager/{}/witnesses_register",
188        governance_id
189    ));
190
191    let actor: ActorRef<WitnessesRegister> =
192        ctx.system().get_actor(&actor_path).await?;
193
194    let response = actor
195        .ask(WitnessesRegisterMessage::Access {
196            subject_id: subject_id.to_owned(),
197            node,
198            namespace,
199            schema_id,
200        })
201        .await?;
202
203    match response {
204        WitnessesRegisterResponse::Access { sn } => Ok(sn),
205        _ => Err(ActorError::UnexpectedResponse {
206            path: actor_path,
207            expected: "WitnessesRegisterResponse::Access { sn }".to_string(),
208        }),
209    }
210}
211
212#[derive(
213    Clone,
214    Copy,
215    Debug,
216    PartialEq,
217    Eq,
218    Serialize,
219    Deserialize,
220    BorshDeserialize,
221    BorshSerialize,
222)]
223pub struct Interval {
224    pub lo: u64,
225    pub hi: u64, // inclusivo
226}
227
228impl Interval {
229    pub const fn new(a: u64, b: u64) -> Self {
230        if a <= b {
231            Self { lo: a, hi: b }
232        } else {
233            Self { lo: b, hi: a }
234        }
235    }
236
237    pub const fn contains(&self, value: u64) -> bool {
238        value >= self.lo && value <= self.hi
239    }
240}
241
242#[derive(
243    Default,
244    Debug,
245    Clone,
246    Serialize,
247    Deserialize,
248    BorshDeserialize,
249    BorshSerialize,
250)]
251pub struct IntervalSet {
252    // Invariante: ordenados por lo, disjuntos y ya "mergeados" (hi < siguiente.lo)
253    intervals: Vec<Interval>,
254}
255
256impl IntervalSet {
257    pub const fn new() -> Self {
258        Self {
259            intervals: Vec::new(),
260        }
261    }
262
263    // Devuelve true si `x` está dentro de algún intervalo (extremos inclusivos).
264    pub fn contains(&self, x: u64) -> bool {
265        if self.intervals.is_empty() {
266            return false;
267        }
268
269        match self.intervals.binary_search_by(|iv| iv.lo.cmp(&x)) {
270            Ok(_) => true, // existe un intervalo con lo == x => contenido seguro
271            Err(pos) => {
272                if pos == 0 {
273                    return false; // x es menor que el lo del primer intervalo
274                }
275                let iv = self.intervals[pos - 1];
276                iv.hi >= x
277            }
278        }
279    }
280
281    // Inserta un intervalo inclusivo y fusiona solapes (incluye tocar por extremo: [1,4] + [4,7] => [1,7]).
282    pub fn insert(&mut self, mut iv: Interval) {
283        // Posición donde iv.lo podría insertarse manteniendo orden
284        let mut i = match self.intervals.binary_search_by(|x| x.lo.cmp(&iv.lo))
285        {
286            Ok(pos) | Err(pos) => pos,
287        };
288
289        // Si puede fusionar con el anterior, retrocede uno
290        if i > 0 && self.intervals[i - 1].hi >= iv.lo {
291            i -= 1;
292        }
293
294        // Fusiona hacia delante todo lo que solape/toque (condición inclusiva: next.lo <= iv.hi)
295        while i < self.intervals.len() && self.intervals[i].lo <= iv.hi {
296            let cur = self.intervals[i];
297            iv.lo = iv.lo.min(cur.lo);
298            iv.hi = iv.hi.max(cur.hi);
299            self.intervals.remove(i); // O(n) pero muy compacto en memoria
300        }
301
302        self.intervals.insert(i, iv);
303    }
304
305    // Consulta: devuelve el máximo valor cubierto dentro de [ql, qh], o None.
306    pub fn max_covered_in(&self, ql: u64, qh: u64) -> Option<u64> {
307        let (ql, qh) = if ql <= qh { (ql, qh) } else { (qh, ql) };
308        if self.intervals.is_empty() {
309            return None;
310        }
311
312        // Queremos el intervalo más a la derecha con lo <= qh
313        let idx = match self.intervals.binary_search_by(|iv| iv.lo.cmp(&qh)) {
314            Ok(pos) => pos, // lo == qh
315            Err(pos) => {
316                if pos == 0 {
317                    return None;
318                }
319                pos - 1
320            }
321        };
322
323        let iv = self.intervals[idx];
324        // Hay intersección si iv.hi >= ql (ya sabemos iv.lo <= qh)
325        if iv.hi >= ql {
326            Some(iv.hi.min(qh))
327        } else {
328            None
329        }
330    }
331
332    pub fn as_slice(&self) -> &[Interval] {
333        &self.intervals
334    }
335
336    pub fn iter(&self) -> slice::Iter<'_, Interval> {
337        self.intervals.iter()
338    }
339
340    pub fn iter_mut(&mut self) -> slice::IterMut<'_, Interval> {
341        self.intervals.iter_mut()
342    }
343}
344
345impl<'a> IntoIterator for &'a IntervalSet {
346    type Item = &'a Interval;
347    type IntoIter = slice::Iter<'a, Interval>;
348
349    fn into_iter(self) -> Self::IntoIter {
350        self.intervals.iter()
351    }
352}
353
354impl<'a> IntoIterator for &'a mut IntervalSet {
355    type Item = &'a mut Interval;
356    type IntoIter = slice::IterMut<'a, Interval>;
357
358    fn into_iter(self) -> Self::IntoIter {
359        self.intervals.iter_mut()
360    }
361}
362
363impl IntoIterator for IntervalSet {
364    type Item = Interval;
365    type IntoIter = std::vec::IntoIter<Interval>;
366
367    fn into_iter(self) -> Self::IntoIter {
368        self.intervals.into_iter()
369    }
370}
371
372#[derive(
373    Debug,
374    Clone,
375    Serialize,
376    Deserialize,
377    Default,
378    BorshDeserialize,
379    BorshSerialize,
380)]
381pub struct CeilingMap<T> {
382    inner: BTreeMap<u64, T>,
383}
384
385impl<T> CeilingMap<T>
386where
387    T: Debug + Clone + Serialize,
388{
389    pub const fn new() -> Self {
390        Self {
391            inner: BTreeMap::new(),
392        }
393    }
394
395    pub fn last(&self) -> Option<(&u64, &T)> {
396        self.inner.last_key_value()
397    }
398
399    pub fn insert(&mut self, key: u64, value: T) {
400        self.inner.insert(key, value);
401    }
402
403    pub fn range_with_predecessor(
404        &self,
405        lower: u64,
406        upper: u64,
407    ) -> Vec<(u64, T)> {
408        let mut out: Vec<(u64, T)> = Vec::new();
409
410        if let Some((key, value)) = self.inner.range(..lower).next_back() {
411            out.push((*key, value.clone()));
412        }
413
414        for (key, value) in
415            self.inner.range((Included(&lower), Included(&upper)))
416        {
417            out.push((*key, value.clone()));
418        }
419
420        out
421    }
422
423    pub fn get_prev_or_equal(&self, key: u64) -> Option<T> {
424        self.inner
425            .range((Unbounded, Included(&key)))
426            .next_back()
427            .map(|x| x.1.clone())
428    }
429}
430
431pub async fn send_to_tracking<A>(
432    ctx: &mut ActorContext<A>,
433    message: RequestTrackingMessage,
434) -> Result<(), ActorError>
435where
436    A: Actor + Handler<A>,
437{
438    let tracking_path = ActorPath::from("/user/request/tracking");
439    let tracking_actor = ctx
440        .system()
441        .get_actor::<RequestTracking>(&tracking_path)
442        .await?;
443    tracking_actor.tell(message).await
444}
445
446pub async fn emit_fail<A>(
447    ctx: &mut ActorContext<A>,
448    error: ActorError,
449) -> ActorError
450where
451    A: Actor + Handler<A>,
452{
453    error!("Falling, error: {}, actor: {}", error, ctx.path());
454    if let Err(_e) = ctx.emit_fail(error.clone()).await {
455        ctx.system().crash_system();
456    };
457    error
458}
459
460pub fn take_random_signers(
461    signers: HashSet<PublicKey>,
462    quantity: usize,
463) -> (HashSet<PublicKey>, HashSet<PublicKey>) {
464    if quantity == signers.len() {
465        return (signers, HashSet::new());
466    }
467
468    let mut rng = rng();
469
470    let random_signers: HashSet<PublicKey> = signers
471        .iter()
472        .sample(&mut rng, quantity)
473        .into_iter()
474        .cloned()
475        .collect();
476
477    let signers = signers
478        .difference(&random_signers)
479        .cloned()
480        .collect::<HashSet<PublicKey>>();
481
482    (random_signers, signers)
483}
484
485pub async fn send_reboot_to_req<A>(
486    ctx: &mut ActorContext<A>,
487    request_id: DigestIdentifier,
488    governance_id: DigestIdentifier,
489    reboot_type: RebootType,
490) -> Result<(), ActorError>
491where
492    A: Actor + Handler<A>,
493{
494    let req_actor = ctx.get_parent::<RequestManager>().await?;
495    req_actor
496        .tell(RequestManagerMessage::Reboot {
497            governance_id,
498            reboot_type,
499            request_id,
500        })
501        .await
502}
503
504pub async fn abort_req<A>(
505    ctx: &mut ActorContext<A>,
506    request_id: DigestIdentifier,
507    who: PublicKey,
508    reason: String,
509    sn: u64,
510) -> Result<(), ActorError>
511where
512    A: Actor + Handler<A>,
513{
514    let req_actor = ctx.get_parent::<RequestManager>().await?;
515    req_actor
516        .tell(RequestManagerMessage::Abort {
517            request_id,
518            who,
519            reason,
520            sn,
521        })
522        .await
523}
524
525pub async fn purge_storage<A>(
526    ctx: &mut ActorContext<A>,
527) -> Result<(), ActorError>
528where
529    A: PersistentActor,
530    A::Event: BorshSerialize + BorshDeserialize,
531{
532    let store = ctx.get_child::<Store<A>>("store").await?;
533    let _response = store.ask(StoreCommand::Purge).await?;
534
535    Ok(())
536}
537
538pub async fn get_last_event<A>(
539    ctx: &mut ActorContext<A>,
540) -> Result<Option<A::Event>, ActorError>
541where
542    A: PersistentActor,
543    A::Event: BorshSerialize + BorshDeserialize,
544{
545    let store = ctx.get_child::<Store<A>>("store").await?;
546    let response = store.ask(StoreCommand::LastEvent).await?;
547
548    match response {
549        StoreResponse::LastEvent(event) => Ok(event),
550        _ => Err(ActorError::UnexpectedResponse {
551            path: ActorPath::from(format!("{}/store", ctx.path())),
552            expected: "StoreResponse::LastEvent".to_owned(),
553        }),
554    }
555}
556
557pub async fn get_n_events<A>(
558    ctx: &mut ActorContext<A>,
559    last_sn: u64,
560    quantity: u64,
561) -> Result<Vec<A::Event>, ActorError>
562where
563    A: PersistentActor,
564    A::Event: BorshSerialize + BorshDeserialize,
565{
566    let store = ctx.get_child::<Store<A>>("store").await?;
567    let response = store
568        .ask(StoreCommand::GetEvents {
569            from: last_sn,
570            to: last_sn + quantity,
571        })
572        .await?;
573
574    match response {
575        StoreResponse::Events(events) => Ok(events),
576        _ => Err(ActorError::UnexpectedResponse {
577            path: ActorPath::from(format!("{}/store", ctx.path())),
578            expected: "StoreResponse::Events".to_owned(),
579        }),
580    }
581}