1use std::{collections::HashSet, ops::Deref};
5
6use crate::{
7 governance::{
8 Governance,
9 data::GovernanceData,
10 model::Quorum,
11 role_register::{RoleDataRegister, SearchRole},
12 },
13 model::{
14 common::{
15 check_quorum_signers, get_n_events, get_validation_roles_register,
16 },
17 event::{Ledger, Protocols, ValidationMetadata},
18 },
19 node::register::{Register, RegisterMessage},
20 tracker::Tracker,
21 validation::{
22 request::{ActualProtocols, LastData, ValidationReq},
23 response::ValidationRes,
24 },
25};
26
27use error::SubjectError;
28
29use ave_actors::{
30 Actor, ActorContext, ActorError, ActorPath, Event, PersistentActor,
31};
32use ave_common::{
33 DataToSink, DataToSinkEvent, Namespace, SchemaType, ValueWrapper,
34 bridge::request::EventRequestType,
35 identity::{
36 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
37 },
38 request::EventRequest,
39 response::SinkEventsPage,
40};
41
42use async_trait::async_trait;
43use borsh::{BorshDeserialize, BorshSerialize};
44use json_patch::{Patch, patch};
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use sinkdata::{SinkData, SinkDataMessage};
48use tracing::{debug, error};
49
50pub mod error;
51pub mod sinkdata;
52
/// Newtype around a signed [`Ledger`] entry.
///
/// It dereferences to the inner `Signed<Ledger>` and is marked as an actor
/// [`Event`] by the impls that follow in this file.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
)]
pub struct SignedLedger(pub Signed<Ledger>);
57
58impl Deref for SignedLedger {
59 type Target = Signed<Ledger>;
60
61 fn deref(&self) -> &Self::Target {
62 &self.0
63 }
64}
65
// Marker impl: lets `SignedLedger` be used as an actor `Event`.
impl Event for SignedLedger {}
67
/// Subject-related data carried alongside a request.
///
/// NOTE(review): the producers and consumers of this struct are not visible
/// in this file; the field notes below only restate names and types.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub struct RequestSubjectData {
    /// Identifier of the subject.
    pub subject_id: DigestIdentifier,
    /// Identifier of the governance associated with the subject.
    pub governance_id: DigestIdentifier,
    /// Namespace of the subject.
    pub namespace: Namespace,
    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Sequence number.
    pub sn: u64,
    /// Governance version.
    pub gov_version: u64,
    /// Public key of the signer.
    pub signer: PublicKey,
}
80
/// Full metadata snapshot of a subject, including its state (`properties`).
///
/// Built from a [`Governance`] or a [`Tracker`] through the `From` impls in
/// this file, and hashed / compared during ledger event verification.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshSerialize,
    BorshDeserialize,
    PartialEq,
    Eq,
    Hash,
)]
pub struct Metadata {
    /// Optional subject name.
    pub name: Option<String>,
    /// Optional subject description.
    pub description: Option<String>,
    /// Identifier of the subject.
    pub subject_id: DigestIdentifier,
    /// Identifier of the governance; for a governance subject the `From`
    /// impl sets it to the subject's own id.
    pub governance_id: DigestIdentifier,
    /// Governance version at genesis; set to `0` when built from a
    /// governance.
    pub genesis_gov_version: u64,
    /// Hash of the previous ledger event.
    pub prev_ledger_event_hash: DigestIdentifier,
    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Namespace of the subject; an empty `Namespace::new()` when built from
    /// a governance.
    pub namespace: Namespace,
    /// Sequence number; a new ledger event is only accepted with `sn + 1`.
    pub sn: u64,
    /// Public key of the creator.
    pub creator: PublicKey,
    /// Public key of the current owner.
    pub owner: PublicKey,
    /// Pending owner: set by a transfer, cleared by confirm or reject.
    pub new_owner: Option<PublicKey>,
    /// Whether the subject still accepts events; an EOL event sets it to
    /// `false`.
    pub active: bool,
    /// Current state of the subject.
    pub properties: ValueWrapper,
}
118
119impl From<Governance> for Metadata {
120 fn from(value: Governance) -> Self {
121 Self {
122 name: value.subject_metadata.name,
123 description: value.subject_metadata.description,
124 subject_id: value.subject_metadata.subject_id.clone(),
125 governance_id: value.subject_metadata.subject_id,
126 genesis_gov_version: 0,
127 prev_ledger_event_hash: value
128 .subject_metadata
129 .prev_ledger_event_hash,
130 schema_id: value.subject_metadata.schema_id,
131 namespace: Namespace::new(),
132 sn: value.subject_metadata.sn,
133 creator: value.subject_metadata.creator,
134 owner: value.subject_metadata.owner,
135 new_owner: value.subject_metadata.new_owner,
136 active: value.subject_metadata.active,
137 properties: value.properties.to_value_wrapper(),
138 }
139 }
140}
141
142impl From<Tracker> for Metadata {
143 fn from(value: Tracker) -> Self {
144 Self {
145 name: value.subject_metadata.name,
146 description: value.subject_metadata.description,
147 subject_id: value.subject_metadata.subject_id,
148 governance_id: value.governance_id,
149 genesis_gov_version: value.genesis_gov_version,
150 prev_ledger_event_hash: value
151 .subject_metadata
152 .prev_ledger_event_hash,
153 schema_id: value.subject_metadata.schema_id,
154 namespace: value.namespace,
155 sn: value.subject_metadata.sn,
156 creator: value.subject_metadata.creator,
157 owner: value.subject_metadata.owner,
158 new_owner: value.subject_metadata.new_owner,
159 active: value.subject_metadata.active,
160 properties: value.properties,
161 }
162 }
163}
164
/// Flattened, string-based view of a ledger event used to build sink
/// messages.
#[derive(Clone)]
pub struct DataForSink {
    /// Governance id; `None` for governance subjects when produced by the
    /// replay state in this file.
    pub gov_id: Option<String>,
    /// Subject id.
    pub subject_id: String,
    /// Sequence number of the ledger event.
    pub sn: u64,
    /// Owner of the subject.
    pub owner: String,
    /// Namespace of the subject.
    pub namespace: String,
    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Signer of the event request.
    pub issuer: String,
    /// Timestamp of the event request signature.
    pub event_request_timestamp: u64,
    /// Timestamp of the ledger event signature.
    pub event_ledger_timestamp: u64,
    /// Governance version recorded in the ledger event.
    pub gov_version: u64,
    /// Event-kind specific payload (state, patch, or nothing).
    pub event_data_ledger: EventLedgerDataForSink,
}
179
/// Ownership-related state tracked while replaying a ledger for the sink.
///
/// Seeded from the creation metadata and advanced by `apply_success` for
/// every successful event.
#[derive(Clone, Debug)]
struct SinkReplayState {
    // `None` when the subject is a governance.
    governance_id: Option<String>,
    subject_id: String,
    // Current owner; replaced by `new_owner` on a confirm event.
    owner: String,
    // Pending owner set by a transfer and cleared by confirm/reject.
    new_owner: Option<String>,
    namespace: String,
    schema_id: SchemaType,
}
189
190impl SinkReplayState {
191 fn from_metadata(metadata: &Metadata) -> Self {
192 Self {
193 governance_id: if metadata.schema_id.is_gov() {
194 None
195 } else {
196 Some(metadata.governance_id.to_string())
197 },
198 subject_id: metadata.subject_id.to_string(),
199 owner: metadata.owner.to_string(),
200 new_owner: metadata.new_owner.as_ref().map(ToString::to_string),
201 namespace: metadata.namespace.to_string(),
202 schema_id: metadata.schema_id.clone(),
203 }
204 }
205
206 fn data_for_sink(
207 &self,
208 event: &SignedLedger,
209 event_data_ledger: EventLedgerDataForSink,
210 ) -> DataForSink {
211 DataForSink {
212 gov_id: self.governance_id.clone(),
213 subject_id: self.subject_id.clone(),
214 sn: event.content().sn,
215 owner: self.owner.clone(),
216 namespace: self.namespace.clone(),
217 schema_id: self.schema_id.clone(),
218 issuer: event
219 .content()
220 .event_request
221 .signature()
222 .signer
223 .to_string(),
224 event_request_timestamp: event
225 .content()
226 .event_request
227 .signature()
228 .timestamp
229 .as_nanos(),
230 event_ledger_timestamp: event.signature().timestamp.as_nanos(),
231 gov_version: event.content().gov_version,
232 event_data_ledger,
233 }
234 }
235
236 fn apply_success(
237 &mut self,
238 event: &SignedLedger,
239 ) -> Result<(), ActorError> {
240 match event.content().event_request.content() {
241 EventRequest::Create(..) | EventRequest::Fact(..) => Ok(()),
242 EventRequest::Transfer(transfer_request) => {
243 self.new_owner = Some(transfer_request.new_owner.to_string());
244 Ok(())
245 }
246 EventRequest::Confirm(..) => {
247 let Some(new_owner) = self.new_owner.take() else {
248 return Err(ActorError::Functional {
249 description:
250 "Replay confirm event without pending new owner"
251 .to_owned(),
252 });
253 };
254 self.owner = new_owner;
255 Ok(())
256 }
257 EventRequest::Reject(..) => {
258 self.new_owner = None;
259 Ok(())
260 }
261 EventRequest::EOL(..) => Ok(()),
262 }
263 }
264}
265
/// Event-kind specific payload attached to a [`DataForSink`].
#[derive(Clone)]
pub enum EventLedgerDataForSink {
    /// Creation event: the subject state.
    Create { state: Value },
    /// Fact event: the patch produced by the evaluation.
    Fact { patch: Value },
    /// Confirm event: a patch for governance confirms, `None` for tracker
    /// confirms.
    Confirm { patch: Option<Value> },
    /// Transfer, reject and EOL events carry no extra payload.
    Other,
}
273
274impl EventLedgerDataForSink {
275 pub fn build(protocols: &Protocols, state: &Value) -> Self {
276 match protocols {
277 Protocols::Create { .. } => Self::Create {
278 state: state.clone(),
279 },
280 Protocols::TrackerFact { evaluation, .. }
281 | Protocols::GovFact { evaluation, .. } => Self::Fact {
282 patch: evaluation
283 .evaluator_res()
284 .expect("event is valid")
285 .patch
286 .0,
287 },
288 Protocols::Transfer { .. }
289 | Protocols::Reject { .. }
290 | Protocols::EOL { .. } => Self::Other,
291 Protocols::TrackerConfirm { .. } => Self::Confirm { patch: None },
292 Protocols::GovConfirm { evaluation, .. } => Self::Confirm {
293 patch: Some(
294 evaluation.evaluator_res().expect("event is valid").patch.0,
295 ),
296 },
297 }
298 }
299}
300
301fn data_to_sink_event(
302 data: DataForSink,
303 event: &EventRequest,
304) -> DataToSinkEvent {
305 match (event, data.event_data_ledger) {
306 (
307 EventRequest::Create(..),
308 EventLedgerDataForSink::Create { state },
309 ) => DataToSinkEvent::Create {
310 governance_id: data.gov_id,
311 subject_id: data.subject_id,
312 owner: data.owner,
313 schema_id: data.schema_id,
314 namespace: data.namespace.to_string(),
315 sn: data.sn,
316 gov_version: data.gov_version,
317 state,
318 },
319 (
320 EventRequest::Fact(fact_request),
321 EventLedgerDataForSink::Fact { patch },
322 ) => DataToSinkEvent::Fact {
323 governance_id: data.gov_id,
324 subject_id: data.subject_id,
325 issuer: data.issuer.to_string(),
326 owner: data.owner,
327 payload: fact_request.payload.0.clone(),
328 schema_id: data.schema_id,
329 sn: data.sn,
330 gov_version: data.gov_version,
331 patch,
332 },
333 (
334 EventRequest::Transfer(transfer_request),
335 EventLedgerDataForSink::Other,
336 ) => DataToSinkEvent::Transfer {
337 governance_id: data.gov_id,
338 subject_id: data.subject_id,
339 owner: data.owner,
340 new_owner: transfer_request.new_owner.to_string(),
341 schema_id: data.schema_id,
342 sn: data.sn,
343 gov_version: data.gov_version,
344 },
345 (
346 EventRequest::Confirm(confirm_request),
347 EventLedgerDataForSink::Confirm { patch },
348 ) => DataToSinkEvent::Confirm {
349 governance_id: data.gov_id,
350 subject_id: data.subject_id,
351 schema_id: data.schema_id,
352 sn: data.sn,
353 gov_version: data.gov_version,
354 patch,
355 name_old_owner: confirm_request.name_old_owner.clone(),
356 },
357 (EventRequest::Reject(..), EventLedgerDataForSink::Other) => {
358 DataToSinkEvent::Reject {
359 governance_id: data.gov_id,
360 subject_id: data.subject_id,
361 schema_id: data.schema_id,
362 sn: data.sn,
363 gov_version: data.gov_version,
364 }
365 }
366 (EventRequest::EOL(..), EventLedgerDataForSink::Other) => {
367 DataToSinkEvent::Eol {
368 governance_id: data.gov_id,
369 subject_id: data.subject_id,
370 schema_id: data.schema_id,
371 sn: data.sn,
372 gov_version: data.gov_version,
373 }
374 }
375 _ => {
376 unreachable!(
377 "EventLedgerDataForSink is created according to protocols and protocols according to EventRequest"
378 )
379 }
380 }
381}
382
383pub fn build_data_to_sink(
384 data: DataForSink,
385 event: &EventRequest,
386 public_key: &str,
387 sink_timestamp: u64,
388) -> DataToSink {
389 DataToSink {
390 event: data_to_sink_event(data.clone(), event),
391 public_key: public_key.to_owned(),
392 event_request_timestamp: data.event_request_timestamp,
393 event_ledger_timestamp: data.event_ledger_timestamp,
394 sink_timestamp,
395 }
396}
397
398pub fn replay_sink_events(
399 ledgers: &[SignedLedger],
400 public_key: &str,
401 from_sn: u64,
402 to_sn: Option<u64>,
403 limit: u64,
404 sink_timestamp: u64,
405) -> Result<SinkEventsPage, ActorError> {
406 if limit == 0 {
407 return Err(ActorError::Functional {
408 description: "Replay limit must be greater than zero".to_owned(),
409 });
410 }
411
412 let mut replay_state: Option<SinkReplayState> = None;
413 let mut events = Vec::new();
414 let mut next_sn = None;
415 let upper_bound = to_sn.unwrap_or(u64::MAX);
416
417 for ledger in ledgers {
418 if replay_state.is_none() {
419 let metadata =
420 ledger.content().get_create_metadata().map_err(|e| {
421 ActorError::Functional {
422 description: e.to_string(),
423 }
424 })?;
425 replay_state = Some(SinkReplayState::from_metadata(&metadata));
426 }
427
428 let Some(state) = replay_state.as_mut() else {
429 unreachable!("replay state is initialized above");
430 };
431
432 let sn = ledger.content().sn;
433 if sn > upper_bound {
434 break;
435 }
436
437 let is_success = ledger.content().protocols.is_success();
438 if is_success && sn >= from_sn {
439 if events.len() as u64 >= limit {
440 next_sn = Some(sn);
441 break;
442 }
443
444 let event_data_ledger =
445 match ledger.content().event_request.content() {
446 EventRequest::Create(..) => {
447 let metadata = ledger
448 .content()
449 .get_create_metadata()
450 .map_err(|e| ActorError::Functional {
451 description: e.to_string(),
452 })?;
453 EventLedgerDataForSink::Create {
454 state: metadata.properties.0,
455 }
456 }
457 EventRequest::Fact(..)
458 | EventRequest::Transfer(..)
459 | EventRequest::Confirm(..)
460 | EventRequest::Reject(..)
461 | EventRequest::EOL(..) => EventLedgerDataForSink::build(
462 &ledger.content().protocols,
463 &Value::Null,
464 ),
465 };
466
467 let data = state.data_for_sink(ledger, event_data_ledger);
468 events.push(build_data_to_sink(
469 data,
470 ledger.content().event_request.content(),
471 public_key,
472 sink_timestamp,
473 ));
474 }
475
476 if is_success {
477 state.apply_success(ledger)?;
478 }
479 }
480
481 Ok(SinkEventsPage {
482 from_sn,
483 to_sn,
484 limit,
485 next_sn,
486 has_more: next_sn.is_some(),
487 events,
488 })
489}
490
/// Metadata shared by every subject kind, without the subject state.
#[derive(
    Default,
    Debug,
    Serialize,
    Deserialize,
    Clone,
    BorshSerialize,
    BorshDeserialize,
)]
pub struct SubjectMetadata {
    /// Optional subject name.
    pub name: Option<String>,
    /// Optional subject description.
    pub description: Option<String>,
    /// Identifier of the subject.
    pub subject_id: DigestIdentifier,

    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Public key of the current owner.
    pub owner: PublicKey,
    /// Pending owner while a transfer awaits confirm/reject.
    pub new_owner: Option<PublicKey>,

