1use crate::FsEvent;
2use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
3use dispatch2::{DispatchQueue, DispatchQueueAttr, DispatchRetained};
4use libc::dev_t;
5use objc2_core_foundation::{CFArray, CFString, CFTimeInterval};
6use objc2_core_services::{
7 ConstFSEventStreamRef, FSEventStreamContext, FSEventStreamCreate, FSEventStreamEventFlags,
8 FSEventStreamEventId, FSEventStreamGetDeviceBeingWatched, FSEventStreamInvalidate,
9 FSEventStreamRef, FSEventStreamRelease, FSEventStreamSetDispatchQueue, FSEventStreamStart,
10 FSEventStreamStop, kFSEventStreamCreateFlagFileEvents, kFSEventStreamCreateFlagNoDefer,
11 kFSEventStreamCreateFlagWatchRoot,
12};
13use std::{
14 ffi::c_void,
15 ops::{Deref, DerefMut},
16 ptr::NonNull,
17 slice,
18};
19
20type EventsCallback = Box<dyn FnMut(Vec<FsEvent>) + Send>;
21
22pub struct EventStream {
23 stream: FSEventStreamRef,
24}
25
26unsafe impl Send for EventStream {}
27
28impl Drop for EventStream {
29 fn drop(&mut self) {
30 unsafe {
31 FSEventStreamRelease(self.stream);
32 }
33 }
34}
35
36impl EventStream {
37 pub fn new(
38 paths: &[&str],
39 since_event_id: FSEventStreamEventId,
40 latency: CFTimeInterval,
41 callback: EventsCallback,
42 ) -> Self {
43 unsafe extern "C-unwind" fn drop_callback(info: *const c_void) {
44 let _cb: Box<EventsCallback> = unsafe { Box::from_raw(info as _) };
45 }
46
47 unsafe extern "C-unwind" fn raw_callback(
48 _stream: ConstFSEventStreamRef, callback_info: *mut c_void, num_events: usize, event_paths: NonNull<c_void>, event_flags: NonNull<FSEventStreamEventFlags>, event_ids: NonNull<FSEventStreamEventId>, ) {
55 let event_paths = unsafe {
56 slice::from_raw_parts(event_paths.as_ptr() as *const *const i8, num_events)
57 };
58 let event_flags = unsafe { slice::from_raw_parts(event_flags.as_ptr(), num_events) };
59 let event_ids = unsafe { slice::from_raw_parts(event_ids.as_ptr(), num_events) };
60 let events: Vec<_> = event_paths
61 .iter()
62 .zip(event_flags)
63 .zip(event_ids)
64 .map(|((&path, &flag), &id)| unsafe { FsEvent::from_raw(path, flag, id) })
65 .collect();
66
67 let callback = unsafe { (callback_info as *mut EventsCallback).as_mut() }.unwrap();
68 callback(events);
69 }
70
71 let paths: Vec<_> = paths.iter().map(|&x| CFString::from_str(x)).collect();
72 let paths = CFArray::from_retained_objects(&paths);
73 let mut context = FSEventStreamContext {
74 version: 0,
75 info: Box::leak(Box::new(callback)) as *mut _ as *mut _,
76 retain: None,
77 release: Some(drop_callback),
78 copyDescription: None,
79 };
80
81 let stream: FSEventStreamRef = unsafe {
82 FSEventStreamCreate(
83 None,
84 Some(raw_callback),
85 &mut context,
86 paths.as_opaque(),
87 since_event_id,
88 latency,
89 kFSEventStreamCreateFlagNoDefer
90 | kFSEventStreamCreateFlagFileEvents
91 | kFSEventStreamCreateFlagWatchRoot,
92 )
93 };
94 Self { stream }
95 }
96
97 pub fn spawn(self) -> Option<EventStreamWithQueue> {
99 let queue = DispatchQueue::new("cardinal-sdk-queue", DispatchQueueAttr::SERIAL);
100 unsafe { FSEventStreamSetDispatchQueue(self.stream, Some(&queue)) };
101 let result = unsafe { FSEventStreamStart(self.stream) };
102 if !result {
103 unsafe { FSEventStreamStop(self.stream) };
104 unsafe { FSEventStreamInvalidate(self.stream) };
105 return None;
106 }
107 let stream = self.stream;
108 Some(EventStreamWithQueue { stream, queue })
109 }
110
111 pub fn dev(&self) -> dev_t {
113 unsafe { FSEventStreamGetDeviceBeingWatched(self.stream.cast_const()) }
114 }
115}
116
117pub struct EventStreamWithQueue {
121 stream: FSEventStreamRef,
122 #[allow(dead_code)]
123 queue: DispatchRetained<DispatchQueue>,
124}
125
126impl Drop for EventStreamWithQueue {
127 fn drop(&mut self) {
128 unsafe {
129 FSEventStreamStop(self.stream);
130 FSEventStreamInvalidate(self.stream);
131 }
132 }
133}
134
135pub struct EventWatcher {
136 receiver: Receiver<Vec<FsEvent>>,
137 _cancellation_token: Sender<()>,
138}
139
140impl Deref for EventWatcher {
141 type Target = Receiver<Vec<FsEvent>>;
142
143 fn deref(&self) -> &Self::Target {
144 &self.receiver
145 }
146}
147
148impl DerefMut for EventWatcher {
149 fn deref_mut(&mut self) -> &mut Self::Target {
150 &mut self.receiver
151 }
152}
153
154impl EventWatcher {
155 pub fn noop() -> Self {
156 Self {
157 receiver: unbounded().1,
158 _cancellation_token: bounded::<()>(1).0,
159 }
160 }
161
162 pub fn spawn(
163 path: String,
164 since_event_id: FSEventStreamEventId,
165 latency: f64,
166 ) -> (dev_t, EventWatcher) {
167 let (_cancellation_token, cancellation_token_rx) = bounded::<()>(1);
168 let (sender, receiver) = unbounded();
169 let dev = 0;
170 std::thread::Builder::new()
171 .name("cardinal-sdk-event-watcher".to_string())
172 .spawn(move || {
173 let stream = EventStream::new(
174 &[&path],
175 since_event_id,
176 latency,
177 Box::new(move |events| {
178 let _ = sender.send(events);
179 }),
180 );
181 let _stream_and_queue = stream.spawn().expect("failed to spawn event stream");
182 let _ = cancellation_token_rx.recv();
183 })
184 .unwrap();
185 (
186 dev,
187 EventWatcher {
188 receiver,
189 _cancellation_token,
190 },
191 )
192 }
193}
194
195#[cfg(all(test, target_os = "macos"))]
196mod tests {
197 use super::*;
198 use crate::utils::current_event_id;
199 use crossbeam_channel::RecvTimeoutError;
200 use std::time::{Duration, Instant};
201 use tempfile::tempdir;
202
203 #[test]
204 fn drop_then_respawn_event_watcher_delivers_events() {
205 let temp_dir = tempdir().expect("failed to create tempdir");
206 let watched_root = temp_dir.path().to_path_buf();
207 let watched_root = watched_root.canonicalize().expect("failed to canonicalize");
209 let watch_path = watched_root
210 .to_str()
211 .expect("tempdir path should be utf8")
212 .to_string();
213
214 let (_, initial_watcher) =
215 EventWatcher::spawn(watch_path.clone(), current_event_id(), 0.05);
216 drop(initial_watcher);
217
218 std::thread::sleep(Duration::from_millis(500));
220
221 let (_, respawned_watcher) = EventWatcher::spawn(watch_path, current_event_id(), 0.05);
222
223 std::thread::sleep(Duration::from_millis(500));
225
226 let created_file = watched_root.join("respawn_event.txt");
227 std::fs::write(&created_file, "cardinal").expect("failed to write test file");
228
229 let deadline = Instant::now() + Duration::from_secs(5);
230 let mut observed_change = false;
231 while Instant::now() < deadline {
232 match respawned_watcher.recv_timeout(Duration::from_millis(200)) {
233 Ok(batch) => {
234 if batch
235 .iter()
236 .any(|event| event.path.starts_with(&created_file))
237 {
238 observed_change = true;
239 break;
240 }
241 }
242 Err(RecvTimeoutError::Timeout) => continue,
243 Err(RecvTimeoutError::Disconnected) => break,
244 }
245 }
246
247 drop(respawned_watcher);
248 assert!(
249 observed_change,
250 "respawned watcher failed to deliver file change event"
251 );
252 }
253}