1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
//! Stream-based `FSEvents` interface.
#![allow(
    clippy::non_send_fields_in_send_ty,
    clippy::cast_possible_wrap,
    clippy::borrow_interior_mutable_const,
    clippy::module_name_repetitions
)]

use std::ffi::{c_void, CStr, OsStr};
use std::io;
use std::os::raw::c_char;
use std::os::unix::ffi::OsStrExt;
use std::panic::catch_unwind;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::mpsc::channel;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;

#[cfg(feature = "async-std")]
use async_std1 as async_std;
use core_foundation::array::CFArray;
use core_foundation::base::{CFIndex, FromVoid};
use core_foundation::dictionary::CFDictionary;
use core_foundation::number::CFNumber;
use core_foundation::runloop::{kCFRunLoopBeforeWaiting, kCFRunLoopDefaultMode, CFRunLoop};
use core_foundation::string::CFString;
use either::Either;
use futures_core::Stream;
use futures_util::StreamExt;
use log::{debug, error};
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::wrappers::ReceiverStream;

use crate::ffi::{
    kFSEventStreamCreateFlagFileEvents, kFSEventStreamCreateFlagUseCFTypes,
    kFSEventStreamCreateFlagUseExtendedData, kFSEventStreamEventExtendedDataPathKey,
    kFSEventStreamEventExtendedFileIDKey, CFRunLoopExt, FSEventStreamCreateFlags,
    FSEventStreamEventFlags, FSEventStreamEventId, SysFSEventStream, SysFSEventStreamContext,
    SysFSEventStreamRef,
};
pub use crate::flags::StreamFlags;
use crate::impl_release_callback;
use crate::observer::create_oneshot_observer;
use crate::utils::FlagsExt;

/// Test-only counter of worker threads that are currently inside their `RunLoop` section.
///
/// The worker thread spawned by `create_event_stream` increments it when it starts and
/// decrements it after its `RunLoop` has returned and the stream was stopped and invalidated.
#[cfg(test)]
pub(crate) static TEST_RUNNING_RUNLOOP_COUNT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);

/// An owned permission to stop an [`EventStream`](EventStream) and terminate its backing `RunLoop`.
///
/// An `EventStreamHandler` *detaches* the associated stream and `RunLoop` when it is dropped,
/// which means that there is no longer any handle to them and no way to `abort` them.
///
/// Dropping the handler without first calling [`abort`](EventStreamHandler::abort) is not
/// recommended because this leaves a spawned thread behind and causes memory leaks.
pub struct EventStreamHandler {
    // The worker thread's `RunLoop` together with the thread's join handle.
    // `Some` until `abort` takes it out; `None` afterwards, which makes repeated aborts no-ops.
    runloop: Option<(CFRunLoop, thread::JoinHandle<()>)>,
}

// SAFETY:
// - The only non-`Send` member of `EventStreamHandler` is the `CFRunLoop` reference.
// - According to the Apple documentation, it's safe to move `CFRef`s across threads.
//   https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/ThreadSafetySummary/ThreadSafetySummary.html
unsafe impl Send for EventStreamHandler {}

impl EventStreamHandler {
    /// Stop the associated [`EventStream`](EventStream) and shut down its backing `RunLoop`.
    ///
    /// The call blocks until the worker thread has exited. Only the first call does any work:
    /// calling this method again has no extra effect and won't cause any panic, error, or
    /// undefined behavior.
    ///
    /// # Panics
    /// Panics if the `BeforeWaiting` signal channel closes before a signal arrives, or if
    /// joining the worker thread fails.
    pub fn abort(&mut self) {
        if let Some((run_loop, worker)) = self.runloop.take() {
            // The default mode is needed both for installing and for removing the observer.
            let mode = unsafe { kCFRunLoopDefaultMode };

            // Install the observer *before* inspecting the state so that a transition into the
            // waiting state that happens in between is still signalled.
            let (signal_tx, signal_rx) = channel();
            let observer = create_oneshot_observer(kCFRunLoopBeforeWaiting, signal_tx);
            run_loop.add_observer(&observer, mode);

            if !run_loop.is_waiting() {
                // Block until the RunLoop is about to enter the waiting state.
                signal_rx
                    .recv()
                    .expect("channel to receive BeforeWaiting signal");
            }

            run_loop.remove_observer(&observer, mode);
            run_loop.stop();

            // Block until the worker thread has finished tearing the stream down.
            worker.join().expect("thread to shut down");
        }
    }
}

