zeroconf_tokio/
service.rs1use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use tokio::sync::oneshot;
7use zeroconf::prelude::*;
8use zeroconf::{MdnsService, ServiceRegistration};
9
10use crate::event_processor::EventProcessor;
11
12pub struct MdnsServiceAsync {
14    inner: MdnsService,
15    event_processor: EventProcessor,
16}
17
18impl MdnsServiceAsync {
19    pub fn new(service: MdnsService) -> zeroconf::Result<Self> {
21        Ok(Self {
22            inner: service,
23            event_processor: EventProcessor::new(),
24        })
25    }
26
27    pub async fn start_with_timeout(
29        &mut self,
30        timeout: Duration,
31    ) -> zeroconf::Result<ServiceRegistration> {
32        if self.event_processor.is_running() {
33            return Err("Service already running".into());
34        }
35
36        info!("Starting async mDNS service: {:?}", self.inner);
37
38        let (sender, receiver) = oneshot::channel();
39        let sender = Arc::new(Mutex::new(Some(sender)));
40
41        let callback = Box::new(move |result, _| {
42            debug!("Received service registration: {:?}", result);
43            sender
44                .lock()
45                .expect("should have been able to lock sender")
46                .take()
47                .expect("should have been able to take sender")
48                .send(result)
49                .expect("should have been able to send result");
50        });
51
52        self.inner.set_registered_callback(callback);
53
54        let event_loop = self.inner.register()?;
55        self.event_processor
56            .start_with_timeout(event_loop, timeout)?;
57
58        receiver
60            .await
61            .expect("should have been able to receive registration")
62    }
63
64    pub async fn start(&mut self) -> zeroconf::Result<ServiceRegistration> {
66        self.start_with_timeout(Duration::ZERO).await
67    }
68
69    pub async fn shutdown(&mut self) -> zeroconf::Result<()> {
71        info!("Shutting down async mDNS service: {:?}", self.inner);
72        self.event_processor.shutdown().await?;
73        info!("Service shut down");
74        Ok(())
75    }
76}
77
#[cfg(test)]
mod tests {
    use super::*;

    use zeroconf::ServiceType;

    /// Owns an async service configured with type `http`/`tcp`, port 8080 and
    /// the name `test_service`.
    struct Fixture {
        service: MdnsServiceAsync,
    }

    impl Fixture {
        /// Builds the underlying service, names it, and wraps it.
        fn new() -> Self {
            let mut inner = MdnsService::new(ServiceType::new("http", "tcp").unwrap(), 8080);
            inner.set_name("test_service");

            let service = MdnsServiceAsync::new(inner).unwrap();
            Self { service }
        }
    }

    /// Starting the service yields a registration that echoes its name and type.
    #[tokio::test]
    async fn it_registers() {
        let Fixture { mut service } = Fixture::new();

        let registration = service.start().await.unwrap();
        let kind = registration.service_type();

        assert_eq!(registration.name(), "test_service");
        assert_eq!(kind.name(), "http");
        assert_eq!(kind.protocol(), "tcp");

        service.shutdown().await.unwrap();
    }

    /// A second start while the service is running is rejected with an error.
    #[tokio::test]
    async fn it_cannot_start_if_already_running() {
        let Fixture { mut service } = Fixture::new();

        service.start().await.unwrap();

        let second_attempt = service.start().await;
        assert!(second_attempt.is_err());

        service.shutdown().await.unwrap();
    }
}
126}