Skip to main content

stormchaser_engine/db/
storage.rs

1use serde_json::Value;
2use sqlx::{Executor, Postgres};
3use stormchaser_model::{BackendId, RunId, StepInstanceId, TestSummary};
4use uuid::Uuid;
5
6use stormchaser_model::test_report;
7
8#[allow(clippy::too_many_arguments)]
9/// Upsert run storage state.
10pub async fn upsert_run_storage_state<'a, E>(
11    executor: E,
12    run_id: Uuid,
13    storage_name: &str,
14    last_hash: &str,
15) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
16where
17    E: Executor<'a, Database = Postgres>,
18{
19    sqlx::query(
20        r#"
21                    INSERT INTO run_storage_states (run_id, storage_name, last_hash)
22                    VALUES ($1, $2, $3)
23                    ON CONFLICT (run_id, storage_name) DO UPDATE SET last_hash = EXCLUDED.last_hash, updated_at = NOW()
24                    "#,
25    )
26    .bind(run_id)
27    .bind(storage_name)
28    .bind(last_hash)
29    .execute(executor)
30    .await
31}
32
33#[allow(clippy::too_many_arguments)]
34/// Get storage backend id by name.
35pub async fn get_storage_backend_id_by_name<'a, E, O>(
36    executor: E,
37    name: &str,
38) -> Result<Option<O>, sqlx::Error>
39where
40    E: Executor<'a, Database = Postgres>,
41    O: Send + Unpin,
42    (O,): for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
43{
44    sqlx::query_scalar::<_, O>("SELECT id FROM storage_backends WHERE name = $1")
45        .bind(name)
46        .fetch_optional(executor)
47        .await
48}
49
50#[allow(clippy::too_many_arguments)]
51/// Get default sfs backend id.
52pub async fn get_default_sfs_backend_id<'a, E, O>(executor: E) -> Result<Option<O>, sqlx::Error>
53where
54    E: Executor<'a, Database = Postgres>,
55    O: Send + Unpin,
56    (O,): for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
57{
58    sqlx::query_scalar::<_, O>(
59        "SELECT id FROM storage_backends WHERE is_default_sfs = TRUE LIMIT 1",
60    )
61    .fetch_optional(executor)
62    .await
63}
64
65#[allow(clippy::too_many_arguments)]
66/// Insert artifact registry.
67pub async fn insert_artifact_registry<'a, E>(
68    executor: E,
69    run_id: RunId,
70    step_instance_id: StepInstanceId,
71    artifact_name: &str,
72    backend_id: BackendId,
73    remote_path: String,
74    metadata: Value,
75) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
76where
77    E: Executor<'a, Database = Postgres>,
78{
79    sqlx::query(
80        r#"
81                        INSERT INTO artifact_registry (run_id, step_instance_id, artifact_name, backend_id, remote_path, metadata)
82                        VALUES ($1, $2, $3, $4, $5, $6)
83                        "#,
84    )
85    .bind(run_id)
86    .bind(step_instance_id)
87    .bind(artifact_name)
88    .bind(backend_id)
89    .bind(remote_path)
90    .bind(metadata)
91    .execute(executor)
92    .await
93}
94
95#[allow(clippy::too_many_arguments)]
96/// Insert step test report.
97pub async fn insert_step_test_report<'a, E>(
98    executor: E,
99    run_id: Uuid,
100    step_instance_id: Uuid,
101    report_name: &str,
102    file_name: &str,
103    format: &str,
104    content: Option<&str>,
105    checksum: &str,
106    backend_id: Option<Uuid>,
107    remote_path: Option<&str>,
108) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
109where
110    E: Executor<'a, Database = Postgres>,
111{
112    sqlx::query(
113        r#"
114                    INSERT INTO step_test_reports (run_id, step_instance_id, report_name, file_name, format, content, checksum, backend_id, remote_path)
115                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
116                    "#,
117    )
118    .bind(run_id)
119    .bind(step_instance_id)
120    .bind(report_name)
121    .bind(file_name)
122    .bind(format)
123    .bind(content)
124    .bind(checksum)
125    .bind(backend_id)
126    .bind(remote_path)
127    .execute(executor)
128    .await
129}
130
131#[allow(clippy::too_many_arguments)]
132/// Insert step test summary.
133pub async fn insert_step_test_summary<'a, E>(
134    executor: E,
135    run_id: Uuid,
136    step_instance_id: Uuid,
137    report_name: &str,
138    summary: &TestSummary,
139) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
140where
141    E: Executor<'a, Database = Postgres>,
142{
143    sqlx::query(
144        r#"
145                    INSERT INTO step_test_summaries (run_id, step_instance_id, report_name, total_tests, passed, failed, skipped, errors, duration_ms)
146                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
147                    "#,
148    )
149    .bind(run_id)
150    .bind(step_instance_id)
151    .bind(report_name)
152    .bind(summary.total_tests)
153    .bind(summary.passed)
154    .bind(summary.failed)
155    .bind(summary.skipped)
156    .bind(summary.errors)
157    .bind(summary.duration_ms)
158    .execute(executor)
159    .await
160}
161
162/// Insert step test case.
163pub async fn insert_step_test_case<'a, E>(
164    executor: E,
165    run_id: Uuid,
166    step_instance_id: Uuid,
167    report_name: &str,
168    test_case: &test_report::TestCase,
169) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
170where
171    E: Executor<'a, Database = Postgres>,
172{
173    sqlx::query(
174        r#"
175                    INSERT INTO step_test_cases (run_id, step_instance_id, report_name, test_suite, test_case, status, duration_ms, message)
176                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
177                    "#,
178    )
179    .bind(run_id)
180    .bind(step_instance_id)
181    .bind(report_name)
182    .bind(&test_case.test_suite)
183    .bind(&test_case.test_case)
184    .bind(&test_case.status)
185    .bind(test_case.duration_ms)
186    .bind(&test_case.message)
187    .execute(executor)
188    .await
189}
190
191#[allow(clippy::too_many_arguments)]
192/// Get storage backend by name.
193pub async fn get_storage_backend_by_name<'a, E, O>(
194    executor: E,
195    name: &str,
196) -> Result<Option<O>, sqlx::Error>
197where
198    E: Executor<'a, Database = Postgres>,
199    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
200{
201    sqlx::query_as::<_, O>("SELECT * FROM storage_backends WHERE name = $1")
202        .bind(name)
203        .fetch_optional(executor)
204        .await
205}
206
207#[allow(clippy::too_many_arguments)]
208/// Get default sfs backend.
209pub async fn get_default_sfs_backend<'a, E, O>(executor: E) -> Result<Option<O>, sqlx::Error>
210where
211    E: Executor<'a, Database = Postgres>,
212    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
213{
214    sqlx::query_as::<_, O>("SELECT * FROM storage_backends WHERE is_default_sfs = TRUE LIMIT 1")
215        .fetch_optional(executor)
216        .await
217}
218
219#[allow(clippy::too_many_arguments)]
220/// Get run storage last hash.
221pub async fn get_run_storage_last_hash<'a, E, O>(
222    executor: E,
223    run_id: Uuid,
224    storage_name: &str,
225) -> Result<Option<O>, sqlx::Error>
226where
227    E: Executor<'a, Database = Postgres>,
228    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
229{
230    sqlx::query_as::<_, O>(
231        "SELECT last_hash FROM run_storage_states WHERE run_id = $1 AND storage_name = $2",
232    )
233    .bind(run_id)
234    .bind(storage_name)
235    .fetch_optional(executor)
236    .await
237}
238
239/// Get storage backend by id.
240pub async fn get_storage_backend_by_id<'a, E, O>(
241    executor: E,
242    id: Uuid,
243) -> Result<Option<O>, sqlx::Error>
244where
245    E: Executor<'a, Database = Postgres>,
246    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
247{
248    sqlx::query_as::<_, O>("SELECT * FROM storage_backends WHERE id = $1")
249        .bind(id)
250        .fetch_optional(executor)
251        .await
252}
253
254pub async fn get_artifact_by_name<'a, E>(
255    executor: E,
256    run_id: Uuid,
257    artifact_name: &str,
258) -> Result<Option<(Uuid, String)>, sqlx::Error>
259where
260    E: Executor<'a, Database = Postgres>,
261{
262    let record = sqlx::query(
263        r#"
264        SELECT backend_id, remote_path
265        FROM artifact_registry
266        WHERE run_id = $1 AND artifact_name = $2
267        ORDER BY created_at DESC
268        LIMIT 1
269        "#,
270    )
271    .bind(run_id)
272    .bind(artifact_name)
273    .fetch_optional(executor)
274    .await?;
275
276    Ok(record.map(|r| {
277        use sqlx::Row;
278        (r.get("backend_id"), r.get("remote_path"))
279    }))
280}