systemprompt_analytics/repository/funnel/
mutations.rs1use crate::Result;
8use chrono::Utc;
9use systemprompt_identifiers::{FunnelId, FunnelProgressId, SessionId};
10
11use super::FunnelRepository;
12use crate::models::{CreateFunnelInput, Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};
13
14impl FunnelRepository {
15 pub async fn create_funnel(&self, input: &CreateFunnelInput) -> Result<FunnelWithSteps> {
16 let funnel_id = FunnelId::generate();
17 let now = Utc::now();
18
19 sqlx::query!(
20 r#"
21 INSERT INTO funnels (id, name, description, is_active, created_at, updated_at)
22 VALUES ($1, $2, $3, TRUE, $4, $4)
23 "#,
24 funnel_id.as_str(),
25 input.name,
26 input.description,
27 now
28 )
29 .execute(&*self.write_pool)
30 .await?;
31
32 let mut funnel_ids_arr = Vec::with_capacity(input.steps.len());
33 let mut step_orders = Vec::with_capacity(input.steps.len());
34 let mut names = Vec::with_capacity(input.steps.len());
35 let mut patterns = Vec::with_capacity(input.steps.len());
36 let mut match_types = Vec::with_capacity(input.steps.len());
37 let mut steps = Vec::with_capacity(input.steps.len());
38
39 for (idx, step_input) in input.steps.iter().enumerate() {
40 let step_order = i32::try_from(idx).unwrap_or(0);
41 funnel_ids_arr.push(funnel_id.as_str().to_owned());
42 step_orders.push(step_order);
43 names.push(step_input.name.clone());
44 patterns.push(step_input.match_pattern.clone());
45 match_types.push(step_input.match_type.as_str().to_owned());
46
47 steps.push(FunnelStep {
48 funnel_id: funnel_id.clone(),
49 step_order,
50 name: step_input.name.clone(),
51 match_pattern: step_input.match_pattern.clone(),
52 match_type: step_input.match_type,
53 });
54 }
55
56 if !steps.is_empty() {
57 sqlx::query!(
58 r#"
59 INSERT INTO funnel_steps (funnel_id, step_order, name, match_pattern, match_type)
60 SELECT * FROM UNNEST($1::text[], $2::int4[], $3::text[], $4::text[], $5::text[])
61 "#,
62 &funnel_ids_arr,
63 &step_orders,
64 &names,
65 &patterns,
66 &match_types
67 )
68 .execute(&*self.write_pool)
69 .await?;
70 }
71
72 let funnel = Funnel {
73 id: funnel_id,
74 name: input.name.clone(),
75 description: input.description.clone(),
76 is_active: true,
77 created_at: now,
78 updated_at: now,
79 };
80
81 Ok(FunnelWithSteps { funnel, steps })
82 }
83
84 pub async fn deactivate(&self, id: &FunnelId) -> Result<bool> {
85 let result = sqlx::query!(
86 r#"
87 UPDATE funnels SET is_active = FALSE, updated_at = $2 WHERE id = $1
88 "#,
89 id.as_str(),
90 Utc::now()
91 )
92 .execute(&*self.write_pool)
93 .await?;
94
95 Ok(result.rows_affected() > 0)
96 }
97
98 pub async fn delete(&self, id: &FunnelId) -> Result<bool> {
99 let result = sqlx::query!(r#"DELETE FROM funnels WHERE id = $1"#, id.as_str())
100 .execute(&*self.write_pool)
101 .await?;
102
103 Ok(result.rows_affected() > 0)
104 }
105
106 pub async fn record_progress(
107 &self,
108 funnel_id: &FunnelId,
109 session_id: &SessionId,
110 step: i32,
111 ) -> Result<FunnelProgress> {
112 let now = Utc::now();
113 let step_timestamp = serde_json::json!({
114 "step": step,
115 "timestamp": now.to_rfc3339()
116 });
117
118 if let Some(mut progress) = self.find_progress(funnel_id, session_id).await? {
119 if step > progress.current_step {
120 let mut timestamps = progress
121 .step_timestamps
122 .as_array()
123 .cloned()
124 .unwrap_or_else(Vec::new);
125 timestamps.push(step_timestamp);
126
127 sqlx::query!(
128 r#"
129 UPDATE funnel_progress
130 SET current_step = $3, step_timestamps = $4, updated_at = $5
131 WHERE funnel_id = $1 AND session_id = $2
132 "#,
133 funnel_id.as_str(),
134 session_id.as_str(),
135 step,
136 serde_json::Value::Array(timestamps.clone()),
137 now
138 )
139 .execute(&*self.write_pool)
140 .await?;
141
142 progress.current_step = step;
143 progress.step_timestamps = serde_json::Value::Array(timestamps);
144 progress.updated_at = now;
145 }
146 return Ok(progress);
147 }
148
149 let id = FunnelProgressId::generate();
150 let timestamps = serde_json::json!([step_timestamp]);
151
152 sqlx::query!(
153 r#"
154 INSERT INTO funnel_progress (
155 id, funnel_id, session_id, current_step, step_timestamps, created_at, updated_at
156 )
157 VALUES ($1, $2, $3, $4, $5, $6, $6)
158 "#,
159 id.as_str(),
160 funnel_id.as_str(),
161 session_id.as_str(),
162 step,
163 timestamps,
164 now
165 )
166 .execute(&*self.write_pool)
167 .await?;
168
169 Ok(FunnelProgress {
170 id,
171 funnel_id: funnel_id.clone(),
172 session_id: session_id.clone(),
173 current_step: step,
174 completed_at: None,
175 dropped_at_step: None,
176 step_timestamps: timestamps,
177 created_at: now,
178 updated_at: now,
179 })
180 }
181
182 pub async fn mark_completed(
183 &self,
184 funnel_id: &FunnelId,
185 session_id: &SessionId,
186 ) -> Result<bool> {
187 let now = Utc::now();
188 let result = sqlx::query!(
189 r#"
190 UPDATE funnel_progress
191 SET completed_at = $3, updated_at = $3
192 WHERE funnel_id = $1 AND session_id = $2
193 "#,
194 funnel_id.as_str(),
195 session_id.as_str(),
196 now
197 )
198 .execute(&*self.write_pool)
199 .await?;
200
201 Ok(result.rows_affected() > 0)
202 }
203}