pinenut_log/
parse.rs

1use std::{
2    collections::HashMap,
3    fs::File,
4    io,
5    io::{BufReader, BufWriter, Write},
6    ops::{Deref, RangeInclusive},
7    path::Path,
8};
9
10use thiserror::Error;
11
12use crate::{
13    chunk,
14    codec::Decode,
15    common::{BytesBuf, FnSink, LazyFileWriter},
16    compress::{Decompressor, ZstdDecompressor},
17    encrypt::{
18        ecdh::{ecdh_encryption_key, EMPTY_PUBLIC_KEY},
19        AesDecryptor, Decryptor,
20    },
21    DateTime, DecodingError, DecompressionError, DecryptionError, EncryptionError, EncryptionKey,
22    PublicKey, Record, SecretKey, BUFFER_LEN, FORMAT_VERSION,
23};
24
/// Errors that can occur during the log parsing process ([`parse`]).
#[derive(Error, Debug)]
pub enum Error {
    /// An I/O failure while reading the log file or writing parsed output.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// The file's framing could not be recognized (see `chunk::ReadError::Invalid`).
    #[error("the log file is invalid")]
    FileInvalid,
    /// The file ended in the middle of a chunk (see `chunk::ReadError::UnexpectedEnd`).
    #[error("the log file is incomplete")]
    FileIncomplete,

    // Chunk errors: each carries the time range of the chunk it occurred in,
    // so callers can report which span of log records was lost.
    #[error("decrypt error: {0}, in {1:?}")]
    Decrypt(DecryptionError, RangeInclusive<DateTime>),
    #[error("decompress error: {0}, in {1:?}")]
    Decompress(DecompressionError, RangeInclusive<DateTime>),
    #[error("decode error: {0}, in {1:?}")]
    Decode(DecodingError, RangeInclusive<DateTime>),

