Skip to main content

ave_core/subject/
mod.rs

1//! # Subject module.
2//!
3
4use std::{collections::HashSet, ops::Deref};
5
6use crate::{
7    governance::{
8        Governance,
9        data::GovernanceData,
10        model::Quorum,
11        role_register::{RoleDataRegister, SearchRole},
12    },
13    model::{
14        common::{
15            check_quorum_signers, get_n_events, get_validation_roles_register,
16        },
17        event::{Ledger, Protocols, ValidationMetadata},
18    },
19    node::register::{Register, RegisterMessage},
20    tracker::Tracker,
21    validation::{
22        request::{ActualProtocols, LastData, ValidationReq},
23        response::ValidationRes,
24    },
25};
26
27use error::SubjectError;
28
29use ave_actors::{
30    Actor, ActorContext, ActorError, ActorPath, Event, PersistentActor,
31};
32use ave_common::{
33    DataToSinkEvent, Namespace, SchemaType, ValueWrapper,
34    bridge::request::EventRequestType,
35    identity::{
36        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
37    },
38    request::EventRequest,
39};
40
41use async_trait::async_trait;
42use borsh::{BorshDeserialize, BorshSerialize};
43use json_patch::{Patch, patch};
44use serde::{Deserialize, Serialize};
45use serde_json::Value;
46use sinkdata::{SinkData, SinkDataMessage};
47use tracing::{debug, error};
48
49pub mod error;
50pub mod sinkdata;
51
52#[derive(
53    Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
54)]
55pub struct SignedLedger(pub Signed<Ledger>);
56
57impl Deref for SignedLedger {
58    type Target = Signed<Ledger>;
59
60    fn deref(&self) -> &Self::Target {
61        &self.0
62    }
63}
64
65impl Event for SignedLedger {}
66
67#[derive(
68    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
69)]
70pub struct RequestSubjectData {
71    pub subject_id: DigestIdentifier,
72    pub governance_id: DigestIdentifier,
73    pub namespace: Namespace,
74    pub schema_id: SchemaType,
75    pub sn: u64,
76    pub gov_version: u64,
77    pub signer: PublicKey,
78}
79
80/// Subject metadata.
81#[derive(
82    Debug,
83    Clone,
84    Serialize,
85    Deserialize,
86    BorshSerialize,
87    BorshDeserialize,
88    PartialEq,
89    Eq,
90    Hash,
91)]
92pub struct Metadata {
93    pub name: Option<String>,
94    pub description: Option<String>,
95    /// The identifier of the subject of the event.
96    pub subject_id: DigestIdentifier,
97    /// The identifier of the governance contract.
98    pub governance_id: DigestIdentifier,
99    pub genesis_gov_version: u64,
100    pub prev_ledger_event_hash: DigestIdentifier,
101    /// The identifier of the schema_id used to validate the event.
102    pub schema_id: SchemaType,
103    /// The namespace of the subject.
104    pub namespace: Namespace,
105    /// The current sequence number of the subject.
106    pub sn: u64,
107    /// The identifier of the public key of the creator owner.
108    pub creator: PublicKey,
109    /// The identifier of the public key of the subject owner.
110    pub owner: PublicKey,
111    pub new_owner: Option<PublicKey>,
112    /// Indicates whether the subject is active or not.
113    pub active: bool,
114    /// The current status of the subject.
115    pub properties: ValueWrapper,
116}
117
118impl From<Governance> for Metadata {
119    fn from(value: Governance) -> Self {
120        Self {
121            name: value.subject_metadata.name,
122            description: value.subject_metadata.description,
123            subject_id: value.subject_metadata.subject_id.clone(),
124            governance_id: value.subject_metadata.subject_id,
125            genesis_gov_version: 0,
126            prev_ledger_event_hash: value
127                .subject_metadata
128                .prev_ledger_event_hash,
129            schema_id: value.subject_metadata.schema_id,
130            namespace: Namespace::new(),
131            sn: value.subject_metadata.sn,
132            creator: value.subject_metadata.creator,
133            owner: value.subject_metadata.owner,
134            new_owner: value.subject_metadata.new_owner,
135            active: value.subject_metadata.active,
136            properties: value.properties.to_value_wrapper(),
137        }
138    }
139}
140
141impl From<Tracker> for Metadata {
142    fn from(value: Tracker) -> Self {
143        Self {
144            name: value.subject_metadata.name,
145            description: value.subject_metadata.description,
146            subject_id: value.subject_metadata.subject_id,
147            governance_id: value.governance_id,
148            genesis_gov_version: value.genesis_gov_version,
149            prev_ledger_event_hash: value
150                .subject_metadata
151                .prev_ledger_event_hash,
152            schema_id: value.subject_metadata.schema_id,
153            namespace: value.namespace,
154            sn: value.subject_metadata.sn,
155            creator: value.subject_metadata.creator,
156            owner: value.subject_metadata.owner,
157            new_owner: value.subject_metadata.new_owner,
158            active: value.subject_metadata.active,
159            properties: value.properties,
160        }
161    }
162}
163
164pub struct DataForSink {
165    pub gov_id: Option<String>,
166    pub subject_id: String,
167    pub sn: u64,
168    pub owner: String,
169    pub namespace: String,
170    pub schema_id: SchemaType,
171    pub issuer: String,
172    pub event_request_timestamp: u64,
173    pub event_ledger_timestamp: u64,
174    pub gov_version: u64,
175    pub event_data_ledger: EventLedgerDataForSink,
176}
177
178pub enum EventLedgerDataForSink {
179    Create { state: Value },
180    Fact { patch: Value },
181    Confirm { patch: Option<Value> },
182    Other,
183}
184
185impl EventLedgerDataForSink {
186    pub fn build(protocols: &Protocols, state: &Value) -> Self {
187        match protocols {
188            Protocols::Create { .. } => Self::Create {
189                state: state.clone(),
190            },
191            Protocols::TrackerFact { evaluation, .. }
192            | Protocols::GovFact { evaluation, .. } => Self::Fact {
193                patch: evaluation
194                    .evaluator_res()
195                    .expect("event is valid")
196                    .patch
197                    .0,
198            },
199            Protocols::Transfer { .. }
200            | Protocols::Reject { .. }
201            | Protocols::EOL { .. } => Self::Other,
202            Protocols::TrackerConfirm { .. } => Self::Confirm { patch: None },
203            Protocols::GovConfirm { evaluation, .. } => Self::Confirm {
204                patch: Some(
205                    evaluation.evaluator_res().expect("event is valid").patch.0,
206                ),
207            },
208        }
209    }
210}
211
212#[derive(
213    Default,
214    Debug,
215    Serialize,
216    Deserialize,
217    Clone,
218    BorshSerialize,
219    BorshDeserialize,
220)]
221pub struct SubjectMetadata {
222    /// The name of the subject.
223    pub name: Option<String>,
224    /// The description of the subject.
225    pub description: Option<String>,
226    /// The identifier of the subject.
227    pub subject_id: DigestIdentifier,
228
229    pub schema_id: SchemaType,
230    /// The identifier of the public key of the subject owner.
231    pub owner: PublicKey,
232    /// The identifier of the public key of the new subject owner.
233    pub new_owner: Option<PublicKey>,
234
235    pub prev_ledger_event_hash: DigestIdentifier,
236    /// The identifier of the public key of the subject creator.
237    pub creator: PublicKey,
238    /// Indicates whether the subject is active or not.
239    pub active: bool,
240    /// The current sequence number of the subject.
241    pub sn: u64,
242}
243
244impl SubjectMetadata {
245    pub fn new(data: &Metadata) -> Self {
246        Self {
247            name: data.name.clone(),
248            description: data.description.clone(),
249            subject_id: data.subject_id.clone(),
250            owner: data.creator.clone(),
251            schema_id: data.schema_id.clone(),
252            new_owner: data.new_owner.clone(),
253            prev_ledger_event_hash: data.prev_ledger_event_hash.clone(),
254            creator: data.creator.clone(),
255            active: data.active,
256            sn: data.sn,
257        }
258    }
259}
260
261#[async_trait]
262pub trait Subject
263where
264    <Self as Actor>::Event: BorshSerialize + BorshDeserialize,
265    Self: PersistentActor,
266{
267    fn apply_patch_verify(
268        subject_properties: &mut ValueWrapper,
269        json_patch: ValueWrapper,
270    ) -> Result<(), SubjectError> {
271        let json_patch = serde_json::from_value::<Patch>(json_patch.0)
272            .map_err(|e| SubjectError::PatchConversionFailed {
273                details: e.to_string(),
274            })?;
275
276        patch(&mut subject_properties.0, &json_patch).map_err(|e| {
277            SubjectError::PatchApplicationFailed {
278                details: e.to_string(),
279            }
280        })?;
281
282        Ok(())
283    }
284
285    async fn verify_new_ledger_event(
286        ctx: &mut ActorContext<Self>,
287        new_ledger_event: &SignedLedger,
288        subject_metadata: Metadata,
289        actual_ledger_event_hash: DigestIdentifier,
290        last_data: LastData,
291        hash: &HashAlgorithm,
292    ) -> Result<bool, SubjectError> {
293        if !subject_metadata.active {
294            return Err(SubjectError::SubjectInactive);
295        }
296
297        if new_ledger_event.content().sn != subject_metadata.sn + 1 {
298            return Err(SubjectError::InvalidSequenceNumber {
299                expected: subject_metadata.sn + 1,
300                actual: new_ledger_event.content().sn,
301            });
302        }
303
304        if new_ledger_event.verify().is_err() {
305            return Err(SubjectError::SignatureVerificationFailed {
306                context: "new ledger event signature verification failed"
307                    .to_string(),
308            });
309        }
310
311        let signer = if let Some(new_owner) = &subject_metadata.new_owner {
312            new_owner.clone()
313        } else {
314            subject_metadata.owner.clone()
315        };
316
317        if new_ledger_event.signature().signer != signer {
318            return Err(SubjectError::IncorrectSigner {
319                expected: signer.to_string(),
320                actual: new_ledger_event.signature().signer.to_string(),
321            });
322        }
323
324        if new_ledger_event.content().event_request.verify().is_err() {
325            return Err(SubjectError::SignatureVerificationFailed {
326                context: "event request signature verification failed"
327                    .to_string(),
328            });
329        }
330
331        if actual_ledger_event_hash
332            != new_ledger_event.content().prev_ledger_event_hash
333        {
334            return Err(SubjectError::PreviousHashMismatch);
335        }
336
337        let mut modified_subject_metadata = subject_metadata.clone();
338        modified_subject_metadata.sn += 1;
339
340        let (validation, new_actual_protocols) = match (
341            new_ledger_event.content().event_request.content(),
342            &new_ledger_event.content().protocols,
343            subject_metadata.schema_id.is_gov(),
344        ) {
345            (
346                EventRequest::Fact(..),
347                Protocols::TrackerFact {
348                    evaluation,
349                    validation,
350                },
351                false,
352            ) => {
353                if modified_subject_metadata.new_owner.is_some() {
354                    return Err(SubjectError::UnexpectedFactEvent);
355                }
356
357                if let Some(eval) = evaluation.evaluator_res() {
358                    Self::apply_patch_verify(
359                        &mut modified_subject_metadata.properties,
360                        eval.patch,
361                    )?;
362                }
363                (
364                    validation,
365                    ActualProtocols::Eval {
366                        eval_data: evaluation.clone(),
367                    },
368                )
369            }
370            (
371                EventRequest::Fact(..),
372                Protocols::GovFact {
373                    evaluation,
374                    approval,
375                    validation,
376                },
377                true,
378            ) => {
379                if modified_subject_metadata.new_owner.is_some() {
380                    return Err(SubjectError::UnexpectedFactEvent);
381                }
382
383                let actual_protocols =
384                    if let Some(eval) = evaluation.evaluator_res() {
385                        if let Some(appr) = approval {
386                            if appr.approved {
387                                Self::apply_patch_verify(
388                                    &mut modified_subject_metadata.properties,
389                                    eval.patch,
390                                )?;
391                            }
392
393                            ActualProtocols::EvalApprove {
394                                eval_data: evaluation.clone(),
395                                approval_data: appr.clone(),
396                            }
397                        } else {
398                            return Err(
399                                SubjectError::MissingApprovalAfterEvaluation,
400                            );
401                        }
402                    } else if approval.is_some() {
403                        return Err(
404                        SubjectError::UnexpectedApprovalAfterFailedEvaluation,
405                    );
406                    } else {
407                        ActualProtocols::Eval {
408                            eval_data: evaluation.clone(),
409                        }
410                    };
411
412                (validation, actual_protocols)
413            }
414            (
415                EventRequest::Transfer(transfer),
416                Protocols::Transfer {
417                    evaluation,
418                    validation,
419                },
420                ..,
421            ) => {
422                if modified_subject_metadata.new_owner.is_some() {
423                    return Err(SubjectError::UnexpectedTransferEvent);
424                }
425
426                if let Some(eval) = evaluation.evaluator_res() {
427                    Self::apply_patch_verify(
428                        &mut modified_subject_metadata.properties,
429                        eval.patch,
430                    )?;
431                    modified_subject_metadata.new_owner =
432                        Some(transfer.new_owner.clone());
433                }
434
435                (
436                    validation,
437                    ActualProtocols::Eval {
438                        eval_data: evaluation.clone(),
439                    },
440                )
441            }
442            (
443                EventRequest::Confirm(..),
444                Protocols::TrackerConfirm { validation },
445                false,
446            ) => {
447                if let Some(new_owner) =
448                    &modified_subject_metadata.new_owner.take()
449                {
450                    modified_subject_metadata.owner = new_owner.clone();
451                } else {
452                    return Err(SubjectError::ConfirmWithoutNewOwner);
453                }
454
455                (validation, ActualProtocols::None)
456            }
457            (
458                EventRequest::Confirm(..),
459                Protocols::GovConfirm {
460                    evaluation,
461                    validation,
462                },
463                true,
464            ) => {
465                if let Some(eval) = evaluation.evaluator_res() {
466                    Self::apply_patch_verify(
467                        &mut modified_subject_metadata.properties,
468                        eval.patch,
469                    )?;
470
471                    if let Some(new_owner) =
472                        &modified_subject_metadata.new_owner.take()
473                    {
474                        modified_subject_metadata.owner = new_owner.clone();
475                    } else {
476                        return Err(SubjectError::ConfirmWithoutNewOwner);
477                    }
478                }
479
480                (
481                    validation,
482                    ActualProtocols::Eval {
483                        eval_data: evaluation.clone(),
484                    },
485                )
486            }
487            (
488                EventRequest::Reject(..),
489                Protocols::Reject { validation },
490                ..,
491            ) => {
492                if modified_subject_metadata.new_owner.take().is_none() {
493                    return Err(SubjectError::RejectWithoutNewOwner);
494                }
495
496                (validation, ActualProtocols::None)
497            }
498            (EventRequest::EOL(..), Protocols::EOL { validation }, ..) => {
499                if modified_subject_metadata.new_owner.is_some() {
500                    return Err(SubjectError::UnexpectedEOLEvent);
501                }
502
503                modified_subject_metadata.active = false;
504                (validation, ActualProtocols::None)
505            }
506            _ => {
507                return Err(SubjectError::EventProtocolMismatch);
508            }
509        };
510
511        if modified_subject_metadata.schema_id.is_gov()
512            && new_actual_protocols.is_success()
513        {
514            let mut gov_data = serde_json::from_value::<GovernanceData>(
515                modified_subject_metadata.properties.0,
516            )
517            .map_err(|e| {
518                SubjectError::GovernanceDataConversionFailed {
519                    details: e.to_string(),
520                }
521            })?;
522
523            gov_data.version += 1;
524            modified_subject_metadata.properties = gov_data.to_value_wrapper();
525        }
526
527        let validation_req = ValidationReq::Event {
528            actual_protocols: Box::new(new_actual_protocols),
529            event_request: new_ledger_event.content().event_request.clone(),
530            ledger_hash: actual_ledger_event_hash.clone(),
531            metadata: Box::new(subject_metadata.clone()),
532            last_data: Box::new(last_data),
533            gov_version: new_ledger_event.content().gov_version,
534            sn: new_ledger_event.content().sn,
535        };
536
537        let signed_validation_req = Signed::from_parts(
538            validation_req,
539            validation.validation_req_signature.clone(),
540        );
541
542        if signed_validation_req.verify().is_err() {
543            return Err(SubjectError::InvalidValidationRequestSignature);
544        }
545
546        let hash_signed_val_req =
547            hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
548                |e| SubjectError::ValidationRequestHashFailed {
549                    details: e.to_string(),
550                },
551            )?;
552
553        if hash_signed_val_req != validation.validation_req_hash {
554            return Err(SubjectError::ValidationRequestHashMismatch);
555        }
556
557        modified_subject_metadata.prev_ledger_event_hash =
558            actual_ledger_event_hash;
559
560        let modified_metadata_hash =
561            hash_borsh(&*hash.hasher(), &modified_subject_metadata).map_err(
562                |e| SubjectError::ModifiedMetadataHashFailed {
563                    details: e.to_string(),
564                },
565            )?;
566
567        let validation_res = ValidationRes::Response {
568            vali_req_hash: hash_signed_val_req,
569            modified_metadata_hash,
570        };
571
572        let role_data = get_validation_roles_register(
573            ctx,
574            &subject_metadata.governance_id,
575            SearchRole {
576                schema_id: subject_metadata.schema_id,
577                namespace: subject_metadata.namespace,
578            },
579            new_ledger_event.content().gov_version,
580        )
581        .await
582        .map_err(|e| SubjectError::ValidatorsRetrievalFailed {
583            details: e.to_string(),
584        })?;
585
586        if !check_quorum_signers(
587            &validation
588                .validators_signatures
589                .iter()
590                .map(|x| x.signer.clone())
591                .collect::<HashSet<PublicKey>>(),
592            &role_data.quorum,
593            &role_data.workers,
594        ) {
595            return Err(SubjectError::InvalidQuorum);
596        }
597
598        for signature in validation.validators_signatures.iter() {
599            let signed_res =
600                Signed::from_parts(validation_res.clone(), signature.clone());
601
602            if signed_res.verify().is_err() {
603                return Err(SubjectError::InvalidValidatorSignature);
604            }
605        }
606
607        Ok(new_ledger_event.content().protocols.is_success())
608    }
609
610    async fn verify_first_ledger_event(
611        ctx: &mut ActorContext<Self>,
612        ledger_event: &SignedLedger,
613        hash: &HashAlgorithm,
614        subject_metadata: Metadata,
615    ) -> Result<(), SubjectError> {
616        if ledger_event.verify().is_err() {
617            return Err(SubjectError::SignatureVerificationFailed {
618                context: "first ledger event signature verification failed"
619                    .to_string(),
620            });
621        }
622
623        if ledger_event.signature().signer != subject_metadata.owner {
624            return Err(SubjectError::IncorrectSigner {
625                expected: subject_metadata.owner.to_string(),
626                actual: ledger_event.signature().signer.to_string(),
627            });
628        }
629
630        if ledger_event.content().event_request.verify().is_err() {
631            return Err(SubjectError::SignatureVerificationFailed {
632                context: "event request signature verification failed"
633                    .to_string(),
634            });
635        }
636
637        if ledger_event.content().sn != 0 {
638            return Err(SubjectError::InvalidCreationSequenceNumber);
639        }
640
641        if !ledger_event.content().prev_ledger_event_hash.is_empty() {
642            return Err(SubjectError::NonEmptyPreviousHashInCreation);
643        }
644
645        let event_request_type = EventRequestType::from(
646            ledger_event.content().event_request.content(),
647        );
648
649        let validation =
650            match (event_request_type, &ledger_event.content().protocols) {
651                (
652                    EventRequestType::Create,
653                    Protocols::Create { validation },
654                ) => validation,
655                _ => {
656                    return Err(SubjectError::EventProtocolMismatch);
657                }
658            };
659
660        let ValidationMetadata::Metadata(metadata) =
661            &validation.validation_metadata
662        else {
663            return Err(SubjectError::InvalidValidationMetadata);
664        };
665
666        let validation_req = ValidationReq::Create {
667            event_request: ledger_event.content().event_request.clone(),
668            gov_version: ledger_event.content().gov_version,
669            subject_id: subject_metadata.subject_id.clone(),
670        };
671
672        let signed_validation_req = Signed::from_parts(
673            validation_req,
674            validation.validation_req_signature.clone(),
675        );
676
677        if signed_validation_req.verify().is_err() {
678            return Err(SubjectError::InvalidValidationRequestSignature);
679        }
680
681        let hash_signed_val_req =
682            hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
683                |e| SubjectError::ValidationRequestHashFailed {
684                    details: e.to_string(),
685                },
686            )?;
687
688        if hash_signed_val_req != validation.validation_req_hash {
689            return Err(SubjectError::ValidationRequestHashMismatch);
690        }
691
692        if metadata.deref() != &subject_metadata {
693            return Err(SubjectError::MetadataMismatch);
694        }
695
696        if metadata.schema_id == SchemaType::Governance {
697            serde_json::from_value::<GovernanceData>(
698                metadata.properties.0.clone(),
699            )
700            .map_err(|e| {
701                SubjectError::GovernancePropertiesConversionFailed {
702                    details: e.to_string(),
703                }
704            })?;
705        }
706
707        let validation_res = ValidationRes::Create {
708            vali_req_hash: hash_signed_val_req,
709            subject_metadata: Box::new(subject_metadata),
710        };
711
712        let role_data = match metadata.schema_id {
713            SchemaType::Governance => RoleDataRegister {
714                workers: HashSet::from([metadata.owner.clone()]),
715                quorum: Quorum::Majority,
716            },
717            SchemaType::Type(_) => get_validation_roles_register(
718                ctx,
719                &metadata.governance_id,
720                SearchRole {
721                    schema_id: metadata.schema_id.clone(),
722                    namespace: metadata.namespace.clone(),
723                },
724                ledger_event.content().gov_version,
725            )
726            .await
727            .map_err(|e| {
728                SubjectError::ValidatorsRetrievalFailed {
729                    details: e.to_string(),
730                }
731            })?,
732            SchemaType::TrackerSchemas => {
733                return Err(SubjectError::InvalidSchemaId);
734            }
735        };
736
737        if !check_quorum_signers(
738            &validation
739                .validators_signatures
740                .iter()
741                .map(|x| x.signer.clone())
742                .collect::<HashSet<PublicKey>>(),
743            &role_data.quorum,
744            &role_data.workers,
745        ) {
746            return Err(SubjectError::InvalidQuorum);
747        }
748
749        for signature in validation.validators_signatures.iter() {
750            let signed_res =
751                Signed::from_parts(validation_res.clone(), signature.clone());
752
753            if signed_res.verify().is_err() {
754                return Err(SubjectError::InvalidValidatorSignature);
755            }
756        }
757
758        Ok(())
759    }
760
761    async fn register(
762        ctx: &mut ActorContext<Self>,
763        message: RegisterMessage,
764    ) -> Result<(), ActorError> {
765        let register_path = ActorPath::from("/user/node/register");
766        match ctx.system().get_actor::<Register>(&register_path).await {
767            Ok(register) => {
768                register.tell(message.clone()).await?;
769
770                debug!(
771                    message = ?message,
772                    "Register message sent successfully"
773                );
774            }
775            Err(e) => {
776                error!(
777                    path = %register_path,
778                    "Register actor not found"
779                );
780                return Err(e);
781            }
782        };
783
784        Ok(())
785    }
786
787    async fn event_to_sink(
788        ctx: &mut ActorContext<Self>,
789        data: DataForSink,
790        event: &EventRequest,
791    ) -> Result<(), ActorError> {
792        let event = match (event, data.event_data_ledger) {
793            (
794                EventRequest::Create(..),
795                EventLedgerDataForSink::Create { state },
796            ) => DataToSinkEvent::Create {
797                governance_id: data.gov_id,
798                subject_id: data.subject_id,
799                owner: data.owner,
800                schema_id: data.schema_id,
801                namespace: data.namespace.to_string(),
802                sn: data.sn,
803                gov_version: data.gov_version,
804                state,
805            },
806            (
807                EventRequest::Fact(fact_request),
808                EventLedgerDataForSink::Fact { patch },
809            ) => DataToSinkEvent::Fact {
810                governance_id: data.gov_id,
811                subject_id: data.subject_id,
812                issuer: data.issuer.to_string(),
813                owner: data.owner,
814                payload: fact_request.payload.0.clone(),
815                schema_id: data.schema_id,
816                sn: data.sn,
817                gov_version: data.gov_version,
818                patch,
819            },
820            (
821                EventRequest::Transfer(transfer_request),
822                EventLedgerDataForSink::Other,
823            ) => DataToSinkEvent::Transfer {
824                governance_id: data.gov_id,
825                subject_id: data.subject_id,
826                owner: data.owner,
827                new_owner: transfer_request.new_owner.to_string(),
828                schema_id: data.schema_id,
829                sn: data.sn,
830                gov_version: data.gov_version,
831            },
832            (
833                EventRequest::Confirm(confirm_request),
834                EventLedgerDataForSink::Confirm { patch },
835            ) => DataToSinkEvent::Confirm {
836                governance_id: data.gov_id,
837                subject_id: data.subject_id,
838                schema_id: data.schema_id,
839                sn: data.sn,
840                gov_version: data.gov_version,
841                patch,
842                name_old_owner: confirm_request.name_old_owner.clone(),
843            },
844            (EventRequest::Reject(..), EventLedgerDataForSink::Other) => {
845                DataToSinkEvent::Reject {
846                    governance_id: data.gov_id,
847                    subject_id: data.subject_id,
848                    schema_id: data.schema_id,
849                    sn: data.sn,
850                    gov_version: data.gov_version,
851                }
852            }
853            (EventRequest::EOL(..), EventLedgerDataForSink::Other) => {
854                DataToSinkEvent::Eol {
855                    governance_id: data.gov_id,
856                    subject_id: data.subject_id,
857                    schema_id: data.schema_id,
858                    sn: data.sn,
859                    gov_version: data.gov_version,
860                }
861            }
862            _ => {
863                unreachable!(
864                    "EventLedgerDataForSink is created according to protocols and protocols according to EventRequest"
865                )
866            }
867        };
868
869        let msg = SinkDataMessage::Event {
870            event: Box::new(event),
871            event_request_timestamp: data.event_request_timestamp,
872            event_ledger_timestamp: data.event_ledger_timestamp,
873        };
874
875        Self::publish_sink(ctx, msg).await
876    }
877
878    async fn publish_sink(
879        ctx: &mut ActorContext<Self>,
880        message: SinkDataMessage,
881    ) -> Result<(), ActorError> {
882        let sink_data = ctx.get_child::<SinkData>("sink_data").await?;
883        let (subject_id, schema_id) = message.get_subject_schema();
884
885        sink_data.tell(message).await?;
886        debug!(
887            subject_id = %subject_id,
888            schema_id = %schema_id,
889            "Message published to sink successfully"
890        );
891
892        Ok(())
893    }
894
895    async fn get_ledger(
896        &self,
897        ctx: &mut ActorContext<Self>,
898        lo_sn: Option<u64>,
899        hi_sn: u64,
900    ) -> Result<(Vec<<Self as Actor>::Event>, bool), ActorError> {
901        if let Some(lo_sn) = lo_sn {
902            let actual_sn = lo_sn + 1;
903            if (hi_sn - actual_sn) > 99 {
904                Ok((get_n_events(ctx, actual_sn, 99).await?, false))
905            } else {
906                Ok((
907                    get_n_events(ctx, actual_sn, hi_sn - actual_sn).await?,
908                    true,
909                ))
910            }
911        } else if hi_sn > 99 {
912            Ok((get_n_events(ctx, 0, 99).await?, false))
913        } else {
914            Ok((get_n_events(ctx, 0, hi_sn).await?, true))
915        }
916    }
917
918    async fn update_sn(
919        &self,
920        ctx: &mut ActorContext<Self>,
921    ) -> Result<(), ActorError>;
922
923    async fn reject(
924        &self,
925        ctx: &mut ActorContext<Self>,
926        gov_version: u64,
927    ) -> Result<(), ActorError>;
928
929    async fn confirm(
930        &self,
931        ctx: &mut ActorContext<Self>,
932        new_owner: PublicKey,
933        gov_version: u64,
934    ) -> Result<(), ActorError>;
935
936    async fn transfer(
937        &self,
938        ctx: &mut ActorContext<Self>,
939        new_owner: PublicKey,
940        gov_version: u64,
941    ) -> Result<(), ActorError>;
942
943    async fn eol(&self, ctx: &mut ActorContext<Self>)
944    -> Result<(), ActorError>;
945
946    fn apply_patch(
947        &mut self,
948        json_patch: ValueWrapper,
949    ) -> Result<(), ActorError>;
950
951    async fn manager_new_ledger_events(
952        &mut self,
953        ctx: &mut ActorContext<Self>,
954        events: Vec<SignedLedger>,
955    ) -> Result<(), ActorError>;
956
957    async fn get_last_ledger(
958        &self,
959        ctx: &mut ActorContext<Self>,
960    ) -> Result<Option<SignedLedger>, ActorError>;
961}