newton-aggregator 0.4.13

newton prover aggregator utils
Documentation
//! Supervised task spawning with panic logging.
//!
//! Local mirror of `crates/gateway/src/task_supervisor.rs`, kept here to avoid
//! a circular `aggregator → gateway` crate dependency. The aggregator binary
//! crate (`bin/newton-prover-gateway`) is composed of the gateway crate which
//! itself depends on this aggregator crate, so we cannot reach back the other
//! way for shared infrastructure.
//!
//! Use [`spawn_monitored`] for any background loop the aggregator owns. A panic
//! inside a bare `tokio::spawn` is reported only on stderr (see the "Bare
//! `tokio::spawn` for background loops" anti-pattern in `.claude/rules/rust.md`):
//! it is invisible to Datadog, and the surrounding service has no observability
//! of the failure.

use futures_util::FutureExt;
use std::{future::Future, panic::AssertUnwindSafe};
use tokio::task::JoinHandle;
use tracing::{error, info};

/// Spawns a monitored task with panic logging. No restart.
///
/// Wraps the future in [`AssertUnwindSafe`] + `catch_unwind` so a panic in the
/// supervised loop produces a structured `tracing::error!` event with the
/// task name and panic payload, rather than vanishing into stderr. The
/// returned [`JoinHandle`] completes whenever the inner task exits — cleanly
/// or via panic.
pub fn spawn_monitored<Fut>(name: &'static str, fut: Fut) -> JoinHandle<()>
where
    Fut: Future<Output = ()> + Send + 'static,
{
    tokio::spawn(async move {
        let result = AssertUnwindSafe(fut).catch_unwind().await;
        match result {
            Ok(()) => {
                info!(task = name, "monitored task exited cleanly");
            }
            Err(panic_info) => {
                let panic_msg = extract_panic_message(&panic_info);
                error!(
                    task = name,
                    panic = %panic_msg,
                    "monitored task panicked"
                );
            }
        }
    })
}

/// Extract a human-readable message from a panic payload.
///
/// Payloads produced by `panic!` are typically a `&'static str` or a `String`,
/// and both are returned verbatim. Any other payload type (for example a value
/// passed to `std::panic::panic_any`) has no textual form, so a fixed
/// placeholder is returned. `Debug` on a `dyn Any` would only render the
/// uninformative `Any { .. }`.
///
/// The parameter is deliberately `&Box<dyn Any + Send>` rather than
/// `&(dyn Any + Send)`. Coercing a caller's `&Box<dyn Any + Send>` to
/// `&dyn Any` would unsize the `Box` itself into the trait object, and every
/// downcast below would then miss.
fn extract_panic_message(panic_info: &Box<dyn std::any::Any + Send>) -> String {
    // Downcast against the payload stored inside the box, not the box itself.
    let payload: &(dyn std::any::Any + Send) = &**panic_info;
    if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else if let Some(s) = payload.downcast_ref::<&str>() {
        (*s).to_owned()
    } else {
        String::from("non-string panic payload")
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn clean_exit_logs_info_and_completes() {
        spawn_monitored("test_clean", async {})
            .await
            .expect("monitored handle joins cleanly");
    }

    #[tokio::test]
    async fn panic_is_caught_and_handle_completes() {
        // The wrapper must turn a panicking body into a log event, so awaiting
        // the handle yields Ok(()) rather than surfacing the panic to the
        // parent task as a JoinError.
        spawn_monitored("test_panic", async {
            panic!("test panic message");
        })
        .await
        .expect("panic absorbed; handle completes Ok");
    }

    #[test]
    fn extract_panic_message_handles_string_and_str() {
        // Each entry pairs a boxed panic payload with the message it should yield.
        let cases: [(Box<dyn std::any::Any + Send>, &str); 2] = [
            (Box::new(String::from("owned")), "owned"),
            (Box::new("static"), "static"),
        ];
        for (payload, expected) in &cases {
            assert_eq!(extract_panic_message(payload), *expected);
        }
    }
}