sqlx_core_oldapi/mssql/connection/
executor.rs

1use crate::describe::Describe;
2use crate::error::Error;
3use crate::executor::{Execute, Executor};
4use crate::logger::QueryLogger;
5use crate::mssql::connection::prepare::prepare;
6use crate::mssql::protocol::col_meta_data::Flags;
7use crate::mssql::protocol::done::Status;
8use crate::mssql::protocol::message::Message;
9use crate::mssql::protocol::packet::PacketType;
10use crate::mssql::protocol::rpc::{OptionFlags, Procedure, RpcRequest};
11use crate::mssql::protocol::sql_batch::SqlBatch;
12use crate::mssql::{
13    Mssql, MssqlArguments, MssqlConnection, MssqlQueryResult, MssqlRow, MssqlStatement,
14    MssqlTypeInfo,
15};
16use either::Either;
17use futures_core::future::BoxFuture;
18use futures_core::stream::BoxStream;
19use futures_util::TryStreamExt;
20use std::borrow::Cow;
21use std::sync::Arc;
22
23impl MssqlConnection {
24    async fn run(&mut self, query: &str, arguments: Option<MssqlArguments>) -> Result<(), Error> {
25        self.stream.wait_until_ready().await?;
26        self.stream.pending_done_count += 1;
27
28        if let Some(mut arguments) = arguments {
29            let proc = Either::Right(Procedure::ExecuteSql);
30            let mut proc_args = MssqlArguments::default();
31
32            // SQL
33            proc_args.add_unnamed(query);
34
35            if !arguments.data.is_empty() {
36                // Declarations
37                //  NAME TYPE, NAME TYPE, ...
38                proc_args.add_unnamed(&*arguments.declarations);
39
40                // Add the list of SQL parameters _after_ our RPC parameters
41                proc_args.append(&mut arguments);
42            }
43
44            self.stream
45                .write_packet_and_flush(
46                    PacketType::Rpc,
47                    RpcRequest {
48                        transaction_descriptor: self.stream.transaction_descriptor,
49                        arguments: &proc_args,
50                        procedure: proc,
51                        options: OptionFlags::empty(),
52                    },
53                )
54                .await?;
55        } else {
56            self.stream
57                .write_packet_and_flush(
58                    PacketType::SqlBatch,
59                    SqlBatch {
60                        transaction_descriptor: self.stream.transaction_descriptor,
61                        sql: query,
62                    },
63                )
64                .await?;
65        }
66
67        Ok(())
68    }
69}
70
71impl<'c> Executor<'c> for &'c mut MssqlConnection {
72    type Database = Mssql;
73
74    fn fetch_many<'e, 'q: 'e, E: 'q>(
75        self,
76        mut query: E,
77    ) -> BoxStream<'e, Result<Either<MssqlQueryResult, MssqlRow>, Error>>
78    where
79        'c: 'e,
80        E: Execute<'q, Self::Database>,
81    {
82        let sql = query.sql();
83        let arguments = query.take_arguments();
84        let mut logger = QueryLogger::new(sql, self.log_settings.clone());
85
86        Box::pin(try_stream! {
87            self.run(sql, arguments).await?;
88
89            loop {
90                let message = self.stream.recv_message().await?;
91
92                match message {
93                    Message::Row(row) => {
94                        let columns = Arc::clone(&self.stream.columns);
95                        let column_names = Arc::clone(&self.stream.column_names);
96
97                        logger.increment_rows_returned();
98
99                        r#yield!(Either::Right(MssqlRow { row, column_names, columns }));
100                    }
101
102                    Message::Done(done) | Message::DoneProc(done) => {
103                        if !done.status.contains(Status::DONE_MORE) {
104                            self.stream.handle_done(&done);
105                        }
106
107                        if done.status.contains(Status::DONE_COUNT) {
108                            let rows_affected = done.affected_rows;
109                            logger.increase_rows_affected(rows_affected);
110                            r#yield!(Either::Left(MssqlQueryResult {
111                                rows_affected,
112                            }));
113                        }
114
115                        if !done.status.contains(Status::DONE_MORE) {
116                            break;
117                        }
118                    }
119
120                    Message::DoneInProc(done) => {
121                        if done.status.contains(Status::DONE_COUNT) {
122                            let rows_affected = done.affected_rows;
123                            logger.increase_rows_affected(rows_affected);
124                            r#yield!(Either::Left(MssqlQueryResult {
125                                rows_affected,
126                            }));
127                        }
128                    }
129
130                    _ => {}
131                }
132            }
133
134            Ok(())
135        })
136    }
137
138    fn fetch_optional<'e, 'q: 'e, E: 'q>(
139        self,
140        query: E,
141    ) -> BoxFuture<'e, Result<Option<MssqlRow>, Error>>
142    where
143        'c: 'e,
144        E: Execute<'q, Self::Database>,
145    {
146        let mut s = self.fetch_many(query);
147
148        Box::pin(async move {
149            while let Some(v) = s.try_next().await? {
150                if let Either::Right(r) = v {
151                    return Ok(Some(r));
152                }
153            }
154
155            Ok(None)
156        })
157    }
158
159    fn prepare_with<'e, 'q: 'e>(
160        self,
161        sql: &'q str,
162        _parameters: &[MssqlTypeInfo],
163    ) -> BoxFuture<'e, Result<MssqlStatement<'q>, Error>>
164    where
165        'c: 'e,
166    {
167        Box::pin(async move {
168            let metadata = prepare(self, sql).await?;
169
170            Ok(MssqlStatement {
171                sql: Cow::Borrowed(sql),
172                metadata,
173            })
174        })
175    }
176
177    fn describe<'e, 'q: 'e>(
178        self,
179        sql: &'q str,
180    ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>>
181    where
182        'c: 'e,
183    {
184        Box::pin(async move {
185            let metadata = prepare(self, sql).await?;
186
187            let mut nullable = Vec::with_capacity(metadata.columns.len());
188
189            for col in metadata.columns.iter() {
190                nullable.push(Some(col.flags.contains(Flags::NULLABLE)));
191            }
192
193            Ok(Describe {
194                nullable,
195                columns: (metadata.columns).clone(),
196                parameters: None,
197            })
198        })
199    }
200}