zng_ext_fs_watcher/lib.rs
1#![doc(html_favicon_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo.png")]
3//!
4//! File system events and service.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9// suppress nag about very simple boxed closure signatures.
10#![expect(clippy::type_complexity)]
11#![warn(unused_extern_crates)]
12#![warn(missing_docs)]
13
14use std::{
15 fmt, fs,
16 io::{self, Write as _},
17 ops,
18 path::{Path, PathBuf},
19 sync::Arc,
20 time::Duration,
21};
22
23use path_absolutize::Absolutize;
24use zng_app::{
25 AppExtension,
26 event::{EventHandle, event, event_args},
27 handler::{AppHandler, FilterAppHandler},
28 update::EventUpdate,
29 view_process::raw_events::LOW_MEMORY_EVENT,
30};
31use zng_handle::Handle;
32use zng_txt::Txt;
33use zng_unit::TimeUnits;
34use zng_var::{ArcVar, ReadOnlyArcVar, VarValue};
35
36mod service;
37use service::*;
38
39mod lock;
40use lock::*;
41
42/// Application extension that provides file system change events and service.
43///
44/// # Events
45///
46/// Events this extension provides.
47///
48/// * [`FS_CHANGES_EVENT`]
49///
50/// # Services
51///
52/// Services this extension provides.
53///
54/// * [`WATCHER`]
55#[derive(Default)]
56pub struct FsWatcherManager {}
57impl AppExtension for FsWatcherManager {
58 fn init(&mut self) {
59 WATCHER_SV.write().init_watcher();
60 }
61
62 fn event_preview(&mut self, update: &mut EventUpdate) {
63 if let Some(args) = FS_CHANGES_EVENT.on(update) {
64 WATCHER_SV.write().event(args);
65 } else if LOW_MEMORY_EVENT.on(update).is_some() {
66 WATCHER_SV.write().low_memory();
67 }
68 }
69
70 fn update_preview(&mut self) {
71 WATCHER_SV.write().update();
72 }
73
74 fn deinit(&mut self) {
75 let mut flush = WATCHER_SV.write().shutdown();
76 for v in &mut flush {
77 v.flush_shutdown();
78 }
79 }
80}
81
82/// File system watcher service.
83///
84/// This is mostly a wrapper around the [`notify`](https://docs.rs/notify) crate, integrating it with events and variables.
85pub struct WATCHER;
86impl WATCHER {
87 /// Gets a read-write variable that defines interval awaited between each [`FS_CHANGES_EVENT`]. If
88 /// a watched path is constantly changing an event will be emitted every elapse of this interval,
89 /// the event args will contain a list of all the changes observed during the interval.
90 ///
91 /// Note that the first event notifies immediately, only subsequent events within this interval are debounced.
92 ///
93 /// Is `100.ms()` by default.
94 pub fn debounce(&self) -> ArcVar<Duration> {
95 WATCHER_SV.read().debounce.clone()
96 }
97
98 /// Gets a read-write variable that defines interval awaited between each [`sync`] write.
99 ///
100 /// Is `100.ms()` by default.
101 ///
102 /// [`sync`]: WATCHER::sync
103 pub fn sync_debounce(&self) -> ArcVar<Duration> {
104 WATCHER_SV.read().debounce.clone()
105 }
106
107 /// Gets a read-write variable that defines the fallback poll watcher interval.
108 ///
109 /// When an efficient watcher cannot be used a poll watcher fallback is used, the poll watcher reads
110 /// the directory or path every elapse of this interval. The poll watcher is also used for paths that
111 /// do not exist yet, that is also affected by this interval.
112 ///
113 /// Is `1.secs()` by default.
114 pub fn poll_interval(&self) -> ArcVar<Duration> {
115 WATCHER_SV.read().poll_interval.clone()
116 }
117
118 /// Maximum time the service keeps the process alive to process pending IO operations when the app shuts down.
119 ///
120 /// Is 1 minute by default.
121 pub fn shutdown_timeout(&self) -> ArcVar<Duration> {
122 WATCHER_SV.read().shutdown_timeout.clone()
123 }
124
125 /// Enable file change events for the `file`.
126 ///
127 /// Returns a handle that will stop the file watch when dropped, if there is no other active handler for the same file.
128 ///
129 /// Note that this is implemented by actually watching the parent directory and filtering the events, this is done
130 /// to ensure the watcher survives operations that remove the file and then move another file to the same path.
131 ///
132 /// See [`watch_dir`] for more details.
133 ///
134 /// [`watch_dir`]: WATCHER::watch_dir
135 pub fn watch(&self, file: impl Into<PathBuf>) -> WatcherHandle {
136 WATCHER_SV.write().watch(file.into())
137 }
138
139 /// Enable file change events for files inside `dir`, also include inner directories if `recursive` is `true`.
140 ///
141 /// Returns a handle that will stop the dir watch when dropped, if there is no other active handler for the same directory.
142 ///
143 /// The directory will be watched using an OS specific efficient watcher provided by the [`notify`](https://docs.rs/notify) crate. If there is
144 /// any error creating the watcher, such as if the directory does not exist yet a slower polling watcher will retry periodically
145 /// until the efficient watcher can be created or the handle is dropped.
146 pub fn watch_dir(&self, dir: impl Into<PathBuf>, recursive: bool) -> WatcherHandle {
147 WATCHER_SV.write().watch_dir(dir.into(), recursive)
148 }
149
150 /// Read a file into a variable, the `init` value will start the variable and the `read` closure will be called
151 /// once immediately and every time the file changes, if the closure returns `Some(O)` the variable updates with the new value.
152 ///
153 /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
154 /// background thread.
155 ///
156 /// [`task::wait`]: zng_task::wait
157 pub fn read<O: VarValue>(
158 &self,
159 file: impl Into<PathBuf>,
160 init: O,
161 read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
162 ) -> ReadOnlyArcVar<O> {
163 WATCHER_SV.write().read(file.into(), init, read)
164 }
165
166 /// Same operation as [`read`] but also tracks the operation status in a second var.
167 ///
168 /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
169 /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
170 /// only updates to idle when the new value is available on the var, or because read the same value.
171 ///
172 /// [`read`]: Self::read
173 pub fn read_status<O, S, E>(
174 &self,
175 file: impl Into<PathBuf>,
176 init: O,
177 read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, E> + Send + 'static,
178 ) -> (ReadOnlyArcVar<O>, ReadOnlyArcVar<S>)
179 where
180 O: VarValue,
181 S: WatcherReadStatus<E>,
182 {
183 WATCHER_SV.write().read_status(file.into(), init, read)
184 }
185
186 /// Read a directory into a variable, the `init` value will start the variable and the `read` closure will be called
187 /// once immediately and every time any changes happen inside the dir, if the closure returns `Some(O)` the variable updates with the new value.
188 ///
189 /// The `read` closure parameter is a directory walker from the [`walkdir`](https://docs.rs/walkdir) crate.
190 ///
191 /// The directory walker is pre-configured to skip the `dir` itself and to have a max-depth of 1 if not `recursive`, these configs can.
192 ///
193 /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
194 /// background thread.
195 ///
196 /// [`task::wait`]: zng_task::wait
197 pub fn read_dir<O: VarValue>(
198 &self,
199 dir: impl Into<PathBuf>,
200 recursive: bool,
201 init: O,
202 read: impl FnMut(walkdir::WalkDir) -> Option<O> + Send + 'static,
203 ) -> ReadOnlyArcVar<O> {
204 WATCHER_SV.write().read_dir(dir.into(), recursive, init, read)
205 }
206
207 /// Same operation as [`read_dir`] but also tracks the operation status in a second var.
208 ///
209 /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
210 /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
211 /// only updates to idle when the new value is available on the var, or because read the same value.
212 ///
213 /// [`read_dir`]: Self::read_dir
214 pub fn read_dir_status<O, S, E>(
215 &self,
216 dir: impl Into<PathBuf>,
217 recursive: bool,
218 init: O,
219 read: impl FnMut(walkdir::WalkDir) -> Result<Option<O>, E> + Send + 'static,
220 ) -> (ReadOnlyArcVar<O>, ReadOnlyArcVar<S>)
221 where
222 O: VarValue,
223 S: WatcherReadStatus<E>,
224 {
225 WATCHER_SV.write().read_dir_status(dir.into(), recursive, init, read)
226 }
227
228 /// Bind a file with a variable, the `file` will be `read` when it changes and be `write` when the variable changes,
229 /// writes are only applied on success and will not cause a `read` on the same sync task. The `init` value is used to
230 /// create the variable, if the `file` exists it will be `read` once at the beginning.
231 ///
232 /// Dropping the variable drops the read watch. The `read` and `write` closures are non-blocking, they are called in a [`task::wait`]
233 /// background thread.
234 ///
235 /// # Sync
236 ///
237 /// The file synchronization ensures that the file is only actually modified when writing is finished by writing
238 /// to a temporary file and committing a replace only if the write succeeded. The file is write-locked for the duration
239 /// of `write` call, but the contents are not touched until commit. See [`WriteFile`] for more details.
240 ///
241 /// The [`FsWatcherManager`] blocks on app exit until all writes commit or cancel.
242 ///
243 /// ## Read Errors
244 ///
245 /// Not-found errors are handled by the watcher by calling `write` using the current variable value, other read errors
246 /// are passed to `read`. If `read` returns a value for an error the `write` closure is called to override the file,
247 /// otherwise only the variable is set and this variable update does not cause a `write`.
248 ///
249 /// ## Write Errors
250 ///
251 /// If `write` fails the file is not touched and the temporary file is removed, if the file path
252 /// does not exit all missing parent folders and the file will be created automatically before the `write`
253 /// call.
254 ///
255 /// Note that [`WriteFile::commit`] must be called to flush the temporary file and attempt to rename
256 /// it, if the file is dropped without commit it will cancel and log an error, you must call [`WriteFile::cancel`]
257 /// to correctly avoid writing.
258 ///
259 /// If the cleanup after commit fails the error is logged and ignored.
260 ///
261 /// If write fails to even create the file and/or acquire a write lock on it this error is the input for
262 /// the `write` closure.
263 ///
264 /// ## Error Handling
265 ///
266 /// You can call services or set other variables from inside the `read` and `write` closures, this can be
267 /// used to get a signal out that perhaps drops the sync var (to stop watching), alert the user that the
268 /// file is out of sync and initiate some sort of recovery routine.
269 ///
270 /// If the file synchronization is not important you can just ignore it, the watcher will try again
271 /// on the next variable or file update.
272 ///
273 /// ## Status
274 ///
275 /// Note that `read` and `write` run in background task threads, so if you are tracking the operation
276 /// status in a separate variable you may end-up with synchronization bugs between th status variable
277 /// and the actual result variable, you can use [`sync_status`] to implement racing-free status tracking.
278 ///
279 /// [`sync_status`]: Self::sync_status
280 /// [`task::wait`]: zng_task::wait
281 pub fn sync<O: VarValue>(
282 &self,
283 file: impl Into<PathBuf>,
284 init: O,
285 read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
286 write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
287 ) -> ArcVar<O> {
288 WATCHER_SV.write().sync(file.into(), init, read, write)
289 }
290
291 /// Same operation as [`sync`] but also tracks the operation status in a second var.
292 ///
293 /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
294 /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
295 /// only updates to idle when the new sync value is available, or because read the same value.
296 ///
297 /// The status variable is set to [`WatcherSyncStatus::writing`] as soon as it updates and
298 /// is set to [`WatcherReadStatus::idle`] only when the new sync value is available, either
299 /// by update or because read the same value.
300 ///
301 /// [`sync`]: Self::sync
302 pub fn sync_status<O, S, ER, EW>(
303 &self,
304 file: impl Into<PathBuf>,
305 init: O,
306 read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, ER> + Send + 'static,
307 write: impl FnMut(O, io::Result<WriteFile>) -> Result<(), EW> + Send + 'static,
308 ) -> (ArcVar<O>, ReadOnlyArcVar<S>)
309 where
310 O: VarValue,
311 S: WatcherSyncStatus<ER, EW>,
312 {
313 WATCHER_SV.write().sync_status(file.into(), init, read, write)
314 }
315
316 /// Watch `file` and calls `handler` every time it changes.
317 ///
318 /// Note that the `handler` is blocking, use [`async_app_hn!`] and [`task::wait`] to run IO without
319 /// blocking the app.
320 ///
321 /// [`async_app_hn!`]: macro@zng_app::handler::async_app_hn
322 /// [`task::wait`]: zng_task::wait
323 pub fn on_file_changed(&self, file: impl Into<PathBuf>, handler: impl AppHandler<FsChangesArgs>) -> EventHandle {
324 let file = file.into();
325 let handle = self.watch(file.clone());
326 FS_CHANGES_EVENT.on_event(FilterAppHandler::new(handler, move |args| {
327 let _handle = &handle;
328 args.events_for_path(&file).next().is_some()
329 }))
330 }
331
332 /// Watch `dir` and calls `handler` every time something inside it changes.
333 ///
334 /// Note that the `handler` is blocking, use [`async_app_hn!`] and [`task::wait`] to run IO without
335 /// blocking the app.
336 ///
337 /// [`async_app_hn!`]: macro@zng_app::handler::async_app_hn
338 /// [`task::wait`]: zng_task::wait
339 pub fn on_dir_changed(&self, dir: impl Into<PathBuf>, recursive: bool, handler: impl AppHandler<FsChangesArgs>) -> EventHandle {
340 let dir = dir.into();
341 let handle = self.watch_dir(dir.clone(), recursive);
342 FS_CHANGES_EVENT.on_event(FilterAppHandler::new(handler, move |args| {
343 let _handle = &handle;
344 args.events_for_path(&dir).next().is_some()
345 }))
346 }
347
348 /// Push a `note` that will be cloned on all subsequent change events until it the returned handle is dropped.
349 ///
350 /// This can be used to tag all events that happened over a period of time, something you can't do just
351 /// by receiving the events due to async delays caused by debounce.
352 ///
353 /// Note that the underlying system events the [`notify`](https://docs.rs/notify) crate uses are not guaranteed to be synchronous.
354 pub fn annotate(&self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
355 WATCHER_SV.write().annotate(note)
356 }
357}
358
359/// Represents a status type for [`WATCHER.sync_status`].
360///
361/// [`WATCHER.sync_status`]: WATCHER::sync_status
362pub trait WatcherSyncStatus<ER = io::Error, EW = io::Error>: WatcherReadStatus<ER> {
363 /// New writing value.
364 fn writing() -> Self;
365 /// New write error value.
366 fn write_error(e: EW) -> Self;
367}
368
369/// Represents a status type for [`WATCHER`] read-only operations.
370pub trait WatcherReadStatus<ER = io::Error>: VarValue + PartialEq {
371 /// New idle value.
372 fn idle() -> Self;
373 /// New reading value.
374 fn reading() -> Self;
375 /// New read error value.
376 fn read_error(e: ER) -> Self;
377}
378
379/// Represents an open read-only file provided by [`WATCHER.read`].
380///
381/// This type is a thin wrapper around the [`std::fs::File`] with some convenience parsing methods.
382///
383/// [`WATCHER.read`]: WATCHER::read
384#[derive(Debug)]
385pub struct WatchFile(fs::File);
386impl WatchFile {
387 /// Open read the file.
388 pub fn open(file: impl AsRef<Path>) -> io::Result<Self> {
389 Self::try_open_non_empty(file.as_ref(), true)
390 }
391 fn try_open_non_empty(path: &Path, retry: bool) -> io::Result<Self> {
392 let file = fs::File::open(path)?;
393
394 if retry && file.metadata()?.len() == 0 {
395 // some apps create an empty file unlocked, then write.
396 let _ = file;
397 std::thread::sleep(5.ms());
398 return Self::try_open_non_empty(path, false);
399 }
400
401 lock_shared(&file, Duration::from_secs(10))?;
402 Ok(Self(file))
403 }
404
405 /// Read the file contents as a text string.
406 pub fn text(&mut self) -> io::Result<Txt> {
407 self.string().map(Txt::from)
408 }
409
410 /// Read the file contents as a string.
411 pub fn string(&mut self) -> io::Result<String> {
412 use std::io::Read;
413 let mut s = String::new();
414 self.0.read_to_string(&mut s)?;
415 Ok(s)
416 }
417
418 /// Deserialize the file contents as JSON.
419 #[cfg(feature = "json")]
420 pub fn json<O>(&mut self) -> serde_json::Result<O>
421 where
422 O: serde::de::DeserializeOwned,
423 {
424 serde_json::from_reader(io::BufReader::new(&mut self.0))
425 }
426
427 /// Deserialize the file contents as TOML.
428 #[cfg(feature = "toml")]
429 pub fn toml<O>(&mut self) -> io::Result<O>
430 where
431 O: serde::de::DeserializeOwned,
432 {
433 use std::io::Read;
434 let mut buf = io::BufReader::new(&mut self.0);
435
436 let mut toml_str = String::new();
437 buf.read_to_string(&mut toml_str)?;
438
439 toml::de::from_str(&toml_str).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
440 }
441
442 /// Deserialize the file content as RON.
443 #[cfg(feature = "ron")]
444 pub fn ron<O>(&mut self) -> Result<O, ron::de::SpannedError>
445 where
446 O: serde::de::DeserializeOwned,
447 {
448 ron::de::from_reader(io::BufReader::new(&mut self.0))
449 }
450
451 /// Deserialize the file content as YAML.
452 #[cfg(feature = "yaml")]
453 pub fn yaml<O>(&mut self) -> serde_yaml::Result<O>
454 where
455 O: serde::de::DeserializeOwned,
456 {
457 serde_yaml::from_reader(io::BufReader::new(&mut self.0))
458 }
459
460 /// Read file and parse it.
461 pub fn parse<O: std::str::FromStr>(&mut self) -> Result<O, WatchFileParseError<O::Err>> {
462 use std::io::Read;
463 let mut s = String::new();
464 self.0.read_to_string(&mut s)?;
465 O::from_str(&s).map_err(WatchFileParseError::Parse)
466 }
467}
468impl ops::Deref for WatchFile {
469 type Target = fs::File;
470
471 fn deref(&self) -> &Self::Target {
472 &self.0
473 }
474}
475impl ops::DerefMut for WatchFile {
476 fn deref_mut(&mut self) -> &mut Self::Target {
477 &mut self.0
478 }
479}
480impl Drop for WatchFile {
481 fn drop(&mut self) {
482 let _ = FileExt::unlock(&self.0);
483 }
484}
485
486const TRANSACTION_GUID: &str = "6eIw3bYMS0uKaQMkTIQacQ";
487const TRANSACTION_LOCK_EXT: &str = "6eIw3bYMS0uKaQMkTIQacQ-lock.tmp";
488
489/// Represents an open write file provided by [`WATCHER.sync`].
490///
491/// This type actually writes to a temporary file and rename it over the actual file on commit only.
492/// The dereferenced [`fs::File`] is the temporary file, not the actual one.
493///
494/// # Transaction
495///
496/// To minimize the risk of file corruption exclusive locks are used, both the target file and the temp file
497/// are locked. An empty lock file is also used to cover the moment when both files are unlocked for the rename operation
498/// and the moment the temp file is acquired.
499///
500/// The temp file is the actual file path with file extension replaced with `{path/.file-name.ext}.{GUID}-{n}.tmp`, the `n` is a
501/// number from 0 to 999, if a temp file exists unlocked it will be reused.
502///
503/// The lock file is `{path/.file-name.ext}.{GUID}-lock.tmp`. Note that this
504/// lock file only helps for apps that use [`WriteFile`], but even without it the risk is minimal as the slow
505/// write operations are already flushed when it is time to commit.
506///
507/// [`WATCHER.sync`]: WATCHER::sync
508pub struct WriteFile {
509 temp_file: Option<fs::File>,
510 actual_file: Option<fs::File>,
511 transaction_lock: Option<fs::File>,
512
513 actual_path: PathBuf,
514 temp_path: PathBuf,
515 transaction_path: PathBuf,
516
517 cleaned: bool,
518}
519impl Drop for WriteFile {
520 fn drop(&mut self) {
521 if !self.cleaned {
522 tracing::error!("dropped sync write file without commit or cancel");
523 self.clean();
524 }
525 }
526}
527impl ops::Deref for WriteFile {
528 type Target = fs::File;
529
530 fn deref(&self) -> &Self::Target {
531 self.temp_file.as_ref().unwrap()
532 }
533}
534impl ops::DerefMut for WriteFile {
535 fn deref_mut(&mut self) -> &mut Self::Target {
536 self.temp_file.as_mut().unwrap()
537 }
538}
539impl WriteFile {
540 /// Open or create the file.
541 pub fn open(path: PathBuf) -> io::Result<Self> {
542 let actual_path = path.absolutize()?.into_owned();
543 if !actual_path.exists() {
544 if let Some(parent) = actual_path.parent() {
545 std::fs::create_dir_all(parent)?;
546 }
547 }
548
549 let hidden_name = match actual_path.file_name() {
550 Some(n) => format!(".{}", n.to_string_lossy()),
551 None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "expected file name")),
552 };
553
554 let transaction_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_LOCK_EXT}"));
555 let transaction_lock = fs::OpenOptions::new()
556 .create(true)
557 .truncate(true)
558 .write(true)
559 .open(&transaction_path)?;
560
561 const TIMEOUT: Duration = Duration::from_secs(10);
562
563 lock_exclusive(&transaction_lock, TIMEOUT)?;
564
565 let actual_file = fs::OpenOptions::new().write(true).create(true).truncate(false).open(&actual_path)?;
566 lock_exclusive(&actual_file, TIMEOUT)?;
567
568 let mut n = 0;
569 let mut temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
570 let temp_file = loop {
571 if let Ok(f) = fs::OpenOptions::new().write(true).create(true).truncate(true).open(&temp_path) {
572 if f.try_lock_exclusive().is_ok() {
573 break f;
574 }
575 }
576
577 n += 1;
578 temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
579 n += 1;
580 if n > 1000 {
581 return Err(io::Error::new(io::ErrorKind::AlreadyExists, "cannot create temporary file"));
582 }
583 };
584
585 Ok(Self {
586 actual_file: Some(actual_file),
587 temp_file: Some(temp_file),
588 transaction_lock: Some(transaction_lock),
589 actual_path,
590 temp_path,
591 transaction_path,
592 cleaned: false,
593 })
594 }
595
596 /// Write the text string.
597 pub fn write_text(&mut self, txt: &str) -> io::Result<()> {
598 self.write_all(txt.as_bytes())
599 }
600
601 /// Serialize and write.
602 ///
603 /// If `pretty` is `true` the JSON is formatted for human reading.
604 #[cfg(feature = "json")]
605 pub fn write_json<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
606 let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
607 if pretty {
608 serde_json::to_writer_pretty(&mut buf, value)?;
609 } else {
610 serde_json::to_writer(&mut buf, value)?;
611 }
612 buf.flush()
613 }
614
615 /// Serialize and write.
616 ///
617 /// If `pretty` is `true` the TOML is formatted for human reading.
618 #[cfg(feature = "toml")]
619 pub fn write_toml<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
620 let toml = if pretty {
621 toml::ser::to_string_pretty(value)
622 } else {
623 toml::ser::to_string(value)
624 }
625 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
626
627 self.write_all(toml.as_bytes())
628 }
629
630 /// Serialize and write.
631 ///
632 /// If `pretty` is `true` the RON if formatted for human reading using the default pretty config.
633 #[cfg(feature = "ron")]
634 pub fn write_ron<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
635 let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
636 if pretty {
637 ron::ser::to_writer_pretty(&mut buf, value, Default::default()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
638 } else {
639 ron::ser::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
640 }
641 buf.flush()
642 }
643
644 /// Serialize and write.
645 #[cfg(feature = "yaml")]
646 pub fn write_yaml<O: serde::Serialize>(&mut self, value: &O) -> io::Result<()> {
647 let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
648 serde_yaml::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
649 buf.flush()
650 }
651
652 /// Commit write, flush and replace the actual file with the new one.
653 pub fn commit(mut self) -> io::Result<()> {
654 let r = self.replace_actual();
655 self.clean();
656 r
657 }
658
659 /// Cancel write, the file will not be updated.
660 pub fn cancel(mut self) {
661 self.clean();
662 }
663
664 fn replace_actual(&mut self) -> io::Result<()> {
665 let mut temp_file = self.temp_file.take().unwrap();
666 temp_file.flush()?;
667 temp_file.sync_all()?;
668
669 unlock_ok(&temp_file).unwrap();
670 drop(temp_file);
671
672 let actual_file = self.actual_file.take().unwrap();
673 unlock_ok(&actual_file)?;
674 drop(actual_file);
675
676 let mut retries = 0;
677 loop {
678 // commit by replacing the actual_path with already on disk temp_path file.
679 match fs::rename(&self.temp_path, &self.actual_path) {
680 Ok(()) => {
681 break;
682 }
683 Err(e) => match e.kind() {
684 io::ErrorKind::PermissionDenied => {
685 if retries == 5 {
686 // Give-up, we manage to write lock both temp and actual just
687 // before this, but now we can't replace actual and remove temp.
688 // Hardware issue? Or another process holding a lock for 1s+50ms*5.
689 return Err(e);
690 } else if retries > 0 {
691 // Second+ retries:
692 //
693 // probably a system issue.
694 //
695 // Windows sporadically returns ACCESS_DENIED for kernel!SetRenameInformationFile in
696 // other apps that use the same save pattern (write-tmp -> close-tmp -> rename).
697 // see GIMP issue: https://gitlab.gnome.org/GNOME/gimp/-/issues/1370
698 //
699 // I used procmon to trace all file operations, there is no other app trying to use
700 // the temp and actual files when the ACCESS_DENIED occurs, both files are unlocked and
701 // closed before the rename calls start. This might be a Windows bug.
702 std::thread::sleep(30.ms());
703 } else {
704 // first retry:
705 //
706 // probably another process reading the `actual_path`.
707 //
708 // Reacquire a write lock and unlock, just to wait the external app.
709 match std::fs::File::options().write(true).open(&self.actual_path) {
710 Ok(f) => {
711 if lock_exclusive(&f, 10.secs()).is_ok() {
712 // acquired actual ok, retry
713 let _ = unlock_ok(&f);
714 }
715 }
716 Err(e) => match e.kind() {
717 io::ErrorKind::NotFound => {
718 // all good, rename will create actual
719 continue;
720 }
721 _ => {
722 // unknown error, let retry handle it
723 std::thread::sleep(30.ms());
724 }
725 },
726 }
727 }
728
729 retries += 1;
730 }
731 _ => return Err(e),
732 },
733 }
734 }
735
736 Ok(())
737 }
738
739 fn clean(&mut self) {
740 self.cleaned = true;
741
742 if let Some(tmp) = self.temp_file.take() {
743 let _ = FileExt::unlock(&tmp);
744 }
745 if let Err(e) = fs::remove_file(&self.temp_path) {
746 tracing::debug!("failed to cleanup temp file, {e}")
747 }
748
749 if let Some(file) = self.actual_file.take() {
750 let _ = FileExt::unlock(&file);
751 }
752
753 let transaction = self.transaction_lock.take().unwrap();
754 let _ = FileExt::unlock(&transaction);
755 let _ = fs::remove_file(&self.transaction_path);
756 }
757}
758
759/// Error for [`WatchFile::parse`].
760#[derive(Debug)]
761pub enum WatchFileParseError<E> {
762 /// Error reading the file.
763 Io(io::Error),
764 /// Error parsing the file.
765 Parse(E),
766}
767impl<E> From<io::Error> for WatchFileParseError<E> {
768 fn from(value: io::Error) -> Self {
769 Self::Io(value)
770 }
771}
772impl<E: fmt::Display> fmt::Display for WatchFileParseError<E> {
773 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
774 match self {
775 WatchFileParseError::Io(e) => write!(f, "read error, {e}"),
776 WatchFileParseError::Parse(e) => write!(f, "parse error, {e}"),
777 }
778 }
779}
780impl<E: std::error::Error + 'static> std::error::Error for WatchFileParseError<E> {
781 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
782 match self {
783 WatchFileParseError::Io(e) => Some(e),
784 WatchFileParseError::Parse(e) => Some(e),
785 }
786 }
787}
788
789/// Represents a [`FsChange`] note.
790///
791/// This trait is already implemented for types it applies.
792#[diagnostic::on_unimplemented(note = "`FsChangeNote` is implemented for all `T: Debug + Any + Send + Sync`")]
793pub trait FsChangeNote: fmt::Debug + std::any::Any + Send + Sync {
794 /// Access any.
795 fn as_any(&self) -> &dyn std::any::Any;
796}
797impl<T: fmt::Debug + std::any::Any + Send + Sync> FsChangeNote for T {
798 fn as_any(&self) -> &dyn std::any::Any {
799 self
800 }
801}
802
803/// Handle that holds a [`WATCHER.annotate`] note.
804///
805/// [`WATCHER.annotate`]: WATCHER::annotate
806#[derive(Clone)]
807#[must_use = "the note is removed when the handle is dropped"]
808pub struct FsChangeNoteHandle(#[expect(dead_code)] Arc<Arc<dyn FsChangeNote>>);
809
810/// Annotation for file watcher events and var update tags.
811///
812/// Identifies the [`WATCHER.sync`] file that is currently being written to.
813///
814/// [`WATCHER.sync`]: WATCHER::sync
815#[derive(Debug, PartialEq, Eq)]
816pub struct WatcherSyncWriteNote(PathBuf);
817impl WatcherSyncWriteNote {
818 /// Deref.
819 pub fn as_path(&self) -> &Path {
820 self
821 }
822}
823impl ops::Deref for WatcherSyncWriteNote {
824 type Target = Path;
825
826 fn deref(&self) -> &Self::Target {
827 self.0.as_path()
828 }
829}
830
831/// File system change event types.
832///
833/// The event for each change is available in [`FsChange::event`].
834///
835/// This module re-exports types from the [`notify`](https://docs.rs/notify) crate.
836pub mod fs_event {
837 pub use notify::event::{
838 AccessKind, AccessMode, CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind, RenameMode,
839 };
840 pub use notify::{Error, ErrorKind};
841}
842
843/// Represents a single file system change, annotated.
844#[derive(Debug)]
845pub struct FsChange {
846 /// All [`WATCHER.annotate`] that where set when this event happened.
847 ///
848 /// [`WATCHER.annotate`]: WATCHER::annotate
849 pub notes: Vec<Arc<dyn FsChangeNote>>,
850
851 /// The actual notify event or error.
852 pub event: Result<fs_event::Event, fs_event::Error>,
853}
854impl FsChange {
855 /// If the change affects the `path`.
856 pub fn is_for_path(&self, path: &Path) -> bool {
857 if let Ok(ev) = &self.event {
858 return ev.paths.iter().any(|p| p.starts_with(path));
859 }
860 false
861 }
862
863 /// If the change affects any path matched by the glob pattern.
864 pub fn is_for_glob(&self, pattern: &glob::Pattern) -> bool {
865 if let Ok(ev) = &self.event {
866 return ev.paths.iter().any(|p| pattern.matches_path(p));
867 }
868 false
869 }
870
871 /// Iterate over all notes of the type `T`.
872 pub fn notes<T: FsChangeNote>(&self) -> impl Iterator<Item = &T> {
873 self.notes.iter().filter_map(|n| FsChangeNote::as_any(&**n).downcast_ref::<T>())
874 }
875}
876
877event_args! {
878 /// [`FS_CHANGES_EVENT`] arguments.
879 pub struct FsChangesArgs {
880 /// All notify changes since the last event.
881 pub changes: Arc<Vec<FsChange>>,
882
883 ..
884
885 /// None, only app level handlers receive this event.
886 fn delivery_list(&self, list: &mut UpdateDeliveryList) {
887 let _ = list;
888 }
889 }
890}
891impl FsChangesArgs {
892 /// Iterate over all change events.
893 pub fn events(&self) -> impl Iterator<Item = &fs_event::Event> + '_ {
894 self.changes.iter().filter_map(|r| r.event.as_ref().ok())
895 }
896
897 /// Iterate over all file watcher errors.
898 pub fn errors(&self) -> impl Iterator<Item = ¬ify::Error> + '_ {
899 self.changes.iter().filter_map(|r| r.event.as_ref().err())
900 }
901
902 /// Returns `true` is some events where lost.
903 ///
904 /// This indicates either a lapse in the events or a change in the filesystem such that events
905 /// received so far can no longer be relied on to represent the state of the filesystem now.
906 ///
907 /// An application that simply reacts to file changes may not care about this. An application
908 /// that keeps an in-memory representation of the filesystem will need to care, and will need
909 /// to refresh that representation directly from the filesystem.
910 pub fn rescan(&self) -> bool {
911 self.events().any(|e| e.need_rescan())
912 }
913
914 /// Iterate over all changes that affects paths selected by the `glob` pattern.
915 pub fn changes_for(&self, glob: &str) -> Result<impl Iterator<Item = &FsChange> + '_, glob::PatternError> {
916 let glob = glob::Pattern::new(glob)?;
917 Ok(self.changes.iter().filter(move |c| c.is_for_glob(&glob)))
918 }
919
920 /// Iterate over all changes that affects paths that are equal to `path` or inside it.
921 pub fn changes_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a FsChange> + 'a {
922 self.changes.iter().filter(move |c| c.is_for_path(path))
923 }
924
925 /// Iterate over all change events that affects that are equal to `path` or inside it.
926 pub fn events_for(&self, glob: &str) -> Result<impl Iterator<Item = &fs_event::Event> + '_, glob::PatternError> {
927 let glob = glob::Pattern::new(glob)?;
928 Ok(self.events().filter(move |ev| ev.paths.iter().any(|p| glob.matches_path(p))))
929 }
930
931 /// Iterate over all change events that affects paths that are equal to `path` or inside it.
932 pub fn events_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a fs_event::Event> + 'a {
933 self.events().filter(move |ev| ev.paths.iter().any(|p| p.starts_with(path)))
934 }
935}
936
937event! {
938 /// Event sent by the [`WATCHER`] service on directories or files that are watched.
939 pub static FS_CHANGES_EVENT: FsChangesArgs;
940}
941
942/// Represents an active file or directory watcher in [`WATCHER`].
943#[derive(Clone)]
944#[must_use = "the watcher is dropped if the handle is dropped"]
945pub struct WatcherHandle(Handle<()>);
946
947impl WatcherHandle {
948 /// Handle to no watcher.
949 pub fn dummy() -> Self {
950 Self(Handle::dummy(()))
951 }
952
953 /// If [`perm`](Self::perm) was called in another clone of this handle.
954 ///
955 /// If `true` the resource will stay in memory for the duration of the app, unless [`force_drop`](Self::force_drop)
956 /// is also called.
957 pub fn is_permanent(&self) -> bool {
958 self.0.is_permanent()
959 }
960
961 /// Force drops the watcher, meaning it will be dropped even if there are other handles active.
962 pub fn force_drop(self) {
963 self.0.force_drop()
964 }
965
966 /// If the watcher is dropped.
967 pub fn is_dropped(&self) -> bool {
968 self.0.is_dropped()
969 }
970
971 /// Drop the handle without dropping the watcher, the watcher will stay active for the
972 /// duration of the app process.
973 pub fn perm(self) {
974 self.0.perm()
975 }
976}