use regex::Regex;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::sync::LazyLock;
pub use crate::output::{FunctionLogsJson, FunctionsJson};
/// State value carried by the channel and stream statistics records in this file.
///
/// Serialized through serde as the lowercase variant name (see the
/// `rename_all = "lowercase"` attribute). `Active` is the `Default`.
// NOTE(review): the exact runtime condition behind each variant is not
// visible in this file; the per-variant notes only describe the wire form.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ChannelState {
/// Default state; serialized as `"active"`.
#[default]
Active,
/// Serialized as `"closed"`.
Closed,
/// Serialized as `"full"`.
Full,
/// Serialized as `"notified"`.
Notified,
}
impl ChannelState {
pub fn as_str(&self) -> &'static str {
match self {
ChannelState::Active => "active",
ChannelState::Closed => "closed",
ChannelState::Full => "full",
ChannelState::Notified => "notified",
}
}
}
impl std::fmt::Display for ChannelState {
    /// Writes the same lowercase name that `as_str` returns.
    ///
    /// Width/fill/precision flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
/// Kind of channel described by a statistics record.
///
/// The `Display` output is one of `bounded[<size>]`, `unbounded` or `oneshot`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelType {
    /// Carries a `usize` size; displayed as `bounded[<size>]`.
    Bounded(usize),
    /// Displayed as `unbounded`.
    Unbounded,
    /// Displayed as `oneshot`.
    Oneshot,
}

