use crate::buffer::BufferSet;
use crate::constant::CommandByte;
use crate::error::{Error, Result, eyre};
use crate::protocol::TextRowPayload;
use crate::protocol::command::ColumnDefinitions;
use crate::protocol::primitive::*;
use crate::protocol::response::{ErrPayloadBytes, OkPayloadBytes};
const MAX_PAYLOAD_LENGTH: usize = (1 << 24) - 4;
pub fn write_query(out: &mut Vec<u8>, sql: &str) {
write_int_1(out, CommandByte::Query as u8);
out.extend_from_slice(sql.as_bytes());
}
pub fn read_query_response(payload: &[u8]) -> Result<QueryResponse<'_>> {
if payload.is_empty() {
return Err(Error::LibraryBug(eyre!(
"read_query_response: empty payload"
)));
}
match payload[0] {
0xFF => Err(ErrPayloadBytes(payload).into()),
0x00 => Ok(QueryResponse::Ok(OkPayloadBytes(payload))),
0xFB => Err(Error::BadUsageError(
"LOCAL INFILE queries are not yet supported".to_string(),
)),
_ => {
let (column_count, _rest) = read_int_lenenc(payload)?;
Ok(QueryResponse::ResultSet { column_count })
}
}
}
#[derive(Debug)]
pub enum QueryResponse<'a> {
Ok(OkPayloadBytes<'a>),
ResultSet { column_count: u64 },
}
use crate::protocol::r#trait::TextResultSetHandler;
enum QueryState {
Start,
ReadingFirstPacket,
ReadingColumns { num_columns: usize },
ReadingRows,
Finished,
}
pub struct Query<'h, H> {
state: QueryState,
handler: &'h mut H,
column_defs: Option<ColumnDefinitions>,
}
impl<'h, H: TextResultSetHandler> Query<'h, H> {
pub fn new(handler: &'h mut H) -> Self {
Self {
state: QueryState::Start,
handler,
column_defs: None,
}
}
pub fn step<'buf>(
&mut self,
buffer_set: &'buf mut BufferSet,
) -> Result<crate::protocol::command::Action<'buf>> {
use crate::protocol::command::Action;
match &mut self.state {
QueryState::Start => {
self.state = QueryState::ReadingFirstPacket;
Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
}
QueryState::ReadingFirstPacket => {
let payload = &buffer_set.read_buffer[..];
let response = read_query_response(payload)?;
match response {
QueryResponse::Ok(ok_bytes) => {
use crate::constant::ServerStatusFlags;
use crate::protocol::response::OkPayload;
let ok_payload = OkPayload::try_from(ok_bytes)?;
self.handler.no_result_set(ok_bytes)?;
if ok_payload
.status_flags
.contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
{
self.state = QueryState::ReadingFirstPacket;
Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
} else {
self.state = QueryState::Finished;
Ok(Action::Finished)
}
}
QueryResponse::ResultSet { column_count } => {
let num_columns = column_count as usize;
self.state = QueryState::ReadingColumns { num_columns };
Ok(Action::ReadColumnMetadata { num_columns })
}
}
}
QueryState::ReadingColumns { num_columns } => {
let column_defs = ColumnDefinitions::new(
*num_columns,
std::mem::take(&mut buffer_set.column_definition_buffer),
)?;
self.handler.resultset_start(column_defs.definitions())?;
self.column_defs = Some(column_defs);
self.state = QueryState::ReadingRows;
Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
}
QueryState::ReadingRows => {
let payload = &buffer_set.read_buffer[..];
match payload.first() {
Some(0xFF) => Err(ErrPayloadBytes(payload))?,
Some(0xFE) if payload.len() != MAX_PAYLOAD_LENGTH => {
use crate::constant::ServerStatusFlags;
use crate::protocol::response::OkPayload;
let ok_bytes = OkPayloadBytes(payload);
let ok_payload = OkPayload::try_from(ok_bytes)?;
self.handler.resultset_end(ok_bytes)?;
if ok_payload
.status_flags
.contains(ServerStatusFlags::SERVER_MORE_RESULTS_EXISTS)
{
self.state = QueryState::ReadingFirstPacket;
Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
} else {
self.state = QueryState::Finished;
Ok(Action::Finished)
}
}
_ => {
let cols = self.column_defs.as_ref().ok_or_else(|| {
Error::LibraryBug(eyre!("no column definitions while reading rows"))
})?;
let row = TextRowPayload(payload);
self.handler.row(cols.definitions(), row)?;
Ok(Action::NeedPacket(&mut buffer_set.read_buffer))
}
}
}
QueryState::Finished => Err(Error::LibraryBug(eyre!(
"Query::step called after finished"
))),
}
}
}