Skip to main content

ave_core/governance/
version_sync.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use ave_actors::{
7    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
8    NotPersistentActor, Response,
9};
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use rand::seq::IteratorRandom;
12use tracing::{Span, debug, info_span, warn};
13
14use crate::auth::{Auth, AuthMessage, AuthResponse};
15use crate::helpers::network::{
16    ActorMessage, NetworkMessage, service::NetworkSender,
17};
18use network::ComunicateInfo;
19
20#[derive(Debug, Clone, Eq, PartialEq)]
21pub struct UpdateTarget {
22    pub peer: PublicKey,
23    pub version: u64,
24}
25
26#[derive(Debug, Clone)]
27pub enum GovernanceVersionSyncMessage {
28    RefreshGovernance {
29        version: u64,
30        governance_peers: HashSet<PublicKey>,
31    },
32    Tick,
33    RoundTimeout,
34    PeerVersion {
35        peer: PublicKey,
36        version: u64,
37    },
38}
39
40impl Message for GovernanceVersionSyncMessage {}
41
42#[derive(Debug, Clone)]
43pub enum GovernanceVersionSyncResponse {
44    None,
45}
46
47impl Response for GovernanceVersionSyncResponse {}
48
49pub struct GovernanceVersionSync {
50    governance_id: DigestIdentifier,
51    our_key: Arc<PublicKey>,
52    network: Arc<NetworkSender>,
53    local_version: u64,
54    sample_size: usize,
55    tick_interval: Duration,
56    response_timeout: Duration,
57    governance_peers: HashSet<PublicKey>,
58    pending_peers: HashSet<PublicKey>,
59    update_target: Option<UpdateTarget>,
60    round_open: bool,
61}
62
63impl GovernanceVersionSync {
64    pub fn new(
65        governance_id: DigestIdentifier,
66        our_key: Arc<PublicKey>,
67        network: Arc<NetworkSender>,
68        local_version: u64,
69        sample_size: usize,
70        tick_interval: Duration,
71        response_timeout: Duration,
72    ) -> Self {
73        Self {
74            governance_id,
75            our_key,
76            network,
77            local_version,
78            sample_size: sample_size.max(1),
79            tick_interval,
80            response_timeout,
81            governance_peers: HashSet::new(),
82            pending_peers: HashSet::new(),
83            update_target: None,
84            round_open: false,
85        }
86    }
87
88    async fn schedule_tick(
89        &self,
90        ctx: &ActorContext<Self>,
91    ) -> Result<(), ActorError> {
92        let actor = ctx.reference().await?;
93        let delay = self.tick_interval;
94        tokio::spawn(async move {
95            tokio::time::sleep(delay).await;
96            let _ = actor.tell(GovernanceVersionSyncMessage::Tick).await;
97        });
98        Ok(())
99    }
100
101    async fn schedule_timeout(
102        &self,
103        ctx: &ActorContext<Self>,
104    ) -> Result<(), ActorError> {
105        let actor = ctx.reference().await?;
106        let delay = self.response_timeout;
107        tokio::spawn(async move {
108            tokio::time::sleep(delay).await;
109            let _ =
110                actor.tell(GovernanceVersionSyncMessage::RoundTimeout).await;
111        });
112        Ok(())
113    }
114
115    fn refresh_governance(
116        &mut self,
117        version: u64,
118        mut governance_peers: HashSet<PublicKey>,
119    ) {
120        governance_peers.remove(&*self.our_key);
121        self.local_version = version;
122        self.governance_peers = governance_peers;
123
124        if self
125            .update_target
126            .as_ref()
127            .is_some_and(|target| target.version <= version)
128        {
129            self.update_target = None;
130        }
131    }
132
133    async fn trigger_update_if_needed(
134        &self,
135        _ctx: &ActorContext<Self>,
136    ) -> Result<(), ActorError> {
137        let Some(UpdateTarget { peer, .. }) = self.update_target.clone() else {
138            return Ok(());
139        };
140
141        let info = ComunicateInfo {
142            receiver: peer,
143            request_id: String::default(),
144            version: 0,
145            receiver_actor: format!(
146                "/user/node/distributor_{}",
147                self.governance_id
148            ),
149        };
150
151        self.network
152            .send_command(network::CommandHelper::SendMessage {
153                message: NetworkMessage {
154                    info,
155                    message: ActorMessage::DistributionLedgerReq {
156                        actual_sn: Some(self.local_version),
157                        subject_id: self.governance_id.clone(),
158                    },
159                },
160            })
161            .await
162    }
163
164    async fn get_auth_peers(
165        &self,
166        ctx: &ActorContext<Self>,
167    ) -> Result<HashSet<PublicKey>, ActorError> {
168        let auth_path = ActorPath::from("/user/node/auth");
169        let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
170        match auth
171            .ask(AuthMessage::GetAuth {
172                subject_id: self.governance_id.clone(),
173            })
174            .await
175        {
176            Ok(AuthResponse::Witnesses(mut witnesses)) => {
177                witnesses.remove(&*self.our_key);
178                Ok(witnesses)
179            }
180            Ok(_) => Ok(HashSet::new()),
181            Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
182            Err(error) => Err(error),
183        }
184    }
185
186    fn select_peers(&self, auth_peers: HashSet<PublicKey>) -> Vec<PublicKey> {
187        let mut peers = self.governance_peers.clone();
188        peers.extend(auth_peers);
189        peers.remove(&*self.our_key);
190
191        if peers.is_empty() {
192            return Vec::new();
193        }
194
195        let mut rng = rand::rng();
196        peers
197            .iter()
198            .cloned()
199            .sample(&mut rng, self.sample_size.min(peers.len()))
200    }
201
202    fn peer_version(&mut self, peer: PublicKey, version: u64) -> bool {
203        if !self.round_open || !self.pending_peers.remove(&peer) {
204            return false;
205        }
206
207        if version <= self.local_version {
208            return self.pending_peers.is_empty();
209        }
210
211        let should_replace = self
212            .update_target
213            .as_ref()
214            .is_none_or(|target| version > target.version);
215        if should_replace {
216            self.update_target = Some(UpdateTarget { peer, version });
217        }
218
219        self.pending_peers.is_empty()
220    }
221
222    async fn handle_tick(
223        &mut self,
224        ctx: &ActorContext<Self>,
225    ) -> Result<(), ActorError> {
226        if self.update_target.is_some() {
227            self.schedule_tick(ctx).await?;
228            return Ok(());
229        }
230
231        let auth_peers = self.get_auth_peers(ctx).await?;
232        let peers = self.select_peers(auth_peers);
233
234        if peers.is_empty() {
235            self.schedule_tick(ctx).await?;
236            return Ok(());
237        }
238
239        self.pending_peers = peers.into_iter().collect();
240        self.round_open = !self.pending_peers.is_empty();
241
242        for peer in self.pending_peers.clone() {
243            let message = NetworkMessage {
244                info: ComunicateInfo {
245                    receiver: peer.clone(),
246                    request_id: String::default(),
247                    version: 0,
248                    receiver_actor: format!(
249                        "/user/node/distributor_{}",
250                        self.governance_id
251                    ),
252                },
253                message: ActorMessage::GovernanceVersionReq {
254                    subject_id: self.governance_id.clone(),
255                    receiver_actor: ctx.path().to_string(),
256                },
257            };
258
259            if let Err(error) = self
260                .network
261                .send_command(network::CommandHelper::SendMessage { message })
262                .await
263            {
264                warn!(
265                    governance_id = %self.governance_id,
266                    peer = %peer,
267                    error = %error,
268                    "Failed to send governance version request"
269                );
270                self.pending_peers.remove(&peer);
271            }
272        }
273
274        debug!(
275            governance_id = %self.governance_id,
276            local_version = self.local_version,
277            selected_peers = self.pending_peers.len(),
278            "Governance version sync tick"
279        );
280
281        // The actual network request/response path is integrated later.
282        self.schedule_timeout(ctx).await?;
283        self.schedule_tick(ctx).await?;
284
285        Ok(())
286    }
287}
288
289#[async_trait]
290impl Actor for GovernanceVersionSync {
291    type Event = ();
292    type Message = GovernanceVersionSyncMessage;
293    type Response = GovernanceVersionSyncResponse;
294
295    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
296        parent_span.map_or_else(
297            || info_span!("GovernanceVersionSync"),
298            |parent| info_span!(parent: parent, "GovernanceVersionSync"),
299        )
300    }
301
302    async fn pre_start(
303        &mut self,
304        ctx: &mut ActorContext<Self>,
305    ) -> Result<(), ActorError> {
306        self.schedule_tick(ctx).await
307    }
308}
309
310impl NotPersistentActor for GovernanceVersionSync {}
311
312#[async_trait]
313impl Handler<Self> for GovernanceVersionSync {
314    async fn handle_message(
315        &mut self,
316        _sender: ActorPath,
317        msg: GovernanceVersionSyncMessage,
318        ctx: &mut ActorContext<Self>,
319    ) -> Result<GovernanceVersionSyncResponse, ActorError> {
320        match msg {
321            GovernanceVersionSyncMessage::RefreshGovernance {
322                version,
323                governance_peers,
324            } => {
325                self.refresh_governance(version, governance_peers);
326            }
327            GovernanceVersionSyncMessage::Tick => {
328                if let Err(error) = self.handle_tick(ctx).await {
329                    warn!(
330                        governance_id = %self.governance_id,
331                        error = %error,
332                        "Governance version sync tick failed"
333                    );
334                }
335            }
336            GovernanceVersionSyncMessage::RoundTimeout => {
337                if self.round_open {
338                    self.round_open = false;
339                    self.pending_peers.clear();
340                    if let Err(error) = self.trigger_update_if_needed(ctx).await
341                    {
342                        warn!(
343                            governance_id = %self.governance_id,
344                            error = %error,
345                            "Failed to trigger governance update after round timeout"
346                        );
347                    }
348                }
349            }
350            GovernanceVersionSyncMessage::PeerVersion { peer, version } => {
351                if self.peer_version(peer, version) {
352                    self.round_open = false;
353                    if let Err(error) = self.trigger_update_if_needed(ctx).await
354                    {
355                        warn!(
356                            governance_id = %self.governance_id,
357                            error = %error,
358                            "Failed to trigger governance update after round completion"
359                        );
360                    }
361                }
362            }
363        }
364
365        Ok(GovernanceVersionSyncResponse::None)
366    }
367}