sl_mpc_mate/coord/
stats.rs1use std::{
5 ops::{Deref, DerefMut},
6 pin::Pin,
7 sync::{Arc, Mutex},
8 task::{Context, Poll},
9 time::{Duration, Instant},
10};
11
12use crate::coord::*;
13
14#[derive(Default, Clone, Debug)]
15pub struct Stats {
16 pub send_count: usize,
17 pub send_size: usize,
18 pub recv_size: usize,
19 pub recv_count: usize,
20 pub wait_time: Duration,
21 pub wait_times: Vec<(MsgId, Duration)>,
22}
23
24impl Stats {
25 pub fn alloc() -> Arc<Mutex<Self>> {
26 Arc::new(Mutex::new(Self::default()))
27 }
28
29 pub fn inner(stats: Arc<Mutex<Self>>) -> Self {
30 stats.lock().unwrap().clone()
31 }
32}
33
34pub struct RelayStats<R: Relay> {
35 relay: R,
36 stats: Arc<Mutex<Stats>>,
37 waiting: Option<Instant>,
38}
39
40impl<R: Relay> RelayStats<R> {
41 pub fn new(relay: R, stats: Arc<Mutex<Stats>>) -> Self {
42 Self {
43 relay,
44 stats,
45 waiting: None,
46 }
47 }
48}
49
50impl<R: Relay> Stream for RelayStats<R> {
51 type Item = <R as Stream>::Item;
52
53 fn poll_next(
54 self: Pin<&mut Self>,
55 cx: &mut Context<'_>,
56 ) -> Poll<Option<Self::Item>> {
57 let this = self.get_mut();
58
59 match this.relay.poll_next_unpin(cx) {
60 Poll::Ready(Some(msg)) => {
61 let waiting = this.waiting.take();
62 let mut stats = this.stats.lock().unwrap();
63
64 stats.recv_size += msg.len();
65 stats.recv_count += 1;
66
67 let wait_time = waiting
68 .map(|start| start.elapsed())
69 .unwrap_or(Duration::new(0, 0));
70
71 if let Ok(hdr) = <&MsgHdr>::try_from(msg.as_slice()) {
72 stats.wait_times.push((*hdr.id(), wait_time));
73 }
74
75 stats.wait_time += wait_time;
76
77 Poll::Ready(Some(msg))
78 }
79
80 Poll::Ready(None) => Poll::Ready(None),
81
82 Poll::Pending => {
83 if this.waiting.is_none() {
84 this.waiting = Some(Instant::now());
86 }
87
88 Poll::Pending
89 }
90 }
91 }
92}
93
94impl<R: Relay> Sink<Vec<u8>> for RelayStats<R> {
95 type Error = <R as Sink<Vec<u8>>>::Error;
96
97 fn poll_ready(
98 self: Pin<&mut Self>,
99 cx: &mut Context<'_>,
100 ) -> Poll<Result<(), Self::Error>> {
101 self.get_mut().relay.poll_ready_unpin(cx)
102 }
103
104 fn start_send(
105 self: Pin<&mut Self>,
106 item: Vec<u8>,
107 ) -> Result<(), Self::Error> {
108 let _ = self.stats.lock().map(|mut stats| {
109 stats.send_size += item.len();
110 stats.send_count += 1;
111 });
112
113 self.get_mut().relay.start_send_unpin(item)
114 }
115
116 fn poll_flush(
117 self: Pin<&mut Self>,
118 cx: &mut Context<'_>,
119 ) -> Poll<Result<(), Self::Error>> {
120 self.get_mut().relay.poll_flush_unpin(cx)
121 }
122
123 fn poll_close(
124 self: Pin<&mut Self>,
125 cx: &mut Context<'_>,
126 ) -> Poll<Result<(), Self::Error>> {
127 self.get_mut().relay.poll_close_unpin(cx)
128 }
129}
130
131impl<R: Relay> Relay for RelayStats<R> {}
132
133impl<R: Relay> Deref for RelayStats<R> {
134 type Target = R;
135
136 fn deref(&self) -> &Self::Target {
137 &self.relay
138 }
139}
140
141impl<R: Relay> DerefMut for RelayStats<R> {
142 fn deref_mut(&mut self) -> &mut Self::Target {
143 &mut self.relay
144 }
145}