async_logger/
lib.rs

//! `AsyncLoggerNB` is an implementation of an asynchronous logger/queue that allows writing arbitrary
//! slices to a memory buffer, and then sends the buffer to a processing thread.
3//! 
//! `AsyncLoggerNB` uses a pair of fixed size buffers;
//! while one buffer is being written by multiple threads, the second is being processed by the
//! single "writer" thread. Writing to a buffer is a lock-free operation.
//! Blocking appears only at the moment when the buffers change their roles.
//! This makes `AsyncLoggerNB` really fast, and at the same time allows it to be bounded.
//! It can be effectively used in a multithreaded multicore environment with a high level of concurrent writes
//! when you don't want to drop messages or run out of memory but still want to keep lock-free writes.
11//! 
12//! `AsyncLoggerNB` can process serialized data (stream of bytes) or custom complex data structures, and also references to objects.
13//! 
14//! `AsyncLoggerNB` can accept any "writer" as soon as it implements `Writer` trait. This package includes
15//! `FileWriter` that writes data to a file.
16//! 
17//! Implementation of [log](https://docs.rs/log) facade based on this crate is available as separate crate
18//! [async_logger_log](https://docs.rs/async_logger_log).
19//!
20//! # Examples
21//!
22//! ```
23//! use async_logger::FileWriter;
24//! use async_logger::AsyncLoggerNB;
25//! use std::{thread, sync::Arc};
26//!
27//! let writer = FileWriter::new("/tmp", 10*1024*1024).expect("Failed to create file writer");
28//!
29//! let logger = Arc::new(AsyncLoggerNB::new(Box::new(writer), 8192)
30//!     .expect("Failed to create new async logger"));
31//!
32//! let write_line = "Hello, world!\n";
33//! 
34//! let logger_c = logger.clone();
35//!
36//! let handle = thread::spawn(move || {
37//!
38//!     logger_c.write_slice(write_line.as_bytes()).unwrap();
39//!     logger_c.write_slice(write_line.as_bytes()).unwrap();
40//!     logger_c.flush();
41//!
42//!     logger_c.write_slice(write_line.as_bytes()).unwrap();
43//!
44//! });
45//!
46//! handle.join().expect("Failed on thread join");
47//!
48//! match Arc::try_unwrap(logger) {
49//!     Ok(logger) => logger.terminate(),
50//!     Err(_) => panic!("Failed to terminate logger because it is still in use"),
51//! };
52//! ```
53//!
//! When the size of the data to be written is known beforehand it may be more efficient to write the data
//! directly to the underlying buffer. In this case `AsyncLoggerNB::reserve_slice` can be used:
56//!
57//! ```
58//! use async_logger::{FileWriter, AsyncLoggerNB, Writer};
59//!
60//! // implement some custom writer along the way
61//! struct Stub {}
62//! impl Writer<u8> for Stub {
63//!     fn process_slice(&mut self, slice: &[u8]) {
64//!         for item in slice {
65//!             println!("{}", item);
66//!         }
67//!     }
68//!     fn flush(&mut self) {}
69//! }
70//!
71//! let logger = AsyncLoggerNB::new(Box::new(Stub {}), 8192)
72//!     .expect("Failed to create new async logger");
73//!
74//! // getting slice for writing
75//! let mut slice = logger.reserve_slice(10).unwrap();
76//!
77//! assert_eq!(10, slice.len());
78//!
79//! // write to the logger buffer directly
80//! for i in 0..10 {
81//!     slice[i] = (i*i) as u8;
82//! }
83//!
84//! drop(slice);    // release the buffer
85//!
86//! ```
87//!
88//! Sometimes it is more efficient to write a pointer to some existing instance of struct instead
89//! of copying the complete struct into buffer. This can be achieved by moving boxed reference to a struct to
90//! `AsyncLoggerNB::write_value`. See the documentation of the function 
91//! [write_value](struct.AsyncLoggerNB.html#method.write_value) for details and example.
92//!
93//! # Performance
94//!
//! The recommended buffer size is one that can hold from tens to hundreds of
//! messages. Choosing too small a size leads to performance degradation, while choosing too big a size
//! doesn't increase performance significantly but wastes resources.
98//!
99//! ### Performance tests
100//!
101//! Tests show that this lock-free implementation is at least not slower than comparable
102//! implementation with mutex, and can be at least two times faster under highly competitive load.
103//!
104//! ### Metrics
105//!
106//! `AsyncLoggerNB` collects total time spent by threads waiting for free buffer space in nanoseconds,
107//! and total count of wait events. 
108//! Metrics collection is enabled at compile time with feature `metrics`.
109//! After enabling metrics `AsyncLoggerNB::get_metrics` can be used to get the current metrics values.
110//! Note, the metrics values can wrap around after significant amount of time of running without
111//! interruption.
112//!
113//! # Notes
114//!
115//! Attempt to get several instances of `Slice` struct at the same time in the same thread can cause deadlock.
116
117mod buf;
118mod writer;
119
120
121use buf::DoubleBuf;
122use writer::ThreadedWriter;
123use std::sync::{Mutex, Arc};
124pub use writer::FileWriter;
125pub use buf::Metrics;
126pub use buf::Slice;
127
128
/// Writer performs data processing of a fully filled buffer.
///
/// Implementors receive the buffered items in `process_slice` and are asked to
/// finalize their output in `flush`.
pub trait Writer<T>: Send
where
    T: Send + 'static,
{
    /// Logger calls this function when there is data to be processed.
    /// This function is guaranteed to be called sequentially; no internal synchronization is
    /// required by default.
    fn process_slice(&mut self, slice: &[T]);

    /// Flush the remaining data, and finalize writer.
    /// This function is called only on writer thread termination.
    fn flush(&mut self);
}
141
142
143
144/// Logger with non-blocking async processing.
145pub struct AsyncLoggerNB<T: Send + 'static> {
146    buf:    DoubleBuf<T>,
147    tw:     ThreadedWriter,
148    writer: Arc<Mutex<Box<dyn Writer<T>>>>,
149    terminated: Arc<Mutex<bool>>,
150    threshold:  usize,
151}
152
153
154impl<T: Send + 'static> AsyncLoggerNB<T> {
155
156    /// Create a new AsyncLoggerNB instance with buffer of buf_size items.
157    ///
158    /// # Errors
159    ///
160    /// `Err` is returend if `buf_sz` is greater than `std::isize::MAX` or `buf_sz` is zero or when
161    /// `T` has size of zero, or when memory allocation has failed for some reason (e.g. OOM).
162    ///
163    /// # Panics
164    ///
165    /// Panics of OS fails to create thread.
166    pub fn new(writer: Box<dyn Writer<T>>, buf_sz: usize) -> Result<AsyncLoggerNB<T>, Error> {
167
168        let buf = DoubleBuf::<T>::new(buf_sz)?;
169
170        let writer = Arc::new(Mutex::new(writer));
171
172        let writer2 = writer.clone();
173
174        let tw = ThreadedWriter::new(writer2, &buf);
175
176        let terminated = Arc::new(Mutex::new(false));
177
178        let threshold = buf_sz - buf_sz / 5;
179
180        Ok(AsyncLoggerNB {
181            buf,
182            tw,
183            writer,
184            terminated,
185            threshold,
186        })
187    }
188
189    /// Flush underlying buffers, and wait until writer thread terminates. 
190    /// Further attempts to write to buffers will return error.
191    ///
192    /// # Panics
193    ///
194    /// Panics if some of the internal mutexes is poisoned, or when writer thread paniced.
195    pub fn terminate(self) {
196
197        let mut guard = self.terminated.lock().unwrap();
198
199        if ! *guard {
200
201            self.tw.request_stop();
202
203            self.buf.seal_buffers();
204
205            self.tw.wait_termination();
206
207            *guard = true;
208        }
209    }
210
211    /// Write a slice of `<T>`. If the size of slice is larger or equal to 0.8 * buffer_size then buffer is
212    /// bypassed, and slice is handed directly to writer. Note, in this case message can appear
213    /// out-of-order.
214    /// Function blocks if message size is less than 0.8 * buffer_size, and there is not enough free space in any of buffers. 
215    /// As soon as there is free space larger than 0.8 * buffer_size available slice is written and function returns.
216    ///
217    /// # Errors
218    ///
219    /// `Err` is returned when the function tries to put slice in buffer after `terminate` was called. 
220    /// This is normally not expected, because `terminate` takes ownership on logger instance.
221    ///
222    /// # Panics
223    ///
224    /// This function panics if some of the internal mutexes is poisoned or when writer thread panics.
225    pub fn write_slice(&self, slice: &[T]) -> Result<(),()> where T: Copy {
226
227        if slice.len() >= self.threshold {
228
229            let mut guard = self.writer.lock().unwrap();
230
231            guard.process_slice(slice);
232
233        } else {
234
235            self.buf.write_slice(slice)?;
236        }
237
238        Ok(())
239    }
240
241
242    /// This function is similar to `write_slice` but instead of pushing some slice to buffer it allows
243    /// reserving some space for writing directly in the underlying destination buffer. This way excessive
244    /// copy operation from the slice to the internal buffer can be avoided.
245    /// Thus, this function is more preferable than `write_slice` but is applicable only when you know the size of the slice you need
246    /// beforehand. When the size of the slice doesn't matter use `reserve_slice_relaxed`.
247    ///
248    /// The function returns `Slice` struct that can be dereferenced as mutable slice of `<T>`. The
249    /// client code can use the dereferenced slice to write to it. The client code holds the buffer
250    /// until `Slice` instance goes out of scope or is explicitly dropped with `drop`. That
251    /// means client's code must take care of not holding the returned `Slice` instance for too long
252    /// because it can block other threads.
253    /// 
254    /// # Errors
255    ///
256    /// If the `reserve_size` is larger or equal to 0.8 * buffer_size then `Err` is returned with
257    /// `ErrorKind::RequestedSizeIsTooLong`.
258    ///
259    /// `Err` is also returned when the function is called after `terminate` was called, but
260    /// this is normally not expected, because `terminate` takes ownership on logger instance.
261    ///
262    /// # Panics
263    ///
264    /// This function panics if some of the internal mutexes is poisoned or when writer thread panics.
265    pub fn reserve_slice(&self, reserve_size: usize) -> Result<Slice<T>,Error> where T: Copy {
266
267        if reserve_size >= self.threshold {
268            return Err(Error::new(ErrorKind::RequestedSizeIsTooLong, ErrorRepr::Simple));
269        } else {
270            return self.buf.reserve_slice(reserve_size, false);
271        }
272    }
273
274
275    /// This function is similar to `reserve_slice` but returned `Slice` struct can have length  
276    /// from 1 item, and up to `reserve_size` items.
277    ///
278    /// # Errors
279    ///
280    /// `Err` is returned when the function is called after `terminate` was called.
281    /// This is normally not expected, because `terminate` takes ownership on logger instance.
282    ///
283    /// # Panics
284    ///
285    /// This function panics if some of the internal mutexes is poisoned or when writer thread panics.
286    #[inline]
287    pub fn reserve_slice_relaxed(&self, reserve_size: usize) -> Result<Slice<T>,()>  where T: Copy {
288
289        return self.buf.reserve_slice(reserve_size, true).map_err(|_| {()});
290    }
291
292    /// Write a value of type `<T>`. This method can be used for writing values that do not
293    /// implement `Copy` trait, e.g. `String`, or pointer to a string `Box<String>`. The function
294    /// takes ownership of the argument. After the argument is processed by writer `drop` for it is
295    /// called automatically.
296    /// 
297    /// Function blocks if there is not enough free space in any of buffers. 
298    /// As soon as there is free space available value is written and function returns.
299    ///
300    /// # Errors
301    ///
302    /// `Err` is returned when the function tries to put value in buffer after `terminate` was called. 
303    /// This is normally not expected, because `terminate` takes ownership on logger instance.
304    ///
305    /// # Panics
306    ///
307    /// This function panics if some of the internal mutexes is poisoned or when writer thread panics.
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// use async_logger::{FileWriter, AsyncLoggerNB, Writer};
313    ///
314    /// // implement some custom writer along the way
315    /// struct Stub {}
316    /// impl Writer<Box<String>> for Stub {
317    ///     fn process_slice(&mut self, slice: &[Box<String>]) {}
318    ///     fn flush(&mut self) {}
319    /// }
320    ///
321    /// let writer_obj: Box<dyn Writer<Box<String>>> = Box::new(Stub {});
322    ///
323    /// let logger = AsyncLoggerNB::new(Box::new(Stub {}), 8192)
324    ///     .expect("Failed to create new async logger");
325    ///
326    /// let string_ptr = Box::new("test message".to_owned());
327    /// logger.write_value(string_ptr).unwrap();
328    ///
329    /// ```
330    pub fn write_value(&self, value: T) -> Result<(),()> {
331        let slice = [value];
332        self.buf.write_slice(&slice)?;
333        std::mem::forget(slice);
334        Ok(())
335    }
336
337    /// Mark not yet full buffer as ready for writer.
338    /// This function doesn't call `Writer::flush`.
339    /// This function doesn't wait while writer process all the previously written data.
340    ///
341    /// # Panics
342    ///
343    /// Panics if some of the internal mutexes is poisoned.
344    pub fn flush(&self) {
345
346        self.buf.flush();
347    }
348
349
350    /// Return current values of performance metrics, e.g. wait event information.
351    pub fn get_metrics(&self) -> Metrics {
352        self.buf.get_metrics()
353    }
354}
355
/// Errors returned by the crate functions.
///
/// An error always has a [`ErrorKind`] (see [`Error::kind`]) and, for some kinds,
/// wraps the underlying standard library error that caused it.
#[derive(Debug)]
pub struct Error {
    kind: ErrorKind,
    repr: ErrorRepr
}

