1use sqlx::PgPool;
2use uuid::Uuid;
3
4use dk_core::RepoId;
5
6#[derive(Debug, Clone)]
7pub struct PipelineStep {
8 pub repo_id: RepoId,
9 pub step_order: i32,
10 pub step_type: String,
11 pub config: serde_json::Value,
12 pub required: bool,
13}
14
15#[derive(Debug, Clone)]
16pub struct VerificationResult {
17 pub id: Uuid,
18 pub changeset_id: Uuid,
19 pub step_order: i32,
20 pub status: String,
21 pub output: Option<String>,
22}
23
24pub struct PipelineStore {
25 db: PgPool,
26}
27
28impl PipelineStore {
29 pub fn new(db: PgPool) -> Self {
30 Self { db }
31 }
32
33 pub async fn get_pipeline(&self, repo_id: RepoId) -> dk_core::Result<Vec<PipelineStep>> {
34 let rows: Vec<(Uuid, i32, String, serde_json::Value, bool)> = sqlx::query_as(
35 "SELECT repo_id, step_order, step_type, config, required FROM verification_pipelines WHERE repo_id = $1 ORDER BY step_order",
36 )
37 .bind(repo_id)
38 .fetch_all(&self.db)
39 .await?;
40
41 Ok(rows
42 .into_iter()
43 .map(|r| PipelineStep {
44 repo_id: r.0,
45 step_order: r.1,
46 step_type: r.2,
47 config: r.3,
48 required: r.4,
49 })
50 .collect())
51 }
52
53 pub async fn set_pipeline(&self, repo_id: RepoId, steps: &[PipelineStep]) -> dk_core::Result<()> {
54 sqlx::query("DELETE FROM verification_pipelines WHERE repo_id = $1")
55 .bind(repo_id)
56 .execute(&self.db)
57 .await?;
58
59 for step in steps {
60 sqlx::query(
61 r#"INSERT INTO verification_pipelines (repo_id, step_order, step_type, config, required)
62 VALUES ($1, $2, $3, $4, $5)"#,
63 )
64 .bind(repo_id)
65 .bind(step.step_order)
66 .bind(&step.step_type)
67 .bind(&step.config)
68 .bind(step.required)
69 .execute(&self.db)
70 .await?;
71 }
72 Ok(())
73 }
74
75 pub async fn record_result(
76 &self,
77 changeset_id: Uuid,
78 step_order: i32,
79 status: &str,
80 output: Option<&str>,
81 ) -> dk_core::Result<VerificationResult> {
82 let row: (Uuid,) = sqlx::query_as(
83 r#"INSERT INTO verification_results (changeset_id, step_order, status, output, started_at, completed_at)
84 VALUES ($1, $2, $3, $4, now(), CASE WHEN $3 IN ('pass', 'fail', 'skip') THEN now() ELSE NULL END)
85 RETURNING id"#,
86 )
87 .bind(changeset_id)
88 .bind(step_order)
89 .bind(status)
90 .bind(output)
91 .fetch_one(&self.db)
92 .await?;
93
94 Ok(VerificationResult {
95 id: row.0,
96 changeset_id,
97 step_order,
98 status: status.to_string(),
99 output: output.map(String::from),
100 })
101 }
102
103 pub async fn get_results(&self, changeset_id: Uuid) -> dk_core::Result<Vec<VerificationResult>> {
104 let rows: Vec<(Uuid, Uuid, i32, String, Option<String>)> = sqlx::query_as(
105 "SELECT id, changeset_id, step_order, status, output FROM verification_results WHERE changeset_id = $1 ORDER BY step_order",
106 )
107 .bind(changeset_id)
108 .fetch_all(&self.db)
109 .await?;
110
111 Ok(rows
112 .into_iter()
113 .map(|r| VerificationResult {
114 id: r.0,
115 changeset_id: r.1,
116 step_order: r.2,
117 status: r.3,
118 output: r.4,
119 })
120 .collect())
121 }
122}