nym_async_file_watcher/
lib.rs

1// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::StreamExt;
5use futures::channel::mpsc;
6use notify::event::{DataChange, MetadataKind, ModifyKind};
7use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::time::Duration;
11use tokio::time::Instant;
12
13pub use notify::{Error as NotifyError, Result as NotifyResult};
14
/// Sending half of the unbounded channel over which an [`AsyncFileWatcher`] delivers
/// the `notify` events it decided to propagate.
pub type FileWatcherEventSender = mpsc::UnboundedSender<Event>;
/// Receiving half matching [`FileWatcherEventSender`]; presumably held by whoever
/// consumes the propagated events (not used within this file).
pub type FileWatcherEventReceiver = mpsc::UnboundedReceiver<Event>;
17
/// Simple file watcher that sends a notification whenever there was any change in the watched file.
///
/// Raw `notify` events are received on an internal channel, filtered by kind,
/// de-duplicated per kind over a configurable interval, and then forwarded to the
/// `event_sender` supplied at construction.
pub struct AsyncFileWatcher {
    /// Path that gets registered with (and unregistered from) the underlying watcher.
    path: PathBuf,
    /// The underlying `notify` watcher; its callback feeds `inner_rx`.
    watcher: RecommendedWatcher,
    /// Flag flipped by the internal start/stop helpers and exposed via `is_watching()`.
    is_watching: bool,
    /// Event kinds allowed through. `None` means every kind is propagated;
    /// `Some(kinds)` propagates only events whose kind equals one of the entries.
    filters: Option<Vec<EventKind>>,
    /// Instant at which an event of the given kind was last propagated.
    last_received: HashMap<EventKind, Instant>,
    /// Minimum time that has to pass before another event of the same kind is
    /// propagated again.
    tick_duration: Duration,

    /// Receives the raw results produced by the `notify` watcher callback.
    inner_rx: mpsc::UnboundedReceiver<NotifyResult<Event>>,
    /// Channel on which propagated events are sent out.
    event_sender: FileWatcherEventSender,
}
30
31impl AsyncFileWatcher {
32    pub fn new_file_changes_watcher<P: AsRef<Path>>(
33        path: P,
34        event_sender: FileWatcherEventSender,
35    ) -> NotifyResult<Self> {
36        Self::new(
37            path,
38            event_sender,
39            Some(vec![
40                EventKind::Modify(ModifyKind::Data(DataChange::Content)),
41                EventKind::Modify(ModifyKind::Data(DataChange::Any)),
42                EventKind::Modify(ModifyKind::Metadata(MetadataKind::Any)),
43            ]),
44            None,
45        )
46    }
47
48    pub fn new<P: AsRef<Path>>(
49        path: P,
50        event_sender: FileWatcherEventSender,
51        filters: Option<Vec<EventKind>>,
52        tick_duration: Option<Duration>,
53    ) -> NotifyResult<Self> {
54        let watcher_config = Config::default();
55        let (inner_tx, inner_rx) = mpsc::unbounded();
56        let watcher = RecommendedWatcher::new(
57            move |res| {
58                if let Err(_err) = inner_tx.unbounded_send(res) {
59                    // I guess it's theoretically possible during shutdown?
60                    log::error!(
61                        "failed to send watched file event - the received must have been dropped!"
62                    );
63                }
64            },
65            watcher_config,
66        )?;
67
68        Ok(AsyncFileWatcher {
69            path: path.as_ref().to_path_buf(),
70            watcher,
71            is_watching: false,
72            filters,
73            last_received: HashMap::new(),
74            tick_duration: tick_duration.unwrap_or(Duration::from_secs(5)),
75            inner_rx,
76            event_sender,
77        })
78    }
79
80    pub fn with_filters(mut self, filters: Option<Vec<EventKind>>) -> Self {
81        self.filters = filters;
82        self
83    }
84
85    pub fn with_filter(mut self, filter: EventKind) -> Self {
86        match &mut self.filters {
87            None => {
88                self.filters = Some(vec![filter]);
89            }
90            Some(filters) => filters.push(filter),
91        }
92        self
93    }
94
95    fn should_propagate(&self, event: &Event, now: Instant) -> bool {
96        // when testing I was consistently getting two `Modify(Data(Any))` events in quick succession
97        // (probably to modify content and metadata).
98        // we really only want to propagate one of them
99        if let Some(previous) = self.last_received.get(&event.kind)
100            && now.duration_since(*previous) < self.tick_duration
101        {
102            return false;
103        }
104
105        let Some(filters) = &self.filters else {
106            return true;
107        };
108
109        for filter in filters {
110            if &event.kind == filter {
111                return true;
112            }
113        }
114        false
115    }
116
117    fn start_watching(&mut self) -> NotifyResult<()> {
118        self.is_watching = true;
119        self.watcher.watch(&self.path, RecursiveMode::NonRecursive)
120    }
121
122    fn stop_watching(&mut self) -> NotifyResult<()> {
123        self.is_watching = false;
124        self.watcher.unwatch(&self.path)
125    }
126
127    pub async fn watch(&mut self) -> NotifyResult<()> {
128        self.start_watching()?;
129
130        while let Some(event) = self.inner_rx.next().await {
131            match event {
132                Ok(event) => {
133                    let now = Instant::now();
134                    if self.should_propagate(&event, now) {
135                        self.last_received.insert(event.kind, now);
136                        if let Err(_err) = self.event_sender.unbounded_send(event) {
137                            log::error!("the file watcher receiver has been dropped!");
138                        }
139                    } else {
140                        log::debug!("will not propagate information about {event:?}");
141                    }
142                }
143                Err(err) => {
144                    // TODO: to be determined if this should stop the whole thing or not
145                    // (need to know what kind of errors can be returned)
146                    log::error!(
147                        "encountered an error while watching {:?}: {err}",
148                        self.path.as_path()
149                    );
150                }
151            }
152        }
153
154        self.stop_watching()
155    }
156
157    pub fn is_watching(&self) -> bool {
158        self.is_watching
159    }
160}