1use crate::client::{TalosClient, TalosClientConfig};
33use crate::error::Result;
34use std::collections::HashMap;
35use std::time::Duration;
36
/// Role a node plays within the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeRole {
    /// The node is part of the control plane.
    ControlPlane,
    /// The node is a worker.
    Worker,
    /// The role of the node is not known.
    Unknown,
}

impl std::fmt::Display for NodeRole {
    /// Writes the lowercase label of the role (`controlplane`, `worker` or `unknown`).
    ///
    /// The label is written verbatim; width and alignment flags of the formatter
    /// are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::ControlPlane => "controlplane",
            Self::Worker => "worker",
            Self::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
57
58#[derive(Debug, Clone)]
60pub struct ClusterMember {
61 pub name: String,
63 pub endpoint: String,
65 pub role: NodeRole,
67 pub is_etcd_member: bool,
69}
70
71impl ClusterMember {
72 #[must_use]
74 pub fn new(name: impl Into<String>, endpoint: impl Into<String>, role: NodeRole) -> Self {
75 Self {
76 name: name.into(),
77 endpoint: endpoint.into(),
78 role,
79 is_etcd_member: role == NodeRole::ControlPlane,
80 }
81 }
82
83 #[must_use]
85 pub fn is_control_plane(&self) -> bool {
86 self.role == NodeRole::ControlPlane
87 }
88
89 #[must_use]
91 pub fn is_worker(&self) -> bool {
92 self.role == NodeRole::Worker
93 }
94}
95
/// Outcome of a health probe against a single node.
#[derive(Debug, Clone)]
pub struct NodeHealth {
    /// Name of the probed node.
    pub name: String,
    /// Endpoint that was probed.
    pub endpoint: String,
    /// Whether the probe succeeded.
    pub is_healthy: bool,
    /// Version string reported for the node; `None` for unhealthy results.
    pub version: Option<String>,
    /// Error description; `None` for healthy results.
    pub error: Option<String>,
    /// Response time in milliseconds; `None` for unhealthy results.
    pub response_time_ms: Option<u64>,
}

impl NodeHealth {
    /// Builds a healthy result carrying the reported version and the response time.
    ///
    /// `error` is left as `None`.
    #[must_use]
    pub fn healthy(
        name: impl Into<String>,
        endpoint: impl Into<String>,
        version: impl Into<String>,
        response_time_ms: u64,
    ) -> Self {
        let name = name.into();
        let endpoint = endpoint.into();
        let version = Some(version.into());
        let response_time_ms = Some(response_time_ms);
        Self {
            name,
            endpoint,
            is_healthy: true,
            version,
            error: None,
            response_time_ms,
        }
    }

