Skip to main content

flashkraft_core/commands/
hotplug.rs

1//! USB Hotplug Detection
2//!
3//! Provides a cross-platform event stream that fires whenever a USB storage
4//! device is connected or disconnected.  Callers receive a bare
5//! [`UsbHotplugEvent`] and are expected to re-run
6//! [`crate::commands::load_drives_sync`] themselves — hotplug is only a
7//! *trigger*, not a source of drive data.  All block-device information
8//! (path, size, mount point) continues to come from the existing
9//! sysfs / diskutil / wmic enumeration code.
10//!
11//! ## Mechanism
12//!
13//! | Platform | Watched path        | Kernel mechanism          |
14//! |----------|---------------------|---------------------------|
15//! | Linux    | `/sys/block`        | inotify (via `notify`)    |
16//! | macOS    | `/dev`              | FSEvents (via `notify`)   |
//! | Windows  | `C:\`               | ReadDirectoryChangesW     |
18//!
19//! On Linux, the kernel creates / removes entries under `/sys/block` whenever
20//! a block device appears or disappears — including USB mass-storage devices.
21//! Watching that directory with inotify requires **no privileges** and pulls
22//! in no USB-specific library.
23//!
24//! On macOS, `/dev` receives `disk*` entries when drives connect; FSEvents
25//! covers that without elevated permissions.
26//!
27//! On Windows, `notify` watches the system drive root as a lightweight
28//! proxy; the real drive list comes from the wmic enumeration re-run on
29//! every event.
30//!
31//! ## Permission notes
32//!
33//! No elevated privileges are required for any of these watches.
//! `/sys/block` and `/dev` are world-readable; the `notify` crate uses only
//! kernel-provided notification APIs (inotify fd, FSEvents stream,
//! ReadDirectoryChangesW handle) that any process may open.
37//!
38//! ## Usage
39//!
40//! ```rust,ignore
41//! use flashkraft_core::commands::hotplug::{watch_usb_events, UsbHotplugEvent};
42//! use futures::StreamExt;
43//!
44//! let mut stream = watch_usb_events()?;
45//! while let Some(event) = stream.next().await {
46//!     match event {
47//!         UsbHotplugEvent::Arrived  => println!("drive connected"),
48//!         UsbHotplugEvent::Left     => println!("drive disconnected"),
49//!     }
50//!     let drives = flashkraft_core::commands::load_drives_sync();
51//! }
52//! ```
53
54use std::path::Path;
55use std::sync::mpsc as std_mpsc;
56use std::time::Duration;
57
58use futures::Stream;
59use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
60use tokio::sync::mpsc as tokio_mpsc;
61
62// ---------------------------------------------------------------------------
63// Public types
64// ---------------------------------------------------------------------------
65
/// A USB hotplug trigger emitted when any block device connects or disconnects.
///
/// This is intentionally a plain enum with no device payload.  All drive
/// metadata is obtained by re-running the sysfs / diskutil / wmic enumeration
/// after receiving the event.
///
/// Because the enum is fieldless it is `Copy` and `Hash`: values can be
/// passed around by value and used as set / map keys without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum UsbHotplugEvent {
    /// A drive was connected (a new entry appeared under the watched path).
    Arrived,
    /// A drive was disconnected (an entry was removed from the watched path).
    Left,
}
78
79// ---------------------------------------------------------------------------
80// Error type
81// ---------------------------------------------------------------------------
82
/// Failure to establish the filesystem watch.
///
/// Wraps the human-readable reason as a plain string; the `Display` output
/// is that reason prefixed with `hotplug watch error: `.
#[derive(Debug)]
pub struct HotplugError(String);

impl std::fmt::Display for HotplugError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(reason) = self;
        write!(f, "hotplug watch error: {reason}")
    }
}

