#![warn(missing_docs)]
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt::{Debug, Display},
io::{Cursor, Read},
sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicI64, Ordering},
},
time::Instant,
};
use flate2::{
Compression,
bufread::{GzDecoder, GzEncoder},
};
use nix::time::{ClockId, clock_gettime};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use size_of::HumanBytes;
use tracing::warn;
/// A point in time stored as a signed nanosecond count.
///
/// `Timestamp::now` fills it from `CLOCK_MONOTONIC`; the serde impls exchange
/// it as a float equal to the nanosecond count divided by 1,000,000.
#[derive(Copy, Clone, Debug)]
struct Timestamp(
/// Nanoseconds (`seconds * 1_000_000_000 + nanoseconds`, as computed by
/// `Timestamp::now`).
i64,
);
impl Timestamp {
    /// Reads `CLOCK_MONOTONIC` and returns the reading as a nanosecond count.
    ///
    /// # Panics
    ///
    /// Panics if `clock_gettime` reports an error.
    fn now() -> Self {
        let reading = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap();
        let whole_seconds = reading.tv_sec() as i64;
        let subsec_nanos = reading.tv_nsec() as i64;
        Self(whole_seconds * 1_000_000_000 + subsec_nanos)
    }
}
// Converts a `std::time::Instant` into a `Timestamp` by measuring its distance
// from an all-zero `Instant`.
impl From<Instant> for Timestamp {
fn from(value: Instant) -> Self {
// SAFETY: NOTE(review) — `Instant` is an opaque std type with no documented
// layout, and nothing guarantees that the all-zero bit pattern is a valid
// value. This presumably relies on the platform representation being a plain
// seconds/nanoseconds pair; confirm for every supported target.
let zero = unsafe { std::mem::zeroed::<Instant>() };
// The distance from the all-zero instant is stored as nanoseconds.
// NOTE(review): the result only lines up with `Timestamp::now()` if `Instant`
// counts from the same origin as `CLOCK_MONOTONIC` — TODO confirm per platform.
Self((value - zero).as_nanos() as i64)
}
}
impl Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl Serialize for Timestamp {
    /// Serializes the timestamp as an `f64` equal to the nanosecond count
    /// divided by 1,000,000.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // The stored value is in nanoseconds, so this quotient is in
        // milliseconds.
        let milliseconds = self.0 as f64 / 1_000_000.0;
        serializer.serialize_f64(milliseconds)
    }
}
impl<'de> Deserialize<'de> for Timestamp {
    /// Reads an `f64` and multiplies it by 1,000,000 to recover the
    /// nanosecond count; the product is converted with an `as i64` cast.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Inverse of the serializer: the wire value is nanoseconds / 1e6,
        // i.e. milliseconds.
        f64::deserialize(deserializer)
            .map(|milliseconds| Self((milliseconds * 1_000_000.0) as i64))
    }
}
impl Debug for SamplySpan {
    /// Prints `SamplySpan`, followed by `(<name>)` when the span is active.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("SamplySpan")?;
        match &self.0 {
            Some(inner) => write!(f, "({})", inner.name),
            None => Ok(()),
        }
    }
}
/// State of a span or event that has not been recorded yet.
struct SpanInner {
/// Start time; `SpanInner::new` sets it to the time of construction.
start: Timestamp,
/// Category label; `SpanInner::new` initializes it to `"Other"`.
category: &'static str,
/// Name of the marker.
name: &'static str,
/// Tooltip text; empty until a builder method replaces it.
tooltip: String,
}
impl SpanInner {
    /// Starts a new span or event named `name`, timestamped now, in the
    /// `"Other"` category and with an empty tooltip.
    #[cold]
    fn new(name: &'static str) -> Self {
        let start = Timestamp::now();
        Self {
            start,
            category: "Other",
            name,
            tooltip: String::new(),
        }
    }

