zbq_sys/lib.rs
1//! Low-level unsafe FFI bindings for `zbq.h`.
2//!
3//! `zbq` is a Linux-specific single-producer / multi-consumer (SPMC) shared-memory
4//! ring-buffer with optional file-descriptor passing.
5//!
6//! This crate compiles `zbq.h` (by defining `ZBQ_IMPLEMENTATION`) via the `cc`
7//! build-dependency and exposes the raw C symbols through `extern "C"` blocks.
8//! For safe, idiomatic Rust wrappers see the `zbq` crate.
9#![warn(missing_docs)]
10#![allow(non_camel_case_types, non_snake_case, non_upper_case_globals)]
11
12use std::os::raw::{c_int, c_void};
13
14// =============================================================================
15// Constants
16// =============================================================================
17
18/// Maximum number of file descriptors that may be attached to a single message.
19pub const ZBQ_MAX_FD_PER_MSG: u8 = 253;
20
21/// Message alignment requirement in bytes.
22///
23/// Every committed run must be a multiple of this value and every
24/// [`zbq_msg_header`] is physically placed at an offset satisfying this.
25pub const ZBQ_MSG_ALIGN: usize = 16;
26
27// =============================================================================
28// Error codes
29// =============================================================================
30
31/// Result / error codes returned by the C API.
32#[repr(C)]
33#[derive(Clone, Copy, Debug, PartialEq, Eq)]
34pub enum zbq_error {
35 /// Operation succeeded.
36 ZBQ_OK = 0,
37 /// Invalid argument or precondition violation.
38 ZBQ_ERROR_INVALID = -1,
39 /// Memory allocation failed.
40 ZBQ_ERROR_NOMEM = -2,
41 /// Spurious wake-up or try-again condition.
42 ZBQ_ERROR_AGAIN = -3,
43 /// Low-level I/O error (syscall failure, etc.).
44 ZBQ_ERROR_IO = -4,
45 /// Protocol / state mismatch.
46 ZBQ_ERROR_PROTOCOL = -5,
47 /// Ring overflow / would exceed capacity.
48 ZBQ_ERROR_OVERFLOW = -6,
49 /// No free slot or entry not found.
50 ZBQ_ERROR_NOENT = -7,
51}
52
53// =============================================================================
54// Wakeup mode
55// =============================================================================
56
57/// Consumer blocking strategy.
58#[repr(C)]
59#[derive(Clone, Copy, Debug, PartialEq, Eq)]
60pub enum zbq_wakeup_mode {
61 /// Consumer blocks on the producer's broadcast futex.
62 ZBQ_WAKEUP_PUSH = 0,
63 /// Consumer self-manages polling (busy-poll or adaptive sleep).
64 ZBQ_WAKEUP_PULL = 1,
65}
66
67// =============================================================================
68// Commit flags
69// =============================================================================
70
71/// Flags that modify [`zbq_producer_commit`] behaviour.
72#[repr(C)]
73#[derive(Clone, Copy, Debug, PartialEq, Eq)]
74pub enum zbq_commit_flags {
75 /// No special behaviour.
76 ZBQ_COMMIT_NONE = 0,
77 /// Force a consumer scan after committing the write.
78 ZBQ_COMMIT_FORCE_SCAN = 1,
79}
80
81// =============================================================================
82// Shared-memory layout structures
83// =============================================================================
84
85/// 256-byte control region, cache-line aligned.
86///
87/// This structure lives in the shared memory segment that all consumers map
88/// read-only. The producer writes to the fields below.
89#[repr(C, align(64))]
90pub struct zbq_control_region {
91 /// Monotonically-increasing producer head (in bytes), atomically stored
92 /// with release semantics on commit.
93 pub producer_head: u64,
94
95 _pad0: [u8; 56],
96
97 /// Futex word incremented by the producer when new data is available.
98 /// Push-mode consumers [`zbq_consumer_wait_broadcast`] on this word.
99 pub broadcast_futex: u32,
100
101 _pad1: [u8; 60],
102
103 /// PID of the producer process. Consumers use this to open a `pidfd`
104 /// for liveness monitoring.
105 pub producer_pid: i32,
106
107 _pad2: [u8; 60],
108
109 /// Total size of the data ring in bytes.
110 pub ring_capacity: u64,
111 /// Number of failed reservation attempts that triggers a forced consumer scan.
112 pub clog_threshold: u64,
113 /// Nanoseconds after which a consumer with no progress is considered hung.
114 pub stall_timeout_ns: u64,
115 reserved: [u8; 40],
116}
117
118/// 4096-byte per-consumer metadata page, page aligned.
119///
120/// A consumer has read-write access to exactly **one** such page; the
121/// producer retains read-write access to all of them.
122#[repr(C, align(4096))]
123pub struct zbq_consumer_metadata {
124 /// Tail offset known to the consumer, atomically published with release.
125 pub tail_offset: u64,
126 /// Timestamp relative to `join_baseline` of the last progress update.
127 pub progress_timestamp: u64,
128 /// Baseline timestamp captured at consumer join time.
129 pub join_baseline: u64,
130 reserved: [u8; 4072],
131}
132
133/// 16-byte packed message header, 16-byte aligned.
134///
135/// Every committed run starts with this header followed by `payload_len` bytes
136/// of payload. `total_len` (used by the consumer view) is
137/// `sizeof(zbq_msg_header) + payload_len`.
138#[repr(C, align(16))]
139pub struct zbq_msg_header {
140 /// Length in bytes of the payload that follows this header.
141 pub payload_len: u32,
142 /// Number of file descriptors attached to this message (0..253).
143 pub fd_count: u8,
144 /// Message-specific flags (reserved, currently ignored on the wire).
145 pub flags: u8,
146 /// Reserved for future use.
147 pub reserved: u16,
148 /// Sequence ID used to correlate the ring entry with FD records sent
149 /// over the per-consumer side socket.
150 pub seq_id: u64,
151}
152
153// =============================================================================
154// Parameter & view structures
155// =============================================================================
156
157/// Parameters supplied to [`zbq_producer_create`].
158#[repr(C)]
159#[derive(Clone, Copy, Debug, PartialEq, Eq)]
160pub struct zbq_producer_params {
161 /// Size of the data ring. Must be non-zero and a multiple of 4096.
162 pub ring_capacity: u64,
163 /// Clog threshold (failed reservation attempts) before a forced scan.
164 pub clog_threshold: u64,
165 /// Nanoseconds after which a consumer is evicted if no progress is seen.
166 pub stall_timeout_ns: u64,
167 /// Maximum number of simultaneous consumer slots.
168 pub max_consumers: u32,
169}
170
171/// Result of a producer reservation query.
172///
173/// Describes a physically contiguous span of the ring that the producer is
174/// allowed to write into. The run never wraps around the physical end of the
175/// buffer.
176#[repr(C)]
177#[derive(Clone, Copy, Debug, PartialEq, Eq)]
178pub struct zbq_contiguous_run {
179 /// Pointer to the first writable byte inside the producer's ring mapping.
180 pub base: *mut c_void,
181 /// Largest number of bytes that may be written starting at `base`.
182 pub max_bytes: u64,
183 /// Logical byte offset of `base` (equal to the producer head at query time).
184 pub offset: u64,
185}
186
187/// Read-only view of a message inside the ring.
188///
189/// Returned by [`zbq_consumer_peek`].
190#[repr(C)]
191#[derive(Clone, Copy, Debug, PartialEq, Eq)]
192pub struct zbq_msg_view {
193 /// Pointer to the message header inside the read-only ring mapping.
194 pub header: *const zbq_msg_header,
195 /// Pointer to the payload bytes that follow the header.
196 pub payload: *const c_void,
197 /// Length of the payload in bytes.
198 pub payload_len: u64,
199 /// Logical offset of this message in the ring.
200 pub logical_offset: u64,
201 /// Total bytes occupied by this message (`header + payload`).
202 pub total_len: u64,
203}
204
205/// Payload sent from producer to consumer during the join handshake.
206#[repr(C)]
207pub struct zbq_join_payload {
208 /// Size of the data ring.
209 pub ring_capacity: u64,
210 /// Baseline timestamp used for progress measurements.
211 pub join_baseline: u64,
212 /// Initial head offset (consumer starts reading from here).
213 pub initial_head: u64,
214 /// PID of the producer process.
215 pub producer_pid: i32,
216 /// Slot index assigned to this consumer.
217 pub slot_index: u32,
218 /// Wakeup mode (`ZBQ_WAKEUP_PUSH` or `ZBQ_WAKEUP_PULL`).
219 pub wakeup_mode: u32,
220 reserved: [u8; 28],
221}
222
223// =============================================================================
224// Opaque handles
225// =============================================================================
226
227/// Opaque handle for a ring-buffer producer.
228///
229/// Created by [`zbq_producer_create`] and destroyed by [`zbq_producer_destroy`].
230/// Do not attempt to allocate or inspect this type directly.
231#[repr(C)]
232pub struct zbq_producer {
233 _private: [u8; 0],
234}
235
236/// Opaque handle for a ring-buffer consumer.
237///
238/// Created by [`zbq_consumer_join`] and destroyed by [`zbq_consumer_destroy`].
239/// Do not attempt to allocate or inspect this type directly.
240#[repr(C)]
241pub struct zbq_consumer {
242 _private: [u8; 0],
243}
244
245// =============================================================================
246// Compile-time layout checks (must match the C compile-time assertions)
247// =============================================================================
248
249const _: () = assert!(core::mem::size_of::<zbq_control_region>() == 256);
250const _: () = assert!(core::mem::align_of::<zbq_control_region>() == 64);
251const _: () = assert!(core::mem::size_of::<zbq_consumer_metadata>() == 4096);
252const _: () = assert!(core::mem::align_of::<zbq_consumer_metadata>() == 4096);
253const _: () = assert!(core::mem::size_of::<zbq_msg_header>() == 16);
254const _: () = assert!(core::mem::align_of::<zbq_msg_header>() == 16);
255
256// =============================================================================
257// Producer API
258// =============================================================================
259
260extern "C" {
261 /// Create a new producer ring-buffer.
262 ///
263 /// `params->ring_capacity` must be a non-zero multiple of 4096 and
264 /// `params->max_consumers` must be non-zero.
265 ///
266 /// On success the newly-allocated handle is written to `*out` and
267 /// `ZBQ_OK` is returned.
268 pub fn zbq_producer_create(
269 out: *mut *mut zbq_producer,
270 params: *const zbq_producer_params,
271 ) -> c_int;
272
273 /// Destroy a producer and release all associated resources (mappings, pidfds,
274 /// sockets, slots).
275 ///
276 /// Passing `NULL` is a no-op.
277 pub fn zbq_producer_destroy(prod: *mut zbq_producer);
278
279 /// Complete the join handshake for an incoming consumer connection.
280 ///
281 /// `conn_fd` is an already-accepted Unix-domain socket descriptor.
282 /// `mode` is the desired consumer wakeup mode.
283 ///
284 /// On success the assigned consumer slot index is written to `*out_slot_id`.
285 /// The caller retains ownership of `conn_fd` and should close it after this
286 /// call returns.
287 pub fn zbq_producer_handshake(
288 prod: *mut zbq_producer,
289 conn_fd: c_int,
290 mode: zbq_wakeup_mode,
291 out_slot_id: *mut u32,
292 ) -> c_int;
293
294 /// Query the largest physically contiguous writable run available to the
295 /// producer.
296 ///
297 /// If the queue is fully clogged this returns a run with `max_bytes == 0`.
298 pub fn zbq_producer_query(prod: *mut zbq_producer) -> zbq_contiguous_run;
299
300 /// Commit `written_bytes` previously written into the contiguous run
301 /// returned by the most recent [`zbq_producer_query`].
302 ///
303 /// Returns `true` on success and `false` if `written_bytes` exceeds the
304 /// queried `max_bytes` or if the producer handle is invalid.
305 pub fn zbq_producer_commit(
306 prod: *mut zbq_producer,
307 written_bytes: u64,
308 flags: zbq_commit_flags,
309 ) -> bool;
310
311 /// Atomically increment and return the next sequence ID for FD-bearing
312 /// messages.
313 ///
314 /// The caller should store the returned value into the message header
315 /// before calling [`zbq_producer_commit`].
316 pub fn zbq_producer_next_seq_id(prod: *mut zbq_producer) -> u64;
317
318 /// Broadcast file descriptors to all push-mode consumers.
319 ///
320 /// Sends an `SCM_RIGHTS` record on each consumer's side socket containing
321 /// exactly `fd_count` descriptors and the matching `seq_id`.
322 ///
323 /// This function is a no-op (zero overhead) when `fd_count == 0`.
324 pub fn zbq_producer_broadcast_fds(
325 prod: *mut zbq_producer,
326 seq_id: u64,
327 fds: *const c_int,
328 fd_count: u8,
329 ) -> c_int;
330
331 /// Scan for dead or hung consumers and evict them.
332 ///
333 /// Normally invoked automatically when the clog threshold is reached, but
334 /// may be called explicitly when `ZBQ_COMMIT_FORCE_SCAN` is used or by
335 /// application logic.
336 pub fn zbq_producer_scan(prod: *mut zbq_producer) -> c_int;
337}
338
339// =============================================================================
340// Consumer API
341// =============================================================================
342
343extern "C" {
344 /// Join a ring-buffer as a consumer.
345 ///
346 /// `rendezvous_fd` must be a connected Unix-domain socket to the producer.
347 /// `mode` selects push (futex) or pull (self-poll) behaviour.
348 ///
349 /// On success the consumer handle is written to `*out` and `ZBQ_OK` is
350 /// returned. The caller retains ownership of `rendezvous_fd`.
351 pub fn zbq_consumer_join(
352 rendezvous_fd: c_int,
353 mode: zbq_wakeup_mode,
354 out: *mut *mut zbq_consumer,
355 ) -> c_int;
356
357 /// Destroy a consumer and release all associated resources.
358 ///
359 /// Passing `NULL` is a no-op.
360 pub fn zbq_consumer_destroy(cons: *mut zbq_consumer);
361
362 /// Check whether the producer process is still alive.
363 ///
364 /// This polls the `pidfd` opened on the producer at join time.
365 pub fn zbq_consumer_is_producer_alive(cons: *mut zbq_consumer) -> bool;
366
367 /// Load the current producer head with acquire semantics.
368 pub fn zbq_consumer_acquire_head(cons: *const zbq_consumer) -> u64;
369
370 /// Inspect the message at the consumer's current tail.
371 ///
372 /// If data is available, `out` is populated with a [`zbq_msg_view`]
373 /// pointing into the read-only ring mapping.
374 ///
375 /// Returns `ZBQ_ERROR_AGAIN` when the caller has reached the current
376 /// producer head and there is no new data yet.
377 pub fn zbq_consumer_peek(cons: *mut zbq_consumer, out: *mut zbq_msg_view) -> c_int;
378
379 /// **Do not use this on untrusted producers.**
380 ///
381 /// Attempt to receive the file descriptors attached to the message with the
382 /// given `expected_seq_id` over the consumer's side socket.
383 ///
384 /// `out_fds` must point to an array with room for at least
385 /// [`ZBQ_MAX_FD_PER_MSG`] elements. The actual number received is written
386 /// to `*out_fd_count`.
387 pub fn zbq_consumer_recv_fds(
388 cons: *mut zbq_consumer,
389 expected_seq_id: u64,
390 out_fds: *mut c_int,
391 out_fd_count: *mut u8,
392 ) -> c_int;
393
394 /// Atomically publish the new consumer tail and progress timestamp.
395 ///
396 /// `new_tail` must not exceed the value returned by the most recent
397 /// [`zbq_consumer_acquire_head`]. This must be called after processing
398 /// a batch of messages to release ring space back to the producer.
399 pub fn zbq_consumer_publish(cons: *mut zbq_consumer, new_tail: u64);
400
401 /// Read the current broadcast futex token.
402 ///
403 /// Push-mode consumers should load this before calling
404 /// [`zbq_consumer_wait_broadcast`] and reload it after waking up.
405 pub fn zbq_consumer_get_futex_token(cons: *const zbq_consumer) -> u32;
406
407 /// Block until the producer commits new data (push mode).
408 ///
409 /// `token` should be the value obtained from
410 /// [`zbq_consumer_get_futex_token`] immediately before this call.
411 ///
412 /// Returns `ZBQ_ERROR_AGAIN` on spurious wake-up — the caller should
413 /// re-read the token and retry.
414 pub fn zbq_consumer_wait_broadcast(cons: *mut zbq_consumer, token: u32) -> c_int;
415
416 /// Load the consumer's local tail with acquire semantics.
417 pub fn zbq_consumer_tail(cons: *const zbq_consumer) -> u64;
418
419 /// Return a read-only pointer to the shared control region.
420 pub fn zbq_consumer_ctrl(cons: *const zbq_consumer) -> *const zbq_control_region;
421
422 /// Return a mutable pointer to this consumer's private metadata page.
423 pub fn zbq_consumer_meta(cons: *mut zbq_consumer) -> *mut zbq_consumer_metadata;
424}