1use sqlx_core::{Either, Url};
2
3#[cfg(target_arch = "wasm32")]
4use {
5 crate::{error::D1Error, row::D1Row},
6 std::pin::Pin,
7 worker::{js_sys, wasm_bindgen::JsValue, wasm_bindgen_futures::JsFuture},
8};
9
10#[cfg(not(target_arch = "wasm32"))]
11macro_rules! unreachable_native_impl_of_item_for_only_wasm32 {
12 ($item_for_only_wasm32:literal) => {
13 panic!(
14 "native `{}`: Invalid use of `sqlx_d1`. Be sure to use `sqlx_d1` where the target is set to \
15 `wasm32-unknown-unknown` ! \n\
16 For this, typcally, place `.cargo/config.toml` of following content at the root of \
17 your project or workspace : \n\
18 \n\
19 [build]\n\
20 target = \"wasm32-unknown-unknown\"\n",
21 $item_for_only_wasm32
22 )
23 };
24}
25
26pub struct D1Connection {
81 #[cfg(target_arch = "wasm32")]
82 pub(crate) inner: worker_sys::D1Database,
83
84 #[cfg(not(target_arch = "wasm32"))]
85 pub(crate) inner: sqlx_sqlite::SqliteConnection,
86}
87
88const _: () = {
89 unsafe impl Send for D1Connection {}
91 unsafe impl Sync for D1Connection {}
92
93 impl D1Connection {
94 pub fn new(d1: worker::D1Database) -> Self {
95 #[cfg(target_arch = "wasm32")]
96 {
97 Self {
98 inner: unsafe {
100 std::mem::transmute::<worker::D1Database, worker_sys::D1Database>(d1)
101 },
102 }
103 }
104 #[cfg(not(target_arch = "wasm32"))]
105 {
106 let _ = d1;
107 unreachable_native_impl_of_item_for_only_wasm32!("D1Cnnection::new");
108 }
109 }
110
111 #[cfg(not(target_arch = "wasm32"))]
112 pub async fn connect(url: impl AsRef<str>) -> Result<Self, sqlx_core::Error> {
113 <Self as sqlx_core::connection::Connection>::connect(url.as_ref()).await
114 }
115 }
116
117 impl Clone for D1Connection {
118 fn clone(&self) -> Self {
119 #[cfg(target_arch = "wasm32")]
120 {
121 Self {
122 inner: self.inner.clone(),
123 }
124 }
125 #[cfg(not(target_arch = "wasm32"))]
126 {
127 unreachable_native_impl_of_item_for_only_wasm32!("impl Clone for D1Connection");
128 }
129 }
130 }
131
132 impl std::fmt::Debug for D1Connection {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 f.debug_struct("D1Connection").finish()
135 }
136 }
137
138 impl sqlx_core::connection::Connection for D1Connection {
139 type Database = crate::D1;
140
141 type Options = D1ConnectOptions;
142
143 fn close(self) -> crate::ResultFuture<'static, ()> {
144 Box::pin(async { Ok(()) })
145 }
146
147 fn close_hard(self) -> crate::ResultFuture<'static, ()> {
148 Box::pin(async { Ok(()) })
149 }
150
151 fn ping(&mut self) -> crate::ResultFuture<'_, ()> {
152 Box::pin(async { Ok(()) })
153 }
154
155 fn begin(
156 &mut self,
157 ) -> crate::ResultFuture<'_, sqlx_core::transaction::Transaction<'_, Self::Database>>
158 where
159 Self: Sized,
160 {
161 sqlx_core::transaction::Transaction::begin(self)
162 }
163
164 fn shrink_buffers(&mut self) {
165 }
167
168 fn flush(&mut self) -> crate::ResultFuture<'_, ()> {
169 Box::pin(async { Ok(()) })
170 }
171
172 fn should_flush(&self) -> bool {
173 false
174 }
175 }
176
177 impl<'c> sqlx_core::executor::Executor<'c> for &'c mut D1Connection {
178 type Database = crate::D1;
179
180 fn fetch_many<'e, 'q: 'e, E>(
181 self,
182 #[allow(unused)] mut query: E,
183 ) -> futures_core::stream::BoxStream<
184 'e,
185 Result<
186 Either<
187 <Self::Database as sqlx_core::database::Database>::QueryResult,
188 <Self::Database as sqlx_core::database::Database>::Row,
189 >,
190 sqlx_core::Error,
191 >,
192 >
193 where
194 'c: 'e,
195 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
196 {
197 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_many(self, query)
198 }
199
200 fn fetch_optional<'e, 'q: 'e, E>(
201 self,
202 #[allow(unused)] mut query: E,
203 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
204 where
205 'c: 'e,
206 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
207 {
208 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_optional(self, query)
209 }
210
211 fn prepare_with<'e, 'q: 'e>(
212 self,
213 sql: &'q str,
214 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
215 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
216 where
217 'c: 'e,
218 {
219 Box::pin(async {
220 Ok(crate::statement::D1Statement {
221 sql: std::borrow::Cow::Borrowed(sql),
222 })
223 })
224 }
225
226 fn describe<'e, 'q: 'e>(
227 self,
228 #[allow(unused)] sql: &'q str,
229 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
230 where
231 'c: 'e,
232 {
233 #[cfg(target_arch = "wasm32")]
234 {
235 unreachable!("wasm32 describe")
236 }
237 #[cfg(not(target_arch = "wasm32"))]
238 {
239 Box::pin(async {
242 let sqlx_core::describe::Describe {
243 columns,
244 parameters,
245 nullable
246 } = <&mut sqlx_sqlite::SqliteConnection as sqlx_core::executor::Executor>::describe(
247 &mut self.inner,
248 sql
249 ).await?;
250
251 Ok(sqlx_core::describe::Describe {
252 parameters: parameters.map(|ps| match ps {
253 Either::Left(type_infos) => Either::Left(
254 type_infos
255 .into_iter()
256 .map(crate::type_info::D1TypeInfo::from_sqlite)
257 .collect(),
258 ),
259 Either::Right(n) => Either::Right(n),
260 }),
261 columns: columns
262 .into_iter()
263 .map(crate::column::D1Column::from_sqlite)
264 .collect(),
265 nullable,
266 })
267 })
268 }
269 }
270 }
271
272 impl<'c> sqlx_core::executor::Executor<'c> for &'c D1Connection {
273 type Database = crate::D1;
274
275 fn fetch_many<'e, 'q: 'e, E>(
276 self,
277 #[allow(unused)] mut query: E,
278 ) -> futures_core::stream::BoxStream<
279 'e,
280 Result<
281 Either<
282 <Self::Database as sqlx_core::database::Database>::QueryResult,
283 <Self::Database as sqlx_core::database::Database>::Row,
284 >,
285 sqlx_core::Error,
286 >,
287 >
288 where
289 'c: 'e,
290 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
291 {
292 #[cfg(not(target_arch = "wasm32"))]
293 {
294 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
295 }
296 #[cfg(target_arch = "wasm32")]
297 {
298 let sql = query.sql();
299 let arguments = match query.take_arguments() {
300 Ok(a) => a,
301 Err(e) => {
302 return Box::pin(futures_util::stream::once(async {
303 Err(sqlx_core::Error::Encode(e))
304 }));
305 }
306 };
307
308 struct FetchMany<F> {
309 raw_rows_future: F,
310 raw_rows: Option<js_sys::ArrayIntoIter>,
311 }
312 const _: () = {
313 unsafe impl<F> Send for FetchMany<F> {}
315
316 impl<F> FetchMany<F> {
317 fn new(raw_rows_future: F) -> Self {
318 Self {
319 raw_rows_future,
320 raw_rows: None,
321 }
322 }
323 }
324
325 impl<F> futures_core::Stream for FetchMany<F>
326 where
327 F: Future<Output = Result<Option<js_sys::Array>, JsValue>>,
328 {
329 type Item = Result<
330 Either<crate::query_result::D1QueryResult, D1Row>,
331 sqlx_core::Error,
332 >;
333
334 fn poll_next(
335 self: Pin<&mut Self>,
336 cx: &mut std::task::Context<'_>,
337 ) -> std::task::Poll<Option<Self::Item>> {
338 use std::task::Poll;
339
340 fn pop_next(
341 raw_rows: &mut js_sys::ArrayIntoIter,
342 ) -> Option<
343 Result<
344 Either<crate::query_result::D1QueryResult, D1Row>,
345 sqlx_core::Error,
346 >,
347 > {
348 let raw_row = raw_rows.next()?;
349 Some(D1Row::from_raw(raw_row).map(Either::Right))
350 }
351
352 let this = unsafe { self.get_unchecked_mut() };
353 match &mut this.raw_rows {
354 Some(raw_rows) => Poll::Ready(pop_next(raw_rows)),
355 None => {
356 match unsafe { Pin::new_unchecked(&mut this.raw_rows_future) }
357 .poll(cx)
358 {
359 Poll::Pending => Poll::Pending,
360 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(
361 sqlx_core::Error::from(D1Error::from(e)),
362 ))),
363 Poll::Ready(Ok(maybe_raw_rows)) => {
364 this.raw_rows = Some(
365 maybe_raw_rows
366 .unwrap_or_else(js_sys::Array::new)
367 .into_iter(),
368 );
369 Poll::Ready(pop_next(unsafe {
370 this.raw_rows.as_mut().unwrap_unchecked()
371 }))
372 }
373 }
374 }
375 }
376 }
377 }
378 };
379
380 Box::pin(FetchMany::new(async move {
381 let mut statement = self.inner.prepare(sql).unwrap();
382 if let Some(a) = arguments {
383 statement = statement.bind(a.as_ref().iter().collect())?;
384 }
385
386 let d1_result_jsvalue = JsFuture::from(statement.all()?).await?;
387 worker_sys::D1Result::from(d1_result_jsvalue).results()
388 }))
389 }
390 }
391
392 fn fetch_optional<'e, 'q: 'e, E>(
393 self,
394 #[allow(unused)] mut query: E,
395 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
396 where
397 'c: 'e,
398 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
399 {
400 #[cfg(not(target_arch = "wasm32"))]
401 {
402 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
403 }
404 #[cfg(target_arch = "wasm32")]
405 {
406 let sql = query.sql();
407 let arguments = match query.take_arguments() {
408 Ok(a) => a,
409 Err(e) => return Box::pin(async { Err(sqlx_core::Error::Encode(e)) }),
410 };
411
412 Box::pin(worker::send::SendFuture::new(async move {
413 let mut statement = self.inner.prepare(sql).unwrap();
414 if let Some(a) = arguments {
415 statement = statement
416 .bind(a.as_ref().iter().collect())
417 .map_err(|e| sqlx_core::Error::Encode(Box::new(D1Error::from(e))))?;
418 }
419
420 let raw = JsFuture::from(statement.first(None).map_err(D1Error::from)?)
421 .await
422 .map_err(D1Error::from)?;
423 if raw.is_null() {
424 Ok(None)
425 } else {
426 D1Row::from_raw(raw).map(Some)
427 }
428 }))
429 }
430 }
431
432 fn prepare_with<'e, 'q: 'e>(
433 self,
434 sql: &'q str,
435 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
436 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
437 where
438 'c: 'e,
439 {
440 Box::pin(async {
441 Ok(crate::statement::D1Statement {
442 sql: std::borrow::Cow::Borrowed(sql),
443 })
444 })
445 }
446
447 fn describe<'e, 'q: 'e>(
448 self,
449 #[allow(unused)] sql: &'q str,
450 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
451 where
452 'c: 'e,
453 {
454 #[cfg(not(target_arch = "wasm32"))]
455 {
456 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
457 }
458 #[cfg(target_arch = "wasm32")]
459 {
460 unreachable!("wasm32 describe")
461 }
462 }
463 }
464};
465
466#[derive(Clone)]
468pub struct D1ConnectOptions {
469 pragmas: TogglePragmas,
470 #[cfg(target_arch = "wasm32")]
471 d1: worker_sys::D1Database,
472 #[cfg(not(target_arch = "wasm32"))]
473 sqlite_path: std::path::PathBuf,
474}
475const _: () = {
476 unsafe impl Send for D1ConnectOptions {}
478 unsafe impl Sync for D1ConnectOptions {}
479
/// Configuration-error text returned on `wasm32` whenever options are
/// requested from a string or `Url`.
#[cfg(target_arch = "wasm32")]
const URL_CONVERSION_UNSUPPORTED_MESSAGE: &str = concat!(
    "`sqlx_d1::D1ConnectOptions` doesn't support conversion between `Url`. ",
    "Consider connect from options created by `D1ConnectOptions::new`. ",
);
485
/// Panic text used by the log-setting methods, which this driver does not
/// support.
///
/// Kept on a single line so the message carries no trailing newline or
/// source indentation.
const LOG_SETTINGS_UNSUPPORTED_MESSAGE: &str =
    "`sqlx_d1::D1ConnectOptions` doesn't support log settings.";
489
490 impl D1ConnectOptions {
491 pub fn new(#[allow(unused)] d1: worker::D1Database) -> Self {
492 #[cfg(target_arch = "wasm32")]
493 {
494 Self {
495 d1: unsafe {
497 core::mem::transmute::<worker::D1Database, worker_sys::D1Database>(d1)
498 },
499 pragmas: TogglePragmas::new(),
500 }
501 }
502 #[cfg(not(target_arch = "wasm32"))]
503 {
504 unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::new");
505 }
506 }
507
508 pub async fn connect(self) -> Result<D1Connection, crate::error::D1Error> {
509 #[cfg(target_arch = "wasm32")]
510 {
511 let Self { d1, pragmas } = self;
512 if let Some(pragmas) = pragmas.collect() {
513 JsFuture::from(d1.exec(&pragmas.join("\n")).map_err(D1Error::from)?)
514 .await
515 .map_err(D1Error::from)?;
516 }
517 Ok(D1Connection { inner: d1 })
518 }
519 #[cfg(not(target_arch = "wasm32"))]
520 {
521 unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::connect");
522 }
523 }
524 }
525
526 impl std::fmt::Debug for D1ConnectOptions {
527 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
528 f.debug_struct("D1ConnectOptions")
529 .field("pragmas", &self.pragmas)
530 .finish()
531 }
532 }
533
534 impl std::str::FromStr for D1ConnectOptions {
535 type Err = sqlx_core::Error;
536
537 fn from_str(_: &str) -> Result<Self, Self::Err> {
538 #[cfg(target_arch = "wasm32")]
539 {
540 Err(sqlx_core::Error::Configuration(From::from(
541 URL_CONVERSION_UNSUPPORTED_MESSAGE,
542 )))
543 }
544
545 #[cfg(not(target_arch = "wasm32"))]
546 {
547 use std::{
548 fs, io,
549 path::{Path, PathBuf},
550 };
551
552 fn maybe_miniflare_d1_dir_of(dir: impl AsRef<Path>) -> PathBuf {
553 dir.as_ref()
554 .join(".wrangler")
555 .join("state")
556 .join("v3")
557 .join("d1")
558 .join("miniflare-D1DatabaseObject")
559 }
560
561 const PACKAGE_ROOT: &str = env!("CARGO_MANIFEST_DIR");
562
563 let (candidate_1, candidate_2) = (
564 maybe_miniflare_d1_dir_of(PACKAGE_ROOT),
565 maybe_miniflare_d1_dir_of("."),
566 );
567
568 let sqlite_path = (|| -> io::Result<PathBuf> {
569 let miniflare_d1_dir =
570 match (fs::exists(&candidate_1), fs::exists(&candidate_2)) {
571 (Ok(true), _) => candidate_1,
572 (_, Ok(true)) => candidate_2,
573 (Err(e), _) | (_, Err(e)) => return Err(e),
574 (Ok(false), Ok(false)) => {
575 return Err(io::Error::new(
576 io::ErrorKind::NotFound,
577 "miniflare's D1 emulating directory not found",
578 ));
579 }
580 };
581
582 let [sqlite_path] = fs::read_dir(miniflare_d1_dir)?
583 .filter_map(|r| {
584 r.as_ref().ok().and_then(|e| {
585 let path = e.path();
586 path.extension()
587 .is_some_and(|ex| ex == "sqlite")
588 .then_some(path)
589 })
590 })
591 .collect::<Vec<_>>()
592 .try_into()
593 .map_err(|_| {
594 io::Error::other(
595 "Currently, sqlx_d1 doesn't support multiple D1 bindings!",
596 )
597 })?;
598
599 Ok(sqlite_path)
600 })()
601 .map_err(|_| sqlx_core::Error::WorkerCrashed)?;
602
603 Ok(Self {
604 pragmas: TogglePragmas::new(),
605 sqlite_path,
606 })
607 }
608 }
609 }
610
611 impl sqlx_core::connection::ConnectOptions for D1ConnectOptions {
612 type Connection = D1Connection;
613
614 fn from_url(_url: &Url) -> Result<Self, sqlx_core::Error> {
615 #[cfg(target_arch = "wasm32")]
616 {
617 Err(sqlx_core::Error::Configuration(From::from(
618 URL_CONVERSION_UNSUPPORTED_MESSAGE,
619 )))
620 }
621 #[cfg(not(target_arch = "wasm32"))]
622 {
623 _url.as_str().parse()
624 }
625 }
626
627 fn to_url_lossy(&self) -> Url {
628 unreachable!("`sqlx_d1::ConnectOptions` doesn't support `ConnectOptions::to_url_lossy`")
629 }
630
631 fn connect(&self) -> crate::ResultFuture<'_, Self::Connection>
632 where
633 Self::Connection: Sized,
634 {
635 #[cfg(target_arch = "wasm32")]
636 {
637 Box::pin(worker::send::SendFuture::new(async move {
638 <Self>::connect(self.clone())
639 .await
640 .map_err(|e| sqlx_core::Error::Database(Box::new(e)))
641 }))
642 }
643
644 #[cfg(not(target_arch = "wasm32"))]
645 {
646 Box::pin(async move {
647 use sqlx_core::{connection::Connection, executor::Executor};
648
649 let mut sqlite_conn = sqlx_sqlite::SqliteConnection::connect(
650 self.sqlite_path
651 .to_str()
652 .ok_or(sqlx_core::Error::WorkerCrashed)?,
653 )
654 .await?;
655
656 if let Some(pragmas) = self.pragmas.collect() {
657 for pragma in pragmas {
658 sqlite_conn.execute(pragma).await?;
659 }
660 }
661
662 Ok(D1Connection { inner: sqlite_conn })
663 })
664 }
665 }
666
667 fn log_statements(self, _: log::LevelFilter) -> Self {
668 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
669 }
670
671 fn log_slow_statements(self, _: log::LevelFilter, _: std::time::Duration) -> Self {
672 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
673 }
674 }
675};
676
/// Bit set stored in a single `u8`, one bit per toggleable pragma.
#[derive(Clone, Copy)]
struct TogglePragmas(u8);

