1use std::{collections::HashSet, sync::Arc};
2
3use crate::{
4 approval::response::ApprovalRes,
5 evaluation::response::EvaluationRes,
6 governance::{
7 data::GovernanceData,
8 model::Quorum,
9 role_register::{RoleDataRegister, SearchRole},
10 },
11 helpers::network::{NetworkMessage, service::NetworkSender},
12 model::{
13 common::{
14 check_quorum_signers, emit_fail, get_actual_roles_register,
15 get_validation_roles_register,
16 node::{SignTypesNode, get_sign},
17 },
18 event::{ApprovalData, EvaluationData, EvaluationResponse},
19 },
20 subject::{Metadata, RequestSubjectData},
21 validation::{
22 request::{ActualProtocols, LastData},
23 response::ValidatorError,
24 },
25};
26
27use crate::helpers::network::ActorMessage;
28
29use async_trait::async_trait;
30use ave_common::{
31 ValueWrapper,
32 bridge::request::EventRequestType,
33 identity::{
34 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
35 },
36 request::EventRequest,
37};
38use borsh::{BorshDeserialize, BorshSerialize};
39
40use json_patch::{Patch, patch};
41use network::ComunicateInfo;
42
43use ave_actors::{
44 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
45 NotPersistentActor,
46};
47
48use tracing::{Span, debug, error, info_span, warn};
49
50use super::{
51 Validation, ValidationMessage, request::ValidationReq,
52 response::ValidationRes,
53};
54
55#[derive(
57 Clone,
58 Debug,
59 serde::Serialize,
60 serde::Deserialize,
61 BorshSerialize,
62 BorshDeserialize,
63)]
64pub struct CurrentRequestRoles {
65 pub evaluation: RoleDataRegister,
66 pub approval: RoleDataRegister,
67}
68
69#[derive(Clone, Debug)]
70pub struct CurrentWorkerRoles {
71 pub evaluation: RoleDataRegister,
72 pub approval: RoleDataRegister,
73}
74
75#[derive(Clone, Debug)]
76pub struct ValiWorker {
77 pub node_key: PublicKey,
78 pub our_key: Arc<PublicKey>,
79 pub init_state: Option<ValueWrapper>,
80 pub governance_id: DigestIdentifier,
81 pub gov_version: u64,
82 pub sn: u64,
83 pub hash: HashAlgorithm,
84 pub network: Arc<NetworkSender>,
85 pub current_roles: CurrentWorkerRoles,
86 pub stop: bool,
87}
88
89impl ValiWorker {
90 fn current_evaluation_roles(&self) -> RoleDataRegister {
91 self.current_roles.evaluation.clone()
92 }
93
94 fn current_approval_roles(&self) -> RoleDataRegister {
95 self.current_roles.approval.clone()
96 }
97
98 async fn check_governance(
99 &self,
100 gov_version: u64,
101 ) -> Result<bool, ActorError> {
102 match self.gov_version.cmp(&gov_version) {
103 std::cmp::Ordering::Less => {
104 warn!(
105 local_gov_version = self.gov_version,
106 request_gov_version = gov_version,
107 governance_id = %self.governance_id,
108 sender = %self.node_key,
109 "Received request with a higher governance version; ignoring request"
110 );
111 Err(ActorError::Functional {
112 description:
113 "Abort validation, request governance version is higher than local"
114 .to_owned(),
115 })
116 }
117 std::cmp::Ordering::Equal => {
118 Ok(false)
120 }
121 std::cmp::Ordering::Greater => Ok(true),
122 }
123 }
124
125 fn check_data(
126 &self,
127 validation_req: &Signed<ValidationReq>,
128 ) -> Result<(), ValidatorError> {
129 if !validation_req.content().is_valid() {
130 return Err(ValidatorError::InvalidData {
131 value: "validation request",
132 });
133 }
134
135 let governance_id = validation_req
136 .content()
137 .get_governance_id()
138 .map_err(|_| ValidatorError::InvalidData {
139 value: "governance_id",
140 })?;
141
142 if governance_id != self.governance_id {
143 return Err(ValidatorError::InvalidData {
144 value: "governance_id",
145 });
146 }
147
148 if validation_req.verify().is_err() {
149 return Err(ValidatorError::InvalidSignature {
150 data: "validation request",
151 });
152 }
153
154 if validation_req
155 .content()
156 .get_signed_event_request()
157 .verify()
158 .is_err()
159 {
160 return Err(ValidatorError::InvalidSignature {
161 data: "event request",
162 });
163 }
164
165 Ok(())
166 }
167
168 fn check_metadata(
169 event_type: &EventRequestType,
170 metadata: &Metadata,
171 gov_version: u64,
172 ) -> Result<(), ValidatorError> {
173 let is_gov = metadata.schema_id.is_gov();
174
175 if let Some(name) = &metadata.name
176 && (name.is_empty() || name.len() > 100)
177 {
178 return Err(ValidatorError::InvalidData {
179 value: "metadata name",
180 });
181 }
182
183 if let Some(description) = &metadata.description
184 && (description.is_empty() || description.len() > 200)
185 {
186 return Err(ValidatorError::InvalidData {
187 value: "metadata description",
188 });
189 }
190
191 if metadata.subject_id.is_empty() {
192 return Err(ValidatorError::InvalidData {
193 value: "metadata subject_id",
194 });
195 }
196
197 if is_gov && metadata.governance_id != metadata.subject_id
198 || !is_gov && metadata.governance_id == metadata.subject_id
199 {
200 return Err(ValidatorError::InvalidData {
201 value: "metadata governance_id",
202 });
203 }
204
205 if is_gov && metadata.genesis_gov_version != 0
206 || !is_gov && metadata.genesis_gov_version == 0
207 {
208 return Err(ValidatorError::InvalidData {
209 value: "metadata genesis_gov_version",
210 });
211 }
212
213 if metadata.genesis_gov_version > gov_version {
214 return Err(ValidatorError::InvalidData {
215 value: "metadata genesis_gov_version",
216 });
217 }
218
219 if metadata.sn == 0 && !metadata.prev_ledger_event_hash.is_empty()
220 || metadata.sn != 0 && metadata.prev_ledger_event_hash.is_empty()
221 {
222 return Err(ValidatorError::InvalidData {
223 value: "metadata prev_ledger_event_hash",
224 });
225 };
226
227 if !metadata.schema_id.is_valid_in_request() {
228 return Err(ValidatorError::InvalidData {
229 value: "metadata schema_id",
230 });
231 };
232
233 if is_gov && !metadata.namespace.is_empty() {
234 return Err(ValidatorError::InvalidData {
235 value: "metadata namespace",
236 });
237 }
238
239 if metadata.creator.is_empty() {
240 return Err(ValidatorError::InvalidData {
241 value: "metadata creator",
242 });
243 }
244
245 if metadata.owner.is_empty() {
246 return Err(ValidatorError::InvalidData {
247 value: "metadata owner",
248 });
249 }
250
251 if let Some(new_owner) = &metadata.new_owner
252 && (new_owner.is_empty() || new_owner == &metadata.owner)
253 {
254 return Err(ValidatorError::InvalidData {
255 value: "metadata new owner",
256 });
257 };
258
259 if !metadata.active {
260 return Err(ValidatorError::InvalidData {
261 value: "metadata active",
262 });
263 }
264
265 match event_type {
266 EventRequestType::Create => {
267 return Err(ValidatorError::InvalidData {
268 value: "Event request type",
269 });
270 }
271 EventRequestType::Confirm | EventRequestType::Reject => {
272 if metadata.new_owner.is_none() {
273 return Err(ValidatorError::InvalidData {
274 value: "Event request type",
275 });
276 }
277 }
278 EventRequestType::Fact
279 | EventRequestType::Transfer
280 | EventRequestType::Eol => {
281 if metadata.new_owner.is_some() {
282 return Err(ValidatorError::InvalidData {
283 value: "Event request type",
284 });
285 }
286 }
287 };
288
289 Ok(())
290 }
291
292 fn check_basic_data(
293 request: &Signed<EventRequest>,
294 metadata: &Metadata,
295 vali_req_signer: &PublicKey,
296 gov_version: u64,
297 sn: u64,
298 ) -> Result<(), ValidatorError> {
299 if request.verify().is_err() {
302 return Err(ValidatorError::InvalidSignature {
303 data: "event request",
304 });
305 }
306
307 Self::check_metadata(
308 &EventRequestType::from(request.content()),
309 metadata,
310 gov_version,
311 )?;
312
313 if !request.content().check_request_signature(
314 &request.signature().signer,
315 &metadata.owner,
316 &metadata.new_owner,
317 ) {
318 return Err(ValidatorError::InvalidSigner {
319 signer: request.signature().signer.to_string(),
320 });
321 }
322
323 if request.content().get_subject_id() != metadata.subject_id {
325 return Err(ValidatorError::InvalidData {
326 value: "Subject_id",
327 });
328 }
329
330 let signer = metadata
332 .new_owner
333 .clone()
334 .unwrap_or_else(|| metadata.owner.clone());
335
336 if &signer != vali_req_signer {
337 return Err(ValidatorError::InvalidSigner {
338 signer: vali_req_signer.to_string(),
339 });
340 }
341
342 if sn != metadata.sn + 1 {
344 return Err(ValidatorError::InvalidData { value: "sn" });
345 }
346 Ok(())
347 }
348
349 fn check_approval_signers(
350 agrees: &HashSet<PublicKey>,
351 disagrees: &HashSet<PublicKey>,
352 timeout: &HashSet<PublicKey>,
353 workers: &HashSet<PublicKey>,
354 ) -> bool {
355 agrees.is_subset(workers)
356 && disagrees.is_subset(workers)
357 && timeout.is_subset(workers)
358 }
359
360 fn check_approval_quorum(
361 agrees: u32,
362 timeout: u32,
363 quorum: &Quorum,
364 workers: &HashSet<PublicKey>,
365 approved: bool,
366 ) -> bool {
367 if approved {
368 quorum.check_quorum(workers.len() as u32, agrees + timeout)
369 } else {
370 !quorum.check_quorum(workers.len() as u32, agrees + timeout)
371 }
372 }
373
374 fn check_approval(
375 approval: ApprovalData,
376 appr_data: RoleDataRegister,
377 req_subject_data_hash: DigestIdentifier,
378 signer: PublicKey,
379 ) -> Result<(), ValidatorError> {
380 if signer != approval.approval_req_signature.signer {
381 return Err(ValidatorError::InvalidSigner {
382 signer: signer.to_string(),
383 });
384 }
385
386 let agrees = approval
387 .approvers_agrees_signatures
388 .iter()
389 .map(|x| x.signer.clone())
390 .collect::<HashSet<PublicKey>>();
391
392 let timeout = approval
393 .approvers_timeout
394 .iter()
395 .map(|x| x.who.clone())
396 .collect::<HashSet<PublicKey>>();
397
398 let disagrees = approval
399 .approvers_disagrees_signatures
400 .iter()
401 .map(|x| x.signer.clone())
402 .collect::<HashSet<PublicKey>>();
403
404 if !Self::check_approval_signers(
405 &agrees,
406 &disagrees,
407 &timeout,
408 &appr_data.workers,
409 ) {
410 return Err(ValidatorError::InvalidOperation {
411 action: "verify approval signers",
412 });
413 }
414
415 if !Self::check_approval_quorum(
416 agrees.len() as u32,
417 timeout.len() as u32,
418 &appr_data.quorum,
419 &appr_data.workers,
420 approval.approved,
421 ) {
422 return Err(ValidatorError::InvalidOperation {
423 action: "verify approval quorum",
424 });
425 }
426
427 let agrees_res = ApprovalRes::Response {
428 approval_req_hash: approval.approval_req_hash.clone(),
429 agrees: true,
430 req_subject_data_hash: req_subject_data_hash.clone(),
431 };
432 for signature in approval.approvers_agrees_signatures.iter() {
433 let signed_res =
434 Signed::from_parts(agrees_res.clone(), signature.clone());
435
436 if signed_res.verify().is_err() {
437 return Err(ValidatorError::InvalidSignature {
438 data: "approval agrees",
439 });
440 }
441 }
442
443 let disagrees_res = ApprovalRes::Response {
444 approval_req_hash: approval.approval_req_hash.clone(),
445 agrees: false,
446 req_subject_data_hash,
447 };
448 for signature in approval.approvers_disagrees_signatures.iter() {
449 let signed_res =
450 Signed::from_parts(disagrees_res.clone(), signature.clone());
451
452 if signed_res.verify().is_err() {
453 return Err(ValidatorError::InvalidSignature {
454 data: "approval disagrees",
455 });
456 }
457 }
458
459 Ok(())
460 }
461
462 fn check_evaluation(
463 &self,
464 evaluation: EvaluationData,
465 eval_data: RoleDataRegister,
466 mut properties: ValueWrapper,
467 req_subject_data_hash: DigestIdentifier,
468 signer: PublicKey,
469 ) -> Result<(bool, ValueWrapper), ValidatorError> {
470 if signer != evaluation.eval_req_signature.signer {
471 return Err(ValidatorError::InvalidSigner {
472 signer: signer.to_string(),
473 });
474 }
475
476 if !check_quorum_signers(
477 &evaluation
478 .evaluators_signatures
479 .iter()
480 .map(|x| x.signer.clone())
481 .collect::<HashSet<PublicKey>>(),
482 &eval_data.quorum,
483 &eval_data.workers,
484 ) {
485 return Err(ValidatorError::InvalidOperation {
486 action: "verify evaluation quorum",
487 });
488 }
489
490 let eval_res = match evaluation.response.clone() {
491 EvaluationResponse::Ok(evaluator_response) => {
492 EvaluationRes::Response {
493 response: evaluator_response,
494 eval_req_hash: evaluation.eval_req_hash.clone(),
495 req_subject_data_hash,
496 }
497 }
498 EvaluationResponse::Error(evaluator_error) => {
499 EvaluationRes::Error {
500 error: evaluator_error,
501 eval_req_hash: evaluation.eval_req_hash.clone(),
502 req_subject_data_hash,
503 }
504 }
505 };
506
507 for signature in evaluation.evaluators_signatures.iter() {
508 let signed_res =
509 Signed::from_parts(eval_res.clone(), signature.clone());
510
511 if signed_res.verify().is_err() {
512 return Err(ValidatorError::InvalidSignature {
513 data: "evaluation",
514 });
515 }
516 }
517
518 let appr_required = if let Some(evaluator_res) =
519 evaluation.evaluator_res()
520 {
521 let json_patch =
522 serde_json::from_value::<Patch>(evaluator_res.patch.0)
523 .map_err(|_| ValidatorError::InvalidData {
524 value: "evaluation patch",
525 })?;
526
527 patch(&mut properties.0, &json_patch).map_err(|_| {
528 ValidatorError::InvalidOperation {
529 action: "apply patch",
530 }
531 })?;
532
533 let properties_hash = hash_borsh(&*self.hash.hasher(), &properties)
534 .map_err(|e| ValidatorError::InternalError {
535 problem: e.to_string(),
536 })?;
537
538 if properties_hash != evaluator_res.properties_hash {
539 return Err(ValidatorError::InvalidData {
540 value: "properties_hash",
541 });
542 }
543
544 evaluator_res.appr_required
545 } else {
546 false
547 };
548
549 Ok((appr_required, properties))
550 }
551
552 async fn check_actual_protocols(
553 &self,
554 ctx: &mut ActorContext<Self>,
555 metadata: &Metadata,
556 actual_protocols: &ActualProtocols,
557 event_type: &EventRequestType,
558 gov_version: u64,
559 signer: PublicKey,
560 ) -> Result<Option<ValueWrapper>, ValidatorError> {
561 if !actual_protocols
562 .check_protocols(metadata.schema_id.is_gov(), event_type)
563 {
564 return Err(ValidatorError::InvalidData {
565 value: "actual protocols",
566 });
567 }
568
569 let (evaluation, approval) = match &actual_protocols {
570 ActualProtocols::None => (None, None),
571 ActualProtocols::Eval { eval_data } => {
572 (Some(eval_data.clone()), None)
573 }
574 ActualProtocols::EvalApprove {
575 eval_data,
576 approval_data,
577 } => (Some(eval_data.clone()), Some(approval_data.clone())),
578 };
579
580 let properties = if let Some(evaluation) = evaluation {
581 let req_subject_data_hash = hash_borsh(
582 &*self.hash.hasher(),
583 &RequestSubjectData {
584 subject_id: metadata.subject_id.clone(),
585 governance_id: metadata.governance_id.clone(),
586 sn: metadata.sn + 1,
587 namespace: metadata.namespace.clone(),
588 schema_id: metadata.schema_id.clone(),
589 gov_version,
590 signer: signer.clone(),
591 },
592 )
593 .map_err(|e| ValidatorError::InternalError {
594 problem: e.to_string(),
595 })?;
596
597 let (eval_data, appro_data) = if gov_version == self.gov_version {
598 (
599 self.current_evaluation_roles(),
600 approval.as_ref().map(|_| self.current_approval_roles()),
601 )
602 } else {
603 get_actual_roles_register(
604 ctx,
605 &metadata.governance_id,
606 SearchRole {
607 schema_id: metadata.schema_id.clone(),
608 namespace: metadata.namespace.clone(),
609 },
610 approval.is_some(),
611 gov_version,
612 )
613 .await
614 .map_err(|e| {
615 if let ActorError::UnexpectedResponse { .. } = e {
616 ValidatorError::OutOfVersion
617 } else {
618 ValidatorError::InternalError {
619 problem: e.to_string(),
620 }
621 }
622 })?
623 };
624
625 let (appr_required, properties) = self.check_evaluation(
626 evaluation,
627 eval_data,
628 metadata.properties.clone(),
629 req_subject_data_hash.clone(),
630 signer.clone(),
631 )?;
632
633 if let Some(approval) = approval
634 && let Some(appr_data) = appro_data
635 {
636 if !appr_required {
637 return Err(ValidatorError::InvalidData {
638 value: "evaluation appr_required",
639 });
640 }
641
642 Self::check_approval(
643 approval,
644 appr_data,
645 req_subject_data_hash,
646 signer,
647 )?;
648 } else if appr_required {
649 return Err(ValidatorError::InvalidData {
650 value: "evaluation appr_required",
651 });
652 }
653
654 Some(properties)
655 } else {
656 None
657 };
658
659 Ok(properties)
660 }
661
662 async fn check_last_vali_data(
663 &self,
664 ctx: &mut ActorContext<Self>,
665 metadata: &Metadata,
666 last_validation: &LastData,
667 ) -> Result<(), ValidatorError> {
668 let vali_data = get_validation_roles_register(
669 ctx,
670 &metadata.governance_id,
671 SearchRole {
672 schema_id: metadata.schema_id.clone(),
673 namespace: metadata.namespace.clone(),
674 },
675 last_validation.gov_version,
676 )
677 .await
678 .map_err(|e| {
679 if let ActorError::UnexpectedResponse { .. } = e {
680 ValidatorError::InvalidData {
681 value: "gov_version",
682 }
683 } else {
684 ValidatorError::InternalError {
685 problem: e.to_string(),
686 }
687 }
688 })?;
689
690 if !check_quorum_signers(
691 &last_validation
692 .vali_data
693 .validators_signatures
694 .iter()
695 .map(|x| x.signer.clone())
696 .collect::<HashSet<PublicKey>>(),
697 &vali_data.quorum,
698 &vali_data.workers,
699 ) {
700 return Err(ValidatorError::InvalidOperation {
701 action: "verify validation quorum",
702 });
703 }
704
705 let vali_req_hash =
706 last_validation.vali_data.validation_req_hash.clone();
707 let vali_res = if metadata.sn == 0 {
708 ValidationRes::Create {
709 vali_req_hash,
710 subject_metadata: Box::new(metadata.clone()),
711 }
712 } else {
713 let hash_metadata =
714 hash_borsh(&*self.hash.hasher(), &metadata.clone()).map_err(
715 |e| ValidatorError::InternalError {
716 problem: e.to_string(),
717 },
718 )?;
719
720 ValidationRes::Response {
721 vali_req_hash,
722 modified_metadata_hash: hash_metadata,
723 }
724 };
725
726 for signature in last_validation.vali_data.validators_signatures.iter()
727 {
728 let signed_res =
729 Signed::from_parts(vali_res.clone(), signature.clone());
730
731 if signed_res.verify().is_err() {
732 return Err(ValidatorError::InvalidSignature {
733 data: "last validation",
734 });
735 }
736 }
737
738 Ok(())
739 }
740
741 fn create_modified_metadata(
742 is_success: bool,
743 event_request: &EventRequest,
744 properties: Option<ValueWrapper>,
745 ledger_hash: DigestIdentifier,
746 mut metadata: Metadata,
747 ) -> Result<Metadata, ValidatorError> {
748 metadata.sn += 1;
749
750 metadata.prev_ledger_event_hash = ledger_hash;
751
752 if !is_success {
753 return Ok(metadata);
754 }
755
756 match event_request {
757 EventRequest::Create(..) => {
758 return Err(ValidatorError::InvalidData {
759 value: "Event request type",
760 });
761 }
762 EventRequest::Fact(..) => {
763 if let Some(properties) = properties {
764 metadata.properties = properties;
765 }
766 }
767 EventRequest::Transfer(transfer_request) => {
768 metadata.new_owner = Some(transfer_request.new_owner.clone());
769 }
770 EventRequest::Confirm(..) => {
771 if let Some(new_owner) = metadata.new_owner.take() {
772 metadata.owner = new_owner;
773 } else {
774 return Err(ValidatorError::InvalidData {
775 value: "new owner",
776 });
777 }
778
779 if let Some(properties) = properties {
780 metadata.properties = properties;
781 }
782 }
783 EventRequest::Reject(..) => metadata.new_owner = None,
784 EventRequest::EOL(..) => metadata.active = false,
785 }
786
787 if metadata.schema_id.is_gov() {
788 let mut gov_data =
789 serde_json::from_value::<GovernanceData>(metadata.properties.0)
790 .map_err(|_| ValidatorError::InvalidData {
791 value: "metadata properties",
792 })?;
793
794 gov_data.version += 1;
795 metadata.properties = gov_data.to_value_wrapper();
796 }
797
798 Ok(metadata)
799 }
800
801 async fn create_res(
802 &self,
803 ctx: &mut ActorContext<Self>,
804 reboot: bool,
805 validation_req: &Signed<ValidationReq>,
806 ) -> Result<ValidationRes, ValidatorError> {
807 if reboot {
808 Ok(ValidationRes::Reboot)
809 } else {
810 match validation_req.content() {
811 ValidationReq::Create {
812 event_request,
813 gov_version,
814 subject_id,
815 } => {
816 if let EventRequest::Create(create) =
817 event_request.content()
818 {
819 if let Some(name) = &create.name
820 && (name.is_empty() || name.len() > 100)
821 {
822 return Err(ValidatorError::InvalidData {
823 value: "create event name",
824 });
825 }
826
827 if let Some(description) = &create.description
828 && (description.is_empty()
829 || description.len() > 200)
830 {
831 return Err(ValidatorError::InvalidData {
832 value: "create event description",
833 });
834 }
835
836 if !create.schema_id.is_valid_in_request() {
837 return Err(ValidatorError::InvalidData {
838 value: "create event schema_id",
839 });
840 }
841
842 if create.schema_id.is_gov() {
843 if !create.governance_id.is_empty() {
844 return Err(ValidatorError::InvalidData {
845 value: "create event governance_id",
846 });
847 }
848
849 if !create.namespace.is_empty() {
850 return Err(ValidatorError::InvalidData {
851 value: "create event namespace",
852 });
853 }
854 } else if create.governance_id.is_empty() {
855 return Err(ValidatorError::InvalidData {
856 value: "create event governance_id",
857 });
858 }
859
860 let subject_id_worker =
861 hash_borsh(&*self.hash.hasher(), &event_request)
862 .map_err(|e| ValidatorError::InternalError {
863 problem: e.to_string(),
864 })?;
865
866 if subject_id != &subject_id_worker {
867 return Err(ValidatorError::InvalidData {
868 value: "subject_id",
869 });
870 }
871
872 let init_state = self.init_state.as_ref().map_or_else(
873 || {
874 let governance_data = GovernanceData::new(
875 validation_req.signature().signer.clone(),
876 );
877
878 governance_data.to_value_wrapper()
879 },
880 |init_state| init_state.clone(),
881 );
882
883 let governance_id = if create.schema_id.is_gov() {
884 subject_id.clone()
885 } else {
886 create.governance_id.clone()
887 };
888
889 let subject_metadata = Metadata {
890 name: create.name.clone(),
891 description: create.description.clone(),
892 subject_id: subject_id_worker,
893 governance_id,
894 genesis_gov_version: *gov_version,
895 prev_ledger_event_hash: DigestIdentifier::default(),
896 schema_id: create.schema_id.clone(),
897 namespace: create.namespace.clone(),
898 sn: 0,
899 creator: validation_req.signature().signer.clone(),
900 owner: validation_req.signature().signer.clone(),
901 new_owner: None,
902 active: true,
903 properties: init_state,
904 };
905
906 let vali_req_hash =
907 hash_borsh(&*self.hash.hasher(), &validation_req)
908 .map_err(|e| ValidatorError::InternalError {
909 problem: e.to_string(),
910 })?;
911
912 Ok(ValidationRes::Create {
913 vali_req_hash,
914 subject_metadata: Box::new(subject_metadata),
915 })
916 } else {
917 Err(ValidatorError::InvalidData {
918 value: "event type",
919 })
920 }
921 }
922 ValidationReq::Event {
923 actual_protocols,
924 event_request,
925 metadata,
926 last_data,
927 gov_version,
928 sn,
929 ledger_hash,
930 } => {
931 let signer = validation_req.signature().signer.clone();
932 Self::check_basic_data(
933 event_request,
934 metadata,
935 &signer,
936 *gov_version,
937 *sn,
938 )?;
939
940 let properties = self
941 .check_actual_protocols(
942 ctx,
943 metadata,
944 actual_protocols,
945 &EventRequestType::from(event_request.content()),
946 *gov_version,
947 signer,
948 )
949 .await?;
950
951 self.check_last_vali_data(ctx, metadata, last_data).await?;
952
953 let is_success = actual_protocols.is_success();
954
955 let modified_metadata = Self::create_modified_metadata(
956 is_success,
957 event_request.content(),
958 properties,
959 ledger_hash.clone(),
960 *metadata.clone(),
961 )?;
962
963 let vali_req_hash =
964 hash_borsh(&*self.hash.hasher(), &validation_req)
965 .map_err(|e| ValidatorError::InternalError {
966 problem: e.to_string(),
967 })?;
968
969 let modified_metadata_hash =
970 hash_borsh(&*self.hash.hasher(), &modified_metadata)
971 .map_err(|e| ValidatorError::InternalError {
972 problem: e.to_string(),
973 })?;
974
975 Ok(ValidationRes::Response {
976 vali_req_hash,
977 modified_metadata_hash,
978 })
979 }
980 }
981 }
982 }
983}
984
985#[derive(Debug, Clone)]
986pub enum ValiWorkerMessage {
987 UpdateCurrentRoles {
988 gov_version: u64,
989 current_roles: CurrentWorkerRoles,
990 },
991 LocalValidation {
992 validation_req: Box<Signed<ValidationReq>>,
993 },
994 NetworkRequest {
995 validation_req: Box<Signed<ValidationReq>>,
996 sender: PublicKey,
997 info: ComunicateInfo,
998 },
999}
1000
1001impl Message for ValiWorkerMessage {}
1002
1003#[async_trait]
1004impl Actor for ValiWorker {
1005 type Event = ();
1006 type Message = ValiWorkerMessage;
1007 type Response = ();
1008
1009 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
1010 parent_span.map_or_else(
1011 || info_span!("ValiWorker", id),
1012 |parent_span| info_span!(parent: parent_span, "ValiWorker", id),
1013 )
1014 }
1015}
1016
1017impl NotPersistentActor for ValiWorker {}
1018
1019#[async_trait]
1020impl Handler<Self> for ValiWorker {
1021 async fn handle_message(
1022 &mut self,
1023 _sender: ActorPath,
1024 msg: ValiWorkerMessage,
1025 ctx: &mut ActorContext<Self>,
1026 ) -> Result<(), ActorError> {
1027 match msg {
1028 ValiWorkerMessage::UpdateCurrentRoles {
1029 gov_version,
1030 current_roles,
1031 } => {
1032 self.gov_version = gov_version;
1033 self.current_roles = current_roles;
1034 }
1035 ValiWorkerMessage::LocalValidation { validation_req } => {
1036 let validation =
1037 match self.create_res(ctx, false, &validation_req).await {
1038 Ok(vali) => vali,
1039 Err(e) => {
1040 if matches!(e, ValidatorError::OutOfVersion) {
1041 ValidationRes::Reboot
1042 } else {
1043 return Err(emit_fail(
1044 ctx,
1045 ActorError::FunctionalCritical {
1046 description: e.to_string(),
1047 },
1048 )
1049 .await);
1050 }
1051 }
1052 };
1053
1054 let signature = match get_sign(
1055 ctx,
1056 SignTypesNode::ValidationRes(validation.clone()),
1057 )
1058 .await
1059 {
1060 Ok(signature) => signature,
1061 Err(e) => {
1062 error!(
1063 msg_type = "LocalValidation",
1064 error = %e,
1065 "Failed to sign validator response"
1066 );
1067 return Err(emit_fail(ctx, e).await);
1068 }
1069 };
1070
1071 match ctx.get_parent::<Validation>().await {
1072 Ok(validation_actor) => {
1073 validation_actor
1074 .tell(ValidationMessage::Response {
1075 validation_res: Box::new(validation),
1076 sender: (*self.our_key).clone(),
1077 signature: Some(signature),
1078 })
1079 .await?;
1080
1081 debug!(
1082 msg_type = "LocalValidation",
1083 "Validation completed and sent to parent"
1084 );
1085 }
1086 Err(e) => {
1087 error!(
1088 msg_type = "LocalValidation",
1089 "Failed to obtain Validation actor"
1090 );
1091 return Err(e);
1092 }
1093 };
1094
1095 ctx.stop(None).await;
1096 }
1097 ValiWorkerMessage::NetworkRequest {
1098 validation_req,
1099 info,
1100 sender,
1101 } => {
1102 if sender != validation_req.signature().signer
1103 || sender != self.node_key
1104 {
1105 warn!(
1106 msg_type = "NetworkRequest",
1107 expected_sender = %self.node_key,
1108 received_sender = %sender,
1109 signer = %validation_req.signature().signer,
1110 "Unexpected sender"
1111 );
1112 if self.stop {
1113 ctx.stop(None).await;
1114 }
1115
1116 return Ok(());
1117 }
1118
1119 let reboot = match self
1121 .check_governance(
1122 validation_req.content().get_gov_version(),
1123 )
1124 .await
1125 {
1126 Ok(reboot) => reboot,
1127 Err(e) => {
1128 warn!(
1129 msg_type = "NetworkRequest",
1130 error = %e,
1131 "Failed to check governance"
1132 );
1133 if let ActorError::Functional { .. } = e {
1134 if self.stop {
1135 ctx.stop(None).await;
1136 }
1137
1138 return Err(e);
1139 } else {
1140 return Err(emit_fail(ctx, e).await);
1141 }
1142 }
1143 };
1144
1145 let validation = if let Err(error) =
1146 self.check_data(&validation_req)
1147 {
1148 ValidationRes::Abort(error.to_string())
1149 } else {
1150 match self.create_res(ctx, reboot, &validation_req).await {
1151 Ok(vali) => vali,
1152 Err(e) => {
1153 if let ValidatorError::InternalError { .. } = e {
1154 error!(
1155 msg_type = "NetworkRequest",
1156 error = %e,
1157 "Internal error during validation"
1158 );
1159
1160 return Err(emit_fail(
1161 ctx,
1162 ActorError::FunctionalCritical {
1163 description: e.to_string(),
1164 },
1165 )
1166 .await);
1167 } else if matches!(e, ValidatorError::OutOfVersion)
1168 {
1169 ValidationRes::Reboot
1170 } else {
1171 ValidationRes::Abort(e.to_string())
1172 }
1173 }
1174 }
1175 };
1176
1177 let signature = match get_sign(
1178 ctx,
1179 SignTypesNode::ValidationRes(validation.clone()),
1180 )
1181 .await
1182 {
1183 Ok(signature) => signature,
1184 Err(e) => {
1185 error!(
1186 msg_type = "NetworkRequest",
1187 error = %e,
1188 "Failed to sign validation response"
1189 );
1190 return Err(emit_fail(ctx, e).await);
1191 }
1192 };
1193
1194 let new_info = ComunicateInfo {
1195 receiver: sender,
1196 request_id: info.request_id,
1197 version: info.version,
1198 receiver_actor: format!(
1199 "/user/request/{}/validation/{}",
1200 validation_req.content().get_subject_id(),
1201 self.our_key.clone()
1202 ),
1203 };
1204
1205 let signed_response: Signed<ValidationRes> =
1206 Signed::from_parts(validation, signature);
1207 if let Err(e) = self
1208 .network
1209 .send_command(network::CommandHelper::SendMessage {
1210 message: NetworkMessage {
1211 info: new_info.clone(),
1212 message: ActorMessage::ValidationRes {
1213 res: signed_response,
1214 },
1215 },
1216 })
1217 .await
1218 {
1219 error!(
1220 msg_type = "NetworkRequest",
1221 error = %e,
1222 "Failed to send response to network"
1223 );
1224 return Err(emit_fail(ctx, e).await);
1225 } else {
1226 debug!(
1227 msg_type = "NetworkRequest",
1228 receiver = %new_info.receiver,
1229 request_id = %new_info.request_id,
1230 "Validation response sent to network"
1231 );
1232 }
1233
1234 if self.stop {
1235 ctx.stop(None).await;
1236 }
1237 }
1238 }
1239
1240 Ok(())
1241 }
1242
1243 async fn on_child_fault(
1244 &mut self,
1245 error: ActorError,
1246 ctx: &mut ActorContext<Self>,
1247 ) -> ChildAction {
1248 error!(
1249 node_key = %self.node_key,
1250 governance_id = %self.governance_id,
1251 gov_version = self.gov_version,
1252 sn = self.sn,
1253 error = %error,
1254 "Child fault in validation worker"
1255 );
1256 emit_fail(ctx, error).await;
1257 ChildAction::Stop
1258 }
1259}