//! ferrilog_core/logger.rs — global logger state, configuration API, and the
//! poll loop that drains per-thread queues into the output buffer.
1#[cfg(debug_assertions)]
2use std::sync::atomic::AtomicBool;
3use std::{
4    cell::UnsafeCell,
5    io::{self, Write},
6    sync::{
7        OnceLock,
8        atomic::{AtomicI64, AtomicPtr, AtomicU8, AtomicU32, AtomicU64, Ordering},
9    },
10};
11
12use crate::{
13    Level, StaticHeaderWriteFn, level_from_u8,
14    output::{OutputBuffer, RotationConfig},
15    poller::{self, PollerState, ReadyMessageHeap},
16    spsc_queue::{PAYLOAD_ARGS_OFFSET, PAYLOAD_INFO_OFFSET, SpscVarQueue},
17    thread_buffer::{THREAD_BUFFER_REGISTRY, ThreadBuffer},
18    timestamp_counter::TimestampCounter,
19};
20
/// Function pointer type for decoding a log record's payload into a formatted buffer.
///
/// The `*const u8` argument points at the encoded arguments
/// (`payload + PAYLOAD_ARGS_OFFSET`, see `poll_inner`); decoded text is
/// appended to the `Vec<u8>`.
pub type DecodeFn = unsafe fn(*const u8, &mut Vec<u8>);

/// Callback function type invoked when the per-thread SPSC queue is full.
/// The argument is the cumulative count of queue-full events.
pub type QueueFullCallback = fn(u64);

/// Default log level (DBG = accept all messages).
const DEFAULT_LEVEL: u8 = Level::DBG as u8;

/// Default flush delay (3 seconds in nanoseconds).
const DEFAULT_FLUSH_DELAY_NS: i64 = 3_000_000_000;

/// Default flush buffer size (8 KB).
const DEFAULT_FLUSH_BUFFER_SIZE: u32 = 8 * 1024;

/// Default flush level (OFF = never auto-flush by level).
const DEFAULT_FLUSH_LEVEL: u8 = Level::OFF as u8;

/// Default queue-full policy (Drop). Stored as `u8` to match the atomic.
const DEFAULT_QUEUE_FULL_POLICY: u8 = QueueFullPolicy::Drop as u8;
42
/// Global log level filter. Read on every `log!()` call (hot path).
///
/// All top-level static configuration fields below are typically written once
/// at startup (write-once-read-forever) and read on every log call or
/// queue-full event. Placing them as top-level statics eliminates the
/// `OnceLock` indirection of `GlobalLogger` on the hot path.
///
/// # Ordering rationale
///
/// - `CURRENT_LEVEL` uses `Relaxed`: log level filtering is eventually
///   consistent; a few extra messages after `set_level()` is acceptable.
///   On x86 a `Relaxed` load is identical to a plain `mov`.
/// - Config fields (`FLUSH_*`, `QUEUE_FULL_*`) use `Relaxed`: they are set
///   once at startup and read on cold paths (poll / queue-full). Eventual
///   consistency is sufficient.
/// - `QUEUE_FULL_COUNT` uses `Relaxed` for `fetch_add`: the count is
///   advisory; exact ordering between increments across threads is not
///   required.
/// - `QUEUE_FULL_CALLBACK` is the one exception: it is stored with `Release`
///   (`set_queue_full_callback`) and loaded with `Acquire`
///   (`handle_queue_full`) when the callback pointer is published.
static CURRENT_LEVEL: AtomicU8 = AtomicU8::new(DEFAULT_LEVEL);

/// Flush delay in nanoseconds. Read by the poller each cycle.
static FLUSH_DELAY_NS: AtomicI64 = AtomicI64::new(DEFAULT_FLUSH_DELAY_NS);

/// Flush buffer size threshold in bytes. Read by the poller each cycle.
static FLUSH_BUFFER_SIZE: AtomicU32 = AtomicU32::new(DEFAULT_FLUSH_BUFFER_SIZE);

