1use sqlx_core::{Either, Url};
2
3#[cfg(target_arch = "wasm32")]
4use {
5 crate::{error::D1Error, row::D1Row},
6 std::pin::Pin,
7 worker::{js_sys, wasm_bindgen::JsValue, wasm_bindgen_futures::JsFuture},
8};
9
10#[cfg(not(target_arch = "wasm32"))]
11macro_rules! unreachable_native_impl_of_item_for_only_wasm32 {
12 ($item_for_only_wasm32:literal) => {
13 panic!(
14 "native `{}`: Invalid use of `sqlx_d1`. Be sure to use `sqlx_d1` where the target is set to \
15 `wasm32-unknown-unknown` ! \n\
16 For this, typcally, place `.cargo/config.toml` of following content at the root of \
17 your project or workspace : \n\
18 \n\
19 [build]\n\
20 target = \"wasm32-unknown-unknown\"\n",
21 $item_for_only_wasm32
22 )
23 };
24}
25
26pub struct D1Connection {
86 #[cfg(target_arch = "wasm32")]
87 pub(crate) inner: worker_sys::D1Database,
88
89 #[cfg(not(target_arch = "wasm32"))]
90 pub(crate) inner: sqlx_sqlite::SqliteConnection,
91}
92
93const _: () = {
94 unsafe impl Send for D1Connection {}
96 unsafe impl Sync for D1Connection {}
97
98 impl D1Connection {
99 pub fn new(d1: worker::D1Database) -> Self {
100 #[cfg(target_arch = "wasm32")]
101 {
102 Self {
103 inner: unsafe {
105 std::mem::transmute::<worker::D1Database, worker_sys::D1Database>(d1)
106 },
107 }
108 }
109 #[cfg(not(target_arch = "wasm32"))]
110 {
111 let _ = d1;
112 unreachable_native_impl_of_item_for_only_wasm32!("D1Cnnection::new");
113 }
114 }
115
116 #[cfg(not(target_arch = "wasm32"))]
117 pub async fn connect(url: impl AsRef<str>) -> Result<Self, sqlx_core::Error> {
118 <Self as sqlx_core::connection::Connection>::connect(url.as_ref()).await
119 }
120 }
121
122 impl Clone for D1Connection {
123 fn clone(&self) -> Self {
124 #[cfg(target_arch = "wasm32")]
125 {
126 Self {
127 inner: self.inner.clone(),
128 }
129 }
130 #[cfg(not(target_arch = "wasm32"))]
131 {
132 unreachable_native_impl_of_item_for_only_wasm32!("impl Clone for D1Connection");
133 }
134 }
135 }
136
137 impl std::fmt::Debug for D1Connection {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("D1Connection").finish()
140 }
141 }
142
143 impl sqlx_core::connection::Connection for D1Connection {
144 type Database = crate::D1;
145
146 type Options = D1ConnectOptions;
147
148 fn close(self) -> crate::ResultFuture<'static, ()> {
149 Box::pin(async { Ok(()) })
150 }
151
152 fn close_hard(self) -> crate::ResultFuture<'static, ()> {
153 Box::pin(async { Ok(()) })
154 }
155
156 fn ping(&mut self) -> crate::ResultFuture<'_, ()> {
157 Box::pin(async { Ok(()) })
158 }
159
160 fn begin(
161 &mut self,
162 ) -> crate::ResultFuture<'_, sqlx_core::transaction::Transaction<'_, Self::Database>>
163 where
164 Self: Sized,
165 {
166 sqlx_core::transaction::Transaction::begin(self, None)
167 }
168
169 fn shrink_buffers(&mut self) {
170 }
172
173 fn flush(&mut self) -> crate::ResultFuture<'_, ()> {
174 Box::pin(async { Ok(()) })
175 }
176
177 fn should_flush(&self) -> bool {
178 false
179 }
180 }
181
182 impl<'c> sqlx_core::executor::Executor<'c> for &'c mut D1Connection {
183 type Database = crate::D1;
184
185 fn fetch_many<'e, 'q: 'e, E>(
186 self,
187 #[allow(unused)] mut query: E,
188 ) -> futures_core::stream::BoxStream<
189 'e,
190 Result<
191 Either<
192 <Self::Database as sqlx_core::database::Database>::QueryResult,
193 <Self::Database as sqlx_core::database::Database>::Row,
194 >,
195 sqlx_core::Error,
196 >,
197 >
198 where
199 'c: 'e,
200 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
201 {
202 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_many(self, query)
203 }
204
205 fn fetch_optional<'e, 'q: 'e, E>(
206 self,
207 #[allow(unused)] mut query: E,
208 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
209 where
210 'c: 'e,
211 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
212 {
213 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_optional(self, query)
214 }
215
216 fn prepare_with<'e, 'q: 'e>(
217 self,
218 sql: &'q str,
219 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
220 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
221 where
222 'c: 'e,
223 {
224 Box::pin(async {
225 Ok(crate::statement::D1Statement {
226 sql: std::borrow::Cow::Borrowed(sql),
227 })
228 })
229 }
230
231 fn describe<'e, 'q: 'e>(
232 self,
233 #[allow(unused)] sql: &'q str,
234 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
235 where
236 'c: 'e,
237 {
238 #[cfg(target_arch = "wasm32")]
239 {
240 unreachable!("wasm32 describe")
241 }
242 #[cfg(not(target_arch = "wasm32"))]
243 {
244 Box::pin(async {
247 let sqlx_core::describe::Describe {
248 columns,
249 parameters,
250 nullable
251 } = <&mut sqlx_sqlite::SqliteConnection as sqlx_core::executor::Executor>::describe(
252 &mut self.inner,
253 sql
254 ).await?;
255
256 Ok(sqlx_core::describe::Describe {
257 parameters: parameters.map(|ps| match ps {
258 Either::Left(type_infos) => Either::Left(
259 type_infos
260 .into_iter()
261 .map(crate::type_info::D1TypeInfo::from_sqlite)
262 .collect(),
263 ),
264 Either::Right(n) => Either::Right(n),
265 }),
266 columns: columns
267 .into_iter()
268 .map(crate::column::D1Column::from_sqlite)
269 .collect(),
270 nullable,
271 })
272 })
273 }
274 }
275 }
276
277 impl<'c> sqlx_core::executor::Executor<'c> for &'c D1Connection {
278 type Database = crate::D1;
279
280 fn fetch_many<'e, 'q: 'e, E>(
281 self,
282 #[allow(unused)] mut query: E,
283 ) -> futures_core::stream::BoxStream<
284 'e,
285 Result<
286 Either<
287 <Self::Database as sqlx_core::database::Database>::QueryResult,
288 <Self::Database as sqlx_core::database::Database>::Row,
289 >,
290 sqlx_core::Error,
291 >,
292 >
293 where
294 'c: 'e,
295 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
296 {
297 #[cfg(not(target_arch = "wasm32"))]
298 {
299 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
300 }
301 #[cfg(target_arch = "wasm32")]
302 {
303 let sql = query.sql();
304 let arguments = match query.take_arguments() {
305 Ok(a) => a,
306 Err(e) => {
307 return Box::pin(futures_util::stream::once(async {
308 Err(sqlx_core::Error::Encode(e))
309 }));
310 }
311 };
312
313 struct FetchMany<F> {
314 raw_rows_future: F,
315 raw_rows: Option<js_sys::ArrayIntoIter>,
316 }
317 const _: () = {
318 unsafe impl<F> Send for FetchMany<F> {}
320
321 impl<F> FetchMany<F> {
322 fn new(raw_rows_future: F) -> Self {
323 Self {
324 raw_rows_future,
325 raw_rows: None,
326 }
327 }
328 }
329
330 impl<F> futures_core::Stream for FetchMany<F>
331 where
332 F: Future<Output = Result<Option<js_sys::Array>, JsValue>>,
333 {
334 type Item = Result<
335 Either<crate::query_result::D1QueryResult, D1Row>,
336 sqlx_core::Error,
337 >;
338
339 fn poll_next(
340 self: Pin<&mut Self>,
341 cx: &mut std::task::Context<'_>,
342 ) -> std::task::Poll<Option<Self::Item>> {
343 use std::task::Poll;
344
345 fn pop_next(
346 raw_rows: &mut js_sys::ArrayIntoIter,
347 ) -> Option<
348 Result<
349 Either<crate::query_result::D1QueryResult, D1Row>,
350 sqlx_core::Error,
351 >,
352 > {
353 let raw_row = raw_rows.next()?;
354 Some(D1Row::from_raw(raw_row).map(Either::Right))
355 }
356
357 let this = unsafe { self.get_unchecked_mut() };
358 match &mut this.raw_rows {
359 Some(raw_rows) => Poll::Ready(pop_next(raw_rows)),
360 None => {
361 match unsafe { Pin::new_unchecked(&mut this.raw_rows_future) }
362 .poll(cx)
363 {
364 Poll::Pending => Poll::Pending,
365 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(
366 sqlx_core::Error::from(D1Error::from(e)),
367 ))),
368 Poll::Ready(Ok(maybe_raw_rows)) => {
369 this.raw_rows = Some(
370 maybe_raw_rows
371 .unwrap_or_else(js_sys::Array::new)
372 .into_iter(),
373 );
374 Poll::Ready(pop_next(unsafe {
375 this.raw_rows.as_mut().unwrap_unchecked()
376 }))
377 }
378 }
379 }
380 }
381 }
382 }
383 };
384
385 Box::pin(FetchMany::new(async move {
386 let mut statement = self.inner.prepare(sql).unwrap();
387 if let Some(a) = arguments {
388 statement = statement.bind(a.as_ref().iter().collect())?;
389 }
390
391 let d1_result_jsvalue = JsFuture::from(statement.all()?).await?;
392 worker_sys::D1Result::from(d1_result_jsvalue).results()
393 }))
394 }
395 }
396
397 fn fetch_optional<'e, 'q: 'e, E>(
398 self,
399 #[allow(unused)] mut query: E,
400 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
401 where
402 'c: 'e,
403 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
404 {
405 #[cfg(not(target_arch = "wasm32"))]
406 {
407 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
408 }
409 #[cfg(target_arch = "wasm32")]
410 {
411 let sql = query.sql();
412 let arguments = match query.take_arguments() {
413 Ok(a) => a,
414 Err(e) => return Box::pin(async { Err(sqlx_core::Error::Encode(e)) }),
415 };
416
417 Box::pin(worker::send::SendFuture::new(async move {
418 let mut statement = self.inner.prepare(sql).unwrap();
419 if let Some(a) = arguments {
420 statement = statement
421 .bind(a.as_ref().iter().collect())
422 .map_err(|e| sqlx_core::Error::Encode(Box::new(D1Error::from(e))))?;
423 }
424
425 let raw = JsFuture::from(statement.first(None).map_err(D1Error::from)?)
426 .await
427 .map_err(D1Error::from)?;
428 if raw.is_null() {
429 Ok(None)
430 } else {
431 D1Row::from_raw(raw).map(Some)
432 }
433 }))
434 }
435 }
436
437 fn prepare_with<'e, 'q: 'e>(
438 self,
439 sql: &'q str,
440 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
441 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
442 where
443 'c: 'e,
444 {
445 Box::pin(async {
446 Ok(crate::statement::D1Statement {
447 sql: std::borrow::Cow::Borrowed(sql),
448 })
449 })
450 }
451
452 fn describe<'e, 'q: 'e>(
453 self,
454 #[allow(unused)] sql: &'q str,
455 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
456 where
457 'c: 'e,
458 {
459 #[cfg(not(target_arch = "wasm32"))]
460 {
461 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
462 }
463 #[cfg(target_arch = "wasm32")]
464 {
465 unreachable!("wasm32 describe")
466 }
467 }
468 }
469};
470
471#[derive(Clone)]
473pub struct D1ConnectOptions {
474 pragmas: TogglePragmas,
475 #[cfg(target_arch = "wasm32")]
476 d1: worker_sys::D1Database,
477 #[cfg(not(target_arch = "wasm32"))]
478 sqlite_path: std::path::PathBuf,
479}
480const _: () = {
481 unsafe impl Send for D1ConnectOptions {}
483 unsafe impl Sync for D1ConnectOptions {}
484
485 #[cfg(target_arch = "wasm32")]
486 const URL_CONVERSION_UNSUPPORTED_MESSAGE: &str = "\
487 `sqlx_d1::D1ConnectOptions` doesn't support conversion between `Url`. \
488 Consider connect from options created by `D1ConnectOptions::new`. \
489 ";
490
491 const LOG_SETTINGS_UNSUPPORTED_MESSAGE: &str = "\
492 `sqlx_d1::D1ConnectOptions` doesn't support log settings.
493 ";
494
495 impl D1ConnectOptions {
496 pub fn new(#[allow(unused)] d1: worker::D1Database) -> Self {
497 #[cfg(target_arch = "wasm32")]
498 {
499 Self {
500 d1: unsafe {
502 core::mem::transmute::<worker::D1Database, worker_sys::D1Database>(d1)
503 },
504 pragmas: TogglePragmas::new(),
505 }
506 }
507 #[cfg(not(target_arch = "wasm32"))]
508 {
509 unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::new");
510 }
511 }
512
513 pub async fn connect(self) -> Result<D1Connection, crate::error::D1Error> {
514 #[cfg(target_arch = "wasm32")]
515 {
516 let Self { d1, pragmas } = self;
517 if let Some(pragmas) = pragmas.collect() {
518 JsFuture::from(d1.exec(&pragmas.join("\n")).map_err(D1Error::from)?)
519 .await
520 .map_err(D1Error::from)?;
521 }
522 Ok(D1Connection { inner: d1 })
523 }
524 #[cfg(not(target_arch = "wasm32"))]
525 {
526 unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::connect");
527 }
528 }
529 }
530
531 impl std::fmt::Debug for D1ConnectOptions {
532 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
533 f.debug_struct("D1ConnectOptions")
534 .field("pragmas", &self.pragmas)
535 .finish()
536 }
537 }
538
539 impl std::str::FromStr for D1ConnectOptions {
540 type Err = sqlx_core::Error;
541
542 fn from_str(_: &str) -> Result<Self, Self::Err> {
543 #[cfg(target_arch = "wasm32")]
544 {
545 Err(sqlx_core::Error::Configuration(From::from(
546 URL_CONVERSION_UNSUPPORTED_MESSAGE,
547 )))
548 }
549
550 #[cfg(not(target_arch = "wasm32"))]
551 {
552 use std::{
553 fs, io,
554 path::{Path, PathBuf},
555 };
556
557 fn maybe_miniflare_d1_dir_of(dir: impl AsRef<Path>) -> PathBuf {
558 dir.as_ref()
559 .join(".wrangler")
560 .join("state")
561 .join("v3")
562 .join("d1")
563 .join("miniflare-D1DatabaseObject")
564 }
565
566 const PACKAGE_ROOT: &str = env!("CARGO_MANIFEST_DIR");
567
568 let (candidate_1, candidate_2) = (
569 maybe_miniflare_d1_dir_of(PACKAGE_ROOT),
570 maybe_miniflare_d1_dir_of("."),
571 );
572
573 let sqlite_path = (|| -> io::Result<PathBuf> {
574 let miniflare_d1_dir =
575 match (fs::exists(&candidate_1), fs::exists(&candidate_2)) {
576 (Ok(true), _) => candidate_1,
577 (_, Ok(true)) => candidate_2,
578 (Err(e), _) | (_, Err(e)) => return Err(e),
579 (Ok(false), Ok(false)) => {
580 return Err(io::Error::new(
581 io::ErrorKind::NotFound,
582 "miniflare's D1 emulating directory not found",
583 ));
584 }
585 };
586
587 let [sqlite_path] = fs::read_dir(miniflare_d1_dir)?
588 .filter_map(|r| {
589 r.as_ref().ok().and_then(|e| {
590 let path = e.path();
591 path.extension()
592 .is_some_and(|ex| ex == "sqlite")
593 .then_some(path)
594 })
595 })
596 .collect::<Vec<_>>()
597 .try_into()
598 .map_err(|_| {
599 io::Error::other(
600 "Currently, sqlx_d1 doesn't support multiple D1 bindings!",
601 )
602 })?;
603
604 Ok(sqlite_path)
605 })()
606 .map_err(|_| sqlx_core::Error::WorkerCrashed)?;
607
608 Ok(Self {
609 pragmas: TogglePragmas::new(),
610 sqlite_path,
611 })
612 }
613 }
614 }
615
616 impl sqlx_core::connection::ConnectOptions for D1ConnectOptions {
617 type Connection = D1Connection;
618
619 fn from_url(_url: &Url) -> Result<Self, sqlx_core::Error> {
620 #[cfg(target_arch = "wasm32")]
621 {
622 Err(sqlx_core::Error::Configuration(From::from(
623 URL_CONVERSION_UNSUPPORTED_MESSAGE,
624 )))
625 }
626 #[cfg(not(target_arch = "wasm32"))]
627 {
628 _url.as_str().parse()
629 }
630 }
631
632 fn to_url_lossy(&self) -> Url {
633 unreachable!("`sqlx_d1::ConnectOptions` doesn't support `ConnectOptions::to_url_lossy`")
634 }
635
636 fn connect(&self) -> crate::ResultFuture<'_, Self::Connection>
637 where
638 Self::Connection: Sized,
639 {
640 #[cfg(target_arch = "wasm32")]
641 {
642 Box::pin(worker::send::SendFuture::new(async move {
643 <Self>::connect(self.clone())
644 .await
645 .map_err(|e| sqlx_core::Error::Database(Box::new(e)))
646 }))
647 }
648
649 #[cfg(not(target_arch = "wasm32"))]
650 {
651 Box::pin(async move {
652 use sqlx_core::{connection::Connection, executor::Executor};
653
654 let mut sqlite_conn = sqlx_sqlite::SqliteConnection::connect(
655 self.sqlite_path
656 .to_str()
657 .ok_or(sqlx_core::Error::WorkerCrashed)?,
658 )
659 .await?;
660
661 if let Some(pragmas) = self.pragmas.collect() {
662 for pragma in pragmas {
663 sqlite_conn.execute(pragma).await?;
664 }
665 }
666
667 Ok(D1Connection { inner: sqlite_conn })
668 })
669 }
670 }
671
672 fn log_statements(self, _: log::LevelFilter) -> Self {
673 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
674 }
675
676 fn log_slow_statements(self, _: log::LevelFilter, _: std::time::Duration) -> Self {
677 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
678 }
679 }
680};
681
682#[derive(Clone, Copy)]
684struct TogglePragmas(u8);
685const _: () = {
686 impl std::ops::Not for TogglePragmas {
687 type Output = Self;
688 fn not(self) -> Self::Output {
689 Self(!self.0)
690 }
691 }
692 impl std::ops::BitOrAssign for TogglePragmas {
693 fn bitor_assign(&mut self, rhs: Self) {
694 self.0 |= rhs.0;
695 }
696 }
697 impl std::ops::BitAndAssign for TogglePragmas {
698 fn bitand_assign(&mut self, rhs: Self) {
699 self.0 &= rhs.0;
700 }
701 }
702
703 impl TogglePragmas {
704 const fn new() -> Self {
705 Self(0)
706 }
707 }
708};
709
710macro_rules! toggles {
711 ($( $name:ident as $bits:literal; )*) => {
712 impl TogglePragmas {
713 $(
714 #[allow(non_upper_case_globals)]
715 const $name: Self = Self($bits);
716 )*
717
718 fn collect(&self) -> Option<Vec<&'static str>> {
719 #[allow(unused_mut)]
720 let mut pragmas = Vec::new();
721 $(
722 if self.0 & Self::$name.0 != 0 {
723 pragmas.push(concat!(
724 "PRAGMA ",
725 stringify!($name),
726 " = on"
727 ));
728 }
729 )*
730 (!pragmas.is_empty()).then_some(pragmas)
731 }
732 }
733
734 impl std::fmt::Debug for TogglePragmas {
735 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
736 let mut f = &mut f.debug_map();
737 $(
738 f = f.entry(
739 &stringify!($name),
740 &if self.0 & Self::$name.0 != 0 {"on"} else {"off"}
741 );
742 )*
743 f.finish()
744 }
745 }
746
747 impl D1ConnectOptions {
748 $(
749 pub fn $name(mut self, yes: bool) -> Self {
750 if yes {
751 self.pragmas |= TogglePragmas::$name;
752 } else {
753 self.pragmas &= !TogglePragmas::$name;
754 }
755 self
756 }
757 )*
758 }
759 };
760}
761toggles! {
762 case_sensitive_like as 0b0000001;
763 ignore_check_constraint as 0b0000010;
764 legacy_alter_table as 0b0000100;
765 recursive_triggers as 0b0001000;
766 unordered_selects as 0b0010000;
767 foreign_keys as 0b0100000;
768 defer_foreign_keys as 0b1000000;
769}