// No underlying `source()` is exposed: only the message text is stored.
impl std::error::Error for HotplugError {}
94
95impl From<notify::Error> for HotplugError {
96    fn from(e: notify::Error) -> Self {
97        HotplugError(e.to_string())
98    }
99}
100
101// ---------------------------------------------------------------------------
102// Platform-specific watch path
103// ---------------------------------------------------------------------------
104
/// Directory whose entries change when a block device is added or removed.
///
/// | Target  | Path         |
/// |---------|--------------|
/// | Linux   | `/sys/block` |
/// | Windows | `C:\`        |
/// | macOS   | `/dev`       |
/// | other   | `/dev`       |
fn watch_path() -> &'static Path {
    // `cfg!` is resolved at compile time, so exactly one branch survives.
    let location = if cfg!(target_os = "linux") {
        "/sys/block"
    } else if cfg!(target_os = "windows") {
        "C:\\"
    } else {
        // macOS and every remaining target share the `/dev` device directory.
        "/dev"
    };
    Path::new(location)
}
124
125// ---------------------------------------------------------------------------
126// Public API
127// ---------------------------------------------------------------------------
128
129/// Start watching for block-device connect / disconnect events.
130///
131/// Returns a [`Stream`] that yields [`UsbHotplugEvent`] values.  The stream
132/// runs for the lifetime of the returned object; drop it to stop watching.
133///
134/// Internally this spawns a small background thread that owns the
135/// [`notify::RecommendedWatcher`] (which is `!Send` on some platforms) and
136/// bridges its events into an async Tokio channel.
137///
138/// # Errors
139///
140/// Returns a [`HotplugError`] if the OS refuses to create the inotify /
141/// FSEvents / ReadDirectoryChangesW watch.  This is rare — it typically only
142/// happens when the watched path does not exist (e.g. `/sys/block` on a
143/// non-Linux kernel) or the process has been sandboxed.
144pub fn watch_usb_events() -> Result<impl Stream<Item = UsbHotplugEvent>, HotplugError> {
145    let path = watch_path();
146
147    // ── Verify the watch path exists before committing ────────────────────────
148    if !path.exists() {
149        // Return a stream that immediately ends rather than hard-erroring —
150        // the caller will simply get no events, which is safe.
151        let (tx, rx) = tokio_mpsc::unbounded_channel::<UsbHotplugEvent>();
152        drop(tx); // closed immediately → stream ends
153        return Ok(UnboundedReceiverStream::new(rx));
154    }
155
156    // ── Bridge: notify (sync) → tokio unbounded channel (async) ──────────────
157    //
158    // `notify::RecommendedWatcher` is not `Send` on all platforms (macOS
159    // FSEvents runs on a dedicated thread managed by CoreFoundation).  We
160    // therefore own the watcher on a dedicated `std::thread` and forward
161    // filesystem events through a `std::sync::mpsc` channel to a second
162    // thread that feeds a `tokio::sync::mpsc` channel readable from async code.
163
164    // Step 1: std::sync::mpsc — raw notify events arrive here.
165    let (notify_tx, notify_rx) = std_mpsc::channel::<notify::Result<notify::Event>>();
166
167    // Step 2: tokio unbounded channel — translated UsbHotplugEvents go here.
168    let (hotplug_tx, hotplug_rx) = tokio_mpsc::unbounded_channel::<UsbHotplugEvent>();
169
170    // ── Thread A: owns the Watcher ────────────────────────────────────────────
171    //
172    // We build the watcher here (synchronously) so we can return an error if
173    // setup fails, then move it into the thread to keep it alive.
174    let mut watcher = RecommendedWatcher::new(
175        notify_tx,
176        Config::default()
177            // Poll interval is only used by the PollWatcher fallback.
178            // The recommended watcher (inotify / FSEvents / ReadDirChanges)
179            // ignores this, but setting it avoids a "missing config" warning.
180            .with_poll_interval(Duration::from_secs(2)),
181    )?;
182
183    watcher.watch(path, RecursiveMode::NonRecursive)?;
184
185    std::thread::Builder::new()
186        .name("flashkraft-hotplug-watcher".into())
187        .spawn(move || {
188            // Keep `watcher` alive for the lifetime of this thread.
189            // The thread exits when the notify_rx receiver (in Thread B) is
190            // dropped, which causes notify_tx sends to fail → watcher is
191            // dropped → inotify fd / FSEvents stream is closed.
192            let _watcher = watcher;
193
194            // Block this thread indefinitely; the watcher's background
195            // mechanism (inotify fd read-loop / kqueue / etc.) delivers
196            // events into notify_tx automatically without any polling here.
197            // We park the thread so it does not busy-spin.
198            loop {
199                std::thread::park();
200            }
201        })
202        .ok(); // If spawning fails we still return the (silent) stream.
203
204    // ── Thread B: translates notify events → UsbHotplugEvents ────────────────
205    let hotplug_tx_b = hotplug_tx;
206    std::thread::Builder::new()
207        .name("flashkraft-hotplug-bridge".into())
208        .spawn(move || {
209            for result in &notify_rx {
210                let event = match result {
211                    Ok(e) => e,
212                    Err(_) => continue,
213                };
214
215                let translated = translate_event(&event);
216                if let Some(hp_event) = translated {
217                    // If the receiver (stream) has been dropped, stop.
218                    if hotplug_tx_b.send(hp_event).is_err() {
219                        break;
220                    }
221                }
222            }
223            // notify_rx exhausted (watcher dropped) → channel closed naturally.
224        })
225        .ok();
226
227    Ok(UnboundedReceiverStream::new(hotplug_rx))
228}
229
230// ---------------------------------------------------------------------------
231// Event translation
232// ---------------------------------------------------------------------------
233
234/// Map a raw [`notify::Event`] to a [`UsbHotplugEvent`], or `None` to discard.
235///
236/// We only care about create / remove events at the top level of the watched
237/// directory.  Modify events (e.g. attribute changes on existing sysfs files)
238/// are ignored to avoid spurious re-enumerations.
239fn translate_event(event: &notify::Event) -> Option<UsbHotplugEvent> {
240    match &event.kind {
241        // A new entry appeared — a block device was added.
242        EventKind::Create(_) => Some(UsbHotplugEvent::Arrived),
243
244        // An entry was removed — a block device was unplugged.
245        EventKind::Remove(_) => Some(UsbHotplugEvent::Left),
246
247        // On some platforms (e.g. macOS FSEvents) renames of device nodes
248        // indicate a new device taking over a slot.  Treat as Arrived.
249        EventKind::Modify(notify::event::ModifyKind::Name(_)) => Some(UsbHotplugEvent::Arrived),
250
251        // Content / metadata / other modify events — ignore.
252        EventKind::Modify(_) => None,
253
254        // Access events — ignore.
255        EventKind::Access(_) => None,
256
257        // Unknown / other — conservative: treat as Arrived so the caller
258        // refreshes the drive list and discovers the true state.
259        EventKind::Other => Some(UsbHotplugEvent::Arrived),
260
261        EventKind::Any => None,
262    }
263}
264
265// ---------------------------------------------------------------------------
266// Minimal Stream wrapper around tokio::sync::mpsc::UnboundedReceiver
267// ---------------------------------------------------------------------------
268
269/// A [`Stream`] backed by a [`tokio_mpsc::UnboundedReceiver`].
270///
271/// This is a lightweight alternative to `tokio_stream::wrappers::UnboundedReceiverStream`
272/// that avoids adding `tokio-stream` as a dependency.
273struct UnboundedReceiverStream<T> {
274    inner: tokio_mpsc::UnboundedReceiver<T>,
275}
276
277impl<T> UnboundedReceiverStream<T> {
278    fn new(inner: tokio_mpsc::UnboundedReceiver<T>) -> Self {
279        Self { inner }
280    }
281}
282
283impl<T> Stream for UnboundedReceiverStream<T> {
284    type Item = T;
285
286    fn poll_next(
287        mut self: std::pin::Pin<&mut Self>,
288        cx: &mut std::task::Context<'_>,
289    ) -> std::task::Poll<Option<Self::Item>> {
290        self.inner.poll_recv(cx)
291    }
292}
293
294// ---------------------------------------------------------------------------
295// Tests
296// ---------------------------------------------------------------------------
297
#[cfg(test)]
mod tests {
    use super::*;
    use notify::event::{
        AccessKind, CreateKind, DataChange, MetadataKind, ModifyKind, RemoveKind, RenameMode,
    };

