sqlx_exasol_impl/connection/
executor.rs

1use futures_core::{future::BoxFuture, stream::BoxStream};
2use futures_util::{FutureExt, StreamExt, TryStreamExt};
3use sqlx_core::{
4    describe::Describe,
5    executor::{Execute, Executor},
6    logger::QueryLogger,
7    sql_str::SqlStr,
8    Either,
9};
10
11use super::stream::ResultStream;
12use crate::{
13    connection::websocket::future::{
14        self, ExecuteBatch, ExecutePrepared, GetOrPrepare, WebSocketFuture,
15    },
16    database::Exasol,
17    responses::DescribeStatement,
18    statement::{ExaStatement, ExaStatementMetadata},
19    ExaConnection, ExaQueryResult, ExaRow, ExaTypeInfo, SqlxError, SqlxResult,
20};
21
22impl<'c> Executor<'c> for &'c mut ExaConnection {
23    type Database = Exasol;
24
25    fn execute<'e, 'q, E>(self, query: E) -> BoxFuture<'e, SqlxResult<ExaQueryResult>>
26    where
27        'q: 'e,
28        'c: 'e,
29        E: 'q + Execute<'q, Exasol>,
30    {
31        match self.fetch_impl(query) {
32            Ok(stream) => stream
33                .try_filter_map(|v| std::future::ready(Ok(v.left())))
34                .try_collect()
35                .boxed(),
36            Err(e) => std::future::ready(Err(e)).boxed(),
37        }
38    }
39
40    fn execute_many<'e, 'q, E>(self, query: E) -> BoxStream<'e, SqlxResult<ExaQueryResult>>
41    where
42        'q: 'e,
43        'c: 'e,
44        E: 'q + Execute<'q, Exasol>,
45    {
46        match self.fetch_many_impl(query) {
47            Ok(stream) => stream
48                .try_filter_map(|step| std::future::ready(Ok(step.left())))
49                .boxed(),
50            Err(e) => std::future::ready(Err(e)).into_stream().boxed(),
51        }
52    }
53
54    fn fetch<'e, 'q, E>(self, query: E) -> BoxStream<'e, SqlxResult<ExaRow>>
55    where
56        'q: 'e,
57        'c: 'e,
58        E: 'q + Execute<'q, Exasol>,
59    {
60        match self.fetch_impl(query) {
61            Ok(stream) => stream
62                .try_filter_map(|v| std::future::ready(Ok(v.right())))
63                .boxed(),
64            Err(e) => std::future::ready(Err(e)).into_stream().boxed(),
65        }
66    }
67
68    fn fetch_many<'e, 'q, E>(
69        self,
70        query: E,
71    ) -> BoxStream<'e, SqlxResult<Either<ExaQueryResult, ExaRow>>>
72    where
73        'q: 'e,
74        'c: 'e,
75        E: 'q + Execute<'q, Exasol>,
76    {
77        match self.fetch_many_impl(query) {
78            Ok(stream) => stream.boxed(),
79            Err(e) => std::future::ready(Err(e)).into_stream().boxed(),
80        }
81    }
82
83    fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, SqlxResult<Vec<ExaRow>>>
84    where
85        'c: 'e,
86        E: 'q + Execute<'q, Self::Database>,
87    {
88        match self.fetch_impl(query) {
89            Ok(stream) => stream
90                .try_filter_map(|v| std::future::ready(Ok(v.right())))
91                .try_collect()
92                .boxed(),
93            Err(e) => std::future::ready(Err(e)).boxed(),
94        }
95    }
96
97    fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, SqlxResult<ExaRow>>
98    where
99        'c: 'e,
100        E: 'q + Execute<'q, Self::Database>,
101    {
102        let stream = match self.fetch_impl(query) {
103            Ok(stream) => stream,
104            Err(e) => return std::future::ready(Err(e)).boxed(),
105        };
106
107        Box::pin(async move {
108            stream
109                .try_filter_map(|v| std::future::ready(Ok(v.right())))
110                .try_next()
111                .await
112                .transpose()
113                .unwrap_or(Err(SqlxError::RowNotFound))
114        })
115    }
116
117    fn fetch_optional<'e, 'q, E>(self, query: E) -> BoxFuture<'e, SqlxResult<Option<ExaRow>>>
118    where
119        'q: 'e,
120        'c: 'e,
121        E: 'q + Execute<'q, Exasol>,
122    {
123        let stream = match self.fetch_impl(query) {
124            Ok(stream) => stream,
125            Err(e) => return std::future::ready(Err(e)).boxed(),
126        };
127
128        Box::pin(async move {
129            stream
130                .try_filter_map(|v| std::future::ready(Ok(v.right())))
131                .try_next()
132                .await
133        })
134    }
135
136    fn prepare_with<'e>(
137        self,
138        sql: SqlStr,
139        _parameters: &'e [ExaTypeInfo],
140    ) -> BoxFuture<'e, SqlxResult<ExaStatement>>
141    where
142        'c: 'e,
143    {
144        Box::pin(async move {
145            let prepared = GetOrPrepare::new(sql.clone(), true)
146                .future(&mut self.ws)
147                .await?;
148
149            Ok(ExaStatement {
150                sql,
151                metadata: ExaStatementMetadata::new(
152                    prepared.columns.clone(),
153                    prepared.parameters.clone(),
154                ),
155            })
156        })
157    }
158
159    /// Exasol does not provide nullability information, unfortunately.
160    fn describe<'e>(self, sql: SqlStr) -> BoxFuture<'e, SqlxResult<Describe<Exasol>>>
161    where
162        'c: 'e,
163    {
164        Box::pin(async move {
165            let DescribeStatement {
166                columns,
167                parameters,
168                ..
169            } = future::Describe::new(sql).future(&mut self.ws).await?;
170
171            let nullable = (0..columns.len()).map(|_| None).collect();
172
173            Ok(Describe {
174                parameters: Some(Either::Left(parameters)),
175                columns,
176                nullable,
177            })
178        })
179    }
180}
181
182impl ExaConnection {
183    fn fetch_impl<'c, 'e, 'q, E>(&'c mut self, mut query: E) -> SqlxResult<ResultStream<'e>>
184    where
185        'q: 'e,
186        'c: 'e,
187        E: 'q + Execute<'q, Exasol>,
188    {
189        let persist = query.persistent();
190        let arguments = query.take_arguments().map_err(SqlxError::Encode)?;
191        let logger = QueryLogger::new(query.sql(), self.log_settings.clone());
192        let sql = logger.sql().clone();
193
194        if let Some(arguments) = arguments {
195            let future = ExecutePrepared::new(sql, persist, arguments);
196            Ok(ResultStream::new(&mut self.ws, logger, future))
197        } else {
198            let future = future::Execute::new(sql);
199            Ok(ResultStream::new(&mut self.ws, logger, future))
200        }
201    }
202
203    fn fetch_many_impl<'c, 'e, 'q, E>(&'c mut self, mut query: E) -> SqlxResult<ResultStream<'e>>
204    where
205        'q: 'e,
206        'c: 'e,
207        E: 'q + Execute<'q, Exasol>,
208    {
209        let persist = query.persistent();
210        let arguments = query.take_arguments().map_err(SqlxError::Encode)?;
211        let logger = QueryLogger::new(query.sql(), self.log_settings.clone());
212        let sql = logger.sql().clone();
213
214        if let Some(arguments) = arguments {
215            let future = ExecutePrepared::new(sql, persist, arguments);
216            Ok(ResultStream::new(&mut self.ws, logger, future))
217        } else {
218            let future = ExecuteBatch::new(sql);
219            Ok(ResultStream::new(&mut self.ws, logger, future))
220        }
221    }
222}