Skip to main content

vpp_plugin/vlib/
counter.rs

1//! Per-thread, per-object counters
2//!
3//! Optimised counters suitable for use in node functions.
4
5use std::{
6    ffi::CString,
7    mem::ManuallyDrop,
8    sync::atomic::{AtomicU64, Ordering},
9};
10
11use crate::{
12    bindings::{
13        vlib_combined_counter_main_t, vlib_counter_t, vlib_free_combined_counter,
14        vlib_free_simple_counter, vlib_helper_zero_combined_counter,
15        vlib_helper_zero_simple_counter, vlib_simple_counter_main_t,
16        vlib_validate_combined_counter, vlib_validate_simple_counter,
17    },
18    vlib::{BarrierHeldMainRef, MainRef},
19    vppinfra,
20};
21
22/// Per-thread, per-object simple counters
23pub struct SimpleCounter {
24    counter: vlib_simple_counter_main_t,
25}
26
27impl SimpleCounter {
28    /// Create a new `SimpleCounter`
29    pub fn new(name: &str, stat_segment_name: &str) -> Self {
30        let name_cstr = CString::new(name).unwrap();
31        let stats_segment_name_cstr = CString::new(stat_segment_name).unwrap();
32        Self {
33            counter: vlib_simple_counter_main_t {
34                counters: std::ptr::null_mut(),
35                name: name_cstr.into_raw(),
36                stat_segment_name: stats_segment_name_cstr.into_raw(),
37                stats_entry_index: 0,
38            },
39        }
40    }
41
42    /// Allocate the given index for use, zeroing it in the process
43    ///
44    /// This is similar to the VPP C API `vlib_validate_simple_counter`.
45    ///
46    /// Note that no checks are performed to ensure that the index isn't already in use, but it
47    /// doesn't result in unsoundness if the same value is allocated for an already-allocated and
48    /// not dropped index.
49    pub fn allocate_index(&self, _vm: &BarrierHeldMainRef, index: u32) -> SimpleCounterIndex<'_> {
50        // SAFETY: self.counter is correctly initialised, the fact that the barrier is held
51        // (which can only be done from the main thread) guaranteed that no other threads are
52        // using the counters vector when it is potentially grown in size, and the zero without
53        // using atomics is safe for the same reason.
54        unsafe {
55            vlib_validate_simple_counter(std::ptr::addr_of!(self.counter).cast_mut(), index);
56            vlib_helper_zero_simple_counter(std::ptr::addr_of!(self.counter).cast_mut(), index);
57            SimpleCounterIndex::from_parts(self, index)
58        }
59    }
60}
61
62impl Drop for SimpleCounter {
63    fn drop(&mut self) {
64        // SAFETY: `self.counter` is correctly initialised and both `self.counter.name` and
65        // `self.counter.stat_segment_name` are pointers created from `CString`s
66        unsafe {
67            let _name_cstr = CString::from_raw(self.counter.name);
68            let _stat_segment_name_cstr = CString::from_raw(self.counter.stat_segment_name);
69            vlib_free_simple_counter(std::ptr::addr_of_mut!(self.counter));
70        }
71    }
72}
73
74// SAFETY: it's safe to drop the counter from other threads provided that there are no outstanding
75// references, which is guaranteed by safety preconditions.
76unsafe impl Send for SimpleCounter {}
77// SAFETY: anything that retrieves state that other threads may write to or modifies state is
78// guarded by suitable preconditions, such as SimpleCounter::allocate_index taking a
79// BarrierHeldMainRef reference ensuring it can only be called from the main thread with no
80// worker threads using it concurrently.
81unsafe impl Sync for SimpleCounter {}
82
83/// An allocated index for a [`SimpleCounter`]
84pub struct SimpleCounterIndex<'counter> {
85    counter: &'counter SimpleCounter,
86    index: u32,
87}
88
89impl<'counter> SimpleCounterIndex<'counter> {
90    /// Decomposes a simple counter index into its component parts
91    pub fn into_parts(self) -> (&'counter SimpleCounter, u32) {
92        let me = ManuallyDrop::new(self);
93        (me.counter, me.index)
94    }
95
96    /// Creates a `SimpleCounterIndex` from component parts
97    ///
98    /// # Safety
99    ///
100    /// - Must only be done from VPP worker threads or the main thread
101    /// - `index` must be a valid (previously allocated) index for the counter
102    pub unsafe fn from_parts(counter: &'counter SimpleCounter, index: u32) -> Self {
103        Self { counter, index }
104    }
105
106    fn counter_ptr(&self, vm: &MainRef) -> *mut u64 {
107        let thread_index = vm.thread_index();
108
109        // SAFETY: worker threads cannot be added after VPP has initialised and creation
110        // preconditions ensure `self.index` is valid
111        unsafe {
112            let this_thread_counters = *self.counter.counter.counters.add(thread_index as usize);
113
114            this_thread_counters.add(self.index as usize)
115        }
116    }
117
118    /// Increment this thread's per-thread counter
119    pub fn increment(&self, vm: &MainRef, count: u64) {
120        // SAFETY: no concurrent writers, since the counts are per-thread and safety conditions of zero() are upheld
121        unsafe {
122            let counter = self.counter_ptr(vm);
123            let new_count = *counter + count;
124            AtomicU64::from_ptr(counter).store(new_count, Ordering::Relaxed);
125        }
126    }
127
128    /// Zero all threads' per-thread counters
129    ///
130    /// # Safety
131    ///
132    /// - There must be no concurrent writers to this index. For example, worker threads
133    ///   without the barrier held.
134    pub unsafe fn zero(&self) {
135        // SAFETY: no concurrent writers
136        unsafe {
137            vlib_helper_zero_simple_counter(
138                std::ptr::addr_of!(self.counter.counter).cast_mut(),
139                self.index,
140            );
141        }
142    }
143
144    /// Get the total count, summing for all threads
145    // Note: &MainRef taken to ensure this is only called from a VPP main/worker thread
146    pub fn get(&self, _vm: &MainRef) -> u64 {
147        // SAFETY: counter is a valid vector
148        // Note: vlib_get_simple_counter doesn't use an atomic load and so isn't sound with
149        // concurrent writers.
150        unsafe {
151            let per_thread_counters =
152                vppinfra::vec::VecRef::from_raw(self.counter.counter.counters);
153            per_thread_counters
154                .iter()
155                .fold(0u64, |sum, counter_by_index| {
156                    let counter = AtomicU64::from_ptr(counter_by_index.add(self.index as usize));
157                    sum + counter.load(Ordering::Relaxed)
158                })
159        }
160    }
161}
162
163// SAFETY: methods can be called on the SimpleCounterIndex provided they are done so only on VPP
164// main/worker threads, which are ensured by non-unsafe methods taking a `&MainRef`.
165unsafe impl Send for SimpleCounterIndex<'_> {}
166// SAFETY: methods that mutate the counter index state do so via a combination of atomics and
167// per-thread state.
168unsafe impl Sync for SimpleCounterIndex<'_> {}
169
170/// Combined count of packets and bytes
171#[derive(Copy, Clone, Debug, PartialEq, Eq)]
172pub struct CombinedCount {
173    /// The number of packets
174    pub packets: u64,
175    /// The number of bytes
176    pub bytes: u64,
177}
178
179/// Per-thread, per-object combined packets and bytes counters
180pub struct CombinedCounter {
181    counter: vlib_combined_counter_main_t,
182}
183
184impl CombinedCounter {
185    /// Create a new `CombinedCounter`
186    pub fn new(name: &str, stat_segment_name: &str) -> Self {
187        let name_cstr = CString::new(name).unwrap();
188        let stats_segment_name_cstr = CString::new(stat_segment_name).unwrap();
189        Self {
190            counter: vlib_combined_counter_main_t {
191                counters: std::ptr::null_mut(),
192                name: name_cstr.into_raw(),
193                stat_segment_name: stats_segment_name_cstr.into_raw(),
194                stats_entry_index: 0,
195            },
196        }
197    }
198
199    /// Allocate the given index for use, zeroing it in the process
200    ///
201    /// This is similar to the VPP C API `vlib_validate_combined_counter`.
202    ///
203    /// Note that no checks are performed to ensure that the index isn't already in use, but it
204    /// doesn't result in unsoundness if the same value is allocated for an already-allocated and
205    /// not dropped index.
206    pub fn allocate_index(&self, _vm: &BarrierHeldMainRef, index: u32) -> CombinedCounterIndex<'_> {
207        // SAFETY: self.counter is correctly initialised, the fact that the barrier is held
208        // (which can only be done from the main thread) guaranteed that no other threads are
209        // using the counters vector when it is potentially grown in size, and the zero without
210        // using atomics is safe for the same reason.
211        unsafe {
212            vlib_validate_combined_counter(std::ptr::addr_of!(self.counter).cast_mut(), index);
213            vlib_helper_zero_combined_counter(std::ptr::addr_of!(self.counter).cast_mut(), index);
214            CombinedCounterIndex::from_parts(self, index)
215        }
216    }
217}
218
219impl Drop for CombinedCounter {
220    fn drop(&mut self) {
221        // SAFETY: `self.counter` is correctly initialised and both `self.counter.name` and
222        // `self.counter.stat_segment_name` are pointers created from `CString`s
223        unsafe {
224            let _name_cstr = CString::from_raw(self.counter.name);
225            let _stat_segment_name_cstr = CString::from_raw(self.counter.stat_segment_name);
226            vlib_free_combined_counter(std::ptr::addr_of_mut!(self.counter));
227        }
228    }
229}
230
231// SAFETY: it's safe to drop the counter from other threads provided that there are no outstanding
232// references, which is guaranteed by safety preconditions.
233unsafe impl Send for CombinedCounter {}
234// SAFETY: anything that retrieves state that other threads may write to or modifies state is
235// guarded by suitable preconditions, such as CombinedCounter::allocate_index taking a
236// BarrierHeldMainRef reference ensuring it can only be called from the main thread with no
237// worker threads using it concurrently.
238unsafe impl Sync for CombinedCounter {}
239
240/// An allocated index for a [`CombinedCounter`]
241pub struct CombinedCounterIndex<'counter> {
242    counter: &'counter CombinedCounter,
243    index: u32,
244}
245
246impl<'counter> CombinedCounterIndex<'counter> {
247    /// Decomposes a combined counter index into its component parts
248    pub fn into_parts(self) -> (&'counter CombinedCounter, u32) {
249        let me = ManuallyDrop::new(self);
250        (me.counter, me.index)
251    }
252
253    /// Creates a `CombinedCounterIndex` from component parts
254    ///
255    /// # Safety
256    ///
257    /// - Must only be done from VPP worker threads or the main thread
258    /// - `index` must be a valid (previously allocated) index for the counter
259    pub unsafe fn from_parts(counter: &'counter CombinedCounter, index: u32) -> Self {
260        Self { counter, index }
261    }
262
263    fn counter_ptr(&self, vm: &MainRef) -> *mut vlib_counter_t {
264        let thread_index = vm.thread_index();
265
266        // SAFETY: worker threads cannot be added after VPP has initialised and creation
267        // preconditions ensure `self.index` is valid
268        unsafe {
269            let this_thread_counters = *self.counter.counter.counters.add(thread_index as usize);
270
271            this_thread_counters.add(self.index as usize)
272        }
273    }
274
275    /// Increment this thread's per-thread counter
276    pub fn increment(&self, vm: &MainRef, packets: u64, bytes: u64) {
277        // SAFETY: no concurrent writers, since the counts are per-thread and safety conditions of zero() are upheld
278        unsafe {
279            let counter = self.counter_ptr(vm);
280            let new_packets = (*counter).packets + packets;
281            AtomicU64::from_ptr(std::ptr::addr_of_mut!((*counter).packets))
282                .store(new_packets, Ordering::Relaxed);
283            let new_bytes = (*counter).bytes + bytes;
284            AtomicU64::from_ptr(std::ptr::addr_of_mut!((*counter).bytes))
285                .store(new_bytes, Ordering::Relaxed);
286        }
287    }
288
289    /// Zero all threads' per-thread counters
290    ///
291    /// # Safety
292    ///
293    /// - There must be no concurrent writers to this index. For example, worker threads
294    ///   without the barrier held.
295    pub unsafe fn zero(&self) {
296        // SAFETY: no concurrent writers
297        unsafe {
298            vlib_helper_zero_combined_counter(
299                std::ptr::addr_of!(self.counter.counter).cast_mut(),
300                self.index,
301            );
302        }
303    }
304
305    /// Get the total count, summing for all threads
306    // Note: &MainRef taken to ensure this is only called from a VPP main/worker thread
307    pub fn get(&self, _vm: &MainRef) -> CombinedCount {
308        // SAFETY: counter is a valid vector
309        // Note: vlib_get_combined_counter doesn't use an atomic load and so isn't sound with
310        // concurrent writers.
311        unsafe {
312            let per_thread_counters =
313                vppinfra::vec::VecRef::from_raw(self.counter.counter.counters);
314            per_thread_counters.iter().fold(
315                CombinedCount {
316                    packets: 0,
317                    bytes: 0,
318                },
319                |sum, counter_by_index| {
320                    let packets = AtomicU64::from_ptr(std::ptr::addr_of_mut!(
321                        (*counter_by_index.add(self.index as usize)).packets
322                    ));
323                    let bytes = AtomicU64::from_ptr(std::ptr::addr_of_mut!(
324                        (*counter_by_index.add(self.index as usize)).bytes
325                    ));
326                    CombinedCount {
327                        packets: sum.packets + packets.load(Ordering::Relaxed),
328                        bytes: sum.bytes + bytes.load(Ordering::Relaxed),
329                    }
330                },
331            )
332        }
333    }
334}
335
336// SAFETY: methods can be called on the CombinedCounterIndex provided they are done so only on VPP
337// main/worker threads, which are ensured by non-unsafe methods taking a `&MainRef`.
338unsafe impl Send for CombinedCounterIndex<'_> {}
339// SAFETY: methods that mutate the counter index state do so via a combination of atomics and
340// per-thread state.
341unsafe impl Sync for CombinedCounterIndex<'_> {}