use crate::chkerr;
use crate::conn::Purity;
use crate::connection::CommonCreateParamsBuilder;
use crate::AssertSend;
use crate::AssertSync;
use crate::Connection;
use crate::Context;
use crate::DpiPool;
use crate::Error;
use crate::OdpiStr;
use crate::Privilege;
use crate::Result;
use odpic_sys::*;
use std::convert::TryInto;
use std::fmt;
use std::ptr;
use std::time::Duration;
/// Mode used when closing a [`Pool`] through [`Pool::close`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CloseMode {
    /// Sent to ODPI-C as `DPI_MODE_POOL_CLOSE_DEFAULT`.
    Default,

    /// Sent to ODPI-C as `DPI_MODE_POOL_CLOSE_FORCE`.
    Force,
}

impl CloseMode {
    /// Translates this mode into the matching ODPI-C `dpiPoolCloseMode` constant.
    fn to_dpi_value(self) -> dpiPoolCloseMode {
        match self {
            Self::Force => DPI_MODE_POOL_CLOSE_FORCE,
            Self::Default => DPI_MODE_POOL_CLOSE_DEFAULT,
        }
    }
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum GetMode {
Wait,
NoWait,
ForceGet,
TimedWait(Duration),
}
impl GetMode {
fn to_dpi_value(self) -> dpiPoolGetMode {
match self {
GetMode::Wait => DPI_MODE_POOL_GET_WAIT as dpiPoolGetMode,
GetMode::NoWait => DPI_MODE_POOL_GET_NOWAIT as dpiPoolGetMode,
GetMode::ForceGet => DPI_MODE_POOL_GET_FORCEGET as dpiPoolGetMode,
GetMode::TimedWait(_) => DPI_MODE_POOL_GET_TIMEDWAIT as dpiPoolGetMode,
}
}
fn to_wait_timeout(self) -> Result<Option<u32>> {
if let GetMode::TimedWait(ref dur) = self {
match dur.as_millis().try_into() {
Ok(msecs) => Ok(Some(msecs)),
Err(err) => Err(Error::out_of_range(format!(
"too long timed wait duration {:?}",
dur
))
.add_source(err)),
}
} else {
Ok(None)
}
}
}
/// Kind of pool selected with [`PoolBuilder::pool_type`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PoolType {
/// Stored as `homogeneous = 1` in the ODPI-C pool creation parameters.
Homogeneous,
/// Stored as `homogeneous = 0` in the ODPI-C pool creation parameters.
Heterogeneous,
}
#[derive(Clone, Copy, Debug, PartialEq)]
struct I32Seconds(i32);
impl I32Seconds {
fn try_from(dur: Option<Duration>, msg: &str) -> Result<I32Seconds> {
if let Some(dur) = dur {
match dur.as_secs().try_into() {
Ok(secs) => Ok(I32Seconds(secs)),
Err(err) => {
Err(Error::out_of_range(format!("too long {} {:?}", msg, dur)).add_source(err))
}
}
} else {
Ok(I32Seconds(-1))
}
}
fn into(self) -> Option<Duration> {
match self.0.try_into() {
Ok(secs) => Some(Duration::from_secs(secs)),
Err(_) => None,
}
}
}
/// Per-connection options used by [`Pool::get_with_options`].
///
/// Values are set through the consuming builder-style methods and are turned
/// into ODPI-C connection creation parameters when a connection is acquired.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PoolOptions {
// User name passed to `dpiPool_acquireConnection`.
username: String,
// Password passed to `dpiPool_acquireConnection`.
password: String,
// When set, OR-ed into the `authMode` connection parameter.
privilege: Option<Privilege>,
// Copied to the `externalAuth` connection parameter (as 0/1).
external_auth: bool,
// Copied to the `tag`/`tagLength` connection parameters.
tag: String,
// Copied to the `matchAnyTag` connection parameter (as 0/1).
match_any_tag: bool,
// When set, copied to the `purity` connection parameter.
purity: Option<Purity>,
// Copied to the `connectionClass`/`connectionClassLength` connection parameters.
connection_class: String,
}
impl PoolOptions {
    /// Creates options with empty strings, no privilege, no purity and both
    /// boolean flags turned off.
    pub fn new() -> PoolOptions {
        PoolOptions {
            username: String::new(),
            password: String::new(),
            privilege: None,
            external_auth: false,
            tag: String::new(),
            match_any_tag: false,
            purity: None,
            connection_class: String::new(),
        }
    }

