sqlx_rqlite/connection/
executor.rs1use crate::error::RqliteError;
2use crate::type_info::DataType;
3use crate::RqliteColumn;
4use crate::{
5 Rqlite, RqliteConnection, RqliteQueryResult, RqliteRow, RqliteStatement, RqliteTypeInfo,
6 RqliteValue,
7};
8use futures_core::future::BoxFuture;
9use futures_core::stream::BoxStream;
10use futures_util::pin_mut;
11use futures_util::TryStreamExt;
12use sqlx_core::describe::Describe;
13use sqlx_core::error::Error;
14use sqlx_core::executor::{Execute, Executor};
15use sqlx_core::ext::ustr::UStr;
16use sqlx_core::try_stream;
17use sqlx_core::Either;
18
19impl<'c> Executor<'c> for &'c mut RqliteConnection {
20 type Database = Rqlite;
21
22 fn fetch_many<'e, 'q: 'e, E: 'q>(
23 self,
24 mut query: E,
25 ) -> BoxStream<'e, Result<Either<RqliteQueryResult, RqliteRow>, Error>>
26 where
27 'c: 'e,
28 E: Execute<'q, Self::Database>,
29 {
30 let sql = query.sql();
31 let arguments = query.take_arguments();
32 Box::pin(try_stream! {
37 let cursor = self.inner.execute(sql, match arguments {
38 Some(arguments)=>arguments.values,
39 _=>vec![],
40 }).await;
41 match cursor {
42 Ok(cursor)=> {
43 pin_mut!(cursor);
46
47 while let Some(row) = cursor.next_row() {
48 let size = row.column_count();
49 let mut values = Vec::with_capacity(size);
50 let mut columns = Vec::with_capacity(size);
51 for (i,value) in row.row.into_iter().enumerate() {
53 values.push(RqliteValue::new(value,RqliteTypeInfo(DataType::Null)));
54 columns.push(RqliteColumn{
55 name : UStr::from(""),
56 ordinal: i,
57 type_info: RqliteTypeInfo(DataType::Null),
58 });
59 }
60 let row=Either::Right(RqliteRow {
61 values: values.into_boxed_slice(),
62 columns: columns.into(),
63 column_names : Default::default(),
64 });
65 r#yield!(row);
66 }
67 Ok(())
68 }
69 Err(err)=> {
70 Err(RqliteError{
71 inner: err,
72 }.into())
73 }
74 }
75 })
76 }
77
78 fn fetch_optional<'e, 'q: 'e, E: 'q>(
79 self,
80 query: E,
81 ) -> BoxFuture<'e, Result<Option<RqliteRow>, Error>>
82 where
83 'c: 'e,
84 E: Execute<'q, Self::Database>,
85 {
86 let mut s = self.fetch_many(query);
87
88 Box::pin(async move {
89 while let Some(v) = s.try_next().await? {
90 if let Either::Right(r) = v {
91 return Ok(Some(r));
92 }
93 }
94
95 Ok(None)
96 })
97 }
98
99 fn prepare_with<'e, 'q: 'e>(
100 self,
101 _sql: &'q str,
102 _parameters: &[RqliteTypeInfo],
103 ) -> BoxFuture<'e, Result<RqliteStatement<'q>, Error>>
104 where
105 'c: 'e,
106 {
107 Box::pin(async {
108 Err(Error::Io(std::io::Error::new(
109 std::io::ErrorKind::Other,
110 "prepare_with not supported",
111 )))
112 })
113 }
114 #[doc(hidden)]
115 fn describe<'e, 'q: 'e>(self, _sql: &'q str) -> BoxFuture<'e, Result<Describe<Rqlite>, Error>>
116 where
117 'c: 'e,
118 {
119 Box::pin(async {
120 Err(Error::Io(std::io::Error::new(
121 std::io::ErrorKind::Other,
122 "describe not supported",
123 )))
124 })
125 }
126}