use std::borrow::Cow;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use fastant::Anchor;
use fastant::Instant;
use parking_lot::Mutex;
use crate::collector::CollectToken;
use crate::collector::Config;
use crate::collector::EventRecord;
use crate::collector::SpanContext;
use crate::collector::SpanId;
use crate::collector::SpanRecord;
use crate::collector::SpanSet;
use crate::collector::TraceId;
use crate::collector::command::CancelCollect;
use crate::collector::command::CollectCommand;
use crate::collector::command::DropCollect;
use crate::collector::command::StartCollect;
use crate::collector::command::SubmitSpans;
use crate::local::local_collector::LocalSpansInner;
use crate::local::raw_span::RawKind;
use crate::local::raw_span::RawSpan;
use crate::util::command_bus::CommandBus;
use crate::util::command_bus::CommandSender;
// Monotonic counter that `GlobalCollect::start_collect` bumps to hand out collect ids.
static NEXT_COLLECT_ID: AtomicUsize = AtomicUsize::new(0);
// The process-wide collector. `None` until `GlobalCollector::start` stores an instance.
static GLOBAL_COLLECTOR: Mutex<Option<GlobalCollector>> = Mutex::new(None);
// Set to `true` by `GlobalCollector::start`; while it is `false`, `send_command` drops
// every command instead of putting it on the bus.
static REPORTER_READY: AtomicBool = AtomicBool::new(false);
// Lazily created bus: each thread obtains its sender from it, and
// `GlobalCollector::handle_commands` drains it.
static COMMAND_BUS: LazyLock<CommandBus<CollectCommand>> = LazyLock::new(CommandBus::new);
// Reserved collect id (`usize::MAX`).
// NOTE(review): presumably carried by tokens that are not sampled; its users are outside
// this part of the file — confirm.
pub const NOT_SAMPLED_COLLECT_ID: usize = usize::MAX;
thread_local! {
// Per-thread sender into `COMMAND_BUS`, created lazily on the thread's first access.
// It is kept in an `UnsafeCell` because `send_command` calls `send` through the raw
// pointer returned by `UnsafeCell::get`.
static COMMAND_SENDER: UnsafeCell<CommandSender<CollectCommand>> = {
// Passed to `CommandBus::sender`; presumably the capacity of this thread's channel —
// TODO confirm against `CommandBus`.
const CHANNEL_SIZE: usize = 10240;
let tx = COMMAND_BUS.sender(CHANNEL_SIZE);
UnsafeCell::new(tx)
};
}
/// Hands `cmd` to the calling thread's command sender.
///
/// The command is silently dropped when no reporter has been installed yet, or when the
/// thread-local sender cannot be accessed (`try_with` returned an error).
fn send_command(cmd: CollectCommand) {
    if reporter_ready() {
        // Best effort: an inaccessible thread-local sender is ignored on purpose.
        let _ = COMMAND_SENDER.try_with(|cell| {
            // SAFETY: the cell lives in thread-local storage, so only the current thread
            // can reach it, and the pointer is dereferenced only for this one call.
            // NOTE(review): this also relies on `CommandSender::send` never re-entering
            // `send_command` on the same thread — confirm.
            unsafe { (*cell.get()).send(cmd) }
        });
    }
}
/// Returns whether `GlobalCollector::start` has already run, i.e. whether a reporter has
/// been installed. The flag is read with relaxed ordering.
fn reporter_ready() -> bool {
REPORTER_READY.load(Ordering::Relaxed)
}
/// Installs `reporter` and `config` into the global collector.
///
/// The first call creates the collector; later calls replace the reporter and the
/// configuration of the existing one. When the `enable` feature is turned off this
/// function does nothing.
pub fn set_reporter(reporter: impl Reporter, config: Config) {
    #[cfg(feature = "enable")]
    GlobalCollector::start(reporter, config);
}
/// Processes all pending commands right away and hands the resulting records to the
/// reporter, returning once that has finished.
///
/// Nothing happens when the global collector has not been created yet, or when the
/// `enable` feature is turned off.
///
/// # Panics
///
/// On non-wasm targets this panics if the helper thread cannot be spawned or if it
/// panics while handling commands.
pub fn flush() {
    #[cfg(feature = "enable")]
    {
        // Runs one round of command handling if a collector exists.
        let drain_and_report = || {
            let mut guard = GLOBAL_COLLECTOR.lock();
            if let Some(collector) = guard.as_mut() {
                collector.handle_commands();
            }
        };

        // wasm: run inline on the calling thread.
        #[cfg(target_family = "wasm")]
        drain_and_report();

        // Elsewhere: run on a dedicated, named thread and wait for it.
        // NOTE(review): presumably this keeps the reporter off the caller's thread
        // (e.g. away from an async runtime) — confirm the motivation.
        #[cfg(not(target_family = "wasm"))]
        {
            let worker = std::thread::Builder::new()
                .name("fastrace-flush".to_string())
                .spawn(drain_and_report)
                .unwrap();
            worker.join().unwrap();
        }
    }
}
/// Sink for finished span records.
///
/// The global collector stores the reporter as a boxed trait object and calls
/// [`Reporter::report`] at the end of every round of command handling.
pub trait Reporter: Send + 'static {
/// Receives the records committed in one round of command handling. The collector calls
/// this unconditionally, so `spans` may be empty.
fn report(&mut self, spans: Vec<SpanRecord>);
}
/// Stateless handle used to talk to the global collector; every method just posts a
/// command through `send_command`.
#[derive(Default, Clone)]
pub(crate) struct GlobalCollect;

