systemprompt-logging 0.2.2

Tracing and audit infrastructure for systemprompt.io AI governance. Structured events, five-point audit traces, and SIEM-ready JSON output — part of the MCP governance pipeline.
Documentation
use anyhow::Context;
use chrono::{DateTime, Utc};
use sqlx::PgPool;
use systemprompt_identifiers::{ClientId, ContextId, LogId, SessionId, TaskId, TraceId, UserId};

use crate::models::{LogEntry, LogFilter, LogLevel, LoggingError};

struct LogRow {
    id: String,
    timestamp: DateTime<Utc>,
    level: String,
    module: String,
    message: String,
    metadata: Option<String>,
    user_id: String,
    session_id: String,
    task_id: Option<String>,
    trace_id: String,
    context_id: Option<String>,
    client_id: Option<String>,
}

fn row_to_entry(r: LogRow) -> LogEntry {
    LogEntry {
        id: LogId::new(r.id),
        timestamp: r.timestamp,
        level: r.level.parse().unwrap_or(LogLevel::Info),
        module: r.module,
        message: r.message,
        metadata: r.metadata.as_ref().and_then(|m| {
            serde_json::from_str(m)
                .map_err(|e| {
                    tracing::warn!(error = %e, raw = %m, "Failed to parse log metadata JSON");
                    e
                })
                .ok()
        }),
        user_id: UserId::new(r.user_id),
        session_id: SessionId::new(r.session_id),
        task_id: r.task_id.map(TaskId::new),
        trace_id: TraceId::new(r.trace_id),
        context_id: r.context_id.map(ContextId::new),
        client_id: r.client_id.map(ClientId::new),
    }
}

pub async fn get_log(pool: &PgPool, id: &LogId) -> Result<Option<LogEntry>, LoggingError> {
    let id_str = id.as_str();

    let row = sqlx::query_as!(
        LogRow,
        r#"
        SELECT
            id as "id!", timestamp as "timestamp!", level as "level!", module as "module!",
            message as "message!", metadata, user_id as "user_id!", session_id as "session_id!",
            task_id, trace_id as "trace_id!", context_id, client_id
        FROM logs WHERE id = $1
        "#,
        id_str
    )
    .fetch_optional(pool)
    .await
    .context("Failed to get log by id")?;

    Ok(row.map(row_to_entry))
}

pub async fn list_logs(pool: &PgPool, limit: i64) -> Result<Vec<LogEntry>, LoggingError> {
    let rows = sqlx::query_as!(
        LogRow,
        r#"
        SELECT
            id as "id!", timestamp as "timestamp!", level as "level!", module as "module!",
            message as "message!", metadata, user_id as "user_id!", session_id as "session_id!",
            task_id, trace_id as "trace_id!", context_id, client_id
        FROM logs ORDER BY timestamp DESC LIMIT $1
        "#,
        limit
    )
    .fetch_all(pool)
    .await
    .context("Failed to list logs")?;

    Ok(rows.into_iter().map(row_to_entry).collect())
}

pub async fn list_logs_paginated(
    pool: &PgPool,
    filter: &LogFilter,
) -> Result<(Vec<LogEntry>, i64), LoggingError> {
    let (offset, per_page) = calculate_pagination(filter);
    let level_filter = filter.level();
    let module_filter = filter.module();
    let message_pattern = filter.message().map(|m| format!("%{m}%"));
    let since_filter = filter.since();

    let rows = sqlx::query_as!(
        LogRow,
        r#"
        SELECT
            id as "id!", timestamp as "timestamp!", level as "level!", module as "module!",
            message as "message!", metadata, user_id as "user_id!", session_id as "session_id!",
            task_id, trace_id as "trace_id!", context_id, client_id
        FROM logs
        WHERE ($1::VARCHAR IS NULL OR level = $1)
        AND ($2::VARCHAR IS NULL OR module = $2)
        AND ($3::VARCHAR IS NULL OR message LIKE $3)
        AND ($4::TIMESTAMPTZ IS NULL OR timestamp >= $4)
        ORDER BY timestamp DESC LIMIT $5 OFFSET $6
        "#,
        level_filter,
        module_filter,
        message_pattern,
        since_filter,
        per_page,
        offset
    )
    .fetch_all(pool)
    .await
    .context("Failed to get paginated logs")?;

    let count = fetch_filtered_count(
        pool,
        level_filter.map(ToString::to_string),
        module_filter.map(ToString::to_string),
        message_pattern,
        since_filter,
    )
    .await?;
    let entries = rows.into_iter().map(row_to_entry).collect();

    Ok((entries, count))
}

fn calculate_pagination(filter: &LogFilter) -> (i64, i64) {
    let offset = i64::from(
        filter
            .page()
            .saturating_sub(1)
            .saturating_mul(filter.per_page()),
    );
    let per_page = i64::from(filter.per_page());
    (offset, per_page)
}

async fn fetch_filtered_count(
    pool: &PgPool,
    level: Option<String>,
    module: Option<String>,
    message_pattern: Option<String>,
    since: Option<DateTime<Utc>>,
) -> Result<i64, LoggingError> {
    sqlx::query_scalar!(
        r#"
        SELECT COUNT(*) as "count!" FROM logs
        WHERE ($1::VARCHAR IS NULL OR level = $1)
        AND ($2::VARCHAR IS NULL OR module = $2)
        AND ($3::VARCHAR IS NULL OR message LIKE $3)
        AND ($4::TIMESTAMPTZ IS NULL OR timestamp >= $4)
        "#,
        level,
        module,
        message_pattern,
        since
    )
    .fetch_one(pool)
    .await
    .context("Failed to count logs")
    .map_err(Into::into)
}

pub async fn list_logs_by_module_patterns(
    pool: &PgPool,
    patterns: &[String],
    limit: i64,
) -> Result<Vec<LogEntry>, LoggingError> {
    let rows = sqlx::query_as!(
        LogRow,
        r#"
        SELECT
            id as "id!", timestamp as "timestamp!", level as "level!", module as "module!",
            message as "message!", metadata, user_id as "user_id!", session_id as "session_id!",
            task_id, trace_id as "trace_id!", context_id, client_id
        FROM logs
        WHERE module LIKE ANY($1)
        ORDER BY timestamp DESC LIMIT $2
        "#,
        patterns,
        limit
    )
    .fetch_all(pool)
    .await
    .context("Failed to list logs by module patterns")?;

    Ok(rows.into_iter().map(row_to_entry).collect())
}