    /// Turns this state into a `Marker` and pushes it onto the current
    /// thread's queue.
    ///
    /// When `is_span` is true the marker ends now; otherwise its end equals
    /// its start.
    #[cold]
    fn record(self, is_span: bool) {
        let Self {
            start,
            category,
            name,
            tooltip,
        } = self;
        let end = if is_span { Timestamp::now() } else { start };
        QUEUE.with(|queue| {
            queue.push(Marker {
                start,
                end,
                category,
                name,
                tooltip,
            })
        });
    }
}
/// A span marker covering the time from its construction (or an explicit
/// start) until it is dropped.
///
/// The inner state is `Some` only if marker collection was enabled when the
/// span was created; otherwise every operation on the span is a no-op.
pub struct SamplySpan(Option<SpanInner>);
impl SamplySpan {
#[must_use]
pub fn new(name: &'static str) -> Self {
Self(markers_enabled().then(|| SpanInner::new(name)))
}
#[must_use]
pub fn with_category(mut self, category: &'static str) -> Self {
if let Some(inner) = &mut self.0 {
inner.category = category;
}
self
}
#[must_use]
pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
where
F: FnOnce() -> String,
{
if let Some(inner) = &mut self.0 {
inner.tooltip = tooltip();
}
self
}
#[must_use]
pub fn with_start(mut self, start: Instant) -> Self {
if let Some(inner) = &mut self.0 {
inner.start = start.into();
}
self
}
pub fn in_scope<F, T>(self, f: F) -> T
where
F: FnOnce() -> T,
{
f()
}
pub fn record(self) {
}
}
impl Drop for SamplySpan {
    /// Records the span, ending now, if it was active.
    fn drop(&mut self) {
        let Some(inner) = self.0.take() else {
            return;
        };
        inner.record(true);
    }
}
/// An instantaneous marker whose end time equals its start time.
///
/// The inner state is `Some` only if marker collection was enabled when the
/// event was created. Unlike a span, an event is recorded only by calling
/// `record`.
pub struct SamplyEvent(Option<SpanInner>);
impl SamplyEvent {
    /// Creates an event named `name`, timestamped now.
    ///
    /// The event is inert unless marker collection is enabled at the time of
    /// the call.
    #[must_use]
    pub fn new(name: &'static str) -> Self {
        let inner = if markers_enabled() {
            Some(SpanInner::new(name))
        } else {
            None
        };
        Self(inner)
    }

    /// Sets the event's category.
    #[must_use]
    pub fn with_category(mut self, category: &'static str) -> Self {
        if let Some(inner) = self.0.as_mut() {
            inner.category = category;
        }
        self
    }

    /// Sets the event's tooltip.
    ///
    /// `tooltip` is only invoked when the event is active.
    #[must_use]
    pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
    where
        F: FnOnce() -> String,
    {
        if let Some(inner) = self.0.as_mut() {
            inner.tooltip = tooltip();
        }
        self
    }

