zng_ext_fs_watcher/lib.rs
1#![doc(html_favicon_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo.png")]
3//!
4//! File system events and service.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9// suppress nag about very simple boxed closure signatures.
10#![expect(clippy::type_complexity)]
11#![warn(unused_extern_crates)]
12#![warn(missing_docs)]
13
14use std::{
15 fmt, fs,
16 io::{self, Write as _},
17 ops,
18 path::{Path, PathBuf},
19 sync::Arc,
20 time::Duration,
21};
22
23use path_absolutize::Absolutize;
24use zng_app::{
25 AppExtension,
26 event::{EventHandle, event, event_args},
27 handler::{Handler, HandlerExt as _},
28 update::EventUpdate,
29 view_process::raw_events::LOW_MEMORY_EVENT,
30};
31use zng_handle::Handle;
32use zng_txt::Txt;
33use zng_unit::TimeUnits;
34use zng_var::{Var, VarValue};
35
36mod service;
37use service::*;
38
39mod lock;
40use lock::*;
41
42/// Application extension that provides file system change events and service.
43///
44/// # Events
45///
46/// Events this extension provides.
47///
48/// * [`FS_CHANGES_EVENT`]
49///
50/// # Services
51///
52/// Services this extension provides.
53///
54/// * [`WATCHER`]
55#[derive(Default)]
56#[non_exhaustive]
57pub struct FsWatcherManager {}
58impl AppExtension for FsWatcherManager {
59 fn init(&mut self) {
60 WATCHER_SV.write().init_watcher();
61 }
62
63 fn event_preview(&mut self, update: &mut EventUpdate) {
64 if let Some(args) = FS_CHANGES_EVENT.on(update) {
65 WATCHER_SV.write().event(args);
66 } else if LOW_MEMORY_EVENT.on(update).is_some() {
67 WATCHER_SV.write().low_memory();
68 }
69 }
70
71 fn update_preview(&mut self) {
72 WATCHER_SV.write().update();
73 }
74
75 fn deinit(&mut self) {
76 let mut flush = WATCHER_SV.write().shutdown();
77 for v in &mut flush {
78 v.flush_shutdown();
79 }
80 }
81}
82
83/// File system watcher service.
84///
85/// This is mostly a wrapper around the [`notify`](https://docs.rs/notify) crate, integrating it with events and variables.
86///
87/// # Provider
88///
89/// This service is provided by the [`FsWatcherManager`] extension, it will panic if used in an app not extended.
90pub struct WATCHER;
91impl WATCHER {
92 /// Gets a read-write variable that defines interval awaited between each [`FS_CHANGES_EVENT`]. If
93 /// a watched path is constantly changing an event will be emitted every elapse of this interval,
94 /// the event args will contain a list of all the changes observed during the interval.
95 ///
96 /// Note that the first event notifies immediately, only subsequent events within this interval are debounced.
97 ///
98 /// Is `100.ms()` by default.
99 pub fn debounce(&self) -> Var<Duration> {
100 WATCHER_SV.read().debounce.clone()
101 }
102
103 /// Gets a read-write variable that defines interval awaited between each [`sync`] write.
104 ///
105 /// Is `100.ms()` by default.
106 ///
107 /// [`sync`]: WATCHER::sync
108 pub fn sync_debounce(&self) -> Var<Duration> {
109 WATCHER_SV.read().debounce.clone()
110 }
111
112 /// Gets a read-write variable that defines the fallback poll watcher interval.
113 ///
114 /// When an efficient watcher cannot be used a poll watcher fallback is used, the poll watcher reads
115 /// the directory or path every elapse of this interval. The poll watcher is also used for paths that
116 /// do not exist yet, that is also affected by this interval.
117 ///
118 /// Is `1.secs()` by default.
119 pub fn poll_interval(&self) -> Var<Duration> {
120 WATCHER_SV.read().poll_interval.clone()
121 }
122
123 /// Maximum time the service keeps the process alive to finish pending IO operations when the app shuts down.
124 ///
125 /// Is 1 minute by default.
126 pub fn shutdown_timeout(&self) -> Var<Duration> {
127 WATCHER_SV.read().shutdown_timeout.clone()
128 }
129
130 /// Enable file change events for the `file`.
131 ///
132 /// Returns a handle that will stop the file watch when dropped, if there is no other active handler for the same file.
133 ///
134 /// Note that this is implemented by actually watching the parent directory and filtering the events, this is done
135 /// to ensure the watcher survives operations that remove the file and then move another file to the same path.
136 ///
137 /// See [`watch_dir`] for more details.
138 ///
139 /// [`watch_dir`]: WATCHER::watch_dir
140 pub fn watch(&self, file: impl Into<PathBuf>) -> WatcherHandle {
141 WATCHER_SV.write().watch(file.into())
142 }
143
144 /// Enable file change events for files inside `dir`, also include inner directories if `recursive` is `true`.
145 ///
146 /// Returns a handle that will stop the dir watch when dropped, if there is no other active handler for the same directory.
147 ///
148 /// The directory will be watched using an OS specific efficient watcher provided by the [`notify`](https://docs.rs/notify) crate. If there is
149 /// any error creating the watcher, such as if the directory does not exist yet a slower polling watcher will retry periodically
150 /// until the efficient watcher can be created or the handle is dropped.
151 pub fn watch_dir(&self, dir: impl Into<PathBuf>, recursive: bool) -> WatcherHandle {
152 WATCHER_SV.write().watch_dir(dir.into(), recursive)
153 }
154
155 /// Read a file into a variable, the `init` value will start the variable and the `read` closure will be called
156 /// once immediately and every time the file changes, if the closure returns `Some(O)` the variable updates with the new value.
157 ///
158 /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
159 /// background thread.
160 ///
161 /// [`task::wait`]: zng_task::wait
162 pub fn read<O: VarValue>(
163 &self,
164 file: impl Into<PathBuf>,
165 init: O,
166 read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
167 ) -> Var<O> {
168 WATCHER_SV.write().read(file.into(), init, read)
169 }
170
171 /// Same operation as [`read`] but also tracks the operation status in a second var.
172 ///
173 /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
174 /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
175 /// only updates to idle when the new value is available on the var, or because read the same value.
176 ///
177 /// [`read`]: Self::read
178 pub fn read_status<O, S, E>(
179 &self,
180 file: impl Into<PathBuf>,
181 init: O,
182 read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, E> + Send + 'static,
183 ) -> (Var<O>, Var<S>)
184 where
185 O: VarValue,
186 S: WatcherReadStatus<E>,
187 {
188 WATCHER_SV.write().read_status(file.into(), init, read)
189 }
190
191 /// Read a directory into a variable, the `init` value will start the variable and the `read` closure will be called
192 /// once immediately and every time any changes happen inside the dir, if the closure returns `Some(O)` the variable updates with the new value.
193 ///
194 /// The `read` closure parameter is a directory walker from the [`walkdir`](https://docs.rs/walkdir) crate.
195 ///
196 /// The directory walker is pre-configured to skip the `dir` itself and to have a max-depth of 1 if not `recursive`, these configs can.
197 ///
198 /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
199 /// background thread.
200 ///
201 /// [`task::wait`]: zng_task::wait
202 pub fn read_dir<O: VarValue>(
203 &self,
204 dir: impl Into<PathBuf>,
205 recursive: bool,
206 init: O,
207 read: impl FnMut(walkdir::WalkDir) -> Option<O> + Send + 'static,
208 ) -> Var<O> {
209 WATCHER_SV.write().read_dir(dir.into(), recursive, init, read)
210 }
211
212 /// Same operation as [`read_dir`] but also tracks the operation status in a second var.
213 ///
214 /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
215 /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
216 /// only updates to idle when the new value is available on the var, or because read the same value.
217 ///
218 /// [`read_dir`]: Self::read_dir
219 pub fn read_dir_status<O, S, E>(
220 &self,
221 dir: impl Into<PathBuf>,
222 recursive: bool,
223 init: O,
224 read: impl FnMut(walkdir::WalkDir) -> Result<Option<O>, E> + Send + 'static,
225 ) -> (Var<O>, Var<S>)
226 where
227 O: VarValue,
228 S: WatcherReadStatus<E>,
229 {
230 WATCHER_SV.write().read_dir_status(dir.into(), recursive, init, read)
231 }
232
233 /// Bind a file with a variable, the `file` will be `read` when it changes and be `write` when the variable changes,
234 /// writes are only applied on success and will not cause a `read` on the same sync task. The `init` value is used to
235 /// create the variable, if the `file` exists it will be `read` once at the beginning.
236 ///
237 /// Dropping the variable drops the read watch. The `read` and `write` closures are non-blocking, they are called in a [`task::wait`]
238 /// background thread.
239 ///
240 /// # Sync
241 ///
242 /// The file synchronization ensures that the file is only actually modified when write is finished by writing
243 /// to a temporary file and committing a replace only if the write succeeded. The file is write-locked for the duration
244 /// of `write` call, but the contents are not touched until commit. See [`WriteFile`] for more details.
245 ///
246 /// The [`FsWatcherManager`] blocks on app exit until all writes commit or cancel. See [`WATCHER::shutdown_timeout`] for
247 /// more details.
248 ///
249 /// ## Read Errors
250 ///
251 /// Not-found errors are handled by the watcher by calling `write` using the current variable value, other read errors
252 /// are passed to `read`. If `read` returns a value for an error the `write` closure is called to override the file,
253 /// otherwise only the variable is set and this variable update does not cause a `write`.
254 ///
255 /// ## Write Errors
256 ///
257 /// If `write` fails the file is not touched and the temporary file is removed, if the file path
258 /// does not exit all missing parent folders and the file will be created automatically before the `write`
259 /// call.
260 ///
261 /// Note that [`WriteFile::commit`] must be called to flush the temporary file and attempt to rename
262 /// it, if the file is dropped without commit it will cancel and log an error, you must call [`WriteFile::cancel`]
263 /// to correctly avoid writing.
264 ///
265 /// If the cleanup after commit fails the error is logged and ignored.
266 ///
267 /// If write fails to even create the file and/or acquire a write lock on it this error is the input for
268 /// the `write` closure.
269 ///
270 /// ## Error Handling
271 ///
272 /// You can call services or set other variables from inside the `read` and `write` closures, this can be
273 /// used to get a signal out to handle the error, perhaps drop the sync var (to stop watching), alert the user that the
274 /// file is out of sync and initiate some sort of recovery routine.
275 ///
276 /// If the file synchronization is not important you can just ignore it, the watcher will try again
277 /// on the next variable or file update.
278 ///
279 /// ## Status
280 ///
281 /// Note that `read` and `write` run in background task threads, so if you are tracking the operation
282 /// status in a separate variable you may end-up with synchronization bugs between the status variable
283 /// and the actual result variable, you can use [`sync_status`] to implement racing-free status tracking.
284 ///
285 /// [`sync_status`]: Self::sync_status
286 /// [`task::wait`]: zng_task::wait
287 pub fn sync<O: VarValue>(
288 &self,
289 file: impl Into<PathBuf>,
290 init: O,
291 read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
292 write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
293 ) -> Var<O> {
294 WATCHER_SV.write().sync(file.into(), init, read, write)
295 }
296
297 /// Same operation as [`sync`] but also tracks the operation status in a second var.
298 ///
299 /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
300 /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
301 /// only updates to idle when the new sync value is available, or because read the same value.
302 ///
303 /// The status variable is set to [`WatcherSyncStatus::writing`] as soon as it updates and
304 /// is set to [`WatcherReadStatus::idle`] only when the new sync value is available, either
305 /// by update or because read the same value.
306 ///
307 /// [`sync`]: Self::sync
308 pub fn sync_status<O, S, ER, EW>(
309 &self,
310 file: impl Into<PathBuf>,
311 init: O,
312 read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, ER> + Send + 'static,
313 write: impl FnMut(O, io::Result<WriteFile>) -> Result<(), EW> + Send + 'static,
314 ) -> (Var<O>, Var<S>)
315 where
316 O: VarValue,
317 S: WatcherSyncStatus<ER, EW>,
318 {
319 WATCHER_SV.write().sync_status(file.into(), init, read, write)
320 }
321
322 /// Watch `file` and calls `handler` every time it changes.
323 ///
324 /// Note that the `handler` is blocking, use [`async_hn!`] and [`task::wait`] to run IO without
325 /// blocking the app.
326 ///
327 /// [`async_hn!`]: macro@zng_app::handler::async_hn
328 /// [`task::wait`]: zng_task::wait
329 pub fn on_file_changed(&self, file: impl Into<PathBuf>, handler: Handler<FsChangesArgs>) -> EventHandle {
330 let file = file.into();
331 let handle = self.watch(file.clone());
332 FS_CHANGES_EVENT.on_event(handler.filtered(move |args| {
333 let _handle = &handle;
334 args.events_for_path(&file).next().is_some()
335 }))
336 }
337
338 /// Watch `dir` and calls `handler` every time something inside it changes.
339 ///
340 /// Note that the `handler` is blocking, use [`async_hn!`] and [`task::wait`] to run IO without
341 /// blocking the app.
342 ///
343 /// [`async_hn!`]: macro@zng_app::handler::async_hn
344 /// [`task::wait`]: zng_task::wait
345 pub fn on_dir_changed(&self, dir: impl Into<PathBuf>, recursive: bool, handler: Handler<FsChangesArgs>) -> EventHandle {
346 let dir = dir.into();
347 let handle = self.watch_dir(dir.clone(), recursive);
348 FS_CHANGES_EVENT.on_event(handler.filtered(move |args| {
349 let _handle = &handle;
350 args.events_for_path(&dir).next().is_some()
351 }))
352 }
353
354 /// Push a `note` that will be cloned on all subsequent change events until the returned handle is dropped.
355 ///
356 /// This can be used to tag all events that happened over a period of time, something you can't do just
357 /// by receiving the events due to async delays caused by debounce.
358 ///
359 /// Note that the underlying system events the [`notify`](https://docs.rs/notify) crate uses are not guaranteed to be synchronous.
360 pub fn annotate(&self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
361 WATCHER_SV.write().annotate(note)
362 }
363}
364
365/// Represents a status type for [`WATCHER.sync_status`].
366///
367/// [`WATCHER.sync_status`]: WATCHER::sync_status
368pub trait WatcherSyncStatus<ER = io::Error, EW = io::Error>: WatcherReadStatus<ER> {
369 /// New writing value.
370 fn writing() -> Self;
371 /// New write error value.
372 fn write_error(e: EW) -> Self;
373}
374
375/// Represents a status type for [`WATCHER`] read-only operations.
376pub trait WatcherReadStatus<ER = io::Error>: VarValue + PartialEq {
377 /// New idle value.
378 fn idle() -> Self;
379 /// New reading value.
380 fn reading() -> Self;
381 /// New read error value.
382 fn read_error(e: ER) -> Self;
383}
384
385/// Represents an open read-only file provided by [`WATCHER.read`].
386///
387/// This type is a thin wrapper around the [`std::fs::File`] with some convenience parsing methods.
388///
389/// [`WATCHER.read`]: WATCHER::read
390#[derive(Debug)]
391pub struct WatchFile(fs::File);
392impl WatchFile {
393 /// Open read the file.
394 pub fn open(file: impl AsRef<Path>) -> io::Result<Self> {
395 Self::try_open_non_empty(file.as_ref(), true)
396 }
397 fn try_open_non_empty(path: &Path, retry: bool) -> io::Result<Self> {
398 let file = fs::File::open(path)?;
399
400 if retry && file.metadata()?.len() == 0 {
401 // some apps create an empty file unlocked, then write.
402 let _ = file;
403 std::thread::sleep(5.ms());
404 return Self::try_open_non_empty(path, false);
405 }
406
407 lock_shared(&file, Duration::from_secs(10))?;
408 Ok(Self(file))
409 }
410
411 /// Read the file contents as a text string.
412 pub fn text(&mut self) -> io::Result<Txt> {
413 self.string().map(Txt::from)
414 }
415
416 /// Read the file contents as a string.
417 pub fn string(&mut self) -> io::Result<String> {
418 use std::io::Read;
419 let mut s = String::new();
420 self.0.read_to_string(&mut s)?;
421 Ok(s)
422 }
423
424 /// Deserialize the file contents as JSON.
425 #[cfg(feature = "json")]
426 pub fn json<O>(&mut self) -> serde_json::Result<O>
427 where
428 O: serde::de::DeserializeOwned,
429 {
430 serde_json::from_reader(io::BufReader::new(&mut self.0))
431 }
432
433 /// Deserialize the file contents as TOML.
434 #[cfg(feature = "toml")]
435 pub fn toml<O>(&mut self) -> io::Result<O>
436 where
437 O: serde::de::DeserializeOwned,
438 {
439 use std::io::Read;
440 let mut buf = io::BufReader::new(&mut self.0);
441
442 let mut toml_str = String::new();
443 buf.read_to_string(&mut toml_str)?;
444
445 toml::de::from_str(&toml_str).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
446 }
447
448 /// Deserialize the file content as RON.
449 #[cfg(feature = "ron")]
450 pub fn ron<O>(&mut self) -> Result<O, ron::de::SpannedError>
451 where
452 O: serde::de::DeserializeOwned,
453 {
454 ron::de::from_reader(io::BufReader::new(&mut self.0))
455 }
456
457 /// Deserialize the file content as YAML.
458 #[cfg(feature = "yaml")]
459 pub fn yaml<O>(&mut self) -> serde_yaml::Result<O>
460 where
461 O: serde::de::DeserializeOwned,
462 {
463 serde_yaml::from_reader(io::BufReader::new(&mut self.0))
464 }
465
466 /// Read file and parse it.
467 pub fn parse<O: std::str::FromStr>(&mut self) -> Result<O, WatchFileParseError<O::Err>> {
468 use std::io::Read;
469 let mut s = String::new();
470 self.0.read_to_string(&mut s)?;
471 O::from_str(&s).map_err(WatchFileParseError::Parse)
472 }
473}
474impl ops::Deref for WatchFile {
475 type Target = fs::File;
476
477 fn deref(&self) -> &Self::Target {
478 &self.0
479 }
480}
481impl ops::DerefMut for WatchFile {
482 fn deref_mut(&mut self) -> &mut Self::Target {
483 &mut self.0
484 }
485}
486impl Drop for WatchFile {
487 fn drop(&mut self) {
488 let _ = FileExt::unlock(&self.0);
489 }
490}
491
492const TRANSACTION_GUID: &str = "6eIw3bYMS0uKaQMkTIQacQ";
493const TRANSACTION_LOCK_EXT: &str = "6eIw3bYMS0uKaQMkTIQacQ-lock.tmp";
494
495/// Represents an open write file provided by [`WATCHER.sync`].
496///
497/// This struct writes to a temporary file and renames it over the actual file on commit only.
498/// The dereferenced [`fs::File`] is the temporary file, not the actual one.
499///
500/// # Transaction
501///
502/// To minimize the risk of file corruption exclusive locks are used, both the target file and the temp file
503/// are locked. An empty lock file is also used to cover the moment when both files are unlocked for the rename operation
504/// and the moment the temp file is acquired.
505///
506/// The temp file is the actual file path with file extension replaced with `{path/.file-name.ext}.{GUID}-{n}.tmp`, the `n` is a
507/// number from 0 to 999, if a temp file exists unlocked it will be reused.
508///
509/// The lock file is `{path/.file-name.ext}.{GUID}-lock.tmp`. Note that this
510/// lock file only helps for apps that use [`WriteFile`], but even without it the risk is minimal as the slow
511/// write operations are already flushed when it is time to commit.
512///
513/// [`WATCHER.sync`]: WATCHER::sync
514pub struct WriteFile {
515 temp_file: Option<fs::File>,
516 actual_file: Option<fs::File>,
517 transaction_lock: Option<fs::File>,
518
519 actual_path: PathBuf,
520 temp_path: PathBuf,
521 transaction_path: PathBuf,
522
523 cleaned: bool,
524}
525impl Drop for WriteFile {
526 fn drop(&mut self) {
527 if !self.cleaned {
528 tracing::error!("dropped sync write file without commit or cancel");
529 self.clean();
530 }
531 }
532}
533impl ops::Deref for WriteFile {
534 type Target = fs::File;
535
536 fn deref(&self) -> &Self::Target {
537 self.temp_file.as_ref().unwrap()
538 }
539}
540impl ops::DerefMut for WriteFile {
541 fn deref_mut(&mut self) -> &mut Self::Target {
542 self.temp_file.as_mut().unwrap()
543 }
544}
545impl WriteFile {
546 /// Open or create the file.
547 pub fn open(path: PathBuf) -> io::Result<Self> {
548 let actual_path = path.absolutize()?.into_owned();
549 if !actual_path.exists()
550 && let Some(parent) = actual_path.parent()
551 {
552 std::fs::create_dir_all(parent)?;
553 }
554
555 let hidden_name = match actual_path.file_name() {
556 Some(n) => format!(".{}", n.to_string_lossy()),
557 None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "expected file name")),
558 };
559
560 let transaction_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_LOCK_EXT}"));
561 let transaction_lock = fs::OpenOptions::new()
562 .create(true)
563 .truncate(true)
564 .write(true)
565 .open(&transaction_path)?;
566
567 const TIMEOUT: Duration = Duration::from_secs(10);
568
569 lock_exclusive(&transaction_lock, TIMEOUT)?;
570
571 let actual_file = fs::OpenOptions::new().write(true).create(true).truncate(false).open(&actual_path)?;
572 lock_exclusive(&actual_file, TIMEOUT)?;
573
574 let mut n = 0;
575 let mut temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
576 let temp_file = loop {
577 if let Ok(f) = fs::OpenOptions::new().write(true).create(true).truncate(true).open(&temp_path)
578 && let Ok(true) = f.try_lock_exclusive()
579 {
580 break f;
581 }
582
583 n += 1;
584 temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
585 n += 1;
586 if n > 1000 {
587 return Err(io::Error::new(io::ErrorKind::AlreadyExists, "cannot create temporary file"));
588 }
589 };
590
591 Ok(Self {
592 actual_file: Some(actual_file),
593 temp_file: Some(temp_file),
594 transaction_lock: Some(transaction_lock),
595 actual_path,
596 temp_path,
597 transaction_path,
598 cleaned: false,
599 })
600 }
601
602 /// Write the text string.
603 pub fn write_text(&mut self, txt: &str) -> io::Result<()> {
604 self.write_all(txt.as_bytes())
605 }
606
607 /// Serialize and write.
608 ///
609 /// If `pretty` is `true` the JSON is formatted for human reading.
610 #[cfg(feature = "json")]
611 pub fn write_json<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
612 let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
613 if pretty {
614 serde_json::to_writer_pretty(&mut buf, value)?;
615 } else {
616 serde_json::to_writer(&mut buf, value)?;
617 }
618 buf.flush()
619 }
620
621 /// Serialize and write.
622 ///
623 /// If `pretty` is `true` the TOML is formatted for human reading.
624 #[cfg(feature = "toml")]
625 pub fn write_toml<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
626 let toml = if pretty {
627 toml::ser::to_string_pretty(value)
628 } else {
629 toml::ser::to_string(value)
630 }
631 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
632
633 self.write_all(toml.as_bytes())
634 }
635
636 /// Serialize and write.
637 ///
638 /// If `pretty` is `true` the RON if formatted for human reading using the default pretty config.
639 #[cfg(feature = "ron")]
640 pub fn write_ron<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
641 let buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
642 struct Ffs<'a> {
643 w: io::BufWriter<&'a mut fs::File>,
644 }
645 impl fmt::Write for Ffs<'_> {
646 fn write_str(&mut self, s: &str) -> fmt::Result {
647 self.w.write_all(s.as_bytes()).map_err(|_| fmt::Error)
648 }
649 }
650 let mut buf = Ffs { w: buf };
651 if pretty {
652 ron::ser::to_writer_pretty(&mut buf, value, Default::default()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
653 } else {
654 ron::ser::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
655 }
656 buf.w.flush()
657 }
658
659 /// Serialize and write.
660 #[cfg(feature = "yaml")]
661 pub fn write_yaml<O: serde::Serialize>(&mut self, value: &O) -> io::Result<()> {
662 let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
663 serde_yaml::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
664 buf.flush()
665 }
666
667 /// Commit write, flush and replace the actual file with the new one.
668 pub fn commit(mut self) -> io::Result<()> {
669 let r = self.replace_actual();
670 self.clean();
671 r
672 }
673
674 /// Cancel write, the file will not be updated.
675 pub fn cancel(mut self) {
676 self.clean();
677 }
678
679 fn replace_actual(&mut self) -> io::Result<()> {
680 let mut temp_file = self.temp_file.take().unwrap();
681 temp_file.flush()?;
682 temp_file.sync_all()?;
683
684 unlock_ok(&temp_file).unwrap();
685 drop(temp_file);
686
687 let actual_file = self.actual_file.take().unwrap();
688 unlock_ok(&actual_file)?;
689 drop(actual_file);
690
691 let mut retries = 0;
692 loop {
693 // commit by replacing the actual_path with already on disk temp_path file.
694 match fs::rename(&self.temp_path, &self.actual_path) {
695 Ok(()) => {
696 break;
697 }
698 Err(e) => match e.kind() {
699 io::ErrorKind::PermissionDenied => {
700 if retries == 5 {
701 // Give-up, we managed to write lock both temp and actual just
702 // before this, but now we can't replace actual and remove temp.
703 // Hardware issue? Or another process holding a lock for 1s+50ms*5.
704 return Err(e);
705 } else if retries > 0 {
706 // Second+ retries:
707 //
708 // probably a system issue.
709 //
710 // Windows sporadically returns ACCESS_DENIED for kernel!SetRenameInformationFile in
711 // other apps that use the same save pattern (write-tmp -> close-tmp -> rename).
712 // see GIMP issue: https://gitlab.gnome.org/GNOME/gimp/-/issues/1370
713 //
714 // I used procmon to trace all file operations, there is no other app trying to use
715 // the temp and actual files when the ACCESS_DENIED occurs, both files are unlocked and
716 // closed before the rename calls start. This might be a Windows bug.
717 std::thread::sleep(30.ms());
718 } else {
719 // first retry:
720 //
721 // probably another process reading the `actual_path`.
722 //
723 // Reacquire a write lock and unlock, just to wait the external app.
724 match std::fs::File::options().write(true).open(&self.actual_path) {
725 Ok(f) => {
726 if lock_exclusive(&f, 10.secs()).is_ok() {
727 // acquired actual ok, retry
728 let _ = unlock_ok(&f);
729 }
730 }
731 Err(e) => match e.kind() {
732 io::ErrorKind::NotFound => {
733 // all good, rename will create actual
734 continue;
735 }
736 _ => {
737 // unknown error, let retry handle it
738 std::thread::sleep(30.ms());
739 }
740 },
741 }
742 }
743
744 retries += 1;
745 }
746 _ => return Err(e),
747 },
748 }
749 }
750
751 Ok(())
752 }
753
754 fn clean(&mut self) {
755 self.cleaned = true;
756
757 if let Some(tmp) = self.temp_file.take() {
758 let _ = FileExt::unlock(&tmp);
759 }
760 if let Err(e) = fs::remove_file(&self.temp_path) {
761 tracing::debug!("failed to cleanup temp file, {e}")
762 }
763
764 if let Some(file) = self.actual_file.take() {
765 let _ = FileExt::unlock(&file);
766 }
767
768 let transaction = self.transaction_lock.take().unwrap();
769 let _ = FileExt::unlock(&transaction);
770 let _ = fs::remove_file(&self.transaction_path);
771 }
772}
773
774/// Error for [`WatchFile::parse`].
775#[derive(Debug)]
776#[non_exhaustive]
777pub enum WatchFileParseError<E> {
778 /// Error reading the file.
779 Io(io::Error),
780 /// Error parsing the file.
781 Parse(E),
782}
783impl<E> From<io::Error> for WatchFileParseError<E> {
784 fn from(value: io::Error) -> Self {
785 Self::Io(value)
786 }
787}
788impl<E: fmt::Display> fmt::Display for WatchFileParseError<E> {
789 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
790 match self {
791 WatchFileParseError::Io(e) => write!(f, "read error, {e}"),
792 WatchFileParseError::Parse(e) => write!(f, "parse error, {e}"),
793 }
794 }
795}
796impl<E: std::error::Error + 'static> std::error::Error for WatchFileParseError<E> {
797 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
798 match self {
799 WatchFileParseError::Io(e) => Some(e),
800 WatchFileParseError::Parse(e) => Some(e),
801 }
802 }
803}
804
805/// Represents a [`FsChange`] note.
806///
807/// This trait is already implemented for all types it applies.
808#[diagnostic::on_unimplemented(note = "`FsChangeNote` is implemented for all `T: Debug + Any + Send + Sync`")]
809pub trait FsChangeNote: fmt::Debug + std::any::Any + Send + Sync {
810 /// Access any.
811 fn as_any(&self) -> &dyn std::any::Any;
812}
813impl<T: fmt::Debug + std::any::Any + Send + Sync> FsChangeNote for T {
814 fn as_any(&self) -> &dyn std::any::Any {
815 self
816 }
817}
818
819/// Handle that holds a [`WATCHER.annotate`] note.
820///
821/// [`WATCHER.annotate`]: WATCHER::annotate
822#[derive(Clone)]
823#[must_use = "the note is removed when the handle is dropped"]
824pub struct FsChangeNoteHandle(#[expect(dead_code)] Arc<Arc<dyn FsChangeNote>>);
825
826/// Annotation for file watcher events and var update tags.
827///
828/// Identifies the [`WATCHER.sync`] file that is currently being written to.
829///
830/// [`WATCHER.sync`]: WATCHER::sync
831#[derive(Debug, PartialEq, Eq)]
832pub struct WatcherSyncWriteNote(PathBuf);
833impl WatcherSyncWriteNote {
834 /// Deref.
835 pub fn as_path(&self) -> &Path {
836 self
837 }
838}
839impl ops::Deref for WatcherSyncWriteNote {
840 type Target = Path;
841
842 fn deref(&self) -> &Self::Target {
843 self.0.as_path()
844 }
845}
846
847/// File system change event types.
848///
849/// The event for each change is available in [`FsChange::event`].
850///
851/// This module re-exports types from the [`notify`](https://docs.rs/notify) crate.
852pub mod fs_event {
853 pub use notify::event::{
854 AccessKind, AccessMode, CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind, RenameMode,
855 };
856 pub use notify::{Error, ErrorKind};
857}
858
859/// Represents a single file system change, annotated.
860#[derive(Debug)]
861#[non_exhaustive]
862pub struct FsChange {
863 /// All [`WATCHER.annotate`] that where set when this event happened.
864 ///
865 /// [`WATCHER.annotate`]: WATCHER::annotate
866 pub notes: Vec<Arc<dyn FsChangeNote>>,
867
868 /// The actual notify event or error.
869 pub event: Result<fs_event::Event, fs_event::Error>,
870}
871impl FsChange {
872 /// If the change affects the `path`.
873 pub fn is_for_path(&self, path: &Path) -> bool {
874 if let Ok(ev) = &self.event {
875 return ev.paths.iter().any(|p| p.starts_with(path));
876 }
877 false
878 }
879
880 /// If the change affects any path matched by the glob pattern.
881 pub fn is_for_glob(&self, pattern: &glob::Pattern) -> bool {
882 if let Ok(ev) = &self.event {
883 return ev.paths.iter().any(|p| pattern.matches_path(p));
884 }
885 false
886 }
887
888 /// Iterate over all notes of the type `T`.
889 pub fn notes<T: FsChangeNote>(&self) -> impl Iterator<Item = &T> {
890 self.notes.iter().filter_map(|n| FsChangeNote::as_any(&**n).downcast_ref::<T>())
891 }
892}
893
894event_args! {
895 /// [`FS_CHANGES_EVENT`] arguments.
896 pub struct FsChangesArgs {
897 /// All notify changes since the last event.
898 pub changes: Arc<Vec<FsChange>>,
899
900 ..
901
902 /// None, only app level handlers receive this event.
903 fn delivery_list(&self, list: &mut UpdateDeliveryList) {
904 let _ = list;
905 }
906 }
907}
908impl FsChangesArgs {
909 /// Iterate over all change events.
910 pub fn events(&self) -> impl Iterator<Item = &fs_event::Event> + '_ {
911 self.changes.iter().filter_map(|r| r.event.as_ref().ok())
912 }
913
914 /// Iterate over all file watcher errors.
915 pub fn errors(&self) -> impl Iterator<Item = ¬ify::Error> + '_ {
916 self.changes.iter().filter_map(|r| r.event.as_ref().err())
917 }
918
919 /// Returns `true` is some events where lost.
920 ///
921 /// This indicates either a lapse in the events or a change in the filesystem such that events
922 /// received so far can no longer be relied on to represent the state of the filesystem now.
923 ///
924 /// An application that simply reacts to file changes may not care about this. An application
925 /// that keeps an in-memory representation of the filesystem will need to care, and will need
926 /// to refresh that representation directly from the filesystem.
927 pub fn rescan(&self) -> bool {
928 self.events().any(|e| e.need_rescan())
929 }
930
931 /// Iterate over all changes that affects paths selected by the `glob` pattern.
932 pub fn changes_for(&self, glob: &str) -> Result<impl Iterator<Item = &FsChange> + '_, glob::PatternError> {
933 let glob = glob::Pattern::new(glob)?;
934 Ok(self.changes.iter().filter(move |c| c.is_for_glob(&glob)))
935 }
936
937 /// Iterate over all changes that affects paths that are equal to `path` or inside it.
938 pub fn changes_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a FsChange> + 'a {
939 self.changes.iter().filter(move |c| c.is_for_path(path))
940 }
941
942 /// Iterate over all change events that affects that are equal to `path` or inside it.
943 pub fn events_for(&self, glob: &str) -> Result<impl Iterator<Item = &fs_event::Event> + '_, glob::PatternError> {
944 let glob = glob::Pattern::new(glob)?;
945 Ok(self.events().filter(move |ev| ev.paths.iter().any(|p| glob.matches_path(p))))
946 }
947
948 /// Iterate over all change events that affects paths that are equal to `path` or inside it.
949 pub fn events_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a fs_event::Event> + 'a {
950 self.events().filter(move |ev| ev.paths.iter().any(|p| p.starts_with(path)))
951 }
952}
953
954event! {
955 /// Event sent by the [`WATCHER`] service on directories or files that are watched.
956 pub static FS_CHANGES_EVENT: FsChangesArgs;
957}
958
959/// Represents an active file or directory watcher in [`WATCHER`].
960#[derive(Clone)]
961#[must_use = "the watcher is dropped if the handle is dropped"]
962pub struct WatcherHandle(Handle<()>);
963
964impl WatcherHandle {
965 /// Handle to no watcher.
966 pub fn dummy() -> Self {
967 Self(Handle::dummy(()))
968 }
969
970 /// If [`perm`](Self::perm) was called in another clone of this handle.
971 ///
972 /// If `true` the resource will stay in memory for the duration of the app, unless [`force_drop`](Self::force_drop)
973 /// is also called.
974 pub fn is_permanent(&self) -> bool {
975 self.0.is_permanent()
976 }
977
978 /// Force drops the watcher, meaning it will be dropped even if there are other handles active.
979 pub fn force_drop(self) {
980 self.0.force_drop()
981 }
982
983 /// If the watcher is dropped.
984 pub fn is_dropped(&self) -> bool {
985 self.0.is_dropped()
986 }
987
988 /// Drop the handle without dropping the watcher, the watcher will stay active for the
989 /// duration of the app process.
990 pub fn perm(self) {
991 self.0.perm()
992 }
993}