use std::{
collections::HashMap,
fs::File,
io,
io::{BufReader, BufWriter, Write},
ops::{Deref, RangeInclusive},
path::Path,
};
use thiserror::Error;
use crate::{
chunk,
codec::Decode,
common::{BytesBuf, FnSink, LazyFileWriter},
compress::{Decompressor, ZstdDecompressor},
encrypt::{
ecdh::{ecdh_encryption_key, EMPTY_PUBLIC_KEY},
AesDecryptor, Decryptor,
},
DateTime, DecodingError, DecompressionError, DecryptionError, EncryptionError, EncryptionKey,
PublicKey, Record, SecretKey, BUFFER_LEN, FORMAT_VERSION,
};
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
Io(#[from] io::Error),
#[error("the log file is invalid")]
FileInvalid,
#[error("the log file is incomplete")]
FileIncomplete,
#[error("decrypt error: {0}, in {1:?}")]
Decrypt(DecryptionError, RangeInclusive<DateTime>),
#[error("decompress error: {0}, in {1:?}")]
Decompress(DecompressionError, RangeInclusive<DateTime>),
#[error("decode error: {0}, in {1:?}")]
Decode(DecodingError, RangeInclusive<DateTime>),
#[error("chunk errors: {:#?}", .0.iter().map(|e|e.to_string()).collect::<Vec<_>>())]
Chunks(Vec<Error>),
}
pub fn parse(
path: impl AsRef<Path>,
secret_key: Option<SecretKey>,
callback: impl FnMut(&Record) -> Result<(), io::Error>,
) -> Result<(), Error> {
let reader = BufReader::new(File::open(path.as_ref())?);
let mut reader = chunk::Reader::new(reader);
let parser = RecordParser::new(callback);
let mut processor = Processor::new(secret_key, parser);
let mut chunk_errors = Vec::new();
while let Some(header) = reader.read_header_or_reach_to_end()? {
if header.version() != FORMAT_VERSION {
continue;
}
let payload_len = header.payload_len();
let time_range = header.time_range().start()..=header.time_range().end();
let mut sink =
processor.chunk_sink(payload_len, header.pub_key(), time_range, header.writeback());
if let Err(err) = reader.read_payload(payload_len, &mut sink) {
if err.can_continue_to_read_chunk() {
chunk_errors.push(err);
} else {
return Err(err);
}
}
}
if chunk_errors.is_empty() {
Ok(())
} else {
Err(Error::Chunks(chunk_errors))
}
}
pub trait Format {
fn format(&mut self, record: &Record, writer: &mut impl Write) -> io::Result<()>;
}
#[inline]
pub fn parse_to_file(
path: impl AsRef<Path>,
dest_path: impl AsRef<Path>,
secret_key: Option<SecretKey>,
mut formatter: impl Format,
) -> Result<(), Error> {
let dest_path = dest_path.as_ref();
let mut writer = BufWriter::new(LazyFileWriter::new(dest_path));
parse(path, secret_key, |record| formatter.format(record, &mut writer))
}
pub struct DefaultFormatter;
impl Format for DefaultFormatter {
#[inline]
fn format(&mut self, record: &Record, writer: &mut impl Write) -> io::Result<()> {
const LEVELS: [&str; 5] = ["E", "W", "I", "D", "V"];
let (meta, content) = (record.meta(), record.content());
let datetime: chrono::DateTime<chrono::Local> = meta.datetime().into();
writeln!(
writer,
"[{}] {}|{}|{}:{}|{}|{}",
LEVELS[meta.level() as usize - 1],
datetime.format("%F %T%.3f"),
meta.thread_id().unwrap_or(0),
meta.location().file().unwrap_or(""),
meta.location().line().unwrap_or(0),
meta.tag().unwrap_or(""),
content
)
}
}
#[derive(Error, Debug)]
enum ChunkError {
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
Decrypt(#[from] DecryptionError),
#[error(transparent)]
Decompress(#[from] DecompressionError),
#[error(transparent)]
Decode(#[from] DecodingError),
}
struct Processor<F> {
decompressor: ZstdDecompressor,
secret_key: Option<SecretKey>,
encryption_keys: HashMap<PublicKey, EncryptionKey>,
parser: RecordParser<F>,
}
impl<F> Processor<F>
where
F: FnMut(&Record) -> Result<(), io::Error>,
{
#[inline]
fn new(secret_key: Option<SecretKey>, parser: RecordParser<F>) -> Self {
Self {
decompressor: ZstdDecompressor::new(),
secret_key,
encryption_keys: HashMap::new(),
parser,
}
}
fn obtain_decryptor(
&mut self,
pub_key: PublicKey,
) -> Result<Option<AesDecryptor>, EncryptionError> {
if pub_key == EMPTY_PUBLIC_KEY {
Ok(None)
} else if let Some(key) = self.encryption_keys.get(&pub_key) {
Ok(Some(AesDecryptor::new(key)))
} else {
if let Some(secret_key) = self.secret_key.as_ref() {
let key = ecdh_encryption_key(secret_key, &pub_key)?;
let key = self.encryption_keys.entry(pub_key).or_insert(key);
Ok(Some(AesDecryptor::new(key)))
} else {
Ok(None)
}
}
}
fn chunk_sink(
&mut self,
payload_len: usize,
pub_key: PublicKey,
time_range: RangeInclusive<DateTime>,
writeback: bool,
) -> FnSink<impl FnMut(&[u8]) -> Result<(), Error> + '_, Error> {
let mut read_len = 0;
let mut decryptor = self.obtain_decryptor(pub_key);
FnSink::new(move |bytes: &[u8]| {
read_len += bytes.len();
let reached_to_end = read_len == payload_len;
let decryptor =
decryptor.as_mut().map_err(|e| Error::Decrypt(e.clone(), time_range.clone()))?;
let mut to_decompressor = FnSink::new(|bytes: &[u8]| {
self.decompressor.decompress(
bytes,
&mut FnSink::new(|bytes: &[u8]| self.parser.parse_all(bytes)),
)
});
decryptor
.decrypt(bytes, reached_to_end && !writeback, &mut to_decompressor)
.map_err(|e| Error::from_chunk_error(e, time_range.clone()))?;
if reached_to_end {
self.parser.clear_buffer();
}
Ok(())
})
}
}
struct RecordParser<F> {
callback: F,
buffer: BytesBuf,
}
impl<F> RecordParser<F>
where
F: FnMut(&Record) -> Result<(), io::Error>,
{
#[inline]
fn new(callback: F) -> Self {
Self { callback, buffer: BytesBuf::with_capacity(BUFFER_LEN) }
}
#[inline]
fn parse_all(&mut self, mut bytes: &[u8]) -> Result<(), ChunkError> {
while !bytes.is_empty() {
let len = self.parse(bytes)?;
bytes = &bytes[len..];
}
Ok(())
}
fn parse(&mut self, bytes: &[u8]) -> Result<usize, ChunkError> {
let len = self.buffer.buffer(bytes);
let mut source = self.buffer.deref();
let mut read_len = 0;
let res = loop {
if source.is_empty() {
break Ok(());
}
match Record::decode(&mut source) {
Ok(record) => {
read_len = self.buffer.len() - source.len();
if let Err(e) = (self.callback)(&record) {
break Err(e.into());
}
}
Err(ref e) if matches!(e, DecodingError::UnexpectedEnd { .. }) => break Ok(()),
Err(e) => break Err(e.into()),
}
};
self.buffer.drain(read_len);
res.map(|_| len)
}
fn clear_buffer(&mut self) {
self.buffer.clear()
}
}
impl Error {
#[inline]
fn can_continue_to_read_chunk(&self) -> bool {
matches!(self, Self::Decrypt(..))
|| matches!(self, Self::Decompress(..))
|| matches!(self, Self::Decode(..))
}
#[inline]
fn from_chunk_error(error: ChunkError, time_range: RangeInclusive<DateTime>) -> Self {
use ChunkError::*;
match error {
Io(err) => Self::Io(err),
Decrypt(err) => Self::Decrypt(err, time_range),
Decompress(err) => Self::Decompress(err, time_range),
Decode(err) => Self::Decode(err, time_range),
}
}
}
impl From<chunk::ReadError> for Error {
#[inline]
fn from(error: chunk::ReadError) -> Self {
use chunk::ReadError::*;
match error {
Io(err) => Self::Io(err),
Invalid => Self::FileInvalid,
UnexpectedEnd => Self::FileIncomplete,
}
}
}