#![warn(missing_docs)]
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt::{Debug, Display},
io::{Cursor, Read},
iter::{once, repeat_n},
mem::swap,
sync::{
Arc,
atomic::{AtomicBool, AtomicI64, Ordering},
},
thread::JoinHandle,
time::{Duration, Instant},
};
#[cfg(target_os = "macos")]
use std::time::{SystemTime, UNIX_EPOCH};
use crossbeam::sync::{Parker, Unparker};
use flate2::{
Compression,
bufread::{GzDecoder, GzEncoder},
};
use itertools::Itertools;
use memory_stats::memory_stats;
#[cfg(not(target_os = "macos"))]
use nix::time::{ClockId, clock_gettime};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use size_of::HumanBytes;
use tracing::warn;
/// An `Option<Timestamp>` that can be read and written atomically.
///
/// The value lives in one `AtomicI64`; `i64::MIN` is reserved to mean `None`
/// and any other value is the raw payload of a `Timestamp`.
#[derive(Debug)]
struct AtomicOptionTimestamp(AtomicI64);

impl Default for AtomicOptionTimestamp {
    /// Creates a cell that holds `None`.
    fn default() -> Self {
        Self::new(None)
    }
}

impl AtomicOptionTimestamp {
    /// Creates a cell initialised to `value`.
    const fn new(value: Option<Timestamp>) -> Self {
        let raw = match value {
            Some(timestamp) => timestamp.0,
            None => i64::MIN,
        };
        Self(AtomicI64::new(raw))
    }

    /// Reads the current value with `Acquire` ordering.
    fn load(&self) -> Option<Timestamp> {
        let raw = self.0.load(Ordering::Acquire);
        if raw == i64::MIN {
            None
        } else {
            Some(Timestamp(raw))
        }
    }

    /// Replaces the current value with `Release` ordering.
    fn store(&self, value: Option<Timestamp>) {
        let raw = match value {
            Some(timestamp) => timestamp.0,
            None => i64::MIN,
        };
        self.0.store(raw, Ordering::Release);
    }
}
/// A point in time stored as a signed number of nanoseconds.
///
/// Values produced by `Timestamp::now` come from the platform clock read
/// there; ordering and equality compare the raw nanosecond count.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
struct Timestamp(
// Raw nanosecond count.
i64,
);
/// Returns the current `mach_absolute_time` reading converted to nanoseconds.
///
/// The tick-to-nanosecond ratio is queried once through `mach_timebase_info`
/// and cached. If the reported denominator is zero the ratio falls back to
/// 1:1 so the conversion never divides by zero.
#[cfg(target_os = "macos")]
fn mach_absolute_time_nanos() -> i64 {
    use mach2::mach_time::{mach_absolute_time, mach_timebase_info, mach_timebase_info_data_t};
    use std::sync::OnceLock;

    static NANOS_PER_TICK: OnceLock<(u32, u32)> = OnceLock::new();
    let (numer, denom) = *NANOS_PER_TICK.get_or_init(|| {
        let mut info = mach_timebase_info_data_t { numer: 0, denom: 0 };
        // SAFETY: `info` is a live, exclusively borrowed struct that the call
        // only writes into.
        unsafe {
            mach_timebase_info(&mut info);
        }
        if info.denom == 0 {
            (1, 1)
        } else {
            (info.numer, info.denom)
        }
    });
    // SAFETY: `mach_absolute_time` takes no arguments and only reads a clock.
    let ticks = unsafe { mach_absolute_time() };
    // Multiply in 128 bits: `ticks * numer` can exceed `u64::MAX` for large
    // tick counts, which would panic in debug builds and wrap in release.
    let nanos = u128::from(ticks) * u128::from(numer) / u128::from(denom);
    // Saturate instead of wrapping into a negative timestamp.
    i64::try_from(nanos).unwrap_or(i64::MAX)
}
impl Timestamp {
    /// Reads the current time from the platform clock.
    ///
    /// macOS uses `mach_absolute_time_nanos`; every other target reads
    /// `CLOCK_MONOTONIC` and panics if that read fails.
    fn now() -> Self {
        #[cfg(target_os = "macos")]
        {
            let nanos = mach_absolute_time_nanos();
            Self(nanos)
        }
        #[cfg(not(target_os = "macos"))]
        {
            let reading = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap();
            let whole_seconds = reading.tv_sec() as i64;
            let subsec_nanos = reading.tv_nsec() as i64;
            Self(whole_seconds * 1_000_000_000 + subsec_nanos)
        }
    }

