gts_logger/logbackend/
dualthread.rs

1use crate::error::GtsLoggerError;
2use crate::logbackend::LogBackend;
3use crate::logclient::LogEventTs;
4use gts_transport::error::GtsTransportError;
5use gts_transport::membackend::memchunk::MemChunkHolder;
6use gts_transport::sync::lfringspsc::{spsc_ring_pair, SpScRingData, SpScRingSender};
7use minstant::Instant;
8use serde::Serialize;
9use std::cell::UnsafeCell;
10use std::fmt::Debug;
11use std::io::Write;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::mpsc::channel;
14use std::sync::Arc;
15use std::time::Duration;
16
17pub struct DualThreadLogBacked<const RSIZE: usize, T>
18where
19    T: Copy + Send,
20{
21    // queue_rx: Receiver<T>,
22    run_flag: Arc<AtomicBool>,
23    join_handle_alpha: Option<std::thread::JoinHandle<()>>,
24    join_handle_beta: Option<std::thread::JoinHandle<()>>,
25    log_tx: UnsafeCell<SpScRingSender<RSIZE, T, MemChunkHolder<SpScRingData<RSIZE, T>>>>,
26}
27
28impl<T, const RSIZE: usize> DualThreadLogBacked<RSIZE, LogEventTs<T>>
29where
30    T: Copy + Send + 'static + Debug + Serialize,
31{
32    pub fn new(dest: impl Write + Send + 'static) -> Self {
33        let running_flag_alpha = Arc::new(AtomicBool::new(true));
34        let running_flag_beta = Arc::new(AtomicBool::new(true));
35        // let queue = Arc::new(Mutex::new(VecDeque::<T>::new()));
36
37        let running_flag_alpha_clone = running_flag_alpha.clone();
38        let running_flag_beta_clone = running_flag_beta.clone();
39        // let queue_clone = queue.clone();
40        let (log_tx, mut log_rx) =
41            spsc_ring_pair::<RSIZE, LogEventTs<T>, _>(MemChunkHolder::zeroed());
42
43        let (queue_tx, queue_rx) = channel();
44
45        // let fname = fname.map(|fname| fname.to_string());
46
47        let join_handle_alpha = Some(std::thread::spawn(move || {
48            //let mut logs = Vec::with_capacity(3000);
49            while running_flag_alpha_clone.load(Ordering::Relaxed) {
50                let mut counter = 0;
51                loop {
52                    //while logs.len() < logs.capacity() {
53                    match log_rx.try_recv() {
54                        Ok(res) => {
55                            //queue_tx.send(*res).unwrap();
56                            queue_tx.send(*res).unwrap();
57                            counter += 1;
58                        }
59                        Err(GtsTransportError::WouldBlock) => {
60                            break;
61                        }
62                        _ => unreachable!(),
63                    }
64                }
65                if counter > 0 {
66                    println!("READ {} items", counter);
67                }
68                std::thread::sleep(Duration::from_millis(10));
69            }
70            running_flag_beta.store(false, Ordering::Relaxed);
71            println!("logthread-alpha closed");
72        }));
73
74        let join_handle_beta = Some(std::thread::spawn(move || {
75            let mut last_send = minstant::Instant::now();
76
77            let mut dest = dest;
78            // enum Fp {
79            //     File(File),
80            //     Sink(Sink),
81            // };
82            // let fp = match fname {
83            //     Some(name) => Fp::File(File::open(name).unwrap()),
84            //     None => Fp::Sink(std::io::sink()),
85            // };
86
87            let mut logs = Vec::with_capacity(3000);
88            while running_flag_beta_clone.load(Ordering::Relaxed) {
89                loop {
90                    match queue_rx.try_recv() {
91                        Ok(res) => {
92                            logs.push(res);
93                        }
94                        Err(_) => {
95                            // either empty or closed, need to break
96                            break;
97                        }
98                    }
99                }
100                if !logs.is_empty()
101                    && (logs.len() >= 5000 || last_send.elapsed() > Duration::from_millis(5000))
102                {
103                    for log in &logs {
104                        dest.write(&serde_json::to_vec(log).unwrap()).unwrap();
105                    }
106                    last_send = Instant::now();
107                }
108                std::thread::sleep(Duration::from_millis(500));
109            }
110            println!("logthread-beta closed");
111        }));
112
113        DualThreadLogBacked {
114            run_flag: running_flag_alpha,
115            join_handle_alpha,
116            join_handle_beta,
117            log_tx: log_tx.into(),
118        }
119    }
120}
121
122impl<T, const RSIZE: usize> Drop for DualThreadLogBacked<RSIZE, T>
123where
124    T: Copy + Send,
125{
126    fn drop(&mut self) {
127        self.run_flag.store(false, Ordering::Relaxed);
128        self.join_handle_alpha.take().unwrap().join().unwrap();
129        self.join_handle_beta.take().unwrap().join().unwrap();
130    }
131}
132
133impl<T, const RSIZE: usize> LogBackend<T> for DualThreadLogBacked<RSIZE, T>
134where
135    T: Copy + Send,
136{
137    fn log(&self, event: T) -> Result<(), GtsLoggerError> {
138        // SAFETY: Self is !Sync, only this function uses log_tx,
139        // no reentrancy in this function.
140        // but need verify reentrancy (by signal e.g.)
141        // anyway refcell doesn't check signal-reentrancy either.
142        let log_tx = unsafe { &mut *self.log_tx.get() };
143        log_tx.send(&event)?;
144        Ok(())
145    }
146}