/// Minimum level that triggers an immediate flush. Read by the poller.
static FLUSH_LEVEL: AtomicU8 = AtomicU8::new(DEFAULT_FLUSH_LEVEL);

/// Queue-full policy (Drop=0, Block=1). Read when a queue is full.
static QUEUE_FULL_POLICY: AtomicU8 = AtomicU8::new(DEFAULT_QUEUE_FULL_POLICY);

/// Cumulative count of queue-full events. Incremented by writer threads.
static QUEUE_FULL_COUNT: AtomicU64 = AtomicU64::new(0);

/// Callback function pointer for queue-full events. null = no callback.
static QUEUE_FULL_CALLBACK: AtomicPtr<()> = AtomicPtr::new(std::ptr::null_mut());
80
81// ---------------------------------------------------------------------------
82// Public types
83// ---------------------------------------------------------------------------
84
/// Policy applied when a thread's SPSC log queue is full.
///
/// `repr(u8)` so the variant value can be stored in the `QUEUE_FULL_POLICY`
/// atomic and recovered via `queue_full_policy_from_u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum QueueFullPolicy {
    /// Drop the log message.
    Drop = 0,
    /// Block (spin) until space becomes available.
    Block = 1,
}
94
/// Internal action derived from [`QueueFullPolicy`] by `handle_queue_full`
/// (`Drop` policy → `Drop`, `Block` policy → `Retry`).
pub(crate) enum QueueFullAction {
    /// Discard the current log message.
    Drop,
    /// Retry pushing the message into the queue.
    Retry,
}
102
/// Compile-time metadata associated with a single log call site.
///
/// A `*const StaticInfo` is embedded at `PAYLOAD_INFO_OFFSET` of each record's
/// payload and read back by `poll_inner` during decoding.
#[derive(Clone, Copy)]
pub struct StaticInfo {
    /// Severity level of this call site.
    pub level: Level,
    /// Source location string (e.g. `"src/main.rs:42"`).
    pub location: &'static str,
    /// The format string for this log call site.
    pub format_string: &'static str,
    /// Decoder function that reads the payload and appends formatted output.
    pub decode: DecodeFn,
}
115
116// ---------------------------------------------------------------------------
117// GlobalLogger — only holds state that requires runtime initialization
118// ---------------------------------------------------------------------------
119
/// Mutable state accessed exclusively by the poller thread (or the
/// configuration API when the poller is stopped).
struct LoggerInner {
    /// Converts raw per-record timestamps to wall-clock nanoseconds.
    timestamp_counter: TimestampCounter,
    /// Formats records and buffers/flushes them to the configured sink.
    output: OutputBuffer,
    /// Deadline (ns) for the delayed-flush policy; `None` when nothing is pending.
    next_flush_deadline_nanoseconds: Option<i64>,
}
127
/// The global logger instance.
///
/// Only contains state that requires runtime initialization
/// ([`TimestampCounter`], [`OutputBuffer`]) and the poller state machine.
/// All configuration atomics live as top-level statics so the hot path
/// (`check_level`) never touches this struct.
struct GlobalLogger {
    /// Poller-exclusive mutable state; see `inner_ptr` for the access contract.
    inner: UnsafeCell<LoggerInner>,
    /// Start/stop state machine for the background polling thread.
    poller_state: PollerState,
    /// Debug-only guard flag used to detect overlapping access to `inner`.
    #[cfg(debug_assertions)]
    inner_in_use: AtomicBool,
}

