Skip to main content

zbq_sys/
lib.rs

1//! Low-level unsafe FFI bindings for `zbq.h`.
2//!
3//! `zbq` is a Linux-specific single-producer / multi-consumer (SPMC) shared-memory
4//! ring-buffer with optional file-descriptor passing.
5//!
6//! This crate compiles `zbq.h` (by defining `ZBQ_IMPLEMENTATION`) via the `cc`
7//! build-dependency and exposes the raw C symbols through `extern "C"` blocks.
8//! For safe, idiomatic Rust wrappers see the `zbq` crate.
9#![warn(missing_docs)]
10#![allow(non_camel_case_types, non_snake_case, non_upper_case_globals)]
11
12use std::os::raw::{c_int, c_void};
13
14// =============================================================================
15// Constants
16// =============================================================================
17
/// Upper bound on the number of file descriptors a single message may carry.
///
/// The value fits in a `u8`, matching the width of
/// [`zbq_msg_header::fd_count`].
pub const ZBQ_MAX_FD_PER_MSG: u8 = 253;
20
/// Alignment, in bytes, that governs message placement inside the ring.
///
/// The length of every committed run must be a multiple of this value, and
/// each [`zbq_msg_header`] is physically located at an offset that is a
/// multiple of it.
pub const ZBQ_MSG_ALIGN: usize = 16;
26
27// =============================================================================
28// Error codes
29// =============================================================================
30
/// Status codes reported by the C API.
///
/// Zero means success; every failure is a distinct negative value.  The
/// `extern "C"` functions in this crate return these codes as a plain
/// `c_int`, so compare with `zbq_error::ZBQ_OK as c_int` (and friends)
/// rather than transmuting an arbitrary integer into this enum.
#[repr(C)]
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Copy)]
pub enum zbq_error {
    /// The call completed successfully.
    ZBQ_OK = 0,
    /// An argument was invalid or a precondition was violated.
    ZBQ_ERROR_INVALID = -1,
    /// A memory allocation failed.
    ZBQ_ERROR_NOMEM = -2,
    /// Spurious wake-up, or a condition where the caller should try again.
    ZBQ_ERROR_AGAIN = -3,
    /// A low-level I/O error occurred (for example a failing syscall).
    ZBQ_ERROR_IO = -4,
    /// Protocol or state mismatch.
    ZBQ_ERROR_PROTOCOL = -5,
    /// The ring overflowed, or the request would exceed its capacity.
    ZBQ_ERROR_OVERFLOW = -6,
    /// No free slot was available, or the requested entry was not found.
    ZBQ_ERROR_NOENT = -7,
}
52
53// =============================================================================
54// Wakeup mode
55// =============================================================================
56
/// How a consumer waits for new data.
#[repr(C)]
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Copy)]
pub enum zbq_wakeup_mode {
    /// The consumer blocks on the broadcast futex driven by the producer.
    ZBQ_WAKEUP_PUSH = 0,
    /// The consumer does its own polling (busy-poll or adaptive sleep).
    ZBQ_WAKEUP_PULL = 1,
}
66
67// =============================================================================
68// Commit flags
69// =============================================================================
70
/// Behaviour modifiers accepted by [`zbq_producer_commit`].
#[repr(C)]
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Copy)]
pub enum zbq_commit_flags {
    /// Plain commit with no additional behaviour.
    ZBQ_COMMIT_NONE = 0,
    /// After committing the write, force a consumer scan.
    ZBQ_COMMIT_FORCE_SCAN = 1,
}
80
81// =============================================================================
82// Shared-memory layout structures
83// =============================================================================
84
/// Shared control region: 256 bytes, aligned to 64 bytes.
///
/// The region lives in the shared-memory segment that every consumer maps
/// read-only; the producer is the writer of the fields below.  The explicit
/// padding arrays place `producer_head`, `broadcast_futex`, `producer_pid`
/// and the configuration fields at byte offsets 0, 64, 128 and 192.
#[repr(C)]
#[repr(align(64))]
pub struct zbq_control_region {
    /// Producer head in bytes.  It only ever grows and is stored atomically
    /// with release semantics when the producer commits.
    pub producer_head: u64,

    // Fills the remainder of the first 64 bytes.
    _pad0: [u8; 56],

    /// Futex word that the producer increments when new data is available.
    /// Push-mode consumers wait on it via [`zbq_consumer_wait_broadcast`].
    pub broadcast_futex: u32,

