Skip to main content

ave_core/governance/version_sync.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use ave_actors::{
7    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
8    NotPersistentActor, Response,
9};
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use rand::seq::IteratorRandom;
12use tracing::{Span, debug, info_span, warn};
13
14use crate::auth::{Auth, AuthMessage, AuthResponse};
15use crate::helpers::network::{
16    ActorMessage, NetworkMessage, service::NetworkSender,
17};
18use ave_network::ComunicateInfo;
19
20#[derive(Debug, Clone, Eq, PartialEq)]
21pub struct UpdateTarget {
22    pub peer: PublicKey,
23    pub version: u64,
24}
25
26#[derive(Debug, Clone)]
27pub enum GovernanceVersionSyncMessage {
28    RefreshGovernance {
29        version: u64,
30        governance_peers: HashSet<PublicKey>,
31    },
32    Tick,
33    RoundTimeout,
34    PeerVersion {
35        peer: PublicKey,
36        version: u64,
37    },
38}
39
40impl Message for GovernanceVersionSyncMessage {}
41
42#[derive(Debug, Clone)]
43pub enum GovernanceVersionSyncResponse {
44    None,
45}
46
47impl Response for GovernanceVersionSyncResponse {}
48
49pub struct GovernanceVersionSync {
50    governance_id: DigestIdentifier,
51    our_key: Arc<PublicKey>,
52    network: Arc<NetworkSender>,
53    local_version: u64,
54    sample_size: usize,
55    tick_interval: Duration,
56    response_timeout: Duration,
57    governance_peers: HashSet<PublicKey>,
58    pending_peers: HashSet<PublicKey>,
59    update_target: Option<UpdateTarget>,
60    round_open: bool,
61}
62
63impl GovernanceVersionSync {
64    pub fn new(
65        governance_id: DigestIdentifier,
66        our_key: Arc<PublicKey>,
67        network: Arc<NetworkSender>,
68        local_version: u64,
69        sample_size: usize,
70        tick_interval: Duration,
71        response_timeout: Duration,
72    ) -> Self {
73        Self {
74            governance_id,
75            our_key,
76            network,
77            local_version,
78            sample_size: sample_size.max(1),
79            tick_interval,
80            response_timeout,
81            governance_peers: HashSet::new(),
82            pending_peers: HashSet::new(),
83            update_target: None,
84            round_open: false,
85        }
86    }
87
88    async fn schedule_tick(
89        &self,
90        ctx: &ActorContext<Self>,
91    ) -> Result<(), ActorError> {
92        let actor = ctx.reference().await?;
93        let delay = self.tick_interval;
94        tokio::spawn(async move {
95            tokio::time::sleep(delay).await;
96            let _ = actor.tell(GovernanceVersionSyncMessage::Tick).await;
97        });
98        Ok(())
99    }
100
101    async fn schedule_timeout(
102        &self,
103        ctx: &ActorContext<Self>,
104    ) -> Result<(), ActorError> {
105        let actor = ctx.reference().await?;
106        let delay = self.response_timeout;
107        tokio::spawn(async move {
108            tokio::time::sleep(delay).await;
109            let _ =
110                actor.tell(GovernanceVersionSyncMessage::RoundTimeout).await;
111        });
112        Ok(())
113    }
114
115    fn refresh_governance(
116        &mut self,
117        version: u64,
118        mut governance_peers: HashSet<PublicKey>,
119    ) {
120        governance_peers.remove(&*self.our_key);
121        self.local_version = version;
122        self.governance_peers = governance_peers;
123
124        if self
125            .update_target
126            .as_ref()
127            .is_some_and(|target| target.version <= version)
128        {
129            self.update_target = None;
130        }
131    }
132
133    async fn trigger_update_if_needed(
134        &self,
135        _ctx: &ActorContext<Self>,
136    ) -> Result<(), ActorError> {
137        let Some(UpdateTarget { peer, .. }) = self.update_target.clone() else {
138            return Ok(());
139        };
140
141        let info = ComunicateInfo {
142            receiver: peer,
143            request_id: String::default(),
144            version: 0,
145            receiver_actor: format!(
146                "/user/node/distributor_{}",
147                self.governance_id
148            ),
149        };
150
151        self.network
152            .send_command(ave_network::CommandHelper::SendMessage {
153                message: NetworkMessage {
154                    info,
155                    message: ActorMessage::DistributionLedgerReq {
156                        actual_sn: Some(self.local_version),
157                        target_sn: None,
158                        subject_id: self.governance_id.clone(),
159                    },
160                },
161            })
162            .await
163    }
164
165    async fn get_auth_peers(
166        &self,
167        ctx: &ActorContext<Self>,
168    ) -> Result<HashSet<PublicKey>, ActorError> {
169        let auth_path = ActorPath::from("/user/node/auth");
170        let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
171        match auth
172            .ask(AuthMessage::GetAuth {
173                subject_id: self.governance_id.clone(),
174            })
175            .await
176        {
177            Ok(AuthResponse::Witnesses(mut witnesses)) => {
178                witnesses.remove(&*self.our_key);
179                Ok(witnesses)
180            }
181            Ok(_) => Ok(HashSet::new()),
182            Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
183            Err(error) => Err(error),
184        }
185    }
186
187    fn select_peers(&self, auth_peers: HashSet<PublicKey>) -> Vec<PublicKey> {
188        let mut peers = self.governance_peers.clone();
189        peers.extend(auth_peers);
190        peers.remove(&*self.our_key);
191
192        if peers.is_empty() {
193            return Vec::new();
194        }
195
196        let mut rng = rand::rng();
197        peers
198            .iter()
199            .cloned()
200            .sample(&mut rng, self.sample_size.min(peers.len()))
201    }
202
203    fn peer_version(&mut self, peer: PublicKey, version: u64) -> bool {
204        if !self.round_open || !self.pending_peers.remove(&peer) {
205            return false;
206        }
207
208        if version <= self.local_version {
209            return self.pending_peers.is_empty();
210        }
211
212        let should_replace = self
213            .update_target
214            .as_ref()
215            .is_none_or(|target| version > target.version);
216        if should_replace {
217            self.update_target = Some(UpdateTarget { peer, version });
218        }
219
220        self.pending_peers.is_empty()
221    }
222
223    async fn handle_tick(
224        &mut self,
225        ctx: &ActorContext<Self>,
226    ) -> Result<(), ActorError> {
227        if self.update_target.is_some() {
228            self.schedule_tick(ctx).await?;
229            return Ok(());
230        }
231
232        let auth_peers = self.get_auth_peers(ctx).await?;
233        let peers = self.select_peers(auth_peers);
234
235        if peers.is_empty() {
236            self.schedule_tick(ctx).await?;
237            return Ok(());
238        }
239
240        self.pending_peers = peers.into_iter().collect();
241        self.round_open = !self.pending_peers.is_empty();
242
243        for peer in self.pending_peers.clone() {
244            let message = NetworkMessage {
245                info: ComunicateInfo {
246                    receiver: peer.clone(),
247                    request_id: String::default(),
248                    version: 0,
249                    receiver_actor: format!(
250                        "/user/node/distributor_{}",
251                        self.governance_id
252                    ),
253                },
254                message: ActorMessage::GovernanceVersionReq {
255                    subject_id: self.governance_id.clone(),
256                    receiver_actor: ctx.path().to_string(),
257                },
258            };
259
260            if let Err(error) = self
261                .network
262                .send_command(ave_network::CommandHelper::SendMessage { message })
263                .await
264            {
265                warn!(
266                    governance_id = %self.governance_id,
267                    peer = %peer,
268                    error = %error,
269                    "Failed to send governance version request"
270                );
271                self.pending_peers.remove(&peer);
272            }
273        }
274
275        debug!(
276            governance_id = %self.governance_id,
277            local_version = self.local_version,
278            selected_peers = self.pending_peers.len(),
279            "Governance version sync tick"
280        );
281
282        // The actual network request/response path is integrated later.
283        self.schedule_timeout(ctx).await?;
284        self.schedule_tick(ctx).await?;
285
286        Ok(())
287    }
288}
289
290#[async_trait]
291impl Actor for GovernanceVersionSync {
292    type Event = ();
293    type Message = GovernanceVersionSyncMessage;
294    type Response = GovernanceVersionSyncResponse;
295
296    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
297        parent_span.map_or_else(
298            || info_span!("GovernanceVersionSync"),
299            |parent| info_span!(parent: parent, "GovernanceVersionSync"),
300        )
301    }
302
303    async fn pre_start(
304        &mut self,
305        ctx: &mut ActorContext<Self>,
306    ) -> Result<(), ActorError> {
307        self.schedule_tick(ctx).await
308    }
309}
310
311impl NotPersistentActor for GovernanceVersionSync {}
312
313#[async_trait]
314impl Handler<Self> for GovernanceVersionSync {
315    async fn handle_message(
316        &mut self,
317        _sender: ActorPath,
318        msg: GovernanceVersionSyncMessage,
319        ctx: &mut ActorContext<Self>,
320    ) -> Result<GovernanceVersionSyncResponse, ActorError> {
321        match msg {
322            GovernanceVersionSyncMessage::RefreshGovernance {
323                version,
324                governance_peers,
325            } => {
326                self.refresh_governance(version, governance_peers);
327            }
328            GovernanceVersionSyncMessage::Tick => {
329                if let Err(error) = self.handle_tick(ctx).await {
330                    warn!(
331                        governance_id = %self.governance_id,
332                        error = %error,
333                        "Governance version sync tick failed"
334                    );
335                }
336            }
337            GovernanceVersionSyncMessage::RoundTimeout => {
338                if self.round_open {
339                    self.round_open = false;
340                    self.pending_peers.clear();
341                    if let Err(error) = self.trigger_update_if_needed(ctx).await
342                    {
343                        warn!(
344                            governance_id = %self.governance_id,
345                            error = %error,
346                            "Failed to trigger governance update after round timeout"
347                        );
348                    }
349                }
350            }
351            GovernanceVersionSyncMessage::PeerVersion { peer, version } => {
352                if self.peer_version(peer, version) {
353                    self.round_open = false;
354                    if let Err(error) = self.trigger_update_if_needed(ctx).await
355                    {
356                        warn!(
357                            governance_id = %self.governance_id,
358                            error = %error,
359                            "Failed to trigger governance update after round completion"
360                        );
361                    }
362                }
363            }
364        }
365
366        Ok(GovernanceVersionSyncResponse::None)
367    }
368}