Skip to main content

talos_api_rs/client/
discovery.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Cluster discovery helpers for working with Talos clusters.
4//!
5//! This module provides utilities for discovering cluster members and their health status.
6//!
7//! # Example
8//!
9//! ```no_run
10//! use talos_api_rs::client::discovery::ClusterDiscovery;
11//! use talos_api_rs::client::TalosClientConfig;
12//!
13//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
14//! // Discover cluster from a single known endpoint
15//! let discovery = ClusterDiscovery::from_endpoint("https://192.168.1.100:50000")
16//!     .with_ca_cert("/path/to/ca.crt")
17//!     .with_client_cert("/path/to/client.crt", "/path/to/client.key")
18//!     .build();
19//!
20//! let members = discovery.discover_members().await?;
21//! for member in &members {
22//!     println!("{}: {} ({:?})", member.name, member.endpoint, member.role);
23//! }
24//!
25//! // Check health of all members
26//! let health = discovery.check_cluster_health().await?;
27//! println!("Healthy: {}/{}", health.healthy_count(), health.total_count());
28//! # Ok(())
29//! # }
30//! ```
31
32use crate::client::{TalosClient, TalosClientConfig};
33use crate::error::Result;
34use std::collections::HashMap;
35use std::time::Duration;
36
/// The part a node plays within a Talos cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeRole {
    /// A control plane node (hosts etcd, the API server, and related services).
    ControlPlane,
    /// A worker node (runs workloads only).
    Worker,
    /// The role is not known.
    Unknown,
}
47
48impl std::fmt::Display for NodeRole {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            Self::ControlPlane => write!(f, "controlplane"),
52            Self::Worker => write!(f, "worker"),
53            Self::Unknown => write!(f, "unknown"),
54        }
55    }
56}
57
58/// Information about a discovered cluster member.
59#[derive(Debug, Clone)]
60pub struct ClusterMember {
61    /// Node name/hostname
62    pub name: String,
63    /// gRPC endpoint URL
64    pub endpoint: String,
65    /// Node role
66    pub role: NodeRole,
67    /// Whether this node is an etcd member
68    pub is_etcd_member: bool,
69}
70
71impl ClusterMember {
72    /// Create a new cluster member
73    #[must_use]
74    pub fn new(name: impl Into<String>, endpoint: impl Into<String>, role: NodeRole) -> Self {
75        Self {
76            name: name.into(),
77            endpoint: endpoint.into(),
78            role,
79            is_etcd_member: role == NodeRole::ControlPlane,
80        }
81    }
82
83    /// Check if this is a control plane node
84    #[must_use]
85    pub fn is_control_plane(&self) -> bool {
86        self.role == NodeRole::ControlPlane
87    }
88
89    /// Check if this is a worker node
90    #[must_use]
91    pub fn is_worker(&self) -> bool {
92        self.role == NodeRole::Worker
93    }
94}
95
/// Health status of a single node.
///
/// Two values compare equal when every field matches, which makes health
/// snapshots directly comparable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeHealth {
    /// Node name
    pub name: String,
    /// Node endpoint
    pub endpoint: String,
    /// Whether the node is reachable and healthy
    pub is_healthy: bool,
    /// Talos version if healthy
    pub version: Option<String>,
    /// Error message if unhealthy
    pub error: Option<String>,
    /// Response time in milliseconds
    pub response_time_ms: Option<u64>,
}
112
113impl NodeHealth {
114    /// Create a healthy node health status
115    #[must_use]
116    pub fn healthy(
117        name: impl Into<String>,
118        endpoint: impl Into<String>,
119        version: impl Into<String>,
120        response_time_ms: u64,
121    ) -> Self {
122        Self {
123            name: name.into(),
124            endpoint: endpoint.into(),
125            is_healthy: true,
126            version: Some(version.into()),
127            error: None,
128            response_time_ms: Some(response_time_ms),
129        }
130    }
131
132    /// Create an unhealthy node health status
133    #[must_use]
134    pub fn unhealthy(
135        name: impl Into<String>,
136        endpoint: impl Into<String>,
137        error: impl Into<String>,
138    ) -> Self {
139        Self {
140            name: name.into(),
141            endpoint: endpoint.into(),
142            is_healthy: false,
143            version: None,
144            error: Some(error.into()),
145            response_time_ms: None,
146        }
147    }
148}
149
150/// Health status of the entire cluster.
151#[derive(Debug, Clone)]
152pub struct ClusterHealth {
153    /// Health status of each node
154    pub nodes: Vec<NodeHealth>,
155    /// Overall cluster status
156    pub is_healthy: bool,
157}
158
159impl ClusterHealth {
160    /// Create cluster health from individual node health
161    #[must_use]
162    pub fn from_nodes(nodes: Vec<NodeHealth>) -> Self {
163        let is_healthy = !nodes.is_empty() && nodes.iter().all(|n| n.is_healthy);
164        Self { nodes, is_healthy }
165    }
166
167    /// Get the number of healthy nodes
168    #[must_use]
169    pub fn healthy_count(&self) -> usize {
170        self.nodes.iter().filter(|n| n.is_healthy).count()
171    }
172
173    /// Get the total number of nodes
174    #[must_use]
175    pub fn total_count(&self) -> usize {
176        self.nodes.len()
177    }
178
179    /// Get unhealthy nodes
180    #[must_use]
181    pub fn unhealthy_nodes(&self) -> Vec<&NodeHealth> {
182        self.nodes.iter().filter(|n| !n.is_healthy).collect()
183    }
184
185    /// Get healthy nodes
186    #[must_use]
187    pub fn healthy_nodes(&self) -> Vec<&NodeHealth> {
188        self.nodes.iter().filter(|n| n.is_healthy).collect()
189    }
190
191    /// Get the average response time of healthy nodes
192    #[must_use]
193    pub fn avg_response_time_ms(&self) -> Option<u64> {
194        let times: Vec<u64> = self
195            .nodes
196            .iter()
197            .filter_map(|n| n.response_time_ms)
198            .collect();
199
200        if times.is_empty() {
201            None
202        } else {
203            Some(times.iter().sum::<u64>() / times.len() as u64)
204        }
205    }
206}
207
/// Step-by-step configuration for a [`ClusterDiscovery`].
#[derive(Debug, Clone)]
pub struct ClusterDiscoveryBuilder {
    /// Endpoint contacted first
    endpoint: String,
    /// Path to the CA certificate, if any
    ca_cert: Option<String>,
    /// Path to the client certificate, if any
    client_cert: Option<String>,
    /// Path to the client key, if any
    client_key: Option<String>,
    /// Timeout applied when connecting
    connect_timeout: Duration,
    /// Timeout applied to health-check requests
    request_timeout: Duration,
    /// When set, TLS verification is skipped
    insecure: bool,
}
226
227impl ClusterDiscoveryBuilder {
228    /// Create a new builder with the given endpoint
229    #[must_use]
230    pub fn new(endpoint: impl Into<String>) -> Self {
231        Self {
232            endpoint: endpoint.into(),
233            ca_cert: None,
234            client_cert: None,
235            client_key: None,
236            connect_timeout: Duration::from_secs(10),
237            request_timeout: Duration::from_secs(5),
238            insecure: false,
239        }
240    }
241
242    /// Set CA certificate path
243    #[must_use]
244    pub fn with_ca_cert(mut self, path: impl Into<String>) -> Self {
245        self.ca_cert = Some(path.into());
246        self
247    }
248
249    /// Set client certificate and key paths
250    #[must_use]
251    pub fn with_client_cert(
252        mut self,
253        cert_path: impl Into<String>,
254        key_path: impl Into<String>,
255    ) -> Self {
256        self.client_cert = Some(cert_path.into());
257        self.client_key = Some(key_path.into());
258        self
259    }
260
261    /// Set connection timeout
262    #[must_use]
263    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
264        self.connect_timeout = timeout;
265        self
266    }
267
268    /// Set request timeout for health checks
269    #[must_use]
270    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
271        self.request_timeout = timeout;
272        self
273    }
274
275    /// Skip TLS verification (insecure)
276    #[must_use]
277    pub fn insecure(mut self) -> Self {
278        self.insecure = true;
279        self
280    }
281
282    /// Build the cluster discovery instance
283    #[must_use]
284    pub fn build(self) -> ClusterDiscovery {
285        ClusterDiscovery {
286            endpoint: self.endpoint,
287            ca_cert: self.ca_cert,
288            client_cert: self.client_cert,
289            client_key: self.client_key,
290            connect_timeout: self.connect_timeout,
291            request_timeout: self.request_timeout,
292            insecure: self.insecure,
293        }
294    }
295}
296
/// Entry point for discovering the members of a Talos cluster and probing
/// their health.
///
/// Instances are produced by [`ClusterDiscoveryBuilder::build`].
#[derive(Debug, Clone)]
pub struct ClusterDiscovery {
    /// Endpoint contacted first
    endpoint: String,
    /// Path to the CA certificate, if any
    ca_cert: Option<String>,
    /// Path to the client certificate, if any
    client_cert: Option<String>,
    /// Path to the client key, if any
    client_key: Option<String>,
    /// Timeout applied when connecting
    connect_timeout: Duration,
    /// Timeout applied to requests
    request_timeout: Duration,
    /// When set, TLS verification is skipped
    insecure: bool,
}
310
311impl ClusterDiscovery {
312    /// Create a new discovery instance from an endpoint
313    #[must_use]
314    pub fn from_endpoint(endpoint: impl Into<String>) -> ClusterDiscoveryBuilder {
315        ClusterDiscoveryBuilder::new(endpoint)
316    }
317
318    /// Create a client config for connecting to a specific endpoint
319    fn create_config(&self, endpoint: &str) -> TalosClientConfig {
320        let mut config = TalosClientConfig::new(endpoint)
321            .with_connect_timeout(self.connect_timeout)
322            .with_request_timeout(self.request_timeout);
323
324        if let Some(ref ca) = self.ca_cert {
325            config = config.with_ca(ca);
326        }
327
328        if let (Some(ref cert), Some(ref key)) = (&self.client_cert, &self.client_key) {
329            config = config.with_client_cert(cert).with_client_key(key);
330        }
331
332        if self.insecure {
333            config = config.insecure();
334        }
335
336        config
337    }
338
339    /// Connect to the primary endpoint and get a client
340    async fn connect_primary(&self) -> Result<TalosClient> {
341        let config = self.create_config(&self.endpoint);
342        TalosClient::new(config).await
343    }
344
345    /// Discover cluster members via etcd member list
346    ///
347    /// This connects to the initial endpoint and queries the etcd member list
348    /// to discover all control plane nodes.
349    pub async fn discover_members(&self) -> Result<Vec<ClusterMember>> {
350        let client = self.connect_primary().await?;
351
352        // Use EtcdMemberList to discover control plane nodes
353        let etcd_response = client
354            .etcd_member_list(crate::resources::EtcdMemberListRequest::new())
355            .await?;
356
357        let mut members = Vec::new();
358
359        for result in &etcd_response.results {
360            for member in &result.members {
361                // Extract endpoint from member's client URLs
362                let endpoint = member
363                    .client_urls
364                    .first()
365                    .map(|url| {
366                        // Convert etcd client URL to Talos API endpoint
367                        // etcd typically uses port 2379, Talos API uses 50000
368                        url.replace(":2379", ":50000")
369                            .replace("http://", "https://")
370                    })
371                    .unwrap_or_else(|| self.endpoint.clone());
372
373                members.push(ClusterMember {
374                    name: member.hostname.clone(),
375                    endpoint,
376                    role: NodeRole::ControlPlane,
377                    is_etcd_member: true,
378                });
379            }
380        }
381
382        Ok(members)
383    }
384
385    /// Check health of a single endpoint
386    ///
387    /// Tries the Version API first, falls back to Hostname API if unavailable.
388    /// This is necessary because Docker-based Talos clusters don't implement
389    /// the Version API.
390    async fn check_endpoint_health(&self, name: &str, endpoint: &str) -> NodeHealth {
391        let config = self.create_config(endpoint);
392        let start = std::time::Instant::now();
393
394        match TalosClient::new(config).await {
395            Ok(client) => {
396                // Try Version API first
397                let mut version_client = client.version();
398                let version_req = crate::api::version::VersionRequest { client: false };
399
400                match version_client.version(version_req).await {
401                    Ok(response) => {
402                        let elapsed = start.elapsed().as_millis() as u64;
403                        NodeHealth::healthy(name, endpoint, &response.get_ref().tag, elapsed)
404                    }
405                    Err(version_err) => {
406                        // Version API failed - try Hostname API as fallback
407                        // This is common in Docker-based clusters where Version is unimplemented
408                        let mut machine_client = client.machine();
409                        match machine_client.hostname(()).await {
410                            Ok(response) => {
411                                let elapsed = start.elapsed().as_millis() as u64;
412                                // Extract hostname from response
413                                let hostname = response
414                                    .get_ref()
415                                    .messages
416                                    .first()
417                                    .map(|m| m.hostname.as_str())
418                                    .unwrap_or("unknown");
419                                // Mark as healthy with hostname instead of version
420                                NodeHealth::healthy(
421                                    name,
422                                    endpoint,
423                                    format!("(hostname: {})", hostname),
424                                    elapsed,
425                                )
426                            }
427                            Err(_) => {
428                                // Both APIs failed - report the version error
429                                NodeHealth::unhealthy(name, endpoint, version_err.to_string())
430                            }
431                        }
432                    }
433                }
434            }
435            Err(e) => NodeHealth::unhealthy(name, endpoint, e.to_string()),
436        }
437    }
438
439    /// Check health of all discovered cluster members
440    ///
441    /// This first discovers members, then checks health of each one.
442    pub async fn check_cluster_health(&self) -> Result<ClusterHealth> {
443        let members = self.discover_members().await?;
444        self.check_members_health(&members).await
445    }
446
447    /// Check health of specific cluster members
448    ///
449    /// Useful when you already have a list of members.
450    pub async fn check_members_health(&self, members: &[ClusterMember]) -> Result<ClusterHealth> {
451        let mut health_results = Vec::with_capacity(members.len());
452
453        for member in members {
454            let health = self
455                .check_endpoint_health(&member.name, &member.endpoint)
456                .await;
457            health_results.push(health);
458        }
459
460        Ok(ClusterHealth::from_nodes(health_results))
461    }
462
463    /// Check health of multiple endpoints directly
464    ///
465    /// Useful when you have a list of endpoint URLs but not member info.
466    pub async fn check_endpoints_health(&self, endpoints: &[String]) -> Result<ClusterHealth> {
467        let mut health_results = Vec::with_capacity(endpoints.len());
468
469        for endpoint in endpoints {
470            let health = self.check_endpoint_health(endpoint, endpoint).await;
471            health_results.push(health);
472        }
473
474        Ok(ClusterHealth::from_nodes(health_results))
475    }
476
477    /// Get a map of endpoint to version for healthy nodes
478    pub async fn get_cluster_versions(&self) -> Result<HashMap<String, String>> {
479        let health = self.check_cluster_health().await?;
480
481        Ok(health
482            .nodes
483            .into_iter()
484            .filter_map(|n| n.version.map(|v| (n.endpoint, v)))
485            .collect())
486    }
487}
488
#[cfg(test)]
mod tests {
    use super::*;

