#![cfg(unix)]
#![allow(unused_imports)]
use crate::JsonData;
use crate::admin_sockets::*;
use crate::error::*;
use crate::json::*;
use crate::JsonValue;
use byteorder::{LittleEndian, WriteBytesExt};
use futures::task::SpawnExt;
use libc::*;
use nom::number::complete::le_u32;
use nom::IResult;
use serde_json;
use crate::completion::with_completion;
use crate::rados::*;
#[cfg(feature = "rados_striper")]
use crate::rados_striper::*;
use crate::status::*;
use std::ffi::{CStr, CString};
use std::marker::PhantomData;
use std::{ptr, str};
use crate::utils::*;
use std::io::{BufRead, Cursor};
use std::net::IpAddr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::list_stream::ListStream;
use crate::read_stream::ReadStream;
pub use crate::write_sink::WriteSink;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use uuid::Uuid;
const CEPH_OSD_TMAP_HDR: char = 'h';
const CEPH_OSD_TMAP_SET: char = 's';
const CEPH_OSD_TMAP_CREATE: char = 'c';
const CEPH_OSD_TMAP_RM: char = 'r';
const DEFAULT_READ_BYTES: usize = 64 * 1024;
#[derive(Debug, Clone)]
pub enum CephHealth {
Ok,
Warning,
Error,
}
#[derive(Debug, Clone)]
pub enum CephCommandTypes {
Mon,
Osd,
Pgs,
}
named!(
parse_header<TmapOperation>,
do_parse!(
char!(CEPH_OSD_TMAP_HDR)
>> data_len: le_u32
>> data: take!(data_len)
>> (TmapOperation::Header {
data: data.to_vec()
})
)
);
named!(
parse_create<TmapOperation>,
do_parse!(
char!(CEPH_OSD_TMAP_CREATE)
>> key_name_len: le_u32
>> key_name: take_str!(key_name_len)
>> data_len: le_u32
>> data: take!(data_len)
>> (TmapOperation::Create {
name: key_name.to_string(),
data: data.to_vec(),
})
)
);
named!(
parse_set<TmapOperation>,
do_parse!(
char!(CEPH_OSD_TMAP_SET)
>> key_name_len: le_u32
>> key_name: take_str!(key_name_len)
>> data_len: le_u32
>> data: take!(data_len)
>> (TmapOperation::Set {
key: key_name.to_string(),
data: data.to_vec(),
})
)
);
named!(
parse_remove<TmapOperation>,
do_parse!(
char!(CEPH_OSD_TMAP_RM)
>> key_name_len: le_u32
>> key_name: take_str!(key_name_len)
>> (TmapOperation::Remove {
name: key_name.to_string(),
})
)
);
#[derive(Debug)]
pub enum TmapOperation {
Header { data: Vec<u8> },
Set { key: String, data: Vec<u8> },
Create { name: String, data: Vec<u8> },
Remove { name: String },
}
impl TmapOperation {
fn serialize(&self) -> RadosResult<Vec<u8>> {
let mut buffer: Vec<u8> = Vec::new();
match *self {
TmapOperation::Header { ref data } => {
buffer.push(CEPH_OSD_TMAP_HDR as u8);
buffer.write_u32::<LittleEndian>(data.len() as u32)?;
buffer.extend_from_slice(data);
}
TmapOperation::Set { ref key, ref data } => {
buffer.push(CEPH_OSD_TMAP_SET as u8);
buffer.write_u32::<LittleEndian>(key.len() as u32)?;
buffer.extend(key.as_bytes());
buffer.write_u32::<LittleEndian>(data.len() as u32)?;
buffer.extend_from_slice(data);
}
TmapOperation::Create { ref name, ref data } => {
buffer.push(CEPH_OSD_TMAP_CREATE as u8);
buffer.write_u32::<LittleEndian>(name.len() as u32)?;
buffer.extend(name.as_bytes());
buffer.write_u32::<LittleEndian>(data.len() as u32)?;
buffer.extend_from_slice(data);
}
TmapOperation::Remove { ref name } => {
buffer.push(CEPH_OSD_TMAP_RM as u8);
buffer.write_u32::<LittleEndian>(name.len() as u32)?;
buffer.extend(name.as_bytes());
}
}
Ok(buffer)
}
fn deserialize(input: &[u8]) -> IResult<&[u8], Vec<TmapOperation>> {
many0!(
input,
alt!(
complete!(parse_header)
| complete!(parse_create)
| complete!(parse_set)
| complete!(parse_remove)
)
)
}
}
#[derive(Debug)]
pub struct Pool {
pub ctx: rados_list_ctx_t,
}
#[derive(Debug)]
pub struct CephObject {
pub name: String,
pub entry_locator: String,
pub namespace: String,
}
impl Iterator for Pool {
type Item = CephObject;
fn next(&mut self) -> Option<CephObject> {
let mut entry_ptr: *mut *const ::libc::c_char = ptr::null_mut();
let mut key_ptr: *mut *const ::libc::c_char = ptr::null_mut();
let mut nspace_ptr: *mut *const ::libc::c_char = ptr::null_mut();
unsafe {
let ret_code =
rados_nobjects_list_next(self.ctx, &mut entry_ptr, &mut key_ptr, &mut nspace_ptr);
if ret_code == -ENOENT {
rados_nobjects_list_close(self.ctx);
None
} else if ret_code < 0 {
None
} else {
let object_name = CStr::from_ptr(entry_ptr as *const ::libc::c_char);
let mut object_locator = String::new();
let mut namespace = String::new();
if !key_ptr.is_null() {
object_locator.push_str(
&CStr::from_ptr(key_ptr as *const ::libc::c_char).to_string_lossy(),
);
}
if !nspace_ptr.is_null() {
namespace.push_str(
&CStr::from_ptr(nspace_ptr as *const ::libc::c_char).to_string_lossy(),
);
}
Some(CephObject {
name: object_name.to_string_lossy().into_owned(),
entry_locator: object_locator,
namespace,
})
}
}
}
}
#[derive(Debug)]
pub struct ReadOperation {
pub object_name: String,
pub flags: u32,
read_op_handle: rados_read_op_t,
}
impl Drop for ReadOperation {
fn drop(&mut self) {
unsafe {
rados_release_read_op(self.read_op_handle);
}
}
}
#[derive(Debug)]
pub struct WriteOperation {
pub object_name: String,
pub flags: u32,
pub mtime: time_t,
write_op_handle: rados_write_op_t,
}
impl Drop for WriteOperation {
fn drop(&mut self) {
unsafe {
rados_release_write_op(self.write_op_handle);
}
}
}
#[derive(Debug)]
pub struct XAttr {
pub name: String,
pub value: String,
iter: rados_xattrs_iter_t,
}
#[derive(Debug)]
pub struct RadosVersion {
pub major: i32,
pub minor: i32,
pub extra: i32,
}
impl XAttr {
pub fn new(iter: rados_xattrs_iter_t) -> XAttr {
XAttr {
name: String::new(),
value: String::new(),
iter,
}
}
}
impl Iterator for XAttr {
type Item = XAttr;
fn next(&mut self) -> Option<Self::Item> {
let mut name: *const c_char = ptr::null();
let mut value: *const c_char = ptr::null();
let mut val_length: usize = 0;
unsafe {
let ret_code = rados_getxattrs_next(self.iter, &mut name, &mut value, &mut val_length);
if ret_code < 0 {
None
}
else if value.is_null() && val_length == 0 {
rados_getxattrs_end(self.iter);
None
} else {
let name = CStr::from_ptr(name);
let s_bytes = std::slice::from_raw_parts(value, val_length);
let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
Some(XAttr {
name: name.to_string_lossy().into_owned(),
value: String::from_utf8_lossy(&bytes).into_owned(),
iter: self.iter,
})
}
}
}
}
pub struct IoCtx {
pub ioctx: rados_ioctx_t,
}
unsafe impl Send for IoCtx {}
unsafe impl Sync for IoCtx {}
impl Drop for IoCtx {
fn drop(&mut self) {
assert!(self.ioctx.is_null(), "Rados not disconnected!");
}
}
#[cfg(feature = "rados_striper")]
pub struct RadosStriper {
rados_striper: rados_ioctx_t,
}
#[cfg(feature = "rados_striper")]
impl Drop for RadosStriper {
fn drop(&mut self) {
if !self.rados_striper.is_null() {
unsafe {
rados_striper_destroy(self.rados_striper);
}
}
}
}
pub struct Rados {
rados: rados_t,
phantom: PhantomData<IoCtx>,
}
unsafe impl Send for Rados {}
unsafe impl Sync for Rados {}
impl Drop for Rados {
fn drop(&mut self) {
assert!(self.rados.is_null(), "Rados not disconnected!");
}
}
pub fn connect_to_ceph(user_id: &str, config_file: &str) -> RadosResult<Rados> {
let connect_id = CString::new(user_id)?;
let conf_file = CString::new(config_file)?;
unsafe {
let mut cluster_handle: rados_t = ptr::null_mut();
let ret_code = rados_create(&mut cluster_handle, connect_id.as_ptr());
if ret_code < 0 {
return Err(ret_code.into());
}
let ret_code = rados_conf_read_file(cluster_handle, conf_file.as_ptr());
if ret_code < 0 {
return Err(ret_code.into());
}
let ret_code = rados_connect(cluster_handle);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(Rados {
rados: cluster_handle,
phantom: PhantomData,
})
}
}
pub async fn connect_to_ceph_async(user_id: &str, config_file: &str) -> RadosResult<Rados> {
let user_id = user_id.to_string();
let config_file = config_file.to_string();
let pool = futures::executor::ThreadPool::builder()
.pool_size(1)
.create()
.expect("Could not spawn thread pool");
pool.spawn_with_handle(async move { connect_to_ceph(&user_id, &config_file) })
.expect("Could not spawn background task")
.await
}
pub async fn disconnect_from_ceph_async(mut rados: Rados) {
let pool = futures::executor::ThreadPool::builder()
.pool_size(1)
.create()
.expect("Could not spawn thread pool");
pool.spawn_with_handle(async move { rados.disconnect_from_ceph() })
.expect("Could not spawn background task")
.await;
}
impl Rados {
pub fn inner(&self) -> &rados_t {
&self.rados
}
fn set_null(&mut self) {
self.rados = ptr::null_mut();
}
pub fn disconnect_from_ceph(&mut self) {
if self.rados.is_null() {
return;
}
unsafe {
rados_shutdown(self.rados);
}
self.set_null();
}
fn conn_guard(&self) -> RadosResult<()> {
if self.rados.is_null() {
return Err(RadosError::new(
"Rados not connected. Please initialize cluster".to_string(),
));
}
Ok(())
}
pub fn config_set(&self, name: &str, value: &str) -> RadosResult<()> {
if !self.rados.is_null() {
return Err(RadosError::new(
"Rados should not be connected when this function is called".into(),
));
}
let name_str = CString::new(name)?;
let value_str = CString::new(value)?;
unsafe {
let ret_code = rados_conf_set(self.rados, name_str.as_ptr(), value_str.as_ptr());
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn config_get(&self, name: &str) -> RadosResult<String> {
let name_str = CString::new(name)?;
let mut buffer: Vec<u8> = Vec::with_capacity(5120);
unsafe {
let ret_code = rados_conf_get(
self.rados,
name_str.as_ptr(),
buffer.as_mut_ptr() as *mut c_char,
buffer.capacity(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
buffer.set_len(5120);
let num_bytes = buffer.iter().position(|x| x == &0u8);
buffer.set_len(num_bytes.unwrap_or(0));
Ok(String::from_utf8_lossy(&buffer).into_owned())
}
}
pub fn get_rados_ioctx(&self, pool_name: &str) -> RadosResult<IoCtx> {
self.conn_guard()?;
let pool_name_str = CString::new(pool_name)?;
unsafe {
let mut ioctx: rados_ioctx_t = ptr::null_mut();
let ret_code = rados_ioctx_create(self.rados, pool_name_str.as_ptr(), &mut ioctx);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(IoCtx { ioctx })
}
}
pub fn get_rados_ioctx2(&self, pool_id: i64) -> RadosResult<IoCtx> {
self.conn_guard()?;
unsafe {
let mut ioctx: rados_ioctx_t = ptr::null_mut();
let ret_code = rados_ioctx_create2(self.rados, pool_id, &mut ioctx);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(IoCtx { ioctx })
}
}
}
pub async fn destroy_rados_ioctx_async(mut ioctx: IoCtx) {
let pool = futures::executor::ThreadPool::builder()
.pool_size(1)
.create()
.expect("Could not spawn thread pool");
pool.spawn_with_handle(async move { ioctx.destroy_rados_ioctx() })
.expect("Could not spawn background task")
.await;
}
impl IoCtx {
pub fn inner(&self) -> &rados_ioctx_t {
&self.ioctx
}
fn set_null(&mut self) {
self.ioctx = ptr::null_mut();
}
pub fn destroy_rados_ioctx(&mut self) {
if self.ioctx.is_null() {
return;
}
unsafe {
rados_ioctx_destroy(self.ioctx);
}
self.set_null();
}
fn ioctx_guard(&self) -> RadosResult<()> {
if self.ioctx.is_null() {
return Err(RadosError::new(
"Rados ioctx not created. Please initialize first".to_string(),
));
}
Ok(())
}
pub fn rados_stat_pool(&self) -> RadosResult<Struct_rados_pool_stat_t> {
self.ioctx_guard()?;
let mut pool_stat = Struct_rados_pool_stat_t::default();
unsafe {
let ret_code = rados_ioctx_pool_stat(self.ioctx, &mut pool_stat);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(pool_stat)
}
}
pub fn rados_pool_set_auid(&self, auid: u64) -> RadosResult<()> {
self.ioctx_guard()?;
unsafe {
let ret_code = rados_ioctx_pool_set_auid(self.ioctx, auid);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(())
}
}
pub fn rados_pool_get_auid(&self) -> RadosResult<u64> {
self.ioctx_guard()?;
let mut auid: u64 = 0;
unsafe {
let ret_code = rados_ioctx_pool_get_auid(self.ioctx, &mut auid);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(auid)
}
}
pub fn rados_pool_requires_alignment(&self) -> RadosResult<bool> {
self.ioctx_guard()?;
unsafe {
let ret_code = rados_ioctx_pool_requires_alignment(self.ioctx);
if ret_code < 0 {
return Err(ret_code.into());
}
if ret_code == 0 {
Ok(false)
} else {
Ok(true)
}
}
}
pub fn rados_pool_required_alignment(&self) -> RadosResult<u64> {
self.ioctx_guard()?;
unsafe {
let ret_code = rados_ioctx_pool_required_alignment(self.ioctx);
Ok(ret_code)
}
}
pub fn rados_object_get_id(&self) -> RadosResult<i64> {
self.ioctx_guard()?;
unsafe {
let pool_id = rados_ioctx_get_id(self.ioctx);
Ok(pool_id)
}
}
pub fn rados_get_pool_name(&self) -> RadosResult<String> {
self.ioctx_guard()?;
let mut buffer: Vec<u8> = Vec::with_capacity(500);
unsafe {
let ret_code = rados_ioctx_get_pool_name(
self.ioctx,
buffer.as_mut_ptr() as *mut c_char,
buffer.capacity() as c_uint,
);
if ret_code == -ERANGE {
buffer.reserve(1000);
buffer.set_len(1000);
let ret_code = rados_ioctx_get_pool_name(
self.ioctx,
buffer.as_mut_ptr() as *mut c_char,
buffer.capacity() as c_uint,
);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(String::from_utf8_lossy(&buffer).into_owned())
} else if ret_code < 0 {
Err(ret_code.into())
} else {
buffer.set_len(ret_code as usize);
Ok(String::from_utf8_lossy(&buffer).into_owned())
}
}
}
pub fn rados_locator_set_key(&self, key: &str) -> RadosResult<()> {
self.ioctx_guard()?;
let key_str = CString::new(key)?;
unsafe {
rados_ioctx_locator_set_key(self.ioctx, key_str.as_ptr());
}
Ok(())
}
pub fn rados_set_namespace(&self, namespace: &str) -> RadosResult<()> {
self.ioctx_guard()?;
let namespace_str = CString::new(namespace)?;
unsafe {
rados_ioctx_set_namespace(self.ioctx, namespace_str.as_ptr());
}
Ok(())
}
pub fn rados_list_pool_objects(&self) -> RadosResult<rados_list_ctx_t> {
self.ioctx_guard()?;
let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
unsafe {
let ret_code = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(rados_list_ctx)
}
pub fn rados_snap_create(&self, snap_name: &str) -> RadosResult<()> {
self.ioctx_guard()?;
let snap_name_str = CString::new(snap_name)?;
unsafe {
let ret_code = rados_ioctx_snap_create(self.ioctx, snap_name_str.as_ptr());
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_snap_remove(&self, snap_name: &str) -> RadosResult<()> {
self.ioctx_guard()?;
let snap_name_str = CString::new(snap_name)?;
unsafe {
let ret_code = rados_ioctx_snap_remove(self.ioctx, snap_name_str.as_ptr());
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_snap_rollback(&self, object_name: &str, snap_name: &str) -> RadosResult<()> {
self.ioctx_guard()?;
let snap_name_str = CString::new(snap_name)?;
let object_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_ioctx_snap_rollback(
self.ioctx,
object_name_str.as_ptr(),
snap_name_str.as_ptr(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_snap_set_read(&self, snap_id: u64) -> RadosResult<()> {
self.ioctx_guard()?;
unsafe {
rados_ioctx_snap_set_read(self.ioctx, snap_id);
}
Ok(())
}
pub fn rados_selfmanaged_snap_create(&self) -> RadosResult<u64> {
self.ioctx_guard()?;
let mut snap_id: u64 = 0;
unsafe {
let ret_code = rados_ioctx_selfmanaged_snap_create(self.ioctx, &mut snap_id);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(snap_id)
}
pub fn rados_selfmanaged_snap_remove(&self, snap_id: u64) -> RadosResult<()> {
self.ioctx_guard()?;
unsafe {
let ret_code = rados_ioctx_selfmanaged_snap_remove(self.ioctx, snap_id);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_selfmanaged_snap_rollback(
&self,
object_name: &str,
snap_id: u64,
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_ioctx_selfmanaged_snap_rollback(
self.ioctx,
object_name_str.as_ptr(),
snap_id,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_snap_lookup(&self, snap_name: &str) -> RadosResult<u64> {
self.ioctx_guard()?;
let snap_name_str = CString::new(snap_name)?;
let mut snap_id: u64 = 0;
unsafe {
let ret_code =
rados_ioctx_snap_lookup(self.ioctx, snap_name_str.as_ptr(), &mut snap_id);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(snap_id)
}
pub fn rados_snap_get_name(&self, snap_id: u64) -> RadosResult<String> {
self.ioctx_guard()?;
let out_buffer: Vec<u8> = Vec::with_capacity(500);
let out_buff_size = out_buffer.capacity();
let out_str = CString::new(out_buffer)?;
unsafe {
let ret_code = rados_ioctx_snap_get_name(
self.ioctx,
snap_id,
out_str.as_ptr() as *mut c_char,
out_buff_size as c_int,
);
if ret_code == -ERANGE {}
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(out_str.to_string_lossy().into_owned())
}
pub fn rados_snap_get_stamp(&self, snap_id: u64) -> RadosResult<time_t> {
self.ioctx_guard()?;
let mut time_id: time_t = 0;
unsafe {
let ret_code = rados_ioctx_snap_get_stamp(self.ioctx, snap_id, &mut time_id);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(time_id)
}
pub fn rados_get_object_last_version(&self) -> RadosResult<u64> {
self.ioctx_guard()?;
unsafe {
let obj_id = rados_get_last_version(self.ioctx);
Ok(obj_id)
}
}
pub fn rados_object_write(
&self,
object_name: &str,
buffer: &[u8],
offset: u64,
) -> RadosResult<()> {
self.ioctx_guard()?;
let obj_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_write(
self.ioctx,
obj_name_str.as_ptr(),
buffer.as_ptr() as *const c_char,
buffer.len(),
offset,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
self.ioctx_guard()?;
let obj_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_write_full(
self.ioctx,
obj_name_str.as_ptr(),
buffer.as_ptr() as *const ::libc::c_char,
buffer.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub async fn rados_async_object_write(
&self,
object_name: &str,
buffer: &[u8],
offset: u64,
) -> RadosResult<u32> {
self.ioctx_guard()?;
let obj_name_str = CString::new(object_name)?;
with_completion(&self, |c| unsafe {
rados_aio_write(
self.ioctx,
obj_name_str.as_ptr(),
c,
buffer.as_ptr() as *const ::libc::c_char,
buffer.len(),
offset,
)
})?
.await
}
pub async fn rados_async_object_append(
self: &Arc<Self>,
object_name: &str,
buffer: &[u8],
) -> RadosResult<u32> {
self.ioctx_guard()?;
let obj_name_str = CString::new(object_name)?;
with_completion(self, |c| unsafe {
rados_aio_append(
self.ioctx,
obj_name_str.as_ptr(),
c,
buffer.as_ptr() as *const ::libc::c_char,
buffer.len(),
)
})?
.await
}
pub async fn rados_async_object_write_full(
&self,
object_name: &str,
buffer: &[u8],
) -> RadosResult<u32> {
self.ioctx_guard()?;
let obj_name_str = CString::new(object_name)?;
with_completion(&self, |c| unsafe {
rados_aio_write_full(
self.ioctx,
obj_name_str.as_ptr(),
c,
buffer.as_ptr() as *const ::libc::c_char,
buffer.len(),
)
})?
.await
}
pub async fn rados_async_object_remove(&self, object_name: &str) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
with_completion(self, |c| unsafe {
rados_aio_remove(self.ioctx, object_name_str.as_ptr() as *const c_char, c)
})?
.await
.map(|_r| ())
}
pub async fn rados_async_object_read(
&self,
object_name: &str,
fill_buffer: &mut Vec<u8>,
read_offset: u64,
) -> RadosResult<u32> {
self.ioctx_guard()?;
let obj_name_str = CString::new(object_name)?;
if fill_buffer.capacity() == 0 {
fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
}
let result = with_completion(self, |c| unsafe {
rados_aio_read(
self.ioctx,
obj_name_str.as_ptr(),
c,
fill_buffer.as_mut_ptr() as *mut c_char,
fill_buffer.capacity(),
read_offset,
)
})?
.await;
if let Ok(rval) = &result {
unsafe {
let len = *rval as usize;
assert!(len <= fill_buffer.capacity());
fill_buffer.set_len(len);
}
}
result
}
pub fn rados_async_object_read_stream(
&self,
object_name: &str,
buffer_size: Option<usize>,
concurrency: Option<usize>,
size_hint: Option<u64>,
) -> ReadStream<'_> {
ReadStream::new(self, object_name, buffer_size, concurrency, size_hint)
}
pub fn rados_async_object_write_stream(
&self,
object_name: &str,
concurrency: Option<usize>,
) -> WriteSink<'_> {
WriteSink::new(self, object_name, concurrency)
}
pub async fn rados_async_object_stat(
&self,
object_name: &str,
) -> RadosResult<(u64, SystemTime)> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let mut psize: u64 = 0;
let mut time: ::libc::time_t = 0;
with_completion(self, |c| unsafe {
rados_aio_stat(
self.ioctx,
object_name_str.as_ptr(),
c,
&mut psize,
&mut time,
)
})?
.await?;
Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
}
pub fn rados_async_object_list(&self) -> RadosResult<ListStream> {
self.ioctx_guard()?;
let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
unsafe {
let r = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
if r == 0 {
Ok(ListStream::new(rados_list_ctx))
} else {
Err(r.into())
}
}
}
pub async fn rados_async_object_getxattr(
&self,
object_name: &str,
attr_name: &str,
fill_buffer: &mut [u8],
) -> RadosResult<u32> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
with_completion(self, |c| unsafe {
rados_aio_getxattr(
self.ioctx,
object_name_str.as_ptr() as *const c_char,
c,
attr_name_str.as_ptr() as *const c_char,
fill_buffer.as_mut_ptr() as *mut c_char,
fill_buffer.len(),
)
})?
.await
}
pub async fn rados_async_object_setxattr(
&self,
object_name: &str,
attr_name: &str,
attr_value: &[u8],
) -> RadosResult<u32> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
with_completion(self, |c| unsafe {
rados_aio_setxattr(
self.ioctx,
object_name_str.as_ptr() as *const c_char,
c,
attr_name_str.as_ptr() as *const c_char,
attr_value.as_ptr() as *mut c_char,
attr_value.len(),
)
})?
.await
}
pub async fn rados_async_object_rmxattr(
&self,
object_name: &str,
attr_name: &str,
) -> RadosResult<u32> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
with_completion(self, |c| unsafe {
rados_aio_rmxattr(
self.ioctx,
object_name_str.as_ptr() as *const c_char,
c,
attr_name_str.as_ptr() as *const c_char,
)
})?
.await
}
pub fn rados_object_clone_range(
&self,
dst_object_name: &str,
dst_offset: u64,
src_object_name: &str,
src_offset: u64,
length: usize,
) -> RadosResult<()> {
self.ioctx_guard()?;
let dst_name_str = CString::new(dst_object_name)?;
let src_name_str = CString::new(src_object_name)?;
unsafe {
let ret_code = rados_clone_range(
self.ioctx,
dst_name_str.as_ptr(),
dst_offset,
src_name_str.as_ptr(),
src_offset,
length,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
self.ioctx_guard()?;
let obj_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_append(
self.ioctx,
obj_name_str.as_ptr(),
buffer.as_ptr() as *const c_char,
buffer.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_read(
&self,
object_name: &str,
fill_buffer: &mut Vec<u8>,
read_offset: u64,
) -> RadosResult<i32> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let mut len = fill_buffer.capacity();
if len == 0 {
fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
len = fill_buffer.capacity();
}
unsafe {
let ret_code = rados_read(
self.ioctx,
object_name_str.as_ptr(),
fill_buffer.as_mut_ptr() as *mut c_char,
len,
read_offset,
);
if ret_code < 0 {
return Err(ret_code.into());
}
fill_buffer.set_len(ret_code as usize);
Ok(ret_code)
}
}
pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_remove(self.ioctx, object_name_str.as_ptr() as *const c_char);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_trunc(self.ioctx, object_name_str.as_ptr(), new_size);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_getxattr(
&self,
object_name: &str,
attr_name: &str,
fill_buffer: &mut [u8],
) -> RadosResult<i32> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
unsafe {
let ret_code = rados_getxattr(
self.ioctx,
object_name_str.as_ptr() as *const c_char,
attr_name_str.as_ptr() as *const c_char,
fill_buffer.as_mut_ptr() as *mut c_char,
fill_buffer.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(ret_code)
}
}
pub fn rados_object_setxattr(
&self,
object_name: &str,
attr_name: &str,
attr_value: &mut [u8],
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
unsafe {
let ret_code = rados_setxattr(
self.ioctx,
object_name_str.as_ptr() as *const c_char,
attr_name_str.as_ptr() as *const c_char,
attr_value.as_mut_ptr() as *mut c_char,
attr_value.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
unsafe {
let ret_code = rados_rmxattr(
self.ioctx,
object_name_str.as_ptr() as *const c_char,
attr_name_str.as_ptr() as *const c_char,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
unsafe {
let ret_code = rados_getxattrs(
self.ioctx,
object_name_str.as_ptr(),
&mut xattr_iterator_handle,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(xattr_iterator_handle)
}
pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let mut psize: u64 = 0;
let mut time: ::libc::time_t = 0;
unsafe {
let ret_code = rados_stat(self.ioctx, object_name_str.as_ptr(), &mut psize, &mut time);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
}
pub fn rados_object_tmap_update(
&self,
object_name: &str,
update: TmapOperation,
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let buffer = update.serialize()?;
unsafe {
let ret_code = rados_tmap_update(
self.ioctx,
object_name_str.as_ptr(),
buffer.as_ptr() as *const c_char,
buffer.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_tmap_get(&self, object_name: &str) -> RadosResult<Vec<TmapOperation>> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let mut buffer: Vec<u8> = Vec::with_capacity(500);
unsafe {
let ret_code = rados_tmap_get(
self.ioctx,
object_name_str.as_ptr(),
buffer.as_mut_ptr() as *mut c_char,
buffer.capacity(),
);
if ret_code == -ERANGE {
buffer.reserve(1000);
buffer.set_len(1000);
let ret_code = rados_tmap_get(
self.ioctx,
object_name_str.as_ptr(),
buffer.as_mut_ptr() as *mut c_char,
buffer.capacity(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
} else if ret_code < 0 {
return Err(ret_code.into());
}
}
match TmapOperation::deserialize(&buffer) {
Ok((_, tmap)) => Ok(tmap),
Err(nom::Err::Incomplete(needed)) => Err(RadosError::new(format!(
"deserialize of ceph tmap failed.
Input from Ceph was too small. Needed: {:?} more bytes",
needed
))),
Err(nom::Err::Error(e)) => Err(RadosError::new(
String::from_utf8_lossy(e.input).to_string(),
)),
Err(nom::Err::Failure(e)) => Err(RadosError::new(
String::from_utf8_lossy(e.input).to_string(),
)),
}
}
pub fn rados_object_exec(
&self,
object_name: &str,
class_name: &str,
method_name: &str,
input_buffer: &[u8],
output_buffer: &mut [u8],
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let class_name_str = CString::new(class_name)?;
let method_name_str = CString::new(method_name)?;
unsafe {
let ret_code = rados_exec(
self.ioctx,
object_name_str.as_ptr(),
class_name_str.as_ptr(),
method_name_str.as_ptr(),
input_buffer.as_ptr() as *const c_char,
input_buffer.len(),
output_buffer.as_mut_ptr() as *mut c_char,
output_buffer.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_notify(&self, object_name: &str, data: &[u8]) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_notify(
self.ioctx,
object_name_str.as_ptr(),
0,
data.as_ptr() as *const c_char,
data.len() as i32,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_notify_ack(
&self,
object_name: &str,
notify_id: u64,
cookie: u64,
buffer: Option<&[u8]>,
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
match buffer {
Some(buf) => unsafe {
let ret_code = rados_notify_ack(
self.ioctx,
object_name_str.as_ptr(),
notify_id,
cookie,
buf.as_ptr() as *const c_char,
buf.len() as i32,
);
if ret_code < 0 {
return Err(ret_code.into());
}
},
None => unsafe {
let ret_code = rados_notify_ack(
self.ioctx,
object_name_str.as_ptr(),
notify_id,
cookie,
ptr::null(),
0,
);
if ret_code < 0 {
return Err(ret_code.into());
}
},
}
Ok(())
}
pub fn rados_object_set_alloc_hint(
&self,
object_name: &str,
expected_object_size: u64,
expected_write_size: u64,
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_set_alloc_hint(
self.ioctx,
object_name_str.as_ptr(),
expected_object_size,
expected_write_size,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_perform_read_operations(&self, read_op: ReadOperation) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(read_op.object_name.clone())?;
unsafe {
let ret_code = rados_read_op_operate(
read_op.read_op_handle,
self.ioctx,
object_name_str.as_ptr(),
read_op.flags as i32,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_commit_write_operations(&self, write_op: &mut WriteOperation) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(write_op.object_name.clone())?;
unsafe {
let ret_code = rados_write_op_operate(
write_op.write_op_handle,
self.ioctx,
object_name_str.as_ptr(),
&mut write_op.mtime,
write_op.flags as i32,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_lock_exclusive(
&self,
object_name: &str,
lock_name: &str,
cookie_name: &str,
description: &str,
duration_time: &mut timeval,
lock_flags: u8,
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let lock_name_str = CString::new(lock_name)?;
let cookie_name_str = CString::new(cookie_name)?;
let description_str = CString::new(description)?;
unsafe {
let ret_code = rados_lock_exclusive(
self.ioctx,
object_name_str.as_ptr(),
lock_name_str.as_ptr(),
cookie_name_str.as_ptr(),
description_str.as_ptr(),
duration_time,
lock_flags,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_lock_shared(
&self,
object_name: &str,
lock_name: &str,
cookie_name: &str,
description: &str,
tag_name: &str,
duration_time: &mut timeval,
lock_flags: u8,
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let lock_name_str = CString::new(lock_name)?;
let cookie_name_str = CString::new(cookie_name)?;
let description_str = CString::new(description)?;
let tag_name_str = CString::new(tag_name)?;
unsafe {
let ret_code = rados_lock_shared(
self.ioctx,
object_name_str.as_ptr(),
lock_name_str.as_ptr(),
cookie_name_str.as_ptr(),
tag_name_str.as_ptr(),
description_str.as_ptr(),
duration_time,
lock_flags,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_unlock(
&self,
object_name: &str,
lock_name: &str,
cookie_name: &str,
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let lock_name_str = CString::new(lock_name)?;
let cookie_name_str = CString::new(cookie_name)?;
unsafe {
let ret_code = rados_unlock(
self.ioctx,
object_name_str.as_ptr(),
lock_name_str.as_ptr(),
cookie_name_str.as_ptr(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_break_lock(
&self,
object_name: &str,
lock_name: &str,
client_name: &str,
cookie_name: &str,
) -> RadosResult<()> {
self.ioctx_guard()?;
let object_name_str = CString::new(object_name)?;
let lock_name_str = CString::new(lock_name)?;
let cookie_name_str = CString::new(cookie_name)?;
let client_name_str = CString::new(client_name)?;
unsafe {
let ret_code = rados_break_lock(
self.ioctx,
object_name_str.as_ptr(),
lock_name_str.as_ptr(),
client_name_str.as_ptr(),
cookie_name_str.as_ptr(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
#[cfg(feature = "rados_striper")]
pub fn get_rados_striper(self) -> RadosResult<RadosStriper> {
self.ioctx_guard()?;
unsafe {
let mut rados_striper: rados_striper_t = ptr::null_mut();
let ret_code = rados_striper_create(self.ioctx, &mut rados_striper);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(RadosStriper { rados_striper })
}
}
}
impl Rados {
pub fn rados_blacklist_client(&self, client: IpAddr, expire_seconds: u32) -> RadosResult<()> {
self.conn_guard()?;
let client_address = CString::new(client.to_string())?;
unsafe {
let ret_code = rados_blacklist_add(
self.rados,
client_address.as_ptr() as *mut c_char,
expire_seconds,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
#[allow(unused_variables)]
pub fn rados_pools(&self) -> RadosResult<Vec<String>> {
self.conn_guard()?;
let mut pools: Vec<String> = Vec::new();
let pool_slice: &[u8];
let mut pool_buffer: Vec<u8> = Vec::with_capacity(500);
unsafe {
let len = rados_pool_list(
self.rados,
pool_buffer.as_mut_ptr() as *mut c_char,
pool_buffer.capacity(),
);
if len > pool_buffer.capacity() as i32 {
pool_buffer.reserve(len as usize);
let len = rados_pool_list(
self.rados,
pool_buffer.as_mut_ptr() as *mut c_char,
pool_buffer.capacity(),
);
pool_buffer.set_len(len as usize);
} else {
pool_buffer.set_len(len as usize);
}
}
let mut cursor = Cursor::new(&pool_buffer);
loop {
let mut string_buf: Vec<u8> = Vec::new();
let read = cursor.read_until(0x00, &mut string_buf)?;
if read == 0 || read == 1 {
break;
} else {
pools.push(String::from_utf8_lossy(&string_buf[..read - 1]).into_owned());
}
}
Ok(pools)
}
pub fn rados_create_pool(&self, pool_name: &str) -> RadosResult<()> {
self.conn_guard()?;
let pool_name_str = CString::new(pool_name)?;
unsafe {
let ret_code = rados_pool_create(self.rados, pool_name_str.as_ptr());
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_delete_pool(&self, pool_name: &str) -> RadosResult<()> {
self.conn_guard()?;
let pool_name_str = CString::new(pool_name)?;
unsafe {
let ret_code = rados_pool_delete(self.rados, pool_name_str.as_ptr());
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_lookup_pool(&self, pool_name: &str) -> RadosResult<Option<i64>> {
self.conn_guard()?;
let pool_name_str = CString::new(pool_name)?;
unsafe {
let ret_code: i64 = rados_pool_lookup(self.rados, pool_name_str.as_ptr());
if ret_code >= 0 {
Ok(Some(ret_code))
} else if ret_code as i32 == -ENOENT {
Ok(None)
} else {
Err((ret_code as i32).into())
}
}
}
pub fn rados_reverse_lookup_pool(&self, pool_id: i64) -> RadosResult<String> {
self.conn_guard()?;
let mut buffer: Vec<u8> = Vec::with_capacity(500);
unsafe {
let ret_code = rados_pool_reverse_lookup(
self.rados,
pool_id,
buffer.as_mut_ptr() as *mut c_char,
buffer.capacity(),
);
if ret_code == -ERANGE {
buffer.reserve(1000);
buffer.set_len(1000);
let ret_code = rados_pool_reverse_lookup(
self.rados,
pool_id,
buffer.as_mut_ptr() as *mut c_char,
buffer.capacity(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(String::from_utf8_lossy(&buffer).into_owned())
} else if ret_code < 0 {
Err(ret_code.into())
} else {
Ok(String::from_utf8_lossy(&buffer).into_owned())
}
}
}
}
pub fn rados_libversion() -> RadosVersion {
let mut major: c_int = 0;
let mut minor: c_int = 0;
let mut extra: c_int = 0;
unsafe {
rados_version(&mut major, &mut minor, &mut extra);
}
RadosVersion {
major,
minor,
extra,
}
}
impl Rados {
pub fn rados_stat_cluster(&self) -> RadosResult<Struct_rados_cluster_stat_t> {
self.conn_guard()?;
let mut cluster_stat = Struct_rados_cluster_stat_t::default();
unsafe {
let ret_code = rados_cluster_stat(self.rados, &mut cluster_stat);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(cluster_stat)
}
pub fn rados_fsid(&self) -> RadosResult<Uuid> {
self.conn_guard()?;
let mut fsid_buffer: Vec<u8> = Vec::with_capacity(37);
unsafe {
let ret_code = rados_cluster_fsid(
self.rados,
fsid_buffer.as_mut_ptr() as *mut c_char,
fsid_buffer.capacity(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
fsid_buffer.set_len(ret_code as usize);
}
let fsid_str = String::from_utf8(fsid_buffer)?;
Ok(fsid_str.parse()?)
}
pub fn ping_monitor(&self, mon_id: &str) -> RadosResult<String> {
self.conn_guard()?;
let mon_id_str = CString::new(mon_id)?;
let mut out_str: *mut c_char = ptr::null_mut();
let mut str_length: usize = 0;
unsafe {
let ret_code = rados_ping_monitor(
self.rados,
mon_id_str.as_ptr(),
&mut out_str,
&mut str_length,
);
if ret_code < 0 {
return Err(ret_code.into());
}
if !out_str.is_null() {
let s_bytes = std::slice::from_raw_parts(out_str, str_length);
let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
rados_buffer_free(out_str);
Ok(String::from_utf8_lossy(&bytes).into_owned())
} else {
Ok("".into())
}
}
}
}
pub fn ceph_version(socket: &str) -> Option<String> {
let cmd = "version";
admin_socket_command(&cmd, socket).ok().and_then(|json| {
json_data(&json)
.and_then(|jsondata| json_find(jsondata, &[cmd]).map(|data| json_as_string(&data)))
})
}
pub fn ceph_version_parse() -> Option<String> {
match run_cli("ceph --version") {
Ok(output) => {
let n = output.status.code().unwrap();
if n == 0 {
Some(String::from_utf8_lossy(&output.stdout).to_string())
} else {
Some(String::from_utf8_lossy(&output.stderr).to_string())
}
}
Err(_) => None,
}
}
impl Rados {
pub fn ceph_status(&self, keys: &[&str]) -> RadosResult<String> {
self.conn_guard()?;
match self.ceph_mon_command("prefix", "status", Some("json")) {
Ok((json, _)) => match json {
Some(json) => match json_data(&json) {
Some(jsondata) => {
if let Some(data) = json_find(jsondata, keys) {
Ok(json_as_string(&data))
} else {
Err(RadosError::new(
"The attributes were not found in the output.".to_string(),
))
}
}
_ => Err(RadosError::new("JSON data not found.".to_string())),
},
_ => Err(RadosError::new("JSON data not found.".to_string())),
},
Err(e) => Err(e),
}
}
pub fn ceph_health_string(&self) -> RadosResult<String> {
self.conn_guard()?;
match self.ceph_mon_command("prefix", "health", None) {
Ok((data, _)) => Ok(data.unwrap().replace("\n", "")),
Err(e) => Err(e),
}
}
pub fn ceph_health(&self) -> CephHealth {
match self.ceph_health_string() {
Ok(health) => {
if health.contains("HEALTH_OK") {
CephHealth::Ok
} else if health.contains("HEALTH_WARN") {
CephHealth::Warning
} else {
CephHealth::Error
}
}
Err(_) => CephHealth::Error,
}
}
pub fn ceph_command(
&self,
name: &str,
value: &str,
cmd_type: CephCommandTypes,
keys: &[&str],
) -> RadosResult<JsonData> {
self.conn_guard()?;
match cmd_type {
CephCommandTypes::Osd => Err(RadosError::new("OSD CMDs Not implemented.".to_string())),
CephCommandTypes::Pgs => Err(RadosError::new("PGS CMDS Not implemented.".to_string())),
_ => match self.ceph_mon_command(name, value, Some("json")) {
Ok((json, _)) => match json {
Some(json) => match json_data(&json) {
Some(jsondata) => {
if let Some(data) = json_find(jsondata, keys) {
Ok(data)
} else {
Err(RadosError::new(
"The attributes were not found in the output.".to_string(),
))
}
}
_ => Err(RadosError::new("JSON data not found.".to_string())),
},
_ => Err(RadosError::new("JSON data not found.".to_string())),
},
Err(e) => Err(e),
},
}
}
pub fn ceph_commands(&self, keys: Option<&[&str]>) -> RadosResult<JsonData> {
self.conn_guard()?;
match self.ceph_mon_command("prefix", "get_command_descriptions", Some("json")) {
Ok((json, _)) => match json {
Some(json) => match json_data(&json) {
Some(jsondata) => {
if let Some(k) = keys {
if let Some(data) = json_find(jsondata, k) {
Ok(data)
} else {
Err(RadosError::new(
"The attributes were not found in the output.".to_string(),
))
}
} else {
Ok(jsondata)
}
}
_ => Err(RadosError::new("JSON data not found.".to_string())),
},
_ => Err(RadosError::new("JSON data not found.".to_string())),
},
Err(e) => Err(e),
}
}
pub fn ceph_mon_command(
&self,
name: &str,
value: &str,
format: Option<&str>,
) -> RadosResult<(Option<String>, Option<String>)> {
let data: Vec<*mut c_char> = Vec::with_capacity(1);
self.ceph_mon_command_with_data(name, value, format, data)
}
pub fn ceph_mon_command_without_data(
&self,
cmd: &serde_json::Value,
) -> RadosResult<(Vec<u8>, Option<String>)> {
self.conn_guard()?;
let cmd_string = cmd.to_string();
debug!("ceph_mon_command_without_data: {}", cmd_string);
let data: Vec<*mut c_char> = Vec::with_capacity(1);
let cmds = CString::new(cmd_string).unwrap();
let mut outbuf_len = 0;
let mut outs = ptr::null_mut();
let mut outs_len = 0;
let mut outbuf = ptr::null_mut();
let mut out: Vec<u8> = vec![];
let mut status_string: Option<String> = None;
debug!("Calling rados_mon_command with {:?}", cmd);
unsafe {
let ret_code = rados_mon_command(
self.rados,
&mut cmds.as_ptr(),
1,
data.as_ptr() as *mut c_char,
data.len() as usize,
&mut outbuf,
&mut outbuf_len,
&mut outs,
&mut outs_len,
);
debug!("return code: {}", ret_code);
if ret_code < 0 {
if outs_len > 0 && !outs.is_null() {
let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
rados_buffer_free(outs);
return Err(RadosError::new(String::from_utf8_lossy(slice).into_owned()));
}
return Err(ret_code.into());
}
if outbuf_len > 0 && !outbuf.is_null() {
let slice = ::std::slice::from_raw_parts(outbuf as *const u8, outbuf_len as usize);
out = slice.to_vec();
rados_buffer_free(outbuf);
}
if outs_len > 0 && !outs.is_null() {
let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
status_string = Some(String::from_utf8(slice.to_vec())?);
rados_buffer_free(outs);
}
}
Ok((out, status_string))
}
pub fn ceph_mon_command_with_data(
&self,
name: &str,
value: &str,
format: Option<&str>,
data: Vec<*mut c_char>,
) -> RadosResult<(Option<String>, Option<String>)> {
self.conn_guard()?;
let mut cmd_strings: Vec<String> = Vec::new();
match format {
Some(fmt) => cmd_strings.push(format!(
"{{\"{}\": \"{}\", \"format\": \"{}\"}}",
name, value, fmt
)),
None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
}
let cstrings: Vec<CString> = cmd_strings[..]
.iter()
.map(|s| CString::new(s.clone()).unwrap())
.collect();
let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
let mut outbuf = ptr::null_mut();
let mut outs = ptr::null_mut();
let mut outbuf_len = 0;
let mut outs_len = 0;
let mut str_outbuf: Option<String> = None;
let mut str_outs: Option<String> = None;
debug!("Calling rados_mon_command with {:?}", cstrings);
unsafe {
let ret_code = rados_mon_command(
self.rados,
cmds.as_mut_ptr(),
1,
data.as_ptr() as *mut c_char,
data.len() as usize,
&mut outbuf,
&mut outbuf_len,
&mut outs,
&mut outs_len,
);
if ret_code < 0 {
return Err(ret_code.into());
}
if outbuf_len > 0 {
let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
str_outbuf = Some(str_slice_outbuf.to_owned());
rados_buffer_free(outbuf);
}
if outs_len > 0 {
let c_str_outs: &CStr = CStr::from_ptr(outs);
let buf_outs: &[u8] = c_str_outs.to_bytes();
let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
str_outs = Some(str_slice_outs.to_owned());
rados_buffer_free(outs);
}
}
Ok((str_outbuf, str_outs))
}
pub fn ceph_osd_command(
&self,
id: i32,
name: &str,
value: &str,
format: Option<&str>,
) -> RadosResult<(Option<String>, Option<String>)> {
let data: Vec<*mut c_char> = Vec::with_capacity(1);
self.ceph_osd_command_with_data(id, name, value, format, data)
}
pub fn ceph_osd_command_with_data(
&self,
id: i32,
name: &str,
value: &str,
format: Option<&str>,
data: Vec<*mut c_char>,
) -> RadosResult<(Option<String>, Option<String>)> {
self.conn_guard()?;
let mut cmd_strings: Vec<String> = Vec::new();
match format {
Some(fmt) => cmd_strings.push(format!(
"{{\"{}\": \"{}\", \"format\": \"{}\"}}",
name, value, fmt
)),
None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
}
let cstrings: Vec<CString> = cmd_strings[..]
.iter()
.map(|s| CString::new(s.clone()).unwrap())
.collect();
let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
let mut outbuf = ptr::null_mut();
let mut outs = ptr::null_mut();
let mut outbuf_len = 0;
let mut outs_len = 0;
let mut str_outbuf: Option<String> = None;
let mut str_outs: Option<String> = None;
unsafe {
let ret_code = rados_osd_command(
self.rados,
id,
cmds.as_mut_ptr(),
1,
data.as_ptr() as *mut c_char,
data.len() as usize,
&mut outbuf,
&mut outbuf_len,
&mut outs,
&mut outs_len,
);
if ret_code < 0 {
return Err(ret_code.into());
}
if outbuf_len > 0 {
let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
str_outbuf = Some(str_slice_outbuf.to_owned());
rados_buffer_free(outbuf);
}
if outs_len > 0 {
let c_str_outs: &CStr = CStr::from_ptr(outs);
let buf_outs: &[u8] = c_str_outs.to_bytes();
let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
str_outs = Some(str_slice_outs.to_owned());
rados_buffer_free(outs);
}
}
Ok((str_outbuf, str_outs))
}
pub fn ceph_pgs_command(
&self,
pg: &str,
name: &str,
value: &str,
format: Option<&str>,
) -> RadosResult<(Option<String>, Option<String>)> {
let data: Vec<*mut c_char> = Vec::with_capacity(1);
self.ceph_pgs_command_with_data(pg, name, value, format, data)
}
pub fn ceph_pgs_command_with_data(
&self,
pg: &str,
name: &str,
value: &str,
format: Option<&str>,
data: Vec<*mut c_char>,
) -> RadosResult<(Option<String>, Option<String>)> {
self.conn_guard()?;
let mut cmd_strings: Vec<String> = Vec::new();
match format {
Some(fmt) => cmd_strings.push(format!(
"{{\"{}\": \"{}\", \"format\": \"{}\"}}",
name, value, fmt
)),
None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
}
let pg_str = CString::new(pg).unwrap();
let cstrings: Vec<CString> = cmd_strings[..]
.iter()
.map(|s| CString::new(s.clone()).unwrap())
.collect();
let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
let mut outbuf = ptr::null_mut();
let mut outs = ptr::null_mut();
let mut outbuf_len = 0;
let mut outs_len = 0;
let mut str_outbuf: Option<String> = None;
let mut str_outs: Option<String> = None;
unsafe {
let ret_code = rados_pg_command(
self.rados,
pg_str.as_ptr(),
cmds.as_mut_ptr(),
1,
data.as_ptr() as *mut c_char,
data.len() as usize,
&mut outbuf,
&mut outbuf_len,
&mut outs,
&mut outs_len,
);
if ret_code < 0 {
return Err(ret_code.into());
}
if outbuf_len > 0 {
let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
str_outbuf = Some(str_slice_outbuf.to_owned());
rados_buffer_free(outbuf);
}
if outs_len > 0 {
let c_str_outs: &CStr = CStr::from_ptr(outs);
let buf_outs: &[u8] = c_str_outs.to_bytes();
let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
str_outs = Some(str_slice_outs.to_owned());
rados_buffer_free(outs);
}
}
Ok((str_outbuf, str_outs))
}
}
#[cfg(feature = "rados_striper")]
impl RadosStriper {
pub fn inner(&self) -> &rados_striper_t {
&self.rados_striper
}
pub fn destroy_rados_striper(&self) {
if self.rados_striper.is_null() {
return;
}
unsafe {
rados_striper_destroy(self.rados_striper);
}
}
fn rados_striper_guard(&self) -> RadosResult<()> {
if self.rados_striper.is_null() {
return Err(RadosError::new(
"Rados striper not created. Please initialize first".to_string(),
));
}
Ok(())
}
pub fn rados_object_write(
&self,
object_name: &str,
buffer: &[u8],
offset: u64,
) -> RadosResult<()> {
self.rados_striper_guard()?;
let obj_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_striper_write(
self.rados_striper,
obj_name_str.as_ptr(),
buffer.as_ptr() as *const c_char,
buffer.len(),
offset,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
self.rados_striper_guard()?;
let obj_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_striper_write_full(
self.rados_striper,
obj_name_str.as_ptr(),
buffer.as_ptr() as *const ::libc::c_char,
buffer.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
self.rados_striper_guard()?;
let obj_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_striper_append(
self.rados_striper,
obj_name_str.as_ptr(),
buffer.as_ptr() as *const c_char,
buffer.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_read(
&self,
object_name: &str,
fill_buffer: &mut Vec<u8>,
read_offset: u64,
) -> RadosResult<i32> {
self.rados_striper_guard()?;
let object_name_str = CString::new(object_name)?;
let mut len = fill_buffer.capacity();
if len == 0 {
fill_buffer.reserve_exact(1024 * 64);
len = fill_buffer.capacity();
}
unsafe {
let ret_code = rados_striper_read(
self.rados_striper,
object_name_str.as_ptr(),
fill_buffer.as_mut_ptr() as *mut c_char,
len,
read_offset,
);
if ret_code < 0 {
return Err(ret_code.into());
}
fill_buffer.set_len(ret_code as usize);
Ok(ret_code)
}
}
pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
self.rados_striper_guard()?;
let object_name_str = CString::new(object_name)?;
unsafe {
let ret_code = rados_striper_remove(
self.rados_striper,
object_name_str.as_ptr() as *const c_char,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
self.rados_striper_guard()?;
let object_name_str = CString::new(object_name)?;
unsafe {
let ret_code =
rados_striper_trunc(self.rados_striper, object_name_str.as_ptr(), new_size);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_getxattr(
&self,
object_name: &str,
attr_name: &str,
fill_buffer: &mut [u8],
) -> RadosResult<i32> {
self.rados_striper_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
unsafe {
let ret_code = rados_striper_getxattr(
self.rados_striper,
object_name_str.as_ptr() as *const c_char,
attr_name_str.as_ptr() as *const c_char,
fill_buffer.as_mut_ptr() as *mut c_char,
fill_buffer.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
Ok(ret_code)
}
}
pub fn rados_object_setxattr(
&self,
object_name: &str,
attr_name: &str,
attr_value: &mut [u8],
) -> RadosResult<()> {
self.rados_striper_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
unsafe {
let ret_code = rados_striper_setxattr(
self.rados_striper,
object_name_str.as_ptr() as *const c_char,
attr_name_str.as_ptr() as *const c_char,
attr_value.as_mut_ptr() as *mut c_char,
attr_value.len(),
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
self.rados_striper_guard()?;
let object_name_str = CString::new(object_name)?;
let attr_name_str = CString::new(attr_name)?;
unsafe {
let ret_code = rados_striper_rmxattr(
self.rados_striper,
object_name_str.as_ptr() as *const c_char,
attr_name_str.as_ptr() as *const c_char,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(())
}
pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
self.rados_striper_guard()?;
let object_name_str = CString::new(object_name)?;
let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
unsafe {
let ret_code = rados_striper_getxattrs(
self.rados_striper,
object_name_str.as_ptr(),
&mut xattr_iterator_handle,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok(xattr_iterator_handle)
}
pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
self.rados_striper_guard()?;
let object_name_str = CString::new(object_name)?;
let mut psize: u64 = 0;
let mut time: ::libc::time_t = 0;
unsafe {
let ret_code = rados_striper_stat(
self.rados_striper,
object_name_str.as_ptr(),
&mut psize,
&mut time,
);
if ret_code < 0 {
return Err(ret_code.into());
}
}
Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
}
}