anotify/
handle.rs

1use nix::sys::inotify::AddWatchFlags;
2use std::{
3    marker::PhantomData,
4    ops::{Deref, DerefMut},
5    path::PathBuf,
6    time::Duration,
7};
8use thiserror::Error;
9use tokio::{
10    sync::{mpsc::Sender as MpscSend, oneshot::Sender as OnceSend},
11    task::JoinHandle,
12};
13use tokio_stream::wrappers::ReceiverStream;
14
15use crate::{
16    futures::{DirectoryWatchFuture, DirectoryWatchStream, FileWatchFuture, FileWatchStream},
17    task::WatchRequestInner,
18};
19
20#[derive(Debug, Clone)]
21pub struct Handle {
22    pub(crate) request_tx: MpscSend<WatchRequestInner>,
23}
24
25#[derive(Debug)]
26pub struct OwnedHandle {
27    pub(crate) inner: Handle,
28    pub(crate) shutdown: OnceSend<()>,
29    pub(crate) join: JoinHandle<()>,
30}
31
32impl OwnedHandle {
33    pub const DEFAULT_SHUTDOWN: Duration = Duration::from_secs(2);
34    pub const DEFAULT_REQUEST_BUFFER: usize = 32;
35
36    pub async fn shutdown_with(mut self, wait: Duration) {
37        let _ = self.shutdown.send(());
38
39        let join = tokio::time::timeout(wait, &mut self.join);
40
41        match join.await {
42            Err(_) => self.join.abort(),
43            Ok(Err(e)) => {
44                if e.is_cancelled() {
45                    panic!("The Watch Task was cancelled without consuming the OwnedHandle");
46                }
47
48                std::panic::resume_unwind(e.into_panic());
49            }
50            Ok(Ok(())) => {}
51        }
52    }
53
54    pub async fn shutdown(self) {
55        self.shutdown_with(Self::DEFAULT_SHUTDOWN).await
56    }
57
58    pub async fn wait(self) -> Result<(), tokio::task::JoinError> {
59        self.join.await
60    }
61}
62
63impl Deref for OwnedHandle {
64    type Target = Handle;
65
66    fn deref(&self) -> &Self::Target {
67        &self.inner
68    }
69}
70
71impl DerefMut for OwnedHandle {
72    fn deref_mut(&mut self) -> &mut Self::Target {
73        &mut self.inner
74    }
75}
76
77#[derive(Debug, Error)]
78pub enum RequestError {
79    #[error("There is no file or directory at the path: {0}")]
80    DoesNotExist(PathBuf),
81    #[error("The inode at {0} does not have the correct type for this operation")]
82    IncorrectType(PathBuf),
83}
84
85#[derive(Debug, Error)]
86pub enum WatchError {
87    #[error("The watcher task was shutdown while before the next event could be received")]
88    WatcherShutdown,
89}
90
91impl Handle {
92    /// Create a file watch builder
93    pub fn file(&mut self, path: PathBuf) -> Result<WatchRequest<'_, FileEvents>, RequestError> {
94        if !path.exists() {
95            return Err(RequestError::DoesNotExist(path));
96        }
97        if path.is_dir() {
98            return Err(RequestError::IncorrectType(path));
99        }
100
101        Ok(WatchRequest {
102            handle: self,
103            path,
104            buffer: FileEvents::DEFAULT_BUFFER,
105            flags: AddWatchFlags::empty(),
106            _type: Default::default(),
107        })
108    }
109
110    /// Create a directory watch builder
111    pub fn dir(
112        &mut self,
113        path: PathBuf,
114    ) -> Result<WatchRequest<'_, DirectoryEvents>, RequestError> {
115        // TODO(josiah) make take Into<Path>
116
117        if !path.exists() {
118            return Err(RequestError::DoesNotExist(path));
119        }
120        if !path.is_dir() {
121            return Err(RequestError::IncorrectType(path));
122        }
123
124        Ok(WatchRequest {
125            handle: self,
126            path,
127            buffer: DirectoryEvents::DEFAULT_BUFFER,
128            flags: AddWatchFlags::empty(),
129            _type: Default::default(),
130        })
131    }
132}
133
134mod sealed {
135    pub trait Sealed {}
136}
137
138pub trait WatchType: sealed::Sealed {
139    const DEFAULT_BUFFER: usize;
140}
141
142pub enum FileEvents {}
143pub enum DirectoryEvents {}
144
145impl sealed::Sealed for FileEvents {}
146impl sealed::Sealed for DirectoryEvents {}
147
148impl WatchType for FileEvents {
149    const DEFAULT_BUFFER: usize = 16;
150}
151
152impl WatchType for DirectoryEvents {
153    const DEFAULT_BUFFER: usize = 32;
154}
155
156/// Configuration and dispatch for a watch
157pub struct WatchRequest<'handle, T: WatchType> {
158    handle: &'handle mut Handle,
159    path: PathBuf,
160    buffer: usize,
161    flags: AddWatchFlags,
162    _type: PhantomData<T>,
163}
164
165/// # Common Configuration Methods
166impl<T: WatchType> WatchRequest<'_, T> {
167    /// Set the amount of items for this watch to buffer,
168    ///
169    /// value is not considered for single event watches
170    pub fn buffer(mut self, size: usize) -> Self {
171        self.buffer = size;
172        self
173    }
174
175    /// Set weather file read events should be captured
176    pub fn read(mut self, set: bool) -> Self {
177        self.flags.set(AddWatchFlags::IN_ACCESS, set);
178        self
179    }
180
181    /// Set weather file open events should be captured
182    pub fn modify(mut self, set: bool) -> Self {
183        self.flags.set(AddWatchFlags::IN_MODIFY, set);
184        self
185    }
186
187    /// Set weather file open events should be captured
188    pub fn open(mut self, set: bool) -> Self {
189        self.flags.set(AddWatchFlags::IN_OPEN, set);
190        self
191    }
192
193    /// Set weather file close events should be generated
194    pub fn close(mut self, set: bool) -> Self {
195        self.flags.set(AddWatchFlags::IN_CLOSE, set);
196        self
197    }
198
199    // TODO(josiah) moves will require a more robust background task so that move events can be
200    // coalesced correctly
201}
202
203/// # File Specific Dispatch Methods
204impl<'handle> WatchRequest<'handle, FileEvents> {
205    /// Create a watch which will only return the next captured event, and then unsubscribe
206    ///
207    /// Ignores the value set by [`buffer`][`WatchRequest::buffer`]
208    pub async fn next(self) -> Result<FileWatchFuture, WatchError> {
209        let (sender, rx) = tokio::sync::oneshot::channel();
210
211        let sender = crate::task::Sender::Once(sender);
212
213        let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
214
215        self.handle
216            .request_tx
217            .try_send(WatchRequestInner::Start {
218                flags: self.flags,
219                path: self.path,
220                dir: false,
221                sender,
222                watch_token_tx: setup_tx,
223            })
224            .map_err(|_| WatchError::WatcherShutdown)?;
225
226        let watch_token = setup_rx.await.map_err(|_| WatchError::WatcherShutdown)?;
227
228        Ok(FileWatchFuture {
229            inner: rx,
230            watch_token,
231            closed: false,
232            handle: self.handle.clone(),
233        })
234    }
235
236    /// Create a watch which will capture and return a stream of events until dropped.
237    ///
238    /// Will keep oldest events on buffer overflow set by [`buffer`][`WatchRequest::buffer`]
239    pub async fn watch(self) -> Result<FileWatchStream, WatchError> {
240        let (sender, rx) = tokio::sync::mpsc::channel(self.buffer);
241
242        let sender = crate::task::Sender::Stream(sender);
243
244        let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
245
246        self.handle
247            .request_tx
248            .try_send(WatchRequestInner::Start {
249                flags: self.flags,
250                path: self.path,
251                dir: false,
252                sender,
253                watch_token_tx: setup_tx,
254            })
255            .map_err(|_| WatchError::WatcherShutdown)?;
256
257        let watch_token = setup_rx.await.map_err(|_| WatchError::WatcherShutdown)?;
258
259        Ok(FileWatchStream {
260            inner: ReceiverStream::from(rx),
261            watch_token,
262            handle: self.handle.clone(),
263        })
264    }
265}
266
267/// # Directory Specific Dispatch Methods
268impl<'handle> WatchRequest<'handle, DirectoryEvents> {
269    /// Create a watch which will only return the next captured event, and then unsubscribe
270    ///
271    /// Ignores the value set by [`buffer`][`WatchRequest::buffer`]
272    pub async fn next(self) -> Result<DirectoryWatchFuture, WatchError> {
273        let (sender, rx) = tokio::sync::oneshot::channel();
274
275        let sender = crate::task::Sender::Once(sender);
276
277        let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
278
279        self.handle
280            .request_tx
281            .try_send(WatchRequestInner::Start {
282                flags: self.flags,
283                path: self.path,
284                dir: true,
285                sender,
286                watch_token_tx: setup_tx,
287            })
288            .map_err(|_| WatchError::WatcherShutdown)?;
289
290        let watch_token = setup_rx.await.map_err(|_| WatchError::WatcherShutdown)?;
291
292        Ok(DirectoryWatchFuture {
293            inner: rx,
294            watch_token,
295            handle: self.handle.clone(),
296            closed: false,
297        })
298    }
299
300    /// Create a watch which will capture and return a stream of events until dropped.
301    ///
302    /// Will keep oldest events on buffer overflow set by [`buffer`][`WatchRequest::buffer`]
303    pub async fn watch(self) -> Result<DirectoryWatchStream, WatchError> {
304        let (sender, rx) = tokio::sync::mpsc::channel(self.buffer);
305
306        let sender = crate::task::Sender::Stream(sender);
307
308        let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
309
310        self.handle
311            .request_tx
312            .try_send(WatchRequestInner::Start {
313                flags: self.flags,
314                path: self.path,
315                dir: true,
316                sender,
317                watch_token_tx: setup_tx,
318            })
319            .map_err(|_| WatchError::WatcherShutdown)?;
320
321        let watch_token = setup_rx.await.map_err(|_| WatchError::WatcherShutdown)?;
322
323        Ok(DirectoryWatchStream {
324            inner: ReceiverStream::from(rx),
325            watch_token,
326            handle: self.handle.clone(),
327        })
328    }
329}