    // Endpoint reused by the tests that need a realistic control plane address.
    const PRIMARY_ENDPOINT: &str = "https://192.168.1.100:50000";

    /// Every role renders as its lowercase label.
    #[test]
    fn test_node_role_display() {
        let expectations = [
            (NodeRole::ControlPlane, "controlplane"),
            (NodeRole::Worker, "worker"),
            (NodeRole::Unknown, "unknown"),
        ];

        for (role, label) in expectations.iter() {
            assert_eq!(role.to_string(), *label);
        }
    }

    /// A control plane member is flagged as an etcd member.
    #[test]
    fn test_cluster_member_new() {
        let control_plane = ClusterMember::new("node1", PRIMARY_ENDPOINT, NodeRole::ControlPlane);

        assert_eq!(control_plane.name, "node1");
        assert_eq!(control_plane.endpoint, "https://192.168.1.100:50000");
        assert!(control_plane.is_control_plane());
        assert!(!control_plane.is_worker());
        assert!(control_plane.is_etcd_member);
    }

    /// A worker member is not flagged as an etcd member.
    #[test]
    fn test_cluster_member_worker() {
        let worker = ClusterMember::new("worker1", "https://192.168.1.200:50000", NodeRole::Worker);

        assert!(!worker.is_control_plane());
        assert!(worker.is_worker());
        assert!(!worker.is_etcd_member);
    }

