sqlx_d1_core/
connection.rs

1use sqlx_core::{Url, Either};
2
3#[cfg(target_arch = "wasm32")]
4use {
5    crate::{error::D1Error, row::D1Row},
6    std::pin::Pin,
7    worker::{wasm_bindgen::JsValue, wasm_bindgen_futures::JsFuture, js_sys},
8};
9
10pub struct D1Connection {
11    #[cfg(target_arch = "wasm32")]
12    pub(crate) inner: worker_sys::D1Database,
13
14    #[cfg(not(target_arch = "wasm32"))]
15    pub(crate) inner: sqlx_sqlite::SqliteConnection,
16}
17
18const _: () = {
19    /* SAFETY: used in single-threaded Workers */
20    unsafe impl Send for D1Connection {}
21    unsafe impl Sync for D1Connection {}
22
23    impl D1Connection {
24        #[cfg(target_arch = "wasm32")]
25        pub fn new(d1: worker::D1Database) -> Self {
26            Self { inner: unsafe {std::mem::transmute(d1)} }
27        }
28
29        #[cfg(not(target_arch = "wasm32"))]
30        pub async fn connect(url: impl AsRef<str>) -> Result<Self, sqlx_core::Error> {
31            <Self as sqlx_core::connection::Connection>::connect(url.as_ref()).await
32        }
33    }
34
35    #[cfg(target_arch = "wasm32")]
36    impl Clone for D1Connection {
37        fn clone(&self) -> Self {
38            Self { inner: self.inner.clone() }
39        }
40    }
41
42    impl std::fmt::Debug for D1Connection {
43        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44            f.debug_struct("D1Connection").finish()
45        }
46    }
47
48    impl sqlx_core::connection::Connection for D1Connection {
49        type Database = crate::D1;
50
51        type Options = D1ConnectOptions;
52
53        fn close(self) -> crate::ResultFuture<'static, ()> {
54            Box::pin(async {Ok(())})
55        }
56
57        fn close_hard(self) -> crate::ResultFuture<'static, ()> {
58            Box::pin(async {Ok(())})
59        }
60
61        fn ping(&mut self) -> crate::ResultFuture<'_, ()> {
62            Box::pin(async {Ok(())})
63        }
64
65        fn begin(&mut self) -> crate::ResultFuture<'_, sqlx_core::transaction::Transaction<'_, Self::Database>>
66        where
67            Self: Sized,
68        {
69            sqlx_core::transaction::Transaction::begin(self)
70        }
71
72        fn shrink_buffers(&mut self) {
73            /* do nothing */
74        }
75
76        fn flush(&mut self) -> crate::ResultFuture<'_, ()> {
77            Box::pin(async {Ok(())})
78        }
79
80        fn should_flush(&self) -> bool {
81            false
82        }
83    }
84
85    impl<'c> sqlx_core::executor::Executor<'c> for &'c mut D1Connection {
86        type Database = crate::D1;
87
88        fn fetch_many<'e, 'q: 'e, E>(
89            self,
90            #[allow(unused)]
91            mut query: E,
92        ) -> futures_core::stream::BoxStream<
93            'e,
94            Result<
95                Either<
96                    <Self::Database as sqlx_core::database::Database>::QueryResult,
97                    <Self::Database as sqlx_core::database::Database>::Row
98                >,
99                sqlx_core::Error,
100            >,
101        >
102        where
103            'c: 'e,
104            E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
105        {
106            #[cfg(not(target_arch = "wasm32"))] {
107                unreachable!("native `Executor` impl")
108            }
109            #[cfg(target_arch = "wasm32")] {
110                <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_many(self, query)
111            }
112        }
113
114        fn fetch_optional<'e, 'q: 'e, E>(
115            self,
116            #[allow(unused)]
117            mut query: E,
118        ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
119        where
120            'c: 'e,
121            E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
122        {
123            #[cfg(not(target_arch = "wasm32"))] {
124                unreachable!("native `Executor` impl")
125            }
126            #[cfg(target_arch = "wasm32")] {
127                <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_optional(self, query)
128            }
129        }
130
131        fn prepare_with<'e, 'q: 'e>(
132            self,
133            sql: &'q str,
134            _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
135        ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
136        where
137            'c: 'e,
138        {
139            Box::pin(async {
140                Ok(crate::statement::D1Statement {
141                    sql: std::borrow::Cow::Borrowed(sql),
142                })
143            })
144        }
145
146        fn describe<'e, 'q: 'e>(
147            self,
148            #[allow(unused)]
149            sql: &'q str,
150        ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
151        where
152            'c: 'e,
153        {
154            #[cfg(target_arch = "wasm32")] {
155                unreachable!("wasm32 describe")
156            }
157            #[cfg(not(target_arch = "wasm32"))] {
158                /* compile-time verification by macros */
159
160                Box::pin(async {
161                    let sqlx_core::describe::Describe {
162                        columns,
163                        parameters,
164                        nullable
165                    } = <&mut sqlx_sqlite::SqliteConnection as sqlx_core::executor::Executor>::describe(
166                        &mut self.inner,
167                        sql
168                    ).await?;
169                    
170                    Ok(sqlx_core::describe::Describe {
171                        parameters: parameters.map(|ps| match ps {
172                            Either::Left(type_infos) => Either::Left(type_infos.into_iter().map(crate::type_info::D1TypeInfo::from_sqlite).collect()),
173                            Either::Right(n) => Either::Right(n)
174                        }),
175                        columns: columns.into_iter().map(crate::column::D1Column::from_sqlite).collect(),
176                        nullable
177                    })
178                })
179            }
180        }
181    }
182
183    #[cfg(target_arch = "wasm32")]
184    impl<'c> sqlx_core::executor::Executor<'c> for &'c D1Connection {
185        type Database = crate::D1;
186
187        fn fetch_many<'e, 'q: 'e, E>(
188            self,
189            #[allow(unused)]
190            mut query: E,
191        ) -> futures_core::stream::BoxStream<
192            'e,
193            Result<
194                Either<
195                    <Self::Database as sqlx_core::database::Database>::QueryResult,
196                    <Self::Database as sqlx_core::database::Database>::Row
197                >,
198                sqlx_core::Error,
199            >,
200        >
201        where
202            'c: 'e,
203            E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
204        {
205            let sql = query.sql();
206            let arguments = match query.take_arguments() {
207                Ok(a) => a,
208                Err(e) => return Box::pin(futures_util::stream::once(async {Err(sqlx_core::Error::Encode(e))})),
209            };
210
211            struct FetchMany<F> {
212                raw_rows_future: F,
213                raw_rows: Option<js_sys::ArrayIntoIter>,
214            }
215            const _: () = {
216                /* SAFETY: used in single-threaded Workers */
217                unsafe impl<F> Send for FetchMany<F> {}
218
219                impl<F> FetchMany<F> {
220                    fn new(raw_rows_future: F) -> Self {
221                        Self { raw_rows_future, raw_rows: None }
222                    }
223                }
224
225                impl<F> futures_core::Stream for FetchMany<F>
226                where
227                    F: Future<Output = Result<Option<js_sys::Array>, JsValue>>,
228                {
229                    type Item = Result<
230                        Either<crate::query_result::D1QueryResult, D1Row>,
231                        sqlx_core::Error
232                    >;
233
234                    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
235                        use std::task::Poll;
236
237                        fn pop_next(raw_rows: &mut js_sys::ArrayIntoIter) ->
238                            Option<Result<
239                                Either<crate::query_result::D1QueryResult, D1Row>,
240                                sqlx_core::Error
241                            >>
242                        {
243                            let raw_row = raw_rows.next()?;
244                            Some(D1Row::from_raw(raw_row).map(Either::Right))
245                        }
246
247                        let this = unsafe {self.get_unchecked_mut()};
248                        match &mut this.raw_rows {
249                            Some(raw_rows) => Poll::Ready(pop_next(raw_rows)),
250                            None => match unsafe {Pin::new_unchecked(&mut this.raw_rows_future)}.poll(cx) {
251                                Poll::Pending => Poll::Pending,
252                                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(
253                                    sqlx_core::Error::from(D1Error::from(e))
254                                ))),
255                                Poll::Ready(Ok(maybe_raw_rows)) => {
256                                    this.raw_rows = Some(maybe_raw_rows.unwrap_or_else(js_sys::Array::new).into_iter());
257                                    Poll::Ready(pop_next(unsafe {this.raw_rows.as_mut().unwrap_unchecked()}))
258                                }
259                            }
260                        }                        
261                    }
262                }
263            };
264
265            Box::pin(FetchMany::new(async move {
266                let mut statement = self.inner.prepare(sql).unwrap();
267                if let Some(a) = arguments {
268                    statement = statement.bind(a.as_ref().iter().collect())?;
269                }
270
271                let d1_result_jsvalue = JsFuture::from(statement.all()?)
272                    .await?;
273                worker_sys::D1Result::from(d1_result_jsvalue)
274                    .results()
275            }))
276        }
277
278        fn fetch_optional<'e, 'q: 'e, E>(
279            self,
280            #[allow(unused)]
281            mut query: E,
282        ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
283        where
284            'c: 'e,
285            E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
286        {
287            let sql = query.sql();
288            let arguments = match query.take_arguments() {
289                Ok(a) => a,
290                Err(e) => return Box::pin(async {Err(sqlx_core::Error::Encode(e))}),
291            };
292
293            Box::pin(worker::send::SendFuture::new(async move {
294                let mut statement = self.inner.prepare(sql).unwrap();
295                if let Some(a) = arguments {
296                    statement = statement
297                        .bind(a.as_ref().iter().collect())
298                        .map_err(|e| sqlx_core::Error::Encode(Box::new(D1Error::from(e))))?;
299                }
300
301                let raw = JsFuture::from(statement.first(None).map_err(D1Error::from)?)
302                    .await
303                    .map_err(D1Error::from)?;
304                if raw.is_null() {
305                    Ok(None)
306                } else {
307                    D1Row::from_raw(raw).map(Some)
308                }
309            }))
310        }
311
312        fn prepare_with<'e, 'q: 'e>(
313            self,
314            sql: &'q str,
315            _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
316        ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
317        where
318            'c: 'e,
319        {
320            Box::pin(async {
321                Ok(crate::statement::D1Statement {
322                    sql: std::borrow::Cow::Borrowed(sql),
323                })
324            })
325        }
326
327        fn describe<'e, 'q: 'e>(
328            self,
329            #[allow(unused)]
330            sql: &'q str,
331        ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
332        where
333            'c: 'e,
334        {
335            unreachable!("wasm32 describe")
336        }
337    }
338};
339
340/// ref: <https://developers.cloudflare.com/d1/sql-api/sql-statements/#compatible-pragma-statements>
341#[derive(Clone)]
342pub struct D1ConnectOptions {
343    pragmas: TogglePragmas,
344    #[cfg(target_arch = "wasm32")]
345    d1: worker_sys::D1Database,
346    #[cfg(not(target_arch = "wasm32"))]
347    sqlite_path: std::path::PathBuf,
348}
349const _: () = {
350    /* SAFETY: used in single-threaded Workers */
351    unsafe impl Send for D1ConnectOptions {}
352    unsafe impl Sync for D1ConnectOptions {}
353
354    #[cfg(target_arch = "wasm32")]
355    const URL_CONVERSION_UNSUPPORTED_MESSAGE: &'static str = "\
356        `sqlx_d1::D1ConnectOptions` doesn't support conversion between `Url`. \
357        Consider connect from options created by `D1ConnectOptions::new`. \
358    ";
359
360    const LOG_SETTINGS_UNSUPPORTED_MESSAGE: &'static str = "\
361        `sqlx_d1::D1ConnectOptions` doesn't support log settings.
362    ";
363
364    impl D1ConnectOptions {
365        #[cfg(target_arch = "wasm32")]
366        pub fn new(d1: worker::D1Database) -> Self {
367            Self {
368                d1: unsafe {core::mem::transmute(d1)},
369                pragmas: TogglePragmas::new(),
370            }
371        }
372    }
373
374    impl std::fmt::Debug for D1ConnectOptions {
375        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376            f.debug_struct("D1ConnectOptions")
377                .field("pragmas", &self.pragmas)
378                .finish()
379        }
380    }
381
382    impl std::str::FromStr for D1ConnectOptions {
383        type Err = sqlx_core::Error;
384
385        fn from_str(_: &str) -> Result<Self, Self::Err> {
386            #[cfg(target_arch = "wasm32")] {
387                Err(sqlx_core::Error::Configuration(From::from(
388                    URL_CONVERSION_UNSUPPORTED_MESSAGE
389                )))
390            }
391
392            #[cfg(not(target_arch = "wasm32"))] {
393                use std::{io, fs, path::{Path, PathBuf}};
394
395                fn maybe_miniflare_d1_dir_of(dir: impl AsRef<Path>) -> PathBuf {
396                    dir.as_ref()
397                        .join(".wrangler")
398                        .join("state")
399                        .join("v3")
400                        .join("d1")
401                        .join("miniflare-D1DatabaseObject")
402                }
403            
404                const PACKAGE_ROOT: &str = env!("CARGO_MANIFEST_DIR");
405
406                let (candidate_1, candidate_2) = (
407                    maybe_miniflare_d1_dir_of(PACKAGE_ROOT),
408                    maybe_miniflare_d1_dir_of(".")
409                );
410
411                let sqlite_path = (|| -> io::Result<PathBuf> {                    
412                    let miniflare_d1_dir = match (
413                        fs::exists(&candidate_1),
414                        fs::exists(&candidate_2)
415                    ) {
416                        (Ok(true), _) => candidate_1,
417                        (_, Ok(true)) => candidate_2,
418                        (Err(e), _) | (_, Err(e)) => return Err(e),
419                        (Ok(false), Ok(false)) => return Err(io::Error::new(
420                            io::ErrorKind::NotFound,
421                            "miniflare's D1 emulating directory not found"
422                        )),
423                    };
424                    
425                    let [sqlite_path] = fs::read_dir(miniflare_d1_dir)?
426                        .filter_map(|r| r.as_ref().ok().and_then(|e| {
427                            let path = e.path();
428                            path.extension()
429                                .is_some_and(|ex| ex == "sqlite")
430                                .then_some(path)
431                        }))
432                        .collect::<Vec<_>>()
433                        .try_into()
434                        .map_err(|_| io::Error::new(
435                            io::ErrorKind::Other,
436                            "Currently, sqlx_d1 doesn't support multiple D1 bindings!"
437                        ))?;
438
439                    Ok(sqlite_path)
440                })().map_err(|_| sqlx_core::Error::WorkerCrashed)?;
441                    
442                Ok(Self {
443                    pragmas: TogglePragmas::new(),
444                    sqlite_path
445                })
446            }
447        }
448    }
449
450    impl sqlx_core::connection::ConnectOptions for D1ConnectOptions {
451        type Connection = D1Connection;
452
453        fn from_url(_url: &Url) -> Result<Self, sqlx_core::Error> {
454            #[cfg(target_arch = "wasm32")] {
455                Err(sqlx_core::Error::Configuration(From::from(
456                    URL_CONVERSION_UNSUPPORTED_MESSAGE
457                )))
458            }
459            #[cfg(not(target_arch = "wasm32"))] {
460                _url.as_str().parse()
461            }
462        }
463
464        fn to_url_lossy(&self) -> Url {
465            unreachable!("`sqlx_d1::ConnectOptions` doesn't support `ConnectOptions::to_url_lossy`")
466        }
467
468        fn connect(&self) -> crate::ResultFuture<'_, Self::Connection>
469        where
470            Self::Connection: Sized,
471        {
472            #[cfg(target_arch = "wasm32")] {
473                Box::pin(worker::send::SendFuture::new(async move {
474                    let d1 = self.d1.clone();
475
476                    if let Some(pragmas) = self.pragmas.collect() {
477                        JsFuture::from(d1.exec(&pragmas.join("\n")).map_err(D1Error::from)?)
478                            .await
479                            .map_err(D1Error::from)?;
480                    }
481
482                    Ok(D1Connection {
483                        inner: d1
484                    })
485                }))
486            }
487
488            #[cfg(not(target_arch = "wasm32"))] {
489                Box::pin(async move {
490                    use sqlx_core::{connection::Connection, executor::Executor};
491
492                    let mut sqlite_conn = sqlx_sqlite::SqliteConnection::connect(
493                        self.sqlite_path.to_str().ok_or(sqlx_core::Error::WorkerCrashed)?
494                    ).await?;
495
496                    if let Some(pragmas) = self.pragmas.collect() {
497                        for pragma in pragmas {
498                            sqlite_conn.execute(pragma).await?;
499                        }
500                    }
501                    
502                    Ok(D1Connection { inner: sqlite_conn })
503                })
504            }
505        }
506
507        fn log_statements(self, _: log::LevelFilter) -> Self {
508            unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
509        }
510
511        fn log_slow_statements(self, _: log::LevelFilter, _: std::time::Duration) -> Self {
512            unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
513        }
514    }
515};
516
517/// ref: <https://developers.cloudflare.com/d1/sql-api/sql-statements/#compatible-pragma-statements>
518#[derive(Clone, Copy)]
519struct TogglePragmas(u8);
520const _: () = {
521    impl std::ops::Not for TogglePragmas {
522        type Output = Self;
523        fn not(self) -> Self::Output {
524            Self(!self.0)
525        }
526    }
527    impl std::ops::BitOrAssign for TogglePragmas {
528        fn bitor_assign(&mut self, rhs: Self) {
529            self.0 |= self.0 | rhs.0;
530        }
531    }
532    impl std::ops::BitAndAssign for TogglePragmas {
533        fn bitand_assign(&mut self, rhs: Self) {
534            self.0 &= self.0 & rhs.0;
535        }
536    }
537    
538    impl TogglePragmas {
539        const fn new() -> Self {
540            Self(0)
541        }
542    }
543};
544
545macro_rules! toggles {
546    ($( $name:ident as $bits:literal; )*) => {
547        impl TogglePragmas {
548            $(
549                #[allow(non_upper_case_globals)]
550                const $name: Self = Self($bits);
551            )*
552
553            fn collect(&self) -> Option<Vec<&'static str>> {
554                #[allow(unused_mut)]
555                let mut pragmas = Vec::new();
556                $(
557                    if self.0 & Self::$name.0 != 0 {
558                        pragmas.push(concat!(
559                            "PRAGMA ",
560                            stringify!($name),
561                            " = on"
562                        ));
563                    }
564                )*
565                (!pragmas.is_empty()).then_some(pragmas)
566            }
567        }
568
569        impl std::fmt::Debug for TogglePragmas {
570            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
571                let mut f = &mut f.debug_map();
572                $(
573                    f = f.entry(
574                        &stringify!($name),
575                        &if self.0 & Self::$name.0 != 0 {"on"} else {"off"}
576                    );
577                )*
578                f.finish()
579            }
580        }
581
582        impl D1ConnectOptions {
583            $(
584                pub fn $name(mut self, yes: bool) -> Self {
585                    if yes {
586                        self.pragmas |= TogglePragmas::$name;
587                    } else {
588                        self.pragmas &= !TogglePragmas::$name;
589                    }
590                    self
591                }
592            )*
593        }
594    };
595}
596toggles! {
597    case_sensitive_like     as 0b0000001;
598    ignore_check_constraint as 0b0000010;
599    legacy_alter_table      as 0b0000100;
600    recursive_triggers      as 0b0001000;
601    unordered_selects       as 0b0010000;
602    foreign_keys            as 0b0100000;
603    defer_foreign_keys      as 0b1000000;
604}