1use super::*;
28use anyhow::Context;
29use byteorder::{LittleEndian as LE, ReadBytesExt, WriteBytesExt};
30use std::mem::size_of;
31
/// Marker byte that `Stream::begin_scope` pushes in front of every scope header.
/// `Reader` treats any other byte at a scope position as "no more scopes here".
const SCOPE_BEGIN: u8 = b'(';
/// Marker byte that `Stream::end_scope` pushes in front of the stop time stamp.
const SCOPE_END: u8 = b')';
34
35#[derive(Clone, Copy, Debug, PartialEq, Eq)]
37pub struct ScopeRecord<'s> {
38 pub start_ns: NanoSecond,
40
41 pub duration_ns: NanoSecond,
43
44 pub data: &'s str,
47}
48
49impl ScopeRecord<'_> {
50 #[inline]
52 pub fn stop_ns(&self) -> NanoSecond {
53 self.start_ns + self.duration_ns
54 }
55}
56
/// A scope decoded by `Reader`, together with the stream positions needed to
/// keep reading its children or its next sibling.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Scope<'s> {
    /// Identifier read from the scope header.
    pub id: ScopeId,
    /// Start time, duration and recorded string of the scope.
    pub record: ScopeRecord<'s>,
    /// Stream position directly after the scope header, where the scope's
    /// contents begin. Pass it to `Reader::with_offset` to read child scopes.
    pub child_begin_position: u64,
    /// `child_begin_position` plus the scope size stored in the header;
    /// the scope's end marker is expected at this position.
    pub child_end_position: u64,
    /// Stream position directly after the scope's stop time stamp.
    pub next_sibling_position: u64,
}
72
/// A growable byte buffer that holds serialized profiling scopes.
#[derive(Clone, Default)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Stream(Vec<u8>);

impl Stream {
    /// Returns `true` when the stream holds no bytes at all.
    pub fn is_empty(&self) -> bool {
        let Self(buffer) = self;
        buffer.is_empty()
    }

    /// Number of bytes currently stored in the stream.
    pub fn len(&self) -> usize {
        let Self(buffer) = self;
        buffer.len()
    }

    /// Read-only view of the raw stream bytes.
    pub fn bytes(&self) -> &[u8] {
        self.0.as_slice()
    }

    /// Removes every byte from the stream.
    pub fn clear(&mut self) {
        let Self(buffer) = self;
        buffer.clear();
    }

