1use std::{
5 collections::{BTreeSet, HashSet},
6 ops::Deref,
7};
8
9use crate::{
10 governance::{
11 Governance,
12 data::GovernanceData,
13 model::Quorum,
14 role_register::{RoleDataRegister, SearchRole},
15 },
16 model::{
17 common::{
18 check_quorum_signers, get_n_events, get_validation_roles_register,
19 },
20 event::{Ledger, LedgerSeal, Protocols, ValidationMetadata},
21 },
22 node::register::{Register, RegisterMessage},
23 tracker::Tracker,
24 validation::{
25 request::{ActualProtocols, LastData, ValidationReq},
26 response::ValidationRes,
27 },
28};
29
30use error::SubjectError;
31
32use ave_actors::{
33 Actor, ActorContext, ActorError, ActorPath, Event, PersistentActor,
34};
35use ave_common::{
36 DataToSink, DataToSinkEvent, Namespace, SchemaType, ValueWrapper,
37 identity::{
38 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
39 },
40 request::EventRequest,
41 response::{
42 SinkEventsPage, SubjectDB, TrackerEventVisibilityDB,
43 TrackerEventVisibilityRangeDB, TrackerStoredVisibilityDB,
44 TrackerStoredVisibilityRangeDB, TrackerVisibilityModeDB,
45 TrackerVisibilityStateDB,
46 },
47};
48
49use async_trait::async_trait;
50use borsh::{BorshDeserialize, BorshSerialize};
51use json_patch::{Patch, patch};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use sinkdata::{SinkData, SinkDataMessage};
55use tracing::{debug, error};
56
57pub mod error;
58pub mod sinkdata;
59
// Marker impl: lets `Ledger` be used wherever an `ave_actors::Event` is required.
impl Event for Ledger {}
61
/// Identification data of a subject attached to a request.
///
/// NOTE(review): the consumers of this type are not visible in this part of
/// the file; field descriptions below are derived from the field names and
/// types only.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub struct RequestSubjectData {
    /// Identifier of the subject.
    pub subject_id: DigestIdentifier,
    /// Identifier of the governance the subject belongs to.
    pub governance_id: DigestIdentifier,
    /// Namespace of the subject.
    pub namespace: Namespace,
    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Sequence number.
    pub sn: u64,
    /// Governance version.
    pub gov_version: u64,
    /// Public key of the signer.
    pub signer: PublicKey,
}
74
/// Argument bundle consumed by `Subject::verify_new_ledger_event`.
pub struct VerifyNewLedgerEvent<'a> {
    /// Candidate ledger event to verify.
    pub new_ledger_event: &'a Ledger,
    /// Subject state before the candidate event is applied.
    pub subject_metadata: Metadata,
    /// Hash that the candidate's `prev_ledger_event_hash` must be equal to.
    pub actual_ledger_event_hash: DigestIdentifier,
    /// NOTE(review): its use is outside the visible part of the verification
    /// routine; presumably data about the last validated event — confirm.
    pub last_data: LastData,
    /// Hash algorithm used to hash the event protocols and the viewpoints.
    pub hash: &'a HashAlgorithm,
    /// When `true`, the patch of a successful full tracker fact is applied to
    /// the subject properties during verification.
    pub full_view: bool,
    /// When `true`, opaque tracker fact events are rejected.
    pub only_clear_events: bool,
}
84
/// Full description of a subject: identity, ownership, ledger position and
/// current properties.
///
/// The type derives Borsh (de)serialization, so the field order is part of
/// its serialized form.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshSerialize,
    BorshDeserialize,
    PartialEq,
    Eq,
    Hash,
)]
pub struct Metadata {
    /// Optional name of the subject.
    pub name: Option<String>,
    /// Optional description of the subject.
    pub description: Option<String>,
    /// Identifier of the subject.
    pub subject_id: DigestIdentifier,
    /// Identifier of the governance; for a governance subject it is its own
    /// `subject_id`.
    pub governance_id: DigestIdentifier,
    /// `0` for governance subjects.
    /// NOTE(review): presumably the governance version in force when the
    /// subject was created — confirm.
    pub genesis_gov_version: u64,
    /// Hash of a previous ledger event; may be empty.
    /// NOTE(review): exactly which event it refers to is not visible here.
    pub prev_ledger_event_hash: DigestIdentifier,
    /// Schema of the subject; `is_gov()` tells governances from trackers.
    pub schema_id: SchemaType,
    /// Namespace of the subject (`Namespace::new()` for governances).
    pub namespace: Namespace,
    /// Sequence number of the last applied event; the next ledger event must
    /// carry `sn + 1`.
    pub sn: u64,
    /// Public key of the creator of the subject.
    pub creator: PublicKey,
    /// Current owner; expected ledger signer while no transfer is pending.
    pub owner: PublicKey,
    /// Pending owner of an ongoing transfer; expected ledger signer while set.
    pub new_owner: Option<PublicKey>,
    /// Inactive subjects do not accept new ledger events.
    pub active: bool,
    /// Current properties (state) of the subject.
    pub properties: ValueWrapper,
}
122
/// Same fields as [`Metadata`] except `properties`.
///
/// The type derives Borsh (de)serialization, so the field order is part of
/// its serialized form.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshSerialize,
    BorshDeserialize,
    PartialEq,
    Eq,
    Hash,
)]
pub struct MetadataWithoutProperties {
    /// Optional name of the subject.
    pub name: Option<String>,
    /// Optional description of the subject.
    pub description: Option<String>,
    /// Identifier of the subject.
    pub subject_id: DigestIdentifier,
    /// Identifier of the governance.
    pub governance_id: DigestIdentifier,
    /// See `Metadata::genesis_gov_version`.
    pub genesis_gov_version: u64,
    /// See `Metadata::prev_ledger_event_hash`.
    pub prev_ledger_event_hash: DigestIdentifier,
    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Namespace of the subject.
    pub namespace: Namespace,
    /// Sequence number of the last applied event.
    pub sn: u64,
    /// Public key of the creator of the subject.
    pub creator: PublicKey,
    /// Current owner.
    pub owner: PublicKey,
    /// Pending owner of an ongoing transfer, if any.
    pub new_owner: Option<PublicKey>,
    /// Whether the subject is still active.
    pub active: bool,
}
149
150impl From<Metadata> for MetadataWithoutProperties {
151 fn from(value: Metadata) -> Self {
152 Self {
153 name: value.name,
154 description: value.description,
155 subject_id: value.subject_id,
156 governance_id: value.governance_id,
157 genesis_gov_version: value.genesis_gov_version,
158 prev_ledger_event_hash: value.prev_ledger_event_hash,
159 schema_id: value.schema_id,
160 namespace: value.namespace,
161 sn: value.sn,
162 creator: value.creator,
163 owner: value.owner,
164 new_owner: value.new_owner,
165 active: value.active,
166 }
167 }
168}
169
170impl From<Governance> for Metadata {
171 fn from(value: Governance) -> Self {
172 Self {
173 name: value.subject_metadata.name,
174 description: value.subject_metadata.description,
175 subject_id: value.subject_metadata.subject_id.clone(),
176 governance_id: value.subject_metadata.subject_id,
177 genesis_gov_version: 0,
178 prev_ledger_event_hash: value
179 .subject_metadata
180 .prev_ledger_event_hash,
181 schema_id: value.subject_metadata.schema_id,
182 namespace: Namespace::new(),
183 sn: value.subject_metadata.sn,
184 creator: value.subject_metadata.creator,
185 owner: value.subject_metadata.owner,
186 new_owner: value.subject_metadata.new_owner,
187 active: value.subject_metadata.active,
188 properties: value.properties.to_value_wrapper(),
189 }
190 }
191}
192
193impl From<Tracker> for Metadata {
194 fn from(value: Tracker) -> Self {
195 Self {
196 name: value.subject_metadata.name,
197 description: value.subject_metadata.description,
198 subject_id: value.subject_metadata.subject_id,
199 governance_id: value.governance_id,
200 genesis_gov_version: value.genesis_gov_version,
201 prev_ledger_event_hash: value
202 .subject_metadata
203 .prev_ledger_event_hash,
204 schema_id: value.subject_metadata.schema_id,
205 namespace: value.namespace,
206 sn: value.subject_metadata.sn,
207 creator: value.subject_metadata.creator,
208 owner: value.subject_metadata.owner,
209 new_owner: value.subject_metadata.new_owner,
210 active: value.subject_metadata.active,
211 properties: value.properties,
212 }
213 }
214}
215
216impl From<Governance> for SubjectDB {
217 fn from(value: Governance) -> Self {
218 Self {
219 name: value.subject_metadata.name,
220 description: value.subject_metadata.description,
221 subject_id: value.subject_metadata.subject_id.to_string(),
222 governance_id: value.subject_metadata.subject_id.to_string(),
223 genesis_gov_version: 0,
224 prev_ledger_event_hash: if value
225 .subject_metadata
226 .prev_ledger_event_hash
227 .is_empty()
228 {
229 None
230 } else {
231 Some(value.subject_metadata.prev_ledger_event_hash.to_string())
232 },
233 schema_id: value.subject_metadata.schema_id.to_string(),
234 namespace: Namespace::new().to_string(),
235 sn: value.subject_metadata.sn,
236 creator: value.subject_metadata.creator.to_string(),
237 owner: value.subject_metadata.owner.to_string(),
238 new_owner: value
239 .subject_metadata
240 .new_owner
241 .map(|owner| owner.to_string()),
242 active: value.subject_metadata.active,
243 tracker_visibility: None,
244 properties: value.properties.to_value_wrapper().0,
245 }
246 }
247}
248
249impl From<crate::model::common::TrackerVisibilityMode>
250 for TrackerVisibilityModeDB
251{
252 fn from(value: crate::model::common::TrackerVisibilityMode) -> Self {
253 match value {
254 crate::model::common::TrackerVisibilityMode::Full => Self::Full,
255 crate::model::common::TrackerVisibilityMode::Opaque => Self::Opaque,
256 }
257 }
258}
259
260impl From<crate::model::common::TrackerStoredVisibility>
261 for TrackerStoredVisibilityDB
262{
263 fn from(value: crate::model::common::TrackerStoredVisibility) -> Self {
264 match value {
265 crate::model::common::TrackerStoredVisibility::Full => Self::Full,
266 crate::model::common::TrackerStoredVisibility::Only(viewpoints) => {
267 Self::Only {
268 viewpoints: viewpoints.into_iter().collect(),
269 }
270 }
271 crate::model::common::TrackerStoredVisibility::None => Self::None,
272 }
273 }
274}
275
276impl From<crate::model::common::TrackerStoredVisibilityRange>
277 for TrackerStoredVisibilityRangeDB
278{
279 fn from(value: crate::model::common::TrackerStoredVisibilityRange) -> Self {
280 Self {
281 from_sn: value.from_sn,
282 to_sn: value.to_sn,
283 visibility: value.visibility.into(),
284 }
285 }
286}
287
288impl From<crate::model::common::TrackerEventVisibility>
289 for TrackerEventVisibilityDB
290{
291 fn from(value: crate::model::common::TrackerEventVisibility) -> Self {
292 match value {
293 crate::model::common::TrackerEventVisibility::NonFact => {
294 Self::NonFact
295 }
296 crate::model::common::TrackerEventVisibility::Fact(viewpoints) => {
297 Self::Fact {
298 viewpoints: viewpoints.into_iter().collect(),
299 }
300 }
301 }
302 }
303}
304
305impl From<crate::model::common::TrackerEventVisibilityRange>
306 for TrackerEventVisibilityRangeDB
307{
308 fn from(value: crate::model::common::TrackerEventVisibilityRange) -> Self {
309 Self {
310 from_sn: value.from_sn,
311 to_sn: value.to_sn,
312 visibility: value.visibility.into(),
313 }
314 }
315}
316
317impl From<crate::model::common::TrackerVisibilityState>
318 for TrackerVisibilityStateDB
319{
320 fn from(value: crate::model::common::TrackerVisibilityState) -> Self {
321 Self {
322 mode: value.mode.into(),
323 stored_ranges: value
324 .stored_ranges
325 .into_iter()
326 .map(Into::into)
327 .collect(),
328 event_ranges: value
329 .event_ranges
330 .into_iter()
331 .map(Into::into)
332 .collect(),
333 }
334 }
335}
336
/// Flat, mostly stringified view of a ledger event and its subject, used as
/// input to build the payload delivered to sinks (see `build_data_to_sink`).
#[derive(Clone)]
pub struct DataForSink {
    /// Governance identifier; `None` when the subject itself is a governance.
    pub gov_id: Option<String>,
    /// Identifier of the subject.
    pub subject_id: String,
    /// Sequence number of the event.
    pub sn: u64,
    /// Owner of the subject.
    pub owner: String,
    /// Namespace of the subject.
    pub namespace: String,
    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Issuer of the event request.
    pub issuer: String,
    /// Timestamp of the event request.
    /// NOTE(review): unit not visible here — presumably nanoseconds like
    /// `event_ledger_timestamp`; confirm.
    pub event_request_timestamp: u64,
    /// Timestamp of the ledger seal signature (replay fills it from
    /// `as_nanos()`).
    pub event_ledger_timestamp: u64,
    /// Governance version of the event.
    pub gov_version: u64,
    /// Event-kind specific data.
    pub event_data_ledger: EventLedgerDataForSink,
}
351
/// Subject state tracked while replaying ledger events towards a sink.
///
/// Only ownership (`owner` / `new_owner`) changes during a replay; the other
/// fields are fixed when the state is built from the creation metadata.
#[derive(Clone, Debug)]
struct SinkReplayState {
    /// Governance identifier; `None` when the subject is a governance.
    governance_id: Option<String>,
    /// Identifier of the replayed subject.
    subject_id: String,
    /// Owner at the current point of the replay.
    owner: String,
    /// Pending owner of an ongoing transfer, if any.
    new_owner: Option<String>,
    /// Namespace of the subject.
    namespace: String,
    /// Schema of the subject.
    schema_id: SchemaType,
}
361
362impl SinkReplayState {
363 fn from_metadata(metadata: &Metadata) -> Self {
364 Self {
365 governance_id: if metadata.schema_id.is_gov() {
366 None
367 } else {
368 Some(metadata.governance_id.to_string())
369 },
370 subject_id: metadata.subject_id.to_string(),
371 owner: metadata.owner.to_string(),
372 new_owner: metadata.new_owner.as_ref().map(ToString::to_string),
373 namespace: metadata.namespace.to_string(),
374 schema_id: metadata.schema_id.clone(),
375 }
376 }
377
378 fn data_for_sink(
379 &self,
380 event: &Ledger,
381 event_data_ledger: EventLedgerDataForSink,
382 ) -> DataForSink {
383 let (issuer, event_request_timestamp) =
384 event.get_issuer_event_request_timestamp();
385
386 DataForSink {
387 gov_id: self.governance_id.clone(),
388 subject_id: self.subject_id.clone(),
389 sn: event.sn,
390 owner: self.owner.clone(),
391 namespace: self.namespace.clone(),
392 schema_id: self.schema_id.clone(),
393 issuer,
394 event_request_timestamp,
395 event_ledger_timestamp: event
396 .ledger_seal_signature
397 .timestamp
398 .as_nanos(),
399 gov_version: event.gov_version,
400 event_data_ledger,
401 }
402 }
403
404 fn build_replay_data_to_sink(
405 &self,
406 ledger: &Ledger,
407 public_key: &str,
408 sink_timestamp: u64,
409 ) -> Result<DataToSink, ActorError> {
410 let replay_parts = SinkReplayEventParts::from_ledger(ledger)?;
411 let data = self.data_for_sink(ledger, replay_parts.event_data_ledger);
412
413 Ok(build_data_to_sink(
414 data,
415 replay_parts.event_request,
416 public_key,
417 sink_timestamp,
418 ))
419 }
420
421 fn apply_success(
422 &mut self,
423 protocols: &Protocols,
424 ) -> Result<(), ActorError> {
425 match protocols {
426 Protocols::Create { .. }
427 | Protocols::TrackerFactFull { .. }
428 | Protocols::TrackerFactOpaque { .. }
429 | Protocols::GovFact { .. }
430 | Protocols::EOL { .. } => Ok(()),
431 Protocols::Transfer { event_request, .. } => {
432 let EventRequest::Transfer(transfer_request) =
433 event_request.content()
434 else {
435 error!(
436 subject_id = %self.subject_id,
437 actual_request = ?event_request.content(),
438 "Unexpected event request type while replaying transfer event"
439 );
440 return Err(ActorError::Functional {
441 description:
442 "Replay transfer event must carry a Transfer request"
443 .to_owned(),
444 });
445 };
446 self.new_owner = Some(transfer_request.new_owner.to_string());
447 Ok(())
448 }
449 Protocols::TrackerConfirm { .. } | Protocols::GovConfirm { .. } => {
450 let Some(new_owner) = self.new_owner.take() else {
451 error!(
452 subject_id = %self.subject_id,
453 "Replay confirm event without pending new owner"
454 );
455 return Err(ActorError::Functional {
456 description:
457 "Replay confirm event without pending new owner"
458 .to_owned(),
459 });
460 };
461 self.owner = new_owner;
462 Ok(())
463 }
464 Protocols::Reject { .. } => {
465 self.new_owner = None;
466 Ok(())
467 }
468 }
469 }
470}
471
/// Pieces extracted from a ledger event that are needed to build its sink
/// message during a replay.
struct SinkReplayEventParts {
    /// Original event request; `None` for opaque tracker facts.
    event_request: Option<EventRequest>,
    /// Event-kind specific data derived from the ledger protocols.
    event_data_ledger: EventLedgerDataForSink,
}
476
477impl SinkReplayEventParts {
478 fn from_ledger(ledger: &Ledger) -> Result<Self, ActorError> {
479 match &ledger.protocols {
480 Protocols::Create { event_request, .. } => {
481 let metadata = ledger.get_create_metadata().map_err(|e| {
482 ActorError::Functional {
483 description: e.to_string(),
484 }
485 })?;
486
487 Ok(Self {
488 event_request: Some(event_request.content().clone()),
489 event_data_ledger: EventLedgerDataForSink::Create {
490 state: metadata.properties.0,
491 },
492 })
493 }
494 Protocols::TrackerFactFull { event_request, .. }
495 | Protocols::GovFact { event_request, .. }
496 | Protocols::Transfer { event_request, .. }
497 | Protocols::TrackerConfirm { event_request, .. }
498 | Protocols::GovConfirm { event_request, .. }
499 | Protocols::Reject { event_request, .. }
500 | Protocols::EOL { event_request, .. } => Ok(Self {
501 event_request: Some(event_request.content().clone()),
502 event_data_ledger: EventLedgerDataForSink::build(
503 &ledger.protocols,
504 &Value::Null,
505 ),
506 }),
507 Protocols::TrackerFactOpaque { .. } => Ok(Self {
508 event_request: None,
509 event_data_ledger: EventLedgerDataForSink::build(
510 &ledger.protocols,
511 &Value::Null,
512 ),
513 }),
514 }
515 }
516}
517
/// Event-kind specific data of a ledger event, in the shape needed to build
/// the sink payload. See [`EventLedgerDataForSink::build`] for how each
/// variant is derived from the ledger protocols.
#[derive(Clone)]
pub enum EventLedgerDataForSink {
    /// Subject creation.
    Create {
        /// Initial state of the subject.
        state: Value,
    },
    /// Fact whose content is visible (full tracker fact or governance fact).
    FactFull {
        /// Resulting patch; only present when the event succeeded.
        patch: Option<Value>,
        /// Whether the event succeeded.
        success: bool,
        /// Evaluation error, if the evaluation failed.
        error: Option<String>,
    },
    /// Opaque tracker fact: only its viewpoints and outcome are exposed.
    FactOpaque {
        /// Viewpoints of the evaluation.
        viewpoints: Vec<String>,
        /// Whether the event succeeded.
        success: bool,
    },
    /// Ownership transfer.
    Transfer {
        /// Whether the event succeeded.
        success: bool,
        /// Evaluation error, if the evaluation failed.
        error: Option<String>,
    },
    /// Transfer confirmation.
    Confirm {
        /// Resulting patch; always `None` for tracker confirmations.
        patch: Option<Value>,
        /// Whether the event succeeded; always `true` for tracker
        /// confirmations.
        success: bool,
        /// Evaluation error, if the evaluation failed.
        error: Option<String>,
    },
    /// Transfer rejection.
    Reject,
    /// End of life of the subject.
    Eol,
}
544
545impl EventLedgerDataForSink {
546 pub fn build(protocols: &Protocols, state: &Value) -> Self {
547 match protocols {
548 Protocols::Create { .. } => Self::Create {
549 state: state.clone(),
550 },
551 Protocols::TrackerFactFull { evaluation, .. }
552 | Protocols::GovFact { evaluation, .. } => {
553 let success = protocols.is_success();
554 let (patch, error) = match &evaluation.response {
555 crate::model::event::EvaluationResponse::Ok {
556 result,
557 ..
558 } if success => (Some(result.patch.0.clone()), None),
559 crate::model::event::EvaluationResponse::Ok { .. } => {
560 (None, None)
561 }
562 crate::model::event::EvaluationResponse::Error {
563 result,
564 ..
565 } => (None, Some(result.to_string())),
566 };
567
568 Self::FactFull {
569 patch,
570 success,
571 error,
572 }
573 }
574 Protocols::TrackerFactOpaque { evaluation, .. } => {
575 Self::FactOpaque {
576 viewpoints: evaluation.viewpoints.iter().cloned().collect(),
577 success: protocols.is_success(),
578 }
579 }
580 Protocols::Transfer { evaluation, .. } => {
581 let success = protocols.is_success();
582 let error = match &evaluation.response {
583 crate::model::event::EvaluationResponse::Error {
584 result,
585 ..
586 } => Some(result.to_string()),
587 crate::model::event::EvaluationResponse::Ok { .. } => None,
588 };
589 Self::Transfer { success, error }
590 }
591 Protocols::Reject { .. } => Self::Reject,
592 Protocols::EOL { .. } => Self::Eol,
593 Protocols::TrackerConfirm { .. } => Self::Confirm {
594 patch: None,
595 success: true,
596 error: None,
597 },
598 Protocols::GovConfirm { evaluation, .. } => {
599 let success = protocols.is_success();
600 let (patch, error) = match &evaluation.response {
601 crate::model::event::EvaluationResponse::Ok {
602 result,
603 ..
604 } if success => (Some(result.patch.0.clone()), None),
605 crate::model::event::EvaluationResponse::Ok { .. } => {
606 (None, None)
607 }
608 crate::model::event::EvaluationResponse::Error {
609 result,
610 ..
611 } => (None, Some(result.to_string())),
612 };
613
614 Self::Confirm {
615 patch,
616 success,
617 error,
618 }
619 }
620 }
621 }
622}
623
624fn data_to_sink_event(
625 data: DataForSink,
626 event: Option<EventRequest>,
627) -> DataToSinkEvent {
628 match (event, data.event_data_ledger) {
629 (
630 Some(EventRequest::Create(..)),
631 EventLedgerDataForSink::Create { state },
632 ) => DataToSinkEvent::Create {
633 governance_id: data.gov_id,
634 subject_id: data.subject_id,
635 owner: data.owner,
636 schema_id: data.schema_id,
637 namespace: data.namespace.to_string(),
638 sn: data.sn,
639 gov_version: data.gov_version,
640 state,
641 },
642 (
643 Some(EventRequest::Fact(fact_request)),
644 EventLedgerDataForSink::FactFull {
645 patch,
646 success,
647 error,
648 },
649 ) => DataToSinkEvent::FactFull {
650 governance_id: data.gov_id,
651 subject_id: data.subject_id,
652 issuer: data.issuer.to_string(),
653 viewpoints: fact_request.viewpoints.iter().cloned().collect(),
654 owner: data.owner,
655 payload: success.then_some(fact_request.payload.0),
656 schema_id: data.schema_id,
657 sn: data.sn,
658 gov_version: data.gov_version,
659 patch,
660 success,
661 error,
662 },
663 (
664 None,
665 EventLedgerDataForSink::FactOpaque {
666 viewpoints,
667 success,
668 },
669 ) => DataToSinkEvent::FactOpaque {
670 governance_id: data.gov_id,
671 subject_id: data.subject_id,
672 viewpoints,
673 owner: data.owner,
674 schema_id: data.schema_id,
675 sn: data.sn,
676 gov_version: data.gov_version,
677 success,
678 },
679 (
680 Some(EventRequest::Transfer(transfer_request)),
681 EventLedgerDataForSink::Transfer { success, error },
682 ) => DataToSinkEvent::Transfer {
683 governance_id: data.gov_id,
684 subject_id: data.subject_id,
685 owner: data.owner,
686 new_owner: transfer_request.new_owner.to_string(),
687 schema_id: data.schema_id,
688 sn: data.sn,
689 gov_version: data.gov_version,
690 success,
691 error,
692 },
693 (
694 Some(EventRequest::Confirm(confirm_request)),
695 EventLedgerDataForSink::Confirm {
696 patch,
697 success,
698 error,
699 },
700 ) => DataToSinkEvent::Confirm {
701 governance_id: data.gov_id,
702 subject_id: data.subject_id,
703 schema_id: data.schema_id,
704 sn: data.sn,
705 gov_version: data.gov_version,
706 patch,
707 success,
708 error,
709 name_old_owner: confirm_request.name_old_owner,
710 },
711 (Some(EventRequest::Reject(..)), EventLedgerDataForSink::Reject) => {
712 DataToSinkEvent::Reject {
713 governance_id: data.gov_id,
714 subject_id: data.subject_id,
715 schema_id: data.schema_id,
716 sn: data.sn,
717 gov_version: data.gov_version,
718 }
719 }
720 (Some(EventRequest::EOL(..)), EventLedgerDataForSink::Eol) => {
721 DataToSinkEvent::Eol {
722 governance_id: data.gov_id,
723 subject_id: data.subject_id,
724 schema_id: data.schema_id,
725 sn: data.sn,
726 gov_version: data.gov_version,
727 }
728 }
729 _ => {
730 unreachable!(
731 "EventLedgerDataForSink is created according to protocols and protocols according to EventRequest"
732 )
733 }
734 }
735}
736
737pub fn build_data_to_sink(
738 data: DataForSink,
739 event: Option<EventRequest>,
740 public_key: &str,
741 sink_timestamp: u64,
742) -> DataToSink {
743 DataToSink {
744 payload: data_to_sink_event(data.clone(), event),
745 public_key: public_key.to_owned(),
746 event_request_timestamp: data.event_request_timestamp,
747 event_ledger_timestamp: data.event_ledger_timestamp,
748 sink_timestamp,
749 }
750}
751
752pub fn replay_sink_events(
753 ledgers: &[Ledger],
754 public_key: &str,
755 from_sn: u64,
756 to_sn: Option<u64>,
757 limit: u64,
758 sink_timestamp: u64,
759) -> Result<SinkEventsPage, ActorError> {
760 if limit == 0 {
761 return Err(ActorError::Functional {
762 description: "Replay limit must be greater than zero".to_owned(),
763 });
764 }
765
766 let mut replay_state: Option<SinkReplayState> = None;
767 let mut events = Vec::new();
768 let mut next_sn = None;
769 let upper_bound = to_sn.unwrap_or(u64::MAX);
770
771 for ledger in ledgers {
772 if replay_state.is_none() {
773 let metadata = ledger.get_create_metadata().map_err(|e| {
774 ActorError::Functional {
775 description: e.to_string(),
776 }
777 })?;
778 replay_state = Some(SinkReplayState::from_metadata(&metadata));
779 }
780
781 let Some(state) = replay_state.as_mut() else {
782 unreachable!("replay state is initialized above");
783 };
784
785 let sn = ledger.sn;
786 if sn > upper_bound {
787 break;
788 }
789
790 let is_success = ledger.protocols.is_success();
791 if sn >= from_sn {
792 if events.len() as u64 >= limit {
793 next_sn = Some(sn);
794 break;
795 }
796
797 events.push(state.build_replay_data_to_sink(
798 ledger,
799 public_key,
800 sink_timestamp,
801 )?);
802 }
803
804 if is_success {
805 state.apply_success(&ledger.protocols)?;
806 }
807 }
808
809 Ok(SinkEventsPage {
810 from_sn,
811 to_sn,
812 limit,
813 next_sn,
814 has_more: next_sn.is_some(),
815 events,
816 })
817}
818
/// Metadata shared by governance and tracker subjects: identity, ownership
/// and ledger position.
///
/// The type derives Borsh (de)serialization, so the field order is part of
/// its serialized form.
#[derive(
    Default,
    Debug,
    Serialize,
    Deserialize,
    Clone,
    BorshSerialize,
    BorshDeserialize,
)]
pub struct SubjectMetadata {
    /// Optional name of the subject.
    pub name: Option<String>,
    /// Optional description of the subject.
    pub description: Option<String>,
    /// Identifier of the subject.
    pub subject_id: DigestIdentifier,

