1#[forbid(unsafe_code)]
2#[forbid(unused_imports)]
3#[forbid(missing_docs)]
4#[cfg(test)]
5mod test;
6#[cfg(feature = "derive")]
7pub mod derive;
8
9use crossbeam_channel::{Receiver, Sender};
10use diff::Diff;
11use regex::Regex;
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15use thiserror::Error;
16use spin_sleep::sleep;
17use log::{info, debug, trace};
18
19pub type Result<T> = std::result::Result<T, Error>;
20
21#[derive(Error, Debug)]
23pub enum Error {
24 #[error("Invalid pattern: {pattern:?}. Error: {error:?}")]
26 InvalidPattern { pattern: String, error: String },
27
28 #[error("Re-init env watcher.")]
29 DoubleInitialWatcher,
30
31 #[error("In current watcher exists subscribers.")]
32 ReinitializedWithSubscribers,
33}
34
35#[derive(Debug, Clone)]
37pub enum ChangeState {
38 Edit(String, String),
40
41 Delete(String),
43}
44
45#[derive(Debug, Clone, Hash, Eq, PartialEq)]
46pub enum Subscribe {
47 All,
49
50 Envs(Vec<String>),
54
55 PatternEnvs(Vec<String>),
60}
61
62pub struct EnvironmentData {
65 data: Arc<Mutex<HashMap<String, String>>>,
67
68 rx: Receiver<ChangeState>,
70}
71
72impl EnvironmentData {
73 pub fn data(&self) -> HashMap<String, String> {
75 self.data.lock().unwrap().clone()
76 }
77
78 pub fn ref_data(&self) -> Arc<Mutex<HashMap<String, String>>> {
80 Arc::clone(&self.data)
81 }
82
83 pub fn receive(&self) {
85 let snapshot = Arc::clone(&self.data);
86 let rx = self.rx.clone();
87
88 std::thread::spawn(move || loop {
89 let data = rx.recv().unwrap();
90 let mut snapshot = snapshot.lock().unwrap();
91 match data {
92 ChangeState::Edit(k, v) => {
93 snapshot.insert(k.clone(), v.clone());
94 }
95 ChangeState::Delete(k) => {
96 snapshot.remove(&*k);
97 }
98 };
99 });
100 }
101}
102
103pub struct EnvironmentWatcher {
105 state: Arc<Mutex<HashMap<String, String>>>,
107
108 senders: Arc<Mutex<HashMap<Subscribe, Vec<Sender<ChangeState>>>>>,
112
113 interval: Duration,
115}
116
117impl EnvironmentWatcher {
118 pub fn new(interval: Duration) -> Self {
121 info!("Starting env watcher with interval {:?}", &interval);
122 let env_state = Self {
123 state: Arc::new(Mutex::new(Default::default())),
124 senders: Arc::new(Mutex::new(Default::default())),
125 interval,
126 };
127 env_state.preload();
128 env_state.run();
129 env_state
130 }
131
132 fn preload(&self) {
134 let mut data = self.state.lock().unwrap();
135 std::env::vars().for_each(|kv| {
136 data.insert(kv.0, kv.1);
137 });
138 trace!("Preload environment map:\n{:?}", &data)
139 }
140
141 pub fn size(&self) -> usize {
143 let size = self.senders.lock().unwrap().len();
144 debug!("Current subscribers size: {:?}", &size);
145 size
146 }
147
148 pub fn subscribe_snapshot(&self, subscribe: Subscribe) -> Result<EnvironmentData> {
150 let sub = self.subscribe(subscribe)?;
151 let data = EnvironmentData {
152 data: Arc::new(Mutex::new(sub.0)),
153 rx: sub.1,
154 };
155 data.receive();
156 Ok(data)
157 }
158
159 pub fn subscribe(
161 &self,
162 subscribe: Subscribe,
163 ) -> Result<(HashMap<String, String>, Receiver<ChangeState>)> {
164 debug!("Subscribe by {:?}", &subscribe);
165 let (tx, rx) = crossbeam_channel::unbounded::<ChangeState>();
166
167 let mut data = {
168 let state = self.state.lock();
169
170 let state_guard = state.unwrap();
171
172 state_guard.clone()
173 };
174
175 let sub = match &subscribe {
176 Subscribe::All => (data, rx),
177
178 Subscribe::Envs(envs) => {
179 data.retain(|k, _| envs.contains(k));
180
181 (data, rx)
182 }
183
184 Subscribe::PatternEnvs(envs) => {
185 let envs = envs
186 .iter()
187 .map(|pattern| {
188 Regex::new(&*pattern)
189 .map_err(|e| Error::InvalidPattern {
190 pattern: pattern.clone(),
191 error: e.to_string(),
192 })
193 .unwrap()
194 })
195 .collect::<Vec<Regex>>();
196
197 data.retain(|k, _| {
198 let mut find = false;
199 for env in envs.iter() {
200 match env.find(k) {
201 None => {}
202 Some(_) => {
203 find = true;
204 }
205 }
206
207 if find {
208 break;
209 }
210 }
211 find
212 });
213
214 (data, rx)
215 }
216 };
217
218 self._subscribe(subscribe.clone(), tx);
219 Ok(sub)
220 }
221
222 fn _subscribe(&self, sub: Subscribe, tx: Sender<ChangeState>) {
224 let senders = self.senders.lock();
225 let mut guard = senders.unwrap();
226 let entry = guard.entry(sub).or_insert_with(|| vec![]);
227 entry.push(tx);
228 }
229
230 pub fn run(&self) {
233 let data = Arc::clone(&self.state);
234 let subs = Arc::clone(&self.senders);
235 let interval = self.interval.clone();
236
237 std::thread::spawn(move || loop {
238 {
239 let data = data.lock();
240 let mut data_guard = data.unwrap();
241
242 let subs = subs.lock();
243 let mut subs_guard = subs.unwrap();
244
245 let mut sys_data = HashMap::<String, String>::new();
246 std::env::vars().for_each(|kv| {
247 sys_data.insert(kv.0, kv.1);
248 });
249
250 if !sys_data.eq(&data_guard) {
251 let different = data_guard.diff(&sys_data);
252
253 let mut changes = HashMap::<String, ChangeState>::new();
254
255 let remove_set = different.removed;
256 let altered = different.altered;
257
258 if !remove_set.is_empty() {
259 remove_set.iter().for_each(|k| {
260 let delete = ChangeState::Delete(k.clone());
261 changes.insert(k.clone(), delete);
262 });
263 }
264
265 if !altered.is_empty() {
266 altered.iter().for_each(|k| {
267 let alter =
268 ChangeState::Edit(k.0.clone(), k.1.clone().unwrap_or_default());
269 changes.insert(k.0.clone(), alter);
270 });
271 }
272
273 if !changes.is_empty() {
274 debug!("Find changes in environment.\nDiff {:?}", &changes);
275 subs_guard.iter_mut().for_each(|s| {
276 let sub = s.0;
277 let senders = s.1;
278
279 match sub {
280 Subscribe::All => {
281 changes.iter().for_each(|change| {
282 senders.iter().for_each(|sender| {
283 sender.send(change.1.clone()).unwrap();
284 });
285 });
286 }
287
288 Subscribe::Envs(envs) => {
289 changes.iter().for_each(|change| {
290 if envs.contains(&change.0) {
291 senders.iter().for_each(|sender| {
292 sender.send(change.1.clone()).unwrap();
293 });
294 }
295 });
296 }
297
298 Subscribe::PatternEnvs(envs) => {
299 let envs = envs
300 .iter()
301 .map(|pattern| Regex::new(pattern).unwrap())
302 .collect::<Vec<Regex>>();
303
304 changes.iter().for_each(|change| {
305 envs.iter().for_each(|reg| {
306 let mat = reg.find(&*change.0);
307 match mat {
308 None => {}
309 Some(_) => {
310 senders.iter().for_each(|sender| {
311 sender.send(change.1.clone()).unwrap();
312 });
313 }
314 }
315 });
316 });
317 }
318 }
319 });
320 }
321 };
322 *data_guard = sys_data;
323 }
324 sleep(interval);
325 });
326 }
327}
328
329impl Default for EnvironmentWatcher {
331 fn default() -> Self {
332 let env_state = Self {
333 state: Arc::new(Mutex::new(Default::default())),
334 senders: Arc::new(Mutex::new(HashMap::default())),
335 interval: Duration::from_millis(5 * 100),
336 };
337 env_state.run();
338 env_state
339 }
340}