Skip to main content

ave_core/evaluation/
worker.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use crate::{
4    evaluation::{
5        compiler::{CompilerResponse, error::CompilerError},
6        request::EvaluateData,
7        response::{
8            EvalRunnerError, EvaluatorError, EvaluatorResponse as EvalRes,
9        },
10        runner::types::EvaluateInfo,
11    },
12    governance::{data::GovernanceData, model::Schema},
13    helpers::network::{NetworkMessage, service::NetworkSender},
14    model::common::{
15        emit_fail,
16        node::{SignTypesNode, UpdateData, get_sign, update_ledger_network},
17    },
18    subject::RequestSubjectData,
19    system::ConfigHelper,
20};
21
22use crate::helpers::network::ActorMessage;
23
24use async_trait::async_trait;
25use ave_common::{
26    SchemaType, ValueWrapper,
27    identity::{
28        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
29    },
30};
31
32use json_patch::diff;
33use network::ComunicateInfo;
34
35use ave_actors::{
36    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
37    NotPersistentActor,
38};
39
40use serde_json::Value;
41use tracing::{Span, debug, error, info_span, warn};
42
43use super::{
44    Evaluation, EvaluationMessage,
45    compiler::{Compiler, CompilerMessage},
46    request::EvaluationReq,
47    response::EvaluationRes,
48    runner::{Runner, RunnerMessage, RunnerResponse, types::RunnerResult},
49};
50
51/// A struct representing a EvalWorker actor.
52#[derive(Clone, Debug)]
53pub struct EvalWorker {
54    pub node_key: PublicKey,
55    pub our_key: Arc<PublicKey>,
56    pub governance_id: DigestIdentifier,
57    pub gov_version: u64,
58    pub sn: u64,
59    pub init_state: Option<ValueWrapper>,
60    pub hash: HashAlgorithm,
61    pub network: Arc<NetworkSender>,
62    pub stop: bool,
63}
64
65impl EvalWorker {
66    async fn execute_contract(
67        &self,
68        ctx: &mut ActorContext<Self>,
69        runner_data: EvaluateInfo,
70        is_owner: bool,
71    ) -> Result<RunnerResponse, ActorError> {
72        let runner_actor = ctx.create_child("runner", Runner).await?;
73
74        let response = runner_actor
75            .ask(RunnerMessage {
76                data: runner_data,
77                is_owner,
78            })
79            .await;
80
81        runner_actor.ask_stop().await?;
82
83        response
84    }
85
86    async fn compile_contracts(
87        &self,
88        ctx: &mut ActorContext<Self>,
89        ids: &[SchemaType],
90        schemas: BTreeMap<SchemaType, Schema>,
91    ) -> Result<Option<CompilerError>, ActorError> {
92        let contracts_path = if let Some(config) =
93            ctx.system().get_helper::<ConfigHelper>("config").await
94        {
95            config.contracts_path
96        } else {
97            return Err(ActorError::Helper {
98                name: "config".to_owned(),
99                reason: "Not found".to_owned(),
100            });
101        };
102
103        let compiler = ctx
104            .create_child("compiler", Compiler::new(self.hash))
105            .await?;
106
107        for id in ids {
108            let Some(schema) = schemas.get(id) else {
109                return Err(ActorError::Functional { description: "There is a contract that requires compilation but its scheme could not be found".to_owned()});
110            };
111
112            let response = compiler
113                .ask(CompilerMessage::TemporalCompile {
114                    contract_name: format!("{}_{}", self.governance_id, id),
115                    contract: schema.contract.clone(),
116                    initial_value: schema.initial_value.0.clone(),
117                    contract_path: contracts_path
118                        .join("contracts")
119                        .join(format!("{}_temp_{}", self.governance_id, id)),
120                })
121                .await?;
122
123            if let CompilerResponse::Error(e) = response {
124                compiler.ask_stop().await?;
125                return Ok(Some(e));
126            }
127        }
128
129        compiler.ask_stop().await?;
130        Ok(None)
131    }
132
133    async fn check_governance(
134        &self,
135        gov_version: u64,
136    ) -> Result<bool, ActorError> {
137        match gov_version.cmp(&self.gov_version) {
138            std::cmp::Ordering::Equal => {
139                // If it is the same it means that we have the latest version of governance, we are up to date.
140            }
141            std::cmp::Ordering::Greater => {
142                // Me llega una versión mayor a la mía.
143                let data = UpdateData {
144                    sn: self.sn,
145                    gov_version: self.gov_version,
146                    subject_id: self.governance_id.clone(),
147                    other_node: self.node_key.clone(),
148                };
149                update_ledger_network(data, self.network.clone()).await?;
150                let e = ActorError::Functional {
151                    description: "Abort evaluation, update is required"
152                        .to_owned(),
153                };
154                return Err(e);
155            }
156            std::cmp::Ordering::Less => {
157                return Ok(true);
158            }
159        }
160
161        Ok(false)
162    }
163
164    async fn evaluate(
165        &self,
166        ctx: &mut ActorContext<Self>,
167        evaluation_req: &EvaluationReq,
168    ) -> Result<RunnerResult, EvaluatorError> {
169        let runner_data =
170            evaluation_req.build_evaluate_info(&self.init_state)?;
171
172        // Mirar la parte final de execute contract.
173        let response = self
174            .execute_contract(ctx, runner_data, evaluation_req.signer_is_owner)
175            .await
176            .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
177
178        let (result, compilations) = match response {
179            RunnerResponse::Ok {
180                result,
181                compilations,
182            } => (result, compilations),
183            RunnerResponse::Error(runner_error) => {
184                return Err(EvaluatorError::from(runner_error));
185            }
186        };
187
188        if self.init_state.is_none() && !compilations.is_empty() {
189            let governance_data = GovernanceData::try_from(
190                result.final_state.clone(),
191            )
192            .map_err(|e| {
193                let e = format!(
194                    "can not convert GovernanceData from properties: {}",
195                    e
196                );
197                EvaluatorError::InternalError(e)
198            })?;
199
200            if let Some(error) = self
201                .compile_contracts(ctx, &compilations, governance_data.schemas)
202                .await
203                .map_err(|e| EvaluatorError::InternalError(e.to_string()))?
204            {
205                return Err(EvaluatorError::from(error));
206            };
207        }
208
209        Ok(result)
210    }
211
212    fn generate_json_patch(
213        prev_state: &Value,
214        new_state: &Value,
215    ) -> Result<Value, EvaluatorError> {
216        let patch = diff(prev_state, new_state);
217        serde_json::to_value(patch).map_err(|e| {
218            EvaluatorError::InternalError(format!(
219                "Can not generate json patch {}",
220                e
221            ))
222        })
223    }
224
225    fn build_response(
226        &self,
227        evaluation: RunnerResult,
228        evaluation_req: Signed<EvaluationReq>,
229    ) -> Result<EvaluationRes, EvaluatorError> {
230        let eval_req_hash =
231            hash_borsh(&*self.hash.hasher(), &evaluation_req)
232                .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
233
234        let EvaluationReq {
235            event_request,
236            governance_id,
237            sn,
238            gov_version,
239            namespace,
240            schema_id,
241            signer,
242            ..
243        } = evaluation_req.content().clone();
244
245        let req_subject_data_hash = hash_borsh(
246            &*self.hash.hasher(),
247            &RequestSubjectData {
248                namespace,
249                schema_id,
250                subject_id: event_request.content().get_subject_id(),
251                governance_id,
252                sn,
253                gov_version,
254                signer,
255            },
256        )
257        .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
258
259        let (patch, properties_hash) = match &evaluation_req.content().data {
260            EvaluateData::GovFact { state, .. } => {
261                let properties_hash =
262                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
263                        .map_err(|e| {
264                        EvaluatorError::InternalError(e.to_string())
265                    })?;
266
267                let state = state.to_value_wrapper();
268                let patch = Self::generate_json_patch(
269                    &state.0,
270                    &evaluation.final_state.0,
271                )?;
272
273                (ValueWrapper(patch), properties_hash)
274            }
275            EvaluateData::TrackerSchemasFact { state, .. } => {
276                let properties_hash =
277                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
278                        .map_err(|e| {
279                        EvaluatorError::InternalError(e.to_string())
280                    })?;
281
282                let patch = Self::generate_json_patch(
283                    &state.0,
284                    &evaluation.final_state.0,
285                )?;
286
287                (ValueWrapper(patch), properties_hash)
288            }
289            EvaluateData::GovTransfer { state } => {
290                let state = state.to_value_wrapper();
291                let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
292                    .map_err(|e| {
293                        EvaluatorError::InternalError(e.to_string())
294                    })?;
295
296                (evaluation.final_state, properties_hash)
297            }
298            EvaluateData::GovConfirm { state } => {
299                let properties_hash =
300                    hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
301                        .map_err(|e| {
302                        EvaluatorError::InternalError(e.to_string())
303                    })?;
304
305                let state = state.to_value_wrapper();
306                let patch = Self::generate_json_patch(
307                    &state.0,
308                    &evaluation.final_state.0,
309                )?;
310
311                (ValueWrapper(patch), properties_hash)
312            }
313            EvaluateData::TrackerSchemasTransfer { state, .. } => {
314                let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
315                    .map_err(|e| {
316                        EvaluatorError::InternalError(e.to_string())
317                    })?;
318
319                (evaluation.final_state, properties_hash)
320            }
321        };
322
323        Ok(EvaluationRes::Response {
324            response: EvalRes {
325                patch,
326                properties_hash,
327                appr_required: evaluation.approval_required,
328            },
329            eval_req_hash,
330            req_subject_data_hash,
331        })
332    }
333
334    fn build_response_error(
335        &self,
336        evaluator_error: EvaluatorError,
337        evaluation_req: Signed<EvaluationReq>,
338    ) -> Result<EvaluationRes, EvaluatorError> {
339        match &evaluator_error {
340            EvaluatorError::InvalidEventSignature
341            | EvaluatorError::InvalidEventRequest(..) => {
342                return Ok(EvaluationRes::Abort(evaluator_error.to_string()));
343            }
344            EvaluatorError::Runner(EvalRunnerError::ContractNotFound(..)) => {
345                return Ok(EvaluationRes::Reboot);
346            }
347            _ => {}
348        };
349
350        let eval_req_hash =
351            hash_borsh(&*self.hash.hasher(), &evaluation_req)
352                .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
353
354        let EvaluationReq {
355            event_request,
356            governance_id,
357            sn,
358            gov_version,
359            namespace,
360            schema_id,
361            signer,
362            ..
363        } = evaluation_req.content().clone();
364
365        let req_subject_data_hash = hash_borsh(
366            &*self.hash.hasher(),
367            &RequestSubjectData {
368                namespace,
369                schema_id,
370                subject_id: event_request.content().get_subject_id(),
371                governance_id,
372                sn,
373                gov_version,
374                signer,
375            },
376        )
377        .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
378
379        Ok(EvaluationRes::Error {
380            error: evaluator_error,
381            eval_req_hash,
382            req_subject_data_hash,
383        })
384    }
385
386    async fn create_res(
387        &self,
388        ctx: &mut ActorContext<Self>,
389        reboot: bool,
390        evaluation_req: &Signed<EvaluationReq>,
391    ) -> Result<EvaluationRes, EvaluatorError> {
392        let evaluation = if reboot {
393            EvaluationRes::Reboot
394        } else {
395            match self.evaluate(ctx, evaluation_req.content()).await {
396                Ok(evaluation) => {
397                    self.build_response(evaluation, evaluation_req.clone())?
398                }
399                Err(error) => {
400                    if let EvaluatorError::InternalError(..) = &error {
401                        return Err(error);
402                    } else {
403                        self.build_response_error(
404                            error,
405                            evaluation_req.clone(),
406                        )?
407                    }
408                }
409            }
410        };
411
412        Ok(evaluation)
413    }
414
415    fn check_data(
416        &self,
417        evaluation_req: &Signed<EvaluationReq>,
418    ) -> Result<(), EvaluatorError> {
419        let event_is_for_gov = evaluation_req.content().data.is_gov_event();
420        match (self.init_state.is_none(), event_is_for_gov) {
421            (true, false) => return Err(EvaluatorError::InvalidEventRequest(
422                "Evaluator is for governance but eval request is for tracker"
423                    .to_owned(),
424            )),
425            (false, true) => return Err(EvaluatorError::InvalidEventRequest(
426                "Evaluator is for tracker but eval request is for governance"
427                    .to_owned(),
428            )),
429            _ => {}
430        };
431
432        if evaluation_req.content().governance_id != self.governance_id {
433            return Err(EvaluatorError::InvalidEventRequest(format!(
434                "Evaluator governance_id {} and eval request governance_id {} are different",
435                self.governance_id,
436                evaluation_req.content().governance_id
437            )));
438        }
439
440        if evaluation_req.verify().is_err() {
441            return Err(EvaluatorError::InvalidEventSignature);
442        }
443
444        if evaluation_req.content().event_request.verify().is_err() {
445            return Err(EvaluatorError::InvalidEventSignature);
446        }
447
448        Ok(())
449    }
450}
451
452#[derive(Debug, Clone)]
453pub enum EvalWorkerMessage {
454    UpdateGovVersion {
455        gov_version: u64,
456    },
457    LocalEvaluation {
458        evaluation_req: Signed<EvaluationReq>,
459    },
460    NetworkRequest {
461        evaluation_req: Signed<EvaluationReq>,
462        sender: PublicKey,
463        info: ComunicateInfo,
464    },
465}
466
467impl Message for EvalWorkerMessage {}
468
469#[async_trait]
470impl Actor for EvalWorker {
471    type Event = ();
472    type Message = EvalWorkerMessage;
473    type Response = ();
474
475    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
476        parent_span.map_or_else(
477            || info_span!("EvalWorker", id),
478            |parent_span| info_span!(parent: parent_span, "EvalWorker", id),
479        )
480    }
481}
482
483impl NotPersistentActor for EvalWorker {}
484
485#[async_trait]
486impl Handler<Self> for EvalWorker {
487    async fn handle_message(
488        &mut self,
489        _sender: ActorPath,
490        msg: EvalWorkerMessage,
491        ctx: &mut ActorContext<Self>,
492    ) -> Result<(), ActorError> {
493        match msg {
494            EvalWorkerMessage::UpdateGovVersion { gov_version } => {
495                self.gov_version = gov_version;
496            }
497            EvalWorkerMessage::LocalEvaluation { evaluation_req } => {
498                let evaluation =
499                    match self.create_res(ctx, false, &evaluation_req).await {
500                        Ok(eval) => eval,
501                        Err(e) => {
502                            error!(
503                                msg_type = "LocalEvaluation",
504                                error = %e,
505                                "Failed to create evaluation response"
506                            );
507                            return Err(emit_fail(
508                                ctx,
509                                ActorError::FunctionalCritical {
510                                    description: e.to_string(),
511                                },
512                            )
513                            .await);
514                        }
515                    };
516
517                let signature = match get_sign(
518                    ctx,
519                    SignTypesNode::EvaluationRes(evaluation.clone()),
520                )
521                .await
522                {
523                    Ok(signature) => signature,
524                    Err(e) => {
525                        error!(
526                            msg_type = "LocalEvaluation",
527                            error = %e,
528                            "Failed to sign evaluator response"
529                        );
530                        return Err(emit_fail(ctx, e).await);
531                    }
532                };
533
534                match ctx.get_parent::<Evaluation>().await {
535                    Ok(evaluation_actor) => {
536                        if let Err(e) = evaluation_actor
537                            .tell(EvaluationMessage::Response {
538                                evaluation_res: evaluation.clone(),
539                                sender: (*self.our_key).clone(),
540                                signature: Some(signature),
541                            })
542                            .await
543                        {
544                            error!(
545                                msg_type = "LocalEvaluation",
546                                error = %e,
547                                "Failed to send response to evaluation actor"
548                            );
549                            return Err(emit_fail(ctx, e).await);
550                        }
551
552                        debug!(
553                            msg_type = "LocalEvaluation",
554                            "Local evaluation completed successfully"
555                        );
556                    }
557                    Err(e) => {
558                        error!(
559                            msg_type = "LocalEvaluation",
560                            path = %ctx.path().parent(),
561                            "Evaluation actor not found"
562                        );
563                        return Err(e);
564                    }
565                }
566
567                ctx.stop(None).await;
568            }
569            EvalWorkerMessage::NetworkRequest {
570                evaluation_req,
571                info,
572                sender,
573            } => {
574                if sender != evaluation_req.signature().signer
575                    || sender != self.node_key
576                {
577                    warn!(
578                        msg_type = "NetworkRequest",
579                        expected_sender = %self.node_key,
580                        received_sender = %sender,
581                        signer = %evaluation_req.signature().signer,
582                        "Unexpected sender"
583                    );
584                    if self.stop {
585                        ctx.stop(None).await;
586                    }
587
588                    return Ok(());
589                }
590
591                // TODO MUCHO CUIDADO COn esto
592                let reboot = match self
593                    .check_governance(evaluation_req.content().gov_version)
594                    .await
595                {
596                    Ok(reboot) => reboot,
597                    Err(e) => {
598                        warn!(
599                            msg_type = "NetworkRequest",
600                            error = %e,
601                            "Failed to check governance"
602                        );
603                        if let ActorError::Functional { .. } = e {
604                            if self.stop {
605                                ctx.stop(None).await;
606                            }
607                            return Err(e);
608                        } else {
609                            return Err(emit_fail(ctx, e).await);
610                        }
611                    }
612                };
613
614                let evaluation = if let Err(error) =
615                    self.check_data(&evaluation_req)
616                {
617                    match self
618                        .build_response_error(error, evaluation_req.clone())
619                    {
620                        Ok(eval) => eval,
621                        Err(e) => {
622                            error!(
623                                msg_type = "NetworkRequest",
624                                error = %e,
625                                "Failed to build error response"
626                            );
627                            return Err(emit_fail(
628                                ctx,
629                                ActorError::FunctionalCritical {
630                                    description: e.to_string(),
631                                },
632                            )
633                            .await);
634                        }
635                    }
636                } else {
637                    match self.create_res(ctx, reboot, &evaluation_req).await {
638                        Ok(eval) => eval,
639                        Err(e) => {
640                            error!(
641                                msg_type = "NetworkRequest",
642                                error = %e,
643                                "Internal error during evaluation"
644                            );
645                            return Err(emit_fail(
646                                ctx,
647                                ActorError::FunctionalCritical {
648                                    description: e.to_string(),
649                                },
650                            )
651                            .await);
652                        }
653                    }
654                };
655
656                let signature = match get_sign(
657                    ctx,
658                    SignTypesNode::EvaluationRes(evaluation.clone()),
659                )
660                .await
661                {
662                    Ok(signature) => signature,
663                    Err(e) => {
664                        error!(
665                            msg_type = "NetworkRequest",
666                            error = %e,
667                            "Failed to sign response"
668                        );
669                        return Err(emit_fail(ctx, e).await);
670                    }
671                };
672
673                let new_info = ComunicateInfo {
674                    receiver: sender.clone(),
675                    request_id: info.request_id.clone(),
676                    version: info.version,
677                    receiver_actor: format!(
678                        "/user/request/{}/evaluation/{}",
679                        evaluation_req
680                            .content()
681                            .event_request
682                            .content()
683                            .get_subject_id(),
684                        self.our_key.clone()
685                    ),
686                };
687
688                let signed_response: Signed<EvaluationRes> =
689                    Signed::from_parts(evaluation, signature);
690                if let Err(e) = self
691                    .network
692                    .send_command(network::CommandHelper::SendMessage {
693                        message: NetworkMessage {
694                            info: new_info,
695                            message: ActorMessage::EvaluationRes {
696                                res: signed_response,
697                            },
698                        },
699                    })
700                    .await
701                {
702                    error!(
703                        msg_type = "NetworkRequest",
704                        error = %e,
705                        "Failed to send response to network"
706                    );
707                    return Err(emit_fail(ctx, e).await);
708                };
709
710                debug!(
711                    msg_type = "NetworkRequest",
712                    request_id = %info.request_id,
713                    version = info.version,
714                    sender = %sender,
715                    "Network evaluation request processed successfully"
716                );
717
718                if self.stop {
719                    ctx.stop(None).await;
720                }
721            }
722        }
723
724        Ok(())
725    }
726
727    async fn on_child_fault(
728        &mut self,
729        error: ActorError,
730        ctx: &mut ActorContext<Self>,
731    ) -> ChildAction {
732        error!(
733            governance_id = %self.governance_id,
734            gov_version = self.gov_version,
735            sn = self.sn,
736            node_key = %self.node_key,
737            error = %error,
738            "Child fault in evaluation worker"
739        );
740        emit_fail(ctx, error).await;
741        ChildAction::Stop
742    }
743}