use crate::{
btf_container::BtfContainer,
export_event::checker::check_sample_types_btf,
meta::{BufferValueInterpreter, ExportedTypesStructMeta, MapSampleMeta, SampleMapType},
};
use anyhow::{anyhow, bail, Context, Result};
use log::debug;
use std::{any::Any, fmt::Display, sync::Arc};
use self::{
checker::check_export_types_btf,
event_handlers::{buffer, get_plain_text_checked_types_header, sample_map},
type_descriptor::{CheckedExportedMember, TypeDescriptor},
};
pub(crate) mod checker;
pub(crate) mod data_dumper;
pub(crate) mod event_handlers;
#[cfg(test)]
mod tests;
pub mod type_descriptor;
/// Output format selected through [`EventExporterBuilder::set_export_format`].
///
/// The builder uses this value to decide which internal event handler is
/// installed on the exporter it creates.
///
/// `Debug`, `PartialEq` and `Eq` are derived in addition to `Clone`/`Copy` so
/// the format can be logged and compared directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExportFormatType {
    /// Selects the plain-text handlers; this is the builder's default format.
    PlainText,
    /// Selects the JSON handlers.
    Json,
    /// Selects the raw-event handlers.
    RawEvent,
}
/// A piece of exported data, as handed to an [`EventHandler`] (or printed to
/// stdout when no handler is registered).
///
/// Every variant only holds shared references, so the type is `Clone` and
/// `Copy`: passing it to a handler does not consume the caller's value.
#[derive(Debug, Clone, Copy)]
pub enum ReceivedEventData<'a> {
    /// A single raw byte buffer.
    Buffer(&'a [u8]),
    /// A key buffer together with its value buffer.
    KeyValueBuffer {
        /// Bytes of the key.
        key: &'a [u8],
        /// Bytes of the value.
        value: &'a [u8],
    },
    /// Plain text; also used for the header line emitted when a plain-text
    /// exporter is built.
    PlainText(&'a str),
    /// Text carried as JSON (presumably produced by the JSON handlers, which
    /// live in another module; verify there).
    JsonText(&'a str),
}
impl<'a> Display for ReceivedEventData<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ReceivedEventData::Buffer(b) => {
write!(f, "{b:?}")?;
}
ReceivedEventData::KeyValueBuffer { key, value } => {
write!(f, "key: {key:?} value: {value:?}")?;
}
ReceivedEventData::PlainText(s) | ReceivedEventData::JsonText(s) => {
write!(f, "{s}")?;
}
}
Ok(())
}
}
impl<'a> ReceivedEventData<'a> {
pub fn trivally_to_plain_bytes(&self) -> &'a [u8] {
match self {
ReceivedEventData::Buffer(buf) => buf,
ReceivedEventData::KeyValueBuffer { value, .. } => value,
ReceivedEventData::PlainText(txt) => txt.as_bytes(),
ReceivedEventData::JsonText(txt) => txt.as_bytes(),
}
}
}
/// Callback interface for receiving exported event data.
///
/// When a handler is registered on an exporter, data is passed to
/// `handle_event` instead of being printed to stdout.
pub trait EventHandler {
/// Receives one piece of exported `data`, together with the user context
/// (if any) that was registered alongside the handler.
fn handle_event(&self, context: Option<Arc<dyn Any>>, data: ReceivedEventData);
}
/// Internal processing strategy owned by an `EventExporter`.
///
/// Each variant pairs the boxed processor chosen by `EventExporterBuilder`
/// with the checked type members the exporter was built for.
pub(crate) enum ExporterInternalImplementation {
/// Used by exporters built for a single value buffer.
BufferValueProcessor {
/// Processor that receives the event buffer (`handle_event(data)`).
event_processor: Box<dyn InternalBufferValueEventProcessor>,
/// Checked members describing the exported value type.
checked_types: Vec<CheckedExportedMember>,
},
/// Used by exporters built for key/value map samples.
KeyValueMapProcessor {
/// Processor that receives the key and value buffers
/// (`handle_event(key_buffer, value_buffer)`).
event_processor: Box<dyn InternalSampleMapProcessor>,
/// Checked members describing the key type.
checked_key_types: Vec<CheckedExportedMember>,
/// Checked members describing the value type.
checked_value_types: Vec<CheckedExportedMember>,
/// Copy of the sampling configuration the exporter was built with.
sample_map_config: MapSampleMeta,
},
}
/// Exporter that delivers event data to a user callback or to stdout.
///
/// Instances are created by `EventExporterBuilder`, which returns them
/// wrapped in an `Arc`.
pub struct EventExporter {
/// Optional user callback; when `None`, data is printed to stdout instead.
pub(crate) user_export_event_handler: Option<Arc<dyn EventHandler>>,
/// Processor and checked type information selected at build time.
pub(crate) internal_impl: ExporterInternalImplementation,
/// Optional user context passed to the callback together with the data.
pub(crate) user_ctx: Option<Arc<dyn Any>>,
/// BTF container supplied when the exporter was built.
pub(crate) btf_container: Arc<BtfContainer>,
}
impl EventExporter {
    /// Delivers `data` to this exporter's user callback, or prints it to
    /// stdout when no callback is registered.
    ///
    /// Delegates to the free function of the same name, passing clones of the
    /// stored handler and user context.
    pub(crate) fn dump_data_to_user_callback_or_stdout(&self, data: ReceivedEventData) {
        let handler = self.user_export_event_handler.clone();
        let context = self.user_ctx.clone();
        dump_data_to_user_callback_or_stdout(handler, context, data);
    }
}
/// Delivers `data` to the user callback if one is given, otherwise prints it
/// to stdout using its `Display` implementation.
///
/// `user_ctx` is forwarded to the callback unchanged; it is not used when
/// printing to stdout.
pub(crate) fn dump_data_to_user_callback_or_stdout(
    user_export_event_handler: Option<Arc<dyn EventHandler>>,
    user_ctx: Option<Arc<dyn Any>>,
    data: ReceivedEventData,
) {
    match user_export_event_handler {
        Some(handler) => handler.handle_event(user_ctx, data),
        None => println!("{data}"),
    }
}
/// Internal processor for events that arrive as a single byte buffer.
///
/// Implementations are boxed and stored in
/// `ExporterInternalImplementation::BufferValueProcessor`.
pub(crate) trait InternalBufferValueEventProcessor {
/// Processes the bytes of one event; returns an error if processing fails.
fn handle_event(&self, data: &[u8]) -> Result<()>;
}
/// Internal processor for key/value map samples.
///
/// Implementations are boxed and stored in
/// `ExporterInternalImplementation::KeyValueMapProcessor`.
pub(crate) trait InternalSampleMapProcessor {
/// Processes one key/value pair given as raw bytes; returns an error if
/// processing fails.
fn handle_event(&self, key_buffer: &[u8], value_buffer: &[u8]) -> Result<()>;
}
/// Builder that configures and creates an `EventExporter`.
///
/// Defaults (see the `Default` impl): plain-text format, no event handler and
/// no user context.
pub struct EventExporterBuilder {
/// Format used to choose the internal event handler.
export_format: ExportFormatType,
/// Optional callback receiving exported data; stdout is used when `None`.
export_event_handler: Option<Arc<dyn EventHandler>>,
/// Optional user context handed to the callback together with the data.
user_ctx: Option<Arc<dyn Any>>,
}
impl Default for EventExporterBuilder {
fn default() -> Self {
Self {
export_format: ExportFormatType::PlainText,
export_event_handler: None,
user_ctx: None,
}
}
}
impl EventExporterBuilder {
    /// Creates a builder with the default settings (see the `Default` impl).
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the export format and returns the updated builder.
    pub fn set_export_format(mut self, fmt: ExportFormatType) -> Self {
        self.export_format = fmt;
        self
    }

