zero_mysql/protocol/command/
query.rs1use crate::buffer::BufferSet;
2use crate::constant::CommandByte;
3use crate::error::{Error, Result, eyre};
4use crate::protocol::TextRowPayload;
5use crate::protocol::command::ColumnDefinitions;
6use crate::protocol::primitive::*;
7use crate::protocol::response::{ErrPayloadBytes, OkPayloadBytes};
8
9const MAX_PAYLOAD_LENGTH: usize = (1 << 24) - 4;
10
11pub fn write_query(out: &mut Vec<u8>, sql: &str) {
13 write_int_1(out, CommandByte::Query as u8);
14 out.extend_from_slice(sql.as_bytes());
15}
16
17pub fn read_query_response(payload: &[u8]) -> Result<QueryResponse<'_>> {
24 if payload.is_empty() {
25 return Err(Error::LibraryBug(eyre!(
26 "read_query_response: empty payload"
27 )));
28 }
29
30 match payload[0] {
31 0xFF => Err(ErrPayloadBytes(payload).into()),
32 0x00 => Ok(QueryResponse::Ok(OkPayloadBytes(payload))),
33 0xFB => Err(Error::BadUsageError(
34 "LOCAL INFILE queries are not yet supported".to_string(),
35 )),
36 _ => {
37 let (column_count, _rest) = read_int_lenenc(payload)?;
38 Ok(QueryResponse::ResultSet { column_count })
39 }
40 }
41}
42
43#[derive(Debug)]
45pub enum QueryResponse<'a> {
46 Ok(OkPayloadBytes<'a>),
47 ResultSet { column_count: u64 },
48}
49
50use crate::protocol::r#trait::TextResultSetHandler;
55
56enum QueryState {
58 Start,
60 ReadingFirstPacket,
62 ReadingColumns { num_columns: usize },
64 ReadingRows,
66 Finished,
68}
69
70pub struct Query<'h, H> {
75 state: QueryState,
76 handler: &'h mut H,
77 column_defs: Option<ColumnDefinitions>,
78}
79
80impl<'h, H: TextResultSetHandler> Query<'h, H> {
81 pub fn new(handler: &'h mut H) -> Self {
83 Self {
84 state: QueryState::Start,
85 handler,
86 column_defs: None,
87 }
88 }
89
90 pub fn step<'buf>(
99 &mut self,
100 buffer_set: &'buf mut BufferSet,
101 ) -> Result<crate::protocol::command::Action<'buf>> {
102 use crate::protocol::command::Action;
103 match &mut self.state {
104 QueryState::Start => {
105 self.state = QueryState::ReadingFirstPacket;
107 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
108 }
109
110 QueryState::ReadingFirstPacket => {
111 let payload = &buffer_set.read_buffer[..];
112 let response = read_query_response(payload)?;
113
114 match response {
115 QueryResponse::Ok(ok_bytes) => {
116 use crate::constant::ServerStatusFlags;
118 use crate::protocol::response::OkPayload;
119
120 let ok_payload = OkPayload::try_from(ok_bytes)?;
121 self.handler.no_result_set(ok_bytes)?;
122
123 if ok_payload
125 .status_flags
126 .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
127 {
128 self.state = QueryState::ReadingFirstPacket;
130 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
131 } else {
132 self.state = QueryState::Finished;
134 Ok(Action::Finished)
135 }
136 }
137 QueryResponse::ResultSet { column_count } => {
138 let num_columns = column_count as usize;
139 self.state = QueryState::ReadingColumns { num_columns };
140 Ok(Action::ReadColumnMetadata { num_columns })
141 }
142 }
143 }
144
145 QueryState::ReadingColumns { num_columns } => {
146 let column_defs = ColumnDefinitions::new(
149 *num_columns,
150 std::mem::take(&mut buffer_set.column_definition_buffer),
151 )?;
152
153 self.handler.resultset_start(column_defs.definitions())?;
154 self.column_defs = Some(column_defs);
155 self.state = QueryState::ReadingRows;
156 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
157 }
158
159 QueryState::ReadingRows => {
160 let payload = &buffer_set.read_buffer[..];
161 match payload.first() {
168 Some(0xFF) => Err(ErrPayloadBytes(payload))?,
169 Some(0xFE) if payload.len() != MAX_PAYLOAD_LENGTH => {
170 use crate::constant::ServerStatusFlags;
172 use crate::protocol::response::OkPayload;
173
174 let ok_bytes = OkPayloadBytes(payload);
175 let ok_payload = OkPayload::try_from(ok_bytes)?;
176 self.handler.resultset_end(ok_bytes)?;
177
178 if ok_payload
180 .status_flags
181 .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
182 {
183 self.state = QueryState::ReadingFirstPacket;
185 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
186 } else {
187 self.state = QueryState::Finished;
189 Ok(Action::Finished)
190 }
191 }
192 _ => {
193 let cols = self.column_defs.as_ref().ok_or_else(|| {
194 Error::LibraryBug(eyre!("no column definitions while reading rows"))
195 })?;
196 let row = TextRowPayload(payload);
197 self.handler.row(cols.definitions(), row)?;
198 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
199 }
200 }
201 }
202
203 QueryState::Finished => Err(Error::LibraryBug(eyre!(
204 "Query::step called after finished"
205 ))),
206 }
207 }
208}