    /// The healthy constructor records version and response time, no error.
    #[test]
    fn test_node_health_healthy() {
        let status = NodeHealth::healthy("node1", PRIMARY_ENDPOINT, "v1.9.0", 42);

        assert!(status.is_healthy);
        assert_eq!(status.version, Some("v1.9.0".to_string()));
        assert_eq!(status.response_time_ms, Some(42));
        assert!(status.error.is_none());
    }

    /// The unhealthy constructor records only the error.
    #[test]
    fn test_node_health_unhealthy() {
        let status = NodeHealth::unhealthy("node1", PRIMARY_ENDPOINT, "connection refused");

        assert!(!status.is_healthy);
        assert!(status.version.is_none());
        assert!(status.response_time_ms.is_none());
        assert_eq!(status.error, Some("connection refused".to_string()));
    }

    /// A cluster whose nodes are all healthy is healthy overall.
    #[test]
    fn test_cluster_health_all_healthy() {
        let cluster = ClusterHealth::from_nodes(vec![
            NodeHealth::healthy("node1", "endpoint1", "v1.9.0", 10),
            NodeHealth::healthy("node2", "endpoint2", "v1.9.0", 20),
        ]);

        assert!(cluster.is_healthy);
        assert_eq!(cluster.healthy_count(), 2);
        assert_eq!(cluster.total_count(), 2);
        assert_eq!(cluster.unhealthy_nodes().len(), 0);
    }

