sparkles_parser/
lib.rs

1#[cfg(feature="perfetto")]
2mod perfetto_format;
3pub mod tracing_decoder;
4pub mod parsed;
5pub mod packet_decoder;
6pub mod discovery_wrapper;
7
8use std::collections::BTreeMap;
9use std::ops::Deref;
10use std::rc::Rc;
11use std::thread;
12use std::sync::atomic::AtomicBool;
13use std::sync::mpsc;
14use std::time::{Duration, Instant};
15use log::{debug, error, info, warn};
16use sparkles_core::consts::PROTOCOL_VERSION;
17use sparkles_core::local_storage::id_mapping::EventType;
18use sparkles_core::protocol::headers::SparklesMachineInfo;
19use crate::packet_decoder::{Packet, PacketDecoder, PacketReadError, ProtocolCounters};
20use crate::parsed::{ParsedEvent, ThreadInfoState};
21use crate::tracing_decoder::StreamFrameDecoder;
22
23// pub exports
24pub use discovery_wrapper::DiscoveryWrapper;
25
/// Buffer size constant exported by the parser crate (1,000,000).
/// NOTE(review): its unit and its consumers are not visible in this file - confirm before relying on it.
pub static PARSER_BUF_SIZE: usize = 1_000_000;
/// Process-wide shutdown flag: set by `request_shutdown`, read by `is_shutting_down`
/// and polled by the receiving thread spawned in `SparklesParser::parse_to_end`.
static SHUTDOWN_SIGNAL: AtomicBool = AtomicBool::new(false);
28
29pub fn request_shutdown() {
30    SHUTDOWN_SIGNAL.store(true, std::sync::atomic::Ordering::SeqCst);
31    info!("Shutdown requested...")
32}
33pub fn is_shutting_down() -> bool {
34    SHUTDOWN_SIGNAL.load(std::sync::atomic::Ordering::SeqCst)
35}
36
/// Stateful decoder that turns a stream of `Packet`s into `ParsedEvent`s.
pub struct SparklesParser {
    /// Contents of the most recent `Packet::MachineInfo`, if one was received.
    machine_info: Option<SparklesMachineInfo>,

    /// Per-thread decoding state, keyed by the `thread_ord_id` found in local packet headers.
    event_parsers: BTreeMap<u64, ThreadParserState>,
    /// One entry per local packet seen:
    /// `(global packet index, index inside the packet, thread_ord_id, projected start, projected end)`.
    local_packet_ranges: Vec<(usize, usize, u64, u64, u64)>,
    /// Number of `Packet::DataBytes` packets processed so far.
    global_i: usize,
    /// Anchors used to convert raw timestamps into nanoseconds.
    interpolation_points: InterpolationPoints,
    /// Transport counters copied from the packet decoder when `parse_to_end` finishes.
    counters: ProtocolCounters
}
46
/// Piecewise-linear mapping from raw CPU timestamps (ticks) to nanoseconds.
///
/// Key: cpu tm of the anchor; value: `(ticks_per_ns, timestamp nanos)` at that anchor.
struct InterpolationPoints(BTreeMap<u64, (f64, u64)>);

impl InterpolationPoints {
    /// Creates a mapping without any anchors; `project_tm` is the identity until one is added.
    pub fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Registers an anchor at `cur_tm` with the given tick rate.
    ///
    /// The anchor's nanosecond value is obtained by projecting `cur_tm` through the
    /// anchors that already exist, so the resulting curve stays continuous.
    fn add_interpolation_point(&mut self, ticks_per_ns: f64, cur_tm: u64) {
        let ns = self.project_tm(cur_tm);
        self.0.insert(cur_tm, (ticks_per_ns, ns));
    }

    /// Arithmetic mean of the tick rates of all anchors.
    ///
    /// Returns `1.0` when there are no anchors (instead of the NaN produced by `0.0 / 0.0`),
    /// which matches the identity mapping `project_tm` uses in that situation.
    fn get_avg_ticks_per_ns(&self) -> f64 {
        if self.0.is_empty() {
            return 1.0;
        }
        self.0.values().map(|v| v.0).sum::<f64>() / self.0.len() as f64
    }

