Skip to main content

sqlx_core_oldapi/mssql/connection/
executor.rs

1use crate::describe::Describe;
2use crate::error::Error;
3use crate::executor::{Execute, Executor};
4use crate::logger::QueryLogger;
5use crate::mssql::connection::prepare::prepare;
6use crate::mssql::protocol::col_meta_data::Flags;
7use crate::mssql::protocol::done::Status;
8use crate::mssql::protocol::message::Message;
9use crate::mssql::protocol::packet::PacketType;
10use crate::mssql::protocol::rpc::{OptionFlags, Procedure, RpcRequest};
11use crate::mssql::protocol::sql_batch::SqlBatch;
12use crate::mssql::{
13    Mssql, MssqlArguments, MssqlConnection, MssqlQueryResult, MssqlRow, MssqlStatement,
14    MssqlTypeInfo,
15};
16use either::Either;
17use futures_core::future::BoxFuture;
18use futures_core::stream::BoxStream;
19use futures_util::TryStreamExt;
20use std::borrow::Cow;
21use std::sync::Arc;
22
23impl MssqlConnection {
24    async fn run(&mut self, query: &str, arguments: Option<MssqlArguments>) -> Result<(), Error> {
25        self.stream.wait_until_ready().await?;
26        self.stream.pending_done_count += 1;
27
28        if let Some(mut arguments) = arguments {
29            let proc = Either::Right(Procedure::ExecuteSql);
30            let mut proc_args = MssqlArguments::default();
31
32            // SQL
33            proc_args.add_unnamed(query);
34
35            if !arguments.data.is_empty() {
36                // Declarations
37                //  NAME TYPE, NAME TYPE, ...
38                proc_args.add_unnamed(&*arguments.declarations);
39
40                // Add the list of SQL parameters _after_ our RPC parameters
41                proc_args.append(&mut arguments);
42            }
43
44            self.stream
45                .write_packet_and_flush(
46                    PacketType::Rpc,
47                    RpcRequest {
48                        transaction_descriptor: self.stream.transaction_descriptor,
49                        arguments: &proc_args,
50                        procedure: proc,
51                        options: OptionFlags::empty(),
52                    },
53                )
54                .await?;
55        } else {
56            self.stream
57                .write_packet_and_flush(
58                    PacketType::SqlBatch,
59                    SqlBatch {
60                        transaction_descriptor: self.stream.transaction_descriptor,
61                        sql: query,
62                    },
63                )
64                .await?;
65        }
66
67        Ok(())
68    }
69}
70
71impl<'c> Executor<'c> for &'c mut MssqlConnection {
72    type Database = Mssql;
73
74    fn fetch_many<'e, 'q: 'e, E>(
75        self,
76        mut query: E,
77    ) -> BoxStream<'e, Result<Either<MssqlQueryResult, MssqlRow>, Error>>
78    where
79        'c: 'e,
80        E: Execute<'q, Self::Database> + 'q,
81    {
82        let sql = query.sql();
83        let arguments = query.take_arguments();
84        let mut logger = QueryLogger::new(sql, self.log_settings.clone());
85
86        Box::pin(try_stream! {
87            self.run(sql, arguments).await?;
88
89            loop {
90                let message = self.stream.recv_message().await?;
91
92                match message {
93                    Message::Row(row) => {
94                        let columns = Arc::clone(&self.stream.columns);
95                        let column_names = Arc::clone(&self.stream.column_names);
96
97                        logger.increment_rows_returned();
98
99                        r#yield!(Either::Right(MssqlRow { row, column_names, columns }));
100                    }
101
102                    Message::Done(done) | Message::DoneProc(done) => {
103                        if !done.status.contains(Status::DONE_MORE) {
104                            self.stream.handle_done(&done);
105                        }
106
107                        if done.status.contains(Status::DONE_COUNT) {
108                            let rows_affected = done.affected_rows;
109                            logger.increase_rows_affected(rows_affected);
110                            r#yield!(Either::Left(MssqlQueryResult {
111                                rows_affected,
112                            }));
113                        }
114
115                        if !done.status.contains(Status::DONE_MORE) {
116                            break;
117                        }
118                    }
119
120                    Message::DoneInProc(done)
121                        if done.status.contains(Status::DONE_COUNT) => {
122                            let rows_affected = done.affected_rows;
123                            logger.increase_rows_affected(rows_affected);
124                            r#yield!(Either::Left(MssqlQueryResult {
125                                rows_affected,
126                            }));
127                        }
128
129                    _ => {}
130                }
131            }
132
133            Ok(())
134        })
135    }
136
137    fn fetch_optional<'e, 'q: 'e, E>(
138        self,
139        query: E,
140    ) -> BoxFuture<'e, Result<Option<MssqlRow>, Error>>
141    where
142        'c: 'e,
143        E: Execute<'q, Self::Database> + 'q,
144    {
145        let mut s = self.fetch_many(query);
146
147        Box::pin(async move {
148            while let Some(v) = s.try_next().await? {
149                if let Either::Right(r) = v {
150                    return Ok(Some(r));
151                }
152            }
153
154            Ok(None)
155        })
156    }
157
158    fn prepare_with<'e, 'q: 'e>(
159        self,
160        sql: &'q str,
161        _parameters: &[MssqlTypeInfo],
162    ) -> BoxFuture<'e, Result<MssqlStatement<'q>, Error>>
163    where
164        'c: 'e,
165    {
166        Box::pin(async move {
167            let metadata = prepare(self, sql).await?;
168
169            Ok(MssqlStatement {
170                sql: Cow::Borrowed(sql),
171                metadata,
172            })
173        })
174    }
175
176    fn describe<'e, 'q: 'e>(
177        self,
178        sql: &'q str,
179    ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>>
180    where
181        'c: 'e,
182    {
183        Box::pin(async move {
184            let metadata = prepare(self, sql).await?;
185
186            let mut nullable = Vec::with_capacity(metadata.columns.len());
187
188            for col in metadata.columns.iter() {
189                nullable.push(Some(col.flags.contains(Flags::NULLABLE)));
190            }
191
192            Ok(Describe {
193                nullable,
194                columns: (metadata.columns).clone(),
195                parameters: None,
196            })
197        })
198    }
199}