1use std::{sync::Arc, time::Duration};
2
3use ave_actors::{
4 Actor, ActorContext, ActorError, ActorPath, ChildAction,
5 FixedIntervalStrategy, Handler, Message, NotPersistentActor, RetryActor,
6 RetryMessage, Strategy,
7};
8
9use async_trait::async_trait;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use network::ComunicateInfo;
12use tracing::{Span, debug, error, info_span, warn};
13
14use crate::{
15 ActorMessage,
16 helpers::network::{NetworkMessage, service::NetworkSender},
17 model::{common::emit_fail, network::RetryNetwork},
18};
19
20use super::{Update, UpdateMessage};
21
22#[derive(Clone, Debug)]
23pub struct Updater {
24 network: Arc<NetworkSender>,
25 node_key: PublicKey,
26}
27
28impl Updater {
29 pub const fn new(node_key: PublicKey, network: Arc<NetworkSender>) -> Self {
30 Self { node_key, network }
31 }
32}
33
34#[derive(Debug, Clone)]
35pub enum UpdaterMessage {
36 EndRetry,
37 NetworkLastSn {
38 subject_id: DigestIdentifier,
39 node_key: PublicKey,
40 },
41 NetworkResponse {
42 sn: u64,
43 sender: PublicKey,
44 },
45}
46
47impl Message for UpdaterMessage {}
48
49impl NotPersistentActor for Updater {}
50
51#[async_trait]
52impl Actor for Updater {
53 type Event = ();
54 type Message = UpdaterMessage;
55 type Response = ();
56
57 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
58 parent_span.map_or_else(
59 || info_span!("Updater", id),
60 |parent_span| info_span!(parent: parent_span, "Updater", id),
61 )
62 }
63}
64
65#[async_trait]
66impl Handler<Self> for Updater {
67 async fn handle_message(
68 &mut self,
69 _sender: ActorPath,
70 msg: UpdaterMessage,
71 ctx: &mut ActorContext<Self>,
72 ) -> Result<(), ActorError> {
73 match msg {
74 UpdaterMessage::EndRetry => {
75 debug!(
76 node_key = %self.node_key,
77 "Retry exhausted, notifying parent and stopping"
78 );
79
80 match ctx.get_parent::<Update>().await {
81 Ok(update_actor) => {
82 if let Err(e) = update_actor
83 .tell(UpdateMessage::Response {
84 sender: self.node_key.clone(),
85 sn: 0,
86 })
87 .await
88 {
89 error!(
90 error = %e,
91 "Failed to send timeout response to update actor"
92 );
93 emit_fail(ctx, e).await;
94 } else {
95 debug!(
96 node = %self.node_key,
97 "Timeout response sent to update actor"
98 );
99 }
100 }
101 Err(e) => {
102 error!(
103 path = %ctx.path().parent(),
104 "Update actor not found"
105 );
106 emit_fail(ctx, e).await;
107 }
108 };
109
110 ctx.stop(None).await;
111 }
112 UpdaterMessage::NetworkLastSn {
113 subject_id,
114 node_key,
115 } => {
116 let message = NetworkMessage {
117 info: ComunicateInfo {
118 request_id: String::default(),
119 version: 0,
120 receiver: node_key.clone(),
121 receiver_actor: format!(
122 "/user/node/distributor_{}",
123 subject_id
124 ),
125 },
126 message: ActorMessage::DistributionGetLastSn {
127 subject_id: subject_id.clone(),
128 receiver_actor: ctx.path().to_string(),
129 },
130 };
131
132 let target = RetryNetwork::new(self.network.clone());
133
134 let strategy = Strategy::FixedInterval(
135 FixedIntervalStrategy::new(1, Duration::from_secs(10)),
136 );
137
138 let retry_actor = RetryActor::new_with_parent_message::<Self>(
139 target,
140 message,
141 strategy,
142 UpdaterMessage::EndRetry,
143 );
144
145 let retry = match ctx
146 .create_child::<RetryActor<RetryNetwork>, _>(
147 "retry",
148 retry_actor,
149 )
150 .await
151 {
152 Ok(retry) => retry,
153 Err(e) => {
154 error!(
155 msg_type = "NetworkLastSn",
156 error = %e,
157 "Failed to create retry actor"
158 );
159 return Err(emit_fail(ctx, e).await);
160 }
161 };
162
163 if let Err(e) = retry.tell(RetryMessage::Retry).await {
164 error!(
165 msg_type = "NetworkLastSn",
166 error = %e,
167 "Failed to send retry message to retry actor"
168 );
169 return Err(emit_fail(ctx, e).await);
170 } else {
171 debug!(
172 msg_type = "NetworkLastSn",
173 subject_id = %subject_id,
174 node_key = %node_key,
175 "Last SN request sent to network with retry"
176 );
177 };
178 }
179 UpdaterMessage::NetworkResponse { sn, sender } => {
180 if sender == self.node_key {
181 match ctx.get_parent::<Update>().await {
182 Ok(update_actor) => {
183 if let Err(e) = update_actor
184 .tell(UpdateMessage::Response {
185 sender: self.node_key.clone(),
186 sn,
187 })
188 .await
189 {
190 error!(
191 msg_type = "NetworkResponse",
192 error = %e,
193 "Failed to send response to update actor"
194 );
195 return Err(emit_fail(ctx, e).await);
196 }
197 }
198 Err(e) => {
199 error!(
200 msg_type = "NetworkResponse",
201 path = %ctx.path().parent(),
202 "Update actor not found"
203 );
204 return Err(emit_fail(ctx, e).await);
205 }
206 };
207
208 'retry: {
209 let Ok(retry) = ctx
210 .get_child::<RetryActor<RetryNetwork>>("retry")
211 .await
212 else {
213 break 'retry;
215 };
216
217 if let Err(e) = retry.tell(RetryMessage::End).await {
218 warn!(
219 msg_type = "NetworkResponse",
220 error = %e,
221 "Failed to end retry actor"
222 );
223 break 'retry;
225 };
226 }
227
228 debug!(
229 msg_type = "NetworkResponse",
230 sn = sn,
231 sender = %sender,
232 "Network response processed successfully"
233 );
234
235 ctx.stop(None).await;
236 }
237 }
238 };
239
240 Ok(())
241 }
242
243 async fn on_child_fault(
244 &mut self,
245 error: ActorError,
246 ctx: &mut ActorContext<Self>,
247 ) -> ChildAction {
248 error!(
249 node = %self.node_key,
250 error = %error,
251 "Child fault in updater actor"
252 );
253 emit_fail(ctx, error).await;
254 ChildAction::Stop
255 }
256}