bufkit_data/archive/
clean.rs

1//! The cleaning method for Archive is complex, so it has its own module.
2
3use crate::{archive::Archive, errors::BufkitDataErr};
4use metfor::Quantity;
5use std::{collections::HashSet, io::Read, str::FromStr};
6
7struct CleanMethodInternalSiteInfo {
8    station_num: crate::site::StationNumber,
9    model: crate::models::Model,
10    id: Option<String>,
11    init_time: chrono::NaiveDateTime,
12    end_time: chrono::NaiveDateTime,
13    coords: crate::coords::Coords,
14    elevation: metfor::Meters,
15}
16
17impl Archive {
18    /// Validate files listed in the index are in the archive too, if not remove them from the
19    /// index.
20    pub fn clean(&self) -> Result<(), BufkitDataErr> {
21        let arch = Archive::connect(&self.root)?;
22
23        arch.db_conn
24            .execute("PRAGMA cache_size=10000", [])?;
25
26        println!("Building set of files from the index.");
27        let index_vals = self.get_all_files_from_index(&arch)?;
28
29        println!("Building set of files from the file system.");
30        let file_system_vals = self.get_all_files_in_data_dir(&arch)?;
31
32        println!("Comparing sets for files in index but not in the archive.");
33        let mut files_in_index_but_not_on_file_system = index_vals.difference(&file_system_vals);
34        self.remove_missing_files_from_index(&arch, &mut files_in_index_but_not_on_file_system)?;
35
36        println!("Comparing sets for files in archive but not in the index.");
37        let mut files_not_in_index = file_system_vals.difference(&index_vals);
38        self.handle_files_in_archive_but_not_index(&arch, &mut files_not_in_index)?;
39
40        println!("Compressing index.");
41        arch.db_conn.execute("VACUUM", [])?;
42
43        Ok(())
44    }
45
46    #[inline]
47    fn get_all_files_from_index(&self, arch: &Archive) -> Result<HashSet<String>, BufkitDataErr> {
48        let mut all_files_stmt = arch.db_conn.prepare("SELECT file_name FROM files")?;
49
50        let index_vals: Result<HashSet<String>, BufkitDataErr> = all_files_stmt
51            .query_map([], |row| row.get::<_, String>(0))?
52            .map(|res| res.map_err(BufkitDataErr::Database))
53            .collect();
54
55        index_vals
56    }
57
58    #[inline]
59    fn get_all_files_in_data_dir(&self, arch: &Archive) -> Result<HashSet<String>, BufkitDataErr> {
60        Ok(std::fs::read_dir(&arch.data_root())?
61            .filter_map(Result::ok)
62            .map(|de| de.path())
63            .filter(|p| p.is_file())
64            .filter_map(|p| p.file_name().map(ToOwned::to_owned))
65            .map(|p| p.to_string_lossy().to_string())
66            .collect())
67    }
68
69    #[inline]
70    fn remove_missing_files_from_index(
71        &self,
72        arch: &Archive,
73        files_in_index_but_not_on_file_system: &mut dyn Iterator<Item = &String>,
74    ) -> Result<(), BufkitDataErr> {
75        let mut del_stmt = arch
76            .db_conn
77            .prepare("DELETE FROM files WHERE file_name = ?1")?;
78
79        arch.db_conn
80            .execute("BEGIN TRANSACTION", [])?;
81
82        for missing_file in files_in_index_but_not_on_file_system {
83            del_stmt.execute(&[missing_file])?;
84            println!("Removing {} from index.", missing_file);
85        }
86        arch.db_conn
87            .execute("COMMIT TRANSACTION", [])?;
88
89        Ok(())
90    }
91
92    #[inline]
93    fn handle_files_in_archive_but_not_index(
94        &self,
95        arch: &Archive,
96        files_not_in_index: &mut dyn Iterator<Item = &String>,
97    ) -> Result<(), BufkitDataErr> {
98        let mut insert_stmt = arch.db_conn.prepare(
99            "
100                INSERT INTO files (
101                    station_num, 
102                    model,
103                    init_time,
104                    end_time,
105                    file_name, 
106                    id, 
107                    lat, 
108                    lon, 
109                    elevation_m
110                )
111                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
112            ",
113        )?;
114
115        arch.db_conn
116            .execute("BEGIN TRANSACTION", [])?;
117        for extra_file in files_not_in_index {
118            let message = if let Some(CleanMethodInternalSiteInfo {
119                station_num,
120                model,
121                id,
122                init_time,
123                end_time,
124                coords,
125                elevation,
126            }) = arch.extract_site_info_from_file(&extra_file)
127            {
128                if arch.site(station_num).is_none() {
129                    let site = crate::site::SiteInfo {
130                        station_num,
131                        ..crate::site::SiteInfo::default()
132                    };
133
134                    arch.add_site(&site)?;
135                };
136
137                let station_num: u32 = station_num.into();
138
139                match insert_stmt.execute(&[
140                    &station_num as &dyn rusqlite::types::ToSql,
141                    &model.as_static_str() as &dyn rusqlite::types::ToSql,
142                    &init_time as &dyn rusqlite::types::ToSql,
143                    &end_time as &dyn rusqlite::types::ToSql,
144                    &extra_file,
145                    &id,
146                    &coords.lat,
147                    &coords.lon,
148                    &elevation.unpack(),
149                ]) {
150                    Ok(_) => format!("Added {}", extra_file),
151                    Err(_) => {
152                        std::fs::remove_file(arch.data_root().join(extra_file))?;
153                        format!("Duplicate file removed: {}", extra_file)
154                    }
155                }
156            } else {
157                // Remove non-bufkit file
158                std::fs::remove_file(arch.data_root().join(extra_file))?;
159                format!("Removed non-bufkit file: {}", extra_file)
160            };
161
162            println!("{}", message);
163        }
164        arch.db_conn
165            .execute("COMMIT TRANSACTION", [])?;
166
167        Ok(())
168    }
169
170    fn extract_site_info_from_file(&self, fname: &str) -> Option<CleanMethodInternalSiteInfo> {
171        let tokens: Vec<&str> = fname.split(|c| c == '_' || c == '.').collect();
172
173        if tokens.len() != 5 || tokens[3] != "buf" || tokens[4] != "gz" {
174            return None;
175        }
176
177        let model = crate::models::Model::from_str(tokens[1]).ok()?;
178
179        let file = std::fs::File::open(self.data_root().join(fname)).ok()?;
180        let mut decoder = flate2::read::GzDecoder::new(file);
181        let mut s = String::new();
182        decoder.read_to_string(&mut s).ok()?;
183
184        let crate::archive::InternalSiteInfo {
185            station_num,
186            id: parsed_site_id,
187            init_time,
188            end_time,
189            coords,
190            elevation,
191        } = Self::parse_site_info(&s).ok()?;
192
193        let id = if parsed_site_id.is_some() {
194            parsed_site_id
195        } else {
196            Some(tokens[2].to_owned())
197        };
198
199        Some(CleanMethodInternalSiteInfo {
200            station_num,
201            model,
202            id,
203            init_time,
204            end_time,
205            coords,
206            elevation,
207        })
208    }
209}
210
#[cfg(test)]
mod unit {
    use crate::archive::unit::*; // test helpers.

    /// Smoke test: cleaning a freshly filled test archive completes without an error.
    #[test]
    fn test_clean() {
        // The whole `TestArchive` stays alive until the end of the test, so its `tmp` handle is
        // held for as long as the archive is in use.
        let mut test_archive = create_test_archive().expect("Failed to create test archive.");

        fill_test_archive(&mut test_archive.arch);

        test_archive.arch.clean().unwrap();
    }
}