1use std::{collections::BTreeMap, sync::Arc};
2
3use crate::{
4 evaluation::{
5 compiler::{CompilerResponse, error::CompilerError},
6 request::EvaluateData,
7 response::{
8 EvalRunnerError, EvaluatorError, EvaluatorResponse as EvalRes,
9 },
10 runner::types::EvaluateInfo,
11 },
12 governance::{data::GovernanceData, model::Schema},
13 helpers::network::{NetworkMessage, service::NetworkSender},
14 model::common::{
15 emit_fail,
16 node::{SignTypesNode, UpdateData, get_sign, update_ledger_network},
17 },
18 subject::RequestSubjectData,
19 system::ConfigHelper,
20};
21
22use crate::helpers::network::ActorMessage;
23
24use async_trait::async_trait;
25use ave_common::{
26 SchemaType, ValueWrapper,
27 identity::{
28 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
29 },
30};
31
32use json_patch::diff;
33use network::ComunicateInfo;
34
35use ave_actors::{
36 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
37 NotPersistentActor,
38};
39
40use serde_json::Value;
41use tracing::{Span, debug, error, info_span, warn};
42
43use super::{
44 Evaluation, EvaluationMessage,
45 compiler::{Compiler, CompilerMessage},
46 request::EvaluationReq,
47 response::EvaluationRes,
48 runner::{Runner, RunnerMessage, RunnerResponse, types::RunnerResult},
49};
50
51#[derive(Clone, Debug)]
53pub struct EvalWorker {
54 pub node_key: PublicKey,
55 pub our_key: Arc<PublicKey>,
56 pub governance_id: DigestIdentifier,
57 pub gov_version: u64,
58 pub sn: u64,
59 pub init_state: Option<ValueWrapper>,
60 pub hash: HashAlgorithm,
61 pub network: Arc<NetworkSender>,
62 pub stop: bool,
63}
64
65impl EvalWorker {
66 async fn execute_contract(
67 &self,
68 ctx: &mut ActorContext<Self>,
69 runner_data: EvaluateInfo,
70 is_owner: bool,
71 ) -> Result<RunnerResponse, ActorError> {
72 let runner_actor = ctx.create_child("runner", Runner).await?;
73
74 let response = runner_actor
75 .ask(RunnerMessage {
76 data: runner_data,
77 is_owner,
78 })
79 .await;
80
81 runner_actor.ask_stop().await?;
82
83 response
84 }
85
86 async fn compile_contracts(
87 &self,
88 ctx: &mut ActorContext<Self>,
89 ids: &[SchemaType],
90 schemas: BTreeMap<SchemaType, Schema>,
91 ) -> Result<Option<CompilerError>, ActorError> {
92 let contracts_path = if let Some(config) =
93 ctx.system().get_helper::<ConfigHelper>("config").await
94 {
95 config.contracts_path
96 } else {
97 return Err(ActorError::Helper {
98 name: "config".to_owned(),
99 reason: "Not found".to_owned(),
100 });
101 };
102
103 let compiler = ctx
104 .create_child("compiler", Compiler::new(self.hash))
105 .await?;
106
107 for id in ids {
108 let Some(schema) = schemas.get(id) else {
109 return Err(ActorError::Functional { description: "There is a contract that requires compilation but its scheme could not be found".to_owned()});
110 };
111
112 let response = compiler
113 .ask(CompilerMessage::TemporalCompile {
114 contract_name: format!("{}_{}", self.governance_id, id),
115 contract: schema.contract.clone(),
116 initial_value: schema.initial_value.0.clone(),
117 contract_path: contracts_path
118 .join("contracts")
119 .join(format!("{}_temp_{}", self.governance_id, id)),
120 })
121 .await?;
122
123 if let CompilerResponse::Error(e) = response {
124 compiler.ask_stop().await?;
125 return Ok(Some(e));
126 }
127 }
128
129 compiler.ask_stop().await?;
130 Ok(None)
131 }
132
133 async fn check_governance(
134 &self,
135 gov_version: u64,
136 ) -> Result<bool, ActorError> {
137 match gov_version.cmp(&self.gov_version) {
138 std::cmp::Ordering::Equal => {
139 }
141 std::cmp::Ordering::Greater => {
142 let data = UpdateData {
144 sn: self.sn,
145 gov_version: self.gov_version,
146 subject_id: self.governance_id.clone(),
147 other_node: self.node_key.clone(),
148 };
149 update_ledger_network(data, self.network.clone()).await?;
150 let e = ActorError::Functional {
151 description: "Abort evaluation, update is required"
152 .to_owned(),
153 };
154 return Err(e);
155 }
156 std::cmp::Ordering::Less => {
157 return Ok(true);
158 }
159 }
160
161 Ok(false)
162 }
163
164 async fn evaluate(
165 &self,
166 ctx: &mut ActorContext<Self>,
167 evaluation_req: &EvaluationReq,
168 ) -> Result<RunnerResult, EvaluatorError> {
169 let runner_data =
170 evaluation_req.build_evaluate_info(&self.init_state)?;
171
172 let response = self
174 .execute_contract(ctx, runner_data, evaluation_req.signer_is_owner)
175 .await
176 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
177
178 let (result, compilations) = match response {
179 RunnerResponse::Ok {
180 result,
181 compilations,
182 } => (result, compilations),
183 RunnerResponse::Error(runner_error) => {
184 return Err(EvaluatorError::from(runner_error));
185 }
186 };
187
188 if self.init_state.is_none() && !compilations.is_empty() {
189 let governance_data = GovernanceData::try_from(
190 result.final_state.clone(),
191 )
192 .map_err(|e| {
193 let e = format!(
194 "can not convert GovernanceData from properties: {}",
195 e
196 );
197 EvaluatorError::InternalError(e)
198 })?;
199
200 if let Some(error) = self
201 .compile_contracts(ctx, &compilations, governance_data.schemas)
202 .await
203 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?
204 {
205 return Err(EvaluatorError::from(error));
206 };
207 }
208
209 Ok(result)
210 }
211
212 fn generate_json_patch(
213 prev_state: &Value,
214 new_state: &Value,
215 ) -> Result<Value, EvaluatorError> {
216 let patch = diff(prev_state, new_state);
217 serde_json::to_value(patch).map_err(|e| {
218 EvaluatorError::InternalError(format!(
219 "Can not generate json patch {}",
220 e
221 ))
222 })
223 }
224
225 fn build_response(
226 &self,
227 evaluation: RunnerResult,
228 evaluation_req: Signed<EvaluationReq>,
229 ) -> Result<EvaluationRes, EvaluatorError> {
230 let eval_req_hash =
231 hash_borsh(&*self.hash.hasher(), &evaluation_req)
232 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
233
234 let EvaluationReq {
235 event_request,
236 governance_id,
237 sn,
238 gov_version,
239 namespace,
240 schema_id,
241 signer,
242 ..
243 } = evaluation_req.content().clone();
244
245 let req_subject_data_hash = hash_borsh(
246 &*self.hash.hasher(),
247 &RequestSubjectData {
248 namespace,
249 schema_id,
250 subject_id: event_request.content().get_subject_id(),
251 governance_id,
252 sn,
253 gov_version,
254 signer,
255 },
256 )
257 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
258
259 let (patch, properties_hash) = match &evaluation_req.content().data {
260 EvaluateData::GovFact { state, .. } => {
261 let properties_hash =
262 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
263 .map_err(|e| {
264 EvaluatorError::InternalError(e.to_string())
265 })?;
266
267 let state = state.to_value_wrapper();
268 let patch = Self::generate_json_patch(
269 &state.0,
270 &evaluation.final_state.0,
271 )?;
272
273 (ValueWrapper(patch), properties_hash)
274 }
275 EvaluateData::TrackerSchemasFact { state, .. } => {
276 let properties_hash =
277 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
278 .map_err(|e| {
279 EvaluatorError::InternalError(e.to_string())
280 })?;
281
282 let patch = Self::generate_json_patch(
283 &state.0,
284 &evaluation.final_state.0,
285 )?;
286
287 (ValueWrapper(patch), properties_hash)
288 }
289 EvaluateData::GovTransfer { state } => {
290 let state = state.to_value_wrapper();
291 let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
292 .map_err(|e| {
293 EvaluatorError::InternalError(e.to_string())
294 })?;
295
296 (evaluation.final_state, properties_hash)
297 }
298 EvaluateData::GovConfirm { state } => {
299 let properties_hash =
300 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
301 .map_err(|e| {
302 EvaluatorError::InternalError(e.to_string())
303 })?;
304
305 let state = state.to_value_wrapper();
306 let patch = Self::generate_json_patch(
307 &state.0,
308 &evaluation.final_state.0,
309 )?;
310
311 (ValueWrapper(patch), properties_hash)
312 }
313 EvaluateData::TrackerSchemasTransfer { state, .. } => {
314 let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
315 .map_err(|e| {
316 EvaluatorError::InternalError(e.to_string())
317 })?;
318
319 (evaluation.final_state, properties_hash)
320 }
321 };
322
323 Ok(EvaluationRes::Response {
324 response: EvalRes {
325 patch,
326 properties_hash,
327 appr_required: evaluation.approval_required,
328 },
329 eval_req_hash,
330 req_subject_data_hash,
331 })
332 }
333
334 fn build_response_error(
335 &self,
336 evaluator_error: EvaluatorError,
337 evaluation_req: Signed<EvaluationReq>,
338 ) -> Result<EvaluationRes, EvaluatorError> {
339 match &evaluator_error {
340 EvaluatorError::InvalidEventSignature
341 | EvaluatorError::InvalidEventRequest(..) => {
342 return Ok(EvaluationRes::Abort(evaluator_error.to_string()));
343 }
344 EvaluatorError::Runner(EvalRunnerError::ContractNotFound(..)) => {
345 return Ok(EvaluationRes::Reboot);
346 }
347 _ => {}
348 };
349
350 let eval_req_hash =
351 hash_borsh(&*self.hash.hasher(), &evaluation_req)
352 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
353
354 let EvaluationReq {
355 event_request,
356 governance_id,
357 sn,
358 gov_version,
359 namespace,
360 schema_id,
361 signer,
362 ..
363 } = evaluation_req.content().clone();
364
365 let req_subject_data_hash = hash_borsh(
366 &*self.hash.hasher(),
367 &RequestSubjectData {
368 namespace,
369 schema_id,
370 subject_id: event_request.content().get_subject_id(),
371 governance_id,
372 sn,
373 gov_version,
374 signer,
375 },
376 )
377 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
378
379 Ok(EvaluationRes::Error {
380 error: evaluator_error,
381 eval_req_hash,
382 req_subject_data_hash,
383 })
384 }
385
386 async fn create_res(
387 &self,
388 ctx: &mut ActorContext<Self>,
389 reboot: bool,
390 evaluation_req: &Signed<EvaluationReq>,
391 ) -> Result<EvaluationRes, EvaluatorError> {
392 let evaluation = if reboot {
393 EvaluationRes::Reboot
394 } else {
395 match self.evaluate(ctx, evaluation_req.content()).await {
396 Ok(evaluation) => {
397 self.build_response(evaluation, evaluation_req.clone())?
398 }
399 Err(error) => {
400 if let EvaluatorError::InternalError(..) = &error {
401 return Err(error);
402 } else {
403 self.build_response_error(
404 error,
405 evaluation_req.clone(),
406 )?
407 }
408 }
409 }
410 };
411
412 Ok(evaluation)
413 }
414
415 fn check_data(
416 &self,
417 evaluation_req: &Signed<EvaluationReq>,
418 ) -> Result<(), EvaluatorError> {
419 let event_is_for_gov = evaluation_req.content().data.is_gov_event();
420 match (self.init_state.is_none(), event_is_for_gov) {
421 (true, false) => return Err(EvaluatorError::InvalidEventRequest(
422 "Evaluator is for governance but eval request is for tracker"
423 .to_owned(),
424 )),
425 (false, true) => return Err(EvaluatorError::InvalidEventRequest(
426 "Evaluator is for tracker but eval request is for governance"
427 .to_owned(),
428 )),
429 _ => {}
430 };
431
432 if evaluation_req.content().governance_id != self.governance_id {
433 return Err(EvaluatorError::InvalidEventRequest(format!(
434 "Evaluator governance_id {} and eval request governance_id {} are different",
435 self.governance_id,
436 evaluation_req.content().governance_id
437 )));
438 }
439
440 if evaluation_req.verify().is_err() {
441 return Err(EvaluatorError::InvalidEventSignature);
442 }
443
444 if evaluation_req.content().event_request.verify().is_err() {
445 return Err(EvaluatorError::InvalidEventSignature);
446 }
447
448 Ok(())
449 }
450}
451
452#[derive(Debug, Clone)]
453pub enum EvalWorkerMessage {
454 UpdateGovVersion {
455 gov_version: u64,
456 },
457 LocalEvaluation {
458 evaluation_req: Signed<EvaluationReq>,
459 },
460 NetworkRequest {
461 evaluation_req: Signed<EvaluationReq>,
462 sender: PublicKey,
463 info: ComunicateInfo,
464 },
465}
466
467impl Message for EvalWorkerMessage {}
468
469#[async_trait]
470impl Actor for EvalWorker {
471 type Event = ();
472 type Message = EvalWorkerMessage;
473 type Response = ();
474
475 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
476 parent_span.map_or_else(
477 || info_span!("EvalWorker", id),
478 |parent_span| info_span!(parent: parent_span, "EvalWorker", id),
479 )
480 }
481}
482
483impl NotPersistentActor for EvalWorker {}
484
485#[async_trait]
486impl Handler<Self> for EvalWorker {
487 async fn handle_message(
488 &mut self,
489 _sender: ActorPath,
490 msg: EvalWorkerMessage,
491 ctx: &mut ActorContext<Self>,
492 ) -> Result<(), ActorError> {
493 match msg {
494 EvalWorkerMessage::UpdateGovVersion { gov_version } => {
495 self.gov_version = gov_version;
496 }
497 EvalWorkerMessage::LocalEvaluation { evaluation_req } => {
498 let evaluation =
499 match self.create_res(ctx, false, &evaluation_req).await {
500 Ok(eval) => eval,
501 Err(e) => {
502 error!(
503 msg_type = "LocalEvaluation",
504 error = %e,
505 "Failed to create evaluation response"
506 );
507 return Err(emit_fail(
508 ctx,
509 ActorError::FunctionalCritical {
510 description: e.to_string(),
511 },
512 )
513 .await);
514 }
515 };
516
517 let signature = match get_sign(
518 ctx,
519 SignTypesNode::EvaluationRes(evaluation.clone()),
520 )
521 .await
522 {
523 Ok(signature) => signature,
524 Err(e) => {
525 error!(
526 msg_type = "LocalEvaluation",
527 error = %e,
528 "Failed to sign evaluator response"
529 );
530 return Err(emit_fail(ctx, e).await);
531 }
532 };
533
534 match ctx.get_parent::<Evaluation>().await {
535 Ok(evaluation_actor) => {
536 if let Err(e) = evaluation_actor
537 .tell(EvaluationMessage::Response {
538 evaluation_res: evaluation.clone(),
539 sender: (*self.our_key).clone(),
540 signature: Some(signature),
541 })
542 .await
543 {
544 error!(
545 msg_type = "LocalEvaluation",
546 error = %e,
547 "Failed to send response to evaluation actor"
548 );
549 return Err(emit_fail(ctx, e).await);
550 }
551
552 debug!(
553 msg_type = "LocalEvaluation",
554 "Local evaluation completed successfully"
555 );
556 }
557 Err(e) => {
558 error!(
559 msg_type = "LocalEvaluation",
560 path = %ctx.path().parent(),
561 "Evaluation actor not found"
562 );
563 return Err(e);
564 }
565 }
566
567 ctx.stop(None).await;
568 }
569 EvalWorkerMessage::NetworkRequest {
570 evaluation_req,
571 info,
572 sender,
573 } => {
574 if sender != evaluation_req.signature().signer
575 || sender != self.node_key
576 {
577 warn!(
578 msg_type = "NetworkRequest",
579 expected_sender = %self.node_key,
580 received_sender = %sender,
581 signer = %evaluation_req.signature().signer,
582 "Unexpected sender"
583 );
584 if self.stop {
585 ctx.stop(None).await;
586 }
587
588 return Ok(());
589 }
590
591 let reboot = match self
593 .check_governance(evaluation_req.content().gov_version)
594 .await
595 {
596 Ok(reboot) => reboot,
597 Err(e) => {
598 warn!(
599 msg_type = "NetworkRequest",
600 error = %e,
601 "Failed to check governance"
602 );
603 if let ActorError::Functional { .. } = e {
604 if self.stop {
605 ctx.stop(None).await;
606 }
607 return Err(e);
608 } else {
609 return Err(emit_fail(ctx, e).await);
610 }
611 }
612 };
613
614 let evaluation = if let Err(error) =
615 self.check_data(&evaluation_req)
616 {
617 match self
618 .build_response_error(error, evaluation_req.clone())
619 {
620 Ok(eval) => eval,
621 Err(e) => {
622 error!(
623 msg_type = "NetworkRequest",
624 error = %e,
625 "Failed to build error response"
626 );
627 return Err(emit_fail(
628 ctx,
629 ActorError::FunctionalCritical {
630 description: e.to_string(),
631 },
632 )
633 .await);
634 }
635 }
636 } else {
637 match self.create_res(ctx, reboot, &evaluation_req).await {
638 Ok(eval) => eval,
639 Err(e) => {
640 error!(
641 msg_type = "NetworkRequest",
642 error = %e,
643 "Internal error during evaluation"
644 );
645 return Err(emit_fail(
646 ctx,
647 ActorError::FunctionalCritical {
648 description: e.to_string(),
649 },
650 )
651 .await);
652 }
653 }
654 };
655
656 let signature = match get_sign(
657 ctx,
658 SignTypesNode::EvaluationRes(evaluation.clone()),
659 )
660 .await
661 {
662 Ok(signature) => signature,
663 Err(e) => {
664 error!(
665 msg_type = "NetworkRequest",
666 error = %e,
667 "Failed to sign response"
668 );
669 return Err(emit_fail(ctx, e).await);
670 }
671 };
672
673 let new_info = ComunicateInfo {
674 receiver: sender.clone(),
675 request_id: info.request_id.clone(),
676 version: info.version,
677 receiver_actor: format!(
678 "/user/request/{}/evaluation/{}",
679 evaluation_req
680 .content()
681 .event_request
682 .content()
683 .get_subject_id(),
684 self.our_key.clone()
685 ),
686 };
687
688 let signed_response: Signed<EvaluationRes> =
689 Signed::from_parts(evaluation, signature);
690 if let Err(e) = self
691 .network
692 .send_command(network::CommandHelper::SendMessage {
693 message: NetworkMessage {
694 info: new_info,
695 message: ActorMessage::EvaluationRes {
696 res: signed_response,
697 },
698 },
699 })
700 .await
701 {
702 error!(
703 msg_type = "NetworkRequest",
704 error = %e,
705 "Failed to send response to network"
706 );
707 return Err(emit_fail(ctx, e).await);
708 };
709
710 debug!(
711 msg_type = "NetworkRequest",
712 request_id = %info.request_id,
713 version = info.version,
714 sender = %sender,
715 "Network evaluation request processed successfully"
716 );
717
718 if self.stop {
719 ctx.stop(None).await;
720 }
721 }
722 }
723
724 Ok(())
725 }
726
727 async fn on_child_fault(
728 &mut self,
729 error: ActorError,
730 ctx: &mut ActorContext<Self>,
731 ) -> ChildAction {
732 error!(
733 governance_id = %self.governance_id,
734 gov_version = self.gov_version,
735 sn = self.sn,
736 node_key = %self.node_key,
737 error = %error,
738 "Child fault in evaluation worker"
739 );
740 emit_fail(ctx, error).await;
741 ChildAction::Stop
742 }
743}