    /// A single unhealthy node makes the whole cluster unhealthy.
    #[test]
    fn test_cluster_health_partial_healthy() {
        let cluster = ClusterHealth::from_nodes(vec![
            NodeHealth::healthy("node1", "endpoint1", "v1.9.0", 10),
            NodeHealth::unhealthy("node2", "endpoint2", "timeout"),
        ]);

        assert!(!cluster.is_healthy);
        assert_eq!(cluster.healthy_count(), 1);
        assert_eq!(cluster.total_count(), 2);
        assert_eq!(cluster.unhealthy_nodes().len(), 1);
    }

    /// The average is taken over the recorded response times.
    #[test]
    fn test_cluster_health_avg_response_time() {
        let nodes: Vec<NodeHealth> = [10_u64, 20, 30]
            .iter()
            .enumerate()
            .map(|(index, &millis)| {
                let ordinal = index + 1;
                NodeHealth::healthy(
                    format!("node{}", ordinal),
                    format!("endpoint{}", ordinal),
                    "v1.9.0",
                    millis,
                )
            })
            .collect();
        let cluster = ClusterHealth::from_nodes(nodes);

        assert_eq!(cluster.avg_response_time_ms(), Some(20));
    }

    /// Without any recorded response time there is no average.
    #[test]
    fn test_cluster_health_no_response_time() {
        let cluster =
            ClusterHealth::from_nodes(vec![NodeHealth::unhealthy("node1", "endpoint1", "error")]);

        assert_eq!(cluster.avg_response_time_ms(), None);
    }

    /// Every builder setter is reflected in the built instance.
    #[test]
    fn test_cluster_discovery_builder() {
        let built = ClusterDiscovery::from_endpoint(PRIMARY_ENDPOINT)
            .with_ca_cert("/path/to/ca.crt")
            .with_client_cert("/path/to/client.crt", "/path/to/client.key")
            .with_connect_timeout(Duration::from_secs(5))
            .with_request_timeout(Duration::from_secs(3))
            .build();

        assert_eq!(built.endpoint, "https://192.168.1.100:50000");
        assert_eq!(built.ca_cert.as_deref(), Some("/path/to/ca.crt"));
        assert_eq!(built.client_cert.as_deref(), Some("/path/to/client.crt"));
        assert_eq!(built.client_key.as_deref(), Some("/path/to/client.key"));
        assert_eq!(built.connect_timeout, Duration::from_secs(5));
        assert_eq!(built.request_timeout, Duration::from_secs(3));
        assert!(!built.insecure);
    }

    /// `insecure()` switches TLS verification off in the built instance.
    #[test]
    fn test_cluster_discovery_builder_insecure() {
        let built = ClusterDiscovery::from_endpoint(PRIMARY_ENDPOINT)
            .insecure()
            .build();

        assert!(built.insecure);
    }
}