flashkraft_core/commands/hotplug.rs
1//! USB Hotplug Detection
2//!
3//! Provides a cross-platform event stream that fires whenever a USB storage
4//! device is connected or disconnected. Callers receive a bare
5//! [`UsbHotplugEvent`] and are expected to re-run
6//! [`crate::commands::load_drives_sync`] themselves — hotplug is only a
7//! *trigger*, not a source of drive data. All block-device information
8//! (path, size, mount point) continues to come from the existing
9//! sysfs / diskutil / wmic enumeration code.
10//!
11//! ## Mechanism
12//!
13//! | Platform | Watched path | Kernel mechanism |
14//! |----------|---------------------|---------------------------|
15//! | Linux | `/sys/block` | inotify (via `notify`) |
16//! | macOS | `/dev` | FSEvents (via `notify`) |
17//! | Windows | `\\.\PhysicalDrive` | ReadDirectoryChangesW |
18//!
19//! On Linux, the kernel creates / removes entries under `/sys/block` whenever
20//! a block device appears or disappears — including USB mass-storage devices.
21//! Watching that directory with inotify requires **no privileges** and pulls
22//! in no USB-specific library.
23//!
24//! On macOS, `/dev` receives `disk*` entries when drives connect; FSEvents
25//! covers that without elevated permissions.
26//!
27//! On Windows, `notify` watches the system drive root as a lightweight
28//! proxy; the real drive list comes from the wmic enumeration re-run on
29//! every event.
30//!
31//! ## Permission notes
32//!
33//! No elevated privileges are required for any of these watches.
34//! `/sys/block` and `/dev` are world-readable; the `notify` crate uses only
35//! kernel-provided notification APIs (inotify fd, FSEvents socket,
36//! ReadDirectoryChangesW handle) that any process may open.
37//!
38//! ## Usage
39//!
40//! ```rust,ignore
41//! use flashkraft_core::commands::hotplug::{watch_usb_events, UsbHotplugEvent};
42//! use futures::StreamExt;
43//!
44//! let mut stream = watch_usb_events()?;
45//! while let Some(event) = stream.next().await {
46//! match event {
47//! UsbHotplugEvent::Arrived => println!("drive connected"),
48//! UsbHotplugEvent::Left => println!("drive disconnected"),
49//! }
50//! let drives = flashkraft_core::commands::load_drives_sync();
51//! }
52//! ```
53
54use std::path::Path;
55use std::sync::mpsc as std_mpsc;
56use std::time::Duration;
57
58use futures::Stream;
59use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
60use tokio::sync::mpsc as tokio_mpsc;
61
62// ---------------------------------------------------------------------------
63// Public types
64// ---------------------------------------------------------------------------
65
66/// A USB hotplug trigger emitted when any block device connects or disconnects.
67///
68/// This is intentionally a plain enum with no device payload. All drive
69/// metadata is obtained by re-running the sysfs / diskutil / wmic enumeration
70/// after receiving the event.
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub enum UsbHotplugEvent {
73 /// A drive was connected (a new entry appeared under the watched path).
74 Arrived,
75 /// A drive was disconnected (an entry was removed from the watched path).
76 Left,
77}
78
79// ---------------------------------------------------------------------------
80// Error type
81// ---------------------------------------------------------------------------
82
83/// Error returned when the OS refuses to create the filesystem watch.
84#[derive(Debug)]
85pub struct HotplugError(String);
86
87impl std::fmt::Display for HotplugError {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 write!(f, "hotplug watch error: {}", self.0)
90 }
91}
92
93impl std::error::Error for HotplugError {}
94
95impl From<notify::Error> for HotplugError {
96 fn from(e: notify::Error) -> Self {
97 HotplugError(e.to_string())
98 }
99}
100
101// ---------------------------------------------------------------------------
102// Platform-specific watch path
103// ---------------------------------------------------------------------------
104
105/// The filesystem path to watch for block-device add/remove events.
106///
107/// - Linux : `/sys/block` — kernel creates/removes entries here
108/// - macOS : `/dev` — `disk*` nodes appear/disappear here
109/// - Windows: `C:\` — lightweight proxy; real data comes from wmic
110/// - Other : `/dev` — reasonable fallback
111fn watch_path() -> &'static Path {
112 #[cfg(target_os = "linux")]
113 return Path::new("/sys/block");
114
115 #[cfg(target_os = "macos")]
116 return Path::new("/dev");
117
118 #[cfg(target_os = "windows")]
119 return Path::new("C:\\");
120
121 #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
122 return Path::new("/dev");
123}
124
125// ---------------------------------------------------------------------------
126// Public API
127// ---------------------------------------------------------------------------
128
129/// Start watching for block-device connect / disconnect events.
130///
131/// Returns a [`Stream`] that yields [`UsbHotplugEvent`] values. The stream
132/// runs for the lifetime of the returned object; drop it to stop watching.
133///
134/// Internally this spawns a small background thread that owns the
135/// [`notify::RecommendedWatcher`] (which is `!Send` on some platforms) and
136/// bridges its events into an async Tokio channel.
137///
138/// # Errors
139///
140/// Returns a [`HotplugError`] if the OS refuses to create the inotify /
141/// FSEvents / ReadDirectoryChangesW watch. This is rare — it typically only
142/// happens when the watched path does not exist (e.g. `/sys/block` on a
143/// non-Linux kernel) or the process has been sandboxed.
144pub fn watch_usb_events() -> Result<impl Stream<Item = UsbHotplugEvent>, HotplugError> {
145 let path = watch_path();
146
147 // ── Verify the watch path exists before committing ────────────────────────
148 if !path.exists() {
149 // Return a stream that immediately ends rather than hard-erroring —
150 // the caller will simply get no events, which is safe.
151 let (tx, rx) = tokio_mpsc::unbounded_channel::<UsbHotplugEvent>();
152 drop(tx); // closed immediately → stream ends
153 return Ok(UnboundedReceiverStream::new(rx));
154 }
155
156 // ── Bridge: notify (sync) → tokio unbounded channel (async) ──────────────
157 //
158 // `notify::RecommendedWatcher` is not `Send` on all platforms (macOS
159 // FSEvents runs on a dedicated thread managed by CoreFoundation). We
160 // therefore own the watcher on a dedicated `std::thread` and forward
161 // filesystem events through a `std::sync::mpsc` channel to a second
162 // thread that feeds a `tokio::sync::mpsc` channel readable from async code.
163
164 // Step 1: std::sync::mpsc — raw notify events arrive here.
165 let (notify_tx, notify_rx) = std_mpsc::channel::<notify::Result<notify::Event>>();
166
167 // Step 2: tokio unbounded channel — translated UsbHotplugEvents go here.
168 let (hotplug_tx, hotplug_rx) = tokio_mpsc::unbounded_channel::<UsbHotplugEvent>();
169
170 // ── Thread A: owns the Watcher ────────────────────────────────────────────
171 //
172 // We build the watcher here (synchronously) so we can return an error if
173 // setup fails, then move it into the thread to keep it alive.
174 let mut watcher = RecommendedWatcher::new(
175 notify_tx,
176 Config::default()
177 // Poll interval is only used by the PollWatcher fallback.
178 // The recommended watcher (inotify / FSEvents / ReadDirChanges)
179 // ignores this, but setting it avoids a "missing config" warning.
180 .with_poll_interval(Duration::from_secs(2)),
181 )?;
182
183 watcher.watch(path, RecursiveMode::NonRecursive)?;
184
185 std::thread::Builder::new()
186 .name("flashkraft-hotplug-watcher".into())
187 .spawn(move || {
188 // Keep `watcher` alive for the lifetime of this thread.
189 // The thread exits when the notify_rx receiver (in Thread B) is
190 // dropped, which causes notify_tx sends to fail → watcher is
191 // dropped → inotify fd / FSEvents stream is closed.
192 let _watcher = watcher;
193
194 // Block this thread indefinitely; the watcher's background
195 // mechanism (inotify fd read-loop / kqueue / etc.) delivers
196 // events into notify_tx automatically without any polling here.
197 // We park the thread so it does not busy-spin.
198 loop {
199 std::thread::park();
200 }
201 })
202 .ok(); // If spawning fails we still return the (silent) stream.
203
204 // ── Thread B: translates notify events → UsbHotplugEvents ────────────────
205 let hotplug_tx_b = hotplug_tx;
206 std::thread::Builder::new()
207 .name("flashkraft-hotplug-bridge".into())
208 .spawn(move || {
209 for result in ¬ify_rx {
210 let event = match result {
211 Ok(e) => e,
212 Err(_) => continue,
213 };
214
215 let translated = translate_event(&event);
216 if let Some(hp_event) = translated {
217 // If the receiver (stream) has been dropped, stop.
218 if hotplug_tx_b.send(hp_event).is_err() {
219 break;
220 }
221 }
222 }
223 // notify_rx exhausted (watcher dropped) → channel closed naturally.
224 })
225 .ok();
226
227 Ok(UnboundedReceiverStream::new(hotplug_rx))
228}
229
230// ---------------------------------------------------------------------------
231// Event translation
232// ---------------------------------------------------------------------------
233
234/// Map a raw [`notify::Event`] to a [`UsbHotplugEvent`], or `None` to discard.
235///
236/// We only care about create / remove events at the top level of the watched
237/// directory. Modify events (e.g. attribute changes on existing sysfs files)
238/// are ignored to avoid spurious re-enumerations.
239fn translate_event(event: ¬ify::Event) -> Option<UsbHotplugEvent> {
240 match &event.kind {
241 // A new entry appeared — a block device was added.
242 EventKind::Create(_) => Some(UsbHotplugEvent::Arrived),
243
244 // An entry was removed — a block device was unplugged.
245 EventKind::Remove(_) => Some(UsbHotplugEvent::Left),
246
247 // On some platforms (e.g. macOS FSEvents) renames of device nodes
248 // indicate a new device taking over a slot. Treat as Arrived.
249 EventKind::Modify(notify::event::ModifyKind::Name(_)) => Some(UsbHotplugEvent::Arrived),
250
251 // Content / metadata / other modify events — ignore.
252 EventKind::Modify(_) => None,
253
254 // Access events — ignore.
255 EventKind::Access(_) => None,
256
257 // Unknown / other — conservative: treat as Arrived so the caller
258 // refreshes the drive list and discovers the true state.
259 EventKind::Other => Some(UsbHotplugEvent::Arrived),
260
261 EventKind::Any => None,
262 }
263}
264
265// ---------------------------------------------------------------------------
266// Minimal Stream wrapper around tokio::sync::mpsc::UnboundedReceiver
267// ---------------------------------------------------------------------------
268
269/// A [`Stream`] backed by a [`tokio_mpsc::UnboundedReceiver`].
270///
271/// This is a lightweight alternative to `tokio_stream::wrappers::UnboundedReceiverStream`
272/// that avoids adding `tokio-stream` as a dependency.
273struct UnboundedReceiverStream<T> {
274 inner: tokio_mpsc::UnboundedReceiver<T>,
275}
276
277impl<T> UnboundedReceiverStream<T> {
278 fn new(inner: tokio_mpsc::UnboundedReceiver<T>) -> Self {
279 Self { inner }
280 }
281}
282
283impl<T> Stream for UnboundedReceiverStream<T> {
284 type Item = T;
285
286 fn poll_next(
287 mut self: std::pin::Pin<&mut Self>,
288 cx: &mut std::task::Context<'_>,
289 ) -> std::task::Poll<Option<Self::Item>> {
290 self.inner.poll_recv(cx)
291 }
292}
293
294// ---------------------------------------------------------------------------
295// Tests
296// ---------------------------------------------------------------------------
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301 use notify::event::{CreateKind, ModifyKind, RemoveKind, RenameMode};
302
303 // ── UsbHotplugEvent trait coverage ───────────────────────────────────────
304
305 #[test]
306 fn test_event_traits() {
307 let a = UsbHotplugEvent::Arrived;
308 let b = UsbHotplugEvent::Left;
309
310 let a2 = a.clone();
311 let b2 = b.clone();
312
313 assert_eq!(a, a2);
314 assert_eq!(b, b2);
315 assert_ne!(a, b);
316
317 assert!(format!("{a:?}").contains("Arrived"));
318 assert!(format!("{b:?}").contains("Left"));
319 }
320
321 #[test]
322 fn test_variant_exhaustiveness() {
323 for event in [UsbHotplugEvent::Arrived, UsbHotplugEvent::Left] {
324 let label = match event {
325 UsbHotplugEvent::Arrived => "arrived",
326 UsbHotplugEvent::Left => "left",
327 };
328 assert!(!label.is_empty());
329 }
330 }
331
332 // ── translate_event ───────────────────────────────────────────────────────
333
334 fn make_event(kind: EventKind) -> notify::Event {
335 notify::Event {
336 kind,
337 paths: vec![],
338 attrs: Default::default(),
339 }
340 }
341
342 #[test]
343 fn test_translate_create_any_arrives() {
344 let e = make_event(EventKind::Create(CreateKind::Any));
345 assert_eq!(translate_event(&e), Some(UsbHotplugEvent::Arrived));
346 }
347
348 #[test]
349 fn test_translate_create_file_arrives() {
350 let e = make_event(EventKind::Create(CreateKind::File));
351 assert_eq!(translate_event(&e), Some(UsbHotplugEvent::Arrived));
352 }
353
354 #[test]
355 fn test_translate_create_folder_arrives() {
356 let e = make_event(EventKind::Create(CreateKind::Folder));
357 assert_eq!(translate_event(&e), Some(UsbHotplugEvent::Arrived));
358 }
359
360 #[test]
361 fn test_translate_remove_any_left() {
362 let e = make_event(EventKind::Remove(RemoveKind::Any));
363 assert_eq!(translate_event(&e), Some(UsbHotplugEvent::Left));
364 }
365
366 #[test]
367 fn test_translate_remove_file_left() {
368 let e = make_event(EventKind::Remove(RemoveKind::File));
369 assert_eq!(translate_event(&e), Some(UsbHotplugEvent::Left));
370 }
371
372 #[test]
373 fn test_translate_rename_arrives() {
374 let e = make_event(EventKind::Modify(ModifyKind::Name(RenameMode::Any)));
375 assert_eq!(translate_event(&e), Some(UsbHotplugEvent::Arrived));
376 }
377
378 #[test]
379 fn test_translate_modify_data_ignored() {
380 let e = make_event(EventKind::Modify(ModifyKind::Data(
381 notify::event::DataChange::Any,
382 )));
383 assert_eq!(translate_event(&e), None);
384 }
385
386 #[test]
387 fn test_translate_modify_metadata_ignored() {
388 let e = make_event(EventKind::Modify(ModifyKind::Metadata(
389 notify::event::MetadataKind::Any,
390 )));
391 assert_eq!(translate_event(&e), None);
392 }
393
394 #[test]
395 fn test_translate_access_ignored() {
396 let e = make_event(EventKind::Access(notify::event::AccessKind::Any));
397 assert_eq!(translate_event(&e), None);
398 }
399
400 #[test]
401 fn test_translate_other_arrives() {
402 let e = make_event(EventKind::Other);
403 assert_eq!(translate_event(&e), Some(UsbHotplugEvent::Arrived));
404 }
405
406 #[test]
407 fn test_translate_any_ignored() {
408 let e = make_event(EventKind::Any);
409 assert_eq!(translate_event(&e), None);
410 }
411
412 // ── watch_usb_events construction ─────────────────────────────────────────
413
414 /// Constructing the watch must not panic. It either succeeds (returns a
415 /// stream) or returns a clean error — both outcomes are acceptable.
416 #[test]
417 fn test_watch_usb_events_does_not_panic() {
418 let result = watch_usb_events();
419 match result {
420 Ok(_) => println!("watch_usb_events: stream created successfully"),
421 Err(ref e) => println!("watch_usb_events: OS returned error (acceptable): {e}"),
422 }
423 // Reaching this line without panicking is the assertion.
424 }
425
426 // ── HotplugError display ──────────────────────────────────────────────────
427
428 #[test]
429 fn test_hotplug_error_display() {
430 let e = HotplugError("something went wrong".into());
431 let s = format!("{e}");
432 assert!(s.contains("hotplug watch error"));
433 assert!(s.contains("something went wrong"));
434 }
435
436 #[test]
437 fn test_hotplug_error_from_notify() {
438 let notify_err = notify::Error::generic("test error");
439 let hp_err = HotplugError::from(notify_err);
440 let s = format!("{hp_err}");
441 assert!(s.contains("hotplug watch error"));
442 }
443}