Skip to main content

ave_core/evaluation/
worker.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use crate::{
7    evaluation::{
8        compiler::{CompilerResponse, error::CompilerError},
9        request::{EvalWorkerContext, EvaluateData},
10        response::{
11            EvalRunnerError, EvaluationResult, EvaluatorError,
12            EvaluatorResponse as EvalRes,
13        },
14        runner::types::EvaluateInfo,
15    },
16    governance::{data::GovernanceData, model::Schema},
17    helpers::network::{NetworkMessage, service::NetworkSender},
18    model::common::{
19        emit_fail,
20        node::{SignTypesNode, get_sign},
21    },
22    subject::RequestSubjectData,
23    system::ConfigHelper,
24};
25
26use crate::helpers::network::ActorMessage;
27
28use async_trait::async_trait;
29use ave_common::{
30    SchemaType, ValueWrapper,
31    identity::{
32        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
33    },
34};
35
36use ave_network::ComunicateInfo;
37use json_patch::diff;
38
39use ave_actors::{
40    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
41    NotPersistentActor,
42};
43
44use serde_json::Value;
45use tracing::{Span, debug, error, info_span, warn};
46
47use super::{
48    Evaluation, EvaluationMessage,
49    compiler::{TempCompiler, TempCompilerMessage},
50    request::EvaluationReq,
51    response::EvaluationRes,
52    runner::{Runner, RunnerMessage, RunnerResponse, types::RunnerResult},
53};
54
55/// A struct representing a EvalWorker actor.
56#[derive(Clone, Debug)]
57pub struct EvalWorker {
58    pub node_key: PublicKey,
59    pub our_key: Arc<PublicKey>,
60    pub governance_id: DigestIdentifier,
61    pub gov_version: u64,
62    pub sn: u64,
63    pub context: EvalWorkerContext,
64    pub init_state: Option<ValueWrapper>,
65    pub hash: HashAlgorithm,
66    pub network: Arc<NetworkSender>,
67    pub stop: bool,
68}
69
70impl EvalWorker {
71    async fn execute_contract(
72        &self,
73        ctx: &mut ActorContext<Self>,
74        runner_data: EvaluateInfo,
75        is_owner: bool,
76    ) -> Result<RunnerResponse, ActorError> {
77        let runner_actor = ctx.create_child("runner", Runner).await?;
78
79        let response = runner_actor
80            .ask(RunnerMessage {
81                data: runner_data,
82                is_owner,
83            })
84            .await;
85
86        runner_actor.ask_stop().await?;
87
88        response
89    }
90
91    async fn compile_contracts(
92        &self,
93        ctx: &mut ActorContext<Self>,
94        ids: &[SchemaType],
95        schemas: BTreeMap<SchemaType, Schema>,
96    ) -> Result<Option<CompilerError>, ActorError> {
97        let contracts_path = if let Some(config) =
98            ctx.system().get_helper::<ConfigHelper>("config").await
99        {
100            config.contracts_path
101        } else {
102            return Err(ActorError::Helper {
103                name: "config".to_owned(),
104                reason: "Not found".to_owned(),
105            });
106        };
107
108        let compiler = ctx
109            .create_child("temp_compiler", TempCompiler::new(self.hash))
110            .await?;
111
112        for id in ids {
113            let Some(schema) = schemas.get(id) else {
114                return Err(ActorError::Functional { description: "There is a contract that requires compilation but its scheme could not be found".to_owned()});
115            };
116
117            let response = compiler
118                .ask(TempCompilerMessage::Compile {
119                    contract_name: format!("{}_{}", self.governance_id, id),
120                    contract: schema.contract.clone(),
121                    initial_value: schema.initial_value.0.clone(),
122                    contract_path: contracts_path
123                        .join("contracts")
124                        .join(format!("{}_temp_{}", self.governance_id, id)),
125                })
126                .await?;
127
128            if let CompilerResponse::Error(e) = response {
129                compiler.ask_stop().await?;
130                return Ok(Some(e));
131            }
132        }
133
134        compiler.ask_stop().await?;
135        Ok(None)
136    }
137
138    async fn check_governance(
139        &self,
140        gov_version: u64,
141    ) -> Result<bool, ActorError> {
142        match self.gov_version.cmp(&gov_version) {
143            std::cmp::Ordering::Less => {
144                warn!(
145                    local_gov_version = self.gov_version,
146                    request_gov_version = gov_version,
147                    governance_id = %self.governance_id,
148                    sender = %self.node_key,
149                    "Received request with a higher governance version; ignoring request"
150                );
151                Err(ActorError::Functional {
152                    description:
153                        "Abort evaluation, request governance version is higher than local"
154                            .to_owned(),
155                })
156            }
157            std::cmp::Ordering::Equal => {
158                // If it is the same it means that we have the latest version of governance, we are up to date.
159                Ok(false)
160            }
161            std::cmp::Ordering::Greater => Ok(true),
162        }
163    }
164
165    async fn evaluate(
166        &self,
167        ctx: &mut ActorContext<Self>,
168        evaluation_req: &EvaluationReq,
169    ) -> Result<RunnerResult, EvaluatorError> {
170        let runner_data = evaluation_req
171            .build_evaluate_info(&self.init_state, &self.context)?;
172
173        // Mirar la parte final de execute contract.
174        let response = self
175            .execute_contract(ctx, runner_data, evaluation_req.signer_is_owner)
176            .await
177            .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
178
179        let (result, compilations) = match response {
180            RunnerResponse::Ok {
181                result,
182                compilations,
183            } => (result, compilations),
184            RunnerResponse::Error(runner_error) => {
185                return Err(EvaluatorError::from(runner_error));
186            }
187        };
188
189        if self.init_state.is_none() && !compilations.is_empty() {
190            let governance_data = GovernanceData::try_from(
191                result.final_state.clone(),
192            )
193            .map_err(|e| {
194                let e = format!(
195                    "can not convert GovernanceData from properties: {}",
196                    e
197                );
198                EvaluatorError::InternalError(e)
199            })?;
200
201            if let Some(error) = self
202                .compile_contracts(ctx, &compilations, governance_data.schemas)
203                .await
204                .map_err(|e| EvaluatorError::InternalError(e.to_string()))?
205            {
206                return Err(EvaluatorError::from(error));
207            };
208        }
209
210        Ok(result)
211    }
212
213    fn generate_json_patch(
214        prev_state: &Value,
215        new_state: &Value,
216    ) -> Result<Value, EvaluatorError> {
217        let patch = diff(prev_state, new_state);
218        serde_json::to_value(patch).map_err(|e| {
219            EvaluatorError::InternalError(format!(
220                "Can not generate json patch {}",
221                e
222            ))
223        })
224    }
225
226    fn build_request_hashes(
227        &self,
228        evaluation_req: &Signed<EvaluationReq>,
229    ) -> Result<(DigestIdentifier, DigestIdentifier), EvaluatorError> {
230        let eval_req_hash = hash_borsh(&*self.hash.hasher(), evaluation_req)
231            .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
232
233        let EvaluationReq {
234            event_request,
235            governance_id,
236            sn,
237            gov_version,
238            namespace,
239            schema_id,
240            signer,
241            ..
242        } = evaluation_req.content().clone();
243
244        let req_subject_data_hash = hash_borsh(
245            &*self.hash.hasher(),
246            &RequestSubjectData {
247                namespace,
248                schema_id,
249                subject_id: event_request.content().get_subject_id(),
250                governance_id,
251                sn,
252                gov_version,
253                signer,
254            },
255        )
256        .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
257
258        Ok((eval_req_hash, req_subject_data_hash))
259    }
260
261    async fn sign_result(
262        &self,
263        ctx: &mut ActorContext<Self>,
264        result: EvaluationResult,
265    ) -> Result<EvaluationRes, EvaluatorError> {
266        let result_hash = hash_borsh(&*self.hash.hasher(), &result)
267            .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
268
269        let result_hash_signature = get_sign(
270            ctx,
271            SignTypesNode::EvaluationSignature(result_hash.clone()),
272        )
273        .await
274        .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
275
276        Ok(EvaluationRes::Response {
277            result,
278            result_hash,
279            result_hash_signature,
280        })
281    }
282
283    async fn build_response(
284        &self,
285        ctx: &mut ActorContext<Self>,
286        evaluation: RunnerResult,
287        evaluation_req: Signed<EvaluationReq>,
288    ) -> Result<EvaluationRes, EvaluatorError> {
289        let (eval_req_hash, req_subject_data_hash) =
290            self.build_request_hashes(&evaluation_req)?;
291
292        let (patch, properties_hash) = match &evaluation_req.content().data {
293            EvaluateData::GovFact { state, .. } => {
294                let properties_hash =
295                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
296                        .map_err(|e| {
297                        EvaluatorError::InternalError(e.to_string())
298                    })?;
299
300                let state = state.to_value_wrapper();
301                let patch = Self::generate_json_patch(
302                    &state.0,
303                    &evaluation.final_state.0,
304                )?;
305
306                (ValueWrapper(patch), properties_hash)
307            }
308            EvaluateData::TrackerSchemasFact { state, .. } => {
309                let properties_hash =
310                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
311                        .map_err(|e| {
312                        EvaluatorError::InternalError(e.to_string())
313                    })?;
314
315                let patch = Self::generate_json_patch(
316                    &state.0,
317                    &evaluation.final_state.0,
318                )?;
319
320                (ValueWrapper(patch), properties_hash)
321            }
322            EvaluateData::GovTransfer { state } => {
323                let state = state.to_value_wrapper();
324                let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
325                    .map_err(|e| {
326                        EvaluatorError::InternalError(e.to_string())
327                    })?;
328
329                (evaluation.final_state, properties_hash)
330            }
331            EvaluateData::GovConfirm { state } => {
332                let properties_hash =
333                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
334                        .map_err(|e| {
335                        EvaluatorError::InternalError(e.to_string())
336                    })?;
337
338                let state = state.to_value_wrapper();
339                let patch = Self::generate_json_patch(
340                    &state.0,
341                    &evaluation.final_state.0,
342                )?;
343
344                (ValueWrapper(patch), properties_hash)
345            }
346            EvaluateData::TrackerSchemasTransfer { state, .. } => {
347                let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
348                    .map_err(|e| {
349                        EvaluatorError::InternalError(e.to_string())
350                    })?;
351
352                (evaluation.final_state, properties_hash)
353            }
354        };
355
356        let result = EvaluationResult::Ok {
357            response: EvalRes {
358                patch,
359                properties_hash,
360                appr_required: evaluation.approval_required,
361            },
362            eval_req_hash,
363            req_subject_data_hash,
364        };
365
366        self.sign_result(ctx, result).await
367    }
368
369    async fn build_response_error(
370        &self,
371        ctx: &mut ActorContext<Self>,
372        evaluator_error: EvaluatorError,
373        evaluation_req: Signed<EvaluationReq>,
374    ) -> Result<EvaluationRes, EvaluatorError> {
375        match &evaluator_error {
376            EvaluatorError::InvalidEventSignature
377            | EvaluatorError::InvalidEventRequest(..) => {
378                return Ok(EvaluationRes::Abort(evaluator_error.to_string()));
379            }
380            EvaluatorError::Runner(EvalRunnerError::ContractNotFound(..)) => {
381                return Ok(EvaluationRes::Reboot);
382            }
383            _ => {}
384        };
385
386        let (eval_req_hash, req_subject_data_hash) =
387            self.build_request_hashes(&evaluation_req)?;
388
389        let result = EvaluationResult::Error {
390            error: evaluator_error,
391            eval_req_hash,
392            req_subject_data_hash,
393        };
394
395        self.sign_result(ctx, result).await
396    }
397
398    async fn create_res(
399        &self,
400        ctx: &mut ActorContext<Self>,
401        reboot: bool,
402        evaluation_req: &Signed<EvaluationReq>,
403    ) -> Result<EvaluationRes, EvaluatorError> {
404        let evaluation = if reboot {
405            EvaluationRes::Reboot
406        } else {
407            match self.evaluate(ctx, evaluation_req.content()).await {
408                Ok(evaluation) => {
409                    self.build_response(ctx, evaluation, evaluation_req.clone())
410                        .await?
411                }
412                Err(error) => {
413                    if let EvaluatorError::InternalError(..) = &error {
414                        return Err(error);
415                    } else {
416                        self.build_response_error(
417                            ctx,
418                            error,
419                            evaluation_req.clone(),
420                        )
421                        .await?
422                    }
423                }
424            }
425        };
426
427        Ok(evaluation)
428    }
429
430    fn check_data(
431        &self,
432        evaluation_req: &Signed<EvaluationReq>,
433    ) -> Result<(), EvaluatorError> {
434        let event_is_for_gov = evaluation_req.content().data.is_gov_event();
435        match (self.init_state.is_none(), event_is_for_gov) {
436            (true, false) => return Err(EvaluatorError::InvalidEventRequest(
437                "Evaluator is for governance but eval request is for tracker"
438                    .to_owned(),
439            )),
440            (false, true) => return Err(EvaluatorError::InvalidEventRequest(
441                "Evaluator is for tracker but eval request is for governance"
442                    .to_owned(),
443            )),
444            _ => {}
445        };
446
447        if evaluation_req.content().governance_id != self.governance_id {
448            return Err(EvaluatorError::InvalidEventRequest(format!(
449                "Evaluator governance_id {} and eval request governance_id {} are different",
450                self.governance_id,
451                evaluation_req.content().governance_id
452            )));
453        }
454
455        if evaluation_req.verify().is_err() {
456            return Err(EvaluatorError::InvalidEventSignature);
457        }
458
459        if evaluation_req.content().event_request.verify().is_err() {
460            return Err(EvaluatorError::InvalidEventSignature);
461        }
462
463        if self.gov_version == evaluation_req.content().gov_version {
464            let signer = evaluation_req
465                .content()
466                .event_request
467                .signature()
468                .signer
469                .clone();
470
471            if evaluation_req
472                .content()
473                .event_request
474                .content()
475                .is_fact_event()
476            {
477                let Some((issuers, issuer_any)) = self.context.issuers() else {
478                    return Err(EvaluatorError::InvalidEventRequest(
479                        "Fact event evaluation context does not include issuers"
480                            .to_owned(),
481                    ));
482                };
483
484                if !issuer_any && !issuers.contains(&signer) {
485                    return Err(EvaluatorError::InvalidEventRequest(
486                        "In fact events, the signer has to be an issuer"
487                            .to_owned(),
488                    ));
489                }
490            } else if signer != self.node_key {
491                return Err(EvaluatorError::InvalidEventRequest(
492                    "In non-fact events, the signer has to be the sender"
493                        .to_owned(),
494                ));
495            }
496        }
497
498        Ok(())
499    }
500}
501
502#[derive(Debug, Clone)]
503pub enum EvalWorkerMessage {
504    Update {
505        gov_version: u64,
506        issuers: BTreeSet<PublicKey>,
507        issuer_any: bool,
508    },
509    LocalEvaluation {
510        evaluation_req: Signed<EvaluationReq>,
511    },
512    NetworkRequest {
513        evaluation_req: Signed<EvaluationReq>,
514        sender: PublicKey,
515        info: ComunicateInfo,
516    },
517}
518
519impl Message for EvalWorkerMessage {}
520
521#[async_trait]
522impl Actor for EvalWorker {
523    type Event = ();
524    type Message = EvalWorkerMessage;
525    type Response = ();
526
527    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
528        parent_span.map_or_else(
529            || info_span!("EvalWorker", id),
530            |parent_span| info_span!(parent: parent_span, "EvalWorker", id),
531        )
532    }
533}
534
535impl NotPersistentActor for EvalWorker {}
536
537#[async_trait]
538impl Handler<Self> for EvalWorker {
539    async fn handle_message(
540        &mut self,
541        _sender: ActorPath,
542        msg: EvalWorkerMessage,
543        ctx: &mut ActorContext<Self>,
544    ) -> Result<(), ActorError> {
545        match msg {
546            EvalWorkerMessage::Update {
547                gov_version,
548                issuers,
549                issuer_any,
550            } => {
551                self.gov_version = gov_version;
552                self.context = EvalWorkerContext::Governance {
553                    issuers,
554                    issuer_any,
555                };
556            }
557            EvalWorkerMessage::LocalEvaluation { evaluation_req } => {
558                let evaluation =
559                    match self.create_res(ctx, false, &evaluation_req).await {
560                        Ok(eval) => eval,
561                        Err(e) => {
562                            error!(
563                                msg_type = "LocalEvaluation",
564                                error = %e,
565                                "Failed to create evaluation response"
566                            );
567                            return Err(emit_fail(
568                                ctx,
569                                ActorError::FunctionalCritical {
570                                    description: e.to_string(),
571                                },
572                            )
573                            .await);
574                        }
575                    };
576
577                match ctx.get_parent::<Evaluation>().await {
578                    Ok(evaluation_actor) => {
579                        if let Err(e) = evaluation_actor
580                            .tell(EvaluationMessage::Response {
581                                evaluation_res: evaluation.clone(),
582                                sender: (*self.our_key).clone(),
583                            })
584                            .await
585                        {
586                            error!(
587                                msg_type = "LocalEvaluation",
588                                error = %e,
589                                "Failed to send response to evaluation actor"
590                            );
591                            return Err(emit_fail(ctx, e).await);
592                        }
593
594                        debug!(
595                            msg_type = "LocalEvaluation",
596                            "Local evaluation completed successfully"
597                        );
598                    }
599                    Err(e) => {
600                        error!(
601                            msg_type = "LocalEvaluation",
602                            path = %ctx.path().parent(),
603                            "Evaluation actor not found"
604                        );
605                        return Err(e);
606                    }
607                }
608
609                ctx.stop(None).await;
610            }
611            EvalWorkerMessage::NetworkRequest {
612                evaluation_req,
613                info,
614                sender,
615            } => {
616                if sender != evaluation_req.signature().signer
617                    || sender != self.node_key
618                {
619                    warn!(
620                        msg_type = "NetworkRequest",
621                        expected_sender = %self.node_key,
622                        received_sender = %sender,
623                        signer = %evaluation_req.signature().signer,
624                        "Unexpected sender"
625                    );
626                    if self.stop {
627                        ctx.stop(None).await;
628                    }
629
630                    return Ok(());
631                }
632
633                let reboot = match self
634                    .check_governance(evaluation_req.content().gov_version)
635                    .await
636                {
637                    Ok(reboot) => reboot,
638                    Err(e) => {
639                        warn!(
640                            msg_type = "NetworkRequest",
641                            error = %e,
642                            "Failed to check governance"
643                        );
644                        if let ActorError::Functional { .. } = e {
645                            if self.stop {
646                                ctx.stop(None).await;
647                            }
648                            return Err(e);
649                        } else {
650                            return Err(emit_fail(ctx, e).await);
651                        }
652                    }
653                };
654
655                let evaluation = if let Err(error) =
656                    self.check_data(&evaluation_req)
657                {
658                    match self
659                        .build_response_error(
660                            ctx,
661                            error,
662                            evaluation_req.clone(),
663                        )
664                        .await
665                    {
666                        Ok(eval) => eval,
667                        Err(e) => {
668                            error!(
669                                msg_type = "NetworkRequest",
670                                error = %e,
671                                "Failed to build error response"
672                            );
673                            return Err(emit_fail(
674                                ctx,
675                                ActorError::FunctionalCritical {
676                                    description: e.to_string(),
677                                },
678                            )
679                            .await);
680                        }
681                    }
682                } else {
683                    match self.create_res(ctx, reboot, &evaluation_req).await {
684                        Ok(eval) => eval,
685                        Err(e) => {
686                            error!(
687                                msg_type = "NetworkRequest",
688                                error = %e,
689                                "Internal error during evaluation"
690                            );
691                            return Err(emit_fail(
692                                ctx,
693                                ActorError::FunctionalCritical {
694                                    description: e.to_string(),
695                                },
696                            )
697                            .await);
698                        }
699                    }
700                };
701
702                let new_info = ComunicateInfo {
703                    receiver: sender.clone(),
704                    request_id: info.request_id.clone(),
705                    version: info.version,
706                    receiver_actor: format!(
707                        "/user/request/{}/evaluation/{}",
708                        evaluation_req
709                            .content()
710                            .event_request
711                            .content()
712                            .get_subject_id(),
713                        self.our_key.clone()
714                    ),
715                };
716
717                if let Err(e) = self
718                    .network
719                    .send_command(ave_network::CommandHelper::SendMessage {
720                        message: NetworkMessage {
721                            info: new_info,
722                            message: ActorMessage::EvaluationRes {
723                                res: evaluation,
724                            },
725                        },
726                    })
727                    .await
728                {
729                    error!(
730                        msg_type = "NetworkRequest",
731                        error = %e,
732                        "Failed to send response to network"
733                    );
734                    return Err(emit_fail(ctx, e).await);
735                };
736
737                debug!(
738                    msg_type = "NetworkRequest",
739                    request_id = %info.request_id,
740                    version = info.version,
741                    sender = %sender,
742                    "Network evaluation request processed successfully"
743                );
744
745                if self.stop {
746                    ctx.stop(None).await;
747                }
748            }
749        }
750
751        Ok(())
752    }
753
754    async fn on_child_fault(
755        &mut self,
756        error: ActorError,
757        ctx: &mut ActorContext<Self>,
758    ) -> ChildAction {
759        error!(
760            governance_id = %self.governance_id,
761            gov_version = self.gov_version,
762            sn = self.sn,
763            node_key = %self.node_key,
764            error = %error,
765            "Child fault in evaluation worker"
766        );
767        emit_fail(ctx, error).await;
768        ChildAction::Stop
769    }
770}