async_zeroconf/
service_ref.rs

1// Private helper structures to wrap the service reference
2
3use crate::{BonjourError, ZeroconfError};
4
5use bonjour_sys::{
6    DNSServiceProcessResult, DNSServiceRef, DNSServiceRefDeallocate, DNSServiceRefSockFD,
7};
8use futures::Future;
9use std::any::Any;
10use std::fmt::Display;
11use std::sync::Mutex;
12use std::time::Duration;
13use tokio::io::unix::AsyncFd;
14use tokio::sync::oneshot;
15
16/// `OpType` is used to indicate the service type and the kind of operation
17/// associated with a [`ServiceRef`]. Primarily intended for debug.
18///
19/// # Examples
20/// ```
21/// # tokio_test::block_on(async {
22/// let service = async_zeroconf::Service::new("Server", "_http._tcp", 80);
23/// let service_ref = service.publish().await?;
24///
25/// assert_eq!(service_ref.op_type().service_type(), "_http._tcp");
26/// assert_eq!(service_ref.op_type().kind(), &async_zeroconf::OpKind::Publish);
27/// # Ok::<(), async_zeroconf::ZeroconfError>(())
28/// # });
29/// ```
30#[derive(Debug, Clone)]
31pub struct OpType {
32    service_type: String,
33    kind: OpKind,
34}
35
36impl OpType {
37    pub(crate) fn new(service_type: &str, kind: OpKind) -> Self {
38        OpType {
39            service_type: service_type.to_string(),
40            kind,
41        }
42    }
43
44    /// The associated service type (e.g. `"_http._tcp"`).
45    pub fn service_type(&self) -> &str {
46        &self.service_type
47    }
48
49    /// The associated type of operation (e.g. publishing a service).
50    pub fn kind(&self) -> &OpKind {
51        &self.kind
52    }
53}
54
55impl Display for OpType {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
57        write!(f, "{:?}[{}]", self.kind, self.service_type)
58    }
59}
60
61/// `OpKind` represents the possible kinds of operation associated with a
62/// [`ServiceRef`], primarily used for debug and obtained from the [`OpType`]
63/// returned by [`ServiceRef::op_type`].
64#[derive(Debug, Clone, Eq, PartialEq)]
65pub enum OpKind {
66    /// An operation publishing a service.
67    Publish,
68    /// An operation to browse for a given type of service.
69    Browse,
70    /// An operation to resolve a service.
71    Resolve,
72}
73
74/// Struct to hold a published service, which keeps the service alive while a
75/// reference to it is held.
76/// When dropped the Service will be removed and any associated resources
77/// deallocated.
78///
79/// This should be created via a [`Service`][`crate::Service`] or a
80/// [`ServiceResolver`][`crate::ServiceResolver`]. For a browse
81/// operation the `ServiceRef` is held by the `ServiceBrowser` created by a
82/// [`ServiceBrowserBuilder`][`crate::ServiceBrowserBuilder`].
83#[derive(Debug)]
84#[must_use]
85pub struct ServiceRef {
86    shutdown_tx: Option<oneshot::Sender<()>>,
87    op_type: OpType,
88}
89
90impl ServiceRef {
91    /// Return a descriptive type of the operation associated with this
92    /// reference.
93    pub fn op_type(&self) -> &OpType {
94        &self.op_type
95    }
96}
97
98impl Drop for ServiceRef {
99    fn drop(&mut self) {
100        log::debug!("Dropping ServiceRef ({})", self.op_type);
101        // Send shutdown to end process task if idle
102        // Should only fail if rx already dropped
103        if self
104            .shutdown_tx
105            .take()
106            .expect("shutdown taken before drop")
107            .send(())
108            .is_err()
109        {}
110    }
111}
112
113// Internal type to hold the file descriptor for the socket associated with the
114// service.
115#[derive(Debug)]
116pub(crate) struct ServiceFileDescriptor {
117    pub fd: i32,
118}
119
120// Allow ServiceFileDescriptor to be convered to a AsyncFd by implementing the
121// AsRawFd trait.
122impl std::os::unix::prelude::AsRawFd for ServiceFileDescriptor {
123    fn as_raw_fd(&self) -> i32 {
124        self.fd
125    }
126}
127
128/// The `ProcessTask` trait represents the future that is returned from some
129/// functions that is awaited on to process events associated with a published
130/// service or a browse operation.
131pub trait ProcessTask: Future<Output = ()> + Send + Sync {}
132
133impl<T> ProcessTask for T where T: Future<Output = ()> + Send + Sync {}
134
135#[derive(Debug)]
136pub(crate) struct ServiceRefWrapper {
137    // Pointer to reference returned by C API
138    pub inner: DNSServiceRef,
139    // Mutex to protect service reference
140    pub lock: Mutex<()>,
141    // Async file descriptor to detect new events asynchronously
142    pub fd: AsyncFd<ServiceFileDescriptor>,
143    // Hold a reference to an (optional) context used for C API callbacks
144    context: Option<Box<dyn Any + Send>>,
145    // Operation type that created this reference
146    op_type: OpType,
147}
148
149impl ServiceRefWrapper {
150    pub fn from_service(
151        service_ref: DNSServiceRef,
152        op_type: OpType,
153        context: Option<Box<dyn Any + Send>>,
154        timeout: Option<Duration>,
155    ) -> Result<(ServiceRef, impl ProcessTask), std::io::Error> {
156        log::trace!("Call DNSServiceRefSockFD");
157        let fd = unsafe { DNSServiceRefSockFD(service_ref) };
158        log::trace!("  FD:{}", fd);
159
160        log::debug!("Creating ServiceRef ({})", op_type);
161
162        match AsyncFd::new(ServiceFileDescriptor { fd }) {
163            Ok(async_fd) => {
164                // Create channel for shutdown
165                let (tx, rx) = oneshot::channel::<()>();
166
167                // Create the wrapper for processing events
168                let wrapper = ServiceRefWrapper {
169                    inner: service_ref,
170                    lock: Mutex::new(()),
171                    fd: async_fd,
172                    context,
173                    op_type: op_type.clone(),
174                };
175
176                // Spawn the task that will process events
177                let task = async move {
178                    match ServiceRefWrapper::process(rx, wrapper, timeout).await {
179                        Ok(_) => (),
180                        Err(e) => log::error!("Error on processing: {}", e),
181                    }
182                };
183
184                // Create the reference that will hold the service active
185                let s_ref = ServiceRef {
186                    shutdown_tx: Some(tx),
187                    op_type,
188                };
189
190                Ok((s_ref, task))
191            }
192            Err(e) => Err(e),
193        }
194    }
195
196    /// A future to wait for any pending events related to the service,
197    /// handling them and then completing the future.
198    async fn process_events(service_ref: &ServiceRefWrapper) -> Result<bool, ZeroconfError> {
199        // Wait on indication that file descriptor is readable
200        let mut fd = service_ref.fd.readable().await?;
201
202        log::trace!("Call DNSServiceProcessResult");
203
204        // Process any pending events
205        let process_err = {
206            let mut _guard = service_ref.lock.lock()?;
207            unsafe { DNSServiceProcessResult(service_ref.inner) }
208        };
209        // Clear ready flag for socket to wait for next event
210        // As there is no await point or polling between processing above and
211        // clearing the flag, there should be no opportunity to 'miss' an event
212        // between the DNSServiceProcessResult and clear_ready().
213        fd.clear_ready();
214        if process_err != 0 {
215            return Err(Into::<BonjourError>::into(process_err).into());
216        }
217
218        Ok(true)
219    }
220
221    /// Processing wrapper to keep processing events as available
222    async fn process(
223        mut rx: oneshot::Receiver<()>,
224        service_ref: ServiceRefWrapper,
225        timeout: Option<Duration>,
226    ) -> Result<(), ZeroconfError> {
227        let (tx_time, mut rx_time) = oneshot::channel();
228
229        if let Some(t) = timeout {
230            tokio::spawn(async move {
231                tokio::time::sleep(t).await;
232                match tx_time.send(()) {
233                    Ok(_) => {
234                        log::debug!("Sending timeout");
235                    }
236                    Err(_) => {
237                        log::trace!("Sending timeout failed - processing ended due to shutdown");
238                    }
239                }
240            });
241        }
242
243        loop {
244            tokio::select! {
245                // Shutdown event
246                _ = &mut rx => {
247                    log::debug!("Process task got shutdown");
248                    return Ok(());
249                }
250                // Timeout future
251                _ = &mut rx_time => {
252                    log::debug!("Process task got timeout");
253                    return Ok(());
254                }
255                // Event processing
256                e = Self::process_events(&service_ref) => {
257                    match e {
258                        Ok(b) => {
259                            if b {
260                                log::trace!("Events processed");
261                            } else {
262                                log::trace!("Got null pointer due to shutdown");
263                                return Ok(());
264                            }
265                        },
266                        Err(e) => return Err(e)
267                    }
268                }
269            }
270        }
271    }
272}
273
274// Implement Send as reference is thread-safe
275unsafe impl Send for ServiceRefWrapper {}
276// Implement Sync as reference is protected by mutex
277unsafe impl Sync for ServiceRefWrapper {}
278
279impl Drop for ServiceRefWrapper {
280    fn drop(&mut self) {
281        log::debug!(
282            "Dropping and deallocating service reference ({})",
283            self.op_type
284        );
285        {
286            match self.lock.lock() {
287                Ok(_guard) => {
288                    unsafe { DNSServiceRefDeallocate(self.inner) };
289                }
290                Err(_) => {
291                    log::warn!("Service reference mutex was poisoned");
292                    unsafe { DNSServiceRefDeallocate(self.inner) };
293                }
294            }
295        }
296        if self.context.is_some() {
297            log::debug!("Context to be dropped ({})", self.op_type);
298        }
299    }
300}