tag2upload_service_manager/
expire.rs
use crate::prelude::*;
use std::io::BufWriter;
use std::fs::File;
/// Archive and then delete every `jobs` row whose `last_update` is older
/// than the configured expiry interval.
///
/// The cutoff is the current time (`gl.now_systemtime()`) minus
/// `gl.config.intervals.expire`.  All remaining work happens inside one
/// `db_transaction(TN::Expiry, ..)` call:
///
///  1. old `expired-*.dump.txt.tmp` files under the archive directory are
///     deleted;
///  2. the archive subdirectory for this run is created if it is missing;
///  3. each expired row is written to a temporary dump file and then
///     deleted from `jobs`;
///  4. if at least one row was written, the temporary file is renamed to
///     `<archive_dir>/<subdir>/expired-<timestamp>.dump.txt`.
///
/// The dump file is only created on disk once the first expired row is
/// seen, so a run that expires nothing leaves no dump file behind.
/// The outcome (row count, cutoff, and archive name if any) is logged
/// with `info!`.
///
/// # Errors
///
/// Returns an internal error (`IE`) if the cutoff cannot be computed or
/// converted to `TimeT`, or if any filesystem, archiving or database step
/// fails.
///
/// # Panics
///
/// Panics (via `assert_eq!`) if rows were counted as expired although the
/// dump file was never created.
pub fn expire_now(gl: &Arc<Globals>) -> Result<(), IE> {
// Shared text for both ways the cutoff computation can fail
// (subtraction underflow, or conversion into `TimeT`).
// NOTE(review): the message says `intervals.expiry` but the field read
// below is `intervals.expire` — confirm which spelling the config uses.
let overflow_msg = "implausible intervals.expiry or system time";
let now = gl.now_systemtime();
// Cutoff: rows with `last_update` strictly before this value are expired.
let oldest_to_keep: TimeT =
now
.checked_sub(*gl.config.intervals.expire)
.ok_or_else(|| internal!("{overflow_msg}"))?
.try_into().into_internal(overflow_msg)?;
// Build the archive paths.  The subdirectory name is whatever precedes
// the first '-' in the formatted timestamp (presumably the year — confirm
// against `HtTimeT`'s Display format).
let archive_dir = &gl.config.files.archive_dir;
let timestamp = format!("{}", HtTimeT(now.into()));
let archive_leaf = format!("expired-{timestamp}.dump.txt");
let archive_subdir = timestamp.split_once('-')
.ok_or_else(|| internal!(
"unexpected archive timestamp format {timestamp}"
))?
.0;
let archive_subdir = format!("{archive_dir}/{archive_subdir}");
let archive_file = format!("{archive_subdir}/{archive_leaf}");
// Rows are written here first; the file is renamed to `archive_file`
// only after the dump has been completed.
let archive_file_tmp = format!("{archive_file}.tmp");
// Everything up to the closing `})??` runs inside one database
// transaction labelled `TN::Expiry`.  It yields the number of rows
// expired and, if a dump was written, the leaf name of the archive file.
let (n_expired, archived_to) = db_transaction(TN::Expiry, |dbt| {
// Delete old temporary dump files in any archive subdirectory
// (presumably left over from an earlier run that did not finish).
let clearout_glob = format!("{archive_dir}/*/expired-*.dump.txt.tmp");
// The immediately-invoked closure lets `?` collect errors as `AE`, so
// the glob pattern can be attached as context once, for all of them.
(|| {
for clearout in glob::glob(&clearout_glob).context("glob start")? {
let clearout = clearout.context("glob")?;
fs::remove_file(&clearout)
.with_context(|| clearout.display().to_string())
.context("delete")?;
}
Ok::<_, AE>(())
})().with_context(|| clearout_glob.clone())
.into_internal("clear out old tmp files")?;
// Create the subdirectory; finding it already present is not an error.
fs::create_dir(&archive_subdir).or_else(|e| match e {
e if e.kind() == io::ErrorKind::AlreadyExists => Ok(()),
other => Err(other),
}).into_internal("create archive subdirectory")?;
// The writer buffers in memory and opens the file only when
// `ensure_created` is called.  `create_new(true)` makes that open fail
// if the temporary file already exists.
let fwriter = deferred_file_creation::Writer::new_uncreated(
archive_file_tmp.clone(),
fs::OpenOptions::new().create_new(true).clone(),
);
// Start the dump on this transaction, naming `jobs` as its table.
let mut archiver = mini_sqlite_dump::Archiver::start(
dbt, fwriter, ["jobs"],
).into_internal("prepare to archive expired data")?;
let mut n_expired = 0;
// Select every row older than the cutoff and handle the rows in the
// callback.  Rows are deleted (via `bsql_exec_concurrent`) while the
// query is still being iterated.
let archived_to = dbt.bsql_query_rows_concurrent(
bsql!(
" SELECT * FROM jobs
WHERE last_update < " oldest_to_keep "
"),
|mut rows| {
let mut tarchiver = archiver.start_table("jobs")
.into_internal("prepare to archive expired rows")?;
while let Some(row) = rows.next()
.into_internal("fetch row to expire")?
{
// The job id is used for error context and for the DELETE below.
let jid: JobId = row.get("jid")
.into_internal("extract jid")?;
n_expired += 1;
// On the first expired row this actually creates the temporary
// file; on later rows it does nothing.
tarchiver.writer_mut().ensure_created()
.with_context(|| archive_file_tmp.clone())
.into_internal("start archive file")?;
// Archive the row before removing it from the database.
tarchiver.write_row(row)
.with_context(|| archive_file_tmp.clone())
.with_context(|| format!("jobid={jid}"))
.into_internal("write row to archive file")?;
dbt.bsql_exec_concurrent(bsql!(
"DELETE FROM jobs WHERE jid = " jid
))?;
}
// Finish the dump and take the underlying writer back.
let fwriter = archiver.finish_with_writer()
.with_context(|| archive_file_tmp.clone())
.into_internal("complete archive file")?;
// `finish_created` yields `Some(filename)` only if the file was
// actually created, i.e. at least one row was archived.
if let Some::<String>(_) = fwriter.finish_created()
.with_context(|| archive_file_tmp.clone())
.into_internal("finish archive file")?
{
// Install the completed dump under its final name.
fs::rename(
&archive_file_tmp,
&archive_file,
)
.with_context(|| archive_file.clone())
.with_context(|| archive_file_tmp.clone())
.into_internal("install archive file")?;
Ok::<_, IE>(Some(archive_leaf.clone()))
} else {
// No file was created, so no row can have been processed.
assert_eq!(n_expired, 0);
Ok::<_, IE>(None)
}
}
// Two `?`: one for the query helper's own result, one for the
// callback's result.
)??;
Ok::<_, IE>((n_expired, archived_to))
// Likewise: the transaction helper's result, then the closure's result.
})??;
// Report the outcome; the archive leaf name is included when a dump
// was written.
if let Some(archive) = archived_to {
info!(n_expired, %oldest_to_keep, ?archive, "expired");
} else {
info!(n_expired, %oldest_to_keep, "nothing expired");
}
Ok(())
}
/// Spawn the background task named `"expiry"`.
///
/// The task repeatedly waits for `gl.config.intervals.expire_every` and
/// then calls [`expire_now`].  It ends with an error as soon as shutdown
/// is signalled during a wait, or as soon as `expire_now` fails.
pub fn start_task(gl: &Arc<Globals>) {
    // The task must own its handle on the globals, so clone the `Arc`
    // before moving it into the future.
    let task_gl = Arc::clone(gl);
    gl.spawn_task_running("expiry", async move {
        loop {
            // Re-read the interval on every pass; wait first, expire second.
            let pause = *task_gl.config.intervals.expire_every;
            select! {
                () = tokio::time::sleep(pause) => {},
                shutdown = task_gl.await_shutdown() => return Err(shutdown.into()),
            };
            expire_now(&task_gl)?;
        }
    })
}
/// A file writer that postpones creating its file until asked to.
///
/// Until [`Writer::ensure_created`] is called, everything written is held
/// in memory.  If the writer is finished without ever having been created,
/// the buffered data is dropped and no file is opened.
pub mod deferred_file_creation {
    use super::*;

    define_derive_deftly! {
        DelegateWrite for struct, expect items:

        // Expands to the name of the field marked
        // `#[deftly(delegate_write)]`; each struct using this template
        // marks exactly one field.
        ${define FIELD { $( ${when fmeta(delegate_write)} $fname ) }}

        // Forward `io::Write` to that field.
        impl io::Write for $ttype {
            fn write(&mut self, chunk: &[u8]) -> io::Result<usize> {
                io::Write::write(&mut self.$FIELD, chunk)
            }
            fn flush(&mut self) -> io::Result<()> {
                io::Write::flush(&mut self.$FIELD)
            }
        }
    }

    /// Writer whose backing file may not have been created yet.
    #[derive(Debug, Deftly)]
    #[derive_deftly(DelegateWrite)]
    pub struct Writer {
        /// Path passed to `OpenOptions::open`, and returned by
        /// `finish_created` once the file exists.
        filename: String,
        /// `Left` while the file is still uncreated, `Right` once open.
        #[deftly(delegate_write)]
        file: Either<NotYetCreated, BufWriter<File>>,
    }

    /// State of a `Writer` before its file has been opened.
    #[derive(Debug, Deftly)]
    #[derive_deftly(DelegateWrite)]
    struct NotYetCreated {
        /// Bytes written so far, waiting for the file to be created.
        #[deftly(delegate_write)]
        buf: Vec<u8>,
        /// How the file will eventually be opened.
        open_options: fs::OpenOptions,
    }

    impl Writer {
        /// Make a writer for `filename` without touching the filesystem.
        ///
        /// `open_options` is used later, by `ensure_created`; write access
        /// is always switched on in it.
        pub fn new_uncreated(
            filename: String,
            mut open_options: fs::OpenOptions,
        ) -> Self {
            open_options.write(true);
            let pending = NotYetCreated {
                buf: Vec::new(),
                open_options,
            };
            Writer {
                filename,
                file: Either::Left(pending),
            }
        }

        /// Open the file if that has not happened yet, and write out
        /// everything buffered so far.
        ///
        /// Does nothing when the file is already open.
        ///
        /// # Errors
        ///
        /// Returns the I/O error from opening the file or from writing
        /// the buffered bytes.
        pub fn ensure_created(&mut self) -> io::Result<()> {
            if let Either::Left(pending) = &mut self.file {
                let handle = pending.open_options.open(&self.filename)?;
                let mut buffered = BufWriter::new(handle);
                // Take the backlog out of the pending state before
                // replaying it into the real file.
                let backlog = mem::take(&mut pending.buf);
                buffered.write_all(&backlog)?;
                self.file = Either::Right(buffered);
            }
            Ok(())
        }

        /// Finish writing.
        ///
        /// Returns `Ok(Some(filename))` if the file had been created (after
        /// flushing it), or `Ok(None)` if it never was, in which case any
        /// buffered data is discarded.
        ///
        /// # Errors
        ///
        /// Returns the I/O error from flushing the buffered writer.
        pub fn finish_created(self) -> io::Result<Option<String>> {
            let Writer { filename, file } = self;
            match file {
                Either::Left(NotYetCreated { .. }) => Ok(None),
                Either::Right(mut buffered) => {
                    // This flushes the `BufWriter` only; no fsync is
                    // requested here.
                    io::Write::flush(&mut buffered)?;
                    Ok(Some(filename))
                }
            }
        }
    }
}