1use std::{collections::VecDeque, sync::Arc, time::Duration};
2
3use crate::{
4 ActorMessage, NetworkMessage,
5 approval::types::VotationType,
6 helpers::network::service::NetworkSender,
7 model::{
8 common::emit_fail,
9 network::{RetryNetwork, TimeOut},
10 },
11};
12use async_trait::async_trait;
13use ave_actors::{
14 Actor, ActorContext, ActorError, ActorPath, ChildAction,
15 CustomIntervalStrategy, Handler, Message, NotPersistentActor, RetryActor,
16 RetryMessage, Strategy,
17};
18use ave_common::identity::{
19 DigestIdentifier, HashAlgorithm, PublicKey, Signed, TimeStamp,
20};
21use ave_network::ComunicateInfo;
22use tracing::{Span, debug, error, info_span, warn};
23
24use super::{
25 Approval, ApprovalMessage, request::ApprovalReq, response::ApprovalRes,
26};
27
28#[derive(Clone, Debug)]
29pub struct ApprLight {
30 network: Arc<NetworkSender>,
31 node_key: PublicKey,
32 request_id: DigestIdentifier,
33 version: u64,
34}
35
36impl ApprLight {
37 pub const fn new(
38 network: Arc<NetworkSender>,
39 node_key: PublicKey,
40 request_id: DigestIdentifier,
41 version: u64,
42 ) -> Self {
43 Self {
44 network,
45 node_key,
46 request_id,
47 version,
48 }
49 }
50}
51
52pub struct InitApprLight {
53 pub request_id: DigestIdentifier,
54 pub version: u64,
55 pub our_key: Arc<PublicKey>,
56 pub node_key: PublicKey,
57 pub subject_id: String,
58 pub pass_votation: VotationType,
59 pub hash: HashAlgorithm,
60 pub network: Arc<NetworkSender>,
61}
62
63#[derive(Debug, Clone)]
64pub enum ApprLightMessage {
65 EndRetry,
66 NetworkApproval {
68 approval_req: Signed<ApprovalReq>,
69 },
70 NetworkResponse {
72 approval_res: Signed<ApprovalRes>,
73 request_id: String,
74 version: u64,
75 sender: PublicKey,
76 },
77}
78
79impl Message for ApprLightMessage {}
80
81#[async_trait]
82impl Actor for ApprLight {
83 type Event = ();
84 type Message = ApprLightMessage;
85 type Response = ();
86
87 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
88 parent_span.map_or_else(
89 || info_span!("ApprLight", id),
90 |parent_span| info_span!(parent: parent_span, "ApprLight", id),
91 )
92 }
93}
94
95#[async_trait]
96impl Handler<Self> for ApprLight {
97 async fn handle_message(
98 &mut self,
99 _sender: ActorPath,
100 msg: ApprLightMessage,
101 ctx: &mut ActorContext<Self>,
102 ) -> Result<(), ActorError> {
103 match msg {
104 ApprLightMessage::EndRetry => {
105 debug!(
106 node_key = %self.node_key,
107 "Retry exhausted, notifying parent and stopping"
108 );
109
110 match ctx.get_parent::<Approval>().await {
111 Ok(approval_actor) => {
112 if let Err(e) = approval_actor
113 .tell(ApprovalMessage::Response {
114 approval_res: ApprovalRes::TimeOut(TimeOut {
115 re_trys: 3,
116 timestamp: TimeStamp::now(),
117 who: self.node_key.clone(),
118 }),
119 sender: self.node_key.clone(),
120 signature: None,
121 })
122 .await
123 {
124 error!(
125 error = %e,
126 "Failed to send timeout response to approval actor"
127 );
128 emit_fail(ctx, e).await;
129 }
130
131 debug!(
132 request_id = %self.request_id,
133 version = self.version,
134 node_key = %self.node_key,
135 "Timeout response sent to approval actor"
136 );
137 }
138 Err(e) => {
139 error!(
140 error = %e,
141 path = %ctx.path().parent(),
142 "Approval actor not found"
143 );
144 emit_fail(ctx, e).await;
145 }
146 }
147
148 ctx.stop(None).await;
149 }
150 ApprLightMessage::NetworkApproval { approval_req } => {
151 let subject_id = approval_req.content().subject_id.clone();
153
154 let receiver_actor = format!(
155 "/user/node/subject_manager/{}/approver",
156 subject_id
157 );
158
159 let message = NetworkMessage {
160 info: ComunicateInfo {
161 request_id: self.request_id.to_string(),
162 version: self.version,
163 receiver: self.node_key.clone(),
164 receiver_actor,
165 },
166 message: ActorMessage::ApprovalReq { req: approval_req },
167 };
168
169 let target = RetryNetwork::new(self.network.clone());
170
171 let strategy = Strategy::CustomIntervalStrategy(
172 CustomIntervalStrategy::new(VecDeque::from([
173 Duration::from_hours(4),
174 Duration::from_hours(8),
175 Duration::from_hours(16),
176 ])),
177 );
178
179 let retry_actor = RetryActor::new_with_parent_message::<Self>(
180 target,
181 message,
182 strategy,
183 ApprLightMessage::EndRetry,
184 );
185
186 let retry = match ctx
187 .create_child::<RetryActor<RetryNetwork>, _>(
188 "retry",
189 retry_actor,
190 )
191 .await
192 {
193 Ok(retry) => retry,
194 Err(e) => {
195 error!(
196 msg_type = "NetworkApproval",
197 error = %e,
198 "Failed to create retry actor"
199 );
200 return Err(emit_fail(ctx, e).await);
201 }
202 };
203
204 if let Err(e) = retry.tell(RetryMessage::Retry).await {
205 error!(
206 msg_type = "NetworkApproval",
207 error = %e,
208 "Failed to send retry message"
209 );
210 return Err(emit_fail(ctx, e).await);
211 };
212
213 debug!(
214 msg_type = "NetworkApproval",
215 request_id = %self.request_id,
216 version = self.version,
217 "Retry actor created and started"
218 );
219 }
220 ApprLightMessage::NetworkResponse {
222 approval_res,
223 request_id,
224 version,
225 sender,
226 } => {
227 if request_id == self.request_id.to_string()
228 && version == self.version
229 {
230 if self.node_key != sender
231 || sender != approval_res.signature().signer
232 {
233 error!(
234 msg_type = "NetworkResponse",
235 expected_node = %self.node_key,
236 received_sender = %sender,
237 "Unexpected approval sender"
238 );
239 return Ok(());
240 }
241
242 if let Err(e) = approval_res.verify() {
243 error!(
244 msg_type = "NetworkResponse",
245 error = %e,
246 "Invalid approval signature"
247 );
248 return Ok(());
249 }
250
251 match ctx.get_parent::<Approval>().await {
252 Ok(approval_actor) => {
253 if let Err(e) = approval_actor
254 .tell(ApprovalMessage::Response {
255 approval_res: approval_res
256 .content()
257 .clone(),
258 sender: self.node_key.clone(),
259 signature: Some(
260 approval_res.signature().clone(),
261 ),
262 })
263 .await
264 {
265 error!(
266 msg_type = "NetworkResponse",
267 error = %e,
268 "Failed to send response to approval actor"
269 );
270 return Err(emit_fail(ctx, e).await);
271 }
272 }
273 Err(e) => {
274 error!(
275 msg_type = "NetworkResponse",
276 path = %ctx.path().parent(),
277 "Approval actor not found"
278 );
279 return Err(emit_fail(ctx, e).await);
280 }
281 };
282
283 'retry: {
284 let Ok(retry) = ctx
285 .get_child::<RetryActor<RetryNetwork>>("retry")
286 .await
287 else {
288 break 'retry;
289 };
290
291 if let Err(e) = retry.tell(RetryMessage::End).await {
292 error!(
293 msg_type = "NetworkResponse",
294 error = %e,
295 "Failed to send end message to retry actor"
296 );
297 break 'retry;
298 };
299 }
300
301 debug!(
302 msg_type = "NetworkResponse",
303 request_id = %request_id,
304 version = version,
305 "Approval response processed successfully"
306 );
307
308 ctx.stop(None).await;
309 } else {
310 warn!(
311 msg_type = "NetworkResponse",
312 expected_request_id = %self.request_id,
313 received_request_id = %request_id,
314 expected_version = self.version,
315 received_version = version,
316 "Mismatched request id or version"
317 );
318 }
319 }
320 }
321 Ok(())
322 }
323
324 async fn on_child_fault(
325 &mut self,
326 error: ActorError,
327 ctx: &mut ActorContext<Self>,
328 ) -> ChildAction {
329 error!(
330 request_id = %self.request_id,
331 version = self.version,
332 node_key = %self.node_key,
333 error = %error,
334 "Child fault in approval light actor"
335 );
336 emit_fail(ctx, error).await;
337 ChildAction::Stop
338 }
339}
340
341impl NotPersistentActor for ApprLight {}