Skip to main content

ave_core/validation/
worker.rs

1use std::{collections::HashSet, sync::Arc};
2
3use crate::{
4    approval::response::ApprovalRes,
5    evaluation::response::EvaluationResult,
6    governance::{
7        data::GovernanceData,
8        model::Quorum,
9        role_register::{RoleDataRegister, SearchRole},
10    },
11    helpers::network::{NetworkMessage, service::NetworkSender},
12    model::{
13        common::{
14            check_quorum_signers, emit_fail, get_actual_roles_register,
15            get_validation_roles_register,
16            node::{SignTypesNode, get_sign},
17        },
18        event::{
19            ApprovalData, EvaluationData, EvaluationResponse,
20            ValidationMetadata,
21        },
22    },
23    subject::{Metadata, MetadataWithoutProperties, RequestSubjectData},
24    validation::{
25        request::{ActualProtocols, LastData},
26        response::ValidatorError,
27    },
28};
29
30use crate::helpers::network::ActorMessage;
31
32use async_trait::async_trait;
33use ave_common::{
34    ValueWrapper,
35    bridge::request::EventRequestType,
36    identity::{
37        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
38    },
39    request::EventRequest,
40};
41use borsh::{BorshDeserialize, BorshSerialize};
42
43use ave_network::ComunicateInfo;
44use json_patch::{Patch, patch};
45use std::collections::BTreeSet;
46
47use ave_actors::{
48    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
49    NotPersistentActor,
50};
51
52use tracing::{Span, debug, error, info_span, warn};
53
54use super::{
55    Validation, ValidationMessage, request::ValidationReq,
56    response::ValidationRes,
57};
58
59/// A struct representing a ValiWorker actor.
60#[derive(
61    Clone,
62    Debug,
63    serde::Serialize,
64    serde::Deserialize,
65    BorshSerialize,
66    BorshDeserialize,
67)]
68pub struct CurrentRequestRoles {
69    pub evaluation: RoleDataRegister,
70    pub approval: RoleDataRegister,
71}
72
73#[derive(Clone, Debug)]
74pub struct CurrentWorkerRoles {
75    pub evaluation: RoleDataRegister,
76    pub approval: RoleDataRegister,
77}
78
79#[derive(Clone, Debug)]
80pub struct ValiWorker {
81    pub node_key: PublicKey,
82    pub our_key: Arc<PublicKey>,
83    pub init_state: Option<ValueWrapper>,
84    pub governance_id: DigestIdentifier,
85    pub gov_version: u64,
86    pub sn: u64,
87    pub hash: HashAlgorithm,
88    pub network: Arc<NetworkSender>,
89    pub current_roles: CurrentWorkerRoles,
90    pub stop: bool,
91}
92
93impl ValiWorker {
94    fn event_request_hash(
95        &self,
96        event_request: &Signed<EventRequest>,
97    ) -> Result<DigestIdentifier, ValidatorError> {
98        hash_borsh(&*self.hash.hasher(), event_request).map_err(|e| {
99            ValidatorError::InternalError {
100                problem: e.to_string(),
101            }
102        })
103    }
104
105    fn viewpoints_hash(
106        &self,
107        event_request: &EventRequest,
108    ) -> Result<DigestIdentifier, ValidatorError> {
109        let viewpoints = match event_request {
110            EventRequest::Fact(fact_request) => fact_request.viewpoints.clone(),
111            _ => BTreeSet::new(),
112        };
113
114        hash_borsh(&*self.hash.hasher(), &viewpoints).map_err(|e| {
115            ValidatorError::InternalError {
116                problem: e.to_string(),
117            }
118        })
119    }
120
121    fn current_evaluation_roles(&self) -> RoleDataRegister {
122        self.current_roles.evaluation.clone()
123    }
124
125    fn current_approval_roles(&self) -> RoleDataRegister {
126        self.current_roles.approval.clone()
127    }
128
129    async fn check_governance(
130        &self,
131        gov_version: u64,
132    ) -> Result<bool, ActorError> {
133        match self.gov_version.cmp(&gov_version) {
134            std::cmp::Ordering::Less => {
135                warn!(
136                    local_gov_version = self.gov_version,
137                    request_gov_version = gov_version,
138                    governance_id = %self.governance_id,
139                    sender = %self.node_key,
140                    "Received request with a higher governance version; ignoring request"
141                );
142                Err(ActorError::Functional {
143                    description:
144                        "Abort validation, request governance version is higher than local"
145                            .to_owned(),
146                })
147            }
148            std::cmp::Ordering::Equal => {
149                // If it is the same it means that we have the latest version of governance, we are up to date.
150                Ok(false)
151            }
152            std::cmp::Ordering::Greater => Ok(true),
153        }
154    }
155
156    fn check_data(
157        &self,
158        validation_req: &Signed<ValidationReq>,
159    ) -> Result<(), ValidatorError> {
160        if !validation_req.content().is_valid() {
161            return Err(ValidatorError::InvalidData {
162                value: "validation request",
163            });
164        }
165
166        let governance_id = validation_req
167            .content()
168            .get_governance_id()
169            .map_err(|_| ValidatorError::InvalidData {
170                value: "governance_id",
171            })?;
172
173        if governance_id != self.governance_id {
174            return Err(ValidatorError::InvalidData {
175                value: "governance_id",
176            });
177        }
178
179        if validation_req.verify().is_err() {
180            return Err(ValidatorError::InvalidSignature {
181                data: "validation request",
182            });
183        }
184
185        if validation_req
186            .content()
187            .get_signed_event_request()
188            .verify()
189            .is_err()
190        {
191            return Err(ValidatorError::InvalidSignature {
192                data: "event request",
193            });
194        }
195
196        Ok(())
197    }
198
199    fn check_metadata(
200        event_type: &EventRequestType,
201        metadata: &Metadata,
202        gov_version: u64,
203    ) -> Result<(), ValidatorError> {
204        let is_gov = metadata.schema_id.is_gov();
205
206        if let Some(name) = &metadata.name
207            && (name.is_empty() || name.len() > 100)
208        {
209            return Err(ValidatorError::InvalidData {
210                value: "metadata name",
211            });
212        }
213
214        if let Some(description) = &metadata.description
215            && (description.is_empty() || description.len() > 200)
216        {
217            return Err(ValidatorError::InvalidData {
218                value: "metadata description",
219            });
220        }
221
222        if metadata.subject_id.is_empty() {
223            return Err(ValidatorError::InvalidData {
224                value: "metadata subject_id",
225            });
226        }
227
228        if is_gov && metadata.governance_id != metadata.subject_id
229            || !is_gov && metadata.governance_id == metadata.subject_id
230        {
231            return Err(ValidatorError::InvalidData {
232                value: "metadata governance_id",
233            });
234        }
235
236        if is_gov && metadata.genesis_gov_version != 0
237            || !is_gov && metadata.genesis_gov_version == 0
238        {
239            return Err(ValidatorError::InvalidData {
240                value: "metadata genesis_gov_version",
241            });
242        }
243
244        if metadata.genesis_gov_version > gov_version {
245            return Err(ValidatorError::InvalidData {
246                value: "metadata genesis_gov_version",
247            });
248        }
249
250        if metadata.sn == 0 && !metadata.prev_ledger_event_hash.is_empty()
251            || metadata.sn != 0 && metadata.prev_ledger_event_hash.is_empty()
252        {
253            return Err(ValidatorError::InvalidData {
254                value: "metadata prev_ledger_event_hash",
255            });
256        };
257
258        if !metadata.schema_id.is_valid_in_request() {
259            return Err(ValidatorError::InvalidData {
260                value: "metadata schema_id",
261            });
262        };
263
264        if is_gov && !metadata.namespace.is_empty() {
265            return Err(ValidatorError::InvalidData {
266                value: "metadata namespace",
267            });
268        }
269
270        if metadata.creator.is_empty() {
271            return Err(ValidatorError::InvalidData {
272                value: "metadata creator",
273            });
274        }
275
276        if metadata.owner.is_empty() {
277            return Err(ValidatorError::InvalidData {
278                value: "metadata owner",
279            });
280        }
281
282        if let Some(new_owner) = &metadata.new_owner
283            && (new_owner.is_empty() || new_owner == &metadata.owner)
284        {
285            return Err(ValidatorError::InvalidData {
286                value: "metadata new owner",
287            });
288        };
289
290        if !metadata.active {
291            return Err(ValidatorError::InvalidData {
292                value: "metadata active",
293            });
294        }
295
296        match event_type {
297            EventRequestType::Create => {
298                return Err(ValidatorError::InvalidData {
299                    value: "Event request type",
300                });
301            }
302            EventRequestType::Confirm | EventRequestType::Reject => {
303                if metadata.new_owner.is_none() {
304                    return Err(ValidatorError::InvalidData {
305                        value: "Event request type",
306                    });
307                }
308            }
309            EventRequestType::Fact
310            | EventRequestType::Transfer
311            | EventRequestType::Eol => {
312                if metadata.new_owner.is_some() {
313                    return Err(ValidatorError::InvalidData {
314                        value: "Event request type",
315                    });
316                }
317            }
318        };
319
320        Ok(())
321    }
322
323    fn check_basic_data(
324        request: &Signed<EventRequest>,
325        metadata: &Metadata,
326        vali_req_signer: &PublicKey,
327        gov_version: u64,
328        sn: u64,
329    ) -> Result<(), ValidatorError> {
330        // Check event request.
331
332        if request.verify().is_err() {
333            return Err(ValidatorError::InvalidSignature {
334                data: "event request",
335            });
336        }
337
338        Self::check_metadata(
339            &EventRequestType::from(request.content()),
340            metadata,
341            gov_version,
342        )?;
343
344        if !request.content().check_request_signature(
345            &request.signature().signer,
346            &metadata.owner,
347            &metadata.new_owner,
348        ) {
349            return Err(ValidatorError::InvalidSigner {
350                signer: request.signature().signer.to_string(),
351            });
352        }
353
354        // subject
355        if request.content().get_subject_id() != metadata.subject_id {
356            return Err(ValidatorError::InvalidData {
357                value: "Subject_id",
358            });
359        }
360
361        // vali request signer
362        let signer = metadata
363            .new_owner
364            .clone()
365            .unwrap_or_else(|| metadata.owner.clone());
366
367        if &signer != vali_req_signer {
368            return Err(ValidatorError::InvalidSigner {
369                signer: vali_req_signer.to_string(),
370            });
371        }
372
373        // sn
374        if sn != metadata.sn + 1 {
375            return Err(ValidatorError::InvalidData { value: "sn" });
376        }
377        Ok(())
378    }
379
380    fn check_approval_signers(
381        agrees: &HashSet<PublicKey>,
382        disagrees: &HashSet<PublicKey>,
383        timeout: &HashSet<PublicKey>,
384        workers: &HashSet<PublicKey>,
385    ) -> bool {
386        agrees.is_subset(workers)
387            && disagrees.is_subset(workers)
388            && timeout.is_subset(workers)
389    }
390
391    fn check_approval_quorum(
392        agrees: u32,
393        timeout: u32,
394        quorum: &Quorum,
395        workers: &HashSet<PublicKey>,
396        approved: bool,
397    ) -> bool {
398        if approved {
399            quorum.check_quorum(workers.len() as u32, agrees + timeout)
400        } else {
401            !quorum.check_quorum(workers.len() as u32, agrees + timeout)
402        }
403    }
404
405    fn check_approval(
406        approval: ApprovalData,
407        appr_data: RoleDataRegister,
408        req_subject_data_hash: DigestIdentifier,
409        signer: PublicKey,
410    ) -> Result<(), ValidatorError> {
411        if signer != approval.approval_req_signature.signer {
412            return Err(ValidatorError::InvalidSigner {
413                signer: signer.to_string(),
414            });
415        }
416
417        let agrees = approval
418            .approvers_agrees_signatures
419            .iter()
420            .map(|x| x.signer.clone())
421            .collect::<HashSet<PublicKey>>();
422
423        let timeout = approval
424            .approvers_timeout
425            .iter()
426            .map(|x| x.who.clone())
427            .collect::<HashSet<PublicKey>>();
428
429        let disagrees = approval
430            .approvers_disagrees_signatures
431            .iter()
432            .map(|x| x.signer.clone())
433            .collect::<HashSet<PublicKey>>();
434
435        if !Self::check_approval_signers(
436            &agrees,
437            &disagrees,
438            &timeout,
439            &appr_data.workers,
440        ) {
441            return Err(ValidatorError::InvalidOperation {
442                action: "verify approval signers",
443            });
444        }
445
446        if !Self::check_approval_quorum(
447            agrees.len() as u32,
448            timeout.len() as u32,
449            &appr_data.quorum,
450            &appr_data.workers,
451            approval.approved,
452        ) {
453            return Err(ValidatorError::InvalidOperation {
454                action: "verify approval quorum",
455            });
456        }
457
458        let agrees_res = ApprovalRes::Response {
459            approval_req_hash: approval.approval_req_hash.clone(),
460            agrees: true,
461            req_subject_data_hash: req_subject_data_hash.clone(),
462        };
463        for signature in approval.approvers_agrees_signatures.iter() {
464            let signed_res =
465                Signed::from_parts(agrees_res.clone(), signature.clone());
466
467            if signed_res.verify().is_err() {
468                return Err(ValidatorError::InvalidSignature {
469                    data: "approval agrees",
470                });
471            }
472        }
473
474        let disagrees_res = ApprovalRes::Response {
475            approval_req_hash: approval.approval_req_hash.clone(),
476            agrees: false,
477            req_subject_data_hash,
478        };
479        for signature in approval.approvers_disagrees_signatures.iter() {
480            let signed_res =
481                Signed::from_parts(disagrees_res.clone(), signature.clone());
482
483            if signed_res.verify().is_err() {
484                return Err(ValidatorError::InvalidSignature {
485                    data: "approval disagrees",
486                });
487            }
488        }
489
490        Ok(())
491    }
492
493    fn check_evaluation(
494        &self,
495        evaluation: EvaluationData,
496        eval_data: RoleDataRegister,
497        mut properties: ValueWrapper,
498        req_subject_data_hash: DigestIdentifier,
499        signer: PublicKey,
500    ) -> Result<(bool, ValueWrapper), ValidatorError> {
501        if signer != evaluation.eval_req_signature.signer {
502            return Err(ValidatorError::InvalidSigner {
503                signer: signer.to_string(),
504            });
505        }
506
507        if !check_quorum_signers(
508            &evaluation
509                .evaluators_signatures
510                .iter()
511                .map(|x| x.signer.clone())
512                .collect::<HashSet<PublicKey>>(),
513            &eval_data.quorum,
514            &eval_data.workers,
515        ) {
516            return Err(ValidatorError::InvalidOperation {
517                action: "verify evaluation quorum",
518            });
519        }
520
521        let (eval_result, result_hash) = match evaluation.response.clone() {
522            EvaluationResponse::Ok {
523                result,
524                result_hash,
525            } => (
526                EvaluationResult::Ok {
527                    response: result,
528                    eval_req_hash: evaluation.eval_req_hash.clone(),
529                    req_subject_data_hash,
530                },
531                result_hash,
532            ),
533            EvaluationResponse::Error {
534                result,
535                result_hash,
536            } => (
537                EvaluationResult::Error {
538                    error: result,
539                    eval_req_hash: evaluation.eval_req_hash.clone(),
540                    req_subject_data_hash,
541                },
542                result_hash,
543            ),
544        };
545
546        let eval_result_hash = hash_borsh(&*self.hash.hasher(), &eval_result)
547            .map_err(|e| ValidatorError::InternalError {
548            problem: e.to_string(),
549        })?;
550
551        if eval_result_hash != result_hash {
552            return Err(ValidatorError::InvalidData {
553                value: "eval result hash",
554            });
555        }
556
557        for signature in evaluation.evaluators_signatures.iter() {
558            if signature.verify(&eval_result_hash).is_err() {
559                return Err(ValidatorError::InvalidSignature {
560                    data: "evaluation",
561                });
562            }
563        }
564
565        let appr_required = if let Some(evaluator_res) =
566            evaluation.evaluator_response_ok()
567        {
568            let json_patch =
569                serde_json::from_value::<Patch>(evaluator_res.patch.0)
570                    .map_err(|_| ValidatorError::InvalidData {
571                        value: "evaluation patch",
572                    })?;
573
574            patch(&mut properties.0, &json_patch).map_err(|_| {
575                ValidatorError::InvalidOperation {
576                    action: "apply patch",
577                }
578            })?;
579
580            let properties_hash = hash_borsh(&*self.hash.hasher(), &properties)
581                .map_err(|e| ValidatorError::InternalError {
582                    problem: e.to_string(),
583                })?;
584
585            if properties_hash != evaluator_res.properties_hash {
586                return Err(ValidatorError::InvalidData {
587                    value: "properties_hash",
588                });
589            }
590
591            evaluator_res.appr_required
592        } else {
593            false
594        };
595
596        Ok((appr_required, properties))
597    }
598
599    async fn check_actual_protocols(
600        &self,
601        ctx: &mut ActorContext<Self>,
602        metadata: &Metadata,
603        actual_protocols: &ActualProtocols,
604        event_type: &EventRequestType,
605        gov_version: u64,
606        signer: PublicKey,
607    ) -> Result<Option<ValueWrapper>, ValidatorError> {
608        if !actual_protocols
609            .check_protocols(metadata.schema_id.is_gov(), event_type)
610        {
611            return Err(ValidatorError::InvalidData {
612                value: "actual protocols",
613            });
614        }
615
616        let (evaluation, approval) = match &actual_protocols {
617            ActualProtocols::None => (None, None),
618            ActualProtocols::Eval { eval_data } => {
619                (Some(eval_data.clone()), None)
620            }
621            ActualProtocols::EvalApprove {
622                eval_data,
623                approval_data,
624            } => (Some(eval_data.clone()), Some(approval_data.clone())),
625        };
626
627        let properties = if let Some(evaluation) = evaluation {
628            let req_subject_data_hash = hash_borsh(
629                &*self.hash.hasher(),
630                &RequestSubjectData {
631                    subject_id: metadata.subject_id.clone(),
632                    governance_id: metadata.governance_id.clone(),
633                    sn: metadata.sn + 1,
634                    namespace: metadata.namespace.clone(),
635                    schema_id: metadata.schema_id.clone(),
636                    gov_version,
637                    signer: signer.clone(),
638                },
639            )
640            .map_err(|e| ValidatorError::InternalError {
641                problem: e.to_string(),
642            })?;
643
644            let (eval_data, appro_data) = if gov_version == self.gov_version {
645                (
646                    self.current_evaluation_roles(),
647                    approval.as_ref().map(|_| self.current_approval_roles()),
648                )
649            } else {
650                get_actual_roles_register(
651                    ctx,
652                    &metadata.governance_id,
653                    SearchRole {
654                        schema_id: metadata.schema_id.clone(),
655                        namespace: metadata.namespace.clone(),
656                    },
657                    approval.is_some(),
658                    gov_version,
659                )
660                .await
661                .map_err(|e| {
662                    if let ActorError::UnexpectedResponse { .. } = e {
663                        ValidatorError::OutOfVersion
664                    } else {
665                        ValidatorError::InternalError {
666                            problem: e.to_string(),
667                        }
668                    }
669                })?
670            };
671
672            let (appr_required, properties) = self.check_evaluation(
673                evaluation,
674                eval_data,
675                metadata.properties.clone(),
676                req_subject_data_hash.clone(),
677                signer.clone(),
678            )?;
679
680            if let Some(approval) = approval
681                && let Some(appr_data) = appro_data
682            {
683                if !appr_required {
684                    return Err(ValidatorError::InvalidData {
685                        value: "evaluation appr_required",
686                    });
687                }
688
689                Self::check_approval(
690                    approval,
691                    appr_data,
692                    req_subject_data_hash,
693                    signer,
694                )?;
695            } else if appr_required {
696                return Err(ValidatorError::InvalidData {
697                    value: "evaluation appr_required",
698                });
699            }
700
701            Some(properties)
702        } else {
703            None
704        };
705
706        Ok(properties)
707    }
708
709    async fn check_last_vali_data(
710        &self,
711        ctx: &mut ActorContext<Self>,
712        metadata: &Metadata,
713        last_validation: &LastData,
714    ) -> Result<(), ValidatorError> {
715        let vali_data = get_validation_roles_register(
716            ctx,
717            &metadata.governance_id,
718            SearchRole {
719                schema_id: metadata.schema_id.clone(),
720                namespace: metadata.namespace.clone(),
721            },
722            last_validation.gov_version,
723        )
724        .await
725        .map_err(|e| {
726            if let ActorError::UnexpectedResponse { .. } = e {
727                ValidatorError::InvalidData {
728                    value: "gov_version",
729                }
730            } else {
731                ValidatorError::InternalError {
732                    problem: e.to_string(),
733                }
734            }
735        })?;
736
737        if !check_quorum_signers(
738            &last_validation
739                .vali_data
740                .validators_signatures
741                .iter()
742                .map(|x| x.signer.clone())
743                .collect::<HashSet<PublicKey>>(),
744            &vali_data.quorum,
745            &vali_data.workers,
746        ) {
747            return Err(ValidatorError::InvalidOperation {
748                action: "verify validation quorum",
749            });
750        }
751
752        let vali_req_hash =
753            last_validation.vali_data.validation_req_hash.clone();
754        let vali_res = if metadata.sn == 0 {
755            ValidationRes::Create {
756                vali_req_hash,
757                subject_metadata: Box::new(metadata.clone()),
758            }
759        } else {
760            let ValidationMetadata::ModifiedHash {
761                event_request_hash,
762                viewpoints_hash,
763                ..
764            } = &last_validation.vali_data.validation_metadata
765            else {
766                return Err(ValidatorError::InvalidData {
767                    value: "last validation metadata",
768                });
769            };
770
771            let meta_wo_props =
772                MetadataWithoutProperties::from(metadata.clone());
773            let meta_wo_props_hash =
774                hash_borsh(&*self.hash.hasher(), &meta_wo_props).map_err(
775                    |e| ValidatorError::InternalError {
776                        problem: e.to_string(),
777                    },
778                )?;
779
780            let propierties_hash =
781                hash_borsh(&*self.hash.hasher(), &metadata.properties)
782                    .map_err(|e| ValidatorError::InternalError {
783                        problem: e.to_string(),
784                    })?;
785
786            ValidationRes::Response {
787                vali_req_hash,
788                modified_metadata_without_propierties_hash: meta_wo_props_hash,
789                propierties_hash,
790                event_request_hash: event_request_hash.clone(),
791                viewpoints_hash: viewpoints_hash.clone(),
792            }
793        };
794
795        for signature in last_validation.vali_data.validators_signatures.iter()
796        {
797            let signed_res =
798                Signed::from_parts(vali_res.clone(), signature.clone());
799
800            if signed_res.verify().is_err() {
801                return Err(ValidatorError::InvalidSignature {
802                    data: "last validation",
803                });
804            }
805        }
806
807        Ok(())
808    }
809
810    fn create_modified_metadata(
811        is_success: bool,
812        event_request: &EventRequest,
813        properties: Option<ValueWrapper>,
814        ledger_hash: DigestIdentifier,
815        mut metadata: Metadata,
816    ) -> Result<Metadata, ValidatorError> {
817        metadata.sn += 1;
818
819        metadata.prev_ledger_event_hash = ledger_hash;
820
821        if !is_success {
822            return Ok(metadata);
823        }
824
825        match event_request {
826            EventRequest::Create(..) => {
827                return Err(ValidatorError::InvalidData {
828                    value: "Event request type",
829                });
830            }
831            EventRequest::Fact(..) => {
832                if let Some(properties) = properties {
833                    metadata.properties = properties;
834                }
835            }
836            EventRequest::Transfer(transfer_request) => {
837                metadata.new_owner = Some(transfer_request.new_owner.clone());
838            }
839            EventRequest::Confirm(..) => {
840                if let Some(new_owner) = metadata.new_owner.take() {
841                    metadata.owner = new_owner;
842                } else {
843                    return Err(ValidatorError::InvalidData {
844                        value: "new owner",
845                    });
846                }
847
848                if let Some(properties) = properties {
849                    metadata.properties = properties;
850                }
851            }
852            EventRequest::Reject(..) => metadata.new_owner = None,
853            EventRequest::EOL(..) => metadata.active = false,
854        }
855
856        if metadata.schema_id.is_gov() {
857            let mut gov_data =
858                serde_json::from_value::<GovernanceData>(metadata.properties.0)
859                    .map_err(|_| ValidatorError::InvalidData {
860                        value: "metadata properties",
861                    })?;
862
863            gov_data.version += 1;
864            metadata.properties = gov_data.to_value_wrapper();
865        }
866
867        Ok(metadata)
868    }
869
870    async fn create_res(
871        &self,
872        ctx: &mut ActorContext<Self>,
873        reboot: bool,
874        validation_req: &Signed<ValidationReq>,
875    ) -> Result<ValidationRes, ValidatorError> {
876        if reboot {
877            Ok(ValidationRes::Reboot)
878        } else {
879            match validation_req.content() {
880                ValidationReq::Create {
881                    event_request,
882                    gov_version,
883                    subject_id,
884                } => {
885                    if let EventRequest::Create(create) =
886                        event_request.content()
887                    {
888                        if let Some(name) = &create.name
889                            && (name.is_empty() || name.len() > 100)
890                        {
891                            return Err(ValidatorError::InvalidData {
892                                value: "create event name",
893                            });
894                        }
895
896                        if let Some(description) = &create.description
897                            && (description.is_empty()
898                                || description.len() > 200)
899                        {
900                            return Err(ValidatorError::InvalidData {
901                                value: "create event description",
902                            });
903                        }
904
905                        if !create.schema_id.is_valid_in_request() {
906                            return Err(ValidatorError::InvalidData {
907                                value: "create event schema_id",
908                            });
909                        }
910
911                        if create.schema_id.is_gov() {
912                            if !create.governance_id.is_empty() {
913                                return Err(ValidatorError::InvalidData {
914                                    value: "create event governance_id",
915                                });
916                            }
917
918                            if !create.namespace.is_empty() {
919                                return Err(ValidatorError::InvalidData {
920                                    value: "create event namespace",
921                                });
922                            }
923                        } else if create.governance_id.is_empty() {
924                            return Err(ValidatorError::InvalidData {
925                                value: "create event governance_id",
926                            });
927                        }
928
929                        let subject_id_worker =
930                            hash_borsh(&*self.hash.hasher(), &event_request)
931                                .map_err(|e| ValidatorError::InternalError {
932                                    problem: e.to_string(),
933                                })?;
934
935                        if subject_id != &subject_id_worker {
936                            return Err(ValidatorError::InvalidData {
937                                value: "subject_id",
938                            });
939                        }
940
941                        let init_state = self.init_state.as_ref().map_or_else(
942                            || {
943                                let governance_data = GovernanceData::new(
944                                    validation_req.signature().signer.clone(),
945                                );
946
947                                governance_data.to_value_wrapper()
948                            },
949                            |init_state| init_state.clone(),
950                        );
951
952                        let governance_id = if create.schema_id.is_gov() {
953                            subject_id.clone()
954                        } else {
955                            create.governance_id.clone()
956                        };
957
958                        let subject_metadata = Metadata {
959                            name: create.name.clone(),
960                            description: create.description.clone(),
961                            subject_id: subject_id_worker,
962                            governance_id,
963                            genesis_gov_version: *gov_version,
964                            prev_ledger_event_hash: DigestIdentifier::default(),
965                            schema_id: create.schema_id.clone(),
966                            namespace: create.namespace.clone(),
967                            sn: 0,
968                            creator: validation_req.signature().signer.clone(),
969                            owner: validation_req.signature().signer.clone(),
970                            new_owner: None,
971                            active: true,
972                            properties: init_state,
973                        };
974
975                        let vali_req_hash =
976                            hash_borsh(&*self.hash.hasher(), &validation_req)
977                                .map_err(|e| ValidatorError::InternalError {
978                                problem: e.to_string(),
979                            })?;
980
981                        Ok(ValidationRes::Create {
982                            vali_req_hash,
983                            subject_metadata: Box::new(subject_metadata),
984                        })
985                    } else {
986                        Err(ValidatorError::InvalidData {
987                            value: "event type",
988                        })
989                    }
990                }
991                ValidationReq::Event {
992                    actual_protocols,
993                    event_request,
994                    metadata,
995                    last_data,
996                    gov_version,
997                    sn,
998                    ledger_hash,
999                } => {
1000                    let signer = validation_req.signature().signer.clone();
1001                    Self::check_basic_data(
1002                        event_request,
1003                        metadata,
1004                        &signer,
1005                        *gov_version,
1006                        *sn,
1007                    )?;
1008
1009                    let properties = self
1010                        .check_actual_protocols(
1011                            ctx,
1012                            metadata,
1013                            actual_protocols,
1014                            &EventRequestType::from(event_request.content()),
1015                            *gov_version,
1016                            signer,
1017                        )
1018                        .await?;
1019
1020                    self.check_last_vali_data(ctx, metadata, last_data).await?;
1021
1022                    let is_success = actual_protocols.is_success();
1023
1024                    let modified_metadata = Self::create_modified_metadata(
1025                        is_success,
1026                        event_request.content(),
1027                        properties,
1028                        ledger_hash.clone(),
1029                        *metadata.clone(),
1030                    )?;
1031
1032                    let vali_req_hash =
1033                        hash_borsh(&*self.hash.hasher(), &validation_req)
1034                            .map_err(|e| ValidatorError::InternalError {
1035                                problem: e.to_string(),
1036                            })?;
1037
1038                    let meta_wo_props = MetadataWithoutProperties::from(
1039                        modified_metadata.clone(),
1040                    );
1041                    let meta_wo_props_hash =
1042                        hash_borsh(&*self.hash.hasher(), &meta_wo_props)
1043                            .map_err(|e| ValidatorError::InternalError {
1044                                problem: e.to_string(),
1045                            })?;
1046
1047                    let propierties_hash = hash_borsh(
1048                        &*self.hash.hasher(),
1049                        &modified_metadata.properties,
1050                    )
1051                    .map_err(|e| {
1052                        ValidatorError::InternalError {
1053                            problem: e.to_string(),
1054                        }
1055                    })?;
1056
1057                    let event_request_hash =
1058                        self.event_request_hash(event_request)?;
1059                    let viewpoints_hash =
1060                        self.viewpoints_hash(event_request.content())?;
1061
1062                    Ok(ValidationRes::Response {
1063                        vali_req_hash,
1064                        modified_metadata_without_propierties_hash:
1065                            meta_wo_props_hash,
1066                        propierties_hash,
1067                        event_request_hash,
1068                        viewpoints_hash,
1069                    })
1070                }
1071            }
1072        }
1073    }
1074}
1075
1076#[derive(Debug, Clone)]
1077pub enum ValiWorkerMessage {
1078    UpdateCurrentRoles {
1079        gov_version: u64,
1080        current_roles: CurrentWorkerRoles,
1081    },
1082    LocalValidation {
1083        validation_req: Box<Signed<ValidationReq>>,
1084    },
1085    NetworkRequest {
1086        validation_req: Box<Signed<ValidationReq>>,
1087        sender: PublicKey,
1088        info: ComunicateInfo,
1089    },
1090}
1091
1092impl Message for ValiWorkerMessage {}
1093
1094#[async_trait]
1095impl Actor for ValiWorker {
1096    type Event = ();
1097    type Message = ValiWorkerMessage;
1098    type Response = ();
1099
1100    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
1101        parent_span.map_or_else(
1102            || info_span!("ValiWorker", id),
1103            |parent_span| info_span!(parent: parent_span, "ValiWorker", id),
1104        )
1105    }
1106}
1107
1108impl NotPersistentActor for ValiWorker {}
1109
1110#[async_trait]
1111impl Handler<Self> for ValiWorker {
1112    async fn handle_message(
1113        &mut self,
1114        _sender: ActorPath,
1115        msg: ValiWorkerMessage,
1116        ctx: &mut ActorContext<Self>,
1117    ) -> Result<(), ActorError> {
1118        match msg {
1119            ValiWorkerMessage::UpdateCurrentRoles {
1120                gov_version,
1121                current_roles,
1122            } => {
1123                self.gov_version = gov_version;
1124                self.current_roles = current_roles;
1125            }
1126            ValiWorkerMessage::LocalValidation { validation_req } => {
1127                let validation =
1128                    match self.create_res(ctx, false, &validation_req).await {
1129                        Ok(vali) => vali,
1130                        Err(e) => {
1131                            if matches!(e, ValidatorError::OutOfVersion) {
1132                                ValidationRes::Reboot
1133                            } else {
1134                                return Err(emit_fail(
1135                                    ctx,
1136                                    ActorError::FunctionalCritical {
1137                                        description: e.to_string(),
1138                                    },
1139                                )
1140                                .await);
1141                            }
1142                        }
1143                    };
1144
1145                let signature = match get_sign(
1146                    ctx,
1147                    SignTypesNode::ValidationRes(validation.clone()),
1148                )
1149                .await
1150                {
1151                    Ok(signature) => signature,
1152                    Err(e) => {
1153                        error!(
1154                            msg_type = "LocalValidation",
1155                            error = %e,
1156                            "Failed to sign validator response"
1157                        );
1158                        return Err(emit_fail(ctx, e).await);
1159                    }
1160                };
1161
1162                match ctx.get_parent::<Validation>().await {
1163                    Ok(validation_actor) => {
1164                        validation_actor
1165                            .tell(ValidationMessage::Response {
1166                                validation_res: Box::new(validation),
1167                                sender: (*self.our_key).clone(),
1168                                signature: Some(signature),
1169                            })
1170                            .await?;
1171
1172                        debug!(
1173                            msg_type = "LocalValidation",
1174                            "Validation completed and sent to parent"
1175                        );
1176                    }
1177                    Err(e) => {
1178                        error!(
1179                            msg_type = "LocalValidation",
1180                            "Failed to obtain Validation actor"
1181                        );
1182                        return Err(e);
1183                    }
1184                };
1185
1186                ctx.stop(None).await;
1187            }
1188            ValiWorkerMessage::NetworkRequest {
1189                validation_req,
1190                info,
1191                sender,
1192            } => {
1193                if sender != validation_req.signature().signer
1194                    || sender != self.node_key
1195                {
1196                    warn!(
1197                        msg_type = "NetworkRequest",
1198                        expected_sender = %self.node_key,
1199                        received_sender = %sender,
1200                        signer = %validation_req.signature().signer,
1201                        "Unexpected sender"
1202                    );
1203                    if self.stop {
1204                        ctx.stop(None).await;
1205                    }
1206
1207                    return Ok(());
1208                }
1209
1210                let reboot = match self
1211                    .check_governance(
1212                        validation_req.content().get_gov_version(),
1213                    )
1214                    .await
1215                {
1216                    Ok(reboot) => reboot,
1217                    Err(e) => {
1218                        warn!(
1219                            msg_type = "NetworkRequest",
1220                            error = %e,
1221                            "Failed to check governance"
1222                        );
1223                        if let ActorError::Functional { .. } = e {
1224                            if self.stop {
1225                                ctx.stop(None).await;
1226                            }
1227
1228                            return Err(e);
1229                        } else {
1230                            return Err(emit_fail(ctx, e).await);
1231                        }
1232                    }
1233                };
1234
1235                let validation = if let Err(error) =
1236                    self.check_data(&validation_req)
1237                {
1238                    ValidationRes::Abort(error.to_string())
1239                } else {
1240                    match self.create_res(ctx, reboot, &validation_req).await {
1241                        Ok(vali) => vali,
1242                        Err(e) => {
1243                            if let ValidatorError::InternalError { .. } = e {
1244                                error!(
1245                                    msg_type = "NetworkRequest",
1246                                    error = %e,
1247                                    "Internal error during validation"
1248                                );
1249
1250                                return Err(emit_fail(
1251                                    ctx,
1252                                    ActorError::FunctionalCritical {
1253                                        description: e.to_string(),
1254                                    },
1255                                )
1256                                .await);
1257                            } else if matches!(e, ValidatorError::OutOfVersion)
1258                            {
1259                                ValidationRes::Reboot
1260                            } else {
1261                                ValidationRes::Abort(e.to_string())
1262                            }
1263                        }
1264                    }
1265                };
1266
1267                let signature = match get_sign(
1268                    ctx,
1269                    SignTypesNode::ValidationRes(validation.clone()),
1270                )
1271                .await
1272                {
1273                    Ok(signature) => signature,
1274                    Err(e) => {
1275                        error!(
1276                            msg_type = "NetworkRequest",
1277                            error = %e,
1278                            "Failed to sign validation response"
1279                        );
1280                        return Err(emit_fail(ctx, e).await);
1281                    }
1282                };
1283
1284                let new_info = ComunicateInfo {
1285                    receiver: sender,
1286                    request_id: info.request_id,
1287                    version: info.version,
1288                    receiver_actor: format!(
1289                        "/user/request/{}/validation/{}",
1290                        validation_req.content().get_subject_id(),
1291                        self.our_key.clone()
1292                    ),
1293                };
1294
1295                let signed_response: Signed<ValidationRes> =
1296                    Signed::from_parts(validation, signature);
1297                if let Err(e) = self
1298                    .network
1299                    .send_command(ave_network::CommandHelper::SendMessage {
1300                        message: NetworkMessage {
1301                            info: new_info.clone(),
1302                            message: ActorMessage::ValidationRes {
1303                                res: signed_response,
1304                            },
1305                        },
1306                    })
1307                    .await
1308                {
1309                    error!(
1310                        msg_type = "NetworkRequest",
1311                        error = %e,
1312                        "Failed to send response to network"
1313                    );
1314                    return Err(emit_fail(ctx, e).await);
1315                } else {
1316                    debug!(
1317                        msg_type = "NetworkRequest",
1318                        receiver = %new_info.receiver,
1319                        request_id = %new_info.request_id,
1320                        "Validation response sent to network"
1321                    );
1322                }
1323
1324                if self.stop {
1325                    ctx.stop(None).await;
1326                }
1327            }
1328        }
1329
1330        Ok(())
1331    }
1332
1333    async fn on_child_fault(
1334        &mut self,
1335        error: ActorError,
1336        ctx: &mut ActorContext<Self>,
1337    ) -> ChildAction {
1338        error!(
1339            node_key = %self.node_key,
1340            governance_id = %self.governance_id,
1341            gov_version = self.gov_version,
1342            sn = self.sn,
1343            error = %error,
1344            "Child fault in validation worker"
1345        );
1346        emit_fail(ctx, error).await;
1347        ChildAction::Stop
1348    }
1349}