zng_ext_fs_watcher/
lib.rs

1#![doc(html_favicon_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo.png")]
3//!
4//! File system events and service.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9// suppress nag about very simple boxed closure signatures.
10#![expect(clippy::type_complexity)]
11#![warn(unused_extern_crates)]
12#![warn(missing_docs)]
13
14use std::{
15    fmt, fs,
16    io::{self, Write as _},
17    ops,
18    path::{Path, PathBuf},
19    sync::Arc,
20    time::Duration,
21};
22
23use path_absolutize::Absolutize;
24use zng_app::{
25    AppExtension,
26    event::{EventHandle, event, event_args},
27    handler::{AppHandler, FilterAppHandler},
28    update::EventUpdate,
29    view_process::raw_events::LOW_MEMORY_EVENT,
30};
31use zng_handle::Handle;
32use zng_txt::Txt;
33use zng_unit::TimeUnits;
34use zng_var::{Var, VarValue};
35
36mod service;
37use service::*;
38
39mod lock;
40use lock::*;
41
42/// Application extension that provides file system change events and service.
43///
44/// # Events
45///
46/// Events this extension provides.
47///
48/// * [`FS_CHANGES_EVENT`]
49///
50/// # Services
51///
52/// Services this extension provides.
53///
54/// * [`WATCHER`]
55#[derive(Default)]
56#[non_exhaustive]
57pub struct FsWatcherManager {}
58impl AppExtension for FsWatcherManager {
59    fn init(&mut self) {
60        WATCHER_SV.write().init_watcher();
61    }
62
63    fn event_preview(&mut self, update: &mut EventUpdate) {
64        if let Some(args) = FS_CHANGES_EVENT.on(update) {
65            WATCHER_SV.write().event(args);
66        } else if LOW_MEMORY_EVENT.on(update).is_some() {
67            WATCHER_SV.write().low_memory();
68        }
69    }
70
71    fn update_preview(&mut self) {
72        WATCHER_SV.write().update();
73    }
74
75    fn deinit(&mut self) {
76        let mut flush = WATCHER_SV.write().shutdown();
77        for v in &mut flush {
78            v.flush_shutdown();
79        }
80    }
81}
82
83/// File system watcher service.
84///
85/// This is mostly a wrapper around the [`notify`](https://docs.rs/notify) crate, integrating it with events and variables.
86pub struct WATCHER;
87impl WATCHER {
88    /// Gets a read-write variable that defines interval awaited between each [`FS_CHANGES_EVENT`]. If
89    /// a watched path is constantly changing an event will be emitted every elapse of this interval,
90    /// the event args will contain a list of all the changes observed during the interval.
91    ///
92    /// Note that the first event notifies immediately, only subsequent events within this interval are debounced.
93    ///
94    /// Is `100.ms()` by default.
95    pub fn debounce(&self) -> Var<Duration> {
96        WATCHER_SV.read().debounce.clone()
97    }
98
99    /// Gets a read-write variable that defines interval awaited between each [`sync`] write.
100    ///
101    /// Is `100.ms()` by default.
102    ///
103    /// [`sync`]: WATCHER::sync
104    pub fn sync_debounce(&self) -> Var<Duration> {
105        WATCHER_SV.read().debounce.clone()
106    }
107
108    /// Gets a read-write variable that defines the fallback poll watcher interval.
109    ///
110    /// When an efficient watcher cannot be used a poll watcher fallback is used, the poll watcher reads
111    /// the directory or path every elapse of this interval. The poll watcher is also used for paths that
112    /// do not exist yet, that is also affected by this interval.
113    ///
114    /// Is `1.secs()` by default.
115    pub fn poll_interval(&self) -> Var<Duration> {
116        WATCHER_SV.read().poll_interval.clone()
117    }
118
119    /// Maximum time the service keeps the process alive to process pending IO operations when the app shuts down.
120    ///
121    /// Is 1 minute by default.
122    pub fn shutdown_timeout(&self) -> Var<Duration> {
123        WATCHER_SV.read().shutdown_timeout.clone()
124    }
125
126    /// Enable file change events for the `file`.
127    ///
128    /// Returns a handle that will stop the file watch when dropped, if there is no other active handler for the same file.
129    ///
130    /// Note that this is implemented by actually watching the parent directory and filtering the events, this is done
131    /// to ensure the watcher survives operations that remove the file and then move another file to the same path.
132    ///
133    /// See [`watch_dir`] for more details.
134    ///
135    /// [`watch_dir`]: WATCHER::watch_dir
136    pub fn watch(&self, file: impl Into<PathBuf>) -> WatcherHandle {
137        WATCHER_SV.write().watch(file.into())
138    }
139
140    /// Enable file change events for files inside `dir`, also include inner directories if `recursive` is `true`.
141    ///
142    /// Returns a handle that will stop the dir watch when dropped, if there is no other active handler for the same directory.
143    ///
144    /// The directory will be watched using an OS specific efficient watcher provided by the [`notify`](https://docs.rs/notify) crate. If there is
145    /// any error creating the watcher, such as if the directory does not exist yet a slower polling watcher will retry periodically    
146    /// until the efficient watcher can be created or the handle is dropped.
147    pub fn watch_dir(&self, dir: impl Into<PathBuf>, recursive: bool) -> WatcherHandle {
148        WATCHER_SV.write().watch_dir(dir.into(), recursive)
149    }
150
151    /// Read a file into a variable, the `init` value will start the variable and the `read` closure will be called
152    /// once immediately and every time the file changes, if the closure returns `Some(O)` the variable updates with the new value.
153    ///
154    /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
155    /// background thread.
156    ///
157    /// [`task::wait`]: zng_task::wait
158    pub fn read<O: VarValue>(
159        &self,
160        file: impl Into<PathBuf>,
161        init: O,
162        read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
163    ) -> Var<O> {
164        WATCHER_SV.write().read(file.into(), init, read)
165    }
166
167    /// Same operation as [`read`] but also tracks the operation status in a second var.
168    ///
169    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
170    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
171    /// only updates to idle  when the new value is available on the var, or because read the same value.
172    ///
173    /// [`read`]: Self::read
174    pub fn read_status<O, S, E>(
175        &self,
176        file: impl Into<PathBuf>,
177        init: O,
178        read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, E> + Send + 'static,
179    ) -> (Var<O>, Var<S>)
180    where
181        O: VarValue,
182        S: WatcherReadStatus<E>,
183    {
184        WATCHER_SV.write().read_status(file.into(), init, read)
185    }
186
187    /// Read a directory into a variable, the `init` value will start the variable and the `read` closure will be called
188    /// once immediately and every time any changes happen inside the dir, if the closure returns `Some(O)` the variable updates with the new value.
189    ///
190    /// The `read` closure parameter is a directory walker from the [`walkdir`](https://docs.rs/walkdir) crate.
191    ///
192    /// The directory walker is pre-configured to skip the `dir` itself and to have a max-depth of 1 if not `recursive`, these configs can.
193    ///
194    /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
195    /// background thread.
196    ///
197    /// [`task::wait`]: zng_task::wait
198    pub fn read_dir<O: VarValue>(
199        &self,
200        dir: impl Into<PathBuf>,
201        recursive: bool,
202        init: O,
203        read: impl FnMut(walkdir::WalkDir) -> Option<O> + Send + 'static,
204    ) -> Var<O> {
205        WATCHER_SV.write().read_dir(dir.into(), recursive, init, read)
206    }
207
208    /// Same operation as [`read_dir`] but also tracks the operation status in a second var.
209    ///
210    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
211    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
212    /// only updates to idle when the new value is available on the var, or because read the same value.
213    ///
214    /// [`read_dir`]: Self::read_dir
215    pub fn read_dir_status<O, S, E>(
216        &self,
217        dir: impl Into<PathBuf>,
218        recursive: bool,
219        init: O,
220        read: impl FnMut(walkdir::WalkDir) -> Result<Option<O>, E> + Send + 'static,
221    ) -> (Var<O>, Var<S>)
222    where
223        O: VarValue,
224        S: WatcherReadStatus<E>,
225    {
226        WATCHER_SV.write().read_dir_status(dir.into(), recursive, init, read)
227    }
228
229    /// Bind a file with a variable, the `file` will be `read` when it changes and be `write` when the variable changes,
230    /// writes are only applied on success and will not cause a `read` on the same sync task. The `init` value is used to
231    /// create the variable, if the `file` exists it will be `read` once at the beginning.
232    ///
233    /// Dropping the variable drops the read watch. The `read` and `write` closures are non-blocking, they are called in a [`task::wait`]
234    /// background thread.
235    ///
236    /// # Sync
237    ///
238    /// The file synchronization ensures that the file is only actually modified when writing is finished by writing
239    /// to a temporary file and committing a replace only if the write succeeded. The file is write-locked for the duration
240    /// of `write` call, but the contents are not touched until commit. See [`WriteFile`] for more details.
241    ///
242    /// The [`FsWatcherManager`] blocks on app exit until all writes commit or cancel.
243    ///
244    /// ## Read Errors
245    ///
246    /// Not-found errors are handled by the watcher by calling `write` using the current variable value, other read errors
247    /// are passed to `read`. If `read` returns a value for an error the `write` closure is called to override the file,
248    /// otherwise only the variable is set and this variable update does not cause a `write`.
249    ///
250    /// ## Write Errors
251    ///
252    /// If `write` fails the file is not touched and the temporary file is removed, if the file path
253    /// does not exit all missing parent folders and the file will be created automatically before the `write`
254    /// call.
255    ///
256    /// Note that [`WriteFile::commit`] must be called to flush the temporary file and attempt to rename
257    /// it, if the file is dropped without commit it will cancel and log an error, you must call [`WriteFile::cancel`]
258    /// to correctly avoid writing.
259    ///
260    /// If the cleanup after commit fails the error is logged and ignored.
261    ///
262    /// If write fails to even create the file and/or acquire a write lock on it this error is the input for
263    /// the `write` closure.
264    ///
265    /// ## Error Handling
266    ///
267    /// You can call services or set other variables from inside the `read` and `write` closures, this can be
268    /// used to get a signal out that perhaps drops the sync var (to stop watching), alert the user that the
269    /// file is out of sync and initiate some sort of recovery routine.
270    ///
271    /// If the file synchronization is not important you can just ignore it, the watcher will try again
272    /// on the next variable or file update.
273    ///
274    /// ## Status
275    ///
276    /// Note that `read` and `write` run in background task threads, so if you are tracking the operation
277    /// status in a separate variable you may end-up with synchronization bugs between th status variable
278    /// and the actual result variable, you can use [`sync_status`] to implement racing-free status tracking.
279    ///
280    /// [`sync_status`]: Self::sync_status
281    /// [`task::wait`]: zng_task::wait
282    pub fn sync<O: VarValue>(
283        &self,
284        file: impl Into<PathBuf>,
285        init: O,
286        read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
287        write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
288    ) -> Var<O> {
289        WATCHER_SV.write().sync(file.into(), init, read, write)
290    }
291
292    /// Same operation as [`sync`] but also tracks the operation status in a second var.
293    ///
294    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
295    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
296    /// only updates to idle when the new sync value is available, or because read the same value.
297    ///
298    /// The status variable is set to [`WatcherSyncStatus::writing`] as soon as it updates and
299    /// is set to [`WatcherReadStatus::idle`] only when the new sync value is available, either
300    /// by update or because read the same value.
301    ///
302    /// [`sync`]: Self::sync
303    pub fn sync_status<O, S, ER, EW>(
304        &self,
305        file: impl Into<PathBuf>,
306        init: O,
307        read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, ER> + Send + 'static,
308        write: impl FnMut(O, io::Result<WriteFile>) -> Result<(), EW> + Send + 'static,
309    ) -> (Var<O>, Var<S>)
310    where
311        O: VarValue,
312        S: WatcherSyncStatus<ER, EW>,
313    {
314        WATCHER_SV.write().sync_status(file.into(), init, read, write)
315    }
316
317    /// Watch `file` and calls `handler` every time it changes.
318    ///
319    /// Note that the `handler` is blocking, use [`async_app_hn!`] and [`task::wait`] to run IO without
320    /// blocking the app.
321    ///
322    /// [`async_app_hn!`]: macro@zng_app::handler::async_app_hn
323    /// [`task::wait`]: zng_task::wait
324    pub fn on_file_changed(&self, file: impl Into<PathBuf>, handler: impl AppHandler<FsChangesArgs>) -> EventHandle {
325        let file = file.into();
326        let handle = self.watch(file.clone());
327        FS_CHANGES_EVENT.on_event(FilterAppHandler::new(handler, move |args| {
328            let _handle = &handle;
329            args.events_for_path(&file).next().is_some()
330        }))
331    }
332
333    /// Watch `dir` and calls `handler` every time something inside it changes.
334    ///
335    /// Note that the `handler` is blocking, use [`async_app_hn!`] and [`task::wait`] to run IO without
336    /// blocking the app.
337    ///
338    /// [`async_app_hn!`]: macro@zng_app::handler::async_app_hn
339    /// [`task::wait`]: zng_task::wait
340    pub fn on_dir_changed(&self, dir: impl Into<PathBuf>, recursive: bool, handler: impl AppHandler<FsChangesArgs>) -> EventHandle {
341        let dir = dir.into();
342        let handle = self.watch_dir(dir.clone(), recursive);
343        FS_CHANGES_EVENT.on_event(FilterAppHandler::new(handler, move |args| {
344            let _handle = &handle;
345            args.events_for_path(&dir).next().is_some()
346        }))
347    }
348
349    /// Push a `note` that will be cloned on all subsequent change events until it the returned handle is dropped.
350    ///
351    /// This can be used to tag all events that happened over a period of time, something you can't do just
352    /// by receiving the events due to async delays caused by debounce.
353    ///
354    /// Note that the underlying system events the [`notify`](https://docs.rs/notify) crate uses are not guaranteed to be synchronous.
355    pub fn annotate(&self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
356        WATCHER_SV.write().annotate(note)
357    }
358}
359
360/// Represents a status type for [`WATCHER.sync_status`].
361///
362/// [`WATCHER.sync_status`]: WATCHER::sync_status
363pub trait WatcherSyncStatus<ER = io::Error, EW = io::Error>: WatcherReadStatus<ER> {
364    /// New writing value.
365    fn writing() -> Self;
366    /// New write error value.
367    fn write_error(e: EW) -> Self;
368}
369
370/// Represents a status type for [`WATCHER`] read-only operations.
371pub trait WatcherReadStatus<ER = io::Error>: VarValue + PartialEq {
372    /// New idle value.
373    fn idle() -> Self;
374    /// New reading value.
375    fn reading() -> Self;
376    /// New read error value.
377    fn read_error(e: ER) -> Self;
378}
379
380/// Represents an open read-only file provided by [`WATCHER.read`].
381///
382/// This type is a thin wrapper around the [`std::fs::File`] with some convenience parsing methods.
383///
384/// [`WATCHER.read`]: WATCHER::read
385#[derive(Debug)]
386pub struct WatchFile(fs::File);
387impl WatchFile {
388    /// Open read the file.
389    pub fn open(file: impl AsRef<Path>) -> io::Result<Self> {
390        Self::try_open_non_empty(file.as_ref(), true)
391    }
392    fn try_open_non_empty(path: &Path, retry: bool) -> io::Result<Self> {
393        let file = fs::File::open(path)?;
394
395        if retry && file.metadata()?.len() == 0 {
396            // some apps create an empty file unlocked, then write.
397            let _ = file;
398            std::thread::sleep(5.ms());
399            return Self::try_open_non_empty(path, false);
400        }
401
402        lock_shared(&file, Duration::from_secs(10))?;
403        Ok(Self(file))
404    }
405
406    /// Read the file contents as a text string.
407    pub fn text(&mut self) -> io::Result<Txt> {
408        self.string().map(Txt::from)
409    }
410
411    /// Read the file contents as a string.
412    pub fn string(&mut self) -> io::Result<String> {
413        use std::io::Read;
414        let mut s = String::new();
415        self.0.read_to_string(&mut s)?;
416        Ok(s)
417    }
418
419    /// Deserialize the file contents as JSON.
420    #[cfg(feature = "json")]
421    pub fn json<O>(&mut self) -> serde_json::Result<O>
422    where
423        O: serde::de::DeserializeOwned,
424    {
425        serde_json::from_reader(io::BufReader::new(&mut self.0))
426    }
427
428    /// Deserialize the file contents as TOML.
429    #[cfg(feature = "toml")]
430    pub fn toml<O>(&mut self) -> io::Result<O>
431    where
432        O: serde::de::DeserializeOwned,
433    {
434        use std::io::Read;
435        let mut buf = io::BufReader::new(&mut self.0);
436
437        let mut toml_str = String::new();
438        buf.read_to_string(&mut toml_str)?;
439
440        toml::de::from_str(&toml_str).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
441    }
442
443    /// Deserialize the file content as RON.
444    #[cfg(feature = "ron")]
445    pub fn ron<O>(&mut self) -> Result<O, ron::de::SpannedError>
446    where
447        O: serde::de::DeserializeOwned,
448    {
449        ron::de::from_reader(io::BufReader::new(&mut self.0))
450    }
451
452    /// Deserialize the file content as YAML.
453    #[cfg(feature = "yaml")]
454    pub fn yaml<O>(&mut self) -> serde_yaml::Result<O>
455    where
456        O: serde::de::DeserializeOwned,
457    {
458        serde_yaml::from_reader(io::BufReader::new(&mut self.0))
459    }
460
461    /// Read file and parse it.
462    pub fn parse<O: std::str::FromStr>(&mut self) -> Result<O, WatchFileParseError<O::Err>> {
463        use std::io::Read;
464        let mut s = String::new();
465        self.0.read_to_string(&mut s)?;
466        O::from_str(&s).map_err(WatchFileParseError::Parse)
467    }
468}
469impl ops::Deref for WatchFile {
470    type Target = fs::File;
471
472    fn deref(&self) -> &Self::Target {
473        &self.0
474    }
475}
476impl ops::DerefMut for WatchFile {
477    fn deref_mut(&mut self) -> &mut Self::Target {
478        &mut self.0
479    }
480}
481impl Drop for WatchFile {
482    fn drop(&mut self) {
483        let _ = FileExt::unlock(&self.0);
484    }
485}
486
487const TRANSACTION_GUID: &str = "6eIw3bYMS0uKaQMkTIQacQ";
488const TRANSACTION_LOCK_EXT: &str = "6eIw3bYMS0uKaQMkTIQacQ-lock.tmp";
489
490/// Represents an open write file provided by [`WATCHER.sync`].
491///
492/// This type actually writes to a temporary file and rename it over the actual file on commit only.
493/// The dereferenced [`fs::File`] is the temporary file, not the actual one.
494///
495/// # Transaction
496///
497/// To minimize the risk of file corruption exclusive locks are used, both the target file and the temp file
498/// are locked. An empty lock file is also used to cover the moment when both files are unlocked for the rename operation
499/// and the moment the temp file is acquired.
500///
501/// The temp file is the actual file path with file extension replaced with `{path/.file-name.ext}.{GUID}-{n}.tmp`, the `n` is a
502/// number from 0 to 999, if a temp file exists unlocked it will be reused.
503///
504/// The lock file is `{path/.file-name.ext}.{GUID}-lock.tmp`. Note that this
505/// lock file only helps for apps that use [`WriteFile`], but even without it the risk is minimal as the slow
506/// write operations are already flushed when it is time to commit.
507///
508/// [`WATCHER.sync`]: WATCHER::sync
509pub struct WriteFile {
510    temp_file: Option<fs::File>,
511    actual_file: Option<fs::File>,
512    transaction_lock: Option<fs::File>,
513
514    actual_path: PathBuf,
515    temp_path: PathBuf,
516    transaction_path: PathBuf,
517
518    cleaned: bool,
519}
520impl Drop for WriteFile {
521    fn drop(&mut self) {
522        if !self.cleaned {
523            tracing::error!("dropped sync write file without commit or cancel");
524            self.clean();
525        }
526    }
527}
528impl ops::Deref for WriteFile {
529    type Target = fs::File;
530
531    fn deref(&self) -> &Self::Target {
532        self.temp_file.as_ref().unwrap()
533    }
534}
535impl ops::DerefMut for WriteFile {
536    fn deref_mut(&mut self) -> &mut Self::Target {
537        self.temp_file.as_mut().unwrap()
538    }
539}
540impl WriteFile {
541    /// Open or create the file.
542    pub fn open(path: PathBuf) -> io::Result<Self> {
543        let actual_path = path.absolutize()?.into_owned();
544        if !actual_path.exists()
545            && let Some(parent) = actual_path.parent()
546        {
547            std::fs::create_dir_all(parent)?;
548        }
549
550        let hidden_name = match actual_path.file_name() {
551            Some(n) => format!(".{}", n.to_string_lossy()),
552            None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "expected file name")),
553        };
554
555        let transaction_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_LOCK_EXT}"));
556        let transaction_lock = fs::OpenOptions::new()
557            .create(true)
558            .truncate(true)
559            .write(true)
560            .open(&transaction_path)?;
561
562        const TIMEOUT: Duration = Duration::from_secs(10);
563
564        lock_exclusive(&transaction_lock, TIMEOUT)?;
565
566        let actual_file = fs::OpenOptions::new().write(true).create(true).truncate(false).open(&actual_path)?;
567        lock_exclusive(&actual_file, TIMEOUT)?;
568
569        let mut n = 0;
570        let mut temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
571        let temp_file = loop {
572            if let Ok(f) = fs::OpenOptions::new().write(true).create(true).truncate(true).open(&temp_path)
573                && let Ok(true) = f.try_lock_exclusive()
574            {
575                break f;
576            }
577
578            n += 1;
579            temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
580            n += 1;
581            if n > 1000 {
582                return Err(io::Error::new(io::ErrorKind::AlreadyExists, "cannot create temporary file"));
583            }
584        };
585
586        Ok(Self {
587            actual_file: Some(actual_file),
588            temp_file: Some(temp_file),
589            transaction_lock: Some(transaction_lock),
590            actual_path,
591            temp_path,
592            transaction_path,
593            cleaned: false,
594        })
595    }
596
597    /// Write the text string.
598    pub fn write_text(&mut self, txt: &str) -> io::Result<()> {
599        self.write_all(txt.as_bytes())
600    }
601
602    /// Serialize and write.
603    ///
604    /// If `pretty` is `true` the JSON is formatted for human reading.
605    #[cfg(feature = "json")]
606    pub fn write_json<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
607        let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
608        if pretty {
609            serde_json::to_writer_pretty(&mut buf, value)?;
610        } else {
611            serde_json::to_writer(&mut buf, value)?;
612        }
613        buf.flush()
614    }
615
616    /// Serialize and write.
617    ///
618    /// If `pretty` is `true` the TOML is formatted for human reading.
619    #[cfg(feature = "toml")]
620    pub fn write_toml<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
621        let toml = if pretty {
622            toml::ser::to_string_pretty(value)
623        } else {
624            toml::ser::to_string(value)
625        }
626        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
627
628        self.write_all(toml.as_bytes())
629    }
630
631    /// Serialize and write.
632    ///
633    /// If `pretty` is `true` the RON if formatted for human reading using the default pretty config.
634    #[cfg(feature = "ron")]
635    pub fn write_ron<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
636        let buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
637        struct Ffs<'a> {
638            w: io::BufWriter<&'a mut fs::File>,
639        }
640        impl fmt::Write for Ffs<'_> {
641            fn write_str(&mut self, s: &str) -> fmt::Result {
642                self.w.write_all(s.as_bytes()).map_err(|_| fmt::Error)
643            }
644        }
645        let mut buf = Ffs { w: buf };
646        if pretty {
647            ron::ser::to_writer_pretty(&mut buf, value, Default::default()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
648        } else {
649            ron::ser::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
650        }
651        buf.w.flush()
652    }
653
654    /// Serialize and write.
655    #[cfg(feature = "yaml")]
656    pub fn write_yaml<O: serde::Serialize>(&mut self, value: &O) -> io::Result<()> {
657        let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
658        serde_yaml::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
659        buf.flush()
660    }
661
662    /// Commit write, flush and replace the actual file with the new one.
663    pub fn commit(mut self) -> io::Result<()> {
664        let r = self.replace_actual();
665        self.clean();
666        r
667    }
668
669    /// Cancel write, the file will not be updated.
670    pub fn cancel(mut self) {
671        self.clean();
672    }
673
674    fn replace_actual(&mut self) -> io::Result<()> {
675        let mut temp_file = self.temp_file.take().unwrap();
676        temp_file.flush()?;
677        temp_file.sync_all()?;
678
679        unlock_ok(&temp_file).unwrap();
680        drop(temp_file);
681
682        let actual_file = self.actual_file.take().unwrap();
683        unlock_ok(&actual_file)?;
684        drop(actual_file);
685
686        let mut retries = 0;
687        loop {
688            // commit by replacing the actual_path with already on disk temp_path file.
689            match fs::rename(&self.temp_path, &self.actual_path) {
690                Ok(()) => {
691                    break;
692                }
693                Err(e) => match e.kind() {
694                    io::ErrorKind::PermissionDenied => {
695                        if retries == 5 {
696                            // Give-up, we manage to write lock both temp and actual just
697                            // before this, but now we can't replace actual and remove temp.
698                            // Hardware issue? Or another process holding a lock for 1s+50ms*5.
699                            return Err(e);
700                        } else if retries > 0 {
701                            // Second+ retries:
702                            //
703                            // probably a system issue.
704                            //
705                            // Windows sporadically returns ACCESS_DENIED for kernel!SetRenameInformationFile in
706                            // other apps that use the same save pattern (write-tmp -> close-tmp -> rename).
707                            // see GIMP issue: https://gitlab.gnome.org/GNOME/gimp/-/issues/1370
708                            //
709                            // I used procmon to trace all file operations, there is no other app trying to use
710                            // the temp and actual files when the ACCESS_DENIED occurs, both files are unlocked and
711                            // closed before the rename calls start. This might be a Windows bug.
712                            std::thread::sleep(30.ms());
713                        } else {
714                            // first retry:
715                            //
716                            // probably another process reading the `actual_path`.
717                            //
718                            // Reacquire a write lock and unlock, just to wait the external app.
719                            match std::fs::File::options().write(true).open(&self.actual_path) {
720                                Ok(f) => {
721                                    if lock_exclusive(&f, 10.secs()).is_ok() {
722                                        // acquired actual ok, retry
723                                        let _ = unlock_ok(&f);
724                                    }
725                                }
726                                Err(e) => match e.kind() {
727                                    io::ErrorKind::NotFound => {
728                                        // all good, rename will create actual
729                                        continue;
730                                    }
731                                    _ => {
732                                        // unknown error, let retry handle it
733                                        std::thread::sleep(30.ms());
734                                    }
735                                },
736                            }
737                        }
738
739                        retries += 1;
740                    }
741                    _ => return Err(e),
742                },
743            }
744        }
745
746        Ok(())
747    }
748
749    fn clean(&mut self) {
750        self.cleaned = true;
751
752        if let Some(tmp) = self.temp_file.take() {
753            let _ = FileExt::unlock(&tmp);
754        }
755        if let Err(e) = fs::remove_file(&self.temp_path) {
756            tracing::debug!("failed to cleanup temp file, {e}")
757        }
758
759        if let Some(file) = self.actual_file.take() {
760            let _ = FileExt::unlock(&file);
761        }
762
763        let transaction = self.transaction_lock.take().unwrap();
764        let _ = FileExt::unlock(&transaction);
765        let _ = fs::remove_file(&self.transaction_path);
766    }
767}
768
769/// Error for [`WatchFile::parse`].
770#[derive(Debug)]
771#[non_exhaustive]
772pub enum WatchFileParseError<E> {
773    /// Error reading the file.
774    Io(io::Error),
775    /// Error parsing the file.
776    Parse(E),
777}
778impl<E> From<io::Error> for WatchFileParseError<E> {
779    fn from(value: io::Error) -> Self {
780        Self::Io(value)
781    }
782}
783impl<E: fmt::Display> fmt::Display for WatchFileParseError<E> {
784    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
785        match self {
786            WatchFileParseError::Io(e) => write!(f, "read error, {e}"),
787            WatchFileParseError::Parse(e) => write!(f, "parse error, {e}"),
788        }
789    }
790}
791impl<E: std::error::Error + 'static> std::error::Error for WatchFileParseError<E> {
792    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
793        match self {
794            WatchFileParseError::Io(e) => Some(e),
795            WatchFileParseError::Parse(e) => Some(e),
796        }
797    }
798}
799
800/// Represents a [`FsChange`] note.
801///
802/// This trait is already implemented for types it applies.
803#[diagnostic::on_unimplemented(note = "`FsChangeNote` is implemented for all `T: Debug + Any + Send + Sync`")]
804pub trait FsChangeNote: fmt::Debug + std::any::Any + Send + Sync {
805    /// Access any.
806    fn as_any(&self) -> &dyn std::any::Any;
807}
808impl<T: fmt::Debug + std::any::Any + Send + Sync> FsChangeNote for T {
809    fn as_any(&self) -> &dyn std::any::Any {
810        self
811    }
812}
813
814/// Handle that holds a [`WATCHER.annotate`] note.
815///
816/// [`WATCHER.annotate`]: WATCHER::annotate
817#[derive(Clone)]
818#[must_use = "the note is removed when the handle is dropped"]
819pub struct FsChangeNoteHandle(#[expect(dead_code)] Arc<Arc<dyn FsChangeNote>>);
820
821/// Annotation for file watcher events and var update tags.
822///
823/// Identifies the [`WATCHER.sync`] file that is currently being written to.
824///
825/// [`WATCHER.sync`]: WATCHER::sync
826#[derive(Debug, PartialEq, Eq)]
827pub struct WatcherSyncWriteNote(PathBuf);
828impl WatcherSyncWriteNote {
829    /// Deref.
830    pub fn as_path(&self) -> &Path {
831        self
832    }
833}
834impl ops::Deref for WatcherSyncWriteNote {
835    type Target = Path;
836
837    fn deref(&self) -> &Self::Target {
838        self.0.as_path()
839    }
840}
841
842/// File system change event types.
843///
844/// The event for each change is available in [`FsChange::event`].
845///
846/// This module re-exports types from the [`notify`](https://docs.rs/notify) crate.
847pub mod fs_event {
848    pub use notify::event::{
849        AccessKind, AccessMode, CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind, RenameMode,
850    };
851    pub use notify::{Error, ErrorKind};
852}
853
854/// Represents a single file system change, annotated.
855#[derive(Debug)]
856#[non_exhaustive]
857pub struct FsChange {
858    /// All [`WATCHER.annotate`] that where set when this event happened.
859    ///
860    /// [`WATCHER.annotate`]: WATCHER::annotate
861    pub notes: Vec<Arc<dyn FsChangeNote>>,
862
863    /// The actual notify event or error.
864    pub event: Result<fs_event::Event, fs_event::Error>,
865}
866impl FsChange {
867    /// If the change affects the `path`.
868    pub fn is_for_path(&self, path: &Path) -> bool {
869        if let Ok(ev) = &self.event {
870            return ev.paths.iter().any(|p| p.starts_with(path));
871        }
872        false
873    }
874
875    /// If the change affects any path matched by the glob pattern.
876    pub fn is_for_glob(&self, pattern: &glob::Pattern) -> bool {
877        if let Ok(ev) = &self.event {
878            return ev.paths.iter().any(|p| pattern.matches_path(p));
879        }
880        false
881    }
882
883    /// Iterate over all notes of the type `T`.
884    pub fn notes<T: FsChangeNote>(&self) -> impl Iterator<Item = &T> {
885        self.notes.iter().filter_map(|n| FsChangeNote::as_any(&**n).downcast_ref::<T>())
886    }
887}
888
889event_args! {
890    /// [`FS_CHANGES_EVENT`] arguments.
891    pub struct FsChangesArgs {
892        /// All notify changes since the last event.
893        pub changes: Arc<Vec<FsChange>>,
894
895        ..
896
897        /// None, only app level handlers receive this event.
898        fn delivery_list(&self, list: &mut UpdateDeliveryList) {
899            let _ = list;
900        }
901    }
902}
903impl FsChangesArgs {
904    /// Iterate over all change events.
905    pub fn events(&self) -> impl Iterator<Item = &fs_event::Event> + '_ {
906        self.changes.iter().filter_map(|r| r.event.as_ref().ok())
907    }
908
909    /// Iterate over all file watcher errors.
910    pub fn errors(&self) -> impl Iterator<Item = &notify::Error> + '_ {
911        self.changes.iter().filter_map(|r| r.event.as_ref().err())
912    }
913
914    /// Returns `true` is some events where lost.
915    ///
916    /// This indicates either a lapse in the events or a change in the filesystem such that events
917    /// received so far can no longer be relied on to represent the state of the filesystem now.
918    ///
919    /// An application that simply reacts to file changes may not care about this. An application
920    /// that keeps an in-memory representation of the filesystem will need to care, and will need
921    /// to refresh that representation directly from the filesystem.
922    pub fn rescan(&self) -> bool {
923        self.events().any(|e| e.need_rescan())
924    }
925
926    /// Iterate over all changes that affects paths selected by the `glob` pattern.
927    pub fn changes_for(&self, glob: &str) -> Result<impl Iterator<Item = &FsChange> + '_, glob::PatternError> {
928        let glob = glob::Pattern::new(glob)?;
929        Ok(self.changes.iter().filter(move |c| c.is_for_glob(&glob)))
930    }
931
932    /// Iterate over all changes that affects paths that are equal to `path` or inside it.
933    pub fn changes_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a FsChange> + 'a {
934        self.changes.iter().filter(move |c| c.is_for_path(path))
935    }
936
937    /// Iterate over all change events that affects that are equal to `path` or inside it.
938    pub fn events_for(&self, glob: &str) -> Result<impl Iterator<Item = &fs_event::Event> + '_, glob::PatternError> {
939        let glob = glob::Pattern::new(glob)?;
940        Ok(self.events().filter(move |ev| ev.paths.iter().any(|p| glob.matches_path(p))))
941    }
942
943    /// Iterate over all change events that affects paths that are equal to `path` or inside it.
944    pub fn events_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a fs_event::Event> + 'a {
945        self.events().filter(move |ev| ev.paths.iter().any(|p| p.starts_with(path)))
946    }
947}
948
949event! {
950    /// Event sent by the [`WATCHER`] service on directories or files that are watched.
951    pub static FS_CHANGES_EVENT: FsChangesArgs;
952}
953
954/// Represents an active file or directory watcher in [`WATCHER`].
955#[derive(Clone)]
956#[must_use = "the watcher is dropped if the handle is dropped"]
957pub struct WatcherHandle(Handle<()>);
958
959impl WatcherHandle {
960    /// Handle to no watcher.
961    pub fn dummy() -> Self {
962        Self(Handle::dummy(()))
963    }
964
965    /// If [`perm`](Self::perm) was called in another clone of this handle.
966    ///
967    /// If `true` the resource will stay in memory for the duration of the app, unless [`force_drop`](Self::force_drop)
968    /// is also called.
969    pub fn is_permanent(&self) -> bool {
970        self.0.is_permanent()
971    }
972
973    /// Force drops the watcher, meaning it will be dropped even if there are other handles active.
974    pub fn force_drop(self) {
975        self.0.force_drop()
976    }
977
978    /// If the watcher is dropped.
979    pub fn is_dropped(&self) -> bool {
980        self.0.is_dropped()
981    }
982
983    /// Drop the handle without dropping the watcher, the watcher will stay active for the
984    /// duration of the app process.
985    pub fn perm(self) {
986        self.0.perm()
987    }
988}