#[cfg(debug_assertions)]
use std::sync::atomic::AtomicBool;
use std::{
    cell::UnsafeCell,
    io::{self, Write},
    sync::{
        OnceLock,
        atomic::{AtomicI64, AtomicPtr, AtomicU8, AtomicU32, AtomicU64, Ordering},
    },
};

use crate::{
    Level, StaticHeaderWriteFn, level_from_u8,
    output::{OutputBuffer, RotationConfig},
    poller::{self, PollerState, ReadyMessageHeap},
    spsc_queue::{PAYLOAD_ARGS_OFFSET, PAYLOAD_INFO_OFFSET, SpscVarQueue},
    thread_buffer::{THREAD_BUFFER_REGISTRY, ThreadBuffer},
    timestamp_counter::TimestampCounter,
};

/// Decodes a record's serialized arguments and appends the formatted
/// message text to the output buffer.
pub type DecodeFn = unsafe fn(*const u8, &mut Vec<u8>);

/// Called on a queue-full event; receives the running total of
/// queue-full events.
pub type QueueFullCallback = fn(u64);

/// Log level active at startup.
const DEFAULT_LEVEL: u8 = Level::DBG as u8;

/// Flush buffered output after three seconds by default.
const DEFAULT_FLUSH_DELAY_NS: i64 = 3_000_000_000;

/// Flush once this many bytes are buffered.
const DEFAULT_FLUSH_BUFFER_SIZE: u32 = 8 * 1024;

/// By default no record level forces an immediate flush.
const DEFAULT_FLUSH_LEVEL: u8 = Level::OFF as u8;

/// Drop records by default when a producer queue is full.
const DEFAULT_QUEUE_FULL_POLICY: u8 = QueueFullPolicy::Drop as u8;

/// Minimum level a record must meet to be accepted.
static CURRENT_LEVEL: AtomicU8 = AtomicU8::new(DEFAULT_LEVEL);

/// Maximum time buffered output may wait before being flushed.
static FLUSH_DELAY_NS: AtomicI64 = AtomicI64::new(DEFAULT_FLUSH_DELAY_NS);

/// Buffered-byte threshold that triggers a flush.
static FLUSH_BUFFER_SIZE: AtomicU32 = AtomicU32::new(DEFAULT_FLUSH_BUFFER_SIZE);

/// Records at or above this level force an immediate flush.
static FLUSH_LEVEL: AtomicU8 = AtomicU8::new(DEFAULT_FLUSH_LEVEL);

/// Producer behavior on a full queue (a [`QueueFullPolicy`] stored as `u8`).
static QUEUE_FULL_POLICY: AtomicU8 = AtomicU8::new(DEFAULT_QUEUE_FULL_POLICY);

/// Running count of queue-full events.
static QUEUE_FULL_COUNT: AtomicU64 = AtomicU64::new(0);

/// Optional [`QueueFullCallback`], stored type-erased as a raw pointer.
static QUEUE_FULL_CALLBACK: AtomicPtr<()> = AtomicPtr::new(std::ptr::null_mut());

/// What a producer does when its queue has no room for a new record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum QueueFullPolicy {
    /// Discard the record and count it as dropped.
    Drop = 0,
    /// Wait for the consumer to free space, then retry.
    Block = 1,
}

/// Action a producer takes after [`handle_queue_full`] runs.
pub(crate) enum QueueFullAction {
    /// Give up on the current record.
    Drop,
    /// Attempt the enqueue again.
    Retry,
}

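/// Per-call-site metadata referenced by every queued record.
///
/// A sketch of a hand-written call site logging one `u32`. Real call sites
/// are expected to be macro-generated; the decoder name, location string,
/// and `Level::DBG` choice below are illustrative only:
///
/// ```ignore
/// // Hypothetical decoder: reads one unaligned u32 from the argument
/// // payload and appends the rendered text to the output buffer.
/// unsafe fn decode_u32(args: *const u8, out: &mut Vec<u8>) {
///     let value = args.cast::<u32>().read_unaligned();
///     out.extend_from_slice(format!("value={value}").as_bytes());
/// }
///
/// static INFO: StaticInfo = StaticInfo {
///     level: Level::DBG,
///     location: "main.rs:10",
///     format_string: "value={}",
///     decode: decode_u32,
/// };
/// ```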
#[derive(Clone, Copy)]
pub struct StaticInfo {
    /// Severity of the call site.
    pub level: Level,
    /// Source location of the call site.
    pub location: &'static str,
    /// Original format string.
    pub format_string: &'static str,
    /// Decoder matching this call site's argument layout.
    pub decode: DecodeFn,
}

/// State owned by the consumer side: the calibrated clock, the output
/// buffer, and the pending flush deadline.
struct LoggerInner {
    timestamp_counter: TimestampCounter,
    output: OutputBuffer,
    next_flush_deadline_nanoseconds: Option<i64>,
}

/// The process-wide logger. `inner` is mutated without a lock, so callers
/// must guarantee single-threaded access (see [`GlobalLogger::inner_ptr`]).
struct GlobalLogger {
    inner: UnsafeCell<LoggerInner>,
    poller_state: PollerState,
    #[cfg(debug_assertions)]
    inner_in_use: AtomicBool,
}

// SAFETY: `inner` is only accessed by one thread at a time: configuration
// functions require the polling thread to be stopped, and debug builds
// assert exclusivity at runtime via `inner_in_use`.
unsafe impl Sync for GlobalLogger {}

impl GlobalLogger {
    fn new() -> Self {
        let mut timestamp_counter = TimestampCounter::new();
        timestamp_counter.init_default();
        Self {
            inner: UnsafeCell::new(LoggerInner {
                timestamp_counter,
                output: OutputBuffer::new(),
                next_flush_deadline_nanoseconds: None,
            }),
            poller_state: PollerState::default(),
            #[cfg(debug_assertions)]
            inner_in_use: AtomicBool::new(false),
        }
    }

    /// Returns a raw pointer to the inner state.
    ///
    /// # Safety
    ///
    /// The caller must be the only thread touching `LoggerInner` and must
    /// call [`Self::inner_release`] when done. Debug builds enforce this
    /// with the `inner_in_use` flag.
    #[inline]
    unsafe fn inner_ptr(&self) -> *mut LoggerInner {
        #[cfg(debug_assertions)]
        assert!(
            !self.inner_in_use.swap(true, Ordering::AcqRel),
            "ferrilog: concurrent access to LoggerInner detected"
        );
        self.inner.get()
    }

    /// Marks the inner state as no longer in use (debug builds only).
    #[inline]
    unsafe fn inner_release(&self) {
        #[cfg(debug_assertions)]
        self.inner_in_use.store(false, Ordering::Release);
    }

    fn ensure_poller_stopped(&self) -> io::Result<()> {
        if self.poller_state.is_running() {
            Err(io::Error::other("polling thread is running; call stop_polling_thread() first"))
        } else {
            Ok(())
        }
    }
}

/// Returns the lazily initialized global logger.
fn global_logger() -> &'static GlobalLogger {
    static LOGGER: OnceLock<GlobalLogger> = OnceLock::new();
    LOGGER.get_or_init(GlobalLogger::new)
}

/// Sets the minimum level a message must meet to be recorded.
pub fn set_level(level: Level) {
    CURRENT_LEVEL.store(level as u8, Ordering::Relaxed);
}

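/// Returns `true` if a message at `level` would currently be recorded.
///
/// A minimal gating sketch. The `ferrilog` crate path and the `WRN`/`ERR`
/// variant names are assumptions, not taken from this module:
///
/// ```ignore
/// ferrilog::set_level(Level::WRN);
/// if ferrilog::check_level(Level::DBG) {
///     // Not reached: DBG is below the current WRN threshold.
/// }
/// assert!(ferrilog::check_level(Level::ERR));
/// ```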
#[inline(always)]
pub fn check_level(level: Level) -> bool {
    level as u8 >= CURRENT_LEVEL.load(Ordering::Relaxed)
}

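/// Routes log output to `writer`. The polling thread must be stopped.
///
/// A minimal sketch sending output to stderr (the `ferrilog` crate path is
/// an assumption):
///
/// ```ignore
/// ferrilog::set_output_writer(std::io::stderr());
/// ```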
pub fn set_output_writer<W>(writer: W)
where
    W: Write + Send + 'static,
{
    let logger = global_logger();
    logger
        .ensure_poller_stopped()
        .expect("set_output_writer requires the polling thread to be stopped first");
    unsafe {
        let inner = &mut *logger.inner_ptr();
        inner.output.set_writer(Box::new(writer));
        logger.inner_release();
    }
}

/// Chooses what producers do when their queue is full.
pub fn set_queue_full_policy(policy: QueueFullPolicy) {
    QUEUE_FULL_POLICY.store(policy as u8, Ordering::Relaxed);
}

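/// Installs (or clears, with `None`) a callback invoked on every
/// queue-full event with the running event count.
///
/// A sketch pairing the callback with the blocking policy (the crate path
/// and the callback body are illustrative):
///
/// ```ignore
/// fn on_queue_full(count: u64) {
///     eprintln!("logger backpressure, event #{count}");
/// }
///
/// ferrilog::set_queue_full_policy(ferrilog::QueueFullPolicy::Block);
/// ferrilog::set_queue_full_callback(Some(on_queue_full));
/// ```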
pub fn set_queue_full_callback(callback: Option<QueueFullCallback>) {
    let ptr = match callback {
        Some(f) => f as *mut (),
        None => std::ptr::null_mut(),
    };
    QUEUE_FULL_CALLBACK.store(ptr, Ordering::Release);
}

/// Returns how many records have hit a full queue so far.
pub fn queue_full_count() -> u64 {
    QUEUE_FULL_COUNT.load(Ordering::Relaxed)
}

/// Resets the queue-full counter to zero.
pub fn reset_queue_full_count() {
    QUEUE_FULL_COUNT.store(0, Ordering::Relaxed);
}

/// Sets how long buffered output may wait before being flushed. Negative
/// values are clamped to zero.
pub fn set_flush_delay(nanoseconds: i64) {
    FLUSH_DELAY_NS.store(nanoseconds.max(0), Ordering::Relaxed);
}

/// Sets the buffered-byte threshold that triggers a flush.
pub fn set_flush_buffer_size(bytes: u32) {
    FLUSH_BUFFER_SIZE.store(bytes, Ordering::Relaxed);
}

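/// Makes records at or above `level` flush the output immediately.
///
/// A sketch of tuning the three flush knobs together (the crate path and
/// the `ERR` variant name are assumptions):
///
/// ```ignore
/// ferrilog::set_flush_delay(500_000_000); // flush after at most 0.5 s
/// ferrilog::set_flush_buffer_size(64 * 1024); // or at 64 KiB buffered
/// ferrilog::flush_on(Level::ERR); // errors bypass buffering entirely
/// ```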
pub fn flush_on(level: Level) {
    FLUSH_LEVEL.store(level as u8, Ordering::Relaxed);
}

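/// Sends log output to the file at `path`. The polling thread must be
/// stopped.
///
/// A minimal sketch (the crate path and file name are illustrative):
///
/// ```ignore
/// ferrilog::set_log_file("/var/log/app.log")?;
/// ```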
pub fn set_log_file(path: &str) -> io::Result<()> {
    let logger = global_logger();
    logger.ensure_poller_stopped()?;
    unsafe {
        let inner = &mut *logger.inner_ptr();
        let result = inner.output.set_log_file(path);
        logger.inner_release();
        result
    }
}

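/// Like [`set_log_file`], but rotates the file according to `config`.
///
/// A sketch assuming `RotationConfig` implements `Default` (not verified
/// here; see `output::RotationConfig` for its actual constructors):
///
/// ```ignore
/// let config = RotationConfig::default();
/// ferrilog::set_log_file_with_rotation("/var/log/app.log", config)?;
/// ```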
pub fn set_log_file_with_rotation(path: &str, config: RotationConfig) -> io::Result<()> {
    let logger = global_logger();
    logger.ensure_poller_stopped()?;
    unsafe {
        let inner = &mut *logger.inner_ptr();
        let result = inner.output.set_log_file_with_rotation(path, config);
        logger.inner_release();
        result
    }
}

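/// Sets the header pattern used by the output; the token syntax is defined
/// by the `output` module.
///
/// A sketch with a purely hypothetical pattern string (the real tokens are
/// not documented in this module):
///
/// ```ignore
/// ferrilog::set_header_pattern("{timestamp} {level} {location} ")?;
/// ```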
pub fn set_header_pattern(pattern: &str) -> io::Result<()> {
    let logger = global_logger();
    logger.ensure_poller_stopped()?;
    unsafe {
        let inner = &mut *logger.inner_ptr();
        let result = inner.output.set_header_pattern(pattern);
        logger.inner_release();
        result
    }
}

/// Installs `write` as the output's static-header writer; see
/// [`StaticHeaderWriteFn`].
pub fn set_static_header(write: StaticHeaderWriteFn) -> io::Result<()> {
    let logger = global_logger();
    logger.ensure_poller_stopped()?;
    unsafe {
        let inner = &mut *logger.inner_ptr();
        inner.output.set_static_header(write);
        logger.inner_release();
    }
    Ok(())
}

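/// Spawns the background consumer thread, which polls every
/// `interval_nanoseconds`.
///
/// A typical lifecycle sketch (the crate path and file name are
/// assumptions):
///
/// ```ignore
/// ferrilog::set_log_file("/var/log/app.log")?;
/// ferrilog::start_polling_thread(100_000)?; // poll every 100 µs
/// // ... log from any thread ...
/// ferrilog::stop_polling_thread()?; // stop before reconfiguring or exiting
/// ```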
pub fn start_polling_thread(interval_nanoseconds: i64) -> io::Result<()> {
    global_logger().poller_state.start(interval_nanoseconds, crate::logger::poll)
}

/// Like [`start_polling_thread`], but pins the consumer thread to
/// `core_id`.
pub fn start_polling_thread_on_core(interval_nanoseconds: i64, core_id: usize) -> io::Result<()> {
    global_logger().poller_state.start_on_core(
        interval_nanoseconds,
        crate::logger::poll,
        Some(core_id),
    )
}

/// Stops the background consumer thread.
pub fn stop_polling_thread() -> io::Result<()> {
    global_logger().poller_state.stop()
}

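/// Drains all ready records from every thread queue, writing them to the
/// output; `force_flush` flushes unconditionally at the end.
///
/// A sketch of polling manually instead of using the background thread
/// (the crate path is an assumption):
///
/// ```ignore
/// loop {
///     ferrilog::poll(false)?; // drain once, flushing on the usual triggers
///     std::thread::sleep(std::time::Duration::from_micros(100));
/// }
/// ```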
pub fn poll(force_flush: bool) -> io::Result<()> {
    let flush_buffer_size = FLUSH_BUFFER_SIZE.load(Ordering::Relaxed) as usize;
    let flush_level = level_from_u8(FLUSH_LEVEL.load(Ordering::Relaxed));
    let flush_delay_nanoseconds = FLUSH_DELAY_NS.load(Ordering::Relaxed);

    let mut buffers = Vec::with_capacity(THREAD_BUFFER_REGISTRY.count());
    for index in 0..THREAD_BUFFER_REGISTRY.count() {
        let pointer = THREAD_BUFFER_REGISTRY.get(index);
        if !pointer.is_null() {
            buffers.push(pointer);
        }
    }

    let logger = global_logger();
    let inner = unsafe { &mut *logger.inner_ptr() };
    let result = poll_inner(
        inner,
        &buffers,
        flush_buffer_size,
        flush_level,
        flush_delay_nanoseconds,
        force_flush,
    );
    unsafe { logger.inner_release() };
    result
}

fn poll_inner(
    inner: &mut LoggerInner,
    buffers: &[*mut ThreadBuffer],
    flush_buffer_size: usize,
    flush_level: Level,
    flush_delay_nanoseconds: i64,
    force_flush: bool,
) -> io::Result<()> {
    let LoggerInner { timestamp_counter, output, next_flush_deadline_nanoseconds } = inner;

    timestamp_counter.calibrate();
    // Merge the per-thread queues so records are consumed in timestamp order.
    let mut heap = ReadyMessageHeap::from_buffers(buffers);

    while let Some(node) = heap.pop() {
        let payload = unsafe { (*node.header()).payload_pointer() as *const u8 };

        // The payload embeds a pointer to the call site's `StaticInfo`.
        let info: &StaticInfo = unsafe {
            let info_ptr =
                payload.add(PAYLOAD_INFO_OFFSET).cast::<*const StaticInfo>().read_unaligned();
            &*info_ptr
        };

        let timestamp_nanoseconds =
            timestamp_counter.timestamp_to_nanoseconds(node.raw_timestamp());
        let thread_name = unsafe { (*node.thread_buffer()).name_bytes() };
        unsafe {
            output.write_record(
                timestamp_nanoseconds,
                info.level,
                thread_name,
                info.location,
                |buffer| (info.decode)(payload.add(PAYLOAD_ARGS_OFFSET), buffer),
            )?;
        }

        if output.buffered_len() >= flush_buffer_size || info.level >= flush_level {
            output.flush()?;
            *next_flush_deadline_nanoseconds = None;
        }

        // Release the slot back to the producer, then re-queue the buffer's
        // next ready record (if any) into the heap.
        unsafe {
            let queue_pointer = &raw mut (*node.thread_buffer()).queue;
            SpscVarQueue::consumer_pop(queue_pointer);
        }
        heap.refresh_buffer(node.thread_buffer());
    }

    poller::cleanup_finished_buffers();

    if force_flush {
        output.flush()?;
        *next_flush_deadline_nanoseconds = None;
    } else if output.has_pending() {
        // Start the delay clock when output first sits pending; flush once
        // the deadline passes.
        let now = timestamp_counter.read_nanoseconds();
        match *next_flush_deadline_nanoseconds {
            Some(deadline_nanoseconds) if now > deadline_nanoseconds => {
                output.flush()?;
                *next_flush_deadline_nanoseconds = None;
            }
            None => {
                *next_flush_deadline_nanoseconds = Some(now + flush_delay_nanoseconds);
            }
            _ => {}
        }
    } else {
        *next_flush_deadline_nanoseconds = None;
    }

    Ok(())
}

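/// Records a queue-full event, notifies the callback (if any), and tells
/// the producer what to do next.
///
/// A sketch of the intended producer-side loop. This is hypothetical:
/// `try_enqueue` is an illustrative stand-in for the real producer path,
/// which lives outside this module:
///
/// ```ignore
/// loop {
///     if try_enqueue(record) {
///         break;
///     }
///     match handle_queue_full() {
///         QueueFullAction::Drop => break,     // give up on this record
///         QueueFullAction::Retry => continue, // spin and try again
///     }
/// }
/// ```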
pub(crate) fn handle_queue_full() -> QueueFullAction {
    let count = QUEUE_FULL_COUNT.fetch_add(1, Ordering::Relaxed) + 1;
    let ptr = QUEUE_FULL_CALLBACK.load(Ordering::Acquire);
    if !ptr.is_null() {
        // SAFETY: non-null values are only ever stored from a valid
        // `QueueFullCallback` in `set_queue_full_callback`.
        let callback: QueueFullCallback = unsafe { std::mem::transmute(ptr) };
        callback(count);
    }

    match queue_full_policy_from_u8(QUEUE_FULL_POLICY.load(Ordering::Relaxed)) {
        QueueFullPolicy::Drop => QueueFullAction::Drop,
        QueueFullPolicy::Block => QueueFullAction::Retry,
    }
}

/// Converts a raw policy byte back to [`QueueFullPolicy`], defaulting to
/// `Drop` for unknown values.
#[inline]
pub(crate) fn queue_full_policy_from_u8(value: u8) -> QueueFullPolicy {
    match value {
        1 => QueueFullPolicy::Block,
        _ => QueueFullPolicy::Drop,
    }
}