impl std::fmt::Display for Error {
    /// Writes a short description of the error kind, followed by the wrapped
    /// cause when there is one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // NOTE: never call `self.to_string()` here: `to_string` is implemented
        // through `Display::fmt`, so that would recurse until the stack overflows.
        let description = match self.kind {
            ErrorKind::PathToStrConversionError => "failed to convert path to string",
            ErrorKind::TimeError => "system time error",
            ErrorKind::IoError => "I/O error",
            ErrorKind::IncorrectBufferSize => "incorrect buffer size",
            ErrorKind::AllocFailure => "memory allocation failure",
            ErrorKind::MemoryLayoutError => "memory layout error",
            ErrorKind::LoggerIsTerminated => "logger is terminated",
            ErrorKind::RequestedSizeIsTooLong => "requested size is too long",
        };

        match &self.repr {
            ErrorRepr::Simple => f.write_str(description),
            ErrorRepr::IoError(e) => write!(f, "{}: {}", description, e),
            ErrorRepr::TimeError(e) => write!(f, "{}: {}", description, e),
            ErrorRepr::MemoryLayoutError(e) => write!(f, "{}: {}", description, e),
        }
    }
}

impl Error {

    /// Build an error of the given kind with the given internal representation.
    fn new(kind: ErrorKind, repr: ErrorRepr) -> Error {
        Error {
            kind,
            repr
        }
    }

