Skip to main content

ferrilog_core/
thread_buffer.rs

1use std::{
2    cell::Cell,
3    sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
4};
5
6use crate::spsc_queue::{BLOCK_COUNT, MessageHeader, SpscVarQueue};
7
/// Per-thread buffer containing an SPSC queue and thread metadata.
///
/// Allocated on the heap via `new_on_heap()` (1MB+, too large for the stack)
/// and released with `free_on_heap()`. `#[repr(C)]` pins the declared field
/// order and layout.
#[repr(C)]
pub struct ThreadBuffer {
    /// The SPSC variable-length message queue.
    pub queue: SpscVarQueue,
    /// Flag indicating whether this buffer should be deallocated by the background poller.
    /// Set to `true` (with `Release` ordering) by `BufferGuard::drop` when the owning
    /// thread exits.
    pub should_deallocate: AtomicBool,
    /// Thread name stored as raw UTF-8 bytes (up to 32 bytes).
    pub name: [u8; 32],
    /// Number of valid bytes in `name`; only `name[..name_length]` is meaningful.
    pub name_length: usize,
}

// NOTE(review): `Send` is asserted manually, presumably because `SpscVarQueue`
// holds raw pointers or cells that opt it out of the auto trait — TODO confirm
// and add a `// SAFETY:` justification for moving the buffer across threads.
unsafe impl Send for ThreadBuffer {}
23
24impl ThreadBuffer {
25    /// Allocates a zero-initialized `ThreadBuffer` on the heap.
26    pub fn new_on_heap() -> *mut Self {
27        unsafe {
28            let layout = std::alloc::Layout::new::<Self>();
29            let pointer = std::alloc::alloc_zeroed(layout) as *mut Self;
30            if pointer.is_null() {
31                std::alloc::handle_alloc_error(layout);
32            }
33            let queue_pointer = &raw mut (*pointer).queue;
34            let free_write_count_pointer = (queue_pointer as *mut u8)
35                .add(core::mem::offset_of!(SpscVarQueue, free_write_count))
36                as *mut u32;
37            free_write_count_pointer.write(BLOCK_COUNT);
38            (*pointer).should_deallocate = AtomicBool::new(false);
39            pointer
40        }
41    }
42
43    /// Frees a heap-allocated `ThreadBuffer`.
44    ///
45    /// # Safety
46    /// `pointer` must have been returned by `new_on_heap()` and must only be freed once.
47    pub unsafe fn free_on_heap(pointer: *mut Self) {
48        let layout = std::alloc::Layout::new::<Self>();
49        unsafe { std::alloc::dealloc(pointer as *mut u8, layout) };
50    }
51
52    /// Sets the thread name, truncated to the buffer capacity (32 bytes).
53    pub fn set_name(&mut self, thread_name: &str) {
54        let length = thread_name.len().min(self.name.len());
55        self.name[..length].copy_from_slice(&thread_name.as_bytes()[..length]);
56        self.name_length = length;
57    }
58
59    /// Returns the thread name as a byte slice.
60    pub fn name_bytes(&self) -> &[u8] {
61        &self.name[..self.name_length]
62    }
63}
64
/// Maximum number of writer threads that can ever register a `ThreadBuffer`.
///
/// Registry slots are claimed once per thread and never reused: reclaiming a buffer
/// clears its slot, but the slot count is never decremented. This is therefore a limit
/// on the total number of threads that log over the process lifetime, not on how many
/// are alive at once. The first log call from a thread past the limit panics. With a
/// 1MB+ queue per buffer, at most 256 buffers bound queue memory at roughly 256MB.
pub const MAX_THREADS: usize = 256;
69
/// Lock-free thread buffer registry.
///
/// # Design
///
/// - Fixed-size `AtomicPtr` array with capacity [`MAX_THREADS`].
/// - Writer (application thread): atomic `fetch_add` to claim a slot index,
///   then `store` the pointer.
/// - Reader (poller): iterates `0..count`, loading each slot's pointer. A slot
///   can still read null in the brief window between the writer's `fetch_add`
///   and its `store`.
/// - No Mutex, no heap allocation; registration is two atomic operations
///   (`fetch_add` + `store`).
/// - Slots are never reused: clearing a slot stores null but `count` is never
///   decremented, so at most [`MAX_THREADS`] registrations succeed over the
///   registry's lifetime.
///
/// The writer appends once per thread during `preallocate()` (fetch_add + store),
/// and the reader traverses lock-free during `poll()`. Neither side takes a lock.
pub struct ThreadBufferRegistry {
    /// Number of claimed slots (monotonically increasing). Because `register`
    /// increments before checking capacity, the raw value can exceed `MAX_THREADS`
    /// after a failed registration.
    count: AtomicUsize,
    /// Fixed-size array where each slot stores a `*mut ThreadBuffer`.
    /// Unused and cleared slots hold null.
    slots: [AtomicPtr<ThreadBuffer>; MAX_THREADS],
}
90
91unsafe impl Sync for ThreadBufferRegistry {}
92
93impl Default for ThreadBufferRegistry {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99impl ThreadBufferRegistry {
100    /// Returns a null atomic pointer used to initialize every slot in the array.
101    ///
102    /// This is a function rather than a `const` because `AtomicPtr` has interior
103    /// mutability, and clippy warns against `const` items with interior mutability
104    /// since each use site gets a distinct instance rather than sharing one.
105    #[inline]
106    const fn null_slot() -> AtomicPtr<ThreadBuffer> {
107        AtomicPtr::new(std::ptr::null_mut())
108    }
109
110    /// Creates a new empty registry.
111    pub const fn new() -> Self {
112        Self { count: AtomicUsize::new(0), slots: [const { Self::null_slot() }; MAX_THREADS] }
113    }
114
115    /// Registers a `ThreadBuffer`. Called by the writer side, once per thread.
116    ///
117    /// Internally performs a single fetch_add (claim slot) + a single store (write pointer).
118    pub fn register(&self, thread_buffer: *mut ThreadBuffer) {
119        let index = self.count.fetch_add(1, Ordering::AcqRel);
120        assert!(index < MAX_THREADS, "ferrilog: exceeded maximum thread count {MAX_THREADS}");
121        self.slots[index].store(thread_buffer, Ordering::Release);
122    }
123
124    /// Returns the current number of registered slots.
125    #[inline]
126    pub fn count(&self) -> usize {
127        self.count.load(Ordering::Acquire)
128    }
129
130    /// Loads the pointer at the given slot index. Called by the reader side, lock-free.
131    ///
132    /// Returns null if the slot has not yet been fully written (a very brief window).
133    #[inline]
134    pub fn get(&self, index: usize) -> *mut ThreadBuffer {
135        self.slots[index].load(Ordering::Acquire)
136    }
137
138    /// Clears a slot by storing null (called by the poller after reclaiming a `ThreadBuffer`).
139    pub fn clear_slot(&self, index: usize) {
140        self.slots[index].store(std::ptr::null_mut(), Ordering::Release);
141    }
142}
143
/// Global lock-free registry of the per-thread `ThreadBuffer`s created by `preallocate()`.
pub static THREAD_BUFFER_REGISTRY: ThreadBufferRegistry = ThreadBufferRegistry::new();

// --- Thread-local management ---

/// Thread-local pointer to the current thread's `ThreadBuffer`.
///
/// Null until `preallocate()` runs on this thread; reset to null by
/// `BufferGuard::drop` when the thread exits.
#[thread_local]
static BUFFER_POINTER: Cell<*mut ThreadBuffer> = Cell::new(std::ptr::null_mut());
152
153/// RAII guard that sets `should_deallocate` on thread exit so the background
154/// poller can reclaim the buffer.
155struct BufferGuard {
156    _non_zero_sized: u8,
157}
158
159impl Drop for BufferGuard {
160    fn drop(&mut self) {
161        let pointer = BUFFER_POINTER.get();
162        if !pointer.is_null() {
163            unsafe {
164                (*pointer).should_deallocate.store(true, Ordering::Release);
165            }
166            BUFFER_POINTER.set(std::ptr::null_mut());
167        }
168    }
169}
170
171thread_local! {
172    static BUFFER_GUARD: BufferGuard = const { BufferGuard { _non_zero_sized: 0 } };
173}
174
175/// Ensures the current thread has an allocated `ThreadBuffer`.
176/// If not yet allocated, creates one on the heap and registers it in the
177/// global lock-free registry.
178#[doc(hidden)]
179pub fn preallocate() {
180    if !BUFFER_POINTER.get().is_null() {
181        return;
182    }
183
184    let thread_buffer = ThreadBuffer::new_on_heap();
185
186    unsafe {
187        let thread_id = std::thread::current().id();
188        let name = format!("{:?}", thread_id);
189        (*thread_buffer).set_name(&name);
190    }
191
192    BUFFER_POINTER.set(thread_buffer);
193    BUFFER_GUARD.with(|_| {});
194
195    // Lock-free registration: one fetch_add + one store
196    THREAD_BUFFER_REGISTRY.register(thread_buffer);
197}
198
199/// Returns the current thread's `ThreadBuffer` pointer, allocating one first if needed.
200///
201/// The null check is kept on the hot path but the allocation branch is
202/// marked `#[cold]` + `#[inline(never)]` so the compiler lays out the
203/// fast path (non-null return) contiguously in the instruction cache.
204#[inline(always)]
205pub fn get_thread_buffer() -> *mut ThreadBuffer {
206    let pointer = BUFFER_POINTER.get();
207    if core::intrinsics::unlikely(pointer.is_null()) { get_thread_buffer_slow() } else { pointer }
208}
209
210/// Cold path: allocate and register a new `ThreadBuffer` for this thread.
211#[cold]
212#[inline(never)]
213fn get_thread_buffer_slow() -> *mut ThreadBuffer {
214    preallocate();
215    BUFFER_POINTER.get()
216}
217
218/// Sets the logging thread name for the current thread.
219///
220/// The name is truncated to 32 bytes and used for the `{thread}` token in the
221/// header pattern.
222pub fn set_thread_name(thread_name: &str) {
223    let thread_buffer = get_thread_buffer();
224    unsafe {
225        (*thread_buffer).set_name(thread_name);
226    }
227}
228
229/// Allocates space for a message in the current thread's SPSC queue.
230#[inline(always)]
231pub fn alloc_message(size: u32) -> Option<*mut MessageHeader> {
232    let thread_buffer = get_thread_buffer();
233    let mut spin_count = 0u32;
234
235    loop {
236        let result = unsafe {
237            let queue_pointer = &raw mut (*thread_buffer).queue;
238            SpscVarQueue::producer_alloc(queue_pointer, size)
239        };
240
241        if core::intrinsics::likely(result.is_some()) {
242            return result;
243        }
244
245        match crate::logger::handle_queue_full() {
246            crate::logger::QueueFullAction::Drop => return None,
247            crate::logger::QueueFullAction::Retry => {
248                spin_count += 1;
249                if spin_count <= 64 {
250                    std::hint::spin_loop();
251                } else {
252                    std::thread::yield_now();
253                    spin_count = 0;
254                }
255            }
256        }
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Preallocation yields a non-null pointer that stays stable across lookups.
    #[test]
    fn test_preallocate_and_get() {
        preallocate();
        let first = get_thread_buffer();
        let second = get_thread_buffer();
        assert!(!first.is_null());
        assert_eq!(first, second);
    }

    /// A small allocation on a fresh thread's queue succeeds.
    #[test]
    fn test_alloc_message() {
        assert!(alloc_message(16).is_some());
    }

    /// Preallocating registers at least one buffer globally.
    #[test]
    fn test_thread_buffer_registered() {
        preallocate();
        assert!(THREAD_BUFFER_REGISTRY.count() > 0);
    }

    /// A custom thread name is readable back from the buffer.
    #[test]
    fn test_set_thread_name() {
        set_thread_name("worker-a");
        let buffer = get_thread_buffer();
        let bytes = unsafe { (*buffer).name_bytes() };
        assert_eq!(std::str::from_utf8(bytes), Ok("worker-a"));
    }
}
293}