Skip to main content

ave_core/evaluation/
schema.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
9    NotPersistentActor,
10};
11use ave_common::{
12    Namespace, SchemaType, ValueWrapper,
13    identity::{DigestIdentifier, HashAlgorithm, PublicKey, Signed},
14};
15use network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19    evaluation::worker::{EvalWorker, EvalWorkerMessage},
20    helpers::network::service::NetworkSender,
21    model::common::{emit_fail, node::try_to_update},
22};
23
24use super::request::EvaluationReq;
25
/// Actor state for one schema of a governance: it validates incoming
/// evaluation requests and spawns an `EvalWorker` child for each accepted one.
#[derive(Clone, Debug)]
pub struct EvaluationSchema {
    /// Shared key copied into every spawned worker's `our_key` field.
    /// NOTE(review): presumably this node's own public key — confirm.
    pub our_key: Arc<PublicKey>,
    /// Governance this actor serves; requests carrying a different
    /// `governance_id` are logged and dropped.
    pub governance_id: DigestIdentifier,
    /// Governance version currently known to this actor. When a request
    /// carries a higher version, a governance update is attempted before the
    /// worker is spawned. Replaced by `EvaluationSchemaMessage::Update`.
    pub gov_version: u64,
    /// Schema this actor serves; requests carrying a different `schema_id`
    /// are logged and dropped.
    pub schema_id: SchemaType,
    /// Value copied into every spawned worker's `sn` field and replaced by
    /// `EvaluationSchemaMessage::Update`.
    /// NOTE(review): presumably a sequence number — confirm.
    pub sn: u64,
    /// Allowed senders: each public key maps to the set of namespaces it may
    /// send requests for. A request is rejected when the sender is not a key
    /// of this map or the request namespace is not in the sender's set.
    pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
    /// State cloned into every spawned worker (wrapped in `Some`). Replaced by
    /// `EvaluationSchemaMessage::Update`.
    pub init_state: ValueWrapper,
    /// Hash algorithm copied into every spawned worker.
    pub hash: HashAlgorithm,
    /// Shared network sender handed to every spawned worker.
    pub network: Arc<NetworkSender>,
}
38
/// Messages accepted by the [`EvaluationSchema`] actor.
#[derive(Debug, Clone)]
pub enum EvaluationSchemaMessage {
    /// An evaluation request received from the network. It is validated and,
    /// if accepted, forwarded to a freshly created `EvalWorker` child.
    NetworkRequest {
        /// The signed request; the signer must equal `sender`.
        evaluation_req: Box<Signed<EvaluationReq>>,
        /// Communication metadata, forwarded unchanged to the worker.
        info: ComunicateInfo,
        /// Key of the node the request came from; it must be a registered
        /// creator for the request namespace.
        sender: PublicKey,
    },
    /// Replaces the cached governance-derived data held by the actor.
    Update {
        /// New creator-to-namespaces map.
        creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
        /// New `sn` value.
        sn: u64,
        /// New governance version.
        gov_version: u64,
        /// New state passed to workers spawned from now on.
        init_state: ValueWrapper,
    },
}
53
// Empty impl of the `ave_actors::Message` trait so that
// `EvaluationSchemaMessage` can serve as the actor's `Actor::Message` type.
impl Message for EvaluationSchemaMessage {}
55
// Empty impl of the `ave_actors::NotPersistentActor` trait.
// NOTE(review): presumably this opts the actor out of state persistence —
// confirm against the `ave_actors` documentation.
impl NotPersistentActor for EvaluationSchema {}
57
58#[async_trait]
59impl Actor for EvaluationSchema {
60    type Event = ();
61    type Message = EvaluationSchemaMessage;
62    type Response = ();
63
64    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
65        parent_span.map_or_else(
66            || info_span!("EvaluationSchema", id),
67            |parent_span| info_span!(parent: parent_span, "EvaluationSchema", id),
68        )
69    }
70}
71
72#[async_trait]
73impl Handler<Self> for EvaluationSchema {
74    async fn handle_message(
75        &mut self,
76        _sender: ActorPath,
77        msg: EvaluationSchemaMessage,
78        ctx: &mut ActorContext<Self>,
79    ) -> Result<(), ActorError> {
80        match msg {
81            EvaluationSchemaMessage::NetworkRequest {
82                evaluation_req,
83                info,
84                sender,
85            } => {
86                if sender != evaluation_req.signature().signer {
87                    warn!(
88                        msg_type = "NetworkRequest",
89                        sender = %sender,
90                        signer = %evaluation_req.signature().signer,
91                        "Signer and sender are not the same"
92                    );
93                    return Ok(());
94                }
95
96                if self.governance_id != evaluation_req.content().governance_id
97                {
98                    warn!(
99                        msg_type = "NetworkRequest",
100                        expected_governance_id = %self.governance_id,
101                        received_governance_id = %evaluation_req.content().governance_id,
102                        "Invalid governance_id"
103                    );
104                    return Ok(());
105                }
106
107                if self.schema_id != evaluation_req.content().schema_id {
108                    warn!(
109                        msg_type = "NetworkRequest",
110                        expected_schema_id = ?self.schema_id,
111                        received_schema_id = ?evaluation_req.content().schema_id,
112                        "Invalid schema_id"
113                    );
114                    return Ok(());
115                }
116
117                if let Some(ns) = self.creators.get(&sender) {
118                    if !ns.contains(&evaluation_req.content().namespace) {
119                        warn!(
120                            msg_type = "NetworkRequest",
121                            sender = %sender,
122                            namespace = ?evaluation_req.content().namespace,
123                            "Invalid sender namespace"
124                        );
125                        return Ok(());
126                    }
127                } else {
128                    warn!(
129                        msg_type = "NetworkRequest",
130                        sender = %sender,
131                        "Sender is not a creator"
132                    );
133                    return Ok(());
134                }
135
136                if self.gov_version < evaluation_req.content().gov_version
137                    && let Err(e) =
138                        try_to_update(ctx, self.governance_id.clone(), None)
139                            .await
140                {
141                    error!(
142                        msg_type = "NetworkRequest",
143                        error = %e,
144                        "Failed to update governance"
145                    );
146                    return Err(emit_fail(ctx, e).await);
147                }
148
149                let child = ctx
150                    .create_child(
151                        &format!("{}", evaluation_req.signature().signer),
152                        EvalWorker {
153                            node_key: sender.clone(),
154                            our_key: self.our_key.clone(),
155                            init_state: Some(self.init_state.clone()),
156                            governance_id: self.governance_id.clone(),
157                            gov_version: self.gov_version,
158                            sn: self.sn,
159                            hash: self.hash,
160                            network: self.network.clone(),
161                            stop: true,
162                        },
163                    )
164                    .await;
165
166                let evaluator_actor = match child {
167                    Ok(child) => child,
168                    Err(e) => {
169                        if let ActorError::Exists { .. } = e {
170                            warn!(
171                                msg_type = "NetworkRequest",
172                                error = %e,
173                                "Evaluator actor already exists"
174                            );
175                            return Ok(());
176                        } else {
177                            error!(
178                                msg_type = "NetworkRequest",
179                                error = %e,
180                                "Failed to create evaluator actor"
181                            );
182                            return Err(emit_fail(ctx, e).await);
183                        }
184                    }
185                };
186
187                if let Err(e) = evaluator_actor
188                    .tell(EvalWorkerMessage::NetworkRequest {
189                        evaluation_req: *evaluation_req,
190                        info,
191                        sender: sender.clone(),
192                    })
193                    .await
194                {
195                    warn!(
196                        msg_type = "NetworkRequest",
197                        error = %e,
198                        "Failed to send request to evaluator"
199                    );
200                } else {
201                    debug!(
202                        msg_type = "NetworkRequest",
203                        sender = %sender,
204                        "Evaluation request delegated to worker"
205                    );
206                }
207            }
208            EvaluationSchemaMessage::Update {
209                creators,
210                sn,
211                gov_version,
212                init_state,
213            } => {
214                self.creators = creators;
215                self.gov_version = gov_version;
216                self.sn = sn;
217                self.init_state = init_state;
218
219                debug!(
220                    msg_type = "Update",
221                    sn = self.sn,
222                    gov_version = self.gov_version,
223                    "Schema updated successfully"
224                );
225            }
226        };
227        Ok(())
228    }
229
230    async fn on_child_fault(
231        &mut self,
232        error: ActorError,
233        ctx: &mut ActorContext<Self>,
234    ) -> ave_actors::ChildAction {
235        error!(
236            governance_id = %self.governance_id,
237            schema_id = ?self.schema_id,
238            gov_version = self.gov_version,
239            sn = self.sn,
240            error = %error,
241            "Child fault in evaluation schema actor"
242        );
243        emit_fail(ctx, error).await;
244        ave_actors::ChildAction::Stop
245    }
246}