sqlx_rxqlite/connection/
executor.rs

1use crate::error::RXQLiteError;
2use crate::type_info::DataType;
3use crate::RXQLiteColumn;
4use crate::{
5    RXQLite, RXQLiteConnection, RXQLiteQueryResult, RXQLiteRow, RXQLiteStatement, RXQLiteTypeInfo,
6    RXQLiteValue,
7};
8use futures_core::future::BoxFuture;
9use futures_core::stream::BoxStream;
10//use futures_util::TryStreamExt;
11use sqlx_core::describe::Describe;
12use sqlx_core::error::Error;
13use sqlx_core::executor::{Execute, Executor};
14use sqlx_core::ext::ustr::UStr;
15use sqlx_core::try_stream;
16use sqlx_core::Either;
17use rxqlite_common::Column;
18
19trait IntoRXQLitTypeInfo {
20  fn into_rxqlite_type_info(self)->RXQLiteTypeInfo;
21}
22
23impl IntoRXQLitTypeInfo for rxqlite_common::TypeInfo {
24  fn into_rxqlite_type_info(self)->RXQLiteTypeInfo {
25    match self {
26      Self::Null => RXQLiteTypeInfo(DataType::Null),
27      Self::Int => RXQLiteTypeInfo(DataType::Int),
28      Self::Float => RXQLiteTypeInfo(DataType::Float),
29      Self::Text => RXQLiteTypeInfo(DataType::Text),
30      Self::Blob => RXQLiteTypeInfo(DataType::Blob),
31      Self::Numeric => RXQLiteTypeInfo(DataType::Numeric),
32      Self::Bool => RXQLiteTypeInfo(DataType::Bool),
33      Self::Int64 => RXQLiteTypeInfo(DataType::Int64),
34      Self::Date => RXQLiteTypeInfo(DataType::Date),
35      Self::Time => RXQLiteTypeInfo(DataType::Time),
36      Self::DateTime => RXQLiteTypeInfo(DataType::Datetime),
37    }
38  }
39}
40
41impl IntoRXQLitTypeInfo for &rxqlite_common::TypeInfo {
42  fn into_rxqlite_type_info(self)->RXQLiteTypeInfo {
43    match self {
44      rxqlite_common::TypeInfo::Null => RXQLiteTypeInfo(DataType::Null),
45      rxqlite_common::TypeInfo::Int => RXQLiteTypeInfo(DataType::Int),
46      rxqlite_common::TypeInfo::Float => RXQLiteTypeInfo(DataType::Float),
47      rxqlite_common::TypeInfo::Text => RXQLiteTypeInfo(DataType::Text),
48      rxqlite_common::TypeInfo::Blob => RXQLiteTypeInfo(DataType::Blob),
49      rxqlite_common::TypeInfo::Numeric => RXQLiteTypeInfo(DataType::Numeric),
50      rxqlite_common::TypeInfo::Bool => RXQLiteTypeInfo(DataType::Bool),
51      rxqlite_common::TypeInfo::Int64 => RXQLiteTypeInfo(DataType::Int64),
52      rxqlite_common::TypeInfo::Date => RXQLiteTypeInfo(DataType::Date),
53      rxqlite_common::TypeInfo::Time => RXQLiteTypeInfo(DataType::Time),
54      rxqlite_common::TypeInfo::DateTime => RXQLiteTypeInfo(DataType::Datetime),
55    }
56  }
57}
58
59impl<'c> Executor<'c> for &'c mut RXQLiteConnection {
60    type Database = RXQLite;
61
62    fn fetch_many<'e, 'q: 'e, E: 'q>(
63        self,
64        mut query: E,
65    ) -> BoxStream<'e, Result<Either<RXQLiteQueryResult, RXQLiteRow>, Error>>
66    where
67        'c: 'e,
68        E: Execute<'q, Self::Database>,
69    {
70        let sql = query.sql();
71        let arguments = query.take_arguments();
72        //let persistent = query.persistent() && arguments.is_some();
73
74        //let args = Vec::with_capacity(arguments.len());
75
76        Box::pin(try_stream! {
77          let result_or_rows = self.inner.fetch_many(sql, match arguments {
78            Some(arguments)=>arguments.values,
79            _=>vec![],
80          }).await;
81          match result_or_rows {
82            Ok(result_or_rows)=> {
83              //println!("{}:({})",file!(),line!());
84
85              //pin_mut!(cursor);
86              let mut result_or_rows_iter=result_or_rows.into_iter();
87              while let Some(result_or_row) = result_or_rows_iter.next() {
88                match result_or_row {
89                  Ok(result_or_row) => {
90                    match result_or_row {
91                      Either::Left(res)=> {
92                        let res=Either::Left(RXQLiteQueryResult {
93                          last_insert_rowid: res.last_insert_rowid,
94                          changes: res.changes,
95                        });
96                        r#yield!(res);
97                      }
98                      Either::Right(row)=> {
99                        let size = row.inner.len();
100                        let mut values = Vec::with_capacity(size);
101                        let mut columns = Vec::with_capacity(size);
102                        let mut column_names: sqlx_core::HashMap<UStr,usize> = Default::default();
103                        for (_i,col) in row.inner.into_iter().enumerate() {
104                          let ordinal = col.ordinal;
105                          let column_name= UStr::from(row.columns[ordinal as usize].name.to_string());
106                          
107                          let column: &Column = &row.columns[ordinal as usize];
108                          let rxqlite_type_info = (&column.type_info).into_rxqlite_type_info();
109                          values.push(RXQLiteValue::new(col.value,rxqlite_type_info.clone()));
110                          columns.push(RXQLiteColumn{
111                            name : column_name.clone(),
112                            ordinal: ordinal as _,
113                            type_info: rxqlite_type_info,
114                          });
115                          column_names.insert(column_name,ordinal as _);
116                        }
117                        let row=Either::Right(RXQLiteRow {
118                          values: values.into_boxed_slice(),
119                          columns: columns.into(),
120                          column_names: column_names.into(),
121                        });
122                        r#yield!(row);
123                      }
124                    }
125                  }
126                  Err(err)=>{
127                    return Err(RXQLiteError{
128                      inner: anyhow::anyhow!(err),
129                    }.into());
130                  }
131                }
132              }
133              Ok(())
134            }
135            Err(err)=> {
136              Err(RXQLiteError{
137                inner: err,
138              }.into())
139            }
140          }
141        })
142    }
143
144    fn fetch_optional<'e, 'q: 'e, E: 'q>(
145        self,
146        mut query: E,
147    ) -> BoxFuture<'e, Result<Option<RXQLiteRow>, Error>>
148    where
149        'c: 'e,
150        E: Execute<'q, Self::Database>,
151    {
152        let sql = query.sql();
153        let arguments = query.take_arguments();
154        //let persistent = query.persistent() && arguments.is_some();
155
156        //let args = Vec::with_capacity(arguments.len());
157
158        Box::pin(async {
159            let row = self
160                .inner
161                .fetch_optional(
162                    sql,
163                    match arguments {
164                        Some(arguments) => arguments.values,
165                        _ => vec![],
166                    },
167                )
168                .await;
169            match row {
170                Ok(row) => {
171                    //println!("{}:({})",file!(),line!());
172
173                    //pin_mut!(cursor);
174
175                    if let Some(row) = row {
176                        let size = row.inner.len();
177                        let mut values = Vec::with_capacity(size);
178                        let mut columns = Vec::with_capacity(size);
179                        let mut column_names: sqlx_core::HashMap<UStr,usize> = Default::default();
180                        for (_i, col) in row.inner.into_iter().enumerate() {
181                          let ordinal = col.ordinal;
182                          let column_name= UStr::from(row.columns[ordinal as usize].name.to_string());
183                          let column: &Column = &row.columns[ordinal as usize];
184                          let rxqlite_type_info = (&column.type_info).into_rxqlite_type_info();
185                          values.push(RXQLiteValue::new(col.value,rxqlite_type_info.clone()));
186                          columns.push(RXQLiteColumn{
187                            name : column_name.clone(),
188                            ordinal: ordinal as _,
189                            type_info: rxqlite_type_info,
190                          });
191                          column_names.insert(column_name,ordinal as _);
192                          
193                          
194                        }
195                        let row = RXQLiteRow {
196                            values: values.into_boxed_slice(),
197                            columns: columns.into(),
198                            column_names: column_names.into(),
199                        };
200                        Ok(Some(row))
201                    } else {
202                        Ok(None)
203                    }
204                }
205                Err(err) => Err(RXQLiteError { inner: err }.into()),
206            }
207        })
208    }
209
210    fn fetch_one<'e, 'q: 'e, E: 'q>(self, mut query: E) -> BoxFuture<'e, Result<RXQLiteRow, Error>>
211    where
212        'c: 'e,
213        E: Execute<'q, Self::Database>,
214    {
215        let sql = query.sql();
216        let arguments = query.take_arguments();
217        //let persistent = query.persistent() && arguments.is_some();
218
219        //let args = Vec::with_capacity(arguments.len());
220
221        Box::pin(async {
222            let row = self
223                .inner
224                .fetch_one(
225                    sql,
226                    match arguments {
227                        Some(arguments) => arguments.values,
228                        _ => vec![],
229                    },
230                )
231                .await;
232            match row {
233                Ok(row) => {
234                    let size = row.inner.len();
235                    let mut values = Vec::with_capacity(size);
236                    let mut columns = Vec::with_capacity(size);
237                    let mut column_names: sqlx_core::HashMap<UStr,usize> = Default::default();
238                    for (_i, col) in row.inner.into_iter().enumerate() {
239                        let ordinal = col.ordinal;
240                        let column_name= UStr::from(row.columns[ordinal as usize].name.to_string());
241                          let column: &Column = &row.columns[ordinal as usize];
242                          let rxqlite_type_info = (&column.type_info).into_rxqlite_type_info();
243                          values.push(RXQLiteValue::new(col.value,rxqlite_type_info.clone()));
244                          columns.push(RXQLiteColumn{
245                            name : column_name.clone(),
246                            ordinal: ordinal as _,
247                            type_info: rxqlite_type_info,
248                          });
249                          column_names.insert(column_name,ordinal as _);
250                    }
251                    let row = RXQLiteRow {
252                        values: values.into_boxed_slice(),
253                        columns: columns.into(),
254                        column_names: column_names.into(),
255                    };
256                    Ok(row)
257                }
258                Err(err) => Err(RXQLiteError { inner: err }.into()),
259            }
260        })
261    }
262
263    fn prepare_with<'e, 'q: 'e>(
264        self,
265        _sql: &'q str,
266        _parameters: &[RXQLiteTypeInfo],
267    ) -> BoxFuture<'e, Result<RXQLiteStatement<'q>, Error>>
268    where
269        'c: 'e,
270    {
271        Box::pin(async {
272            Err(Error::Io(std::io::Error::new(
273                std::io::ErrorKind::Other,
274                "prepare_with not supported",
275            )))
276        })
277    }
278    #[doc(hidden)]
279    fn describe<'e, 'q: 'e>(self, _sql: &'q str) -> BoxFuture<'e, Result<Describe<RXQLite>, Error>>
280    where
281        'c: 'e,
282    {
283        Box::pin(async {
284            Err(Error::Io(std::io::Error::new(
285                std::io::ErrorKind::Other,
286                "describe not supported",
287            )))
288        })
289    }
290}