systemprompt_analytics/repository/funnel/
mutations.rs1use anyhow::Result;
2use chrono::Utc;
3use systemprompt_identifiers::{FunnelId, FunnelProgressId, SessionId};
4
5use super::FunnelRepository;
6use crate::models::{CreateFunnelInput, Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};
7
8impl FunnelRepository {
9 pub async fn create_funnel(&self, input: &CreateFunnelInput) -> Result<FunnelWithSteps> {
10 let funnel_id = FunnelId::generate();
11 let now = Utc::now();
12
13 sqlx::query!(
14 r#"
15 INSERT INTO funnels (id, name, description, is_active, created_at, updated_at)
16 VALUES ($1, $2, $3, TRUE, $4, $4)
17 "#,
18 funnel_id.as_str(),
19 input.name,
20 input.description,
21 now
22 )
23 .execute(&*self.write_pool)
24 .await?;
25
26 let mut funnel_ids_arr = Vec::with_capacity(input.steps.len());
27 let mut step_orders = Vec::with_capacity(input.steps.len());
28 let mut names = Vec::with_capacity(input.steps.len());
29 let mut patterns = Vec::with_capacity(input.steps.len());
30 let mut match_types = Vec::with_capacity(input.steps.len());
31 let mut steps = Vec::with_capacity(input.steps.len());
32
33 for (idx, step_input) in input.steps.iter().enumerate() {
34 let step_order = i32::try_from(idx).unwrap_or(0);
35 funnel_ids_arr.push(funnel_id.as_str().to_string());
36 step_orders.push(step_order);
37 names.push(step_input.name.clone());
38 patterns.push(step_input.match_pattern.clone());
39 match_types.push(step_input.match_type.as_str().to_string());
40
41 steps.push(FunnelStep {
42 funnel_id: funnel_id.clone(),
43 step_order,
44 name: step_input.name.clone(),
45 match_pattern: step_input.match_pattern.clone(),
46 match_type: step_input.match_type,
47 });
48 }
49
50 if !steps.is_empty() {
51 sqlx::query!(
52 r#"
53 INSERT INTO funnel_steps (funnel_id, step_order, name, match_pattern, match_type)
54 SELECT * FROM UNNEST($1::text[], $2::int4[], $3::text[], $4::text[], $5::text[])
55 "#,
56 &funnel_ids_arr,
57 &step_orders,
58 &names,
59 &patterns,
60 &match_types
61 )
62 .execute(&*self.write_pool)
63 .await?;
64 }
65
66 let funnel = Funnel {
67 id: funnel_id,
68 name: input.name.clone(),
69 description: input.description.clone(),
70 is_active: true,
71 created_at: now,
72 updated_at: now,
73 };
74
75 Ok(FunnelWithSteps { funnel, steps })
76 }
77
78 pub async fn deactivate(&self, id: &FunnelId) -> Result<bool> {
79 let result = sqlx::query!(
80 r#"
81 UPDATE funnels SET is_active = FALSE, updated_at = $2 WHERE id = $1
82 "#,
83 id.as_str(),
84 Utc::now()
85 )
86 .execute(&*self.write_pool)
87 .await?;
88
89 Ok(result.rows_affected() > 0)
90 }
91
92 pub async fn delete(&self, id: &FunnelId) -> Result<bool> {
93 let result = sqlx::query!(r#"DELETE FROM funnels WHERE id = $1"#, id.as_str())
94 .execute(&*self.write_pool)
95 .await?;
96
97 Ok(result.rows_affected() > 0)
98 }
99
100 pub async fn record_progress(
101 &self,
102 funnel_id: &FunnelId,
103 session_id: &SessionId,
104 step: i32,
105 ) -> Result<FunnelProgress> {
106 let now = Utc::now();
107 let step_timestamp = serde_json::json!({
108 "step": step,
109 "timestamp": now.to_rfc3339()
110 });
111
112 if let Some(mut progress) = self.find_progress(funnel_id, session_id).await? {
113 if step > progress.current_step {
114 let mut timestamps = progress
115 .step_timestamps
116 .as_array()
117 .cloned()
118 .unwrap_or_else(Vec::new);
119 timestamps.push(step_timestamp);
120
121 sqlx::query!(
122 r#"
123 UPDATE funnel_progress
124 SET current_step = $3, step_timestamps = $4, updated_at = $5
125 WHERE funnel_id = $1 AND session_id = $2
126 "#,
127 funnel_id.as_str(),
128 session_id.as_str(),
129 step,
130 serde_json::Value::Array(timestamps.clone()),
131 now
132 )
133 .execute(&*self.write_pool)
134 .await?;
135
136 progress.current_step = step;
137 progress.step_timestamps = serde_json::Value::Array(timestamps);
138 progress.updated_at = now;
139 }
140 return Ok(progress);
141 }
142
143 let id = FunnelProgressId::generate();
144 let timestamps = serde_json::json!([step_timestamp]);
145
146 sqlx::query!(
147 r#"
148 INSERT INTO funnel_progress (
149 id, funnel_id, session_id, current_step, step_timestamps, created_at, updated_at
150 )
151 VALUES ($1, $2, $3, $4, $5, $6, $6)
152 "#,
153 id.as_str(),
154 funnel_id.as_str(),
155 session_id.as_str(),
156 step,
157 timestamps,
158 now
159 )
160 .execute(&*self.write_pool)
161 .await?;
162
163 Ok(FunnelProgress {
164 id,
165 funnel_id: funnel_id.clone(),
166 session_id: session_id.clone(),
167 current_step: step,
168 completed_at: None,
169 dropped_at_step: None,
170 step_timestamps: timestamps,
171 created_at: now,
172 updated_at: now,
173 })
174 }
175
176 pub async fn mark_completed(
177 &self,
178 funnel_id: &FunnelId,
179 session_id: &SessionId,
180 ) -> Result<bool> {
181 let now = Utc::now();
182 let result = sqlx::query!(
183 r#"
184 UPDATE funnel_progress
185 SET completed_at = $3, updated_at = $3
186 WHERE funnel_id = $1 AND session_id = $2
187 "#,
188 funnel_id.as_str(),
189 session_id.as_str(),
190 now
191 )
192 .execute(&*self.write_pool)
193 .await?;
194
195 Ok(result.rows_affected() > 0)
196 }
197}