zng_ext_fs_watcher/
lib.rs

1#![doc(html_favicon_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo.png")]
3//!
4//! File system events and service.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9// suppress nag about very simple boxed closure signatures.
10#![expect(clippy::type_complexity)]
11#![warn(unused_extern_crates)]
12#![warn(missing_docs)]
13
14use std::{
15    fmt, fs,
16    io::{self, Write as _},
17    ops,
18    path::{Path, PathBuf},
19    sync::Arc,
20    time::Duration,
21};
22
23use path_absolutize::Absolutize;
24use zng_app::{
25    AppExtension,
26    event::{EventHandle, event, event_args},
27    handler::{AppHandler, FilterAppHandler},
28    update::EventUpdate,
29    view_process::raw_events::LOW_MEMORY_EVENT,
30};
31use zng_handle::Handle;
32use zng_txt::Txt;
33use zng_unit::TimeUnits;
34use zng_var::{ArcVar, ReadOnlyArcVar, VarValue};
35
36mod service;
37use service::*;
38
39mod lock;
40use lock::*;
41
42/// Application extension that provides file system change events and service.
43///
44/// # Events
45///
46/// Events this extension provides.
47///
48/// * [`FS_CHANGES_EVENT`]
49///
50/// # Services
51///
52/// Services this extension provides.
53///
54/// * [`WATCHER`]
55#[derive(Default)]
56pub struct FsWatcherManager {}
57impl AppExtension for FsWatcherManager {
58    fn init(&mut self) {
59        WATCHER_SV.write().init_watcher();
60    }
61
62    fn event_preview(&mut self, update: &mut EventUpdate) {
63        if let Some(args) = FS_CHANGES_EVENT.on(update) {
64            WATCHER_SV.write().event(args);
65        } else if LOW_MEMORY_EVENT.on(update).is_some() {
66            WATCHER_SV.write().low_memory();
67        }
68    }
69
70    fn update_preview(&mut self) {
71        WATCHER_SV.write().update();
72    }
73
74    fn deinit(&mut self) {
75        let mut flush = WATCHER_SV.write().shutdown();
76        for v in &mut flush {
77            v.flush_shutdown();
78        }
79    }
80}
81
/// File system watcher service.
///
/// This is mostly a wrapper around the [`notify`](https://docs.rs/notify) crate, integrating it with events and variables.
///
/// The methods of this type forward to the internal `WATCHER_SV` service state.
pub struct WATCHER;
86impl WATCHER {
87    /// Gets a read-write variable that defines interval awaited between each [`FS_CHANGES_EVENT`]. If
88    /// a watched path is constantly changing an event will be emitted every elapse of this interval,
89    /// the event args will contain a list of all the changes observed during the interval.
90    ///
91    /// Note that the first event notifies immediately, only subsequent events within this interval are debounced.
92    ///
93    /// Is `100.ms()` by default.
94    pub fn debounce(&self) -> ArcVar<Duration> {
95        WATCHER_SV.read().debounce.clone()
96    }
97
98    /// Gets a read-write variable that defines interval awaited between each [`sync`] write.
99    ///
100    /// Is `100.ms()` by default.
101    ///
102    /// [`sync`]: WATCHER::sync
103    pub fn sync_debounce(&self) -> ArcVar<Duration> {
104        WATCHER_SV.read().debounce.clone()
105    }
106
107    /// Gets a read-write variable that defines the fallback poll watcher interval.
108    ///
109    /// When an efficient watcher cannot be used a poll watcher fallback is used, the poll watcher reads
110    /// the directory or path every elapse of this interval. The poll watcher is also used for paths that
111    /// do not exist yet, that is also affected by this interval.
112    ///
113    /// Is `1.secs()` by default.
114    pub fn poll_interval(&self) -> ArcVar<Duration> {
115        WATCHER_SV.read().poll_interval.clone()
116    }
117
118    /// Maximum time the service keeps the process alive to process pending IO operations when the app shuts down.
119    ///
120    /// Is 1 minute by default.
121    pub fn shutdown_timeout(&self) -> ArcVar<Duration> {
122        WATCHER_SV.read().shutdown_timeout.clone()
123    }
124
125    /// Enable file change events for the `file`.
126    ///
127    /// Returns a handle that will stop the file watch when dropped, if there is no other active handler for the same file.
128    ///
129    /// Note that this is implemented by actually watching the parent directory and filtering the events, this is done
130    /// to ensure the watcher survives operations that remove the file and then move another file to the same path.
131    ///
132    /// See [`watch_dir`] for more details.
133    ///
134    /// [`watch_dir`]: WATCHER::watch_dir
135    pub fn watch(&self, file: impl Into<PathBuf>) -> WatcherHandle {
136        WATCHER_SV.write().watch(file.into())
137    }
138
139    /// Enable file change events for files inside `dir`, also include inner directories if `recursive` is `true`.
140    ///
141    /// Returns a handle that will stop the dir watch when dropped, if there is no other active handler for the same directory.
142    ///
143    /// The directory will be watched using an OS specific efficient watcher provided by the [`notify`](https://docs.rs/notify) crate. If there is
144    /// any error creating the watcher, such as if the directory does not exist yet a slower polling watcher will retry periodically    
145    /// until the efficient watcher can be created or the handle is dropped.
146    pub fn watch_dir(&self, dir: impl Into<PathBuf>, recursive: bool) -> WatcherHandle {
147        WATCHER_SV.write().watch_dir(dir.into(), recursive)
148    }
149
150    /// Read a file into a variable, the `init` value will start the variable and the `read` closure will be called
151    /// once immediately and every time the file changes, if the closure returns `Some(O)` the variable updates with the new value.
152    ///
153    /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
154    /// background thread.
155    ///
156    /// [`task::wait`]: zng_task::wait
157    pub fn read<O: VarValue>(
158        &self,
159        file: impl Into<PathBuf>,
160        init: O,
161        read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
162    ) -> ReadOnlyArcVar<O> {
163        WATCHER_SV.write().read(file.into(), init, read)
164    }
165
166    /// Same operation as [`read`] but also tracks the operation status in a second var.
167    ///
168    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
169    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
170    /// only updates to idle  when the new value is available on the var, or because read the same value.
171    ///
172    /// [`read`]: Self::read
173    pub fn read_status<O, S, E>(
174        &self,
175        file: impl Into<PathBuf>,
176        init: O,
177        read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, E> + Send + 'static,
178    ) -> (ReadOnlyArcVar<O>, ReadOnlyArcVar<S>)
179    where
180        O: VarValue,
181        S: WatcherReadStatus<E>,
182    {
183        WATCHER_SV.write().read_status(file.into(), init, read)
184    }
185
186    /// Read a directory into a variable, the `init` value will start the variable and the `read` closure will be called
187    /// once immediately and every time any changes happen inside the dir, if the closure returns `Some(O)` the variable updates with the new value.
188    ///
189    /// The `read` closure parameter is a directory walker from the [`walkdir`](https://docs.rs/walkdir) crate.
190    ///
191    /// The directory walker is pre-configured to skip the `dir` itself and to have a max-depth of 1 if not `recursive`, these configs can.
192    ///
193    /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
194    /// background thread.
195    ///
196    /// [`task::wait`]: zng_task::wait
197    pub fn read_dir<O: VarValue>(
198        &self,
199        dir: impl Into<PathBuf>,
200        recursive: bool,
201        init: O,
202        read: impl FnMut(walkdir::WalkDir) -> Option<O> + Send + 'static,
203    ) -> ReadOnlyArcVar<O> {
204        WATCHER_SV.write().read_dir(dir.into(), recursive, init, read)
205    }
206
207    /// Same operation as [`read_dir`] but also tracks the operation status in a second var.
208    ///
209    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
210    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
211    /// only updates to idle when the new value is available on the var, or because read the same value.
212    ///
213    /// [`read_dir`]: Self::read_dir
214    pub fn read_dir_status<O, S, E>(
215        &self,
216        dir: impl Into<PathBuf>,
217        recursive: bool,
218        init: O,
219        read: impl FnMut(walkdir::WalkDir) -> Result<Option<O>, E> + Send + 'static,
220    ) -> (ReadOnlyArcVar<O>, ReadOnlyArcVar<S>)
221    where
222        O: VarValue,
223        S: WatcherReadStatus<E>,
224    {
225        WATCHER_SV.write().read_dir_status(dir.into(), recursive, init, read)
226    }
227
228    /// Bind a file with a variable, the `file` will be `read` when it changes and be `write` when the variable changes,
229    /// writes are only applied on success and will not cause a `read` on the same sync task. The `init` value is used to
230    /// create the variable, if the `file` exists it will be `read` once at the beginning.
231    ///
232    /// Dropping the variable drops the read watch. The `read` and `write` closures are non-blocking, they are called in a [`task::wait`]
233    /// background thread.
234    ///
235    /// # Sync
236    ///
237    /// The file synchronization ensures that the file is only actually modified when writing is finished by writing
238    /// to a temporary file and committing a replace only if the write succeeded. The file is write-locked for the duration
239    /// of `write` call, but the contents are not touched until commit. See [`WriteFile`] for more details.
240    ///
241    /// The [`FsWatcherManager`] blocks on app exit until all writes commit or cancel.
242    ///
243    /// ## Read Errors
244    ///
245    /// Not-found errors are handled by the watcher by calling `write` using the current variable value, other read errors
246    /// are passed to `read`. If `read` returns a value for an error the `write` closure is called to override the file,
247    /// otherwise only the variable is set and this variable update does not cause a `write`.
248    ///
249    /// ## Write Errors
250    ///
251    /// If `write` fails the file is not touched and the temporary file is removed, if the file path
252    /// does not exit all missing parent folders and the file will be created automatically before the `write`
253    /// call.
254    ///
255    /// Note that [`WriteFile::commit`] must be called to flush the temporary file and attempt to rename
256    /// it, if the file is dropped without commit it will cancel and log an error, you must call [`WriteFile::cancel`]
257    /// to correctly avoid writing.
258    ///
259    /// If the cleanup after commit fails the error is logged and ignored.
260    ///
261    /// If write fails to even create the file and/or acquire a write lock on it this error is the input for
262    /// the `write` closure.
263    ///
264    /// ## Error Handling
265    ///
266    /// You can call services or set other variables from inside the `read` and `write` closures, this can be
267    /// used to get a signal out that perhaps drops the sync var (to stop watching), alert the user that the
268    /// file is out of sync and initiate some sort of recovery routine.
269    ///
270    /// If the file synchronization is not important you can just ignore it, the watcher will try again
271    /// on the next variable or file update.
272    ///
273    /// ## Status
274    ///
275    /// Note that `read` and `write` run in background task threads, so if you are tracking the operation
276    /// status in a separate variable you may end-up with synchronization bugs between th status variable
277    /// and the actual result variable, you can use [`sync_status`] to implement racing-free status tracking.
278    ///
279    /// [`sync_status`]: Self::sync_status
280    /// [`task::wait`]: zng_task::wait
281    pub fn sync<O: VarValue>(
282        &self,
283        file: impl Into<PathBuf>,
284        init: O,
285        read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
286        write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
287    ) -> ArcVar<O> {
288        WATCHER_SV.write().sync(file.into(), init, read, write)
289    }
290
291    /// Same operation as [`sync`] but also tracks the operation status in a second var.
292    ///
293    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
294    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
295    /// only updates to idle when the new sync value is available, or because read the same value.
296    ///
297    /// The status variable is set to [`WatcherSyncStatus::writing`] as soon as it updates and
298    /// is set to [`WatcherReadStatus::idle`] only when the new sync value is available, either
299    /// by update or because read the same value.
300    ///
301    /// [`sync`]: Self::sync
302    pub fn sync_status<O, S, ER, EW>(
303        &self,
304        file: impl Into<PathBuf>,
305        init: O,
306        read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, ER> + Send + 'static,
307        write: impl FnMut(O, io::Result<WriteFile>) -> Result<(), EW> + Send + 'static,
308    ) -> (ArcVar<O>, ReadOnlyArcVar<S>)
309    where
310        O: VarValue,
311        S: WatcherSyncStatus<ER, EW>,
312    {
313        WATCHER_SV.write().sync_status(file.into(), init, read, write)
314    }
315
316    /// Watch `file` and calls `handler` every time it changes.
317    ///
318    /// Note that the `handler` is blocking, use [`async_app_hn!`] and [`task::wait`] to run IO without
319    /// blocking the app.
320    ///
321    /// [`async_app_hn!`]: macro@zng_app::handler::async_app_hn
322    /// [`task::wait`]: zng_task::wait
323    pub fn on_file_changed(&self, file: impl Into<PathBuf>, handler: impl AppHandler<FsChangesArgs>) -> EventHandle {
324        let file = file.into();
325        let handle = self.watch(file.clone());
326        FS_CHANGES_EVENT.on_event(FilterAppHandler::new(handler, move |args| {
327            let _handle = &handle;
328            args.events_for_path(&file).next().is_some()
329        }))
330    }
331
332    /// Watch `dir` and calls `handler` every time something inside it changes.
333    ///
334    /// Note that the `handler` is blocking, use [`async_app_hn!`] and [`task::wait`] to run IO without
335    /// blocking the app.
336    ///
337    /// [`async_app_hn!`]: macro@zng_app::handler::async_app_hn
338    /// [`task::wait`]: zng_task::wait
339    pub fn on_dir_changed(&self, dir: impl Into<PathBuf>, recursive: bool, handler: impl AppHandler<FsChangesArgs>) -> EventHandle {
340        let dir = dir.into();
341        let handle = self.watch_dir(dir.clone(), recursive);
342        FS_CHANGES_EVENT.on_event(FilterAppHandler::new(handler, move |args| {
343            let _handle = &handle;
344            args.events_for_path(&dir).next().is_some()
345        }))
346    }
347
348    /// Push a `note` that will be cloned on all subsequent change events until it the returned handle is dropped.
349    ///
350    /// This can be used to tag all events that happened over a period of time, something you can't do just
351    /// by receiving the events due to async delays caused by debounce.
352    ///
353    /// Note that the underlying system events the [`notify`](https://docs.rs/notify) crate uses are not guaranteed to be synchronous.
354    pub fn annotate(&self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
355        WATCHER_SV.write().annotate(note)
356    }
357}
358
/// Represents a status type for [`WATCHER.sync_status`].
///
/// The `ER` and `EW` parameters are the error types returned by the `read` and `write`
/// closures of [`WATCHER.sync_status`], both default to [`io::Error`].
///
/// [`WATCHER.sync_status`]: WATCHER::sync_status
pub trait WatcherSyncStatus<ER = io::Error, EW = io::Error>: WatcherReadStatus<ER> {
    /// New writing value.
    fn writing() -> Self;
    /// New write error value.
    fn write_error(e: EW) -> Self;
}
368
/// Represents a status type for [`WATCHER`] read-only operations.
///
/// The `ER` parameter is the error type returned by the `read` closure of the
/// `*_status` read methods, it defaults to [`io::Error`].
pub trait WatcherReadStatus<ER = io::Error>: VarValue + PartialEq {
    /// New idle value.
    fn idle() -> Self;
    /// New reading value.
    fn reading() -> Self;
    /// New read error value.
    fn read_error(e: ER) -> Self;
}
378
379/// Represents an open read-only file provided by [`WATCHER.read`].
380///
381/// This type is a thin wrapper around the [`std::fs::File`] with some convenience parsing methods.
382///
383/// [`WATCHER.read`]: WATCHER::read
384#[derive(Debug)]
385pub struct WatchFile(fs::File);
386impl WatchFile {
387    /// Open read the file.
388    pub fn open(file: impl AsRef<Path>) -> io::Result<Self> {
389        Self::try_open_non_empty(file.as_ref(), true)
390    }
391    fn try_open_non_empty(path: &Path, retry: bool) -> io::Result<Self> {
392        let file = fs::File::open(path)?;
393
394        if retry && file.metadata()?.len() == 0 {
395            // some apps create an empty file unlocked, then write.
396            let _ = file;
397            std::thread::sleep(5.ms());
398            return Self::try_open_non_empty(path, false);
399        }
400
401        lock_shared(&file, Duration::from_secs(10))?;
402        Ok(Self(file))
403    }
404
405    /// Read the file contents as a text string.
406    pub fn text(&mut self) -> io::Result<Txt> {
407        self.string().map(Txt::from)
408    }
409
410    /// Read the file contents as a string.
411    pub fn string(&mut self) -> io::Result<String> {
412        use std::io::Read;
413        let mut s = String::new();
414        self.0.read_to_string(&mut s)?;
415        Ok(s)
416    }
417
418    /// Deserialize the file contents as JSON.
419    #[cfg(feature = "json")]
420    pub fn json<O>(&mut self) -> serde_json::Result<O>
421    where
422        O: serde::de::DeserializeOwned,
423    {
424        serde_json::from_reader(io::BufReader::new(&mut self.0))
425    }
426
427    /// Deserialize the file contents as TOML.
428    #[cfg(feature = "toml")]
429    pub fn toml<O>(&mut self) -> io::Result<O>
430    where
431        O: serde::de::DeserializeOwned,
432    {
433        use std::io::Read;
434        let mut buf = io::BufReader::new(&mut self.0);
435
436        let mut toml_str = String::new();
437        buf.read_to_string(&mut toml_str)?;
438
439        toml::de::from_str(&toml_str).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
440    }
441
442    /// Deserialize the file content as RON.
443    #[cfg(feature = "ron")]
444    pub fn ron<O>(&mut self) -> Result<O, ron::de::SpannedError>
445    where
446        O: serde::de::DeserializeOwned,
447    {
448        ron::de::from_reader(io::BufReader::new(&mut self.0))
449    }
450
451    /// Deserialize the file content as YAML.
452    #[cfg(feature = "yaml")]
453    pub fn yaml<O>(&mut self) -> serde_yaml::Result<O>
454    where
455        O: serde::de::DeserializeOwned,
456    {
457        serde_yaml::from_reader(io::BufReader::new(&mut self.0))
458    }
459
460    /// Read file and parse it.
461    pub fn parse<O: std::str::FromStr>(&mut self) -> Result<O, WatchFileParseError<O::Err>> {
462        use std::io::Read;
463        let mut s = String::new();
464        self.0.read_to_string(&mut s)?;
465        O::from_str(&s).map_err(WatchFileParseError::Parse)
466    }
467}
468impl ops::Deref for WatchFile {
469    type Target = fs::File;
470
471    fn deref(&self) -> &Self::Target {
472        &self.0
473    }
474}
475impl ops::DerefMut for WatchFile {
476    fn deref_mut(&mut self) -> &mut Self::Target {
477        &mut self.0
478    }
479}
480impl Drop for WatchFile {
481    fn drop(&mut self) {
482        let _ = FileExt::unlock(&self.0);
483    }
484}
485
// Unique marker included in the name of the temporary files created by `WriteFile`.
const TRANSACTION_GUID: &str = "6eIw3bYMS0uKaQMkTIQacQ";
// Suffix of the `WriteFile` transaction lock file name, the same marker followed by `-lock.tmp`.
const TRANSACTION_LOCK_EXT: &str = "6eIw3bYMS0uKaQMkTIQacQ-lock.tmp";
488
489/// Represents an open write file provided by [`WATCHER.sync`].
490///
491/// This type actually writes to a temporary file and rename it over the actual file on commit only.
492/// The dereferenced [`fs::File`] is the temporary file, not the actual one.
493///
494/// # Transaction
495///
496/// To minimize the risk of file corruption exclusive locks are used, both the target file and the temp file
497/// are locked. An empty lock file is also used to cover the moment when both files are unlocked for the rename operation
498/// and the moment the temp file is acquired.
499///
500/// The temp file is the actual file path with file extension replaced with `{path/.file-name.ext}.{GUID}-{n}.tmp`, the `n` is a
501/// number from 0 to 999, if a temp file exists unlocked it will be reused.
502///
503/// The lock file is `{path/.file-name.ext}.{GUID}-lock.tmp`. Note that this
504/// lock file only helps for apps that use [`WriteFile`], but even without it the risk is minimal as the slow
505/// write operations are already flushed when it is time to commit.
506///
507/// [`WATCHER.sync`]: WATCHER::sync
508pub struct WriteFile {
509    temp_file: Option<fs::File>,
510    actual_file: Option<fs::File>,
511    transaction_lock: Option<fs::File>,
512
513    actual_path: PathBuf,
514    temp_path: PathBuf,
515    transaction_path: PathBuf,
516
517    cleaned: bool,
518}
519impl Drop for WriteFile {
520    fn drop(&mut self) {
521        if !self.cleaned {
522            tracing::error!("dropped sync write file without commit or cancel");
523            self.clean();
524        }
525    }
526}
527impl ops::Deref for WriteFile {
528    type Target = fs::File;
529
530    fn deref(&self) -> &Self::Target {
531        self.temp_file.as_ref().unwrap()
532    }
533}
534impl ops::DerefMut for WriteFile {
535    fn deref_mut(&mut self) -> &mut Self::Target {
536        self.temp_file.as_mut().unwrap()
537    }
538}
539impl WriteFile {
540    /// Open or create the file.
541    pub fn open(path: PathBuf) -> io::Result<Self> {
542        let actual_path = path.absolutize()?.into_owned();
543        if !actual_path.exists() {
544            if let Some(parent) = actual_path.parent() {
545                std::fs::create_dir_all(parent)?;
546            }
547        }
548
549        let hidden_name = match actual_path.file_name() {
550            Some(n) => format!(".{}", n.to_string_lossy()),
551            None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "expected file name")),
552        };
553
554        let transaction_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_LOCK_EXT}"));
555        let transaction_lock = fs::OpenOptions::new()
556            .create(true)
557            .truncate(true)
558            .write(true)
559            .open(&transaction_path)?;
560
561        const TIMEOUT: Duration = Duration::from_secs(10);
562
563        lock_exclusive(&transaction_lock, TIMEOUT)?;
564
565        let actual_file = fs::OpenOptions::new().write(true).create(true).truncate(false).open(&actual_path)?;
566        lock_exclusive(&actual_file, TIMEOUT)?;
567
568        let mut n = 0;
569        let mut temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
570        let temp_file = loop {
571            if let Ok(f) = fs::OpenOptions::new().write(true).create(true).truncate(true).open(&temp_path) {
572                if f.try_lock_exclusive().is_ok() {
573                    break f;
574                }
575            }
576
577            n += 1;
578            temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
579            n += 1;
580            if n > 1000 {
581                return Err(io::Error::new(io::ErrorKind::AlreadyExists, "cannot create temporary file"));
582            }
583        };
584
585        Ok(Self {
586            actual_file: Some(actual_file),
587            temp_file: Some(temp_file),
588            transaction_lock: Some(transaction_lock),
589            actual_path,
590            temp_path,
591            transaction_path,
592            cleaned: false,
593        })
594    }
595
596    /// Write the text string.
597    pub fn write_text(&mut self, txt: &str) -> io::Result<()> {
598        self.write_all(txt.as_bytes())
599    }
600
601    /// Serialize and write.
602    ///
603    /// If `pretty` is `true` the JSON is formatted for human reading.
604    #[cfg(feature = "json")]
605    pub fn write_json<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
606        let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
607        if pretty {
608            serde_json::to_writer_pretty(&mut buf, value)?;
609        } else {
610            serde_json::to_writer(&mut buf, value)?;
611        }
612        buf.flush()
613    }
614
615    /// Serialize and write.
616    ///
617    /// If `pretty` is `true` the TOML is formatted for human reading.
618    #[cfg(feature = "toml")]
619    pub fn write_toml<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
620        let toml = if pretty {
621            toml::ser::to_string_pretty(value)
622        } else {
623            toml::ser::to_string(value)
624        }
625        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
626
627        self.write_all(toml.as_bytes())
628    }
629
630    /// Serialize and write.
631    ///
632    /// If `pretty` is `true` the RON if formatted for human reading using the default pretty config.
633    #[cfg(feature = "ron")]
634    pub fn write_ron<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
635        let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
636        if pretty {
637            ron::ser::to_writer_pretty(&mut buf, value, Default::default()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
638        } else {
639            ron::ser::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
640        }
641        buf.flush()
642    }
643
644    /// Serialize and write.
645    #[cfg(feature = "yaml")]
646    pub fn write_yaml<O: serde::Serialize>(&mut self, value: &O) -> io::Result<()> {
647        let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
648        serde_yaml::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
649        buf.flush()
650    }
651
652    /// Commit write, flush and replace the actual file with the new one.
653    pub fn commit(mut self) -> io::Result<()> {
654        let r = self.replace_actual();
655        self.clean();
656        r
657    }
658
659    /// Cancel write, the file will not be updated.
660    pub fn cancel(mut self) {
661        self.clean();
662    }
663
664    fn replace_actual(&mut self) -> io::Result<()> {
665        let mut temp_file = self.temp_file.take().unwrap();
666        temp_file.flush()?;
667        temp_file.sync_all()?;
668
669        unlock_ok(&temp_file).unwrap();
670        drop(temp_file);
671
672        let actual_file = self.actual_file.take().unwrap();
673        unlock_ok(&actual_file)?;
674        drop(actual_file);
675
676        let mut retries = 0;
677        loop {
678            // commit by replacing the actual_path with already on disk temp_path file.
679            match fs::rename(&self.temp_path, &self.actual_path) {
680                Ok(()) => {
681                    break;
682                }
683                Err(e) => match e.kind() {
684                    io::ErrorKind::PermissionDenied => {
685                        if retries == 5 {
686                            // Give-up, we manage to write lock both temp and actual just
687                            // before this, but now we can't replace actual and remove temp.
688                            // Hardware issue? Or another process holding a lock for 1s+50ms*5.
689                            return Err(e);
690                        } else if retries > 0 {
691                            // Second+ retries:
692                            //
693                            // probably a system issue.
694                            //
695                            // Windows sporadically returns ACCESS_DENIED for kernel!SetRenameInformationFile in
696                            // other apps that use the same save pattern (write-tmp -> close-tmp -> rename).
697                            // see GIMP issue: https://gitlab.gnome.org/GNOME/gimp/-/issues/1370
698                            //
699                            // I used procmon to trace all file operations, there is no other app trying to use
700                            // the temp and actual files when the ACCESS_DENIED occurs, both files are unlocked and
701                            // closed before the rename calls start. This might be a Windows bug.
702                            std::thread::sleep(30.ms());
703                        } else {
704                            // first retry:
705                            //
706                            // probably another process reading the `actual_path`.
707                            //
708                            // Reacquire a write lock and unlock, just to wait the external app.
709                            match std::fs::File::options().write(true).open(&self.actual_path) {
710                                Ok(f) => {
711                                    if lock_exclusive(&f, 10.secs()).is_ok() {
712                                        // acquired actual ok, retry
713                                        let _ = unlock_ok(&f);
714                                    }
715                                }
716                                Err(e) => match e.kind() {
717                                    io::ErrorKind::NotFound => {
718                                        // all good, rename will create actual
719                                        continue;
720                                    }
721                                    _ => {
722                                        // unknown error, let retry handle it
723                                        std::thread::sleep(30.ms());
724                                    }
725                                },
726                            }
727                        }
728
729                        retries += 1;
730                    }
731                    _ => return Err(e),
732                },
733            }
734        }
735
736        Ok(())
737    }
738
739    fn clean(&mut self) {
740        self.cleaned = true;
741
742        if let Some(tmp) = self.temp_file.take() {
743            let _ = FileExt::unlock(&tmp);
744        }
745        if let Err(e) = fs::remove_file(&self.temp_path) {
746            tracing::debug!("failed to cleanup temp file, {e}")
747        }
748
749        if let Some(file) = self.actual_file.take() {
750            let _ = FileExt::unlock(&file);
751        }
752
753        let transaction = self.transaction_lock.take().unwrap();
754        let _ = FileExt::unlock(&transaction);
755        let _ = fs::remove_file(&self.transaction_path);
756    }
757}
758
/// Error for [`WatchFile::parse`].
///
/// Either the file could not be read or the text read could not be parsed.
#[derive(Debug)]
pub enum WatchFileParseError<E> {
    /// Error reading the file.
    Io(io::Error),
    /// Error parsing the file.
    Parse(E),
}
impl<E> From<io::Error> for WatchFileParseError<E> {
    /// Wraps the IO error in [`WatchFileParseError::Io`].
    fn from(value: io::Error) -> Self {
        WatchFileParseError::Io(value)
    }
}
impl<E> fmt::Display for WatchFileParseError<E>
where
    E: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(e) => f.write_fmt(format_args!("read error, {e}")),
            Self::Parse(e) => f.write_fmt(format_args!("parse error, {e}")),
        }
    }
}
impl<E> std::error::Error for WatchFileParseError<E>
where
    E: std::error::Error + 'static,
{
    /// The inner error is always the source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let inner: &(dyn std::error::Error + 'static) = match self {
            Self::Io(e) => e,
            Self::Parse(e) => e,
        };
        Some(inner)
    }
}
788
/// Represents a [`FsChange`] note.
///
/// This trait is already implemented for all types it applies to, that is, every type
/// that is `Debug + Any + Send + Sync`.
#[diagnostic::on_unimplemented(note = "`FsChangeNote` is implemented for all `T: Debug + Any + Send + Sync`")]
pub trait FsChangeNote: fmt::Debug + std::any::Any + Send + Sync {
    /// Access any.
    ///
    /// The returned reference can be downcast to the concrete note type. When the note is behind
    /// a smart pointer, like `Arc<dyn FsChangeNote>`, deref to the inner `dyn FsChangeNote` before
    /// calling this method, otherwise the pointer itself is the value returned as `Any`.
    fn as_any(&self) -> &dyn std::any::Any;
}
impl<T> FsChangeNote for T
where
    T: fmt::Debug + std::any::Any + Send + Sync,
{
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
802
803/// Handle that holds a [`WATCHER.annotate`] note.
804///
805/// [`WATCHER.annotate`]: WATCHER::annotate
806#[derive(Clone)]
807#[must_use = "the note is removed when the handle is dropped"]
808pub struct FsChangeNoteHandle(#[expect(dead_code)] Arc<Arc<dyn FsChangeNote>>);
809
/// Annotation for file watcher events and var update tags.
///
/// Identifies the [`WATCHER.sync`] file that is currently being written to.
///
/// The note derefs to the [`Path`] of the file.
///
/// [`WATCHER.sync`]: WATCHER::sync
#[derive(Debug, PartialEq, Eq)]
pub struct WatcherSyncWriteNote(PathBuf);
impl WatcherSyncWriteNote {
    /// Deref.
    ///
    /// Returns the same path as the [`Deref`](ops::Deref) implementation.
    pub fn as_path(&self) -> &Path {
        self.0.as_path()
    }
}
impl ops::Deref for WatcherSyncWriteNote {
    type Target = Path;

    fn deref(&self) -> &Path {
        &self.0
    }
}
830
831/// File system change event types.
832///
833/// The event for each change is available in [`FsChange::event`].
834///
835/// This module re-exports types from the [`notify`](https://docs.rs/notify) crate.
836pub mod fs_event {
837    pub use notify::event::{
838        AccessKind, AccessMode, CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind, RenameMode,
839    };
840    pub use notify::{Error, ErrorKind};
841}
842
843/// Represents a single file system change, annotated.
844#[derive(Debug)]
845pub struct FsChange {
846    /// All [`WATCHER.annotate`] that where set when this event happened.
847    ///
848    /// [`WATCHER.annotate`]: WATCHER::annotate
849    pub notes: Vec<Arc<dyn FsChangeNote>>,
850
851    /// The actual notify event or error.
852    pub event: Result<fs_event::Event, fs_event::Error>,
853}
854impl FsChange {
855    /// If the change affects the `path`.
856    pub fn is_for_path(&self, path: &Path) -> bool {
857        if let Ok(ev) = &self.event {
858            return ev.paths.iter().any(|p| p.starts_with(path));
859        }
860        false
861    }
862
863    /// If the change affects any path matched by the glob pattern.
864    pub fn is_for_glob(&self, pattern: &glob::Pattern) -> bool {
865        if let Ok(ev) = &self.event {
866            return ev.paths.iter().any(|p| pattern.matches_path(p));
867        }
868        false
869    }
870
871    /// Iterate over all notes of the type `T`.
872    pub fn notes<T: FsChangeNote>(&self) -> impl Iterator<Item = &T> {
873        self.notes.iter().filter_map(|n| FsChangeNote::as_any(&**n).downcast_ref::<T>())
874    }
875}
876
event_args! {
    /// [`FS_CHANGES_EVENT`] arguments.
    ///
    /// Each [`FsChange`] holds the notify event or error and the notes that were set when it happened.
    pub struct FsChangesArgs {
        /// All notify changes since the last event.
        pub changes: Arc<Vec<FsChange>>,

        ..

        /// None, only app level handlers receive this event.
        fn delivery_list(&self, list: &mut UpdateDeliveryList) {
            // nothing is inserted in the list, this only marks the parameter as used.
            let _ = list;
        }
    }
}
891impl FsChangesArgs {
892    /// Iterate over all change events.
893    pub fn events(&self) -> impl Iterator<Item = &fs_event::Event> + '_ {
894        self.changes.iter().filter_map(|r| r.event.as_ref().ok())
895    }
896
897    /// Iterate over all file watcher errors.
898    pub fn errors(&self) -> impl Iterator<Item = &notify::Error> + '_ {
899        self.changes.iter().filter_map(|r| r.event.as_ref().err())
900    }
901
902    /// Returns `true` is some events where lost.
903    ///
904    /// This indicates either a lapse in the events or a change in the filesystem such that events
905    /// received so far can no longer be relied on to represent the state of the filesystem now.
906    ///
907    /// An application that simply reacts to file changes may not care about this. An application
908    /// that keeps an in-memory representation of the filesystem will need to care, and will need
909    /// to refresh that representation directly from the filesystem.
910    pub fn rescan(&self) -> bool {
911        self.events().any(|e| e.need_rescan())
912    }
913
914    /// Iterate over all changes that affects paths selected by the `glob` pattern.
915    pub fn changes_for(&self, glob: &str) -> Result<impl Iterator<Item = &FsChange> + '_, glob::PatternError> {
916        let glob = glob::Pattern::new(glob)?;
917        Ok(self.changes.iter().filter(move |c| c.is_for_glob(&glob)))
918    }
919
920    /// Iterate over all changes that affects paths that are equal to `path` or inside it.
921    pub fn changes_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a FsChange> + 'a {
922        self.changes.iter().filter(move |c| c.is_for_path(path))
923    }
924
925    /// Iterate over all change events that affects that are equal to `path` or inside it.
926    pub fn events_for(&self, glob: &str) -> Result<impl Iterator<Item = &fs_event::Event> + '_, glob::PatternError> {
927        let glob = glob::Pattern::new(glob)?;
928        Ok(self.events().filter(move |ev| ev.paths.iter().any(|p| glob.matches_path(p))))
929    }
930
931    /// Iterate over all change events that affects paths that are equal to `path` or inside it.
932    pub fn events_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a fs_event::Event> + 'a {
933        self.events().filter(move |ev| ev.paths.iter().any(|p| p.starts_with(path)))
934    }
935}
936
event! {
    /// Event sent by the [`WATCHER`] service on directories or files that are watched.
    ///
    /// The event arguments are [`FsChangesArgs`], only app level handlers receive this event.
    pub static FS_CHANGES_EVENT: FsChangesArgs;
}
941
942/// Represents an active file or directory watcher in [`WATCHER`].
943#[derive(Clone)]
944#[must_use = "the watcher is dropped if the handle is dropped"]
945pub struct WatcherHandle(Handle<()>);
946
947impl WatcherHandle {
948    /// Handle to no watcher.
949    pub fn dummy() -> Self {
950        Self(Handle::dummy(()))
951    }
952
953    /// If [`perm`](Self::perm) was called in another clone of this handle.
954    ///
955    /// If `true` the resource will stay in memory for the duration of the app, unless [`force_drop`](Self::force_drop)
956    /// is also called.
957    pub fn is_permanent(&self) -> bool {
958        self.0.is_permanent()
959    }
960
961    /// Force drops the watcher, meaning it will be dropped even if there are other handles active.
962    pub fn force_drop(self) {
963        self.0.force_drop()
964    }
965
966    /// If the watcher is dropped.
967    pub fn is_dropped(&self) -> bool {
968        self.0.is_dropped()
969    }
970
971    /// Drop the handle without dropping the watcher, the watcher will stay active for the
972    /// duration of the app process.
973    pub fn perm(self) {
974        self.0.perm()
975    }
976}