auto_discovery/
discovery.rs

1//! Main service discovery implementation
2
3use crate::{
4    config::DiscoveryConfig,
5    error::{DiscoveryError, Result},
6    protocols::ProtocolManager,
7    service::ServiceInfo,
8    types::ProtocolType,
9};
10use std::{
11    collections::HashMap,
12    sync::Arc,
13};
14use tokio::sync::Mutex;
15use tracing::{debug, info};
16
17/// Main service discovery interface
18pub struct ServiceDiscovery {
19    config: DiscoveryConfig,
20    protocol_manager: ProtocolManager,
21    discovered_services: Arc<Mutex<HashMap<String, ServiceInfo>>>,
22    registered_services: Arc<Mutex<HashMap<String, ServiceInfo>>>,
23}
24
25impl ServiceDiscovery {
26    /// Create a new service discovery instance with the given configuration
27    /// 
28    /// # Arguments
29    /// 
30    /// * `config` - The discovery configuration to use
31    /// 
32    /// # Errors
33    /// 
34    /// Returns an error if the configuration is invalid or if protocol initialization fails
35    pub async fn new(config: DiscoveryConfig) -> Result<Self> {
36        // Validate configuration before proceeding
37        config.validate()?;
38        
39        let protocol_manager = ProtocolManager::new(config.clone()).await?;
40
41        Ok(Self {
42            config,
43            protocol_manager,
44            discovered_services: Arc::new(Mutex::new(HashMap::new())),
45            registered_services: Arc::new(Mutex::new(HashMap::new())),
46        })
47    }
48
49    /// Discover services with optional protocol type filter
50    pub async fn discover_services(&self, protocol_type: Option<ProtocolType>) -> Result<Vec<ServiceInfo>> {
51        debug!("Starting service discovery");
52        
53        let service_types = self.config.service_types().to_vec();
54        if service_types.is_empty() {
55            return Err(DiscoveryError::configuration("No service types configured for discovery"));
56        }
57
58        let timeout = Some(self.config.protocol_timeout());
59        let mut services = match protocol_type {
60            Some(protocol) => {
61                if !self.config.is_protocol_enabled(protocol) {
62                    return Err(DiscoveryError::protocol(format!("Protocol {protocol:?} is not enabled")));
63                }
64                self.protocol_manager.discover_services_with_protocol(protocol, service_types, timeout).await?
65            }
66            None => self.protocol_manager.discover_services(service_types, timeout).await?,
67        };
68
69        // Apply service filtering
70        if let Some(filter) = self.config.filter() {
71            services.retain(|service| filter.matches(service));
72        }
73
74        // Limit number of services if configured
75        let max_services = self.config.max_services();
76        if max_services > 0 && services.len() > max_services {
77            services.truncate(max_services);
78        }
79
80        // Update discovered services cache
81        let mut discovered = self.discovered_services.lock().await;
82        for service in &services {
83            discovered.insert(service.name().to_string(), service.clone());
84        }
85
86        info!("Discovered {} services", services.len());
87        info!("Discovered {} services", services.len());
88        Ok(services)
89    }
90
91    /// Discover services with filtering by service types
92    /// 
93    /// This provides more granular control over service discovery than the basic
94    /// `discover_services` method.
95    /// 
96    /// # Arguments
97    /// 
98    /// * `service_types` - Optional list of specific service types to discover. 
99    ///   If None, uses all configured service types.
100    /// * `protocol_type` - Optional protocol filter. If None, uses all enabled protocols.
101    /// 
102    /// # Example
103    /// 
104    /// ```rust
105    /// use auto_discovery::{ServiceDiscovery, types::{ServiceType, ProtocolType}};
106    /// 
107    /// # #[tokio::main]
108    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
109    /// # let discovery = ServiceDiscovery::new(auto_discovery::config::DiscoveryConfig::new()).await?;
110    /// // Discover only HTTP services using mDNS
111    /// let http_services = discovery.discover_services_filtered(
112    ///     Some(vec![ServiceType::new("_http._tcp")?]),
113    ///     Some(ProtocolType::Mdns)
114    /// ).await?;
115    /// 
116    /// // Discover HTTP services using any protocol
117    /// let all_services = discovery.discover_services_filtered(
118    ///     Some(vec![ServiceType::new("_http._tcp")?]), 
119    ///     None
120    /// ).await?;
121    /// # Ok(())
122    /// # }
123    /// ```
124    pub async fn discover_services_filtered(&self, 
125        service_types: Option<Vec<crate::types::ServiceType>>, 
126        protocol_type: Option<ProtocolType>
127    ) -> Result<Vec<ServiceInfo>> {
128        debug!("Starting filtered service discovery");
129        
130        let target_service_types = match service_types {
131            Some(types) => types,
132            None => self.config.service_types().to_vec()
133        };
134
135        if target_service_types.is_empty() {
136            return Err(DiscoveryError::configuration("No service types specified for discovery"));
137        }
138
139        let timeout = Some(self.config.protocol_timeout());
140        let mut services = match protocol_type {
141            Some(protocol) => {
142                if !self.config.is_protocol_enabled(protocol) {
143                    return Err(DiscoveryError::protocol(format!("Protocol {protocol:?} is not enabled")));
144                }
145                self.protocol_manager.discover_services_with_protocol(protocol, target_service_types, timeout).await?
146            }
147            None => self.protocol_manager.discover_services(target_service_types, timeout).await?,
148        };
149
150        // Apply service filtering
151        if let Some(filter) = self.config.filter() {
152            services.retain(|service| filter.matches(service));
153        }
154
155        // Update discovered services cache
156        {
157            let mut discovered = self.discovered_services.lock().await;
158            for service in &services {
159                discovered.insert(service.id.to_string(), service.clone());
160            }
161        }
162
163        info!("Discovered {} filtered services", services.len());
164        Ok(services)
165    }
166
167    /// Register a service
168    pub async fn register_service(&self, service: ServiceInfo) -> Result<()> {
169        let service_name = service.name().to_string();
170        debug!("Registering service: {}", service_name);
171
172        self.protocol_manager.register_service(service.clone()).await?;
173
174        let mut registered = self.registered_services.lock().await;
175        registered.insert(service_name.clone(), service);
176
177        info!("Successfully registered service: {}", service_name);
178        Ok(())
179    }
180
181    /// Unregister a service
182    pub async fn unregister_service(&self, service: &ServiceInfo) -> Result<()> {
183        let service_name = service.name().to_string();
184        debug!("Unregistering service: {}", service_name);
185
186        self.protocol_manager.unregister_service(service).await?;
187
188        let mut registered = self.registered_services.lock().await;
189        registered.remove(&service_name);
190
191        info!("Successfully unregistered service: {}", service_name);
192        Ok(())
193    }
194
195    /// Verify a service is still available
196    pub async fn verify_service(&self, service: &ServiceInfo) -> Result<bool> {
197        debug!("Verifying service: {}", service.name());
198
199        self.protocol_manager.verify_service(service).await
200    }
201
202    /// Get all discovered services
203    pub async fn get_discovered_services(&self) -> Vec<ServiceInfo> {
204        self.discovered_services.lock().await
205            .values()
206            .cloned()
207            .collect()
208    }
209
210    /// Get all registered services
211    pub async fn get_registered_services(&self) -> Vec<ServiceInfo> {
212        self.registered_services.lock().await
213            .values()
214            .cloned()
215            .collect()
216    }
217
218    /// Check if a service exists
219    pub async fn service_exists(&self, service_name: &str) -> bool {
220        self.discovered_services.lock().await.contains_key(service_name) ||
221        self.registered_services.lock().await.contains_key(service_name)
222    }
223
224    /// Update discovery configuration
225    pub async fn update_config(&mut self, config: DiscoveryConfig) -> Result<()> {
226        self.config = config.clone();
227        self.protocol_manager = ProtocolManager::new(config).await?;
228        Ok(())
229    }
230}
231
232
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237    use crate::types::ServiceType;
238    use std::time::Duration;
239
240    #[tokio::test]
241    async fn test_service_discovery_creation() {
242        let config = DiscoveryConfig::new()
243            .with_service_type(ServiceType::new("_test._tcp").unwrap())
244            .with_timeout(Duration::from_secs(1));
245
246        let discovery = ServiceDiscovery::new(config).await;
247        assert!(discovery.is_ok());
248    }
249
250    #[tokio::test]
251    async fn test_service_registration() {
252        let config = DiscoveryConfig::new();
253        let discovery = ServiceDiscovery::new(config).await.unwrap();
254
255        let service = ServiceInfo::new("Test Service", "_test._tcp", 8080, None).unwrap();
256        let result = discovery.register_service(service).await;
257        
258        // Registration might fail due to missing protocol implementation
259        // This is expected in unit tests
260        match result {
261            Ok(_) => {
262                assert_eq!(discovery.get_registered_services().await.len(), 1);
263            }
264            Err(_) => {
265                // Expected in unit test environment
266            }
267        }
268    }
269
270    #[tokio::test]
271    async fn test_event_subscription() {
272        let config = DiscoveryConfig::new();
273        let discovery = ServiceDiscovery::new(config).await.unwrap();
274
275        // Event subscription is not explicitly tested here as it depends on the underlying protocol implementation
276        // Just ensure it can be called without error
277        let _ = discovery.discover_services(None).await;
278    }
279
280    #[tokio::test]
281    async fn test_config_validation() {
282        let invalid_config = DiscoveryConfig::new().with_timeout(Duration::ZERO);
283        let discovery = ServiceDiscovery::new(invalid_config).await;
284        assert!(discovery.is_err());
285    }
286}