1use std::{
2 collections::HashMap,
3 fs::File,
4 io,
5 io::{BufReader, BufWriter, Write},
6 ops::{Deref, RangeInclusive},
7 path::Path,
8};
9
10use thiserror::Error;
11
12use crate::{
13 chunk,
14 codec::Decode,
15 common::{BytesBuf, FnSink, LazyFileWriter},
16 compress::{Decompressor, ZstdDecompressor},
17 encrypt::{
18 ecdh::{ecdh_encryption_key, EMPTY_PUBLIC_KEY},
19 AesDecryptor, Decryptor,
20 },
21 DateTime, DecodingError, DecompressionError, DecryptionError, EncryptionError, EncryptionKey,
22 PublicKey, Record, SecretKey, BUFFER_LEN, FORMAT_VERSION,
23};
24
25#[derive(Error, Debug)]
27pub enum Error {
28 #[error(transparent)]
29 Io(#[from] io::Error),
30 #[error("the log file is invalid")]
31 FileInvalid,
32 #[error("the log file is incomplete")]
33 FileIncomplete,
34
35 #[error("decrypt error: {0}, in {1:?}")]
37 Decrypt(DecryptionError, RangeInclusive<DateTime>),
38 #[error("decompress error: {0}, in {1:?}")]
39 Decompress(DecompressionError, RangeInclusive<DateTime>),
40 #[error("decode error: {0}, in {1:?}")]
41 Decode(DecodingError, RangeInclusive<DateTime>),
42
43 #[error("chunk errors: {:#?}", .0.iter().map(|e|e.to_string()).collect::<Vec<_>>())]
45 Chunks(Vec<Error>),
46}
47
48pub fn parse(
51 path: impl AsRef<Path>,
52 secret_key: Option<SecretKey>,
53 callback: impl FnMut(&Record) -> Result<(), io::Error>,
54) -> Result<(), Error> {
55 let reader = BufReader::new(File::open(path.as_ref())?);
56 let mut reader = chunk::Reader::new(reader);
57
58 let parser = RecordParser::new(callback);
59 let mut processor = Processor::new(secret_key, parser);
60
61 let mut chunk_errors = Vec::new();
62
63 while let Some(header) = reader.read_header_or_reach_to_end()? {
64 if header.version() != FORMAT_VERSION {
66 continue;
67 }
68
69 let payload_len = header.payload_len();
70 let time_range = header.time_range().start()..=header.time_range().end();
71 let mut sink =
72 processor.chunk_sink(payload_len, header.pub_key(), time_range, header.writeback());
73
74 if let Err(err) = reader.read_payload(payload_len, &mut sink) {
75 if err.can_continue_to_read_chunk() {
76 chunk_errors.push(err);
77 } else {
78 return Err(err);
79 }
80 }
81 }
82
83 if chunk_errors.is_empty() {
84 Ok(())
85 } else {
86 Err(Error::Chunks(chunk_errors))
87 }
88}
89
90pub trait Format {
92 fn format(&mut self, record: &Record, writer: &mut impl Write) -> io::Result<()>;
94}
95
96#[inline]
102pub fn parse_to_file(
103 path: impl AsRef<Path>,
104 dest_path: impl AsRef<Path>,
105 secret_key: Option<SecretKey>,
106 mut formatter: impl Format,
107) -> Result<(), Error> {
108 let dest_path = dest_path.as_ref();
109 let mut writer = BufWriter::new(LazyFileWriter::new(dest_path));
110 parse(path, secret_key, |record| formatter.format(record, &mut writer))
111}
112
113pub struct DefaultFormatter;
115
116impl Format for DefaultFormatter {
117 #[inline]
118 fn format(&mut self, record: &Record, writer: &mut impl Write) -> io::Result<()> {
119 const LEVELS: [&str; 5] = ["E", "W", "I", "D", "V"];
120 let (meta, content) = (record.meta(), record.content());
121 let datetime: chrono::DateTime<chrono::Local> = meta.datetime().into();
122
123 writeln!(
124 writer,
125 "[{}] {}|{}|{}:{}|{}|{}",
126 LEVELS[meta.level() as usize - 1],
127 datetime.format("%F %T%.3f"),
128 meta.thread_id().unwrap_or(0),
129 meta.location().file().unwrap_or(""),
130 meta.location().line().unwrap_or(0),
131 meta.tag().unwrap_or(""),
132 content
133 )
134 }
135}
136
137#[derive(Error, Debug)]
140enum ChunkError {
141 #[error(transparent)]
142 Io(#[from] io::Error),
143 #[error(transparent)]
144 Decrypt(#[from] DecryptionError),
145 #[error(transparent)]
146 Decompress(#[from] DecompressionError),
147 #[error(transparent)]
148 Decode(#[from] DecodingError),
149}
150
151struct Processor<F> {
159 decompressor: ZstdDecompressor,
160 secret_key: Option<SecretKey>,
161 encryption_keys: HashMap<PublicKey, EncryptionKey>,
162 parser: RecordParser<F>,
163}
164
165impl<F> Processor<F>
166where
167 F: FnMut(&Record) -> Result<(), io::Error>,
168{
169 #[inline]
170 fn new(secret_key: Option<SecretKey>, parser: RecordParser<F>) -> Self {
171 Self {
172 decompressor: ZstdDecompressor::new(),
173 secret_key,
174 encryption_keys: HashMap::new(),
175 parser,
176 }
177 }
178
179 fn obtain_decryptor(
180 &mut self,
181 pub_key: PublicKey,
182 ) -> Result<Option<AesDecryptor>, EncryptionError> {
183 if pub_key == EMPTY_PUBLIC_KEY {
184 Ok(None)
186 } else if let Some(key) = self.encryption_keys.get(&pub_key) {
187 Ok(Some(AesDecryptor::new(key)))
189 } else {
190 if let Some(secret_key) = self.secret_key.as_ref() {
192 let key = ecdh_encryption_key(secret_key, &pub_key)?;
193 let key = self.encryption_keys.entry(pub_key).or_insert(key);
194 Ok(Some(AesDecryptor::new(key)))
195 } else {
196 Ok(None)
197 }
198 }
199 }
200
201 fn chunk_sink(
202 &mut self,
203 payload_len: usize,
204 pub_key: PublicKey,
205 time_range: RangeInclusive<DateTime>,
206 writeback: bool,
207 ) -> FnSink<impl FnMut(&[u8]) -> Result<(), Error> + '_, Error> {
208 let mut read_len = 0;
209 let mut decryptor = self.obtain_decryptor(pub_key);
210
211 FnSink::new(move |bytes: &[u8]| {
212 read_len += bytes.len();
213 let reached_to_end = read_len == payload_len;
214
215 let decryptor =
216 decryptor.as_mut().map_err(|e| Error::Decrypt(e.clone(), time_range.clone()))?;
217
218 let mut to_decompressor = FnSink::new(|bytes: &[u8]| {
219 self.decompressor.decompress(
220 bytes,
221 &mut FnSink::new(|bytes: &[u8]| self.parser.parse_all(bytes)),
222 )
223 });
224
225 decryptor
228 .decrypt(bytes, reached_to_end && !writeback, &mut to_decompressor)
229 .map_err(|e| Error::from_chunk_error(e, time_range.clone()))?;
230
231 if reached_to_end {
232 self.parser.clear_buffer();
233 }
234
235 Ok(())
236 })
237 }
238}
239
240struct RecordParser<F> {
241 callback: F,
242 buffer: BytesBuf,
243}
244
245impl<F> RecordParser<F>
246where
247 F: FnMut(&Record) -> Result<(), io::Error>,
248{
249 #[inline]
250 fn new(callback: F) -> Self {
251 Self { callback, buffer: BytesBuf::with_capacity(BUFFER_LEN) }
252 }
253
254 #[inline]
255 fn parse_all(&mut self, mut bytes: &[u8]) -> Result<(), ChunkError> {
256 while !bytes.is_empty() {
257 let len = self.parse(bytes)?;
258 bytes = &bytes[len..];
259 }
260 Ok(())
261 }
262
263 fn parse(&mut self, bytes: &[u8]) -> Result<usize, ChunkError> {
264 let len = self.buffer.buffer(bytes);
265 let mut source = self.buffer.deref();
266 let mut read_len = 0;
267
268 let res = loop {
269 if source.is_empty() {
270 break Ok(());
271 }
272 match Record::decode(&mut source) {
273 Ok(record) => {
274 read_len = self.buffer.len() - source.len();
275 if let Err(e) = (self.callback)(&record) {
276 break Err(e.into());
277 }
278 }
279 Err(ref e) if matches!(e, DecodingError::UnexpectedEnd { .. }) => break Ok(()),
281 Err(e) => break Err(e.into()),
282 }
283 };
284
285 self.buffer.drain(read_len);
286 res.map(|_| len)
287 }
288
289 fn clear_buffer(&mut self) {
290 self.buffer.clear()
291 }
292}
293
294impl Error {
295 #[inline]
296 fn can_continue_to_read_chunk(&self) -> bool {
297 matches!(self, Self::Decrypt(..))
298 || matches!(self, Self::Decompress(..))
299 || matches!(self, Self::Decode(..))
300 }
301
302 #[inline]
303 fn from_chunk_error(error: ChunkError, time_range: RangeInclusive<DateTime>) -> Self {
304 use ChunkError::*;
305 match error {
306 Io(err) => Self::Io(err),
307 Decrypt(err) => Self::Decrypt(err, time_range),
308 Decompress(err) => Self::Decompress(err, time_range),
309 Decode(err) => Self::Decode(err, time_range),
310 }
311 }
312}
313
314impl From<chunk::ReadError> for Error {
315 #[inline]
316 fn from(error: chunk::ReadError) -> Self {
317 use chunk::ReadError::*;
318 match error {
319 Io(err) => Self::Io(err),
320 Invalid => Self::FileInvalid,
321 UnexpectedEnd => Self::FileIncomplete,
322 }
323 }
324}