aggligator/agg/
dump.rs

1//! Dump data for performance analysis.
2//!
3//! The purpose of the data is to debug connection performance issues
4//! and to help with the development of Aggligator.
5//!
6//! The data can be visualized using the `PlotDump.ipynb` script
7//! from the repository.
8//!
9
10use serde::{Deserialize, Serialize};
11use std::{io::Error, path::Path};
12use tokio::{
13    fs::File,
14    io::{AsyncWriteExt, BufWriter},
15    sync::mpsc,
16};
17
18/// Link dump data for analysis.
19#[derive(Debug, Clone, Default, Serialize, Deserialize)]
20pub struct LinkDump {
21    /// Whether the link is present.
22    pub present: bool,
23    /// The link id.
24    pub link_id: u128,
25    /// Whether the link is currently unconfirmed.
26    pub unconfirmed: bool,
27    /// Whether the sender is idle.
28    pub tx_idle: bool,
29    /// Whether the sender is being polled for readiness.
30    pub tx_pending: bool,
31    /// Whether the sender is being flushed.
32    pub tx_flushing: bool,
33    /// Whether the sender has been flushed.
34    pub tx_flushed: bool,
35    /// Then length of the acknowledgement queue.
36    pub tx_ack_queue: usize,
37    /// Amount of sent, unacknowledged data.
38    pub txed_unacked_data: usize,
39    /// Current limit of sent, unacknowledged data.
40    pub txed_unacked_data_limit: usize,
41    /// How many times `txed_unacked_data_limit` has been increased
42    /// without send buffer overrun.
43    pub txed_unacked_data_limit_increased_consecutively: usize,
44    /// Total bytes sent.
45    pub total_sent: u64,
46    /// Total bytes received.
47    pub total_recved: u64,
48    /// Roundtrip time in milliseconds.
49    pub roundtrip: f32,
50}
51
52/// Connection dump data for analysis.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct ConnDump {
55    /// Connection id.
56    pub conn_id: u128,
57    /// Running time in seconds.
58    pub runtime: f32,
59    /// Amount of sent, unacknowledged data.
60    pub txed_unacked: usize,
61    /// Amount of sent, unconsumed data.
62    pub txed_unconsumed: usize,
63    /// Amount of sent, unconsumable data.
64    pub txed_unconsumable: usize,
65    /// Send buffer size.
66    pub send_buffer: u32,
67    /// Receive buffer size of remote endpoint.
68    pub remote_receive_buffer: u32,
69    /// Resend queue length.
70    pub resend_queue: usize,
71    /// Amount of received, unconsumable data.
72    pub rxed_reliable_size: usize,
73    /// Amount of received data consumed since sending last
74    /// Consumed message.
75    pub rxed_reliable_consumed_since_last_ack: usize,
76    /// Link 0.
77    pub link0: LinkDump,
78    /// Link 1.
79    pub link1: LinkDump,
80    /// Link 2.
81    pub link2: LinkDump,
82    /// Link 3.
83    pub link3: LinkDump,
84    /// Link 4.
85    pub link4: LinkDump,
86    /// Link 5.
87    pub link5: LinkDump,
88    /// Link 6.
89    pub link6: LinkDump,
90    /// Link 7.
91    pub link7: LinkDump,
92    /// Link 8.
93    pub link8: LinkDump,
94    /// Link 9.
95    pub link9: LinkDump,
96}
97
98/// Dumps analysis data from the channel to a JSON line file.
99///
100/// The file has one JSON object per line.
101pub async fn dump_to_json_line_file(
102    path: impl AsRef<Path>, mut rx: mpsc::Receiver<ConnDump>,
103) -> Result<(), Error> {
104    let file = File::create(path).await?;
105    let mut writer = BufWriter::new(file);
106
107    while let Some(dump) = rx.recv().await {
108        let line = serde_json::to_vec(&dump).unwrap();
109        writer.write_all(&line).await?;
110        writer.write_u8(b'\n').await?;
111    }
112
113    writer.flush().await?;
114
115    Ok(())
116}