//! hotpath 0.15.0
//!
//! Simple async Rust profiler with memory and data-flow insights - quickly
//! find and debug performance bottlenecks.
//!
//! Documentation
use crate::debug::dbg::{get_dbg_logs, get_debug_entries_json};
use crate::debug::gauge::get_debug_gauge_logs;
use crate::debug::val::get_val_logs;
use crate::functions::{
    get_function_logs_alloc, get_function_logs_timing, get_functions_alloc_json,
    get_functions_timing_json,
};
use crate::json::Route;
use crate::json::{
    JsonChannelLogsList, JsonFunctionAllocLogsList, JsonFunctionTimingLogsList, JsonFutureLogsList,
    JsonProfilerStatus, JsonStreamLogsList,
};
use crate::lib_on::START_TIME;
use crate::output::format_duration;
use std::sync::LazyLock;

/// TCP port the metrics HTTP server binds to. Read once, lazily, from the
/// `HOTPATH_METRICS_PORT` environment variable; falls back to 6770 when the
/// variable is unset or not a valid `u16`.
pub(crate) static METRICS_SERVER_PORT: LazyLock<u16> = LazyLock::new(|| {
    match std::env::var("HOTPATH_METRICS_PORT") {
        Ok(raw) => raw.parse::<u16>().unwrap_or(6770),
        Err(_) => 6770,
    }
});

/// True when the user opted out of the metrics server by setting
/// `HOTPATH_METRICS_SERVER_OFF` to `"true"` or `"1"`; any other value, or an
/// unset variable, leaves the server enabled.
pub(crate) static METRICS_SERVER_DISABLED: LazyLock<bool> = LazyLock::new(|| {
    std::env::var("HOTPATH_METRICS_SERVER_OFF")
        .map(|v| matches!(v.as_str(), "true" | "1"))
        .unwrap_or(false)
});

pub(crate) static RECV_TIMEOUT_MS: u64 = 250;

/// 404 body used by the `Route::TokioRuntime` handler when runtime metrics are
/// unavailable — either collection was never started or the `tokio` feature is
/// compiled out.
const TOKIO_RUNTIME_HINT: &str =
    "Tokio runtime metrics not available - use hotpath::tokio_runtime!() to start collection";

use crate::channels::get_channel_logs;
use crate::data_flow::get_data_flow_json;
use crate::futures::get_future_logs_list;
use crate::streams::get_stream_logs;
use serde::Serialize;
use std::fmt::Display;
use std::sync::OnceLock;
use std::thread;
use tiny_http::{Header, Request, Response, Server};

// Set exactly once when the server thread is spawned; makes
// `start_metrics_server_once` idempotent across repeated calls.
static HTTP_SERVER_STARTED: OnceLock<()> = OnceLock::new();
// First startup failure message (e.g. port already bound), surfaced to the
// rest of the crate via `get_metrics_server_error`.
static METRICS_SERVER_ERROR: OnceLock<String> = OnceLock::new();

/// Returns the metrics-server startup error message, if startup ever failed.
pub(crate) fn get_metrics_server_error() -> Option<&'static str> {
    Some(METRICS_SERVER_ERROR.get()?.as_str())
}

/// Spawns the metrics HTTP server at most once per process, unless the user
/// disabled it via `HOTPATH_METRICS_SERVER_OFF`. Safe to call repeatedly.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
pub(crate) fn start_metrics_server_once(port: u16) {
    if !*METRICS_SERVER_DISABLED {
        HTTP_SERVER_STARTED.get_or_init(|| start_metrics_server(port));
    }
}

