cardinal_sdk/
event_stream.rs

1use crate::FsEvent;
2use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
3use dispatch2::{DispatchQueue, DispatchQueueAttr, DispatchRetained};
4use libc::dev_t;
5use objc2_core_foundation::{CFArray, CFString, CFTimeInterval};
6use objc2_core_services::{
7    ConstFSEventStreamRef, FSEventStreamContext, FSEventStreamCreate, FSEventStreamEventFlags,
8    FSEventStreamEventId, FSEventStreamGetDeviceBeingWatched, FSEventStreamInvalidate,
9    FSEventStreamRef, FSEventStreamRelease, FSEventStreamSetDispatchQueue, FSEventStreamStart,
10    FSEventStreamStop, kFSEventStreamCreateFlagFileEvents, kFSEventStreamCreateFlagNoDefer,
11    kFSEventStreamCreateFlagWatchRoot,
12};
13use std::{
14    ffi::c_void,
15    ops::{Deref, DerefMut},
16    ptr::NonNull,
17    slice,
18};
19
20type EventsCallback = Box<dyn FnMut(Vec<FsEvent>) + Send>;
21
22pub struct EventStream {
23    stream: FSEventStreamRef,
24}
25
26unsafe impl Send for EventStream {}
27
28impl Drop for EventStream {
29    fn drop(&mut self) {
30        unsafe {
31            FSEventStreamRelease(self.stream);
32        }
33    }
34}
35
36impl EventStream {
37    pub fn new(
38        paths: &[&str],
39        since_event_id: FSEventStreamEventId,
40        latency: CFTimeInterval,
41        callback: EventsCallback,
42    ) -> Self {
43        unsafe extern "C-unwind" fn drop_callback(info: *const c_void) {
44            let _cb: Box<EventsCallback> = unsafe { Box::from_raw(info as _) };
45        }
46
47        unsafe extern "C-unwind" fn raw_callback(
48            _stream: ConstFSEventStreamRef, // ConstFSEventStreamRef streamRef
49            callback_info: *mut c_void,     // void *clientCallBackInfo
50            num_events: usize,              // size_t numEvents
51            event_paths: NonNull<c_void>,   // void *eventPaths
52            event_flags: NonNull<FSEventStreamEventFlags>, // const FSEventStreamEventFlags eventFlags[]
53            event_ids: NonNull<FSEventStreamEventId>,      // const FSEventStreamEventId eventIds[]
54        ) {
55            let event_paths = unsafe {
56                slice::from_raw_parts(event_paths.as_ptr() as *const *const i8, num_events)
57            };
58            let event_flags = unsafe { slice::from_raw_parts(event_flags.as_ptr(), num_events) };
59            let event_ids = unsafe { slice::from_raw_parts(event_ids.as_ptr(), num_events) };
60            let events: Vec<_> = event_paths
61                .iter()
62                .zip(event_flags)
63                .zip(event_ids)
64                .map(|((&path, &flag), &id)| unsafe { FsEvent::from_raw(path, flag, id) })
65                .collect();
66
67            let callback = unsafe { (callback_info as *mut EventsCallback).as_mut() }.unwrap();
68            callback(events);
69        }
70
71        let paths: Vec<_> = paths.iter().map(|&x| CFString::from_str(x)).collect();
72        let paths = CFArray::from_retained_objects(&paths);
73        let mut context = FSEventStreamContext {
74            version: 0,
75            info: Box::leak(Box::new(callback)) as *mut _ as *mut _,
76            retain: None,
77            release: Some(drop_callback),
78            copyDescription: None,
79        };
80
81        let stream: FSEventStreamRef = unsafe {
82            FSEventStreamCreate(
83                None,
84                Some(raw_callback),
85                &mut context,
86                paths.as_opaque(),
87                since_event_id,
88                latency,
89                kFSEventStreamCreateFlagNoDefer
90                    | kFSEventStreamCreateFlagFileEvents
91                    | kFSEventStreamCreateFlagWatchRoot,
92            )
93        };
94        Self { stream }
95    }
96
97    // Start the FSEventStream with a dispatch queue.
98    pub fn spawn(self) -> Option<EventStreamWithQueue> {
99        let queue = DispatchQueue::new("cardinal-sdk-queue", DispatchQueueAttr::SERIAL);
100        unsafe { FSEventStreamSetDispatchQueue(self.stream, Some(&queue)) };
101        let result = unsafe { FSEventStreamStart(self.stream) };
102        if !result {
103            unsafe { FSEventStreamStop(self.stream) };
104            unsafe { FSEventStreamInvalidate(self.stream) };
105            return None;
106        }
107        let stream = self.stream;
108        Some(EventStreamWithQueue { stream, queue })
109    }
110
111    // Get device id being watched by this event stream.
112    pub fn dev(&self) -> dev_t {
113        unsafe { FSEventStreamGetDeviceBeingWatched(self.stream.cast_const()) }
114    }
115}
116
117/// FSEventStream with dispatch queue.
118///
119/// Dropping this struct will stop the FSEventStream and release the dispatch queue.
120pub struct EventStreamWithQueue {
121    stream: FSEventStreamRef,
122    #[allow(dead_code)]
123    queue: DispatchRetained<DispatchQueue>,
124}
125
126impl Drop for EventStreamWithQueue {
127    fn drop(&mut self) {
128        unsafe {
129            FSEventStreamStop(self.stream);
130            FSEventStreamInvalidate(self.stream);
131        }
132    }
133}
134
135pub struct EventWatcher {
136    receiver: Receiver<Vec<FsEvent>>,
137    _cancellation_token: Sender<()>,
138}
139
140impl Deref for EventWatcher {
141    type Target = Receiver<Vec<FsEvent>>;
142
143    fn deref(&self) -> &Self::Target {
144        &self.receiver
145    }
146}
147
148impl DerefMut for EventWatcher {
149    fn deref_mut(&mut self) -> &mut Self::Target {
150        &mut self.receiver
151    }
152}
153
154impl EventWatcher {
155    pub fn noop() -> Self {
156        Self {
157            receiver: unbounded().1,
158            _cancellation_token: bounded::<()>(1).0,
159        }
160    }
161
162    pub fn spawn(
163        path: String,
164        since_event_id: FSEventStreamEventId,
165        latency: f64,
166    ) -> (dev_t, EventWatcher) {
167        let (_cancellation_token, cancellation_token_rx) = bounded::<()>(1);
168        let (sender, receiver) = unbounded();
169        let dev = 0;
170        std::thread::Builder::new()
171            .name("cardinal-sdk-event-watcher".to_string())
172            .spawn(move || {
173                let stream = EventStream::new(
174                    &[&path],
175                    since_event_id,
176                    latency,
177                    Box::new(move |events| {
178                        let _ = sender.send(events);
179                    }),
180                );
181                let _stream_and_queue = stream.spawn().expect("failed to spawn event stream");
182                let _ = cancellation_token_rx.recv();
183            })
184            .unwrap();
185        (
186            dev,
187            EventWatcher {
188                receiver,
189                _cancellation_token,
190            },
191        )
192    }
193}
194
#[cfg(all(test, target_os = "macos"))]
mod tests {
    use super::*;
    use crate::utils::current_event_id;
    use crossbeam_channel::RecvTimeoutError;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// A watcher spawned on a path whose previous watcher was dropped must still see new files.
    #[test]
    fn drop_then_respawn_event_watcher_delivers_events() {
        let temp_dir = tempdir().expect("failed to create tempdir");
        // canonicalize /var -> /private/var
        let watched_root = temp_dir
            .path()
            .canonicalize()
            .expect("failed to canonicalize");
        let watch_path = watched_root
            .to_str()
            .expect("tempdir path should be utf8")
            .to_owned();

        // Spawn a first watcher and discard it right away.
        drop(EventWatcher::spawn(watch_path.clone(), current_event_id(), 0.05).1);

        // Give the background thread a moment to observe the drop.
        std::thread::sleep(Duration::from_millis(500));

        let (_, respawned_watcher) = EventWatcher::spawn(watch_path, current_event_id(), 0.05);

        // Allow the stream to start before triggering filesystem activity.
        std::thread::sleep(Duration::from_millis(500));

        let created_file = watched_root.join("respawn_event.txt");
        std::fs::write(&created_file, "cardinal").expect("failed to write test file");

        // Poll for up to five seconds until a batch mentions the new file.
        let deadline = Instant::now() + Duration::from_secs(5);
        let observed_change = loop {
            if Instant::now() >= deadline {
                break false;
            }
            match respawned_watcher.recv_timeout(Duration::from_millis(200)) {
                Ok(batch)
                    if batch
                        .iter()
                        .any(|event| event.path.starts_with(&created_file)) =>
                {
                    break true;
                }
                Ok(_) | Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break false,
            }
        };

        drop(respawned_watcher);
        assert!(
            observed_change,
            "respawned watcher failed to deliver file change event"
        );
    }
}