// tag2upload_service_manager/expire.rs

2use crate::prelude::*;
3
4use std::io::BufWriter;
5use std::fs::File;
6
7pub fn expire_now(gl: &Arc<Globals>) -> Result<(), IE> {
8    let overflow_msg = "implausible intervals.expiry or system time";
9
10    let now = gl.now_systemtime();
11
12    let oldest_to_keep: TimeT =
13        now
14        .checked_sub(*gl.config.intervals.expire)
15        .ok_or_else(|| internal!("{overflow_msg}"))?
16        .try_into().into_internal(overflow_msg)?;
17
18    let archive_dir = &gl.config.files.archive_dir;
19    let timestamp = format!("{}", HtTimeT(now.into()));
20    let archive_leaf = format!("expired-{timestamp}.dump.txt");
21    let archive_subdir = timestamp.split_once('-')
22        .ok_or_else(|| internal!(
23            "unexpected archive timestamp format {timestamp}"
24        ))?
25        .0;
26    let archive_subdir = format!("{archive_dir}/{archive_subdir}");
27    let archive_file = format!("{archive_subdir}/{archive_leaf}");
28    let archive_file_tmp = format!("{archive_file}.tmp");
29
30    let sync_directory = |d| (|| {
31        File::open(d).context("open")?
32            .sync_all().context("sync")
33    })()
34        .with_context(|| format!("{d:?}"))
35        .into_internal("fsync archive (sub)directory");
36
37    let (n_expired, archived_to) = db_transaction(TN::Expiry, |dbt| {
38
39        // Clear out any existing *.tmp files
40
41        let clearout_glob = format!("{archive_dir}/*/expired-*.dump.txt.tmp");
42        (|| {
43            for clearout in glob::glob(&clearout_glob).context("glob start")? {
44                let clearout = clearout.context("glob")?;
45                fs::remove_file(&clearout)
46                    .with_context(|| clearout.display().to_string())
47                    .context("delete")?;
48            }
49            Ok::<_, AE>(())
50        })().with_context(|| clearout_glob.clone())
51            .into_internal("clear out old tmp files")?;
52
53        // Create the subdirectory
54
55        fs::create_dir(&archive_subdir).or_else(|e| match e {
56            e if e.kind() == io::ErrorKind::AlreadyExists => Ok(()),
57            other => Err(other),
58        }).into_internal("create archive subdirectory")?;
59
60        // Prepare to write an archive file
61
62        let fwriter = deferred_file_creation::Writer::new_uncreated(
63            archive_file_tmp.clone(),
64            fs::OpenOptions::new().create_new(true).clone(),
65        );
66        let mut archiver = mini_sqlite_dump::Archiver::start(
67            dbt, fwriter, ["jobs"],
68        ).into_internal("prepare to archive expired data")?;
69
70        let mut n_expired = 0;
71
72        dbt.bsql_exec(bsql!("
73            DELETE FROM last_expiry
74        "))?;
75        dbt.bsql_exec(bsql!("
76            INSERT INTO last_expiry " +~(LastExpiryRow {
77                run: now.into(),
78                threshold: oldest_to_keep,
79            })"
80        "))?;
81
82        let archived_to = dbt.bsql_query_rows_concurrent(
83
84            bsql!(
85                " SELECT * FROM jobs
86                          WHERE last_update < " oldest_to_keep "
87            "),
88
89            |mut rows| {
90
91                let mut tarchiver = archiver.start_table("jobs")
92                    .into_internal("prepare to archive expired rows")?;
93
94                let mut expired_counts = HashMap::<_, i64>::default();
95
96                while let Some(row) = rows.next()
97                    .into_internal("fetch row to expire")?
98                {
99                    let jid: JobId = row.get("jid")
100                        .into_internal("extract jid")?;
101
102                    n_expired += 1;
103                    tarchiver.writer_mut().ensure_created()
104                        .with_context(|| archive_file_tmp.clone())
105                        .into_internal("start archive file")?;
106                    tarchiver.write_row(row)
107                        .with_context(|| archive_file_tmp.clone())
108                        .with_context(|| format!("jobid={jid}"))
109                        .into_internal("write row to archive file")?;
110
111                    dbt.bsql_exec_concurrent(bsql!(
112                        "DELETE FROM jobs WHERE jid = " jid
113                    ))?;
114
115                    match JobRow::from_sql_row(&row) {
116                        Ok(row) => {
117                            let shown = ShownJobStatus::new(
118                                row.status,
119                                row.duplicate_of
120                            );
121                            *expired_counts
122                                .entry(shown)
123                                .or_default()
124                                += 1;
125                        },
126                        Err(e) => error!(%e,
127                     "error interpreting expired row, for status"
128                        ),
129                    }
130                }
131
132                for (shown, n_jobs) in expired_counts {
133                    dbt.bsql_exec_concurrent(bsql!("
134                             UPDATE stats_by_shown_status_expired
135                                SET n_jobs = n_jobs + " n_jobs "
136                              WHERE shown_status = " shown "
137                            "))?;
138                }
139
140                // If we actually did any rows, flush and install
141
142                let fwriter = archiver.finish_with_writer()
143                    .with_context(|| archive_file_tmp.clone())
144                    .into_internal("complete archive file")?;
145
146                if let Some::<(File, String)>((file, _)) = {
147                    fwriter.finish_created()
148                        .with_context(|| archive_file_tmp.clone())
149                        .into_internal("finish archive file")?
150                } {
151                    file.sync_all()
152                        .with_context(|| archive_file_tmp.clone())
153                        .into_internal("fsync archive file")?;
154
155                    fs::rename(
156                        &archive_file_tmp,
157                        &archive_file,
158                    )
159                        .with_context(|| archive_file.clone())
160                        .with_context(|| archive_file_tmp.clone())
161                        .into_internal("install archive file")?;
162
163                    sync_directory(&archive_subdir)?;
164                    sync_directory(&archive_dir)?;
165
166                    Ok::<_, IE>(Some(archive_leaf.clone()))
167                } else {
168                    assert_eq!(n_expired, 0);
169                    Ok::<_, IE>(None)
170                }
171            }
172        )??;
173
174        // By returning `Ok` we instruct db_transaction to COMMIT.
175        // We have already installed the new archive file, above.
176        //
177        // COMMIT isn't supposed to be likely to fail since our TN::Expiry
178        // causes us to use BEGIN IMMEDIATE, so there oughtn't to e conflicts.
179        //
180        // If thins *do* go badly, we have written the archive,
181        // but not actually expired the rows, which is the better failure.
182        Ok::<_, IE>((n_expired, archived_to))
183    })??;
184
185    if let Some(archive) = archived_to {
186        info!(n_expired, %oldest_to_keep, ?archive, "expired");
187    } else {
188        info!(n_expired, %oldest_to_keep, "nothing expired");
189    }
190
191    Ok(())
192}
193
194impl LastExpiryRow {
195    pub fn get(dbt: &mut rusqlite::Transaction) -> Result<LastExpiryRow, IE> {
196        let dummy_time: TimeT = 0.into();
197
198        let row = dbt.bsql_query_01(bsql!("SELECT * FROM last_expiry"))?
199            .unwrap_or_else(|| LastExpiryRow {
200                run: dummy_time,
201                threshold: dummy_time,
202            });
203        Ok(row)
204    }
205
206    pub fn for_ui(dbt: &mut rusqlite::Transaction)
207                  -> Result<UiSerializeMap<LastExpiryRow>, IE>
208    {
209        Self::get(dbt).map(UiSerializeMap)
210    }
211}
212
213pub fn start_task(gl: &Arc<Globals>) {
214    gl.spawn_task_running("expiry", {
215        let gl = gl.clone();
216        async move {
217            loop {
218                let expire_every = *gl.config.intervals.expire_every;
219                select! {
220                    () = tokio::time::sleep(expire_every) => {},
221                    sd = gl.await_shutdown() => return Err(sd.into()),
222                };
223                expire_now(&gl)?;
224            }
225        }
226    })
227}

//---------- deferred file writer ----------

231pub mod deferred_file_creation {
232
233    use super::*;
234
235    define_derive_deftly! {
236        DelegateWrite for struct, expect items:
237
238        ${define FIELD { $( ${when fmeta(delegate_write)} $fname ) }}
239
240        impl io::Write for $ttype {
241            fn write(&mut self, data: &[u8]) -> io::Result<usize> {
242                io::Write::write(&mut self.$FIELD, data)
243            }
244            fn flush(&mut self) -> io::Result<()> {
245                io::Write::flush(&mut self.$FIELD)
246            }
247        }
248    }
249
250    #[derive(Debug, Deftly)]
251    #[derive_deftly(DelegateWrite)]
252    pub struct Writer {
253        filename: String,
254        #[deftly(delegate_write)]
255        file: Either<NotYetCreated, BufWriter<File>>,
256    }
257
258    #[derive(Debug, Deftly)]
259    #[derive_deftly(DelegateWrite)]
260    struct NotYetCreated {
261        #[deftly(delegate_write)]
262        buf: Vec<u8>,
263        open_options: fs::OpenOptions,
264    }
265
266    impl Writer {
267        pub fn new_uncreated(
268            filename: String,
269            mut open_options: fs::OpenOptions,
270        ) -> Self {
271            open_options.write(true);
272            let file = Either::Left(NotYetCreated {
273                buf: vec![],
274                open_options,
275            });
276            Writer { filename, file }
277        }
278
279        /// Make the file exist (idempotent)
280        pub fn ensure_created(&mut self) -> io::Result<()> {
281            match &mut self.file {
282                Either::Left(nyc) => {
283                    let file = nyc.open_options.open(&self.filename)?;
284                    let mut file = BufWriter::new(file);
285                    file.write_all(&mem::take(&mut nyc.buf))?;
286                    self.file = Either::Right(file);
287                }
288                Either::Right(bw) => {
289                    let _: &mut BufWriter<File> = bw;
290                }
291            }
292            Ok(())
293        }
294
295        /// Finish up
296        ///
297        /// If the file was created, flush it and return the filename.
298        /// Otherwise, return None.
299        pub fn finish_created(self) -> io::Result<Option<(File, String)>> {
300            match self.file {
301                Either::Left(nyc) => {
302                    let NotYetCreated { .. } = nyc;
303                    Ok(None)
304                },
305                Either::Right(file) => {
306                    let file = file.into_inner()
307                        .map_err(|e| e.into_error())?;
308                    Ok(Some((file, self.filename)))
309                }
310            }
311        }
312    }
313}