1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6 Response,
7};
8use ave_actors::{LightPersistence, PersistentActor};
9use ave_common::SchemaType;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use borsh::{BorshDeserialize, BorshSerialize};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::model::common::CeilingMap;
16use crate::{
17 db::Storable,
18 model::common::{emit_fail, purge_storage},
19};
20
21#[derive(
22 Clone,
23 Debug,
24 Serialize,
25 Deserialize,
26 Hash,
27 PartialEq,
28 Eq,
29 Ord,
30 PartialOrd,
31 BorshDeserialize,
32 BorshSerialize,
33)]
34pub struct OwnerSchema {
35 pub owner: PublicKey,
36 pub schema_id: SchemaType,
37 pub namespace: String,
38}
39
40#[derive(
41 Clone,
42 Debug,
43 Serialize,
44 Deserialize,
45 Default,
46 BorshDeserialize,
47 BorshSerialize,
48)]
49pub struct SnRegister {
50 register: HashMap<DigestIdentifier, CeilingMap<u64>>,
51}
52
53#[derive(Clone, Debug, Serialize, Deserialize)]
54pub enum SnRegisterMessage {
55 PurgeStorage,
56 DeleteSubject {
57 subject_id: DigestIdentifier,
58 },
59 RegisterSn {
60 subject_id: DigestIdentifier,
61 gov_version: u64,
62 sn: u64,
63 },
64 GetSn {
65 subject_id: DigestIdentifier,
66 gov_version: u64,
67 },
68 GetGovVersionWindow {
69 subject_id: DigestIdentifier,
70 from_sn: u64,
71 to_sn: u64,
72 },
73}
74
75impl Message for SnRegisterMessage {
76 fn is_critical(&self) -> bool {
77 matches!(
78 self,
79 Self::PurgeStorage
80 | Self::RegisterSn { .. }
81 | Self::DeleteSubject { .. }
82 )
83 }
84}
85
86#[derive(Clone, Debug, Serialize, Deserialize)]
87pub enum SnLimit {
88 Sn(u64),
89 LastSn,
90 NotSn,
91}
92
93#[derive(Clone, Debug, Serialize, Deserialize)]
94pub enum SnRegisterResponse {
95 Ok,
96 Sn(SnLimit),
97 GovVersionWindow(Vec<SnGovVersionRange>),
98}
99
100impl Response for SnRegisterResponse {}
101
102#[derive(
103 Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
104)]
105pub enum SnRegisterEvent {
106 DeleteSubject {
107 subject_id: DigestIdentifier,
108 },
109 RegisterSn {
110 subject_id: DigestIdentifier,
111 gov_version: u64,
112 sn: u64,
113 },
114}
115
116impl Event for SnRegisterEvent {}
117
118#[derive(Clone, Debug, Serialize, Deserialize)]
119pub struct SnGovVersionRange {
120 pub from_sn: u64,
121 pub to_sn: u64,
122 pub gov_version: u64,
123}
124
125#[async_trait]
126impl Actor for SnRegister {
127 type Message = SnRegisterMessage;
128 type Event = SnRegisterEvent;
129 type Response = SnRegisterResponse;
130
131 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
132 parent_span.map_or_else(
133 || info_span!("SnRegister"),
134 |parent_span| info_span!(parent: parent_span, "SnRegister"),
135 )
136 }
137
138 async fn pre_start(
139 &mut self,
140 ctx: &mut ave_actors::ActorContext<Self>,
141 ) -> Result<(), ActorError> {
142 let prefix = ctx.path().parent().key();
143 if let Err(e) = self
144 .init_store("sn_register", Some(prefix), false, ctx)
145 .await
146 {
147 error!(
148 error = %e,
149 "Failed to initialize sn_register store"
150 );
151 return Err(e);
152 }
153 Ok(())
154 }
155}
156
157#[async_trait]
158impl Handler<Self> for SnRegister {
159 async fn handle_message(
160 &mut self,
161 _sender: ActorPath,
162 msg: SnRegisterMessage,
163 ctx: &mut ave_actors::ActorContext<Self>,
164 ) -> Result<SnRegisterResponse, ActorError> {
165 match msg {
166 SnRegisterMessage::PurgeStorage => {
167 purge_storage(ctx).await?;
168
169 debug!(msg_type = "PurgeStorage", "Sn register storage purged");
170
171 Ok(SnRegisterResponse::Ok)
172 }
173 SnRegisterMessage::DeleteSubject { subject_id } => {
174 self.on_event(
175 SnRegisterEvent::DeleteSubject {
176 subject_id: subject_id.clone(),
177 },
178 ctx,
179 )
180 .await;
181
182 debug!(
183 msg_type = "DeleteSubject",
184 subject_id = %subject_id,
185 "Sn register entry deleted"
186 );
187
188 Ok(SnRegisterResponse::Ok)
189 }
190 SnRegisterMessage::GetSn {
191 subject_id,
192 gov_version,
193 } => {
194 let response = if let Some(gov_version_register) =
195 self.register.get(&subject_id)
196 && let Some(last) = gov_version_register.last()
197 {
198 if gov_version > *last.0 {
199 SnRegisterResponse::Sn(SnLimit::LastSn)
200 } else if let Some(sn) =
201 gov_version_register.get_prev_or_equal(gov_version)
202 {
203 SnRegisterResponse::Sn(SnLimit::Sn(sn))
204 } else {
205 SnRegisterResponse::Sn(SnLimit::NotSn)
206 }
207 } else {
208 SnRegisterResponse::Sn(SnLimit::NotSn)
209 };
210
211 debug!(
212 msg_type = "GetSn",
213 subject_id = %subject_id,
214 gov_version = gov_version,
215 "Sn lookup completed"
216 );
217
218 Ok(response)
219 }
220 SnRegisterMessage::RegisterSn {
221 subject_id,
222 gov_version,
223 sn,
224 } => {
225 self.on_event(
226 SnRegisterEvent::RegisterSn {
227 subject_id: subject_id.clone(),
228 gov_version,
229 sn,
230 },
231 ctx,
232 )
233 .await;
234
235 debug!(
236 msg_type = "RegisterSn",
237 subject_id = %subject_id,
238 gov_version = gov_version,
239 sn = sn,
240 "Sn registered"
241 );
242
243 Ok(SnRegisterResponse::Ok)
244 }
245 SnRegisterMessage::GetGovVersionWindow {
246 subject_id,
247 from_sn,
248 to_sn,
249 } => {
250 let mut ranges = Vec::new();
251
252 if let Some(register) = self.register.get(&subject_id) {
253 let mut prev_end_sn: Option<u64> = None;
254
255 for (gov_version, end_sn) in register.iter() {
256 let start_sn = prev_end_sn
257 .map_or(0, |prev| prev.saturating_add(1));
258 let range_from = start_sn.max(from_sn);
259 let range_to = (*end_sn).min(to_sn);
260
261 if range_from <= range_to {
262 ranges.push(SnGovVersionRange {
263 from_sn: range_from,
264 to_sn: range_to,
265 gov_version: *gov_version,
266 });
267 }
268
269 if *end_sn >= to_sn {
270 break;
271 }
272
273 prev_end_sn = Some(*end_sn);
274 }
275 }
276
277 Ok(SnRegisterResponse::GovVersionWindow(ranges))
278 }
279 }
280 }
281
282 async fn on_event(
283 &mut self,
284 event: SnRegisterEvent,
285 ctx: &mut ActorContext<Self>,
286 ) {
287 if let Err(e) = self.persist(&event, ctx).await {
288 error!(
289 event = ?event,
290 error = %e,
291 "Failed to persist sn register event"
292 );
293 emit_fail(ctx, e).await;
294 }
295 }
296}
297
298#[async_trait]
299impl PersistentActor for SnRegister {
300 type Persistence = LightPersistence;
301 type InitParams = ();
302
303 fn create_initial(_params: Self::InitParams) -> Self {
304 Self::default()
305 }
306
307 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
309 match event {
310 SnRegisterEvent::DeleteSubject { subject_id } => {
311 self.register.remove(subject_id);
312
313 debug!(
314 event_type = "DeleteSubject",
315 subject_id = %subject_id,
316 "Sn register state deleted"
317 );
318 }
319 SnRegisterEvent::RegisterSn {
320 subject_id,
321 gov_version,
322 sn,
323 } => {
324 self.register
325 .entry(subject_id.to_owned())
326 .or_default()
327 .insert(*gov_version, *sn);
328
329 debug!(
330 event_type = "RegisterSn",
331 subject_id = %subject_id,
332 gov_version = gov_version,
333 sn = sn,
334 "Sn register state updated"
335 );
336 }
337 };
338
339 Ok(())
340 }
341}
342
343#[async_trait]
344impl Storable for SnRegister {}