sqlx_sqlite/
any.rs

1use std::borrow::Cow;
2
3use crate::{
4    Either, Sqlite, SqliteArgumentValue, SqliteArguments, SqliteColumn, SqliteConnectOptions,
5    SqliteConnection, SqliteQueryResult, SqliteRow, SqliteTransactionManager, SqliteTypeInfo,
6};
7use futures_core::future::BoxFuture;
8use futures_core::stream::BoxStream;
9use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
10
11use sqlx_core::any::{
12    Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
13    AnyStatement, AnyTypeInfo, AnyTypeInfoKind, AnyValueKind,
14};
15
16use crate::type_info::DataType;
17use sqlx_core::connection::{ConnectOptions, Connection};
18use sqlx_core::database::Database;
19use sqlx_core::describe::Describe;
20use sqlx_core::executor::Executor;
21use sqlx_core::transaction::TransactionManager;
22use std::pin::pin;
23
24sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite);
25
26impl AnyConnectionBackend for SqliteConnection {
27    fn name(&self) -> &str {
28        <Sqlite as Database>::NAME
29    }
30
31    fn close(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
32        Connection::close(*self)
33    }
34
35    fn close_hard(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
36        Connection::close_hard(*self)
37    }
38
39    fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
40        Connection::ping(self)
41    }
42
43    fn begin(
44        &mut self,
45        statement: Option<Cow<'static, str>>,
46    ) -> BoxFuture<'_, sqlx_core::Result<()>> {
47        SqliteTransactionManager::begin(self, statement)
48    }
49
50    fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
51        SqliteTransactionManager::commit(self)
52    }
53
54    fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
55        SqliteTransactionManager::rollback(self)
56    }
57
58    fn start_rollback(&mut self) {
59        SqliteTransactionManager::start_rollback(self)
60    }
61
62    fn get_transaction_depth(&self) -> usize {
63        SqliteTransactionManager::get_transaction_depth(self)
64    }
65
66    fn shrink_buffers(&mut self) {
67        // NO-OP.
68    }
69
70    fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
71        Connection::flush(self)
72    }
73
74    fn should_flush(&self) -> bool {
75        Connection::should_flush(self)
76    }
77
78    #[cfg(feature = "migrate")]
79    fn as_migrate(
80        &mut self,
81    ) -> sqlx_core::Result<&mut (dyn sqlx_core::migrate::Migrate + Send + 'static)> {
82        Ok(self)
83    }
84
85    fn fetch_many<'q>(
86        &'q mut self,
87        query: &'q str,
88        persistent: bool,
89        arguments: Option<AnyArguments<'q>>,
90    ) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
91        let persistent = persistent && arguments.is_some();
92        let args = arguments.map(map_arguments);
93
94        Box::pin(
95            self.worker
96                .execute(query, args, self.row_channel_size, persistent, None)
97                .map_ok(flume::Receiver::into_stream)
98                .try_flatten_stream()
99                .map(
100                    move |res: sqlx_core::Result<Either<SqliteQueryResult, SqliteRow>>| match res? {
101                        Either::Left(result) => Ok(Either::Left(map_result(result))),
102                        Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
103                    },
104                ),
105        )
106    }
107
108    fn fetch_optional<'q>(
109        &'q mut self,
110        query: &'q str,
111        persistent: bool,
112        arguments: Option<AnyArguments<'q>>,
113    ) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
114        let persistent = persistent && arguments.is_some();
115        let args = arguments.map(map_arguments);
116
117        Box::pin(async move {
118            let mut stream = pin!(
119                self.worker
120                    .execute(query, args, self.row_channel_size, persistent, Some(1))
121                    .map_ok(flume::Receiver::into_stream)
122                    .await?
123            );
124
125            if let Some(Either::Right(row)) = stream.try_next().await? {
126                return Ok(Some(AnyRow::try_from(&row)?));
127            }
128
129            Ok(None)
130        })
131    }
132
133    fn prepare_with<'c, 'q: 'c>(
134        &'c mut self,
135        sql: &'q str,
136        _parameters: &[AnyTypeInfo],
137    ) -> BoxFuture<'c, sqlx_core::Result<AnyStatement<'q>>> {
138        Box::pin(async move {
139            let statement = Executor::prepare_with(self, sql, &[]).await?;
140            AnyStatement::try_from_statement(sql, &statement, statement.column_names.clone())
141        })
142    }
143
144    fn describe<'q>(&'q mut self, sql: &'q str) -> BoxFuture<'q, sqlx_core::Result<Describe<Any>>> {
145        Box::pin(async move { Executor::describe(self, sql).await?.try_into_any() })
146    }
147}
148
149impl<'a> TryFrom<&'a SqliteTypeInfo> for AnyTypeInfo {
150    type Error = sqlx_core::Error;
151
152    fn try_from(sqlite_type: &'a SqliteTypeInfo) -> Result<Self, Self::Error> {
153        Ok(AnyTypeInfo {
154            kind: match &sqlite_type.0 {
155                DataType::Null => AnyTypeInfoKind::Null,
156                DataType::Int4 => AnyTypeInfoKind::Integer,
157                DataType::Integer => AnyTypeInfoKind::BigInt,
158                DataType::Float => AnyTypeInfoKind::Double,
159                DataType::Blob => AnyTypeInfoKind::Blob,
160                DataType::Text => AnyTypeInfoKind::Text,
161                _ => {
162                    return Err(sqlx_core::Error::AnyDriverError(
163                        format!("Any driver does not support the SQLite type {sqlite_type:?}")
164                            .into(),
165                    ))
166                }
167            },
168        })
169    }
170}
171
172impl<'a> TryFrom<&'a SqliteColumn> for AnyColumn {
173    type Error = sqlx_core::Error;
174
175    fn try_from(col: &'a SqliteColumn) -> Result<Self, Self::Error> {
176        let type_info =
177            AnyTypeInfo::try_from(&col.type_info).map_err(|e| sqlx_core::Error::ColumnDecode {
178                index: col.name.to_string(),
179                source: e.into(),
180            })?;
181
182        Ok(AnyColumn {
183            ordinal: col.ordinal,
184            name: col.name.clone(),
185            type_info,
186        })
187    }
188}
189
190impl<'a> TryFrom<&'a SqliteRow> for AnyRow {
191    type Error = sqlx_core::Error;
192
193    fn try_from(row: &'a SqliteRow) -> Result<Self, Self::Error> {
194        AnyRow::map_from(row, row.column_names.clone())
195    }
196}
197
198impl<'a> TryFrom<&'a AnyConnectOptions> for SqliteConnectOptions {
199    type Error = sqlx_core::Error;
200
201    fn try_from(opts: &'a AnyConnectOptions) -> Result<Self, Self::Error> {
202        let mut opts_out = SqliteConnectOptions::from_url(&opts.database_url)?;
203        opts_out.log_settings = opts.log_settings.clone();
204        Ok(opts_out)
205    }
206}
207
208/// Instead of `AnyArguments::convert_into()`, we can do a direct mapping and preserve the lifetime.
209fn map_arguments(args: AnyArguments<'_>) -> SqliteArguments<'_> {
210    SqliteArguments {
211        values: args
212            .values
213            .0
214            .into_iter()
215            .map(|val| match val {
216                AnyValueKind::Null(_) => SqliteArgumentValue::Null,
217                AnyValueKind::Bool(b) => SqliteArgumentValue::Int(b as i32),
218                AnyValueKind::SmallInt(i) => SqliteArgumentValue::Int(i as i32),
219                AnyValueKind::Integer(i) => SqliteArgumentValue::Int(i),
220                AnyValueKind::BigInt(i) => SqliteArgumentValue::Int64(i),
221                AnyValueKind::Real(r) => SqliteArgumentValue::Double(r as f64),
222                AnyValueKind::Double(d) => SqliteArgumentValue::Double(d),
223                AnyValueKind::Text(t) => SqliteArgumentValue::Text(t),
224                AnyValueKind::Blob(b) => SqliteArgumentValue::Blob(b),
225                // AnyValueKind is `#[non_exhaustive]` but we should have covered everything
226                _ => unreachable!("BUG: missing mapping for {val:?}"),
227            })
228            .collect(),
229    }
230}
231
232fn map_result(res: SqliteQueryResult) -> AnyQueryResult {
233    AnyQueryResult {
234        rows_affected: res.rows_affected(),
235        last_insert_id: None,
236    }
237}