Skip to main content

ave_core/model/common/
mod.rs

1use ave_common::SchemaType;
2use borsh::{BorshDeserialize, BorshSerialize};
3use rand::rng;
4use rand::seq::IteratorRandom;
5use serde::{Deserialize, Serialize};
6use std::collections::{BTreeMap, HashSet};
7use std::fmt::Debug;
8use std::slice;
9use tracing::error;
10
11use ave_actors::{
12    Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler,
13    PersistentActor, Store, StoreCommand, StoreResponse,
14};
15
16use ave_common::identity::{DigestIdentifier, PublicKey};
17
18use crate::governance::model::Quorum;
19use crate::governance::role_register::{
20    RoleDataRegister, RoleRegister, RoleRegisterMessage, RoleRegisterResponse,
21    SearchRole,
22};
23use crate::governance::subject_register::{
24    SubjectRegister, SubjectRegisterMessage,
25};
26use crate::governance::witnesses_register::{
27    WitnessesRegister, WitnessesRegisterMessage, WitnessesRegisterResponse,
28};
29use crate::request::manager::{
30    RebootType, RequestManager, RequestManagerMessage,
31};
32use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
33use std::ops::Bound::{Included, Unbounded};
34
35pub mod contract;
36pub mod node;
37pub mod subject;
38
39pub fn check_quorum_signers(
40    signers: &HashSet<PublicKey>,
41    quorum: &Quorum,
42    workers: &HashSet<PublicKey>,
43) -> bool {
44    signers.is_subset(workers)
45        && quorum.check_quorum(workers.len() as u32, signers.len() as u32)
46}
47
48pub async fn get_actual_roles_register<A>(
49    ctx: &mut ActorContext<A>,
50    governance_id: &DigestIdentifier,
51    evaluation: SearchRole,
52    approval: bool,
53    version: u64,
54) -> Result<(RoleDataRegister, Option<RoleDataRegister>), ActorError>
55where
56    A: Actor + Handler<A>,
57{
58    let path =
59        ActorPath::from(format!("/user/node/{}/role_register", governance_id));
60    let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
61
62    let response = actor
63        .ask(RoleRegisterMessage::SearchActualRoles {
64            version,
65            evaluation,
66            approval,
67        })
68        .await?;
69
70    match response {
71        RoleRegisterResponse::ActualRoles {
72            evaluation,
73            approval,
74        } => Ok((evaluation, approval)),
75        _ => Err(ActorError::UnexpectedResponse {
76            path,
77            expected: "RolesRegisterResponse::ActualRoles".to_string(),
78        }),
79    }
80}
81
82pub async fn get_validation_roles_register<A>(
83    ctx: &mut ActorContext<A>,
84    governance_id: &DigestIdentifier,
85    search: SearchRole,
86    version: u64,
87) -> Result<RoleDataRegister, ActorError>
88where
89    A: Actor + Handler<A>,
90{
91    let path =
92        ActorPath::from(format!("/user/node/{}/role_register", governance_id));
93    let actor = ctx.system().get_actor::<RoleRegister>(&path).await?;
94
95    let response = actor
96        .ask(RoleRegisterMessage::SearchValidators { search, version })
97        .await?;
98
99    match response {
100        RoleRegisterResponse::Validation(validation) => Ok(validation),
101        _ => Err(ActorError::UnexpectedResponse {
102            path,
103            expected: "RolesRegisterResponse::Validation".to_string(),
104        }),
105    }
106}
107
108pub async fn check_subject_creation<A>(
109    ctx: &mut ActorContext<A>,
110    governance_id: &DigestIdentifier,
111    creator: PublicKey,
112    gov_version: u64,
113    namespace: String,
114    schema_id: SchemaType,
115) -> Result<(), ActorError>
116where
117    A: Actor + Handler<A>,
118{
119    let actor_path = ActorPath::from(format!(
120        "/user/node/{}/subject_register",
121        governance_id
122    ));
123
124    let actor: ActorRef<SubjectRegister> =
125        ctx.system().get_actor(&actor_path).await.map_err(|_| {
126            ActorError::Functional {
127                description: "Governance has not been found".to_string(),
128            }
129        })?;
130
131    let _response = actor
132        .ask(SubjectRegisterMessage::Check {
133            creator,
134            gov_version,
135            namespace,
136            schema_id,
137        })
138        .await?;
139
140    Ok(())
141}
142
143pub async fn check_witness_access<A>(
144    ctx: &mut ActorContext<A>,
145    governance_id: &DigestIdentifier,
146    subject_id: &DigestIdentifier,
147    node: PublicKey,
148    namespace: String,
149    schema_id: SchemaType,
150) -> Result<Option<u64>, ActorError>
151where
152    A: Actor + Handler<A>,
153{
154    let actor_path = ActorPath::from(format!(
155        "/user/node/{}/witnesses_register",
156        governance_id
157    ));
158
159    let actor: ActorRef<WitnessesRegister> =
160        ctx.system().get_actor(&actor_path).await?;
161
162    let response = actor
163        .ask(WitnessesRegisterMessage::Access {
164            subject_id: subject_id.to_owned(),
165            node,
166            namespace,
167            schema_id,
168        })
169        .await?;
170
171    match response {
172        WitnessesRegisterResponse::Access { sn } => Ok(sn),
173        _ => Err(ActorError::UnexpectedResponse {
174            path: actor_path,
175            expected: "WitnessesRegisterResponse::Access { sn }".to_string(),
176        }),
177    }
178}
179
/// A closed range of `u64` values, `[lo, hi]`.
///
/// `Interval::new` accepts the endpoints in either order. The fields are
/// public, so building the struct literally does not enforce `lo <= hi`.
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct Interval {
    /// Lower endpoint.
    pub lo: u64,
    /// Upper endpoint.
    pub hi: u64, // inclusive
}
195
196impl Interval {
197    pub const fn new(a: u64, b: u64) -> Self {
198        if a <= b {
199            Self { lo: a, hi: b }
200        } else {
201            Self { lo: b, hi: a }
202        }
203    }
204
205    pub const fn contains(&self, value: u64) -> bool {
206        value >= self.lo && value <= self.hi
207    }
208}
209
/// A set of `u64` values stored as a list of closed [`Interval`]s.
#[derive(
    Default,
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct IntervalSet {
    // Invariant: sorted by `lo`, pairwise disjoint and already merged
    // (each `hi` is strictly less than the next interval's `lo`).
    intervals: Vec<Interval>,
}
223
224impl IntervalSet {
225    pub const fn new() -> Self {
226        Self {
227            intervals: Vec::new(),
228        }
229    }
230
231    // Devuelve true si `x` está dentro de algún intervalo (extremos inclusivos).
232    pub fn contains(&self, x: u64) -> bool {
233        if self.intervals.is_empty() {
234            return false;
235        }
236
237        match self.intervals.binary_search_by(|iv| iv.lo.cmp(&x)) {
238            Ok(_) => true, // existe un intervalo con lo == x => contenido seguro
239            Err(pos) => {
240                if pos == 0 {
241                    return false; // x es menor que el lo del primer intervalo
242                }
243                let iv = self.intervals[pos - 1];
244                iv.hi >= x
245            }
246        }
247    }
248
249    // Inserta un intervalo inclusivo y fusiona solapes (incluye tocar por extremo: [1,4] + [4,7] => [1,7]).
250    pub fn insert(&mut self, mut iv: Interval) {
251        // Posición donde iv.lo podría insertarse manteniendo orden
252        let mut i = match self.intervals.binary_search_by(|x| x.lo.cmp(&iv.lo))
253        {
254            Ok(pos) | Err(pos) => pos,
255        };
256
257        // Si puede fusionar con el anterior, retrocede uno
258        if i > 0 && self.intervals[i - 1].hi >= iv.lo {
259            i -= 1;
260        }
261
262        // Fusiona hacia delante todo lo que solape/toque (condición inclusiva: next.lo <= iv.hi)
263        while i < self.intervals.len() && self.intervals[i].lo <= iv.hi {
264            let cur = self.intervals[i];
265            iv.lo = iv.lo.min(cur.lo);
266            iv.hi = iv.hi.max(cur.hi);
267            self.intervals.remove(i); // O(n) pero muy compacto en memoria
268        }
269
270        self.intervals.insert(i, iv);
271    }
272
273    // Consulta: devuelve el máximo valor cubierto dentro de [ql, qh], o None.
274    pub fn max_covered_in(&self, ql: u64, qh: u64) -> Option<u64> {
275        let (ql, qh) = if ql <= qh { (ql, qh) } else { (qh, ql) };
276        if self.intervals.is_empty() {
277            return None;
278        }
279
280        // Queremos el intervalo más a la derecha con lo <= qh
281        let idx = match self.intervals.binary_search_by(|iv| iv.lo.cmp(&qh)) {
282            Ok(pos) => pos, // lo == qh
283            Err(pos) => {
284                if pos == 0 {
285                    return None;
286                }
287                pos - 1
288            }
289        };
290
291        let iv = self.intervals[idx];
292        // Hay intersección si iv.hi >= ql (ya sabemos iv.lo <= qh)
293        if iv.hi >= ql {
294            Some(iv.hi.min(qh))
295        } else {
296            None
297        }
298    }
299
300    pub fn as_slice(&self) -> &[Interval] {
301        &self.intervals
302    }
303
304    pub fn iter(&self) -> slice::Iter<'_, Interval> {
305        self.intervals.iter()
306    }
307
308    pub fn iter_mut(&mut self) -> slice::IterMut<'_, Interval> {
309        self.intervals.iter_mut()
310    }
311}
312
313impl<'a> IntoIterator for &'a IntervalSet {
314    type Item = &'a Interval;
315    type IntoIter = slice::Iter<'a, Interval>;
316
317    fn into_iter(self) -> Self::IntoIter {
318        self.intervals.iter()
319    }
320}
321
322impl<'a> IntoIterator for &'a mut IntervalSet {
323    type Item = &'a mut Interval;
324    type IntoIter = slice::IterMut<'a, Interval>;
325
326    fn into_iter(self) -> Self::IntoIter {
327        self.intervals.iter_mut()
328    }
329}
330
331impl IntoIterator for IntervalSet {
332    type Item = Interval;
333    type IntoIter = std::vec::IntoIter<Interval>;
334
335    fn into_iter(self) -> Self::IntoIter {
336        self.intervals.into_iter()
337    }
338}
339
/// An ordered map from `u64` keys to values of type `T`, backed by a
/// `BTreeMap`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Default,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct CeilingMap<T> {
    /// Entries ordered by key.
    inner: BTreeMap<u64, T>,
}
352
353impl<T> CeilingMap<T>
354where
355    T: Debug + Clone + Serialize,
356{
357    pub const fn new() -> Self {
358        Self {
359            inner: BTreeMap::new(),
360        }
361    }
362
363    pub fn last(&self) -> Option<(&u64, &T)> {
364        self.inner.last_key_value()
365    }
366
367    pub fn insert(&mut self, key: u64, value: T) {
368        self.inner.insert(key, value);
369    }
370
371    pub fn range_with_predecessor(
372        &self,
373        lower: u64,
374        upper: u64,
375    ) -> Vec<(u64, T)> {
376        let mut out: Vec<(u64, T)> = Vec::new();
377
378        if let Some((key, value)) = self.inner.range(..lower).next_back() {
379            out.push((*key, value.clone()));
380        }
381
382        for (key, value) in
383            self.inner.range((Included(&lower), Included(&upper)))
384        {
385            out.push((*key, value.clone()));
386        }
387
388        out
389    }
390
391    pub fn get_prev_or_equal(&self, key: u64) -> Option<T> {
392        self.inner
393            .range((Unbounded, Included(&key)))
394            .next_back()
395            .map(|x| x.1.clone())
396    }
397}
398
399pub async fn send_to_tracking<A>(
400    ctx: &mut ActorContext<A>,
401    message: RequestTrackingMessage,
402) -> Result<(), ActorError>
403where
404    A: Actor + Handler<A>,
405{
406    let tracking_path = ActorPath::from("/user/request/tracking");
407    let tracking_actor = ctx
408        .system()
409        .get_actor::<RequestTracking>(&tracking_path)
410        .await?;
411    tracking_actor.tell(message).await
412}
413
414pub async fn emit_fail<A>(
415    ctx: &mut ActorContext<A>,
416    error: ActorError,
417) -> ActorError
418where
419    A: Actor + Handler<A>,
420{
421    error!("Falling, error: {}, actor: {}", error, ctx.path());
422    if let Err(_e) = ctx.emit_fail(error.clone()).await {
423        ctx.system().crash_system();
424    };
425    error
426}
427
428pub fn take_random_signers(
429    signers: HashSet<PublicKey>,
430    quantity: usize,
431) -> (HashSet<PublicKey>, HashSet<PublicKey>) {
432    if quantity == signers.len() {
433        return (signers, HashSet::new());
434    }
435
436    let mut rng = rng();
437
438    let random_signers: HashSet<PublicKey> = signers
439        .iter()
440        .sample(&mut rng, quantity)
441        .into_iter()
442        .cloned()
443        .collect();
444
445    let signers = signers
446        .difference(&random_signers)
447        .cloned()
448        .collect::<HashSet<PublicKey>>();
449
450    (random_signers, signers)
451}
452
453pub async fn send_reboot_to_req<A>(
454    ctx: &mut ActorContext<A>,
455    request_id: DigestIdentifier,
456    governance_id: DigestIdentifier,
457    reboot_type: RebootType,
458) -> Result<(), ActorError>
459where
460    A: Actor + Handler<A>,
461{
462    let req_actor = ctx.get_parent::<RequestManager>().await?;
463    req_actor
464        .tell(RequestManagerMessage::Reboot {
465            governance_id,
466            reboot_type,
467            request_id,
468        })
469        .await
470}
471
472pub async fn abort_req<A>(
473    ctx: &mut ActorContext<A>,
474    request_id: DigestIdentifier,
475    who: PublicKey,
476    reason: String,
477    sn: u64,
478) -> Result<(), ActorError>
479where
480    A: Actor + Handler<A>,
481{
482    let req_actor = ctx.get_parent::<RequestManager>().await?;
483    req_actor
484        .tell(RequestManagerMessage::Abort {
485            request_id,
486            who,
487            reason,
488            sn,
489        })
490        .await
491}
492
493pub async fn purge_storage<A>(
494    ctx: &mut ActorContext<A>,
495) -> Result<(), ActorError>
496where
497    A: PersistentActor,
498    A::Event: BorshSerialize + BorshDeserialize,
499{
500    let store = ctx.get_child::<Store<A>>("store").await?;
501    let _response = store.ask(StoreCommand::Purge).await?;
502
503    Ok(())
504}
505
506pub async fn get_last_event<A>(
507    ctx: &mut ActorContext<A>,
508) -> Result<Option<A::Event>, ActorError>
509where
510    A: PersistentActor,
511    A::Event: BorshSerialize + BorshDeserialize,
512{
513    let store = ctx.get_child::<Store<A>>("store").await?;
514    let response = store.ask(StoreCommand::LastEvent).await?;
515
516    match response {
517        StoreResponse::LastEvent(event) => Ok(event),
518        _ => Err(ActorError::UnexpectedResponse {
519            path: ActorPath::from(format!("{}/store", ctx.path())),
520            expected: "StoreResponse::LastEvent".to_owned(),
521        }),
522    }
523}
524
525pub async fn get_n_events<A>(
526    ctx: &mut ActorContext<A>,
527    last_sn: u64,
528    quantity: u64,
529) -> Result<Vec<A::Event>, ActorError>
530where
531    A: PersistentActor,
532    A::Event: BorshSerialize + BorshDeserialize,
533{
534    let store = ctx.get_child::<Store<A>>("store").await?;
535    let response = store
536        .ask(StoreCommand::GetEvents {
537            from: last_sn,
538            to: last_sn + quantity,
539        })
540        .await?;
541
542    match response {
543        StoreResponse::Events(events) => Ok(events),
544        _ => Err(ActorError::UnexpectedResponse {
545            path: ActorPath::from(format!("{}/store", ctx.path())),
546            expected: "StoreResponse::Events".to_owned(),
547        }),
548    }
549}