    /// For kind IoError return associated io error.
    ///
    /// Returns `None` when the error does not wrap an io error.
    pub fn io_err(self) -> Option<std::io::Error> {
        match self.repr {
            ErrorRepr::IoError(e) => Some(e),
            _ => None
        }
    }

    /// For kind TimeError return associated time error.
    ///
    /// Returns `None` when the error does not wrap a time error.
    pub fn time_err(self) -> Option<std::time::SystemTimeError> {
        match self.repr {
            ErrorRepr::TimeError(e) => Some(e),
            _ => None
        }
    }

    /// For kind MemoryLayoutError return associated memory layout error.
    ///
    /// Returns `None` when the error does not wrap a memory layout error.
    pub fn layout_err(self) -> Option<std::alloc::LayoutErr> {
        match self.repr {
            ErrorRepr::MemoryLayoutError(e) => Some(e),
            _ => None
        }
    }

    /// Returns kind of error.
    pub fn kind(&self) -> ErrorKind {
        self.kind
    }
}

impl std::error::Error for Error {
    /// Returns the wrapped standard library error, if any.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match &self.repr {
            ErrorRepr::Simple => None,
            ErrorRepr::IoError(e) => Some(e),
            ErrorRepr::TimeError(e) => Some(e),
            ErrorRepr::MemoryLayoutError(e) => Some(e),
        }
    }
}