    /// Appends `bytes` to the end of the stream.
    fn extend(&mut self, bytes: &[u8]) {
        self.0.extend_from_slice(bytes);
    }
}
104
105impl From<Vec<u8>> for Stream {
106 fn from(v: Vec<u8>) -> Self {
107 Self(v)
108 }
109}
110
111impl Stream {
112 #[inline]
115 pub fn begin_scope<F: Fn() -> i64>(
116 &mut self,
117 now_ns: F,
118 scope_id: ScopeId,
119 data: &str,
120 ) -> (usize, NanoSecond) {
121 self.0.push(SCOPE_BEGIN);
122
123 self.write_scope_id(scope_id);
124 let time_stamp_offset = self.0.len();
125 self.0
126 .write_i64::<LE>(NanoSecond::default())
127 .expect("can't fail");
128
129 self.write_str(data);
130 let offset = self.0.len();
132 self.write_scope_size(ScopeSize::unfinished());
133
134 let mut time_stamp_dest =
136 &mut self.0[time_stamp_offset..time_stamp_offset + size_of::<NanoSecond>()];
137 let start_ns = now_ns();
138 time_stamp_dest
139 .write_i64::<LE>(start_ns)
140 .expect("can't fail");
141 (offset, start_ns)
142 }
143
144 #[inline]
146 pub fn end_scope(&mut self, start_offset: usize, stop_ns: NanoSecond) {
147 let scope_size = self.0.len() - (start_offset + size_of::<ScopeSize>());
149 debug_assert!(start_offset + size_of::<ScopeSize>() <= self.0.len());
150 let mut dest_range = &mut self.0[start_offset..start_offset + size_of::<ScopeSize>()];
151 dest_range
152 .write_u64::<LE>(scope_size as u64)
153 .expect("can't fail");
154 debug_assert!(dest_range.is_empty());
155
156 self.0.push(SCOPE_END);
158 self.write_nanos(stop_ns);
159 }
160
161 #[inline]
162 fn write_nanos(&mut self, nanos: NanoSecond) {
163 self.0.write_i64::<LE>(nanos).expect("can't fail");
164 }
165
166 #[inline]
167 fn write_scope_size(&mut self, nanos: ScopeSize) {
168 self.0.write_u64::<LE>(nanos.0).expect("can't fail");
169 }
170
171 #[inline]
172 fn write_scope_id(&mut self, scope_id: ScopeId) {
173 self.0
175 .write_u32::<LE>(scope_id.0.get())
176 .expect("can't fail");
177 }
178
179 #[inline]
180 fn write_str(&mut self, s: &str) {
181 const MAX_STRING_LENGTH: usize = 127;
183 let len = s.floor_char_boundary(MAX_STRING_LENGTH);
184 self.0.write_u8(len as u8).expect("can't fail");
185 self.0.extend(&s.as_bytes()[0..len]);
186 }
187}
188
/// A [`Stream`] together with summary statistics about the scopes it holds.
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct StreamInfo {
    /// The serialized scopes.
    pub stream: Stream,

    /// Total number of scopes in the stream, nested scopes included
    /// (when filled in by `StreamInfo::parse`).
    pub num_scopes: usize,

    /// Deepest nesting level found in the stream; `0` when it holds no scopes
    /// (when filled in by `StreamInfo::parse`).
    pub depth: usize,

    /// `(min, max)` time range covered by the stream, in nanoseconds.
    /// An empty `StreamInfo` stores `(NanoSecond::MAX, NanoSecond::MIN)`, which
    /// is the neutral element for the `min`/`max` merging done in `extend`.
    pub range_ns: (NanoSecond, NanoSecond),
}
208
209impl Default for StreamInfo {
210 fn default() -> Self {
211 Self {
212 stream: Default::default(),
213 num_scopes: 0,
214 depth: 0,
215 range_ns: (NanoSecond::MAX, NanoSecond::MIN),
216 }
217 }
218}
219
220impl StreamInfo {
221 pub fn parse(stream: Stream) -> Result<StreamInfo> {
225 let top_scopes = Reader::from_start(&stream).read_top_scopes()?;
226 if top_scopes.is_empty() {
227 Ok(StreamInfo {
228 stream,
229 num_scopes: 0,
230 depth: 0,
231 range_ns: (NanoSecond::MAX, NanoSecond::MIN),
232 })
233 } else {
234 let (num_scopes, depth) = Reader::count_scope_and_depth(&stream)?;
235 let min_ns = top_scopes.first().unwrap().record.start_ns;
236 let max_ns = top_scopes.last().unwrap().record.stop_ns();
237
238 Ok(StreamInfo {
239 stream,
240 num_scopes,
241 depth,
242 range_ns: (min_ns, max_ns),
243 })
244 }
245 }
246
247 pub fn extend(&mut self, other: &StreamInfoRef<'_>) {
249 self.stream.extend(other.stream);
250 self.num_scopes += other.num_scopes;
251 self.depth = self.depth.max(other.depth);
252 self.range_ns.0 = self.range_ns.0.min(other.range_ns.0);
253 self.range_ns.1 = self.range_ns.1.max(other.range_ns.1);
254 }
255
256 pub fn clear(&mut self) {
258 let Self {
259 stream,
260 num_scopes,
261 depth,
262 range_ns,
263 } = self;
264 stream.clear();
265 *num_scopes = 0;
266 *depth = 0;
267 *range_ns = (NanoSecond::MAX, NanoSecond::MIN);
268 }
269
270 pub fn as_stream_into_ref(&self) -> StreamInfoRef<'_> {
272 StreamInfoRef {
273 stream: self.stream.bytes(),
274 num_scopes: self.num_scopes,
275 depth: self.depth,
276 range_ns: self.range_ns,
277 }
278 }
279}
280
/// Borrowed counterpart of `StreamInfo`: the same statistics, but the stream
/// bytes are referenced instead of owned.
#[derive(Clone, Copy)]
pub struct StreamInfoRef<'a> {
    /// The raw bytes of the serialized scopes.
    pub stream: &'a [u8],

    /// Total number of scopes in the stream.
    pub num_scopes: usize,

    /// Deepest nesting level of the scopes in the stream.
    pub depth: usize,

    /// `(min, max)` time range covered by the stream, in nanoseconds.
    pub range_ns: (NanoSecond, NanoSecond),
}
299
/// Number of bytes between the end of a scope's header and its end marker,
/// as stored in the stream.
#[derive(Clone, Copy, Eq, PartialEq)]
struct ScopeSize(u64);