    /// Hash of the previous ledger event.
    pub prev_ledger_event_hash: DigestIdentifier,
    /// Public key of the creator.
    pub creator: PublicKey,
    /// Whether the subject is still active.
    pub active: bool,
    /// Sequence number.
    pub sn: u64,
}
522
523impl SubjectMetadata {
524 pub fn new(data: &Metadata) -> Self {
525 Self {
526 name: data.name.clone(),
527 description: data.description.clone(),
528 subject_id: data.subject_id.clone(),
529 owner: data.creator.clone(),
530 schema_id: data.schema_id.clone(),
531 new_owner: data.new_owner.clone(),
532 prev_ledger_event_hash: data.prev_ledger_event_hash.clone(),
533 creator: data.creator.clone(),
534 active: data.active,
535 sn: data.sn,
536 }
537 }
538}
539
540#[async_trait]
541pub trait Subject
542where
543 <Self as Actor>::Event: BorshSerialize + BorshDeserialize,
544 Self: PersistentActor,
545{
546 fn apply_patch_verify(
547 subject_properties: &mut ValueWrapper,
548 json_patch: ValueWrapper,
549 ) -> Result<(), SubjectError> {
550 let json_patch = serde_json::from_value::<Patch>(json_patch.0)
551 .map_err(|e| SubjectError::PatchConversionFailed {
552 details: e.to_string(),
553 })?;
554
555 patch(&mut subject_properties.0, &json_patch).map_err(|e| {
556 SubjectError::PatchApplicationFailed {
557 details: e.to_string(),
558 }
559 })?;
560
561 Ok(())
562 }
563
    /// Verifies a non-creation ledger event against the subject's current
    /// metadata.
    ///
    /// Checks, in order: the subject is active, the sequence number is
    /// `sn + 1`, the ledger signature, the signer (pending `new_owner` if
    /// any, otherwise `owner`), the event request signature, and the
    /// previous-hash link. It then simulates the event on a copy of the
    /// metadata, rebuilds the validation request and response, and checks
    /// the validators' quorum and signatures.
    ///
    /// Returns `Ok(true)` when the event's protocols report success and
    /// `Ok(false)` otherwise; the subject itself is not modified here.
    ///
    /// # Errors
    ///
    /// Returns the `SubjectError` variant matching the first failed check.
    async fn verify_new_ledger_event(
        ctx: &mut ActorContext<Self>,
        new_ledger_event: &SignedLedger,
        subject_metadata: Metadata,
        actual_ledger_event_hash: DigestIdentifier,
        last_data: LastData,
        hash: &HashAlgorithm,
    ) -> Result<bool, SubjectError> {
        if !subject_metadata.active {
            return Err(SubjectError::SubjectInactive);
        }

        if new_ledger_event.content().sn != subject_metadata.sn + 1 {
            return Err(SubjectError::InvalidSequenceNumber {
                expected: subject_metadata.sn + 1,
                actual: new_ledger_event.content().sn,
            });
        }

        if new_ledger_event.verify().is_err() {
            return Err(SubjectError::SignatureVerificationFailed {
                context: "new ledger event signature verification failed"
                    .to_string(),
            });
        }

        // While a transfer is pending, the prospective owner must sign.
        let signer = if let Some(new_owner) = &subject_metadata.new_owner {
            new_owner.clone()
        } else {
            subject_metadata.owner.clone()
        };

        if new_ledger_event.signature().signer != signer {
            return Err(SubjectError::IncorrectSigner {
                expected: signer.to_string(),
                actual: new_ledger_event.signature().signer.to_string(),
            });
        }

        if new_ledger_event.content().event_request.verify().is_err() {
            return Err(SubjectError::SignatureVerificationFailed {
                context: "event request signature verification failed"
                    .to_string(),
            });
        }

        if actual_ledger_event_hash
            != new_ledger_event.content().prev_ledger_event_hash
        {
            return Err(SubjectError::PreviousHashMismatch);
        }

        // Work on a copy: it becomes the metadata the validators should have
        // hashed if the event was applied correctly.
        let mut modified_subject_metadata = subject_metadata.clone();
        modified_subject_metadata.sn += 1;

        // The request kind, the recorded protocols and the subject kind
        // (governance or not) must be a consistent combination.
        let (validation, new_actual_protocols) = match (
            new_ledger_event.content().event_request.content(),
            &new_ledger_event.content().protocols,
            subject_metadata.schema_id.is_gov(),
        ) {
            (
                EventRequest::Fact(..),
                Protocols::TrackerFact {
                    evaluation,
                    validation,
                },
                false,
            ) => {
                // Facts are not allowed while a transfer is pending.
                if modified_subject_metadata.new_owner.is_some() {
                    return Err(SubjectError::UnexpectedFactEvent);
                }

                if let Some(eval) = evaluation.evaluator_res() {
                    Self::apply_patch_verify(
                        &mut modified_subject_metadata.properties,
                        eval.patch,
                    )?;
                }
                (
                    validation,
                    ActualProtocols::Eval {
                        eval_data: evaluation.clone(),
                    },
                )
            }
            (
                EventRequest::Fact(..),
                Protocols::GovFact {
                    evaluation,
                    approval,
                    validation,
                },
                true,
            ) => {
                if modified_subject_metadata.new_owner.is_some() {
                    return Err(SubjectError::UnexpectedFactEvent);
                }

                // An evaluator response requires an approval; the absence of
                // one forbids it.
                let actual_protocols =
                    if let Some(eval) = evaluation.evaluator_res() {
                        if let Some(appr) = approval {
                            // The patch only takes effect when approved.
                            if appr.approved {
                                Self::apply_patch_verify(
                                    &mut modified_subject_metadata.properties,
                                    eval.patch,
                                )?;
                            }

                            ActualProtocols::EvalApprove {
                                eval_data: evaluation.clone(),
                                approval_data: appr.clone(),
                            }
                        } else {
                            return Err(
                                SubjectError::MissingApprovalAfterEvaluation,
                            );
                        }
                    } else if approval.is_some() {
                        return Err(
                            SubjectError::UnexpectedApprovalAfterFailedEvaluation,
                        );
                    } else {
                        ActualProtocols::Eval {
                            eval_data: evaluation.clone(),
                        }
                    };

                (validation, actual_protocols)
            }
            (
                EventRequest::Transfer(transfer),
                Protocols::Transfer {
                    evaluation,
                    validation,
                },
                ..,
            ) => {
                if modified_subject_metadata.new_owner.is_some() {
                    return Err(SubjectError::UnexpectedTransferEvent);
                }

                // The pending owner is only recorded when the evaluation
                // produced a response.
                if let Some(eval) = evaluation.evaluator_res() {
                    Self::apply_patch_verify(
                        &mut modified_subject_metadata.properties,
                        eval.patch,
                    )?;
                    modified_subject_metadata.new_owner =
                        Some(transfer.new_owner.clone());
                }

                (
                    validation,
                    ActualProtocols::Eval {
                        eval_data: evaluation.clone(),
                    },
                )
            }
            (
                EventRequest::Confirm(..),
                Protocols::TrackerConfirm { validation },
                false,
            ) => {
                // Promote the pending owner; `take` clears `new_owner`.
                if let Some(new_owner) =
                    &modified_subject_metadata.new_owner.take()
                {
                    modified_subject_metadata.owner = new_owner.clone();
                } else {
                    return Err(SubjectError::ConfirmWithoutNewOwner);
                }

                (validation, ActualProtocols::None)
            }
            (
                EventRequest::Confirm(..),
                Protocols::GovConfirm {
                    evaluation,
                    validation,
                },
                true,
            ) => {
                // Governance confirms carry an evaluation; ownership only
                // changes when it produced a response.
                if let Some(eval) = evaluation.evaluator_res() {
                    Self::apply_patch_verify(
                        &mut modified_subject_metadata.properties,
                        eval.patch,
                    )?;

                    if let Some(new_owner) =
                        &modified_subject_metadata.new_owner.take()
                    {
                        modified_subject_metadata.owner = new_owner.clone();
                    } else {
                        return Err(SubjectError::ConfirmWithoutNewOwner);
                    }
                }

                (
                    validation,
                    ActualProtocols::Eval {
                        eval_data: evaluation.clone(),
                    },
                )
            }
            (
                EventRequest::Reject(..),
                Protocols::Reject { validation },
                ..,
            ) => {
                // Rejecting discards the pending owner; one must exist.
                if modified_subject_metadata.new_owner.take().is_none() {
                    return Err(SubjectError::RejectWithoutNewOwner);
                }

                (validation, ActualProtocols::None)
            }
            (EventRequest::EOL(..), Protocols::EOL { validation }, ..) => {
                if modified_subject_metadata.new_owner.is_some() {
                    return Err(SubjectError::UnexpectedEOLEvent);
                }

                modified_subject_metadata.active = false;
                (validation, ActualProtocols::None)
            }
            _ => {
                return Err(SubjectError::EventProtocolMismatch);
            }
        };

        // A successful governance event also bumps the governance version
        // stored inside the properties.
        if modified_subject_metadata.schema_id.is_gov()
            && new_actual_protocols.is_success()
        {
            let mut gov_data = serde_json::from_value::<GovernanceData>(
                modified_subject_metadata.properties.0,
            )
            .map_err(|e| {
                SubjectError::GovernanceDataConversionFailed {
                    details: e.to_string(),
                }
            })?;

            gov_data.version += 1;
            modified_subject_metadata.properties = gov_data.to_value_wrapper();
        }

        // Rebuild the validation request from the *pre-event* metadata and
        // check it against the recorded signature and hash.
        let validation_req = ValidationReq::Event {
            actual_protocols: Box::new(new_actual_protocols),
            event_request: new_ledger_event.content().event_request.clone(),
            ledger_hash: actual_ledger_event_hash.clone(),
            metadata: Box::new(subject_metadata.clone()),
            last_data: Box::new(last_data),
            gov_version: new_ledger_event.content().gov_version,
            sn: new_ledger_event.content().sn,
        };

        let signed_validation_req = Signed::from_parts(
            validation_req,
            validation.validation_req_signature.clone(),
        );

        if signed_validation_req.verify().is_err() {
            return Err(SubjectError::InvalidValidationRequestSignature);
        }

        let hash_signed_val_req =
            hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
                |e| SubjectError::ValidationRequestHashFailed {
                    details: e.to_string(),
                },
            )?;

        if hash_signed_val_req != validation.validation_req_hash {
            return Err(SubjectError::ValidationRequestHashMismatch);
        }

        modified_subject_metadata.prev_ledger_event_hash =
            actual_ledger_event_hash;

        let modified_metadata_hash =
            hash_borsh(&*hash.hasher(), &modified_subject_metadata).map_err(
                |e| SubjectError::ModifiedMetadataHashFailed {
                    details: e.to_string(),
                },
            )?;

        // Expected validator response; every validator signature must verify
        // over exactly this value.
        let validation_res = ValidationRes::Response {
            vali_req_hash: hash_signed_val_req,
            modified_metadata_hash,
        };

        // Validators and quorum are looked up for the governance version
        // recorded in the event.
        let role_data = get_validation_roles_register(
            ctx,
            &subject_metadata.governance_id,
            SearchRole {
                schema_id: subject_metadata.schema_id,
                namespace: subject_metadata.namespace,
            },
            new_ledger_event.content().gov_version,
        )
        .await
        .map_err(|e| SubjectError::ValidatorsRetrievalFailed {
            details: e.to_string(),
        })?;

        if !check_quorum_signers(
            &validation
                .validators_signatures
                .iter()
                .map(|x| x.signer.clone())
                .collect::<HashSet<PublicKey>>(),
            &role_data.quorum,
            &role_data.workers,
        ) {
            return Err(SubjectError::InvalidQuorum);
        }

        for signature in validation.validators_signatures.iter() {
            let signed_res =
                Signed::from_parts(validation_res.clone(), signature.clone());

            if signed_res.verify().is_err() {
                return Err(SubjectError::InvalidValidatorSignature);
            }
        }

        Ok(new_ledger_event.content().protocols.is_success())
    }
