// ave_core/validation/schema.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
9    NotPersistentActor,
10};
11use ave_common::{
12    Namespace, SchemaType, ValueWrapper,
13    identity::{DigestIdentifier, HashAlgorithm, PublicKey},
14};
15use network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19    Signed,
20    helpers::network::service::NetworkSender,
21    model::common::{emit_fail, node::try_to_update},
22    validation::worker::{ValiWorker, ValiWorkerMessage},
23};
24
25use super::request::ValidationReq;
26
27#[derive(Clone, Debug)]
28pub struct ValidationSchema {
29    pub our_key: Arc<PublicKey>,
30    pub governance_id: DigestIdentifier,
31    pub gov_version: u64,
32    pub sn: u64,
33    pub schema_id: SchemaType,
34    pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
35    pub init_state: ValueWrapper,
36    pub hash: HashAlgorithm,
37    pub network: Arc<NetworkSender>,
38}
39
40#[derive(Debug, Clone)]
41pub enum ValidationSchemaMessage {
42    NetworkRequest {
43        validation_req: Box<Signed<ValidationReq>>,
44        info: ComunicateInfo,
45        sender: PublicKey,
46    },
47    Update {
48        creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
49        sn: u64,
50        gov_version: u64,
51        init_state: ValueWrapper,
52    },
53}
54
55impl Message for ValidationSchemaMessage {}
56
57impl NotPersistentActor for ValidationSchema {}
58
59#[async_trait]
60impl Actor for ValidationSchema {
61    type Event = ();
62    type Message = ValidationSchemaMessage;
63    type Response = ();
64
65    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
66        parent_span.map_or_else(
67            || info_span!("ValidationSchema", id),
68            |parent_span| info_span!(parent: parent_span, "ValidationSchema", id),
69        )
70    }
71}
72
73#[async_trait]
74impl Handler<Self> for ValidationSchema {
75    async fn handle_message(
76        &mut self,
77        _sender: ActorPath,
78        msg: ValidationSchemaMessage,
79        ctx: &mut ActorContext<Self>,
80    ) -> Result<(), ActorError> {
81        match msg {
82            ValidationSchemaMessage::NetworkRequest {
83                validation_req,
84                info,
85                sender,
86            } => {
87                if sender != validation_req.signature().signer {
88                    warn!(
89                        msg_type = "NetworkRequest",
90                        sender = %sender,
91                        signer = %validation_req.signature().signer,
92                        "Signer and sender are not the same"
93                    );
94                    return Ok(());
95                }
96
97                let governance_id =
98                    match validation_req.content().get_governance_id() {
99                        Ok(governance_id) => governance_id,
100                        Err(e) => {
101                            warn!(
102                                msg_type = "NetworkRequest",
103                                error = %e,
104                                "Failed to get governance_id"
105                            );
106                            return Ok(());
107                        }
108                    };
109
110                if self.governance_id != governance_id {
111                    warn!(
112                        msg_type = "NetworkRequest",
113                        expected_governance_id = %self.governance_id,
114                        received_governance_id = %governance_id,
115                        "Invalid governance_id"
116                    );
117                    return Ok(());
118                }
119
120                let schema_id = match validation_req.content().get_schema_id() {
121                    Ok(schema_id) => schema_id,
122                    Err(e) => {
123                        warn!(
124                            msg_type = "NetworkRequest",
125                            error = %e,
126                            "Failed to get schema_id"
127                        );
128                        return Ok(());
129                    }
130                };
131
132                if self.schema_id != schema_id {
133                    warn!(
134                        msg_type = "NetworkRequest",
135                        expected_schema_id = ?self.schema_id,
136                        received_schema_id = ?schema_id,
137                        "Invalid schema_id"
138                    );
139                    return Ok(());
140                }
141
142                if let Some(ns) = self.creators.get(&sender) {
143                    let namespace =
144                        match validation_req.content().get_namespace() {
145                            Ok(namespace) => namespace,
146                            Err(e) => {
147                                warn!(
148                                    msg_type = "NetworkRequest",
149                                    error = %e,
150                                    "Failed to get namespace"
151                                );
152                                return Ok(());
153                            }
154                        };
155                    if !ns.contains(&namespace) {
156                        warn!(
157                            msg_type = "NetworkRequest",
158                            sender = %sender,
159                            namespace = ?namespace,
160                            "Invalid sender namespace"
161                        );
162                        return Ok(());
163                    }
164                } else {
165                    warn!(
166                        msg_type = "NetworkRequest",
167                        sender = %sender,
168                        "Sender is not a creator"
169                    );
170                    return Ok(());
171                }
172
173                if self.gov_version < validation_req.content().get_gov_version()
174                    && let Err(e) =
175                        try_to_update(ctx, self.governance_id.clone(), None)
176                            .await
177                {
178                    error!(
179                        msg_type = "NetworkRequest",
180                        error = %e,
181                        "Failed to update governance"
182                    );
183                    return Err(emit_fail(ctx, e).await);
184                }
185
186                let child = ctx
187                    .create_child(
188                        &format!("{}", validation_req.signature().signer),
189                        ValiWorker {
190                            init_state: Some(self.init_state.clone()),
191                            node_key: sender.clone(),
192                            our_key: self.our_key.clone(),
193                            governance_id: self.governance_id.clone(),
194                            gov_version: self.gov_version,
195                            sn: self.sn,
196                            hash: self.hash,
197                            network: self.network.clone(),
198                            stop: true,
199                        },
200                    )
201                    .await;
202
203                let validator_actor = match child {
204                    Ok(child) => child,
205                    Err(e) => {
206                        if let ActorError::Exists { .. } = e {
207                            warn!(
208                                msg_type = "NetworkRequest",
209                                error = %e,
210                                "Validator actor already exists"
211                            );
212                            return Ok(());
213                        } else {
214                            error!(
215                                msg_type = "NetworkRequest",
216                                error = %e,
217                                "Failed to create validator actor"
218                            );
219                            return Err(emit_fail(ctx, e).await);
220                        }
221                    }
222                };
223
224                if let Err(e) = validator_actor
225                    .tell(ValiWorkerMessage::NetworkRequest {
226                        validation_req,
227                        info,
228                        sender: sender.clone(),
229                    })
230                    .await
231                {
232                    warn!(
233                        msg_type = "NetworkRequest",
234                        error = %e,
235                        "Failed to send request to validator"
236                    );
237                } else {
238                    debug!(
239                        msg_type = "NetworkRequest",
240                        sender = %sender,
241                        "Validation request delegated to worker"
242                    );
243                }
244            }
245            ValidationSchemaMessage::Update {
246                creators,
247                sn,
248                gov_version,
249                init_state,
250            } => {
251                self.creators = creators;
252                self.gov_version = gov_version;
253                self.sn = sn;
254                self.init_state = init_state;
255
256                debug!(
257                    msg_type = "Update",
258                    sn = self.sn,
259                    gov_version = self.gov_version,
260                    "Schema updated successfully"
261                );
262            }
263        };
264        Ok(())
265    }
266
267    async fn on_child_fault(
268        &mut self,
269        error: ActorError,
270        ctx: &mut ActorContext<Self>,
271    ) -> ChildAction {
272        error!(
273            governance_id = %self.governance_id,
274            schema_id = ?self.schema_id,
275            gov_version = self.gov_version,
276            error = %error,
277            "Child fault in validation schema"
278        );
279        emit_fail(ctx, error).await;
280        ChildAction::Stop
281    }
282}