    /// Expected translation for events that signal a (possible) arrival.
    const ARRIVED: Option<UsbHotplugEvent> = Some(UsbHotplugEvent::Arrived);
    /// Expected translation for events that signal a removal.
    const LEFT: Option<UsbHotplugEvent> = Some(UsbHotplugEvent::Left);

    /// Wrap `kind` in a path-less event and run it through `translate_event`.
    fn translated(kind: EventKind) -> Option<UsbHotplugEvent> {
        let raw = notify::Event {
            kind,
            paths: vec![],
            attrs: Default::default(),
        };
        translate_event(&raw)
    }

    // ── UsbHotplugEvent trait coverage ───────────────────────────────────────

    #[test]
    fn test_event_traits() {
        let arrived = UsbHotplugEvent::Arrived;
        let left = UsbHotplugEvent::Left;

        // Clone + PartialEq
        assert_eq!(arrived, Clone::clone(&arrived));
        assert_eq!(left, Clone::clone(&left));
        assert_ne!(arrived, left);

        // Debug
        assert!(format!("{arrived:?}").contains("Arrived"));
        assert!(format!("{left:?}").contains("Left"));
    }

    #[test]
    fn test_variant_exhaustiveness() {
        // The match has no wildcard arm, so adding a variant breaks the build here.
        let labels = [UsbHotplugEvent::Arrived, UsbHotplugEvent::Left].map(|event| match event {
            UsbHotplugEvent::Arrived => "arrived",
            UsbHotplugEvent::Left => "left",
        });
        assert!(labels.iter().all(|label| !label.is_empty()));
    }

