sparkles_parser/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
mod perfetto_format;
mod consts;
mod decoder;

use std::cmp::min;
use std::collections::BTreeMap;
use std::io::{Read, Write};
use log::{debug, error, info, warn};
use thiserror::Error;
use sparkles_core::headers::{LocalPacketHeader, SparklesEncoderInfo};
use sparkles_core::local_storage::id_mapping::EventType;
use crate::decoder::StreamFrameDecoder;
use crate::ParseError::Decode;
use crate::perfetto_format::PerfettoTraceFile;

/// Maximum number of encoded event bytes read from the stream and handed to the frame decoder
/// in one chunk; larger event buffers are processed in several chunks of at most this size.
///
/// A `const` (rather than a `static`) because it is a plain compile-time value that needs no
/// fixed address; this also makes it usable in const contexts.
pub const PARSER_BUF_SIZE: usize = 1_000_000;

/// Decoder for a sparkles event stream.
///
/// Holds stream-level metadata (encoder info, timestamp frequency), byte counters used for
/// statistics, and the per-thread decoding state keyed by the thread ordinal id taken from
/// each packet header.
#[derive(Default)]
pub struct SparklesParser {
    /// Bytes of encoded event payload consumed (event buffer chunks only).
    total_event_bytes: u64,
    /// Bytes counted on the transport for packet contents: length prefixes, serialized
    /// headers and event payload (the packet-type byte and batch length are not counted).
    total_transport_bytes: u64,

    /// Encoder metadata from a `0x00` packet, if one was received.
    encoder_info: Option<SparklesEncoderInfo>,
    /// Timestamp ticks per nanosecond from a `0x03` packet, if one was received.
    ticks_per_ns: Option<f64>,

    /// Per-thread decoding state, keyed by `LocalPacketHeader::thread_ord_id`.
    event_parsers: BTreeMap<u64, ThreadParserState>,
}

/// Decoding state and decoded data for a single traced thread.
#[derive(Default)]
pub struct ThreadParserState {
    /// Thread name announced in a packet header's `thread_info`, if any.
    thread_name: Option<String>,
    /// Thread id announced together with the thread name, if any.
    thread_id: Option<u64>,
    /// Decoded packets of this thread: the packet header and the events decoded from its buffer.
    event_buf: Vec<(LocalPacketHeader, Vec<TracingEvent>)>,

    /// Start timestamp and duration of each packet reported as failed/missed (`0x02` packets).
    missed_events: Vec<(u64, u64)>,

    // ---- TMP DATA ----
    /// Frame decoder turning raw event bytes into `TracingEvent`s; kept per thread because
    /// decoding continues across chunks of the same buffer.
    state_machine: StreamFrameDecoder,
    /// Helper for ranges handling: open ranges keyed by range ordinal id, storing the start
    /// event id and the start timestamp (already converted from ticks).
    cur_started_ranges: BTreeMap<u8, (TracingEventId, u64)>,
    /// Current timestamp in ticks, accumulated from the per-event deltas.
    cur_tm: u64,
    /// Number of consecutive events whose delta was zero; used to offset their timestamps.
    zero_diff_cnt: u64,
}

#[derive(Debug, Error)]
pub enum ParseError {
    #[error("Error while decoding frame")]
    Decode(DecodeError),
}