    /// Builds an unhealthy result carrying the error description.
    ///
    /// `version` and `response_time_ms` are left as `None`.
    #[must_use]
    pub fn unhealthy(
        name: impl Into<String>,
        endpoint: impl Into<String>,
        error: impl Into<String>,
    ) -> Self {
        let name = name.into();
        let endpoint = endpoint.into();
        let error = Some(error.into());
        Self {
            name,
            endpoint,
            is_healthy: false,
            version: None,
            error,
            response_time_ms: None,
        }
    }
}
149
150#[derive(Debug, Clone)]
152pub struct ClusterHealth {
153 pub nodes: Vec<NodeHealth>,
155 pub is_healthy: bool,
157}
158
159impl ClusterHealth {
160 #[must_use]
162 pub fn from_nodes(nodes: Vec<NodeHealth>) -> Self {
163 let is_healthy = !nodes.is_empty() && nodes.iter().all(|n| n.is_healthy);
164 Self { nodes, is_healthy }
165 }
166
167 #[must_use]
169 pub fn healthy_count(&self) -> usize {
170 self.nodes.iter().filter(|n| n.is_healthy).count()
171 }
172
173 #[must_use]
175 pub fn total_count(&self) -> usize {
176 self.nodes.len()
177 }
178
179 #[must_use]
181 pub fn unhealthy_nodes(&self) -> Vec<&NodeHealth> {
182 self.nodes.iter().filter(|n| !n.is_healthy).collect()
183 }
184
185 #[must_use]
187 pub fn healthy_nodes(&self) -> Vec<&NodeHealth> {
188 self.nodes.iter().filter(|n| n.is_healthy).collect()
189 }
190
191 #[must_use]
193 pub fn avg_response_time_ms(&self) -> Option<u64> {
194 let times: Vec<u64> = self
195 .nodes
196 .iter()
197 .filter_map(|n| n.response_time_ms)
198 .collect();
199
200 if times.is_empty() {
201 None
202 } else {
203 Some(times.iter().sum::<u64>() / times.len() as u64)
204 }
205 }
206}
207
208#[derive(Debug, Clone)]
210pub struct ClusterDiscoveryBuilder {
211 endpoint: String,
213 ca_cert: Option<String>,
215 client_cert: Option<String>,
217 client_key: Option<String>,
219 connect_timeout: Duration,
221 request_timeout: Duration,
223 insecure: bool,
225}
226
227impl ClusterDiscoveryBuilder {
228 #[must_use]
230 pub fn new(endpoint: impl Into<String>) -> Self {
231 Self {
232 endpoint: endpoint.into(),
233 ca_cert: None,
234 client_cert: None,
235 client_key: None,
236 connect_timeout: Duration::from_secs(10),
237 request_timeout: Duration::from_secs(5),
238 insecure: false,
239 }
240 }
241
242 #[must_use]
244 pub fn with_ca_cert(mut self, path: impl Into<String>) -> Self {
245 self.ca_cert = Some(path.into());
246 self
247 }
248
249 #[must_use]
251 pub fn with_client_cert(
252 mut self,
253 cert_path: impl Into<String>,
254 key_path: impl Into<String>,
255 ) -> Self {
256 self.client_cert = Some(cert_path.into());
257 self.client_key = Some(key_path.into());
258 self
259 }
260
261 #[must_use]
263 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
264 self.connect_timeout = timeout;
265 self
266 }
267
268 #[must_use]
270 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
271 self.request_timeout = timeout;
272 self
273 }
274
275 #[must_use]
277 pub fn insecure(mut self) -> Self {
278 self.insecure = true;
279 self
280 }
281
282 #[must_use]
284 pub fn build(self) -> ClusterDiscovery {
285 ClusterDiscovery {
286 endpoint: self.endpoint,
287 ca_cert: self.ca_cert,
288 client_cert: self.client_cert,
289 client_key: self.client_key,
290 connect_timeout: self.connect_timeout,
291 request_timeout: self.request_timeout,
292 insecure: self.insecure,
293 }
294 }
295}
296
297#[derive(Debug, Clone)]
301pub struct ClusterDiscovery {
302 endpoint: String,
303 ca_cert: Option<String>,
304 client_cert: Option<String>,
305 client_key: Option<String>,
306 connect_timeout: Duration,
307 request_timeout: Duration,
308 insecure: bool,
309}
310
311impl ClusterDiscovery {
312 #[must_use]
314 pub fn from_endpoint(endpoint: impl Into<String>) -> ClusterDiscoveryBuilder {
315 ClusterDiscoveryBuilder::new(endpoint)
316 }
317
318 fn create_config(&self, endpoint: &str) -> TalosClientConfig {
320 let mut config = TalosClientConfig::new(endpoint)
321 .with_connect_timeout(self.connect_timeout)
322 .with_request_timeout(self.request_timeout);
323
324 if let Some(ref ca) = self.ca_cert {
325 config = config.with_ca(ca);
326 }
327
328 if let (Some(ref cert), Some(ref key)) = (&self.client_cert, &self.client_key) {
329 config = config.with_client_cert(cert).with_client_key(key);
330 }
331
332 if self.insecure {
333 config = config.insecure();
334 }
335
336 config
337 }
338
339 async fn connect_primary(&self) -> Result<TalosClient> {
341 let config = self.create_config(&self.endpoint);
342 TalosClient::new(config).await
343 }
344
345 pub async fn discover_members(&self) -> Result<Vec<ClusterMember>> {
350 let client = self.connect_primary().await?;
351
352 let etcd_response = client
354 .etcd_member_list(crate::resources::EtcdMemberListRequest::new())
355 .await?;
356
357 let mut members = Vec::new();
358
359 for result in &etcd_response.results {
360 for member in &result.members {
361 let endpoint = member
363 .client_urls
364 .first()
365 .map(|url| {
366 url.replace(":2379", ":50000")
369 .replace("http://", "https://")
370 })
371 .unwrap_or_else(|| self.endpoint.clone());
372
373 members.push(ClusterMember {
374 name: member.hostname.clone(),
375 endpoint,
376 role: NodeRole::ControlPlane,
377 is_etcd_member: true,
378 });
379 }
380 }
381
382 Ok(members)
383 }
384
385 async fn check_endpoint_health(&self, name: &str, endpoint: &str) -> NodeHealth {
391 let config = self.create_config(endpoint);
392 let start = std::time::Instant::now();
393
394 match TalosClient::new(config).await {
395 Ok(client) => {
396 let mut version_client = client.version();
398 let version_req = crate::api::version::VersionRequest { client: false };
399
400 match version_client.version(version_req).await {
401 Ok(response) => {
402 let elapsed = start.elapsed().as_millis() as u64;
403 NodeHealth::healthy(name, endpoint, &response.get_ref().tag, elapsed)
404 }
405 Err(version_err) => {
406 let mut machine_client = client.machine();
409 match machine_client.hostname(()).await {
410 Ok(response) => {
411 let elapsed = start.elapsed().as_millis() as u64;
412 let hostname = response
414 .get_ref()
415 .messages
416 .first()
417 .map(|m| m.hostname.as_str())
418 .unwrap_or("unknown");
419 NodeHealth::healthy(
421 name,
422 endpoint,
423 format!("(hostname: {})", hostname),
424 elapsed,
425 )
426 }
427 Err(_) => {
428 NodeHealth::unhealthy(name, endpoint, version_err.to_string())
430 }
431 }
432 }
433 }
434 }
435 Err(e) => NodeHealth::unhealthy(name, endpoint, e.to_string()),
436 }
437 }
438
439 pub async fn check_cluster_health(&self) -> Result<ClusterHealth> {
443 let members = self.discover_members().await?;
444 self.check_members_health(&members).await
445 }
446
447 pub async fn check_members_health(&self, members: &[ClusterMember]) -> Result<ClusterHealth> {
451 let mut health_results = Vec::with_capacity(members.len());
452
453 for member in members {
454 let health = self
455 .check_endpoint_health(&member.name, &member.endpoint)
456 .await;
457 health_results.push(health);
458 }
459
460 Ok(ClusterHealth::from_nodes(health_results))
461 }
462
463 pub async fn check_endpoints_health(&self, endpoints: &[String]) -> Result<ClusterHealth> {
467 let mut health_results = Vec::with_capacity(endpoints.len());
468
469 for endpoint in endpoints {
470 let health = self.check_endpoint_health(endpoint, endpoint).await;
471 health_results.push(health);
472 }
473
474 Ok(ClusterHealth::from_nodes(health_results))
475 }
476
477 pub async fn get_cluster_versions(&self) -> Result<HashMap<String, String>> {
479 let health = self.check_cluster_health().await?;
480
481 Ok(health
482 .nodes
483 .into_iter()
484 .filter_map(|n| n.version.map(|v| (n.endpoint, v)))
485 .collect())
486 }
487}
488
#[cfg(test)]
mod tests {
    use super::*;

