1#![allow(
3 clippy::non_send_fields_in_send_ty,
4 clippy::cast_possible_wrap,
5 clippy::borrow_interior_mutable_const,
6 clippy::module_name_repetitions
7)]
8
9use std::ffi::{c_void, CStr, OsStr};
10use std::fmt::{Display, Formatter};
11use std::io;
12use std::os::raw::c_char;
13use std::os::unix::ffi::OsStrExt;
14use std::panic::catch_unwind;
15use std::path::{Path, PathBuf};
16use std::pin::Pin;
17use std::sync::mpsc::channel;
18use std::task::{Context, Poll};
19use std::thread;
20use std::time::Duration;
21
22#[cfg(feature = "async-std")]
23use async_std1 as async_std;
24use core_foundation::array::CFArray;
25use core_foundation::base::{CFIndex, FromVoid};
26use core_foundation::dictionary::CFDictionary;
27use core_foundation::number::CFNumber;
28use core_foundation::runloop::{kCFRunLoopBeforeWaiting, kCFRunLoopDefaultMode, CFRunLoop};
29use core_foundation::string::CFString;
30use futures_core::Stream;
31use futures_util::stream::{iter, StreamExt};
32use log::{debug, error};
33#[cfg(feature = "tokio")]
34use tokio1 as tokio;
35#[cfg(feature = "tokio")]
36use tokio_stream::wrappers::ReceiverStream;
37
38use crate::ffi::{
39 kFSEventStreamCreateFlagFileEvents, kFSEventStreamCreateFlagUseCFTypes,
40 kFSEventStreamCreateFlagUseExtendedData, kFSEventStreamEventExtendedDataPathKey,
41 kFSEventStreamEventExtendedFileIDKey, CFRunLoopExt, FSEventStreamCreateFlags,
42 FSEventStreamEventFlags, FSEventStreamEventId, SysFSEventStream, SysFSEventStreamContext,
43 SysFSEventStreamRef,
44};
45pub use crate::flags::StreamFlags;
46use crate::impl_release_callback;
47use crate::observer::create_oneshot_observer;
48use crate::utils::FlagsExt;
49
50#[cfg(test)]
51pub(crate) static TEST_RUNNING_RUNLOOP_COUNT: std::sync::atomic::AtomicUsize =
52 std::sync::atomic::AtomicUsize::new(0);
53
54pub struct EventStreamHandler {
62 runloop: Option<(CFRunLoop, thread::JoinHandle<()>)>,
63}
64
65unsafe impl Send for EventStreamHandler {}
69
70impl EventStreamHandler {
71 pub fn abort(&mut self) {
76 if let Some((runloop, thread_handle)) = self.runloop.take() {
77 let (tx, rx) = channel();
78 let observer = create_oneshot_observer(kCFRunLoopBeforeWaiting, tx);
79 runloop.add_observer(&observer, unsafe { kCFRunLoopDefaultMode });
80
81 if !runloop.is_waiting() {
82 rx.recv().expect("channel to receive BeforeWaiting signal");
84 }
85
86 runloop.remove_observer(&observer, unsafe { kCFRunLoopDefaultMode });
87 runloop.stop();
88
89 thread_handle.join().expect("thread to shut down");
91 }
92 }
93}
94
95#[derive(Debug, Clone, Eq, PartialEq, Hash)]
97pub struct Event {
98 pub path: PathBuf,
99 pub inode: Option<i64>,
100 pub flags: StreamFlags,
101 pub raw_flags: FSEventStreamEventFlags,
102 pub id: FSEventStreamEventId,
103}
104
105impl Display for Event {
106 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
107 write!(
108 f,
109 "[{}] path: {:?}({}), flags: {} ({:x})",
110 self.id,
111 self.path,
112 self.inode.unwrap_or(-1),
113 self.flags,
114 self.raw_flags
115 )
116 }
117}
118
119pub struct EventStream {
126 #[cfg(feature = "tokio")]
127 stream: ReceiverStream<Vec<Event>>,
128 #[cfg(feature = "async-std")]
129 stream: async_std::channel::Receiver<Vec<Event>>,
130}
131
132impl EventStream {
133 pub fn into_flatten(self) -> impl Stream<Item = Event> {
135 self.flat_map(iter)
136 }
137}
138
139impl Stream for EventStream {
140 type Item = Vec<Event>;
141
142 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143 self.stream.poll_next_unpin(cx)
144 }
145}
146
147pub(crate) struct StreamContextInfo {
148 #[cfg(feature = "tokio")]
149 event_handler: tokio::sync::mpsc::Sender<Vec<Event>>,
150 #[cfg(feature = "async-std")]
151 event_handler: async_std::channel::Sender<Vec<Event>>,
152}
153
154impl_release_callback!(release_context, StreamContextInfo);
155
156struct SendWrapper<T>(T);
157
158unsafe impl<T> Send for SendWrapper<T> {}
159
160impl<T> SendWrapper<T> {
161 const unsafe fn new(t: T) -> Self {
162 Self(t)
163 }
164}
165
166pub fn create_event_stream<P: AsRef<Path>>(
174 paths_to_watch: impl IntoIterator<Item = P>,
175 since_when: FSEventStreamEventId,
176 latency: Duration,
177 flags: FSEventStreamCreateFlags,
178) -> io::Result<(EventStream, EventStreamHandler)> {
179 if flags.contains(kFSEventStreamCreateFlagUseExtendedData)
180 && !flags.contains(kFSEventStreamCreateFlagUseCFTypes)
181 {
182 panic!("UseExtendedData requires UseCFTypes");
183 }
184
185 #[cfg(feature = "tokio")]
186 let (event_tx, event_rx) = tokio::sync::mpsc::channel(1024);
187 #[cfg(feature = "async-std")]
188 let (event_tx, event_rx) = async_std::channel::bounded(1024);
189
190 let context = StreamContextInfo {
195 event_handler: event_tx,
196 };
197
198 let stream_context = SysFSEventStreamContext::new(context, release_context);
199
200 let callback = if flags.contains(kFSEventStreamCreateFlagUseCFTypes) {
201 if flags.contains(kFSEventStreamCreateFlagUseExtendedData) {
202 if flags.contains(kFSEventStreamCreateFlagFileEvents) {
203 cf_ext_with_id_callback
204 } else {
205 cf_ext_callback
206 }
207 } else {
208 cf_callback
209 }
210 } else {
211 normal_callback
212 };
213
214 let mut stream = SysFSEventStream::new(
215 callback,
216 &stream_context,
217 paths_to_watch,
218 since_when,
219 latency,
220 flags,
221 )?;
222
223 let (runloop_tx, runloop_rx) = channel();
225
226 let thread_handle = thread::spawn(move || {
227 #[cfg(test)]
228 TEST_RUNNING_RUNLOOP_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
229
230 let current_runloop = CFRunLoop::get_current();
231
232 stream.schedule(¤t_runloop, unsafe { kCFRunLoopDefaultMode });
233 stream.start();
234
235 runloop_tx
240 .send(unsafe { SendWrapper::new(current_runloop) })
241 .expect("send runloop to stream");
242
243 CFRunLoop::run_current();
244 stream.stop();
245 stream.invalidate();
246
247 #[cfg(test)]
248 TEST_RUNNING_RUNLOOP_COUNT.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
249 });
250
251 #[cfg(feature = "tokio")]
252 let stream = ReceiverStream::new(event_rx);
253 #[cfg(feature = "async-std")]
254 let stream = event_rx;
255 Ok((
256 EventStream { stream },
257 EventStreamHandler {
258 runloop: Some((
259 runloop_rx.recv().expect("receive runloop from worker").0,
260 thread_handle,
261 )),
262 },
263 ))
264}
265
266enum CallbackError {
267 ToI64,
268 ParseFlags,
269}
270
271macro_rules! define_callback {
272 ($name: ident, ($num: ident, $paths: ident, $flags: ident, $ids: ident)$body: block) => {
273 extern "C" fn $name(
274 stream_ref: SysFSEventStreamRef,
275 info: *mut c_void,
276 num_events: usize, event_paths: *mut c_void, event_flags: *const FSEventStreamEventFlags, event_ids: *const FSEventStreamEventId, ) {
281 fn callback_impl(
282 _stream_ref: SysFSEventStreamRef,
283 info: *mut c_void,
284 num_events: usize, event_paths: *mut c_void, event_flags: *const FSEventStreamEventFlags, event_ids: *const FSEventStreamEventId, ) {
289 fn event_iter(
290 $num: usize,
291 $paths: *mut c_void,
292 $flags: *const FSEventStreamEventFlags,
293 $ids: *const FSEventStreamEventId,
294 ) -> impl Iterator<Item = Result<Event, CallbackError>> {
295 $body
296 }
297
298 debug!("Received {} event(s)", num_events);
299
300 let info = info as *const StreamContextInfo;
301 let event_handler = unsafe { &(*info).event_handler };
302
303 let events = event_iter(num_events, event_paths, event_flags, event_ids)
304 .filter_map(|event| {
305 if let Err(e) = &event {
306 match e {
307 CallbackError::ToI64 => {
308 error!("Unable to convert inode field to i64")
309 }
310 CallbackError::ParseFlags => error!("Unable to parse flags"),
311 }
312 }
313 event.ok()
314 })
315 .collect();
316
317 if let Err(e) = event_handler.try_send(events) {
318 error!("Unable to send event from callback: {}", e);
319 }
320 }
321
322 drop(catch_unwind(move || {
323 callback_impl(
324 stream_ref,
325 info,
326 num_events,
327 event_paths,
328 event_flags,
329 event_ids,
330 );
331 }));
332 }
333 };
334}
335
336define_callback!(cf_ext_with_id_callback, (num, paths, flags, ids){
337 let paths = unsafe { CFArray::<CFDictionary<CFString>>::from_void(paths) };
338 (0..num).map(move |idx| {
339 Ok((
340 unsafe { paths.get_unchecked(idx as CFIndex) },
341 unsafe { *flags.add(idx) },
342 unsafe { *ids.add(idx) },
343 ))
344 .and_then(|(dict, flags, id)| {
345 Ok(Event {
346 path: PathBuf::from(
347 (*unsafe {
348 CFString::from_void(*dict.get(&*kFSEventStreamEventExtendedDataPathKey),)
349 })
350 .to_string(),
351 ),
352 inode: Some(
353 unsafe {CFNumber::from_void(*dict.get(&*kFSEventStreamEventExtendedFileIDKey))}
354 .to_i64()
355 .ok_or(CallbackError::ToI64)?,
356 ),
357 flags: StreamFlags::from_bits(flags).ok_or(CallbackError::ParseFlags)?,
358 raw_flags: flags,
359 id,
360 })
361 })
362 })
363});
364
365define_callback!(cf_ext_callback, (num, paths, flags, ids){
366 let paths = unsafe { CFArray::<CFDictionary<CFString>>::from_void(paths) };
367 (0..num).map(move |idx| {
368 Ok((
369 unsafe { paths.get_unchecked(idx as CFIndex) },
370 unsafe { *flags.add(idx) },
371 unsafe { *ids.add(idx) },
372 ))
373 .and_then(|(dict, flags, id)| {
374 Ok(Event {
375 path: PathBuf::from(
376 (*unsafe {
377 CFString::from_void(*dict.get(&*kFSEventStreamEventExtendedDataPathKey),)
378 })
379 .to_string(),
380 ),
381 inode: None,
382 flags: StreamFlags::from_bits(flags).ok_or(CallbackError::ParseFlags)?,
383 raw_flags: flags,
384 id,
385 })
386 })
387 })
388});
389
390define_callback!(cf_callback, (num, paths, flags, ids){
391 let paths = unsafe { CFArray::<CFString>::from_void(paths) };
392 (0..num).map(move |idx| {
393 Ok((
394 unsafe { paths.get_unchecked(idx as CFIndex) },
395 unsafe { *flags.add(idx) },
396 unsafe { *ids.add(idx) },
397 ))
398 .and_then(|(path, flags, id)| {
399 Ok(Event {
400 path: PathBuf::from((*path).to_string()),
401 inode: None,
402 flags: StreamFlags::from_bits(flags)
403 .ok_or(CallbackError::ParseFlags)?,
404 raw_flags: flags,
405 id,
406 })
407 })
408 })
409});
410
411define_callback!(normal_callback, (num, paths, flags, ids){
412 let paths = paths as *const *const c_char;
413 (0..num).map(move |idx| {
414 Ok((
415 unsafe { *paths.add(idx) },
416 unsafe { *flags.add(idx) },
417 unsafe { *ids.add(idx) },
418 ))
419 .and_then(|(path, flags, id)| {
420 Ok(Event {
421 path: PathBuf::from(
422 OsStr::from_bytes(unsafe { CStr::from_ptr(path) }.to_bytes())
423 .to_os_string(),
424 ),
425 inode: None,
426 flags: StreamFlags::from_bits(flags).ok_or(CallbackError::ParseFlags)?,
427 raw_flags: flags,
428 id,
429 })
430 })
431 })
432});