// sqlx_core_oldapi/mssql/connection/executor.rs
use crate::describe::Describe;
2use crate::error::Error;
3use crate::executor::{Execute, Executor};
4use crate::logger::QueryLogger;
5use crate::mssql::connection::prepare::prepare;
6use crate::mssql::protocol::col_meta_data::Flags;
7use crate::mssql::protocol::done::Status;
8use crate::mssql::protocol::message::Message;
9use crate::mssql::protocol::packet::PacketType;
10use crate::mssql::protocol::rpc::{OptionFlags, Procedure, RpcRequest};
11use crate::mssql::protocol::sql_batch::SqlBatch;
12use crate::mssql::{
13 Mssql, MssqlArguments, MssqlConnection, MssqlQueryResult, MssqlRow, MssqlStatement,
14 MssqlTypeInfo,
15};
16use either::Either;
17use futures_core::future::BoxFuture;
18use futures_core::stream::BoxStream;
19use futures_util::TryStreamExt;
20use std::borrow::Cow;
21use std::sync::Arc;
22
23impl MssqlConnection {
24 async fn run(&mut self, query: &str, arguments: Option<MssqlArguments>) -> Result<(), Error> {
25 self.stream.wait_until_ready().await?;
26 self.stream.pending_done_count += 1;
27
28 if let Some(mut arguments) = arguments {
29 let proc = Either::Right(Procedure::ExecuteSql);
30 let mut proc_args = MssqlArguments::default();
31
32 proc_args.add_unnamed(query);
34
35 if !arguments.data.is_empty() {
36 proc_args.add_unnamed(&*arguments.declarations);
39
40 proc_args.append(&mut arguments);
42 }
43
44 self.stream
45 .write_packet_and_flush(
46 PacketType::Rpc,
47 RpcRequest {
48 transaction_descriptor: self.stream.transaction_descriptor,
49 arguments: &proc_args,
50 procedure: proc,
51 options: OptionFlags::empty(),
52 },
53 )
54 .await?;
55 } else {
56 self.stream
57 .write_packet_and_flush(
58 PacketType::SqlBatch,
59 SqlBatch {
60 transaction_descriptor: self.stream.transaction_descriptor,
61 sql: query,
62 },
63 )
64 .await?;
65 }
66
67 Ok(())
68 }
69}
70
71impl<'c> Executor<'c> for &'c mut MssqlConnection {
72 type Database = Mssql;
73
74 fn fetch_many<'e, 'q: 'e, E: 'q>(
75 self,
76 mut query: E,
77 ) -> BoxStream<'e, Result<Either<MssqlQueryResult, MssqlRow>, Error>>
78 where
79 'c: 'e,
80 E: Execute<'q, Self::Database>,
81 {
82 let sql = query.sql();
83 let arguments = query.take_arguments();
84 let mut logger = QueryLogger::new(sql, self.log_settings.clone());
85
86 Box::pin(try_stream! {
87 self.run(sql, arguments).await?;
88
89 loop {
90 let message = self.stream.recv_message().await?;
91
92 match message {
93 Message::Row(row) => {
94 let columns = Arc::clone(&self.stream.columns);
95 let column_names = Arc::clone(&self.stream.column_names);
96
97 logger.increment_rows_returned();
98
99 r#yield!(Either::Right(MssqlRow { row, column_names, columns }));
100 }
101
102 Message::Done(done) | Message::DoneProc(done) => {
103 if !done.status.contains(Status::DONE_MORE) {
104 self.stream.handle_done(&done);
105 }
106
107 if done.status.contains(Status::DONE_COUNT) {
108 let rows_affected = done.affected_rows;
109 logger.increase_rows_affected(rows_affected);
110 r#yield!(Either::Left(MssqlQueryResult {
111 rows_affected,
112 }));
113 }
114
115 if !done.status.contains(Status::DONE_MORE) {
116 break;
117 }
118 }
119
120 Message::DoneInProc(done) => {
121 if done.status.contains(Status::DONE_COUNT) {
122 let rows_affected = done.affected_rows;
123 logger.increase_rows_affected(rows_affected);
124 r#yield!(Either::Left(MssqlQueryResult {
125 rows_affected,
126 }));
127 }
128 }
129
130 _ => {}
131 }
132 }
133
134 Ok(())
135 })
136 }
137
138 fn fetch_optional<'e, 'q: 'e, E: 'q>(
139 self,
140 query: E,
141 ) -> BoxFuture<'e, Result<Option<MssqlRow>, Error>>
142 where
143 'c: 'e,
144 E: Execute<'q, Self::Database>,
145 {
146 let mut s = self.fetch_many(query);
147
148 Box::pin(async move {
149 while let Some(v) = s.try_next().await? {
150 if let Either::Right(r) = v {
151 return Ok(Some(r));
152 }
153 }
154
155 Ok(None)
156 })
157 }
158
159 fn prepare_with<'e, 'q: 'e>(
160 self,
161 sql: &'q str,
162 _parameters: &[MssqlTypeInfo],
163 ) -> BoxFuture<'e, Result<MssqlStatement<'q>, Error>>
164 where
165 'c: 'e,
166 {
167 Box::pin(async move {
168 let metadata = prepare(self, sql).await?;
169
170 Ok(MssqlStatement {
171 sql: Cow::Borrowed(sql),
172 metadata,
173 })
174 })
175 }
176
177 fn describe<'e, 'q: 'e>(
178 self,
179 sql: &'q str,
180 ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>>
181 where
182 'c: 'e,
183 {
184 Box::pin(async move {
185 let metadata = prepare(self, sql).await?;
186
187 let mut nullable = Vec::with_capacity(metadata.columns.len());
188
189 for col in metadata.columns.iter() {
190 nullable.push(Some(col.flags.contains(Flags::NULLABLE)));
191 }
192
193 Ok(Describe {
194 nullable,
195 columns: (metadata.columns).clone(),
196 parameters: None,
197 })
198 })
199 }
200}