1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
9 NotPersistentActor,
10};
11use ave_common::{
12 Namespace, SchemaType, ValueWrapper,
13 identity::{DigestIdentifier, HashAlgorithm, PublicKey},
14};
15use network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19 Signed,
20 governance::role_register::CurrentSchemaRoles,
21 helpers::network::service::NetworkSender,
22 metrics::try_core_metrics,
23 model::common::{emit_fail, node::try_to_update},
24 validation::worker::{CurrentWorkerRoles, ValiWorker, ValiWorkerMessage},
25};
26
27use super::request::ValidationReq;
28
29#[derive(Clone, Debug)]
30pub struct ValidationSchema {
31 pub our_key: Arc<PublicKey>,
32 pub governance_id: DigestIdentifier,
33 pub gov_version: u64,
34 pub sn: u64,
35 pub schema_id: SchemaType,
36 pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
37 pub init_state: ValueWrapper,
38 pub current_roles: CurrentSchemaRoles,
39 pub hash: HashAlgorithm,
40 pub network: Arc<NetworkSender>,
41}
42
43#[derive(Debug, Clone)]
44pub enum ValidationSchemaMessage {
45 NetworkRequest {
46 validation_req: Box<Signed<ValidationReq>>,
47 info: ComunicateInfo,
48 sender: PublicKey,
49 },
50 Update {
51 creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
52 sn: u64,
53 gov_version: u64,
54 init_state: ValueWrapper,
55 current_roles: CurrentSchemaRoles,
56 },
57}
58
59impl Message for ValidationSchemaMessage {}
60
61impl NotPersistentActor for ValidationSchema {}
62
63#[async_trait]
64impl Actor for ValidationSchema {
65 type Event = ();
66 type Message = ValidationSchemaMessage;
67 type Response = ();
68
69 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
70 parent_span.map_or_else(
71 || info_span!("ValidationSchema", id),
72 |parent_span| info_span!(parent: parent_span, "ValidationSchema", id),
73 )
74 }
75}
76
77#[async_trait]
78impl Handler<Self> for ValidationSchema {
79 async fn handle_message(
80 &mut self,
81 _sender: ActorPath,
82 msg: ValidationSchemaMessage,
83 ctx: &mut ActorContext<Self>,
84 ) -> Result<(), ActorError> {
85 match msg {
86 ValidationSchemaMessage::NetworkRequest {
87 validation_req,
88 info,
89 sender,
90 } => {
91 let observe = |result: &'static str| {
92 if let Some(metrics) = try_core_metrics() {
93 metrics
94 .observe_schema_event("validation_schema", result);
95 }
96 };
97 if sender != validation_req.signature().signer {
98 observe("rejected");
99 warn!(
100 msg_type = "NetworkRequest",
101 sender = %sender,
102 signer = %validation_req.signature().signer,
103 "Signer and sender are not the same"
104 );
105 return Ok(());
106 }
107
108 let governance_id =
109 match validation_req.content().get_governance_id() {
110 Ok(governance_id) => governance_id,
111 Err(e) => {
112 observe("rejected");
113 warn!(
114 msg_type = "NetworkRequest",
115 error = %e,
116 "Failed to get governance_id"
117 );
118 return Ok(());
119 }
120 };
121
122 if self.governance_id != governance_id {
123 observe("rejected");
124 warn!(
125 msg_type = "NetworkRequest",
126 expected_governance_id = %self.governance_id,
127 received_governance_id = %governance_id,
128 "Invalid governance_id"
129 );
130 return Ok(());
131 }
132
133 let schema_id = match validation_req.content().get_schema_id() {
134 Ok(schema_id) => schema_id,
135 Err(e) => {
136 observe("rejected");
137 warn!(
138 msg_type = "NetworkRequest",
139 error = %e,
140 "Failed to get schema_id"
141 );
142 return Ok(());
143 }
144 };
145
146 if self.schema_id != schema_id {
147 observe("rejected");
148 warn!(
149 msg_type = "NetworkRequest",
150 expected_schema_id = ?self.schema_id,
151 received_schema_id = ?schema_id,
152 "Invalid schema_id"
153 );
154 return Ok(());
155 }
156
157 if let Some(ns) = self.creators.get(&sender) {
158 let namespace =
159 match validation_req.content().get_namespace() {
160 Ok(namespace) => namespace,
161 Err(e) => {
162 observe("rejected");
163 warn!(
164 msg_type = "NetworkRequest",
165 error = %e,
166 "Failed to get namespace"
167 );
168 return Ok(());
169 }
170 };
171 if !ns.contains(&namespace) {
172 observe("rejected");
173 warn!(
174 msg_type = "NetworkRequest",
175 sender = %sender,
176 namespace = ?namespace,
177 "Invalid sender namespace"
178 );
179 return Ok(());
180 }
181 } else {
182 observe("rejected");
183 warn!(
184 msg_type = "NetworkRequest",
185 sender = %sender,
186 "Sender is not a creator"
187 );
188 return Ok(());
189 }
190
191 if self.gov_version < validation_req.content().get_gov_version()
192 && let Err(e) =
193 try_to_update(ctx, self.governance_id.clone(), None)
194 .await
195 {
196 error!(
197 msg_type = "NetworkRequest",
198 error = %e,
199 "Failed to update governance"
200 );
201 return Err(emit_fail(ctx, e).await);
202 }
203
204 let child = ctx
205 .create_child(
206 &format!("{}", validation_req.signature().signer),
207 ValiWorker {
208 init_state: Some(self.init_state.clone()),
209 node_key: sender.clone(),
210 our_key: self.our_key.clone(),
211 governance_id: self.governance_id.clone(),
212 gov_version: self.gov_version,
213 sn: self.sn,
214 current_roles: CurrentWorkerRoles {
215 evaluation: crate::governance::role_register::RoleDataRegister {
216 workers: self
217 .current_roles
218 .evaluation
219 .iter()
220 .filter(|role| role.namespace.is_ancestor_or_equal_of(&validation_req.content().get_namespace().unwrap_or_default()))
221 .map(|role| role.key.clone())
222 .collect(),
223 quorum: self.current_roles.evaluation_quorum.clone(),
224 },
225 approval: crate::governance::role_register::RoleDataRegister {
226 workers: std::collections::HashSet::new(),
227 quorum: crate::governance::model::Quorum::default(),
228 },
229 },
230 hash: self.hash,
231 network: self.network.clone(),
232 stop: true,
233 },
234 )
235 .await;
236
237 let validator_actor = match child {
238 Ok(child) => child,
239 Err(e) => {
240 if let ActorError::Exists { .. } = e {
241 observe("rejected");
242 warn!(
243 msg_type = "NetworkRequest",
244 error = %e,
245 "Validator actor already exists"
246 );
247 return Ok(());
248 } else {
249 error!(
250 msg_type = "NetworkRequest",
251 error = %e,
252 "Failed to create validator actor"
253 );
254 return Err(emit_fail(ctx, e).await);
255 }
256 }
257 };
258
259 if let Err(e) = validator_actor
260 .tell(ValiWorkerMessage::NetworkRequest {
261 validation_req,
262 info,
263 sender: sender.clone(),
264 })
265 .await
266 {
267 warn!(
268 msg_type = "NetworkRequest",
269 error = %e,
270 "Failed to send request to validator"
271 );
272 } else {
273 observe("delegated");
274 debug!(
275 msg_type = "NetworkRequest",
276 sender = %sender,
277 "Validation request delegated to worker"
278 );
279 }
280 }
281 ValidationSchemaMessage::Update {
282 creators,
283 sn,
284 gov_version,
285 init_state,
286 current_roles,
287 } => {
288 if let Some(metrics) = try_core_metrics() {
289 metrics.observe_schema_event("validation_schema", "update");
290 }
291 self.creators = creators;
292 self.gov_version = gov_version;
293 self.sn = sn;
294 self.init_state = init_state;
295 self.current_roles = current_roles;
296
297 debug!(
298 msg_type = "Update",
299 sn = self.sn,
300 gov_version = self.gov_version,
301 "Schema updated successfully"
302 );
303 }
304 };
305 Ok(())
306 }
307
308 async fn on_child_fault(
309 &mut self,
310 error: ActorError,
311 ctx: &mut ActorContext<Self>,
312 ) -> ChildAction {
313 if let Some(metrics) = try_core_metrics() {
314 metrics.observe_schema_event("validation_schema", "child_fault");
315 }
316 error!(
317 governance_id = %self.governance_id,
318 schema_id = ?self.schema_id,
319 gov_version = self.gov_version,
320 error = %error,
321 "Child fault in validation schema"
322 );
323 emit_fail(ctx, error).await;
324 ChildAction::Stop
325 }
326}