1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
9 NotPersistentActor,
10};
11use ave_common::{
12 Namespace, SchemaType, ValueWrapper,
13 identity::{DigestIdentifier, HashAlgorithm, PublicKey, Signed},
14 request::EventRequest,
15};
16use ave_network::ComunicateInfo;
17use tracing::{Span, debug, error, info_span, warn};
18
19use crate::{
20 evaluation::worker::{EvalWorker, EvalWorkerMessage},
21 helpers::network::service::NetworkSender,
22 metrics::try_core_metrics,
23 model::common::emit_fail,
24};
25
26use super::request::{EvalWorkerContext, EvaluationReq};
27
28#[derive(Clone, Debug)]
29pub struct EvaluationSchema {
30 pub our_key: Arc<PublicKey>,
31 pub governance_id: DigestIdentifier,
32 pub gov_version: u64,
33 pub schema_id: SchemaType,
34 pub sn: u64,
35 pub members: BTreeSet<PublicKey>,
36 pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
37 pub issuers: BTreeMap<PublicKey, BTreeSet<Namespace>>,
38 pub issuer_any: bool,
39 pub schema_viewpoints: BTreeSet<String>,
40 pub init_state: ValueWrapper,
41 pub hash: HashAlgorithm,
42 pub network: Arc<NetworkSender>,
43}
44
45#[derive(Debug, Clone)]
46pub enum EvaluationSchemaMessage {
47 NetworkRequest {
48 evaluation_req: Box<Signed<EvaluationReq>>,
49 info: ComunicateInfo,
50 sender: PublicKey,
51 },
52 Update {
53 members: BTreeSet<PublicKey>,
54 creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
55 issuers: BTreeMap<PublicKey, BTreeSet<Namespace>>,
56 issuer_any: bool,
57 schema_viewpoints: BTreeSet<String>,
58 sn: u64,
59 gov_version: u64,
60 init_state: ValueWrapper,
61 },
62}
63
64impl Message for EvaluationSchemaMessage {}
65
// Empty marker impl: opts `EvaluationSchema` into `NotPersistentActor`
// without overriding any item of that trait.
impl NotPersistentActor for EvaluationSchema {}
67
68impl EvaluationSchema {
69 fn context_for_request(
70 &self,
71 evaluation_req: &EvaluationReq,
72 ) -> EvalWorkerContext {
73 match evaluation_req.event_request.content() {
74 EventRequest::Fact(_) => EvalWorkerContext::TrackerFact {
75 issuers: self
76 .issuers
77 .iter()
78 .filter(|(_, namespaces)| {
79 namespaces.iter().any(|issuer_namespace| {
80 issuer_namespace.is_ancestor_or_equal_of(
81 &evaluation_req.namespace,
82 )
83 })
84 })
85 .map(|(issuer, _)| issuer.clone())
86 .collect(),
87 issuer_any: self.issuer_any,
88 schema_viewpoints: self.schema_viewpoints.clone(),
89 },
90 EventRequest::Transfer(_) => EvalWorkerContext::TrackerTransfer {
91 members: self.members.clone(),
92 creators: self.creators.clone(),
93 },
94 _ => EvalWorkerContext::Empty,
95 }
96 }
97}
98
99#[async_trait]
100impl Actor for EvaluationSchema {
101 type Event = ();
102 type Message = EvaluationSchemaMessage;
103 type Response = ();
104
105 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
106 parent_span.map_or_else(
107 || info_span!("EvaluationSchema", id),
108 |parent_span| info_span!(parent: parent_span, "EvaluationSchema", id),
109 )
110 }
111}
112
113#[async_trait]
114impl Handler<Self> for EvaluationSchema {
115 async fn handle_message(
116 &mut self,
117 _sender: ActorPath,
118 msg: EvaluationSchemaMessage,
119 ctx: &mut ActorContext<Self>,
120 ) -> Result<(), ActorError> {
121 match msg {
122 EvaluationSchemaMessage::NetworkRequest {
123 evaluation_req,
124 info,
125 sender,
126 } => {
127 let observe = |result: &'static str| {
128 if let Some(metrics) = try_core_metrics() {
129 metrics
130 .observe_schema_event("evaluation_schema", result);
131 }
132 };
133 if sender != evaluation_req.signature().signer {
134 observe("rejected");
135 warn!(
136 msg_type = "NetworkRequest",
137 sender = %sender,
138 signer = %evaluation_req.signature().signer,
139 "Signer and sender are not the same"
140 );
141 return Ok(());
142 }
143
144 if self.governance_id != evaluation_req.content().governance_id
145 {
146 observe("rejected");
147 warn!(
148 msg_type = "NetworkRequest",
149 expected_governance_id = %self.governance_id,
150 received_governance_id = %evaluation_req.content().governance_id,
151 "Invalid governance_id"
152 );
153 return Ok(());
154 }
155
156 if self.schema_id != evaluation_req.content().schema_id {
157 observe("rejected");
158 warn!(
159 msg_type = "NetworkRequest",
160 expected_schema_id = ?self.schema_id,
161 received_schema_id = ?evaluation_req.content().schema_id,
162 "Invalid schema_id"
163 );
164 return Ok(());
165 }
166
167 if let Some(ns) = self.creators.get(&sender) {
168 if !ns.contains(&evaluation_req.content().namespace) {
169 observe("rejected");
170 warn!(
171 msg_type = "NetworkRequest",
172 sender = %sender,
173 namespace = ?evaluation_req.content().namespace,
174 "Invalid sender namespace"
175 );
176 return Ok(());
177 }
178 } else {
179 observe("rejected");
180 warn!(
181 msg_type = "NetworkRequest",
182 sender = %sender,
183 "Sender is not a creator"
184 );
185 return Ok(());
186 }
187
188 if self.gov_version < evaluation_req.content().gov_version {
189 observe("rejected");
190 warn!(
191 msg_type = "NetworkRequest",
192 local_gov_version = self.gov_version,
193 request_gov_version = evaluation_req.content().gov_version,
194 governance_id = %self.governance_id,
195 sender = %sender,
196 "Ignoring request with newer governance version; service nodes must update governance through resilience protocols"
197 );
198 return Ok(());
199 }
200
201 let child = ctx
202 .create_child(
203 &format!("{}", evaluation_req.signature().signer),
204 EvalWorker {
205 node_key: sender.clone(),
206 our_key: self.our_key.clone(),
207 init_state: Some(self.init_state.clone()),
208 governance_id: self.governance_id.clone(),
209 gov_version: self.gov_version,
210 sn: self.sn,
211 context: self
212 .context_for_request(evaluation_req.content()),
213 hash: self.hash,
214 network: self.network.clone(),
215 stop: true,
216 },
217 )
218 .await;
219
220 let evaluator_actor = match child {
221 Ok(child) => child,
222 Err(e) => {
223 if let ActorError::Exists { .. } = e {
224 warn!(
225 msg_type = "NetworkRequest",
226 error = %e,
227 "Evaluator actor already exists"
228 );
229 observe("rejected");
230 return Ok(());
231 } else {
232 error!(
233 msg_type = "NetworkRequest",
234 error = %e,
235 "Failed to create evaluator actor"
236 );
237 return Err(emit_fail(ctx, e).await);
238 }
239 }
240 };
241
242 if let Err(e) = evaluator_actor
243 .tell(EvalWorkerMessage::NetworkRequest {
244 evaluation_req: *evaluation_req,
245 info,
246 sender: sender.clone(),
247 })
248 .await
249 {
250 warn!(
251 msg_type = "NetworkRequest",
252 error = %e,
253 "Failed to send request to evaluator"
254 );
255 } else {
256 observe("delegated");
257 debug!(
258 msg_type = "NetworkRequest",
259 sender = %sender,
260 "Evaluation request delegated to worker"
261 );
262 }
263 }
264 EvaluationSchemaMessage::Update {
265 members,
266 creators,
267 issuers,
268 issuer_any,
269 schema_viewpoints,
270 sn,
271 gov_version,
272 init_state,
273 } => {
274 if let Some(metrics) = try_core_metrics() {
275 metrics.observe_schema_event("evaluation_schema", "update");
276 }
277 self.members = members;
278 self.creators = creators;
279 self.issuers = issuers;
280 self.issuer_any = issuer_any;
281 self.schema_viewpoints = schema_viewpoints;
282 self.gov_version = gov_version;
283 self.sn = sn;
284 self.init_state = init_state;
285
286 debug!(
287 msg_type = "Update",
288 sn = self.sn,
289 gov_version = self.gov_version,
290 "Schema updated successfully"
291 );
292 }
293 };
294 Ok(())
295 }
296
297 async fn on_child_fault(
298 &mut self,
299 error: ActorError,
300 ctx: &mut ActorContext<Self>,
301 ) -> ave_actors::ChildAction {
302 if let Some(metrics) = try_core_metrics() {
303 metrics.observe_schema_event("evaluation_schema", "child_fault");
304 }
305 error!(
306 governance_id = %self.governance_id,
307 schema_id = ?self.schema_id,
308 gov_version = self.gov_version,
309 sn = self.sn,
310 error = %error,
311 "Child fault in evaluation schema actor"
312 );
313 emit_fail(ctx, error).await;
314 ave_actors::ChildAction::Stop
315 }
316}