impl std::fmt::Display for ChannelType {
    /// Writes the textual form of the channel type.
    ///
    /// Width/fill/precision flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Unbounded => f.write_str("unbounded"),
            Self::Oneshot => f.write_str("oneshot"),
            Self::Bounded(capacity) => write!(f, "bounded[{}]", capacity),
        }
    }
}
impl Serialize for ChannelType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
impl<'de> Deserialize<'de> for ChannelType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"unbounded" => Ok(ChannelType::Unbounded),
"oneshot" => Ok(ChannelType::Oneshot),
_ => {
if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
let size = inner
.parse()
.map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
Ok(ChannelType::Bounded(size))
} else {
Err(serde::de::Error::custom("invalid channel type"))
}
}
}
}
}
/// One log record, used for channel send/receive logs and stream logs in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
/// Index of the entry.
// NOTE(review): presumably a per-source sequence number — confirm with the producer.
pub index: u64,
/// Timestamp of the entry.
// NOTE(review): unit is not stated here; presumably nanoseconds like the
// `current_elapsed_ns` fields elsewhere in this file — TODO confirm.
pub timestamp: u64,
/// Optional message text attached to the entry.
pub message: Option<String>,
/// Optional `tid` value.
// NOTE(review): presumably the id of the thread that produced the entry — confirm.
pub tid: Option<u64>,
}
impl LogEntry {
pub fn new(index: u64, timestamp: u64, message: Option<String>, tid: Option<u64>) -> Self {
Self {
index,
timestamp,
message,
tid,
}
}
}
/// Serializable payload listing statistics for a set of channels.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelsJson {
/// Elapsed time in nanoseconds (per the field name).
// NOTE(review): the reference point of the measurement is not visible in this file.
pub current_elapsed_ns: u64,
/// One statistics record per channel.
pub channels: Vec<SerializableChannelStats>,
}
/// Serializable statistics record for a single channel.
// NOTE(review): field semantics below are inferred from names where marked;
// the code that fills this struct is not part of this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableChannelStats {
/// Numeric identifier of the channel.
pub id: u64,
/// Source string for the channel.
// NOTE(review): presumably the source location where it was created — confirm.
pub source: String,
/// Label string for the channel.
pub label: String,
/// Flag indicating whether the label is a custom one (per the field name).
pub has_custom_label: bool,
/// Kind of channel; serialized as a string such as `bounded[8]`.
pub channel_type: ChannelType,
/// State of the channel.
pub state: ChannelState,
/// Count of sent messages (per the field name).
pub sent_count: u64,
/// Count of received messages (per the field name).
pub received_count: u64,
/// Number of queued messages (per the field name).
pub queued: u64,
/// Name of the message type.
pub type_name: String,
/// Size of the message type.
// NOTE(review): presumably bytes, e.g. from `size_of` — confirm.
pub type_size: usize,
/// Bytes currently queued (per the field name).
pub queued_bytes: u64,
// NOTE(review): meaning of `iter` is not evident from this file.
pub iter: u32,
}
/// Serializable payload holding the sent and received log entries of one channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelLogs {
/// Identifier of the channel, carried as a string here.
pub id: String,
/// Entries recorded for sends.
pub sent_logs: Vec<LogEntry>,
/// Entries recorded for receives.
pub received_logs: Vec<LogEntry>,
}
/// Serializable payload listing statistics for a set of streams.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamsJson {
/// Elapsed time in nanoseconds (per the field name).
// NOTE(review): the reference point of the measurement is not visible in this file.
pub current_elapsed_ns: u64,
/// One statistics record per stream.
pub streams: Vec<SerializableStreamStats>,
}
/// Serializable statistics record for a single stream.
// NOTE(review): field semantics below are inferred from names where marked;
// the code that fills this struct is not part of this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableStreamStats {
/// Numeric identifier of the stream.
pub id: u64,
/// Source string for the stream.
// NOTE(review): presumably the source location where it was created — confirm.
pub source: String,
/// Label string for the stream.
pub label: String,
/// Flag indicating whether the label is a custom one (per the field name).
pub has_custom_label: bool,
/// State of the stream; shares the `ChannelState` type with channels.
pub state: ChannelState,
/// Number of items yielded (per the field name).
pub items_yielded: u64,
/// Name of the item type.
pub type_name: String,
/// Size of the item type.
// NOTE(review): presumably bytes, e.g. from `size_of` — confirm.
pub type_size: usize,
// NOTE(review): meaning of `iter` is not evident from this file.
pub iter: u32,
}
/// Serializable payload holding the log entries of one stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamLogs {
/// Identifier of the stream, carried as a string here.
pub id: String,
/// Entries recorded for the stream.
pub logs: Vec<LogEntry>,
}
/// State value recorded for a single future call (see `FutureCall::state`).
///
/// Serialized through serde as the lowercase variant name (see the
/// `rename_all = "lowercase"` attribute). `Pending` is the `Default`.
// NOTE(review): the exact runtime condition behind each variant is not
// visible in this file; the per-variant notes only describe the wire form.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FutureState {
/// Default state; serialized as `"pending"`.
#[default]
Pending,
/// Serialized as `"running"`.
Running,
/// Serialized as `"suspended"`.
Suspended,
/// Serialized as `"ready"`.
Ready,
/// Serialized as `"cancelled"`.
Cancelled,
}
impl FutureState {
pub fn as_str(&self) -> &'static str {
match self {
FutureState::Pending => "pending",
FutureState::Running => "running",
FutureState::Suspended => "suspended",
FutureState::Ready => "ready",
FutureState::Cancelled => "cancelled",
}
}
}
impl std::fmt::Display for FutureState {
    /// Writes the same lowercase name that `as_str` returns.
    ///
    /// Width/fill/precision flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
/// Serializable record describing one call of a tracked future.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FutureCall {
/// Identifier of this call.
pub id: u64,
/// Identifier of the future this call belongs to.
pub future_id: u64,
/// Current state of the call.
pub state: FutureState,
/// Number of polls recorded for the call (per the field name).
pub poll_count: u64,
/// Optional result text.
// NOTE(review): presumably a rendering of the future's output — confirm.
pub result: Option<String>,
}
impl FutureCall {
pub fn new(id: u64, future_id: u64) -> Self {
Self {
id,
future_id,
state: FutureState::default(),
poll_count: 0,
result: None,
}
}
}
/// Serializable payload listing statistics for a set of futures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FuturesJson {
/// Elapsed time in nanoseconds (per the field name).
// NOTE(review): the reference point of the measurement is not visible in this file.
pub current_elapsed_ns: u64,
/// One statistics record per future.
pub futures: Vec<SerializableFutureStats>,
}
/// Serializable statistics record for a single future.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableFutureStats {
/// Numeric identifier of the future.
pub id: u64,
/// Source string for the future.
// NOTE(review): presumably the source location where it was created — confirm.
pub source: String,
/// Label string for the future.
pub label: String,
/// Flag indicating whether the label is a custom one (per the field name).
pub has_custom_label: bool,
/// Number of calls recorded (per the field name).
pub call_count: u64,
/// Total number of polls recorded (per the field name).
// NOTE(review): presumably summed over all calls — confirm.
pub total_polls: u64,
}
/// Serializable payload holding the call records of one future.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FutureCalls {
/// Identifier of the future, carried as a string here.
pub id: String,
/// Call records for the future.
pub calls: Vec<FutureCall>,
}
/// Serializable metrics record for a single thread.
///
/// The three memory fields are left out of the serialized output when they
/// are `None`; `cpu_percent` has no such attribute and is always emitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadMetrics {
/// Thread id (per the field name, the OS-level id).
pub os_tid: u64,
/// Thread name.
pub name: String,
/// Status text.
pub status: String,
/// Status code text.
// NOTE(review): relationship between `status` and `status_code` is not visible here.
pub status_code: String,
/// User CPU figure.
// NOTE(review): unit (presumably seconds) is not stated in this file — confirm.
pub cpu_user: f64,
/// System CPU figure, same unit as `cpu_user`.
pub cpu_sys: f64,
/// Total CPU figure; `ThreadMetrics::new` sets it to `cpu_user + cpu_sys`.
pub cpu_total: f64,
/// Optional CPU percentage; `ThreadMetrics::new` leaves it `None`.
pub cpu_percent: Option<f64>,
/// Optional allocated-bytes figure; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub alloc_bytes: Option<u64>,
/// Optional deallocated-bytes figure; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub dealloc_bytes: Option<u64>,
/// Optional signed memory difference; omitted from output when `None`.
// NOTE(review): presumably `alloc_bytes - dealloc_bytes` — confirm with the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub mem_diff: Option<i64>,
}
impl ThreadMetrics {
pub fn new(
os_tid: u64,
name: String,
status: String,
status_code: String,
cpu_user: f64,
cpu_sys: f64,
) -> Self {
Self {
os_tid,
name,
status,
status_code,
cpu_user,
cpu_sys,
cpu_total: cpu_user + cpu_sys,
cpu_percent: None,
alloc_bytes: None,
dealloc_bytes: None,
mem_diff: None,
}
}
}
/// Serializable payload listing metrics for a set of threads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadsJson {
/// Elapsed time in nanoseconds (per the field name).
// NOTE(review): the reference point of the measurement is not visible in this file.
pub current_elapsed_ns: u64,
/// Sampling interval in milliseconds (per the field name).
pub sample_interval_ms: u64,
/// One metrics record per thread.
pub threads: Vec<ThreadMetrics>,
/// Thread count.
// NOTE(review): presumably equal to `threads.len()`; nothing in this file enforces it.
pub thread_count: usize,
/// Optional RSS figure in bytes (per the field name); omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub rss_bytes: Option<u64>,
}
/// Typed form of the HTTP paths understood by this module.
///
/// `Route::to_path` renders a variant as a path and the `FromStr` impl parses
/// a path back into a variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Route {
/// `/functions_timing`
FunctionsTiming,
/// `/functions_alloc`
FunctionsAlloc,
/// `/channels`
Channels,
/// `/streams`
Streams,
/// `/futures`
Futures,
/// `/threads`
Threads,
/// `/functions_timing/<base64 of function_name>/logs`
FunctionTimingLogs { function_name: String },
/// `/functions_alloc/<base64 of function_name>/logs`
FunctionAllocLogs { function_name: String },
/// `/channels/<channel_id>/logs`
ChannelLogs { channel_id: u64 },
/// `/streams/<stream_id>/logs`
StreamLogs { stream_id: u64 },
/// `/futures/<future_id>/calls`
FutureCalls { future_id: u64 },
}
impl Route {
    /// Renders this route as a URL path, starting with `/`.
    ///
    /// Function names are embedded as base64 text (standard alphabet, with
    /// padding) of their UTF-8 bytes; numeric ids are written in decimal.
    pub fn to_path(&self) -> String {
        use base64::engine::general_purpose::STANDARD;
        use base64::Engine as _;

        // Shared by both function-log routes.
        let encode = |name: &str| STANDARD.encode(name.as_bytes());

        match self {
            Self::FunctionsTiming => String::from("/functions_timing"),
            Self::FunctionsAlloc => String::from("/functions_alloc"),
            Self::Channels => String::from("/channels"),
            Self::Streams => String::from("/streams"),
            Self::Futures => String::from("/futures"),
            Self::Threads => String::from("/threads"),
            Self::FunctionTimingLogs { function_name } => {
                format!("/functions_timing/{}/logs", encode(function_name.as_str()))
            }
            Self::FunctionAllocLogs { function_name } => {
                format!("/functions_alloc/{}/logs", encode(function_name.as_str()))
            }
            Self::ChannelLogs { channel_id } => format!("/channels/{}/logs", channel_id),
            Self::StreamLogs { stream_id } => format!("/streams/{}/logs", stream_id),
            Self::FutureCalls { future_id } => format!("/futures/{}/calls", future_id),
        }
    }