// Safety: LoggerInner has exclusive access guaranteed by the poller state
// machine. PollerState is internally Sync.
unsafe impl Sync for GlobalLogger {}
144
impl GlobalLogger {
    /// Construct the logger: an initialized timestamp counter, a fresh
    /// output buffer, no flush deadline, and the poller stopped.
    fn new() -> Self {
        let mut timestamp_counter = TimestampCounter::new();
        timestamp_counter.init_default();
        Self {
            inner: UnsafeCell::new(LoggerInner {
                timestamp_counter,
                output: OutputBuffer::new(),
                next_flush_deadline_nanoseconds: None,
            }),
            poller_state: PollerState::default(),
            #[cfg(debug_assertions)]
            inner_in_use: AtomicBool::new(false),
        }
    }

    /// Obtain an exclusive raw pointer to [`LoggerInner`].
    ///
    /// # Safety
    /// The caller must guarantee that only one thread accesses `inner`
    /// at any given time:
    /// - During `poll()`: the poller thread has exclusive access.
    /// - During configuration API calls: the poller must be stopped.
    ///
    /// The returned pointer must not be used to create overlapping `&mut` references.
    /// Callers must pair each successful call with `inner_release` so the
    /// debug-mode guard flag is cleared.
    #[inline]
    unsafe fn inner_ptr(&self) -> *mut LoggerInner {
        // Debug builds detect contract violations: the swap returns the
        // previous flag value, so `true` means another holder is active.
        #[cfg(debug_assertions)]
        assert!(
            !self.inner_in_use.swap(true, Ordering::AcqRel),
            "ferrilog: concurrent access to LoggerInner detected"
        );
        self.inner.get()
    }

    /// Release the exclusive-access marker for `inner` (debug mode only).
    /// The `Release` store pairs with the `AcqRel` swap in `inner_ptr`.
    #[inline]
    unsafe fn inner_release(&self) {
        #[cfg(debug_assertions)]
        self.inner_in_use.store(false, Ordering::Release);
    }

