sqlx_exasol/connection/
executor.rs

1use std::{borrow::Cow, future::ready};
2
3use futures_core::{future::BoxFuture, stream::BoxStream};
4use futures_util::{FutureExt, StreamExt, TryStreamExt};
5use sqlx_core::{
6    database::Database,
7    describe::Describe,
8    executor::{Execute, Executor},
9    logger::QueryLogger,
10    Either,
11};
12
13use super::stream::ResultStream;
14use crate::{
15    connection::websocket::future::{
16        self, ExecuteBatch, ExecutePrepared, GetOrPrepare, WebSocketFuture,
17    },
18    database::Exasol,
19    responses::DescribeStatement,
20    statement::{ExaStatement, ExaStatementMetadata},
21    ExaConnection, SqlxError, SqlxResult,
22};
23
24impl<'c> Executor<'c> for &'c mut ExaConnection {
25    type Database = Exasol;
26
27    fn execute<'e, 'q, E>(
28        self,
29        mut query: E,
30    ) -> BoxFuture<'e, SqlxResult<<Self::Database as Database>::QueryResult>>
31    where
32        'q: 'e,
33        'c: 'e,
34        E: 'q + Execute<'q, Self::Database>,
35    {
36        let sql = query.sql();
37        let persist = query.persistent();
38        let logger = QueryLogger::new(sql, self.log_settings.clone());
39        let arguments = match query.take_arguments().map_err(SqlxError::Encode) {
40            Ok(a) => a,
41            Err(e) => return Box::pin(ready(Err(e))),
42        };
43
44        let filter_fn = |step| async move {
45            Ok(match step {
46                Either::Left(rows) => Some(rows),
47                Either::Right(_) => None,
48            })
49        };
50
51        if let Some(arguments) = arguments {
52            let future = ExecutePrepared::new(sql, persist, arguments);
53            ResultStream::new(&mut self.ws, logger, future)
54                .try_filter_map(filter_fn)
55                .try_collect()
56                .boxed()
57        } else {
58            let future = future::Execute::new(sql);
59            ResultStream::new(&mut self.ws, logger, future)
60                .try_filter_map(filter_fn)
61                .try_collect()
62                .boxed()
63        }
64    }
65
66    fn execute_many<'e, 'q, E>(
67        self,
68        query: E,
69    ) -> BoxStream<'e, SqlxResult<<Self::Database as Database>::QueryResult>>
70    where
71        'q: 'e,
72        'c: 'e,
73        E: 'q + Execute<'q, Self::Database>,
74    {
75        self.fetch_many(query)
76            .try_filter_map(|step| async move {
77                Ok(match step {
78                    Either::Left(rows) => Some(rows),
79                    Either::Right(_) => None,
80                })
81            })
82            .boxed()
83    }
84
85    fn fetch<'e, 'q, E>(
86        self,
87        mut query: E,
88    ) -> BoxStream<'e, SqlxResult<<Self::Database as Database>::Row>>
89    where
90        'q: 'e,
91        'c: 'e,
92        E: 'q + Execute<'q, Self::Database>,
93    {
94        let sql = query.sql();
95        let persist = query.persistent();
96        let logger = QueryLogger::new(sql, self.log_settings.clone());
97        let arguments = match query.take_arguments().map_err(SqlxError::Encode) {
98            Ok(a) => a,
99            Err(e) => return Box::pin(ready(Err(e)).into_stream()),
100        };
101
102        let filter_fn = |step| async move {
103            Ok(match step {
104                Either::Left(_) => None,
105                Either::Right(row) => Some(row),
106            })
107        };
108
109        if let Some(arguments) = arguments {
110            let future = ExecutePrepared::new(sql, persist, arguments);
111            Box::pin(ResultStream::new(&mut self.ws, logger, future).try_filter_map(filter_fn))
112        } else {
113            let future = future::Execute::new(sql);
114            Box::pin(ResultStream::new(&mut self.ws, logger, future).try_filter_map(filter_fn))
115        }
116    }
117
118    fn fetch_many<'e, 'q, E>(
119        self,
120        mut query: E,
121    ) -> BoxStream<
122        'e,
123        SqlxResult<
124            Either<<Self::Database as Database>::QueryResult, <Self::Database as Database>::Row>,
125        >,
126    >
127    where
128        'q: 'e,
129        'c: 'e,
130        E: 'q + Execute<'q, Self::Database>,
131    {
132        let sql = query.sql();
133        let persist = query.persistent();
134        let logger = QueryLogger::new(sql, self.log_settings.clone());
135        let arguments = match query.take_arguments().map_err(SqlxError::Encode) {
136            Ok(a) => a,
137            Err(e) => return Box::pin(ready(Err(e)).into_stream()),
138        };
139
140        if let Some(arguments) = arguments {
141            let future = ExecutePrepared::new(sql, persist, arguments);
142            Box::pin(ResultStream::new(&mut self.ws, logger, future))
143        } else {
144            let future = ExecuteBatch::new(sql);
145            Box::pin(ResultStream::new(&mut self.ws, logger, future))
146        }
147    }
148
149    fn fetch_optional<'e, 'q, E>(
150        self,
151        query: E,
152    ) -> BoxFuture<'e, SqlxResult<Option<<Self::Database as Database>::Row>>>
153    where
154        'q: 'e,
155        'c: 'e,
156        E: 'q + Execute<'q, Self::Database>,
157    {
158        let mut s = self.fetch_many(query);
159
160        Box::pin(async move {
161            while let Some(v) = s.try_next().await? {
162                if let Either::Right(r) = v {
163                    return Ok(Some(r));
164                }
165            }
166
167            Ok(None)
168        })
169    }
170
171    fn prepare_with<'e, 'q>(
172        self,
173        sql: &'q str,
174        _parameters: &'e [<Self::Database as Database>::TypeInfo],
175    ) -> BoxFuture<'e, SqlxResult<<Self::Database as Database>::Statement<'q>>>
176    where
177        'q: 'e,
178        'c: 'e,
179    {
180        Box::pin(async move {
181            let prepared = GetOrPrepare::new(sql, true).future(&mut self.ws).await?;
182
183            Ok(ExaStatement {
184                sql: Cow::Borrowed(sql),
185                metadata: ExaStatementMetadata::new(
186                    prepared.columns.clone(),
187                    prepared.parameters.clone(),
188                ),
189            })
190        })
191    }
192
193    /// Exasol does not provide nullability information, unfortunately.
194    fn describe<'e, 'q>(self, sql: &'q str) -> BoxFuture<'e, SqlxResult<Describe<Self::Database>>>
195    where
196        'q: 'e,
197        'c: 'e,
198    {
199        Box::pin(async move {
200            let DescribeStatement {
201                columns,
202                parameters,
203                ..
204            } = future::Describe::new(sql).future(&mut self.ws).await?;
205
206            let nullable = (0..columns.len()).map(|_| None).collect();
207
208            Ok(Describe {
209                parameters: Some(Either::Left(parameters)),
210                columns,
211                nullable,
212            })
213        })
214    }
215}