libbpf_async/
ringbuf.rs

1// Copyright (C) 2021 and 2022 The libbpf-async Authors.
2//
3// Licensed under LGPL-2.1 or BSD-2-Clause.
4
5use core::task::{Context, Poll};
6use libbpf_rs::{query::MapInfoIter, Map};
7use std::io::Result;
8use std::num::NonZeroUsize;
9use std::os::fd::{AsFd, AsRawFd, RawFd};
10use tokio::io::unix::AsyncFd;
11use tokio::io::{AsyncRead, ReadBuf};
12
/// Flag in a record's 32-bit header length word: while it is set the reader does not
/// consume the record and waits for readiness instead.
const BPF_RINGBUF_BUSY_BIT: u32 = 1_u32 << 31;
/// Flag in a record's 32-bit header length word: the record is skipped by the reader
/// (its space is released without copying any payload).
const BPF_RINGBUF_DISCARD_BIT: u32 = 1_u32 << 30;
/// Number of header bytes that precede every record's payload in the data area.
const BPF_RINGBUF_HDR_SZ: u32 = 8_u32;
16
17pub struct RingBuffer {
18    mask: u64,
19    async_fd: AsyncFd<RawFd>,
20    consumer: *mut core::ffi::c_void,
21    producer: *mut core::ffi::c_void,
22    data: *mut core::ffi::c_void,
23}
24
25impl RingBuffer {
26    pub fn new(map: &Map) -> Self {
27        let mut max_entries = 0;
28        for m in MapInfoIter::default() {
29            if let Ok(name) = m.name.to_str() {
30                if name == map.name() {
31                    max_entries = m.max_entries;
32                }
33            }
34        }
35        let psize = page_size::get();
36        let fd = map.as_fd().as_raw_fd();
37        let consumer = unsafe {
38            nix::sys::mman::mmap(
39                None,
40                NonZeroUsize::new(psize).expect("page size must not be zero"),
41                nix::sys::mman::ProtFlags::PROT_WRITE | nix::sys::mman::ProtFlags::PROT_READ,
42                nix::sys::mman::MapFlags::MAP_SHARED,
43                fd,
44                0,
45            )
46            .unwrap()
47        };
48        let producer = unsafe {
49            nix::sys::mman::mmap(
50                None,
51                NonZeroUsize::new(psize + 2 * max_entries as usize)
52                    .expect("page size + 2 * max_entries must not be zero"),
53                nix::sys::mman::ProtFlags::PROT_READ,
54                nix::sys::mman::MapFlags::MAP_SHARED,
55                fd,
56                psize as i64,
57            )
58            .unwrap()
59        };
60
61        RingBuffer {
62            mask: (max_entries - 1) as u64,
63            async_fd: AsyncFd::with_interest(fd, tokio::io::Interest::READABLE).unwrap(),
64            consumer,
65            producer,
66            data: unsafe { producer.add(psize) },
67        }
68    }
69
70    fn roundup_len(mut len: u32) -> u32 {
71        len <<= 2;
72        len >>= 2;
73        len += BPF_RINGBUF_HDR_SZ;
74        (len + 7) / 8 * 8
75    }
76}
77
78impl Drop for RingBuffer {
79    fn drop(&mut self) {
80        let psize = page_size::get();
81        unsafe {
82            let _ = nix::sys::mman::munmap(self.consumer, psize);
83            let _ = nix::sys::mman::munmap(self.producer, psize + 2 * (self.mask as usize + 1));
84        }
85    }
86}
87
88impl AsyncRead for RingBuffer {
89    fn poll_read(
90        self: core::pin::Pin<&mut Self>,
91        cx: &mut Context<'_>,
92        buf: &mut ReadBuf<'_>,
93    ) -> Poll<Result<()>> {
94        loop {
95            let mut cons_pos =
96                unsafe { std::ptr::read_volatile(self.consumer as *const std::os::raw::c_ulong) };
97            std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
98            let prod_pos =
99                unsafe { std::ptr::read_volatile(self.producer as *const std::os::raw::c_ulong) };
100            std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
101            if cons_pos < prod_pos {
102                let len_ptr = unsafe { self.data.offset((cons_pos & self.mask) as isize) };
103                let mut len = unsafe { std::ptr::read_volatile(len_ptr as *const u32) };
104                std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
105
106                if (len & BPF_RINGBUF_BUSY_BIT) == 0 {
107                    cons_pos += RingBuffer::roundup_len(len) as u64;
108                    if (len & BPF_RINGBUF_DISCARD_BIT) == 0 {
109                        let sample = unsafe {
110                            std::slice::from_raw_parts_mut(
111                                len_ptr.offset(BPF_RINGBUF_HDR_SZ as isize) as *mut u8,
112                                len as usize,
113                            )
114                        };
115                        len = std::cmp::min(len, buf.capacity() as u32);
116                        buf.put_slice(&sample[..len as usize]);
117                    }
118                    std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
119                    unsafe {
120                        std::ptr::write_volatile(
121                            self.consumer as *mut std::os::raw::c_ulong,
122                            cons_pos,
123                        )
124                    };
125                    if (len & BPF_RINGBUF_DISCARD_BIT) == 0 {
126                        return Poll::Ready(Ok(()));
127                    } else {
128                        continue;
129                    }
130                }
131            }
132            let mut ev = futures::ready!(self.async_fd.poll_read_ready(cx))?;
133            ev.clear_ready();
134        }
135    }
136}