1#[cfg(feature="perfetto")]
2mod perfetto_format;
3pub mod tracing_decoder;
4pub mod parsed;
5pub mod packet_decoder;
6pub mod discovery_wrapper;
7
8use std::collections::BTreeMap;
9use std::ops::Deref;
10use std::rc::Rc;
11use std::thread;
12use std::sync::atomic::AtomicBool;
13use std::sync::mpsc;
14use std::time::{Duration, Instant};
15use log::{debug, error, info, warn};
16use sparkles_core::consts::PROTOCOL_VERSION;
17use sparkles_core::local_storage::id_mapping::EventType;
18use sparkles_core::protocol::headers::SparklesMachineInfo;
19use crate::packet_decoder::{Packet, PacketDecoder, PacketReadError, ProtocolCounters};
20use crate::parsed::{ParsedEvent, ThreadInfoState};
21use crate::tracing_decoder::StreamFrameDecoder;
22
23pub use discovery_wrapper::DiscoveryWrapper;
25
/// Buffer size constant exported by the parser crate.
///
/// NOTE(review): not referenced anywhere in this file; presumably a byte count used by
/// callers or sibling modules — confirm.
pub static PARSER_BUF_SIZE: usize = 1_000_000;
/// Process-wide shutdown flag: set by [`request_shutdown`], read by [`is_shutting_down`]
/// and polled by the receiving thread spawned in `SparklesParser::parse_to_end`.
static SHUTDOWN_SIGNAL: AtomicBool = AtomicBool::new(false);
28
29pub fn request_shutdown() {
30 SHUTDOWN_SIGNAL.store(true, std::sync::atomic::Ordering::SeqCst);
31 info!("Shutdown requested...")
32}
33pub fn is_shutting_down() -> bool {
34 SHUTDOWN_SIGNAL.load(std::sync::atomic::Ordering::SeqCst)
35}
36
/// Stateful decoder that turns a stream of protocol [`Packet`]s into [`ParsedEvent`]s.
///
/// Packets are fed either one at a time through `parse_single_packet` or in bulk through
/// `parse_to_end`, which also fills in `counters` once the stream ends.
pub struct SparklesParser {
    /// Contents of the most recent `Packet::MachineInfo`, if one was received.
    machine_info: Option<SparklesMachineInfo>,

    /// Per-thread decoding state, keyed by the `thread_ord_id` found in packet headers.
    event_parsers: BTreeMap<u64, ThreadParserState>,
    /// One entry per local packet seen in a `Packet::DataBytes`:
    /// `(global_i, local_i, thread_ord_id, projected start, projected end)`, where the
    /// last two values are the header timestamps passed through `project_tm`.
    local_packet_ranges: Vec<(usize, usize, u64, u64, u64)>,
    /// Number of `Packet::DataBytes` packets processed so far (index of the next one).
    global_i: usize,
    /// Tick-to-nanosecond anchors collected from `Packet::TimestampFreq` packets.
    interpolation_points: InterpolationPoints,
    /// Byte counters copied from the `PacketDecoder` at the end of `parse_to_end`.
    counters: ProtocolCounters
}
46
/// Piecewise-linear mapping from raw tick timestamps to nanoseconds.
///
/// The map is keyed by the tick value of each anchor. The value is
/// `(ticks_per_ns, ns)`: the tick rate reported at that anchor and the nanosecond
/// value the anchor itself was projected to at the moment it was inserted.
struct InterpolationPoints(BTreeMap<u64, (f64, u64)>);

impl InterpolationPoints {
    /// Creates a mapping without any anchors.
    pub fn new() -> Self {
        InterpolationPoints(BTreeMap::new())
    }

    /// Registers an anchor at tick `cur_tm` with the given tick rate.
    ///
    /// The anchor's nanosecond value is computed by projecting `cur_tm` through the
    /// anchors already stored, so the very first anchor maps `cur_tm` to itself.
    fn add_interpolation_point(&mut self, ticks_per_ns: f64, cur_tm: u64) {
        let anchor = (ticks_per_ns, self.project_tm(cur_tm));
        self.0.insert(cur_tm, anchor);
    }

    /// Arithmetic mean of the tick rates of all anchors (NaN when there are none).
    fn get_avg_ticks_per_ns(&self) -> f64 {
        let rate_sum: f64 = self.0.values().map(|&(rate, _)| rate).sum();
        rate_sum / self.0.len() as f64
    }

