photon_etcd_cluster/cluster_node/mod.rs
//! Cluster membership and leader election.
//!
//! The [`ClusterNode`] manages cluster membership, leader election, and health
//! monitoring via etcd. Each worker process creates one `ClusterNode` instance.
//!
//! # Example
//!
//! ```ignore
//! use photon_etcd_cluster::ClusterNode;
//! use tokio::sync::broadcast;
//!
//! let node = ClusterNode::new(
//!     vec!["http://localhost:2379".to_string()],
//!     "worker-1".to_string(),
//!     "192.168.1.10".parse()?,
//!     "my-service".to_string(),
//!     Some(5),
//! );
//!
//! // Run node in background
//! let n = node.clone();
//! let (shutdown_tx, _) = broadcast::channel(1);
//! let mut shutdown_rx = shutdown_tx.subscribe();
//! tokio::spawn(async move {
//!     n.run(&mut shutdown_rx).await
//! });
//!
//! // Check leadership status
//! if node.is_leader() {
//!     println!("I am the leader!");
//! }
//! ```
//!
//! # With Metrics Collection
//!
//! ```ignore
//! use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
//!
//! let node = ClusterNodeBuilder::new(
//!     vec!["http://localhost:2379".to_string()],
//!     "worker-1".to_string(),
//!     "192.168.1.10".parse()?,
//!     "my-service".to_string(),
//! )
//! .ttl(5)
//! .metrics_collector(SystemMetricsCollector::new())
//! .metrics_update_interval(5) // Update every 5 seconds
//! .build();
//! ```

mod election;
mod lease;
mod metrics_task;
mod state;

use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use etcd_client::{Client, LeaderKey, ResignOptions};
use tokio::select;
use tokio::sync::{broadcast, watch};
use tracing::{error, info};

use crate::connection::{connect_with_retry, node_key};
use crate::error::ClusterError;
use crate::metrics::{MetricsCollector, NoopMetricsCollector};
use crate::types::HealthStatus;

use state::ClusterNodeInner;

/// Manages cluster membership and leader election via etcd.
///
/// Each worker process creates one `ClusterNode` instance that:
/// - Registers the worker in the cluster node registry
/// - Participates in leader election
/// - Maintains health status via lease keep-alive
#[derive(Clone)]
pub struct ClusterNode {
    inner: Arc<ClusterNodeInner>,
}

impl ClusterNode {
    /// Creates a new `ClusterNode`.
    ///
    /// # Arguments
    /// * `etcd_endpoints` - List of etcd endpoints (e.g., `["http://localhost:2379"]`)
    /// * `node_id` - Unique identifier for this node
    /// * `node_ip` - IP address of this node (used for service discovery)
    /// * `group_name` - Logical group name for cluster membership
    /// * `ttl` - Lease TTL in seconds (pass `None` to use the default of 5)
    ///
    /// # Panics
    /// Panics if `etcd_endpoints` is empty or contains invalid URLs.
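    ///
    /// # Example
    ///
    /// A minimal construction sketch (assumes an etcd instance reachable at
    /// `http://localhost:2379`):
    ///
    /// ```ignore
    /// use photon_etcd_cluster::ClusterNode;
    ///
    /// // `None` falls back to the default 5-second lease TTL.
    /// let node = ClusterNode::new(
    ///     vec!["http://localhost:2379".to_string()],
    ///     "worker-1".to_string(),
    ///     "192.168.1.10".parse()?,
    ///     "my-service".to_string(),
    ///     None,
    /// );
    /// ```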
    pub fn new(
        etcd_endpoints: Vec<String>,
        node_id: String,
        node_ip: IpAddr,
        group_name: String,
        ttl: Option<i64>,
    ) -> Self {
        assert!(!etcd_endpoints.is_empty(), "etcd_endpoints cannot be empty");
        for endpoint in &etcd_endpoints {
            assert!(
                endpoint.starts_with("http://") || endpoint.starts_with("https://"),
                "Invalid etcd endpoint '{}': must start with http:// or https://",
                endpoint
            );
        }

        ClusterNode {
            inner: Arc::new(ClusterNodeInner::new(
                etcd_endpoints,
                node_id,
                node_ip,
                group_name,
                ttl.unwrap_or(5),
            )),
        }
    }

    /// Returns the current health status of the node's etcd connection.
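    ///
    /// A minimal usage sketch (the `HealthStatus` re-export path shown here is
    /// an assumption):
    ///
    /// ```ignore
    /// use photon_etcd_cluster::HealthStatus;
    ///
    /// if matches!(node.current_health(), HealthStatus::Healthy) {
    ///     // The etcd connection is up and this node is registered.
    /// }
    /// ```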
    pub fn current_health(&self) -> HealthStatus {
        self.inner.node_health.load()
    }

    /// Returns true if this node is currently the leader.
    pub fn is_leader(&self) -> bool {
        self.inner.is_leader.load(Ordering::Acquire)
    }

    pub(in crate::cluster_node) fn set_unhealthy(&self) {
        self.inner.node_health.store(HealthStatus::Unhealthy);
    }

    pub(in crate::cluster_node) fn set_healthy(&self) {
        self.inner.node_health.store(HealthStatus::Healthy);
    }

    // Accessors for submodules (visibility limited to cluster_node module)
    #[inline]
    pub(in crate::cluster_node) fn inner(&self) -> &ClusterNodeInner {
        &self.inner
    }

    #[inline]
    pub(in crate::cluster_node) fn set_leader(&self, is_leader: bool) {
        self.inner.is_leader.store(is_leader, Ordering::Release);
    }

    /// Starts the node's main loop.
    ///
    /// This method runs indefinitely, maintaining cluster membership and
    /// participating in leader election. It automatically reconnects on
    /// connection failures.
    ///
    /// If a metrics collector and update interval are configured, a background
    /// task will periodically update the node's metrics in etcd.
    ///
    /// # Arguments
    /// * `shutdown` - Shutdown signal receiver. When a message is received,
    ///   the node will gracefully resign leadership and exit.
    ///
    /// # Returns
    /// Returns `Ok(())` on graceful shutdown, `Err` if unable to connect to etcd.
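    ///
    /// # Example
    ///
    /// A minimal sketch of driving the loop and shutting it down, following the
    /// module-level example (assumes a Tokio runtime, a reachable etcd cluster,
    /// and a `node` built as shown there):
    ///
    /// ```ignore
    /// use tokio::sync::broadcast;
    ///
    /// let (shutdown_tx, _) = broadcast::channel(1);
    /// let mut shutdown_rx = shutdown_tx.subscribe();
    ///
    /// let n = node.clone();
    /// let handle = tokio::spawn(async move { n.run(&mut shutdown_rx).await });
    ///
    /// // ... later, trigger a graceful shutdown:
    /// let _ = shutdown_tx.send(());
    /// let _ = handle.await;
    /// ```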
    pub async fn run(&self, shutdown: &mut broadcast::Receiver<()>) -> Result<(), ClusterError> {
        let (leader_tx, leader_rx) = watch::channel(None);
        let (lease_healthy_tx, mut lease_healthy_rx) = watch::channel(true);
        let (metrics_healthy_tx, mut metrics_healthy_rx) = watch::channel(true);

        loop {
            info!("🔵 Starting main loop ({})", self.inner.node_id);
            let _ = leader_tx.send(None);
            self.set_leader(false); // Reset leader state at start of each loop

            // Reset lease health for new iteration (only if it was false)
            // Use send_if_modified to avoid triggering change notification if already true
            lease_healthy_tx.send_if_modified(|val| {
                if !*val {
                    *val = true;
                    true
                } else {
                    false
                }
            });
            metrics_healthy_tx.send_if_modified(|val| {
                if !*val {
                    *val = true;
                    true
                } else {
                    false
                }
            });

            let client_watcher = shutdown.resubscribe();
            let client = connect_with_retry(&self.inner.etcd_endpoints, Some(client_watcher)).await;
            let Some(mut client) = client else {
                return Err(ClusterError::Internal("Failed to connect to etcd".into()));
            };

            let (lease_id, node_lease_ka) =
                match lease::create_lease(self, &mut client, self.inner.ttl, lease_healthy_tx.clone())
                    .await
                {
                    Ok((lease_id, keepalive)) => (lease_id, keepalive),
                    Err(e) => {
                        error!("Failed to create node lease: {}", e);
                        self.set_unhealthy();
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        continue;
                    }
                };

            match lease::register_local_node(self, &mut client, lease_id).await {
                Ok(_) => {
                    info!("Node registered: {}", self.inner.node_id);
                    self.set_healthy();
                }
                Err(e) => {
                    error!("Failed to register local node: {}", e);
                    self.set_unhealthy();
                    node_lease_ka.abort();
                    // Await the aborted task to ensure cleanup completes
                    let _ = node_lease_ka.await;
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }
            }

            // Spawn metrics update task if configured
            let metrics_task =
                metrics_task::spawn_metrics_update_task(self, lease_id, metrics_healthy_tx.clone());

            let self_clone = self.clone();
            let leader_tx_copy = leader_tx.clone();
            let shutdown_watcher = shutdown.resubscribe();

            // Mark the initial values as seen so changed() doesn't trigger immediately
            // borrow_and_update() marks the current value as seen
            let _ = lease_healthy_rx.borrow_and_update();
            let _ = metrics_healthy_rx.borrow_and_update();

            select! {
                _ = shutdown.recv() => {
                    node_lease_ka.abort();
                    metrics_task::abort_metrics_task(metrics_task).await;
                    // Await the aborted task to ensure cleanup completes
                    let _ = node_lease_ka.await;

                    self.leave(&mut client, lease_id).await.ok();

                    let leader = leader_rx.borrow().clone();
                    if let Some(leader) = leader {
                        self.resign(&mut client, leader.clone()).await.ok();
                    }
                    self.set_leader(false); // Clear leader state on shutdown
                    info!("ClusterNode shutdown complete ({})", self.inner.node_id);
                    return Ok(());
                },
                // React to lease health changes
                _ = lease_healthy_rx.changed() => {
                    if !*lease_healthy_rx.borrow() {
                        error!("Lease keep-alive failed - triggering reconnection");
                        node_lease_ka.abort();
                        metrics_task::abort_metrics_task(metrics_task).await;
                        let _ = node_lease_ka.await;

                        let leader = leader_rx.borrow().clone();
                        if let Some(leader) = leader {
                            self.resign(&mut client, leader.clone()).await.ok();
                        }

                        self.set_unhealthy();
                        self.set_leader(false);
                        self.leave(&mut client, lease_id).await.ok();
                        continue;
                    }
                },
                // React to metrics update health changes
                _ = metrics_healthy_rx.changed() => {
                    if !*metrics_healthy_rx.borrow() {
                        error!("Metrics update failed repeatedly - triggering reconnection");
                        node_lease_ka.abort();
                        metrics_task::abort_metrics_task(metrics_task).await;
                        let _ = node_lease_ka.await;

                        let leader = leader_rx.borrow().clone();
                        if let Some(leader) = leader {
                            self.resign(&mut client, leader.clone()).await.ok();
                        }

                        self.set_unhealthy();
                        self.set_leader(false);
                        self.leave(&mut client, lease_id).await.ok();
                        continue;
                    }
                },
                res = election::run_election_loop(&self_clone, lease_id, leader_tx_copy, shutdown_watcher) => {
                    if let Err(e) = res {
                        error!("Election loop error: {}", e);

                        let leader = leader_rx.borrow().clone();
                        if let Some(leader) = leader {
                            self.resign(&mut client, leader.clone()).await.ok();
                        }

                        self.set_unhealthy();
                        self.set_leader(false); // Clear leader state on error
                        node_lease_ka.abort();
                        metrics_task::abort_metrics_task(metrics_task).await;
                        // Await the aborted task to ensure cleanup completes
                        let _ = node_lease_ka.await;
                        self.leave(&mut client, lease_id).await.ok();
                        continue;
                    }
                },
            }
        }
    }

    /// Removes this node from the cluster registry and revokes its lease.
    ///
    /// This is invoked automatically by [`run`](ClusterNode::run) during shutdown
    /// and reconnection, but it can also be called directly when managing the
    /// etcd client yourself.
    pub async fn leave(&self, client: &mut Client, lease_id: i64) -> Result<(), ClusterError> {
        // Delete node registration
        let key = node_key(&self.inner.group_name, &self.inner.node_id);
        if let Err(e) = client.delete(key, None).await {
            error!("Failed to delete node registration: {}", e);
        }
        self.set_unhealthy();
        self.set_leader(false); // Clear leader state when leaving
        if let Err(e) = client.lease_revoke(lease_id).await {
            error!("Failed to revoke lease: {}", e);
        }

        Ok(())
    }

    /// Resigns from leadership for the given election leader key.
    pub async fn resign(&self, client: &mut Client, leader: LeaderKey) -> Result<(), ClusterError> {
        let resign_option = ResignOptions::new().with_leader(leader);
        client.resign(Some(resign_option)).await.map_err(|e| {
            ClusterError::Internal(format!("Failed to resign from election: {}", e))
        })?;
        self.set_leader(false); // Clear leader state when resigning
        Ok(())
    }
}

/// Builder for creating [`ClusterNode`] instances with custom configuration.
///
/// This builder provides a fluent API for configuring cluster nodes with
/// optional features like metrics collection.
///
/// # Example
///
/// ```ignore
/// use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
///
/// // Basic usage (no metrics)
/// let node = ClusterNodeBuilder::new(
///     vec!["http://localhost:2379".to_string()],
///     "worker-1".to_string(),
///     "192.168.1.10".parse()?,
///     "my-service".to_string(),
/// )
/// .ttl(5)
/// .build();
///
/// // With system metrics
/// let node = ClusterNodeBuilder::new(
///     vec!["http://localhost:2379".to_string()],
///     "worker-1".to_string(),
///     "192.168.1.10".parse()?,
///     "my-service".to_string(),
/// )
/// .ttl(5)
/// .metrics_collector(SystemMetricsCollector::new())
/// .metrics_update_interval(5) // Update every 5 seconds
/// .build();
/// ```
pub struct ClusterNodeBuilder {
    etcd_endpoints: Vec<String>,
    node_id: String,
    node_ip: IpAddr,
    group_name: String,
    ttl: i64,
    metrics_collector: Arc<dyn MetricsCollector>,
    metrics_update_interval: u64,
}

impl ClusterNodeBuilder {
    /// Creates a new builder with required parameters.
    ///
    /// # Arguments
    ///
    /// * `etcd_endpoints` - List of etcd endpoints (e.g., `["http://localhost:2379"]`)
    /// * `node_id` - Unique identifier for this node
    /// * `node_ip` - IP address of this node (used for service discovery)
    /// * `group_name` - Logical group name for cluster membership
    ///
    /// # Panics
    ///
    /// Panics if `etcd_endpoints` is empty or contains invalid URLs.
    pub fn new(
        etcd_endpoints: Vec<String>,
        node_id: String,
        node_ip: IpAddr,
        group_name: String,
    ) -> Self {
        assert!(!etcd_endpoints.is_empty(), "etcd_endpoints cannot be empty");
        for endpoint in &etcd_endpoints {
            assert!(
                endpoint.starts_with("http://") || endpoint.starts_with("https://"),
                "Invalid etcd endpoint '{}': must start with http:// or https://",
                endpoint
            );
        }

        Self {
            etcd_endpoints,
            node_id,
            node_ip,
            group_name,
            ttl: 5, // Default TTL
            metrics_collector: Arc::new(NoopMetricsCollector::new()),
            metrics_update_interval: 0, // Disabled by default
        }
    }

    /// Sets the lease TTL in seconds.
    ///
    /// The TTL determines how long etcd will wait before considering the node
    /// as failed if heartbeats stop. Default is 5 seconds.
    ///
    /// # Arguments
    ///
    /// * `ttl` - Lease TTL in seconds (must be > 0)
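    ///
    /// # Example
    ///
    /// A short sketch (assumes a `builder` created via [`ClusterNodeBuilder::new`]):
    ///
    /// ```ignore
    /// // With a 10-second TTL, the node is treated as failed roughly
    /// // 10 seconds after its lease keep-alives stop.
    /// let builder = builder.ttl(10);
    /// ```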
    pub fn ttl(mut self, ttl: i64) -> Self {
        assert!(ttl > 0, "TTL must be positive");
        self.ttl = ttl;
        self
    }

    /// Sets a custom metrics collector.
    ///
    /// The collector will be called periodically to gather metrics that are
    /// stored in etcd alongside the node registration. These metrics can be
    /// used by load balancers for weighted routing decisions.
    ///
    /// By default, a [`NoopMetricsCollector`] is used which produces no metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
    ///
    /// let node = ClusterNodeBuilder::new(endpoints, id, ip, group)
    ///     .metrics_collector(SystemMetricsCollector::new())
    ///     .metrics_update_interval(5)
    ///     .build();
    /// ```
    pub fn metrics_collector<C: MetricsCollector + 'static>(mut self, collector: C) -> Self {
        self.metrics_collector = Arc::new(collector);
        self
    }

    /// Sets the interval in seconds for metrics updates.
    ///
    /// When set to a value > 0, a background task will periodically update
    /// the node's metrics in etcd at the specified interval.
    ///
    /// Set to 0 (default) to disable periodic metrics updates. In this case,
    /// metrics will only be set during initial registration.
    ///
    /// Recommended value: Same as or slightly less than the TTL to ensure
    /// fresh metrics are available for load balancing decisions.
    ///
    /// # Arguments
    ///
    /// * `interval` - Update interval in seconds (0 = disabled)
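    ///
    /// # Example
    ///
    /// A sketch pairing the interval with the lease TTL, per the recommendation
    /// above (the specific values are illustrative, not prescriptive):
    ///
    /// ```ignore
    /// use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
    ///
    /// let node = ClusterNodeBuilder::new(
    ///     vec!["http://localhost:2379".to_string()],
    ///     "worker-1".to_string(),
    ///     "192.168.1.10".parse()?,
    ///     "my-service".to_string(),
    /// )
    /// .ttl(5)
    /// .metrics_collector(SystemMetricsCollector::new())
    /// // Refresh metrics every 4 seconds, just under the 5-second TTL,
    /// // so load-balancing decisions see reasonably fresh values.
    /// .metrics_update_interval(4)
    /// .build();
    /// ```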
    pub fn metrics_update_interval(mut self, interval: u64) -> Self {
        self.metrics_update_interval = interval;
        self
    }

    /// Builds the [`ClusterNode`] with the configured options.
    pub fn build(self) -> ClusterNode {
        if self.metrics_update_interval > 0 {
            info!(
                "ClusterNode configured with metrics collector (update_interval={}s)",
                self.metrics_update_interval
            );
        }

        ClusterNode {
            inner: Arc::new(ClusterNodeInner::with_metrics(
                self.etcd_endpoints,
                self.node_id,
                self.node_ip,
                self.group_name,
                self.ttl,
                self.metrics_collector,
                self.metrics_update_interval,
            )),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_leader_state_tracking() {
        let node = ClusterNode::new(
            vec!["http://localhost:2379".to_string()],
            "test-node".to_string(),
            "127.0.0.1".parse().unwrap(),
            "test-group".to_string(),
            Some(5),
        );

        // Initially not a leader
        assert!(!node.is_leader());

        // Simulate becoming a leader
        node.set_leader(true);
        assert!(node.is_leader());

        // Simulate stepping down
        node.set_leader(false);
        assert!(!node.is_leader());
    }

    #[test]
    #[should_panic(expected = "etcd_endpoints cannot be empty")]
    fn test_empty_endpoints_panics() {
        let _ = ClusterNode::new(
            vec![],
            "test".to_string(),
            "127.0.0.1".parse().unwrap(),
            "test".to_string(),
            None,
        );
    }

    #[test]
    #[should_panic(expected = "Invalid etcd endpoint")]
    fn test_invalid_endpoint_panics() {
        let _ = ClusterNode::new(
            vec!["localhost:2379".to_string()],
            "test".to_string(),
            "127.0.0.1".parse().unwrap(),
            "test".to_string(),
            None,
        );
    }
}