sqlx_core_oldapi/mssql/connection/
executor.rs1use crate::describe::Describe;
2use crate::error::Error;
3use crate::executor::{Execute, Executor};
4use crate::logger::QueryLogger;
5use crate::mssql::connection::prepare::prepare;
6use crate::mssql::protocol::col_meta_data::Flags;
7use crate::mssql::protocol::done::Status;
8use crate::mssql::protocol::message::Message;
9use crate::mssql::protocol::packet::PacketType;
10use crate::mssql::protocol::rpc::{OptionFlags, Procedure, RpcRequest};
11use crate::mssql::protocol::sql_batch::SqlBatch;
12use crate::mssql::{
13 Mssql, MssqlArguments, MssqlConnection, MssqlQueryResult, MssqlRow, MssqlStatement,
14 MssqlTypeInfo,
15};
16use either::Either;
17use futures_core::future::BoxFuture;
18use futures_core::stream::BoxStream;
19use futures_util::TryStreamExt;
20use std::borrow::Cow;
21use std::sync::Arc;
22
23impl MssqlConnection {
24 async fn run(&mut self, query: &str, arguments: Option<MssqlArguments>) -> Result<(), Error> {
25 self.stream.wait_until_ready().await?;
26 self.stream.pending_done_count += 1;
27
28 if let Some(mut arguments) = arguments {
29 let proc = Either::Right(Procedure::ExecuteSql);
30 let mut proc_args = MssqlArguments::default();
31
32 proc_args.add_unnamed(query);
34
35 if !arguments.data.is_empty() {
36 proc_args.add_unnamed(&*arguments.declarations);
39
40 proc_args.append(&mut arguments);
42 }
43
44 self.stream
45 .write_packet_and_flush(
46 PacketType::Rpc,
47 RpcRequest {
48 transaction_descriptor: self.stream.transaction_descriptor,
49 arguments: &proc_args,
50 procedure: proc,
51 options: OptionFlags::empty(),
52 },
53 )
54 .await?;
55 } else {
56 self.stream
57 .write_packet_and_flush(
58 PacketType::SqlBatch,
59 SqlBatch {
60 transaction_descriptor: self.stream.transaction_descriptor,
61 sql: query,
62 },
63 )
64 .await?;
65 }
66
67 Ok(())
68 }
69}
70
71impl<'c> Executor<'c> for &'c mut MssqlConnection {
72 type Database = Mssql;
73
74 fn fetch_many<'e, 'q: 'e, E>(
75 self,
76 mut query: E,
77 ) -> BoxStream<'e, Result<Either<MssqlQueryResult, MssqlRow>, Error>>
78 where
79 'c: 'e,
80 E: Execute<'q, Self::Database> + 'q,
81 {
82 let sql = query.sql();
83 let arguments = query.take_arguments();
84 let mut logger = QueryLogger::new(sql, self.log_settings.clone());
85
86 Box::pin(try_stream! {
87 self.run(sql, arguments).await?;
88
89 loop {
90 let message = self.stream.recv_message().await?;
91
92 match message {
93 Message::Row(row) => {
94 let columns = Arc::clone(&self.stream.columns);
95 let column_names = Arc::clone(&self.stream.column_names);
96
97 logger.increment_rows_returned();
98
99 r#yield!(Either::Right(MssqlRow { row, column_names, columns }));
100 }
101
102 Message::Done(done) | Message::DoneProc(done) => {
103 if !done.status.contains(Status::DONE_MORE) {
104 self.stream.handle_done(&done);
105 }
106
107 if done.status.contains(Status::DONE_COUNT) {
108 let rows_affected = done.affected_rows;
109 logger.increase_rows_affected(rows_affected);
110 r#yield!(Either::Left(MssqlQueryResult {
111 rows_affected,
112 }));
113 }
114
115 if !done.status.contains(Status::DONE_MORE) {
116 break;
117 }
118 }
119
120 Message::DoneInProc(done)
121 if done.status.contains(Status::DONE_COUNT) => {
122 let rows_affected = done.affected_rows;
123 logger.increase_rows_affected(rows_affected);
124 r#yield!(Either::Left(MssqlQueryResult {
125 rows_affected,
126 }));
127 }
128
129 _ => {}
130 }
131 }
132
133 Ok(())
134 })
135 }
136
137 fn fetch_optional<'e, 'q: 'e, E>(
138 self,
139 query: E,
140 ) -> BoxFuture<'e, Result<Option<MssqlRow>, Error>>
141 where
142 'c: 'e,
143 E: Execute<'q, Self::Database> + 'q,
144 {
145 let mut s = self.fetch_many(query);
146
147 Box::pin(async move {
148 while let Some(v) = s.try_next().await? {
149 if let Either::Right(r) = v {
150 return Ok(Some(r));
151 }
152 }
153
154 Ok(None)
155 })
156 }
157
158 fn prepare_with<'e, 'q: 'e>(
159 self,
160 sql: &'q str,
161 _parameters: &[MssqlTypeInfo],
162 ) -> BoxFuture<'e, Result<MssqlStatement<'q>, Error>>
163 where
164 'c: 'e,
165 {
166 Box::pin(async move {
167 let metadata = prepare(self, sql).await?;
168
169 Ok(MssqlStatement {
170 sql: Cow::Borrowed(sql),
171 metadata,
172 })
173 })
174 }
175
176 fn describe<'e, 'q: 'e>(
177 self,
178 sql: &'q str,
179 ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>>
180 where
181 'c: 'e,
182 {
183 Box::pin(async move {
184 let metadata = prepare(self, sql).await?;
185
186 let mut nullable = Vec::with_capacity(metadata.columns.len());
187
188 for col in metadata.columns.iter() {
189 nullable.push(Some(col.flags.contains(Flags::NULLABLE)));
190 }
191
192 Ok(Describe {
193 nullable,
194 columns: (metadata.columns).clone(),
195 parameters: None,
196 })
197 })
198 }
199}