// ferrilog_core/thread_buffer.rs

use std::{
    cell::Cell,
    sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
};

use crate::spsc_queue::{BLOCK_COUNT, MessageHeader, SpscVarQueue};
7
/// Per-thread log buffer, heap-allocated and shared across threads by raw
/// pointer (producer thread + whoever drains the registry).
///
/// `#[repr(C)]` fixes the field order; `new_on_heap` writes into the embedded
/// queue via `offset_of!`, so the layout must stay predictable.
#[repr(C)]
pub struct ThreadBuffer {
    /// Single-producer variable-size message queue embedded inline.
    pub queue: SpscVarQueue,
    /// Set to `true` by the owning thread's TLS guard at thread exit;
    /// presumably read by the consumer to know it may free the buffer — confirm.
    pub should_deallocate: AtomicBool,
    /// Thread-name bytes (UTF-8 prefix, not NUL-terminated).
    pub name: [u8; 32],
    /// Number of valid bytes in `name`.
    pub name_length: usize,
}
21
22unsafe impl Send for ThreadBuffer {}
23
24impl ThreadBuffer {
25 pub fn new_on_heap() -> *mut Self {
27 unsafe {
28 let layout = std::alloc::Layout::new::<Self>();
29 let pointer = std::alloc::alloc_zeroed(layout) as *mut Self;
30 if pointer.is_null() {
31 std::alloc::handle_alloc_error(layout);
32 }
33 let queue_pointer = &raw mut (*pointer).queue;
34 let free_write_count_pointer = (queue_pointer as *mut u8)
35 .add(core::mem::offset_of!(SpscVarQueue, free_write_count))
36 as *mut u32;
37 free_write_count_pointer.write(BLOCK_COUNT);
38 (*pointer).should_deallocate = AtomicBool::new(false);
39 pointer
40 }
41 }
42
43 pub unsafe fn free_on_heap(pointer: *mut Self) {
48 let layout = std::alloc::Layout::new::<Self>();
49 unsafe { std::alloc::dealloc(pointer as *mut u8, layout) };
50 }
51
52 pub fn set_name(&mut self, thread_name: &str) {
54 let length = thread_name.len().min(self.name.len());
55 self.name[..length].copy_from_slice(&thread_name.as_bytes()[..length]);
56 self.name_length = length;
57 }
58
59 pub fn name_bytes(&self) -> &[u8] {
61 &self.name[..self.name_length]
62 }
63}
64
65pub const MAX_THREADS: usize = 256;
69
/// Global, lock-free registry of all live `ThreadBuffer` pointers.
pub struct ThreadBufferRegistry {
    // Number of slots ever claimed via `register` (monotonically increasing;
    // cleared slots are not reused).
    count: AtomicUsize,
    // Fixed-capacity table of buffer pointers; unused/cleared slots are null.
    slots: [AtomicPtr<ThreadBuffer>; MAX_THREADS],
}
90
91unsafe impl Sync for ThreadBufferRegistry {}
92
93impl Default for ThreadBufferRegistry {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99impl ThreadBufferRegistry {
100 #[inline]
106 const fn null_slot() -> AtomicPtr<ThreadBuffer> {
107 AtomicPtr::new(std::ptr::null_mut())
108 }
109
110 pub const fn new() -> Self {
112 Self { count: AtomicUsize::new(0), slots: [const { Self::null_slot() }; MAX_THREADS] }
113 }
114
115 pub fn register(&self, thread_buffer: *mut ThreadBuffer) {
119 let index = self.count.fetch_add(1, Ordering::AcqRel);
120 assert!(index < MAX_THREADS, "ferrilog: exceeded maximum thread count {MAX_THREADS}");
121 self.slots[index].store(thread_buffer, Ordering::Release);
122 }
123
124 #[inline]
126 pub fn count(&self) -> usize {
127 self.count.load(Ordering::Acquire)
128 }
129
130 #[inline]
134 pub fn get(&self, index: usize) -> *mut ThreadBuffer {
135 self.slots[index].load(Ordering::Acquire)
136 }
137
138 pub fn clear_slot(&self, index: usize) {
140 self.slots[index].store(std::ptr::null_mut(), Ordering::Release);
141 }
142}
143
144pub static THREAD_BUFFER_REGISTRY: ThreadBufferRegistry = ThreadBufferRegistry::new();
146
/// Fast-path per-thread pointer to this thread's buffer; null until
/// `preallocate` runs. NOTE(review): `#[thread_local]` is a nightly-only
/// attribute, presumably chosen over `thread_local!` for cheaper access —
/// confirm the crate is nightly-pinned.
#[thread_local]
static BUFFER_POINTER: Cell<*mut ThreadBuffer> = Cell::new(std::ptr::null_mut());
152
/// Logic-free marker kept in a `thread_local!` purely so its `Drop` impl runs
/// at thread exit and flags this thread's buffer for deallocation.
struct BufferGuard {
    // Presumably keeps the guard non-zero-sized so the TLS destructor is
    // actually registered — TODO confirm a ZST would not suffice.
    _non_zero_sized: u8,
}
158
159impl Drop for BufferGuard {
160 fn drop(&mut self) {
161 let pointer = BUFFER_POINTER.get();
162 if !pointer.is_null() {
163 unsafe {
164 (*pointer).should_deallocate.store(true, Ordering::Release);
165 }
166 BUFFER_POINTER.set(std::ptr::null_mut());
167 }
168 }
169}
170
thread_local! {
    // Lazily-initialized guard; touching it once (see `preallocate`) arms a
    // thread-exit destructor that flags the buffer for deallocation.
    static BUFFER_GUARD: BufferGuard = const { BufferGuard { _non_zero_sized: 0 } };
}
174
175#[doc(hidden)]
179pub fn preallocate() {
180 if !BUFFER_POINTER.get().is_null() {
181 return;
182 }
183
184 let thread_buffer = ThreadBuffer::new_on_heap();
185
186 unsafe {
187 let thread_id = std::thread::current().id();
188 let name = format!("{:?}", thread_id);
189 (*thread_buffer).set_name(&name);
190 }
191
192 BUFFER_POINTER.set(thread_buffer);
193 BUFFER_GUARD.with(|_| {});
194
195 THREAD_BUFFER_REGISTRY.register(thread_buffer);
197}
198
199#[inline(always)]
205pub fn get_thread_buffer() -> *mut ThreadBuffer {
206 let pointer = BUFFER_POINTER.get();
207 if core::intrinsics::unlikely(pointer.is_null()) { get_thread_buffer_slow() } else { pointer }
208}
209
/// Cold path of [`get_thread_buffer`]: performs the one-time allocation and
/// registration, then re-reads the now-non-null thread-local pointer.
#[cold]
#[inline(never)]
fn get_thread_buffer_slow() -> *mut ThreadBuffer {
    preallocate();
    BUFFER_POINTER.get()
}
217
218pub fn set_thread_name(thread_name: &str) {
223 let thread_buffer = get_thread_buffer();
224 unsafe {
225 (*thread_buffer).set_name(thread_name);
226 }
227}
228
229#[inline(always)]
231pub fn alloc_message(size: u32) -> Option<*mut MessageHeader> {
232 let thread_buffer = get_thread_buffer();
233 let mut spin_count = 0u32;
234
235 loop {
236 let result = unsafe {
237 let queue_pointer = &raw mut (*thread_buffer).queue;
238 SpscVarQueue::producer_alloc(queue_pointer, size)
239 };
240
241 if core::intrinsics::likely(result.is_some()) {
242 return result;
243 }
244
245 match crate::logger::handle_queue_full() {
246 crate::logger::QueueFullAction::Drop => return None,
247 crate::logger::QueueFullAction::Retry => {
248 spin_count += 1;
249 if spin_count <= 64 {
250 std::hint::spin_loop();
251 } else {
252 std::thread::yield_now();
253 spin_count = 0;
254 }
255 }
256 }
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    // Buffer is created on first use and cached per thread.
    #[test]
    fn test_preallocate_and_get() {
        preallocate();
        let pointer = get_thread_buffer();
        assert!(!pointer.is_null());
        let pointer2 = get_thread_buffer();
        assert_eq!(pointer, pointer2);
    }

    // A fresh queue must have room for a small message.
    #[test]
    fn test_alloc_message() {
        let header = alloc_message(16);
        assert!(header.is_some());
    }

    // preallocate publishes the buffer into the global registry.
    #[test]
    fn test_thread_buffer_registered() {
        preallocate();
        let count = THREAD_BUFFER_REGISTRY.count();
        assert!(count > 0);
    }

    // Round-trip a name through set_thread_name / name_bytes.
    #[test]
    fn test_set_thread_name() {
        set_thread_name("worker-a");
        let pointer = get_thread_buffer();
        let name = unsafe { std::str::from_utf8((*pointer).name_bytes()).unwrap() };
        assert_eq!(name, "worker-a");
    }
}