use std::cell::RefCell;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;
use std::sync::atomic::{self, AtomicU32};
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use log::warn;
/// Initial capacity, in bytes, of the buffers used to format trace events
/// before they are written to the log file.
const BUF_SIZE: usize = 128;
/// Stream id written for events appended through [`CoordinatorLog`] and
/// `&SharedLog`.
const COORDINATOR_STREAM_ID: u32 = 0;
/// Substream id used by [`finish_rt`].
const RT_SUBSTREAM_ID: u64 = 0;
/// Trait for types that can record trace events.
///
/// An event is timed by calling [`Trace::start_trace`] and recorded by
/// handing the returned timing to [`Trace::finish_trace`].
pub trait Trace {
/// Start timing an event.
///
/// The returned timing is meant to be passed to [`Trace::finish_trace`];
/// the `must_use` attribute warns when it is dropped instead.
// NOTE(review): presumably `None` is returned when tracing is disabled;
// that is up to the implementation and not visible here.
#[must_use = "tracing events must be finished, otherwise they aren't recorded"]
fn start_trace(&self) -> Option<EventTiming>;
/// Finish the event started with `timing`.
///
/// `description` is a human readable description of the event and
/// `attributes` is a list of name-value pairs attached to it.
fn finish_trace(
&mut self,
timing: Option<EventTiming>,
description: &str,
attributes: &[(&str, &dyn AttributeValue)],
);
}
/// Trace log created by [`CoordinatorLog::open`].
///
/// Events appended through this type are written with
/// [`COORDINATOR_STREAM_ID`] as stream id.
#[derive(Debug)]
pub(crate) struct CoordinatorLog {
/// Data shared with the [`Log`]s created by [`CoordinatorLog::new_stream`].
shared: Arc<SharedLog>,
/// Buffer in which a single event is formatted before it is written.
buf: Vec<u8>,
}
/// Metrics for [`CoordinatorLog`], see [`CoordinatorLog::metrics`].
#[derive(Debug)]
pub(crate) struct CoordinatorMetrics<'l> {
/// The file the trace events are written to.
pub(crate) file: &'l File,
/// Value of the shared event counter at the time the metrics were gathered.
pub(crate) counter: u32,
}
impl CoordinatorLog {
    /// Create a new trace log at `path`.
    ///
    /// The file is opened in append mode and must not exist yet
    /// (`create_new`). Before returning, a metadata packet holding the
    /// current wall-clock time is written to it.
    ///
    /// # Errors
    ///
    /// Returns an error if the file can't be created or if the metadata
    /// packet can't be written in a single write.
    pub(crate) fn open(path: &Path) -> io::Result<CoordinatorLog> {
        // Take both clock readings back to back: `timestamp` is written to
        // the file, `epoch` is what event times are measured against.
        let timestamp = SystemTime::now();
        let epoch = Instant::now();

        let mut options = OpenOptions::new();
        options.append(true).create_new(true);
        let file = options.open(path)?;

        let mut metadata = Vec::with_capacity(BUF_SIZE);
        write_epoch_metadata(&mut metadata, timestamp);
        write_once(&file, &metadata)?;

        let shared = Arc::new(SharedLog {
            file,
            counter: AtomicU32::new(0),
            epoch,
        });
        let buf = Vec::with_capacity(BUF_SIZE);
        Ok(CoordinatorLog { shared, buf })
    }

