use core::task::{Context, Poll};
use libbpf_rs::{query::MapInfoIter, Map};
use std::io::Result;
use std::num::NonZeroUsize;
use std::os::fd::{AsFd, AsRawFd, RawFd};
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncRead, ReadBuf};

// Flag bits and header size from the kernel's BPF ring buffer ABI; these
// match the constants libbpf uses in its own ring buffer consumer.
const BPF_RINGBUF_BUSY_BIT: u32 = 1 << 31;
const BPF_RINGBUF_DISCARD_BIT: u32 = 1 << 30;
const BPF_RINGBUF_HDR_SZ: u32 = 8;

/// An async consumer for a BPF_MAP_TYPE_RINGBUF map, exposed through
/// tokio's `AsyncRead` so samples can be read with the usual I/O traits.
pub struct RingBuffer {
    mask: u64,
    async_fd: AsyncFd<RawFd>,
    consumer: *mut core::ffi::c_void,
    producer: *mut core::ffi::c_void,
    data: *mut core::ffi::c_void,
}

impl RingBuffer {
    pub fn new(map: &Map) -> Self {
        // Look up the map's max_entries (the size of the data area in
        // bytes) by name, since it is not available on `map` directly.
        let mut max_entries = 0;
        for m in MapInfoIter::default() {
            if let Ok(name) = m.name.to_str() {
                if name == map.name() {
                    max_entries = m.max_entries;
                }
            }
        }
        let psize = page_size::get();
        let fd = map.as_fd().as_raw_fd();
        // The first page holds the consumer position and is the only part
        // of the ring buffer we map writable.
        let consumer = unsafe {
            nix::sys::mman::mmap(
                None,
                NonZeroUsize::new(psize).expect("page size must not be zero"),
                nix::sys::mman::ProtFlags::PROT_WRITE | nix::sys::mman::ProtFlags::PROT_READ,
                nix::sys::mman::MapFlags::MAP_SHARED,
                fd,
                0,
            )
            .unwrap()
        };
        // The producer position page plus the data area, mapped read-only.
        // The kernel maps the data area twice back-to-back, so a sample that
        // wraps around the end of the ring can be read as one contiguous
        // slice; that is why the length is psize + 2 * max_entries.
        let producer = unsafe {
            nix::sys::mman::mmap(
                None,
                NonZeroUsize::new(psize + 2 * max_entries as usize)
                    .expect("page size + 2 * max_entries must not be zero"),
                nix::sys::mman::ProtFlags::PROT_READ,
                nix::sys::mman::MapFlags::MAP_SHARED,
                fd,
                psize as i64,
            )
            .unwrap()
        };

        RingBuffer {
            // max_entries is a power of two, so this mask maps ring
            // positions to offsets within the data area.
            mask: (max_entries - 1) as u64,
            async_fd: AsyncFd::with_interest(fd, tokio::io::Interest::READABLE).unwrap(),
            consumer,
            producer,
            data: unsafe { producer.add(psize) },
        }
    }

    // Mirrors libbpf's roundup_len(): strip the busy/discard flag bits,
    // add the 8-byte header, and round up to a multiple of 8.
    fn roundup_len(mut len: u32) -> u32 {
        len <<= 2;
        len >>= 2;
        len += BPF_RINGBUF_HDR_SZ;
        (len + 7) / 8 * 8
    }
}
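
// A quick sanity check for the header math above (not part of the original
// code): roundup_len() must ignore the flag bits, account for the 8-byte
// header, and round up to the ring's 8-byte alignment.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn roundup_len_strips_flags_and_aligns() {
        // 5 data bytes + 8 header bytes = 13, rounded up to 16.
        assert_eq!(RingBuffer::roundup_len(5), 16);
        // Flag bits in the length word do not change the result.
        assert_eq!(RingBuffer::roundup_len(5 | BPF_RINGBUF_DISCARD_BIT), 16);
        assert_eq!(RingBuffer::roundup_len(5 | BPF_RINGBUF_BUSY_BIT), 16);
        // An already-aligned payload only gains the header.
        assert_eq!(RingBuffer::roundup_len(8), 16);
    }
}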

impl Drop for RingBuffer {
    fn drop(&mut self) {
        let psize = page_size::get();
        unsafe {
            // Unmap both regions; the lengths must match the ones used in
            // new(), where the data area size is mask + 1 (max_entries).
            let _ = nix::sys::mman::munmap(self.consumer, psize);
            let _ = nix::sys::mman::munmap(self.producer, psize + 2 * (self.mask as usize + 1));
        }
    }
}

impl AsyncRead for RingBuffer {
    fn poll_read(
        self: core::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        loop {
            // Load the consumer and producer positions; the fences pair
            // with the kernel's stores, as in libbpf's own consumer.
            let mut cons_pos =
                unsafe { std::ptr::read_volatile(self.consumer as *const std::os::raw::c_ulong) };
            std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
            let prod_pos =
                unsafe { std::ptr::read_volatile(self.producer as *const std::os::raw::c_ulong) };
            std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
            if cons_pos < prod_pos {
                // Each sample starts with a 32-bit length word that also
                // carries the busy and discard flags.
                let len_ptr = unsafe { self.data.offset((cons_pos & self.mask) as isize) };
                let mut len = unsafe { std::ptr::read_volatile(len_ptr as *const u32) };
                std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);

                // If the busy bit is set, the producer has reserved this
                // sample but not committed it yet; fall through and wait.
                if (len & BPF_RINGBUF_BUSY_BIT) == 0 {
                    cons_pos += RingBuffer::roundup_len(len) as u64;
                    if (len & BPF_RINGBUF_DISCARD_BIT) == 0 {
                        // The data area is mapped PROT_READ, so only a
                        // shared slice is sound here.
                        let sample = unsafe {
                            std::slice::from_raw_parts(
                                len_ptr.offset(BPF_RINGBUF_HDR_SZ as isize) as *const u8,
                                len as usize,
                            )
                        };
                        // Truncate the sample if the caller's buffer is too
                        // small to hold all of it.
                        len = std::cmp::min(len, buf.capacity() as u32);
                        buf.put_slice(&sample[..len as usize]);
                    }
                    std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
                    // Publish the new consumer position back to the kernel.
                    unsafe {
                        std::ptr::write_volatile(
                            self.consumer as *mut std::os::raw::c_ulong,
                            cons_pos,
                        )
                    };
                    if (len & BPF_RINGBUF_DISCARD_BIT) == 0 {
                        return Poll::Ready(Ok(()));
                    } else {
                        // Discarded sample: skip it and try the next one.
                        continue;
                    }
                }
            }
            // Nothing consumable: wait until the ring buffer's fd becomes
            // readable, i.e. the producer has committed new data.
            let mut ev = futures::ready!(self.async_fd.poll_read_ready(cx))?;
            ev.clear_ready();
        }
    }
}
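
// A minimal usage sketch, not part of the code above. It assumes a libbpf-rs
// object has already been loaded and that `map` is its BPF_MAP_TYPE_RINGBUF
// map; the 4096-byte buffer is an arbitrary placeholder and must be at least
// as large as the largest sample the BPF program submits.
#[allow(dead_code)]
async fn consume(map: &Map) -> Result<()> {
    use tokio::io::AsyncReadExt;

    let mut ring = RingBuffer::new(map);
    let mut buf = vec![0u8; 4096];
    loop {
        // Each successful read yields exactly one committed sample.
        let n = ring.read(&mut buf).await?;
        println!("got a {n}-byte sample: {:x?}", &buf[..n]);
    }
}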