    /// Returns `true` while no anchor has been registered.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Converts the raw timestamp `tm` into nanoseconds.
    ///
    /// Uses the closest anchor at or before `tm` and extrapolates forward with its rate.
    /// If there is none, the closest anchor after `tm` is used and extrapolated backward.
    /// Without any anchor `tm` is returned unchanged.
    fn project_tm(&self, tm: u64) -> u64 {
        if let Some((&left_tm, &(slope, left_ns))) = self.0.range(..=tm).next_back() {
            // interpolate using left slope
            (left_ns as f64 + ((tm - left_tm) as f64) / slope) as u64
        }
        // The right neighbour is only looked up when there is no left one.
        else if let Some((&right_tm, &(slope, right_ns))) = self.0.range(tm..).next() {
            // interpolate using right slope
            (right_ns as f64 - ((right_tm - tm) as f64) / slope) as u64
        }
        else {
            tm
        }
    }
}
91
/// Decoding state kept for a single traced thread.
#[derive(Default)]
pub struct ThreadParserState {
    /// Latest thread name announced in a local packet header, if any.
    thread_name: Option<String>,
    /// Thread id taken from the most recent local packet header.
    thread_id: Option<u64>,
    /// `thread_ord_id` of the most recent local packet header for this thread.
    last_thread_ord_id: u64,

    // start timestamp and duration for missed events packet
    // (both taken from the failed page header, not projected to nanoseconds)
    missed_events: Vec<(u64, u64)>,

    // ---- TMP DATA ----
    /// Decoder that turns raw local packet bytes into `TracingEvent`s.
    state_machine: StreamFrameDecoder,
    // Helper for ranges handling
    // Key: range ord id; value: (id of the range start event, projected start timestamp)
    cur_started_ranges: BTreeMap<u8, (TracingEventId, u64)>,
    /// Number of consecutive events with a zero timestamp delta;
    /// each one shifts the emitted timestamp by 10.
    zero_diff_cnt: u64,
    
    /// Event id -> (event name, event type), merged from every local packet header.
    id_store: BTreeMap<TracingEventId, (Rc<str>, EventType)>,
    /// Statistics updated once per decoded local packet.
    stats: TracingStats,
}
110
/// Aggregate statistics over the event batches recorded for one thread.
///
/// Timestamps are stored exactly as passed to [`TracingStats::new_events`].
/// After any call `min_timestamp <= max_timestamp` holds.
#[derive(Copy, Clone, Default)]
pub struct TracingStats {
    /// Total number of events over all batches.
    pub total_events: usize,
    /// Smallest batch start seen so far (0 while nothing has been recorded).
    pub min_timestamp: u64,
    /// Largest batch end seen so far.
    pub max_timestamp: u64,
    /// Sum of `end - start` over all batches.
    pub covered_dur: u64,
}

