1use std::collections::HashSet;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use ave_actors::ActorPath;
6use ave_actors::{
7 Actor, ActorContext, ActorError, ChildAction, Handler, Message,
8 NotPersistentActor,
9};
10
11use ave_common::identity::{
12 CryptoError, DigestIdentifier, HashAlgorithm, PublicKey, Signature, Signed,
13 hash_borsh,
14};
15
16use request::ApprovalReq;
17use response::ApprovalRes;
18
19use tracing::{Span, debug, error, info_span, warn};
20
21use crate::approval::light::{ApprLight, ApprLightMessage};
22use crate::approval::persist::{ApprPersist, ApprPersistMessage};
23use crate::governance::model::Quorum;
24use crate::helpers::network::service::NetworkSender;
25use crate::metrics::try_core_metrics;
26use crate::model::common::{abort_req, emit_fail};
27
28use crate::model::event::ApprovalData;
29use crate::model::network::TimeOut;
30
31use crate::request::manager::{RequestManager, RequestManagerMessage};
32
33pub mod light;
34pub mod persist;
35pub mod request;
36pub mod response;
37pub mod types;
38
/// Actor state for one approval round: it contacts every approver, collects
/// their answers and reports the outcome to the parent `RequestManager`.
#[derive(Clone, Debug)]
pub struct Approval {
    /// Algorithm whose hasher is used to digest the approval request.
    hash: HashAlgorithm,
    /// Network handle handed to every `ApprLight` child that is created.
    network: Arc<NetworkSender>,
    /// Key compared against each approver to tell the local approver apart
    /// from remote ones.
    our_key: Arc<PublicKey>,
    /// Rule evaluated after each counted response to decide acceptance.
    quorum: Quorum,
    /// Identifier received in `ApprovalMessage::Create`; default until then.
    request_id: DigestIdentifier,
    /// Version received in `ApprovalMessage::Create`; `0` until then.
    version: u64,
    /// Signed approval request forwarded to every approver.
    request: Signed<ApprovalReq>,
    /// Approvers that have not answered yet; a key is removed as soon as a
    /// response from it is received.
    approvers: HashSet<PublicKey>,
    /// Timeout records received from approvers.
    approvers_timeout: Vec<TimeOut>,
    /// Signatures of approvers that agreed.
    approvers_agrees: Vec<Signature>,
    /// Signatures of approvers that disagreed.
    approvers_disagrees: Vec<Signature>,
    /// Digest of the request content, computed when handling `Create` and
    /// compared with the hash carried by each response.
    approval_req_hash: DigestIdentifier,
    /// Size of the approver set at construction time; unlike `approvers`
    /// it is not modified as responses arrive.
    approvers_quantity: u32,
}
55
56impl Approval {
57 fn observe_event(result: &'static str) {
58 if let Some(metrics) = try_core_metrics() {
59 metrics.observe_protocol_event("approval", result);
60 }
61 }
62
63 pub fn new(
64 our_key: Arc<PublicKey>,
65 request: Signed<ApprovalReq>,
66 quorum: Quorum,
67 approvers: HashSet<PublicKey>,
68 hash: HashAlgorithm,
69 network: Arc<NetworkSender>,
70 ) -> Self {
71 Self {
72 hash,
73 network,
74 our_key,
75 quorum,
76 request,
77 approvers_quantity: approvers.len() as u32,
78 approvers,
79 request_id: DigestIdentifier::default(),
80 version: 0,
81 approvers_timeout: vec![],
82 approvers_agrees: vec![],
83 approvers_disagrees: vec![],
84 approval_req_hash: DigestIdentifier::default(),
85 }
86 }
87
88 async fn create_approvers(
89 &self,
90 ctx: &mut ActorContext<Self>,
91 signer: PublicKey,
92 ) -> Result<(), ActorError> {
93 let subject_id = self.request.content().subject_id.to_string();
94
95 if signer == *self.our_key {
96 let approver_path = ActorPath::from(format!(
97 "/user/node/subject_manager/{}/approver",
98 subject_id
99 ));
100 let approver_actor = ctx
101 .system()
102 .get_actor::<ApprPersist>(&approver_path)
103 .await?;
104 approver_actor
105 .tell(ApprPersistMessage::LocalApproval {
106 request_id: self.request_id.clone(),
107 version: self.version,
108 approval_req: self.request.clone(),
109 })
110 .await?
111 } else {
112 let child = ctx
114 .create_child(
115 &signer.to_string(),
116 ApprLight::new(
117 self.network.clone(),
118 signer.clone(),
119 self.request_id.clone(),
120 self.version,
121 ),
122 )
123 .await?;
124
125 child
126 .tell(ApprLightMessage::NetworkApproval {
127 approval_req: self.request.clone(),
128 })
129 .await?;
130 }
131
132 Ok(())
133 }
134 fn check_approval(&mut self, approver: PublicKey) -> bool {
135 self.approvers.remove(&approver)
136 }
137
138 async fn send_approval_to_req(
139 &self,
140 ctx: &ActorContext<Self>,
141 response: bool,
142 ) -> Result<(), ActorError> {
143 let req_actor = ctx.get_parent::<RequestManager>().await?;
144
145 req_actor
146 .tell(RequestManagerMessage::ApprovalRes {
147 request_id: self.request_id.clone(),
148 appro_res: ApprovalData {
149 approval_req_signature: self.request.signature().clone(),
150 approval_req_hash: self.approval_req_hash.clone(),
151 approvers_agrees_signatures: self.approvers_agrees.clone(),
152 approvers_disagrees_signatures: self
153 .approvers_disagrees
154 .clone(),
155 approvers_timeout: self.approvers_timeout.clone(),
156 approved: response,
157 },
158 })
159 .await
160 }
161
162 fn create_appro_req_hash(&self) -> Result<DigestIdentifier, CryptoError> {
163 hash_borsh(&*self.hash.hasher(), self.request.content())
164 }
165}
166
/// Messages handled by the `Approval` actor.
#[derive(Debug, Clone)]
pub enum ApprovalMessage {
    /// Starts the round: the actor stores `request_id` and `version`, hashes
    /// the approval request and contacts every approver.
    Create {
        request_id: DigestIdentifier,
        version: u64,
    },
    /// Answer attributed to the approver `sender`.
    ///
    /// `signature` must be present when `approval_res` is an
    /// `ApprovalRes::Response`; the handler returns an error otherwise.
    Response {
        approval_res: ApprovalRes,
        sender: PublicKey,
        signature: Option<Signature>,
    },
}

