Skip to main content

ave_core/evaluation/
schema.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
9    NotPersistentActor,
10};
11use ave_common::{
12    Namespace, SchemaType, ValueWrapper,
13    identity::{DigestIdentifier, HashAlgorithm, PublicKey, Signed},
14    request::EventRequest,
15};
16use ave_network::ComunicateInfo;
17use tracing::{Span, debug, error, info_span, warn};
18
19use crate::{
20    evaluation::worker::{EvalWorker, EvalWorkerMessage},
21    helpers::network::service::NetworkSender,
22    metrics::try_core_metrics,
23    model::common::emit_fail,
24};
25
26use super::request::{EvalWorkerContext, EvaluationReq};
27
28#[derive(Clone, Debug)]
29pub struct EvaluationSchema {
30    pub our_key: Arc<PublicKey>,
31    pub governance_id: DigestIdentifier,
32    pub gov_version: u64,
33    pub schema_id: SchemaType,
34    pub sn: u64,
35    pub members: BTreeSet<PublicKey>,
36    pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
37    pub issuers: BTreeMap<PublicKey, BTreeSet<Namespace>>,
38    pub issuer_any: bool,
39    pub schema_viewpoints: BTreeSet<String>,
40    pub init_state: ValueWrapper,
41    pub hash: HashAlgorithm,
42    pub network: Arc<NetworkSender>,
43}
44
45#[derive(Debug, Clone)]
46pub enum EvaluationSchemaMessage {
47    NetworkRequest {
48        evaluation_req: Box<Signed<EvaluationReq>>,
49        info: ComunicateInfo,
50        sender: PublicKey,
51    },
52    Update {
53        members: BTreeSet<PublicKey>,
54        creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
55        issuers: BTreeMap<PublicKey, BTreeSet<Namespace>>,
56        issuer_any: bool,
57        schema_viewpoints: BTreeSet<String>,
58        sn: u64,
59        gov_version: u64,
60        init_state: ValueWrapper,
61    },
62}
63
64impl Message for EvaluationSchemaMessage {}
65
66impl NotPersistentActor for EvaluationSchema {}
67
68impl EvaluationSchema {
69    fn context_for_request(
70        &self,
71        evaluation_req: &EvaluationReq,
72    ) -> EvalWorkerContext {
73        match evaluation_req.event_request.content() {
74            EventRequest::Fact(_) => EvalWorkerContext::TrackerFact {
75                issuers: self
76                    .issuers
77                    .iter()
78                    .filter(|(_, namespaces)| {
79                        namespaces.iter().any(|issuer_namespace| {
80                            issuer_namespace.is_ancestor_or_equal_of(
81                                &evaluation_req.namespace,
82                            )
83                        })
84                    })
85                    .map(|(issuer, _)| issuer.clone())
86                    .collect(),
87                issuer_any: self.issuer_any,
88                schema_viewpoints: self.schema_viewpoints.clone(),
89            },
90            EventRequest::Transfer(_) => EvalWorkerContext::TrackerTransfer {
91                members: self.members.clone(),
92                creators: self.creators.clone(),
93            },
94            _ => EvalWorkerContext::Empty,
95        }
96    }
97}
98
99#[async_trait]
100impl Actor for EvaluationSchema {
101    type Event = ();
102    type Message = EvaluationSchemaMessage;
103    type Response = ();
104
105    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
106        parent_span.map_or_else(
107            || info_span!("EvaluationSchema", id),
108            |parent_span| info_span!(parent: parent_span, "EvaluationSchema", id),
109        )
110    }
111}
112
113#[async_trait]
114impl Handler<Self> for EvaluationSchema {
115    async fn handle_message(
116        &mut self,
117        _sender: ActorPath,
118        msg: EvaluationSchemaMessage,
119        ctx: &mut ActorContext<Self>,
120    ) -> Result<(), ActorError> {
121        match msg {
122            EvaluationSchemaMessage::NetworkRequest {
123                evaluation_req,
124                info,
125                sender,
126            } => {
127                let observe = |result: &'static str| {
128                    if let Some(metrics) = try_core_metrics() {
129                        metrics
130                            .observe_schema_event("evaluation_schema", result);
131                    }
132                };
133                if sender != evaluation_req.signature().signer {
134                    observe("rejected");
135                    warn!(
136                        msg_type = "NetworkRequest",
137                        sender = %sender,
138                        signer = %evaluation_req.signature().signer,
139                        "Signer and sender are not the same"
140                    );
141                    return Ok(());
142                }
143
144                if self.governance_id != evaluation_req.content().governance_id
145                {
146                    observe("rejected");
147                    warn!(
148                        msg_type = "NetworkRequest",
149                        expected_governance_id = %self.governance_id,
150                        received_governance_id = %evaluation_req.content().governance_id,
151                        "Invalid governance_id"
152                    );
153                    return Ok(());
154                }
155
156                if self.schema_id != evaluation_req.content().schema_id {
157                    observe("rejected");
158                    warn!(
159                        msg_type = "NetworkRequest",
160                        expected_schema_id = ?self.schema_id,
161                        received_schema_id = ?evaluation_req.content().schema_id,
162                        "Invalid schema_id"
163                    );
164                    return Ok(());
165                }
166
167                if let Some(ns) = self.creators.get(&sender) {
168                    if !ns.contains(&evaluation_req.content().namespace) {
169                        observe("rejected");
170                        warn!(
171                            msg_type = "NetworkRequest",
172                            sender = %sender,
173                            namespace = ?evaluation_req.content().namespace,
174                            "Invalid sender namespace"
175                        );
176                        return Ok(());
177                    }
178                } else {
179                    observe("rejected");
180                    warn!(
181                        msg_type = "NetworkRequest",
182                        sender = %sender,
183                        "Sender is not a creator"
184                    );
185                    return Ok(());
186                }
187
188                if self.gov_version < evaluation_req.content().gov_version {
189                    observe("rejected");
190                    warn!(
191                        msg_type = "NetworkRequest",
192                        local_gov_version = self.gov_version,
193                        request_gov_version = evaluation_req.content().gov_version,
194                        governance_id = %self.governance_id,
195                        sender = %sender,
196                        "Ignoring request with newer governance version; service nodes must update governance through resilience protocols"
197                    );
198                    return Ok(());
199                }
200
201                let child = ctx
202                    .create_child(
203                        &format!("{}", evaluation_req.signature().signer),
204                        EvalWorker {
205                            node_key: sender.clone(),
206                            our_key: self.our_key.clone(),
207                            init_state: Some(self.init_state.clone()),
208                            governance_id: self.governance_id.clone(),
209                            gov_version: self.gov_version,
210                            sn: self.sn,
211                            context: self
212                                .context_for_request(evaluation_req.content()),
213                            hash: self.hash,
214                            network: self.network.clone(),
215                            stop: true,
216                        },
217                    )
218                    .await;
219
220                let evaluator_actor = match child {
221                    Ok(child) => child,
222                    Err(e) => {
223                        if let ActorError::Exists { .. } = e {
224                            warn!(
225                                msg_type = "NetworkRequest",
226                                error = %e,
227                                "Evaluator actor already exists"
228                            );
229                            observe("rejected");
230                            return Ok(());
231                        } else {
232                            error!(
233                                msg_type = "NetworkRequest",
234                                error = %e,
235                                "Failed to create evaluator actor"
236                            );
237                            return Err(emit_fail(ctx, e).await);
238                        }
239                    }
240                };
241
242                if let Err(e) = evaluator_actor
243                    .tell(EvalWorkerMessage::NetworkRequest {
244                        evaluation_req: *evaluation_req,
245                        info,
246                        sender: sender.clone(),
247                    })
248                    .await
249                {
250                    warn!(
251                        msg_type = "NetworkRequest",
252                        error = %e,
253                        "Failed to send request to evaluator"
254                    );
255                } else {
256                    observe("delegated");
257                    debug!(
258                        msg_type = "NetworkRequest",
259                        sender = %sender,
260                        "Evaluation request delegated to worker"
261                    );
262                }
263            }
264            EvaluationSchemaMessage::Update {
265                members,
266                creators,
267                issuers,
268                issuer_any,
269                schema_viewpoints,
270                sn,
271                gov_version,
272                init_state,
273            } => {
274                if let Some(metrics) = try_core_metrics() {
275                    metrics.observe_schema_event("evaluation_schema", "update");
276                }
277                self.members = members;
278                self.creators = creators;
279                self.issuers = issuers;
280                self.issuer_any = issuer_any;
281                self.schema_viewpoints = schema_viewpoints;
282                self.gov_version = gov_version;
283                self.sn = sn;
284                self.init_state = init_state;
285
286                debug!(
287                    msg_type = "Update",
288                    sn = self.sn,
289                    gov_version = self.gov_version,
290                    "Schema updated successfully"
291                );
292            }
293        };
294        Ok(())
295    }
296
297    async fn on_child_fault(
298        &mut self,
299        error: ActorError,
300        ctx: &mut ActorContext<Self>,
301    ) -> ave_actors::ChildAction {
302        if let Some(metrics) = try_core_metrics() {
303            metrics.observe_schema_event("evaluation_schema", "child_fault");
304        }
305        error!(
306            governance_id = %self.governance_id,
307            schema_id = ?self.schema_id,
308            gov_version = self.gov_version,
309            sn = self.sn,
310            error = %error,
311            "Child fault in evaluation schema actor"
312        );
313        emit_fail(ctx, error).await;
314        ave_actors::ChildAction::Stop
315    }
316}