/// Error kinds.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum ErrorKind {
    PathToStrConversionError,
    TimeError,
    IoError,
    IncorrectBufferSize,
    AllocFailure,
    MemoryLayoutError,
    LoggerIsTerminated,
    RequestedSizeIsTooLong,
}

/// Internal payload of an [`Error`]: either nothing, or the wrapped cause.
#[derive(Debug)]
enum ErrorRepr {
    Simple,
    IoError(std::io::Error),
    TimeError(std::time::SystemTimeError),
    MemoryLayoutError(std::alloc::LayoutErr),
}
431
432
#[cfg(test)]
mod tests {

    use super::*;
    use std::path::Path;
    use std::io::{BufRead, BufReader};
    use std::fs::File;
    use std::thread;
    use std::sync::{Once, MutexGuard, atomic::AtomicU64, atomic::Ordering};
    use std::mem::MaybeUninit;
    use std::collections::HashMap;


    /// Directory used by the file based tests; created by `prepare`, removed by `cleanup`.
    const LOG_DIR: &str = "/tmp/AsyncLoggerNBTest_45870201463983";

    // Serializes the tests that share `LOG_DIR`. Initialized once through `INIT_MUTEX`.
    static mut TEST_MUTEX: MaybeUninit<Mutex<()>> = MaybeUninit::uninit();

    static INIT_MUTEX: Once = Once::new();

    /// Short messages shared by the tests that do not need oversized lines.
    static SHORT_TEST_STRINGS: [&[u8]; 10] = [
        b"aAaAaA AaAa 0\n",
        b"bBbBbB BbBbB 1\n",
        b"CcCcCcC cCcCcC 2\n",
        b"DdDdD dDDDdDdDd 3\n",
        b"eEeEeEe eEeEeEe E 4\n",
        b"FfFf FfFf FfFfFfFf 5\n",
        b"gGgGg GgGgG gGgGgGg 6\n",
        b"HhHhHhHhHhH hHhHhHhHh 7\n",
        b"IiIiIiI IiIiIiI iIiIiI 8\n",
        b"jJjJ jJjJjJ jJjJjJjJjjJ 9\n",
    ];


