1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
9 NotPersistentActor,
10};
11use ave_common::{
12 Namespace, SchemaType, ValueWrapper,
13 identity::{DigestIdentifier, HashAlgorithm, PublicKey, Signed},
14 request::EventRequest,
15};
16use ave_network::ComunicateInfo;
17use tracing::{Span, debug, error, info_span, warn};
18
19use crate::{
20 evaluation::worker::{EvalWorker, EvalWorkerMessage},
21 helpers::network::service::NetworkSender,
22 metrics::try_core_metrics,
23 model::common::emit_fail,
24};
25
26use super::request::{EvalWorkerContext, EvaluationReq};
27
28#[derive(Clone, Debug)]
29pub struct EvaluationSchema {
30 pub our_key: Arc<PublicKey>,
31 pub governance_id: DigestIdentifier,
32 pub gov_version: u64,
33 pub schema_id: SchemaType,
34 pub sn: u64,
35 pub members: BTreeSet<PublicKey>,
36 pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
37 pub issuers: BTreeMap<PublicKey, BTreeSet<Namespace>>,
38 pub issuer_any: bool,
39 pub schema_viewpoints: BTreeSet<String>,
40 pub init_state: ValueWrapper,
41 pub hash: HashAlgorithm,
42 pub network: Arc<NetworkSender>,
43}
44
45#[derive(Debug, Clone)]
46pub enum EvaluationSchemaMessage {
47 NetworkRequest {
48 evaluation_req: Box<Signed<EvaluationReq>>,
49 info: ComunicateInfo,
50 sender: PublicKey,
51 },
52 Update {
53 members: BTreeSet<PublicKey>,
54 creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
55 issuers: BTreeMap<PublicKey, BTreeSet<Namespace>>,
56 issuer_any: bool,
57 schema_viewpoints: BTreeSet<String>,
58 sn: u64,
59 gov_version: u64,
60 init_state: ValueWrapper,
61 },
62}
63
64impl Message for EvaluationSchemaMessage {}
65
66impl NotPersistentActor for EvaluationSchema {}
67
68impl EvaluationSchema {
69 fn context_for_request(
70 &self,
71 evaluation_req: &EvaluationReq,
72 ) -> EvalWorkerContext {
73 match evaluation_req.event_request.content() {
74 EventRequest::Fact(_) => EvalWorkerContext::TrackerFact {
75 issuers: self
76 .issuers
77 .iter()
78 .filter(|(_, namespaces)| {
79 namespaces.iter().any(|issuer_namespace| {
80 issuer_namespace
81 .is_ancestor_or_equal_of(&evaluation_req.namespace)
82 })
83 })
84 .map(|(issuer, _)| issuer.clone())
85 .collect(),
86 issuer_any: self.issuer_any,
87 schema_viewpoints: self.schema_viewpoints.clone(),
88 },
89 EventRequest::Transfer(_) => EvalWorkerContext::TrackerTransfer {
90 members: self.members.clone(),
91 creators: self.creators.clone(),
92 },
93 _ => EvalWorkerContext::Empty,
94 }
95 }
96}
97
98#[async_trait]
99impl Actor for EvaluationSchema {
100 type Event = ();
101 type Message = EvaluationSchemaMessage;
102 type Response = ();
103
104 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
105 parent_span.map_or_else(
106 || info_span!("EvaluationSchema", id),
107 |parent_span| info_span!(parent: parent_span, "EvaluationSchema", id),
108 )
109 }
110}
111
112#[async_trait]
113impl Handler<Self> for EvaluationSchema {
114 async fn handle_message(
115 &mut self,
116 _sender: ActorPath,
117 msg: EvaluationSchemaMessage,
118 ctx: &mut ActorContext<Self>,
119 ) -> Result<(), ActorError> {
120 match msg {
121 EvaluationSchemaMessage::NetworkRequest {
122 evaluation_req,
123 info,
124 sender,
125 } => {
126 let observe = |result: &'static str| {
127 if let Some(metrics) = try_core_metrics() {
128 metrics
129 .observe_schema_event("evaluation_schema", result);
130 }
131 };
132 if sender != evaluation_req.signature().signer {
133 observe("rejected");
134 warn!(
135 msg_type = "NetworkRequest",
136 sender = %sender,
137 signer = %evaluation_req.signature().signer,
138 "Signer and sender are not the same"
139 );
140 return Ok(());
141 }
142
143 if self.governance_id != evaluation_req.content().governance_id
144 {
145 observe("rejected");
146 warn!(
147 msg_type = "NetworkRequest",
148 expected_governance_id = %self.governance_id,
149 received_governance_id = %evaluation_req.content().governance_id,
150 "Invalid governance_id"
151 );
152 return Ok(());
153 }
154
155 if self.schema_id != evaluation_req.content().schema_id {
156 observe("rejected");
157 warn!(
158 msg_type = "NetworkRequest",
159 expected_schema_id = ?self.schema_id,
160 received_schema_id = ?evaluation_req.content().schema_id,
161 "Invalid schema_id"
162 );
163 return Ok(());
164 }
165
166 if let Some(ns) = self.creators.get(&sender) {
167 if !ns.contains(&evaluation_req.content().namespace) {
168 observe("rejected");
169 warn!(
170 msg_type = "NetworkRequest",
171 sender = %sender,
172 namespace = ?evaluation_req.content().namespace,
173 "Invalid sender namespace"
174 );
175 return Ok(());
176 }
177 } else {
178 observe("rejected");
179 warn!(
180 msg_type = "NetworkRequest",
181 sender = %sender,
182 "Sender is not a creator"
183 );
184 return Ok(());
185 }
186
187 if self.gov_version < evaluation_req.content().gov_version {
188 observe("rejected");
189 warn!(
190 msg_type = "NetworkRequest",
191 local_gov_version = self.gov_version,
192 request_gov_version = evaluation_req.content().gov_version,
193 governance_id = %self.governance_id,
194 sender = %sender,
195 "Ignoring request with newer governance version; service nodes must update governance through resilience protocols"
196 );
197 return Ok(());
198 }
199
200 let child = ctx
201 .create_child(
202 &format!("{}", evaluation_req.signature().signer),
203 EvalWorker {
204 node_key: sender.clone(),
205 our_key: self.our_key.clone(),
206 init_state: Some(self.init_state.clone()),
207 governance_id: self.governance_id.clone(),
208 gov_version: self.gov_version,
209 sn: self.sn,
210 context: self
211 .context_for_request(evaluation_req.content()),
212 hash: self.hash,
213 network: self.network.clone(),
214 stop: true,
215 },
216 )
217 .await;
218
219 let evaluator_actor = match child {
220 Ok(child) => child,
221 Err(e) => {
222 if let ActorError::Exists { .. } = e {
223 warn!(
224 msg_type = "NetworkRequest",
225 error = %e,
226 "Evaluator actor already exists"
227 );
228 observe("rejected");
229 return Ok(());
230 } else {
231 error!(
232 msg_type = "NetworkRequest",
233 error = %e,
234 "Failed to create evaluator actor"
235 );
236 return Err(emit_fail(ctx, e).await);
237 }
238 }
239 };
240
241 if let Err(e) = evaluator_actor
242 .tell(EvalWorkerMessage::NetworkRequest {
243 evaluation_req: *evaluation_req,
244 info,
245 sender: sender.clone(),
246 })
247 .await
248 {
249 warn!(
250 msg_type = "NetworkRequest",
251 error = %e,
252 "Failed to send request to evaluator"
253 );
254 } else {
255 observe("delegated");
256 debug!(
257 msg_type = "NetworkRequest",
258 sender = %sender,
259 "Evaluation request delegated to worker"
260 );
261 }
262 }
263 EvaluationSchemaMessage::Update {
264 members,
265 creators,
266 issuers,
267 issuer_any,
268 schema_viewpoints,
269 sn,
270 gov_version,
271 init_state,
272 } => {
273 if let Some(metrics) = try_core_metrics() {
274 metrics.observe_schema_event("evaluation_schema", "update");
275 }
276 self.members = members;
277 self.creators = creators;
278 self.issuers = issuers;
279 self.issuer_any = issuer_any;
280 self.schema_viewpoints = schema_viewpoints;
281 self.gov_version = gov_version;
282 self.sn = sn;
283 self.init_state = init_state;
284
285 debug!(
286 msg_type = "Update",
287 sn = self.sn,
288 gov_version = self.gov_version,
289 "Schema updated successfully"
290 );
291 }
292 };
293 Ok(())
294 }
295
296 async fn on_child_fault(
297 &mut self,
298 error: ActorError,
299 ctx: &mut ActorContext<Self>,
300 ) -> ave_actors::ChildAction {
301 if let Some(metrics) = try_core_metrics() {
302 metrics.observe_schema_event("evaluation_schema", "child_fault");
303 }
304 error!(
305 governance_id = %self.governance_id,
306 schema_id = ?self.schema_id,
307 gov_version = self.gov_version,
308 sn = self.sn,
309 error = %error,
310 "Child fault in evaluation schema actor"
311 );
312 emit_fail(ctx, error).await;
313 ave_actors::ChildAction::Stop
314 }
315}