Skip to main content

stormchaser_engine/db/steps/
definitions.rs

1use serde_json::Value;
2use sqlx::{Executor, Postgres};
3
4/// Stepdefinitioninput.
5pub struct StepDefinitionInput {
6    /// The step type.
7    pub step_type: String,
8    /// The schema.
9    pub schema: Value,
10    /// The documentation.
11    pub documentation: Option<String>,
12}
13
14#[allow(clippy::too_many_arguments)]
15/// Upsert step definition.
16pub async fn upsert_step_definition<'a, E>(
17    executor: E,
18    step_type: &str,
19    schema: &Value,
20    documentation: Option<&str>,
21) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
22where
23    E: Executor<'a, Database = Postgres>,
24{
25    sqlx::query(
26        r#"
27                INSERT INTO step_definitions (step_type, schema, documentation, registered_at)
28                VALUES ($1, $2, $3, NOW())
29                ON CONFLICT (step_type) DO UPDATE SET
30                    schema = EXCLUDED.schema,
31                    documentation = EXCLUDED.documentation
32                "#,
33    )
34    .bind(step_type)
35    .bind(schema)
36    .bind(documentation)
37    .execute(executor)
38    .await
39}
40
41#[allow(clippy::too_many_arguments)]
42/// Upsert step definition with wasm.
43pub async fn upsert_step_definition_with_wasm<'a, E>(
44    executor: E,
45    step_type: &str,
46    schema: &Value,
47    documentation: Option<&str>,
48    wasm_module: &str,
49    wasm_function: &str,
50    wasm_config: &Value,
51) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
52where
53    E: Executor<'a, Database = Postgres>,
54{
55    sqlx::query(
56        r#"
57        INSERT INTO step_definitions (step_type, schema, documentation, registered_at, wasm_module, wasm_function, wasm_config)
58        VALUES ($1, $2, $3, NOW(), $4, $5, $6)
59        ON CONFLICT (step_type) DO UPDATE SET
60            schema = EXCLUDED.schema,
61            documentation = EXCLUDED.documentation,
62            wasm_module = EXCLUDED.wasm_module,
63            wasm_function = EXCLUDED.wasm_function,
64            wasm_config = EXCLUDED.wasm_config
65        "#,
66    )
67    .bind(step_type)
68    .bind(schema)
69    .bind(documentation)
70    .bind(wasm_module)
71    .bind(wasm_function)
72    .bind(wasm_config)
73    .execute(executor)
74    .await
75}
76
77#[allow(clippy::too_many_arguments)]
78/// Get wasm step definition.
79pub async fn get_wasm_step_definition<'a, E, O>(
80    executor: E,
81    step_type: &str,
82) -> Result<Option<O>, sqlx::Error>
83where
84    E: Executor<'a, Database = Postgres>,
85    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
86{
87    sqlx::query_as::<_, O>(
88        "SELECT wasm_module, wasm_function, wasm_config FROM step_definitions WHERE step_type = $1 AND wasm_module IS NOT NULL"
89    )
90    .bind(step_type)
91    .fetch_optional(executor)
92    .await
93}