tag2upload_service_manager/expire.rs

use crate::prelude::*;

use std::io::BufWriter;
use std::fs::File;

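/// Perform one expiry pass
///
/// Deletes jobs whose `last_update` is older than the configured
/// `intervals.expire`, archiving the deleted rows to a timestamped
/// dump file under `files.archive_dir`.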
pub fn expire_now(gl: &Arc<Globals>) -> Result<(), IE> {
    let overflow_msg = "implausible intervals.expiry or system time";

    let now = gl.now_systemtime();

    let oldest_to_keep: TimeT = {
        TimeT::try_from(
            now
                .checked_sub(*gl.config.intervals.expire)
                .ok_or_else(|| internal!("{overflow_msg}"))?
        )
            .into_internal(overflow_msg)?
    };

    let archive_dir = &gl.config.files.archive_dir;
    let timestamp = format!("{}", HtTimeT(now.into()));
    let archive_leaf = format!("expired-{timestamp}.dump.txt");
    let archive_subdir = timestamp.split_once('-')
        .ok_or_else(|| internal!(
            "unexpected archive timestamp format {timestamp}"
        ))?
        .0;
    let archive_subdir = format!("{archive_dir}/{archive_subdir}");
    let archive_file = format!("{archive_subdir}/{archive_leaf}");
    let archive_file_tmp = format!("{archive_file}.tmp");

    let sync_directory = |d| (|| {
        File::open(d).context("open")?
            .sync_all().context("sync")
    })()
        .with_context(|| format!("{d:?}"))
        .into_internal("fsync archive (sub)directory");

    let (n_expired, archived_to) = db_transaction(TN::Expiry, |dbt| {

        // Clear out any existing *.tmp files

        let clearout_glob = format!("{archive_dir}/*/expired-*.dump.txt.tmp");
        (|| {
            for clearout in glob::glob(&clearout_glob).context("glob start")? {
                let clearout = clearout.context("glob")?;
                fs::remove_file(&clearout)
                    .with_context(|| clearout.display().to_string())
                    .context("delete")?;
            }
            Ok::<_, AE>(())
        })().with_context(|| clearout_glob.clone())
            .into_internal("clear out old tmp files")?;

        // Create the subdirectory

        fs::create_dir(&archive_subdir).or_else(|e| match e {
            e if e.kind() == io::ErrorKind::AlreadyExists => Ok(()),
            other => Err(other),
        }).into_internal("create archive subdirectory")?;

        // Prepare to write an archive file

        let fwriter = deferred_file_creation::Writer::new_uncreated(
            archive_file_tmp.clone(),
            fs::OpenOptions::new().create_new(true).clone(),
        );
        let mut archiver = mini_sqlite_dump::Archiver::start(
            dbt, fwriter, ["jobs"],
        ).into_internal("prepare to archive expired data")?;

        let mut n_expired = 0;

        dbt.bsql_exec(&bsql!("
            DELETE FROM last_expiry
        "))?;
        dbt.bsql_exec(&bsql!("
            INSERT INTO last_expiry " .insert_row(LastExpiryRow {
                run: now.into(),
                threshold: oldest_to_keep,
            })"
        "))?;

        let archived_to = dbt.bsql_query_rows_concurrent(

            &bsql!(
                " SELECT * FROM jobs
                          WHERE last_update < " oldest_to_keep "
            "),

            |mut rows| {

                let mut tarchiver = archiver.start_table("jobs")
                    .into_internal("prepare to archive expired rows")?;

                /// `fn handle_io_err<E>(msg: impl Display)
                ///     -> impl FnOnce(E) -> DbError<IE>;`
                macro_rules! handle_io_err { { $msg:expr } => {
                    |e| {
                        // Trickery to ensure that `e` is safely erasable.
                        Err::<Void, _>(e)
                            .context(archive_file_tmp.clone())
                            .context($msg.to_string())
                            .into_internal("archive file")
                            .void_unwrap_err()
                    }
                } }

                let mut expired_counts = HashMap::<_, i64>::default();

                while let Some(row) = rows.next()
                    .db_context("fetch row to expire")?
                {
                    let jid: JobId = row.get("jid")
                        .db_context("extract jid")?;

                    n_expired += 1;
                    tarchiver.writer_mut().ensure_created()
                        .map_err(handle_io_err!("start file"))?;
                    tarchiver.write_row(row)
                        .map_err(handle_io_err!("write row"))?;

                    // We don't archive all the intermediate states,
                    // only the final one.
                    dbt.bsql_exec_concurrent(&bsql!(
                        "DELETE FROM job_history WHERE jid = " jid
                    ))?;
                    dbt.bsql_exec_concurrent(&bsql!(
                        "DELETE FROM jobs WHERE jid = " jid
                    ))?;

                    match JobRow::from_sql_row(&row) {
                        Ok(row) => {
                            let shown = ShownJobStatus::from_state(&row.s);
                            *expired_counts
                                .entry(shown)
                                .or_default()
                                += 1;
                        },
                        Err(e) => error!(%e,
                            "error interpreting expired row, for status"
                        ),
                    }
                }

                for (shown, n_jobs) in expired_counts {
                    dbt.bsql_exec_concurrent(&bsql!("
                             UPDATE stats_by_shown_status_expired
                                SET n_jobs = n_jobs + " n_jobs "
                              WHERE shown_status = " shown "
                            "))?;
                }

                // If we actually did any rows, flush and install

                let fwriter = archiver.finish_with_writer()
                    .map_err(handle_io_err!("complete"))?;

                if let Some::<(File, String)>((file, _)) = {
                    fwriter.finish_created()
                        .map_err(handle_io_err!("finish"))?
                } {
                    file.sync_all()
                        .map_err(handle_io_err!("fsync"))?;

                    fs::rename(
                        &archive_file_tmp,
                        &archive_file,
                    )
                        .with_context(|| archive_file.clone())
                        .with_context(|| archive_file_tmp.clone())
                        .into_internal("install archive file")?;

                    sync_directory(&archive_subdir)?;
                    sync_directory(&archive_dir)?;

                    Ok::<_, DbError<IE>>(Some(archive_leaf.clone()))
                } else {
                    assert_eq!(n_expired, 0);
                    Ok::<_, DbError<IE>>(None)
                }
            }
        )?;

        // By returning `Ok` we instruct db_transaction to COMMIT.
        // We have already installed the new archive file, above.
        //
        // COMMIT isn't supposed to be likely to fail since our TN::Expiry
        // causes us to use BEGIN IMMEDIATE, so there oughtn't to be conflicts.
        //
        // If things *do* go badly, we have written the archive,
        // but not actually expired the rows, which is the better failure.
        Ok((n_expired, archived_to))
    })?;

    if let Some(archive) = archived_to {
        info!(n_expired, %oldest_to_keep, ?archive, "expired");
    } else {
        info!(n_expired, %oldest_to_keep, "nothing expired");
    }

    Ok(())
}

impl LastExpiryRow {
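    /// Fetch the record of the most recent expiry run
    ///
    /// If no expiry has been recorded yet, returns a placeholder row
    /// with both timestamps set to zero.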
    pub fn get(dbt: &mut DbTransaction)
        -> Result<LastExpiryRow, DbError<IE>>
    {
        let dummy_time: TimeT = 0.into();

        let row = dbt.bsql_query_01(&bsql!("SELECT * FROM last_expiry"))?
            .unwrap_or_else(|| LastExpiryRow {
                run: dummy_time,
                threshold: dummy_time,
            });
        Ok(row)
    }

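    /// As [`Self::get`], wrapped up for serialization to the UI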
    pub fn for_ui(dbt: &mut DbTransaction)
                  -> Result<UiSerializeMap<LastExpiryRow>, DbError<IE>>
    {
        Self::get(dbt).map(UiSerializeMap)
    }
}

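/// Spawn the background expiry task
///
/// Runs `expire_now` every `intervals.expire_every`, until shutdown.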
pub fn start_task(gl: &Arc<Globals>) {
    gl.spawn_task_running("expiry", {
        let gl = gl.clone();
        async move {
            loop {
                let expire_every = *gl.config.intervals.expire_every;
                select! {
                    () = tokio::time::sleep(expire_every) => {},
                    sd = gl.await_shutdown() => return Err(sd.into()),
                };
                expire_now(&gl)?;
            }
        }
    })
}

//---------- deferred file writer ----------

pub mod deferred_file_creation {

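    //! A writer which defers creating its output file until it is
    //! actually needed, so that an expiry run which archives no rows
    //! leaves nothing on disk.
    //!
    //! Illustrative usage sketch (the filename and the data written here
    //! are made up, not taken from the caller above):
    //!
    //! ```ignore
    //! let mut w = Writer::new_uncreated(
    //!     "archive.dump.txt.tmp".into(),
    //!     fs::OpenOptions::new().create_new(true).clone(),
    //! );
    //! w.write_all(b"...")?;    // buffered in memory; no file created yet
    //! w.ensure_created()?;     // opens the file and writes the buffered data to it
    //! if let Some((file, _name)) = w.finish_created()? {
    //!     file.sync_all()?;    // the caller decides how durable to make it
    //! }
    //! ```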
    use super::*;

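    // Derive-deftly template: implements `io::Write` for a struct by
    // forwarding `write` and `flush` to the single field marked
    // `#[deftly(delegate_write)]`.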
    define_derive_deftly! {
        DelegateWrite for struct, expect items:

        ${define FIELD { $( ${when fmeta(delegate_write)} $fname ) }}

        impl io::Write for $ttype {
            fn write(&mut self, data: &[u8]) -> io::Result<usize> {
                io::Write::write(&mut self.$FIELD, data)
            }
            fn flush(&mut self) -> io::Result<()> {
                io::Write::flush(&mut self.$FIELD)
            }
        }
    }

    #[derive(Debug, Deftly)]
    #[derive_deftly(DelegateWrite)]
    pub struct Writer {
        filename: String,
        #[deftly(delegate_write)]
        file: Either<NotYetCreated, BufWriter<File>>,
    }

    #[derive(Debug, Deftly)]
    #[derive_deftly(DelegateWrite)]
    struct NotYetCreated {
        #[deftly(delegate_write)]
        buf: Vec<u8>,
        open_options: fs::OpenOptions,
    }

    impl Writer {
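        /// Make a `Writer` for `filename`, without creating the file yet
        ///
        /// `open_options` (plus write access) will be used when the file
        /// is eventually created by `ensure_created`.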
        pub fn new_uncreated(
            filename: String,
            mut open_options: fs::OpenOptions,
        ) -> Self {
            open_options.write(true);
            let file = Either::Left(NotYetCreated {
                buf: vec![],
                open_options,
            });
            Writer { filename, file }
        }

        /// Make the file exist (idempotent)
        pub fn ensure_created(&mut self) -> io::Result<()> {
            match &mut self.file {
                Either::Left(nyc) => {
                    let file = nyc.open_options.open(&self.filename)?;
                    let mut file = BufWriter::new(file);
                    file.write_all(&mem::take(&mut nyc.buf))?;
                    self.file = Either::Right(file);
                }
                Either::Right(bw) => {
                    let _: &mut BufWriter<File> = bw;
                }
            }
            Ok(())
        }

        /// Finish up
        ///
        /// If the file was created, flush it and return it along with
        /// the filename.  Otherwise, return `None`.
        pub fn finish_created(self) -> io::Result<Option<(File, String)>> {
            match self.file {
                Either::Left(nyc) => {
                    let NotYetCreated { .. } = nyc;
                    Ok(None)
                },
                Either::Right(file) => {
                    let file = file.into_inner()
                        .map_err(|e| e.into_error())?;
                    Ok(Some((file, self.filename)))
                }
            }
        }
    }
}