pinenut_log/
extract.rs

use std::{
    io,
    io::{BufReader, BufWriter, Read, Seek, Write},
    ops::RangeInclusive,
    path::{Path, PathBuf},
    sync::Arc,
};

use thiserror::Error;

use crate::{chunk, common, common::LazyFileWriter, logfile, logfile::Logfile, DateTime, Domain};

/// Errors that can occur during the log extraction process ([`extract`]).
#[derive(Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Io(#[from] io::Error),
    #[error("the log file is invalid: {0}")]
    FileInvalid(PathBuf),
    #[error("the log file is incomplete: {0}")]
    FileIncomplete(PathBuf),
    #[error("logs in the specified time range were not found")]
    NotFound,
}

/// Extracts the logs for the specified time range and writes them to the destination
/// file.
///
/// Errors may occur during log writing, and the destination file may already have been
/// created by then. The caller is responsible for managing the destination file
/// (e.g., deleting it) afterwards.
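///
/// # Example
///
/// A minimal usage sketch. How a `Domain` and the `DateTime` endpoints are constructed
/// is outside this module, so the values below are placeholders:
///
/// ```ignore
/// match extract(domain, start_time..=end_time, "/tmp/extracted.log") {
///     Ok(()) => { /* matching logs were written to the destination file */ }
///     Err(Error::NotFound) => { /* no logs in the requested range */ }
///     Err(err) => { /* I/O or file error; a partial destination file may exist */ }
/// }
/// ```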
pub fn extract(
    domain: Domain,
    time_range: RangeInclusive<DateTime>,
    dest_path: impl AsRef<Path>,
) -> Result<(), Error> {
    let dest_path = dest_path.as_ref();
    let mut writer = BufWriter::new(LazyFileWriter::new(dest_path));

    for mut logfile in logfiles(domain, &time_range)? {
        let mut reader = BufReader::new(logfile.open()?);
        extract_chunks(&mut reader, &mut writer, &time_range)
            .map_err(|err| Error::from_chunk_error(err, logfile.path()))?;
    }

    if writer.into_inner().map_err(|err| err.into_error())?.is_empty() {
        Err(Error::NotFound)
    } else {
        Ok(())
    }
}

// ============ Internal ============

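// Copies every chunk that overlaps `time_range` from `reader` to `writer` verbatim
// (header bytes followed by payload bytes). Chunks are read in file order: chunks that
// end before the range are skipped, and reading stops at the first chunk that starts
// after it (chunks are assumed to be in chronological order within a logfile).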
fn extract_chunks<R, W>(
    reader: &mut R,
    writer: &mut W,
    time_range: &RangeInclusive<DateTime>,
) -> Result<(), chunk::ReadError>
where
    R: Read + Seek,
    W: Write,
{
    let mut reader = chunk::Reader::new(reader);
    while let Some(header) = reader.read_header_or_reach_to_end()? {
        if header.time_range().start().gt(time_range.end()) {
            return Ok(());
        }

        let payload_len = header.payload_len();

        if header.time_range().end().lt(time_range.start()) {
            reader.skip(payload_len)?;
            continue;
        }

        // Write header.
        let header_bytes = header.clone().bytes();
        writer.write_all(header_bytes.as_ref())?;

        type FnSink<F> = common::FnSink<F, chunk::ReadError>;

        // Write payload.
        reader.read_payload(
            payload_len,
            &mut FnSink::new(|bytes: &[u8]| writer.write_all(bytes).map_err(Into::into)),
        )?;
    }
    Ok(())
}

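// Returns the logfiles that may contain entries in `time_range`, sorted chronologically:
// every file starting within the range, plus the most recent file starting at or before
// the range's start (earlier files are dropped, as they are superseded by it).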
fn logfiles(domain: Domain, time_range: &RangeInclusive<DateTime>) -> Result<Vec<Logfile>, Error> {
    let mut original =
        Logfile::logfiles(&Arc::new(domain), logfile::Mode::Read)?.collect::<Vec<_>>();
    original.sort_by_key(|f| f.datetime());

    let mut logfiles = Vec::new();

    for logfile in original {
        if logfile.datetime().ge(time_range.end()) {
            break;
        }
        if logfile.datetime().le(time_range.start()) && !logfiles.is_empty() {
            logfiles.clear();
        }
        logfiles.push(logfile);
    }

    Ok(logfiles)
}

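// Maps a chunk-level read error to an extraction [`Error`], attaching the path of the
// offending log file to content-related failures.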
impl Error {
    #[inline]
    fn from_chunk_error(error: chunk::ReadError, path: PathBuf) -> Self {
        use chunk::ReadError::*;
        match error {
            Io(err) => Self::Io(err),
            Invalid => Self::FileInvalid(path),
            UnexpectedEnd => Self::FileIncomplete(path),
        }
    }
}