Skip to main content

dk_engine/
pipeline.rs

1use sqlx::PgPool;
2use uuid::Uuid;
3
4use dk_core::RepoId;
5
6#[derive(Debug, Clone)]
7pub struct PipelineStep {
8    pub repo_id: RepoId,
9    pub step_order: i32,
10    pub step_type: String,
11    pub config: serde_json::Value,
12    pub required: bool,
13}
14
15#[derive(Debug, Clone)]
16pub struct VerificationResult {
17    pub id: Uuid,
18    pub changeset_id: Uuid,
19    pub step_order: i32,
20    pub status: String,
21    pub output: Option<String>,
22}
23
24pub struct PipelineStore {
25    db: PgPool,
26}
27
28impl PipelineStore {
29    pub fn new(db: PgPool) -> Self {
30        Self { db }
31    }
32
33    pub async fn get_pipeline(&self, repo_id: RepoId) -> dk_core::Result<Vec<PipelineStep>> {
34        let rows: Vec<(Uuid, i32, String, serde_json::Value, bool)> = sqlx::query_as(
35            "SELECT repo_id, step_order, step_type, config, required FROM verification_pipelines WHERE repo_id = $1 ORDER BY step_order",
36        )
37        .bind(repo_id)
38        .fetch_all(&self.db)
39        .await?;
40
41        Ok(rows
42            .into_iter()
43            .map(|r| PipelineStep {
44                repo_id: r.0,
45                step_order: r.1,
46                step_type: r.2,
47                config: r.3,
48                required: r.4,
49            })
50            .collect())
51    }
52
53    pub async fn set_pipeline(&self, repo_id: RepoId, steps: &[PipelineStep]) -> dk_core::Result<()> {
54        sqlx::query("DELETE FROM verification_pipelines WHERE repo_id = $1")
55            .bind(repo_id)
56            .execute(&self.db)
57            .await?;
58
59        for step in steps {
60            sqlx::query(
61                r#"INSERT INTO verification_pipelines (repo_id, step_order, step_type, config, required)
62                   VALUES ($1, $2, $3, $4, $5)"#,
63            )
64            .bind(repo_id)
65            .bind(step.step_order)
66            .bind(&step.step_type)
67            .bind(&step.config)
68            .bind(step.required)
69            .execute(&self.db)
70            .await?;
71        }
72        Ok(())
73    }
74
75    pub async fn record_result(
76        &self,
77        changeset_id: Uuid,
78        step_order: i32,
79        status: &str,
80        output: Option<&str>,
81    ) -> dk_core::Result<VerificationResult> {
82        let row: (Uuid,) = sqlx::query_as(
83            r#"INSERT INTO verification_results (changeset_id, step_order, status, output, started_at, completed_at)
84               VALUES ($1, $2, $3, $4, now(), CASE WHEN $3 IN ('pass', 'fail', 'skip') THEN now() ELSE NULL END)
85               RETURNING id"#,
86        )
87        .bind(changeset_id)
88        .bind(step_order)
89        .bind(status)
90        .bind(output)
91        .fetch_one(&self.db)
92        .await?;
93
94        Ok(VerificationResult {
95            id: row.0,
96            changeset_id,
97            step_order,
98            status: status.to_string(),
99            output: output.map(String::from),
100        })
101    }
102
103    pub async fn get_results(&self, changeset_id: Uuid) -> dk_core::Result<Vec<VerificationResult>> {
104        let rows: Vec<(Uuid, Uuid, i32, String, Option<String>)> = sqlx::query_as(
105            "SELECT id, changeset_id, step_order, status, output FROM verification_results WHERE changeset_id = $1 ORDER BY step_order",
106        )
107        .bind(changeset_id)
108        .fetch_all(&self.db)
109        .await?;
110
111        Ok(rows
112            .into_iter()
113            .map(|r| VerificationResult {
114                id: r.0,
115                changeset_id: r.1,
116                step_order: r.2,
117                status: r.3,
118                output: r.4,
119            })
120            .collect())
121    }
122}