sqlx_d1_core/
connection.rs

1use sqlx_core::{Url, Either};
2
3#[cfg(target_arch = "wasm32")]
4use {
5    crate::{error::D1Error, row::D1Row},
6    std::pin::Pin,
7    worker::{wasm_bindgen::JsValue, wasm_bindgen_futures::JsFuture, js_sys},
8};
9
10#[cfg(not(target_arch = "wasm32"))]
11macro_rules! unreachable_native_impl_of_item_for_only_wasm32 {
12    ($item_for_only_wasm32:literal) => {
13        panic!(
14            "native `{}`: Invalid use of `sqlx_d1`. Be sure to use `sqlx_d1` where the target is set to \
15            `wasm32-unknown-unknown` ! \n\
16            For this, typcally, place `.cargo/config.toml` of following content at the root of \
17            your project or workspace : \n\
18            \n\
19            [build]\n\
20            target = \"wasm32-unknown-unknown\"\n",
21            $item_for_only_wasm32
22        )
23    };
24}
25
26/// ## Example
27/// 
28/// ```toml
29/// # Cargo.toml
30/// 
31/// [dependencies]
32/// sqlx_d1 = { version = "0.1", features = ["macros"] }
33/// worker = { version = "0.6", features = ["d1"] }
34/// serde = { version = "1.0", features = ["derive"] }
35/// ```
36/// ```toml
37/// # wrangler.toml
38/// 
39/// [[d1_database]]
40/// binding = "DB"
41/// database_name = "..."
42/// database_id = "..."
43/// ```
44/// ```rust,ignore
45/// // src/lib.rs
46/// 
47/// #[worker::event(fetch)]
48/// async fn main(
49///     mut req: worker::Request,
50///     env: worker::Env,
51///     _ctx: worker::Context,
52/// ) -> worker::Result<worker::Response> {
53///     let d1 = env.d1("DB")?;
54///     let conn = sqlx_d1::D1Connection::new(d1);
55/// 
56///     #[derive(serde::Deserialize)]
57///     struct CreateUser {
58///         name: String,
59///         age: Option<u8>,
60///     }
61/// 
62///     let req = req.json::<CreateUser>().await?;
63/// 
64///     let id = sqlx_d1::query!(
65///         "
66///         INSERT INTO users (name, age) VALUES (?, ?)
67///         RETURNING id
68///         ",
69///             req.name,
70///             req.age
71///         )
72///         .fetch_one(&conn)
73///         .await
74///         .map_err(|e| worker::Error::RustError(e.to_string()))?
75///         .id;
76/// 
77///     worker::Response::ok(format!("Your id is {id}!"))
78/// }
79/// ```
80pub struct D1Connection {
81    #[cfg(target_arch = "wasm32")]
82    pub(crate) inner: worker_sys::D1Database,
83
84    #[cfg(not(target_arch = "wasm32"))]
85    pub(crate) inner: sqlx_sqlite::SqliteConnection,
86}
87
88const _: () = {
89    /* SAFETY: used in single-threaded Workers */
90    unsafe impl Send for D1Connection {}
91    unsafe impl Sync for D1Connection {}
92
93    impl D1Connection {
94        pub fn new(d1: worker::D1Database) -> Self {
95            #[cfg(target_arch = "wasm32")] {
96                Self { inner: unsafe {std::mem::transmute(d1)} }
97            }
98            #[cfg(not(target_arch = "wasm32"))] {
99                let _ = d1;
100                unreachable_native_impl_of_item_for_only_wasm32!("D1Cnnection::new");
101            }
102        }
103
104        #[cfg(not(target_arch = "wasm32"))]
105        pub async fn connect(url: impl AsRef<str>) -> Result<Self, sqlx_core::Error> {
106            <Self as sqlx_core::connection::Connection>::connect(url.as_ref()).await
107        }
108    }
109
110    impl Clone for D1Connection {
111        fn clone(&self) -> Self {
112            #[cfg(target_arch = "wasm32")] {
113                Self { inner: self.inner.clone() }
114            }
115            #[cfg(not(target_arch = "wasm32"))] {
116                unreachable_native_impl_of_item_for_only_wasm32!("impl Clone for D1Connection");
117            }
118        }
119    }
120
121    impl std::fmt::Debug for D1Connection {
122        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123            f.debug_struct("D1Connection").finish()
124        }
125    }
126
127    impl sqlx_core::connection::Connection for D1Connection {
128        type Database = crate::D1;
129
130        type Options = D1ConnectOptions;
131
132        fn close(self) -> crate::ResultFuture<'static, ()> {
133            Box::pin(async {Ok(())})
134        }
135
136        fn close_hard(self) -> crate::ResultFuture<'static, ()> {
137            Box::pin(async {Ok(())})
138        }
139
140        fn ping(&mut self) -> crate::ResultFuture<'_, ()> {
141            Box::pin(async {Ok(())})
142        }
143
144        fn begin(&mut self) -> crate::ResultFuture<'_, sqlx_core::transaction::Transaction<'_, Self::Database>>
145        where
146            Self: Sized,
147        {
148            sqlx_core::transaction::Transaction::begin(self)
149        }
150
151        fn shrink_buffers(&mut self) {
152            /* do nothing */
153        }
154
155        fn flush(&mut self) -> crate::ResultFuture<'_, ()> {
156            Box::pin(async {Ok(())})
157        }
158
159        fn should_flush(&self) -> bool {
160            false
161        }
162    }
163
164    impl<'c> sqlx_core::executor::Executor<'c> for &'c mut D1Connection {
165        type Database = crate::D1;
166
167        fn fetch_many<'e, 'q: 'e, E>(
168            self,
169            #[allow(unused)]
170            mut query: E,
171        ) -> futures_core::stream::BoxStream<
172            'e,
173            Result<
174                Either<
175                    <Self::Database as sqlx_core::database::Database>::QueryResult,
176                    <Self::Database as sqlx_core::database::Database>::Row
177                >,
178                sqlx_core::Error,
179            >,
180        >
181        where
182            'c: 'e,
183            E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
184        {
185            <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_many(self, query)
186        }
187
188        fn fetch_optional<'e, 'q: 'e, E>(
189            self,
190            #[allow(unused)]
191            mut query: E,
192        ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
193        where
194            'c: 'e,
195            E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
196        {
197            <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_optional(self, query)
198        }
199
200        fn prepare_with<'e, 'q: 'e>(
201            self,
202            sql: &'q str,
203            _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
204        ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
205        where
206            'c: 'e,
207        {
208            Box::pin(async {
209                Ok(crate::statement::D1Statement {
210                    sql: std::borrow::Cow::Borrowed(sql),
211                })
212            })
213        }
214
215        fn describe<'e, 'q: 'e>(
216            self,
217            #[allow(unused)]
218            sql: &'q str,
219        ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
220        where
221            'c: 'e,
222        {
223            #[cfg(target_arch = "wasm32")] {
224                unreachable!("wasm32 describe")
225            }
226            #[cfg(not(target_arch = "wasm32"))] {
227                /* compile-time verification by macros */
228
229                Box::pin(async {
230                    let sqlx_core::describe::Describe {
231                        columns,
232                        parameters,
233                        nullable
234                    } = <&mut sqlx_sqlite::SqliteConnection as sqlx_core::executor::Executor>::describe(
235                        &mut self.inner,
236                        sql
237                    ).await?;
238                    
239                    Ok(sqlx_core::describe::Describe {
240                        parameters: parameters.map(|ps| match ps {
241                            Either::Left(type_infos) => Either::Left(type_infos.into_iter().map(crate::type_info::D1TypeInfo::from_sqlite).collect()),
242                            Either::Right(n) => Either::Right(n)
243                        }),
244                        columns: columns.into_iter().map(crate::column::D1Column::from_sqlite).collect(),
245                        nullable
246                    })
247                })
248            }
249        }
250    }
251
252    impl<'c> sqlx_core::executor::Executor<'c> for &'c D1Connection {
253        type Database = crate::D1;
254
255        fn fetch_many<'e, 'q: 'e, E>(
256            self,
257            #[allow(unused)]
258            mut query: E,
259        ) -> futures_core::stream::BoxStream<
260            'e,
261            Result<
262                Either<
263                    <Self::Database as sqlx_core::database::Database>::QueryResult,
264                    <Self::Database as sqlx_core::database::Database>::Row
265                >,
266                sqlx_core::Error,
267            >,
268        >
269        where
270            'c: 'e,
271            E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
272        {
273            #[cfg(not(target_arch = "wasm32"))] {
274                unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
275            }
276            #[cfg(target_arch = "wasm32")] {
277                let sql = query.sql();
278                let arguments = match query.take_arguments() {
279                    Ok(a) => a,
280                    Err(e) => return Box::pin(futures_util::stream::once(async {Err(sqlx_core::Error::Encode(e))})),
281                };
282
283                struct FetchMany<F> {
284                    raw_rows_future: F,
285                    raw_rows: Option<js_sys::ArrayIntoIter>,
286                }
287                const _: () = {
288                    /* SAFETY: used in single-threaded Workers */
289                    unsafe impl<F> Send for FetchMany<F> {}
290
291                    impl<F> FetchMany<F> {
292                        fn new(raw_rows_future: F) -> Self {
293                            Self { raw_rows_future, raw_rows: None }
294                        }
295                    }
296
297                    impl<F> futures_core::Stream for FetchMany<F>
298                    where
299                        F: Future<Output = Result<Option<js_sys::Array>, JsValue>>,
300                    {
301                        type Item = Result<
302                            Either<crate::query_result::D1QueryResult, D1Row>,
303                            sqlx_core::Error
304                        >;
305
306                        fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
307                            use std::task::Poll;
308
309                            fn pop_next(raw_rows: &mut js_sys::ArrayIntoIter) ->
310                                Option<Result<
311                                    Either<crate::query_result::D1QueryResult, D1Row>,
312                                    sqlx_core::Error
313                                >>
314                            {
315                                let raw_row = raw_rows.next()?;
316                                Some(D1Row::from_raw(raw_row).map(Either::Right))
317                            }
318
319                            let this = unsafe {self.get_unchecked_mut()};
320                            match &mut this.raw_rows {
321                                Some(raw_rows) => Poll::Ready(pop_next(raw_rows)),
322                                None => match unsafe {Pin::new_unchecked(&mut this.raw_rows_future)}.poll(cx) {
323                                    Poll::Pending => Poll::Pending,
324                                    Poll::Ready(Err(e)) => Poll::Ready(Some(Err(
325                                        sqlx_core::Error::from(D1Error::from(e))
326                                    ))),
327                                    Poll::Ready(Ok(maybe_raw_rows)) => {
328                                        this.raw_rows = Some(maybe_raw_rows.unwrap_or_else(js_sys::Array::new).into_iter());
329                                        Poll::Ready(pop_next(unsafe {this.raw_rows.as_mut().unwrap_unchecked()}))
330                                    }
331                                }
332                            }                        
333                        }
334                    }
335                };
336
337                Box::pin(FetchMany::new(async move {
338                    let mut statement = self.inner.prepare(sql).unwrap();
339                    if let Some(a) = arguments {
340                        statement = statement.bind(a.as_ref().iter().collect())?;
341                    }
342
343                    let d1_result_jsvalue = JsFuture::from(statement.all()?)
344                        .await?;
345                    worker_sys::D1Result::from(d1_result_jsvalue)
346                        .results()
347                }))
348            }
349        }
350
351        fn fetch_optional<'e, 'q: 'e, E>(
352            self,
353            #[allow(unused)]
354            mut query: E,
355        ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
356        where
357            'c: 'e,
358            E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
359        {
360            #[cfg(not(target_arch = "wasm32"))] {
361                unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
362            }
363            #[cfg(target_arch = "wasm32")] {
364                let sql = query.sql();
365                let arguments = match query.take_arguments() {
366                    Ok(a) => a,
367                    Err(e) => return Box::pin(async {Err(sqlx_core::Error::Encode(e))}),
368                };
369
370                Box::pin(worker::send::SendFuture::new(async move {
371                    let mut statement = self.inner.prepare(sql).unwrap();
372                    if let Some(a) = arguments {
373                        statement = statement
374                            .bind(a.as_ref().iter().collect())
375                            .map_err(|e| sqlx_core::Error::Encode(Box::new(D1Error::from(e))))?;
376                    }
377
378                    let raw = JsFuture::from(statement.first(None).map_err(D1Error::from)?)
379                        .await
380                        .map_err(D1Error::from)?;
381                    if raw.is_null() {
382                        Ok(None)
383                    } else {
384                        D1Row::from_raw(raw).map(Some)
385                    }
386                }))
387            }
388        }
389
390        fn prepare_with<'e, 'q: 'e>(
391            self,
392            sql: &'q str,
393            _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
394        ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
395        where
396            'c: 'e,
397        {
398            Box::pin(async {
399                Ok(crate::statement::D1Statement {
400                    sql: std::borrow::Cow::Borrowed(sql),
401                })
402            })
403        }
404
405        fn describe<'e, 'q: 'e>(
406            self,
407            #[allow(unused)]
408            sql: &'q str,
409        ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
410        where
411            'c: 'e,
412        {
413            #[cfg(not(target_arch = "wasm32"))] {
414                unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
415            }
416            #[cfg(target_arch = "wasm32")] {
417                unreachable!("wasm32 describe")
418            }
419        }
420    }
421};
422
423/// ref: <https://developers.cloudflare.com/d1/sql-api/sql-statements/#compatible-pragma-statements>
424#[derive(Clone)]
425pub struct D1ConnectOptions {
426    pragmas: TogglePragmas,
427    #[cfg(target_arch = "wasm32")]
428    d1: worker_sys::D1Database,
429    #[cfg(not(target_arch = "wasm32"))]
430    sqlite_path: std::path::PathBuf,
431}
432const _: () = {
433    /* SAFETY: used in single-threaded Workers */
434    unsafe impl Send for D1ConnectOptions {}
435    unsafe impl Sync for D1ConnectOptions {}
436
437    #[cfg(target_arch = "wasm32")]
438    const URL_CONVERSION_UNSUPPORTED_MESSAGE: &'static str = "\
439        `sqlx_d1::D1ConnectOptions` doesn't support conversion between `Url`. \
440        Consider connect from options created by `D1ConnectOptions::new`. \
441    ";
442
443    const LOG_SETTINGS_UNSUPPORTED_MESSAGE: &'static str = "\
444        `sqlx_d1::D1ConnectOptions` doesn't support log settings.
445    ";
446
447    impl D1ConnectOptions {
448        pub fn new(
449            #[allow(unused)]
450            d1: worker::D1Database,
451        ) -> Self {
452            #[cfg(target_arch = "wasm32")] {
453                Self {
454                    d1: unsafe {core::mem::transmute(d1)},
455                    pragmas: TogglePragmas::new(),
456                }
457            }
458            #[cfg(not(target_arch = "wasm32"))] {
459                unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::new");
460            }
461        }
462
463        pub async fn connect(self) -> Result<D1Connection, crate::error::D1Error> {
464            #[cfg(target_arch = "wasm32")] {
465                let Self { d1, pragmas } = self;
466                if let Some(pragmas) = pragmas.collect() {
467                    JsFuture::from(d1.exec(&pragmas.join("\n")).map_err(D1Error::from)?)
468                        .await
469                        .map_err(D1Error::from)?;
470                }
471                Ok(D1Connection { inner: d1 })
472            }
473            #[cfg(not(target_arch = "wasm32"))] {
474                unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::connect");
475            }
476        }
477    }
478
479    impl std::fmt::Debug for D1ConnectOptions {
480        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481            f.debug_struct("D1ConnectOptions")
482                .field("pragmas", &self.pragmas)
483                .finish()
484        }
485    }
486
487    impl std::str::FromStr for D1ConnectOptions {
488        type Err = sqlx_core::Error;
489
490        fn from_str(_: &str) -> Result<Self, Self::Err> {
491            #[cfg(target_arch = "wasm32")] {
492                Err(sqlx_core::Error::Configuration(From::from(
493                    URL_CONVERSION_UNSUPPORTED_MESSAGE
494                )))
495            }
496
497            #[cfg(not(target_arch = "wasm32"))] {
498                use std::{io, fs, path::{Path, PathBuf}};
499
500                fn maybe_miniflare_d1_dir_of(dir: impl AsRef<Path>) -> PathBuf {
501                    dir.as_ref()
502                        .join(".wrangler")
503                        .join("state")
504                        .join("v3")
505                        .join("d1")
506                        .join("miniflare-D1DatabaseObject")
507                }
508            
509                const PACKAGE_ROOT: &str = env!("CARGO_MANIFEST_DIR");
510
511                let (candidate_1, candidate_2) = (
512                    maybe_miniflare_d1_dir_of(PACKAGE_ROOT),
513                    maybe_miniflare_d1_dir_of(".")
514                );
515
516                let sqlite_path = (|| -> io::Result<PathBuf> {                    
517                    let miniflare_d1_dir = match (
518                        fs::exists(&candidate_1),
519                        fs::exists(&candidate_2)
520                    ) {
521                        (Ok(true), _) => candidate_1,
522                        (_, Ok(true)) => candidate_2,
523                        (Err(e), _) | (_, Err(e)) => return Err(e),
524                        (Ok(false), Ok(false)) => return Err(io::Error::new(
525                            io::ErrorKind::NotFound,
526                            "miniflare's D1 emulating directory not found"
527                        )),
528                    };
529                    
530                    let [sqlite_path] = fs::read_dir(miniflare_d1_dir)?
531                        .filter_map(|r| r.as_ref().ok().and_then(|e| {
532                            let path = e.path();
533                            path.extension()
534                                .is_some_and(|ex| ex == "sqlite")
535                                .then_some(path)
536                        }))
537                        .collect::<Vec<_>>()
538                        .try_into()
539                        .map_err(|_| io::Error::new(
540                            io::ErrorKind::Other,
541                            "Currently, sqlx_d1 doesn't support multiple D1 bindings!"
542                        ))?;
543
544                    Ok(sqlite_path)
545                })().map_err(|_| sqlx_core::Error::WorkerCrashed)?;
546                    
547                Ok(Self {
548                    pragmas: TogglePragmas::new(),
549                    sqlite_path
550                })
551            }
552        }
553    }
554
555    impl sqlx_core::connection::ConnectOptions for D1ConnectOptions {
556        type Connection = D1Connection;
557
558        fn from_url(_url: &Url) -> Result<Self, sqlx_core::Error> {
559            #[cfg(target_arch = "wasm32")] {
560                Err(sqlx_core::Error::Configuration(From::from(
561                    URL_CONVERSION_UNSUPPORTED_MESSAGE
562                )))
563            }
564            #[cfg(not(target_arch = "wasm32"))] {
565                _url.as_str().parse()
566            }
567        }
568
569        fn to_url_lossy(&self) -> Url {
570            unreachable!("`sqlx_d1::ConnectOptions` doesn't support `ConnectOptions::to_url_lossy`")
571        }
572
573        fn connect(&self) -> crate::ResultFuture<'_, Self::Connection>
574        where
575            Self::Connection: Sized,
576        {
577            #[cfg(target_arch = "wasm32")] {
578                Box::pin(worker::send::SendFuture::new(async move {
579                    <Self>::connect(self.clone()).await
580                        .map_err(|e| sqlx_core::Error::Database(Box::new(e)))
581                }))
582            }
583
584            #[cfg(not(target_arch = "wasm32"))] {
585                Box::pin(async move {
586                    use sqlx_core::{connection::Connection, executor::Executor};
587
588                    let mut sqlite_conn = sqlx_sqlite::SqliteConnection::connect(
589                        self.sqlite_path.to_str().ok_or(sqlx_core::Error::WorkerCrashed)?
590                    ).await?;
591
592                    if let Some(pragmas) = self.pragmas.collect() {
593                        for pragma in pragmas {
594                            sqlite_conn.execute(pragma).await?;
595                        }
596                    }
597                    
598                    Ok(D1Connection { inner: sqlite_conn })
599                })
600            }
601        }
602
603        fn log_statements(self, _: log::LevelFilter) -> Self {
604            unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
605        }
606
607        fn log_slow_statements(self, _: log::LevelFilter, _: std::time::Duration) -> Self {
608            unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
609        }
610    }
611};
612
613/// ref: <https://developers.cloudflare.com/d1/sql-api/sql-statements/#compatible-pragma-statements>
614#[derive(Clone, Copy)]
615struct TogglePragmas(u8);
616const _: () = {
617    impl std::ops::Not for TogglePragmas {
618        type Output = Self;
619        fn not(self) -> Self::Output {
620            Self(!self.0)
621        }
622    }
623    impl std::ops::BitOrAssign for TogglePragmas {
624        fn bitor_assign(&mut self, rhs: Self) {
625            self.0 |= self.0 | rhs.0;
626        }
627    }
628    impl std::ops::BitAndAssign for TogglePragmas {
629        fn bitand_assign(&mut self, rhs: Self) {
630            self.0 &= self.0 & rhs.0;
631        }
632    }
633    
634    impl TogglePragmas {
635        const fn new() -> Self {
636            Self(0)
637        }
638    }
639};
640
641macro_rules! toggles {
642    ($( $name:ident as $bits:literal; )*) => {
643        impl TogglePragmas {
644            $(
645                #[allow(non_upper_case_globals)]
646                const $name: Self = Self($bits);
647            )*
648
649            fn collect(&self) -> Option<Vec<&'static str>> {
650                #[allow(unused_mut)]
651                let mut pragmas = Vec::new();
652                $(
653                    if self.0 & Self::$name.0 != 0 {
654                        pragmas.push(concat!(
655                            "PRAGMA ",
656                            stringify!($name),
657                            " = on"
658                        ));
659                    }
660                )*
661                (!pragmas.is_empty()).then_some(pragmas)
662            }
663        }
664
665        impl std::fmt::Debug for TogglePragmas {
666            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
667                let mut f = &mut f.debug_map();
668                $(
669                    f = f.entry(
670                        &stringify!($name),
671                        &if self.0 & Self::$name.0 != 0 {"on"} else {"off"}
672                    );
673                )*
674                f.finish()
675            }
676        }
677
678        impl D1ConnectOptions {
679            $(
680                pub fn $name(mut self, yes: bool) -> Self {
681                    if yes {
682                        self.pragmas |= TogglePragmas::$name;
683                    } else {
684                        self.pragmas &= !TogglePragmas::$name;
685                    }
686                    self
687                }
688            )*
689        }
690    };
691}
692toggles! {
693    case_sensitive_like     as 0b0000001;
694    ignore_check_constraint as 0b0000010;
695    legacy_alter_table      as 0b0000100;
696    recursive_triggers      as 0b0001000;
697    unordered_selects       as 0b0010000;
698    foreign_keys            as 0b0100000;
699    defer_foreign_keys      as 0b1000000;
700}