tag2upload_service_manager/
expire.rs1
2use crate::prelude::*;
3
4use std::io::BufWriter;
5use std::fs::File;
6
7pub fn expire_now(gl: &Arc<Globals>) -> Result<(), IE> {
8 let overflow_msg = "implausible intervals.expiry or system time";
9
10 let now = gl.now_systemtime();
11
12 let oldest_to_keep: TimeT =
13 now
14 .checked_sub(*gl.config.intervals.expire)
15 .ok_or_else(|| internal!("{overflow_msg}"))?
16 .try_into().into_internal(overflow_msg)?;
17
18 let archive_dir = &gl.config.files.archive_dir;
19 let timestamp = format!("{}", HtTimeT(now.into()));
20 let archive_leaf = format!("expired-{timestamp}.dump.txt");
21 let archive_subdir = timestamp.split_once('-')
22 .ok_or_else(|| internal!(
23 "unexpected archive timestamp format {timestamp}"
24 ))?
25 .0;
26 let archive_subdir = format!("{archive_dir}/{archive_subdir}");
27 let archive_file = format!("{archive_subdir}/{archive_leaf}");
28 let archive_file_tmp = format!("{archive_file}.tmp");
29
30 let sync_directory = |d| (|| {
31 File::open(d).context("open")?
32 .sync_all().context("sync")
33 })()
34 .with_context(|| format!("{d:?}"))
35 .into_internal("fsync archive (sub)directory");
36
37 let (n_expired, archived_to) = db_transaction(TN::Expiry, |dbt| {
38
39 let clearout_glob = format!("{archive_dir}/*/expired-*.dump.txt.tmp");
42 (|| {
43 for clearout in glob::glob(&clearout_glob).context("glob start")? {
44 let clearout = clearout.context("glob")?;
45 fs::remove_file(&clearout)
46 .with_context(|| clearout.display().to_string())
47 .context("delete")?;
48 }
49 Ok::<_, AE>(())
50 })().with_context(|| clearout_glob.clone())
51 .into_internal("clear out old tmp files")?;
52
53 fs::create_dir(&archive_subdir).or_else(|e| match e {
56 e if e.kind() == io::ErrorKind::AlreadyExists => Ok(()),
57 other => Err(other),
58 }).into_internal("create archive subdirectory")?;
59
60 let fwriter = deferred_file_creation::Writer::new_uncreated(
63 archive_file_tmp.clone(),
64 fs::OpenOptions::new().create_new(true).clone(),
65 );
66 let mut archiver = mini_sqlite_dump::Archiver::start(
67 dbt, fwriter, ["jobs"],
68 ).into_internal("prepare to archive expired data")?;
69
70 let mut n_expired = 0;
71
72 dbt.bsql_exec(bsql!("
73 DELETE FROM last_expiry
74 "))?;
75 dbt.bsql_exec(bsql!("
76 INSERT INTO last_expiry " +~(LastExpiryRow {
77 run: now.into(),
78 threshold: oldest_to_keep,
79 })"
80 "))?;
81
82 let archived_to = dbt.bsql_query_rows_concurrent(
83
84 bsql!(
85 " SELECT * FROM jobs
86 WHERE last_update < " oldest_to_keep "
87 "),
88
89 |mut rows| {
90
91 let mut tarchiver = archiver.start_table("jobs")
92 .into_internal("prepare to archive expired rows")?;
93
94 let mut expired_counts = HashMap::<_, i64>::default();
95
96 while let Some(row) = rows.next()
97 .into_internal("fetch row to expire")?
98 {
99 let jid: JobId = row.get("jid")
100 .into_internal("extract jid")?;
101
102 n_expired += 1;
103 tarchiver.writer_mut().ensure_created()
104 .with_context(|| archive_file_tmp.clone())
105 .into_internal("start archive file")?;
106 tarchiver.write_row(row)
107 .with_context(|| archive_file_tmp.clone())
108 .with_context(|| format!("jobid={jid}"))
109 .into_internal("write row to archive file")?;
110
111 dbt.bsql_exec_concurrent(bsql!(
112 "DELETE FROM jobs WHERE jid = " jid
113 ))?;
114
115 match JobRow::from_sql_row(&row) {
116 Ok(row) => {
117 let shown = ShownJobStatus::new(
118 row.status,
119 row.duplicate_of
120 );
121 *expired_counts
122 .entry(shown)
123 .or_default()
124 += 1;
125 },
126 Err(e) => error!(%e,
127 "error interpreting expired row, for status"
128 ),
129 }
130 }
131
132 for (shown, n_jobs) in expired_counts {
133 dbt.bsql_exec_concurrent(bsql!("
134 UPDATE stats_by_shown_status_expired
135 SET n_jobs = n_jobs + " n_jobs "
136 WHERE shown_status = " shown "
137 "))?;
138 }
139
140 let fwriter = archiver.finish_with_writer()
143 .with_context(|| archive_file_tmp.clone())
144 .into_internal("complete archive file")?;
145
146 if let Some::<(File, String)>((file, _)) = {
147 fwriter.finish_created()
148 .with_context(|| archive_file_tmp.clone())
149 .into_internal("finish archive file")?
150 } {
151 file.sync_all()
152 .with_context(|| archive_file_tmp.clone())
153 .into_internal("fsync archive file")?;
154
155 fs::rename(
156 &archive_file_tmp,
157 &archive_file,
158 )
159 .with_context(|| archive_file.clone())
160 .with_context(|| archive_file_tmp.clone())
161 .into_internal("install archive file")?;
162
163 sync_directory(&archive_subdir)?;
164 sync_directory(&archive_dir)?;
165
166 Ok::<_, IE>(Some(archive_leaf.clone()))
167 } else {
168 assert_eq!(n_expired, 0);
169 Ok::<_, IE>(None)
170 }
171 }
172 )??;
173
174 Ok::<_, IE>((n_expired, archived_to))
183 })??;
184
185 if let Some(archive) = archived_to {
186 info!(n_expired, %oldest_to_keep, ?archive, "expired");
187 } else {
188 info!(n_expired, %oldest_to_keep, "nothing expired");
189 }
190
191 Ok(())
192}
193
194impl LastExpiryRow {
195 pub fn get(dbt: &mut rusqlite::Transaction) -> Result<LastExpiryRow, IE> {
196 let dummy_time: TimeT = 0.into();
197
198 let row = dbt.bsql_query_01(bsql!("SELECT * FROM last_expiry"))?
199 .unwrap_or_else(|| LastExpiryRow {
200 run: dummy_time,
201 threshold: dummy_time,
202 });
203 Ok(row)
204 }
205
206 pub fn for_ui(dbt: &mut rusqlite::Transaction)
207 -> Result<UiSerializeMap<LastExpiryRow>, IE>
208 {
209 Self::get(dbt).map(UiSerializeMap)
210 }
211}
212
213pub fn start_task(gl: &Arc<Globals>) {
214 gl.spawn_task_running("expiry", {
215 let gl = gl.clone();
216 async move {
217 loop {
218 let expire_every = *gl.config.intervals.expire_every;
219 select! {
220 () = tokio::time::sleep(expire_every) => {},
221 sd = gl.await_shutdown() => return Err(sd.into()),
222 };
223 expire_now(&gl)?;
224 }
225 }
226 })
227}
228
229pub mod deferred_file_creation {
232
233 use super::*;
234
235 define_derive_deftly! {
236 DelegateWrite for struct, expect items:
237
238 ${define FIELD { $( ${when fmeta(delegate_write)} $fname ) }}
239
240 impl io::Write for $ttype {
241 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
242 io::Write::write(&mut self.$FIELD, data)
243 }
244 fn flush(&mut self) -> io::Result<()> {
245 io::Write::flush(&mut self.$FIELD)
246 }
247 }
248 }
249
250 #[derive(Debug, Deftly)]
251 #[derive_deftly(DelegateWrite)]
252 pub struct Writer {
253 filename: String,
254 #[deftly(delegate_write)]
255 file: Either<NotYetCreated, BufWriter<File>>,
256 }
257
258 #[derive(Debug, Deftly)]
259 #[derive_deftly(DelegateWrite)]
260 struct NotYetCreated {
261 #[deftly(delegate_write)]
262 buf: Vec<u8>,
263 open_options: fs::OpenOptions,
264 }
265
266 impl Writer {
267 pub fn new_uncreated(
268 filename: String,
269 mut open_options: fs::OpenOptions,
270 ) -> Self {
271 open_options.write(true);
272 let file = Either::Left(NotYetCreated {
273 buf: vec![],
274 open_options,
275 });
276 Writer { filename, file }
277 }
278
279 pub fn ensure_created(&mut self) -> io::Result<()> {
281 match &mut self.file {
282 Either::Left(nyc) => {
283 let file = nyc.open_options.open(&self.filename)?;
284 let mut file = BufWriter::new(file);
285 file.write_all(&mem::take(&mut nyc.buf))?;
286 self.file = Either::Right(file);
287 }
288 Either::Right(bw) => {
289 let _: &mut BufWriter<File> = bw;
290 }
291 }
292 Ok(())
293 }
294
295 pub fn finish_created(self) -> io::Result<Option<(File, String)>> {
300 match self.file {
301 Either::Left(nyc) => {
302 let NotYetCreated { .. } = nyc;
303 Ok(None)
304 },
305 Either::Right(file) => {
306 let file = file.into_inner()
307 .map_err(|e| e.into_error())?;
308 Ok(Some((file, self.filename)))
309 }
310 }
311 }
312 }
313}