Skip to main content

ave_core/governance/
version_sync.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use ave_actors::{
7    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
8    NotPersistentActor, Response,
9};
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use rand::seq::IteratorRandom;
12use tracing::{Span, debug, info_span, warn};
13
14use crate::auth::{Auth, AuthMessage, AuthResponse};
15use crate::helpers::network::{
16    ActorMessage, NetworkMessage, service::NetworkSender,
17};
18use ave_network::ComunicateInfo;
19
20#[derive(Debug, Clone, Eq, PartialEq)]
21pub struct UpdateTarget {
22    pub peer: PublicKey,
23    pub version: u64,
24}
25
26#[derive(Debug, Clone)]
27pub enum GovernanceVersionSyncMessage {
28    RefreshGovernance {
29        version: u64,
30        governance_peers: HashSet<PublicKey>,
31    },
32    Tick,
33    RoundTimeout,
34    PeerVersion {
35        peer: PublicKey,
36        version: u64,
37    },
38}
39
40impl Message for GovernanceVersionSyncMessage {}
41
/// Reply returned by the governance version sync actor.
///
/// Every message handled by the actor is answered with
/// [`GovernanceVersionSyncResponse::None`]; the reply carries no data.
/// `PartialEq`/`Eq` are derived so callers and tests can compare replies.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GovernanceVersionSyncResponse {
    /// Empty acknowledgement.
    None,
}
46
47impl Response for GovernanceVersionSyncResponse {}
48
49pub struct GovernanceVersionSync {
50    governance_id: DigestIdentifier,
51    our_key: Arc<PublicKey>,
52    network: Arc<NetworkSender>,
53    local_version: u64,
54    sample_size: usize,
55    tick_interval: Duration,
56    response_timeout: Duration,
57    governance_peers: HashSet<PublicKey>,
58    pending_peers: HashSet<PublicKey>,
59    update_target: Option<UpdateTarget>,
60    round_open: bool,
61}
62
63impl GovernanceVersionSync {
64    pub fn new(
65        governance_id: DigestIdentifier,
66        our_key: Arc<PublicKey>,
67        network: Arc<NetworkSender>,
68        local_version: u64,
69        sample_size: usize,
70        tick_interval: Duration,
71        response_timeout: Duration,
72    ) -> Self {
73        Self {
74            governance_id,
75            our_key,
76            network,
77            local_version,
78            sample_size: sample_size.max(1),
79            tick_interval,
80            response_timeout,
81            governance_peers: HashSet::new(),
82            pending_peers: HashSet::new(),
83            update_target: None,
84            round_open: false,
85        }
86    }
87
88    async fn schedule_tick(
89        &self,
90        ctx: &ActorContext<Self>,
91    ) -> Result<(), ActorError> {
92        let actor = ctx.reference().await?;
93        let delay = self.tick_interval;
94        tokio::spawn(async move {
95            tokio::time::sleep(delay).await;
96            let _ = actor.tell(GovernanceVersionSyncMessage::Tick).await;
97        });
98        Ok(())
99    }
100
101    async fn schedule_timeout(
102        &self,
103        ctx: &ActorContext<Self>,
104    ) -> Result<(), ActorError> {
105        let actor = ctx.reference().await?;
106        let delay = self.response_timeout;
107        tokio::spawn(async move {
108            tokio::time::sleep(delay).await;
109            let _ =
110                actor.tell(GovernanceVersionSyncMessage::RoundTimeout).await;
111        });
112        Ok(())
113    }
114
115    fn refresh_governance(
116        &mut self,
117        version: u64,
118        mut governance_peers: HashSet<PublicKey>,
119    ) {
120        governance_peers.remove(&*self.our_key);
121        self.local_version = version;
122        self.governance_peers = governance_peers;
123
124        if self
125            .update_target
126            .as_ref()
127            .is_some_and(|target| target.version <= version)
128        {
129            self.update_target = None;
130        }
131    }
132
133    async fn trigger_update_if_needed(
134        &self,
135        _ctx: &ActorContext<Self>,
136    ) -> Result<(), ActorError> {
137        let Some(UpdateTarget { peer, .. }) = self.update_target.clone() else {
138            return Ok(());
139        };
140
141        let info = ComunicateInfo {
142            receiver: peer,
143            request_id: String::default(),
144            version: 0,
145            receiver_actor: format!(
146                "/user/node/distributor_{}",
147                self.governance_id
148            ),
149        };
150
151        self.network
152            .send_command(ave_network::CommandHelper::SendMessage {
153                message: NetworkMessage {
154                    info,
155                    message: ActorMessage::DistributionLedgerReq {
156                        actual_sn: Some(self.local_version),
157                        target_sn: None,
158                        subject_id: self.governance_id.clone(),
159                    },
160                },
161            })
162            .await
163    }
164
165    async fn get_auth_peers(
166        &self,
167        ctx: &ActorContext<Self>,
168    ) -> Result<HashSet<PublicKey>, ActorError> {
169        let auth_path = ActorPath::from("/user/node/auth");
170        let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
171        match auth
172            .ask(AuthMessage::GetAuth {
173                subject_id: self.governance_id.clone(),
174            })
175            .await
176        {
177            Ok(AuthResponse::Witnesses(mut witnesses)) => {
178                witnesses.remove(&*self.our_key);
179                Ok(witnesses)
180            }
181            Ok(_) => Ok(HashSet::new()),
182            Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
183            Err(error) => Err(error),
184        }
185    }
186
187    fn select_peers(&self, auth_peers: HashSet<PublicKey>) -> Vec<PublicKey> {
188        let mut peers = self.governance_peers.clone();
189        peers.extend(auth_peers);
190        peers.remove(&*self.our_key);
191
192        if peers.is_empty() {
193            return Vec::new();
194        }
195
196        let mut rng = rand::rng();
197        peers
198            .iter()
199            .cloned()
200            .sample(&mut rng, self.sample_size.min(peers.len()))
201    }
202
203    fn peer_version(&mut self, peer: PublicKey, version: u64) -> bool {
204        if !self.round_open || !self.pending_peers.remove(&peer) {
205            return false;
206        }
207
208        if version <= self.local_version {
209            return self.pending_peers.is_empty();
210        }
211
212        let should_replace = self
213            .update_target
214            .as_ref()
215            .is_none_or(|target| version > target.version);
216        if should_replace {
217            self.update_target = Some(UpdateTarget { peer, version });
218        }
219
220        self.pending_peers.is_empty()
221    }
222
223    async fn handle_tick(
224        &mut self,
225        ctx: &ActorContext<Self>,
226    ) -> Result<(), ActorError> {
227        if self.update_target.is_some() {
228            self.schedule_tick(ctx).await?;
229            return Ok(());
230        }
231
232        let auth_peers = self.get_auth_peers(ctx).await?;
233        let peers = self.select_peers(auth_peers);
234
235        if peers.is_empty() {
236            self.schedule_tick(ctx).await?;
237            return Ok(());
238        }
239
240        self.pending_peers = peers.into_iter().collect();
241        self.round_open = !self.pending_peers.is_empty();
242
243        for peer in self.pending_peers.clone() {
244            let message = NetworkMessage {
245                info: ComunicateInfo {
246                    receiver: peer.clone(),
247                    request_id: String::default(),
248                    version: 0,
249                    receiver_actor: format!(
250                        "/user/node/distributor_{}",
251                        self.governance_id
252                    ),
253                },
254                message: ActorMessage::GovernanceVersionReq {
255                    subject_id: self.governance_id.clone(),
256                    receiver_actor: ctx.path().to_string(),
257                },
258            };
259
260            if let Err(error) = self
261                .network
262                .send_command(ave_network::CommandHelper::SendMessage {
263                    message,
264                })
265                .await
266            {
267                warn!(
268                    governance_id = %self.governance_id,
269                    peer = %peer,
270                    error = %error,
271                    "Failed to send governance version request"
272                );
273                self.pending_peers.remove(&peer);
274            }
275        }
276
277        debug!(
278            governance_id = %self.governance_id,
279            local_version = self.local_version,
280            selected_peers = self.pending_peers.len(),
281            "Governance version sync tick"
282        );
283
284        // The actual network request/response path is integrated later.
285        self.schedule_timeout(ctx).await?;
286        self.schedule_tick(ctx).await?;
287
288        Ok(())
289    }
290}
291
292#[async_trait]
293impl Actor for GovernanceVersionSync {
294    type Event = ();
295    type Message = GovernanceVersionSyncMessage;
296    type Response = GovernanceVersionSyncResponse;
297
298    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
299        parent_span.map_or_else(
300            || info_span!("GovernanceVersionSync"),
301            |parent| info_span!(parent: parent, "GovernanceVersionSync"),
302        )
303    }
304
305    async fn pre_start(
306        &mut self,
307        ctx: &mut ActorContext<Self>,
308    ) -> Result<(), ActorError> {
309        self.schedule_tick(ctx).await
310    }
311}
312
313impl NotPersistentActor for GovernanceVersionSync {}
314
315#[async_trait]
316impl Handler<Self> for GovernanceVersionSync {
317    async fn handle_message(
318        &mut self,
319        _sender: ActorPath,
320        msg: GovernanceVersionSyncMessage,
321        ctx: &mut ActorContext<Self>,
322    ) -> Result<GovernanceVersionSyncResponse, ActorError> {
323        match msg {
324            GovernanceVersionSyncMessage::RefreshGovernance {
325                version,
326                governance_peers,
327            } => {
328                self.refresh_governance(version, governance_peers);
329            }
330            GovernanceVersionSyncMessage::Tick => {
331                if let Err(error) = self.handle_tick(ctx).await {
332                    warn!(
333                        governance_id = %self.governance_id,
334                        error = %error,
335                        "Governance version sync tick failed"
336                    );
337                }
338            }
339            GovernanceVersionSyncMessage::RoundTimeout => {
340                if self.round_open {
341                    self.round_open = false;
342                    self.pending_peers.clear();
343                    if let Err(error) = self.trigger_update_if_needed(ctx).await
344                    {
345                        warn!(
346                            governance_id = %self.governance_id,
347                            error = %error,
348                            "Failed to trigger governance update after round timeout"
349                        );
350                    }
351                }
352            }
353            GovernanceVersionSyncMessage::PeerVersion { peer, version } => {
354                if self.peer_version(peer, version) {
355                    self.round_open = false;
356                    if let Err(error) = self.trigger_update_if_needed(ctx).await
357                    {
358                        warn!(
359                            governance_id = %self.governance_id,
360                            error = %error,
361                            "Failed to trigger governance update after round completion"
362                        );
363                    }
364                }
365            }
366        }
367
368        Ok(GovernanceVersionSyncResponse::None)
369    }
370}