zeroconf_tokio/
browser.rs1use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::sync::mpsc;
7use tokio::sync::Mutex;
8use zeroconf::prelude::*;
9use zeroconf::MdnsBrowser;
10use zeroconf::ServiceDiscovery;
11
12use crate::event_processor::EventProcessor;
13
14type Sender = mpsc::Sender<zeroconf::Result<ServiceDiscovery>>;
15type Receiver = mpsc::Receiver<zeroconf::Result<ServiceDiscovery>>;
16
17pub struct MdnsBrowserAsync {
19 inner: MdnsBrowser,
20 event_processor: EventProcessor,
21 sender: Arc<Mutex<Sender>>,
22 receiver: Receiver,
23}
24
25impl MdnsBrowserAsync {
26 pub fn new(browser: MdnsBrowser) -> zeroconf::Result<Self> {
28 let (sender, receiver) = mpsc::channel(1);
29
30 Ok(Self {
31 inner: browser,
32 event_processor: EventProcessor::new(),
33 sender: Arc::new(Mutex::new(sender)),
34 receiver,
35 })
36 }
37
38 pub async fn start_with_timeout(&mut self, timeout: Duration) -> zeroconf::Result<()> {
40 if self.event_processor.is_running() {
41 return Err("Browser already running".into());
42 }
43
44 info!("Starting async mDNS browser: {:?}", self.inner);
45
46 let sender = self.sender.clone();
47
48 let callback = Box::new(move |result, _| {
49 debug!("Received service discovery: {:?}", result);
50 let sender = sender.clone();
51 tokio::spawn(async move { sender.lock().await.send(result).await });
52 });
53
54 self.inner.set_service_discovered_callback(callback);
55
56 let event_loop = self.inner.browse_services()?;
57 self.event_processor
58 .start_with_timeout(event_loop, timeout)?;
59
60 Ok(())
61 }
62
63 pub async fn start(&mut self) -> zeroconf::Result<()> {
65 self.start_with_timeout(Duration::ZERO).await
66 }
67
68 pub async fn next(&mut self) -> Option<zeroconf::Result<ServiceDiscovery>> {
70 if !self.event_processor.is_running() {
71 return None;
72 }
73
74 self.receiver.recv().await
75 }
76
77 pub async fn shutdown(&mut self) -> zeroconf::Result<()> {
79 info!("Shutting down browser...");
80 self.event_processor.shutdown().await?;
81 info!("Browser shut down");
82 Ok(())
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use ntest::timeout;
89 use zeroconf::prelude::*;
90 use zeroconf::MdnsService;
91 use zeroconf::ServiceType;
92
93 use crate::MdnsServiceAsync;
94
95 use super::*;
96
97 struct Fixture {
98 services: Vec<MdnsServiceAsync>,
99 browser: MdnsBrowserAsync,
100 }
101
102 impl Fixture {
103 fn new(services: Vec<MdnsServiceAsync>, browser: MdnsBrowserAsync) -> Self {
104 Self { services, browser }
105 }
106
107 fn with_single_service() -> Self {
108 let service_type = ServiceType::new("http", "tcp").unwrap();
109 let mut service = MdnsService::new(service_type.clone(), 8080);
110
111 service.set_name("test_service");
112
113 Self::new(
114 vec![MdnsServiceAsync::new(service).unwrap()],
115 MdnsBrowserAsync::new(MdnsBrowser::new(service_type)).unwrap(),
116 )
117 }
118
119 async fn start(&mut self) -> zeroconf::Result<()> {
120 for service in self.services.iter_mut() {
121 service.start().await?;
122 }
123
124 self.browser.start().await
125 }
126
127 async fn shutdown(&mut self) {
128 self.browser.shutdown().await.unwrap();
129
130 for service in self.services.iter_mut() {
131 service.shutdown().await.unwrap();
132 }
133 }
134 }
135
136 #[tokio::test]
137 async fn it_discovers() {
138 let mut fixture = Fixture::with_single_service();
139
140 fixture.start().await.unwrap();
141
142 let discovered_service = fixture.browser.next().await.unwrap().unwrap();
143 let service_type = discovered_service.service_type();
144
145 assert_eq!(discovered_service.name(), "test_service");
146 assert_eq!(service_type.name(), "http");
147 assert_eq!(service_type.protocol(), "tcp");
148 assert_eq!(discovered_service.port(), &8080);
149
150 fixture.shutdown().await;
151 }
152
153 #[tokio::test(flavor = "multi_thread")]
154 async fn it_discovers_multi_thread() {
155 let mut fixture = Fixture::with_single_service();
156 fixture.start().await.unwrap();
157
158 let discovered_service = fixture.browser.next().await.unwrap().unwrap();
159 let service_type = discovered_service.service_type();
160
161 assert_eq!(discovered_service.name(), "test_service");
162 assert_eq!(service_type.name(), "http");
163 assert_eq!(service_type.protocol(), "tcp");
164 assert_eq!(discovered_service.port(), &8080);
165
166 fixture.shutdown().await;
167 }
168
169 #[tokio::test]
170 async fn it_drops_without_shutdown_gracefully() {
171 let mut fixture = Fixture::with_single_service();
172 fixture.start().await.unwrap();
173
174 let discovered_service = fixture.browser.next().await.unwrap().unwrap();
175 let service_type = discovered_service.service_type();
176
177 assert_eq!(discovered_service.name(), "test_service");
178 assert_eq!(service_type.name(), "http");
179 assert_eq!(service_type.protocol(), "tcp");
180 assert_eq!(discovered_service.port(), &8080);
181 }
182
183 #[tokio::test]
184 #[timeout(10000)]
185 async fn it_discovers_n_services() {
186 let service_type = ServiceType::new("http", "tcp").unwrap();
187 let mut service1 = MdnsService::new(service_type.clone(), 8080);
188 let mut service2 = MdnsService::new(service_type.clone(), 8081);
189
190 service1.set_name("test_service_1");
191 service2.set_name("test_service_2");
192
193 let services = vec![
194 MdnsServiceAsync::new(service1).unwrap(),
195 MdnsServiceAsync::new(service2).unwrap(),
196 ];
197
198 let browser = MdnsBrowserAsync::new(MdnsBrowser::new(service_type)).unwrap();
199 let mut fixture = Fixture::new(services, browser);
200 let mut service1_discovered = false;
201 let mut service2_discovered = false;
202
203 while let Some(Ok(service)) = fixture.browser.next().await {
204 if service1_discovered && service2_discovered {
205 break;
206 }
207
208 if service.name() == "test_service_1" {
209 service1_discovered = true;
210 } else if service.name() == "test_service_2" {
211 service2_discovered = true;
212 }
213 }
214 }
215
216 #[tokio::test]
217 async fn it_cannot_start_if_already_running() {
218 let mut fixture = Fixture::with_single_service();
219 fixture.start().await.unwrap();
220
221 let result = fixture.browser.start().await;
222
223 assert!(result.is_err());
224
225 fixture.shutdown().await;
226 }
227
228 #[tokio::test]
229 async fn it_cannot_discover_if_not_running() {
230 let mut fixture = Fixture::with_single_service();
231
232 let result = fixture.browser.next().await;
233
234 assert!(result.is_none());
235 }
236}