libbpf_rs/
ringbuf.rs

1use core::ffi::c_void;
2use std::fmt::Debug;
3use std::fmt::Formatter;
4use std::fmt::Result as FmtResult;
5use std::ops::Deref as _;
6use std::ops::DerefMut as _;
7use std::os::raw::c_ulong;
8use std::os::unix::prelude::AsRawFd;
9use std::os::unix::prelude::BorrowedFd;
10use std::ptr::null_mut;
11use std::ptr::NonNull;
12use std::slice;
13use std::time::Duration;
14
15use crate::util;
16use crate::util::validate_bpf_ret;
17use crate::AsRawLibbpf;
18use crate::Error;
19use crate::ErrorExt as _;
20use crate::MapCore;
21use crate::MapType;
22use crate::Result;
23
24type Cb<'a> = Box<dyn FnMut(&[u8]) -> i32 + 'a>;
25
26struct RingBufferCallback<'a> {
27    cb: Cb<'a>,
28}
29
30impl<'a> RingBufferCallback<'a> {
31    fn new<F>(cb: F) -> Self
32    where
33        F: FnMut(&[u8]) -> i32 + 'a,
34    {
35        RingBufferCallback { cb: Box::new(cb) }
36    }
37}
38
39impl Debug for RingBufferCallback<'_> {
40    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
41        let Self { cb } = self;
42        f.debug_struct("RingBufferCallback")
43            .field("cb", &(cb.deref() as *const _))
44            .finish()
45    }
46}
47
48/// Builds [`RingBuffer`] instances.
49///
50/// `ringbuf`s are a special kind of [`Map`][crate::Map], used to transfer data
51/// between [`Program`][crate::Program]s and userspace. As of Linux 5.8, the
52/// `ringbuf` map is now preferred over the `perf buffer`.
53#[derive(Debug, Default)]
54pub struct RingBufferBuilder<'slf, 'cb> {
55    fd_callbacks: Vec<(BorrowedFd<'slf>, RingBufferCallback<'cb>)>,
56}
57
58impl<'slf, 'cb: 'slf> RingBufferBuilder<'slf, 'cb> {
59    /// Create a new `RingBufferBuilder` object.
60    pub fn new() -> Self {
61        RingBufferBuilder {
62            fd_callbacks: vec![],
63        }
64    }
65
66    /// Add a new ringbuf `map` and associated `callback` to this ring buffer
67    /// manager. The callback should take one argument, a slice of raw bytes,
68    /// and return an i32.
69    ///
70    /// Negative return values in the callback will stop ring buffer consumption early and
71    /// propagate the error code to the polling caller.
72    ///
73    /// The callback provides a raw byte slice. You may find libraries such as
74    /// [`plain`](https://crates.io/crates/plain) helpful.
75    pub fn add<NewF>(&mut self, map: &'slf dyn MapCore, callback: NewF) -> Result<&mut Self>
76    where
77        NewF: FnMut(&[u8]) -> i32 + 'cb,
78    {
79        if map.map_type() != MapType::RingBuf {
80            return Err(Error::with_invalid_data("Must use a RingBuf map"));
81        }
82        self.fd_callbacks
83            .push((map.as_fd(), RingBufferCallback::new(callback)));
84        Ok(self)
85    }
86
87    /// Build a new [`RingBuffer`]. Must have added at least one ringbuf.
88    pub fn build(self) -> Result<RingBuffer<'cb>> {
89        let mut cbs = vec![];
90        let mut rb_ptr: Option<NonNull<libbpf_sys::ring_buffer>> = None;
91        let c_sample_cb: libbpf_sys::ring_buffer_sample_fn = Some(Self::call_sample_cb);
92
93        for (fd, callback) in self.fd_callbacks {
94            let mut sample_cb = Box::new(callback);
95            match rb_ptr {
96                None => {
97                    // Allocate a new ringbuf manager and add a ringbuf to it
98                    // SAFETY: All pointers are valid or rightly NULL.
99                    //         The object referenced by `sample_cb` is
100                    //         not modified by `libbpf`
101                    let ptr = unsafe {
102                        libbpf_sys::ring_buffer__new(
103                            fd.as_raw_fd(),
104                            c_sample_cb,
105                            sample_cb.deref_mut() as *mut _ as *mut _,
106                            null_mut(),
107                        )
108                    };
109                    let ptr = validate_bpf_ret(ptr).context("failed to create new ring buffer")?;
110                    rb_ptr = Some(ptr)
111                }
112                Some(mut ptr) => {
113                    // Add a ringbuf to the existing ringbuf manager
114                    // SAFETY: All pointers are valid or rightly NULL.
115                    //         The object referenced by `sample_cb` is
116                    //         not modified by `libbpf`
117                    let err = unsafe {
118                        libbpf_sys::ring_buffer__add(
119                            ptr.as_ptr(),
120                            fd.as_raw_fd(),
121                            c_sample_cb,
122                            sample_cb.deref_mut() as *mut _ as *mut _,
123                        )
124                    };
125
126                    // Handle errors
127                    if err != 0 {
128                        // SAFETY: The pointer is valid.
129                        let () = unsafe { libbpf_sys::ring_buffer__free(ptr.as_mut()) };
130                        return Err(Error::from_raw_os_error(err));
131                    }
132                }
133            }
134
135            let () = cbs.push(sample_cb);
136        }
137
138        match rb_ptr {
139            Some(ptr) => Ok(RingBuffer { ptr, _cbs: cbs }),
140            None => Err(Error::with_invalid_data(
141                "You must add at least one ring buffer map and callback before building",
142            )),
143        }
144    }
145
146    unsafe extern "C" fn call_sample_cb(ctx: *mut c_void, data: *mut c_void, size: c_ulong) -> i32 {
147        let callback_struct = ctx as *mut RingBufferCallback<'_>;
148        let callback = unsafe { (*callback_struct).cb.as_mut() };
149        let slice = unsafe { slice::from_raw_parts(data as *const u8, size as usize) };
150
151        callback(slice)
152    }
153}
154
155/// The canonical interface for managing a collection of `ringbuf` maps.
156///
157/// `ringbuf`s are a special kind of [`Map`][crate::Map], used to transfer data
158/// between [`Program`][crate::Program]s and userspace. As of Linux 5.8, the
159/// `ringbuf` map is now preferred over the `perf buffer`.
160#[derive(Debug)]
161pub struct RingBuffer<'cb> {
162    ptr: NonNull<libbpf_sys::ring_buffer>,
163    #[allow(clippy::vec_box)]
164    _cbs: Vec<Box<RingBufferCallback<'cb>>>,
165}
166
167impl RingBuffer<'_> {
168    /// Poll from all open ring buffers, calling the registered callback for
169    /// each one. Polls continually until we either run out of events to consume
170    /// or `timeout` is reached. If `timeout` is `Duration::MAX`, this will block
171    /// indefinitely until an event occurs.
172    ///
173    /// Return the amount of events consumed, or a negative value in case of error.
174    pub fn poll_raw(&self, timeout: Duration) -> i32 {
175        let mut timeout_ms = -1;
176        if timeout != Duration::MAX {
177            timeout_ms = timeout.as_millis() as i32;
178        }
179
180        unsafe { libbpf_sys::ring_buffer__poll(self.ptr.as_ptr(), timeout_ms) }
181    }
182
183    /// Poll from all open ring buffers, calling the registered callback for
184    /// each one. Polls continually until we either run out of events to consume
185    /// or `timeout` is reached. If `timeout` is `Duration::MAX`, this will block
186    /// indefinitely until an event occurs.
187    pub fn poll(&self, timeout: Duration) -> Result<()> {
188        let ret = self.poll_raw(timeout);
189
190        util::parse_ret(ret)
191    }
192
193    /// Greedily consume from all open ring buffers, calling the registered
194    /// callback for each one. Consumes continually until we run out of events
195    /// to consume or one of the callbacks returns a non-zero integer.
196    ///
197    /// Return the amount of events consumed, or a negative value in case of error.
198    pub fn consume_raw(&self) -> i32 {
199        unsafe { libbpf_sys::ring_buffer__consume(self.ptr.as_ptr()) }
200    }
201
202    /// Greedily consume from all open ring buffers, calling the registered
203    /// callback for each one. Continues until `len` items have been consumed,
204    /// no more events are available, or a callback returns a non-zero value.
205    ///
206    /// Return the amount of events consumed, or a negative value in case of error.
207    pub fn consume_raw_n(&self, len: usize) -> i32 {
208        unsafe { libbpf_sys::ring_buffer__consume_n(self.ptr.as_ptr(), len as libbpf_sys::size_t) }
209    }
210
211    /// Greedily consume from all open ring buffers, calling the registered
212    /// callback for each one. Consumes continually until we run out of events
213    /// to consume or one of the callbacks returns a non-zero integer.
214    pub fn consume(&self) -> Result<()> {
215        let ret = self.consume_raw();
216
217        util::parse_ret(ret)
218    }
219
220    /// Get an fd that can be used to sleep until data is available
221    pub fn epoll_fd(&self) -> i32 {
222        unsafe { libbpf_sys::ring_buffer__epoll_fd(self.ptr.as_ptr()) }
223    }
224}
225
226impl AsRawLibbpf for RingBuffer<'_> {
227    type LibbpfType = libbpf_sys::ring_buffer;
228
229    /// Retrieve the underlying [`libbpf_sys::ring_buffer`].
230    fn as_libbpf_object(&self) -> NonNull<Self::LibbpfType> {
231        self.ptr
232    }
233}
234
235// SAFETY: `ring_buffer` objects can safely be polled from any thread.
236unsafe impl Send for RingBuffer<'_> {}
237
238impl Drop for RingBuffer<'_> {
239    fn drop(&mut self) {
240        unsafe {
241            libbpf_sys::ring_buffer__free(self.ptr.as_ptr());
242        }
243    }
244}
245
246#[cfg(test)]
247mod test {
248    use super::*;
249
250    /// Check that `RingBuffer` is `Send`.
251    #[test]
252    fn ringbuffer_is_send() {
253        fn test<T>()
254        where
255            T: Send,
256        {
257        }
258
259        test::<RingBuffer<'_>>();
260    }
261}