1use sqlx_core::{Url, Either};
2
3#[cfg(target_arch = "wasm32")]
4use {
5 crate::{error::D1Error, row::D1Row},
6 std::pin::Pin,
7 worker::{wasm_bindgen::JsValue, wasm_bindgen_futures::JsFuture, js_sys},
8};
9
10pub struct D1Connection {
11 #[cfg(target_arch = "wasm32")]
12 pub(crate) inner: worker_sys::D1Database,
13
14 #[cfg(not(target_arch = "wasm32"))]
15 pub(crate) inner: sqlx_sqlite::SqliteConnection,
16}
17
18const _: () = {
    // SAFETY: NOTE(review): nothing visible in this file proves these impls sound.
    // On `wasm32` the wrapped `worker_sys::D1Database` is presumably a JS-backed
    // handle, so this likely relies on the runtime being single-threaded — confirm.
    unsafe impl Send for D1Connection {}
    unsafe impl Sync for D1Connection {}
22
23 impl D1Connection {
24 #[cfg(target_arch = "wasm32")]
25 pub fn new(d1: worker::D1Database) -> Self {
26 Self { inner: unsafe {std::mem::transmute(d1)} }
27 }
28
29 #[cfg(not(target_arch = "wasm32"))]
30 pub async fn connect(url: impl AsRef<str>) -> Result<Self, sqlx_core::Error> {
31 <Self as sqlx_core::connection::Connection>::connect(url.as_ref()).await
32 }
33 }
34
35 #[cfg(target_arch = "wasm32")]
36 impl Clone for D1Connection {
37 fn clone(&self) -> Self {
38 Self { inner: self.inner.clone() }
39 }
40 }
41
42 impl std::fmt::Debug for D1Connection {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("D1Connection").finish()
45 }
46 }
47
48 impl sqlx_core::connection::Connection for D1Connection {
49 type Database = crate::D1;
50
51 type Options = D1ConnectOptions;
52
53 fn close(self) -> crate::ResultFuture<'static, ()> {
54 Box::pin(async {Ok(())})
55 }
56
57 fn close_hard(self) -> crate::ResultFuture<'static, ()> {
58 Box::pin(async {Ok(())})
59 }
60
61 fn ping(&mut self) -> crate::ResultFuture<'_, ()> {
62 Box::pin(async {Ok(())})
63 }
64
65 fn begin(&mut self) -> crate::ResultFuture<'_, sqlx_core::transaction::Transaction<'_, Self::Database>>
66 where
67 Self: Sized,
68 {
69 sqlx_core::transaction::Transaction::begin(self)
70 }
71
72 fn shrink_buffers(&mut self) {
73 }
75
76 fn flush(&mut self) -> crate::ResultFuture<'_, ()> {
77 Box::pin(async {Ok(())})
78 }
79
80 fn should_flush(&self) -> bool {
81 false
82 }
83 }
84
85 impl<'c> sqlx_core::executor::Executor<'c> for &'c mut D1Connection {
86 type Database = crate::D1;
87
88 fn fetch_many<'e, 'q: 'e, E>(
89 self,
90 #[allow(unused)]
91 mut query: E,
92 ) -> futures_core::stream::BoxStream<
93 'e,
94 Result<
95 Either<
96 <Self::Database as sqlx_core::database::Database>::QueryResult,
97 <Self::Database as sqlx_core::database::Database>::Row
98 >,
99 sqlx_core::Error,
100 >,
101 >
102 where
103 'c: 'e,
104 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
105 {
106 #[cfg(not(target_arch = "wasm32"))] {
107 unreachable!("native `Executor` impl")
108 }
109 #[cfg(target_arch = "wasm32")] {
110 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_many(self, query)
111 }
112 }
113
114 fn fetch_optional<'e, 'q: 'e, E>(
115 self,
116 #[allow(unused)]
117 mut query: E,
118 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
119 where
120 'c: 'e,
121 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
122 {
123 #[cfg(not(target_arch = "wasm32"))] {
124 unreachable!("native `Executor` impl")
125 }
126 #[cfg(target_arch = "wasm32")] {
127 <&'c D1Connection as sqlx_core::executor::Executor<'c>>::fetch_optional(self, query)
128 }
129 }
130
131 fn prepare_with<'e, 'q: 'e>(
132 self,
133 sql: &'q str,
134 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
135 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
136 where
137 'c: 'e,
138 {
139 Box::pin(async {
140 Ok(crate::statement::D1Statement {
141 sql: std::borrow::Cow::Borrowed(sql),
142 })
143 })
144 }
145
146 fn describe<'e, 'q: 'e>(
147 self,
148 #[allow(unused)]
149 sql: &'q str,
150 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
151 where
152 'c: 'e,
153 {
154 #[cfg(target_arch = "wasm32")] {
155 unreachable!("wasm32 describe")
156 }
157 #[cfg(not(target_arch = "wasm32"))] {
158 Box::pin(async {
161 let sqlx_core::describe::Describe {
162 columns,
163 parameters,
164 nullable
165 } = <&mut sqlx_sqlite::SqliteConnection as sqlx_core::executor::Executor>::describe(
166 &mut self.inner,
167 sql
168 ).await?;
169
170 Ok(sqlx_core::describe::Describe {
171 parameters: parameters.map(|ps| match ps {
172 Either::Left(type_infos) => Either::Left(type_infos.into_iter().map(crate::type_info::D1TypeInfo::from_sqlite).collect()),
173 Either::Right(n) => Either::Right(n)
174 }),
175 columns: columns.into_iter().map(crate::column::D1Column::from_sqlite).collect(),
176 nullable
177 })
178 })
179 }
180 }
181 }
182
183 #[cfg(target_arch = "wasm32")]
184 impl<'c> sqlx_core::executor::Executor<'c> for &'c D1Connection {
185 type Database = crate::D1;
186
187 fn fetch_many<'e, 'q: 'e, E>(
188 self,
189 #[allow(unused)]
190 mut query: E,
191 ) -> futures_core::stream::BoxStream<
192 'e,
193 Result<
194 Either<
195 <Self::Database as sqlx_core::database::Database>::QueryResult,
196 <Self::Database as sqlx_core::database::Database>::Row
197 >,
198 sqlx_core::Error,
199 >,
200 >
201 where
202 'c: 'e,
203 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
204 {
205 let sql = query.sql();
206 let arguments = match query.take_arguments() {
207 Ok(a) => a,
208 Err(e) => return Box::pin(futures_util::stream::once(async {Err(sqlx_core::Error::Encode(e))})),
209 };
210
211 struct FetchMany<F> {
212 raw_rows_future: F,
213 raw_rows: Option<js_sys::ArrayIntoIter>,
214 }
215 const _: () = {
216 unsafe impl<F> Send for FetchMany<F> {}
218
219 impl<F> FetchMany<F> {
220 fn new(raw_rows_future: F) -> Self {
221 Self { raw_rows_future, raw_rows: None }
222 }
223 }
224
225 impl<F> futures_core::Stream for FetchMany<F>
226 where
227 F: Future<Output = Result<Option<js_sys::Array>, JsValue>>,
228 {
229 type Item = Result<
230 Either<crate::query_result::D1QueryResult, D1Row>,
231 sqlx_core::Error
232 >;
233
234 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
235 use std::task::Poll;
236
237 fn pop_next(raw_rows: &mut js_sys::ArrayIntoIter) ->
238 Option<Result<
239 Either<crate::query_result::D1QueryResult, D1Row>,
240 sqlx_core::Error
241 >>
242 {
243 let raw_row = raw_rows.next()?;
244 Some(D1Row::from_raw(raw_row).map(Either::Right))
245 }
246
247 let this = unsafe {self.get_unchecked_mut()};
248 match &mut this.raw_rows {
249 Some(raw_rows) => Poll::Ready(pop_next(raw_rows)),
250 None => match unsafe {Pin::new_unchecked(&mut this.raw_rows_future)}.poll(cx) {
251 Poll::Pending => Poll::Pending,
252 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(
253 sqlx_core::Error::from(D1Error::from(e))
254 ))),
255 Poll::Ready(Ok(maybe_raw_rows)) => {
256 this.raw_rows = Some(maybe_raw_rows.unwrap_or_else(js_sys::Array::new).into_iter());
257 Poll::Ready(pop_next(unsafe {this.raw_rows.as_mut().unwrap_unchecked()}))
258 }
259 }
260 }
261 }
262 }
263 };
264
265 Box::pin(FetchMany::new(async move {
266 let mut statement = self.inner.prepare(sql).unwrap();
267 if let Some(a) = arguments {
268 statement = statement.bind(a.as_ref().iter().collect())?;
269 }
270
271 let d1_result_jsvalue = JsFuture::from(statement.all()?)
272 .await?;
273 worker_sys::D1Result::from(d1_result_jsvalue)
274 .results()
275 }))
276 }
277
278 fn fetch_optional<'e, 'q: 'e, E>(
279 self,
280 #[allow(unused)]
281 mut query: E,
282 ) -> crate::ResultFuture<'e, Option<<Self::Database as sqlx_core::database::Database>::Row>>
283 where
284 'c: 'e,
285 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
286 {
287 let sql = query.sql();
288 let arguments = match query.take_arguments() {
289 Ok(a) => a,
290 Err(e) => return Box::pin(async {Err(sqlx_core::Error::Encode(e))}),
291 };
292
293 Box::pin(worker::send::SendFuture::new(async move {
294 let mut statement = self.inner.prepare(sql).unwrap();
295 if let Some(a) = arguments {
296 statement = statement
297 .bind(a.as_ref().iter().collect())
298 .map_err(|e| sqlx_core::Error::Encode(Box::new(D1Error::from(e))))?;
299 }
300
301 let raw = JsFuture::from(statement.first(None).map_err(D1Error::from)?)
302 .await
303 .map_err(D1Error::from)?;
304 if raw.is_null() {
305 Ok(None)
306 } else {
307 D1Row::from_raw(raw).map(Some)
308 }
309 }))
310 }
311
312 fn prepare_with<'e, 'q: 'e>(
313 self,
314 sql: &'q str,
315 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
316 ) -> crate::ResultFuture<'e, <Self::Database as sqlx_core::database::Database>::Statement<'q>>
317 where
318 'c: 'e,
319 {
320 Box::pin(async {
321 Ok(crate::statement::D1Statement {
322 sql: std::borrow::Cow::Borrowed(sql),
323 })
324 })
325 }
326
327 fn describe<'e, 'q: 'e>(
328 self,
329 #[allow(unused)]
330 sql: &'q str,
331 ) -> crate::ResultFuture<'e, sqlx_core::describe::Describe<Self::Database>>
332 where
333 'c: 'e,
334 {
335 unreachable!("wasm32 describe")
336 }
337 }
338};
339
340#[derive(Clone)]
342pub struct D1ConnectOptions {
343 pragmas: TogglePragmas,
344 #[cfg(target_arch = "wasm32")]
345 d1: worker_sys::D1Database,
346 #[cfg(not(target_arch = "wasm32"))]
347 sqlite_path: std::path::PathBuf,
348}
349const _: () = {
    // SAFETY: NOTE(review): nothing visible in this file proves these impls sound.
    // On `wasm32` the options hold a `worker_sys::D1Database`, presumably a JS-backed
    // handle, so this likely relies on the runtime being single-threaded — confirm.
    unsafe impl Send for D1ConnectOptions {}
    unsafe impl Sync for D1ConnectOptions {}
353
354 #[cfg(target_arch = "wasm32")]
355 const URL_CONVERSION_UNSUPPORTED_MESSAGE: &'static str = "\
356 `sqlx_d1::D1ConnectOptions` doesn't support conversion between `Url`. \
357 Consider connect from options created by `D1ConnectOptions::new`. \
358 ";
359
360 const LOG_SETTINGS_UNSUPPORTED_MESSAGE: &'static str = "\
361 `sqlx_d1::D1ConnectOptions` doesn't support log settings.
362 ";
363
364 impl D1ConnectOptions {
365 #[cfg(target_arch = "wasm32")]
366 pub fn new(d1: worker::D1Database) -> Self {
367 Self {
368 d1: unsafe {core::mem::transmute(d1)},
369 pragmas: TogglePragmas::new(),
370 }
371 }
372 }
373
374 impl std::fmt::Debug for D1ConnectOptions {
375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 f.debug_struct("D1ConnectOptions")
377 .field("pragmas", &self.pragmas)
378 .finish()
379 }
380 }
381
382 impl std::str::FromStr for D1ConnectOptions {
383 type Err = sqlx_core::Error;
384
385 fn from_str(_: &str) -> Result<Self, Self::Err> {
386 #[cfg(target_arch = "wasm32")] {
387 Err(sqlx_core::Error::Configuration(From::from(
388 URL_CONVERSION_UNSUPPORTED_MESSAGE
389 )))
390 }
391
392 #[cfg(not(target_arch = "wasm32"))] {
393 use std::{io, fs, path::{Path, PathBuf}};
394
395 fn maybe_miniflare_d1_dir_of(dir: impl AsRef<Path>) -> PathBuf {
396 dir.as_ref()
397 .join(".wrangler")
398 .join("state")
399 .join("v3")
400 .join("d1")
401 .join("miniflare-D1DatabaseObject")
402 }
403
404 const PACKAGE_ROOT: &str = env!("CARGO_MANIFEST_DIR");
405
406 let (candidate_1, candidate_2) = (
407 maybe_miniflare_d1_dir_of(PACKAGE_ROOT),
408 maybe_miniflare_d1_dir_of(".")
409 );
410
411 let sqlite_path = (|| -> io::Result<PathBuf> {
412 let miniflare_d1_dir = match (
413 fs::exists(&candidate_1),
414 fs::exists(&candidate_2)
415 ) {
416 (Ok(true), _) => candidate_1,
417 (_, Ok(true)) => candidate_2,
418 (Err(e), _) | (_, Err(e)) => return Err(e),
419 (Ok(false), Ok(false)) => return Err(io::Error::new(
420 io::ErrorKind::NotFound,
421 "miniflare's D1 emulating directory not found"
422 )),
423 };
424
425 let [sqlite_path] = fs::read_dir(miniflare_d1_dir)?
426 .filter_map(|r| r.as_ref().ok().and_then(|e| {
427 let path = e.path();
428 path.extension()
429 .is_some_and(|ex| ex == "sqlite")
430 .then_some(path)
431 }))
432 .collect::<Vec<_>>()
433 .try_into()
434 .map_err(|_| io::Error::new(
435 io::ErrorKind::Other,
436 "Currently, sqlx_d1 doesn't support multiple D1 bindings!"
437 ))?;
438
439 Ok(sqlite_path)
440 })().map_err(|_| sqlx_core::Error::WorkerCrashed)?;
441
442 Ok(Self {
443 pragmas: TogglePragmas::new(),
444 sqlite_path
445 })
446 }
447 }
448 }
449
450 impl sqlx_core::connection::ConnectOptions for D1ConnectOptions {
451 type Connection = D1Connection;
452
453 fn from_url(_url: &Url) -> Result<Self, sqlx_core::Error> {
454 #[cfg(target_arch = "wasm32")] {
455 Err(sqlx_core::Error::Configuration(From::from(
456 URL_CONVERSION_UNSUPPORTED_MESSAGE
457 )))
458 }
459 #[cfg(not(target_arch = "wasm32"))] {
460 _url.as_str().parse()
461 }
462 }
463
464 fn to_url_lossy(&self) -> Url {
465 unreachable!("`sqlx_d1::ConnectOptions` doesn't support `ConnectOptions::to_url_lossy`")
466 }
467
468 fn connect(&self) -> crate::ResultFuture<'_, Self::Connection>
469 where
470 Self::Connection: Sized,
471 {
472 #[cfg(target_arch = "wasm32")] {
473 Box::pin(worker::send::SendFuture::new(async move {
474 let d1 = self.d1.clone();
475
476 if let Some(pragmas) = self.pragmas.collect() {
477 JsFuture::from(d1.exec(&pragmas.join("\n")).map_err(D1Error::from)?)
478 .await
479 .map_err(D1Error::from)?;
480 }
481
482 Ok(D1Connection {
483 inner: d1
484 })
485 }))
486 }
487
488 #[cfg(not(target_arch = "wasm32"))] {
489 Box::pin(async move {
490 use sqlx_core::{connection::Connection, executor::Executor};
491
492 let mut sqlite_conn = sqlx_sqlite::SqliteConnection::connect(
493 self.sqlite_path.to_str().ok_or(sqlx_core::Error::WorkerCrashed)?
494 ).await?;
495
496 if let Some(pragmas) = self.pragmas.collect() {
497 for pragma in pragmas {
498 sqlite_conn.execute(pragma).await?;
499 }
500 }
501
502 Ok(D1Connection { inner: sqlite_conn })
503 })
504 }
505 }
506
507 fn log_statements(self, _: log::LevelFilter) -> Self {
508 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
509 }
510
511 fn log_slow_statements(self, _: log::LevelFilter, _: std::time::Duration) -> Self {
512 unreachable!("{LOG_SETTINGS_UNSUPPORTED_MESSAGE}")
513 }
514 }
515};
516
/// Bit set recording which on/off pragmas are enabled.
///
/// Each bit's meaning is assigned by the associated constants that the `toggles!`
/// macro generates for this type.
#[derive(Clone, Copy)]
struct TogglePragmas(u8);
const _: () = {
    /// Flips every bit; used to build the mask that clears one toggle.
    impl std::ops::Not for TogglePragmas {
        type Output = Self;
        fn not(self) -> Self::Output {
            Self(!self.0)
        }
    }
    /// Switches on every bit set in `rhs`.
    impl std::ops::BitOrAssign for TogglePragmas {
        fn bitor_assign(&mut self, rhs: Self) {
            self.0 |= rhs.0;
        }
    }
    /// Keeps only the bits that are also set in `rhs`.
    impl std::ops::BitAndAssign for TogglePragmas {
        fn bitand_assign(&mut self, rhs: Self) {
            self.0 &= rhs.0;
        }
    }

    impl TogglePragmas {
        /// Creates a set with no bit set.
        const fn new() -> Self {
            Self(0)
        }
    }
};
544
545macro_rules! toggles {
546 ($( $name:ident as $bits:literal; )*) => {
547 impl TogglePragmas {
548 $(
549 #[allow(non_upper_case_globals)]
550 const $name: Self = Self($bits);
551 )*
552
553 fn collect(&self) -> Option<Vec<&'static str>> {
554 #[allow(unused_mut)]
555 let mut pragmas = Vec::new();
556 $(
557 if self.0 & Self::$name.0 != 0 {
558 pragmas.push(concat!(
559 "PRAGMA ",
560 stringify!($name),
561 " = on"
562 ));
563 }
564 )*
565 (!pragmas.is_empty()).then_some(pragmas)
566 }
567 }
568
569 impl std::fmt::Debug for TogglePragmas {
570 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
571 let mut f = &mut f.debug_map();
572 $(
573 f = f.entry(
574 &stringify!($name),
575 &if self.0 & Self::$name.0 != 0 {"on"} else {"off"}
576 );
577 )*
578 f.finish()
579 }
580 }
581
582 impl D1ConnectOptions {
583 $(
584 pub fn $name(mut self, yes: bool) -> Self {
585 if yes {
586 self.pragmas |= TogglePragmas::$name;
587 } else {
588 self.pragmas &= !TogglePragmas::$name;
589 }
590 self
591 }
592 )*
593 }
594 };
595}
596toggles! {
597 case_sensitive_like as 0b0000001;
598 ignore_check_constraint as 0b0000010;
599 legacy_alter_table as 0b0000100;
600 recursive_triggers as 0b0001000;
601 unordered_selects as 0b0010000;
602 foreign_keys as 0b0100000;
603 defer_foreign_keys as 0b1000000;
604}