progscrape_application/persist/
backerupper.rs

use std::{
    io::{BufWriter, Write},
    path::{Path, PathBuf},
};

use progscrape_scrapers::StoryDate;
use serde::{Deserialize, Serialize};

use crate::{
    persist::scrapestore::{ScrapeStoreStats, SCRAPE_STORE_VERSION},
    timer_end, timer_start, PersistError, Shard,
};

use super::{
    scrapestore::ScrapeStore,
    shard::{ShardOrder, ShardRange},
};

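/// Outcome of backing up a single shard: the shard held no scrapes, the
/// existing backup was already up-to-date, or this many scrapes were written.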
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum BackupResult {
    Empty,
    NoChange,
    Success(usize),
}

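/// Writes per-shard, newline-delimited JSON backups of a [`ScrapeStore`] into a
/// target directory, alongside a `.meta.json` stats file used to skip unchanged shards.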
pub struct BackerUpper {
    path: PathBuf,
}

impl BackerUpper {
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_owned(),
        }
    }

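    /// Log an error and pass it through unchanged, so callers can observe a
    /// failure and then deliberately discard it.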
    fn trace_error<E: core::fmt::Debug>(error: E) -> E {
        tracing::error!("Ignoring error in metadata read: {:?}", error);
        error
    }

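    /// Back up a single shard to `<name>.json`, returning early with
    /// [`BackupResult::NoChange`] if the stored metadata shows the existing
    /// backup is still fresh.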
    pub fn backup(
        &self,
        name: &str,
        shard: Shard,
        scrapes: &ScrapeStore,
    ) -> Result<BackupResult, PersistError> {
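        // A shard with no scrapes produces no backup file at all.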
        let stats = scrapes.stats(shard)?;
        if stats.count == 0 {
            return Ok(BackupResult::Empty);
        }

        // The metadata read intentionally ignores errors: if the file is missing or
        // corrupt, we simply redo the backup work.
        let meta = self.path.join(format!("{}.meta.json", name));
        let meta_temp = self.path.join(format!(".{}.meta.json", name));
        if meta.exists() {
            if let Ok(file) = std::fs::File::open(&meta).map_err(Self::trace_error) {
                if let Ok(current_stats) = serde_json::from_reader(file).map_err(Self::trace_error)
                {
                    if stats == current_stats {
                        return Ok(BackupResult::NoChange);
                    }
                }
            }
        }

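        // Stream the backup into a dot-prefixed temp file; it is renamed over the
        // real output below so a partial write can never clobber a good backup.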
        let output = self.path.join(format!("{}.json", name));
        let temp = self.path.join(format!(".{}.temp", name));
        let file = std::fs::File::create(&temp)?;

        let time = timer_start!();

        // Write each scrape to the file, with a newline separating them
        let mut w = BufWriter::new(file);
        const NEWLINE: [u8; 1] = [b'\n'];
        let mut earliest = StoryDate::MAX;
        let mut latest = StoryDate::MIN;
        let mut count = 0;
        scrapes.fetch_all(
            shard,
            |scrape| {
                count += 1;
                earliest = earliest.min(scrape.date);
                latest = latest.max(scrape.date);
                w.write_all(serde_json::to_string(&scrape)?.as_bytes())?;
                w.write_all(&NEWLINE)?;
                Ok(())
            },
            |error| {
                tracing::error!("Error fetching scrape: {:?}", error);
            },
        )?;

        // Flush explicitly so buffered write errors surface here instead of being
        // silently dropped when the writer goes out of scope after the rename.
        w.flush()?;

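        // Recompute the stats from what was actually written, since the store may
        // have changed while the backup was in progress.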
        let computed_stats = ScrapeStoreStats {
            version: SCRAPE_STORE_VERSION,
            count,
            earliest,
            latest,
        };

        if computed_stats != stats {
            tracing::info!(
                "Scrape store stats changed during backup: was {:?}, computed {:?}",
                stats,
                computed_stats
            );
        }

        // Note that we write our computed stats, not the ones we used for checking backup freshness!
        serde_json::to_writer(std::fs::File::create(&meta_temp)?, &computed_stats)?;

        // Atomic rename from temp to output for data files and meta
        std::fs::rename(temp, &output)?;
        std::fs::rename(meta_temp, meta)?;

        timer_end!(
            time,
            "Successfully backed up {} stories to {}",
            count,
            output.to_string_lossy()
        );

        Ok(BackupResult::Success(count))
    }

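    /// Back up every shard in `shard_range`, oldest first, collecting the
    /// per-shard results rather than failing fast on the first error.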
    pub fn backup_range(
        &self,
        scrapes: &ScrapeStore,
        shard_range: ShardRange,
    ) -> Vec<(Shard, Result<BackupResult, PersistError>)> {
        let mut v = vec![];
        for shard in shard_range.iterate(ShardOrder::OldestFirst) {
            v.push((shard, self.backup(&shard.to_string(), shard, scrapes)));
        }
        v
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::enable_tracing;
    use crate::PersistLocation;
    use progscrape_scrapers::ScrapeConfig;
    use rstest::*;

    #[rstest]
    fn test_insert(_enable_tracing: &bool) -> Result<(), Box<dyn std::error::Error>> {
        let store = ScrapeStore::new(PersistLocation::Memory)?;

        let samples = progscrape_scrapers::load_sample_scrapes(&ScrapeConfig::default());
        let first = &samples[0..100];

        for scrape in first {
            store.insert_scrape(scrape)?;
        }

        let tempdir = tempfile::tempdir()?;
        // Borrow the path rather than moving the `TempDir` guard: dropping the
        // guard would delete the directory before the backup runs.
        let backup = BackerUpper::new(tempdir.path());
        backup.backup("2015-01", Shard::from_year_month(2015, 1), &store)?;

        Ok(())
    }
}