#[cfg_attr(test, mockall::automock)]
impl GlobalCollect {
    /// Allocates a fresh collect id, announces it with a `StartCollect` command and
    /// returns it.
    pub fn start_collect(&self) -> usize {
        let collect_id = NEXT_COLLECT_ID.fetch_add(1, Ordering::Relaxed);
        let start = StartCollect { collect_id };
        send_command(CollectCommand::StartCollect(start));
        collect_id
    }

    /// Posts a `CancelCollect` command for `collect_id`.
    pub fn cancel_collect(&self, collect_id: usize) {
        let cancel = CancelCollect { collect_id };
        send_command(CollectCommand::CancelCollect(cancel));
    }

    /// Posts a `DropCollect` command for `collect_id`.
    pub fn drop_collect(&self, collect_id: usize) {
        let drop_cmd = DropCollect { collect_id };
        send_command(CollectCommand::DropCollect(drop_cmd));
    }

    /// Posts `spans` together with `collect_token`; tokens that are not sampled are
    /// ignored.
    pub fn submit_spans(&self, spans: SpanSet, collect_token: CollectToken) {
        if !collect_token.is_sampled {
            return;
        }
        let submit = SubmitSpans {
            spans,
            collect_token,
        };
        send_command(CollectCommand::SubmitSpans(submit));
    }
}
// One submitted span set together with the trace id and parent span id taken from the
// collect token it was submitted with.
struct SpanCollection {
// The submitted spans, still in raw form.
spans: SpanSet,
// Trace id stamped on every record produced from `spans`.
trace_id: TraceId,
// Parent id for `spans`; local spans that carry their own parent id keep it instead.
parent_id: SpanId,
}
// State kept for one collect id between its `StartCollect` and `DropCollect` commands.
#[derive(Default)]
struct ActiveCollector {
// Span sets submitted for this collect id so far.
span_collections: Vec<SpanCollection>,
// Dangling items keyed by the id of the span they belong to; handed to
// `postprocess_span_collection` when the collect is dropped.
danglings: HashMap<SpanId, Vec<DanglingItem>>,
// Set by `CancelCollect`: later submissions are ignored and nothing is reported when the
// collect is dropped.
canceled: bool,
}
// The process-wide collector: it drains the command bus, tracks active collects and turns
// finished ones into `SpanRecord`s for the reporter.
pub(crate) struct GlobalCollector {
// Current configuration; the background loop reads `report_interval` from it.
config: Config,
// Destination for committed records; while it is `None`, drained commands are discarded.
reporter: Option<Box<dyn Reporter>>,
// Collects that were started and not yet dropped, keyed by collect id.
active_collectors: HashMap<usize, ActiveCollector>,
// Scratch buffers, one per command kind: filled while draining the bus and expected to be
// empty again when `handle_commands` is entered the next time.
start_collects: Vec<StartCollect>,
cancel_collects: Vec<CancelCollect>,
drop_collects: Vec<DropCollect>,
submit_spans: Vec<SubmitSpans>,
// Submissions whose collect id had no active collector; also emptied every round.
stale_spans: Vec<SpanCollection>,
}
impl GlobalCollector {
/// Installs `reporter` and `config`, creating the global collector on the first call.
///
/// On the first call, on non-wasm targets, this also spawns the
/// `fastrace-global-collector` thread, which handles commands in an endless loop.
///
/// # Panics
///
/// Panics if that thread cannot be spawned.
fn start(reporter: impl Reporter, config: Config) {
// From here on `send_command` forwards commands instead of dropping them.
REPORTER_READY.store(true, Ordering::Relaxed);
let mut global_collector = GLOBAL_COLLECTOR.lock();
if let Some(collector) = global_collector.as_mut() {
// A collector already exists: only swap the reporter and the configuration.
collector.reporter = Some(Box::new(reporter));
collector.config = config;
} else {
*global_collector = Some(GlobalCollector {
config,
reporter: Some(Box::new(reporter)),
active_collectors: HashMap::new(),
start_collects: vec![],
cancel_collects: vec![],
drop_collects: vec![],
submit_spans: vec![],
stale_spans: vec![],
});
// wasm gets no background thread; there, commands are handled only by `flush`.
#[cfg(not(target_family = "wasm"))]
{
std::thread::Builder::new()
.name("fastrace-global-collector".to_string())
.spawn(move || {
loop {
// The lock is held only inside this block, so it is released before waiting.
let report_interval = {
let mut collector = GLOBAL_COLLECTOR.lock();
// The collector was stored above before this thread was spawned; nothing in the
// visible code resets it to `None`.
let collector = collector.as_mut().unwrap();
collector.handle_commands();
collector.config.report_interval
};
// NOTE(review): presumably blocks until the interval elapses or the bus signals
// new commands — confirm against `CommandBus::wait_timeout`.
COMMAND_BUS.wait_timeout(report_interval);
}
})
.unwrap();
}
}
}
/// Drains the command bus, applies the commands and reports the finished spans.
///
/// Within one round the commands are applied by kind, in this order: starts, cancels,
/// submissions, drops. The reporter is called at the end of every round in which a
/// reporter is set, even if no record was produced.
fn handle_commands(&mut self) {
// Every scratch buffer must have been emptied by the previous round.
debug_assert!(self.start_collects.is_empty());
debug_assert!(self.cancel_collects.is_empty());
debug_assert!(self.drop_collects.is_empty());
debug_assert!(self.submit_spans.is_empty());
debug_assert!(self.stale_spans.is_empty());
// Sort the pending commands into one buffer per kind.
COMMAND_BUS.drain(|cmd| match cmd {
CollectCommand::StartCollect(cmd) => self.start_collects.push(cmd),
CollectCommand::CancelCollect(cmd) => self.cancel_collects.push(cmd),
CollectCommand::DropCollect(cmd) => self.drop_collects.push(cmd),
CollectCommand::SubmitSpans(cmd) => self.submit_spans.push(cmd),
});
// Without a reporter the drained commands are simply thrown away.
if self.reporter.is_none() {
self.start_collects.clear();
self.cancel_collects.clear();
self.drop_collects.clear();
self.submit_spans.clear();
return;
}
// 1. Register every newly started collect.
for StartCollect { collect_id } in self.start_collects.drain(..) {
self.active_collectors
.insert(collect_id, ActiveCollector::default());
}
// 2. A canceled collect loses what it gathered so far but keeps its entry (flagged as
// canceled) until its `DropCollect` arrives.
for CancelCollect { collect_id } in self.cancel_collects.drain(..) {
if let Some(active_collector) = self.active_collectors.get_mut(&collect_id) {
active_collector.span_collections.clear();
active_collector.danglings.clear();
active_collector.canceled = true;
}
}
// 3. Route each submission: active collect -> stored, canceled collect -> discarded,
// unknown collect id -> kept as stale spans.
for SubmitSpans {
spans,
collect_token,
} in self.submit_spans.drain(..)
{
if let Some(active_collector) =
self.active_collectors.get_mut(&collect_token.collect_id)
{
if !active_collector.canceled {
active_collector.span_collections.push(SpanCollection {
spans,
trace_id: collect_token.trace_id,
parent_id: collect_token.parent_id,
});
}
} else {
self.stale_spans.push(SpanCollection {
spans,
trace_id: collect_token.trace_id,
parent_id: collect_token.parent_id,
});
}
}
// One anchor per round; it is used to convert instants into unix nanoseconds.
let anchor = Anchor::new();
let mut committed_records = Vec::new();
// 4. A dropped collect is removed and, unless it was canceled, converted into records.
for DropCollect { collect_id } in self.drop_collects.drain(..) {
if let Some(mut active_collector) = self.active_collectors.remove(&collect_id) {
if !active_collector.canceled {
postprocess_span_collection(
&active_collector.span_collections,
&anchor,
&mut committed_records,
&mut active_collector.danglings,
);
}
}
}
// Sorting makes stale spans of the same trace adjacent, so `chunk_by` yields one group
// per trace; each group is processed with its own, initially empty dangling map.
self.stale_spans.sort_by_key(|spans| spans.trace_id);
for spans in self.stale_spans.chunk_by(|a, b| a.trace_id == b.trace_id) {
postprocess_span_collection(
spans,
&anchor,
&mut committed_records,
&mut HashMap::new(),
);
}
self.stale_spans.clear();
// `reporter` was checked to be `Some` above.
self.reporter.as_mut().unwrap().report(committed_records);
}
}
impl LocalSpansInner {
    /// Converts these local spans into span records that belong to `parent`'s trace.
    ///
    /// Spans without a parent of their own are attached to `parent.span_id`. Events,
    /// properties and links are merged into the record of the span they refer to, when
    /// that span is part of this set.
    pub fn to_span_records(&self, parent: SpanContext) -> Vec<SpanRecord> {
        let mut records = Vec::new();
        let mut pending = HashMap::new();
        let anchor = Anchor::new();

        amend_local_span(
            self,
            parent.trace_id,
            parent.span_id,
            &mut records,
            &mut pending,
            &anchor,
        );
        mount_danglings(&mut records, &mut pending);

        records
    }
}
// Data that is recorded separately from the span it belongs to and merged into that span's
// record by `mount_danglings`.
enum DanglingItem {
// An event; it is appended to the record's `events`.
Event(EventRecord),
// Extra key/value pairs; they are appended to the record's `properties`.
Properties(Vec<(Cow<'static, str>, Cow<'static, str>)>),
// Extra links; they are appended to the record's `links`.
Links(Vec<SpanContext>),
}
/// Converts every span set in `span_collections` into records appended to
/// `committed_records`, then merges the dangling items into the records added by this call.
///
/// Records that were already in `committed_records` are left untouched. Dangling items
/// that match none of the new records stay in `danglings`.
fn postprocess_span_collection<'a>(
    span_collections: impl IntoIterator<Item = &'a SpanCollection>,
    anchor: &Anchor,
    committed_records: &mut Vec<SpanRecord>,
    danglings: &mut HashMap<SpanId, Vec<DanglingItem>>,
) {
    // Index of the first record produced by this call.
    let first_new = committed_records.len();

    for collection in span_collections {
        let trace_id = collection.trace_id;
        let parent_id = collection.parent_id;
        match &collection.spans {
            SpanSet::Span(raw) => {
                amend_span(raw, trace_id, parent_id, committed_records, danglings, anchor)
            }
            SpanSet::LocalSpansInner(inner) => {
                amend_local_span(inner, trace_id, parent_id, committed_records, danglings, anchor)
            }
            SpanSet::SharedLocalSpans(shared) => {
                amend_local_span(shared, trace_id, parent_id, committed_records, danglings, anchor)
            }
        }
    }

    let fresh_records = &mut committed_records[first_new..];
    mount_danglings(fresh_records, danglings);
}
/// Converts the raw items of `local_spans` into span records and dangling items.
///
/// Every `RawKind::Span` item becomes a `SpanRecord` pushed onto `spans`. Events,
/// properties and links are queued in `dangling` under the id of the span they belong to.
/// An item without a parent id of its own uses `parent_id`.
fn amend_local_span(
    local_spans: &LocalSpansInner,
    trace_id: TraceId,
    parent_id: SpanId,
    spans: &mut Vec<SpanRecord>,
    dangling: &mut HashMap<SpanId, Vec<DanglingItem>>,
    anchor: &Anchor,
) {
    for raw in local_spans.spans.iter() {
        let owner_id = raw.parent_id.unwrap_or(parent_id);

        let item = match raw.raw_kind {
            RawKind::Span => {
                let begin_time_unix_ns = raw.begin_instant.as_unix_nanos(anchor);
                // A zero end instant means the span has no end of its own; the end time of
                // the whole local span set is used instead.
                let end_instant = if raw.end_instant == Instant::ZERO {
                    &local_spans.end_time
                } else {
                    &raw.end_instant
                };
                let end_time_unix_ns = end_instant.as_unix_nanos(anchor);
                spans.push(SpanRecord {
                    trace_id,
                    span_id: raw.id,
                    parent_id: owner_id,
                    begin_time_unix_ns,
                    duration_ns: end_time_unix_ns.saturating_sub(begin_time_unix_ns),
                    name: raw.name.clone(),
                    properties: raw.properties.clone(),
                    events: vec![],
                    links: raw.links.clone(),
                });
                continue;
            }
            RawKind::Event => {
                let timestamp_unix_ns = raw.begin_instant.as_unix_nanos(anchor);
                DanglingItem::Event(EventRecord {
                    name: raw.name.clone(),
                    timestamp_unix_ns,
                    properties: raw.properties.clone(),
                })
            }
            RawKind::Properties => DanglingItem::Properties(raw.properties.clone()),
            RawKind::Link => DanglingItem::Links(raw.links.clone()),
        };

        // Everything except a real span is attached to its owning span later on.
        dangling.entry(owner_id).or_default().push(item);
    }
}
/// Converts a single raw item into either a span record or a dangling item.
///
/// A `RawKind::Span` item becomes a `SpanRecord` pushed onto `spans`, with `parent_id` as
/// its parent. An event, a set of properties or a set of links is queued in `dangling`
/// under `parent_id` instead.
fn amend_span(
    span: &RawSpan,
    trace_id: TraceId,
    parent_id: SpanId,
    spans: &mut Vec<SpanRecord>,
    dangling: &mut HashMap<SpanId, Vec<DanglingItem>>,
    anchor: &Anchor,
) {
    let item = match span.raw_kind {
        RawKind::Span => {
            let begin_time_unix_ns = span.begin_instant.as_unix_nanos(anchor);
            let end_time_unix_ns = span.end_instant.as_unix_nanos(anchor);
            spans.push(SpanRecord {
                trace_id,
                span_id: span.id,
                parent_id,
                begin_time_unix_ns,
                duration_ns: end_time_unix_ns.saturating_sub(begin_time_unix_ns),
                name: span.name.clone(),
                properties: span.properties.clone(),
                events: vec![],
                links: span.links.clone(),
            });
            return;
        }
        RawKind::Event => {
            let timestamp_unix_ns = span.begin_instant.as_unix_nanos(anchor);
            DanglingItem::Event(EventRecord {
                name: span.name.clone(),
                timestamp_unix_ns,
                properties: span.properties.clone(),
            })
        }
        RawKind::Properties => DanglingItem::Properties(span.properties.clone()),
        RawKind::Link => DanglingItem::Links(span.links.clone()),
    };

    // Everything except a real span is attached to its parent's record later on.
    dangling.entry(parent_id).or_default().push(item);
}
/// Moves dangling items into the records they belong to.
///
/// For each record, the items queued under the record's span id are removed from
/// `danglings` and appended to its events, properties or links. Items whose span id
/// matches no record remain in `danglings`.
fn mount_danglings(records: &mut [SpanRecord], danglings: &mut HashMap<SpanId, Vec<DanglingItem>>) {
    for record in records {
        // Nothing left to attach: no need to look at the remaining records.
        if danglings.is_empty() {
            break;
        }

        let Some(items) = danglings.remove(&record.span_id) else {
            continue;
        };

        for item in items {
            match item {
                DanglingItem::Event(event) => record.events.push(event),
                DanglingItem::Properties(properties) => record.properties.extend(properties),
                DanglingItem::Links(links) => record.links.extend(links),
            }
        }
    }
}