bufkit_data/archive/
modify.rs

1use metfor::Quantity;
2use std::io::Write;
3
4use crate::{
5    errors::BufkitDataErr,
6    models::Model,
7    site::{SiteInfo, StationNumber},
8};
9
10impl crate::Archive {
11    /// Add a bufkit file to the archive.
12    pub fn add(
13        &self,
14        site_id_hint: &str,
15        stn_num_hint: Option<StationNumber>,
16        init_time_hint: Option<chrono::NaiveDateTime>,
17        model: Model,
18        text_data: &str,
19    ) -> Result<StationNumber, BufkitDataErr> {
20        let site_id_hint = site_id_hint.to_uppercase();
21
22        let super::InternalSiteInfo {
23            station_num: parsed_station_num,
24            id: parsed_site_id,
25            init_time,
26            end_time,
27            coords,
28            elevation,
29        } = Self::parse_site_info(text_data)?;
30
31        if let Some(init_time_hint) = init_time_hint {
32            if init_time_hint != init_time {
33                return Err(BufkitDataErr::MismatchedInitializationTimes {
34                    hint: init_time_hint,
35                    parsed: init_time,
36                });
37            }
38        }
39
40        //
41        // FIXME: We shouldn't do this, it's an error. The error check below should go here and
42        // return early. However, it is known that there are several "good" files on the archive 
43        // server where the site id in the URL doesn't match the one in the file. KLDN == KDLN for 
44        // some models!
45        //
46        let mut site_id = &site_id_hint;
47        if let Some(parsed_id) = parsed_site_id.as_ref() {
48            if parsed_id != &site_id_hint {
49                site_id = parsed_id;
50            }
51        }
52        let site_id = site_id;
53
54        // This is a new station!
55        if self.site(parsed_station_num).is_none() {
56            self.add_site(&SiteInfo {
57                station_num: parsed_station_num,
58                ..SiteInfo::default()
59            })?;
60        }
61
62        let file_name = self.compressed_file_name(site_id, model, init_time);
63        let site_id = Some(site_id);
64
65        let added_station_num = match std::fs::File::create(self.data_root().join(&file_name))
66            .map_err(BufkitDataErr::IO)
67            .and_then(|file| {
68                let mut encoder =
69                    flate2::write::GzEncoder::new(file, flate2::Compression::default());
70                encoder
71                    .write_all(text_data.as_bytes())
72                    .map_err(BufkitDataErr::IO)
73            })
74            .and_then(|_| {
75                self.db_conn
76                    .execute(
77                        include_str!("modify/add_file.sql"),
78                        &[
79                            &Into::<u32>::into(parsed_station_num) as &dyn rusqlite::types::ToSql,
80                            &model.as_static_str() as &dyn rusqlite::types::ToSql,
81                            &init_time as &dyn rusqlite::types::ToSql,
82                            &end_time,
83                            &file_name,
84                            &site_id,
85                            &coords.lat,
86                            &coords.lon,
87                            &elevation.unpack(),
88                        ],
89                    )
90                    .map_err(BufkitDataErr::Database)
91            }) {
92            Ok(_) => parsed_station_num,
93            Err(err) => return Err(err),
94        };
95
96        if let Some(parsed_id) = parsed_site_id {
97            if parsed_id != site_id_hint {
98                return Err(BufkitDataErr::MismatchedIDs {
99                    hint: site_id_hint,
100                    parsed: parsed_id,
101                });
102            }
103        }
104
105        if let Some(stn_num_hint) = stn_num_hint {
106            if stn_num_hint != parsed_station_num {
107                return Err(BufkitDataErr::MismatchedStationNumbers {
108                    hint: stn_num_hint,
109                    parsed: parsed_station_num,
110                });
111            }
112        }
113
114        Ok(added_station_num)
115    }
116
117    /// Add a site to the list of sites.
118    ///
119    /// If a site with this station number already exists, return an error from the underlying
120    /// database.
121    pub fn add_site(&self, site: &SiteInfo) -> Result<(), BufkitDataErr> {
122        self.db_conn.execute(
123            include_str!("modify/add_site.sql"),
124            &[
125                &Into::<u32>::into(site.station_num) as &dyn rusqlite::ToSql,
126                &site.name,
127                &site.state.map(|state_prov| state_prov.as_static_str())
128                    as &dyn rusqlite::types::ToSql,
129                &site.notes,
130                &site.time_zone.map(|tz| tz.local_minus_utc()),
131            ],
132        )?;
133
134        Ok(())
135    }
136
137    /// Modify a site's values.
138    pub fn update_site(&self, site: &SiteInfo) -> Result<(), BufkitDataErr> {
139        self.db_conn
140            .execute(
141                include_str!("modify/update_site.sql"),
142                &[
143                    &Into::<u32>::into(site.station_num),
144                    &site.state.map(|state_prov| state_prov.as_static_str())
145                        as &dyn rusqlite::types::ToSql,
146                    &site.name,
147                    &site.notes,
148                    &site.time_zone.map(|tz| tz.local_minus_utc()),
149                ],
150            )
151            .map_err(|err| err.into())
152            .map(|_| {})
153    }
154
155    /// Remove a file from the archive.
156    pub fn remove(
157        &self,
158        station_num: StationNumber,
159        model: Model,
160        init_time: chrono::NaiveDateTime,
161    ) -> Result<(), BufkitDataErr> {
162        let station_num: u32 = Into::<u32>::into(station_num);
163
164        let file_name: String = self.db_conn.query_row(
165            include_str!("modify/find_file_name.sql"),
166            &[
167                &station_num as &dyn rusqlite::types::ToSql,
168                &model.as_static_str() as &dyn rusqlite::types::ToSql,
169                &init_time as &dyn rusqlite::types::ToSql,
170            ],
171            |row| row.get(0),
172        )?;
173
174        std::fs::remove_file(self.data_root().join(file_name)).map_err(BufkitDataErr::IO)?;
175
176        self.db_conn.execute(
177            include_str!("modify/delete_file_from_index.sql"),
178            &[
179                &station_num as &dyn rusqlite::types::ToSql,
180                &model.as_static_str() as &dyn rusqlite::types::ToSql,
181                &init_time as &dyn rusqlite::types::ToSql,
182            ],
183        )?;
184
185        Ok(())
186    }
187
188    /// Remove a site and all of its files from the archive.
189    pub fn remove_site(&self, station_num: StationNumber) -> Result<(), BufkitDataErr> {
190        let station_num: u32 = Into::<u32>::into(station_num);
191
192        let mut qstmt = self
193            .db_conn
194            .prepare(include_str!("modify/find_all_files_for_site.sql"))?;
195        let mut dstmt = self
196            .db_conn
197            .prepare(include_str!("modify/delete_file_by_name.sql"))?;
198
199        let file_deletion_results: Result<Vec<()>, _> = qstmt
200            .query_map(&[&station_num], |row| row.get(0))?
201            .map(|res: Result<String, rusqlite::Error>| res.map_err(BufkitDataErr::Database))
202            .map(|res| {
203                res.and_then(|fname| {
204                    std::fs::remove_file(self.data_root().join(&fname))
205                        .map_err(BufkitDataErr::IO)
206                        .map(|_| fname)
207                })
208            })
209            .map(|res| {
210                res.and_then(|fname| {
211                    dstmt
212                        .execute([fname])
213                        .map_err(BufkitDataErr::Database)
214                        .map(|_num_rows_affected| ())
215                })
216            })
217            .collect();
218        file_deletion_results?;
219
220        self.db_conn
221            .execute(include_str!("modify/delete_site.sql"), &[&station_num])?;
222
223        Ok(())
224    }
225
226    fn compressed_file_name(
227        &self,
228        station_id: &str,
229        model: Model,
230        init_time: chrono::NaiveDateTime,
231    ) -> String {
232        let file_string = init_time.format("%Y%m%d%HZ").to_string();
233
234        format!(
235            "{}_{}_{}.buf.gz",
236            file_string,
237            model.as_static_str(),
238            station_id,
239        )
240    }
241}
242
243#[cfg(test)]
244mod unit {
245    use super::*;
246    use crate::archive::unit::*; // Set up and tear down functions.
247
248    use chrono::NaiveDate;
249
250    #[test]
251    fn test_no_duplicate_sites() {
252        let TestArchive { tmp: _tmp, arch } =
253            create_test_archive().expect("Failed to create test archive.");
254
255        let test_sites = &get_test_sites();
256
257        for site in test_sites {
258            arch.add_site(site).expect("Error adding site.");
259        }
260
261        // Try adding them again, this should fail.
262        for site in test_sites {
263            assert!(arch.add_site(site).is_err());
264        }
265    }
266
267    #[test]
268    fn test_update_site() {
269        let TestArchive { tmp: _tmp, arch } =
270            create_test_archive().expect("Failed to create test archive.");
271
272        let test_sites = &get_test_sites();
273
274        for site in test_sites {
275            arch.add_site(site).expect("Error adding site.");
276        }
277
278        const STN: StationNumber = StationNumber::new(3);
279
280        let zootown = SiteInfo {
281            station_num: StationNumber::from(STN),
282            name: Some("Zootown".to_owned()),
283            notes: Some("Mountains, not coast.".to_owned()),
284            state: Some(crate::StateProv::MT),
285            time_zone: Some(chrono::FixedOffset::west_opt(7 * 3600).unwrap()),
286        };
287
288        arch.update_site(&zootown).expect("Error updating site.");
289
290        assert_eq!(arch.site(STN).unwrap(), zootown);
291        assert_ne!(arch.site(STN).unwrap(), test_sites[2]);
292    }
293
294    #[test]
295    fn test_add() {
296        let TestArchive {
297            tmp: _tmp,
298            mut arch,
299        } = create_test_archive().expect("Failed to create test archive.");
300
301        fill_test_archive(&mut arch);
302    }
303
304    #[test]
305    fn test_remove_file() {
306        let TestArchive {
307            tmp: _tmp,
308            mut arch,
309        } = create_test_archive().expect("Failed to create test archive.");
310
311        fill_test_archive(&mut arch);
312
313        let site = StationNumber::from(727730); // Station number for KMSO
314        let init_time = NaiveDate::from_ymd_opt(2017, 4, 1).unwrap().and_hms_opt(6, 0, 0).unwrap();
315        let model = Model::GFS;
316
317        assert!(arch
318            .file_exists(site, model, init_time)
319            .expect("Error checking db"));
320        arch.remove(site, model, init_time)
321            .expect("Error while removing.");
322        assert!(!arch
323            .file_exists(site, model, init_time)
324            .expect("Error checking db"));
325    }
326
327    #[test]
328    fn test_remove_site() {
329        let TestArchive {
330            tmp: _tmp,
331            mut arch,
332        } = create_test_archive().expect("Failed to create test archive.");
333
334        fill_test_archive(&mut arch);
335
336        let station_num = StationNumber::from(727730); // Station number for KMSO
337        let init_time_model_pairs = [
338            (NaiveDate::from_ymd_opt(2017, 4, 1).unwrap().and_hms_opt(0, 0, 0).unwrap(), Model::NAM),
339            (NaiveDate::from_ymd_opt(2017, 4, 1).unwrap().and_hms_opt(6, 0, 0).unwrap(), Model::GFS),
340            (
341                NaiveDate::from_ymd_opt(2017, 4, 1).unwrap().and_hms_opt(12, 0, 0).unwrap(),
342                Model::GFS,
343            ),
344            (
345                NaiveDate::from_ymd_opt(2017, 4, 1).unwrap().and_hms_opt(12, 0, 0).unwrap(),
346                Model::NAM,
347            ),
348            (
349                NaiveDate::from_ymd_opt(2017, 4, 1).unwrap().and_hms_opt(18, 0, 0).unwrap(),
350                Model::GFS,
351            ),
352            (
353                NaiveDate::from_ymd_opt(2017, 4, 1).unwrap().and_hms_opt(18, 0, 0).unwrap(),
354                Model::NAM,
355            ),
356        ];
357
358        for &(init_time, model) in &init_time_model_pairs {
359            assert!(arch
360                .file_exists(station_num, model, init_time)
361                .expect("Error checking db"));
362        }
363
364        arch.remove_site(station_num).expect("db error deleting.");
365
366        for &(init_time, model) in &init_time_model_pairs {
367            assert!(!arch
368                .file_exists(station_num, model, init_time)
369                .expect("Error checking db"));
370        }
371    }
372}