Skip to main content

ave_core/validation/
schema.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
9    NotPersistentActor,
10};
11use ave_common::{
12    Namespace, SchemaType, ValueWrapper,
13    identity::{DigestIdentifier, HashAlgorithm, PublicKey},
14};
15use ave_network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19    Signed,
20    governance::role_register::CurrentSchemaRoles,
21    helpers::network::service::NetworkSender,
22    metrics::try_core_metrics,
23    model::common::emit_fail,
24    validation::worker::{CurrentWorkerRoles, ValiWorker, ValiWorkerMessage},
25};
26
27use super::request::ValidationReq;
28
/// Actor state for validating requests of a single schema within one governance.
///
/// Incoming network requests are checked against these fields and, when accepted, most of
/// them are copied into the `ValiWorker` child that performs the validation.
#[derive(Clone, Debug)]
pub struct ValidationSchema {
    /// Key forwarded to every spawned worker as `our_key`
    /// (presumably this node's own public key — confirm against the caller).
    pub our_key: Arc<PublicKey>,
    /// Governance this actor serves; requests carrying a different governance id are rejected.
    pub governance_id: DigestIdentifier,
    /// Locally known governance version; requests announcing a newer version are ignored.
    pub gov_version: u64,
    /// Sequence number forwarded to spawned workers; replaced on `Update`.
    pub sn: u64,
    /// Schema this actor serves; requests carrying a different schema id are rejected.
    pub schema_id: SchemaType,
    /// Accepted senders, each mapped to the set of namespaces it may submit requests for.
    pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
    /// State cloned into each worker as `Some(init_state)`.
    pub init_state: ValueWrapper,
    /// Current schema roles; the evaluation roles and quorum are used to build worker roles.
    pub current_roles: CurrentSchemaRoles,
    /// Hash algorithm forwarded to spawned workers.
    pub hash: HashAlgorithm,
    /// Shared network sender handle forwarded to spawned workers.
    pub network: Arc<NetworkSender>,
}
42
/// Messages accepted by the [`ValidationSchema`] actor.
#[derive(Debug, Clone)]
pub enum ValidationSchemaMessage {
    /// A signed validation request received from the network.
    NetworkRequest {
        /// The signed request; its signer must equal `sender` or the request is dropped.
        validation_req: Box<Signed<ValidationReq>>,
        /// Communication metadata passed through unchanged to the worker.
        info: ComunicateInfo,
        /// Key of the node the request was received from.
        sender: PublicKey,
    },
    /// Replaces the mutable part of the actor state (the handler overwrites the
    /// same-named fields of [`ValidationSchema`] with these values).
    Update {
        /// New map of accepted senders and their allowed namespaces.
        creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
        /// New sequence number.
        sn: u64,
        /// New governance version.
        gov_version: u64,
        /// New initial state handed to future workers.
        init_state: ValueWrapper,
        /// New schema roles.
        current_roles: CurrentSchemaRoles,
    },
}
58
// Marker impl (no items overridden) so `ValidationSchemaMessage` can be used as
// the `Actor::Message` type of `ValidationSchema`.
impl Message for ValidationSchemaMessage {}
60
// Marker impl with no items overridden.
// NOTE(review): presumably declares that this actor keeps no persisted state —
// confirm against the `ave_actors::NotPersistentActor` documentation.
impl NotPersistentActor for ValidationSchema {}
62
63#[async_trait]
64impl Actor for ValidationSchema {
65    type Event = ();
66    type Message = ValidationSchemaMessage;
67    type Response = ();
68
69    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
70        parent_span.map_or_else(
71            || info_span!("ValidationSchema", id),
72            |parent_span| info_span!(parent: parent_span, "ValidationSchema", id),
73        )
74    }
75}
76
77#[async_trait]
78impl Handler<Self> for ValidationSchema {
79    async fn handle_message(
80        &mut self,
81        _sender: ActorPath,
82        msg: ValidationSchemaMessage,
83        ctx: &mut ActorContext<Self>,
84    ) -> Result<(), ActorError> {
85        match msg {
86            ValidationSchemaMessage::NetworkRequest {
87                validation_req,
88                info,
89                sender,
90            } => {
91                let observe = |result: &'static str| {
92                    if let Some(metrics) = try_core_metrics() {
93                        metrics
94                            .observe_schema_event("validation_schema", result);
95                    }
96                };
97                if sender != validation_req.signature().signer {
98                    observe("rejected");
99                    warn!(
100                        msg_type = "NetworkRequest",
101                        sender = %sender,
102                        signer = %validation_req.signature().signer,
103                        "Signer and sender are not the same"
104                    );
105                    return Ok(());
106                }
107
108                let governance_id =
109                    match validation_req.content().get_governance_id() {
110                        Ok(governance_id) => governance_id,
111                        Err(e) => {
112                            observe("rejected");
113                            warn!(
114                                msg_type = "NetworkRequest",
115                                error = %e,
116                                "Failed to get governance_id"
117                            );
118                            return Ok(());
119                        }
120                    };
121
122                if self.governance_id != governance_id {
123                    observe("rejected");
124                    warn!(
125                        msg_type = "NetworkRequest",
126                        expected_governance_id = %self.governance_id,
127                        received_governance_id = %governance_id,
128                        "Invalid governance_id"
129                    );
130                    return Ok(());
131                }
132
133                let schema_id = match validation_req.content().get_schema_id() {
134                    Ok(schema_id) => schema_id,
135                    Err(e) => {
136                        observe("rejected");
137                        warn!(
138                            msg_type = "NetworkRequest",
139                            error = %e,
140                            "Failed to get schema_id"
141                        );
142                        return Ok(());
143                    }
144                };
145
146                if self.schema_id != schema_id {
147                    observe("rejected");
148                    warn!(
149                        msg_type = "NetworkRequest",
150                        expected_schema_id = ?self.schema_id,
151                        received_schema_id = ?schema_id,
152                        "Invalid schema_id"
153                    );
154                    return Ok(());
155                }
156
157                if let Some(ns) = self.creators.get(&sender) {
158                    let namespace =
159                        match validation_req.content().get_namespace() {
160                            Ok(namespace) => namespace,
161                            Err(e) => {
162                                observe("rejected");
163                                warn!(
164                                    msg_type = "NetworkRequest",
165                                    error = %e,
166                                    "Failed to get namespace"
167                                );
168                                return Ok(());
169                            }
170                        };
171                    if !ns.contains(&namespace) {
172                        observe("rejected");
173                        warn!(
174                            msg_type = "NetworkRequest",
175                            sender = %sender,
176                            namespace = ?namespace,
177                            "Invalid sender namespace"
178                        );
179                        return Ok(());
180                    }
181                } else {
182                    observe("rejected");
183                    warn!(
184                        msg_type = "NetworkRequest",
185                        sender = %sender,
186                        "Sender is not a creator"
187                    );
188                    return Ok(());
189                }
190
191                if self.gov_version < validation_req.content().get_gov_version()
192                {
193                    observe("rejected");
194                    warn!(
195                        msg_type = "NetworkRequest",
196                        local_gov_version = self.gov_version,
197                        request_gov_version = validation_req.content().get_gov_version(),
198                        governance_id = %self.governance_id,
199                        sender = %sender,
200                        "Ignoring request with newer governance version; service nodes must update governance through resilience protocols"
201                    );
202                    return Ok(());
203                }
204
205                let child = ctx
206                    .create_child(
207                        &format!("{}", validation_req.signature().signer),
208                        ValiWorker {
209                            init_state: Some(self.init_state.clone()),
210                            node_key: sender.clone(),
211                            our_key: self.our_key.clone(),
212                            governance_id: self.governance_id.clone(),
213                            gov_version: self.gov_version,
214                            sn: self.sn,
215                            current_roles: CurrentWorkerRoles {
216                                evaluation: crate::governance::role_register::RoleDataRegister {
217                                    workers: self
218                                        .current_roles
219                                        .evaluation
220                                        .iter()
221                                        .filter(|role| role.namespace.is_ancestor_or_equal_of(&validation_req.content().get_namespace().unwrap_or_default()))
222                                        .map(|role| role.key.clone())
223                                        .collect(),
224                                    quorum: self.current_roles.evaluation_quorum.clone(),
225                                },
226                                approval: crate::governance::role_register::RoleDataRegister {
227                                    workers: std::collections::HashSet::new(),
228                                    quorum: crate::governance::model::Quorum::default(),
229                                },
230                            },
231                            hash: self.hash,
232                            network: self.network.clone(),
233                            stop: true,
234                        },
235                    )
236                    .await;
237
238                let validator_actor = match child {
239                    Ok(child) => child,
240                    Err(e) => {
241                        if let ActorError::Exists { .. } = e {
242                            observe("rejected");
243                            warn!(
244                                msg_type = "NetworkRequest",
245                                error = %e,
246                                "Validator actor already exists"
247                            );
248                            return Ok(());
249                        } else {
250                            error!(
251                                msg_type = "NetworkRequest",
252                                error = %e,
253                                "Failed to create validator actor"
254                            );
255                            return Err(emit_fail(ctx, e).await);
256                        }
257                    }
258                };
259
260                if let Err(e) = validator_actor
261                    .tell(ValiWorkerMessage::NetworkRequest {
262                        validation_req,
263                        info,
264                        sender: sender.clone(),
265                    })
266                    .await
267                {
268                    warn!(
269                        msg_type = "NetworkRequest",
270                        error = %e,
271                        "Failed to send request to validator"
272                    );
273                } else {
274                    observe("delegated");
275                    debug!(
276                        msg_type = "NetworkRequest",
277                        sender = %sender,
278                        "Validation request delegated to worker"
279                    );
280                }
281            }
282            ValidationSchemaMessage::Update {
283                creators,
284                sn,
285                gov_version,
286                init_state,
287                current_roles,
288            } => {
289                if let Some(metrics) = try_core_metrics() {
290                    metrics.observe_schema_event("validation_schema", "update");
291                }
292                self.creators = creators;
293                self.gov_version = gov_version;
294                self.sn = sn;
295                self.init_state = init_state;
296                self.current_roles = current_roles;
297
298                debug!(
299                    msg_type = "Update",
300                    sn = self.sn,
301                    gov_version = self.gov_version,
302                    "Schema updated successfully"
303                );
304            }
305        };
306        Ok(())
307    }
308
309    async fn on_child_fault(
310        &mut self,
311        error: ActorError,
312        ctx: &mut ActorContext<Self>,
313    ) -> ChildAction {
314        if let Some(metrics) = try_core_metrics() {
315            metrics.observe_schema_event("validation_schema", "child_fault");
316        }
317        error!(
318            governance_id = %self.governance_id,
319            schema_id = ?self.schema_id,
320            gov_version = self.gov_version,
321            error = %error,
322            "Child fault in validation schema"
323        );
324        emit_fail(ctx, error).await;
325        ChildAction::Stop
326    }
327}