    /// Gather metrics for the coordinator log: the log file and a snapshot
    /// of the shared event counter.
    pub(crate) fn metrics<'l>(&'l self) -> CoordinatorMetrics<'l> {
        let shared: &'l SharedLog = &self.shared;
        let counter = shared.counter.load(atomic::Ordering::Relaxed);
        CoordinatorMetrics {
            file: &shared.file,
            counter,
        }
    }

    /// Create a new [`Log`] that writes to the same file using `stream_id`
    /// as its stream id. Its event counter starts at zero.
    pub(crate) fn new_stream(&self, stream_id: u32) -> Log {
        let shared = Arc::clone(&self.shared);
        let buf = Vec::with_capacity(BUF_SIZE);
        Log {
            shared,
            stream_id,
            stream_counter: 0,
            buf,
        }
    }

    /// Returns another handle to the data shared between the logs.
    pub(crate) fn clone_shared(&self) -> Arc<SharedLog> {
        Arc::clone(&self.shared)
    }

    /// Returns the count for the next event and advances the shared counter.
    fn next_stream_count(&mut self) -> u32 {
        let counter = &self.shared.counter;
        counter.fetch_add(1, atomic::Ordering::AcqRel)
    }
}
/// Data shared between [`CoordinatorLog`] and the [`Log`]s created from it.
#[derive(Debug)]
pub(crate) struct SharedLog {
/// File the trace events are written to.
file: File,
/// Event counter used for events written with [`COORDINATOR_STREAM_ID`].
counter: AtomicU32,
/// Point in time relative to which the start and end of events are
/// written, see `nanos_since_epoch`.
epoch: Instant,
}
/// Trace log for a single stream, created by [`CoordinatorLog::new_stream`].
#[derive(Debug)]
pub(crate) struct Log {
/// Data shared with [`CoordinatorLog`] and other [`Log`]s.
shared: Arc<SharedLog>,
/// Id of the stream, written with every event.
stream_id: u32,
/// Count for the next event written to this stream.
stream_counter: u32,
/// Buffer in which a single event is formatted before it is written.
buf: Vec<u8>,
}
/// Metrics for [`Log`], see `Log::metrics`.
#[derive(Debug)]
pub(crate) struct Metrics {
/// Event counter snapshot taken when the metrics were gathered.
pub(crate) counter: u32,
}
impl Log {
    /// Returns the count to use for the next event written to this stream
    /// and advances the stream's counter, wrapping around on overflow.
    fn next_stream_count(&mut self) -> u32 {
        let count = self.stream_counter;
        self.stream_counter = self.stream_counter.wrapping_add(1);
        count
    }

