use futures_core::future::BoxFuture;
use crate::cursor::Cursor;
use crate::describe::{Column, Describe};
use crate::executor::{Execute, Executor, RefExecutor};
use crate::mysql::protocol::{
self, ColumnDefinition, ComQuery, ComStmtExecute, ComStmtPrepare, ComStmtPrepareOk, FieldFlags,
Status,
};
use crate::mysql::{MySql, MySqlArguments, MySqlCursor, MySqlTypeInfo};
impl super::MySqlConnection {
async fn prepare(&mut self, query: &str) -> crate::Result<ComStmtPrepareOk> {
self.stream.send(ComStmtPrepare { query }, true).await?;
let packet = self.stream.receive().await?;
if packet[0] == 0xFF {
return self.stream.handle_err();
}
ComStmtPrepareOk::read(packet)
}
async fn drop_column_defs(&mut self, count: usize) -> crate::Result<()> {
for _ in 0..count {
let _column = ColumnDefinition::read(self.stream.receive().await?)?;
}
if count > 0 {
self.stream.maybe_receive_eof().await?;
}
Ok(())
}
async fn get_or_prepare(&mut self, query: &str) -> crate::Result<u32> {
if let Some(&id) = self.cache_statement.get(query) {
Ok(id)
} else {
let stmt = self.prepare(query).await?;
self.cache_statement.insert(query.into(), stmt.statement_id);
self.drop_column_defs(stmt.params as usize).await?;
self.drop_column_defs(stmt.columns as usize).await?;
Ok(stmt.statement_id)
}
}
pub(crate) async fn run(
&mut self,
query: &str,
arguments: Option<MySqlArguments>,
) -> crate::Result<Option<u32>> {
self.stream.wait_until_ready().await?;
self.stream.is_ready = false;
if let Some(arguments) = arguments {
let statement_id = self.get_or_prepare(query).await?;
self.stream
.send(
ComStmtExecute {
cursor: protocol::Cursor::NO_CURSOR,
statement_id,
params: &arguments.params,
null_bitmap: &arguments.null_bitmap,
param_types: &arguments.param_types,
},
true,
)
.await?;
Ok(Some(statement_id))
} else {
self.stream.send(ComQuery { query }, true).await?;
Ok(None)
}
}
async fn affected_rows(&mut self) -> crate::Result<u64> {
let mut rows = 0;
loop {
let id = self.stream.receive().await?[0];
match id {
0x00 | 0xFE if self.stream.packet().len() < 0xFF_FF_FF => {
let status = if let Some(eof) = self.stream.maybe_handle_eof()? {
eof.status
} else {
let ok = self.stream.handle_ok()?;
rows += ok.affected_rows;
ok.status
};
if !status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
self.is_ready = true;
break;
}
}
0xFF => {
return self.stream.handle_err();
}
_ => {}
}
}
Ok(rows)
}
async fn do_describe(&mut self, query: &str) -> crate::Result<Describe<MySql>> {
self.stream.wait_until_ready().await?;
let stmt = self.prepare(query).await?;
let mut param_types = Vec::with_capacity(stmt.params as usize);
let mut result_columns = Vec::with_capacity(stmt.columns as usize);
for _ in 0..stmt.params {
let param = ColumnDefinition::read(self.stream.receive().await?)?;
param_types.push(MySqlTypeInfo::from_column_def(¶m));
}
if stmt.params > 0 {
self.stream.maybe_receive_eof().await?;
}
for _ in 0..stmt.columns {
let column = ColumnDefinition::read(self.stream.receive().await?)?;
result_columns.push(Column::<MySql> {
type_info: MySqlTypeInfo::from_column_def(&column),
name: column.column_alias.or(column.column),
table_id: column.table_alias.or(column.table),
non_null: Some(column.flags.contains(FieldFlags::NOT_NULL)),
});
}
if stmt.columns > 0 {
self.stream.maybe_receive_eof().await?;
}
Ok(Describe {
param_types: param_types.into_boxed_slice(),
result_columns: result_columns.into_boxed_slice(),
})
}
}
impl Executor for super::MySqlConnection {
type Database = MySql;
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
&'c mut self,
query: E,
) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{
log_execution!(query, {
Box::pin(async move {
let (query, arguments) = query.into_parts();
self.run(query, arguments).await?;
self.affected_rows().await
})
})
}
fn fetch<'q, E>(&mut self, query: E) -> MySqlCursor<'_, 'q>
where
E: Execute<'q, Self::Database>,
{
log_execution!(query, { MySqlCursor::from_connection(self, query) })
}
#[doc(hidden)]
fn describe<'e, 'q, E: 'e>(
&'e mut self,
query: E,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>
where
E: Execute<'q, Self::Database>,
{
Box::pin(async move { self.do_describe(query.into_parts().0).await })
}
}
impl<'c> RefExecutor<'c> for &'c mut super::MySqlConnection {
type Database = MySql;
fn fetch_by_ref<'q, E>(self, query: E) -> MySqlCursor<'c, 'q>
where
E: Execute<'q, Self::Database>,
{
log_execution!(query, { MySqlCursor::from_connection(self, query) })
}
}