1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
9 NotPersistentActor,
10};
11use ave_common::{
12 Namespace, SchemaType, ValueWrapper,
13 identity::{DigestIdentifier, HashAlgorithm, PublicKey},
14};
15use network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19 Signed,
20 helpers::network::service::NetworkSender,
21 model::common::{emit_fail, node::try_to_update},
22 validation::worker::{ValiWorker, ValiWorkerMessage},
23};
24
25use super::request::ValidationReq;
26
27#[derive(Clone, Debug)]
28pub struct ValidationSchema {
29 pub our_key: Arc<PublicKey>,
30 pub governance_id: DigestIdentifier,
31 pub gov_version: u64,
32 pub sn: u64,
33 pub schema_id: SchemaType,
34 pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
35 pub init_state: ValueWrapper,
36 pub hash: HashAlgorithm,
37 pub network: Arc<NetworkSender>,
38}
39
40#[derive(Debug, Clone)]
41pub enum ValidationSchemaMessage {
42 NetworkRequest {
43 validation_req: Box<Signed<ValidationReq>>,
44 info: ComunicateInfo,
45 sender: PublicKey,
46 },
47 Update {
48 creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
49 sn: u64,
50 gov_version: u64,
51 init_state: ValueWrapper,
52 },
53}
54
55impl Message for ValidationSchemaMessage {}
56
57impl NotPersistentActor for ValidationSchema {}
58
59#[async_trait]
60impl Actor for ValidationSchema {
61 type Event = ();
62 type Message = ValidationSchemaMessage;
63 type Response = ();
64
65 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
66 parent_span.map_or_else(
67 || info_span!("ValidationSchema", id),
68 |parent_span| info_span!(parent: parent_span, "ValidationSchema", id),
69 )
70 }
71}
72
73#[async_trait]
74impl Handler<Self> for ValidationSchema {
75 async fn handle_message(
76 &mut self,
77 _sender: ActorPath,
78 msg: ValidationSchemaMessage,
79 ctx: &mut ActorContext<Self>,
80 ) -> Result<(), ActorError> {
81 match msg {
82 ValidationSchemaMessage::NetworkRequest {
83 validation_req,
84 info,
85 sender,
86 } => {
87 if sender != validation_req.signature().signer {
88 warn!(
89 msg_type = "NetworkRequest",
90 sender = %sender,
91 signer = %validation_req.signature().signer,
92 "Signer and sender are not the same"
93 );
94 return Ok(());
95 }
96
97 let governance_id =
98 match validation_req.content().get_governance_id() {
99 Ok(governance_id) => governance_id,
100 Err(e) => {
101 warn!(
102 msg_type = "NetworkRequest",
103 error = %e,
104 "Failed to get governance_id"
105 );
106 return Ok(());
107 }
108 };
109
110 if self.governance_id != governance_id {
111 warn!(
112 msg_type = "NetworkRequest",
113 expected_governance_id = %self.governance_id,
114 received_governance_id = %governance_id,
115 "Invalid governance_id"
116 );
117 return Ok(());
118 }
119
120 let schema_id = match validation_req.content().get_schema_id() {
121 Ok(schema_id) => schema_id,
122 Err(e) => {
123 warn!(
124 msg_type = "NetworkRequest",
125 error = %e,
126 "Failed to get schema_id"
127 );
128 return Ok(());
129 }
130 };
131
132 if self.schema_id != schema_id {
133 warn!(
134 msg_type = "NetworkRequest",
135 expected_schema_id = ?self.schema_id,
136 received_schema_id = ?schema_id,
137 "Invalid schema_id"
138 );
139 return Ok(());
140 }
141
142 if let Some(ns) = self.creators.get(&sender) {
143 let namespace =
144 match validation_req.content().get_namespace() {
145 Ok(namespace) => namespace,
146 Err(e) => {
147 warn!(
148 msg_type = "NetworkRequest",
149 error = %e,
150 "Failed to get namespace"
151 );
152 return Ok(());
153 }
154 };
155 if !ns.contains(&namespace) {
156 warn!(
157 msg_type = "NetworkRequest",
158 sender = %sender,
159 namespace = ?namespace,
160 "Invalid sender namespace"
161 );
162 return Ok(());
163 }
164 } else {
165 warn!(
166 msg_type = "NetworkRequest",
167 sender = %sender,
168 "Sender is not a creator"
169 );
170 return Ok(());
171 }
172
173 if self.gov_version < validation_req.content().get_gov_version()
174 && let Err(e) =
175 try_to_update(ctx, self.governance_id.clone(), None)
176 .await
177 {
178 error!(
179 msg_type = "NetworkRequest",
180 error = %e,
181 "Failed to update governance"
182 );
183 return Err(emit_fail(ctx, e).await);
184 }
185
186 let child = ctx
187 .create_child(
188 &format!("{}", validation_req.signature().signer),
189 ValiWorker {
190 init_state: Some(self.init_state.clone()),
191 node_key: sender.clone(),
192 our_key: self.our_key.clone(),
193 governance_id: self.governance_id.clone(),
194 gov_version: self.gov_version,
195 sn: self.sn,
196 hash: self.hash,
197 network: self.network.clone(),
198 stop: true,
199 },
200 )
201 .await;
202
203 let validator_actor = match child {
204 Ok(child) => child,
205 Err(e) => {
206 if let ActorError::Exists { .. } = e {
207 warn!(
208 msg_type = "NetworkRequest",
209 error = %e,
210 "Validator actor already exists"
211 );
212 return Ok(());
213 } else {
214 error!(
215 msg_type = "NetworkRequest",
216 error = %e,
217 "Failed to create validator actor"
218 );
219 return Err(emit_fail(ctx, e).await);
220 }
221 }
222 };
223
224 if let Err(e) = validator_actor
225 .tell(ValiWorkerMessage::NetworkRequest {
226 validation_req,
227 info,
228 sender: sender.clone(),
229 })
230 .await
231 {
232 warn!(
233 msg_type = "NetworkRequest",
234 error = %e,
235 "Failed to send request to validator"
236 );
237 } else {
238 debug!(
239 msg_type = "NetworkRequest",
240 sender = %sender,
241 "Validation request delegated to worker"
242 );
243 }
244 }
245 ValidationSchemaMessage::Update {
246 creators,
247 sn,
248 gov_version,
249 init_state,
250 } => {
251 self.creators = creators;
252 self.gov_version = gov_version;
253 self.sn = sn;
254 self.init_state = init_state;
255
256 debug!(
257 msg_type = "Update",
258 sn = self.sn,
259 gov_version = self.gov_version,
260 "Schema updated successfully"
261 );
262 }
263 };
264 Ok(())
265 }
266
267 async fn on_child_fault(
268 &mut self,
269 error: ActorError,
270 ctx: &mut ActorContext<Self>,
271 ) -> ChildAction {
272 error!(
273 governance_id = %self.governance_id,
274 schema_id = ?self.schema_id,
275 gov_version = self.gov_version,
276 error = %error,
277 "Child fault in validation schema"
278 );
279 emit_fail(ctx, error).await;
280 ChildAction::Stop
281 }
282}