xvc_core/util/
pmp.rs

//! Core file operations
2
3use std::collections::HashMap;
4
5use std::sync::{Arc, Mutex, RwLock};
6use std::thread::{self, JoinHandle};
7use std::time::Duration;
8use xvc_logging::{error, uwr, XvcOutputSender};
9use xvc_walker::{build_ignore_patterns, make_watcher, IgnoreRules, MatchResult, PathEvent};
10
11use crate::error::Error;
12use crate::error::Result;
13use crate::util::xvcignore::COMMON_IGNORE_PATTERNS;
14use crate::{XvcFileType, XVCIGNORE_FILENAME};
15use crossbeam_channel::{bounded, RecvError, Select, Sender};
16
17use crate::types::{xvcpath::XvcPath, xvcroot::XvcRoot};
18use crate::XvcMetadata;
19
20use super::XvcPathMetadataMap;
21
/// A cached path metadata provider.
/// It starts from `xvc_root` and caches [XvcMetadata] when the paths are requested.
///
/// A background thread started in `new` also writes file system watcher events into the
/// same cache.
#[derive(Debug)]
pub struct XvcPathMetadataProvider {
    /// The root directory to start walking from
    xvc_root: XvcRoot,
    /// Metadata cache keyed by path; shared with the background thread.
    path_map: Arc<RwLock<XvcPathMetadataMap>>,
    /// Sending a value on this channel makes the background thread return.
    kill_signal_sender: Arc<Sender<bool>>,
    // This is to keep the background thread and no one needs to read it currently
    #[allow(dead_code)]
    background_thread: Arc<Mutex<JoinHandle<Result<()>>>>,
    /// Output channel passed to the `uwr!` and `error!` macros when reporting failures.
    output_sender: XvcOutputSender,
    /// Ignore rules; also consulted to skip ignored paths found by globbing.
    ignore_rules: IgnoreRules,
}
36
37impl XvcPathMetadataProvider {
38    /// Create a new PathMetadataProvider
39    pub fn new(output_sender: &XvcOutputSender, xvc_root: &XvcRoot) -> Result<Self> {
40        let ignore_rules =
41            build_ignore_patterns(COMMON_IGNORE_PATTERNS, xvc_root, XVCIGNORE_FILENAME)?;
42        let path_map = Arc::new(RwLock::new(HashMap::new()));
43
44        let (watcher, event_receiver) = make_watcher(ignore_rules.clone())?;
45        let (kill_signal_sender, kill_signal_receiver) = bounded(1);
46
47        let xvc_root = xvc_root.clone();
48        let xvc_root_clone = xvc_root.clone();
49        let path_map_clone = path_map.clone();
50        let output_sender = output_sender.clone();
51        let event_receiver_clone = event_receiver.clone();
52
53        let background_thread = Arc::new(Mutex::new(thread::spawn(move || {
54            let path_map = path_map_clone;
55            let fs_receiver = event_receiver_clone;
56            let xvc_root = xvc_root_clone;
57            // This is not used but to keep the watcher within the thread lifetime
58            #[allow(unused_variables)]
59            let watcher = watcher;
60
61            let handle_fs_event = |fs_event, pmm: Arc<RwLock<XvcPathMetadataMap>>| match fs_event {
62                PathEvent::Create { path, metadata } => {
63                    let xvc_path = XvcPath::new(&xvc_root, &xvc_root, &path).unwrap();
64                    let xvc_md = XvcMetadata::from(metadata);
65                    let mut pmm = pmm.write().unwrap();
66                    pmm.insert(xvc_path, xvc_md);
67                }
68                PathEvent::Update { path, metadata } => {
69                    let xvc_path = XvcPath::new(&xvc_root, &xvc_root, &path).unwrap();
70                    let xvc_md = XvcMetadata::from(metadata);
71                    let mut pmm = pmm.write().unwrap();
72                    pmm.insert(xvc_path, xvc_md);
73                }
74                PathEvent::Delete { path } => {
75                    let xvc_path = XvcPath::new(&xvc_root, &xvc_root, &path).unwrap();
76                    let xvc_md = XvcMetadata {
77                        file_type: XvcFileType::Missing,
78                        size: None,
79                        modified: None,
80                    };
81                    let mut pmm = pmm.write().unwrap();
82                    pmm.insert(xvc_path, xvc_md);
83                }
84            };
85
86            let mut sel = Select::new();
87            let fs_event_index = sel.recv(&fs_receiver);
88            let kill_signal_index = sel.recv(&kill_signal_receiver);
89
90            loop {
91                if let Ok(selection) = sel.select_timeout(Duration::from_millis(100)) {
92                    let index = selection.index();
93                    if index == fs_event_index {
94                        let fs_event = selection.recv(&fs_receiver);
95                        match fs_event {
96                            Ok(Some(fs_event)) => {
97                                let pmm = path_map.clone();
98                                handle_fs_event(fs_event, pmm);
99                            }
100                            Ok(None) => return Ok(()),
101                            Err(e) => {
102                                // If the channel is disconnected, return Ok.
103                                if e == RecvError {
104                                    return Ok(());
105                                } else {
106                                    error!("Error in fs_receiver: {:?}", e);
107                                    return Err(
108                                        anyhow::anyhow!("Error in fs_receiver: {:?}", e).into()
109                                    );
110                                }
111                            }
112                        }
113                        continue;
114                    } else if index == kill_signal_index {
115                        let _ = selection.recv(&kill_signal_receiver);
116                        return Ok(());
117                    } else {
118                        return Err((anyhow::anyhow!("Unknown selection index: {}", index)).into());
119                    }
120                }
121            }
122        })));
123
124        Ok(Self {
125            xvc_root,
126            path_map,
127            kill_signal_sender: Arc::new(kill_signal_sender),
128            background_thread,
129            output_sender,
130            ignore_rules,
131        })
132    }
133
134    /// Returns the [XvcMetadata] for a given [XvcPath].
135    pub fn get(&self, path: &XvcPath) -> Option<XvcMetadata> {
136        if !self.path_map.read().unwrap().contains_key(path) {
137            uwr!(self.update_metadata(path), self.output_sender);
138        }
139        let pm = self.path_map.clone();
140        let pm = uwr!(pm.read(), self.output_sender);
141        let md = pm.get(path).cloned();
142        md
143    }
144
145    /// Returns true if the path is present in the repository.
146    pub fn path_present(&self, path: &XvcPath) -> bool {
147        if !self.path_map.read().unwrap().contains_key(path) {
148            uwr!(self.update_metadata(path), self.output_sender);
149        }
150        let pm = self.path_map.clone();
151        let pm = uwr!(pm.read(), self.output_sender);
152        if let Some(md) = pm.get(path) {
153            !md.is_missing()
154        } else {
155            false
156        }
157    }
158
159    fn update_metadata(&self, xvc_path: &XvcPath) -> Result<()> {
160        let path = xvc_path.to_absolute_path(&self.xvc_root);
161        let md = path.symlink_metadata();
162        self.path_map
163            .write()
164            .unwrap()
165            .insert(xvc_path.clone(), XvcMetadata::from(md));
166        Ok(())
167    }
168
169    /// Stop updating the paths by killing the background thread
170    pub fn stop(&self) -> Result<()> {
171        self.kill_signal_sender
172            .clone()
173            .send(true)
174            .map_err(Error::from)?;
175        Ok(())
176    }
177
178    fn update_with_glob(&self, glob: &str) -> Result<()> {
179        for entry in glob::glob(glob)? {
180            match entry {
181                Ok(entry) => {
182                    if matches!(&self.ignore_rules.check(&entry), MatchResult::Ignore) {
183                        continue;
184                    } else {
185                        let xvc_path = XvcPath::new(&self.xvc_root, &self.xvc_root, &entry)?;
186                        if self.path_map.read().unwrap().contains_key(&xvc_path) {
187                            continue;
188                        } else {
189                            let md = entry.symlink_metadata();
190                            self.path_map
191                                .write()
192                                .unwrap()
193                                .insert(xvc_path, XvcMetadata::from(md));
194                        }
195                    }
196                }
197                Err(e) => {
198                    error!(self.output_sender, "Error while globbing: {:?}", e);
199                }
200            }
201        }
202        Ok(())
203    }
204
205    /// Return all paths from the disk specified with glob
206    pub fn glob_paths(&self, glob: &str) -> Result<XvcPathMetadataMap> {
207        self.update_with_glob(glob)?;
208        let mut matches = XvcPathMetadataMap::new();
209        let pattern = glob::Pattern::new(glob)?;
210        for (p, md) in self.path_map.read().unwrap().iter() {
211            if pattern.matches(p.as_str()) && !md.is_missing() {
212                matches.insert(p.clone(), *md);
213            }
214        }
215        Ok(matches)
216    }
217
218    /// Return a snapshot of the current path metadata map.
219    /// This is a clone of the internal map and is not updated. Intended to be used in testing.
220    pub fn current_path_metadata_map_clone(&self) -> Result<XvcPathMetadataMap> {
221        Ok(self.path_map.read()?.clone())
222    }
223}
224
225impl Drop for XvcPathMetadataProvider {
226    /// Stop the background thread when quit
227    fn drop(&mut self) {
228        // Ignore if the channel is closed
229        let _ = self.stop();
230    }
231}