progscrape_application/persist/backerupper.rs

use std::{
    io::{BufWriter, Write},
    path::{Path, PathBuf},
};

use progscrape_scrapers::StoryDate;
use serde::{Deserialize, Serialize};

use crate::{
    persist::scrapestore::{ScrapeStoreStats, SCRAPE_STORE_VERSION},
    timer_end, timer_start, PersistError, Shard,
};

use super::{
    scrapestore::ScrapeStore,
    shard::{ShardOrder, ShardRange},
};

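/// Outcome of backing up a single shard: the shard contained no scrapes, the
/// existing backup's metadata already matched the store, or the given number
/// of scrapes was written out.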
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum BackupResult {
    Empty,
    NoChange,
    Success(usize),
}

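/// Backs up the scrapes in a `ScrapeStore`, shard by shard, as
/// newline-delimited JSON files (plus a `.meta.json` stats sidecar) under a
/// target directory.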
pub struct BackerUpper {
    path: PathBuf,
}

impl BackerUpper {
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_owned(),
        }
    }

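    /// Logs and passes through an error so that an unreadable or corrupt
    /// metadata file never aborts the backup itself.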
    fn trace_error<E: core::fmt::Debug>(error: E) -> E {
        tracing::error!("Ignoring error in metadata read: {:?}", error);
        error
    }

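    /// Backs up a single shard to `{name}.json` in the target directory, one
    /// JSON-serialized scrape per line, writing through hidden temp files that
    /// are renamed into place on success.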
    pub fn backup(
        &self,
        name: &str,
        shard: Shard,
        scrapes: &ScrapeStore,
    ) -> Result<BackupResult, PersistError> {
        let stats = scrapes.stats(shard)?;
        if stats.count == 0 {
            return Ok(BackupResult::Empty);
        }

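        // If the previous backup's metadata matches the store's current stats,
        // nothing has changed since the last run and the write can be skipped.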
        let meta = self.path.join(format!("{}.meta.json", name));
        let meta_temp = self.path.join(format!(".{}.meta.json", name));
        if meta.exists() {
            if let Ok(file) = std::fs::File::open(&meta).map_err(Self::trace_error) {
                if let Ok(current_stats) = serde_json::from_reader(file).map_err(Self::trace_error)
                {
                    if stats == current_stats {
                        return Ok(BackupResult::NoChange);
                    }
                }
            }
        }

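        // Stream the shard's scrapes into a hidden temp file so a partial
        // write never clobbers an existing backup.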
        let output = self.path.join(format!("{}.json", name));
        let temp = self.path.join(format!(".{}.temp", name));
        let file = std::fs::File::create(&temp)?;

        let time = timer_start!();

        let mut w = BufWriter::new(file);
        const NEWLINE: [u8; 1] = [b'\n'];
        let mut earliest = StoryDate::MAX;
        let mut latest = StoryDate::MIN;
        let mut count = 0;
        scrapes.fetch_all(
            shard,
            |scrape| {
                count += 1;
                earliest = earliest.min(scrape.date);
                latest = latest.max(scrape.date);
                w.write_all(serde_json::to_string(&scrape)?.as_bytes())?;
                // Use write_all so a short write of the newline can't silently
                // corrupt the line-delimited output.
                w.write_all(&NEWLINE)?;
                Ok(())
            },
            |error| {
                tracing::error!("Error fetching scrape: {:?}", error);
            },
        )?;

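        // Recompute the stats from what was actually written so the metadata
        // file always describes the backup on disk.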
        let computed_stats = ScrapeStoreStats {
            version: SCRAPE_STORE_VERSION,
            count,
            earliest,
            latest,
        };

        if computed_stats != stats {
            tracing::info!(
                "Scrape store stats changed during backup: was {:?}, computed {:?}",
                stats,
                computed_stats
            );
        }

        serde_json::to_writer(std::fs::File::create(&meta_temp)?, &computed_stats)?;

        // Flush the buffered writer so any remaining I/O error surfaces before
        // the temp files are renamed over the real backup and metadata.
        w.flush()?;
        std::fs::rename(temp, &output)?;
        std::fs::rename(meta_temp, meta)?;

        timer_end!(
            time,
            "Successfully backed up {} stories to {}",
            count,
            output.to_string_lossy()
        );

        Ok(BackupResult::Success(count))
    }

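    /// Backs up every shard in the range, oldest first, returning the result
    /// for each shard alongside the shard itself.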
    pub fn backup_range(
        &self,
        scrapes: &ScrapeStore,
        shard_range: ShardRange,
    ) -> Vec<(Shard, Result<BackupResult, PersistError>)> {
        let mut v = vec![];
        for shard in shard_range.iterate(ShardOrder::OldestFirst) {
            v.push((shard, self.backup(&shard.to_string(), shard, scrapes)))
        }
        v
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::enable_tracing;
    use crate::PersistLocation;
    use progscrape_scrapers::ScrapeConfig;
    use rstest::*;

    #[rstest]
    fn test_insert(_enable_tracing: &bool) -> Result<(), Box<dyn std::error::Error>> {
        let store = ScrapeStore::new(PersistLocation::Memory)?;

        let samples = progscrape_scrapers::load_sample_scrapes(&ScrapeConfig::default());
        let first = &samples[0..100];

        for scrape in first {
            store.insert_scrape(scrape)?;
        }

        let tempdir = tempfile::tempdir()?;
        // Borrow the tempdir's path; passing the TempDir by value would drop
        // (and delete) the directory before the backup runs.
        let backup = BackerUpper::new(tempdir.path());
        backup.backup("2015-01", Shard::from_year_month(2015, 1), &store)?;

        Ok(())
    }
}