1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use ave_actors::{
7 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
8 NotPersistentActor, Response,
9};
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use rand::seq::IteratorRandom;
12use tracing::{Span, debug, info_span, warn};
13
14use crate::auth::{Auth, AuthMessage, AuthResponse};
15use crate::helpers::network::{
16 ActorMessage, NetworkMessage, service::NetworkSender,
17};
18use network::ComunicateInfo;
19
20#[derive(Debug, Clone, Eq, PartialEq)]
21pub struct UpdateTarget {
22 pub peer: PublicKey,
23 pub version: u64,
24}
25
26#[derive(Debug, Clone)]
27pub enum GovernanceVersionSyncMessage {
28 RefreshGovernance {
29 version: u64,
30 governance_peers: HashSet<PublicKey>,
31 },
32 Tick,
33 RoundTimeout,
34 PeerVersion {
35 peer: PublicKey,
36 version: u64,
37 },
38}
39
40impl Message for GovernanceVersionSyncMessage {}
41
42#[derive(Debug, Clone)]
43pub enum GovernanceVersionSyncResponse {
44 None,
45}
46
47impl Response for GovernanceVersionSyncResponse {}
48
49pub struct GovernanceVersionSync {
50 governance_id: DigestIdentifier,
51 our_key: Arc<PublicKey>,
52 network: Arc<NetworkSender>,
53 local_version: u64,
54 sample_size: usize,
55 tick_interval: Duration,
56 response_timeout: Duration,
57 governance_peers: HashSet<PublicKey>,
58 pending_peers: HashSet<PublicKey>,
59 update_target: Option<UpdateTarget>,
60 round_open: bool,
61}
62
63impl GovernanceVersionSync {
64 pub fn new(
65 governance_id: DigestIdentifier,
66 our_key: Arc<PublicKey>,
67 network: Arc<NetworkSender>,
68 local_version: u64,
69 sample_size: usize,
70 tick_interval: Duration,
71 response_timeout: Duration,
72 ) -> Self {
73 Self {
74 governance_id,
75 our_key,
76 network,
77 local_version,
78 sample_size: sample_size.max(1),
79 tick_interval,
80 response_timeout,
81 governance_peers: HashSet::new(),
82 pending_peers: HashSet::new(),
83 update_target: None,
84 round_open: false,
85 }
86 }
87
88 async fn schedule_tick(
89 &self,
90 ctx: &ActorContext<Self>,
91 ) -> Result<(), ActorError> {
92 let actor = ctx.reference().await?;
93 let delay = self.tick_interval;
94 tokio::spawn(async move {
95 tokio::time::sleep(delay).await;
96 let _ = actor.tell(GovernanceVersionSyncMessage::Tick).await;
97 });
98 Ok(())
99 }
100
101 async fn schedule_timeout(
102 &self,
103 ctx: &ActorContext<Self>,
104 ) -> Result<(), ActorError> {
105 let actor = ctx.reference().await?;
106 let delay = self.response_timeout;
107 tokio::spawn(async move {
108 tokio::time::sleep(delay).await;
109 let _ =
110 actor.tell(GovernanceVersionSyncMessage::RoundTimeout).await;
111 });
112 Ok(())
113 }
114
115 fn refresh_governance(
116 &mut self,
117 version: u64,
118 mut governance_peers: HashSet<PublicKey>,
119 ) {
120 governance_peers.remove(&*self.our_key);
121 self.local_version = version;
122 self.governance_peers = governance_peers;
123
124 if self
125 .update_target
126 .as_ref()
127 .is_some_and(|target| target.version <= version)
128 {
129 self.update_target = None;
130 }
131 }
132
133 async fn trigger_update_if_needed(
134 &self,
135 _ctx: &ActorContext<Self>,
136 ) -> Result<(), ActorError> {
137 let Some(UpdateTarget { peer, .. }) = self.update_target.clone() else {
138 return Ok(());
139 };
140
141 let info = ComunicateInfo {
142 receiver: peer,
143 request_id: String::default(),
144 version: 0,
145 receiver_actor: format!(
146 "/user/node/distributor_{}",
147 self.governance_id
148 ),
149 };
150
151 self.network
152 .send_command(network::CommandHelper::SendMessage {
153 message: NetworkMessage {
154 info,
155 message: ActorMessage::DistributionLedgerReq {
156 actual_sn: Some(self.local_version),
157 subject_id: self.governance_id.clone(),
158 },
159 },
160 })
161 .await
162 }
163
164 async fn get_auth_peers(
165 &self,
166 ctx: &ActorContext<Self>,
167 ) -> Result<HashSet<PublicKey>, ActorError> {
168 let auth_path = ActorPath::from("/user/node/auth");
169 let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
170 match auth
171 .ask(AuthMessage::GetAuth {
172 subject_id: self.governance_id.clone(),
173 })
174 .await
175 {
176 Ok(AuthResponse::Witnesses(mut witnesses)) => {
177 witnesses.remove(&*self.our_key);
178 Ok(witnesses)
179 }
180 Ok(_) => Ok(HashSet::new()),
181 Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
182 Err(error) => Err(error),
183 }
184 }
185
186 fn select_peers(&self, auth_peers: HashSet<PublicKey>) -> Vec<PublicKey> {
187 let mut peers = self.governance_peers.clone();
188 peers.extend(auth_peers);
189 peers.remove(&*self.our_key);
190
191 if peers.is_empty() {
192 return Vec::new();
193 }
194
195 let mut rng = rand::rng();
196 peers
197 .iter()
198 .cloned()
199 .sample(&mut rng, self.sample_size.min(peers.len()))
200 }
201
202 fn peer_version(&mut self, peer: PublicKey, version: u64) -> bool {
203 if !self.round_open || !self.pending_peers.remove(&peer) {
204 return false;
205 }
206
207 if version <= self.local_version {
208 return self.pending_peers.is_empty();
209 }
210
211 let should_replace = self
212 .update_target
213 .as_ref()
214 .is_none_or(|target| version > target.version);
215 if should_replace {
216 self.update_target = Some(UpdateTarget { peer, version });
217 }
218
219 self.pending_peers.is_empty()
220 }
221
222 async fn handle_tick(
223 &mut self,
224 ctx: &ActorContext<Self>,
225 ) -> Result<(), ActorError> {
226 if self.update_target.is_some() {
227 self.schedule_tick(ctx).await?;
228 return Ok(());
229 }
230
231 let auth_peers = self.get_auth_peers(ctx).await?;
232 let peers = self.select_peers(auth_peers);
233
234 if peers.is_empty() {
235 self.schedule_tick(ctx).await?;
236 return Ok(());
237 }
238
239 self.pending_peers = peers.into_iter().collect();
240 self.round_open = !self.pending_peers.is_empty();
241
242 for peer in self.pending_peers.clone() {
243 let message = NetworkMessage {
244 info: ComunicateInfo {
245 receiver: peer.clone(),
246 request_id: String::default(),
247 version: 0,
248 receiver_actor: format!(
249 "/user/node/distributor_{}",
250 self.governance_id
251 ),
252 },
253 message: ActorMessage::GovernanceVersionReq {
254 subject_id: self.governance_id.clone(),
255 receiver_actor: ctx.path().to_string(),
256 },
257 };
258
259 if let Err(error) = self
260 .network
261 .send_command(network::CommandHelper::SendMessage { message })
262 .await
263 {
264 warn!(
265 governance_id = %self.governance_id,
266 peer = %peer,
267 error = %error,
268 "Failed to send governance version request"
269 );
270 self.pending_peers.remove(&peer);
271 }
272 }
273
274 debug!(
275 governance_id = %self.governance_id,
276 local_version = self.local_version,
277 selected_peers = self.pending_peers.len(),
278 "Governance version sync tick"
279 );
280
281 self.schedule_timeout(ctx).await?;
283 self.schedule_tick(ctx).await?;
284
285 Ok(())
286 }
287}
288
289#[async_trait]
290impl Actor for GovernanceVersionSync {
291 type Event = ();
292 type Message = GovernanceVersionSyncMessage;
293 type Response = GovernanceVersionSyncResponse;
294
295 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
296 parent_span.map_or_else(
297 || info_span!("GovernanceVersionSync"),
298 |parent| info_span!(parent: parent, "GovernanceVersionSync"),
299 )
300 }
301
302 async fn pre_start(
303 &mut self,
304 ctx: &mut ActorContext<Self>,
305 ) -> Result<(), ActorError> {
306 self.schedule_tick(ctx).await
307 }
308}
309
310impl NotPersistentActor for GovernanceVersionSync {}
311
312#[async_trait]
313impl Handler<Self> for GovernanceVersionSync {
314 async fn handle_message(
315 &mut self,
316 _sender: ActorPath,
317 msg: GovernanceVersionSyncMessage,
318 ctx: &mut ActorContext<Self>,
319 ) -> Result<GovernanceVersionSyncResponse, ActorError> {
320 match msg {
321 GovernanceVersionSyncMessage::RefreshGovernance {
322 version,
323 governance_peers,
324 } => {
325 self.refresh_governance(version, governance_peers);
326 }
327 GovernanceVersionSyncMessage::Tick => {
328 if let Err(error) = self.handle_tick(ctx).await {
329 warn!(
330 governance_id = %self.governance_id,
331 error = %error,
332 "Governance version sync tick failed"
333 );
334 }
335 }
336 GovernanceVersionSyncMessage::RoundTimeout => {
337 if self.round_open {
338 self.round_open = false;
339 self.pending_peers.clear();
340 if let Err(error) = self.trigger_update_if_needed(ctx).await
341 {
342 warn!(
343 governance_id = %self.governance_id,
344 error = %error,
345 "Failed to trigger governance update after round timeout"
346 );
347 }
348 }
349 }
350 GovernanceVersionSyncMessage::PeerVersion { peer, version } => {
351 if self.peer_version(peer, version) {
352 self.round_open = false;
353 if let Err(error) = self.trigger_update_if_needed(ctx).await
354 {
355 warn!(
356 governance_id = %self.governance_id,
357 error = %error,
358 "Failed to trigger governance update after round completion"
359 );
360 }
361 }
362 }
363 }
364
365 Ok(GovernanceVersionSyncResponse::None)
366 }
367}