use crate::attr::*;
use crate::bind::*;
use crate::conn::*;
use crate::error::*;
use crate::handle::*;
use crate::poll::*;
use crate::stream;
use crate::util;
use futures::future::Either;
use futures::*;
use futures_state_stream::*;
use odbc_sys::*;
use std::any::Any;
use std::fmt;
use std::sync::{Arc, Mutex, RwLock};
#[derive(Default)]
struct SqlParameterData {
parameters: Vec<Box<Any>>,
data: Vec<Box<SqlValueData>>,
}
pub struct SqlStatement {
handle: SQLHSTMT,
connection: Arc<RwLock<SqlConnection>>,
parameters: Mutex<SqlParameterData>,
}
impl fmt::Debug for SqlStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{{ handle: {:p}, connection: {:?} }}",
self.handle, self.connection
)
}
}
impl Drop for SqlStatement {
fn drop(&mut self) {
unsafe { self.dealloc_handle().unwrap() }
}
}
impl SqlHandle for SqlStatement {
const TYPE: HandleType = SQL_HANDLE_STMT;
type Type = SQLHSTMT;
unsafe fn typed_handle(&self) -> SQLHSTMT {
self.handle
}
unsafe fn handle(&self) -> SQLHANDLE {
self.handle as SQLHANDLE
}
}
impl SqlAttribute for SqlStatementAttribute {
fn buffer_length(&self) -> Option<SqlAttributeStringLength> {
let value = match self {
SQL_ATTR_ROWS_FETCHED_PTR => SQL_IS_POINTER,
_ => SQL_IS_UINTEGER,
};
Some(value)
}
}
unsafe impl SqlAttributes for SqlStatement {
type AttributeType = SqlStatementAttribute;
const GETTER_NAME: &'static str = "SQLGetStmtAttrW";
const GETTER: unsafe extern "system" fn(
SQLHSTMT,
SqlStatementAttribute,
SQLPOINTER,
SQLINTEGER,
*mut SQLINTEGER,
) -> SQLRETURN = SQLGetStmtAttrW;
const SETTER: unsafe extern "system" fn(
SQLHSTMT,
SqlStatementAttribute,
SQLPOINTER,
SQLINTEGER,
) -> SQLRETURN = SQLSetStmtAttrW;
const SETTER_NAME: &'static str = "SQLSetStmtAttrW";
}
unsafe impl Send for SqlStatement {}
unsafe impl Sync for SqlStatement {}
unsafe impl SqlRawHandle for SqlStatement {}
const SQL_TYPE_INFO_COLUMN_COUNT: SQLUSMALLINT = 19;
use odbc_futures_derive::Odbc;
#[derive(Debug, Clone, Odbc)]
#[odbc_columns_seq]
pub struct SqlTypeInfo {
pub type_name: String,
pub data_type: SqlDataType,
pub column_size: Option<usize>,
pub literal_prefix: Option<String>,
pub literal_suffix: Option<String>,
pub create_params: Option<String>,
pub nullable: Nullable,
pub case_sensitive: bool,
pub searchable: i16,
pub unsigned_attribute: Option<bool>,
pub fixed_prec_scale: Option<bool>,
pub auto_unique_value: Option<bool>,
pub local_type_name: Option<String>,
pub minimum_scale: Option<SQLSMALLINT>,
pub maximum_scale: Option<SQLSMALLINT>,
pub sql_data_type: SqlDataType,
pub sql_datetime_sub: Option<SqlDataType>,
pub num_prec_radix: Option<usize>,
pub interval_precision: Option<SQLSMALLINT>,
}
impl Default for SqlTypeInfo {
fn default() -> SqlTypeInfo {
SqlTypeInfo {
type_name: Default::default(),
data_type: SQL_UNKNOWN_TYPE,
column_size: Default::default(),
literal_prefix: Default::default(),
literal_suffix: Default::default(),
create_params: Default::default(),
nullable: SQL_NULLABLE_UNKNOWN,
case_sensitive: Default::default(),
searchable: Default::default(),
unsigned_attribute: Default::default(),
fixed_prec_scale: Default::default(),
auto_unique_value: Default::default(),
local_type_name: Default::default(),
minimum_scale: Default::default(),
maximum_scale: Default::default(),
sql_data_type: SQL_UNKNOWN_TYPE,
sql_datetime_sub: Default::default(),
num_prec_radix: Default::default(),
interval_precision: Default::default(),
}
}
}
impl SqlStatement {
pub fn new(connection: &Arc<RwLock<SqlConnection>>) -> SqlResult<SqlStatement> {
let connection = Arc::clone(connection);
let handle = {
let lock = connection.read().unwrap();
unsafe { lock.alloc_child_handle::<SqlStatement>()? as SQLHSTMT }
};
Ok(SqlStatement {
handle,
connection,
parameters: Mutex::new(Default::default()),
})
}
pub fn set_async(&mut self, is_async: bool) -> SqlResult {
unsafe {
self.set_attribute(
SqlStatementAttribute::SQL_ATTR_ASYNC_ENABLE,
is_async as usize,
)
}
}
pub fn get_async(&self) -> SqlResult<bool> {
unsafe {
self.get_attribute::<SQLUINTEGER>(SqlStatementAttribute::SQL_ATTR_ASYNC_ENABLE)
.map(util::u32_to_bool)
}
}
pub fn bind_parameter<TValue, TContext>(
&mut self,
parameter_number: SQLUSMALLINT,
value: TValue,
options: Option<SqlBindParameterOptions>,
) -> SqlResult
where
TValue: SqlValue<TContext> + 'static,
SqlValueContext: AsMut<TContext>,
{
self.bind_boxed_parameter(parameter_number, Box::new(value), options)
}
pub fn bind_boxed_parameter<TValue, TContext>(
&mut self,
parameter_number: SQLUSMALLINT,
mut value: Box<TValue>,
options: Option<SqlBindParameterOptions>,
) -> SqlResult
where
TValue: SqlValue<TContext> + 'static,
SqlValueContext: AsMut<TContext>,
{
let options = options.unwrap_or_default();
let mut lock = self.parameters.lock().unwrap();
assert!(parameter_number > 0, "parameters start at 1");
let index = (parameter_number - 1) as usize;
assert!(index <= lock.parameters.len());
if index < lock.parameters.len() {
let old_data =
std::mem::replace(&mut lock.data[index], Box::new(SqlValueData::default()));
let data = &mut lock.data[index];
if let Err(e) = value.bind_parameter(
parameter_number,
options,
self,
data.context.as_mut(),
&mut data.len,
) {
*data = old_data;
Err(e)
} else {
lock.parameters[index] = value as Box<Any>;
Ok(())
}
} else {
let mut data = Box::new(SqlValueData::default());
value.bind_parameter(
parameter_number,
options,
self,
data.context.as_mut(),
&mut data.len,
)?;
lock.data.push(data);
lock.parameters.push(value as Box<Any>);
Ok(())
}
}
pub fn reset_parameters(&mut self) -> SqlResult {
let mut lock = self.parameters.lock().unwrap();
let ret = unsafe { SQLFreeStmt(self.handle, SQL_RESET_PARAMS) };
match ret {
SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => {
lock.data.clear();
lock.parameters.clear();
Ok(())
}
SQL_ERROR => Err(self.get_detailed_error(ret)),
_ => panic!(
"Unexpected SQLFreeStmt(SQL_RESET_PARAMS) return code: {:?}",
ret
),
}
}
pub fn unbind(&mut self) -> SqlResult {
let ret = unsafe { SQLFreeStmt(self.handle, SQL_UNBIND) };
match ret {
SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => Ok(()),
SQL_ERROR => Err(self.get_detailed_error(ret)),
_ => panic!("Unexpected SQLFreeStmt(SQL_UNBIND) return code: {:?}", ret),
}
}
pub fn close(&mut self) -> SqlResult {
let ret = unsafe { SQLFreeStmt(self.handle, SQL_CLOSE) };
match ret {
SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => Ok(()),
SQL_ERROR => Err(self.get_detailed_error(ret)),
_ => panic!("Unexpected SQLFreeStmt(SQL_CLOSE) return code: {:?}", ret),
}
}
pub fn prepare(&mut self, statement_text: &str) -> SqlResult {
let raw_statement_text: Vec<u16> = statement_text.encode_utf16().collect();
self.prepare_impl(&raw_statement_text).unwrap_async()
}
pub fn row_count(&self) -> SqlResult<Option<usize>> {
let mut row_count: SQLLEN = 0;
let ret = unsafe { SQLRowCount(self.handle, &mut row_count) };
match ret {
SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => {
let count = if row_count < 0 {
None
} else {
Some(row_count as usize)
};
Ok(count)
}
SQL_ERROR => Err(self.get_detailed_error(ret)),
_ => panic!("Unexpected SQLRowCount return code: {:?}", ret),
}
}
pub fn prepare_async(self, statement_text: &str) -> SqlFuture<SqlStatement> {
let raw_statement_text: Vec<u16> = statement_text.encode_utf16().collect();
let f = SqlValueFuture::new(self, move |statement: &mut SqlStatement| {
statement.prepare_impl(&raw_statement_text)
});
flatten_value_future(f)
}
fn prepare_impl(&mut self, raw_statement_text: &[u16]) -> SqlPoll {
let ret = unsafe {
SQLPrepareW(
self.handle,
raw_statement_text.as_ptr(),
raw_statement_text.len() as SQLINTEGER,
)
};
match ret {
SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => Ok(Async::Ready(())),
SQL_STILL_EXECUTING => Ok(Async::NotReady),
SQL_ERROR => Err(self.get_detailed_error(ret)),
_ => panic!("Unexpected SQLPrepareW return code: {:?}", ret),
}
}
pub fn exec(&mut self) -> SqlResult {
self.exec_impl().unwrap_async()
}
pub fn exec_async(self) -> SqlFuture<SqlStatement> {
let f = SqlValueFuture::new(self, Self::exec_impl);
flatten_value_future(f)
}
fn exec_impl(&mut self) -> SqlPoll {
let ret = unsafe { SQLExecute(self.handle) };
match ret {
SQL_NO_DATA | SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => Ok(Async::Ready(())),
SQL_STILL_EXECUTING => Ok(Async::NotReady),
SQL_ERROR => Err(self.get_detailed_error(ret)),
_ => panic!("Unexpected SQLExecute return code: {:?}", ret),
}
}
pub fn exec_direct(&mut self, statement_text: &str) -> SqlResult {
let raw_statement_text: Vec<u16> = statement_text.encode_utf16().collect();
self.exec_direct_impl(&raw_statement_text).unwrap_async()
}
pub fn exec_direct_async(self, statement_text: &str) -> SqlFuture<SqlStatement> {
let raw_statement_text: Vec<u16> = statement_text.encode_utf16().collect();
let f = SqlValueFuture::new(self, move |statement: &mut SqlStatement| {
statement.exec_direct_impl(&raw_statement_text)
});
flatten_value_future(f)
}
fn exec_direct_impl(&mut self, raw_statement_text: &[u16]) -> SqlPoll {
let ret = unsafe {
SQLExecDirectW(
self.handle,
raw_statement_text.as_ptr(),
raw_statement_text.len() as SQLINTEGER,
)
};
match ret {
SQL_NO_DATA | SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => Ok(Async::Ready(())),
SQL_STILL_EXECUTING => Ok(Async::NotReady),
SQL_ERROR => Err(self.get_detailed_error(ret)),
_ => panic!("Unexpected SQLExecDirectW return code: {:?}", ret),
}
}
pub fn get_type_info_async(
self,
data_type: Option<SqlDataType>,
) -> impl Future<Item = SqlStatementStream<SqlTypeInfo>, Error = (SqlError, SqlStatement)> + Send
{
let data_type = data_type.unwrap_or(SQL_UNKNOWN_TYPE);
SqlValueFuture::new(self, move |stmt| stmt.get_type_info_impl(data_type))
.and_then(|(statement, _)| {
stream::describe_cols_stream(statement, SQL_TYPE_INFO_COLUMN_COUNT as SQLSMALLINT)
.collect()
})
.and_then(|(columns, statement)| stream::fetch_state_stream(columns, statement))
}
pub fn get_type_info_collect_async(
self,
data_type: Option<SqlDataType>,
) -> impl Future<Item = (Vec<SqlTypeInfo>, SqlStatement), Error = (SqlError, SqlStatement)> + Send
{
self.get_type_info_async(data_type)
.and_then(StateStream::collect)
}
fn get_type_info_impl(&self, data_type: SqlDataType) -> SqlPoll {
let ret = unsafe { SQLGetTypeInfo(self.handle, data_type) };
match ret {
SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => Ok(Async::Ready(())),
SQL_ERROR => {
let e = self.get_detailed_error(ret);
Err(e)
}
SQL_STILL_EXECUTING => Ok(Async::NotReady),
_ => panic!(
"Unexpected SQLGetTypeInfo({:p}, {:?}) return code: {:?}",
self.handle, data_type, ret
),
}
}
pub fn exec_direct_stream_async<T>(
self,
statement_text: &str,
) -> impl Future<Item = SqlStatementStream<T>, Error = (SqlError, SqlStatement)> + Send
where
T: SqlColumns,
{
let raw_statement_text: Vec<u16> = statement_text.encode_utf16().collect();
SqlValueFuture::new(self, move |statement| {
statement.exec_direct_impl(&raw_statement_text)
})
.and_then(|(statement, _)| SqlValueFuture::new(statement, stream::num_result_cols))
.and_then(|(statement, num_result_cols)| {
stream::describe_cols_stream(statement, num_result_cols).collect()
})
.and_then(|(columns, statement)| stream::fetch_state_stream(columns, statement))
}
pub fn exec_direct_collect_async<T>(
self,
statement_text: &str,
) -> impl Future<Item = (Vec<T>, SqlStatement), Error = (SqlError, SqlStatement)> + Send
where
T: SqlColumns,
{
self.exec_direct_stream_async::<T>(statement_text)
.and_then(StateStream::collect)
}
pub fn exec_stream_async<T>(
self,
) -> impl Future<Item = SqlStatementStream<T>, Error = (SqlError, SqlStatement)> + Send
where
T: SqlColumns,
{
self.exec_async()
.and_then(|statement| SqlValueFuture::new(statement, stream::num_result_cols))
.and_then(|(statement, num_result_cols)| {
stream::describe_cols_stream(statement, num_result_cols).collect()
})
.and_then(|(columns, statement)| stream::fetch_state_stream(columns, statement))
}
pub fn exec_collect_async<T>(
self,
) -> impl Future<Item = (Vec<T>, SqlStatement), Error = (SqlError, SqlStatement)> + Send
where
T: SqlColumns,
{
self.exec_stream_async::<T>().and_then(StateStream::collect)
}
fn more_results_impl(&mut self) -> SqlPoll<bool> {
let ret = unsafe { SQLMoreResults(self.handle) };
match ret {
SQL_SUCCESS | SQL_SUCCESS_WITH_INFO => Ok(Async::Ready(true)),
SQL_NO_DATA => Ok(Async::Ready(false)),
SQL_STILL_EXECUTING => Ok(Async::NotReady),
SQL_ERROR => Err(self.get_detailed_error(ret)),
_ => panic!("Unexpected SQLMoreResults return code: {:?}", ret),
}
}
pub fn more_results_stream_async<T>(
self,
) -> impl Future<
Item = Either<SqlStatementStream<T>, SqlStatement>,
Error = (SqlError, SqlStatement),
> + Send
where
T: SqlColumns,
{
SqlValueFuture::new(self, Self::more_results_impl)
.and_then(|(statement, more)| statement.more_stream_impl(more))
}
pub fn more_results_collect_async<T>(
self,
) -> impl Future<Item = (Option<Vec<T>>, SqlStatement), Error = (SqlError, SqlStatement)> + Send
where
T: SqlColumns,
{
self.more_results_stream_async().and_then(
|stream_or_statement| -> Box<
Future<Item = (Option<Vec<T>>, SqlStatement), Error = (SqlError, SqlStatement)>
+ Send,
> {
match stream_or_statement {
Either::A(stream) => {
let f = stream
.collect()
.map(|(data, statement)| (Some(data), statement));
Box::new(f)
}
Either::B(statement) => {
let f = futures::finished((None, statement));
Box::new(f)
}
}
},
)
}
fn more_stream_impl<T: SqlColumns>(
self,
more: bool,
) -> Box<
dyn Future<
Item = Either<SqlStatementStream<T>, SqlStatement>,
Error = (SqlError, SqlStatement),
> + Send,
> {
if more {
let f = SqlValueFuture::new(self, stream::num_result_cols)
.and_then(|(statement, num_result_cols)| {
stream::describe_cols_stream(statement, num_result_cols).collect()
})
.and_then(|(columns, statement)| stream::fetch_state_stream(columns, statement))
.map(Either::A);
Box::new(f)
} else {
let f = futures::finished(Either::B(self));
Box::new(f)
}
}
pub fn connection(&self) -> Arc<RwLock<SqlConnection>> {
Arc::clone(&self.connection)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::env::*;
use crate::util::{print_diagnostics, print_diagnostics_async};
#[test]
fn bind() {
let mut env = SqlEnvironment::new().unwrap();
print_diagnostics(&env);
env.set_version(OdbcVersion::SQL_OV_ODBC3_80).unwrap();
print_diagnostics(&env);
let arc_env = Arc::new(RwLock::new(env));
let connection_string = std::env::var("ODBC_CONNECTION_STRING_MSSQL").unwrap();
let mut connection = SqlConnection::new(&arc_env).unwrap();
print_diagnostics(&connection);
print_diagnostics_async(&arc_env);
connection.driver_connect(&connection_string).unwrap();
print_diagnostics(&connection);
let arc_connection = Arc::new(RwLock::new(connection));
let mut statement = SqlStatement::new(&arc_connection).unwrap();
print_diagnostics(&statement);
print_diagnostics_async(&arc_connection);
const HELLO_WORLD: &'static str = "Привіт, світ! 你气不气?";
statement
.bind_parameter::<_, Vec<u16>>(1, Some(HELLO_WORLD.to_owned()), None)
.unwrap();
print_diagnostics(&statement);
statement.exec_direct("print ?").unwrap();
print_diagnostics(&statement);
let diag = statement.diagnostics().unwrap();
let record = diag.iter().find(|r| r.message.contains(HELLO_WORLD));
assert_ne!(None, record);
}
#[test]
fn bind_input() {
let mut env = SqlEnvironment::new().unwrap();
print_diagnostics(&env);
env.set_version(OdbcVersion::SQL_OV_ODBC3_80).unwrap();
print_diagnostics(&env);
let arc_env = Arc::new(RwLock::new(env));
let connection_string = std::env::var("ODBC_CONNECTION_STRING_MSSQL").unwrap();
let mut connection = SqlConnection::new(&arc_env).unwrap();
print_diagnostics(&connection);
print_diagnostics_async(&arc_env);
connection.driver_connect(&connection_string).unwrap();
print_diagnostics(&connection);
let arc_connection = Arc::new(RwLock::new(connection));
let mut statement = SqlStatement::new(&arc_connection).unwrap();
print_diagnostics(&statement);
print_diagnostics_async(&arc_connection);
const HELLO_WORLD: &'static str = "Привіт, світ! 你气不气?";
statement
.bind_parameter::<_, Vec<u16>>(1, Some(HELLO_WORLD.to_owned()), None)
.unwrap();
print_diagnostics(&statement);
statement.exec_direct("print ?").unwrap();
print_diagnostics(&statement);
let diag = statement.diagnostics().unwrap();
let record = diag.iter().find(|r| r.message.contains(HELLO_WORLD));
assert_ne!(None, record);
}
#[test]
fn stream_cols() {
use super::*;
use std::sync::{Arc, RwLock};
let mut env = SqlEnvironment::new().unwrap();
env.set_version(OdbcVersion::SQL_OV_ODBC3_80).unwrap();
let arc_env = Arc::new(RwLock::new(env));
let mut connection = SqlConnection::new(&arc_env).unwrap();
let connection_string = std::env::var("ODBC_CONNECTION_STRING_MSSQL").unwrap();
connection.set_connect_async(true).unwrap();
connection.set_async(true).unwrap();
use odbc_futures_derive::Odbc;
#[derive(Debug, Clone, Default, Odbc)]
struct HelloWorld {
a: u16,
b: String,
c: f32,
d: Option<String>,
e: Option<u8>,
f: bool,
g: Vec<u8>,
#[odbc_string_utf8]
h: String,
i: String,
}
const LONG_SIZE: usize = 16384;
tokio::run(
connection
.driver_connect_async(&connection_string)
.then(|connection_result| {
let connection = connection_result.unwrap();
let arc_connection = Arc::new(RwLock::new(connection));
let mut statement = SqlStatement::new(&arc_connection).unwrap();
for index in 1u16..=6 {
statement
.bind_parameter(index, Some(LONG_SIZE), None)
.unwrap();
}
statement.exec_direct_collect_async::<HelloWorld>(
r#"
select 12 as a
, getutcdate() as b
, 12.34 as c
, newid() as d
, 0 as e
, null as f
, cast(replicate(cast(cast(0 as tinyint) as varbinary(max)), ?) as varbinary(max)) as g
, cast(replicate(cast('0' as nvarchar(max)), ?) as nvarchar(max)) as h
, cast(replicate(cast('0' as varchar(max)), ?) as varchar(max)) as i
union all
select 12 as a
, sysutcdatetime() as b
, 55 as c
, null as d
, null as e
, cast(1 as bit) as f
, cast(replicate(cast(cast(1 as tinyint) as varbinary(max)), (? * 2)) as varbinary(max)) as g
, cast(replicate(cast('1' as nvarchar(max)), (? * 2)) as nvarchar(max)) as h
, cast(replicate(cast('1' as varchar(max)), (? * 2)) as varchar(max)) as i
"#,
)
})
.then(|results| {
let (data, _stmt) = results.unwrap();
let rows: Vec<HelloWorld> = data;
for (idx, row) in rows.iter().enumerate() {
let byte = idx as u8;
let char = std::char::from_digit(idx as u32, 10).unwrap();
assert_eq!(LONG_SIZE * (idx + 1), row.g.len());
assert!(row.g.iter().all(|x| *x == byte));
assert_eq!(LONG_SIZE * (idx + 1), row.h.len());
assert!(row.h.chars().all(|x| x == char));
let i = row.i.as_str();
assert_eq!(LONG_SIZE * (idx + 1), i.len());
assert!(i.chars().all(|x| x == char));
}
Ok(())
}),
);
}
#[cfg(feature = "chrono")]
#[test]
fn chrono() {
use super::*;
use chrono::*;
let mut env = SqlEnvironment::new().unwrap();
print_diagnostics(&env);
env.set_version(OdbcVersion::SQL_OV_ODBC3_80).unwrap();
print_diagnostics(&env);
let arc_env = Arc::new(RwLock::new(env));
let connection_string = std::env::var("ODBC_CONNECTION_STRING_MSSQL").unwrap();
let mut connection = SqlConnection::new(&arc_env).unwrap();
print_diagnostics(&connection);
print_diagnostics_async(&arc_env);
connection.driver_connect(&connection_string).unwrap();
print_diagnostics(&connection);
let arc_connection = Arc::new(RwLock::new(connection));
let mut statement = SqlStatement::new(&arc_connection).unwrap();
print_diagnostics(&statement);
print_diagnostics_async(&arc_connection);
let now = Utc::now();
let fixed = now.timezone().fix();
let now_fixed = now.with_timezone(&fixed);
let now_naive = now.naive_utc();
let expected = now_naive.format("%Y-%m-%dT%H:%M:%S%.6f").to_string();
statement.bind_parameter(1, Some(now_naive), None).unwrap();
statement
.exec_direct("PRINT N'Naive: ' + CONVERT(NVARCHAR(30), ?, 126)")
.unwrap();
print_diagnostics(&statement);
let diag = statement.diagnostics().unwrap();
let record = diag.iter().find(|r| r.message.contains(&expected));
assert_ne!(None, record);
statement.bind_parameter(1, Some(now_fixed), None).unwrap();
statement
.exec_direct("PRINT N'UTC: ' + CONVERT(NVARCHAR(36), ?, 127)")
.unwrap();
print_diagnostics(&statement);
let local = Local::now();
let local_offset = now.timezone().fix();
let local_fixed = local.with_timezone(&local_offset);
statement
.bind_parameter(1, Some(local_fixed), None)
.unwrap();
statement
.exec_direct("PRINT N'Local: ' + CONVERT(NVARCHAR(36), ?, 127)")
.unwrap();
print_diagnostics(&statement);
}
#[test]
fn more_results_cols() {
use super::*;
use std::sync::{Arc, RwLock};
use tokio;
let mut env = SqlEnvironment::new().unwrap();
env.set_version(OdbcVersion::SQL_OV_ODBC3_80).unwrap();
let arc_env = Arc::new(RwLock::new(env));
let mut connection = SqlConnection::new(&arc_env).unwrap();
let connection_string = std::env::var("ODBC_CONNECTION_STRING_MSSQL").unwrap();
connection.set_connect_async(true).unwrap();
connection.set_async(true).unwrap();
use odbc_futures_derive::Odbc;
#[derive(Debug, Clone, Default, Odbc)]
struct HelloWorld {
a: u16,
b: String,
c: f32,
d: Option<String>,
e: Option<u8>,
f: bool,
}
#[derive(Debug, Clone, Default, Odbc)]
struct GoodbyeWorld {
f: u16,
e: String,
d: f32,
c: Option<String>,
b: Option<u8>,
a: bool,
}
tokio::run(
connection
.driver_connect_async(&connection_string)
.then(|connection_result| {
let connection = connection_result.unwrap();
let arc_connection = Arc::new(RwLock::new(connection));
let statement = SqlStatement::new(&arc_connection).unwrap();
statement.exec_direct_collect_async::<HelloWorld>(
r#"
select 12 as a,
getutcdate() as b,
12.34 as c,
newid() as d,
0 as e,
null as f
;
select 12 as f,
sysutcdatetime() as e,
55 as d,
null as c,
null as b,
cast(1 as bit) as a
;
"#,
)
})
.and_then(|(first_results, statement)| {
println!("{:#?}", first_results);
statement.more_results_collect_async::<GoodbyeWorld>()
})
.and_then(|(second_results, _statement)| {
println!("{:#?}", second_results.unwrap());
Ok(())
})
.then(|r| {
r.map_err(|(e, _)| e).unwrap();
Ok(())
}),
);
}
}