1use crate::{archive::Archive, errors::BufkitDataErr};
4use metfor::Quantity;
5use std::{collections::HashSet, io::Read, str::FromStr};
6
/// Metadata reconstructed for a single archived file while repairing the
/// index in [`Archive::clean`].
///
/// Internal to the clean pass only — it mirrors the columns of the `files`
/// table that an insert needs.
struct CleanMethodInternalSiteInfo {
    /// Station number parsed from the file contents.
    station_num: crate::site::StationNumber,
    /// Forecast model, parsed from the file name.
    model: crate::models::Model,
    /// Site identifier; parsed from the file contents when present,
    /// otherwise recovered from the file name.
    id: Option<String>,
    /// Model initialization time of the sounding.
    init_time: chrono::NaiveDateTime,
    /// Time of the last forecast in the file.
    end_time: chrono::NaiveDateTime,
    /// Latitude/longitude of the site.
    coords: crate::coords::Coords,
    /// Site elevation in meters.
    elevation: metfor::Meters,
}
16
impl Archive {
    /// Repair inconsistencies between the index database and the files
    /// actually stored in the archive's data directory.
    ///
    /// Three passes are made:
    /// 1. Index rows whose file is missing from disk are deleted from the
    ///    index.
    /// 2. Files on disk that are missing from the index are parsed and
    ///    re-inserted; files that cannot be parsed, or whose insert fails
    ///    (treated as duplicates), are deleted from disk.
    /// 3. The database is `VACUUM`ed to reclaim space.
    ///
    /// Progress is reported on stdout.
    ///
    /// # Errors
    /// Returns a [`BufkitDataErr`] on database or filesystem failures.
    pub fn clean(&self) -> Result<(), BufkitDataErr> {
        // Opens a second connection to the same archive root rather than
        // reusing `self.db_conn`.
        // NOTE(review): presumably deliberate (separate connection/pragma
        // state for this maintenance pass) — confirm; `self` methods below
        // still receive `&arch` explicitly.
        let arch = Archive::connect(&self.root)?;

        // Enlarge the page cache for the many small statements below.
        arch.db_conn
            .execute("PRAGMA cache_size=10000", [])?;

        println!("Building set of files from the index.");
        let index_vals = self.get_all_files_from_index(&arch)?;

        println!("Building set of files from the file system.");
        let file_system_vals = self.get_all_files_in_data_dir(&arch)?;

        println!("Comparing sets for files in index but not in the archive.");
        let mut files_in_index_but_not_on_file_system = index_vals.difference(&file_system_vals);
        self.remove_missing_files_from_index(&arch, &mut files_in_index_but_not_on_file_system)?;

        println!("Comparing sets for files in archive but not in the index.");
        let mut files_not_in_index = file_system_vals.difference(&index_vals);
        self.handle_files_in_archive_but_not_index(&arch, &mut files_not_in_index)?;

        println!("Compressing index.");
        arch.db_conn.execute("VACUUM", [])?;

        Ok(())
    }

    /// Collect every `file_name` recorded in the `files` table into a set.
    #[inline]
    fn get_all_files_from_index(&self, arch: &Archive) -> Result<HashSet<String>, BufkitDataErr> {
        let mut all_files_stmt = arch.db_conn.prepare("SELECT file_name FROM files")?;

        // Collecting into Result<HashSet, _> short-circuits on the first
        // row error, wrapped as a database error.
        let index_vals: Result<HashSet<String>, BufkitDataErr> = all_files_stmt
            .query_map([], |row| row.get::<_, String>(0))?
            .map(|res| res.map_err(BufkitDataErr::Database))
            .collect();

        index_vals
    }

    /// Collect the names of all regular files in the archive's data
    /// directory.
    ///
    /// Unreadable directory entries are silently skipped and non-UTF-8 file
    /// names are converted lossily, so such names may not round-trip
    /// exactly against the index.
    #[inline]
    fn get_all_files_in_data_dir(&self, arch: &Archive) -> Result<HashSet<String>, BufkitDataErr> {
        Ok(std::fs::read_dir(&arch.data_root())?
            .filter_map(Result::ok)
            .map(|de| de.path())
            .filter(|p| p.is_file())
            .filter_map(|p| p.file_name().map(ToOwned::to_owned))
            .map(|p| p.to_string_lossy().to_string())
            .collect())
    }

    /// Delete index rows whose file no longer exists on disk.
    ///
    /// All deletions run inside one explicit transaction for speed.
    /// NOTE(review): if a delete fails, the `?` returns before
    /// `COMMIT TRANSACTION` executes, leaving the transaction open on this
    /// connection — confirm that is acceptable for this maintenance path.
    #[inline]
    fn remove_missing_files_from_index(
        &self,
        arch: &Archive,
        files_in_index_but_not_on_file_system: &mut dyn Iterator<Item = &String>,
    ) -> Result<(), BufkitDataErr> {
        let mut del_stmt = arch
            .db_conn
            .prepare("DELETE FROM files WHERE file_name = ?1")?;

        arch.db_conn
            .execute("BEGIN TRANSACTION", [])?;

        for missing_file in files_in_index_but_not_on_file_system {
            del_stmt.execute(&[missing_file])?;
            println!("Removing {} from index.", missing_file);
        }
        arch.db_conn
            .execute("COMMIT TRANSACTION", [])?;

        Ok(())
    }