    /// Return `Err` if the polling thread is currently running; used by the
    /// configuration API, which requires exclusive access to `inner`.
    fn ensure_poller_stopped(&self) -> io::Result<()> {
        if self.poller_state.is_running() {
            Err(io::Error::other("polling thread is running; call stop_polling_thread() first"))
        } else {
            Ok(())
        }
    }
}
195
196/// Return a reference to the lazily-initialized global logger singleton.
197///
198/// This is only called on cold paths (poll, output/header configuration,
199/// poller start/stop). The hot path (`check_level`) does NOT go through
200/// this function.
201fn global_logger() -> &'static GlobalLogger {
202    static LOGGER: OnceLock<GlobalLogger> = OnceLock::new();
203    LOGGER.get_or_init(GlobalLogger::new)
204}
205
206// ---------------------------------------------------------------------------
207// Public API — hot-path level check (no OnceLock indirection)
208// ---------------------------------------------------------------------------
209
210/// Set the global minimum log level. Messages below this level are discarded.
211///
212/// Uses `Relaxed` ordering: the change is eventually visible to all threads.
213pub fn set_level(level: Level) {
214    CURRENT_LEVEL.store(level as u8, Ordering::Relaxed);
215}
216
217/// Return `true` if the given level is enabled under the current global level.
218///
219/// This is called on every `log!()` invocation. It is a single `Relaxed`
220/// atomic load — no OnceLock, no indirection, one instruction on x86.
221#[inline(always)]
222pub fn check_level(level: Level) -> bool {
223    level as u8 >= CURRENT_LEVEL.load(Ordering::Relaxed)
224}
225
226// ---------------------------------------------------------------------------
227// Public API — configuration (write-once at startup, read on cold paths)
228// ---------------------------------------------------------------------------
229
230/// Replace the output writer. The polling thread must be stopped first.
231pub fn set_output_writer<W>(writer: W)
232where
233    W: Write + Send + 'static,
234{
235    let logger = global_logger();
236    logger
237        .ensure_poller_stopped()
238        .expect("set_output_writer requires the polling thread to be stopped first");
239    // Safety: poller is stopped, only the current thread accesses inner
240    unsafe {
241        let inner = &mut *logger.inner_ptr();
242        inner.output.set_writer(Box::new(writer));
243        logger.inner_release();
244    }
245}
246
247/// Set the policy applied when a thread's log queue is full.
248pub fn set_queue_full_policy(policy: QueueFullPolicy) {
249    QUEUE_FULL_POLICY.store(policy as u8, Ordering::Relaxed);
250}
251
252/// Set (or clear) the callback invoked on each queue-full event.
253pub fn set_queue_full_callback(callback: Option<QueueFullCallback>) {
254    let ptr = match callback {
255        Some(f) => f as *mut (),
256        None => std::ptr::null_mut(),
257    };
258    QUEUE_FULL_CALLBACK.store(ptr, Ordering::Release);
259}
260
261/// Return the cumulative number of queue-full events since the last reset.
262pub fn queue_full_count() -> u64 {
263    QUEUE_FULL_COUNT.load(Ordering::Relaxed)
264}
265
266/// Reset the queue-full event counter to zero.
267pub fn reset_queue_full_count() {
268    QUEUE_FULL_COUNT.store(0, Ordering::Relaxed);
269}
270
271/// Set the flush delay in nanoseconds. Buffered output is flushed after
272/// this duration elapses with no new messages.
273pub fn set_flush_delay(nanoseconds: i64) {
274    FLUSH_DELAY_NS.store(nanoseconds.max(0), Ordering::Relaxed);
275}
276
277/// Set the flush buffer size threshold in bytes. The output buffer is
278/// flushed when its length reaches this value.
279pub fn set_flush_buffer_size(bytes: u32) {
280    FLUSH_BUFFER_SIZE.store(bytes, Ordering::Relaxed);
281}
282
283/// Set the level at or above which a flush is triggered immediately
284/// after writing a record.
285pub fn flush_on(level: Level) {
286    FLUSH_LEVEL.store(level as u8, Ordering::Relaxed);
287}
288
289/// Set the log output file path. The polling thread must be stopped first.
290pub fn set_log_file(path: &str) -> io::Result<()> {
291    let logger = global_logger();
292    logger.ensure_poller_stopped()?;
293    // Safety: poller is stopped, only the current thread accesses inner
294    unsafe {
295        let inner = &mut *logger.inner_ptr();
296        let result = inner.output.set_log_file(path);
297        logger.inner_release();
298        result
299    }
300}
301
302/// Set the log file with automatic size-based rotation.
303/// The polling thread must be stopped first.
304pub fn set_log_file_with_rotation(path: &str, config: RotationConfig) -> io::Result<()> {
305    let logger = global_logger();
306    logger.ensure_poller_stopped()?;
307    // Safety: poller is stopped, only the current thread accesses inner
308    unsafe {
309        let inner = &mut *logger.inner_ptr();
310        let result = inner.output.set_log_file_with_rotation(path, config);
311        logger.inner_release();
312        result
313    }
314}
315
316/// Set the header pattern for log output formatting. The polling thread
317/// must be stopped first.
318pub fn set_header_pattern(pattern: &str) -> io::Result<()> {
319    let logger = global_logger();
320    logger.ensure_poller_stopped()?;
321    // Safety: poller is stopped, only the current thread accesses inner
322    unsafe {
323        let inner = &mut *logger.inner_ptr();
324        let result = inner.output.set_header_pattern(pattern);
325        logger.inner_release();
326        result
327    }
328}
329
330/// Install a compile-time-generated static header writer. The polling thread
331/// must be stopped first.
332pub fn set_static_header(write: StaticHeaderWriteFn) -> io::Result<()> {
333    let logger = global_logger();
334    logger.ensure_poller_stopped()?;
335    unsafe {
336        let inner = &mut *logger.inner_ptr();
337        inner.output.set_static_header(write);
338        logger.inner_release();
339    }
340    Ok(())
341}
342
343// ---------------------------------------------------------------------------
344// Public API — poller control
345// ---------------------------------------------------------------------------
346
347/// Start the background polling thread that drains per-thread log queues.
348pub fn start_polling_thread(interval_nanoseconds: i64) -> io::Result<()> {
349    global_logger().poller_state.start(interval_nanoseconds, crate::logger::poll)
350}
351
352/// Start the background polling thread pinned to the specified CPU core.
353/// On Linux this uses `sched_setaffinity`; on other platforms the core_id hint is ignored.
354pub fn start_polling_thread_on_core(interval_nanoseconds: i64, core_id: usize) -> io::Result<()> {
355    global_logger().poller_state.start_on_core(
356        interval_nanoseconds,
357        crate::logger::poll,
358        Some(core_id),
359    )
360}
361
362/// Stop the background polling thread and wait for it to finish.
363pub fn stop_polling_thread() -> io::Result<()> {
364    global_logger().poller_state.stop()
365}
366
367// ---------------------------------------------------------------------------
368// Poll — drains all thread buffers, decodes messages, writes output
369// ---------------------------------------------------------------------------
370
371/// Poll all thread buffers once, decode pending messages, and write them
372/// to the output. If `force_flush` is true the output is flushed
373/// unconditionally.
374pub fn poll(force_flush: bool) -> io::Result<()> {
375    // Read config from top-level statics (no OnceLock indirection)
376    let flush_buffer_size = FLUSH_BUFFER_SIZE.load(Ordering::Relaxed) as usize;
377    let flush_level = level_from_u8(FLUSH_LEVEL.load(Ordering::Relaxed));
378    let flush_delay_nanoseconds = FLUSH_DELAY_NS.load(Ordering::Relaxed);
379
380    let mut buffers = Vec::with_capacity(THREAD_BUFFER_REGISTRY.count());
381    for index in 0..THREAD_BUFFER_REGISTRY.count() {
382        let pointer = THREAD_BUFFER_REGISTRY.get(index);
383        if !pointer.is_null() {
384            buffers.push(pointer);
385        }
386    }
387
388    let logger = global_logger();
389    // Safety: poll() is only called from the poller thread (or manually, never concurrently)
390    let inner = unsafe { &mut *logger.inner_ptr() };
391    let result = poll_inner(
392        inner,
393        &buffers,
394        flush_buffer_size,
395        flush_level,
396        flush_delay_nanoseconds,
397        force_flush,
398    );
399    unsafe { logger.inner_release() };
400    result
401}
402
/// Drain every ready message from `buffers` in heap order, decode each
/// payload, append it to `output`, and then apply the flush policy
/// (size threshold, per-record level threshold, delayed flush, force flush).
///
/// The parameters `flush_buffer_size`, `flush_level`, and
/// `flush_delay_nanoseconds` are the configuration snapshot taken by `poll`.
fn poll_inner(
    inner: &mut LoggerInner,
    buffers: &[*mut ThreadBuffer],
    flush_buffer_size: usize,
    flush_level: Level,
    flush_delay_nanoseconds: i64,
    force_flush: bool,
) -> io::Result<()> {
    // Destructure once so each field can be borrowed independently below.
    let LoggerInner { timestamp_counter, output, next_flush_deadline_nanoseconds } = inner;

    timestamp_counter.calibrate();
    // Heap of the ready head messages across all buffers (ordering is
    // defined by `ReadyMessageHeap`; presumably by timestamp — confirm there).
    let mut heap = ReadyMessageHeap::from_buffers(buffers);

    while let Some(node) = heap.pop() {
        let payload = unsafe { (*node.header()).payload_pointer() as *const u8 };

        // Read *const StaticInfo from the beginning of the payload
        // (at PAYLOAD_INFO_OFFSET); `read_unaligned` because the payload
        // carries no alignment guarantee for the embedded pointer.
        let info: &StaticInfo = unsafe {
            let info_ptr =
                payload.add(PAYLOAD_INFO_OFFSET).cast::<*const StaticInfo>().read_unaligned();
            &*info_ptr
        };

        let timestamp_nanoseconds =
            timestamp_counter.timestamp_to_nanoseconds(node.raw_timestamp());
        let thread_name = unsafe { (*node.thread_buffer()).name_bytes() };
        // NOTE(review): if `write_record` (or the flush below) returns an
        // error, the current message has not yet been popped from its queue
        // and will be decoded again on the next poll — confirm that duplicate
        // output after an I/O error is the intended behavior.
        unsafe {
            output.write_record(
                timestamp_nanoseconds,
                info.level,
                thread_name,
                info.location,
                |buffer| (info.decode)(payload.add(PAYLOAD_ARGS_OFFSET), buffer),
            )?;
        }

        // Flush immediately when the output buffer reached the size threshold
        // or this record is at/above the configured flush level.
        if output.buffered_len() >= flush_buffer_size || info.level >= flush_level {
            output.flush()?;
            *next_flush_deadline_nanoseconds = None;
        }

        // Pop the consumed record only after decoding — presumably the pop
        // makes the payload slot reusable by the producer (TODO confirm in
        // `SpscVarQueue`).
        unsafe {
            let queue_pointer = &raw mut (*node.thread_buffer()).queue;
            SpscVarQueue::consumer_pop(queue_pointer);
        }
        heap.refresh_buffer(node.thread_buffer());
    }

    poller::cleanup_finished_buffers();

    if force_flush {
        output.flush()?;
        *next_flush_deadline_nanoseconds = None;
    } else if output.has_pending() {
        // Delayed flush: arm a deadline when bytes first become pending,
        // flush once `now` passes it, then disarm.
        let now = timestamp_counter.read_nanoseconds();
        match *next_flush_deadline_nanoseconds {
            Some(deadline_nanoseconds) if now > deadline_nanoseconds => {
                output.flush()?;
                *next_flush_deadline_nanoseconds = None;
            }
            None => {
                *next_flush_deadline_nanoseconds = Some(now + flush_delay_nanoseconds);
            }
            _ => {}
        }
    } else {
        // Nothing pending: clear any stale deadline.
        *next_flush_deadline_nanoseconds = None;
    }

    Ok(())
}
474
475// ---------------------------------------------------------------------------
476// Queue-full handler (cold path — no OnceLock indirection)
477// ---------------------------------------------------------------------------
478
479/// Handle a queue-full event: increment the counter, invoke the callback
480/// (if set), and return the configured action (Drop or Retry).
481///
482/// Reads directly from top-level statics — no `global_logger()` call.
483pub(crate) fn handle_queue_full() -> QueueFullAction {
484    let count = QUEUE_FULL_COUNT.fetch_add(1, Ordering::Relaxed) + 1;
485    let ptr = QUEUE_FULL_CALLBACK.load(Ordering::Acquire);
486    if !ptr.is_null() {
487        // Safety: ptr was converted from a valid fn(u64) pointer by set_queue_full_callback
488        let callback: QueueFullCallback = unsafe { std::mem::transmute(ptr) };
489        callback(count);
490    }
491
492    match queue_full_policy_from_u8(QUEUE_FULL_POLICY.load(Ordering::Relaxed)) {
493        QueueFullPolicy::Drop => QueueFullAction::Drop,
494        QueueFullPolicy::Block => QueueFullAction::Retry,
495    }
496}
497
498/// Convert a raw `u8` to a [`QueueFullPolicy`]. Any unrecognized value
499/// defaults to [`QueueFullPolicy::Drop`].
500#[inline]
501pub(crate) fn queue_full_policy_from_u8(value: u8) -> QueueFullPolicy {
502    match value {
503        1 => QueueFullPolicy::Block,
504        _ => QueueFullPolicy::Drop,
505    }
506}