tag2upload_service_manager/
expire.rs1
2use crate::prelude::*;
3
4use std::io::BufWriter;
5use std::fs::File;
6
/// Expires old jobs: archives them to a dump file and deletes them.
///
/// Every row of `jobs` whose `last_update` is earlier than
/// `now - intervals.expire` is written to an archive file
/// `ARCHIVE_DIR/SUBDIR/expired-TIMESTAMP.dump.txt`
/// and then deleted from both `job_history` and `jobs`.
/// The single row in `last_expiry` is replaced with this run's time and
/// threshold, and `stats_by_shown_status_expired` is incremented by the
/// number of rows expired for each shown status.
///
/// All of this happens inside one `db_transaction` closure.
/// The archive is written to a `.tmp` name, fsynced, renamed into place,
/// and the containing directories are fsynced, all before the closure
/// returns.
///
/// If no rows qualify, no archive file is created.
///
/// # Errors
///
/// Returns `IE` if the threshold time cannot be computed, if any of the
/// filesystem operations fail, or if a database operation fails.
pub fn expire_now(gl: &Arc<Globals>) -> Result<(), IE> {
    let overflow_msg = "implausible intervals.expiry or system time";

    let now = gl.now_systemtime();

    // Rows last updated strictly before this time are expired
    // (see the `WHERE last_update <` query below).
    let oldest_to_keep: TimeT = {
        TimeT::try_from(
            now
                .checked_sub(*gl.config.intervals.expire)
                .ok_or_else(|| internal!("{overflow_msg}"))?
        )
            .into_internal(overflow_msg)?
    };

    // Work out where the archive goes.
    //
    // The subdirectory name is whatever precedes the first `-` in the
    // formatted timestamp.
    // NOTE(review): presumably that is the year -- confirm against
    // `HtTimeT`'s `Display` impl.
    let archive_dir = &gl.config.files.archive_dir;
    let timestamp = format!("{}", HtTimeT(now.into()));
    let archive_leaf = format!("expired-{timestamp}.dump.txt");
    let archive_subdir = timestamp.split_once('-')
        .ok_or_else(|| internal!(
            "unexpected archive timestamp format {timestamp}"
        ))?
        .0;
    let archive_subdir = format!("{archive_dir}/{archive_subdir}");
    let archive_file = format!("{archive_subdir}/{archive_leaf}");
    let archive_file_tmp = format!("{archive_file}.tmp");

    // Opens the directory `d` and fsyncs it, reporting failure as an
    // internal error which mentions the directory name.
    let sync_directory = |d| (|| {
        File::open(d).context("open")?
            .sync_all().context("sync")
    })()
        .with_context(|| format!("{d:?}"))
        .into_internal("fsync archive (sub)directory");

    let (n_expired, archived_to) = db_transaction(TN::Expiry, |dbt| {

        // Delete every `.tmp` archive file in any subdirectory of the
        // archive directory, before we start writing our own.
        let clearout_glob = format!("{archive_dir}/*/expired-*.dump.txt.tmp");
        (|| {
            for clearout in glob::glob(&clearout_glob).context("glob start")? {
                let clearout = clearout.context("glob")?;
                fs::remove_file(&clearout)
                    .with_context(|| clearout.display().to_string())
                    .context("delete")?;
            }
            Ok::<_, AE>(())
        })().with_context(|| clearout_glob.clone())
            .into_internal("clear out old tmp files")?;

        // The subdirectory may well exist already; that is not an error.
        // (Only the subdirectory is created here, not `archive_dir`.)
        fs::create_dir(&archive_subdir).or_else(|e| match e {
            e if e.kind() == io::ErrorKind::AlreadyExists => Ok(()),
            other => Err(other),
        }).into_internal("create archive subdirectory")?;

        // The writer does not create the file yet: that happens at the
        // `ensure_created` call below, when there is a row to write.
        // `create_new` means an existing file of that name is an error.
        let fwriter = deferred_file_creation::Writer::new_uncreated(
            archive_file_tmp.clone(),
            fs::OpenOptions::new().create_new(true).clone(),
        );
        let mut archiver = mini_sqlite_dump::Archiver::start(
            dbt, fwriter, ["jobs"],
        ).into_internal("prepare to archive expired data")?;

        let mut n_expired = 0;

        // Replace the contents of `last_expiry` with a row for this run.
        dbt.bsql_exec(&bsql!("
                DELETE FROM last_expiry
        "))?;
        dbt.bsql_exec(&bsql!("
                INSERT INTO last_expiry " .insert_row(LastExpiryRow {
                    run: now.into(),
                    threshold: oldest_to_keep,
                })"
        "))?;

        // NOTE(review): the `_concurrent` variants are presumably what
        // allows rows to be deleted while this query is still being
        // iterated -- confirm against the db support code.
        let archived_to = dbt.bsql_query_rows_concurrent(

            &bsql!(
                " SELECT * FROM jobs
                   WHERE last_update < " oldest_to_keep "
                "),

            |mut rows| {

                let mut tarchiver = archiver.start_table("jobs")
                    .into_internal("prepare to archive expired rows")?;

                // Expands to a closure, suitable for `map_err`, which
                // turns an error into our error type, with the tmp file
                // name and `$msg` attached as context.
                macro_rules! handle_io_err { { $msg:expr } => {
                    |e| {
                        Err::<Void, _>(e)
                            .context(archive_file_tmp.clone())
                            .context($msg.to_string())
                            .into_internal("archive file")
                            .void_unwrap_err()
                    }
                } }

                // Number of rows expired in this run, per shown status.
                let mut expired_counts = HashMap::<_, i64>::default();

                while let Some(row) = rows.next()
                    .db_context("fetch row to expire")?
                {
                    let jid: JobId = row.get("jid")
                        .db_context("extract jid")?;

                    // Archive the row first, and only then delete it.
                    n_expired += 1;
                    tarchiver.writer_mut().ensure_created()
                        .map_err(handle_io_err!("start file"))?;
                    tarchiver.write_row(row)
                        .map_err(handle_io_err!("write row"))?;

                    dbt.bsql_exec_concurrent(&bsql!(
                        "DELETE FROM job_history WHERE jid = " jid
                    ))?;
                    dbt.bsql_exec_concurrent(&bsql!(
                        "DELETE FROM jobs WHERE jid = " jid
                    ))?;

                    // The row is parsed only in order to count it in the
                    // statistics.  If that fails we just log it: the row
                    // has been archived and deleted regardless.
                    match JobRow::from_sql_row(&row) {
                        Ok(row) => {
                            let shown = ShownJobStatus::from_state(&row.s);
                            *expired_counts
                                .entry(shown)
                                .or_default()
                                += 1;
                        },
                        Err(e) => error!(%e,
                            "error interpreting expired row, for status"
                        ),
                    }
                }

                // Add this run's counts to the per-status totals.
                for (shown, n_jobs) in expired_counts {
                    dbt.bsql_exec_concurrent(&bsql!("
                            UPDATE stats_by_shown_status_expired
                               SET n_jobs = n_jobs + " n_jobs "
                             WHERE shown_status = " shown "
                    "))?;
                }

                let fwriter = archiver.finish_with_writer()
                    .map_err(handle_io_err!("complete"))?;

                // `finish_created` yields the file only if it was
                // actually created, ie if at least one row was written.
                if let Some::<(File, String)>((file, _)) = {
                    fwriter.finish_created()
                        .map_err(handle_io_err!("finish"))?
                } {
                    // Sync the data, install the file under its final
                    // name, then sync the directories.
                    // NOTE(review): presumably so that the archive is
                    // durable before the deletions commit -- confirm.
                    file.sync_all()
                        .map_err(handle_io_err!("fsync"))?;

                    fs::rename(
                        &archive_file_tmp,
                        &archive_file,
                    )
                        .with_context(|| archive_file.clone())
                        .with_context(|| archive_file_tmp.clone())
                        .into_internal("install archive file")?;

                    sync_directory(&archive_subdir)?;
                    sync_directory(&archive_dir)?;

                    Ok::<_, DbError<IE>>(Some(archive_leaf.clone()))
                } else {
                    // No file was created, so no row can have been seen.
                    assert_eq!(n_expired, 0);
                    Ok::<_, DbError<IE>>(None)
                }
            }
        )?;

        Ok((n_expired, archived_to))
    })?;

    if let Some(archive) = archived_to {
        info!(n_expired, %oldest_to_keep, ?archive, "expired");
    } else {
        info!(n_expired, %oldest_to_keep, "nothing expired");
    }

    Ok(())
}
205
206impl LastExpiryRow {
207 pub fn get(dbt: &mut DbTransaction)
208 -> Result<LastExpiryRow, DbError<IE>>
209 {
210 let dummy_time: TimeT = 0.into();
211
212 let row = dbt.bsql_query_01(&bsql!("SELECT * FROM last_expiry"))?
213 .unwrap_or_else(|| LastExpiryRow {
214 run: dummy_time,
215 threshold: dummy_time,
216 });
217 Ok(row)
218 }
219
220 pub fn for_ui(dbt: &mut DbTransaction)
221 -> Result<UiSerializeMap<LastExpiryRow>, DbError<IE>>
222 {
223 Self::get(dbt).map(UiSerializeMap)
224 }
225}
226
227pub fn start_task(gl: &Arc<Globals>) {
228 gl.spawn_task_running("expiry", {
229 let gl = gl.clone();
230 async move {
231 loop {
232 let expire_every = *gl.config.intervals.expire_every;
233 select! {
234 () = tokio::time::sleep(expire_every) => {},
235 sd = gl.await_shutdown() => return Err(sd.into()),
236 };
237 expire_now(&gl)?;
238 }
239 }
240 })
241}
242
243pub mod deferred_file_creation {
246
247 use super::*;
248
249 define_derive_deftly! {
250 DelegateWrite for struct, expect items:
251
252 ${define FIELD { $( ${when fmeta(delegate_write)} $fname ) }}
253
254 impl io::Write for $ttype {
255 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
256 io::Write::write(&mut self.$FIELD, data)
257 }
258 fn flush(&mut self) -> io::Result<()> {
259 io::Write::flush(&mut self.$FIELD)
260 }
261 }
262 }
263
264 #[derive(Debug, Deftly)]
265 #[derive_deftly(DelegateWrite)]
266 pub struct Writer {
267 filename: String,
268 #[deftly(delegate_write)]
269 file: Either<NotYetCreated, BufWriter<File>>,
270 }
271
272 #[derive(Debug, Deftly)]
273 #[derive_deftly(DelegateWrite)]
274 struct NotYetCreated {
275 #[deftly(delegate_write)]
276 buf: Vec<u8>,
277 open_options: fs::OpenOptions,
278 }
279
280 impl Writer {
281 pub fn new_uncreated(
282 filename: String,
283 mut open_options: fs::OpenOptions,
284 ) -> Self {
285 open_options.write(true);
286 let file = Either::Left(NotYetCreated {
287 buf: vec![],
288 open_options,
289 });
290 Writer { filename, file }
291 }
292
293 pub fn ensure_created(&mut self) -> io::Result<()> {
295 match &mut self.file {
296 Either::Left(nyc) => {
297 let file = nyc.open_options.open(&self.filename)?;
298 let mut file = BufWriter::new(file);
299 file.write_all(&mem::take(&mut nyc.buf))?;
300 self.file = Either::Right(file);
301 }
302 Either::Right(bw) => {
303 let _: &mut BufWriter<File> = bw;
304 }
305 }
306 Ok(())
307 }
308
309 pub fn finish_created(self) -> io::Result<Option<(File, String)>> {
314 match self.file {
315 Either::Left(nyc) => {
316 let NotYetCreated { .. } = nyc;
317 Ok(None)
318 },
319 Either::Right(file) => {
320 let file = file.into_inner()
321 .map_err(|e| e.into_error())?;
322 Ok(Some((file, self.filename)))
323 }
324 }
325 }
326 }
327}