mod formatted;
pub use formatted::*;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use crate::channels::ChannelType;
/// State of a channel.
///
/// With `rename_all = "lowercase"`, serde reads and writes each variant as its
/// lowercase name (`"active"`, `"closed"`, `"notified"`).
// NOTE(review): the exact meaning of `Notified` is not visible in this file —
// confirm against the code that sets it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub(crate) enum ChannelState {
/// The value produced by `ChannelState::default()`.
#[default]
Active,
Closed,
Notified,
}
impl ChannelState {
pub fn as_str(&self) -> &'static str {
match self {
ChannelState::Active => "active",
ChannelState::Closed => "closed",
ChannelState::Notified => "notified",
}
}
}
/// Formats the state as the string returned by [`ChannelState::as_str`].
impl std::fmt::Display for ChannelState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` applies the caller's width / fill / alignment / precision
        // (e.g. `{:<10}`); `write!(f, "{}", ..)` would start a fresh format
        // spec and silently drop them.
        f.pad(self.as_str())
    }
}
impl std::fmt::Display for ChannelType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
ChannelType::Unbounded => write!(f, "unbounded"),
ChannelType::Oneshot => write!(f, "oneshot"),
}
}
}
/// Serializes a channel type as the string produced by its `Display` impl
/// (`bounded[<size>]`, `unbounded` or `oneshot`).
impl Serialize for ChannelType {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // `collect_str` emits the `Display` output as a string; serializers
        // may stream it directly instead of going through a temporary `String`.
        serializer.collect_str(self)
    }
}
impl<'de> Deserialize<'de> for ChannelType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"unbounded" => Ok(ChannelType::Unbounded),
"oneshot" => Ok(ChannelType::Oneshot),
_ => {
if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
let size = inner
.parse()
.map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
Ok(ChannelType::Bounded(size))
} else {
Err(serde::de::Error::custom("invalid channel type"))
}
}
}
}
}
/// One log record: a position, a timestamp, and an optional message and
/// optional `tid`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct DataFlowLogEntry {
/// Index of this entry. NOTE(review): presumably its position within the owning log — confirm.
pub index: u64,
/// Timestamp of the entry. NOTE(review): unit and epoch are not visible here — confirm.
pub timestamp: u64,
/// Optional message text attached to the entry.
pub message: Option<String>,
/// Optional id. NOTE(review): presumably the id of the thread that produced the entry — confirm.
pub tid: Option<u64>,
}
impl DataFlowLogEntry {
pub fn new(index: u64, timestamp: u64, message: Option<String>, tid: Option<u64>) -> Self {
Self {
index,
timestamp,
message,
tid,
}
}
}
/// Log entries grouped under one `id`, kept in two separate lists:
/// `sent_logs` and `received_logs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ChannelLogs {
/// Identifier, as a string. NOTE(review): presumably the channel id — confirm.
pub id: String,
/// Entries in the "sent" list.
pub sent_logs: Vec<DataFlowLogEntry>,
/// Entries in the "received" list.
pub received_logs: Vec<DataFlowLogEntry>,
}
/// Log entries grouped under one `id` in a single list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct StreamLogs {
/// Identifier, as a string. NOTE(review): presumably the stream id — confirm.
pub id: String,
/// The log entries.
pub logs: Vec<DataFlowLogEntry>,
}
/// State of a future.
///
/// With `rename_all = "lowercase"`, serde reads and writes each variant as its
/// lowercase name (`"pending"`, `"running"`, `"suspended"`, `"ready"`,
/// `"cancelled"`).
// NOTE(review): the transitions between these states are not visible in this
// file — confirm against the code that updates them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub(crate) enum FutureState {
/// The value produced by `FutureState::default()`.
#[default]
Pending,
Running,
Suspended,
Ready,
Cancelled,
}
impl FutureState {
pub fn as_str(&self) -> &'static str {
match self {
FutureState::Pending => "pending",
FutureState::Running => "running",
FutureState::Suspended => "suspended",
FutureState::Ready => "ready",
FutureState::Cancelled => "cancelled",
}
}
}
/// Formats the state as the string returned by [`FutureState::as_str`].
impl std::fmt::Display for FutureState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` applies the caller's width / fill / alignment / precision
        // (e.g. `{:<10}`); `write!(f, "{}", ..)` would start a fresh format
        // spec and silently drop them.
        f.pad(self.as_str())
    }
}
/// Record for one future: its state, poll counters and durations, optional
/// allocation counters, and an optional result string.
// NOTE(review): the `_ns` suffix suggests nanoseconds and `_bytes` suggests
// byte counts — confirm against the code that fills these fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct FutureLog {
/// Identifier of this record.
pub id: u32,
/// Identifier of the future. NOTE(review): how this differs from `id` is not visible here — confirm.
pub future_id: u32,
/// Current state.
pub state: FutureState,
/// Poll counter.
pub poll_count: u64,
/// Total poll duration.
pub total_poll_duration_ns: u64,
/// Maximum poll duration.
pub max_poll_duration_ns: u64,
/// Duration of the last poll.
pub last_poll_duration_ns: u64,
/// Total allocated bytes across polls, when available.
pub total_poll_alloc_bytes: Option<u64>,
/// Total allocation count across polls, when available.
pub total_poll_alloc_count: Option<u64>,
/// Maximum allocated bytes for a poll, when available.
pub max_poll_alloc_bytes: Option<u64>,
/// Allocated bytes for the last poll, when available.
pub last_poll_alloc_bytes: Option<u64>,
/// Optional result string. NOTE(review): presumably a rendering of the future's output — confirm.
pub result: Option<String>,
}
impl FutureLog {
pub fn new(id: u32, future_id: u32) -> Self {
Self {
id,
future_id,
state: FutureState::default(),
poll_count: 0,
total_poll_duration_ns: 0,
max_poll_duration_ns: 0,
last_poll_duration_ns: 0,
total_poll_alloc_bytes: None,
total_poll_alloc_count: None,
max_poll_alloc_bytes: None,
last_poll_alloc_bytes: None,
result: None,
}
}
}
/// A list of [`FutureLog`] records grouped under one `id`, together with
/// aggregate counters.
// NOTE(review): whether the aggregates are sums over `calls` is not visible in
// this file — confirm against the code that builds this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct FutureLogsList {
/// Identifier, as a string.
pub id: String,
/// Call counter.
pub call_count: u64,
/// Total poll counter.
pub total_polls: u64,
/// Total poll duration.
pub total_poll_duration_ns: u64,
/// Total allocated bytes across polls, when available.
pub total_poll_alloc_bytes: Option<u64>,
/// Total allocation count across polls, when available.
pub total_poll_alloc_count: Option<u64>,
/// The individual records.
pub calls: Vec<FutureLog>,
}
/// Metrics for one thread: identity, status, CPU figures, and optional
/// CPU-percentage and memory figures.
// NOTE(review): units of the `cpu_*` values are not visible in this file —
// confirm against the code that samples them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ThreadMetrics {
/// Thread id. NOTE(review): presumably the OS-level thread id, per the name — confirm.
pub os_tid: u64,
/// Thread name.
pub name: String,
/// Status string.
pub status: String,
/// Status code string. NOTE(review): relation to `status` is not visible here — confirm.
pub status_code: String,
/// User CPU figure.
pub cpu_user: f64,
/// System CPU figure.
pub cpu_sys: f64,
/// Total CPU figure; `ThreadMetrics::new` sets it to `cpu_user + cpu_sys`.
pub cpu_total: f64,
/// CPU percentage, when available.
pub cpu_percent: Option<f64>,
/// Maximum CPU percentage, when available.
pub cpu_percent_max: Option<f64>,
/// Average CPU percentage, when available.
pub cpu_percent_avg: Option<f64>,
/// Allocated bytes; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub alloc_bytes: Option<u64>,
/// Deallocated bytes; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub dealloc_bytes: Option<u64>,
/// Signed memory difference; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub mem_diff: Option<i64>,
}
impl ThreadMetrics {
pub fn new(
os_tid: u64,
name: String,
status: String,
status_code: String,
cpu_user: f64,
cpu_sys: f64,
) -> Self {
Self {
os_tid,
name,
status,
status_code,
cpu_user,
cpu_sys,
cpu_total: cpu_user + cpu_sys,
cpu_percent: None,
cpu_percent_max: None,
cpu_percent_avg: None,
alloc_bytes: None,
dealloc_bytes: None,
mem_diff: None,
}
}
}
/// A route, convertible to a URL path with [`Route::to_path`] and to a
/// `http://localhost` URL with [`Route::to_url`].
///
/// Variants without fields map to a fixed path; variants carrying an id map to
/// `<prefix>/<id>/logs`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Route {
    /// `/functions_timing`
    FunctionsTiming,
    /// `/functions_alloc`
    FunctionsAlloc,
    /// `/threads`
    Threads,
    /// `/functions_timing/<function_id>/logs`
    FunctionTimingLogs { function_id: u32 },
    /// `/functions_alloc/<function_id>/logs`
    FunctionAllocLogs { function_id: u32 },
    /// `/debug`
    Debug,
    /// `/debug/dbg/<id>/logs`
    DebugDbgLogs { id: u32 },
    /// `/debug/val/<id>/logs`
    DebugValLogs { id: u32 },
    /// `/debug/gauge/<id>/logs`
    DebugGaugeLogs { id: u32 },
    /// `/data_flow`
    DataFlow,
    /// `/data_flow/channel/<channel_id>/logs`
    DataFlowChannelLogs { channel_id: u32 },
    /// `/data_flow/stream/<stream_id>/logs`
    DataFlowStreamLogs { stream_id: u32 },
    /// `/data_flow/future/<future_id>/logs`
    DataFlowFutureLogs { future_id: u32 },
    /// `/tokio_runtime`
    TokioRuntime,
    /// `/profiler_status`
    ProfilerStatus,
}