impl ScopeSize {
    /// Sentinel size (`u64::MAX`) written while a scope has been started but
    /// not yet ended.
    pub fn unfinished() -> Self {
        ScopeSize(u64::MAX)
    }
}
310
/// Errors that can occur while reading a [`Stream`].
#[derive(Debug)]
pub enum Error {
    /// A value could not be read because the stream ended too early.
    /// The reader also reports this when a scope id is zero.
    PrematureEnd,
    /// The stream contents are inconsistent, e.g. the end marker is missing
    /// where the scope size says it should be, or a scope stops before it starts.
    InvalidStream,
    /// The scope-size field still holds the "unfinished" placeholder, i.e. the
    /// scope was begun but never ended.
    ScopeNeverEnded,
    /// The offset passed to `Reader::with_offset` lies beyond the end of the stream.
    InvalidOffset,
    /// Not produced by any code visible in this file.
    /// NOTE(review): presumably signals an empty stream — confirm against callers.
    Empty,
}
325
/// Result type of the stream reading functions in this module; the error is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Decodes scopes from the bytes of a [`Stream`]. Iterating a `Reader` yields
/// the consecutive scopes found at its current position, one level deep.
pub struct Reader<'s>(std::io::Cursor<&'s [u8]>);
330
331impl<'s> Reader<'s> {
332 pub fn from_start(stream: &'s Stream) -> Self {
334 Self(std::io::Cursor::new(&stream.0[..]))
335 }
336
337 pub fn with_offset(stream: &'s Stream, offset: u64) -> Result<Self> {
339 if offset <= stream.len() as u64 {
340 let mut cursor = std::io::Cursor::new(&stream.0[..]);
341 cursor.set_position(offset);
342 Ok(Self(cursor))
343 } else {
344 Err(Error::InvalidOffset)
345 }
346 }
347
348 fn parse_scope(&mut self) -> Result<Option<Scope<'s>>> {
351 match self.peek_u8() {
352 Some(SCOPE_BEGIN) => {
353 self.parse_u8()
354 .expect("swallowing already peeked SCOPE_BEGIN");
355 }
356 Some(_) | None => return Ok(None),
357 }
358
359 let scope_id = self.parse_scope_id()?;
360 let start_ns = self.parse_nanos()?;
361 let data = self.parse_string()?;
362 let scope_size = self.parse_scope_size()?;
363 if scope_size == ScopeSize::unfinished() {
364 return Err(Error::ScopeNeverEnded);
365 }
366 let child_begin_position = self.0.position();
367 self.0.set_position(child_begin_position + scope_size.0);
368 let child_end_position = self.0.position();
369
370 if self.parse_u8()? != SCOPE_END {
371 return Err(Error::InvalidStream);
372 }
373 let stop_ns = self.parse_nanos()?;
374 if stop_ns < start_ns {
375 return Err(Error::InvalidStream);
376 }
377
378 Ok(Some(Scope {
379 id: scope_id,
380 record: ScopeRecord {
381 start_ns,
382 duration_ns: stop_ns - start_ns,
383 data,
384 },
385 child_begin_position,
386 child_end_position,
387 next_sibling_position: self.0.position(),
388 }))
389 }
390
391 pub fn read_top_scopes(self) -> Result<Vec<Scope<'s>>> {
393 let mut scopes = vec![];
394 for scope in self {
395 scopes.push(scope?);
396 }
397 Ok(scopes)
398 }
399
400 fn peek_u8(&mut self) -> Option<u8> {
402 let position = self.0.position();
403 let value = self.0.read_u8().ok();
404 self.0.set_position(position);
405 value
406 }
407
408 fn parse_u8(&mut self) -> Result<u8> {
409 self.0.read_u8().map_err(|_err| Error::PrematureEnd)
410 }
411
412 fn parse_scope_id(&mut self) -> Result<ScopeId> {
413 self.0
414 .read_u32::<LE>()
415 .context("Can not parse scope id")
416 .and_then(|x| NonZeroU32::new(x).context("Not a `NonZeroU32` scope id"))
417 .map(ScopeId)
418 .map_err(|_err| Error::PrematureEnd)
419 }
420
421 fn parse_nanos(&mut self) -> Result<NanoSecond> {
422 self.0.read_i64::<LE>().map_err(|_err| Error::PrematureEnd)
423 }
424
425 fn parse_scope_size(&mut self) -> Result<ScopeSize> {
426 self.0
427 .read_u64::<LE>()
428 .map_err(|_err| Error::PrematureEnd)
429 .map(ScopeSize)
430 }
431
432 fn parse_string(&mut self) -> Result<&'s str> {
433 let len = self.parse_u8().map_err(|_err| Error::PrematureEnd)? as usize;
434 let data = self.0.get_ref();
435 let begin = self.0.position() as usize;
436 let end = begin + len;
437 if end <= data.len() {
438 let s = longest_valid_utf8_prefix(&data[begin..end]);
439 self.0.set_position(end as u64);
440 Ok(s)
441 } else {
442 Err(Error::PrematureEnd)
443 }
444 }
445
446 pub fn count_scope_and_depth(stream: &Stream) -> Result<(usize, usize)> {
449 let mut max_depth = 0;
450 let num_scopes = Self::count_all_scopes_at_offset(stream, 0, 0, &mut max_depth)?;
451 Ok((num_scopes, max_depth))
452 }
453
454 fn count_all_scopes_at_offset(
455 stream: &Stream,
456 offset: u64,
457 depth: usize,
458 max_depth: &mut usize,
459 ) -> Result<usize> {
460 *max_depth = (*max_depth).max(depth);
461
462 let mut num_scopes = 0;
463 for child_scope in Reader::with_offset(stream, offset)? {
464 num_scopes += 1 + Self::count_all_scopes_at_offset(
465 stream,
466 child_scope?.child_begin_position,
467 depth + 1,
468 max_depth,
469 )?;
470 }
471
472 Ok(num_scopes)
473 }
474}
475
/// Returns `data` as a string if it is valid UTF-8, otherwise the longest
/// leading part of it that is.
fn longest_valid_utf8_prefix(data: &[u8]) -> &str {
    std::str::from_utf8(data).unwrap_or_else(|error| {
        // Everything before `valid_up_to()` has already been validated.
        let (valid, _invalid_tail) = data.split_at(error.valid_up_to());
        std::str::from_utf8(valid).expect("We can trust valid_up_to")
    })
}
485
486impl<'s> Iterator for Reader<'s> {
488 type Item = Result<Scope<'s>>;
489 fn next(&mut self) -> Option<Self::Item> {
490 self.parse_scope().transpose()
491 }
492}
493
/// A single scope written to a stream is read back unchanged.
#[test]
fn write_scope() {
    let mut stream = Stream::default();
    let (start_offset, _) = stream.begin_scope(|| 100, ScopeId::new(1), "data");
    stream.end_scope(start_offset, 300);

    let expected = ScopeRecord {
        start_ns: 100,
        duration_ns: 200,
        data: "data",
    };

    let scopes = Reader::from_start(&stream).read_top_scopes().unwrap();
    assert_eq!(scopes.len(), 1);
    assert_eq!(scopes[0].record, expected);
}
511
/// One top-level scope with two children is read back level by level.
#[test]
fn test_profile_data() {
    // Layout: data_top [ data_middle_0, data_middle_1 ]
    let mut stream = Stream::default();
    let (top, _) = stream.begin_scope(|| 100, ScopeId::new(1), "data_top");
    let (first_child, _) = stream.begin_scope(|| 200, ScopeId::new(2), "data_middle_0");
    stream.end_scope(first_child, 300);
    let (second_child, _) = stream.begin_scope(|| 300, ScopeId::new(3), "data_middle_1");
    stream.end_scope(second_child, 400);
    stream.end_scope(top, 400);

    // Reading from the start only sees the outermost scope.
    let top_scopes = Reader::from_start(&stream).read_top_scopes().unwrap();
    assert_eq!(top_scopes.len(), 1);
    let top_scope = top_scopes[0];
    assert_eq!(
        top_scope.record,
        ScopeRecord {
            start_ns: 100,
            duration_ns: 300,
            data: "data_top"
        }
    );

    // Reading from the child position of the top scope sees both children.
    let middle_scopes = Reader::with_offset(&stream, top_scope.child_begin_position)
        .unwrap()
        .read_top_scopes()
        .unwrap();
    assert_eq!(middle_scopes.len(), 2);

    let expected_children = [
        ScopeRecord {
            start_ns: 200,
            duration_ns: 100,
            data: "data_middle_0",
        },
        ScopeRecord {
            start_ns: 300,
            duration_ns: 100,
            data: "data_middle_1",
        },
    ];
    for (scope, expected) in middle_scopes.iter().zip(expected_children.iter()) {
        assert_eq!(&scope.record, expected);
    }
}