    /// Gather metrics for this trace log.
    ///
    /// `counter` is this stream's own event counter, i.e. the number of
    /// events appended through this `Log` (modulo wrap-around).
    pub(crate) fn metrics(&self) -> Metrics {
        // Events appended through a `Log` only advance `stream_counter`; the
        // shared atomic counter belongs to the coordinator stream, so
        // reporting it here would describe a different stream.
        Metrics {
            counter: self.stream_counter,
        }
    }
}
/// Append the epoch metadata packet for `time` to `buf`.
///
/// The packet consists of, all in big-endian: the metadata magic (4 bytes),
/// the packet size (4 bytes), the option name length (2 bytes), the option
/// name `epoch` (5 bytes) and the number of nanoseconds between the Unix
/// epoch and `time` (8 bytes); 23 bytes in total.
///
/// # Panics
///
/// Panics if `time` is before the Unix epoch.
fn write_epoch_metadata(buf: &mut Vec<u8>, time: SystemTime) {
    #[allow(clippy::unreadable_literal)]
    const MAGIC: u32 = 0x75D11D4D;
    const OPTION: &[u8] = b"epoch";
    #[allow(clippy::cast_possible_truncation)]
    const OPTION_LENGTH: u16 = OPTION.len() as u16;
    const PACKET_SIZE: u32 = 23;

    let since_unix = time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    #[allow(clippy::cast_possible_truncation)]
    let nanos_since_unix = since_unix.as_nanos() as u64;

    let magic = MAGIC.to_be_bytes();
    let packet_size = PACKET_SIZE.to_be_bytes();
    let option_length = OPTION_LENGTH.to_be_bytes();
    let nanos = nanos_since_unix.to_be_bytes();

    // The fields in the order in which they appear in the packet.
    let fields: [&[u8]; 5] = [&magic, &packet_size, &option_length, OPTION, &nanos];
    for field in &fields {
        buf.extend_from_slice(field);
    }
}
/// Write all of `buf` to `output` using a single call to `write`.
///
/// # Errors
///
/// Returns the error from `write`, or an error of kind
/// [`io::ErrorKind::WriteZero`] if fewer than `buf.len()` bytes were
/// written. The write is never retried.
fn write_once<W>(mut output: W, buf: &[u8]) -> io::Result<()>
where
    W: Write,
{
    let written = output.write(buf)?;
    if written != buf.len() {
        return Err(io::Error::new(
            io::ErrorKind::WriteZero,
            "failed to write entire trace event",
        ));
    }
    Ok(())
}
impl Clone for Log {
fn clone(&self) -> Log {
Log {
shared: self.shared.clone(),
stream_id: self.stream_id,
stream_counter: 0,
buf: Vec::with_capacity(BUF_SIZE),
}
}
}
/// Start timing an event if a trace `log` is present.
///
/// Returns `None`, without reading the clock, when `log` is `None`.
pub(crate) fn start<L>(log: &Option<L>) -> Option<EventTiming>
where
    L: TraceLog,
{
    log.as_ref().map(|_| EventTiming::start())
}
/// Trait for the trace logs to which an [`Event`] can be appended.
pub(crate) trait TraceLog {
/// Append `event` to the log, tagged with `substream_id`.
///
/// Returns an error if writing the event fails.
fn append(&mut self, substream_id: u64, event: &Event<'_>) -> io::Result<()>;
}
/// Mutable references to trace logs forward to the referenced log.
impl<L> TraceLog for &'_ mut L
where
    L: TraceLog,
{
    fn append(&mut self, substream_id: u64, event: &Event<'_>) -> io::Result<()> {
        (**self).append(substream_id, event)
    }
}
impl TraceLog for CoordinatorLog {
fn append(&mut self, substream_id: u64, event: &Event<'_>) -> io::Result<()> {
let stream_count = self.next_stream_count();
format_event(
&mut self.buf,
self.shared.epoch,
COORDINATOR_STREAM_ID,
stream_count,
substream_id,
event,
);
write_once(&self.shared.file, &self.buf)
}
}
impl TraceLog for Log {
fn append(&mut self, substream_id: u64, event: &Event<'_>) -> io::Result<()> {
let stream_count = self.next_stream_count();
format_event(
&mut self.buf,
self.shared.epoch,
self.stream_id,
stream_count,
substream_id,
event,
);
write_once(&self.shared.file, &self.buf)
}
}
impl<'a> TraceLog for &'a SharedLog {
    /// Format `event` using the coordinator stream id and the next shared
    /// event count, and write it to the file in a single write.
    ///
    /// As `SharedLog` holds no buffer of its own, the event is formatted in
    /// a thread-local buffer.
    fn append(&mut self, substream_id: u64, event: &Event<'_>) -> io::Result<()> {
        thread_local! {
            static BUF: RefCell<Vec<u8>> = RefCell::new(Vec::new());
        }

        let shared: &SharedLog = self;
        BUF.with(|cell| {
            // Borrow the buffer before taking a count, so a failed borrow
            // doesn't consume one.
            let mut scratch = cell.borrow_mut();
            let stream_count = shared.counter.fetch_add(1, atomic::Ordering::AcqRel);
            format_event(
                &mut scratch,
                shared.epoch,
                COORDINATOR_STREAM_ID,
                stream_count,
                substream_id,
                event,
            );
            write_once(&shared.file, &scratch)
        })
    }
}
/// Format `event` as a single packet into `buf`, replacing its contents.
///
/// The packet consists of, all in big-endian: the event magic (4 bytes), the
/// packet size (4 bytes), `stream_id` (4 bytes), `stream_count` (4 bytes),
/// `substream_id` (8 bytes), the start and end of the event in nanoseconds
/// since `epoch` (8 bytes each), the length of the description (2 bytes) and
/// the description itself. It is followed by the attributes, each written as
/// its name, the type byte of its value and the value.
///
/// The description length is truncated to 16 bits and the packet size to 32
/// bits.
fn format_event(
    buf: &mut Vec<u8>,
    epoch: Instant,
    stream_id: u32,
    stream_count: u32,
    substream_id: u64,
    event: &Event<'_>,
) {
    #[allow(clippy::unreadable_literal)]
    const MAGIC: u32 = 0xC1FC1FB7;
    /// Position of the packet size field within the packet.
    const PACKET_SIZE_FIELD: std::ops::Range<usize> = 4..8;

    let start_nanos: u64 = nanos_since_epoch(epoch, event.start);
    let end_nanos: u64 = nanos_since_epoch(epoch, event.end);
    let description: &[u8] = event.description.as_bytes();
    #[allow(clippy::cast_possible_truncation)]
    let description_len: u16 = description.len() as u16;

    let magic = MAGIC.to_be_bytes();
    // The size is only known once the attributes are written, so a
    // placeholder is written first and overwritten at the end.
    let size_placeholder = 0_u32.to_be_bytes();
    let stream_id = stream_id.to_be_bytes();
    let stream_count = stream_count.to_be_bytes();
    let substream_id = substream_id.to_be_bytes();
    let start = start_nanos.to_be_bytes();
    let end = end_nanos.to_be_bytes();
    let description_len = description_len.to_be_bytes();

    // The fixed fields and the description, in packet order.
    let fields: [&[u8]; 9] = [
        &magic,
        &size_placeholder,
        &stream_id,
        &stream_count,
        &substream_id,
        &start,
        &end,
        &description_len,
        description,
    ];

    buf.clear();
    for field in &fields {
        buf.extend_from_slice(field);
    }

    for (name, value) in event.attributes.iter() {
        use private::AttributeValue;
        (&**name).write_attribute(buf);
        buf.push(value.type_byte());
        value.write_attribute(buf);
    }

    #[allow(clippy::cast_possible_truncation)]
    let packet_size = buf.len() as u32;
    buf[PACKET_SIZE_FIELD].copy_from_slice(&packet_size.to_be_bytes());
}
/// Returns the number of nanoseconds between `epoch` and `time`, truncated
/// to 64 bits.
#[track_caller]
#[allow(clippy::cast_possible_truncation)]
fn nanos_since_epoch(epoch: Instant, time: Instant) -> u64 {
    let elapsed = time.duration_since(epoch);
    elapsed.as_nanos() as u64
}
/// Finish the event started with `timing` and append it to `log`.
///
/// Does nothing if either `log` or `timing` is `None`. An error appending
/// the event is logged as a warning, not returned.
///
/// # Panics
///
/// In debug builds this panics if `description` doesn't fit the 16 bit
/// length field of the event.
pub(crate) fn finish<L>(
    log: Option<L>,
    timing: Option<EventTiming>,
    substream_id: u64,
    description: &str,
    attributes: &[(&str, &dyn AttributeValue)],
) where
    L: TraceLog,
{
    debug_assert!(
        description.len() < u16::MAX as usize,
        "description for trace event too long"
    );

    let (mut log, timing) = match (log, timing) {
        (Some(log), Some(timing)) => (log, timing),
        _ => return,
    };

    let event = timing.finish(description, attributes);
    if let Err(err) = log.append(substream_id, &event) {
        warn!("error writing trace data: {}", err);
    }
}
/// Same as [`finish`], using [`RT_SUBSTREAM_ID`] as the substream id.
pub(crate) fn finish_rt<L>(
    log: Option<L>,
    timing: Option<EventTiming>,
    description: &str,
    attributes: &[(&str, &dyn AttributeValue)],
) where
    L: TraceLog,
{
    let substream_id = RT_SUBSTREAM_ID;
    finish(log, timing, substream_id, description, attributes);
}
/// Timing of an event that has been started, but not yet finished.
///
/// Created when timing starts and turned into an [`Event`] when the event is
/// finished; dropping it without finishing triggers the `must_use` warning.
#[derive(Clone, Debug)]
#[must_use = "tracing events must be finished, otherwise they aren't recorded"]
pub struct EventTiming {
/// Time at which the event started.
start: Instant,
}
impl EventTiming {
    /// Start timing an event, using the current time as its start.
    fn start() -> EventTiming {
        EventTiming {
            start: Instant::now(),
        }
    }

