systemprompt-analytics 0.1.18

Analytics module for systemprompt.io - session tracking, metrics, and reporting
Documentation
use anyhow::Result;
use chrono::{Duration, Utc};
use sqlx::PgPool;
use systemprompt_identifiers::{SessionId, UserId};

use super::types::CreateSessionParams;

pub async fn update_activity(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let id = session_id.as_str();
    sqlx::query!(
        r#"
        UPDATE user_sessions
        SET last_activity_at = CURRENT_TIMESTAMP,
            duration_seconds = EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at))::INTEGER
        WHERE session_id = $1
        "#,
        id
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn increment_request_count(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let id = session_id.as_str();
    sqlx::query!(
        r#"
        UPDATE user_sessions
        SET request_count = request_count + 1,
            last_activity_at = CURRENT_TIMESTAMP,
            duration_seconds = EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at))::INTEGER
        WHERE session_id = $1
        "#,
        id
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn increment_task_count(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let id = session_id.as_str();
    sqlx::query!(
        "UPDATE user_sessions SET task_count = task_count + 1, last_activity_at = \
         CURRENT_TIMESTAMP WHERE session_id = $1",
        id
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn increment_ai_request_count(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let id = session_id.as_str();
    sqlx::query!(
        "UPDATE user_sessions SET ai_request_count = ai_request_count + 1, last_activity_at = \
         CURRENT_TIMESTAMP WHERE session_id = $1",
        id
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn increment_message_count(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let id = session_id.as_str();
    sqlx::query!(
        "UPDATE user_sessions SET message_count = message_count + 1, last_activity_at = \
         CURRENT_TIMESTAMP WHERE session_id = $1",
        id
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn end_session(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let id = session_id.as_str();
    sqlx::query!(
        r#"
        UPDATE user_sessions
        SET ended_at = CURRENT_TIMESTAMP,
            duration_seconds = EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at))::INTEGER
        WHERE session_id = $1
        "#,
        id
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn mark_as_scanner(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let id = session_id.as_str();
    sqlx::query!(
        "UPDATE user_sessions SET is_scanner = true WHERE session_id = $1",
        id
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn cleanup_inactive(pool: &PgPool, inactive_hours: i32) -> Result<u64> {
    let cutoff = Utc::now() - Duration::hours(i64::from(inactive_hours));
    let result = sqlx::query!(
        r#"
        UPDATE user_sessions
        SET ended_at = CURRENT_TIMESTAMP
        WHERE ended_at IS NULL AND last_activity_at < $1
        "#,
        cutoff
    )
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}

pub async fn migrate_user_sessions(
    pool: &PgPool,
    old_user_id: &UserId,
    new_user_id: &UserId,
) -> Result<u64> {
    let old_id = old_user_id.as_str();
    let new_id = new_user_id.as_str();
    let result = sqlx::query!(
        "UPDATE user_sessions SET user_id = $1 WHERE user_id = $2",
        new_id,
        old_id
    )
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}

#[allow(clippy::cognitive_complexity)]
pub async fn create_session(pool: &PgPool, params: &CreateSessionParams<'_>) -> Result<()> {
    let session_id = params.session_id.as_str();
    let user_id = params.user_id.map(UserId::as_str);
    let session_source = params.session_source.as_str();
    sqlx::query!(
        r#"
        INSERT INTO user_sessions (
            session_id, user_id, session_source, fingerprint_hash, ip_address, user_agent,
            device_type, browser, os, country, region, city, preferred_locale,
            referrer_source, referrer_url, landing_page, entry_url,
            utm_source, utm_medium, utm_campaign, is_bot, expires_at,
            started_at, last_activity_at
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        "#,
        session_id,
        user_id,
        session_source,
        params.fingerprint_hash,
        params.ip_address,
        params.user_agent,
        params.device_type,
        params.browser,
        params.os,
        params.country,
        params.region,
        params.city,
        params.preferred_locale,
        params.referrer_source,
        params.referrer_url,
        params.landing_page,
        params.entry_url,
        params.utm_source,
        params.utm_medium,
        params.utm_campaign,
        params.is_bot,
        params.expires_at
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn increment_ai_usage(
    pool: &PgPool,
    session_id: &SessionId,
    tokens: i32,
    cost_microdollars: i64,
) -> Result<()> {
    let id = session_id.as_str();
    sqlx::query!(
        r#"
        UPDATE user_sessions
        SET ai_request_count = COALESCE(ai_request_count, 0) + 1,
            total_tokens_used = COALESCE(total_tokens_used, 0) + $1,
            total_ai_cost_microdollars = COALESCE(total_ai_cost_microdollars, 0) + $2,
            last_activity_at = CURRENT_TIMESTAMP
        WHERE session_id = $3
        "#,
        tokens,
        cost_microdollars,
        id
    )
    .execute(pool)
    .await?;
    Ok(())
}

pub async fn escalate_throttle(
    pool: &PgPool,
    session_id: &SessionId,
    new_level: i32,
) -> Result<()> {
    let id = session_id.as_str();

    sqlx::query!(
        r#"
        UPDATE user_sessions
        SET throttle_level = $1,
            throttle_escalated_at = CURRENT_TIMESTAMP
        WHERE session_id = $2
        "#,
        new_level,
        id
    )
    .execute(pool)
    .await?;

    Ok(())
}