zero_mysql/protocol/command/
query.rs

1use crate::buffer::BufferSet;
2use crate::constant::CommandByte;
3use crate::error::{Error, Result, eyre};
4use crate::protocol::TextRowPayload;
5use crate::protocol::command::ColumnDefinitions;
6use crate::protocol::primitive::*;
7use crate::protocol::response::{ErrPayloadBytes, OkPayloadBytes};
8
9const MAX_PAYLOAD_LENGTH: usize = (1 << 24) - 4;
10
11/// Write COM_QUERY command
12pub fn write_query(out: &mut Vec<u8>, sql: &str) {
13    write_int_1(out, CommandByte::Query as u8);
14    out.extend_from_slice(sql.as_bytes());
15}
16
17/// Read COM_QUERY response
18/// This can be:
19/// - 0xFF: ERR packet (error occurred)
20/// - 0x00: OK packet (query succeeded without result set)
21/// - 0xFB: LOCAL INFILE packet (not yet supported)
22/// - Otherwise: Result set (first byte is column count as length-encoded integer)
23pub fn read_query_response(payload: &[u8]) -> Result<QueryResponse<'_>> {
24    if payload.is_empty() {
25        return Err(Error::LibraryBug(eyre!("read_query_response: empty payload")));
26    }
27
28    match payload[0] {
29        0xFF => Err(ErrPayloadBytes(payload).into()),
30        0x00 => Ok(QueryResponse::Ok(OkPayloadBytes(payload))),
31        0xFB => Err(Error::BadConfigError(
32            "LOCAL INFILE queries are not yet supported".to_string(),
33        )),
34        _ => {
35            let (column_count, _rest) = read_int_lenenc(payload)?;
36            Ok(QueryResponse::ResultSet { column_count })
37        }
38    }
39}
40
41/// Query response variants
42#[derive(Debug)]
43pub enum QueryResponse<'a> {
44    Ok(OkPayloadBytes<'a>),
45    ResultSet { column_count: u64 },
46}
47
48// ============================================================================
49// State Machine API for Query
50// ============================================================================
51
52use crate::protocol::r#trait::TextResultSetHandler;
53
54/// Internal state of the Query state machine
55enum QueryState {
56    /// Initial state - need to read first packet
57    Start,
58    /// Reading the first response packet
59    ReadingFirstPacket,
60    /// Reading column definitions
61    ReadingColumns { num_columns: usize },
62    /// Reading rows
63    ReadingRows,
64    /// Finished
65    Finished,
66}
67
68/// State machine for Query (text protocol) with integrated handler
69///
70/// The handler is provided at construction and called directly by the state machine.
71/// The `drive()` method returns actions indicating what I/O operation is needed next.
72pub struct Query<'h, H> {
73    state: QueryState,
74    handler: &'h mut H,
75    column_defs: Option<ColumnDefinitions>,
76}
77
78impl<'h, H: TextResultSetHandler> Query<'h, H> {
79    /// Create a new Query state machine with the given handler
80    pub fn new(handler: &'h mut H) -> Self {
81        Self {
82            state: QueryState::Start,
83            handler,
84            column_defs: None,
85        }
86    }
87
88    /// Drive the state machine forward
89    ///
90    /// # Arguments
91    /// * `buffer_set` - The buffer set containing buffers to read from/write to
92    ///
93    /// # Returns
94    /// * `Action::NeedPacket(&mut Vec<u8>)` - Needs more data in the specified buffer
95    /// * `Action::Finished` - Processing complete
96    pub fn step<'buf>(
97        &mut self,
98        buffer_set: &'buf mut BufferSet,
99    ) -> Result<crate::protocol::command::Action<'buf>> {
100        use crate::protocol::command::Action;
101        match &mut self.state {
102            QueryState::Start => {
103                // Request the first packet
104                self.state = QueryState::ReadingFirstPacket;
105                Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
106            }
107
108            QueryState::ReadingFirstPacket => {
109                let payload = &buffer_set.read_buffer[..];
110                let response = read_query_response(payload)?;
111
112                match response {
113                    QueryResponse::Ok(ok_bytes) => {
114                        // Parse OK packet to check status flags
115                        use crate::constant::ServerStatusFlags;
116                        use crate::protocol::response::OkPayload;
117
118                        let ok_payload = OkPayload::try_from(ok_bytes)?;
119                        self.handler.no_result_set(ok_bytes)?;
120
121                        // Check if there are more results to come
122                        if ok_payload
123                            .status_flags
124                            .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
125                        {
126                            // More resultsets coming, go to ReadingFirstPacket to process next result
127                            self.state = QueryState::ReadingFirstPacket;
128                            Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
129                        } else {
130                            // No more results, we're done
131                            self.state = QueryState::Finished;
132                            Ok(Action::Finished)
133                        }
134                    }
135                    QueryResponse::ResultSet { column_count } => {
136                        let num_columns = column_count as usize;
137                        self.state = QueryState::ReadingColumns { num_columns };
138                        Ok(Action::ReadColumnMetadata { num_columns })
139                    }
140                }
141            }
142
143            QueryState::ReadingColumns { num_columns } => {
144                // Parse all column definitions from the buffer
145                // The buffer contains [len(u32)][payload][len(u32)][payload]...
146                let column_defs = ColumnDefinitions::new(
147                    *num_columns,
148                    std::mem::take(&mut buffer_set.column_definition_buffer),
149                )?;
150
151                self.handler.resultset_start(column_defs.definitions())?;
152                self.column_defs = Some(column_defs);
153                self.state = QueryState::ReadingRows;
154                Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
155            }
156
157            QueryState::ReadingRows => {
158                let payload = &buffer_set.read_buffer[..];
159                // A valid row's first item is NULL (0xFB) or string<lenenc>.
160                // string<lenenc> starts with int<lenenc> which cannot start with 0xFF (ErrPacket header).
161                // Hence, 0xFF always means Err.
162                //
163                // Similarly, string<lenenc> starting with 0xFE means that the length of a string is at least 2^24, which means the packet is of the size 2^24.
164                // The Ok-Packet for EOF cannot be this long, therefore 0xFE with payload.len() determines the payload length.
165                match payload.first() {
166                    Some(0xFF) => Err(ErrPayloadBytes(payload))?,
167                    Some(0xFE) if payload.len() != MAX_PAYLOAD_LENGTH => {
168                        // Parse OK packet to check status flags
169                        use crate::constant::ServerStatusFlags;
170                        use crate::protocol::response::OkPayload;
171
172                        let ok_bytes = OkPayloadBytes(payload);
173                        let ok_payload = OkPayload::try_from(ok_bytes)?;
174                        self.handler.resultset_end(ok_bytes)?;
175
176                        // Check if there are more results to come
177                        if ok_payload
178                            .status_flags
179                            .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
180                        {
181                            // More resultsets coming, go to ReadingFirstPacket to process next result
182                            self.state = QueryState::ReadingFirstPacket;
183                            Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
184                        } else {
185                            // No more results, we're done
186                            self.state = QueryState::Finished;
187                            Ok(Action::Finished)
188                        }
189                    }
190                    _ => {
191                        let cols = self.column_defs.as_ref().ok_or_else(|| {
192                            Error::LibraryBug(eyre!("no column definitions while reading rows"))
193                        })?;
194                        let row = TextRowPayload(payload);
195                        self.handler.row(cols.definitions(), row)?;
196                        Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
197                    }
198                }
199            }
200
201            QueryState::Finished => Err(Error::LibraryBug(eyre!(
202                "Query::step called after finished"
203            ))),
204        }
205    }
206}