    /// Schema of the subject.
    pub schema_id: SchemaType,
    /// Current owner.
    pub owner: PublicKey,
    /// Pending owner of an ongoing transfer, if any.
    pub new_owner: Option<PublicKey>,

    /// Hash of a previous ledger event; may be empty.
    /// NOTE(review): exactly which event it refers to is not visible here.
    pub prev_ledger_event_hash: DigestIdentifier,
    /// Public key of the creator of the subject.
    pub creator: PublicKey,
    /// Whether the subject is still active.
    pub active: bool,
    /// Sequence number of the last applied event.
    pub sn: u64,
}
850
851impl SubjectMetadata {
852 pub fn new(data: &Metadata) -> Self {
853 Self {
854 name: data.name.clone(),
855 description: data.description.clone(),
856 subject_id: data.subject_id.clone(),
857 owner: data.creator.clone(),
858 schema_id: data.schema_id.clone(),
859 new_owner: data.new_owner.clone(),
860 prev_ledger_event_hash: data.prev_ledger_event_hash.clone(),
861 creator: data.creator.clone(),
862 active: data.active,
863 sn: data.sn,
864 }
865 }
866}
867
868#[async_trait]
869pub trait Subject
870where
871 <Self as Actor>::Event: BorshSerialize + BorshDeserialize,
872 Self: PersistentActor,
873{
    /// Bundles the arguments of `verify_new_ledger_event` into a
    /// [`VerifyNewLedgerEvent`]; every parameter is stored unchanged in the
    /// field of the same name.
    fn verify_new_ledger_event_args<'a>(
        new_ledger_event: &'a Ledger,
        subject_metadata: Metadata,
        actual_ledger_event_hash: DigestIdentifier,
        last_data: LastData,
        hash: &'a HashAlgorithm,
        full_view: bool,
        only_clear_events: bool,
    ) -> VerifyNewLedgerEvent<'a> {
        VerifyNewLedgerEvent {
            new_ledger_event,
            subject_metadata,
            actual_ledger_event_hash,
            last_data,
            hash,
            full_view,
            only_clear_events,
        }
    }