    // Fills the remainder of the second 64 bytes.
    _pad1: [u8; 60],

    /// PID of the producer process.  Consumers use it to open a `pidfd` for
    /// liveness monitoring.
    pub producer_pid: i32,

    // Fills the remainder of the third 64 bytes.
    _pad2: [u8; 60],

    /// Total size of the data ring, in bytes.
    pub ring_capacity: u64,
    /// Count of failed reservation attempts that triggers a forced consumer
    /// scan.
    pub clog_threshold: u64,
    /// Time in nanoseconds without progress after which a consumer is
    /// considered hung.
    pub stall_timeout_ns: u64,
    // Unused tail bytes that bring the struct to exactly 256 bytes.
    reserved: [u8; 40],
}
117
/// Per-consumer metadata page: 4096 bytes, aligned to 4096 bytes.
///
/// Each consumer gets read-write access to exactly **one** of these pages,
/// while the producer keeps read-write access to all of them.
#[repr(C)]
#[repr(align(4096))]
pub struct zbq_consumer_metadata {
    /// Tail offset as known to the consumer; published atomically with
    /// release semantics.
    pub tail_offset: u64,
    /// Time of the most recent progress update, expressed relative to
    /// `join_baseline`.
    pub progress_timestamp: u64,
    /// Baseline timestamp captured when the consumer joined.
    pub join_baseline: u64,
    // Unused bytes that bring the struct to exactly one 4096-byte page.
    reserved: [u8; 4072],
}
132
/// 16-byte message header, 16-byte aligned.
///
/// The fields add up to exactly 16 bytes, so the `repr(C)` layout contains no
/// padding.  Every committed run starts with this header followed by
/// `payload_len` bytes of payload.  `total_len` (used by the consumer view)
/// is `sizeof(zbq_msg_header) + payload_len`.
///
/// NOTE(review): [`ZBQ_MSG_ALIGN`] documents committed runs as multiples of
/// 16 bytes, which this formula only yields when `payload_len` is itself a
/// multiple of 16 — confirm against `zbq.h` whether `total_len` is rounded up.
///
/// The type is plain data and derives the same traits as the other
/// parameter/view structs, so a header read through a
/// [`zbq_msg_view::header`] pointer can be copied out, compared and printed.
#[repr(C, align(16))]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct zbq_msg_header {
    /// Length in bytes of the payload that follows this header.
    pub payload_len: u32,
    /// Number of file descriptors attached to this message
    /// (`0..=`[`ZBQ_MAX_FD_PER_MSG`], i.e. at most 253).
    pub fd_count: u8,
    /// Message-specific flags (reserved, currently ignored on the wire).
    pub flags: u8,
    /// Reserved for future use.
    pub reserved: u16,
    /// Sequence ID used to correlate the ring entry with FD records sent
    /// over the per-consumer side socket.
    pub seq_id: u64,
}
152
153// =============================================================================
154// Parameter & view structures
155// =============================================================================
156
/// Configuration passed to [`zbq_producer_create`].
#[repr(C)]
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Copy)]
pub struct zbq_producer_params {
    /// Data-ring size.  Must be non-zero and a multiple of 4096.
    pub ring_capacity: u64,
    /// Number of failed reservation attempts tolerated before a forced scan.
    pub clog_threshold: u64,
    /// Time in nanoseconds without observed progress after which a consumer
    /// is evicted.
    pub stall_timeout_ns: u64,
    /// Maximum number of consumer slots that may be in use simultaneously.
    pub max_consumers: u32,
}
170
/// Answer to a producer reservation query.
///
/// It describes one physically contiguous stretch of the ring that the
/// producer may write into.  A run never wraps around the physical end of the
/// buffer.
#[repr(C)]
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Copy)]
pub struct zbq_contiguous_run {
    /// Address of the first writable byte within the producer's ring mapping.
    pub base: *mut c_void,
    /// Maximum number of bytes that may be written starting at `base`.
    pub max_bytes: u64,
    /// Logical byte offset corresponding to `base` (the producer head at the
    /// time of the query).
    pub offset: u64,
}
186
187/// Read-only view of a message inside the ring.
188///
189/// Returned by [`zbq_consumer_peek`].
190#[repr(C)]
191#[derive(Clone, Copy, Debug, PartialEq, Eq)]
192pub struct zbq_msg_view {
193    /// Pointer to the message header inside the read-only ring mapping.
194    pub header: *const zbq_msg_header,
195    /// Pointer to the payload bytes that follow the header.
196    pub payload: *const c_void,
197    /// Length of the payload in bytes.
198    pub payload_len: u64,
199    /// Logical offset of this message in the ring.
200    pub logical_offset: u64,
201    /// Total bytes occupied by this message (`header + payload`).
202    pub total_len: u64,
203}
204
/// Payload sent from producer to consumer during the join handshake.
///
/// The fields plus the trailing reserved bytes add up to 64 bytes.  The type
/// is plain data and derives the same traits as the other parameter/view
/// structs so it can be copied, compared and printed.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct zbq_join_payload {
    /// Size of the data ring.
    pub ring_capacity: u64,
    /// Baseline timestamp used for progress measurements.
    pub join_baseline: u64,
    /// Initial head offset (consumer starts reading from here).
    pub initial_head: u64,
    /// PID of the producer process.
    pub producer_pid: i32,
    /// Slot index assigned to this consumer.
    pub slot_index: u32,
    /// Wakeup mode (`ZBQ_WAKEUP_PUSH` or `ZBQ_WAKEUP_PULL`).
    ///
    /// Carried as a raw `u32` rather than as [`zbq_wakeup_mode`]; compare it
    /// against `zbq_wakeup_mode::ZBQ_WAKEUP_PUSH as u32` and
    /// `zbq_wakeup_mode::ZBQ_WAKEUP_PULL as u32`.
    pub wakeup_mode: u32,
    // Unused tail bytes that bring the struct to 64 bytes.
    reserved: [u8; 28],
}
222
223// =============================================================================
224// Opaque handles
225// =============================================================================
226
/// Opaque handle for a ring-buffer producer.
///
/// Created by [`zbq_producer_create`] and destroyed by [`zbq_producer_destroy`].
/// Do not attempt to allocate or inspect this type directly; it is only ever
/// used behind a raw pointer handed out by the C library.
///
/// The type is zero-sized on the Rust side and deliberately opts out of the
/// `Send`, `Sync` and `Unpin` auto traits, because nothing visible here
/// establishes that the C object may be shared across threads or moved.
#[repr(C)]
pub struct zbq_producer {
    // Zero-length private field: keeps the type unconstructible outside this
    // crate while giving `repr(C)` a field to work with.
    _private: [u8; 0],
    // `*mut u8` removes `Send`/`Sync`; `PhantomPinned` removes `Unpin`.
    _marker: core::marker::PhantomData<(*mut u8, core::marker::PhantomPinned)>,
}
235
/// Opaque handle for a ring-buffer consumer.
///
/// Created by [`zbq_consumer_join`] and destroyed by [`zbq_consumer_destroy`].
/// Do not attempt to allocate or inspect this type directly; it is only ever
/// used behind a raw pointer handed out by the C library.
///
/// The type is zero-sized on the Rust side and deliberately opts out of the
/// `Send`, `Sync` and `Unpin` auto traits, because nothing visible here
/// establishes that the C object may be shared across threads or moved.
#[repr(C)]
pub struct zbq_consumer {
    // Zero-length private field: keeps the type unconstructible outside this
    // crate while giving `repr(C)` a field to work with.
    _private: [u8; 0],
    // `*mut u8` removes `Send`/`Sync`; `PhantomPinned` removes `Unpin`.
    _marker: core::marker::PhantomData<(*mut u8, core::marker::PhantomPinned)>,
}
244
245// =============================================================================
246// Compile-time layout checks (must match the C compile-time assertions)
247// =============================================================================
248
249const _: () = assert!(core::mem::size_of::<zbq_control_region>() == 256);
250const _: () = assert!(core::mem::align_of::<zbq_control_region>() == 64);
251const _: () = assert!(core::mem::size_of::<zbq_consumer_metadata>() == 4096);
252const _: () = assert!(core::mem::align_of::<zbq_consumer_metadata>() == 4096);
253const _: () = assert!(core::mem::size_of::<zbq_msg_header>() == 16);
254const _: () = assert!(core::mem::align_of::<zbq_msg_header>() == 16);
255
256// =============================================================================
257// Producer API
258// =============================================================================
259
extern "C" {
    /// Create a new producer ring-buffer.
    ///
    /// `params->ring_capacity` must be a non-zero multiple of 4096 and
    /// `params->max_consumers` must be non-zero.
    ///
    /// On success the newly-allocated handle is written to `*out` and
    /// `ZBQ_OK` is returned; other return values are [`zbq_error`] codes
    /// expressed as `c_int`.
    ///
    /// # Safety
    ///
    /// `out` must be valid for writing one pointer and `params` must point to
    /// a readable [`zbq_producer_params`].
    pub fn zbq_producer_create(
        out: *mut *mut zbq_producer,
        params: *const zbq_producer_params,
    ) -> c_int;

    /// Destroy a producer and release all associated resources (mappings, pidfds,
    /// sockets, slots).
    ///
    /// Passing `NULL` is a no-op.
    ///
    /// # Safety
    ///
    /// A non-null `prod` must be a handle obtained from
    /// [`zbq_producer_create`]; it must not be used again after this call.
    pub fn zbq_producer_destroy(prod: *mut zbq_producer);

    /// Complete the join handshake for an incoming consumer connection.
    ///
    /// `conn_fd` is an already-accepted Unix-domain socket descriptor.
    /// `mode` is the desired consumer wakeup mode.
    ///
    /// On success the assigned consumer slot index is written to `*out_slot_id`.
    /// The caller retains ownership of `conn_fd` and should close it after this
    /// call returns.
    ///
    /// # Safety
    ///
    /// `out_slot_id` must be valid for writing a `u32`.
    pub fn zbq_producer_handshake(
        prod: *mut zbq_producer,
        conn_fd: c_int,
        mode: zbq_wakeup_mode,
        out_slot_id: *mut u32,
    ) -> c_int;

    /// Query the largest physically contiguous writable run available to the
    /// producer.
    ///
    /// If the queue is fully clogged this returns a run with `max_bytes == 0`.
    /// The result is returned by value; see [`zbq_contiguous_run`] for the
    /// meaning of its fields.
    pub fn zbq_producer_query(prod: *mut zbq_producer) -> zbq_contiguous_run;

    /// Commit `written_bytes` previously written into the contiguous run
    /// returned by the most recent [`zbq_producer_query`].
    ///
    /// As documented on [`ZBQ_MSG_ALIGN`], the length of a committed run must
    /// be a multiple of that alignment.
    ///
    /// Returns `true` on success and `false` if `written_bytes` exceeds the
    /// queried `max_bytes` or if the producer handle is invalid.
    pub fn zbq_producer_commit(
        prod: *mut zbq_producer,
        written_bytes: u64,
        flags: zbq_commit_flags,
    ) -> bool;

    /// Atomically increment and return the next sequence ID for FD-bearing
    /// messages.
    ///
    /// The caller should store the returned value into the message header
    /// (the `seq_id` field of [`zbq_msg_header`]) before calling
    /// [`zbq_producer_commit`].
    pub fn zbq_producer_next_seq_id(prod: *mut zbq_producer) -> u64;

    /// Broadcast file descriptors to all push-mode consumers.
    ///
    /// Sends an `SCM_RIGHTS` record on each consumer's side socket containing
    /// exactly `fd_count` descriptors and the matching `seq_id`.
    ///
    /// This function is a no-op when `fd_count == 0`.
    ///
    /// # Safety
    ///
    /// When `fd_count` is non-zero, `fds` must point to at least `fd_count`
    /// readable `c_int` values.
    pub fn zbq_producer_broadcast_fds(
        prod: *mut zbq_producer,
        seq_id: u64,
        fds: *const c_int,
        fd_count: u8,
    ) -> c_int;

    /// Scan for dead or hung consumers and evict them.
    ///
    /// Normally invoked automatically when the clog threshold is reached, but
    /// may be called explicitly when `ZBQ_COMMIT_FORCE_SCAN` is used or by
    /// application logic.
    pub fn zbq_producer_scan(prod: *mut zbq_producer) -> c_int;
}
338
339// =============================================================================
340// Consumer API
341// =============================================================================
342
extern "C" {
    /// Join a ring-buffer as a consumer.
    ///
    /// `rendezvous_fd` must be a connected Unix-domain socket to the producer.
    /// `mode` selects push (futex) or pull (self-poll) behaviour.
    ///
    /// On success the consumer handle is written to `*out` and `ZBQ_OK` is
    /// returned.  The caller retains ownership of `rendezvous_fd`.
    ///
    /// # Safety
    ///
    /// `out` must be valid for writing one pointer.
    pub fn zbq_consumer_join(
        rendezvous_fd: c_int,
        mode: zbq_wakeup_mode,
        out: *mut *mut zbq_consumer,
    ) -> c_int;

    /// Destroy a consumer and release all associated resources.
    ///
    /// Passing `NULL` is a no-op.
    ///
    /// # Safety
    ///
    /// A non-null `cons` must be a handle obtained from
    /// [`zbq_consumer_join`]; it must not be used again after this call.
    pub fn zbq_consumer_destroy(cons: *mut zbq_consumer);

    /// Check whether the producer process is still alive.
    ///
    /// This polls the `pidfd` opened on the producer at join time.
    pub fn zbq_consumer_is_producer_alive(cons: *mut zbq_consumer) -> bool;

    /// Load the current producer head with acquire semantics.
    pub fn zbq_consumer_acquire_head(cons: *const zbq_consumer) -> u64;

    /// Inspect the message at the consumer's current tail.
    ///
    /// If data is available, `out` is populated with a [`zbq_msg_view`]
    /// pointing into the read-only ring mapping.
    ///
    /// Returns `ZBQ_ERROR_AGAIN` when the caller has reached the current
    /// producer head and there is no new data yet.
    ///
    /// # Safety
    ///
    /// `out` must be valid for writing a [`zbq_msg_view`].
    pub fn zbq_consumer_peek(cons: *mut zbq_consumer, out: *mut zbq_msg_view) -> c_int;

    /// Attempt to receive the file descriptors attached to the message with the
    /// given `expected_seq_id` over the consumer's side socket.
    ///
    /// **Do not use this on untrusted producers.**
    ///
    /// `out_fds` must point to an array with room for at least
    /// [`ZBQ_MAX_FD_PER_MSG`] elements.  The actual number received is written
    /// to `*out_fd_count`.
    ///
    /// # Safety
    ///
    /// `out_fds` must be valid for writing [`ZBQ_MAX_FD_PER_MSG`] `c_int`
    /// values and `out_fd_count` must be valid for writing a `u8`.
    pub fn zbq_consumer_recv_fds(
        cons: *mut zbq_consumer,
        expected_seq_id: u64,
        out_fds: *mut c_int,
        out_fd_count: *mut u8,
    ) -> c_int;

    /// Atomically publish the new consumer tail and progress timestamp.
    ///
    /// `new_tail` must not exceed the value returned by the most recent
    /// [`zbq_consumer_acquire_head`].  This must be called after processing
    /// a batch of messages to release ring space back to the producer.
    pub fn zbq_consumer_publish(cons: *mut zbq_consumer, new_tail: u64);

    /// Read the current broadcast futex token.
    ///
    /// Push-mode consumers should load this before calling
    /// [`zbq_consumer_wait_broadcast`] and reload it after waking up.
    pub fn zbq_consumer_get_futex_token(cons: *const zbq_consumer) -> u32;

    /// Block until the producer commits new data (push mode).
    ///
    /// `token` should be the value obtained from
    /// [`zbq_consumer_get_futex_token`] immediately before this call.
    ///
    /// Returns `ZBQ_ERROR_AGAIN` on spurious wake-up — the caller should
    /// re-read the token and retry.
    pub fn zbq_consumer_wait_broadcast(cons: *mut zbq_consumer, token: u32) -> c_int;

    /// Load the consumer's local tail with acquire semantics.
    pub fn zbq_consumer_tail(cons: *const zbq_consumer) -> u64;

    /// Return a read-only pointer to the shared control region.
    ///
    /// NOTE(review): the producer updates fields of [`zbq_control_region`]
    /// while consumers have it mapped, so plain field reads through this
    /// pointer may race with those writes; prefer the dedicated accessors
    /// such as [`zbq_consumer_acquire_head`] and
    /// [`zbq_consumer_get_futex_token`] — confirm against `zbq.h`.
    pub fn zbq_consumer_ctrl(cons: *const zbq_consumer) -> *const zbq_control_region;

    /// Return a mutable pointer to this consumer's private metadata page.
    pub fn zbq_consumer_meta(cons: *mut zbq_consumer) -> *mut zbq_consumer_metadata;
}