vpp_plugin/vlib/counter.rs
1//! Per-thread, per-object counters
2//!
3//! Optimised counters suitable for use in node functions.
4
5use std::{
6 ffi::CString,
7 mem::ManuallyDrop,
8 sync::atomic::{AtomicU64, Ordering},
9};
10
11use crate::{
12 bindings::{
13 vlib_combined_counter_main_t, vlib_counter_t, vlib_free_combined_counter,
14 vlib_free_simple_counter, vlib_helper_zero_combined_counter,
15 vlib_helper_zero_simple_counter, vlib_simple_counter_main_t,
16 vlib_validate_combined_counter, vlib_validate_simple_counter,
17 },
18 vlib::{BarrierHeldMainRef, MainRef},
19 vppinfra,
20};
21
22/// Per-thread, per-object simple counters
23pub struct SimpleCounter {
24 counter: vlib_simple_counter_main_t,
25}
26
27impl SimpleCounter {
28 /// Create a new `SimpleCounter`
29 pub fn new(name: &str, stat_segment_name: &str) -> Self {
30 let name_cstr = CString::new(name).unwrap();
31 let stats_segment_name_cstr = CString::new(stat_segment_name).unwrap();
32 Self {
33 counter: vlib_simple_counter_main_t {
34 counters: std::ptr::null_mut(),
35 name: name_cstr.into_raw(),
36 stat_segment_name: stats_segment_name_cstr.into_raw(),
37 stats_entry_index: 0,
38 },
39 }
40 }
41
42 /// Allocate the given index for use, zeroing it in the process
43 ///
44 /// This is similar to the VPP C API `vlib_validate_simple_counter`.
45 ///
46 /// Note that no checks are performed to ensure that the index isn't already in use, but it
47 /// doesn't result in unsoundness if the same value is allocated for an already-allocated and
48 /// not dropped index.
49 pub fn allocate_index(&self, _vm: &BarrierHeldMainRef, index: u32) -> SimpleCounterIndex<'_> {
50 // SAFETY: self.counter is correctly initialised, the fact that the barrier is held
51 // (which can only be done from the main thread) guaranteed that no other threads are
52 // using the counters vector when it is potentially grown in size, and the zero without
53 // using atomics is safe for the same reason.
54 unsafe {
55 vlib_validate_simple_counter(std::ptr::addr_of!(self.counter).cast_mut(), index);
56 vlib_helper_zero_simple_counter(std::ptr::addr_of!(self.counter).cast_mut(), index);
57 SimpleCounterIndex::from_parts(self, index)
58 }
59 }
60}
61
62impl Drop for SimpleCounter {
63 fn drop(&mut self) {
64 // SAFETY: `self.counter` is correctly initialised and both `self.counter.name` and
65 // `self.counter.stat_segment_name` are pointers created from `CString`s
66 unsafe {
67 let _name_cstr = CString::from_raw(self.counter.name);
68 let _stat_segment_name_cstr = CString::from_raw(self.counter.stat_segment_name);
69 vlib_free_simple_counter(std::ptr::addr_of_mut!(self.counter));
70 }
71 }
72}
73
74// SAFETY: it's safe to drop the counter from other threads provided that there are no outstanding
75// references, which is guaranteed by safety preconditions.
76unsafe impl Send for SimpleCounter {}
77// SAFETY: anything that retrieves state that other threads may write to or modifies state is
78// guarded by suitable preconditions, such as SimpleCounter::allocate_index taking a
79// BarrierHeldMainRef reference ensuring it can only be called from the main thread with no
80// worker threads using it concurrently.
81unsafe impl Sync for SimpleCounter {}
82
83/// An allocated index for a [`SimpleCounter`]
84pub struct SimpleCounterIndex<'counter> {
85 counter: &'counter SimpleCounter,
86 index: u32,
87}
88
89impl<'counter> SimpleCounterIndex<'counter> {
90 /// Decomposes a simple counter index into its component parts
91 pub fn into_parts(self) -> (&'counter SimpleCounter, u32) {
92 let me = ManuallyDrop::new(self);
93 (me.counter, me.index)
94 }
95
96 /// Creates a `SimpleCounterIndex` from component parts
97 ///
98 /// # Safety
99 ///
100 /// - Must only be done from VPP worker threads or the main thread
101 /// - `index` must be a valid (previously allocated) index for the counter
102 pub unsafe fn from_parts(counter: &'counter SimpleCounter, index: u32) -> Self {
103 Self { counter, index }
104 }
105
106 fn counter_ptr(&self, vm: &MainRef) -> *mut u64 {
107 let thread_index = vm.thread_index();
108
109 // SAFETY: worker threads cannot be added after VPP has initialised and creation
110 // preconditions ensure `self.index` is valid
111 unsafe {
112 let this_thread_counters = *self.counter.counter.counters.add(thread_index as usize);
113
114 this_thread_counters.add(self.index as usize)
115 }
116 }
117
118 /// Increment this thread's per-thread counter
119 pub fn increment(&self, vm: &MainRef, count: u64) {
120 // SAFETY: no concurrent writers, since the counts are per-thread and safety conditions of zero() are upheld
121 unsafe {
122 let counter = self.counter_ptr(vm);
123 let new_count = *counter + count;
124 AtomicU64::from_ptr(counter).store(new_count, Ordering::Relaxed);
125 }
126 }
127
128 /// Zero all threads' per-thread counters
129 ///
130 /// # Safety
131 ///
132 /// - There must be no concurrent writers to this index. For example, worker threads
133 /// without the barrier held.
134 pub unsafe fn zero(&self) {
135 // SAFETY: no concurrent writers
136 unsafe {
137 vlib_helper_zero_simple_counter(
138 std::ptr::addr_of!(self.counter.counter).cast_mut(),
139 self.index,
140 );
141 }
142 }
143
144 /// Get the total count, summing for all threads
145 // Note: &MainRef taken to ensure this is only called from a VPP main/worker thread
146 pub fn get(&self, _vm: &MainRef) -> u64 {
147 // SAFETY: counter is a valid vector
148 // Note: vlib_get_simple_counter doesn't use an atomic load and so isn't sound with
149 // concurrent writers.
150 unsafe {
151 let per_thread_counters =
152 vppinfra::vec::VecRef::from_raw(self.counter.counter.counters);
153 per_thread_counters
154 .iter()
155 .fold(0u64, |sum, counter_by_index| {
156 let counter = AtomicU64::from_ptr(counter_by_index.add(self.index as usize));
157 sum + counter.load(Ordering::Relaxed)
158 })
159 }
160 }
161}
162
163// SAFETY: methods can be called on the SimpleCounterIndex provided they are done so only on VPP
164// main/worker threads, which are ensured by non-unsafe methods taking a `&MainRef`.
165unsafe impl Send for SimpleCounterIndex<'_> {}
166// SAFETY: methods that mutate the counter index state do so via a combination of atomics and
167// per-thread state.
168unsafe impl Sync for SimpleCounterIndex<'_> {}
169
/// Combined count of packets and bytes
///
/// The [`Default`] value has both fields set to zero.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct CombinedCount {
    /// The number of packets
    pub packets: u64,
    /// The number of bytes
    pub bytes: u64,
}
178
179/// Per-thread, per-object combined packets and bytes counters
180pub struct CombinedCounter {
181 counter: vlib_combined_counter_main_t,
182}
183
184impl CombinedCounter {
185 /// Create a new `CombinedCounter`
186 pub fn new(name: &str, stat_segment_name: &str) -> Self {
187 let name_cstr = CString::new(name).unwrap();
188 let stats_segment_name_cstr = CString::new(stat_segment_name).unwrap();
189 Self {
190 counter: vlib_combined_counter_main_t {
191 counters: std::ptr::null_mut(),
192 name: name_cstr.into_raw(),
193 stat_segment_name: stats_segment_name_cstr.into_raw(),
194 stats_entry_index: 0,
195 },
196 }
197 }
198
199 /// Allocate the given index for use, zeroing it in the process
200 ///
201 /// This is similar to the VPP C API `vlib_validate_combined_counter`.
202 ///
203 /// Note that no checks are performed to ensure that the index isn't already in use, but it
204 /// doesn't result in unsoundness if the same value is allocated for an already-allocated and
205 /// not dropped index.
206 pub fn allocate_index(&self, _vm: &BarrierHeldMainRef, index: u32) -> CombinedCounterIndex<'_> {
207 // SAFETY: self.counter is correctly initialised, the fact that the barrier is held
208 // (which can only be done from the main thread) guaranteed that no other threads are
209 // using the counters vector when it is potentially grown in size, and the zero without
210 // using atomics is safe for the same reason.
211 unsafe {
212 vlib_validate_combined_counter(std::ptr::addr_of!(self.counter).cast_mut(), index);
213 vlib_helper_zero_combined_counter(std::ptr::addr_of!(self.counter).cast_mut(), index);
214 CombinedCounterIndex::from_parts(self, index)
215 }
216 }
217}
218
219impl Drop for CombinedCounter {
220 fn drop(&mut self) {
221 // SAFETY: `self.counter` is correctly initialised and both `self.counter.name` and
222 // `self.counter.stat_segment_name` are pointers created from `CString`s
223 unsafe {
224 let _name_cstr = CString::from_raw(self.counter.name);
225 let _stat_segment_name_cstr = CString::from_raw(self.counter.stat_segment_name);
226 vlib_free_combined_counter(std::ptr::addr_of_mut!(self.counter));
227 }
228 }
229}
230
231// SAFETY: it's safe to drop the counter from other threads provided that there are no outstanding
232// references, which is guaranteed by safety preconditions.
233unsafe impl Send for CombinedCounter {}
234// SAFETY: anything that retrieves state that other threads may write to or modifies state is
235// guarded by suitable preconditions, such as CombinedCounter::allocate_index taking a
236// BarrierHeldMainRef reference ensuring it can only be called from the main thread with no
237// worker threads using it concurrently.
238unsafe impl Sync for CombinedCounter {}
239
240/// An allocated index for a [`CombinedCounter`]
241pub struct CombinedCounterIndex<'counter> {
242 counter: &'counter CombinedCounter,
243 index: u32,
244}
245
246impl<'counter> CombinedCounterIndex<'counter> {
247 /// Decomposes a combined counter index into its component parts
248 pub fn into_parts(self) -> (&'counter CombinedCounter, u32) {
249 let me = ManuallyDrop::new(self);
250 (me.counter, me.index)
251 }
252
253 /// Creates a `CombinedCounterIndex` from component parts
254 ///
255 /// # Safety
256 ///
257 /// - Must only be done from VPP worker threads or the main thread
258 /// - `index` must be a valid (previously allocated) index for the counter
259 pub unsafe fn from_parts(counter: &'counter CombinedCounter, index: u32) -> Self {
260 Self { counter, index }
261 }
262
263 fn counter_ptr(&self, vm: &MainRef) -> *mut vlib_counter_t {
264 let thread_index = vm.thread_index();
265
266 // SAFETY: worker threads cannot be added after VPP has initialised and creation
267 // preconditions ensure `self.index` is valid
268 unsafe {
269 let this_thread_counters = *self.counter.counter.counters.add(thread_index as usize);
270
271 this_thread_counters.add(self.index as usize)
272 }
273 }
274
275 /// Increment this thread's per-thread counter
276 pub fn increment(&self, vm: &MainRef, packets: u64, bytes: u64) {
277 // SAFETY: no concurrent writers, since the counts are per-thread and safety conditions of zero() are upheld
278 unsafe {
279 let counter = self.counter_ptr(vm);
280 let new_packets = (*counter).packets + packets;
281 AtomicU64::from_ptr(std::ptr::addr_of_mut!((*counter).packets))
282 .store(new_packets, Ordering::Relaxed);
283 let new_bytes = (*counter).bytes + bytes;
284 AtomicU64::from_ptr(std::ptr::addr_of_mut!((*counter).bytes))
285 .store(new_bytes, Ordering::Relaxed);
286 }
287 }
288
289 /// Zero all threads' per-thread counters
290 ///
291 /// # Safety
292 ///
293 /// - There must be no concurrent writers to this index. For example, worker threads
294 /// without the barrier held.
295 pub unsafe fn zero(&self) {
296 // SAFETY: no concurrent writers
297 unsafe {
298 vlib_helper_zero_combined_counter(
299 std::ptr::addr_of!(self.counter.counter).cast_mut(),
300 self.index,
301 );
302 }
303 }
304
305 /// Get the total count, summing for all threads
306 // Note: &MainRef taken to ensure this is only called from a VPP main/worker thread
307 pub fn get(&self, _vm: &MainRef) -> CombinedCount {
308 // SAFETY: counter is a valid vector
309 // Note: vlib_get_combined_counter doesn't use an atomic load and so isn't sound with
310 // concurrent writers.
311 unsafe {
312 let per_thread_counters =
313 vppinfra::vec::VecRef::from_raw(self.counter.counter.counters);
314 per_thread_counters.iter().fold(
315 CombinedCount {
316 packets: 0,
317 bytes: 0,
318 },
319 |sum, counter_by_index| {
320 let packets = AtomicU64::from_ptr(std::ptr::addr_of_mut!(
321 (*counter_by_index.add(self.index as usize)).packets
322 ));
323 let bytes = AtomicU64::from_ptr(std::ptr::addr_of_mut!(
324 (*counter_by_index.add(self.index as usize)).bytes
325 ));
326 CombinedCount {
327 packets: sum.packets + packets.load(Ordering::Relaxed),
328 bytes: sum.bytes + bytes.load(Ordering::Relaxed),
329 }
330 },
331 )
332 }
333 }
334}
335
336// SAFETY: methods can be called on the CombinedCounterIndex provided they are done so only on VPP
337// main/worker threads, which are ensured by non-unsafe methods taking a `&MainRef`.
338unsafe impl Send for CombinedCounterIndex<'_> {}
339// SAFETY: methods that mutate the counter index state do so via a combination of atomics and
340// per-thread state.
341unsafe impl Sync for CombinedCounterIndex<'_> {}