1use nix::sys::inotify::AddWatchFlags;
2use std::{
3 marker::PhantomData,
4 ops::{Deref, DerefMut},
5 path::PathBuf,
6 time::Duration,
7};
8use thiserror::Error;
9use tokio::{
10 sync::{mpsc::Sender as MpscSend, oneshot::Sender as OnceSend},
11 task::JoinHandle,
12};
13use tokio_stream::wrappers::ReceiverStream;
14
15use crate::{
16 futures::{DirectoryWatchFuture, DirectoryWatchStream, FileWatchFuture, FileWatchStream},
17 task::WatchRequestInner,
18};
19
/// Cloneable handle used to submit watch requests.
///
/// Requests are delivered over `request_tx`; start building one with
/// [`Handle::file`] or [`Handle::dir`].
#[derive(Debug, Clone)]
pub struct Handle {
    /// Sending half of the channel that carries `WatchRequestInner` messages.
    pub(crate) request_tx: MpscSend<WatchRequestInner>,
}
24
/// A [`Handle`] bundled with the means to stop and join the task behind it.
///
/// The value is consumed by [`OwnedHandle::shutdown`],
/// [`OwnedHandle::shutdown_with`] or [`OwnedHandle::wait`].
#[derive(Debug)]
pub struct OwnedHandle {
    /// Request handle; reachable through `Deref`/`DerefMut`.
    pub(crate) inner: Handle,
    /// One-shot sender fired by `shutdown_with` to signal the task.
    pub(crate) shutdown: OnceSend<()>,
    /// Join handle that is awaited (and aborted on timeout) during shutdown.
    pub(crate) join: JoinHandle<()>,
}
31
32impl OwnedHandle {
33 pub const DEFAULT_SHUTDOWN: Duration = Duration::from_secs(2);
34 pub const DEFAULT_REQUEST_BUFFER: usize = 32;
35
36 pub async fn shutdown_with(mut self, wait: Duration) {
37 let _ = self.shutdown.send(());
38
39 let join = tokio::time::timeout(wait, &mut self.join);
40
41 match join.await {
42 Err(_) => self.join.abort(),
43 Ok(Err(e)) => {
44 if e.is_cancelled() {
45 panic!("The Watch Task was cancelled without consuming the OwnedHandle");
46 }
47
48 std::panic::resume_unwind(e.into_panic());
49 }
50 Ok(Ok(())) => {}
51 }
52 }
53
54 pub async fn shutdown(self) {
55 self.shutdown_with(Self::DEFAULT_SHUTDOWN).await
56 }
57
58 pub async fn wait(self) -> Result<(), tokio::task::JoinError> {
59 self.join.await
60 }
61}
62
63impl Deref for OwnedHandle {
64 type Target = Handle;
65
66 fn deref(&self) -> &Self::Target {
67 &self.inner
68 }
69}
70
71impl DerefMut for OwnedHandle {
72 fn deref_mut(&mut self) -> &mut Self::Target {
73 &mut self.inner
74 }
75}
76
/// Reasons a watch request can be rejected while it is being built.
#[derive(Debug, Error)]
pub enum RequestError {
    /// Nothing could be found at the given path.
    #[error("There is no file or directory at the path: {0}")]
    DoesNotExist(PathBuf),
    /// The path exists but is a directory where a non-directory was required,
    /// or the other way round.
    #[error("The inode at {0} does not have the correct type for this operation")]
    IncorrectType(PathBuf),
}
84
85#[derive(Debug, Error)]
86pub enum WatchError {
87 #[error("The watcher task was shutdown while before the next event could be received")]
88 WatcherShutdown,
89}
90
91impl Handle {
92 pub fn file(&mut self, path: PathBuf) -> Result<WatchRequest<'_, FileEvents>, RequestError> {
94 if !path.exists() {
95 return Err(RequestError::DoesNotExist(path));
96 }
97 if path.is_dir() {
98 return Err(RequestError::IncorrectType(path));
99 }
100
101 Ok(WatchRequest {
102 handle: self,
103 path,
104 buffer: FileEvents::DEFAULT_BUFFER,
105 flags: AddWatchFlags::empty(),
106 _type: Default::default(),
107 })
108 }
109
110 pub fn dir(
112 &mut self,
113 path: PathBuf,
114 ) -> Result<WatchRequest<'_, DirectoryEvents>, RequestError> {
115 if !path.exists() {
118 return Err(RequestError::DoesNotExist(path));
119 }
120 if !path.is_dir() {
121 return Err(RequestError::IncorrectType(path));
122 }
123
124 Ok(WatchRequest {
125 handle: self,
126 path,
127 buffer: DirectoryEvents::DEFAULT_BUFFER,
128 flags: AddWatchFlags::empty(),
129 _type: Default::default(),
130 })
131 }
132}
133
/// Private module holding the sealing trait, so [`WatchType`] cannot be
/// implemented outside this crate.
mod sealed {
    pub trait Sealed {}
}

/// Marker trait selecting what kind of path a `WatchRequest` targets.
pub trait WatchType: sealed::Sealed {
    /// Buffer size a request of this kind starts with.
    const DEFAULT_BUFFER: usize;
}

/// Marker for requests that watch a single file. Never instantiated.
pub enum FileEvents {}