893
894 fn hash_viewpoints(
895 hash: &HashAlgorithm,
896 viewpoints: &BTreeSet<String>,
897 ) -> Result<DigestIdentifier, SubjectError> {
898 hash_borsh(&*hash.hasher(), viewpoints).map_err(|e| {
899 SubjectError::HashCreationFailed {
900 details: e.to_string(),
901 }
902 })
903 }
904
905 fn request_viewpoints(event_request: &EventRequest) -> BTreeSet<String> {
906 match event_request {
907 EventRequest::Fact(fact_request) => fact_request.viewpoints.clone(),
908 _ => BTreeSet::new(),
909 }
910 }
911
912 fn apply_patch_verify(
913 subject_properties: &mut ValueWrapper,
914 json_patch: ValueWrapper,
915 ) -> Result<(), SubjectError> {
916 let json_patch = serde_json::from_value::<Patch>(json_patch.0)
917 .map_err(|e| SubjectError::PatchConversionFailed {
918 details: e.to_string(),
919 })?;
920
921 patch(&mut subject_properties.0, &json_patch).map_err(|e| {
922 SubjectError::PatchApplicationFailed {
923 details: e.to_string(),
924 }
925 })?;
926
927 Ok(())
928 }
929
930 async fn verify_new_ledger_event(
931 ctx: &mut ActorContext<Self>,
932 args: VerifyNewLedgerEvent<'_>,
933 ) -> Result<bool, SubjectError> {
934 let VerifyNewLedgerEvent {
935 new_ledger_event,
936 subject_metadata,
937 actual_ledger_event_hash,
938 last_data,
939 hash,
940 full_view,
941 only_clear_events,
942 } = args;
943
944 if !subject_metadata.active {
945 return Err(SubjectError::SubjectInactive);
946 }
947
948 if new_ledger_event.sn != subject_metadata.sn + 1 {
949 return Err(SubjectError::InvalidSequenceNumber {
950 expected: subject_metadata.sn + 1,
951 actual: new_ledger_event.sn,
952 });
953 }
954
955 let protocols_hash = new_ledger_event
956 .protocols
957 .hash_for_ledger(hash)
958 .map_err(|e| SubjectError::HashCreationFailed {
959 details: e.to_string(),
960 })?;
961
962 let ledger_seal = LedgerSeal {
963 gov_version: new_ledger_event.gov_version,
964 sn: new_ledger_event.sn,
965 prev_ledger_event_hash: new_ledger_event
966 .prev_ledger_event_hash
967 .clone(),
968 protocols_hash,
969 };
970
971 if new_ledger_event
972 .ledger_seal_signature
973 .verify(&ledger_seal)
974 .is_err()
975 {
976 return Err(SubjectError::SignatureVerificationFailed {
977 context: "new ledger event signature verification failed"
978 .to_string(),
979 });
980 }
981
982 let signer = if let Some(new_owner) = &subject_metadata.new_owner {
983 new_owner.clone()
984 } else {
985 subject_metadata.owner.clone()
986 };
987
988 if new_ledger_event.ledger_seal_signature.signer != signer {
989 return Err(SubjectError::IncorrectSigner {
990 expected: signer.to_string(),
991 actual: new_ledger_event
992 .ledger_seal_signature
993 .signer
994 .to_string(),
995 });
996 }
997
998 if actual_ledger_event_hash != new_ledger_event.prev_ledger_event_hash {
999 return Err(SubjectError::PreviousHashMismatch);
1000 }
1001
1002 let mut modified_subject_metadata = subject_metadata.clone();
1003 modified_subject_metadata.sn += 1;
1004
1005 let (
1006 validation,
1007 new_actual_protocols,
1008 event_request,
1009 opaque_event_request_hash,
1010 opaque_viewpoints_hash,
1011 ) = match (
1012 &new_ledger_event.protocols,
1013 subject_metadata.schema_id.is_gov(),
1014 ) {
1015 (
1016 Protocols::TrackerFactFull {
1017 event_request,
1018 evaluation,
1019 validation,
1020 },
1021 false,
1022 ) => {
1023 if let EventRequest::Fact(..) = event_request.content() {
1024 } else {
1025 return Err(SubjectError::EventProtocolMismatch);
1026 }
1027
1028 if modified_subject_metadata.new_owner.is_some() {
1029 return Err(SubjectError::UnexpectedFactEvent);
1030 }
1031
1032 if full_view
1033 && let Some(eval) = evaluation.evaluator_response_ok()
1034 {
1035 Self::apply_patch_verify(
1036 &mut modified_subject_metadata.properties,
1037 eval.patch,
1038 )?;
1039 }
1040 (
1041 validation,
1042 ActualProtocols::Eval {
1043 eval_data: evaluation.clone(),
1044 },
1045 Some(event_request),
1046 None,
1047 None,
1048 )
1049 }
1050 (
1051 Protocols::TrackerFactOpaque {
1052 data,
1053 event_request_hash,
1054 evaluation,
1055 validation,
1056 ..
1057 },
1058 false,
1059 ) => {
1060 if only_clear_events {
1061 return Err(
1062 SubjectError::OnlyClearEventsCannotAcceptTrackerOpaque,
1063 );
1064 }
1065
1066 if data.subject_id != subject_metadata.subject_id {
1067 return Err(SubjectError::SubjectIdMismatch {
1068 expected: subject_metadata.subject_id.to_string(),
1069 actual: data.subject_id.to_string(),
1070 });
1071 }
1072
1073 (
1074 validation,
1075 ActualProtocols::None,
1076 None,
1077 Some(event_request_hash.clone()),
1078 Some(Self::hash_viewpoints(hash, &evaluation.viewpoints)?),
1079 )
1080 }
1081 (
1082 Protocols::GovFact {
1083 event_request,
1084 evaluation,
1085 approval,
1086 validation,
1087 },
1088 true,
1089 ) => {
1090 if let EventRequest::Fact(fact_request) =
1091 event_request.content()
1092 {
1093 if !fact_request.viewpoints.is_empty() {
1094 return Err(
1095 SubjectError::GovernanceFactViewpointsNotAllowed,
1096 );
1097 }
1098 } else {
1099 return Err(SubjectError::EventProtocolMismatch);
1100 }
1101
1102 if modified_subject_metadata.new_owner.is_some() {
1103 return Err(SubjectError::UnexpectedFactEvent);
1104 }
1105
1106 let actual_protocols =
1107 if let Some(eval) = evaluation.evaluator_response_ok() {
1108 if let Some(appr) = approval {
1109 if appr.approved {
1110 Self::apply_patch_verify(
1111 &mut modified_subject_metadata.properties,
1112 eval.patch,
1113 )?;
1114 }
1115
1116 ActualProtocols::EvalApprove {
1117 eval_data: evaluation.clone(),
1118 approval_data: appr.clone(),
1119 }
1120 } else {
1121 return Err(
1122 SubjectError::MissingApprovalAfterEvaluation,
1123 );
1124 }
1125 } else if approval.is_some() {
1126 return Err(
1127 SubjectError::UnexpectedApprovalAfterFailedEvaluation,
1128 );
1129 } else {
1130 ActualProtocols::Eval {
1131 eval_data: evaluation.clone(),
1132 }
1133 };
1134
1135 (
1136 validation,
1137 actual_protocols,
1138 Some(event_request),
1139 None,
1140 None,
1141 )
1142 }
1143 (
1144 Protocols::Transfer {
1145 event_request,
1146 evaluation,
1147 validation,
1148 },
1149 ..,
1150 ) => {
1151 let EventRequest::Transfer(transfer) = event_request.content()
1152 else {
1153 return Err(SubjectError::EventProtocolMismatch);
1154 };
1155
1156 if modified_subject_metadata.new_owner.is_some() {
1157 return Err(SubjectError::UnexpectedTransferEvent);
1158 }
1159
1160 if let Some(eval) = evaluation.evaluator_response_ok() {
1161 Self::apply_patch_verify(
1162 &mut modified_subject_metadata.properties,
1163 eval.patch,
1164 )?;
1165 modified_subject_metadata.new_owner =
1166 Some(transfer.new_owner.clone());
1167 }
1168
1169 (
1170 validation,
1171 ActualProtocols::Eval {
1172 eval_data: evaluation.clone(),
1173 },
1174 Some(event_request),
1175 None,
1176 None,
1177 )
1178 }
1179 (
1180 Protocols::TrackerConfirm {
1181 event_request,
1182 validation,
1183 },
1184 false,
1185 ) => {
1186 if let EventRequest::Confirm(..) = event_request.content() {
1187 } else {
1188 return Err(SubjectError::EventProtocolMismatch);
1189 }
1190
1191 if let Some(new_owner) =
1192 &modified_subject_metadata.new_owner.take()
1193 {
1194 modified_subject_metadata.owner = new_owner.clone();
1195 } else {
1196 return Err(SubjectError::ConfirmWithoutNewOwner);
1197 }
1198
1199 (
1200 validation,
1201 ActualProtocols::None,
1202 Some(event_request),
1203 None,
1204 None,
1205 )
1206 }
1207 (
1208 Protocols::GovConfirm {
1209 event_request,
1210 evaluation,
1211 validation,
1212 },
1213 true,
1214 ) => {
1215 if let EventRequest::Confirm(..) = event_request.content() {
1216 } else {
1217 return Err(SubjectError::EventProtocolMismatch);
1218 }
1219
1220 if let Some(eval) = evaluation.evaluator_response_ok() {
1221 Self::apply_patch_verify(
1222 &mut modified_subject_metadata.properties,
1223 eval.patch,
1224 )?;
1225
1226 if let Some(new_owner) =
1227 &modified_subject_metadata.new_owner.take()
1228 {
1229 modified_subject_metadata.owner = new_owner.clone();
1230 } else {
1231 return Err(SubjectError::ConfirmWithoutNewOwner);
1232 }
1233 }
1234
1235 (
1236 validation,
1237 ActualProtocols::Eval {
1238 eval_data: evaluation.clone(),
1239 },
1240 Some(event_request),
1241 None,
1242 None,
1243 )
1244 }
1245 (
1246 Protocols::Reject {
1247 event_request,
1248 validation,
1249 },
1250 ..,
1251 ) => {
1252 if let EventRequest::Reject(..) = event_request.content() {
1253 } else {
1254 return Err(SubjectError::EventProtocolMismatch);
1255 }
1256
1257 if modified_subject_metadata.new_owner.take().is_none() {
1258 return Err(SubjectError::RejectWithoutNewOwner);
1259 }
1260
1261 (
1262 validation,
1263 ActualProtocols::None,
1264 Some(event_request),
1265 None,
1266 None,
1267 )
1268 }
1269 (
1270 Protocols::EOL {
1271 event_request,
1272 validation,
1273 },
1274 ..,
1275 ) => {
1276 if let EventRequest::EOL(..) = event_request.content() {
1277 } else {
1278 return Err(SubjectError::EventProtocolMismatch);
1279 }
1280
1281 if modified_subject_metadata.new_owner.is_some() {
1282 return Err(SubjectError::UnexpectedEOLEvent);
1283 }
1284
1285 modified_subject_metadata.active = false;
1286 (
1287 validation,
1288 ActualProtocols::None,
1289 Some(event_request),
1290 None,
1291 None,
1292 )
1293 }
1294 _ => {
1295 return Err(SubjectError::EventProtocolMismatch);
1296 }
1297 };
1298
1299 if let Some(event_request) = event_request {
1300 if event_request.verify().is_err() {
1301 return Err(SubjectError::SignatureVerificationFailed {
1302 context: "event request signature verification failed"
1303 .to_string(),
1304 });
1305 }
1306
1307 let signer = event_request.signature().signer.clone();
1308 if !event_request.content().check_request_signature(
1309 &signer,
1310 &subject_metadata.owner,
1311 &subject_metadata.new_owner,
1312 ) {
1313 let (event, expected) = match event_request.content() {
1314 EventRequest::Create(..) => {
1315 ("create", subject_metadata.owner.to_string())
1316 }
1317 EventRequest::Transfer(..) => {
1318 ("transfer", subject_metadata.owner.to_string())
1319 }
1320 EventRequest::EOL(..) => {
1321 ("eol", subject_metadata.owner.to_string())
1322 }
1323 EventRequest::Confirm(..) => (
1324 "confirm",
1325 subject_metadata.new_owner.as_ref().map_or_else(
1326 || "new_owner".to_owned(),
1327 ToString::to_string,
1328 ),
1329 ),
1330 EventRequest::Reject(..) => (
1331 "reject",
1332 subject_metadata.new_owner.as_ref().map_or_else(
1333 || "new_owner".to_owned(),
1334 ToString::to_string,
1335 ),
1336 ),
1337 EventRequest::Fact(..) => ("fact", signer.to_string()),
1338 };
1339
1340 return Err(SubjectError::InvalidEventRequestSigner {
1341 event: event.to_owned(),
1342 expected,
1343 actual: signer.to_string(),
1344 });
1345 }
1346
1347 let event_subject_id = event_request.content().get_subject_id();
1348 if event_subject_id != subject_metadata.subject_id {
1349 return Err(SubjectError::SubjectIdMismatch {
1350 expected: subject_metadata.subject_id.to_string(),
1351 actual: event_subject_id.to_string(),
1352 });
1353 }
1354 }
1355
1356 if modified_subject_metadata.schema_id.is_gov()
1357 && new_actual_protocols.is_success()
1358 {
1359 let mut gov_data = serde_json::from_value::<GovernanceData>(
1360 modified_subject_metadata.properties.0,
1361 )
1362 .map_err(|e| {
1363 SubjectError::GovernanceDataConversionFailed {
1364 details: e.to_string(),
1365 }
1366 })?;
1367
1368 gov_data.version += 1;
1369 modified_subject_metadata.properties = gov_data.to_value_wrapper();
1370 }
1371
1372 modified_subject_metadata.prev_ledger_event_hash =
1373 actual_ledger_event_hash.clone();
1374
1375 let meta_wo_props =
1376 MetadataWithoutProperties::from(modified_subject_metadata.clone());
1377
1378 let meta_wo_props_hash = hash_borsh(&*hash.hasher(), &meta_wo_props)
1379 .map_err(|e| SubjectError::ModifiedMetadataHashFailed {
1380 details: e.to_string(),
1381 })?;
1382
1383 let (event_request_hash, viewpoints_hash) = if let Some(event_request) =
1384 event_request
1385 {
1386 (
1387 hash_borsh(&*hash.hasher(), event_request).map_err(|e| {
1388 SubjectError::HashCreationFailed {
1389 details: e.to_string(),
1390 }
1391 })?,
1392 Self::hash_viewpoints(
1393 hash,
1394 &Self::request_viewpoints(event_request.content()),
1395 )?,
1396 )
1397 } else {
1398 (
1399 opaque_event_request_hash.ok_or_else(|| {
1400 SubjectError::CannotObtain {
1401 what: "tracker opaque event_request_hash".to_owned(),
1402 }
1403 })?,
1404 opaque_viewpoints_hash.ok_or_else(|| {
1405 SubjectError::CannotObtain {
1406 what: "tracker opaque viewpoints_hash".to_owned(),
1407 }
1408 })?,
1409 )
1410 };
1411
1412 let propierties_hash = if full_view
1413 && let Some(event_request) = event_request
1414 {
1415 let validation_req = ValidationReq::Event {
1416 actual_protocols: Box::new(new_actual_protocols),
1417 event_request: event_request.clone(),
1418 ledger_hash: actual_ledger_event_hash,
1419 metadata: Box::new(subject_metadata.clone()),
1420 last_data: Box::new(last_data),
1421 gov_version: new_ledger_event.gov_version,
1422 sn: new_ledger_event.sn,
1423 };
1424
1425 let signed_validation_req = Signed::from_parts(
1426 validation_req,
1427 validation.validation_req_signature.clone(),
1428 );
1429
1430 if signed_validation_req.verify().is_err() {
1431 return Err(SubjectError::InvalidValidationRequestSignature);
1432 }
1433
1434 let hash_signed_val_req =
1435 hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
1436 |e| SubjectError::ValidationRequestHashFailed {
1437 details: e.to_string(),
1438 },
1439 )?;
1440
1441 if hash_signed_val_req != validation.validation_req_hash {
1442 return Err(SubjectError::ValidationRequestHashMismatch);
1443 }
1444
1445 let prop_hash = hash_borsh(
1446 &*hash.hasher(),
1447 &modified_subject_metadata.properties,
1448 )
1449 .map_err(|e| {
1450 SubjectError::ModifiedMetadataHashFailed {
1451 details: e.to_string(),
1452 }
1453 })?;
1454
1455 if let ValidationMetadata::ModifiedHash {
1456 propierties_hash,
1457 modified_metadata_without_propierties_hash,
1458 event_request_hash: validation_event_request_hash,
1459 viewpoints_hash: validation_viewpoints_hash,
1460 } = &validation.validation_metadata
1461 {
1462 if modified_metadata_without_propierties_hash
1463 != &meta_wo_props_hash
1464 {
1465 return Err(
1466 SubjectError::ModifiedMetadataWithoutPropertiesHashMismatch {
1467 expected: meta_wo_props_hash.to_string(),
1468 actual: modified_metadata_without_propierties_hash
1469 .to_string(),
1470 },
1471 );
1472 }
1473
1474 if &prop_hash != propierties_hash {
1475 return Err(SubjectError::PropertiesHashMismatch {
1476 expected: prop_hash.to_string(),
1477 actual: propierties_hash.to_string(),
1478 });
1479 }
1480
1481 if &event_request_hash != validation_event_request_hash {
1482 return Err(SubjectError::EventRequestHashMismatch {
1483 expected: event_request_hash.to_string(),
1484 actual: validation_event_request_hash.to_string(),
1485 });
1486 }
1487
1488 if &viewpoints_hash != validation_viewpoints_hash {
1489 return Err(SubjectError::ViewpointsHashMismatch {
1490 expected: viewpoints_hash.to_string(),
1491 actual: validation_viewpoints_hash.to_string(),
1492 });
1493 }
1494 } else {
1495 return Err(SubjectError::InvalidNonCreationValidationMetadata);
1496 }
1497
1498 prop_hash
1499 } else {
1500 if let ValidationMetadata::ModifiedHash {
1501 propierties_hash,
1502 modified_metadata_without_propierties_hash,
1503 event_request_hash: validation_event_request_hash,
1504 viewpoints_hash: validation_viewpoints_hash,
1505 } = &validation.validation_metadata
1506 {
1507 if modified_metadata_without_propierties_hash
1508 != &meta_wo_props_hash
1509 {
1510 return Err(
1511 SubjectError::ModifiedMetadataWithoutPropertiesHashMismatch {
1512 expected: meta_wo_props_hash.to_string(),
1513 actual: modified_metadata_without_propierties_hash
1514 .to_string(),
1515 },
1516 );
1517 }
1518
1519 if &event_request_hash != validation_event_request_hash {
1520 return Err(SubjectError::EventRequestHashMismatch {
1521 expected: event_request_hash.to_string(),
1522 actual: validation_event_request_hash.to_string(),
1523 });
1524 }
1525
1526 if &viewpoints_hash != validation_viewpoints_hash {
1527 return Err(SubjectError::ViewpointsHashMismatch {
1528 expected: viewpoints_hash.to_string(),
1529 actual: validation_viewpoints_hash.to_string(),
1530 });
1531 }
1532
1533 propierties_hash.clone()
1534 } else {
1535 return Err(SubjectError::InvalidNonCreationValidationMetadata);
1536 }
1537 };
1538
1539 let validation_res = ValidationRes::Response {
1540 vali_req_hash: validation.validation_req_hash.clone(),
1541 modified_metadata_without_propierties_hash: meta_wo_props_hash,
1542 propierties_hash,
1543 event_request_hash,
1544 viewpoints_hash,
1545 };
1546
1547 let role_data = get_validation_roles_register(
1548 ctx,
1549 &subject_metadata.governance_id,
1550 SearchRole {
1551 schema_id: subject_metadata.schema_id,
1552 namespace: subject_metadata.namespace,
1553 },
1554 new_ledger_event.gov_version,
1555 )
1556 .await
1557 .map_err(|e| SubjectError::ValidatorsRetrievalFailed {
1558 details: e.to_string(),
1559 })?;
1560
1561 if !check_quorum_signers(
1562 &validation
1563 .validators_signatures
1564 .iter()
1565 .map(|x| x.signer.clone())
1566 .collect::<HashSet<PublicKey>>(),
1567 &role_data.quorum,
1568 &role_data.workers,
1569 ) {
1570 return Err(SubjectError::InvalidQuorum);
1571 }
1572
1573 for signature in validation.validators_signatures.iter() {
1574 let signed_res =
1575 Signed::from_parts(validation_res.clone(), signature.clone());
1576
1577 if signed_res.verify().is_err() {
1578 return Err(SubjectError::InvalidValidatorSignature);
1579 }
1580 }
1581
1582 Ok(new_ledger_event.protocols.is_success())
1583 }
1584
1585 async fn verify_first_ledger_event(
1586 ctx: &mut ActorContext<Self>,
1587 ledger_event: &Ledger,
1588 hash: &HashAlgorithm,
1589 subject_metadata: Metadata,
1590 ) -> Result<(), SubjectError> {
1591 if ledger_event.sn != 0 {
1592 return Err(SubjectError::InvalidCreationSequenceNumber);
1593 }
1594
1595 let protocols_hash = ledger_event
1596 .protocols
1597 .hash_for_ledger(hash)
1598 .map_err(|e| SubjectError::HashCreationFailed {
1599 details: e.to_string(),
1600 })?;
1601
1602 let ledger_seal = LedgerSeal {
1603 gov_version: ledger_event.gov_version,
1604 sn: ledger_event.sn,
1605 prev_ledger_event_hash: ledger_event.prev_ledger_event_hash.clone(),
1606 protocols_hash,
1607 };
1608
1609 if ledger_event
1610 .ledger_seal_signature
1611 .verify(&ledger_seal)
1612 .is_err()
1613 {
1614 return Err(SubjectError::SignatureVerificationFailed {
1615 context: "first ledger event signature verification failed"
1616 .to_string(),
1617 });
1618 }
1619
1620 if ledger_event.ledger_seal_signature.signer != subject_metadata.owner {
1621 return Err(SubjectError::IncorrectSigner {
1622 expected: subject_metadata.owner.to_string(),
1623 actual: ledger_event.ledger_seal_signature.signer.to_string(),
1624 });
1625 }
1626
1627 let (validation, event_request) = match &ledger_event.protocols {
1628 Protocols::Create {
1629 validation,
1630 event_request,
1631 } => {
1632 if let EventRequest::Create(..) = event_request.content() {
1633 } else {
1634 return Err(SubjectError::EventProtocolMismatch);
1635 }
1636
1637 if event_request.verify().is_err() {
1638 return Err(SubjectError::SignatureVerificationFailed {
1639 context: "event request signature verification failed"
1640 .to_string(),
1641 });
1642 }
1643
1644 let event_request_signer =
1645 event_request.signature().signer.clone();
1646 if event_request_signer != subject_metadata.owner {
1647 return Err(SubjectError::InvalidEventRequestSigner {
1648 event: "Create".to_owned(),
1649 expected: subject_metadata.owner.to_string(),
1650 actual: event_request_signer.to_string(),
1651 });
1652 }
1653
1654 (validation, event_request)
1655 }
1656 _ => {
1657 return Err(SubjectError::EventProtocolMismatch);
1658 }
1659 };
1660
1661 if !ledger_event.prev_ledger_event_hash.is_empty() {
1662 return Err(SubjectError::NonEmptyPreviousHashInCreation);
1663 }
1664
1665 let ValidationMetadata::Metadata(metadata) =
1666 &validation.validation_metadata
1667 else {
1668 return Err(SubjectError::InvalidValidationMetadata);
1669 };
1670
1671 let validation_req = ValidationReq::Create {
1672 event_request: event_request.clone(),
1673 gov_version: ledger_event.gov_version,
1674 subject_id: subject_metadata.subject_id.clone(),
1675 };
1676
1677 let signed_validation_req = Signed::from_parts(
1678 validation_req,
1679 validation.validation_req_signature.clone(),
1680 );
1681
1682 if signed_validation_req.verify().is_err() {
1683 return Err(SubjectError::InvalidValidationRequestSignature);
1684 }
1685
1686 let hash_signed_val_req =
1687 hash_borsh(&*hash.hasher(), &signed_validation_req).map_err(
1688 |e| SubjectError::ValidationRequestHashFailed {
1689 details: e.to_string(),
1690 },
1691 )?;
1692
1693 if hash_signed_val_req != validation.validation_req_hash {
1694 return Err(SubjectError::ValidationRequestHashMismatch);
1695 }
1696
1697 if metadata.deref() != &subject_metadata {
1698 return Err(SubjectError::MetadataMismatch);
1699 }
1700
1701 if metadata.schema_id == SchemaType::Governance {
1702 serde_json::from_value::<GovernanceData>(
1703 metadata.properties.0.clone(),
1704 )
1705 .map_err(|e| {
1706 SubjectError::GovernancePropertiesConversionFailed {
1707 details: e.to_string(),
1708 }
1709 })?;
1710 }
1711
1712 let validation_res = ValidationRes::Create {
1713 vali_req_hash: hash_signed_val_req,
1714 subject_metadata: Box::new(subject_metadata),
1715 };
1716
1717 let role_data = match metadata.schema_id {
1718 SchemaType::Governance => RoleDataRegister {
1719 workers: HashSet::from([metadata.owner.clone()]),
1720 quorum: Quorum::Majority,
1721 },
1722 SchemaType::Type(_) => get_validation_roles_register(
1723 ctx,
1724 &metadata.governance_id,
1725 SearchRole {
1726 schema_id: metadata.schema_id.clone(),
1727 namespace: metadata.namespace.clone(),
1728 },
1729 ledger_event.gov_version,
1730 )
1731 .await
1732 .map_err(|e| {
1733 SubjectError::ValidatorsRetrievalFailed {
1734 details: e.to_string(),
1735 }
1736 })?,
1737 SchemaType::TrackerSchemas => {
1738 return Err(SubjectError::InvalidSchemaId);
1739 }
1740 };
1741
1742 if !check_quorum_signers(
1743 &validation
1744 .validators_signatures
1745 .iter()
1746 .map(|x| x.signer.clone())
1747 .collect::<HashSet<PublicKey>>(),
1748 &role_data.quorum,
1749 &role_data.workers,
1750 ) {
1751 return Err(SubjectError::InvalidQuorum);
1752 }
1753
1754 for signature in validation.validators_signatures.iter() {
1755 let signed_res =
1756 Signed::from_parts(validation_res.clone(), signature.clone());
1757
1758 if signed_res.verify().is_err() {
1759 return Err(SubjectError::InvalidValidatorSignature);
1760 }
1761 }
1762
1763 Ok(())
1764 }
1765
1766 async fn register(
1767 ctx: &mut ActorContext<Self>,
1768 message: RegisterMessage,
1769 ) -> Result<(), ActorError> {
1770 let register_path = ActorPath::from("/user/node/register");
1771 match ctx.system().get_actor::<Register>(®ister_path).await {
1772 Ok(register) => {
1773 register.tell(message.clone()).await?;
1774
1775 debug!(
1776 message = ?message,
1777 "Register message sent successfully"
1778 );
1779 }
1780 Err(e) => {
1781 error!(
1782 path = %register_path,
1783 "Register actor not found"
1784 );
1785 return Err(e);
1786 }
1787 };
1788
1789 Ok(())
1790 }
1791
1792 async fn event_to_sink(
1793 ctx: &mut ActorContext<Self>,
1794 data: DataForSink,
1795 event: Option<EventRequest>,
1796 ) -> Result<(), ActorError> {
1797 let msg = SinkDataMessage::Event {
1798 event: Box::new(data_to_sink_event(data.clone(), event)),
1799 event_request_timestamp: data.event_request_timestamp,
1800 event_ledger_timestamp: data.event_ledger_timestamp,
1801 };
1802
1803 Self::publish_sink(ctx, msg).await
1804 }
1805
1806 async fn publish_sink(
1807 ctx: &mut ActorContext<Self>,
1808 message: SinkDataMessage,
1809 ) -> Result<(), ActorError> {
1810 let sink_data = ctx.get_child::<SinkData>("sink_data").await?;
1811 let (subject_id, schema_id) = message.get_subject_schema();
1812
1813 sink_data.tell(message).await?;
1814 debug!(
1815 subject_id = %subject_id,
1816 schema_id = %schema_id,
1817 "Message published to sink successfully"
1818 );
1819
1820 Ok(())
1821 }
1822
1823 async fn get_ledger(
1824 &self,
1825 ctx: &mut ActorContext<Self>,
1826 lo_sn: Option<u64>,
1827 hi_sn: u64,
1828 ) -> Result<(Vec<<Self as Actor>::Event>, bool), ActorError> {
1829 if let Some(lo_sn) = lo_sn {
1830 let actual_sn = lo_sn + 1;
1831 if hi_sn < actual_sn {
1832 Ok((Vec::new(), true))
1833 } else {
1834 Ok((
1835 get_n_events(ctx, actual_sn, hi_sn - actual_sn).await?,
1836 true,
1837 ))
1838 }
1839 } else {
1840 Ok((get_n_events(ctx, 0, hi_sn).await?, true))
1841 }
1842 }
1843
/// Required method: declared without a body, so each implementor
/// supplies it.
///
/// NOTE(review): going by the name, this presumably refreshes or
/// publishes the subject's current sequence number — confirm against
/// the implementors.
async fn update_sn(
    &self,
    ctx: &mut ActorContext<Self>,
) -> Result<(), ActorError>;
1848
/// Required method: declared without a body, so each implementor
/// supplies it.
///
/// NOTE(review): presumably the hook run when a pending ownership
/// transfer is rejected, with `gov_version` being the governance
/// version of the triggering event — confirm against the implementors.
async fn reject(
    &self,
    ctx: &mut ActorContext<Self>,
    gov_version: u64,
) -> Result<(), ActorError>;
1854
/// Required method: declared without a body, so each implementor
/// supplies it.
///
/// NOTE(review): presumably the hook run when `new_owner` confirms a
/// pending ownership transfer, with `gov_version` being the governance
/// version of the triggering event — confirm against the implementors.
async fn confirm(
    &self,
    ctx: &mut ActorContext<Self>,
    new_owner: PublicKey,
    gov_version: u64,
) -> Result<(), ActorError>;
1861
/// Required method: declared without a body, so each implementor
/// supplies it.
///
/// NOTE(review): presumably the hook run when a transfer to
/// `new_owner` is started, with `gov_version` being the governance
/// version of the triggering event — confirm against the implementors.
async fn transfer(
    &self,
    ctx: &mut ActorContext<Self>,
    new_owner: PublicKey,
    gov_version: u64,
) -> Result<(), ActorError>;
1868
/// Required method: declared without a body, so each implementor
/// supplies it.
///
/// NOTE(review): presumably the hook run when the subject reaches
/// end-of-life (EOL) — confirm against the implementors.
async fn eol(&self, ctx: &mut ActorContext<Self>)
    -> Result<(), ActorError>;
1871
/// Required method: declared without a body, so each implementor
/// supplies it. It is synchronous and takes `&mut self`.
///
/// NOTE(review): presumably applies `json_patch` (a JSON Patch document
/// wrapped in `ValueWrapper`) to the implementor's stored properties —
/// confirm against the implementors.
fn apply_patch(
    &mut self,
    json_patch: ValueWrapper,
) -> Result<(), ActorError>;
1876
/// Required method: declared without a body, so each implementor
/// supplies it. It receives a batch of `Ledger` events and may mutate
/// `self`.
///
/// NOTE(review): presumably verifies and applies the incoming `events`
/// in order — confirm against the implementors.
async fn manager_new_ledger_events(
    &mut self,
    ctx: &mut ActorContext<Self>,
    events: Vec<Ledger>,
) -> Result<(), ActorError>;
1882
/// Required method: declared without a body, so each implementor
/// supplies it. Returns an optional `Ledger`.
///
/// NOTE(review): presumably yields the most recent stored ledger event,
/// with `None` meaning nothing is stored yet — confirm against the
/// implementors.
async fn get_last_ledger(
    &self,
    ctx: &mut ActorContext<Self>,
) -> Result<Option<Ledger>, ActorError>;
1887}