mod buf;
mod writer;
use buf::DoubleBuf;
use writer::ThreadedWriter;
use std::sync::{Mutex, Arc};
pub use writer::FileWriter;
pub use buf::Metrics;
pub use buf::Slice;
/// Destination for data accumulated by the logger.
///
/// Implementations are driven from the background writer thread (hence the
/// `Send` bounds). `process_slice` receives chunks of buffered items;
/// `flush` asks the implementation to persist anything it buffers internally.
pub trait Writer<T: Send + 'static>: Send {
    /// Handle the next chunk of buffered items.
    fn process_slice(&mut self, slice: &[T]);
    /// Persist any internally buffered data.
    fn flush(&mut self);
}
/// Asynchronous, non-blocking logger: producer threads append into a double
/// buffer while a dedicated thread drains it into the supplied [`Writer`].
pub struct AsyncLoggerNB<T: Send + 'static> {
    // Double buffer shared between producers and the writer thread.
    buf: DoubleBuf<T>,
    // Background thread draining `buf` into `writer`.
    tw: ThreadedWriter,
    // Shared with the writer thread; also locked directly for oversized slices.
    writer: Arc<Mutex<Box<dyn Writer<T>>>>,
    // Guards `terminate` so the shutdown sequence runs at most once.
    terminated: Arc<Mutex<bool>>,
    // Slices of this length or longer bypass the buffer (buf_sz - buf_sz/5).
    threshold: usize,
}
impl<T: Send + 'static> AsyncLoggerNB<T> {
    /// Creates a logger draining into `writer` through a double buffer of
    /// `buf_sz` items, serviced by a dedicated writer thread.
    ///
    /// # Errors
    ///
    /// Propagates the error from `DoubleBuf::new` (e.g. an unacceptable
    /// buffer size).
    pub fn new(writer: Box<dyn Writer<T>>, buf_sz: usize) -> Result<AsyncLoggerNB<T>, Error> {
        let buf = DoubleBuf::<T>::new(buf_sz)?;
        let writer = Arc::new(Mutex::new(writer));
        let writer2 = writer.clone();
        let tw = ThreadedWriter::new(writer2, &buf);
        let terminated = Arc::new(Mutex::new(false));
        // Slices at or above 80% of a single buffer bypass buffering
        // entirely (see `write_slice` / `reserve_slice`).
        let threshold = buf_sz - buf_sz / 5;
        Ok(AsyncLoggerNB {
            buf,
            tw,
            writer,
            terminated,
            threshold,
        })
    }

    /// Shuts the logger down: requests the writer thread to stop, seals the
    /// buffers (presumably releasing any pending data to the writer — see
    /// the `buf` module), and waits for thread termination. The `terminated`
    /// flag ensures the sequence runs at most once.
    pub fn terminate(self) {
        let mut guard = self.terminated.lock().unwrap();
        if ! *guard {
            self.tw.request_stop();
            self.buf.seal_buffers();
            self.tw.wait_termination();
            *guard = true;
        }
    }

    /// Writes a slice of values to the log.
    ///
    /// Slices shorter than the threshold go through the double buffer;
    /// slices at or above it are handed to the writer directly under its
    /// mutex (they would not fit the buffering scheme).
    pub fn write_slice(&self, slice: &[T]) -> Result<(),()> where T: Copy {
        if slice.len() >= self.threshold {
            let mut guard = self.writer.lock().unwrap();
            guard.process_slice(slice);
        } else {
            self.buf.write_slice(slice)?;
        }
        Ok(())
    }

    /// Reserves `reserve_size` items of buffer space for the caller to fill.
    ///
    /// # Errors
    ///
    /// `ErrorKind::RequestedSizeIsTooLong` when the request reaches the
    /// direct-write threshold; otherwise whatever `DoubleBuf::reserve_slice`
    /// reports.
    pub fn reserve_slice(&self, reserve_size: usize) -> Result<Slice<T>,Error> where T: Copy {
        if reserve_size >= self.threshold {
            return Err(Error::new(ErrorKind::RequestedSizeIsTooLong, ErrorRepr::Simple));
        } else {
            return self.buf.reserve_slice(reserve_size, false);
        }
    }

    /// Like [`AsyncLoggerNB::reserve_slice`] but in "relaxed" mode (second
    /// argument `true` — see the `buf` module); errors are collapsed to `()`.
    #[inline]
    pub fn reserve_slice_relaxed(&self, reserve_size: usize) -> Result<Slice<T>,()> where T: Copy {
        return self.buf.reserve_slice(reserve_size, true).map_err(|_| {()});
    }

    /// Writes a single value.
    ///
    /// The value is wrapped in a one-element array and (presumably bitwise-)
    /// copied into the buffer by `write_slice`; `mem::forget` then prevents
    /// the local copy from being dropped, avoiding a double drop for
    /// non-`Copy` `T`. On error, `?` returns before the `forget`, so the
    /// value is dropped normally.
    pub fn write_value(&self, value: T) -> Result<(),()> {
        let slice = [value];
        self.buf.write_slice(&slice)?;
        std::mem::forget(slice);
        Ok(())
    }

    /// Asks the buffer (and, downstream, the writer) to flush pending data.
    pub fn flush(&self) {
        self.buf.flush();
    }

    /// Returns buffer usage metrics collected so far.
    pub fn get_metrics(&self) -> Metrics {
        self.buf.get_metrics()
    }
}
/// Error type returned by fallible logger operations.
#[derive(Debug)]
pub struct Error {
    // Broad category, exposed via `kind()`.
    kind: ErrorKind,
    // Detailed cause, extracted via `io_err` / `time_err` / `layout_err`.
    repr: ErrorRepr
}
impl std::fmt::Display for Error {
    /// Formats the error for user-facing output.
    ///
    /// Bug fix: the previous body was `write!(f, "{}", self.to_string())`.
    /// `to_string()` comes from the blanket `ToString` impl over `Display`,
    /// which calls back into this very `fmt` — infinite recursion and a
    /// stack overflow on first use. Delegate to the derived `Debug`
    /// representation instead, which carries both `kind` and `repr`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}
impl Error {
fn new(kind: ErrorKind, repr: ErrorRepr) -> Error {
Error {
kind,
repr
}
}
pub fn io_err(self) -> Option<std::io::Error> {
match self.repr {
ErrorRepr::IoError(e) => Some(e),
_ => None
}
}
pub fn time_err(self) -> Option<std::time::SystemTimeError> {
match self.repr {
ErrorRepr::TimeError(e) => Some(e),
_ => None
}
}
pub fn layout_err(self) -> Option<std::alloc::LayoutErr> {
match self.repr {
ErrorRepr::MemoryLayoutError(e) => Some(e),
_ => None
}
}
pub fn kind(&self) -> ErrorKind {
self.kind
}
}
// Marker impl: the derived `Debug` and the `Display` impl above satisfy the
// std `Error` trait's requirements; defaults suffice for the rest.
impl std::error::Error for Error { }
/// Broad categories of logger errors, returned by [`Error::kind`].
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum ErrorKind {
    /// A path could not be converted to a string.
    PathToStrConversionError,
    /// System-time query failed (detail via `Error::time_err`).
    TimeError,
    /// Underlying I/O operation failed (detail via `Error::io_err`).
    IoError,
    /// The requested buffer size is not acceptable.
    IncorrectBufferSize,
    /// Memory allocation failed.
    AllocFailure,
    /// Memory-layout computation failed (detail via `Error::layout_err`).
    MemoryLayoutError,
    /// Operation attempted on a terminated logger.
    LoggerIsTerminated,
    /// A requested slice reservation reached the direct-write threshold
    /// (see `AsyncLoggerNB::reserve_slice`).
    RequestedSizeIsTooLong,
}
/// Detailed error payload backing [`Error`]; private to the crate.
#[derive(Debug)]
enum ErrorRepr {
    /// No detail beyond the `ErrorKind`.
    Simple,
    /// Wrapped I/O error.
    IoError(std::io::Error),
    /// Wrapped system-time error.
    TimeError(std::time::SystemTimeError),
    /// Wrapped layout-computation error.
    MemoryLayoutError(std::alloc::LayoutErr),
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
use std::io::{BufRead, BufReader};
use std::fs::File;
use std::thread;
use std::sync::{Once, MutexGuard, atomic::AtomicU64, atomic::Ordering};
use std::mem::MaybeUninit;
use std::collections::HashMap;
// Directory used by the tests; created in `prepare`, removed in `cleanup`.
const LOG_DIR: &str = "/tmp/AsyncLoggerNBTest_45870201463983";
// Mutex serializing the tests (they share LOG_DIR). Lazily initialized under
// INIT_MUTEX. NOTE(review): `static mut` is sound here only because every
// access goes through `prepare` after `call_once`; consider migrating to
// `std::sync::OnceLock` if the minimum toolchain allows.
static mut TEST_MUTEX: MaybeUninit<Mutex<()>> = MaybeUninit::uninit();
static INIT_MUTEX: Once = Once::new();
// Acquires the global test lock and (re)creates an empty LOG_DIR. The
// returned guard must be held for the test's whole duration so that tests
// sharing LOG_DIR do not interleave.
fn prepare<'a>() -> MutexGuard<'a, ()> {
    INIT_MUTEX.call_once(|| {
        unsafe { TEST_MUTEX = MaybeUninit::new(Mutex::new(())) };
    });
    // SAFETY: `call_once` above guarantees TEST_MUTEX is initialized, and it
    // is never written again afterwards, so a shared reference is valid.
    let mtx: &Mutex<()> = unsafe { TEST_MUTEX.as_ptr().as_ref().expect("Test mutex is not initialized") };
    let guard = mtx.lock().expect("Test mutex is poisoned");
    // A leftover directory from a crashed run is wiped before starting.
    if Path::new(LOG_DIR).exists() {
        cleanup();
    }
    std::fs::create_dir(LOG_DIR).expect("Failed to create test dir");
    guard
}
// Removes the temporary log directory and everything inside it.
fn cleanup() {
    let removed = std::fs::remove_dir_all(LOG_DIR);
    removed.expect("Failed to delete test dir on cleanup");
}
// Returns the path of the single file the logger produced inside LOG_DIR.
fn get_resulting_file_path() -> String {
    let entry = Path::new(LOG_DIR)
        .read_dir()
        .expect("Failed to list files in test directory")
        .next()
        .expect("No files found in test directory")
        .expect("Failed to get entry inside test directory");
    entry
        .path()
        .to_str()
        .expect("Failed to get file path as str")
        .to_owned()
}
// Spawns one producer thread per test string. Each thread performs `cnt`
// writes of its string, alternating between `write_slice` (even iterations)
// and `reserve_slice` + copy (odd iterations), and flushes every `flush_cnt`
// iterations. All threads are joined before returning.
fn spawn_threads<T: Send + Sync + Clone + Copy + 'static>(logger: &Arc<AsyncLoggerNB<T>>, test_strings: &[&'static [T]], cnt: usize, flush_cnt: usize) {
    let mut handles = vec![];
    for i in 0..test_strings.len() {
        let s = test_strings[i];
        let logger_c = logger.clone();
        let handle = thread::spawn(move || {
            for i in 1..cnt+1 {
                if i & 0x1 == 0 {
                    logger_c.write_slice(&s).unwrap();
                } else {
                    match logger_c.reserve_slice(s.len()) {
                        Ok(mut bytes) => {
                            // Fill the reserved region; dropping the slice
                            // presumably commits the write (see `buf` module).
                            let dst = &mut bytes;
                            dst.copy_from_slice(&s);
                            drop(bytes);
                        },
                        Err(e) => {
                            // Reservation is refused for oversized requests;
                            // fall back to the direct write path.
                            if e.kind() == ErrorKind::RequestedSizeIsTooLong {
                                logger_c.write_slice(&s).unwrap();
                            } else {
                                panic!("Unexpected error: {:?}", e);
                            }
                        }
                    }
                }
                if i % flush_cnt == 0 {
                    logger_c.flush();
                }
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().expect("Failed on thread join");
    }
}
// Single producer writing one fixed line 10000 times; verifies the output
// file reproduces every line exactly.
#[test]
fn test_async_logger_single_thread() {
    let _guard = prepare();
    let writer = FileWriter::new(LOG_DIR, std::usize::MAX).expect("Failed to create file writer");
    let writer_obj: Box<dyn Writer<u8>> = Box::new(writer);
    let buf_sz = 64;
    let logger = AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger");
    let mut cnt = 10000;
    let write_line = "Hello, world!\n";
    for _ in 0..cnt {
        logger.write_slice(write_line.as_bytes()).unwrap();
    }
    logger.terminate();
    let out_file = get_resulting_file_path();
    let mut reader = BufReader::new(File::open(out_file).expect("Failed to open resulting file"));
    let mut line = String::new();
    loop {
        let len = reader.read_line(&mut line).expect("Failed to read line from the reslting file");
        if len == 0 {
            break;
        }
        assert_eq!(write_line, line);
        line.clear();
        cnt -= 1;
    }
    // Bug fix: `cnt` was decremented per line but never checked, so a file
    // with too few (or zero) lines previously passed the test.
    assert_eq!(0, cnt, "Number of lines in the resulting file doesn't match the number of writes");
    cleanup();
}
fn run_threaded_test(test_strings: &'static [&[u8]], buf_sz: usize, iter_cnt: usize, flush_cnt: usize) {
let writer = FileWriter::new(LOG_DIR, std::usize::MAX).expect("Failed to create file writer");
let writer_obj: Box<dyn Writer<u8>> = Box::new(writer);
let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));
spawn_threads(&logger, &test_strings, iter_cnt, flush_cnt);
match Arc::try_unwrap(logger) {
Ok(logger) => logger.terminate(),
Err(_) => panic!("Failed to terminate logger because it is still used"),
};
let out_file = get_resulting_file_path();
let mut reader = BufReader::new(File::open(out_file).expect("Failed to open resulting file"));
let mut line = String::new();
let mut test_strings_hm = std::collections::HashMap::new();
for x in test_strings.iter() { test_strings_hm.insert(std::str::from_utf8(*x).unwrap().to_owned(), 0); };
loop {
let len = reader.read_line(&mut line).expect("Failed to read line from the reslting file");
if len == 0 {
break;
}
*test_strings_hm.get_mut(&line).expect(&format!("The line is not recognized: {}", line)) += 1;
line.clear();
}
test_strings_hm.iter().for_each( |(line, cnt)| {
assert_eq!(*cnt, iter_cnt, "Resulting file contains {} lines \"{}\", but expected {}", cnt, line, iter_cnt);
});
}
#[test]
fn test_async_logger_multiple_threads() {
let _guard = prepare();
static TEST_STRINGS: [&[u8]; 10] = [
b"aAaAaA AaAa 0\n",
b"bBbBbB BbBbB 1\n",
b"CcCcCcC cCcCcC 2\n",
b"DdDdD dDDDdDdDd 3\n",
b"eEeEeEe eEeEeEe E 4\n",
b"FfFf FfFf FfFfFfFf 5\n",
b"gGgGg GgGgG gGgGgGg 6\n",
b"HhHhHhHhHhH hHhHhHhHh 7\n",
b"IiIiIiI IiIiIiI iIiIiI 8\n",
b"jJjJ jJjJjJ jJjJjJjJjjJ 9\n",
];
let buf_sz = 64;
let iter_cnt = 1000;
run_threaded_test(&TEST_STRINGS, buf_sz, iter_cnt, iter_cnt + 1);
cleanup();
}
#[test]
fn test_async_logger_large_msg() {
let _guard = prepare();
static TEST_STRINGS: [&[u8]; 10] = [
b"aAaAaA AaAa 0\n",
b"bBbBbB BbBbB 1\n",
b"CcCcCcC cCcCcC 2\n",
b"DdDdD dDDDdDdDd 3\n",
b"eEeEeEe eEeEeEe E 4 eEeEeEe eEeEeEe E 4 eEeEeEe eEeEeEe E 4 eEeEeEe eEeEeEe E 4\n",
b"FfFf FfFf FfFfFfFf 5\n",
b"gGgGg GgGgG gGgGgGg 6\n",
b"HhHhHhHhHhH hHhHhHhHh 7\n",
b"IiIiIiI IiIiIiI iIiIiI 8\n",
b"jJjJ jJjJjJ jJjJjJjJjjJ 9 jJjJ jJjJjJ jJjJjJjJjjJ 9 jJjJ jJjJjJ jJjJjJjJjjJ 9\n",
];
let buf_sz = 64;
let iter_cnt = 1000;
run_threaded_test(&TEST_STRINGS, buf_sz, iter_cnt, iter_cnt + 1);
cleanup();
}
#[test]
fn test_flush() {
let _guard = prepare();
static TEST_STRINGS: [&[u8]; 10] = [
b"aAaAaA AaAa 0\n",
b"bBbBbB BbBbB 1\n",
b"CcCcCcC cCcCcC 2\n",
b"DdDdD dDDDdDdDd 3\n",
b"eEeEeEe eEeEeEe E 4\n",
b"FfFf FfFf FfFfFfFf 5\n",
b"gGgGg GgGgG gGgGgGg 6\n",
b"HhHhHhHhHhH hHhHhHhHh 7\n",
b"IiIiIiI IiIiIiI iIiIiI 8\n",
b"jJjJ jJjJjJ jJjJjJjJjjJ 9\n",
];
let buf_sz = 64;
let iter_cnt = 1000;
run_threaded_test(&TEST_STRINGS, buf_sz, iter_cnt, iter_cnt / 20);
cleanup();
}
// Stub writer that only counts invocations; the shared atomics let the test
// inspect the counts after the logger has been terminated and dropped.
struct WriterTest {
    // Number of Writer::flush calls observed.
    flush_cnt: Arc<AtomicU64>,
    // Number of Writer::process_slice calls observed.
    slice_cnt: Arc<AtomicU64>,
}
impl<T: Send + Clone + 'static> Writer<T> for WriterTest {
    // Counts delivered slices; the payload itself is irrelevant here.
    fn process_slice(&mut self, _slice: &[T]) {
        self.slice_cnt.fetch_add(1, Ordering::Relaxed);
    }
    // Counts flush requests that actually reach the writer.
    fn flush(&mut self) {
        self.flush_cnt.fetch_add(1, Ordering::Relaxed);
    }
}
// Generic flush-accounting scenario: four small writes with two explicit
// flushes, checked against the counting stub writer.
// NOTE(review): exactly 1 Writer::flush and 2..=4 process_slice calls are
// expected — how flushes and slices coalesce depends on the writer-thread
// internals (see the `writer` module); confirm before changing the bounds.
fn test_flush2<T: Send + Clone + Copy + 'static>(write_line: &[T]) {
    let buf_sz = 1024;
    let flush_cnt = Arc::new(AtomicU64::new(0));
    let slice_cnt = Arc::new(AtomicU64::new(0));
    let writer = WriterTest {
        flush_cnt: flush_cnt.clone(),
        slice_cnt: slice_cnt.clone(),
    };
    let writer_obj: Box<dyn Writer<T>> = Box::new(writer);
    let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));
    logger.write_slice(write_line).unwrap();
    logger.write_slice(write_line).unwrap();
    logger.flush();
    logger.write_slice(write_line).unwrap();
    logger.write_slice(write_line).unwrap();
    logger.flush();
    match Arc::try_unwrap(logger) {
        Ok(logger) => logger.terminate(),
        Err(_) => panic!("Failed to terminate logger because it is still used"),
    };
    assert_eq!(1, flush_cnt.load(Ordering::Relaxed), "Flush count doesnt match");
    let slice_cnt = slice_cnt.load(Ordering::Relaxed);
    assert!(2 <= slice_cnt && 4 >= slice_cnt, "Slice count has unexpected value {}", slice_cnt);
}
// Drives the generic flush scenario with a byte payload.
#[test]
fn test_flush2_u8() {
    test_flush2(&b"abc"[..]);
}
// Writer that tallies records by their leading digit; used by the heavy
// concurrency test to validate record integrity without touching disk.
struct StubWriter {
    // counters[i] = number of records seen whose first byte was b'1' + i.
    counters: [u64; 4],
    // lengths[i] = byte length of test string i, used to hop record-to-record.
    lengths: [usize; 4],
}
impl Writer<u8> for StubWriter {
    // Walks the slice record-by-record: each record starts with an ASCII
    // digit '1'..'4' (bytes 49..52), so `slice[p] - 49` maps the leading
    // byte to a counter index 0..3, and `lengths[l]` advances to the next
    // record. Any other leading byte indicates a corrupted record boundary.
    fn process_slice(&mut self, slice: &[u8]) {
        let mut p = 0;
        while p<slice.len() {
            let l = (slice[p] - 49) as usize;
            if l > 3 {
                // Diagnostic dump before the out-of-bounds index panics below.
                println!("l = {}, p = {}, slice = {}", l, p, String::from_utf8_lossy(slice));
            }
            self.counters[l] += 1;
            p += self.lengths[l];
        }
    }
    // Diagnostics only: dump the per-record counters.
    fn flush(&mut self) {
        for i in 0..self.counters.len() {
            println!("counter {}: {}", i, self.counters[i]);
        }
    }
}
// Long-running stress test (ignored by default): 25 rounds of 4 producer
// threads each writing 10M records into a large buffer, counted by
// StubWriter. Record integrity is checked implicitly — a torn record makes
// StubWriter index out of bounds and panic.
#[ignore]
#[test]
fn heavy_concurrency_test() {
    // Each record's first byte is its identifying digit '1'..'4'.
    let test_strings: &[&[u8]] = &[
        b"1[INFO module_x]: testing message, thread #",
        b"2[INFO module_y]: testing message for thread #",
        b"3[INFO module_z]: another one message for thread #",
        b"4[INFO module_o]: a long long long long long long long long long long long long message for therad #",
    ];
    let lengths = [
        test_strings[0].len(),
        test_strings[1].len(),
        test_strings[2].len(),
        test_strings[3].len(),
    ];
    let buf_sz = 8192 * 8;
    let iter_cnt = 10000000;
    let writer_obj: Box<dyn Writer<u8>> = Box::new(StubWriter {counters: [0u64;4], lengths});
    let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));
    for i in 1..25+1 {
        spawn_threads(&logger, &test_strings, iter_cnt, iter_cnt/100);
        println!("{:?}", logger.get_metrics());
        println!("{}", i);
    }
    match Arc::try_unwrap(logger) {
        Ok(logger) => logger.terminate(),
        Err(_) => panic!("Failed to terminate logger because it is still used"),
    };
}
// Drives the generic flush scenario with a u64 payload. A stack-local array
// suffices: `test_flush2` only borrows the slice for the call's duration.
#[test]
fn test_flush2_u64() {
    let write_line: [u64; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    test_flush2(&write_line);
}
// Writer that counts complete records of arbitrary item type `T`, keyed by
// each record's first element. Shared via Arc so the test can read counters
// after the logger is gone.
struct IntWriter<T> {
    // Per-key record counters, keyed by the record's first element.
    pub counters: Arc<HashMap<T, AtomicU64>>,
    // Record length per key, used to hop from one record to the next.
    pub lengths: HashMap<T, usize>,
}
impl<T: Clone + Sync + Send + Copy + 'static + Eq + std::hash::Hash> Writer<T> for IntWriter<T> {
    // Walks the slice record-by-record: the element at `p` is the record's
    // key; bump its counter and advance by that record's registered length.
    // An unknown key (torn record) makes the `unwrap` panic.
    fn process_slice(&mut self, slice: &[T]) {
        let mut p = 0;
        while p<slice.len() {
            let l = slice[p];
            (*self.counters).get(&l).unwrap().fetch_add(1, Ordering::Relaxed);
            p += self.lengths.get(&l).unwrap();
        }
    }
    // Nothing buffered internally; flush is a no-op.
    fn flush(&mut self) { }
}
fn test_async_logger_param<T: Sync + Clone + Copy + Send + 'static + Eq + std::hash::Hash>(test_strings: &[&'static [T]]) {
let mut lengths = HashMap::new();
for i in 0..4 {
lengths.insert(test_strings[i][0], test_strings[i].len());
}
let buf_sz = 1024;
let iter_cnt = 10000;
let mut counters = HashMap::new();
for i in 0..4 {
counters.insert(test_strings[i][0], AtomicU64::new(0));
}
let counters = Arc::new(counters);
let writer_obj: Box<dyn Writer<T>> = Box::new(IntWriter {counters: counters.clone(), lengths});
let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));
for _ in 1..10 {
spawn_threads(&logger, test_strings, iter_cnt, iter_cnt/100);
}
match Arc::try_unwrap(logger) {
Ok(logger) => logger.terminate(),
Err(_) => panic!("Failed to terminate logger because it is still used"),
};
}
// u32 payload: each record's first element (1..=4) is its key; the remaining
// elements pad the record to the length registered in `lengths`.
#[test]
fn test_async_logger_u32() {
    static TEST_STRINGS: &[&[u32]] = &[
        &[1, 502, 504, 5, 6, 101, 102, 103, 65536, 1000000000],
        &[2, 7, 8, 9, 10, 11, 12, 13, 14, std::u32::MAX-2, 60, 61, 62, 63, 64, 65],
        &[3, std::u32::MAX-3, 16, 17, 18, std::u32::MAX-3, 20],
        &[4, 21, 22, 23, 24, 25, std::u32::MAX-4],
    ];
    test_async_logger_param(TEST_STRINGS);
}
// u64 payload: same scheme as the u32 test, with values exceeding u32 range.
#[test]
fn test_async_logger_u64() {
    static TEST_STRINGS: &[&[u64]] = &[
        &[1, 502, 504, 5, 6, 101, 102, 103, 65536, 5000000000],
        &[2, 7, 8, 9, 10, 11, 12, 13, 14, std::u64::MAX-2, 60, 61, 62, 63, 64, 65],
        &[3, std::u64::MAX-3, 16, 17, 18, std::u64::MAX-3, 20],
        &[4, 21, 22, 23, 24, 25, std::u64::MAX-4],
    ];
    test_async_logger_param(TEST_STRINGS);
}
// &'static str payload: the first element of each record ("1".."4") is the
// key; records deliberately vary in length (including an empty string).
#[test]
fn test_async_logger_str() {
    static TEST_STRINGS: &[&[&str]] = &[
        &["1", "test"],
        &["2",],
        &["3", "test 3", "test test 3", "test 3 tst", ""],
        &["4", "verdurenoj", "propergertulopus"],
    ];
    test_async_logger_param(TEST_STRINGS);
}
// Writer counting occurrences of boxed strings; one counter per expected
// string, shared with the test via Arc.
struct StringWriter {
    pub counters: Arc<HashMap<String, AtomicU64>>,
}
// NOTE(review): `Box<String>` is doubly indirect (a String is already
// heap-allocated); kept because the test specifically exercises the logger
// with a non-Copy, heap-owning payload type.
impl Writer<Box<String>> for StringWriter {
    // Each element is one record; bump its counter or panic on an
    // unrecognized (i.e. corrupted or lost) value.
    fn process_slice(&mut self, slice: &[Box<String>]) {
        let mut p = 0;
        while p<slice.len() {
            let l: &String = &(slice[p]);
            match (*self.counters).get(l) {
                Some(c) => { c.fetch_add(1, Ordering::Relaxed); },
                None => panic!("wrong val {}, {}, {:?}", l, p, slice)
            };
            p += 1;
        }
    }
    // Nothing buffered internally; flush is a no-op.
    fn flush(&mut self) { }
}
// Logs every string of `s`, each as an individually boxed owned value.
fn write_complete_slice_boxed(logger_c: &Arc<AsyncLoggerNB<Box<String>>>, s: &[&str]) {
    for item in s.iter() {
        logger_c.write_value(Box::new((*item).to_owned())).unwrap();
    }
}
// Spawns one producer thread per test-string group; each thread writes its
// whole group (value-by-value, boxed) `cnt` times, flushing every
// `flush_cnt` iterations. All threads are joined before returning.
fn spawn_threads_string(logger: &Arc<AsyncLoggerNB<Box<String>>>, test_strings: &'static [&'static [&str]], cnt: usize, flush_cnt: usize) {
    let mut handles = vec![];
    for i in 0..test_strings.len() {
        let s = test_strings[i];
        let logger_c = logger.clone();
        let handle = thread::spawn(move || {
            for l in 1..cnt+1 {
                write_complete_slice_boxed(&logger_c, s);
                if l % flush_cnt == 0 {
                    logger_c.flush();
                }
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().expect("Failed on thread join");
    }
}
// Driver for the boxed-string payload test: 10 rounds (`1..10+1`) of one
// thread per string group, then each string's counter must equal
// iter_cnt * 10 — proving no boxed value was lost or duplicated.
fn test_async_logger_boxed(test_strings: &'static [&'static [&str]]) {
    let buf_sz = 1024;
    let iter_cnt = 10000;
    let mut counters = HashMap::new();
    for i in 0..test_strings.len() {
        for j in 0..test_strings[i].len() {
            counters.insert(String::from(test_strings[i][j]), AtomicU64::new(0));
        }
    }
    let counters = Arc::new(counters);
    let writer_obj: Box<dyn Writer<Box<String>>> = Box::new(StringWriter {counters: counters.clone()});
    let logger = Arc::new(AsyncLoggerNB::new(writer_obj, buf_sz).expect("Failed to create new async logger"));
    for _ in 1..10+1 {
        spawn_threads_string(&logger, test_strings, iter_cnt, iter_cnt/100);
    }
    match Arc::try_unwrap(logger) {
        Ok(logger) => logger.terminate(),
        Err(_) => panic!("Failed to terminate logger because it is still used"),
    };
    for (k,v) in counters.iter() {
        assert_eq!(iter_cnt*10, v.load(Ordering::Relaxed) as usize, "Counter for value {} doesn't match", k);
    }
}
// Boxed-string payload test: groups of distinct strings (including an empty
// one) written via `write_value` and counted by StringWriter.
#[test]
fn test_async_logger_box() {
    static TEST_STRINGS: &[&[&str]] = &[
        &["line 1", "test"],
        &["line 2",],
        &["line 3", "test 3", "test test 3", "test 3 tst", ""],
        &["line 4", "verdurenoj", "propergertulopus"],
    ];
    test_async_logger_boxed(TEST_STRINGS);
}
}