    /// Returns `self - other`, or `Duration::ZERO` when `other` is later.
    fn saturating_sub(self, other: Self) -> Duration {
        if self.0 < other.0 {
            return Duration::ZERO;
        }
        // `abs_diff` cannot overflow even for operands of opposite sign.
        Duration::from_nanos(self.0.abs_diff(other.0))
    }
}
/// Returns the wall-clock time as nanoseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
#[cfg(target_os = "macos")]
fn unix_epoch_nanos() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before Unix epoch");
    since_epoch.as_nanos() as i64
}
/// Converts an `Instant` into a `Timestamp` by measuring the nanoseconds
/// between it and an all-zero `Instant`.
impl From<Instant> for Timestamp {
fn from(value: Instant) -> Self {
// NOTE(review): `Instant` is documented as opaque. This assumes the all-zero
// bit pattern is a valid `Instant` and that it is the origin of the same clock
// that `Timestamp::now` reads — confirm for every supported platform.
let zero = unsafe { std::mem::zeroed::<Instant>() };
// `Instant - Instant` panics if `value` is earlier than `zero`.
Self((value - zero).as_nanos() as i64)
}
}
impl Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Serializes the timestamp as a floating-point number of milliseconds.
impl Serialize for Timestamp {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Stored value is nanoseconds; 1 ms = 1_000_000 ns.
        serializer.serialize_f64(self.0 as f64 / 1_000_000.0)
    }
}
/// Deserializes a floating-point number of milliseconds into a timestamp.
impl<'de> Deserialize<'de> for Timestamp {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Scale milliseconds back to whole nanoseconds.
        f64::deserialize(deserializer)
            .map(|milliseconds| Self((milliseconds * 1_000_000.0) as i64))
    }
}
/// Formats as `Span(<name>)` while the span is live, or just `Span` when it
/// holds no data.
impl Debug for Span {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.0 {
            Some(inner) => write!(f, "Span({})", inner.name),
            None => f.write_str("Span"),
        }
    }
}
/// Data collected for a span or event before it is turned into a `Marker`.
struct SpanInner {
// When the span began; `SpanInner::new` sets this to `Timestamp::now()`.
start: Timestamp,
// Category label; `SpanInner::new` defaults it to "Other".
category: &'static str,
// Static name of the span.
name: &'static str,
// Free-form text; empty until a `with_tooltip` builder sets it.
tooltip: String,
}
impl SpanInner {
    /// Starts a span named `name` at the current time, with category "Other"
    /// and an empty tooltip.
    #[cold]
    fn new(name: &'static str) -> Self {
        let start = Timestamp::now();
        Self {
            start,
            category: "Other",
            name,
            tooltip: String::new(),
        }
    }

    /// Turns the span into a `Marker` that ends at `end`.
    fn into_marker(self, end: MarkerEnd) -> Marker {
        let Self {
            start,
            category,
            name,
            tooltip,
        } = self;
        Marker {
            start,
            end,
            category,
            name,
            tooltip,
        }
    }

