Skip to main content

wasm_sql/core/bindings/
connection.rs

1use std::sync::Arc;
2
3use sqlx::pool::PoolConnection;
4use tokio::sync::RwLock;
5
6use crate::{
7    core::bindings::{
8        SqlHostState,
9        error::IgnoreNotPresent,
10        executor::{ErasedExecutor, QueryOrRaw},
11        generated::wasm_sql::core::{
12            connection::Connection, transaction::Transaction, util_types::Error,
13        },
14        transaction::{ConnectionBoundTask, TransactionCommand},
15    },
16    execute_with,
17    sqldb::SqlDatabase,
18};
19
20use crate::core::bindings::transaction::TransactionImpl;
21
22#[derive(Clone)]
23#[allow(dead_code)]
24pub struct ConnectionImpl {
25    pub(crate) connection: Arc<RwLock<PoolConnection<SqlDatabase>>>,
26}
27
28impl crate::core::bindings::generated::wasm_sql::core::connection::Host for SqlHostState {}
29
30impl crate::core::bindings::generated::wasm_sql::core::connection::HostConnection for SqlHostState {
31    async fn drop(
32        &mut self,
33        rep: wasmtime::component::Resource<Connection>,
34    ) -> wasmtime::Result<()> {
35        self.table.delete(rep).option_not_present()?;
36        Ok(())
37    }
38
39    async fn release(&mut self, _this: wasmtime::component::Resource<Connection>) {
40        let _ = self.drop(_this).await;
41    }
42}
43
44impl crate::core::bindings::generated::wasm_sql::core::connection::HostConnectionWithStore
45    for SqlHostState
46{
47    async fn begin_transaction<T>(
48        accessor: &wasmtime::component::Accessor<T, Self>,
49        self_: wasmtime::component::Resource<Connection>,
50    ) -> Result<wasmtime::component::Resource<Transaction>, Error> {
51        let (sender, receiver) = tokio::sync::mpsc::channel::<TransactionCommand>(1);
52        let handle = accessor.spawn(ConnectionBoundTask {
53            resource: self_,
54            receiver,
55        });
56
57        let tx_impl = TransactionImpl::ConnectionBound {
58            handle: Arc::new(handle),
59            sender,
60        };
61
62        let resource = accessor.with(|mut access| {
63            let state = access.get();
64
65            state.table.push(tx_impl)
66        })?;
67
68        return Ok(resource);
69    }
70}
71
72impl ErasedExecutor<SqlHostState> for ConnectionImpl {
73    async fn fetch_all<T>(
74        &self,
75        query: QueryOrRaw,
76        accessor: &wasmtime::component::Accessor<T, SqlHostState>,
77    ) -> Result<Vec<<SqlDatabase as sqlx::Database>::Row>, Error> {
78        let mut guard = self.connection.write().await;
79
80        execute_with!(guard, accessor, query, fetch_all)
81    }
82
83    async fn execute<T>(
84        &self,
85        query: QueryOrRaw,
86        accessor: &wasmtime::component::Accessor<T, SqlHostState>,
87    ) -> Result<<SqlDatabase as sqlx::Database>::QueryResult, Error> {
88        let mut guard = self.connection.write().await;
89
90        execute_with!(guard, accessor, query, execute)
91    }
92}