use std::cell::Cell as ThreadCell;
use std::ffi::{CStr, CString};
use std::fmt::Display;
use std::marker::PhantomData;
use std::mem;
use std::os::fd::{AsRawFd, RawFd};
use std::ptr::{self, NonNull};
use std::slice;
use std::str;
use quex_mariadb_sys as ffi;
use super::error::{Error, Result};
use super::rows::{Cell, Column, StatementCell};
use super::value::{
DateTimeTzValue, DateTimeValue, DateValue, ParamSource, TimeValue, Value, ValueRef,
};
pub(crate) const MYSQL_TYPE_FLOAT: u32 = 4;
pub(crate) const MYSQL_TYPE_DOUBLE: u32 = 5;
pub(crate) const MYSQL_TYPE_TIMESTAMP: u32 = 7;
pub(crate) const MYSQL_TYPE_LONGLONG: u32 = 8;
pub(crate) const MYSQL_TYPE_DATE: u32 = 10;
pub(crate) const MYSQL_TYPE_TIME: u32 = 11;
pub(crate) const MYSQL_TYPE_DATETIME: u32 = 12;
pub(crate) const MYSQL_TYPE_TIMESTAMP2: u32 = 17;
pub(crate) const MYSQL_TYPE_DATETIME2: u32 = 18;
pub(crate) const MYSQL_TYPE_TIME2: u32 = 19;
pub(crate) const NOT_NULL_FLAG: u32 = 1;
pub(crate) const WAIT_READ: i32 = ffi::MYSQL_WAIT_READ as i32;
pub(crate) const WAIT_WRITE: i32 = ffi::MYSQL_WAIT_WRITE as i32;
pub(crate) const WAIT_EXCEPT: i32 = ffi::MYSQL_WAIT_EXCEPT as i32;
pub(crate) const WAIT_TIMEOUT: i32 = ffi::MYSQL_WAIT_TIMEOUT as i32;
thread_local! {
static MYSQL_THREAD_GUARD: MysqlThreadGuard = MysqlThreadGuard::new();
}
struct MysqlThreadGuard {
initialized: ThreadCell<bool>,
}
impl MysqlThreadGuard {
fn new() -> Self {
let initialized = unsafe { ffi::mysql_thread_init() == 0 };
Self {
initialized: ThreadCell::new(initialized),
}
}
}
impl Drop for MysqlThreadGuard {
fn drop(&mut self) {
if self.initialized.get() {
unsafe {
ffi::mysql_thread_end();
}
}
}
}
#[inline]
fn mysql_thread_initialized() -> bool {
MYSQL_THREAD_GUARD
.try_with(|guard| guard.initialized.get())
.unwrap_or(false)
}
#[inline]
pub(crate) fn ensure_mysql_thread_ready() -> Result<()> {
if mysql_thread_initialized() {
Ok(())
} else {
Err(Error::new("mysql_thread_init failed"))
}
}
#[inline]
pub(crate) fn ensure_mysql_thread_ready_for_drop() -> bool {
mysql_thread_initialized()
}
pub(crate) enum DriveOutput {
Connect(MysqlOut),
Query(i32),
StoreResult(MysqlResOut),
StmtPrepare(i32),
StmtExecute(i32),
StmtStoreResult(i32),
Commit(ffi::my_bool),
Rollback(ffi::my_bool),
}
impl DriveOutput {
pub(crate) fn connect(mysql: MysqlHandle) -> Self {
Self::Connect(MysqlOut {
_ptr: mysql.as_ptr(),
})
}
pub(crate) fn query() -> Self {
Self::Query(0)
}
pub(crate) fn store_result() -> Self {
Self::StoreResult(MysqlResOut {
ptr: ptr::null_mut(),
})
}
pub(crate) fn stmt_prepare() -> Self {
Self::StmtPrepare(0)
}
pub(crate) fn stmt_execute() -> Self {
Self::StmtExecute(0)
}
pub(crate) fn stmt_store_result() -> Self {
Self::StmtStoreResult(0)
}
pub(crate) fn commit() -> Self {
Self::Commit(0)
}
pub(crate) fn rollback() -> Self {
Self::Rollback(0)
}
fn expect_connect_mut(&mut self) -> &mut MysqlOut {
match self {
Self::Connect(out) => out,
_ => panic!("drive output mismatch: expected connect"),
}
}
fn expect_query_mut(&mut self) -> &mut i32 {
match self {
Self::Query(out) => out,
_ => panic!("drive output mismatch: expected query"),
}
}
fn expect_store_result_mut(&mut self) -> &mut MysqlResOut {
match self {
Self::StoreResult(out) => out,
_ => panic!("drive output mismatch: expected store_result"),
}
}
fn expect_stmt_prepare_mut(&mut self) -> &mut i32 {
match self {
Self::StmtPrepare(out) => out,
_ => panic!("drive output mismatch: expected stmt_prepare"),
}
}
fn expect_stmt_execute_mut(&mut self) -> &mut i32 {
match self {
Self::StmtExecute(out) => out,
_ => panic!("drive output mismatch: expected stmt_execute"),
}
}
fn expect_stmt_store_result_mut(&mut self) -> &mut i32 {
match self {
Self::StmtStoreResult(out) => out,
_ => panic!("drive output mismatch: expected stmt_store_result"),
}
}
fn expect_commit_mut(&mut self) -> &mut ffi::my_bool {
match self {
Self::Commit(out) => out,
_ => panic!("drive output mismatch: expected commit"),
}
}
fn expect_rollback_mut(&mut self) -> &mut ffi::my_bool {
match self {
Self::Rollback(out) => out,
_ => panic!("drive output mismatch: expected rollback"),
}
}
pub(crate) fn query_code(&self) -> i32 {
match self {
Self::Query(code) => *code,
_ => panic!("drive output mismatch: expected query"),
}
}
pub(crate) fn connect_ptr(&self) -> *mut ffi::MYSQL {
match self {
Self::Connect(out) => out._ptr,
_ => panic!("drive output mismatch: expected connect"),
}
}
pub(crate) fn store_result_ptr(&self) -> *mut ffi::MYSQL_RES {
match self {
Self::StoreResult(out) => out.ptr,
_ => panic!("drive output mismatch: expected store_result"),
}
}
pub(crate) fn stmt_prepare_code(&self) -> i32 {
match self {
Self::StmtPrepare(code) => *code,
_ => panic!("drive output mismatch: expected stmt_prepare"),
}
}
pub(crate) fn stmt_execute_code(&self) -> i32 {
match self {
Self::StmtExecute(code) => *code,
_ => panic!("drive output mismatch: expected stmt_execute"),
}
}
pub(crate) fn stmt_store_result_code(&self) -> i32 {
match self {
Self::StmtStoreResult(code) => *code,
_ => panic!("drive output mismatch: expected stmt_store_result"),
}
}
pub(crate) fn commit_code(&self) -> ffi::my_bool {
match self {
Self::Commit(code) => *code,
_ => panic!("drive output mismatch: expected commit"),
}
}
pub(crate) fn rollback_code(&self) -> ffi::my_bool {
match self {
Self::Rollback(code) => *code,
_ => panic!("drive output mismatch: expected rollback"),
}
}
}
pub(crate) unsafe fn start_operation(op: DriveOperation<'_>, out: &mut DriveOutput) -> i32 {
match op {
DriveOperation::Connect {
mysql,
host,
user,
password,
database,
port,
unix_socket,
} => {
unsafe {
ffi::mysql_real_connect_start(
&mut out.expect_connect_mut()._ptr as *mut *mut ffi::MYSQL,
mysql.as_ptr(),
opt_ptr(host),
opt_ptr(user),
opt_ptr(password),
opt_ptr(database),
port,
opt_ptr(unix_socket),
0,
)
}
}
DriveOperation::Query { mysql, sql } => {
unsafe {
ffi::mysql_real_query_start(
out.expect_query_mut(),
mysql.as_ptr(),
sql.as_ptr(),
sql.as_bytes().len() as u64,
)
}
}
DriveOperation::StoreResult { mysql } => {
unsafe {
ffi::mysql_store_result_start(
&mut out.expect_store_result_mut().ptr as *mut *mut ffi::MYSQL_RES,
mysql.as_ptr(),
)
}
}
DriveOperation::StmtPrepare { stmt, sql } => {
unsafe {
ffi::mysql_stmt_prepare_start(
out.expect_stmt_prepare_mut(),
stmt.as_ptr(),
sql.as_ptr(),
sql.as_bytes().len() as u64,
)
}
}
DriveOperation::StmtExecute { stmt } => {
unsafe { ffi::mysql_stmt_execute_start(out.expect_stmt_execute_mut(), stmt.as_ptr()) }
}
DriveOperation::StmtStoreResult { stmt } => {
unsafe {
ffi::mysql_stmt_store_result_start(
out.expect_stmt_store_result_mut(),
stmt.as_ptr(),
)
}
}
DriveOperation::Commit { mysql } => {
unsafe { ffi::mysql_commit_start(out.expect_commit_mut(), mysql.as_ptr()) }
}
DriveOperation::Rollback { mysql } => {
unsafe { ffi::mysql_rollback_start(out.expect_rollback_mut(), mysql.as_ptr()) }
}
}
}
pub(crate) unsafe fn continue_operation(
op: DriveOperation<'_>,
out: &mut DriveOutput,
ready: i32,
) -> i32 {
match op {
DriveOperation::Connect { mysql, .. } => {
unsafe {
ffi::mysql_real_connect_cont(
&mut out.expect_connect_mut()._ptr as *mut *mut ffi::MYSQL,
mysql.as_ptr(),
ready,
)
}
}
DriveOperation::Query { mysql, .. } => {
unsafe { ffi::mysql_real_query_cont(out.expect_query_mut(), mysql.as_ptr(), ready) }
}
DriveOperation::StoreResult { mysql } => {
unsafe {
ffi::mysql_store_result_cont(
&mut out.expect_store_result_mut().ptr as *mut *mut ffi::MYSQL_RES,
mysql.as_ptr(),
ready,
)
}
}
DriveOperation::StmtPrepare { stmt, .. } => {
unsafe {
ffi::mysql_stmt_prepare_cont(out.expect_stmt_prepare_mut(), stmt.as_ptr(), ready)
}
}
DriveOperation::StmtExecute { stmt } => {
unsafe {
ffi::mysql_stmt_execute_cont(out.expect_stmt_execute_mut(), stmt.as_ptr(), ready)
}
}
DriveOperation::StmtStoreResult { stmt } => {
unsafe {
ffi::mysql_stmt_store_result_cont(
out.expect_stmt_store_result_mut(),
stmt.as_ptr(),
ready,
)
}
}
DriveOperation::Commit { mysql } => {
unsafe { ffi::mysql_commit_cont(out.expect_commit_mut(), mysql.as_ptr(), ready) }
}
DriveOperation::Rollback { mysql } => {
unsafe { ffi::mysql_rollback_cont(out.expect_rollback_mut(), mysql.as_ptr(), ready) }
}
}
}
#[derive(Clone, Copy)]
pub(crate) enum DriveOperation<'a> {
Connect {
mysql: MysqlHandle,
host: &'a Option<CString>,
user: &'a Option<CString>,
password: &'a Option<CString>,
database: &'a Option<CString>,
port: u32,
unix_socket: &'a Option<CString>,
},
Query {
mysql: MysqlHandle,
sql: &'a CString,
},
StoreResult {
mysql: MysqlHandle,
},
StmtPrepare {
stmt: StmtHandle,
sql: &'a CString,
},
StmtExecute {
stmt: StmtHandle,
},
StmtStoreResult {
stmt: StmtHandle,
},
Commit {
mysql: MysqlHandle,
},
Rollback {
mysql: MysqlHandle,
},
}
#[repr(transparent)]
#[derive(Clone, Copy)]
pub(crate) struct MysqlOut {
pub(crate) _ptr: *mut ffi::MYSQL,
}
#[repr(transparent)]
#[derive(Clone, Copy)]
pub(crate) struct MysqlResOut {
pub(crate) ptr: *mut ffi::MYSQL_RES,
}
#[derive(Clone, Copy)]
pub(crate) struct SocketRef(pub(crate) RawFd);
impl AsRawFd for SocketRef {
#[inline]
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
pub(crate) struct ParamBindings<'a> {
pub(crate) binds: Vec<ffi::MYSQL_BIND>,
_scalars: Vec<ScalarSlot>,
_times: Vec<ffi::MYSQL_TIME>,
_owned: Vec<Vec<u8>>,
_lengths: Vec<u64>,
_nulls: Vec<ffi::my_bool>,
_marker: PhantomData<&'a [Value]>,
}
pub(crate) struct ParamScratch {
pub(crate) binds: Vec<ffi::MYSQL_BIND>,
scalars: Vec<ScalarSlot>,
times: Vec<ffi::MYSQL_TIME>,
owned: Vec<Vec<u8>>,
lengths: Vec<u64>,
nulls: Vec<ffi::my_bool>,
}
#[derive(Clone, Copy)]
pub(crate) struct MysqlHandle(pub(crate) NonNull<ffi::MYSQL>);
impl MysqlHandle {
#[inline]
pub(crate) fn as_ptr(self) -> *mut ffi::MYSQL {
self.0.as_ptr()
}
}
#[derive(Clone, Copy)]
pub(crate) struct StmtHandle(pub(crate) NonNull<ffi::MYSQL_STMT>);
impl StmtHandle {
#[inline]
pub(crate) fn as_ptr(self) -> *mut ffi::MYSQL_STMT {
self.0.as_ptr()
}
}
#[derive(Clone, Copy)]
pub(crate) struct ResultHandle(pub(crate) NonNull<ffi::MYSQL_RES>);
impl ResultHandle {
#[inline]
pub(crate) fn as_ptr(self) -> *mut ffi::MYSQL_RES {
self.0.as_ptr()
}
}
#[derive(Clone, Copy, Default)]
struct ScalarSlot {
i64_value: i64,
u64_value: u64,
f64_value: f64,
}
unsafe impl Send for MysqlHandle {}
unsafe impl Send for StmtHandle {}
unsafe impl Send for ResultHandle {}
unsafe impl Send for MysqlOut {}
unsafe impl Send for MysqlResOut {}
unsafe impl Send for DriveOutput {}
unsafe impl Send for DriveOperation<'_> {}
unsafe impl Send for ParamScratch {}
unsafe impl Send for ParamBindings<'_> {}
impl<'a> ParamBindings<'a> {
pub(crate) fn new<P>(params: &'a P) -> Self
where
P: ParamSource + ?Sized,
{
let len = params.len();
let mut binds = (0..len).map(|_| mysql_bind_init()).collect::<Vec<_>>();
let mut scalars = vec![ScalarSlot::default(); len];
let mut times = vec![mysql_time_init(); len];
let mut owned = Vec::with_capacity(len);
let mut lengths = vec![0; len];
let mut nulls = vec![0 as ffi::my_bool; len];
for index in 0..len {
let value = params.value_at(index);
let bind = &mut binds[index];
bind.is_null = &mut nulls[index];
match value {
ValueRef::Null => {
nulls[index] = 1;
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_NULL;
}
ValueRef::I64(v) => {
scalars[index].i64_value = v;
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_LONGLONG;
bind.buffer = (&mut scalars[index].i64_value as *mut i64).cast();
}
ValueRef::U64(v) => {
scalars[index].u64_value = v;
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_LONGLONG;
bind.is_unsigned = 1;
bind.buffer = (&mut scalars[index].u64_value as *mut u64).cast();
}
ValueRef::F64(v) => {
scalars[index].f64_value = v;
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_DOUBLE;
bind.buffer = (&mut scalars[index].f64_value as *mut f64).cast();
}
ValueRef::Date(value) => {
times[index] = mysql_date(value);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_DATE;
bind.buffer = (&mut times[index] as *mut ffi::MYSQL_TIME).cast();
bind.buffer_length = mem::size_of::<ffi::MYSQL_TIME>() as u64;
}
ValueRef::Time(value) => {
times[index] = mysql_time(value);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_TIME;
bind.buffer = (&mut times[index] as *mut ffi::MYSQL_TIME).cast();
bind.buffer_length = mem::size_of::<ffi::MYSQL_TIME>() as u64;
}
ValueRef::DateTime(value) => {
times[index] = mysql_datetime(
value,
ffi::enum_mysql_timestamp_type_MYSQL_TIMESTAMP_DATETIME,
);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_DATETIME;
bind.buffer = (&mut times[index] as *mut ffi::MYSQL_TIME).cast();
bind.buffer_length = mem::size_of::<ffi::MYSQL_TIME>() as u64;
}
ValueRef::DateTimeTz(value) => {
let encoded = format_datetime_tz_text(value);
lengths[index] = u64::try_from(encoded.len())
.expect("encoded datetime with offset length exceeds u64");
owned.push(encoded);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_STRING;
bind.buffer = owned.last().unwrap().as_ptr() as *mut _;
bind.buffer_length = lengths[index];
bind.length = &mut lengths[index];
}
ValueRef::Uuid(bytes) => {
let encoded = format_uuid_text(*bytes);
lengths[index] =
u64::try_from(encoded.len()).expect("encoded uuid length exceeds u64");
owned.push(encoded);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_STRING;
bind.buffer = owned.last().unwrap().as_ptr() as *mut _;
bind.buffer_length = lengths[index];
bind.length = &mut lengths[index];
}
ValueRef::Bytes(bytes) => {
lengths[index] =
u64::try_from(bytes.len()).expect("byte parameter length exceeds u64");
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_BLOB;
bind.buffer = bytes.as_ptr() as *mut _;
bind.buffer_length = lengths[index];
bind.length = &mut lengths[index];
}
ValueRef::String(text) => {
lengths[index] =
u64::try_from(text.len()).expect("string parameter length exceeds u64");
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_STRING;
bind.buffer = text.as_ptr() as *mut _;
bind.buffer_length = lengths[index];
bind.length = &mut lengths[index];
}
}
}
Self {
binds,
_scalars: scalars,
_times: times,
_owned: owned,
_lengths: lengths,
_nulls: nulls,
_marker: PhantomData,
}
}
}
impl ParamScratch {
pub(crate) fn new(param_count: usize) -> Self {
let binds = (0..param_count).map(|_| mysql_bind_init()).collect();
Self {
binds,
scalars: vec![ScalarSlot::default(); param_count],
times: vec![mysql_time_init(); param_count],
owned: Vec::with_capacity(param_count),
lengths: vec![0; param_count],
nulls: vec![0 as ffi::my_bool; param_count],
}
}
pub(crate) fn bind_source<P>(&mut self, params: &P) -> Result<()>
where
P: ParamSource + ?Sized,
{
if self.binds.len() != params.len() {
return Err(Error::new(format!(
"statement expects {} parameters but got {}",
self.binds.len(),
params.len()
)));
}
self.owned.clear();
self.owned.reserve(params.len());
for index in 0..params.len() {
let value = params.value_at(index);
let bind = &mut self.binds[index];
*bind = mysql_bind_init();
self.nulls[index] = 0;
self.lengths[index] = 0;
bind.is_null = &mut self.nulls[index];
match value {
ValueRef::Null => {
self.nulls[index] = 1;
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_NULL;
}
ValueRef::I64(v) => {
self.scalars[index].i64_value = v;
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_LONGLONG;
bind.buffer = (&mut self.scalars[index].i64_value as *mut i64).cast();
}
ValueRef::U64(v) => {
self.scalars[index].u64_value = v;
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_LONGLONG;
bind.is_unsigned = 1;
bind.buffer = (&mut self.scalars[index].u64_value as *mut u64).cast();
}
ValueRef::F64(v) => {
self.scalars[index].f64_value = v;
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_DOUBLE;
bind.buffer = (&mut self.scalars[index].f64_value as *mut f64).cast();
}
ValueRef::Date(value) => {
self.times[index] = mysql_date(value);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_DATE;
bind.buffer = (&mut self.times[index] as *mut ffi::MYSQL_TIME).cast();
bind.buffer_length = mem::size_of::<ffi::MYSQL_TIME>() as u64;
}
ValueRef::Time(value) => {
self.times[index] = mysql_time(value);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_TIME;
bind.buffer = (&mut self.times[index] as *mut ffi::MYSQL_TIME).cast();
bind.buffer_length = mem::size_of::<ffi::MYSQL_TIME>() as u64;
}
ValueRef::DateTime(value) => {
self.times[index] = mysql_datetime(
value,
ffi::enum_mysql_timestamp_type_MYSQL_TIMESTAMP_DATETIME,
);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_DATETIME;
bind.buffer = (&mut self.times[index] as *mut ffi::MYSQL_TIME).cast();
bind.buffer_length = mem::size_of::<ffi::MYSQL_TIME>() as u64;
}
ValueRef::DateTimeTz(value) => {
let encoded = format_datetime_tz_text(value);
self.lengths[index] = u64::try_from(encoded.len())
.expect("encoded datetime with offset length exceeds u64");
self.owned.push(encoded);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_STRING;
bind.buffer = self.owned.last().unwrap().as_ptr() as *mut _;
bind.buffer_length = self.lengths[index];
bind.length = &mut self.lengths[index];
}
ValueRef::Uuid(bytes) => {
let encoded = format_uuid_text(*bytes);
self.lengths[index] =
u64::try_from(encoded.len()).expect("encoded uuid length exceeds u64");
self.owned.push(encoded);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_STRING;
bind.buffer = self.owned.last().unwrap().as_ptr() as *mut _;
bind.buffer_length = self.lengths[index];
bind.length = &mut self.lengths[index];
}
ValueRef::Bytes(bytes) => {
let owned = bytes.to_vec();
self.lengths[index] =
u64::try_from(owned.len()).expect("byte parameter length exceeds u64");
self.owned.push(owned);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_BLOB;
bind.buffer = self.owned.last().unwrap().as_ptr() as *mut _;
bind.buffer_length = self.lengths[index];
bind.length = &mut self.lengths[index];
}
ValueRef::String(text) => {
let owned = text.as_bytes().to_vec();
self.lengths[index] =
u64::try_from(owned.len()).expect("string parameter length exceeds u64");
self.owned.push(owned);
bind.buffer_type = ffi::enum_field_types_MYSQL_TYPE_STRING;
bind.buffer = self.owned.last().unwrap().as_ptr() as *mut _;
bind.buffer_length = self.lengths[index];
bind.length = &mut self.lengths[index];
}
}
}
Ok(())
}
}
fn format_uuid_text(bytes: [u8; 16]) -> Vec<u8> {
const HEX: &[u8; 16] = b"0123456789abcdef";
let mut out = Vec::with_capacity(36);
for (index, byte) in bytes.into_iter().enumerate() {
if matches!(index, 4 | 6 | 8 | 10) {
out.push(b'-');
}
out.push(HEX[(byte >> 4) as usize]);
out.push(HEX[(byte & 0x0f) as usize]);
}
out
}
fn format_date_text(value: DateValue) -> Vec<u8> {
format!("{:04}-{:02}-{:02}", value.year, value.month, value.day).into_bytes()
}
fn format_time_text(value: TimeValue) -> Vec<u8> {
if value.microsecond == 0 {
format!("{:02}:{:02}:{:02}", value.hour, value.minute, value.second).into_bytes()
} else {
let mut micros = format!("{:06}", value.microsecond);
while micros.ends_with('0') {
micros.pop();
}
format!(
"{:02}:{:02}:{:02}.{}",
value.hour, value.minute, value.second, micros
)
.into_bytes()
}
}
fn format_datetime_tz_text(value: DateTimeTzValue) -> Vec<u8> {
let date = format_date_text(value.datetime.date);
let time = format_time_text(value.datetime.time);
let offset = format_offset(value.offset_seconds);
let mut out = Vec::with_capacity(date.len() + 1 + time.len() + offset.len());
out.extend_from_slice(&date);
out.push(b'T');
out.extend_from_slice(&time);
out.extend_from_slice(offset.as_bytes());
out
}
fn format_offset(offset_seconds: i32) -> String {
let sign = if offset_seconds < 0 { '-' } else { '+' };
let offset_seconds = offset_seconds.abs();
let hours = offset_seconds / 3600;
let minutes = (offset_seconds % 3600) / 60;
format!("{sign}{hours:02}:{minutes:02}")
}
fn mysql_time_init() -> ffi::MYSQL_TIME {
ffi::MYSQL_TIME {
year: 0,
month: 0,
day: 0,
hour: 0,
minute: 0,
second: 0,
second_part: 0,
neg: 0,
time_type: ffi::enum_mysql_timestamp_type_MYSQL_TIMESTAMP_NONE,
}
}
fn mysql_date(value: DateValue) -> ffi::MYSQL_TIME {
ffi::MYSQL_TIME {
year: value.year as u32,
month: value.month as u32,
day: value.day as u32,
hour: 0,
minute: 0,
second: 0,
second_part: 0,
neg: 0,
time_type: ffi::enum_mysql_timestamp_type_MYSQL_TIMESTAMP_DATE,
}
}
fn mysql_time(value: TimeValue) -> ffi::MYSQL_TIME {
ffi::MYSQL_TIME {
year: 0,
month: 0,
day: 0,
hour: value.hour as u32,
minute: value.minute as u32,
second: value.second as u32,
second_part: value.microsecond as u64,
neg: 0,
time_type: ffi::enum_mysql_timestamp_type_MYSQL_TIMESTAMP_TIME,
}
}
fn mysql_datetime(
value: DateTimeValue,
time_type: ffi::enum_mysql_timestamp_type,
) -> ffi::MYSQL_TIME {
ffi::MYSQL_TIME {
year: value.date.year as u32,
month: value.date.month as u32,
day: value.date.day as u32,
hour: value.time.hour as u32,
minute: value.time.minute as u32,
second: value.time.second as u32,
second_part: value.time.microsecond as u64,
neg: 0,
time_type,
}
}
#[inline]
pub(crate) fn mysql_bind_init() -> ffi::MYSQL_BIND {
unsafe { mem::MaybeUninit::<ffi::MYSQL_BIND>::zeroed().assume_init() }
}
#[inline]
pub(crate) fn decode_value(column: &Column, bytes: &[u8]) -> Result<Value> {
match column.column_type {
MYSQL_TYPE_LONGLONG => {
if bytes.first() == Some(&b'-') {
Ok(Value::I64(parse_i64_ascii(bytes)?))
} else {
Ok(Value::U64(parse_u64_ascii(bytes)?))
}
}
MYSQL_TYPE_FLOAT | MYSQL_TYPE_DOUBLE => {
let text = str::from_utf8(bytes).map_err(|err| Error::new(err.to_string()))?;
Ok(Value::F64(parse_text::<f64>(text, "f64")?))
}
MYSQL_TYPE_DATE => parse_mysql_date_text(bytes).map(Value::Date),
MYSQL_TYPE_TIME | MYSQL_TYPE_TIME2 => parse_mysql_time_text(bytes).map(Value::Time),
MYSQL_TYPE_DATETIME
| MYSQL_TYPE_DATETIME2
| MYSQL_TYPE_TIMESTAMP
| MYSQL_TYPE_TIMESTAMP2 => parse_mysql_datetime_text(bytes).map(Value::DateTime),
_ => match str::from_utf8(bytes) {
Ok(text) => Ok(Value::String(text.to_owned())),
Err(_) => Ok(Value::Bytes(bytes.to_vec())),
},
}
}
#[inline]
pub(crate) fn decode_statement_value(column: &Column, cell: &StatementCell) -> Result<Value> {
if cell.is_null != 0 {
return Ok(Value::Null);
}
match column.column_type {
1 | 2 | 3 | 8 | 9 => {
if column_is_unsigned(column.flags) != 0 {
statement_u64(cell, column).map(Value::U64)
} else {
statement_i64(cell, column).map(Value::I64)
}
}
MYSQL_TYPE_FLOAT | MYSQL_TYPE_DOUBLE => statement_f64(cell, column).map(Value::F64),
MYSQL_TYPE_DATE => parse_statement_mysql_time(cell)
.map(mysql_time_to_date)
.map(Value::Date),
MYSQL_TYPE_TIME | MYSQL_TYPE_TIME2 => parse_statement_mysql_time(cell)
.map(mysql_time_to_time)
.map(Value::Time),
MYSQL_TYPE_DATETIME
| MYSQL_TYPE_DATETIME2
| MYSQL_TYPE_TIMESTAMP
| MYSQL_TYPE_TIMESTAMP2 => parse_statement_mysql_time(cell)
.map(mysql_time_to_datetime)
.map(Value::DateTime),
_ => match str::from_utf8(statement_bytes(cell)) {
Ok(text) => Ok(Value::String(text.to_owned())),
Err(_) => Ok(Value::Bytes(statement_bytes(cell).to_vec())),
},
}
}
#[inline]
pub(crate) fn parse_number<T>(bytes: &[u8], name: &'static str) -> Result<T>
where
T: str::FromStr,
T::Err: Display,
{
let text = str::from_utf8(bytes).map_err(|err| Error::new(err.to_string()))?;
parse_text(text, name)
}
#[inline]
pub(crate) fn parse_i64_ascii(bytes: &[u8]) -> Result<i64> {
let bytes = non_null_bytes(bytes)?;
let (negative, digits) = if bytes[0] == b'-' {
(true, &bytes[1..])
} else {
(false, bytes)
};
let value = parse_u64_digits(digits, "i64")?;
if negative {
if value == (i64::MAX as u64) + 1 {
Ok(i64::MIN)
} else {
let signed = i64::try_from(value)
.map_err(|_| Error::new("failed to parse i64: out of range"))?;
Ok(-signed)
}
} else {
i64::try_from(value).map_err(|_| Error::new("failed to parse i64: out of range"))
}
}
#[inline]
pub(crate) fn parse_u64_ascii(bytes: &[u8]) -> Result<u64> {
parse_u64_digits(non_null_bytes(bytes)?, "u64")
}
#[inline]
fn parse_u64_digits(bytes: &[u8], name: &'static str) -> Result<u64> {
let mut value: u64 = 0;
for &byte in bytes {
if !byte.is_ascii_digit() {
return Err(Error::new(format!(
"failed to parse {}: invalid digit",
name
)));
}
value = value
.checked_mul(10)
.and_then(|value| value.checked_add((byte - b'0') as u64))
.ok_or_else(|| Error::new(format!("failed to parse {}: out of range", name)))?;
}
Ok(value)
}
#[inline]
fn non_null_bytes(bytes: &[u8]) -> Result<&[u8]> {
if bytes.is_empty() {
Err(Error::new("failed to parse numeric value: empty input"))
} else {
Ok(bytes)
}
}
#[inline]
pub(crate) fn cell_bytes(cell: &Cell) -> Result<&[u8]> {
if cell.is_null {
Err(Error::new("column is null"))
} else {
Ok(unsafe { mysql_cell_bytes(cell.ptr, cell.len) })
}
}
#[inline]
pub(crate) unsafe fn mysql_cell_bytes<'a>(ptr: *const u8, len: usize) -> &'a [u8] {
if len == 0 {
&[]
} else {
debug_assert!(!ptr.is_null());
unsafe { slice::from_raw_parts(ptr, len) }
}
}
#[inline]
pub(crate) fn statement_i64(cell: &StatementCell, column: &Column) -> Result<i64> {
if cell.is_null != 0 {
return Err(Error::new("column is null"));
}
match column.column_type {
1 => Ok(i8::from_ne_bytes([statement_bytes(cell)[0]]) as i64),
2 => {
let mut array = [0; 2];
array.copy_from_slice(&statement_bytes(cell)[..2]);
Ok(i16::from_ne_bytes(array) as i64)
}
3 | 9 => {
let mut array = [0; 4];
array.copy_from_slice(&statement_bytes(cell)[..4]);
Ok(i32::from_ne_bytes(array) as i64)
}
MYSQL_TYPE_LONGLONG => {
let bytes = statement_bytes(cell);
if bytes.len() == 8 {
let mut array = [0; 8];
array.copy_from_slice(bytes);
Ok(i64::from_ne_bytes(array))
} else {
parse_i64_ascii(bytes)
}
}
_ => parse_i64_ascii(statement_bytes(cell)),
}
}
#[inline]
pub(crate) fn statement_u64(cell: &StatementCell, column: &Column) -> Result<u64> {
if cell.is_null != 0 {
return Err(Error::new("column is null"));
}
match column.column_type {
1 => Ok(statement_bytes(cell)[0] as u64),
2 => {
let mut array = [0; 2];
array.copy_from_slice(&statement_bytes(cell)[..2]);
Ok(u16::from_ne_bytes(array) as u64)
}
3 | 9 => {
let mut array = [0; 4];
array.copy_from_slice(&statement_bytes(cell)[..4]);
Ok(u32::from_ne_bytes(array) as u64)
}
MYSQL_TYPE_LONGLONG => {
let bytes = statement_bytes(cell);
if bytes.len() == 8 {
let mut array = [0; 8];
array.copy_from_slice(bytes);
Ok(u64::from_ne_bytes(array))
} else {
parse_u64_ascii(bytes)
}
}
_ => parse_u64_ascii(statement_bytes(cell)),
}
}
#[inline]
pub(crate) fn statement_f64(cell: &StatementCell, column: &Column) -> Result<f64> {
if cell.is_null != 0 {
return Err(Error::new("column is null"));
}
let bytes = statement_bytes(cell);
match column.column_type {
MYSQL_TYPE_FLOAT => {
if bytes.len() == 4 {
let mut array = [0; 4];
array.copy_from_slice(bytes);
Ok(f32::from_ne_bytes(array) as f64)
} else {
parse_number::<f64>(bytes, "f64")
}
}
MYSQL_TYPE_DOUBLE => {
if bytes.len() == 8 {
let mut array = [0; 8];
array.copy_from_slice(bytes);
Ok(f64::from_ne_bytes(array))
} else {
parse_number::<f64>(bytes, "f64")
}
}
_ => parse_number::<f64>(bytes, "f64"),
}
}
#[inline]
pub(crate) fn statement_bytes(cell: &StatementCell) -> &[u8] {
&cell.buffer[..cell.length as usize]
}
pub(crate) fn parse_statement_mysql_time(cell: &StatementCell) -> Result<ffi::MYSQL_TIME> {
if cell.is_null != 0 {
return Err(Error::new("column is null"));
}
let expected = mem::size_of::<ffi::MYSQL_TIME>();
let bytes = statement_bytes(cell);
if bytes.len() != expected {
return Err(Error::new("invalid MYSQL_TIME buffer length"));
}
Ok(unsafe { bytes.as_ptr().cast::<ffi::MYSQL_TIME>().read_unaligned() })
}
#[inline]
fn parse_text<T>(text: &str, name: &'static str) -> Result<T>
where
T: str::FromStr,
T::Err: Display,
{
text.parse::<T>()
.map_err(|err| Error::new(format!("failed to parse {}: {}", name, err)))
}
pub(crate) fn parse_mysql_date_text(bytes: &[u8]) -> Result<DateValue> {
let text = str::from_utf8(bytes).map_err(|err| Error::new(err.to_string()))?;
let mut parts = text.split('-');
let year = parts
.next()
.ok_or_else(|| Error::new("invalid mysql date"))?
.parse()
.map_err(|err| Error::new(format!("failed to parse date year: {err}")))?;
let month = parts
.next()
.ok_or_else(|| Error::new("invalid mysql date"))?
.parse()
.map_err(|err| Error::new(format!("failed to parse date month: {err}")))?;
let day = parts
.next()
.ok_or_else(|| Error::new("invalid mysql date"))?
.parse()
.map_err(|err| Error::new(format!("failed to parse date day: {err}")))?;
Ok(DateValue { year, month, day })
}
pub(crate) fn parse_mysql_time_text(bytes: &[u8]) -> Result<TimeValue> {
let text = str::from_utf8(bytes).map_err(|err| Error::new(err.to_string()))?;
let text = text.strip_prefix('-').unwrap_or(text);
let (time, microsecond) = match text.split_once('.') {
Some((time, fraction)) => {
if fraction.is_empty()
|| fraction.len() > 6
|| !fraction.bytes().all(|b| b.is_ascii_digit())
{
return Err(Error::new("invalid mysql time"));
}
let mut micros = fraction
.parse::<u32>()
.map_err(|err| Error::new(format!("failed to parse time fraction: {err}")))?;
for _ in fraction.len()..6 {
micros *= 10;
}
(time, micros)
}
None => (text, 0),
};
let mut parts = time.split(':');
let hour = parts
.next()
.ok_or_else(|| Error::new("invalid mysql time"))?
.parse()
.map_err(|err| Error::new(format!("failed to parse time hour: {err}")))?;
let minute = parts
.next()
.ok_or_else(|| Error::new("invalid mysql time"))?
.parse()
.map_err(|err| Error::new(format!("failed to parse time minute: {err}")))?;
let second = parts
.next()
.ok_or_else(|| Error::new("invalid mysql time"))?
.parse()
.map_err(|err| Error::new(format!("failed to parse time second: {err}")))?;
Ok(TimeValue {
hour,
minute,
second,
microsecond,
})
}
pub(crate) fn parse_mysql_datetime_text(bytes: &[u8]) -> Result<DateTimeValue> {
let text = str::from_utf8(bytes).map_err(|err| Error::new(err.to_string()))?;
let (date, time) = text
.split_once(' ')
.or_else(|| text.split_once('T'))
.ok_or_else(|| Error::new("invalid mysql datetime"))?;
Ok(DateTimeValue {
date: parse_mysql_date_text(date.as_bytes())?,
time: parse_mysql_time_text(time.as_bytes())?,
})
}
pub(crate) fn mysql_time_to_date(value: ffi::MYSQL_TIME) -> DateValue {
DateValue {
year: value.year as i32,
month: value.month as u8,
day: value.day as u8,
}
}
pub(crate) fn mysql_time_to_time(value: ffi::MYSQL_TIME) -> TimeValue {
TimeValue {
hour: value.hour as u8,
minute: value.minute as u8,
second: value.second as u8,
microsecond: value.second_part as u32,
}
}
pub(crate) fn mysql_time_to_datetime(value: ffi::MYSQL_TIME) -> DateTimeValue {
DateTimeValue {
date: mysql_time_to_date(value),
time: mysql_time_to_time(value),
}
}
#[inline]
fn statement_buffer_len(declared_length: u64) -> usize {
declared_length.clamp(1, 8 * 1024) as usize
}
#[inline]
pub(crate) fn statement_buffer_len_for_column(column: &Column) -> usize {
match column.column_type {
1 => 1,
2 => 2,
3 | 4 | 9 => 4,
5 | 8 => 8,
MYSQL_TYPE_DATE
| MYSQL_TYPE_TIME
| MYSQL_TYPE_DATETIME
| MYSQL_TYPE_TIMESTAMP
| MYSQL_TYPE_TIME2
| MYSQL_TYPE_DATETIME2
| MYSQL_TYPE_TIMESTAMP2 => mem::size_of::<ffi::MYSQL_TIME>(),
_ => statement_buffer_len(column.declared_length),
}
}
#[inline]
pub(crate) fn bind_buffer_type(column_type: u32) -> u32 {
match column_type {
1 => ffi::enum_field_types_MYSQL_TYPE_TINY,
2 => ffi::enum_field_types_MYSQL_TYPE_SHORT,
3 => ffi::enum_field_types_MYSQL_TYPE_LONG,
4 => ffi::enum_field_types_MYSQL_TYPE_FLOAT,
5 => ffi::enum_field_types_MYSQL_TYPE_DOUBLE,
8 => ffi::enum_field_types_MYSQL_TYPE_LONGLONG,
9 => ffi::enum_field_types_MYSQL_TYPE_LONG,
_ => column_type,
}
}
#[inline]
pub(crate) fn column_is_unsigned(flags: u32) -> ffi::my_bool {
const UNSIGNED_FLAG: u32 = 32;
((flags & UNSIGNED_FLAG) != 0) as ffi::my_bool
}
#[inline]
pub(crate) fn enable_stmt_max_length(stmt: *mut ffi::MYSQL_STMT) -> Result<()> {
let update: ffi::my_bool = 1;
let rc = unsafe {
ffi::mysql_stmt_attr_set(
stmt,
ffi::enum_stmt_attr_type_STMT_ATTR_UPDATE_MAX_LENGTH,
(&update as *const ffi::my_bool).cast(),
)
};
if rc != 0 {
Err(unsafe {
Error::from_stmt(
stmt,
"mysql_stmt_attr_set(STMT_ATTR_UPDATE_MAX_LENGTH) failed",
)
})
} else {
Ok(())
}
}
#[inline]
pub(crate) fn to_cstring_ptr(value: Option<&str>) -> Result<Option<CString>> {
value.map(CString::new).transpose().map_err(Into::into)
}
#[inline]
fn opt_ptr(value: &Option<CString>) -> *const i8 {
value.as_ref().map_or(ptr::null(), |value| value.as_ptr())
}
#[inline]
pub(super) fn c_error_string(ptr: *const i8, fallback: String) -> String {
if ptr.is_null() {
return fallback;
}
let text = unsafe { CStr::from_ptr(ptr) }.to_string_lossy();
if text.is_empty() {
fallback
} else {
text.into_owned()
}
}
#[inline]
pub(super) fn c_opt_string(ptr: *const i8) -> Option<String> {
if ptr.is_null() {
None
} else {
Some(
unsafe { CStr::from_ptr(ptr) }
.to_string_lossy()
.into_owned(),
)
}
}
#[cfg(test)]
mod tests {
use std::env;
use std::thread;
use super::super::connection::Connection;
use super::super::options::ConnectOptions;
use super::super::rows::{Metadata, ResultSet, Row, RowRef, RowRefInner};
use super::super::value::ParamRefSlice;
use super::*;
fn assert_send<T: Send>() {}
#[test]
fn c_opt_string_handles_null() {
assert_eq!(c_opt_string(std::ptr::null()), None);
}
#[test]
fn mysql_cell_bytes_allows_zero_length_null_pointer() {
let bytes = unsafe { mysql_cell_bytes(std::ptr::null(), 0) };
assert!(bytes.is_empty());
}
#[test]
fn mysql_bind_init_starts_zeroed() {
let bind = mysql_bind_init();
assert!(bind.length.is_null());
assert!(bind.is_null.is_null());
assert!(bind.buffer.is_null());
assert!(bind.error.is_null());
assert!(bind.store_param_func.is_none());
assert!(bind.fetch_result.is_none());
assert!(bind.skip_result.is_none());
assert_eq!(bind.buffer_length, 0);
assert_eq!(bind.offset, 0);
assert_eq!(bind.length_value, 0);
assert_eq!(bind.flags, 0);
assert_eq!(bind.pack_length, 0);
assert_eq!(bind.buffer_type, 0);
assert_eq!(bind.error_value, 0);
assert_eq!(bind.is_unsigned, 0);
assert_eq!(bind.long_data_used, 0);
assert_eq!(bind.is_null_value, 0);
assert!(bind.extension.is_null());
}
#[test]
fn param_scratch_owns_borrowed_string_and_blob_buffers() {
let text = String::from("Ada");
let bytes = b"bytes".to_vec();
let params = [
ValueRef::String(&text),
ValueRef::Bytes(&bytes),
ValueRef::Null,
];
let mut scratch = ParamScratch::new(params.len());
scratch
.bind_source(&ParamRefSlice(¶ms))
.expect("bind source");
assert_ne!(scratch.binds[0].buffer as *const u8, text.as_ptr());
let copied_text = unsafe {
std::slice::from_raw_parts(
scratch.binds[0].buffer.cast::<u8>(),
scratch.lengths[0] as usize,
)
};
assert_eq!(copied_text, text.as_bytes());
assert_ne!(scratch.binds[1].buffer as *const u8, bytes.as_ptr());
let copied_bytes = unsafe {
std::slice::from_raw_parts(
scratch.binds[1].buffer.cast::<u8>(),
scratch.lengths[1] as usize,
)
};
assert_eq!(copied_bytes, bytes.as_slice());
}
#[test]
fn connect_options_builder_round_trips() {
let opts = ConnectOptions::new()
.host("127.0.0.1")
.port(3307)
.user("app")
.password("secret")
.database("demo")
.unix_socket("/tmp/mysql.sock");
assert_eq!(opts.host.as_deref(), Some("127.0.0.1"));
assert_eq!(opts.port, 3307);
assert_eq!(opts.user.as_deref(), Some("app"));
assert_eq!(opts.password.as_deref(), Some("secret"));
assert_eq!(opts.database.as_deref(), Some("demo"));
assert_eq!(opts.unix_socket.as_deref(), Some("/tmp/mysql.sock"));
}
#[test]
fn row_name_lookup_works() {
let metadata = Metadata {
columns: vec![
Column {
name: "id".into(),
column_type: MYSQL_TYPE_LONGLONG,
flags: 0,
declared_length: 20,
nullable: false,
},
Column {
name: "name".into(),
column_type: 253,
flags: 0,
declared_length: 255,
nullable: false,
},
],
name_order: vec![0, 1].into_boxed_slice(),
};
let row = [
Cell {
ptr: b"42".as_ptr(),
len: 2,
is_null: false,
},
Cell {
ptr: b"Ada".as_ptr(),
len: 3,
is_null: false,
},
];
let row = RowRef {
metadata: &metadata,
row: RowRefInner::Text(&row[..]),
};
assert_eq!(row.get_i64("id").unwrap(), 42);
assert_eq!(row.get_str("name").unwrap(), "Ada");
}
#[test]
fn native_types_are_send() {
assert_send::<Connection>();
assert_send::<ResultSet>();
assert_send::<Row>();
}
fn mysql_test_options() -> ConnectOptions {
let mut options = ConnectOptions::new();
if let Ok(host) = env::var("QUEX_TEST_MYSQL_HOST") {
options = options.host(host);
}
if let Ok(port) = env::var("QUEX_TEST_MYSQL_PORT") {
options = options.port(port.parse().expect("valid mysql test port"));
}
if let Ok(user) = env::var("QUEX_TEST_MYSQL_USER") {
options = options.user(user);
}
if let Ok(password) = env::var("QUEX_TEST_MYSQL_PASSWORD") {
options = options.password(password);
}
if let Ok(database) = env::var("QUEX_TEST_MYSQL_DATABASE") {
options = options.database(database);
}
if let Ok(unix_socket) = env::var("QUEX_TEST_MYSQL_SOCKET") {
options = options.unix_socket(unix_socket);
}
options
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires a local MariaDB instance and QUEX_TEST_MYSQL_* env if defaults are unsuitable"]
async fn mysql_connection_moves_across_tokio_tasks_before_query_and_drop() {
let mut conn = Connection::connect(mysql_test_options())
.await
.expect("connect");
let mut saw_different_thread = false;
let mut last_thread = thread::current().id();
for _ in 0..32 {
let (task_thread, next_conn) = tokio::spawn(async move {
tokio::task::yield_now().await;
let mut conn = conn;
let mut rows = conn.query("select 42 as id").await.expect("query");
let row = rows.next().await.expect("next").expect("row");
assert_eq!(row.get_i64("id").expect("id"), 42);
(thread::current().id(), conn)
})
.await
.expect("join query task");
saw_different_thread |= task_thread != last_thread;
last_thread = task_thread;
conn = next_conn;
if saw_different_thread {
break;
}
}
assert!(
saw_different_thread,
"connection never changed Tokio task thread"
);
let drop_thread = tokio::spawn(async move {
tokio::task::yield_now().await;
let drop_thread = thread::current().id();
drop(conn);
drop_thread
})
.await
.expect("join drop task");
assert_ne!(
drop_thread, last_thread,
"connection drop stayed on the same task thread"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires a local MariaDB instance and QUEX_TEST_MYSQL_* env if defaults are unsuitable"]
async fn mysql_result_set_moves_across_tokio_tasks_before_poll_and_drop() {
let mut conn = Connection::connect(mysql_test_options())
.await
.expect("connect");
let rows = conn
.query("select 7 as id union all select 9 as id")
.await
.expect("query");
let create_thread = thread::current().id();
let (poll_thread, rows) = tokio::spawn(async move {
tokio::task::yield_now().await;
let poll_thread = thread::current().id();
let mut rows = rows;
let row = rows.next().await.expect("next").expect("row");
assert_eq!(row.get_i64("id").expect("id"), 7);
(poll_thread, rows)
})
.await
.expect("join resultset task");
assert_ne!(
poll_thread, create_thread,
"result set poll stayed on the creator thread"
);
let drop_thread = tokio::spawn(async move {
tokio::task::yield_now().await;
let drop_thread = thread::current().id();
drop(rows);
drop_thread
})
.await
.expect("join drop task");
assert_ne!(
drop_thread, poll_thread,
"result set drop stayed on the poll thread"
);
}
}