cdbc_mssql/connection/
executor.rs

1use cdbc::describe::Describe;
2use cdbc::executor::{Execute, Executor};
3use crate::connection::prepare::prepare;
4use crate::protocol::col_meta_data::Flags;
5use crate::protocol::done::Status;
6use crate::protocol::message::Message;
7use crate::protocol::packet::PacketType;
8use crate::protocol::rpc::{OptionFlags, Procedure, RpcRequest};
9use crate::protocol::sql_batch::SqlBatch;
10use crate::{
11    Mssql, MssqlArguments, MssqlConnection, MssqlQueryResult, MssqlRow, MssqlStatement,
12    MssqlTypeInfo,
13};
14use either::Either;
15use std::borrow::Cow;
16use std::sync::Arc;
17use cdbc::Error;
18use cdbc::io::chan_stream::{ChanStream, TryStream};
19
20impl MssqlConnection {
21    fn run(&mut self, query: &str, arguments: Option<MssqlArguments>) -> Result<(), Error> {
22        self.stream.wait_until_ready()?;
23        self.stream.pending_done_count += 1;
24
25        if let Some(mut arguments) = arguments {
26            let proc = Either::Right(Procedure::ExecuteSql);
27            let mut proc_args = MssqlArguments::default();
28
29            // SQL
30            proc_args.add_unnamed(query);
31
32            if !arguments.data.is_empty() {
33                // Declarations
34                //  NAME TYPE, NAME TYPE, ...
35                proc_args.add_unnamed(&*arguments.declarations);
36
37                // Add the list of SQL parameters _after_ our RPC parameters
38                proc_args.append(&mut arguments);
39            }
40
41            self.stream.write_packet(
42                PacketType::Rpc,
43                RpcRequest {
44                    transaction_descriptor: self.stream.transaction_descriptor,
45                    arguments: &proc_args,
46                    procedure: proc,
47                    options: OptionFlags::empty(),
48                },
49            );
50        } else {
51            self.stream.write_packet(
52                PacketType::SqlBatch,
53                SqlBatch {
54                    transaction_descriptor: self.stream.transaction_descriptor,
55                    sql: query,
56                },
57            );
58        }
59
60        self.stream.flush()?;
61
62        Ok(())
63    }
64}
65
66impl Executor for MssqlConnection {
67    type Database = Mssql;
68
69    fn fetch_many<'q, E: 'q>(
70        &mut self,
71        mut query: E,
72    ) -> ChanStream<Either<MssqlQueryResult, MssqlRow>>
73        where
74            E: Execute<'q, Self::Database>,
75    {
76        let sql = query.sql().to_owned();
77        let arguments = query.take_arguments();
78        chan_stream! {
79            self.run(&sql, arguments)?;
80
81            loop {
82                let message = self.stream.recv_message()?;
83
84                match message {
85                    Message::Row(row) => {
86                        let columns = Arc::clone(&self.stream.columns);
87                        let column_names = Arc::clone(&self.stream.column_names);
88
89                        r#yield!(Either::Right(MssqlRow { row, column_names, columns }));
90                    }
91
92                    Message::Done(done) | Message::DoneProc(done) => {
93                        if !done.status.contains(Status::DONE_MORE) {
94                            self.stream.handle_done(&done);
95                        }
96
97                        if done.status.contains(Status::DONE_COUNT) {
98                            r#yield!(Either::Left(MssqlQueryResult {
99                                rows_affected: done.affected_rows,
100                            }));
101                        }
102
103                        if !done.status.contains(Status::DONE_MORE) {
104                            break;
105                        }
106                    }
107
108                    Message::DoneInProc(done) => {
109                        if done.status.contains(Status::DONE_COUNT) {
110                            r#yield!(Either::Left(MssqlQueryResult {
111                                rows_affected: done.affected_rows,
112                            }));
113                        }
114                    }
115
116                    _ => {}
117                }
118            }
119
120            Ok(())
121        }
122    }
123
124    fn fetch_optional<'q, E: 'q>(
125        &mut self,
126        query: E,
127    ) ->  Result<Option<MssqlRow>, Error>
128        where
129            E: Execute<'q, Self::Database>,
130    {
131        let mut s = self.fetch_many(query);
132        while let Some(v) = s.try_next()? {
133            if let Either::Right(r) = v {
134                return Ok(Some(r));
135            }
136        }
137        Ok(None)
138    }
139
140    fn prepare_with<'q>(
141        &mut self,
142        sql: &'q str,
143        _parameters: &[MssqlTypeInfo],
144    ) -> Result<MssqlStatement, Error> {
145        let metadata = prepare(self, sql)?;
146
147        Ok(MssqlStatement {
148            sql: sql.to_string(),
149            metadata,
150        })
151    }
152
153    fn describe<'q>(
154        &mut self,
155        sql: &'q str,
156    ) -> Result<Describe<Self::Database>, Error>
157    {
158        let metadata = prepare(self, sql)?;
159
160        let mut nullable = Vec::with_capacity(metadata.columns.len());
161
162        for col in metadata.columns.iter() {
163            nullable.push(Some(col.flags.contains(Flags::NULLABLE)));
164        }
165
166        Ok(Describe {
167            nullable,
168            columns: (metadata.columns).clone(),
169            parameters: None,
170        })
171    }
172}