    // ── translate_event ───────────────────────────────────────────────────────

    #[test]
    fn test_translate_create_any_arrives() {
        assert_eq!(translated(EventKind::Create(CreateKind::Any)), ARRIVED);
    }

    #[test]
    fn test_translate_create_file_arrives() {
        assert_eq!(translated(EventKind::Create(CreateKind::File)), ARRIVED);
    }

    #[test]
    fn test_translate_create_folder_arrives() {
        assert_eq!(translated(EventKind::Create(CreateKind::Folder)), ARRIVED);
    }

    #[test]
    fn test_translate_remove_any_left() {
        assert_eq!(translated(EventKind::Remove(RemoveKind::Any)), LEFT);
    }

    #[test]
    fn test_translate_remove_file_left() {
        assert_eq!(translated(EventKind::Remove(RemoveKind::File)), LEFT);
    }

    #[test]
    fn test_translate_rename_arrives() {
        let kind = EventKind::Modify(ModifyKind::Name(RenameMode::Any));
        assert_eq!(translated(kind), ARRIVED);
    }

    #[test]
    fn test_translate_modify_data_ignored() {
        let kind = EventKind::Modify(ModifyKind::Data(DataChange::Any));
        assert_eq!(translated(kind), None);
    }

    #[test]
    fn test_translate_modify_metadata_ignored() {
        let kind = EventKind::Modify(ModifyKind::Metadata(MetadataKind::Any));
        assert_eq!(translated(kind), None);
    }

    #[test]
    fn test_translate_access_ignored() {
        assert_eq!(translated(EventKind::Access(AccessKind::Any)), None);
    }

    #[test]
    fn test_translate_other_arrives() {
        assert_eq!(translated(EventKind::Other), ARRIVED);
    }

    #[test]
    fn test_translate_any_ignored() {
        assert_eq!(translated(EventKind::Any), None);
    }

    // ── watch_usb_events construction ─────────────────────────────────────────

    /// Setting up the watch must never panic: a stream or a clean error are
    /// both acceptable outcomes.
    #[test]
    fn test_watch_usb_events_does_not_panic() {
        match watch_usb_events() {
            Err(e) => println!("watch_usb_events: OS returned error (acceptable): {e}"),
            Ok(_stream) => println!("watch_usb_events: stream created successfully"),
        }
        // Getting here without a panic is the whole assertion.
    }

    // ── HotplugError display ──────────────────────────────────────────────────

    #[test]
    fn test_hotplug_error_display() {
        let rendered = HotplugError("something went wrong".into()).to_string();
        for needle in ["hotplug watch error", "something went wrong"] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn test_hotplug_error_from_notify() {
        let converted: HotplugError = notify::Error::generic("test error").into();
        assert!(converted.to_string().contains("hotplug watch error"));
    }
}