// ferrilog_core/spsc_queue.rs
use core::mem::{MaybeUninit, size_of};
use std::sync::atomic::{AtomicU32, Ordering};

/// 1 MiB in bytes.
const MIB: usize = 1024 * 1024;

/// Per-thread log buffer size in bytes, selected at compile time via Cargo
/// features.
///
/// # Configuration
///
/// Add one of the `buffer-*` features in your `Cargo.toml`:
///
/// ```toml
/// ferrilog = { version = "0.1", features = ["buffer-4m"] }
/// ```
///
/// Available features: `buffer-1m`, `buffer-2m` (default), `buffer-4m`,
/// `buffer-8m`, `buffer-16m`, `buffer-32m`, `buffer-64m`.
///
/// # Feature unification
///
/// If multiple buffer-size features are enabled (e.g. by Cargo dependency
/// unification), the **largest** size wins. This is consistent with the
/// additive semantics of Cargo features.
///
/// When no `buffer-*` feature is enabled at all, the size falls back to
/// 2 MiB (the documented default).
pub const BUFFER_SIZE: usize = if cfg!(feature = "buffer-64m") {
    64 * MIB
} else if cfg!(feature = "buffer-32m") {
    32 * MIB
} else if cfg!(feature = "buffer-16m") {
    16 * MIB
} else if cfg!(feature = "buffer-8m") {
    8 * MIB
} else if cfg!(feature = "buffer-4m") {
    4 * MIB
} else if cfg!(feature = "buffer-2m") {
    2 * MIB
} else if cfg!(feature = "buffer-1m") {
    MIB
} else {
    // Fallback when no buffer-size feature is selected.
    2 * MIB
};

// Compile-time sanity checks on the selected size.
const _: () = assert!(BUFFER_SIZE.is_power_of_two(), "BUFFER_SIZE must be a power of two");
const _: () = assert!(BUFFER_SIZE >= MIB, "BUFFER_SIZE must be at least 1 MiB");
const _: () = assert!(BUFFER_SIZE <= 64 * MIB, "BUFFER_SIZE must not exceed 64 MiB");

99/// Message header size (8 bytes).
100pub const HEADER_SIZE: usize = core::mem::size_of::<MessageHeader>();
101
102/// Number of 8-byte blocks in the ring buffer.
103pub const BLOCK_COUNT: u32 = (BUFFER_SIZE / HEADER_SIZE) as u32;
104
105/// Byte offset of the `*const StaticInfo` pointer within the payload.
106pub const PAYLOAD_INFO_OFFSET: usize = 0;
107
108/// Byte offset of the `i64` timestamp within the payload.
109pub const PAYLOAD_TIMESTAMP_OFFSET: usize = PAYLOAD_INFO_OFFSET + size_of::<usize>();
110
111/// Byte offset where encoded arguments begin within the payload.
112pub const PAYLOAD_ARGS_OFFSET: usize = PAYLOAD_TIMESTAMP_OFFSET + size_of::<i64>();
113
114/// Fixed overhead per message (info pointer + timestamp) before encoded arguments.
115pub const PAYLOAD_HEADER_SIZE: usize = PAYLOAD_ARGS_OFFSET;
116
/// Message header. `size` is the cross-thread control word (`AtomicU32`).
///
/// The header occupies exactly 8 bytes (one ring-buffer block). The payload
/// immediately follows and stores:
///
/// ```text
/// [info_ptr: *const StaticInfo (8B)] [timestamp: i64 (8B)] [encoded args...]
/// ```
///
/// Layout constants [`PAYLOAD_INFO_OFFSET`], [`PAYLOAD_TIMESTAMP_OFFSET`],
/// [`PAYLOAD_ARGS_OFFSET`], and [`PAYLOAD_HEADER_SIZE`] describe the payload
/// structure.
#[repr(C)]
pub struct MessageHeader {
    /// Total message size in bytes (header + payload), used as an atomic control word.
    pub size: AtomicU32,
    /// Padding to maintain 8-byte header alignment.
    _padding: u32,
}

impl MessageHeader {
    /// Publish a message: performs a `Release` store of the total size
    /// (header + payload), making the payload bytes visible to the consumer
    /// that loads `size` with `Acquire`.
    #[inline(always)]
    pub fn push(&self, payload_size: u32) {
        // `size_of::<Self>()` is the 8-byte header size; the stored value is
        // the full message footprint.
        let total_size = payload_size + core::mem::size_of::<Self>() as u32;
        self.size.store(total_size, Ordering::Release);
    }

