#![cfg(feature = "net_box")]
use core::time::Duration;
use std::net::ToSocketAddrs;
use std::rc::Rc;
pub use index::{RemoteIndex, RemoteIndexIterator};
use inner::ConnInner;
pub use options::{ConnOptions, ConnTriggers, Options};
use promise::Promise;
pub use space::RemoteSpace;
use crate::error::Error;
use crate::network::protocol;
use crate::tuple::{Decode, ToTupleBuffer, Tuple};
mod index;
mod inner;
mod options;
pub mod promise;
mod recv_queue;
mod schema;
mod send_queue;
mod space;
mod stream;
/// Legacy name kept for backward compatibility; it is a plain alias of
/// [`crate::error::TarantoolError`].
#[deprecated(note = "use `TarantoolError` instead")]
pub type ResponseError = crate::error::TarantoolError;
/// Handle to a remote connection.
///
/// All operations are forwarded to the reference-counted [`ConnInner`] held
/// in `inner`.
pub struct Conn {
/// Reference-counted connection internals that every method of this handle
/// delegates to.
inner: Rc<ConnInner>,
/// `true` for handles produced by `Conn::new`, `false` for handles produced
/// by `Conn::downgrade`. Only a handle with `is_master == true` closes the
/// connection when it is dropped.
is_master: bool,
}
impl Conn {
    /// Builds a new master connection handle for `addr`.
    ///
    /// `addr` is resolved through [`ToSocketAddrs`]; every resulting address
    /// is handed to the connection internals together with `options` and the
    /// optional `triggers`.
    ///
    /// # Errors
    ///
    /// Fails if `addr` cannot be resolved or if the connection internals
    /// cannot be constructed.
    #[inline(always)]
    pub fn new(
        addr: impl ToSocketAddrs,
        options: ConnOptions,
        triggers: Option<Rc<dyn ConnTriggers>>,
    ) -> Result<Self, Error> {
        let resolved = addr.to_socket_addrs()?.collect();
        let inner = ConnInner::new(resolved, options, triggers)?;
        Ok(Self {
            inner,
            is_master: true,
        })
    }

    /// Wraps already existing connection internals into a non-master handle.
    ///
    /// Dropping the returned handle does not close the connection, because
    /// only master handles do that in their `Drop` implementation.
    #[inline(always)]
    fn downgrade(inner: Rc<ConnInner>) -> Self {
        Self {
            is_master: false,
            inner,
        }
    }

    /// Forwards to the connection internals' `wait_connected` with the given
    /// optional `timeout`.
    ///
    /// NOTE(review): the exact meaning of the returned `bool` is defined by
    /// `ConnInner::wait_connected` and is not visible from this file;
    /// presumably `true` means the connection is established. Confirm there.
    pub fn wait_connected(&self, timeout: Option<Duration>) -> Result<bool, Error> {
        let inner = &self.inner;
        inner.wait_connected(timeout)
    }

    /// Reports the connection state as seen by the connection internals.
    pub fn is_connected(&self) -> bool {
        let inner = &self.inner;
        inner.is_connected()
    }

    /// Asks the connection internals to close the connection.
    pub fn close(&self) {
        let inner = &self.inner;
        inner.close()
    }

    /// Sends a `Ping` request and discards whatever the response carries.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying request.
    pub fn ping(&self, options: &Options) -> Result<(), Error> {
        let request = protocol::Ping;
        self.inner.request(&request, options).map(|_response| ())
    }

    /// Sends a `Call` request for the remote function `fn_name` with `args`.
    ///
    /// On success the response tuple is always wrapped in `Some`; this method
    /// never produces `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying request.
    pub fn call<T>(
        &self,
        fn_name: &str,
        args: &T,
        options: &Options,
    ) -> Result<Option<Tuple>, Error>
    where
        T: ToTupleBuffer,
        T: ?Sized,
    {
        let request = protocol::Call { fn_name, args };
        self.inner.request(&request, options).map(Some)
    }

