Skip to main content

ave_core/evaluation/
worker.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use crate::{
4    evaluation::{
5        compiler::{CompilerResponse, error::CompilerError},
6        request::EvaluateData,
7        response::{
8            EvalRunnerError, EvaluatorError, EvaluatorResponse as EvalRes,
9        },
10        runner::types::EvaluateInfo,
11    },
12    governance::{data::GovernanceData, model::Schema},
13    helpers::network::{NetworkMessage, service::NetworkSender},
14    model::common::{
15        emit_fail,
16        node::{SignTypesNode, get_sign},
17    },
18    subject::RequestSubjectData,
19    system::ConfigHelper,
20};
21
22use crate::helpers::network::ActorMessage;
23
24use async_trait::async_trait;
25use ave_common::{
26    SchemaType, ValueWrapper,
27    identity::{
28        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
29    },
30};
31
32use json_patch::diff;
33use network::ComunicateInfo;
34
35use ave_actors::{
36    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
37    NotPersistentActor,
38};
39
40use serde_json::Value;
41use tracing::{Span, debug, error, info_span, warn};
42
43use super::{
44    Evaluation, EvaluationMessage,
45    compiler::{TempCompiler, TempCompilerMessage},
46    request::EvaluationReq,
47    response::EvaluationRes,
48    runner::{Runner, RunnerMessage, RunnerResponse, types::RunnerResult},
49};
50
51/// A struct representing a EvalWorker actor.
52#[derive(Clone, Debug)]
53pub struct EvalWorker {
54    pub node_key: PublicKey,
55    pub our_key: Arc<PublicKey>,
56    pub governance_id: DigestIdentifier,
57    pub gov_version: u64,
58    pub sn: u64,
59    pub init_state: Option<ValueWrapper>,
60    pub hash: HashAlgorithm,
61    pub network: Arc<NetworkSender>,
62    pub stop: bool,
63}
64
65impl EvalWorker {
66    async fn execute_contract(
67        &self,
68        ctx: &mut ActorContext<Self>,
69        runner_data: EvaluateInfo,
70        is_owner: bool,
71    ) -> Result<RunnerResponse, ActorError> {
72        let runner_actor = ctx.create_child("runner", Runner).await?;
73
74        let response = runner_actor
75            .ask(RunnerMessage {
76                data: runner_data,
77                is_owner,
78            })
79            .await;
80
81        runner_actor.ask_stop().await?;
82
83        response
84    }
85
86    async fn compile_contracts(
87        &self,
88        ctx: &mut ActorContext<Self>,
89        ids: &[SchemaType],
90        schemas: BTreeMap<SchemaType, Schema>,
91    ) -> Result<Option<CompilerError>, ActorError> {
92        let contracts_path = if let Some(config) =
93            ctx.system().get_helper::<ConfigHelper>("config").await
94        {
95            config.contracts_path
96        } else {
97            return Err(ActorError::Helper {
98                name: "config".to_owned(),
99                reason: "Not found".to_owned(),
100            });
101        };
102
103        let compiler = ctx
104            .create_child("temp_compiler", TempCompiler::new(self.hash))
105            .await?;
106
107        for id in ids {
108            let Some(schema) = schemas.get(id) else {
109                return Err(ActorError::Functional { description: "There is a contract that requires compilation but its scheme could not be found".to_owned()});
110            };
111
112            let response = compiler
113                .ask(TempCompilerMessage::Compile {
114                    contract_name: format!("{}_{}", self.governance_id, id),
115                    contract: schema.contract.clone(),
116                    initial_value: schema.initial_value.0.clone(),
117                    contract_path: contracts_path
118                        .join("contracts")
119                        .join(format!("{}_temp_{}", self.governance_id, id)),
120                })
121                .await?;
122
123            if let CompilerResponse::Error(e) = response {
124                compiler.ask_stop().await?;
125                return Ok(Some(e));
126            }
127        }
128
129        compiler.ask_stop().await?;
130        Ok(None)
131    }
132
133    async fn check_governance(
134        &self,
135        gov_version: u64,
136    ) -> Result<bool, ActorError> {
137        match self.gov_version.cmp(&gov_version) {
138            std::cmp::Ordering::Less => {
139                warn!(
140                    local_gov_version = self.gov_version,
141                    request_gov_version = gov_version,
142                    governance_id = %self.governance_id,
143                    sender = %self.node_key,
144                    "Received request with a higher governance version; ignoring request"
145                );
146                Err(ActorError::Functional {
147                    description:
148                        "Abort evaluation, request governance version is higher than local"
149                            .to_owned(),
150                })
151            }
152            std::cmp::Ordering::Equal => {
153                // If it is the same it means that we have the latest version of governance, we are up to date.
154                Ok(false)
155            }
156            std::cmp::Ordering::Greater => Ok(true),
157        }
158    }
159
160    async fn evaluate(
161        &self,
162        ctx: &mut ActorContext<Self>,
163        evaluation_req: &EvaluationReq,
164    ) -> Result<RunnerResult, EvaluatorError> {
165        let runner_data =
166            evaluation_req.build_evaluate_info(&self.init_state)?;
167
168        // Mirar la parte final de execute contract.
169        let response = self
170            .execute_contract(ctx, runner_data, evaluation_req.signer_is_owner)
171            .await
172            .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
173
174        let (result, compilations) = match response {
175            RunnerResponse::Ok {
176                result,
177                compilations,
178            } => (result, compilations),
179            RunnerResponse::Error(runner_error) => {
180                return Err(EvaluatorError::from(runner_error));
181            }
182        };
183
184        if self.init_state.is_none() && !compilations.is_empty() {
185            let governance_data = GovernanceData::try_from(
186                result.final_state.clone(),
187            )
188            .map_err(|e| {
189                let e = format!(
190                    "can not convert GovernanceData from properties: {}",
191                    e
192                );
193                EvaluatorError::InternalError(e)
194            })?;
195
196            if let Some(error) = self
197                .compile_contracts(ctx, &compilations, governance_data.schemas)
198                .await
199                .map_err(|e| EvaluatorError::InternalError(e.to_string()))?
200            {
201                return Err(EvaluatorError::from(error));
202            };
203        }
204
205        Ok(result)
206    }
207
208    fn generate_json_patch(
209        prev_state: &Value,
210        new_state: &Value,
211    ) -> Result<Value, EvaluatorError> {
212        let patch = diff(prev_state, new_state);
213        serde_json::to_value(patch).map_err(|e| {
214            EvaluatorError::InternalError(format!(
215                "Can not generate json patch {}",
216                e
217            ))
218        })
219    }
220
221    fn build_response(
222        &self,
223        evaluation: RunnerResult,
224        evaluation_req: Signed<EvaluationReq>,
225    ) -> Result<EvaluationRes, EvaluatorError> {
226        let eval_req_hash =
227            hash_borsh(&*self.hash.hasher(), &evaluation_req)
228                .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
229
230        let EvaluationReq {
231            event_request,
232            governance_id,
233            sn,
234            gov_version,
235            namespace,
236            schema_id,
237            signer,
238            ..
239        } = evaluation_req.content().clone();
240
241        let req_subject_data_hash = hash_borsh(
242            &*self.hash.hasher(),
243            &RequestSubjectData {
244                namespace,
245                schema_id,
246                subject_id: event_request.content().get_subject_id(),
247                governance_id,
248                sn,
249                gov_version,
250                signer,
251            },
252        )
253        .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
254
255        let (patch, properties_hash) = match &evaluation_req.content().data {
256            EvaluateData::GovFact { state, .. } => {
257                let properties_hash =
258                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
259                        .map_err(|e| {
260                        EvaluatorError::InternalError(e.to_string())
261                    })?;
262
263                let state = state.to_value_wrapper();
264                let patch = Self::generate_json_patch(
265                    &state.0,
266                    &evaluation.final_state.0,
267                )?;
268
269                (ValueWrapper(patch), properties_hash)
270            }
271            EvaluateData::TrackerSchemasFact { state, .. } => {
272                let properties_hash =
273                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
274                        .map_err(|e| {
275                        EvaluatorError::InternalError(e.to_string())
276                    })?;
277
278                let patch = Self::generate_json_patch(
279                    &state.0,
280                    &evaluation.final_state.0,
281                )?;
282
283                (ValueWrapper(patch), properties_hash)
284            }
285            EvaluateData::GovTransfer { state } => {
286                let state = state.to_value_wrapper();
287                let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
288                    .map_err(|e| {
289                        EvaluatorError::InternalError(e.to_string())
290                    })?;
291
292                (evaluation.final_state, properties_hash)
293            }
294            EvaluateData::GovConfirm { state } => {
295                let properties_hash =
296                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
297                        .map_err(|e| {
298                        EvaluatorError::InternalError(e.to_string())
299                    })?;
300
301                let state = state.to_value_wrapper();
302                let patch = Self::generate_json_patch(
303                    &state.0,
304                    &evaluation.final_state.0,
305                )?;
306
307                (ValueWrapper(patch), properties_hash)
308            }
309            EvaluateData::TrackerSchemasTransfer { state, .. } => {
310                let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
311                    .map_err(|e| {
312                        EvaluatorError::InternalError(e.to_string())
313                    })?;
314
315                (evaluation.final_state, properties_hash)
316            }
317        };
318
319        Ok(EvaluationRes::Response {
320            response: EvalRes {
321                patch,
322                properties_hash,
323                appr_required: evaluation.approval_required,
324            },
325            eval_req_hash,
326            req_subject_data_hash,
327        })
328    }
329
330    fn build_response_error(
331        &self,
332        evaluator_error: EvaluatorError,
333        evaluation_req: Signed<EvaluationReq>,
334    ) -> Result<EvaluationRes, EvaluatorError> {
335        match &evaluator_error {
336            EvaluatorError::InvalidEventSignature
337            | EvaluatorError::InvalidEventRequest(..) => {
338                return Ok(EvaluationRes::Abort(evaluator_error.to_string()));
339            }
340            EvaluatorError::Runner(EvalRunnerError::ContractNotFound(..)) => {
341                return Ok(EvaluationRes::Reboot);
342            }
343            _ => {}
344        };
345
346        let eval_req_hash =
347            hash_borsh(&*self.hash.hasher(), &evaluation_req)
348                .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
349
350        let EvaluationReq {
351            event_request,
352            governance_id,
353            sn,
354            gov_version,
355            namespace,
356            schema_id,
357            signer,
358            ..
359        } = evaluation_req.content().clone();
360
361        let req_subject_data_hash = hash_borsh(
362            &*self.hash.hasher(),
363            &RequestSubjectData {
364                namespace,
365                schema_id,
366                subject_id: event_request.content().get_subject_id(),
367                governance_id,
368                sn,
369                gov_version,
370                signer,
371            },
372        )
373        .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
374
375        Ok(EvaluationRes::Error {
376            error: evaluator_error,
377            eval_req_hash,
378            req_subject_data_hash,
379        })
380    }
381
382    async fn create_res(
383        &self,
384        ctx: &mut ActorContext<Self>,
385        reboot: bool,
386        evaluation_req: &Signed<EvaluationReq>,
387    ) -> Result<EvaluationRes, EvaluatorError> {
388        let evaluation = if reboot {
389            EvaluationRes::Reboot
390        } else {
391            match self.evaluate(ctx, evaluation_req.content()).await {
392                Ok(evaluation) => {
393                    self.build_response(evaluation, evaluation_req.clone())?
394                }
395                Err(error) => {
396                    if let EvaluatorError::InternalError(..) = &error {
397                        return Err(error);
398                    } else {
399                        self.build_response_error(
400                            error,
401                            evaluation_req.clone(),
402                        )?
403                    }
404                }
405            }
406        };
407
408        Ok(evaluation)
409    }
410
411    fn check_data(
412        &self,
413        evaluation_req: &Signed<EvaluationReq>,
414    ) -> Result<(), EvaluatorError> {
415        let event_is_for_gov = evaluation_req.content().data.is_gov_event();
416        match (self.init_state.is_none(), event_is_for_gov) {
417            (true, false) => return Err(EvaluatorError::InvalidEventRequest(
418                "Evaluator is for governance but eval request is for tracker"
419                    .to_owned(),
420            )),
421            (false, true) => return Err(EvaluatorError::InvalidEventRequest(
422                "Evaluator is for tracker but eval request is for governance"
423                    .to_owned(),
424            )),
425            _ => {}
426        };
427
428        if evaluation_req.content().governance_id != self.governance_id {
429            return Err(EvaluatorError::InvalidEventRequest(format!(
430                "Evaluator governance_id {} and eval request governance_id {} are different",
431                self.governance_id,
432                evaluation_req.content().governance_id
433            )));
434        }
435
436        if evaluation_req.verify().is_err() {
437            return Err(EvaluatorError::InvalidEventSignature);
438        }
439
440        if evaluation_req.content().event_request.verify().is_err() {
441            return Err(EvaluatorError::InvalidEventSignature);
442        }
443
444        Ok(())
445    }
446}
447
448#[derive(Debug, Clone)]
449pub enum EvalWorkerMessage {
450    UpdateGovVersion {
451        gov_version: u64,
452    },
453    LocalEvaluation {
454        evaluation_req: Signed<EvaluationReq>,
455    },
456    NetworkRequest {
457        evaluation_req: Signed<EvaluationReq>,
458        sender: PublicKey,
459        info: ComunicateInfo,
460    },
461}
462
463impl Message for EvalWorkerMessage {}
464
465#[async_trait]
466impl Actor for EvalWorker {
467    type Event = ();
468    type Message = EvalWorkerMessage;
469    type Response = ();
470
471    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
472        parent_span.map_or_else(
473            || info_span!("EvalWorker", id),
474            |parent_span| info_span!(parent: parent_span, "EvalWorker", id),
475        )
476    }
477}
478
479impl NotPersistentActor for EvalWorker {}
480
481#[async_trait]
482impl Handler<Self> for EvalWorker {
483    async fn handle_message(
484        &mut self,
485        _sender: ActorPath,
486        msg: EvalWorkerMessage,
487        ctx: &mut ActorContext<Self>,
488    ) -> Result<(), ActorError> {
489        match msg {
490            EvalWorkerMessage::UpdateGovVersion { gov_version } => {
491                self.gov_version = gov_version;
492            }
493            EvalWorkerMessage::LocalEvaluation { evaluation_req } => {
494                let evaluation =
495                    match self.create_res(ctx, false, &evaluation_req).await {
496                        Ok(eval) => eval,
497                        Err(e) => {
498                            error!(
499                                msg_type = "LocalEvaluation",
500                                error = %e,
501                                "Failed to create evaluation response"
502                            );
503                            return Err(emit_fail(
504                                ctx,
505                                ActorError::FunctionalCritical {
506                                    description: e.to_string(),
507                                },
508                            )
509                            .await);
510                        }
511                    };
512
513                let signature = match get_sign(
514                    ctx,
515                    SignTypesNode::EvaluationRes(evaluation.clone()),
516                )
517                .await
518                {
519                    Ok(signature) => signature,
520                    Err(e) => {
521                        error!(
522                            msg_type = "LocalEvaluation",
523                            error = %e,
524                            "Failed to sign evaluator response"
525                        );
526                        return Err(emit_fail(ctx, e).await);
527                    }
528                };
529
530                match ctx.get_parent::<Evaluation>().await {
531                    Ok(evaluation_actor) => {
532                        if let Err(e) = evaluation_actor
533                            .tell(EvaluationMessage::Response {
534                                evaluation_res: evaluation.clone(),
535                                sender: (*self.our_key).clone(),
536                                signature: Some(signature),
537                            })
538                            .await
539                        {
540                            error!(
541                                msg_type = "LocalEvaluation",
542                                error = %e,
543                                "Failed to send response to evaluation actor"
544                            );
545                            return Err(emit_fail(ctx, e).await);
546                        }
547
548                        debug!(
549                            msg_type = "LocalEvaluation",
550                            "Local evaluation completed successfully"
551                        );
552                    }
553                    Err(e) => {
554                        error!(
555                            msg_type = "LocalEvaluation",
556                            path = %ctx.path().parent(),
557                            "Evaluation actor not found"
558                        );
559                        return Err(e);
560                    }
561                }
562
563                ctx.stop(None).await;
564            }
565            EvalWorkerMessage::NetworkRequest {
566                evaluation_req,
567                info,
568                sender,
569            } => {
570                if sender != evaluation_req.signature().signer
571                    || sender != self.node_key
572                {
573                    warn!(
574                        msg_type = "NetworkRequest",
575                        expected_sender = %self.node_key,
576                        received_sender = %sender,
577                        signer = %evaluation_req.signature().signer,
578                        "Unexpected sender"
579                    );
580                    if self.stop {
581                        ctx.stop(None).await;
582                    }
583
584                    return Ok(());
585                }
586
587                // TODO MUCHO CUIDADO COn esto
588                let reboot = match self
589                    .check_governance(evaluation_req.content().gov_version)
590                    .await
591                {
592                    Ok(reboot) => reboot,
593                    Err(e) => {
594                        warn!(
595                            msg_type = "NetworkRequest",
596                            error = %e,
597                            "Failed to check governance"
598                        );
599                        if let ActorError::Functional { .. } = e {
600                            if self.stop {
601                                ctx.stop(None).await;
602                            }
603                            return Err(e);
604                        } else {
605                            return Err(emit_fail(ctx, e).await);
606                        }
607                    }
608                };
609
610                let evaluation = if let Err(error) =
611                    self.check_data(&evaluation_req)
612                {
613                    match self
614                        .build_response_error(error, evaluation_req.clone())
615                    {
616                        Ok(eval) => eval,
617                        Err(e) => {
618                            error!(
619                                msg_type = "NetworkRequest",
620                                error = %e,
621                                "Failed to build error response"
622                            );
623                            return Err(emit_fail(
624                                ctx,
625                                ActorError::FunctionalCritical {
626                                    description: e.to_string(),
627                                },
628                            )
629                            .await);
630                        }
631                    }
632                } else {
633                    match self.create_res(ctx, reboot, &evaluation_req).await {
634                        Ok(eval) => eval,
635                        Err(e) => {
636                            error!(
637                                msg_type = "NetworkRequest",
638                                error = %e,
639                                "Internal error during evaluation"
640                            );
641                            return Err(emit_fail(
642                                ctx,
643                                ActorError::FunctionalCritical {
644                                    description: e.to_string(),
645                                },
646                            )
647                            .await);
648                        }
649                    }
650                };
651
652                let signature = match get_sign(
653                    ctx,
654                    SignTypesNode::EvaluationRes(evaluation.clone()),
655                )
656                .await
657                {
658                    Ok(signature) => signature,
659                    Err(e) => {
660                        error!(
661                            msg_type = "NetworkRequest",
662                            error = %e,
663                            "Failed to sign response"
664                        );
665                        return Err(emit_fail(ctx, e).await);
666                    }
667                };
668
669                let new_info = ComunicateInfo {
670                    receiver: sender.clone(),
671                    request_id: info.request_id.clone(),
672                    version: info.version,
673                    receiver_actor: format!(
674                        "/user/request/{}/evaluation/{}",
675                        evaluation_req
676                            .content()
677                            .event_request
678                            .content()
679                            .get_subject_id(),
680                        self.our_key.clone()
681                    ),
682                };
683
684                let signed_response: Signed<EvaluationRes> =
685                    Signed::from_parts(evaluation, signature);
686                if let Err(e) = self
687                    .network
688                    .send_command(network::CommandHelper::SendMessage {
689                        message: NetworkMessage {
690                            info: new_info,
691                            message: ActorMessage::EvaluationRes {
692                                res: signed_response,
693                            },
694                        },
695                    })
696                    .await
697                {
698                    error!(
699                        msg_type = "NetworkRequest",
700                        error = %e,
701                        "Failed to send response to network"
702                    );
703                    return Err(emit_fail(ctx, e).await);
704                };
705
706                debug!(
707                    msg_type = "NetworkRequest",
708                    request_id = %info.request_id,
709                    version = info.version,
710                    sender = %sender,
711                    "Network evaluation request processed successfully"
712                );
713
714                if self.stop {
715                    ctx.stop(None).await;
716                }
717            }
718        }
719
720        Ok(())
721    }
722
723    async fn on_child_fault(
724        &mut self,
725        error: ActorError,
726        ctx: &mut ActorContext<Self>,
727    ) -> ChildAction {
728        error!(
729            governance_id = %self.governance_id,
730            gov_version = self.gov_version,
731            sn = self.sn,
732            node_key = %self.node_key,
733            error = %error,
734            "Child fault in evaluation worker"
735        );
736        emit_fail(ctx, error).await;
737        ChildAction::Stop
738    }
739}