scirs2_core/distributed/cluster/discovery.rs
1//! Node discovery implementations for cluster management
2//!
3//! This module provides various methods for discovering nodes in the cluster,
4//! including static configuration, multicast discovery, DNS service discovery,
5//! and Consul-based discovery.
6
7use crate::error::{CoreError, CoreResult, ErrorContext};
8use std::net::{IpAddr, SocketAddr};
9use std::time::{Duration, Instant};
10
11use super::types::{
12 NodeCapabilities, NodeDiscoveryMethod, NodeInfo, NodeMetadata, NodeStatus, NodeType,
13};
14
/// Node discovery implementation
///
/// A stateless namespace type: every discovery routine is an associated
/// function, so no instance is needed to use it. The derives make the type
/// printable and trivially copyable/constructible for callers that do hold a
/// value of it (for example as a field of a larger manager struct).
#[derive(Debug, Clone, Copy, Default)]
pub struct NodeDiscovery;
17
18impl NodeDiscovery {
19 /// Discover nodes using a specific discovery method
20 pub fn discover_nodes(method: &NodeDiscoveryMethod) -> CoreResult<Vec<NodeInfo>> {
21 match method {
22 NodeDiscoveryMethod::Static(addresses) => Self::discover_static_nodes(addresses),
23 NodeDiscoveryMethod::Multicast { group, port } => {
24 Self::discover_multicast_nodes(group, *port)
25 }
26 NodeDiscoveryMethod::DnsService { service_name } => {
27 Self::discover_dns_service_nodes(service_name)
28 }
29 NodeDiscoveryMethod::Consul { endpoint } => Self::discover_consul_nodes(endpoint),
30 }
31 }
32
33 /// Discover nodes from static address list
34 fn discover_static_nodes(addresses: &[SocketAddr]) -> CoreResult<Vec<NodeInfo>> {
35 let mut nodes = Vec::new();
36 for address in addresses {
37 if Self::is_node_reachable(*address)? {
38 nodes.push(NodeInfo {
39 id: format!("node_{address}"),
40 address: *address,
41 node_type: NodeType::Worker,
42 capabilities: NodeCapabilities::default(),
43 status: NodeStatus::Unknown,
44 last_seen: Instant::now(),
45 metadata: NodeMetadata::default(),
46 });
47 }
48 }
49 Ok(nodes)
50 }
51
52 /// Discover nodes via multicast
53 fn discover_multicast_nodes(group: &IpAddr, port: u16) -> CoreResult<Vec<NodeInfo>> {
54 use std::net::{SocketAddr, UdpSocket};
55 use std::time::Duration;
56
57 let mut discovered_nodes = Vec::new();
58
59 // Create a UDP socket for multicast discovery
60 let socket = UdpSocket::bind(SocketAddr::new(*group, port)).map_err(|e| {
61 CoreError::IoError(crate::error::ErrorContext::new(format!(
62 "Failed to bind multicast socket: {e}"
63 )))
64 })?;
65
66 // Set socket timeout for non-blocking operation
67 socket
68 .set_read_timeout(Some(Duration::from_secs(5)))
69 .map_err(|e| {
70 CoreError::IoError(crate::error::ErrorContext::new(format!(
71 "Failed to set socket timeout: {e}"
72 )))
73 })?;
74
75 // Send discovery broadcast
76 let discovery_message = b"SCIRS2_NODE_DISCOVERY";
77 let broadcast_addr = SocketAddr::new(*group, port);
78
79 match socket.send_to(discovery_message, broadcast_addr) {
80 Ok(_) => {
81 // Listen for responses
82 let mut buffer = [0u8; 1024];
83 let start_time = std::time::Instant::now();
84
85 while start_time.elapsed() < Duration::from_secs(3) {
86 match socket.recv_from(&mut buffer) {
87 Ok((size, addr)) => {
88 let response = String::from_utf8_lossy(&buffer[..size]);
89 if response.starts_with("SCIRS2_NODE_RESPONSE") {
90 // Parse node information from response
91 let parts: Vec<&str> = response.split(':').collect();
92 if parts.len() >= 3 {
93 let nodeid = parts[1usize].to_string();
94 let node_type = match parts[2usize] {
95 "master" => NodeType::Master,
96 "worker" => NodeType::Worker,
97 "storage" => NodeType::Storage,
98 "compute" => NodeType::Compute,
99 _ => NodeType::Worker,
100 };
101
102 discovered_nodes.push(NodeInfo {
103 id: nodeid,
104 address: addr,
105 node_type,
106 capabilities: NodeCapabilities::default(),
107 status: NodeStatus::Unknown,
108 last_seen: Instant::now(),
109 metadata: NodeMetadata::default(),
110 });
111 }
112 }
113 }
114 Err(_) => break, // Timeout or error, exit loop
115 }
116 }
117 }
118 Err(e) => {
119 return Err(CoreError::IoError(crate::error::ErrorContext::new(
120 format!("Failed to send discovery broadcast: {e}"),
121 )));
122 }
123 }
124
125 Ok(discovered_nodes)
126 }
127
128 /// Discover nodes via DNS service discovery
129 fn discover_dns_service_nodes(service_name: &str) -> CoreResult<Vec<NodeInfo>> {
130 // DNS-SD discovery implementation
131 // This would typically use DNS SRV records to discover services
132 #[allow(unused_mut)]
133 let mut discovered_nodes = Vec::new();
134
135 #[cfg(target_os = "linux")]
136 {
137 use std::process::Command;
138 use std::str;
139 // Try to use avahi-browse for DNS-SD discovery on Linux
140 match Command::new("avahi-browse")
141 .arg("-t") // Terminate after cache is exhausted
142 .arg("-r") // Resolve found services
143 .arg("-p") // Parseable output
144 .arg(service_name)
145 .output()
146 {
147 Ok(output) => {
148 let output_str = str::from_utf8(&output.stdout).map_err(|e| {
149 CoreError::ValidationError(ErrorContext::new(format!(
150 "Failed to parse avahi output: {e}"
151 )))
152 })?;
153
154 // Parse avahi-browse output format
155 for line in output_str.lines() {
156 let parts: Vec<&str> = line.split(';').collect();
157 if parts.len() >= 9 && parts[0usize] == "=" {
158 // Format: =;interface;protocol;name;type;domain;hostname;address;port;txt
159 let hostname = parts[6usize];
160 let address_str = parts[7usize];
161 let port_str = parts[8usize];
162
163 if let Ok(port) = port_str.parse::<u16>() {
164 // Try to parse IP address
165 if let Ok(ip) = address_str.parse::<IpAddr>() {
166 let socket_addr = SocketAddr::new(ip, port);
167 let nodeid = format!("dns_{hostname}_{port}");
168
169 discovered_nodes.push(NodeInfo {
170 id: nodeid,
171 address: socket_addr,
172 node_type: NodeType::Worker,
173 capabilities: NodeCapabilities::default(),
174 status: NodeStatus::Unknown,
175 last_seen: Instant::now(),
176 metadata: NodeMetadata {
177 hostname: hostname.to_string(),
178 operating_system: "unknown".to_string(),
179 kernel_version: "unknown".to_string(),
180 container_runtime: None,
181 labels: std::collections::HashMap::new(),
182 },
183 });
184 }
185 }
186 }
187 }
188 }
189 Err(_) => {
190 // avahi-browse not available, try nslookup for basic SRV record resolution
191 match Command::new("nslookup")
192 .arg("-type=SRV")
193 .arg(service_name)
194 .output()
195 {
196 Ok(output) => {
197 let output_str = str::from_utf8(&output.stdout).map_err(|e| {
198 CoreError::ValidationError(ErrorContext::new(format!(
199 "Failed to parse nslookup output: {e}"
200 )))
201 })?;
202
203 // Parse SRV records (simplified)
204 for line in output_str.lines() {
205 if line.contains("service =") {
206 // Extract port and hostname from SRV record
207 let parts: Vec<&str> = line.split_whitespace().collect();
208 if parts.len() >= 4 {
209 if let Ok(port) = parts[2usize].parse::<u16>() {
210 let hostname = parts[3usize].trim_end_matches('.');
211 let nodeid = format!("srv_{hostname}_{port}");
212
213 // Try to resolve hostname to IP
214 if let Ok(mut addrs) =
215 std::net::ToSocketAddrs::to_socket_addrs(&format!(
216 "{hostname}:{port}"
217 ))
218 {
219 if let Some(addr) = addrs.next() {
220 discovered_nodes.push(NodeInfo {
221 id: nodeid,
222 address: addr,
223 node_type: NodeType::Worker,
224 capabilities: NodeCapabilities::default(),
225 status: NodeStatus::Unknown,
226 last_seen: Instant::now(),
227 metadata: NodeMetadata {
228 hostname: hostname.to_string(),
229 operating_system: "unknown".to_string(),
230 kernel_version: "unknown".to_string(),
231 container_runtime: None,
232 labels: std::collections::HashMap::new(
233 ),
234 },
235 });
236 }
237 }
238 }
239 }
240 }
241 }
242 }
243 Err(_) => {
244 // Both avahi-browse and nslookup failed, return empty list
245 }
246 }
247 }
248 }
249 }
250
251 #[cfg(target_os = "windows")]
252 {
253 use std::process::Command;
254 use std::str;
255 // On Windows, try to use dns-sd command if available
256 match Command::new("dns-sd")
257 .arg("-B") // Browse for services
258 .arg(service_name)
259 .output()
260 {
261 Ok(output) => {
262 let output_str = str::from_utf8(&output.stdout).map_err(|e| {
263 CoreError::ValidationError(ErrorContext::new(format!(
264 "Failed to parse dns-sd output: {e}"
265 )))
266 })?;
267
268 // Parse dns-sd output (simplified implementation)
269 for line in output_str.lines() {
270 if line.contains(service_name) {
271 // Extract service information
272 // This is a simplified parser - real implementation would be more robust
273 let parts: Vec<&str> = line.split_whitespace().collect();
274 if parts.len() >= 2 {
275 let service_instance = parts[1usize];
276 let nodeid = format!("dnssd_{service_instance}");
277
278 // For now, use a default port and localhost
279 // Real implementation would resolve the service
280 let socket_addr = SocketAddr::new(
281 IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
282 8080,
283 );
284
285 discovered_nodes.push(NodeInfo {
286 id: nodeid,
287 address: socket_addr,
288 node_type: NodeType::Worker,
289 capabilities: NodeCapabilities::default(),
290 status: NodeStatus::Unknown,
291 last_seen: Instant::now(),
292 metadata: NodeMetadata::default(),
293 });
294 }
295 }
296 }
297 }
298 Err(_) => {
299 // dns-sd not available
300 }
301 }
302 }
303
304 Ok(discovered_nodes)
305 }
306
307 /// Discover nodes via Consul service registry
308 fn discover_consul_nodes(endpoint: &str) -> CoreResult<Vec<NodeInfo>> {
309 // Consul discovery implementation via HTTP API
310 use std::process::Command;
311 use std::str;
312
313 let mut discovered_nodes = Vec::new();
314
315 // Try to query Consul catalog API for services
316 let consul_url = if endpoint.starts_with("http") {
317 format!("{endpoint}/v1/catalog/services")
318 } else {
319 format!("http://{endpoint}/v1/catalog/services")
320 };
321
322 // Use curl to query Consul API (most portable approach)
323 match Command::new("curl")
324 .arg("-s") // Silent mode
325 .arg("-f") // Fail silently on HTTP errors
326 .arg("--connect-timeout")
327 .arg("5") // 5 second timeout
328 .arg(&consul_url)
329 .output()
330 {
331 Ok(output) => {
332 if output.status.success() {
333 let json_str = str::from_utf8(&output.stdout).map_err(|e| {
334 CoreError::ValidationError(ErrorContext::new(format!(
335 "Failed to parse Consul response: {e}"
336 )))
337 })?;
338
339 // Parse JSON response (simplified - would use serde_json in real implementation)
340 // Looking for service names in the format: {"service_name": ["tag1", "tag2"]}
341 if json_str.trim().starts_with('{') {
342 // Extract service names from JSON
343 let cleaned = json_str.replace(['{', '}'], "");
344 for service_entry in cleaned.split(',') {
345 let service_parts: Vec<&str> = service_entry.split(':').collect();
346 if service_parts.len() >= 2 {
347 let service_name = service_parts[0usize].trim().trim_matches('"');
348
349 // Query specific service details
350 let service_url = if endpoint.starts_with("http") {
351 format!("{endpoint}/v1/catalog/service/{service_name}")
352 } else {
353 format!("http://{endpoint}/v1/catalog/service/{service_name}")
354 };
355
356 match Command::new("curl")
357 .arg("-s")
358 .arg("-f")
359 .arg("--connect-timeout")
360 .arg("3")
361 .arg(&service_url)
362 .output()
363 {
364 Ok(service_output) => {
365 if service_output.status.success() {
366 let service_json =
367 str::from_utf8(&service_output.stdout)
368 .unwrap_or("");
369
370 // Simple JSON parsing to extract Address and ServicePort
371 // In real implementation, would use proper JSON parsing
372 if service_json.contains("\"Address\"")
373 && service_json.contains("\"ServicePort\"")
374 {
375 // Extract address and port (very simplified)
376 let lines: Vec<&str> =
377 service_json.lines().collect();
378 let mut address_str = "";
379 let mut port_str = "";
380
381 for line in lines {
382 if line.contains("\"Address\"") {
383 if let Some(addr_part) =
384 line.split(':').nth(1)
385 {
386 address_str = addr_part
387 .trim()
388 .trim_matches('"')
389 .trim_matches(',');
390 }
391 }
392 if line.contains("\"ServicePort\"") {
393 if let Some(port_part) =
394 line.split(':').nth(1)
395 {
396 port_str =
397 port_part.trim().trim_matches(',');
398 }
399 }
400 }
401
402 // Create node info if we have both address and port
403 if !address_str.is_empty() && !port_str.is_empty() {
404 if let (Ok(ip), Ok(port)) = (
405 address_str.parse::<IpAddr>(),
406 port_str.parse::<u16>(),
407 ) {
408 let socket_addr = SocketAddr::new(ip, port);
409 let nodeid = format!(
410 "consul_{service_name}_{address_str}"
411 );
412
413 discovered_nodes.push(NodeInfo {
414 id: nodeid,
415 address: socket_addr,
416 node_type: NodeType::Worker,
417 capabilities: NodeCapabilities::default(),
418 status: NodeStatus::Unknown,
419 last_seen: Instant::now(),
420 metadata: NodeMetadata {
421 hostname: address_str.to_string(),
422 operating_system: "unknown".to_string(),
423 kernel_version: "unknown".to_string(),
424 container_runtime: Some("consul".to_string()),
425 labels: {
426 let mut labels = std::collections::HashMap::new();
427 labels.insert("service".to_string(), service_name.to_string());
428 labels.insert("discovery".to_string(), "consul".to_string());
429 labels
430 },
431 },
432 });
433 }
434 }
435 }
436 }
437 }
438 Err(_) => continue, // Skip this service if query fails
439 }
440 }
441 }
442 }
443 } else {
444 return Err(CoreError::IoError(ErrorContext::new(format!(
445 "Failed to connect to Consul at {endpoint}"
446 ))));
447 }
448 }
449 Err(_) => {
450 return Err(CoreError::InvalidState(ErrorContext::new(
451 "curl command not available for Consul discovery",
452 )));
453 }
454 }
455
456 Ok(discovered_nodes)
457 }
458
459 /// Check if a node is reachable
460 fn is_node_reachable(address: SocketAddr) -> CoreResult<bool> {
461 // Simple reachability check
462 // In a real implementation, this would do proper health checking
463 Ok(true) // Placeholder
464 }
465}