zeroconf_tokio/
service.rs

1//! Asynchronous mDNS service registration.
2
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use tokio::sync::oneshot;
7use zeroconf::prelude::*;
8use zeroconf::{MdnsService, ServiceRegistration};
9
10use crate::event_processor::EventProcessor;
11
/// Asynchronous mDNS service registration.
///
/// Wraps a [`zeroconf::MdnsService`] together with an [`EventProcessor`] so
/// that registration can be awaited instead of polled by hand.
pub struct MdnsServiceAsync {
    // The wrapped service; it receives the registered callback and is the
    // object `register()` is called on when the service is started.
    inner: MdnsService,
    // Handed the event loop returned by `register()`; also consulted to
    // reject a second start while it reports itself as running.
    event_processor: EventProcessor,
}
17
18impl MdnsServiceAsync {
19    /// Create a new asynchronous mDNS service.
20    pub fn new(service: MdnsService) -> zeroconf::Result<Self> {
21        Ok(Self {
22            inner: service,
23            event_processor: EventProcessor::new(),
24        })
25    }
26
27    /// Start the service with a timeout passed to the [`zeroconf::EventLoop`].
28    pub async fn start_with_timeout(
29        &mut self,
30        timeout: Duration,
31    ) -> zeroconf::Result<ServiceRegistration> {
32        if self.event_processor.is_running() {
33            return Err("Service already running".into());
34        }
35
36        info!("Starting async mDNS service: {:?}", self.inner);
37
38        let (sender, receiver) = oneshot::channel();
39        let sender = Arc::new(Mutex::new(Some(sender)));
40
41        let callback = Box::new(move |result, _| {
42            debug!("Received service registration: {:?}", result);
43            sender
44                .lock()
45                .expect("should have been able to lock sender")
46                .take()
47                .expect("should have been able to take sender")
48                .send(result)
49                .expect("should have been able to send result");
50        });
51
52        self.inner.set_registered_callback(callback);
53
54        let event_loop = self.inner.register()?;
55        self.event_processor
56            .start_with_timeout(event_loop, timeout)?;
57
58        // await on registration
59        receiver
60            .await
61            .expect("should have been able to receive registration")
62    }
63
64    /// Start the service.
65    pub async fn start(&mut self) -> zeroconf::Result<ServiceRegistration> {
66        self.start_with_timeout(Duration::ZERO).await
67    }
68
69    /// Shutdown the service.
70    pub async fn shutdown(&mut self) -> zeroconf::Result<()> {
71        info!("Shutting down async mDNS service: {:?}", self.inner);
72        self.event_processor.shutdown().await?;
73        info!("Service shut down");
74        Ok(())
75    }
76}
77
#[cfg(test)]
mod tests {
    use super::*;

    use zeroconf::ServiceType;

    /// Test harness owning a not-yet-started [`MdnsServiceAsync`].
    struct Fixture {
        service: MdnsServiceAsync,
    }

    impl Fixture {
        /// Builds an `http`/`tcp` service on port 8080 named `test_service`.
        fn new() -> Self {
            let mut mdns = MdnsService::new(ServiceType::new("http", "tcp").unwrap(), 8080);
            mdns.set_name("test_service");

            let service = MdnsServiceAsync::new(mdns).unwrap();
            Self { service }
        }
    }

    /// Starting the service yields a registration echoing the configured
    /// name and service type.
    #[tokio::test]
    async fn it_registers() {
        let Fixture { mut service } = Fixture::new();

        let registered = service.start().await.unwrap();
        let kind = registered.service_type();

        assert_eq!(registered.name(), "test_service");
        assert_eq!(kind.name(), "http");
        assert_eq!(kind.protocol(), "tcp");

        service.shutdown().await.unwrap();
    }

    /// A second start while the service is running must be rejected.
    #[tokio::test]
    async fn it_cannot_start_if_already_running() {
        let Fixture { mut service } = Fixture::new();

        service.start().await.unwrap();

        let second_attempt = service.start().await;
        assert!(second_attempt.is_err());

        service.shutdown().await.unwrap();
    }
}
126}