    /// Registers the callback that will receive exported data and returns the
    /// updated builder. Without a callback, data is printed to stdout.
    pub fn set_export_event_handler(mut self, handler: Arc<dyn EventHandler>) -> Self {
        self.export_event_handler = Some(handler);
        self
    }

    /// Stores `ctx` behind an `Arc` as the user context and returns the
    /// updated builder. The context is handed to the event handler together
    /// with the data.
    pub fn set_user_context<T: Any>(mut self, ctx: T) -> Self {
        let shared: Arc<dyn Any> = Arc::new(ctx);
        self.user_ctx = Some(shared);
        self
    }

    /// Builds an exporter for single-value buffers from a [`TypeDescriptor`].
    ///
    /// When the format is plain text and the interpreter is `DefaultStruct`,
    /// a header line is emitted immediately (to the user callback, or stdout
    /// when none is set) while the exporter is being constructed.
    ///
    /// # Errors
    ///
    /// Fails if the checked members cannot be built from `export_type`, or if
    /// the `StackTrace` interpreter is combined with a format other than
    /// plain text.
    ///
    /// # Panics
    ///
    /// Panics on a format/interpreter combination that none of the explicit
    /// match arms handles. The combinations involving `StackTrace` are
    /// rejected with an error beforehand.
    pub fn build_for_single_value_with_type_descriptor(
        self,
        export_type: TypeDescriptor,
        btf_container: Arc<BtfContainer>,
        intepreter: &BufferValueInterpreter,
    ) -> Result<Arc<EventExporter>> {
        let mut checked_types =
            export_type.build_checked_exported_members(btf_container.borrow_btf())?;

        let wants_stack_trace = matches!(intepreter, BufferValueInterpreter::StackTrace { .. });
        let is_plain_text = matches!(self.export_format, ExportFormatType::PlainText);
        if wants_stack_trace && !is_plain_text {
            bail!("Intepreter `stack_trace` could only be paired with plaintext export format");
        }

        // `new_cyclic` gives each handler a weak reference back to the
        // exporter that owns it.
        let exporter = Arc::new_cyclic(move |weak_self| {
            let event_processor: Box<dyn InternalBufferValueEventProcessor> =
                match (self.export_format, intepreter) {
                    (ExportFormatType::Json, BufferValueInterpreter::DefaultStruct) => {
                        Box::new(buffer::JsonExportEventHandler {
                            exporter: weak_self.clone(),
                        })
                    }
                    (ExportFormatType::RawEvent, BufferValueInterpreter::DefaultStruct) => {
                        Box::new(buffer::RawExportEventHandler {
                            exporter: weak_self.clone(),
                        })
                    }
                    (ExportFormatType::PlainText, BufferValueInterpreter::DefaultStruct) => {
                        // Emit the column header once, before any event arrives.
                        let header_line =
                            get_plain_text_checked_types_header(&mut checked_types, "TIME ");
                        dump_data_to_user_callback_or_stdout(
                            self.export_event_handler.clone(),
                            self.user_ctx.clone(),
                            ReceivedEventData::PlainText(header_line.as_str()),
                        );
                        Box::new(buffer::PlainStringExportEventHandler {
                            exporter: weak_self.clone(),
                        })
                    }
                    (
                        ExportFormatType::PlainText,
                        BufferValueInterpreter::StackTrace {
                            field_map,
                            with_symbols,
                        },
                    ) => {
                        debug!("Using stack trace exporter");
                        Box::new(buffer::PlainTextStackTraceExportEventHandler {
                            exporter: weak_self.clone(),
                            field_mapping: field_map.clone(),
                            with_symbols: *with_symbols,
                        })
                    }
                    (_, _) => unreachable!("Unexpected exportformattype + intepreter"),
                };

            EventExporter {
                user_export_event_handler: self.export_event_handler,
                user_ctx: self.user_ctx,
                btf_container,
                internal_impl: ExporterInternalImplementation::BufferValueProcessor {
                    event_processor,
                    checked_types,
                },
            }
        });

        Ok(exporter)
    }

