zeroconf_tokio/
browser.rs

1//! Asynchronous mDNS browser.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::sync::mpsc;
7use tokio::sync::Mutex;
8use zeroconf::prelude::*;
9use zeroconf::MdnsBrowser;
10use zeroconf::ServiceDiscovery;
11
12use crate::event_processor::EventProcessor;
13
/// Producer half of the channel that carries discovery results to [`MdnsBrowserAsync::next`].
type Sender = mpsc::Sender<zeroconf::Result<ServiceDiscovery>>;
/// Consumer half of the channel that carries discovery results to [`MdnsBrowserAsync::next`].
type Receiver = mpsc::Receiver<zeroconf::Result<ServiceDiscovery>>;
16
/// Asynchronous mDNS browser.
///
/// Wraps a [`zeroconf::MdnsBrowser`] and hands discovered services to async
/// callers through [`MdnsBrowserAsync::next`].
pub struct MdnsBrowserAsync {
    /// Wrapped synchronous browser that performs the actual browsing.
    inner: MdnsBrowser,
    /// Runs the event loop obtained from `inner` and tracks whether it is running.
    event_processor: EventProcessor,
    /// Sending half of the discovery channel; cloned into the discovery callback.
    sender: Arc<Mutex<Sender>>,
    /// Receiving half of the discovery channel; drained by `next`.
    receiver: Receiver,
}
24
25impl MdnsBrowserAsync {
26    /// Create a new asynchronous mDNS browser.
27    pub fn new(browser: MdnsBrowser) -> zeroconf::Result<Self> {
28        let (sender, receiver) = mpsc::channel(1);
29
30        Ok(Self {
31            inner: browser,
32            event_processor: EventProcessor::new(),
33            sender: Arc::new(Mutex::new(sender)),
34            receiver,
35        })
36    }
37
38    /// Start the browser with a timeout passed to the [`zeroconf::EventLoop`].
39    pub async fn start_with_timeout(&mut self, timeout: Duration) -> zeroconf::Result<()> {
40        if self.event_processor.is_running() {
41            return Err("Browser already running".into());
42        }
43
44        info!("Starting async mDNS browser: {:?}", self.inner);
45
46        let sender = self.sender.clone();
47
48        let callback = Box::new(move |result, _| {
49            debug!("Received service discovery: {:?}", result);
50            let sender = sender.clone();
51            tokio::spawn(async move { sender.lock().await.send(result).await });
52        });
53
54        self.inner.set_service_discovered_callback(callback);
55
56        let event_loop = self.inner.browse_services()?;
57        self.event_processor
58            .start_with_timeout(event_loop, timeout)?;
59
60        Ok(())
61    }
62
63    /// Start the browser.
64    pub async fn start(&mut self) -> zeroconf::Result<()> {
65        self.start_with_timeout(Duration::ZERO).await
66    }
67
68    /// Get the next discovered service or `None` if the browser is not running.
69    pub async fn next(&mut self) -> Option<zeroconf::Result<ServiceDiscovery>> {
70        if !self.event_processor.is_running() {
71            return None;
72        }
73
74        self.receiver.recv().await
75    }
76
77    /// Shutdown the browser.
78    pub async fn shutdown(&mut self) -> zeroconf::Result<()> {
79        info!("Shutting down browser...");
80        self.event_processor.shutdown().await?;
81        info!("Browser shut down");
82        Ok(())
83    }
84}
85
#[cfg(test)]
mod tests {
    use ntest::timeout;
    use zeroconf::prelude::*;
    use zeroconf::MdnsService;
    use zeroconf::ServiceType;

    use crate::MdnsServiceAsync;

    use super::*;

    /// A set of services to advertise plus a browser to discover them with.
    struct Fixture {
        services: Vec<MdnsServiceAsync>,
        browser: MdnsBrowserAsync,
    }

    impl Fixture {
        fn new(services: Vec<MdnsServiceAsync>, browser: MdnsBrowserAsync) -> Self {
            Self { services, browser }
        }

