async_zeroconf/
service_resolver.rs

1use crate::{
2    BonjourError, Interface, OpKind, OpType, ProcessTask, Service, ServiceRef, ServiceRefWrapper,
3    TxtRecord, ZeroconfError,
4};
5
6use futures::Future;
7use futures::FutureExt;
8use std::ffi;
9use std::ptr;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::mpsc;
13
14use bonjour_sys::{
15    DNSServiceErrorType, DNSServiceFlags, DNSServiceRef, TXTRecordGetCount, TXTRecordGetItemAtIndex,
16};
17
/// `ServiceResolver` is used to resolve a service obtained from a
/// [`ServiceBrowser`][`crate::ServiceBrowser`].
///
/// A browse operation does not report every detail of a service (the port,
/// for example, is missing); resolving the service fills that information in.
///
/// # Note
/// Only resolve services that were produced by a browse operation, so that
/// the interface and domain are set correctly.
///
/// # Examples
/// ```
/// # tokio_test::block_on(async {
/// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
/// let mut services = browser
///     .timeout(tokio::time::Duration::from_secs(2))
///     .browse()?;
///
/// while let Some(Ok(v)) = services.recv().await {
///     let resolved_service = async_zeroconf::ServiceResolver::r(&v).await?;
///     println!("Service = {}", resolved_service);
/// }
/// # Ok::<(), async_zeroconf::ZeroconfError>(())
/// # });
/// ```
#[derive(Debug)]
pub struct ServiceResolver {
    /// Optional time limit for a resolve operation; `None` means no timeout.
    timeout: Option<Duration>,
    /// When `true`, a service is only resolved if it came from a browser.
    checked: bool,
}
47
48impl Default for ServiceResolver {
49    fn default() -> Self {
50        ServiceResolver::new()
51    }
52}
53
54impl ServiceResolver {
55    /// Create a new `ServiceResolver` with the default settings.
56    /// The operation will have no timeout and it will check if the service to
57    /// be resolved was from a browser.
58    pub fn new() -> Self {
59        ServiceResolver {
60            timeout: None,
61            checked: true,
62        }
63    }
64
65    /// Create a new `ServiceResolver` with a timeout.
66    pub fn new_with_timeout(timeout: Duration) -> Self {
67        ServiceResolver {
68            timeout: Some(timeout),
69            checked: true,
70        }
71    }
72
73    /// Disable checking if services came from a browser
74    pub fn set_unchecked(&mut self) -> &mut Self {
75        self.checked = false;
76        self
77    }
78
79    /// Static method to resolve the specified [`Service`], the service must have
80    /// been produced from a [`ServiceBrowser`][`crate::ServiceBrowser`] to ensure
81    /// that the required information for the resolve operation is available.
82    ///
83    /// # Examples
84    /// ```
85    /// # tokio_test::block_on(async {
86    /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
87    /// let mut services = browser
88    ///     .timeout(tokio::time::Duration::from_secs(2))
89    ///     .browse()?;
90    ///
91    /// while let Some(Ok(service)) = services.recv().await {
92    ///     let resolved = async_zeroconf::ServiceResolver::r(&service).await?;
93    ///     println!("Service = {}", resolved);
94    /// }
95    /// # Ok::<(), async_zeroconf::ZeroconfError>(())
96    /// # });
97    /// ```
98    pub async fn r(service: &Service) -> Result<Service, ZeroconfError> {
99        let resolver = ServiceResolver::new();
100        resolver.resolve(service).await
101    }
102
103    /// Resolve the specified [`Service`] using this `ServiceResolver`. This does
104    /// not consume the `ServiceResolver` so more services can be resolved
105    /// using the same settings.
106    ///
107    /// # Examples
108    /// ```
109    /// # tokio_test::block_on(async {
110    /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
111    /// let mut services = browser
112    ///     .timeout(tokio::time::Duration::from_secs(2))
113    ///     .browse()?;
114    /// let resolver = async_zeroconf::ServiceResolver::new();
115    ///
116    /// while let Some(Ok(service)) = services.recv().await {
117    ///     let resolved = resolver.resolve(&service).await?;
118    ///     println!("Service = {}", resolved);
119    /// }
120    /// # Ok::<(), async_zeroconf::ZeroconfError>(())
121    /// # });
122    /// ```
123    pub async fn resolve(&self, service: &Service) -> Result<Service, ZeroconfError> {
124        let (mut resolver, task) = self.resolve_inner(service)?;
125        tokio::spawn(task);
126        resolver.get(service).await
127    }
128
129    /// Resolve the specified [`Service`] using this `ServiceResolver`. The
130    /// returned [`ProcessTask`] future must be awaited to process events
131    /// associated with the browser.
132    ///
133    /// If the resolve operation can be constructed, this will return a
134    /// future which will produce the result of the resolve operation and
135    /// a task which should be awaited on to handle any events associated
136    /// with the resolving.
137    ///
138    /// # Note
139    /// This method is intended if more control is needed over how the task
140    /// is spawned. [`ServiceResolver::resolve`] will automatically spawn the
141    /// task.
142    ///
143    /// # Examples
144    /// ```
145    /// # tokio_test::block_on(async {
146    /// let mut browser = async_zeroconf::ServiceBrowserBuilder::new("_http._tcp");
147    /// let mut services = browser
148    ///     .timeout(tokio::time::Duration::from_secs(2))
149    ///     .browse()?;
150    /// let resolver = async_zeroconf::ServiceResolver::new();
151    ///
152    /// while let Some(Ok(service)) = services.recv().await {
153    ///     if let Ok((future, task)) = resolver.resolve_task(&service).await {
154    ///         tokio::spawn(task);
155    ///         let resolved = future.await?;
156    ///         println!("Service = {}", resolved);
157    ///     }
158    /// }
159    /// # Ok::<(), async_zeroconf::ZeroconfError>(())
160    /// # });
161    /// ```
162    pub async fn resolve_task(
163        &self,
164        service: &Service,
165    ) -> Result<
166        (
167            impl Future<Output = Result<Service, ZeroconfError>>,
168            impl ProcessTask,
169        ),
170        ZeroconfError,
171    > {
172        match self.resolve_inner(service) {
173            Ok((mut resolver, task)) => {
174                let s = service.clone();
175                Ok((async move { resolver.get(&s).await }, task))
176            }
177            Err(e) => Err(e),
178        }
179    }
180
181    fn resolve_inner(
182        &self,
183        service: &Service,
184    ) -> Result<(ServiceResolverResult, impl ProcessTask), ZeroconfError> {
185        if !self.checked || (service.browse() && !service.resolve()) {
186            self.resolve_inner_unchecked(service)
187        } else {
188            Err(ZeroconfError::NotFromBrowser(service.clone()))
189        }
190    }
191
192    fn resolve_inner_unchecked(
193        &self,
194        service: &Service,
195    ) -> Result<(ServiceResolverResult, impl ProcessTask), ZeroconfError> {
196        let (tx, rx) = mpsc::unbounded_channel();
197
198        let callback_context = ServiceResolverContext { tx };
199
200        let context = Arc::new(callback_context);
201        let context_ptr =
202            Arc::as_ptr(&context) as *mut Arc<ServiceResolverContext> as *mut libc::c_void;
203
204        let domain = &service
205            .domain()
206            .as_ref()
207            .ok_or_else(|| ZeroconfError::NotFromBrowser(service.clone()))?;
208
209        let service_handle = crate::c_intf::service_resolve(
210            service.name(),
211            &service.interface(),
212            service.service_type(),
213            domain,
214            Some(resolve_callback),
215            context_ptr,
216        )?;
217
218        let (delegate, task) = ServiceRefWrapper::from_service(
219            service_handle,
220            OpType::new(service.service_type(), OpKind::Resolve),
221            Some(Box::new(context)),
222            self.timeout,
223        )?;
224
225        let result = ServiceResolverResult { rx, delegate };
226
227        Ok((result, task))
228    }
229}
230
231#[derive(Debug)]
232struct ResolverInformation {
233    interface: Interface,
234    fullname: String,
235    hosttarget: String,
236    port: u16,
237    txt_record: TxtRecord,
238}
239
240impl ResolverInformation {
241    fn merge(self, service: &Service) -> Service {
242        assert_eq!(
243            &self.interface,
244            service.interface(),
245            "Interface should match on resolved service"
246        );
247
248        let mut s = Service::new_with_txt(
249            service.name(),
250            service.service_type(),
251            self.port,
252            self.txt_record,
253        );
254
255        s.set_interface(self.interface).set_resolve();
256
257        match service.domain() {
258            Some(d) => {
259                let host = self.hosttarget.replace(d, "");
260                s.set_host(host[0..host.len() - 1].to_string())
261                    .set_domain(d.to_string())
262            }
263            _ => &s,
264        };
265
266        s
267    }
268}
269
270#[derive(Debug)]
271struct ServiceResolverResult {
272    rx: mpsc::UnboundedReceiver<Result<ResolverInformation, ZeroconfError>>,
273    delegate: ServiceRef,
274}
275
276impl ServiceResolverResult {
277    async fn get(&mut self, service: &Service) -> Result<Service, ZeroconfError> {
278        self.rx
279            .recv()
280            .map(move |res| match res {
281                Some(Ok(s)) => Ok(s.merge(service)),
282                Some(Err(e)) => Err(e),
283                None => Err(ZeroconfError::Timeout(service.clone())),
284            })
285            .await
286    }
287}
288
289#[derive(Debug)]
290struct ServiceResolverContext {
291    tx: mpsc::UnboundedSender<Result<ResolverInformation, ZeroconfError>>,
292}
293
294impl ServiceResolverContext {
295    fn send(&self, info: Result<ResolverInformation, ZeroconfError>) {
296        if self.tx.send(info).is_err() {
297            log::warn!("Failed to send resolved information, receiver dropped");
298        }
299    }
300}
301
302unsafe fn resolve_callback_inner(
303    intf_index: u32,
304    fullname: *const libc::c_char,
305    hosttarget: *const libc::c_char,
306    port: u16,
307    txt_len: u16,
308    txt_record: *const libc::c_uchar,
309) -> Result<ResolverInformation, ZeroconfError> {
310    let c_fullname = ffi::CStr::from_ptr(fullname);
311    let c_hosttarget = ffi::CStr::from_ptr(hosttarget);
312    let fullname = c_fullname.to_str()?;
313    let hosttarget = c_hosttarget.to_str()?;
314    let port = port.to_be();
315
316    log::debug!(
317        "ServiceResolve Callback OK ({}:{}:{})",
318        fullname,
319        hosttarget,
320        port
321    );
322
323    let txt_count = TXTRecordGetCount(txt_len, txt_record as *const libc::c_void);
324    let mut txt = TxtRecord::new();
325    for i in 0..txt_count {
326        let keysize: u16 = 256;
327        let mut valsize = 0;
328        let mut valptr: *const libc::c_void = ptr::null_mut();
329        let mut keybuf = vec![0; (keysize + 1).into()];
330        let keyptr = keybuf.as_mut_ptr() as *mut libc::c_char;
331        let err = TXTRecordGetItemAtIndex(
332            txt_len,
333            txt_record as *const libc::c_void,
334            i,
335            keysize,
336            keyptr,
337            &mut valsize,
338            &mut valptr,
339        );
340
341        if err == 0 {
342            let keylen = keybuf.iter().position(|&c| c == 0).expect(
343                "No error reported by TXTRecordGetItemAtIndex but no null byte in key string",
344            );
345            keybuf.truncate(keylen);
346
347            let key = String::from_utf8_lossy(&keybuf);
348            let val_slice =
349                std::slice::from_raw_parts(valptr as *const libc::c_uchar, valsize.into());
350
351            txt.add_vec(key.into_owned(), val_slice.to_owned());
352        } else {
353            log::error!(
354                "TXTRecordGetItemAtIndex Callback Error ({}:{})",
355                err,
356                Into::<BonjourError>::into(err)
357            );
358            return Err(err.into());
359        }
360    }
361
362    let info = ResolverInformation {
363        interface: Interface::Interface(intf_index),
364        fullname: fullname.to_string(),
365        hosttarget: hosttarget.to_string(),
366        port,
367        txt_record: txt,
368    };
369
370    Ok(info)
371}
372
373// Callback passed to DNSServiceResolve
374unsafe extern "C" fn resolve_callback(
375    _sd_ref: DNSServiceRef,
376    flags: DNSServiceFlags,
377    intf_index: u32,
378    error: DNSServiceErrorType,
379    fullname: *const libc::c_char,
380    hosttarget: *const libc::c_char,
381    port: u16,
382    txt_len: u16,
383    txt_record: *const libc::c_uchar,
384    context: *mut libc::c_void,
385) {
386    let proxy = &*(context as *const ServiceResolverContext);
387    if error == 0 {
388        let more = (flags & 0x1) == 0x1;
389        if more {
390            log::warn!("Unexpected DNSServiceFlagsMoreComing set on resolve")
391        }
392
393        proxy.send(resolve_callback_inner(
394            intf_index, fullname, hosttarget, port, txt_len, txt_record,
395        ));
396    } else {
397        proxy.send(Err(error.into()));
398        log::error!(
399            "ServiceResolve Callback Error ({}:{})",
400            error,
401            Into::<BonjourError>::into(error)
402        )
403    }
404}