1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6 Response,
7};
8use ave_actors::{LightPersistence, PersistentActor};
9use ave_common::SchemaType;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use borsh::{BorshDeserialize, BorshSerialize};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::model::common::CeilingMap;
16use crate::{db::Storable, model::common::emit_fail};
17
18#[derive(
19 Clone,
20 Debug,
21 Serialize,
22 Deserialize,
23 Hash,
24 PartialEq,
25 Eq,
26 Ord,
27 PartialOrd,
28 BorshDeserialize,
29 BorshSerialize,
30)]
31pub struct OwnerSchema {
32 pub owner: PublicKey,
33 pub schema_id: SchemaType,
34 pub namespace: String,
35}
36
37#[derive(
38 Clone,
39 Debug,
40 Serialize,
41 Deserialize,
42 Default,
43 BorshDeserialize,
44 BorshSerialize,
45)]
46pub struct SnRegister {
47 register: HashMap<DigestIdentifier, CeilingMap<u64>>,
48}
49
50#[derive(Clone, Debug, Serialize, Deserialize)]
51pub enum SnRegisterMessage {
52 RegisterSn {
53 subject_id: DigestIdentifier,
54 gov_version: u64,
55 sn: u64,
56 },
57 GetSn {
58 subject_id: DigestIdentifier,
59 gov_version: u64,
60 },
61}
62
63impl Message for SnRegisterMessage {
64 fn is_critical(&self) -> bool {
65 matches!(self, Self::RegisterSn { .. })
66 }
67}
68
69#[derive(Clone, Debug, Serialize, Deserialize)]
70pub enum SnLimit {
71 Sn(u64),
72 LastSn,
73 NotSn,
74}
75
76#[derive(Clone, Debug, Serialize, Deserialize)]
77pub enum SnRegisterResponse {
78 Ok,
79 Sn(SnLimit),
80}
81
82impl Response for SnRegisterResponse {}
83
84#[derive(
85 Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
86)]
87pub enum SnRegisterEvent {
88 RegisterSn {
89 subject_id: DigestIdentifier,
90 gov_version: u64,
91 sn: u64,
92 },
93}
94
95impl Event for SnRegisterEvent {}
96
97#[async_trait]
98impl Actor for SnRegister {
99 type Message = SnRegisterMessage;
100 type Event = SnRegisterEvent;
101 type Response = SnRegisterResponse;
102
103 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
104 parent_span.map_or_else(
105 || info_span!("SnRegister"),
106 |parent_span| info_span!(parent: parent_span, "SnRegister"),
107 )
108 }
109
110 async fn pre_start(
111 &mut self,
112 ctx: &mut ave_actors::ActorContext<Self>,
113 ) -> Result<(), ActorError> {
114 let prefix = ctx.path().parent().key();
115 if let Err(e) = self
116 .init_store("sn_register", Some(prefix), false, ctx)
117 .await
118 {
119 error!(
120 error = %e,
121 "Failed to initialize sn_register store"
122 );
123 return Err(e);
124 }
125 Ok(())
126 }
127}
128
129#[async_trait]
130impl Handler<Self> for SnRegister {
131 async fn handle_message(
132 &mut self,
133 _sender: ActorPath,
134 msg: SnRegisterMessage,
135 ctx: &mut ave_actors::ActorContext<Self>,
136 ) -> Result<SnRegisterResponse, ActorError> {
137 match msg {
138 SnRegisterMessage::GetSn {
139 subject_id,
140 gov_version,
141 } => {
142 let response = if let Some(gov_version_register) =
143 self.register.get(&subject_id)
144 && let Some(last) = gov_version_register.last()
145 {
146 if gov_version > *last.0 {
147 SnRegisterResponse::Sn(SnLimit::LastSn)
148 } else if let Some(sn) =
149 gov_version_register.get_prev_or_equal(gov_version)
150 {
151 SnRegisterResponse::Sn(SnLimit::Sn(sn))
152 } else {
153 SnRegisterResponse::Sn(SnLimit::NotSn)
154 }
155 } else {
156 SnRegisterResponse::Sn(SnLimit::NotSn)
157 };
158
159 debug!(
160 msg_type = "GetSn",
161 subject_id = %subject_id,
162 gov_version = gov_version,
163 "Sn lookup completed"
164 );
165
166 Ok(response)
167 }
168 SnRegisterMessage::RegisterSn {
169 subject_id,
170 gov_version,
171 sn,
172 } => {
173 self.on_event(
174 SnRegisterEvent::RegisterSn {
175 subject_id: subject_id.clone(),
176 gov_version,
177 sn,
178 },
179 ctx,
180 )
181 .await;
182
183 debug!(
184 msg_type = "RegisterSn",
185 subject_id = %subject_id,
186 gov_version = gov_version,
187 sn = sn,
188 "Sn registered"
189 );
190
191 Ok(SnRegisterResponse::Ok)
192 }
193 }
194 }
195
196 async fn on_event(
197 &mut self,
198 event: SnRegisterEvent,
199 ctx: &mut ActorContext<Self>,
200 ) {
201 if let Err(e) = self.persist(&event, ctx).await {
202 error!(
203 event = ?event,
204 error = %e,
205 "Failed to persist sn register event"
206 );
207 emit_fail(ctx, e).await;
208 }
209 }
210}
211
212#[async_trait]
213impl PersistentActor for SnRegister {
214 type Persistence = LightPersistence;
215 type InitParams = ();
216
217 fn create_initial(_params: Self::InitParams) -> Self {
218 Self::default()
219 }
220
221 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
223 match event {
224 SnRegisterEvent::RegisterSn {
225 subject_id,
226 gov_version,
227 sn,
228 } => {
229 self.register
230 .entry(subject_id.to_owned())
231 .or_default()
232 .insert(*gov_version, *sn);
233
234 debug!(
235 event_type = "RegisterSn",
236 subject_id = %subject_id,
237 gov_version = gov_version,
238 sn = sn,
239 "Sn register state updated"
240 );
241 }
242 };
243
244 Ok(())
245 }
246}
247
248#[async_trait]
249impl Storable for SnRegister {}