ceph/
ceph.rs

1// Copyright 2017 LambdaStack All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14#![cfg(unix)]
15#![allow(unused_imports)]
16
17use crate::JsonData;
18
19use crate::admin_sockets::*;
20use crate::error::*;
21use crate::json::*;
22use crate::JsonValue;
23use byteorder::{LittleEndian, WriteBytesExt};
24use libc::*;
25use nom::number::complete::le_u32;
26use nom::IResult;
27use serde_json;
28
29use crate::rados::*;
30#[cfg(feature = "rados_striper")]
31use crate::rados_striper::*;
32use crate::status::*;
33use std::ffi::{CStr, CString};
34use std::marker::PhantomData;
35use std::{ptr, str};
36
37use crate::utils::*;
38use std::io::{BufRead, Cursor};
39use std::net::IpAddr;
40use std::time::{Duration, SystemTime, UNIX_EPOCH};
41
42use uuid::Uuid;
43
// Single-character opcodes that tag each encoded tmap operation. The
// `parse_*` parsers in this file match on them and `TmapOperation::serialize`
// emits them as the first byte of every encoded operation.

// Opcode for `TmapOperation::Header`.
const CEPH_OSD_TMAP_HDR: char = 'h';
// Opcode for `TmapOperation::Set`.
const CEPH_OSD_TMAP_SET: char = 's';
// Opcode for `TmapOperation::Create`.
const CEPH_OSD_TMAP_CREATE: char = 'c';
// Opcode for `TmapOperation::Remove`.
const CEPH_OSD_TMAP_RM: char = 'r';
48
/// Three-level health indicator.
///
/// NOTE(review): presumably mirrors Ceph's HEALTH_OK / HEALTH_WARN /
/// HEALTH_ERR states; the mapping is not visible in this part of the file —
/// confirm against the code that produces these values.
#[derive(Debug, Clone)]
pub enum CephHealth {
    Ok,
    Warning,
    Error,
}
55
/// Category of a Ceph command.
///
/// NOTE(review): the variants presumably select monitor, OSD and
/// placement-group commands respectively; their use is not visible in this
/// part of the file — confirm against callers.
#[derive(Debug, Clone)]
pub enum CephCommandTypes {
    Mon,
    Osd,
    Pgs,
}
62
// Parses one encoded header operation: the `CEPH_OSD_TMAP_HDR` opcode
// character, a little-endian u32 payload length, then that many payload
// bytes, which are copied into `TmapOperation::Header`.
named!(
    parse_header<TmapOperation>,
    do_parse!(
        char!(CEPH_OSD_TMAP_HDR)
            >> data_len: le_u32
            >> data: take!(data_len)
            >> (TmapOperation::Header {
                data: data.to_vec()
            })
    )
);
74
// Parses one encoded create operation: the `CEPH_OSD_TMAP_CREATE` opcode
// character, a little-endian u32 name length, the name (read as a string via
// `take_str!`), a little-endian u32 payload length, then the payload bytes.
// The result is an owned `TmapOperation::Create`.
named!(
    parse_create<TmapOperation>,
    do_parse!(
        char!(CEPH_OSD_TMAP_CREATE)
            >> key_name_len: le_u32
            >> key_name: take_str!(key_name_len)
            >> data_len: le_u32
            >> data: take!(data_len)
            >> (TmapOperation::Create {
                name: key_name.to_string(),
                data: data.to_vec(),
            })
    )
);
89
// Parses one encoded set operation: the `CEPH_OSD_TMAP_SET` opcode character,
// a little-endian u32 key length, the key (read as a string via `take_str!`),
// a little-endian u32 payload length, then the payload bytes. The result is
// an owned `TmapOperation::Set`.
named!(
    parse_set<TmapOperation>,
    do_parse!(
        char!(CEPH_OSD_TMAP_SET)
            >> key_name_len: le_u32
            >> key_name: take_str!(key_name_len)
            >> data_len: le_u32
            >> data: take!(data_len)
            >> (TmapOperation::Set {
                key: key_name.to_string(),
                data: data.to_vec(),
            })
    )
);
104
// Parses one encoded remove operation: the `CEPH_OSD_TMAP_RM` opcode
// character, a little-endian u32 name length, then the name (read as a string
// via `take_str!`). There is no payload. The result is an owned
// `TmapOperation::Remove`.
named!(
    parse_remove<TmapOperation>,
    do_parse!(
        char!(CEPH_OSD_TMAP_RM)
            >> key_name_len: le_u32
            >> key_name: take_str!(key_name_len)
            >> (TmapOperation::Remove {
                name: key_name.to_string(),
            })
    )
);
116
/// One tmap operation in its decoded form.
///
/// Each variant corresponds to one opcode character (`CEPH_OSD_TMAP_*`) and is
/// produced by the matching `parse_*` parser / consumed by `serialize`.
#[derive(Debug)]
pub enum TmapOperation {
    /// Header operation carrying an opaque byte payload.
    Header { data: Vec<u8> },
    /// Set operation: a key plus its byte payload.
    Set { key: String, data: Vec<u8> },
    /// Create operation: a name plus its byte payload.
    Create { name: String, data: Vec<u8> },
    /// Remove operation: only the name, no payload.
    Remove { name: String },
}
124
125impl TmapOperation {
126    fn serialize(&self) -> RadosResult<Vec<u8>> {
127        let mut buffer: Vec<u8> = Vec::new();
128        match *self {
129            TmapOperation::Header { ref data } => {
130                buffer.push(CEPH_OSD_TMAP_HDR as u8);
131                buffer.write_u32::<LittleEndian>(data.len() as u32)?;
132                buffer.extend_from_slice(data);
133            }
134            TmapOperation::Set { ref key, ref data } => {
135                buffer.push(CEPH_OSD_TMAP_SET as u8);
136                buffer.write_u32::<LittleEndian>(key.len() as u32)?;
137                buffer.extend(key.as_bytes());
138                buffer.write_u32::<LittleEndian>(data.len() as u32)?;
139                buffer.extend_from_slice(data);
140            }
141            TmapOperation::Create { ref name, ref data } => {
142                buffer.push(CEPH_OSD_TMAP_CREATE as u8);
143                buffer.write_u32::<LittleEndian>(name.len() as u32)?;
144                buffer.extend(name.as_bytes());
145                buffer.write_u32::<LittleEndian>(data.len() as u32)?;
146                buffer.extend_from_slice(data);
147            }
148            TmapOperation::Remove { ref name } => {
149                buffer.push(CEPH_OSD_TMAP_RM as u8);
150                buffer.write_u32::<LittleEndian>(name.len() as u32)?;
151                buffer.extend(name.as_bytes());
152            }
153        }
154        Ok(buffer)
155    }
156
157    fn deserialize(input: &[u8]) -> IResult<&[u8], Vec<TmapOperation>> {
158        many0!(
159            input,
160            alt!(
161                complete!(parse_header)
162                    | complete!(parse_create)
163                    | complete!(parse_set)
164                    | complete!(parse_remove)
165            )
166        )
167    }
168}
169
/// Helper to iterate over pool objects
///
/// Wraps an object-listing context; the `Iterator` implementation passes it
/// to `rados_nobjects_list_next` and closes it with
/// `rados_nobjects_list_close` once the listing reports `-ENOENT`.
#[derive(Debug)]
pub struct Pool {
    /// Raw listing context handle (see `IoCtx::rados_list_pool_objects`).
    pub ctx: rados_list_ctx_t,
}
175
/// One entry yielded while iterating a `Pool`.
#[derive(Debug)]
pub struct CephObject {
    /// Entry name reported by the listing call (lossily converted to UTF-8).
    pub name: String,
    /// Locator key reported for the entry; empty when none was returned.
    pub entry_locator: String,
    /// Namespace reported for the entry; empty when none was returned.
    pub namespace: String,
}
182
183impl Iterator for Pool {
184    type Item = CephObject;
185    fn next(&mut self) -> Option<CephObject> {
186        let mut entry_ptr: *mut *const ::libc::c_char = ptr::null_mut();
187        let mut key_ptr: *mut *const ::libc::c_char = ptr::null_mut();
188        let mut nspace_ptr: *mut *const ::libc::c_char = ptr::null_mut();
189
190        unsafe {
191            let ret_code =
192                rados_nobjects_list_next(self.ctx, &mut entry_ptr, &mut key_ptr, &mut nspace_ptr);
193            if ret_code == -ENOENT {
194                // We're done
195                rados_nobjects_list_close(self.ctx);
196                None
197            } else if ret_code < 0 {
198                // Unknown error
199                None
200            } else {
201                let object_name = CStr::from_ptr(entry_ptr as *const ::libc::c_char);
202                let mut object_locator = String::new();
203                let mut namespace = String::new();
204                if !key_ptr.is_null() {
205                    object_locator.push_str(
206                        &CStr::from_ptr(key_ptr as *const ::libc::c_char).to_string_lossy(),
207                    );
208                }
209                if !nspace_ptr.is_null() {
210                    namespace.push_str(
211                        &CStr::from_ptr(nspace_ptr as *const ::libc::c_char).to_string_lossy(),
212                    );
213                }
214
215                Some(CephObject {
216                    name: object_name.to_string_lossy().into_owned(),
217                    entry_locator: object_locator,
218                    namespace,
219                })
220            }
221        }
222    }
223}
224
/// A helper to create rados read operation
/// An object read operation stores a number of operations which can be
/// executed atomically.
///
/// The wrapped handle is released with `rados_release_read_op` when the value
/// is dropped.
#[derive(Debug)]
pub struct ReadOperation {
    /// Object name associated with this operation (presumably the object the
    /// operation is executed against — confirm at the call site).
    pub object_name: String,
    /// flags are set by calling LIBRADOS_OPERATION_NOFLAG |
    /// LIBRADOS_OPERATION_BALANCE_READS
    /// all the other flags are documented in rados.rs
    pub flags: u32,
    // Raw librados read-op handle owned by this struct.
    read_op_handle: rados_read_op_t,
}
237
/// Releases the underlying librados read operation when the wrapper goes out
/// of scope.
impl Drop for ReadOperation {
    fn drop(&mut self) {
        // The handle is handed to librados as-is; no null check is performed.
        unsafe {
            rados_release_read_op(self.read_op_handle);
        }
    }
}
245
/// A helper to create rados write operation
/// An object write operation stores a number of operations which can be
/// executed atomically.
///
/// The wrapped handle is released with `rados_release_write_op` when the value
/// is dropped.
#[derive(Debug)]
pub struct WriteOperation {
    /// Object name associated with this operation (presumably the object the
    /// operation is executed against — confirm at the call site).
    pub object_name: String,
    /// flags are set by calling LIBRADOS_OPERATION_NOFLAG |
    /// LIBRADOS_OPERATION_ORDER_READS_WRITES
    /// all the other flags are documented in rados.rs
    pub flags: u32,
    /// Timestamp carried with the operation (presumably the modification time
    /// applied to the write — confirm at the call site).
    pub mtime: time_t,
    // Raw librados write-op handle owned by this struct.
    write_op_handle: rados_write_op_t,
}
259
/// Releases the underlying librados write operation when the wrapper goes out
/// of scope.
impl Drop for WriteOperation {
    fn drop(&mut self) {
        // The handle is handed to librados as-is; no null check is performed.
        unsafe {
            rados_release_write_op(self.write_op_handle);
        }
    }
}
267
/// A rados object extended attribute with name and value.
/// Can be iterated over
///
/// The same type doubles as the iterator: each item produced by `next` is a
/// fresh `XAttr` that carries the same raw iterator handle.
#[derive(Debug)]
pub struct XAttr {
    /// Attribute name (empty on a value freshly created by `XAttr::new`).
    pub name: String,
    /// Attribute value, lossily converted to UTF-8.
    pub value: String,
    // Raw librados xattr iterator handle used to fetch further attributes.
    iter: rados_xattrs_iter_t,
}
276
/// The version of the librados library.
///
/// Three integer components; how they are filled in is not visible in this
/// part of the file.
#[derive(Debug)]
pub struct RadosVersion {
    /// Major version component.
    pub major: i32,
    /// Minor version component.
    pub minor: i32,
    /// Extra version component.
    pub extra: i32,
}
284
285impl XAttr {
286    /// Creates a new XAttr.  Call rados_getxattrs to create the iterator for
287    /// this struct
288    pub fn new(iter: rados_xattrs_iter_t) -> XAttr {
289        XAttr {
290            name: String::new(),
291            value: String::new(),
292            iter,
293        }
294    }
295}
296
297impl Iterator for XAttr {
298    type Item = XAttr;
299
300    fn next(&mut self) -> Option<Self::Item> {
301        // max xattr name is 255 bytes from what I can find
302        let mut name: *const c_char = ptr::null();
303        // max xattr is 64Kb from what I can find
304        let mut value: *const c_char = ptr::null();
305        let mut val_length: usize = 0;
306        unsafe {
307            let ret_code = rados_getxattrs_next(self.iter, &mut name, &mut value, &mut val_length);
308
309            if ret_code < 0 {
310                // Something failed, however Iterator doesn't return Result so we return None
311                None
312            }
313            // end of iterator reached
314            else if value.is_null() && val_length == 0 {
315                rados_getxattrs_end(self.iter);
316                None
317            } else {
318                let name = CStr::from_ptr(name);
319                // value string
320                let s_bytes = std::slice::from_raw_parts(value, val_length);
321                // Convert from i8 -> u8
322                let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
323                Some(XAttr {
324                    name: name.to_string_lossy().into_owned(),
325                    value: String::from_utf8_lossy(&bytes).into_owned(),
326                    iter: self.iter,
327                })
328            }
329        }
330    }
331}
332
/// Owns a ioctx handle
///
/// The handle is destroyed with `rados_ioctx_destroy` when the value is
/// dropped (skipped if the handle is null).
pub struct IoCtx {
    // Raw librados io context handle.
    ioctx: rados_ioctx_t,
}
337
// SAFETY(review): these impls assert that the raw io context handle may be
// moved to and shared between threads. The justification is not stated in
// this file — presumably it relies on librados' own thread-safety guarantees
// for io contexts; confirm.
unsafe impl Send for IoCtx {}
unsafe impl Sync for IoCtx {}
340
341impl Drop for IoCtx {
342    fn drop(&mut self) {
343        if !self.ioctx.is_null() {
344            unsafe {
345                rados_ioctx_destroy(self.ioctx);
346            }
347        }
348    }
349}
350
/// Owns a rados_striper handle
///
/// Only compiled with the `rados_striper` feature. The handle is destroyed
/// with `rados_striper_destroy` when the value is dropped (skipped if null).
#[cfg(feature = "rados_striper")]
pub struct RadosStriper {
    // Raw striper handle (stored with the `rados_ioctx_t` pointer type).
    rados_striper: rados_ioctx_t,
}
356
/// Destroys the striper handle when the wrapper is dropped.
#[cfg(feature = "rados_striper")]
impl Drop for RadosStriper {
    fn drop(&mut self) {
        // Nothing to release for a null handle.
        if self.rados_striper.is_null() {
            return;
        }
        unsafe { rados_striper_destroy(self.rados_striper) };
    }
}
367
/// Owns a rados handle
///
/// Created by `connect_to_ceph`; the handle is shut down with
/// `rados_shutdown` when the value is dropped (skipped if null).
pub struct Rados {
    // Raw librados cluster handle.
    rados: rados_t,
    // Zero-sized marker tying this type to `IoCtx`; carries no data.
    phantom: PhantomData<IoCtx>,
}
373
// SAFETY(review): asserts that `&Rados` may be shared between threads even
// though it holds a raw pointer. The justification is not stated in this
// file — presumably it relies on librados' cluster handle being thread-safe;
// confirm.
unsafe impl Sync for Rados {}
375
376impl Drop for Rados {
377    fn drop(&mut self) {
378        if !self.rados.is_null() {
379            unsafe {
380                rados_shutdown(self.rados);
381            }
382        }
383    }
384}
385
386/// Connect to a Ceph cluster and return a connection handle rados_t
387pub fn connect_to_ceph(user_id: &str, config_file: &str) -> RadosResult<Rados> {
388    let connect_id = CString::new(user_id)?;
389    let conf_file = CString::new(config_file)?;
390    unsafe {
391        let mut cluster_handle: rados_t = ptr::null_mut();
392        let ret_code = rados_create(&mut cluster_handle, connect_id.as_ptr());
393        if ret_code < 0 {
394            return Err(ret_code.into());
395        }
396        let ret_code = rados_conf_read_file(cluster_handle, conf_file.as_ptr());
397        if ret_code < 0 {
398            return Err(ret_code.into());
399        }
400        let ret_code = rados_connect(cluster_handle);
401        if ret_code < 0 {
402            return Err(ret_code.into());
403        }
404        Ok(Rados {
405            rados: cluster_handle,
406            phantom: PhantomData,
407        })
408    }
409}
410
411impl Rados {
412    pub fn inner(&self) -> &rados_t {
413        &self.rados
414    }
415
416    /// Disconnect from a Ceph cluster and destroy the connection handle rados_t
417    /// For clean up, this is only necessary after connect_to_ceph() has
418    /// succeeded.
419    pub fn disconnect_from_ceph(&self) {
420        if self.rados.is_null() {
421            // No need to do anything
422            return;
423        }
424        unsafe {
425            rados_shutdown(self.rados);
426        }
427    }
428
429    fn conn_guard(&self) -> RadosResult<()> {
430        if self.rados.is_null() {
431            return Err(RadosError::new(
432                "Rados not connected.  Please initialize cluster".to_string(),
433            ));
434        }
435        Ok(())
436    }
437
438    /// Set the value of a configuration option
439    pub fn config_set(&self, name: &str, value: &str) -> RadosResult<()> {
440        if !self.rados.is_null() {
441            return Err(RadosError::new(
442                "Rados should not be connected when this function is called".into(),
443            ));
444        }
445        let name_str = CString::new(name)?;
446        let value_str = CString::new(value)?;
447        unsafe {
448            let ret_code = rados_conf_set(self.rados, name_str.as_ptr(), value_str.as_ptr());
449            if ret_code < 0 {
450                return Err(ret_code.into());
451            }
452        }
453        Ok(())
454    }
455
456    /// Get the value of a configuration option
457    pub fn config_get(&self, name: &str) -> RadosResult<String> {
458        let name_str = CString::new(name)?;
459        // 5K should be plenty for a config key right?
460        let mut buffer: Vec<u8> = Vec::with_capacity(5120);
461        unsafe {
462            let ret_code = rados_conf_get(
463                self.rados,
464                name_str.as_ptr(),
465                buffer.as_mut_ptr() as *mut c_char,
466                buffer.capacity(),
467            );
468            if ret_code < 0 {
469                return Err(ret_code.into());
470            }
471            // Ceph doesn't return how many bytes were written
472            buffer.set_len(5120);
473            // We need to search for the first NUL byte
474            let num_bytes = buffer.iter().position(|x| x == &0u8);
475            buffer.set_len(num_bytes.unwrap_or(0));
476            Ok(String::from_utf8_lossy(&buffer).into_owned())
477        }
478    }
479
480    /// Create an io context. The io context allows you to perform operations
481    /// within a particular pool.
482    /// For more details see rados_ioctx_t.
483    pub fn get_rados_ioctx(&self, pool_name: &str) -> RadosResult<IoCtx> {
484        self.conn_guard()?;
485        let pool_name_str = CString::new(pool_name)?;
486        unsafe {
487            let mut ioctx: rados_ioctx_t = ptr::null_mut();
488            let ret_code = rados_ioctx_create(self.rados, pool_name_str.as_ptr(), &mut ioctx);
489            if ret_code < 0 {
490                return Err(ret_code.into());
491            }
492            Ok(IoCtx { ioctx })
493        }
494    }
495
496    /// Create an io context. The io context allows you to perform operations
497    /// within a particular pool.
498    /// For more details see rados_ioctx_t.
499    pub fn get_rados_ioctx2(&self, pool_id: i64) -> RadosResult<IoCtx> {
500        self.conn_guard()?;
501        unsafe {
502            let mut ioctx: rados_ioctx_t = ptr::null_mut();
503            let ret_code = rados_ioctx_create2(self.rados, pool_id, &mut ioctx);
504            if ret_code < 0 {
505                return Err(ret_code.into());
506            }
507            Ok(IoCtx { ioctx })
508        }
509    }
510}
511
512impl IoCtx {
    /// Borrows the raw `rados_ioctx_t` handle wrapped by this value.
    pub fn inner(&self) -> &rados_ioctx_t {
        &self.ioctx
    }
516
517    /// This just tells librados that you no longer need to use the io context.
518    /// It may not be freed immediately if there are pending asynchronous
519    /// requests on it, but you
520    /// should not use an io context again after calling this function on it.
521    /// This does not guarantee any asynchronous writes have completed. You must
522    /// call rados_aio_flush()
523    /// on the io context before destroying it to do that.
524    pub fn destroy_rados_ioctx(&self) {
525        if self.ioctx.is_null() {
526            // No need to do anything
527            return;
528        }
529        unsafe {
530            rados_ioctx_destroy(self.ioctx);
531        }
532    }
533    fn ioctx_guard(&self) -> RadosResult<()> {
534        if self.ioctx.is_null() {
535            return Err(RadosError::new(
536                "Rados ioctx not created.  Please initialize first".to_string(),
537            ));
538        }
539        Ok(())
540    }
541    /// Note: Ceph uses kibibytes: https://en.wikipedia.org/wiki/Kibibyte
542    pub fn rados_stat_pool(&self) -> RadosResult<Struct_rados_pool_stat_t> {
543        self.ioctx_guard()?;
544        let mut pool_stat = Struct_rados_pool_stat_t::default();
545        unsafe {
546            let ret_code = rados_ioctx_pool_stat(self.ioctx, &mut pool_stat);
547            if ret_code < 0 {
548                return Err(ret_code.into());
549            }
550            Ok(pool_stat)
551        }
552    }
553
554    pub fn rados_pool_set_auid(&self, auid: u64) -> RadosResult<()> {
555        self.ioctx_guard()?;
556        unsafe {
557            let ret_code = rados_ioctx_pool_set_auid(self.ioctx, auid);
558            if ret_code < 0 {
559                return Err(ret_code.into());
560            }
561            Ok(())
562        }
563    }
564
565    pub fn rados_pool_get_auid(&self) -> RadosResult<u64> {
566        self.ioctx_guard()?;
567        let mut auid: u64 = 0;
568        unsafe {
569            let ret_code = rados_ioctx_pool_get_auid(self.ioctx, &mut auid);
570            if ret_code < 0 {
571                return Err(ret_code.into());
572            }
573            Ok(auid)
574        }
575    }
576
577    /// Test whether the specified pool requires alignment or not.
578    pub fn rados_pool_requires_alignment(&self) -> RadosResult<bool> {
579        self.ioctx_guard()?;
580        unsafe {
581            let ret_code = rados_ioctx_pool_requires_alignment(self.ioctx);
582            if ret_code < 0 {
583                return Err(ret_code.into());
584            }
585            if ret_code == 0 {
586                Ok(false)
587            } else {
588                Ok(true)
589            }
590        }
591    }
592
593    /// Get the alignment flavor of a pool
594    pub fn rados_pool_required_alignment(&self) -> RadosResult<u64> {
595        self.ioctx_guard()?;
596        unsafe {
597            let ret_code = rados_ioctx_pool_required_alignment(self.ioctx);
598            Ok(ret_code)
599        }
600    }
601
602    /// Get the pool id of the io context
603    pub fn rados_object_get_id(&self) -> RadosResult<i64> {
604        self.ioctx_guard()?;
605        unsafe {
606            let pool_id = rados_ioctx_get_id(self.ioctx);
607            Ok(pool_id)
608        }
609    }
610
611    /// Get the pool name of the io context
612    pub fn rados_get_pool_name(&self) -> RadosResult<String> {
613        self.ioctx_guard()?;
614        let mut buffer: Vec<u8> = Vec::with_capacity(500);
615
616        unsafe {
617            // length of string stored, or -ERANGE if buffer too small
618            let ret_code = rados_ioctx_get_pool_name(
619                self.ioctx,
620                buffer.as_mut_ptr() as *mut c_char,
621                buffer.capacity() as c_uint,
622            );
623            if ret_code == -ERANGE {
624                // Buffer was too small
625                buffer.reserve(1000);
626                buffer.set_len(1000);
627                let ret_code = rados_ioctx_get_pool_name(
628                    self.ioctx,
629                    buffer.as_mut_ptr() as *mut c_char,
630                    buffer.capacity() as c_uint,
631                );
632                if ret_code < 0 {
633                    return Err(ret_code.into());
634                }
635                Ok(String::from_utf8_lossy(&buffer).into_owned())
636            } else if ret_code < 0 {
637                Err(ret_code.into())
638            } else {
639                buffer.set_len(ret_code as usize);
640                Ok(String::from_utf8_lossy(&buffer).into_owned())
641            }
642        }
643    }
644
645    /// Set the key for mapping objects to pgs within an io context.
646    pub fn rados_locator_set_key(&self, key: &str) -> RadosResult<()> {
647        self.ioctx_guard()?;
648        let key_str = CString::new(key)?;
649        unsafe {
650            rados_ioctx_locator_set_key(self.ioctx, key_str.as_ptr());
651        }
652        Ok(())
653    }
654
655    /// Set the namespace for objects within an io context
656    /// The namespace specification further refines a pool into different
657    /// domains. The mapping of objects to pgs is also based on this value.
658    pub fn rados_set_namespace(&self, namespace: &str) -> RadosResult<()> {
659        self.ioctx_guard()?;
660        let namespace_str = CString::new(namespace)?;
661        unsafe {
662            rados_ioctx_set_namespace(self.ioctx, namespace_str.as_ptr());
663        }
664        Ok(())
665    }
666
667    /// Start listing objects in a pool
668    pub fn rados_list_pool_objects(&self) -> RadosResult<rados_list_ctx_t> {
669        self.ioctx_guard()?;
670        let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
671        unsafe {
672            let ret_code = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
673            if ret_code < 0 {
674                return Err(ret_code.into());
675            }
676        }
677        Ok(rados_list_ctx)
678    }
679
680    /// Create a pool-wide snapshot
681    pub fn rados_snap_create(&self, snap_name: &str) -> RadosResult<()> {
682        self.ioctx_guard()?;
683
684        let snap_name_str = CString::new(snap_name)?;
685        unsafe {
686            let ret_code = rados_ioctx_snap_create(self.ioctx, snap_name_str.as_ptr());
687            if ret_code < 0 {
688                return Err(ret_code.into());
689            }
690        }
691        Ok(())
692    }
693
694    /// Delete a pool snapshot
695    pub fn rados_snap_remove(&self, snap_name: &str) -> RadosResult<()> {
696        self.ioctx_guard()?;
697        let snap_name_str = CString::new(snap_name)?;
698
699        unsafe {
700            let ret_code = rados_ioctx_snap_remove(self.ioctx, snap_name_str.as_ptr());
701            if ret_code < 0 {
702                return Err(ret_code.into());
703            }
704        }
705        Ok(())
706    }
707
708    /// Rollback an object to a pool snapshot
709    /// The contents of the object will be the same as when the snapshot was
710    /// taken.
711    pub fn rados_snap_rollback(&self, object_name: &str, snap_name: &str) -> RadosResult<()> {
712        self.ioctx_guard()?;
713        let snap_name_str = CString::new(snap_name)?;
714        let object_name_str = CString::new(object_name)?;
715
716        unsafe {
717            let ret_code = rados_ioctx_snap_rollback(
718                self.ioctx,
719                object_name_str.as_ptr(),
720                snap_name_str.as_ptr(),
721            );
722            if ret_code < 0 {
723                return Err(ret_code.into());
724            }
725        }
726        Ok(())
727    }
728
729    /// Set the snapshot from which reads are performed.
730    /// Subsequent reads will return data as it was at the time of that
731    /// snapshot.
732    pub fn rados_snap_set_read(&self, snap_id: u64) -> RadosResult<()> {
733        self.ioctx_guard()?;
734
735        unsafe {
736            rados_ioctx_snap_set_read(self.ioctx, snap_id);
737        }
738        Ok(())
739    }
740
741    /// Allocate an ID for a self-managed snapshot
742    /// Get a unique ID to put in the snaphot context to create a snapshot.
743    /// A clone of an object is not created until a write with the new snapshot
744    /// context is completed.
745    pub fn rados_selfmanaged_snap_create(&self) -> RadosResult<u64> {
746        self.ioctx_guard()?;
747        let mut snap_id: u64 = 0;
748        unsafe {
749            let ret_code = rados_ioctx_selfmanaged_snap_create(self.ioctx, &mut snap_id);
750            if ret_code < 0 {
751                return Err(ret_code.into());
752            }
753        }
754        Ok(snap_id)
755    }
756
757    /// Remove a self-managed snapshot
758    /// This increases the snapshot sequence number, which will cause snapshots
759    /// to be removed lazily.
760    pub fn rados_selfmanaged_snap_remove(&self, snap_id: u64) -> RadosResult<()> {
761        self.ioctx_guard()?;
762
763        unsafe {
764            let ret_code = rados_ioctx_selfmanaged_snap_remove(self.ioctx, snap_id);
765            if ret_code < 0 {
766                return Err(ret_code.into());
767            }
768        }
769        Ok(())
770    }
771
772    /// Rollback an object to a self-managed snapshot
773    /// The contents of the object will be the same as when the snapshot was
774    /// taken.
775    pub fn rados_selfmanaged_snap_rollback(
776        &self,
777        object_name: &str,
778        snap_id: u64,
779    ) -> RadosResult<()> {
780        self.ioctx_guard()?;
781        let object_name_str = CString::new(object_name)?;
782
783        unsafe {
784            let ret_code = rados_ioctx_selfmanaged_snap_rollback(
785                self.ioctx,
786                object_name_str.as_ptr(),
787                snap_id,
788            );
789            if ret_code < 0 {
790                return Err(ret_code.into());
791            }
792        }
793        Ok(())
794    }
795
    // Not implemented yet; the commented-out sketch below is kept for reference.
    // Set the snapshot context for use when writing to objects
    // This is stored in the io context, and applies to all future writes.
798    // pub fn rados_selfmanaged_snap_set_write_ctx(ctx: rados_ioctx_t) ->
799    // RadosResult<()> {
800    // if ctx.is_null() {
801    // return Err(RadosError::new("Rados ioctx not created.  Please initialize
802    // first".to_string()));
803    // }
804    //
805    // unsafe {
806    // }
807    // }
    // Not implemented yet; the commented-out sketch below is kept for reference.
    // List all the ids of pool snapshots
809    // pub fn rados_snap_list(ctx: rados_ioctx_t, snaps: *mut rados_snap_t) ->
810    // RadosResult<()> {
811    // if ctx.is_null() {
812    // return Err(RadosError::new("Rados ioctx not created.  Please initialize
813    // first".to_string()));
814    // }
815    // let mut buffer: Vec<u64> = Vec::with_capacity(500);
816    //
817    //
818    // unsafe {
819    // let ret_code = rados_ioctx_snap_list(ctx, &mut buffer, buffer.capacity());
820    // if ret_code == -ERANGE {
821    // }
822    // if ret_code < 0 {
823    // return Err(ret_code.into());
824    // }
825    // }
826    // Ok(buffer)
827    // }
828    /// Get the id of a pool snapshot
829    pub fn rados_snap_lookup(&self, snap_name: &str) -> RadosResult<u64> {
830        self.ioctx_guard()?;
831        let snap_name_str = CString::new(snap_name)?;
832        let mut snap_id: u64 = 0;
833        unsafe {
834            let ret_code =
835                rados_ioctx_snap_lookup(self.ioctx, snap_name_str.as_ptr(), &mut snap_id);
836            if ret_code < 0 {
837                return Err(ret_code.into());
838            }
839        }
840        Ok(snap_id)
841    }
842
843    /// Get the name of a pool snapshot
844    pub fn rados_snap_get_name(&self, snap_id: u64) -> RadosResult<String> {
845        self.ioctx_guard()?;
846
847        let out_buffer: Vec<u8> = Vec::with_capacity(500);
848        let out_buff_size = out_buffer.capacity();
849        let out_str = CString::new(out_buffer)?;
850        unsafe {
851            let ret_code = rados_ioctx_snap_get_name(
852                self.ioctx,
853                snap_id,
854                out_str.as_ptr() as *mut c_char,
855                out_buff_size as c_int,
856            );
857            if ret_code == -ERANGE {}
858            if ret_code < 0 {
859                return Err(ret_code.into());
860            }
861        }
862        Ok(out_str.to_string_lossy().into_owned())
863    }
864
865    /// Find when a pool snapshot occurred
866    pub fn rados_snap_get_stamp(&self, snap_id: u64) -> RadosResult<time_t> {
867        self.ioctx_guard()?;
868
869        let mut time_id: time_t = 0;
870        unsafe {
871            let ret_code = rados_ioctx_snap_get_stamp(self.ioctx, snap_id, &mut time_id);
872            if ret_code < 0 {
873                return Err(ret_code.into());
874            }
875        }
876        Ok(time_id)
877    }
878
879    /// Return the version of the last object read or written to.
880    /// This exposes the internal version number of the last object read or
881    /// written via this io context
882    pub fn rados_get_object_last_version(&self) -> RadosResult<u64> {
883        self.ioctx_guard()?;
884        unsafe {
885            let obj_id = rados_get_last_version(self.ioctx);
886            Ok(obj_id)
887        }
888    }
889
890    /// Write len bytes from buf into the oid object, starting at offset off.
891    /// The value of len must be <= UINT_MAX/2.
892    pub fn rados_object_write(
893        &self,
894        object_name: &str,
895        buffer: &[u8],
896        offset: u64,
897    ) -> RadosResult<()> {
898        self.ioctx_guard()?;
899        let obj_name_str = CString::new(object_name)?;
900
901        unsafe {
902            let ret_code = rados_write(
903                self.ioctx,
904                obj_name_str.as_ptr(),
905                buffer.as_ptr() as *const c_char,
906                buffer.len(),
907                offset,
908            );
909            if ret_code < 0 {
910                return Err(ret_code.into());
911            }
912        }
913        Ok(())
914    }
915
916    /// The object is filled with the provided data. If the object exists, it is
917    /// atomically
918    /// truncated and then written.
919    pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
920        self.ioctx_guard()?;
921        let obj_name_str = CString::new(object_name)?;
922
923        unsafe {
924            let ret_code = rados_write_full(
925                self.ioctx,
926                obj_name_str.as_ptr(),
927                buffer.as_ptr() as *const ::libc::c_char,
928                buffer.len(),
929            );
930            if ret_code < 0 {
931                return Err(ret_code.into());
932            }
933        }
934        Ok(())
935    }
936
937    /// Efficiently copy a portion of one object to another
938    /// If the underlying filesystem on the OSD supports it, this will be a
939    /// copy-on-write clone.
940    /// The src and dest objects must be in the same pg. To ensure this, the io
941    /// context should
942    /// have a locator key set (see rados_ioctx_locator_set_key()).
943    pub fn rados_object_clone_range(
944        &self,
945        dst_object_name: &str,
946        dst_offset: u64,
947        src_object_name: &str,
948        src_offset: u64,
949        length: usize,
950    ) -> RadosResult<()> {
951        self.ioctx_guard()?;
952        let dst_name_str = CString::new(dst_object_name)?;
953        let src_name_str = CString::new(src_object_name)?;
954
955        unsafe {
956            let ret_code = rados_clone_range(
957                self.ioctx,
958                dst_name_str.as_ptr(),
959                dst_offset,
960                src_name_str.as_ptr(),
961                src_offset,
962                length,
963            );
964            if ret_code < 0 {
965                return Err(ret_code.into());
966            }
967        }
968        Ok(())
969    }
970
971    /// Append len bytes from buf into the oid object.
972    pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
973        self.ioctx_guard()?;
974        let obj_name_str = CString::new(object_name)?;
975
976        unsafe {
977            let ret_code = rados_append(
978                self.ioctx,
979                obj_name_str.as_ptr(),
980                buffer.as_ptr() as *const c_char,
981                buffer.len(),
982            );
983            if ret_code < 0 {
984                return Err(ret_code.into());
985            }
986        }
987        Ok(())
988    }
989
990    /// Read data from an object.  This fills the slice given and returns the
991    /// amount of bytes read
992    /// The io context determines the snapshot to read from, if any was set by
993    /// rados_ioctx_snap_set_read().
994    /// Default read size is 64K unless you call Vec::with_capacity(1024*128)
995    /// with a larger size.
996    pub fn rados_object_read(
997        &self,
998        object_name: &str,
999        fill_buffer: &mut Vec<u8>,
1000        read_offset: u64,
1001    ) -> RadosResult<i32> {
1002        self.ioctx_guard()?;
1003        let object_name_str = CString::new(object_name)?;
1004        let mut len = fill_buffer.capacity();
1005        if len == 0 {
1006            fill_buffer.reserve_exact(1024 * 64);
1007            len = fill_buffer.capacity();
1008        }
1009
1010        unsafe {
1011            let ret_code = rados_read(
1012                self.ioctx,
1013                object_name_str.as_ptr(),
1014                fill_buffer.as_mut_ptr() as *mut c_char,
1015                len,
1016                read_offset,
1017            );
1018            if ret_code < 0 {
1019                return Err(ret_code.into());
1020            }
1021            fill_buffer.set_len(ret_code as usize);
1022            Ok(ret_code)
1023        }
1024    }
1025
1026    /// Delete an object
1027    /// Note: This does not delete any snapshots of the object.
1028    pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
1029        self.ioctx_guard()?;
1030        let object_name_str = CString::new(object_name)?;
1031
1032        unsafe {
1033            let ret_code = rados_remove(self.ioctx, object_name_str.as_ptr() as *const c_char);
1034            if ret_code < 0 {
1035                return Err(ret_code.into());
1036            }
1037        }
1038        Ok(())
1039    }
1040
1041    /// Resize an object
1042    /// If this enlarges the object, the new area is logically filled with
1043    /// zeroes. If this shrinks the object, the excess data is removed.
1044    pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
1045        self.ioctx_guard()?;
1046        let object_name_str = CString::new(object_name)?;
1047
1048        unsafe {
1049            let ret_code = rados_trunc(self.ioctx, object_name_str.as_ptr(), new_size);
1050            if ret_code < 0 {
1051                return Err(ret_code.into());
1052            }
1053        }
1054        Ok(())
1055    }
1056
1057    /// Get the value of an extended attribute on an object.
1058    pub fn rados_object_getxattr(
1059        &self,
1060        object_name: &str,
1061        attr_name: &str,
1062        fill_buffer: &mut [u8],
1063    ) -> RadosResult<i32> {
1064        self.ioctx_guard()?;
1065        let object_name_str = CString::new(object_name)?;
1066        let attr_name_str = CString::new(attr_name)?;
1067
1068        unsafe {
1069            let ret_code = rados_getxattr(
1070                self.ioctx,
1071                object_name_str.as_ptr() as *const c_char,
1072                attr_name_str.as_ptr() as *const c_char,
1073                fill_buffer.as_mut_ptr() as *mut c_char,
1074                fill_buffer.len(),
1075            );
1076            if ret_code < 0 {
1077                return Err(ret_code.into());
1078            }
1079            Ok(ret_code)
1080        }
1081    }
1082
1083    /// Set an extended attribute on an object.
1084    pub fn rados_object_setxattr(
1085        &self,
1086        object_name: &str,
1087        attr_name: &str,
1088        attr_value: &mut [u8],
1089    ) -> RadosResult<()> {
1090        self.ioctx_guard()?;
1091        let object_name_str = CString::new(object_name)?;
1092        let attr_name_str = CString::new(attr_name)?;
1093
1094        unsafe {
1095            let ret_code = rados_setxattr(
1096                self.ioctx,
1097                object_name_str.as_ptr() as *const c_char,
1098                attr_name_str.as_ptr() as *const c_char,
1099                attr_value.as_mut_ptr() as *mut c_char,
1100                attr_value.len(),
1101            );
1102            if ret_code < 0 {
1103                return Err(ret_code.into());
1104            }
1105        }
1106        Ok(())
1107    }
1108
1109    /// Delete an extended attribute from an object.
1110    pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
1111        self.ioctx_guard()?;
1112        let object_name_str = CString::new(object_name)?;
1113        let attr_name_str = CString::new(attr_name)?;
1114
1115        unsafe {
1116            let ret_code = rados_rmxattr(
1117                self.ioctx,
1118                object_name_str.as_ptr() as *const c_char,
1119                attr_name_str.as_ptr() as *const c_char,
1120            );
1121            if ret_code < 0 {
1122                return Err(ret_code.into());
1123            }
1124        }
1125        Ok(())
1126    }
1127
1128    /// Get the rados_xattrs_iter_t reference to iterate over xattrs on an
1129    /// object Used in conjuction with XAttr::new() to iterate.
1130    pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
1131        self.ioctx_guard()?;
1132        let object_name_str = CString::new(object_name)?;
1133        let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
1134
1135        unsafe {
1136            let ret_code = rados_getxattrs(
1137                self.ioctx,
1138                object_name_str.as_ptr(),
1139                &mut xattr_iterator_handle,
1140            );
1141            if ret_code < 0 {
1142                return Err(ret_code.into());
1143            }
1144        }
1145        Ok(xattr_iterator_handle)
1146    }
1147
1148    /// Get object stats (size,SystemTime)
1149    pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
1150        self.ioctx_guard()?;
1151        let object_name_str = CString::new(object_name)?;
1152        let mut psize: u64 = 0;
1153        let mut time: ::libc::time_t = 0;
1154
1155        unsafe {
1156            let ret_code = rados_stat(self.ioctx, object_name_str.as_ptr(), &mut psize, &mut time);
1157            if ret_code < 0 {
1158                return Err(ret_code.into());
1159            }
1160        }
1161        Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
1162    }
1163
1164    /// Update tmap (trivial map)
1165    pub fn rados_object_tmap_update(
1166        &self,
1167        object_name: &str,
1168        update: TmapOperation,
1169    ) -> RadosResult<()> {
1170        self.ioctx_guard()?;
1171        let object_name_str = CString::new(object_name)?;
1172        let buffer = update.serialize()?;
1173        unsafe {
1174            let ret_code = rados_tmap_update(
1175                self.ioctx,
1176                object_name_str.as_ptr(),
1177                buffer.as_ptr() as *const c_char,
1178                buffer.len(),
1179            );
1180            if ret_code < 0 {
1181                return Err(ret_code.into());
1182            }
1183        }
1184        Ok(())
1185    }
1186
1187    /// Fetch complete tmap (trivial map) object
1188    pub fn rados_object_tmap_get(&self, object_name: &str) -> RadosResult<Vec<TmapOperation>> {
1189        self.ioctx_guard()?;
1190        let object_name_str = CString::new(object_name)?;
1191        let mut buffer: Vec<u8> = Vec::with_capacity(500);
1192
1193        unsafe {
1194            let ret_code = rados_tmap_get(
1195                self.ioctx,
1196                object_name_str.as_ptr(),
1197                buffer.as_mut_ptr() as *mut c_char,
1198                buffer.capacity(),
1199            );
1200            if ret_code == -ERANGE {
1201                buffer.reserve(1000);
1202                buffer.set_len(1000);
1203                let ret_code = rados_tmap_get(
1204                    self.ioctx,
1205                    object_name_str.as_ptr(),
1206                    buffer.as_mut_ptr() as *mut c_char,
1207                    buffer.capacity(),
1208                );
1209                if ret_code < 0 {
1210                    return Err(ret_code.into());
1211                }
1212            } else if ret_code < 0 {
1213                return Err(ret_code.into());
1214            }
1215        }
1216        match TmapOperation::deserialize(&buffer) {
1217            Ok((_, tmap)) => Ok(tmap),
1218            Err(nom::Err::Incomplete(needed)) => Err(RadosError::new(format!(
1219                "deserialize of ceph tmap failed.
1220            Input from Ceph was too small.  Needed: {:?} more bytes",
1221                needed
1222            ))),
1223            Err(nom::Err::Error((e, _))) => {
1224                Err(RadosError::new(String::from_utf8_lossy(e).to_string()))
1225            }
1226            Err(nom::Err::Failure((e, _))) => {
1227                Err(RadosError::new(String::from_utf8_lossy(e).to_string()))
1228            }
1229        }
1230    }
1231
1232    /// Execute an OSD class method on an object
1233    /// The OSD has a plugin mechanism for performing complicated operations on
1234    /// an object atomically.
1235    /// These plugins are called classes. This function allows librados users to
1236    /// call the custom
1237    /// methods. The input and output formats are defined by the class. Classes
1238    /// in ceph.git can
1239    /// be found in src/cls subdirectories
1240    pub fn rados_object_exec(
1241        &self,
1242        object_name: &str,
1243        class_name: &str,
1244        method_name: &str,
1245        input_buffer: &[u8],
1246        output_buffer: &mut [u8],
1247    ) -> RadosResult<()> {
1248        self.ioctx_guard()?;
1249        let object_name_str = CString::new(object_name)?;
1250        let class_name_str = CString::new(class_name)?;
1251        let method_name_str = CString::new(method_name)?;
1252
1253        unsafe {
1254            let ret_code = rados_exec(
1255                self.ioctx,
1256                object_name_str.as_ptr(),
1257                class_name_str.as_ptr(),
1258                method_name_str.as_ptr(),
1259                input_buffer.as_ptr() as *const c_char,
1260                input_buffer.len(),
1261                output_buffer.as_mut_ptr() as *mut c_char,
1262                output_buffer.len(),
1263            );
1264            if ret_code < 0 {
1265                return Err(ret_code.into());
1266            }
1267        }
1268        Ok(())
1269    }
1270
1271    /// Sychronously notify watchers of an object
1272    /// This blocks until all watchers of the object have received and reacted
1273    /// to the notify, or a timeout is reached.
1274    pub fn rados_object_notify(&self, object_name: &str, data: &[u8]) -> RadosResult<()> {
1275        self.ioctx_guard()?;
1276        let object_name_str = CString::new(object_name)?;
1277
1278        unsafe {
1279            let ret_code = rados_notify(
1280                self.ioctx,
1281                object_name_str.as_ptr(),
1282                0,
1283                data.as_ptr() as *const c_char,
1284                data.len() as i32,
1285            );
1286            if ret_code < 0 {
1287                return Err(ret_code.into());
1288            }
1289        }
1290        Ok(())
1291    }
1292    // pub fn rados_object_notify2(ctx: rados_ioctx_t, object_name: &str) ->
1293    // RadosResult<()> {
1294    // if ctx.is_null() {
1295    // return Err(RadosError::new("Rados ioctx not created.  Please initialize
1296    // first".to_string()));
1297    // }
1298    //
1299    // unsafe {
1300    // }
1301    // }
1302    //
1303    /// Acknolwedge receipt of a notify
1304    pub fn rados_object_notify_ack(
1305        &self,
1306        object_name: &str,
1307        notify_id: u64,
1308        cookie: u64,
1309        buffer: Option<&[u8]>,
1310    ) -> RadosResult<()> {
1311        self.ioctx_guard()?;
1312        let object_name_str = CString::new(object_name)?;
1313
1314        match buffer {
1315            Some(buf) => unsafe {
1316                let ret_code = rados_notify_ack(
1317                    self.ioctx,
1318                    object_name_str.as_ptr(),
1319                    notify_id,
1320                    cookie,
1321                    buf.as_ptr() as *const c_char,
1322                    buf.len() as i32,
1323                );
1324                if ret_code < 0 {
1325                    return Err(ret_code.into());
1326                }
1327            },
1328            None => unsafe {
1329                let ret_code = rados_notify_ack(
1330                    self.ioctx,
1331                    object_name_str.as_ptr(),
1332                    notify_id,
1333                    cookie,
1334                    ptr::null(),
1335                    0,
1336                );
1337                if ret_code < 0 {
1338                    return Err(ret_code.into());
1339                }
1340            },
1341        }
1342        Ok(())
1343    }
1344    /// Set allocation hint for an object
1345    /// This is an advisory operation, it will always succeed (as if it was
1346    /// submitted with a
1347    /// LIBRADOS_OP_FLAG_FAILOK flag set) and is not guaranteed to do anything
1348    /// on the backend.
1349    pub fn rados_object_set_alloc_hint(
1350        &self,
1351        object_name: &str,
1352        expected_object_size: u64,
1353        expected_write_size: u64,
1354    ) -> RadosResult<()> {
1355        self.ioctx_guard()?;
1356        let object_name_str = CString::new(object_name)?;
1357
1358        unsafe {
1359            let ret_code = rados_set_alloc_hint(
1360                self.ioctx,
1361                object_name_str.as_ptr(),
1362                expected_object_size,
1363                expected_write_size,
1364            );
1365            if ret_code < 0 {
1366                return Err(ret_code.into());
1367            }
1368        }
1369        Ok(())
1370    }
1371
1372    // Perform a compound read operation synchronously
1373    pub fn rados_perform_read_operations(&self, read_op: ReadOperation) -> RadosResult<()> {
1374        self.ioctx_guard()?;
1375        let object_name_str = CString::new(read_op.object_name.clone())?;
1376
1377        unsafe {
1378            let ret_code = rados_read_op_operate(
1379                read_op.read_op_handle,
1380                self.ioctx,
1381                object_name_str.as_ptr(),
1382                read_op.flags as i32,
1383            );
1384            if ret_code < 0 {
1385                return Err(ret_code.into());
1386            }
1387        }
1388        Ok(())
1389    }
1390
1391    // Perform a compound write operation synchronously
1392    pub fn rados_commit_write_operations(&self, write_op: &mut WriteOperation) -> RadosResult<()> {
1393        self.ioctx_guard()?;
1394        let object_name_str = CString::new(write_op.object_name.clone())?;
1395
1396        unsafe {
1397            let ret_code = rados_write_op_operate(
1398                write_op.write_op_handle,
1399                self.ioctx,
1400                object_name_str.as_ptr(),
1401                &mut write_op.mtime,
1402                write_op.flags as i32,
1403            );
1404            if ret_code < 0 {
1405                return Err(ret_code.into());
1406            }
1407        }
1408        Ok(())
1409    }
1410
1411    /// Take an exclusive lock on an object.
1412    pub fn rados_object_lock_exclusive(
1413        &self,
1414        object_name: &str,
1415        lock_name: &str,
1416        cookie_name: &str,
1417        description: &str,
1418        duration_time: &mut timeval,
1419        lock_flags: u8,
1420    ) -> RadosResult<()> {
1421        self.ioctx_guard()?;
1422        let object_name_str = CString::new(object_name)?;
1423        let lock_name_str = CString::new(lock_name)?;
1424        let cookie_name_str = CString::new(cookie_name)?;
1425        let description_str = CString::new(description)?;
1426
1427        unsafe {
1428            let ret_code = rados_lock_exclusive(
1429                self.ioctx,
1430                object_name_str.as_ptr(),
1431                lock_name_str.as_ptr(),
1432                cookie_name_str.as_ptr(),
1433                description_str.as_ptr(),
1434                duration_time,
1435                lock_flags,
1436            );
1437            if ret_code < 0 {
1438                return Err(ret_code.into());
1439            }
1440        }
1441        Ok(())
1442    }
1443
1444    /// Take a shared lock on an object.
1445    pub fn rados_object_lock_shared(
1446        &self,
1447        object_name: &str,
1448        lock_name: &str,
1449        cookie_name: &str,
1450        description: &str,
1451        tag_name: &str,
1452        duration_time: &mut timeval,
1453        lock_flags: u8,
1454    ) -> RadosResult<()> {
1455        self.ioctx_guard()?;
1456        let object_name_str = CString::new(object_name)?;
1457        let lock_name_str = CString::new(lock_name)?;
1458        let cookie_name_str = CString::new(cookie_name)?;
1459        let description_str = CString::new(description)?;
1460        let tag_name_str = CString::new(tag_name)?;
1461
1462        unsafe {
1463            let ret_code = rados_lock_shared(
1464                self.ioctx,
1465                object_name_str.as_ptr(),
1466                lock_name_str.as_ptr(),
1467                cookie_name_str.as_ptr(),
1468                tag_name_str.as_ptr(),
1469                description_str.as_ptr(),
1470                duration_time,
1471                lock_flags,
1472            );
1473            if ret_code < 0 {
1474                return Err(ret_code.into());
1475            }
1476        }
1477        Ok(())
1478    }
1479
1480    /// Release a shared or exclusive lock on an object.
1481    pub fn rados_object_unlock(
1482        &self,
1483        object_name: &str,
1484        lock_name: &str,
1485        cookie_name: &str,
1486    ) -> RadosResult<()> {
1487        self.ioctx_guard()?;
1488        let object_name_str = CString::new(object_name)?;
1489        let lock_name_str = CString::new(lock_name)?;
1490        let cookie_name_str = CString::new(cookie_name)?;
1491
1492        unsafe {
1493            let ret_code = rados_unlock(
1494                self.ioctx,
1495                object_name_str.as_ptr(),
1496                lock_name_str.as_ptr(),
1497                cookie_name_str.as_ptr(),
1498            );
1499            if ret_code < 0 {
1500                return Err(ret_code.into());
1501            }
1502        }
1503        Ok(())
1504    }
1505
    // Not yet implemented: rados_object_list_lockers. The description and the
    // sketch below are kept as plain comments so that they are not attached to
    // the documentation of the next method.
    //
    // List clients that have locked the named object lock and information
    // about the lock.
    // The number of bytes required in each buffer is put in the corresponding
    // size out parameter.
    // If any of the provided buffers are too short, -ERANGE is returned after
    // these sizes are filled in.
