1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
9 NotPersistentActor,
10};
11use ave_common::{
12 Namespace, SchemaType, ValueWrapper,
13 identity::{DigestIdentifier, HashAlgorithm, PublicKey},
14};
15use ave_network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19 Signed,
20 governance::role_register::CurrentSchemaRoles,
21 helpers::network::service::NetworkSender,
22 metrics::try_core_metrics,
23 model::common::emit_fail,
24 validation::worker::{CurrentWorkerRoles, ValiWorker, ValiWorkerMessage},
25};
26
27use super::request::ValidationReq;
28
29#[derive(Clone, Debug)]
30pub struct ValidationSchema {
31 pub our_key: Arc<PublicKey>,
32 pub governance_id: DigestIdentifier,
33 pub gov_version: u64,
34 pub sn: u64,
35 pub schema_id: SchemaType,
36 pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
37 pub init_state: ValueWrapper,
38 pub current_roles: CurrentSchemaRoles,
39 pub hash: HashAlgorithm,
40 pub network: Arc<NetworkSender>,
41}
42
43#[derive(Debug, Clone)]
44pub enum ValidationSchemaMessage {
45 NetworkRequest {
46 validation_req: Box<Signed<ValidationReq>>,
47 info: ComunicateInfo,
48 sender: PublicKey,
49 },
50 Update {
51 creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
52 sn: u64,
53 gov_version: u64,
54 init_state: ValueWrapper,
55 current_roles: CurrentSchemaRoles,
56 },
57}
58
59impl Message for ValidationSchemaMessage {}
60
61impl NotPersistentActor for ValidationSchema {}
62
63#[async_trait]
64impl Actor for ValidationSchema {
65 type Event = ();
66 type Message = ValidationSchemaMessage;
67 type Response = ();
68
69 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
70 parent_span.map_or_else(
71 || info_span!("ValidationSchema", id),
72 |parent_span| info_span!(parent: parent_span, "ValidationSchema", id),
73 )
74 }
75}
76
77#[async_trait]
78impl Handler<Self> for ValidationSchema {
79 async fn handle_message(
80 &mut self,
81 _sender: ActorPath,
82 msg: ValidationSchemaMessage,
83 ctx: &mut ActorContext<Self>,
84 ) -> Result<(), ActorError> {
85 match msg {
86 ValidationSchemaMessage::NetworkRequest {
87 validation_req,
88 info,
89 sender,
90 } => {
91 let observe = |result: &'static str| {
92 if let Some(metrics) = try_core_metrics() {
93 metrics
94 .observe_schema_event("validation_schema", result);
95 }
96 };
97 if sender != validation_req.signature().signer {
98 observe("rejected");
99 warn!(
100 msg_type = "NetworkRequest",
101 sender = %sender,
102 signer = %validation_req.signature().signer,
103 "Signer and sender are not the same"
104 );
105 return Ok(());
106 }
107
108 let governance_id =
109 match validation_req.content().get_governance_id() {
110 Ok(governance_id) => governance_id,
111 Err(e) => {
112 observe("rejected");
113 warn!(
114 msg_type = "NetworkRequest",
115 error = %e,
116 "Failed to get governance_id"
117 );
118 return Ok(());
119 }
120 };
121
122 if self.governance_id != governance_id {
123 observe("rejected");
124 warn!(
125 msg_type = "NetworkRequest",
126 expected_governance_id = %self.governance_id,
127 received_governance_id = %governance_id,
128 "Invalid governance_id"
129 );
130 return Ok(());
131 }
132
133 let schema_id = match validation_req.content().get_schema_id() {
134 Ok(schema_id) => schema_id,
135 Err(e) => {
136 observe("rejected");
137 warn!(
138 msg_type = "NetworkRequest",
139 error = %e,
140 "Failed to get schema_id"
141 );
142 return Ok(());
143 }
144 };
145
146 if self.schema_id != schema_id {
147 observe("rejected");
148 warn!(
149 msg_type = "NetworkRequest",
150 expected_schema_id = ?self.schema_id,
151 received_schema_id = ?schema_id,
152 "Invalid schema_id"
153 );
154 return Ok(());
155 }
156
157 if let Some(ns) = self.creators.get(&sender) {
158 let namespace =
159 match validation_req.content().get_namespace() {
160 Ok(namespace) => namespace,
161 Err(e) => {
162 observe("rejected");
163 warn!(
164 msg_type = "NetworkRequest",
165 error = %e,
166 "Failed to get namespace"
167 );
168 return Ok(());
169 }
170 };
171 if !ns.contains(&namespace) {
172 observe("rejected");
173 warn!(
174 msg_type = "NetworkRequest",
175 sender = %sender,
176 namespace = ?namespace,
177 "Invalid sender namespace"
178 );
179 return Ok(());
180 }
181 } else {
182 observe("rejected");
183 warn!(
184 msg_type = "NetworkRequest",
185 sender = %sender,
186 "Sender is not a creator"
187 );
188 return Ok(());
189 }
190
191 if self.gov_version < validation_req.content().get_gov_version()
192 {
193 observe("rejected");
194 warn!(
195 msg_type = "NetworkRequest",
196 local_gov_version = self.gov_version,
197 request_gov_version = validation_req.content().get_gov_version(),
198 governance_id = %self.governance_id,
199 sender = %sender,
200 "Ignoring request with newer governance version; service nodes must update governance through resilience protocols"
201 );
202 return Ok(());
203 }
204
205 let child = ctx
206 .create_child(
207 &format!("{}", validation_req.signature().signer),
208 ValiWorker {
209 init_state: Some(self.init_state.clone()),
210 node_key: sender.clone(),
211 our_key: self.our_key.clone(),
212 governance_id: self.governance_id.clone(),
213 gov_version: self.gov_version,
214 sn: self.sn,
215 current_roles: CurrentWorkerRoles {
216 evaluation: crate::governance::role_register::RoleDataRegister {
217 workers: self
218 .current_roles
219 .evaluation
220 .iter()
221 .filter(|role| role.namespace.is_ancestor_or_equal_of(&validation_req.content().get_namespace().unwrap_or_default()))
222 .map(|role| role.key.clone())
223 .collect(),
224 quorum: self.current_roles.evaluation_quorum.clone(),
225 },
226 approval: crate::governance::role_register::RoleDataRegister {
227 workers: std::collections::HashSet::new(),
228 quorum: crate::governance::model::Quorum::default(),
229 },
230 },
231 hash: self.hash,
232 network: self.network.clone(),
233 stop: true,
234 },
235 )
236 .await;
237
238 let validator_actor = match child {
239 Ok(child) => child,
240 Err(e) => {
241 if let ActorError::Exists { .. } = e {
242 observe("rejected");
243 warn!(
244 msg_type = "NetworkRequest",
245 error = %e,
246 "Validator actor already exists"
247 );
248 return Ok(());
249 } else {
250 error!(
251 msg_type = "NetworkRequest",
252 error = %e,
253 "Failed to create validator actor"
254 );
255 return Err(emit_fail(ctx, e).await);
256 }
257 }
258 };
259
260 if let Err(e) = validator_actor
261 .tell(ValiWorkerMessage::NetworkRequest {
262 validation_req,
263 info,
264 sender: sender.clone(),
265 })
266 .await
267 {
268 warn!(
269 msg_type = "NetworkRequest",
270 error = %e,
271 "Failed to send request to validator"
272 );
273 } else {
274 observe("delegated");
275 debug!(
276 msg_type = "NetworkRequest",
277 sender = %sender,
278 "Validation request delegated to worker"
279 );
280 }
281 }
282 ValidationSchemaMessage::Update {
283 creators,
284 sn,
285 gov_version,
286 init_state,
287 current_roles,
288 } => {
289 if let Some(metrics) = try_core_metrics() {
290 metrics.observe_schema_event("validation_schema", "update");
291 }
292 self.creators = creators;
293 self.gov_version = gov_version;
294 self.sn = sn;
295 self.init_state = init_state;
296 self.current_roles = current_roles;
297
298 debug!(
299 msg_type = "Update",
300 sn = self.sn,
301 gov_version = self.gov_version,
302 "Schema updated successfully"
303 );
304 }
305 };
306 Ok(())
307 }
308
309 async fn on_child_fault(
310 &mut self,
311 error: ActorError,
312 ctx: &mut ActorContext<Self>,
313 ) -> ChildAction {
314 if let Some(metrics) = try_core_metrics() {
315 metrics.observe_schema_event("validation_schema", "child_fault");
316 }
317 error!(
318 governance_id = %self.governance_id,
319 schema_id = ?self.schema_id,
320 gov_version = self.gov_version,
321 error = %error,
322 "Child fault in validation schema"
323 );
324 emit_fail(ctx, error).await;
325 ChildAction::Stop
326 }
327}