use crate::binding::*;
use crate::chkerr;
use crate::connection::Conn;
use crate::new_odpi_str;
use crate::sql_type::Object;
use crate::sql_type::ObjectType;
use crate::sql_type::OracleType;
use crate::sql_type::Timestamp;
use crate::to_odpi_str;
use crate::to_rust_slice;
use crate::Connection;
use crate::Context;
use crate::DpiMsgProps;
use crate::DpiQueue;
use crate::Error;
use crate::Result;
use std::borrow::ToOwned;
use std::fmt;
use std::marker::PhantomData;
use std::os::raw::c_char;
use std::ptr;
use std::time::Duration;
pub trait Payload: ToOwned {
type TypeInfo;
fn payload_type(payload_type: &Self::TypeInfo) -> Result<Option<ObjectType>>;
fn get(props: &MsgProps<Self>) -> Result<Self::Owned>;
fn set(&self, props: &mut MsgProps<Self>) -> Result<()>;
}
impl Payload for [u8] {
type TypeInfo = ();
fn payload_type(_payload_type: &Self::TypeInfo) -> Result<Option<ObjectType>> {
Ok(None)
}
fn get(props: &MsgProps<Self>) -> Result<Vec<u8>> {
let mut ptr = ptr::null();
let mut len = 0;
chkerr!(
props.ctxt(),
dpiMsgProps_getPayload(props.handle.raw, ptr::null_mut(), &mut ptr, &mut len)
);
Ok(to_rust_slice(ptr, len).to_vec())
}
fn set(&self, props: &mut MsgProps<Self>) -> Result<()> {
chkerr!(
props.ctxt(),
dpiMsgProps_setPayloadBytes(
props.handle.raw,
self.as_ptr() as *const c_char,
self.len() as u32
)
);
props.payload_type = None;
Ok(())
}
}
impl Payload for Object {
type TypeInfo = ObjectType;
fn payload_type(payload_type: &Self::TypeInfo) -> Result<Option<ObjectType>> {
Ok(Some(payload_type.clone()))
}
fn get(props: &MsgProps<Self>) -> Result<Object> {
let objtype = props.payload_type.as_ref().ok_or(Error::NoDataFound)?;
let mut obj_handle = ptr::null_mut();
chkerr!(
props.ctxt(),
dpiMsgProps_getPayload(
props.handle.raw,
&mut obj_handle,
ptr::null_mut(),
ptr::null_mut()
)
);
Ok(Object::new(props.conn.clone(), obj_handle, objtype.clone()))
}
fn set(&self, props: &mut MsgProps<Self>) -> Result<()> {
chkerr!(
props.ctxt(),
dpiMsgProps_setPayloadObject(props.handle.raw, self.handle)
);
props.payload_type = Some(self.object_type().clone());
Ok(())
}
}
pub struct Queue<T>
where
T: Payload + ?Sized,
{
conn: Conn,
handle: DpiQueue,
payload_type: Option<ObjectType>,
enq_options: Option<EnqOptions>,
deq_options: Option<DeqOptions>,
phantom: PhantomData<T>,
}
impl<'a, T: 'a> Queue<T>
where
T: Payload + ?Sized,
{
fn handle(&self) -> *mut dpiQueue {
self.handle.raw
}
fn ctxt(&self) -> &Context {
self.conn.ctxt()
}
pub fn new(
conn: &Connection,
queue_name: &str,
payload_type: &T::TypeInfo,
) -> Result<Queue<T>> {
let mut handle = ptr::null_mut();
let name = to_odpi_str(queue_name);
let payload_type = T::payload_type(payload_type)?;
let objtype = payload_type
.as_ref()
.map(|t| t.handle().raw)
.unwrap_or(ptr::null_mut());
chkerr!(
conn.ctxt(),
dpiConn_newQueue(conn.handle(), name.ptr, name.len, objtype, &mut handle)
);
Ok(Queue {
conn: conn.conn.clone(),
handle: DpiQueue::new(handle),
payload_type,
enq_options: None,
deq_options: None,
phantom: PhantomData,
})
}
pub fn dequeue(&self) -> Result<MsgProps<T>> {
let mut props = ptr::null_mut();
chkerr!(self.ctxt(), dpiQueue_deqOne(self.handle(), &mut props));
Ok(MsgProps::from_dpi_msg_props(
self.conn.clone(),
DpiMsgProps::new(props),
self.payload_type.clone(),
))
}
pub fn dequeue_many(&self, max_size: u32) -> Result<Vec<MsgProps<T>>> {
let mut num_props = max_size;
let mut handles = Vec::<DpiMsgProps>::with_capacity(max_size as usize);
chkerr!(
self.ctxt(),
dpiQueue_deqMany(
self.handle(),
&mut num_props,
handles.as_mut_ptr() as *mut *mut dpiMsgProps
)
);
let num_props = num_props as usize;
unsafe {
handles.set_len(num_props);
}
let props: Vec<_> = handles
.into_iter()
.map(|handle| {
MsgProps::from_dpi_msg_props(self.conn.clone(), handle, self.payload_type.clone())
})
.collect();
Ok(props)
}
pub fn enqueue(&self, props: &MsgProps<T>) -> Result<()> {
chkerr!(self.ctxt(), dpiQueue_enqOne(self.handle(), props.handle()));
Ok(())
}
pub fn enqueue_many<I>(&self, props: I) -> Result<()>
where
I: IntoIterator<Item = &'a MsgProps<T>>,
{
let iter = props.into_iter();
let (lower, _) = iter.size_hint();
let mut raw_props = Vec::with_capacity(lower);
for msg in iter {
let handle = msg.handle();
raw_props.push(handle);
unsafe {
dpiMsgProps_addRef(handle);
}
}
chkerr!(
self.ctxt(),
dpiQueue_enqMany(
self.handle(),
raw_props.len() as u32,
raw_props.as_mut_ptr()
),
for handle in raw_props {
unsafe {
dpiMsgProps_release(handle);
}
}
);
for handle in raw_props {
unsafe {
dpiMsgProps_release(handle);
}
}
Ok(())
}
pub fn deq_options(&mut self) -> Result<&mut DeqOptions> {
if self.deq_options.is_none() {
let mut handle = ptr::null_mut();
chkerr!(
self.ctxt(),
dpiQueue_getDeqOptions(self.handle(), &mut handle)
);
self.deq_options = Some(DeqOptions::new(self.ctxt().clone(), handle));
}
Ok(self.deq_options.as_mut().unwrap())
}
pub fn enq_options(&mut self) -> Result<&mut EnqOptions> {
if self.enq_options.is_none() {
let mut handle = ptr::null_mut();
chkerr!(
self.ctxt(),
dpiQueue_getEnqOptions(self.handle(), &mut handle)
);
self.enq_options = Some(EnqOptions::new(self.ctxt().clone(), handle));
}
Ok(self.enq_options.as_mut().unwrap())
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MessageDeliveryMode {
Persistent,
Buffered,
PersistentOrBuffered,
}
impl MessageDeliveryMode {
fn from_dpi_value(val: dpiMessageDeliveryMode) -> Result<MessageDeliveryMode> {
match val as u32 {
DPI_MODE_MSG_PERSISTENT => Ok(MessageDeliveryMode::Persistent),
DPI_MODE_MSG_BUFFERED => Ok(MessageDeliveryMode::Buffered),
DPI_MODE_MSG_PERSISTENT_OR_BUFFERED => Ok(MessageDeliveryMode::PersistentOrBuffered),
_ => Err(Error::InternalError(format!(
"unknown dpiMessageDeliveryMode {}",
val
))),
}
}
fn to_dpi_value(&self) -> dpiMessageDeliveryMode {
match self {
MessageDeliveryMode::Persistent => DPI_MODE_MSG_PERSISTENT as dpiMessageDeliveryMode,
MessageDeliveryMode::Buffered => DPI_MODE_MSG_PERSISTENT as dpiMessageDeliveryMode,
MessageDeliveryMode::PersistentOrBuffered => {
DPI_MODE_MSG_PERSISTENT as dpiMessageDeliveryMode
}
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MessageState {
Ready,
Waiting,
Processed,
Expired,
}
impl MessageState {
fn from_dpi_value(val: dpiMessageState) -> Result<MessageState> {
match val {
DPI_MSG_STATE_READY => Ok(MessageState::Ready),
DPI_MSG_STATE_WAITING => Ok(MessageState::Waiting),
DPI_MSG_STATE_PROCESSED => Ok(MessageState::Processed),
DPI_MSG_STATE_EXPIRED => Ok(MessageState::Expired),
_ => Err(Error::InternalError(format!(
"unknown dpiMessageState {}",
val
))),
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DeqMode {
Browse,
Locked,
Remove,
RemoveNoData,
}
impl DeqMode {
fn from_dpi_value(val: dpiDeqMode) -> Result<DeqMode> {
match val {
DPI_MODE_DEQ_BROWSE => Ok(DeqMode::Browse),
DPI_MODE_DEQ_LOCKED => Ok(DeqMode::Locked),
DPI_MODE_DEQ_REMOVE => Ok(DeqMode::Remove),
DPI_MODE_DEQ_REMOVE_NO_DATA => Ok(DeqMode::RemoveNoData),
_ => Err(Error::InternalError(format!("unknown dpiDeqMode {}", val))),
}
}
fn to_dpi_value(&self) -> dpiDeqMode {
match self {
DeqMode::Browse => DPI_MODE_DEQ_BROWSE,
DeqMode::Locked => DPI_MODE_DEQ_LOCKED,
DeqMode::Remove => DPI_MODE_DEQ_REMOVE,
DeqMode::RemoveNoData => DPI_MODE_DEQ_REMOVE_NO_DATA,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DeqNavigation {
FirstMessage,
NextTransaction,
NextMessage,
}
impl DeqNavigation {
fn from_dpi_value(val: dpiDeqNavigation) -> Result<DeqNavigation> {
match val {
DPI_DEQ_NAV_FIRST_MSG => Ok(DeqNavigation::FirstMessage),
DPI_DEQ_NAV_NEXT_TRANSACTION => Ok(DeqNavigation::NextTransaction),
DPI_DEQ_NAV_NEXT_MSG => Ok(DeqNavigation::NextMessage),
_ => Err(Error::InternalError(format!(
"unknown dpiDeqNavigation {}",
val
))),
}
}
fn to_dpi_value(&self) -> dpiDeqNavigation {
match self {
DeqNavigation::FirstMessage => DPI_DEQ_NAV_FIRST_MSG,
DeqNavigation::NextTransaction => DPI_DEQ_NAV_NEXT_TRANSACTION,
DeqNavigation::NextMessage => DPI_DEQ_NAV_NEXT_MSG,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Visibility {
Immediate,
OnCommit,
}
impl Visibility {
fn from_dpi_value(val: dpiVisibility) -> Result<Visibility> {
match val {
DPI_VISIBILITY_IMMEDIATE => Ok(Visibility::Immediate),
DPI_VISIBILITY_ON_COMMIT => Ok(Visibility::OnCommit),
_ => Err(Error::InternalError(format!(
"unknown dpiVisibility {}",
val
))),
}
}
fn to_dpi_value(&self) -> dpiVisibility {
match self {
Visibility::Immediate => DPI_VISIBILITY_IMMEDIATE,
Visibility::OnCommit => DPI_VISIBILITY_ON_COMMIT,
}
}
}
pub struct DeqOptions {
ctxt: Context,
handle: *mut dpiDeqOptions,
}
impl DeqOptions {
fn new(ctxt: Context, handle: *mut dpiDeqOptions) -> DeqOptions {
DeqOptions { ctxt, handle }
}
fn ctxt(&self) -> &Context {
&self.ctxt
}
pub fn condition(&self) -> Result<String> {
let mut s = new_odpi_str();
chkerr!(
self.ctxt(),
dpiDeqOptions_getCondition(self.handle, &mut s.ptr, &mut s.len)
);
Ok(s.to_string())
}
pub fn consumer_name(&self) -> Result<String> {
let mut s = new_odpi_str();
chkerr!(
self.ctxt(),
dpiDeqOptions_getConsumerName(self.handle, &mut s.ptr, &mut s.len)
);
Ok(s.to_string())
}
pub fn correlation(&self) -> Result<String> {
let mut s = new_odpi_str();
chkerr!(
self.ctxt(),
dpiDeqOptions_getCorrelation(self.handle, &mut s.ptr, &mut s.len)
);
Ok(s.to_string())
}
pub fn mode(&self) -> Result<DeqMode> {
let mut val = 0;
chkerr!(self.ctxt(), dpiDeqOptions_getMode(self.handle, &mut val));
DeqMode::from_dpi_value(val)
}
pub fn message_id(&self) -> Result<Vec<u8>> {
let mut msg = new_odpi_str();
chkerr!(
self.ctxt(),
dpiDeqOptions_getMsgId(self.handle, &mut msg.ptr, &mut msg.len)
);
Ok(msg.to_vec())
}
pub fn navigation(&self) -> Result<DeqNavigation> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiDeqOptions_getNavigation(self.handle, &mut val)
);
DeqNavigation::from_dpi_value(val)
}
pub fn transformation(&self) -> Result<String> {
let mut s = new_odpi_str();
chkerr!(
self.ctxt(),
dpiDeqOptions_getTransformation(self.handle, &mut s.ptr, &mut s.len)
);
Ok(s.to_string())
}
pub fn visibility(&self) -> Result<Visibility> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiDeqOptions_getVisibility(self.handle, &mut val)
);
Visibility::from_dpi_value(val)
}
pub fn wait(&self) -> Result<Duration> {
let mut val = 0;
chkerr!(self.ctxt(), dpiDeqOptions_getWait(self.handle, &mut val));
Ok(Duration::from_secs(val as u64))
}
pub fn set_condition(&mut self, val: &str) -> Result<()> {
let val = to_odpi_str(val);
chkerr!(
self.ctxt(),
dpiDeqOptions_setCondition(self.handle, val.ptr, val.len)
);
Ok(())
}
pub fn set_consumer_name(&mut self, val: &str) -> Result<()> {
let val = to_odpi_str(val);
chkerr!(
self.ctxt(),
dpiDeqOptions_setConsumerName(self.handle, val.ptr, val.len)
);
Ok(())
}
pub fn set_correlation(&mut self, val: &str) -> Result<()> {
let val = to_odpi_str(val);
chkerr!(
self.ctxt(),
dpiDeqOptions_setCorrelation(self.handle, val.ptr, val.len)
);
Ok(())
}
pub fn set_delivery_mode(&mut self, val: &MessageDeliveryMode) -> Result<()> {
chkerr!(
self.ctxt(),
dpiDeqOptions_setDeliveryMode(self.handle, val.to_dpi_value())
);
Ok(())
}
pub fn set_mode(&mut self, val: &DeqMode) -> Result<()> {
chkerr!(
self.ctxt(),
dpiDeqOptions_setMode(self.handle, val.to_dpi_value())
);
Ok(())
}
pub fn set_message_id(&mut self, val: &[u8]) -> Result<()> {
let ptr = if val.is_empty() {
ptr::null()
} else {
val.as_ptr() as *const c_char
};
let len = val.len() as u32;
chkerr!(self.ctxt(), dpiDeqOptions_setMsgId(self.handle, ptr, len));
Ok(())
}
pub fn set_navigation(&mut self, val: &DeqNavigation) -> Result<()> {
chkerr!(
self.ctxt(),
dpiDeqOptions_setNavigation(self.handle, val.to_dpi_value())
);
Ok(())
}
pub fn set_transformation(&mut self, val: &str) -> Result<()> {
let val = to_odpi_str(val);
chkerr!(
self.ctxt(),
dpiDeqOptions_setTransformation(self.handle, val.ptr, val.len)
);
Ok(())
}
pub fn set_visibility(&mut self, val: &Visibility) -> Result<()> {
chkerr!(
self.ctxt(),
dpiDeqOptions_setVisibility(self.handle, val.to_dpi_value())
);
Ok(())
}
pub fn set_wait(&mut self, val: &Duration) -> Result<()> {
let secs = val.as_secs();
let secs = if secs > u32::max_value().into() {
u32::max_value()
} else {
secs as u32
};
chkerr!(self.ctxt(), dpiDeqOptions_setWait(self.handle, secs));
Ok(())
}
}
impl fmt::Debug for DeqOptions {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "DeqOptions {{ handle: {:?} }}", self.handle)
}
}
pub struct EnqOptions {
ctxt: Context,
handle: *mut dpiEnqOptions,
}
impl EnqOptions {
fn new(ctxt: Context, handle: *mut dpiEnqOptions) -> EnqOptions {
EnqOptions { ctxt, handle }
}
fn ctxt(&self) -> &Context {
&self.ctxt
}
pub fn transformation(&self) -> Result<String> {
let mut s = new_odpi_str();
chkerr!(
self.ctxt(),
dpiEnqOptions_getTransformation(self.handle, &mut s.ptr, &mut s.len)
);
Ok(s.to_string())
}
pub fn visibility(&self) -> Result<Visibility> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiEnqOptions_getVisibility(self.handle, &mut val)
);
Visibility::from_dpi_value(val)
}
pub fn set_delivery_mode(&mut self, val: &MessageDeliveryMode) -> Result<()> {
chkerr!(
self.ctxt(),
dpiEnqOptions_setDeliveryMode(self.handle, val.to_dpi_value())
);
Ok(())
}
pub fn set_transformation(&mut self, val: &str) -> Result<()> {
let val = to_odpi_str(val);
chkerr!(
self.ctxt(),
dpiEnqOptions_setTransformation(self.handle, val.ptr, val.len)
);
Ok(())
}
pub fn set_visibility(&mut self, val: &Visibility) -> Result<()> {
chkerr!(
self.ctxt(),
dpiEnqOptions_setVisibility(self.handle, val.to_dpi_value())
);
Ok(())
}
}
impl fmt::Debug for EnqOptions {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "EnqOptions {{ handle: {:?} }}", self.handle)
}
}
#[derive(Clone)]
pub struct MsgProps<T>
where
T: Payload + ?Sized,
{
conn: Conn,
handle: DpiMsgProps,
payload_type: Option<ObjectType>,
phantom: PhantomData<T>,
}
impl<T> MsgProps<T>
where
T: Payload + ?Sized,
{
fn handle(&self) -> *mut dpiMsgProps {
self.handle.raw()
}
fn ctxt(&self) -> &Context {
self.conn.ctxt()
}
pub fn new(conn: &Connection) -> Result<MsgProps<T>> {
let mut handle = ptr::null_mut();
chkerr!(conn.ctxt(), dpiConn_newMsgProps(conn.handle(), &mut handle));
Ok(MsgProps {
conn: conn.conn.clone(),
handle: DpiMsgProps::new(handle),
payload_type: None,
phantom: PhantomData,
})
}
fn from_dpi_msg_props(
conn: Conn,
handle: DpiMsgProps,
payload_type: Option<ObjectType>,
) -> MsgProps<T> {
MsgProps {
conn,
handle,
payload_type,
phantom: PhantomData,
}
}
pub fn num_attempts(&self) -> Result<i32> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiMsgProps_getNumAttempts(self.handle(), &mut val)
);
Ok(val)
}
pub fn correlation(&self) -> Result<String> {
let mut s = new_odpi_str();
chkerr!(
self.ctxt(),
dpiMsgProps_getCorrelation(self.handle(), &mut s.ptr, &mut s.len)
);
Ok(s.to_string())
}
pub fn delay(&self) -> Result<Duration> {
let mut secs = 0;
chkerr!(self.ctxt(), dpiMsgProps_getDelay(self.handle(), &mut secs));
Ok(Duration::from_secs(secs as u64))
}
pub fn delivery_mode(&self) -> Result<MessageDeliveryMode> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiMsgProps_getDeliveryMode(self.handle(), &mut val)
);
MessageDeliveryMode::from_dpi_value(val)
}
pub fn enq_time(&self) -> Result<Timestamp> {
let mut val = Default::default();
chkerr!(self.ctxt(), dpiMsgProps_getEnqTime(self.handle(), &mut val));
Ok(Timestamp::from_dpi_timestamp(&val, &OracleType::Date))
}
pub fn exception_queue(&self) -> Result<String> {
let mut s = new_odpi_str();
chkerr!(
self.ctxt(),
dpiMsgProps_getExceptionQ(self.handle(), &mut s.ptr, &mut s.len)
);
Ok(s.to_string())
}
pub fn expiration(&self) -> Result<Duration> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiMsgProps_getExpiration(self.handle(), &mut val)
);
Ok(Duration::from_secs(val as u64))
}
pub fn message_id(&self) -> Result<Vec<u8>> {
let mut msg = new_odpi_str();
chkerr!(
self.ctxt(),
dpiMsgProps_getMsgId(self.handle(), &mut msg.ptr, &mut msg.len)
);
Ok(msg.to_vec())
}
pub fn original_message_id(&self) -> Result<Vec<u8>> {
let mut msg = new_odpi_str();
chkerr!(
self.ctxt(),
dpiMsgProps_getOriginalMsgId(self.handle(), &mut msg.ptr, &mut msg.len)
);
Ok(msg.to_vec())
}
pub fn payload(&self) -> Result<T::Owned> {
T::get(self)
}
pub fn priority(&self) -> Result<i32> {
let mut val = 0;
chkerr!(
self.ctxt(),
dpiMsgProps_getPriority(self.handle(), &mut val)
);
Ok(val)
}
pub fn state(&self) -> Result<MessageState> {
let mut val = 0;
chkerr!(self.ctxt(), dpiMsgProps_getState(self.handle(), &mut val));
MessageState::from_dpi_value(val)
}
pub fn set_correlation(&mut self, val: &str) -> Result<()> {
let val = to_odpi_str(val);
chkerr!(
self.ctxt(),
dpiMsgProps_setCorrelation(self.handle(), val.ptr, val.len)
);
Ok(())
}
pub fn set_delay(&mut self, val: &Duration) -> Result<()> {
let secs = val.as_secs();
if secs > i32::max_value() as u64 {
Err(Error::OutOfRange(format!("too long duration {:?}", val)))
} else {
chkerr!(
self.ctxt(),
dpiMsgProps_setDelay(self.handle(), secs as i32)
);
Ok(())
}
}
pub fn set_exception_queue(&mut self, val: &str) -> Result<()> {
let val = to_odpi_str(val);
chkerr!(
self.ctxt(),
dpiMsgProps_setExceptionQ(self.handle(), val.ptr, val.len)
);
Ok(())
}
pub fn set_expiration(&mut self, val: &Duration) -> Result<()> {
let secs = val.as_secs();
if secs > i32::max_value() as u64 {
Err(Error::OutOfRange(format!("too long duration {:?}", val)))
} else {
chkerr!(
self.ctxt(),
dpiMsgProps_setExpiration(self.handle(), secs as i32)
);
Ok(())
}
}
pub fn set_original_message_id(&mut self, val: &[u8]) -> Result<()> {
let ptr = if val.is_empty() {
ptr::null()
} else {
val.as_ptr() as *const c_char
};
let len = val.len() as u32;
chkerr!(
self.ctxt(),
dpiMsgProps_setOriginalMsgId(self.handle(), ptr, len)
);
Ok(())
}
pub fn set_payload(&mut self, val: &T) -> Result<()> {
val.set(self)
}
pub fn set_priority(&mut self, val: i32) -> Result<()> {
chkerr!(self.ctxt(), dpiMsgProps_setPriority(self.handle(), val));
Ok(())
}
}
impl<T> fmt::Debug for MsgProps<T>
where
T: Payload,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MsgProps {{ handle: {:?} }}", self.handle())
}
}