    /// Finish the event, using the current time as its end, and attach
    /// `description` and `attributes` to it.
    fn finish<'e>(
        self,
        description: &'e str,
        attributes: &'e [(&'e str, &'e dyn AttributeValue)],
    ) -> Event<'e> {
        let EventTiming { start } = self;
        Event {
            start,
            end: Instant::now(),
            description,
            attributes,
        }
    }
}
/// A finished trace event, ready to be appended to a [`TraceLog`].
pub(crate) struct Event<'e> {
/// Time at which the event started.
start: Instant,
/// Time at which the event finished.
end: Instant,
/// Description of the event.
description: &'e str,
/// Attributes of the event as name-value pairs.
attributes: &'e [(&'e str, &'e dyn AttributeValue)],
}
/// Marker trait for the types that can be used as the value of an event
/// attribute.
///
/// The encoding itself is defined by the supertrait
/// `private::AttributeValue`, which lives in a non-public module.
pub trait AttributeValue: private::AttributeValue {}
// References to attribute values are attribute values as well.
impl<'a, T> AttributeValue for &'a T where T: AttributeValue + ?Sized {}
/// Implementation of the encoding of attribute values.
///
/// The module is not public, so the methods of its `AttributeValue` trait
/// can't be named outside of the enclosing module.
mod private {
    use std::num::{
        NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroIsize, NonZeroU16, NonZeroU32,
        NonZeroU64, NonZeroU8, NonZeroUsize,
    };

