Skip to main content

ave_core/evaluation/
schema.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
9    NotPersistentActor,
10};
11use ave_common::{
12    Namespace, SchemaType, ValueWrapper,
13    identity::{DigestIdentifier, HashAlgorithm, PublicKey, Signed},
14    request::EventRequest,
15};
16use ave_network::ComunicateInfo;
17use tracing::{Span, debug, error, info_span, warn};
18
19use crate::{
20    evaluation::worker::{EvalWorker, EvalWorkerMessage},
21    helpers::network::service::NetworkSender,
22    metrics::try_core_metrics,
23    model::common::emit_fail,
24};
25
26use super::request::{EvalWorkerContext, EvaluationReq};
27
28#[derive(Clone, Debug)]
29pub struct EvaluationSchema {
30    pub our_key: Arc<PublicKey>,
31    pub governance_id: DigestIdentifier,
32    pub gov_version: u64,
33    pub schema_id: SchemaType,
34    pub sn: u64,
35    pub members: BTreeSet<PublicKey>,
36    pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
37    pub issuers: BTreeMap<PublicKey, BTreeSet<Namespace>>,
38    pub issuer_any: bool,
39    pub schema_viewpoints: BTreeSet<String>,
40    pub init_state: ValueWrapper,
41    pub hash: HashAlgorithm,
42    pub network: Arc<NetworkSender>,
43}
44
45#[derive(Debug, Clone)]
46pub enum EvaluationSchemaMessage {
47    NetworkRequest {
48        evaluation_req: Box<Signed<EvaluationReq>>,
49        info: ComunicateInfo,
50        sender: PublicKey,
51    },
52    Update {
53        members: BTreeSet<PublicKey>,
54        creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
55        issuers: BTreeMap<PublicKey, BTreeSet<Namespace>>,
56        issuer_any: bool,
57        schema_viewpoints: BTreeSet<String>,
58        sn: u64,
59        gov_version: u64,
60        init_state: ValueWrapper,
61    },
62}
63
64impl Message for EvaluationSchemaMessage {}
65
66impl NotPersistentActor for EvaluationSchema {}
67
68impl EvaluationSchema {
69    fn context_for_request(
70        &self,
71        evaluation_req: &EvaluationReq,
72    ) -> EvalWorkerContext {
73        match evaluation_req.event_request.content() {
74            EventRequest::Fact(_) => EvalWorkerContext::TrackerFact {
75                issuers: self
76                    .issuers
77                    .iter()
78                    .filter(|(_, namespaces)| {
79                        namespaces.iter().any(|issuer_namespace| {
80                            issuer_namespace
81                                .is_ancestor_or_equal_of(&evaluation_req.namespace)
82                        })
83                    })
84                    .map(|(issuer, _)| issuer.clone())
85                    .collect(),
86                issuer_any: self.issuer_any,
87                schema_viewpoints: self.schema_viewpoints.clone(),
88            },
89            EventRequest::Transfer(_) => EvalWorkerContext::TrackerTransfer {
90                members: self.members.clone(),
91                creators: self.creators.clone(),
92            },
93            _ => EvalWorkerContext::Empty,
94        }
95    }
96}
97
98#[async_trait]
99impl Actor for EvaluationSchema {
100    type Event = ();
101    type Message = EvaluationSchemaMessage;
102    type Response = ();
103
104    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
105        parent_span.map_or_else(
106            || info_span!("EvaluationSchema", id),
107            |parent_span| info_span!(parent: parent_span, "EvaluationSchema", id),
108        )
109    }
110}
111
112#[async_trait]
113impl Handler<Self> for EvaluationSchema {
114    async fn handle_message(
115        &mut self,
116        _sender: ActorPath,
117        msg: EvaluationSchemaMessage,
118        ctx: &mut ActorContext<Self>,
119    ) -> Result<(), ActorError> {
120        match msg {
121            EvaluationSchemaMessage::NetworkRequest {
122                evaluation_req,
123                info,
124                sender,
125            } => {
126                let observe = |result: &'static str| {
127                    if let Some(metrics) = try_core_metrics() {
128                        metrics
129                            .observe_schema_event("evaluation_schema", result);
130                    }
131                };
132                if sender != evaluation_req.signature().signer {
133                    observe("rejected");
134                    warn!(
135                        msg_type = "NetworkRequest",
136                        sender = %sender,
137                        signer = %evaluation_req.signature().signer,
138                        "Signer and sender are not the same"
139                    );
140                    return Ok(());
141                }
142
143                if self.governance_id != evaluation_req.content().governance_id
144                {
145                    observe("rejected");
146                    warn!(
147                        msg_type = "NetworkRequest",
148                        expected_governance_id = %self.governance_id,
149                        received_governance_id = %evaluation_req.content().governance_id,
150                        "Invalid governance_id"
151                    );
152                    return Ok(());
153                }
154
155                if self.schema_id != evaluation_req.content().schema_id {
156                    observe("rejected");
157                    warn!(
158                        msg_type = "NetworkRequest",
159                        expected_schema_id = ?self.schema_id,
160                        received_schema_id = ?evaluation_req.content().schema_id,
161                        "Invalid schema_id"
162                    );
163                    return Ok(());
164                }
165
166                if let Some(ns) = self.creators.get(&sender) {
167                    if !ns.contains(&evaluation_req.content().namespace) {
168                        observe("rejected");
169                        warn!(
170                            msg_type = "NetworkRequest",
171                            sender = %sender,
172                            namespace = ?evaluation_req.content().namespace,
173                            "Invalid sender namespace"
174                        );
175                        return Ok(());
176                    }
177                } else {
178                    observe("rejected");
179                    warn!(
180                        msg_type = "NetworkRequest",
181                        sender = %sender,
182                        "Sender is not a creator"
183                    );
184                    return Ok(());
185                }
186
187                if self.gov_version < evaluation_req.content().gov_version {
188                    observe("rejected");
189                    warn!(
190                        msg_type = "NetworkRequest",
191                        local_gov_version = self.gov_version,
192                        request_gov_version = evaluation_req.content().gov_version,
193                        governance_id = %self.governance_id,
194                        sender = %sender,
195                        "Ignoring request with newer governance version; service nodes must update governance through resilience protocols"
196                    );
197                    return Ok(());
198                }
199
200                let child = ctx
201                    .create_child(
202                        &format!("{}", evaluation_req.signature().signer),
203                        EvalWorker {
204                            node_key: sender.clone(),
205                            our_key: self.our_key.clone(),
206                            init_state: Some(self.init_state.clone()),
207                            governance_id: self.governance_id.clone(),
208                            gov_version: self.gov_version,
209                            sn: self.sn,
210                            context: self
211                                .context_for_request(evaluation_req.content()),
212                            hash: self.hash,
213                            network: self.network.clone(),
214                            stop: true,
215                        },
216                    )
217                    .await;
218
219                let evaluator_actor = match child {
220                    Ok(child) => child,
221                    Err(e) => {
222                        if let ActorError::Exists { .. } = e {
223                            warn!(
224                                msg_type = "NetworkRequest",
225                                error = %e,
226                                "Evaluator actor already exists"
227                            );
228                            observe("rejected");
229                            return Ok(());
230                        } else {
231                            error!(
232                                msg_type = "NetworkRequest",
233                                error = %e,
234                                "Failed to create evaluator actor"
235                            );
236                            return Err(emit_fail(ctx, e).await);
237                        }
238                    }
239                };
240
241                if let Err(e) = evaluator_actor
242                    .tell(EvalWorkerMessage::NetworkRequest {
243                        evaluation_req: *evaluation_req,
244                        info,
245                        sender: sender.clone(),
246                    })
247                    .await
248                {
249                    warn!(
250                        msg_type = "NetworkRequest",
251                        error = %e,
252                        "Failed to send request to evaluator"
253                    );
254                } else {
255                    observe("delegated");
256                    debug!(
257                        msg_type = "NetworkRequest",
258                        sender = %sender,
259                        "Evaluation request delegated to worker"
260                    );
261                }
262            }
263            EvaluationSchemaMessage::Update {
264                members,
265                creators,
266                issuers,
267                issuer_any,
268                schema_viewpoints,
269                sn,
270                gov_version,
271                init_state,
272            } => {
273                if let Some(metrics) = try_core_metrics() {
274                    metrics.observe_schema_event("evaluation_schema", "update");
275                }
276                self.members = members;
277                self.creators = creators;
278                self.issuers = issuers;
279                self.issuer_any = issuer_any;
280                self.schema_viewpoints = schema_viewpoints;
281                self.gov_version = gov_version;
282                self.sn = sn;
283                self.init_state = init_state;
284
285                debug!(
286                    msg_type = "Update",
287                    sn = self.sn,
288                    gov_version = self.gov_version,
289                    "Schema updated successfully"
290                );
291            }
292        };
293        Ok(())
294    }
295
296    async fn on_child_fault(
297        &mut self,
298        error: ActorError,
299        ctx: &mut ActorContext<Self>,
300    ) -> ave_actors::ChildAction {
301        if let Some(metrics) = try_core_metrics() {
302            metrics.observe_schema_event("evaluation_schema", "child_fault");
303        }
304        error!(
305            governance_id = %self.governance_id,
306            schema_id = ?self.schema_id,
307            gov_version = self.gov_version,
308            sn = self.sn,
309            error = %error,
310            "Child fault in evaluation schema actor"
311        );
312        emit_fail(ctx, error).await;
313        ave_actors::ChildAction::Stop
314    }
315}