1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use ave_actors::{
7 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
8 NotPersistentActor, Response,
9};
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use rand::seq::IteratorRandom;
12use tracing::{Span, debug, info_span, warn};
13
14use crate::auth::{Auth, AuthMessage, AuthResponse};
15use crate::helpers::network::{
16 ActorMessage, NetworkMessage, service::NetworkSender,
17};
18use ave_network::ComunicateInfo;
19
20#[derive(Debug, Clone, Eq, PartialEq)]
21pub struct UpdateTarget {
22 pub peer: PublicKey,
23 pub version: u64,
24}
25
26#[derive(Debug, Clone)]
27pub enum GovernanceVersionSyncMessage {
28 RefreshGovernance {
29 version: u64,
30 governance_peers: HashSet<PublicKey>,
31 },
32 Tick,
33 RoundTimeout,
34 PeerVersion {
35 peer: PublicKey,
36 version: u64,
37 },
38}
39
40impl Message for GovernanceVersionSyncMessage {}
41
42#[derive(Debug, Clone)]
43pub enum GovernanceVersionSyncResponse {
44 None,
45}
46
47impl Response for GovernanceVersionSyncResponse {}
48
49pub struct GovernanceVersionSync {
50 governance_id: DigestIdentifier,
51 our_key: Arc<PublicKey>,
52 network: Arc<NetworkSender>,
53 local_version: u64,
54 sample_size: usize,
55 tick_interval: Duration,
56 response_timeout: Duration,
57 governance_peers: HashSet<PublicKey>,
58 pending_peers: HashSet<PublicKey>,
59 update_target: Option<UpdateTarget>,
60 round_open: bool,
61}
62
63impl GovernanceVersionSync {
64 pub fn new(
65 governance_id: DigestIdentifier,
66 our_key: Arc<PublicKey>,
67 network: Arc<NetworkSender>,
68 local_version: u64,
69 sample_size: usize,
70 tick_interval: Duration,
71 response_timeout: Duration,
72 ) -> Self {
73 Self {
74 governance_id,
75 our_key,
76 network,
77 local_version,
78 sample_size: sample_size.max(1),
79 tick_interval,
80 response_timeout,
81 governance_peers: HashSet::new(),
82 pending_peers: HashSet::new(),
83 update_target: None,
84 round_open: false,
85 }
86 }
87
88 async fn schedule_tick(
89 &self,
90 ctx: &ActorContext<Self>,
91 ) -> Result<(), ActorError> {
92 let actor = ctx.reference().await?;
93 let delay = self.tick_interval;
94 tokio::spawn(async move {
95 tokio::time::sleep(delay).await;
96 let _ = actor.tell(GovernanceVersionSyncMessage::Tick).await;
97 });
98 Ok(())
99 }
100
101 async fn schedule_timeout(
102 &self,
103 ctx: &ActorContext<Self>,
104 ) -> Result<(), ActorError> {
105 let actor = ctx.reference().await?;
106 let delay = self.response_timeout;
107 tokio::spawn(async move {
108 tokio::time::sleep(delay).await;
109 let _ =
110 actor.tell(GovernanceVersionSyncMessage::RoundTimeout).await;
111 });
112 Ok(())
113 }
114
115 fn refresh_governance(
116 &mut self,
117 version: u64,
118 mut governance_peers: HashSet<PublicKey>,
119 ) {
120 governance_peers.remove(&*self.our_key);
121 self.local_version = version;
122 self.governance_peers = governance_peers;
123
124 if self
125 .update_target
126 .as_ref()
127 .is_some_and(|target| target.version <= version)
128 {
129 self.update_target = None;
130 }
131 }
132
133 async fn trigger_update_if_needed(
134 &self,
135 _ctx: &ActorContext<Self>,
136 ) -> Result<(), ActorError> {
137 let Some(UpdateTarget { peer, .. }) = self.update_target.clone() else {
138 return Ok(());
139 };
140
141 let info = ComunicateInfo {
142 receiver: peer,
143 request_id: String::default(),
144 version: 0,
145 receiver_actor: format!(
146 "/user/node/distributor_{}",
147 self.governance_id
148 ),
149 };
150
151 self.network
152 .send_command(ave_network::CommandHelper::SendMessage {
153 message: NetworkMessage {
154 info,
155 message: ActorMessage::DistributionLedgerReq {
156 actual_sn: Some(self.local_version),
157 target_sn: None,
158 subject_id: self.governance_id.clone(),
159 },
160 },
161 })
162 .await
163 }
164
165 async fn get_auth_peers(
166 &self,
167 ctx: &ActorContext<Self>,
168 ) -> Result<HashSet<PublicKey>, ActorError> {
169 let auth_path = ActorPath::from("/user/node/auth");
170 let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
171 match auth
172 .ask(AuthMessage::GetAuth {
173 subject_id: self.governance_id.clone(),
174 })
175 .await
176 {
177 Ok(AuthResponse::Witnesses(mut witnesses)) => {
178 witnesses.remove(&*self.our_key);
179 Ok(witnesses)
180 }
181 Ok(_) => Ok(HashSet::new()),
182 Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
183 Err(error) => Err(error),
184 }
185 }
186
187 fn select_peers(&self, auth_peers: HashSet<PublicKey>) -> Vec<PublicKey> {
188 let mut peers = self.governance_peers.clone();
189 peers.extend(auth_peers);
190 peers.remove(&*self.our_key);
191
192 if peers.is_empty() {
193 return Vec::new();
194 }
195
196 let mut rng = rand::rng();
197 peers
198 .iter()
199 .cloned()
200 .sample(&mut rng, self.sample_size.min(peers.len()))
201 }
202
203 fn peer_version(&mut self, peer: PublicKey, version: u64) -> bool {
204 if !self.round_open || !self.pending_peers.remove(&peer) {
205 return false;
206 }
207
208 if version <= self.local_version {
209 return self.pending_peers.is_empty();
210 }
211
212 let should_replace = self
213 .update_target
214 .as_ref()
215 .is_none_or(|target| version > target.version);
216 if should_replace {
217 self.update_target = Some(UpdateTarget { peer, version });
218 }
219
220 self.pending_peers.is_empty()
221 }
222
223 async fn handle_tick(
224 &mut self,
225 ctx: &ActorContext<Self>,
226 ) -> Result<(), ActorError> {
227 if self.update_target.is_some() {
228 self.schedule_tick(ctx).await?;
229 return Ok(());
230 }
231
232 let auth_peers = self.get_auth_peers(ctx).await?;
233 let peers = self.select_peers(auth_peers);
234
235 if peers.is_empty() {
236 self.schedule_tick(ctx).await?;
237 return Ok(());
238 }
239
240 self.pending_peers = peers.into_iter().collect();
241 self.round_open = !self.pending_peers.is_empty();
242
243 for peer in self.pending_peers.clone() {
244 let message = NetworkMessage {
245 info: ComunicateInfo {
246 receiver: peer.clone(),
247 request_id: String::default(),
248 version: 0,
249 receiver_actor: format!(
250 "/user/node/distributor_{}",
251 self.governance_id
252 ),
253 },
254 message: ActorMessage::GovernanceVersionReq {
255 subject_id: self.governance_id.clone(),
256 receiver_actor: ctx.path().to_string(),
257 },
258 };
259
260 if let Err(error) = self
261 .network
262 .send_command(ave_network::CommandHelper::SendMessage {
263 message,
264 })
265 .await
266 {
267 warn!(
268 governance_id = %self.governance_id,
269 peer = %peer,
270 error = %error,
271 "Failed to send governance version request"
272 );
273 self.pending_peers.remove(&peer);
274 }
275 }
276
277 debug!(
278 governance_id = %self.governance_id,
279 local_version = self.local_version,
280 selected_peers = self.pending_peers.len(),
281 "Governance version sync tick"
282 );
283
284 self.schedule_timeout(ctx).await?;
286 self.schedule_tick(ctx).await?;
287
288 Ok(())
289 }
290}
291
292#[async_trait]
293impl Actor for GovernanceVersionSync {
294 type Event = ();
295 type Message = GovernanceVersionSyncMessage;
296 type Response = GovernanceVersionSyncResponse;
297
298 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
299 parent_span.map_or_else(
300 || info_span!("GovernanceVersionSync"),
301 |parent| info_span!(parent: parent, "GovernanceVersionSync"),
302 )
303 }
304
305 async fn pre_start(
306 &mut self,
307 ctx: &mut ActorContext<Self>,
308 ) -> Result<(), ActorError> {
309 self.schedule_tick(ctx).await
310 }
311}
312
313impl NotPersistentActor for GovernanceVersionSync {}
314
315#[async_trait]
316impl Handler<Self> for GovernanceVersionSync {
317 async fn handle_message(
318 &mut self,
319 _sender: ActorPath,
320 msg: GovernanceVersionSyncMessage,
321 ctx: &mut ActorContext<Self>,
322 ) -> Result<GovernanceVersionSyncResponse, ActorError> {
323 match msg {
324 GovernanceVersionSyncMessage::RefreshGovernance {
325 version,
326 governance_peers,
327 } => {
328 self.refresh_governance(version, governance_peers);
329 }
330 GovernanceVersionSyncMessage::Tick => {
331 if let Err(error) = self.handle_tick(ctx).await {
332 warn!(
333 governance_id = %self.governance_id,
334 error = %error,
335 "Governance version sync tick failed"
336 );
337 }
338 }
339 GovernanceVersionSyncMessage::RoundTimeout => {
340 if self.round_open {
341 self.round_open = false;
342 self.pending_peers.clear();
343 if let Err(error) = self.trigger_update_if_needed(ctx).await
344 {
345 warn!(
346 governance_id = %self.governance_id,
347 error = %error,
348 "Failed to trigger governance update after round timeout"
349 );
350 }
351 }
352 }
353 GovernanceVersionSyncMessage::PeerVersion { peer, version } => {
354 if self.peer_version(peer, version) {
355 self.round_open = false;
356 if let Err(error) = self.trigger_update_if_needed(ctx).await
357 {
358 warn!(
359 governance_id = %self.governance_id,
360 error = %error,
361 "Failed to trigger governance update after round completion"
362 );
363 }
364 }
365 }
366 }
367
368 Ok(GovernanceVersionSyncResponse::None)
369 }
370}