    /// Pushes the finished span onto the current thread's queue.
    #[cold]
    fn record(self, end: MarkerEnd) {
        let marker = self.into_marker(end);
        QUEUE.with(|queue| queue.push(marker));
    }
}
pub struct Span(Option<SpanInner>);
impl Span {
pub const BYTES: usize = std::mem::size_of::<Marker>();
#[must_use]
pub fn new(name: &'static str) -> Self {
Self(Capture::is_active().then(|| SpanInner::new(name)))
}
#[must_use]
pub fn with_category(mut self, category: &'static str) -> Self {
if let Some(inner) = &mut self.0 {
inner.category = category;
}
self
}
#[must_use]
pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
where
F: FnOnce() -> String,
{
if let Some(inner) = &mut self.0 {
inner.tooltip = tooltip();
}
self
}
#[must_use]
pub fn with_start(mut self, start: Instant) -> Self {
if let Some(inner) = &mut self.0 {
inner.start = start.into();
}
self
}
pub fn in_scope<F, T>(self, f: F) -> T
where
F: FnOnce() -> T,
{
f()
}
pub fn record(self) {
}
pub fn cancel(mut self) {
let _ = self.0.take();
}
}
/// Records the span, ending at the current time, unless it is empty.
impl Drop for Span {
    fn drop(&mut self) {
        let Some(inner) = self.0.take() else {
            return;
        };
        inner.record(MarkerEnd::At(Timestamp::now()));
    }
}
pub struct LongSpanBuilder(SpanInner);
impl LongSpanBuilder {
#[must_use]
pub fn new(name: &'static str) -> Self {
Self(SpanInner::new(name))
}
#[must_use]
pub fn with_category(mut self, category: &'static str) -> Self {
self.0.category = category;
self
}
#[must_use]
pub fn with_tooltip(mut self, tooltip: impl Into<String>) -> Self {
self.0.tooltip = tooltip.into();
self
}
#[must_use]
pub fn with_start(mut self, start: Instant) -> Self {
self.0.start = start.into();
self
}
#[must_use]
pub fn build(self) -> LongSpan {
let timestamp = Arc::new(AtomicOptionTimestamp::default());
QUEUE.with(|queue| {
queue.push_long_span(self.0.into_marker(MarkerEnd::Long(timestamp.clone())))
});
LongSpan(timestamp)
}
}
/// Handle to a long span created by [`LongSpanBuilder::build`].
///
/// Dropping the handle stores the current time as the span's end.
pub struct LongSpan(Arc<AtomicOptionTimestamp>);

impl LongSpan {
    /// Ends the span now.
    pub fn complete(self) {
        // `Drop` stores the end time.
        drop(self);
    }
}

impl Drop for LongSpan {
    fn drop(&mut self) {
        let finished_at = Timestamp::now();
        self.0.store(Some(finished_at));
    }
}
/// An instantaneous marker, recorded with identical start and end times.
///
/// The event is empty, and every call on it is a no-op, unless a capture was
/// active when [`Event::new`] ran.
pub struct Event(Option<SpanInner>);

impl Event {
    /// Creates an event called `name` if a capture is currently active.
    #[must_use]
    pub fn new(name: &'static str) -> Self {
        if Capture::is_active() {
            Self(Some(SpanInner::new(name)))
        } else {
            Self(None)
        }
    }

    /// Sets the event's category.
    #[must_use]
    pub fn with_category(mut self, category: &'static str) -> Self {
        if let Some(inner) = self.0.as_mut() {
            inner.category = category;
        }
        self
    }

    /// Sets the event's tooltip; `tooltip` is only called for a live event.
    #[must_use]
    pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
    where
        F: FnOnce() -> String,
    {
        if let Some(inner) = self.0.as_mut() {
            inner.tooltip = tooltip();
        }
        self
    }

    /// Records the event; it must be called explicitly because `Event` has no
    /// `Drop` hook.
    pub fn record(self) {
        let Some(inner) = self.0 else {
            return;
        };
        // The end is the start, so the marker has zero length.
        let instant = inner.start;
        inner.record(MarkerEnd::At(instant));
    }
}
/// Settings for starting a [`Capture`].
#[derive(Clone, Debug)]
pub struct CaptureOptions {
    /// Upper bound, in bytes, on memory used for markers; `None` is unlimited.
    memory_limit: Option<usize>,
    /// Whether to sample the process's physical memory during the capture.
    record_rss: bool,
}

impl Default for CaptureOptions {
    /// No memory limit, RSS recording on.
    fn default() -> Self {
        Self {
            memory_limit: None,
            record_rss: true,
        }
    }
}

impl CaptureOptions {
    /// Creates the default options.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the marker memory limit, in bytes.
    pub fn with_memory_limit(mut self, memory_limit: Option<usize>) -> Self {
        self.memory_limit = memory_limit;
        self
    }

    /// Turns RSS sampling on or off.
    pub fn with_record_rss(mut self, record_rss: bool) -> Self {
        self.record_rss = record_rss;
        self
    }

    /// Starts a capture, waiting asynchronously until no other capture holds
    /// the global capture lock.
    pub async fn start(self) -> Capture {
        let guard = CAPTURE_MUTEX.lock().await;
        Capture::new(self, guard)
    }