    /// Renders this route as a full `http://localhost:<port><path>` URL.
    pub fn to_url(&self, port: u16) -> String {
        let path = self.to_path();
        format!("http://localhost:{}{}", port, path)
    }
}
/// Matches `/channels/<digits>/logs`; capture group 1 is the channel id text.
static RE_CHANNEL_LOGS: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^/channels/(\d+)/logs$").expect("hard-coded route pattern is a valid regex")
});

/// Matches `/streams/<digits>/logs`; capture group 1 is the stream id text.
static RE_STREAM_LOGS: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^/streams/(\d+)/logs$").expect("hard-coded route pattern is a valid regex")
});

/// Matches `/futures/<digits>/calls`; capture group 1 is the future id text.
static RE_FUTURE_CALLS: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^/futures/(\d+)/calls$").expect("hard-coded route pattern is a valid regex")
});

/// Matches `/functions_timing/<token>/logs`; capture group 1 is the base64
/// token for the function name.
///
/// The token is captured with `.+` rather than `[^/]+` because the standard
/// base64 alphabet contains `/`: a path produced by `Route::to_path` may carry
/// a slash inside the token and must still parse. Invalid tokens are rejected
/// later by base64 decoding.
static RE_FUNCTION_LOGS_TIMING: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^/functions_timing/(.+)/logs$")
        .expect("hard-coded route pattern is a valid regex")
});

