auto_discovery/
discovery.rs1use crate::{
4 config::DiscoveryConfig,
5 error::{DiscoveryError, Result},
6 protocols::ProtocolManager,
7 service::ServiceInfo,
8 types::ProtocolType,
9};
10use std::{
11 collections::HashMap,
12 sync::Arc,
13};
14use tokio::sync::Mutex;
15use tracing::{debug, info};
16
17pub struct ServiceDiscovery {
19 config: DiscoveryConfig,
20 protocol_manager: ProtocolManager,
21 discovered_services: Arc<Mutex<HashMap<String, ServiceInfo>>>,
22 registered_services: Arc<Mutex<HashMap<String, ServiceInfo>>>,
23}
24
25impl ServiceDiscovery {
26 pub async fn new(config: DiscoveryConfig) -> Result<Self> {
36 config.validate()?;
38
39 let protocol_manager = ProtocolManager::new(config.clone()).await?;
40
41 Ok(Self {
42 config,
43 protocol_manager,
44 discovered_services: Arc::new(Mutex::new(HashMap::new())),
45 registered_services: Arc::new(Mutex::new(HashMap::new())),
46 })
47 }
48
49 pub async fn discover_services(&self, protocol_type: Option<ProtocolType>) -> Result<Vec<ServiceInfo>> {
51 debug!("Starting service discovery");
52
53 let service_types = self.config.service_types().to_vec();
54 if service_types.is_empty() {
55 return Err(DiscoveryError::configuration("No service types configured for discovery"));
56 }
57
58 let timeout = Some(self.config.protocol_timeout());
59 let mut services = match protocol_type {
60 Some(protocol) => {
61 if !self.config.is_protocol_enabled(protocol) {
62 return Err(DiscoveryError::protocol(format!("Protocol {protocol:?} is not enabled")));
63 }
64 self.protocol_manager.discover_services_with_protocol(protocol, service_types, timeout).await?
65 }
66 None => self.protocol_manager.discover_services(service_types, timeout).await?,
67 };
68
69 if let Some(filter) = self.config.filter() {
71 services.retain(|service| filter.matches(service));
72 }
73
74 let max_services = self.config.max_services();
76 if max_services > 0 && services.len() > max_services {
77 services.truncate(max_services);
78 }
79
80 let mut discovered = self.discovered_services.lock().await;
82 for service in &services {
83 discovered.insert(service.name().to_string(), service.clone());
84 }
85
86 info!("Discovered {} services", services.len());
87 info!("Discovered {} services", services.len());
88 Ok(services)
89 }
90
91 pub async fn discover_services_filtered(&self,
125 service_types: Option<Vec<crate::types::ServiceType>>,
126 protocol_type: Option<ProtocolType>
127 ) -> Result<Vec<ServiceInfo>> {
128 debug!("Starting filtered service discovery");
129
130 let target_service_types = match service_types {
131 Some(types) => types,
132 None => self.config.service_types().to_vec()
133 };
134
135 if target_service_types.is_empty() {
136 return Err(DiscoveryError::configuration("No service types specified for discovery"));
137 }
138
139 let timeout = Some(self.config.protocol_timeout());
140 let mut services = match protocol_type {
141 Some(protocol) => {
142 if !self.config.is_protocol_enabled(protocol) {
143 return Err(DiscoveryError::protocol(format!("Protocol {protocol:?} is not enabled")));
144 }
145 self.protocol_manager.discover_services_with_protocol(protocol, target_service_types, timeout).await?
146 }
147 None => self.protocol_manager.discover_services(target_service_types, timeout).await?,
148 };
149
150 if let Some(filter) = self.config.filter() {
152 services.retain(|service| filter.matches(service));
153 }
154
155 {
157 let mut discovered = self.discovered_services.lock().await;
158 for service in &services {
159 discovered.insert(service.id.to_string(), service.clone());
160 }
161 }
162
163 info!("Discovered {} filtered services", services.len());
164 Ok(services)
165 }
166
167 pub async fn register_service(&self, service: ServiceInfo) -> Result<()> {
169 let service_name = service.name().to_string();
170 debug!("Registering service: {}", service_name);
171
172 self.protocol_manager.register_service(service.clone()).await?;
173
174 let mut registered = self.registered_services.lock().await;
175 registered.insert(service_name.clone(), service);
176
177 info!("Successfully registered service: {}", service_name);
178 Ok(())
179 }
180
181 pub async fn unregister_service(&self, service: &ServiceInfo) -> Result<()> {
183 let service_name = service.name().to_string();
184 debug!("Unregistering service: {}", service_name);
185
186 self.protocol_manager.unregister_service(service).await?;
187
188 let mut registered = self.registered_services.lock().await;
189 registered.remove(&service_name);
190
191 info!("Successfully unregistered service: {}", service_name);
192 Ok(())
193 }
194
195 pub async fn verify_service(&self, service: &ServiceInfo) -> Result<bool> {
197 debug!("Verifying service: {}", service.name());
198
199 self.protocol_manager.verify_service(service).await
200 }
201
202 pub async fn get_discovered_services(&self) -> Vec<ServiceInfo> {
204 self.discovered_services.lock().await
205 .values()
206 .cloned()
207 .collect()
208 }
209
210 pub async fn get_registered_services(&self) -> Vec<ServiceInfo> {
212 self.registered_services.lock().await
213 .values()
214 .cloned()
215 .collect()
216 }
217
218 pub async fn service_exists(&self, service_name: &str) -> bool {
220 self.discovered_services.lock().await.contains_key(service_name) ||
221 self.registered_services.lock().await.contains_key(service_name)
222 }
223
224 pub async fn update_config(&mut self, config: DiscoveryConfig) -> Result<()> {
226 self.config = config.clone();
227 self.protocol_manager = ProtocolManager::new(config).await?;
228 Ok(())
229 }
230}
231
232
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ServiceType;
    use std::time::Duration;

    /// A configuration with one service type and a one-second timeout
    /// must produce a `ServiceDiscovery`.
    #[tokio::test]
    async fn test_service_discovery_creation() {
        let config = DiscoveryConfig::new()
            .with_service_type(ServiceType::new("_test._tcp").unwrap())
            .with_timeout(Duration::from_secs(1));

        assert!(ServiceDiscovery::new(config).await.is_ok());
    }

    /// When registration succeeds, exactly one service is recorded locally.
    #[tokio::test]
    async fn test_service_registration() {
        let discovery = ServiceDiscovery::new(DiscoveryConfig::new())
            .await
            .unwrap();
        let service = ServiceInfo::new("Test Service", "_test._tcp", 8080, None).unwrap();

        // A registration failure is tolerated here; only the success path
        // carries an assertion.
        if discovery.register_service(service).await.is_ok() {
            let registered = discovery.get_registered_services().await;
            assert_eq!(registered.len(), 1);
        }
    }

    /// Runs one discovery round with the default configuration; the outcome
    /// is discarded, so this only checks that the call completes.
    #[tokio::test]
    async fn test_event_subscription() {
        let discovery = ServiceDiscovery::new(DiscoveryConfig::new())
            .await
            .unwrap();

        let _ = discovery.discover_services(None).await;
    }

    /// A zero timeout must be rejected at construction time.
    #[tokio::test]
    async fn test_config_validation() {
        let invalid_config = DiscoveryConfig::new().with_timeout(Duration::ZERO);

        assert!(ServiceDiscovery::new(invalid_config).await.is_err());
    }
}