888
    /// Verifies the creation (first) ledger event of a subject.
    ///
    /// Checks the ledger signature, that the signer is the metadata owner,
    /// the event request signature, `sn == 0`, an empty previous hash, and
    /// that request and protocols are both of the `Create` kind. It then
    /// rebuilds the validation request and response, compares the validated
    /// metadata with `subject_metadata`, and checks the validators' quorum
    /// and signatures.
    ///
    /// For governance subjects the only validator is the owner with a
    /// majority quorum; for other schemas the validators are looked up from
    /// the governance at the event's governance version.
    ///
    /// # Errors
    ///
    /// Returns the `SubjectError` variant matching the first failed check.
    async fn verify_first_ledger_event(
        ctx: &mut ActorContext<Self>,
        ledger_event: &SignedLedger,
        hash: &HashAlgorithm,
        subject_metadata: Metadata,
    ) -> Result<(), SubjectError> {
        if ledger_event.verify().is_err() {
            return Err(SubjectError::SignatureVerificationFailed {
                context: "first ledger event signature verification failed"
                    .to_string(),
            });
        }

        if ledger_event.signature().signer != subject_metadata.owner {
            return Err(SubjectError::IncorrectSigner {
                expected: subject_metadata.owner.to_string(),
                actual: ledger_event.signature().signer.to_string(),
            });
        }

        if ledger_event.content().event_request.verify().is_err() {
            return Err(SubjectError::SignatureVerificationFailed {
                context: "event request signature verification failed"
                    .to_string(),
            });
        }

        if ledger_event.content().sn != 0 {
            return Err(SubjectError::InvalidCreationSequenceNumber);
        }

        if !ledger_event.content().prev_ledger_event_hash.is_empty() {
            return Err(SubjectError::NonEmptyPreviousHashInCreation);
        }

        let event_request_type = EventRequestType::from(
            ledger_event.content().event_request.content(),
        );

        // Only a Create request paired with Create protocols is acceptable.
        let validation =
            match (event_request_type, &ledger_event.content().protocols) {
                (
                    EventRequestType::Create,
                    Protocols::Create { validation },
                ) => validation,
                _ => {
                    return Err(SubjectError::EventProtocolMismatch);
                }
            };

        // Creation validations must carry the full metadata.
        let ValidationMetadata::Metadata(metadata) =
            &validation.validation_metadata
        else {
            return Err(SubjectError::InvalidValidationMetadata);
        };

        let validation_req = ValidationReq::Create {
            event_request: ledger_event.content().event_request.clone(),
            gov_version: ledger_event.content().gov_version,
            subject_id: subject_metadata.subject_id.clone(),
        };

        let signed_validation_req = Signed::from_parts(
            validation_req,
            validation.validation_req_signature.clone(),
        );

        if signed_validation_req.verify().is_err() {
            return Err(SubjectError::InvalidValidationRequestSignature);
        }

        let hash_signed_val_req =
            hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
                |e| SubjectError::ValidationRequestHashFailed {
                    details: e.to_string(),
                },
            )?;

        if hash_signed_val_req != validation.validation_req_hash {
            return Err(SubjectError::ValidationRequestHashMismatch);
        }

        // The metadata the validators signed must match the caller's.
        if metadata.deref() != &subject_metadata {
            return Err(SubjectError::MetadataMismatch);
        }

        // For governances, the properties must parse as governance data; the
        // parsed value itself is discarded.
        if metadata.schema_id == SchemaType::Governance {
            serde_json::from_value::<GovernanceData>(
                metadata.properties.0.clone(),
            )
            .map_err(|e| {
                SubjectError::GovernancePropertiesConversionFailed {
                    details: e.to_string(),
                }
            })?;
        }

        // Expected validator response; every validator signature must verify
        // over exactly this value.
        let validation_res = ValidationRes::Create {
            vali_req_hash: hash_signed_val_req,
            subject_metadata: Box::new(subject_metadata),
        };

        let role_data = match metadata.schema_id {
            // A governance being created validates itself through its owner.
            SchemaType::Governance => RoleDataRegister {
                workers: HashSet::from([metadata.owner.clone()]),
                quorum: Quorum::Majority,
            },
            SchemaType::Type(_) => get_validation_roles_register(
                ctx,
                &metadata.governance_id,
                SearchRole {
                    schema_id: metadata.schema_id.clone(),
                    namespace: metadata.namespace.clone(),
                },
                ledger_event.content().gov_version,
            )
            .await
            .map_err(|e| {
                SubjectError::ValidatorsRetrievalFailed {
                    details: e.to_string(),
                }
            })?,
            SchemaType::TrackerSchemas => {
                return Err(SubjectError::InvalidSchemaId);
            }
        };

        if !check_quorum_signers(
            &validation
                .validators_signatures
                .iter()
                .map(|x| x.signer.clone())
                .collect::<HashSet<PublicKey>>(),
            &role_data.quorum,
            &role_data.workers,
        ) {
            return Err(SubjectError::InvalidQuorum);
        }

        for signature in validation.validators_signatures.iter() {
            let signed_res =
                Signed::from_parts(validation_res.clone(), signature.clone());

            if signed_res.verify().is_err() {
                return Err(SubjectError::InvalidValidatorSignature);
            }
        }

        Ok(())
    }
