ring_file/
threads.rs

1use std::cell::UnsafeCell;
2use std::fs::OpenOptions;
3use std::io::{stdout, Write};
4use std::mem::transmute;
5use std::collections::LinkedList;
6use std::path::Path;
7use std::sync::{
8    atomic::{AtomicU8, AtomicUsize, Ordering},
9    Arc,
10};
11use std::time::Duration;
12use thread_local::ThreadLocal;
13
/// Lock state of a per-thread buffer, stored in an `AtomicU8` as its `u8` discriminant.
#[derive(Debug, PartialEq)]
#[repr(u8)]
enum BufferState {
    /// Nobody is touching the buffer.
    Unlock = 0,
    /// The buffer is held for appending a log entry.
    Lock = 1,
    /// The buffer is held for collecting its entries into a dump.
    Dump = 2,
}
21
/// Size-bounded FIFO of `(timestamp, text)` log entries.
struct LocalBufferMut {
    /// Entries in insertion order; the front is the oldest.
    logs: LinkedList<(i64, String)>,
    /// Sum of the byte lengths of all strings currently in `logs`.
    cur_size: usize,
    /// Byte budget; older entries are evicted once `cur_size` exceeds it.
    size_limit: usize,
}

impl LocalBufferMut {
    /// Creates an empty buffer with a byte budget of `buf_size`.
    #[inline(always)]
    fn new(buf_size: usize) -> Self {
        let logs = LinkedList::new();
        Self {
            logs,
            cur_size: 0,
            size_limit: buf_size,
        }
    }

    /// Appends an entry, then drops the oldest entries until the buffer fits its budget again.
    ///
    /// The newest entry is never evicted: at least one entry always remains, even when it
    /// alone is larger than `size_limit`.
    #[inline(always)]
    fn push(&mut self, ts: i64, content: String) {
        self.cur_size += content.len();
        self.logs.push_back((ts, content));
        loop {
            let within_budget = self.cur_size <= self.size_limit;
            if within_budget || self.logs.len() <= 1 {
                break;
            }
            match self.logs.pop_front() {
                Some((_, evicted)) => self.cur_size -= evicted.len(),
                // The list holds more than one entry here, so the pop cannot fail.
                None => unreachable!(),
            }
        }
    }
}
52
53struct LocalBuffer {
54    inner: UnsafeCell<LocalBufferMut>,
55    locked: AtomicU8,
56}
57
58unsafe impl Send for LocalBuffer {}
59unsafe impl Sync for LocalBuffer {}
60
61impl LocalBuffer {
62    #[inline(always)]
63    fn new(buf_size: usize) -> Arc<Self> {
64        Arc::new(Self {
65            inner: UnsafeCell::new(LocalBufferMut::new(buf_size)),
66            locked: AtomicU8::new(BufferState::Unlock as u8),
67        })
68    }
69
70    #[inline(always)]
71    fn write(&self, ts: i64, buf: String) {
72        loop {
73            match self.try_lock(BufferState::Unlock, BufferState::Lock) {
74                Ok(_) => {
75                    let inner = self.get_inner_mut();
76                    inner.push(ts, buf);
77                    self.locked.store(BufferState::Unlock as u8, Ordering::Release);
78                    return;
79                }
80                Err(s) => {
81                    if s == BufferState::Dump as u8 {
82                        std::thread::sleep(Duration::from_millis(100));
83                    } else {
84                        unreachable!();
85                    }
86                }
87            }
88        }
89    }
90
91    #[inline]
92    fn collect(&self, all: &mut Vec<(i64, String)>) {
93        loop {
94            match self.try_lock(BufferState::Unlock, BufferState::Dump) {
95                Ok(_) => {
96                    {
97                        let inner = self.get_inner();
98                        for (ts, line) in inner.logs.iter() {
99                            all.push((*ts, line.clone()));
100                        }
101                    }
102                    self.locked.store(BufferState::Unlock as u8, Ordering::Release);
103                    return;
104                }
105                Err(s) => {
106                    if s == BufferState::Lock as u8 {
107                        std::hint::spin_loop();
108                    } else {
109                        return;
110                    }
111                }
112            }
113        }
114    }
115
116    #[inline(always)]
117    fn try_lock(&self, state: BufferState, target: BufferState) -> Result<(), u8> {
118        match self.locked.compare_exchange(
119            state as u8,
120            target as u8,
121            Ordering::Acquire,
122            Ordering::Relaxed,
123        ) {
124            Ok(_) => Ok(()),
125            Err(s) => Err(s),
126        }
127    }
128
129    #[inline(always)]
130    fn get_inner(&self) -> &LocalBufferMut {
131        unsafe { transmute(self.inner.get()) }
132    }
133
134    #[inline(always)]
135    fn get_inner_mut(&self) -> &mut LocalBufferMut {
136        unsafe { transmute(self.inner.get()) }
137    }
138}
139
140/// RingFile keeps [RingBuffer] within thread local, to prevent lock contention affecting program
141/// execution.
142/// When program hang or panic, you can call dump() to collect the logs into file or stdout.
143pub struct RingFile {
144    file_path: Option<Box<Path>>,
145    buf_size: usize,
146    buffers: ThreadLocal<Arc<LocalBuffer>>,
147    count: AtomicUsize,
148}
149
150impl RingFile {
151    /// # Arguments:
152    ///
153    /// - buf_size: buffer size per thread
154    ///
155    /// - file_path: If contains a path, the target is a file, otherwise will write to stdout.
156    pub fn new(buf_size: usize, file_path: Option<Box<Path>>) -> Self {
157        Self {
158            file_path,
159            buf_size,
160            count: AtomicUsize::new(0),
161            buffers: ThreadLocal::with_capacity(32),
162        }
163    }
164
165    /// collect all the buffers, sort by timestamp and dump to disk or stdout.
166    pub fn dump(&self) -> std::io::Result<()> {
167        let mut all: Vec<(i64, String)>;
168        {
169            let mut est = self.count.load(Ordering::Relaxed) * self.buf_size / 100;
170            if est < 100 {
171                est = 100;
172            }
173            all = Vec::with_capacity(2 * est);
174            for buf in self.buffers.iter() {
175                buf.collect(&mut all);
176            }
177        }
178        all.sort_by(|a, b| a.0.cmp(&b.0));
179        macro_rules! dump_all {
180            ($f: expr) => {
181                for (_, line) in all {
182                    if let Err(e) = $f.write_all(line.as_bytes()) {
183                        println!("RingFile: dump error {:?}", e);
184                        return Err(e);
185                    }
186                }
187                $f.flush()?;
188            };
189        }
190        if let Some(path) = self.file_path.as_ref() {
191            match OpenOptions::new().write(true).create(true).truncate(true).open(path) {
192                Ok(mut f) => {
193                    dump_all!(f);
194                }
195                Err(e) => {
196                    return Err(e);
197                }
198            }
199        } else {
200            let mut f = stdout().lock();
201            dump_all!(f);
202        }
203        Ok(())
204    }
205
206    #[inline(always)]
207    pub fn write(&self, ts: i64, content: String) {
208        let buf = self.buffers.get_or(|| {
209            let _ = self.count.fetch_add(1, Ordering::Relaxed);
210            LocalBuffer::new(self.buf_size)
211        });
212        buf.write(ts, content);
213    }
214}