    /// Type byte of unsigned integers, written as `u64`.
    const UNSIGNED_INTEGER_BYTE: u8 = 0b001;
    /// Type byte of signed integers, written as `i64`.
    const SIGNED_INTEGER_BYTE: u8 = 0b010;
    /// Type byte of floating point numbers, written as `f64`.
    const FLOAT_BYTE: u8 = 0b011;
    /// Type byte of strings.
    const STRING_BYTE: u8 = 0b100;
    /// Bit set in the type byte, next to the type of the elements, to mark
    /// an array.
    const ARRAY_MARKER_BYTE: u8 = 1 << 7;

    /// Encoding of a value of an event attribute.
    pub trait AttributeValue {
        /// Returns the byte that identifies the type of the value.
        fn type_byte(&self) -> u8;

        /// Append the encoded value to `buf`.
        fn write_attribute(&self, buf: &mut Vec<u8>);
    }

    /// References are encoded as the value they point to.
    impl<'a, T> AttributeValue for &'a T
    where
        T: AttributeValue + ?Sized,
    {
        fn type_byte(&self) -> u8 {
            T::type_byte(*self)
        }

        fn write_attribute(&self, buf: &mut Vec<u8>) {
            T::write_attribute(*self, buf)
        }
    }

    /// Implements both `AttributeValue` traits for a primitive number type
    /// and, optionally, its `NonZero*` counterpart. Values are converted to
    /// the wider type after `as` and written in big-endian.
    macro_rules! impl_write_attribute {
        ($prim: ty as $wide: ty, $non_zero: ty, $tag: expr) => {
            impl_write_attribute!($prim as $wide, $tag);

            impl AttributeValue for $non_zero {
                fn type_byte(&self) -> u8 {
                    $tag
                }

                fn write_attribute(&self, buf: &mut Vec<u8>) {
                    #[allow(trivial_numeric_casts)]
                    let widened = self.get() as $wide;
                    buf.extend_from_slice(&widened.to_be_bytes());
                }
            }

            impl super::AttributeValue for $non_zero {}
        };
        ($prim: ty as $wide: ty, $tag: expr) => {
            impl AttributeValue for $prim {
                fn type_byte(&self) -> u8 {
                    $tag
                }

                fn write_attribute(&self, buf: &mut Vec<u8>) {
                    #[allow(trivial_numeric_casts)]
                    let widened = *self as $wide;
                    buf.extend_from_slice(&widened.to_be_bytes());
                }
            }

            impl super::AttributeValue for $prim {}
        };
    }

