//! Leader election primitives for `ranvier_runtime` (cluster.rs).

1use async_trait::async_trait;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5use tokio::time::sleep;
6use tracing::{info, warn};
7
8// In a real scenario, this might use ranvier_core::cluster::DistributedLock or the local one in distributed.rs
9use crate::distributed::{DistributedError, DistributedLock, LockOptions};
10
/// Interface for components participating in cluster leader election.
#[async_trait]
pub trait LeaderElection: Send + Sync {
    /// Attempt to become the leader or renew leadership if already the leader.
    ///
    /// Returns `Ok(true)` when this node holds leadership after the call,
    /// `Ok(false)` when leadership is held elsewhere, and `Err` for
    /// infrastructure failures that are neither a win nor a loss.
    async fn try_become_leader(&self) -> Result<bool, DistributedError>;

    /// Returns true if this node is currently the elected leader.
    ///
    /// This is a local view; it is only as fresh as the most recent call to
    /// `try_become_leader` or `step_down`.
    fn is_leader(&self) -> bool;

    /// Step down from leadership explicitly.
    async fn step_down(&self) -> Result<(), DistributedError>;
}
23
/// A standard implementation of leader election using a distributed lock.
pub struct LockBasedElection<L: DistributedLock> {
    // The distributed lock used to arbitrate leadership.
    lock: Arc<L>,
    // Identifier of this node; used only in log messages in this file.
    node_id: String,
    // Key under which the leadership lock is acquired.
    resource_key: String,
    // Local cache of leadership status, updated on acquire / step-down.
    is_leader: Arc<AtomicBool>,
    // Lock time-to-live passed to each acquisition attempt.
    ttl_ms: u64,
}
32
33impl<L: DistributedLock> LockBasedElection<L> {
34    pub fn new(lock: Arc<L>, node_id: String, resource_key: String, ttl_ms: u64) -> Self {
35        Self {
36            lock,
37            node_id,
38            resource_key,
39            is_leader: Arc::new(AtomicBool::new(false)),
40            ttl_ms,
41        }
42    }
43}
44
45#[async_trait]
46impl<L: DistributedLock> LeaderElection for LockBasedElection<L> {
47    async fn try_become_leader(&self) -> Result<bool, DistributedError> {
48        let opts = LockOptions {
49            ttl_ms: self.ttl_ms,
50            retry_count: 0,
51            retry_delay_ms: 0,
52        };
53
54        match self.lock.acquire(&self.resource_key, opts).await {
55            Ok(_guard) => {
56                if !self.is_leader.load(Ordering::Relaxed) {
57                    info!("Node {} became the cluster leader", self.node_id);
58                    self.is_leader.store(true, Ordering::SeqCst);
59                }
60                // We don't save the guard here in this simple implementation,
61                // but a production version would keep the guard to extend it later.
62                Ok(true)
63            }
64            Err(DistributedError::LockError(_)) => {
65                if self.is_leader.load(Ordering::Relaxed) {
66                    warn!("Node {} lost leadership!", self.node_id);
67                    self.is_leader.store(false, Ordering::SeqCst);
68                }
69                Ok(false)
70            }
71            Err(e) => Err(e),
72        }
73    }
74
75    fn is_leader(&self) -> bool {
76        self.is_leader.load(Ordering::Relaxed)
77    }
78
79    async fn step_down(&self) -> Result<(), DistributedError> {
80        if self.is_leader.swap(false, Ordering::SeqCst) {
81            info!("Node {} stepping down from leadership", self.node_id);
82            // In a real implementation, we would release the guard here.
83        }
84        Ok(())
85    }
86}
87
/// Manages the background task that periodically attempts to renew leadership.
pub struct ClusterManager {
    // Hidden internal structures for background task handle
    // (currently empty: `start_election_loop` hands the JoinHandle back to
    // the caller instead of storing it here).
}
92
93impl ClusterManager {
94    /// Starts a background task that periodically polls the leader election
95    pub fn start_election_loop<E: LeaderElection + 'static>(
96        election: Arc<E>,
97        interval: Duration,
98    ) -> tokio::task::JoinHandle<()> {
99        tokio::spawn(async move {
100            loop {
101                match election.try_become_leader().await {
102                    Ok(true) => {
103                        // Successfully became or renewed leader.
104                    }
105                    Ok(false) => {
106                        // Not the leader.
107                    }
108                    Err(err) => {
109                        warn!("Error in leader election: {}", err);
110                    }
111                }
112                sleep(interval).await;
113            }
114        })
115    }
116}