Skip to main content

ave_core/evaluation/
schema.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
9    NotPersistentActor,
10};
11use ave_common::{
12    Namespace, SchemaType, ValueWrapper,
13    identity::{DigestIdentifier, HashAlgorithm, PublicKey, Signed},
14};
15use network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19    evaluation::worker::{EvalWorker, EvalWorkerMessage},
20    helpers::network::service::NetworkSender,
21    metrics::try_core_metrics,
22    model::common::{emit_fail, node::try_to_update},
23};
24
25use super::request::EvaluationReq;
26
27#[derive(Clone, Debug)]
28pub struct EvaluationSchema {
29    pub our_key: Arc<PublicKey>,
30    pub governance_id: DigestIdentifier,
31    pub gov_version: u64,
32    pub schema_id: SchemaType,
33    pub sn: u64,
34    pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
35    pub init_state: ValueWrapper,
36    pub hash: HashAlgorithm,
37    pub network: Arc<NetworkSender>,
38}
39
40#[derive(Debug, Clone)]
41pub enum EvaluationSchemaMessage {
42    NetworkRequest {
43        evaluation_req: Box<Signed<EvaluationReq>>,
44        info: ComunicateInfo,
45        sender: PublicKey,
46    },
47    Update {
48        creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
49        sn: u64,
50        gov_version: u64,
51        init_state: ValueWrapper,
52    },
53}
54
55impl Message for EvaluationSchemaMessage {}
56
57impl NotPersistentActor for EvaluationSchema {}
58
59#[async_trait]
60impl Actor for EvaluationSchema {
61    type Event = ();
62    type Message = EvaluationSchemaMessage;
63    type Response = ();
64
65    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
66        parent_span.map_or_else(
67            || info_span!("EvaluationSchema", id),
68            |parent_span| info_span!(parent: parent_span, "EvaluationSchema", id),
69        )
70    }
71}
72
73#[async_trait]
74impl Handler<Self> for EvaluationSchema {
75    async fn handle_message(
76        &mut self,
77        _sender: ActorPath,
78        msg: EvaluationSchemaMessage,
79        ctx: &mut ActorContext<Self>,
80    ) -> Result<(), ActorError> {
81        match msg {
82            EvaluationSchemaMessage::NetworkRequest {
83                evaluation_req,
84                info,
85                sender,
86            } => {
87                let observe = |result: &'static str| {
88                    if let Some(metrics) = try_core_metrics() {
89                        metrics
90                            .observe_schema_event("evaluation_schema", result);
91                    }
92                };
93                if sender != evaluation_req.signature().signer {
94                    observe("rejected");
95                    warn!(
96                        msg_type = "NetworkRequest",
97                        sender = %sender,
98                        signer = %evaluation_req.signature().signer,
99                        "Signer and sender are not the same"
100                    );
101                    return Ok(());
102                }
103
104                if self.governance_id != evaluation_req.content().governance_id
105                {
106                    observe("rejected");
107                    warn!(
108                        msg_type = "NetworkRequest",
109                        expected_governance_id = %self.governance_id,
110                        received_governance_id = %evaluation_req.content().governance_id,
111                        "Invalid governance_id"
112                    );
113                    return Ok(());
114                }
115
116                if self.schema_id != evaluation_req.content().schema_id {
117                    observe("rejected");
118                    warn!(
119                        msg_type = "NetworkRequest",
120                        expected_schema_id = ?self.schema_id,
121                        received_schema_id = ?evaluation_req.content().schema_id,
122                        "Invalid schema_id"
123                    );
124                    return Ok(());
125                }
126
127                if let Some(ns) = self.creators.get(&sender) {
128                    if !ns.contains(&evaluation_req.content().namespace) {
129                        observe("rejected");
130                        warn!(
131                            msg_type = "NetworkRequest",
132                            sender = %sender,
133                            namespace = ?evaluation_req.content().namespace,
134                            "Invalid sender namespace"
135                        );
136                        return Ok(());
137                    }
138                } else {
139                    observe("rejected");
140                    warn!(
141                        msg_type = "NetworkRequest",
142                        sender = %sender,
143                        "Sender is not a creator"
144                    );
145                    return Ok(());
146                }
147
148                if self.gov_version < evaluation_req.content().gov_version
149                    && let Err(e) =
150                        try_to_update(ctx, self.governance_id.clone(), None)
151                            .await
152                {
153                    error!(
154                        msg_type = "NetworkRequest",
155                        error = %e,
156                        "Failed to update governance"
157                    );
158                    return Err(emit_fail(ctx, e).await);
159                }
160
161                let child = ctx
162                    .create_child(
163                        &format!("{}", evaluation_req.signature().signer),
164                        EvalWorker {
165                            node_key: sender.clone(),
166                            our_key: self.our_key.clone(),
167                            init_state: Some(self.init_state.clone()),
168                            governance_id: self.governance_id.clone(),
169                            gov_version: self.gov_version,
170                            sn: self.sn,
171                            hash: self.hash,
172                            network: self.network.clone(),
173                            stop: true,
174                        },
175                    )
176                    .await;
177
178                let evaluator_actor = match child {
179                    Ok(child) => child,
180                    Err(e) => {
181                        if let ActorError::Exists { .. } = e {
182                            warn!(
183                                msg_type = "NetworkRequest",
184                                error = %e,
185                                "Evaluator actor already exists"
186                            );
187                            observe("rejected");
188                            return Ok(());
189                        } else {
190                            error!(
191                                msg_type = "NetworkRequest",
192                                error = %e,
193                                "Failed to create evaluator actor"
194                            );
195                            return Err(emit_fail(ctx, e).await);
196                        }
197                    }
198                };
199
200                if let Err(e) = evaluator_actor
201                    .tell(EvalWorkerMessage::NetworkRequest {
202                        evaluation_req: *evaluation_req,
203                        info,
204                        sender: sender.clone(),
205                    })
206                    .await
207                {
208                    warn!(
209                        msg_type = "NetworkRequest",
210                        error = %e,
211                        "Failed to send request to evaluator"
212                    );
213                } else {
214                    observe("delegated");
215                    debug!(
216                        msg_type = "NetworkRequest",
217                        sender = %sender,
218                        "Evaluation request delegated to worker"
219                    );
220                }
221            }
222            EvaluationSchemaMessage::Update {
223                creators,
224                sn,
225                gov_version,
226                init_state,
227            } => {
228                if let Some(metrics) = try_core_metrics() {
229                    metrics.observe_schema_event("evaluation_schema", "update");
230                }
231                self.creators = creators;
232                self.gov_version = gov_version;
233                self.sn = sn;
234                self.init_state = init_state;
235
236                debug!(
237                    msg_type = "Update",
238                    sn = self.sn,
239                    gov_version = self.gov_version,
240                    "Schema updated successfully"
241                );
242            }
243        };
244        Ok(())
245    }
246
247    async fn on_child_fault(
248        &mut self,
249        error: ActorError,
250        ctx: &mut ActorContext<Self>,
251    ) -> ave_actors::ChildAction {
252        if let Some(metrics) = try_core_metrics() {
253            metrics.observe_schema_event("evaluation_schema", "child_fault");
254        }
255        error!(
256            governance_id = %self.governance_id,
257            schema_id = ?self.schema_id,
258            gov_version = self.gov_version,
259            sn = self.sn,
260            error = %error,
261            "Child fault in evaluation schema actor"
262        );
263        emit_fail(ctx, error).await;
264        ave_actors::ChildAction::Stop
265    }
266}