    // The collection of chunk errors. [`parse`] keeps going after a
    // recoverable per-chunk error and reports all of them at the end.
    #[error("chunk errors: {:#?}", .0.iter().map(|e|e.to_string()).collect::<Vec<_>>())]
    Chunks(Vec<Error>),
}
47
48/// Parses the compressed and encrypted binary log file into multiple log records and
49/// calls them back one by one.
50pub fn parse(
51    path: impl AsRef<Path>,
52    secret_key: Option<SecretKey>,
53    callback: impl FnMut(&Record) -> Result<(), io::Error>,
54) -> Result<(), Error> {
55    let reader = BufReader::new(File::open(path.as_ref())?);
56    let mut reader = chunk::Reader::new(reader);
57
58    let parser = RecordParser::new(callback);
59    let mut processor = Processor::new(secret_key, parser);
60
61    let mut chunk_errors = Vec::new();
62
63    while let Some(header) = reader.read_header_or_reach_to_end()? {
64        // Version is not supported, just skips this chunk.
65        if header.version() != FORMAT_VERSION {
66            continue;
67        }
68
69        let payload_len = header.payload_len();
70        let time_range = header.time_range().start()..=header.time_range().end();
71        let mut sink =
72            processor.chunk_sink(payload_len, header.pub_key(), time_range, header.writeback());
73
74        if let Err(err) = reader.read_payload(payload_len, &mut sink) {
75            if err.can_continue_to_read_chunk() {
76                chunk_errors.push(err);
77            } else {
78                return Err(err);
79            }
80        }
81    }
82
83    if chunk_errors.is_empty() {
84        Ok(())
85    } else {
86        Err(Error::Chunks(chunk_errors))
87    }
88}
89
/// Represents a formatter that formats log records into readable text.
pub trait Format {
    /// Formats the log record then passes the result to the writer.
    ///
    /// Returns any [`io::Error`] raised while writing; the error is
    /// propagated out of [`parse`] / [`parse_to_file`] as [`Error::Io`].
    fn format(&mut self, record: &Record, writer: &mut impl Write) -> io::Result<()>;
}
95
96/// Parses the compressed and encrypted binary log file into readable text file.
97///
98/// Errors may be occurred during log writing, and the destination file may have been
99/// created by then. The caller is responsible for managing the destination file
100/// (e.g., deleting it) afterwards.
101#[inline]
102pub fn parse_to_file(
103    path: impl AsRef<Path>,
104    dest_path: impl AsRef<Path>,
105    secret_key: Option<SecretKey>,
106    mut formatter: impl Format,
107) -> Result<(), Error> {
108    let dest_path = dest_path.as_ref();
109    let mut writer = BufWriter::new(LazyFileWriter::new(dest_path));
110    parse(path, secret_key, |record| formatter.format(record, &mut writer))
111}
112
/// The default formatter provides simple log formatting.
///
/// Output shape: `[LEVEL] datetime|thread|file:line|tag|content`.
pub struct DefaultFormatter;
115
116impl Format for DefaultFormatter {
117    #[inline]
118    fn format(&mut self, record: &Record, writer: &mut impl Write) -> io::Result<()> {
119        const LEVELS: [&str; 5] = ["E", "W", "I", "D", "V"];
120        let (meta, content) = (record.meta(), record.content());
121        let datetime: chrono::DateTime<chrono::Local> = meta.datetime().into();
122
123        writeln!(
124            writer,
125            "[{}] {}|{}|{}:{}|{}|{}",
126            LEVELS[meta.level() as usize - 1],
127            datetime.format("%F %T%.3f"),
128            meta.thread_id().unwrap_or(0),
129            meta.location().file().unwrap_or(""),
130            meta.location().line().unwrap_or(0),
131            meta.tag().unwrap_or(""),
132            content
133        )
134    }
135}
136
137// ============ Internal ============
138
/// Internal per-chunk error, raised inside the decrypt→decompress→decode
/// pipeline before the chunk's time range is known at that point. It is
/// lifted into the public [`Error`] (with the time range attached) by
/// [`Error::from_chunk_error`].
#[derive(Error, Debug)]
enum ChunkError {
    #[error(transparent)]
    Io(#[from] io::Error),
    #[error(transparent)]
    Decrypt(#[from] DecryptionError),
    #[error(transparent)]
    Decompress(#[from] DecompressionError),
    #[error(transparent)]
    Decode(#[from] DecodingError),
}
150
/// Drives one chunk's payload through the parsing pipeline.
///
/// # Workflow
///
/// ```plain
/// ┌──────────────┐   ┌───────────┐   ┌────────────┐   ┌──────────┐   ┌────────────┐
/// │  Read Chunk  │──▶│  Decrypt  │──▶│ Decompress │──▶│  Decode  │──▶│  Callback  │
/// └──────────────┘   └───────────┘   └────────────┘   └──────────┘   └────────────┘
/// ```
struct Processor<F> {
    // Reused across chunks.
    decompressor: ZstdDecompressor,
    // The reader's half of the ECDH key exchange; `None` means encrypted
    // chunks cannot be decrypted.
    secret_key: Option<SecretKey>,
    // Cache of negotiated AES keys, keyed by the writer's public key, so the
    // ECDH derivation runs once per distinct public key.
    encryption_keys: HashMap<PublicKey, EncryptionKey>,
    parser: RecordParser<F>,
}
164
impl<F> Processor<F>
where
    F: FnMut(&Record) -> Result<(), io::Error>,
{
    #[inline]
    fn new(secret_key: Option<SecretKey>, parser: RecordParser<F>) -> Self {
        Self {
            decompressor: ZstdDecompressor::new(),
            secret_key,
            encryption_keys: HashMap::new(),
            parser,
        }
    }

    /// Returns the decryptor for a chunk written under `pub_key`.
    ///
    /// `Ok(None)` means "no decryption": either the chunk is unencrypted
    /// (empty public key), or no secret key was supplied. Negotiated keys are
    /// cached per public key to avoid repeating the ECDH derivation.
    fn obtain_decryptor(
        &mut self,
        pub_key: PublicKey,
    ) -> Result<Option<AesDecryptor>, EncryptionError> {
        if pub_key == EMPTY_PUBLIC_KEY {
            // No encryption.
            Ok(None)
        } else if let Some(key) = self.encryption_keys.get(&pub_key) {
            // Hit cache.
            Ok(Some(AesDecryptor::new(key)))
        } else {
            // Negotiates the key.
            if let Some(secret_key) = self.secret_key.as_ref() {
                let key = ecdh_encryption_key(secret_key, &pub_key)?;
                let key = self.encryption_keys.entry(pub_key).or_insert(key);
                Ok(Some(AesDecryptor::new(key)))
            } else {
                // Encrypted chunk but no secret key available.
                // NOTE(review): the `Option<AesDecryptor>` itself is used as
                // the `Decryptor` below — presumably `None` passes bytes
                // through unchanged; confirm against the `Decryptor` impls.
                Ok(None)
            }
        }
    }

    /// Builds the sink that `chunk::Reader::read_payload` feeds raw payload
    /// bytes into. Each write is decrypted, decompressed, and decoded into
    /// records, which are delivered to the parser's callback.
    ///
    /// Key negotiation failure is deferred: it surfaces as `Error::Decrypt`
    /// (tagged with the chunk's `time_range`) on the first write.
    fn chunk_sink(
        &mut self,
        payload_len: usize,
        pub_key: PublicKey,
        time_range: RangeInclusive<DateTime>,
        writeback: bool,
    ) -> FnSink<impl FnMut(&[u8]) -> Result<(), Error> + '_, Error> {
        // Running total of payload bytes seen; equality with `payload_len`
        // marks the final write of the chunk.
        let mut read_len = 0;
        let mut decryptor = self.obtain_decryptor(pub_key);

        FnSink::new(move |bytes: &[u8]| {
            read_len += bytes.len();
            let reached_to_end = read_len == payload_len;

            let decryptor =
                decryptor.as_mut().map_err(|e| Error::Decrypt(e.clone(), time_range.clone()))?;

            // Decrypted bytes flow into the decompressor, whose output flows
            // into the record parser.
            let mut to_decompressor = FnSink::new(|bytes: &[u8]| {
                self.decompressor.decompress(
                    bytes,
                    &mut FnSink::new(|bytes: &[u8]| self.parser.parse_all(bytes)),
                )
            });

            // Because the data of the chunk written back is incomplete (the last encrypted block
            // is lost), padding is not required when decrypting.
            decryptor
                .decrypt(bytes, reached_to_end && !writeback, &mut to_decompressor)
                .map_err(|e| Error::from_chunk_error(e, time_range.clone()))?;

            if reached_to_end {
                // Drop any trailing partial record; it cannot continue into
                // the next chunk.
                self.parser.clear_buffer();
            }

            Ok(())
        })
    }
}
239
/// Accumulates decompressed bytes and decodes them into [`Record`]s,
/// delivering each one to `callback`.
struct RecordParser<F> {
    callback: F,
    // Holds bytes of a record that is not yet complete; decoding resumes when
    // more bytes arrive.
    buffer: BytesBuf,
}
244
impl<F> RecordParser<F>
where
    F: FnMut(&Record) -> Result<(), io::Error>,
{
    #[inline]
    fn new(callback: F) -> Self {
        Self { callback, buffer: BytesBuf::with_capacity(BUFFER_LEN) }
    }

    /// Feeds all of `bytes` through [`Self::parse`], looping until the input
    /// is fully consumed or an error occurs.
    #[inline]
    fn parse_all(&mut self, mut bytes: &[u8]) -> Result<(), ChunkError> {
        while !bytes.is_empty() {
            let len = self.parse(bytes)?;
            bytes = &bytes[len..];
        }
        Ok(())
    }

    /// Buffers `bytes` and decodes as many complete records as are now
    /// available, invoking the callback for each. Returns how many input
    /// bytes were accepted into the buffer.
    ///
    /// A trailing partial record stays in the buffer for the next call
    /// (`DecodingError::UnexpectedEnd` is "need more bytes", not a failure).
    fn parse(&mut self, bytes: &[u8]) -> Result<usize, ChunkError> {
        // NOTE(review): `buffer` presumably appends and returns the number of
        // bytes accepted — confirm against `BytesBuf`.
        let len = self.buffer.buffer(bytes);
        let mut source = self.buffer.deref();
        // Bytes consumed by fully decoded records; everything before this
        // offset is drained below, even when we exit with an error.
        let mut read_len = 0;

        let res = loop {
            if source.is_empty() {
                break Ok(());
            }
            match Record::decode(&mut source) {
                Ok(record) => {
                    // `decode` advanced `source`; recompute the consumed prefix.
                    read_len = self.buffer.len() - source.len();
                    if let Err(e) = (self.callback)(&record) {
                        break Err(e.into());
                    }
                }
                // Not necessarily an error, writer needs to continue reading bytes.
                Err(ref e) if matches!(e, DecodingError::UnexpectedEnd { .. }) => break Ok(()),
                Err(e) => break Err(e.into()),
            }
        };

        // Drop fully decoded records from the buffer; keep the partial tail.
        self.buffer.drain(read_len);
        res.map(|_| len)
    }

    /// Discards any buffered partial record (called at the end of a chunk).
    fn clear_buffer(&mut self) {
        self.buffer.clear()
    }
}
293
294impl Error {
295    #[inline]
296    fn can_continue_to_read_chunk(&self) -> bool {
297        matches!(self, Self::Decrypt(..))
298            || matches!(self, Self::Decompress(..))
299            || matches!(self, Self::Decode(..))
300    }
301
302    #[inline]
303    fn from_chunk_error(error: ChunkError, time_range: RangeInclusive<DateTime>) -> Self {
304        use ChunkError::*;
305        match error {
306            Io(err) => Self::Io(err),
307            Decrypt(err) => Self::Decrypt(err, time_range),
308            Decompress(err) => Self::Decompress(err, time_range),
309            Decode(err) => Self::Decode(err, time_range),
310        }
311    }
312}
313
314impl From<chunk::ReadError> for Error {
315    #[inline]
316    fn from(error: chunk::ReadError) -> Self {
317        use chunk::ReadError::*;
318        match error {
319            Io(err) => Self::Io(err),
320            Invalid => Self::FileInvalid,
321            UnexpectedEnd => Self::FileIncomplete,
322        }
323    }
324}