cdbc_mssql/connection/
executor.rs1use cdbc::describe::Describe;
2use cdbc::executor::{Execute, Executor};
3use crate::connection::prepare::prepare;
4use crate::protocol::col_meta_data::Flags;
5use crate::protocol::done::Status;
6use crate::protocol::message::Message;
7use crate::protocol::packet::PacketType;
8use crate::protocol::rpc::{OptionFlags, Procedure, RpcRequest};
9use crate::protocol::sql_batch::SqlBatch;
10use crate::{
11 Mssql, MssqlArguments, MssqlConnection, MssqlQueryResult, MssqlRow, MssqlStatement,
12 MssqlTypeInfo,
13};
14use either::Either;
15use std::borrow::Cow;
16use std::sync::Arc;
17use cdbc::Error;
18use cdbc::io::chan_stream::{ChanStream, TryStream};
19
20impl MssqlConnection {
21 fn run(&mut self, query: &str, arguments: Option<MssqlArguments>) -> Result<(), Error> {
22 self.stream.wait_until_ready()?;
23 self.stream.pending_done_count += 1;
24
25 if let Some(mut arguments) = arguments {
26 let proc = Either::Right(Procedure::ExecuteSql);
27 let mut proc_args = MssqlArguments::default();
28
29 proc_args.add_unnamed(query);
31
32 if !arguments.data.is_empty() {
33 proc_args.add_unnamed(&*arguments.declarations);
36
37 proc_args.append(&mut arguments);
39 }
40
41 self.stream.write_packet(
42 PacketType::Rpc,
43 RpcRequest {
44 transaction_descriptor: self.stream.transaction_descriptor,
45 arguments: &proc_args,
46 procedure: proc,
47 options: OptionFlags::empty(),
48 },
49 );
50 } else {
51 self.stream.write_packet(
52 PacketType::SqlBatch,
53 SqlBatch {
54 transaction_descriptor: self.stream.transaction_descriptor,
55 sql: query,
56 },
57 );
58 }
59
60 self.stream.flush()?;
61
62 Ok(())
63 }
64}
65
66impl Executor for MssqlConnection {
67 type Database = Mssql;
68
69 fn fetch_many<'q, E: 'q>(
70 &mut self,
71 mut query: E,
72 ) -> ChanStream<Either<MssqlQueryResult, MssqlRow>>
73 where
74 E: Execute<'q, Self::Database>,
75 {
76 let sql = query.sql().to_owned();
77 let arguments = query.take_arguments();
78 chan_stream! {
79 self.run(&sql, arguments)?;
80
81 loop {
82 let message = self.stream.recv_message()?;
83
84 match message {
85 Message::Row(row) => {
86 let columns = Arc::clone(&self.stream.columns);
87 let column_names = Arc::clone(&self.stream.column_names);
88
89 r#yield!(Either::Right(MssqlRow { row, column_names, columns }));
90 }
91
92 Message::Done(done) | Message::DoneProc(done) => {
93 if !done.status.contains(Status::DONE_MORE) {
94 self.stream.handle_done(&done);
95 }
96
97 if done.status.contains(Status::DONE_COUNT) {
98 r#yield!(Either::Left(MssqlQueryResult {
99 rows_affected: done.affected_rows,
100 }));
101 }
102
103 if !done.status.contains(Status::DONE_MORE) {
104 break;
105 }
106 }
107
108 Message::DoneInProc(done) => {
109 if done.status.contains(Status::DONE_COUNT) {
110 r#yield!(Either::Left(MssqlQueryResult {
111 rows_affected: done.affected_rows,
112 }));
113 }
114 }
115
116 _ => {}
117 }
118 }
119
120 Ok(())
121 }
122 }
123
124 fn fetch_optional<'q, E: 'q>(
125 &mut self,
126 query: E,
127 ) -> Result<Option<MssqlRow>, Error>
128 where
129 E: Execute<'q, Self::Database>,
130 {
131 let mut s = self.fetch_many(query);
132 while let Some(v) = s.try_next()? {
133 if let Either::Right(r) = v {
134 return Ok(Some(r));
135 }
136 }
137 Ok(None)
138 }
139
140 fn prepare_with<'q>(
141 &mut self,
142 sql: &'q str,
143 _parameters: &[MssqlTypeInfo],
144 ) -> Result<MssqlStatement, Error> {
145 let metadata = prepare(self, sql)?;
146
147 Ok(MssqlStatement {
148 sql: sql.to_string(),
149 metadata,
150 })
151 }
152
153 fn describe<'q>(
154 &mut self,
155 sql: &'q str,
156 ) -> Result<Describe<Self::Database>, Error>
157 {
158 let metadata = prepare(self, sql)?;
159
160 let mut nullable = Vec::with_capacity(metadata.columns.len());
161
162 for col in metadata.columns.iter() {
163 nullable.push(Some(col.flags.contains(Flags::NULLABLE)));
164 }
165
166 Ok(Describe {
167 nullable,
168 columns: (metadata.columns).clone(),
169 parameters: None,
170 })
171 }
172}