1use sqlx_core::{Either, Url};
2
3#[cfg(target_arch = "wasm32")]
4use {
5 crate::{error::D1Error, row::D1Row},
6 std::pin::Pin,
7 worker::{js_sys, wasm_bindgen::JsValue, wasm_bindgen_futures::JsFuture},
8};
9
10#[cfg(not(target_arch = "wasm32"))]
11macro_rules! unreachable_native_impl_of_item_for_only_wasm32 {
12 ($item_for_only_wasm32:literal) => {
13 panic!(
14 "native `{}`: Invalid use of `sqlx_d1`. Be sure to use `sqlx_d1` where the target is set to \
15 `wasm32-unknown-unknown` ! \n\
16 For this, typcally, place `.cargo/config.toml` of following content at the root of \
17 your project or workspace : \n\
18 \n\
19 [build]\n\
20 target = \"wasm32-unknown-unknown\"\n",
21 $item_for_only_wasm32
22 )
23 };
24}
25
26pub struct D1Connection {
86 #[cfg(target_arch = "wasm32")]
87 pub(crate) inner: worker_sys::D1Database,
88
89 #[cfg(not(target_arch = "wasm32"))]
90 pub(crate) inner: sqlx_sqlite::SqliteConnection,
91}
92
93const _: () = {
94 unsafe impl Send for D1Connection {}
96 unsafe impl Sync for D1Connection {}
97
98 impl D1Connection {
99 pub fn new(d1: worker::D1Database) -> Self {
100 #[cfg(target_arch = "wasm32")]
101 {
102 Self {
103 inner: unsafe {
105 std::mem::transmute::<worker::D1Database, worker_sys::D1Database>(d1)
106 },
107 }
108 }
109 #[cfg(not(target_arch = "wasm32"))]
110 {
111 let _ = d1;
112 unreachable_native_impl_of_item_for_only_wasm32!("D1Cnnection::new");
113 }
114 }
115
116 #[cfg(not(target_arch = "wasm32"))]
117 pub async fn connect(url: impl AsRef<str>) -> Result<Self, sqlx_core::Error> {
118 <Self as sqlx_core::connection::Connection>::connect(url.as_ref()).await
119 }
120 }
121
122 impl Clone for D1Connection {
123 fn clone(&self) -> Self {
124 #[cfg(target_arch = "wasm32")]
125 {
126 Self {
127 inner: self.inner.clone(),
128 }
129 }
130 #[cfg(not(target_arch = "wasm32"))]
131 {
132 unreachable_native_impl_of_item_for_only_wasm32!("impl Clone for D1Connection");
133 }
134 }
135 }
136
137 impl std::fmt::Debug for D1Connection {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("D1Connection").finish()
140 }
141 }
142
143 impl sqlx_core::connection::Connection for D1Connection {
144 type Database = crate::D1;
145
146 type Options = D1ConnectOptions;
147
148 fn close(self) -> crate::ResultFuture<'static, ()> {
149 Box::pin(async { Ok(()) })
150 }
151
152 fn close_hard(self) -> crate::ResultFuture<'static, ()> {
153 Box::pin(async { Ok(()) })
154 }
155
156 fn ping(&mut self) -> crate::ResultFuture<'_, ()> {
157 Box::pin(async { Ok(()) })
158 }
159
160 fn begin(
161 &mut self,
162 ) -> crate::ResultFuture<'_, sqlx_core::transaction::Transaction<'_, Self::Database>>
163 where
164 Self: Sized,
165 {
166 sqlx_core::transaction::Transaction::begin(self, None)
167 }
168
169 fn shrink_buffers(&mut self) {
170 }
172
173 fn flush(&mut self) -> crate::ResultFuture<'_, ()> {
174 Box::pin(async { Ok(()) })
175 }
176
177 fn should_flush(&self) -> bool {
178 false
179 }
180 }
181
182 impl<'c> sqlx_core::executor::Executor<'c> for &'c mut D1Connection {
183 type Database = crate::D1;
184
185 fn fetch_many<'e, 'q: 'e, E>(
186 self,
187 #[allow(unused)] mut query: E,
188 ) -> futures_core::stream::BoxStream<
189 'e,
190 Result<
191 Either<
192 <Self::Database as sqlx_core::database::Database>::QueryResult,
193 <Self::Database as sqlx_core::database::Database>::Row,
194 >,
195 sqlx_core::Error,
196 >,
197 >
198 where
199 'c: 'e,
200 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
201 {
202 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_many(self, query)
203 }
204
205 fn fetch_optional<'e, 'q: 'e, E>(
206 self,
207 #[allow(unused)] mut query: E,
208 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
209 where
210 'c: 'e,
211 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
212 {
213 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_optional(self, query)
214 }
215
216 fn prepare_with<'e, 'q: 'e>(
217 self,
218 sql: &'q str,
219 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
220 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
221 where
222 'c: 'e,
223 {
224 Box::pin(async {
225 Ok(crate::statement::D1Statement {
226 sql: std::borrow::Cow::Borrowed(sql),
227 })
228 })
229 }
230
231 fn describe<'e, 'q: 'e>(
232 self,
233 #[allow(unused)] sql: &'q str,
234 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
235 where
236 'c: 'e,
237 {
238 #[cfg(target_arch = "wasm32")]
239 {
240 unreachable!("wasm32 describe")
241 }
242 #[cfg(not(target_arch = "wasm32"))]
243 {
244 Box::pin(async {
247 let sqlx_core::describe::Describe {
248 columns,
249 parameters,
250 nullable
251 } = <&mut sqlx_sqlite::SqliteConnection as sqlx_core::executor::Executor>::describe(
252 &mut self.inner,
253 sql
254 ).await?;
255
256 Ok(sqlx_core::describe::Describe {
257 parameters: parameters.map(|ps| match ps {
258 Either::Left(type_infos) => Either::Left(
259 type_infos
260 .into_iter()
261 .map(crate::type_info::D1TypeInfo::from_sqlite)
262 .collect(),
263 ),
264 Either::Right(n) => Either::Right(n),
265 }),
266 columns: columns
267 .into_iter()
268 .map(crate::column::D1Column::from_sqlite)
269 .collect(),
270 nullable,
271 })
272 })
273 }
274 }
275 }
276
277 impl<'c> sqlx_core::executor::Executor<'c> for &'c D1Connection {
278 type Database = crate::D1;
279
280 fn fetch_many<'e, 'q: 'e, E>(
281 self,
282 #[allow(unused)] mut query: E,
283 ) -> futures_core::stream::BoxStream<
284 'e,
285 Result<
286 Either<
287 <Self::Database as sqlx_core::database::Database>::QueryResult,
288 <Self::Database as sqlx_core::database::Database>::Row,
289 >,
290 sqlx_core::Error,
291 >,
292 >
293 where
294 'c: 'e,
295 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
296 {
297 #[cfg(not(target_arch = "wasm32"))]
298 {
299 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
300 }
301 #[cfg(target_arch = "wasm32")]
302 {
303 let sql = query.sql();
304 let arguments = match query.take_arguments() {
305 Ok(a) => a,
306 Err(e) => {
307 return Box::pin(futures_util::stream::once(async {
308 Err(sqlx_core::Error::Encode(e))
309 }));
310 }
311 };
312
313 struct FetchMany<F> {
314 raw_rows_future: F,
315 raw_rows: Option<js_sys::ArrayIntoIter>,
316 }
317 const _: () = {
318 unsafe impl<F> Send for FetchMany<F> {}
320
321 impl<F> FetchMany<F> {
322 fn new(raw_rows_future: F) -> Self {
323 Self {
324 raw_rows_future,
325 raw_rows: None,
326 }
327 }
328 }
329
330 impl<F> futures_core::Stream for FetchMany<F>
331 where
332 F: Future<Output = Result<Option<js_sys::Array>, JsValue>>,
333 {
334 type Item = Result<
335 Either<crate::query_result::D1QueryResult, D1Row>,
336 sqlx_core::Error,
337 >;
338
339 fn poll_next(
340 self: Pin<&mut Self>,
341 cx: &mut std::task::Context<'_>,
342 ) -> std::task::Poll<Option<Self::Item>> {
343 use std::task::Poll;
344
345 fn pop_next(
346 raw_rows: &mut js_sys::ArrayIntoIter,
347 ) -> Option<
348 Result<
349 Either<crate::query_result::D1QueryResult, D1Row>,
350 sqlx_core::Error,
351 >,
352 > {
353 let raw_row = raw_rows.next()?;
354 Some(D1Row::from_raw(raw_row).map(Either::Right))
355 }
356
357 let this = unsafe { self.get_unchecked_mut() };
358 match &mut this.raw_rows {
359 Some(raw_rows) => Poll::Ready(pop_next(raw_rows)),
360 None => {
361 match unsafe { Pin::new_unchecked(&mut this.raw_rows_future) }
362 .poll(cx)
363 {
364 Poll::Pending => Poll::Pending,
365 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(
366 sqlx_core::Error::from(D1Error::from(e)),
367 ))),
368 Poll::Ready(Ok(maybe_raw_rows)) => {
369 this.raw_rows = Some(
370 maybe_raw_rows
371 .unwrap_or_else(js_sys::Array::new)
372 .into_iter(),
373 );
374 Poll::Ready(pop_next(unsafe {
375 this.raw_rows.as_mut().unwrap_unchecked()
376 }))
377 }
378 }
379 }
380 }
381 }
382 }
383 };
384
385 Box::pin(FetchMany::new(async move {
386 let mut statement = self.inner.prepare(sql).unwrap();
387 if let Some(a) = arguments {
388 statement = statement.bind(a.as_ref().iter().collect())?;
389 }
390
391 let d1_result_jsvalue = JsFuture::from(statement.all()?).await?;
392 worker_sys::D1Result::from(d1_result_jsvalue).results()
393 }))
394 }
395 }
396
397 fn fetch_optional<'e, 'q: 'e, E>(
398 self,
399 #[allow(unused)] mut query: E,
400 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
401 where
402 'c: 'e,
403 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
404 {
405 #[cfg(not(target_arch = "wasm32"))]
406 {
407 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
408 }
409 #[cfg(target_arch = "wasm32")]
410 {
411 let sql = query.sql();
412 let arguments = match query.take_arguments() {
413 Ok(a) => a,
414 Err(e) => return Box::pin(async { Err(sqlx_core::Error::Encode(e)) }),
415 };
416
417 Box::pin(worker::send::SendFuture::new(async move {
418 let mut statement = self.inner.prepare(sql).unwrap();
419 if let Some(a) = arguments {
420 statement = statement
421 .bind(a.as_ref().iter().collect())
422 .map_err(|e| sqlx_core::Error::Encode(Box::new(D1Error::from(e))))?;
423 }
424
425 let raw = JsFuture::from(statement.first(None).map_err(D1Error::from)?)
426 .await
427 .map_err(D1Error::from)?;
428 if raw.is_null() {
429 Ok(None)
430 } else {
431 D1Row::from_raw(raw).map(Some)
432 }
433 }))
434 }
435 }
436
437 fn prepare_with<'e, 'q: 'e>(
438 self,
439 sql: &'q str,
440 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
441 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
442 where
443 'c: 'e,
444 {
445 Box::pin(async {
446 Ok(crate::statement::D1Statement {
447 sql: std::borrow::Cow::Borrowed(sql),
448 })
449 })
450 }
451
452 fn describe<'e, 'q: 'e>(
453 self,
454 #[allow(unused)] sql: &'q str,
455 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
456 where
457 'c: 'e,
458 {
459 #[cfg(not(target_arch = "wasm32"))]
460 {
461 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
462 }
463 #[cfg(target_arch = "wasm32")]
464 {
465 unreachable!("wasm32 describe")
466 }
467 }
468 }
469};
470
471#[derive(Clone)]
473pub struct D1ConnectOptions {
474 pragmas: TogglePragmas,
475 #[cfg(target_arch = "wasm32")]
476 d1: worker_sys::D1Database,
477 #[cfg(not(target_arch = "wasm32"))]
478 sqlite_path: std::path::PathBuf,
479}
480const _: () = {
481 unsafe impl Send for D1ConnectOptions {}
483 unsafe impl Sync for D1ConnectOptions {}
484
485 #[cfg(target_arch = "wasm32")]
486 const URL_CONVERSION_UNSUPPORTED_MESSAGE: &str = "\
487 `sqlx_d1::D1ConnectOptions` doesn't support conversion between `Url`. \
488 Consider connect from options created by `D1ConnectOptions::new`. \
489 ";
490
491 const LOG_SETTINGS_UNSUPPORTED_MESSAGE: &str = "\
492 `sqlx_d1::D1ConnectOptions` doesn't support log settings.
493 ";
494
495 impl D1ConnectOptions {
496 pub fn new(#[allow(unused)] d1: worker::D1Database) -> Self {
497 #[cfg(target_arch = "wasm32")]
498 {
499 Self {
500 d1: unsafe {
502 core::mem::transmute::<worker::D1Database, worker_sys::D1Database>(d1)
503 },
504 pragmas: TogglePragmas::new(),
505 }
506 }
507 #[cfg(not(target_arch = "wasm32"))]
508 {
509 unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::new");
510 }
511 }
512
513 pub async fn connect(self) -> Result<D1Connection, crate::error::D1Error> {
514 #[cfg(target_arch = "wasm32")]
515 {
516 let Self { d1, pragmas } = self;
517 if let Some(pragmas) = pragmas.collect() {
518 JsFuture::from(d1.exec(&pragmas.join("\n")).map_err(D1Error::from)?)
519 .await
520 .map_err(D1Error::from)?;
521 }
522 Ok(D1Connection { inner: d1 })
523 }
524 #[cfg(not(target_arch = "wasm32"))]
525 {
526 unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::connect");
527 }
528 }
529 }
530
531 impl std::fmt::Debug for D1ConnectOptions {
532 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
533 f.debug_struct("D1ConnectOptions")
534 .field("pragmas", &self.pragmas)
535 .finish()
536 }
537 }
538
539 impl std::str::FromStr for D1ConnectOptions {
540 type Err = sqlx_core::Error;
541
542 fn from_str(_: &str) -> Result<Self, Self::Err> {
543 #[cfg(target_arch = "wasm32")]
544 {
545 Err(sqlx_core::Error::Configuration(From::from(
546 URL_CONVERSION_UNSUPPORTED_MESSAGE,
547 )))
548 }
549
550 #[cfg(not(target_arch = "wasm32"))]
551 {
552 use std::{
553 fs, io,
554 path::{Path, PathBuf},
555 };
556
557 fn maybe_miniflare_d1_dir_of(dir: impl AsRef<Path>) -> PathBuf {
558 dir.as_ref()
559 .join(".wrangler")
560 .join("state")
561 .join("v3")
562 .join("d1")
563 .join("miniflare-D1DatabaseObject")
564 }
565
566 let package_root = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_default();
568
569 let (candidate_1, candidate_2) = (
570 maybe_miniflare_d1_dir_of(&package_root),
571 maybe_miniflare_d1_dir_of("."),
572 );
573
574 let sqlite_path = (|| -> io::Result<PathBuf> {
575 let miniflare_d1_dir =
576 match (fs::exists(&candidate_1), fs::exists(&candidate_2)) {
577 (Ok(true), _) => candidate_1,
578 (_, Ok(true)) => candidate_2,
579 (Err(e), _) | (_, Err(e)) => return Err(e),
580 (Ok(false), Ok(false)) => {
581 return Err(io::Error::new(
582 io::ErrorKind::NotFound,
583 "miniflare's D1 emulating directory not found",
584 ));
585 }
586 };
587
588 let [sqlite_path] = fs::read_dir(miniflare_d1_dir)?
589 .filter_map(|r| {
590 r.as_ref().ok().and_then(|e| {
591 let path = e.path();
592 (path.extension().is_some_and(|x| x == "sqlite")
593 && path.file_name().is_some_and(|x| x != "metadata.sqlite"))
594 .then_some(path)
595 })
596 })
597 .collect::<Vec<_>>()
598 .try_into()
599 .map_err(|_| {
600 io::Error::other(
601 "Currently, sqlx_d1 doesn't support multiple D1 bindings!",
602 )
603 })?;
604
605 Ok(sqlite_path)
606 })()
607 .map_err(|_| sqlx_core::Error::WorkerCrashed)?;
608
609 Ok(Self {
610 pragmas: TogglePragmas::new(),
611 sqlite_path,
612 })
613 }
614 }
615 }
616
617 impl sqlx_core::connection::ConnectOptions for D1ConnectOptions {
618 type Connection = D1Connection;
619
620 fn from_url(_url: &Url) -> Result<Self, sqlx_core::Error> {
621 #[cfg(target_arch = "wasm32")]
622 {
623 Err(sqlx_core::Error::Configuration(From::from(
624 URL_CONVERSION_UNSUPPORTED_MESSAGE,
625 )))
626 }
627 #[cfg(not(target_arch = "wasm32"))]
628 {
629 _url.as_str().parse()
630 }
631 }
632
633 fn to_url_lossy(&self) -> Url {
634 unreachable!("`sqlx_d1::ConnectOptions` doesn't support `ConnectOptions::to_url_lossy`")
635 }
636
637 fn connect(&self) -> crate::ResultFuture<'_, Self::Connection>
638 where
639 Self::Connection: Sized,
640 {
641 #[cfg(target_arch = "wasm32")]
642 {
643 Box::pin(worker::send::SendFuture::new(async move {
644 <Self>::connect(self.clone())
645 .await
646 .map_err(|e| sqlx_core::Error::Database(Box::new(e)))
647 }))
648 }
649
650 #[cfg(not(target_arch = "wasm32"))]
651 {
652 Box::pin(async move {
653 use sqlx_core::{connection::Connection, executor::Executor};
654
655 let mut sqlite_conn = sqlx_sqlite::SqliteConnection::connect(
656 self.sqlite_path
657 .to_str()
658 .ok_or(sqlx_core::Error::WorkerCrashed)?,
659 )
660 .await?;
661
662 if let Some(pragmas) = self.pragmas.collect() {
663 for pragma in pragmas {
664 sqlite_conn.execute(pragma).await?;
665 }
666 }
667
668 Ok(D1Connection { inner: sqlite_conn })
669 })
670 }
671 }
672
673 fn log_statements(self, _: log::LevelFilter) -> Self {
674 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
675 }
676
677 fn log_slow_statements(self, _: log::LevelFilter, _: std::time::Duration) -> Self {
678 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
679 }
680 }
681};
682
683#[derive(Clone, Copy)]
685struct TogglePragmas(u8);
686const _: () = {
687 impl std::ops::Not for TogglePragmas {
688 type Output = Self;
689 fn not(self) -> Self::Output {
690 Self(!self.0)
691 }
692 }
693 impl std::ops::BitOrAssign for TogglePragmas {
694 fn bitor_assign(&mut self, rhs: Self) {
695 self.0 |= rhs.0;
696 }
697 }
698 impl std::ops::BitAndAssign for TogglePragmas {
699 fn bitand_assign(&mut self, rhs: Self) {
700 self.0 &= rhs.0;
701 }
702 }
703
704 impl TogglePragmas {
705 const fn new() -> Self {
706 Self(0)
707 }
708 }
709};
710
711macro_rules! toggles {
712 ($( $name:ident as $bits:literal; )*) => {
713 impl TogglePragmas {
714 $(
715 #[allow(non_upper_case_globals)]
716 const $name: Self = Self($bits);
717 )*
718
719 fn collect(&self) -> Option<Vec<&'static str>> {
720 #[allow(unused_mut)]
721 let mut pragmas = Vec::new();
722 $(
723 if self.0 & Self::$name.0 != 0 {
724 pragmas.push(concat!(
725 "PRAGMA ",
726 stringify!($name),
727 " = on"
728 ));
729 }
730 )*
731 (!pragmas.is_empty()).then_some(pragmas)
732 }
733 }
734
735 impl std::fmt::Debug for TogglePragmas {
736 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
737 let mut f = &mut f.debug_map();
738 $(
739 f = f.entry(
740 &stringify!($name),
741 &if self.0 & Self::$name.0 != 0 {"on"} else {"off"}
742 );
743 )*
744 f.finish()
745 }
746 }
747
748 impl D1ConnectOptions {
749 $(
750 pub fn $name(mut self, yes: bool) -> Self {
751 if yes {
752 self.pragmas |= TogglePragmas::$name;
753 } else {
754 self.pragmas &= !TogglePragmas::$name;
755 }
756 self
757 }
758 )*
759 }
760 };
761}
762toggles! {
763 case_sensitive_like as 0b0000001;
764 ignore_check_constraint as 0b0000010;
765 legacy_alter_table as 0b0000100;
766 recursive_triggers as 0b0001000;
767 unordered_selects as 0b0010000;
768 foreign_keys as 0b0100000;
769 defer_foreign_keys as 0b1000000;
770}