env_watcher/
lib.rs

1#[forbid(unsafe_code)]
2#[forbid(unused_imports)]
3#[forbid(missing_docs)]
4#[cfg(test)]
5mod test;
6#[cfg(feature = "derive")]
7pub mod derive;
8
9use crossbeam_channel::{Receiver, Sender};
10use diff::Diff;
11use regex::Regex;
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15use thiserror::Error;
16use spin_sleep::sleep;
17use log::{info, debug, trace};
18
19pub type Result<T> = std::result::Result<T, Error>;
20
21/// Library error.
22#[derive(Error, Debug)]
23pub enum Error {
24    /// Incorrect pattern for variables
25    #[error("Invalid pattern: {pattern:?}. Error: {error:?}")]
26    InvalidPattern { pattern: String, error: String },
27
28    #[error("Re-init env watcher.")]
29    DoubleInitialWatcher,
30
31    #[error("In current watcher exists subscribers.")]
32    ReinitializedWithSubscribers,
33}
34
35/// Changing the current state for a subscriber
36#[derive(Debug, Clone)]
37pub enum ChangeState {
38    /// Add or change state
39    Edit(String, String),
40
41    /// Delete key
42    Delete(String),
43}
44
45#[derive(Debug, Clone, Hash, Eq, PartialEq)]
46pub enum Subscribe {
47    /// Return all env variables
48    All,
49
50    /// Subscribe by env list
51    /// let v = vec!["my.project.key1", "my.project.key2", "my.project.key3"];
52    /// let subscribe = Subscribe::Envs(v);
53    Envs(Vec<String>),
54
55    /// Subscribe by pattern env.
56    /// Example by pattern:
57    /// let v = vec!["my.project.*", "my.project2.*"];
58    /// let subscribe = Subscribe::PatternEnvs(v);
59    PatternEnvs(Vec<String>),
60}
61
62/// Baseline implementation for data.
63/// A separate thread listens for data changes through the channel, in case of data changes, we receive an event and change the data snapshot.
64pub struct EnvironmentData {
65    /// Snapshot data
66    data: Arc<Mutex<HashMap<String, String>>>,
67
68    /// Channel for receiving changes for a specific key
69    rx: Receiver<ChangeState>,
70}
71
72impl EnvironmentData {
73    /// Getter for snapshot data
74    pub fn data(&self) -> HashMap<String, String> {
75        self.data.lock().unwrap().clone()
76    }
77
78    /// Reference for current snapshot
79    pub fn ref_data(&self) -> Arc<Mutex<HashMap<String, String>>> {
80        Arc::clone(&self.data)
81    }
82
83    /// In a separate thread, we listen to the change of variables
84    pub fn receive(&self) {
85        let snapshot = Arc::clone(&self.data);
86        let rx = self.rx.clone();
87
88        std::thread::spawn(move || loop {
89            let data = rx.recv().unwrap();
90            let mut snapshot = snapshot.lock().unwrap();
91            match data {
92                ChangeState::Edit(k, v) => {
93                    snapshot.insert(k.clone(), v.clone());
94                }
95                ChangeState::Delete(k) => {
96                    snapshot.remove(&*k);
97                }
98            };
99        });
100    }
101}
102
103/// The current state of the environment
104pub struct EnvironmentWatcher {
105    /// Current env state
106    state: Arc<Mutex<HashMap<String, String>>>,
107
108    /// Sender list
109    /// key - subscribe type
110    /// value - sender list, for notification
111    senders: Arc<Mutex<HashMap<Subscribe, Vec<Sender<ChangeState>>>>>,
112
113    /// reading environment variables
114    interval: Duration,
115}
116
117impl EnvironmentWatcher {
118    /// Create a new instance to track the state
119    /// Interval - how often we request data and update the state (if required)
120    pub fn new(interval: Duration) -> Self {
121        info!("Starting env watcher with interval {:?}", &interval);
122        let env_state = Self {
123            state: Arc::new(Mutex::new(Default::default())),
124            senders: Arc::new(Mutex::new(Default::default())),
125            interval,
126        };
127        env_state.preload();
128        env_state.run();
129        env_state
130    }
131
132    /// Preload the environment
133    fn preload(&self) {
134        let mut data = self.state.lock().unwrap();
135        std::env::vars().for_each(|kv| {
136            data.insert(kv.0, kv.1);
137        });
138        trace!("Preload environment map:\n{:?}", &data)
139    }
140
141    /// Subscribers size. (only `Subscribe`)
142    pub fn size(&self) -> usize {
143        let size = self.senders.lock().unwrap().len();
144        debug!("Current subscribers size: {:?}", &size);
145        size
146    }
147
148    /// Subscribe to the keys and get a snapshot of the data
149    pub fn subscribe_snapshot(&self, subscribe: Subscribe) -> Result<EnvironmentData> {
150        let sub = self.subscribe(subscribe)?;
151        let data = EnvironmentData {
152            data: Arc::new(Mutex::new(sub.0)),
153            rx: sub.1,
154        };
155        data.receive();
156        Ok(data)
157    }
158
159    /// We subscribe to the keys, if successful, we get a snapshot of the current data and a channel for updating this data
160    pub fn subscribe(
161        &self,
162        subscribe: Subscribe,
163    ) -> Result<(HashMap<String, String>, Receiver<ChangeState>)> {
164        debug!("Subscribe by {:?}", &subscribe);
165        let (tx, rx) = crossbeam_channel::unbounded::<ChangeState>();
166
167        let mut data = {
168            let state = self.state.lock();
169
170            let state_guard = state.unwrap();
171
172            state_guard.clone()
173        };
174
175        let sub = match &subscribe {
176            Subscribe::All => (data, rx),
177
178            Subscribe::Envs(envs) => {
179                data.retain(|k, _| envs.contains(k));
180
181                (data, rx)
182            }
183
184            Subscribe::PatternEnvs(envs) => {
185                let envs = envs
186                    .iter()
187                    .map(|pattern| {
188                        Regex::new(&*pattern)
189                            .map_err(|e| Error::InvalidPattern {
190                                pattern: pattern.clone(),
191                                error: e.to_string(),
192                            })
193                            .unwrap()
194                    })
195                    .collect::<Vec<Regex>>();
196
197                data.retain(|k, _| {
198                    let mut find = false;
199                    for env in envs.iter() {
200                        match env.find(k) {
201                            None => {}
202                            Some(_) => {
203                                find = true;
204                            }
205                        }
206
207                        if find {
208                            break;
209                        }
210                    }
211                    find
212                });
213
214                (data, rx)
215            }
216        };
217
218        self._subscribe(subscribe.clone(), tx);
219        Ok(sub)
220    }
221
222    /// Adding keys to the current state.
223    fn _subscribe(&self, sub: Subscribe, tx: Sender<ChangeState>) {
224        let senders = self.senders.lock();
225        let mut guard = senders.unwrap();
226        let entry = guard.entry(sub).or_insert_with(|| vec![]);
227        entry.push(tx);
228    }
229
230    /// In a separate thread, we process state changes at intervals.
231    /// If the values change, we will notify the subscribers who have subscribed to these values.
232    pub fn run(&self) {
233        let data = Arc::clone(&self.state);
234        let subs = Arc::clone(&self.senders);
235        let interval = self.interval.clone();
236
237        std::thread::spawn(move || loop {
238            {
239                let data = data.lock();
240                let mut data_guard = data.unwrap();
241
242                let subs = subs.lock();
243                let mut subs_guard = subs.unwrap();
244
245                let mut sys_data = HashMap::<String, String>::new();
246                std::env::vars().for_each(|kv| {
247                    sys_data.insert(kv.0, kv.1);
248                });
249
250                if !sys_data.eq(&data_guard) {
251                    let different = data_guard.diff(&sys_data);
252
253                    let mut changes = HashMap::<String, ChangeState>::new();
254
255                    let remove_set = different.removed;
256                    let altered = different.altered;
257
258                    if !remove_set.is_empty() {
259                        remove_set.iter().for_each(|k| {
260                            let delete = ChangeState::Delete(k.clone());
261                            changes.insert(k.clone(), delete);
262                        });
263                    }
264
265                    if !altered.is_empty() {
266                        altered.iter().for_each(|k| {
267                            let alter =
268                                ChangeState::Edit(k.0.clone(), k.1.clone().unwrap_or_default());
269                            changes.insert(k.0.clone(), alter);
270                        });
271                    }
272
273                    if !changes.is_empty() {
274                        debug!("Find changes in environment.\nDiff {:?}", &changes);
275                        subs_guard.iter_mut().for_each(|s| {
276                            let sub = s.0;
277                            let senders = s.1;
278
279                            match sub {
280                                Subscribe::All => {
281                                    changes.iter().for_each(|change| {
282                                        senders.iter().for_each(|sender| {
283                                            sender.send(change.1.clone()).unwrap();
284                                        });
285                                    });
286                                }
287
288                                Subscribe::Envs(envs) => {
289                                    changes.iter().for_each(|change| {
290                                        if envs.contains(&change.0) {
291                                            senders.iter().for_each(|sender| {
292                                                sender.send(change.1.clone()).unwrap();
293                                            });
294                                        }
295                                    });
296                                }
297
298                                Subscribe::PatternEnvs(envs) => {
299                                    let envs = envs
300                                        .iter()
301                                        .map(|pattern| Regex::new(pattern).unwrap())
302                                        .collect::<Vec<Regex>>();
303
304                                    changes.iter().for_each(|change| {
305                                        envs.iter().for_each(|reg| {
306                                            let mat = reg.find(&*change.0);
307                                            match mat {
308                                                None => {}
309                                                Some(_) => {
310                                                    senders.iter().for_each(|sender| {
311                                                        sender.send(change.1.clone()).unwrap();
312                                                    });
313                                                }
314                                            }
315                                        });
316                                    });
317                                }
318                            }
319                        });
320                    }
321                };
322                *data_guard = sys_data;
323            }
324            sleep(interval);
325        });
326    }
327}
328
329/// Default instance with read interval 30 seconds.
330impl Default for EnvironmentWatcher {
331    fn default() -> Self {
332        let env_state = Self {
333            state: Arc::new(Mutex::new(Default::default())),
334            senders: Arc::new(Mutex::new(HashMap::default())),
335            interval: Duration::from_millis(5 * 100),
336        };
337        env_state.run();
338        env_state
339    }
340}