systemprompt-analytics 0.1.18

Analytics module for systemprompt.io - session tracking, metrics, and reporting
Documentation
use anyhow::Result;
use chrono::Utc;
use systemprompt_identifiers::{FunnelId, FunnelProgressId, SessionId};

use super::FunnelRepository;
use crate::models::{CreateFunnelInput, Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};

impl FunnelRepository {
    pub async fn create_funnel(&self, input: &CreateFunnelInput) -> Result<FunnelWithSteps> {
        let funnel_id = FunnelId::generate();
        let now = Utc::now();

        sqlx::query!(
            r#"
            INSERT INTO funnels (id, name, description, is_active, created_at, updated_at)
            VALUES ($1, $2, $3, TRUE, $4, $4)
            "#,
            funnel_id.as_str(),
            input.name,
            input.description,
            now
        )
        .execute(&*self.write_pool)
        .await?;

        let mut steps = Vec::with_capacity(input.steps.len());
        for (idx, step_input) in input.steps.iter().enumerate() {
            let step_order = i32::try_from(idx).unwrap_or(0);
            let match_type = step_input.match_type.as_str();

            sqlx::query!(
                r#"
                INSERT INTO funnel_steps (funnel_id, step_order, name, match_pattern, match_type)
                VALUES ($1, $2, $3, $4, $5)
                "#,
                funnel_id.as_str(),
                step_order,
                step_input.name,
                step_input.match_pattern,
                match_type
            )
            .execute(&*self.write_pool)
            .await?;

            steps.push(FunnelStep {
                funnel_id: funnel_id.clone(),
                step_order,
                name: step_input.name.clone(),
                match_pattern: step_input.match_pattern.clone(),
                match_type: step_input.match_type,
            });
        }

        let funnel = Funnel {
            id: funnel_id,
            name: input.name.clone(),
            description: input.description.clone(),
            is_active: true,
            created_at: now,
            updated_at: now,
        };

        Ok(FunnelWithSteps { funnel, steps })
    }

    pub async fn deactivate(&self, id: &FunnelId) -> Result<bool> {
        let result = sqlx::query!(
            r#"
            UPDATE funnels SET is_active = FALSE, updated_at = $2 WHERE id = $1
            "#,
            id.as_str(),
            Utc::now()
        )
        .execute(&*self.write_pool)
        .await?;

        Ok(result.rows_affected() > 0)
    }

    pub async fn delete(&self, id: &FunnelId) -> Result<bool> {
        let result = sqlx::query!(r#"DELETE FROM funnels WHERE id = $1"#, id.as_str())
            .execute(&*self.write_pool)
            .await?;

        Ok(result.rows_affected() > 0)
    }

    pub async fn record_progress(
        &self,
        funnel_id: &FunnelId,
        session_id: &SessionId,
        step: i32,
    ) -> Result<FunnelProgress> {
        let now = Utc::now();
        let step_timestamp = serde_json::json!({
            "step": step,
            "timestamp": now.to_rfc3339()
        });

        if let Some(mut progress) = self.find_progress(funnel_id, session_id).await? {
            if step > progress.current_step {
                let mut timestamps = progress
                    .step_timestamps
                    .as_array()
                    .cloned()
                    .unwrap_or_else(Vec::new);
                timestamps.push(step_timestamp);

                sqlx::query!(
                    r#"
                    UPDATE funnel_progress
                    SET current_step = $3, step_timestamps = $4, updated_at = $5
                    WHERE funnel_id = $1 AND session_id = $2
                    "#,
                    funnel_id.as_str(),
                    session_id.as_str(),
                    step,
                    serde_json::Value::Array(timestamps.clone()),
                    now
                )
                .execute(&*self.write_pool)
                .await?;

                progress.current_step = step;
                progress.step_timestamps = serde_json::Value::Array(timestamps);
                progress.updated_at = now;
            }
            return Ok(progress);
        }

        let id = FunnelProgressId::generate();
        let timestamps = serde_json::json!([step_timestamp]);

        sqlx::query!(
            r#"
            INSERT INTO funnel_progress (
                id, funnel_id, session_id, current_step, step_timestamps, created_at, updated_at
            )
            VALUES ($1, $2, $3, $4, $5, $6, $6)
            "#,
            id.as_str(),
            funnel_id.as_str(),
            session_id.as_str(),
            step,
            timestamps,
            now
        )
        .execute(&*self.write_pool)
        .await?;

        Ok(FunnelProgress {
            id,
            funnel_id: funnel_id.clone(),
            session_id: session_id.clone(),
            current_step: step,
            completed_at: None,
            dropped_at_step: None,
            step_timestamps: timestamps,
            created_at: now,
            updated_at: now,
        })
    }

    pub async fn mark_completed(
        &self,
        funnel_id: &FunnelId,
        session_id: &SessionId,
    ) -> Result<bool> {
        let now = Utc::now();
        let result = sqlx::query!(
            r#"
            UPDATE funnel_progress
            SET completed_at = $3, updated_at = $3
            WHERE funnel_id = $1 AND session_id = $2
            "#,
            funnel_id.as_str(),
            session_id.as_str(),
            now
        )
        .execute(&*self.write_pool)
        .await?;

        Ok(result.rows_affected() > 0)
    }
}