Skip to main content

ave_core/validation/
worker.rs

1use std::{collections::HashSet, sync::Arc};
2
3use crate::{
4    approval::response::ApprovalRes,
5    evaluation::response::EvaluationRes,
6    governance::{
7        data::GovernanceData,
8        model::Quorum,
9        role_register::{RoleDataRegister, SearchRole},
10    },
11    helpers::network::{NetworkMessage, service::NetworkSender},
12    model::{
13        common::{
14            check_quorum_signers, emit_fail, get_actual_roles_register,
15            get_validation_roles_register,
16            node::{
17                SignTypesNode, UpdateData, get_sign, update_ledger_network,
18            },
19        },
20        event::{ApprovalData, EvaluationData, EvaluationResponse},
21    },
22    subject::{Metadata, RequestSubjectData},
23    validation::{
24        request::{ActualProtocols, LastData},
25        response::ValidatorError,
26    },
27};
28
29use crate::helpers::network::ActorMessage;
30
31use async_trait::async_trait;
32use ave_common::{
33    ValueWrapper,
34    bridge::request::EventRequestType,
35    identity::{
36        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
37    },
38    request::EventRequest,
39};
40
41use json_patch::{Patch, patch};
42use network::ComunicateInfo;
43
44use ave_actors::{
45    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
46    NotPersistentActor,
47};
48
49use tracing::{Span, debug, error, info_span, warn};
50
51use super::{
52    Validation, ValidationMessage, request::ValidationReq,
53    response::ValidationRes,
54};
55
56/// A struct representing a ValiWorker actor.
57#[derive(Clone, Debug)]
58pub struct ValiWorker {
59    pub node_key: PublicKey,
60    pub our_key: Arc<PublicKey>,
61    pub init_state: Option<ValueWrapper>,
62    pub governance_id: DigestIdentifier,
63    pub gov_version: u64,
64    pub sn: u64,
65    pub hash: HashAlgorithm,
66    pub network: Arc<NetworkSender>,
67    pub stop: bool,
68}
69
70impl ValiWorker {
71    async fn check_governance(
72        &self,
73        gov_version: u64,
74    ) -> Result<bool, ActorError> {
75        match gov_version.cmp(&self.gov_version) {
76            std::cmp::Ordering::Equal => {
77                // If it is the same it means that we have the latest version of governance, we are up to date.
78            }
79            std::cmp::Ordering::Greater => {
80                // Me llega una versión mayor a la mía.
81                let data = UpdateData {
82                    sn: self.sn,
83                    gov_version: self.gov_version,
84                    subject_id: self.governance_id.clone(),
85                    other_node: self.node_key.clone(),
86                };
87                update_ledger_network(data, self.network.clone()).await?;
88                let e = ActorError::Functional {
89                    description: "Abort Validation, update is required"
90                        .to_owned(),
91                };
92                return Err(e);
93            }
94            std::cmp::Ordering::Less => {
95                return Ok(true);
96            }
97        }
98
99        Ok(false)
100    }
101
102    fn check_data(
103        &self,
104        validation_req: &Signed<ValidationReq>,
105    ) -> Result<(), ValidatorError> {
106        if !validation_req.content().is_valid() {
107            return Err(ValidatorError::InvalidData {
108                value: "validation request",
109            });
110        }
111
112        let governance_id = validation_req
113            .content()
114            .get_governance_id()
115            .map_err(|_| ValidatorError::InvalidData {
116                value: "governance_id",
117            })?;
118
119        if governance_id != self.governance_id {
120            return Err(ValidatorError::InvalidData {
121                value: "governance_id",
122            });
123        }
124
125        if validation_req.verify().is_err() {
126            return Err(ValidatorError::InvalidSignature {
127                data: "validation request",
128            });
129        }
130
131        if validation_req
132            .content()
133            .get_signed_event_request()
134            .verify()
135            .is_err()
136        {
137            return Err(ValidatorError::InvalidSignature {
138                data: "event request",
139            });
140        }
141
142        Ok(())
143    }
144
145    fn check_metadata(
146        event_type: &EventRequestType,
147        metadata: &Metadata,
148        gov_version: u64,
149    ) -> Result<(), ValidatorError> {
150        let is_gov = metadata.schema_id.is_gov();
151
152        if let Some(name) = &metadata.name
153            && (name.is_empty() || name.len() > 100)
154        {
155            return Err(ValidatorError::InvalidData {
156                value: "metadata name",
157            });
158        }
159
160        if let Some(description) = &metadata.description
161            && (description.is_empty() || description.len() > 200)
162        {
163            return Err(ValidatorError::InvalidData {
164                value: "metadata description",
165            });
166        }
167
168        if metadata.subject_id.is_empty() {
169            return Err(ValidatorError::InvalidData {
170                value: "metadata subject_id",
171            });
172        }
173
174        if is_gov && metadata.governance_id != metadata.subject_id
175            || !is_gov && metadata.governance_id == metadata.subject_id
176        {
177            return Err(ValidatorError::InvalidData {
178                value: "metadata governance_id",
179            });
180        }
181
182        if is_gov && metadata.genesis_gov_version != 0
183            || !is_gov && metadata.genesis_gov_version == 0
184        {
185            return Err(ValidatorError::InvalidData {
186                value: "metadata genesis_gov_version",
187            });
188        }
189
190        if metadata.genesis_gov_version > gov_version {
191            return Err(ValidatorError::InvalidData {
192                value: "metadata genesis_gov_version",
193            });
194        }
195
196        if metadata.sn == 0 && !metadata.prev_ledger_event_hash.is_empty()
197            || metadata.sn != 0 && metadata.prev_ledger_event_hash.is_empty()
198        {
199            return Err(ValidatorError::InvalidData {
200                value: "metadata prev_ledger_event_hash",
201            });
202        };
203
204        if !metadata.schema_id.is_valid_in_request() {
205            return Err(ValidatorError::InvalidData {
206                value: "metadata schema_id",
207            });
208        };
209
210        if is_gov && !metadata.namespace.is_empty() {
211            return Err(ValidatorError::InvalidData {
212                value: "metadata namespace",
213            });
214        }
215
216        if metadata.creator.is_empty() {
217            return Err(ValidatorError::InvalidData {
218                value: "metadata creator",
219            });
220        }
221
222        if metadata.owner.is_empty() {
223            return Err(ValidatorError::InvalidData {
224                value: "metadata owner",
225            });
226        }
227
228        if let Some(new_owner) = &metadata.new_owner
229            && (new_owner.is_empty() || new_owner == &metadata.owner)
230        {
231            return Err(ValidatorError::InvalidData {
232                value: "metadata new owner",
233            });
234        };
235
236        if !metadata.active {
237            return Err(ValidatorError::InvalidData {
238                value: "metadata active",
239            });
240        }
241
242        match event_type {
243            EventRequestType::Create => {
244                return Err(ValidatorError::InvalidData {
245                    value: "Event request type",
246                });
247            }
248            EventRequestType::Confirm | EventRequestType::Reject => {
249                if metadata.new_owner.is_none() {
250                    return Err(ValidatorError::InvalidData {
251                        value: "Event request type",
252                    });
253                }
254            }
255            EventRequestType::Fact
256            | EventRequestType::Transfer
257            | EventRequestType::Eol => {
258                if metadata.new_owner.is_some() {
259                    return Err(ValidatorError::InvalidData {
260                        value: "Event request type",
261                    });
262                }
263            }
264        };
265
266        Ok(())
267    }
268
269    fn check_basic_data(
270        request: &Signed<EventRequest>,
271        metadata: &Metadata,
272        vali_req_signer: &PublicKey,
273        gov_version: u64,
274        sn: u64,
275    ) -> Result<(), ValidatorError> {
276        // // Check event request.
277
278        // todo mover a check
279        if request.verify().is_err() {
280            return Err(ValidatorError::InvalidSignature {
281                data: "event request",
282            });
283        }
284
285        Self::check_metadata(
286            &EventRequestType::from(request.content()),
287            metadata,
288            gov_version,
289        )?;
290
291        if !request.content().check_request_signature(
292            &request.signature().signer,
293            &metadata.owner,
294            &metadata.new_owner,
295        ) {
296            return Err(ValidatorError::InvalidSigner {
297                signer: request.signature().signer.to_string(),
298            });
299        }
300
301        // subject
302        if request.content().get_subject_id() != metadata.subject_id {
303            return Err(ValidatorError::InvalidData {
304                value: "Subject_id",
305            });
306        }
307
308        // vali request signer
309        let signer = metadata
310            .new_owner
311            .clone()
312            .unwrap_or_else(|| metadata.owner.clone());
313
314        if &signer != vali_req_signer {
315            return Err(ValidatorError::InvalidSigner {
316                signer: vali_req_signer.to_string(),
317            });
318        }
319
320        // sn
321        if sn != metadata.sn + 1 {
322            return Err(ValidatorError::InvalidData { value: "sn" });
323        }
324        Ok(())
325    }
326
327    fn check_approval_signers(
328        agrees: &HashSet<PublicKey>,
329        disagrees: &HashSet<PublicKey>,
330        timeout: &HashSet<PublicKey>,
331        workers: &HashSet<PublicKey>,
332    ) -> bool {
333        agrees.is_subset(workers)
334            && disagrees.is_subset(workers)
335            && timeout.is_subset(workers)
336    }
337
338    fn check_approval_quorum(
339        agrees: u32,
340        timeout: u32,
341        quorum: &Quorum,
342        workers: &HashSet<PublicKey>,
343        approved: bool,
344    ) -> bool {
345        if approved {
346            quorum.check_quorum(workers.len() as u32, agrees + timeout)
347        } else {
348            !quorum.check_quorum(workers.len() as u32, agrees + timeout)
349        }
350    }
351
352    fn check_approval(
353        approval: ApprovalData,
354        appr_data: RoleDataRegister,
355        req_subject_data_hash: DigestIdentifier,
356        signer: PublicKey,
357    ) -> Result<(), ValidatorError> {
358        if signer != approval.approval_req_signature.signer {
359            return Err(ValidatorError::InvalidSigner {
360                signer: signer.to_string(),
361            });
362        }
363
364        let agrees = approval
365            .approvers_agrees_signatures
366            .iter()
367            .map(|x| x.signer.clone())
368            .collect::<HashSet<PublicKey>>();
369
370        let timeout = approval
371            .approvers_timeout
372            .iter()
373            .map(|x| x.who.clone())
374            .collect::<HashSet<PublicKey>>();
375
376        let disagrees = approval
377            .approvers_disagrees_signatures
378            .iter()
379            .map(|x| x.signer.clone())
380            .collect::<HashSet<PublicKey>>();
381
382        if !Self::check_approval_signers(
383            &agrees,
384            &disagrees,
385            &timeout,
386            &appr_data.workers,
387        ) {
388            return Err(ValidatorError::InvalidOperation {
389                action: "verify approval signers",
390            });
391        }
392
393        if !Self::check_approval_quorum(
394            agrees.len() as u32,
395            timeout.len() as u32,
396            &appr_data.quorum,
397            &appr_data.workers,
398            approval.approved,
399        ) {
400            return Err(ValidatorError::InvalidOperation {
401                action: "verify approval quorum",
402            });
403        }
404
405        let agrees_res = ApprovalRes::Response {
406            approval_req_hash: approval.approval_req_hash.clone(),
407            agrees: true,
408            req_subject_data_hash: req_subject_data_hash.clone(),
409        };
410        for signature in approval.approvers_agrees_signatures.iter() {
411            let signed_res =
412                Signed::from_parts(agrees_res.clone(), signature.clone());
413
414            if signed_res.verify().is_err() {
415                return Err(ValidatorError::InvalidSignature {
416                    data: "approval agrees",
417                });
418            }
419        }
420
421        let disagrees_res = ApprovalRes::Response {
422            approval_req_hash: approval.approval_req_hash.clone(),
423            agrees: false,
424            req_subject_data_hash,
425        };
426        for signature in approval.approvers_disagrees_signatures.iter() {
427            let signed_res =
428                Signed::from_parts(disagrees_res.clone(), signature.clone());
429
430            if signed_res.verify().is_err() {
431                return Err(ValidatorError::InvalidSignature {
432                    data: "approval disagrees",
433                });
434            }
435        }
436
437        Ok(())
438    }
439
440    fn check_evaluation(
441        &self,
442        evaluation: EvaluationData,
443        eval_data: RoleDataRegister,
444        mut properties: ValueWrapper,
445        req_subject_data_hash: DigestIdentifier,
446        signer: PublicKey,
447    ) -> Result<(bool, ValueWrapper), ValidatorError> {
448        if signer != evaluation.eval_req_signature.signer {
449            return Err(ValidatorError::InvalidSigner {
450                signer: signer.to_string(),
451            });
452        }
453
454        if !check_quorum_signers(
455            &evaluation
456                .evaluators_signatures
457                .iter()
458                .map(|x| x.signer.clone())
459                .collect::<HashSet<PublicKey>>(),
460            &eval_data.quorum,
461            &eval_data.workers,
462        ) {
463            return Err(ValidatorError::InvalidOperation {
464                action: "verify evaluation quorum",
465            });
466        }
467
468        let eval_res = match evaluation.response.clone() {
469            EvaluationResponse::Ok(evaluator_response) => {
470                EvaluationRes::Response {
471                    response: evaluator_response,
472                    eval_req_hash: evaluation.eval_req_hash.clone(),
473                    req_subject_data_hash,
474                }
475            }
476            EvaluationResponse::Error(evaluator_error) => {
477                EvaluationRes::Error {
478                    error: evaluator_error,
479                    eval_req_hash: evaluation.eval_req_hash.clone(),
480                    req_subject_data_hash,
481                }
482            }
483        };
484
485        for signature in evaluation.evaluators_signatures.iter() {
486            let signed_res =
487                Signed::from_parts(eval_res.clone(), signature.clone());
488
489            if signed_res.verify().is_err() {
490                return Err(ValidatorError::InvalidSignature {
491                    data: "evaluation",
492                });
493            }
494        }
495
496        let appr_required = if let Some(evaluator_res) =
497            evaluation.evaluator_res()
498        {
499            let json_patch =
500                serde_json::from_value::<Patch>(evaluator_res.patch.0)
501                    .map_err(|_| ValidatorError::InvalidData {
502                        value: "evaluation patch",
503                    })?;
504
505            patch(&mut properties.0, &json_patch).map_err(|_| {
506                ValidatorError::InvalidOperation {
507                    action: "apply patch",
508                }
509            })?;
510
511            let properties_hash = hash_borsh(&*self.hash.hasher(), &properties)
512                .map_err(|e| ValidatorError::InternalError {
513                    problem: e.to_string(),
514                })?;
515
516            if properties_hash != evaluator_res.properties_hash {
517                return Err(ValidatorError::InvalidData {
518                    value: "properties_hash",
519                });
520            }
521
522            evaluator_res.appr_required
523        } else {
524            false
525        };
526
527        Ok((appr_required, properties))
528    }
529
530    async fn check_actual_protocols(
531        &self,
532        ctx: &mut ActorContext<Self>,
533        metadata: &Metadata,
534        actual_protocols: &ActualProtocols,
535        event_type: &EventRequestType,
536        gov_version: u64,
537        signer: PublicKey,
538    ) -> Result<Option<ValueWrapper>, ValidatorError> {
539        if !actual_protocols
540            .check_protocols(metadata.schema_id.is_gov(), event_type)
541        {
542            return Err(ValidatorError::InvalidData {
543                value: "actual protocols",
544            });
545        }
546
547        let (evaluation, approval) = match &actual_protocols {
548            ActualProtocols::None => (None, None),
549            ActualProtocols::Eval { eval_data } => {
550                (Some(eval_data.clone()), None)
551            }
552            ActualProtocols::EvalApprove {
553                eval_data,
554                approval_data,
555            } => (Some(eval_data.clone()), Some(approval_data.clone())),
556        };
557
558        let properties = if let Some(evaluation) = evaluation {
559            let req_subject_data_hash = hash_borsh(
560                &*self.hash.hasher(),
561                &RequestSubjectData {
562                    subject_id: metadata.subject_id.clone(),
563                    governance_id: metadata.governance_id.clone(),
564                    sn: metadata.sn + 1,
565                    namespace: metadata.namespace.clone(),
566                    schema_id: metadata.schema_id.clone(),
567                    gov_version,
568                    signer: signer.clone(),
569                },
570            )
571            .map_err(|e| ValidatorError::InternalError {
572                problem: e.to_string(),
573            })?;
574
575            let (eval_data, appro_data) = get_actual_roles_register(
576                ctx,
577                &metadata.governance_id,
578                SearchRole {
579                    schema_id: metadata.schema_id.clone(),
580                    namespace: metadata.namespace.clone(),
581                },
582                approval.is_some(),
583                gov_version,
584            )
585            .await
586            .map_err(|e| {
587                if let ActorError::UnexpectedResponse { .. } = e {
588                    ValidatorError::OutOfVersion
589                } else {
590                    ValidatorError::InternalError {
591                        problem: e.to_string(),
592                    }
593                }
594            })?;
595
596            let (appr_required, properties) = self.check_evaluation(
597                evaluation,
598                eval_data,
599                metadata.properties.clone(),
600                req_subject_data_hash.clone(),
601                signer.clone(),
602            )?;
603
604            if let Some(approval) = approval
605                && let Some(appr_data) = appro_data
606            {
607                if !appr_required {
608                    return Err(ValidatorError::InvalidData {
609                        value: "evaluation appr_required",
610                    });
611                }
612
613                Self::check_approval(
614                    approval,
615                    appr_data,
616                    req_subject_data_hash,
617                    signer,
618                )?;
619            } else if appr_required {
620                return Err(ValidatorError::InvalidData {
621                    value: "evaluation appr_required",
622                });
623            }
624
625            Some(properties)
626        } else {
627            None
628        };
629
630        Ok(properties)
631    }
632
633    async fn check_last_vali_data(
634        &self,
635        ctx: &mut ActorContext<Self>,
636        metadata: &Metadata,
637        last_validation: &LastData,
638    ) -> Result<(), ValidatorError> {
639        let vali_data = get_validation_roles_register(
640            ctx,
641            &metadata.governance_id,
642            SearchRole {
643                schema_id: metadata.schema_id.clone(),
644                namespace: metadata.namespace.clone(),
645            },
646            last_validation.gov_version,
647        )
648        .await
649        .map_err(|e| {
650            if let ActorError::UnexpectedResponse { .. } = e {
651                ValidatorError::InvalidData {
652                    value: "gov_version",
653                }
654            } else {
655                ValidatorError::InternalError {
656                    problem: e.to_string(),
657                }
658            }
659        })?;
660
661        if !check_quorum_signers(
662            &last_validation
663                .vali_data
664                .validators_signatures
665                .iter()
666                .map(|x| x.signer.clone())
667                .collect::<HashSet<PublicKey>>(),
668            &vali_data.quorum,
669            &vali_data.workers,
670        ) {
671            return Err(ValidatorError::InvalidOperation {
672                action: "verify validation quorum",
673            });
674        }
675
676        let vali_req_hash =
677            last_validation.vali_data.validation_req_hash.clone();
678        let vali_res = if metadata.sn == 0 {
679            ValidationRes::Create {
680                vali_req_hash,
681                subject_metadata: Box::new(metadata.clone()),
682            }
683        } else {
684            let hash_metadata =
685                hash_borsh(&*self.hash.hasher(), &metadata.clone()).map_err(
686                    |e| ValidatorError::InternalError {
687                        problem: e.to_string(),
688                    },
689                )?;
690
691            ValidationRes::Response {
692                vali_req_hash,
693                modified_metadata_hash: hash_metadata,
694            }
695        };
696
697        for signature in last_validation.vali_data.validators_signatures.iter()
698        {
699            let signed_res =
700                Signed::from_parts(vali_res.clone(), signature.clone());
701
702            if signed_res.verify().is_err() {
703                return Err(ValidatorError::InvalidSignature {
704                    data: "last validation",
705                });
706            }
707        }
708
709        Ok(())
710    }
711
712    fn create_modified_metadata(
713        is_success: bool,
714        event_request: &EventRequest,
715        properties: Option<ValueWrapper>,
716        ledger_hash: DigestIdentifier,
717        mut metadata: Metadata,
718    ) -> Result<Metadata, ValidatorError> {
719        metadata.sn += 1;
720
721        metadata.prev_ledger_event_hash = ledger_hash;
722
723        if !is_success {
724            return Ok(metadata);
725        }
726
727        match event_request {
728            EventRequest::Create(..) => {
729                return Err(ValidatorError::InvalidData {
730                    value: "Event request type",
731                });
732            }
733            EventRequest::Fact(..) => {
734                if let Some(properties) = properties {
735                    metadata.properties = properties;
736                }
737            }
738            EventRequest::Transfer(transfer_request) => {
739                metadata.new_owner = Some(transfer_request.new_owner.clone());
740            }
741            EventRequest::Confirm(..) => {
742                if let Some(new_owner) = metadata.new_owner.take() {
743                    metadata.owner = new_owner;
744                } else {
745                    return Err(ValidatorError::InvalidData {
746                        value: "new owner",
747                    });
748                }
749
750                if let Some(properties) = properties {
751                    metadata.properties = properties;
752                }
753            }
754            EventRequest::Reject(..) => metadata.new_owner = None,
755            EventRequest::EOL(..) => metadata.active = false,
756        }
757
758        if metadata.schema_id.is_gov() {
759            let mut gov_data =
760                serde_json::from_value::<GovernanceData>(metadata.properties.0)
761                    .map_err(|_| ValidatorError::InvalidData {
762                        value: "metadata properties",
763                    })?;
764
765            gov_data.version += 1;
766            metadata.properties = gov_data.to_value_wrapper();
767        }
768
769        Ok(metadata)
770    }
771
772    async fn create_res(
773        &self,
774        ctx: &mut ActorContext<Self>,
775        reboot: bool,
776        validation_req: &Signed<ValidationReq>,
777    ) -> Result<ValidationRes, ValidatorError> {
778        if reboot {
779            Ok(ValidationRes::Reboot)
780        } else {
781            match validation_req.content() {
782                ValidationReq::Create {
783                    event_request,
784                    gov_version,
785                    subject_id,
786                } => {
787                    // todo Check is valid en check_data
788                    if let EventRequest::Create(create) =
789                        event_request.content()
790                    {
791                        if let Some(name) = &create.name
792                            && (name.is_empty() || name.len() > 100)
793                        {
794                            return Err(ValidatorError::InvalidData {
795                                value: "create event name",
796                            });
797                        }
798
799                        if let Some(description) = &create.description
800                            && (description.is_empty()
801                                || description.len() > 200)
802                        {
803                            return Err(ValidatorError::InvalidData {
804                                value: "create event description",
805                            });
806                        }
807
808                        if !create.schema_id.is_valid_in_request() {
809                            return Err(ValidatorError::InvalidData {
810                                value: "create event schema_id",
811                            });
812                        }
813
814                        if create.schema_id.is_gov() {
815                            if !create.governance_id.is_empty() {
816                                return Err(ValidatorError::InvalidData {
817                                    value: "create event governance_id",
818                                });
819                            }
820
821                            if !create.namespace.is_empty() {
822                                return Err(ValidatorError::InvalidData {
823                                    value: "create event namespace",
824                                });
825                            }
826                        } else if create.governance_id.is_empty() {
827                            return Err(ValidatorError::InvalidData {
828                                value: "create event governance_id",
829                            });
830                        }
831
832                        let subject_id_worker =
833                            hash_borsh(&*self.hash.hasher(), &event_request)
834                                .map_err(|e| ValidatorError::InternalError {
835                                    problem: e.to_string(),
836                                })?;
837
838                        if subject_id != &subject_id_worker {
839                            return Err(ValidatorError::InvalidData {
840                                value: "subject_id",
841                            });
842                        }
843
844                        let init_state = self.init_state.as_ref().map_or_else(
845                            || {
846                                let governance_data = GovernanceData::new(
847                                    validation_req.signature().signer.clone(),
848                                );
849
850                                governance_data.to_value_wrapper()
851                            },
852                            |init_state| init_state.clone(),
853                        );
854
855                        let governance_id = if create.schema_id.is_gov() {
856                            subject_id.clone()
857                        } else {
858                            create.governance_id.clone()
859                        };
860
861                        let subject_metadata = Metadata {
862                            name: create.name.clone(),
863                            description: create.description.clone(),
864                            subject_id: subject_id_worker,
865                            governance_id,
866                            genesis_gov_version: *gov_version,
867                            prev_ledger_event_hash: DigestIdentifier::default(),
868                            schema_id: create.schema_id.clone(),
869                            namespace: create.namespace.clone(),
870                            sn: 0,
871                            creator: validation_req.signature().signer.clone(),
872                            owner: validation_req.signature().signer.clone(),
873                            new_owner: None,
874                            active: true,
875                            properties: init_state,
876                        };
877
878                        let vali_req_hash =
879                            hash_borsh(&*self.hash.hasher(), &validation_req)
880                                .map_err(|e| ValidatorError::InternalError {
881                                problem: e.to_string(),
882                            })?;
883
884                        Ok(ValidationRes::Create {
885                            vali_req_hash,
886                            subject_metadata: Box::new(subject_metadata),
887                        })
888                    } else {
889                        Err(ValidatorError::InvalidData {
890                            value: "event type",
891                        })
892                    }
893                }
894                ValidationReq::Event {
895                    actual_protocols,
896                    event_request,
897                    metadata,
898                    last_data,
899                    gov_version,
900                    sn,
901                    ledger_hash,
902                } => {
903                    let signer = validation_req.signature().signer.clone();
904                    Self::check_basic_data(
905                        event_request,
906                        metadata,
907                        &signer,
908                        *gov_version,
909                        *sn,
910                    )?;
911
912                    let properties = self
913                        .check_actual_protocols(
914                            ctx,
915                            metadata,
916                            actual_protocols,
917                            &EventRequestType::from(event_request.content()),
918                            *gov_version,
919                            signer,
920                        )
921                        .await?;
922
923                    self.check_last_vali_data(ctx, metadata, last_data).await?;
924
925                    let is_success = actual_protocols.is_success();
926
927                    let modified_metadata = Self::create_modified_metadata(
928                        is_success,
929                        event_request.content(),
930                        properties,
931                        ledger_hash.clone(),
932                        *metadata.clone(),
933                    )?;
934
935                    let vali_req_hash =
936                        hash_borsh(&*self.hash.hasher(), &validation_req)
937                            .map_err(|e| ValidatorError::InternalError {
938                                problem: e.to_string(),
939                            })?;
940
941                    let modified_metadata_hash =
942                        hash_borsh(&*self.hash.hasher(), &modified_metadata)
943                            .map_err(|e| ValidatorError::InternalError {
944                                problem: e.to_string(),
945                            })?;
946
947                    Ok(ValidationRes::Response {
948                        vali_req_hash,
949                        modified_metadata_hash,
950                    })
951                }
952            }
953        }
954    }
955}
956
957#[derive(Debug, Clone)]
958pub enum ValiWorkerMessage {
959    UpdateGovVersion {
960        gov_version: u64,
961    },
962    LocalValidation {
963        validation_req: Box<Signed<ValidationReq>>,
964    },
965    NetworkRequest {
966        validation_req: Box<Signed<ValidationReq>>,
967        sender: PublicKey,
968        info: ComunicateInfo,
969    },
970}
971
972impl Message for ValiWorkerMessage {}
973
974#[async_trait]
975impl Actor for ValiWorker {
976    type Event = ();
977    type Message = ValiWorkerMessage;
978    type Response = ();
979
980    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
981        parent_span.map_or_else(
982            || info_span!("ValiWorker", id),
983            |parent_span| info_span!(parent: parent_span, "ValiWorker", id),
984        )
985    }
986}
987
988impl NotPersistentActor for ValiWorker {}
989
990#[async_trait]
991impl Handler<Self> for ValiWorker {
992    async fn handle_message(
993        &mut self,
994        _sender: ActorPath,
995        msg: ValiWorkerMessage,
996        ctx: &mut ActorContext<Self>,
997    ) -> Result<(), ActorError> {
998        match msg {
999            ValiWorkerMessage::UpdateGovVersion { gov_version } => {
1000                self.gov_version = gov_version;
1001            }
1002            ValiWorkerMessage::LocalValidation { validation_req } => {
1003                let validation =
1004                    match self.create_res(ctx, false, &validation_req).await {
1005                        Ok(vali) => vali,
1006                        Err(e) => {
1007                            if matches!(e, ValidatorError::OutOfVersion) {
1008                                ValidationRes::Reboot
1009                            } else {
1010                                return Err(emit_fail(
1011                                    ctx,
1012                                    ActorError::FunctionalCritical {
1013                                        description: e.to_string(),
1014                                    },
1015                                )
1016                                .await);
1017                            }
1018                        }
1019                    };
1020
1021                let signature = match get_sign(
1022                    ctx,
1023                    SignTypesNode::ValidationRes(validation.clone()),
1024                )
1025                .await
1026                {
1027                    Ok(signature) => signature,
1028                    Err(e) => {
1029                        error!(
1030                            msg_type = "LocalValidation",
1031                            error = %e,
1032                            "Failed to sign validator response"
1033                        );
1034                        return Err(emit_fail(ctx, e).await);
1035                    }
1036                };
1037
1038                match ctx.get_parent::<Validation>().await {
1039                    Ok(validation_actor) => {
1040                        validation_actor
1041                            .tell(ValidationMessage::Response {
1042                                validation_res: Box::new(validation),
1043                                sender: (*self.our_key).clone(),
1044                                signature: Some(signature),
1045                            })
1046                            .await?;
1047
1048                        debug!(
1049                            msg_type = "LocalValidation",
1050                            "Validation completed and sent to parent"
1051                        );
1052                    }
1053                    Err(e) => {
1054                        error!(
1055                            msg_type = "LocalValidation",
1056                            "Failed to obtain Validation actor"
1057                        );
1058                        return Err(e);
1059                    }
1060                };
1061
1062                ctx.stop(None).await;
1063            }
1064            ValiWorkerMessage::NetworkRequest {
1065                validation_req,
1066                info,
1067                sender,
1068            } => {
1069                if sender != validation_req.signature().signer
1070                    || sender != self.node_key
1071                {
1072                    warn!(
1073                        msg_type = "NetworkRequest",
1074                        expected_sender = %self.node_key,
1075                        received_sender = %sender,
1076                        signer = %validation_req.signature().signer,
1077                        "Unexpected sender"
1078                    );
1079                    if self.stop {
1080                        ctx.stop(None).await;
1081                    }
1082
1083                    return Ok(());
1084                }
1085
1086                // TODO MUCHO CUIDADO COn esto
1087                let reboot = match self
1088                    .check_governance(
1089                        validation_req.content().get_gov_version(),
1090                    )
1091                    .await
1092                {
1093                    Ok(reboot) => reboot,
1094                    Err(e) => {
1095                        warn!(
1096                            msg_type = "NetworkRequest",
1097                            error = %e,
1098                            "Failed to check governance"
1099                        );
1100                        if let ActorError::Functional { .. } = e {
1101                            if self.stop {
1102                                ctx.stop(None).await;
1103                            }
1104
1105                            return Err(e);
1106                        } else {
1107                            return Err(emit_fail(ctx, e).await);
1108                        }
1109                    }
1110                };
1111
1112                let validation = if let Err(error) =
1113                    self.check_data(&validation_req)
1114                {
1115                    ValidationRes::Abort(error.to_string())
1116                } else {
1117                    match self.create_res(ctx, reboot, &validation_req).await {
1118                        Ok(vali) => vali,
1119                        Err(e) => {
1120                            if let ValidatorError::InternalError { .. } = e {
1121                                error!(
1122                                    msg_type = "NetworkRequest",
1123                                    error = %e,
1124                                    "Internal error during validation"
1125                                );
1126
1127                                return Err(emit_fail(
1128                                    ctx,
1129                                    ActorError::FunctionalCritical {
1130                                        description: e.to_string(),
1131                                    },
1132                                )
1133                                .await);
1134                            } else if matches!(e, ValidatorError::OutOfVersion)
1135                            {
1136                                ValidationRes::Reboot
1137                            } else {
1138                                ValidationRes::Abort(e.to_string())
1139                            }
1140                        }
1141                    }
1142                };
1143
1144                let signature = match get_sign(
1145                    ctx,
1146                    SignTypesNode::ValidationRes(validation.clone()),
1147                )
1148                .await
1149                {
1150                    Ok(signature) => signature,
1151                    Err(e) => {
1152                        error!(
1153                            msg_type = "NetworkRequest",
1154                            error = %e,
1155                            "Failed to sign validation response"
1156                        );
1157                        return Err(emit_fail(ctx, e).await);
1158                    }
1159                };
1160
1161                let new_info = ComunicateInfo {
1162                    receiver: sender,
1163                    request_id: info.request_id,
1164                    version: info.version,
1165                    receiver_actor: format!(
1166                        "/user/request/{}/validation/{}",
1167                        validation_req.content().get_subject_id(),
1168                        self.our_key.clone()
1169                    ),
1170                };
1171
1172                let signed_response: Signed<ValidationRes> =
1173                    Signed::from_parts(validation, signature);
1174                if let Err(e) = self
1175                    .network
1176                    .send_command(network::CommandHelper::SendMessage {
1177                        message: NetworkMessage {
1178                            info: new_info.clone(),
1179                            message: ActorMessage::ValidationRes {
1180                                res: signed_response,
1181                            },
1182                        },
1183                    })
1184                    .await
1185                {
1186                    error!(
1187                        msg_type = "NetworkRequest",
1188                        error = %e,
1189                        "Failed to send response to network"
1190                    );
1191                    return Err(emit_fail(ctx, e).await);
1192                } else {
1193                    debug!(
1194                        msg_type = "NetworkRequest",
1195                        receiver = %new_info.receiver,
1196                        request_id = %new_info.request_id,
1197                        "Validation response sent to network"
1198                    );
1199                }
1200
1201                if self.stop {
1202                    ctx.stop(None).await;
1203                }
1204            }
1205        }
1206
1207        Ok(())
1208    }
1209
1210    async fn on_child_fault(
1211        &mut self,
1212        error: ActorError,
1213        ctx: &mut ActorContext<Self>,
1214    ) -> ChildAction {
1215        error!(
1216            node_key = %self.node_key,
1217            governance_id = %self.governance_id,
1218            gov_version = self.gov_version,
1219            sn = self.sn,
1220            error = %error,
1221            "Child fault in validation worker"
1222        );
1223        emit_fail(ctx, error).await;
1224        ChildAction::Stop
1225    }
1226}