fsevent_stream/
stream.rs

1//! Stream-based `FSEvents` interface.
2#![allow(
3    clippy::non_send_fields_in_send_ty,
4    clippy::cast_possible_wrap,
5    clippy::borrow_interior_mutable_const,
6    clippy::module_name_repetitions
7)]
8
9use std::ffi::{c_void, CStr, OsStr};
10use std::fmt::{Display, Formatter};
11use std::io;
12use std::os::raw::c_char;
13use std::os::unix::ffi::OsStrExt;
14use std::panic::catch_unwind;
15use std::path::{Path, PathBuf};
16use std::pin::Pin;
17use std::sync::mpsc::channel;
18use std::task::{Context, Poll};
19use std::thread;
20use std::time::Duration;
21
22#[cfg(feature = "async-std")]
23use async_std1 as async_std;
24use core_foundation::array::CFArray;
25use core_foundation::base::{CFIndex, FromVoid};
26use core_foundation::dictionary::CFDictionary;
27use core_foundation::number::CFNumber;
28use core_foundation::runloop::{kCFRunLoopBeforeWaiting, kCFRunLoopDefaultMode, CFRunLoop};
29use core_foundation::string::CFString;
30use futures_core::Stream;
31use futures_util::stream::{iter, StreamExt};
32use log::{debug, error};
33#[cfg(feature = "tokio")]
34use tokio1 as tokio;
35#[cfg(feature = "tokio")]
36use tokio_stream::wrappers::ReceiverStream;
37
38use crate::ffi::{
39    kFSEventStreamCreateFlagFileEvents, kFSEventStreamCreateFlagUseCFTypes,
40    kFSEventStreamCreateFlagUseExtendedData, kFSEventStreamEventExtendedDataPathKey,
41    kFSEventStreamEventExtendedFileIDKey, CFRunLoopExt, FSEventStreamCreateFlags,
42    FSEventStreamEventFlags, FSEventStreamEventId, SysFSEventStream, SysFSEventStreamContext,
43    SysFSEventStreamRef,
44};
45pub use crate::flags::StreamFlags;
46use crate::impl_release_callback;
47use crate::observer::create_oneshot_observer;
48use crate::utils::FlagsExt;
49
50#[cfg(test)]
51pub(crate) static TEST_RUNNING_RUNLOOP_COUNT: std::sync::atomic::AtomicUsize =
52    std::sync::atomic::AtomicUsize::new(0);
53
54/// An owned permission to stop an [`EventStream`](EventStream) and terminate its backing `RunLoop`.
55///
56/// A `EventStreamHandler` *detaches* the associated Stream and `RunLoop` when it is dropped, which
57/// means that there is no longer any handle to them and no way to `abort` them.
58///
59/// Dropping the handler without first calling [`abort`](EventStreamHandler::abort) is not
60/// recommended because this leaves a spawned thread behind and causes memory leaks.
61pub struct EventStreamHandler {
62    runloop: Option<(CFRunLoop, thread::JoinHandle<()>)>,
63}
64
65// Safety:
66// - According to the Apple documentation, it's safe to move `CFRef`s across threads.
67//   https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/ThreadSafetySummary/ThreadSafetySummary.html
68unsafe impl Send for EventStreamHandler {}
69
70impl EventStreamHandler {
71    /// Stop an [`EventStream`](EventStream) and terminate its backing `RunLoop`.
72    ///
73    /// Calling this method multiple times has no extra effect and won't cause any panic, error,
74    /// or undefined behavior.
75    pub fn abort(&mut self) {
76        if let Some((runloop, thread_handle)) = self.runloop.take() {
77            let (tx, rx) = channel();
78            let observer = create_oneshot_observer(kCFRunLoopBeforeWaiting, tx);
79            runloop.add_observer(&observer, unsafe { kCFRunLoopDefaultMode });
80
81            if !runloop.is_waiting() {
82                // Wait the RunLoop to enter Waiting state.
83                rx.recv().expect("channel to receive BeforeWaiting signal");
84            }
85
86            runloop.remove_observer(&observer, unsafe { kCFRunLoopDefaultMode });
87            runloop.stop();
88
89            // Wait for the thread to shut down.
90            thread_handle.join().expect("thread to shut down");
91        }
92    }
93}
94
95/// An `FSEvents` API event.
96#[derive(Debug, Clone, Eq, PartialEq, Hash)]
97pub struct Event {
98    pub path: PathBuf,
99    pub inode: Option<i64>,
100    pub flags: StreamFlags,
101    pub raw_flags: FSEventStreamEventFlags,
102    pub id: FSEventStreamEventId,
103}
104
105impl Display for Event {
106    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
107        write!(
108            f,
109            "[{}] path: {:?}({}), flags: {} ({:x})",
110            self.id,
111            self.path,
112            self.inode.unwrap_or(-1),
113            self.flags,
114            self.raw_flags
115        )
116    }
117}
118
119/// A stream of `FSEvents` API event batches.
120///
121/// You may want a stream of [`Event`](Event) instead of a stream of batches of it.
122/// Call [`EventStream::into_flatten`](EventStream::into_flatten) to get one.
123///
124/// Call [`create_event_stream`](create_event_stream) to create it.
125pub struct EventStream {
126    #[cfg(feature = "tokio")]
127    stream: ReceiverStream<Vec<Event>>,
128    #[cfg(feature = "async-std")]
129    stream: async_std::channel::Receiver<Vec<Event>>,
130}
131
132impl EventStream {
133    /// Flatten event batches and produce a stream of [`Event`](Event).
134    pub fn into_flatten(self) -> impl Stream<Item = Event> {
135        self.flat_map(iter)
136    }
137}
138
139impl Stream for EventStream {
140    type Item = Vec<Event>;
141
142    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143        self.stream.poll_next_unpin(cx)
144    }
145}
146
147pub(crate) struct StreamContextInfo {
148    #[cfg(feature = "tokio")]
149    event_handler: tokio::sync::mpsc::Sender<Vec<Event>>,
150    #[cfg(feature = "async-std")]
151    event_handler: async_std::channel::Sender<Vec<Event>>,
152}
153
154impl_release_callback!(release_context, StreamContextInfo);
155
156struct SendWrapper<T>(T);
157
158unsafe impl<T> Send for SendWrapper<T> {}
159
160impl<T> SendWrapper<T> {
161    const unsafe fn new(t: T) -> Self {
162        Self(t)
163    }
164}
165
166/// Create a new [`EventStream`](EventStream) and [`EventStreamHandler`](EventStreamHandler) pair.
167///
168/// # Errors
169/// Return error when there's any invalid path in `paths_to_watch`.
170///
171/// # Panics
172/// Panic when the given flags combination is illegal.
173pub fn create_event_stream<P: AsRef<Path>>(
174    paths_to_watch: impl IntoIterator<Item = P>,
175    since_when: FSEventStreamEventId,
176    latency: Duration,
177    flags: FSEventStreamCreateFlags,
178) -> io::Result<(EventStream, EventStreamHandler)> {
179    if flags.contains(kFSEventStreamCreateFlagUseExtendedData)
180        && !flags.contains(kFSEventStreamCreateFlagUseCFTypes)
181    {
182        panic!("UseExtendedData requires UseCFTypes");
183    }
184
185    #[cfg(feature = "tokio")]
186    let (event_tx, event_rx) = tokio::sync::mpsc::channel(1024);
187    #[cfg(feature = "async-std")]
188    let (event_tx, event_rx) = async_std::channel::bounded(1024);
189
190    // We need to associate the stream context with our callback in order to propagate events
191    // to the rest of the system. This will be owned by the stream, and will be freed when the
192    // stream is closed. This means we will leak the context if we panic before reacing
193    // `FSEventStreamRelease`.
194    let context = StreamContextInfo {
195        event_handler: event_tx,
196    };
197
198    let stream_context = SysFSEventStreamContext::new(context, release_context);
199
200    let callback = if flags.contains(kFSEventStreamCreateFlagUseCFTypes) {
201        if flags.contains(kFSEventStreamCreateFlagUseExtendedData) {
202            if flags.contains(kFSEventStreamCreateFlagFileEvents) {
203                cf_ext_with_id_callback
204            } else {
205                cf_ext_callback
206            }
207        } else {
208            cf_callback
209        }
210    } else {
211        normal_callback
212    };
213
214    let mut stream = SysFSEventStream::new(
215        callback,
216        &stream_context,
217        paths_to_watch,
218        since_when,
219        latency,
220        flags,
221    )?;
222
223    // channel to pass runloop around
224    let (runloop_tx, runloop_rx) = channel();
225
226    let thread_handle = thread::spawn(move || {
227        #[cfg(test)]
228        TEST_RUNNING_RUNLOOP_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
229
230        let current_runloop = CFRunLoop::get_current();
231
232        stream.schedule(&current_runloop, unsafe { kCFRunLoopDefaultMode });
233        stream.start();
234
235        // the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop()
236        // Safety:
237        // - According to the Apple documentation, it's safe to move `CFRef`s across threads.
238        //   https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/ThreadSafetySummary/ThreadSafetySummary.html
239        runloop_tx
240            .send(unsafe { SendWrapper::new(current_runloop) })
241            .expect("send runloop to stream");
242
243        CFRunLoop::run_current();
244        stream.stop();
245        stream.invalidate();
246
247        #[cfg(test)]
248        TEST_RUNNING_RUNLOOP_COUNT.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
249    });
250
251    #[cfg(feature = "tokio")]
252    let stream = ReceiverStream::new(event_rx);
253    #[cfg(feature = "async-std")]
254    let stream = event_rx;
255    Ok((
256        EventStream { stream },
257        EventStreamHandler {
258            runloop: Some((
259                runloop_rx.recv().expect("receive runloop from worker").0,
260                thread_handle,
261            )),
262        },
263    ))
264}
265
266enum CallbackError {
267    ToI64,
268    ParseFlags,
269}
270
271macro_rules! define_callback {
272    ($name: ident, ($num: ident, $paths: ident, $flags: ident, $ids: ident)$body: block) => {
273        extern "C" fn $name(
274            stream_ref: SysFSEventStreamRef,
275            info: *mut c_void,
276            num_events: usize,                           // size_t numEvents
277            event_paths: *mut c_void,                    // void *eventPaths
278            event_flags: *const FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[]
279            event_ids: *const FSEventStreamEventId,      // const FSEventStreamEventId eventIds[]
280        ) {
281            fn callback_impl(
282                _stream_ref: SysFSEventStreamRef,
283                info: *mut c_void,
284                num_events: usize,                           // size_t numEvents
285                event_paths: *mut c_void,                    // void *eventPaths
286                event_flags: *const FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[]
287                event_ids: *const FSEventStreamEventId, // const FSEventStreamEventId eventIds[]
288            ) {
289                fn event_iter(
290                    $num: usize,
291                    $paths: *mut c_void,
292                    $flags: *const FSEventStreamEventFlags,
293                    $ids: *const FSEventStreamEventId,
294                ) -> impl Iterator<Item = Result<Event, CallbackError>> {
295                    $body
296                }
297
298                debug!("Received {} event(s)", num_events);
299
300                let info = info as *const StreamContextInfo;
301                let event_handler = unsafe { &(*info).event_handler };
302
303                let events = event_iter(num_events, event_paths, event_flags, event_ids)
304                    .filter_map(|event| {
305                        if let Err(e) = &event {
306                            match e {
307                                CallbackError::ToI64 => {
308                                    error!("Unable to convert inode field to i64")
309                                }
310                                CallbackError::ParseFlags => error!("Unable to parse flags"),
311                            }
312                        }
313                        event.ok()
314                    })
315                    .collect();
316
317                if let Err(e) = event_handler.try_send(events) {
318                    error!("Unable to send event from callback: {}", e);
319                }
320            }
321
322            drop(catch_unwind(move || {
323                callback_impl(
324                    stream_ref,
325                    info,
326                    num_events,
327                    event_paths,
328                    event_flags,
329                    event_ids,
330                );
331            }));
332        }
333    };
334}
335
336define_callback!(cf_ext_with_id_callback, (num, paths, flags, ids){
337    let paths = unsafe { CFArray::<CFDictionary<CFString>>::from_void(paths) };
338    (0..num).map(move |idx| {
339        Ok((
340            unsafe { paths.get_unchecked(idx as CFIndex) },
341            unsafe { *flags.add(idx) },
342            unsafe { *ids.add(idx) },
343        ))
344        .and_then(|(dict, flags, id)| {
345            Ok(Event {
346                path: PathBuf::from(
347                    (*unsafe {
348                        CFString::from_void(*dict.get(&*kFSEventStreamEventExtendedDataPathKey),)
349                    })
350                        .to_string(),
351                ),
352                inode: Some(
353                    unsafe {CFNumber::from_void(*dict.get(&*kFSEventStreamEventExtendedFileIDKey))}
354                        .to_i64()
355                        .ok_or(CallbackError::ToI64)?,
356                ),
357                flags: StreamFlags::from_bits(flags).ok_or(CallbackError::ParseFlags)?,
358                raw_flags: flags,
359                id,
360            })
361        })
362    })
363});
364
365define_callback!(cf_ext_callback, (num, paths, flags, ids){
366    let paths = unsafe { CFArray::<CFDictionary<CFString>>::from_void(paths) };
367    (0..num).map(move |idx| {
368        Ok((
369            unsafe { paths.get_unchecked(idx as CFIndex) },
370            unsafe { *flags.add(idx) },
371            unsafe { *ids.add(idx) },
372        ))
373        .and_then(|(dict, flags, id)| {
374            Ok(Event {
375                path: PathBuf::from(
376                    (*unsafe {
377                        CFString::from_void(*dict.get(&*kFSEventStreamEventExtendedDataPathKey),)
378                    })
379                        .to_string(),
380                ),
381                inode: None,
382                flags: StreamFlags::from_bits(flags).ok_or(CallbackError::ParseFlags)?,
383                raw_flags: flags,
384                id,
385            })
386        })
387    })
388});
389
390define_callback!(cf_callback, (num, paths, flags, ids){
391    let paths = unsafe { CFArray::<CFString>::from_void(paths) };
392    (0..num).map(move |idx| {
393        Ok((
394            unsafe { paths.get_unchecked(idx as CFIndex) },
395            unsafe { *flags.add(idx) },
396            unsafe { *ids.add(idx) },
397        ))
398            .and_then(|(path, flags, id)| {
399                Ok(Event {
400                    path: PathBuf::from((*path).to_string()),
401                    inode: None,
402                    flags: StreamFlags::from_bits(flags)
403                        .ok_or(CallbackError::ParseFlags)?,
404                    raw_flags: flags,
405                    id,
406                })
407            })
408    })
409});
410
411define_callback!(normal_callback, (num, paths, flags, ids){
412    let paths = paths as *const *const c_char;
413    (0..num).map(move |idx| {
414        Ok((
415            unsafe { *paths.add(idx) },
416            unsafe { *flags.add(idx) },
417            unsafe { *ids.add(idx) },
418        ))
419        .and_then(|(path, flags, id)| {
420            Ok(Event {
421                path: PathBuf::from(
422                    OsStr::from_bytes(unsafe { CStr::from_ptr(path) }.to_bytes())
423                        .to_os_string(),
424                ),
425                inode: None,
426                flags: StreamFlags::from_bits(flags).ok_or(CallbackError::ParseFlags)?,
427                raw_flags: flags,
428                id,
429            })
430        })
431    })
432});