Skip to main content

ave_core/subject/
mod.rs

1//! # Subject module.
2//!
3
4use std::{
5    collections::{BTreeSet, HashSet},
6    ops::Deref,
7};
8
9use crate::{
10    governance::{
11        Governance,
12        data::GovernanceData,
13        model::Quorum,
14        role_register::{RoleDataRegister, SearchRole},
15    },
16    model::{
17        common::{
18            check_quorum_signers, get_n_events, get_validation_roles_register,
19        },
20        event::{Ledger, LedgerSeal, Protocols, ValidationMetadata},
21    },
22    node::register::{Register, RegisterMessage},
23    tracker::Tracker,
24    validation::{
25        request::{ActualProtocols, LastData, ValidationReq},
26        response::ValidationRes,
27    },
28};
29
30use error::SubjectError;
31
32use ave_actors::{
33    Actor, ActorContext, ActorError, ActorPath, Event, PersistentActor,
34};
35use ave_common::{
36    DataToSink, DataToSinkEvent, Namespace, SchemaType, ValueWrapper,
37    identity::{
38        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
39    },
40    request::EventRequest,
41    response::{
42        SinkEventsPage, SubjectDB, TrackerEventVisibilityDB,
43        TrackerEventVisibilityRangeDB, TrackerStoredVisibilityDB,
44        TrackerStoredVisibilityRangeDB, TrackerVisibilityModeDB,
45        TrackerVisibilityStateDB,
46    },
47};
48
49use async_trait::async_trait;
50use borsh::{BorshDeserialize, BorshSerialize};
51use json_patch::{Patch, patch};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use sinkdata::{SinkData, SinkDataMessage};
55use tracing::{debug, error};
56
57pub mod error;
58pub mod sinkdata;
59
60impl Event for Ledger {}
61
62#[derive(
63    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
64)]
65pub struct RequestSubjectData {
66    pub subject_id: DigestIdentifier,
67    pub governance_id: DigestIdentifier,
68    pub namespace: Namespace,
69    pub schema_id: SchemaType,
70    pub sn: u64,
71    pub gov_version: u64,
72    pub signer: PublicKey,
73}
74
75pub struct VerifyNewLedgerEvent<'a> {
76    pub new_ledger_event: &'a Ledger,
77    pub subject_metadata: Metadata,
78    pub actual_ledger_event_hash: DigestIdentifier,
79    pub last_data: LastData,
80    pub hash: &'a HashAlgorithm,
81    pub full_view: bool,
82    pub only_clear_events: bool,
83}
84
85/// Subject metadata.
86#[derive(
87    Debug,
88    Clone,
89    Serialize,
90    Deserialize,
91    BorshSerialize,
92    BorshDeserialize,
93    PartialEq,
94    Eq,
95    Hash,
96)]
97pub struct Metadata {
98    pub name: Option<String>,
99    pub description: Option<String>,
100    /// The identifier of the subject of the event.
101    pub subject_id: DigestIdentifier,
102    /// The identifier of the governance contract.
103    pub governance_id: DigestIdentifier,
104    pub genesis_gov_version: u64,
105    pub prev_ledger_event_hash: DigestIdentifier,
106    /// The identifier of the schema_id used to validate the event.
107    pub schema_id: SchemaType,
108    /// The namespace of the subject.
109    pub namespace: Namespace,
110    /// The current sequence number of the subject.
111    pub sn: u64,
112    /// The identifier of the public key of the creator owner.
113    pub creator: PublicKey,
114    /// The identifier of the public key of the subject owner.
115    pub owner: PublicKey,
116    pub new_owner: Option<PublicKey>,
117    /// Indicates whether the subject is active or not.
118    pub active: bool,
119    /// The current status of the subject.
120    pub properties: ValueWrapper,
121}
122
123#[derive(
124    Debug,
125    Clone,
126    Serialize,
127    Deserialize,
128    BorshSerialize,
129    BorshDeserialize,
130    PartialEq,
131    Eq,
132    Hash,
133)]
134pub struct MetadataWithoutProperties {
135    pub name: Option<String>,
136    pub description: Option<String>,
137    pub subject_id: DigestIdentifier,
138    pub governance_id: DigestIdentifier,
139    pub genesis_gov_version: u64,
140    pub prev_ledger_event_hash: DigestIdentifier,
141    pub schema_id: SchemaType,
142    pub namespace: Namespace,
143    pub sn: u64,
144    pub creator: PublicKey,
145    pub owner: PublicKey,
146    pub new_owner: Option<PublicKey>,
147    pub active: bool,
148}
149
150impl From<Metadata> for MetadataWithoutProperties {
151    fn from(value: Metadata) -> Self {
152        Self {
153            name: value.name,
154            description: value.description,
155            subject_id: value.subject_id,
156            governance_id: value.governance_id,
157            genesis_gov_version: value.genesis_gov_version,
158            prev_ledger_event_hash: value.prev_ledger_event_hash,
159            schema_id: value.schema_id,
160            namespace: value.namespace,
161            sn: value.sn,
162            creator: value.creator,
163            owner: value.owner,
164            new_owner: value.new_owner,
165            active: value.active,
166        }
167    }
168}
169
170impl From<Governance> for Metadata {
171    fn from(value: Governance) -> Self {
172        Self {
173            name: value.subject_metadata.name,
174            description: value.subject_metadata.description,
175            subject_id: value.subject_metadata.subject_id.clone(),
176            governance_id: value.subject_metadata.subject_id,
177            genesis_gov_version: 0,
178            prev_ledger_event_hash: value
179                .subject_metadata
180                .prev_ledger_event_hash,
181            schema_id: value.subject_metadata.schema_id,
182            namespace: Namespace::new(),
183            sn: value.subject_metadata.sn,
184            creator: value.subject_metadata.creator,
185            owner: value.subject_metadata.owner,
186            new_owner: value.subject_metadata.new_owner,
187            active: value.subject_metadata.active,
188            properties: value.properties.to_value_wrapper(),
189        }
190    }
191}
192
193impl From<Tracker> for Metadata {
194    fn from(value: Tracker) -> Self {
195        Self {
196            name: value.subject_metadata.name,
197            description: value.subject_metadata.description,
198            subject_id: value.subject_metadata.subject_id,
199            governance_id: value.governance_id,
200            genesis_gov_version: value.genesis_gov_version,
201            prev_ledger_event_hash: value
202                .subject_metadata
203                .prev_ledger_event_hash,
204            schema_id: value.subject_metadata.schema_id,
205            namespace: value.namespace,
206            sn: value.subject_metadata.sn,
207            creator: value.subject_metadata.creator,
208            owner: value.subject_metadata.owner,
209            new_owner: value.subject_metadata.new_owner,
210            active: value.subject_metadata.active,
211            properties: value.properties,
212        }
213    }
214}
215
216impl From<Governance> for SubjectDB {
217    fn from(value: Governance) -> Self {
218        Self {
219            name: value.subject_metadata.name,
220            description: value.subject_metadata.description,
221            subject_id: value.subject_metadata.subject_id.to_string(),
222            governance_id: value.subject_metadata.subject_id.to_string(),
223            genesis_gov_version: 0,
224            prev_ledger_event_hash: if value
225                .subject_metadata
226                .prev_ledger_event_hash
227                .is_empty()
228            {
229                None
230            } else {
231                Some(value.subject_metadata.prev_ledger_event_hash.to_string())
232            },
233            schema_id: value.subject_metadata.schema_id.to_string(),
234            namespace: Namespace::new().to_string(),
235            sn: value.subject_metadata.sn,
236            creator: value.subject_metadata.creator.to_string(),
237            owner: value.subject_metadata.owner.to_string(),
238            new_owner: value
239                .subject_metadata
240                .new_owner
241                .map(|owner| owner.to_string()),
242            active: value.subject_metadata.active,
243            tracker_visibility: None,
244            properties: value.properties.to_value_wrapper().0,
245        }
246    }
247}
248
249impl From<crate::model::common::TrackerVisibilityMode>
250    for TrackerVisibilityModeDB
251{
252    fn from(value: crate::model::common::TrackerVisibilityMode) -> Self {
253        match value {
254            crate::model::common::TrackerVisibilityMode::Full => Self::Full,
255            crate::model::common::TrackerVisibilityMode::Opaque => Self::Opaque,
256        }
257    }
258}
259
260impl From<crate::model::common::TrackerStoredVisibility>
261    for TrackerStoredVisibilityDB
262{
263    fn from(value: crate::model::common::TrackerStoredVisibility) -> Self {
264        match value {
265            crate::model::common::TrackerStoredVisibility::Full => Self::Full,
266            crate::model::common::TrackerStoredVisibility::Only(viewpoints) => {
267                Self::Only {
268                    viewpoints: viewpoints.into_iter().collect(),
269                }
270            }
271            crate::model::common::TrackerStoredVisibility::None => Self::None,
272        }
273    }
274}
275
276impl From<crate::model::common::TrackerStoredVisibilityRange>
277    for TrackerStoredVisibilityRangeDB
278{
279    fn from(value: crate::model::common::TrackerStoredVisibilityRange) -> Self {
280        Self {
281            from_sn: value.from_sn,
282            to_sn: value.to_sn,
283            visibility: value.visibility.into(),
284        }
285    }
286}
287
288impl From<crate::model::common::TrackerEventVisibility>
289    for TrackerEventVisibilityDB
290{
291    fn from(value: crate::model::common::TrackerEventVisibility) -> Self {
292        match value {
293            crate::model::common::TrackerEventVisibility::NonFact => {
294                Self::NonFact
295            }
296            crate::model::common::TrackerEventVisibility::Fact(viewpoints) => {
297                Self::Fact {
298                    viewpoints: viewpoints.into_iter().collect(),
299                }
300            }
301        }
302    }
303}
304
305impl From<crate::model::common::TrackerEventVisibilityRange>
306    for TrackerEventVisibilityRangeDB
307{
308    fn from(value: crate::model::common::TrackerEventVisibilityRange) -> Self {
309        Self {
310            from_sn: value.from_sn,
311            to_sn: value.to_sn,
312            visibility: value.visibility.into(),
313        }
314    }
315}
316
317impl From<crate::model::common::TrackerVisibilityState>
318    for TrackerVisibilityStateDB
319{
320    fn from(value: crate::model::common::TrackerVisibilityState) -> Self {
321        Self {
322            mode: value.mode.into(),
323            stored_ranges: value
324                .stored_ranges
325                .into_iter()
326                .map(Into::into)
327                .collect(),
328            event_ranges: value
329                .event_ranges
330                .into_iter()
331                .map(Into::into)
332                .collect(),
333        }
334    }
335}
336
337#[derive(Clone)]
338pub struct DataForSink {
339    pub gov_id: Option<String>,
340    pub subject_id: String,
341    pub sn: u64,
342    pub owner: String,
343    pub namespace: String,
344    pub schema_id: SchemaType,
345    pub issuer: String,
346    pub event_request_timestamp: u64,
347    pub event_ledger_timestamp: u64,
348    pub gov_version: u64,
349    pub event_data_ledger: EventLedgerDataForSink,
350}
351
352#[derive(Clone, Debug)]
353struct SinkReplayState {
354    governance_id: Option<String>,
355    subject_id: String,
356    owner: String,
357    new_owner: Option<String>,
358    namespace: String,
359    schema_id: SchemaType,
360}
361
362impl SinkReplayState {
363    fn from_metadata(metadata: &Metadata) -> Self {
364        Self {
365            governance_id: if metadata.schema_id.is_gov() {
366                None
367            } else {
368                Some(metadata.governance_id.to_string())
369            },
370            subject_id: metadata.subject_id.to_string(),
371            owner: metadata.owner.to_string(),
372            new_owner: metadata.new_owner.as_ref().map(ToString::to_string),
373            namespace: metadata.namespace.to_string(),
374            schema_id: metadata.schema_id.clone(),
375        }
376    }
377
378    fn data_for_sink(
379        &self,
380        event: &Ledger,
381        event_data_ledger: EventLedgerDataForSink,
382    ) -> DataForSink {
383        let (issuer, event_request_timestamp) =
384            event.get_issuer_event_request_timestamp();
385
386        DataForSink {
387            gov_id: self.governance_id.clone(),
388            subject_id: self.subject_id.clone(),
389            sn: event.sn,
390            owner: self.owner.clone(),
391            namespace: self.namespace.clone(),
392            schema_id: self.schema_id.clone(),
393            issuer,
394            event_request_timestamp,
395            event_ledger_timestamp: event
396                .ledger_seal_signature
397                .timestamp
398                .as_nanos(),
399            gov_version: event.gov_version,
400            event_data_ledger,
401        }
402    }
403
404    fn build_replay_data_to_sink(
405        &self,
406        ledger: &Ledger,
407        public_key: &str,
408        sink_timestamp: u64,
409    ) -> Result<DataToSink, ActorError> {
410        let replay_parts = SinkReplayEventParts::from_ledger(ledger)?;
411        let data = self.data_for_sink(ledger, replay_parts.event_data_ledger);
412
413        Ok(build_data_to_sink(
414            data,
415            replay_parts.event_request,
416            public_key,
417            sink_timestamp,
418        ))
419    }
420
421    fn apply_success(
422        &mut self,
423        protocols: &Protocols,
424    ) -> Result<(), ActorError> {
425        match protocols {
426            Protocols::Create { .. }
427            | Protocols::TrackerFactFull { .. }
428            | Protocols::TrackerFactOpaque { .. }
429            | Protocols::GovFact { .. }
430            | Protocols::EOL { .. } => Ok(()),
431            Protocols::Transfer { event_request, .. } => {
432                let EventRequest::Transfer(transfer_request) =
433                    event_request.content()
434                else {
435                    error!(
436                        subject_id = %self.subject_id,
437                        actual_request = ?event_request.content(),
438                        "Unexpected event request type while replaying transfer event"
439                    );
440                    return Err(ActorError::Functional {
441                        description:
442                            "Replay transfer event must carry a Transfer request"
443                                .to_owned(),
444                    });
445                };
446                self.new_owner = Some(transfer_request.new_owner.to_string());
447                Ok(())
448            }
449            Protocols::TrackerConfirm { .. } | Protocols::GovConfirm { .. } => {
450                let Some(new_owner) = self.new_owner.take() else {
451                    error!(
452                        subject_id = %self.subject_id,
453                        "Replay confirm event without pending new owner"
454                    );
455                    return Err(ActorError::Functional {
456                        description:
457                            "Replay confirm event without pending new owner"
458                                .to_owned(),
459                    });
460                };
461                self.owner = new_owner;
462                Ok(())
463            }
464            Protocols::Reject { .. } => {
465                self.new_owner = None;
466                Ok(())
467            }
468        }
469    }
470}
471
472struct SinkReplayEventParts {
473    event_request: Option<EventRequest>,
474    event_data_ledger: EventLedgerDataForSink,
475}
476
477impl SinkReplayEventParts {
478    fn from_ledger(ledger: &Ledger) -> Result<Self, ActorError> {
479        match &ledger.protocols {
480            Protocols::Create { event_request, .. } => {
481                let metadata = ledger.get_create_metadata().map_err(|e| {
482                    ActorError::Functional {
483                        description: e.to_string(),
484                    }
485                })?;
486
487                Ok(Self {
488                    event_request: Some(event_request.content().clone()),
489                    event_data_ledger: EventLedgerDataForSink::Create {
490                        state: metadata.properties.0,
491                    },
492                })
493            }
494            Protocols::TrackerFactFull { event_request, .. }
495            | Protocols::GovFact { event_request, .. }
496            | Protocols::Transfer { event_request, .. }
497            | Protocols::TrackerConfirm { event_request, .. }
498            | Protocols::GovConfirm { event_request, .. }
499            | Protocols::Reject { event_request, .. }
500            | Protocols::EOL { event_request, .. } => Ok(Self {
501                event_request: Some(event_request.content().clone()),
502                event_data_ledger: EventLedgerDataForSink::build(
503                    &ledger.protocols,
504                    &Value::Null,
505                ),
506            }),
507            Protocols::TrackerFactOpaque { .. } => Ok(Self {
508                event_request: None,
509                event_data_ledger: EventLedgerDataForSink::build(
510                    &ledger.protocols,
511                    &Value::Null,
512                ),
513            }),
514        }
515    }
516}
517
518#[derive(Clone)]
519pub enum EventLedgerDataForSink {
520    Create {
521        state: Value,
522    },
523    FactFull {
524        patch: Option<Value>,
525        success: bool,
526        error: Option<String>,
527    },
528    FactOpaque {
529        viewpoints: Vec<String>,
530        success: bool,
531    },
532    Transfer {
533        success: bool,
534        error: Option<String>,
535    },
536    Confirm {
537        patch: Option<Value>,
538        success: bool,
539        error: Option<String>,
540    },
541    Reject,
542    Eol,
543}
544
545impl EventLedgerDataForSink {
546    pub fn build(protocols: &Protocols, state: &Value) -> Self {
547        match protocols {
548            Protocols::Create { .. } => Self::Create {
549                state: state.clone(),
550            },
551            Protocols::TrackerFactFull { evaluation, .. }
552            | Protocols::GovFact { evaluation, .. } => {
553                let success = protocols.is_success();
554                let (patch, error) = match &evaluation.response {
555                    crate::model::event::EvaluationResponse::Ok {
556                        result,
557                        ..
558                    } if success => (Some(result.patch.0.clone()), None),
559                    crate::model::event::EvaluationResponse::Ok { .. } => {
560                        (None, None)
561                    }
562                    crate::model::event::EvaluationResponse::Error {
563                        result,
564                        ..
565                    } => (None, Some(result.to_string())),
566                };
567
568                Self::FactFull {
569                    patch,
570                    success,
571                    error,
572                }
573            }
574            Protocols::TrackerFactOpaque { evaluation, .. } => {
575                Self::FactOpaque {
576                    viewpoints: evaluation.viewpoints.iter().cloned().collect(),
577                    success: protocols.is_success(),
578                }
579            }
580            Protocols::Transfer { evaluation, .. } => {
581                let success = protocols.is_success();
582                let error = match &evaluation.response {
583                    crate::model::event::EvaluationResponse::Error {
584                        result,
585                        ..
586                    } => Some(result.to_string()),
587                    crate::model::event::EvaluationResponse::Ok { .. } => None,
588                };
589                Self::Transfer { success, error }
590            }
591            Protocols::Reject { .. } => Self::Reject,
592            Protocols::EOL { .. } => Self::Eol,
593            Protocols::TrackerConfirm { .. } => Self::Confirm {
594                patch: None,
595                success: true,
596                error: None,
597            },
598            Protocols::GovConfirm { evaluation, .. } => {
599                let success = protocols.is_success();
600                let (patch, error) = match &evaluation.response {
601                    crate::model::event::EvaluationResponse::Ok {
602                        result,
603                        ..
604                    } if success => (Some(result.patch.0.clone()), None),
605                    crate::model::event::EvaluationResponse::Ok { .. } => {
606                        (None, None)
607                    }
608                    crate::model::event::EvaluationResponse::Error {
609                        result,
610                        ..
611                    } => (None, Some(result.to_string())),
612                };
613
614                Self::Confirm {
615                    patch,
616                    success,
617                    error,
618                }
619            }
620        }
621    }
622}
623
624fn data_to_sink_event(
625    data: DataForSink,
626    event: Option<EventRequest>,
627) -> DataToSinkEvent {
628    match (event, data.event_data_ledger) {
629        (
630            Some(EventRequest::Create(..)),
631            EventLedgerDataForSink::Create { state },
632        ) => DataToSinkEvent::Create {
633            governance_id: data.gov_id,
634            subject_id: data.subject_id,
635            owner: data.owner,
636            schema_id: data.schema_id,
637            namespace: data.namespace.to_string(),
638            sn: data.sn,
639            gov_version: data.gov_version,
640            state,
641        },
642        (
643            Some(EventRequest::Fact(fact_request)),
644            EventLedgerDataForSink::FactFull {
645                patch,
646                success,
647                error,
648            },
649        ) => DataToSinkEvent::FactFull {
650            governance_id: data.gov_id,
651            subject_id: data.subject_id,
652            issuer: data.issuer.to_string(),
653            viewpoints: fact_request.viewpoints.iter().cloned().collect(),
654            owner: data.owner,
655            payload: success.then_some(fact_request.payload.0),
656            schema_id: data.schema_id,
657            sn: data.sn,
658            gov_version: data.gov_version,
659            patch,
660            success,
661            error,
662        },
663        (
664            None,
665            EventLedgerDataForSink::FactOpaque {
666                viewpoints,
667                success,
668            },
669        ) => DataToSinkEvent::FactOpaque {
670            governance_id: data.gov_id,
671            subject_id: data.subject_id,
672            viewpoints,
673            owner: data.owner,
674            schema_id: data.schema_id,
675            sn: data.sn,
676            gov_version: data.gov_version,
677            success,
678        },
679        (
680            Some(EventRequest::Transfer(transfer_request)),
681            EventLedgerDataForSink::Transfer { success, error },
682        ) => DataToSinkEvent::Transfer {
683            governance_id: data.gov_id,
684            subject_id: data.subject_id,
685            owner: data.owner,
686            new_owner: transfer_request.new_owner.to_string(),
687            schema_id: data.schema_id,
688            sn: data.sn,
689            gov_version: data.gov_version,
690            success,
691            error,
692        },
693        (
694            Some(EventRequest::Confirm(confirm_request)),
695            EventLedgerDataForSink::Confirm {
696                patch,
697                success,
698                error,
699            },
700        ) => DataToSinkEvent::Confirm {
701            governance_id: data.gov_id,
702            subject_id: data.subject_id,
703            schema_id: data.schema_id,
704            sn: data.sn,
705            gov_version: data.gov_version,
706            patch,
707            success,
708            error,
709            name_old_owner: confirm_request.name_old_owner,
710        },
711        (Some(EventRequest::Reject(..)), EventLedgerDataForSink::Reject) => {
712            DataToSinkEvent::Reject {
713                governance_id: data.gov_id,
714                subject_id: data.subject_id,
715                schema_id: data.schema_id,
716                sn: data.sn,
717                gov_version: data.gov_version,
718            }
719        }
720        (Some(EventRequest::EOL(..)), EventLedgerDataForSink::Eol) => {
721            DataToSinkEvent::Eol {
722                governance_id: data.gov_id,
723                subject_id: data.subject_id,
724                schema_id: data.schema_id,
725                sn: data.sn,
726                gov_version: data.gov_version,
727            }
728        }
729        _ => {
730            unreachable!(
731                "EventLedgerDataForSink is created according to protocols and protocols according to EventRequest"
732            )
733        }
734    }
735}
736
737pub fn build_data_to_sink(
738    data: DataForSink,
739    event: Option<EventRequest>,
740    public_key: &str,
741    sink_timestamp: u64,
742) -> DataToSink {
743    DataToSink {
744        payload: data_to_sink_event(data.clone(), event),
745        public_key: public_key.to_owned(),
746        event_request_timestamp: data.event_request_timestamp,
747        event_ledger_timestamp: data.event_ledger_timestamp,
748        sink_timestamp,
749    }
750}
751
752pub fn replay_sink_events(
753    ledgers: &[Ledger],
754    public_key: &str,
755    from_sn: u64,
756    to_sn: Option<u64>,
757    limit: u64,
758    sink_timestamp: u64,
759) -> Result<SinkEventsPage, ActorError> {
760    if limit == 0 {
761        return Err(ActorError::Functional {
762            description: "Replay limit must be greater than zero".to_owned(),
763        });
764    }
765
766    let mut replay_state: Option<SinkReplayState> = None;
767    let mut events = Vec::new();
768    let mut next_sn = None;
769    let upper_bound = to_sn.unwrap_or(u64::MAX);
770
771    for ledger in ledgers {
772        if replay_state.is_none() {
773            let metadata = ledger.get_create_metadata().map_err(|e| {
774                ActorError::Functional {
775                    description: e.to_string(),
776                }
777            })?;
778            replay_state = Some(SinkReplayState::from_metadata(&metadata));
779        }
780
781        let Some(state) = replay_state.as_mut() else {
782            unreachable!("replay state is initialized above");
783        };
784
785        let sn = ledger.sn;
786        if sn > upper_bound {
787            break;
788        }
789
790        let is_success = ledger.protocols.is_success();
791        if sn >= from_sn {
792            if events.len() as u64 >= limit {
793                next_sn = Some(sn);
794                break;
795            }
796
797            events.push(state.build_replay_data_to_sink(
798                ledger,
799                public_key,
800                sink_timestamp,
801            )?);
802        }
803
804        if is_success {
805            state.apply_success(&ledger.protocols)?;
806        }
807    }
808
809    Ok(SinkEventsPage {
810        from_sn,
811        to_sn,
812        limit,
813        next_sn,
814        has_more: next_sn.is_some(),
815        events,
816    })
817}
818
819#[derive(
820    Default,
821    Debug,
822    Serialize,
823    Deserialize,
824    Clone,
825    BorshSerialize,
826    BorshDeserialize,
827)]
828pub struct SubjectMetadata {
829    /// The name of the subject.
830    pub name: Option<String>,
831    /// The description of the subject.
832    pub description: Option<String>,
833    /// The identifier of the subject.
834    pub subject_id: DigestIdentifier,
835
836    pub schema_id: SchemaType,
837    /// The identifier of the public key of the subject owner.
838    pub owner: PublicKey,
839    /// The identifier of the public key of the new subject owner.
840    pub new_owner: Option<PublicKey>,
841
842    pub prev_ledger_event_hash: DigestIdentifier,
843    /// The identifier of the public key of the subject creator.
844    pub creator: PublicKey,
845    /// Indicates whether the subject is active or not.
846    pub active: bool,
847    /// The current sequence number of the subject.
848    pub sn: u64,
849}
850
851impl SubjectMetadata {
852    pub fn new(data: &Metadata) -> Self {
853        Self {
854            name: data.name.clone(),
855            description: data.description.clone(),
856            subject_id: data.subject_id.clone(),
857            owner: data.creator.clone(),
858            schema_id: data.schema_id.clone(),
859            new_owner: data.new_owner.clone(),
860            prev_ledger_event_hash: data.prev_ledger_event_hash.clone(),
861            creator: data.creator.clone(),
862            active: data.active,
863            sn: data.sn,
864        }
865    }
866}
867
868#[async_trait]
869pub trait Subject
870where
871    <Self as Actor>::Event: BorshSerialize + BorshDeserialize,
872    Self: PersistentActor,
873{
874    fn verify_new_ledger_event_args<'a>(
875        new_ledger_event: &'a Ledger,
876        subject_metadata: Metadata,
877        actual_ledger_event_hash: DigestIdentifier,
878        last_data: LastData,
879        hash: &'a HashAlgorithm,
880        full_view: bool,
881        only_clear_events: bool,
882    ) -> VerifyNewLedgerEvent<'a> {
883        VerifyNewLedgerEvent {
884            new_ledger_event,
885            subject_metadata,
886            actual_ledger_event_hash,
887            last_data,
888            hash,
889            full_view,
890            only_clear_events,
891        }
892    }
893
894    fn hash_viewpoints(
895        hash: &HashAlgorithm,
896        viewpoints: &BTreeSet<String>,
897    ) -> Result<DigestIdentifier, SubjectError> {
898        hash_borsh(&*hash.hasher(), viewpoints).map_err(|e| {
899            SubjectError::HashCreationFailed {
900                details: e.to_string(),
901            }
902        })
903    }
904
905    fn request_viewpoints(event_request: &EventRequest) -> BTreeSet<String> {
906        match event_request {
907            EventRequest::Fact(fact_request) => fact_request.viewpoints.clone(),
908            _ => BTreeSet::new(),
909        }
910    }
911
912    fn apply_patch_verify(
913        subject_properties: &mut ValueWrapper,
914        json_patch: ValueWrapper,
915    ) -> Result<(), SubjectError> {
916        let json_patch = serde_json::from_value::<Patch>(json_patch.0)
917            .map_err(|e| SubjectError::PatchConversionFailed {
918                details: e.to_string(),
919            })?;
920
921        patch(&mut subject_properties.0, &json_patch).map_err(|e| {
922            SubjectError::PatchApplicationFailed {
923                details: e.to_string(),
924            }
925        })?;
926
927        Ok(())
928    }
929
930    async fn verify_new_ledger_event(
931        ctx: &mut ActorContext<Self>,
932        args: VerifyNewLedgerEvent<'_>,
933    ) -> Result<bool, SubjectError> {
934        let VerifyNewLedgerEvent {
935            new_ledger_event,
936            subject_metadata,
937            actual_ledger_event_hash,
938            last_data,
939            hash,
940            full_view,
941            only_clear_events,
942        } = args;
943
944        if !subject_metadata.active {
945            return Err(SubjectError::SubjectInactive);
946        }
947
948        if new_ledger_event.sn != subject_metadata.sn + 1 {
949            return Err(SubjectError::InvalidSequenceNumber {
950                expected: subject_metadata.sn + 1,
951                actual: new_ledger_event.sn,
952            });
953        }
954
955        let protocols_hash = new_ledger_event
956            .protocols
957            .hash_for_ledger(hash)
958            .map_err(|e| SubjectError::HashCreationFailed {
959            details: e.to_string(),
960        })?;
961
962        let ledger_seal = LedgerSeal {
963            gov_version: new_ledger_event.gov_version,
964            sn: new_ledger_event.sn,
965            prev_ledger_event_hash: new_ledger_event
966                .prev_ledger_event_hash
967                .clone(),
968            protocols_hash,
969        };
970
971        if new_ledger_event
972            .ledger_seal_signature
973            .verify(&ledger_seal)
974            .is_err()
975        {
976            return Err(SubjectError::SignatureVerificationFailed {
977                context: "new ledger event signature verification failed"
978                    .to_string(),
979            });
980        }
981
982        let signer = if let Some(new_owner) = &subject_metadata.new_owner {
983            new_owner.clone()
984        } else {
985            subject_metadata.owner.clone()
986        };
987
988        if new_ledger_event.ledger_seal_signature.signer != signer {
989            return Err(SubjectError::IncorrectSigner {
990                expected: signer.to_string(),
991                actual: new_ledger_event
992                    .ledger_seal_signature
993                    .signer
994                    .to_string(),
995            });
996        }
997
998        if actual_ledger_event_hash != new_ledger_event.prev_ledger_event_hash {
999            return Err(SubjectError::PreviousHashMismatch);
1000        }
1001
1002        let mut modified_subject_metadata = subject_metadata.clone();
1003        modified_subject_metadata.sn += 1;
1004
1005        let (
1006            validation,
1007            new_actual_protocols,
1008            event_request,
1009            opaque_event_request_hash,
1010            opaque_viewpoints_hash,
1011        ) = match (
1012            &new_ledger_event.protocols,
1013            subject_metadata.schema_id.is_gov(),
1014        ) {
1015            (
1016                Protocols::TrackerFactFull {
1017                    event_request,
1018                    evaluation,
1019                    validation,
1020                },
1021                false,
1022            ) => {
1023                if let EventRequest::Fact(..) = event_request.content() {
1024                } else {
1025                    return Err(SubjectError::EventProtocolMismatch);
1026                }
1027
1028                if modified_subject_metadata.new_owner.is_some() {
1029                    return Err(SubjectError::UnexpectedFactEvent);
1030                }
1031
1032                if full_view
1033                    && let Some(eval) = evaluation.evaluator_response_ok()
1034                {
1035                    Self::apply_patch_verify(
1036                        &mut modified_subject_metadata.properties,
1037                        eval.patch,
1038                    )?;
1039                }
1040                (
1041                    validation,
1042                    ActualProtocols::Eval {
1043                        eval_data: evaluation.clone(),
1044                    },
1045                    Some(event_request),
1046                    None,
1047                    None,
1048                )
1049            }
1050            (
1051                Protocols::TrackerFactOpaque {
1052                    data,
1053                    event_request_hash,
1054                    evaluation,
1055                    validation,
1056                    ..
1057                },
1058                false,
1059            ) => {
1060                if only_clear_events {
1061                    return Err(
1062                        SubjectError::OnlyClearEventsCannotAcceptTrackerOpaque,
1063                    );
1064                }
1065
1066                if data.subject_id != subject_metadata.subject_id {
1067                    return Err(SubjectError::SubjectIdMismatch {
1068                        expected: subject_metadata.subject_id.to_string(),
1069                        actual: data.subject_id.to_string(),
1070                    });
1071                }
1072
1073                (
1074                    validation,
1075                    ActualProtocols::None,
1076                    None,
1077                    Some(event_request_hash.clone()),
1078                    Some(Self::hash_viewpoints(hash, &evaluation.viewpoints)?),
1079                )
1080            }
1081            (
1082                Protocols::GovFact {
1083                    event_request,
1084                    evaluation,
1085                    approval,
1086                    validation,
1087                },
1088                true,
1089            ) => {
1090                if let EventRequest::Fact(fact_request) =
1091                    event_request.content()
1092                {
1093                    if !fact_request.viewpoints.is_empty() {
1094                        return Err(
1095                            SubjectError::GovernanceFactViewpointsNotAllowed,
1096                        );
1097                    }
1098                } else {
1099                    return Err(SubjectError::EventProtocolMismatch);
1100                }
1101
1102                if modified_subject_metadata.new_owner.is_some() {
1103                    return Err(SubjectError::UnexpectedFactEvent);
1104                }
1105
1106                let actual_protocols =
1107                    if let Some(eval) = evaluation.evaluator_response_ok() {
1108                        if let Some(appr) = approval {
1109                            if appr.approved {
1110                                Self::apply_patch_verify(
1111                                    &mut modified_subject_metadata.properties,
1112                                    eval.patch,
1113                                )?;
1114                            }
1115
1116                            ActualProtocols::EvalApprove {
1117                                eval_data: evaluation.clone(),
1118                                approval_data: appr.clone(),
1119                            }
1120                        } else {
1121                            return Err(
1122                                SubjectError::MissingApprovalAfterEvaluation,
1123                            );
1124                        }
1125                    } else if approval.is_some() {
1126                        return Err(
1127                        SubjectError::UnexpectedApprovalAfterFailedEvaluation,
1128                    );
1129                    } else {
1130                        ActualProtocols::Eval {
1131                            eval_data: evaluation.clone(),
1132                        }
1133                    };
1134
1135                (
1136                    validation,
1137                    actual_protocols,
1138                    Some(event_request),
1139                    None,
1140                    None,
1141                )
1142            }
1143            (
1144                Protocols::Transfer {
1145                    event_request,
1146                    evaluation,
1147                    validation,
1148                },
1149                ..,
1150            ) => {
1151                let EventRequest::Transfer(transfer) = event_request.content()
1152                else {
1153                    return Err(SubjectError::EventProtocolMismatch);
1154                };
1155
1156                if modified_subject_metadata.new_owner.is_some() {
1157                    return Err(SubjectError::UnexpectedTransferEvent);
1158                }
1159
1160                if let Some(eval) = evaluation.evaluator_response_ok() {
1161                    Self::apply_patch_verify(
1162                        &mut modified_subject_metadata.properties,
1163                        eval.patch,
1164                    )?;
1165                    modified_subject_metadata.new_owner =
1166                        Some(transfer.new_owner.clone());
1167                }
1168
1169                (
1170                    validation,
1171                    ActualProtocols::Eval {
1172                        eval_data: evaluation.clone(),
1173                    },
1174                    Some(event_request),
1175                    None,
1176                    None,
1177                )
1178            }
1179            (
1180                Protocols::TrackerConfirm {
1181                    event_request,
1182                    validation,
1183                },
1184                false,
1185            ) => {
1186                if let EventRequest::Confirm(..) = event_request.content() {
1187                } else {
1188                    return Err(SubjectError::EventProtocolMismatch);
1189                }
1190
1191                if let Some(new_owner) =
1192                    &modified_subject_metadata.new_owner.take()
1193                {
1194                    modified_subject_metadata.owner = new_owner.clone();
1195                } else {
1196                    return Err(SubjectError::ConfirmWithoutNewOwner);
1197                }
1198
1199                (
1200                    validation,
1201                    ActualProtocols::None,
1202                    Some(event_request),
1203                    None,
1204                    None,
1205                )
1206            }
1207            (
1208                Protocols::GovConfirm {
1209                    event_request,
1210                    evaluation,
1211                    validation,
1212                },
1213                true,
1214            ) => {
1215                if let EventRequest::Confirm(..) = event_request.content() {
1216                } else {
1217                    return Err(SubjectError::EventProtocolMismatch);
1218                }
1219
1220                if let Some(eval) = evaluation.evaluator_response_ok() {
1221                    Self::apply_patch_verify(
1222                        &mut modified_subject_metadata.properties,
1223                        eval.patch,
1224                    )?;
1225
1226                    if let Some(new_owner) =
1227                        &modified_subject_metadata.new_owner.take()
1228                    {
1229                        modified_subject_metadata.owner = new_owner.clone();
1230                    } else {
1231                        return Err(SubjectError::ConfirmWithoutNewOwner);
1232                    }
1233                }
1234
1235                (
1236                    validation,
1237                    ActualProtocols::Eval {
1238                        eval_data: evaluation.clone(),
1239                    },
1240                    Some(event_request),
1241                    None,
1242                    None,
1243                )
1244            }
1245            (
1246                Protocols::Reject {
1247                    event_request,
1248                    validation,
1249                },
1250                ..,
1251            ) => {
1252                if let EventRequest::Reject(..) = event_request.content() {
1253                } else {
1254                    return Err(SubjectError::EventProtocolMismatch);
1255                }
1256
1257                if modified_subject_metadata.new_owner.take().is_none() {
1258                    return Err(SubjectError::RejectWithoutNewOwner);
1259                }
1260
1261                (
1262                    validation,
1263                    ActualProtocols::None,
1264                    Some(event_request),
1265                    None,
1266                    None,
1267                )
1268            }
1269            (
1270                Protocols::EOL {
1271                    event_request,
1272                    validation,
1273                },
1274                ..,
1275            ) => {
1276                if let EventRequest::EOL(..) = event_request.content() {
1277                } else {
1278                    return Err(SubjectError::EventProtocolMismatch);
1279                }
1280
1281                if modified_subject_metadata.new_owner.is_some() {
1282                    return Err(SubjectError::UnexpectedEOLEvent);
1283                }
1284
1285                modified_subject_metadata.active = false;
1286                (
1287                    validation,
1288                    ActualProtocols::None,
1289                    Some(event_request),
1290                    None,
1291                    None,
1292                )
1293            }
1294            _ => {
1295                return Err(SubjectError::EventProtocolMismatch);
1296            }
1297        };
1298
1299        if let Some(event_request) = event_request {
1300            if event_request.verify().is_err() {
1301                return Err(SubjectError::SignatureVerificationFailed {
1302                    context: "event request signature verification failed"
1303                        .to_string(),
1304                });
1305            }
1306
1307            let signer = event_request.signature().signer.clone();
1308            if !event_request.content().check_request_signature(
1309                &signer,
1310                &subject_metadata.owner,
1311                &subject_metadata.new_owner,
1312            ) {
1313                let (event, expected) = match event_request.content() {
1314                    EventRequest::Create(..) => {
1315                        ("create", subject_metadata.owner.to_string())
1316                    }
1317                    EventRequest::Transfer(..) => {
1318                        ("transfer", subject_metadata.owner.to_string())
1319                    }
1320                    EventRequest::EOL(..) => {
1321                        ("eol", subject_metadata.owner.to_string())
1322                    }
1323                    EventRequest::Confirm(..) => (
1324                        "confirm",
1325                        subject_metadata.new_owner.as_ref().map_or_else(
1326                            || "new_owner".to_owned(),
1327                            ToString::to_string,
1328                        ),
1329                    ),
1330                    EventRequest::Reject(..) => (
1331                        "reject",
1332                        subject_metadata.new_owner.as_ref().map_or_else(
1333                            || "new_owner".to_owned(),
1334                            ToString::to_string,
1335                        ),
1336                    ),
1337                    EventRequest::Fact(..) => ("fact", signer.to_string()),
1338                };
1339
1340                return Err(SubjectError::InvalidEventRequestSigner {
1341                    event: event.to_owned(),
1342                    expected,
1343                    actual: signer.to_string(),
1344                });
1345            }
1346
1347            let event_subject_id = event_request.content().get_subject_id();
1348            if event_subject_id != subject_metadata.subject_id {
1349                return Err(SubjectError::SubjectIdMismatch {
1350                    expected: subject_metadata.subject_id.to_string(),
1351                    actual: event_subject_id.to_string(),
1352                });
1353            }
1354        }
1355
1356        if modified_subject_metadata.schema_id.is_gov()
1357            && new_actual_protocols.is_success()
1358        {
1359            let mut gov_data = serde_json::from_value::<GovernanceData>(
1360                modified_subject_metadata.properties.0,
1361            )
1362            .map_err(|e| {
1363                SubjectError::GovernanceDataConversionFailed {
1364                    details: e.to_string(),
1365                }
1366            })?;
1367
1368            gov_data.version += 1;
1369            modified_subject_metadata.properties = gov_data.to_value_wrapper();
1370        }
1371
1372        modified_subject_metadata.prev_ledger_event_hash =
1373            actual_ledger_event_hash.clone();
1374
1375        let meta_wo_props =
1376            MetadataWithoutProperties::from(modified_subject_metadata.clone());
1377
1378        let meta_wo_props_hash = hash_borsh(&*hash.hasher(), &meta_wo_props)
1379            .map_err(|e| SubjectError::ModifiedMetadataHashFailed {
1380                details: e.to_string(),
1381            })?;
1382
1383        let (event_request_hash, viewpoints_hash) = if let Some(event_request) =
1384            event_request
1385        {
1386            (
1387                hash_borsh(&*hash.hasher(), event_request).map_err(|e| {
1388                    SubjectError::HashCreationFailed {
1389                        details: e.to_string(),
1390                    }
1391                })?,
1392                Self::hash_viewpoints(
1393                    hash,
1394                    &Self::request_viewpoints(event_request.content()),
1395                )?,
1396            )
1397        } else {
1398            (
1399                opaque_event_request_hash.ok_or_else(|| {
1400                    SubjectError::CannotObtain {
1401                        what: "tracker opaque event_request_hash".to_owned(),
1402                    }
1403                })?,
1404                opaque_viewpoints_hash.ok_or_else(|| {
1405                    SubjectError::CannotObtain {
1406                        what: "tracker opaque viewpoints_hash".to_owned(),
1407                    }
1408                })?,
1409            )
1410        };
1411
1412        let propierties_hash = if full_view
1413            && let Some(event_request) = event_request
1414        {
1415            let validation_req = ValidationReq::Event {
1416                actual_protocols: Box::new(new_actual_protocols),
1417                event_request: event_request.clone(),
1418                ledger_hash: actual_ledger_event_hash,
1419                metadata: Box::new(subject_metadata.clone()),
1420                last_data: Box::new(last_data),
1421                gov_version: new_ledger_event.gov_version,
1422                sn: new_ledger_event.sn,
1423            };
1424
1425            let signed_validation_req = Signed::from_parts(
1426                validation_req,
1427                validation.validation_req_signature.clone(),
1428            );
1429
1430            if signed_validation_req.verify().is_err() {
1431                return Err(SubjectError::InvalidValidationRequestSignature);
1432            }
1433
1434            let hash_signed_val_req =
1435                hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
1436                    |e| SubjectError::ValidationRequestHashFailed {
1437                        details: e.to_string(),
1438                    },
1439                )?;
1440
1441            if hash_signed_val_req != validation.validation_req_hash {
1442                return Err(SubjectError::ValidationRequestHashMismatch);
1443            }
1444
1445            let prop_hash = hash_borsh(
1446                &*hash.hasher(),
1447                &modified_subject_metadata.properties,
1448            )
1449            .map_err(|e| {
1450                SubjectError::ModifiedMetadataHashFailed {
1451                    details: e.to_string(),
1452                }
1453            })?;
1454
1455            if let ValidationMetadata::ModifiedHash {
1456                propierties_hash,
1457                modified_metadata_without_propierties_hash,
1458                event_request_hash: validation_event_request_hash,
1459                viewpoints_hash: validation_viewpoints_hash,
1460            } = &validation.validation_metadata
1461            {
1462                if modified_metadata_without_propierties_hash
1463                    != &meta_wo_props_hash
1464                {
1465                    return Err(
1466                        SubjectError::ModifiedMetadataWithoutPropertiesHashMismatch {
1467                            expected: meta_wo_props_hash.to_string(),
1468                            actual: modified_metadata_without_propierties_hash
1469                                .to_string(),
1470                        },
1471                    );
1472                }
1473
1474                if &prop_hash != propierties_hash {
1475                    return Err(SubjectError::PropertiesHashMismatch {
1476                        expected: prop_hash.to_string(),
1477                        actual: propierties_hash.to_string(),
1478                    });
1479                }
1480
1481                if &event_request_hash != validation_event_request_hash {
1482                    return Err(SubjectError::EventRequestHashMismatch {
1483                        expected: event_request_hash.to_string(),
1484                        actual: validation_event_request_hash.to_string(),
1485                    });
1486                }
1487
1488                if &viewpoints_hash != validation_viewpoints_hash {
1489                    return Err(SubjectError::ViewpointsHashMismatch {
1490                        expected: viewpoints_hash.to_string(),
1491                        actual: validation_viewpoints_hash.to_string(),
1492                    });
1493                }
1494            } else {
1495                return Err(SubjectError::InvalidNonCreationValidationMetadata);
1496            }
1497
1498            prop_hash
1499        } else {
1500            if let ValidationMetadata::ModifiedHash {
1501                propierties_hash,
1502                modified_metadata_without_propierties_hash,
1503                event_request_hash: validation_event_request_hash,
1504                viewpoints_hash: validation_viewpoints_hash,
1505            } = &validation.validation_metadata
1506            {
1507                if modified_metadata_without_propierties_hash
1508                    != &meta_wo_props_hash
1509                {
1510                    return Err(
1511                        SubjectError::ModifiedMetadataWithoutPropertiesHashMismatch {
1512                            expected: meta_wo_props_hash.to_string(),
1513                            actual: modified_metadata_without_propierties_hash
1514                                .to_string(),
1515                        },
1516                    );
1517                }
1518
1519                if &event_request_hash != validation_event_request_hash {
1520                    return Err(SubjectError::EventRequestHashMismatch {
1521                        expected: event_request_hash.to_string(),
1522                        actual: validation_event_request_hash.to_string(),
1523                    });
1524                }
1525
1526                if &viewpoints_hash != validation_viewpoints_hash {
1527                    return Err(SubjectError::ViewpointsHashMismatch {
1528                        expected: viewpoints_hash.to_string(),
1529                        actual: validation_viewpoints_hash.to_string(),
1530                    });
1531                }
1532
1533                propierties_hash.clone()
1534            } else {
1535                return Err(SubjectError::InvalidNonCreationValidationMetadata);
1536            }
1537        };
1538
1539        let validation_res = ValidationRes::Response {
1540            vali_req_hash: validation.validation_req_hash.clone(),
1541            modified_metadata_without_propierties_hash: meta_wo_props_hash,
1542            propierties_hash,
1543            event_request_hash,
1544            viewpoints_hash,
1545        };
1546
1547        let role_data = get_validation_roles_register(
1548            ctx,
1549            &subject_metadata.governance_id,
1550            SearchRole {
1551                schema_id: subject_metadata.schema_id,
1552                namespace: subject_metadata.namespace,
1553            },
1554            new_ledger_event.gov_version,
1555        )
1556        .await
1557        .map_err(|e| SubjectError::ValidatorsRetrievalFailed {
1558            details: e.to_string(),
1559        })?;
1560
1561        if !check_quorum_signers(
1562            &validation
1563                .validators_signatures
1564                .iter()
1565                .map(|x| x.signer.clone())
1566                .collect::<HashSet<PublicKey>>(),
1567            &role_data.quorum,
1568            &role_data.workers,
1569        ) {
1570            return Err(SubjectError::InvalidQuorum);
1571        }
1572
1573        for signature in validation.validators_signatures.iter() {
1574            let signed_res =
1575                Signed::from_parts(validation_res.clone(), signature.clone());
1576
1577            if signed_res.verify().is_err() {
1578                return Err(SubjectError::InvalidValidatorSignature);
1579            }
1580        }
1581
1582        Ok(new_ledger_event.protocols.is_success())
1583    }
1584
1585    async fn verify_first_ledger_event(
1586        ctx: &mut ActorContext<Self>,
1587        ledger_event: &Ledger,
1588        hash: &HashAlgorithm,
1589        subject_metadata: Metadata,
1590    ) -> Result<(), SubjectError> {
1591        if ledger_event.sn != 0 {
1592            return Err(SubjectError::InvalidCreationSequenceNumber);
1593        }
1594
1595        let protocols_hash = ledger_event
1596            .protocols
1597            .hash_for_ledger(hash)
1598            .map_err(|e| SubjectError::HashCreationFailed {
1599                details: e.to_string(),
1600            })?;
1601
1602        let ledger_seal = LedgerSeal {
1603            gov_version: ledger_event.gov_version,
1604            sn: ledger_event.sn,
1605            prev_ledger_event_hash: ledger_event.prev_ledger_event_hash.clone(),
1606            protocols_hash,
1607        };
1608
1609        if ledger_event
1610            .ledger_seal_signature
1611            .verify(&ledger_seal)
1612            .is_err()
1613        {
1614            return Err(SubjectError::SignatureVerificationFailed {
1615                context: "first ledger event signature verification failed"
1616                    .to_string(),
1617            });
1618        }
1619
1620        if ledger_event.ledger_seal_signature.signer != subject_metadata.owner {
1621            return Err(SubjectError::IncorrectSigner {
1622                expected: subject_metadata.owner.to_string(),
1623                actual: ledger_event.ledger_seal_signature.signer.to_string(),
1624            });
1625        }
1626
1627        let (validation, event_request) = match &ledger_event.protocols {
1628            Protocols::Create {
1629                validation,
1630                event_request,
1631            } => {
1632                if let EventRequest::Create(..) = event_request.content() {
1633                } else {
1634                    return Err(SubjectError::EventProtocolMismatch);
1635                }
1636
1637                if event_request.verify().is_err() {
1638                    return Err(SubjectError::SignatureVerificationFailed {
1639                        context: "event request signature verification failed"
1640                            .to_string(),
1641                    });
1642                }
1643
1644                let event_request_signer =
1645                    event_request.signature().signer.clone();
1646                if event_request_signer != subject_metadata.owner {
1647                    return Err(SubjectError::InvalidEventRequestSigner {
1648                        event: "Create".to_owned(),
1649                        expected: subject_metadata.owner.to_string(),
1650                        actual: event_request_signer.to_string(),
1651                    });
1652                }
1653
1654                (validation, event_request)
1655            }
1656            _ => {
1657                return Err(SubjectError::EventProtocolMismatch);
1658            }
1659        };
1660
1661        if !ledger_event.prev_ledger_event_hash.is_empty() {
1662            return Err(SubjectError::NonEmptyPreviousHashInCreation);
1663        }
1664
1665        let ValidationMetadata::Metadata(metadata) =
1666            &validation.validation_metadata
1667        else {
1668            return Err(SubjectError::InvalidValidationMetadata);
1669        };
1670
1671        let validation_req = ValidationReq::Create {
1672            event_request: event_request.clone(),
1673            gov_version: ledger_event.gov_version,
1674            subject_id: subject_metadata.subject_id.clone(),
1675        };
1676
1677        let signed_validation_req = Signed::from_parts(
1678            validation_req,
1679            validation.validation_req_signature.clone(),
1680        );
1681
1682        if signed_validation_req.verify().is_err() {
1683            return Err(SubjectError::InvalidValidationRequestSignature);
1684        }
1685
1686        let hash_signed_val_req =
1687            hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
1688                |e| SubjectError::ValidationRequestHashFailed {
1689                    details: e.to_string(),
1690                },
1691            )?;
1692
1693        if hash_signed_val_req != validation.validation_req_hash {
1694            return Err(SubjectError::ValidationRequestHashMismatch);
1695        }
1696
1697        if metadata.deref() != &subject_metadata {
1698            return Err(SubjectError::MetadataMismatch);
1699        }
1700
1701        if metadata.schema_id == SchemaType::Governance {
1702            serde_json::from_value::<GovernanceData>(
1703                metadata.properties.0.clone(),
1704            )
1705            .map_err(|e| {
1706                SubjectError::GovernancePropertiesConversionFailed {
1707                    details: e.to_string(),
1708                }
1709            })?;
1710        }
1711
1712        let validation_res = ValidationRes::Create {
1713            vali_req_hash: hash_signed_val_req,
1714            subject_metadata: Box::new(subject_metadata),
1715        };
1716
1717        let role_data = match metadata.schema_id {
1718            SchemaType::Governance => RoleDataRegister {
1719                workers: HashSet::from([metadata.owner.clone()]),
1720                quorum: Quorum::Majority,
1721            },
1722            SchemaType::Type(_) => get_validation_roles_register(
1723                ctx,
1724                &metadata.governance_id,
1725                SearchRole {
1726                    schema_id: metadata.schema_id.clone(),
1727                    namespace: metadata.namespace.clone(),
1728                },
1729                ledger_event.gov_version,
1730            )
1731            .await
1732            .map_err(|e| {
1733                SubjectError::ValidatorsRetrievalFailed {
1734                    details: e.to_string(),
1735                }
1736            })?,
1737            SchemaType::TrackerSchemas => {
1738                return Err(SubjectError::InvalidSchemaId);
1739            }
1740        };
1741
1742        if !check_quorum_signers(
1743            &validation
1744                .validators_signatures
1745                .iter()
1746                .map(|x| x.signer.clone())
1747                .collect::<HashSet<PublicKey>>(),
1748            &role_data.quorum,
1749            &role_data.workers,
1750        ) {
1751            return Err(SubjectError::InvalidQuorum);
1752        }
1753
1754        for signature in validation.validators_signatures.iter() {
1755            let signed_res =
1756                Signed::from_parts(validation_res.clone(), signature.clone());
1757
1758            if signed_res.verify().is_err() {
1759                return Err(SubjectError::InvalidValidatorSignature);
1760            }
1761        }
1762
1763        Ok(())
1764    }
1765
1766    async fn register(
1767        ctx: &mut ActorContext<Self>,
1768        message: RegisterMessage,
1769    ) -> Result<(), ActorError> {
1770        let register_path = ActorPath::from("/user/node/register");
1771        match ctx.system().get_actor::<Register>(&register_path).await {
1772            Ok(register) => {
1773                register.tell(message.clone()).await?;
1774
1775                debug!(
1776                    message = ?message,
1777                    "Register message sent successfully"
1778                );
1779            }
1780            Err(e) => {
1781                error!(
1782                    path = %register_path,
1783                    "Register actor not found"
1784                );
1785                return Err(e);
1786            }
1787        };
1788
1789        Ok(())
1790    }
1791
1792    async fn event_to_sink(
1793        ctx: &mut ActorContext<Self>,
1794        data: DataForSink,
1795        event: Option<EventRequest>,
1796    ) -> Result<(), ActorError> {
1797        let msg = SinkDataMessage::Event {
1798            event: Box::new(data_to_sink_event(data.clone(), event)),
1799            event_request_timestamp: data.event_request_timestamp,
1800            event_ledger_timestamp: data.event_ledger_timestamp,
1801        };
1802
1803        Self::publish_sink(ctx, msg).await
1804    }
1805
1806    async fn publish_sink(
1807        ctx: &mut ActorContext<Self>,
1808        message: SinkDataMessage,
1809    ) -> Result<(), ActorError> {
1810        let sink_data = ctx.get_child::<SinkData>("sink_data").await?;
1811        let (subject_id, schema_id) = message.get_subject_schema();
1812
1813        sink_data.tell(message).await?;
1814        debug!(
1815            subject_id = %subject_id,
1816            schema_id = %schema_id,
1817            "Message published to sink successfully"
1818        );
1819
1820        Ok(())
1821    }
1822
1823    async fn get_ledger(
1824        &self,
1825        ctx: &mut ActorContext<Self>,
1826        lo_sn: Option<u64>,
1827        hi_sn: u64,
1828    ) -> Result<(Vec<<Self as Actor>::Event>, bool), ActorError> {
1829        if let Some(lo_sn) = lo_sn {
1830            let actual_sn = lo_sn + 1;
1831            if hi_sn < actual_sn {
1832                Ok((Vec::new(), true))
1833            } else {
1834                Ok((
1835                    get_n_events(ctx, actual_sn, hi_sn - actual_sn).await?,
1836                    true,
1837                ))
1838            }
1839        } else {
1840            Ok((get_n_events(ctx, 0, hi_sn).await?, true))
1841        }
1842    }
1843
1844    async fn update_sn(
1845        &self,
1846        ctx: &mut ActorContext<Self>,
1847    ) -> Result<(), ActorError>;
1848
1849    async fn reject(
1850        &self,
1851        ctx: &mut ActorContext<Self>,
1852        gov_version: u64,
1853    ) -> Result<(), ActorError>;
1854
1855    async fn confirm(
1856        &self,
1857        ctx: &mut ActorContext<Self>,
1858        new_owner: PublicKey,
1859        gov_version: u64,
1860    ) -> Result<(), ActorError>;
1861
1862    async fn transfer(
1863        &self,
1864        ctx: &mut ActorContext<Self>,
1865        new_owner: PublicKey,
1866        gov_version: u64,
1867    ) -> Result<(), ActorError>;
1868
1869    async fn eol(&self, ctx: &mut ActorContext<Self>)
1870    -> Result<(), ActorError>;
1871
1872    fn apply_patch(
1873        &mut self,
1874        json_patch: ValueWrapper,
1875    ) -> Result<(), ActorError>;
1876
1877    async fn manager_new_ledger_events(
1878        &mut self,
1879        ctx: &mut ActorContext<Self>,
1880        events: Vec<Ledger>,
1881    ) -> Result<(), ActorError>;
1882
1883    async fn get_last_ledger(
1884        &self,
1885        ctx: &mut ActorContext<Self>,
1886    ) -> Result<Option<Ledger>, ActorError>;
1887}