1use std::{collections::HashSet, sync::Arc};
2
3use crate::{
4 approval::response::ApprovalRes,
5 evaluation::response::EvaluationRes,
6 governance::{
7 data::GovernanceData,
8 model::Quorum,
9 role_register::{RoleDataRegister, SearchRole},
10 },
11 helpers::network::{NetworkMessage, service::NetworkSender},
12 model::{
13 common::{
14 check_quorum_signers, emit_fail, get_actual_roles_register,
15 get_validation_roles_register,
16 node::{
17 SignTypesNode, UpdateData, get_sign, update_ledger_network,
18 },
19 },
20 event::{ApprovalData, EvaluationData, EvaluationResponse},
21 },
22 subject::{Metadata, RequestSubjectData},
23 validation::{
24 request::{ActualProtocols, LastData},
25 response::ValidatorError,
26 },
27};
28
29use crate::helpers::network::ActorMessage;
30
31use async_trait::async_trait;
32use ave_common::{
33 ValueWrapper,
34 bridge::request::EventRequestType,
35 identity::{
36 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
37 },
38 request::EventRequest,
39};
40
41use json_patch::{Patch, patch};
42use network::ComunicateInfo;
43
44use ave_actors::{
45 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
46 NotPersistentActor,
47};
48
49use tracing::{Span, debug, error, info_span, warn};
50
51use super::{
52 Validation, ValidationMessage, request::ValidationReq,
53 response::ValidationRes,
54};
55
56#[derive(Clone, Debug)]
58pub struct ValiWorker {
59 pub node_key: PublicKey,
60 pub our_key: Arc<PublicKey>,
61 pub init_state: Option<ValueWrapper>,
62 pub governance_id: DigestIdentifier,
63 pub gov_version: u64,
64 pub sn: u64,
65 pub hash: HashAlgorithm,
66 pub network: Arc<NetworkSender>,
67 pub stop: bool,
68}
69
70impl ValiWorker {
71 async fn check_governance(
72 &self,
73 gov_version: u64,
74 ) -> Result<bool, ActorError> {
75 match gov_version.cmp(&self.gov_version) {
76 std::cmp::Ordering::Equal => {
77 }
79 std::cmp::Ordering::Greater => {
80 let data = UpdateData {
82 sn: self.sn,
83 gov_version: self.gov_version,
84 subject_id: self.governance_id.clone(),
85 other_node: self.node_key.clone(),
86 };
87 update_ledger_network(data, self.network.clone()).await?;
88 let e = ActorError::Functional {
89 description: "Abort Validation, update is required"
90 .to_owned(),
91 };
92 return Err(e);
93 }
94 std::cmp::Ordering::Less => {
95 return Ok(true);
96 }
97 }
98
99 Ok(false)
100 }
101
102 fn check_data(
103 &self,
104 validation_req: &Signed<ValidationReq>,
105 ) -> Result<(), ValidatorError> {
106 if !validation_req.content().is_valid() {
107 return Err(ValidatorError::InvalidData {
108 value: "validation request",
109 });
110 }
111
112 let governance_id = validation_req
113 .content()
114 .get_governance_id()
115 .map_err(|_| ValidatorError::InvalidData {
116 value: "governance_id",
117 })?;
118
119 if governance_id != self.governance_id {
120 return Err(ValidatorError::InvalidData {
121 value: "governance_id",
122 });
123 }
124
125 if validation_req.verify().is_err() {
126 return Err(ValidatorError::InvalidSignature {
127 data: "validation request",
128 });
129 }
130
131 if validation_req
132 .content()
133 .get_signed_event_request()
134 .verify()
135 .is_err()
136 {
137 return Err(ValidatorError::InvalidSignature {
138 data: "event request",
139 });
140 }
141
142 Ok(())
143 }
144
145 fn check_metadata(
146 event_type: &EventRequestType,
147 metadata: &Metadata,
148 gov_version: u64,
149 ) -> Result<(), ValidatorError> {
150 let is_gov = metadata.schema_id.is_gov();
151
152 if let Some(name) = &metadata.name
153 && (name.is_empty() || name.len() > 100)
154 {
155 return Err(ValidatorError::InvalidData {
156 value: "metadata name",
157 });
158 }
159
160 if let Some(description) = &metadata.description
161 && (description.is_empty() || description.len() > 200)
162 {
163 return Err(ValidatorError::InvalidData {
164 value: "metadata description",
165 });
166 }
167
168 if metadata.subject_id.is_empty() {
169 return Err(ValidatorError::InvalidData {
170 value: "metadata subject_id",
171 });
172 }
173
174 if is_gov && metadata.governance_id != metadata.subject_id
175 || !is_gov && metadata.governance_id == metadata.subject_id
176 {
177 return Err(ValidatorError::InvalidData {
178 value: "metadata governance_id",
179 });
180 }
181
182 if is_gov && metadata.genesis_gov_version != 0
183 || !is_gov && metadata.genesis_gov_version == 0
184 {
185 return Err(ValidatorError::InvalidData {
186 value: "metadata genesis_gov_version",
187 });
188 }
189
190 if metadata.genesis_gov_version > gov_version {
191 return Err(ValidatorError::InvalidData {
192 value: "metadata genesis_gov_version",
193 });
194 }
195
196 if metadata.sn == 0 && !metadata.prev_ledger_event_hash.is_empty()
197 || metadata.sn != 0 && metadata.prev_ledger_event_hash.is_empty()
198 {
199 return Err(ValidatorError::InvalidData {
200 value: "metadata prev_ledger_event_hash",
201 });
202 };
203
204 if !metadata.schema_id.is_valid_in_request() {
205 return Err(ValidatorError::InvalidData {
206 value: "metadata schema_id",
207 });
208 };
209
210 if is_gov && !metadata.namespace.is_empty() {
211 return Err(ValidatorError::InvalidData {
212 value: "metadata namespace",
213 });
214 }
215
216 if metadata.creator.is_empty() {
217 return Err(ValidatorError::InvalidData {
218 value: "metadata creator",
219 });
220 }
221
222 if metadata.owner.is_empty() {
223 return Err(ValidatorError::InvalidData {
224 value: "metadata owner",
225 });
226 }
227
228 if let Some(new_owner) = &metadata.new_owner
229 && (new_owner.is_empty() || new_owner == &metadata.owner)
230 {
231 return Err(ValidatorError::InvalidData {
232 value: "metadata new owner",
233 });
234 };
235
236 if !metadata.active {
237 return Err(ValidatorError::InvalidData {
238 value: "metadata active",
239 });
240 }
241
242 match event_type {
243 EventRequestType::Create => {
244 return Err(ValidatorError::InvalidData {
245 value: "Event request type",
246 });
247 }
248 EventRequestType::Confirm | EventRequestType::Reject => {
249 if metadata.new_owner.is_none() {
250 return Err(ValidatorError::InvalidData {
251 value: "Event request type",
252 });
253 }
254 }
255 EventRequestType::Fact
256 | EventRequestType::Transfer
257 | EventRequestType::Eol => {
258 if metadata.new_owner.is_some() {
259 return Err(ValidatorError::InvalidData {
260 value: "Event request type",
261 });
262 }
263 }
264 };
265
266 Ok(())
267 }
268
269 fn check_basic_data(
270 request: &Signed<EventRequest>,
271 metadata: &Metadata,
272 vali_req_signer: &PublicKey,
273 gov_version: u64,
274 sn: u64,
275 ) -> Result<(), ValidatorError> {
276 if request.verify().is_err() {
280 return Err(ValidatorError::InvalidSignature {
281 data: "event request",
282 });
283 }
284
285 Self::check_metadata(
286 &EventRequestType::from(request.content()),
287 metadata,
288 gov_version,
289 )?;
290
291 if !request.content().check_request_signature(
292 &request.signature().signer,
293 &metadata.owner,
294 &metadata.new_owner,
295 ) {
296 return Err(ValidatorError::InvalidSigner {
297 signer: request.signature().signer.to_string(),
298 });
299 }
300
301 if request.content().get_subject_id() != metadata.subject_id {
303 return Err(ValidatorError::InvalidData {
304 value: "Subject_id",
305 });
306 }
307
308 let signer = metadata
310 .new_owner
311 .clone()
312 .unwrap_or_else(|| metadata.owner.clone());
313
314 if &signer != vali_req_signer {
315 return Err(ValidatorError::InvalidSigner {
316 signer: vali_req_signer.to_string(),
317 });
318 }
319
320 if sn != metadata.sn + 1 {
322 return Err(ValidatorError::InvalidData { value: "sn" });
323 }
324 Ok(())
325 }
326
327 fn check_approval_signers(
328 agrees: &HashSet<PublicKey>,
329 disagrees: &HashSet<PublicKey>,
330 timeout: &HashSet<PublicKey>,
331 workers: &HashSet<PublicKey>,
332 ) -> bool {
333 agrees.is_subset(workers)
334 && disagrees.is_subset(workers)
335 && timeout.is_subset(workers)
336 }
337
338 fn check_approval_quorum(
339 agrees: u32,
340 timeout: u32,
341 quorum: &Quorum,
342 workers: &HashSet<PublicKey>,
343 approved: bool,
344 ) -> bool {
345 if approved {
346 quorum.check_quorum(workers.len() as u32, agrees + timeout)
347 } else {
348 !quorum.check_quorum(workers.len() as u32, agrees + timeout)
349 }
350 }
351
352 fn check_approval(
353 approval: ApprovalData,
354 appr_data: RoleDataRegister,
355 req_subject_data_hash: DigestIdentifier,
356 signer: PublicKey,
357 ) -> Result<(), ValidatorError> {
358 if signer != approval.approval_req_signature.signer {
359 return Err(ValidatorError::InvalidSigner {
360 signer: signer.to_string(),
361 });
362 }
363
364 let agrees = approval
365 .approvers_agrees_signatures
366 .iter()
367 .map(|x| x.signer.clone())
368 .collect::<HashSet<PublicKey>>();
369
370 let timeout = approval
371 .approvers_timeout
372 .iter()
373 .map(|x| x.who.clone())
374 .collect::<HashSet<PublicKey>>();
375
376 let disagrees = approval
377 .approvers_disagrees_signatures
378 .iter()
379 .map(|x| x.signer.clone())
380 .collect::<HashSet<PublicKey>>();
381
382 if !Self::check_approval_signers(
383 &agrees,
384 &disagrees,
385 &timeout,
386 &appr_data.workers,
387 ) {
388 return Err(ValidatorError::InvalidOperation {
389 action: "verify approval signers",
390 });
391 }
392
393 if !Self::check_approval_quorum(
394 agrees.len() as u32,
395 timeout.len() as u32,
396 &appr_data.quorum,
397 &appr_data.workers,
398 approval.approved,
399 ) {
400 return Err(ValidatorError::InvalidOperation {
401 action: "verify approval quorum",
402 });
403 }
404
405 let agrees_res = ApprovalRes::Response {
406 approval_req_hash: approval.approval_req_hash.clone(),
407 agrees: true,
408 req_subject_data_hash: req_subject_data_hash.clone(),
409 };
410 for signature in approval.approvers_agrees_signatures.iter() {
411 let signed_res =
412 Signed::from_parts(agrees_res.clone(), signature.clone());
413
414 if signed_res.verify().is_err() {
415 return Err(ValidatorError::InvalidSignature {
416 data: "approval agrees",
417 });
418 }
419 }
420
421 let disagrees_res = ApprovalRes::Response {
422 approval_req_hash: approval.approval_req_hash.clone(),
423 agrees: false,
424 req_subject_data_hash,
425 };
426 for signature in approval.approvers_disagrees_signatures.iter() {
427 let signed_res =
428 Signed::from_parts(disagrees_res.clone(), signature.clone());
429
430 if signed_res.verify().is_err() {
431 return Err(ValidatorError::InvalidSignature {
432 data: "approval disagrees",
433 });
434 }
435 }
436
437 Ok(())
438 }
439
440 fn check_evaluation(
441 &self,
442 evaluation: EvaluationData,
443 eval_data: RoleDataRegister,
444 mut properties: ValueWrapper,
445 req_subject_data_hash: DigestIdentifier,
446 signer: PublicKey,
447 ) -> Result<(bool, ValueWrapper), ValidatorError> {
448 if signer != evaluation.eval_req_signature.signer {
449 return Err(ValidatorError::InvalidSigner {
450 signer: signer.to_string(),
451 });
452 }
453
454 if !check_quorum_signers(
455 &evaluation
456 .evaluators_signatures
457 .iter()
458 .map(|x| x.signer.clone())
459 .collect::<HashSet<PublicKey>>(),
460 &eval_data.quorum,
461 &eval_data.workers,
462 ) {
463 return Err(ValidatorError::InvalidOperation {
464 action: "verify evaluation quorum",
465 });
466 }
467
468 let eval_res = match evaluation.response.clone() {
469 EvaluationResponse::Ok(evaluator_response) => {
470 EvaluationRes::Response {
471 response: evaluator_response,
472 eval_req_hash: evaluation.eval_req_hash.clone(),
473 req_subject_data_hash,
474 }
475 }
476 EvaluationResponse::Error(evaluator_error) => {
477 EvaluationRes::Error {
478 error: evaluator_error,
479 eval_req_hash: evaluation.eval_req_hash.clone(),
480 req_subject_data_hash,
481 }
482 }
483 };
484
485 for signature in evaluation.evaluators_signatures.iter() {
486 let signed_res =
487 Signed::from_parts(eval_res.clone(), signature.clone());
488
489 if signed_res.verify().is_err() {
490 return Err(ValidatorError::InvalidSignature {
491 data: "evaluation",
492 });
493 }
494 }
495
496 let appr_required = if let Some(evaluator_res) =
497 evaluation.evaluator_res()
498 {
499 let json_patch =
500 serde_json::from_value::<Patch>(evaluator_res.patch.0)
501 .map_err(|_| ValidatorError::InvalidData {
502 value: "evaluation patch",
503 })?;
504
505 patch(&mut properties.0, &json_patch).map_err(|_| {
506 ValidatorError::InvalidOperation {
507 action: "apply patch",
508 }
509 })?;
510
511 let properties_hash = hash_borsh(&*self.hash.hasher(), &properties)
512 .map_err(|e| ValidatorError::InternalError {
513 problem: e.to_string(),
514 })?;
515
516 if properties_hash != evaluator_res.properties_hash {
517 return Err(ValidatorError::InvalidData {
518 value: "properties_hash",
519 });
520 }
521
522 evaluator_res.appr_required
523 } else {
524 false
525 };
526
527 Ok((appr_required, properties))
528 }
529
530 async fn check_actual_protocols(
531 &self,
532 ctx: &mut ActorContext<Self>,
533 metadata: &Metadata,
534 actual_protocols: &ActualProtocols,
535 event_type: &EventRequestType,
536 gov_version: u64,
537 signer: PublicKey,
538 ) -> Result<Option<ValueWrapper>, ValidatorError> {
539 if !actual_protocols
540 .check_protocols(metadata.schema_id.is_gov(), event_type)
541 {
542 return Err(ValidatorError::InvalidData {
543 value: "actual protocols",
544 });
545 }
546
547 let (evaluation, approval) = match &actual_protocols {
548 ActualProtocols::None => (None, None),
549 ActualProtocols::Eval { eval_data } => {
550 (Some(eval_data.clone()), None)
551 }
552 ActualProtocols::EvalApprove {
553 eval_data,
554 approval_data,
555 } => (Some(eval_data.clone()), Some(approval_data.clone())),
556 };
557
558 let properties = if let Some(evaluation) = evaluation {
559 let req_subject_data_hash = hash_borsh(
560 &*self.hash.hasher(),
561 &RequestSubjectData {
562 subject_id: metadata.subject_id.clone(),
563 governance_id: metadata.governance_id.clone(),
564 sn: metadata.sn + 1,
565 namespace: metadata.namespace.clone(),
566 schema_id: metadata.schema_id.clone(),
567 gov_version,
568 signer: signer.clone(),
569 },
570 )
571 .map_err(|e| ValidatorError::InternalError {
572 problem: e.to_string(),
573 })?;
574
575 let (eval_data, appro_data) = get_actual_roles_register(
576 ctx,
577 &metadata.governance_id,
578 SearchRole {
579 schema_id: metadata.schema_id.clone(),
580 namespace: metadata.namespace.clone(),
581 },
582 approval.is_some(),
583 gov_version,
584 )
585 .await
586 .map_err(|e| {
587 if let ActorError::UnexpectedResponse { .. } = e {
588 ValidatorError::OutOfVersion
589 } else {
590 ValidatorError::InternalError {
591 problem: e.to_string(),
592 }
593 }
594 })?;
595
596 let (appr_required, properties) = self.check_evaluation(
597 evaluation,
598 eval_data,
599 metadata.properties.clone(),
600 req_subject_data_hash.clone(),
601 signer.clone(),
602 )?;
603
604 if let Some(approval) = approval
605 && let Some(appr_data) = appro_data
606 {
607 if !appr_required {
608 return Err(ValidatorError::InvalidData {
609 value: "evaluation appr_required",
610 });
611 }
612
613 Self::check_approval(
614 approval,
615 appr_data,
616 req_subject_data_hash,
617 signer,
618 )?;
619 } else if appr_required {
620 return Err(ValidatorError::InvalidData {
621 value: "evaluation appr_required",
622 });
623 }
624
625 Some(properties)
626 } else {
627 None
628 };
629
630 Ok(properties)
631 }
632
633 async fn check_last_vali_data(
634 &self,
635 ctx: &mut ActorContext<Self>,
636 metadata: &Metadata,
637 last_validation: &LastData,
638 ) -> Result<(), ValidatorError> {
639 let vali_data = get_validation_roles_register(
640 ctx,
641 &metadata.governance_id,
642 SearchRole {
643 schema_id: metadata.schema_id.clone(),
644 namespace: metadata.namespace.clone(),
645 },
646 last_validation.gov_version,
647 )
648 .await
649 .map_err(|e| {
650 if let ActorError::UnexpectedResponse { .. } = e {
651 ValidatorError::InvalidData {
652 value: "gov_version",
653 }
654 } else {
655 ValidatorError::InternalError {
656 problem: e.to_string(),
657 }
658 }
659 })?;
660
661 if !check_quorum_signers(
662 &last_validation
663 .vali_data
664 .validators_signatures
665 .iter()
666 .map(|x| x.signer.clone())
667 .collect::<HashSet<PublicKey>>(),
668 &vali_data.quorum,
669 &vali_data.workers,
670 ) {
671 return Err(ValidatorError::InvalidOperation {
672 action: "verify validation quorum",
673 });
674 }
675
676 let vali_req_hash =
677 last_validation.vali_data.validation_req_hash.clone();
678 let vali_res = if metadata.sn == 0 {
679 ValidationRes::Create {
680 vali_req_hash,
681 subject_metadata: Box::new(metadata.clone()),
682 }
683 } else {
684 let hash_metadata =
685 hash_borsh(&*self.hash.hasher(), &metadata.clone()).map_err(
686 |e| ValidatorError::InternalError {
687 problem: e.to_string(),
688 },
689 )?;
690
691 ValidationRes::Response {
692 vali_req_hash,
693 modified_metadata_hash: hash_metadata,
694 }
695 };
696
697 for signature in last_validation.vali_data.validators_signatures.iter()
698 {
699 let signed_res =
700 Signed::from_parts(vali_res.clone(), signature.clone());
701
702 if signed_res.verify().is_err() {
703 return Err(ValidatorError::InvalidSignature {
704 data: "last validation",
705 });
706 }
707 }
708
709 Ok(())
710 }
711
712 fn create_modified_metadata(
713 is_success: bool,
714 event_request: &EventRequest,
715 properties: Option<ValueWrapper>,
716 ledger_hash: DigestIdentifier,
717 mut metadata: Metadata,
718 ) -> Result<Metadata, ValidatorError> {
719 metadata.sn += 1;
720
721 metadata.prev_ledger_event_hash = ledger_hash;
722
723 if !is_success {
724 return Ok(metadata);
725 }
726
727 match event_request {
728 EventRequest::Create(..) => {
729 return Err(ValidatorError::InvalidData {
730 value: "Event request type",
731 });
732 }
733 EventRequest::Fact(..) => {
734 if let Some(properties) = properties {
735 metadata.properties = properties;
736 }
737 }
738 EventRequest::Transfer(transfer_request) => {
739 metadata.new_owner = Some(transfer_request.new_owner.clone());
740 }
741 EventRequest::Confirm(..) => {
742 if let Some(new_owner) = metadata.new_owner.take() {
743 metadata.owner = new_owner;
744 } else {
745 return Err(ValidatorError::InvalidData {
746 value: "new owner",
747 });
748 }
749
750 if let Some(properties) = properties {
751 metadata.properties = properties;
752 }
753 }
754 EventRequest::Reject(..) => metadata.new_owner = None,
755 EventRequest::EOL(..) => metadata.active = false,
756 }
757
758 if metadata.schema_id.is_gov() {
759 let mut gov_data =
760 serde_json::from_value::<GovernanceData>(metadata.properties.0)
761 .map_err(|_| ValidatorError::InvalidData {
762 value: "metadata properties",
763 })?;
764
765 gov_data.version += 1;
766 metadata.properties = gov_data.to_value_wrapper();
767 }
768
769 Ok(metadata)
770 }
771
772 async fn create_res(
773 &self,
774 ctx: &mut ActorContext<Self>,
775 reboot: bool,
776 validation_req: &Signed<ValidationReq>,
777 ) -> Result<ValidationRes, ValidatorError> {
778 if reboot {
779 Ok(ValidationRes::Reboot)
780 } else {
781 match validation_req.content() {
782 ValidationReq::Create {
783 event_request,
784 gov_version,
785 subject_id,
786 } => {
787 if let EventRequest::Create(create) =
789 event_request.content()
790 {
791 if let Some(name) = &create.name
792 && (name.is_empty() || name.len() > 100)
793 {
794 return Err(ValidatorError::InvalidData {
795 value: "create event name",
796 });
797 }
798
799 if let Some(description) = &create.description
800 && (description.is_empty()
801 || description.len() > 200)
802 {
803 return Err(ValidatorError::InvalidData {
804 value: "create event description",
805 });
806 }
807
808 if !create.schema_id.is_valid_in_request() {
809 return Err(ValidatorError::InvalidData {
810 value: "create event schema_id",
811 });
812 }
813
814 if create.schema_id.is_gov() {
815 if !create.governance_id.is_empty() {
816 return Err(ValidatorError::InvalidData {
817 value: "create event governance_id",
818 });
819 }
820
821 if !create.namespace.is_empty() {
822 return Err(ValidatorError::InvalidData {
823 value: "create event namespace",
824 });
825 }
826 } else if create.governance_id.is_empty() {
827 return Err(ValidatorError::InvalidData {
828 value: "create event governance_id",
829 });
830 }
831
832 let subject_id_worker =
833 hash_borsh(&*self.hash.hasher(), &event_request)
834 .map_err(|e| ValidatorError::InternalError {
835 problem: e.to_string(),
836 })?;
837
838 if subject_id != &subject_id_worker {
839 return Err(ValidatorError::InvalidData {
840 value: "subject_id",
841 });
842 }
843
844 let init_state = self.init_state.as_ref().map_or_else(
845 || {
846 let governance_data = GovernanceData::new(
847 validation_req.signature().signer.clone(),
848 );
849
850 governance_data.to_value_wrapper()
851 },
852 |init_state| init_state.clone(),
853 );
854
855 let governance_id = if create.schema_id.is_gov() {
856 subject_id.clone()
857 } else {
858 create.governance_id.clone()
859 };
860
861 let subject_metadata = Metadata {
862 name: create.name.clone(),
863 description: create.description.clone(),
864 subject_id: subject_id_worker,
865 governance_id,
866 genesis_gov_version: *gov_version,
867 prev_ledger_event_hash: DigestIdentifier::default(),
868 schema_id: create.schema_id.clone(),
869 namespace: create.namespace.clone(),
870 sn: 0,
871 creator: validation_req.signature().signer.clone(),
872 owner: validation_req.signature().signer.clone(),
873 new_owner: None,
874 active: true,
875 properties: init_state,
876 };
877
878 let vali_req_hash =
879 hash_borsh(&*self.hash.hasher(), &validation_req)
880 .map_err(|e| ValidatorError::InternalError {
881 problem: e.to_string(),
882 })?;
883
884 Ok(ValidationRes::Create {
885 vali_req_hash,
886 subject_metadata: Box::new(subject_metadata),
887 })
888 } else {
889 Err(ValidatorError::InvalidData {
890 value: "event type",
891 })
892 }
893 }
894 ValidationReq::Event {
895 actual_protocols,
896 event_request,
897 metadata,
898 last_data,
899 gov_version,
900 sn,
901 ledger_hash,
902 } => {
903 let signer = validation_req.signature().signer.clone();
904 Self::check_basic_data(
905 event_request,
906 metadata,
907 &signer,
908 *gov_version,
909 *sn,
910 )?;
911
912 let properties = self
913 .check_actual_protocols(
914 ctx,
915 metadata,
916 actual_protocols,
917 &EventRequestType::from(event_request.content()),
918 *gov_version,
919 signer,
920 )
921 .await?;
922
923 self.check_last_vali_data(ctx, metadata, last_data).await?;
924
925 let is_success = actual_protocols.is_success();
926
927 let modified_metadata = Self::create_modified_metadata(
928 is_success,
929 event_request.content(),
930 properties,
931 ledger_hash.clone(),
932 *metadata.clone(),
933 )?;
934
935 let vali_req_hash =
936 hash_borsh(&*self.hash.hasher(), &validation_req)
937 .map_err(|e| ValidatorError::InternalError {
938 problem: e.to_string(),
939 })?;
940
941 let modified_metadata_hash =
942 hash_borsh(&*self.hash.hasher(), &modified_metadata)
943 .map_err(|e| ValidatorError::InternalError {
944 problem: e.to_string(),
945 })?;
946
947 Ok(ValidationRes::Response {
948 vali_req_hash,
949 modified_metadata_hash,
950 })
951 }
952 }
953 }
954 }
955}
956
957#[derive(Debug, Clone)]
958pub enum ValiWorkerMessage {
959 UpdateGovVersion {
960 gov_version: u64,
961 },
962 LocalValidation {
963 validation_req: Box<Signed<ValidationReq>>,
964 },
965 NetworkRequest {
966 validation_req: Box<Signed<ValidationReq>>,
967 sender: PublicKey,
968 info: ComunicateInfo,
969 },
970}
971
972impl Message for ValiWorkerMessage {}
973
974#[async_trait]
975impl Actor for ValiWorker {
976 type Event = ();
977 type Message = ValiWorkerMessage;
978 type Response = ();
979
980 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
981 parent_span.map_or_else(
982 || info_span!("ValiWorker", id),
983 |parent_span| info_span!(parent: parent_span, "ValiWorker", id),
984 )
985 }
986}
987
988impl NotPersistentActor for ValiWorker {}
989
990#[async_trait]
991impl Handler<Self> for ValiWorker {
992 async fn handle_message(
993 &mut self,
994 _sender: ActorPath,
995 msg: ValiWorkerMessage,
996 ctx: &mut ActorContext<Self>,
997 ) -> Result<(), ActorError> {
998 match msg {
999 ValiWorkerMessage::UpdateGovVersion { gov_version } => {
1000 self.gov_version = gov_version;
1001 }
1002 ValiWorkerMessage::LocalValidation { validation_req } => {
1003 let validation =
1004 match self.create_res(ctx, false, &validation_req).await {
1005 Ok(vali) => vali,
1006 Err(e) => {
1007 if matches!(e, ValidatorError::OutOfVersion) {
1008 ValidationRes::Reboot
1009 } else {
1010 return Err(emit_fail(
1011 ctx,
1012 ActorError::FunctionalCritical {
1013 description: e.to_string(),
1014 },
1015 )
1016 .await);
1017 }
1018 }
1019 };
1020
1021 let signature = match get_sign(
1022 ctx,
1023 SignTypesNode::ValidationRes(validation.clone()),
1024 )
1025 .await
1026 {
1027 Ok(signature) => signature,
1028 Err(e) => {
1029 error!(
1030 msg_type = "LocalValidation",
1031 error = %e,
1032 "Failed to sign validator response"
1033 );
1034 return Err(emit_fail(ctx, e).await);
1035 }
1036 };
1037
1038 match ctx.get_parent::<Validation>().await {
1039 Ok(validation_actor) => {
1040 validation_actor
1041 .tell(ValidationMessage::Response {
1042 validation_res: Box::new(validation),
1043 sender: (*self.our_key).clone(),
1044 signature: Some(signature),
1045 })
1046 .await?;
1047
1048 debug!(
1049 msg_type = "LocalValidation",
1050 "Validation completed and sent to parent"
1051 );
1052 }
1053 Err(e) => {
1054 error!(
1055 msg_type = "LocalValidation",
1056 "Failed to obtain Validation actor"
1057 );
1058 return Err(e);
1059 }
1060 };
1061
1062 ctx.stop(None).await;
1063 }
1064 ValiWorkerMessage::NetworkRequest {
1065 validation_req,
1066 info,
1067 sender,
1068 } => {
1069 if sender != validation_req.signature().signer
1070 || sender != self.node_key
1071 {
1072 warn!(
1073 msg_type = "NetworkRequest",
1074 expected_sender = %self.node_key,
1075 received_sender = %sender,
1076 signer = %validation_req.signature().signer,
1077 "Unexpected sender"
1078 );
1079 if self.stop {
1080 ctx.stop(None).await;
1081 }
1082
1083 return Ok(());
1084 }
1085
1086 let reboot = match self
1088 .check_governance(
1089 validation_req.content().get_gov_version(),
1090 )
1091 .await
1092 {
1093 Ok(reboot) => reboot,
1094 Err(e) => {
1095 warn!(
1096 msg_type = "NetworkRequest",
1097 error = %e,
1098 "Failed to check governance"
1099 );
1100 if let ActorError::Functional { .. } = e {
1101 if self.stop {
1102 ctx.stop(None).await;
1103 }
1104
1105 return Err(e);
1106 } else {
1107 return Err(emit_fail(ctx, e).await);
1108 }
1109 }
1110 };
1111
1112 let validation = if let Err(error) =
1113 self.check_data(&validation_req)
1114 {
1115 ValidationRes::Abort(error.to_string())
1116 } else {
1117 match self.create_res(ctx, reboot, &validation_req).await {
1118 Ok(vali) => vali,
1119 Err(e) => {
1120 if let ValidatorError::InternalError { .. } = e {
1121 error!(
1122 msg_type = "NetworkRequest",
1123 error = %e,
1124 "Internal error during validation"
1125 );
1126
1127 return Err(emit_fail(
1128 ctx,
1129 ActorError::FunctionalCritical {
1130 description: e.to_string(),
1131 },
1132 )
1133 .await);
1134 } else if matches!(e, ValidatorError::OutOfVersion)
1135 {
1136 ValidationRes::Reboot
1137 } else {
1138 ValidationRes::Abort(e.to_string())
1139 }
1140 }
1141 }
1142 };
1143
1144 let signature = match get_sign(
1145 ctx,
1146 SignTypesNode::ValidationRes(validation.clone()),
1147 )
1148 .await
1149 {
1150 Ok(signature) => signature,
1151 Err(e) => {
1152 error!(
1153 msg_type = "NetworkRequest",
1154 error = %e,
1155 "Failed to sign validation response"
1156 );
1157 return Err(emit_fail(ctx, e).await);
1158 }
1159 };
1160
1161 let new_info = ComunicateInfo {
1162 receiver: sender,
1163 request_id: info.request_id,
1164 version: info.version,
1165 receiver_actor: format!(
1166 "/user/request/{}/validation/{}",
1167 validation_req.content().get_subject_id(),
1168 self.our_key.clone()
1169 ),
1170 };
1171
1172 let signed_response: Signed<ValidationRes> =
1173 Signed::from_parts(validation, signature);
1174 if let Err(e) = self
1175 .network
1176 .send_command(network::CommandHelper::SendMessage {
1177 message: NetworkMessage {
1178 info: new_info.clone(),
1179 message: ActorMessage::ValidationRes {
1180 res: signed_response,
1181 },
1182 },
1183 })
1184 .await
1185 {
1186 error!(
1187 msg_type = "NetworkRequest",
1188 error = %e,
1189 "Failed to send response to network"
1190 );
1191 return Err(emit_fail(ctx, e).await);
1192 } else {
1193 debug!(
1194 msg_type = "NetworkRequest",
1195 receiver = %new_info.receiver,
1196 request_id = %new_info.request_id,
1197 "Validation response sent to network"
1198 );
1199 }
1200
1201 if self.stop {
1202 ctx.stop(None).await;
1203 }
1204 }
1205 }
1206
1207 Ok(())
1208 }
1209
1210 async fn on_child_fault(
1211 &mut self,
1212 error: ActorError,
1213 ctx: &mut ActorContext<Self>,
1214 ) -> ChildAction {
1215 error!(
1216 node_key = %self.node_key,
1217 governance_id = %self.governance_id,
1218 gov_version = self.gov_version,
1219 sn = self.sn,
1220 error = %error,
1221 "Child fault in validation worker"
1222 );
1223 emit_fail(ctx, error).await;
1224 ChildAction::Stop
1225 }
1226}