Skip to main content

feldera_samply/
lib.rs

1//! Annotations for Firefox Profiler profiles.
2//!
3//! [Firefox Profiler] can display user-defined annotations for timespans and
4//! events, as well as information about network activity and memory use, but
5//! only if the profiler added those annotations.  The popular [samply]
6//! profiler, however, has only very limited support for adding these
7//! annotations.  This crate provides a way for a process that is being profiled
8//! to record its own annotations and then merge them into profiler output in a
9//! postprocessing step.
10//!
11//! # Use
12//!
13//! This crate can be integrated into an existing profiler workflow.  For
14//! example, the [Feldera incremental compute engine] runs `samply` as a
15//! subprocess, targeting itself.  While `samply` runs, Feldera uses [Capture],
16//! [Span], [LongSpan], and [Event] to record annotations.  After `samply`
17//! completes, Feldera finishes the capture to obtain [Annotations], applies
18//! them, and then passes the postprocessed output to the user.  The annotation
19//! step is invisible to the user.
20//!
21//! Short of this kind of integration, where a process effectively profiles
22//! itself, there must be some way to enable capturing and saving profile data.
23//! For example, a command-line option or an environment variable could do the
24//! trick.  Once the capture is complete, the process needs to somehow save the
25//! annotations.
26//!
27//! # Viewing in Firefox Profiler
28//!
29//! Spans and events logged by this module show up in the Marker Chart and
30//! Marker Table tabs for a given thread. They are linked to particular threads
31//! and the profiler will only show them when those threads are selected.
32//!
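//! [Span] and [Event] record annotations only while a [Capture] is active,
//! and have minimal overhead otherwise.  (This is independent of whether an
//! external profiler such as `samply` is running.)  [LongSpan] is the
//! exception: it is always tracked, so that it can appear in captures that
//! start after it was created.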
35//!
36//! [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply
37//! [Firefox Profiler]: https://profiler.firefox.com/
38//! [Feldera incremental compute engine]: https://feldera.com
39#![warn(missing_docs)]
40use std::{
41    borrow::Cow,
42    collections::{HashMap, HashSet},
43    fmt::{Debug, Display},
44    io::{Cursor, Read},
45    iter::{once, repeat_n},
46    mem::swap,
47    sync::{
48        Arc,
49        atomic::{AtomicBool, AtomicI64, Ordering},
50    },
51    thread::JoinHandle,
52    time::{Duration, Instant},
53};
54
55#[cfg(target_os = "macos")]
56use std::time::{SystemTime, UNIX_EPOCH};
57
58use crossbeam::sync::{Parker, Unparker};
59use flate2::{
60    Compression,
61    bufread::{GzDecoder, GzEncoder},
62};
63use itertools::Itertools;
64use memory_stats::memory_stats;
65#[cfg(not(target_os = "macos"))]
66use nix::time::{ClockId, clock_gettime};
67use serde::{Deserialize, Serialize};
68use serde_json::{Value, json};
69use size_of::HumanBytes;
70use tracing::warn;
71
72/// Atomic `Option<Timestamp>`.
73///
74/// Treats `i64::MIN` as a niche.
75#[derive(Debug)]
76struct AtomicOptionTimestamp(AtomicI64);
77
78impl Default for AtomicOptionTimestamp {
79    fn default() -> Self {
80        Self::new(None)
81    }
82}
83
84impl AtomicOptionTimestamp {
85    const fn new(value: Option<Timestamp>) -> Self {
86        Self(AtomicI64::new(match value {
87            Some(timestamp) => timestamp.0,
88            None => i64::MIN,
89        }))
90    }
91
92    fn load(&self) -> Option<Timestamp> {
93        let value = self.0.load(Ordering::Acquire);
94        (value != i64::MIN).then_some(Timestamp(value))
95    }
96
97    fn store(&self, value: Option<Timestamp>) {
98        self.0.store(
99            value.map_or(i64::MIN, |timestamp| timestamp.0),
100            Ordering::Release,
101        )
102    }
103}
104
/// A point in time on the profiler's monotonic clock, in nanoseconds.
///
/// On macOS the value is `mach_absolute_time` scaled to nanoseconds using
/// `mach_timebase_info`.  On other Unix platforms it is read from
/// `CLOCK_MONOTONIC`.
///
/// See <https://developer.apple.com/documentation/kernel/1462446-mach_absolute_time>
/// and <https://developer.apple.com/documentation/kernel/1462447-mach_timebase_info>.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
struct Timestamp(
    /// Nanoseconds on the clock described above.
    i64,
);
118
/// Returns the current `mach_absolute_time` converted to nanoseconds.
///
/// The tick-to-nanosecond ratio reported by `mach_timebase_info` is looked up
/// once and cached.  If the kernel reports a zero denominator, ticks are
/// treated as nanoseconds (ratio 1/1).
#[cfg(target_os = "macos")]
fn mach_absolute_time_nanos() -> i64 {
    use mach2::mach_time::{mach_absolute_time, mach_timebase_info, mach_timebase_info_data_t};
    use std::sync::OnceLock;

    static NANOS_PER_TICK: OnceLock<(u32, u32)> = OnceLock::new();
    let (numer, denom) = *NANOS_PER_TICK.get_or_init(|| {
        let mut info = mach_timebase_info_data_t { numer: 0, denom: 0 };
        // SAFETY: `info` is a live, writable `mach_timebase_info_data_t`,
        // which is all `mach_timebase_info` requires of its argument.
        unsafe {
            mach_timebase_info(&mut info);
        }
        if info.denom == 0 {
            (1, 1)
        } else {
            (info.numer, info.denom)
        }
    });
    // SAFETY: `mach_absolute_time` takes no arguments and has no
    // preconditions.
    let ticks = unsafe { mach_absolute_time() };
    // Multiply in 128 bits: `ticks * numer` can exceed `u64::MAX` when the
    // timebase ratio is not 1/1, and a 64-bit product would silently wrap in
    // release builds.
    (u128::from(ticks) * u128::from(numer) / u128::from(denom)) as i64
}
139
140impl Timestamp {
141    fn now() -> Self {
142        #[cfg(target_os = "macos")]
143        {
144            Self(mach_absolute_time_nanos())
145        }
146
147        #[cfg(not(target_os = "macos"))]
148        {
149            let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap();
150            Self(now.tv_sec() as i64 * 1_000_000_000 + now.tv_nsec() as i64)
151        }
152    }
153
154    /// Computes `self - other`, returning zero if `self < other`.
155    fn saturating_sub(self, other: Self) -> Duration {
156        if self.0 >= other.0 {
157            Duration::from_nanos(self.0.abs_diff(other.0))
158        } else {
159            Duration::ZERO
160        }
161    }
162}
163
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
#[cfg(target_os = "macos")]
fn unix_epoch_nanos() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before Unix epoch");
    since_epoch.as_nanos() as i64
}
172
173impl From<Instant> for Timestamp {
174    fn from(value: Instant) -> Self {
175        // SAFETY: On Unix, `Instant` is implemented using CLOCK_MONOTONIC,
176        // which is the clock that we need to use for the profiler, but the Rust
177        // standard library provides no way to get the value out.  We don't want
178        // to make assumptions about the layout of [Instant], and in fact it is
179        // not defined as libc's struct timespec but different and
180        // Rust-specific.  If we just transmute then we get the wrong value.  It
181        // seems rather safer to assume that the all-bytes-zeros `Instant` is
182        // the origin, and it works OK for now at least.
183        //
184        // The completely safe alternative would be to make Timestamp public and
185        // force clients to always get both a Timestamp and an Instant if they
186        // need both, which is wasteful.
187        let zero = unsafe { std::mem::zeroed::<Instant>() };
188        Self((value - zero).as_nanos() as i64)
189    }
190}
191
192impl Display for Timestamp {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        write!(f, "{}", self.0)
195    }
196}
197
198impl Serialize for Timestamp {
199    /// Serializes the format used in the [Gecko profiler] format.
200    ///
201    /// [Gecko profiler]: https://github.com/firefox-devtools/profiler/blob/main/src/types/profile.ts
202    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
203    where
204        S: serde::Serializer,
205    {
206        let milliseconds = self.0 as f64 / 1_000_000.0;
207        milliseconds.serialize(serializer)
208    }
209}
210
211impl<'de> Deserialize<'de> for Timestamp {
212    /// Deserializes the format used in the [Gecko profiler] format.
213    ///
214    /// [Gecko profiler]: https://github.com/firefox-devtools/profiler/blob/main/src/types/profile.ts
215    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
216    where
217        D: serde::Deserializer<'de>,
218    {
219        let milliseconds = f64::deserialize(deserializer)?;
220        Ok(Self((milliseconds * 1_000_000.0) as i64))
221    }
222}
223
224impl Debug for Span {
225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226        write!(f, "Span")?;
227        if let Some(inner) = &self.0 {
228            write!(f, "({})", &inner.name)?;
229        }
230        Ok(())
231    }
232}
233
234struct SpanInner {
235    start: Timestamp,
236    category: &'static str,
237    name: &'static str,
238    tooltip: String,
239}
240
241impl SpanInner {
242    #[cold]
243    fn new(name: &'static str) -> Self {
244        Self {
245            start: Timestamp::now(),
246            category: "Other",
247            name,
248            tooltip: String::new(),
249        }
250    }
251
252    fn into_marker(self, end: MarkerEnd) -> Marker {
253        Marker {
254            start: self.start,
255            end,
256            category: self.category,
257            name: self.name,
258            tooltip: self.tooltip,
259        }
260    }
261
262    #[cold]
263    fn record(self, end: MarkerEnd) {
264        QUEUE.with(|queue| queue.push(self.into_marker(end)));
265    }
266}
267
268/// Annotates a timespan during a [Capture].
269///
270/// Constructing and dropping a [Span], when marker spans are being captured
271/// with [Capture], records the start and end times of the [Span] along with a
272/// name, category, and tooltip.
273///
274/// `Span` is for timespans.  Use [Event] for point-in-time events.
275///
276/// When [Capture] is not active, `Span` has minimal overhead.
277///
278/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply
279pub struct Span(Option<SpanInner>);
280
281impl Span {
282    /// The number of bytes of memory used during capture to record a [Span],
283    /// [LongSpan], or [Event].
284    pub const BYTES: usize = std::mem::size_of::<Marker>();
285
286    /// Constructs a new [Span] with the given name.  When the constructed
287    /// span is dropped, it is automatically recorded.
288    ///
289    /// [Span] does nothing when markers are not being captured.  A span
290    /// will be recorded in a profile only if markers were being captured both
291    /// when it was created and when it was dropped.
292    ///
293    /// The name should ordinarily be a short static string indicating what
294    /// happens during the span.  The Firefox Profiler's marker chart view shows
295    /// all the spans in a thread with the same name and category on a single
296    /// horizontal timeline (unless that would cause overlaps).
297    #[must_use]
298    pub fn new(name: &'static str) -> Self {
299        Self(Capture::is_active().then(|| SpanInner::new(name)))
300    }
301
302    /// Adds `category` to this span.
303    ///
304    /// The Firefox Profiler's marker chart view groups the markers in each
305    /// category and labels them with the category name.
306    ///
307    /// The default category is "Other".
308    #[must_use]
309    pub fn with_category(mut self, category: &'static str) -> Self {
310        if let Some(inner) = &mut self.0 {
311            inner.category = category;
312        }
313        self
314    }
315
316    /// Evaluates `tooltip` and adds it to this span.
317    ///
318    /// The Firefox Profiler shows the given tooltip in the marker chart
319    /// timeline (often truncated) and on hover, and as "details" in the marker
320    /// table view.
321    ///
322    /// `tooltip` is only evaluated if capturing is active.
323    #[must_use]
324    pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
325    where
326        F: FnOnce() -> String,
327    {
328        if let Some(inner) = &mut self.0 {
329            inner.tooltip = tooltip();
330        }
331        self
332    }
333
334    /// Sets the starting time for this span to `start`.  The default starting
335    /// time is when the [Span] was constructed, so this is only useful if
336    /// it's easier to create the span just before recording it.
337    #[must_use]
338    pub fn with_start(mut self, start: Instant) -> Self {
339        if let Some(inner) = &mut self.0 {
340            inner.start = start.into();
341        }
342        self
343    }
344
345    /// Calls `f` and records the span.  Returns whatever `f` returned.
346    pub fn in_scope<F, T>(self, f: F) -> T
347    where
348        F: FnOnce() -> T,
349    {
350        f()
351    }
352
353    /// Records the span.
354    pub fn record(self) {
355        // [Drop] records the span.
356    }
357
358    /// Consumes the span without recording it.
359    pub fn cancel(mut self) {
360        let _ = self.0.take();
361    }
362}
363
364impl Drop for Span {
365    fn drop(&mut self) {
366        if let Some(inner) = self.0.take() {
367            inner.record(MarkerEnd::At(Timestamp::now()))
368        }
369    }
370}
371
372/// Builds a [LongSpan] for annotating a long timespan.
373pub struct LongSpanBuilder(SpanInner);
374
375impl LongSpanBuilder {
376    /// Constructs a new [LongSpanBuilder] with the given name.
377    ///
378    /// The name should ordinarily be a short static string indicating what
379    /// happens during the span.  The Firefox Profiler's marker chart view shows
380    /// all the spans in a thread with the same name and category on a single
381    /// horizontal timeline (unless that would cause overlaps).
382    #[must_use]
383    pub fn new(name: &'static str) -> Self {
384        Self(SpanInner::new(name))
385    }
386
387    /// Adds `category` to this span.
388    ///
389    /// The Firefox Profiler's marker chart view groups the markers in each
390    /// category and labels them with the category name.
391    ///
392    /// The default category is "Other".
393    #[must_use]
394    pub fn with_category(mut self, category: &'static str) -> Self {
395        self.0.category = category;
396        self
397    }
398
399    /// Adds `tooltip` to this span.
400    ///
401    /// The Firefox Profiler shows the given tooltip in the marker chart
402    /// timeline (often truncated) and on hover, and as "details" in the marker
403    /// table view.
404    #[must_use]
405    pub fn with_tooltip(mut self, tooltip: impl Into<String>) -> Self {
406        self.0.tooltip = tooltip.into();
407        self
408    }
409
410    /// Sets the starting time for this span to `start`.  The default starting
411    /// time is when the [LongSpanBuilder] was constructed, so this is only
412    /// useful if it's easier to create the span just before recording it.
413    #[must_use]
414    pub fn with_start(mut self, start: Instant) -> Self {
415        self.0.start = start.into();
416        self
417    }
418
419    /// Builds the [LongSpan].
420    #[must_use]
421    pub fn build(self) -> LongSpan {
422        let timestamp = Arc::new(AtomicOptionTimestamp::default());
423        QUEUE.with(|queue| {
424            queue.push_long_span(self.0.into_marker(MarkerEnd::Long(timestamp.clone())))
425        });
426        LongSpan(timestamp)
427    }
428}
429
430/// A relatively expensive way to annotate a longer timespan during [Capture]s.
431///
432/// `LongSpan` is much like [Span].  However, whereas [Span] is optimized to
433/// minimize time and space overhead when a capture is not active, `LongSpan`
434/// always tracks the span.  This allows it to show up in captures that start
435/// after the `LongSpan` is allocated or that end before the `LongSpan` is
436/// recorded.
437///
438/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply
439pub struct LongSpan(Arc<AtomicOptionTimestamp>);
440
441impl LongSpan {
442    /// Marks this `LongSpan` as complete.
443    ///
444    /// This is equivalent to dropping it.
445    pub fn complete(self) {
446        // [Drop] completes the span.
447    }
448}
449
450impl Drop for LongSpan {
451    fn drop(&mut self) {
452        self.0.store(Some(Timestamp::now()));
453    }
454}
455
456/// Annotates an event during a [Capture].
457///
458/// When a [Capture] is running, use this type to record an event along
459/// with a name, category, and tooltip.
460///
461/// An event happens at a point in time; use [Span] to record a timespan.
462///
463/// When [Capture] is not active, `Event` has minimal overhead.
464///
465/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply
466/// [module documentation]: crate
467pub struct Event(Option<SpanInner>);
468
469impl Event {
470    /// Constructs a new [Event] with the given name.
471    ///
472    /// [Event] does nothing when markers are not being captured.
473    ///
474    /// The name should ordinarily be a short static string indicating what the
475    /// event did.  The Firefox Profiler's marker chart view shows all the
476    /// events in a thread with the same name and category on a single
477    /// horizontal timeline (unless that would cause overlaps).
478    #[must_use]
479    pub fn new(name: &'static str) -> Self {
480        Self(Capture::is_active().then(|| SpanInner::new(name)))
481    }
482
483    /// Adds `category` to this event.
484    ///
485    /// The Firefox Profiler's marker chart view groups the markers in each
486    /// category and labels them with the category name.
487    ///
488    /// The default category is "Other".
489    #[must_use]
490    pub fn with_category(mut self, category: &'static str) -> Self {
491        if let Some(inner) = &mut self.0 {
492            inner.category = category;
493        }
494        self
495    }
496
497    /// Evaluates `tooltip` and adds it to this event.
498    ///
499    /// The Firefox Profiler shows the given tooltip in the marker chart
500    /// timeline (often truncated) and on hover, and as "details" in the marker
501    /// table view.
502    ///
503    /// `tooltip` is only evaluated if capturing is active.
504    #[must_use]
505    pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
506    where
507        F: FnOnce() -> String,
508    {
509        if let Some(inner) = &mut self.0 {
510            inner.tooltip = tooltip();
511        }
512        self
513    }
514
515    /// Records the event.
516    pub fn record(self) {
517        if let Some(inner) = self.0 {
518            let end = MarkerEnd::At(inner.start);
519            inner.record(end);
520        }
521    }
522}
523
/// Options for capturing profile annotations.
#[derive(Clone, Debug)]
pub struct CaptureOptions {
    /// Approximate cap, in bytes, on memory used for captured markers.
    /// `None` means unlimited.
    memory_limit: Option<usize>,
    /// Whether to sample resident set size while capturing.
    record_rss: bool,
}

impl Default for CaptureOptions {
    /// No memory limit, with RSS recording enabled.
    fn default() -> Self {
        let memory_limit = None;
        let record_rss = true;
        Self {
            memory_limit,
            record_rss,
        }
    }
}
539
540impl CaptureOptions {
541    /// Creates new capture parameters with default settings.
542    pub fn new() -> Self {
543        Self::default()
544    }
545
546    /// Sets a limit on the amount of memory that can be used for recording
547    /// captured spans to `memory_limit`, in bytes.  If `memory_limit` is
548    /// `None`, then there will be no limit (which is also the default).
549    ///
550    /// The limit is honored approximately.  Recording a span takes
551    /// [Span::BYTES] bytes.
552    pub fn with_memory_limit(self, memory_limit: Option<usize>) -> Self {
553        Self {
554            memory_limit,
555            ..self
556        }
557    }
558
559    /// Enables or disables recording the amount of memory in use (the resident
560    /// set size) during the capture.  By default, RSS is recorded every 100
561    /// milliseconds.  When this feature is enabled, capture runs a thread for
562    /// recording the data.
563    pub fn with_record_rss(self, record_rss: bool) -> Self {
564        Self { record_rss, ..self }
565    }
566
567    /// Starts capturing profile annotations, returning a [Capture] that can be
568    /// used to finish or abort captures.  Dropping the [Capture] will also
569    /// abort capturing.
570    ///
571    /// For use in asynchronous contexts.  If a capture is already in progress,
572    /// this function will wait for it to complete before starting a new one.
573    pub async fn start(self) -> Capture {
574        Capture::new(self, CAPTURE_MUTEX.lock().await)
575    }
576
577    /// Starts capturing profile annotations, returning a [Capture] that can be
578    /// used to finish or abort captures.  Dropping the [Capture] will also
579    /// abort capturing.
580    ///
581    /// For use in blocking contexts.  If a capture is already in progress, this
582    /// function will wait for it to complete before starting a new one.
583    ///
584    /// # Panic
585    ///
586    /// Panics if called from an asynchronous execution context.
587    pub fn blocking_start(self) -> Capture {
588        Capture::new(self, CAPTURE_MUTEX.blocking_lock())
589    }
590
591    /// Starts capturing profile annotations, returning a [Capture] that can be
592    /// used to finish or abort captures.  Dropping the [Capture] will also
593    /// abort capturing.
594    ///
595    /// If a capture is already in progress, this function returns an error
596    /// instead of waiting.
597    pub fn try_start(self) -> Result<Capture, Self> {
598        let guard = match CAPTURE_MUTEX.try_lock() {
599            Ok(guard) => guard,
600            Err(_) => return Err(self),
601        };
602        Ok(Capture::new(self, guard))
603    }
604}
605
606static CAPTURE_MUTEX: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
607
608/// An in-progress capture of profile annotations.
609///
610/// To start a capture, use [CaptureOptions::start],
611/// [CaptureOptions::blocking_start], or [CaptureOptions::try_start].
612///
613/// Only one `Capture` may exist at one time.
614pub struct Capture {
615    _guard: tokio::sync::MutexGuard<'static, ()>,
616    start_time: Timestamp,
617    memory: Option<JoinHandle<Vec<(Timestamp, usize)>>>,
618    unparker: Unparker,
619    request_exit: Arc<AtomicBool>,
620    block_limit: i64,
621    #[cfg(target_os = "macos")]
622    anchor: (Timestamp, i64),
623}
624
625impl Capture {
626    fn new(params: CaptureOptions, guard: tokio::sync::MutexGuard<'static, ()>) -> Self {
627        let start = Timestamp::now();
628        if let Some(memory_limit) = params.memory_limit {
629            tracing::info!(
630                "marker capture limited to {}",
631                HumanBytes::from(memory_limit)
632            );
633        }
634        let block_limit = params.memory_limit.map_or(i64::MAX, |memory_limit| {
635            (memory_limit / BYTES_PER_BLOCK) as i64
636        });
637        let parker = Parker::new();
638        let unparker = parker.unparker().clone();
639        let request_exit = Arc::new(AtomicBool::new(false));
640        let memory = params.record_rss.then(|| {
641            std::thread::Builder::new()
642                .name(String::from("capture-rss"))
643                .spawn({
644                    let request_exit = request_exit.clone();
645                    move || {
646                        let mut memory = Vec::new();
647                        while !request_exit.load(Ordering::Acquire) {
648                            if let Some(memory_stats) = memory_stats() {
649                                memory.push((Timestamp::now(), memory_stats.physical_mem));
650                            }
651                            parker.park_timeout(Duration::from_millis(100));
652                        }
653                        memory
654                    }
655                })
656                .expect("should be able to start a capture thread")
657        });
658        FREE_BLOCKS.store(block_limit, Ordering::Relaxed);
659        MARKERS_EXHAUSTED.store(None);
660        CAPTURING.store(true, Ordering::Release);
661        Self {
662            start_time: start,
663            block_limit,
664            memory,
665            unparker,
666            request_exit,
667            #[cfg(target_os = "macos")]
668            anchor: (Timestamp::now(), unix_epoch_nanos()),
669            _guard: guard,
670        }
671    }
672
673    /// Finishes recording profile annotations and returns what was recorded.
674    pub fn finish(mut self) -> Annotations {
675        let end_time = Timestamp::now();
676        CAPTURING.store(false, Ordering::Release);
677        let markers_exhausted = MARKERS_EXHAUSTED.load();
678        let free_blocks = FREE_BLOCKS.load(Ordering::Relaxed);
679        let used =
680            HumanBytes::from((self.block_limit - free_blocks.max(0)) as usize * BYTES_PER_BLOCK);
681        if free_blocks < 0 {
682        } else {
683            tracing::info!("marker capture used {used}");
684        }
685
686        let mut markers: HashMap<usize, (Option<String>, Blocks)> = self
687            .all_threads()
688            .into_iter()
689            .map(|thread| {
690                let mut blocks = thread.queue.take_blocks();
691
692                let long_spans = thread.queue.take_long_spans();
693                if !long_spans.is_empty() {
694                    blocks.0.push(Block(long_spans));
695                }
696
697                (thread.tid, (thread.name, blocks))
698            })
699            .collect();
700
701        if let Some(markers_exhausted) = markers_exhausted {
702            let elapsed = markers_exhausted.saturating_sub(self.start_time);
703            let tooltip = format!(
704                "marker capture exceeded the limit ({used}) after {:.1} s",
705                elapsed.as_secs_f64()
706            );
707            tracing::info!("{tooltip}");
708            let marker = Marker {
709                start: self.start_time,
710                end: MarkerEnd::At(markers_exhausted),
711                category: "profiling",
712                name: "Profiling",
713                tooltip,
714            };
715            markers
716                .entry(nix::unistd::getpid().as_raw() as usize)
717                .or_default()
718                .1
719                .0
720                .push(Block::new(marker));
721        }
722
723        Annotations {
724            end_time,
725            markers,
726            memory: self.take_memory(),
727            #[cfg(target_os = "macos")]
728            anchor: self.anchor,
729        }
730    }
731
732    /// Aborts recording profile annotations.
733    ///
734    /// This is equivalent to dropping the `Capture` object.
735    pub fn abort(self) {
736        tracing::info!("aborting profile annotation capture");
737    }
738
739    /// Returns true if a profile annotation capture is ongoing.
740    ///
741    /// This only reports the status of annotation captures.  It does not
742    /// indicate whether `samply` or `perf` or another profiler is currently
743    /// capturing profile data for this process (this crate does not provide a
744    /// way to do that).
745    pub fn is_active() -> bool {
746        CAPTURING.load(Ordering::Acquire)
747    }
748
749    fn all_threads(&mut self) -> Vec<ThreadMarkers> {
750        ALL_THREAD_MARKERS.lock().unwrap().clone()
751    }
752
753    fn take_memory(&mut self) -> Vec<(Timestamp, usize)> {
754        if let Some(memory) = self.memory.take() {
755            self.request_exit.store(true, Ordering::Release);
756            self.unparker.unpark();
757            memory.join().unwrap_or_default()
758        } else {
759            Default::default()
760        }
761    }
762}
763
764impl Drop for Capture {
765    fn drop(&mut self) {
766        self.take_memory();
767
768        // Might already have been done in [Capture::finish] but it doesn't hurt
769        // to do it again (since the lock is still held).
770        CAPTURING.store(false, Ordering::Release);
771    }
772}
773
774/// Error returned by [Annotations::apply].
775#[derive(thiserror::Error, Debug)]
776pub enum Error {
777    /// Error decompressing the profile.
778    #[error("Error decompressing profile ")]
779    GzDecoderError(#[from] std::io::Error),
780
781    /// Error parsing the profile.
782    #[error("Error parsing profile")]
783    SerdeError(#[from] serde_json_path_to_error::Error),
784}
785
786/// Options for applying annotations.
787#[derive(Default, Clone, Debug)]
788pub struct AnnotationOptions {
789    product: Option<String>,
790    os_cpu: Option<String>,
791}
792
793impl AnnotationOptions {
794    /// Constructs a default set of options.
795    pub fn new() -> Self {
796        Self::default()
797    }
798
799    /// Overrides the product string in the profile.  `None` uses the default,
800    /// which is `PID <pid>`.
801    ///
802    /// The Firefox Profiler prominently displays the product and OS-CPU string
803    /// together in the form `<product> - <OS-CPU>`.
804    pub fn with_product(self, product: Option<impl Into<String>>) -> Self {
805        Self {
806            product: product.map(|s| s.into()),
807            ..self
808        }
809    }
810
811    /// Overrides the OS and CPU string in the profile.  `None` uses the
812    /// default, which looks like `Ubuntu 24.0.04.4 LTS`.
813    ///
814    /// The Firefox Profiler prominently displays the product and OS-CPU string
815    /// together in the form `<product> - <OS-CPU>`.
816    pub fn with_os_cpu(self, os_cpu: Option<impl Into<String>>) -> Self {
817        Self {
818            os_cpu: os_cpu.map(|s| s.into()),
819            ..self
820        }
821    }
822}
823
824/// Profile annotation data.
825///
826/// Obtained from [Capture::finish].
827pub struct Annotations {
828    end_time: Timestamp,
829    markers: HashMap<usize, (Option<String>, Blocks)>,
830    memory: Vec<(Timestamp, usize)>,
831    #[cfg(target_os = "macos")]
832    anchor: (Timestamp, i64),
833}
834
835impl Annotations {
836    /// Applies these annotations with the given `options` to `profile`, which
837    /// must be the `profile.json` output by samply, and returns the annotated
838    /// profile.
839    ///
840    /// The input may be gzipped or already decompressed.  The output will be in
841    /// the same form.
842    pub fn apply(&self, profile: &[u8], options: AnnotationOptions) -> Result<Vec<u8>, Error> {
843        // Decompress `profile` if it starts with the GZIP magic number,
844        // otherwise assume it has already been decompressed.
845        let mut buffer = Vec::new();
846        let json = if profile.starts_with(&[0x1f, 0x8b]) {
847            GzDecoder::new(profile).read_to_end(&mut buffer)?;
848            &buffer
849        } else {
850            profile
851        };
852        let gzip = !buffer.is_empty();
853
854        // Deserialize.
855        let mut profile = serde_json_path_to_error::from_slice::<Profile>(json)?;
856
857        // On macOS, timestamps in the samply profile are nanoseconds relative to `startTime`
858        // recorded in the profile's metadata. startTime is measured using wall-clock time,
859        // _not_ the monotonic clock time.
860        //
861        // Below `anchor_monotonic_ns` and `anchor_wall_clock_ns` represent the same point in time expressed
862        // in monotonic clock units and wall clock units respectively.
863        //
864        // `profile_start_wall_clock_ms` is the wall clock time when the profile started, extracted
865        // from the profile's metadata.
866        //
867        // `timestamp` is the event timestamp to convert to profile time.
868        //
869        // ```text
870        // -----------------|---------------------------------------|--------------------------|----------> time
871        //  (anchor_monotonic_ns, anchor_wall_clock_ns)      profile_start_wall_clock_ms   timestamp
872        // ```
873        //
874        // To convert a timestamp to relative nanoseconds expected by samply, we need to:
875        // 1. Calculate elapsed since the Timestamp recorded in anchor.
876        // 2. Add adjustment - the difference between the anchor wall-clock time and the profile start wall-clock time.
877        #[cfg(target_os = "macos")]
878        let to_profile_time = {
879            let profile_start_time_ms = profile.meta.start_time;
880            let (anchor_monotonic_ns, anchor_wall_clock_ns) = self.anchor;
881            let profile_start_wall_clock_ns = (profile_start_time_ms * 1_000_000.0) as i64;
882            let adjustment_ns = anchor_wall_clock_ns - profile_start_wall_clock_ns;
883            move |timestamp: Timestamp| {
884                Timestamp(timestamp.0 - anchor_monotonic_ns.0 + adjustment_ns)
885            }
886        };
887        #[cfg(not(target_os = "macos"))]
888        let to_profile_time = |timestamp: Timestamp| timestamp;
889
890        if let Some(product) = options.product {
891            profile.meta.product = product;
892        }
893        if let Some(os_cpu) = options.os_cpu {
894            profile.meta.os_cpu = os_cpu;
895        }
896        profile.meta.marker_schema.push(json!({
897            "name": "FelderaMarker",
898            "display": [
899                "marker-chart",
900                "marker-table"
901            ],
902            "chartLabel": "{marker.data.name}",
903            "tooltipLabel": "{marker.data.name}",
904            "tableLabel": "{marker.data.name}",
905            "description": "Marker generated by Feldera.",
906            "fields": [
907                {
908                    "key": "name",
909                    "label": "Name",
910                    "format": "unique-string"
911                }
912            ]
913        }));
914        /// The colors that the profiler accepts for categories (see
915        /// https://github.com/firefox-devtools/profiler/blob/main/src/types/profile.ts).
916        static CATEGORY_COLORS: [&str; 12] = [
917            "purple",
918            "green",
919            "orange",
920            "yellow",
921            "lightblue",
922            "blue",
923            "brown",
924            "magenta",
925            "red",
926            "lightred",
927            "darkgrey",
928            "grey",
929        ];
930        let mut categories = profile
931            .meta
932            .categories
933            .iter()
934            .enumerate()
935            .map(|(index, category)| (category.name.clone(), index))
936            .collect::<HashMap<_, _>>();
937        for (category, color) in self
938            .markers
939            .values()
940            .flat_map(|(_, markers)| markers.iter())
941            .map(|marker| marker.category)
942            .collect::<HashSet<_>>()
943            .into_iter()
944            .zip(CATEGORY_COLORS.iter().cycle())
945        {
946            categories.insert(category.into(), profile.meta.categories.len());
947            profile.meta.categories.push(Category {
948                color: (*color).into(),
949                name: category.into(),
950                other: [(String::from("subcategories"), json!(["Other"]))]
951                    .into_iter()
952                    .collect(),
953            });
954        }
955        for thread in &mut profile.threads {
956            if let Some(tid) = &thread.tid
957                && let Ok(tid) = tid.parse::<usize>()
958                && let Some((name, markers)) = self.markers.get(&tid)
959            {
960                if let Some(name) = name {
961                    thread.name = Some(name.clone());
962                }
963                for marker in markers.iter() {
964                    thread.markers.length += 1;
965                    thread.markers.category.push(categories[marker.category]);
966                    thread.markers.data.push(ProfileMarkerData {
967                        type_: Cow::from("FelderaMarker"),
968                        name: profile.shared.add_name(&marker.tooltip),
969                    });
970                    thread
971                        .markers
972                        .start_time
973                        .push(to_profile_time(marker.start));
974                    thread.markers.end_time.push(to_profile_time(
975                        marker.end.timestamp().unwrap_or(self.end_time),
976                    ));
977                    thread
978                        .markers
979                        .name
980                        .push(profile.shared.add_name(marker.name));
981                    thread.markers.phase.push(1);
982                }
983            }
984        }
985
986        if !self.memory.is_empty() {
987            profile.counters.push(RawCounter {
988                name: String::from("RSS"),
989                category: String::from("Memory"),
990                description: String::from("RSS in bytes"),
991                pid: profile.threads.first().unwrap().pid.clone(),
992                main_thread_index: 0,
993                samples: {
994                    RawCounterSamplesTable {
995                        time: self
996                            .memory
997                            .iter()
998                            .map(|(time, _rss)| to_profile_time(*time))
999                            .collect(),
1000                        time_deltas: Vec::new(),
1001                        number: repeat_n(0, self.memory.len()).collect(),
1002                        count: once(self.memory[0].1 as i64)
1003                            .chain(
1004                                self.memory
1005                                    .iter()
1006                                    .map(|(_time, rss)| *rss as i64)
1007                                    .tuple_windows()
1008                                    .map(|(prev, next)| next - prev),
1009                            )
1010                            .collect(),
1011                        length: self.memory.len(),
1012                        other: Default::default(),
1013                    }
1014                },
1015                other: Default::default(),
1016            });
1017        }
1018
1019        // Produce the output, gzipping it if the input was gzipped.
1020        let output = serde_json::to_vec(&profile).unwrap();
1021        let output = if gzip {
1022            let mut gzipped_output = Vec::new();
1023            GzEncoder::new(Cursor::new(output), Compression::fast())
1024                .read_to_end(&mut gzipped_output)
1025                .unwrap();
1026            gzipped_output
1027        } else {
1028            output
1029        };
1030
1031        return Ok(output);
1032
1033        /// This is the [Gecko profiler] format.
1034        ///
1035        /// [Gecko profiler]: https://github.com/firefox-devtools/profiler/blob/main/src/types/profile.ts
1036        #[derive(Debug, Serialize, Deserialize)]
1037        #[serde(rename_all = "camelCase")]
1038        struct Profile {
1039            meta: Meta,
1040            threads: Vec<Thread>,
1041            #[serde(default)]
1042            counters: Vec<RawCounter>,
1043            shared: Shared,
1044            #[serde(flatten)]
1045            other: HashMap<String, Value>,
1046        }
1047
1048        #[derive(Debug, Serialize, Deserialize)]
1049        #[serde(rename_all = "camelCase")]
1050        struct Meta {
1051            product: String,
1052            #[serde(rename = "oscpu")]
1053            os_cpu: String,
1054            start_time: f64,
1055            categories: Vec<Category>,
1056            marker_schema: Vec<Value>,
1057            #[serde(flatten)]
1058            other: HashMap<String, Value>,
1059        }
1060
1061        #[derive(Debug, Serialize, Deserialize)]
1062        #[serde(rename_all = "camelCase")]
1063        struct Category {
1064            color: String,
1065            name: String,
1066            #[serde(flatten)]
1067            other: HashMap<String, Value>,
1068        }
1069
1070        #[derive(Debug, Serialize, Deserialize)]
1071        #[serde(rename_all = "camelCase")]
1072        struct Thread {
1073            name: Option<String>,
1074            #[serde(default)]
1075            markers: ProfileMarkers,
1076            tid: Option<String>,
1077            pid: String,
1078            #[serde(flatten)]
1079            other: HashMap<String, Value>,
1080        }
1081
1082        #[derive(Default, Debug, Serialize, Deserialize)]
1083        #[serde(rename_all = "camelCase")]
1084        struct ProfileMarkers {
1085            length: usize,
1086            category: Vec<usize>,
1087            data: Vec<ProfileMarkerData>,
1088            start_time: Vec<Timestamp>,
1089            end_time: Vec<Timestamp>,
1090            name: Vec<usize>,
1091            phase: Vec<usize>,
1092        }
1093
1094        #[derive(Default, Debug, Serialize, Deserialize)]
1095        #[serde(rename_all = "camelCase")]
1096        struct ProfileMarkerData {
1097            #[serde(rename = "type")]
1098            type_: Cow<'static, str>,
1099            name: usize,
1100        }
1101
1102        #[derive(Debug, Serialize, Deserialize)]
1103        #[serde(rename_all = "camelCase")]
1104        struct Shared {
1105            string_array: Vec<String>,
1106            #[serde(flatten)]
1107            other: HashMap<String, Value>,
1108        }
1109
1110        impl Shared {
1111            fn add_name(&mut self, name: &str) -> usize {
1112                let index = self.string_array.len();
1113                self.string_array.push(name.into());
1114                index
1115            }
1116        }
1117
1118        #[derive(Default, Debug, Serialize, Deserialize)]
1119        #[serde(rename_all = "camelCase")]
1120        struct RawCounter {
1121            name: String,
1122            category: String,
1123            description: String,
1124            pid: String,
1125            main_thread_index: usize,
1126            samples: RawCounterSamplesTable,
1127            #[serde(flatten)]
1128            other: HashMap<String, Value>,
1129        }
1130
1131        #[derive(Default, Debug, Serialize, Deserialize)]
1132        #[serde(rename_all = "camelCase")]
1133        struct RawCounterSamplesTable {
1134            #[serde(default, skip_serializing_if = "Vec::is_empty")]
1135            time: Vec<Timestamp>,
1136            #[serde(default, skip_serializing_if = "Vec::is_empty")]
1137            time_deltas: Vec<Timestamp>,
1138            #[serde(default, skip_serializing_if = "Vec::is_empty")]
1139            number: Vec<usize>,
1140            count: Vec<i64>,
1141            length: usize,
1142            #[serde(flatten)]
1143            other: HashMap<String, Value>,
1144        }
1145    }
1146}
1147
/// Global flag that is `true` while a capture is in progress and `false`
/// otherwise (the initial state).
static CAPTURING: AtomicBool = AtomicBool::new(false);
1150
1151/// End time of a marker.
1152#[derive(Clone, Debug)]
1153enum MarkerEnd {
1154    /// A particular end time for a [Span].
1155    At(Timestamp),
1156
1157    /// A possible end time for a [LongSpan].  If the span has not yet ended,
1158    /// this is `None`.
1159    Long(Arc<AtomicOptionTimestamp>),
1160}
1161
1162impl MarkerEnd {
1163    /// Returns false if this marker should be dropped because it is no longer
1164    /// significant.
1165    ///
1166    /// `capturing` specifies whether a capture is currently running.  More
1167    /// markers are relevant when a capture is running because they will be
1168    /// recorded when the capture ends.
1169    fn should_keep(&self, capturing: bool) -> bool {
1170        if let MarkerEnd::Long(timestamp) = self {
1171            if timestamp.load().is_some() {
1172                // Drop it if there is no capture running, because it ended
1173                // before any new capture can start.
1174                capturing
1175            } else if Arc::strong_count(timestamp) > 1 {
1176                // There's another owner who might eventually complete this span.
1177                true
1178            } else {
1179                // This span is orphaned and will never complete.  One could
1180                // argue for keeping it.  We drop it to avoid having a kind of
1181                // memory leak.
1182                //
1183                // We keep it if a capture is running, so that it is included in
1184                // the current capture.
1185                capturing
1186            }
1187        } else {
1188            false
1189        }
1190    }
1191
1192    /// Returns the inner timestamp, or `None` if there isn't one yet because
1193    /// this is an ongoing [LongSpan].
1194    fn timestamp(&self) -> Option<Timestamp> {
1195        match self {
1196            MarkerEnd::At(timestamp) => Some(*timestamp),
1197            MarkerEnd::Long(timestamp) => timestamp.load(),
1198        }
1199    }
1200}
1201
1202/// A single marker as captured.
1203#[derive(Clone, Debug)]
1204struct Marker {
1205    /// Start time.
1206    start: Timestamp,
1207    /// End time.
1208    end: MarkerEnd,
1209    /// Category (used for outer grouping).
1210    category: &'static str,
1211    /// Name (used for inner grouping).
1212    name: &'static str,
1213    /// Shown on hover.
1214    tooltip: String,
1215}
1216
1217/// Markers for a given thread.
1218#[derive(Clone)]
1219struct ThreadMarkers {
1220    /// The thread's tid.
1221    ///
1222    /// We need this to identify this thread in the profiler file.
1223    tid: usize,
1224
1225    /// The thread's name.
1226    ///
1227    /// The profiler gets thread names from the kernel, but they are truncated
1228    /// at 15 bytes.  We supply the full thread name.
1229    name: Option<String>,
1230
1231    /// The thread's markers.
1232    ///
1233    /// The thread itself records [Event]s and [Span]s by pushing them onto the
1234    /// queue.  [Capture::finish] pops them all off.
1235    queue: Arc<Queue>,
1236}
1237
1238impl ThreadMarkers {
1239    fn new(queue: Arc<Queue>) -> Self {
1240        #[cfg(target_os = "linux")]
1241        let tid = nix::unistd::gettid().as_raw() as usize;
1242        #[cfg(not(target_os = "linux"))]
1243        let tid = thread_id::get();
1244
1245        Self {
1246            tid,
1247            name: std::thread::current().name().map(|s| s.into()),
1248            queue,
1249        }
1250    }
1251}
1252
1253/// [ThreadMarkers] for every thread that has recorded a marker.
1254static ALL_THREAD_MARKERS: std::sync::Mutex<Vec<ThreadMarkers>> = std::sync::Mutex::new(Vec::new());
1255
1256/// Number of blocks of capacity left for allocation before we stop allocating.
1257///
1258/// If this is below zero, then we ran out and tried to allocate more anyhow.
1259static FREE_BLOCKS: AtomicI64 = AtomicI64::new(0);
1260
1261/// Number of [Marker]s we allocate in each [Block].
1262const MARKERS_PER_BLOCK: usize = 32;
1263
1264/// Number of bytes we account for each [Block].
1265const BYTES_PER_BLOCK: usize = MARKERS_PER_BLOCK * Span::BYTES;
1266
1267/// The time at which [FREE_BLOCKS] dropped below zero.
1268static MARKERS_EXHAUSTED: AtomicOptionTimestamp = AtomicOptionTimestamp::new(None);
1269
1270struct Block(Vec<Marker>);
1271impl Block {
1272    fn new(marker: Marker) -> Self {
1273        let mut markers = Vec::with_capacity(MARKERS_PER_BLOCK);
1274        markers.push(marker);
1275        Self(markers)
1276    }
1277    fn is_full(&self) -> bool {
1278        self.0.len() >= self.0.capacity()
1279    }
1280    fn push(&mut self, marker: Marker) {
1281        self.0.push(marker);
1282    }
1283}
1284
1285struct Blocks(Vec<Block>);
1286
1287impl Default for Blocks {
1288    fn default() -> Self {
1289        Self(Vec::with_capacity(32))
1290    }
1291}
1292
1293impl Blocks {
1294    fn push(&mut self, marker: Marker) {
1295        if let Some(block) = self.0.last_mut()
1296            && !block.is_full()
1297        {
1298            block.push(marker);
1299        } else {
1300            match FREE_BLOCKS.fetch_sub(1, Ordering::Relaxed) {
1301                1.. => self.0.push(Block::new(marker)),
1302                0 => {
1303                    // Record when marker space was exhausted.  The combination
1304                    // of `load` and `store` is not an atomic transaction, but
1305                    // it's good enough.
1306                    if MARKERS_EXHAUSTED.load().is_none() {
1307                        MARKERS_EXHAUSTED.store(Some(Timestamp::now()));
1308                    }
1309                }
1310                _ => (),
1311            }
1312        }
1313    }
1314
1315    fn iter(&self) -> impl Iterator<Item = &Marker> {
1316        self.0.iter().flat_map(|block| block.0.iter())
1317    }
1318}
1319
1320#[derive(Debug, Default)]
1321struct LongSpans {
1322    markers: Vec<Marker>,
1323}
1324
1325impl LongSpans {
1326    fn push(&mut self, marker: Marker) {
1327        if self.markers.len() == self.markers.capacity() {
1328            // Do garbage collection.
1329            let capturing = Capture::is_active();
1330            self.markers
1331                .retain(|marker| marker.end.should_keep(capturing));
1332        }
1333        self.markers.push(marker);
1334    }
1335
1336    fn append(&mut self, other: &mut Vec<Marker>) {
1337        if self.markers.is_empty() {
1338            swap(&mut self.markers, other);
1339        } else {
1340            self.markers.append(other);
1341        }
1342    }
1343}
1344
1345struct Queue {
1346    /// Records [Span] and [Event] markers.
1347    blocks: std::sync::Mutex<Blocks>,
1348
1349    /// Records [LongSpan] markers.
1350    ///
1351    /// These are recorded separately because they are not accounted the same
1352    /// way and because they need to be garbage collected.
1353    long_spans: std::sync::Mutex<LongSpans>,
1354}
1355
1356impl Queue {
1357    fn new() -> Arc<Self> {
1358        let queue = Arc::new(Self {
1359            blocks: Default::default(),
1360            long_spans: Default::default(),
1361        });
1362        ALL_THREAD_MARKERS
1363            .lock()
1364            .unwrap()
1365            .push(ThreadMarkers::new(queue.clone()));
1366        queue
1367    }
1368
1369    /// Adds `marker` if there's room with the limit.
1370    fn push(&self, marker: Marker) {
1371        self.blocks.lock().unwrap().push(marker);
1372    }
1373
1374    /// Adds `marker` to the collection of long spans.
1375    fn push_long_span(&self, marker: Marker) {
1376        self.long_spans.lock().unwrap().push(marker);
1377    }
1378
1379    fn take_blocks(&self) -> Blocks {
1380        std::mem::take(&mut *self.blocks.lock().unwrap())
1381    }
1382
1383    fn take_long_spans(&self) -> Vec<Marker> {
1384        let old_long_spans = std::mem::take(&mut *self.long_spans.lock().unwrap()).markers;
1385
1386        // Requeue the long spans that we removed that should stay in place.
1387        let mut new_long_spans = Vec::with_capacity(old_long_spans.capacity());
1388        for marker in &old_long_spans {
1389            if marker.end.should_keep(false) {
1390                new_long_spans.push(marker.clone());
1391            }
1392        }
1393        if !new_long_spans.is_empty() {
1394            self.long_spans.lock().unwrap().append(&mut new_long_spans);
1395        }
1396
1397        old_long_spans
1398    }
1399}
1400
1401thread_local! {
1402    static QUEUE: Arc<Queue> = Queue::new();
1403}