    /// Take the global test lock and (re)create an empty `LOG_DIR`.
    ///
    /// The returned guard must be kept alive for the whole duration of the test.
    fn prepare<'a>() -> MutexGuard<'a, ()> {

        INIT_MUTEX.call_once(|| {
            // SAFETY: `call_once` runs this closure exactly once, before any reader below
            // can observe the static.
            unsafe { TEST_MUTEX = MaybeUninit::new(Mutex::new(())) };
        });

        // SAFETY: the static was initialized by the `call_once` above and is never written again.
        let mtx: &Mutex<()> = unsafe { TEST_MUTEX.as_ptr().as_ref().expect("Test mutex is not initialized") };

        // The mutex protects no data, it only serializes tests. A test that panicked while
        // holding it must not make every following test fail, so recover from poisoning.
        let guard = mtx.lock().unwrap_or_else(|poisoned| poisoned.into_inner());

        if Path::new(LOG_DIR).exists() {

            cleanup();
        }

        std::fs::create_dir(LOG_DIR).expect("Failed to create test dir");

        guard
    }


    /// Remove `LOG_DIR` with everything in it.
    fn cleanup() {

        std::fs::remove_dir_all(LOG_DIR).expect("Failed to delete test dir on cleanup");
    }


    /// Return the path of the first entry found in `LOG_DIR`.
    fn get_resulting_file_path() -> String {

        String::from(Path::new(LOG_DIR)
            .read_dir()
            .expect("Failed to list files in test directory")
            .next()
            .expect("No files found in test directory")
            .expect("Failed to get entry inside test directory")
            .path()
            .to_str()
            .expect("Failed to get file path as str"))
    }


