async_zeroconf/service_browser.rs
1use crate::{
2 BonjourError, Interface, OpKind, OpType, ProcessTask, Service, ServiceRef, ServiceRefWrapper,
3 ServiceResolver, ZeroconfError,
4};
5
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use futures::stream::StreamExt;
9use futures_core::Stream;
10use std::ffi;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::mpsc;
14
15use bonjour_sys::{DNSServiceErrorType, DNSServiceFlags, DNSServiceRef};
16
17/// `ServiceBrowserBuilder` is used to browse for services. Once all the
18/// required information is added to the `ServiceBrowserBuilder` the
19/// [`browse`][`ServiceBrowserBuilder::browse`] method will produce a
20/// [`ServiceBrowser`] which can be used as a stream, or the
21/// [`ServiceBrowser::recv`] method will produce the next service found.
22///
23/// # Note
24/// This does not resolve the services so does not contain all information
25/// associated with the service. A further resolve operation is required to
26/// fully populate the service. This can be done with a [`ServiceResolver`].
27/// Alternatively, the [`ServiceBrowser::recv_resolve`] method can be
28/// used to resolve the services inline, or [`ServiceBrowser::resolving`] used
29/// to convert the stream into one that resolves services before returning
30/// them.
31///
32/// # Examples
33/// ```
34/// # tokio_test::block_on(async {
35/// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
36/// let mut services = browser
37/// .timeout(tokio::time::Duration::from_secs(2))
38/// .browse()?;
39///
40/// while let Some(v) = services.recv().await {
41/// println!("Service = {:?}", v);
42/// }
43/// # Ok::<(), async_zeroconf::ZeroconfError>(())
44/// # });
45/// ```
46#[derive(Debug, Clone, Eq, PartialEq, Hash)]
47pub struct ServiceBrowserBuilder {
48 interface: Interface,
49 service_type: String,
50 domain: Option<String>,
51 timeout: Option<Duration>,
52 close_on_end: bool,
53}
54
55/// Struct used to get the results of a service browser which should be
56/// constructed with a [`ServiceBrowserBuilder`].
57#[derive(Debug)]
58pub struct ServiceBrowser {
59 // Channel to receive found services
60 rx: mpsc::UnboundedReceiver<(Result<Service, ZeroconfError>, bool)>,
61 // Reference to the socket used to process events
62 delegate: ServiceRef,
63 // Close if no more events
64 close_on_end: bool,
65}
66
67impl Stream for ServiceBrowser {
68 type Item = Result<Service, ZeroconfError>;
69
70 fn poll_next(
71 mut self: Pin<&mut Self>,
72 cx: &mut Context<'_>,
73 ) -> Poll<Option<<Self as futures_core::Stream>::Item>> {
74 self.rx.poll_recv(cx).map(|p| {
75 p.map(|s| {
76 if s.1 {
77 self.close()
78 };
79 s.0
80 })
81 })
82 }
83}
84
85impl ServiceBrowser {
86 // Close the underlying receiver
87 fn close(&mut self) {
88 if self.close_on_end {
89 log::debug!("Got end of events ({})", self.delegate.op_type());
90 self.rx.close();
91 }
92 }
93
94 /// Receive a service from the browser.
95 ///
96 /// A response of `None` indicates that the browse operation has
97 /// finished, for example due to a timeout or error.
98 ///
99 /// # Examples
100 /// ```
101 /// # tokio_test::block_on(async {
102 /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
103 /// let mut services = browser
104 /// .timeout(tokio::time::Duration::from_secs(2))
105 /// .browse()?;
106 ///
107 /// while let Some(v) = services.recv().await {
108 /// println!("Service = {:?}", v);
109 /// }
110 /// # Ok::<(), async_zeroconf::ZeroconfError>(())
111 /// # });
112 /// ```
113 pub async fn recv(&mut self) -> Option<Result<Service, ZeroconfError>> {
114 self.rx.recv().await.map(|s| {
115 if s.1 {
116 self.close()
117 };
118 s.0
119 })
120 }
121
122 /// Receive a service from the browser, resolving it before returning it
123 ///
124 /// A response of `None` indicates that the browse operation has
125 /// finished, for example due to a timeout or error. If the resolve
126 /// operation fails the error will be contained in the inner `Result`.
127 ///
128 /// # Examples
129 /// ```
130 /// # tokio_test::block_on(async {
131 /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
132 /// let mut services = browser
133 /// .timeout(tokio::time::Duration::from_secs(2))
134 /// .browse()?;
135 ///
136 /// while let Some(Ok(v)) = services.recv_resolve().await {
137 /// println!("Resolved Service = {:?}", v);
138 /// }
139 /// # Ok::<(), async_zeroconf::ZeroconfError>(())
140 /// # });
141 /// ```
142 pub async fn recv_resolve(&mut self) -> Option<Result<Service, ZeroconfError>> {
143 match self.recv().await {
144 Some(Ok(service)) => Some(ServiceResolver::r(&service).await),
145 Some(Err(e)) => Some(Err(e)),
146 None => None,
147 }
148 }
149
150 /// Return a stream that includes the resolve operation before returning
151 /// results. The [`ServiceBrowser`] is consumed to produce the new stream.
152 ///
153 /// The values produced by the stream are equivalent to those produced by
154 /// [`recv_resolve`][`ServiceBrowser::recv_resolve`].
155 ///
156 /// # Examples
157 /// ```
158 /// use tokio_stream::StreamExt;
159 /// # tokio_test::block_on(async {
160 /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
161 /// let mut services = browser
162 /// .timeout(tokio::time::Duration::from_secs(2))
163 /// .browse()?;
164 ///
165 /// let mut stream = services.resolving();
166 /// while let Some(Ok(v)) = stream.next().await {
167 /// println!("Resolved Service = {:?}", v);
168 /// }
169 /// # Ok::<(), async_zeroconf::ZeroconfError>(())
170 /// # });
171 /// ```
172 pub fn resolving(self) -> impl Stream<Item = Result<Service, ZeroconfError>> + Unpin {
173 Box::pin(self.then(|service| async move {
174 match service {
175 Ok(s) => ServiceResolver::r(&s).await,
176 Err(e) => Err(e),
177 }
178 }))
179 }
180}
181
182#[derive(Debug)]
183struct ServiceBrowserContext {
184 tx: mpsc::UnboundedSender<(Result<Service, ZeroconfError>, bool)>,
185}
186
187impl ServiceBrowserContext {
188 fn send(&self, result: Result<Service, ZeroconfError>, last: bool) {
189 if let Err(e) = self.tx.send((result, last)) {
190 log::warn!("Failed to send Service, receiver dropped: {}", e);
191 }
192 }
193}
194
195impl Drop for ServiceBrowserContext {
196 fn drop(&mut self) {
197 log::trace!("Dropping ServiceBrowserContext");
198 }
199}
200
201unsafe fn browse_callback_inner(
202 intf_index: u32,
203 name: *const libc::c_char,
204 regtype: *const libc::c_char,
205 domain: *const libc::c_char,
206) -> Result<Service, ZeroconfError> {
207 let c_name = ffi::CStr::from_ptr(name);
208 let c_type = ffi::CStr::from_ptr(regtype);
209 let c_domain = ffi::CStr::from_ptr(domain);
210 let name = c_name.to_str()?;
211 let regtype = c_type.to_str()?;
212 let domain = c_domain.to_str()?;
213
214 log::debug!(
215 "ServiceBrowse Callback OK ({}:{}:{})",
216 name,
217 regtype,
218 domain
219 );
220 let mut service = Service::new(name, regtype, 0);
221 service
222 .set_interface(Interface::Interface(intf_index))
223 .set_domain(domain.to_string())
224 .set_browse();
225 Ok(service)
226}
227
228// Callback passed to DNSServiceBrowse
229unsafe extern "C" fn browse_callback(
230 _sd_ref: DNSServiceRef,
231 flags: DNSServiceFlags,
232 intf_index: u32,
233 error: DNSServiceErrorType,
234 name: *const libc::c_char,
235 regtype: *const libc::c_char,
236 domain: *const libc::c_char,
237 context: *mut libc::c_void,
238) {
239 let proxy = &*(context as *const ServiceBrowserContext);
240 if error == 0 {
241 let more = (flags & 0x1) == 0x1;
242 let add = (flags & 0x2) == 0x2;
243
244 if add {
245 let service = browse_callback_inner(intf_index, name, regtype, domain);
246 if !more {
247 log::trace!("End of services (for now)");
248 }
249
250 proxy.send(service, !more);
251 } else {
252 let c_name = ffi::CStr::from_ptr(name);
253 if let Ok(s) = c_name.to_str() {
254 log::debug!("ServiceBrowse Remove {}", s);
255 }
256 }
257 } else {
258 proxy.send(Err(error.into()), false);
259
260 log::error!(
261 "ServiceBrowse Callback Error ({}:{})",
262 error,
263 Into::<BonjourError>::into(error)
264 )
265 }
266}
267
268impl ServiceBrowserBuilder {
269 /// Create a new `ServiceBrowserBuilder` for the specified service type
270 pub fn new(service_type: &str) -> Self {
271 ServiceBrowserBuilder {
272 interface: Default::default(),
273 service_type: service_type.to_string(),
274 domain: None,
275 timeout: None,
276 close_on_end: false,
277 }
278 }
279
280 /// Set the timeout
281 pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
282 self.timeout = Some(timeout);
283 self
284 }
285
286 /// Set the browser to close if no more [`Service`]s are found.
287 ///
288 /// # Note
289 /// The browser can only detect the end of the [`Service`]s if
290 /// any are found. A timeout can be used in combination with closing on
291 /// end to ensure that the browser will terminate.
292 pub fn close_on_end(&mut self) -> &mut Self {
293 self.close_on_end = true;
294 self
295 }
296
297 /// Set the interface for service discovery rather than all
298 pub fn interface(&mut self, interface: Interface) -> &mut Self {
299 self.interface = interface;
300 self
301 }
302
303 /// Set the domain for service discovery rather than all
304 pub fn domain(&mut self, domain: String) -> &mut Self {
305 self.domain = Some(domain);
306 self
307 }
308
309 /// Start the browsing operation, which will continue until the specified
310 /// timeout or until the [`ServiceBrowser`] is dropped.
311 ///
312 /// # Examples
313 /// ```
314 /// # tokio_test::block_on(async {
315 /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
316 /// let mut services = browser
317 /// .timeout(tokio::time::Duration::from_secs(2))
318 /// .browse()?;
319 ///
320 /// while let Some(Ok(v)) = services.recv().await {
321 /// println!("Service = {:?}", v);
322 /// }
323 /// # Ok::<(), async_zeroconf::ZeroconfError>(())
324 /// # });
325 /// ```
326 pub fn browse(&self) -> Result<ServiceBrowser, ZeroconfError> {
327 let (browser, task) = self.browse_task()?;
328
329 tokio::spawn(task);
330
331 Ok(browser)
332 }
333
334 /// Start the browsing operation, which will continue until the specified
335 /// timeout or until the [`ServiceBrowser`] is dropped. The returned
336 /// [`ProcessTask`] future must be awaited to process events associated with
337 /// the browser.
338 ///
339 /// # Note
340 /// This method is intended if more control is needed over how the task
341 /// is spawned. [`ServiceBrowserBuilder::browse`] will automatically spawn
342 /// the task.
343 ///
344 /// # Examples
345 /// ```
346 /// # tokio_test::block_on(async {
347 /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
348 /// let (mut services, task) = browser
349 /// .timeout(tokio::time::Duration::from_secs(2))
350 /// .browse_task()?;
351 ///
352 /// tokio::spawn(task);
353 ///
354 /// while let Some(Ok(v)) = services.recv().await {
355 /// println!("Service = {:?}", v);
356 /// }
357 /// # Ok::<(), async_zeroconf::ZeroconfError>(())
358 /// # });
359 /// ```
360 pub fn browse_task(&self) -> Result<(ServiceBrowser, impl ProcessTask), ZeroconfError> {
361 let (tx, rx) = mpsc::unbounded_channel();
362
363 let callback_context = ServiceBrowserContext { tx };
364
365 let context = Arc::new(callback_context);
366 let context_ptr =
367 Arc::as_ptr(&context) as *mut Arc<ServiceBrowserContext> as *mut libc::c_void;
368
369 let service_handle = crate::c_intf::service_browse(
370 &self.interface,
371 &self.service_type,
372 self.domain.as_deref(),
373 Some(browse_callback),
374 context_ptr,
375 )?;
376
377 let (service_ref, task) = ServiceRefWrapper::from_service(
378 service_handle,
379 OpType::new(&self.service_type, OpKind::Browse),
380 Some(Box::new(context)),
381 self.timeout,
382 )?;
383
384 log::debug!("Created ServiceBrowser");
385 let browser = ServiceBrowser {
386 rx,
387 delegate: service_ref,
388 close_on_end: self.close_on_end,
389 };
390
391 Ok((browser, task))
392 }
393}