impl sealed::Sealed for FileEvents {}

impl WatchType for FileEvents {
    const DEFAULT_BUFFER: usize = 16;
}

/// Marker for requests that watch a directory. Never instantiated.
pub enum DirectoryEvents {}

impl sealed::Sealed for DirectoryEvents {}

impl WatchType for DirectoryEvents {
    const DEFAULT_BUFFER: usize = 32;
}
155
/// Builder for a watch on a single path, created by `Handle::file` or
/// `Handle::dir` and finished with `next` or `watch`.
pub struct WatchRequest<'handle, T: WatchType> {
    /// Handle the request will be submitted through.
    handle: &'handle mut Handle,
    /// Path to watch.
    path: PathBuf,
    /// Capacity of the event channel created by `watch`.
    buffer: usize,
    /// inotify event mask accumulated by the builder methods.
    flags: AddWatchFlags,
    /// Compile-time marker distinguishing file from directory requests.
    _type: PhantomData<T>,
}
164
165impl<T: WatchType> WatchRequest<'_, T> {
167 pub fn buffer(mut self, size: usize) -> Self {
171 self.buffer = size;
172 self
173 }
174
175 pub fn read(mut self, set: bool) -> Self {
177 self.flags.set(AddWatchFlags::IN_ACCESS, set);
178 self
179 }
180
181 pub fn modify(mut self, set: bool) -> Self {
183 self.flags.set(AddWatchFlags::IN_MODIFY, set);
184 self
185 }
186
187 pub fn open(mut self, set: bool) -> Self {
189 self.flags.set(AddWatchFlags::IN_OPEN, set);
190 self
191 }
192
193 pub fn close(mut self, set: bool) -> Self {
195 self.flags.set(AddWatchFlags::IN_CLOSE, set);
196 self
197 }
198
199 }
202
203impl<'handle> WatchRequest<'handle, FileEvents> {
205 pub async fn next(self) -> Result<FileWatchFuture, WatchError> {
209 let (sender, rx) = tokio::sync::oneshot::channel();
210
211 let sender = crate::task::Sender::Once(sender);
212
213 let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
214
215 self.handle
216 .request_tx
217 .try_send(WatchRequestInner::Start {
218 flags: self.flags,
219 path: self.path,
220 dir: false,
221 sender,
222 watch_token_tx: setup_tx,
223 })
224 .map_err(|_| WatchError::WatcherShutdown)?;
225
226 let watch_token = setup_rx.await.map_err(|_| WatchError::WatcherShutdown)?;
227
228 Ok(FileWatchFuture {
229 inner: rx,
230 watch_token,
231 closed: false,
232 handle: self.handle.clone(),
233 })
234 }
235
236 pub async fn watch(self) -> Result<FileWatchStream, WatchError> {
240 let (sender, rx) = tokio::sync::mpsc::channel(self.buffer);
241
242 let sender = crate::task::Sender::Stream(sender);
243
244 let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
245
246 self.handle
247 .request_tx
248 .try_send(WatchRequestInner::Start {
249 flags: self.flags,
250 path: self.path,
251 dir: false,
252 sender,
253 watch_token_tx: setup_tx,
254 })
255 .map_err(|_| WatchError::WatcherShutdown)?;
256
257 let watch_token = setup_rx.await.map_err(|_| WatchError::WatcherShutdown)?;
258
259 Ok(FileWatchStream {
260 inner: ReceiverStream::from(rx),
261 watch_token,
262 handle: self.handle.clone(),
263 })
264 }
265}
266
267impl<'handle> WatchRequest<'handle, DirectoryEvents> {
269 pub async fn next(self) -> Result<DirectoryWatchFuture, WatchError> {
273 let (sender, rx) = tokio::sync::oneshot::channel();
274
275 let sender = crate::task::Sender::Once(sender);
276
277 let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
278
279 self.handle
280 .request_tx
281 .try_send(WatchRequestInner::Start {
282 flags: self.flags,
283 path: self.path,
284 dir: true,
285 sender,
286 watch_token_tx: setup_tx,
287 })
288 .map_err(|_| WatchError::WatcherShutdown)?;
289
290 let watch_token = setup_rx.await.map_err(|_| WatchError::WatcherShutdown)?;
291
292 Ok(DirectoryWatchFuture {
293 inner: rx,
294 watch_token,
295 handle: self.handle.clone(),
296 closed: false,
297 })
298 }
299
300 pub async fn watch(self) -> Result<DirectoryWatchStream, WatchError> {
304 let (sender, rx) = tokio::sync::mpsc::channel(self.buffer);
305
306 let sender = crate::task::Sender::Stream(sender);
307
308 let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
309
310 self.handle
311 .request_tx
312 .try_send(WatchRequestInner::Start {
313 flags: self.flags,
314 path: self.path,
315 dir: true,
316 sender,
317 watch_token_tx: setup_tx,
318 })
319 .map_err(|_| WatchError::WatcherShutdown)?;
320
321 let watch_token = setup_rx.await.map_err(|_| WatchError::WatcherShutdown)?;
322
323 Ok(DirectoryWatchStream {
324 inner: ReceiverStream::from(rx),
325 watch_token,
326 handle: self.handle.clone(),
327 })
328 }
329}