1#![warn(missing_docs)]
40use std::{
41 borrow::Cow,
42 collections::{HashMap, HashSet},
43 fmt::{Debug, Display},
44 io::{Cursor, Read},
45 iter::{once, repeat_n},
46 mem::swap,
47 sync::{
48 Arc,
49 atomic::{AtomicBool, AtomicI64, Ordering},
50 },
51 thread::JoinHandle,
52 time::{Duration, Instant},
53};
54
55#[cfg(target_os = "macos")]
56use std::time::{SystemTime, UNIX_EPOCH};
57
58use crossbeam::sync::{Parker, Unparker};
59use flate2::{
60 Compression,
61 bufread::{GzDecoder, GzEncoder},
62};
63use itertools::Itertools;
64use memory_stats::memory_stats;
65#[cfg(not(target_os = "macos"))]
66use nix::time::{ClockId, clock_gettime};
67use serde::{Deserialize, Serialize};
68use serde_json::{Value, json};
69use size_of::HumanBytes;
70use tracing::warn;
71
72#[derive(Debug)]
76struct AtomicOptionTimestamp(AtomicI64);
77
78impl Default for AtomicOptionTimestamp {
79 fn default() -> Self {
80 Self::new(None)
81 }
82}
83
84impl AtomicOptionTimestamp {
85 const fn new(value: Option<Timestamp>) -> Self {
86 Self(AtomicI64::new(match value {
87 Some(timestamp) => timestamp.0,
88 None => i64::MIN,
89 }))
90 }
91
92 fn load(&self) -> Option<Timestamp> {
93 let value = self.0.load(Ordering::Acquire);
94 (value != i64::MIN).then_some(Timestamp(value))
95 }
96
97 fn store(&self, value: Option<Timestamp>) {
98 self.0.store(
99 value.map_or(i64::MIN, |timestamp| timestamp.0),
100 Ordering::Release,
101 )
102 }
103}
104
105#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
106#[repr(transparent)]
107struct Timestamp(
108 i64,
117);
118
119#[cfg(target_os = "macos")]
120fn mach_absolute_time_nanos() -> i64 {
121 use mach2::mach_time::{mach_absolute_time, mach_timebase_info, mach_timebase_info_data_t};
122 use std::sync::OnceLock;
123
124 static NANOS_PER_TICK: OnceLock<(u32, u32)> = OnceLock::new();
125 let (numer, denom) = *NANOS_PER_TICK.get_or_init(|| {
126 let mut info = mach_timebase_info_data_t { numer: 0, denom: 0 };
127 unsafe {
128 mach_timebase_info(&mut info);
129 }
130 if info.denom == 0 {
131 (1, 1)
132 } else {
133 (info.numer, info.denom)
134 }
135 });
136 let ticks = unsafe { mach_absolute_time() };
137 (ticks * u64::from(numer) / u64::from(denom)) as i64
138}
139
140impl Timestamp {
141 fn now() -> Self {
142 #[cfg(target_os = "macos")]
143 {
144 Self(mach_absolute_time_nanos())
145 }
146
147 #[cfg(not(target_os = "macos"))]
148 {
149 let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap();
150 Self(now.tv_sec() as i64 * 1_000_000_000 + now.tv_nsec() as i64)
151 }
152 }
153
154 fn saturating_sub(self, other: Self) -> Duration {
156 if self.0 >= other.0 {
157 Duration::from_nanos(self.0.abs_diff(other.0))
158 } else {
159 Duration::ZERO
160 }
161 }
162}
163
164#[cfg(target_os = "macos")]
166fn unix_epoch_nanos() -> i64 {
167 SystemTime::now()
168 .duration_since(UNIX_EPOCH)
169 .expect("system clock is before Unix epoch")
170 .as_nanos() as i64
171}
172
173impl From<Instant> for Timestamp {
174 fn from(value: Instant) -> Self {
175 let zero = unsafe { std::mem::zeroed::<Instant>() };
188 Self((value - zero).as_nanos() as i64)
189 }
190}
191
192impl Display for Timestamp {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 write!(f, "{}", self.0)
195 }
196}
197
198impl Serialize for Timestamp {
199 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
203 where
204 S: serde::Serializer,
205 {
206 let milliseconds = self.0 as f64 / 1_000_000.0;
207 milliseconds.serialize(serializer)
208 }
209}
210
211impl<'de> Deserialize<'de> for Timestamp {
212 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
216 where
217 D: serde::Deserializer<'de>,
218 {
219 let milliseconds = f64::deserialize(deserializer)?;
220 Ok(Self((milliseconds * 1_000_000.0) as i64))
221 }
222}
223
224impl Debug for Span {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 write!(f, "Span")?;
227 if let Some(inner) = &self.0 {
228 write!(f, "({})", &inner.name)?;
229 }
230 Ok(())
231 }
232}
233
234struct SpanInner {
235 start: Timestamp,
236 category: &'static str,
237 name: &'static str,
238 tooltip: String,
239}
240
241impl SpanInner {
242 #[cold]
243 fn new(name: &'static str) -> Self {
244 Self {
245 start: Timestamp::now(),
246 category: "Other",
247 name,
248 tooltip: String::new(),
249 }
250 }
251
252 fn into_marker(self, end: MarkerEnd) -> Marker {
253 Marker {
254 start: self.start,
255 end,
256 category: self.category,
257 name: self.name,
258 tooltip: self.tooltip,
259 }
260 }
261
262 #[cold]
263 fn record(self, end: MarkerEnd) {
264 QUEUE.with(|queue| queue.push(self.into_marker(end)));
265 }
266}
267
268pub struct Span(Option<SpanInner>);
280
281impl Span {
282 pub const BYTES: usize = std::mem::size_of::<Marker>();
285
286 #[must_use]
298 pub fn new(name: &'static str) -> Self {
299 Self(Capture::is_active().then(|| SpanInner::new(name)))
300 }
301
302 #[must_use]
309 pub fn with_category(mut self, category: &'static str) -> Self {
310 if let Some(inner) = &mut self.0 {
311 inner.category = category;
312 }
313 self
314 }
315
316 #[must_use]
324 pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
325 where
326 F: FnOnce() -> String,
327 {
328 if let Some(inner) = &mut self.0 {
329 inner.tooltip = tooltip();
330 }
331 self
332 }
333
334 #[must_use]
338 pub fn with_start(mut self, start: Instant) -> Self {
339 if let Some(inner) = &mut self.0 {
340 inner.start = start.into();
341 }
342 self
343 }
344
345 pub fn in_scope<F, T>(self, f: F) -> T
347 where
348 F: FnOnce() -> T,
349 {
350 f()
351 }
352
353 pub fn record(self) {
355 }
357
358 pub fn cancel(mut self) {
360 let _ = self.0.take();
361 }
362}
363
364impl Drop for Span {
365 fn drop(&mut self) {
366 if let Some(inner) = self.0.take() {
367 inner.record(MarkerEnd::At(Timestamp::now()))
368 }
369 }
370}
371
372pub struct LongSpanBuilder(SpanInner);
374
375impl LongSpanBuilder {
376 #[must_use]
383 pub fn new(name: &'static str) -> Self {
384 Self(SpanInner::new(name))
385 }
386
387 #[must_use]
394 pub fn with_category(mut self, category: &'static str) -> Self {
395 self.0.category = category;
396 self
397 }
398
399 #[must_use]
405 pub fn with_tooltip(mut self, tooltip: impl Into<String>) -> Self {
406 self.0.tooltip = tooltip.into();
407 self
408 }
409
410 #[must_use]
414 pub fn with_start(mut self, start: Instant) -> Self {
415 self.0.start = start.into();
416 self
417 }
418
419 #[must_use]
421 pub fn build(self) -> LongSpan {
422 let timestamp = Arc::new(AtomicOptionTimestamp::default());
423 QUEUE.with(|queue| {
424 queue.push_long_span(self.0.into_marker(MarkerEnd::Long(timestamp.clone())))
425 });
426 LongSpan(timestamp)
427 }
428}
429
430pub struct LongSpan(Arc<AtomicOptionTimestamp>);
440
441impl LongSpan {
442 pub fn complete(self) {
446 }
448}
449
450impl Drop for LongSpan {
451 fn drop(&mut self) {
452 self.0.store(Some(Timestamp::now()));
453 }
454}
455
456pub struct Event(Option<SpanInner>);
468
469impl Event {
470 #[must_use]
479 pub fn new(name: &'static str) -> Self {
480 Self(Capture::is_active().then(|| SpanInner::new(name)))
481 }
482
483 #[must_use]
490 pub fn with_category(mut self, category: &'static str) -> Self {
491 if let Some(inner) = &mut self.0 {
492 inner.category = category;
493 }
494 self
495 }
496
497 #[must_use]
505 pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
506 where
507 F: FnOnce() -> String,
508 {
509 if let Some(inner) = &mut self.0 {
510 inner.tooltip = tooltip();
511 }
512 self
513 }
514
515 pub fn record(self) {
517 if let Some(inner) = self.0 {
518 let end = MarkerEnd::At(inner.start);
519 inner.record(end);
520 }
521 }
522}
523
524#[derive(Clone, Debug)]
526pub struct CaptureOptions {
527 memory_limit: Option<usize>,
528 record_rss: bool,
529}
530
531impl Default for CaptureOptions {
532 fn default() -> Self {
533 Self {
534 memory_limit: None,
535 record_rss: true,
536 }
537 }
538}
539
540impl CaptureOptions {
541 pub fn new() -> Self {
543 Self::default()
544 }
545
546 pub fn with_memory_limit(self, memory_limit: Option<usize>) -> Self {
553 Self {
554 memory_limit,
555 ..self
556 }
557 }
558
559 pub fn with_record_rss(self, record_rss: bool) -> Self {
564 Self { record_rss, ..self }
565 }
566
567 pub async fn start(self) -> Capture {
574 Capture::new(self, CAPTURE_MUTEX.lock().await)
575 }
576
577 pub fn blocking_start(self) -> Capture {
588 Capture::new(self, CAPTURE_MUTEX.blocking_lock())
589 }
590
591 pub fn try_start(self) -> Result<Capture, Self> {
598 let guard = match CAPTURE_MUTEX.try_lock() {
599 Ok(guard) => guard,
600 Err(_) => return Err(self),
601 };
602 Ok(Capture::new(self, guard))
603 }
604}
605
606static CAPTURE_MUTEX: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
607
608pub struct Capture {
615 _guard: tokio::sync::MutexGuard<'static, ()>,
616 start_time: Timestamp,
617 memory: Option<JoinHandle<Vec<(Timestamp, usize)>>>,
618 unparker: Unparker,
619 request_exit: Arc<AtomicBool>,
620 block_limit: i64,
621 #[cfg(target_os = "macos")]
622 anchor: (Timestamp, i64),
623}
624
625impl Capture {
626 fn new(params: CaptureOptions, guard: tokio::sync::MutexGuard<'static, ()>) -> Self {
627 let start = Timestamp::now();
628 if let Some(memory_limit) = params.memory_limit {
629 tracing::info!(
630 "marker capture limited to {}",
631 HumanBytes::from(memory_limit)
632 );
633 }
634 let block_limit = params.memory_limit.map_or(i64::MAX, |memory_limit| {
635 (memory_limit / BYTES_PER_BLOCK) as i64
636 });
637 let parker = Parker::new();
638 let unparker = parker.unparker().clone();
639 let request_exit = Arc::new(AtomicBool::new(false));
640 let memory = params.record_rss.then(|| {
641 std::thread::Builder::new()
642 .name(String::from("capture-rss"))
643 .spawn({
644 let request_exit = request_exit.clone();
645 move || {
646 let mut memory = Vec::new();
647 while !request_exit.load(Ordering::Acquire) {
648 if let Some(memory_stats) = memory_stats() {
649 memory.push((Timestamp::now(), memory_stats.physical_mem));
650 }
651 parker.park_timeout(Duration::from_millis(100));
652 }
653 memory
654 }
655 })
656 .expect("should be able to start a capture thread")
657 });
658 FREE_BLOCKS.store(block_limit, Ordering::Relaxed);
659 MARKERS_EXHAUSTED.store(None);
660 CAPTURING.store(true, Ordering::Release);
661 Self {
662 start_time: start,
663 block_limit,
664 memory,
665 unparker,
666 request_exit,
667 #[cfg(target_os = "macos")]
668 anchor: (Timestamp::now(), unix_epoch_nanos()),
669 _guard: guard,
670 }
671 }
672
673 pub fn finish(mut self) -> Annotations {
675 let end_time = Timestamp::now();
676 CAPTURING.store(false, Ordering::Release);
677 let markers_exhausted = MARKERS_EXHAUSTED.load();
678 let free_blocks = FREE_BLOCKS.load(Ordering::Relaxed);
679 let used =
680 HumanBytes::from((self.block_limit - free_blocks.max(0)) as usize * BYTES_PER_BLOCK);
681 if free_blocks < 0 {
682 } else {
683 tracing::info!("marker capture used {used}");
684 }
685
686 let mut markers: HashMap<usize, (Option<String>, Blocks)> = self
687 .all_threads()
688 .into_iter()
689 .map(|thread| {
690 let mut blocks = thread.queue.take_blocks();
691
692 let long_spans = thread.queue.take_long_spans();
693 if !long_spans.is_empty() {
694 blocks.0.push(Block(long_spans));
695 }
696
697 (thread.tid, (thread.name, blocks))
698 })
699 .collect();
700
701 if let Some(markers_exhausted) = markers_exhausted {
702 let elapsed = markers_exhausted.saturating_sub(self.start_time);
703 let tooltip = format!(
704 "marker capture exceeded the limit ({used}) after {:.1} s",
705 elapsed.as_secs_f64()
706 );
707 tracing::info!("{tooltip}");
708 let marker = Marker {
709 start: self.start_time,
710 end: MarkerEnd::At(markers_exhausted),
711 category: "profiling",
712 name: "Profiling",
713 tooltip,
714 };
715 markers
716 .entry(nix::unistd::getpid().as_raw() as usize)
717 .or_default()
718 .1
719 .0
720 .push(Block::new(marker));
721 }
722
723 Annotations {
724 end_time,
725 markers,
726 memory: self.take_memory(),
727 #[cfg(target_os = "macos")]
728 anchor: self.anchor,
729 }
730 }
731
732 pub fn abort(self) {
736 tracing::info!("aborting profile annotation capture");
737 }
738
739 pub fn is_active() -> bool {
746 CAPTURING.load(Ordering::Acquire)
747 }
748
749 fn all_threads(&mut self) -> Vec<ThreadMarkers> {
750 ALL_THREAD_MARKERS.lock().unwrap().clone()
751 }
752
753 fn take_memory(&mut self) -> Vec<(Timestamp, usize)> {
754 if let Some(memory) = self.memory.take() {
755 self.request_exit.store(true, Ordering::Release);
756 self.unparker.unpark();
757 memory.join().unwrap_or_default()
758 } else {
759 Default::default()
760 }
761 }
762}
763
764impl Drop for Capture {
765 fn drop(&mut self) {
766 self.take_memory();
767
768 CAPTURING.store(false, Ordering::Release);
771 }
772}
773
774#[derive(thiserror::Error, Debug)]
776pub enum Error {
777 #[error("Error decompressing profile ")]
779 GzDecoderError(#[from] std::io::Error),
780
781 #[error("Error parsing profile")]
783 SerdeError(#[from] serde_json_path_to_error::Error),
784}
785
786#[derive(Default, Clone, Debug)]
788pub struct AnnotationOptions {
789 product: Option<String>,
790 os_cpu: Option<String>,
791}
792
793impl AnnotationOptions {
794 pub fn new() -> Self {
796 Self::default()
797 }
798
799 pub fn with_product(self, product: Option<impl Into<String>>) -> Self {
805 Self {
806 product: product.map(|s| s.into()),
807 ..self
808 }
809 }
810
811 pub fn with_os_cpu(self, os_cpu: Option<impl Into<String>>) -> Self {
817 Self {
818 os_cpu: os_cpu.map(|s| s.into()),
819 ..self
820 }
821 }
822}
823
824pub struct Annotations {
828 end_time: Timestamp,
829 markers: HashMap<usize, (Option<String>, Blocks)>,
830 memory: Vec<(Timestamp, usize)>,
831 #[cfg(target_os = "macos")]
832 anchor: (Timestamp, i64),
833}
834
835impl Annotations {
836 pub fn apply(&self, profile: &[u8], options: AnnotationOptions) -> Result<Vec<u8>, Error> {
843 let mut buffer = Vec::new();
846 let json = if profile.starts_with(&[0x1f, 0x8b]) {
847 GzDecoder::new(profile).read_to_end(&mut buffer)?;
848 &buffer
849 } else {
850 profile
851 };
852 let gzip = !buffer.is_empty();
853
854 let mut profile = serde_json_path_to_error::from_slice::<Profile>(json)?;
856
857 #[cfg(target_os = "macos")]
878 let to_profile_time = {
879 let profile_start_time_ms = profile.meta.start_time;
880 let (anchor_monotonic_ns, anchor_wall_clock_ns) = self.anchor;
881 let profile_start_wall_clock_ns = (profile_start_time_ms * 1_000_000.0) as i64;
882 let adjustment_ns = anchor_wall_clock_ns - profile_start_wall_clock_ns;
883 move |timestamp: Timestamp| {
884 Timestamp(timestamp.0 - anchor_monotonic_ns.0 + adjustment_ns)
885 }
886 };
887 #[cfg(not(target_os = "macos"))]
888 let to_profile_time = |timestamp: Timestamp| timestamp;
889
890 if let Some(product) = options.product {
891 profile.meta.product = product;
892 }
893 if let Some(os_cpu) = options.os_cpu {
894 profile.meta.os_cpu = os_cpu;
895 }
896 profile.meta.marker_schema.push(json!({
897 "name": "FelderaMarker",
898 "display": [
899 "marker-chart",
900 "marker-table"
901 ],
902 "chartLabel": "{marker.data.name}",
903 "tooltipLabel": "{marker.data.name}",
904 "tableLabel": "{marker.data.name}",
905 "description": "Marker generated by Feldera.",
906 "fields": [
907 {
908 "key": "name",
909 "label": "Name",
910 "format": "unique-string"
911 }
912 ]
913 }));
914 static CATEGORY_COLORS: [&str; 12] = [
917 "purple",
918 "green",
919 "orange",
920 "yellow",
921 "lightblue",
922 "blue",
923 "brown",
924 "magenta",
925 "red",
926 "lightred",
927 "darkgrey",
928 "grey",
929 ];
930 let mut categories = profile
931 .meta
932 .categories
933 .iter()
934 .enumerate()
935 .map(|(index, category)| (category.name.clone(), index))
936 .collect::<HashMap<_, _>>();
937 for (category, color) in self
938 .markers
939 .values()
940 .flat_map(|(_, markers)| markers.iter())
941 .map(|marker| marker.category)
942 .collect::<HashSet<_>>()
943 .into_iter()
944 .zip(CATEGORY_COLORS.iter().cycle())
945 {
946 categories.insert(category.into(), profile.meta.categories.len());
947 profile.meta.categories.push(Category {
948 color: (*color).into(),
949 name: category.into(),
950 other: [(String::from("subcategories"), json!(["Other"]))]
951 .into_iter()
952 .collect(),
953 });
954 }
955 for thread in &mut profile.threads {
956 if let Some(tid) = &thread.tid
957 && let Ok(tid) = tid.parse::<usize>()
958 && let Some((name, markers)) = self.markers.get(&tid)
959 {
960 if let Some(name) = name {
961 thread.name = Some(name.clone());
962 }
963 for marker in markers.iter() {
964 thread.markers.length += 1;
965 thread.markers.category.push(categories[marker.category]);
966 thread.markers.data.push(ProfileMarkerData {
967 type_: Cow::from("FelderaMarker"),
968 name: profile.shared.add_name(&marker.tooltip),
969 });
970 thread
971 .markers
972 .start_time
973 .push(to_profile_time(marker.start));
974 thread.markers.end_time.push(to_profile_time(
975 marker.end.timestamp().unwrap_or(self.end_time),
976 ));
977 thread
978 .markers
979 .name
980 .push(profile.shared.add_name(marker.name));
981 thread.markers.phase.push(1);
982 }
983 }
984 }
985
986 if !self.memory.is_empty() {
987 profile.counters.push(RawCounter {
988 name: String::from("RSS"),
989 category: String::from("Memory"),
990 description: String::from("RSS in bytes"),
991 pid: profile.threads.first().unwrap().pid.clone(),
992 main_thread_index: 0,
993 samples: {
994 RawCounterSamplesTable {
995 time: self
996 .memory
997 .iter()
998 .map(|(time, _rss)| to_profile_time(*time))
999 .collect(),
1000 time_deltas: Vec::new(),
1001 number: repeat_n(0, self.memory.len()).collect(),
1002 count: once(self.memory[0].1 as i64)
1003 .chain(
1004 self.memory
1005 .iter()
1006 .map(|(_time, rss)| *rss as i64)
1007 .tuple_windows()
1008 .map(|(prev, next)| next - prev),
1009 )
1010 .collect(),
1011 length: self.memory.len(),
1012 other: Default::default(),
1013 }
1014 },
1015 other: Default::default(),
1016 });
1017 }
1018
1019 let output = serde_json::to_vec(&profile).unwrap();
1021 let output = if gzip {
1022 let mut gzipped_output = Vec::new();
1023 GzEncoder::new(Cursor::new(output), Compression::fast())
1024 .read_to_end(&mut gzipped_output)
1025 .unwrap();
1026 gzipped_output
1027 } else {
1028 output
1029 };
1030
1031 return Ok(output);
1032
1033 #[derive(Debug, Serialize, Deserialize)]
1037 #[serde(rename_all = "camelCase")]
1038 struct Profile {
1039 meta: Meta,
1040 threads: Vec<Thread>,
1041 #[serde(default)]
1042 counters: Vec<RawCounter>,
1043 shared: Shared,
1044 #[serde(flatten)]
1045 other: HashMap<String, Value>,
1046 }
1047
1048 #[derive(Debug, Serialize, Deserialize)]
1049 #[serde(rename_all = "camelCase")]
1050 struct Meta {
1051 product: String,
1052 #[serde(rename = "oscpu")]
1053 os_cpu: String,
1054 start_time: f64,
1055 categories: Vec<Category>,
1056 marker_schema: Vec<Value>,
1057 #[serde(flatten)]
1058 other: HashMap<String, Value>,
1059 }
1060
1061 #[derive(Debug, Serialize, Deserialize)]
1062 #[serde(rename_all = "camelCase")]
1063 struct Category {
1064 color: String,
1065 name: String,
1066 #[serde(flatten)]
1067 other: HashMap<String, Value>,
1068 }
1069
1070 #[derive(Debug, Serialize, Deserialize)]
1071 #[serde(rename_all = "camelCase")]
1072 struct Thread {
1073 name: Option<String>,
1074 #[serde(default)]
1075 markers: ProfileMarkers,
1076 tid: Option<String>,
1077 pid: String,
1078 #[serde(flatten)]
1079 other: HashMap<String, Value>,
1080 }
1081
1082 #[derive(Default, Debug, Serialize, Deserialize)]
1083 #[serde(rename_all = "camelCase")]
1084 struct ProfileMarkers {
1085 length: usize,
1086 category: Vec<usize>,
1087 data: Vec<ProfileMarkerData>,
1088 start_time: Vec<Timestamp>,
1089 end_time: Vec<Timestamp>,
1090 name: Vec<usize>,
1091 phase: Vec<usize>,
1092 }
1093
1094 #[derive(Default, Debug, Serialize, Deserialize)]
1095 #[serde(rename_all = "camelCase")]
1096 struct ProfileMarkerData {
1097 #[serde(rename = "type")]
1098 type_: Cow<'static, str>,
1099 name: usize,
1100 }
1101
1102 #[derive(Debug, Serialize, Deserialize)]
1103 #[serde(rename_all = "camelCase")]
1104 struct Shared {
1105 string_array: Vec<String>,
1106 #[serde(flatten)]
1107 other: HashMap<String, Value>,
1108 }
1109
1110 impl Shared {
1111 fn add_name(&mut self, name: &str) -> usize {
1112 let index = self.string_array.len();
1113 self.string_array.push(name.into());
1114 index
1115 }
1116 }
1117
1118 #[derive(Default, Debug, Serialize, Deserialize)]
1119 #[serde(rename_all = "camelCase")]
1120 struct RawCounter {
1121 name: String,
1122 category: String,
1123 description: String,
1124 pid: String,
1125 main_thread_index: usize,
1126 samples: RawCounterSamplesTable,
1127 #[serde(flatten)]
1128 other: HashMap<String, Value>,
1129 }
1130
1131 #[derive(Default, Debug, Serialize, Deserialize)]
1132 #[serde(rename_all = "camelCase")]
1133 struct RawCounterSamplesTable {
1134 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1135 time: Vec<Timestamp>,
1136 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1137 time_deltas: Vec<Timestamp>,
1138 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1139 number: Vec<usize>,
1140 count: Vec<i64>,
1141 length: usize,
1142 #[serde(flatten)]
1143 other: HashMap<String, Value>,
1144 }
1145 }
1146}
1147
1148static CAPTURING: AtomicBool = AtomicBool::new(false);
1150
1151#[derive(Clone, Debug)]
1153enum MarkerEnd {
1154 At(Timestamp),
1156
1157 Long(Arc<AtomicOptionTimestamp>),
1160}
1161
1162impl MarkerEnd {
1163 fn should_keep(&self, capturing: bool) -> bool {
1170 if let MarkerEnd::Long(timestamp) = self {
1171 if timestamp.load().is_some() {
1172 capturing
1175 } else if Arc::strong_count(timestamp) > 1 {
1176 true
1178 } else {
1179 capturing
1186 }
1187 } else {
1188 false
1189 }
1190 }
1191
1192 fn timestamp(&self) -> Option<Timestamp> {
1195 match self {
1196 MarkerEnd::At(timestamp) => Some(*timestamp),
1197 MarkerEnd::Long(timestamp) => timestamp.load(),
1198 }
1199 }
1200}
1201
1202#[derive(Clone, Debug)]
1204struct Marker {
1205 start: Timestamp,
1207 end: MarkerEnd,
1209 category: &'static str,
1211 name: &'static str,
1213 tooltip: String,
1215}
1216
1217#[derive(Clone)]
1219struct ThreadMarkers {
1220 tid: usize,
1224
1225 name: Option<String>,
1230
1231 queue: Arc<Queue>,
1236}
1237
1238impl ThreadMarkers {
1239 fn new(queue: Arc<Queue>) -> Self {
1240 #[cfg(target_os = "linux")]
1241 let tid = nix::unistd::gettid().as_raw() as usize;
1242 #[cfg(not(target_os = "linux"))]
1243 let tid = thread_id::get();
1244
1245 Self {
1246 tid,
1247 name: std::thread::current().name().map(|s| s.into()),
1248 queue,
1249 }
1250 }
1251}
1252
1253static ALL_THREAD_MARKERS: std::sync::Mutex<Vec<ThreadMarkers>> = std::sync::Mutex::new(Vec::new());
1255
1256static FREE_BLOCKS: AtomicI64 = AtomicI64::new(0);
1260
1261const MARKERS_PER_BLOCK: usize = 32;
1263
1264const BYTES_PER_BLOCK: usize = MARKERS_PER_BLOCK * Span::BYTES;
1266
1267static MARKERS_EXHAUSTED: AtomicOptionTimestamp = AtomicOptionTimestamp::new(None);
1269
1270struct Block(Vec<Marker>);
1271impl Block {
1272 fn new(marker: Marker) -> Self {
1273 let mut markers = Vec::with_capacity(MARKERS_PER_BLOCK);
1274 markers.push(marker);
1275 Self(markers)
1276 }
1277 fn is_full(&self) -> bool {
1278 self.0.len() >= self.0.capacity()
1279 }
1280 fn push(&mut self, marker: Marker) {
1281 self.0.push(marker);
1282 }
1283}
1284
1285struct Blocks(Vec<Block>);
1286
1287impl Default for Blocks {
1288 fn default() -> Self {
1289 Self(Vec::with_capacity(32))
1290 }
1291}
1292
1293impl Blocks {
1294 fn push(&mut self, marker: Marker) {
1295 if let Some(block) = self.0.last_mut()
1296 && !block.is_full()
1297 {
1298 block.push(marker);
1299 } else {
1300 match FREE_BLOCKS.fetch_sub(1, Ordering::Relaxed) {
1301 1.. => self.0.push(Block::new(marker)),
1302 0 => {
1303 if MARKERS_EXHAUSTED.load().is_none() {
1307 MARKERS_EXHAUSTED.store(Some(Timestamp::now()));
1308 }
1309 }
1310 _ => (),
1311 }
1312 }
1313 }
1314
1315 fn iter(&self) -> impl Iterator<Item = &Marker> {
1316 self.0.iter().flat_map(|block| block.0.iter())
1317 }
1318}
1319
1320#[derive(Debug, Default)]
1321struct LongSpans {
1322 markers: Vec<Marker>,
1323}
1324
1325impl LongSpans {
1326 fn push(&mut self, marker: Marker) {
1327 if self.markers.len() == self.markers.capacity() {
1328 let capturing = Capture::is_active();
1330 self.markers
1331 .retain(|marker| marker.end.should_keep(capturing));
1332 }
1333 self.markers.push(marker);
1334 }
1335
1336 fn append(&mut self, other: &mut Vec<Marker>) {
1337 if self.markers.is_empty() {
1338 swap(&mut self.markers, other);
1339 } else {
1340 self.markers.append(other);
1341 }
1342 }
1343}
1344
1345struct Queue {
1346 blocks: std::sync::Mutex<Blocks>,
1348
1349 long_spans: std::sync::Mutex<LongSpans>,
1354}
1355
1356impl Queue {
1357 fn new() -> Arc<Self> {
1358 let queue = Arc::new(Self {
1359 blocks: Default::default(),
1360 long_spans: Default::default(),
1361 });
1362 ALL_THREAD_MARKERS
1363 .lock()
1364 .unwrap()
1365 .push(ThreadMarkers::new(queue.clone()));
1366 queue
1367 }
1368
1369 fn push(&self, marker: Marker) {
1371 self.blocks.lock().unwrap().push(marker);
1372 }
1373
1374 fn push_long_span(&self, marker: Marker) {
1376 self.long_spans.lock().unwrap().push(marker);
1377 }
1378
1379 fn take_blocks(&self) -> Blocks {
1380 std::mem::take(&mut *self.blocks.lock().unwrap())
1381 }
1382
1383 fn take_long_spans(&self) -> Vec<Marker> {
1384 let old_long_spans = std::mem::take(&mut *self.long_spans.lock().unwrap()).markers;
1385
1386 let mut new_long_spans = Vec::with_capacity(old_long_spans.capacity());
1388 for marker in &old_long_spans {
1389 if marker.end.should_keep(false) {
1390 new_long_spans.push(marker.clone());
1391 }
1392 }
1393 if !new_long_spans.is_empty() {
1394 self.long_spans.lock().unwrap().append(&mut new_long_spans);
1395 }
1396
1397 old_long_spans
1398 }
1399}
1400
1401thread_local! {
1402 static QUEUE: Arc<Queue> = Queue::new();
1403}