Skip to main content

ave_core/governance/
subject_register.rs

1use std::collections::{HashMap, HashSet};
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6    Response,
7};
8use ave_actors::{LightPersistence, PersistentActor};
9use ave_common::SchemaType;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use borsh::{BorshDeserialize, BorshSerialize};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::model::common::CeilingMap;
16use crate::{
17    db::Storable,
18    governance::model::CreatorQuantity,
19    model::common::{emit_fail, purge_storage},
20};
21
22#[derive(
23    Clone,
24    Debug,
25    Serialize,
26    Deserialize,
27    Hash,
28    PartialEq,
29    Eq,
30    Ord,
31    PartialOrd,
32    BorshDeserialize,
33    BorshSerialize,
34)]
35pub struct OwnerSchema {
36    pub owner: PublicKey,
37    pub schema_id: SchemaType,
38    pub namespace: String,
39}
40
41#[derive(
42    Clone,
43    Debug,
44    Serialize,
45    Deserialize,
46    Default,
47    BorshDeserialize,
48    BorshSerialize,
49)]
50pub struct SubjectRegister {
51    register: HashMap<RegisterData, (RegisterCreations, RegisterSubjects)>,
52}
53
54type RegisterData = (PublicKey, SchemaType, String);
55type RegisterCreations = CeilingMap<CreatorQuantity>;
56type RegisterSubjects = HashSet<DigestIdentifier>;
57
58impl SubjectRegister {
59    fn check(
60        &self,
61        creator: &PublicKey,
62        namespace: &str,
63        schema_id: &SchemaType,
64        gov_version: u64,
65    ) -> Result<(), ActorError> {
66        if let Some((creator_quantity, subjects)) = self.register.get(&(
67            creator.clone(),
68            schema_id.clone(),
69            namespace.to_owned(),
70        )) {
71            if let Some(quantity) =
72                creator_quantity.get_prev_or_equal(gov_version)
73            {
74                match quantity {
75                    CreatorQuantity::Quantity(quantity) => {
76                        if subjects.len() + 1 > quantity as usize {
77                            return Err(ActorError::Functional {
78                                description:
79                                    "Maximum number of subjects reached"
80                                        .to_owned(),
81                            });
82                        }
83                    }
84                    CreatorQuantity::Infinity => {}
85                };
86                Ok(())
87            } else {
88                Err(ActorError::Functional {
89                    description: "Can not get Creator Quantity".to_owned(),
90                })
91            }
92        } else {
93            Err(ActorError::Functional {
94                description: "Is not a Creator".to_owned(),
95            })
96        }
97    }
98}
99
100#[derive(Clone, Debug, Serialize, Deserialize)]
101pub enum SubjectRegisterMessage {
102    PurgeStorage,
103    Check {
104        creator: PublicKey,
105        gov_version: u64,
106        namespace: String,
107        schema_id: SchemaType,
108    },
109    GetSubjectsByOwnerSchema {
110        owner: PublicKey,
111        schema_id: SchemaType,
112        namespace: String,
113    },
114    RegisterData {
115        gov_version: u64,
116        data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)>,
117    },
118    CreateSubject {
119        creator: PublicKey,
120        subject_id: DigestIdentifier,
121        namespace: String,
122        schema_id: SchemaType,
123        gov_version: u64,
124    },
125    DeleteSubject {
126        subject_id: DigestIdentifier,
127    },
128    UpdateSubject {
129        new_owner: PublicKey,
130        old_owner: PublicKey,
131        subject_id: DigestIdentifier,
132        namespace: String,
133        schema_id: SchemaType,
134        gov_version: u64,
135    },
136}
137
138impl Message for SubjectRegisterMessage {
139    fn is_critical(&self) -> bool {
140        match self {
141            Self::PurgeStorage
142            | Self::RegisterData { .. }
143            | Self::CreateSubject { .. }
144            | Self::DeleteSubject { .. }
145            | Self::UpdateSubject { .. } => true,
146            Self::Check { .. } | Self::GetSubjectsByOwnerSchema { .. } => false,
147        }
148    }
149}
150
151#[derive(Clone, Debug, Serialize, Deserialize)]
152pub enum SubjectRegisterResponse {
153    Ok,
154    Subjects(Vec<DigestIdentifier>),
155}
156
157impl Response for SubjectRegisterResponse {}
158
159#[derive(
160    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
161)]
162pub enum SubjectRegisterEvent {
163    RegisterData {
164        gov_version: u64,
165        data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)>,
166    },
167    CreateSubject {
168        creator: PublicKey,
169        subject_id: DigestIdentifier,
170        namespace: String,
171        schema_id: SchemaType,
172    },
173    DeleteSubject {
174        subject_id: DigestIdentifier,
175    },
176    UpdateSubject {
177        new_owner: PublicKey,
178        old_owner: PublicKey,
179        subject_id: DigestIdentifier,
180        namespace: String,
181        schema_id: SchemaType,
182    },
183}
184
185impl Event for SubjectRegisterEvent {}
186
187#[async_trait]
188impl Actor for SubjectRegister {
189    type Message = SubjectRegisterMessage;
190    type Event = SubjectRegisterEvent;
191    type Response = SubjectRegisterResponse;
192
193    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
194        parent_span.map_or_else(
195            || info_span!("SubjectRegister"),
196            |parent_span| info_span!(parent: parent_span, "SubjectRegister"),
197        )
198    }
199
200    async fn pre_start(
201        &mut self,
202        ctx: &mut ave_actors::ActorContext<Self>,
203    ) -> Result<(), ActorError> {
204        let prefix = ctx.path().parent().key();
205        if let Err(e) = self
206            .init_store("subject_register", Some(prefix), false, ctx)
207            .await
208        {
209            error!(
210                error = %e,
211                "Failed to initialize subject_register store"
212            );
213            return Err(e);
214        }
215        Ok(())
216    }
217}
218
219#[async_trait]
220impl Handler<Self> for SubjectRegister {
221    async fn handle_message(
222        &mut self,
223        _sender: ActorPath,
224        msg: SubjectRegisterMessage,
225        ctx: &mut ave_actors::ActorContext<Self>,
226    ) -> Result<SubjectRegisterResponse, ActorError> {
227        match msg {
228            SubjectRegisterMessage::PurgeStorage => {
229                purge_storage(ctx).await?;
230
231                debug!(
232                    msg_type = "PurgeStorage",
233                    "Subject register storage purged"
234                );
235
236                return Ok(SubjectRegisterResponse::Ok);
237            }
238            SubjectRegisterMessage::GetSubjectsByOwnerSchema {
239                owner,
240                schema_id,
241                namespace,
242            } => {
243                let subjects = self
244                    .register
245                    .get(&(owner, schema_id, namespace))
246                    .map(|(_, subjects)| {
247                        let mut subjects =
248                            subjects.iter().cloned().collect::<Vec<_>>();
249                        subjects.sort();
250                        subjects
251                    })
252                    .unwrap_or_default();
253
254                return Ok(SubjectRegisterResponse::Subjects(subjects));
255            }
256            SubjectRegisterMessage::RegisterData { gov_version, data } => {
257                let data_count = data.len();
258                self.on_event(
259                    SubjectRegisterEvent::RegisterData { gov_version, data },
260                    ctx,
261                )
262                .await;
263
264                debug!(
265                    msg_type = "RegisterData",
266                    gov_version = gov_version,
267                    data_count = data_count,
268                    "Creator data registered"
269                );
270
271                Ok(SubjectRegisterResponse::Ok)
272            }
273            SubjectRegisterMessage::CreateSubject {
274                creator,
275                subject_id,
276                namespace,
277                schema_id,
278                gov_version,
279            } => {
280                self.check(&creator, &namespace, &schema_id, gov_version)?;
281
282                self.on_event(
283                    SubjectRegisterEvent::CreateSubject {
284                        creator: creator.clone(),
285                        subject_id: subject_id.clone(),
286                        namespace: namespace.clone(),
287                        schema_id: schema_id.clone(),
288                    },
289                    ctx,
290                )
291                .await;
292
293                debug!(
294                    msg_type = "CreateSubject",
295                    subject_id = %subject_id,
296                    creator = %creator,
297                    schema_id = ?schema_id,
298                    "Subject created in register"
299                );
300
301                Ok(SubjectRegisterResponse::Ok)
302            }
303            SubjectRegisterMessage::DeleteSubject { subject_id } => {
304                self.on_event(
305                    SubjectRegisterEvent::DeleteSubject {
306                        subject_id: subject_id.clone(),
307                    },
308                    ctx,
309                )
310                .await;
311
312                debug!(
313                    msg_type = "DeleteSubject",
314                    subject_id = %subject_id,
315                    "Subject removed from register"
316                );
317
318                Ok(SubjectRegisterResponse::Ok)
319            }
320            SubjectRegisterMessage::UpdateSubject {
321                new_owner,
322                old_owner,
323                subject_id,
324                namespace,
325                schema_id,
326                gov_version,
327            } => {
328                self.check(&new_owner, &namespace, &schema_id, gov_version)?;
329                self.on_event(
330                    SubjectRegisterEvent::UpdateSubject {
331                        new_owner: new_owner.clone(),
332                        old_owner: old_owner.clone(),
333                        subject_id: subject_id.clone(),
334                        namespace: namespace.clone(),
335                        schema_id: schema_id.clone(),
336                    },
337                    ctx,
338                )
339                .await;
340
341                debug!(
342                    msg_type = "UpdateSubject",
343                    subject_id = %subject_id,
344                    old_owner = %old_owner,
345                    new_owner = %new_owner,
346                    "Subject ownership updated"
347                );
348
349                Ok(SubjectRegisterResponse::Ok)
350            }
351            SubjectRegisterMessage::Check {
352                creator,
353                gov_version,
354                namespace,
355                schema_id,
356            } => {
357                self.check(&creator, &namespace, &schema_id, gov_version)?;
358
359                debug!(
360                    msg_type = "Check",
361                    creator = %creator,
362                    schema_id = ?schema_id,
363                    "Creator check passed"
364                );
365
366                Ok(SubjectRegisterResponse::Ok)
367            }
368        }
369    }
370
371    async fn on_event(
372        &mut self,
373        event: SubjectRegisterEvent,
374        ctx: &mut ActorContext<Self>,
375    ) {
376        if let Err(e) = self.persist(&event, ctx).await {
377            error!(
378                event = ?event,
379                error = %e,
380                "Failed to persist subject register event"
381            );
382            emit_fail(ctx, e).await;
383        }
384    }
385}
386
387#[async_trait]
388impl PersistentActor for SubjectRegister {
389    type Persistence = LightPersistence;
390    type InitParams = ();
391
392    fn create_initial(_params: Self::InitParams) -> Self {
393        Self::default()
394    }
395
396    /// Change node state.
397    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
398        match event {
399            SubjectRegisterEvent::RegisterData { gov_version, data } => {
400                for (creator, schema_id, namespace, quantity) in data.iter() {
401                    self.register
402                        .entry((
403                            creator.to_owned(),
404                            schema_id.to_owned(),
405                            namespace.to_owned(),
406                        ))
407                        .or_insert_with(|| (CeilingMap::new(), HashSet::new()))
408                        .0
409                        .insert(*gov_version, quantity.to_owned());
410                }
411
412                debug!(
413                    event_type = "RegisterData",
414                    gov_version = gov_version,
415                    data_count = data.len(),
416                    "Creator data applied to state"
417                );
418            }
419            SubjectRegisterEvent::CreateSubject {
420                creator,
421                subject_id,
422                namespace,
423                schema_id,
424            } => {
425                self.register
426                    .entry((
427                        creator.to_owned(),
428                        schema_id.to_owned(),
429                        namespace.to_owned(),
430                    ))
431                    .or_insert_with(|| (CeilingMap::new(), HashSet::new()))
432                    .1
433                    .insert(subject_id.to_owned());
434
435                debug!(
436                    event_type = "CreateSubject",
437                    subject_id = %subject_id,
438                    creator = %creator,
439                    "Subject added to register state"
440                );
441            }
442            SubjectRegisterEvent::DeleteSubject { subject_id } => {
443                for (_, subjects) in self.register.values_mut() {
444                    subjects.remove(subject_id);
445                }
446
447                debug!(
448                    event_type = "DeleteSubject",
449                    subject_id = %subject_id,
450                    "Subject removed from register state"
451                );
452            }
453            SubjectRegisterEvent::UpdateSubject {
454                new_owner,
455                old_owner,
456                subject_id,
457                namespace,
458                schema_id,
459            } => {
460                self.register
461                    .entry((
462                        new_owner.to_owned(),
463                        schema_id.to_owned(),
464                        namespace.to_owned(),
465                    ))
466                    .or_insert_with(|| (CeilingMap::new(), HashSet::new()))
467                    .1
468                    .insert(subject_id.to_owned());
469
470                self.register
471                    .entry((
472                        old_owner.to_owned(),
473                        schema_id.to_owned(),
474                        namespace.to_owned(),
475                    ))
476                    .or_insert_with(|| (CeilingMap::new(), HashSet::new()))
477                    .1
478                    .remove(subject_id);
479
480                debug!(
481                    event_type = "UpdateSubject",
482                    subject_id = %subject_id,
483                    old_owner = %old_owner,
484                    new_owner = %new_owner,
485                    "Subject ownership updated in state"
486                );
487            }
488        };
489
490        Ok(())
491    }
492}
493
494#[async_trait]
495impl Storable for SubjectRegister {}