stormchaser_engine/db/steps/
definitions.rs1use serde_json::Value;
2use sqlx::{Executor, Postgres};
3
4pub struct StepDefinitionInput {
6 pub step_type: String,
8 pub schema: Value,
10 pub documentation: Option<String>,
12}
13
14#[allow(clippy::too_many_arguments)]
15pub async fn upsert_step_definition<'a, E>(
17 executor: E,
18 step_type: &str,
19 schema: &Value,
20 documentation: Option<&str>,
21) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
22where
23 E: Executor<'a, Database = Postgres>,
24{
25 sqlx::query(
26 r#"
27 INSERT INTO step_definitions (step_type, schema, documentation, registered_at)
28 VALUES ($1, $2, $3, NOW())
29 ON CONFLICT (step_type) DO UPDATE SET
30 schema = EXCLUDED.schema,
31 documentation = EXCLUDED.documentation
32 "#,
33 )
34 .bind(step_type)
35 .bind(schema)
36 .bind(documentation)
37 .execute(executor)
38 .await
39}
40
41#[allow(clippy::too_many_arguments)]
42pub async fn upsert_step_definition_with_wasm<'a, E>(
44 executor: E,
45 step_type: &str,
46 schema: &Value,
47 documentation: Option<&str>,
48 wasm_module: &str,
49 wasm_function: &str,
50 wasm_config: &Value,
51) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
52where
53 E: Executor<'a, Database = Postgres>,
54{
55 sqlx::query(
56 r#"
57 INSERT INTO step_definitions (step_type, schema, documentation, registered_at, wasm_module, wasm_function, wasm_config)
58 VALUES ($1, $2, $3, NOW(), $4, $5, $6)
59 ON CONFLICT (step_type) DO UPDATE SET
60 schema = EXCLUDED.schema,
61 documentation = EXCLUDED.documentation,
62 wasm_module = EXCLUDED.wasm_module,
63 wasm_function = EXCLUDED.wasm_function,
64 wasm_config = EXCLUDED.wasm_config
65 "#,
66 )
67 .bind(step_type)
68 .bind(schema)
69 .bind(documentation)
70 .bind(wasm_module)
71 .bind(wasm_function)
72 .bind(wasm_config)
73 .execute(executor)
74 .await
75}
76
77#[allow(clippy::too_many_arguments)]
78pub async fn get_wasm_step_definition<'a, E, O>(
80 executor: E,
81 step_type: &str,
82) -> Result<Option<O>, sqlx::Error>
83where
84 E: Executor<'a, Database = Postgres>,
85 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
86{
87 sqlx::query_as::<_, O>(
88 "SELECT wasm_module, wasm_function, wasm_config FROM step_definitions WHERE step_type = $1 AND wasm_module IS NOT NULL"
89 )
90 .bind(step_type)
91 .fetch_optional(executor)
92 .await
93}