/// Low-level failure while reading or deserializing the incoming packet stream.
#[derive(Debug, Error)]
pub enum DecodeError {
    /// Reading from the underlying stream failed (for `read_exact` this includes reaching the
    /// end of the stream early).
    #[error("Error while reading from stream")]
    Io(#[from] std::io::Error),
    /// A bincode-encoded structure (encoder info or packet header) could not be deserialized.
    #[error("Error while deserializing data")]
    Bincode(#[from] bincode::Error),
}

/// Result type of `SparklesParser::parse_and_save`.
type ParseResult<T> = Result<T, ParseError>;
/// Result type of the stream decoding step (`SparklesParser::decode_packets`).
type DecodeResult<T> = Result<T, DecodeError>;

impl SparklesParser {
    /// Decode incoming events and save them to `trace.json` in Perfetto format
    pub fn parse_and_save(&mut self, mut reader: impl Read) -> ParseResult<()> {
        if let Err(e) = self.decode_packets(&mut reader) {
            error!("Error handling client: {:?}", e);
            return Err(Decode(e));
        }

        //some stats
        let mut total_events = 0;
        let mut min_timestamp = u64::MAX;
        let mut max_timestamp = 0;
        let mut covered_dur = 0;


        let encoder_info = self.encoder_info.take().unwrap_or_else(|| {
            warn!("Encoder info is not present in decoded data! Using default values");
            SparklesEncoderInfo::default()
        });

        info!("Begin parsing... Encoder info: {:?}", encoder_info);

        let mut trace_res_file = PerfettoTraceFile::new(encoder_info.process_name, encoder_info.pid);
        let ticks_per_ns = self.ticks_per_ns.unwrap_or_else( || {
            warn!("Did not find timestamp frequency in decoded stream! Using default values");
            1.0
        });
        // iterate over all threads
        for (&thread_ord_id, parser_state) in &mut self.event_parsers {
            let thread_name = parser_state.thread_name.clone().unwrap_or("".to_string());
            let thread_id = parser_state.thread_id.unwrap_or(thread_ord_id);
            // iterate over events
            for (header, events) in &parser_state.event_buf {
                trace_res_file.set_thread_name(thread_id, thread_name.clone());

                parser_state.cur_tm = header.start_timestamp;
                let mut first = true;
                for event in events {
                    let mut dif_tm_zero = false;
                    if first {
                        first = false;
                    }
                    else {
                        let dif_tm = match event {
                            TracingEvent::Instant(_, dif_tm) => dif_tm,
                            TracingEvent::RangePart(_, dif_tm, _) => dif_tm,
                            TracingEvent::UnnamedRangeEnd(dif_tm, _) => dif_tm
                        };
                        if *dif_tm == 0 {
                            dif_tm_zero = true;
                        }
                        parser_state.cur_tm += dif_tm;
                    }
                    if !dif_tm_zero {
                        parser_state.zero_diff_cnt = 0;
                    }
                    else {
                        parser_state.zero_diff_cnt += 1;
                    }
                    // add to trace file
                    let timestamp = (parser_state.cur_tm as f64 / ticks_per_ns) as u64 + parser_state.zero_diff_cnt * 10;
                    match event {
                        TracingEvent::Instant(id, _) => {
                            let (ev_name, _) = &header.id_store.tags[*id as usize];
                            trace_res_file.add_point_event(ev_name.clone(), thread_id, timestamp);
                        }
                        TracingEvent::RangePart(id, _, ord_id) => {
                            let (ev_name, ev_type) = &header.id_store.tags[*id as usize];
                            if let EventType::RangeEnd(start_id) = ev_type {
                                let (start_name, _) = &header.id_store.tags[*start_id as usize];
                                let start_info = parser_state.cur_started_ranges.remove(ord_id).unwrap();
                                let start_tm = start_info.1;
                                let end_tm = timestamp;
                                trace_res_file.add_range_event(format!("{} -> {}", start_name, ev_name), thread_id, start_tm, end_tm);
                            }
                            else {
                                // Range start
                                parser_state.cur_started_ranges.insert(*ord_id, (*id, timestamp));
                            }
                        }
                        TracingEvent::UnnamedRangeEnd(_, ord_id ) => {
                            let start_info = parser_state.cur_started_ranges.remove(ord_id).unwrap();
                            let range_id = start_info.0;
                            let range_name = &header.id_store.tags[range_id as usize].0;
                            let start_tm = start_info.1;
                            let end_tm = timestamp;
                            trace_res_file.add_range_event(range_name.clone(), thread_id, start_tm, end_tm);
                        }
                    }
                }
                total_events += events.len();
                if header.start_timestamp < min_timestamp {
                    min_timestamp = header.start_timestamp;
                }
                if header.end_timestamp > max_timestamp {
                    max_timestamp = header.end_timestamp;
                }
                covered_dur += header.end_timestamp - header.start_timestamp;

            }
        }

        let events_per_sec = total_events as f64 / ((max_timestamp - min_timestamp) as f64 / ticks_per_ns) * 1_000_000_000.0;
        let events_per_sec_covered = total_events as f64 / (covered_dur as f64 / ticks_per_ns) * 1_000_000_000.0;
        info!("Total events: {}", total_events);
        info!("Events per second (global): {} eps", events_per_sec);
        info!("Events per second (covered): {} eps", events_per_sec_covered);
        info!("Average event duration: {} ns", covered_dur as f64 / ticks_per_ns / total_events as f64);
        info!("Average bytes per event: {} bytes", self.total_event_bytes as f64 / total_events as f64);
        info!("Average transport bytes per event: {} bytes", self.total_transport_bytes as f64 / total_events as f64);

        info!("Finished! Saving to trace.perf...");

        let mut file = std::fs::File::create("trace.perf").unwrap();
        let bytes = trace_res_file.get_bytes();
        file.write_all(&bytes).unwrap();

        info!("Your `trace.perf` is ready! Now, navigate to https://ui.perfetto.dev/ and drag'n'drop the file onto the page.");
        Ok(())
    }

    fn decode_packets(&mut self, con: &mut impl Read) -> DecodeResult<()> {
        loop {
            let mut packet_type = [0u8; 1];
            con.read_exact(&mut packet_type)?;
            info!("Packet id: {}", packet_type[0]);

            let mut events_bytes = vec![0; 10_000];

            match packet_type[0] {
                0x00 => {
                    let mut info_bytes_len = [0u8; 8];
                    con.read_exact(&mut info_bytes_len)?;
                    let info_bytes_len = u64::from_le_bytes(info_bytes_len) as usize;

                    let mut info_bytes = vec![0u8; info_bytes_len];
                    con.read_exact(&mut info_bytes)?;
                    let info = bincode::deserialize::<SparklesEncoderInfo>(&info_bytes)?;

                    if info.ver != consts::ENCODER_VERSION {
                        warn!("Encoder version mismatch! Parser: {}, Encoder: {}", consts::ENCODER_VERSION, info.ver);
                    }

                    self.encoder_info = Some(info);
                }
                0x01 => {
                    let mut total_bytes = [0u8; 8];
                    con.read_exact(&mut total_bytes)?;
                    let mut total_bytes = u64::from_le_bytes(total_bytes) as usize;

                    while total_bytes > 0 {
                        let mut header_len = [0u8; 8];
                        con.read_exact(&mut header_len)?;
                        self.total_transport_bytes += 8;
                        let header_len = u64::from_le_bytes(header_len) as usize;

                        let mut header_bytes = vec![0u8; header_len];
                        con.read_exact(&mut header_bytes)?;
                        self.total_transport_bytes += header_len as u64;
                        let header = bincode::deserialize::<LocalPacketHeader>(&header_bytes)?;

                        let mut buf_len = [0u8; 8];
                        con.read_exact(&mut buf_len)?;
                        self.total_transport_bytes += 8;
                        let buf_len = u64::from_le_bytes(buf_len) as usize;

                        let mut event_buf = Vec::with_capacity(PARSER_BUF_SIZE);
                        info!("Got packet header: {:?}", header);

                        let thread_id = header.thread_ord_id;
                        let cur_parser_state = self.event_parsers.entry(thread_id).or_default();

                        //update thread name
                        if let Some(thread_info) = &header.thread_info {
                            if let Some(thread_name) = thread_info.new_thread_name.clone() {
                                cur_parser_state.thread_name = Some(thread_name);
                                cur_parser_state.thread_id = Some(thread_info.thread_id);
                            }
                        }

                        let mut remaining_size = buf_len;
                        while remaining_size > 0 {
                            let cur_size = min(PARSER_BUF_SIZE, remaining_size);
                            events_bytes.resize(cur_size, 0);
                            con.read_exact(&mut events_bytes)?;
                            self.total_transport_bytes += cur_size as u64;

                            let new_events = cur_parser_state.state_machine.decode_many(&events_bytes);
                            let new_events_len = new_events.len();
                            event_buf.extend_from_slice(&new_events);
                            debug!("Got {} bytes, Parsed {} events", cur_size, new_events_len);
                            self.total_event_bytes += cur_size as u64;

                            remaining_size -= cur_size;
                        }
                        cur_parser_state.state_machine.ensure_buf_end();

                        total_bytes -= 8 + 8 + header_len + buf_len;

                        cur_parser_state.event_buf.push((header, event_buf));
                    }
                },
                0x02 => {
                    let mut header_len = [0u8; 8];
                    con.read_exact(&mut header_len)?;
                    self.total_transport_bytes += 8;
                    let header_len = u64::from_le_bytes(header_len) as usize;

                    let mut header_bytes = vec![0u8; header_len];
                    con.read_exact(&mut header_bytes)?;
                    self.total_transport_bytes += header_len as u64;
                    let header = bincode::deserialize::<LocalPacketHeader>(&header_bytes)?;

                    info!("Got failed packet header: {:?}", header);

                    let start = header.start_timestamp;
                    let dur = header.end_timestamp - header.start_timestamp;
                    let thread_ord_id = header.thread_ord_id;
                    self.thread_parser_state(thread_ord_id).missed_events.push((start, dur));

                },
                0x03 => {
                    let mut bytes = [0u8; 8];
                    con.read_exact(&mut bytes)?;
                    let ticks_per_sec = u64::from_le_bytes(bytes);
                    let ticks_per_ns = ticks_per_sec as f64 / 1_000_000_000.0;
                    info!("Got timestamp frequency: {:?} t/ns", ticks_per_ns);

                    self.ticks_per_ns = Some(ticks_per_ns);
                }
                0xff => {
                    info!("Client was gracefully disconnected!");

                    return Ok(());
                }
                _ => panic!("Unknown packet type!")
            }
        }
    }

    fn thread_parser_state(&mut self, thread_id: u64) -> &mut ThreadParserState {
        self.event_parsers.entry(thread_id).or_default()
    }
}

/// Id of an event tag; used as an index into a packet header's `id_store.tags`.
pub type TracingEventId = u8;

/// A single decoded tracing event.
///
/// Every variant carries `dif_tm`: the time in ticks since the previous event of the same
/// packet. When building the trace, the first event of a packet ignores it and is placed at
/// the packet's start timestamp.
#[derive(Debug, Copy, Clone)]
pub enum TracingEvent {
    /// Point event: `(event id, dif_tm)`.
    Instant(TracingEventId, u64),
    /// Range boundary: `(event id, dif_tm, range ordinal id)`. Whether it starts or ends a
    /// range is decided by the tag's `EventType`; the ordinal id pairs an end with its start.
    RangePart(TracingEventId, u64, u8),
    /// End of a range without its own tag: `(dif_tm, range ordinal id)`. The range name is
    /// taken from the matching range start.
    UnnamedRangeEnd(u64, u8)
}