use std::{sync::{Arc, atomic::{AtomicUsize, Ordering}}, marker::PhantomData};
use parking_lot::RwLock;
use crate::{oci::{self, *}, pool::SessionPool, session::SessionTagInfo, task, Environment, Result, Statement};
use super::{SvcCtx, Session};
impl SvcCtx {
async fn new(env: &Environment, dblink: &str, user: &str, pass: &str, mode: u32) -> Result<Self> {
let err = Handle::<OCIError>::new(&env)?;
let inf = Handle::<OCIAuthInfo>::new(&env)?;
inf.set_attr(OCI_ATTR_DRIVER_NAME, "sibyl", &err)?;
inf.set_attr(OCI_ATTR_USERNAME, user, &err)?;
inf.set_attr(OCI_ATTR_PASSWORD, pass, &err)?;
let env = env.get_env();
let dblink = String::from(dblink);
task::execute_blocking(move || -> Result<Self> {
let mut svc = Ptr::<OCISvcCtx>::null();
oci::session_get(
env.as_ref(), err.as_ref(), svc.as_mut_ptr(), inf.as_ref(),
dblink.as_ptr(), dblink.len() as _,
mode | OCI_SESSGET_STMTCACHE
)?;
Ok(Self { svc, inf, err, env, spool: None, tag: None, active_future: AtomicUsize::new(0) })
}).await?
}
fn set_oci_nonblocking_mode(&self, mode: u8) -> Result<()> {
let srv: Ptr<OCIServer> = attr::get(OCI_ATTR_SERVER, OCI_HTYPE_SVCCTX, self.svc.as_ref(), self.err.as_ref())?;
let curr_mode : u8 = attr::get(OCI_ATTR_NONBLOCKING_MODE, OCI_HTYPE_SERVER, srv.as_ref(), self.as_ref())?;
if curr_mode != mode {
attr::set(OCI_ATTR_NONBLOCKING_MODE, mode, OCI_HTYPE_SERVER, srv.as_ref(), self.as_ref())
} else {
Ok(())
}
}
pub(crate) fn set_nonblocking_mode(&self) -> Result<()> {
self.set_oci_nonblocking_mode(1)
}
pub(crate) fn set_blocking_mode(&self) -> Result<()> {
self.set_oci_nonblocking_mode(0)
}
async fn from_session_pool<'a>(pool: &'a SessionPool<'a>, tag: &str) -> Result<(Self,bool)> {
let spool = pool.get_spool();
let env = spool.get_env();
let err = Handle::<OCIError>::new(env.as_ref())?;
let inf = Handle::<OCIAuthInfo>::new(env.as_ref())?;
let tag_len = tag.len() as u32;
let tag_ptr = Ptr::new(tag.as_ptr());
let tag_mode = if tag_len > 0 && tag.find('=').is_some() { OCI_SESSGET_MULTIPROPERTY_TAG } else { OCI_DEFAULT };
task::execute_blocking(move || -> Result<(Self,bool)> {
let name = spool.get_name();
let mut svc = Ptr::<OCISvcCtx>::null();
let mut found = oci::Aligned::new(0u8);
let mut ret_tag: *const u8 = std::ptr::null();
let mut ret_tag_len: u32 = 0;
oci::session_get_tagged(
env.as_ref(), err.as_ref(), svc.as_mut_ptr(), inf.as_ref(),
name.as_ptr(), name.len() as _,
tag_ptr.get(), tag_len, &mut ret_tag, &mut ret_tag_len,
found.as_mut_ptr(), tag_mode | OCI_SESSGET_SPOOL | OCI_SESSGET_PURITY_SELF
)?;
let found = <u8>::from(found) != 0;
let svc_ctx = Self {svc, inf, err, env,
spool: Some(spool),
tag: Some(RwLock::new(SessionTagInfo {
tag_ptr: Ptr::new(ret_tag),
tag_len: ret_tag_len as _,
new_tag: String::new(),
})),
active_future: AtomicUsize::new(0)
};
Ok((svc_ctx, found))
}).await?
}
pub(crate) fn lock(&self, id: usize) -> bool {
if let Err(current) = self.active_future.compare_exchange(0, id, Ordering::AcqRel, Ordering::Relaxed) {
current == id
} else {
true
}
}
pub(crate) fn unlock(&self) {
self.active_future.store(0, Ordering::Release)
}
}
impl<'a> Session<'a> {
pub(crate) async fn new(env: &Environment, dblink: &str, user: &str, pass: &str, mode: u32) -> Result<Self> {
let ctx = SvcCtx::new(env, dblink, user, pass, mode).await?;
ctx.set_nonblocking_mode()?;
let usr: Ptr<OCISession> = attr::get(OCI_ATTR_SESSION, OCI_HTYPE_SVCCTX, ctx.svc.as_ref(), ctx.as_ref())?;
let ctx = Arc::new(ctx);
Ok(Self { ctx, usr, phantom_env: PhantomData })
}
pub(crate) async fn from_session_pool(pool: &SessionPool<'_>, tag: &str) -> Result<(Self,bool)> {
let (ctx, found) = SvcCtx::from_session_pool(pool, tag).await?;
ctx.set_nonblocking_mode()?;
let usr: Ptr<OCISession> = attr::get(OCI_ATTR_SESSION, OCI_HTYPE_SVCCTX, ctx.svc.as_ref(), ctx.as_ref())?;
let ctx = Arc::new(ctx);
Ok((Self {ctx, usr, phantom_env: PhantomData}, found))
}
pub(crate) fn set_nonblocking_mode(&self) -> Result<()> {
self.ctx.set_nonblocking_mode()
}
pub(crate) fn set_blocking_mode(&self) -> Result<()> {
self.ctx.set_blocking_mode()
}
pub async fn ping(&self) -> Result<()> {
futures::Ping::new(self.get_svc()).await
}
pub async fn commit(&self) -> Result<()> {
futures::TransCommit::new(self.get_svc()).await
}
pub async fn rollback(&self) -> Result<()> {
futures::TransRollback::new(self.get_svc()).await
}
pub async fn prepare(&'a self, sql: &str) -> Result<Statement<'a>> {
Statement::new(sql, self).await
}
}
#[cfg(test)]
mod tests {
use crate::{Environment, Result};
#[test]
fn async_connect_static_env() -> Result<()> {
crate::block_on(async {
use std::env;
use once_cell::sync::OnceCell;
static ORACLE : OnceCell<Environment> = OnceCell::new();
let oracle = ORACLE.get_or_try_init(|| {
Environment::new()
})?;
let dbname = env::var("DBNAME").expect("database name");
let dbuser = env::var("DBUSER").expect("user name");
let dbpass = env::var("DBPASS").expect("password");
let session = oracle.connect(&dbname, &dbuser, &dbpass).await?;
session.ping().await?;
Ok(())
})
}
}