    /// Sets the user name passed when acquiring the connection.
    pub fn username<S>(self, username: S) -> Self
    where
        S: Into<String>,
    {
        PoolOptions {
            username: username.into(),
            ..self
        }
    }

    /// Sets the password passed when acquiring the connection.
    pub fn password<S>(self, password: S) -> Self
    where
        S: Into<String>,
    {
        PoolOptions {
            password: password.into(),
            ..self
        }
    }

    /// Sets the privilege that is OR-ed into the connection's `authMode`.
    pub fn privilege(self, privilege: Privilege) -> Self {
        PoolOptions {
            privilege: Some(privilege),
            ..self
        }
    }

    /// Sets the `externalAuth` flag of the connection parameters.
    pub fn external_auth(self, enable: bool) -> Self {
        PoolOptions {
            external_auth: enable,
            ..self
        }
    }

    /// Sets the tag placed in the connection parameters.
    pub fn tag<S>(self, tag: S) -> Self
    where
        S: Into<String>,
    {
        PoolOptions {
            tag: tag.into(),
            ..self
        }
    }

    /// Sets the `matchAnyTag` flag of the connection parameters.
    pub fn match_any_tag(self, enable: bool) -> Self {
        PoolOptions {
            match_any_tag: enable,
            ..self
        }
    }

    /// Sets the purity placed in the connection parameters.
    pub fn purity(self, purity: Purity) -> Self {
        PoolOptions {
            purity: Some(purity),
            ..self
        }
    }

    /// Sets the connection class placed in the connection parameters.
    pub fn connection_class<S>(self, connection_class: S) -> Self
    where
        S: Into<String>,
    {
        PoolOptions {
            connection_class: connection_class.into(),
            ..self
        }
    }

