1use std::{
4 io,
5 ops::{Deref, DerefMut},
6 sync::{Arc, Mutex},
7};
8
9use thiserror::Error;
10
11use crate::{
12 buffer::{self, Buffer, EitherMemory, Memory},
13 chunk::Chunk,
14 codec::{AccumulationEncoder, EncodingError},
15 common,
16 compress::{CompressOp, CompressionError, Compressor, ZstdCompressor},
17 encrypt::{
18 ecdh::{self, PublicKey, EMPTY_PUBLIC_KEY},
19 AesEncryptor, EncryptOp, EncryptionError, Encryptor,
20 },
21 logfile::{self, Logfile},
22 mmap::Mmap,
23 runloop::{self, Handle as RunloopHandle, Runloop},
24 ChunkError, Config, Domain, Record, RunloopError, TimeDimension, Tracker,
25 MMAP_BUFFER_EXTENSION,
26};
27
28#[derive(Error, Debug)]
32pub enum Error {
33 #[error("encoding: {0}")]
34 Encode(#[from] EncodingError),
35 #[error("compression: {0}")]
36 Compress(#[from] CompressionError),
37 #[error("encryption: {0}")]
38 Encrypt(#[from] EncryptionError),
39 #[error("chunk: {0}")]
40 Chunk(#[from] ChunkError),
41 #[error("IO runloop: {0}")]
42 IoRunloop(#[from] RunloopError),
43 #[error("IO: {0}")]
44 Io(#[from] io::Error),
45}
46
47pub struct Logger {
49 inner: Mutex<LoggerInner>,
50}
51
52impl Logger {
53 #[inline]
55 pub fn new(domain: Domain, config: Config) -> Self {
56 Self { inner: Mutex::new(LoggerInner::new_inner(domain, config)) }
57 }
58
59 #[inline]
63 pub fn log(&self, record: &Record) {
64 self.inner.lock().unwrap().on(Operation::Input(record));
65 }
66
67 #[inline]
71 pub fn flush(&self) {
72 self.inner.lock().unwrap().on(Operation::Rotate);
73 }
74
75 #[inline]
79 pub fn trim(&self, lifetime: u64) {
80 self.inner.lock().unwrap().trim(lifetime);
81 }
82
83 #[inline]
87 pub fn shutdown(self) {
88 let mut inner = self.inner.into_inner().unwrap();
89 inner.on(Operation::Rotate);
90 inner.shutdown();
91 }
92}
93
94struct Context {
98 domain: Arc<Domain>,
99 pub_key: PublicKey,
100 rotation: TimeDimension,
101 tracker: Option<Tracker>,
102}
103
104impl Context {
105 #[inline]
106 fn new(
107 domain: Domain,
108 pub_key: Option<PublicKey>,
109 rotation: TimeDimension,
110 tracker: Option<Tracker>,
111 ) -> Self {
112 Self {
113 domain: Arc::new(domain),
114 pub_key: pub_key.unwrap_or(EMPTY_PUBLIC_KEY),
115 rotation,
116 tracker,
117 }
118 }
119
120 #[inline]
122 pub(crate) fn rotate_chunk<B>(&self, chunk: &Chunk<B>, new_record: &Record) -> bool
123 where
124 B: Deref<Target = [u8]>,
125 {
126 !self.chunk_dimension().check_match(chunk.start_datetime(), new_record.meta().datetime())
127 }
128
129 #[inline]
131 pub(crate) fn rotate_file<B>(&self, logfile: &Logfile, new_chunk: &Chunk<B>) -> bool
132 where
133 B: Deref<Target = [u8]>,
134 {
135 !self.file_dimension().check_match(new_chunk.start_datetime(), logfile.datetime())
136 }
137
138 #[inline]
140 fn chunk_dimension(&self) -> TimeDimension {
141 self.rotation
142 }
143
144 #[inline]
146 fn file_dimension(&self) -> TimeDimension {
147 match self.chunk_dimension() {
148 TimeDimension::Minute => TimeDimension::Hour,
149 TimeDimension::Hour => TimeDimension::Day,
150 TimeDimension::Day => TimeDimension::Day,
151 }
152 }
153}
154
/// Builds a closure that reports an error to an optional tracker.
///
/// `$tracker` must evaluate to an `Option` of a tracker. The closure converts the error
/// with `Into`, forwards it together with `file!()` and `line!()`, and returns `()`.
/// That makes it usable with both `map_err` and `unwrap_or_else` on `Result<(), _>`.
/// When the tracker is `None` the error is dropped silently.
macro_rules! track {
    ($tracker:expr) => {
        |err| {
            if let Some(tracker) = &$tracker {
                tracker.track(err.into(), file!(), line!());
            }
        }
    };
}
165
166type LoggerInner = Core<Option<ZstdCompressor>, Option<AesEncryptor>, EitherMemory>;
172
173impl LoggerInner {
174 #[inline]
175 pub fn new_inner(domain: Domain, config: Config) -> Self {
176 let memory = Self::initialize_memory(&domain, &config);
177
178 let keys =
179 config.key.and_then(|k| ecdh::Keys::new(&k).map_err(track!(config.tracker)).ok());
180 let encryptor = keys.as_ref().map(|k| AesEncryptor::new(&k.encryption_key));
181
182 let compressor =
183 ZstdCompressor::new(config.compression_level).map_err(track!(config.tracker)).ok();
184
185 let context =
186 Context::new(domain, keys.map(|k| k.public_key), config.rotation, config.tracker);
187
188 Self::new(context, compressor, encryptor, memory)
189 }
190
191 fn initialize_memory(domain: &Domain, config: &Config) -> EitherMemory {
192 config
193 .use_mmap
194 .then(|| {
195 let path =
196 domain.directory.join(&domain.identifier).with_extension(MMAP_BUFFER_EXTENSION);
197 Mmap::new(path, config.buffer_len).map(EitherMemory::Mmap)
198 })
199 .and_then(|mmap| mmap.map_err(track!(config.tracker)).ok())
200 .unwrap_or_else(|| {
201 let mut vec = Vec::with_capacity(config.buffer_len);
202 #[allow(clippy::uninit_vec)]
203 unsafe {
204 vec.set_len(config.buffer_len);
205 }
206 EitherMemory::Vec(vec)
207 })
208 }
209}
210
211#[derive(Clone, Copy)]
213enum Operation<'a> {
214 Input(&'a Record<'a>),
215 Rotate,
216 Writeback,
217}
218
219struct Core<C, E, M> {
221 context: Arc<Context>,
222 processor: Processor<C, E>,
223 buffer: Buffer<M>,
224 io_runloop: Runloop<IoEvent>,
225}
226
227impl<C, E, M> Core<C, E, M>
228where
229 C: Compressor,
230 E: Encryptor,
231 M: Memory,
232{
233 fn new(context: Context, compressor: C, encryptor: E, memory: M) -> Self {
234 let context = Arc::new(context);
235 let processor = Processor::new(compressor, encryptor);
236
237 let (input_buffer, output_buffer) = Self::initialize_buffer(memory, &context);
238 let io_runloop = Io::new(Arc::clone(&context), output_buffer).run();
239
240 let mut core = Self { context, processor, buffer: input_buffer, io_runloop };
241 core.on(Operation::Writeback);
243
244 core
245 }
246
247 fn initialize_buffer(memory: M, context: &Context) -> (Buffer<M>, Buffer<M>) {
248 let (mut input, mut output) = buffer::initialize(memory);
249 {
250 let (mut input_chunk, mut output_chunk) =
251 (Chunk::bind(input.handle()), Chunk::bind(output.handle()));
252
253 if !input_chunk.validate() || !output_chunk.validate() {
257 let now = chrono::Utc::now();
258 input_chunk.initialize(now, context.pub_key);
259 output_chunk.initialize(now, context.pub_key);
260 }
261 }
262 (input, output)
263 }
264
265 fn on(&mut self, operation: Operation) {
266 let mut chunk = Chunk::bind(self.buffer.handle());
267
268 let write_operation = match operation {
269 Operation::Rotate => Some(operation),
270 Operation::Writeback => (chunk.payload_len() > 0).then(|| {
272 chunk.set_writeback();
273 operation
274 }),
275 Operation::Input(record) => (chunk.is_almost_full()
277 || self.context.rotate_chunk(&chunk, record))
278 .then_some(Operation::Rotate),
279 };
280
281 if let Some(write_operation) = write_operation {
282 self.processor
283 .process(write_operation, &mut chunk)
284 .unwrap_or_else(track!(self.context.tracker));
285
286 if chunk.payload_len() > 0 {
290 drop(chunk);
292 self.buffer.switch();
293 chunk = Chunk::bind(self.buffer.handle());
294
295 self.io_runloop
297 .on(IoEvent::WriteChunk)
298 .unwrap_or_else(track!(self.context.tracker));
299 }
300
301 let datetime = match operation {
303 Operation::Input(record) => record.meta().datetime(),
304 Operation::Rotate | Operation::Writeback => chrono::Utc::now(),
305 };
306 chunk.initialize(datetime, self.context.pub_key);
307 }
308
309 if let Operation::Input(record) = operation {
310 self.processor
311 .process(Operation::Input(record), &mut chunk)
312 .unwrap_or_else(track!(self.context.tracker));
313 }
314 }
315
316 #[inline]
317 fn trim(&mut self, lifetime: u64) {
318 self.io_runloop.on(IoEvent::Trim { lifetime }).unwrap_or_else(track!(self.context.tracker));
319 }
320
321 #[inline]
322 fn shutdown(self) {
323 self.io_runloop.on(IoEvent::Shutdown).unwrap_or_else(track!(self.context.tracker));
324 _ = self.io_runloop.join();
325 }
326}
327
328struct Processor<C, E> {
338 encoder: AccumulationEncoder,
339 compressor: C,
340 encryptor: E,
341}
342
343impl<C, E> Processor<C, E>
344where
345 C: Compressor,
346 E: Encryptor,
347{
348 const ENCODER_BUFFER_LEN: usize = 256;
352
353 #[inline]
354 fn new(compressor: C, encryptor: E) -> Self {
355 let encoder = AccumulationEncoder::new(Self::ENCODER_BUFFER_LEN);
356 Self { encoder, compressor, encryptor }
357 }
358
359 fn process<B>(&mut self, operation: Operation, chunk: &mut Chunk<B>) -> Result<(), Error>
360 where
361 B: DerefMut<Target = [u8]>,
362 {
363 type FnSink<F> = common::FnSink<F, Error>;
364
365 let mut to_chunk = FnSink::new(|bytes: &[u8]| chunk.write(bytes).map_err(Into::into));
366
367 let mut to_encryptor = FnSink::new(|bytes: &[u8]| {
368 self.encryptor.encrypt(EncryptOp::Input(bytes), &mut to_chunk)
369 });
370
371 let mut to_compressor = FnSink::new(|bytes: &[u8]| {
372 self.compressor.compress(CompressOp::Input(bytes), &mut to_encryptor)
373 });
374
375 match operation {
376 Operation::Input(record) => {
377 self.encoder.encode(record, &mut to_compressor)?;
378 self.compressor.compress(CompressOp::Flush, &mut to_encryptor)?;
379 chunk.set_end_datetime(record.meta().datetime());
380 }
381
382 Operation::Rotate => {
383 self.compressor.compress(CompressOp::End, &mut to_encryptor)?;
384 self.encryptor.encrypt(EncryptOp::Flush, &mut to_chunk)?;
385 }
386
387 Operation::Writeback => { }
388 }
389
390 Ok(())
391 }
392}
393
394struct Io<M> {
399 context: Arc<Context>,
400 buffer: Buffer<M>,
401 logfile: Option<Logfile>,
402}
403
404enum IoEvent {
406 WriteChunk,
408 Trim { lifetime: u64 },
410 Shutdown,
412}
413
414impl<M> Io<M>
415where
416 M: Memory,
417{
418 #[inline]
419 fn new(context: Arc<Context>, buffer: Buffer<M>) -> Self {
420 let mut io = Io { context, buffer, logfile: None };
421 if Chunk::bind(io.buffer.handle()).payload_len() > 0 {
423 io.write_chunk();
424 }
425 io
426 }
427
428 fn write_chunk(&mut self) {
430 let mut chunk = Chunk::bind(self.buffer.handle());
431 if chunk.payload_len() == 0 {
433 return;
434 }
435
436 self.logfile.take_if(|f| self.context.rotate_file(f, &chunk));
437
438 let logfile = if let Some(logfile) = &mut self.logfile {
439 logfile
440 } else {
441 self.logfile = Some(Logfile::new(
442 Arc::clone(&self.context.domain),
443 chunk.start_datetime(),
444 logfile::Mode::Write,
445 ));
446 unsafe { self.logfile.as_mut().unwrap_unchecked() }
449 };
450
451 logfile.write(&chunk).unwrap_or_else(track!(self.context.tracker));
452 logfile.flush().unwrap_or_else(track!(self.context.tracker));
453
454 chunk.clear();
457 }
458
459 #[inline]
461 fn trim(&mut self, lifetime: u64) {
462 let expires = chrono::Utc::now().timestamp().saturating_sub_unsigned(lifetime);
463
464 if let Ok(logfiles) = Logfile::logfiles(&self.context.domain, logfile::Mode::Read)
465 .map_err(track!(self.context.tracker))
466 {
467 logfiles
468 .filter(|f| f.datetime().timestamp() < expires)
469 .for_each(|file| file.delete().unwrap_or_else(track!(self.context.tracker)));
470 }
471 }
472}
473
474impl<M> RunloopHandle for Io<M>
475where
476 M: Memory,
477{
478 type Event = IoEvent;
479
480 #[inline]
481 fn handle(&mut self, event: Self::Event, context: &mut runloop::Context) {
482 match event {
483 IoEvent::WriteChunk => self.write_chunk(),
484 IoEvent::Trim { lifetime } => self.trim(lifetime),
485 IoEvent::Shutdown => context.stop(),
486 }
487 }
488}
489
#[cfg(test)]
mod tests {
    use crate::{
        chunk, chunk::Chunk, codec::Decode, compress::ZstdCompressor, encrypt::AesEncryptor,
        logger, logger::Operation, Record, RecordBuilder,
    };

    /// Round-trips records through a processor without compression or encryption and
    /// checks that the chunk payload decodes back to the same records.
    #[test]
    fn test_processor() {
        type Processor = logger::Processor<Option<ZstdCompressor>, Option<AesEncryptor>>;
        let mut processor = Processor::new(None, None);

        // Zero-initialized backing memory; no `unsafe` needed for a test buffer.
        let mut memory = vec![0u8; 256];

        /// Writes `contents` as records into a fresh chunk over `memory`, rotates, and
        /// verifies that the payload decodes to exactly those records.
        fn test_process<'a>(
            processor: &mut Processor,
            memory: &mut Vec<u8>,
            contents: impl IntoIterator<Item = &'a str>,
        ) {
            let mut chunk = Chunk::bind(memory.as_mut_slice());
            chunk.initialize(chrono::Utc::now(), [0; 33]);

            let records = contents
                .into_iter()
                .map(|c| RecordBuilder::new().content(c).build())
                .collect::<Vec<_>>();

            for record in &records {
                processor.process(Operation::Input(record), &mut chunk).unwrap();
            }
            processor.process(Operation::Rotate, &mut chunk).unwrap();

            let payload_len = chunk.payload_len();
            let mut payload = &memory[chunk::Header::LEN..chunk::Header::LEN + payload_len];

            for record in records {
                let new_record = Record::decode(&mut payload).unwrap();
                assert_eq!(record, new_record);
            }

            // Every payload byte must have been consumed by decoding.
            assert_eq!(payload.len(), 0);
        }

        test_process(&mut processor, &mut memory, []);
        test_process(&mut processor, &mut memory, ["Hello", "World"]);
    }
}