wasm_sql/core/bindings/
connection.rs1use std::sync::Arc;
2
3use sqlx::pool::PoolConnection;
4use tokio::sync::RwLock;
5
6use crate::{
7 core::bindings::{
8 SqlHostState,
9 error::IgnoreNotPresent,
10 executor::{ErasedExecutor, QueryOrRaw},
11 generated::wasm_sql::core::{
12 connection::Connection, transaction::Transaction, util_types::Error,
13 },
14 transaction::{ConnectionBoundTask, TransactionCommand},
15 },
16 execute_with,
17 sqldb::SqlDatabase,
18};
19
20use crate::core::bindings::transaction::TransactionImpl;
21
22#[derive(Clone)]
23#[allow(dead_code)]
24pub struct ConnectionImpl {
25 pub(crate) connection: Arc<RwLock<PoolConnection<SqlDatabase>>>,
26}
27
28impl crate::core::bindings::generated::wasm_sql::core::connection::Host for SqlHostState {}
29
30impl crate::core::bindings::generated::wasm_sql::core::connection::HostConnection for SqlHostState {
31 async fn drop(
32 &mut self,
33 rep: wasmtime::component::Resource<Connection>,
34 ) -> wasmtime::Result<()> {
35 self.table.delete(rep).option_not_present()?;
36 Ok(())
37 }
38
39 async fn release(&mut self, _this: wasmtime::component::Resource<Connection>) {
40 let _ = self.drop(_this).await;
41 }
42}
43
44impl crate::core::bindings::generated::wasm_sql::core::connection::HostConnectionWithStore
45 for SqlHostState
46{
47 async fn begin_transaction<T>(
48 accessor: &wasmtime::component::Accessor<T, Self>,
49 self_: wasmtime::component::Resource<Connection>,
50 ) -> Result<wasmtime::component::Resource<Transaction>, Error> {
51 let (sender, receiver) = tokio::sync::mpsc::channel::<TransactionCommand>(1);
52 let handle = accessor.spawn(ConnectionBoundTask {
53 resource: self_,
54 receiver,
55 });
56
57 let tx_impl = TransactionImpl::ConnectionBound {
58 handle: Arc::new(handle),
59 sender,
60 };
61
62 let resource = accessor.with(|mut access| {
63 let state = access.get();
64
65 state.table.push(tx_impl)
66 })?;
67
68 return Ok(resource);
69 }
70}
71
72impl ErasedExecutor<SqlHostState> for ConnectionImpl {
73 async fn fetch_all<T>(
74 &self,
75 query: QueryOrRaw,
76 accessor: &wasmtime::component::Accessor<T, SqlHostState>,
77 ) -> Result<Vec<<SqlDatabase as sqlx::Database>::Row>, Error> {
78 let mut guard = self.connection.write().await;
79
80 execute_with!(guard, accessor, query, fetch_all)
81 }
82
83 async fn execute<T>(
84 &self,
85 query: QueryOrRaw,
86 accessor: &wasmtime::component::Accessor<T, SqlHostState>,
87 ) -> Result<<SqlDatabase as sqlx::Database>::QueryResult, Error> {
88 let mut guard = self.connection.write().await;
89
90 execute_with!(guard, accessor, query, execute)
91 }
92}