impl Route {
    /// Returns the URL path of this route, starting with `/`.
    pub fn to_path(&self) -> String {
        // Each variant resolves to a path prefix plus the id it carries, if any.
        let (base, id) = match *self {
            Self::FunctionsTiming => ("/functions_timing", None),
            Self::FunctionsAlloc => ("/functions_alloc", None),
            Self::Threads => ("/threads", None),
            Self::Debug => ("/debug", None),
            Self::DataFlow => ("/data_flow", None),
            Self::TokioRuntime => ("/tokio_runtime", None),
            Self::ProfilerStatus => ("/profiler_status", None),
            Self::FunctionTimingLogs { function_id } => ("/functions_timing", Some(function_id)),
            Self::FunctionAllocLogs { function_id } => ("/functions_alloc", Some(function_id)),
            Self::DebugDbgLogs { id } => ("/debug/dbg", Some(id)),
            Self::DebugValLogs { id } => ("/debug/val", Some(id)),
            Self::DebugGaugeLogs { id } => ("/debug/gauge", Some(id)),
            Self::DataFlowChannelLogs { channel_id } => ("/data_flow/channel", Some(channel_id)),
            Self::DataFlowStreamLogs { stream_id } => ("/data_flow/stream", Some(stream_id)),
            Self::DataFlowFutureLogs { future_id } => ("/data_flow/future", Some(future_id)),
        };
        match id {
            Some(id) => format!("{base}/{id}/logs"),
            None => base.to_owned(),
        }
    }