    /// Spawn one thread per test string; every thread writes its string `cnt` times,
    /// alternating between `reserve_slice` and `write_slice`, and calls `flush` after every
    /// `flush_cnt` writes. Returns after all threads have been joined.
    fn spawn_threads<T: Send + Sync + Clone + Copy + 'static>(logger: &Arc<AsyncLoggerNB<T>>, test_strings: &[&'static [T]], cnt: usize, flush_cnt: usize) {

        let mut handles = vec![];

        for &s in test_strings {

            let logger_c = logger.clone();

            let handle = thread::spawn(move || {

                for i in 1..cnt+1 {
                    if i & 0x1 == 0 {
                        logger_c.write_slice(s).unwrap();
                    } else {
                        match logger_c.reserve_slice(s.len()) {
                            Ok(mut bytes) => {
                                bytes.copy_from_slice(s);
                                drop(bytes);
                            },
                            Err(e) => {
                                // Too long for the buffer: fall back to `write_slice`.
                                if e.kind() == ErrorKind::RequestedSizeIsTooLong {
                                    logger_c.write_slice(s).unwrap();
                                } else {
                                    panic!("Unexpected error: {:?}", e);
                                }
                            }
                        }
                    }

                    if i % flush_cnt == 0 {
                        logger_c.flush();
                    }
                }
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Failed on thread join");
        }
    }

    //
    // tests for u8 slices
    //

    #[test]
    fn test_async_logger_single_thread() {

        let _guard = prepare();

        let writer = FileWriter::new(LOG_DIR, std::usize::MAX).expect("Failed to create file writer");

        let writer_obj: Box<dyn Writer<u8>> = Box::new(writer);

        let buf_sz = 64;

        let logger = AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger");

        let cnt = 10000;

        let write_line = "Hello, world!\n";

        for _ in 0..cnt {
            logger.write_slice(write_line.as_bytes()).unwrap();
        }

        logger.terminate();

        let out_file = get_resulting_file_path();

        let mut reader = BufReader::new(File::open(out_file).expect("Failed to open resulting file"));

        let mut line = String::new();

        let mut lines_read = 0;

        loop {

            let len = reader.read_line(&mut line).expect("Failed to read line from the reslting file");

            if len == 0 {

                break;
            }

            assert_eq!(write_line, line);

            line.clear();

            lines_read += 1;
        }

        // Every written line must have reached the file, not just well-formed ones.
        assert_eq!(cnt, lines_read, "Resulting file contains unexpected number of lines");

        cleanup();
    }


    /// Write every test string `iter_cnt` times from its own thread into a file, then check
    /// that the file contains exactly `iter_cnt` copies of each string and nothing else.
    fn run_threaded_test(test_strings: &'static [&[u8]], buf_sz: usize, iter_cnt: usize, flush_cnt: usize) {

        let writer = FileWriter::new(LOG_DIR, std::usize::MAX).expect("Failed to create file writer");

        let writer_obj: Box<dyn Writer<u8>> = Box::new(writer);

        let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));

        spawn_threads(&logger, &test_strings, iter_cnt, flush_cnt);

        match Arc::try_unwrap(logger) {
            Ok(logger) => logger.terminate(),
            Err(_) => panic!("Failed to terminate logger because it is still used"),
        };

        let out_file = get_resulting_file_path();

        let mut reader = BufReader::new(File::open(out_file).expect("Failed to open resulting file"));

        let mut line = String::new();

        let mut test_strings_hm = HashMap::new();

        for x in test_strings.iter() {
            test_strings_hm.insert(std::str::from_utf8(*x).unwrap().to_owned(), 0);
        }

        loop {

            let len = reader.read_line(&mut line).expect("Failed to read line from the reslting file");

            if len == 0 {

                break;
            }

            // Build the failure message only when it is actually needed.
            match test_strings_hm.get_mut(&line) {
                Some(count) => *count += 1,
                None => panic!("The line is not recognized: {}", line),
            }

            line.clear();
        }

        test_strings_hm.iter().for_each( |(line, cnt)| {
            assert_eq!(*cnt, iter_cnt, "Resulting file contains {} lines \"{}\", but expected {}", cnt, line, iter_cnt);
        });
    }


    #[test]
    fn test_async_logger_multiple_threads() {

        let _guard = prepare();

        let buf_sz = 64;

        let iter_cnt = 1000;

        // `flush_cnt` larger than `iter_cnt` means the threads never call `flush`.
        run_threaded_test(&SHORT_TEST_STRINGS, buf_sz, iter_cnt, iter_cnt + 1);

        cleanup();
    }


    #[test]
    fn test_async_logger_large_msg() {

        let _guard = prepare();

        // Some of the lines are longer than the whole buffer used below.
        static TEST_STRINGS: [&[u8]; 10] = [
            b"aAaAaA AaAa 0\n",
            b"bBbBbB BbBbB 1\n",
            b"CcCcCcC cCcCcC 2\n",
            b"DdDdD dDDDdDdDd 3\n",
            b"eEeEeEe eEeEeEe E 4 eEeEeEe eEeEeEe E 4 eEeEeEe eEeEeEe E 4 eEeEeEe eEeEeEe E 4\n",
            b"FfFf FfFf FfFfFfFf 5\n",
            b"gGgGg GgGgG gGgGgGg 6\n",
            b"HhHhHhHhHhH hHhHhHhHh 7\n",
            b"IiIiIiI IiIiIiI iIiIiI 8\n",
            b"jJjJ jJjJjJ jJjJjJjJjjJ 9 jJjJ jJjJjJ jJjJjJjJjjJ 9 jJjJ jJjJjJ jJjJjJjJjjJ 9\n",
        ];

        let buf_sz = 64;

        let iter_cnt = 1000;

        run_threaded_test(&TEST_STRINGS, buf_sz, iter_cnt, iter_cnt + 1);

        cleanup();
    }

    #[test]
    fn test_flush() {

        let _guard = prepare();

        let buf_sz = 64;

        let iter_cnt = 1000;

        run_threaded_test(&SHORT_TEST_STRINGS, buf_sz, iter_cnt, iter_cnt / 20);

        cleanup();
    }

    /// Writer that only counts how many times each of its methods was called.
    struct WriterTest {
        flush_cnt: Arc<AtomicU64>,
        slice_cnt: Arc<AtomicU64>,
    }

    impl<T: Send + Clone + 'static> Writer<T> for WriterTest {

        fn process_slice(&mut self, _slice: &[T]) {
            self.slice_cnt.fetch_add(1, Ordering::Relaxed);
        }

        fn flush(&mut self) {
            self.flush_cnt.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Write, flush, write, flush and terminate; then check that `Writer::flush` was called
    /// exactly once and `Writer::process_slice` between 2 and 4 times.
    fn test_flush2<T: Send + Clone + Copy + 'static>(write_line: &[T]) {

        let buf_sz = 1024;
        let flush_cnt = Arc::new(AtomicU64::new(0));
        let slice_cnt = Arc::new(AtomicU64::new(0));

        let writer = WriterTest {
            flush_cnt: flush_cnt.clone(),
            slice_cnt: slice_cnt.clone(),
        };

        let writer_obj: Box<dyn Writer<T>> = Box::new(writer);

        let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));

        logger.write_slice(write_line).unwrap();
        logger.write_slice(write_line).unwrap();
        logger.flush();

        logger.write_slice(write_line).unwrap();
        logger.write_slice(write_line).unwrap();
        logger.flush();

        match Arc::try_unwrap(logger) {
            Ok(logger) => logger.terminate(),
            Err(_) => panic!("Failed to terminate logger because it is still used"),
        };

        assert_eq!(1, flush_cnt.load(Ordering::Relaxed), "Flush count doesnt match");

        let slice_cnt = slice_cnt.load(Ordering::Relaxed);
        assert!(2 <= slice_cnt && 4 >= slice_cnt, "Slice count has unexpected value {}", slice_cnt);
    }

    #[test]
    fn test_flush2_u8() {
        let write_line: &[u8] = b"abc";
        test_flush2(write_line);
    }

    //
    // Heavy concurrency test
    //

    /// Writer for messages whose first byte is an ASCII digit `'1'..='4'` identifying
    /// the message type; counts messages per type.
    struct StubWriter {
        counters: [u64; 4],
        lengths: [usize; 4],
    }

    impl Writer<u8> for StubWriter {
        fn process_slice(&mut self, slice: &[u8]) {
            let mut p = 0;
            while p<slice.len() {
                // 49 is ASCII '1': map the leading digit to a zero based index.
                let l = (slice[p] - 49) as usize;
                if l > 3 {
                    println!("l = {}, p = {}, slice = {}", l, p, String::from_utf8_lossy(slice));
                }
                self.counters[l] += 1;
                p += self.lengths[l];
            }
        }

        fn flush(&mut self) {
            for i in 0..self.counters.len() {
                println!("counter {}: {}", i, self.counters[i]);
            }
        }
    }

    #[ignore]
    #[test]
    fn heavy_concurrency_test() {

        let test_strings: &[&[u8]] = &[
            b"1[INFO module_x]: testing message, thread #",
            b"2[INFO module_y]: testing message for thread #",
            b"3[INFO module_z]: another one message for thread #",
            b"4[INFO module_o]: a long long long long long long long long long long long long message for therad #",
        ];

        let lengths = [
            test_strings[0].len(),
            test_strings[1].len(),
            test_strings[2].len(),
            test_strings[3].len(),
        ];

        let buf_sz = 8192 * 8;

        let iter_cnt = 10000000;

        let writer_obj: Box<dyn Writer<u8>> = Box::new(StubWriter {counters: [0u64;4], lengths});

        let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));

        for i in 1..25+1 {
            spawn_threads(&logger, &test_strings, iter_cnt, iter_cnt/100);
            println!("{:?}", logger.get_metrics());
            println!("{}", i);
        }

        match Arc::try_unwrap(logger) {
            Ok(logger) => logger.terminate(),
            Err(_) => panic!("Failed to terminate logger because it is still used"),
        };
    }


    //
    // tests for u32 and u64 and str slices
    //

    #[test]
    fn test_flush2_u64() {
        static WRITE_LINE: [u64; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        test_flush2(&WRITE_LINE);
    }

    /// Writer for messages identified by their first item; counts messages per first item.
    /// `lengths` maps the first item of a message to the message length, which is how
    /// consecutive messages in one slice are told apart.
    struct IntWriter<T> {
        pub counters: Arc<HashMap<T, AtomicU64>>,
        pub lengths: HashMap<T, usize>,
    }

    impl<T: Clone + Sync + Send + Copy + 'static + Eq + std::hash::Hash> Writer<T> for IntWriter<T> {
        fn process_slice(&mut self, slice: &[T]) {
            let mut p = 0;
            while p<slice.len() {
                let l = slice[p];
                (*self.counters).get(&l).unwrap().fetch_add(1, Ordering::Relaxed);
                p += self.lengths.get(&l).unwrap();
            }
        }

        fn flush(&mut self) { }
    }


    /// Run several rounds of concurrent writes of `test_strings` and check that the writer
    /// has seen every message exactly `rounds * iter_cnt` times.
    fn test_async_logger_param<T: Sync + Clone + Copy + Send + 'static + Eq + std::hash::Hash + std::fmt::Debug>(test_strings: &[&'static [T]]) {

        let buf_sz = 1024;

        let iter_cnt = 10000;

        let rounds = 9;

        let mut lengths = HashMap::new();
        let mut counters = HashMap::new();

        // Messages are keyed by their first item.
        for s in test_strings {
            lengths.insert(s[0], s.len());
            counters.insert(s[0], AtomicU64::new(0));
        }

        let counters = Arc::new(counters);

        let writer_obj: Box<dyn Writer<T>> = Box::new(IntWriter {counters: counters.clone(), lengths});

        let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));

        for _ in 0..rounds {
            spawn_threads(&logger, test_strings, iter_cnt, iter_cnt/100);
        }

        match Arc::try_unwrap(logger) {
            Ok(logger) => logger.terminate(),
            Err(_) => panic!("Failed to terminate logger because it is still used"),
        };

        // Without this check the test would only detect a panicking writer, not lost messages.
        for (k, v) in counters.iter() {
            assert_eq!(rounds * iter_cnt, v.load(Ordering::Relaxed) as usize, "Counter for message starting with {:?} doesn't match", k);
        }
    }

    #[test]
    fn test_async_logger_u32() {
        static TEST_STRINGS: &[&[u32]] = &[
            &[1, 502, 504, 5, 6, 101, 102, 103, 65536, 1000000000],
            &[2, 7, 8, 9, 10, 11, 12, 13, 14, std::u32::MAX-2, 60, 61, 62, 63, 64, 65],
            &[3, std::u32::MAX-3, 16, 17, 18, std::u32::MAX-3, 20],
            &[4, 21, 22, 23, 24, 25, std::u32::MAX-4],
        ];

        test_async_logger_param(TEST_STRINGS);
    }


    #[test]
    fn test_async_logger_u64() {
        static TEST_STRINGS: &[&[u64]] = &[
            &[1, 502, 504, 5, 6, 101, 102, 103, 65536, 5000000000],
            &[2, 7, 8, 9, 10, 11, 12, 13, 14, std::u64::MAX-2, 60, 61, 62, 63, 64, 65],
            &[3, std::u64::MAX-3, 16, 17, 18, std::u64::MAX-3, 20],
            &[4, 21, 22, 23, 24, 25, std::u64::MAX-4],
        ];

        test_async_logger_param(TEST_STRINGS);
    }

    #[test]
    fn test_async_logger_str() {
        static TEST_STRINGS: &[&[&str]] = &[
            &["1", "test"],
            &["2",],
            &["3", "test 3", "test test 3", "test 3 tst", ""],
            &["4", "verdurenoj", "propergertulopus"],
        ];

        test_async_logger_param(TEST_STRINGS);
    }

    //
    // Writing boxed strings
    //

    /// Writer that counts how many times each distinct string was received.
    struct StringWriter {
        pub counters: Arc<HashMap<String, AtomicU64>>,
    }

    impl Writer<Box<String>> for StringWriter {
        fn process_slice(&mut self, slice: &[Box<String>]) {
            for (p, item) in slice.iter().enumerate() {
                let l: &String = item;
                match (*self.counters).get(l) {
                    Some(c) => { c.fetch_add(1, Ordering::Relaxed); },
                    None => panic!("wrong val {}, {}, {:?}", l, p, slice)
                };
            }
        }

        fn flush(&mut self) { }
    }

    /// Write every string of `s` as a separate boxed value.
    fn write_complete_slice_boxed(logger_c: &Arc<AsyncLoggerNB<Box<String>>>, s: &[&str]) {

        for &item in s {
            logger_c.write_value(Box::new(item.to_owned())).unwrap();
        }
    }

    /// Spawn one thread per group of strings; every thread writes its group `cnt` times
    /// and calls `flush` after every `flush_cnt` rounds. Returns after all threads joined.
    fn spawn_threads_string(logger: &Arc<AsyncLoggerNB<Box<String>>>, test_strings: &'static [&'static [&str]], cnt: usize, flush_cnt: usize) {

        let mut handles = vec![];

        for &s in test_strings {

            let logger_c = logger.clone();

            let handle = thread::spawn(move || {

                for l in 1..cnt+1 {

                    write_complete_slice_boxed(&logger_c, s);

                    if l % flush_cnt == 0 {
                        logger_c.flush();
                    }
                }
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Failed on thread join");
        }
    }


    /// Run 10 rounds of concurrent `write_value` calls and check that the writer has seen
    /// every string exactly `10 * iter_cnt` times.
    fn test_async_logger_boxed(test_strings: &'static [&'static [&str]]) {

        let buf_sz = 1024;

        let iter_cnt = 10000;

        let mut counters = HashMap::new();

        for group in test_strings {
            for &item in group.iter() {
                counters.insert(String::from(item), AtomicU64::new(0));
            }
        }

        let counters = Arc::new(counters);

        let writer_obj: Box<dyn Writer<Box<String>>> = Box::new(StringWriter {counters: counters.clone()});

        let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));

        for _ in 1..10+1 {
            spawn_threads_string(&logger, test_strings, iter_cnt, iter_cnt/100);
        }

        match Arc::try_unwrap(logger) {
            Ok(logger) => logger.terminate(),
            Err(_) => panic!("Failed to terminate logger because it is still used"),
        };

        for (k,v) in counters.iter() {
            assert_eq!(iter_cnt*10, v.load(Ordering::Relaxed) as usize, "Counter for value {} doesn't match", k);
        }
    }

    #[test]
    fn test_async_logger_box() {
        static TEST_STRINGS: &[&[&str]] = &[
            &["line 1", "test"],
            &["line 2",],
            &["line 3", "test 3", "test test 3", "test 3 tst", ""],
            &["line 4", "verdurenoj", "propergertulopus"],
        ];

        test_async_logger_boxed(TEST_STRINGS);
    }
}