ceph_async/
ceph.rs

1// Copyright 2017 LambdaStack All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14#![cfg(unix)]
15#![allow(unused_imports)]
16
17use crate::JsonData;
18
19use crate::admin_sockets::*;
20use crate::error::*;
21use crate::json::*;
22use crate::JsonValue;
23use byteorder::{LittleEndian, WriteBytesExt};
24use futures::task::SpawnExt;
25use libc::*;
26use nom::number::complete::le_u32;
27use nom::IResult;
28use serde_json;
29
30use crate::completion::with_completion;
31use crate::rados::*;
32#[cfg(feature = "rados_striper")]
33use crate::rados_striper::*;
34use crate::status::*;
35use std::ffi::{CStr, CString};
36use std::marker::PhantomData;
37use std::{ptr, str};
38
39use crate::utils::*;
40use std::io::{BufRead, Cursor};
41use std::net::IpAddr;
42use std::time::{Duration, SystemTime, UNIX_EPOCH};
43
44use crate::list_stream::ListStream;
45use crate::read_stream::ReadStream;
46pub use crate::write_sink::WriteSink;
47use std::pin::Pin;
48use std::sync::Arc;
49use std::task::{Context, Poll};
50use uuid::Uuid;
51
52const CEPH_OSD_TMAP_HDR: char = 'h';
53const CEPH_OSD_TMAP_SET: char = 's';
54const CEPH_OSD_TMAP_CREATE: char = 'c';
55const CEPH_OSD_TMAP_RM: char = 'r';
56
57const DEFAULT_READ_BYTES: usize = 64 * 1024;
58
59#[derive(Debug, Clone)]
60pub enum CephHealth {
61    Ok,
62    Warning,
63    Error,
64}
65
66#[derive(Debug, Clone)]
67pub enum CephCommandTypes {
68    Mon,
69    Osd,
70    Pgs,
71}
72
73named!(
74    parse_header<TmapOperation>,
75    do_parse!(
76        char!(CEPH_OSD_TMAP_HDR)
77            >> data_len: le_u32
78            >> data: take!(data_len)
79            >> (TmapOperation::Header {
80                data: data.to_vec()
81            })
82    )
83);
84
85named!(
86    parse_create<TmapOperation>,
87    do_parse!(
88        char!(CEPH_OSD_TMAP_CREATE)
89            >> key_name_len: le_u32
90            >> key_name: take_str!(key_name_len)
91            >> data_len: le_u32
92            >> data: take!(data_len)
93            >> (TmapOperation::Create {
94                name: key_name.to_string(),
95                data: data.to_vec(),
96            })
97    )
98);
99
100named!(
101    parse_set<TmapOperation>,
102    do_parse!(
103        char!(CEPH_OSD_TMAP_SET)
104            >> key_name_len: le_u32
105            >> key_name: take_str!(key_name_len)
106            >> data_len: le_u32
107            >> data: take!(data_len)
108            >> (TmapOperation::Set {
109                key: key_name.to_string(),
110                data: data.to_vec(),
111            })
112    )
113);
114
115named!(
116    parse_remove<TmapOperation>,
117    do_parse!(
118        char!(CEPH_OSD_TMAP_RM)
119            >> key_name_len: le_u32
120            >> key_name: take_str!(key_name_len)
121            >> (TmapOperation::Remove {
122                name: key_name.to_string(),
123            })
124    )
125);
126
127#[derive(Debug)]
128pub enum TmapOperation {
129    Header { data: Vec<u8> },
130    Set { key: String, data: Vec<u8> },
131    Create { name: String, data: Vec<u8> },
132    Remove { name: String },
133}
134
135impl TmapOperation {
136    fn serialize(&self) -> RadosResult<Vec<u8>> {
137        let mut buffer: Vec<u8> = Vec::new();
138        match *self {
139            TmapOperation::Header { ref data } => {
140                buffer.push(CEPH_OSD_TMAP_HDR as u8);
141                buffer.write_u32::<LittleEndian>(data.len() as u32)?;
142                buffer.extend_from_slice(data);
143            }
144            TmapOperation::Set { ref key, ref data } => {
145                buffer.push(CEPH_OSD_TMAP_SET as u8);
146                buffer.write_u32::<LittleEndian>(key.len() as u32)?;
147                buffer.extend(key.as_bytes());
148                buffer.write_u32::<LittleEndian>(data.len() as u32)?;
149                buffer.extend_from_slice(data);
150            }
151            TmapOperation::Create { ref name, ref data } => {
152                buffer.push(CEPH_OSD_TMAP_CREATE as u8);
153                buffer.write_u32::<LittleEndian>(name.len() as u32)?;
154                buffer.extend(name.as_bytes());
155                buffer.write_u32::<LittleEndian>(data.len() as u32)?;
156                buffer.extend_from_slice(data);
157            }
158            TmapOperation::Remove { ref name } => {
159                buffer.push(CEPH_OSD_TMAP_RM as u8);
160                buffer.write_u32::<LittleEndian>(name.len() as u32)?;
161                buffer.extend(name.as_bytes());
162            }
163        }
164        Ok(buffer)
165    }
166
167    fn deserialize(input: &[u8]) -> IResult<&[u8], Vec<TmapOperation>> {
168        many0!(
169            input,
170            alt!(
171                complete!(parse_header)
172                    | complete!(parse_create)
173                    | complete!(parse_set)
174                    | complete!(parse_remove)
175            )
176        )
177    }
178}
179
180/// Helper to iterate over pool objects
181#[derive(Debug)]
182pub struct Pool {
183    pub ctx: rados_list_ctx_t,
184}
185
186#[derive(Debug)]
187pub struct CephObject {
188    pub name: String,
189    pub entry_locator: String,
190    pub namespace: String,
191}
192
193impl Iterator for Pool {
194    type Item = CephObject;
195    fn next(&mut self) -> Option<CephObject> {
196        let mut entry_ptr: *mut *const ::libc::c_char = ptr::null_mut();
197        let mut key_ptr: *mut *const ::libc::c_char = ptr::null_mut();
198        let mut nspace_ptr: *mut *const ::libc::c_char = ptr::null_mut();
199
200        unsafe {
201            let ret_code =
202                rados_nobjects_list_next(self.ctx, &mut entry_ptr, &mut key_ptr, &mut nspace_ptr);
203            if ret_code == -ENOENT {
204                // We're done
205                rados_nobjects_list_close(self.ctx);
206                None
207            } else if ret_code < 0 {
208                // Unknown error
209                None
210            } else {
211                let object_name = CStr::from_ptr(entry_ptr as *const ::libc::c_char);
212                let mut object_locator = String::new();
213                let mut namespace = String::new();
214                if !key_ptr.is_null() {
215                    object_locator.push_str(
216                        &CStr::from_ptr(key_ptr as *const ::libc::c_char).to_string_lossy(),
217                    );
218                }
219                if !nspace_ptr.is_null() {
220                    namespace.push_str(
221                        &CStr::from_ptr(nspace_ptr as *const ::libc::c_char).to_string_lossy(),
222                    );
223                }
224
225                Some(CephObject {
226                    name: object_name.to_string_lossy().into_owned(),
227                    entry_locator: object_locator,
228                    namespace,
229                })
230            }
231        }
232    }
233}
234
235/// A helper to create rados read operation
236/// An object read operation stores a number of operations which can be
237/// executed atomically.
238#[derive(Debug)]
239pub struct ReadOperation {
240    pub object_name: String,
241    /// flags are set by calling LIBRADOS_OPERATION_NOFLAG |
242    /// LIBRADOS_OPERATION_BALANCE_READS
243    /// all the other flags are documented in rados.rs
244    pub flags: u32,
245    read_op_handle: rados_read_op_t,
246}
247
248impl Drop for ReadOperation {
249    fn drop(&mut self) {
250        unsafe {
251            rados_release_read_op(self.read_op_handle);
252        }
253    }
254}
255
256/// A helper to create rados write operation
257/// An object write operation stores a number of operations which can be
258/// executed atomically.
259#[derive(Debug)]
260pub struct WriteOperation {
261    pub object_name: String,
262    /// flags are set by calling LIBRADOS_OPERATION_NOFLAG |
263    /// LIBRADOS_OPERATION_ORDER_READS_WRITES
264    /// all the other flags are documented in rados.rs
265    pub flags: u32,
266    pub mtime: time_t,
267    write_op_handle: rados_write_op_t,
268}
269
270impl Drop for WriteOperation {
271    fn drop(&mut self) {
272        unsafe {
273            rados_release_write_op(self.write_op_handle);
274        }
275    }
276}
277
278/// A rados object extended attribute with name and value.
279/// Can be iterated over
280#[derive(Debug)]
281pub struct XAttr {
282    pub name: String,
283    pub value: String,
284    iter: rados_xattrs_iter_t,
285}
286
287/// The version of the librados library.
288#[derive(Debug)]
289pub struct RadosVersion {
290    pub major: i32,
291    pub minor: i32,
292    pub extra: i32,
293}
294
295impl XAttr {
296    /// Creates a new XAttr.  Call rados_getxattrs to create the iterator for
297    /// this struct
298    pub fn new(iter: rados_xattrs_iter_t) -> XAttr {
299        XAttr {
300            name: String::new(),
301            value: String::new(),
302            iter,
303        }
304    }
305}
306
307impl Iterator for XAttr {
308    type Item = XAttr;
309
310    fn next(&mut self) -> Option<Self::Item> {
311        // max xattr name is 255 bytes from what I can find
312        let mut name: *const c_char = ptr::null();
313        // max xattr is 64Kb from what I can find
314        let mut value: *const c_char = ptr::null();
315        let mut val_length: usize = 0;
316        unsafe {
317            let ret_code = rados_getxattrs_next(self.iter, &mut name, &mut value, &mut val_length);
318
319            if ret_code < 0 {
320                // Something failed, however Iterator doesn't return Result so we return None
321                None
322            }
323            // end of iterator reached
324            else if value.is_null() && val_length == 0 {
325                rados_getxattrs_end(self.iter);
326                None
327            } else {
328                let name = CStr::from_ptr(name);
329                // value string
330                let s_bytes = std::slice::from_raw_parts(value, val_length);
331                // Convert from i8 -> u8
332                let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
333                Some(XAttr {
334                    name: name.to_string_lossy().into_owned(),
335                    value: String::from_utf8_lossy(&bytes).into_owned(),
336                    iter: self.iter,
337                })
338            }
339        }
340    }
341}
342
343/// Owns a ioctx handle
344pub struct IoCtx {
345    // This is pub within the crate to enable Completions to use it
346    pub ioctx: rados_ioctx_t,
347}
348
349unsafe impl Send for IoCtx {}
350unsafe impl Sync for IoCtx {}
351
352impl Drop for IoCtx {
353    fn drop(&mut self) {
354        assert!(self.ioctx.is_null(), "Rados not disconnected!");
355        // if !self.ioctx.is_null() {
356        //     unsafe {
357        //         rados_ioctx_destroy(self.ioctx);
358        //     }
359        // }
360    }
361}
362
363/// Owns a rados_striper handle
364#[cfg(feature = "rados_striper")]
365pub struct RadosStriper {
366    rados_striper: rados_ioctx_t,
367}
368
369#[cfg(feature = "rados_striper")]
370impl Drop for RadosStriper {
371    fn drop(&mut self) {
372        if !self.rados_striper.is_null() {
373            unsafe {
374                rados_striper_destroy(self.rados_striper);
375            }
376        }
377    }
378}
379
380/// Owns a rados handle
381pub struct Rados {
382    rados: rados_t,
383    phantom: PhantomData<IoCtx>,
384}
385
386unsafe impl Send for Rados {}
387unsafe impl Sync for Rados {}
388
389impl Drop for Rados {
390    fn drop(&mut self) {
391        assert!(self.rados.is_null(), "Rados not disconnected!");
392        // if !self.rados.is_null() {
393        //     unsafe {
394        //         rados_shutdown(self.rados);
395        //     }
396        // }
397    }
398}
399
400/// Connect to a Ceph cluster and return a connection handle rados_t
401pub fn connect_to_ceph(user_id: &str, config_file: &str) -> RadosResult<Rados> {
402    let connect_id = CString::new(user_id)?;
403    let conf_file = CString::new(config_file)?;
404    unsafe {
405        let mut cluster_handle: rados_t = ptr::null_mut();
406        let ret_code = rados_create(&mut cluster_handle, connect_id.as_ptr());
407        if ret_code < 0 {
408            return Err(ret_code.into());
409        }
410        let ret_code = rados_conf_read_file(cluster_handle, conf_file.as_ptr());
411        if ret_code < 0 {
412            return Err(ret_code.into());
413        }
414        let ret_code = rados_connect(cluster_handle);
415        if ret_code < 0 {
416            return Err(ret_code.into());
417        }
418        Ok(Rados {
419            rados: cluster_handle,
420            phantom: PhantomData,
421        })
422    }
423}
424
425/// Non-blocking wrapper for `connect_to_ceph`
426pub async fn connect_to_ceph_async(user_id: &str, config_file: &str) -> RadosResult<Rados> {
427    let user_id = user_id.to_string();
428    let config_file = config_file.to_string();
429
430    // librados doesn't have async initialization, so wrap it in a thread pool.
431    let pool = futures::executor::ThreadPool::builder()
432        .pool_size(1)
433        .create()
434        .expect("Could not spawn thread pool");
435    pool.spawn_with_handle(async move { connect_to_ceph(&user_id, &config_file) })
436        .expect("Could not spawn background task")
437        .await
438}
439
440/// The lifetime of `Rados` ends up after disconnect_from_ceph,
441/// so just move it away.
442pub async fn disconnect_from_ceph_async(mut rados: Rados) {
443    let pool = futures::executor::ThreadPool::builder()
444        .pool_size(1)
445        .create()
446        .expect("Could not spawn thread pool");
447    pool.spawn_with_handle(async move { rados.disconnect_from_ceph() })
448        .expect("Could not spawn background task")
449        .await;
450}
451
452impl Rados {
453    pub fn inner(&self) -> &rados_t {
454        &self.rados
455    }
456
457    fn set_null(&mut self) {
458        self.rados = ptr::null_mut();
459    }
460
461    /// Disconnect from a Ceph cluster and destroy the connection handle rados_t
462    /// For clean up, this is only necessary after connect_to_ceph() has
463    /// succeeded.
464    pub fn disconnect_from_ceph(&mut self) {
465        if self.rados.is_null() {
466            // No need to do anything
467            return;
468        }
469        unsafe {
470            rados_shutdown(self.rados);
471        }
472        // avoid double free
473        self.set_null();
474    }
475
476    fn conn_guard(&self) -> RadosResult<()> {
477        if self.rados.is_null() {
478            return Err(RadosError::new(
479                "Rados not connected.  Please initialize cluster".to_string(),
480            ));
481        }
482        Ok(())
483    }
484
485    /// Set the value of a configuration option
486    pub fn config_set(&self, name: &str, value: &str) -> RadosResult<()> {
487        if !self.rados.is_null() {
488            return Err(RadosError::new(
489                "Rados should not be connected when this function is called".into(),
490            ));
491        }
492        let name_str = CString::new(name)?;
493        let value_str = CString::new(value)?;
494        unsafe {
495            let ret_code = rados_conf_set(self.rados, name_str.as_ptr(), value_str.as_ptr());
496            if ret_code < 0 {
497                return Err(ret_code.into());
498            }
499        }
500        Ok(())
501    }
502
503    /// Get the value of a configuration option
504    pub fn config_get(&self, name: &str) -> RadosResult<String> {
505        let name_str = CString::new(name)?;
506        // 5K should be plenty for a config key right?
507        let mut buffer: Vec<u8> = Vec::with_capacity(5120);
508        unsafe {
509            let ret_code = rados_conf_get(
510                self.rados,
511                name_str.as_ptr(),
512                buffer.as_mut_ptr() as *mut c_char,
513                buffer.capacity(),
514            );
515            if ret_code < 0 {
516                return Err(ret_code.into());
517            }
518            // Ceph doesn't return how many bytes were written
519            buffer.set_len(5120);
520            // We need to search for the first NUL byte
521            let num_bytes = buffer.iter().position(|x| x == &0u8);
522            buffer.set_len(num_bytes.unwrap_or(0));
523            Ok(String::from_utf8_lossy(&buffer).into_owned())
524        }
525    }
526
527    /// Create an io context. The io context allows you to perform operations
528    /// within a particular pool.
529    /// For more details see rados_ioctx_t.
530    pub fn get_rados_ioctx(&self, pool_name: &str) -> RadosResult<IoCtx> {
531        self.conn_guard()?;
532        let pool_name_str = CString::new(pool_name)?;
533        unsafe {
534            let mut ioctx: rados_ioctx_t = ptr::null_mut();
535            let ret_code = rados_ioctx_create(self.rados, pool_name_str.as_ptr(), &mut ioctx);
536            if ret_code < 0 {
537                return Err(ret_code.into());
538            }
539            Ok(IoCtx { ioctx })
540        }
541    }
542
543    /// Create an io context. The io context allows you to perform operations
544    /// within a particular pool.
545    /// For more details see rados_ioctx_t.
546    pub fn get_rados_ioctx2(&self, pool_id: i64) -> RadosResult<IoCtx> {
547        self.conn_guard()?;
548        unsafe {
549            let mut ioctx: rados_ioctx_t = ptr::null_mut();
550            let ret_code = rados_ioctx_create2(self.rados, pool_id, &mut ioctx);
551            if ret_code < 0 {
552                return Err(ret_code.into());
553            }
554            Ok(IoCtx { ioctx })
555        }
556    }
557}
558
559/// The lifetime of `IoCtx` ends up after destroy_rados_ioctx,
560/// so just move it away.
561pub async fn destroy_rados_ioctx_async(mut ioctx: IoCtx) {
562    let pool = futures::executor::ThreadPool::builder()
563        .pool_size(1)
564        .create()
565        .expect("Could not spawn thread pool");
566    pool.spawn_with_handle(async move { ioctx.destroy_rados_ioctx() })
567        .expect("Could not spawn background task")
568        .await;
569}
570
571impl IoCtx {
572    pub fn inner(&self) -> &rados_ioctx_t {
573        &self.ioctx
574    }
575
576    fn set_null(&mut self) {
577        self.ioctx = ptr::null_mut();
578    }
579
580    /// This just tells librados that you no longer need to use the io context.
581    /// It may not be freed immediately if there are pending asynchronous
582    /// requests on it, but you
583    /// should not use an io context again after calling this function on it.
584    /// This does not guarantee any asynchronous writes have completed. You must
585    /// call rados_aio_flush()
586    /// on the io context before destroying it to do that.
587    pub fn destroy_rados_ioctx(&mut self) {
588        if self.ioctx.is_null() {
589            // No need to do anything
590            return;
591        }
592        unsafe {
593            rados_ioctx_destroy(self.ioctx);
594        }
595        // avoid double free
596        self.set_null();
597    }
598    fn ioctx_guard(&self) -> RadosResult<()> {
599        if self.ioctx.is_null() {
600            return Err(RadosError::new(
601                "Rados ioctx not created.  Please initialize first".to_string(),
602            ));
603        }
604        Ok(())
605    }
606    /// Note: Ceph uses kibibytes: https://en.wikipedia.org/wiki/Kibibyte
607    pub fn rados_stat_pool(&self) -> RadosResult<Struct_rados_pool_stat_t> {
608        self.ioctx_guard()?;
609        let mut pool_stat = Struct_rados_pool_stat_t::default();
610        unsafe {
611            let ret_code = rados_ioctx_pool_stat(self.ioctx, &mut pool_stat);
612            if ret_code < 0 {
613                return Err(ret_code.into());
614            }
615            Ok(pool_stat)
616        }
617    }
618
619    pub fn rados_pool_set_auid(&self, auid: u64) -> RadosResult<()> {
620        self.ioctx_guard()?;
621        unsafe {
622            let ret_code = rados_ioctx_pool_set_auid(self.ioctx, auid);
623            if ret_code < 0 {
624                return Err(ret_code.into());
625            }
626            Ok(())
627        }
628    }
629
630    pub fn rados_pool_get_auid(&self) -> RadosResult<u64> {
631        self.ioctx_guard()?;
632        let mut auid: u64 = 0;
633        unsafe {
634            let ret_code = rados_ioctx_pool_get_auid(self.ioctx, &mut auid);
635            if ret_code < 0 {
636                return Err(ret_code.into());
637            }
638            Ok(auid)
639        }
640    }
641
642    /// Test whether the specified pool requires alignment or not.
643    pub fn rados_pool_requires_alignment(&self) -> RadosResult<bool> {
644        self.ioctx_guard()?;
645        unsafe {
646            let ret_code = rados_ioctx_pool_requires_alignment(self.ioctx);
647            if ret_code < 0 {
648                return Err(ret_code.into());
649            }
650            if ret_code == 0 {
651                Ok(false)
652            } else {
653                Ok(true)
654            }
655        }
656    }
657
658    /// Get the alignment flavor of a pool
659    pub fn rados_pool_required_alignment(&self) -> RadosResult<u64> {
660        self.ioctx_guard()?;
661        unsafe {
662            let ret_code = rados_ioctx_pool_required_alignment(self.ioctx);
663            Ok(ret_code)
664        }
665    }
666
667    /// Get the pool id of the io context
668    pub fn rados_object_get_id(&self) -> RadosResult<i64> {
669        self.ioctx_guard()?;
670        unsafe {
671            let pool_id = rados_ioctx_get_id(self.ioctx);
672            Ok(pool_id)
673        }
674    }
675
676    /// Get the pool name of the io context
677    pub fn rados_get_pool_name(&self) -> RadosResult<String> {
678        self.ioctx_guard()?;
679        let mut buffer: Vec<u8> = Vec::with_capacity(500);
680
681        unsafe {
682            // length of string stored, or -ERANGE if buffer too small
683            let ret_code = rados_ioctx_get_pool_name(
684                self.ioctx,
685                buffer.as_mut_ptr() as *mut c_char,
686                buffer.capacity() as c_uint,
687            );
688            if ret_code == -ERANGE {
689                // Buffer was too small
690                buffer.reserve(1000);
691                buffer.set_len(1000);
692                let ret_code = rados_ioctx_get_pool_name(
693                    self.ioctx,
694                    buffer.as_mut_ptr() as *mut c_char,
695                    buffer.capacity() as c_uint,
696                );
697                if ret_code < 0 {
698                    return Err(ret_code.into());
699                }
700                Ok(String::from_utf8_lossy(&buffer).into_owned())
701            } else if ret_code < 0 {
702                Err(ret_code.into())
703            } else {
704                buffer.set_len(ret_code as usize);
705                Ok(String::from_utf8_lossy(&buffer).into_owned())
706            }
707        }
708    }
709
710    /// Set the key for mapping objects to pgs within an io context.
711    pub fn rados_locator_set_key(&self, key: &str) -> RadosResult<()> {
712        self.ioctx_guard()?;
713        let key_str = CString::new(key)?;
714        unsafe {
715            rados_ioctx_locator_set_key(self.ioctx, key_str.as_ptr());
716        }
717        Ok(())
718    }
719
720    /// Set the namespace for objects within an io context
721    /// The namespace specification further refines a pool into different
722    /// domains. The mapping of objects to pgs is also based on this value.
723    pub fn rados_set_namespace(&self, namespace: &str) -> RadosResult<()> {
724        self.ioctx_guard()?;
725        let namespace_str = CString::new(namespace)?;
726        unsafe {
727            rados_ioctx_set_namespace(self.ioctx, namespace_str.as_ptr());
728        }
729        Ok(())
730    }
731
732    /// Start listing objects in a pool
733    pub fn rados_list_pool_objects(&self) -> RadosResult<rados_list_ctx_t> {
734        self.ioctx_guard()?;
735        let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
736        unsafe {
737            let ret_code = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
738            if ret_code < 0 {
739                return Err(ret_code.into());
740            }
741        }
742        Ok(rados_list_ctx)
743    }
744
745    /// Create a pool-wide snapshot
746    pub fn rados_snap_create(&self, snap_name: &str) -> RadosResult<()> {
747        self.ioctx_guard()?;
748
749        let snap_name_str = CString::new(snap_name)?;
750        unsafe {
751            let ret_code = rados_ioctx_snap_create(self.ioctx, snap_name_str.as_ptr());
752            if ret_code < 0 {
753                return Err(ret_code.into());
754            }
755        }
756        Ok(())
757    }
758
759    /// Delete a pool snapshot
760    pub fn rados_snap_remove(&self, snap_name: &str) -> RadosResult<()> {
761        self.ioctx_guard()?;
762        let snap_name_str = CString::new(snap_name)?;
763
764        unsafe {
765            let ret_code = rados_ioctx_snap_remove(self.ioctx, snap_name_str.as_ptr());
766            if ret_code < 0 {
767                return Err(ret_code.into());
768            }
769        }
770        Ok(())
771    }
772
773    /// Rollback an object to a pool snapshot
774    /// The contents of the object will be the same as when the snapshot was
775    /// taken.
776    pub fn rados_snap_rollback(&self, object_name: &str, snap_name: &str) -> RadosResult<()> {
777        self.ioctx_guard()?;
778        let snap_name_str = CString::new(snap_name)?;
779        let object_name_str = CString::new(object_name)?;
780
781        unsafe {
782            let ret_code = rados_ioctx_snap_rollback(
783                self.ioctx,
784                object_name_str.as_ptr(),
785                snap_name_str.as_ptr(),
786            );
787            if ret_code < 0 {
788                return Err(ret_code.into());
789            }
790        }
791        Ok(())
792    }
793
794    /// Set the snapshot from which reads are performed.
795    /// Subsequent reads will return data as it was at the time of that
796    /// snapshot.
797    pub fn rados_snap_set_read(&self, snap_id: u64) -> RadosResult<()> {
798        self.ioctx_guard()?;
799
800        unsafe {
801            rados_ioctx_snap_set_read(self.ioctx, snap_id);
802        }
803        Ok(())
804    }
805
806    /// Allocate an ID for a self-managed snapshot
807    /// Get a unique ID to put in the snaphot context to create a snapshot.
808    /// A clone of an object is not created until a write with the new snapshot
809    /// context is completed.
810    pub fn rados_selfmanaged_snap_create(&self) -> RadosResult<u64> {
811        self.ioctx_guard()?;
812        let mut snap_id: u64 = 0;
813        unsafe {
814            let ret_code = rados_ioctx_selfmanaged_snap_create(self.ioctx, &mut snap_id);
815            if ret_code < 0 {
816                return Err(ret_code.into());
817            }
818        }
819        Ok(snap_id)
820    }
821
822    /// Remove a self-managed snapshot
823    /// This increases the snapshot sequence number, which will cause snapshots
824    /// to be removed lazily.
825    pub fn rados_selfmanaged_snap_remove(&self, snap_id: u64) -> RadosResult<()> {
826        self.ioctx_guard()?;
827
828        unsafe {
829            let ret_code = rados_ioctx_selfmanaged_snap_remove(self.ioctx, snap_id);
830            if ret_code < 0 {
831                return Err(ret_code.into());
832            }
833        }
834        Ok(())
835    }
836
837    /// Rollback an object to a self-managed snapshot
838    /// The contents of the object will be the same as when the snapshot was
839    /// taken.
840    pub fn rados_selfmanaged_snap_rollback(
841        &self,
842        object_name: &str,
843        snap_id: u64,
844    ) -> RadosResult<()> {
845        self.ioctx_guard()?;
846        let object_name_str = CString::new(object_name)?;
847
848        unsafe {
849            let ret_code = rados_ioctx_selfmanaged_snap_rollback(
850                self.ioctx,
851                object_name_str.as_ptr(),
852                snap_id,
853            );
854            if ret_code < 0 {
855                return Err(ret_code.into());
856            }
857        }
858        Ok(())
859    }
860
861    /// Set the snapshot context for use when writing to objects
862    /// This is stored in the io context, and applies to all future writes.
863    // pub fn rados_selfmanaged_snap_set_write_ctx(ctx: rados_ioctx_t) ->
864    // RadosResult<()> {
865    // if ctx.is_null() {
866    // return Err(RadosError::new("Rados ioctx not created.  Please initialize
867    // first".to_string()));
868    // }
869    //
870    // unsafe {
871    // }
872    // }
873    /// List all the ids of pool snapshots
874    // pub fn rados_snap_list(ctx: rados_ioctx_t, snaps: *mut rados_snap_t) ->
875    // RadosResult<()> {
876    // if ctx.is_null() {
877    // return Err(RadosError::new("Rados ioctx not created.  Please initialize
878    // first".to_string()));
879    // }
880    // let mut buffer: Vec<u64> = Vec::with_capacity(500);
881    //
882    //
883    // unsafe {
884    // let ret_code = rados_ioctx_snap_list(ctx, &mut buffer, buffer.capacity());
885    // if ret_code == -ERANGE {
886    // }
887    // if ret_code < 0 {
888    // return Err(ret_code.into());
889    // }
890    // }
891    // Ok(buffer)
892    // }
893    /// Get the id of a pool snapshot
894    pub fn rados_snap_lookup(&self, snap_name: &str) -> RadosResult<u64> {
895        self.ioctx_guard()?;
896        let snap_name_str = CString::new(snap_name)?;
897        let mut snap_id: u64 = 0;
898        unsafe {
899            let ret_code =
900                rados_ioctx_snap_lookup(self.ioctx, snap_name_str.as_ptr(), &mut snap_id);
901            if ret_code < 0 {
902                return Err(ret_code.into());
903            }
904        }
905        Ok(snap_id)
906    }
907
908    /// Get the name of a pool snapshot
909    pub fn rados_snap_get_name(&self, snap_id: u64) -> RadosResult<String> {
910        self.ioctx_guard()?;
911
912        let out_buffer: Vec<u8> = Vec::with_capacity(500);
913        let out_buff_size = out_buffer.capacity();
914        let out_str = CString::new(out_buffer)?;
915        unsafe {
916            let ret_code = rados_ioctx_snap_get_name(
917                self.ioctx,
918                snap_id,
919                out_str.as_ptr() as *mut c_char,
920                out_buff_size as c_int,
921            );
922            if ret_code == -ERANGE {}
923            if ret_code < 0 {
924                return Err(ret_code.into());
925            }
926        }
927        Ok(out_str.to_string_lossy().into_owned())
928    }
929
930    /// Find when a pool snapshot occurred
931    pub fn rados_snap_get_stamp(&self, snap_id: u64) -> RadosResult<time_t> {
932        self.ioctx_guard()?;
933
934        let mut time_id: time_t = 0;
935        unsafe {
936            let ret_code = rados_ioctx_snap_get_stamp(self.ioctx, snap_id, &mut time_id);
937            if ret_code < 0 {
938                return Err(ret_code.into());
939            }
940        }
941        Ok(time_id)
942    }
943
944    /// Return the version of the last object read or written to.
945    /// This exposes the internal version number of the last object read or
946    /// written via this io context
947    pub fn rados_get_object_last_version(&self) -> RadosResult<u64> {
948        self.ioctx_guard()?;
949        unsafe {
950            let obj_id = rados_get_last_version(self.ioctx);
951            Ok(obj_id)
952        }
953    }
954
955    /// Write len bytes from buf into the oid object, starting at offset off.
956    /// The value of len must be <= UINT_MAX/2.
957    pub fn rados_object_write(
958        &self,
959        object_name: &str,
960        buffer: &[u8],
961        offset: u64,
962    ) -> RadosResult<()> {
963        self.ioctx_guard()?;
964        let obj_name_str = CString::new(object_name)?;
965
966        unsafe {
967            let ret_code = rados_write(
968                self.ioctx,
969                obj_name_str.as_ptr(),
970                buffer.as_ptr() as *const c_char,
971                buffer.len(),
972                offset,
973            );
974            if ret_code < 0 {
975                return Err(ret_code.into());
976            }
977        }
978        Ok(())
979    }
980
981    /// The object is filled with the provided data. If the object exists, it is
982    /// atomically
983    /// truncated and then written.
984    pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
985        self.ioctx_guard()?;
986        let obj_name_str = CString::new(object_name)?;
987
988        unsafe {
989            let ret_code = rados_write_full(
990                self.ioctx,
991                obj_name_str.as_ptr(),
992                buffer.as_ptr() as *const ::libc::c_char,
993                buffer.len(),
994            );
995            if ret_code < 0 {
996                return Err(ret_code.into());
997            }
998        }
999        Ok(())
1000    }
1001
1002    pub async fn rados_async_object_write(
1003        &self,
1004        object_name: &str,
1005        buffer: &[u8],
1006        offset: u64,
1007    ) -> RadosResult<u32> {
1008        self.ioctx_guard()?;
1009        let obj_name_str = CString::new(object_name)?;
1010
1011        with_completion(&self, |c| unsafe {
1012            rados_aio_write(
1013                self.ioctx,
1014                obj_name_str.as_ptr(),
1015                c,
1016                buffer.as_ptr() as *const ::libc::c_char,
1017                buffer.len(),
1018                offset,
1019            )
1020        })?
1021        .await
1022    }
1023
1024    /// Async variant of rados_object_append
1025    pub async fn rados_async_object_append(
1026        self: &Arc<Self>,
1027        object_name: &str,
1028        buffer: &[u8],
1029    ) -> RadosResult<u32> {
1030        self.ioctx_guard()?;
1031        let obj_name_str = CString::new(object_name)?;
1032
1033        with_completion(self, |c| unsafe {
1034            rados_aio_append(
1035                self.ioctx,
1036                obj_name_str.as_ptr(),
1037                c,
1038                buffer.as_ptr() as *const ::libc::c_char,
1039                buffer.len(),
1040            )
1041        })?
1042        .await
1043    }
1044
1045    /// Async variant of rados_object_write_full
1046    pub async fn rados_async_object_write_full(
1047        &self,
1048        object_name: &str,
1049        buffer: &[u8],
1050    ) -> RadosResult<u32> {
1051        self.ioctx_guard()?;
1052        let obj_name_str = CString::new(object_name)?;
1053
1054        with_completion(&self, |c| unsafe {
1055            rados_aio_write_full(
1056                self.ioctx,
1057                obj_name_str.as_ptr(),
1058                c,
1059                buffer.as_ptr() as *const ::libc::c_char,
1060                buffer.len(),
1061            )
1062        })?
1063        .await
1064    }
1065
1066    /// Async variant of rados_object_remove
1067    pub async fn rados_async_object_remove(&self, object_name: &str) -> RadosResult<()> {
1068        self.ioctx_guard()?;
1069        let object_name_str = CString::new(object_name)?;
1070
1071        with_completion(self, |c| unsafe {
1072            rados_aio_remove(self.ioctx, object_name_str.as_ptr() as *const c_char, c)
1073        })?
1074        .await
1075        .map(|_r| ())
1076    }
1077
1078    /// Async variant of rados_object_read
1079    pub async fn rados_async_object_read(
1080        &self,
1081        object_name: &str,
1082        fill_buffer: &mut Vec<u8>,
1083        read_offset: u64,
1084    ) -> RadosResult<u32> {
1085        self.ioctx_guard()?;
1086        let obj_name_str = CString::new(object_name)?;
1087
1088        if fill_buffer.capacity() == 0 {
1089            fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
1090        }
1091
1092        let result = with_completion(self, |c| unsafe {
1093            rados_aio_read(
1094                self.ioctx,
1095                obj_name_str.as_ptr(),
1096                c,
1097                fill_buffer.as_mut_ptr() as *mut c_char,
1098                fill_buffer.capacity(),
1099                read_offset,
1100            )
1101        })?
1102        .await;
1103
1104        if let Ok(rval) = &result {
1105            unsafe {
1106                let len = *rval as usize;
1107                assert!(len <= fill_buffer.capacity());
1108                fill_buffer.set_len(len);
1109            }
1110        }
1111
1112        result
1113    }
1114
1115    /// Streaming read of a RADOS object.  The `ReadStream` object implements `futures::Stream`
1116    /// for use with Stream-aware code like hyper's Body::wrap_stream.
1117    ///
1118    /// Useful for reading large objects incrementally, or anywhere you are using an interface
1119    /// that expects a stream (such as proxying objects via an HTTP server).
1120    ///
1121    /// Efficiency: If size_hint is not specified, and this function is used on a small object, it will
1122    /// issue spurious read-ahead operations beyond the object's size.
1123    /// If you have an object that you know is small, prefer to use a single `rados_async_object_read`
1124    /// instead of this streaming variant.
1125    ///
1126    /// * `buffer_size` - How much data should be read per rados read operation.  This is also
1127    ///   how much data is emitted in each Item from the stream.
1128    /// * `concurrency` - How many RADOS operations should be run in parallel for this stream,
1129    ///   or None to use a default.
1130    /// * `size_hint` - If you have prior knowledge of the object's size in bytes, pass it here to enable
1131    ///   the stream to issue fewer read-ahead operations than it would by default.  This is just
1132    ///   a hint, and does not bound the data returned -- if the object is smaller or larger
1133    ///   than `size_hint` then the actual object size will be reflected in the stream's output.
1134    pub fn rados_async_object_read_stream(
1135        &self,
1136        object_name: &str,
1137        buffer_size: Option<usize>,
1138        concurrency: Option<usize>,
1139        size_hint: Option<u64>,
1140    ) -> ReadStream<'_> {
1141        ReadStream::new(self, object_name, buffer_size, concurrency, size_hint)
1142    }
1143
1144    /// Streaming write of a RADOS object.  The `WriteSink` object implements `futures::Sink`.  Combine
1145    /// it with other stream-aware code, or bring the SinkExt trait into scope to get methods
1146    /// like send, send_all.
1147    ///
1148    /// Efficiency: this class does not coalesce writes, so each Item you send into it,
1149    ///
1150    ///
1151    /// * `concurrency` - How many RADOS operations should be run in parallel for this stream,
1152    ///   or None to use a default.
1153    pub fn rados_async_object_write_stream(
1154        &self,
1155        object_name: &str,
1156        concurrency: Option<usize>,
1157    ) -> WriteSink<'_> {
1158        WriteSink::new(self, object_name, concurrency)
1159    }
1160
1161    /// Get object stats (size,SystemTime)
1162    pub async fn rados_async_object_stat(
1163        &self,
1164        object_name: &str,
1165    ) -> RadosResult<(u64, SystemTime)> {
1166        self.ioctx_guard()?;
1167        let object_name_str = CString::new(object_name)?;
1168        let mut psize: u64 = 0;
1169        let mut time: ::libc::time_t = 0;
1170
1171        with_completion(self, |c| unsafe {
1172            rados_aio_stat(
1173                self.ioctx,
1174                object_name_str.as_ptr(),
1175                c,
1176                &mut psize,
1177                &mut time,
1178            )
1179        })?
1180        .await?;
1181        Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
1182    }
1183
1184    pub fn rados_async_object_list(&self) -> RadosResult<ListStream> {
1185        self.ioctx_guard()?;
1186        let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
1187        unsafe {
1188            let r = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
1189            if r == 0 {
1190                Ok(ListStream::new(rados_list_ctx))
1191            } else {
1192                Err(r.into())
1193            }
1194        }
1195    }
1196
1197    /// Async variant of rados_object_getxattr
1198    pub async fn rados_async_object_getxattr(
1199        &self,
1200        object_name: &str,
1201        attr_name: &str,
1202        fill_buffer: &mut [u8],
1203    ) -> RadosResult<u32> {
1204        self.ioctx_guard()?;
1205        let object_name_str = CString::new(object_name)?;
1206        let attr_name_str = CString::new(attr_name)?;
1207
1208        with_completion(self, |c| unsafe {
1209            rados_aio_getxattr(
1210                self.ioctx,
1211                object_name_str.as_ptr() as *const c_char,
1212                c,
1213                attr_name_str.as_ptr() as *const c_char,
1214                fill_buffer.as_mut_ptr() as *mut c_char,
1215                fill_buffer.len(),
1216            )
1217        })?
1218        .await
1219    }
1220
1221    /// Async variant of rados_object_setxattr
1222    pub async fn rados_async_object_setxattr(
1223        &self,
1224        object_name: &str,
1225        attr_name: &str,
1226        attr_value: &[u8],
1227    ) -> RadosResult<u32> {
1228        self.ioctx_guard()?;
1229        let object_name_str = CString::new(object_name)?;
1230        let attr_name_str = CString::new(attr_name)?;
1231
1232        with_completion(self, |c| unsafe {
1233            rados_aio_setxattr(
1234                self.ioctx,
1235                object_name_str.as_ptr() as *const c_char,
1236                c,
1237                attr_name_str.as_ptr() as *const c_char,
1238                attr_value.as_ptr() as *mut c_char,
1239                attr_value.len(),
1240            )
1241        })?
1242        .await
1243    }
1244
1245    /// Async variant of rados_object_rmxattr
1246    pub async fn rados_async_object_rmxattr(
1247        &self,
1248        object_name: &str,
1249        attr_name: &str,
1250    ) -> RadosResult<u32> {
1251        self.ioctx_guard()?;
1252        let object_name_str = CString::new(object_name)?;
1253        let attr_name_str = CString::new(attr_name)?;
1254
1255        with_completion(self, |c| unsafe {
1256            rados_aio_rmxattr(
1257                self.ioctx,
1258                object_name_str.as_ptr() as *const c_char,
1259                c,
1260                attr_name_str.as_ptr() as *const c_char,
1261            )
1262        })?
1263        .await
1264    }
1265
1266    /// Efficiently copy a portion of one object to another
1267    /// If the underlying filesystem on the OSD supports it, this will be a
1268    /// copy-on-write clone.
1269    /// The src and dest objects must be in the same pg. To ensure this, the io
1270    /// context should
1271    /// have a locator key set (see rados_ioctx_locator_set_key()).
1272    pub fn rados_object_clone_range(
1273        &self,
1274        dst_object_name: &str,
1275        dst_offset: u64,
1276        src_object_name: &str,
1277        src_offset: u64,
1278        length: usize,
1279    ) -> RadosResult<()> {
1280        self.ioctx_guard()?;
1281        let dst_name_str = CString::new(dst_object_name)?;
1282        let src_name_str = CString::new(src_object_name)?;
1283
1284        unsafe {
1285            let ret_code = rados_clone_range(
1286                self.ioctx,
1287                dst_name_str.as_ptr(),
1288                dst_offset,
1289                src_name_str.as_ptr(),
1290                src_offset,
1291                length,
1292            );
1293            if ret_code < 0 {
1294                return Err(ret_code.into());
1295            }
1296        }
1297        Ok(())
1298    }
1299
1300    /// Append len bytes from buf into the oid object.
1301    pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
1302        self.ioctx_guard()?;
1303        let obj_name_str = CString::new(object_name)?;
1304
1305        unsafe {
1306            let ret_code = rados_append(
1307                self.ioctx,
1308                obj_name_str.as_ptr(),
1309                buffer.as_ptr() as *const c_char,
1310                buffer.len(),
1311            );
1312            if ret_code < 0 {
1313                return Err(ret_code.into());
1314            }
1315        }
1316        Ok(())
1317    }
1318
1319    /// Read data from an object.  This fills the slice given and returns the
1320    /// amount of bytes read
1321    /// The io context determines the snapshot to read from, if any was set by
1322    /// rados_ioctx_snap_set_read().
1323    /// Default read size is 64K unless you call Vec::with_capacity
1324    /// with a larger size.
1325    pub fn rados_object_read(
1326        &self,
1327        object_name: &str,
1328        fill_buffer: &mut Vec<u8>,
1329        read_offset: u64,
1330    ) -> RadosResult<i32> {
1331        self.ioctx_guard()?;
1332        let object_name_str = CString::new(object_name)?;
1333        let mut len = fill_buffer.capacity();
1334        if len == 0 {
1335            fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
1336            len = fill_buffer.capacity();
1337        }
1338
1339        unsafe {
1340            let ret_code = rados_read(
1341                self.ioctx,
1342                object_name_str.as_ptr(),
1343                fill_buffer.as_mut_ptr() as *mut c_char,
1344                len,
1345                read_offset,
1346            );
1347            if ret_code < 0 {
1348                return Err(ret_code.into());
1349            }
1350            fill_buffer.set_len(ret_code as usize);
1351            Ok(ret_code)
1352        }
1353    }
1354
1355    /// Delete an object
1356    /// Note: This does not delete any snapshots of the object.
1357    pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
1358        self.ioctx_guard()?;
1359        let object_name_str = CString::new(object_name)?;
1360
1361        unsafe {
1362            let ret_code = rados_remove(self.ioctx, object_name_str.as_ptr() as *const c_char);
1363            if ret_code < 0 {
1364                return Err(ret_code.into());
1365            }
1366        }
1367        Ok(())
1368    }
1369
1370    /// Resize an object
1371    /// If this enlarges the object, the new area is logically filled with
1372    /// zeroes. If this shrinks the object, the excess data is removed.
1373    pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
1374        self.ioctx_guard()?;
1375        let object_name_str = CString::new(object_name)?;
1376
1377        unsafe {
1378            let ret_code = rados_trunc(self.ioctx, object_name_str.as_ptr(), new_size);
1379            if ret_code < 0 {
1380                return Err(ret_code.into());
1381            }
1382        }
1383        Ok(())
1384    }
1385
1386    /// Get the value of an extended attribute on an object.
1387    pub fn rados_object_getxattr(
1388        &self,
1389        object_name: &str,
1390        attr_name: &str,
1391        fill_buffer: &mut [u8],
1392    ) -> RadosResult<i32> {
1393        self.ioctx_guard()?;
1394        let object_name_str = CString::new(object_name)?;
1395        let attr_name_str = CString::new(attr_name)?;
1396
1397        unsafe {
1398            let ret_code = rados_getxattr(
1399                self.ioctx,
1400                object_name_str.as_ptr() as *const c_char,
1401                attr_name_str.as_ptr() as *const c_char,
1402                fill_buffer.as_mut_ptr() as *mut c_char,
1403                fill_buffer.len(),
1404            );
1405            if ret_code < 0 {
1406                return Err(ret_code.into());
1407            }
1408            Ok(ret_code)
1409        }
1410    }
1411
1412    /// Set an extended attribute on an object.
1413    pub fn rados_object_setxattr(
1414        &self,
1415        object_name: &str,
1416        attr_name: &str,
1417        attr_value: &mut [u8],
1418    ) -> RadosResult<()> {
1419        self.ioctx_guard()?;
1420        let object_name_str = CString::new(object_name)?;
1421        let attr_name_str = CString::new(attr_name)?;
1422
1423        unsafe {
1424            let ret_code = rados_setxattr(
1425                self.ioctx,
1426                object_name_str.as_ptr() as *const c_char,
1427                attr_name_str.as_ptr() as *const c_char,
1428                attr_value.as_mut_ptr() as *mut c_char,
1429                attr_value.len(),
1430            );
1431            if ret_code < 0 {
1432                return Err(ret_code.into());
1433            }
1434        }
1435        Ok(())
1436    }
1437
1438    /// Delete an extended attribute from an object.
1439    pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
1440        self.ioctx_guard()?;
1441        let object_name_str = CString::new(object_name)?;
1442        let attr_name_str = CString::new(attr_name)?;
1443
1444        unsafe {
1445            let ret_code = rados_rmxattr(
1446                self.ioctx,
1447                object_name_str.as_ptr() as *const c_char,
1448                attr_name_str.as_ptr() as *const c_char,
1449            );
1450            if ret_code < 0 {
1451                return Err(ret_code.into());
1452            }
1453        }
1454        Ok(())
1455    }
1456
1457    /// Get the rados_xattrs_iter_t reference to iterate over xattrs on an
1458    /// object Used in conjuction with XAttr::new() to iterate.
1459    pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
1460        self.ioctx_guard()?;
1461        let object_name_str = CString::new(object_name)?;
1462        let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
1463
1464        unsafe {
1465            let ret_code = rados_getxattrs(
1466                self.ioctx,
1467                object_name_str.as_ptr(),
1468                &mut xattr_iterator_handle,
1469            );
1470            if ret_code < 0 {
1471                return Err(ret_code.into());
1472            }
1473        }
1474        Ok(xattr_iterator_handle)
1475    }
1476
1477    /// Get object stats (size,SystemTime)
1478    pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
1479        self.ioctx_guard()?;
1480        let object_name_str = CString::new(object_name)?;
1481        let mut psize: u64 = 0;
1482        let mut time: ::libc::time_t = 0;
1483
1484        unsafe {
1485            let ret_code = rados_stat(self.ioctx, object_name_str.as_ptr(), &mut psize, &mut time);
1486            if ret_code < 0 {
1487                return Err(ret_code.into());
1488            }
1489        }
1490        Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
1491    }
1492
1493    /// Update tmap (trivial map)
1494    pub fn rados_object_tmap_update(
1495        &self,
1496        object_name: &str,
1497        update: TmapOperation,
1498    ) -> RadosResult<()> {
1499        self.ioctx_guard()?;
1500        let object_name_str = CString::new(object_name)?;
1501        let buffer = update.serialize()?;
1502        unsafe {
1503            let ret_code = rados_tmap_update(
1504                self.ioctx,
1505                object_name_str.as_ptr(),
1506                buffer.as_ptr() as *const c_char,
1507                buffer.len(),
1508            );
1509            if ret_code < 0 {
1510                return Err(ret_code.into());
1511            }
1512        }
1513        Ok(())
1514    }
1515
1516    /// Fetch complete tmap (trivial map) object
1517    pub fn rados_object_tmap_get(&self, object_name: &str) -> RadosResult<Vec<TmapOperation>> {
1518        self.ioctx_guard()?;
1519        let object_name_str = CString::new(object_name)?;
1520        let mut buffer: Vec<u8> = Vec::with_capacity(500);
1521
1522        unsafe {
1523            let ret_code = rados_tmap_get(
1524                self.ioctx,
1525                object_name_str.as_ptr(),
1526                buffer.as_mut_ptr() as *mut c_char,
1527                buffer.capacity(),
1528            );
1529            if ret_code == -ERANGE {
1530                buffer.reserve(1000);
1531                buffer.set_len(1000);
1532                let ret_code = rados_tmap_get(
1533                    self.ioctx,
1534                    object_name_str.as_ptr(),
1535                    buffer.as_mut_ptr() as *mut c_char,
1536                    buffer.capacity(),
1537                );
1538                if ret_code < 0 {
1539                    return Err(ret_code.into());
1540                }
1541            } else if ret_code < 0 {
1542                return Err(ret_code.into());
1543            }
1544        }
1545        match TmapOperation::deserialize(&buffer) {
1546            Ok((_, tmap)) => Ok(tmap),
1547            Err(nom::Err::Incomplete(needed)) => Err(RadosError::new(format!(
1548                "deserialize of ceph tmap failed.
1549            Input from Ceph was too small.  Needed: {:?} more bytes",
1550                needed
1551            ))),
1552            Err(nom::Err::Error(e)) => Err(RadosError::new(
1553                String::from_utf8_lossy(e.input).to_string(),
1554            )),
1555            Err(nom::Err::Failure(e)) => Err(RadosError::new(
1556                String::from_utf8_lossy(e.input).to_string(),
1557            )),
1558        }
1559    }
1560
1561    /// Execute an OSD class method on an object
1562    /// The OSD has a plugin mechanism for performing complicated operations on
1563    /// an object atomically.
1564    /// These plugins are called classes. This function allows librados users to
1565    /// call the custom
1566    /// methods. The input and output formats are defined by the class. Classes
1567    /// in ceph.git can
1568    /// be found in src/cls subdirectories
1569    pub fn rados_object_exec(
1570        &self,
1571        object_name: &str,
1572        class_name: &str,
1573        method_name: &str,
1574        input_buffer: &[u8],
1575        output_buffer: &mut [u8],
1576    ) -> RadosResult<()> {
1577        self.ioctx_guard()?;
1578        let object_name_str = CString::new(object_name)?;
1579        let class_name_str = CString::new(class_name)?;
1580        let method_name_str = CString::new(method_name)?;
1581
1582        unsafe {
1583            let ret_code = rados_exec(
1584                self.ioctx,
1585                object_name_str.as_ptr(),
1586                class_name_str.as_ptr(),
1587                method_name_str.as_ptr(),
1588                input_buffer.as_ptr() as *const c_char,
1589                input_buffer.len(),
1590                output_buffer.as_mut_ptr() as *mut c_char,
1591                output_buffer.len(),
1592            );
1593            if ret_code < 0 {
1594                return Err(ret_code.into());
1595            }
1596        }
1597        Ok(())
1598    }
1599
1600    /// Sychronously notify watchers of an object
1601    /// This blocks until all watchers of the object have received and reacted
1602    /// to the notify, or a timeout is reached.
1603    pub fn rados_object_notify(&self, object_name: &str, data: &[u8]) -> RadosResult<()> {
1604        self.ioctx_guard()?;
1605        let object_name_str = CString::new(object_name)?;
1606
1607        unsafe {
1608            let ret_code = rados_notify(
1609                self.ioctx,
1610                object_name_str.as_ptr(),
1611                0,
1612                data.as_ptr() as *const c_char,
1613                data.len() as i32,
1614            );
1615            if ret_code < 0 {
1616                return Err(ret_code.into());
1617            }
1618        }
1619        Ok(())
1620    }
1621    // pub fn rados_object_notify2(ctx: rados_ioctx_t, object_name: &str) ->
1622    // RadosResult<()> {
1623    // if ctx.is_null() {
1624    // return Err(RadosError::new("Rados ioctx not created.  Please initialize
1625    // first".to_string()));
1626    // }
1627    //
1628    // unsafe {
1629    // }
1630    // }
1631    //
1632    /// Acknolwedge receipt of a notify
1633    pub fn rados_object_notify_ack(
1634        &self,
1635        object_name: &str,
1636        notify_id: u64,
1637        cookie: u64,
1638        buffer: Option<&[u8]>,
1639    ) -> RadosResult<()> {
1640        self.ioctx_guard()?;
1641        let object_name_str = CString::new(object_name)?;
1642
1643        match buffer {
1644            Some(buf) => unsafe {
1645                let ret_code = rados_notify_ack(
1646                    self.ioctx,
1647                    object_name_str.as_ptr(),
1648                    notify_id,
1649                    cookie,
1650                    buf.as_ptr() as *const c_char,
1651                    buf.len() as i32,
1652                );
1653                if ret_code < 0 {
1654                    return Err(ret_code.into());
1655                }
1656            },
1657            None => unsafe {
1658                let ret_code = rados_notify_ack(
1659                    self.ioctx,
1660                    object_name_str.as_ptr(),
1661                    notify_id,
1662                    cookie,
1663                    ptr::null(),
1664                    0,
1665                );
1666                if ret_code < 0 {
1667                    return Err(ret_code.into());
1668                }
1669            },
1670        }
1671        Ok(())
1672    }
1673    /// Set allocation hint for an object
1674    /// This is an advisory operation, it will always succeed (as if it was
1675    /// submitted with a
1676    /// LIBRADOS_OP_FLAG_FAILOK flag set) and is not guaranteed to do anything
1677    /// on the backend.
1678    pub fn rados_object_set_alloc_hint(
1679        &self,
1680        object_name: &str,
1681        expected_object_size: u64,
1682        expected_write_size: u64,
1683    ) -> RadosResult<()> {
1684        self.ioctx_guard()?;
1685        let object_name_str = CString::new(object_name)?;
1686
1687        unsafe {
1688            let ret_code = rados_set_alloc_hint(
1689                self.ioctx,
1690                object_name_str.as_ptr(),
1691                expected_object_size,
1692                expected_write_size,
1693            );
1694            if ret_code < 0 {
1695                return Err(ret_code.into());
1696            }
1697        }
1698        Ok(())
1699    }
1700
1701    // Perform a compound read operation synchronously
1702    pub fn rados_perform_read_operations(&self, read_op: ReadOperation) -> RadosResult<()> {
1703        self.ioctx_guard()?;
1704        let object_name_str = CString::new(read_op.object_name.clone())?;
1705
1706        unsafe {
1707            let ret_code = rados_read_op_operate(
1708                read_op.read_op_handle,
1709                self.ioctx,
1710                object_name_str.as_ptr(),
1711                read_op.flags as i32,
1712            );
1713            if ret_code < 0 {
1714                return Err(ret_code.into());
1715            }
1716        }
1717        Ok(())
1718    }
1719
1720    // Perform a compound write operation synchronously
1721    pub fn rados_commit_write_operations(&self, write_op: &mut WriteOperation) -> RadosResult<()> {
1722        self.ioctx_guard()?;
1723        let object_name_str = CString::new(write_op.object_name.clone())?;
1724
1725        unsafe {
1726            let ret_code = rados_write_op_operate(
1727                write_op.write_op_handle,
1728                self.ioctx,
1729                object_name_str.as_ptr(),
1730                &mut write_op.mtime,
1731                write_op.flags as i32,
1732            );
1733            if ret_code < 0 {
1734                return Err(ret_code.into());
1735            }
1736        }
1737        Ok(())
1738    }
1739
1740    /// Take an exclusive lock on an object.
1741    pub fn rados_object_lock_exclusive(
1742        &self,
1743        object_name: &str,
1744        lock_name: &str,
1745        cookie_name: &str,
1746        description: &str,
1747        duration_time: &mut timeval,
1748        lock_flags: u8,
1749    ) -> RadosResult<()> {
1750        self.ioctx_guard()?;
1751        let object_name_str = CString::new(object_name)?;
1752        let lock_name_str = CString::new(lock_name)?;
1753        let cookie_name_str = CString::new(cookie_name)?;
1754        let description_str = CString::new(description)?;
1755
1756        unsafe {
1757            let ret_code = rados_lock_exclusive(
1758                self.ioctx,
1759                object_name_str.as_ptr(),
1760                lock_name_str.as_ptr(),
1761                cookie_name_str.as_ptr(),
1762                description_str.as_ptr(),
1763                duration_time,
1764                lock_flags,
1765            );
1766            if ret_code < 0 {
1767                return Err(ret_code.into());
1768            }
1769        }
1770        Ok(())
1771    }
1772
1773    /// Take a shared lock on an object.
1774    pub fn rados_object_lock_shared(
1775        &self,
1776        object_name: &str,
1777        lock_name: &str,
1778        cookie_name: &str,
1779        description: &str,
1780        tag_name: &str,
1781        duration_time: &mut timeval,
1782        lock_flags: u8,
1783    ) -> RadosResult<()> {
1784        self.ioctx_guard()?;
1785        let object_name_str = CString::new(object_name)?;
1786        let lock_name_str = CString::new(lock_name)?;
1787        let cookie_name_str = CString::new(cookie_name)?;
1788        let description_str = CString::new(description)?;
1789        let tag_name_str = CString::new(tag_name)?;
1790
1791        unsafe {
1792            let ret_code = rados_lock_shared(
1793                self.ioctx,
1794                object_name_str.as_ptr(),
1795                lock_name_str.as_ptr(),
1796                cookie_name_str.as_ptr(),
1797                tag_name_str.as_ptr(),
1798                description_str.as_ptr(),
1799                duration_time,
1800                lock_flags,
1801            );
1802            if ret_code < 0 {
1803                return Err(ret_code.into());
1804            }
1805        }
1806        Ok(())
1807    }
1808
1809    /// Release a shared or exclusive lock on an object.
1810    pub fn rados_object_unlock(
1811        &self,
1812        object_name: &str,
1813        lock_name: &str,
1814        cookie_name: &str,
1815    ) -> RadosResult<()> {
1816        self.ioctx_guard()?;
1817        let object_name_str = CString::new(object_name)?;
1818        let lock_name_str = CString::new(lock_name)?;
1819        let cookie_name_str = CString::new(cookie_name)?;
1820
1821        unsafe {
1822            let ret_code = rados_unlock(
1823                self.ioctx,
1824                object_name_str.as_ptr(),
1825                lock_name_str.as_ptr(),
1826                cookie_name_str.as_ptr(),
1827            );
1828            if ret_code < 0 {
1829                return Err(ret_code.into());
1830            }
1831        }
1832        Ok(())
1833    }
1834
1835    /// List clients that have locked the named object lock and information
1836    /// about the lock.
1837    /// The number of bytes required in each buffer is put in the corresponding
1838    /// size out parameter.
1839    /// If any of the provided buffers are too short, -ERANGE is returned after
1840    /// these sizes are filled in.
1841    // pub fn rados_object_list_lockers(ctx: rados_ioctx_t, object_name: &str,
1842    // lock_name: &str, exclusive: u8, ) ->
1843    // RadosResult<isize> {
1844    // if ctx.is_null() {
1845    // return Err(RadosError::new("Rados ioctx not created.  Please initialize
1846    // first".to_string()));
1847    // }
1848    // let object_name_str = try!(CString::new(object_name));
1849    //
1850    // unsafe {
1851    // let ret_code = rados_list_lockers(ctx,
1852    // o: *const ::libc::c_char,
1853    // name: *const ::libc::c_char,
1854    // exclusive: *mut ::libc::c_int,
1855    // tag: *mut ::libc::c_char,
1856    // tag_len: *mut size_t,
1857    // clients: *mut ::libc::c_char,
1858    // clients_len: *mut size_t,
1859    // cookies: *mut ::libc::c_char,
1860    // cookies_len: *mut size_t,
1861    // addrs: *mut ::libc::c_char,
1862    // addrs_len: *mut size_t);
1863    // }
1864    // }
1865    /// Releases a shared or exclusive lock on an object, which was taken by the
1866    /// specified client.
1867    pub fn rados_object_break_lock(
1868        &self,
1869        object_name: &str,
1870        lock_name: &str,
1871        client_name: &str,
1872        cookie_name: &str,
1873    ) -> RadosResult<()> {
1874        self.ioctx_guard()?;
1875        let object_name_str = CString::new(object_name)?;
1876        let lock_name_str = CString::new(lock_name)?;
1877        let cookie_name_str = CString::new(cookie_name)?;
1878        let client_name_str = CString::new(client_name)?;
1879
1880        unsafe {
1881            let ret_code = rados_break_lock(
1882                self.ioctx,
1883                object_name_str.as_ptr(),
1884                lock_name_str.as_ptr(),
1885                client_name_str.as_ptr(),
1886                cookie_name_str.as_ptr(),
1887            );
1888            if ret_code < 0 {
1889                return Err(ret_code.into());
1890            }
1891        }
1892        Ok(())
1893    }
1894
1895    /// Create a rados striper.
1896    /// For more details see rados_striper_t.
1897    #[cfg(feature = "rados_striper")]
1898    pub fn get_rados_striper(self) -> RadosResult<RadosStriper> {
1899        self.ioctx_guard()?;
1900        unsafe {
1901            let mut rados_striper: rados_striper_t = ptr::null_mut();
1902            let ret_code = rados_striper_create(self.ioctx, &mut rados_striper);
1903            if ret_code < 0 {
1904                return Err(ret_code.into());
1905            }
1906            Ok(RadosStriper { rados_striper })
1907        }
1908    }
1909}
1910
1911impl Rados {
1912    pub fn rados_blacklist_client(&self, client: IpAddr, expire_seconds: u32) -> RadosResult<()> {
1913        self.conn_guard()?;
1914        let client_address = CString::new(client.to_string())?;
1915        unsafe {
1916            let ret_code = rados_blacklist_add(
1917                self.rados,
1918                client_address.as_ptr() as *mut c_char,
1919                expire_seconds,
1920            );
1921
1922            if ret_code < 0 {
1923                return Err(ret_code.into());
1924            }
1925        }
1926        Ok(())
1927    }
1928
1929    /// Returns back a collection of Rados Pools
1930    ///
1931    /// pool_buffer should be allocated with:
1932    /// ```
1933    /// let capacity = 10;
1934    /// let pool_buffer: Vec<u8> = Vec::with_capacity(capacity);
1935    /// ```
1936    /// buf_size should be the value used with_capacity
1937    ///
1938    /// Returns Ok(Vec<String>) - A list of Strings of the pool names.
1939    #[allow(unused_variables)]
1940    pub fn rados_pools(&self) -> RadosResult<Vec<String>> {
1941        self.conn_guard()?;
1942        let mut pools: Vec<String> = Vec::new();
1943        let pool_slice: &[u8];
1944        let mut pool_buffer: Vec<u8> = Vec::with_capacity(500);
1945
1946        unsafe {
1947            let len = rados_pool_list(
1948                self.rados,
1949                pool_buffer.as_mut_ptr() as *mut c_char,
1950                pool_buffer.capacity(),
1951            );
1952            if len > pool_buffer.capacity() as i32 {
1953                // rados_pool_list requires more buffer than we gave it
1954                pool_buffer.reserve(len as usize);
1955                let len = rados_pool_list(
1956                    self.rados,
1957                    pool_buffer.as_mut_ptr() as *mut c_char,
1958                    pool_buffer.capacity(),
1959                );
1960                // Tell the Vec how much Ceph read into the buffer
1961                pool_buffer.set_len(len as usize);
1962            } else {
1963                // Tell the Vec how much Ceph read into the buffer
1964                pool_buffer.set_len(len as usize);
1965            }
1966        }
1967        let mut cursor = Cursor::new(&pool_buffer);
1968        loop {
1969            let mut string_buf: Vec<u8> = Vec::new();
1970            let read = cursor.read_until(0x00, &mut string_buf)?;
1971            // 0 End of the pool_buffer;
1972            // 1 Read a double \0.  Time to break
1973            if read == 0 || read == 1 {
1974                break;
1975            } else {
1976                // Read a String
1977                pools.push(String::from_utf8_lossy(&string_buf[..read - 1]).into_owned());
1978            }
1979        }
1980
1981        Ok(pools)
1982    }
1983
1984    /// Create a pool with default settings
1985    /// The default owner is the admin user (auid 0). The default crush rule is
1986    /// rule 0.
1987    pub fn rados_create_pool(&self, pool_name: &str) -> RadosResult<()> {
1988        self.conn_guard()?;
1989        let pool_name_str = CString::new(pool_name)?;
1990        unsafe {
1991            let ret_code = rados_pool_create(self.rados, pool_name_str.as_ptr());
1992            if ret_code < 0 {
1993                return Err(ret_code.into());
1994            }
1995        }
1996        Ok(())
1997    }
1998    /// Delete a pool and all data inside it
1999    /// The pool is removed from the cluster immediately, but the actual data is
2000    /// deleted in
2001    /// the background.
2002    pub fn rados_delete_pool(&self, pool_name: &str) -> RadosResult<()> {
2003        self.conn_guard()?;
2004        let pool_name_str = CString::new(pool_name)?;
2005        unsafe {
2006            let ret_code = rados_pool_delete(self.rados, pool_name_str.as_ptr());
2007            if ret_code < 0 {
2008                return Err(ret_code.into());
2009            }
2010        }
2011        Ok(())
2012    }
2013
2014    /// Lookup a Ceph pool id.  If the pool doesn't exist it will return
2015    /// Ok(None).
2016    pub fn rados_lookup_pool(&self, pool_name: &str) -> RadosResult<Option<i64>> {
2017        self.conn_guard()?;
2018        let pool_name_str = CString::new(pool_name)?;
2019        unsafe {
2020            let ret_code: i64 = rados_pool_lookup(self.rados, pool_name_str.as_ptr());
2021            if ret_code >= 0 {
2022                Ok(Some(ret_code))
2023            } else if ret_code as i32 == -ENOENT {
2024                Ok(None)
2025            } else {
2026                Err((ret_code as i32).into())
2027            }
2028        }
2029    }
2030
2031    pub fn rados_reverse_lookup_pool(&self, pool_id: i64) -> RadosResult<String> {
2032        self.conn_guard()?;
2033        let mut buffer: Vec<u8> = Vec::with_capacity(500);
2034
2035        unsafe {
2036            let ret_code = rados_pool_reverse_lookup(
2037                self.rados,
2038                pool_id,
2039                buffer.as_mut_ptr() as *mut c_char,
2040                buffer.capacity(),
2041            );
2042            if ret_code == -ERANGE {
2043                // Buffer was too small
2044                buffer.reserve(1000);
2045                buffer.set_len(1000);
2046                let ret_code = rados_pool_reverse_lookup(
2047                    self.rados,
2048                    pool_id,
2049                    buffer.as_mut_ptr() as *mut c_char,
2050                    buffer.capacity(),
2051                );
2052                if ret_code < 0 {
2053                    return Err(ret_code.into());
2054                }
2055                Ok(String::from_utf8_lossy(&buffer).into_owned())
2056            } else if ret_code < 0 {
2057                Err(ret_code.into())
2058            } else {
2059                Ok(String::from_utf8_lossy(&buffer).into_owned())
2060            }
2061        }
2062    }
2063}
2064
2065/// Get the version of librados.
2066pub fn rados_libversion() -> RadosVersion {
2067    let mut major: c_int = 0;
2068    let mut minor: c_int = 0;
2069    let mut extra: c_int = 0;
2070    unsafe {
2071        rados_version(&mut major, &mut minor, &mut extra);
2072    }
2073    RadosVersion {
2074        major,
2075        minor,
2076        extra,
2077    }
2078}
2079
2080impl Rados {
2081    /// Read usage info about the cluster
2082    /// This tells you total space, space used, space available, and number of
2083    /// objects.
2084    /// These are not updated immediately when data is written, they are
2085    /// eventually consistent.
2086    /// Note: Ceph uses kibibytes: https://en.wikipedia.org/wiki/Kibibyte
2087    pub fn rados_stat_cluster(&self) -> RadosResult<Struct_rados_cluster_stat_t> {
2088        self.conn_guard()?;
2089        let mut cluster_stat = Struct_rados_cluster_stat_t::default();
2090        unsafe {
2091            let ret_code = rados_cluster_stat(self.rados, &mut cluster_stat);
2092            if ret_code < 0 {
2093                return Err(ret_code.into());
2094            }
2095        }
2096
2097        Ok(cluster_stat)
2098    }
2099
2100    pub fn rados_fsid(&self) -> RadosResult<Uuid> {
2101        self.conn_guard()?;
2102        let mut fsid_buffer: Vec<u8> = Vec::with_capacity(37);
2103        unsafe {
2104            let ret_code = rados_cluster_fsid(
2105                self.rados,
2106                fsid_buffer.as_mut_ptr() as *mut c_char,
2107                fsid_buffer.capacity(),
2108            );
2109            if ret_code < 0 {
2110                return Err(ret_code.into());
2111            }
2112            // Tell the Vec how much Ceph read into the buffer
2113            fsid_buffer.set_len(ret_code as usize);
2114        }
2115        // Ceph actually returns the fsid as a uuid string
2116        let fsid_str = String::from_utf8(fsid_buffer)?;
2117        // Parse into a UUID and return
2118        Ok(fsid_str.parse()?)
2119    }
2120
2121    /// Ping a monitor to assess liveness
2122    /// May be used as a simply way to assess liveness, or to obtain
2123    /// information about the monitor in a simple way even in the
2124    /// absence of quorum.
2125    pub fn ping_monitor(&self, mon_id: &str) -> RadosResult<String> {
2126        self.conn_guard()?;
2127
2128        let mon_id_str = CString::new(mon_id)?;
2129        let mut out_str: *mut c_char = ptr::null_mut();
2130        let mut str_length: usize = 0;
2131        unsafe {
2132            let ret_code = rados_ping_monitor(
2133                self.rados,
2134                mon_id_str.as_ptr(),
2135                &mut out_str,
2136                &mut str_length,
2137            );
2138            if ret_code < 0 {
2139                return Err(ret_code.into());
2140            }
2141            if !out_str.is_null() {
2142                // valid string
2143                let s_bytes = std::slice::from_raw_parts(out_str, str_length);
2144                // Convert from i8 -> u8
2145                let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
2146                // Tell rados we're done with this buffer
2147                rados_buffer_free(out_str);
2148                Ok(String::from_utf8_lossy(&bytes).into_owned())
2149            } else {
2150                Ok("".into())
2151            }
2152        }
2153    }
2154}
2155
2156/// Ceph version - Ceph during the make release process generates the version
2157/// number along with
2158/// the github hash of the release and embeds the hard coded value into
2159/// `ceph.py` which is the
2160/// the default ceph utility.
2161pub fn ceph_version(socket: &str) -> Option<String> {
2162    let cmd = "version";
2163
2164    admin_socket_command(&cmd, socket).ok().and_then(|json| {
2165        json_data(&json)
2166            .and_then(|jsondata| json_find(jsondata, &[cmd]).map(|data| json_as_string(&data)))
2167    })
2168}
2169
2170/// This version call parses the `ceph -s` output. It does not need `sudo`
2171/// rights like
2172/// `ceph_version` does since it pulls from the admin socket.
2173pub fn ceph_version_parse() -> Option<String> {
2174    match run_cli("ceph --version") {
2175        Ok(output) => {
2176            let n = output.status.code().unwrap();
2177            if n == 0 {
2178                Some(String::from_utf8_lossy(&output.stdout).to_string())
2179            } else {
2180                Some(String::from_utf8_lossy(&output.stderr).to_string())
2181            }
2182        }
2183        Err(_) => None,
2184    }
2185}
2186
2187impl Rados {
2188    /// Only single String value
2189    pub fn ceph_status(&self, keys: &[&str]) -> RadosResult<String> {
2190        self.conn_guard()?;
2191        match self.ceph_mon_command("prefix", "status", Some("json")) {
2192            Ok((json, _)) => match json {
2193                Some(json) => match json_data(&json) {
2194                    Some(jsondata) => {
2195                        if let Some(data) = json_find(jsondata, keys) {
2196                            Ok(json_as_string(&data))
2197                        } else {
2198                            Err(RadosError::new(
2199                                "The attributes were not found in the output.".to_string(),
2200                            ))
2201                        }
2202                    }
2203                    _ => Err(RadosError::new("JSON data not found.".to_string())),
2204                },
2205                _ => Err(RadosError::new("JSON data not found.".to_string())),
2206            },
2207            Err(e) => Err(e),
2208        }
2209    }
2210
2211    /// string with the `health HEALTH_OK` or `HEALTH_WARN` or `HEALTH_ERR`
2212    /// which is also not efficient.
2213    pub fn ceph_health_string(&self) -> RadosResult<String> {
2214        self.conn_guard()?;
2215        match self.ceph_mon_command("prefix", "health", None) {
2216            Ok((data, _)) => Ok(data.unwrap().replace("\n", "")),
2217            Err(e) => Err(e),
2218        }
2219    }
2220
2221    /// Returns an enum value of:
2222    /// CephHealth::Ok
2223    /// CephHealth::Warning
2224    /// CephHealth::Error
2225    pub fn ceph_health(&self) -> CephHealth {
2226        match self.ceph_health_string() {
2227            Ok(health) => {
2228                if health.contains("HEALTH_OK") {
2229                    CephHealth::Ok
2230                } else if health.contains("HEALTH_WARN") {
2231                    CephHealth::Warning
2232                } else {
2233                    CephHealth::Error
2234                }
2235            }
2236            Err(_) => CephHealth::Error,
2237        }
2238    }
2239
2240    /// Higher level `ceph_command`
2241    pub fn ceph_command(
2242        &self,
2243        name: &str,
2244        value: &str,
2245        cmd_type: CephCommandTypes,
2246        keys: &[&str],
2247    ) -> RadosResult<JsonData> {
2248        self.conn_guard()?;
2249        match cmd_type {
2250            CephCommandTypes::Osd => Err(RadosError::new("OSD CMDs Not implemented.".to_string())),
2251            CephCommandTypes::Pgs => Err(RadosError::new("PGS CMDS Not implemented.".to_string())),
2252            _ => match self.ceph_mon_command(name, value, Some("json")) {
2253                Ok((json, _)) => match json {
2254                    Some(json) => match json_data(&json) {
2255                        Some(jsondata) => {
2256                            if let Some(data) = json_find(jsondata, keys) {
2257                                Ok(data)
2258                            } else {
2259                                Err(RadosError::new(
2260                                    "The attributes were not found in the output.".to_string(),
2261                                ))
2262                            }
2263                        }
2264                        _ => Err(RadosError::new("JSON data not found.".to_string())),
2265                    },
2266                    _ => Err(RadosError::new("JSON data not found.".to_string())),
2267                },
2268                Err(e) => Err(e),
2269            },
2270        }
2271    }
2272
2273    /// Returns the list of available commands
2274    pub fn ceph_commands(&self, keys: Option<&[&str]>) -> RadosResult<JsonData> {
2275        self.conn_guard()?;
2276        match self.ceph_mon_command("prefix", "get_command_descriptions", Some("json")) {
2277            Ok((json, _)) => match json {
2278                Some(json) => match json_data(&json) {
2279                    Some(jsondata) => {
2280                        if let Some(k) = keys {
2281                            if let Some(data) = json_find(jsondata, k) {
2282                                Ok(data)
2283                            } else {
2284                                Err(RadosError::new(
2285                                    "The attributes were not found in the output.".to_string(),
2286                                ))
2287                            }
2288                        } else {
2289                            Ok(jsondata)
2290                        }
2291                    }
2292                    _ => Err(RadosError::new("JSON data not found.".to_string())),
2293                },
2294                _ => Err(RadosError::new("JSON data not found.".to_string())),
2295            },
2296            Err(e) => Err(e),
2297        }
2298    }
2299
2300    /// Mon command that does not pass in a data payload.
2301    pub fn ceph_mon_command(
2302        &self,
2303        name: &str,
2304        value: &str,
2305        format: Option<&str>,
2306    ) -> RadosResult<(Option<String>, Option<String>)> {
2307        let data: Vec<*mut c_char> = Vec::with_capacity(1);
2308        self.ceph_mon_command_with_data(name, value, format, data)
2309    }
2310
2311    pub fn ceph_mon_command_without_data(
2312        &self,
2313        cmd: &serde_json::Value,
2314    ) -> RadosResult<(Vec<u8>, Option<String>)> {
2315        self.conn_guard()?;
2316        let cmd_string = cmd.to_string();
2317        debug!("ceph_mon_command_without_data: {}", cmd_string);
2318        let data: Vec<*mut c_char> = Vec::with_capacity(1);
2319        let cmds = CString::new(cmd_string).unwrap();
2320
2321        let mut outbuf_len = 0;
2322        let mut outs = ptr::null_mut();
2323        let mut outs_len = 0;
2324
2325        // Ceph librados allocates these buffers internally and the pointer that comes
2326        // back must be
2327        // freed by call `rados_buffer_free`
2328        let mut outbuf = ptr::null_mut();
2329        let mut out: Vec<u8> = vec![];
2330        let mut status_string: Option<String> = None;
2331
2332        debug!("Calling rados_mon_command with {:?}", cmd);
2333
2334        unsafe {
2335            // cmd length is 1 because we only allow one command at a time.
2336            let ret_code = rados_mon_command(
2337                self.rados,
2338                &mut cmds.as_ptr(),
2339                1,
2340                data.as_ptr() as *mut c_char,
2341                data.len() as usize,
2342                &mut outbuf,
2343                &mut outbuf_len,
2344                &mut outs,
2345                &mut outs_len,
2346            );
2347            debug!("return code: {}", ret_code);
2348            if ret_code < 0 {
2349                if outs_len > 0 && !outs.is_null() {
2350                    let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
2351                    rados_buffer_free(outs);
2352                    return Err(RadosError::new(String::from_utf8_lossy(slice).into_owned()));
2353                }
2354                return Err(ret_code.into());
2355            }
2356
2357            // Copy the data from outbuf and then call rados_buffer_free instead libc::free
2358            if outbuf_len > 0 && !outbuf.is_null() {
2359                let slice = ::std::slice::from_raw_parts(outbuf as *const u8, outbuf_len as usize);
2360                out = slice.to_vec();
2361
2362                rados_buffer_free(outbuf);
2363            }
2364            if outs_len > 0 && !outs.is_null() {
2365                let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
2366                status_string = Some(String::from_utf8(slice.to_vec())?);
2367                rados_buffer_free(outs);
2368            }
2369        }
2370
2371        Ok((out, status_string))
2372    }
2373
2374    /// Mon command that does pass in a data payload.
2375    /// Most all of the commands pass through this function.
2376    pub fn ceph_mon_command_with_data(
2377        &self,
2378        name: &str,
2379        value: &str,
2380        format: Option<&str>,
2381        data: Vec<*mut c_char>,
2382    ) -> RadosResult<(Option<String>, Option<String>)> {
2383        self.conn_guard()?;
2384
2385        let mut cmd_strings: Vec<String> = Vec::new();
2386        match format {
2387            Some(fmt) => cmd_strings.push(format!(
2388                "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2389                name, value, fmt
2390            )),
2391            None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2392        }
2393
2394        let cstrings: Vec<CString> = cmd_strings[..]
2395            .iter()
2396            .map(|s| CString::new(s.clone()).unwrap())
2397            .collect();
2398        let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2399
2400        let mut outbuf = ptr::null_mut();
2401        let mut outs = ptr::null_mut();
2402        let mut outbuf_len = 0;
2403        let mut outs_len = 0;
2404
2405        // Ceph librados allocates these buffers internally and the pointer that comes
2406        // back must be
2407        // freed by call `rados_buffer_free`
2408        let mut str_outbuf: Option<String> = None;
2409        let mut str_outs: Option<String> = None;
2410
2411        debug!("Calling rados_mon_command with {:?}", cstrings);
2412
2413        unsafe {
2414            // cmd length is 1 because we only allow one command at a time.
2415            let ret_code = rados_mon_command(
2416                self.rados,
2417                cmds.as_mut_ptr(),
2418                1,
2419                data.as_ptr() as *mut c_char,
2420                data.len() as usize,
2421                &mut outbuf,
2422                &mut outbuf_len,
2423                &mut outs,
2424                &mut outs_len,
2425            );
2426            if ret_code < 0 {
2427                return Err(ret_code.into());
2428            }
2429
2430            // Copy the data from outbuf and then  call rados_buffer_free instead libc::free
2431            if outbuf_len > 0 {
2432                let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2433                let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2434                let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2435                str_outbuf = Some(str_slice_outbuf.to_owned());
2436
2437                rados_buffer_free(outbuf);
2438            }
2439
2440            if outs_len > 0 {
2441                let c_str_outs: &CStr = CStr::from_ptr(outs);
2442                let buf_outs: &[u8] = c_str_outs.to_bytes();
2443                let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2444                str_outs = Some(str_slice_outs.to_owned());
2445
2446                rados_buffer_free(outs);
2447            }
2448        }
2449
2450        Ok((str_outbuf, str_outs))
2451    }
2452
2453    /// OSD command that does not pass in a data payload.
2454    pub fn ceph_osd_command(
2455        &self,
2456        id: i32,
2457        name: &str,
2458        value: &str,
2459        format: Option<&str>,
2460    ) -> RadosResult<(Option<String>, Option<String>)> {
2461        let data: Vec<*mut c_char> = Vec::with_capacity(1);
2462        self.ceph_osd_command_with_data(id, name, value, format, data)
2463    }
2464
2465    /// OSD command that does pass in a data payload.
2466    pub fn ceph_osd_command_with_data(
2467        &self,
2468        id: i32,
2469        name: &str,
2470        value: &str,
2471        format: Option<&str>,
2472        data: Vec<*mut c_char>,
2473    ) -> RadosResult<(Option<String>, Option<String>)> {
2474        self.conn_guard()?;
2475
2476        let mut cmd_strings: Vec<String> = Vec::new();
2477        match format {
2478            Some(fmt) => cmd_strings.push(format!(
2479                "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2480                name, value, fmt
2481            )),
2482            None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2483        }
2484
2485        let cstrings: Vec<CString> = cmd_strings[..]
2486            .iter()
2487            .map(|s| CString::new(s.clone()).unwrap())
2488            .collect();
2489        let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2490
2491        let mut outbuf = ptr::null_mut();
2492        let mut outs = ptr::null_mut();
2493        let mut outbuf_len = 0;
2494        let mut outs_len = 0;
2495
2496        // Ceph librados allocates these buffers internally and the pointer that comes
2497        // back must be
2498        // freed by call `rados_buffer_free`
2499        let mut str_outbuf: Option<String> = None;
2500        let mut str_outs: Option<String> = None;
2501
2502        unsafe {
2503            // cmd length is 1 because we only allow one command at a time.
2504            let ret_code = rados_osd_command(
2505                self.rados,
2506                id,
2507                cmds.as_mut_ptr(),
2508                1,
2509                data.as_ptr() as *mut c_char,
2510                data.len() as usize,
2511                &mut outbuf,
2512                &mut outbuf_len,
2513                &mut outs,
2514                &mut outs_len,
2515            );
2516            if ret_code < 0 {
2517                return Err(ret_code.into());
2518            }
2519
2520            // Copy the data from outbuf and then  call rados_buffer_free instead libc::free
2521            if outbuf_len > 0 {
2522                let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2523                let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2524                let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2525                str_outbuf = Some(str_slice_outbuf.to_owned());
2526
2527                rados_buffer_free(outbuf);
2528            }
2529
2530            if outs_len > 0 {
2531                let c_str_outs: &CStr = CStr::from_ptr(outs);
2532                let buf_outs: &[u8] = c_str_outs.to_bytes();
2533                let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2534                str_outs = Some(str_slice_outs.to_owned());
2535
2536                rados_buffer_free(outs);
2537            }
2538        }
2539
2540        Ok((str_outbuf, str_outs))
2541    }
2542
2543    /// PG command that does not pass in a data payload.
2544    pub fn ceph_pgs_command(
2545        &self,
2546        pg: &str,
2547        name: &str,
2548        value: &str,
2549        format: Option<&str>,
2550    ) -> RadosResult<(Option<String>, Option<String>)> {
2551        let data: Vec<*mut c_char> = Vec::with_capacity(1);
2552        self.ceph_pgs_command_with_data(pg, name, value, format, data)
2553    }
2554
2555    /// PG command that does pass in a data payload.
2556    pub fn ceph_pgs_command_with_data(
2557        &self,
2558        pg: &str,
2559        name: &str,
2560        value: &str,
2561        format: Option<&str>,
2562        data: Vec<*mut c_char>,
2563    ) -> RadosResult<(Option<String>, Option<String>)> {
2564        self.conn_guard()?;
2565
2566        let mut cmd_strings: Vec<String> = Vec::new();
2567        match format {
2568            Some(fmt) => cmd_strings.push(format!(
2569                "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2570                name, value, fmt
2571            )),
2572            None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2573        }
2574
2575        let pg_str = CString::new(pg).unwrap();
2576        let cstrings: Vec<CString> = cmd_strings[..]
2577            .iter()
2578            .map(|s| CString::new(s.clone()).unwrap())
2579            .collect();
2580        let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2581
2582        let mut outbuf = ptr::null_mut();
2583        let mut outs = ptr::null_mut();
2584        let mut outbuf_len = 0;
2585        let mut outs_len = 0;
2586
2587        // Ceph librados allocates these buffers internally and the pointer that comes
2588        // back must be
2589        // freed by call `rados_buffer_free`
2590        let mut str_outbuf: Option<String> = None;
2591        let mut str_outs: Option<String> = None;
2592
2593        unsafe {
2594            // cmd length is 1 because we only allow one command at a time.
2595            let ret_code = rados_pg_command(
2596                self.rados,
2597                pg_str.as_ptr(),
2598                cmds.as_mut_ptr(),
2599                1,
2600                data.as_ptr() as *mut c_char,
2601                data.len() as usize,
2602                &mut outbuf,
2603                &mut outbuf_len,
2604                &mut outs,
2605                &mut outs_len,
2606            );
2607            if ret_code < 0 {
2608                return Err(ret_code.into());
2609            }
2610
2611            // Copy the data from outbuf and then  call rados_buffer_free instead libc::free
2612            if outbuf_len > 0 {
2613                let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2614                let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2615                let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2616                str_outbuf = Some(str_slice_outbuf.to_owned());
2617
2618                rados_buffer_free(outbuf);
2619            }
2620
2621            if outs_len > 0 {
2622                let c_str_outs: &CStr = CStr::from_ptr(outs);
2623                let buf_outs: &[u8] = c_str_outs.to_bytes();
2624                let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2625                str_outs = Some(str_slice_outs.to_owned());
2626
2627                rados_buffer_free(outs);
2628            }
2629        }
2630
2631        Ok((str_outbuf, str_outs))
2632    }
2633}
2634
2635#[cfg(feature = "rados_striper")]
2636impl RadosStriper {
2637    pub fn inner(&self) -> &rados_striper_t {
2638        &self.rados_striper
2639    }
2640
2641    /// This just tells librados that you no longer need to use the striper.
2642    pub fn destroy_rados_striper(&self) {
2643        if self.rados_striper.is_null() {
2644            // No need to do anything
2645            return;
2646        }
2647        unsafe {
2648            rados_striper_destroy(self.rados_striper);
2649        }
2650    }
2651
2652    fn rados_striper_guard(&self) -> RadosResult<()> {
2653        if self.rados_striper.is_null() {
2654            return Err(RadosError::new(
2655                "Rados striper not created. Please initialize first".to_string(),
2656            ));
2657        }
2658        Ok(())
2659    }
2660
2661    /// Write len bytes from buf into the oid object, starting at offset off.
2662    /// The value of len must be <= UINT_MAX/2.
2663    pub fn rados_object_write(
2664        &self,
2665        object_name: &str,
2666        buffer: &[u8],
2667        offset: u64,
2668    ) -> RadosResult<()> {
2669        self.rados_striper_guard()?;
2670        let obj_name_str = CString::new(object_name)?;
2671
2672        unsafe {
2673            let ret_code = rados_striper_write(
2674                self.rados_striper,
2675                obj_name_str.as_ptr(),
2676                buffer.as_ptr() as *const c_char,
2677                buffer.len(),
2678                offset,
2679            );
2680            if ret_code < 0 {
2681                return Err(ret_code.into());
2682            }
2683        }
2684        Ok(())
2685    }
2686
2687    /// The object is filled with the provided data. If the object exists, it is
2688    /// atomically
2689    /// truncated and then written.
2690    pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
2691        self.rados_striper_guard()?;
2692        let obj_name_str = CString::new(object_name)?;
2693
2694        unsafe {
2695            let ret_code = rados_striper_write_full(
2696                self.rados_striper,
2697                obj_name_str.as_ptr(),
2698                buffer.as_ptr() as *const ::libc::c_char,
2699                buffer.len(),
2700            );
2701            if ret_code < 0 {
2702                return Err(ret_code.into());
2703            }
2704        }
2705        Ok(())
2706    }
2707
2708    /// Append len bytes from buf into the oid object.
2709    pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
2710        self.rados_striper_guard()?;
2711        let obj_name_str = CString::new(object_name)?;
2712
2713        unsafe {
2714            let ret_code = rados_striper_append(
2715                self.rados_striper,
2716                obj_name_str.as_ptr(),
2717                buffer.as_ptr() as *const c_char,
2718                buffer.len(),
2719            );
2720            if ret_code < 0 {
2721                return Err(ret_code.into());
2722            }
2723        }
2724        Ok(())
2725    }
2726
2727    /// Read data from an object.  This fills the slice given and returns the
2728    /// amount of bytes read
2729    /// The io context determines the snapshot to read from, if any was set by
2730    /// rados_ioctx_snap_set_read().
2731    /// Default read size is 64K unless you call Vec::with_capacity(1024*128)
2732    /// with a larger size.
2733    pub fn rados_object_read(
2734        &self,
2735        object_name: &str,
2736        fill_buffer: &mut Vec<u8>,
2737        read_offset: u64,
2738    ) -> RadosResult<i32> {
2739        self.rados_striper_guard()?;
2740        let object_name_str = CString::new(object_name)?;
2741        let mut len = fill_buffer.capacity();
2742        if len == 0 {
2743            fill_buffer.reserve_exact(1024 * 64);
2744            len = fill_buffer.capacity();
2745        }
2746
2747        unsafe {
2748            let ret_code = rados_striper_read(
2749                self.rados_striper,
2750                object_name_str.as_ptr(),
2751                fill_buffer.as_mut_ptr() as *mut c_char,
2752                len,
2753                read_offset,
2754            );
2755            if ret_code < 0 {
2756                return Err(ret_code.into());
2757            }
2758            fill_buffer.set_len(ret_code as usize);
2759            Ok(ret_code)
2760        }
2761    }
2762
2763    /// Delete an object
2764    /// Note: This does not delete any snapshots of the object.
2765    pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
2766        self.rados_striper_guard()?;
2767        let object_name_str = CString::new(object_name)?;
2768
2769        unsafe {
2770            let ret_code = rados_striper_remove(
2771                self.rados_striper,
2772                object_name_str.as_ptr() as *const c_char,
2773            );
2774            if ret_code < 0 {
2775                return Err(ret_code.into());
2776            }
2777        }
2778        Ok(())
2779    }
2780
2781    /// Resize an object
2782    /// If this enlarges the object, the new area is logically filled with
2783    /// zeroes. If this shrinks the object, the excess data is removed.
2784    pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
2785        self.rados_striper_guard()?;
2786        let object_name_str = CString::new(object_name)?;
2787
2788        unsafe {
2789            let ret_code =
2790                rados_striper_trunc(self.rados_striper, object_name_str.as_ptr(), new_size);
2791            if ret_code < 0 {
2792                return Err(ret_code.into());
2793            }
2794        }
2795        Ok(())
2796    }
2797
2798    /// Get the value of an extended attribute on an object.
2799    pub fn rados_object_getxattr(
2800        &self,
2801        object_name: &str,
2802        attr_name: &str,
2803        fill_buffer: &mut [u8],
2804    ) -> RadosResult<i32> {
2805        self.rados_striper_guard()?;
2806        let object_name_str = CString::new(object_name)?;
2807        let attr_name_str = CString::new(attr_name)?;
2808
2809        unsafe {
2810            let ret_code = rados_striper_getxattr(
2811                self.rados_striper,
2812                object_name_str.as_ptr() as *const c_char,
2813                attr_name_str.as_ptr() as *const c_char,
2814                fill_buffer.as_mut_ptr() as *mut c_char,
2815                fill_buffer.len(),
2816            );
2817            if ret_code < 0 {
2818                return Err(ret_code.into());
2819            }
2820            Ok(ret_code)
2821        }
2822    }
2823
2824    /// Set an extended attribute on an object.
2825    pub fn rados_object_setxattr(
2826        &self,
2827        object_name: &str,
2828        attr_name: &str,
2829        attr_value: &mut [u8],
2830    ) -> RadosResult<()> {
2831        self.rados_striper_guard()?;
2832        let object_name_str = CString::new(object_name)?;
2833        let attr_name_str = CString::new(attr_name)?;
2834
2835        unsafe {
2836            let ret_code = rados_striper_setxattr(
2837                self.rados_striper,
2838                object_name_str.as_ptr() as *const c_char,
2839                attr_name_str.as_ptr() as *const c_char,
2840                attr_value.as_mut_ptr() as *mut c_char,
2841                attr_value.len(),
2842            );
2843            if ret_code < 0 {
2844                return Err(ret_code.into());
2845            }
2846        }
2847        Ok(())
2848    }
2849
2850    /// Delete an extended attribute from an object.
2851    pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
2852        self.rados_striper_guard()?;
2853        let object_name_str = CString::new(object_name)?;
2854        let attr_name_str = CString::new(attr_name)?;
2855
2856        unsafe {
2857            let ret_code = rados_striper_rmxattr(
2858                self.rados_striper,
2859                object_name_str.as_ptr() as *const c_char,
2860                attr_name_str.as_ptr() as *const c_char,
2861            );
2862            if ret_code < 0 {
2863                return Err(ret_code.into());
2864            }
2865        }
2866        Ok(())
2867    }
2868
2869    /// Get the rados_xattrs_iter_t reference to iterate over xattrs on an
2870    /// object Used in conjuction with XAttr::new() to iterate.
2871    pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
2872        self.rados_striper_guard()?;
2873        let object_name_str = CString::new(object_name)?;
2874        let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
2875
2876        unsafe {
2877            let ret_code = rados_striper_getxattrs(
2878                self.rados_striper,
2879                object_name_str.as_ptr(),
2880                &mut xattr_iterator_handle,
2881            );
2882            if ret_code < 0 {
2883                return Err(ret_code.into());
2884            }
2885        }
2886        Ok(xattr_iterator_handle)
2887    }
2888
2889    /// Get object stats (size,SystemTime)
2890    pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
2891        self.rados_striper_guard()?;
2892        let object_name_str = CString::new(object_name)?;
2893        let mut psize: u64 = 0;
2894        let mut time: ::libc::time_t = 0;
2895
2896        unsafe {
2897            let ret_code = rados_striper_stat(
2898                self.rados_striper,
2899                object_name_str.as_ptr(),
2900                &mut psize,
2901                &mut time,
2902            );
2903            if ret_code < 0 {
2904                return Err(ret_code.into());
2905            }
2906        }
2907        Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
2908    }
2909}