/// An `FSEvents` API event.
///
/// One value is produced for every entry the `FSEvents` callback receives, provided its raw
/// flags can be parsed into [`StreamFlags`].
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct Event {
    /// Path reported for this event.
    pub path: PathBuf,
    /// File ID taken from the extended event data.
    ///
    /// Only `Some` when the stream was created with the `UseCFTypes`, `UseExtendedData` and
    /// `FileEvents` create flags; `None` in every other configuration.
    pub inode: Option<i64>,
    /// Event flags parsed from `raw_flags`.
    pub flags: StreamFlags,
    /// Event flags exactly as delivered to the callback.
    pub raw_flags: FSEventStreamEventFlags,
    /// Event ID exactly as delivered to the callback.
    pub id: FSEventStreamEventId,
}

/// A stream of `FSEvents` API events.
///
/// Call [`create_event_stream`](create_event_stream) to create it.
///
/// The stream yields the [`Event`]s that the `FSEvents` callback pushes into the paired channel.
pub struct EventStream {
    // Receiving half of the event channel. Both variants share the field name, so the struct
    // is only well-formed when exactly one of the `tokio` / `async-std` features is enabled.
    #[cfg(feature = "tokio")]
    stream: ReceiverStream<Event>,
    #[cfg(feature = "async-std")]
    stream: async_std::channel::Receiver<Event>,
}

impl Stream for EventStream {
    type Item = Event;

    /// Poll the underlying channel receiver for the next [`Event`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `EventStream` is `Unpin`, so the pin can be unwrapped to reach the inner receiver.
        let receiver = &mut self.get_mut().stream;
        receiver.poll_next_unpin(cx)
    }
}

/// Per-stream state handed to the `FSEvents` callback through its `info` pointer.
pub(crate) struct StreamContextInfo {
    // Sending half of the event channel; the callback pushes decoded events into it.
    // Both variants share the field name, so exactly one of the two features must be enabled.
    #[cfg(feature = "tokio")]
    event_handler: tokio::sync::mpsc::Sender<Event>,
    #[cfg(feature = "async-std")]
    event_handler: async_std::channel::Sender<Event>,
    // Flags the stream was created with; they decide how the callback decodes its payload.
    create_flags: FSEventStreamCreateFlags,
}

// Presumably generates `release_context`, the release callback for `StreamContextInfo` that is
// passed to `SysFSEventStreamContext::new` in `create_event_stream` — confirm against the macro.
impl_release_callback!(release_context, StreamContextInfo);

/// Wrapper that marks its payload as `Send` regardless of the payload's own auto traits.
struct SendWrapper<T>(T);

// SAFETY: nothing is checked here. Soundness relies entirely on the promise made by the caller
// of `SendWrapper::new` that moving the wrapped value to another thread is fine.
unsafe impl<T> Send for SendWrapper<T> {}

impl<T> SendWrapper<T> {
    /// Wrap `inner` so that it can be moved to another thread.
    ///
    /// # Safety
    /// The caller must guarantee that transferring `inner` across threads is sound, because the
    /// wrapper is unconditionally `Send`.
    const unsafe fn new(inner: T) -> Self {
        SendWrapper(inner)
    }
}

