s2n_quic_platform/socket/
stats.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use core::{
5    fmt,
6    sync::atomic::{AtomicU64, Ordering},
7    task::Poll,
8};
9use s2n_quic_core::{
10    event::{self, EndpointPublisher},
11    io::event_loop,
12};
13use std::{
14    collections::VecDeque,
15    ffi::c_int,
16    io,
17    sync::{Arc, Mutex},
18};
19
20const ERROR_QUEUE_CAP: usize = 256;
21type Error = c_int;
22
23pub fn channel() -> (Sender, Receiver) {
24    let state = Arc::new(State::default());
25
26    let sender = Sender(state.clone());
27
28    let recv = Receiver {
29        state,
30        pending_errors: VecDeque::with_capacity(ERROR_QUEUE_CAP),
31    };
32
33    (sender, recv)
34}
35
36#[derive(Clone)]
37pub struct Sender(Arc<State>);
38
39impl fmt::Debug for Sender {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("Sender").finish_non_exhaustive()
42    }
43}
44
45impl Sender {
46    #[inline]
47    pub fn send(&self) -> &Stats {
48        &self.0.send
49    }
50
51    #[inline]
52    pub fn recv(&self) -> &Stats {
53        &self.0.recv
54    }
55}
56
57pub struct Receiver {
58    state: Arc<State>,
59    pending_errors: VecDeque<Error>,
60}
61
62impl fmt::Debug for Receiver {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("Receiver").finish_non_exhaustive()
65    }
66}
67
68impl event_loop::Stats for Receiver {
69    #[inline]
70    fn publish<P: EndpointPublisher>(&mut self, publisher: &mut P) {
71        self.state.send.publish(
72            publisher,
73            &mut self.pending_errors,
74            |publisher, errno| {
75                publisher.on_platform_tx_error(event::builder::PlatformTxError { errno });
76            },
77            |publisher, metrics| publisher.on_platform_tx(metrics.into()),
78        );
79        self.state.recv.publish(
80            publisher,
81            &mut self.pending_errors,
82            |publisher, errno| {
83                publisher.on_platform_rx_error(event::builder::PlatformRxError { errno });
84            },
85            |publisher, metrics| publisher.on_platform_rx(metrics.into()),
86        );
87    }
88}
89
90#[derive(Default)]
91struct State {
92    send: Stats,
93    recv: Stats,
94}
95
96pub struct Stats {
97    syscalls: AtomicU64,
98    blocked: AtomicU64,
99    packets: AtomicU64,
100    errors: Mutex<VecDeque<Error>>,
101    total_errors: AtomicU64,
102    dropped_errors: AtomicU64,
103}
104
105impl Default for Stats {
106    fn default() -> Self {
107        Self {
108            syscalls: Default::default(),
109            blocked: Default::default(),
110            packets: Default::default(),
111            errors: Mutex::new(VecDeque::with_capacity(ERROR_QUEUE_CAP)),
112            total_errors: Default::default(),
113            dropped_errors: Default::default(),
114        }
115    }
116}
117
118impl fmt::Debug for Stats {
119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120        f.debug_struct("Stats").finish_non_exhaustive()
121    }
122}
123
124impl Stats {
125    #[inline]
126    pub fn on_operation<T, F>(&self, res: &Poll<io::Result<T>>, count_packets: F)
127    where
128        F: FnOnce(&T) -> usize,
129    {
130        match res {
131            Poll::Ready(res) => {
132                self.on_operation_result(res, count_packets);
133            }
134            Poll::Pending => {
135                self.on_operation_pending();
136            }
137        }
138    }
139
140    #[inline]
141    pub fn on_operation_result<T, F>(&self, res: &io::Result<T>, count_packets: F)
142    where
143        F: FnOnce(&T) -> usize,
144    {
145        match res {
146            Ok(value) => {
147                let packets = count_packets(value);
148                self.on_operation_ready(packets);
149            }
150            Err(err)
151                if matches!(
152                    err.kind(),
153                    io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted
154                ) =>
155            {
156                self.on_operation_pending();
157            }
158            Err(err) => {
159                self.on_operation_ready(0);
160                if let Some(err) = err.raw_os_error() {
161                    self.on_error(err);
162                } else {
163                    self.dropped_errors.fetch_add(1, Ordering::Relaxed);
164                }
165            }
166        }
167    }
168
169    #[inline]
170    pub fn on_operation_ready(&self, packets: usize) {
171        if packets > 0 {
172            self.packets.fetch_add(packets as _, Ordering::Relaxed);
173        }
174        self.syscalls.fetch_add(1, Ordering::Relaxed);
175    }
176
177    #[inline]
178    pub fn on_operation_pending(&self) {
179        self.syscalls.fetch_add(1, Ordering::Relaxed);
180        self.blocked.fetch_add(1, Ordering::Relaxed);
181    }
182
183    #[inline]
184    pub fn on_error(&self, error: Error) {
185        self.total_errors.fetch_add(1, Ordering::Relaxed);
186
187        let mut did_drop = false;
188        if let Ok(mut queue) = self.errors.try_lock() {
189            // drop old errors
190            if queue.len() == ERROR_QUEUE_CAP {
191                let _ = queue.pop_front();
192                did_drop = true;
193            }
194
195            queue.push_back(error);
196        } else {
197            did_drop = true;
198        };
199
200        if did_drop {
201            self.dropped_errors.fetch_add(1, Ordering::Relaxed);
202        }
203    }
204
205    #[inline]
206    fn publish<P, OnError, OnMetrics>(
207        &self,
208        publisher: &mut P,
209        errors: &mut VecDeque<Error>,
210        on_error: OnError,
211        on_metrics: OnMetrics,
212    ) where
213        OnError: Fn(&mut P, Error),
214        OnMetrics: Fn(&mut P, Metrics),
215    {
216        core::mem::swap(&mut *self.errors.lock().unwrap(), errors);
217
218        for error in errors.drain(..) {
219            on_error(publisher, error);
220        }
221
222        let metrics = self.metrics();
223        if metrics.syscalls > 0 {
224            on_metrics(publisher, metrics);
225        }
226    }
227
228    #[inline]
229    fn metrics(&self) -> Metrics {
230        macro_rules! take {
231            ($field:ident) => {{
232                let value = self.$field.swap(0, Ordering::Relaxed);
233                value.try_into().unwrap_or(usize::MAX)
234            }};
235        }
236
237        let packets = take!(packets);
238        let syscalls = take!(syscalls);
239        let blocked_syscalls = take!(blocked);
240        let total_errors = take!(total_errors);
241        let dropped_errors = take!(dropped_errors);
242
243        Metrics {
244            packets,
245            syscalls,
246            blocked_syscalls,
247            total_errors,
248            dropped_errors,
249        }
250    }
251}
252
/// Point-in-time snapshot of one direction's counters, as produced by `Stats::metrics`.
#[derive(Clone, Copy)]
struct Metrics {
    /// Packets handled by completed operations.
    packets: usize,
    /// Operations recorded, including blocked ones.
    syscalls: usize,
    /// Operations that were pending / would have blocked.
    blocked_syscalls: usize,
    /// Errors encountered.
    total_errors: usize,
    /// Errors that were not delivered individually.
    dropped_errors: usize,
}
261
262impl From<Metrics> for event::builder::PlatformRx {
263    fn from(value: Metrics) -> Self {
264        Self {
265            count: value.packets,
266            syscalls: value.syscalls,
267            blocked_syscalls: value.blocked_syscalls,
268            total_errors: value.total_errors,
269            dropped_errors: value.dropped_errors,
270        }
271    }
272}
273
274impl From<Metrics> for event::builder::PlatformTx {
275    fn from(value: Metrics) -> Self {
276        Self {
277            count: value.packets,
278            syscalls: value.syscalls,
279            blocked_syscalls: value.blocked_syscalls,
280            total_errors: value.total_errors,
281            dropped_errors: value.dropped_errors,
282        }
283    }
284}