systemprompt-logging 0.2.0

Core logging module for systemprompt.io OS
Documentation
use anyhow::Result;
use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;

use super::models::{AiRequestSummary, TraceEvent};

pub use super::step_queries::{
    fetch_execution_step_events, fetch_execution_step_summary, fetch_mcp_execution_events,
    fetch_mcp_execution_summary, fetch_task_id_for_trace,
};

pub async fn fetch_log_events(pool: &Arc<PgPool>, trace_id: &str) -> Result<Vec<TraceEvent>> {
    let rows = sqlx::query!(
        r#"
        SELECT
            timestamp,
            level as type,
            CONCAT(module, ': ', message) as details,
            user_id,
            session_id,
            task_id,
            context_id,
            metadata
        FROM logs
        WHERE trace_id = $1
        ORDER BY timestamp ASC
        "#,
        trace_id
    )
    .fetch_all(&**pool)
    .await?;

    Ok(rows
        .into_iter()
        .map(|row| TraceEvent {
            event_type: row.r#type,
            timestamp: row.timestamp,
            details: row.details.unwrap_or_else(String::new),
            user_id: row.user_id.map(Into::into),
            session_id: row.session_id.map(Into::into),
            task_id: row.task_id.map(Into::into),
            context_id: row.context_id.map(Into::into),
            metadata: row.metadata,
        })
        .collect())
}

pub async fn fetch_ai_request_summary(
    pool: &Arc<PgPool>,
    trace_id: &str,
) -> Result<AiRequestSummary> {
    let row = sqlx::query!(
        r#"
        SELECT
            COALESCE(SUM(cost_microdollars), 0)::bigint as total_cost_microdollars,
            COALESCE(SUM(COALESCE(input_tokens, 0) + COALESCE(output_tokens, 0)), 0)::bigint as total_tokens,
            COALESCE(SUM(input_tokens), 0)::bigint as total_input_tokens,
            COALESCE(SUM(output_tokens), 0)::bigint as total_output_tokens,
            COUNT(*)::bigint as request_count,
            COALESCE(SUM(latency_ms), 0)::bigint as total_latency_ms
        FROM ai_requests
        WHERE trace_id = $1
        "#,
        trace_id
    )
    .fetch_one(&**pool)
    .await?;

    Ok(AiRequestSummary {
        total_cost_microdollars: row.total_cost_microdollars.unwrap_or(0),
        total_tokens: row.total_tokens.unwrap_or(0),
        total_input_tokens: row.total_input_tokens.unwrap_or(0),
        total_output_tokens: row.total_output_tokens.unwrap_or(0),
        request_count: row.request_count.unwrap_or(0),
        total_latency_ms: row.total_latency_ms.unwrap_or(0),
    })
}

pub async fn fetch_ai_request_events(
    pool: &Arc<PgPool>,
    trace_id: &str,
) -> Result<Vec<TraceEvent>> {
    let rows = sqlx::query!(
        r#"
        SELECT
            created_at as timestamp,
            provider,
            model,
            input_tokens,
            output_tokens,
            cost_microdollars,
            latency_ms,
            status,
            user_id,
            session_id,
            task_id,
            context_id
        FROM ai_requests
        WHERE trace_id = $1
        ORDER BY created_at ASC
        "#,
        trace_id
    )
    .fetch_all(&**pool)
    .await?;

    Ok(rows
        .into_iter()
        .map(|row| {
            let details = format!(
                "{}/{}: {} (in:{}, out:{}, {}ms)",
                row.provider,
                row.model,
                row.status,
                row.input_tokens.unwrap_or(0),
                row.output_tokens.unwrap_or(0),
                row.latency_ms.unwrap_or(0)
            );

            let metadata = json!({
                "cost_microdollars": row.cost_microdollars,
                "latency_ms": row.latency_ms,
                "input_tokens": row.input_tokens,
                "output_tokens": row.output_tokens,
                "tokens_used": row.input_tokens.unwrap_or(0) + row.output_tokens.unwrap_or(0),
                "provider": row.provider,
                "model": row.model
            });

            TraceEvent {
                event_type: "AI".to_string(),
                timestamp: row.timestamp,
                details,
                user_id: Some(row.user_id.into()),
                session_id: row.session_id.map(Into::into),
                task_id: row.task_id.map(Into::into),
                context_id: row.context_id.map(Into::into),
                metadata: Some(metadata.to_string()),
            }
        })
        .collect())
}