    /// Builds the ODPI-C connection creation parameters for these options,
    /// starting from the defaults supplied by `ctxt`.
    ///
    /// NOTE(review): the returned struct stores the raw `ptr`/`len` pairs
    /// produced by `OdpiStr::new` for `tag` and `connection_class`; presumably
    /// these point into `self`, so the result should not outlive `self` --
    /// confirm against `OdpiStr`.
    fn to_dpi_conn_create_params(&self, ctxt: &Context) -> dpiConnCreateParams {
        let mut params = ctxt.conn_create_params();

        // Optional settings only override the defaults when present.
        if let Some(privilege) = self.privilege {
            params.authMode |= privilege.to_dpi();
        }
        if let Some(purity) = self.purity {
            params.purity = purity.to_dpi();
        }

        params.externalAuth = i32::from(self.external_auth);
        params.matchAnyTag = i32::from(self.match_any_tag);

        let tag = OdpiStr::new(&self.tag);
        params.tag = tag.ptr;
        params.tagLength = tag.len;

        let class = OdpiStr::new(&self.connection_class);
        params.connectionClass = class.ptr;
        params.connectionClassLength = class.len;

        params
    }
}
impl Default for PoolOptions {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
struct U32Seconds(u32);
impl U32Seconds {
fn try_from(dur: Duration, msg: &str) -> Result<U32Seconds> {
match dur.as_secs().try_into() {
Ok(secs) => Ok(U32Seconds(secs)),
Err(err) => {
Err(Error::out_of_range(format!("too long {} {:?}", msg, dur)).add_source(err))
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
struct U32Milliseconds(u32);
impl U32Milliseconds {
fn try_from(dur: Duration, msg: &str) -> Result<U32Milliseconds> {
match dur.as_millis().try_into() {
Ok(secs) => Ok(U32Milliseconds(secs)),
Err(err) => {
Err(Error::out_of_range(format!("too long {} {:?}", msg, dur)).add_source(err))
}
}
}
}
/// Builder that collects settings for a connection pool and creates it with
/// [`PoolBuilder::build`].
///
/// Every `Option` field left as `None` keeps whatever value
/// `Context::pool_create_params` supplied for the matching ODPI-C parameter.
#[derive(Debug, Clone, PartialEq)]
pub struct PoolBuilder {
// Credentials and connect string passed directly to `dpiPool_create`.
username: String,
password: String,
connect_string: String,
// Written to `minSessions`.
min_connections: Option<u32>,
// Written to `maxSessions`.
max_connections: Option<u32>,
// Written to `sessionIncrement`.
connection_increment: Option<u32>,
// Written to `pingInterval` (seconds; `-1` when the caller passed `None`).
ping_interval: Option<I32Seconds>,
// Written to `pingTimeout` (milliseconds).
ping_timeout: Option<U32Milliseconds>,
// Written to `homogeneous` (1 for homogeneous, 0 for heterogeneous).
homogeneous: Option<i32>,
// Written to `externalAuth` (as 0/1).
external_auth: Option<bool>,
// Written to `getMode`, plus `waitTimeout` for `GetMode::TimedWait`.
get_mode: Option<GetMode>,
// Written to `timeout` (seconds).
timeout: Option<U32Seconds>,
// Written to `maxLifetimeSession` (seconds).
max_lifetime_connection: Option<U32Seconds>,
// Written to `plsqlFixupCallback`/`plsqlFixupCallbackLength`.
plsql_fixup_callback: Option<String>,
// Written to `maxSessionsPerShard`.
max_connections_per_shard: Option<u32>,
// Settings forwarded to the common creation parameters (events, edition,
// driver name, statement cache size).
common_params: CommonCreateParamsBuilder,
}
impl PoolBuilder {
pub fn new<U, P, C>(username: U, password: P, connect_string: C) -> PoolBuilder
where
U: Into<String>,
P: Into<String>,
C: Into<String>,
{
PoolBuilder {
username: username.into(),
password: password.into(),
connect_string: connect_string.into(),
min_connections: None,
max_connections: None,
connection_increment: None,
ping_interval: None,
ping_timeout: None,
homogeneous: None,
external_auth: None,
get_mode: None,
timeout: None,
max_lifetime_connection: None,
plsql_fixup_callback: None,
max_connections_per_shard: None,
common_params: Default::default(),
}
}
pub fn min_connections(&mut self, num: u32) -> &mut PoolBuilder {
self.min_connections = Some(num);
self
}
pub fn max_connections(&mut self, num: u32) -> &mut PoolBuilder {
self.max_connections = Some(num);
self
}
pub fn connection_increment(&mut self, num: u32) -> &mut PoolBuilder {
self.connection_increment = Some(num);
self
}
pub fn ping_interval(&mut self, dur: Option<Duration>) -> Result<&mut PoolBuilder> {
self.ping_interval = Some(I32Seconds::try_from(dur, "ping interval")?);
Ok(self)
}
pub fn ping_timeout(&mut self, dur: Duration) -> Result<&mut PoolBuilder> {
self.ping_timeout = Some(U32Milliseconds::try_from(dur, "ping timeout")?);
Ok(self)
}
pub fn pool_type(&mut self, pool_type: PoolType) -> &mut PoolBuilder {
self.homogeneous = Some(match pool_type {
PoolType::Homogeneous => 1,
PoolType::Heterogeneous => 0,
});
self
}
pub fn external_auth(&mut self, b: bool) -> &mut PoolBuilder {
self.external_auth = Some(b);
self
}
pub fn get_mode(&mut self, mode: GetMode) -> &mut PoolBuilder {
self.get_mode = Some(mode);
self
}
pub fn timeout(&mut self, dur: Duration) -> Result<&mut PoolBuilder> {
self.timeout = Some(U32Seconds::try_from(dur, "timeout")?);
Ok(self)
}
pub fn max_lifetime_connection(&mut self, dur: Duration) -> Result<&mut PoolBuilder> {
self.max_lifetime_connection = Some(U32Seconds::try_from(dur, "max lifetime connection")?);
Ok(self)
}
pub fn plsql_fixup_callback<T>(&mut self, plsql: T) -> &mut PoolBuilder
where
T: Into<String>,
{
self.plsql_fixup_callback = Some(plsql.into());
self
}
pub fn max_connections_per_shard(&mut self, num: u32) -> &mut PoolBuilder {
self.max_connections_per_shard = Some(num);
self
}
fn to_dpi_pool_create_params(&self, ctxt: &Context) -> Result<dpiPoolCreateParams> {
let mut pool_params = ctxt.pool_create_params();
if let Some(val) = self.min_connections {
pool_params.minSessions = val;
}
if let Some(val) = self.max_connections {
pool_params.maxSessions = val;
}
if let Some(val) = self.connection_increment {
pool_params.sessionIncrement = val;
}
if let Some(val) = self.ping_interval {
pool_params.pingInterval = val.0;
}
if let Some(val) = self.ping_timeout {
pool_params.pingTimeout = val.0 as i32;
}
if let Some(val) = self.homogeneous {
pool_params.homogeneous = val;
}
if let Some(val) = self.external_auth {
pool_params.externalAuth = i32::from(val);
}
if let Some(val) = self.get_mode {
pool_params.getMode = val.to_dpi_value();
if let Some(wait_timeout) = val.to_wait_timeout()? {
pool_params.waitTimeout = wait_timeout;
}
}
if let Some(val) = self.timeout {
pool_params.timeout = val.0;
}
if let Some(val) = self.max_lifetime_connection {
pool_params.maxLifetimeSession = val.0;
}
if let Some(ref val) = self.plsql_fixup_callback {
let s = OdpiStr::new(val);
pool_params.plsqlFixupCallback = s.ptr;
pool_params.plsqlFixupCallbackLength = s.len;
}
if let Some(val) = self.max_connections_per_shard {
pool_params.maxSessionsPerShard = val;
}
Ok(pool_params)
}
pub fn events(&mut self, b: bool) -> &mut PoolBuilder {
self.common_params.events(b);
self
}
pub fn edition<S>(&mut self, edition: S) -> &mut PoolBuilder
where
S: Into<String>,
{
self.common_params.edition(edition);
self
}
pub fn driver_name<S>(&mut self, driver_name: S) -> &mut PoolBuilder
where
S: Into<String>,
{
self.common_params.driver_name(driver_name);
self
}
pub fn stmt_cache_size(&mut self, size: u32) -> &mut PoolBuilder {
self.common_params.stmt_cache_size(size);
self
}
pub fn build(&self) -> Result<Pool> {
let ctxt = Context::new0()?;
let username = OdpiStr::new(&self.username);
let password = OdpiStr::new(&self.password);
let connect_string = OdpiStr::new(&self.connect_string);
let common_params = self.common_params.build(&ctxt);
let mut pool_params = self.to_dpi_pool_create_params(&ctxt)?;
let mut handle = ptr::null_mut();
chkerr!(
&ctxt,
dpiPool_create(
ctxt.context,
username.ptr,
username.len,
password.ptr,
password.len,
connect_string.ptr,
connect_string.len,
&common_params,
&mut pool_params,
&mut handle
)
);
Ok(Pool {
ctxt,
handle: DpiPool::new(handle),
})
}
}
/// Connection pool created by [`PoolBuilder::build`].
///
/// NOTE(review): `Clone` is derived, so a clone copies `ctxt` and `handle`;
/// whether clones share one underlying ODPI-C pool depends on how `DpiPool`
/// implements `Clone` -- confirm there.
#[derive(Clone)]
pub struct Pool {
// Context handed to `chkerr!` for every pool-level ODPI-C call.
ctxt: Context,
// Wrapper whose `raw()` yields the `*mut dpiPool` used in ODPI-C calls.
handle: DpiPool,
}
impl Pool {
fn handle(&self) -> *mut dpiPool {
self.handle.raw()
}
fn ctxt(&self) -> &Context {
&self.ctxt
}
pub fn get(&self) -> Result<Connection> {
self.get_with_options(&PoolOptions::new())
}
pub fn get_with_options(&self, options: &PoolOptions) -> Result<Connection> {
let ctxt = Context::new()?;
let username = OdpiStr::new(&options.username);
let password = OdpiStr::new(&options.password);
let mut conn_params = options.to_dpi_conn_create_params(&ctxt);
let mut handle = ptr::null_mut();
chkerr!(
&ctxt,
dpiPool_acquireConnection(
self.handle(),
username.ptr,
username.len,
password.ptr,
password.len,
&mut conn_params,
&mut handle
)
);
ctxt.set_warning();
Ok(Connection::from_dpi_handle(ctxt, handle, &conn_params))
}
pub fn close(&self, mode: &CloseMode) -> Result<()> {
chkerr!(
self.ctxt(),
dpiPool_close(self.handle(), mode.to_dpi_value())
);
Ok(())
}
pub fn busy_count(&self) -> Result<u32> {
let mut count = 0;
chkerr!(self.ctxt(), dpiPool_getBusyCount(self.handle(), &mut count));
Ok(count)
}
pub fn get_mode(&self) -> Result<GetMode> {
let mut val = 0;
chkerr!(self.ctxt(), dpiPool_getGetMode(self.handle(), &mut val));
match val as u32 {
DPI_MODE_POOL_GET_WAIT => Ok(GetMode::Wait),
DPI_MODE_POOL_GET_NOWAIT => Ok(GetMode::NoWait),
DPI_MODE_POOL_GET_FORCEGET => Ok(GetMode::ForceGet),
DPI_MODE_POOL_GET_TIMEDWAIT => {
let mut val = 0;
chkerr!(self.ctxt(), dpiPool_getWaitTimeout(self.handle(), &mut val));
Ok(GetMode::TimedWait(Duration::from_millis(val.into())))
}
_ => Err(Error::internal_error(format!(
"unknown dpiPoolGetMode {}",
val
))),
}
}
pub fn set_get_mode(&mut self, mode: &GetMode) -> Result<()> {
let get_mode = mode.to_dpi_value();
let wait_timeout = mode.to_wait_timeout()?;
chkerr!(self.ctxt(), dpiPool_setGetMode(self.handle(), get_mode));
if let Some(msecs) = wait_timeout {
chkerr!(self.ctxt(), dpiPool_setWaitTimeout(self.handle(), msecs));
}
Ok(())
}
pub fn max_lifetime_connection(&self) -> Result<Duration> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiPool_getMaxLifetimeSession(self.handle(), &mut val)
);
Ok(Duration::from_secs(val.into()))
}
pub fn set_max_lifetime_connection(&mut self, dur: Duration) -> Result<()> {
let val = U32Seconds::try_from(dur, "max lifetime connection")?;
chkerr!(
self.ctxt(),
dpiPool_setMaxLifetimeSession(self.handle(), val.0)
);
Ok(())
}
pub fn max_connections_per_shard(&self) -> Result<u32> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiPool_getMaxSessionsPerShard(self.handle(), &mut val)
);
Ok(val)
}
pub fn set_max_connections_per_shard(&mut self, max_connections: u32) -> Result<()> {
chkerr!(
self.ctxt(),
dpiPool_setMaxSessionsPerShard(self.handle(), max_connections)
);
Ok(())
}
pub fn open_count(&self) -> Result<u32> {
let mut val = 0;
chkerr!(self.ctxt(), dpiPool_getOpenCount(self.handle(), &mut val));
Ok(val)
}
pub fn ping_interval(&self) -> Result<Option<Duration>> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiPool_getPingInterval(self.handle(), &mut val)
);
Ok(I32Seconds(val).into())
}
pub fn set_ping_interval(&mut self, interval: Option<Duration>) -> Result<()> {
let val = I32Seconds::try_from(interval, "ping interval")?;
chkerr!(self.ctxt(), dpiPool_setPingInterval(self.handle(), val.0));
Ok(())
}
pub fn reconfigure(
&self,
min_connections: u32,
max_connections: u32,
connection_increment: u32,
) -> Result<()> {
chkerr!(
self.ctxt(),
dpiPool_reconfigure(
self.handle(),
min_connections,
max_connections,
connection_increment
)
);
Ok(())
}
#[doc(hidden)] pub fn soda_metadata_cache(&self) -> Result<bool> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiPool_getSodaMetadataCache(self.handle(), &mut val)
);
Ok(val != 0)
}
#[doc(hidden)] pub fn set_soda_metadata_cache(&mut self, enabled: bool) -> Result<()> {
let enabled = i32::from(enabled);
chkerr!(
self.ctxt(),
dpiPool_setSodaMetadataCache(self.handle(), enabled)
);
Ok(())
}
pub fn stmt_cache_size(&self) -> Result<u32> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiPool_getStmtCacheSize(self.handle(), &mut val)
);
Ok(val)
}
pub fn set_stmt_cache_size(&mut self, cache_size: u32) -> Result<()> {
chkerr!(
self.ctxt(),
dpiPool_setStmtCacheSize(self.handle(), cache_size)
);
Ok(())
}
pub fn timeout(&self) -> Result<Duration> {
let mut val = 0;
chkerr!(self.ctxt(), dpiPool_getTimeout(self.handle(), &mut val));
Ok(Duration::from_secs(val.into()))
}
pub fn set_timeout(&mut self, timeout: Duration) -> Result<()> {
let val = U32Seconds::try_from(timeout, "timeout")?;
chkerr!(self.ctxt(), dpiPool_setTimeout(self.handle(), val.0));
Ok(())
}
}
impl fmt::Debug for Pool {
    /// Formats the pool as `Pool { handle: <raw pointer> }`.
    ///
    /// Uses `debug_struct` so the type name is spelled correctly, the braces
    /// are balanced, and the alternate (`{:#?}`) form is supported.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Pool")
            .field("handle", &self.handle())
            .finish()
    }
}
// Marker-trait impls for `Pool`.
// NOTE(review): `AssertSync` and `AssertSend` are presumably declared with
// `Sync`/`Send` supertraits, which would make these empty impls compile-time
// checks that `Pool` is thread-safe -- confirm against their definitions.
impl AssertSync for Pool {}
impl AssertSend for Pool {}