/// Create a new [`EventStream`](EventStream) and [`EventStreamHandler`](EventStreamHandler) pair.
///
/// # Errors
/// Return error when there's any invalid path in `paths_to_watch`.
///
/// # Panics
/// Panic when the given flags combination is illegal.
pub fn create_event_stream<P: AsRef<Path>>(
    paths_to_watch: impl IntoIterator<Item = P>,
    since_when: FSEventStreamEventId,
    latency: Duration,
    flags: FSEventStreamCreateFlags,
) -> io::Result<(EventStream, EventStreamHandler)> {
    if flags.contains(kFSEventStreamCreateFlagUseExtendedData)
        && !flags.contains(kFSEventStreamCreateFlagUseCFTypes)
    {
        panic!("UseExtendedData requires UseCFTypes");
    }

    #[cfg(feature = "tokio")]
    let (event_tx, event_rx) = tokio::sync::mpsc::channel(1024);
    #[cfg(feature = "async-std")]
    let (event_tx, event_rx) = async_std::channel::bounded(1024);

    // We need to associate the stream context with our callback in order to propagate events
    // to the rest of the system. This will be owned by the stream, and will be freed when the
    // stream is closed. This means we will leak the context if we panic before reacing
    // `FSEventStreamRelease`.
    let context = StreamContextInfo {
        event_handler: event_tx,
        create_flags: flags,
    };

    let stream_context = SysFSEventStreamContext::new(context, release_context);

    // We must append some additional flags because our callback parse them so
    let mut stream = SysFSEventStream::new(
        callback,
        &stream_context,
        paths_to_watch,
        since_when,
        latency,
        flags,
    )?;

    // channel to pass runloop around
    let (runloop_tx, runloop_rx) = channel();

    let thread_handle = thread::spawn(move || {
        #[cfg(test)]
        TEST_RUNNING_RUNLOOP_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

        let current_runloop = CFRunLoop::get_current();

        stream.schedule(&current_runloop, unsafe { kCFRunLoopDefaultMode });
        stream.start();

        // the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop()
        // Safety:
        // - According to the Apple documentation, it's safe to move `CFRef`s across threads.
        //   https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/ThreadSafetySummary/ThreadSafetySummary.html
        runloop_tx
            .send(unsafe { SendWrapper::new(current_runloop) })
            .expect("send runloop to stream");

        CFRunLoop::run_current();
        stream.stop();
        stream.invalidate();

        #[cfg(test)]
        TEST_RUNNING_RUNLOOP_COUNT.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
    });

    #[cfg(feature = "tokio")]
    let stream = ReceiverStream::new(event_rx);
    #[cfg(feature = "async-std")]
    let stream = event_rx;
    Ok((
        EventStream { stream },
        EventStreamHandler {
            runloop: Some((
                runloop_rx.recv().expect("receive runloop from worker").0,
                thread_handle,
            )),
        },
    ))
}

/// C entry point registered with the `FSEvents` stream.
///
/// All work is delegated to `callback_impl` inside `catch_unwind`, so a Rust panic never
/// unwinds across the FFI boundary; the outcome, panic payload included, is discarded.
extern "C" fn callback(
    stream_ref: SysFSEventStreamRef,
    info: *mut c_void,
    num_events: usize,                           // size_t numEvents
    event_paths: *mut c_void,                    // void *eventPaths
    event_flags: *const FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[]
    event_ids: *const FSEventStreamEventId,      // const FSEventStreamEventId eventIds[]
) {
    let outcome = catch_unwind(move || {
        callback_impl(
            stream_ref,
            info,
            num_events,
            event_paths,
            event_flags,
            event_ids,
        );
    });
    // Nothing useful can be done with a panic here; swallow it deliberately.
    drop(outcome);
}

/// Reasons why a single raw callback entry could not be turned into an [`Event`].
enum CallbackError {
    /// The file ID number of the extended event data could not be represented as an `i64`.
    ToI64,
    /// The raw event flags contain bits that `StreamFlags::from_bits` rejects.
    ParseFlags,
}

