Skip to main content

ave_core/governance/
sn_register.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6    Response,
7};
8use ave_actors::{LightPersistence, PersistentActor};
9use ave_common::SchemaType;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use borsh::{BorshDeserialize, BorshSerialize};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::model::common::CeilingMap;
16use crate::{
17    db::Storable,
18    model::common::{emit_fail, purge_storage},
19};
20
21#[derive(
22    Clone,
23    Debug,
24    Serialize,
25    Deserialize,
26    Hash,
27    PartialEq,
28    Eq,
29    Ord,
30    PartialOrd,
31    BorshDeserialize,
32    BorshSerialize,
33)]
34pub struct OwnerSchema {
35    pub owner: PublicKey,
36    pub schema_id: SchemaType,
37    pub namespace: String,
38}
39
40#[derive(
41    Clone,
42    Debug,
43    Serialize,
44    Deserialize,
45    Default,
46    BorshDeserialize,
47    BorshSerialize,
48)]
49pub struct SnRegister {
50    register: HashMap<DigestIdentifier, CeilingMap<u64>>,
51}
52
53#[derive(Clone, Debug, Serialize, Deserialize)]
54pub enum SnRegisterMessage {
55    PurgeStorage,
56    DeleteSubject {
57        subject_id: DigestIdentifier,
58    },
59    RegisterSn {
60        subject_id: DigestIdentifier,
61        gov_version: u64,
62        sn: u64,
63    },
64    GetSn {
65        subject_id: DigestIdentifier,
66        gov_version: u64,
67    },
68    GetGovVersionWindow {
69        subject_id: DigestIdentifier,
70        from_sn: u64,
71        to_sn: u64,
72    },
73}
74
75impl Message for SnRegisterMessage {
76    fn is_critical(&self) -> bool {
77        matches!(
78            self,
79            Self::PurgeStorage
80                | Self::RegisterSn { .. }
81                | Self::DeleteSubject { .. }
82        )
83    }
84}
85
86#[derive(Clone, Debug, Serialize, Deserialize)]
87pub enum SnLimit {
88    Sn(u64),
89    LastSn,
90    NotSn,
91}
92
93#[derive(Clone, Debug, Serialize, Deserialize)]
94pub enum SnRegisterResponse {
95    Ok,
96    Sn(SnLimit),
97    GovVersionWindow(Vec<SnGovVersionRange>),
98}
99
100impl Response for SnRegisterResponse {}
101
102#[derive(
103    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
104)]
105pub enum SnRegisterEvent {
106    DeleteSubject {
107        subject_id: DigestIdentifier,
108    },
109    RegisterSn {
110        subject_id: DigestIdentifier,
111        gov_version: u64,
112        sn: u64,
113    },
114}
115
116impl Event for SnRegisterEvent {}
117
118#[derive(Clone, Debug, Serialize, Deserialize)]
119pub struct SnGovVersionRange {
120    pub from_sn: u64,
121    pub to_sn: u64,
122    pub gov_version: u64,
123}
124
125#[async_trait]
126impl Actor for SnRegister {
127    type Message = SnRegisterMessage;
128    type Event = SnRegisterEvent;
129    type Response = SnRegisterResponse;
130
131    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
132        parent_span.map_or_else(
133            || info_span!("SnRegister"),
134            |parent_span| info_span!(parent: parent_span, "SnRegister"),
135        )
136    }
137
138    async fn pre_start(
139        &mut self,
140        ctx: &mut ave_actors::ActorContext<Self>,
141    ) -> Result<(), ActorError> {
142        let prefix = ctx.path().parent().key();
143        if let Err(e) = self
144            .init_store("sn_register", Some(prefix), false, ctx)
145            .await
146        {
147            error!(
148                error = %e,
149                "Failed to initialize sn_register store"
150            );
151            return Err(e);
152        }
153        Ok(())
154    }
155}
156
157#[async_trait]
158impl Handler<Self> for SnRegister {
159    async fn handle_message(
160        &mut self,
161        _sender: ActorPath,
162        msg: SnRegisterMessage,
163        ctx: &mut ave_actors::ActorContext<Self>,
164    ) -> Result<SnRegisterResponse, ActorError> {
165        match msg {
166            SnRegisterMessage::PurgeStorage => {
167                purge_storage(ctx).await?;
168
169                debug!(msg_type = "PurgeStorage", "Sn register storage purged");
170
171                Ok(SnRegisterResponse::Ok)
172            }
173            SnRegisterMessage::DeleteSubject { subject_id } => {
174                self.on_event(
175                    SnRegisterEvent::DeleteSubject {
176                        subject_id: subject_id.clone(),
177                    },
178                    ctx,
179                )
180                .await;
181
182                debug!(
183                    msg_type = "DeleteSubject",
184                    subject_id = %subject_id,
185                    "Sn register entry deleted"
186                );
187
188                Ok(SnRegisterResponse::Ok)
189            }
190            SnRegisterMessage::GetSn {
191                subject_id,
192                gov_version,
193            } => {
194                let response = if let Some(gov_version_register) =
195                    self.register.get(&subject_id)
196                    && let Some(last) = gov_version_register.last()
197                {
198                    if gov_version > *last.0 {
199                        SnRegisterResponse::Sn(SnLimit::LastSn)
200                    } else if let Some(sn) =
201                        gov_version_register.get_prev_or_equal(gov_version)
202                    {
203                        SnRegisterResponse::Sn(SnLimit::Sn(sn))
204                    } else {
205                        SnRegisterResponse::Sn(SnLimit::NotSn)
206                    }
207                } else {
208                    SnRegisterResponse::Sn(SnLimit::NotSn)
209                };
210
211                debug!(
212                    msg_type = "GetSn",
213                    subject_id = %subject_id,
214                    gov_version = gov_version,
215                    "Sn lookup completed"
216                );
217
218                Ok(response)
219            }
220            SnRegisterMessage::RegisterSn {
221                subject_id,
222                gov_version,
223                sn,
224            } => {
225                self.on_event(
226                    SnRegisterEvent::RegisterSn {
227                        subject_id: subject_id.clone(),
228                        gov_version,
229                        sn,
230                    },
231                    ctx,
232                )
233                .await;
234
235                debug!(
236                    msg_type = "RegisterSn",
237                    subject_id = %subject_id,
238                    gov_version = gov_version,
239                    sn = sn,
240                    "Sn registered"
241                );
242
243                Ok(SnRegisterResponse::Ok)
244            }
245            SnRegisterMessage::GetGovVersionWindow {
246                subject_id,
247                from_sn,
248                to_sn,
249            } => {
250                let mut ranges = Vec::new();
251
252                if let Some(register) = self.register.get(&subject_id) {
253                    let mut prev_end_sn: Option<u64> = None;
254
255                    for (gov_version, end_sn) in register.iter() {
256                        let start_sn = prev_end_sn
257                            .map_or(0, |prev| prev.saturating_add(1));
258                        let range_from = start_sn.max(from_sn);
259                        let range_to = (*end_sn).min(to_sn);
260
261                        if range_from <= range_to {
262                            ranges.push(SnGovVersionRange {
263                                from_sn: range_from,
264                                to_sn: range_to,
265                                gov_version: *gov_version,
266                            });
267                        }
268
269                        if *end_sn >= to_sn {
270                            break;
271                        }
272
273                        prev_end_sn = Some(*end_sn);
274                    }
275                }
276
277                Ok(SnRegisterResponse::GovVersionWindow(ranges))
278            }
279        }
280    }
281
282    async fn on_event(
283        &mut self,
284        event: SnRegisterEvent,
285        ctx: &mut ActorContext<Self>,
286    ) {
287        if let Err(e) = self.persist(&event, ctx).await {
288            error!(
289                event = ?event,
290                error = %e,
291                "Failed to persist sn register event"
292            );
293            emit_fail(ctx, e).await;
294        }
295    }
296}
297
298#[async_trait]
299impl PersistentActor for SnRegister {
300    type Persistence = LightPersistence;
301    type InitParams = ();
302
303    fn create_initial(_params: Self::InitParams) -> Self {
304        Self::default()
305    }
306
307    /// Change node state.
308    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
309        match event {
310            SnRegisterEvent::DeleteSubject { subject_id } => {
311                self.register.remove(subject_id);
312
313                debug!(
314                    event_type = "DeleteSubject",
315                    subject_id = %subject_id,
316                    "Sn register state deleted"
317                );
318            }
319            SnRegisterEvent::RegisterSn {
320                subject_id,
321                gov_version,
322                sn,
323            } => {
324                self.register
325                    .entry(subject_id.to_owned())
326                    .or_default()
327                    .insert(*gov_version, *sn);
328
329                debug!(
330                    event_type = "RegisterSn",
331                    subject_id = %subject_id,
332                    gov_version = gov_version,
333                    sn = sn,
334                    "Sn register state updated"
335                );
336            }
337        };
338
339        Ok(())
340    }
341}
342
343#[async_trait]
344impl Storable for SnRegister {}