1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use ave_actors::{
7 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
8 NotPersistentActor, Response,
9};
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use rand::seq::IteratorRandom;
12use tracing::{Span, debug, info_span, warn};
13
14use crate::auth::{Auth, AuthMessage, AuthResponse};
15use crate::helpers::network::{
16 ActorMessage, NetworkMessage, service::NetworkSender,
17};
18use ave_network::ComunicateInfo;
19
20#[derive(Debug, Clone, Eq, PartialEq)]
21pub struct UpdateTarget {
22 pub peer: PublicKey,
23 pub version: u64,
24}
25
26#[derive(Debug, Clone)]
27pub enum GovernanceVersionSyncMessage {
28 RefreshGovernance {
29 version: u64,
30 governance_peers: HashSet<PublicKey>,
31 },
32 Tick,
33 RoundTimeout,
34 PeerVersion {
35 peer: PublicKey,
36 version: u64,
37 },
38}
39
40impl Message for GovernanceVersionSyncMessage {}
41
42#[derive(Debug, Clone)]
43pub enum GovernanceVersionSyncResponse {
44 None,
45}
46
47impl Response for GovernanceVersionSyncResponse {}
48
49pub struct GovernanceVersionSync {
50 governance_id: DigestIdentifier,
51 our_key: Arc<PublicKey>,
52 network: Arc<NetworkSender>,
53 local_version: u64,
54 sample_size: usize,
55 tick_interval: Duration,
56 response_timeout: Duration,
57 governance_peers: HashSet<PublicKey>,
58 pending_peers: HashSet<PublicKey>,
59 update_target: Option<UpdateTarget>,
60 round_open: bool,
61}
62
63impl GovernanceVersionSync {
64 pub fn new(
65 governance_id: DigestIdentifier,
66 our_key: Arc<PublicKey>,
67 network: Arc<NetworkSender>,
68 local_version: u64,
69 sample_size: usize,
70 tick_interval: Duration,
71 response_timeout: Duration,
72 ) -> Self {
73 Self {
74 governance_id,
75 our_key,
76 network,
77 local_version,
78 sample_size: sample_size.max(1),
79 tick_interval,
80 response_timeout,
81 governance_peers: HashSet::new(),
82 pending_peers: HashSet::new(),
83 update_target: None,
84 round_open: false,
85 }
86 }
87
88 async fn schedule_tick(
89 &self,
90 ctx: &ActorContext<Self>,
91 ) -> Result<(), ActorError> {
92 let actor = ctx.reference().await?;
93 let delay = self.tick_interval;
94 tokio::spawn(async move {
95 tokio::time::sleep(delay).await;
96 let _ = actor.tell(GovernanceVersionSyncMessage::Tick).await;
97 });
98 Ok(())
99 }
100
101 async fn schedule_timeout(
102 &self,
103 ctx: &ActorContext<Self>,
104 ) -> Result<(), ActorError> {
105 let actor = ctx.reference().await?;
106 let delay = self.response_timeout;
107 tokio::spawn(async move {
108 tokio::time::sleep(delay).await;
109 let _ =
110 actor.tell(GovernanceVersionSyncMessage::RoundTimeout).await;
111 });
112 Ok(())
113 }
114
115 fn refresh_governance(
116 &mut self,
117 version: u64,
118 mut governance_peers: HashSet<PublicKey>,
119 ) {
120 governance_peers.remove(&*self.our_key);
121 self.local_version = version;
122 self.governance_peers = governance_peers;
123
124 if self
125 .update_target
126 .as_ref()
127 .is_some_and(|target| target.version <= version)
128 {
129 self.update_target = None;
130 }
131 }
132
133 async fn trigger_update_if_needed(
134 &self,
135 _ctx: &ActorContext<Self>,
136 ) -> Result<(), ActorError> {
137 let Some(UpdateTarget { peer, .. }) = self.update_target.clone() else {
138 return Ok(());
139 };
140
141 let info = ComunicateInfo {
142 receiver: peer,
143 request_id: String::default(),
144 version: 0,
145 receiver_actor: format!(
146 "/user/node/distributor_{}",
147 self.governance_id
148 ),
149 };
150
151 self.network
152 .send_command(ave_network::CommandHelper::SendMessage {
153 message: NetworkMessage {
154 info,
155 message: ActorMessage::DistributionLedgerReq {
156 actual_sn: Some(self.local_version),
157 target_sn: None,
158 subject_id: self.governance_id.clone(),
159 },
160 },
161 })
162 .await
163 }
164
165 async fn get_auth_peers(
166 &self,
167 ctx: &ActorContext<Self>,
168 ) -> Result<HashSet<PublicKey>, ActorError> {
169 let auth_path = ActorPath::from("/user/node/auth");
170 let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
171 match auth
172 .ask(AuthMessage::GetAuth {
173 subject_id: self.governance_id.clone(),
174 })
175 .await
176 {
177 Ok(AuthResponse::Witnesses(mut witnesses)) => {
178 witnesses.remove(&*self.our_key);
179 Ok(witnesses)
180 }
181 Ok(_) => Ok(HashSet::new()),
182 Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
183 Err(error) => Err(error),
184 }
185 }
186
187 fn select_peers(&self, auth_peers: HashSet<PublicKey>) -> Vec<PublicKey> {
188 let mut peers = self.governance_peers.clone();
189 peers.extend(auth_peers);
190 peers.remove(&*self.our_key);
191
192 if peers.is_empty() {
193 return Vec::new();
194 }
195
196 let mut rng = rand::rng();
197 peers
198 .iter()
199 .cloned()
200 .sample(&mut rng, self.sample_size.min(peers.len()))
201 }
202
203 fn peer_version(&mut self, peer: PublicKey, version: u64) -> bool {
204 if !self.round_open || !self.pending_peers.remove(&peer) {
205 return false;
206 }
207
208 if version <= self.local_version {
209 return self.pending_peers.is_empty();
210 }
211
212 let should_replace = self
213 .update_target
214 .as_ref()
215 .is_none_or(|target| version > target.version);
216 if should_replace {
217 self.update_target = Some(UpdateTarget { peer, version });
218 }
219
220 self.pending_peers.is_empty()
221 }
222
223 async fn handle_tick(
224 &mut self,
225 ctx: &ActorContext<Self>,
226 ) -> Result<(), ActorError> {
227 if self.update_target.is_some() {
228 self.schedule_tick(ctx).await?;
229 return Ok(());
230 }
231
232 let auth_peers = self.get_auth_peers(ctx).await?;
233 let peers = self.select_peers(auth_peers);
234
235 if peers.is_empty() {
236 self.schedule_tick(ctx).await?;
237 return Ok(());
238 }
239
240 self.pending_peers = peers.into_iter().collect();
241 self.round_open = !self.pending_peers.is_empty();
242
243 for peer in self.pending_peers.clone() {
244 let message = NetworkMessage {
245 info: ComunicateInfo {
246 receiver: peer.clone(),
247 request_id: String::default(),
248 version: 0,
249 receiver_actor: format!(
250 "/user/node/distributor_{}",
251 self.governance_id
252 ),
253 },
254 message: ActorMessage::GovernanceVersionReq {
255 subject_id: self.governance_id.clone(),
256 receiver_actor: ctx.path().to_string(),
257 },
258 };
259
260 if let Err(error) = self
261 .network
262 .send_command(ave_network::CommandHelper::SendMessage { message })
263 .await
264 {
265 warn!(
266 governance_id = %self.governance_id,
267 peer = %peer,
268 error = %error,
269 "Failed to send governance version request"
270 );
271 self.pending_peers.remove(&peer);
272 }
273 }
274
275 debug!(
276 governance_id = %self.governance_id,
277 local_version = self.local_version,
278 selected_peers = self.pending_peers.len(),
279 "Governance version sync tick"
280 );
281
282 self.schedule_timeout(ctx).await?;
284 self.schedule_tick(ctx).await?;
285
286 Ok(())
287 }
288}
289
290#[async_trait]
291impl Actor for GovernanceVersionSync {
292 type Event = ();
293 type Message = GovernanceVersionSyncMessage;
294 type Response = GovernanceVersionSyncResponse;
295
296 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
297 parent_span.map_or_else(
298 || info_span!("GovernanceVersionSync"),
299 |parent| info_span!(parent: parent, "GovernanceVersionSync"),
300 )
301 }
302
303 async fn pre_start(
304 &mut self,
305 ctx: &mut ActorContext<Self>,
306 ) -> Result<(), ActorError> {
307 self.schedule_tick(ctx).await
308 }
309}
310
311impl NotPersistentActor for GovernanceVersionSync {}
312
313#[async_trait]
314impl Handler<Self> for GovernanceVersionSync {
315 async fn handle_message(
316 &mut self,
317 _sender: ActorPath,
318 msg: GovernanceVersionSyncMessage,
319 ctx: &mut ActorContext<Self>,
320 ) -> Result<GovernanceVersionSyncResponse, ActorError> {
321 match msg {
322 GovernanceVersionSyncMessage::RefreshGovernance {
323 version,
324 governance_peers,
325 } => {
326 self.refresh_governance(version, governance_peers);
327 }
328 GovernanceVersionSyncMessage::Tick => {
329 if let Err(error) = self.handle_tick(ctx).await {
330 warn!(
331 governance_id = %self.governance_id,
332 error = %error,
333 "Governance version sync tick failed"
334 );
335 }
336 }
337 GovernanceVersionSyncMessage::RoundTimeout => {
338 if self.round_open {
339 self.round_open = false;
340 self.pending_peers.clear();
341 if let Err(error) = self.trigger_update_if_needed(ctx).await
342 {
343 warn!(
344 governance_id = %self.governance_id,
345 error = %error,
346 "Failed to trigger governance update after round timeout"
347 );
348 }
349 }
350 }
351 GovernanceVersionSyncMessage::PeerVersion { peer, version } => {
352 if self.peer_version(peer, version) {
353 self.round_open = false;
354 if let Err(error) = self.trigger_update_if_needed(ctx).await
355 {
356 warn!(
357 governance_id = %self.governance_id,
358 error = %error,
359 "Failed to trigger governance update after round completion"
360 );
361 }
362 }
363 }
364 }
365
366 Ok(GovernanceVersionSyncResponse::None)
367 }
368}