1use std::collections::{HashMap, HashSet};
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6 Response,
7};
8use ave_actors::{LightPersistence, PersistentActor};
9use ave_common::SchemaType;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use borsh::{BorshDeserialize, BorshSerialize};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::model::common::CeilingMap;
16use crate::{
17 db::Storable,
18 governance::model::CreatorQuantity,
19 model::common::{emit_fail, purge_storage},
20};
21
22#[derive(
23 Clone,
24 Debug,
25 Serialize,
26 Deserialize,
27 Hash,
28 PartialEq,
29 Eq,
30 Ord,
31 PartialOrd,
32 BorshDeserialize,
33 BorshSerialize,
34)]
35pub struct OwnerSchema {
36 pub owner: PublicKey,
37 pub schema_id: SchemaType,
38 pub namespace: String,
39}
40
41#[derive(
42 Clone,
43 Debug,
44 Serialize,
45 Deserialize,
46 Default,
47 BorshDeserialize,
48 BorshSerialize,
49)]
50pub struct SubjectRegister {
51 register: HashMap<RegisterData, (RegisterCreations, RegisterSubjects)>,
52}
53
54type RegisterData = (PublicKey, SchemaType, String);
55type RegisterCreations = CeilingMap<CreatorQuantity>;
56type RegisterSubjects = HashSet<DigestIdentifier>;
57
58impl SubjectRegister {
59 fn check(
60 &self,
61 creator: &PublicKey,
62 namespace: &str,
63 schema_id: &SchemaType,
64 gov_version: u64,
65 ) -> Result<(), ActorError> {
66 if let Some((creator_quantity, subjects)) = self.register.get(&(
67 creator.clone(),
68 schema_id.clone(),
69 namespace.to_owned(),
70 )) {
71 if let Some(quantity) =
72 creator_quantity.get_prev_or_equal(gov_version)
73 {
74 match quantity {
75 CreatorQuantity::Quantity(quantity) => {
76 if subjects.len() + 1 > quantity as usize {
77 return Err(ActorError::Functional {
78 description:
79 "Maximum number of subjects reached"
80 .to_owned(),
81 });
82 }
83 }
84 CreatorQuantity::Infinity => {}
85 };
86 Ok(())
87 } else {
88 Err(ActorError::Functional {
89 description: "Can not get Creator Quantity".to_owned(),
90 })
91 }
92 } else {
93 Err(ActorError::Functional {
94 description: "Is not a Creator".to_owned(),
95 })
96 }
97 }
98}
99
100#[derive(Clone, Debug, Serialize, Deserialize)]
101pub enum SubjectRegisterMessage {
102 PurgeStorage,
103 Check {
104 creator: PublicKey,
105 gov_version: u64,
106 namespace: String,
107 schema_id: SchemaType,
108 },
109 GetSubjectsByOwnerSchema {
110 owner: PublicKey,
111 schema_id: SchemaType,
112 namespace: String,
113 },
114 RegisterData {
115 gov_version: u64,
116 data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)>,
117 },
118 CreateSubject {
119 creator: PublicKey,
120 subject_id: DigestIdentifier,
121 namespace: String,
122 schema_id: SchemaType,
123 gov_version: u64,
124 },
125 DeleteSubject {
126 subject_id: DigestIdentifier,
127 },
128 UpdateSubject {
129 new_owner: PublicKey,
130 old_owner: PublicKey,
131 subject_id: DigestIdentifier,
132 namespace: String,
133 schema_id: SchemaType,
134 gov_version: u64,
135 },
136}
137
138impl Message for SubjectRegisterMessage {
139 fn is_critical(&self) -> bool {
140 match self {
141 Self::PurgeStorage
142 | Self::RegisterData { .. }
143 | Self::CreateSubject { .. }
144 | Self::DeleteSubject { .. }
145 | Self::UpdateSubject { .. } => true,
146 Self::Check { .. } | Self::GetSubjectsByOwnerSchema { .. } => false,
147 }
148 }
149}
150
151#[derive(Clone, Debug, Serialize, Deserialize)]
152pub enum SubjectRegisterResponse {
153 Ok,
154 Subjects(Vec<DigestIdentifier>),
155}
156
157impl Response for SubjectRegisterResponse {}
158
159#[derive(
160 Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
161)]
162pub enum SubjectRegisterEvent {
163 RegisterData {
164 gov_version: u64,
165 data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)>,
166 },
167 CreateSubject {
168 creator: PublicKey,
169 subject_id: DigestIdentifier,
170 namespace: String,
171 schema_id: SchemaType,
172 },
173 DeleteSubject {
174 subject_id: DigestIdentifier,
175 },
176 UpdateSubject {
177 new_owner: PublicKey,
178 old_owner: PublicKey,
179 subject_id: DigestIdentifier,
180 namespace: String,
181 schema_id: SchemaType,
182 },
183}
184
185impl Event for SubjectRegisterEvent {}
186
187#[async_trait]
188impl Actor for SubjectRegister {
189 type Message = SubjectRegisterMessage;
190 type Event = SubjectRegisterEvent;
191 type Response = SubjectRegisterResponse;
192
193 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
194 parent_span.map_or_else(
195 || info_span!("SubjectRegister"),
196 |parent_span| info_span!(parent: parent_span, "SubjectRegister"),
197 )
198 }
199
200 async fn pre_start(
201 &mut self,
202 ctx: &mut ave_actors::ActorContext<Self>,
203 ) -> Result<(), ActorError> {
204 let prefix = ctx.path().parent().key();
205 if let Err(e) = self
206 .init_store("subject_register", Some(prefix), false, ctx)
207 .await
208 {
209 error!(
210 error = %e,
211 "Failed to initialize subject_register store"
212 );
213 return Err(e);
214 }
215 Ok(())
216 }
217}
218
219#[async_trait]
220impl Handler<Self> for SubjectRegister {
221 async fn handle_message(
222 &mut self,
223 _sender: ActorPath,
224 msg: SubjectRegisterMessage,
225 ctx: &mut ave_actors::ActorContext<Self>,
226 ) -> Result<SubjectRegisterResponse, ActorError> {
227 match msg {
228 SubjectRegisterMessage::PurgeStorage => {
229 purge_storage(ctx).await?;
230
231 debug!(
232 msg_type = "PurgeStorage",
233 "Subject register storage purged"
234 );
235
236 return Ok(SubjectRegisterResponse::Ok);
237 }
238 SubjectRegisterMessage::GetSubjectsByOwnerSchema {
239 owner,
240 schema_id,
241 namespace,
242 } => {
243 let subjects = self
244 .register
245 .get(&(owner, schema_id, namespace))
246 .map(|(_, subjects)| {
247 let mut subjects =
248 subjects.iter().cloned().collect::<Vec<_>>();
249 subjects.sort();
250 subjects
251 })
252 .unwrap_or_default();
253
254 return Ok(SubjectRegisterResponse::Subjects(subjects));
255 }
256 SubjectRegisterMessage::RegisterData { gov_version, data } => {
257 let data_count = data.len();
258 self.on_event(
259 SubjectRegisterEvent::RegisterData { gov_version, data },
260 ctx,
261 )
262 .await;
263
264 debug!(
265 msg_type = "RegisterData",
266 gov_version = gov_version,
267 data_count = data_count,
268 "Creator data registered"
269 );
270
271 Ok(SubjectRegisterResponse::Ok)
272 }
273 SubjectRegisterMessage::CreateSubject {
274 creator,
275 subject_id,
276 namespace,
277 schema_id,
278 gov_version,
279 } => {
280 self.check(&creator, &namespace, &schema_id, gov_version)?;
281
282 self.on_event(
283 SubjectRegisterEvent::CreateSubject {
284 creator: creator.clone(),
285 subject_id: subject_id.clone(),
286 namespace: namespace.clone(),
287 schema_id: schema_id.clone(),
288 },
289 ctx,
290 )
291 .await;
292
293 debug!(
294 msg_type = "CreateSubject",
295 subject_id = %subject_id,
296 creator = %creator,
297 schema_id = ?schema_id,
298 "Subject created in register"
299 );
300
301 Ok(SubjectRegisterResponse::Ok)
302 }
303 SubjectRegisterMessage::DeleteSubject { subject_id } => {
304 self.on_event(
305 SubjectRegisterEvent::DeleteSubject {
306 subject_id: subject_id.clone(),
307 },
308 ctx,
309 )
310 .await;
311
312 debug!(
313 msg_type = "DeleteSubject",
314 subject_id = %subject_id,
315 "Subject removed from register"
316 );
317
318 Ok(SubjectRegisterResponse::Ok)
319 }
320 SubjectRegisterMessage::UpdateSubject {
321 new_owner,
322 old_owner,
323 subject_id,
324 namespace,
325 schema_id,
326 gov_version,
327 } => {
328 self.check(&new_owner, &namespace, &schema_id, gov_version)?;
329 self.on_event(
330 SubjectRegisterEvent::UpdateSubject {
331 new_owner: new_owner.clone(),
332 old_owner: old_owner.clone(),
333 subject_id: subject_id.clone(),
334 namespace: namespace.clone(),
335 schema_id: schema_id.clone(),
336 },
337 ctx,
338 )
339 .await;
340
341 debug!(
342 msg_type = "UpdateSubject",
343 subject_id = %subject_id,
344 old_owner = %old_owner,
345 new_owner = %new_owner,
346 "Subject ownership updated"
347 );
348
349 Ok(SubjectRegisterResponse::Ok)
350 }
351 SubjectRegisterMessage::Check {
352 creator,
353 gov_version,
354 namespace,
355 schema_id,
356 } => {
357 self.check(&creator, &namespace, &schema_id, gov_version)?;
358
359 debug!(
360 msg_type = "Check",
361 creator = %creator,
362 schema_id = ?schema_id,
363 "Creator check passed"
364 );
365
366 Ok(SubjectRegisterResponse::Ok)
367 }
368 }
369 }
370
371 async fn on_event(
372 &mut self,
373 event: SubjectRegisterEvent,
374 ctx: &mut ActorContext<Self>,
375 ) {
376 if let Err(e) = self.persist(&event, ctx).await {
377 error!(
378 event = ?event,
379 error = %e,
380 "Failed to persist subject register event"
381 );
382 emit_fail(ctx, e).await;
383 }
384 }
385}
386
387#[async_trait]
388impl PersistentActor for SubjectRegister {
389 type Persistence = LightPersistence;
390 type InitParams = ();
391
392 fn create_initial(_params: Self::InitParams) -> Self {
393 Self::default()
394 }
395
396 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
398 match event {
399 SubjectRegisterEvent::RegisterData { gov_version, data } => {
400 for (creator, schema_id, namespace, quantity) in data.iter() {
401 self.register
402 .entry((
403 creator.to_owned(),
404 schema_id.to_owned(),
405 namespace.to_owned(),
406 ))
407 .or_insert_with(|| (CeilingMap::new(), HashSet::new()))
408 .0
409 .insert(*gov_version, quantity.to_owned());
410 }
411
412 debug!(
413 event_type = "RegisterData",
414 gov_version = gov_version,
415 data_count = data.len(),
416 "Creator data applied to state"
417 );
418 }
419 SubjectRegisterEvent::CreateSubject {
420 creator,
421 subject_id,
422 namespace,
423 schema_id,
424 } => {
425 self.register
426 .entry((
427 creator.to_owned(),
428 schema_id.to_owned(),
429 namespace.to_owned(),
430 ))
431 .or_insert_with(|| (CeilingMap::new(), HashSet::new()))
432 .1
433 .insert(subject_id.to_owned());
434
435 debug!(
436 event_type = "CreateSubject",
437 subject_id = %subject_id,
438 creator = %creator,
439 "Subject added to register state"
440 );
441 }
442 SubjectRegisterEvent::DeleteSubject { subject_id } => {
443 for (_, subjects) in self.register.values_mut() {
444 subjects.remove(subject_id);
445 }
446
447 debug!(
448 event_type = "DeleteSubject",
449 subject_id = %subject_id,
450 "Subject removed from register state"
451 );
452 }
453 SubjectRegisterEvent::UpdateSubject {
454 new_owner,
455 old_owner,
456 subject_id,
457 namespace,
458 schema_id,
459 } => {
460 self.register
461 .entry((
462 new_owner.to_owned(),
463 schema_id.to_owned(),
464 namespace.to_owned(),
465 ))
466 .or_insert_with(|| (CeilingMap::new(), HashSet::new()))
467 .1
468 .insert(subject_id.to_owned());
469
470 self.register
471 .entry((
472 old_owner.to_owned(),
473 schema_id.to_owned(),
474 namespace.to_owned(),
475 ))
476 .or_insert_with(|| (CeilingMap::new(), HashSet::new()))
477 .1
478 .remove(subject_id);
479
480 debug!(
481 event_type = "UpdateSubject",
482 subject_id = %subject_id,
483 old_owner = %old_owner,
484 new_owner = %new_owner,
485 "Subject ownership updated in state"
486 );
487 }
488 };
489
490 Ok(())
491 }
492}
493
494#[async_trait]
495impl Storable for SubjectRegister {}