Skip to main content

ave_core/subject/
mod.rs

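//! # Subject module.
//!
//! Shared subject types (signed ledger wrapper, subject metadata, sink
//! payload builders) and the `Subject` trait used to verify ledger events.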
3
4use std::{collections::HashSet, ops::Deref};
5
6use crate::{
7    governance::{
8        Governance,
9        data::GovernanceData,
10        model::Quorum,
11        role_register::{RoleDataRegister, SearchRole},
12    },
13    model::{
14        common::{
15            check_quorum_signers, get_n_events, get_validation_roles_register,
16        },
17        event::{Ledger, Protocols, ValidationMetadata},
18    },
19    node::register::{Register, RegisterMessage},
20    tracker::Tracker,
21    validation::{
22        request::{ActualProtocols, LastData, ValidationReq},
23        response::ValidationRes,
24    },
25};
26
27use error::SubjectError;
28
29use ave_actors::{
30    Actor, ActorContext, ActorError, ActorPath, Event, PersistentActor,
31};
32use ave_common::{
33    DataToSink, DataToSinkEvent, Namespace, SchemaType, ValueWrapper,
34    bridge::request::EventRequestType,
35    identity::{
36        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
37    },
38    request::EventRequest,
39    response::SinkEventsPage,
40};
41
42use async_trait::async_trait;
43use borsh::{BorshDeserialize, BorshSerialize};
44use json_patch::{Patch, patch};
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use sinkdata::{SinkData, SinkDataMessage};
48use tracing::{debug, error};
49
50pub mod error;
51pub mod sinkdata;
52
/// Newtype wrapper around a signed [`Ledger`] entry.
///
/// The inner value is public and is also reachable through `Deref`, so a
/// `SignedLedger` can be used wherever a `&Signed<Ledger>` is expected.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
)]
pub struct SignedLedger(pub Signed<Ledger>);
57
58impl Deref for SignedLedger {
59    type Target = Signed<Ledger>;
60
61    fn deref(&self) -> &Self::Target {
62        &self.0
63    }
64}
65
// Empty impl: `SignedLedger` relies entirely on what `Event` provides by default.
impl Event for SignedLedger {}
67
/// Identification data of a subject carried alongside a request.
///
/// NOTE(review): this type is not used in the visible part of the file; the
/// field descriptions below are derived from the field names and types only.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub struct RequestSubjectData {
    /// Identifier of the subject.
    pub subject_id: DigestIdentifier,
    /// Identifier of the governance the subject belongs to.
    pub governance_id: DigestIdentifier,
    /// Namespace of the subject.
    pub namespace: Namespace,
    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Sequence number.
    pub sn: u64,
    /// Governance version.
    pub gov_version: u64,
    /// Public key of the signer.
    pub signer: PublicKey,
}
80
/// Subject metadata.
///
/// This value is hashed with `hash_borsh` during ledger verification, so the
/// field order is part of the observable behavior and must not be changed.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshSerialize,
    BorshDeserialize,
    PartialEq,
    Eq,
    Hash,
)]
pub struct Metadata {
    /// The name of the subject, if any.
    pub name: Option<String>,
    /// The description of the subject, if any.
    pub description: Option<String>,
    /// The identifier of the subject of the event.
    pub subject_id: DigestIdentifier,
    /// The identifier of the governance contract.
    pub governance_id: DigestIdentifier,
    /// Governance version at genesis (`0` when built from a `Governance`).
    pub genesis_gov_version: u64,
    /// Hash of the previous ledger event.
    pub prev_ledger_event_hash: DigestIdentifier,
    /// The identifier of the schema_id used to validate the event.
    pub schema_id: SchemaType,
    /// The namespace of the subject.
    pub namespace: Namespace,
    /// The current sequence number of the subject.
    pub sn: u64,
    /// The public key of the subject creator.
    pub creator: PublicKey,
    /// The identifier of the public key of the subject owner.
    pub owner: PublicKey,
    /// Pending new owner: set by a successful transfer, cleared again by a
    /// confirm (which promotes it to `owner`) or a reject.
    pub new_owner: Option<PublicKey>,
    /// Indicates whether the subject is active or not.
    pub active: bool,
    /// The current status of the subject.
    pub properties: ValueWrapper,
}
118
119impl From<Governance> for Metadata {
120    fn from(value: Governance) -> Self {
121        Self {
122            name: value.subject_metadata.name,
123            description: value.subject_metadata.description,
124            subject_id: value.subject_metadata.subject_id.clone(),
125            governance_id: value.subject_metadata.subject_id,
126            genesis_gov_version: 0,
127            prev_ledger_event_hash: value
128                .subject_metadata
129                .prev_ledger_event_hash,
130            schema_id: value.subject_metadata.schema_id,
131            namespace: Namespace::new(),
132            sn: value.subject_metadata.sn,
133            creator: value.subject_metadata.creator,
134            owner: value.subject_metadata.owner,
135            new_owner: value.subject_metadata.new_owner,
136            active: value.subject_metadata.active,
137            properties: value.properties.to_value_wrapper(),
138        }
139    }
140}
141
142impl From<Tracker> for Metadata {
143    fn from(value: Tracker) -> Self {
144        Self {
145            name: value.subject_metadata.name,
146            description: value.subject_metadata.description,
147            subject_id: value.subject_metadata.subject_id,
148            governance_id: value.governance_id,
149            genesis_gov_version: value.genesis_gov_version,
150            prev_ledger_event_hash: value
151                .subject_metadata
152                .prev_ledger_event_hash,
153            schema_id: value.subject_metadata.schema_id,
154            namespace: value.namespace,
155            sn: value.subject_metadata.sn,
156            creator: value.subject_metadata.creator,
157            owner: value.subject_metadata.owner,
158            new_owner: value.subject_metadata.new_owner,
159            active: value.subject_metadata.active,
160            properties: value.properties,
161        }
162    }
163}
164
/// Flattened, string-based description of one ledger event, used as the
/// input for building a `DataToSink` payload.
#[derive(Clone)]
pub struct DataForSink {
    /// Governance identifier; the replay path leaves it `None` when the
    /// subject itself is a governance.
    pub gov_id: Option<String>,
    /// Subject identifier.
    pub subject_id: String,
    /// Sequence number of the event.
    pub sn: u64,
    /// Owner of the subject.
    pub owner: String,
    /// Namespace of the subject.
    pub namespace: String,
    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Signer of the event request.
    pub issuer: String,
    /// Timestamp of the event request signature.
    pub event_request_timestamp: u64,
    /// Timestamp of the ledger event signature.
    pub event_ledger_timestamp: u64,
    /// Governance version of the event.
    pub gov_version: u64,
    /// Event-kind specific payload (state, patch, ...).
    pub event_data_ledger: EventLedgerDataForSink,
}
179
/// Subject state tracked while replaying ledgers into sink events.
///
/// Only the fields needed to fill a `DataForSink` are kept; ownership fields
/// are updated by `apply_success` as transfer/confirm/reject events are
/// replayed.
#[derive(Clone, Debug)]
struct SinkReplayState {
    // `None` when the subject's schema is a governance.
    governance_id: Option<String>,
    subject_id: String,
    owner: String,
    // Pending owner of an unconfirmed transfer, if any.
    new_owner: Option<String>,
    namespace: String,
    schema_id: SchemaType,
}
189
190impl SinkReplayState {
191    fn from_metadata(metadata: &Metadata) -> Self {
192        Self {
193            governance_id: if metadata.schema_id.is_gov() {
194                None
195            } else {
196                Some(metadata.governance_id.to_string())
197            },
198            subject_id: metadata.subject_id.to_string(),
199            owner: metadata.owner.to_string(),
200            new_owner: metadata.new_owner.as_ref().map(ToString::to_string),
201            namespace: metadata.namespace.to_string(),
202            schema_id: metadata.schema_id.clone(),
203        }
204    }
205
206    fn data_for_sink(
207        &self,
208        event: &SignedLedger,
209        event_data_ledger: EventLedgerDataForSink,
210    ) -> DataForSink {
211        DataForSink {
212            gov_id: self.governance_id.clone(),
213            subject_id: self.subject_id.clone(),
214            sn: event.content().sn,
215            owner: self.owner.clone(),
216            namespace: self.namespace.clone(),
217            schema_id: self.schema_id.clone(),
218            issuer: event
219                .content()
220                .event_request
221                .signature()
222                .signer
223                .to_string(),
224            event_request_timestamp: event
225                .content()
226                .event_request
227                .signature()
228                .timestamp
229                .as_nanos(),
230            event_ledger_timestamp: event.signature().timestamp.as_nanos(),
231            gov_version: event.content().gov_version,
232            event_data_ledger,
233        }
234    }
235
236    fn apply_success(
237        &mut self,
238        event: &SignedLedger,
239    ) -> Result<(), ActorError> {
240        match event.content().event_request.content() {
241            EventRequest::Create(..) | EventRequest::Fact(..) => Ok(()),
242            EventRequest::Transfer(transfer_request) => {
243                self.new_owner = Some(transfer_request.new_owner.to_string());
244                Ok(())
245            }
246            EventRequest::Confirm(..) => {
247                let Some(new_owner) = self.new_owner.take() else {
248                    return Err(ActorError::Functional {
249                        description:
250                            "Replay confirm event without pending new owner"
251                                .to_owned(),
252                    });
253                };
254                self.owner = new_owner;
255                Ok(())
256            }
257            EventRequest::Reject(..) => {
258                self.new_owner = None;
259                Ok(())
260            }
261            EventRequest::EOL(..) => Ok(()),
262        }
263    }
264}
265
/// Event-kind specific payload carried inside a [`DataForSink`].
#[derive(Clone)]
pub enum EventLedgerDataForSink {
    /// Creation event: the initial state.
    Create { state: Value },
    /// Fact event: the JSON patch produced by the evaluation.
    Fact { patch: Value },
    /// Confirm event: a patch for governance confirms, `None` for trackers.
    Confirm { patch: Option<Value> },
    /// Transfer, reject and EOL events carry no extra payload.
    Other,
}
273
274impl EventLedgerDataForSink {
275    pub fn build(protocols: &Protocols, state: &Value) -> Self {
276        match protocols {
277            Protocols::Create { .. } => Self::Create {
278                state: state.clone(),
279            },
280            Protocols::TrackerFact { evaluation, .. }
281            | Protocols::GovFact { evaluation, .. } => Self::Fact {
282                patch: evaluation
283                    .evaluator_res()
284                    .expect("event is valid")
285                    .patch
286                    .0,
287            },
288            Protocols::Transfer { .. }
289            | Protocols::Reject { .. }
290            | Protocols::EOL { .. } => Self::Other,
291            Protocols::TrackerConfirm { .. } => Self::Confirm { patch: None },
292            Protocols::GovConfirm { evaluation, .. } => Self::Confirm {
293                patch: Some(
294                    evaluation.evaluator_res().expect("event is valid").patch.0,
295                ),
296            },
297        }
298    }
299}
300
301fn data_to_sink_event(
302    data: DataForSink,
303    event: &EventRequest,
304) -> DataToSinkEvent {
305    match (event, data.event_data_ledger) {
306        (
307            EventRequest::Create(..),
308            EventLedgerDataForSink::Create { state },
309        ) => DataToSinkEvent::Create {
310            governance_id: data.gov_id,
311            subject_id: data.subject_id,
312            owner: data.owner,
313            schema_id: data.schema_id,
314            namespace: data.namespace.to_string(),
315            sn: data.sn,
316            gov_version: data.gov_version,
317            state,
318        },
319        (
320            EventRequest::Fact(fact_request),
321            EventLedgerDataForSink::Fact { patch },
322        ) => DataToSinkEvent::Fact {
323            governance_id: data.gov_id,
324            subject_id: data.subject_id,
325            issuer: data.issuer.to_string(),
326            owner: data.owner,
327            payload: fact_request.payload.0.clone(),
328            schema_id: data.schema_id,
329            sn: data.sn,
330            gov_version: data.gov_version,
331            patch,
332        },
333        (
334            EventRequest::Transfer(transfer_request),
335            EventLedgerDataForSink::Other,
336        ) => DataToSinkEvent::Transfer {
337            governance_id: data.gov_id,
338            subject_id: data.subject_id,
339            owner: data.owner,
340            new_owner: transfer_request.new_owner.to_string(),
341            schema_id: data.schema_id,
342            sn: data.sn,
343            gov_version: data.gov_version,
344        },
345        (
346            EventRequest::Confirm(confirm_request),
347            EventLedgerDataForSink::Confirm { patch },
348        ) => DataToSinkEvent::Confirm {
349            governance_id: data.gov_id,
350            subject_id: data.subject_id,
351            schema_id: data.schema_id,
352            sn: data.sn,
353            gov_version: data.gov_version,
354            patch,
355            name_old_owner: confirm_request.name_old_owner.clone(),
356        },
357        (EventRequest::Reject(..), EventLedgerDataForSink::Other) => {
358            DataToSinkEvent::Reject {
359                governance_id: data.gov_id,
360                subject_id: data.subject_id,
361                schema_id: data.schema_id,
362                sn: data.sn,
363                gov_version: data.gov_version,
364            }
365        }
366        (EventRequest::EOL(..), EventLedgerDataForSink::Other) => {
367            DataToSinkEvent::Eol {
368                governance_id: data.gov_id,
369                subject_id: data.subject_id,
370                schema_id: data.schema_id,
371                sn: data.sn,
372                gov_version: data.gov_version,
373            }
374        }
375        _ => {
376            unreachable!(
377                "EventLedgerDataForSink is created according to protocols and protocols according to EventRequest"
378            )
379        }
380    }
381}
382
383pub fn build_data_to_sink(
384    data: DataForSink,
385    event: &EventRequest,
386    public_key: &str,
387    sink_timestamp: u64,
388) -> DataToSink {
389    DataToSink {
390        event: data_to_sink_event(data.clone(), event),
391        public_key: public_key.to_owned(),
392        event_request_timestamp: data.event_request_timestamp,
393        event_ledger_timestamp: data.event_ledger_timestamp,
394        sink_timestamp,
395    }
396}
397
398pub fn replay_sink_events(
399    ledgers: &[SignedLedger],
400    public_key: &str,
401    from_sn: u64,
402    to_sn: Option<u64>,
403    limit: u64,
404    sink_timestamp: u64,
405) -> Result<SinkEventsPage, ActorError> {
406    if limit == 0 {
407        return Err(ActorError::Functional {
408            description: "Replay limit must be greater than zero".to_owned(),
409        });
410    }
411
412    let mut replay_state: Option<SinkReplayState> = None;
413    let mut events = Vec::new();
414    let mut next_sn = None;
415    let upper_bound = to_sn.unwrap_or(u64::MAX);
416
417    for ledger in ledgers {
418        if replay_state.is_none() {
419            let metadata =
420                ledger.content().get_create_metadata().map_err(|e| {
421                    ActorError::Functional {
422                        description: e.to_string(),
423                    }
424                })?;
425            replay_state = Some(SinkReplayState::from_metadata(&metadata));
426        }
427
428        let Some(state) = replay_state.as_mut() else {
429            unreachable!("replay state is initialized above");
430        };
431
432        let sn = ledger.content().sn;
433        if sn > upper_bound {
434            break;
435        }
436
437        let is_success = ledger.content().protocols.is_success();
438        if is_success && sn >= from_sn {
439            if events.len() as u64 >= limit {
440                next_sn = Some(sn);
441                break;
442            }
443
444            let event_data_ledger =
445                match ledger.content().event_request.content() {
446                    EventRequest::Create(..) => {
447                        let metadata = ledger
448                            .content()
449                            .get_create_metadata()
450                            .map_err(|e| ActorError::Functional {
451                            description: e.to_string(),
452                        })?;
453                        EventLedgerDataForSink::Create {
454                            state: metadata.properties.0,
455                        }
456                    }
457                    EventRequest::Fact(..)
458                    | EventRequest::Transfer(..)
459                    | EventRequest::Confirm(..)
460                    | EventRequest::Reject(..)
461                    | EventRequest::EOL(..) => EventLedgerDataForSink::build(
462                        &ledger.content().protocols,
463                        &Value::Null,
464                    ),
465                };
466
467            let data = state.data_for_sink(ledger, event_data_ledger);
468            events.push(build_data_to_sink(
469                data,
470                ledger.content().event_request.content(),
471                public_key,
472                sink_timestamp,
473            ));
474        }
475
476        if is_success {
477            state.apply_success(ledger)?;
478        }
479    }
480
481    Ok(SinkEventsPage {
482        from_sn,
483        to_sn,
484        limit,
485        next_sn,
486        has_more: next_sn.is_some(),
487        events,
488    })
489}
490
/// Subject fields common to governances and trackers.
///
/// Serialized with Borsh, so the field order must not be changed.
#[derive(
    Default,
    Debug,
    Serialize,
    Deserialize,
    Clone,
    BorshSerialize,
    BorshDeserialize,
)]
pub struct SubjectMetadata {
    /// The name of the subject.
    pub name: Option<String>,
    /// The description of the subject.
    pub description: Option<String>,
    /// The identifier of the subject.
    pub subject_id: DigestIdentifier,

    /// The schema of the subject.
    pub schema_id: SchemaType,
    /// The identifier of the public key of the subject owner.
    pub owner: PublicKey,
    /// The identifier of the public key of the new subject owner.
    pub new_owner: Option<PublicKey>,

    /// Hash of the previous ledger event.
    pub prev_ledger_event_hash: DigestIdentifier,
    /// The identifier of the public key of the subject creator.
    pub creator: PublicKey,
    /// Indicates whether the subject is active or not.
    pub active: bool,
    /// The current sequence number of the subject.
    pub sn: u64,
}
522
523impl SubjectMetadata {
524    pub fn new(data: &Metadata) -> Self {
525        Self {
526            name: data.name.clone(),
527            description: data.description.clone(),
528            subject_id: data.subject_id.clone(),
529            owner: data.creator.clone(),
530            schema_id: data.schema_id.clone(),
531            new_owner: data.new_owner.clone(),
532            prev_ledger_event_hash: data.prev_ledger_event_hash.clone(),
533            creator: data.creator.clone(),
534            active: data.active,
535            sn: data.sn,
536        }
537    }
538}
539
540#[async_trait]
541pub trait Subject
542where
543    <Self as Actor>::Event: BorshSerialize + BorshDeserialize,
544    Self: PersistentActor,
545{
546    fn apply_patch_verify(
547        subject_properties: &mut ValueWrapper,
548        json_patch: ValueWrapper,
549    ) -> Result<(), SubjectError> {
550        let json_patch = serde_json::from_value::<Patch>(json_patch.0)
551            .map_err(|e| SubjectError::PatchConversionFailed {
552                details: e.to_string(),
553            })?;
554
555        patch(&mut subject_properties.0, &json_patch).map_err(|e| {
556            SubjectError::PatchApplicationFailed {
557                details: e.to_string(),
558            }
559        })?;
560
561        Ok(())
562    }
563
    /// Verifies a non-genesis ledger event against the current subject state.
    ///
    /// The checks run in this order:
    /// 1. the subject is active and the event's `sn` is exactly `sn + 1`;
    /// 2. the ledger signature is valid and made by the expected signer (the
    ///    pending `new_owner` if there is one, otherwise the `owner`);
    /// 3. the event request signature is valid and the event links to
    ///    `actual_ledger_event_hash`;
    /// 4. the request kind matches the protocols recorded in the event, and
    ///    the resulting metadata (patched properties, ownership, activity) is
    ///    computed on a copy of `subject_metadata`;
    /// 5. the validation request rebuilt from these inputs matches the signed
    ///    and hashed request stored in the event;
    /// 6. the validators that signed reach the quorum for the event's
    ///    governance version, and each signature verifies over the expected
    ///    validation response.
    ///
    /// # Parameters
    ///
    /// * `ctx` - actor context, used to look up the validation roles.
    /// * `new_ledger_event` - the event to verify.
    /// * `subject_metadata` - subject metadata *before* the event.
    /// * `actual_ledger_event_hash` - hash of the current last ledger event.
    /// * `last_data` - forwarded unchanged into the rebuilt validation request.
    /// * `hash` - algorithm used to hash the request and the new metadata.
    ///
    /// # Returns
    ///
    /// `Ok(is_success)` where `is_success` is the event protocols' own
    /// `is_success()` result.
    ///
    /// # Errors
    ///
    /// Returns the `SubjectError` describing the first check that fails.
    async fn verify_new_ledger_event(
        ctx: &mut ActorContext<Self>,
        new_ledger_event: &SignedLedger,
        subject_metadata: Metadata,
        actual_ledger_event_hash: DigestIdentifier,
        last_data: LastData,
        hash: &HashAlgorithm,
    ) -> Result<bool, SubjectError> {
        if !subject_metadata.active {
            return Err(SubjectError::SubjectInactive);
        }

        if new_ledger_event.content().sn != subject_metadata.sn + 1 {
            return Err(SubjectError::InvalidSequenceNumber {
                expected: subject_metadata.sn + 1,
                actual: new_ledger_event.content().sn,
            });
        }

        if new_ledger_event.verify().is_err() {
            return Err(SubjectError::SignatureVerificationFailed {
                context: "new ledger event signature verification failed"
                    .to_string(),
            });
        }

        // While a transfer is pending, the pending owner is the expected
        // signer of the ledger event; otherwise it is the current owner.
        let signer = if let Some(new_owner) = &subject_metadata.new_owner {
            new_owner.clone()
        } else {
            subject_metadata.owner.clone()
        };

        if new_ledger_event.signature().signer != signer {
            return Err(SubjectError::IncorrectSigner {
                expected: signer.to_string(),
                actual: new_ledger_event.signature().signer.to_string(),
            });
        }

        if new_ledger_event.content().event_request.verify().is_err() {
            return Err(SubjectError::SignatureVerificationFailed {
                context: "event request signature verification failed"
                    .to_string(),
            });
        }

        if actual_ledger_event_hash
            != new_ledger_event.content().prev_ledger_event_hash
        {
            return Err(SubjectError::PreviousHashMismatch);
        }

        // Work on a copy: `subject_metadata` itself must stay untouched
        // because it is embedded in the rebuilt validation request below.
        let mut modified_subject_metadata = subject_metadata.clone();
        modified_subject_metadata.sn += 1;

        // Match the request kind against the recorded protocols (and whether
        // the subject is a governance); any other combination is rejected.
        let (validation, new_actual_protocols) = match (
            new_ledger_event.content().event_request.content(),
            &new_ledger_event.content().protocols,
            subject_metadata.schema_id.is_gov(),
        ) {
            (
                EventRequest::Fact(..),
                Protocols::TrackerFact {
                    evaluation,
                    validation,
                },
                false,
            ) => {
                // Facts are not allowed while a transfer is pending.
                if modified_subject_metadata.new_owner.is_some() {
                    return Err(SubjectError::UnexpectedFactEvent);
                }

                if let Some(eval) = evaluation.evaluator_res() {
                    Self::apply_patch_verify(
                        &mut modified_subject_metadata.properties,
                        eval.patch,
                    )?;
                }
                (
                    validation,
                    ActualProtocols::Eval {
                        eval_data: evaluation.clone(),
                    },
                )
            }
            (
                EventRequest::Fact(..),
                Protocols::GovFact {
                    evaluation,
                    approval,
                    validation,
                },
                true,
            ) => {
                if modified_subject_metadata.new_owner.is_some() {
                    return Err(SubjectError::UnexpectedFactEvent);
                }

                // An evaluator response requires an approval, and an
                // approval requires an evaluator response; the patch is only
                // applied when the approval says `approved`.
                let actual_protocols =
                    if let Some(eval) = evaluation.evaluator_res() {
                        if let Some(appr) = approval {
                            if appr.approved {
                                Self::apply_patch_verify(
                                    &mut modified_subject_metadata.properties,
                                    eval.patch,
                                )?;
                            }

                            ActualProtocols::EvalApprove {
                                eval_data: evaluation.clone(),
                                approval_data: appr.clone(),
                            }
                        } else {
                            return Err(
                                SubjectError::MissingApprovalAfterEvaluation,
                            );
                        }
                    } else if approval.is_some() {
                        return Err(
                        SubjectError::UnexpectedApprovalAfterFailedEvaluation,
                    );
                    } else {
                        ActualProtocols::Eval {
                            eval_data: evaluation.clone(),
                        }
                    };

                (validation, actual_protocols)
            }
            (
                EventRequest::Transfer(transfer),
                Protocols::Transfer {
                    evaluation,
                    validation,
                },
                ..,
            ) => {
                // Only one transfer may be pending at a time.
                if modified_subject_metadata.new_owner.is_some() {
                    return Err(SubjectError::UnexpectedTransferEvent);
                }

                // The pending owner is only recorded when an evaluator
                // response is present.
                if let Some(eval) = evaluation.evaluator_res() {
                    Self::apply_patch_verify(
                        &mut modified_subject_metadata.properties,
                        eval.patch,
                    )?;
                    modified_subject_metadata.new_owner =
                        Some(transfer.new_owner.clone());
                }

                (
                    validation,
                    ActualProtocols::Eval {
                        eval_data: evaluation.clone(),
                    },
                )
            }
            (
                EventRequest::Confirm(..),
                Protocols::TrackerConfirm { validation },
                false,
            ) => {
                // `take()` clears the pending owner while promoting it.
                if let Some(new_owner) =
                    &modified_subject_metadata.new_owner.take()
                {
                    modified_subject_metadata.owner = new_owner.clone();
                } else {
                    return Err(SubjectError::ConfirmWithoutNewOwner);
                }

                (validation, ActualProtocols::None)
            }
            (
                EventRequest::Confirm(..),
                Protocols::GovConfirm {
                    evaluation,
                    validation,
                },
                true,
            ) => {
                // For governances the ownership change is only applied when
                // an evaluator response is present.
                if let Some(eval) = evaluation.evaluator_res() {
                    Self::apply_patch_verify(
                        &mut modified_subject_metadata.properties,
                        eval.patch,
                    )?;

                    if let Some(new_owner) =
                        &modified_subject_metadata.new_owner.take()
                    {
                        modified_subject_metadata.owner = new_owner.clone();
                    } else {
                        return Err(SubjectError::ConfirmWithoutNewOwner);
                    }
                }

                (
                    validation,
                    ActualProtocols::Eval {
                        eval_data: evaluation.clone(),
                    },
                )
            }
            (
                EventRequest::Reject(..),
                Protocols::Reject { validation },
                ..,
            ) => {
                // Rejecting drops the pending owner; there must be one.
                if modified_subject_metadata.new_owner.take().is_none() {
                    return Err(SubjectError::RejectWithoutNewOwner);
                }

                (validation, ActualProtocols::None)
            }
            (EventRequest::EOL(..), Protocols::EOL { validation }, ..) => {
                if modified_subject_metadata.new_owner.is_some() {
                    return Err(SubjectError::UnexpectedEOLEvent);
                }

                modified_subject_metadata.active = false;
                (validation, ActualProtocols::None)
            }
            _ => {
                return Err(SubjectError::EventProtocolMismatch);
            }
        };

        // A successful governance event bumps the governance data version.
        // `properties.0` is moved out here and `properties` is reassigned
        // right after, so the struct is whole again before its next use.
        if modified_subject_metadata.schema_id.is_gov()
            && new_actual_protocols.is_success()
        {
            let mut gov_data = serde_json::from_value::<GovernanceData>(
                modified_subject_metadata.properties.0,
            )
            .map_err(|e| {
                SubjectError::GovernanceDataConversionFailed {
                    details: e.to_string(),
                }
            })?;

            gov_data.version += 1;
            modified_subject_metadata.properties = gov_data.to_value_wrapper();
        }

        // Rebuild the validation request from local data, using the metadata
        // from *before* the event.
        let validation_req = ValidationReq::Event {
            actual_protocols: Box::new(new_actual_protocols),
            event_request: new_ledger_event.content().event_request.clone(),
            ledger_hash: actual_ledger_event_hash.clone(),
            metadata: Box::new(subject_metadata.clone()),
            last_data: Box::new(last_data),
            gov_version: new_ledger_event.content().gov_version,
            sn: new_ledger_event.content().sn,
        };

        // Pair the rebuilt request with the signature stored in the event:
        // it only verifies if the rebuilt request equals what was signed.
        let signed_validation_req = Signed::from_parts(
            validation_req,
            validation.validation_req_signature.clone(),
        );

        if signed_validation_req.verify().is_err() {
            return Err(SubjectError::InvalidValidationRequestSignature);
        }

        let hash_signed_val_req =
            hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
                |e| SubjectError::ValidationRequestHashFailed {
                    details: e.to_string(),
                },
            )?;

        if hash_signed_val_req != validation.validation_req_hash {
            return Err(SubjectError::ValidationRequestHashMismatch);
        }

        // The metadata after the event points at the ledger event that was
        // the last one before it.
        modified_subject_metadata.prev_ledger_event_hash =
            actual_ledger_event_hash;

        let modified_metadata_hash =
            hash_borsh(&*hash.hasher(), &modified_subject_metadata).map_err(
                |e| SubjectError::ModifiedMetadataHashFailed {
                    details: e.to_string(),
                },
            )?;

        // Expected response that every validator signature is checked against.
        let validation_res = ValidationRes::Response {
            vali_req_hash: hash_signed_val_req,
            modified_metadata_hash,
        };

        // Validation roles are looked up for the governance version recorded
        // in the event.
        let role_data = get_validation_roles_register(
            ctx,
            &subject_metadata.governance_id,
            SearchRole {
                schema_id: subject_metadata.schema_id,
                namespace: subject_metadata.namespace,
            },
            new_ledger_event.content().gov_version,
        )
        .await
        .map_err(|e| SubjectError::ValidatorsRetrievalFailed {
            details: e.to_string(),
        })?;

        // Signers are collected into a set, so a signer that appears more
        // than once is only counted once towards the quorum.
        if !check_quorum_signers(
            &validation
                .validators_signatures
                .iter()
                .map(|x| x.signer.clone())
                .collect::<HashSet<PublicKey>>(),
            &role_data.quorum,
            &role_data.workers,
        ) {
            return Err(SubjectError::InvalidQuorum);
        }

        for signature in validation.validators_signatures.iter() {
            let signed_res =
                Signed::from_parts(validation_res.clone(), signature.clone());

            if signed_res.verify().is_err() {
                return Err(SubjectError::InvalidValidatorSignature);
            }
        }

        Ok(new_ledger_event.content().protocols.is_success())
    }
888
889    async fn verify_first_ledger_event(
890        ctx: &mut ActorContext<Self>,
891        ledger_event: &SignedLedger,
892        hash: &HashAlgorithm,
893        subject_metadata: Metadata,
894    ) -> Result<(), SubjectError> {
895        if ledger_event.verify().is_err() {
896            return Err(SubjectError::SignatureVerificationFailed {
897                context: "first ledger event signature verification failed"
898                    .to_string(),
899            });
900        }
901
902        if ledger_event.signature().signer != subject_metadata.owner {
903            return Err(SubjectError::IncorrectSigner {
904                expected: subject_metadata.owner.to_string(),
905                actual: ledger_event.signature().signer.to_string(),
906            });
907        }
908
909        if ledger_event.content().event_request.verify().is_err() {
910            return Err(SubjectError::SignatureVerificationFailed {
911                context: "event request signature verification failed"
912                    .to_string(),
913            });
914        }
915
916        if ledger_event.content().sn != 0 {
917            return Err(SubjectError::InvalidCreationSequenceNumber);
918        }
919
920        if !ledger_event.content().prev_ledger_event_hash.is_empty() {
921            return Err(SubjectError::NonEmptyPreviousHashInCreation);
922        }
923
924        let event_request_type = EventRequestType::from(
925            ledger_event.content().event_request.content(),
926        );
927
928        let validation =
929            match (event_request_type, &ledger_event.content().protocols) {
930                (
931                    EventRequestType::Create,
932                    Protocols::Create { validation },
933                ) => validation,
934                _ => {
935                    return Err(SubjectError::EventProtocolMismatch);
936                }
937            };
938
939        let ValidationMetadata::Metadata(metadata) =
940            &validation.validation_metadata
941        else {
942            return Err(SubjectError::InvalidValidationMetadata);
943        };
944
945        let validation_req = ValidationReq::Create {
946            event_request: ledger_event.content().event_request.clone(),
947            gov_version: ledger_event.content().gov_version,
948            subject_id: subject_metadata.subject_id.clone(),
949        };
950
951        let signed_validation_req = Signed::from_parts(
952            validation_req,
953            validation.validation_req_signature.clone(),
954        );
955
956        if signed_validation_req.verify().is_err() {
957            return Err(SubjectError::InvalidValidationRequestSignature);
958        }
959
960        let hash_signed_val_req =
961            hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
962                |e| SubjectError::ValidationRequestHashFailed {
963                    details: e.to_string(),
964                },
965            )?;
966
967        if hash_signed_val_req != validation.validation_req_hash {
968            return Err(SubjectError::ValidationRequestHashMismatch);
969        }
970
971        if metadata.deref() != &subject_metadata {
972            return Err(SubjectError::MetadataMismatch);
973        }
974
975        if metadata.schema_id == SchemaType::Governance {
976            serde_json::from_value::<GovernanceData>(
977                metadata.properties.0.clone(),
978            )
979            .map_err(|e| {
980                SubjectError::GovernancePropertiesConversionFailed {
981                    details: e.to_string(),
982                }
983            })?;
984        }
985
986        let validation_res = ValidationRes::Create {
987            vali_req_hash: hash_signed_val_req,
988            subject_metadata: Box::new(subject_metadata),
989        };
990
991        let role_data = match metadata.schema_id {
992            SchemaType::Governance => RoleDataRegister {
993                workers: HashSet::from([metadata.owner.clone()]),
994                quorum: Quorum::Majority,
995            },
996            SchemaType::Type(_) => get_validation_roles_register(
997                ctx,
998                &metadata.governance_id,
999                SearchRole {
1000                    schema_id: metadata.schema_id.clone(),
1001                    namespace: metadata.namespace.clone(),
1002                },
1003                ledger_event.content().gov_version,
1004            )
1005            .await
1006            .map_err(|e| {
1007                SubjectError::ValidatorsRetrievalFailed {
1008                    details: e.to_string(),
1009                }
1010            })?,
1011            SchemaType::TrackerSchemas => {
1012                return Err(SubjectError::InvalidSchemaId);
1013            }
1014        };
1015
1016        if !check_quorum_signers(
1017            &validation
1018                .validators_signatures
1019                .iter()
1020                .map(|x| x.signer.clone())
1021                .collect::<HashSet<PublicKey>>(),
1022            &role_data.quorum,
1023            &role_data.workers,
1024        ) {
1025            return Err(SubjectError::InvalidQuorum);
1026        }
1027
1028        for signature in validation.validators_signatures.iter() {
1029            let signed_res =
1030                Signed::from_parts(validation_res.clone(), signature.clone());
1031
1032            if signed_res.verify().is_err() {
1033                return Err(SubjectError::InvalidValidatorSignature);
1034            }
1035        }
1036
1037        Ok(())
1038    }
1039
1040    async fn register(
1041        ctx: &mut ActorContext<Self>,
1042        message: RegisterMessage,
1043    ) -> Result<(), ActorError> {
1044        let register_path = ActorPath::from("/user/node/register");
1045        match ctx.system().get_actor::<Register>(&register_path).await {
1046            Ok(register) => {
1047                register.tell(message.clone()).await?;
1048
1049                debug!(
1050                    message = ?message,
1051                    "Register message sent successfully"
1052                );
1053            }
1054            Err(e) => {
1055                error!(
1056                    path = %register_path,
1057                    "Register actor not found"
1058                );
1059                return Err(e);
1060            }
1061        };
1062
1063        Ok(())
1064    }
1065
1066    async fn event_to_sink(
1067        ctx: &mut ActorContext<Self>,
1068        data: DataForSink,
1069        event: &EventRequest,
1070    ) -> Result<(), ActorError> {
1071        let msg = SinkDataMessage::Event {
1072            event: Box::new(data_to_sink_event(data.clone(), event)),
1073            event_request_timestamp: data.event_request_timestamp,
1074            event_ledger_timestamp: data.event_ledger_timestamp,
1075        };
1076
1077        Self::publish_sink(ctx, msg).await
1078    }
1079
1080    async fn publish_sink(
1081        ctx: &mut ActorContext<Self>,
1082        message: SinkDataMessage,
1083    ) -> Result<(), ActorError> {
1084        let sink_data = ctx.get_child::<SinkData>("sink_data").await?;
1085        let (subject_id, schema_id) = message.get_subject_schema();
1086
1087        sink_data.tell(message).await?;
1088        debug!(
1089            subject_id = %subject_id,
1090            schema_id = %schema_id,
1091            "Message published to sink successfully"
1092        );
1093
1094        Ok(())
1095    }
1096
1097    async fn get_ledger(
1098        &self,
1099        ctx: &mut ActorContext<Self>,
1100        lo_sn: Option<u64>,
1101        hi_sn: u64,
1102    ) -> Result<(Vec<<Self as Actor>::Event>, bool), ActorError> {
1103        if let Some(lo_sn) = lo_sn {
1104            let actual_sn = lo_sn + 1;
1105            if (hi_sn - actual_sn) > 99 {
1106                Ok((get_n_events(ctx, actual_sn, 99).await?, false))
1107            } else {
1108                Ok((
1109                    get_n_events(ctx, actual_sn, hi_sn - actual_sn).await?,
1110                    true,
1111                ))
1112            }
1113        } else if hi_sn > 99 {
1114            Ok((get_n_events(ctx, 0, 99).await?, false))
1115        } else {
1116            Ok((get_n_events(ctx, 0, hi_sn).await?, true))
1117        }
1118    }
1119
    /// Required method with no default body; each implementor supplies it.
    ///
    /// NOTE(review): only the signature is visible here. Presumably this
    /// propagates the subject's current sequence number — confirm against
    /// the implementations.
    async fn update_sn(
        &self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<(), ActorError>;
1124
    /// Required method with no default body; each implementor supplies it.
    ///
    /// Receives the governance version as `gov_version`.
    ///
    /// NOTE(review): presumably handles a rejected ownership transfer
    /// (compare `confirm` and `transfer`) — confirm against the
    /// implementations.
    async fn reject(
        &self,
        ctx: &mut ActorContext<Self>,
        gov_version: u64,
    ) -> Result<(), ActorError>;
1130
    /// Required method with no default body; each implementor supplies it.
    ///
    /// Receives the new owner's public key and the governance version.
    ///
    /// NOTE(review): presumably handles a confirmed ownership transfer to
    /// `new_owner` — confirm against the implementations.
    async fn confirm(
        &self,
        ctx: &mut ActorContext<Self>,
        new_owner: PublicKey,
        gov_version: u64,
    ) -> Result<(), ActorError>;
1137
    /// Required method with no default body; each implementor supplies it.
    ///
    /// Receives the new owner's public key and the governance version.
    ///
    /// NOTE(review): presumably handles the start of an ownership transfer
    /// to `new_owner` — confirm against the implementations.
    async fn transfer(
        &self,
        ctx: &mut ActorContext<Self>,
        new_owner: PublicKey,
        gov_version: u64,
    ) -> Result<(), ActorError>;
1144
    /// Required method with no default body; each implementor supplies it.
    ///
    /// NOTE(review): `eol` presumably stands for "end of life" of the
    /// subject — confirm against the implementations.
    async fn eol(&self, ctx: &mut ActorContext<Self>)
    -> Result<(), ActorError>;
1147
    /// Required method with no default body; each implementor supplies it.
    ///
    /// Takes `&mut self` and a `json_patch` value, so it can mutate the
    /// implementor's state.
    ///
    /// NOTE(review): presumably applies `json_patch` as a JSON Patch to the
    /// subject's properties — confirm against the implementations.
    fn apply_patch(
        &mut self,
        json_patch: ValueWrapper,
    ) -> Result<(), ActorError>;
1152
    /// Required method with no default body; each implementor supplies it.
    ///
    /// Takes `&mut self` and a batch of signed ledger events.
    ///
    /// NOTE(review): presumably verifies and applies `events` in order —
    /// confirm against the implementations.
    async fn manager_new_ledger_events(
        &mut self,
        ctx: &mut ActorContext<Self>,
        events: Vec<SignedLedger>,
    ) -> Result<(), ActorError>;
1158
    /// Required method with no default body; each implementor supplies it.
    ///
    /// Returns `Ok(Some(_))` with a signed ledger event or `Ok(None)`.
    ///
    /// NOTE(review): presumably `None` means no ledger event has been stored
    /// yet — confirm against the implementations.
    async fn get_last_ledger(
        &self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<Option<SignedLedger>, ActorError>;
1163}