1use std::borrow::Cow;
2
3use crate::{
4 Either, Sqlite, SqliteArgumentValue, SqliteArguments, SqliteColumn, SqliteConnectOptions,
5 SqliteConnection, SqliteQueryResult, SqliteRow, SqliteTransactionManager, SqliteTypeInfo,
6};
7use futures_core::future::BoxFuture;
8use futures_core::stream::BoxStream;
9use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
10
11use sqlx_core::any::{
12 Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
13 AnyStatement, AnyTypeInfo, AnyTypeInfoKind, AnyValueKind,
14};
15
16use crate::type_info::DataType;
17use sqlx_core::connection::{ConnectOptions, Connection};
18use sqlx_core::database::Database;
19use sqlx_core::describe::Describe;
20use sqlx_core::executor::Executor;
21use sqlx_core::transaction::TransactionManager;
22use std::pin::pin;
23
24sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite);
25
26impl AnyConnectionBackend for SqliteConnection {
27 fn name(&self) -> &str {
28 <Sqlite as Database>::NAME
29 }
30
31 fn close(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
32 Connection::close(*self)
33 }
34
35 fn close_hard(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
36 Connection::close_hard(*self)
37 }
38
39 fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
40 Connection::ping(self)
41 }
42
43 fn begin(
44 &mut self,
45 statement: Option<Cow<'static, str>>,
46 ) -> BoxFuture<'_, sqlx_core::Result<()>> {
47 SqliteTransactionManager::begin(self, statement)
48 }
49
50 fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
51 SqliteTransactionManager::commit(self)
52 }
53
54 fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
55 SqliteTransactionManager::rollback(self)
56 }
57
58 fn start_rollback(&mut self) {
59 SqliteTransactionManager::start_rollback(self)
60 }
61
62 fn get_transaction_depth(&self) -> usize {
63 SqliteTransactionManager::get_transaction_depth(self)
64 }
65
66 fn shrink_buffers(&mut self) {
67 }
69
70 fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
71 Connection::flush(self)
72 }
73
74 fn should_flush(&self) -> bool {
75 Connection::should_flush(self)
76 }
77
78 #[cfg(feature = "migrate")]
79 fn as_migrate(
80 &mut self,
81 ) -> sqlx_core::Result<&mut (dyn sqlx_core::migrate::Migrate + Send + 'static)> {
82 Ok(self)
83 }
84
85 fn fetch_many<'q>(
86 &'q mut self,
87 query: &'q str,
88 persistent: bool,
89 arguments: Option<AnyArguments<'q>>,
90 ) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
91 let persistent = persistent && arguments.is_some();
92 let args = arguments.map(map_arguments);
93
94 Box::pin(
95 self.worker
96 .execute(query, args, self.row_channel_size, persistent, None)
97 .map_ok(flume::Receiver::into_stream)
98 .try_flatten_stream()
99 .map(
100 move |res: sqlx_core::Result<Either<SqliteQueryResult, SqliteRow>>| match res? {
101 Either::Left(result) => Ok(Either::Left(map_result(result))),
102 Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
103 },
104 ),
105 )
106 }
107
108 fn fetch_optional<'q>(
109 &'q mut self,
110 query: &'q str,
111 persistent: bool,
112 arguments: Option<AnyArguments<'q>>,
113 ) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
114 let persistent = persistent && arguments.is_some();
115 let args = arguments.map(map_arguments);
116
117 Box::pin(async move {
118 let mut stream = pin!(
119 self.worker
120 .execute(query, args, self.row_channel_size, persistent, Some(1))
121 .map_ok(flume::Receiver::into_stream)
122 .await?
123 );
124
125 if let Some(Either::Right(row)) = stream.try_next().await? {
126 return Ok(Some(AnyRow::try_from(&row)?));
127 }
128
129 Ok(None)
130 })
131 }
132
133 fn prepare_with<'c, 'q: 'c>(
134 &'c mut self,
135 sql: &'q str,
136 _parameters: &[AnyTypeInfo],
137 ) -> BoxFuture<'c, sqlx_core::Result<AnyStatement<'q>>> {
138 Box::pin(async move {
139 let statement = Executor::prepare_with(self, sql, &[]).await?;
140 AnyStatement::try_from_statement(sql, &statement, statement.column_names.clone())
141 })
142 }
143
144 fn describe<'q>(&'q mut self, sql: &'q str) -> BoxFuture<'q, sqlx_core::Result<Describe<Any>>> {
145 Box::pin(async move { Executor::describe(self, sql).await?.try_into_any() })
146 }
147}
148
149impl<'a> TryFrom<&'a SqliteTypeInfo> for AnyTypeInfo {
150 type Error = sqlx_core::Error;
151
152 fn try_from(sqlite_type: &'a SqliteTypeInfo) -> Result<Self, Self::Error> {
153 Ok(AnyTypeInfo {
154 kind: match &sqlite_type.0 {
155 DataType::Null => AnyTypeInfoKind::Null,
156 DataType::Int4 => AnyTypeInfoKind::Integer,
157 DataType::Integer => AnyTypeInfoKind::BigInt,
158 DataType::Float => AnyTypeInfoKind::Double,
159 DataType::Blob => AnyTypeInfoKind::Blob,
160 DataType::Text => AnyTypeInfoKind::Text,
161 _ => {
162 return Err(sqlx_core::Error::AnyDriverError(
163 format!("Any driver does not support the SQLite type {sqlite_type:?}")
164 .into(),
165 ))
166 }
167 },
168 })
169 }
170}
171
172impl<'a> TryFrom<&'a SqliteColumn> for AnyColumn {
173 type Error = sqlx_core::Error;
174
175 fn try_from(col: &'a SqliteColumn) -> Result<Self, Self::Error> {
176 let type_info =
177 AnyTypeInfo::try_from(&col.type_info).map_err(|e| sqlx_core::Error::ColumnDecode {
178 index: col.name.to_string(),
179 source: e.into(),
180 })?;
181
182 Ok(AnyColumn {
183 ordinal: col.ordinal,
184 name: col.name.clone(),
185 type_info,
186 })
187 }
188}
189
190impl<'a> TryFrom<&'a SqliteRow> for AnyRow {
191 type Error = sqlx_core::Error;
192
193 fn try_from(row: &'a SqliteRow) -> Result<Self, Self::Error> {
194 AnyRow::map_from(row, row.column_names.clone())
195 }
196}
197
198impl<'a> TryFrom<&'a AnyConnectOptions> for SqliteConnectOptions {
199 type Error = sqlx_core::Error;
200
201 fn try_from(opts: &'a AnyConnectOptions) -> Result<Self, Self::Error> {
202 let mut opts_out = SqliteConnectOptions::from_url(&opts.database_url)?;
203 opts_out.log_settings = opts.log_settings.clone();
204 Ok(opts_out)
205 }
206}
207
208fn map_arguments(args: AnyArguments<'_>) -> SqliteArguments<'_> {
210 SqliteArguments {
211 values: args
212 .values
213 .0
214 .into_iter()
215 .map(|val| match val {
216 AnyValueKind::Null(_) => SqliteArgumentValue::Null,
217 AnyValueKind::Bool(b) => SqliteArgumentValue::Int(b as i32),
218 AnyValueKind::SmallInt(i) => SqliteArgumentValue::Int(i as i32),
219 AnyValueKind::Integer(i) => SqliteArgumentValue::Int(i),
220 AnyValueKind::BigInt(i) => SqliteArgumentValue::Int64(i),
221 AnyValueKind::Real(r) => SqliteArgumentValue::Double(r as f64),
222 AnyValueKind::Double(d) => SqliteArgumentValue::Double(d),
223 AnyValueKind::Text(t) => SqliteArgumentValue::Text(t),
224 AnyValueKind::Blob(b) => SqliteArgumentValue::Blob(b),
225 _ => unreachable!("BUG: missing mapping for {val:?}"),
227 })
228 .collect(),
229 }
230}
231
232fn map_result(res: SqliteQueryResult) -> AnyQueryResult {
233 AnyQueryResult {
234 rows_affected: res.rows_affected(),
235 last_insert_id: None,
236 }
237}