s2n_quic_platform/socket/
stats.rs1use core::{
5 fmt,
6 sync::atomic::{AtomicU64, Ordering},
7 task::Poll,
8};
9use s2n_quic_core::{
10 event::{self, EndpointPublisher},
11 io::event_loop,
12};
13use std::{
14 collections::VecDeque,
15 ffi::c_int,
16 io,
17 sync::{Arc, Mutex},
18};
19
/// Maximum number of error codes kept in a single error queue.
///
/// Both the shared per-direction queue and the receiver's scratch buffer are
/// pre-allocated with this capacity.
const ERROR_QUEUE_CAP: usize = 256;

/// Error code type stored in the queues (a C `int`).
type Error = c_int;
22
23pub fn channel() -> (Sender, Receiver) {
24 let state = Arc::new(State::default());
25
26 let sender = Sender(state.clone());
27
28 let recv = Receiver {
29 state,
30 pending_errors: VecDeque::with_capacity(ERROR_QUEUE_CAP),
31 };
32
33 (sender, recv)
34}
35
36#[derive(Clone)]
37pub struct Sender(Arc<State>);
38
39impl fmt::Debug for Sender {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("Sender").finish_non_exhaustive()
42 }
43}
44
45impl Sender {
46 #[inline]
47 pub fn send(&self) -> &Stats {
48 &self.0.send
49 }
50
51 #[inline]
52 pub fn recv(&self) -> &Stats {
53 &self.0.recv
54 }
55}
56
57pub struct Receiver {
58 state: Arc<State>,
59 pending_errors: VecDeque<Error>,
60}
61
62impl fmt::Debug for Receiver {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_struct("Receiver").finish_non_exhaustive()
65 }
66}
67
68impl event_loop::Stats for Receiver {
69 #[inline]
70 fn publish<P: EndpointPublisher>(&mut self, publisher: &mut P) {
71 self.state.send.publish(
72 publisher,
73 &mut self.pending_errors,
74 |publisher, errno| {
75 publisher.on_platform_tx_error(event::builder::PlatformTxError { errno });
76 },
77 |publisher, metrics| publisher.on_platform_tx(metrics.into()),
78 );
79 self.state.recv.publish(
80 publisher,
81 &mut self.pending_errors,
82 |publisher, errno| {
83 publisher.on_platform_rx_error(event::builder::PlatformRxError { errno });
84 },
85 |publisher, metrics| publisher.on_platform_rx(metrics.into()),
86 );
87 }
88}
89
90#[derive(Default)]
91struct State {
92 send: Stats,
93 recv: Stats,
94}
95
96pub struct Stats {
97 syscalls: AtomicU64,
98 blocked: AtomicU64,
99 packets: AtomicU64,
100 errors: Mutex<VecDeque<Error>>,
101 total_errors: AtomicU64,
102 dropped_errors: AtomicU64,
103}
104
105impl Default for Stats {
106 fn default() -> Self {
107 Self {
108 syscalls: Default::default(),
109 blocked: Default::default(),
110 packets: Default::default(),
111 errors: Mutex::new(VecDeque::with_capacity(ERROR_QUEUE_CAP)),
112 total_errors: Default::default(),
113 dropped_errors: Default::default(),
114 }
115 }
116}
117
118impl fmt::Debug for Stats {
119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 f.debug_struct("Stats").finish_non_exhaustive()
121 }
122}
123
124impl Stats {
125 #[inline]
126 pub fn on_operation<T, F>(&self, res: &Poll<io::Result<T>>, count_packets: F)
127 where
128 F: FnOnce(&T) -> usize,
129 {
130 match res {
131 Poll::Ready(res) => {
132 self.on_operation_result(res, count_packets);
133 }
134 Poll::Pending => {
135 self.on_operation_pending();
136 }
137 }
138 }
139
140 #[inline]
141 pub fn on_operation_result<T, F>(&self, res: &io::Result<T>, count_packets: F)
142 where
143 F: FnOnce(&T) -> usize,
144 {
145 match res {
146 Ok(value) => {
147 let packets = count_packets(value);
148 self.on_operation_ready(packets);
149 }
150 Err(err)
151 if matches!(
152 err.kind(),
153 io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted
154 ) =>
155 {
156 self.on_operation_pending();
157 }
158 Err(err) => {
159 self.on_operation_ready(0);
160 if let Some(err) = err.raw_os_error() {
161 self.on_error(err);
162 } else {
163 self.dropped_errors.fetch_add(1, Ordering::Relaxed);
164 }
165 }
166 }
167 }
168
169 #[inline]
170 pub fn on_operation_ready(&self, packets: usize) {
171 if packets > 0 {
172 self.packets.fetch_add(packets as _, Ordering::Relaxed);
173 }
174 self.syscalls.fetch_add(1, Ordering::Relaxed);
175 }
176
177 #[inline]
178 pub fn on_operation_pending(&self) {
179 self.syscalls.fetch_add(1, Ordering::Relaxed);
180 self.blocked.fetch_add(1, Ordering::Relaxed);
181 }
182
183 #[inline]
184 pub fn on_error(&self, error: Error) {
185 self.total_errors.fetch_add(1, Ordering::Relaxed);
186
187 let mut did_drop = false;
188 if let Ok(mut queue) = self.errors.try_lock() {
189 if queue.len() == ERROR_QUEUE_CAP {
191 let _ = queue.pop_front();
192 did_drop = true;
193 }
194
195 queue.push_back(error);
196 } else {
197 did_drop = true;
198 };
199
200 if did_drop {
201 self.dropped_errors.fetch_add(1, Ordering::Relaxed);
202 }
203 }
204
205 #[inline]
206 fn publish<P, OnError, OnMetrics>(
207 &self,
208 publisher: &mut P,
209 errors: &mut VecDeque<Error>,
210 on_error: OnError,
211 on_metrics: OnMetrics,
212 ) where
213 OnError: Fn(&mut P, Error),
214 OnMetrics: Fn(&mut P, Metrics),
215 {
216 core::mem::swap(&mut *self.errors.lock().unwrap(), errors);
217
218 for error in errors.drain(..) {
219 on_error(publisher, error);
220 }
221
222 let metrics = self.metrics();
223 if metrics.syscalls > 0 {
224 on_metrics(publisher, metrics);
225 }
226 }
227
228 #[inline]
229 fn metrics(&self) -> Metrics {
230 macro_rules! take {
231 ($field:ident) => {{
232 let value = self.$field.swap(0, Ordering::Relaxed);
233 value.try_into().unwrap_or(usize::MAX)
234 }};
235 }
236
237 let packets = take!(packets);
238 let syscalls = take!(syscalls);
239 let blocked_syscalls = take!(blocked);
240 let total_errors = take!(total_errors);
241 let dropped_errors = take!(dropped_errors);
242
243 Metrics {
244 packets,
245 syscalls,
246 blocked_syscalls,
247 total_errors,
248 dropped_errors,
249 }
250 }
251}
252
/// Snapshot of one direction's counters, with each value held as a `usize`.
#[derive(Clone, Copy)]
struct Metrics {
    /// Packets handled by completed operations.
    packets: usize,
    /// Recorded operations, including blocked ones.
    syscalls: usize,
    /// Recorded operations that were pending / would have blocked.
    blocked_syscalls: usize,
    /// Recorded errors.
    total_errors: usize,
    /// Errors that were not available for individual reporting.
    dropped_errors: usize,
}
261
262impl From<Metrics> for event::builder::PlatformRx {
263 fn from(value: Metrics) -> Self {
264 Self {
265 count: value.packets,
266 syscalls: value.syscalls,
267 blocked_syscalls: value.blocked_syscalls,
268 total_errors: value.total_errors,
269 dropped_errors: value.dropped_errors,
270 }
271 }
272}
273
274impl From<Metrics> for event::builder::PlatformTx {
275 fn from(value: Metrics) -> Self {
276 Self {
277 count: value.packets,
278 syscalls: value.syscalls,
279 blocked_syscalls: value.blocked_syscalls,
280 total_errors: value.total_errors,
281 dropped_errors: value.dropped_errors,
282 }
283 }
284}