    /// Whether no anchor has been registered yet.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Converts the tick timestamp `tm` to nanoseconds.
    ///
    /// Uses the closest anchor at or before `tm` and extrapolates forward with its
    /// rate; if there is none, extrapolates backward from the first anchor after
    /// `tm`. With no anchors at all, `tm` is returned unchanged.
    fn project_tm(&self, tm: u64) -> u64 {
        let anchors = &self.0;

        if let Some((&anchor_tm, &(ticks_per_ns, anchor_ns))) = anchors.range(..=tm).next_back() {
            let elapsed_ticks = (tm - anchor_tm) as f64;
            return (anchor_ns as f64 + elapsed_ticks / ticks_per_ns) as u64;
        }

        match anchors.range(tm..).next() {
            Some((&anchor_tm, &(ticks_per_ns, anchor_ns))) => {
                let remaining_ticks = (anchor_tm - tm) as f64;
                // A negative result saturates to 0 in the float-to-int cast.
                (anchor_ns as f64 - remaining_ticks / ticks_per_ns) as u64
            }
            None => tm,
        }
    }
}
91
/// Decoding state kept for a single traced thread.
///
/// One instance exists per `thread_ord_id` seen in packet headers; it is created on
/// first use through `Default`.
#[derive(Default)]
pub struct ThreadParserState {
    /// Latest thread name announced by a header (`new_thread_name`), if any.
    thread_name: Option<String>,
    /// Thread id taken from the most recent header's `thread_info`.
    thread_id: Option<u64>,
    /// `thread_ord_id` of the most recent header handled for this thread.
    last_thread_ord_id: u64,

    /// `(start_timestamp, duration)` of every failed page reported for this thread,
    /// stored as the raw header values (not projected to nanoseconds).
    missed_events: Vec<(u64, u64)>,

    /// Incremental decoder for this thread's raw event bytes.
    state_machine: StreamFrameDecoder,
    /// Ranges that have started but not yet ended, keyed by the range `ord_id`;
    /// the value is `(start event id, projected start timestamp)`.
    cur_started_ranges: BTreeMap<u8, (TracingEventId, u64)>,
    /// Number of consecutive events whose tick delta was zero; each one shifts the
    /// emitted timestamp by a further 10 units.
    zero_diff_cnt: u64,

    /// Event id -> `(event name, event type)` as announced in packet headers.
    id_store: BTreeMap<TracingEventId, (Rc<str>, EventType)>,
    /// Running statistics over all local packets decoded for this thread.
    stats: TracingStats,
}
110
/// Running statistics over the local packets decoded for one thread.
///
/// All timestamps and durations are in raw ticks, exactly as passed to
/// [`TracingStats::new_events`].
#[derive(Debug, Copy, Clone, Default)]
pub struct TracingStats {
    /// Total number of events across all recorded batches.
    pub total_events: usize,
    /// Smallest batch start timestamp recorded so far (0 until the first batch).
    pub min_timestamp: u64,
    /// Largest timestamp recorded so far (0 until the first batch).
    pub max_timestamp: u64,
    /// Sum of `end - start` over all recorded batches.
    pub covered_dur: u64,
}

