Skip to main content

ave_core/evaluation/
worker.rs

1use std::{collections::{BTreeMap, BTreeSet}, sync::Arc};
2
3use crate::{
4    evaluation::{
5        compiler::{CompilerResponse, error::CompilerError},
6        request::{EvalWorkerContext, EvaluateData},
7        response::{
8            EvalRunnerError, EvaluationResult, EvaluatorError,
9            EvaluatorResponse as EvalRes,
10        },
11        runner::types::EvaluateInfo,
12    },
13    governance::{data::GovernanceData, model::Schema},
14    helpers::network::{NetworkMessage, service::NetworkSender},
15    model::common::{
16        emit_fail,
17        node::{SignTypesNode, get_sign},
18    },
19    subject::RequestSubjectData,
20    system::ConfigHelper,
21};
22
23use crate::helpers::network::ActorMessage;
24
25use async_trait::async_trait;
26use ave_common::{
27    SchemaType, ValueWrapper,
28    identity::{
29        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
30    },
31};
32
33use json_patch::diff;
34use ave_network::ComunicateInfo;
35
36use ave_actors::{
37    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
38    NotPersistentActor,
39};
40
41use serde_json::Value;
42use tracing::{Span, debug, error, info_span, warn};
43
44use super::{
45    Evaluation, EvaluationMessage,
46    compiler::{TempCompiler, TempCompilerMessage},
47    request::EvaluationReq,
48    response::EvaluationRes,
49    runner::{Runner, RunnerMessage, RunnerResponse, types::RunnerResult},
50};
51
52/// A struct representing a EvalWorker actor.
53#[derive(Clone, Debug)]
54pub struct EvalWorker {
55    pub node_key: PublicKey,
56    pub our_key: Arc<PublicKey>,
57    pub governance_id: DigestIdentifier,
58    pub gov_version: u64,
59    pub sn: u64,
60    pub context: EvalWorkerContext,
61    pub init_state: Option<ValueWrapper>,
62    pub hash: HashAlgorithm,
63    pub network: Arc<NetworkSender>,
64    pub stop: bool,
65}
66
67impl EvalWorker {
68    async fn execute_contract(
69        &self,
70        ctx: &mut ActorContext<Self>,
71        runner_data: EvaluateInfo,
72        is_owner: bool,
73    ) -> Result<RunnerResponse, ActorError> {
74        let runner_actor = ctx.create_child("runner", Runner).await?;
75
76        let response = runner_actor
77            .ask(RunnerMessage {
78                data: runner_data,
79                is_owner,
80            })
81            .await;
82
83        runner_actor.ask_stop().await?;
84
85        response
86    }
87
88    async fn compile_contracts(
89        &self,
90        ctx: &mut ActorContext<Self>,
91        ids: &[SchemaType],
92        schemas: BTreeMap<SchemaType, Schema>,
93    ) -> Result<Option<CompilerError>, ActorError> {
94        let contracts_path = if let Some(config) =
95            ctx.system().get_helper::<ConfigHelper>("config").await
96        {
97            config.contracts_path
98        } else {
99            return Err(ActorError::Helper {
100                name: "config".to_owned(),
101                reason: "Not found".to_owned(),
102            });
103        };
104
105        let compiler = ctx
106            .create_child("temp_compiler", TempCompiler::new(self.hash))
107            .await?;
108
109        for id in ids {
110            let Some(schema) = schemas.get(id) else {
111                return Err(ActorError::Functional { description: "There is a contract that requires compilation but its scheme could not be found".to_owned()});
112            };
113
114            let response = compiler
115                .ask(TempCompilerMessage::Compile {
116                    contract_name: format!("{}_{}", self.governance_id, id),
117                    contract: schema.contract.clone(),
118                    initial_value: schema.initial_value.0.clone(),
119                    contract_path: contracts_path
120                        .join("contracts")
121                        .join(format!("{}_temp_{}", self.governance_id, id)),
122                })
123                .await?;
124
125            if let CompilerResponse::Error(e) = response {
126                compiler.ask_stop().await?;
127                return Ok(Some(e));
128            }
129        }
130
131        compiler.ask_stop().await?;
132        Ok(None)
133    }
134
135    async fn check_governance(
136        &self,
137        gov_version: u64,
138    ) -> Result<bool, ActorError> {
139        match self.gov_version.cmp(&gov_version) {
140            std::cmp::Ordering::Less => {
141                warn!(
142                    local_gov_version = self.gov_version,
143                    request_gov_version = gov_version,
144                    governance_id = %self.governance_id,
145                    sender = %self.node_key,
146                    "Received request with a higher governance version; ignoring request"
147                );
148                Err(ActorError::Functional {
149                    description:
150                        "Abort evaluation, request governance version is higher than local"
151                            .to_owned(),
152                })
153            }
154            std::cmp::Ordering::Equal => {
155                // If it is the same it means that we have the latest version of governance, we are up to date.
156                Ok(false)
157            }
158            std::cmp::Ordering::Greater => Ok(true),
159        }
160    }
161
162    async fn evaluate(
163        &self,
164        ctx: &mut ActorContext<Self>,
165        evaluation_req: &EvaluationReq,
166    ) -> Result<RunnerResult, EvaluatorError> {
167        let runner_data = evaluation_req
168            .build_evaluate_info(&self.init_state, &self.context)?;
169
170        // Mirar la parte final de execute contract.
171        let response = self
172            .execute_contract(ctx, runner_data, evaluation_req.signer_is_owner)
173            .await
174            .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
175
176        let (result, compilations) = match response {
177            RunnerResponse::Ok {
178                result,
179                compilations,
180            } => (result, compilations),
181            RunnerResponse::Error(runner_error) => {
182                return Err(EvaluatorError::from(runner_error));
183            }
184        };
185
186        if self.init_state.is_none() && !compilations.is_empty() {
187            let governance_data = GovernanceData::try_from(
188                result.final_state.clone(),
189            )
190            .map_err(|e| {
191                let e = format!(
192                    "can not convert GovernanceData from properties: {}",
193                    e
194                );
195                EvaluatorError::InternalError(e)
196            })?;
197
198            if let Some(error) = self
199                .compile_contracts(ctx, &compilations, governance_data.schemas)
200                .await
201                .map_err(|e| EvaluatorError::InternalError(e.to_string()))?
202            {
203                return Err(EvaluatorError::from(error));
204            };
205        }
206
207        Ok(result)
208    }
209
210    fn generate_json_patch(
211        prev_state: &Value,
212        new_state: &Value,
213    ) -> Result<Value, EvaluatorError> {
214        let patch = diff(prev_state, new_state);
215        serde_json::to_value(patch).map_err(|e| {
216            EvaluatorError::InternalError(format!(
217                "Can not generate json patch {}",
218                e
219            ))
220        })
221    }
222
223    fn build_request_hashes(
224        &self,
225        evaluation_req: &Signed<EvaluationReq>,
226    ) -> Result<(DigestIdentifier, DigestIdentifier), EvaluatorError> {
227        let eval_req_hash = hash_borsh(&*self.hash.hasher(), evaluation_req)
228            .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
229
230        let EvaluationReq {
231            event_request,
232            governance_id,
233            sn,
234            gov_version,
235            namespace,
236            schema_id,
237            signer,
238            ..
239        } = evaluation_req.content().clone();
240
241        let req_subject_data_hash = hash_borsh(
242            &*self.hash.hasher(),
243            &RequestSubjectData {
244                namespace,
245                schema_id,
246                subject_id: event_request.content().get_subject_id(),
247                governance_id,
248                sn,
249                gov_version,
250                signer,
251            },
252        )
253        .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
254
255        Ok((eval_req_hash, req_subject_data_hash))
256    }
257
258    async fn sign_result(
259        &self,
260        ctx: &mut ActorContext<Self>,
261        result: EvaluationResult,
262    ) -> Result<EvaluationRes, EvaluatorError> {
263        let result_hash = hash_borsh(&*self.hash.hasher(), &result)
264            .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
265
266        let result_hash_signature = get_sign(
267            ctx,
268            SignTypesNode::EvaluationSignature(result_hash.clone()),
269        )
270        .await
271        .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
272
273        Ok(EvaluationRes::Response {
274            result,
275            result_hash,
276            result_hash_signature,
277        })
278    }
279
280    async fn build_response(
281        &self,
282        ctx: &mut ActorContext<Self>,
283        evaluation: RunnerResult,
284        evaluation_req: Signed<EvaluationReq>,
285    ) -> Result<EvaluationRes, EvaluatorError> {
286        let (eval_req_hash, req_subject_data_hash) =
287            self.build_request_hashes(&evaluation_req)?;
288
289        let (patch, properties_hash) = match &evaluation_req.content().data {
290            EvaluateData::GovFact { state, .. } => {
291                let properties_hash =
292                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
293                        .map_err(|e| {
294                        EvaluatorError::InternalError(e.to_string())
295                    })?;
296
297                let state = state.to_value_wrapper();
298                let patch = Self::generate_json_patch(
299                    &state.0,
300                    &evaluation.final_state.0,
301                )?;
302
303                (ValueWrapper(patch), properties_hash)
304            }
305            EvaluateData::TrackerSchemasFact { state, .. } => {
306                let properties_hash =
307                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
308                        .map_err(|e| {
309                        EvaluatorError::InternalError(e.to_string())
310                    })?;
311
312                let patch = Self::generate_json_patch(
313                    &state.0,
314                    &evaluation.final_state.0,
315                )?;
316
317                (ValueWrapper(patch), properties_hash)
318            }
319            EvaluateData::GovTransfer { state } => {
320                let state = state.to_value_wrapper();
321                let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
322                    .map_err(|e| {
323                        EvaluatorError::InternalError(e.to_string())
324                    })?;
325
326                (evaluation.final_state, properties_hash)
327            }
328            EvaluateData::GovConfirm { state } => {
329                let properties_hash =
330                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
331                        .map_err(|e| {
332                        EvaluatorError::InternalError(e.to_string())
333                    })?;
334
335                let state = state.to_value_wrapper();
336                let patch = Self::generate_json_patch(
337                    &state.0,
338                    &evaluation.final_state.0,
339                )?;
340
341                (ValueWrapper(patch), properties_hash)
342            }
343            EvaluateData::TrackerSchemasTransfer { state, .. } => {
344                let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
345                    .map_err(|e| {
346                        EvaluatorError::InternalError(e.to_string())
347                    })?;
348
349                (evaluation.final_state, properties_hash)
350            }
351        };
352
353        let result = EvaluationResult::Ok {
354            response: EvalRes {
355                patch,
356                properties_hash,
357                appr_required: evaluation.approval_required,
358            },
359            eval_req_hash,
360            req_subject_data_hash,
361        };
362
363        self.sign_result(ctx, result).await
364    }
365
366    async fn build_response_error(
367        &self,
368        ctx: &mut ActorContext<Self>,
369        evaluator_error: EvaluatorError,
370        evaluation_req: Signed<EvaluationReq>,
371    ) -> Result<EvaluationRes, EvaluatorError> {
372        match &evaluator_error {
373            EvaluatorError::InvalidEventSignature
374            | EvaluatorError::InvalidEventRequest(..) => {
375                return Ok(EvaluationRes::Abort(evaluator_error.to_string()));
376            }
377            EvaluatorError::Runner(EvalRunnerError::ContractNotFound(..)) => {
378                return Ok(EvaluationRes::Reboot);
379            }
380            _ => {}
381        };
382
383        let (eval_req_hash, req_subject_data_hash) =
384            self.build_request_hashes(&evaluation_req)?;
385
386        let result = EvaluationResult::Error {
387            error: evaluator_error,
388            eval_req_hash,
389            req_subject_data_hash,
390        };
391
392        self.sign_result(ctx, result).await
393    }
394
395    async fn create_res(
396        &self,
397        ctx: &mut ActorContext<Self>,
398        reboot: bool,
399        evaluation_req: &Signed<EvaluationReq>,
400    ) -> Result<EvaluationRes, EvaluatorError> {
401        let evaluation = if reboot {
402            EvaluationRes::Reboot
403        } else {
404            match self.evaluate(ctx, evaluation_req.content()).await {
405                Ok(evaluation) => {
406                    self.build_response(ctx, evaluation, evaluation_req.clone())
407                        .await?
408                }
409                Err(error) => {
410                    if let EvaluatorError::InternalError(..) = &error {
411                        return Err(error);
412                    } else {
413                        self.build_response_error(
414                            ctx,
415                            error,
416                            evaluation_req.clone(),
417                        )
418                        .await?
419                    }
420                }
421            }
422        };
423
424        Ok(evaluation)
425    }
426
427    fn check_data(
428        &self,
429        evaluation_req: &Signed<EvaluationReq>,
430    ) -> Result<(), EvaluatorError> {
431        let event_is_for_gov = evaluation_req.content().data.is_gov_event();
432        match (self.init_state.is_none(), event_is_for_gov) {
433            (true, false) => return Err(EvaluatorError::InvalidEventRequest(
434                "Evaluator is for governance but eval request is for tracker"
435                    .to_owned(),
436            )),
437            (false, true) => return Err(EvaluatorError::InvalidEventRequest(
438                "Evaluator is for tracker but eval request is for governance"
439                    .to_owned(),
440            )),
441            _ => {}
442        };
443
444        if evaluation_req.content().governance_id != self.governance_id {
445            return Err(EvaluatorError::InvalidEventRequest(format!(
446                "Evaluator governance_id {} and eval request governance_id {} are different",
447                self.governance_id,
448                evaluation_req.content().governance_id
449            )));
450        }
451
452        if evaluation_req.verify().is_err() {
453            return Err(EvaluatorError::InvalidEventSignature);
454        }
455
456        if evaluation_req.content().event_request.verify().is_err() {
457            return Err(EvaluatorError::InvalidEventSignature);
458        }
459
460        if self.gov_version == evaluation_req.content().gov_version {
461            let signer = evaluation_req
462                .content()
463                .event_request
464                .signature()
465                .signer
466                .clone();
467
468            if evaluation_req
469                .content()
470                .event_request
471                .content()
472                .is_fact_event()
473            {
474                let Some((issuers, issuer_any)) = self.context.issuers()
475                else {
476                    return Err(EvaluatorError::InvalidEventRequest(
477                        "Fact event evaluation context does not include issuers"
478                            .to_owned(),
479                    ));
480                };
481
482                if !issuer_any && !issuers.contains(&signer) {
483                    return Err(EvaluatorError::InvalidEventRequest(
484                        "In fact events, the signer has to be an issuer"
485                            .to_owned(),
486                    ));
487                }
488            } else if signer != self.node_key {
489                return Err(EvaluatorError::InvalidEventRequest(
490                    "In non-fact events, the signer has to be the sender"
491                        .to_owned(),
492                ));
493            }
494        }
495
496        Ok(())
497    }
498}
499
500#[derive(Debug, Clone)]
501pub enum EvalWorkerMessage {
502    Update {
503        gov_version: u64,
504        issuers: BTreeSet<PublicKey>,
505        issuer_any: bool,
506    },
507    LocalEvaluation {
508        evaluation_req: Signed<EvaluationReq>,
509    },
510    NetworkRequest {
511        evaluation_req: Signed<EvaluationReq>,
512        sender: PublicKey,
513        info: ComunicateInfo,
514    },
515}
516
517impl Message for EvalWorkerMessage {}
518
519#[async_trait]
520impl Actor for EvalWorker {
521    type Event = ();
522    type Message = EvalWorkerMessage;
523    type Response = ();
524
525    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
526        parent_span.map_or_else(
527            || info_span!("EvalWorker", id),
528            |parent_span| info_span!(parent: parent_span, "EvalWorker", id),
529        )
530    }
531}
532
533impl NotPersistentActor for EvalWorker {}
534
535#[async_trait]
536impl Handler<Self> for EvalWorker {
537    async fn handle_message(
538        &mut self,
539        _sender: ActorPath,
540        msg: EvalWorkerMessage,
541        ctx: &mut ActorContext<Self>,
542    ) -> Result<(), ActorError> {
543        match msg {
544            EvalWorkerMessage::Update {
545                gov_version,
546                issuers,
547                issuer_any,
548            } => {
549                self.gov_version = gov_version;
550                self.context = EvalWorkerContext::Governance {
551                    issuers,
552                    issuer_any,
553                };
554            }
555            EvalWorkerMessage::LocalEvaluation { evaluation_req } => {
556                let evaluation =
557                    match self.create_res(ctx, false, &evaluation_req).await {
558                        Ok(eval) => eval,
559                        Err(e) => {
560                            error!(
561                                msg_type = "LocalEvaluation",
562                                error = %e,
563                                "Failed to create evaluation response"
564                            );
565                            return Err(emit_fail(
566                                ctx,
567                                ActorError::FunctionalCritical {
568                                    description: e.to_string(),
569                                },
570                            )
571                            .await);
572                        }
573                    };
574
575                match ctx.get_parent::<Evaluation>().await {
576                    Ok(evaluation_actor) => {
577                        if let Err(e) = evaluation_actor
578                            .tell(EvaluationMessage::Response {
579                                evaluation_res: evaluation.clone(),
580                                sender: (*self.our_key).clone(),
581                            })
582                            .await
583                        {
584                            error!(
585                                msg_type = "LocalEvaluation",
586                                error = %e,
587                                "Failed to send response to evaluation actor"
588                            );
589                            return Err(emit_fail(ctx, e).await);
590                        }
591
592                        debug!(
593                            msg_type = "LocalEvaluation",
594                            "Local evaluation completed successfully"
595                        );
596                    }
597                    Err(e) => {
598                        error!(
599                            msg_type = "LocalEvaluation",
600                            path = %ctx.path().parent(),
601                            "Evaluation actor not found"
602                        );
603                        return Err(e);
604                    }
605                }
606
607                ctx.stop(None).await;
608            }
609            EvalWorkerMessage::NetworkRequest {
610                evaluation_req,
611                info,
612                sender,
613            } => {
614                if sender != evaluation_req.signature().signer
615                    || sender != self.node_key
616                {
617                    warn!(
618                        msg_type = "NetworkRequest",
619                        expected_sender = %self.node_key,
620                        received_sender = %sender,
621                        signer = %evaluation_req.signature().signer,
622                        "Unexpected sender"
623                    );
624                    if self.stop {
625                        ctx.stop(None).await;
626                    }
627
628                    return Ok(());
629                }
630
631                let reboot = match self
632                    .check_governance(evaluation_req.content().gov_version)
633                    .await
634                {
635                    Ok(reboot) => reboot,
636                    Err(e) => {
637                        warn!(
638                            msg_type = "NetworkRequest",
639                            error = %e,
640                            "Failed to check governance"
641                        );
642                        if let ActorError::Functional { .. } = e {
643                            if self.stop {
644                                ctx.stop(None).await;
645                            }
646                            return Err(e);
647                        } else {
648                            return Err(emit_fail(ctx, e).await);
649                        }
650                    }
651                };
652
653                let evaluation = if let Err(error) =
654                    self.check_data(&evaluation_req)
655                {
656                    match self
657                        .build_response_error(
658                            ctx,
659                            error,
660                            evaluation_req.clone(),
661                        )
662                        .await
663                    {
664                        Ok(eval) => eval,
665                        Err(e) => {
666                            error!(
667                                msg_type = "NetworkRequest",
668                                error = %e,
669                                "Failed to build error response"
670                            );
671                            return Err(emit_fail(
672                                ctx,
673                                ActorError::FunctionalCritical {
674                                    description: e.to_string(),
675                                },
676                            )
677                            .await);
678                        }
679                    }
680                } else {
681                    match self.create_res(ctx, reboot, &evaluation_req).await {
682                        Ok(eval) => eval,
683                        Err(e) => {
684                            error!(
685                                msg_type = "NetworkRequest",
686                                error = %e,
687                                "Internal error during evaluation"
688                            );
689                            return Err(emit_fail(
690                                ctx,
691                                ActorError::FunctionalCritical {
692                                    description: e.to_string(),
693                                },
694                            )
695                            .await);
696                        }
697                    }
698                };
699
700                let new_info = ComunicateInfo {
701                    receiver: sender.clone(),
702                    request_id: info.request_id.clone(),
703                    version: info.version,
704                    receiver_actor: format!(
705                        "/user/request/{}/evaluation/{}",
706                        evaluation_req
707                            .content()
708                            .event_request
709                            .content()
710                            .get_subject_id(),
711                        self.our_key.clone()
712                    ),
713                };
714
715                if let Err(e) = self
716                    .network
717                    .send_command(ave_network::CommandHelper::SendMessage {
718                        message: NetworkMessage {
719                            info: new_info,
720                            message: ActorMessage::EvaluationRes {
721                                res: evaluation,
722                            },
723                        },
724                    })
725                    .await
726                {
727                    error!(
728                        msg_type = "NetworkRequest",
729                        error = %e,
730                        "Failed to send response to network"
731                    );
732                    return Err(emit_fail(ctx, e).await);
733                };
734
735                debug!(
736                    msg_type = "NetworkRequest",
737                    request_id = %info.request_id,
738                    version = info.version,
739                    sender = %sender,
740                    "Network evaluation request processed successfully"
741                );
742
743                if self.stop {
744                    ctx.stop(None).await;
745                }
746            }
747        }
748
749        Ok(())
750    }
751
752    async fn on_child_fault(
753        &mut self,
754        error: ActorError,
755        ctx: &mut ActorContext<Self>,
756    ) -> ChildAction {
757        error!(
758            governance_id = %self.governance_id,
759            gov_version = self.gov_version,
760            sn = self.sn,
761            node_key = %self.node_key,
762            error = %error,
763            "Child fault in evaluation worker"
764        );
765        emit_fail(ctx, error).await;
766        ChildAction::Stop
767    }
768}