1use crate::error::RXQLiteError;
2use crate::type_info::DataType;
3use crate::RXQLiteColumn;
4use crate::{
5 RXQLite, RXQLiteConnection, RXQLiteQueryResult, RXQLiteRow, RXQLiteStatement, RXQLiteTypeInfo,
6 RXQLiteValue,
7};
8use futures_core::future::BoxFuture;
9use futures_core::stream::BoxStream;
10use sqlx_core::describe::Describe;
12use sqlx_core::error::Error;
13use sqlx_core::executor::{Execute, Executor};
14use sqlx_core::ext::ustr::UStr;
15use sqlx_core::try_stream;
16use sqlx_core::Either;
17use rxqlite_common::Column;
18
19trait IntoRXQLitTypeInfo {
20 fn into_rxqlite_type_info(self)->RXQLiteTypeInfo;
21}
22
23impl IntoRXQLitTypeInfo for rxqlite_common::TypeInfo {
24 fn into_rxqlite_type_info(self)->RXQLiteTypeInfo {
25 match self {
26 Self::Null => RXQLiteTypeInfo(DataType::Null),
27 Self::Int => RXQLiteTypeInfo(DataType::Int),
28 Self::Float => RXQLiteTypeInfo(DataType::Float),
29 Self::Text => RXQLiteTypeInfo(DataType::Text),
30 Self::Blob => RXQLiteTypeInfo(DataType::Blob),
31 Self::Numeric => RXQLiteTypeInfo(DataType::Numeric),
32 Self::Bool => RXQLiteTypeInfo(DataType::Bool),
33 Self::Int64 => RXQLiteTypeInfo(DataType::Int64),
34 Self::Date => RXQLiteTypeInfo(DataType::Date),
35 Self::Time => RXQLiteTypeInfo(DataType::Time),
36 Self::DateTime => RXQLiteTypeInfo(DataType::Datetime),
37 }
38 }
39}
40
41impl IntoRXQLitTypeInfo for &rxqlite_common::TypeInfo {
42 fn into_rxqlite_type_info(self)->RXQLiteTypeInfo {
43 match self {
44 rxqlite_common::TypeInfo::Null => RXQLiteTypeInfo(DataType::Null),
45 rxqlite_common::TypeInfo::Int => RXQLiteTypeInfo(DataType::Int),
46 rxqlite_common::TypeInfo::Float => RXQLiteTypeInfo(DataType::Float),
47 rxqlite_common::TypeInfo::Text => RXQLiteTypeInfo(DataType::Text),
48 rxqlite_common::TypeInfo::Blob => RXQLiteTypeInfo(DataType::Blob),
49 rxqlite_common::TypeInfo::Numeric => RXQLiteTypeInfo(DataType::Numeric),
50 rxqlite_common::TypeInfo::Bool => RXQLiteTypeInfo(DataType::Bool),
51 rxqlite_common::TypeInfo::Int64 => RXQLiteTypeInfo(DataType::Int64),
52 rxqlite_common::TypeInfo::Date => RXQLiteTypeInfo(DataType::Date),
53 rxqlite_common::TypeInfo::Time => RXQLiteTypeInfo(DataType::Time),
54 rxqlite_common::TypeInfo::DateTime => RXQLiteTypeInfo(DataType::Datetime),
55 }
56 }
57}
58
59impl<'c> Executor<'c> for &'c mut RXQLiteConnection {
60 type Database = RXQLite;
61
62 fn fetch_many<'e, 'q: 'e, E: 'q>(
63 self,
64 mut query: E,
65 ) -> BoxStream<'e, Result<Either<RXQLiteQueryResult, RXQLiteRow>, Error>>
66 where
67 'c: 'e,
68 E: Execute<'q, Self::Database>,
69 {
70 let sql = query.sql();
71 let arguments = query.take_arguments();
72 Box::pin(try_stream! {
77 let result_or_rows = self.inner.fetch_many(sql, match arguments {
78 Some(arguments)=>arguments.values,
79 _=>vec![],
80 }).await;
81 match result_or_rows {
82 Ok(result_or_rows)=> {
83 let mut result_or_rows_iter=result_or_rows.into_iter();
87 while let Some(result_or_row) = result_or_rows_iter.next() {
88 match result_or_row {
89 Ok(result_or_row) => {
90 match result_or_row {
91 Either::Left(res)=> {
92 let res=Either::Left(RXQLiteQueryResult {
93 last_insert_rowid: res.last_insert_rowid,
94 changes: res.changes,
95 });
96 r#yield!(res);
97 }
98 Either::Right(row)=> {
99 let size = row.inner.len();
100 let mut values = Vec::with_capacity(size);
101 let mut columns = Vec::with_capacity(size);
102 let mut column_names: sqlx_core::HashMap<UStr,usize> = Default::default();
103 for (_i,col) in row.inner.into_iter().enumerate() {
104 let ordinal = col.ordinal;
105 let column_name= UStr::from(row.columns[ordinal as usize].name.to_string());
106
107 let column: &Column = &row.columns[ordinal as usize];
108 let rxqlite_type_info = (&column.type_info).into_rxqlite_type_info();
109 values.push(RXQLiteValue::new(col.value,rxqlite_type_info.clone()));
110 columns.push(RXQLiteColumn{
111 name : column_name.clone(),
112 ordinal: ordinal as _,
113 type_info: rxqlite_type_info,
114 });
115 column_names.insert(column_name,ordinal as _);
116 }
117 let row=Either::Right(RXQLiteRow {
118 values: values.into_boxed_slice(),
119 columns: columns.into(),
120 column_names: column_names.into(),
121 });
122 r#yield!(row);
123 }
124 }
125 }
126 Err(err)=>{
127 return Err(RXQLiteError{
128 inner: anyhow::anyhow!(err),
129 }.into());
130 }
131 }
132 }
133 Ok(())
134 }
135 Err(err)=> {
136 Err(RXQLiteError{
137 inner: err,
138 }.into())
139 }
140 }
141 })
142 }
143
144 fn fetch_optional<'e, 'q: 'e, E: 'q>(
145 self,
146 mut query: E,
147 ) -> BoxFuture<'e, Result<Option<RXQLiteRow>, Error>>
148 where
149 'c: 'e,
150 E: Execute<'q, Self::Database>,
151 {
152 let sql = query.sql();
153 let arguments = query.take_arguments();
154 Box::pin(async {
159 let row = self
160 .inner
161 .fetch_optional(
162 sql,
163 match arguments {
164 Some(arguments) => arguments.values,
165 _ => vec![],
166 },
167 )
168 .await;
169 match row {
170 Ok(row) => {
171 if let Some(row) = row {
176 let size = row.inner.len();
177 let mut values = Vec::with_capacity(size);
178 let mut columns = Vec::with_capacity(size);
179 let mut column_names: sqlx_core::HashMap<UStr,usize> = Default::default();
180 for (_i, col) in row.inner.into_iter().enumerate() {
181 let ordinal = col.ordinal;
182 let column_name= UStr::from(row.columns[ordinal as usize].name.to_string());
183 let column: &Column = &row.columns[ordinal as usize];
184 let rxqlite_type_info = (&column.type_info).into_rxqlite_type_info();
185 values.push(RXQLiteValue::new(col.value,rxqlite_type_info.clone()));
186 columns.push(RXQLiteColumn{
187 name : column_name.clone(),
188 ordinal: ordinal as _,
189 type_info: rxqlite_type_info,
190 });
191 column_names.insert(column_name,ordinal as _);
192
193
194 }
195 let row = RXQLiteRow {
196 values: values.into_boxed_slice(),
197 columns: columns.into(),
198 column_names: column_names.into(),
199 };
200 Ok(Some(row))
201 } else {
202 Ok(None)
203 }
204 }
205 Err(err) => Err(RXQLiteError { inner: err }.into()),
206 }
207 })
208 }
209
210 fn fetch_one<'e, 'q: 'e, E: 'q>(self, mut query: E) -> BoxFuture<'e, Result<RXQLiteRow, Error>>
211 where
212 'c: 'e,
213 E: Execute<'q, Self::Database>,
214 {
215 let sql = query.sql();
216 let arguments = query.take_arguments();
217 Box::pin(async {
222 let row = self
223 .inner
224 .fetch_one(
225 sql,
226 match arguments {
227 Some(arguments) => arguments.values,
228 _ => vec![],
229 },
230 )
231 .await;
232 match row {
233 Ok(row) => {
234 let size = row.inner.len();
235 let mut values = Vec::with_capacity(size);
236 let mut columns = Vec::with_capacity(size);
237 let mut column_names: sqlx_core::HashMap<UStr,usize> = Default::default();
238 for (_i, col) in row.inner.into_iter().enumerate() {
239 let ordinal = col.ordinal;
240 let column_name= UStr::from(row.columns[ordinal as usize].name.to_string());
241 let column: &Column = &row.columns[ordinal as usize];
242 let rxqlite_type_info = (&column.type_info).into_rxqlite_type_info();
243 values.push(RXQLiteValue::new(col.value,rxqlite_type_info.clone()));
244 columns.push(RXQLiteColumn{
245 name : column_name.clone(),
246 ordinal: ordinal as _,
247 type_info: rxqlite_type_info,
248 });
249 column_names.insert(column_name,ordinal as _);
250 }
251 let row = RXQLiteRow {
252 values: values.into_boxed_slice(),
253 columns: columns.into(),
254 column_names: column_names.into(),
255 };
256 Ok(row)
257 }
258 Err(err) => Err(RXQLiteError { inner: err }.into()),
259 }
260 })
261 }
262
263 fn prepare_with<'e, 'q: 'e>(
264 self,
265 _sql: &'q str,
266 _parameters: &[RXQLiteTypeInfo],
267 ) -> BoxFuture<'e, Result<RXQLiteStatement<'q>, Error>>
268 where
269 'c: 'e,
270 {
271 Box::pin(async {
272 Err(Error::Io(std::io::Error::new(
273 std::io::ErrorKind::Other,
274 "prepare_with not supported",
275 )))
276 })
277 }
278 #[doc(hidden)]
279 fn describe<'e, 'q: 'e>(self, _sql: &'q str) -> BoxFuture<'e, Result<Describe<RXQLite>, Error>>
280 where
281 'c: 'e,
282 {
283 Box::pin(async {
284 Err(Error::Io(std::io::Error::new(
285 std::io::ErrorKind::Other,
286 "describe not supported",
287 )))
288 })
289 }
290}