    /// Builds an exporter for single-value buffers from exported-type
    /// metadata.
    ///
    /// The metadata is first checked against the BTF data, then the work is
    /// handed to [`Self::build_for_single_value_with_type_descriptor`].
    ///
    /// # Errors
    ///
    /// Fails if the type check fails or if the delegated build fails.
    pub fn build_for_single_value(
        self,
        export_type: &ExportedTypesStructMeta,
        btf_container: Arc<BtfContainer>,
        intepreter: &BufferValueInterpreter,
    ) -> Result<Arc<EventExporter>> {
        let members = check_export_types_btf(export_type, btf_container.borrow_btf())?;
        let descriptor = TypeDescriptor::CheckedMembers(members);
        self.build_for_single_value_with_type_descriptor(descriptor, btf_container, intepreter)
    }

    /// Builds an exporter for key/value map samples from [`TypeDescriptor`]s.
    ///
    /// When the format is plain text and the sample type is `DefaultKV`, a
    /// header line covering the key and value members is emitted immediately
    /// (to the user callback, or stdout when none is set).
    ///
    /// # Errors
    ///
    /// Fails if the checked members cannot be built for the key or value
    /// type, or if plain-text format is combined with `LinearHist` sampling.
    pub fn build_for_key_value_with_type_desc(
        self,
        key_export_type: TypeDescriptor,
        value_export_type: TypeDescriptor,
        sample_config: &MapSampleMeta,
        btf_container: Arc<BtfContainer>,
    ) -> Result<Arc<EventExporter>> {
        let mut checked_key_types =
            key_export_type.build_checked_exported_members(btf_container.borrow_btf())?;
        let mut checked_value_types =
            value_export_type.build_checked_exported_members(btf_container.borrow_btf())?;

        let is_plain_text = matches!(self.export_format, ExportFormatType::PlainText);
        if is_plain_text && matches!(sample_config.ty, SampleMapType::LinearHist) {
            bail!("Linear hist sampling is not supported now");
        }

        let exporter = Arc::new_cyclic(move |weak_self| {
            let event_processor: Box<dyn InternalSampleMapProcessor> =
                match (self.export_format, &sample_config.ty) {
                    (ExportFormatType::Json, _) => Box::new(sample_map::JsonExportEventHandler {
                        exporter: weak_self.clone(),
                    }),
                    (ExportFormatType::RawEvent, _) => {
                        Box::new(sample_map::RawExportEventHandler {
                            exporter: weak_self.clone(),
                        })
                    }
                    (ExportFormatType::PlainText, SampleMapType::Log2Hist) => {
                        Box::new(sample_map::Log2HistExportEventHandler {
                            exporter: weak_self.clone(),
                        })
                    }
                    (ExportFormatType::PlainText, SampleMapType::DefaultKV) => {
                        // The header starts with the time column, followed by
                        // the key members and then the value members.
                        let key_header = get_plain_text_checked_types_header(
                            &mut checked_key_types,
                            String::from("TIME "),
                        );
                        let full_header = get_plain_text_checked_types_header(
                            &mut checked_value_types,
                            key_header,
                        );
                        dump_data_to_user_callback_or_stdout(
                            self.export_event_handler.clone(),
                            self.user_ctx.clone(),
                            ReceivedEventData::PlainText(full_header.as_str()),
                        );
                        Box::new(sample_map::DefaultKVStringExportEventHandler {
                            exporter: weak_self.clone(),
                        })
                    }
                    // Plain text + linear hist was rejected with an error above.
                    (ExportFormatType::PlainText, SampleMapType::LinearHist) => unreachable!(),
                };

            EventExporter {
                user_export_event_handler: self.export_event_handler,
                internal_impl: ExporterInternalImplementation::KeyValueMapProcessor {
                    event_processor,
                    checked_key_types,
                    checked_value_types,
                    sample_map_config: sample_config.clone(),
                },
                user_ctx: self.user_ctx,
                btf_container,
            }
        });

        Ok(exporter)
    }

