1use std::{collections::HashSet, sync::Arc};
2
3use ave_actors::{
4 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
5 NotPersistentActor,
6};
7
8use async_trait::async_trait;
9use ave_common::identity::{DigestIdentifier, PublicKey};
10use network::ComunicateInfo;
11use serde::{Deserialize, Serialize};
12use tracing::{Span, debug, error, info_span};
13use updater::{Updater, UpdaterMessage};
14
15use crate::{
16 NetworkMessage,
17 helpers::network::{ActorMessage, service::NetworkSender},
18 model::common::emit_fail,
19 request::manager::{RequestManager, RequestManagerMessage},
20};
21
22pub mod updater;
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub enum UpdateType {
26 Auth,
27 Request {
28 subject_id: DigestIdentifier,
29 id: DigestIdentifier,
30 },
31}
32
33pub struct UpdateNew {
34 pub subject_id: DigestIdentifier,
35 pub witnesses: HashSet<PublicKey>,
36 pub update_type: UpdateType,
37 pub network: Arc<NetworkSender>,
38 pub our_sn: Option<u64>,
39}
40
41#[derive(Clone, Debug)]
42pub struct Update {
43 subject_id: DigestIdentifier,
44 witnesses: HashSet<PublicKey>,
45 better: Option<(u64, PublicKey)>,
46 our_sn: Option<u64>,
47 update_type: UpdateType,
48 network: Arc<NetworkSender>,
49}
50
51impl Update {
52 pub fn new(data: UpdateNew) -> Self {
53 Self {
54 network: data.network,
55 subject_id: data.subject_id,
56 witnesses: data.witnesses,
57 update_type: data.update_type,
58 our_sn: data.our_sn,
59 better: None,
60 }
61 }
62
63 pub fn update_better(&mut self, sn: u64, sender: PublicKey) {
64 match self.better {
65 Some((better_sn, ..)) => {
66 if sn > better_sn {
67 self.better = Some((sn, sender))
68 }
69 }
70 None => {
71 self.better = Some((sn, sender));
72 }
73 }
74 }
75
76 fn check_witness(&mut self, witness: PublicKey) -> bool {
77 self.witnesses.remove(&witness)
78 }
79
80 async fn create_updates(
81 &self,
82 ctx: &mut ActorContext<Self>,
83 ) -> Result<(), ActorError> {
84 for witness in self.witnesses.clone() {
85 let updater = Updater::new(witness.clone(), self.network.clone());
86 let child = ctx.create_child(&witness.to_string(), updater).await?;
87 let message = UpdaterMessage::NetworkLastSn {
88 subject_id: self.subject_id.clone(),
89 node_key: witness,
90 };
91
92 child.tell(message).await?;
93 }
94 Ok(())
95 }
96}
97
98#[derive(Debug, Clone)]
99pub enum UpdateMessage {
100 Run,
101 Response { sender: PublicKey, sn: u64 },
102}
103
104impl Message for UpdateMessage {}
105
106#[async_trait]
107impl Actor for Update {
108 type Event = ();
109 type Message = UpdateMessage;
110 type Response = ();
111
112 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
113 parent_span.map_or_else(
114 || info_span!("Update", id),
115 |parent_span| info_span!(parent: parent_span, "Update", id),
116 )
117 }
118}
119
120impl NotPersistentActor for Update {}
121
122#[async_trait]
123impl Handler<Self> for Update {
124 async fn handle_message(
125 &mut self,
126 _sender: ActorPath,
127 msg: UpdateMessage,
128 ctx: &mut ActorContext<Self>,
129 ) -> Result<(), ActorError> {
130 match msg {
131 UpdateMessage::Run => {
132 if let Err(e) = self.create_updates(ctx).await {
133 error!(
134 msg_type = "Run",
135 error = %e,
136 "Failed to create updates"
137 );
138 return Err(emit_fail(ctx, e).await);
139 } else {
140 debug!(
141 msg_type = "Run",
142 witnesses_count = self.witnesses.len(),
143 "Updates created successfully"
144 );
145 }
146 }
147 UpdateMessage::Response { sender, sn } => {
148 if self.check_witness(sender.clone()) {
149 self.update_better(sn, sender);
150
151 if self.witnesses.is_empty() {
152 if let Some((.., better_node)) = self.better.clone() {
153 let info = ComunicateInfo {
154 receiver: better_node.clone(),
155 request_id: String::default(),
156 version: 0,
157 receiver_actor: format!(
158 "/user/node/distributor_{}",
159 self.subject_id
160 ),
161 };
162
163 if let Err(e) = self
164 .network
165 .send_command(
166 network::CommandHelper::SendMessage {
167 message: NetworkMessage {
168 info: info.clone(),
169 message: ActorMessage::DistributionLedgerReq {
170 actual_sn: self.our_sn,
171 subject_id: self.subject_id.clone(),
172 },
173 },
174 },
175 )
176 .await
177 {
178 error!(
179 msg_type = "Response",
180 error = %e,
181 node = %better_node,
182 "Failed to send request to network"
183 );
184 return Err(emit_fail(ctx, e).await);
185 } else {
186 debug!(
187 msg_type = "Response",
188 node = %info.receiver,
189 subject_id = %self.subject_id,
190 "Request sent to better node"
191 );
192 }
193 }
194
195 if let UpdateType::Request { id, subject_id } =
196 &self.update_type
197 {
198 let request_path = ActorPath::from(format!(
199 "/user/request/{}",
200 subject_id
201 ));
202 match ctx
203 .system()
204 .get_actor::<RequestManager>(&request_path)
205 .await
206 {
207 Ok(request_actor) => {
208 let request = if self.better.is_none() {
209 RequestManagerMessage::FinishReboot {
210 request_id: id.clone(),
211 }
212 } else {
213 RequestManagerMessage::RebootWait {
214 request_id: id.clone(),
215 governance_id: self
216 .subject_id
217 .clone(),
218 }
219 };
220
221 if let Err(e) =
222 request_actor.tell(request).await
223 {
224 error!(
225 msg_type = "Response",
226 error = %e,
227 subject_id = %self.subject_id,
228 "Failed to send response to request actor"
229 );
230 return Err(emit_fail(ctx, e).await);
231 }
232 }
233 Err(e) => {
234 error!(
235 msg_type = "Response",
236 path = %request_path,
237 subject_id = %self.subject_id,
238 "Request actor not found"
239 );
240 return Err(emit_fail(ctx, e).await);
241 }
242 };
243 };
244
245 debug!(
246 msg_type = "Response",
247 subject_id = %self.subject_id,
248 has_better = self.better.is_some(),
249 "All witnesses responded, update complete"
250 );
251
252 ctx.stop(None).await;
253 }
254 }
255 }
256 };
257
258 Ok(())
259 }
260
261 async fn on_child_fault(
262 &mut self,
263 error: ActorError,
264 ctx: &mut ActorContext<Self>,
265 ) -> ChildAction {
266 error!(
267 subject_id = %self.subject_id,
268 update_type = ?self.update_type,
269 error = %error,
270 "Child fault in update actor"
271 );
272 emit_fail(ctx, error).await;
273 ChildAction::Stop
274 }
275}