1512    // pub fn rados_object_list_lockers(ctx: rados_ioctx_t, object_name: &str,
1513    // lock_name: &str, exclusive: u8, ) ->
1514    // RadosResult<isize> {
1515    // if ctx.is_null() {
1516    // return Err(RadosError::new("Rados ioctx not created.  Please initialize
1517    // first".to_string()));
1518    // }
1519    // let object_name_str = try!(CString::new(object_name));
1520    //
1521    // unsafe {
1522    // let ret_code = rados_list_lockers(ctx,
1523    // o: *const ::libc::c_char,
1524    // name: *const ::libc::c_char,
1525    // exclusive: *mut ::libc::c_int,
1526    // tag: *mut ::libc::c_char,
1527    // tag_len: *mut size_t,
1528    // clients: *mut ::libc::c_char,
1529    // clients_len: *mut size_t,
1530    // cookies: *mut ::libc::c_char,
1531    // cookies_len: *mut size_t,
1532    // addrs: *mut ::libc::c_char,
1533    // addrs_len: *mut size_t);
1534    // }
1535    // }
1536    /// Releases a shared or exclusive lock on an object, which was taken by the
1537    /// specified client.
1538    pub fn rados_object_break_lock(
1539        &self,
1540        object_name: &str,
1541        lock_name: &str,
1542        client_name: &str,
1543        cookie_name: &str,
1544    ) -> RadosResult<()> {
1545        self.ioctx_guard()?;
1546        let object_name_str = CString::new(object_name)?;
1547        let lock_name_str = CString::new(lock_name)?;
1548        let cookie_name_str = CString::new(cookie_name)?;
1549        let client_name_str = CString::new(client_name)?;
1550
1551        unsafe {
1552            let ret_code = rados_break_lock(
1553                self.ioctx,
1554                object_name_str.as_ptr(),
1555                lock_name_str.as_ptr(),
1556                client_name_str.as_ptr(),
1557                cookie_name_str.as_ptr(),
1558            );
1559            if ret_code < 0 {
1560                return Err(ret_code.into());
1561            }
1562        }
1563        Ok(())
1564    }
1565
1566    /// Create a rados striper.
1567    /// For more details see rados_striper_t.
1568    #[cfg(feature = "rados_striper")]
1569    pub fn get_rados_striper(self) -> RadosResult<RadosStriper> {
1570        self.ioctx_guard()?;
1571        unsafe {
1572            let mut rados_striper: rados_striper_t = ptr::null_mut();
1573            let ret_code = rados_striper_create(self.ioctx, &mut rados_striper);
1574            if ret_code < 0 {
1575                return Err(ret_code.into());
1576            }
1577            Ok(RadosStriper { rados_striper })
1578        }
1579    }
1580}
1581
1582impl Rados {
1583    pub fn rados_blacklist_client(&self, client: IpAddr, expire_seconds: u32) -> RadosResult<()> {
1584        self.conn_guard()?;
1585        let client_address = CString::new(client.to_string())?;
1586        unsafe {
1587            let ret_code = rados_blacklist_add(
1588                self.rados,
1589                client_address.as_ptr() as *mut c_char,
1590                expire_seconds,
1591            );
1592
1593            if ret_code < 0 {
1594                return Err(ret_code.into());
1595            }
1596        }
1597        Ok(())
1598    }
1599
1600    /// Returns back a collection of Rados Pools
1601    ///
1602    /// pool_buffer should be allocated with:
1603    /// ```
1604    /// let capacity = 10;
1605    /// let pool_buffer: Vec<u8> = Vec::with_capacity(capacity);
1606    /// ```
1607    /// buf_size should be the value used with_capacity
1608    ///
1609    /// Returns Ok(Vec<String>) - A list of Strings of the pool names.
1610    #[allow(unused_variables)]
1611    pub fn rados_pools(&self) -> RadosResult<Vec<String>> {
1612        self.conn_guard()?;
1613        let mut pools: Vec<String> = Vec::new();
1614        let pool_slice: &[u8];
1615        let mut pool_buffer: Vec<u8> = Vec::with_capacity(500);
1616
1617        unsafe {
1618            let len = rados_pool_list(
1619                self.rados,
1620                pool_buffer.as_mut_ptr() as *mut c_char,
1621                pool_buffer.capacity(),
1622            );
1623            if len > pool_buffer.capacity() as i32 {
1624                // rados_pool_list requires more buffer than we gave it
1625                pool_buffer.reserve(len as usize);
1626                let len = rados_pool_list(
1627                    self.rados,
1628                    pool_buffer.as_mut_ptr() as *mut c_char,
1629                    pool_buffer.capacity(),
1630                );
1631                // Tell the Vec how much Ceph read into the buffer
1632                pool_buffer.set_len(len as usize);
1633            } else {
1634                // Tell the Vec how much Ceph read into the buffer
1635                pool_buffer.set_len(len as usize);
1636            }
1637        }
1638        let mut cursor = Cursor::new(&pool_buffer);
1639        loop {
1640            let mut string_buf: Vec<u8> = Vec::new();
1641            let read = cursor.read_until(0x00, &mut string_buf)?;
1642            // 0 End of the pool_buffer;
1643            // 1 Read a double \0.  Time to break
1644            if read == 0 || read == 1 {
1645                break;
1646            } else {
1647                // Read a String
1648                pools.push(String::from_utf8_lossy(&string_buf[..read - 1]).into_owned());
1649            }
1650        }
1651
1652        Ok(pools)
1653    }
1654
1655    /// Create a pool with default settings
1656    /// The default owner is the admin user (auid 0). The default crush rule is
1657    /// rule 0.
1658    pub fn rados_create_pool(&self, pool_name: &str) -> RadosResult<()> {
1659        self.conn_guard()?;
1660        let pool_name_str = CString::new(pool_name)?;
1661        unsafe {
1662            let ret_code = rados_pool_create(self.rados, pool_name_str.as_ptr());
1663            if ret_code < 0 {
1664                return Err(ret_code.into());
1665            }
1666        }
1667        Ok(())
1668    }
1669    /// Delete a pool and all data inside it
1670    /// The pool is removed from the cluster immediately, but the actual data is
1671    /// deleted in
1672    /// the background.
1673    pub fn rados_delete_pool(&self, pool_name: &str) -> RadosResult<()> {
1674        self.conn_guard()?;
1675        let pool_name_str = CString::new(pool_name)?;
1676        unsafe {
1677            let ret_code = rados_pool_delete(self.rados, pool_name_str.as_ptr());
1678            if ret_code < 0 {
1679                return Err(ret_code.into());
1680            }
1681        }
1682        Ok(())
1683    }
1684
1685    /// Lookup a Ceph pool id.  If the pool doesn't exist it will return
1686    /// Ok(None).
1687    pub fn rados_lookup_pool(&self, pool_name: &str) -> RadosResult<Option<i64>> {
1688        self.conn_guard()?;
1689        let pool_name_str = CString::new(pool_name)?;
1690        unsafe {
1691            let ret_code: i64 = rados_pool_lookup(self.rados, pool_name_str.as_ptr());
1692            if ret_code >= 0 {
1693                Ok(Some(ret_code))
1694            } else if ret_code as i32 == -ENOENT {
1695                Ok(None)
1696            } else {
1697                Err((ret_code as i32).into())
1698            }
1699        }
1700    }
1701
1702    pub fn rados_reverse_lookup_pool(&self, pool_id: i64) -> RadosResult<String> {
1703        self.conn_guard()?;
1704        let mut buffer: Vec<u8> = Vec::with_capacity(500);
1705
1706        unsafe {
1707            let ret_code = rados_pool_reverse_lookup(
1708                self.rados,
1709                pool_id,
1710                buffer.as_mut_ptr() as *mut c_char,
1711                buffer.capacity(),
1712            );
1713            if ret_code == -ERANGE {
1714                // Buffer was too small
1715                buffer.reserve(1000);
1716                buffer.set_len(1000);
1717                let ret_code = rados_pool_reverse_lookup(
1718                    self.rados,
1719                    pool_id,
1720                    buffer.as_mut_ptr() as *mut c_char,
1721                    buffer.capacity(),
1722                );
1723                if ret_code < 0 {
1724                    return Err(ret_code.into());
1725                }
1726                Ok(String::from_utf8_lossy(&buffer).into_owned())
1727            } else if ret_code < 0 {
1728                Err(ret_code.into())
1729            } else {
1730                Ok(String::from_utf8_lossy(&buffer).into_owned())
1731            }
1732        }
1733    }
1734}
1735
1736/// Get the version of librados.
1737pub fn rados_libversion() -> RadosVersion {
1738    let mut major: c_int = 0;
1739    let mut minor: c_int = 0;
1740    let mut extra: c_int = 0;
1741    unsafe {
1742        rados_version(&mut major, &mut minor, &mut extra);
1743    }
1744    RadosVersion {
1745        major,
1746        minor,
1747        extra,
1748    }
1749}
1750
1751impl Rados {
1752    /// Read usage info about the cluster
1753    /// This tells you total space, space used, space available, and number of
1754    /// objects.
1755    /// These are not updated immediately when data is written, they are
1756    /// eventually consistent.
1757    /// Note: Ceph uses kibibytes: https://en.wikipedia.org/wiki/Kibibyte
1758    pub fn rados_stat_cluster(&self) -> RadosResult<Struct_rados_cluster_stat_t> {
1759        self.conn_guard()?;
1760        let mut cluster_stat = Struct_rados_cluster_stat_t::default();
1761        unsafe {
1762            let ret_code = rados_cluster_stat(self.rados, &mut cluster_stat);
1763            if ret_code < 0 {
1764                return Err(ret_code.into());
1765            }
1766        }
1767
1768        Ok(cluster_stat)
1769    }
1770
1771    pub fn rados_fsid(&self) -> RadosResult<Uuid> {
1772        self.conn_guard()?;
1773        let mut fsid_buffer: Vec<u8> = Vec::with_capacity(37);
1774        unsafe {
1775            let ret_code = rados_cluster_fsid(
1776                self.rados,
1777                fsid_buffer.as_mut_ptr() as *mut c_char,
1778                fsid_buffer.capacity(),
1779            );
1780            if ret_code < 0 {
1781                return Err(ret_code.into());
1782            }
1783            // Tell the Vec how much Ceph read into the buffer
1784            fsid_buffer.set_len(ret_code as usize);
1785        }
1786        // Ceph actually returns the fsid as a uuid string
1787        let fsid_str = String::from_utf8(fsid_buffer)?;
1788        // Parse into a UUID and return
1789        Ok(fsid_str.parse()?)
1790    }
1791
1792    /// Ping a monitor to assess liveness
1793    /// May be used as a simply way to assess liveness, or to obtain
1794    /// information about the monitor in a simple way even in the
1795    /// absence of quorum.
1796    pub fn ping_monitor(&self, mon_id: &str) -> RadosResult<String> {
1797        self.conn_guard()?;
1798
1799        let mon_id_str = CString::new(mon_id)?;
1800        let mut out_str: *mut c_char = ptr::null_mut();
1801        let mut str_length: usize = 0;
1802        unsafe {
1803            let ret_code = rados_ping_monitor(
1804                self.rados,
1805                mon_id_str.as_ptr(),
1806                &mut out_str,
1807                &mut str_length,
1808            );
1809            if ret_code < 0 {
1810                return Err(ret_code.into());
1811            }
1812            if !out_str.is_null() {
1813                // valid string
1814                let s_bytes = std::slice::from_raw_parts(out_str, str_length);
1815                // Convert from i8 -> u8
1816                let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
1817                // Tell rados we're done with this buffer
1818                rados_buffer_free(out_str);
1819                Ok(String::from_utf8_lossy(&bytes).into_owned())
1820            } else {
1821                Ok("".into())
1822            }
1823        }
1824    }
1825}
1826
1827/// Ceph version - Ceph during the make release process generates the version
1828/// number along with
1829/// the github hash of the release and embeds the hard coded value into
1830/// `ceph.py` which is the
1831/// the default ceph utility.
1832pub fn ceph_version(socket: &str) -> Option<String> {
1833    let cmd = "version";
1834
1835    admin_socket_command(&cmd, socket).ok().and_then(|json| {
1836        json_data(&json)
1837            .and_then(|jsondata| json_find(jsondata, &[cmd]).map(|data| json_as_string(&data)))
1838    })
1839}
1840
1841/// This version call parses the `ceph -s` output. It does not need `sudo`
1842/// rights like
1843/// `ceph_version` does since it pulls from the admin socket.
1844pub fn ceph_version_parse() -> Option<String> {
1845    match run_cli("ceph --version") {
1846        Ok(output) => {
1847            let n = output.status.code().unwrap();
1848            if n == 0 {
1849                Some(String::from_utf8_lossy(&output.stdout).to_string())
1850            } else {
1851                Some(String::from_utf8_lossy(&output.stderr).to_string())
1852            }
1853        }
1854        Err(_) => None,
1855    }
1856}
1857
1858impl Rados {
1859    /// Only single String value
1860    pub fn ceph_status(&self, keys: &[&str]) -> RadosResult<String> {
1861        self.conn_guard()?;
1862        match self.ceph_mon_command("prefix", "status", Some("json")) {
1863            Ok((json, _)) => match json {
1864                Some(json) => match json_data(&json) {
1865                    Some(jsondata) => {
1866                        if let Some(data) = json_find(jsondata, keys) {
1867                            Ok(json_as_string(&data))
1868                        } else {
1869                            Err(RadosError::new(
1870                                "The attributes were not found in the output.".to_string(),
1871                            ))
1872                        }
1873                    }
1874                    _ => Err(RadosError::new("JSON data not found.".to_string())),
1875                },
1876                _ => Err(RadosError::new("JSON data not found.".to_string())),
1877            },
1878            Err(e) => Err(e),
1879        }
1880    }
1881
1882    /// string with the `health HEALTH_OK` or `HEALTH_WARN` or `HEALTH_ERR`
1883    /// which is also not efficient.
1884    pub fn ceph_health_string(&self) -> RadosResult<String> {
1885        self.conn_guard()?;
1886        match self.ceph_mon_command("prefix", "health", None) {
1887            Ok((data, _)) => Ok(data.unwrap().replace("\n", "")),
1888            Err(e) => Err(e),
1889        }
1890    }
1891
1892    /// Returns an enum value of:
1893    /// CephHealth::Ok
1894    /// CephHealth::Warning
1895    /// CephHealth::Error
1896    pub fn ceph_health(&self) -> CephHealth {
1897        match self.ceph_health_string() {
1898            Ok(health) => {
1899                if health.contains("HEALTH_OK") {
1900                    CephHealth::Ok
1901                } else if health.contains("HEALTH_WARN") {
1902                    CephHealth::Warning
1903                } else {
1904                    CephHealth::Error
1905                }
1906            }
1907            Err(_) => CephHealth::Error,
1908        }
1909    }
1910
1911    /// Higher level `ceph_command`
1912    pub fn ceph_command(
1913        &self,
1914        name: &str,
1915        value: &str,
1916        cmd_type: CephCommandTypes,
1917        keys: &[&str],
1918    ) -> RadosResult<JsonData> {
1919        self.conn_guard()?;
1920        match cmd_type {
1921            CephCommandTypes::Osd => Err(RadosError::new("OSD CMDs Not implemented.".to_string())),
1922            CephCommandTypes::Pgs => Err(RadosError::new("PGS CMDS Not implemented.".to_string())),
1923            _ => match self.ceph_mon_command(name, value, Some("json")) {
1924                Ok((json, _)) => match json {
1925                    Some(json) => match json_data(&json) {
1926                        Some(jsondata) => {
1927                            if let Some(data) = json_find(jsondata, keys) {
1928                                Ok(data)
1929                            } else {
1930                                Err(RadosError::new(
1931                                    "The attributes were not found in the output.".to_string(),
1932                                ))
1933                            }
1934                        }
1935                        _ => Err(RadosError::new("JSON data not found.".to_string())),
1936                    },
1937                    _ => Err(RadosError::new("JSON data not found.".to_string())),
1938                },
1939                Err(e) => Err(e),
1940            },
1941        }
1942    }
1943
1944    /// Returns the list of available commands
1945    pub fn ceph_commands(&self, keys: Option<&[&str]>) -> RadosResult<JsonData> {
1946        self.conn_guard()?;
1947        match self.ceph_mon_command("prefix", "get_command_descriptions", Some("json")) {
1948            Ok((json, _)) => match json {
1949                Some(json) => match json_data(&json) {
1950                    Some(jsondata) => {
1951                        if let Some(k) = keys {
1952                            if let Some(data) = json_find(jsondata, k) {
1953                                Ok(data)
1954                            } else {
1955                                Err(RadosError::new(
1956                                    "The attributes were not found in the output.".to_string(),
1957                                ))
1958                            }
1959                        } else {
1960                            Ok(jsondata)
1961                        }
1962                    }
1963                    _ => Err(RadosError::new("JSON data not found.".to_string())),
1964                },
1965                _ => Err(RadosError::new("JSON data not found.".to_string())),
1966            },
1967            Err(e) => Err(e),
1968        }
1969    }
1970
1971    /// Mon command that does not pass in a data payload.
1972    pub fn ceph_mon_command(
1973        &self,
1974        name: &str,
1975        value: &str,
1976        format: Option<&str>,
1977    ) -> RadosResult<(Option<String>, Option<String>)> {
1978        let data: Vec<*mut c_char> = Vec::with_capacity(1);
1979        self.ceph_mon_command_with_data(name, value, format, data)
1980    }
1981
1982    pub fn ceph_mon_command_without_data(
1983        &self,
1984        cmd: &serde_json::Value,
1985    ) -> RadosResult<(Vec<u8>, Option<String>)> {
1986        self.conn_guard()?;
1987        let cmd_string = cmd.to_string();
1988        debug!("ceph_mon_command_without_data: {}", cmd_string);
1989        let data: Vec<*mut c_char> = Vec::with_capacity(1);
1990        let cmds = CString::new(cmd_string).unwrap();
1991
1992        let mut outbuf_len = 0;
1993        let mut outs = ptr::null_mut();
1994        let mut outs_len = 0;
1995
1996        // Ceph librados allocates these buffers internally and the pointer that comes
1997        // back must be
1998        // freed by call `rados_buffer_free`
1999        let mut outbuf = ptr::null_mut();
2000        let mut out: Vec<u8> = vec![];
2001        let mut status_string: Option<String> = None;
2002
2003        debug!("Calling rados_mon_command with {:?}", cmd);
2004
2005        unsafe {
2006            // cmd length is 1 because we only allow one command at a time.
2007            let ret_code = rados_mon_command(
2008                self.rados,
2009                &mut cmds.as_ptr(),
2010                1,
2011                data.as_ptr() as *mut c_char,
2012                data.len() as usize,
2013                &mut outbuf,
2014                &mut outbuf_len,
2015                &mut outs,
2016                &mut outs_len,
2017            );
2018            debug!("return code: {}", ret_code);
2019            if ret_code < 0 {
2020                if outs_len > 0 && !outs.is_null() {
2021                    let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
2022                    rados_buffer_free(outs);
2023                    return Err(RadosError::new(String::from_utf8_lossy(slice).into_owned()));
2024                }
2025                return Err(ret_code.into());
2026            }
2027
2028            // Copy the data from outbuf and then call rados_buffer_free instead libc::free
2029            if outbuf_len > 0 && !outbuf.is_null() {
2030                let slice = ::std::slice::from_raw_parts(outbuf as *const u8, outbuf_len as usize);
2031                out = slice.to_vec();
2032
2033                rados_buffer_free(outbuf);
2034            }
2035            if outs_len > 0 && !outs.is_null() {
2036                let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
2037                status_string = Some(String::from_utf8(slice.to_vec())?);
2038                rados_buffer_free(outs);
2039            }
2040        }
2041
2042        Ok((out, status_string))
2043    }
2044
2045    /// Mon command that does pass in a data payload.
2046    /// Most all of the commands pass through this function.
2047    pub fn ceph_mon_command_with_data(
2048        &self,
2049        name: &str,
2050        value: &str,
2051        format: Option<&str>,
2052        data: Vec<*mut c_char>,
2053    ) -> RadosResult<(Option<String>, Option<String>)> {
2054        self.conn_guard()?;
2055
2056        let mut cmd_strings: Vec<String> = Vec::new();
2057        match format {
2058            Some(fmt) => cmd_strings.push(format!(
2059                "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2060                name, value, fmt
2061            )),
2062            None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2063        }
2064
2065        let cstrings: Vec<CString> = cmd_strings[..]
2066            .iter()
2067            .map(|s| CString::new(s.clone()).unwrap())
2068            .collect();
2069        let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2070
2071        let mut outbuf = ptr::null_mut();
2072        let mut outs = ptr::null_mut();
2073        let mut outbuf_len = 0;
2074        let mut outs_len = 0;
2075
2076        // Ceph librados allocates these buffers internally and the pointer that comes
2077        // back must be
2078        // freed by call `rados_buffer_free`
2079        let mut str_outbuf: Option<String> = None;
2080        let mut str_outs: Option<String> = None;
2081
2082        debug!("Calling rados_mon_command with {:?}", cstrings);
2083
2084        unsafe {
2085            // cmd length is 1 because we only allow one command at a time.
2086            let ret_code = rados_mon_command(
2087                self.rados,
2088                cmds.as_mut_ptr(),
2089                1,
2090                data.as_ptr() as *mut c_char,
2091                data.len() as usize,
2092                &mut outbuf,
2093                &mut outbuf_len,
2094                &mut outs,
2095                &mut outs_len,
2096            );
2097            if ret_code < 0 {
2098                return Err(ret_code.into());
2099            }
2100
2101            // Copy the data from outbuf and then  call rados_buffer_free instead libc::free
2102            if outbuf_len > 0 {
2103                let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2104                let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2105                let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2106                str_outbuf = Some(str_slice_outbuf.to_owned());
2107
2108                rados_buffer_free(outbuf);
2109            }
2110
2111            if outs_len > 0 {
2112                let c_str_outs: &CStr = CStr::from_ptr(outs);
2113                let buf_outs: &[u8] = c_str_outs.to_bytes();
2114                let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2115                str_outs = Some(str_slice_outs.to_owned());
2116
2117                rados_buffer_free(outs);
2118            }
2119        }
2120
2121        Ok((str_outbuf, str_outs))
2122    }
2123
2124    /// OSD command that does not pass in a data payload.
2125    pub fn ceph_osd_command(
2126        &self,
2127        id: i32,
2128        name: &str,
2129        value: &str,
2130        format: Option<&str>,
2131    ) -> RadosResult<(Option<String>, Option<String>)> {
2132        let data: Vec<*mut c_char> = Vec::with_capacity(1);
2133        self.ceph_osd_command_with_data(id, name, value, format, data)
2134    }
2135
2136    /// OSD command that does pass in a data payload.
2137    pub fn ceph_osd_command_with_data(
2138        &self,
2139        id: i32,
2140        name: &str,
2141        value: &str,
2142        format: Option<&str>,
2143        data: Vec<*mut c_char>,
2144    ) -> RadosResult<(Option<String>, Option<String>)> {
2145        self.conn_guard()?;
2146
2147        let mut cmd_strings: Vec<String> = Vec::new();
2148        match format {
2149            Some(fmt) => cmd_strings.push(format!(
2150                "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2151                name, value, fmt
2152            )),
2153            None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2154        }
2155
2156        let cstrings: Vec<CString> = cmd_strings[..]
2157            .iter()
2158            .map(|s| CString::new(s.clone()).unwrap())
2159            .collect();
2160        let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2161
2162        let mut outbuf = ptr::null_mut();
2163        let mut outs = ptr::null_mut();
2164        let mut outbuf_len = 0;
2165        let mut outs_len = 0;
2166
2167        // Ceph librados allocates these buffers internally and the pointer that comes
2168        // back must be
2169        // freed by call `rados_buffer_free`
2170        let mut str_outbuf: Option<String> = None;
2171        let mut str_outs: Option<String> = None;
2172
2173        unsafe {
2174            // cmd length is 1 because we only allow one command at a time.
2175            let ret_code = rados_osd_command(
2176                self.rados,
2177                id,
2178                cmds.as_mut_ptr(),
2179                1,
2180                data.as_ptr() as *mut c_char,
2181                data.len() as usize,
2182                &mut outbuf,
2183                &mut outbuf_len,
2184                &mut outs,
2185                &mut outs_len,
2186            );
2187            if ret_code < 0 {
2188                return Err(ret_code.into());
2189            }
2190
2191            // Copy the data from outbuf and then  call rados_buffer_free instead libc::free
2192            if outbuf_len > 0 {
2193                let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2194                let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2195                let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2196                str_outbuf = Some(str_slice_outbuf.to_owned());
2197
2198                rados_buffer_free(outbuf);
2199            }
2200
2201            if outs_len > 0 {
2202                let c_str_outs: &CStr = CStr::from_ptr(outs);
2203                let buf_outs: &[u8] = c_str_outs.to_bytes();
2204                let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2205                str_outs = Some(str_slice_outs.to_owned());
2206
2207                rados_buffer_free(outs);
2208            }
2209        }
2210
2211        Ok((str_outbuf, str_outs))
2212    }
2213
2214    /// PG command that does not pass in a data payload.
2215    pub fn ceph_pgs_command(
2216        &self,
2217        pg: &str,
2218        name: &str,
2219        value: &str,
2220        format: Option<&str>,
2221    ) -> RadosResult<(Option<String>, Option<String>)> {
2222        let data: Vec<*mut c_char> = Vec::with_capacity(1);
2223        self.ceph_pgs_command_with_data(pg, name, value, format, data)
2224    }
2225
2226    /// PG command that does pass in a data payload.
2227    pub fn ceph_pgs_command_with_data(
2228        &self,
2229        pg: &str,
2230        name: &str,
2231        value: &str,
2232        format: Option<&str>,
2233        data: Vec<*mut c_char>,
2234    ) -> RadosResult<(Option<String>, Option<String>)> {
2235        self.conn_guard()?;
2236
2237        let mut cmd_strings: Vec<String> = Vec::new();
2238        match format {
2239            Some(fmt) => cmd_strings.push(format!(
2240                "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2241                name, value, fmt
2242            )),
2243            None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2244        }
2245
2246        let pg_str = CString::new(pg).unwrap();
2247        let cstrings: Vec<CString> = cmd_strings[..]
2248            .iter()
2249            .map(|s| CString::new(s.clone()).unwrap())
2250            .collect();
2251        let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2252
2253        let mut outbuf = ptr::null_mut();
2254        let mut outs = ptr::null_mut();
2255        let mut outbuf_len = 0;
2256        let mut outs_len = 0;
2257
2258        // Ceph librados allocates these buffers internally and the pointer that comes
2259        // back must be
2260        // freed by call `rados_buffer_free`
2261        let mut str_outbuf: Option<String> = None;
2262        let mut str_outs: Option<String> = None;
2263
2264        unsafe {
2265            // cmd length is 1 because we only allow one command at a time.
2266            let ret_code = rados_pg_command(
2267                self.rados,
2268                pg_str.as_ptr(),
2269                cmds.as_mut_ptr(),
2270                1,
2271                data.as_ptr() as *mut c_char,
2272                data.len() as usize,
2273                &mut outbuf,
2274                &mut outbuf_len,
2275                &mut outs,
2276                &mut outs_len,
2277            );
2278            if ret_code < 0 {
2279                return Err(ret_code.into());
2280            }
2281
2282            // Copy the data from outbuf and then  call rados_buffer_free instead libc::free
2283            if outbuf_len > 0 {
2284                let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2285                let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2286                let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2287                str_outbuf = Some(str_slice_outbuf.to_owned());
2288
2289                rados_buffer_free(outbuf);
2290            }
2291
2292            if outs_len > 0 {
2293                let c_str_outs: &CStr = CStr::from_ptr(outs);
2294                let buf_outs: &[u8] = c_str_outs.to_bytes();
2295                let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2296                str_outs = Some(str_slice_outs.to_owned());
2297
2298                rados_buffer_free(outs);
2299            }
2300        }
2301
2302        Ok((str_outbuf, str_outs))
2303    }
2304}
2305
2306#[cfg(feature = "rados_striper")]
2307impl RadosStriper {
2308    pub fn inner(&self) -> &rados_striper_t {
2309        &self.rados_striper
2310    }
2311
2312    /// This just tells librados that you no longer need to use the striper.
2313    pub fn destroy_rados_striper(&self) {
2314        if self.rados_striper.is_null() {
2315            // No need to do anything
2316            return;
2317        }
2318        unsafe {
2319            rados_striper_destroy(self.rados_striper);
2320        }
2321    }
2322
2323    fn rados_striper_guard(&self) -> RadosResult<()> {
2324        if self.rados_striper.is_null() {
2325            return Err(RadosError::new(
2326                "Rados striper not created. Please initialize first".to_string(),
2327            ));
2328        }
2329        Ok(())
2330    }
2331
2332    /// Write len bytes from buf into the oid object, starting at offset off.
2333    /// The value of len must be <= UINT_MAX/2.
2334    pub fn rados_object_write(
2335        &self,
2336        object_name: &str,
2337        buffer: &[u8],
2338        offset: u64,
2339    ) -> RadosResult<()> {
2340        self.rados_striper_guard()?;
2341        let obj_name_str = CString::new(object_name)?;
2342
2343        unsafe {
2344            let ret_code = rados_striper_write(
2345                self.rados_striper,
2346                obj_name_str.as_ptr(),
2347                buffer.as_ptr() as *const c_char,
2348                buffer.len(),
2349                offset,
2350            );
2351            if ret_code < 0 {
2352                return Err(ret_code.into());
2353            }
2354        }
2355        Ok(())
2356    }
2357
2358    /// The object is filled with the provided data. If the object exists, it is
2359    /// atomically
2360    /// truncated and then written.
2361    pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
2362        self.rados_striper_guard()?;
2363        let obj_name_str = CString::new(object_name)?;
2364
2365        unsafe {
2366            let ret_code = rados_striper_write_full(
2367                self.rados_striper,
2368                obj_name_str.as_ptr(),
2369                buffer.as_ptr() as *const ::libc::c_char,
2370                buffer.len(),
2371            );
2372            if ret_code < 0 {
2373                return Err(ret_code.into());
2374            }
2375        }
2376        Ok(())
2377    }
2378
2379    /// Append len bytes from buf into the oid object.
2380    pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
2381        self.rados_striper_guard()?;
2382        let obj_name_str = CString::new(object_name)?;
2383
2384        unsafe {
2385            let ret_code = rados_striper_append(
2386                self.rados_striper,
2387                obj_name_str.as_ptr(),
2388                buffer.as_ptr() as *const c_char,
2389                buffer.len(),
2390            );
2391            if ret_code < 0 {
2392                return Err(ret_code.into());
2393            }
2394        }
2395        Ok(())
2396    }
2397
2398    /// Read data from an object.  This fills the slice given and returns the
2399    /// amount of bytes read
2400    /// The io context determines the snapshot to read from, if any was set by
2401    /// rados_ioctx_snap_set_read().
2402    /// Default read size is 64K unless you call Vec::with_capacity(1024*128)
2403    /// with a larger size.
2404    pub fn rados_object_read(
2405        &self,
2406        object_name: &str,
2407        fill_buffer: &mut Vec<u8>,
2408        read_offset: u64,
2409    ) -> RadosResult<i32> {
2410        self.rados_striper_guard()?;
2411        let object_name_str = CString::new(object_name)?;
2412        let mut len = fill_buffer.capacity();
2413        if len == 0 {
2414            fill_buffer.reserve_exact(1024 * 64);
2415            len = fill_buffer.capacity();
2416        }
2417
2418        unsafe {
2419            let ret_code = rados_striper_read(
2420                self.rados_striper,
2421                object_name_str.as_ptr(),
2422                fill_buffer.as_mut_ptr() as *mut c_char,
2423                len,
2424                read_offset,
2425            );
2426            if ret_code < 0 {
2427                return Err(ret_code.into());
2428            }
2429            fill_buffer.set_len(ret_code as usize);
2430            Ok(ret_code)
2431        }
2432    }
2433
2434    /// Delete an object
2435    /// Note: This does not delete any snapshots of the object.
2436    pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
2437        self.rados_striper_guard()?;
2438        let object_name_str = CString::new(object_name)?;
2439
2440        unsafe {
2441            let ret_code = rados_striper_remove(
2442                self.rados_striper,
2443                object_name_str.as_ptr() as *const c_char,
2444            );
2445            if ret_code < 0 {
2446                return Err(ret_code.into());
2447            }
2448        }
2449        Ok(())
2450    }
2451
2452    /// Resize an object
2453    /// If this enlarges the object, the new area is logically filled with
2454    /// zeroes. If this shrinks the object, the excess data is removed.
2455    pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
2456        self.rados_striper_guard()?;
2457        let object_name_str = CString::new(object_name)?;
2458
2459        unsafe {
2460            let ret_code =
2461                rados_striper_trunc(self.rados_striper, object_name_str.as_ptr(), new_size);
2462            if ret_code < 0 {
2463                return Err(ret_code.into());
2464            }
2465        }
2466        Ok(())
2467    }
2468
2469    /// Get the value of an extended attribute on an object.
2470    pub fn rados_object_getxattr(
2471        &self,
2472        object_name: &str,
2473        attr_name: &str,
2474        fill_buffer: &mut [u8],
2475    ) -> RadosResult<i32> {
2476        self.rados_striper_guard()?;
2477        let object_name_str = CString::new(object_name)?;
2478        let attr_name_str = CString::new(attr_name)?;
2479
2480        unsafe {
2481            let ret_code = rados_striper_getxattr(
2482                self.rados_striper,
2483                object_name_str.as_ptr() as *const c_char,
2484                attr_name_str.as_ptr() as *const c_char,
2485                fill_buffer.as_mut_ptr() as *mut c_char,
2486                fill_buffer.len(),
2487            );
2488            if ret_code < 0 {
2489                return Err(ret_code.into());
2490            }
2491            Ok(ret_code)
2492        }
2493    }
2494
2495    /// Set an extended attribute on an object.
2496    pub fn rados_object_setxattr(
2497        &self,
2498        object_name: &str,
2499        attr_name: &str,
2500        attr_value: &mut [u8],
2501    ) -> RadosResult<()> {
2502        self.rados_striper_guard()?;
2503        let object_name_str = CString::new(object_name)?;
2504        let attr_name_str = CString::new(attr_name)?;
2505
2506        unsafe {
2507            let ret_code = rados_striper_setxattr(
2508                self.rados_striper,
2509                object_name_str.as_ptr() as *const c_char,
2510                attr_name_str.as_ptr() as *const c_char,
2511                attr_value.as_mut_ptr() as *mut c_char,
2512                attr_value.len(),
2513            );
2514            if ret_code < 0 {
2515                return Err(ret_code.into());
2516            }
2517        }
2518        Ok(())
2519    }
2520
2521    /// Delete an extended attribute from an object.
2522    pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
2523        self.rados_striper_guard()?;
2524        let object_name_str = CString::new(object_name)?;
2525        let attr_name_str = CString::new(attr_name)?;
2526
2527        unsafe {
2528            let ret_code = rados_striper_rmxattr(
2529                self.rados_striper,
2530                object_name_str.as_ptr() as *const c_char,
2531                attr_name_str.as_ptr() as *const c_char,
2532            );
2533            if ret_code < 0 {
2534                return Err(ret_code.into());
2535            }
2536        }
2537        Ok(())
2538    }
2539
2540    /// Get the rados_xattrs_iter_t reference to iterate over xattrs on an
2541    /// object Used in conjuction with XAttr::new() to iterate.
2542    pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
2543        self.rados_striper_guard()?;
2544        let object_name_str = CString::new(object_name)?;
2545        let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
2546
2547        unsafe {
2548            let ret_code = rados_striper_getxattrs(
2549                self.rados_striper,
2550                object_name_str.as_ptr(),
2551                &mut xattr_iterator_handle,
2552            );
2553            if ret_code < 0 {
2554                return Err(ret_code.into());
2555            }
2556        }
2557        Ok(xattr_iterator_handle)
2558    }
2559
2560    /// Get object stats (size,SystemTime)
2561    pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
2562        self.rados_striper_guard()?;
2563        let object_name_str = CString::new(object_name)?;
2564        let mut psize: u64 = 0;
2565        let mut time: ::libc::time_t = 0;
2566
2567        unsafe {
2568            let ret_code = rados_striper_stat(
2569                self.rados_striper,
2570                object_name_str.as_ptr(),
2571                &mut psize,
2572                &mut time,
2573            );
2574            if ret_code < 0 {
2575                return Err(ret_code.into());
2576            }
2577        }
2578        Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
2579    }
2580}