1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
9 NotPersistentActor,
10};
11use ave_common::{
12 Namespace, SchemaType, ValueWrapper,
13 identity::{DigestIdentifier, HashAlgorithm, PublicKey, Signed},
14};
15use network::ComunicateInfo;
16use tracing::{Span, debug, error, info_span, warn};
17
18use crate::{
19 evaluation::worker::{EvalWorker, EvalWorkerMessage},
20 helpers::network::service::NetworkSender,
21 model::common::{emit_fail, node::try_to_update},
22};
23
24use super::request::EvaluationReq;
25
26#[derive(Clone, Debug)]
27pub struct EvaluationSchema {
28 pub our_key: Arc<PublicKey>,
29 pub governance_id: DigestIdentifier,
30 pub gov_version: u64,
31 pub schema_id: SchemaType,
32 pub sn: u64,
33 pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
34 pub init_state: ValueWrapper,
35 pub hash: HashAlgorithm,
36 pub network: Arc<NetworkSender>,
37}
38
39#[derive(Debug, Clone)]
40pub enum EvaluationSchemaMessage {
41 NetworkRequest {
42 evaluation_req: Box<Signed<EvaluationReq>>,
43 info: ComunicateInfo,
44 sender: PublicKey,
45 },
46 Update {
47 creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
48 sn: u64,
49 gov_version: u64,
50 init_state: ValueWrapper,
51 },
52}
53
54impl Message for EvaluationSchemaMessage {}
55
56impl NotPersistentActor for EvaluationSchema {}
57
58#[async_trait]
59impl Actor for EvaluationSchema {
60 type Event = ();
61 type Message = EvaluationSchemaMessage;
62 type Response = ();
63
64 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
65 parent_span.map_or_else(
66 || info_span!("EvaluationSchema", id),
67 |parent_span| info_span!(parent: parent_span, "EvaluationSchema", id),
68 )
69 }
70}
71
72#[async_trait]
73impl Handler<Self> for EvaluationSchema {
74 async fn handle_message(
75 &mut self,
76 _sender: ActorPath,
77 msg: EvaluationSchemaMessage,
78 ctx: &mut ActorContext<Self>,
79 ) -> Result<(), ActorError> {
80 match msg {
81 EvaluationSchemaMessage::NetworkRequest {
82 evaluation_req,
83 info,
84 sender,
85 } => {
86 if sender != evaluation_req.signature().signer {
87 warn!(
88 msg_type = "NetworkRequest",
89 sender = %sender,
90 signer = %evaluation_req.signature().signer,
91 "Signer and sender are not the same"
92 );
93 return Ok(());
94 }
95
96 if self.governance_id != evaluation_req.content().governance_id
97 {
98 warn!(
99 msg_type = "NetworkRequest",
100 expected_governance_id = %self.governance_id,
101 received_governance_id = %evaluation_req.content().governance_id,
102 "Invalid governance_id"
103 );
104 return Ok(());
105 }
106
107 if self.schema_id != evaluation_req.content().schema_id {
108 warn!(
109 msg_type = "NetworkRequest",
110 expected_schema_id = ?self.schema_id,
111 received_schema_id = ?evaluation_req.content().schema_id,
112 "Invalid schema_id"
113 );
114 return Ok(());
115 }
116
117 if let Some(ns) = self.creators.get(&sender) {
118 if !ns.contains(&evaluation_req.content().namespace) {
119 warn!(
120 msg_type = "NetworkRequest",
121 sender = %sender,
122 namespace = ?evaluation_req.content().namespace,
123 "Invalid sender namespace"
124 );
125 return Ok(());
126 }
127 } else {
128 warn!(
129 msg_type = "NetworkRequest",
130 sender = %sender,
131 "Sender is not a creator"
132 );
133 return Ok(());
134 }
135
136 if self.gov_version < evaluation_req.content().gov_version
137 && let Err(e) =
138 try_to_update(ctx, self.governance_id.clone(), None)
139 .await
140 {
141 error!(
142 msg_type = "NetworkRequest",
143 error = %e,
144 "Failed to update governance"
145 );
146 return Err(emit_fail(ctx, e).await);
147 }
148
149 let child = ctx
150 .create_child(
151 &format!("{}", evaluation_req.signature().signer),
152 EvalWorker {
153 node_key: sender.clone(),
154 our_key: self.our_key.clone(),
155 init_state: Some(self.init_state.clone()),
156 governance_id: self.governance_id.clone(),
157 gov_version: self.gov_version,
158 sn: self.sn,
159 hash: self.hash,
160 network: self.network.clone(),
161 stop: true,
162 },
163 )
164 .await;
165
166 let evaluator_actor = match child {
167 Ok(child) => child,
168 Err(e) => {
169 if let ActorError::Exists { .. } = e {
170 warn!(
171 msg_type = "NetworkRequest",
172 error = %e,
173 "Evaluator actor already exists"
174 );
175 return Ok(());
176 } else {
177 error!(
178 msg_type = "NetworkRequest",
179 error = %e,
180 "Failed to create evaluator actor"
181 );
182 return Err(emit_fail(ctx, e).await);
183 }
184 }
185 };
186
187 if let Err(e) = evaluator_actor
188 .tell(EvalWorkerMessage::NetworkRequest {
189 evaluation_req: *evaluation_req,
190 info,
191 sender: sender.clone(),
192 })
193 .await
194 {
195 warn!(
196 msg_type = "NetworkRequest",
197 error = %e,
198 "Failed to send request to evaluator"
199 );
200 } else {
201 debug!(
202 msg_type = "NetworkRequest",
203 sender = %sender,
204 "Evaluation request delegated to worker"
205 );
206 }
207 }
208 EvaluationSchemaMessage::Update {
209 creators,
210 sn,
211 gov_version,
212 init_state,
213 } => {
214 self.creators = creators;
215 self.gov_version = gov_version;
216 self.sn = sn;
217 self.init_state = init_state;
218
219 debug!(
220 msg_type = "Update",
221 sn = self.sn,
222 gov_version = self.gov_version,
223 "Schema updated successfully"
224 );
225 }
226 };
227 Ok(())
228 }
229
230 async fn on_child_fault(
231 &mut self,
232 error: ActorError,
233 ctx: &mut ActorContext<Self>,
234 ) -> ave_actors::ChildAction {
235 error!(
236 governance_id = %self.governance_id,
237 schema_id = ?self.schema_id,
238 gov_version = self.gov_version,
239 sn = self.sn,
240 error = %error,
241 "Child fault in evaluation schema actor"
242 );
243 emit_fail(ctx, error).await;
244 ave_actors::ChildAction::Stop
245 }
246}