gts_logger/logbackend/
dualthread.rs1use crate::error::GtsLoggerError;
2use crate::logbackend::LogBackend;
3use crate::logclient::LogEventTs;
4use gts_transport::error::GtsTransportError;
5use gts_transport::membackend::memchunk::MemChunkHolder;
6use gts_transport::sync::lfringspsc::{spsc_ring_pair, SpScRingData, SpScRingSender};
7use minstant::Instant;
8use serde::Serialize;
9use std::cell::UnsafeCell;
10use std::fmt::Debug;
11use std::io::Write;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::mpsc::channel;
14use std::sync::Arc;
15use std::time::Duration;
16
17pub struct DualThreadLogBacked<const RSIZE: usize, T>
18where
19 T: Copy + Send,
20{
21 run_flag: Arc<AtomicBool>,
23 join_handle_alpha: Option<std::thread::JoinHandle<()>>,
24 join_handle_beta: Option<std::thread::JoinHandle<()>>,
25 log_tx: UnsafeCell<SpScRingSender<RSIZE, T, MemChunkHolder<SpScRingData<RSIZE, T>>>>,
26}
27
28impl<T, const RSIZE: usize> DualThreadLogBacked<RSIZE, LogEventTs<T>>
29where
30 T: Copy + Send + 'static + Debug + Serialize,
31{
32 pub fn new(dest: impl Write + Send + 'static) -> Self {
33 let running_flag_alpha = Arc::new(AtomicBool::new(true));
34 let running_flag_beta = Arc::new(AtomicBool::new(true));
35 let running_flag_alpha_clone = running_flag_alpha.clone();
38 let running_flag_beta_clone = running_flag_beta.clone();
39 let (log_tx, mut log_rx) =
41 spsc_ring_pair::<RSIZE, LogEventTs<T>, _>(MemChunkHolder::zeroed());
42
43 let (queue_tx, queue_rx) = channel();
44
45 let join_handle_alpha = Some(std::thread::spawn(move || {
48 while running_flag_alpha_clone.load(Ordering::Relaxed) {
50 let mut counter = 0;
51 loop {
52 match log_rx.try_recv() {
54 Ok(res) => {
55 queue_tx.send(*res).unwrap();
57 counter += 1;
58 }
59 Err(GtsTransportError::WouldBlock) => {
60 break;
61 }
62 _ => unreachable!(),
63 }
64 }
65 if counter > 0 {
66 println!("READ {} items", counter);
67 }
68 std::thread::sleep(Duration::from_millis(10));
69 }
70 running_flag_beta.store(false, Ordering::Relaxed);
71 println!("logthread-alpha closed");
72 }));
73
74 let join_handle_beta = Some(std::thread::spawn(move || {
75 let mut last_send = minstant::Instant::now();
76
77 let mut dest = dest;
78 let mut logs = Vec::with_capacity(3000);
88 while running_flag_beta_clone.load(Ordering::Relaxed) {
89 loop {
90 match queue_rx.try_recv() {
91 Ok(res) => {
92 logs.push(res);
93 }
94 Err(_) => {
95 break;
97 }
98 }
99 }
100 if !logs.is_empty()
101 && (logs.len() >= 5000 || last_send.elapsed() > Duration::from_millis(5000))
102 {
103 for log in &logs {
104 dest.write(&serde_json::to_vec(log).unwrap()).unwrap();
105 }
106 last_send = Instant::now();
107 }
108 std::thread::sleep(Duration::from_millis(500));
109 }
110 println!("logthread-beta closed");
111 }));
112
113 DualThreadLogBacked {
114 run_flag: running_flag_alpha,
115 join_handle_alpha,
116 join_handle_beta,
117 log_tx: log_tx.into(),
118 }
119 }
120}
121
122impl<T, const RSIZE: usize> Drop for DualThreadLogBacked<RSIZE, T>
123where
124 T: Copy + Send,
125{
126 fn drop(&mut self) {
127 self.run_flag.store(false, Ordering::Relaxed);
128 self.join_handle_alpha.take().unwrap().join().unwrap();
129 self.join_handle_beta.take().unwrap().join().unwrap();
130 }
131}
132
133impl<T, const RSIZE: usize> LogBackend<T> for DualThreadLogBacked<RSIZE, T>
134where
135 T: Copy + Send,
136{
137 fn log(&self, event: T) -> Result<(), GtsLoggerError> {
138 let log_tx = unsafe { &mut *self.log_tx.get() };
143 log_tx.send(&event)?;
144 Ok(())
145 }
146}