Skip to main content

ave_core/validation/
worker.rs

1use std::{collections::HashSet, sync::Arc};
2
3use crate::{
4    approval::response::ApprovalRes,
5    evaluation::response::EvaluationRes,
6    governance::{
7        data::GovernanceData,
8        model::Quorum,
9        role_register::{RoleDataRegister, SearchRole},
10    },
11    helpers::network::{NetworkMessage, service::NetworkSender},
12    model::{
13        common::{
14            check_quorum_signers, emit_fail, get_actual_roles_register,
15            get_validation_roles_register,
16            node::{SignTypesNode, get_sign},
17        },
18        event::{ApprovalData, EvaluationData, EvaluationResponse},
19    },
20    subject::{Metadata, RequestSubjectData},
21    validation::{
22        request::{ActualProtocols, LastData},
23        response::ValidatorError,
24    },
25};
26
27use crate::helpers::network::ActorMessage;
28
29use async_trait::async_trait;
30use ave_common::{
31    ValueWrapper,
32    bridge::request::EventRequestType,
33    identity::{
34        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
35    },
36    request::EventRequest,
37};
38use borsh::{BorshDeserialize, BorshSerialize};
39
40use json_patch::{Patch, patch};
41use network::ComunicateInfo;
42
43use ave_actors::{
44    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
45    NotPersistentActor,
46};
47
48use tracing::{Span, debug, error, info_span, warn};
49
50use super::{
51    Validation, ValidationMessage, request::ValidationReq,
52    response::ValidationRes,
53};
54
/// Evaluation and approval role registers grouped into a single value.
///
/// Unlike [`CurrentWorkerRoles`], which has the same two fields, this type
/// also derives the serde and Borsh (de)serialization traits.
// NOTE(review): nothing in the visible part of this file constructs or reads
// this type; the name suggests it carries the roles attached to one request —
// confirm against its users.
#[derive(
    Clone,
    Debug,
    serde::Serialize,
    serde::Deserialize,
    BorshSerialize,
    BorshDeserialize,
)]
pub struct CurrentRequestRoles {
    /// Role register for the evaluation protocol.
    pub evaluation: RoleDataRegister,
    /// Role register for the approval protocol.
    pub approval: RoleDataRegister,
}
68
/// Evaluation and approval role registers held by a [`ValiWorker`].
///
/// The worker hands out clones of these registers when a request carries the
/// same governance version as the worker itself, instead of asking for the
/// registers of that version.
#[derive(Clone, Debug)]
pub struct CurrentWorkerRoles {
    /// Role register used to check evaluation signers and quorum.
    pub evaluation: RoleDataRegister,
    /// Role register used to check approval signers and quorum.
    pub approval: RoleDataRegister,
}
74
/// State of a validation worker.
///
/// The worker checks a signed validation request (signatures, metadata,
/// evaluation / approval data and the previous validation) and builds the
/// validation response for it.
#[derive(Clone, Debug)]
pub struct ValiWorker {
    /// Key reported as `sender` when a request is rejected because of its
    /// governance version.
    // NOTE(review): presumably the key of the node that sent the request —
    // confirm.
    pub node_key: PublicKey,
    /// Not read in the visible part of this file.
    // NOTE(review): presumably this node's own key — confirm.
    pub our_key: Arc<PublicKey>,
    /// Initial properties given to a newly created subject. When `None`, the
    /// initial properties are built from a fresh `GovernanceData` for the
    /// request signer.
    pub init_state: Option<ValueWrapper>,
    /// Governance this worker validates for; requests for a different
    /// governance id are rejected.
    pub governance_id: DigestIdentifier,
    /// Governance version known locally, compared against the version carried
    /// by each request.
    pub gov_version: u64,
    /// Not read in the visible part of this file.
    // NOTE(review): presumably the subject sequence number being validated —
    // confirm.
    pub sn: u64,
    /// Hash algorithm whose hasher is used for every Borsh hash computed here.
    pub hash: HashAlgorithm,
    /// Network sender; not used in the visible part of this file.
    pub network: Arc<NetworkSender>,
    /// Role registers matching `gov_version`.
    pub current_roles: CurrentWorkerRoles,
    /// Not read in the visible part of this file.
    // NOTE(review): looks like a "stop after answering" flag — confirm.
    pub stop: bool,
}
88
89impl ValiWorker {
90    fn current_evaluation_roles(&self) -> RoleDataRegister {
91        self.current_roles.evaluation.clone()
92    }
93
94    fn current_approval_roles(&self) -> RoleDataRegister {
95        self.current_roles.approval.clone()
96    }
97
98    async fn check_governance(
99        &self,
100        gov_version: u64,
101    ) -> Result<bool, ActorError> {
102        match self.gov_version.cmp(&gov_version) {
103            std::cmp::Ordering::Less => {
104                warn!(
105                    local_gov_version = self.gov_version,
106                    request_gov_version = gov_version,
107                    governance_id = %self.governance_id,
108                    sender = %self.node_key,
109                    "Received request with a higher governance version; ignoring request"
110                );
111                Err(ActorError::Functional {
112                    description:
113                        "Abort validation, request governance version is higher than local"
114                            .to_owned(),
115                })
116            }
117            std::cmp::Ordering::Equal => {
118                // If it is the same it means that we have the latest version of governance, we are up to date.
119                Ok(false)
120            }
121            std::cmp::Ordering::Greater => Ok(true),
122        }
123    }
124
125    fn check_data(
126        &self,
127        validation_req: &Signed<ValidationReq>,
128    ) -> Result<(), ValidatorError> {
129        if !validation_req.content().is_valid() {
130            return Err(ValidatorError::InvalidData {
131                value: "validation request",
132            });
133        }
134
135        let governance_id = validation_req
136            .content()
137            .get_governance_id()
138            .map_err(|_| ValidatorError::InvalidData {
139                value: "governance_id",
140            })?;
141
142        if governance_id != self.governance_id {
143            return Err(ValidatorError::InvalidData {
144                value: "governance_id",
145            });
146        }
147
148        if validation_req.verify().is_err() {
149            return Err(ValidatorError::InvalidSignature {
150                data: "validation request",
151            });
152        }
153
154        if validation_req
155            .content()
156            .get_signed_event_request()
157            .verify()
158            .is_err()
159        {
160            return Err(ValidatorError::InvalidSignature {
161                data: "event request",
162            });
163        }
164
165        Ok(())
166    }
167
168    fn check_metadata(
169        event_type: &EventRequestType,
170        metadata: &Metadata,
171        gov_version: u64,
172    ) -> Result<(), ValidatorError> {
173        let is_gov = metadata.schema_id.is_gov();
174
175        if let Some(name) = &metadata.name
176            && (name.is_empty() || name.len() > 100)
177        {
178            return Err(ValidatorError::InvalidData {
179                value: "metadata name",
180            });
181        }
182
183        if let Some(description) = &metadata.description
184            && (description.is_empty() || description.len() > 200)
185        {
186            return Err(ValidatorError::InvalidData {
187                value: "metadata description",
188            });
189        }
190
191        if metadata.subject_id.is_empty() {
192            return Err(ValidatorError::InvalidData {
193                value: "metadata subject_id",
194            });
195        }
196
197        if is_gov && metadata.governance_id != metadata.subject_id
198            || !is_gov && metadata.governance_id == metadata.subject_id
199        {
200            return Err(ValidatorError::InvalidData {
201                value: "metadata governance_id",
202            });
203        }
204
205        if is_gov && metadata.genesis_gov_version != 0
206            || !is_gov && metadata.genesis_gov_version == 0
207        {
208            return Err(ValidatorError::InvalidData {
209                value: "metadata genesis_gov_version",
210            });
211        }
212
213        if metadata.genesis_gov_version > gov_version {
214            return Err(ValidatorError::InvalidData {
215                value: "metadata genesis_gov_version",
216            });
217        }
218
219        if metadata.sn == 0 && !metadata.prev_ledger_event_hash.is_empty()
220            || metadata.sn != 0 && metadata.prev_ledger_event_hash.is_empty()
221        {
222            return Err(ValidatorError::InvalidData {
223                value: "metadata prev_ledger_event_hash",
224            });
225        };
226
227        if !metadata.schema_id.is_valid_in_request() {
228            return Err(ValidatorError::InvalidData {
229                value: "metadata schema_id",
230            });
231        };
232
233        if is_gov && !metadata.namespace.is_empty() {
234            return Err(ValidatorError::InvalidData {
235                value: "metadata namespace",
236            });
237        }
238
239        if metadata.creator.is_empty() {
240            return Err(ValidatorError::InvalidData {
241                value: "metadata creator",
242            });
243        }
244
245        if metadata.owner.is_empty() {
246            return Err(ValidatorError::InvalidData {
247                value: "metadata owner",
248            });
249        }
250
251        if let Some(new_owner) = &metadata.new_owner
252            && (new_owner.is_empty() || new_owner == &metadata.owner)
253        {
254            return Err(ValidatorError::InvalidData {
255                value: "metadata new owner",
256            });
257        };
258
259        if !metadata.active {
260            return Err(ValidatorError::InvalidData {
261                value: "metadata active",
262            });
263        }
264
265        match event_type {
266            EventRequestType::Create => {
267                return Err(ValidatorError::InvalidData {
268                    value: "Event request type",
269                });
270            }
271            EventRequestType::Confirm | EventRequestType::Reject => {
272                if metadata.new_owner.is_none() {
273                    return Err(ValidatorError::InvalidData {
274                        value: "Event request type",
275                    });
276                }
277            }
278            EventRequestType::Fact
279            | EventRequestType::Transfer
280            | EventRequestType::Eol => {
281                if metadata.new_owner.is_some() {
282                    return Err(ValidatorError::InvalidData {
283                        value: "Event request type",
284                    });
285                }
286            }
287        };
288
289        Ok(())
290    }
291
292    fn check_basic_data(
293        request: &Signed<EventRequest>,
294        metadata: &Metadata,
295        vali_req_signer: &PublicKey,
296        gov_version: u64,
297        sn: u64,
298    ) -> Result<(), ValidatorError> {
299        // Check event request.
300
301        if request.verify().is_err() {
302            return Err(ValidatorError::InvalidSignature {
303                data: "event request",
304            });
305        }
306
307        Self::check_metadata(
308            &EventRequestType::from(request.content()),
309            metadata,
310            gov_version,
311        )?;
312
313        if !request.content().check_request_signature(
314            &request.signature().signer,
315            &metadata.owner,
316            &metadata.new_owner,
317        ) {
318            return Err(ValidatorError::InvalidSigner {
319                signer: request.signature().signer.to_string(),
320            });
321        }
322
323        // subject
324        if request.content().get_subject_id() != metadata.subject_id {
325            return Err(ValidatorError::InvalidData {
326                value: "Subject_id",
327            });
328        }
329
330        // vali request signer
331        let signer = metadata
332            .new_owner
333            .clone()
334            .unwrap_or_else(|| metadata.owner.clone());
335
336        if &signer != vali_req_signer {
337            return Err(ValidatorError::InvalidSigner {
338                signer: vali_req_signer.to_string(),
339            });
340        }
341
342        // sn
343        if sn != metadata.sn + 1 {
344            return Err(ValidatorError::InvalidData { value: "sn" });
345        }
346        Ok(())
347    }
348
349    fn check_approval_signers(
350        agrees: &HashSet<PublicKey>,
351        disagrees: &HashSet<PublicKey>,
352        timeout: &HashSet<PublicKey>,
353        workers: &HashSet<PublicKey>,
354    ) -> bool {
355        agrees.is_subset(workers)
356            && disagrees.is_subset(workers)
357            && timeout.is_subset(workers)
358    }
359
360    fn check_approval_quorum(
361        agrees: u32,
362        timeout: u32,
363        quorum: &Quorum,
364        workers: &HashSet<PublicKey>,
365        approved: bool,
366    ) -> bool {
367        if approved {
368            quorum.check_quorum(workers.len() as u32, agrees + timeout)
369        } else {
370            !quorum.check_quorum(workers.len() as u32, agrees + timeout)
371        }
372    }
373
374    fn check_approval(
375        approval: ApprovalData,
376        appr_data: RoleDataRegister,
377        req_subject_data_hash: DigestIdentifier,
378        signer: PublicKey,
379    ) -> Result<(), ValidatorError> {
380        if signer != approval.approval_req_signature.signer {
381            return Err(ValidatorError::InvalidSigner {
382                signer: signer.to_string(),
383            });
384        }
385
386        let agrees = approval
387            .approvers_agrees_signatures
388            .iter()
389            .map(|x| x.signer.clone())
390            .collect::<HashSet<PublicKey>>();
391
392        let timeout = approval
393            .approvers_timeout
394            .iter()
395            .map(|x| x.who.clone())
396            .collect::<HashSet<PublicKey>>();
397
398        let disagrees = approval
399            .approvers_disagrees_signatures
400            .iter()
401            .map(|x| x.signer.clone())
402            .collect::<HashSet<PublicKey>>();
403
404        if !Self::check_approval_signers(
405            &agrees,
406            &disagrees,
407            &timeout,
408            &appr_data.workers,
409        ) {
410            return Err(ValidatorError::InvalidOperation {
411                action: "verify approval signers",
412            });
413        }
414
415        if !Self::check_approval_quorum(
416            agrees.len() as u32,
417            timeout.len() as u32,
418            &appr_data.quorum,
419            &appr_data.workers,
420            approval.approved,
421        ) {
422            return Err(ValidatorError::InvalidOperation {
423                action: "verify approval quorum",
424            });
425        }
426
427        let agrees_res = ApprovalRes::Response {
428            approval_req_hash: approval.approval_req_hash.clone(),
429            agrees: true,
430            req_subject_data_hash: req_subject_data_hash.clone(),
431        };
432        for signature in approval.approvers_agrees_signatures.iter() {
433            let signed_res =
434                Signed::from_parts(agrees_res.clone(), signature.clone());
435
436            if signed_res.verify().is_err() {
437                return Err(ValidatorError::InvalidSignature {
438                    data: "approval agrees",
439                });
440            }
441        }
442
443        let disagrees_res = ApprovalRes::Response {
444            approval_req_hash: approval.approval_req_hash.clone(),
445            agrees: false,
446            req_subject_data_hash,
447        };
448        for signature in approval.approvers_disagrees_signatures.iter() {
449            let signed_res =
450                Signed::from_parts(disagrees_res.clone(), signature.clone());
451
452            if signed_res.verify().is_err() {
453                return Err(ValidatorError::InvalidSignature {
454                    data: "approval disagrees",
455                });
456            }
457        }
458
459        Ok(())
460    }
461
462    fn check_evaluation(
463        &self,
464        evaluation: EvaluationData,
465        eval_data: RoleDataRegister,
466        mut properties: ValueWrapper,
467        req_subject_data_hash: DigestIdentifier,
468        signer: PublicKey,
469    ) -> Result<(bool, ValueWrapper), ValidatorError> {
470        if signer != evaluation.eval_req_signature.signer {
471            return Err(ValidatorError::InvalidSigner {
472                signer: signer.to_string(),
473            });
474        }
475
476        if !check_quorum_signers(
477            &evaluation
478                .evaluators_signatures
479                .iter()
480                .map(|x| x.signer.clone())
481                .collect::<HashSet<PublicKey>>(),
482            &eval_data.quorum,
483            &eval_data.workers,
484        ) {
485            return Err(ValidatorError::InvalidOperation {
486                action: "verify evaluation quorum",
487            });
488        }
489
490        let eval_res = match evaluation.response.clone() {
491            EvaluationResponse::Ok(evaluator_response) => {
492                EvaluationRes::Response {
493                    response: evaluator_response,
494                    eval_req_hash: evaluation.eval_req_hash.clone(),
495                    req_subject_data_hash,
496                }
497            }
498            EvaluationResponse::Error(evaluator_error) => {
499                EvaluationRes::Error {
500                    error: evaluator_error,
501                    eval_req_hash: evaluation.eval_req_hash.clone(),
502                    req_subject_data_hash,
503                }
504            }
505        };
506
507        for signature in evaluation.evaluators_signatures.iter() {
508            let signed_res =
509                Signed::from_parts(eval_res.clone(), signature.clone());
510
511            if signed_res.verify().is_err() {
512                return Err(ValidatorError::InvalidSignature {
513                    data: "evaluation",
514                });
515            }
516        }
517
518        let appr_required = if let Some(evaluator_res) =
519            evaluation.evaluator_res()
520        {
521            let json_patch =
522                serde_json::from_value::<Patch>(evaluator_res.patch.0)
523                    .map_err(|_| ValidatorError::InvalidData {
524                        value: "evaluation patch",
525                    })?;
526
527            patch(&mut properties.0, &json_patch).map_err(|_| {
528                ValidatorError::InvalidOperation {
529                    action: "apply patch",
530                }
531            })?;
532
533            let properties_hash = hash_borsh(&*self.hash.hasher(), &properties)
534                .map_err(|e| ValidatorError::InternalError {
535                    problem: e.to_string(),
536                })?;
537
538            if properties_hash != evaluator_res.properties_hash {
539                return Err(ValidatorError::InvalidData {
540                    value: "properties_hash",
541                });
542            }
543
544            evaluator_res.appr_required
545        } else {
546            false
547        };
548
549        Ok((appr_required, properties))
550    }
551
552    async fn check_actual_protocols(
553        &self,
554        ctx: &mut ActorContext<Self>,
555        metadata: &Metadata,
556        actual_protocols: &ActualProtocols,
557        event_type: &EventRequestType,
558        gov_version: u64,
559        signer: PublicKey,
560    ) -> Result<Option<ValueWrapper>, ValidatorError> {
561        if !actual_protocols
562            .check_protocols(metadata.schema_id.is_gov(), event_type)
563        {
564            return Err(ValidatorError::InvalidData {
565                value: "actual protocols",
566            });
567        }
568
569        let (evaluation, approval) = match &actual_protocols {
570            ActualProtocols::None => (None, None),
571            ActualProtocols::Eval { eval_data } => {
572                (Some(eval_data.clone()), None)
573            }
574            ActualProtocols::EvalApprove {
575                eval_data,
576                approval_data,
577            } => (Some(eval_data.clone()), Some(approval_data.clone())),
578        };
579
580        let properties = if let Some(evaluation) = evaluation {
581            let req_subject_data_hash = hash_borsh(
582                &*self.hash.hasher(),
583                &RequestSubjectData {
584                    subject_id: metadata.subject_id.clone(),
585                    governance_id: metadata.governance_id.clone(),
586                    sn: metadata.sn + 1,
587                    namespace: metadata.namespace.clone(),
588                    schema_id: metadata.schema_id.clone(),
589                    gov_version,
590                    signer: signer.clone(),
591                },
592            )
593            .map_err(|e| ValidatorError::InternalError {
594                problem: e.to_string(),
595            })?;
596
597            let (eval_data, appro_data) = if gov_version == self.gov_version {
598                (
599                    self.current_evaluation_roles(),
600                    approval.as_ref().map(|_| self.current_approval_roles()),
601                )
602            } else {
603                get_actual_roles_register(
604                    ctx,
605                    &metadata.governance_id,
606                    SearchRole {
607                        schema_id: metadata.schema_id.clone(),
608                        namespace: metadata.namespace.clone(),
609                    },
610                    approval.is_some(),
611                    gov_version,
612                )
613                .await
614                .map_err(|e| {
615                    if let ActorError::UnexpectedResponse { .. } = e {
616                        ValidatorError::OutOfVersion
617                    } else {
618                        ValidatorError::InternalError {
619                            problem: e.to_string(),
620                        }
621                    }
622                })?
623            };
624
625            let (appr_required, properties) = self.check_evaluation(
626                evaluation,
627                eval_data,
628                metadata.properties.clone(),
629                req_subject_data_hash.clone(),
630                signer.clone(),
631            )?;
632
633            if let Some(approval) = approval
634                && let Some(appr_data) = appro_data
635            {
636                if !appr_required {
637                    return Err(ValidatorError::InvalidData {
638                        value: "evaluation appr_required",
639                    });
640                }
641
642                Self::check_approval(
643                    approval,
644                    appr_data,
645                    req_subject_data_hash,
646                    signer,
647                )?;
648            } else if appr_required {
649                return Err(ValidatorError::InvalidData {
650                    value: "evaluation appr_required",
651                });
652            }
653
654            Some(properties)
655        } else {
656            None
657        };
658
659        Ok(properties)
660    }
661
662    async fn check_last_vali_data(
663        &self,
664        ctx: &mut ActorContext<Self>,
665        metadata: &Metadata,
666        last_validation: &LastData,
667    ) -> Result<(), ValidatorError> {
668        let vali_data = get_validation_roles_register(
669            ctx,
670            &metadata.governance_id,
671            SearchRole {
672                schema_id: metadata.schema_id.clone(),
673                namespace: metadata.namespace.clone(),
674            },
675            last_validation.gov_version,
676        )
677        .await
678        .map_err(|e| {
679            if let ActorError::UnexpectedResponse { .. } = e {
680                ValidatorError::InvalidData {
681                    value: "gov_version",
682                }
683            } else {
684                ValidatorError::InternalError {
685                    problem: e.to_string(),
686                }
687            }
688        })?;
689
690        if !check_quorum_signers(
691            &last_validation
692                .vali_data
693                .validators_signatures
694                .iter()
695                .map(|x| x.signer.clone())
696                .collect::<HashSet<PublicKey>>(),
697            &vali_data.quorum,
698            &vali_data.workers,
699        ) {
700            return Err(ValidatorError::InvalidOperation {
701                action: "verify validation quorum",
702            });
703        }
704
705        let vali_req_hash =
706            last_validation.vali_data.validation_req_hash.clone();
707        let vali_res = if metadata.sn == 0 {
708            ValidationRes::Create {
709                vali_req_hash,
710                subject_metadata: Box::new(metadata.clone()),
711            }
712        } else {
713            let hash_metadata =
714                hash_borsh(&*self.hash.hasher(), &metadata.clone()).map_err(
715                    |e| ValidatorError::InternalError {
716                        problem: e.to_string(),
717                    },
718                )?;
719
720            ValidationRes::Response {
721                vali_req_hash,
722                modified_metadata_hash: hash_metadata,
723            }
724        };
725
726        for signature in last_validation.vali_data.validators_signatures.iter()
727        {
728            let signed_res =
729                Signed::from_parts(vali_res.clone(), signature.clone());
730
731            if signed_res.verify().is_err() {
732                return Err(ValidatorError::InvalidSignature {
733                    data: "last validation",
734                });
735            }
736        }
737
738        Ok(())
739    }
740
741    fn create_modified_metadata(
742        is_success: bool,
743        event_request: &EventRequest,
744        properties: Option<ValueWrapper>,
745        ledger_hash: DigestIdentifier,
746        mut metadata: Metadata,
747    ) -> Result<Metadata, ValidatorError> {
748        metadata.sn += 1;
749
750        metadata.prev_ledger_event_hash = ledger_hash;
751
752        if !is_success {
753            return Ok(metadata);
754        }
755
756        match event_request {
757            EventRequest::Create(..) => {
758                return Err(ValidatorError::InvalidData {
759                    value: "Event request type",
760                });
761            }
762            EventRequest::Fact(..) => {
763                if let Some(properties) = properties {
764                    metadata.properties = properties;
765                }
766            }
767            EventRequest::Transfer(transfer_request) => {
768                metadata.new_owner = Some(transfer_request.new_owner.clone());
769            }
770            EventRequest::Confirm(..) => {
771                if let Some(new_owner) = metadata.new_owner.take() {
772                    metadata.owner = new_owner;
773                } else {
774                    return Err(ValidatorError::InvalidData {
775                        value: "new owner",
776                    });
777                }
778
779                if let Some(properties) = properties {
780                    metadata.properties = properties;
781                }
782            }
783            EventRequest::Reject(..) => metadata.new_owner = None,
784            EventRequest::EOL(..) => metadata.active = false,
785        }
786
787        if metadata.schema_id.is_gov() {
788            let mut gov_data =
789                serde_json::from_value::<GovernanceData>(metadata.properties.0)
790                    .map_err(|_| ValidatorError::InvalidData {
791                        value: "metadata properties",
792                    })?;
793
794            gov_data.version += 1;
795            metadata.properties = gov_data.to_value_wrapper();
796        }
797
798        Ok(metadata)
799    }
800
801    async fn create_res(
802        &self,
803        ctx: &mut ActorContext<Self>,
804        reboot: bool,
805        validation_req: &Signed<ValidationReq>,
806    ) -> Result<ValidationRes, ValidatorError> {
807        if reboot {
808            Ok(ValidationRes::Reboot)
809        } else {
810            match validation_req.content() {
811                ValidationReq::Create {
812                    event_request,
813                    gov_version,
814                    subject_id,
815                } => {
816                    if let EventRequest::Create(create) =
817                        event_request.content()
818                    {
819                        if let Some(name) = &create.name
820                            && (name.is_empty() || name.len() > 100)
821                        {
822                            return Err(ValidatorError::InvalidData {
823                                value: "create event name",
824                            });
825                        }
826
827                        if let Some(description) = &create.description
828                            && (description.is_empty()
829                                || description.len() > 200)
830                        {
831                            return Err(ValidatorError::InvalidData {
832                                value: "create event description",
833                            });
834                        }
835
836                        if !create.schema_id.is_valid_in_request() {
837                            return Err(ValidatorError::InvalidData {
838                                value: "create event schema_id",
839                            });
840                        }
841
842                        if create.schema_id.is_gov() {
843                            if !create.governance_id.is_empty() {
844                                return Err(ValidatorError::InvalidData {
845                                    value: "create event governance_id",
846                                });
847                            }
848
849                            if !create.namespace.is_empty() {
850                                return Err(ValidatorError::InvalidData {
851                                    value: "create event namespace",
852                                });
853                            }
854                        } else if create.governance_id.is_empty() {
855                            return Err(ValidatorError::InvalidData {
856                                value: "create event governance_id",
857                            });
858                        }
859
860                        let subject_id_worker =
861                            hash_borsh(&*self.hash.hasher(), &event_request)
862                                .map_err(|e| ValidatorError::InternalError {
863                                    problem: e.to_string(),
864                                })?;
865
866                        if subject_id != &subject_id_worker {
867                            return Err(ValidatorError::InvalidData {
868                                value: "subject_id",
869                            });
870                        }
871
872                        let init_state = self.init_state.as_ref().map_or_else(
873                            || {
874                                let governance_data = GovernanceData::new(
875                                    validation_req.signature().signer.clone(),
876                                );
877
878                                governance_data.to_value_wrapper()
879                            },
880                            |init_state| init_state.clone(),
881                        );
882
883                        let governance_id = if create.schema_id.is_gov() {
884                            subject_id.clone()
885                        } else {
886                            create.governance_id.clone()
887                        };
888
889                        let subject_metadata = Metadata {
890                            name: create.name.clone(),
891                            description: create.description.clone(),
892                            subject_id: subject_id_worker,
893                            governance_id,
894                            genesis_gov_version: *gov_version,
895                            prev_ledger_event_hash: DigestIdentifier::default(),
896                            schema_id: create.schema_id.clone(),
897                            namespace: create.namespace.clone(),
898                            sn: 0,
899                            creator: validation_req.signature().signer.clone(),
900                            owner: validation_req.signature().signer.clone(),
901                            new_owner: None,
902                            active: true,
903                            properties: init_state,
904                        };
905
906                        let vali_req_hash =
907                            hash_borsh(&*self.hash.hasher(), &validation_req)
908                                .map_err(|e| ValidatorError::InternalError {
909                                problem: e.to_string(),
910                            })?;
911
912                        Ok(ValidationRes::Create {
913                            vali_req_hash,
914                            subject_metadata: Box::new(subject_metadata),
915                        })
916                    } else {
917                        Err(ValidatorError::InvalidData {
918                            value: "event type",
919                        })
920                    }
921                }
922                ValidationReq::Event {
923                    actual_protocols,
924                    event_request,
925                    metadata,
926                    last_data,
927                    gov_version,
928                    sn,
929                    ledger_hash,
930                } => {
931                    let signer = validation_req.signature().signer.clone();
932                    Self::check_basic_data(
933                        event_request,
934                        metadata,
935                        &signer,
936                        *gov_version,
937                        *sn,
938                    )?;
939
940                    let properties = self
941                        .check_actual_protocols(
942                            ctx,
943                            metadata,
944                            actual_protocols,
945                            &EventRequestType::from(event_request.content()),
946                            *gov_version,
947                            signer,
948                        )
949                        .await?;
950
951                    self.check_last_vali_data(ctx, metadata, last_data).await?;
952
953                    let is_success = actual_protocols.is_success();
954
955                    let modified_metadata = Self::create_modified_metadata(
956                        is_success,
957                        event_request.content(),
958                        properties,
959                        ledger_hash.clone(),
960                        *metadata.clone(),
961                    )?;
962
963                    let vali_req_hash =
964                        hash_borsh(&*self.hash.hasher(), &validation_req)
965                            .map_err(|e| ValidatorError::InternalError {
966                                problem: e.to_string(),
967                            })?;
968
969                    let modified_metadata_hash =
970                        hash_borsh(&*self.hash.hasher(), &modified_metadata)
971                            .map_err(|e| ValidatorError::InternalError {
972                                problem: e.to_string(),
973                            })?;
974
975                    Ok(ValidationRes::Response {
976                        vali_req_hash,
977                        modified_metadata_hash,
978                    })
979                }
980            }
981        }
982    }
983}
984
/// Messages accepted by the `ValiWorker` actor.
#[derive(Debug, Clone)]
pub enum ValiWorkerMessage {
    /// Replaces the governance version and role registers cached by the
    /// worker.
    UpdateCurrentRoles {
        /// Governance version to store in the worker.
        gov_version: u64,
        /// Role registers to store in the worker.
        current_roles: CurrentWorkerRoles,
    },
    /// Validation requested by the local node: the signed result is
    /// delivered to the parent `Validation` actor and the worker stops.
    LocalValidation {
        /// Signed validation request to process.
        validation_req: Box<Signed<ValidationReq>>,
    },
    /// Validation requested by a remote node: the signed result is sent
    /// back to `sender` through the network.
    NetworkRequest {
        /// Signed validation request to process.
        validation_req: Box<Signed<ValidationReq>>,
        /// Key of the requesting node; the handler rejects the message
        /// unless it matches both the request signer and the worker's
        /// `node_key`.
        sender: PublicKey,
        /// Communication metadata of the incoming message; its
        /// `request_id` and `version` are copied into the response.
        info: ComunicateInfo,
    },
}
1000
// Marker impl: lets `ValiWorkerMessage` be used as an actor message type
// (it is the `Actor::Message` of `ValiWorker`).
impl Message for ValiWorkerMessage {}
1002
1003#[async_trait]
1004impl Actor for ValiWorker {
1005    type Event = ();
1006    type Message = ValiWorkerMessage;
1007    type Response = ();
1008
1009    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
1010        parent_span.map_or_else(
1011            || info_span!("ValiWorker", id),
1012            |parent_span| info_span!(parent: parent_span, "ValiWorker", id),
1013        )
1014    }
1015}
1016
// Marker impl: presumably opts `ValiWorker` out of state persistence —
// confirm against the `ave_actors` documentation.
impl NotPersistentActor for ValiWorker {}
1018
1019#[async_trait]
1020impl Handler<Self> for ValiWorker {
1021    async fn handle_message(
1022        &mut self,
1023        _sender: ActorPath,
1024        msg: ValiWorkerMessage,
1025        ctx: &mut ActorContext<Self>,
1026    ) -> Result<(), ActorError> {
1027        match msg {
1028            ValiWorkerMessage::UpdateCurrentRoles {
1029                gov_version,
1030                current_roles,
1031            } => {
1032                self.gov_version = gov_version;
1033                self.current_roles = current_roles;
1034            }
1035            ValiWorkerMessage::LocalValidation { validation_req } => {
1036                let validation =
1037                    match self.create_res(ctx, false, &validation_req).await {
1038                        Ok(vali) => vali,
1039                        Err(e) => {
1040                            if matches!(e, ValidatorError::OutOfVersion) {
1041                                ValidationRes::Reboot
1042                            } else {
1043                                return Err(emit_fail(
1044                                    ctx,
1045                                    ActorError::FunctionalCritical {
1046                                        description: e.to_string(),
1047                                    },
1048                                )
1049                                .await);
1050                            }
1051                        }
1052                    };
1053
1054                let signature = match get_sign(
1055                    ctx,
1056                    SignTypesNode::ValidationRes(validation.clone()),
1057                )
1058                .await
1059                {
1060                    Ok(signature) => signature,
1061                    Err(e) => {
1062                        error!(
1063                            msg_type = "LocalValidation",
1064                            error = %e,
1065                            "Failed to sign validator response"
1066                        );
1067                        return Err(emit_fail(ctx, e).await);
1068                    }
1069                };
1070
1071                match ctx.get_parent::<Validation>().await {
1072                    Ok(validation_actor) => {
1073                        validation_actor
1074                            .tell(ValidationMessage::Response {
1075                                validation_res: Box::new(validation),
1076                                sender: (*self.our_key).clone(),
1077                                signature: Some(signature),
1078                            })
1079                            .await?;
1080
1081                        debug!(
1082                            msg_type = "LocalValidation",
1083                            "Validation completed and sent to parent"
1084                        );
1085                    }
1086                    Err(e) => {
1087                        error!(
1088                            msg_type = "LocalValidation",
1089                            "Failed to obtain Validation actor"
1090                        );
1091                        return Err(e);
1092                    }
1093                };
1094
1095                ctx.stop(None).await;
1096            }
1097            ValiWorkerMessage::NetworkRequest {
1098                validation_req,
1099                info,
1100                sender,
1101            } => {
1102                if sender != validation_req.signature().signer
1103                    || sender != self.node_key
1104                {
1105                    warn!(
1106                        msg_type = "NetworkRequest",
1107                        expected_sender = %self.node_key,
1108                        received_sender = %sender,
1109                        signer = %validation_req.signature().signer,
1110                        "Unexpected sender"
1111                    );
1112                    if self.stop {
1113                        ctx.stop(None).await;
1114                    }
1115
1116                    return Ok(());
1117                }
1118
1119                // TODO MUCHO CUIDADO COn esto
1120                let reboot = match self
1121                    .check_governance(
1122                        validation_req.content().get_gov_version(),
1123                    )
1124                    .await
1125                {
1126                    Ok(reboot) => reboot,
1127                    Err(e) => {
1128                        warn!(
1129                            msg_type = "NetworkRequest",
1130                            error = %e,
1131                            "Failed to check governance"
1132                        );
1133                        if let ActorError::Functional { .. } = e {
1134                            if self.stop {
1135                                ctx.stop(None).await;
1136                            }
1137
1138                            return Err(e);
1139                        } else {
1140                            return Err(emit_fail(ctx, e).await);
1141                        }
1142                    }
1143                };
1144
1145                let validation = if let Err(error) =
1146                    self.check_data(&validation_req)
1147                {
1148                    ValidationRes::Abort(error.to_string())
1149                } else {
1150                    match self.create_res(ctx, reboot, &validation_req).await {
1151                        Ok(vali) => vali,
1152                        Err(e) => {
1153                            if let ValidatorError::InternalError { .. } = e {
1154                                error!(
1155                                    msg_type = "NetworkRequest",
1156                                    error = %e,
1157                                    "Internal error during validation"
1158                                );
1159
1160                                return Err(emit_fail(
1161                                    ctx,
1162                                    ActorError::FunctionalCritical {
1163                                        description: e.to_string(),
1164                                    },
1165                                )
1166                                .await);
1167                            } else if matches!(e, ValidatorError::OutOfVersion)
1168                            {
1169                                ValidationRes::Reboot
1170                            } else {
1171                                ValidationRes::Abort(e.to_string())
1172                            }
1173                        }
1174                    }
1175                };
1176
1177                let signature = match get_sign(
1178                    ctx,
1179                    SignTypesNode::ValidationRes(validation.clone()),
1180                )
1181                .await
1182                {
1183                    Ok(signature) => signature,
1184                    Err(e) => {
1185                        error!(
1186                            msg_type = "NetworkRequest",
1187                            error = %e,
1188                            "Failed to sign validation response"
1189                        );
1190                        return Err(emit_fail(ctx, e).await);
1191                    }
1192                };
1193
1194                let new_info = ComunicateInfo {
1195                    receiver: sender,
1196                    request_id: info.request_id,
1197                    version: info.version,
1198                    receiver_actor: format!(
1199                        "/user/request/{}/validation/{}",
1200                        validation_req.content().get_subject_id(),
1201                        self.our_key.clone()
1202                    ),
1203                };
1204
1205                let signed_response: Signed<ValidationRes> =
1206                    Signed::from_parts(validation, signature);
1207                if let Err(e) = self
1208                    .network
1209                    .send_command(network::CommandHelper::SendMessage {
1210                        message: NetworkMessage {
1211                            info: new_info.clone(),
1212                            message: ActorMessage::ValidationRes {
1213                                res: signed_response,
1214                            },
1215                        },
1216                    })
1217                    .await
1218                {
1219                    error!(
1220                        msg_type = "NetworkRequest",
1221                        error = %e,
1222                        "Failed to send response to network"
1223                    );
1224                    return Err(emit_fail(ctx, e).await);
1225                } else {
1226                    debug!(
1227                        msg_type = "NetworkRequest",
1228                        receiver = %new_info.receiver,
1229                        request_id = %new_info.request_id,
1230                        "Validation response sent to network"
1231                    );
1232                }
1233
1234                if self.stop {
1235                    ctx.stop(None).await;
1236                }
1237            }
1238        }
1239
1240        Ok(())
1241    }
1242
1243    async fn on_child_fault(
1244        &mut self,
1245        error: ActorError,
1246        ctx: &mut ActorContext<Self>,
1247    ) -> ChildAction {
1248        error!(
1249            node_key = %self.node_key,
1250            governance_id = %self.governance_id,
1251            gov_version = self.gov_version,
1252            sn = self.sn,
1253            error = %error,
1254            "Child fault in validation worker"
1255        );
1256        emit_fail(ctx, error).await;
1257        ChildAction::Stop
1258    }
1259}