/// Lazily decode the raw arrays handed to the `FSEvents` callback into [`Event`]s.
///
/// How `paths` is interpreted depends on `create_flags`:
/// - without `UseCFTypes` it is read as an array of NUL-terminated C strings;
/// - with `UseCFTypes` only, it is read as a `CFArray` of `CFString`s;
/// - with `UseCFTypes` and `UseExtendedData`, it is read as a `CFArray` of `CFDictionary`s, from
///   which the path and (when `FileEvents` is set as well) the file ID are extracted.
///
/// `flags` and `ids` are read at indices `0..num`. Each item is `Err` when its flags cannot be
/// parsed or its file ID cannot be converted to `i64`.
///
/// The caller is responsible for passing pointers that are valid for `num` entries and that
/// match the layout selected by `create_flags`; all raw reads happen while iterating.
fn event_iter(
    create_flags: FSEventStreamCreateFlags,
    num: usize,
    paths: *mut c_void,
    flags: *const FSEventStreamEventFlags,
    ids: *const FSEventStreamEventId,
) -> impl Iterator<Item = Result<Event, CallbackError>> {
    /// Assemble an [`Event`]; the raw flag bits are parsed last.
    fn assemble(
        path: PathBuf,
        inode: Option<i64>,
        raw_flags: FSEventStreamEventFlags,
        id: FSEventStreamEventId,
    ) -> Result<Event, CallbackError> {
        let flags = StreamFlags::from_bits(raw_flags).ok_or(CallbackError::ParseFlags)?;
        Ok(Event {
            path,
            inode,
            flags,
            raw_flags,
            id,
        })
    }

    if !create_flags.contains(kFSEventStreamCreateFlagUseCFTypes) {
        // Normal types: an array of C strings.
        let c_paths = paths as *const *const c_char;
        Either::Right(
            (0..num).map(move |idx| -> Result<Event, CallbackError> {
                let c_path = unsafe { *c_paths.add(idx) };
                let raw_flags = unsafe { *flags.add(idx) };
                let id = unsafe { *ids.add(idx) };

                let bytes = unsafe { CStr::from_ptr(c_path) }.to_bytes();
                let path = PathBuf::from(OsStr::from_bytes(bytes).to_os_string());
                assemble(path, None, raw_flags, id)
            }),
        )
    } else if create_flags.contains(kFSEventStreamCreateFlagUseExtendedData) {
        // CFDict: every entry is a dictionary carrying the path and, optionally, the file ID.
        let dicts = unsafe { CFArray::<CFDictionary<CFString>>::from_void(paths) };
        Either::Left(Either::Left((0..num).map(
            move |idx| -> Result<Event, CallbackError> {
                let dict = unsafe { dicts.get_unchecked(idx as CFIndex) };
                let raw_flags = unsafe { *flags.add(idx) };
                let id = unsafe { *ids.add(idx) };

                // The FileIDKey is only read for file-level event streams.
                let wants_inode = create_flags.contains(kFSEventStreamCreateFlagFileEvents);

                // DataPathKey
                let path = PathBuf::from(
                    (*unsafe {
                        CFString::from_void(*dict.get(&*kFSEventStreamEventExtendedDataPathKey))
                    })
                    .to_string(),
                );

                // FileIDKey
                let inode = if wants_inode {
                    let number = unsafe {
                        CFNumber::from_void(*dict.get(&*kFSEventStreamEventExtendedFileIDKey))
                    };
                    Some(number.to_i64().ok_or(CallbackError::ToI64)?)
                } else {
                    None
                };

                assemble(path, inode, raw_flags, id)
            },
        )))
    } else {
        // CFString: every entry is the path itself.
        let strings = unsafe { CFArray::<CFString>::from_void(paths) };
        Either::Left(Either::Right((0..num).map(
            move |idx| -> Result<Event, CallbackError> {
                let cf_path = unsafe { strings.get_unchecked(idx as CFIndex) };
                let raw_flags = unsafe { *flags.add(idx) };
                let id = unsafe { *ids.add(idx) };

                assemble(PathBuf::from((*cf_path).to_string()), None, raw_flags, id)
            },
        )))
    }
}

fn callback_impl(
    _stream_ref: SysFSEventStreamRef,
    info: *mut c_void,
    num_events: usize,                           // size_t numEvents
    event_paths: *mut c_void,                    // void *eventPaths
    event_flags: *const FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[]
    event_ids: *const FSEventStreamEventId,      // const FSEventStreamEventId eventIds[]
) {
    debug!("Received {} event(s)", num_events);

    let info = info as *const StreamContextInfo;
    let create_flags = unsafe { &(*info).create_flags };
    let event_handler = unsafe { &(*info).event_handler };

    for event in event_iter(
        *create_flags,
        num_events,
        event_paths,
        event_flags,
        event_ids,
    ) {
        match event {
            Ok(event) => {
                if let Err(e) = event_handler.try_send(event) {
                    error!("Unable to send event from callback: {}", e);
                }
            }
            Err(CallbackError::ToI64) => error!("Unable to convert inode field to i64"),
            Err(CallbackError::ParseFlags) => error!("Unable to parse flags"),
        }
    }
}