use std::{collections::HashMap, sync::OnceLock, sync::RwLock, time::Duration};
use arc_swap::ArcSwapOption;
use crossbeam_channel::{bounded, Sender};
use crate::{http_server::RECV_TIMEOUT_MS, FunctionLogsJson, FunctionsJson};
cfg_if::cfg_if! {
if #[cfg(feature = "hotpath-alloc")] {
pub mod alloc;
use alloc::state::FunctionsState;
use tokio::runtime::{Handle, RuntimeFlavor};
pub use alloc::guard::{MeasurementGuard, MeasurementGuardWithLog};
pub use alloc::state::FunctionStats;
} else {
pub mod timing;
use timing::state::FunctionsState;
pub use timing::guard::{MeasurementGuard, MeasurementGuardWithLog};
pub use timing::state::FunctionStats;
}
}
pub(crate) const MAX_RESULT_LEN: usize = 1536;
impl MeasurementGuard {
pub fn build(measurement_name: &'static str, wrapper: bool, _is_async: bool) -> Self {
#[allow(clippy::needless_bool)]
let unsupported_async = if wrapper {
false
} else {
cfg_if::cfg_if! {
if #[cfg(feature = "hotpath-alloc")] {
if _is_async {
match Handle::try_current() {
Ok(h) => h.runtime_flavor() != RuntimeFlavor::CurrentThread,
Err(_) => true,
}
} else {
false
}
} else {
false
}
}
};
MeasurementGuard::new(measurement_name, wrapper, unsupported_async)
}
}
impl MeasurementGuardWithLog {
pub fn build(measurement_name: &'static str, wrapper: bool, _is_async: bool) -> Self {
#[allow(clippy::needless_bool)]
let unsupported_async = if wrapper {
false
} else {
cfg_if::cfg_if! {
if #[cfg(feature = "hotpath-alloc")] {
if _is_async {
match Handle::try_current() {
Ok(h) => h.runtime_flavor() != RuntimeFlavor::CurrentThread,
Err(_) => true,
}
} else {
false
}
} else {
false
}
}
};
MeasurementGuardWithLog::new(measurement_name, wrapper, unsupported_async)
}
}
#[doc(hidden)]
#[inline]
pub fn measure_with_log<T: std::fmt::Debug, F: FnOnce() -> T>(
name: &'static str,
wrapper: bool,
is_async: bool,
f: F,
) -> T {
let guard = MeasurementGuardWithLog::build(name, wrapper, is_async);
let result = f();
guard.finish_with_result(&result);
result
}
#[doc(hidden)]
pub async fn measure_with_log_async<T: std::fmt::Debug, F, Fut>(name: &'static str, f: F) -> T
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = T>,
{
let guard = MeasurementGuardWithLog::build(name, false, true);
let result = f().await;
guard.finish_with_result(&result);
result
}
pub(crate) fn truncate_result(s: String) -> String {
if s.len() <= MAX_RESULT_LEN {
s
} else {
format!("{}...", &s[..MAX_RESULT_LEN.saturating_sub(3)])
}
}
pub(crate) static FUNCTIONS_STATE: OnceLock<ArcSwapOption<RwLock<FunctionsState>>> =
OnceLock::new();
pub mod guard;
pub(crate) enum FunctionsQuery {
Timing(Sender<FunctionsJson>),
Alloc(Sender<Option<FunctionsJson>>),
LogsTiming {
function_name: String,
response_tx: Sender<Option<FunctionLogsJson>>,
},
LogsAlloc {
function_name: String,
response_tx: Sender<Option<FunctionLogsJson>>,
},
}
pub(crate) fn get_functions_timing_json() -> FunctionsJson {
if let Some(metrics) = try_get_functions_timing_from_worker() {
return metrics;
}
FunctionsJson {
hotpath_profiling_mode: crate::output::ProfilingMode::Timing,
total_elapsed: 0,
description: "No timing data available yet".to_string(),
caller_name: "hotpath".to_string(),
percentiles: vec![95],
data: crate::output::FunctionsDataJson(HashMap::new()),
}
}
pub(crate) fn get_function_logs_timing(function_name: &str) -> Option<FunctionLogsJson> {
let arc_swap = FUNCTIONS_STATE.get()?;
let state_option = arc_swap.load();
let state_arc = (*state_option).as_ref()?.clone();
let state_guard = state_arc.read().ok()?;
let (response_tx, response_rx) = bounded::<Option<FunctionLogsJson>>(1);
if let Some(query_tx) = &state_guard.query_tx {
query_tx
.send(FunctionsQuery::LogsTiming {
function_name: function_name.to_string(),
response_tx,
})
.ok()?;
drop(state_guard);
response_rx
.recv_timeout(Duration::from_millis(RECV_TIMEOUT_MS))
.ok()
.flatten()
} else {
None
}
}
fn try_get_functions_timing_from_worker() -> Option<FunctionsJson> {
let arc_swap = FUNCTIONS_STATE.get()?;
let state_option = arc_swap.load();
let state_arc = (*state_option).as_ref()?.clone();
let state_guard = state_arc.read().ok()?;
let (response_tx, response_rx) = bounded::<FunctionsJson>(1);
if let Some(query_tx) = &state_guard.query_tx {
query_tx.send(FunctionsQuery::Timing(response_tx)).ok()?;
drop(state_guard);
response_rx
.recv_timeout(Duration::from_millis(RECV_TIMEOUT_MS))
.ok()
} else {
None
}
}
pub(crate) fn get_functions_alloc_json() -> Option<FunctionsJson> {
let arc_swap = FUNCTIONS_STATE.get()?;
let state_option = arc_swap.load();
let state_arc = (*state_option).as_ref()?.clone();
let state_guard = state_arc.read().ok()?;
let (response_tx, response_rx) = bounded::<Option<FunctionsJson>>(1);
if let Some(query_tx) = &state_guard.query_tx {
query_tx.send(FunctionsQuery::Alloc(response_tx)).ok()?;
drop(state_guard);
response_rx
.recv_timeout(Duration::from_millis(RECV_TIMEOUT_MS))
.ok()
.flatten()
} else {
None
}
}
pub(crate) fn get_function_logs_alloc(function_name: &str) -> Option<FunctionLogsJson> {
let arc_swap = FUNCTIONS_STATE.get()?;
let state_option = arc_swap.load();
let state_arc = (*state_option).as_ref()?.clone();
let state_guard = state_arc.read().ok()?;
let (response_tx, response_rx) = bounded::<Option<FunctionLogsJson>>(1);
if let Some(query_tx) = &state_guard.query_tx {
query_tx
.send(FunctionsQuery::LogsAlloc {
function_name: function_name.to_string(),
response_tx,
})
.ok()?;
drop(state_guard);
response_rx
.recv_timeout(Duration::from_millis(RECV_TIMEOUT_MS))
.ok()
.flatten()
} else {
None
}
}