1039
1040 async fn register(
1041 ctx: &mut ActorContext<Self>,
1042 message: RegisterMessage,
1043 ) -> Result<(), ActorError> {
1044 let register_path = ActorPath::from("/user/node/register");
1045 match ctx.system().get_actor::<Register>(®ister_path).await {
1046 Ok(register) => {
1047 register.tell(message.clone()).await?;
1048
1049 debug!(
1050 message = ?message,
1051 "Register message sent successfully"
1052 );
1053 }
1054 Err(e) => {
1055 error!(
1056 path = %register_path,
1057 "Register actor not found"
1058 );
1059 return Err(e);
1060 }
1061 };
1062
1063 Ok(())
1064 }
1065
1066 async fn event_to_sink(
1067 ctx: &mut ActorContext<Self>,
1068 data: DataForSink,
1069 event: &EventRequest,
1070 ) -> Result<(), ActorError> {
1071 let msg = SinkDataMessage::Event {
1072 event: Box::new(data_to_sink_event(data.clone(), event)),
1073 event_request_timestamp: data.event_request_timestamp,
1074 event_ledger_timestamp: data.event_ledger_timestamp,
1075 };
1076
1077 Self::publish_sink(ctx, msg).await
1078 }
1079
1080 async fn publish_sink(
1081 ctx: &mut ActorContext<Self>,
1082 message: SinkDataMessage,
1083 ) -> Result<(), ActorError> {
1084 let sink_data = ctx.get_child::<SinkData>("sink_data").await?;
1085 let (subject_id, schema_id) = message.get_subject_schema();
1086
1087 sink_data.tell(message).await?;
1088 debug!(
1089 subject_id = %subject_id,
1090 schema_id = %schema_id,
1091 "Message published to sink successfully"
1092 );
1093
1094 Ok(())
1095 }
1096
1097 async fn get_ledger(
1098 &self,
1099 ctx: &mut ActorContext<Self>,
1100 lo_sn: Option<u64>,
1101 hi_sn: u64,
1102 ) -> Result<(Vec<<Self as Actor>::Event>, bool), ActorError> {
1103 if let Some(lo_sn) = lo_sn {
1104 let actual_sn = lo_sn + 1;
1105 if (hi_sn - actual_sn) > 99 {
1106 Ok((get_n_events(ctx, actual_sn, 99).await?, false))
1107 } else {
1108 Ok((
1109 get_n_events(ctx, actual_sn, hi_sn - actual_sn).await?,
1110 true,
1111 ))
1112 }
1113 } else if hi_sn > 99 {
1114 Ok((get_n_events(ctx, 0, 99).await?, false))
1115 } else {
1116 Ok((get_n_events(ctx, 0, hi_sn).await?, true))
1117 }
1118 }
1119
    // Required methods: each implementor supplies these. Their bodies are
    // not visible here, so the notes below are assumptions drawn from the
    // signatures only.

    /// NOTE(review): presumably propagates the subject's current sequence
    /// number; confirm against implementors.
    async fn update_sn(
        &self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<(), ActorError>;

    /// NOTE(review): presumably handles the side effects of a reject event
    /// at the given governance version; confirm against implementors.
    async fn reject(
        &self,
        ctx: &mut ActorContext<Self>,
        gov_version: u64,
    ) -> Result<(), ActorError>;

    /// NOTE(review): presumably handles the side effects of a confirm event
    /// for `new_owner`; confirm against implementors.
    async fn confirm(
        &self,
        ctx: &mut ActorContext<Self>,
        new_owner: PublicKey,
        gov_version: u64,
    ) -> Result<(), ActorError>;

    /// NOTE(review): presumably handles the side effects of a transfer event
    /// towards `new_owner`; confirm against implementors.
    async fn transfer(
        &self,
        ctx: &mut ActorContext<Self>,
        new_owner: PublicKey,
        gov_version: u64,
    ) -> Result<(), ActorError>;

    /// NOTE(review): presumably handles the side effects of an EOL event;
    /// confirm against implementors.
    async fn eol(&self, ctx: &mut ActorContext<Self>)
    -> Result<(), ActorError>;

    /// Applies a JSON patch to the implementor's own state.
    /// NOTE(review): exact target of the patch depends on the implementor.
    fn apply_patch(
        &mut self,
        json_patch: ValueWrapper,
    ) -> Result<(), ActorError>;

    /// NOTE(review): presumably verifies and applies a batch of incoming
    /// ledger events; confirm against implementors.
    async fn manager_new_ledger_events(
        &mut self,
        ctx: &mut ActorContext<Self>,
        events: Vec<SignedLedger>,
    ) -> Result<(), ActorError>;

    /// Returns the last stored ledger event, or `None` per the signature
    /// when there is none.
    async fn get_last_ledger(
        &self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<Option<SignedLedger>, ActorError>;
1163}