1use serde_json::Value;
2use sqlx::{Executor, Postgres};
3use stormchaser_model::{BackendId, RunId, StepInstanceId, TestSummary};
4use uuid::Uuid;
5
6use stormchaser_model::test_report;
7
8#[allow(clippy::too_many_arguments)]
9pub async fn upsert_run_storage_state<'a, E>(
11 executor: E,
12 run_id: Uuid,
13 storage_name: &str,
14 last_hash: &str,
15) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
16where
17 E: Executor<'a, Database = Postgres>,
18{
19 sqlx::query(
20 r#"
21 INSERT INTO run_storage_states (run_id, storage_name, last_hash)
22 VALUES ($1, $2, $3)
23 ON CONFLICT (run_id, storage_name) DO UPDATE SET last_hash = EXCLUDED.last_hash, updated_at = NOW()
24 "#,
25 )
26 .bind(run_id)
27 .bind(storage_name)
28 .bind(last_hash)
29 .execute(executor)
30 .await
31}
32
33#[allow(clippy::too_many_arguments)]
34pub async fn get_storage_backend_id_by_name<'a, E, O>(
36 executor: E,
37 name: &str,
38) -> Result<Option<O>, sqlx::Error>
39where
40 E: Executor<'a, Database = Postgres>,
41 O: Send + Unpin,
42 (O,): for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
43{
44 sqlx::query_scalar::<_, O>("SELECT id FROM storage_backends WHERE name = $1")
45 .bind(name)
46 .fetch_optional(executor)
47 .await
48}
49
50#[allow(clippy::too_many_arguments)]
51pub async fn get_default_sfs_backend_id<'a, E, O>(executor: E) -> Result<Option<O>, sqlx::Error>
53where
54 E: Executor<'a, Database = Postgres>,
55 O: Send + Unpin,
56 (O,): for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
57{
58 sqlx::query_scalar::<_, O>(
59 "SELECT id FROM storage_backends WHERE is_default_sfs = TRUE LIMIT 1",
60 )
61 .fetch_optional(executor)
62 .await
63}
64
65#[allow(clippy::too_many_arguments)]
66pub async fn insert_artifact_registry<'a, E>(
68 executor: E,
69 run_id: RunId,
70 step_instance_id: StepInstanceId,
71 artifact_name: &str,
72 backend_id: BackendId,
73 remote_path: String,
74 metadata: Value,
75) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
76where
77 E: Executor<'a, Database = Postgres>,
78{
79 sqlx::query(
80 r#"
81 INSERT INTO artifact_registry (run_id, step_instance_id, artifact_name, backend_id, remote_path, metadata)
82 VALUES ($1, $2, $3, $4, $5, $6)
83 "#,
84 )
85 .bind(run_id)
86 .bind(step_instance_id)
87 .bind(artifact_name)
88 .bind(backend_id)
89 .bind(remote_path)
90 .bind(metadata)
91 .execute(executor)
92 .await
93}
94
95#[allow(clippy::too_many_arguments)]
96pub async fn insert_step_test_report<'a, E>(
98 executor: E,
99 run_id: Uuid,
100 step_instance_id: Uuid,
101 report_name: &str,
102 file_name: &str,
103 format: &str,
104 content: Option<&str>,
105 checksum: &str,
106 backend_id: Option<Uuid>,
107 remote_path: Option<&str>,
108) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
109where
110 E: Executor<'a, Database = Postgres>,
111{
112 sqlx::query(
113 r#"
114 INSERT INTO step_test_reports (run_id, step_instance_id, report_name, file_name, format, content, checksum, backend_id, remote_path)
115 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
116 "#,
117 )
118 .bind(run_id)
119 .bind(step_instance_id)
120 .bind(report_name)
121 .bind(file_name)
122 .bind(format)
123 .bind(content)
124 .bind(checksum)
125 .bind(backend_id)
126 .bind(remote_path)
127 .execute(executor)
128 .await
129}
130
131#[allow(clippy::too_many_arguments)]
132pub async fn insert_step_test_summary<'a, E>(
134 executor: E,
135 run_id: Uuid,
136 step_instance_id: Uuid,
137 report_name: &str,
138 summary: &TestSummary,
139) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
140where
141 E: Executor<'a, Database = Postgres>,
142{
143 sqlx::query(
144 r#"
145 INSERT INTO step_test_summaries (run_id, step_instance_id, report_name, total_tests, passed, failed, skipped, errors, duration_ms)
146 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
147 "#,
148 )
149 .bind(run_id)
150 .bind(step_instance_id)
151 .bind(report_name)
152 .bind(summary.total_tests)
153 .bind(summary.passed)
154 .bind(summary.failed)
155 .bind(summary.skipped)
156 .bind(summary.errors)
157 .bind(summary.duration_ms)
158 .execute(executor)
159 .await
160}
161
162pub async fn insert_step_test_case<'a, E>(
164 executor: E,
165 run_id: Uuid,
166 step_instance_id: Uuid,
167 report_name: &str,
168 test_case: &test_report::TestCase,
169) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
170where
171 E: Executor<'a, Database = Postgres>,
172{
173 sqlx::query(
174 r#"
175 INSERT INTO step_test_cases (run_id, step_instance_id, report_name, test_suite, test_case, status, duration_ms, message)
176 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
177 "#,
178 )
179 .bind(run_id)
180 .bind(step_instance_id)
181 .bind(report_name)
182 .bind(&test_case.test_suite)
183 .bind(&test_case.test_case)
184 .bind(&test_case.status)
185 .bind(test_case.duration_ms)
186 .bind(&test_case.message)
187 .execute(executor)
188 .await
189}
190
191#[allow(clippy::too_many_arguments)]
192pub async fn get_storage_backend_by_name<'a, E, O>(
194 executor: E,
195 name: &str,
196) -> Result<Option<O>, sqlx::Error>
197where
198 E: Executor<'a, Database = Postgres>,
199 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
200{
201 sqlx::query_as::<_, O>("SELECT * FROM storage_backends WHERE name = $1")
202 .bind(name)
203 .fetch_optional(executor)
204 .await
205}
206
207#[allow(clippy::too_many_arguments)]
208pub async fn get_default_sfs_backend<'a, E, O>(executor: E) -> Result<Option<O>, sqlx::Error>
210where
211 E: Executor<'a, Database = Postgres>,
212 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
213{
214 sqlx::query_as::<_, O>("SELECT * FROM storage_backends WHERE is_default_sfs = TRUE LIMIT 1")
215 .fetch_optional(executor)
216 .await
217}
218
219#[allow(clippy::too_many_arguments)]
220pub async fn get_run_storage_last_hash<'a, E, O>(
222 executor: E,
223 run_id: Uuid,
224 storage_name: &str,
225) -> Result<Option<O>, sqlx::Error>
226where
227 E: Executor<'a, Database = Postgres>,
228 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
229{
230 sqlx::query_as::<_, O>(
231 "SELECT last_hash FROM run_storage_states WHERE run_id = $1 AND storage_name = $2",
232 )
233 .bind(run_id)
234 .bind(storage_name)
235 .fetch_optional(executor)
236 .await
237}
238
239pub async fn get_storage_backend_by_id<'a, E, O>(
241 executor: E,
242 id: Uuid,
243) -> Result<Option<O>, sqlx::Error>
244where
245 E: Executor<'a, Database = Postgres>,
246 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
247{
248 sqlx::query_as::<_, O>("SELECT * FROM storage_backends WHERE id = $1")
249 .bind(id)
250 .fetch_optional(executor)
251 .await
252}
253
254pub async fn get_artifact_by_name<'a, E>(
255 executor: E,
256 run_id: Uuid,
257 artifact_name: &str,
258) -> Result<Option<(Uuid, String)>, sqlx::Error>
259where
260 E: Executor<'a, Database = Postgres>,
261{
262 let record = sqlx::query(
263 r#"
264 SELECT backend_id, remote_path
265 FROM artifact_registry
266 WHERE run_id = $1 AND artifact_name = $2
267 ORDER BY created_at DESC
268 LIMIT 1
269 "#,
270 )
271 .bind(run_id)
272 .bind(artifact_name)
273 .fetch_optional(executor)
274 .await?;
275
276 Ok(record.map(|r| {
277 use sqlx::Row;
278 (r.get("backend_id"), r.get("remote_path"))
279 }))
280}