//! hotpath 0.15.0
//!
//! Simple async Rust profiler with memory and data-flow insights — quickly
//! find and debug performance bottlenecks.
//!
//! Documentation
use std::sync::OnceLock;
use std::time::Duration;

use tokio::runtime::Handle;

use crate::json::{JsonRuntimeSnapshot, JsonRuntimeWorker};

// One-shot guard: set exactly once so the monitoring thread is spawned at
// most once per process, no matter how often `init_runtime_monitoring` runs.
static RUNTIME_STATE: OnceLock<()> = OnceLock::new();

// Sampling period (1s) used when `HOTPATH_TOKIO_RUNTIME_INTERVAL_MS` is
// unset or does not parse as a `u64`.
const DEFAULT_RUNTIME_INTERVAL_MS: u64 = 1000;

/// Starts the background Tokio runtime-metrics sampler (at most once per process).
///
/// The sampling interval is taken from the `HOTPATH_TOKIO_RUNTIME_INTERVAL_MS`
/// environment variable (milliseconds), falling back to
/// `DEFAULT_RUNTIME_INTERVAL_MS` when it is unset or fails to parse as `u64`.
///
/// Subsequent calls are no-ops: `RUNTIME_STATE` is a `OnceLock`, so only one
/// `hp-runtime` thread is ever spawned.
///
/// # Panics
///
/// Panics if the OS refuses to spawn the monitoring thread.
pub fn init_runtime_monitoring(handle: &Handle) {
    RUNTIME_STATE.get_or_init(|| {
        // Clone only inside the init closure: the original cloned the handle
        // unconditionally, paying for a clone even on calls that hit the
        // already-initialized fast path.
        let handle = handle.clone();

        let interval_ms = std::env::var("HOTPATH_TOKIO_RUNTIME_INTERVAL_MS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(DEFAULT_RUNTIME_INTERVAL_MS);

        let interval = Duration::from_millis(interval_ms);

        std::thread::Builder::new()
            .name("hp-runtime".into())
            .spawn(move || {
                runtime_loop(handle, interval);
            })
            .expect("Failed to spawn hp-runtime thread");
    });
}

/// Sampling loop run on the dedicated `hp-runtime` thread: periodically
/// snapshots the runtime's metrics and publishes the result through the
/// shared snapshot slot. Never returns.
fn runtime_loop(handle: Handle, interval: Duration) {
    loop {
        let current = snapshot_from_metrics(&handle.metrics());

        // Best-effort publish: a poisoned lock is skipped silently and the
        // loop simply tries again on the next tick.
        if let Ok(mut slot) = get_snapshot_lock().write() {
            *slot = Some(current);
        }

        std::thread::sleep(interval);
    }
}

/// Builds a `JsonRuntimeSnapshot` from the runtime's current metrics.
///
/// Fields backed by the unstable metrics API stay `None` on stable builds
/// and are populated only when compiled with `--cfg tokio_unstable`.
fn snapshot_from_metrics(m: &tokio::runtime::RuntimeMetrics) -> JsonRuntimeSnapshot {
    let worker_count = m.num_workers();

    let mut workers = Vec::with_capacity(worker_count);
    for idx in 0..worker_count {
        // `mut` is only exercised by the tokio_unstable block below.
        #[allow(unused_mut)]
        let mut worker = JsonRuntimeWorker {
            index: idx,
            park_count: m.worker_park_count(idx),
            busy_duration_ms: m.worker_total_busy_duration(idx).as_millis() as u64,
            poll_count: None,
            steal_count: None,
            steal_operations: None,
            overflow_count: None,
            local_queue_depth: None,
            mean_poll_time_us: None,
        };

        #[cfg(tokio_unstable)]
        {
            worker.poll_count = Some(m.worker_poll_count(idx));
            worker.steal_count = Some(m.worker_steal_count(idx));
            worker.steal_operations = Some(m.worker_steal_operations(idx));
            worker.overflow_count = Some(m.worker_overflow_count(idx));
            worker.local_queue_depth = Some(m.worker_local_queue_depth(idx));
            worker.mean_poll_time_us = Some(m.worker_mean_poll_time(idx).as_micros() as u64);
        }

        workers.push(worker);
    }

    // `mut` is only exercised by the tokio_unstable block below.
    #[allow(unused_mut)]
    let mut snapshot = JsonRuntimeSnapshot {
        num_workers: worker_count,
        num_alive_tasks: m.num_alive_tasks(),
        global_queue_depth: m.global_queue_depth(),
        workers,
        num_blocking_threads: None,
        num_idle_blocking_threads: None,
        blocking_queue_depth: None,
        spawned_tasks_count: None,
        remote_schedule_count: None,
        io_driver_fd_registered_count: None,
        io_driver_fd_deregistered_count: None,
        io_driver_ready_count: None,
    };

    #[cfg(tokio_unstable)]
    {
        snapshot.num_blocking_threads = Some(m.num_blocking_threads());
        snapshot.num_idle_blocking_threads = Some(m.num_idle_blocking_threads());
        snapshot.blocking_queue_depth = Some(m.blocking_queue_depth());
        snapshot.spawned_tasks_count = Some(m.spawned_tasks_count());
        snapshot.remote_schedule_count = Some(m.remote_schedule_count());
        snapshot.io_driver_fd_registered_count = Some(m.io_driver_fd_registered_count());
        snapshot.io_driver_fd_deregistered_count = Some(m.io_driver_fd_deregistered_count());
        snapshot.io_driver_ready_count = Some(m.io_driver_ready_count());
    }

    snapshot
}

// Most recently published snapshot, written by the sampler thread and read
// on demand; `None` until the first sample lands.
static LATEST_SNAPSHOT: OnceLock<std::sync::RwLock<Option<JsonRuntimeSnapshot>>> = OnceLock::new();

/// Lazily initializes and returns the shared snapshot slot.
/// `RwLock<Option<_>>::default()` is `RwLock::new(None)`.
fn get_snapshot_lock() -> &'static std::sync::RwLock<Option<JsonRuntimeSnapshot>> {
    LATEST_SNAPSHOT.get_or_init(Default::default)
}

/// Returns a clone of the most recent runtime snapshot, or `None` when no
/// sample has been published yet or the lock is poisoned.
pub(crate) fn get_runtime_json() -> Option<JsonRuntimeSnapshot> {
    let guard = get_snapshot_lock().read().ok()?;
    guard.clone()
}