1use crate::{
2 BonjourError, Interface, OpKind, OpType, ProcessTask, Service, ServiceRef, ServiceRefWrapper,
3 TxtRecord, ZeroconfError,
4};
5
6use futures::Future;
7use futures::FutureExt;
8use std::ffi;
9use std::ptr;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::mpsc;
13
14use bonjour_sys::{
15 DNSServiceErrorType, DNSServiceFlags, DNSServiceRef, TXTRecordGetCount, TXTRecordGetItemAtIndex,
16};
17
18#[derive(Debug)]
43pub struct ServiceResolver {
44 timeout: Option<Duration>,
45 checked: bool,
46}
47
48impl Default for ServiceResolver {
49 fn default() -> Self {
50 ServiceResolver::new()
51 }
52}
53
54impl ServiceResolver {
55 pub fn new() -> Self {
59 ServiceResolver {
60 timeout: None,
61 checked: true,
62 }
63 }
64
65 pub fn new_with_timeout(timeout: Duration) -> Self {
67 ServiceResolver {
68 timeout: Some(timeout),
69 checked: true,
70 }
71 }
72
73 pub fn set_unchecked(&mut self) -> &mut Self {
75 self.checked = false;
76 self
77 }
78
79 pub async fn r(service: &Service) -> Result<Service, ZeroconfError> {
99 let resolver = ServiceResolver::new();
100 resolver.resolve(service).await
101 }
102
103 pub async fn resolve(&self, service: &Service) -> Result<Service, ZeroconfError> {
124 let (mut resolver, task) = self.resolve_inner(service)?;
125 tokio::spawn(task);
126 resolver.get(service).await
127 }
128
129 pub async fn resolve_task(
163 &self,
164 service: &Service,
165 ) -> Result<
166 (
167 impl Future<Output = Result<Service, ZeroconfError>>,
168 impl ProcessTask,
169 ),
170 ZeroconfError,
171 > {
172 match self.resolve_inner(service) {
173 Ok((mut resolver, task)) => {
174 let s = service.clone();
175 Ok((async move { resolver.get(&s).await }, task))
176 }
177 Err(e) => Err(e),
178 }
179 }
180
181 fn resolve_inner(
182 &self,
183 service: &Service,
184 ) -> Result<(ServiceResolverResult, impl ProcessTask), ZeroconfError> {
185 if !self.checked || (service.browse() && !service.resolve()) {
186 self.resolve_inner_unchecked(service)
187 } else {
188 Err(ZeroconfError::NotFromBrowser(service.clone()))
189 }
190 }
191
192 fn resolve_inner_unchecked(
193 &self,
194 service: &Service,
195 ) -> Result<(ServiceResolverResult, impl ProcessTask), ZeroconfError> {
196 let (tx, rx) = mpsc::unbounded_channel();
197
198 let callback_context = ServiceResolverContext { tx };
199
200 let context = Arc::new(callback_context);
201 let context_ptr =
202 Arc::as_ptr(&context) as *mut Arc<ServiceResolverContext> as *mut libc::c_void;
203
204 let domain = &service
205 .domain()
206 .as_ref()
207 .ok_or_else(|| ZeroconfError::NotFromBrowser(service.clone()))?;
208
209 let service_handle = crate::c_intf::service_resolve(
210 service.name(),
211 &service.interface(),
212 service.service_type(),
213 domain,
214 Some(resolve_callback),
215 context_ptr,
216 )?;
217
218 let (delegate, task) = ServiceRefWrapper::from_service(
219 service_handle,
220 OpType::new(service.service_type(), OpKind::Resolve),
221 Some(Box::new(context)),
222 self.timeout,
223 )?;
224
225 let result = ServiceResolverResult { rx, delegate };
226
227 Ok((result, task))
228 }
229}
230
231#[derive(Debug)]
232struct ResolverInformation {
233 interface: Interface,
234 fullname: String,
235 hosttarget: String,
236 port: u16,
237 txt_record: TxtRecord,
238}
239
240impl ResolverInformation {
241 fn merge(self, service: &Service) -> Service {
242 assert_eq!(
243 &self.interface,
244 service.interface(),
245 "Interface should match on resolved service"
246 );
247
248 let mut s = Service::new_with_txt(
249 service.name(),
250 service.service_type(),
251 self.port,
252 self.txt_record,
253 );
254
255 s.set_interface(self.interface).set_resolve();
256
257 match service.domain() {
258 Some(d) => {
259 let host = self.hosttarget.replace(d, "");
260 s.set_host(host[0..host.len() - 1].to_string())
261 .set_domain(d.to_string())
262 }
263 _ => &s,
264 };
265
266 s
267 }
268}
269
270#[derive(Debug)]
271struct ServiceResolverResult {
272 rx: mpsc::UnboundedReceiver<Result<ResolverInformation, ZeroconfError>>,
273 delegate: ServiceRef,
274}
275
276impl ServiceResolverResult {
277 async fn get(&mut self, service: &Service) -> Result<Service, ZeroconfError> {
278 self.rx
279 .recv()
280 .map(move |res| match res {
281 Some(Ok(s)) => Ok(s.merge(service)),
282 Some(Err(e)) => Err(e),
283 None => Err(ZeroconfError::Timeout(service.clone())),
284 })
285 .await
286 }
287}
288
289#[derive(Debug)]
290struct ServiceResolverContext {
291 tx: mpsc::UnboundedSender<Result<ResolverInformation, ZeroconfError>>,
292}
293
294impl ServiceResolverContext {
295 fn send(&self, info: Result<ResolverInformation, ZeroconfError>) {
296 if self.tx.send(info).is_err() {
297 log::warn!("Failed to send resolved information, receiver dropped");
298 }
299 }
300}
301
302unsafe fn resolve_callback_inner(
303 intf_index: u32,
304 fullname: *const libc::c_char,
305 hosttarget: *const libc::c_char,
306 port: u16,
307 txt_len: u16,
308 txt_record: *const libc::c_uchar,
309) -> Result<ResolverInformation, ZeroconfError> {
310 let c_fullname = ffi::CStr::from_ptr(fullname);
311 let c_hosttarget = ffi::CStr::from_ptr(hosttarget);
312 let fullname = c_fullname.to_str()?;
313 let hosttarget = c_hosttarget.to_str()?;
314 let port = port.to_be();
315
316 log::debug!(
317 "ServiceResolve Callback OK ({}:{}:{})",
318 fullname,
319 hosttarget,
320 port
321 );
322
323 let txt_count = TXTRecordGetCount(txt_len, txt_record as *const libc::c_void);
324 let mut txt = TxtRecord::new();
325 for i in 0..txt_count {
326 let keysize: u16 = 256;
327 let mut valsize = 0;
328 let mut valptr: *const libc::c_void = ptr::null_mut();
329 let mut keybuf = vec![0; (keysize + 1).into()];
330 let keyptr = keybuf.as_mut_ptr() as *mut libc::c_char;
331 let err = TXTRecordGetItemAtIndex(
332 txt_len,
333 txt_record as *const libc::c_void,
334 i,
335 keysize,
336 keyptr,
337 &mut valsize,
338 &mut valptr,
339 );
340
341 if err == 0 {
342 let keylen = keybuf.iter().position(|&c| c == 0).expect(
343 "No error reported by TXTRecordGetItemAtIndex but no null byte in key string",
344 );
345 keybuf.truncate(keylen);
346
347 let key = String::from_utf8_lossy(&keybuf);
348 let val_slice =
349 std::slice::from_raw_parts(valptr as *const libc::c_uchar, valsize.into());
350
351 txt.add_vec(key.into_owned(), val_slice.to_owned());
352 } else {
353 log::error!(
354 "TXTRecordGetItemAtIndex Callback Error ({}:{})",
355 err,
356 Into::<BonjourError>::into(err)
357 );
358 return Err(err.into());
359 }
360 }
361
362 let info = ResolverInformation {
363 interface: Interface::Interface(intf_index),
364 fullname: fullname.to_string(),
365 hosttarget: hosttarget.to_string(),
366 port,
367 txt_record: txt,
368 };
369
370 Ok(info)
371}
372
373unsafe extern "C" fn resolve_callback(
375 _sd_ref: DNSServiceRef,
376 flags: DNSServiceFlags,
377 intf_index: u32,
378 error: DNSServiceErrorType,
379 fullname: *const libc::c_char,
380 hosttarget: *const libc::c_char,
381 port: u16,
382 txt_len: u16,
383 txt_record: *const libc::c_uchar,
384 context: *mut libc::c_void,
385) {
386 let proxy = &*(context as *const ServiceResolverContext);
387 if error == 0 {
388 let more = (flags & 0x1) == 0x1;
389 if more {
390 log::warn!("Unexpected DNSServiceFlagsMoreComing set on resolve")
391 }
392
393 proxy.send(resolve_callback_inner(
394 intf_index, fullname, hosttarget, port, txt_len, txt_record,
395 ));
396 } else {
397 proxy.send(Err(error.into()));
398 log::error!(
399 "ServiceResolve Callback Error ({}:{})",
400 error,
401 Into::<BonjourError>::into(error)
402 )
403 }
404}