    /// Starts a `Call` request for `fn_name` without waiting for the result
    /// and returns a [`Promise`] for a response decoded as `R`.
    ///
    /// `args` is taken by value and only borrowed while the request is being
    /// submitted.
    pub fn call_async<A, R>(&self, fn_name: &str, args: A) -> crate::Result<Promise<R>>
    where
        A: ToTupleBuffer,
        R: for<'de> Decode<'de> + 'static,
    {
        let request = protocol::Call {
            fn_name,
            args: &args,
        };
        self.inner.request_async(&request)
    }

    /// Sends an `Eval` request for the expression `expr` with `args`.
    ///
    /// On success the response tuple is always wrapped in `Some`; this method
    /// never produces `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying request.
    pub fn eval<T>(&self, expr: &str, args: &T, options: &Options) -> Result<Option<Tuple>, Error>
    where
        T: ToTupleBuffer,
        T: ?Sized,
    {
        let request = protocol::Eval { expr, args };
        self.inner.request(&request, options).map(Some)
    }

    /// Starts an `Eval` request for `expr` without waiting for the result
    /// and returns a [`Promise`] for a response decoded as `R`.
    ///
    /// `args` is taken by value and only borrowed while the request is being
    /// submitted.
    pub fn eval_async<A, R>(&self, expr: &str, args: A) -> crate::Result<Promise<R>>
    where
        A: ToTupleBuffer,
        R: for<'de> Decode<'de> + 'static,
    {
        let request = protocol::Eval { expr, args: &args };
        self.inner.request_async(&request)
    }

    /// Looks up a space by `name`.
    ///
    /// Returns `Ok(None)` when the lookup finds nothing; otherwise returns a
    /// [`RemoteSpace`] that shares this connection's internals.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the lookup.
    pub fn space(&self, name: &str) -> Result<Option<RemoteSpace>, Error> {
        match self.inner.lookup_space(name)? {
            Some(space_id) => {
                let shared = Rc::clone(&self.inner);
                Ok(Some(RemoteSpace::new(shared, space_id)))
            }
            None => Ok(None),
        }
    }

    /// Sends an `Execute` request carrying the statement `sql` and its
    /// `bind_params`, returning the tuples of the response.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying request.
    pub fn execute<P>(
        &self,
        sql: &str,
        bind_params: &P,
        options: &Options,
    ) -> Result<Vec<Tuple>, Error>
    where
        P: ToTupleBuffer + ?Sized,
    {
        let request = protocol::Execute { sql, bind_params };
        self.inner.request(&request, options)
    }
}
impl Drop for Conn {
    /// Closes the connection when a master handle goes away; non-master
    /// handles leave the connection untouched.
    fn drop(&mut self) {
        if !self.is_master {
            return;
        }
        // Same effect as `Conn::close`, which only forwards to the internals.
        self.inner.close();
    }
}
#[cfg(feature = "internal_test")]
mod tests {
    use super::*;
    use crate::test::util::listen_port;

    /// Error text expected when the server rejects the supplied credentials.
    #[cfg(feature = "picodata")]
    const PASSWORD_MISMATCH: &str =
        "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid";

    /// Opens a connection to the local test instance with the given
    /// credentials and authentication method, panicking if the handle cannot
    /// be created.
    #[track_caller]
    fn connect_as(user: &str, password: &str, auth_method: crate::auth::AuthMethod) -> Conn {
        let addr = ("localhost", listen_port());
        let options = ConnOptions {
            user: user.into(),
            password: password.into(),
            auth_method,
            ..ConnOptions::default()
        };
        Conn::new(addr, options, None).unwrap()
    }

    /// Evaluates `return` over `conn`, requires the request to fail and
    /// checks the error's text against `expected`.
    #[cfg(feature = "picodata")]
    #[track_caller]
    fn assert_return_fails_with(conn: &Conn, expected: &str) {
        let err = conn
            .eval("return", &(), &Default::default())
            .unwrap_err()
            .to_string();
        assert_eq!(err, expected);
    }

    /// Connection authenticated as the predefined `test_user`.
    fn test_user_conn() -> Conn {
        connect_as("test_user", "password", crate::auth::AuthMethod::ChapSha1)
    }

