1use std::{collections::HashSet, sync::Arc};
2
3use crate::{
4 approval::response::ApprovalRes,
5 evaluation::response::EvaluationResult,
6 governance::{
7 data::GovernanceData,
8 model::Quorum,
9 role_register::{RoleDataRegister, SearchRole},
10 },
11 helpers::network::{NetworkMessage, service::NetworkSender},
12 model::{
13 common::{
14 check_quorum_signers, emit_fail, get_actual_roles_register,
15 get_validation_roles_register,
16 node::{SignTypesNode, get_sign},
17 },
18 event::{
19 ApprovalData, EvaluationData, EvaluationResponse,
20 ValidationMetadata,
21 },
22 },
23 subject::{Metadata, MetadataWithoutProperties, RequestSubjectData},
24 validation::{
25 request::{ActualProtocols, LastData},
26 response::ValidatorError,
27 },
28};
29
30use crate::helpers::network::ActorMessage;
31
32use async_trait::async_trait;
33use ave_common::{
34 ValueWrapper,
35 bridge::request::EventRequestType,
36 identity::{
37 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
38 },
39 request::EventRequest,
40};
41use borsh::{BorshDeserialize, BorshSerialize};
42
43use ave_network::ComunicateInfo;
44use json_patch::{Patch, patch};
45use std::collections::BTreeSet;
46
47use ave_actors::{
48 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
49 NotPersistentActor,
50};
51
52use tracing::{Span, debug, error, info_span, warn};
53
54use super::{
55 Validation, ValidationMessage, request::ValidationReq,
56 response::ValidationRes,
57};
58
59#[derive(
61 Clone,
62 Debug,
63 serde::Serialize,
64 serde::Deserialize,
65 BorshSerialize,
66 BorshDeserialize,
67)]
68pub struct CurrentRequestRoles {
69 pub evaluation: RoleDataRegister,
70 pub approval: RoleDataRegister,
71}
72
73#[derive(Clone, Debug)]
74pub struct CurrentWorkerRoles {
75 pub evaluation: RoleDataRegister,
76 pub approval: RoleDataRegister,
77}
78
79#[derive(Clone, Debug)]
80pub struct ValiWorker {
81 pub node_key: PublicKey,
82 pub our_key: Arc<PublicKey>,
83 pub init_state: Option<ValueWrapper>,
84 pub governance_id: DigestIdentifier,
85 pub gov_version: u64,
86 pub sn: u64,
87 pub hash: HashAlgorithm,
88 pub network: Arc<NetworkSender>,
89 pub current_roles: CurrentWorkerRoles,
90 pub stop: bool,
91}
92
93impl ValiWorker {
94 fn event_request_hash(
95 &self,
96 event_request: &Signed<EventRequest>,
97 ) -> Result<DigestIdentifier, ValidatorError> {
98 hash_borsh(&*self.hash.hasher(), event_request).map_err(|e| {
99 ValidatorError::InternalError {
100 problem: e.to_string(),
101 }
102 })
103 }
104
105 fn viewpoints_hash(
106 &self,
107 event_request: &EventRequest,
108 ) -> Result<DigestIdentifier, ValidatorError> {
109 let viewpoints = match event_request {
110 EventRequest::Fact(fact_request) => fact_request.viewpoints.clone(),
111 _ => BTreeSet::new(),
112 };
113
114 hash_borsh(&*self.hash.hasher(), &viewpoints).map_err(|e| {
115 ValidatorError::InternalError {
116 problem: e.to_string(),
117 }
118 })
119 }
120
121 fn current_evaluation_roles(&self) -> RoleDataRegister {
122 self.current_roles.evaluation.clone()
123 }
124
125 fn current_approval_roles(&self) -> RoleDataRegister {
126 self.current_roles.approval.clone()
127 }
128
129 async fn check_governance(
130 &self,
131 gov_version: u64,
132 ) -> Result<bool, ActorError> {
133 match self.gov_version.cmp(&gov_version) {
134 std::cmp::Ordering::Less => {
135 warn!(
136 local_gov_version = self.gov_version,
137 request_gov_version = gov_version,
138 governance_id = %self.governance_id,
139 sender = %self.node_key,
140 "Received request with a higher governance version; ignoring request"
141 );
142 Err(ActorError::Functional {
143 description:
144 "Abort validation, request governance version is higher than local"
145 .to_owned(),
146 })
147 }
148 std::cmp::Ordering::Equal => {
149 Ok(false)
151 }
152 std::cmp::Ordering::Greater => Ok(true),
153 }
154 }
155
156 fn check_data(
157 &self,
158 validation_req: &Signed<ValidationReq>,
159 ) -> Result<(), ValidatorError> {
160 if !validation_req.content().is_valid() {
161 return Err(ValidatorError::InvalidData {
162 value: "validation request",
163 });
164 }
165
166 let governance_id = validation_req
167 .content()
168 .get_governance_id()
169 .map_err(|_| ValidatorError::InvalidData {
170 value: "governance_id",
171 })?;
172
173 if governance_id != self.governance_id {
174 return Err(ValidatorError::InvalidData {
175 value: "governance_id",
176 });
177 }
178
179 if validation_req.verify().is_err() {
180 return Err(ValidatorError::InvalidSignature {
181 data: "validation request",
182 });
183 }
184
185 if validation_req
186 .content()
187 .get_signed_event_request()
188 .verify()
189 .is_err()
190 {
191 return Err(ValidatorError::InvalidSignature {
192 data: "event request",
193 });
194 }
195
196 Ok(())
197 }
198
199 fn check_metadata(
200 event_type: &EventRequestType,
201 metadata: &Metadata,
202 gov_version: u64,
203 ) -> Result<(), ValidatorError> {
204 let is_gov = metadata.schema_id.is_gov();
205
206 if let Some(name) = &metadata.name
207 && (name.is_empty() || name.len() > 100)
208 {
209 return Err(ValidatorError::InvalidData {
210 value: "metadata name",
211 });
212 }
213
214 if let Some(description) = &metadata.description
215 && (description.is_empty() || description.len() > 200)
216 {
217 return Err(ValidatorError::InvalidData {
218 value: "metadata description",
219 });
220 }
221
222 if metadata.subject_id.is_empty() {
223 return Err(ValidatorError::InvalidData {
224 value: "metadata subject_id",
225 });
226 }
227
228 if is_gov && metadata.governance_id != metadata.subject_id
229 || !is_gov && metadata.governance_id == metadata.subject_id
230 {
231 return Err(ValidatorError::InvalidData {
232 value: "metadata governance_id",
233 });
234 }
235
236 if is_gov && metadata.genesis_gov_version != 0
237 || !is_gov && metadata.genesis_gov_version == 0
238 {
239 return Err(ValidatorError::InvalidData {
240 value: "metadata genesis_gov_version",
241 });
242 }
243
244 if metadata.genesis_gov_version > gov_version {
245 return Err(ValidatorError::InvalidData {
246 value: "metadata genesis_gov_version",
247 });
248 }
249
250 if metadata.sn == 0 && !metadata.prev_ledger_event_hash.is_empty()
251 || metadata.sn != 0 && metadata.prev_ledger_event_hash.is_empty()
252 {
253 return Err(ValidatorError::InvalidData {
254 value: "metadata prev_ledger_event_hash",
255 });
256 };
257
258 if !metadata.schema_id.is_valid_in_request() {
259 return Err(ValidatorError::InvalidData {
260 value: "metadata schema_id",
261 });
262 };
263
264 if is_gov && !metadata.namespace.is_empty() {
265 return Err(ValidatorError::InvalidData {
266 value: "metadata namespace",
267 });
268 }
269
270 if metadata.creator.is_empty() {
271 return Err(ValidatorError::InvalidData {
272 value: "metadata creator",
273 });
274 }
275
276 if metadata.owner.is_empty() {
277 return Err(ValidatorError::InvalidData {
278 value: "metadata owner",
279 });
280 }
281
282 if let Some(new_owner) = &metadata.new_owner
283 && (new_owner.is_empty() || new_owner == &metadata.owner)
284 {
285 return Err(ValidatorError::InvalidData {
286 value: "metadata new owner",
287 });
288 };
289
290 if !metadata.active {
291 return Err(ValidatorError::InvalidData {
292 value: "metadata active",
293 });
294 }
295
296 match event_type {
297 EventRequestType::Create => {
298 return Err(ValidatorError::InvalidData {
299 value: "Event request type",
300 });
301 }
302 EventRequestType::Confirm | EventRequestType::Reject => {
303 if metadata.new_owner.is_none() {
304 return Err(ValidatorError::InvalidData {
305 value: "Event request type",
306 });
307 }
308 }
309 EventRequestType::Fact
310 | EventRequestType::Transfer
311 | EventRequestType::Eol => {
312 if metadata.new_owner.is_some() {
313 return Err(ValidatorError::InvalidData {
314 value: "Event request type",
315 });
316 }
317 }
318 };
319
320 Ok(())
321 }
322
323 fn check_basic_data(
324 request: &Signed<EventRequest>,
325 metadata: &Metadata,
326 vali_req_signer: &PublicKey,
327 gov_version: u64,
328 sn: u64,
329 ) -> Result<(), ValidatorError> {
330 if request.verify().is_err() {
333 return Err(ValidatorError::InvalidSignature {
334 data: "event request",
335 });
336 }
337
338 Self::check_metadata(
339 &EventRequestType::from(request.content()),
340 metadata,
341 gov_version,
342 )?;
343
344 if !request.content().check_request_signature(
345 &request.signature().signer,
346 &metadata.owner,
347 &metadata.new_owner,
348 ) {
349 return Err(ValidatorError::InvalidSigner {
350 signer: request.signature().signer.to_string(),
351 });
352 }
353
354 if request.content().get_subject_id() != metadata.subject_id {
356 return Err(ValidatorError::InvalidData {
357 value: "Subject_id",
358 });
359 }
360
361 let signer = metadata
363 .new_owner
364 .clone()
365 .unwrap_or_else(|| metadata.owner.clone());
366
367 if &signer != vali_req_signer {
368 return Err(ValidatorError::InvalidSigner {
369 signer: vali_req_signer.to_string(),
370 });
371 }
372
373 if sn != metadata.sn + 1 {
375 return Err(ValidatorError::InvalidData { value: "sn" });
376 }
377 Ok(())
378 }
379
380 fn check_approval_signers(
381 agrees: &HashSet<PublicKey>,
382 disagrees: &HashSet<PublicKey>,
383 timeout: &HashSet<PublicKey>,
384 workers: &HashSet<PublicKey>,
385 ) -> bool {
386 agrees.is_subset(workers)
387 && disagrees.is_subset(workers)
388 && timeout.is_subset(workers)
389 }
390
391 fn check_approval_quorum(
392 agrees: u32,
393 timeout: u32,
394 quorum: &Quorum,
395 workers: &HashSet<PublicKey>,
396 approved: bool,
397 ) -> bool {
398 if approved {
399 quorum.check_quorum(workers.len() as u32, agrees + timeout)
400 } else {
401 !quorum.check_quorum(workers.len() as u32, agrees + timeout)
402 }
403 }
404
405 fn check_approval(
406 approval: ApprovalData,
407 appr_data: RoleDataRegister,
408 req_subject_data_hash: DigestIdentifier,
409 signer: PublicKey,
410 ) -> Result<(), ValidatorError> {
411 if signer != approval.approval_req_signature.signer {
412 return Err(ValidatorError::InvalidSigner {
413 signer: signer.to_string(),
414 });
415 }
416
417 let agrees = approval
418 .approvers_agrees_signatures
419 .iter()
420 .map(|x| x.signer.clone())
421 .collect::<HashSet<PublicKey>>();
422
423 let timeout = approval
424 .approvers_timeout
425 .iter()
426 .map(|x| x.who.clone())
427 .collect::<HashSet<PublicKey>>();
428
429 let disagrees = approval
430 .approvers_disagrees_signatures
431 .iter()
432 .map(|x| x.signer.clone())
433 .collect::<HashSet<PublicKey>>();
434
435 if !Self::check_approval_signers(
436 &agrees,
437 &disagrees,
438 &timeout,
439 &appr_data.workers,
440 ) {
441 return Err(ValidatorError::InvalidOperation {
442 action: "verify approval signers",
443 });
444 }
445
446 if !Self::check_approval_quorum(
447 agrees.len() as u32,
448 timeout.len() as u32,
449 &appr_data.quorum,
450 &appr_data.workers,
451 approval.approved,
452 ) {
453 return Err(ValidatorError::InvalidOperation {
454 action: "verify approval quorum",
455 });
456 }
457
458 let agrees_res = ApprovalRes::Response {
459 approval_req_hash: approval.approval_req_hash.clone(),
460 agrees: true,
461 req_subject_data_hash: req_subject_data_hash.clone(),
462 };
463 for signature in approval.approvers_agrees_signatures.iter() {
464 let signed_res =
465 Signed::from_parts(agrees_res.clone(), signature.clone());
466
467 if signed_res.verify().is_err() {
468 return Err(ValidatorError::InvalidSignature {
469 data: "approval agrees",
470 });
471 }
472 }
473
474 let disagrees_res = ApprovalRes::Response {
475 approval_req_hash: approval.approval_req_hash.clone(),
476 agrees: false,
477 req_subject_data_hash,
478 };
479 for signature in approval.approvers_disagrees_signatures.iter() {
480 let signed_res =
481 Signed::from_parts(disagrees_res.clone(), signature.clone());
482
483 if signed_res.verify().is_err() {
484 return Err(ValidatorError::InvalidSignature {
485 data: "approval disagrees",
486 });
487 }
488 }
489
490 Ok(())
491 }
492
493 fn check_evaluation(
494 &self,
495 evaluation: EvaluationData,
496 eval_data: RoleDataRegister,
497 mut properties: ValueWrapper,
498 req_subject_data_hash: DigestIdentifier,
499 signer: PublicKey,
500 ) -> Result<(bool, ValueWrapper), ValidatorError> {
501 if signer != evaluation.eval_req_signature.signer {
502 return Err(ValidatorError::InvalidSigner {
503 signer: signer.to_string(),
504 });
505 }
506
507 if !check_quorum_signers(
508 &evaluation
509 .evaluators_signatures
510 .iter()
511 .map(|x| x.signer.clone())
512 .collect::<HashSet<PublicKey>>(),
513 &eval_data.quorum,
514 &eval_data.workers,
515 ) {
516 return Err(ValidatorError::InvalidOperation {
517 action: "verify evaluation quorum",
518 });
519 }
520
521 let (eval_result, result_hash) = match evaluation.response.clone() {
522 EvaluationResponse::Ok {
523 result,
524 result_hash,
525 } => (
526 EvaluationResult::Ok {
527 response: result,
528 eval_req_hash: evaluation.eval_req_hash.clone(),
529 req_subject_data_hash,
530 },
531 result_hash,
532 ),
533 EvaluationResponse::Error {
534 result,
535 result_hash,
536 } => (
537 EvaluationResult::Error {
538 error: result,
539 eval_req_hash: evaluation.eval_req_hash.clone(),
540 req_subject_data_hash,
541 },
542 result_hash,
543 ),
544 };
545
546 let eval_result_hash = hash_borsh(&*self.hash.hasher(), &eval_result)
547 .map_err(|e| ValidatorError::InternalError {
548 problem: e.to_string(),
549 })?;
550
551 if eval_result_hash != result_hash {
552 return Err(ValidatorError::InvalidData {
553 value: "eval result hash",
554 });
555 }
556
557 for signature in evaluation.evaluators_signatures.iter() {
558 if signature.verify(&eval_result_hash).is_err() {
559 return Err(ValidatorError::InvalidSignature {
560 data: "evaluation",
561 });
562 }
563 }
564
565 let appr_required = if let Some(evaluator_res) =
566 evaluation.evaluator_response_ok()
567 {
568 let json_patch =
569 serde_json::from_value::<Patch>(evaluator_res.patch.0)
570 .map_err(|_| ValidatorError::InvalidData {
571 value: "evaluation patch",
572 })?;
573
574 patch(&mut properties.0, &json_patch).map_err(|_| {
575 ValidatorError::InvalidOperation {
576 action: "apply patch",
577 }
578 })?;
579
580 let properties_hash = hash_borsh(&*self.hash.hasher(), &properties)
581 .map_err(|e| ValidatorError::InternalError {
582 problem: e.to_string(),
583 })?;
584
585 if properties_hash != evaluator_res.properties_hash {
586 return Err(ValidatorError::InvalidData {
587 value: "properties_hash",
588 });
589 }
590
591 evaluator_res.appr_required
592 } else {
593 false
594 };
595
596 Ok((appr_required, properties))
597 }
598
599 async fn check_actual_protocols(
600 &self,
601 ctx: &mut ActorContext<Self>,
602 metadata: &Metadata,
603 actual_protocols: &ActualProtocols,
604 event_type: &EventRequestType,
605 gov_version: u64,
606 signer: PublicKey,
607 ) -> Result<Option<ValueWrapper>, ValidatorError> {
608 if !actual_protocols
609 .check_protocols(metadata.schema_id.is_gov(), event_type)
610 {
611 return Err(ValidatorError::InvalidData {
612 value: "actual protocols",
613 });
614 }
615
616 let (evaluation, approval) = match &actual_protocols {
617 ActualProtocols::None => (None, None),
618 ActualProtocols::Eval { eval_data } => {
619 (Some(eval_data.clone()), None)
620 }
621 ActualProtocols::EvalApprove {
622 eval_data,
623 approval_data,
624 } => (Some(eval_data.clone()), Some(approval_data.clone())),
625 };
626
627 let properties = if let Some(evaluation) = evaluation {
628 let req_subject_data_hash = hash_borsh(
629 &*self.hash.hasher(),
630 &RequestSubjectData {
631 subject_id: metadata.subject_id.clone(),
632 governance_id: metadata.governance_id.clone(),
633 sn: metadata.sn + 1,
634 namespace: metadata.namespace.clone(),
635 schema_id: metadata.schema_id.clone(),
636 gov_version,
637 signer: signer.clone(),
638 },
639 )
640 .map_err(|e| ValidatorError::InternalError {
641 problem: e.to_string(),
642 })?;
643
644 let (eval_data, appro_data) = if gov_version == self.gov_version {
645 (
646 self.current_evaluation_roles(),
647 approval.as_ref().map(|_| self.current_approval_roles()),
648 )
649 } else {
650 get_actual_roles_register(
651 ctx,
652 &metadata.governance_id,
653 SearchRole {
654 schema_id: metadata.schema_id.clone(),
655 namespace: metadata.namespace.clone(),
656 },
657 approval.is_some(),
658 gov_version,
659 )
660 .await
661 .map_err(|e| {
662 if let ActorError::UnexpectedResponse { .. } = e {
663 ValidatorError::OutOfVersion
664 } else {
665 ValidatorError::InternalError {
666 problem: e.to_string(),
667 }
668 }
669 })?
670 };
671
672 let (appr_required, properties) = self.check_evaluation(
673 evaluation,
674 eval_data,
675 metadata.properties.clone(),
676 req_subject_data_hash.clone(),
677 signer.clone(),
678 )?;
679
680 if let Some(approval) = approval
681 && let Some(appr_data) = appro_data
682 {
683 if !appr_required {
684 return Err(ValidatorError::InvalidData {
685 value: "evaluation appr_required",
686 });
687 }
688
689 Self::check_approval(
690 approval,
691 appr_data,
692 req_subject_data_hash,
693 signer,
694 )?;
695 } else if appr_required {
696 return Err(ValidatorError::InvalidData {
697 value: "evaluation appr_required",
698 });
699 }
700
701 Some(properties)
702 } else {
703 None
704 };
705
706 Ok(properties)
707 }
708
709 async fn check_last_vali_data(
710 &self,
711 ctx: &mut ActorContext<Self>,
712 metadata: &Metadata,
713 last_validation: &LastData,
714 ) -> Result<(), ValidatorError> {
715 let vali_data = get_validation_roles_register(
716 ctx,
717 &metadata.governance_id,
718 SearchRole {
719 schema_id: metadata.schema_id.clone(),
720 namespace: metadata.namespace.clone(),
721 },
722 last_validation.gov_version,
723 )
724 .await
725 .map_err(|e| {
726 if let ActorError::UnexpectedResponse { .. } = e {
727 ValidatorError::InvalidData {
728 value: "gov_version",
729 }
730 } else {
731 ValidatorError::InternalError {
732 problem: e.to_string(),
733 }
734 }
735 })?;
736
737 if !check_quorum_signers(
738 &last_validation
739 .vali_data
740 .validators_signatures
741 .iter()
742 .map(|x| x.signer.clone())
743 .collect::<HashSet<PublicKey>>(),
744 &vali_data.quorum,
745 &vali_data.workers,
746 ) {
747 return Err(ValidatorError::InvalidOperation {
748 action: "verify validation quorum",
749 });
750 }
751
752 let vali_req_hash =
753 last_validation.vali_data.validation_req_hash.clone();
754 let vali_res = if metadata.sn == 0 {
755 ValidationRes::Create {
756 vali_req_hash,
757 subject_metadata: Box::new(metadata.clone()),
758 }
759 } else {
760 let ValidationMetadata::ModifiedHash {
761 event_request_hash,
762 viewpoints_hash,
763 ..
764 } = &last_validation.vali_data.validation_metadata
765 else {
766 return Err(ValidatorError::InvalidData {
767 value: "last validation metadata",
768 });
769 };
770
771 let meta_wo_props =
772 MetadataWithoutProperties::from(metadata.clone());
773 let meta_wo_props_hash =
774 hash_borsh(&*self.hash.hasher(), &meta_wo_props).map_err(
775 |e| ValidatorError::InternalError {
776 problem: e.to_string(),
777 },
778 )?;
779
780 let propierties_hash =
781 hash_borsh(&*self.hash.hasher(), &metadata.properties)
782 .map_err(|e| ValidatorError::InternalError {
783 problem: e.to_string(),
784 })?;
785
786 ValidationRes::Response {
787 vali_req_hash,
788 modified_metadata_without_propierties_hash: meta_wo_props_hash,
789 propierties_hash,
790 event_request_hash: event_request_hash.clone(),
791 viewpoints_hash: viewpoints_hash.clone(),
792 }
793 };
794
795 for signature in last_validation.vali_data.validators_signatures.iter()
796 {
797 let signed_res =
798 Signed::from_parts(vali_res.clone(), signature.clone());
799
800 if signed_res.verify().is_err() {
801 return Err(ValidatorError::InvalidSignature {
802 data: "last validation",
803 });
804 }
805 }
806
807 Ok(())
808 }
809
810 fn create_modified_metadata(
811 is_success: bool,
812 event_request: &EventRequest,
813 properties: Option<ValueWrapper>,
814 ledger_hash: DigestIdentifier,
815 mut metadata: Metadata,
816 ) -> Result<Metadata, ValidatorError> {
817 metadata.sn += 1;
818
819 metadata.prev_ledger_event_hash = ledger_hash;
820
821 if !is_success {
822 return Ok(metadata);
823 }
824
825 match event_request {
826 EventRequest::Create(..) => {
827 return Err(ValidatorError::InvalidData {
828 value: "Event request type",
829 });
830 }
831 EventRequest::Fact(..) => {
832 if let Some(properties) = properties {
833 metadata.properties = properties;
834 }
835 }
836 EventRequest::Transfer(transfer_request) => {
837 metadata.new_owner = Some(transfer_request.new_owner.clone());
838 }
839 EventRequest::Confirm(..) => {
840 if let Some(new_owner) = metadata.new_owner.take() {
841 metadata.owner = new_owner;
842 } else {
843 return Err(ValidatorError::InvalidData {
844 value: "new owner",
845 });
846 }
847
848 if let Some(properties) = properties {
849 metadata.properties = properties;
850 }
851 }
852 EventRequest::Reject(..) => metadata.new_owner = None,
853 EventRequest::EOL(..) => metadata.active = false,
854 }
855
856 if metadata.schema_id.is_gov() {
857 let mut gov_data =
858 serde_json::from_value::<GovernanceData>(metadata.properties.0)
859 .map_err(|_| ValidatorError::InvalidData {
860 value: "metadata properties",
861 })?;
862
863 gov_data.version += 1;
864 metadata.properties = gov_data.to_value_wrapper();
865 }
866
867 Ok(metadata)
868 }
869
870 async fn create_res(
871 &self,
872 ctx: &mut ActorContext<Self>,
873 reboot: bool,
874 validation_req: &Signed<ValidationReq>,
875 ) -> Result<ValidationRes, ValidatorError> {
876 if reboot {
877 Ok(ValidationRes::Reboot)
878 } else {
879 match validation_req.content() {
880 ValidationReq::Create {
881 event_request,
882 gov_version,
883 subject_id,
884 } => {
885 if let EventRequest::Create(create) =
886 event_request.content()
887 {
888 if let Some(name) = &create.name
889 && (name.is_empty() || name.len() > 100)
890 {
891 return Err(ValidatorError::InvalidData {
892 value: "create event name",
893 });
894 }
895
896 if let Some(description) = &create.description
897 && (description.is_empty()
898 || description.len() > 200)
899 {
900 return Err(ValidatorError::InvalidData {
901 value: "create event description",
902 });
903 }
904
905 if !create.schema_id.is_valid_in_request() {
906 return Err(ValidatorError::InvalidData {
907 value: "create event schema_id",
908 });
909 }
910
911 if create.schema_id.is_gov() {
912 if !create.governance_id.is_empty() {
913 return Err(ValidatorError::InvalidData {
914 value: "create event governance_id",
915 });
916 }
917
918 if !create.namespace.is_empty() {
919 return Err(ValidatorError::InvalidData {
920 value: "create event namespace",
921 });
922 }
923 } else if create.governance_id.is_empty() {
924 return Err(ValidatorError::InvalidData {
925 value: "create event governance_id",
926 });
927 }
928
929 let subject_id_worker =
930 hash_borsh(&*self.hash.hasher(), &event_request)
931 .map_err(|e| ValidatorError::InternalError {
932 problem: e.to_string(),
933 })?;
934
935 if subject_id != &subject_id_worker {
936 return Err(ValidatorError::InvalidData {
937 value: "subject_id",
938 });
939 }
940
941 let init_state = self.init_state.as_ref().map_or_else(
942 || {
943 let governance_data = GovernanceData::new(
944 validation_req.signature().signer.clone(),
945 );
946
947 governance_data.to_value_wrapper()
948 },
949 |init_state| init_state.clone(),
950 );
951
952 let governance_id = if create.schema_id.is_gov() {
953 subject_id.clone()
954 } else {
955 create.governance_id.clone()
956 };
957
958 let subject_metadata = Metadata {
959 name: create.name.clone(),
960 description: create.description.clone(),
961 subject_id: subject_id_worker,
962 governance_id,
963 genesis_gov_version: *gov_version,
964 prev_ledger_event_hash: DigestIdentifier::default(),
965 schema_id: create.schema_id.clone(),
966 namespace: create.namespace.clone(),
967 sn: 0,
968 creator: validation_req.signature().signer.clone(),
969 owner: validation_req.signature().signer.clone(),
970 new_owner: None,
971 active: true,
972 properties: init_state,
973 };
974
975 let vali_req_hash =
976 hash_borsh(&*self.hash.hasher(), &validation_req)
977 .map_err(|e| ValidatorError::InternalError {
978 problem: e.to_string(),
979 })?;
980
981 Ok(ValidationRes::Create {
982 vali_req_hash,
983 subject_metadata: Box::new(subject_metadata),
984 })
985 } else {
986 Err(ValidatorError::InvalidData {
987 value: "event type",
988 })
989 }
990 }
991 ValidationReq::Event {
992 actual_protocols,
993 event_request,
994 metadata,
995 last_data,
996 gov_version,
997 sn,
998 ledger_hash,
999 } => {
1000 let signer = validation_req.signature().signer.clone();
1001 Self::check_basic_data(
1002 event_request,
1003 metadata,
1004 &signer,
1005 *gov_version,
1006 *sn,
1007 )?;
1008
1009 let properties = self
1010 .check_actual_protocols(
1011 ctx,
1012 metadata,
1013 actual_protocols,
1014 &EventRequestType::from(event_request.content()),
1015 *gov_version,
1016 signer,
1017 )
1018 .await?;
1019
1020 self.check_last_vali_data(ctx, metadata, last_data).await?;
1021
1022 let is_success = actual_protocols.is_success();
1023
1024 let modified_metadata = Self::create_modified_metadata(
1025 is_success,
1026 event_request.content(),
1027 properties,
1028 ledger_hash.clone(),
1029 *metadata.clone(),
1030 )?;
1031
1032 let vali_req_hash =
1033 hash_borsh(&*self.hash.hasher(), &validation_req)
1034 .map_err(|e| ValidatorError::InternalError {
1035 problem: e.to_string(),
1036 })?;
1037
1038 let meta_wo_props = MetadataWithoutProperties::from(
1039 modified_metadata.clone(),
1040 );
1041 let meta_wo_props_hash =
1042 hash_borsh(&*self.hash.hasher(), &meta_wo_props)
1043 .map_err(|e| ValidatorError::InternalError {
1044 problem: e.to_string(),
1045 })?;
1046
1047 let propierties_hash = hash_borsh(
1048 &*self.hash.hasher(),
1049 &modified_metadata.properties,
1050 )
1051 .map_err(|e| {
1052 ValidatorError::InternalError {
1053 problem: e.to_string(),
1054 }
1055 })?;
1056
1057 let event_request_hash =
1058 self.event_request_hash(event_request)?;
1059 let viewpoints_hash =
1060 self.viewpoints_hash(event_request.content())?;
1061
1062 Ok(ValidationRes::Response {
1063 vali_req_hash,
1064 modified_metadata_without_propierties_hash:
1065 meta_wo_props_hash,
1066 propierties_hash,
1067 event_request_hash,
1068 viewpoints_hash,
1069 })
1070 }
1071 }
1072 }
1073 }
1074}
1075
1076#[derive(Debug, Clone)]
1077pub enum ValiWorkerMessage {
1078 UpdateCurrentRoles {
1079 gov_version: u64,
1080 current_roles: CurrentWorkerRoles,
1081 },
1082 LocalValidation {
1083 validation_req: Box<Signed<ValidationReq>>,
1084 },
1085 NetworkRequest {
1086 validation_req: Box<Signed<ValidationReq>>,
1087 sender: PublicKey,
1088 info: ComunicateInfo,
1089 },
1090}
1091
1092impl Message for ValiWorkerMessage {}
1093
1094#[async_trait]
1095impl Actor for ValiWorker {
1096 type Event = ();
1097 type Message = ValiWorkerMessage;
1098 type Response = ();
1099
1100 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
1101 parent_span.map_or_else(
1102 || info_span!("ValiWorker", id),
1103 |parent_span| info_span!(parent: parent_span, "ValiWorker", id),
1104 )
1105 }
1106}
1107
1108impl NotPersistentActor for ValiWorker {}
1109
1110#[async_trait]
1111impl Handler<Self> for ValiWorker {
1112 async fn handle_message(
1113 &mut self,
1114 _sender: ActorPath,
1115 msg: ValiWorkerMessage,
1116 ctx: &mut ActorContext<Self>,
1117 ) -> Result<(), ActorError> {
1118 match msg {
1119 ValiWorkerMessage::UpdateCurrentRoles {
1120 gov_version,
1121 current_roles,
1122 } => {
1123 self.gov_version = gov_version;
1124 self.current_roles = current_roles;
1125 }
1126 ValiWorkerMessage::LocalValidation { validation_req } => {
1127 let validation =
1128 match self.create_res(ctx, false, &validation_req).await {
1129 Ok(vali) => vali,
1130 Err(e) => {
1131 if matches!(e, ValidatorError::OutOfVersion) {
1132 ValidationRes::Reboot
1133 } else {
1134 return Err(emit_fail(
1135 ctx,
1136 ActorError::FunctionalCritical {
1137 description: e.to_string(),
1138 },
1139 )
1140 .await);
1141 }
1142 }
1143 };
1144
1145 let signature = match get_sign(
1146 ctx,
1147 SignTypesNode::ValidationRes(validation.clone()),
1148 )
1149 .await
1150 {
1151 Ok(signature) => signature,
1152 Err(e) => {
1153 error!(
1154 msg_type = "LocalValidation",
1155 error = %e,
1156 "Failed to sign validator response"
1157 );
1158 return Err(emit_fail(ctx, e).await);
1159 }
1160 };
1161
1162 match ctx.get_parent::<Validation>().await {
1163 Ok(validation_actor) => {
1164 validation_actor
1165 .tell(ValidationMessage::Response {
1166 validation_res: Box::new(validation),
1167 sender: (*self.our_key).clone(),
1168 signature: Some(signature),
1169 })
1170 .await?;
1171
1172 debug!(
1173 msg_type = "LocalValidation",
1174 "Validation completed and sent to parent"
1175 );
1176 }
1177 Err(e) => {
1178 error!(
1179 msg_type = "LocalValidation",
1180 "Failed to obtain Validation actor"
1181 );
1182 return Err(e);
1183 }
1184 };
1185
1186 ctx.stop(None).await;
1187 }
1188 ValiWorkerMessage::NetworkRequest {
1189 validation_req,
1190 info,
1191 sender,
1192 } => {
1193 if sender != validation_req.signature().signer
1194 || sender != self.node_key
1195 {
1196 warn!(
1197 msg_type = "NetworkRequest",
1198 expected_sender = %self.node_key,
1199 received_sender = %sender,
1200 signer = %validation_req.signature().signer,
1201 "Unexpected sender"
1202 );
1203 if self.stop {
1204 ctx.stop(None).await;
1205 }
1206
1207 return Ok(());
1208 }
1209
1210 let reboot = match self
1211 .check_governance(
1212 validation_req.content().get_gov_version(),
1213 )
1214 .await
1215 {
1216 Ok(reboot) => reboot,
1217 Err(e) => {
1218 warn!(
1219 msg_type = "NetworkRequest",
1220 error = %e,
1221 "Failed to check governance"
1222 );
1223 if let ActorError::Functional { .. } = e {
1224 if self.stop {
1225 ctx.stop(None).await;
1226 }
1227
1228 return Err(e);
1229 } else {
1230 return Err(emit_fail(ctx, e).await);
1231 }
1232 }
1233 };
1234
1235 let validation = if let Err(error) =
1236 self.check_data(&validation_req)
1237 {
1238 ValidationRes::Abort(error.to_string())
1239 } else {
1240 match self.create_res(ctx, reboot, &validation_req).await {
1241 Ok(vali) => vali,
1242 Err(e) => {
1243 if let ValidatorError::InternalError { .. } = e {
1244 error!(
1245 msg_type = "NetworkRequest",
1246 error = %e,
1247 "Internal error during validation"
1248 );
1249
1250 return Err(emit_fail(
1251 ctx,
1252 ActorError::FunctionalCritical {
1253 description: e.to_string(),
1254 },
1255 )
1256 .await);
1257 } else if matches!(e, ValidatorError::OutOfVersion)
1258 {
1259 ValidationRes::Reboot
1260 } else {
1261 ValidationRes::Abort(e.to_string())
1262 }
1263 }
1264 }
1265 };
1266
1267 let signature = match get_sign(
1268 ctx,
1269 SignTypesNode::ValidationRes(validation.clone()),
1270 )
1271 .await
1272 {
1273 Ok(signature) => signature,
1274 Err(e) => {
1275 error!(
1276 msg_type = "NetworkRequest",
1277 error = %e,
1278 "Failed to sign validation response"
1279 );
1280 return Err(emit_fail(ctx, e).await);
1281 }
1282 };
1283
1284 let new_info = ComunicateInfo {
1285 receiver: sender,
1286 request_id: info.request_id,
1287 version: info.version,
1288 receiver_actor: format!(
1289 "/user/request/{}/validation/{}",
1290 validation_req.content().get_subject_id(),
1291 self.our_key.clone()
1292 ),
1293 };
1294
1295 let signed_response: Signed<ValidationRes> =
1296 Signed::from_parts(validation, signature);
1297 if let Err(e) = self
1298 .network
1299 .send_command(ave_network::CommandHelper::SendMessage {
1300 message: NetworkMessage {
1301 info: new_info.clone(),
1302 message: ActorMessage::ValidationRes {
1303 res: signed_response,
1304 },
1305 },
1306 })
1307 .await
1308 {
1309 error!(
1310 msg_type = "NetworkRequest",
1311 error = %e,
1312 "Failed to send response to network"
1313 );
1314 return Err(emit_fail(ctx, e).await);
1315 } else {
1316 debug!(
1317 msg_type = "NetworkRequest",
1318 receiver = %new_info.receiver,
1319 request_id = %new_info.request_id,
1320 "Validation response sent to network"
1321 );
1322 }
1323
1324 if self.stop {
1325 ctx.stop(None).await;
1326 }
1327 }
1328 }
1329
1330 Ok(())
1331 }
1332
1333 async fn on_child_fault(
1334 &mut self,
1335 error: ActorError,
1336 ctx: &mut ActorContext<Self>,
1337 ) -> ChildAction {
1338 error!(
1339 node_key = %self.node_key,
1340 governance_id = %self.governance_id,
1341 gov_version = self.gov_version,
1342 sn = self.sn,
1343 error = %error,
1344 "Child fault in validation worker"
1345 );
1346 emit_fail(ctx, error).await;
1347 ChildAction::Stop
1348 }
1349}