photon_etcd_cluster/cluster_node/mod.rs

//! Cluster membership and leader election.
//!
//! The [`ClusterNode`] manages cluster membership, leader election, and health
//! monitoring via etcd. Each worker process creates one `ClusterNode` instance.
//!
//! # Example
//!
//! ```ignore
//! use photon_etcd_cluster::ClusterNode;
//! use tokio::sync::broadcast;
//!
//! let node = ClusterNode::new(
//!     vec!["http://localhost:2379".to_string()],
//!     "worker-1".to_string(),
//!     "192.168.1.10".parse()?,
//!     "my-service".to_string(),
//!     Some(5),
//! );
//!
//! // Run node in background
//! let n = node.clone();
//! let (shutdown_tx, _) = broadcast::channel(1);
//! let mut shutdown_rx = shutdown_tx.subscribe();
//! tokio::spawn(async move {
//!     n.run(&mut shutdown_rx).await
//! });
//!
//! // Check leadership status
//! if node.is_leader() {
//!     println!("I am the leader!");
//! }
//! ```
//!
//! # With Metrics Collection
//!
//! ```ignore
//! use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
//!
//! let node = ClusterNodeBuilder::new(
//!     vec!["http://localhost:2379".to_string()],
//!     "worker-1".to_string(),
//!     "192.168.1.10".parse()?,
//!     "my-service".to_string(),
//! )
//! .ttl(5)
//! .metrics_collector(SystemMetricsCollector::new())
//! .metrics_update_interval(5)  // Update every 5 seconds
//! .build();
//! ```

mod election;
mod lease;
mod metrics_task;
mod state;

use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use etcd_client::{Client, LeaderKey, ResignOptions};
use tokio::select;
use tokio::sync::{broadcast, watch};
use tracing::{error, info};

use crate::connection::{connect_with_retry, node_key};
use crate::error::ClusterError;
use crate::metrics::{MetricsCollector, NoopMetricsCollector};
use crate::types::HealthStatus;

use state::ClusterNodeInner;

/// Manages cluster membership and leader election via etcd.
///
/// Each worker process creates one `ClusterNode` instance that:
/// - Registers the worker in the cluster node registry
/// - Participates in leader election
/// - Maintains health status via lease keep-alive
#[derive(Clone)]
pub struct ClusterNode {
    inner: Arc<ClusterNodeInner>,
}

impl ClusterNode {
    /// Creates a new `ClusterNode`.
    ///
    /// # Arguments
    /// * `etcd_endpoints` - List of etcd endpoints (e.g., `["http://localhost:2379"]`)
    /// * `node_id` - Unique identifier for this node
    /// * `node_ip` - IP address of this node (used for service discovery)
    /// * `group_name` - Logical group name for cluster membership
    /// * `ttl` - Lease TTL in seconds (default: 5)
    ///
    /// # Panics
    /// Panics if `etcd_endpoints` is empty or contains invalid URLs.
    pub fn new(
        etcd_endpoints: Vec<String>,
        node_id: String,
        node_ip: IpAddr,
        group_name: String,
        ttl: Option<i64>,
    ) -> Self {
        assert!(!etcd_endpoints.is_empty(), "etcd_endpoints cannot be empty");
        for endpoint in &etcd_endpoints {
            assert!(
                endpoint.starts_with("http://") || endpoint.starts_with("https://"),
                "Invalid etcd endpoint '{}': must start with http:// or https://",
                endpoint
            );
        }

        ClusterNode {
            inner: Arc::new(ClusterNodeInner::new(
                etcd_endpoints,
                node_id,
                node_ip,
                group_name,
                ttl.unwrap_or(5),
            )),
        }
    }

    /// Returns the current health status of the node's etcd connection.
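    ///
    /// # Example
    ///
    /// A minimal sketch of using the status as a readiness signal; it assumes
    /// `HealthStatus` is exported alongside `ClusterNode`:
    ///
    /// ```ignore
    /// use photon_etcd_cluster::HealthStatus;
    ///
    /// let ready = matches!(node.current_health(), HealthStatus::Healthy);
    /// ```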
    pub fn current_health(&self) -> HealthStatus {
        self.inner.node_health.load()
    }

    /// Returns true if this node is currently the leader.
    pub fn is_leader(&self) -> bool {
        self.inner.is_leader.load(Ordering::Acquire)
    }

    pub(in crate::cluster_node) fn set_unhealthy(&self) {
        self.inner.node_health.store(HealthStatus::Unhealthy);
    }

    pub(in crate::cluster_node) fn set_healthy(&self) {
        self.inner.node_health.store(HealthStatus::Healthy);
    }

    // Accessors for submodules (visibility limited to cluster_node module)
    #[inline]
    pub(in crate::cluster_node) fn inner(&self) -> &ClusterNodeInner {
        &self.inner
    }

    #[inline]
    pub(in crate::cluster_node) fn set_leader(&self, is_leader: bool) {
        self.inner.is_leader.store(is_leader, Ordering::Release);
    }

    /// Starts the node's main loop.
    ///
    /// This method runs until a shutdown signal is received, maintaining
    /// cluster membership and participating in leader election. It
    /// automatically reconnects on connection failures.
    ///
    /// If a metrics collector and update interval are configured, a background
    /// task will periodically update the node's metrics in etcd.
    ///
    /// # Arguments
    /// * `shutdown` - Shutdown signal receiver. When a message is received,
    ///   the node will gracefully resign leadership and exit.
    ///
    /// # Returns
    /// Returns `Ok(())` on graceful shutdown, `Err` if unable to connect to etcd.
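    ///
    /// # Example
    ///
    /// A minimal sketch of driving the loop and shutting it down, assuming a
    /// `node` created as in the module-level example:
    ///
    /// ```ignore
    /// use tokio::sync::broadcast;
    ///
    /// let (shutdown_tx, _) = broadcast::channel(1);
    /// let mut shutdown_rx = shutdown_tx.subscribe();
    ///
    /// let n = node.clone();
    /// let handle = tokio::spawn(async move { n.run(&mut shutdown_rx).await });
    ///
    /// // ... later: signal shutdown; `run` resigns leadership and returns Ok(()).
    /// let _ = shutdown_tx.send(());
    /// let _ = handle.await;
    /// ```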
    pub async fn run(&self, shutdown: &mut broadcast::Receiver<()>) -> Result<(), ClusterError> {
        let (leader_tx, leader_rx) = watch::channel(None);
        let (lease_healthy_tx, mut lease_healthy_rx) = watch::channel(true);
        let (metrics_healthy_tx, mut metrics_healthy_rx) = watch::channel(true);

        loop {
            info!("🔵 Starting main loop ({})", self.inner.node_id);
            let _ = leader_tx.send(None);
            self.set_leader(false); // Reset leader state at start of each loop

            // Reset lease health for new iteration (only if it was false)
            // Use send_if_modified to avoid triggering change notification if already true
            lease_healthy_tx.send_if_modified(|val| {
                if !*val {
                    *val = true;
                    true
                } else {
                    false
                }
            });
            metrics_healthy_tx.send_if_modified(|val| {
                if !*val {
                    *val = true;
                    true
                } else {
                    false
                }
            });

            let client_watcher = shutdown.resubscribe();
            let client = connect_with_retry(&self.inner.etcd_endpoints, Some(client_watcher)).await;
            let Some(mut client) = client else {
                return Err(ClusterError::Internal("Failed to connect to etcd".into()));
            };

            let (lease_id, node_lease_ka) =
                match lease::create_lease(self, &mut client, self.inner.ttl, lease_healthy_tx.clone())
                    .await
                {
                    Ok((lease_id, keepalive)) => (lease_id, keepalive),
                    Err(e) => {
                        error!("Failed to create node lease: {}", e);
                        self.set_unhealthy();
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        continue;
                    }
                };

            match lease::register_local_node(self, &mut client, lease_id).await {
                Ok(_) => {
                    info!("Node registered: {}", self.inner.node_id);
                    self.set_healthy();
                }
                Err(e) => {
                    error!("Failed to register local node: {}", e);
                    self.set_unhealthy();
                    node_lease_ka.abort();
                    // Await the aborted task to ensure cleanup completes
                    let _ = node_lease_ka.await;
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }
            }

            // Spawn metrics update task if configured
            let metrics_task =
                metrics_task::spawn_metrics_update_task(self, lease_id, metrics_healthy_tx.clone());

            let self_clone = self.clone();
            let leader_tx_copy = leader_tx.clone();
            let shutdown_watcher = shutdown.resubscribe();

            // Mark the initial values as seen so changed() doesn't trigger immediately
            // borrow_and_update() marks the current value as seen
            let _ = lease_healthy_rx.borrow_and_update();
            let _ = metrics_healthy_rx.borrow_and_update();

            select! {
                _ = shutdown.recv() => {
                    node_lease_ka.abort();
                    metrics_task::abort_metrics_task(metrics_task).await;
                    // Await the aborted task to ensure cleanup completes
                    let _ = node_lease_ka.await;

                    self.leave(&mut client, lease_id).await.ok();

                    let leader = leader_rx.borrow().clone();
                    if let Some(leader) = leader {
                        self.resign(&mut client, leader.clone()).await.ok();
                    }
                    self.set_leader(false); // Clear leader state on shutdown
                    info!("ClusterNode shutdown complete ({})", self.inner.node_id);
                    return Ok(());
                },
                // React to lease health changes
                _ = lease_healthy_rx.changed() => {
                    if !*lease_healthy_rx.borrow() {
                        error!("Lease keep-alive failed - triggering reconnection");
                        node_lease_ka.abort();
                        metrics_task::abort_metrics_task(metrics_task).await;
                        let _ = node_lease_ka.await;

                        let leader = leader_rx.borrow().clone();
                        if let Some(leader) = leader {
                            self.resign(&mut client, leader.clone()).await.ok();
                        }

                        self.set_unhealthy();
                        self.set_leader(false);
                        self.leave(&mut client, lease_id).await.ok();
                        continue;
                    }
                },
                // React to metrics update health changes
                _ = metrics_healthy_rx.changed() => {
                    if !*metrics_healthy_rx.borrow() {
                        error!("Metrics update failed repeatedly - triggering reconnection");
                        node_lease_ka.abort();
                        metrics_task::abort_metrics_task(metrics_task).await;
                        let _ = node_lease_ka.await;

                        let leader = leader_rx.borrow().clone();
                        if let Some(leader) = leader {
                            self.resign(&mut client, leader.clone()).await.ok();
                        }

                        self.set_unhealthy();
                        self.set_leader(false);
                        self.leave(&mut client, lease_id).await.ok();
                        continue;
                    }
                },
                res = election::run_election_loop(&self_clone, lease_id, leader_tx_copy, shutdown_watcher) => {
                    if let Err(e) = res {
                        error!("Election loop error: {}", e);

                        let leader = leader_rx.borrow().clone();
                        if let Some(leader) = leader {
                            self.resign(&mut client, leader.clone()).await.ok();
                        }

                        self.set_unhealthy();
                        self.set_leader(false); // Clear leader state on error
                        node_lease_ka.abort();
                        metrics_task::abort_metrics_task(metrics_task).await;
                        // Await the aborted task to ensure cleanup completes
                        let _ = node_lease_ka.await;
                        self.leave(&mut client, lease_id).await.ok();
                        continue;
                    }
                },
            }
        }
    }

    /// Removes this node from the cluster registry.
    pub async fn leave(&self, client: &mut Client, lease_id: i64) -> Result<(), ClusterError> {
        // Delete node registration
        let key = node_key(&self.inner.group_name, &self.inner.node_id);
        if let Err(e) = client.delete(key, None).await {
            error!("Failed to delete node registration: {}", e);
        }
        self.set_unhealthy();
        self.set_leader(false); // Clear leader state when leaving
        if let Err(e) = client.lease_revoke(lease_id).await {
            error!("Failed to revoke lease: {}", e);
        }

        Ok(())
    }

    /// Resigns from leadership.
    pub async fn resign(&self, client: &mut Client, leader: LeaderKey) -> Result<(), ClusterError> {
        let resign_option = ResignOptions::new().with_leader(leader);
        client.resign(Some(resign_option)).await.map_err(|e| {
            ClusterError::Internal(format!("Failed to resign from election: {}", e))
        })?;
        self.set_leader(false); // Clear leader state when resigning
        Ok(())
    }
}

/// Builder for creating [`ClusterNode`] instances with custom configuration.
///
/// This builder provides a fluent API for configuring cluster nodes with
/// optional features like metrics collection.
///
/// # Example
///
/// ```ignore
/// use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
///
/// // Basic usage (no metrics)
/// let node = ClusterNodeBuilder::new(
///     vec!["http://localhost:2379".to_string()],
///     "worker-1".to_string(),
///     "192.168.1.10".parse()?,
///     "my-service".to_string(),
/// )
/// .ttl(5)
/// .build();
///
/// // With system metrics
/// let node = ClusterNodeBuilder::new(
///     vec!["http://localhost:2379".to_string()],
///     "worker-1".to_string(),
///     "192.168.1.10".parse()?,
///     "my-service".to_string(),
/// )
/// .ttl(5)
/// .metrics_collector(SystemMetricsCollector::new())
/// .metrics_update_interval(5)  // Update every 5 seconds
/// .build();
/// ```
pub struct ClusterNodeBuilder {
    etcd_endpoints: Vec<String>,
    node_id: String,
    node_ip: IpAddr,
    group_name: String,
    ttl: i64,
    metrics_collector: Arc<dyn MetricsCollector>,
    metrics_update_interval: u64,
}

impl ClusterNodeBuilder {
    /// Creates a new builder with required parameters.
    ///
    /// # Arguments
    ///
    /// * `etcd_endpoints` - List of etcd endpoints (e.g., `["http://localhost:2379"]`)
    /// * `node_id` - Unique identifier for this node
    /// * `node_ip` - IP address of this node (used for service discovery)
    /// * `group_name` - Logical group name for cluster membership
    ///
    /// # Panics
    ///
    /// Panics if `etcd_endpoints` is empty or contains invalid URLs.
    pub fn new(
        etcd_endpoints: Vec<String>,
        node_id: String,
        node_ip: IpAddr,
        group_name: String,
    ) -> Self {
        assert!(!etcd_endpoints.is_empty(), "etcd_endpoints cannot be empty");
        for endpoint in &etcd_endpoints {
            assert!(
                endpoint.starts_with("http://") || endpoint.starts_with("https://"),
                "Invalid etcd endpoint '{}': must start with http:// or https://",
                endpoint
            );
        }

        Self {
            etcd_endpoints,
            node_id,
            node_ip,
            group_name,
            ttl: 5, // Default TTL
            metrics_collector: Arc::new(NoopMetricsCollector::new()),
            metrics_update_interval: 0, // Disabled by default
        }
    }

    /// Sets the lease TTL in seconds.
    ///
    /// The TTL determines how long etcd will wait before considering the node
    /// as failed if heartbeats stop. Default is 5 seconds.
    ///
    /// # Arguments
    ///
    /// * `ttl` - Lease TTL in seconds (must be > 0)
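    ///
    /// # Example
    ///
    /// A sketch of a more tolerant configuration, assuming the same builder
    /// arguments as in the type-level example:
    ///
    /// ```ignore
    /// // With a 10-second TTL the node's registration survives up to ~10s
    /// // of missed heartbeats before etcd expires the lease.
    /// let node = ClusterNodeBuilder::new(endpoints, id, ip, group)
    ///     .ttl(10)
    ///     .build();
    /// ```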
    pub fn ttl(mut self, ttl: i64) -> Self {
        assert!(ttl > 0, "TTL must be positive");
        self.ttl = ttl;
        self
    }

    /// Sets a custom metrics collector.
    ///
    /// The collector will be called periodically to gather metrics that are
    /// stored in etcd alongside the node registration. These metrics can be
    /// used by load balancers for weighted routing decisions.
    ///
    /// By default, a [`NoopMetricsCollector`] is used, which produces no metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
    ///
    /// let node = ClusterNodeBuilder::new(endpoints, id, ip, group)
    ///     .metrics_collector(SystemMetricsCollector::new())
    ///     .metrics_update_interval(5)
    ///     .build();
    /// ```
    pub fn metrics_collector<C: MetricsCollector + 'static>(mut self, collector: C) -> Self {
        self.metrics_collector = Arc::new(collector);
        self
    }

    /// Sets the interval in seconds for metrics updates.
    ///
    /// When set to a value > 0, a background task will periodically update
    /// the node's metrics in etcd at the specified interval.
    ///
    /// Set to 0 (default) to disable periodic metrics updates. In this case,
    /// metrics will only be set during initial registration.
    ///
    /// Recommended value: the same as or slightly less than the TTL to ensure
    /// fresh metrics are available for load balancing decisions.
    ///
    /// # Arguments
    ///
    /// * `interval` - Update interval in seconds (0 = disabled)
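    ///
    /// # Example
    ///
    /// A sketch following the recommendation above, assuming the same builder
    /// arguments as in the type-level example:
    ///
    /// ```ignore
    /// let node = ClusterNodeBuilder::new(endpoints, id, ip, group)
    ///     .ttl(5)
    ///     .metrics_collector(SystemMetricsCollector::new())
    ///     .metrics_update_interval(4) // refresh just under the 5s lease TTL
    ///     .build();
    /// ```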
    pub fn metrics_update_interval(mut self, interval: u64) -> Self {
        self.metrics_update_interval = interval;
        self
    }

    /// Builds the [`ClusterNode`] with the configured options.
    pub fn build(self) -> ClusterNode {
        if self.metrics_update_interval > 0 {
            info!(
                "ClusterNode configured with metrics collector (update_interval={}s)",
                self.metrics_update_interval
            );
        }

        ClusterNode {
            inner: Arc::new(ClusterNodeInner::with_metrics(
                self.etcd_endpoints,
                self.node_id,
                self.node_ip,
                self.group_name,
                self.ttl,
                self.metrics_collector,
                self.metrics_update_interval,
            )),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_leader_state_tracking() {
        let node = ClusterNode::new(
            vec!["http://localhost:2379".to_string()],
            "test-node".to_string(),
            "127.0.0.1".parse().unwrap(),
            "test-group".to_string(),
            Some(5),
        );

        // Initially not a leader
        assert!(!node.is_leader());

        // Simulate becoming a leader
        node.set_leader(true);
        assert!(node.is_leader());

        // Simulate stepping down
        node.set_leader(false);
        assert!(!node.is_leader());
    }

    #[test]
    #[should_panic(expected = "etcd_endpoints cannot be empty")]
    fn test_empty_endpoints_panics() {
        let _ = ClusterNode::new(
            vec![],
            "test".to_string(),
            "127.0.0.1".parse().unwrap(),
            "test".to_string(),
            None,
        );
    }

    #[test]
    #[should_panic(expected = "Invalid etcd endpoint")]
    fn test_invalid_endpoint_panics() {
        let _ = ClusterNode::new(
            vec!["localhost:2379".to_string()],
            "test".to_string(),
            "127.0.0.1".parse().unwrap(),
            "test".to_string(),
            None,
        );
    }
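
    // A minimal addition mirroring the panic tests above: `ClusterNodeBuilder::ttl`
    // asserts that the TTL is positive, so a zero TTL should panic.
    #[test]
    #[should_panic(expected = "TTL must be positive")]
    fn test_zero_ttl_panics() {
        let _ = ClusterNodeBuilder::new(
            vec!["http://localhost:2379".to_string()],
            "test".to_string(),
            "127.0.0.1".parse().unwrap(),
            "test".to_string(),
        )
        .ttl(0);
    }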
}