async_zeroconf/service_ref.rs
1// Private helper structures to wrap the service reference
2
3use crate::{BonjourError, ZeroconfError};
4
5use bonjour_sys::{
6 DNSServiceProcessResult, DNSServiceRef, DNSServiceRefDeallocate, DNSServiceRefSockFD,
7};
8use futures::Future;
9use std::any::Any;
10use std::fmt::Display;
11use std::sync::Mutex;
12use std::time::Duration;
13use tokio::io::unix::AsyncFd;
14use tokio::sync::oneshot;
15
16/// `OpType` is used to indicate the service type and the kind of operation
17/// associated with a [`ServiceRef`]. Primarily intended for debug.
18///
19/// # Examples
20/// ```
21/// # tokio_test::block_on(async {
22/// let service = async_zeroconf::Service::new("Server", "_http._tcp", 80);
23/// let service_ref = service.publish().await?;
24///
25/// assert_eq!(service_ref.op_type().service_type(), "_http._tcp");
26/// assert_eq!(service_ref.op_type().kind(), &async_zeroconf::OpKind::Publish);
27/// # Ok::<(), async_zeroconf::ZeroconfError>(())
28/// # });
29/// ```
30#[derive(Debug, Clone)]
31pub struct OpType {
32 service_type: String,
33 kind: OpKind,
34}
35
36impl OpType {
37 pub(crate) fn new(service_type: &str, kind: OpKind) -> Self {
38 OpType {
39 service_type: service_type.to_string(),
40 kind,
41 }
42 }
43
44 /// The associated service type (e.g. `"_http._tcp"`).
45 pub fn service_type(&self) -> &str {
46 &self.service_type
47 }
48
49 /// The associated type of operation (e.g. publishing a service).
50 pub fn kind(&self) -> &OpKind {
51 &self.kind
52 }
53}
54
55impl Display for OpType {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
57 write!(f, "{:?}[{}]", self.kind, self.service_type)
58 }
59}
60
61/// `OpKind` represents the possible kinds of operation associated with a
62/// [`ServiceRef`], primarily used for debug and obtained from the [`OpType`]
63/// returned by [`ServiceRef::op_type`].
64#[derive(Debug, Clone, Eq, PartialEq)]
65pub enum OpKind {
66 /// An operation publishing a service.
67 Publish,
68 /// An operation to browse for a given type of service.
69 Browse,
70 /// An operation to resolve a service.
71 Resolve,
72}
73
74/// Struct to hold a published service, which keeps the service alive while a
75/// reference to it is held.
76/// When dropped the Service will be removed and any associated resources
77/// deallocated.
78///
79/// This should be created via a [`Service`][`crate::Service`] or a
80/// [`ServiceResolver`][`crate::ServiceResolver`]. For a browse
81/// operation the `ServiceRef` is held by the `ServiceBrowser` created by a
82/// [`ServiceBrowserBuilder`][`crate::ServiceBrowserBuilder`].
83#[derive(Debug)]
84#[must_use]
85pub struct ServiceRef {
86 shutdown_tx: Option<oneshot::Sender<()>>,
87 op_type: OpType,
88}
89
90impl ServiceRef {
91 /// Return a descriptive type of the operation associated with this
92 /// reference.
93 pub fn op_type(&self) -> &OpType {
94 &self.op_type
95 }
96}
97
98impl Drop for ServiceRef {
99 fn drop(&mut self) {
100 log::debug!("Dropping ServiceRef ({})", self.op_type);
101 // Send shutdown to end process task if idle
102 // Should only fail if rx already dropped
103 if self
104 .shutdown_tx
105 .take()
106 .expect("shutdown taken before drop")
107 .send(())
108 .is_err()
109 {}
110 }
111}
112
113// Internal type to hold the file descriptor for the socket associated with the
114// service.
115#[derive(Debug)]
116pub(crate) struct ServiceFileDescriptor {
117 pub fd: i32,
118}
119
120// Allow ServiceFileDescriptor to be convered to a AsyncFd by implementing the
121// AsRawFd trait.
122impl std::os::unix::prelude::AsRawFd for ServiceFileDescriptor {
123 fn as_raw_fd(&self) -> i32 {
124 self.fd
125 }
126}
127
128/// The `ProcessTask` trait represents the future that is returned from some
129/// functions that is awaited on to process events associated with a published
130/// service or a browse operation.
131pub trait ProcessTask: Future<Output = ()> + Send + Sync {}
132
133impl<T> ProcessTask for T where T: Future<Output = ()> + Send + Sync {}
134
135#[derive(Debug)]
136pub(crate) struct ServiceRefWrapper {
137 // Pointer to reference returned by C API
138 pub inner: DNSServiceRef,
139 // Mutex to protect service reference
140 pub lock: Mutex<()>,
141 // Async file descriptor to detect new events asynchronously
142 pub fd: AsyncFd<ServiceFileDescriptor>,
143 // Hold a reference to an (optional) context used for C API callbacks
144 context: Option<Box<dyn Any + Send>>,
145 // Operation type that created this reference
146 op_type: OpType,
147}
148
149impl ServiceRefWrapper {
150 pub fn from_service(
151 service_ref: DNSServiceRef,
152 op_type: OpType,
153 context: Option<Box<dyn Any + Send>>,
154 timeout: Option<Duration>,
155 ) -> Result<(ServiceRef, impl ProcessTask), std::io::Error> {
156 log::trace!("Call DNSServiceRefSockFD");
157 let fd = unsafe { DNSServiceRefSockFD(service_ref) };
158 log::trace!(" FD:{}", fd);
159
160 log::debug!("Creating ServiceRef ({})", op_type);
161
162 match AsyncFd::new(ServiceFileDescriptor { fd }) {
163 Ok(async_fd) => {
164 // Create channel for shutdown
165 let (tx, rx) = oneshot::channel::<()>();
166
167 // Create the wrapper for processing events
168 let wrapper = ServiceRefWrapper {
169 inner: service_ref,
170 lock: Mutex::new(()),
171 fd: async_fd,
172 context,
173 op_type: op_type.clone(),
174 };
175
176 // Spawn the task that will process events
177 let task = async move {
178 match ServiceRefWrapper::process(rx, wrapper, timeout).await {
179 Ok(_) => (),
180 Err(e) => log::error!("Error on processing: {}", e),
181 }
182 };
183
184 // Create the reference that will hold the service active
185 let s_ref = ServiceRef {
186 shutdown_tx: Some(tx),
187 op_type,
188 };
189
190 Ok((s_ref, task))
191 }
192 Err(e) => Err(e),
193 }
194 }
195
196 /// A future to wait for any pending events related to the service,
197 /// handling them and then completing the future.
198 async fn process_events(service_ref: &ServiceRefWrapper) -> Result<bool, ZeroconfError> {
199 // Wait on indication that file descriptor is readable
200 let mut fd = service_ref.fd.readable().await?;
201
202 log::trace!("Call DNSServiceProcessResult");
203
204 // Process any pending events
205 let process_err = {
206 let mut _guard = service_ref.lock.lock()?;
207 unsafe { DNSServiceProcessResult(service_ref.inner) }
208 };
209 // Clear ready flag for socket to wait for next event
210 // As there is no await point or polling between processing above and
211 // clearing the flag, there should be no opportunity to 'miss' an event
212 // between the DNSServiceProcessResult and clear_ready().
213 fd.clear_ready();
214 if process_err != 0 {
215 return Err(Into::<BonjourError>::into(process_err).into());
216 }
217
218 Ok(true)
219 }
220
221 /// Processing wrapper to keep processing events as available
222 async fn process(
223 mut rx: oneshot::Receiver<()>,
224 service_ref: ServiceRefWrapper,
225 timeout: Option<Duration>,
226 ) -> Result<(), ZeroconfError> {
227 let (tx_time, mut rx_time) = oneshot::channel();
228
229 if let Some(t) = timeout {
230 tokio::spawn(async move {
231 tokio::time::sleep(t).await;
232 match tx_time.send(()) {
233 Ok(_) => {
234 log::debug!("Sending timeout");
235 }
236 Err(_) => {
237 log::trace!("Sending timeout failed - processing ended due to shutdown");
238 }
239 }
240 });
241 }
242
243 loop {
244 tokio::select! {
245 // Shutdown event
246 _ = &mut rx => {
247 log::debug!("Process task got shutdown");
248 return Ok(());
249 }
250 // Timeout future
251 _ = &mut rx_time => {
252 log::debug!("Process task got timeout");
253 return Ok(());
254 }
255 // Event processing
256 e = Self::process_events(&service_ref) => {
257 match e {
258 Ok(b) => {
259 if b {
260 log::trace!("Events processed");
261 } else {
262 log::trace!("Got null pointer due to shutdown");
263 return Ok(());
264 }
265 },
266 Err(e) => return Err(e)
267 }
268 }
269 }
270 }
271 }
272}
273
274// Implement Send as reference is thread-safe
275unsafe impl Send for ServiceRefWrapper {}
276// Implement Sync as reference is protected by mutex
277unsafe impl Sync for ServiceRefWrapper {}
278
279impl Drop for ServiceRefWrapper {
280 fn drop(&mut self) {
281 log::debug!(
282 "Dropping and deallocating service reference ({})",
283 self.op_type
284 );
285 {
286 match self.lock.lock() {
287 Ok(_guard) => {
288 unsafe { DNSServiceRefDeallocate(self.inner) };
289 }
290 Err(_) => {
291 log::warn!("Service reference mutex was poisoned");
292 unsafe { DNSServiceRefDeallocate(self.inner) };
293 }
294 }
295 }
296 if self.context.is_some() {
297 log::debug!("Context to be dropped ({})", self.op_type);
298 }
299 }
300}