impl TracingStats {
    /// Records a batch of `events_cnt` events spanning `start_timestamp..=end_timestamp`.
    ///
    /// A reversed range (`end_timestamp < start_timestamp`) is treated as zero-length
    /// starting at `start_timestamp`, so the duration never underflows.
    pub fn new_events(&mut self, events_cnt: usize, start_timestamp: u64, end_timestamp: u64) {
        let end_timestamp = end_timestamp.max(start_timestamp);

        // `Default` leaves `min_timestamp` at 0, which no `u64` can be smaller than.
        // The first batch therefore has to seed the minimum instead of being compared to it.
        let nothing_recorded = self.total_events == 0
            && self.covered_dur == 0
            && self.min_timestamp == 0
            && self.max_timestamp == 0;

        self.min_timestamp = if nothing_recorded {
            start_timestamp
        } else {
            self.min_timestamp.min(start_timestamp)
        };
        self.max_timestamp = self.max_timestamp.max(end_timestamp);
        self.total_events += events_cnt;
        self.covered_dur += end_timestamp - start_timestamp;
    }
}
131
132impl ThreadParserState {
133    pub fn thread_info_state(&self) -> ThreadInfoState {
134        ThreadInfoState {
135            thread_id: self.thread_id,
136            thread_name: self.thread_name.clone(),
137            thread_ord_id: self.last_thread_ord_id,
138        }
139    }
140}
141
/// Result type of the parser entry points; the error is the packet decoder's `PacketReadError`.
pub type ParseResult<T> = Result<T, PacketReadError>;
143
144impl SparklesParser {
145    /// Initialize parser from byte stream
146    pub fn new() -> Self {
147        #[cfg(feature="self-tracing")]
148        sparkles::init(sparkles::config::SparklesConfig::default()
149            .with_global_capacity(100_000)
150            .with_file_sender(sparkles::sender::file_sender::FileSenderConfig::Directory(std::path::PathBuf::from("parser-trace")))
151        ).forget();
152        Self {
153            machine_info: None,
154            event_parsers: BTreeMap::new(),
155
156            local_packet_ranges: Vec::new(),
157            global_i: 0,
158            interpolation_points: InterpolationPoints::new(),
159            counters: ProtocolCounters::default()
160        }
161    }
162
163    pub fn parse_to_end(&mut self, mut packet_decoder: PacketDecoder, mut f: impl FnMut(&ParsedEvent, &ThreadInfoState)) -> ParseResult<()> {
164        let (packets_tx, packets_rx) = mpsc::sync_channel(100);
165        let (counters_tx, counters_rx) = mpsc::sync_channel(1);
166
167        let jh = thread::Builder::new().name(String::from("Receiving thread")).spawn(move || {
168            let mut last_packet_received = Instant::now();
169            loop {
170                let packet = packet_decoder.read_packet();
171                #[cfg(feature="self-tracing")]
172                let g = sparkles_macro::range_event_start!("Parse packet");
173
174                if packet.is_ok() {
175                    last_packet_received = Instant::now();
176                }
177                match packet {
178                    Ok(Some(packet)) => {
179                        if matches!(packet, Packet::GracefulShutdown) {
180                            info!("GracefulShutdown received!");
181                            break;
182                        }
183                        packets_tx.send(packet).unwrap();
184                    }
185                    Ok(None) => {
186
187                    }
188                    Err(e) => {
189                        #[cfg(feature="self-tracing")]
190                        sparkles_macro::range_event_end!(g, "Error while reading");
191                        warn!("Error while reading packet: {:?}", e);
192                        
193                        thread::sleep(Duration::from_millis(500));
194
195                        if last_packet_received.elapsed() > Duration::from_secs(5) {
196                            warn!("No packets received for 5 seconds. Exiting...");
197                            break;
198                        }
199                    }
200                }
201
202                if SHUTDOWN_SIGNAL.load(std::sync::atomic::Ordering::SeqCst) {
203                    break;
204                }
205            }
206            counters_tx.send(packet_decoder.counters()).unwrap();
207        }).unwrap();
208
209        while let Ok(packet) = packets_rx.recv() {
210            self.parse_single_packet(packet, &mut f);
211        }
212        self.counters = counters_rx.recv().unwrap();
213
214        jh.join().unwrap();
215
216        #[cfg(feature="self-tracing")]
217        sparkles::finalize();
218
219        self.print_stats();
220        Ok(())
221    }
222
223    pub fn parse_single_packet(&mut self, packet: Packet, f: &mut impl FnMut(&ParsedEvent, &ThreadInfoState)) {
224        match packet {
225            Packet::MachineInfo(info) => {
226                if info.ver.0 != PROTOCOL_VERSION.0 {
227                    error!("Protocol major version mismatch! Parser: {}, Sender: {}", PROTOCOL_VERSION.0, info.ver.0);
228                }
229                else if info.ver.1 < PROTOCOL_VERSION.1 {
230                    warn!("Sender protocol version is higher than parser! Parser: {}.{}, Sender: {}.{}",
231                        info.ver.0, PROTOCOL_VERSION.1, info.ver.0, info.ver.1)
232                }
233
234                self.machine_info = Some(info);
235            }
236            Packet::TimestampFreq(ticks_per_sec, cur_tm) => {
237                let ticks_per_ns = ticks_per_sec as f64 / 1_000_000_000.0;
238                info!("Got timestamp frequency: {:?} t/ns", ticks_per_ns);
239
240                self.interpolation_points.add_interpolation_point(ticks_per_ns, cur_tm);
241            }
242            Packet::DataBytes(packets) => {
243                if self.interpolation_points.is_empty() {
244                    error!("Timestamp frequency is not set! Dropping packet.");
245                }
246                
247                let global_i = self.global_i;
248                self.global_i += 1;
249                for (local_i, (header, data)) in packets.into_iter().enumerate() {
250                    #[cfg(feature="self-tracing")]
251                    let g = sparkles_macro::range_event_start!("Parse header");
252                    let thread_id = header.thread_ord_id;
253                    let parser_state = self.event_parsers.entry(thread_id).or_default();
254                    self.local_packet_ranges.push((global_i, local_i, thread_id, self.interpolation_points.project_tm(header.start_timestamp), self.interpolation_points.project_tm(header.end_timestamp)));
255
256                    //update thread name
257                    parser_state.thread_id = Some(header.thread_info.thread_id);
258                    if let Some(thread_name) = header.thread_info.new_thread_name.clone() {
259                        parser_state.thread_name = Some(thread_name);
260                    }
261                    parser_state.last_thread_ord_id = thread_id;
262
263                    // Merge id store
264                    for (id, (name, r#type)) in header.id_store.tags.iter().enumerate() {
265                        let id = id as u8;
266                        if let Some((old_name, old_type)) = parser_state.id_store.get(&id) {
267                            if old_name.as_ref() != name.deref() || old_type != r#type {
268                                error!("ID store mismatch for thread {:?}#{:?}! ID: {}, Old: {:?}, New: {:?}", parser_state.thread_name, parser_state.thread_id,
269                                                id, (old_name, old_type), (name, r#type));
270                            }
271                        }
272                        parser_state.id_store.insert(id, (Rc::from(name.deref()), *r#type));
273                    }
274                    #[cfg(feature="self-tracing")]
275                    drop(g);
276
277                    if self.interpolation_points.is_empty() {
278                        continue;
279                    }
280
281                    #[cfg(feature="self-tracing")]
282                    let g = sparkles_macro::range_event_start!("Decode raw events");
283                    let new_events = parser_state.state_machine.decode_many(&data);
284                    let new_events_len = new_events.len();
285                    debug!("Received {} events", new_events_len);
286                    #[cfg(feature="self-tracing")]
287                    drop(g);
288
289                    #[cfg(feature="self-tracing")]
290                    let g = sparkles_macro::range_event_start!("Parse new events");
291                    let mut cur_tm = header.start_timestamp;
292                    let mut first = true;
293                    for evt in new_events {
294                        let mut dif_tm_zero = false;
295                        if first {
296                            first = false;
297                        }
298                        else {
299                            let dif_tm = match evt {
300                                TracingEvent::Instant(_, dif_tm) => dif_tm,
301                                TracingEvent::RangePart(_, dif_tm, _) => dif_tm,
302                                TracingEvent::UnnamedRangeEnd(dif_tm, _) => dif_tm
303                            };
304                            if dif_tm == 0 {
305                                dif_tm_zero = true;
306                            }
307                            cur_tm += dif_tm;
308                        }
309                        if !dif_tm_zero {
310                            parser_state.zero_diff_cnt = 0;
311                        }
312                        else {
313                            parser_state.zero_diff_cnt += 1;
314                        }
315                        if cur_tm > header.end_timestamp {
316                            warn!("Parsing issue: Timestamp is outside local packet! diff: {}",  cur_tm - header.end_timestamp);
317                        }
318
319                        // Create ParsedEvent
320                        let timestamp = self.interpolation_points.project_tm(cur_tm) + parser_state.zero_diff_cnt * 10;
321                        match evt {
322                            TracingEvent::Instant(id, _) => {
323                                let ev_name = if let Some((ev_name, ev_type)) = parser_state.id_store.get(&id) {
324                                    if ev_type != &EventType::Instant {
325                                        error!("Assertion failed: Instant event type is not Instant!");
326                                    }
327                                    ev_name.clone()
328                                }
329                                else {
330                                    error!("Did not find event name for id: {}", id);
331                                    Rc::from(format!("Unknown Instant {}", id))
332                                };
333                                let parsed = ParsedEvent::Instant {
334                                    name: ev_name.clone(),
335                                    tm: timestamp
336                                };
337                                f(&parsed, &parser_state.thread_info_state());
338                            }
339                            TracingEvent::RangePart(id, _, ord_id) => {
340                                if let Some((ev_name, ev_type)) = parser_state.id_store.get(&id) {
341                                    if ev_type == &EventType::Instant {
342                                        error!("Assertion failed: RangePart event has Instant type!");
343                                    }
344                                    else if let EventType::RangeEnd(start_id) = ev_type {
345                                        if let Some((start_name, start_ev_type)) = parser_state.id_store.get(start_id) {
346                                            if *start_ev_type != EventType::RangeStart {
347                                                error!("Assertion failed: RangePart event has wrong RangeStart type!");
348                                            }
349                                            if let Some((ev_id, start_tm)) = parser_state.cur_started_ranges.remove(&ord_id) {
350                                                if ev_id != *start_id {
351                                                    error!("Assertion failed: RangePart event has wrong RangeEnd id!");
352                                                }
353                                                let parsed = ParsedEvent::NamedRange {
354                                                    name: start_name.clone(),
355                                                    end_name: ev_name.clone(),
356                                                    start: start_tm,
357                                                    end: timestamp
358                                                };
359                                                f(&parsed, &parser_state.thread_info_state());
360                                            }
361                                            else {
362                                                warn!("Did not find start event for RangePart id: {}", id);
363                                            }
364                                        }
365                                        else {
366                                            warn!("Did not find start event for RangePart id: {}", id);
367                                        }
368                                    }
369                                    else {
370                                        // Range start
371                                        parser_state.cur_started_ranges.insert(ord_id, (id, timestamp));
372                                    }
373                                }
374                                else {
375                                    error!("Did not find event name for id: {}", id);
376                                    let ev_name: Rc<str> = Rc::from(format!("Unknown RangePart {}", id));
377
378                                    let parsed = ParsedEvent::Instant {
379                                        name: ev_name,
380                                        tm: timestamp
381                                    };
382                                    f(&parsed, &parser_state.thread_info_state());
383                                };
384
385                            }
386                            TracingEvent::UnnamedRangeEnd(_, ord_id ) => {
387                                if let Some(start_info) = parser_state.cur_started_ranges.remove(&ord_id) {
388                                    if let Some((start_name, ev_type)) = parser_state.id_store.get(&start_info.0) {
389                                        if *ev_type != EventType::RangeStart {
390                                            error!("Assertion failed: UnnamedRangeEnd event has non-RangeStart type!");
391                                        }
392                                        let parsed = ParsedEvent::Range {
393                                            name: start_name.clone(),
394                                            start: start_info.1,
395                                            end: timestamp
396                                        };
397                                        f(&parsed, &parser_state.thread_info_state());
398                                    }
399                                    else {
400                                        warn!("Did not find start event for UnnamedRangeEnd id: {}. Skipping...", ord_id);
401                                    }
402                                }
403                                else {
404                                    warn!("Did not find start event for UnnamedRangeEnd id: {}. Skipping...", ord_id);
405                                }
406                            }
407                        }
408                    }
409
410                    parser_state.stats.new_events(new_events_len, header.start_timestamp, header.end_timestamp);
411
412                    parser_state.state_machine.ensure_buf_end();
413                }
414            }
415
416            Packet::FailedPages(failed_pages) => {
417                for header in failed_pages {
418                    info!("Got failed pages header: {:?}", header);
419
420                    let start = header.start_timestamp;
421                    let dur = header.end_timestamp - header.start_timestamp;
422                    let thread_ord_id = header.thread_ord_id;
423                    self.thread_parser_state(thread_ord_id).missed_events.push((start, dur));
424                }
425            }
426
427            Packet::GracefulShutdown => {}
428            Packet::ConnectionAccepted => {}
429            Packet::Hello => {}
430        }
431    }
432    pub fn print_stats(&self) {
433        let ticks_per_ns = self.interpolation_points.get_avg_ticks_per_ns();
434        info!("Printing stats...");
435        
436        let mut total_events = 0;
437        for (ord_id, thread) in &self.event_parsers {
438            info!("\tThread: {:?}#{:?}", thread.thread_name, ord_id);
439            let stats = thread.stats;
440            
441            let events_per_sec = stats.total_events as f64 / ((stats.max_timestamp - stats.min_timestamp) as f64 / ticks_per_ns) * 1_000_000_000.0;
442            let events_per_sec_covered = stats.total_events as f64 / (stats.covered_dur as f64 / ticks_per_ns) * 1_000_000_000.0;
443            info!("Total events: {}", stats.total_events);
444            info!("Events per second (global): {} eps", events_per_sec);
445            info!("Events per second (covered): {} eps", events_per_sec_covered);
446            info!("Average event duration: {} ns", stats.covered_dur as f64 / ticks_per_ns / stats.total_events as f64);
447            
448            info!("\n");
449            
450            total_events += stats.total_events;
451        }
452        
453        info!("Average bytes per event: {} bytes", self.counters.trace_buf as f64 / total_events as f64);
454        info!("Average transport bytes per event: {} bytes", self.counters.total_bytes() as f64 / total_events as f64);
455
456    }
457
458    /// Continuously pull events until EOF.
459    /// Decode incoming events and save them to `trace.json` in Perfetto format
460    #[cfg(feature="perfetto")]
461    pub fn parse_and_convert_to_perfetto(&mut self, packet_decoder: PacketDecoder) -> ParseResult<bytes::BytesMut> {
462        use crate::perfetto_format::PerfettoTraceFile;
463
464        let mut trace_res_file = PerfettoTraceFile::new();
465        self.parse_to_end(packet_decoder, |ev, thread_info| {
466            let thread_id = thread_info.thread_id.unwrap_or(999);
467            trace_res_file.set_thread_name(thread_id, thread_info.thread_name.as_deref());
468            #[cfg(feature="local-packet-bounds")]
469            trace_res_file.set_thread_name(999666 + thread_info.thread_ord_id, Some("[not thread] local packets"));
470            
471            match ev {
472                ParsedEvent::Instant {
473                    name,
474                    tm
475                } => {
476                    trace_res_file.add_point_event(name, thread_id, *tm);
477                }
478                ParsedEvent::Range {
479                    name,
480                    start,
481                    end
482                } => {
483                    trace_res_file.add_range_event(name, thread_id,
484                                                   *start, *end);
485                }
486                ParsedEvent::NamedRange {
487                    name,
488                    end_name,
489                    start,
490                    end
491                } => {
492
493                    trace_res_file.add_range_event(&format!("{} -> {}", name, end_name), thread_id, 
494                                                   *start, *end);
495                }
496            }
497        })?;
498
499        if cfg!(feature="local-packet-bounds") {
500            for (global_i, local_i, thread_ord_id, start,end) in std::mem::take(&mut self.local_packet_ranges).into_iter() {
501                trace_res_file.add_range_event(&format!("Local packet #{global_i}.{local_i}"), 999666 + thread_ord_id, start, end);
502            }
503        }
504
505        let encoder_info = self.machine_info.take().unwrap_or_else(|| {
506            warn!("Encoder info is not present in decoded data! Using default values");
507            SparklesMachineInfo::default()
508        });
509        trace_res_file.set_process_info(encoder_info.process_name, encoder_info.pid);
510
511        let bytes = trace_res_file.get_bytes();
512        Ok(bytes)
513    }
514    fn thread_parser_state(&mut self, thread_id: u64) -> &mut ThreadParserState {
515        self.event_parsers.entry(thread_id).or_default()
516    }
517}
518
/// Identifier of an event inside a thread's id store (the index of its tag in the packet header).
pub type TracingEventId = u8;
520
/// event, dif_tm
///
/// A raw event as produced by the stream decoder. `dif_tm` is the timestamp delta to the
/// previous event; the parser ignores it for the first event of a local packet.
#[derive(Debug, Copy, Clone)]
pub enum TracingEvent {
    /// `(event id, dif_tm)`
    Instant(TracingEventId, u64),
    /// `(event id, dif_tm, range ord id)` - a range start or a named range end,
    /// depending on the type registered for the event id.
    RangePart(TracingEventId, u64, u8),
    /// `(dif_tm, range ord id)` - ends the range that was started under the same ord id.
    UnnamedRangeEnd(u64, u8)
}
528const VERSION: &str = env!("CARGO_PKG_VERSION");
529pub fn version() {
530    println!("Sparkles-parser v{VERSION}");
531    println!("  Using sparkles protocol version {}.{}", PROTOCOL_VERSION.0, PROTOCOL_VERSION.1);
532}