Skip to main content

systemprompt_analytics/repository/funnel/
mutations.rs

1//! Funnel and funnel-progress mutations for `FunnelRepository`.
2//!
3//! Creates funnels with their ordered steps, deactivates and deletes them,
4//! and advances or completes per-session progress. All writes go to the write
5//! pool; `record_progress` only moves a session forward, never backward.
6
7use crate::Result;
8use chrono::Utc;
9use systemprompt_identifiers::{FunnelId, FunnelProgressId, SessionId};
10
11use super::FunnelRepository;
12use crate::models::{CreateFunnelInput, Funnel, FunnelProgress, FunnelStep, FunnelWithSteps};
13
14impl FunnelRepository {
15    pub async fn create_funnel(&self, input: &CreateFunnelInput) -> Result<FunnelWithSteps> {
16        let funnel_id = FunnelId::generate();
17        let now = Utc::now();
18
19        sqlx::query!(
20            r#"
21            INSERT INTO funnels (id, name, description, is_active, created_at, updated_at)
22            VALUES ($1, $2, $3, TRUE, $4, $4)
23            "#,
24            funnel_id.as_str(),
25            input.name,
26            input.description,
27            now
28        )
29        .execute(&*self.write_pool)
30        .await?;
31
32        let mut funnel_ids_arr = Vec::with_capacity(input.steps.len());
33        let mut step_orders = Vec::with_capacity(input.steps.len());
34        let mut names = Vec::with_capacity(input.steps.len());
35        let mut patterns = Vec::with_capacity(input.steps.len());
36        let mut match_types = Vec::with_capacity(input.steps.len());
37        let mut steps = Vec::with_capacity(input.steps.len());
38
39        for (idx, step_input) in input.steps.iter().enumerate() {
40            let step_order = i32::try_from(idx).unwrap_or(0);
41            funnel_ids_arr.push(funnel_id.as_str().to_owned());
42            step_orders.push(step_order);
43            names.push(step_input.name.clone());
44            patterns.push(step_input.match_pattern.clone());
45            match_types.push(step_input.match_type.as_str().to_owned());
46
47            steps.push(FunnelStep {
48                funnel_id: funnel_id.clone(),
49                step_order,
50                name: step_input.name.clone(),
51                match_pattern: step_input.match_pattern.clone(),
52                match_type: step_input.match_type,
53            });
54        }
55
56        if !steps.is_empty() {
57            sqlx::query!(
58                r#"
59                INSERT INTO funnel_steps (funnel_id, step_order, name, match_pattern, match_type)
60                SELECT * FROM UNNEST($1::text[], $2::int4[], $3::text[], $4::text[], $5::text[])
61                "#,
62                &funnel_ids_arr,
63                &step_orders,
64                &names,
65                &patterns,
66                &match_types
67            )
68            .execute(&*self.write_pool)
69            .await?;
70        }
71
72        let funnel = Funnel {
73            id: funnel_id,
74            name: input.name.clone(),
75            description: input.description.clone(),
76            is_active: true,
77            created_at: now,
78            updated_at: now,
79        };
80
81        Ok(FunnelWithSteps { funnel, steps })
82    }
83
84    pub async fn deactivate(&self, id: &FunnelId) -> Result<bool> {
85        let result = sqlx::query!(
86            r#"
87            UPDATE funnels SET is_active = FALSE, updated_at = $2 WHERE id = $1
88            "#,
89            id.as_str(),
90            Utc::now()
91        )
92        .execute(&*self.write_pool)
93        .await?;
94
95        Ok(result.rows_affected() > 0)
96    }
97
98    pub async fn delete(&self, id: &FunnelId) -> Result<bool> {
99        let result = sqlx::query!(r#"DELETE FROM funnels WHERE id = $1"#, id.as_str())
100            .execute(&*self.write_pool)
101            .await?;
102
103        Ok(result.rows_affected() > 0)
104    }
105
106    pub async fn record_progress(
107        &self,
108        funnel_id: &FunnelId,
109        session_id: &SessionId,
110        step: i32,
111    ) -> Result<FunnelProgress> {
112        let now = Utc::now();
113        let step_timestamp = serde_json::json!({
114            "step": step,
115            "timestamp": now.to_rfc3339()
116        });
117
118        if let Some(mut progress) = self.find_progress(funnel_id, session_id).await? {
119            if step > progress.current_step {
120                let mut timestamps = progress
121                    .step_timestamps
122                    .as_array()
123                    .cloned()
124                    .unwrap_or_else(Vec::new);
125                timestamps.push(step_timestamp);
126
127                sqlx::query!(
128                    r#"
129                    UPDATE funnel_progress
130                    SET current_step = $3, step_timestamps = $4, updated_at = $5
131                    WHERE funnel_id = $1 AND session_id = $2
132                    "#,
133                    funnel_id.as_str(),
134                    session_id.as_str(),
135                    step,
136                    serde_json::Value::Array(timestamps.clone()),
137                    now
138                )
139                .execute(&*self.write_pool)
140                .await?;
141
142                progress.current_step = step;
143                progress.step_timestamps = serde_json::Value::Array(timestamps);
144                progress.updated_at = now;
145            }
146            return Ok(progress);
147        }
148
149        let id = FunnelProgressId::generate();
150        let timestamps = serde_json::json!([step_timestamp]);
151
152        sqlx::query!(
153            r#"
154            INSERT INTO funnel_progress (
155                id, funnel_id, session_id, current_step, step_timestamps, created_at, updated_at
156            )
157            VALUES ($1, $2, $3, $4, $5, $6, $6)
158            "#,
159            id.as_str(),
160            funnel_id.as_str(),
161            session_id.as_str(),
162            step,
163            timestamps,
164            now
165        )
166        .execute(&*self.write_pool)
167        .await?;
168
169        Ok(FunnelProgress {
170            id,
171            funnel_id: funnel_id.clone(),
172            session_id: session_id.clone(),
173            current_step: step,
174            completed_at: None,
175            dropped_at_step: None,
176            step_timestamps: timestamps,
177            created_at: now,
178            updated_at: now,
179        })
180    }
181
182    pub async fn mark_completed(
183        &self,
184        funnel_id: &FunnelId,
185        session_id: &SessionId,
186    ) -> Result<bool> {
187        let now = Utc::now();
188        let result = sqlx::query!(
189            r#"
190            UPDATE funnel_progress
191            SET completed_at = $3, updated_at = $3
192            WHERE funnel_id = $1 AND session_id = $2
193            "#,
194            funnel_id.as_str(),
195            session_id.as_str(),
196            now
197        )
198        .execute(&*self.write_pool)
199        .await?;
200
201        Ok(result.rows_affected() > 0)
202    }
203}