/// Spawns the background thread that serves metrics over HTTP on
/// `127.0.0.1:<port>`. Binding failures are reported (stderr + the
/// `METRICS_SERVER_ERROR` slot) rather than propagated; only a failure to
/// spawn the thread itself panics.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
fn start_metrics_server(port: u16) {
    #[cfg(feature = "threads")]
    crate::threads::init_threads_monitoring();

    thread::Builder::new()
        .name("hp-server".into())
        .spawn(move || {
            // Keep the server thread's own allocations out of the profile.
            let _suspend = crate::lib_on::SuspendAllocTracking::new();
            let addr = format!("127.0.0.1:{}", port);
            match Server::http(&addr) {
                Ok(server) => {
                    // Blocks for the lifetime of the process, serving one
                    // request at a time.
                    for incoming in server.incoming_requests() {
                        handle_request(incoming);
                    }
                }
                Err(e) => {
                    let msg = format!(
                        "{} busy ({}), skipping metrics server start. Use HOTPATH_METRICS_PORT to change the port.",
                        addr, e
                    );
                    eprintln!("[hotpath - error] {}", msg);
                    let _ = METRICS_SERVER_ERROR.set(msg);
                }
            }
        })
        .expect("Failed to spawn HTTP metrics server thread");
}

/// Routes a single HTTP request: parses the URL into a `Route` and answers
/// with JSON on success or a JSON error body (404) when the target entity is
/// missing, the required feature is compiled out, or the path is unknown.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
fn handle_request(request: Request) {
    let path = request.url();

    match path.parse::<Route>() {
        // Aggregate per-function stats.
        Ok(Route::FunctionsTiming) => {
            let formatted = get_functions_timing_json();
            respond_json(request, &formatted);
        }
        // `None` here means the crate was built without allocation tracking.
        Ok(Route::FunctionsAlloc) => match get_functions_alloc_json() {
            Some(formatted) => respond_json(request, &formatted),
            None => respond_error(
                request,
                404,
                "Memory profiling not available - enable hotpath-alloc feature",
            ),
        },
        // Per-function log listings; timestamps are reported relative to the
        // profiler's current uptime (see `get_current_elapsed_ns`).
        Ok(Route::FunctionTimingLogs { function_id }) => {
            match get_function_logs_timing(function_id) {
                Some(logs) => {
                    let formatted =
                        JsonFunctionTimingLogsList::from_logs(&logs, get_current_elapsed_ns());
                    respond_json(request, &formatted);
                }
                None => respond_error(
                    request,
                    404,
                    &format!("Function with id {} not found", function_id),
                ),
            }
        }
        Ok(Route::FunctionAllocLogs { function_id }) => {
            match get_function_logs_alloc(function_id) {
                Some(logs) => {
                    let formatted =
                        JsonFunctionAllocLogsList::from_logs(&logs, get_current_elapsed_ns());
                    respond_json(request, &formatted);
                }
                None => respond_error(
                    request,
                    404,
                    "Memory profiling not available - enable hotpath-alloc feature",
                ),
            }
        }
        // Debug and data-flow snapshots.
        Ok(Route::Debug) => {
            let debug_stats = get_debug_entries_json();
            respond_json(request, &debug_stats);
        }
        Ok(Route::DataFlow) => {
            let data_flow = get_data_flow_json();
            respond_json(request, &data_flow);
        }
        // Channel/stream lookups take stringified ids; future lookups take
        // the id directly.
        Ok(Route::DataFlowChannelLogs { channel_id }) => {
            match get_channel_logs(&channel_id.to_string()) {
                Some(logs) => {
                    let formatted = JsonChannelLogsList::from_logs(&logs, get_current_elapsed_ns());
                    respond_json(request, &formatted);
                }
                None => respond_error(request, 404, "Channel not found"),
            }
        }
        Ok(Route::DataFlowStreamLogs { stream_id }) => {
            match get_stream_logs(&stream_id.to_string()) {
                Some(logs) => {
                    let formatted = JsonStreamLogsList::from_logs(&logs, get_current_elapsed_ns());
                    respond_json(request, &formatted);
                }
                None => respond_error(request, 404, "Stream not found"),
            }
        }
        Ok(Route::DataFlowFutureLogs { future_id }) => match get_future_logs_list(future_id) {
            Some(calls) => {
                let formatted = JsonFutureLogsList::from(&calls);
                respond_json(request, &formatted);
            }
            None => respond_error(request, 404, "Future not found"),
        },
        // Individual debug-entry log streams (dbg!/val/gauge).
        Ok(Route::DebugDbgLogs { id }) => match get_dbg_logs(id) {
            Some(formatted) => respond_json(request, &formatted),
            None => respond_error(request, 404, "Debug entry not found"),
        },
        Ok(Route::DebugValLogs { id }) => match get_val_logs(id) {
            Some(formatted) => respond_json(request, &formatted),
            None => respond_error(request, 404, "Value entry not found"),
        },
        Ok(Route::DebugGaugeLogs { id }) => match get_debug_gauge_logs(id) {
            Some(logs) => respond_json(request, &logs),
            None => respond_error(request, 404, "Gauge entry not found"),
        },
        // Feature-gated endpoints: both arms exist so the route still gets a
        // well-formed 404 when the feature is compiled out.
        #[cfg(feature = "threads")]
        Ok(Route::Threads) => {
            let threads = crate::threads::get_threads_json();
            respond_json(request, &threads);
        }
        #[cfg(not(feature = "threads"))]
        Ok(Route::Threads) => {
            respond_error(
                request,
                404,
                "Thread monitoring not available - enable threads feature",
            );
        }
        Ok(Route::TokioRuntime) => {
            #[cfg(feature = "tokio")]
            match crate::tokio_runtime::get_runtime_json() {
                Some(snapshot) => respond_json(request, &snapshot),
                None => respond_error(request, 404, TOKIO_RUNTIME_HINT),
            }
            #[cfg(not(feature = "tokio"))]
            respond_error(request, 404, TOKIO_RUNTIME_HINT);
        }
        // Lightweight liveness/uptime probe.
        Ok(Route::ProfilerStatus) => {
            let status = JsonProfilerStatus {
                uptime: format_duration(get_current_elapsed_ns()),
            };
            respond_json(request, &status);
        }
        Err(_) => respond_error(request, 404, "Not found"),
    }
}

