Skip to main content

ave_core/validation/
schema.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
9    NotPersistentActor,
10};
11use ave_common::{
12    Namespace, SchemaType, ValueWrapper,
13    identity::{DigestIdentifier, HashAlgorithm, PublicKey},
14};
15use network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19    Signed,
20    governance::role_register::CurrentSchemaRoles,
21    helpers::network::service::NetworkSender,
22    metrics::try_core_metrics,
23    model::common::{emit_fail, node::try_to_update},
24    validation::worker::{CurrentWorkerRoles, ValiWorker, ValiWorkerMessage},
25};
26
27use super::request::ValidationReq;
28
29#[derive(Clone, Debug)]
30pub struct ValidationSchema {
31    pub our_key: Arc<PublicKey>,
32    pub governance_id: DigestIdentifier,
33    pub gov_version: u64,
34    pub sn: u64,
35    pub schema_id: SchemaType,
36    pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
37    pub init_state: ValueWrapper,
38    pub current_roles: CurrentSchemaRoles,
39    pub hash: HashAlgorithm,
40    pub network: Arc<NetworkSender>,
41}
42
43#[derive(Debug, Clone)]
44pub enum ValidationSchemaMessage {
45    NetworkRequest {
46        validation_req: Box<Signed<ValidationReq>>,
47        info: ComunicateInfo,
48        sender: PublicKey,
49    },
50    Update {
51        creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
52        sn: u64,
53        gov_version: u64,
54        init_state: ValueWrapper,
55        current_roles: CurrentSchemaRoles,
56    },
57}
58
59impl Message for ValidationSchemaMessage {}
60
61impl NotPersistentActor for ValidationSchema {}
62
63#[async_trait]
64impl Actor for ValidationSchema {
65    type Event = ();
66    type Message = ValidationSchemaMessage;
67    type Response = ();
68
69    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
70        parent_span.map_or_else(
71            || info_span!("ValidationSchema", id),
72            |parent_span| info_span!(parent: parent_span, "ValidationSchema", id),
73        )
74    }
75}
76
77#[async_trait]
78impl Handler<Self> for ValidationSchema {
79    async fn handle_message(
80        &mut self,
81        _sender: ActorPath,
82        msg: ValidationSchemaMessage,
83        ctx: &mut ActorContext<Self>,
84    ) -> Result<(), ActorError> {
85        match msg {
86            ValidationSchemaMessage::NetworkRequest {
87                validation_req,
88                info,
89                sender,
90            } => {
91                let observe = |result: &'static str| {
92                    if let Some(metrics) = try_core_metrics() {
93                        metrics
94                            .observe_schema_event("validation_schema", result);
95                    }
96                };
97                if sender != validation_req.signature().signer {
98                    observe("rejected");
99                    warn!(
100                        msg_type = "NetworkRequest",
101                        sender = %sender,
102                        signer = %validation_req.signature().signer,
103                        "Signer and sender are not the same"
104                    );
105                    return Ok(());
106                }
107
108                let governance_id =
109                    match validation_req.content().get_governance_id() {
110                        Ok(governance_id) => governance_id,
111                        Err(e) => {
112                            observe("rejected");
113                            warn!(
114                                msg_type = "NetworkRequest",
115                                error = %e,
116                                "Failed to get governance_id"
117                            );
118                            return Ok(());
119                        }
120                    };
121
122                if self.governance_id != governance_id {
123                    observe("rejected");
124                    warn!(
125                        msg_type = "NetworkRequest",
126                        expected_governance_id = %self.governance_id,
127                        received_governance_id = %governance_id,
128                        "Invalid governance_id"
129                    );
130                    return Ok(());
131                }
132
133                let schema_id = match validation_req.content().get_schema_id() {
134                    Ok(schema_id) => schema_id,
135                    Err(e) => {
136                        observe("rejected");
137                        warn!(
138                            msg_type = "NetworkRequest",
139                            error = %e,
140                            "Failed to get schema_id"
141                        );
142                        return Ok(());
143                    }
144                };
145
146                if self.schema_id != schema_id {
147                    observe("rejected");
148                    warn!(
149                        msg_type = "NetworkRequest",
150                        expected_schema_id = ?self.schema_id,
151                        received_schema_id = ?schema_id,
152                        "Invalid schema_id"
153                    );
154                    return Ok(());
155                }
156
157                if let Some(ns) = self.creators.get(&sender) {
158                    let namespace =
159                        match validation_req.content().get_namespace() {
160                            Ok(namespace) => namespace,
161                            Err(e) => {
162                                observe("rejected");
163                                warn!(
164                                    msg_type = "NetworkRequest",
165                                    error = %e,
166                                    "Failed to get namespace"
167                                );
168                                return Ok(());
169                            }
170                        };
171                    if !ns.contains(&namespace) {
172                        observe("rejected");
173                        warn!(
174                            msg_type = "NetworkRequest",
175                            sender = %sender,
176                            namespace = ?namespace,
177                            "Invalid sender namespace"
178                        );
179                        return Ok(());
180                    }
181                } else {
182                    observe("rejected");
183                    warn!(
184                        msg_type = "NetworkRequest",
185                        sender = %sender,
186                        "Sender is not a creator"
187                    );
188                    return Ok(());
189                }
190
191                if self.gov_version < validation_req.content().get_gov_version()
192                    && let Err(e) =
193                        try_to_update(ctx, self.governance_id.clone(), None)
194                            .await
195                {
196                    error!(
197                        msg_type = "NetworkRequest",
198                        error = %e,
199                        "Failed to update governance"
200                    );
201                    return Err(emit_fail(ctx, e).await);
202                }
203
204                let child = ctx
205                    .create_child(
206                        &format!("{}", validation_req.signature().signer),
207                        ValiWorker {
208                            init_state: Some(self.init_state.clone()),
209                            node_key: sender.clone(),
210                            our_key: self.our_key.clone(),
211                            governance_id: self.governance_id.clone(),
212                            gov_version: self.gov_version,
213                            sn: self.sn,
214                            current_roles: CurrentWorkerRoles {
215                                evaluation: crate::governance::role_register::RoleDataRegister {
216                                    workers: self
217                                        .current_roles
218                                        .evaluation
219                                        .iter()
220                                        .filter(|role| role.namespace.is_ancestor_or_equal_of(&validation_req.content().get_namespace().unwrap_or_default()))
221                                        .map(|role| role.key.clone())
222                                        .collect(),
223                                    quorum: self.current_roles.evaluation_quorum.clone(),
224                                },
225                                approval: crate::governance::role_register::RoleDataRegister {
226                                    workers: std::collections::HashSet::new(),
227                                    quorum: crate::governance::model::Quorum::default(),
228                                },
229                            },
230                            hash: self.hash,
231                            network: self.network.clone(),
232                            stop: true,
233                        },
234                    )
235                    .await;
236
237                let validator_actor = match child {
238                    Ok(child) => child,
239                    Err(e) => {
240                        if let ActorError::Exists { .. } = e {
241                            observe("rejected");
242                            warn!(
243                                msg_type = "NetworkRequest",
244                                error = %e,
245                                "Validator actor already exists"
246                            );
247                            return Ok(());
248                        } else {
249                            error!(
250                                msg_type = "NetworkRequest",
251                                error = %e,
252                                "Failed to create validator actor"
253                            );
254                            return Err(emit_fail(ctx, e).await);
255                        }
256                    }
257                };
258
259                if let Err(e) = validator_actor
260                    .tell(ValiWorkerMessage::NetworkRequest {
261                        validation_req,
262                        info,
263                        sender: sender.clone(),
264                    })
265                    .await
266                {
267                    warn!(
268                        msg_type = "NetworkRequest",
269                        error = %e,
270                        "Failed to send request to validator"
271                    );
272                } else {
273                    observe("delegated");
274                    debug!(
275                        msg_type = "NetworkRequest",
276                        sender = %sender,
277                        "Validation request delegated to worker"
278                    );
279                }
280            }
281            ValidationSchemaMessage::Update {
282                creators,
283                sn,
284                gov_version,
285                init_state,
286                current_roles,
287            } => {
288                if let Some(metrics) = try_core_metrics() {
289                    metrics.observe_schema_event("validation_schema", "update");
290                }
291                self.creators = creators;
292                self.gov_version = gov_version;
293                self.sn = sn;
294                self.init_state = init_state;
295                self.current_roles = current_roles;
296
297                debug!(
298                    msg_type = "Update",
299                    sn = self.sn,
300                    gov_version = self.gov_version,
301                    "Schema updated successfully"
302                );
303            }
304        };
305        Ok(())
306    }
307
308    async fn on_child_fault(
309        &mut self,
310        error: ActorError,
311        ctx: &mut ActorContext<Self>,
312    ) -> ChildAction {
313        if let Some(metrics) = try_core_metrics() {
314            metrics.observe_schema_event("validation_schema", "child_fault");
315        }
316        error!(
317            governance_id = %self.governance_id,
318            schema_id = ?self.schema_id,
319            gov_version = self.gov_version,
320            error = %error,
321            "Child fault in validation schema"
322        );
323        emit_fail(ctx, error).await;
324        ChildAction::Stop
325    }
326}