1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use crate::{
7 evaluation::{
8 compiler::{CompilerResponse, error::CompilerError},
9 request::{EvalWorkerContext, EvaluateData},
10 response::{
11 EvalRunnerError, EvaluationResult, EvaluatorError,
12 EvaluatorResponse as EvalRes,
13 },
14 runner::types::EvaluateInfo,
15 },
16 governance::{data::GovernanceData, model::Schema},
17 helpers::network::{NetworkMessage, service::NetworkSender},
18 model::common::{
19 emit_fail,
20 node::{SignTypesNode, get_sign},
21 },
22 subject::RequestSubjectData,
23 system::ConfigHelper,
24};
25
26use crate::helpers::network::ActorMessage;
27
28use async_trait::async_trait;
29use ave_common::{
30 SchemaType, ValueWrapper,
31 identity::{
32 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
33 },
34};
35
36use ave_network::ComunicateInfo;
37use json_patch::diff;
38
39use ave_actors::{
40 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
41 NotPersistentActor,
42};
43
44use serde_json::Value;
45use tracing::{Span, debug, error, info_span, warn};
46
47use super::{
48 Evaluation, EvaluationMessage,
49 compiler::{TempCompiler, TempCompilerMessage},
50 request::EvaluationReq,
51 response::EvaluationRes,
52 runner::{Runner, RunnerMessage, RunnerResponse, types::RunnerResult},
53};
54
55#[derive(Clone, Debug)]
57pub struct EvalWorker {
58 pub node_key: PublicKey,
59 pub our_key: Arc<PublicKey>,
60 pub governance_id: DigestIdentifier,
61 pub gov_version: u64,
62 pub sn: u64,
63 pub context: EvalWorkerContext,
64 pub init_state: Option<ValueWrapper>,
65 pub hash: HashAlgorithm,
66 pub network: Arc<NetworkSender>,
67 pub stop: bool,
68}
69
70impl EvalWorker {
71 async fn execute_contract(
72 &self,
73 ctx: &mut ActorContext<Self>,
74 runner_data: EvaluateInfo,
75 is_owner: bool,
76 ) -> Result<RunnerResponse, ActorError> {
77 let runner_actor = ctx.create_child("runner", Runner).await?;
78
79 let response = runner_actor
80 .ask(RunnerMessage {
81 data: runner_data,
82 is_owner,
83 })
84 .await;
85
86 runner_actor.ask_stop().await?;
87
88 response
89 }
90
91 async fn compile_contracts(
92 &self,
93 ctx: &mut ActorContext<Self>,
94 ids: &[SchemaType],
95 schemas: BTreeMap<SchemaType, Schema>,
96 ) -> Result<Option<CompilerError>, ActorError> {
97 let contracts_path = if let Some(config) =
98 ctx.system().get_helper::<ConfigHelper>("config").await
99 {
100 config.contracts_path
101 } else {
102 return Err(ActorError::Helper {
103 name: "config".to_owned(),
104 reason: "Not found".to_owned(),
105 });
106 };
107
108 let compiler = ctx
109 .create_child("temp_compiler", TempCompiler::new(self.hash))
110 .await?;
111
112 for id in ids {
113 let Some(schema) = schemas.get(id) else {
114 return Err(ActorError::Functional { description: "There is a contract that requires compilation but its scheme could not be found".to_owned()});
115 };
116
117 let response = compiler
118 .ask(TempCompilerMessage::Compile {
119 contract_name: format!("{}_{}", self.governance_id, id),
120 contract: schema.contract.clone(),
121 initial_value: schema.initial_value.0.clone(),
122 contract_path: contracts_path
123 .join("contracts")
124 .join(format!("{}_temp_{}", self.governance_id, id)),
125 })
126 .await?;
127
128 if let CompilerResponse::Error(e) = response {
129 compiler.ask_stop().await?;
130 return Ok(Some(e));
131 }
132 }
133
134 compiler.ask_stop().await?;
135 Ok(None)
136 }
137
138 async fn check_governance(
139 &self,
140 gov_version: u64,
141 ) -> Result<bool, ActorError> {
142 match self.gov_version.cmp(&gov_version) {
143 std::cmp::Ordering::Less => {
144 warn!(
145 local_gov_version = self.gov_version,
146 request_gov_version = gov_version,
147 governance_id = %self.governance_id,
148 sender = %self.node_key,
149 "Received request with a higher governance version; ignoring request"
150 );
151 Err(ActorError::Functional {
152 description:
153 "Abort evaluation, request governance version is higher than local"
154 .to_owned(),
155 })
156 }
157 std::cmp::Ordering::Equal => {
158 Ok(false)
160 }
161 std::cmp::Ordering::Greater => Ok(true),
162 }
163 }
164
165 async fn evaluate(
166 &self,
167 ctx: &mut ActorContext<Self>,
168 evaluation_req: &EvaluationReq,
169 ) -> Result<RunnerResult, EvaluatorError> {
170 let runner_data = evaluation_req
171 .build_evaluate_info(&self.init_state, &self.context)?;
172
173 let response = self
175 .execute_contract(ctx, runner_data, evaluation_req.signer_is_owner)
176 .await
177 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
178
179 let (result, compilations) = match response {
180 RunnerResponse::Ok {
181 result,
182 compilations,
183 } => (result, compilations),
184 RunnerResponse::Error(runner_error) => {
185 return Err(EvaluatorError::from(runner_error));
186 }
187 };
188
189 if self.init_state.is_none() && !compilations.is_empty() {
190 let governance_data = GovernanceData::try_from(
191 result.final_state.clone(),
192 )
193 .map_err(|e| {
194 let e = format!(
195 "can not convert GovernanceData from properties: {}",
196 e
197 );
198 EvaluatorError::InternalError(e)
199 })?;
200
201 if let Some(error) = self
202 .compile_contracts(ctx, &compilations, governance_data.schemas)
203 .await
204 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?
205 {
206 return Err(EvaluatorError::from(error));
207 };
208 }
209
210 Ok(result)
211 }
212
213 fn generate_json_patch(
214 prev_state: &Value,
215 new_state: &Value,
216 ) -> Result<Value, EvaluatorError> {
217 let patch = diff(prev_state, new_state);
218 serde_json::to_value(patch).map_err(|e| {
219 EvaluatorError::InternalError(format!(
220 "Can not generate json patch {}",
221 e
222 ))
223 })
224 }
225
226 fn build_request_hashes(
227 &self,
228 evaluation_req: &Signed<EvaluationReq>,
229 ) -> Result<(DigestIdentifier, DigestIdentifier), EvaluatorError> {
230 let eval_req_hash = hash_borsh(&*self.hash.hasher(), evaluation_req)
231 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
232
233 let EvaluationReq {
234 event_request,
235 governance_id,
236 sn,
237 gov_version,
238 namespace,
239 schema_id,
240 signer,
241 ..
242 } = evaluation_req.content().clone();
243
244 let req_subject_data_hash = hash_borsh(
245 &*self.hash.hasher(),
246 &RequestSubjectData {
247 namespace,
248 schema_id,
249 subject_id: event_request.content().get_subject_id(),
250 governance_id,
251 sn,
252 gov_version,
253 signer,
254 },
255 )
256 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
257
258 Ok((eval_req_hash, req_subject_data_hash))
259 }
260
261 async fn sign_result(
262 &self,
263 ctx: &mut ActorContext<Self>,
264 result: EvaluationResult,
265 ) -> Result<EvaluationRes, EvaluatorError> {
266 let result_hash = hash_borsh(&*self.hash.hasher(), &result)
267 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
268
269 let result_hash_signature = get_sign(
270 ctx,
271 SignTypesNode::EvaluationSignature(result_hash.clone()),
272 )
273 .await
274 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
275
276 Ok(EvaluationRes::Response {
277 result,
278 result_hash,
279 result_hash_signature,
280 })
281 }
282
283 async fn build_response(
284 &self,
285 ctx: &mut ActorContext<Self>,
286 evaluation: RunnerResult,
287 evaluation_req: Signed<EvaluationReq>,
288 ) -> Result<EvaluationRes, EvaluatorError> {
289 let (eval_req_hash, req_subject_data_hash) =
290 self.build_request_hashes(&evaluation_req)?;
291
292 let (patch, properties_hash) = match &evaluation_req.content().data {
293 EvaluateData::GovFact { state, .. } => {
294 let properties_hash =
295 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
296 .map_err(|e| {
297 EvaluatorError::InternalError(e.to_string())
298 })?;
299
300 let state = state.to_value_wrapper();
301 let patch = Self::generate_json_patch(
302 &state.0,
303 &evaluation.final_state.0,
304 )?;
305
306 (ValueWrapper(patch), properties_hash)
307 }
308 EvaluateData::TrackerSchemasFact { state, .. } => {
309 let properties_hash =
310 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
311 .map_err(|e| {
312 EvaluatorError::InternalError(e.to_string())
313 })?;
314
315 let patch = Self::generate_json_patch(
316 &state.0,
317 &evaluation.final_state.0,
318 )?;
319
320 (ValueWrapper(patch), properties_hash)
321 }
322 EvaluateData::GovTransfer { state } => {
323 let state = state.to_value_wrapper();
324 let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
325 .map_err(|e| {
326 EvaluatorError::InternalError(e.to_string())
327 })?;
328
329 (evaluation.final_state, properties_hash)
330 }
331 EvaluateData::GovConfirm { state } => {
332 let properties_hash =
333 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
334 .map_err(|e| {
335 EvaluatorError::InternalError(e.to_string())
336 })?;
337
338 let state = state.to_value_wrapper();
339 let patch = Self::generate_json_patch(
340 &state.0,
341 &evaluation.final_state.0,
342 )?;
343
344 (ValueWrapper(patch), properties_hash)
345 }
346 EvaluateData::TrackerSchemasTransfer { state, .. } => {
347 let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
348 .map_err(|e| {
349 EvaluatorError::InternalError(e.to_string())
350 })?;
351
352 (evaluation.final_state, properties_hash)
353 }
354 };
355
356 let result = EvaluationResult::Ok {
357 response: EvalRes {
358 patch,
359 properties_hash,
360 appr_required: evaluation.approval_required,
361 },
362 eval_req_hash,
363 req_subject_data_hash,
364 };
365
366 self.sign_result(ctx, result).await
367 }
368
369 async fn build_response_error(
370 &self,
371 ctx: &mut ActorContext<Self>,
372 evaluator_error: EvaluatorError,
373 evaluation_req: Signed<EvaluationReq>,
374 ) -> Result<EvaluationRes, EvaluatorError> {
375 match &evaluator_error {
376 EvaluatorError::InvalidEventSignature
377 | EvaluatorError::InvalidEventRequest(..) => {
378 return Ok(EvaluationRes::Abort(evaluator_error.to_string()));
379 }
380 EvaluatorError::Runner(EvalRunnerError::ContractNotFound(..)) => {
381 return Ok(EvaluationRes::Reboot);
382 }
383 _ => {}
384 };
385
386 let (eval_req_hash, req_subject_data_hash) =
387 self.build_request_hashes(&evaluation_req)?;
388
389 let result = EvaluationResult::Error {
390 error: evaluator_error,
391 eval_req_hash,
392 req_subject_data_hash,
393 };
394
395 self.sign_result(ctx, result).await
396 }
397
398 async fn create_res(
399 &self,
400 ctx: &mut ActorContext<Self>,
401 reboot: bool,
402 evaluation_req: &Signed<EvaluationReq>,
403 ) -> Result<EvaluationRes, EvaluatorError> {
404 let evaluation = if reboot {
405 EvaluationRes::Reboot
406 } else {
407 match self.evaluate(ctx, evaluation_req.content()).await {
408 Ok(evaluation) => {
409 self.build_response(ctx, evaluation, evaluation_req.clone())
410 .await?
411 }
412 Err(error) => {
413 if let EvaluatorError::InternalError(..) = &error {
414 return Err(error);
415 } else {
416 self.build_response_error(
417 ctx,
418 error,
419 evaluation_req.clone(),
420 )
421 .await?
422 }
423 }
424 }
425 };
426
427 Ok(evaluation)
428 }
429
430 fn check_data(
431 &self,
432 evaluation_req: &Signed<EvaluationReq>,
433 ) -> Result<(), EvaluatorError> {
434 let event_is_for_gov = evaluation_req.content().data.is_gov_event();
435 match (self.init_state.is_none(), event_is_for_gov) {
436 (true, false) => return Err(EvaluatorError::InvalidEventRequest(
437 "Evaluator is for governance but eval request is for tracker"
438 .to_owned(),
439 )),
440 (false, true) => return Err(EvaluatorError::InvalidEventRequest(
441 "Evaluator is for tracker but eval request is for governance"
442 .to_owned(),
443 )),
444 _ => {}
445 };
446
447 if evaluation_req.content().governance_id != self.governance_id {
448 return Err(EvaluatorError::InvalidEventRequest(format!(
449 "Evaluator governance_id {} and eval request governance_id {} are different",
450 self.governance_id,
451 evaluation_req.content().governance_id
452 )));
453 }
454
455 if evaluation_req.verify().is_err() {
456 return Err(EvaluatorError::InvalidEventSignature);
457 }
458
459 if evaluation_req.content().event_request.verify().is_err() {
460 return Err(EvaluatorError::InvalidEventSignature);
461 }
462
463 if self.gov_version == evaluation_req.content().gov_version {
464 let signer = evaluation_req
465 .content()
466 .event_request
467 .signature()
468 .signer
469 .clone();
470
471 if evaluation_req
472 .content()
473 .event_request
474 .content()
475 .is_fact_event()
476 {
477 let Some((issuers, issuer_any)) = self.context.issuers() else {
478 return Err(EvaluatorError::InvalidEventRequest(
479 "Fact event evaluation context does not include issuers"
480 .to_owned(),
481 ));
482 };
483
484 if !issuer_any && !issuers.contains(&signer) {
485 return Err(EvaluatorError::InvalidEventRequest(
486 "In fact events, the signer has to be an issuer"
487 .to_owned(),
488 ));
489 }
490 } else if signer != self.node_key {
491 return Err(EvaluatorError::InvalidEventRequest(
492 "In non-fact events, the signer has to be the sender"
493 .to_owned(),
494 ));
495 }
496 }
497
498 Ok(())
499 }
500}
501
502#[derive(Debug, Clone)]
503pub enum EvalWorkerMessage {
504 Update {
505 gov_version: u64,
506 issuers: BTreeSet<PublicKey>,
507 issuer_any: bool,
508 },
509 LocalEvaluation {
510 evaluation_req: Signed<EvaluationReq>,
511 },
512 NetworkRequest {
513 evaluation_req: Signed<EvaluationReq>,
514 sender: PublicKey,
515 info: ComunicateInfo,
516 },
517}
518
519impl Message for EvalWorkerMessage {}
520
521#[async_trait]
522impl Actor for EvalWorker {
523 type Event = ();
524 type Message = EvalWorkerMessage;
525 type Response = ();
526
527 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
528 parent_span.map_or_else(
529 || info_span!("EvalWorker", id),
530 |parent_span| info_span!(parent: parent_span, "EvalWorker", id),
531 )
532 }
533}
534
535impl NotPersistentActor for EvalWorker {}
536
537#[async_trait]
538impl Handler<Self> for EvalWorker {
539 async fn handle_message(
540 &mut self,
541 _sender: ActorPath,
542 msg: EvalWorkerMessage,
543 ctx: &mut ActorContext<Self>,
544 ) -> Result<(), ActorError> {
545 match msg {
546 EvalWorkerMessage::Update {
547 gov_version,
548 issuers,
549 issuer_any,
550 } => {
551 self.gov_version = gov_version;
552 self.context = EvalWorkerContext::Governance {
553 issuers,
554 issuer_any,
555 };
556 }
557 EvalWorkerMessage::LocalEvaluation { evaluation_req } => {
558 let evaluation =
559 match self.create_res(ctx, false, &evaluation_req).await {
560 Ok(eval) => eval,
561 Err(e) => {
562 error!(
563 msg_type = "LocalEvaluation",
564 error = %e,
565 "Failed to create evaluation response"
566 );
567 return Err(emit_fail(
568 ctx,
569 ActorError::FunctionalCritical {
570 description: e.to_string(),
571 },
572 )
573 .await);
574 }
575 };
576
577 match ctx.get_parent::<Evaluation>().await {
578 Ok(evaluation_actor) => {
579 if let Err(e) = evaluation_actor
580 .tell(EvaluationMessage::Response {
581 evaluation_res: evaluation.clone(),
582 sender: (*self.our_key).clone(),
583 })
584 .await
585 {
586 error!(
587 msg_type = "LocalEvaluation",
588 error = %e,
589 "Failed to send response to evaluation actor"
590 );
591 return Err(emit_fail(ctx, e).await);
592 }
593
594 debug!(
595 msg_type = "LocalEvaluation",
596 "Local evaluation completed successfully"
597 );
598 }
599 Err(e) => {
600 error!(
601 msg_type = "LocalEvaluation",
602 path = %ctx.path().parent(),
603 "Evaluation actor not found"
604 );
605 return Err(e);
606 }
607 }
608
609 ctx.stop(None).await;
610 }
611 EvalWorkerMessage::NetworkRequest {
612 evaluation_req,
613 info,
614 sender,
615 } => {
616 if sender != evaluation_req.signature().signer
617 || sender != self.node_key
618 {
619 warn!(
620 msg_type = "NetworkRequest",
621 expected_sender = %self.node_key,
622 received_sender = %sender,
623 signer = %evaluation_req.signature().signer,
624 "Unexpected sender"
625 );
626 if self.stop {
627 ctx.stop(None).await;
628 }
629
630 return Ok(());
631 }
632
633 let reboot = match self
634 .check_governance(evaluation_req.content().gov_version)
635 .await
636 {
637 Ok(reboot) => reboot,
638 Err(e) => {
639 warn!(
640 msg_type = "NetworkRequest",
641 error = %e,
642 "Failed to check governance"
643 );
644 if let ActorError::Functional { .. } = e {
645 if self.stop {
646 ctx.stop(None).await;
647 }
648 return Err(e);
649 } else {
650 return Err(emit_fail(ctx, e).await);
651 }
652 }
653 };
654
655 let evaluation = if let Err(error) =
656 self.check_data(&evaluation_req)
657 {
658 match self
659 .build_response_error(
660 ctx,
661 error,
662 evaluation_req.clone(),
663 )
664 .await
665 {
666 Ok(eval) => eval,
667 Err(e) => {
668 error!(
669 msg_type = "NetworkRequest",
670 error = %e,
671 "Failed to build error response"
672 );
673 return Err(emit_fail(
674 ctx,
675 ActorError::FunctionalCritical {
676 description: e.to_string(),
677 },
678 )
679 .await);
680 }
681 }
682 } else {
683 match self.create_res(ctx, reboot, &evaluation_req).await {
684 Ok(eval) => eval,
685 Err(e) => {
686 error!(
687 msg_type = "NetworkRequest",
688 error = %e,
689 "Internal error during evaluation"
690 );
691 return Err(emit_fail(
692 ctx,
693 ActorError::FunctionalCritical {
694 description: e.to_string(),
695 },
696 )
697 .await);
698 }
699 }
700 };
701
702 let new_info = ComunicateInfo {
703 receiver: sender.clone(),
704 request_id: info.request_id.clone(),
705 version: info.version,
706 receiver_actor: format!(
707 "/user/request/{}/evaluation/{}",
708 evaluation_req
709 .content()
710 .event_request
711 .content()
712 .get_subject_id(),
713 self.our_key.clone()
714 ),
715 };
716
717 if let Err(e) = self
718 .network
719 .send_command(ave_network::CommandHelper::SendMessage {
720 message: NetworkMessage {
721 info: new_info,
722 message: ActorMessage::EvaluationRes {
723 res: evaluation,
724 },
725 },
726 })
727 .await
728 {
729 error!(
730 msg_type = "NetworkRequest",
731 error = %e,
732 "Failed to send response to network"
733 );
734 return Err(emit_fail(ctx, e).await);
735 };
736
737 debug!(
738 msg_type = "NetworkRequest",
739 request_id = %info.request_id,
740 version = info.version,
741 sender = %sender,
742 "Network evaluation request processed successfully"
743 );
744
745 if self.stop {
746 ctx.stop(None).await;
747 }
748 }
749 }
750
751 Ok(())
752 }
753
754 async fn on_child_fault(
755 &mut self,
756 error: ActorError,
757 ctx: &mut ActorContext<Self>,
758 ) -> ChildAction {
759 error!(
760 governance_id = %self.governance_id,
761 gov_version = self.gov_version,
762 sn = self.sn,
763 node_key = %self.node_key,
764 error = %error,
765 "Child fault in evaluation worker"
766 );
767 emit_fail(ctx, error).await;
768 ChildAction::Stop
769 }
770}