    // Display output of every role variant.
    #[test]
    fn test_node_role_display() {
        assert_eq!(NodeRole::ControlPlane.to_string(), "controlplane");
        assert_eq!(NodeRole::Worker.to_string(), "worker");
        assert_eq!(NodeRole::Unknown.to_string(), "unknown");
    }

    // A control-plane member is flagged as an etcd member.
    #[test]
    fn test_cluster_member_new() {
        let cp = ClusterMember::new(
            "node1",
            "https://192.168.1.100:50000",
            NodeRole::ControlPlane,
        );
        assert_eq!(cp.name, "node1");
        assert_eq!(cp.endpoint, "https://192.168.1.100:50000");
        assert!(cp.is_control_plane());
        assert!(!cp.is_worker());
        assert!(cp.is_etcd_member);
    }

    // A worker member is not flagged as an etcd member.
    #[test]
    fn test_cluster_member_worker() {
        let worker = ClusterMember::new("worker1", "https://192.168.1.200:50000", NodeRole::Worker);
        assert!(!worker.is_control_plane());
        assert!(worker.is_worker());
        assert!(!worker.is_etcd_member);
    }

    // Healthy results carry version and timing, but no error.
    #[test]
    fn test_node_health_healthy() {
        let probe = NodeHealth::healthy("node1", "https://192.168.1.100:50000", "v1.9.0", 42);
        assert!(probe.is_healthy);
        assert_eq!(probe.version.as_deref(), Some("v1.9.0"));
        assert_eq!(probe.response_time_ms, Some(42));
        assert!(probe.error.is_none());
    }

