Skip to main content

systemprompt_analytics/repository/funnel/
stats.rs

1use crate::Result;
2use chrono::{DateTime, Utc};
3use systemprompt_identifiers::FunnelId;
4
5use super::FunnelRepository;
6use crate::models::{FunnelStats, FunnelStep, FunnelStepStats};
7
8impl FunnelRepository {
9    pub async fn get_stats(
10        &self,
11        funnel_id: &FunnelId,
12        since: DateTime<Utc>,
13    ) -> Result<FunnelStats> {
14        let funnel = self.find_by_id(funnel_id).await?.ok_or_else(|| {
15            crate::AnalyticsError::SessionNotFound(format!("funnel {}", funnel_id))
16        })?;
17
18        let total_entries = sqlx::query_scalar!(
19            r#"
20            SELECT COUNT(*) as "count!"
21            FROM funnel_progress
22            WHERE funnel_id = $1 AND created_at >= $2
23            "#,
24            funnel_id.as_str(),
25            since
26        )
27        .fetch_one(&*self.pool)
28        .await?;
29
30        let total_completions = sqlx::query_scalar!(
31            r#"
32            SELECT COUNT(*) as "count!"
33            FROM funnel_progress
34            WHERE funnel_id = $1 AND created_at >= $2 AND completed_at IS NOT NULL
35            "#,
36            funnel_id.as_str(),
37            since
38        )
39        .fetch_one(&*self.pool)
40        .await?;
41
42        let overall_conversion_rate = if total_entries > 0 {
43            (total_completions as f64 / total_entries as f64) * 100.0
44        } else {
45            0.0
46        };
47
48        let step_stats = self
49            .calculate_step_stats(funnel_id, &funnel.steps, since)
50            .await?;
51
52        Ok(FunnelStats {
53            funnel_id: funnel_id.clone(),
54            funnel_name: funnel.funnel.name,
55            total_entries,
56            total_completions,
57            overall_conversion_rate,
58            step_stats,
59        })
60    }
61
62    async fn calculate_step_stats(
63        &self,
64        funnel_id: &FunnelId,
65        steps: &[FunnelStep],
66        since: DateTime<Utc>,
67    ) -> Result<Vec<FunnelStepStats>> {
68        if steps.is_empty() {
69            return Ok(Vec::new());
70        }
71
72        let step_orders: Vec<i32> = steps.iter().map(|s| s.step_order).collect();
73
74        let rows = sqlx::query!(
75            r#"
76            SELECT
77                s.step_order,
78                COUNT(*) FILTER (WHERE fp.current_step >= s.step_order) as "entered_count!",
79                COUNT(*) FILTER (WHERE fp.current_step > s.step_order) as "exited_count!"
80            FROM UNNEST($3::int4[]) AS s(step_order)
81            LEFT JOIN funnel_progress fp
82                ON fp.funnel_id = $1 AND fp.created_at >= $2
83            GROUP BY s.step_order
84            ORDER BY s.step_order
85            "#,
86            funnel_id.as_str(),
87            since,
88            &step_orders
89        )
90        .fetch_all(&*self.pool)
91        .await?;
92
93        let mut stats = Vec::with_capacity(steps.len());
94        for row in rows {
95            let entered_count = row.entered_count;
96            let exited_count = row.exited_count;
97            let conversion_rate = if entered_count > 0 {
98                (exited_count as f64 / entered_count as f64) * 100.0
99            } else {
100                0.0
101            };
102
103            stats.push(FunnelStepStats {
104                step_order: row.step_order.unwrap_or(0),
105                entered_count,
106                exited_count,
107                conversion_rate,
108                avg_time_to_next_ms: None,
109            });
110        }
111
112        Ok(stats)
113    }
114}