    /// Starts a capture, blocking the calling thread until the global capture
    /// lock is free.
    pub fn blocking_start(self) -> Capture {
        let guard = CAPTURE_MUTEX.blocking_lock();
        Capture::new(self, guard)
    }

    /// Starts a capture only if the global capture lock is free right now.
    ///
    /// # Errors
    ///
    /// Hands the options back if the lock is already held.
    pub fn try_start(self) -> Result<Capture, Self> {
        match CAPTURE_MUTEX.try_lock() {
            Ok(guard) => Ok(Capture::new(self, guard)),
            Err(_) => Err(self),
        }
    }
}
/// Global lock taken by `CaptureOptions::{start, blocking_start, try_start}`;
/// its guard is stored in the resulting `Capture`.
static CAPTURE_MUTEX: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// A running marker capture.
///
/// Created through `CaptureOptions`; call `finish` to obtain the collected
/// `Annotations`. Dropping the capture stops it.
pub struct Capture {
// Guard on `CAPTURE_MUTEX`, held for as long as this capture exists.
_guard: tokio::sync::MutexGuard<'static, ()>,
// Time read at the start of `Capture::new`.
start_time: Timestamp,
// Handle of the "capture-rss" sampling thread; `None` when RSS recording is off
// or after the samples have been taken.
memory: Option<JoinHandle<Vec<(Timestamp, usize)>>>,
// Wakes the sampling thread out of its timed park.
unparker: Unparker,
// Set to ask the sampling thread to stop.
request_exit: Arc<AtomicBool>,
// Value `FREE_BLOCKS` was initialised to; `i64::MAX` when there is no memory limit.
block_limit: i64,
// Monotonic timestamp paired with Unix-epoch nanoseconds, both read in `Capture::new`.
#[cfg(target_os = "macos")]
anchor: (Timestamp, i64),
}
impl Capture {
fn new(params: CaptureOptions, guard: tokio::sync::MutexGuard<'static, ()>) -> Self {
let start = Timestamp::now();
if let Some(memory_limit) = params.memory_limit {
tracing::info!(
"marker capture limited to {}",
HumanBytes::from(memory_limit)
);
}
let block_limit = params.memory_limit.map_or(i64::MAX, |memory_limit| {
(memory_limit / BYTES_PER_BLOCK) as i64
});
let parker = Parker::new();
let unparker = parker.unparker().clone();
let request_exit = Arc::new(AtomicBool::new(false));
let memory = params.record_rss.then(|| {
std::thread::Builder::new()
.name(String::from("capture-rss"))
.spawn({
let request_exit = request_exit.clone();
move || {
let mut memory = Vec::new();
while !request_exit.load(Ordering::Acquire) {
if let Some(memory_stats) = memory_stats() {
memory.push((Timestamp::now(), memory_stats.physical_mem));
}
parker.park_timeout(Duration::from_millis(100));
}
memory
}
})
.expect("should be able to start a capture thread")
});
FREE_BLOCKS.store(block_limit, Ordering::Relaxed);
MARKERS_EXHAUSTED.store(None);
CAPTURING.store(true, Ordering::Release);
Self {
start_time: start,
block_limit,
memory,
unparker,
request_exit,
#[cfg(target_os = "macos")]
anchor: (Timestamp::now(), unix_epoch_nanos()),
_guard: guard,
}
}
pub fn finish(mut self) -> Annotations {
let end_time = Timestamp::now();
CAPTURING.store(false, Ordering::Release);
let markers_exhausted = MARKERS_EXHAUSTED.load();
let free_blocks = FREE_BLOCKS.load(Ordering::Relaxed);
let used =
HumanBytes::from((self.block_limit - free_blocks.max(0)) as usize * BYTES_PER_BLOCK);
if free_blocks < 0 {
} else {
tracing::info!("marker capture used {used}");
}
let mut markers: HashMap<usize, (Option<String>, Blocks)> = self
.all_threads()
.into_iter()
.map(|thread| {
let mut blocks = thread.queue.take_blocks();
let long_spans = thread.queue.take_long_spans();
if !long_spans.is_empty() {
blocks.0.push(Block(long_spans));
}
(thread.tid, (thread.name, blocks))
})
.collect();
if let Some(markers_exhausted) = markers_exhausted {
let elapsed = markers_exhausted.saturating_sub(self.start_time);
let tooltip = format!(
"marker capture exceeded the limit ({used}) after {:.1} s",
elapsed.as_secs_f64()
);
tracing::info!("{tooltip}");
let marker = Marker {
start: self.start_time,
end: MarkerEnd::At(markers_exhausted),
category: "profiling",
name: "Profiling",
tooltip,
};
markers
.entry(nix::unistd::getpid().as_raw() as usize)
.or_default()
.1
.0
.push(Block::new(marker));
}
Annotations {
end_time,
markers,
memory: self.take_memory(),
#[cfg(target_os = "macos")]
anchor: self.anchor,
}
}
pub fn abort(self) {
tracing::info!("aborting profile annotation capture");
}
pub fn is_active() -> bool {
CAPTURING.load(Ordering::Acquire)
}
fn all_threads(&mut self) -> Vec<ThreadMarkers> {
ALL_THREAD_MARKERS.lock().unwrap().clone()
}
fn take_memory(&mut self) -> Vec<(Timestamp, usize)> {
if let Some(memory) = self.memory.take() {
self.request_exit.store(true, Ordering::Release);
self.unparker.unpark();
memory.join().unwrap_or_default()
} else {
Default::default()
}
}
}
/// Stops the sampling thread and turns capturing off.
impl Drop for Capture {
    fn drop(&mut self) {
        // Any samples still pending are discarded.
        drop(self.take_memory());
        CAPTURING.store(false, Ordering::Release);
    }
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Error decompressing profile ")]
GzDecoderError(#[from] std::io::Error),
#[error("Error parsing profile")]
SerdeError(#[from] serde_json_path_to_error::Error),
}
/// Optional metadata overrides used by [`Annotations::apply`].
#[derive(Default, Clone, Debug)]
pub struct AnnotationOptions {
    /// Replacement for the profile's `product`, if any.
    product: Option<String>,
    /// Replacement for the profile's `oscpu`, if any.
    os_cpu: Option<String>,
}

impl AnnotationOptions {
    /// Creates options that override nothing.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets or clears the product override.
    pub fn with_product(mut self, product: Option<impl Into<String>>) -> Self {
        self.product = product.map(Into::into);
        self
    }

    /// Sets or clears the OS/CPU override.
    pub fn with_os_cpu(mut self, os_cpu: Option<impl Into<String>>) -> Self {
        self.os_cpu = os_cpu.map(Into::into);
        self
    }
}
/// Markers and memory samples collected by a finished `Capture`.
pub struct Annotations {
// Time at which the capture was finished; `apply` uses it as the end of any
// long span that has no recorded end.
end_time: Timestamp,
// Per-thread markers keyed by thread id, each with the thread's name if it had one.
markers: HashMap<usize, (Option<String>, Blocks)>,
// `(time, physical memory)` samples from the RSS sampling thread.
memory: Vec<(Timestamp, usize)>,
// Monotonic timestamp paired with Unix-epoch nanoseconds, copied from the `Capture`.
#[cfg(target_os = "macos")]
anchor: (Timestamp, i64),
}
impl Annotations {
pub fn apply(&self, profile: &[u8], options: AnnotationOptions) -> Result<Vec<u8>, Error> {
let mut buffer = Vec::new();
let json = if profile.starts_with(&[0x1f, 0x8b]) {
GzDecoder::new(profile).read_to_end(&mut buffer)?;
&buffer
} else {
profile
};
let gzip = !buffer.is_empty();
let mut profile = serde_json_path_to_error::from_slice::<Profile>(json)?;
#[cfg(target_os = "macos")]
let to_profile_time = {
let profile_start_time_ms = profile.meta.start_time;
let (anchor_monotonic_ns, anchor_wall_clock_ns) = self.anchor;
let profile_start_wall_clock_ns = (profile_start_time_ms * 1_000_000.0) as i64;
let adjustment_ns = anchor_wall_clock_ns - profile_start_wall_clock_ns;
move |timestamp: Timestamp| {
Timestamp(timestamp.0 - anchor_monotonic_ns.0 + adjustment_ns)
}
};
#[cfg(not(target_os = "macos"))]
let to_profile_time = |timestamp: Timestamp| timestamp;
if let Some(product) = options.product {
profile.meta.product = product;
}
if let Some(os_cpu) = options.os_cpu {
profile.meta.os_cpu = os_cpu;
}
profile.meta.marker_schema.push(json!({
"name": "FelderaMarker",
"display": [
"marker-chart",
"marker-table"
],
"chartLabel": "{marker.data.name}",
"tooltipLabel": "{marker.data.name}",
"tableLabel": "{marker.data.name}",
"description": "Marker generated by Feldera.",
"fields": [
{
"key": "name",
"label": "Name",
"format": "unique-string"
}
]
}));
static CATEGORY_COLORS: [&str; 12] = [
"purple",
"green",
"orange",
"yellow",
"lightblue",
"blue",
"brown",
"magenta",
"red",
"lightred",
"darkgrey",
"grey",
];
let mut categories = profile
.meta
.categories
.iter()
.enumerate()
.map(|(index, category)| (category.name.clone(), index))
.collect::<HashMap<_, _>>();
for (category, color) in self
.markers
.values()
.flat_map(|(_, markers)| markers.iter())
.map(|marker| marker.category)
.collect::<HashSet<_>>()
.into_iter()
.zip(CATEGORY_COLORS.iter().cycle())
{
categories.insert(category.into(), profile.meta.categories.len());
profile.meta.categories.push(Category {
color: (*color).into(),
name: category.into(),
other: [(String::from("subcategories"), json!(["Other"]))]
.into_iter()
.collect(),
});
}
for thread in &mut profile.threads {
if let Some(tid) = &thread.tid
&& let Ok(tid) = tid.parse::<usize>()
&& let Some((name, markers)) = self.markers.get(&tid)
{
if let Some(name) = name {
thread.name = Some(name.clone());
}
for marker in markers.iter() {
thread.markers.length += 1;
thread.markers.category.push(categories[marker.category]);
thread.markers.data.push(ProfileMarkerData {
type_: Cow::from("FelderaMarker"),
name: profile.shared.add_name(&marker.tooltip),
});
thread
.markers
.start_time
.push(to_profile_time(marker.start));
thread.markers.end_time.push(to_profile_time(
marker.end.timestamp().unwrap_or(self.end_time),
));
thread
.markers
.name
.push(profile.shared.add_name(marker.name));
thread.markers.phase.push(1);
}
}
}
if !self.memory.is_empty() {
profile.counters.push(RawCounter {
name: String::from("RSS"),
category: String::from("Memory"),
description: String::from("RSS in bytes"),
pid: profile.threads.first().unwrap().pid.clone(),
main_thread_index: 0,
samples: {
RawCounterSamplesTable {
time: self
.memory
.iter()
.map(|(time, _rss)| to_profile_time(*time))
.collect(),
time_deltas: Vec::new(),
number: repeat_n(0, self.memory.len()).collect(),
count: once(self.memory[0].1 as i64)
.chain(
self.memory
.iter()
.map(|(_time, rss)| *rss as i64)
.tuple_windows()
.map(|(prev, next)| next - prev),
)
.collect(),
length: self.memory.len(),
other: Default::default(),
}
},
other: Default::default(),
});
}
let output = serde_json::to_vec(&profile).unwrap();
let output = if gzip {
let mut gzipped_output = Vec::new();
GzEncoder::new(Cursor::new(output), Compression::fast())
.read_to_end(&mut gzipped_output)
.unwrap();
gzipped_output
} else {
output
};
return Ok(output);
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Profile {
meta: Meta,
threads: Vec<Thread>,
#[serde(default)]
counters: Vec<RawCounter>,
shared: Shared,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Meta {
product: String,
#[serde(rename = "oscpu")]
os_cpu: String,
start_time: f64,
categories: Vec<Category>,
marker_schema: Vec<Value>,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Category {
color: String,
name: String,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Thread {
name: Option<String>,
#[serde(default)]
markers: ProfileMarkers,
tid: Option<String>,
pid: String,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ProfileMarkers {
length: usize,
category: Vec<usize>,
data: Vec<ProfileMarkerData>,
start_time: Vec<Timestamp>,
end_time: Vec<Timestamp>,
name: Vec<usize>,
phase: Vec<usize>,
}
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ProfileMarkerData {
#[serde(rename = "type")]
type_: Cow<'static, str>,
name: usize,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Shared {
string_array: Vec<String>,
#[serde(flatten)]
other: HashMap<String, Value>,
}
impl Shared {
fn add_name(&mut self, name: &str) -> usize {
let index = self.string_array.len();
self.string_array.push(name.into());
index
}
}
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawCounter {
name: String,
category: String,
description: String,
pid: String,
main_thread_index: usize,
samples: RawCounterSamplesTable,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawCounterSamplesTable {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
time: Vec<Timestamp>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
time_deltas: Vec<Timestamp>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
number: Vec<usize>,
count: Vec<i64>,
length: usize,
#[serde(flatten)]
other: HashMap<String, Value>,
}
}
}
/// Whether a capture is running: set by `Capture::new`, cleared by
/// `Capture::finish` and when a `Capture` is dropped, read by `Capture::is_active`.
static CAPTURING: AtomicBool = AtomicBool::new(false);
/// How the end of a marker is determined.
#[derive(Clone, Debug)]
enum MarkerEnd {
    /// The marker ended at a fixed time.
    At(Timestamp),
    /// The end is shared with a `LongSpan` handle and stays unset until that
    /// handle is dropped.
    Long(Arc<AtomicOptionTimestamp>),
}

impl MarkerEnd {
    /// Decides whether a long-span marker should stay in its queue.
    ///
    /// A span with no end yet and another live reference to its end cell is
    /// always kept. Any other long span is kept only while `capturing`.
    /// Fixed-end markers are never kept.
    fn should_keep(&self, capturing: bool) -> bool {
        match self {
            MarkerEnd::At(_) => false,
            MarkerEnd::Long(end) => {
                let still_running = end.load().is_none() && Arc::strong_count(end) > 1;
                still_running || capturing
            }
        }
    }

    /// Returns the end time, or `None` for a long span that has not ended.
    fn timestamp(&self) -> Option<Timestamp> {
        match self {
            MarkerEnd::Long(end) => end.load(),
            MarkerEnd::At(fixed) => Some(*fixed),
        }
    }
}
/// One recorded span or event.
#[derive(Clone, Debug)]
struct Marker {
// When the marker began.
start: Timestamp,
// When it ended, or a shared cell that will receive the end time.
end: MarkerEnd,
// Category label.
category: &'static str,
// Static marker name.
name: &'static str,
// Free-form text attached to the marker.
tooltip: String,
}
/// Identity of a thread together with a shared handle to its marker queue.
#[derive(Clone)]
struct ThreadMarkers {
    /// Thread id of the thread that created the entry.
    tid: usize,
    /// Name of that thread, if it had one.
    name: Option<String>,
    /// The thread's marker queue.
    queue: Arc<Queue>,
}

impl ThreadMarkers {
    /// Describes the calling thread and pairs it with `queue`.
    fn new(queue: Arc<Queue>) -> Self {
        #[cfg(target_os = "linux")]
        let tid = nix::unistd::gettid().as_raw() as usize;
        #[cfg(not(target_os = "linux"))]
        let tid = thread_id::get();

        let name = std::thread::current().name().map(String::from);
        Self { tid, name, queue }
    }
}
/// Registry of every thread queue; `Queue::new` adds one entry per queue.
static ALL_THREAD_MARKERS: std::sync::Mutex<Vec<ThreadMarkers>> = std::sync::Mutex::new(Vec::new());
/// Number of blocks that may still be allocated. `Capture::new` resets it and
/// `Blocks::push` decrements it on every allocation attempt, so it can go negative.
static FREE_BLOCKS: AtomicI64 = AtomicI64::new(0);
/// Capacity reserved by `Block::new`.
const MARKERS_PER_BLOCK: usize = 32;
/// Bytes accounted per block when converting a memory limit into a block count.
const BYTES_PER_BLOCK: usize = MARKERS_PER_BLOCK * Span::BYTES;
/// Time at which a block allocation first found no free blocks, if that has
/// happened since `Capture::new` last reset it.
static MARKERS_EXHAUSTED: AtomicOptionTimestamp = AtomicOptionTimestamp::new(None);
/// A chunk of markers backed by a single `Vec`.
struct Block(Vec<Marker>);

impl Block {
    /// Creates a block with room for `MARKERS_PER_BLOCK` markers, holding
    /// `marker` as its first entry.
    fn new(marker: Marker) -> Self {
        let mut block = Self(Vec::with_capacity(MARKERS_PER_BLOCK));
        block.push(marker);
        block
    }

    /// Returns whether one more push would make the backing `Vec` grow.
    fn is_full(&self) -> bool {
        let Self(markers) = self;
        markers.capacity() <= markers.len()
    }

    /// Appends `marker` to the block.
    fn push(&mut self, marker: Marker) {
        let Self(markers) = self;
        markers.push(marker);
    }
}
struct Blocks(Vec<Block>);
impl Default for Blocks {
fn default() -> Self {
Self(Vec::with_capacity(32))
}
}
impl Blocks {
fn push(&mut self, marker: Marker) {
if let Some(block) = self.0.last_mut()
&& !block.is_full()
{
block.push(marker);
} else {
match FREE_BLOCKS.fetch_sub(1, Ordering::Relaxed) {
1.. => self.0.push(Block::new(marker)),
0 => {
if MARKERS_EXHAUSTED.load().is_none() {
MARKERS_EXHAUSTED.store(Some(Timestamp::now()));
}
}
_ => (),
}
}
}
fn iter(&self) -> impl Iterator<Item = &Marker> {
self.0.iter().flat_map(|block| block.0.iter())
}
}
/// A thread's long-span markers.
#[derive(Debug, Default)]
struct LongSpans {
    markers: Vec<Marker>,
}

impl LongSpans {
    /// Adds `marker`. If the vector is at capacity, markers that
    /// `MarkerEnd::should_keep` rejects are removed first.
    fn push(&mut self, marker: Marker) {
        let at_capacity = self.markers.len() == self.markers.capacity();
        if at_capacity {
            let capturing = Capture::is_active();
            self.markers
                .retain(|existing| existing.end.should_keep(capturing));
        }
        self.markers.push(marker);
    }

    /// Moves every marker out of `other` into this collection.
    ///
    /// When this collection is empty the two vectors are swapped, so `other`
    /// ends up holding this collection's old, empty vector.
    fn append(&mut self, other: &mut Vec<Marker>) {
        if !self.markers.is_empty() {
            self.markers.append(other);
            return;
        }
        swap(&mut self.markers, other);
    }
}
/// One thread's marker storage.
struct Queue {
    /// Markers from finished spans and events.
    blocks: std::sync::Mutex<Blocks>,
    /// Markers from long spans.
    long_spans: std::sync::Mutex<LongSpans>,
}

impl Queue {
    /// Creates an empty queue and registers it, together with the calling
    /// thread's identity, in `ALL_THREAD_MARKERS`.
    fn new() -> Arc<Self> {
        let queue = Arc::new(Self {
            blocks: std::sync::Mutex::default(),
            long_spans: std::sync::Mutex::default(),
        });
        let mut registry = ALL_THREAD_MARKERS.lock().unwrap();
        registry.push(ThreadMarkers::new(Arc::clone(&queue)));
        drop(registry);
        queue
    }

    /// Stores a finished marker.
    fn push(&self, marker: Marker) {
        let mut blocks = self.blocks.lock().unwrap();
        blocks.push(marker);
    }

    /// Stores a long-span marker.
    fn push_long_span(&self, marker: Marker) {
        let mut long_spans = self.long_spans.lock().unwrap();
        long_spans.push(marker);
    }

    /// Removes and returns all finished markers, leaving the queue empty.
    fn take_blocks(&self) -> Blocks {
        let mut blocks = self.blocks.lock().unwrap();
        std::mem::take(&mut *blocks)
    }

    /// Returns every long-span marker currently queued.
    ///
    /// Copies of the markers that `should_keep(false)` accepts are put back
    /// in the queue.
    fn take_long_spans(&self) -> Vec<Marker> {
        let drained = std::mem::take(&mut *self.long_spans.lock().unwrap()).markers;

        // Same capacity as the drained vector, since this vector may become
        // the queue's storage.
        let mut still_open = Vec::with_capacity(drained.capacity());
        still_open.extend(
            drained
                .iter()
                .filter(|marker| marker.end.should_keep(false))
                .cloned(),
        );
        if !still_open.is_empty() {
            self.long_spans.lock().unwrap().append(&mut still_open);
        }
        drained
    }
}
thread_local! {
// This thread's marker queue; `Queue::new` also registers it in `ALL_THREAD_MARKERS`.
static QUEUE: Arc<Queue> = Queue::new();
}