zero_mysql/protocol/command/
query.rs

1use crate::buffer::BufferSet;
2use crate::constant::CommandByte;
3use crate::error::{Error, Result, eyre};
4use crate::protocol::TextRowPayload;
5use crate::protocol::command::ColumnDefinitions;
6use crate::protocol::primitive::*;
7use crate::protocol::response::{ErrPayloadBytes, OkPayloadBytes};
8
9const MAX_PAYLOAD_LENGTH: usize = (1 << 24) - 4;
10
11/// Write COM_QUERY command
12pub fn write_query(out: &mut Vec<u8>, sql: &str) {
13    write_int_1(out, CommandByte::Query as u8);
14    out.extend_from_slice(sql.as_bytes());
15}
16
17/// Read COM_QUERY response
18/// This can be:
19/// - 0xFF: ERR packet (error occurred)
20/// - 0x00: OK packet (query succeeded without result set)
21/// - 0xFB: LOCAL INFILE packet (not yet supported)
22/// - Otherwise: Result set (first byte is column count as length-encoded integer)
23pub fn read_query_response(payload: &[u8]) -> Result<QueryResponse<'_>> {
24    if payload.is_empty() {
25        return Err(Error::LibraryBug(eyre!(
26            "read_query_response: empty payload"
27        )));
28    }
29
30    match payload[0] {
31        0xFF => Err(ErrPayloadBytes(payload).into()),
32        0x00 => Ok(QueryResponse::Ok(OkPayloadBytes(payload))),
33        0xFB => Err(Error::BadUsageError(
34            "LOCAL INFILE queries are not yet supported".to_string(),
35        )),
36        _ => {
37            let (column_count, _rest) = read_int_lenenc(payload)?;
38            Ok(QueryResponse::ResultSet { column_count })
39        }
40    }
41}
42
43/// Query response variants
44#[derive(Debug)]
45pub enum QueryResponse<'a> {
46    Ok(OkPayloadBytes<'a>),
47    ResultSet { column_count: u64 },
48}
49
50// ============================================================================
51// State Machine API for Query
52// ============================================================================
53
54use crate::protocol::r#trait::TextResultSetHandler;
55
56/// Internal state of the Query state machine
57enum QueryState {
58    /// Initial state - need to read first packet
59    Start,
60    /// Reading the first response packet
61    ReadingFirstPacket,
62    /// Reading column definitions
63    ReadingColumns { num_columns: usize },
64    /// Reading rows
65    ReadingRows,
66    /// Finished
67    Finished,
68}
69
70/// State machine for Query (text protocol) with integrated handler
71///
72/// The handler is provided at construction and called directly by the state machine.
73/// The `drive()` method returns actions indicating what I/O operation is needed next.
74pub struct Query<'h, H> {
75    state: QueryState,
76    handler: &'h mut H,
77    column_defs: Option<ColumnDefinitions>,
78}
79
80impl<'h, H: TextResultSetHandler> Query<'h, H> {
81    /// Create a new Query state machine with the given handler
82    pub fn new(handler: &'h mut H) -> Self {
83        Self {
84            state: QueryState::Start,
85            handler,
86            column_defs: None,
87        }
88    }
89
90    /// Drive the state machine forward
91    ///
92    /// # Arguments
93    /// * `buffer_set` - The buffer set containing buffers to read from/write to
94    ///
95    /// # Returns
96    /// * `Action::NeedPacket(&mut Vec<u8>)` - Needs more data in the specified buffer
97    /// * `Action::Finished` - Processing complete
98    pub fn step<'buf>(
99        &mut self,
100        buffer_set: &'buf mut BufferSet,
101    ) -> Result<crate::protocol::command::Action<'buf>> {
102        use crate::protocol::command::Action;
103        match &mut self.state {
104            QueryState::Start => {
105                // Request the first packet
106                self.state = QueryState::ReadingFirstPacket;
107                Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
108            }
109
110            QueryState::ReadingFirstPacket => {
111                let payload = &buffer_set.read_buffer[..];
112                let response = read_query_response(payload)?;
113
114                match response {
115                    QueryResponse::Ok(ok_bytes) => {
116                        // Parse OK packet to check status flags
117                        use crate::constant::ServerStatusFlags;
118                        use crate::protocol::response::OkPayload;
119
120                        let ok_payload = OkPayload::try_from(ok_bytes)?;
121                        self.handler.no_result_set(ok_bytes)?;
122
123                        // Check if there are more results to come
124                        if ok_payload
125                            .status_flags
126                            .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
127                        {
128                            // More resultsets coming, go to ReadingFirstPacket to process next result
129                            self.state = QueryState::ReadingFirstPacket;
130                            Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
131                        } else {
132                            // No more results, we're done
133                            self.state = QueryState::Finished;
134                            Ok(Action::Finished)
135                        }
136                    }
137                    QueryResponse::ResultSet { column_count } => {
138                        let num_columns = column_count as usize;
139                        self.state = QueryState::ReadingColumns { num_columns };
140                        Ok(Action::ReadColumnMetadata { num_columns })
141                    }
142                }
143            }
144
145            QueryState::ReadingColumns { num_columns } => {
146                // Parse all column definitions from the buffer
147                // The buffer contains [len(u32)][payload][len(u32)][payload]...
148                let column_defs = ColumnDefinitions::new(
149                    *num_columns,
150                    std::mem::take(&mut buffer_set.column_definition_buffer),
151                )?;
152
153                self.handler.resultset_start(column_defs.definitions())?;
154                self.column_defs = Some(column_defs);
155                self.state = QueryState::ReadingRows;
156                Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
157            }
158
159            QueryState::ReadingRows => {
160                let payload = &buffer_set.read_buffer[..];
161                // A valid row's first item is NULL (0xFB) or string<lenenc>.
162                // string<lenenc> starts with int<lenenc> which cannot start with 0xFF (ErrPacket header).
163                // Hence, 0xFF always means Err.
164                //
165                // Similarly, string<lenenc> starting with 0xFE means that the length of a string is at least 2^24, which means the packet is of the size 2^24.
166                // The Ok-Packet for EOF cannot be this long, therefore 0xFE with payload.len() determines the payload length.
167                match payload.first() {
168                    Some(0xFF) => Err(ErrPayloadBytes(payload))?,
169                    Some(0xFE) if payload.len() != MAX_PAYLOAD_LENGTH => {
170                        // Parse OK packet to check status flags
171                        use crate::constant::ServerStatusFlags;
172                        use crate::protocol::response::OkPayload;
173
174                        let ok_bytes = OkPayloadBytes(payload);
175                        let ok_payload = OkPayload::try_from(ok_bytes)?;
176                        self.handler.resultset_end(ok_bytes)?;
177
178                        // Check if there are more results to come
179                        if ok_payload
180                            .status_flags
181                            .contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
182                        {
183                            // More resultsets coming, go to ReadingFirstPacket to process next result
184                            self.state = QueryState::ReadingFirstPacket;
185                            Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
186                        } else {
187                            // No more results, we're done
188                            self.state = QueryState::Finished;
189                            Ok(Action::Finished)
190                        }
191                    }
192                    _ => {
193                        let cols = self.column_defs.as_ref().ok_or_else(|| {
194                            Error::LibraryBug(eyre!("no column definitions while reading rows"))
195                        })?;
196                        let row = TextRowPayload(payload);
197                        self.handler.row(cols.definitions(), row)?;
198                        Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
199                    }
200                }
201            }
202
203            QueryState::Finished => Err(Error::LibraryBug(eyre!(
204                "Query::step called after finished"
205            ))),
206        }
207    }
208}