// systemprompt-analytics 0.2.2
//
// Analytics for systemprompt.io AI governance infrastructure: session, agent, tool, and microdollar-precision cost attribution across the MCP governance pipeline.
// Documentation
use std::sync::Arc;

use anyhow::Result;
use sqlx::PgPool;
use systemprompt_database::DbPool;
use systemprompt_identifiers::{ContentId, SessionId, UserId};

use crate::models::{AnalyticsEventCreated, AnalyticsEventType, CreateAnalyticsEventInput};

/// Repository for rows in the `analytics_events` table.
///
/// Holds two connection pools: the insert statements run on `write_pool`,
/// while the count and find queries run on `pool`.
#[derive(Clone, Debug)]
pub struct AnalyticsEventsRepository {
    // Pool used by the count/find (read) queries.
    pool: Arc<PgPool>,
    // Pool used by the INSERT statements.
    write_pool: Arc<PgPool>,
}

impl AnalyticsEventsRepository {
    /// Builds a repository from the shared database handle.
    ///
    /// The read pool is obtained first, then the write pool.
    ///
    /// # Errors
    /// Propagates the failure if either `pool_arc` or `write_pool_arc`
    /// returns an error.
    pub fn new(db: &DbPool) -> Result<Self> {
        Ok(Self {
            pool: db.pool_arc()?,
            write_pool: db.write_pool_arc()?,
        })
    }

    /// Inserts a single analytics event and returns its generated id and type.
    ///
    /// The id is `evt_` followed by a freshly generated v4 UUID. The row is
    /// written through the write pool with severity `"info"`, the input's
    /// `page_url` stored in the `endpoint` column, and the JSON produced by
    /// `build_event_data` stored in `event_data`.
    ///
    /// # Errors
    /// Returns the database error if the insert fails.
    pub async fn create_event(
        &self,
        session_id: &SessionId,
        user_id: &UserId,
        input: &CreateAnalyticsEventInput,
    ) -> Result<AnalyticsEventCreated> {
        let kind = input.event_type.as_str();
        let category = input.event_type.category();
        let payload = Self::build_event_data(input);
        // Every event recorded through this path uses the same severity.
        let severity = "info";
        let event_id = format!("evt_{}", uuid::Uuid::new_v4());

        sqlx::query!(
            r#"
            INSERT INTO analytics_events (
                id, user_id, session_id, event_type, event_category,
                severity, endpoint, event_data
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
            event_id,
            user_id.as_str(),
            session_id.as_str(),
            kind,
            category,
            severity,
            input.page_url,
            payload
        )
        .execute(&*self.write_pool)
        .await?;

        Ok(AnalyticsEventCreated {
            id: event_id,
            event_type: kind.to_string(),
        })
    }

    pub async fn create_events_batch(
        &self,
        session_id: &SessionId,
        user_id: &UserId,
        inputs: &[CreateAnalyticsEventInput],
    ) -> Result<Vec<AnalyticsEventCreated>> {
        if inputs.is_empty() {
            return Ok(Vec::new());
        }

        let mut ids = Vec::with_capacity(inputs.len());
        let mut user_ids = Vec::with_capacity(inputs.len());
        let mut session_ids = Vec::with_capacity(inputs.len());
        let mut event_types = Vec::with_capacity(inputs.len());
        let mut event_categories = Vec::with_capacity(inputs.len());
        let mut severities = Vec::with_capacity(inputs.len());
        let mut endpoints: Vec<String> = Vec::with_capacity(inputs.len());
        let mut event_datas = Vec::with_capacity(inputs.len());

        for input in inputs {
            let id = format!("evt_{}", uuid::Uuid::new_v4());
            ids.push(id);
            user_ids.push(user_id.as_str().to_string());
            session_ids.push(session_id.as_str().to_string());
            event_types.push(input.event_type.as_str().to_string());
            event_categories.push(input.event_type.category().to_string());
            severities.push("info".to_string());
            endpoints.push(input.page_url.clone());
            event_datas.push(Self::build_event_data(input));
        }

        sqlx::query!(
            r#"
            INSERT INTO analytics_events (id, user_id, session_id, event_type, event_category, severity, endpoint, event_data)
            SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::jsonb[])
            "#,
            &ids,
            &user_ids,
            &session_ids,
            &event_types,
            &event_categories,
            &severities,
            &endpoints,
            &event_datas
        )
        .execute(&*self.write_pool)
        .await?;

        Ok(ids
            .into_iter()
            .zip(event_types)
            .map(|(id, event_type)| AnalyticsEventCreated { id, event_type })
            .collect())
    }