    /// Returns a pointer to the beginning of the payload (the first byte
    /// immediately after this 8-byte header).
    #[inline(always)]
    pub fn payload_pointer(&self) -> *mut u8 {
        // SAFETY: only an address is computed here — one `MessageHeader` past
        // `self`, i.e. the start of the payload region that follows the
        // header inside the ring buffer. No dereference happens.
        unsafe { (self as *const Self).add(1) as *mut u8 }
    }
}

/// Single-producer single-consumer variable-length lock-free ring queue.
///
/// The underlying storage uses `MaybeUninit<u64>` as raw byte blocks instead of
/// `[MessageHeader; N]`. This allows subsequent blocks to be safely used as raw
/// payload byte regions without overwriting initialized `MessageHeader`/`AtomicU32`
/// objects with invalid values.
///
/// NOTE(review): the in-file tests zero the whole allocation and then set
/// `free_write_count = BLOCK_COUNT` before first use — presumably every real
/// construction path must do the same; confirm against the allocator code.
#[repr(C)]
pub struct SpscVarQueue {
    /// Raw ring buffer storage: `BLOCK_COUNT` 8-byte blocks.
    storage: [MaybeUninit<u64>; BLOCK_COUNT as usize],
    /// Current write position in blocks (producer only).
    pub(crate) write_index: u32,
    /// Number of free blocks available for writing (producer-local cache;
    /// refreshed from `read_index` only when it runs out).
    pub(crate) free_write_count: u32,
    /// Padding to prevent false sharing between producer and consumer fields
    /// (120 bytes + the two `u32` fields above = 128 bytes of separation).
    _cache_line_padding: [u8; 120],
    /// Current read position in blocks (consumer only, atomically shared).
    read_index: AtomicU32,
}