const _: () = {
    impl TogglePragmas {
        /// The empty set: no bit is on.
        const fn new() -> Self {
            TogglePragmas(0)
        }
    }

    /// `a &= b` keeps only the bits present in both sets.
    impl std::ops::BitAndAssign for TogglePragmas {
        fn bitand_assign(&mut self, rhs: Self) {
            let Self(mask) = rhs;
            self.0 &= mask;
        }
    }

    /// `a |= b` adds every bit of `b` to `a`.
    impl std::ops::BitOrAssign for TogglePragmas {
        fn bitor_assign(&mut self, rhs: Self) {
            let Self(extra) = rhs;
            self.0 |= extra;
        }
    }

    /// `!a` flips all eight bits.
    impl std::ops::Not for TogglePragmas {
        type Output = Self;

        fn not(self) -> Self::Output {
            let Self(bits) = self;
            Self(!bits)
        }
    }
};
704
705macro_rules! toggles {
706 ($( $name:ident as $bits:literal; )*) => {
707 impl TogglePragmas {
708 $(
709 #[allow(non_upper_case_globals)]
710 const $name: Self = Self($bits);
711 )*
712
713 fn collect(&self) -> Option<Vec<&'static str>> {
714 #[allow(unused_mut)]
715 let mut pragmas = Vec::new();
716 $(
717 if self.0 & Self::$name.0 != 0 {
718 pragmas.push(concat!(
719 "PRAGMA ",
720 stringify!($name),
721 " = on"
722 ));
723 }
724 )*
725 (!pragmas.is_empty()).then_some(pragmas)
726 }
727 }
728
729 impl std::fmt::Debug for TogglePragmas {
730 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
731 let mut f = &mut f.debug_map();
732 $(
733 f = f.entry(
734 &stringify!($name),
735 &if self.0 & Self::$name.0 != 0 {"on"} else {"off"}
736 );
737 )*
738 f.finish()
739 }
740 }
741
742 impl D1ConnectOptions {
743 $(
744 pub fn $name(mut self, yes: bool) -> Self {
745 if yes {
746 self.pragmas |= TogglePragmas::$name;
747 } else {
748 self.pragmas &= !TogglePragmas::$name;
749 }
750 self
751 }
752 )*
753 }
754 };
755}
756toggles! {
757 case_sensitive_like as 0b0000001;
758 ignore_check_constraint as 0b0000010;
759 legacy_alter_table as 0b0000100;
760 recursive_triggers as 0b0001000;
761 unordered_selects as 0b0010000;
762 foreign_keys as 0b0100000;
763 defer_foreign_keys as 0b1000000;
764}