zero_mysql/protocol/command/
query.rs1use crate::buffer::BufferSet;
2use crate::constant::CommandByte;
3use crate::error::{Error, Result, eyre};
4use crate::protocol::TextRowPayload;
5use crate::protocol::command::ColumnDefinitions;
6use crate::protocol::primitive::*;
7use crate::protocol::response::{ErrPayloadBytes, OkPayloadBytes};
8
9const MAX_PAYLOAD_LENGTH: usize = (1 << 24) - 4;
10
11pub fn write_query(out: &mut Vec<u8>, sql: &str) {
13 write_int_1(out, CommandByte::Query as u8);
14 out.extend_from_slice(sql.as_bytes());
15}
16
17pub fn read_query_response(payload: &[u8]) -> Result<QueryResponse<'_>> {
24 if payload.is_empty() {
25 return Err(Error::LibraryBug(eyre!("read_query_response: empty payload")));
26 }
27
28 match payload[0] {
29 0xFF => Err(ErrPayloadBytes(payload).into()),
30 0x00 => Ok(QueryResponse::Ok(OkPayloadBytes(payload))),
31 0xFB => Err(Error::BadConfigError(
32 "LOCAL INFILE queries are not yet supported".to_string(),
33 )),
34 _ => {
35 let (column_count, _rest) = read_int_lenenc(payload)?;
36 Ok(QueryResponse::ResultSet { column_count })
37 }
38 }
39}
40
41#[derive(Debug)]
43pub enum QueryResponse<'a> {
44 Ok(OkPayloadBytes<'a>),
45 ResultSet { column_count: u64 },
46}
47
48use crate::protocol::r#trait::TextResultSetHandler;
53
54enum QueryState {
56 Start,
58 ReadingFirstPacket,
60 ReadingColumns { num_columns: usize },
62 ReadingRows,
64 Finished,
66}
67
68pub struct Query<'h, H> {
73 state: QueryState,
74 handler: &'h mut H,
75 column_defs: Option<ColumnDefinitions>,
76}
77
78impl<'h, H: TextResultSetHandler> Query<'h, H> {
79 pub fn new(handler: &'h mut H) -> Self {
81 Self {
82 state: QueryState::Start,
83 handler,
84 column_defs: None,
85 }
86 }
87
88 pub fn step<'buf>(
97 &mut self,
98 buffer_set: &'buf mut BufferSet,
99 ) -> Result<crate::protocol::command::Action<'buf>> {
100 use crate::protocol::command::Action;
101 match &mut self.state {
102 QueryState::Start => {
103 self.state = QueryState::ReadingFirstPacket;
105 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
106 }
107
108 QueryState::ReadingFirstPacket => {
109 let payload = &buffer_set.read_buffer[..];
110 let response = read_query_response(payload)?;
111
112 match response {
113 QueryResponse::Ok(ok_bytes) => {
114 use crate::constant::ServerStatusFlags;
116 use crate::protocol::response::OkPayload;
117
118 let ok_payload = OkPayload::try_from(ok_bytes)?;
119 self.handler.no_result_set(ok_bytes)?;
120
121 if ok_payload
123 .status_flags
124 .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
125 {
126 self.state = QueryState::ReadingFirstPacket;
128 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
129 } else {
130 self.state = QueryState::Finished;
132 Ok(Action::Finished)
133 }
134 }
135 QueryResponse::ResultSet { column_count } => {
136 let num_columns = column_count as usize;
137 self.state = QueryState::ReadingColumns { num_columns };
138 Ok(Action::ReadColumnMetadata { num_columns })
139 }
140 }
141 }
142
143 QueryState::ReadingColumns { num_columns } => {
144 let column_defs = ColumnDefinitions::new(
147 *num_columns,
148 std::mem::take(&mut buffer_set.column_definition_buffer),
149 )?;
150
151 self.handler.resultset_start(column_defs.definitions())?;
152 self.column_defs = Some(column_defs);
153 self.state = QueryState::ReadingRows;
154 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
155 }
156
157 QueryState::ReadingRows => {
158 let payload = &buffer_set.read_buffer[..];
159 match payload.first() {
166 Some(0xFF) => Err(ErrPayloadBytes(payload))?,
167 Some(0xFE) if payload.len() != MAX_PAYLOAD_LENGTH => {
168 use crate::constant::ServerStatusFlags;
170 use crate::protocol::response::OkPayload;
171
172 let ok_bytes = OkPayloadBytes(payload);
173 let ok_payload = OkPayload::try_from(ok_bytes)?;
174 self.handler.resultset_end(ok_bytes)?;
175
176 if ok_payload
178 .status_flags
179 .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
180 {
181 self.state = QueryState::ReadingFirstPacket;
183 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
184 } else {
185 self.state = QueryState::Finished;
187 Ok(Action::Finished)
188 }
189 }
190 _ => {
191 let cols = self.column_defs.as_ref().ok_or_else(|| {
192 Error::LibraryBug(eyre!("no column definitions while reading rows"))
193 })?;
194 let row = TextRowPayload(payload);
195 self.handler.row(cols.definitions(), row)?;
196 Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
197 }
198 }
199 }
200
201 QueryState::Finished => Err(Error::LibraryBug(eyre!(
202 "Query::step called after finished"
203 ))),
204 }
205 }
206}