Skip to main content

systemprompt_analytics/repository/funnel/
finders.rs

1use anyhow::Result;
2use systemprompt_identifiers::{FunnelId, SessionId};
3
4use super::types::{FunnelProgressRow, FunnelRow, FunnelStepRow};
5use super::FunnelRepository;
6use crate::models::{Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};
7
8impl FunnelRepository {
9    pub async fn find_by_id(&self, id: &FunnelId) -> Result<Option<FunnelWithSteps>> {
10        let funnel_row = sqlx::query_as!(
11            FunnelRow,
12            r#"
13            SELECT id, name, description, is_active, created_at, updated_at
14            FROM funnels WHERE id = $1
15            "#,
16            id.as_str()
17        )
18        .fetch_optional(&*self.pool)
19        .await?;
20
21        let Some(row) = funnel_row else {
22            return Ok(None);
23        };
24
25        let funnel = row.into_funnel();
26        let steps = self.get_steps_for_funnel(id).await?;
27
28        Ok(Some(FunnelWithSteps { funnel, steps }))
29    }
30
31    pub async fn find_by_name(&self, name: &str) -> Result<Option<FunnelWithSteps>> {
32        let funnel_row = sqlx::query_as!(
33            FunnelRow,
34            r#"
35            SELECT id, name, description, is_active, created_at, updated_at
36            FROM funnels WHERE name = $1
37            "#,
38            name
39        )
40        .fetch_optional(&*self.pool)
41        .await?;
42
43        let Some(row) = funnel_row else {
44            return Ok(None);
45        };
46
47        let funnel = row.into_funnel();
48        let funnel_id = FunnelId::new(funnel.id.as_str());
49        let steps = self.get_steps_for_funnel(&funnel_id).await?;
50
51        Ok(Some(FunnelWithSteps { funnel, steps }))
52    }
53
54    pub async fn list_active(&self) -> Result<Vec<Funnel>> {
55        let rows = sqlx::query_as!(
56            FunnelRow,
57            r#"
58            SELECT id, name, description, is_active, created_at, updated_at
59            FROM funnels WHERE is_active = TRUE ORDER BY name
60            "#
61        )
62        .fetch_all(&*self.pool)
63        .await?;
64
65        Ok(rows.into_iter().map(FunnelRow::into_funnel).collect())
66    }
67
68    pub async fn list_all(&self) -> Result<Vec<Funnel>> {
69        let rows = sqlx::query_as!(
70            FunnelRow,
71            r#"
72            SELECT id, name, description, is_active, created_at, updated_at
73            FROM funnels ORDER BY name
74            "#
75        )
76        .fetch_all(&*self.pool)
77        .await?;
78
79        Ok(rows.into_iter().map(FunnelRow::into_funnel).collect())
80    }
81
82    pub async fn find_progress(
83        &self,
84        funnel_id: &FunnelId,
85        session_id: &SessionId,
86    ) -> Result<Option<FunnelProgress>> {
87        let row = sqlx::query_as!(
88            FunnelProgressRow,
89            r#"
90            SELECT id, funnel_id, session_id, current_step, completed_at, dropped_at_step,
91                   step_timestamps, created_at, updated_at
92            FROM funnel_progress WHERE funnel_id = $1 AND session_id = $2
93            "#,
94            funnel_id.as_str(),
95            session_id.as_str()
96        )
97        .fetch_optional(&*self.pool)
98        .await?;
99
100        Ok(row.map(FunnelProgressRow::into_progress))
101    }
102
103    pub(super) async fn get_steps_for_funnel(
104        &self,
105        funnel_id: &FunnelId,
106    ) -> Result<Vec<FunnelStep>> {
107        let rows = sqlx::query_as!(
108            FunnelStepRow,
109            r#"
110            SELECT funnel_id, step_order, name, match_pattern, match_type
111            FROM funnel_steps WHERE funnel_id = $1 ORDER BY step_order
112            "#,
113            funnel_id.as_str()
114        )
115        .fetch_all(&*self.pool)
116        .await?;
117
118        Ok(rows.into_iter().map(FunnelStepRow::into_step).collect())
119    }
120}