1use std::cell::UnsafeCell;
2use std::fs::OpenOptions;
3use std::io::{stdout, Write};
4use std::mem::transmute;
5use std::collections::LinkedList;
6use std::path::Path;
7use std::sync::{
8 atomic::{AtomicU8, AtomicUsize, Ordering},
9 Arc,
10};
11use std::time::Duration;
12use thread_local::ThreadLocal;
13
/// Values of the state word kept in `LocalBuffer::locked`.
///
/// The enum is cast to `u8` and compared against the raw contents of an
/// `AtomicU8`, so the discriminants are written out explicitly.
#[repr(u8)]
#[derive(Debug, PartialEq)]
enum BufferState {
    /// Nobody holds the buffer.
    Unlock = 0,
    /// Claimed by `LocalBuffer::write` while a record is appended.
    Lock = 1,
    /// Claimed by `LocalBuffer::collect` while the records are copied out.
    Dump = 2,
}
21
/// Byte-bounded FIFO of log records.
///
/// Only the record text counts toward the budget; timestamps and container
/// overhead are not included in `cur_size`.
struct LocalBufferMut {
    /// Retained `(timestamp, text)` records, oldest at the front.
    ///
    /// Stored in a `VecDeque` rather than a linked list: pushing at the back and
    /// popping at the front are both O(1), without a heap allocation and a free
    /// per record.
    logs: std::collections::VecDeque<(i64, String)>,
    /// Sum of `text.len()` over every record currently in `logs`.
    cur_size: usize,
    /// Byte budget that `cur_size` is trimmed back towards after each push.
    size_limit: usize,
}

impl LocalBufferMut {
    /// Creates an empty buffer with a text budget of `buf_size` bytes.
    #[inline(always)]
    fn new(buf_size: usize) -> Self {
        Self {
            logs: std::collections::VecDeque::new(),
            cur_size: 0,
            size_limit: buf_size,
        }
    }

    /// Appends a record, then drops the oldest records until the retained text
    /// fits in `size_limit` again.
    ///
    /// The newest record is never evicted: trimming stops once a single record
    /// is left, so one oversized record may exceed the budget on its own.
    #[inline(always)]
    fn push(&mut self, ts: i64, content: String) {
        self.cur_size += content.len();
        self.logs.push_back((ts, content));
        while self.cur_size > self.size_limit && self.logs.len() > 1 {
            match self.logs.pop_front() {
                Some((_, evicted)) => self.cur_size -= evicted.len(),
                // The loop condition guarantees at least two records.
                None => unreachable!(),
            }
        }
    }
}
52
53struct LocalBuffer {
54 inner: UnsafeCell<LocalBufferMut>,
55 locked: AtomicU8,
56}
57
58unsafe impl Send for LocalBuffer {}
59unsafe impl Sync for LocalBuffer {}
60
61impl LocalBuffer {
62 #[inline(always)]
63 fn new(buf_size: usize) -> Arc<Self> {
64 Arc::new(Self {
65 inner: UnsafeCell::new(LocalBufferMut::new(buf_size)),
66 locked: AtomicU8::new(BufferState::Unlock as u8),
67 })
68 }
69
70 #[inline(always)]
71 fn write(&self, ts: i64, buf: String) {
72 loop {
73 match self.try_lock(BufferState::Unlock, BufferState::Lock) {
74 Ok(_) => {
75 let inner = self.get_inner_mut();
76 inner.push(ts, buf);
77 self.locked.store(BufferState::Unlock as u8, Ordering::Release);
78 return;
79 }
80 Err(s) => {
81 if s == BufferState::Dump as u8 {
82 std::thread::sleep(Duration::from_millis(100));
83 } else {
84 unreachable!();
85 }
86 }
87 }
88 }
89 }
90
91 #[inline]
92 fn collect(&self, all: &mut Vec<(i64, String)>) {
93 loop {
94 match self.try_lock(BufferState::Unlock, BufferState::Dump) {
95 Ok(_) => {
96 {
97 let inner = self.get_inner();
98 for (ts, line) in inner.logs.iter() {
99 all.push((*ts, line.clone()));
100 }
101 }
102 self.locked.store(BufferState::Unlock as u8, Ordering::Release);
103 return;
104 }
105 Err(s) => {
106 if s == BufferState::Lock as u8 {
107 std::hint::spin_loop();
108 } else {
109 return;
110 }
111 }
112 }
113 }
114 }
115
116 #[inline(always)]
117 fn try_lock(&self, state: BufferState, target: BufferState) -> Result<(), u8> {
118 match self.locked.compare_exchange(
119 state as u8,
120 target as u8,
121 Ordering::Acquire,
122 Ordering::Relaxed,
123 ) {
124 Ok(_) => Ok(()),
125 Err(s) => Err(s),
126 }
127 }
128
129 #[inline(always)]
130 fn get_inner(&self) -> &LocalBufferMut {
131 unsafe { transmute(self.inner.get()) }
132 }
133
134 #[inline(always)]
135 fn get_inner_mut(&self) -> &mut LocalBufferMut {
136 unsafe { transmute(self.inner.get()) }
137 }
138}
139
140pub struct RingFile {
144 file_path: Option<Box<Path>>,
145 buf_size: usize,
146 buffers: ThreadLocal<Arc<LocalBuffer>>,
147 count: AtomicUsize,
148}
149
150impl RingFile {
151 pub fn new(buf_size: usize, file_path: Option<Box<Path>>) -> Self {
157 Self {
158 file_path,
159 buf_size,
160 count: AtomicUsize::new(0),
161 buffers: ThreadLocal::with_capacity(32),
162 }
163 }
164
165 pub fn dump(&self) -> std::io::Result<()> {
167 let mut all: Vec<(i64, String)>;
168 {
169 let mut est = self.count.load(Ordering::Relaxed) * self.buf_size / 100;
170 if est < 100 {
171 est = 100;
172 }
173 all = Vec::with_capacity(2 * est);
174 for buf in self.buffers.iter() {
175 buf.collect(&mut all);
176 }
177 }
178 all.sort_by(|a, b| a.0.cmp(&b.0));
179 macro_rules! dump_all {
180 ($f: expr) => {
181 for (_, line) in all {
182 if let Err(e) = $f.write_all(line.as_bytes()) {
183 println!("RingFile: dump error {:?}", e);
184 return Err(e);
185 }
186 }
187 $f.flush()?;
188 };
189 }
190 if let Some(path) = self.file_path.as_ref() {
191 match OpenOptions::new().write(true).create(true).truncate(true).open(path) {
192 Ok(mut f) => {
193 dump_all!(f);
194 }
195 Err(e) => {
196 return Err(e);
197 }
198 }
199 } else {
200 let mut f = stdout().lock();
201 dump_all!(f);
202 }
203 Ok(())
204 }
205
206 #[inline(always)]
207 pub fn write(&self, ts: i64, content: String) {
208 let buf = self.buffers.get_or(|| {
209 let _ = self.count.fetch_add(1, Ordering::Relaxed);
210 LocalBuffer::new(self.buf_size)
211 });
212 buf.write(ts, content);
213 }
214}