/// Nanoseconds elapsed since the profiler started, or 0 if `START_TIME` was
/// never initialized.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
fn get_current_elapsed_ns() -> u64 {
    match START_TIME.get() {
        Some(start) => start.elapsed().as_nanos() as u64,
        None => 0,
    }
}

/// Serializes `value` and answers with a 200 `application/json` response;
/// serialization failures are turned into a 500 via `respond_internal_error`.
/// The result of `respond` is deliberately ignored — a client that hung up is
/// not an error worth surfacing.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
fn respond_json<T: Serialize>(request: Request, value: &T) {
    let body = match serde_json::to_vec(value) {
        Ok(bytes) => bytes,
        Err(e) => return respond_internal_error(request, e),
    };
    let content_type =
        Header::from_bytes(b"Content-Type".as_slice(), b"application/json".as_slice()).unwrap();
    let mut response = Response::from_data(body);
    response.add_header(content_type);
    let _ = request.respond(response);
}

/// Answers with `code` and a JSON body of the form `{"error":"<msg>"}`.
///
/// Bug fix: `msg` was previously spliced into the hand-built JSON verbatim,
/// so any message containing `"` or `\` (e.g. an OS error string forwarded by
/// the bind-failure path) produced an invalid JSON body. Escape both
/// characters before interpolation; backslashes first so quote escapes are
/// not double-escaped.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
fn respond_error(request: Request, code: u16, msg: &str) {
    let escaped = msg.replace('\\', "\\\\").replace('"', "\\\"");
    let body = format!(r#"{{"error":"{}"}}"#, escaped);
    let mut response = Response::from_string(body).with_status_code(code);
    response.add_header(
        Header::from_bytes(b"Content-Type".as_slice(), b"application/json".as_slice()).unwrap(),
    );
    let _ = request.respond(response);
}

/// Logs `e` to stderr and answers with a plain-text 500; a failed `respond`
/// (client gone) is ignored.
fn respond_internal_error(request: Request, e: impl Display) {
    eprintln!("Internal server error: {}", e);
    let body = format!("Internal server error: {}", e);
    let response = Response::from_string(body).with_status_code(500);
    let _ = request.respond(response);
}