/// Matches `/functions_alloc/<token>/logs`; capture group 1 is the base64
/// token for the function name.
///
/// Captured with `.+` for the same reason as the timing pattern: the standard
/// base64 alphabet contains `/`.
static RE_FUNCTION_LOGS_ALLOC: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^/functions_alloc/(.+)/logs$")
        .expect("hard-coded route pattern is a valid regex")
});
/// Decodes standard-alphabet base64 text into a UTF-8 `String`.
///
/// # Errors
///
/// Returns the error's display text when `encoded` is not valid base64 or
/// when the decoded bytes are not valid UTF-8.
fn base64_decode(encoded: &str) -> Result<String, String> {
    use base64::engine::general_purpose::STANDARD;
    use base64::Engine as _;

    match STANDARD.decode(encoded) {
        Ok(raw) => String::from_utf8(raw).map_err(|err| err.to_string()),
        Err(err) => Err(err.to_string()),
    }
}
impl FromStr for Route {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
let path = s.split('?').next().unwrap_or(s);
match path {
"/functions_timing" => return Ok(Route::FunctionsTiming),
"/functions_alloc" => return Ok(Route::FunctionsAlloc),
"/channels" => return Ok(Route::Channels),
"/streams" => return Ok(Route::Streams),
"/futures" => return Ok(Route::Futures),
"/threads" => return Ok(Route::Threads),
_ => {}
}
if let Some(caps) = RE_FUNCTION_LOGS_TIMING.captures(path) {
let function_name = base64_decode(&caps[1]).map_err(|_| ())?;
return Ok(Route::FunctionTimingLogs { function_name });
}
if let Some(caps) = RE_FUNCTION_LOGS_ALLOC.captures(path) {
let function_name = base64_decode(&caps[1]).map_err(|_| ())?;
return Ok(Route::FunctionAllocLogs { function_name });
}
if let Some(caps) = RE_CHANNEL_LOGS.captures(path) {
let channel_id = caps[1].parse().map_err(|_| ())?;
return Ok(Route::ChannelLogs { channel_id });
}
if let Some(caps) = RE_STREAM_LOGS.captures(path) {
let stream_id = caps[1].parse().map_err(|_| ())?;
return Ok(Route::StreamLogs { stream_id });
}
if let Some(caps) = RE_FUTURE_CALLS.captures(path) {
let future_id = caps[1].parse().map_err(|_| ())?;
return Ok(Route::FutureCalls { future_id });
}
Err(())
}
}