mecha10_cli/services/discovery/
mod.rs

1#![allow(dead_code)]
2
3//! Discovery service for finding Mecha10 nodes on the network
4//!
5//! This service provides operations for discovering and monitoring Mecha10
6//! nodes through Redis pub/sub and network scanning.
7
8mod node_manager;
9mod parser;
10mod redis_ops;
11
12use anyhow::Result;
13use std::collections::HashMap;
14use std::time::{Duration, SystemTime};
15
16/// Information about a discovered node
17#[derive(Debug, Clone)]
18pub struct NodeInfo {
19    pub node_id: String,
20    pub node_type: String,
21    pub host: String,
22    pub port: u16,
23    pub last_seen: SystemTime,
24    pub metadata: HashMap<String, String>,
25}
26
27/// Discovery configuration
28#[derive(Debug, Clone)]
29pub struct DiscoveryConfig {
30    pub redis_url: String,
31    pub discovery_topic: String,
32    pub heartbeat_interval: Duration,
33    pub node_timeout: Duration,
34    pub scan_subnet: Option<String>,
35}
36
37impl Default for DiscoveryConfig {
38    fn default() -> Self {
39        Self {
40            redis_url: "redis://localhost:6379".to_string(),
41            discovery_topic: "mecha10/discovery".to_string(),
42            heartbeat_interval: Duration::from_secs(5),
43            node_timeout: Duration::from_secs(30),
44            scan_subnet: None,
45        }
46    }
47}
48
49/// Discovery service for finding Mecha10 nodes
50///
51/// # Examples
52///
53/// ```rust,ignore
54/// use mecha10_cli::services::{DiscoveryService, DiscoveryConfig};
55///
56/// # async fn example() -> anyhow::Result<()> {
57/// let service = DiscoveryService::new();
58///
59/// // Configure discovery
60/// let config = DiscoveryConfig::default();
61///
62/// // Discover nodes
63/// let nodes = service.discover(&config).await?;
64/// for node in nodes {
65///     println!("Found node: {} at {}:{}", node.node_id, node.host, node.port);
66/// }
67///
68/// // Monitor for new nodes
69/// service.monitor(&config, |node| {
70///     println!("New node discovered: {}", node.node_id);
71/// }).await?;
72/// # Ok(())
73/// # }
74/// ```
75pub struct DiscoveryService {
76    discovered_nodes: HashMap<String, NodeInfo>,
77}
78
79impl DiscoveryService {
80    /// Create a new discovery service
81    pub fn new() -> Self {
82        Self {
83            discovered_nodes: HashMap::new(),
84        }
85    }
86
87    /// Discover active nodes on the network
88    ///
89    /// # Arguments
90    ///
91    /// * `config` - Discovery configuration
92    pub async fn discover(&mut self, config: &DiscoveryConfig) -> Result<Vec<NodeInfo>> {
93        redis_ops::discover(config, &mut self.discovered_nodes).await
94    }
95
96    /// Monitor for new nodes
97    ///
98    /// # Arguments
99    ///
100    /// * `config` - Discovery configuration
101    /// * `callback` - Function to call when a new node is discovered
102    pub async fn monitor<F>(&mut self, config: &DiscoveryConfig, callback: F) -> Result<()>
103    where
104        F: FnMut(&NodeInfo),
105    {
106        redis_ops::monitor(config, &mut self.discovered_nodes, callback).await
107    }
108
109    /// Get list of currently known nodes
110    pub fn get_nodes(&self) -> Vec<&NodeInfo> {
111        node_manager::get_nodes(&self.discovered_nodes)
112    }
113
114    /// Get a specific node by ID
115    ///
116    /// # Arguments
117    ///
118    /// * `node_id` - Node identifier
119    pub fn get_node(&self, node_id: &str) -> Option<&NodeInfo> {
120        node_manager::get_node(&self.discovered_nodes, node_id)
121    }
122
123    /// Remove stale nodes that haven't sent heartbeats
124    ///
125    /// # Arguments
126    ///
127    /// * `timeout` - Maximum time since last heartbeat
128    pub fn cleanup_stale_nodes(&mut self, timeout: Duration) -> usize {
129        node_manager::cleanup_stale_nodes(&mut self.discovered_nodes, timeout)
130    }
131
132    /// Broadcast presence announcement
133    ///
134    /// # Arguments
135    ///
136    /// * `config` - Discovery configuration
137    /// * `node_info` - Information about this node
138    pub async fn announce(&self, config: &DiscoveryConfig, node_info: &NodeInfo) -> Result<()> {
139        redis_ops::announce(config, node_info).await
140    }
141
142    /// Send heartbeat for a node
143    ///
144    /// # Arguments
145    ///
146    /// * `config` - Discovery configuration
147    /// * `node_id` - Node identifier
148    pub async fn heartbeat(&self, config: &DiscoveryConfig, node_id: &str) -> Result<()> {
149        redis_ops::heartbeat(config, node_id).await
150    }
151
152    /// Get node count
153    pub fn node_count(&self) -> usize {
154        node_manager::node_count(&self.discovered_nodes)
155    }
156
157    /// Get nodes by type
158    ///
159    /// # Arguments
160    ///
161    /// * `node_type` - Type of nodes to filter
162    pub fn get_nodes_by_type(&self, node_type: &str) -> Vec<&NodeInfo> {
163        node_manager::get_nodes_by_type(&self.discovered_nodes, node_type)
164    }
165
166    /// Clear all discovered nodes
167    pub fn clear(&mut self) {
168        node_manager::clear(&mut self.discovered_nodes);
169    }
170}
171
172impl Default for DiscoveryService {
173    fn default() -> Self {
174        Self::new()
175    }
176}