impl TracingStats {
    /// Folds one batch of `events_cnt` events spanning
    /// `start_timestamp..=end_timestamp` into the statistics.
    ///
    /// A malformed batch with `end_timestamp < start_timestamp` contributes zero
    /// duration instead of underflowing, and `min_timestamp <= max_timestamp` is
    /// maintained in every case.
    pub fn new_events(&mut self, events_cnt: usize, start_timestamp: u64, end_timestamp: u64) {
        // `Default` initialises `min_timestamp` to 0, which no real timestamp can
        // undercut. An all-zero struct is therefore treated as "nothing recorded
        // yet" so that the first batch seeds the minimum. (A first batch that is
        // itself all zeroes leaves the struct in this state, which is harmless.)
        let nothing_recorded = self.total_events == 0
            && self.min_timestamp == 0
            && self.max_timestamp == 0
            && self.covered_dur == 0;

        self.total_events += events_cnt;

        if nothing_recorded || start_timestamp < self.min_timestamp {
            self.min_timestamp = start_timestamp;
        }
        // Including `start_timestamp` keeps max >= min even for a malformed batch.
        self.max_timestamp = self.max_timestamp.max(end_timestamp).max(start_timestamp);

        self.covered_dur += end_timestamp.saturating_sub(start_timestamp);
    }
}
131
132impl ThreadParserState {
133 pub fn thread_info_state(&self) -> ThreadInfoState {
134 ThreadInfoState {
135 thread_id: self.thread_id,
136 thread_name: self.thread_name.clone(),
137 thread_ord_id: self.last_thread_ord_id,
138 }
139 }
140}
141
/// Result type of the parser's public entry points; the error is a [`PacketReadError`].
pub type ParseResult<T> = Result<T, PacketReadError>;
143
144impl SparklesParser {
145 pub fn new() -> Self {
147 #[cfg(feature="self-tracing")]
148 sparkles::init(sparkles::config::SparklesConfig::default()
149 .with_global_capacity(100_000)
150 .with_file_sender(sparkles::sender::file_sender::FileSenderConfig::Directory(std::path::PathBuf::from("parser-trace")))
151 ).forget();
152 Self {
153 machine_info: None,
154 event_parsers: BTreeMap::new(),
155
156 local_packet_ranges: Vec::new(),
157 global_i: 0,
158 interpolation_points: InterpolationPoints::new(),
159 counters: ProtocolCounters::default()
160 }
161 }
162
    /// Reads packets from `packet_decoder` until the stream ends and feeds each one to
    /// [`Self::parse_single_packet`], invoking `f` for every parsed event.
    ///
    /// Reading happens on a dedicated thread named "Receiving thread"; packets are
    /// handed to the calling thread over a bounded channel (capacity 100) and parsed
    /// there. The reading loop stops when:
    /// * a `Packet::GracefulShutdown` is read,
    /// * a read error occurs and more than 5 seconds have passed since the last
    ///   successful read, or
    /// * the global shutdown flag is set.
    ///
    /// Afterwards the decoder's counters are stored in `self.counters` and
    /// [`Self::print_stats`] is called.
    ///
    /// # Errors
    /// As written, this function always returns `Ok(())`; read errors are logged and
    /// retried inside the receiving thread rather than propagated.
    ///
    /// # Panics
    /// Panics if the thread cannot be spawned, if a channel send/receive fails, or if
    /// the receiving thread itself panicked.
    pub fn parse_to_end(&mut self, mut packet_decoder: PacketDecoder, mut f: impl FnMut(&ParsedEvent, &ThreadInfoState)) -> ParseResult<()> {
        // Decoded packets flow reader -> parser; the counters are sent once at the end.
        let (packets_tx, packets_rx) = mpsc::sync_channel(100);
        let (counters_tx, counters_rx) = mpsc::sync_channel(1);

        let jh = thread::Builder::new().name(String::from("Receiving thread")).spawn(move || {
            let mut last_packet_received = Instant::now();
            loop {
                let packet = packet_decoder.read_packet();
                #[cfg(feature="self-tracing")]
                let g = sparkles_macro::range_event_start!("Parse packet");

                // Any successful read (including `Ok(None)`) resets the inactivity timer.
                if packet.is_ok() {
                    last_packet_received = Instant::now();
                }
                match packet {
                    Ok(Some(packet)) => {
                        // The shutdown marker ends the stream and is not forwarded.
                        if matches!(packet, Packet::GracefulShutdown) {
                            info!("GracefulShutdown received!");
                            break;
                        }
                        packets_tx.send(packet).unwrap();
                    }
                    Ok(None) => {

                    }
                    Err(e) => {
                        #[cfg(feature="self-tracing")]
                        sparkles_macro::range_event_end!(g, "Error while reading");
                        warn!("Error while reading packet: {:?}", e);

                        // Back off before retrying the read.
                        thread::sleep(Duration::from_millis(500));

                        if last_packet_received.elapsed() > Duration::from_secs(5) {
                            warn!("No packets received for 5 seconds. Exiting...");
                            break;
                        }
                    }
                }

                if SHUTDOWN_SIGNAL.load(std::sync::atomic::Ordering::SeqCst) {
                    break;
                }
            }
            // `packets_tx` is dropped when this closure returns, which ends the
            // parsing loop on the calling thread.
            counters_tx.send(packet_decoder.counters()).unwrap();
        }).unwrap();

        // Runs until the receiving thread drops its sender.
        while let Ok(packet) = packets_rx.recv() {
            self.parse_single_packet(packet, &mut f);
        }
        self.counters = counters_rx.recv().unwrap();

        jh.join().unwrap();

        #[cfg(feature="self-tracing")]
        sparkles::finalize();

        self.print_stats();
        Ok(())
    }
222
223 pub fn parse_single_packet(&mut self, packet: Packet, f: &mut impl FnMut(&ParsedEvent, &ThreadInfoState)) {
224 match packet {
225 Packet::MachineInfo(info) => {
226 if info.ver.0 != PROTOCOL_VERSION.0 {
227 error!("Protocol major version mismatch! Parser: {}, Sender: {}", PROTOCOL_VERSION.0, info.ver.0);
228 }
229 else if info.ver.1 < PROTOCOL_VERSION.1 {
230 warn!("Sender protocol version is higher than parser! Parser: {}.{}, Sender: {}.{}",
231 info.ver.0, PROTOCOL_VERSION.1, info.ver.0, info.ver.1)
232 }
233
234 self.machine_info = Some(info);
235 }
236 Packet::TimestampFreq(ticks_per_sec, cur_tm) => {
237 let ticks_per_ns = ticks_per_sec as f64 / 1_000_000_000.0;
238 info!("Got timestamp frequency: {:?} t/ns", ticks_per_ns);
239
240 self.interpolation_points.add_interpolation_point(ticks_per_ns, cur_tm);
241 }
242 Packet::DataBytes(packets) => {
243 if self.interpolation_points.is_empty() {
244 error!("Timestamp frequency is not set! Dropping packet.");
245 }
246
247 let global_i = self.global_i;
248 self.global_i += 1;
249 for (local_i, (header, data)) in packets.into_iter().enumerate() {
250 #[cfg(feature="self-tracing")]
251 let g = sparkles_macro::range_event_start!("Parse header");
252 let thread_id = header.thread_ord_id;
253 let parser_state = self.event_parsers.entry(thread_id).or_default();
254 self.local_packet_ranges.push((global_i, local_i, thread_id, self.interpolation_points.project_tm(header.start_timestamp), self.interpolation_points.project_tm(header.end_timestamp)));
255
256 parser_state.thread_id = Some(header.thread_info.thread_id);
258 if let Some(thread_name) = header.thread_info.new_thread_name.clone() {
259 parser_state.thread_name = Some(thread_name);
260 }
261 parser_state.last_thread_ord_id = thread_id;
262
263 for (id, (name, r#type)) in header.id_store.tags.iter().enumerate() {
265 let id = id as u8;
266 if let Some((old_name, old_type)) = parser_state.id_store.get(&id) {
267 if old_name.as_ref() != name.deref() || old_type != r#type {
268 error!("ID store mismatch for thread {:?}#{:?}! ID: {}, Old: {:?}, New: {:?}", parser_state.thread_name, parser_state.thread_id,
269 id, (old_name, old_type), (name, r#type));
270 }
271 }
272 parser_state.id_store.insert(id, (Rc::from(name.deref()), *r#type));
273 }
274 #[cfg(feature="self-tracing")]
275 drop(g);
276
277 if self.interpolation_points.is_empty() {
278 continue;
279 }
280
281 #[cfg(feature="self-tracing")]
282 let g = sparkles_macro::range_event_start!("Decode raw events");
283 let new_events = parser_state.state_machine.decode_many(&data);
284 let new_events_len = new_events.len();
285 debug!("Received {} events", new_events_len);
286 #[cfg(feature="self-tracing")]
287 drop(g);
288
289 #[cfg(feature="self-tracing")]
290 let g = sparkles_macro::range_event_start!("Parse new events");
291 let mut cur_tm = header.start_timestamp;
292 let mut first = true;
293 for evt in new_events {
294 let mut dif_tm_zero = false;
295 if first {
296 first = false;
297 }
298 else {
299 let dif_tm = match evt {
300 TracingEvent::Instant(_, dif_tm) => dif_tm,
301 TracingEvent::RangePart(_, dif_tm, _) => dif_tm,
302 TracingEvent::UnnamedRangeEnd(dif_tm, _) => dif_tm
303 };
304 if dif_tm == 0 {
305 dif_tm_zero = true;
306 }
307 cur_tm += dif_tm;
308 }
309 if !dif_tm_zero {
310 parser_state.zero_diff_cnt = 0;
311 }
312 else {
313 parser_state.zero_diff_cnt += 1;
314 }
315 if cur_tm > header.end_timestamp {
316 warn!("Parsing issue: Timestamp is outside local packet! diff: {}", cur_tm - header.end_timestamp);
317 }
318
319 let timestamp = self.interpolation_points.project_tm(cur_tm) + parser_state.zero_diff_cnt * 10;
321 match evt {
322 TracingEvent::Instant(id, _) => {
323 let ev_name = if let Some((ev_name, ev_type)) = parser_state.id_store.get(&id) {
324 if ev_type != &EventType::Instant {
325 error!("Assertion failed: Instant event type is not Instant!");
326 }
327 ev_name.clone()
328 }
329 else {
330 error!("Did not find event name for id: {}", id);
331 Rc::from(format!("Unknown Instant {}", id))
332 };
333 let parsed = ParsedEvent::Instant {
334 name: ev_name.clone(),
335 tm: timestamp
336 };
337 f(&parsed, &parser_state.thread_info_state());
338 }
339 TracingEvent::RangePart(id, _, ord_id) => {
340 if let Some((ev_name, ev_type)) = parser_state.id_store.get(&id) {
341 if ev_type == &EventType::Instant {
342 error!("Assertion failed: RangePart event has Instant type!");
343 }
344 else if let EventType::RangeEnd(start_id) = ev_type {
345 if let Some((start_name, start_ev_type)) = parser_state.id_store.get(start_id) {
346 if *start_ev_type != EventType::RangeStart {
347 error!("Assertion failed: RangePart event has wrong RangeStart type!");
348 }
349 if let Some((ev_id, start_tm)) = parser_state.cur_started_ranges.remove(&ord_id) {
350 if ev_id != *start_id {
351 error!("Assertion failed: RangePart event has wrong RangeEnd id!");
352 }
353 let parsed = ParsedEvent::NamedRange {
354 name: start_name.clone(),
355 end_name: ev_name.clone(),
356 start: start_tm,
357 end: timestamp
358 };
359 f(&parsed, &parser_state.thread_info_state());
360 }
361 else {
362 warn!("Did not find start event for RangePart id: {}", id);
363 }
364 }
365 else {
366 warn!("Did not find start event for RangePart id: {}", id);
367 }
368 }
369 else {
370 parser_state.cur_started_ranges.insert(ord_id, (id, timestamp));
372 }
373 }
374 else {
375 error!("Did not find event name for id: {}", id);
376 let ev_name: Rc<str> = Rc::from(format!("Unknown RangePart {}", id));
377
378 let parsed = ParsedEvent::Instant {
379 name: ev_name,
380 tm: timestamp
381 };
382 f(&parsed, &parser_state.thread_info_state());
383 };
384
385 }
386 TracingEvent::UnnamedRangeEnd(_, ord_id ) => {
387 if let Some(start_info) = parser_state.cur_started_ranges.remove(&ord_id) {
388 if let Some((start_name, ev_type)) = parser_state.id_store.get(&start_info.0) {
389 if *ev_type != EventType::RangeStart {
390 error!("Assertion failed: UnnamedRangeEnd event has non-RangeStart type!");
391 }
392 let parsed = ParsedEvent::Range {
393 name: start_name.clone(),
394 start: start_info.1,
395 end: timestamp
396 };
397 f(&parsed, &parser_state.thread_info_state());
398 }
399 else {
400 warn!("Did not find start event for UnnamedRangeEnd id: {}. Skipping...", ord_id);
401 }
402 }
403 else {
404 warn!("Did not find start event for UnnamedRangeEnd id: {}. Skipping...", ord_id);
405 }
406 }
407 }
408 }
409
410 parser_state.stats.new_events(new_events_len, header.start_timestamp, header.end_timestamp);
411
412 parser_state.state_machine.ensure_buf_end();
413 }
414 }
415
416 Packet::FailedPages(failed_pages) => {
417 for header in failed_pages {
418 info!("Got failed pages header: {:?}", header);
419
420 let start = header.start_timestamp;
421 let dur = header.end_timestamp - header.start_timestamp;
422 let thread_ord_id = header.thread_ord_id;
423 self.thread_parser_state(thread_ord_id).missed_events.push((start, dur));
424 }
425 }
426
427 Packet::GracefulShutdown => {}
428 Packet::ConnectionAccepted => {}
429 Packet::Hello => {}
430 }
431 }
432 pub fn print_stats(&self) {
433 let ticks_per_ns = self.interpolation_points.get_avg_ticks_per_ns();
434 info!("Printing stats...");
435
436 let mut total_events = 0;
437 for (ord_id, thread) in &self.event_parsers {
438 info!("\tThread: {:?}#{:?}", thread.thread_name, ord_id);
439 let stats = thread.stats;
440
441 let events_per_sec = stats.total_events as f64 / ((stats.max_timestamp - stats.min_timestamp) as f64 / ticks_per_ns) * 1_000_000_000.0;
442 let events_per_sec_covered = stats.total_events as f64 / (stats.covered_dur as f64 / ticks_per_ns) * 1_000_000_000.0;
443 info!("Total events: {}", stats.total_events);
444 info!("Events per second (global): {} eps", events_per_sec);
445 info!("Events per second (covered): {} eps", events_per_sec_covered);
446 info!("Average event duration: {} ns", stats.covered_dur as f64 / ticks_per_ns / stats.total_events as f64);
447
448 info!("\n");
449
450 total_events += stats.total_events;
451 }
452
453 info!("Average bytes per event: {} bytes", self.counters.trace_buf as f64 / total_events as f64);
454 info!("Average transport bytes per event: {} bytes", self.counters.total_bytes() as f64 / total_events as f64);
455
456 }
457
458 #[cfg(feature="perfetto")]
461 pub fn parse_and_convert_to_perfetto(&mut self, packet_decoder: PacketDecoder) -> ParseResult<bytes::BytesMut> {
462 use crate::perfetto_format::PerfettoTraceFile;
463
464 let mut trace_res_file = PerfettoTraceFile::new();
465 self.parse_to_end(packet_decoder, |ev, thread_info| {
466 let thread_id = thread_info.thread_id.unwrap_or(999);
467 trace_res_file.set_thread_name(thread_id, thread_info.thread_name.as_deref());
468 #[cfg(feature="local-packet-bounds")]
469 trace_res_file.set_thread_name(999666 + thread_info.thread_ord_id, Some("[not thread] local packets"));
470
471 match ev {
472 ParsedEvent::Instant {
473 name,
474 tm
475 } => {
476 trace_res_file.add_point_event(name, thread_id, *tm);
477 }
478 ParsedEvent::Range {
479 name,
480 start,
481 end
482 } => {
483 trace_res_file.add_range_event(name, thread_id,
484 *start, *end);
485 }
486 ParsedEvent::NamedRange {
487 name,
488 end_name,
489 start,
490 end
491 } => {
492
493 trace_res_file.add_range_event(&format!("{} -> {}", name, end_name), thread_id,
494 *start, *end);
495 }
496 }
497 })?;
498
499 if cfg!(feature="local-packet-bounds") {
500 for (global_i, local_i, thread_ord_id, start,end) in std::mem::take(&mut self.local_packet_ranges).into_iter() {
501 trace_res_file.add_range_event(&format!("Local packet #{global_i}.{local_i}"), 999666 + thread_ord_id, start, end);
502 }
503 }
504
505 let encoder_info = self.machine_info.take().unwrap_or_else(|| {
506 warn!("Encoder info is not present in decoded data! Using default values");
507 SparklesMachineInfo::default()
508 });
509 trace_res_file.set_process_info(encoder_info.process_name, encoder_info.pid);
510
511 let bytes = trace_res_file.get_bytes();
512 Ok(bytes)
513 }
514 fn thread_parser_state(&mut self, thread_id: u64) -> &mut ThreadParserState {
515 self.event_parsers.entry(thread_id).or_default()
516 }
517}
518
/// Per-thread identifier of an event; used as the key into a thread's id store.
pub type TracingEventId = u8;
520
/// A raw event as decoded from a thread's byte stream, before names and absolute
/// timestamps are resolved.
///
/// The `u64` in every variant is the tick delta relative to the previous event in the
/// same local packet.
#[derive(Debug, Copy, Clone)]
pub enum TracingEvent {
    /// Instant event: `(event id, tick delta)`.
    Instant(TracingEventId, u64),
    /// Range start or named range end: `(event id, tick delta, range ord_id)`.
    /// Which of the two it is follows from the event type stored for the id.
    RangePart(TracingEventId, u64, u8),
    /// End of a range without its own name: `(tick delta, range ord_id)`.
    UnnamedRangeEnd(u64, u8)
}
528const VERSION: &str = env!("CARGO_PKG_VERSION");
529pub fn version() {
530 println!("Sparkles-parser v{VERSION}");
531 println!(" Using sparkles protocol version {}.{}", PROTOCOL_VERSION.0, PROTOCOL_VERSION.1);
532}