    // Unhealthy results carry only the error.
    #[test]
    fn test_node_health_unhealthy() {
        let probe =
            NodeHealth::unhealthy("node1", "https://192.168.1.100:50000", "connection refused");
        assert!(!probe.is_healthy);
        assert!(probe.version.is_none());
        assert!(probe.response_time_ms.is_none());
        assert_eq!(probe.error.as_deref(), Some("connection refused"));
    }

    #[test]
    fn test_cluster_health_all_healthy() {
        let report = ClusterHealth::from_nodes(vec![
            NodeHealth::healthy("node1", "endpoint1", "v1.9.0", 10),
            NodeHealth::healthy("node2", "endpoint2", "v1.9.0", 20),
        ]);

        assert!(report.is_healthy);
        assert_eq!(report.healthy_count(), 2);
        assert_eq!(report.total_count(), 2);
        assert!(report.unhealthy_nodes().is_empty());
    }

    #[test]
    fn test_cluster_health_partial_healthy() {
        let report = ClusterHealth::from_nodes(vec![
            NodeHealth::healthy("node1", "endpoint1", "v1.9.0", 10),
            NodeHealth::unhealthy("node2", "endpoint2", "timeout"),
        ]);

        assert!(!report.is_healthy);
        assert_eq!(report.healthy_count(), 1);
        assert_eq!(report.total_count(), 2);
        assert_eq!(report.unhealthy_nodes().len(), 1);
    }

    #[test]
    fn test_cluster_health_avg_response_time() {
        let report = ClusterHealth::from_nodes(vec![
            NodeHealth::healthy("node1", "endpoint1", "v1.9.0", 10),
            NodeHealth::healthy("node2", "endpoint2", "v1.9.0", 20),
            NodeHealth::healthy("node3", "endpoint3", "v1.9.0", 30),
        ]);

        assert_eq!(report.avg_response_time_ms(), Some(20));
    }

    // Without any recorded response time there is no average.
    #[test]
    fn test_cluster_health_no_response_time() {
        let report = ClusterHealth::from_nodes(vec![NodeHealth::unhealthy(
            "node1",
            "endpoint1",
            "error",
        )]);

        assert_eq!(report.avg_response_time_ms(), None);
    }

    // Every builder setter is reflected in the built instance.
    #[test]
    fn test_cluster_discovery_builder() {
        let built = ClusterDiscovery::from_endpoint("https://192.168.1.100:50000")
            .with_ca_cert("/path/to/ca.crt")
            .with_client_cert("/path/to/client.crt", "/path/to/client.key")
            .with_connect_timeout(Duration::from_secs(5))
            .with_request_timeout(Duration::from_secs(3))
            .build();

        assert_eq!(built.endpoint, "https://192.168.1.100:50000");
        assert_eq!(built.ca_cert.as_deref(), Some("/path/to/ca.crt"));
        assert_eq!(built.client_cert.as_deref(), Some("/path/to/client.crt"));
        assert_eq!(built.client_key.as_deref(), Some("/path/to/client.key"));
        assert_eq!(built.connect_timeout, Duration::from_secs(5));
        assert_eq!(built.request_timeout, Duration::from_secs(3));
        assert!(!built.insecure);
    }

    #[test]
    fn test_cluster_discovery_builder_insecure() {
        let built = ClusterDiscovery::from_endpoint("https://192.168.1.100:50000")
            .insecure()
            .build();

        assert!(built.insecure);
    }
}