    /// Counts the events of one type recorded for a session.
    ///
    /// Runs a `COUNT(*)` on the read pool, filtered by `session_id` and
    /// `event_type`.
    ///
    /// # Errors
    /// Returns the database error if the query fails.
    pub async fn count_events_by_type(
        &self,
        session_id: &SessionId,
        event_type: &AnalyticsEventType,
    ) -> Result<i64> {
        let session_key = session_id.as_str();
        let type_key = event_type.as_str();

        Ok(sqlx::query_scalar!(
            r#"
            SELECT COUNT(*) as "count!"
            FROM analytics_events
            WHERE session_id = $1 AND event_type = $2
            "#,
            session_key,
            type_key
        )
        .fetch_one(&*self.pool)
        .await?)
    }

    /// Loads the most recent events for a session.
    ///
    /// Rows are ordered by `timestamp` descending and capped at `limit`.
    /// The `endpoint` column is returned as `page_url`.
    ///
    /// # Errors
    /// Returns the database error if the query fails.
    pub async fn find_by_session(
        &self,
        session_id: &SessionId,
        limit: i64,
    ) -> Result<Vec<StoredAnalyticsEvent>> {
        let session_key = session_id.as_str();

        Ok(sqlx::query_as!(
            StoredAnalyticsEvent,
            r#"
            SELECT
                id,
                user_id as "user_id: UserId",
                session_id as "session_id: SessionId",
                event_type,
                event_category,
                endpoint as page_url,
                event_data,
                timestamp
            FROM analytics_events
            WHERE session_id = $1
            ORDER BY timestamp DESC
            LIMIT $2
            "#,
            session_key,
            limit
        )
        .fetch_all(&*self.pool)
        .await?)
    }

    /// Loads the most recent events that reference a piece of content.
    ///
    /// Matches rows whose `event_data` JSON has a `content_id` key equal to
    /// the given id (the key `build_event_data` writes). Rows are ordered by
    /// `timestamp` descending and capped at `limit`.
    ///
    /// # Errors
    /// Returns the database error if the query fails.
    pub async fn find_by_content(
        &self,
        content_id: &ContentId,
        limit: i64,
    ) -> Result<Vec<StoredAnalyticsEvent>> {
        let content_key = content_id.as_str();

        Ok(sqlx::query_as!(
            StoredAnalyticsEvent,
            r#"
            SELECT
                id,
                user_id as "user_id: UserId",
                session_id as "session_id: SessionId",
                event_type,
                event_category,
                endpoint as page_url,
                event_data,
                timestamp
            FROM analytics_events
            WHERE event_data->>'content_id' = $1
            ORDER BY timestamp DESC
            LIMIT $2
            "#,
            content_key,
            limit
        )
        .fetch_all(&*self.pool)
        .await?)
    }

    /// Builds the JSON stored in the `event_data` column for one input.
    ///
    /// Starts from a copy of `input.data`, or an empty object when it is
    /// absent, and then sets the `content_id`, `slug` and `referrer` keys for
    /// whichever of those fields are present on the input (overwriting any
    /// same-named key already in the payload).
    ///
    /// If `input.data` is present but is not a JSON object, it is returned
    /// unchanged and the extra keys are not added.
    fn build_event_data(input: &CreateAnalyticsEventInput) -> serde_json::Value {
        let mut payload = input
            .data
            .clone()
            .unwrap_or_else(|| serde_json::json!({}));

        if let Some(fields) = payload.as_object_mut() {
            let mut set = |key: &str, value: serde_json::Value| {
                fields.insert(key.to_string(), value);
            };

            if let Some(content_id) = input.content_id.as_ref() {
                set("content_id", serde_json::json!(content_id.as_str()));
            }
            if let Some(slug) = input.slug.as_ref() {
                set("slug", serde_json::json!(slug));
            }
            if let Some(referrer) = input.referrer.as_ref() {
                set("referrer", serde_json::json!(referrer));
            }
        }

        payload
    }
}

/// One row read back from the `analytics_events` table.
///
/// This is the row type produced by `find_by_session` and `find_by_content`;
/// its fields mirror the columns those queries select.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct StoredAnalyticsEvent {
    // SQLx: analytics event primary key (text column, no typed ID defined)
    pub id: String,
    /// User the event was recorded for.
    pub user_id: UserId,
    /// Session the event was recorded in, when the column is set.
    pub session_id: Option<SessionId>,
    /// Event type string as stored in the `event_type` column.
    pub event_type: String,
    /// Event category string as stored in the `event_category` column.
    pub event_category: String,
    /// Value of the `endpoint` column, which the find queries alias to `page_url`.
    pub page_url: Option<String>,
    /// JSON payload from the `event_data` column.
    pub event_data: Option<serde_json::Value>,
    /// Value of the `timestamp` column; the find queries order by it, newest first.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}