use std::ops::DerefMut;
use std::mem;
use std::path::Path;
use std::ptr;
use std::result::Result as StdResult;
use std::sync::Arc;
use ceph_rust::rados::{self, rados_t, rados_ioctx_t, rados_completion_t,
Struct_rados_cluster_stat_t};
use chrono::{DateTime, Local, TimeZone};
use ffi_pool::CStringPool;
use futures::prelude::*;
use libc;
use stable_deref_trait::StableDeref;
use async::Completion;
use errors::{self, Error, ErrorKind, Result};
lazy_static! {
static ref POOL: CStringPool = CStringPool::new(128);
}
pub struct ConnectionBuilder {
handle: rados_t,
}
impl ConnectionBuilder {
pub fn new() -> Result<ConnectionBuilder> {
let mut handle = ptr::null_mut();
errors::librados(unsafe { rados::rados_create(&mut handle, ptr::null()) })?;
Ok(ConnectionBuilder { handle })
}
pub fn with_user(user: &str) -> Result<ConnectionBuilder> {
let user_cstr = POOL.get_str(user)?;
let mut handle = ptr::null_mut();
errors::librados(unsafe {
rados::rados_create(&mut handle, user_cstr.as_ptr())
})?;
mem::drop(user_cstr);
Ok(ConnectionBuilder { handle })
}
pub fn read_conf_file(self, path: &Path) -> Result<ConnectionBuilder> {
let path_cstr = POOL.get_str(&path.to_string_lossy())?;
errors::librados(unsafe {
rados::rados_conf_read_file(self.handle, path_cstr.as_ptr())
})?;
Ok(self)
}
pub fn conf_set(self, option: &str, value: &str) -> Result<ConnectionBuilder> {
let option_cstr = POOL.get_str(option)?;
let value_cstr = POOL.get_str(value)?;
errors::librados(unsafe {
rados::rados_conf_set(self.handle, option_cstr.as_ptr(), value_cstr.as_ptr())
})?;
Ok(self)
}
pub fn connect(self) -> Result<Connection> {
errors::librados(unsafe { rados::rados_connect(self.handle) })?;
Ok(Connection {
_dummy: ptr::null(),
conn: Arc::new(ClusterHandle { handle: self.handle }),
})
}
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ClusterStat {
pub kb: u64,
pub kb_used: u64,
pub kb_avail: u64,
pub num_objects: u64,
}
#[derive(Clone)]
struct ClusterHandle {
handle: rados_t,
}
impl Drop for ClusterHandle {
fn drop(&mut self) {
unsafe {
rados::rados_shutdown(self.handle);
}
}
}
pub struct Connection {
_dummy: *const (),
conn: Arc<ClusterHandle>,
}
unsafe impl Send for Connection {}
impl Connection {
pub fn stat(&mut self) -> Result<ClusterStat> {
let mut cluster_stat = Struct_rados_cluster_stat_t {
kb: 0,
kb_used: 0,
kb_avail: 0,
num_objects: 0,
};
errors::librados(unsafe {
rados::rados_cluster_stat(self.conn.handle, &mut cluster_stat)
})?;
Ok(ClusterStat {
kb: cluster_stat.kb,
kb_used: cluster_stat.kb_used,
kb_avail: cluster_stat.kb_avail,
num_objects: cluster_stat.num_objects,
})
}
pub fn get_pool_context(&mut self, pool_name: &str) -> Result<Context> {
let pool_name_cstr = POOL.get_str(pool_name)?;
let mut ioctx_handle = ptr::null_mut();
errors::librados(unsafe {
rados::rados_ioctx_create(self.conn.handle, pool_name_cstr.as_ptr(), &mut ioctx_handle)
})?;
Ok(Context {
_conn: self.conn.clone(),
handle: ioctx_handle,
})
}
pub fn get_pool_context_from_id(&mut self, pool_id: u64) -> Result<Context> {
let mut ioctx_handle = ptr::null_mut();
errors::librados(unsafe {
rados::rados_ioctx_create2(
self.conn.handle,
*(&pool_id as *const u64 as *const i64),
&mut ioctx_handle,
)
})?;
Ok(Context {
_conn: self.conn.clone(),
handle: ioctx_handle,
})
}
}
#[derive(Debug)]
pub struct UnitFuture {
completion_res: StdResult<Completion<()>, Option<Error>>,
}
impl UnitFuture {
fn new<F>(init: F) -> UnitFuture
where
F: FnOnce(rados_completion_t) -> Result<()>,
{
UnitFuture { completion_res: Completion::new((), init).map_err(Some) }
}
}
impl Future for UnitFuture {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.completion_res.as_mut() {
Ok(completion) => completion.poll().map(|async| async.map(|_| ())),
Err(error) => Err(error.take().unwrap()),
}
}
}
#[derive(Debug)]
pub struct DataFuture<T> {
completion_res: StdResult<Completion<T>, Option<Error>>,
}
impl<T> DataFuture<T> {
fn new<F>(data: T, init: F) -> DataFuture<T>
where
F: FnOnce(rados_completion_t) -> Result<()>,
{
DataFuture { completion_res: Completion::new(data, init).map_err(Some) }
}
}
impl<T> Future for DataFuture<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.completion_res.as_mut() {
Ok(completion) => completion.poll().map(|async| async.map(|ret| ret.data)),
Err(error) => Err(error.take().unwrap()),
}
}
}
pub struct ReadFuture<B>
where
B: StableDeref + DerefMut<Target = [u8]>,
{
completion_res: StdResult<Completion<B>, Option<Error>>,
}
impl<B> ReadFuture<B>
where
B: StableDeref + DerefMut<Target = [u8]>,
{
fn new<F>(buf: B, init: F) -> Self
where
F: FnOnce(rados_completion_t) -> Result<()>,
{
ReadFuture { completion_res: Completion::new(buf, init).map_err(Some) }
}
}
impl<B> Future for ReadFuture<B>
where
B: StableDeref + DerefMut<Target = [u8]>,
{
type Item = (u32, B);
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.completion_res.as_mut() {
Ok(completion) => {
completion.poll().map(|async| {
async.map(|ret| (ret.value, ret.data))
})
}
Err(error) => Err(error.take().unwrap()),
}
}
}
pub struct StatFuture {
data_future: DataFuture<Box<(u64, libc::time_t)>>,
}
impl Future for StatFuture {
type Item = Stat;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.data_future.poll().map(|async| {
async.map(|boxed| {
let (size, last_modified) = *boxed;
Stat {
size,
last_modified: Local.timestamp(last_modified, 0),
}
})
})
}
}
pub struct ExistsFuture {
unit_future: UnitFuture,
}
impl Future for ExistsFuture {
type Item = bool;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.unit_future.poll() {
Ok(Async::Ready(())) => Ok(Async::Ready(true)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(Error(ErrorKind::Rados(err_code), _)) if err_code == libc::ENOENT as u32 => {
Ok(Async::Ready(false))
}
Err(err) => Err(err),
}
}
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Stat {
pub size: u64,
pub last_modified: DateTime<Local>,
}
pub struct Context {
_conn: Arc<ClusterHandle>,
handle: rados_ioctx_t,
}
unsafe impl Send for Context {}
impl Drop for Context {
fn drop(&mut self) {
unsafe {
rados::rados_ioctx_destroy(self.handle);
}
}
}
impl Context {
pub fn get_xattr(&mut self, obj: &str, key: &str, size: usize) -> Result<Vec<u8>> {
let obj_cstr = POOL.get_str(obj)?;
let key_cstr = POOL.get_str(key)?;
let mut buf = vec![0u8; size];
errors::librados(unsafe {
rados::rados_getxattr(
self.handle,
obj_cstr.as_ptr(),
key_cstr.as_ptr(),
buf.as_mut_ptr() as *mut libc::c_char,
buf.len(),
)
})?;
mem::drop(obj_cstr);
mem::drop(key_cstr);
Ok(buf)
}
pub fn set_xattr(&mut self, obj: &str, key: &str, value: &[u8]) -> Result<()> {
let obj_cstr = POOL.get_str(obj)?;
let key_cstr = POOL.get_str(key)?;
errors::librados(unsafe {
rados::rados_setxattr(
self.handle,
obj_cstr.as_ptr(),
key_cstr.as_ptr(),
value.as_ptr() as *const libc::c_char,
value.len(),
)
})?;
mem::drop(obj_cstr);
mem::drop(key_cstr);
Ok(())
}
pub fn write(&mut self, obj: &str, buf: &[u8], offset: u64) -> Result<()> {
let object_id = POOL.get_str(obj)?;
errors::librados(unsafe {
rados::rados_write(
self.handle,
object_id.as_ptr(),
buf.as_ptr() as *const libc::c_char,
buf.len(),
offset,
)
})?;
mem::drop(object_id);
Ok(())
}
pub fn write_full(&mut self, obj: &str, buf: &[u8]) -> Result<()> {
let object_id = POOL.get_str(obj)?;
errors::librados(unsafe {
rados::rados_write_full(
self.handle,
object_id.as_ptr(),
buf.as_ptr() as *const libc::c_char,
buf.len(),
)
})?;
mem::drop(object_id);
Ok(())
}
pub fn append(&mut self, obj: &str, buf: &[u8]) -> Result<()> {
let object_id = POOL.get_str(obj)?;
errors::librados(unsafe {
rados::rados_append(
self.handle,
object_id.as_ptr(),
buf.as_ptr() as *const libc::c_char,
buf.len(),
)
})?;
mem::drop(object_id);
Ok(())
}
pub fn read(&mut self, obj: &str, buf: &mut [u8], offset: u64) -> Result<usize> {
let object_id = POOL.get_str(obj)?;
let read = errors::librados_res(unsafe {
rados::rados_read(
self.handle,
object_id.as_ptr(),
buf.as_mut_ptr() as *mut libc::c_char,
buf.len(),
offset,
)
})?;
mem::drop(object_id);
Ok(read as usize)
}
pub fn remove(&mut self, obj: &str) -> Result<()> {
let object_id = POOL.get_str(obj)?;
errors::librados(unsafe {
rados::rados_remove(self.handle, object_id.as_ptr())
})?;
mem::drop(object_id);
Ok(())
}
pub fn resize(&mut self, obj: &str, size: u64) -> Result<()> {
let object_id = POOL.get_str(obj)?;
errors::librados(unsafe {
rados::rados_trunc(self.handle, object_id.as_ptr(), size)
})?;
mem::drop(object_id);
Ok(())
}
pub fn stat(&mut self, obj: &str) -> Result<Stat> {
let object_id = POOL.get_str(obj)?;
let mut size = 0;
let mut time = 0;
errors::librados(unsafe {
rados::rados_stat(self.handle, object_id.as_ptr(), &mut size, &mut time)
})?;
mem::drop(object_id);
Ok(Stat {
size,
last_modified: Local.timestamp(time, 0),
})
}
pub fn write_async(&mut self, obj: &str, buf: &[u8], offset: u64) -> UnitFuture {
UnitFuture::new(|completion_handle| {
let object_id = POOL.get_str(obj)?;
errors::librados({
unsafe {
rados::rados_aio_write(
self.handle,
object_id.as_ptr(),
completion_handle,
buf.as_ptr() as *const libc::c_char,
buf.len(),
offset,
)
}
})?;
mem::drop(object_id);
Ok(())
})
}
pub fn append_async(&mut self, obj: &str, buf: &[u8]) -> UnitFuture {
UnitFuture::new(|completion_handle| {
let object_id = POOL.get_str(obj)?;
errors::librados({
unsafe {
rados::rados_aio_append(
self.handle,
object_id.as_ptr(),
completion_handle,
buf.as_ptr() as *const libc::c_char,
buf.len(),
)
}
})?;
mem::drop(object_id);
Ok(())
})
}
pub fn write_full_async(&mut self, obj: &str, buf: &[u8]) -> UnitFuture {
UnitFuture::new(|completion_handle| {
let object_id = POOL.get_str(obj)?;
errors::librados({
unsafe {
rados::rados_aio_write_full(
self.handle,
object_id.as_ptr(),
completion_handle,
buf.as_ptr() as *const libc::c_char,
buf.len(),
)
}
})?;
mem::drop(object_id);
Ok(())
})
}
pub fn remove_async(&mut self, obj: &str) -> UnitFuture {
UnitFuture::new(|completion_handle| {
let object_id = POOL.get_str(obj)?;
errors::librados({
unsafe {
rados::rados_aio_remove(self.handle, object_id.as_ptr(), completion_handle)
}
})?;
mem::drop(object_id);
Ok(())
})
}
pub fn read_async<B>(&mut self, obj: &str, mut buf: B, offset: u64) -> ReadFuture<B>
where
B: StableDeref + DerefMut<Target = [u8]>,
{
let buf_ptr = buf.as_mut_ptr() as *mut libc::c_char;
let buf_len = buf.len();
ReadFuture::new(buf, |completion_handle| {
let object_id = POOL.get_str(obj)?;
errors::librados(unsafe {
rados::rados_aio_read(
self.handle,
object_id.as_ptr(),
completion_handle,
buf_ptr,
buf_len,
offset,
)
})?;
mem::drop(object_id);
Ok(())
})
}
pub fn stat_async(&mut self, obj: &str) -> StatFuture {
let mut boxed = Box::new((0, 0));
let size_ptr = &mut boxed.0 as *mut u64;
let time_ptr = &mut boxed.1 as *mut libc::time_t;
let data_future = DataFuture::new(boxed, |completion_handle| {
let object_id = POOL.get_str(obj)?;
errors::librados(unsafe {
rados::rados_aio_stat(
self.handle,
object_id.as_ptr(),
completion_handle,
size_ptr,
time_ptr,
)
})?;
mem::drop(object_id);
Ok(())
});
StatFuture { data_future }
}
pub fn exists(&mut self, obj: &str) -> Result<bool> {
let object_id = POOL.get_str(obj)?;
let result = errors::librados(unsafe {
rados::rados_stat(
self.handle,
object_id.as_ptr(),
ptr::null_mut(),
ptr::null_mut(),
)
});
match result {
Ok(()) => Ok(true),
Err(Error(ErrorKind::Rados(err_code), _)) if err_code == libc::ENOENT as u32 => {
Ok(false)
}
Err(err) => Err(err),
}
}
pub fn exists_async(&mut self, obj: &str) -> ExistsFuture {
let unit_future = UnitFuture::new(|completion_handle| {
let object_id = POOL.get_str(obj)?;
errors::librados(unsafe {
rados::rados_aio_stat(
self.handle,
object_id.as_ptr(),
completion_handle,
ptr::null_mut(),
ptr::null_mut(),
)
})?;
mem::drop(object_id);
Ok(())
});
ExistsFuture { unit_future }
}
pub fn flush(&mut self) -> Result<()> {
errors::librados(unsafe { rados::rados_aio_flush(self.handle) })
}
pub fn flush_async(&mut self) -> UnitFuture {
UnitFuture::new(|completion_handle| {
errors::librados(unsafe {
rados::rados_aio_flush_async(self.handle, completion_handle)
})
})
}
}