wasm_sql/core/bindings/
executor.rs1use wasmtime::component::Accessor;
2
3use crate::{
4 core::bindings::{
5 SqlHostState,
6 generated::wasm_sql::core::{
7 query::QueryExecutor,
8 query_types::{SqlQuery, SqlString},
9 util_types::Error,
10 },
11 },
12 sqldb::SqlDatabase,
13};
14
15#[allow(dead_code)]
16pub enum QueryOrRaw {
17 Query(SqlQuery),
18 Raw(SqlString),
19}
20
/// Runs `$query` on `$executor` by calling the `sqlx::Executor` method named `$op`.
///
/// # Parameters
///
/// * `$executor` – the value `$op` is invoked on (for example a connection pool).
/// * `$accessor` – store accessor; its `with` method is used to reach the host
///   state's resource table.
/// * `$query` – a `QueryOrRaw` value. The macro names `QueryOrRaw` unqualified, so
///   the call site must have that type in scope.
/// * `$op` – identifier of the method to call, e.g. `fetch_all` or `execute`.
///
/// # Behaviour
///
/// * `QueryOrRaw::Query`: when the query carries an argument resource, that
///   resource is **deleted** from the table and its arguments are bound with
///   `sqlx::query_with`; otherwise `sqlx::query` is used. The `persistent` flag
///   defaults to `true` when it is not set.
/// * `QueryOrRaw::Raw`: the text is executed through `sqlx::raw_sql`.
///
/// The expansion evaluates to `Result<_, Error>`. It awaits the operation, so it
/// must be expanded inside an `async` context, and it applies `?` internally:
/// a failure returns early from the *enclosing function*, whose error type must
/// therefore be constructible from the errors raised here.
#[macro_export]
macro_rules! execute_with {
    ($executor:expr, $accessor:expr, $query:expr, $op:ident) => {{
        // `$crate` (not `crate`) keeps this path anchored to the defining crate no
        // matter where the exported macro is expanded.
        use $crate::core::bindings::generated::wasm_sql::core::util_types::Error;
        use sqlx::Executor;

        match $query {
            QueryOrRaw::Query(spec) => {
                let persistent = spec.persistent.unwrap_or(true);

                let prepared = match spec.args {
                    Some(args_resource) => {
                        // Take ownership of the bound arguments; the resource is
                        // removed from the table in the process.
                        let bound_args = $accessor.with(|mut access| {
                            let host = access.get();
                            host.table
                                .delete(args_resource)
                                .map(|entry| entry.args.into_inner())
                        })?;
                        sqlx::query_with(&spec.sql, bound_args).persistent(persistent)
                    }
                    None => sqlx::query(&spec.sql).persistent(persistent),
                };

                let outcome = $executor.$op(prepared).await?;
                Result::<_, Error>::Ok(outcome)
            }
            QueryOrRaw::Raw(sql) => {
                let raw = sqlx::raw_sql(&sql);
                let outcome = $executor.$op(raw).await?;
                Result::<_, Error>::Ok(outcome)
            }
        }
    }};
}
55
56#[allow(dead_code)]
57pub(crate) trait ErasedExecutor<D: wasmtime::component::HasData> {
58 async fn fetch_all<T>(
59 &self,
60 query: QueryOrRaw,
61 accessor: &Accessor<T, D>,
62 ) -> Result<Vec<<SqlDatabase as sqlx::Database>::Row>, Error>;
63
64 async fn execute<T>(
65 &self,
66 query: QueryOrRaw,
67 accessor: &Accessor<T, D>,
68 ) -> Result<<SqlDatabase as sqlx::Database>::QueryResult, Error>;
69}
70
71impl ErasedExecutor<SqlHostState> for QueryExecutor {
72 async fn fetch_all<T>(
73 &self,
74 query: QueryOrRaw,
75 accessor: &Accessor<T, SqlHostState>,
76 ) -> Result<Vec<<SqlDatabase as sqlx::Database>::Row>, Error> {
77 match self {
78 QueryExecutor::Pool => {
79 let pool = accessor.with(|mut access| access.get().sql_db.pool.clone());
80 let rows = execute_with!(pool, accessor, query, fetch_all)?;
81
82 Ok(rows)
83 }
84 QueryExecutor::Connection(resource) => {
85 let conn_impl = accessor.with(|mut access| {
86 let state = access.get();
87 state.table.get(&resource).map(|conn| conn.clone())
88 })?;
89
90 conn_impl.fetch_all(query, accessor).await
91 }
92 QueryExecutor::Transaction(resource) => {
93 let tx_impl = accessor.with(|mut access| {
94 let state = access.get();
95 state.table.get(&resource).map(|tx| tx.clone())
96 })?;
97
98 tx_impl.fetch_all(query, accessor).await
99 }
100 }
101 }
102
103 async fn execute<T>(
104 &self,
105 query: QueryOrRaw,
106 accessor: &Accessor<T, SqlHostState>,
107 ) -> Result<<SqlDatabase as sqlx::Database>::QueryResult, Error> {
108 match self {
109 QueryExecutor::Pool => {
110 let pool = accessor.with(|mut access| access.get().sql_db.pool.clone());
111 let rows = execute_with!(pool, accessor, query, execute)?;
112 Ok(rows)
113 }
114 QueryExecutor::Connection(resource) => {
115 let conn_impl = accessor.with(|mut access| {
116 let state = access.get();
117 state.table.get(&resource).map(|conn| conn.clone())
118 })?;
119
120 conn_impl.execute(query, accessor).await
121 }
122 QueryExecutor::Transaction(resource) => {
123 let tx_impl = accessor.with(|mut access| {
124 let state = access.get();
125 state.table.get(&resource).map(|tx| tx.clone())
126 })?;
127
128 tx_impl.execute(query, accessor).await
129 }
130 }
131 }
132}