// Empty impl: `ApprovalMessage` relies on whatever defaults `Message` provides.
impl Message for ApprovalMessage {}
181
182#[async_trait]
183impl Actor for Approval {
184 type Event = ();
185 type Message = ApprovalMessage;
186 type Response = ();
187
188 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
189 parent_span.map_or_else(
190 || info_span!("Approval"),
191 |parent_span| info_span!(parent: parent_span, "Approval"),
192 )
193 }
194}
195
196#[async_trait]
197impl Handler<Self> for Approval {
198 async fn handle_message(
199 &mut self,
200 __sender: ActorPath,
201 msg: ApprovalMessage,
202 ctx: &mut ActorContext<Self>,
203 ) -> Result<(), ActorError> {
204 match msg {
205 ApprovalMessage::Create {
206 request_id,
207 version,
208 } => {
209 let approval_req_hash = match self.create_appro_req_hash() {
210 Ok(digest) => digest,
211 Err(e) => {
212 error!(
213 msg_type = "Create",
214 error = %e,
215 "Failed to create approval request hash"
216 );
217 return Err(emit_fail(
218 ctx,
219 ActorError::FunctionalCritical {
220 description: format!(
221 "Cannot create approval request hash: {}",
222 e
223 ),
224 },
225 )
226 .await);
227 }
228 };
229
230 self.approval_req_hash = approval_req_hash;
231 self.request_id = request_id.clone();
232 self.version = version;
233
234 for signer in self.approvers.clone() {
235 if let Err(e) =
236 self.create_approvers(ctx, signer.clone()).await
237 {
238 error!(
239 msg_type = "Create",
240 error = %e,
241 signer = %signer,
242 "Failed to create approver actor"
243 );
244 return Err(emit_fail(ctx, e).await);
245 }
246 }
247
248 debug!(
249 msg_type = "Create",
250 request_id = %request_id,
251 version = version,
252 approvers_count = self.approvers_quantity,
253 "Approval created and approvers initialized"
254 );
255 }
256 ApprovalMessage::Response {
257 approval_res,
258 sender,
259 signature,
260 } => {
261 if self.check_approval(sender.clone()) {
262 match approval_res.clone() {
263 ApprovalRes::Response {
264 approval_req_hash,
265 agrees,
266 ..
267 } => {
268 if approval_req_hash != self.approval_req_hash {
269 error!(
270 msg_type = "Response",
271 expected_hash = %self.approval_req_hash,
272 received_hash = %approval_req_hash,
273 "Invalid approval request hash"
274 );
275 return Err(ActorError::Functional {
276 description: "Approval Response, Invalid approval request hash".to_owned(),
277 });
278 }
279
280 let Some(signature) = signature else {
281 error!(
282 msg_type = "Response",
283 sender = %sender,
284 "Approval response without signature"
285 );
286 return Err(ActorError::Functional {
287 description: "Approval Response solver without signature".to_owned(),
288 });
289 };
290
291 if agrees {
292 self.approvers_agrees.push(signature);
293 } else {
294 self.approvers_disagrees.push(signature);
295 }
296 }
297 ApprovalRes::Abort(error) => {
298 Self::observe_event("abort");
299 if let Err(e) = abort_req(
300 ctx,
301 self.request_id.clone(),
302 sender.clone(),
303 error.clone(),
304 self.request.content().sn,
305 )
306 .await
307 {
308 error!(
309 msg_type = "Response",
310 request_id = %self.request_id,
311 sender = %sender,
312 abort_reason = %error,
313 error = %e,
314 "Failed to abort request"
315 );
316 return Err(emit_fail(ctx, e).await);
317 }
318
319 debug!(
320 msg_type = "Response",
321 request_id = %self.request_id,
322 sender = %sender,
323 abort_reason = %error,
324 "Approval aborted"
325 );
326
327 return Ok(());
328 }
329 ApprovalRes::TimeOut(approval_time_out) => {
330 Self::observe_event("timeout");
331 self.approvers_timeout.push(approval_time_out);
332 }
333 };
334
335 if self.quorum.check_quorum(
337 self.approvers_quantity,
338 self.approvers_agrees.len() as u32
339 + self.approvers_timeout.len() as u32,
340 ) {
341 Self::observe_event("approved");
342 if let Err(e) =
343 self.send_approval_to_req(ctx, true).await
344 {
345 error!(
346 msg_type = "Response",
347 error = %e,
348 "Failed to send approval response to request actor"
349 );
350 return Err(emit_fail(ctx, e).await);
351 };
352
353 debug!(
354 msg_type = "Response",
355 agrees = self.approvers_agrees.len(),
356 disagrees = self.approvers_disagrees.len(),
357 timeouts = self.approvers_timeout.len(),
358 "Quorum reached, approval accepted"
359 );
360 } else if self.approvers.is_empty()
361 && let Err(e) =
362 self.send_approval_to_req(ctx, false).await
363 {
364 error!(
365 msg_type = "Response",
366 error = %e,
367 "Failed to send approval response to request actor"
368 );
369 return Err(emit_fail(ctx, e).await);
370 } else if self.approvers.is_empty() {
371 Self::observe_event("rejected");
372 debug!(
373 msg_type = "Response",
374 agrees = self.approvers_agrees.len(),
375 disagrees = self.approvers_disagrees.len(),
376 timeouts = self.approvers_timeout.len(),
377 "All approvers responded, approval rejected"
378 );
379 }
380 } else {
381 warn!(
382 msg_type = "Response",
383 sender = %sender,
384 "Response from unexpected sender"
385 );
386 }
387 }
388 }
389 Ok(())
390 }
391
392 async fn on_child_fault(
393 &mut self,
394 error: ActorError,
395 ctx: &mut ActorContext<Self>,
396 ) -> ChildAction {
397 Self::observe_event("error");
398 error!(
399 request_id = %self.request_id,
400 version = self.version,
401 error = %error,
402 "Child fault in approval actor"
403 );
404 emit_fail(ctx, error).await;
405 ChildAction::Stop
406 }
407}
408
// Empty impl of `NotPersistentActor` for `Approval`.
// NOTE(review): presumably marks the actor as having no persisted state —
// confirm against the `ave_actors` documentation.
impl NotPersistentActor for Approval {}