pinenut_log/
logger.rs

1//! The `Logger` implementation.
2
3use std::{
4    io,
5    ops::{Deref, DerefMut},
6    sync::{Arc, Mutex},
7};
8
9use thiserror::Error;
10
11use crate::{
12    buffer::{self, Buffer, EitherMemory, Memory},
13    chunk::Chunk,
14    codec::{AccumulationEncoder, EncodingError},
15    common,
16    compress::{CompressOp, CompressionError, Compressor, ZstdCompressor},
17    encrypt::{
18        ecdh::{self, PublicKey, EMPTY_PUBLIC_KEY},
19        AesEncryptor, EncryptOp, EncryptionError, Encryptor,
20    },
21    logfile::{self, Logfile},
22    mmap::Mmap,
23    runloop::{self, Handle as RunloopHandle, Runloop},
24    ChunkError, Config, Domain, Record, RunloopError, TimeDimension, Tracker,
25    MMAP_BUFFER_EXTENSION,
26};
27
28/// The error type for [`Logger`].
29///
30/// Errors occurred in the logger can be track by the specified [`Tracker`].
31#[derive(Error, Debug)]
32pub enum Error {
33    #[error("encoding: {0}")]
34    Encode(#[from] EncodingError),
35    #[error("compression: {0}")]
36    Compress(#[from] CompressionError),
37    #[error("encryption: {0}")]
38    Encrypt(#[from] EncryptionError),
39    #[error("chunk: {0}")]
40    Chunk(#[from] ChunkError),
41    #[error("IO runloop: {0}")]
42    IoRunloop(#[from] RunloopError),
43    #[error("IO: {0}")]
44    Io(#[from] io::Error),
45}
46
47/// The `Pinenut` logger.
48pub struct Logger {
49    inner: Mutex<LoggerInner>,
50}
51
52impl Logger {
53    /// Constructs a new `Logger`.
54    #[inline]
55    pub fn new(domain: Domain, config: Config) -> Self {
56        Self { inner: Mutex::new(LoggerInner::new_inner(domain, config)) }
57    }
58
59    /// Logs the record.
60    ///
61    /// The low-level IO operations are performed asynchronously.
62    #[inline]
63    pub fn log(&self, record: &Record) {
64        self.inner.lock().unwrap().on(Operation::Input(record));
65    }
66
67    /// Flushes any buffered records asynchronously.
68    ///
69    /// The low-level IO operations are performed asynchronously.
70    #[inline]
71    pub fn flush(&self) {
72        self.inner.lock().unwrap().on(Operation::Rotate);
73    }
74
75    /// Deletes the expired log files with lifetime (seconds).
76    ///
77    /// The low-level IO operations are performed asynchronously.
78    #[inline]
79    pub fn trim(&self, lifetime: u64) {
80        self.inner.lock().unwrap().trim(lifetime);
81    }
82
83    /// Flushes then Shuts down the logger.
84    ///
85    /// All asynchronous IO operations will be waiting to complete.
86    #[inline]
87    pub fn shutdown(self) {
88        let mut inner = self.inner.into_inner().unwrap();
89        inner.on(Operation::Rotate);
90        inner.shutdown();
91    }
92}
93
94// ============ Internal ============
95
96/// Represents the logger context.
97struct Context {
98    domain: Arc<Domain>,
99    pub_key: PublicKey,
100    rotation: TimeDimension,
101    tracker: Option<Tracker>,
102}
103
104impl Context {
105    #[inline]
106    fn new(
107        domain: Domain,
108        pub_key: Option<PublicKey>,
109        rotation: TimeDimension,
110        tracker: Option<Tracker>,
111    ) -> Self {
112        Self {
113            domain: Arc::new(domain),
114            pub_key: pub_key.unwrap_or(EMPTY_PUBLIC_KEY),
115            rotation,
116            tracker,
117        }
118    }
119
120    /// Determines whether the chunk needs to be rotated.
121    #[inline]
122    pub(crate) fn rotate_chunk<B>(&self, chunk: &Chunk<B>, new_record: &Record) -> bool
123    where
124        B: Deref<Target = [u8]>,
125    {
126        !self.chunk_dimension().check_match(chunk.start_datetime(), new_record.meta().datetime())
127    }
128
129    /// Determines whether the log file needs to be rotated.
130    #[inline]
131    pub(crate) fn rotate_file<B>(&self, logfile: &Logfile, new_chunk: &Chunk<B>) -> bool
132    where
133        B: Deref<Target = [u8]>,
134    {
135        !self.file_dimension().check_match(new_chunk.start_datetime(), logfile.datetime())
136    }
137
138    /// Time dimension for chunk rotation.
139    #[inline]
140    fn chunk_dimension(&self) -> TimeDimension {
141        self.rotation
142    }
143
144    /// Time dimension for log file rotation.
145    #[inline]
146    fn file_dimension(&self) -> TimeDimension {
147        match self.chunk_dimension() {
148            TimeDimension::Minute => TimeDimension::Hour,
149            TimeDimension::Hour => TimeDimension::Day,
150            TimeDimension::Day => TimeDimension::Day,
151        }
152    }
153}
154
/// Expands to a closure that forwards an error to an optional tracker.
///
/// The closure converts the error with `Into` and passes it together with `file!()` and
/// `line!()`, which expand at the macro's invocation site. When the tracker is `None`
/// the error is discarded.
macro_rules! track {
    ($tracker:expr) => {{
        |err| match $tracker {
            Some(ref tracker) => {
                tracker.track(err.into(), file!(), line!());
            }
            None => {}
        }
    }};
}
165
166/// The `Core Logger` associated with the specified `Compressor`, `Encryptor` and
167/// `Memory`.
168///
169/// The current version of `Pinenut` will use the `zstd` compression algorithm and
170/// `AES` encryption algorithm to process the logs.
171type LoggerInner = Core<Option<ZstdCompressor>, Option<AesEncryptor>, EitherMemory>;
172
173impl LoggerInner {
174    #[inline]
175    pub fn new_inner(domain: Domain, config: Config) -> Self {
176        let memory = Self::initialize_memory(&domain, &config);
177
178        let keys =
179            config.key.and_then(|k| ecdh::Keys::new(&k).map_err(track!(config.tracker)).ok());
180        let encryptor = keys.as_ref().map(|k| AesEncryptor::new(&k.encryption_key));
181
182        let compressor =
183            ZstdCompressor::new(config.compression_level).map_err(track!(config.tracker)).ok();
184
185        let context =
186            Context::new(domain, keys.map(|k| k.public_key), config.rotation, config.tracker);
187
188        Self::new(context, compressor, encryptor, memory)
189    }
190
191    fn initialize_memory(domain: &Domain, config: &Config) -> EitherMemory {
192        config
193            .use_mmap
194            .then(|| {
195                let path =
196                    domain.directory.join(&domain.identifier).with_extension(MMAP_BUFFER_EXTENSION);
197                Mmap::new(path, config.buffer_len).map(EitherMemory::Mmap)
198            })
199            .and_then(|mmap| mmap.map_err(track!(config.tracker)).ok())
200            .unwrap_or_else(|| {
201                let mut vec = Vec::with_capacity(config.buffer_len);
202                #[allow(clippy::uninit_vec)]
203                unsafe {
204                    vec.set_len(config.buffer_len);
205                }
206                EitherMemory::Vec(vec)
207            })
208    }
209}
210
211/// Operation for `Core` and `Processor`.
212#[derive(Clone, Copy)]
213enum Operation<'a> {
214    Input(&'a Record<'a>),
215    Rotate,
216    Writeback,
217}
218
219/// Represents the `Core Logger`.
220struct Core<C, E, M> {
221    context: Arc<Context>,
222    processor: Processor<C, E>,
223    buffer: Buffer<M>,
224    io_runloop: Runloop<IoEvent>,
225}
226
227impl<C, E, M> Core<C, E, M>
228where
229    C: Compressor,
230    E: Encryptor,
231    M: Memory,
232{
233    fn new(context: Context, compressor: C, encryptor: E, memory: M) -> Self {
234        let context = Arc::new(context);
235        let processor = Processor::new(compressor, encryptor);
236
237        let (input_buffer, output_buffer) = Self::initialize_buffer(memory, &context);
238        let io_runloop = Io::new(Arc::clone(&context), output_buffer).run();
239
240        let mut core = Self { context, processor, buffer: input_buffer, io_runloop };
241        // Attempts to write previously unwritten chunk to the logfile.
242        core.on(Operation::Writeback);
243
244        core
245    }
246
247    fn initialize_buffer(memory: M, context: &Context) -> (Buffer<M>, Buffer<M>) {
248        let (mut input, mut output) = buffer::initialize(memory);
249        {
250            let (mut input_chunk, mut output_chunk) =
251                (Chunk::bind(input.handle()), Chunk::bind(output.handle()));
252
253            // If either side of the buffer is invalid, both sides need to be initialized.
254            // Due to the internal structure of the double buffer system, when the buffer length
255            // configuration is changed, one chunk must be invalid.
256            if !input_chunk.validate() || !output_chunk.validate() {
257                let now = chrono::Utc::now();
258                input_chunk.initialize(now, context.pub_key);
259                output_chunk.initialize(now, context.pub_key);
260            }
261        }
262        (input, output)
263    }
264
265    fn on(&mut self, operation: Operation) {
266        let mut chunk = Chunk::bind(self.buffer.handle());
267
268        let write_operation = match operation {
269            Operation::Rotate => Some(operation),
270            // Writes back if chunk payload is not empty.
271            Operation::Writeback => (chunk.payload_len() > 0).then(|| {
272                chunk.set_writeback();
273                operation
274            }),
275            // Checks if rotation is required.
276            Operation::Input(record) => (chunk.is_almost_full()
277                || self.context.rotate_chunk(&chunk, record))
278            .then_some(Operation::Rotate),
279        };
280
281        if let Some(write_operation) = write_operation {
282            self.processor
283                .process(write_operation, &mut chunk)
284                .unwrap_or_else(track!(self.context.tracker));
285
286            // If the length of the chunk is greater than 0, it means that there are bytes to be
287            // written to the file, we need to switch the buffer and perform IO write operation,
288            // otherwise we can reuse the chunk and not perform IO write operation.
289            if chunk.payload_len() > 0 {
290                // Switches the double buffering system then rebinds the chunk.
291                drop(chunk);
292                self.buffer.switch();
293                chunk = Chunk::bind(self.buffer.handle());
294
295                // Performs asynchronous file write IO operation.
296                self.io_runloop
297                    .on(IoEvent::WriteChunk)
298                    .unwrap_or_else(track!(self.context.tracker));
299            }
300
301            // Re-initialize the chunk.
302            let datetime = match operation {
303                Operation::Input(record) => record.meta().datetime(),
304                Operation::Rotate | Operation::Writeback => chrono::Utc::now(),
305            };
306            chunk.initialize(datetime, self.context.pub_key);
307        }
308
309        if let Operation::Input(record) = operation {
310            self.processor
311                .process(Operation::Input(record), &mut chunk)
312                .unwrap_or_else(track!(self.context.tracker));
313        }
314    }
315
316    #[inline]
317    fn trim(&mut self, lifetime: u64) {
318        self.io_runloop.on(IoEvent::Trim { lifetime }).unwrap_or_else(track!(self.context.tracker));
319    }
320
321    #[inline]
322    fn shutdown(self) {
323        self.io_runloop.on(IoEvent::Shutdown).unwrap_or_else(track!(self.context.tracker));
324        _ = self.io_runloop.join();
325    }
326}
327
328/// The Log processor. It processes the log record step by step.
329///
330/// # Workflow
331///
332/// ```plain
333/// ┌──────────┐   ┌──────────┐   ┌───────────┐   ┌───────────────────────────┐
334/// │  Encode  │──▶│ Compress │──▶│  Encrypt  │──▶│  Write to Chunk (Buffer)  │
335/// └──────────┘   └──────────┘   └───────────┘   └───────────────────────────┘
336/// ```
337struct Processor<C, E> {
338    encoder: AccumulationEncoder,
339    compressor: C,
340    encryptor: E,
341}
342
343impl<C, E> Processor<C, E>
344where
345    C: Compressor,
346    E: Encryptor,
347{
348    /// Length of `encoder buffer`.
349    ///
350    /// A buffer of 256 bytes should be sufficient for encoding of a log.
351    const ENCODER_BUFFER_LEN: usize = 256;
352
353    #[inline]
354    fn new(compressor: C, encryptor: E) -> Self {
355        let encoder = AccumulationEncoder::new(Self::ENCODER_BUFFER_LEN);
356        Self { encoder, compressor, encryptor }
357    }
358
359    fn process<B>(&mut self, operation: Operation, chunk: &mut Chunk<B>) -> Result<(), Error>
360    where
361        B: DerefMut<Target = [u8]>,
362    {
363        type FnSink<F> = common::FnSink<F, Error>;
364
365        let mut to_chunk = FnSink::new(|bytes: &[u8]| chunk.write(bytes).map_err(Into::into));
366
367        let mut to_encryptor = FnSink::new(|bytes: &[u8]| {
368            self.encryptor.encrypt(EncryptOp::Input(bytes), &mut to_chunk)
369        });
370
371        let mut to_compressor = FnSink::new(|bytes: &[u8]| {
372            self.compressor.compress(CompressOp::Input(bytes), &mut to_encryptor)
373        });
374
375        match operation {
376            Operation::Input(record) => {
377                self.encoder.encode(record, &mut to_compressor)?;
378                self.compressor.compress(CompressOp::Flush, &mut to_encryptor)?;
379                chunk.set_end_datetime(record.meta().datetime());
380            }
381
382            Operation::Rotate => {
383                self.compressor.compress(CompressOp::End, &mut to_encryptor)?;
384                self.encryptor.encrypt(EncryptOp::Flush, &mut to_chunk)?;
385            }
386
387            Operation::Writeback => { /* Do nothing on writeback. */ }
388        }
389
390        Ok(())
391    }
392}
393
394/// The IO handler. It is responsible for all file IO interactions.
395///
396/// It implements the [`runloop::Handle`] trait so that it can invoke a runloop to
397/// handle IO events asynchronously.
398struct Io<M> {
399    context: Arc<Context>,
400    buffer: Buffer<M>,
401    logfile: Option<Logfile>,
402}
403
/// Events accepted by the [`Io`] handler.
enum IoEvent {
    /// Writes the pending output chunk to the current log file.
    WriteChunk,
    /// Deletes log files older than `lifetime` seconds.
    Trim {
        lifetime: u64,
    },
    /// Stops the runloop driving the handler.
    Shutdown,
}
413
414impl<M> Io<M>
415where
416    M: Memory,
417{
418    #[inline]
419    fn new(context: Arc<Context>, buffer: Buffer<M>) -> Self {
420        let mut io = Io { context, buffer, logfile: None };
421        // Attempts to write previously unwritten chunk to the logfile.
422        if Chunk::bind(io.buffer.handle()).payload_len() > 0 {
423            io.write_chunk();
424        }
425        io
426    }
427
428    /// Writes chunk to log file.
429    fn write_chunk(&mut self) {
430        let mut chunk = Chunk::bind(self.buffer.handle());
431        // The chunk is empty, there is no need to write to the logfile.
432        if chunk.payload_len() == 0 {
433            return;
434        }
435
436        self.logfile.take_if(|f| self.context.rotate_file(f, &chunk));
437
438        let logfile = if let Some(logfile) = &mut self.logfile {
439            logfile
440        } else {
441            self.logfile = Some(Logfile::new(
442                Arc::clone(&self.context.domain),
443                chunk.start_datetime(),
444                logfile::Mode::Write,
445            ));
446            // SAFETY: a `None` variant for `logfile` would have been replaced by a `Some`
447            // variant in the code above.
448            unsafe { self.logfile.as_mut().unwrap_unchecked() }
449        };
450
451        logfile.write(&chunk).unwrap_or_else(track!(self.context.tracker));
452        logfile.flush().unwrap_or_else(track!(self.context.tracker));
453
454        // Sets the chunk length to 0 to indicate that the chunk has finished writing to the
455        // logfile and will not be written again.
456        chunk.clear();
457    }
458
459    /// Deletes the expired log files.
460    #[inline]
461    fn trim(&mut self, lifetime: u64) {
462        let expires = chrono::Utc::now().timestamp().saturating_sub_unsigned(lifetime);
463
464        if let Ok(logfiles) = Logfile::logfiles(&self.context.domain, logfile::Mode::Read)
465            .map_err(track!(self.context.tracker))
466        {
467            logfiles
468                .filter(|f| f.datetime().timestamp() < expires)
469                .for_each(|file| file.delete().unwrap_or_else(track!(self.context.tracker)));
470        }
471    }
472}
473
474impl<M> RunloopHandle for Io<M>
475where
476    M: Memory,
477{
478    type Event = IoEvent;
479
480    #[inline]
481    fn handle(&mut self, event: Self::Event, context: &mut runloop::Context) {
482        match event {
483            IoEvent::WriteChunk => self.write_chunk(),
484            IoEvent::Trim { lifetime } => self.trim(lifetime),
485            IoEvent::Shutdown => context.stop(),
486        }
487    }
488}
489
#[cfg(test)]
mod tests {
    use crate::{
        chunk, chunk::Chunk, codec::Decode, compress::ZstdCompressor, encrypt::AesEncryptor,
        logger, logger::Operation, Record, RecordBuilder,
    };

    /// Records written through a pass-through processor (no compression, no encryption)
    /// must decode back to exactly the same records.
    #[test]
    fn test_processor() {
        type Processor = logger::Processor<Option<ZstdCompressor>, Option<AesEncryptor>>;
        let mut processor = Processor::new(None, None);

        // Zero-filled instead of `set_len` on fresh capacity: the chunk reads and writes
        // these bytes, and reading uninitialized memory is undefined behaviour.
        let mut memory = vec![0u8; 256];

        // Writes `contents` into a freshly initialized chunk over `memory`, rotates it,
        // then decodes the payload and compares it with the original records.
        fn test_process<'a>(
            processor: &mut Processor,
            memory: &mut [u8],
            contents: impl IntoIterator<Item = &'a str>,
        ) {
            let mut chunk = Chunk::bind(&mut *memory);
            chunk.initialize(chrono::Utc::now(), [0; 33]);

            let records = contents
                .into_iter()
                .map(|c| RecordBuilder::new().content(c).build())
                .collect::<Vec<_>>();

            for record in &records {
                processor.process(Operation::Input(record), &mut chunk).unwrap();
            }
            processor.process(Operation::Rotate, &mut chunk).unwrap();

            let payload_len = chunk.payload_len();
            let mut payload = &memory[chunk::Header::LEN..chunk::Header::LEN + payload_len];

            for record in records {
                let new_record = Record::decode(&mut payload).unwrap();
                assert_eq!(record, new_record);
            }

            assert_eq!(payload.len(), 0);
        }

        test_process(&mut processor, &mut memory, []);
        test_process(&mut processor, &mut memory, ["Hello", "World"]);
    }
}