1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6 Response,
7};
8use ave_actors::{LightPersistence, PersistentActor};
9use ave_common::SchemaType;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use borsh::{BorshDeserialize, BorshSerialize};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::model::common::CeilingMap;
16use crate::{
17 db::Storable,
18 model::common::{emit_fail, purge_storage},
19};
20
21#[derive(
22 Clone,
23 Debug,
24 Serialize,
25 Deserialize,
26 Hash,
27 PartialEq,
28 Eq,
29 Ord,
30 PartialOrd,
31 BorshDeserialize,
32 BorshSerialize,
33)]
34pub struct OwnerSchema {
35 pub owner: PublicKey,
36 pub schema_id: SchemaType,
37 pub namespace: String,
38}
39
40#[derive(
41 Clone,
42 Debug,
43 Serialize,
44 Deserialize,
45 Default,
46 BorshDeserialize,
47 BorshSerialize,
48)]
49pub struct SnRegister {
50 register: HashMap<DigestIdentifier, CeilingMap<u64>>,
51}
52
53#[derive(Clone, Debug, Serialize, Deserialize)]
54pub enum SnRegisterMessage {
55 PurgeStorage,
56 DeleteSubject {
57 subject_id: DigestIdentifier,
58 },
59 RegisterSn {
60 subject_id: DigestIdentifier,
61 gov_version: u64,
62 sn: u64,
63 },
64 GetSn {
65 subject_id: DigestIdentifier,
66 gov_version: u64,
67 },
68}
69
70impl Message for SnRegisterMessage {
71 fn is_critical(&self) -> bool {
72 matches!(
73 self,
74 Self::PurgeStorage
75 | Self::RegisterSn { .. }
76 | Self::DeleteSubject { .. }
77 )
78 }
79}
80
81#[derive(Clone, Debug, Serialize, Deserialize)]
82pub enum SnLimit {
83 Sn(u64),
84 LastSn,
85 NotSn,
86}
87
88#[derive(Clone, Debug, Serialize, Deserialize)]
89pub enum SnRegisterResponse {
90 Ok,
91 Sn(SnLimit),
92}
93
94impl Response for SnRegisterResponse {}
95
96#[derive(
97 Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
98)]
99pub enum SnRegisterEvent {
100 DeleteSubject {
101 subject_id: DigestIdentifier,
102 },
103 RegisterSn {
104 subject_id: DigestIdentifier,
105 gov_version: u64,
106 sn: u64,
107 },
108}
109
110impl Event for SnRegisterEvent {}
111
112#[async_trait]
113impl Actor for SnRegister {
114 type Message = SnRegisterMessage;
115 type Event = SnRegisterEvent;
116 type Response = SnRegisterResponse;
117
118 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
119 parent_span.map_or_else(
120 || info_span!("SnRegister"),
121 |parent_span| info_span!(parent: parent_span, "SnRegister"),
122 )
123 }
124
125 async fn pre_start(
126 &mut self,
127 ctx: &mut ave_actors::ActorContext<Self>,
128 ) -> Result<(), ActorError> {
129 let prefix = ctx.path().parent().key();
130 if let Err(e) = self
131 .init_store("sn_register", Some(prefix), false, ctx)
132 .await
133 {
134 error!(
135 error = %e,
136 "Failed to initialize sn_register store"
137 );
138 return Err(e);
139 }
140 Ok(())
141 }
142}
143
144#[async_trait]
145impl Handler<Self> for SnRegister {
146 async fn handle_message(
147 &mut self,
148 _sender: ActorPath,
149 msg: SnRegisterMessage,
150 ctx: &mut ave_actors::ActorContext<Self>,
151 ) -> Result<SnRegisterResponse, ActorError> {
152 match msg {
153 SnRegisterMessage::PurgeStorage => {
154 purge_storage(ctx).await?;
155
156 debug!(msg_type = "PurgeStorage", "Sn register storage purged");
157
158 Ok(SnRegisterResponse::Ok)
159 }
160 SnRegisterMessage::DeleteSubject { subject_id } => {
161 self.on_event(
162 SnRegisterEvent::DeleteSubject {
163 subject_id: subject_id.clone(),
164 },
165 ctx,
166 )
167 .await;
168
169 debug!(
170 msg_type = "DeleteSubject",
171 subject_id = %subject_id,
172 "Sn register entry deleted"
173 );
174
175 Ok(SnRegisterResponse::Ok)
176 }
177 SnRegisterMessage::GetSn {
178 subject_id,
179 gov_version,
180 } => {
181 let response = if let Some(gov_version_register) =
182 self.register.get(&subject_id)
183 && let Some(last) = gov_version_register.last()
184 {
185 if gov_version > *last.0 {
186 SnRegisterResponse::Sn(SnLimit::LastSn)
187 } else if let Some(sn) =
188 gov_version_register.get_prev_or_equal(gov_version)
189 {
190 SnRegisterResponse::Sn(SnLimit::Sn(sn))
191 } else {
192 SnRegisterResponse::Sn(SnLimit::NotSn)
193 }
194 } else {
195 SnRegisterResponse::Sn(SnLimit::NotSn)
196 };
197
198 debug!(
199 msg_type = "GetSn",
200 subject_id = %subject_id,
201 gov_version = gov_version,
202 "Sn lookup completed"
203 );
204
205 Ok(response)
206 }
207 SnRegisterMessage::RegisterSn {
208 subject_id,
209 gov_version,
210 sn,
211 } => {
212 self.on_event(
213 SnRegisterEvent::RegisterSn {
214 subject_id: subject_id.clone(),
215 gov_version,
216 sn,
217 },
218 ctx,
219 )
220 .await;
221
222 debug!(
223 msg_type = "RegisterSn",
224 subject_id = %subject_id,
225 gov_version = gov_version,
226 sn = sn,
227 "Sn registered"
228 );
229
230 Ok(SnRegisterResponse::Ok)
231 }
232 }
233 }
234
235 async fn on_event(
236 &mut self,
237 event: SnRegisterEvent,
238 ctx: &mut ActorContext<Self>,
239 ) {
240 if let Err(e) = self.persist(&event, ctx).await {
241 error!(
242 event = ?event,
243 error = %e,
244 "Failed to persist sn register event"
245 );
246 emit_fail(ctx, e).await;
247 }
248 }
249}
250
251#[async_trait]
252impl PersistentActor for SnRegister {
253 type Persistence = LightPersistence;
254 type InitParams = ();
255
256 fn create_initial(_params: Self::InitParams) -> Self {
257 Self::default()
258 }
259
260 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
262 match event {
263 SnRegisterEvent::DeleteSubject { subject_id } => {
264 self.register.remove(subject_id);
265
266 debug!(
267 event_type = "DeleteSubject",
268 subject_id = %subject_id,
269 "Sn register state deleted"
270 );
271 }
272 SnRegisterEvent::RegisterSn {
273 subject_id,
274 gov_version,
275 sn,
276 } => {
277 self.register
278 .entry(subject_id.to_owned())
279 .or_default()
280 .insert(*gov_version, *sn);
281
282 debug!(
283 event_type = "RegisterSn",
284 subject_id = %subject_id,
285 gov_version = gov_version,
286 sn = sn,
287 "Sn register state updated"
288 );
289 }
290 };
291
292 Ok(())
293 }
294}
295
296#[async_trait]
297impl Storable for SnRegister {}