    impl_write_attribute!(u8 as u64, NonZeroU8, UNSIGNED_INTEGER_BYTE);
    impl_write_attribute!(u16 as u64, NonZeroU16, UNSIGNED_INTEGER_BYTE);
    impl_write_attribute!(u32 as u64, NonZeroU32, UNSIGNED_INTEGER_BYTE);
    impl_write_attribute!(u64 as u64, NonZeroU64, UNSIGNED_INTEGER_BYTE);
    impl_write_attribute!(usize as u64, NonZeroUsize, UNSIGNED_INTEGER_BYTE);

    impl_write_attribute!(i8 as i64, NonZeroI8, SIGNED_INTEGER_BYTE);
    impl_write_attribute!(i16 as i64, NonZeroI16, SIGNED_INTEGER_BYTE);
    impl_write_attribute!(i32 as i64, NonZeroI32, SIGNED_INTEGER_BYTE);
    impl_write_attribute!(i64 as i64, NonZeroI64, SIGNED_INTEGER_BYTE);
    impl_write_attribute!(isize as i64, NonZeroIsize, SIGNED_INTEGER_BYTE);

    impl_write_attribute!(f32 as f64, FLOAT_BYTE);
    impl_write_attribute!(f64 as f64, FLOAT_BYTE);

    /// Strings are written as their length in bytes (16 bits, big-endian)
    /// followed by the bytes themselves.
    impl AttributeValue for str {
        fn type_byte(&self) -> u8 {
            STRING_BYTE
        }

        fn write_attribute(&self, buf: &mut Vec<u8>) {
            let byte_len = self.len();
            debug_assert!(byte_len < u16::MAX as usize);
            #[allow(clippy::cast_possible_truncation)]
            let length = byte_len as u16;
            buf.extend_from_slice(&length.to_be_bytes());
            buf.extend_from_slice(self.as_bytes());
        }
    }

    impl super::AttributeValue for str {}

    /// Encoded the same way as `str`.
    impl AttributeValue for String {
        fn type_byte(&self) -> u8 {
            self.as_str().type_byte()
        }

        fn write_attribute(&self, buf: &mut Vec<u8>) {
            self.as_str().write_attribute(buf)
        }
    }

    impl super::AttributeValue for String {}

    /// Slices are written as the number of elements (16 bits, big-endian)
    /// followed by the elements.
    impl<T> AttributeValue for [T]
    where
        T: AttributeValue + Default,
    {
        fn type_byte(&self) -> u8 {
            // An empty slice has no element to ask for its type, so a
            // default value is used in that case.
            let element_type = self
                .first()
                .map_or_else(|| T::default().type_byte(), |elem| elem.type_byte());
            element_type | ARRAY_MARKER_BYTE
        }

        fn write_attribute(&self, buf: &mut Vec<u8>) {
            debug_assert!(self.len() < u16::MAX as usize);
            #[allow(clippy::cast_possible_truncation)]
            let length = self.len() as u16;
            buf.extend_from_slice(&length.to_be_bytes());
            self.iter().for_each(|element| element.write_attribute(buf));
        }
    }

    impl<T> super::AttributeValue for [T] where T: super::AttributeValue + Default {}

    /// Encoded the same way as a slice of the same elements.
    impl<T, const N: usize> AttributeValue for [T; N]
    where
        T: AttributeValue + Default,
    {
        fn type_byte(&self) -> u8 {
            <[T] as AttributeValue>::type_byte(self)
        }

        fn write_attribute(&self, buf: &mut Vec<u8>) {
            <[T] as AttributeValue>::write_attribute(self, buf)
        }
    }

    impl<T, const N: usize> super::AttributeValue for [T; N] where T: super::AttributeValue + Default {}
}