172impl SpscVarQueue {
173    /// Returns an immutable pointer to the [`MessageHeader`] at the given block index.
174    #[inline(always)]
175    fn header_ptr_at(&self, block_index: u32) -> *const MessageHeader {
176        debug_assert!(block_index < BLOCK_COUNT);
177        unsafe { self.storage.as_ptr().add(block_index as usize) as *const MessageHeader }
178    }
179
180    /// Returns a mutable pointer to the [`MessageHeader`] at the given block index.
181    #[inline(always)]
182    fn header_mut_ptr_at(&mut self, block_index: u32) -> *mut MessageHeader {
183        debug_assert!(block_index < BLOCK_COUNT);
184        unsafe { self.storage.as_mut_ptr().add(block_index as usize) as *mut MessageHeader }
185    }
186
187    /// Allocates space on the producer side (called only by the writer thread).
188    ///
189    /// # Safety
190    /// Must be called via a raw pointer, and only the writer thread may call this method.
191    #[inline(always)]
192    pub unsafe fn producer_alloc(this: *mut Self, size: u32) -> Option<*mut MessageHeader> {
193        unsafe {
194            let queue = &mut *this;
195            let block_size = (size + HEADER_SIZE as u32 * 2 - 1) / HEADER_SIZE as u32;
196
197            // Fast path: enough space available without checking read_index.
198            // The unlikely hint tells LLVM to lay out the slow path out-of-line.
199            if core::intrinsics::unlikely(block_size >= queue.free_write_count) {
200                let read_index_cache = queue.read_index.load(Ordering::Acquire);
201
202                if read_index_cache <= queue.write_index {
203                    queue.free_write_count = BLOCK_COUNT - queue.write_index;
204                    if block_size >= queue.free_write_count && read_index_cache != 0 {
205                        // wrap around: write sentinel and reset write_index to zero
206                        (*queue.header_mut_ptr_at(0)).size.store(0, Ordering::Release);
207                        (*queue.header_mut_ptr_at(queue.write_index))
208                            .size
209                            .store(1, Ordering::Release);
210                        queue.write_index = 0;
211                        queue.free_write_count = read_index_cache;
212                    }
213                } else {
214                    queue.free_write_count = read_index_cache - queue.write_index;
215                }
216
217                if queue.free_write_count <= block_size {
218                    return None;
219                }
220            }
221
222            let result = queue.header_mut_ptr_at(queue.write_index);
223            queue.write_index += block_size;
224            queue.free_write_count -= block_size;
225            (*queue.header_mut_ptr_at(queue.write_index)).size.store(0, Ordering::Relaxed);
226            Some(result)
227        }
228    }
229
230    /// Peeks at the front message on the consumer side (called only by the background thread).
231    /// This only observes the message without advancing `read_index`.
232    ///
233    /// # Safety
234    /// Must be called via a raw pointer, and only the reader thread may call this method.
235    #[inline]
236    pub unsafe fn consumer_front(this: *const Self) -> Option<*const MessageHeader> {
237        unsafe {
238            let queue = &*this;
239            let index = queue.read_index.load(Ordering::Relaxed);
240            let size = (*queue.header_ptr_at(index)).size.load(Ordering::Acquire);
241
242            if size == 1 {
243                let size_at_zero = (*queue.header_ptr_at(0)).size.load(Ordering::Acquire);
244                if size_at_zero == 0 { None } else { Some(queue.header_ptr_at(0)) }
245            } else if size == 0 {
246                None
247            } else {
248                Some(queue.header_ptr_at(index))
249            }
250        }
251    }
252
253    /// Pops the front message on the consumer side (called only by the background thread).
254    ///
255    /// # Safety
256    /// Must be called only after `consumer_front` has returned `Some`.
257    #[inline]
258    pub unsafe fn consumer_pop(this: *mut Self) {
259        unsafe {
260            let queue = &mut *this;
261            let current_read_index = queue.read_index.load(Ordering::Relaxed);
262            let current_size =
263                (*queue.header_ptr_at(current_read_index)).size.load(Ordering::Relaxed);
264
265            if current_size == 1 {
266                let size_at_zero = (*queue.header_ptr_at(0)).size.load(Ordering::Relaxed);
267                let block_size = size_at_zero.div_ceil(HEADER_SIZE as u32);
268                queue.read_index.store(block_size, Ordering::Release);
269            } else {
270                let block_size = current_size.div_ceil(HEADER_SIZE as u32);
271                queue.read_index.store(current_read_index + block_size, Ordering::Release);
272            }
273        }
274    }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Heap-allocates a zeroed queue and initializes the producer-side free count.
    fn alloc_queue_on_heap() -> *mut SpscVarQueue {
        unsafe {
            let layout = std::alloc::Layout::new::<SpscVarQueue>();
            let queue = std::alloc::alloc_zeroed(layout) as *mut SpscVarQueue;
            if queue.is_null() {
                std::alloc::handle_alloc_error(layout);
            }
            (*queue).free_write_count = BLOCK_COUNT;
            queue
        }
    }

    /// Releases a queue obtained from `alloc_queue_on_heap`.
    unsafe fn free_queue(queue: *mut SpscVarQueue) {
        unsafe {
            std::alloc::dealloc(queue as *mut u8, std::alloc::Layout::new::<SpscVarQueue>());
        }
    }

    #[test]
    fn test_basic_alloc_push_front_pop() {
        unsafe {
            let queue = alloc_queue_on_heap();

            // Produce a single 16-byte message.
            let message = SpscVarQueue::producer_alloc(queue, 16)
                .expect("empty queue must have room for a small message");
            assert!(!message.is_null());
            ((*message).payload_pointer() as *mut u64).write_unaligned(0xDEADBEEF);
            (*message).push(16);

            // Consume it and verify the payload round-trips.
            let front = SpscVarQueue::consumer_front(queue)
                .expect("published message must be visible to the consumer");
            let observed = ((*front).payload_pointer() as *const u64).read_unaligned();
            assert_eq!(observed, 0xDEADBEEF);

            SpscVarQueue::consumer_pop(queue);
            assert!(SpscVarQueue::consumer_front(queue).is_none());

            free_queue(queue);
        }
    }

    #[test]
    fn test_multiple_messages() {
        unsafe {
            let queue = alloc_queue_on_heap();
            let expected_values = [0u32, 10, 20, 30];

            // Publish four small messages back to back.
            for &value in &expected_values {
                let message = SpscVarQueue::producer_alloc(queue, 4).unwrap();
                ((*message).payload_pointer() as *mut u32).write_unaligned(value);
                (*message).push(4);
            }

            // They must come out in FIFO order.
            for &expected in &expected_values {
                let front = SpscVarQueue::consumer_front(queue).unwrap();
                let observed = ((*front).payload_pointer() as *const u32).read_unaligned();
                assert_eq!(observed, expected);
                SpscVarQueue::consumer_pop(queue);
            }

            assert!(SpscVarQueue::consumer_front(queue).is_none());
            free_queue(queue);
        }
    }

    #[test]
    fn test_queue_full_returns_none() {
        unsafe {
            let queue = alloc_queue_on_heap();

            // A message spanning half the buffer fits once...
            let half_buffer = BUFFER_SIZE as u32 / 2;
            let first = SpscVarQueue::producer_alloc(queue, half_buffer);
            assert!(first.is_some());
            (*first.unwrap()).push(half_buffer);

            // ...but not a second time while the first is unconsumed.
            assert!(SpscVarQueue::producer_alloc(queue, half_buffer).is_none());

            free_queue(queue);
        }
    }
}