Skip to main content

systemprompt_analytics/repository/funnel/
mutations.rs

1use anyhow::Result;
2use chrono::Utc;
3use systemprompt_identifiers::{FunnelId, FunnelProgressId, SessionId};
4
5use super::FunnelRepository;
6use crate::models::{CreateFunnelInput, Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};
7
8impl FunnelRepository {
9    pub async fn create_funnel(&self, input: &CreateFunnelInput) -> Result<FunnelWithSteps> {
10        let funnel_id = FunnelId::generate();
11        let now = Utc::now();
12
13        sqlx::query!(
14            r#"
15            INSERT INTO funnels (id, name, description, is_active, created_at, updated_at)
16            VALUES ($1, $2, $3, TRUE, $4, $4)
17            "#,
18            funnel_id.as_str(),
19            input.name,
20            input.description,
21            now
22        )
23        .execute(&*self.write_pool)
24        .await?;
25
26        let mut funnel_ids_arr = Vec::with_capacity(input.steps.len());
27        let mut step_orders = Vec::with_capacity(input.steps.len());
28        let mut names = Vec::with_capacity(input.steps.len());
29        let mut patterns = Vec::with_capacity(input.steps.len());
30        let mut match_types = Vec::with_capacity(input.steps.len());
31        let mut steps = Vec::with_capacity(input.steps.len());
32
33        for (idx, step_input) in input.steps.iter().enumerate() {
34            let step_order = i32::try_from(idx).unwrap_or(0);
35            funnel_ids_arr.push(funnel_id.as_str().to_string());
36            step_orders.push(step_order);
37            names.push(step_input.name.clone());
38            patterns.push(step_input.match_pattern.clone());
39            match_types.push(step_input.match_type.as_str().to_string());
40
41            steps.push(FunnelStep {
42                funnel_id: funnel_id.clone(),
43                step_order,
44                name: step_input.name.clone(),
45                match_pattern: step_input.match_pattern.clone(),
46                match_type: step_input.match_type,
47            });
48        }
49
50        if !steps.is_empty() {
51            sqlx::query!(
52                r#"
53                INSERT INTO funnel_steps (funnel_id, step_order, name, match_pattern, match_type)
54                SELECT * FROM UNNEST($1::text[], $2::int4[], $3::text[], $4::text[], $5::text[])
55                "#,
56                &funnel_ids_arr,
57                &step_orders,
58                &names,
59                &patterns,
60                &match_types
61            )
62            .execute(&*self.write_pool)
63            .await?;
64        }
65
66        let funnel = Funnel {
67            id: funnel_id,
68            name: input.name.clone(),
69            description: input.description.clone(),
70            is_active: true,
71            created_at: now,
72            updated_at: now,
73        };
74
75        Ok(FunnelWithSteps { funnel, steps })
76    }
77
78    pub async fn deactivate(&self, id: &FunnelId) -> Result<bool> {
79        let result = sqlx::query!(
80            r#"
81            UPDATE funnels SET is_active = FALSE, updated_at = $2 WHERE id = $1
82            "#,
83            id.as_str(),
84            Utc::now()
85        )
86        .execute(&*self.write_pool)
87        .await?;
88
89        Ok(result.rows_affected() > 0)
90    }
91
92    pub async fn delete(&self, id: &FunnelId) -> Result<bool> {
93        let result = sqlx::query!(r#"DELETE FROM funnels WHERE id = $1"#, id.as_str())
94            .execute(&*self.write_pool)
95            .await?;
96
97        Ok(result.rows_affected() > 0)
98    }
99
100    pub async fn record_progress(
101        &self,
102        funnel_id: &FunnelId,
103        session_id: &SessionId,
104        step: i32,
105    ) -> Result<FunnelProgress> {
106        let now = Utc::now();
107        let step_timestamp = serde_json::json!({
108            "step": step,
109            "timestamp": now.to_rfc3339()
110        });
111
112        if let Some(mut progress) = self.find_progress(funnel_id, session_id).await? {
113            if step > progress.current_step {
114                let mut timestamps = progress
115                    .step_timestamps
116                    .as_array()
117                    .cloned()
118                    .unwrap_or_else(Vec::new);
119                timestamps.push(step_timestamp);
120
121                sqlx::query!(
122                    r#"
123                    UPDATE funnel_progress
124                    SET current_step = $3, step_timestamps = $4, updated_at = $5
125                    WHERE funnel_id = $1 AND session_id = $2
126                    "#,
127                    funnel_id.as_str(),
128                    session_id.as_str(),
129                    step,
130                    serde_json::Value::Array(timestamps.clone()),
131                    now
132                )
133                .execute(&*self.write_pool)
134                .await?;
135
136                progress.current_step = step;
137                progress.step_timestamps = serde_json::Value::Array(timestamps);
138                progress.updated_at = now;
139            }
140            return Ok(progress);
141        }
142
143        let id = FunnelProgressId::generate();
144        let timestamps = serde_json::json!([step_timestamp]);
145
146        sqlx::query!(
147            r#"
148            INSERT INTO funnel_progress (
149                id, funnel_id, session_id, current_step, step_timestamps, created_at, updated_at
150            )
151            VALUES ($1, $2, $3, $4, $5, $6, $6)
152            "#,
153            id.as_str(),
154            funnel_id.as_str(),
155            session_id.as_str(),
156            step,
157            timestamps,
158            now
159        )
160        .execute(&*self.write_pool)
161        .await?;
162
163        Ok(FunnelProgress {
164            id,
165            funnel_id: funnel_id.clone(),
166            session_id: session_id.clone(),
167            current_step: step,
168            completed_at: None,
169            dropped_at_step: None,
170            step_timestamps: timestamps,
171            created_at: now,
172            updated_at: now,
173        })
174    }
175
176    pub async fn mark_completed(
177        &self,
178        funnel_id: &FunnelId,
179        session_id: &SessionId,
180    ) -> Result<bool> {
181        let now = Utc::now();
182        let result = sqlx::query!(
183            r#"
184            UPDATE funnel_progress
185            SET completed_at = $3, updated_at = $3
186            WHERE funnel_id = $1 AND session_id = $2
187            "#,
188            funnel_id.as_str(),
189            session_id.as_str(),
190            now
191        )
192        .execute(&*self.write_pool)
193        .await?;
194
195        Ok(result.rows_affected() > 0)
196    }
197}