1use std::{collections::{BTreeMap, BTreeSet}, sync::Arc};
2
3use crate::{
4 evaluation::{
5 compiler::{CompilerResponse, error::CompilerError},
6 request::{EvalWorkerContext, EvaluateData},
7 response::{
8 EvalRunnerError, EvaluationResult, EvaluatorError,
9 EvaluatorResponse as EvalRes,
10 },
11 runner::types::EvaluateInfo,
12 },
13 governance::{data::GovernanceData, model::Schema},
14 helpers::network::{NetworkMessage, service::NetworkSender},
15 model::common::{
16 emit_fail,
17 node::{SignTypesNode, get_sign},
18 },
19 subject::RequestSubjectData,
20 system::ConfigHelper,
21};
22
23use crate::helpers::network::ActorMessage;
24
25use async_trait::async_trait;
26use ave_common::{
27 SchemaType, ValueWrapper,
28 identity::{
29 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
30 },
31};
32
33use json_patch::diff;
34use ave_network::ComunicateInfo;
35
36use ave_actors::{
37 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
38 NotPersistentActor,
39};
40
41use serde_json::Value;
42use tracing::{Span, debug, error, info_span, warn};
43
44use super::{
45 Evaluation, EvaluationMessage,
46 compiler::{TempCompiler, TempCompilerMessage},
47 request::EvaluationReq,
48 response::EvaluationRes,
49 runner::{Runner, RunnerMessage, RunnerResponse, types::RunnerResult},
50};
51
52#[derive(Clone, Debug)]
54pub struct EvalWorker {
55 pub node_key: PublicKey,
56 pub our_key: Arc<PublicKey>,
57 pub governance_id: DigestIdentifier,
58 pub gov_version: u64,
59 pub sn: u64,
60 pub context: EvalWorkerContext,
61 pub init_state: Option<ValueWrapper>,
62 pub hash: HashAlgorithm,
63 pub network: Arc<NetworkSender>,
64 pub stop: bool,
65}
66
67impl EvalWorker {
68 async fn execute_contract(
69 &self,
70 ctx: &mut ActorContext<Self>,
71 runner_data: EvaluateInfo,
72 is_owner: bool,
73 ) -> Result<RunnerResponse, ActorError> {
74 let runner_actor = ctx.create_child("runner", Runner).await?;
75
76 let response = runner_actor
77 .ask(RunnerMessage {
78 data: runner_data,
79 is_owner,
80 })
81 .await;
82
83 runner_actor.ask_stop().await?;
84
85 response
86 }
87
88 async fn compile_contracts(
89 &self,
90 ctx: &mut ActorContext<Self>,
91 ids: &[SchemaType],
92 schemas: BTreeMap<SchemaType, Schema>,
93 ) -> Result<Option<CompilerError>, ActorError> {
94 let contracts_path = if let Some(config) =
95 ctx.system().get_helper::<ConfigHelper>("config").await
96 {
97 config.contracts_path
98 } else {
99 return Err(ActorError::Helper {
100 name: "config".to_owned(),
101 reason: "Not found".to_owned(),
102 });
103 };
104
105 let compiler = ctx
106 .create_child("temp_compiler", TempCompiler::new(self.hash))
107 .await?;
108
109 for id in ids {
110 let Some(schema) = schemas.get(id) else {
111 return Err(ActorError::Functional { description: "There is a contract that requires compilation but its scheme could not be found".to_owned()});
112 };
113
114 let response = compiler
115 .ask(TempCompilerMessage::Compile {
116 contract_name: format!("{}_{}", self.governance_id, id),
117 contract: schema.contract.clone(),
118 initial_value: schema.initial_value.0.clone(),
119 contract_path: contracts_path
120 .join("contracts")
121 .join(format!("{}_temp_{}", self.governance_id, id)),
122 })
123 .await?;
124
125 if let CompilerResponse::Error(e) = response {
126 compiler.ask_stop().await?;
127 return Ok(Some(e));
128 }
129 }
130
131 compiler.ask_stop().await?;
132 Ok(None)
133 }
134
135 async fn check_governance(
136 &self,
137 gov_version: u64,
138 ) -> Result<bool, ActorError> {
139 match self.gov_version.cmp(&gov_version) {
140 std::cmp::Ordering::Less => {
141 warn!(
142 local_gov_version = self.gov_version,
143 request_gov_version = gov_version,
144 governance_id = %self.governance_id,
145 sender = %self.node_key,
146 "Received request with a higher governance version; ignoring request"
147 );
148 Err(ActorError::Functional {
149 description:
150 "Abort evaluation, request governance version is higher than local"
151 .to_owned(),
152 })
153 }
154 std::cmp::Ordering::Equal => {
155 Ok(false)
157 }
158 std::cmp::Ordering::Greater => Ok(true),
159 }
160 }
161
162 async fn evaluate(
163 &self,
164 ctx: &mut ActorContext<Self>,
165 evaluation_req: &EvaluationReq,
166 ) -> Result<RunnerResult, EvaluatorError> {
167 let runner_data = evaluation_req
168 .build_evaluate_info(&self.init_state, &self.context)?;
169
170 let response = self
172 .execute_contract(ctx, runner_data, evaluation_req.signer_is_owner)
173 .await
174 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
175
176 let (result, compilations) = match response {
177 RunnerResponse::Ok {
178 result,
179 compilations,
180 } => (result, compilations),
181 RunnerResponse::Error(runner_error) => {
182 return Err(EvaluatorError::from(runner_error));
183 }
184 };
185
186 if self.init_state.is_none() && !compilations.is_empty() {
187 let governance_data = GovernanceData::try_from(
188 result.final_state.clone(),
189 )
190 .map_err(|e| {
191 let e = format!(
192 "can not convert GovernanceData from properties: {}",
193 e
194 );
195 EvaluatorError::InternalError(e)
196 })?;
197
198 if let Some(error) = self
199 .compile_contracts(ctx, &compilations, governance_data.schemas)
200 .await
201 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?
202 {
203 return Err(EvaluatorError::from(error));
204 };
205 }
206
207 Ok(result)
208 }
209
210 fn generate_json_patch(
211 prev_state: &Value,
212 new_state: &Value,
213 ) -> Result<Value, EvaluatorError> {
214 let patch = diff(prev_state, new_state);
215 serde_json::to_value(patch).map_err(|e| {
216 EvaluatorError::InternalError(format!(
217 "Can not generate json patch {}",
218 e
219 ))
220 })
221 }
222
223 fn build_request_hashes(
224 &self,
225 evaluation_req: &Signed<EvaluationReq>,
226 ) -> Result<(DigestIdentifier, DigestIdentifier), EvaluatorError> {
227 let eval_req_hash = hash_borsh(&*self.hash.hasher(), evaluation_req)
228 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
229
230 let EvaluationReq {
231 event_request,
232 governance_id,
233 sn,
234 gov_version,
235 namespace,
236 schema_id,
237 signer,
238 ..
239 } = evaluation_req.content().clone();
240
241 let req_subject_data_hash = hash_borsh(
242 &*self.hash.hasher(),
243 &RequestSubjectData {
244 namespace,
245 schema_id,
246 subject_id: event_request.content().get_subject_id(),
247 governance_id,
248 sn,
249 gov_version,
250 signer,
251 },
252 )
253 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
254
255 Ok((eval_req_hash, req_subject_data_hash))
256 }
257
258 async fn sign_result(
259 &self,
260 ctx: &mut ActorContext<Self>,
261 result: EvaluationResult,
262 ) -> Result<EvaluationRes, EvaluatorError> {
263 let result_hash = hash_borsh(&*self.hash.hasher(), &result)
264 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
265
266 let result_hash_signature = get_sign(
267 ctx,
268 SignTypesNode::EvaluationSignature(result_hash.clone()),
269 )
270 .await
271 .map_err(|e| EvaluatorError::InternalError(e.to_string()))?;
272
273 Ok(EvaluationRes::Response {
274 result,
275 result_hash,
276 result_hash_signature,
277 })
278 }
279
280 async fn build_response(
281 &self,
282 ctx: &mut ActorContext<Self>,
283 evaluation: RunnerResult,
284 evaluation_req: Signed<EvaluationReq>,
285 ) -> Result<EvaluationRes, EvaluatorError> {
286 let (eval_req_hash, req_subject_data_hash) =
287 self.build_request_hashes(&evaluation_req)?;
288
289 let (patch, properties_hash) = match &evaluation_req.content().data {
290 EvaluateData::GovFact { state, .. } => {
291 let properties_hash =
292 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
293 .map_err(|e| {
294 EvaluatorError::InternalError(e.to_string())
295 })?;
296
297 let state = state.to_value_wrapper();
298 let patch = Self::generate_json_patch(
299 &state.0,
300 &evaluation.final_state.0,
301 )?;
302
303 (ValueWrapper(patch), properties_hash)
304 }
305 EvaluateData::TrackerSchemasFact { state, .. } => {
306 let properties_hash =
307 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
308 .map_err(|e| {
309 EvaluatorError::InternalError(e.to_string())
310 })?;
311
312 let patch = Self::generate_json_patch(
313 &state.0,
314 &evaluation.final_state.0,
315 )?;
316
317 (ValueWrapper(patch), properties_hash)
318 }
319 EvaluateData::GovTransfer { state } => {
320 let state = state.to_value_wrapper();
321 let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
322 .map_err(|e| {
323 EvaluatorError::InternalError(e.to_string())
324 })?;
325
326 (evaluation.final_state, properties_hash)
327 }
328 EvaluateData::GovConfirm { state } => {
329 let properties_hash =
330 hash_borsh(&*self.hash.hasher(), &evaluation.final_state)
331 .map_err(|e| {
332 EvaluatorError::InternalError(e.to_string())
333 })?;
334
335 let state = state.to_value_wrapper();
336 let patch = Self::generate_json_patch(
337 &state.0,
338 &evaluation.final_state.0,
339 )?;
340
341 (ValueWrapper(patch), properties_hash)
342 }
343 EvaluateData::TrackerSchemasTransfer { state, .. } => {
344 let properties_hash = hash_borsh(&*self.hash.hasher(), &state)
345 .map_err(|e| {
346 EvaluatorError::InternalError(e.to_string())
347 })?;
348
349 (evaluation.final_state, properties_hash)
350 }
351 };
352
353 let result = EvaluationResult::Ok {
354 response: EvalRes {
355 patch,
356 properties_hash,
357 appr_required: evaluation.approval_required,
358 },
359 eval_req_hash,
360 req_subject_data_hash,
361 };
362
363 self.sign_result(ctx, result).await
364 }
365
366 async fn build_response_error(
367 &self,
368 ctx: &mut ActorContext<Self>,
369 evaluator_error: EvaluatorError,
370 evaluation_req: Signed<EvaluationReq>,
371 ) -> Result<EvaluationRes, EvaluatorError> {
372 match &evaluator_error {
373 EvaluatorError::InvalidEventSignature
374 | EvaluatorError::InvalidEventRequest(..) => {
375 return Ok(EvaluationRes::Abort(evaluator_error.to_string()));
376 }
377 EvaluatorError::Runner(EvalRunnerError::ContractNotFound(..)) => {
378 return Ok(EvaluationRes::Reboot);
379 }
380 _ => {}
381 };
382
383 let (eval_req_hash, req_subject_data_hash) =
384 self.build_request_hashes(&evaluation_req)?;
385
386 let result = EvaluationResult::Error {
387 error: evaluator_error,
388 eval_req_hash,
389 req_subject_data_hash,
390 };
391
392 self.sign_result(ctx, result).await
393 }
394
395 async fn create_res(
396 &self,
397 ctx: &mut ActorContext<Self>,
398 reboot: bool,
399 evaluation_req: &Signed<EvaluationReq>,
400 ) -> Result<EvaluationRes, EvaluatorError> {
401 let evaluation = if reboot {
402 EvaluationRes::Reboot
403 } else {
404 match self.evaluate(ctx, evaluation_req.content()).await {
405 Ok(evaluation) => {
406 self.build_response(ctx, evaluation, evaluation_req.clone())
407 .await?
408 }
409 Err(error) => {
410 if let EvaluatorError::InternalError(..) = &error {
411 return Err(error);
412 } else {
413 self.build_response_error(
414 ctx,
415 error,
416 evaluation_req.clone(),
417 )
418 .await?
419 }
420 }
421 }
422 };
423
424 Ok(evaluation)
425 }
426
427 fn check_data(
428 &self,
429 evaluation_req: &Signed<EvaluationReq>,
430 ) -> Result<(), EvaluatorError> {
431 let event_is_for_gov = evaluation_req.content().data.is_gov_event();
432 match (self.init_state.is_none(), event_is_for_gov) {
433 (true, false) => return Err(EvaluatorError::InvalidEventRequest(
434 "Evaluator is for governance but eval request is for tracker"
435 .to_owned(),
436 )),
437 (false, true) => return Err(EvaluatorError::InvalidEventRequest(
438 "Evaluator is for tracker but eval request is for governance"
439 .to_owned(),
440 )),
441 _ => {}
442 };
443
444 if evaluation_req.content().governance_id != self.governance_id {
445 return Err(EvaluatorError::InvalidEventRequest(format!(
446 "Evaluator governance_id {} and eval request governance_id {} are different",
447 self.governance_id,
448 evaluation_req.content().governance_id
449 )));
450 }
451
452 if evaluation_req.verify().is_err() {
453 return Err(EvaluatorError::InvalidEventSignature);
454 }
455
456 if evaluation_req.content().event_request.verify().is_err() {
457 return Err(EvaluatorError::InvalidEventSignature);
458 }
459
460 if self.gov_version == evaluation_req.content().gov_version {
461 let signer = evaluation_req
462 .content()
463 .event_request
464 .signature()
465 .signer
466 .clone();
467
468 if evaluation_req
469 .content()
470 .event_request
471 .content()
472 .is_fact_event()
473 {
474 let Some((issuers, issuer_any)) = self.context.issuers()
475 else {
476 return Err(EvaluatorError::InvalidEventRequest(
477 "Fact event evaluation context does not include issuers"
478 .to_owned(),
479 ));
480 };
481
482 if !issuer_any && !issuers.contains(&signer) {
483 return Err(EvaluatorError::InvalidEventRequest(
484 "In fact events, the signer has to be an issuer"
485 .to_owned(),
486 ));
487 }
488 } else if signer != self.node_key {
489 return Err(EvaluatorError::InvalidEventRequest(
490 "In non-fact events, the signer has to be the sender"
491 .to_owned(),
492 ));
493 }
494 }
495
496 Ok(())
497 }
498}
499
500#[derive(Debug, Clone)]
501pub enum EvalWorkerMessage {
502 Update {
503 gov_version: u64,
504 issuers: BTreeSet<PublicKey>,
505 issuer_any: bool,
506 },
507 LocalEvaluation {
508 evaluation_req: Signed<EvaluationReq>,
509 },
510 NetworkRequest {
511 evaluation_req: Signed<EvaluationReq>,
512 sender: PublicKey,
513 info: ComunicateInfo,
514 },
515}
516
517impl Message for EvalWorkerMessage {}
518
519#[async_trait]
520impl Actor for EvalWorker {
521 type Event = ();
522 type Message = EvalWorkerMessage;
523 type Response = ();
524
525 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
526 parent_span.map_or_else(
527 || info_span!("EvalWorker", id),
528 |parent_span| info_span!(parent: parent_span, "EvalWorker", id),
529 )
530 }
531}
532
533impl NotPersistentActor for EvalWorker {}
534
535#[async_trait]
536impl Handler<Self> for EvalWorker {
537 async fn handle_message(
538 &mut self,
539 _sender: ActorPath,
540 msg: EvalWorkerMessage,
541 ctx: &mut ActorContext<Self>,
542 ) -> Result<(), ActorError> {
543 match msg {
544 EvalWorkerMessage::Update {
545 gov_version,
546 issuers,
547 issuer_any,
548 } => {
549 self.gov_version = gov_version;
550 self.context = EvalWorkerContext::Governance {
551 issuers,
552 issuer_any,
553 };
554 }
555 EvalWorkerMessage::LocalEvaluation { evaluation_req } => {
556 let evaluation =
557 match self.create_res(ctx, false, &evaluation_req).await {
558 Ok(eval) => eval,
559 Err(e) => {
560 error!(
561 msg_type = "LocalEvaluation",
562 error = %e,
563 "Failed to create evaluation response"
564 );
565 return Err(emit_fail(
566 ctx,
567 ActorError::FunctionalCritical {
568 description: e.to_string(),
569 },
570 )
571 .await);
572 }
573 };
574
575 match ctx.get_parent::<Evaluation>().await {
576 Ok(evaluation_actor) => {
577 if let Err(e) = evaluation_actor
578 .tell(EvaluationMessage::Response {
579 evaluation_res: evaluation.clone(),
580 sender: (*self.our_key).clone(),
581 })
582 .await
583 {
584 error!(
585 msg_type = "LocalEvaluation",
586 error = %e,
587 "Failed to send response to evaluation actor"
588 );
589 return Err(emit_fail(ctx, e).await);
590 }
591
592 debug!(
593 msg_type = "LocalEvaluation",
594 "Local evaluation completed successfully"
595 );
596 }
597 Err(e) => {
598 error!(
599 msg_type = "LocalEvaluation",
600 path = %ctx.path().parent(),
601 "Evaluation actor not found"
602 );
603 return Err(e);
604 }
605 }
606
607 ctx.stop(None).await;
608 }
609 EvalWorkerMessage::NetworkRequest {
610 evaluation_req,
611 info,
612 sender,
613 } => {
614 if sender != evaluation_req.signature().signer
615 || sender != self.node_key
616 {
617 warn!(
618 msg_type = "NetworkRequest",
619 expected_sender = %self.node_key,
620 received_sender = %sender,
621 signer = %evaluation_req.signature().signer,
622 "Unexpected sender"
623 );
624 if self.stop {
625 ctx.stop(None).await;
626 }
627
628 return Ok(());
629 }
630
631 let reboot = match self
632 .check_governance(evaluation_req.content().gov_version)
633 .await
634 {
635 Ok(reboot) => reboot,
636 Err(e) => {
637 warn!(
638 msg_type = "NetworkRequest",
639 error = %e,
640 "Failed to check governance"
641 );
642 if let ActorError::Functional { .. } = e {
643 if self.stop {
644 ctx.stop(None).await;
645 }
646 return Err(e);
647 } else {
648 return Err(emit_fail(ctx, e).await);
649 }
650 }
651 };
652
653 let evaluation = if let Err(error) =
654 self.check_data(&evaluation_req)
655 {
656 match self
657 .build_response_error(
658 ctx,
659 error,
660 evaluation_req.clone(),
661 )
662 .await
663 {
664 Ok(eval) => eval,
665 Err(e) => {
666 error!(
667 msg_type = "NetworkRequest",
668 error = %e,
669 "Failed to build error response"
670 );
671 return Err(emit_fail(
672 ctx,
673 ActorError::FunctionalCritical {
674 description: e.to_string(),
675 },
676 )
677 .await);
678 }
679 }
680 } else {
681 match self.create_res(ctx, reboot, &evaluation_req).await {
682 Ok(eval) => eval,
683 Err(e) => {
684 error!(
685 msg_type = "NetworkRequest",
686 error = %e,
687 "Internal error during evaluation"
688 );
689 return Err(emit_fail(
690 ctx,
691 ActorError::FunctionalCritical {
692 description: e.to_string(),
693 },
694 )
695 .await);
696 }
697 }
698 };
699
700 let new_info = ComunicateInfo {
701 receiver: sender.clone(),
702 request_id: info.request_id.clone(),
703 version: info.version,
704 receiver_actor: format!(
705 "/user/request/{}/evaluation/{}",
706 evaluation_req
707 .content()
708 .event_request
709 .content()
710 .get_subject_id(),
711 self.our_key.clone()
712 ),
713 };
714
715 if let Err(e) = self
716 .network
717 .send_command(ave_network::CommandHelper::SendMessage {
718 message: NetworkMessage {
719 info: new_info,
720 message: ActorMessage::EvaluationRes {
721 res: evaluation,
722 },
723 },
724 })
725 .await
726 {
727 error!(
728 msg_type = "NetworkRequest",
729 error = %e,
730 "Failed to send response to network"
731 );
732 return Err(emit_fail(ctx, e).await);
733 };
734
735 debug!(
736 msg_type = "NetworkRequest",
737 request_id = %info.request_id,
738 version = info.version,
739 sender = %sender,
740 "Network evaluation request processed successfully"
741 );
742
743 if self.stop {
744 ctx.stop(None).await;
745 }
746 }
747 }
748
749 Ok(())
750 }
751
752 async fn on_child_fault(
753 &mut self,
754 error: ActorError,
755 ctx: &mut ActorContext<Self>,
756 ) -> ChildAction {
757 error!(
758 governance_id = %self.governance_id,
759 gov_version = self.gov_version,
760 sn = self.sn,
761 node_key = %self.node_key,
762 error = %error,
763 "Child fault in evaluation worker"
764 );
765 emit_fail(ctx, error).await;
766 ChildAction::Stop
767 }
768}