zero_mysql/protocol/command/
prepared.rs1use crate::buffer::BufferSet;
2use crate::constant::CommandByte;
3use crate::error::{Error, Result, eyre};
4use crate::protocol::BinaryRowPayload;
5use crate::protocol::primitive::*;
6use crate::protocol::response::{ErrPayloadBytes, OkPayloadBytes};
7use crate::protocol::r#trait::param::Params;
8use zerocopy::byteorder::little_endian::{U16 as U16LE, U32 as U32LE};
9use zerocopy::{FromBytes, Immutable, KnownLayout};
10
11#[repr(C, packed)]
13#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable)]
14pub struct PrepareOk {
15 statement_id: U32LE,
16 num_columns: U16LE,
17 num_params: U16LE,
18 _reserved: u8,
19 warning_count: U16LE, }
21
22impl PrepareOk {
23 pub fn statement_id(&self) -> u32 {
25 self.statement_id.get()
26 }
27
28 pub fn num_columns(&self) -> u16 {
30 self.num_columns.get()
31 }
32
33 pub fn num_params(&self) -> u16 {
35 self.num_params.get()
36 }
37
38 pub fn warning_count(&self) -> u16 {
40 self.warning_count.get()
41 }
42}
43
44pub fn write_prepare(out: &mut Vec<u8>, sql: &str) {
46 write_int_1(out, CommandByte::StmtPrepare as u8);
47 out.extend_from_slice(sql.as_bytes());
48}
49
50pub fn read_prepare_ok(payload: &[u8]) -> Result<&PrepareOk> {
52 let (status, data) = read_int_1(payload)?;
53 debug_assert_eq!(status, 0x00);
54 Ok(PrepareOk::ref_from_bytes(&data[..11])?)
55}
56
57pub fn write_execute<P: Params>(out: &mut Vec<u8>, statement_id: u32, params: P) -> Result<()> {
59 write_int_1(out, CommandByte::StmtExecute as u8);
60 write_int_4(out, statement_id);
61
62 write_int_1(out, 0x00);
64
65 write_int_4(out, 1);
67
68 let num_params = params.len();
69
70 if num_params > 0 {
71 params.encode_null_bitmap(out);
73
74 let send_types_to_server = true;
76 if send_types_to_server {
77 write_int_1(out, 0x01);
78 params.encode_types(out);
79 } else {
80 write_int_1(out, 0x00);
81 }
82
83 params.encode_values(out)?; }
85 Ok(())
86}
87
88pub fn read_execute_response(payload: &[u8], cache_metadata: bool) -> Result<ExecuteResponse<'_>> {
91 if payload.is_empty() {
92 return Err(Error::LibraryBug(eyre!(
93 "read_execute_response: empty payload"
94 )));
95 }
96
97 match payload[0] {
98 0x00 => Ok(ExecuteResponse::Ok(OkPayloadBytes(payload))),
99 0xFF => Err(ErrPayloadBytes(payload).into()),
100 _ => {
101 let (column_count, rest) = read_int_lenenc(payload)?;
102
103 let has_column_metadata = if cache_metadata {
105 if rest.is_empty() {
106 return Err(Error::LibraryBug(eyre!(
107 "read_execute_response: missing metadata_follows flag"
108 )));
109 }
110 rest[0] != 0
111 } else {
112 true
114 };
115
116 Ok(ExecuteResponse::ResultSet {
117 column_count,
118 has_column_metadata,
119 })
120 }
121 }
122}
123
124#[derive(Debug)]
126pub enum ExecuteResponse<'a> {
127 Ok(OkPayloadBytes<'a>),
128 ResultSet {
129 column_count: u64,
130 has_column_metadata: bool,
131 },
132}
133
134pub fn read_binary_row<'a>(payload: &'a [u8], num_columns: usize) -> Result<BinaryRowPayload<'a>> {
136 crate::protocol::command::resultset::read_binary_row(payload, num_columns)
137}
138
139pub fn write_close_statement(out: &mut Vec<u8>, statement_id: u32) {
141 write_int_1(out, CommandByte::StmtClose as u8);
142 write_int_4(out, statement_id);
143}
144
145pub fn write_reset_statement(out: &mut Vec<u8>, statement_id: u32) {
147 write_int_1(out, CommandByte::StmtReset as u8);
148 write_int_4(out, statement_id);
149}
150
151use crate::PreparedStatement;
156use crate::protocol::command::ColumnDefinitions;
157use crate::protocol::r#trait::BinaryResultSetHandler;
158
/// Progress of an [`Exec`] state machine through an execute response.
enum ExecState {
    /// Nothing has been read yet; the next step requests the first packet.
    Start,
    /// The read buffer holds the first packet of a response (or of the next
    /// result in a multi-result response).
    ReadingFirstPacket,
    /// Column definitions for a result set with `num_columns` columns have
    /// been requested and are processed on the next step.
    ReadingColumns { num_columns: usize },
    /// Row packets of a result set with `num_columns` columns are being read.
    ReadingRows { num_columns: usize },
    /// The response has been fully consumed; stepping again is an error.
    Finished,
}
172
173pub struct Exec<'h, 'stmt, H> {
178 state: ExecState,
179 handler: &'h mut H,
180 stmt: &'stmt mut PreparedStatement,
181 cache_metadata: bool,
182}
183
184impl<'h, 'stmt, H: BinaryResultSetHandler> Exec<'h, 'stmt, H> {
185 pub fn new(
187 handler: &'h mut H,
188 stmt: &'stmt mut PreparedStatement,
189 cache_metadata: bool,
190 ) -> Self {
191 Self {
192 state: ExecState::Start,
193 handler,
194 stmt,
195 cache_metadata,
196 }
197 }
198
199 pub fn step<'buf>(
208 &mut self,
209 buffer_set: &'buf mut BufferSet,
210 ) -> Result<crate::protocol::command::Action<'buf>> {
211 use crate::protocol::command::Action;
212 match &mut self.state {
213 ExecState::Start => {
214 self.state = ExecState::ReadingFirstPacket;
216 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
217 }
218
219 ExecState::ReadingFirstPacket => {
220 let payload = &buffer_set.read_buffer[..];
221 let response = read_execute_response(payload, self.cache_metadata)?;
222
223 match response {
224 ExecuteResponse::Ok(ok_bytes) => {
225 use crate::constant::ServerStatusFlags;
227 use crate::protocol::response::OkPayload;
228
229 let ok_payload = OkPayload::try_from(ok_bytes)?;
230 self.handler.no_result_set(ok_bytes)?;
231
232 if ok_payload
234 .status_flags
235 .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
236 {
237 self.state = ExecState::ReadingFirstPacket;
239 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
240 } else {
241 self.state = ExecState::Finished;
243 Ok(Action::Finished)
244 }
245 }
246 ExecuteResponse::ResultSet {
247 column_count,
248 has_column_metadata,
249 } => {
250 let num_columns = column_count as usize;
251
252 if has_column_metadata {
253 self.state = ExecState::ReadingColumns { num_columns };
255 Ok(Action::ReadColumnMetadata { num_columns })
256 } else {
257 if let Some(cols) = self.stmt.column_definitions() {
259 self.handler.resultset_start(cols)?;
260 self.state = ExecState::ReadingRows { num_columns };
261 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
262 } else {
263 Err(Error::LibraryBug(eyre!(
265 "no cached column definitions available"
266 )))
267 }
268 }
269 }
270 }
271 }
272
273 ExecState::ReadingColumns { num_columns } => {
274 let column_defs = ColumnDefinitions::new(
277 *num_columns,
278 std::mem::take(&mut buffer_set.column_definition_buffer),
279 )?;
280
281 self.handler.resultset_start(column_defs.definitions())?;
283 self.stmt.set_column_definitions(column_defs);
284
285 self.state = ExecState::ReadingRows {
287 num_columns: *num_columns,
288 };
289 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
290 }
291
292 ExecState::ReadingRows { num_columns } => {
293 let payload = &buffer_set.read_buffer[..];
294 match payload[0] {
295 0x00 => {
296 let row = read_binary_row(payload, *num_columns)?;
297 let cols = self.stmt.column_definitions().ok_or_else(|| {
298 Error::LibraryBug(eyre!("no column definitions while reading rows"))
299 })?;
300 self.handler.row(cols, row)?;
301 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
302 }
303 0xFE => {
304 use crate::constant::ServerStatusFlags;
306 use crate::protocol::response::OkPayload;
307
308 let eof_bytes = OkPayloadBytes(payload);
309 eof_bytes.assert_eof()?;
310 let ok_payload = OkPayload::try_from(eof_bytes)?;
311 self.handler.resultset_end(eof_bytes)?;
312
313 if ok_payload
315 .status_flags
316 .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
317 {
318 self.state = ExecState::ReadingFirstPacket;
320 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
321 } else {
322 self.state = ExecState::Finished;
324 Ok(Action::Finished)
325 }
326 }
327 header => Err(Error::LibraryBug(eyre!(
328 "unexpected row packet header: 0x{:02X}",
329 header
330 ))),
331 }
332 }
333
334 ExecState::Finished => {
335 Err(Error::LibraryBug(eyre!("Exec::step called after finished")))
336 }
337 }
338 }
339}