1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
9 NotPersistentActor,
10};
11use ave_common::{
12 Namespace, SchemaType, ValueWrapper,
13 identity::{DigestIdentifier, HashAlgorithm, PublicKey, Signed},
14};
15use network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19 evaluation::worker::{EvalWorker, EvalWorkerMessage},
20 helpers::network::service::NetworkSender,
21 metrics::try_core_metrics,
22 model::common::{emit_fail, node::try_to_update},
23};
24
25use super::request::EvaluationReq;
26
27#[derive(Clone, Debug)]
28pub struct EvaluationSchema {
29 pub our_key: Arc<PublicKey>,
30 pub governance_id: DigestIdentifier,
31 pub gov_version: u64,
32 pub schema_id: SchemaType,
33 pub sn: u64,
34 pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
35 pub init_state: ValueWrapper,
36 pub hash: HashAlgorithm,
37 pub network: Arc<NetworkSender>,
38}
39
40#[derive(Debug, Clone)]
41pub enum EvaluationSchemaMessage {
42 NetworkRequest {
43 evaluation_req: Box<Signed<EvaluationReq>>,
44 info: ComunicateInfo,
45 sender: PublicKey,
46 },
47 Update {
48 creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
49 sn: u64,
50 gov_version: u64,
51 init_state: ValueWrapper,
52 },
53}
54
55impl Message for EvaluationSchemaMessage {}
56
57impl NotPersistentActor for EvaluationSchema {}
58
59#[async_trait]
60impl Actor for EvaluationSchema {
61 type Event = ();
62 type Message = EvaluationSchemaMessage;
63 type Response = ();
64
65 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
66 parent_span.map_or_else(
67 || info_span!("EvaluationSchema", id),
68 |parent_span| info_span!(parent: parent_span, "EvaluationSchema", id),
69 )
70 }
71}
72
73#[async_trait]
74impl Handler<Self> for EvaluationSchema {
75 async fn handle_message(
76 &mut self,
77 _sender: ActorPath,
78 msg: EvaluationSchemaMessage,
79 ctx: &mut ActorContext<Self>,
80 ) -> Result<(), ActorError> {
81 match msg {
82 EvaluationSchemaMessage::NetworkRequest {
83 evaluation_req,
84 info,
85 sender,
86 } => {
87 let observe = |result: &'static str| {
88 if let Some(metrics) = try_core_metrics() {
89 metrics
90 .observe_schema_event("evaluation_schema", result);
91 }
92 };
93 if sender != evaluation_req.signature().signer {
94 observe("rejected");
95 warn!(
96 msg_type = "NetworkRequest",
97 sender = %sender,
98 signer = %evaluation_req.signature().signer,
99 "Signer and sender are not the same"
100 );
101 return Ok(());
102 }
103
104 if self.governance_id != evaluation_req.content().governance_id
105 {
106 observe("rejected");
107 warn!(
108 msg_type = "NetworkRequest",
109 expected_governance_id = %self.governance_id,
110 received_governance_id = %evaluation_req.content().governance_id,
111 "Invalid governance_id"
112 );
113 return Ok(());
114 }
115
116 if self.schema_id != evaluation_req.content().schema_id {
117 observe("rejected");
118 warn!(
119 msg_type = "NetworkRequest",
120 expected_schema_id = ?self.schema_id,
121 received_schema_id = ?evaluation_req.content().schema_id,
122 "Invalid schema_id"
123 );
124 return Ok(());
125 }
126
127 if let Some(ns) = self.creators.get(&sender) {
128 if !ns.contains(&evaluation_req.content().namespace) {
129 observe("rejected");
130 warn!(
131 msg_type = "NetworkRequest",
132 sender = %sender,
133 namespace = ?evaluation_req.content().namespace,
134 "Invalid sender namespace"
135 );
136 return Ok(());
137 }
138 } else {
139 observe("rejected");
140 warn!(
141 msg_type = "NetworkRequest",
142 sender = %sender,
143 "Sender is not a creator"
144 );
145 return Ok(());
146 }
147
148 if self.gov_version < evaluation_req.content().gov_version
149 && let Err(e) =
150 try_to_update(ctx, self.governance_id.clone(), None)
151 .await
152 {
153 error!(
154 msg_type = "NetworkRequest",
155 error = %e,
156 "Failed to update governance"
157 );
158 return Err(emit_fail(ctx, e).await);
159 }
160
161 let child = ctx
162 .create_child(
163 &format!("{}", evaluation_req.signature().signer),
164 EvalWorker {
165 node_key: sender.clone(),
166 our_key: self.our_key.clone(),
167 init_state: Some(self.init_state.clone()),
168 governance_id: self.governance_id.clone(),
169 gov_version: self.gov_version,
170 sn: self.sn,
171 hash: self.hash,
172 network: self.network.clone(),
173 stop: true,
174 },
175 )
176 .await;
177
178 let evaluator_actor = match child {
179 Ok(child) => child,
180 Err(e) => {
181 if let ActorError::Exists { .. } = e {
182 warn!(
183 msg_type = "NetworkRequest",
184 error = %e,
185 "Evaluator actor already exists"
186 );
187 observe("rejected");
188 return Ok(());
189 } else {
190 error!(
191 msg_type = "NetworkRequest",
192 error = %e,
193 "Failed to create evaluator actor"
194 );
195 return Err(emit_fail(ctx, e).await);
196 }
197 }
198 };
199
200 if let Err(e) = evaluator_actor
201 .tell(EvalWorkerMessage::NetworkRequest {
202 evaluation_req: *evaluation_req,
203 info,
204 sender: sender.clone(),
205 })
206 .await
207 {
208 warn!(
209 msg_type = "NetworkRequest",
210 error = %e,
211 "Failed to send request to evaluator"
212 );
213 } else {
214 observe("delegated");
215 debug!(
216 msg_type = "NetworkRequest",
217 sender = %sender,
218 "Evaluation request delegated to worker"
219 );
220 }
221 }
222 EvaluationSchemaMessage::Update {
223 creators,
224 sn,
225 gov_version,
226 init_state,
227 } => {
228 if let Some(metrics) = try_core_metrics() {
229 metrics.observe_schema_event("evaluation_schema", "update");
230 }
231 self.creators = creators;
232 self.gov_version = gov_version;
233 self.sn = sn;
234 self.init_state = init_state;
235
236 debug!(
237 msg_type = "Update",
238 sn = self.sn,
239 gov_version = self.gov_version,
240 "Schema updated successfully"
241 );
242 }
243 };
244 Ok(())
245 }
246
247 async fn on_child_fault(
248 &mut self,
249 error: ActorError,
250 ctx: &mut ActorContext<Self>,
251 ) -> ave_actors::ChildAction {
252 if let Some(metrics) = try_core_metrics() {
253 metrics.observe_schema_event("evaluation_schema", "child_fault");
254 }
255 error!(
256 governance_id = %self.governance_id,
257 schema_id = ?self.schema_id,
258 gov_version = self.gov_version,
259 sn = self.sn,
260 error = %error,
261 "Child fault in evaluation schema actor"
262 );
263 emit_fail(ctx, error).await;
264 ave_actors::ChildAction::Stop
265 }
266}