#![cfg(feature = "io_trace")]
use base64::Engine;
use std::collections::HashMap;
use std::io::Write;
use tracing::{span, Subscriber};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
/// Tracing layer that turns IO-related tracing events and spans into text
/// lines and writes them through a non-blocking writer.
pub struct IoTraceLayer {
/// Source of the writers that receive the formatted trace lines.
make_writer: NonBlocking,
}
/// Category of an IO event, decoded from the fields of a tracing event.
enum IoEventType {
/// The event carried a `storage_op` field.
StorageOp(StorageOp),
/// The event carried a `db_op` field.
DbOp(DbOp),
}
/// Storage operation named by the `storage_op` field of an event.
///
/// The derived `Display` impl is configured to serialize variant names in
/// SCREAMING_SNAKE_CASE.
#[derive(strum::Display)]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
enum StorageOp {
Read,
Write,
Remove,
Exists,
/// Used for any operation name that is not recognised.
Other,
}
/// Database operation named by the `db_op` field of an event.
///
/// The derived `Display` impl is configured to serialize variant names in
/// SCREAMING_SNAKE_CASE.
#[derive(strum::Display)]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
enum DbOp {
Get,
Insert,
Set,
UpdateRc,
Delete,
DeleteAll,
/// Used for any operation name that is not recognised.
Other,
}
/// Lines collected for a span, stored in the span's extensions until the span
/// exits and the lines are either handed to the parent span or written out.
struct OutputBuffer(Vec<BufferedLine>);
/// A single pending output line together with its indentation.
struct BufferedLine {
/// Number of padding spaces written before `output_line`.
indent: usize,
/// Text of the line, without indentation or trailing newline.
output_line: String,
}
/// Per-span state stored in the span's extensions and printed on the span's
/// own line when the span exits.
#[derive(Default)]
struct SpanInfo {
/// Pre-formatted entries (e.g. `name=value`) that are joined with spaces on
/// the span line.
key_values: Vec<String>,
/// How many times each value of a `counter` field has been recorded; on
/// exit every entry is appended to the span line as `key=count`.
counts: HashMap<String, u64>,
}
impl<S: Subscriber + for<'span> LookupSpan<'span>> Layer<S> for IoTraceLayer {
fn on_new_span(
&self,
attrs: &span::Attributes<'_>,
id: &span::Id,
ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let span = ctx.span(id).unwrap();
let mut span_info = SpanInfo::default();
attrs.record(&mut span_info);
span.extensions_mut().insert(span_info);
span.extensions_mut().insert(OutputBuffer(vec![]));
}
fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
if event.metadata().target() == "io_tracer_count" {
let mut span = ctx.event_span(event);
while let Some(parent) = span {
if let Some(span_info) = parent.extensions_mut().get_mut::<SpanInfo>() {
event.record(span_info);
break;
} else {
span = parent.parent();
}
}
} else {
self.record_io_event(event, ctx);
}
}
#[allow(clippy::arithmetic_side_effects)]
fn on_exit(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
let span = ctx.span(id).unwrap();
let name = span.name();
let span_line = {
let mut span_info = span.extensions_mut().replace(SpanInfo::default()).unwrap();
for (key, count) in span_info.counts.drain() {
span_info.key_values.push(format!("{key}={count}"));
}
format!("{name} {}", span_info.key_values.join(" "))
};
let OutputBuffer(mut exiting_buffer) =
span.extensions_mut().replace(OutputBuffer(vec![])).unwrap();
if let Some(parent) = span.parent() {
let mut ext = parent.extensions_mut();
let OutputBuffer(parent_buffer) = ext.get_mut().unwrap();
parent_buffer.push(BufferedLine { indent: 2, output_line: span_line });
parent_buffer.extend(exiting_buffer.drain(..).map(|mut line| {
line.indent += 2;
line
}));
} else {
let mut out = self.make_writer.make_writer();
writeln!(out, "{span_line}").unwrap();
for BufferedLine { indent, output_line } in exiting_buffer.drain(..) {
writeln!(out, "{:indent$}{output_line}", "").unwrap();
}
}
}
fn on_record(
&self,
span: &span::Id,
values: &span::Record<'_>,
ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let mut span = ctx.span(span);
while let Some(parent) = span {
if let Some(span_info) = parent.extensions_mut().get_mut::<SpanInfo>() {
values.record(span_info);
break;
} else {
span = parent.parent();
}
}
}
}
impl IoTraceLayer {
pub(crate) fn new<W: 'static + Write + Send + Sync>(out: W) -> (Self, WorkerGuard) {
let (make_writer, guard) = NonBlocking::new(out);
(Self { make_writer }, guard)
}
fn record_io_event<S: Subscriber + for<'span> LookupSpan<'span>>(
&self,
event: &tracing::Event,
ctx: tracing_subscriber::layer::Context<S>,
) {
struct FormattedSize(Option<u64>);
impl std::fmt::Display for FormattedSize {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.map_or(Ok(()), |size| write!(fmt, " size={size}"))
}
}
let mut visitor = IoEventVisitor::default();
event.record(&mut visitor);
match visitor.t {
Some(IoEventType::DbOp(db_op)) => {
let col = visitor.col.as_deref().unwrap_or("?");
let key = visitor.key.as_deref().unwrap_or("?");
let size = FormattedSize(visitor.size);
let output_line = format!("{db_op} {col} {key:?}{size}");
if let Some(span) = ctx.event_span(event) {
span.extensions_mut()
.get_mut::<OutputBuffer>()
.unwrap()
.0
.push(BufferedLine { indent: 2, output_line });
} else {
writeln!(self.make_writer.make_writer(), "{output_line}").unwrap();
}
}
Some(IoEventType::StorageOp(storage_op)) => {
let key_bytes = visitor.key.map(|key| {
base64::engine::general_purpose::STANDARD
.decode(key)
.expect("key was not properly base64-encoded")
});
let key = key_bytes
.as_ref()
.map(|k| format!("{}", unc_fmt::Bytes(&*k)))
.unwrap_or_else(|| String::from("?"));
let size = FormattedSize(visitor.size);
let tn_db_reads = visitor.tn_db_reads.unwrap();
let tn_mem_reads = visitor.tn_mem_reads.unwrap();
let span_info =
format!("{storage_op} key={key}{size} tn_db_reads={tn_db_reads} tn_mem_reads={tn_mem_reads}");
let span =
ctx.event_span(event).expect("storage operations must happen inside span");
span.extensions_mut().get_mut::<SpanInfo>().unwrap().key_values.push(span_info);
}
None => {
}
}
}
}
/// Field visitor that extracts the values of interest from an IO event.
#[derive(Default)]
struct IoEventVisitor {
/// Event category, set when a `storage_op` or `db_op` field is seen.
t: Option<IoEventType>,
/// Value of the `key` field.
key: Option<String>,
/// Value of the `col` field.
col: Option<String>,
/// Value of the `size` field (numeric, or parsed from a string).
size: Option<u64>,
/// Value of the `evicted_len` field.
/// NOTE(review): stored here but not read anywhere in the visible code.
evicted_len: Option<u64>,
/// Value of the `tn_db_reads` field.
tn_db_reads: Option<u64>,
/// Value of the `tn_mem_reads` field.
tn_mem_reads: Option<u64>,
}
impl tracing::field::Visit for IoEventVisitor {
    /// Stores the value of the numeric fields this visitor tracks; any other
    /// field is ignored.
    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        let slot = match field.name() {
            "size" => &mut self.size,
            "evicted_len" => &mut self.evicted_len,
            "tn_db_reads" => &mut self.tn_db_reads,
            "tn_mem_reads" => &mut self.tn_mem_reads,
            _ => return,
        };
        *slot = Some(value);
    }

    /// Treats non-negative signed values like unsigned ones and drops
    /// negative values.
    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        if value < 0 {
            return;
        }
        // Lossless: the value was just checked to be non-negative.
        self.record_u64(field, value as u64);
    }

    /// Stores string fields: `key` and `col` verbatim, `size` parsed as a
    /// number, and `storage_op` / `db_op` mapped to the event type.
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        let name = field.name();
        if name == "key" {
            self.key = Some(String::from(value));
        } else if name == "col" {
            self.col = Some(String::from(value));
        } else if name == "size" {
            // An unparsable size resets the field to `None`.
            self.size = value.parse().ok();
        } else if name == "storage_op" {
            let storage_op = match value {
                "read" => StorageOp::Read,
                "write" => StorageOp::Write,
                "remove" => StorageOp::Remove,
                "exists" => StorageOp::Exists,
                _ => StorageOp::Other,
            };
            self.t = Some(IoEventType::StorageOp(storage_op));
        } else if name == "db_op" {
            let db_op = match value {
                "get" => DbOp::Get,
                "insert" => DbOp::Insert,
                "set" => DbOp::Set,
                "update_rc" => DbOp::UpdateRc,
                "delete" => DbOp::Delete,
                "delete_all" => DbOp::DeleteAll,
                _ => DbOp::Other,
            };
            self.t = Some(IoEventType::DbOp(db_op));
        }
    }

    /// Renders the value with its `Debug` impl and handles it as a string.
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        let rendered = format!("{value:?}");
        self.record_str(field, &rendered);
    }
}
impl tracing::field::Visit for SpanInfo {
    /// Counts occurrences of each `counter` value; every other string field
    /// is handled like a `Debug` value.
    // The only arithmetic here is the counter increment.
    #[allow(clippy::arithmetic_side_effects)]
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        if field.name() != "counter" {
            self.record_debug(field, &value);
            return;
        }
        let occurrences = self.counts.entry(value.to_owned()).or_insert(0);
        *occurrences += 1;
    }

    /// Appends `name=value` (using the value's `Debug` output) to the span's
    /// key-value list, skipping the `message` and `node_counter` fields.
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        match field.name() {
            "message" | "node_counter" => {}
            name => self.key_values.push(format!("{name}={value:?}")),
        }
    }
}