1use std::{collections::HashSet, ops::Deref};
5
6use crate::{
7 governance::{
8 Governance,
9 data::GovernanceData,
10 model::Quorum,
11 role_register::{RoleDataRegister, SearchRole},
12 },
13 model::{
14 common::{
15 check_quorum_signers, get_n_events, get_validation_roles_register,
16 },
17 event::{Ledger, Protocols, ValidationMetadata},
18 },
19 node::register::{Register, RegisterMessage},
20 tracker::Tracker,
21 validation::{
22 request::{ActualProtocols, LastData, ValidationReq},
23 response::ValidationRes,
24 },
25};
26
27use error::SubjectError;
28
29use ave_actors::{
30 Actor, ActorContext, ActorError, ActorPath, Event, PersistentActor,
31};
32use ave_common::{
33 DataToSinkEvent, Namespace, SchemaType, ValueWrapper,
34 bridge::request::EventRequestType,
35 identity::{
36 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
37 },
38 request::EventRequest,
39};
40
41use async_trait::async_trait;
42use borsh::{BorshDeserialize, BorshSerialize};
43use json_patch::{Patch, patch};
44use serde::{Deserialize, Serialize};
45use serde_json::Value;
46use sinkdata::{SinkData, SinkDataMessage};
47use tracing::{debug, error};
48
49pub mod error;
50pub mod sinkdata;
51
52#[derive(
53 Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
54)]
55pub struct SignedLedger(pub Signed<Ledger>);
56
57impl Deref for SignedLedger {
58 type Target = Signed<Ledger>;
59
60 fn deref(&self) -> &Self::Target {
61 &self.0
62 }
63}
64
65impl Event for SignedLedger {}
66
67#[derive(
68 Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
69)]
70pub struct RequestSubjectData {
71 pub subject_id: DigestIdentifier,
72 pub governance_id: DigestIdentifier,
73 pub namespace: Namespace,
74 pub schema_id: SchemaType,
75 pub sn: u64,
76 pub gov_version: u64,
77 pub signer: PublicKey,
78}
79
80#[derive(
82 Debug,
83 Clone,
84 Serialize,
85 Deserialize,
86 BorshSerialize,
87 BorshDeserialize,
88 PartialEq,
89 Eq,
90 Hash,
91)]
92pub struct Metadata {
93 pub name: Option<String>,
94 pub description: Option<String>,
95 pub subject_id: DigestIdentifier,
97 pub governance_id: DigestIdentifier,
99 pub genesis_gov_version: u64,
100 pub prev_ledger_event_hash: DigestIdentifier,
101 pub schema_id: SchemaType,
103 pub namespace: Namespace,
105 pub sn: u64,
107 pub creator: PublicKey,
109 pub owner: PublicKey,
111 pub new_owner: Option<PublicKey>,
112 pub active: bool,
114 pub properties: ValueWrapper,
116}
117
118impl From<Governance> for Metadata {
119 fn from(value: Governance) -> Self {
120 Self {
121 name: value.subject_metadata.name,
122 description: value.subject_metadata.description,
123 subject_id: value.subject_metadata.subject_id.clone(),
124 governance_id: value.subject_metadata.subject_id,
125 genesis_gov_version: 0,
126 prev_ledger_event_hash: value
127 .subject_metadata
128 .prev_ledger_event_hash,
129 schema_id: value.subject_metadata.schema_id,
130 namespace: Namespace::new(),
131 sn: value.subject_metadata.sn,
132 creator: value.subject_metadata.creator,
133 owner: value.subject_metadata.owner,
134 new_owner: value.subject_metadata.new_owner,
135 active: value.subject_metadata.active,
136 properties: value.properties.to_value_wrapper(),
137 }
138 }
139}
140
141impl From<Tracker> for Metadata {
142 fn from(value: Tracker) -> Self {
143 Self {
144 name: value.subject_metadata.name,
145 description: value.subject_metadata.description,
146 subject_id: value.subject_metadata.subject_id,
147 governance_id: value.governance_id,
148 genesis_gov_version: value.genesis_gov_version,
149 prev_ledger_event_hash: value
150 .subject_metadata
151 .prev_ledger_event_hash,
152 schema_id: value.subject_metadata.schema_id,
153 namespace: value.namespace,
154 sn: value.subject_metadata.sn,
155 creator: value.subject_metadata.creator,
156 owner: value.subject_metadata.owner,
157 new_owner: value.subject_metadata.new_owner,
158 active: value.subject_metadata.active,
159 properties: value.properties,
160 }
161 }
162}
163
164pub struct DataForSink {
165 pub gov_id: Option<String>,
166 pub subject_id: String,
167 pub sn: u64,
168 pub owner: String,
169 pub namespace: String,
170 pub schema_id: SchemaType,
171 pub issuer: String,
172 pub event_request_timestamp: u64,
173 pub event_ledger_timestamp: u64,
174 pub gov_version: u64,
175 pub event_data_ledger: EventLedgerDataForSink,
176}
177
178pub enum EventLedgerDataForSink {
179 Create { state: Value },
180 Fact { patch: Value },
181 Confirm { patch: Option<Value> },
182 Other,
183}
184
185impl EventLedgerDataForSink {
186 pub fn build(protocols: &Protocols, state: &Value) -> Self {
187 match protocols {
188 Protocols::Create { .. } => Self::Create {
189 state: state.clone(),
190 },
191 Protocols::TrackerFact { evaluation, .. }
192 | Protocols::GovFact { evaluation, .. } => Self::Fact {
193 patch: evaluation
194 .evaluator_res()
195 .expect("event is valid")
196 .patch
197 .0,
198 },
199 Protocols::Transfer { .. }
200 | Protocols::Reject { .. }
201 | Protocols::EOL { .. } => Self::Other,
202 Protocols::TrackerConfirm { .. } => Self::Confirm { patch: None },
203 Protocols::GovConfirm { evaluation, .. } => Self::Confirm {
204 patch: Some(
205 evaluation.evaluator_res().expect("event is valid").patch.0,
206 ),
207 },
208 }
209 }
210}
211
212#[derive(
213 Default,
214 Debug,
215 Serialize,
216 Deserialize,
217 Clone,
218 BorshSerialize,
219 BorshDeserialize,
220)]
221pub struct SubjectMetadata {
222 pub name: Option<String>,
224 pub description: Option<String>,
226 pub subject_id: DigestIdentifier,
228
229 pub schema_id: SchemaType,
230 pub owner: PublicKey,
232 pub new_owner: Option<PublicKey>,
234
235 pub prev_ledger_event_hash: DigestIdentifier,
236 pub creator: PublicKey,
238 pub active: bool,
240 pub sn: u64,
242}
243
244impl SubjectMetadata {
245 pub fn new(data: &Metadata) -> Self {
246 Self {
247 name: data.name.clone(),
248 description: data.description.clone(),
249 subject_id: data.subject_id.clone(),
250 owner: data.creator.clone(),
251 schema_id: data.schema_id.clone(),
252 new_owner: data.new_owner.clone(),
253 prev_ledger_event_hash: data.prev_ledger_event_hash.clone(),
254 creator: data.creator.clone(),
255 active: data.active,
256 sn: data.sn,
257 }
258 }
259}
260
261#[async_trait]
262pub trait Subject
263where
264 <Self as Actor>::Event: BorshSerialize + BorshDeserialize,
265 Self: PersistentActor,
266{
267 fn apply_patch_verify(
268 subject_properties: &mut ValueWrapper,
269 json_patch: ValueWrapper,
270 ) -> Result<(), SubjectError> {
271 let json_patch = serde_json::from_value::<Patch>(json_patch.0)
272 .map_err(|e| SubjectError::PatchConversionFailed {
273 details: e.to_string(),
274 })?;
275
276 patch(&mut subject_properties.0, &json_patch).map_err(|e| {
277 SubjectError::PatchApplicationFailed {
278 details: e.to_string(),
279 }
280 })?;
281
282 Ok(())
283 }
284
285 async fn verify_new_ledger_event(
286 ctx: &mut ActorContext<Self>,
287 new_ledger_event: &SignedLedger,
288 subject_metadata: Metadata,
289 actual_ledger_event_hash: DigestIdentifier,
290 last_data: LastData,
291 hash: &HashAlgorithm,
292 ) -> Result<bool, SubjectError> {
293 if !subject_metadata.active {
294 return Err(SubjectError::SubjectInactive);
295 }
296
297 if new_ledger_event.content().sn != subject_metadata.sn + 1 {
298 return Err(SubjectError::InvalidSequenceNumber {
299 expected: subject_metadata.sn + 1,
300 actual: new_ledger_event.content().sn,
301 });
302 }
303
304 if new_ledger_event.verify().is_err() {
305 return Err(SubjectError::SignatureVerificationFailed {
306 context: "new ledger event signature verification failed"
307 .to_string(),
308 });
309 }
310
311 let signer = if let Some(new_owner) = &subject_metadata.new_owner {
312 new_owner.clone()
313 } else {
314 subject_metadata.owner.clone()
315 };
316
317 if new_ledger_event.signature().signer != signer {
318 return Err(SubjectError::IncorrectSigner {
319 expected: signer.to_string(),
320 actual: new_ledger_event.signature().signer.to_string(),
321 });
322 }
323
324 if new_ledger_event.content().event_request.verify().is_err() {
325 return Err(SubjectError::SignatureVerificationFailed {
326 context: "event request signature verification failed"
327 .to_string(),
328 });
329 }
330
331 if actual_ledger_event_hash
332 != new_ledger_event.content().prev_ledger_event_hash
333 {
334 return Err(SubjectError::PreviousHashMismatch);
335 }
336
337 let mut modified_subject_metadata = subject_metadata.clone();
338 modified_subject_metadata.sn += 1;
339
340 let (validation, new_actual_protocols) = match (
341 new_ledger_event.content().event_request.content(),
342 &new_ledger_event.content().protocols,
343 subject_metadata.schema_id.is_gov(),
344 ) {
345 (
346 EventRequest::Fact(..),
347 Protocols::TrackerFact {
348 evaluation,
349 validation,
350 },
351 false,
352 ) => {
353 if modified_subject_metadata.new_owner.is_some() {
354 return Err(SubjectError::UnexpectedFactEvent);
355 }
356
357 if let Some(eval) = evaluation.evaluator_res() {
358 Self::apply_patch_verify(
359 &mut modified_subject_metadata.properties,
360 eval.patch,
361 )?;
362 }
363 (
364 validation,
365 ActualProtocols::Eval {
366 eval_data: evaluation.clone(),
367 },
368 )
369 }
370 (
371 EventRequest::Fact(..),
372 Protocols::GovFact {
373 evaluation,
374 approval,
375 validation,
376 },
377 true,
378 ) => {
379 if modified_subject_metadata.new_owner.is_some() {
380 return Err(SubjectError::UnexpectedFactEvent);
381 }
382
383 let actual_protocols =
384 if let Some(eval) = evaluation.evaluator_res() {
385 if let Some(appr) = approval {
386 if appr.approved {
387 Self::apply_patch_verify(
388 &mut modified_subject_metadata.properties,
389 eval.patch,
390 )?;
391 }
392
393 ActualProtocols::EvalApprove {
394 eval_data: evaluation.clone(),
395 approval_data: appr.clone(),
396 }
397 } else {
398 return Err(
399 SubjectError::MissingApprovalAfterEvaluation,
400 );
401 }
402 } else if approval.is_some() {
403 return Err(
404 SubjectError::UnexpectedApprovalAfterFailedEvaluation,
405 );
406 } else {
407 ActualProtocols::Eval {
408 eval_data: evaluation.clone(),
409 }
410 };
411
412 (validation, actual_protocols)
413 }
414 (
415 EventRequest::Transfer(transfer),
416 Protocols::Transfer {
417 evaluation,
418 validation,
419 },
420 ..,
421 ) => {
422 if modified_subject_metadata.new_owner.is_some() {
423 return Err(SubjectError::UnexpectedTransferEvent);
424 }
425
426 if let Some(eval) = evaluation.evaluator_res() {
427 Self::apply_patch_verify(
428 &mut modified_subject_metadata.properties,
429 eval.patch,
430 )?;
431 modified_subject_metadata.new_owner =
432 Some(transfer.new_owner.clone());
433 }
434
435 (
436 validation,
437 ActualProtocols::Eval {
438 eval_data: evaluation.clone(),
439 },
440 )
441 }
442 (
443 EventRequest::Confirm(..),
444 Protocols::TrackerConfirm { validation },
445 false,
446 ) => {
447 if let Some(new_owner) =
448 &modified_subject_metadata.new_owner.take()
449 {
450 modified_subject_metadata.owner = new_owner.clone();
451 } else {
452 return Err(SubjectError::ConfirmWithoutNewOwner);
453 }
454
455 (validation, ActualProtocols::None)
456 }
457 (
458 EventRequest::Confirm(..),
459 Protocols::GovConfirm {
460 evaluation,
461 validation,
462 },
463 true,
464 ) => {
465 if let Some(eval) = evaluation.evaluator_res() {
466 Self::apply_patch_verify(
467 &mut modified_subject_metadata.properties,
468 eval.patch,
469 )?;
470
471 if let Some(new_owner) =
472 &modified_subject_metadata.new_owner.take()
473 {
474 modified_subject_metadata.owner = new_owner.clone();
475 } else {
476 return Err(SubjectError::ConfirmWithoutNewOwner);
477 }
478 }
479
480 (
481 validation,
482 ActualProtocols::Eval {
483 eval_data: evaluation.clone(),
484 },
485 )
486 }
487 (
488 EventRequest::Reject(..),
489 Protocols::Reject { validation },
490 ..,
491 ) => {
492 if modified_subject_metadata.new_owner.take().is_none() {
493 return Err(SubjectError::RejectWithoutNewOwner);
494 }
495
496 (validation, ActualProtocols::None)
497 }
498 (EventRequest::EOL(..), Protocols::EOL { validation }, ..) => {
499 if modified_subject_metadata.new_owner.is_some() {
500 return Err(SubjectError::UnexpectedEOLEvent);
501 }
502
503 modified_subject_metadata.active = false;
504 (validation, ActualProtocols::None)
505 }
506 _ => {
507 return Err(SubjectError::EventProtocolMismatch);
508 }
509 };
510
511 if modified_subject_metadata.schema_id.is_gov()
512 && new_actual_protocols.is_success()
513 {
514 let mut gov_data = serde_json::from_value::<GovernanceData>(
515 modified_subject_metadata.properties.0,
516 )
517 .map_err(|e| {
518 SubjectError::GovernanceDataConversionFailed {
519 details: e.to_string(),
520 }
521 })?;
522
523 gov_data.version += 1;
524 modified_subject_metadata.properties = gov_data.to_value_wrapper();
525 }
526
527 let validation_req = ValidationReq::Event {
528 actual_protocols: Box::new(new_actual_protocols),
529 event_request: new_ledger_event.content().event_request.clone(),
530 ledger_hash: actual_ledger_event_hash.clone(),
531 metadata: Box::new(subject_metadata.clone()),
532 last_data: Box::new(last_data),
533 gov_version: new_ledger_event.content().gov_version,
534 sn: new_ledger_event.content().sn,
535 };
536
537 let signed_validation_req = Signed::from_parts(
538 validation_req,
539 validation.validation_req_signature.clone(),
540 );
541
542 if signed_validation_req.verify().is_err() {
543 return Err(SubjectError::InvalidValidationRequestSignature);
544 }
545
546 let hash_signed_val_req =
547 hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
548 |e| SubjectError::ValidationRequestHashFailed {
549 details: e.to_string(),
550 },
551 )?;
552
553 if hash_signed_val_req != validation.validation_req_hash {
554 return Err(SubjectError::ValidationRequestHashMismatch);
555 }
556
557 modified_subject_metadata.prev_ledger_event_hash =
558 actual_ledger_event_hash;
559
560 let modified_metadata_hash =
561 hash_borsh(&*hash.hasher(), &modified_subject_metadata).map_err(
562 |e| SubjectError::ModifiedMetadataHashFailed {
563 details: e.to_string(),
564 },
565 )?;
566
567 let validation_res = ValidationRes::Response {
568 vali_req_hash: hash_signed_val_req,
569 modified_metadata_hash,
570 };
571
572 let role_data = get_validation_roles_register(
573 ctx,
574 &subject_metadata.governance_id,
575 SearchRole {
576 schema_id: subject_metadata.schema_id,
577 namespace: subject_metadata.namespace,
578 },
579 new_ledger_event.content().gov_version,
580 )
581 .await
582 .map_err(|e| SubjectError::ValidatorsRetrievalFailed {
583 details: e.to_string(),
584 })?;
585
586 if !check_quorum_signers(
587 &validation
588 .validators_signatures
589 .iter()
590 .map(|x| x.signer.clone())
591 .collect::<HashSet<PublicKey>>(),
592 &role_data.quorum,
593 &role_data.workers,
594 ) {
595 return Err(SubjectError::InvalidQuorum);
596 }
597
598 for signature in validation.validators_signatures.iter() {
599 let signed_res =
600 Signed::from_parts(validation_res.clone(), signature.clone());
601
602 if signed_res.verify().is_err() {
603 return Err(SubjectError::InvalidValidatorSignature);
604 }
605 }
606
607 Ok(new_ledger_event.content().protocols.is_success())
608 }
609
610 async fn verify_first_ledger_event(
611 ctx: &mut ActorContext<Self>,
612 ledger_event: &SignedLedger,
613 hash: &HashAlgorithm,
614 subject_metadata: Metadata,
615 ) -> Result<(), SubjectError> {
616 if ledger_event.verify().is_err() {
617 return Err(SubjectError::SignatureVerificationFailed {
618 context: "first ledger event signature verification failed"
619 .to_string(),
620 });
621 }
622
623 if ledger_event.signature().signer != subject_metadata.owner {
624 return Err(SubjectError::IncorrectSigner {
625 expected: subject_metadata.owner.to_string(),
626 actual: ledger_event.signature().signer.to_string(),
627 });
628 }
629
630 if ledger_event.content().event_request.verify().is_err() {
631 return Err(SubjectError::SignatureVerificationFailed {
632 context: "event request signature verification failed"
633 .to_string(),
634 });
635 }
636
637 if ledger_event.content().sn != 0 {
638 return Err(SubjectError::InvalidCreationSequenceNumber);
639 }
640
641 if !ledger_event.content().prev_ledger_event_hash.is_empty() {
642 return Err(SubjectError::NonEmptyPreviousHashInCreation);
643 }
644
645 let event_request_type = EventRequestType::from(
646 ledger_event.content().event_request.content(),
647 );
648
649 let validation =
650 match (event_request_type, &ledger_event.content().protocols) {
651 (
652 EventRequestType::Create,
653 Protocols::Create { validation },
654 ) => validation,
655 _ => {
656 return Err(SubjectError::EventProtocolMismatch);
657 }
658 };
659
660 let ValidationMetadata::Metadata(metadata) =
661 &validation.validation_metadata
662 else {
663 return Err(SubjectError::InvalidValidationMetadata);
664 };
665
666 let validation_req = ValidationReq::Create {
667 event_request: ledger_event.content().event_request.clone(),
668 gov_version: ledger_event.content().gov_version,
669 subject_id: subject_metadata.subject_id.clone(),
670 };
671
672 let signed_validation_req = Signed::from_parts(
673 validation_req,
674 validation.validation_req_signature.clone(),
675 );
676
677 if signed_validation_req.verify().is_err() {
678 return Err(SubjectError::InvalidValidationRequestSignature);
679 }
680
681 let hash_signed_val_req =
682 hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
683 |e| SubjectError::ValidationRequestHashFailed {
684 details: e.to_string(),
685 },
686 )?;
687
688 if hash_signed_val_req != validation.validation_req_hash {
689 return Err(SubjectError::ValidationRequestHashMismatch);
690 }
691
692 if metadata.deref() != &subject_metadata {
693 return Err(SubjectError::MetadataMismatch);
694 }
695
696 if metadata.schema_id == SchemaType::Governance {
697 serde_json::from_value::<GovernanceData>(
698 metadata.properties.0.clone(),
699 )
700 .map_err(|e| {
701 SubjectError::GovernancePropertiesConversionFailed {
702 details: e.to_string(),
703 }
704 })?;
705 }
706
707 let validation_res = ValidationRes::Create {
708 vali_req_hash: hash_signed_val_req,
709 subject_metadata: Box::new(subject_metadata),
710 };
711
712 let role_data = match metadata.schema_id {
713 SchemaType::Governance => RoleDataRegister {
714 workers: HashSet::from([metadata.owner.clone()]),
715 quorum: Quorum::Majority,
716 },
717 SchemaType::Type(_) => get_validation_roles_register(
718 ctx,
719 &metadata.governance_id,
720 SearchRole {
721 schema_id: metadata.schema_id.clone(),
722 namespace: metadata.namespace.clone(),
723 },
724 ledger_event.content().gov_version,
725 )
726 .await
727 .map_err(|e| {
728 SubjectError::ValidatorsRetrievalFailed {
729 details: e.to_string(),
730 }
731 })?,
732 SchemaType::TrackerSchemas => {
733 return Err(SubjectError::InvalidSchemaId);
734 }
735 };
736
737 if !check_quorum_signers(
738 &validation
739 .validators_signatures
740 .iter()
741 .map(|x| x.signer.clone())
742 .collect::<HashSet<PublicKey>>(),
743 &role_data.quorum,
744 &role_data.workers,
745 ) {
746 return Err(SubjectError::InvalidQuorum);
747 }
748
749 for signature in validation.validators_signatures.iter() {
750 let signed_res =
751 Signed::from_parts(validation_res.clone(), signature.clone());
752
753 if signed_res.verify().is_err() {
754 return Err(SubjectError::InvalidValidatorSignature);
755 }
756 }
757
758 Ok(())
759 }
760
761 async fn register(
762 ctx: &mut ActorContext<Self>,
763 message: RegisterMessage,
764 ) -> Result<(), ActorError> {
765 let register_path = ActorPath::from("/user/node/register");
766 match ctx.system().get_actor::<Register>(®ister_path).await {
767 Ok(register) => {
768 register.tell(message.clone()).await?;
769
770 debug!(
771 message = ?message,
772 "Register message sent successfully"
773 );
774 }
775 Err(e) => {
776 error!(
777 path = %register_path,
778 "Register actor not found"
779 );
780 return Err(e);
781 }
782 };
783
784 Ok(())
785 }
786
787 async fn event_to_sink(
788 ctx: &mut ActorContext<Self>,
789 data: DataForSink,
790 event: &EventRequest,
791 ) -> Result<(), ActorError> {
792 let event = match (event, data.event_data_ledger) {
793 (
794 EventRequest::Create(..),
795 EventLedgerDataForSink::Create { state },
796 ) => DataToSinkEvent::Create {
797 governance_id: data.gov_id,
798 subject_id: data.subject_id,
799 owner: data.owner,
800 schema_id: data.schema_id,
801 namespace: data.namespace.to_string(),
802 sn: data.sn,
803 gov_version: data.gov_version,
804 state,
805 },
806 (
807 EventRequest::Fact(fact_request),
808 EventLedgerDataForSink::Fact { patch },
809 ) => DataToSinkEvent::Fact {
810 governance_id: data.gov_id,
811 subject_id: data.subject_id,
812 issuer: data.issuer.to_string(),
813 owner: data.owner,
814 payload: fact_request.payload.0.clone(),
815 schema_id: data.schema_id,
816 sn: data.sn,
817 gov_version: data.gov_version,
818 patch,
819 },
820 (
821 EventRequest::Transfer(transfer_request),
822 EventLedgerDataForSink::Other,
823 ) => DataToSinkEvent::Transfer {
824 governance_id: data.gov_id,
825 subject_id: data.subject_id,
826 owner: data.owner,
827 new_owner: transfer_request.new_owner.to_string(),
828 schema_id: data.schema_id,
829 sn: data.sn,
830 gov_version: data.gov_version,
831 },
832 (
833 EventRequest::Confirm(confirm_request),
834 EventLedgerDataForSink::Confirm { patch },
835 ) => DataToSinkEvent::Confirm {
836 governance_id: data.gov_id,
837 subject_id: data.subject_id,
838 schema_id: data.schema_id,
839 sn: data.sn,
840 gov_version: data.gov_version,
841 patch,
842 name_old_owner: confirm_request.name_old_owner.clone(),
843 },
844 (EventRequest::Reject(..), EventLedgerDataForSink::Other) => {
845 DataToSinkEvent::Reject {
846 governance_id: data.gov_id,
847 subject_id: data.subject_id,
848 schema_id: data.schema_id,
849 sn: data.sn,
850 gov_version: data.gov_version,
851 }
852 }
853 (EventRequest::EOL(..), EventLedgerDataForSink::Other) => {
854 DataToSinkEvent::Eol {
855 governance_id: data.gov_id,
856 subject_id: data.subject_id,
857 schema_id: data.schema_id,
858 sn: data.sn,
859 gov_version: data.gov_version,
860 }
861 }
862 _ => {
863 unreachable!(
864 "EventLedgerDataForSink is created according to protocols and protocols according to EventRequest"
865 )
866 }
867 };
868
869 let msg = SinkDataMessage::Event {
870 event: Box::new(event),
871 event_request_timestamp: data.event_request_timestamp,
872 event_ledger_timestamp: data.event_ledger_timestamp,
873 };
874
875 Self::publish_sink(ctx, msg).await
876 }
877
878 async fn publish_sink(
879 ctx: &mut ActorContext<Self>,
880 message: SinkDataMessage,
881 ) -> Result<(), ActorError> {
882 let sink_data = ctx.get_child::<SinkData>("sink_data").await?;
883 let (subject_id, schema_id) = message.get_subject_schema();
884
885 sink_data.tell(message).await?;
886 debug!(
887 subject_id = %subject_id,
888 schema_id = %schema_id,
889 "Message published to sink successfully"
890 );
891
892 Ok(())
893 }
894
895 async fn get_ledger(
896 &self,
897 ctx: &mut ActorContext<Self>,
898 lo_sn: Option<u64>,
899 hi_sn: u64,
900 ) -> Result<(Vec<<Self as Actor>::Event>, bool), ActorError> {
901 if let Some(lo_sn) = lo_sn {
902 let actual_sn = lo_sn + 1;
903 if (hi_sn - actual_sn) > 99 {
904 Ok((get_n_events(ctx, actual_sn, 99).await?, false))
905 } else {
906 Ok((
907 get_n_events(ctx, actual_sn, hi_sn - actual_sn).await?,
908 true,
909 ))
910 }
911 } else if hi_sn > 99 {
912 Ok((get_n_events(ctx, 0, 99).await?, false))
913 } else {
914 Ok((get_n_events(ctx, 0, hi_sn).await?, true))
915 }
916 }
917
918 async fn update_sn(
919 &self,
920 ctx: &mut ActorContext<Self>,
921 ) -> Result<(), ActorError>;
922
923 async fn reject(
924 &self,
925 ctx: &mut ActorContext<Self>,
926 gov_version: u64,
927 ) -> Result<(), ActorError>;
928
929 async fn confirm(
930 &self,
931 ctx: &mut ActorContext<Self>,
932 new_owner: PublicKey,
933 gov_version: u64,
934 ) -> Result<(), ActorError>;
935
936 async fn transfer(
937 &self,
938 ctx: &mut ActorContext<Self>,
939 new_owner: PublicKey,
940 gov_version: u64,
941 ) -> Result<(), ActorError>;
942
943 async fn eol(&self, ctx: &mut ActorContext<Self>)
944 -> Result<(), ActorError>;
945
946 fn apply_patch(
947 &mut self,
948 json_patch: ValueWrapper,
949 ) -> Result<(), ActorError>;
950
951 async fn manager_new_ledger_events(
952 &mut self,
953 ctx: &mut ActorContext<Self>,
954 events: Vec<SignedLedger>,
955 ) -> Result<(), ActorError>;
956
957 async fn get_last_ledger(
958 &self,
959 ctx: &mut ActorContext<Self>,
960 ) -> Result<Option<SignedLedger>, ActorError>;
961}