async_zeroconf/
service_browser.rs

1use crate::{
2    BonjourError, Interface, OpKind, OpType, ProcessTask, Service, ServiceRef, ServiceRefWrapper,
3    ServiceResolver, ZeroconfError,
4};
5
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use futures::stream::StreamExt;
9use futures_core::Stream;
10use std::ffi;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::mpsc;
14
15use bonjour_sys::{DNSServiceErrorType, DNSServiceFlags, DNSServiceRef};
16
17/// `ServiceBrowserBuilder` is used to browse for services. Once all the
18/// required information is added to the `ServiceBrowserBuilder` the
19/// [`browse`][`ServiceBrowserBuilder::browse`] method will produce a
20/// [`ServiceBrowser`] which can be used as a stream, or the
21/// [`ServiceBrowser::recv`] method will produce the next service found.
22///
23/// # Note
24/// This does not resolve the services so does not contain all information
25/// associated with the service. A further resolve operation is required to
26/// fully populate the service. This can be done with a [`ServiceResolver`].
27/// Alternatively, the [`ServiceBrowser::recv_resolve`] method can be
28/// used to resolve the services inline, or [`ServiceBrowser::resolving`] used
29/// to convert the stream into one that resolves services before returning
30/// them.
31///
32/// # Examples
33/// ```
34/// # tokio_test::block_on(async {
35/// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
36/// let mut services = browser
37///     .timeout(tokio::time::Duration::from_secs(2))
38///     .browse()?;
39///
40/// while let Some(v) = services.recv().await {
41///     println!("Service = {:?}", v);
42/// }
43/// # Ok::<(), async_zeroconf::ZeroconfError>(())
44/// # });
45/// ```
46#[derive(Debug, Clone, Eq, PartialEq, Hash)]
47pub struct ServiceBrowserBuilder {
48    interface: Interface,
49    service_type: String,
50    domain: Option<String>,
51    timeout: Option<Duration>,
52    close_on_end: bool,
53}
54
55/// Struct used to get the results of a service browser which should be
56/// constructed with a [`ServiceBrowserBuilder`].
57#[derive(Debug)]
58pub struct ServiceBrowser {
59    // Channel to receive found services
60    rx: mpsc::UnboundedReceiver<(Result<Service, ZeroconfError>, bool)>,
61    // Reference to the socket used to process events
62    delegate: ServiceRef,
63    // Close if no more events
64    close_on_end: bool,
65}
66
67impl Stream for ServiceBrowser {
68    type Item = Result<Service, ZeroconfError>;
69
70    fn poll_next(
71        mut self: Pin<&mut Self>,
72        cx: &mut Context<'_>,
73    ) -> Poll<Option<<Self as futures_core::Stream>::Item>> {
74        self.rx.poll_recv(cx).map(|p| {
75            p.map(|s| {
76                if s.1 {
77                    self.close()
78                };
79                s.0
80            })
81        })
82    }
83}
84
85impl ServiceBrowser {
86    // Close the underlying receiver
87    fn close(&mut self) {
88        if self.close_on_end {
89            log::debug!("Got end of events ({})", self.delegate.op_type());
90            self.rx.close();
91        }
92    }
93
94    /// Receive a service from the browser.
95    ///
96    /// A response of `None` indicates that the browse operation has
97    /// finished, for example due to a timeout or error.
98    ///
99    /// # Examples
100    /// ```
101    /// # tokio_test::block_on(async {
102    /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
103    /// let mut services = browser
104    ///     .timeout(tokio::time::Duration::from_secs(2))
105    ///     .browse()?;
106    ///
107    /// while let Some(v) = services.recv().await {
108    ///     println!("Service = {:?}", v);
109    /// }
110    /// # Ok::<(), async_zeroconf::ZeroconfError>(())
111    /// # });
112    /// ```
113    pub async fn recv(&mut self) -> Option<Result<Service, ZeroconfError>> {
114        self.rx.recv().await.map(|s| {
115            if s.1 {
116                self.close()
117            };
118            s.0
119        })
120    }
121
122    /// Receive a service from the browser, resolving it before returning it
123    ///
124    /// A response of `None` indicates that the browse operation has
125    /// finished, for example due to a timeout or error. If the resolve
126    /// operation fails the error will be contained in the inner `Result`.
127    ///
128    /// # Examples
129    /// ```
130    /// # tokio_test::block_on(async {
131    /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
132    /// let mut services = browser
133    ///     .timeout(tokio::time::Duration::from_secs(2))
134    ///     .browse()?;
135    ///
136    /// while let Some(Ok(v)) = services.recv_resolve().await {
137    ///     println!("Resolved Service = {:?}", v);
138    /// }
139    /// # Ok::<(), async_zeroconf::ZeroconfError>(())
140    /// # });
141    /// ```
142    pub async fn recv_resolve(&mut self) -> Option<Result<Service, ZeroconfError>> {
143        match self.recv().await {
144            Some(Ok(service)) => Some(ServiceResolver::r(&service).await),
145            Some(Err(e)) => Some(Err(e)),
146            None => None,
147        }
148    }
149
150    /// Return a stream that includes the resolve operation before returning
151    /// results. The [`ServiceBrowser`] is consumed to produce the new stream.
152    ///
153    /// The values produced by the stream are equivalent to those produced by
154    /// [`recv_resolve`][`ServiceBrowser::recv_resolve`].
155    ///
156    /// # Examples
157    /// ```
158    /// use tokio_stream::StreamExt;
159    /// # tokio_test::block_on(async {
160    /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
161    /// let mut services = browser
162    ///     .timeout(tokio::time::Duration::from_secs(2))
163    ///     .browse()?;
164    ///
165    /// let mut stream = services.resolving();
166    /// while let Some(Ok(v)) = stream.next().await {
167    ///     println!("Resolved Service = {:?}", v);
168    /// }
169    /// # Ok::<(), async_zeroconf::ZeroconfError>(())
170    /// # });
171    /// ```
172    pub fn resolving(self) -> impl Stream<Item = Result<Service, ZeroconfError>> + Unpin {
173        Box::pin(self.then(|service| async move {
174            match service {
175                Ok(s) => ServiceResolver::r(&s).await,
176                Err(e) => Err(e),
177            }
178        }))
179    }
180}
181
182#[derive(Debug)]
183struct ServiceBrowserContext {
184    tx: mpsc::UnboundedSender<(Result<Service, ZeroconfError>, bool)>,
185}
186
187impl ServiceBrowserContext {
188    fn send(&self, result: Result<Service, ZeroconfError>, last: bool) {
189        if let Err(e) = self.tx.send((result, last)) {
190            log::warn!("Failed to send Service, receiver dropped: {}", e);
191        }
192    }
193}
194
195impl Drop for ServiceBrowserContext {
196    fn drop(&mut self) {
197        log::trace!("Dropping ServiceBrowserContext");
198    }
199}
200
201unsafe fn browse_callback_inner(
202    intf_index: u32,
203    name: *const libc::c_char,
204    regtype: *const libc::c_char,
205    domain: *const libc::c_char,
206) -> Result<Service, ZeroconfError> {
207    let c_name = ffi::CStr::from_ptr(name);
208    let c_type = ffi::CStr::from_ptr(regtype);
209    let c_domain = ffi::CStr::from_ptr(domain);
210    let name = c_name.to_str()?;
211    let regtype = c_type.to_str()?;
212    let domain = c_domain.to_str()?;
213
214    log::debug!(
215        "ServiceBrowse Callback OK ({}:{}:{})",
216        name,
217        regtype,
218        domain
219    );
220    let mut service = Service::new(name, regtype, 0);
221    service
222        .set_interface(Interface::Interface(intf_index))
223        .set_domain(domain.to_string())
224        .set_browse();
225    Ok(service)
226}
227
228// Callback passed to DNSServiceBrowse
229unsafe extern "C" fn browse_callback(
230    _sd_ref: DNSServiceRef,
231    flags: DNSServiceFlags,
232    intf_index: u32,
233    error: DNSServiceErrorType,
234    name: *const libc::c_char,
235    regtype: *const libc::c_char,
236    domain: *const libc::c_char,
237    context: *mut libc::c_void,
238) {
239    let proxy = &*(context as *const ServiceBrowserContext);
240    if error == 0 {
241        let more = (flags & 0x1) == 0x1;
242        let add = (flags & 0x2) == 0x2;
243
244        if add {
245            let service = browse_callback_inner(intf_index, name, regtype, domain);
246            if !more {
247                log::trace!("End of services (for now)");
248            }
249
250            proxy.send(service, !more);
251        } else {
252            let c_name = ffi::CStr::from_ptr(name);
253            if let Ok(s) = c_name.to_str() {
254                log::debug!("ServiceBrowse Remove {}", s);
255            }
256        }
257    } else {
258        proxy.send(Err(error.into()), false);
259
260        log::error!(
261            "ServiceBrowse Callback Error ({}:{})",
262            error,
263            Into::<BonjourError>::into(error)
264        )
265    }
266}
267
268impl ServiceBrowserBuilder {
269    /// Create a new `ServiceBrowserBuilder` for the specified service type
270    pub fn new(service_type: &str) -> Self {
271        ServiceBrowserBuilder {
272            interface: Default::default(),
273            service_type: service_type.to_string(),
274            domain: None,
275            timeout: None,
276            close_on_end: false,
277        }
278    }
279
280    /// Set the timeout
281    pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
282        self.timeout = Some(timeout);
283        self
284    }
285
286    /// Set the browser to close if no more [`Service`]s are found.
287    ///
288    /// # Note
289    /// The browser can only detect the end of the [`Service`]s if
290    /// any are found. A timeout can be used in combination with closing on
291    /// end to ensure that the browser will terminate.
292    pub fn close_on_end(&mut self) -> &mut Self {
293        self.close_on_end = true;
294        self
295    }
296
297    /// Set the interface for service discovery rather than all
298    pub fn interface(&mut self, interface: Interface) -> &mut Self {
299        self.interface = interface;
300        self
301    }
302
303    /// Set the domain for service discovery rather than all
304    pub fn domain(&mut self, domain: String) -> &mut Self {
305        self.domain = Some(domain);
306        self
307    }
308
309    /// Start the browsing operation, which will continue until the specified
310    /// timeout or until the [`ServiceBrowser`] is dropped.
311    ///
312    /// # Examples
313    /// ```
314    /// # tokio_test::block_on(async {
315    /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
316    /// let mut services = browser
317    ///     .timeout(tokio::time::Duration::from_secs(2))
318    ///     .browse()?;
319    ///
320    /// while let Some(Ok(v)) = services.recv().await {
321    ///     println!("Service = {:?}", v);
322    /// }
323    /// # Ok::<(), async_zeroconf::ZeroconfError>(())
324    /// # });
325    /// ```
326    pub fn browse(&self) -> Result<ServiceBrowser, ZeroconfError> {
327        let (browser, task) = self.browse_task()?;
328
329        tokio::spawn(task);
330
331        Ok(browser)
332    }
333
334    /// Start the browsing operation, which will continue until the specified
335    /// timeout or until the [`ServiceBrowser`] is dropped. The returned
336    /// [`ProcessTask`] future must be awaited to process events associated with
337    /// the browser.
338    ///
339    /// # Note
340    /// This method is intended if more control is needed over how the task
341    /// is spawned. [`ServiceBrowserBuilder::browse`] will automatically spawn
342    /// the task.
343    ///
344    /// # Examples
345    /// ```
346    /// # tokio_test::block_on(async {
347    /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
348    /// let (mut services, task) = browser
349    ///     .timeout(tokio::time::Duration::from_secs(2))
350    ///     .browse_task()?;
351    ///
352    /// tokio::spawn(task);
353    ///
354    /// while let Some(Ok(v)) = services.recv().await {
355    ///     println!("Service = {:?}", v);
356    /// }
357    /// # Ok::<(), async_zeroconf::ZeroconfError>(())
358    /// # });
359    /// ```
360    pub fn browse_task(&self) -> Result<(ServiceBrowser, impl ProcessTask), ZeroconfError> {
361        let (tx, rx) = mpsc::unbounded_channel();
362
363        let callback_context = ServiceBrowserContext { tx };
364
365        let context = Arc::new(callback_context);
366        let context_ptr =
367            Arc::as_ptr(&context) as *mut Arc<ServiceBrowserContext> as *mut libc::c_void;
368
369        let service_handle = crate::c_intf::service_browse(
370            &self.interface,
371            &self.service_type,
372            self.domain.as_deref(),
373            Some(browse_callback),
374            context_ptr,
375        )?;
376
377        let (service_ref, task) = ServiceRefWrapper::from_service(
378            service_handle,
379            OpType::new(&self.service_type, OpKind::Browse),
380            Some(Box::new(context)),
381            self.timeout,
382        )?;
383
384        log::debug!("Created ServiceBrowser");
385        let browser = ServiceBrowser {
386            rx,
387            delegate: service_ref,
388            close_on_end: self.close_on_end,
389        };
390
391        Ok((browser, task))
392    }
393}