systemprompt_analytics/repository/funnel/
finders.rs1use anyhow::Result;
2use systemprompt_identifiers::{FunnelId, SessionId};
3
4use super::types::{FunnelProgressRow, FunnelRow, FunnelStepRow};
5use super::FunnelRepository;
6use crate::models::{Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};
7
8impl FunnelRepository {
9 pub async fn find_by_id(&self, id: &FunnelId) -> Result<Option<FunnelWithSteps>> {
10 let funnel_row = sqlx::query_as!(
11 FunnelRow,
12 r#"
13 SELECT id, name, description, is_active, created_at, updated_at
14 FROM funnels WHERE id = $1
15 "#,
16 id.as_str()
17 )
18 .fetch_optional(&*self.pool)
19 .await?;
20
21 let Some(row) = funnel_row else {
22 return Ok(None);
23 };
24
25 let funnel = row.into_funnel();
26 let steps = self.get_steps_for_funnel(id).await?;
27
28 Ok(Some(FunnelWithSteps { funnel, steps }))
29 }
30
31 pub async fn find_by_name(&self, name: &str) -> Result<Option<FunnelWithSteps>> {
32 let funnel_row = sqlx::query_as!(
33 FunnelRow,
34 r#"
35 SELECT id, name, description, is_active, created_at, updated_at
36 FROM funnels WHERE name = $1
37 "#,
38 name
39 )
40 .fetch_optional(&*self.pool)
41 .await?;
42
43 let Some(row) = funnel_row else {
44 return Ok(None);
45 };
46
47 let funnel = row.into_funnel();
48 let funnel_id = FunnelId::new(funnel.id.as_str());
49 let steps = self.get_steps_for_funnel(&funnel_id).await?;
50
51 Ok(Some(FunnelWithSteps { funnel, steps }))
52 }
53
54 pub async fn list_active(&self) -> Result<Vec<Funnel>> {
55 let rows = sqlx::query_as!(
56 FunnelRow,
57 r#"
58 SELECT id, name, description, is_active, created_at, updated_at
59 FROM funnels WHERE is_active = TRUE ORDER BY name
60 "#
61 )
62 .fetch_all(&*self.pool)
63 .await?;
64
65 Ok(rows.into_iter().map(FunnelRow::into_funnel).collect())
66 }
67
68 pub async fn list_all(&self) -> Result<Vec<Funnel>> {
69 let rows = sqlx::query_as!(
70 FunnelRow,
71 r#"
72 SELECT id, name, description, is_active, created_at, updated_at
73 FROM funnels ORDER BY name
74 "#
75 )
76 .fetch_all(&*self.pool)
77 .await?;
78
79 Ok(rows.into_iter().map(FunnelRow::into_funnel).collect())
80 }
81
82 pub async fn find_progress(
83 &self,
84 funnel_id: &FunnelId,
85 session_id: &SessionId,
86 ) -> Result<Option<FunnelProgress>> {
87 let row = sqlx::query_as!(
88 FunnelProgressRow,
89 r#"
90 SELECT id, funnel_id, session_id, current_step, completed_at, dropped_at_step,
91 step_timestamps, created_at, updated_at
92 FROM funnel_progress WHERE funnel_id = $1 AND session_id = $2
93 "#,
94 funnel_id.as_str(),
95 session_id.as_str()
96 )
97 .fetch_optional(&*self.pool)
98 .await?;
99
100 Ok(row.map(FunnelProgressRow::into_progress))
101 }
102
103 pub(super) async fn get_steps_for_funnel(
104 &self,
105 funnel_id: &FunnelId,
106 ) -> Result<Vec<FunnelStep>> {
107 let rows = sqlx::query_as!(
108 FunnelStepRow,
109 r#"
110 SELECT funnel_id, step_order, name, match_pattern, match_type
111 FROM funnel_steps WHERE funnel_id = $1 ORDER BY step_order
112 "#,
113 funnel_id.as_str()
114 )
115 .fetch_all(&*self.pool)
116 .await?;
117
118 Ok(rows.into_iter().map(FunnelStepRow::into_step).collect())
119 }
120}