sl_mpc_mate/coord/
stats.rs

1// Copyright (c) Silence Laboratories Pte. Ltd. All Rights Reserved.
2// This software is licensed under the Silence Laboratories License Agreement.
3
4use std::{
5    ops::{Deref, DerefMut},
6    pin::Pin,
7    sync::{Arc, Mutex},
8    task::{Context, Poll},
9    time::{Duration, Instant},
10};
11
12use crate::coord::*;
13
/// Traffic and wait-time counters for a relay connection.
///
/// All fields start at their zero/empty default. Within this file they are
/// updated by `RelayStats` as messages pass through it.
#[derive(Default, Clone, Debug)]
pub struct Stats {
    /// Number of outgoing messages recorded on the send path.
    pub send_count: usize,
    /// Total length in bytes of the recorded outgoing messages.
    pub send_size: usize,
    /// Total length of the received messages (sum of each message's `len()`).
    pub recv_size: usize,
    /// Number of messages received.
    pub recv_count: usize,
    /// Sum of all per-message wait times.
    pub wait_time: Duration,
    /// Wait time of each received message, paired with the id taken from its
    /// header. Messages whose header cannot be parsed get no entry here.
    pub wait_times: Vec<(MsgId, Duration)>,
}
23
24impl Stats {
25    pub fn alloc() -> Arc<Mutex<Self>> {
26        Arc::new(Mutex::new(Self::default()))
27    }
28
29    pub fn inner(stats: Arc<Mutex<Self>>) -> Self {
30        stats.lock().unwrap().clone()
31    }
32}
33
/// Wrapper around a relay that forwards all traffic to it and records
/// statistics about that traffic into a shared `Stats`.
pub struct RelayStats<R: Relay> {
    // The wrapped relay that actually sends and receives messages.
    relay: R,
    // Shared counters updated as messages are sent and received.
    stats: Arc<Mutex<Stats>>,
    // Set when polling for the next message first returns `Pending`;
    // taken when a message arrives to compute how long the wait lasted.
    waiting: Option<Instant>,
}
39
40impl<R: Relay> RelayStats<R> {
41    pub fn new(relay: R, stats: Arc<Mutex<Stats>>) -> Self {
42        Self {
43            relay,
44            stats,
45            waiting: None,
46        }
47    }
48}
49
50impl<R: Relay> Stream for RelayStats<R> {
51    type Item = <R as Stream>::Item;
52
53    fn poll_next(
54        self: Pin<&mut Self>,
55        cx: &mut Context<'_>,
56    ) -> Poll<Option<Self::Item>> {
57        let this = self.get_mut();
58
59        match this.relay.poll_next_unpin(cx) {
60            Poll::Ready(Some(msg)) => {
61                let waiting = this.waiting.take();
62                let mut stats = this.stats.lock().unwrap();
63
64                stats.recv_size += msg.len();
65                stats.recv_count += 1;
66
67                let wait_time = waiting
68                    .map(|start| start.elapsed())
69                    .unwrap_or(Duration::new(0, 0));
70
71                if let Ok(hdr) = <&MsgHdr>::try_from(msg.as_slice()) {
72                    stats.wait_times.push((*hdr.id(), wait_time));
73                }
74
75                stats.wait_time += wait_time;
76
77                Poll::Ready(Some(msg))
78            }
79
80            Poll::Ready(None) => Poll::Ready(None),
81
82            Poll::Pending => {
83                if this.waiting.is_none() {
84                    // mark the beginning of message waiting
85                    this.waiting = Some(Instant::now());
86                }
87
88                Poll::Pending
89            }
90        }
91    }
92}
93
94impl<R: Relay> Sink<Vec<u8>> for RelayStats<R> {
95    type Error = <R as Sink<Vec<u8>>>::Error;
96
97    fn poll_ready(
98        self: Pin<&mut Self>,
99        cx: &mut Context<'_>,
100    ) -> Poll<Result<(), Self::Error>> {
101        self.get_mut().relay.poll_ready_unpin(cx)
102    }
103
104    fn start_send(
105        self: Pin<&mut Self>,
106        item: Vec<u8>,
107    ) -> Result<(), Self::Error> {
108        let _ = self.stats.lock().map(|mut stats| {
109            stats.send_size += item.len();
110            stats.send_count += 1;
111        });
112
113        self.get_mut().relay.start_send_unpin(item)
114    }
115
116    fn poll_flush(
117        self: Pin<&mut Self>,
118        cx: &mut Context<'_>,
119    ) -> Poll<Result<(), Self::Error>> {
120        self.get_mut().relay.poll_flush_unpin(cx)
121    }
122
123    fn poll_close(
124        self: Pin<&mut Self>,
125        cx: &mut Context<'_>,
126    ) -> Poll<Result<(), Self::Error>> {
127        self.get_mut().relay.poll_close_unpin(cx)
128    }
129}
130
// With `Stream` and `Sink<Vec<u8>>` implemented above, the wrapper is itself
// marked as a `Relay`; the impl adds no items of its own.
impl<R: Relay> Relay for RelayStats<R> {}
132
133impl<R: Relay> Deref for RelayStats<R> {
134    type Target = R;
135
136    fn deref(&self) -> &Self::Target {
137        &self.relay
138    }
139}
140
141impl<R: Relay> DerefMut for RelayStats<R> {
142    fn deref_mut(&mut self) -> &mut Self::Target {
143        &mut self.relay
144    }
145}