    /// Deal with files present in the data directory but absent from the
    /// index.
    ///
    /// For each file, [`Self::extract_site_info_from_file`] is tried:
    /// * on success the file is inserted into the `files` table (creating a
    ///   default site row first if the station is unknown); an insert
    ///   failure is treated as a duplicate and the file is removed from
    ///   disk;
    /// * on failure the file is considered non-bufkit data and removed from
    ///   disk.
    ///
    /// All inserts run inside one explicit transaction.
    /// NOTE(review): as in `remove_missing_files_from_index`, an early `?`
    /// return skips `COMMIT TRANSACTION` — confirm acceptable.
    #[inline]
    fn handle_files_in_archive_but_not_index(
        &self,
        arch: &Archive,
        files_not_in_index: &mut dyn Iterator<Item = &String>,
    ) -> Result<(), BufkitDataErr> {
        let mut insert_stmt = arch.db_conn.prepare(
            "
                INSERT INTO files (
                    station_num,
                    model,
                    init_time,
                    end_time,
                    file_name,
                    id,
                    lat,
                    lon,
                    elevation_m
                )
                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
            ",
        )?;

        arch.db_conn
            .execute("BEGIN TRANSACTION", [])?;
        for extra_file in files_not_in_index {
            let message = if let Some(CleanMethodInternalSiteInfo {
                station_num,
                model,
                id,
                init_time,
                end_time,
                coords,
                elevation,
            }) = arch.extract_site_info_from_file(&extra_file)
            {
                // Make sure the station exists before inserting a file row
                // that references it.
                if arch.site(station_num).is_none() {
                    let site = crate::site::SiteInfo {
                        station_num,
                        ..crate::site::SiteInfo::default()
                    };

                    arch.add_site(&site)?;
                };

                // Shadow with the raw u32 form the database expects.
                let station_num: u32 = station_num.into();

                match insert_stmt.execute(&[
                    &station_num as &dyn rusqlite::types::ToSql,
                    &model.as_static_str() as &dyn rusqlite::types::ToSql,
                    &init_time as &dyn rusqlite::types::ToSql,
                    &end_time as &dyn rusqlite::types::ToSql,
                    &extra_file,
                    &id,
                    &coords.lat,
                    &coords.lon,
                    &elevation.unpack(),
                ]) {
                    Ok(_) => format!("Added {}", extra_file),
                    // Any insert error is assumed to be a duplicate row, so
                    // the redundant file on disk is deleted.
                    Err(_) => {
                        std::fs::remove_file(arch.data_root().join(extra_file))?;
                        format!("Duplicate file removed: {}", extra_file)
                    }
                }
            } else {
                // Unparseable contents/name: not a bufkit sounding.
                std::fs::remove_file(arch.data_root().join(extra_file))?;
                format!("Removed non-bufkit file: {}", extra_file)
            };

            println!("{}", message);
        }
        arch.db_conn
            .execute("COMMIT TRANSACTION", [])?;

        Ok(())
    }

    /// Attempt to rebuild index metadata for a single archived file from its
    /// name and its gzipped contents.
    ///
    /// The name must split on `'_'`/`'.'` into exactly five tokens ending in
    /// `"buf"`, `"gz"`, with the model as the second token and a site id as
    /// the third (e.g. `<station>_<model>_<id>.buf.gz` — the first token is
    /// not used here). Returns `None` on any parse, open, or decompression
    /// failure, which callers interpret as "not a valid bufkit file".
    fn extract_site_info_from_file(&self, fname: &str) -> Option<CleanMethodInternalSiteInfo> {
        let tokens: Vec<&str> = fname.split(|c| c == '_' || c == '.').collect();

        if tokens.len() != 5 || tokens[3] != "buf" || tokens[4] != "gz" {
            return None;
        }

        let model = crate::models::Model::from_str(tokens[1]).ok()?;

        // Decompress the whole file into memory so it can be parsed.
        let file = std::fs::File::open(self.data_root().join(fname)).ok()?;
        let mut decoder = flate2::read::GzDecoder::new(file);
        let mut s = String::new();
        decoder.read_to_string(&mut s).ok()?;

        let crate::archive::InternalSiteInfo {
            station_num,
            id: parsed_site_id,
            init_time,
            end_time,
            coords,
            elevation,
        } = Self::parse_site_info(&s).ok()?;

        // Prefer the id parsed from the file contents; fall back to the id
        // embedded in the file name.
        let id = if parsed_site_id.is_some() {
            parsed_site_id
        } else {
            Some(tokens[2].to_owned())
        };

        Some(CleanMethodInternalSiteInfo {
            station_num,
            model,
            id,
            init_time,
            end_time,
            coords,
            elevation,
        })
    }
}
210
#[cfg(test)]
mod unit {
    use crate::archive::unit::*;

    /// Smoke test: `clean` should succeed on a freshly created and filled
    /// test archive.
    #[test]
    fn test_clean() {
        let test_archive = create_test_archive().expect("Failed to create test archive.");
        let TestArchive { tmp: _tmp, mut arch } = test_archive;

        fill_test_archive(&mut arch);

        arch.clean().unwrap();
    }
}