1use sqlx_core::{Url, Either};
2
3#[cfg(target_arch = "wasm32")]
4use {
5 crate::{error::D1Error, row::D1Row},
6 std::pin::Pin,
7 worker::{wasm_bindgen::JsValue, wasm_bindgen_futures::JsFuture, js_sys},
8};
9
10#[cfg(not(target_arch = "wasm32"))]
11macro_rules! unreachable_native_impl_of_item_for_only_wasm32 {
12 ($item_for_only_wasm32:literal) => {
13 panic!(
14 "native `{}`: Invalid use of `sqlx_d1`. Be sure to use `sqlx_d1` where the target is set to \
15 `wasm32-unknown-unknown` ! \n\
16 For this, typcally, place `.cargo/config.toml` of following content at the root of \
17 your project or workspace : \n\
18 \n\
19 [build]\n\
20 target = \"wasm32-unknown-unknown\"\n",
21 $item_for_only_wasm32
22 )
23 };
24}
25
26pub struct D1Connection {
81 #[cfg(target_arch = "wasm32")]
82 pub(crate) inner: worker_sys::D1Database,
83
84 #[cfg(not(target_arch = "wasm32"))]
85 pub(crate) inner: sqlx_sqlite::SqliteConnection,
86}
87
88const _: () = {
89 unsafe impl Send for D1Connection {}
91 unsafe impl Sync for D1Connection {}
92
93 impl D1Connection {
94 pub fn new(d1: worker::D1Database) -> Self {
95 #[cfg(target_arch = "wasm32")] {
96 Self { inner: unsafe {std::mem::transmute(d1)} }
97 }
98 #[cfg(not(target_arch = "wasm32"))] {
99 let _ = d1;
100 unreachable_native_impl_of_item_for_only_wasm32!("D1Cnnection::new");
101 }
102 }
103
104 #[cfg(not(target_arch = "wasm32"))]
105 pub async fn connect(url: impl AsRef<str>) -> Result<Self, sqlx_core::Error> {
106 <Self as sqlx_core::connection::Connection>::connect(url.as_ref()).await
107 }
108 }
109
110 impl Clone for D1Connection {
111 fn clone(&self) -> Self {
112 #[cfg(target_arch = "wasm32")] {
113 Self { inner: self.inner.clone() }
114 }
115 #[cfg(not(target_arch = "wasm32"))] {
116 unreachable_native_impl_of_item_for_only_wasm32!("impl Clone for D1Connection");
117 }
118 }
119 }
120
121 impl std::fmt::Debug for D1Connection {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 f.debug_struct("D1Connection").finish()
124 }
125 }
126
127 impl sqlx_core::connection::Connection for D1Connection {
128 type Database = crate::D1;
129
130 type Options = D1ConnectOptions;
131
132 fn close(self) -> crate::ResultFuture<'static, ()> {
133 Box::pin(async {Ok(())})
134 }
135
136 fn close_hard(self) -> crate::ResultFuture<'static, ()> {
137 Box::pin(async {Ok(())})
138 }
139
140 fn ping(&mut self) -> crate::ResultFuture<'_, ()> {
141 Box::pin(async {Ok(())})
142 }
143
144 fn begin(&mut self) -> crate::ResultFuture<'_, sqlx_core::transaction::Transaction<'_, Self::Database>>
145 where
146 Self: Sized,
147 {
148 sqlx_core::transaction::Transaction::begin(self)
149 }
150
151 fn shrink_buffers(&mut self) {
152 }
154
155 fn flush(&mut self) -> crate::ResultFuture<'_, ()> {
156 Box::pin(async {Ok(())})
157 }
158
159 fn should_flush(&self) -> bool {
160 false
161 }
162 }
163
164 impl<'c> sqlx_core::executor::Executor<'c> for &'c mut D1Connection {
165 type Database = crate::D1;
166
167 fn fetch_many<'e, 'q: 'e, E>(
168 self,
169 #[allow(unused)]
170 mut query: E,
171 ) -> futures_core::stream::BoxStream<
172 'e,
173 Result<
174 Either<
175 <Self::Database as sqlx_core::database::Database>::QueryResult,
176 <Self::Database as sqlx_core::database::Database>::Row
177 >,
178 sqlx_core::Error,
179 >,
180 >
181 where
182 'c: 'e,
183 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
184 {
185 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_many(self, query)
186 }
187
188 fn fetch_optional<'e, 'q: 'e, E>(
189 self,
190 #[allow(unused)]
191 mut query: E,
192 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
193 where
194 'c: 'e,
195 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
196 {
197 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_optional(self, query)
198 }
199
200 fn prepare_with<'e, 'q: 'e>(
201 self,
202 sql: &'q str,
203 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
204 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
205 where
206 'c: 'e,
207 {
208 Box::pin(async {
209 Ok(crate::statement::D1Statement {
210 sql: std::borrow::Cow::Borrowed(sql),
211 })
212 })
213 }
214
215 fn describe<'e, 'q: 'e>(
216 self,
217 #[allow(unused)]
218 sql: &'q str,
219 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
220 where
221 'c: 'e,
222 {
223 #[cfg(target_arch = "wasm32")] {
224 unreachable!("wasm32 describe")
225 }
226 #[cfg(not(target_arch = "wasm32"))] {
227 Box::pin(async {
230 let sqlx_core::describe::Describe {
231 columns,
232 parameters,
233 nullable
234 } = <&mut sqlx_sqlite::SqliteConnection as sqlx_core::executor::Executor>::describe(
235 &mut self.inner,
236 sql
237 ).await?;
238
239 Ok(sqlx_core::describe::Describe {
240 parameters: parameters.map(|ps| match ps {
241 Either::Left(type_infos) => Either::Left(type_infos.into_iter().map(crate::type_info::D1TypeInfo::from_sqlite).collect()),
242 Either::Right(n) => Either::Right(n)
243 }),
244 columns: columns.into_iter().map(crate::column::D1Column::from_sqlite).collect(),
245 nullable
246 })
247 })
248 }
249 }
250 }
251
252 impl<'c> sqlx_core::executor::Executor<'c> for &'c D1Connection {
253 type Database = crate::D1;
254
255 fn fetch_many<'e, 'q: 'e, E>(
256 self,
257 #[allow(unused)]
258 mut query: E,
259 ) -> futures_core::stream::BoxStream<
260 'e,
261 Result<
262 Either<
263 <Self::Database as sqlx_core::database::Database>::QueryResult,
264 <Self::Database as sqlx_core::database::Database>::Row
265 >,
266 sqlx_core::Error,
267 >,
268 >
269 where
270 'c: 'e,
271 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
272 {
273 #[cfg(not(target_arch = "wasm32"))] {
274 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
275 }
276 #[cfg(target_arch = "wasm32")] {
277 let sql = query.sql();
278 let arguments = match query.take_arguments() {
279 Ok(a) => a,
280 Err(e) => return Box::pin(futures_util::stream::once(async {Err(sqlx_core::Error::Encode(e))})),
281 };
282
283 struct FetchMany<F> {
284 raw_rows_future: F,
285 raw_rows: Option<js_sys::ArrayIntoIter>,
286 }
287 const _: () = {
288 unsafe impl<F> Send for FetchMany<F> {}
290
291 impl<F> FetchMany<F> {
292 fn new(raw_rows_future: F) -> Self {
293 Self { raw_rows_future, raw_rows: None }
294 }
295 }
296
297 impl<F> futures_core::Stream for FetchMany<F>
298 where
299 F: Future<Output = Result<Option<js_sys::Array>, JsValue>>,
300 {
301 type Item = Result<
302 Either<crate::query_result::D1QueryResult, D1Row>,
303 sqlx_core::Error
304 >;
305
306 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
307 use std::task::Poll;
308
309 fn pop_next(raw_rows: &mut js_sys::ArrayIntoIter) ->
310 Option<Result<
311 Either<crate::query_result::D1QueryResult, D1Row>,
312 sqlx_core::Error
313 >>
314 {
315 let raw_row = raw_rows.next()?;
316 Some(D1Row::from_raw(raw_row).map(Either::Right))
317 }
318
319 let this = unsafe {self.get_unchecked_mut()};
320 match &mut this.raw_rows {
321 Some(raw_rows) => Poll::Ready(pop_next(raw_rows)),
322 None => match unsafe {Pin::new_unchecked(&mut this.raw_rows_future)}.poll(cx) {
323 Poll::Pending => Poll::Pending,
324 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(
325 sqlx_core::Error::from(D1Error::from(e))
326 ))),
327 Poll::Ready(Ok(maybe_raw_rows)) => {
328 this.raw_rows = Some(maybe_raw_rows.unwrap_or_else(js_sys::Array::new).into_iter());
329 Poll::Ready(pop_next(unsafe {this.raw_rows.as_mut().unwrap_unchecked()}))
330 }
331 }
332 }
333 }
334 }
335 };
336
337 Box::pin(FetchMany::new(async move {
338 let mut statement = self.inner.prepare(sql).unwrap();
339 if let Some(a) = arguments {
340 statement = statement.bind(a.as_ref().iter().collect())?;
341 }
342
343 let d1_result_jsvalue = JsFuture::from(statement.all()?)
344 .await?;
345 worker_sys::D1Result::from(d1_result_jsvalue)
346 .results()
347 }))
348 }
349 }
350
351 fn fetch_optional<'e, 'q: 'e, E>(
352 self,
353 #[allow(unused)]
354 mut query: E,
355 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
356 where
357 'c: 'e,
358 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
359 {
360 #[cfg(not(target_arch = "wasm32"))] {
361 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
362 }
363 #[cfg(target_arch = "wasm32")] {
364 let sql = query.sql();
365 let arguments = match query.take_arguments() {
366 Ok(a) => a,
367 Err(e) => return Box::pin(async {Err(sqlx_core::Error::Encode(e))}),
368 };
369
370 Box::pin(worker::send::SendFuture::new(async move {
371 let mut statement = self.inner.prepare(sql).unwrap();
372 if let Some(a) = arguments {
373 statement = statement
374 .bind(a.as_ref().iter().collect())
375 .map_err(|e| sqlx_core::Error::Encode(Box::new(D1Error::from(e))))?;
376 }
377
378 let raw = JsFuture::from(statement.first(None).map_err(D1Error::from)?)
379 .await
380 .map_err(D1Error::from)?;
381 if raw.is_null() {
382 Ok(None)
383 } else {
384 D1Row::from_raw(raw).map(Some)
385 }
386 }))
387 }
388 }
389
390 fn prepare_with<'e, 'q: 'e>(
391 self,
392 sql: &'q str,
393 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
394 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
395 where
396 'c: 'e,
397 {
398 Box::pin(async {
399 Ok(crate::statement::D1Statement {
400 sql: std::borrow::Cow::Borrowed(sql),
401 })
402 })
403 }
404
405 fn describe<'e, 'q: 'e>(
406 self,
407 #[allow(unused)]
408 sql: &'q str,
409 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
410 where
411 'c: 'e,
412 {
413 #[cfg(not(target_arch = "wasm32"))] {
414 unreachable_native_impl_of_item_for_only_wasm32!("impl Executor for &D1Conection");
415 }
416 #[cfg(target_arch = "wasm32")] {
417 unreachable!("wasm32 describe")
418 }
419 }
420 }
421};
422
423#[derive(Clone)]
425pub struct D1ConnectOptions {
426 pragmas: TogglePragmas,
427 #[cfg(target_arch = "wasm32")]
428 d1: worker_sys::D1Database,
429 #[cfg(not(target_arch = "wasm32"))]
430 sqlite_path: std::path::PathBuf,
431}
432const _: () = {
433 unsafe impl Send for D1ConnectOptions {}
435 unsafe impl Sync for D1ConnectOptions {}
436
437 #[cfg(target_arch = "wasm32")]
438 const URL_CONVERSION_UNSUPPORTED_MESSAGE: &'static str = "\
439 `sqlx_d1::D1ConnectOptions` doesn't support conversion between `Url`. \
440 Consider connect from options created by `D1ConnectOptions::new`. \
441 ";
442
443 const LOG_SETTINGS_UNSUPPORTED_MESSAGE: &'static str = "\
444 `sqlx_d1::D1ConnectOptions` doesn't support log settings.
445 ";
446
447 impl D1ConnectOptions {
448 pub fn new(
449 #[allow(unused)]
450 d1: worker::D1Database,
451 ) -> Self {
452 #[cfg(target_arch = "wasm32")] {
453 Self {
454 d1: unsafe {core::mem::transmute(d1)},
455 pragmas: TogglePragmas::new(),
456 }
457 }
458 #[cfg(not(target_arch = "wasm32"))] {
459 unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::new");
460 }
461 }
462
463 pub async fn connect(self) -> Result<D1Connection, crate::error::D1Error> {
464 #[cfg(target_arch = "wasm32")] {
465 let Self { d1, pragmas } = self;
466 if let Some(pragmas) = pragmas.collect() {
467 JsFuture::from(d1.exec(&pragmas.join("\n")).map_err(D1Error::from)?)
468 .await
469 .map_err(D1Error::from)?;
470 }
471 Ok(D1Connection { inner: d1 })
472 }
473 #[cfg(not(target_arch = "wasm32"))] {
474 unreachable_native_impl_of_item_for_only_wasm32!("D1ConnectOptions::connect");
475 }
476 }
477 }
478
479 impl std::fmt::Debug for D1ConnectOptions {
480 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481 f.debug_struct("D1ConnectOptions")
482 .field("pragmas", &self.pragmas)
483 .finish()
484 }
485 }
486
487 impl std::str::FromStr for D1ConnectOptions {
488 type Err = sqlx_core::Error;
489
490 fn from_str(_: &str) -> Result<Self, Self::Err> {
491 #[cfg(target_arch = "wasm32")] {
492 Err(sqlx_core::Error::Configuration(From::from(
493 URL_CONVERSION_UNSUPPORTED_MESSAGE
494 )))
495 }
496
497 #[cfg(not(target_arch = "wasm32"))] {
498 use std::{io, fs, path::{Path, PathBuf}};
499
500 fn maybe_miniflare_d1_dir_of(dir: impl AsRef<Path>) -> PathBuf {
501 dir.as_ref()
502 .join(".wrangler")
503 .join("state")
504 .join("v3")
505 .join("d1")
506 .join("miniflare-D1DatabaseObject")
507 }
508
509 const PACKAGE_ROOT: &str = env!("CARGO_MANIFEST_DIR");
510
511 let (candidate_1, candidate_2) = (
512 maybe_miniflare_d1_dir_of(PACKAGE_ROOT),
513 maybe_miniflare_d1_dir_of(".")
514 );
515
516 let sqlite_path = (|| -> io::Result<PathBuf> {
517 let miniflare_d1_dir = match (
518 fs::exists(&candidate_1),
519 fs::exists(&candidate_2)
520 ) {
521 (Ok(true), _) => candidate_1,
522 (_, Ok(true)) => candidate_2,
523 (Err(e), _) | (_, Err(e)) => return Err(e),
524 (Ok(false), Ok(false)) => return Err(io::Error::new(
525 io::ErrorKind::NotFound,
526 "miniflare's D1 emulating directory not found"
527 )),
528 };
529
530 let [sqlite_path] = fs::read_dir(miniflare_d1_dir)?
531 .filter_map(|r| r.as_ref().ok().and_then(|e| {
532 let path = e.path();
533 path.extension()
534 .is_some_and(|ex| ex == "sqlite")
535 .then_some(path)
536 }))
537 .collect::<Vec<_>>()
538 .try_into()
539 .map_err(|_| io::Error::new(
540 io::ErrorKind::Other,
541 "Currently, sqlx_d1 doesn't support multiple D1 bindings!"
542 ))?;
543
544 Ok(sqlite_path)
545 })().map_err(|_| sqlx_core::Error::WorkerCrashed)?;
546
547 Ok(Self {
548 pragmas: TogglePragmas::new(),
549 sqlite_path
550 })
551 }
552 }
553 }
554
555 impl sqlx_core::connection::ConnectOptions for D1ConnectOptions {
556 type Connection = D1Connection;
557
558 fn from_url(_url: &Url) -> Result<Self, sqlx_core::Error> {
559 #[cfg(target_arch = "wasm32")] {
560 Err(sqlx_core::Error::Configuration(From::from(
561 URL_CONVERSION_UNSUPPORTED_MESSAGE
562 )))
563 }
564 #[cfg(not(target_arch = "wasm32"))] {
565 _url.as_str().parse()
566 }
567 }
568
569 fn to_url_lossy(&self) -> Url {
570 unreachable!("`sqlx_d1::ConnectOptions` doesn't support `ConnectOptions::to_url_lossy`")
571 }
572
573 fn connect(&self) -> crate::ResultFuture<'_, Self::Connection>
574 where
575 Self::Connection: Sized,
576 {
577 #[cfg(target_arch = "wasm32")] {
578 Box::pin(worker::send::SendFuture::new(async move {
579 <Self>::connect(self.clone()).await
580 .map_err(|e| sqlx_core::Error::Database(Box::new(e)))
581 }))
582 }
583
584 #[cfg(not(target_arch = "wasm32"))] {
585 Box::pin(async move {
586 use sqlx_core::{connection::Connection, executor::Executor};
587
588 let mut sqlite_conn = sqlx_sqlite::SqliteConnection::connect(
589 self.sqlite_path.to_str().ok_or(sqlx_core::Error::WorkerCrashed)?
590 ).await?;
591
592 if let Some(pragmas) = self.pragmas.collect() {
593 for pragma in pragmas {
594 sqlite_conn.execute(pragma).await?;
595 }
596 }
597
598 Ok(D1Connection { inner: sqlite_conn })
599 })
600 }
601 }
602
603 fn log_statements(self, _: log::LevelFilter) -> Self {
604 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
605 }
606
607 fn log_slow_statements(self, _: log::LevelFilter, _: std::time::Duration) -> Self {
608 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
609 }
610 }
611};
612
613#[derive(Clone, Copy)]
615struct TogglePragmas(u8);
616const _: () = {
617 impl std::ops::Not for TogglePragmas {
618 type Output = Self;
619 fn not(self) -> Self::Output {
620 Self(!self.0)
621 }
622 }
623 impl std::ops::BitOrAssign for TogglePragmas {
624 fn bitor_assign(&mut self, rhs: Self) {
625 self.0 |= self.0 | rhs.0;
626 }
627 }
628 impl std::ops::BitAndAssign for TogglePragmas {
629 fn bitand_assign(&mut self, rhs: Self) {
630 self.0 &= self.0 & rhs.0;
631 }
632 }
633
634 impl TogglePragmas {
635 const fn new() -> Self {
636 Self(0)
637 }
638 }
639};
640
641macro_rules! toggles {
642 ($( $name:ident as $bits:literal; )*) => {
643 impl TogglePragmas {
644 $(
645 #[allow(non_upper_case_globals)]
646 const $name: Self = Self($bits);
647 )*
648
649 fn collect(&self) -> Option<Vec<&'static str>> {
650 #[allow(unused_mut)]
651 let mut pragmas = Vec::new();
652 $(
653 if self.0 & Self::$name.0 != 0 {
654 pragmas.push(concat!(
655 "PRAGMA ",
656 stringify!($name),
657 " = on"
658 ));
659 }
660 )*
661 (!pragmas.is_empty()).then_some(pragmas)
662 }
663 }
664
665 impl std::fmt::Debug for TogglePragmas {
666 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
667 let mut f = &mut f.debug_map();
668 $(
669 f = f.entry(
670 &stringify!($name),
671 &if self.0 & Self::$name.0 != 0 {"on"} else {"off"}
672 );
673 )*
674 f.finish()
675 }
676 }
677
678 impl D1ConnectOptions {
679 $(
680 pub fn $name(mut self, yes: bool) -> Self {
681 if yes {
682 self.pragmas |= TogglePragmas::$name;
683 } else {
684 self.pragmas &= !TogglePragmas::$name;
685 }
686 self
687 }
688 )*
689 }
690 };
691}
692toggles! {
693 case_sensitive_like as 0b0000001;
694 ignore_check_constraint as 0b0000010;
695 legacy_alter_table as 0b0000100;
696 recursive_triggers as 0b0001000;
697 unordered_selects as 0b0010000;
698 foreign_keys as 0b0100000;
699 defer_foreign_keys as 0b1000000;
700}