Skip to main content

systemprompt_analytics/repository/funnel/
mutations.rs

1use anyhow::Result;
2use chrono::Utc;
3use systemprompt_identifiers::{FunnelId, FunnelProgressId, SessionId};
4
5use super::FunnelRepository;
6use crate::models::{CreateFunnelInput, Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};
7
8impl FunnelRepository {
9    pub async fn create_funnel(&self, input: &CreateFunnelInput) -> Result<FunnelWithSteps> {
10        let funnel_id = FunnelId::generate();
11        let now = Utc::now();
12
13        sqlx::query!(
14            r#"
15            INSERT INTO funnels (id, name, description, is_active, created_at, updated_at)
16            VALUES ($1, $2, $3, TRUE, $4, $4)
17            "#,
18            funnel_id.as_str(),
19            input.name,
20            input.description,
21            now
22        )
23        .execute(&*self.pool)
24        .await?;
25
26        let mut steps = Vec::with_capacity(input.steps.len());
27        for (idx, step_input) in input.steps.iter().enumerate() {
28            let step_order = i32::try_from(idx).unwrap_or(0);
29            let match_type = step_input.match_type.as_str();
30
31            sqlx::query!(
32                r#"
33                INSERT INTO funnel_steps (funnel_id, step_order, name, match_pattern, match_type)
34                VALUES ($1, $2, $3, $4, $5)
35                "#,
36                funnel_id.as_str(),
37                step_order,
38                step_input.name,
39                step_input.match_pattern,
40                match_type
41            )
42            .execute(&*self.pool)
43            .await?;
44
45            steps.push(FunnelStep {
46                funnel_id: funnel_id.clone(),
47                step_order,
48                name: step_input.name.clone(),
49                match_pattern: step_input.match_pattern.clone(),
50                match_type: step_input.match_type,
51            });
52        }
53
54        let funnel = Funnel {
55            id: funnel_id,
56            name: input.name.clone(),
57            description: input.description.clone(),
58            is_active: true,
59            created_at: now,
60            updated_at: now,
61        };
62
63        Ok(FunnelWithSteps { funnel, steps })
64    }
65
66    pub async fn deactivate(&self, id: &FunnelId) -> Result<bool> {
67        let result = sqlx::query!(
68            r#"
69            UPDATE funnels SET is_active = FALSE, updated_at = $2 WHERE id = $1
70            "#,
71            id.as_str(),
72            Utc::now()
73        )
74        .execute(&*self.pool)
75        .await?;
76
77        Ok(result.rows_affected() > 0)
78    }
79
80    pub async fn delete(&self, id: &FunnelId) -> Result<bool> {
81        let result = sqlx::query!(r#"DELETE FROM funnels WHERE id = $1"#, id.as_str())
82            .execute(&*self.pool)
83            .await?;
84
85        Ok(result.rows_affected() > 0)
86    }
87
88    pub async fn record_progress(
89        &self,
90        funnel_id: &FunnelId,
91        session_id: &SessionId,
92        step: i32,
93    ) -> Result<FunnelProgress> {
94        let now = Utc::now();
95        let step_timestamp = serde_json::json!({
96            "step": step,
97            "timestamp": now.to_rfc3339()
98        });
99
100        if let Some(mut progress) = self.find_progress(funnel_id, session_id).await? {
101            if step > progress.current_step {
102                let mut timestamps = progress
103                    .step_timestamps
104                    .as_array()
105                    .cloned()
106                    .unwrap_or_else(Vec::new);
107                timestamps.push(step_timestamp);
108
109                sqlx::query!(
110                    r#"
111                    UPDATE funnel_progress
112                    SET current_step = $3, step_timestamps = $4, updated_at = $5
113                    WHERE funnel_id = $1 AND session_id = $2
114                    "#,
115                    funnel_id.as_str(),
116                    session_id.as_str(),
117                    step,
118                    serde_json::Value::Array(timestamps.clone()),
119                    now
120                )
121                .execute(&*self.pool)
122                .await?;
123
124                progress.current_step = step;
125                progress.step_timestamps = serde_json::Value::Array(timestamps);
126                progress.updated_at = now;
127            }
128            return Ok(progress);
129        }
130
131        let id = FunnelProgressId::generate();
132        let timestamps = serde_json::json!([step_timestamp]);
133
134        sqlx::query!(
135            r#"
136            INSERT INTO funnel_progress (
137                id, funnel_id, session_id, current_step, step_timestamps, created_at, updated_at
138            )
139            VALUES ($1, $2, $3, $4, $5, $6, $6)
140            "#,
141            id.as_str(),
142            funnel_id.as_str(),
143            session_id.as_str(),
144            step,
145            timestamps,
146            now
147        )
148        .execute(&*self.pool)
149        .await?;
150
151        Ok(FunnelProgress {
152            id,
153            funnel_id: funnel_id.clone(),
154            session_id: session_id.clone(),
155            current_step: step,
156            completed_at: None,
157            dropped_at_step: None,
158            step_timestamps: timestamps,
159            created_at: now,
160            updated_at: now,
161        })
162    }
163
164    pub async fn mark_completed(
165        &self,
166        funnel_id: &FunnelId,
167        session_id: &SessionId,
168    ) -> Result<bool> {
169        let now = Utc::now();
170        let result = sqlx::query!(
171            r#"
172            UPDATE funnel_progress
173            SET completed_at = $3, updated_at = $3
174            WHERE funnel_id = $1 AND session_id = $2
175            "#,
176            funnel_id.as_str(),
177            session_id.as_str(),
178            now
179        )
180        .execute(&*self.pool)
181        .await?;
182
183        Ok(result.rows_affected() > 0)
184    }
185}