systemprompt_analytics/repository/funnel/
mutations.rs1use anyhow::Result;
2use chrono::Utc;
3use systemprompt_identifiers::{FunnelId, FunnelProgressId, SessionId};
4
5use super::FunnelRepository;
6use crate::models::{CreateFunnelInput, Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};
7
8impl FunnelRepository {
9 pub async fn create_funnel(&self, input: &CreateFunnelInput) -> Result<FunnelWithSteps> {
10 let funnel_id = FunnelId::generate();
11 let now = Utc::now();
12
13 sqlx::query!(
14 r#"
15 INSERT INTO funnels (id, name, description, is_active, created_at, updated_at)
16 VALUES ($1, $2, $3, TRUE, $4, $4)
17 "#,
18 funnel_id.as_str(),
19 input.name,
20 input.description,
21 now
22 )
23 .execute(&*self.pool)
24 .await?;
25
26 let mut steps = Vec::with_capacity(input.steps.len());
27 for (idx, step_input) in input.steps.iter().enumerate() {
28 let step_order = i32::try_from(idx).unwrap_or(0);
29 let match_type = step_input.match_type.as_str();
30
31 sqlx::query!(
32 r#"
33 INSERT INTO funnel_steps (funnel_id, step_order, name, match_pattern, match_type)
34 VALUES ($1, $2, $3, $4, $5)
35 "#,
36 funnel_id.as_str(),
37 step_order,
38 step_input.name,
39 step_input.match_pattern,
40 match_type
41 )
42 .execute(&*self.pool)
43 .await?;
44
45 steps.push(FunnelStep {
46 funnel_id: funnel_id.clone(),
47 step_order,
48 name: step_input.name.clone(),
49 match_pattern: step_input.match_pattern.clone(),
50 match_type: step_input.match_type,
51 });
52 }
53
54 let funnel = Funnel {
55 id: funnel_id,
56 name: input.name.clone(),
57 description: input.description.clone(),
58 is_active: true,
59 created_at: now,
60 updated_at: now,
61 };
62
63 Ok(FunnelWithSteps { funnel, steps })
64 }
65
66 pub async fn deactivate(&self, id: &FunnelId) -> Result<bool> {
67 let result = sqlx::query!(
68 r#"
69 UPDATE funnels SET is_active = FALSE, updated_at = $2 WHERE id = $1
70 "#,
71 id.as_str(),
72 Utc::now()
73 )
74 .execute(&*self.pool)
75 .await?;
76
77 Ok(result.rows_affected() > 0)
78 }
79
80 pub async fn delete(&self, id: &FunnelId) -> Result<bool> {
81 let result = sqlx::query!(r#"DELETE FROM funnels WHERE id = $1"#, id.as_str())
82 .execute(&*self.pool)
83 .await?;
84
85 Ok(result.rows_affected() > 0)
86 }
87
88 pub async fn record_progress(
89 &self,
90 funnel_id: &FunnelId,
91 session_id: &SessionId,
92 step: i32,
93 ) -> Result<FunnelProgress> {
94 let now = Utc::now();
95 let step_timestamp = serde_json::json!({
96 "step": step,
97 "timestamp": now.to_rfc3339()
98 });
99
100 if let Some(mut progress) = self.find_progress(funnel_id, session_id).await? {
101 if step > progress.current_step {
102 let mut timestamps = progress
103 .step_timestamps
104 .as_array()
105 .cloned()
106 .unwrap_or_else(Vec::new);
107 timestamps.push(step_timestamp);
108
109 sqlx::query!(
110 r#"
111 UPDATE funnel_progress
112 SET current_step = $3, step_timestamps = $4, updated_at = $5
113 WHERE funnel_id = $1 AND session_id = $2
114 "#,
115 funnel_id.as_str(),
116 session_id.as_str(),
117 step,
118 serde_json::Value::Array(timestamps.clone()),
119 now
120 )
121 .execute(&*self.pool)
122 .await?;
123
124 progress.current_step = step;
125 progress.step_timestamps = serde_json::Value::Array(timestamps);
126 progress.updated_at = now;
127 }
128 return Ok(progress);
129 }
130
131 let id = FunnelProgressId::generate();
132 let timestamps = serde_json::json!([step_timestamp]);
133
134 sqlx::query!(
135 r#"
136 INSERT INTO funnel_progress (
137 id, funnel_id, session_id, current_step, step_timestamps, created_at, updated_at
138 )
139 VALUES ($1, $2, $3, $4, $5, $6, $6)
140 "#,
141 id.as_str(),
142 funnel_id.as_str(),
143 session_id.as_str(),
144 step,
145 timestamps,
146 now
147 )
148 .execute(&*self.pool)
149 .await?;
150
151 Ok(FunnelProgress {
152 id,
153 funnel_id: funnel_id.clone(),
154 session_id: session_id.clone(),
155 current_step: step,
156 completed_at: None,
157 dropped_at_step: None,
158 step_timestamps: timestamps,
159 created_at: now,
160 updated_at: now,
161 })
162 }
163
164 pub async fn mark_completed(
165 &self,
166 funnel_id: &FunnelId,
167 session_id: &SessionId,
168 ) -> Result<bool> {
169 let now = Utc::now();
170 let result = sqlx::query!(
171 r#"
172 UPDATE funnel_progress
173 SET completed_at = $3, updated_at = $3
174 WHERE funnel_id = $1 AND session_id = $2
175 "#,
176 funnel_id.as_str(),
177 session_id.as_str(),
178 now
179 )
180 .execute(&*self.pool)
181 .await?;
182
183 Ok(result.rows_affected() > 0)
184 }
185}