    /// Returns `http://localhost:<port>` followed by [`Route::to_path`].
    pub fn to_url(&self, port: u16) -> String {
        let path = self.to_path();
        format!("http://localhost:{port}{path}")
    }
}
/// Extracts the numeric id from a path of the form `<prefix><id>/logs`.
///
/// Returns `None` when `path` does not start with `prefix`, does not end with
/// `/logs`, or when the part in between is not a plain decimal `u32`.
fn parse_id_from_path(path: &str, prefix: &str) -> Option<u32> {
    let id_str = path.strip_prefix(prefix)?.strip_suffix("/logs")?;
    // `u32::from_str` tolerates a leading `+`; reject it so that `+5` does not
    // alias `5` — only the canonical spelling is a valid id.
    if id_str.starts_with('+') {
        return None;
    }
    id_str.parse().ok()
}
impl FromStr for Route {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
let path = s.split('?').next().unwrap_or(s);
match path {
"/functions_timing" => return Ok(Route::FunctionsTiming),
"/functions_alloc" => return Ok(Route::FunctionsAlloc),
"/threads" => return Ok(Route::Threads),
"/debug" => return Ok(Route::Debug),
"/data_flow" => return Ok(Route::DataFlow),
"/tokio_runtime" => return Ok(Route::TokioRuntime),
"/profiler_status" => return Ok(Route::ProfilerStatus),
_ => {}
}
if let Some(function_id) = parse_id_from_path(path, "/functions_timing/") {
return Ok(Route::FunctionTimingLogs { function_id });
}
if let Some(function_id) = parse_id_from_path(path, "/functions_alloc/") {
return Ok(Route::FunctionAllocLogs { function_id });
}
if let Some(id) = parse_id_from_path(path, "/debug/dbg/") {
return Ok(Route::DebugDbgLogs { id });
}
if let Some(id) = parse_id_from_path(path, "/debug/val/") {
return Ok(Route::DebugValLogs { id });
}
if let Some(id) = parse_id_from_path(path, "/debug/gauge/") {
return Ok(Route::DebugGaugeLogs { id });
}
if let Some(channel_id) = parse_id_from_path(path, "/data_flow/channel/") {
return Ok(Route::DataFlowChannelLogs { channel_id });
}
if let Some(stream_id) = parse_id_from_path(path, "/data_flow/stream/") {
return Ok(Route::DataFlowStreamLogs { stream_id });
}
if let Some(future_id) = parse_id_from_path(path, "/data_flow/future/") {
return Ok(Route::DataFlowFutureLogs { future_id });
}
Err(())
}
}