        /// Fixture with one `http`/`tcp` service named `test_service` on port 8080.
        fn with_single_service() -> Self {
            let service_type = ServiceType::new("http", "tcp").unwrap();
            let mut service = MdnsService::new(service_type.clone(), 8080);

            service.set_name("test_service");

            Self::new(
                vec![MdnsServiceAsync::new(service).unwrap()],
                MdnsBrowserAsync::new(MdnsBrowser::new(service_type)).unwrap(),
            )
        }

        /// Start every service, then the browser.
        async fn start(&mut self) -> zeroconf::Result<()> {
            for service in self.services.iter_mut() {
                service.start().await?;
            }

            self.browser.start().await
        }

        /// Shut down the browser, then every service.
        async fn shutdown(&mut self) {
            self.browser.shutdown().await.unwrap();

            for service in self.services.iter_mut() {
                service.shutdown().await.unwrap();
            }
        }

        /// Take the next discovery and assert that it is the service
        /// registered by [`Fixture::with_single_service`].
        async fn assert_discovers_single_service(&mut self) {
            let discovered_service = self.browser.next().await.unwrap().unwrap();
            let service_type = discovered_service.service_type();

            assert_eq!(discovered_service.name(), "test_service");
            assert_eq!(service_type.name(), "http");
            assert_eq!(service_type.protocol(), "tcp");
            assert_eq!(discovered_service.port(), &8080);
        }
    }

    #[tokio::test]
    async fn it_discovers() {
        let mut fixture = Fixture::with_single_service();

        fixture.start().await.unwrap();
        fixture.assert_discovers_single_service().await;
        fixture.shutdown().await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn it_discovers_multi_thread() {
        let mut fixture = Fixture::with_single_service();

        fixture.start().await.unwrap();
        fixture.assert_discovers_single_service().await;
        fixture.shutdown().await;
    }

    #[tokio::test]
    async fn it_drops_without_shutdown_gracefully() {
        let mut fixture = Fixture::with_single_service();

        fixture.start().await.unwrap();
        fixture.assert_discovers_single_service().await;

        // Deliberately no shutdown: dropping the fixture must not hang or panic.
    }

    #[tokio::test]
    #[timeout(10000)]
    async fn it_discovers_n_services() {
        let service_type = ServiceType::new("http", "tcp").unwrap();
        let mut service1 = MdnsService::new(service_type.clone(), 8080);
        let mut service2 = MdnsService::new(service_type.clone(), 8081);

        service1.set_name("test_service_1");
        service2.set_name("test_service_2");

        let services = vec![
            MdnsServiceAsync::new(service1).unwrap(),
            MdnsServiceAsync::new(service2).unwrap(),
        ];

        let browser = MdnsBrowserAsync::new(MdnsBrowser::new(service_type)).unwrap();
        let mut fixture = Fixture::new(services, browser);

        // Without starting the fixture `next` returns `None` right away and
        // the test would pass without discovering anything.
        fixture.start().await.unwrap();

        let mut service1_discovered = false;
        let mut service2_discovered = false;

        // Stop as soon as both services were seen instead of waiting for one
        // more event; the timeout above fails the test if that never happens.
        while !(service1_discovered && service2_discovered) {
            let service = fixture.browser.next().await.unwrap().unwrap();

            if service.name() == "test_service_1" {
                service1_discovered = true;
            } else if service.name() == "test_service_2" {
                service2_discovered = true;
            }
        }

        fixture.shutdown().await;
    }

    #[tokio::test]
    async fn it_cannot_start_if_already_running() {
        let mut fixture = Fixture::with_single_service();
        fixture.start().await.unwrap();

        let result = fixture.browser.start().await;

        assert!(result.is_err());

        fixture.shutdown().await;
    }

    #[tokio::test]
    async fn it_cannot_discover_if_not_running() {
        let mut fixture = Fixture::with_single_service();

        let result = fixture.browser.next().await;

        assert!(result.is_none());
    }
}