    /// Records the event with identical start and end times.
    pub fn record(self) {
        let Some(inner) = self.0 else {
            return;
        };
        inner.record(false);
    }
}
/// Markers collected by `Markers::capture`.
///
/// Maps a thread id to that thread's optional name and its blocks of markers.
pub struct Markers(HashMap<usize, (Option<String>, Vec<Vec<Marker>>)>);
impl Markers {
pub async fn capture<F, E>(memory_limit: Option<usize>, f: F) -> Result<Self, E>
where
F: Future<Output = Result<(), E>>,
{
static EXCLUSIVE: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
let _guard = EXCLUSIVE.lock().await;
if let Some(memory_limit) = memory_limit {
tracing::info!(
"marker capture limited to {}",
HumanBytes::from(memory_limit)
);
}
let initial_blocks =
memory_limit.map_or(i64::MAX, |memory_limit| (memory_limit / BLOCK_BYTES) as i64);
FREE_BLOCKS.store(initial_blocks, Ordering::Relaxed);
ENABLE_MARKERS.store(true, Ordering::Release);
let result = f.await;
ENABLE_MARKERS.store(false, Ordering::Release);
let remaining_blocks = FREE_BLOCKS.load(Ordering::Relaxed);
if remaining_blocks <= 0 {
tracing::info!("marker capture exceeded the limit");
} else {
let used_bytes = (initial_blocks - remaining_blocks) as usize * BLOCK_BYTES;
tracing::info!("marker capture used {}", HumanBytes::from(used_bytes));
}
result?;
let all_threads = ALL_THREAD_MARKERS.lock().unwrap();
let mut markers = HashMap::new();
for thread in &*all_threads {
markers.insert(thread.tid, (thread.name.clone(), thread.queue.take()));
}
Ok(Self(markers))
}
pub fn annotate_profile(
&self,
profile: &[u8],
product: Option<&str>,
os_cpu: Option<&str>,
) -> anyhow::Result<Vec<u8>> {
let mut json = Vec::new();
GzDecoder::new(profile).read_to_end(&mut json)?;
let mut profile = serde_json_path_to_error::from_slice::<Profile>(&json)?;
if let Some(product) = product {
profile.meta.product = product.into();
}
if let Some(os_cpu) = os_cpu {
profile.meta.os_cpu = os_cpu.into();
}
profile.meta.marker_schema.push(json!({
"name": "FelderaMarker",
"display": [
"marker-chart",
"marker-table"
],
"chartLabel": "{marker.data.name}",
"tooltipLabel": "{marker.data.name}",
"tableLabel": "{marker.data.name}",
"description": "Marker generated by Feldera.",
"fields": [
{
"key": "name",
"label": "Name",
"format": "unique-string"
}
]
}));
static CATEGORY_COLORS: [&str; 12] = [
"purple",
"green",
"orange",
"yellow",
"lightblue",
"blue",
"brown",
"magenta",
"red",
"lightred",
"darkgrey",
"grey",
];
let mut categories = profile
.meta
.categories
.iter()
.enumerate()
.map(|(index, category)| (category.name.clone(), index))
.collect::<HashMap<_, _>>();
for (category, color) in self
.0
.values()
.flat_map(|(_, markers)| markers.iter().flatten())
.map(|marker| marker.category)
.collect::<HashSet<_>>()
.into_iter()
.zip(CATEGORY_COLORS.iter().cycle())
{
categories.insert(category.into(), profile.meta.categories.len());
profile.meta.categories.push(Category {
color: (*color).into(),
name: category.into(),
other: [(String::from("subcategories"), json!(["Other"]))]
.into_iter()
.collect(),
});
}
for thread in &mut profile.threads {
if let Some(tid) = &thread.tid
&& let Ok(tid) = tid.parse::<usize>()
&& let Some((name, markers)) = self.0.get(&tid)
{
if let Some(name) = name {
thread.name = Some(name.clone());
}
for marker in markers.iter().flatten() {
thread.markers.length += 1;
thread.markers.category.push(categories[marker.category]);
thread.markers.data.push(ProfileMarkerData {
type_: Cow::from("FelderaMarker"),
name: profile.shared.add_name(&marker.tooltip),
});
thread.markers.start_time.push(marker.start);
thread.markers.end_time.push(marker.end);
thread
.markers
.name
.push(profile.shared.add_name(marker.name));
thread.markers.phase.push(1);
}
}
}
let mut output = Vec::new();
GzEncoder::new(
Cursor::new(serde_json::to_vec(&profile).unwrap()),
Compression::fast(),
)
.read_to_end(&mut output)
.unwrap();
return Ok(output);
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Profile {
meta: Meta,
threads: Vec<Thread>,
shared: Shared,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Meta {
product: String,
#[serde(rename = "oscpu")]
os_cpu: String,
categories: Vec<Category>,
marker_schema: Vec<Value>,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Category {
color: String,
name: String,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Thread {
name: Option<String>,
#[serde(default)]
markers: ProfileMarkers,
tid: Option<String>,
#[serde(flatten)]
other: HashMap<String, Value>,
}
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ProfileMarkers {
length: usize,
category: Vec<usize>,
data: Vec<ProfileMarkerData>,
start_time: Vec<Timestamp>,
end_time: Vec<Timestamp>,
name: Vec<usize>,
phase: Vec<usize>,
}
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ProfileMarkerData {
#[serde(rename = "type")]
type_: Cow<'static, str>,
name: usize,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Shared {
string_array: Vec<String>,
#[serde(flatten)]
other: HashMap<String, Value>,
}
impl Shared {
fn add_name(&mut self, name: &str) -> usize {
let index = self.string_array.len();
self.string_array.push(name.into());
index
}
}
}
}
/// Global switch for marker collection; starts out disabled.
///
/// `Markers::capture` sets it to `true` while it awaits the captured future
/// and back to `false` afterwards.
static ENABLE_MARKERS: AtomicBool = AtomicBool::new(false);
/// Returns whether marker collection is currently enabled.
fn markers_enabled() -> bool {
// Acquire load of the global switch.
ENABLE_MARKERS.load(Ordering::Acquire)
}
/// A recorded span or event.
struct Marker {
/// Start time.
start: Timestamp,
/// End time; equal to `start` for events recorded via `SpanInner::record(false)`.
end: Timestamp,
/// Category label.
category: &'static str,
/// Name of the marker.
name: &'static str,
/// Tooltip text (possibly empty).
tooltip: String,
}
/// Registration of one thread's marker queue in `ALL_THREAD_MARKERS`.
struct ThreadMarkers {
/// Id of the thread that created the registration.
tid: usize,
/// Name of that thread at registration time, if it had one.
name: Option<String>,
/// The thread's marker queue, shared with its thread-local handle.
queue: Arc<Queue>,
}
impl ThreadMarkers {
    /// Builds a registration for the calling thread around `queue`.
    ///
    /// The thread id comes from `gettid` on Linux and from the `thread_id`
    /// crate elsewhere; the name is whatever the current thread is called, if
    /// anything.
    fn new(queue: Arc<Queue>) -> Self {
        #[cfg(target_os = "linux")]
        let tid = nix::unistd::gettid().as_raw() as usize;
        #[cfg(not(target_os = "linux"))]
        let tid = thread_id::get();

        let name = std::thread::current().name().map(String::from);
        Self { tid, name, queue }
    }
}
/// Every marker queue registered so far, one entry per `Queue::new` call.
static ALL_THREAD_MARKERS: std::sync::Mutex<Vec<ThreadMarkers>> = std::sync::Mutex::new(Vec::new());
/// Number of marker blocks that may still be allocated; reset by
/// `Markers::capture` and decremented by `Queue::push` for each new block.
static FREE_BLOCKS: AtomicI64 = AtomicI64::new(0);
/// Number of markers a block is allocated to hold.
const BLOCK_CAPACITY: usize = 32;
/// Size in bytes of the marker storage of one block.
const BLOCK_BYTES: usize = BLOCK_CAPACITY * MARKER_BYTES;
/// Size in bytes of one `Marker` value (excluding any heap data it owns).
pub const MARKER_BYTES: usize = std::mem::size_of::<Marker>();
/// A mutex-protected list of marker blocks, each block being a `Vec<Marker>`.
struct Queue(Mutex<Vec<Vec<Marker>>>);
impl Queue {
    /// Creates an empty queue and registers it, together with the calling
    /// thread's id and name, in `ALL_THREAD_MARKERS`.
    fn new() -> Arc<Self> {
        let shared = Arc::new(Self(Mutex::new(Vec::with_capacity(32))));
        let mut registry = ALL_THREAD_MARKERS.lock().unwrap();
        registry.push(ThreadMarkers::new(Arc::clone(&shared)));
        drop(registry);
        shared
    }

    /// Appends `marker` to the queue.
    ///
    /// The marker goes into the last block while that block has spare
    /// capacity. Otherwise a new block is claimed from `FREE_BLOCKS`; if none
    /// is left the marker is dropped, with a warning the first time that
    /// happens.
    fn push(&self, marker: Marker) {
        let mut blocks = self.0.lock().unwrap();

        if let Some(current) = blocks.last_mut() {
            if current.len() < current.capacity() {
                current.push(marker);
                return;
            }
        }

        // `fetch_sub` returns the count *before* the decrement.
        let free_before = FREE_BLOCKS.fetch_sub(1, Ordering::Relaxed);
        if free_before >= 1 {
            let mut fresh = Vec::with_capacity(BLOCK_CAPACITY);
            fresh.push(marker);
            blocks.push(fresh);
        } else if free_before == 0 {
            // Only the caller that finds the budget at exactly zero warns;
            // later callers see a negative count and stay silent.
            warn!("marker capture space exhausted");
        }
    }

    /// Removes and returns all blocks, leaving the queue empty.
    fn take(&self) -> Vec<Vec<Marker>> {
        let mut blocks = self.0.lock().unwrap();
        std::mem::take(&mut *blocks)
    }
}
thread_local! {
// Per-thread marker queue. It is created by `Queue::new` on the thread's first
// access, which also registers it in `ALL_THREAD_MARKERS`.
static QUEUE: Arc<Queue> = Queue::new();
}