1use std::{sync::Arc, time::Duration};
2
3use ave_actors::{
4 Actor, ActorContext, ActorError, ActorPath, ChildAction,
5 FixedIntervalStrategy, Handler, Message, NotPersistentActor, RetryActor,
6 RetryMessage, Strategy,
7};
8
9use async_trait::async_trait;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use ave_network::ComunicateInfo;
12use tracing::{Span, debug, error, info_span, warn};
13
14use crate::{
15 ActorMessage,
16 helpers::network::{NetworkMessage, service::NetworkSender},
17 model::{common::emit_fail, network::RetryNetwork},
18 update::UpdateWitnessOffer,
19};
20
21use super::{Update, UpdateMessage};
22
23#[derive(Clone, Debug)]
24pub struct Updater {
25 network: Arc<NetworkSender>,
26 node_key: PublicKey,
27 round: u64,
28 witness_retry_count: usize,
29 witness_retry_interval_secs: u64,
30}
31
32impl Updater {
33 pub const fn new(
34 node_key: PublicKey,
35 round: u64,
36 network: Arc<NetworkSender>,
37 witness_retry_count: usize,
38 witness_retry_interval_secs: u64,
39 ) -> Self {
40 Self {
41 node_key,
42 network,
43 round,
44 witness_retry_count,
45 witness_retry_interval_secs,
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
51pub enum UpdaterMessage {
52 EndRetry,
53 NetworkLastSn {
54 subject_id: DigestIdentifier,
55 actual_sn: Option<u64>,
56 },
57 NetworkNoOffer {
58 sender: PublicKey,
59 },
60 NetworkResponse {
61 offer: UpdateWitnessOffer,
62 sender: PublicKey,
63 },
64}
65
66impl Message for UpdaterMessage {}
67
68impl NotPersistentActor for Updater {}
69
70#[async_trait]
71impl Actor for Updater {
72 type Event = ();
73 type Message = UpdaterMessage;
74 type Response = ();
75
76 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
77 parent_span.map_or_else(
78 || info_span!("Updater", id),
79 |parent_span| info_span!(parent: parent_span, "Updater", id),
80 )
81 }
82}
83
84#[async_trait]
85impl Handler<Self> for Updater {
86 async fn handle_message(
87 &mut self,
88 _sender: ActorPath,
89 msg: UpdaterMessage,
90 ctx: &mut ActorContext<Self>,
91 ) -> Result<(), ActorError> {
92 match msg {
93 UpdaterMessage::EndRetry => {
94 warn!(
95 node_key = %self.node_key,
96 "Retry exhausted, notifying parent and stopping"
97 );
98
99 match ctx.get_parent::<Update>().await {
100 Ok(update_actor) => {
101 if let Err(e) = update_actor
102 .tell(UpdateMessage::Response {
103 sender: self.node_key.clone(),
104 offer: None,
105 round: self.round,
106 })
107 .await
108 {
109 error!(
110 error = %e,
111 "Failed to send timeout response to update actor"
112 );
113 emit_fail(ctx, e).await;
114 } else {
115 debug!(
116 node = %self.node_key,
117 "Timeout response sent to update actor"
118 );
119 }
120 }
121 Err(e) => {
122 error!(
123 error = %e,
124 path = %ctx.path().parent(),
125 "Update actor not found"
126 );
127 emit_fail(ctx, e).await;
128 }
129 };
130
131 ctx.stop(None).await;
132 }
133 UpdaterMessage::NetworkLastSn {
134 subject_id,
135 actual_sn,
136 } => {
137 let message = NetworkMessage {
138 info: ComunicateInfo {
139 request_id: String::default(),
140 version: 0,
141 receiver: self.node_key.clone(),
142 receiver_actor: format!(
143 "/user/node/distributor_{}",
144 subject_id
145 ),
146 },
147 message: ActorMessage::DistributionGetLastSn {
148 subject_id: subject_id.clone(),
149 actual_sn,
150 receiver_actor: ctx.path().to_string(),
151 },
152 };
153
154 let target = RetryNetwork::new(self.network.clone());
155
156 let strategy =
157 Strategy::FixedInterval(FixedIntervalStrategy::new(
158 self.witness_retry_count.max(1),
159 Duration::from_secs(
160 self.witness_retry_interval_secs.max(1),
161 ),
162 ));
163
164 let retry_actor = RetryActor::new_with_parent_message::<Self>(
165 target,
166 message,
167 strategy,
168 UpdaterMessage::EndRetry,
169 );
170
171 let retry = match ctx
172 .create_child::<RetryActor<RetryNetwork>, _>(
173 "retry",
174 retry_actor,
175 )
176 .await
177 {
178 Ok(retry) => retry,
179 Err(e) => {
180 error!(
181 msg_type = "NetworkLastSn",
182 error = %e,
183 "Failed to create retry actor"
184 );
185 return Err(emit_fail(ctx, e).await);
186 }
187 };
188
189 if let Err(e) = retry.tell(RetryMessage::Retry).await {
190 error!(
191 msg_type = "NetworkLastSn",
192 error = %e,
193 "Failed to send retry message to retry actor"
194 );
195 return Err(emit_fail(ctx, e).await);
196 } else {
197 debug!(
198 msg_type = "NetworkLastSn",
199 subject_id = %subject_id,
200 node_key = %self.node_key,
201 "Last SN request sent to network with retry"
202 );
203 };
204 }
205 UpdaterMessage::NetworkResponse { offer, sender } => {
206 if sender != self.node_key {
207 warn!(
208 msg_type = "NetworkResponse",
209 expected_node = %self.node_key,
210 sender = %sender,
211 "Ignoring update response from unexpected sender"
212 );
213 return Ok(());
214 }
215
216 match ctx.get_parent::<Update>().await {
217 Ok(update_actor) => {
218 if let Err(e) = update_actor
219 .tell(UpdateMessage::Response {
220 sender: self.node_key.clone(),
221 offer: Some(offer.clone()),
222 round: self.round,
223 })
224 .await
225 {
226 error!(
227 msg_type = "NetworkResponse",
228 error = %e,
229 "Failed to send response to update actor"
230 );
231 return Err(emit_fail(ctx, e).await);
232 }
233 }
234 Err(e) => {
235 error!(
236 msg_type = "NetworkResponse",
237 error = %e,
238 path = %ctx.path().parent(),
239 "Update actor not found"
240 );
241 return Err(emit_fail(ctx, e).await);
242 }
243 };
244
245 'retry: {
246 let Ok(retry) = ctx
247 .get_child::<RetryActor<RetryNetwork>>("retry")
248 .await
249 else {
250 debug!(
251 msg_type = "NetworkResponse",
252 sender = %sender,
253 "Retry actor not found while closing updater"
254 );
255 break 'retry;
257 };
258
259 if let Err(e) = retry.tell(RetryMessage::End).await {
260 warn!(
261 msg_type = "NetworkResponse",
262 error = %e,
263 "Failed to end retry actor"
264 );
265 break 'retry;
267 };
268 }
269
270 debug!(
271 msg_type = "NetworkResponse",
272 sn = offer.sn,
273 sender = %sender,
274 "Network response processed successfully"
275 );
276
277 ctx.stop(None).await;
278 }
279 UpdaterMessage::NetworkNoOffer { sender } => {
280 if sender != self.node_key {
281 warn!(
282 msg_type = "NetworkNoOffer",
283 expected_node = %self.node_key,
284 sender = %sender,
285 "Ignoring empty update response from unexpected sender"
286 );
287 return Ok(());
288 }
289
290 match ctx.get_parent::<Update>().await {
291 Ok(update_actor) => {
292 if let Err(e) = update_actor
293 .tell(UpdateMessage::Response {
294 sender: self.node_key.clone(),
295 offer: None,
296 round: self.round,
297 })
298 .await
299 {
300 error!(
301 msg_type = "NetworkNoOffer",
302 error = %e,
303 "Failed to send empty response to update actor"
304 );
305 return Err(emit_fail(ctx, e).await);
306 }
307 }
308 Err(e) => {
309 error!(
310 msg_type = "NetworkNoOffer",
311 error = %e,
312 path = %ctx.path().parent(),
313 "Update actor not found"
314 );
315 return Err(emit_fail(ctx, e).await);
316 }
317 };
318
319 'retry: {
320 let Ok(retry) = ctx
321 .get_child::<RetryActor<RetryNetwork>>("retry")
322 .await
323 else {
324 debug!(
325 msg_type = "NetworkNoOffer",
326 sender = %sender,
327 "Retry actor not found while closing updater"
328 );
329 break 'retry;
330 };
331
332 if let Err(e) = retry.tell(RetryMessage::End).await {
333 warn!(
334 msg_type = "NetworkNoOffer",
335 error = %e,
336 "Failed to end retry actor"
337 );
338 break 'retry;
339 };
340 }
341
342 debug!(
343 msg_type = "NetworkNoOffer",
344 sender = %sender,
345 "Empty network response processed successfully"
346 );
347
348 ctx.stop(None).await;
349 }
350 };
351
352 Ok(())
353 }
354
355 async fn on_child_fault(
356 &mut self,
357 error: ActorError,
358 ctx: &mut ActorContext<Self>,
359 ) -> ChildAction {
360 error!(
361 node = %self.node_key,
362 error = %error,
363 "Child fault in updater actor"
364 );
365 emit_fail(ctx, error).await;
366 ChildAction::Stop
367 }
368}