1use std::{collections::BTreeMap, sync::Arc};
2
3use crate::{
4 evaluation::{
5 compiler::{CompilerResponse, error::CompilerError},
6 request::EvaluateData,
7 response::{
8 EvalRunnerError, EvaluatorError, EvaluatorResponse as EvalRes,
9 },
10 runner::types::EvaluateInfo,
11 },
12 governance::{data::GovernanceData, model::Schema},
13 helpers::network::{NetworkMessage, service::NetworkSender},
14 model::common::{
15 emit_fail,
16 node::{SignTypesNode, get_sign},
17 },
18 subject::RequestSubjectData,
19 system::ConfigHelper,
20};
21
22use crate::helpers::network::ActorMessage;
23
24use async_trait::async_trait;
25use ave_common::{
26 SchemaType, ValueWrapper,
27 identity::{
28 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
29 },
30};
31
32use json_patch::diff;
33use network::ComunicateInfo;
34
35use ave_actors::{
36 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
37 NotPersistentActor,
38};
39
40use serde_json::Value;
41use tracing::{Span, debug, error, info_span, warn};
42
43use super::{
44 Evaluation, EvaluationMessage,
45 compiler::{TempCompiler, TempCompilerMessage},
46 request::EvaluationReq,
47 response::EvaluationRes,
48 runner::{Runner, RunnerMessage, RunnerResponse, types::RunnerResult},
49};
50
51#[derive(Clone, Debug)]
53pub struct EvalWorker {
54 pub node_key: PublicKey,
55 pub our_key: Arc<PublicKey>,
56 pub governance_id: DigestIdentifier,
57 pub gov_version: u64,
58 pub sn: u64,
59 pub init_state: Option<ValueWrapper>,
60 pub hash: HashAlgorithm,
61 pub network: Arc<NetworkSender>,
62 pub stop: bool,
63}
64
65impl EvalWorker {
66 async fn execute_contract(
67 &self,
68 ctx: &mut ActorContext<Self>,
69 runner_data: EvaluateInfo,
70 is_owner: bool,
71 ) -> Result<RunnerResponse, ActorError> {
72 let runner_actor = ctx.create_child("runner", Runner).await?;
73
74 let response = runner_actor
75 .ask(RunnerMessage {
76 data: runner_data,
77 is_owner,
78 })
79 .await;
80
81 runner_actor.ask_stop().await?;
82
83 response
84 }
85
86 async fn compile_contracts(
87 &self,
88 ctx: &mut ActorContext<Self>,
89 ids: &[SchemaType],
90 schemas: BTreeMap<SchemaType, Schema>,
91 ) -> Result<Option<CompilerError>, ActorError> {
92 let contracts_path = if let Some(config) =
93 ctx.system().get_helper::<ConfigHelper>("config").await
94 {
95 config.contracts_path
96 } else {
97 return Err(ActorError::Helper {
98 name: "config".to_owned(),
99 reason: "Not found".to_owned(),
100 });
101 };
102
103 let compiler = ctx
104 .create_child("temp_compiler", TempCompiler::new(self.hash))
105 .await?;
106
107 for id in ids {
108 let Some(schema) = schemas.get(id) else {
109 return Err(ActorError::Functional { description: "There is a contract that requires compilation but its scheme could not be found".to_owned()});
110 };
111
112 let response = compiler
113 .ask(TempCompilerMessage::Compile {
114 contract_name: format!("{}_{}", self.governance_id, id),
115 contract: schema.contract.clone(),
116 initial_value: schema.initial_value.0.clone(),
117 contract_path: contracts_path
118 .join("contracts")
119 .join(format!("{}_temp_{}", self.governance_id, id)),
120 })
121 .await?;
122
123 if let CompilerResponse::Error(e) = response {
124 compiler.ask_stop().await?;
125 return Ok(Some(e));
126 }
127 }
128
129 compiler.ask_stop().await?;
130 Ok(None)
131 }
132
133 async fn check_governance(
134 &self,
135 gov_version: u64,
136 ) -> Result<bool, ActorError> {
137 match self.gov_version.cmp(&gov_version) {
138 std::cmp::Ordering::Less => {
139 warn!(
140 local_gov_version = self.gov_version,
141 request_gov_version = gov_version,
142 governance_id = %self.governance_id,
143 sender = %self.node_key,
144 "Received request with a higher governance version; ignoring request"
145 );
146 Err(ActorError::Functional {
147 description:
148 "Abort evaluation, request governance version is higher than local"
149 .to_owned(),
150 })
151 }
152 std::cmp::Ordering::Equal => {
153 Ok(false)
155 }
156 std::cmp::Ordering::Greater => Ok(true),
157 }
158 }
159
160 async fn evaluate(
161 &self,
162 ctx: &mut ActorContext<Self>,
163 evaluation_req: &EvaluationReq,
164 ) -> Result<RunnerResult, EvaluatorError> {
165 let runner_data =
166 evaluation_req.build_evaluate_info(&self.init_state)?;
167
168 let response = self
170 .execute_contract(ctx, runner_data, evaluation_req.signer_is_owner)
171 .await
172 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
173
174 let (result, compilations) = match response {
175 RunnerResponse::Ok {
176 result,
177 compilations,
178 } => (result, compilations),
179 RunnerResponse::Error(runner_error) => {
180 return Err(EvaluatorError::from(runner_error));
181 }
182 };
183
184 if self.init_state.is_none() && !compilations.is_empty() {
185 let governance_data = GovernanceData::try_from(
186 result.final_state.clone(),
187 )
188 .map_err(|e| {
189 let e = format!(
190 "can not convert GovernanceData from properties: {}",
191 e
192 );
193 EvaluatorError::InternalError(e)
194 })?;
195
196 if let Some(error) = self
197 .compile_contracts(ctx, &compilations, governance_data.schemas)
198 .await
199 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?
200 {
201 return Err(EvaluatorError::from(error));
202 };
203 }
204
205 Ok(result)
206 }
207
208 fn generate_json_patch(
209 prev_state: &Value,
210 new_state: &Value,
211 ) -> Result<Value, EvaluatorError> {
212 let patch = diff(prev_state, new_state);
213 serde_json::to_value(patch).map_err(|e| {
214 EvaluatorError::InternalError(format!(
215 "Can not generate json patch {}",
216 e
217 ))
218 })
219 }
220
221 fn build_response(
222 &self,
223 evaluation: RunnerResult,
224 evaluation_req: Signed<EvaluationReq>,
225 ) -> Result<EvaluationRes, EvaluatorError> {
226 let eval_req_hash =
227 hash_borsh(&*self.hash.hasher(), &evaluation_req)
228 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
229
230 let EvaluationReq {
231 event_request,
232 governance_id,
233 sn,
234 gov_version,
235 namespace,
236 schema_id,
237 signer,
238 ..
239 } = evaluation_req.content().clone();
240
241 let req_subject_data_hash = hash_borsh(
242 &*self.hash.hasher(),
243 &RequestSubjectData {
244 namespace,
245 schema_id,
246 subject_id: event_request.content().get_subject_id(),
247 governance_id,
248 sn,
249 gov_version,
250 signer,
251 },
252 )
253 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
254
255 let (patch, properties_hash) = match &evaluation_req.content().data {
256 EvaluateData::GovFact { state, .. } => {
257 let properties_hash =
258 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
259 .map_err(|e| {
260 EvaluatorError::InternalError(e.to_string())
261 })?;
262
263 let state = state.to_value_wrapper();
264 let patch = Self::generate_json_patch(
265 &state.0,
266 &evaluation.final_state.0,
267 )?;
268
269 (ValueWrapper(patch), properties_hash)
270 }
271 EvaluateData::TrackerSchemasFact { state, .. } => {
272 let properties_hash =
273 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
274 .map_err(|e| {
275 EvaluatorError::InternalError(e.to_string())
276 })?;
277
278 let patch = Self::generate_json_patch(
279 &state.0,
280 &evaluation.final_state.0,
281 )?;
282
283 (ValueWrapper(patch), properties_hash)
284 }
285 EvaluateData::GovTransfer { state } => {
286 let state = state.to_value_wrapper();
287 let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
288 .map_err(|e| {
289 EvaluatorError::InternalError(e.to_string())
290 })?;
291
292 (evaluation.final_state, properties_hash)
293 }
294 EvaluateData::GovConfirm { state } => {
295 let properties_hash =
296 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
297 .map_err(|e| {
298 EvaluatorError::InternalError(e.to_string())
299 })?;
300
301 let state = state.to_value_wrapper();
302 let patch = Self::generate_json_patch(
303 &state.0,
304 &evaluation.final_state.0,
305 )?;
306
307 (ValueWrapper(patch), properties_hash)
308 }
309 EvaluateData::TrackerSchemasTransfer { state, .. } => {
310 let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
311 .map_err(|e| {
312 EvaluatorError::InternalError(e.to_string())
313 })?;
314
315 (evaluation.final_state, properties_hash)
316 }
317 };
318
319 Ok(EvaluationRes::Response {
320 response: EvalRes {
321 patch,
322 properties_hash,
323 appr_required: evaluation.approval_required,
324 },
325 eval_req_hash,
326 req_subject_data_hash,
327 })
328 }
329
330 fn build_response_error(
331 &self,
332 evaluator_error: EvaluatorError,
333 evaluation_req: Signed<EvaluationReq>,
334 ) -> Result<EvaluationRes, EvaluatorError> {
335 match &evaluator_error {
336 EvaluatorError::InvalidEventSignature
337 | EvaluatorError::InvalidEventRequest(..) => {
338 return Ok(EvaluationRes::Abort(evaluator_error.to_string()));
339 }
340 EvaluatorError::Runner(EvalRunnerError::ContractNotFound(..)) => {
341 return Ok(EvaluationRes::Reboot);
342 }
343 _ => {}
344 };
345
346 let eval_req_hash =
347 hash_borsh(&*self.hash.hasher(), &evaluation_req)
348 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
349
350 let EvaluationReq {
351 event_request,
352 governance_id,
353 sn,
354 gov_version,
355 namespace,
356 schema_id,
357 signer,
358 ..
359 } = evaluation_req.content().clone();
360
361 let req_subject_data_hash = hash_borsh(
362 &*self.hash.hasher(),
363 &RequestSubjectData {
364 namespace,
365 schema_id,
366 subject_id: event_request.content().get_subject_id(),
367 governance_id,
368 sn,
369 gov_version,
370 signer,
371 },
372 )
373 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
374
375 Ok(EvaluationRes::Error {
376 error: evaluator_error,
377 eval_req_hash,
378 req_subject_data_hash,
379 })
380 }
381
382 async fn create_res(
383 &self,
384 ctx: &mut ActorContext<Self>,
385 reboot: bool,
386 evaluation_req: &Signed<EvaluationReq>,
387 ) -> Result<EvaluationRes, EvaluatorError> {
388 let evaluation = if reboot {
389 EvaluationRes::Reboot
390 } else {
391 match self.evaluate(ctx, evaluation_req.content()).await {
392 Ok(evaluation) => {
393 self.build_response(evaluation, evaluation_req.clone())?
394 }
395 Err(error) => {
396 if let EvaluatorError::InternalError(..) = &error {
397 return Err(error);
398 } else {
399 self.build_response_error(
400 error,
401 evaluation_req.clone(),
402 )?
403 }
404 }
405 }
406 };
407
408 Ok(evaluation)
409 }
410
411 fn check_data(
412 &self,
413 evaluation_req: &Signed<EvaluationReq>,
414 ) -> Result<(), EvaluatorError> {
415 let event_is_for_gov = evaluation_req.content().data.is_gov_event();
416 match (self.init_state.is_none(), event_is_for_gov) {
417 (true, false) => return Err(EvaluatorError::InvalidEventRequest(
418 "Evaluator is for governance but eval request is for tracker"
419 .to_owned(),
420 )),
421 (false, true) => return Err(EvaluatorError::InvalidEventRequest(
422 "Evaluator is for tracker but eval request is for governance"
423 .to_owned(),
424 )),
425 _ => {}
426 };
427
428 if evaluation_req.content().governance_id != self.governance_id {
429 return Err(EvaluatorError::InvalidEventRequest(format!(
430 "Evaluator governance_id {} and eval request governance_id {} are different",
431 self.governance_id,
432 evaluation_req.content().governance_id
433 )));
434 }
435
436 if evaluation_req.verify().is_err() {
437 return Err(EvaluatorError::InvalidEventSignature);
438 }
439
440 if evaluation_req.content().event_request.verify().is_err() {
441 return Err(EvaluatorError::InvalidEventSignature);
442 }
443
444 Ok(())
445 }
446}
447
448#[derive(Debug, Clone)]
449pub enum EvalWorkerMessage {
450 UpdateGovVersion {
451 gov_version: u64,
452 },
453 LocalEvaluation {
454 evaluation_req: Signed<EvaluationReq>,
455 },
456 NetworkRequest {
457 evaluation_req: Signed<EvaluationReq>,
458 sender: PublicKey,
459 info: ComunicateInfo,
460 },
461}
462
463impl Message for EvalWorkerMessage {}
464
465#[async_trait]
466impl Actor for EvalWorker {
467 type Event = ();
468 type Message = EvalWorkerMessage;
469 type Response = ();
470
471 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
472 parent_span.map_or_else(
473 || info_span!("EvalWorker", id),
474 |parent_span| info_span!(parent: parent_span, "EvalWorker", id),
475 )
476 }
477}
478
479impl NotPersistentActor for EvalWorker {}
480
481#[async_trait]
482impl Handler<Self> for EvalWorker {
483 async fn handle_message(
484 &mut self,
485 _sender: ActorPath,
486 msg: EvalWorkerMessage,
487 ctx: &mut ActorContext<Self>,
488 ) -> Result<(), ActorError> {
489 match msg {
490 EvalWorkerMessage::UpdateGovVersion { gov_version } => {
491 self.gov_version = gov_version;
492 }
493 EvalWorkerMessage::LocalEvaluation { evaluation_req } => {
494 let evaluation =
495 match self.create_res(ctx, false, &evaluation_req).await {
496 Ok(eval) => eval,
497 Err(e) => {
498 error!(
499 msg_type = "LocalEvaluation",
500 error = %e,
501 "Failed to create evaluation response"
502 );
503 return Err(emit_fail(
504 ctx,
505 ActorError::FunctionalCritical {
506 description: e.to_string(),
507 },
508 )
509 .await);
510 }
511 };
512
513 let signature = match get_sign(
514 ctx,
515 SignTypesNode::EvaluationRes(evaluation.clone()),
516 )
517 .await
518 {
519 Ok(signature) => signature,
520 Err(e) => {
521 error!(
522 msg_type = "LocalEvaluation",
523 error = %e,
524 "Failed to sign evaluator response"
525 );
526 return Err(emit_fail(ctx, e).await);
527 }
528 };
529
530 match ctx.get_parent::<Evaluation>().await {
531 Ok(evaluation_actor) => {
532 if let Err(e) = evaluation_actor
533 .tell(EvaluationMessage::Response {
534 evaluation_res: evaluation.clone(),
535 sender: (*self.our_key).clone(),
536 signature: Some(signature),
537 })
538 .await
539 {
540 error!(
541 msg_type = "LocalEvaluation",
542 error = %e,
543 "Failed to send response to evaluation actor"
544 );
545 return Err(emit_fail(ctx, e).await);
546 }
547
548 debug!(
549 msg_type = "LocalEvaluation",
550 "Local evaluation completed successfully"
551 );
552 }
553 Err(e) => {
554 error!(
555 msg_type = "LocalEvaluation",
556 path = %ctx.path().parent(),
557 "Evaluation actor not found"
558 );
559 return Err(e);
560 }
561 }
562
563 ctx.stop(None).await;
564 }
565 EvalWorkerMessage::NetworkRequest {
566 evaluation_req,
567 info,
568 sender,
569 } => {
570 if sender != evaluation_req.signature().signer
571 || sender != self.node_key
572 {
573 warn!(
574 msg_type = "NetworkRequest",
575 expected_sender = %self.node_key,
576 received_sender = %sender,
577 signer = %evaluation_req.signature().signer,
578 "Unexpected sender"
579 );
580 if self.stop {
581 ctx.stop(None).await;
582 }
583
584 return Ok(());
585 }
586
587 let reboot = match self
589 .check_governance(evaluation_req.content().gov_version)
590 .await
591 {
592 Ok(reboot) => reboot,
593 Err(e) => {
594 warn!(
595 msg_type = "NetworkRequest",
596 error = %e,
597 "Failed to check governance"
598 );
599 if let ActorError::Functional { .. } = e {
600 if self.stop {
601 ctx.stop(None).await;
602 }
603 return Err(e);
604 } else {
605 return Err(emit_fail(ctx, e).await);
606 }
607 }
608 };
609
610 let evaluation = if let Err(error) =
611 self.check_data(&evaluation_req)
612 {
613 match self
614 .build_response_error(error, evaluation_req.clone())
615 {
616 Ok(eval) => eval,
617 Err(e) => {
618 error!(
619 msg_type = "NetworkRequest",
620 error = %e,
621 "Failed to build error response"
622 );
623 return Err(emit_fail(
624 ctx,
625 ActorError::FunctionalCritical {
626 description: e.to_string(),
627 },
628 )
629 .await);
630 }
631 }
632 } else {
633 match self.create_res(ctx, reboot, &evaluation_req).await {
634 Ok(eval) => eval,
635 Err(e) => {
636 error!(
637 msg_type = "NetworkRequest",
638 error = %e,
639 "Internal error during evaluation"
640 );
641 return Err(emit_fail(
642 ctx,
643 ActorError::FunctionalCritical {
644 description: e.to_string(),
645 },
646 )
647 .await);
648 }
649 }
650 };
651
652 let signature = match get_sign(
653 ctx,
654 SignTypesNode::EvaluationRes(evaluation.clone()),
655 )
656 .await
657 {
658 Ok(signature) => signature,
659 Err(e) => {
660 error!(
661 msg_type = "NetworkRequest",
662 error = %e,
663 "Failed to sign response"
664 );
665 return Err(emit_fail(ctx, e).await);
666 }
667 };
668
669 let new_info = ComunicateInfo {
670 receiver: sender.clone(),
671 request_id: info.request_id.clone(),
672 version: info.version,
673 receiver_actor: format!(
674 "/user/request/{}/evaluation/{}",
675 evaluation_req
676 .content()
677 .event_request
678 .content()
679 .get_subject_id(),
680 self.our_key.clone()
681 ),
682 };
683
684 let signed_response: Signed<EvaluationRes> =
685 Signed::from_parts(evaluation, signature);
686 if let Err(e) = self
687 .network
688 .send_command(network::CommandHelper::SendMessage {
689 message: NetworkMessage {
690 info: new_info,
691 message: ActorMessage::EvaluationRes {
692 res: signed_response,
693 },
694 },
695 })
696 .await
697 {
698 error!(
699 msg_type = "NetworkRequest",
700 error = %e,
701 "Failed to send response to network"
702 );
703 return Err(emit_fail(ctx, e).await);
704 };
705
706 debug!(
707 msg_type = "NetworkRequest",
708 request_id = %info.request_id,
709 version = info.version,
710 sender = %sender,
711 "Network evaluation request processed successfully"
712 );
713
714 if self.stop {
715 ctx.stop(None).await;
716 }
717 }
718 }
719
720 Ok(())
721 }
722
723 async fn on_child_fault(
724 &mut self,
725 error: ActorError,
726 ctx: &mut ActorContext<Self>,
727 ) -> ChildAction {
728 error!(
729 governance_id = %self.governance_id,
730 gov_version = self.gov_version,
731 sn = self.sn,
732 node_key = %self.node_key,
733 error = %error,
734 "Child fault in evaluation worker"
735 );
736 emit_fail(ctx, error).await;
737 ChildAction::Stop
738 }
739}