scirs2_core/distributed/cluster/
discovery.rs

1//! Node discovery implementations for cluster management
2//!
3//! This module provides various methods for discovering nodes in the cluster,
4//! including static configuration, multicast discovery, DNS service discovery,
5//! and Consul-based discovery.
6
7use crate::error::{CoreError, CoreResult, ErrorContext};
8use std::net::{IpAddr, SocketAddr};
9use std::time::{Duration, Instant};
10
11use super::types::{
12    NodeCapabilities, NodeDiscoveryMethod, NodeInfo, NodeMetadata, NodeStatus, NodeType,
13};
14
/// Stateless entry point for cluster node discovery.
///
/// The type carries no data; all functionality is exposed through associated
/// functions such as [`NodeDiscovery::discover_nodes`].
#[derive(Debug, Clone, Copy, Default)]
pub struct NodeDiscovery;
17
18impl NodeDiscovery {
19    /// Discover nodes using a specific discovery method
20    pub fn discover_nodes(method: &NodeDiscoveryMethod) -> CoreResult<Vec<NodeInfo>> {
21        match method {
22            NodeDiscoveryMethod::Static(addresses) => Self::discover_static_nodes(addresses),
23            NodeDiscoveryMethod::Multicast { group, port } => {
24                Self::discover_multicast_nodes(group, *port)
25            }
26            NodeDiscoveryMethod::DnsService { service_name } => {
27                Self::discover_dns_service_nodes(service_name)
28            }
29            NodeDiscoveryMethod::Consul { endpoint } => Self::discover_consul_nodes(endpoint),
30        }
31    }
32
33    /// Discover nodes from static address list
34    fn discover_static_nodes(addresses: &[SocketAddr]) -> CoreResult<Vec<NodeInfo>> {
35        let mut nodes = Vec::new();
36        for address in addresses {
37            if Self::is_node_reachable(*address)? {
38                nodes.push(NodeInfo {
39                    id: format!("node_{address}"),
40                    address: *address,
41                    node_type: NodeType::Worker,
42                    capabilities: NodeCapabilities::default(),
43                    status: NodeStatus::Unknown,
44                    last_seen: Instant::now(),
45                    metadata: NodeMetadata::default(),
46                });
47            }
48        }
49        Ok(nodes)
50    }
51
52    /// Discover nodes via multicast
53    fn discover_multicast_nodes(group: &IpAddr, port: u16) -> CoreResult<Vec<NodeInfo>> {
54        use std::net::{SocketAddr, UdpSocket};
55        use std::time::Duration;
56
57        let mut discovered_nodes = Vec::new();
58
59        // Create a UDP socket for multicast discovery
60        let socket = UdpSocket::bind(SocketAddr::new(*group, port)).map_err(|e| {
61            CoreError::IoError(crate::error::ErrorContext::new(format!(
62                "Failed to bind multicast socket: {e}"
63            )))
64        })?;
65
66        // Set socket timeout for non-blocking operation
67        socket
68            .set_read_timeout(Some(Duration::from_secs(5)))
69            .map_err(|e| {
70                CoreError::IoError(crate::error::ErrorContext::new(format!(
71                    "Failed to set socket timeout: {e}"
72                )))
73            })?;
74
75        // Send discovery broadcast
76        let discovery_message = b"SCIRS2_NODE_DISCOVERY";
77        let broadcast_addr = SocketAddr::new(*group, port);
78
79        match socket.send_to(discovery_message, broadcast_addr) {
80            Ok(_) => {
81                // Listen for responses
82                let mut buffer = [0u8; 1024];
83                let start_time = std::time::Instant::now();
84
85                while start_time.elapsed() < Duration::from_secs(3) {
86                    match socket.recv_from(&mut buffer) {
87                        Ok((size, addr)) => {
88                            let response = String::from_utf8_lossy(&buffer[..size]);
89                            if response.starts_with("SCIRS2_NODE_RESPONSE") {
90                                // Parse node information from response
91                                let parts: Vec<&str> = response.split(':').collect();
92                                if parts.len() >= 3 {
93                                    let nodeid = parts[1usize].to_string();
94                                    let node_type = match parts[2usize] {
95                                        "master" => NodeType::Master,
96                                        "worker" => NodeType::Worker,
97                                        "storage" => NodeType::Storage,
98                                        "compute" => NodeType::Compute,
99                                        _ => NodeType::Worker,
100                                    };
101
102                                    discovered_nodes.push(NodeInfo {
103                                        id: nodeid,
104                                        address: addr,
105                                        node_type,
106                                        capabilities: NodeCapabilities::default(),
107                                        status: NodeStatus::Unknown,
108                                        last_seen: Instant::now(),
109                                        metadata: NodeMetadata::default(),
110                                    });
111                                }
112                            }
113                        }
114                        Err(_) => break, // Timeout or error, exit loop
115                    }
116                }
117            }
118            Err(e) => {
119                return Err(CoreError::IoError(crate::error::ErrorContext::new(
120                    format!("Failed to send discovery broadcast: {e}"),
121                )));
122            }
123        }
124
125        Ok(discovered_nodes)
126    }
127
128    /// Discover nodes via DNS service discovery
129    fn discover_dns_service_nodes(service_name: &str) -> CoreResult<Vec<NodeInfo>> {
130        // DNS-SD discovery implementation
131        // This would typically use DNS SRV records to discover services
132        #[allow(unused_mut)]
133        let mut discovered_nodes = Vec::new();
134
135        #[cfg(target_os = "linux")]
136        {
137            use std::process::Command;
138            use std::str;
139            // Try to use avahi-browse for DNS-SD discovery on Linux
140            match Command::new("avahi-browse")
141                .arg("-t")  // Terminate after cache is exhausted
142                .arg("-r")  // Resolve found services
143                .arg("-p")  // Parseable output
144                .arg(service_name)
145                .output()
146            {
147                Ok(output) => {
148                    let output_str = str::from_utf8(&output.stdout).map_err(|e| {
149                        CoreError::ValidationError(ErrorContext::new(format!(
150                            "Failed to parse avahi output: {e}"
151                        )))
152                    })?;
153
154                    // Parse avahi-browse output format
155                    for line in output_str.lines() {
156                        let parts: Vec<&str> = line.split(';').collect();
157                        if parts.len() >= 9 && parts[0usize] == "=" {
158                            // Format: =;interface;protocol;name;type;domain;hostname;address;port;txt
159                            let hostname = parts[6usize];
160                            let address_str = parts[7usize];
161                            let port_str = parts[8usize];
162
163                            if let Ok(port) = port_str.parse::<u16>() {
164                                // Try to parse IP address
165                                if let Ok(ip) = address_str.parse::<IpAddr>() {
166                                    let socket_addr = SocketAddr::new(ip, port);
167                                    let nodeid = format!("dns_{hostname}_{port}");
168
169                                    discovered_nodes.push(NodeInfo {
170                                        id: nodeid,
171                                        address: socket_addr,
172                                        node_type: NodeType::Worker,
173                                        capabilities: NodeCapabilities::default(),
174                                        status: NodeStatus::Unknown,
175                                        last_seen: Instant::now(),
176                                        metadata: NodeMetadata {
177                                            hostname: hostname.to_string(),
178                                            operating_system: "unknown".to_string(),
179                                            kernel_version: "unknown".to_string(),
180                                            container_runtime: None,
181                                            labels: std::collections::HashMap::new(),
182                                        },
183                                    });
184                                }
185                            }
186                        }
187                    }
188                }
189                Err(_) => {
190                    // avahi-browse not available, try nslookup for basic SRV record resolution
191                    match Command::new("nslookup")
192                        .arg("-type=SRV")
193                        .arg(service_name)
194                        .output()
195                    {
196                        Ok(output) => {
197                            let output_str = str::from_utf8(&output.stdout).map_err(|e| {
198                                CoreError::ValidationError(ErrorContext::new(format!(
199                                    "Failed to parse nslookup output: {e}"
200                                )))
201                            })?;
202
203                            // Parse SRV records (simplified)
204                            for line in output_str.lines() {
205                                if line.contains("service =") {
206                                    // Extract port and hostname from SRV record
207                                    let parts: Vec<&str> = line.split_whitespace().collect();
208                                    if parts.len() >= 4 {
209                                        if let Ok(port) = parts[2usize].parse::<u16>() {
210                                            let hostname = parts[3usize].trim_end_matches('.');
211                                            let nodeid = format!("srv_{hostname}_{port}");
212
213                                            // Try to resolve hostname to IP
214                                            if let Ok(mut addrs) =
215                                                std::net::ToSocketAddrs::to_socket_addrs(&format!(
216                                                    "{hostname}:{port}"
217                                                ))
218                                            {
219                                                if let Some(addr) = addrs.next() {
220                                                    discovered_nodes.push(NodeInfo {
221                                                        id: nodeid,
222                                                        address: addr,
223                                                        node_type: NodeType::Worker,
224                                                        capabilities: NodeCapabilities::default(),
225                                                        status: NodeStatus::Unknown,
226                                                        last_seen: Instant::now(),
227                                                        metadata: NodeMetadata {
228                                                            hostname: hostname.to_string(),
229                                                            operating_system: "unknown".to_string(),
230                                                            kernel_version: "unknown".to_string(),
231                                                            container_runtime: None,
232                                                            labels: std::collections::HashMap::new(
233                                                            ),
234                                                        },
235                                                    });
236                                                }
237                                            }
238                                        }
239                                    }
240                                }
241                            }
242                        }
243                        Err(_) => {
244                            // Both avahi-browse and nslookup failed, return empty list
245                        }
246                    }
247                }
248            }
249        }
250
251        #[cfg(target_os = "windows")]
252        {
253            use std::process::Command;
254            use std::str;
255            // On Windows, try to use dns-sd command if available
256            match Command::new("dns-sd")
257                .arg("-B")  // Browse for services
258                .arg(service_name)
259                .output()
260            {
261                Ok(output) => {
262                    let output_str = str::from_utf8(&output.stdout).map_err(|e| {
263                        CoreError::ValidationError(ErrorContext::new(format!(
264                            "Failed to parse dns-sd output: {e}"
265                        )))
266                    })?;
267
268                    // Parse dns-sd output (simplified implementation)
269                    for line in output_str.lines() {
270                        if line.contains(service_name) {
271                            // Extract service information
272                            // This is a simplified parser - real implementation would be more robust
273                            let parts: Vec<&str> = line.split_whitespace().collect();
274                            if parts.len() >= 2 {
275                                let service_instance = parts[1usize];
276                                let nodeid = format!("dnssd_{service_instance}");
277
278                                // For now, use a default port and localhost
279                                // Real implementation would resolve the service
280                                let socket_addr = SocketAddr::new(
281                                    IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
282                                    8080,
283                                );
284
285                                discovered_nodes.push(NodeInfo {
286                                    id: nodeid,
287                                    address: socket_addr,
288                                    node_type: NodeType::Worker,
289                                    capabilities: NodeCapabilities::default(),
290                                    status: NodeStatus::Unknown,
291                                    last_seen: Instant::now(),
292                                    metadata: NodeMetadata::default(),
293                                });
294                            }
295                        }
296                    }
297                }
298                Err(_) => {
299                    // dns-sd not available
300                }
301            }
302        }
303
304        Ok(discovered_nodes)
305    }
306
307    /// Discover nodes via Consul service registry
308    fn discover_consul_nodes(endpoint: &str) -> CoreResult<Vec<NodeInfo>> {
309        // Consul discovery implementation via HTTP API
310        use std::process::Command;
311        use std::str;
312
313        let mut discovered_nodes = Vec::new();
314
315        // Try to query Consul catalog API for services
316        let consul_url = if endpoint.starts_with("http") {
317            format!("{endpoint}/v1/catalog/services")
318        } else {
319            format!("http://{endpoint}/v1/catalog/services")
320        };
321
322        // Use curl to query Consul API (most portable approach)
323        match Command::new("curl")
324            .arg("-s")  // Silent mode
325            .arg("-f")  // Fail silently on HTTP errors
326            .arg("--connect-timeout")
327            .arg("5")   // 5 second timeout
328            .arg(&consul_url)
329            .output()
330        {
331            Ok(output) => {
332                if output.status.success() {
333                    let json_str = str::from_utf8(&output.stdout).map_err(|e| {
334                        CoreError::ValidationError(ErrorContext::new(format!(
335                            "Failed to parse Consul response: {e}"
336                        )))
337                    })?;
338
339                    // Parse JSON response (simplified - would use serde_json in real implementation)
340                    // Looking for service names in the format: {"service_name": ["tag1", "tag2"]}
341                    if json_str.trim().starts_with('{') {
342                        // Extract service names from JSON
343                        let cleaned = json_str.replace(['{', '}'], "");
344                        for service_entry in cleaned.split(',') {
345                            let service_parts: Vec<&str> = service_entry.split(':').collect();
346                            if service_parts.len() >= 2 {
347                                let service_name = service_parts[0usize].trim().trim_matches('"');
348
349                                // Query specific service details
350                                let service_url = if endpoint.starts_with("http") {
351                                    format!("{endpoint}/v1/catalog/service/{service_name}")
352                                } else {
353                                    format!("http://{endpoint}/v1/catalog/service/{service_name}")
354                                };
355
356                                match Command::new("curl")
357                                    .arg("-s")
358                                    .arg("-f")
359                                    .arg("--connect-timeout")
360                                    .arg("3")
361                                    .arg(&service_url)
362                                    .output()
363                                {
364                                    Ok(service_output) => {
365                                        if service_output.status.success() {
366                                            let service_json =
367                                                str::from_utf8(&service_output.stdout)
368                                                    .unwrap_or("");
369
370                                            // Simple JSON parsing to extract Address and ServicePort
371                                            // In real implementation, would use proper JSON parsing
372                                            if service_json.contains("\"Address\"")
373                                                && service_json.contains("\"ServicePort\"")
374                                            {
375                                                // Extract address and port (very simplified)
376                                                let lines: Vec<&str> =
377                                                    service_json.lines().collect();
378                                                let mut address_str = "";
379                                                let mut port_str = "";
380
381                                                for line in lines {
382                                                    if line.contains("\"Address\"") {
383                                                        if let Some(addr_part) =
384                                                            line.split(':').nth(1)
385                                                        {
386                                                            address_str = addr_part
387                                                                .trim()
388                                                                .trim_matches('"')
389                                                                .trim_matches(',');
390                                                        }
391                                                    }
392                                                    if line.contains("\"ServicePort\"") {
393                                                        if let Some(port_part) =
394                                                            line.split(':').nth(1)
395                                                        {
396                                                            port_str =
397                                                                port_part.trim().trim_matches(',');
398                                                        }
399                                                    }
400                                                }
401
402                                                // Create node info if we have both address and port
403                                                if !address_str.is_empty() && !port_str.is_empty() {
404                                                    if let (Ok(ip), Ok(port)) = (
405                                                        address_str.parse::<IpAddr>(),
406                                                        port_str.parse::<u16>(),
407                                                    ) {
408                                                        let socket_addr = SocketAddr::new(ip, port);
409                                                        let nodeid = format!(
410                                                            "consul_{service_name}_{address_str}"
411                                                        );
412
413                                                        discovered_nodes.push(NodeInfo {
414                                                            id: nodeid,
415                                                            address: socket_addr,
416                                                            node_type: NodeType::Worker,
417                                                            capabilities: NodeCapabilities::default(),
418                                                            status: NodeStatus::Unknown,
419                                                            last_seen: Instant::now(),
420                                                            metadata: NodeMetadata {
421                                                                hostname: address_str.to_string(),
422                                                                operating_system: "unknown".to_string(),
423                                                                kernel_version: "unknown".to_string(),
424                                                                container_runtime: Some("consul".to_string()),
425                                                                labels: {
426                                                                    let mut labels = std::collections::HashMap::new();
427                                                                    labels.insert("service".to_string(), service_name.to_string());
428                                                                    labels.insert("discovery".to_string(), "consul".to_string());
429                                                                    labels
430                                                                },
431                                                            },
432                                                        });
433                                                    }
434                                                }
435                                            }
436                                        }
437                                    }
438                                    Err(_) => continue, // Skip this service if query fails
439                                }
440                            }
441                        }
442                    }
443                } else {
444                    return Err(CoreError::IoError(ErrorContext::new(format!(
445                        "Failed to connect to Consul at {endpoint}"
446                    ))));
447                }
448            }
449            Err(_) => {
450                return Err(CoreError::InvalidState(ErrorContext::new(
451                    "curl command not available for Consul discovery",
452                )));
453            }
454        }
455
456        Ok(discovered_nodes)
457    }
458
459    /// Check if a node is reachable
460    fn is_node_reachable(address: SocketAddr) -> CoreResult<bool> {
461        // Simple reachability check
462        // In a real implementation, this would do proper health checking
463        Ok(true) // Placeholder
464    }
465}