    /// A request whose arguments fail to serialize reports that error, and
    /// the next request on the same handle reports "not connected".
    #[crate::test(tarantool = "crate")]
    fn dont_drop_worker_join_handles() {
        // Argument type whose serialization always fails.
        struct UnexpectedIOError;
        impl ToTupleBuffer for UnexpectedIOError {
            fn write_tuple_data(&self, _: &mut impl std::io::Write) -> Result<(), Error> {
                Err(Error::other("some io error"))
            }
        }

        let conn = test_user_conn();

        let first_failure = conn
            .eval("return ...", &UnexpectedIOError, &Default::default())
            .unwrap_err()
            .to_string();
        assert_eq!(first_failure, "some io error");

        let second_failure = conn
            .eval("return ...", &[1], &Default::default())
            .unwrap_err()
            .to_string();
        assert_eq!(second_failure, "io error: not connected");

        conn.close();
    }

    /// Several failing requests in a row each report the server-side error.
    #[crate::test(tarantool = "crate")]
    fn errors_in_a_row_bug() {
        let conn = test_user_conn();
        for _attempt in 0..5 {
            let message = conn
                .eval("error 'oops'", &(), &Default::default())
                .unwrap_err()
                .to_string();
            #[rustfmt::skip]
            assert_eq!(message, "server responded with error: ProcLua: eval:1: oops");
        }
    }

    /// Creates a user with the `md5` auth type, then checks a successful
    /// login, a wrong password, and a wrong password sent with `chap-sha1`.
    /// The user is dropped at the end.
    #[cfg(feature = "picodata")]
    #[crate::test(tarantool = "crate")]
    async fn md5_auth_method() {
        use crate::auth::AuthMethod;

        let username = "Worry";
        let password = "B Gone";

        let create_user = "local username, password = ...
box.cfg { }
box.schema.user.create(username, { if_not_exists = true, auth_type = 'md5', password = password })
box.schema.user.grant(username, 'super', nil, nil, { if_not_exists = true })";
        crate::lua_state()
            .exec_with(create_user, (username, password))
            .unwrap();

        // Valid credentials: the request goes through.
        {
            let conn = connect_as(username, password, AuthMethod::Md5);
            conn.eval(
                "print('\\x1b[32mit works!\\x1b[0m')",
                &(),
                &Default::default(),
            )
            .unwrap();
        }

        // Wrong password with the md5 method.
        {
            let conn = connect_as(username, "wrong password", AuthMethod::Md5);
            assert_return_fails_with(&conn, PASSWORD_MISMATCH);
        }

        // Wrong password with the chap-sha1 method.
        {
            let conn = connect_as(username, "wrong password", AuthMethod::ChapSha1);
            assert_return_fails_with(&conn, PASSWORD_MISMATCH);
        }

        let drop_user = "local username = ...
box.cfg { auth_type = 'chap-sha1' }
box.schema.user.drop(username)";
        crate::lua_state().exec_with(drop_user, username).unwrap();
    }

    /// Same scenario as the md5 test but against LDAP authentication; the
    /// test is skipped when the LDAP setup helper reports an error.
    #[cfg(feature = "picodata")]
    #[crate::test(tarantool = "crate")]
    async fn ldap_auth_method() {
        use crate::auth::AuthMethod;

        let username = "Worry";
        let password = "B Gone";

        let _guard = crate::unwrap_ok_or!(
            crate::test::util::setup_ldap_auth(username, password),
            Err(e) => {
                println!("{e}, skipping ldap test");
                return;
            }
        );

        // Valid credentials: the request goes through.
        {
            let conn = connect_as(username, password, AuthMethod::Ldap);
            conn.eval(
                "print('\\x1b[32mit works!\\x1b[0m')",
                &(),
                &Default::default(),
            )
            .unwrap();
        }

        // Wrong password with the ldap method.
        {
            let conn = connect_as(username, "wrong password", AuthMethod::Ldap);
            assert_return_fails_with(
                &conn,
                "server responded with error: System: Invalid credentials",
            );
        }

        // Wrong password with the chap-sha1 method.
        {
            let conn = connect_as(username, "wrong password", AuthMethod::ChapSha1);
            assert_return_fails_with(&conn, PASSWORD_MISMATCH);
        }
    }
}