    /// Builds an exporter for key/value map samples from BTF type ids.
    ///
    /// The key type is checked without export metadata; the value type is
    /// checked together with a clone of `export_type`. The checked members
    /// are then passed to [`Self::build_for_key_value_with_type_desc`].
    ///
    /// # Errors
    ///
    /// Fails with the context "Failed to check key type" or
    /// "Failed to check value type" when the respective check fails, or if
    /// the delegated build fails.
    pub fn build_for_key_value(
        self,
        key_type_id: u32,
        value_type_id: u32,
        sample_config: &MapSampleMeta,
        export_type: &ExportedTypesStructMeta,
        btf_container: Arc<BtfContainer>,
    ) -> Result<Arc<EventExporter>> {
        let btf = btf_container.borrow_btf();
        let key_members = check_sample_types_btf(btf, key_type_id, None)
            .with_context(|| anyhow!("Failed to check key type"))?;
        let value_members = check_sample_types_btf(btf, value_type_id, Some(export_type.clone()))
            .with_context(|| anyhow!("Failed to check value type"))?;

        let key_descriptor = TypeDescriptor::CheckedMembers(key_members);
        let value_descriptor = TypeDescriptor::CheckedMembers(value_members);
        self.build_for_key_value_with_type_desc(
            key_descriptor,
            value_descriptor,
            sample_config,
            btf_container,
        )
    }
}