1#![cfg(unix)]
15#![allow(unused_imports)]
16
17use crate::JsonData;
18
19use crate::admin_sockets::*;
20use crate::error::*;
21use crate::json::*;
22use crate::JsonValue;
23use byteorder::{LittleEndian, WriteBytesExt};
24use futures::task::SpawnExt;
25use libc::*;
26use nom::number::complete::le_u32;
27use nom::IResult;
28use serde_json;
29
30use crate::completion::with_completion;
31use crate::rados::*;
32#[cfg(feature = "rados_striper")]
33use crate::rados_striper::*;
34use crate::status::*;
35use std::ffi::{CStr, CString};
36use std::marker::PhantomData;
37use std::{ptr, str};
38
39use crate::utils::*;
40use std::io::{BufRead, Cursor};
41use std::net::IpAddr;
42use std::time::{Duration, SystemTime, UNIX_EPOCH};
43
44use crate::list_stream::ListStream;
45use crate::read_stream::ReadStream;
46pub use crate::write_sink::WriteSink;
47use std::pin::Pin;
48use std::sync::Arc;
49use std::task::{Context, Poll};
50use uuid::Uuid;
51
52const CEPH_OSD_TMAP_HDR: char = 'h';
53const CEPH_OSD_TMAP_SET: char = 's';
54const CEPH_OSD_TMAP_CREATE: char = 'c';
55const CEPH_OSD_TMAP_RM: char = 'r';
56
57const DEFAULT_READ_BYTES: usize = 64 * 1024;
58
59#[derive(Debug, Clone)]
60pub enum CephHealth {
61 Ok,
62 Warning,
63 Error,
64}
65
66#[derive(Debug, Clone)]
67pub enum CephCommandTypes {
68 Mon,
69 Osd,
70 Pgs,
71}
72
73named!(
74 parse_header<TmapOperation>,
75 do_parse!(
76 char!(CEPH_OSD_TMAP_HDR)
77 >> data_len: le_u32
78 >> data: take!(data_len)
79 >> (TmapOperation::Header {
80 data: data.to_vec()
81 })
82 )
83);
84
85named!(
86 parse_create<TmapOperation>,
87 do_parse!(
88 char!(CEPH_OSD_TMAP_CREATE)
89 >> key_name_len: le_u32
90 >> key_name: take_str!(key_name_len)
91 >> data_len: le_u32
92 >> data: take!(data_len)
93 >> (TmapOperation::Create {
94 name: key_name.to_string(),
95 data: data.to_vec(),
96 })
97 )
98);
99
100named!(
101 parse_set<TmapOperation>,
102 do_parse!(
103 char!(CEPH_OSD_TMAP_SET)
104 >> key_name_len: le_u32
105 >> key_name: take_str!(key_name_len)
106 >> data_len: le_u32
107 >> data: take!(data_len)
108 >> (TmapOperation::Set {
109 key: key_name.to_string(),
110 data: data.to_vec(),
111 })
112 )
113);
114
115named!(
116 parse_remove<TmapOperation>,
117 do_parse!(
118 char!(CEPH_OSD_TMAP_RM)
119 >> key_name_len: le_u32
120 >> key_name: take_str!(key_name_len)
121 >> (TmapOperation::Remove {
122 name: key_name.to_string(),
123 })
124 )
125);
126
127#[derive(Debug)]
128pub enum TmapOperation {
129 Header { data: Vec<u8> },
130 Set { key: String, data: Vec<u8> },
131 Create { name: String, data: Vec<u8> },
132 Remove { name: String },
133}
134
135impl TmapOperation {
136 fn serialize(&self) -> RadosResult<Vec<u8>> {
137 let mut buffer: Vec<u8> = Vec::new();
138 match *self {
139 TmapOperation::Header { ref data } => {
140 buffer.push(CEPH_OSD_TMAP_HDR as u8);
141 buffer.write_u32::<LittleEndian>(data.len() as u32)?;
142 buffer.extend_from_slice(data);
143 }
144 TmapOperation::Set { ref key, ref data } => {
145 buffer.push(CEPH_OSD_TMAP_SET as u8);
146 buffer.write_u32::<LittleEndian>(key.len() as u32)?;
147 buffer.extend(key.as_bytes());
148 buffer.write_u32::<LittleEndian>(data.len() as u32)?;
149 buffer.extend_from_slice(data);
150 }
151 TmapOperation::Create { ref name, ref data } => {
152 buffer.push(CEPH_OSD_TMAP_CREATE as u8);
153 buffer.write_u32::<LittleEndian>(name.len() as u32)?;
154 buffer.extend(name.as_bytes());
155 buffer.write_u32::<LittleEndian>(data.len() as u32)?;
156 buffer.extend_from_slice(data);
157 }
158 TmapOperation::Remove { ref name } => {
159 buffer.push(CEPH_OSD_TMAP_RM as u8);
160 buffer.write_u32::<LittleEndian>(name.len() as u32)?;
161 buffer.extend(name.as_bytes());
162 }
163 }
164 Ok(buffer)
165 }
166
167 fn deserialize(input: &[u8]) -> IResult<&[u8], Vec<TmapOperation>> {
168 many0!(
169 input,
170 alt!(
171 complete!(parse_header)
172 | complete!(parse_create)
173 | complete!(parse_set)
174 | complete!(parse_remove)
175 )
176 )
177 }
178}
179
180#[derive(Debug)]
182pub struct Pool {
183 pub ctx: rados_list_ctx_t,
184}
185
186#[derive(Debug)]
187pub struct CephObject {
188 pub name: String,
189 pub entry_locator: String,
190 pub namespace: String,
191}
192
193impl Iterator for Pool {
194 type Item = CephObject;
195 fn next(&mut self) -> Option<CephObject> {
196 let mut entry_ptr: *mut *const ::libc::c_char = ptr::null_mut();
197 let mut key_ptr: *mut *const ::libc::c_char = ptr::null_mut();
198 let mut nspace_ptr: *mut *const ::libc::c_char = ptr::null_mut();
199
200 unsafe {
201 let ret_code =
202 rados_nobjects_list_next(self.ctx, &mut entry_ptr, &mut key_ptr, &mut nspace_ptr);
203 if ret_code == -ENOENT {
204 rados_nobjects_list_close(self.ctx);
206 None
207 } else if ret_code < 0 {
208 None
210 } else {
211 let object_name = CStr::from_ptr(entry_ptr as *const ::libc::c_char);
212 let mut object_locator = String::new();
213 let mut namespace = String::new();
214 if !key_ptr.is_null() {
215 object_locator.push_str(
216 &CStr::from_ptr(key_ptr as *const ::libc::c_char).to_string_lossy(),
217 );
218 }
219 if !nspace_ptr.is_null() {
220 namespace.push_str(
221 &CStr::from_ptr(nspace_ptr as *const ::libc::c_char).to_string_lossy(),
222 );
223 }
224
225 Some(CephObject {
226 name: object_name.to_string_lossy().into_owned(),
227 entry_locator: object_locator,
228 namespace,
229 })
230 }
231 }
232 }
233}
234
235#[derive(Debug)]
239pub struct ReadOperation {
240 pub object_name: String,
241 pub flags: u32,
245 read_op_handle: rados_read_op_t,
246}
247
248impl Drop for ReadOperation {
249 fn drop(&mut self) {
250 unsafe {
251 rados_release_read_op(self.read_op_handle);
252 }
253 }
254}
255
256#[derive(Debug)]
260pub struct WriteOperation {
261 pub object_name: String,
262 pub flags: u32,
266 pub mtime: time_t,
267 write_op_handle: rados_write_op_t,
268}
269
270impl Drop for WriteOperation {
271 fn drop(&mut self) {
272 unsafe {
273 rados_release_write_op(self.write_op_handle);
274 }
275 }
276}
277
278#[derive(Debug)]
281pub struct XAttr {
282 pub name: String,
283 pub value: String,
284 iter: rados_xattrs_iter_t,
285}
286
287#[derive(Debug)]
289pub struct RadosVersion {
290 pub major: i32,
291 pub minor: i32,
292 pub extra: i32,
293}
294
295impl XAttr {
296 pub fn new(iter: rados_xattrs_iter_t) -> XAttr {
299 XAttr {
300 name: String::new(),
301 value: String::new(),
302 iter,
303 }
304 }
305}
306
307impl Iterator for XAttr {
308 type Item = XAttr;
309
310 fn next(&mut self) -> Option<Self::Item> {
311 let mut name: *const c_char = ptr::null();
313 let mut value: *const c_char = ptr::null();
315 let mut val_length: usize = 0;
316 unsafe {
317 let ret_code = rados_getxattrs_next(self.iter, &mut name, &mut value, &mut val_length);
318
319 if ret_code < 0 {
320 None
322 }
323 else if value.is_null() && val_length == 0 {
325 rados_getxattrs_end(self.iter);
326 None
327 } else {
328 let name = CStr::from_ptr(name);
329 let s_bytes = std::slice::from_raw_parts(value, val_length);
331 let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
333 Some(XAttr {
334 name: name.to_string_lossy().into_owned(),
335 value: String::from_utf8_lossy(&bytes).into_owned(),
336 iter: self.iter,
337 })
338 }
339 }
340 }
341}
342
343pub struct IoCtx {
345 pub ioctx: rados_ioctx_t,
347}
348
349unsafe impl Send for IoCtx {}
350unsafe impl Sync for IoCtx {}
351
352impl Drop for IoCtx {
353 fn drop(&mut self) {
354 assert!(self.ioctx.is_null(), "Rados not disconnected!");
355 }
361}
362
363#[cfg(feature = "rados_striper")]
365pub struct RadosStriper {
366 rados_striper: rados_ioctx_t,
367}
368
369#[cfg(feature = "rados_striper")]
370impl Drop for RadosStriper {
371 fn drop(&mut self) {
372 if !self.rados_striper.is_null() {
373 unsafe {
374 rados_striper_destroy(self.rados_striper);
375 }
376 }
377 }
378}
379
380pub struct Rados {
382 rados: rados_t,
383 phantom: PhantomData<IoCtx>,
384}
385
386unsafe impl Send for Rados {}
387unsafe impl Sync for Rados {}
388
389impl Drop for Rados {
390 fn drop(&mut self) {
391 assert!(self.rados.is_null(), "Rados not disconnected!");
392 }
398}
399
400pub fn connect_to_ceph(user_id: &str, config_file: &str) -> RadosResult<Rados> {
402 let connect_id = CString::new(user_id)?;
403 let conf_file = CString::new(config_file)?;
404 unsafe {
405 let mut cluster_handle: rados_t = ptr::null_mut();
406 let ret_code = rados_create(&mut cluster_handle, connect_id.as_ptr());
407 if ret_code < 0 {
408 return Err(ret_code.into());
409 }
410 let ret_code = rados_conf_read_file(cluster_handle, conf_file.as_ptr());
411 if ret_code < 0 {
412 return Err(ret_code.into());
413 }
414 let ret_code = rados_connect(cluster_handle);
415 if ret_code < 0 {
416 return Err(ret_code.into());
417 }
418 Ok(Rados {
419 rados: cluster_handle,
420 phantom: PhantomData,
421 })
422 }
423}
424
425pub async fn connect_to_ceph_async(user_id: &str, config_file: &str) -> RadosResult<Rados> {
427 let user_id = user_id.to_string();
428 let config_file = config_file.to_string();
429
430 let pool = futures::executor::ThreadPool::builder()
432 .pool_size(1)
433 .create()
434 .expect("Could not spawn thread pool");
435 pool.spawn_with_handle(async move { connect_to_ceph(&user_id, &config_file) })
436 .expect("Could not spawn background task")
437 .await
438}
439
440pub async fn disconnect_from_ceph_async(mut rados: Rados) {
443 let pool = futures::executor::ThreadPool::builder()
444 .pool_size(1)
445 .create()
446 .expect("Could not spawn thread pool");
447 pool.spawn_with_handle(async move { rados.disconnect_from_ceph() })
448 .expect("Could not spawn background task")
449 .await;
450}
451
452impl Rados {
453 pub fn inner(&self) -> &rados_t {
454 &self.rados
455 }
456
457 fn set_null(&mut self) {
458 self.rados = ptr::null_mut();
459 }
460
461 pub fn disconnect_from_ceph(&mut self) {
465 if self.rados.is_null() {
466 return;
468 }
469 unsafe {
470 rados_shutdown(self.rados);
471 }
472 self.set_null();
474 }
475
476 fn conn_guard(&self) -> RadosResult<()> {
477 if self.rados.is_null() {
478 return Err(RadosError::new(
479 "Rados not connected. Please initialize cluster".to_string(),
480 ));
481 }
482 Ok(())
483 }
484
485 pub fn config_set(&self, name: &str, value: &str) -> RadosResult<()> {
487 if !self.rados.is_null() {
488 return Err(RadosError::new(
489 "Rados should not be connected when this function is called".into(),
490 ));
491 }
492 let name_str = CString::new(name)?;
493 let value_str = CString::new(value)?;
494 unsafe {
495 let ret_code = rados_conf_set(self.rados, name_str.as_ptr(), value_str.as_ptr());
496 if ret_code < 0 {
497 return Err(ret_code.into());
498 }
499 }
500 Ok(())
501 }
502
503 pub fn config_get(&self, name: &str) -> RadosResult<String> {
505 let name_str = CString::new(name)?;
506 let mut buffer: Vec<u8> = Vec::with_capacity(5120);
508 unsafe {
509 let ret_code = rados_conf_get(
510 self.rados,
511 name_str.as_ptr(),
512 buffer.as_mut_ptr() as *mut c_char,
513 buffer.capacity(),
514 );
515 if ret_code < 0 {
516 return Err(ret_code.into());
517 }
518 buffer.set_len(5120);
520 let num_bytes = buffer.iter().position(|x| x == &0u8);
522 buffer.set_len(num_bytes.unwrap_or(0));
523 Ok(String::from_utf8_lossy(&buffer).into_owned())
524 }
525 }
526
527 pub fn get_rados_ioctx(&self, pool_name: &str) -> RadosResult<IoCtx> {
531 self.conn_guard()?;
532 let pool_name_str = CString::new(pool_name)?;
533 unsafe {
534 let mut ioctx: rados_ioctx_t = ptr::null_mut();
535 let ret_code = rados_ioctx_create(self.rados, pool_name_str.as_ptr(), &mut ioctx);
536 if ret_code < 0 {
537 return Err(ret_code.into());
538 }
539 Ok(IoCtx { ioctx })
540 }
541 }
542
543 pub fn get_rados_ioctx2(&self, pool_id: i64) -> RadosResult<IoCtx> {
547 self.conn_guard()?;
548 unsafe {
549 let mut ioctx: rados_ioctx_t = ptr::null_mut();
550 let ret_code = rados_ioctx_create2(self.rados, pool_id, &mut ioctx);
551 if ret_code < 0 {
552 return Err(ret_code.into());
553 }
554 Ok(IoCtx { ioctx })
555 }
556 }
557}
558
559pub async fn destroy_rados_ioctx_async(mut ioctx: IoCtx) {
562 let pool = futures::executor::ThreadPool::builder()
563 .pool_size(1)
564 .create()
565 .expect("Could not spawn thread pool");
566 pool.spawn_with_handle(async move { ioctx.destroy_rados_ioctx() })
567 .expect("Could not spawn background task")
568 .await;
569}
570
571impl IoCtx {
572 pub fn inner(&self) -> &rados_ioctx_t {
573 &self.ioctx
574 }
575
576 fn set_null(&mut self) {
577 self.ioctx = ptr::null_mut();
578 }
579
580 pub fn destroy_rados_ioctx(&mut self) {
588 if self.ioctx.is_null() {
589 return;
591 }
592 unsafe {
593 rados_ioctx_destroy(self.ioctx);
594 }
595 self.set_null();
597 }
598 fn ioctx_guard(&self) -> RadosResult<()> {
599 if self.ioctx.is_null() {
600 return Err(RadosError::new(
601 "Rados ioctx not created. Please initialize first".to_string(),
602 ));
603 }
604 Ok(())
605 }
606 pub fn rados_stat_pool(&self) -> RadosResult<Struct_rados_pool_stat_t> {
608 self.ioctx_guard()?;
609 let mut pool_stat = Struct_rados_pool_stat_t::default();
610 unsafe {
611 let ret_code = rados_ioctx_pool_stat(self.ioctx, &mut pool_stat);
612 if ret_code < 0 {
613 return Err(ret_code.into());
614 }
615 Ok(pool_stat)
616 }
617 }
618
619 pub fn rados_pool_set_auid(&self, auid: u64) -> RadosResult<()> {
620 self.ioctx_guard()?;
621 unsafe {
622 let ret_code = rados_ioctx_pool_set_auid(self.ioctx, auid);
623 if ret_code < 0 {
624 return Err(ret_code.into());
625 }
626 Ok(())
627 }
628 }
629
630 pub fn rados_pool_get_auid(&self) -> RadosResult<u64> {
631 self.ioctx_guard()?;
632 let mut auid: u64 = 0;
633 unsafe {
634 let ret_code = rados_ioctx_pool_get_auid(self.ioctx, &mut auid);
635 if ret_code < 0 {
636 return Err(ret_code.into());
637 }
638 Ok(auid)
639 }
640 }
641
642 pub fn rados_pool_requires_alignment(&self) -> RadosResult<bool> {
644 self.ioctx_guard()?;
645 unsafe {
646 let ret_code = rados_ioctx_pool_requires_alignment(self.ioctx);
647 if ret_code < 0 {
648 return Err(ret_code.into());
649 }
650 if ret_code == 0 {
651 Ok(false)
652 } else {
653 Ok(true)
654 }
655 }
656 }
657
658 pub fn rados_pool_required_alignment(&self) -> RadosResult<u64> {
660 self.ioctx_guard()?;
661 unsafe {
662 let ret_code = rados_ioctx_pool_required_alignment(self.ioctx);
663 Ok(ret_code)
664 }
665 }
666
667 pub fn rados_object_get_id(&self) -> RadosResult<i64> {
669 self.ioctx_guard()?;
670 unsafe {
671 let pool_id = rados_ioctx_get_id(self.ioctx);
672 Ok(pool_id)
673 }
674 }
675
676 pub fn rados_get_pool_name(&self) -> RadosResult<String> {
678 self.ioctx_guard()?;
679 let mut buffer: Vec<u8> = Vec::with_capacity(500);
680
681 unsafe {
682 let ret_code = rados_ioctx_get_pool_name(
684 self.ioctx,
685 buffer.as_mut_ptr() as *mut c_char,
686 buffer.capacity() as c_uint,
687 );
688 if ret_code == -ERANGE {
689 buffer.reserve(1000);
691 buffer.set_len(1000);
692 let ret_code = rados_ioctx_get_pool_name(
693 self.ioctx,
694 buffer.as_mut_ptr() as *mut c_char,
695 buffer.capacity() as c_uint,
696 );
697 if ret_code < 0 {
698 return Err(ret_code.into());
699 }
700 Ok(String::from_utf8_lossy(&buffer).into_owned())
701 } else if ret_code < 0 {
702 Err(ret_code.into())
703 } else {
704 buffer.set_len(ret_code as usize);
705 Ok(String::from_utf8_lossy(&buffer).into_owned())
706 }
707 }
708 }
709
710 pub fn rados_locator_set_key(&self, key: &str) -> RadosResult<()> {
712 self.ioctx_guard()?;
713 let key_str = CString::new(key)?;
714 unsafe {
715 rados_ioctx_locator_set_key(self.ioctx, key_str.as_ptr());
716 }
717 Ok(())
718 }
719
720 pub fn rados_set_namespace(&self, namespace: &str) -> RadosResult<()> {
724 self.ioctx_guard()?;
725 let namespace_str = CString::new(namespace)?;
726 unsafe {
727 rados_ioctx_set_namespace(self.ioctx, namespace_str.as_ptr());
728 }
729 Ok(())
730 }
731
732 pub fn rados_list_pool_objects(&self) -> RadosResult<rados_list_ctx_t> {
734 self.ioctx_guard()?;
735 let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
736 unsafe {
737 let ret_code = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
738 if ret_code < 0 {
739 return Err(ret_code.into());
740 }
741 }
742 Ok(rados_list_ctx)
743 }
744
745 pub fn rados_snap_create(&self, snap_name: &str) -> RadosResult<()> {
747 self.ioctx_guard()?;
748
749 let snap_name_str = CString::new(snap_name)?;
750 unsafe {
751 let ret_code = rados_ioctx_snap_create(self.ioctx, snap_name_str.as_ptr());
752 if ret_code < 0 {
753 return Err(ret_code.into());
754 }
755 }
756 Ok(())
757 }
758
759 pub fn rados_snap_remove(&self, snap_name: &str) -> RadosResult<()> {
761 self.ioctx_guard()?;
762 let snap_name_str = CString::new(snap_name)?;
763
764 unsafe {
765 let ret_code = rados_ioctx_snap_remove(self.ioctx, snap_name_str.as_ptr());
766 if ret_code < 0 {
767 return Err(ret_code.into());
768 }
769 }
770 Ok(())
771 }
772
773 pub fn rados_snap_rollback(&self, object_name: &str, snap_name: &str) -> RadosResult<()> {
777 self.ioctx_guard()?;
778 let snap_name_str = CString::new(snap_name)?;
779 let object_name_str = CString::new(object_name)?;
780
781 unsafe {
782 let ret_code = rados_ioctx_snap_rollback(
783 self.ioctx,
784 object_name_str.as_ptr(),
785 snap_name_str.as_ptr(),
786 );
787 if ret_code < 0 {
788 return Err(ret_code.into());
789 }
790 }
791 Ok(())
792 }
793
794 pub fn rados_snap_set_read(&self, snap_id: u64) -> RadosResult<()> {
798 self.ioctx_guard()?;
799
800 unsafe {
801 rados_ioctx_snap_set_read(self.ioctx, snap_id);
802 }
803 Ok(())
804 }
805
806 pub fn rados_selfmanaged_snap_create(&self) -> RadosResult<u64> {
811 self.ioctx_guard()?;
812 let mut snap_id: u64 = 0;
813 unsafe {
814 let ret_code = rados_ioctx_selfmanaged_snap_create(self.ioctx, &mut snap_id);
815 if ret_code < 0 {
816 return Err(ret_code.into());
817 }
818 }
819 Ok(snap_id)
820 }
821
822 pub fn rados_selfmanaged_snap_remove(&self, snap_id: u64) -> RadosResult<()> {
826 self.ioctx_guard()?;
827
828 unsafe {
829 let ret_code = rados_ioctx_selfmanaged_snap_remove(self.ioctx, snap_id);
830 if ret_code < 0 {
831 return Err(ret_code.into());
832 }
833 }
834 Ok(())
835 }
836
837 pub fn rados_selfmanaged_snap_rollback(
841 &self,
842 object_name: &str,
843 snap_id: u64,
844 ) -> RadosResult<()> {
845 self.ioctx_guard()?;
846 let object_name_str = CString::new(object_name)?;
847
848 unsafe {
849 let ret_code = rados_ioctx_selfmanaged_snap_rollback(
850 self.ioctx,
851 object_name_str.as_ptr(),
852 snap_id,
853 );
854 if ret_code < 0 {
855 return Err(ret_code.into());
856 }
857 }
858 Ok(())
859 }
860
861 pub fn rados_snap_lookup(&self, snap_name: &str) -> RadosResult<u64> {
895 self.ioctx_guard()?;
896 let snap_name_str = CString::new(snap_name)?;
897 let mut snap_id: u64 = 0;
898 unsafe {
899 let ret_code =
900 rados_ioctx_snap_lookup(self.ioctx, snap_name_str.as_ptr(), &mut snap_id);
901 if ret_code < 0 {
902 return Err(ret_code.into());
903 }
904 }
905 Ok(snap_id)
906 }
907
908 pub fn rados_snap_get_name(&self, snap_id: u64) -> RadosResult<String> {
910 self.ioctx_guard()?;
911
912 let out_buffer: Vec<u8> = Vec::with_capacity(500);
913 let out_buff_size = out_buffer.capacity();
914 let out_str = CString::new(out_buffer)?;
915 unsafe {
916 let ret_code = rados_ioctx_snap_get_name(
917 self.ioctx,
918 snap_id,
919 out_str.as_ptr() as *mut c_char,
920 out_buff_size as c_int,
921 );
922 if ret_code == -ERANGE {}
923 if ret_code < 0 {
924 return Err(ret_code.into());
925 }
926 }
927 Ok(out_str.to_string_lossy().into_owned())
928 }
929
930 pub fn rados_snap_get_stamp(&self, snap_id: u64) -> RadosResult<time_t> {
932 self.ioctx_guard()?;
933
934 let mut time_id: time_t = 0;
935 unsafe {
936 let ret_code = rados_ioctx_snap_get_stamp(self.ioctx, snap_id, &mut time_id);
937 if ret_code < 0 {
938 return Err(ret_code.into());
939 }
940 }
941 Ok(time_id)
942 }
943
944 pub fn rados_get_object_last_version(&self) -> RadosResult<u64> {
948 self.ioctx_guard()?;
949 unsafe {
950 let obj_id = rados_get_last_version(self.ioctx);
951 Ok(obj_id)
952 }
953 }
954
955 pub fn rados_object_write(
958 &self,
959 object_name: &str,
960 buffer: &[u8],
961 offset: u64,
962 ) -> RadosResult<()> {
963 self.ioctx_guard()?;
964 let obj_name_str = CString::new(object_name)?;
965
966 unsafe {
967 let ret_code = rados_write(
968 self.ioctx,
969 obj_name_str.as_ptr(),
970 buffer.as_ptr() as *const c_char,
971 buffer.len(),
972 offset,
973 );
974 if ret_code < 0 {
975 return Err(ret_code.into());
976 }
977 }
978 Ok(())
979 }
980
981 pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
985 self.ioctx_guard()?;
986 let obj_name_str = CString::new(object_name)?;
987
988 unsafe {
989 let ret_code = rados_write_full(
990 self.ioctx,
991 obj_name_str.as_ptr(),
992 buffer.as_ptr() as *const ::libc::c_char,
993 buffer.len(),
994 );
995 if ret_code < 0 {
996 return Err(ret_code.into());
997 }
998 }
999 Ok(())
1000 }
1001
1002 pub async fn rados_async_object_write(
1003 &self,
1004 object_name: &str,
1005 buffer: &[u8],
1006 offset: u64,
1007 ) -> RadosResult<u32> {
1008 self.ioctx_guard()?;
1009 let obj_name_str = CString::new(object_name)?;
1010
1011 with_completion(&self, |c| unsafe {
1012 rados_aio_write(
1013 self.ioctx,
1014 obj_name_str.as_ptr(),
1015 c,
1016 buffer.as_ptr() as *const ::libc::c_char,
1017 buffer.len(),
1018 offset,
1019 )
1020 })?
1021 .await
1022 }
1023
1024 pub async fn rados_async_object_append(
1026 self: &Arc<Self>,
1027 object_name: &str,
1028 buffer: &[u8],
1029 ) -> RadosResult<u32> {
1030 self.ioctx_guard()?;
1031 let obj_name_str = CString::new(object_name)?;
1032
1033 with_completion(self, |c| unsafe {
1034 rados_aio_append(
1035 self.ioctx,
1036 obj_name_str.as_ptr(),
1037 c,
1038 buffer.as_ptr() as *const ::libc::c_char,
1039 buffer.len(),
1040 )
1041 })?
1042 .await
1043 }
1044
1045 pub async fn rados_async_object_write_full(
1047 &self,
1048 object_name: &str,
1049 buffer: &[u8],
1050 ) -> RadosResult<u32> {
1051 self.ioctx_guard()?;
1052 let obj_name_str = CString::new(object_name)?;
1053
1054 with_completion(&self, |c| unsafe {
1055 rados_aio_write_full(
1056 self.ioctx,
1057 obj_name_str.as_ptr(),
1058 c,
1059 buffer.as_ptr() as *const ::libc::c_char,
1060 buffer.len(),
1061 )
1062 })?
1063 .await
1064 }
1065
1066 pub async fn rados_async_object_remove(&self, object_name: &str) -> RadosResult<()> {
1068 self.ioctx_guard()?;
1069 let object_name_str = CString::new(object_name)?;
1070
1071 with_completion(self, |c| unsafe {
1072 rados_aio_remove(self.ioctx, object_name_str.as_ptr() as *const c_char, c)
1073 })?
1074 .await
1075 .map(|_r| ())
1076 }
1077
1078 pub async fn rados_async_object_read(
1080 &self,
1081 object_name: &str,
1082 fill_buffer: &mut Vec<u8>,
1083 read_offset: u64,
1084 ) -> RadosResult<u32> {
1085 self.ioctx_guard()?;
1086 let obj_name_str = CString::new(object_name)?;
1087
1088 if fill_buffer.capacity() == 0 {
1089 fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
1090 }
1091
1092 let result = with_completion(self, |c| unsafe {
1093 rados_aio_read(
1094 self.ioctx,
1095 obj_name_str.as_ptr(),
1096 c,
1097 fill_buffer.as_mut_ptr() as *mut c_char,
1098 fill_buffer.capacity(),
1099 read_offset,
1100 )
1101 })?
1102 .await;
1103
1104 if let Ok(rval) = &result {
1105 unsafe {
1106 let len = *rval as usize;
1107 assert!(len <= fill_buffer.capacity());
1108 fill_buffer.set_len(len);
1109 }
1110 }
1111
1112 result
1113 }
1114
1115 pub fn rados_async_object_read_stream(
1135 &self,
1136 object_name: &str,
1137 buffer_size: Option<usize>,
1138 concurrency: Option<usize>,
1139 size_hint: Option<u64>,
1140 ) -> ReadStream<'_> {
1141 ReadStream::new(self, object_name, buffer_size, concurrency, size_hint)
1142 }
1143
1144 pub fn rados_async_object_write_stream(
1154 &self,
1155 object_name: &str,
1156 concurrency: Option<usize>,
1157 ) -> WriteSink<'_> {
1158 WriteSink::new(self, object_name, concurrency)
1159 }
1160
1161 pub async fn rados_async_object_stat(
1163 &self,
1164 object_name: &str,
1165 ) -> RadosResult<(u64, SystemTime)> {
1166 self.ioctx_guard()?;
1167 let object_name_str = CString::new(object_name)?;
1168 let mut psize: u64 = 0;
1169 let mut time: ::libc::time_t = 0;
1170
1171 with_completion(self, |c| unsafe {
1172 rados_aio_stat(
1173 self.ioctx,
1174 object_name_str.as_ptr(),
1175 c,
1176 &mut psize,
1177 &mut time,
1178 )
1179 })?
1180 .await?;
1181 Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
1182 }
1183
1184 pub fn rados_async_object_list(&self) -> RadosResult<ListStream> {
1185 self.ioctx_guard()?;
1186 let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
1187 unsafe {
1188 let r = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
1189 if r == 0 {
1190 Ok(ListStream::new(rados_list_ctx))
1191 } else {
1192 Err(r.into())
1193 }
1194 }
1195 }
1196
1197 pub async fn rados_async_object_getxattr(
1199 &self,
1200 object_name: &str,
1201 attr_name: &str,
1202 fill_buffer: &mut [u8],
1203 ) -> RadosResult<u32> {
1204 self.ioctx_guard()?;
1205 let object_name_str = CString::new(object_name)?;
1206 let attr_name_str = CString::new(attr_name)?;
1207
1208 with_completion(self, |c| unsafe {
1209 rados_aio_getxattr(
1210 self.ioctx,
1211 object_name_str.as_ptr() as *const c_char,
1212 c,
1213 attr_name_str.as_ptr() as *const c_char,
1214 fill_buffer.as_mut_ptr() as *mut c_char,
1215 fill_buffer.len(),
1216 )
1217 })?
1218 .await
1219 }
1220
1221 pub async fn rados_async_object_setxattr(
1223 &self,
1224 object_name: &str,
1225 attr_name: &str,
1226 attr_value: &[u8],
1227 ) -> RadosResult<u32> {
1228 self.ioctx_guard()?;
1229 let object_name_str = CString::new(object_name)?;
1230 let attr_name_str = CString::new(attr_name)?;
1231
1232 with_completion(self, |c| unsafe {
1233 rados_aio_setxattr(
1234 self.ioctx,
1235 object_name_str.as_ptr() as *const c_char,
1236 c,
1237 attr_name_str.as_ptr() as *const c_char,
1238 attr_value.as_ptr() as *mut c_char,
1239 attr_value.len(),
1240 )
1241 })?
1242 .await
1243 }
1244
1245 pub async fn rados_async_object_rmxattr(
1247 &self,
1248 object_name: &str,
1249 attr_name: &str,
1250 ) -> RadosResult<u32> {
1251 self.ioctx_guard()?;
1252 let object_name_str = CString::new(object_name)?;
1253 let attr_name_str = CString::new(attr_name)?;
1254
1255 with_completion(self, |c| unsafe {
1256 rados_aio_rmxattr(
1257 self.ioctx,
1258 object_name_str.as_ptr() as *const c_char,
1259 c,
1260 attr_name_str.as_ptr() as *const c_char,
1261 )
1262 })?
1263 .await
1264 }
1265
1266 pub fn rados_object_clone_range(
1273 &self,
1274 dst_object_name: &str,
1275 dst_offset: u64,
1276 src_object_name: &str,
1277 src_offset: u64,
1278 length: usize,
1279 ) -> RadosResult<()> {
1280 self.ioctx_guard()?;
1281 let dst_name_str = CString::new(dst_object_name)?;
1282 let src_name_str = CString::new(src_object_name)?;
1283
1284 unsafe {
1285 let ret_code = rados_clone_range(
1286 self.ioctx,
1287 dst_name_str.as_ptr(),
1288 dst_offset,
1289 src_name_str.as_ptr(),
1290 src_offset,
1291 length,
1292 );
1293 if ret_code < 0 {
1294 return Err(ret_code.into());
1295 }
1296 }
1297 Ok(())
1298 }
1299
1300 pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
1302 self.ioctx_guard()?;
1303 let obj_name_str = CString::new(object_name)?;
1304
1305 unsafe {
1306 let ret_code = rados_append(
1307 self.ioctx,
1308 obj_name_str.as_ptr(),
1309 buffer.as_ptr() as *const c_char,
1310 buffer.len(),
1311 );
1312 if ret_code < 0 {
1313 return Err(ret_code.into());
1314 }
1315 }
1316 Ok(())
1317 }
1318
1319 pub fn rados_object_read(
1326 &self,
1327 object_name: &str,
1328 fill_buffer: &mut Vec<u8>,
1329 read_offset: u64,
1330 ) -> RadosResult<i32> {
1331 self.ioctx_guard()?;
1332 let object_name_str = CString::new(object_name)?;
1333 let mut len = fill_buffer.capacity();
1334 if len == 0 {
1335 fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
1336 len = fill_buffer.capacity();
1337 }
1338
1339 unsafe {
1340 let ret_code = rados_read(
1341 self.ioctx,
1342 object_name_str.as_ptr(),
1343 fill_buffer.as_mut_ptr() as *mut c_char,
1344 len,
1345 read_offset,
1346 );
1347 if ret_code < 0 {
1348 return Err(ret_code.into());
1349 }
1350 fill_buffer.set_len(ret_code as usize);
1351 Ok(ret_code)
1352 }
1353 }
1354
1355 pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
1358 self.ioctx_guard()?;
1359 let object_name_str = CString::new(object_name)?;
1360
1361 unsafe {
1362 let ret_code = rados_remove(self.ioctx, object_name_str.as_ptr() as *const c_char);
1363 if ret_code < 0 {
1364 return Err(ret_code.into());
1365 }
1366 }
1367 Ok(())
1368 }
1369
1370 pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
1374 self.ioctx_guard()?;
1375 let object_name_str = CString::new(object_name)?;
1376
1377 unsafe {
1378 let ret_code = rados_trunc(self.ioctx, object_name_str.as_ptr(), new_size);
1379 if ret_code < 0 {
1380 return Err(ret_code.into());
1381 }
1382 }
1383 Ok(())
1384 }
1385
1386 pub fn rados_object_getxattr(
1388 &self,
1389 object_name: &str,
1390 attr_name: &str,
1391 fill_buffer: &mut [u8],
1392 ) -> RadosResult<i32> {
1393 self.ioctx_guard()?;
1394 let object_name_str = CString::new(object_name)?;
1395 let attr_name_str = CString::new(attr_name)?;
1396
1397 unsafe {
1398 let ret_code = rados_getxattr(
1399 self.ioctx,
1400 object_name_str.as_ptr() as *const c_char,
1401 attr_name_str.as_ptr() as *const c_char,
1402 fill_buffer.as_mut_ptr() as *mut c_char,
1403 fill_buffer.len(),
1404 );
1405 if ret_code < 0 {
1406 return Err(ret_code.into());
1407 }
1408 Ok(ret_code)
1409 }
1410 }
1411
1412 pub fn rados_object_setxattr(
1414 &self,
1415 object_name: &str,
1416 attr_name: &str,
1417 attr_value: &mut [u8],
1418 ) -> RadosResult<()> {
1419 self.ioctx_guard()?;
1420 let object_name_str = CString::new(object_name)?;
1421 let attr_name_str = CString::new(attr_name)?;
1422
1423 unsafe {
1424 let ret_code = rados_setxattr(
1425 self.ioctx,
1426 object_name_str.as_ptr() as *const c_char,
1427 attr_name_str.as_ptr() as *const c_char,
1428 attr_value.as_mut_ptr() as *mut c_char,
1429 attr_value.len(),
1430 );
1431 if ret_code < 0 {
1432 return Err(ret_code.into());
1433 }
1434 }
1435 Ok(())
1436 }
1437
1438 pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
1440 self.ioctx_guard()?;
1441 let object_name_str = CString::new(object_name)?;
1442 let attr_name_str = CString::new(attr_name)?;
1443
1444 unsafe {
1445 let ret_code = rados_rmxattr(
1446 self.ioctx,
1447 object_name_str.as_ptr() as *const c_char,
1448 attr_name_str.as_ptr() as *const c_char,
1449 );
1450 if ret_code < 0 {
1451 return Err(ret_code.into());
1452 }
1453 }
1454 Ok(())
1455 }
1456
1457 pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
1460 self.ioctx_guard()?;
1461 let object_name_str = CString::new(object_name)?;
1462 let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
1463
1464 unsafe {
1465 let ret_code = rados_getxattrs(
1466 self.ioctx,
1467 object_name_str.as_ptr(),
1468 &mut xattr_iterator_handle,
1469 );
1470 if ret_code < 0 {
1471 return Err(ret_code.into());
1472 }
1473 }
1474 Ok(xattr_iterator_handle)
1475 }
1476
1477 pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
1479 self.ioctx_guard()?;
1480 let object_name_str = CString::new(object_name)?;
1481 let mut psize: u64 = 0;
1482 let mut time: ::libc::time_t = 0;
1483
1484 unsafe {
1485 let ret_code = rados_stat(self.ioctx, object_name_str.as_ptr(), &mut psize, &mut time);
1486 if ret_code < 0 {
1487 return Err(ret_code.into());
1488 }
1489 }
1490 Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
1491 }
1492
1493 pub fn rados_object_tmap_update(
1495 &self,
1496 object_name: &str,
1497 update: TmapOperation,
1498 ) -> RadosResult<()> {
1499 self.ioctx_guard()?;
1500 let object_name_str = CString::new(object_name)?;
1501 let buffer = update.serialize()?;
1502 unsafe {
1503 let ret_code = rados_tmap_update(
1504 self.ioctx,
1505 object_name_str.as_ptr(),
1506 buffer.as_ptr() as *const c_char,
1507 buffer.len(),
1508 );
1509 if ret_code < 0 {
1510 return Err(ret_code.into());
1511 }
1512 }
1513 Ok(())
1514 }
1515
1516 pub fn rados_object_tmap_get(&self, object_name: &str) -> RadosResult<Vec<TmapOperation>> {
1518 self.ioctx_guard()?;
1519 let object_name_str = CString::new(object_name)?;
1520 let mut buffer: Vec<u8> = Vec::with_capacity(500);
1521
1522 unsafe {
1523 let ret_code = rados_tmap_get(
1524 self.ioctx,
1525 object_name_str.as_ptr(),
1526 buffer.as_mut_ptr() as *mut c_char,
1527 buffer.capacity(),
1528 );
1529 if ret_code == -ERANGE {
1530 buffer.reserve(1000);
1531 buffer.set_len(1000);
1532 let ret_code = rados_tmap_get(
1533 self.ioctx,
1534 object_name_str.as_ptr(),
1535 buffer.as_mut_ptr() as *mut c_char,
1536 buffer.capacity(),
1537 );
1538 if ret_code < 0 {
1539 return Err(ret_code.into());
1540 }
1541 } else if ret_code < 0 {
1542 return Err(ret_code.into());
1543 }
1544 }
1545 match TmapOperation::deserialize(&buffer) {
1546 Ok((_, tmap)) => Ok(tmap),
1547 Err(nom::Err::Incomplete(needed)) => Err(RadosError::new(format!(
1548 "deserialize of ceph tmap failed.
1549 Input from Ceph was too small. Needed: {:?} more bytes",
1550 needed
1551 ))),
1552 Err(nom::Err::Error(e)) => Err(RadosError::new(
1553 String::from_utf8_lossy(e.input).to_string(),
1554 )),
1555 Err(nom::Err::Failure(e)) => Err(RadosError::new(
1556 String::from_utf8_lossy(e.input).to_string(),
1557 )),
1558 }
1559 }
1560
1561 pub fn rados_object_exec(
1570 &self,
1571 object_name: &str,
1572 class_name: &str,
1573 method_name: &str,
1574 input_buffer: &[u8],
1575 output_buffer: &mut [u8],
1576 ) -> RadosResult<()> {
1577 self.ioctx_guard()?;
1578 let object_name_str = CString::new(object_name)?;
1579 let class_name_str = CString::new(class_name)?;
1580 let method_name_str = CString::new(method_name)?;
1581
1582 unsafe {
1583 let ret_code = rados_exec(
1584 self.ioctx,
1585 object_name_str.as_ptr(),
1586 class_name_str.as_ptr(),
1587 method_name_str.as_ptr(),
1588 input_buffer.as_ptr() as *const c_char,
1589 input_buffer.len(),
1590 output_buffer.as_mut_ptr() as *mut c_char,
1591 output_buffer.len(),
1592 );
1593 if ret_code < 0 {
1594 return Err(ret_code.into());
1595 }
1596 }
1597 Ok(())
1598 }
1599
1600 pub fn rados_object_notify(&self, object_name: &str, data: &[u8]) -> RadosResult<()> {
1604 self.ioctx_guard()?;
1605 let object_name_str = CString::new(object_name)?;
1606
1607 unsafe {
1608 let ret_code = rados_notify(
1609 self.ioctx,
1610 object_name_str.as_ptr(),
1611 0,
1612 data.as_ptr() as *const c_char,
1613 data.len() as i32,
1614 );
1615 if ret_code < 0 {
1616 return Err(ret_code.into());
1617 }
1618 }
1619 Ok(())
1620 }
1621 pub fn rados_object_notify_ack(
1634 &self,
1635 object_name: &str,
1636 notify_id: u64,
1637 cookie: u64,
1638 buffer: Option<&[u8]>,
1639 ) -> RadosResult<()> {
1640 self.ioctx_guard()?;
1641 let object_name_str = CString::new(object_name)?;
1642
1643 match buffer {
1644 Some(buf) => unsafe {
1645 let ret_code = rados_notify_ack(
1646 self.ioctx,
1647 object_name_str.as_ptr(),
1648 notify_id,
1649 cookie,
1650 buf.as_ptr() as *const c_char,
1651 buf.len() as i32,
1652 );
1653 if ret_code < 0 {
1654 return Err(ret_code.into());
1655 }
1656 },
1657 None => unsafe {
1658 let ret_code = rados_notify_ack(
1659 self.ioctx,
1660 object_name_str.as_ptr(),
1661 notify_id,
1662 cookie,
1663 ptr::null(),
1664 0,
1665 );
1666 if ret_code < 0 {
1667 return Err(ret_code.into());
1668 }
1669 },
1670 }
1671 Ok(())
1672 }
1673 pub fn rados_object_set_alloc_hint(
1679 &self,
1680 object_name: &str,
1681 expected_object_size: u64,
1682 expected_write_size: u64,
1683 ) -> RadosResult<()> {
1684 self.ioctx_guard()?;
1685 let object_name_str = CString::new(object_name)?;
1686
1687 unsafe {
1688 let ret_code = rados_set_alloc_hint(
1689 self.ioctx,
1690 object_name_str.as_ptr(),
1691 expected_object_size,
1692 expected_write_size,
1693 );
1694 if ret_code < 0 {
1695 return Err(ret_code.into());
1696 }
1697 }
1698 Ok(())
1699 }
1700
1701 pub fn rados_perform_read_operations(&self, read_op: ReadOperation) -> RadosResult<()> {
1703 self.ioctx_guard()?;
1704 let object_name_str = CString::new(read_op.object_name.clone())?;
1705
1706 unsafe {
1707 let ret_code = rados_read_op_operate(
1708 read_op.read_op_handle,
1709 self.ioctx,
1710 object_name_str.as_ptr(),
1711 read_op.flags as i32,
1712 );
1713 if ret_code < 0 {
1714 return Err(ret_code.into());
1715 }
1716 }
1717 Ok(())
1718 }
1719
1720 pub fn rados_commit_write_operations(&self, write_op: &mut WriteOperation) -> RadosResult<()> {
1722 self.ioctx_guard()?;
1723 let object_name_str = CString::new(write_op.object_name.clone())?;
1724
1725 unsafe {
1726 let ret_code = rados_write_op_operate(
1727 write_op.write_op_handle,
1728 self.ioctx,
1729 object_name_str.as_ptr(),
1730 &mut write_op.mtime,
1731 write_op.flags as i32,
1732 );
1733 if ret_code < 0 {
1734 return Err(ret_code.into());
1735 }
1736 }
1737 Ok(())
1738 }
1739
1740 pub fn rados_object_lock_exclusive(
1742 &self,
1743 object_name: &str,
1744 lock_name: &str,
1745 cookie_name: &str,
1746 description: &str,
1747 duration_time: &mut timeval,
1748 lock_flags: u8,
1749 ) -> RadosResult<()> {
1750 self.ioctx_guard()?;
1751 let object_name_str = CString::new(object_name)?;
1752 let lock_name_str = CString::new(lock_name)?;
1753 let cookie_name_str = CString::new(cookie_name)?;
1754 let description_str = CString::new(description)?;
1755
1756 unsafe {
1757 let ret_code = rados_lock_exclusive(
1758 self.ioctx,
1759 object_name_str.as_ptr(),
1760 lock_name_str.as_ptr(),
1761 cookie_name_str.as_ptr(),
1762 description_str.as_ptr(),
1763 duration_time,
1764 lock_flags,
1765 );
1766 if ret_code < 0 {
1767 return Err(ret_code.into());
1768 }
1769 }
1770 Ok(())
1771 }
1772
1773 pub fn rados_object_lock_shared(
1775 &self,
1776 object_name: &str,
1777 lock_name: &str,
1778 cookie_name: &str,
1779 description: &str,
1780 tag_name: &str,
1781 duration_time: &mut timeval,
1782 lock_flags: u8,
1783 ) -> RadosResult<()> {
1784 self.ioctx_guard()?;
1785 let object_name_str = CString::new(object_name)?;
1786 let lock_name_str = CString::new(lock_name)?;
1787 let cookie_name_str = CString::new(cookie_name)?;
1788 let description_str = CString::new(description)?;
1789 let tag_name_str = CString::new(tag_name)?;
1790
1791 unsafe {
1792 let ret_code = rados_lock_shared(
1793 self.ioctx,
1794 object_name_str.as_ptr(),
1795 lock_name_str.as_ptr(),
1796 cookie_name_str.as_ptr(),
1797 tag_name_str.as_ptr(),
1798 description_str.as_ptr(),
1799 duration_time,
1800 lock_flags,
1801 );
1802 if ret_code < 0 {
1803 return Err(ret_code.into());
1804 }
1805 }
1806 Ok(())
1807 }
1808
1809 pub fn rados_object_unlock(
1811 &self,
1812 object_name: &str,
1813 lock_name: &str,
1814 cookie_name: &str,
1815 ) -> RadosResult<()> {
1816 self.ioctx_guard()?;
1817 let object_name_str = CString::new(object_name)?;
1818 let lock_name_str = CString::new(lock_name)?;
1819 let cookie_name_str = CString::new(cookie_name)?;
1820
1821 unsafe {
1822 let ret_code = rados_unlock(
1823 self.ioctx,
1824 object_name_str.as_ptr(),
1825 lock_name_str.as_ptr(),
1826 cookie_name_str.as_ptr(),
1827 );
1828 if ret_code < 0 {
1829 return Err(ret_code.into());
1830 }
1831 }
1832 Ok(())
1833 }
1834
1835 pub fn rados_object_break_lock(
1868 &self,
1869 object_name: &str,
1870 lock_name: &str,
1871 client_name: &str,
1872 cookie_name: &str,
1873 ) -> RadosResult<()> {
1874 self.ioctx_guard()?;
1875 let object_name_str = CString::new(object_name)?;
1876 let lock_name_str = CString::new(lock_name)?;
1877 let cookie_name_str = CString::new(cookie_name)?;
1878 let client_name_str = CString::new(client_name)?;
1879
1880 unsafe {
1881 let ret_code = rados_break_lock(
1882 self.ioctx,
1883 object_name_str.as_ptr(),
1884 lock_name_str.as_ptr(),
1885 client_name_str.as_ptr(),
1886 cookie_name_str.as_ptr(),
1887 );
1888 if ret_code < 0 {
1889 return Err(ret_code.into());
1890 }
1891 }
1892 Ok(())
1893 }
1894
1895 #[cfg(feature = "rados_striper")]
1898 pub fn get_rados_striper(self) -> RadosResult<RadosStriper> {
1899 self.ioctx_guard()?;
1900 unsafe {
1901 let mut rados_striper: rados_striper_t = ptr::null_mut();
1902 let ret_code = rados_striper_create(self.ioctx, &mut rados_striper);
1903 if ret_code < 0 {
1904 return Err(ret_code.into());
1905 }
1906 Ok(RadosStriper { rados_striper })
1907 }
1908 }
1909}
1910
1911impl Rados {
1912 pub fn rados_blacklist_client(&self, client: IpAddr, expire_seconds: u32) -> RadosResult<()> {
1913 self.conn_guard()?;
1914 let client_address = CString::new(client.to_string())?;
1915 unsafe {
1916 let ret_code = rados_blacklist_add(
1917 self.rados,
1918 client_address.as_ptr() as *mut c_char,
1919 expire_seconds,
1920 );
1921
1922 if ret_code < 0 {
1923 return Err(ret_code.into());
1924 }
1925 }
1926 Ok(())
1927 }
1928
1929 #[allow(unused_variables)]
1940 pub fn rados_pools(&self) -> RadosResult<Vec<String>> {
1941 self.conn_guard()?;
1942 let mut pools: Vec<String> = Vec::new();
1943 let pool_slice: &[u8];
1944 let mut pool_buffer: Vec<u8> = Vec::with_capacity(500);
1945
1946 unsafe {
1947 let len = rados_pool_list(
1948 self.rados,
1949 pool_buffer.as_mut_ptr() as *mut c_char,
1950 pool_buffer.capacity(),
1951 );
1952 if len > pool_buffer.capacity() as i32 {
1953 pool_buffer.reserve(len as usize);
1955 let len = rados_pool_list(
1956 self.rados,
1957 pool_buffer.as_mut_ptr() as *mut c_char,
1958 pool_buffer.capacity(),
1959 );
1960 pool_buffer.set_len(len as usize);
1962 } else {
1963 pool_buffer.set_len(len as usize);
1965 }
1966 }
1967 let mut cursor = Cursor::new(&pool_buffer);
1968 loop {
1969 let mut string_buf: Vec<u8> = Vec::new();
1970 let read = cursor.read_until(0x00, &mut string_buf)?;
1971 if read == 0 || read == 1 {
1974 break;
1975 } else {
1976 pools.push(String::from_utf8_lossy(&string_buf[..read - 1]).into_owned());
1978 }
1979 }
1980
1981 Ok(pools)
1982 }
1983
1984 pub fn rados_create_pool(&self, pool_name: &str) -> RadosResult<()> {
1988 self.conn_guard()?;
1989 let pool_name_str = CString::new(pool_name)?;
1990 unsafe {
1991 let ret_code = rados_pool_create(self.rados, pool_name_str.as_ptr());
1992 if ret_code < 0 {
1993 return Err(ret_code.into());
1994 }
1995 }
1996 Ok(())
1997 }
1998 pub fn rados_delete_pool(&self, pool_name: &str) -> RadosResult<()> {
2003 self.conn_guard()?;
2004 let pool_name_str = CString::new(pool_name)?;
2005 unsafe {
2006 let ret_code = rados_pool_delete(self.rados, pool_name_str.as_ptr());
2007 if ret_code < 0 {
2008 return Err(ret_code.into());
2009 }
2010 }
2011 Ok(())
2012 }
2013
2014 pub fn rados_lookup_pool(&self, pool_name: &str) -> RadosResult<Option<i64>> {
2017 self.conn_guard()?;
2018 let pool_name_str = CString::new(pool_name)?;
2019 unsafe {
2020 let ret_code: i64 = rados_pool_lookup(self.rados, pool_name_str.as_ptr());
2021 if ret_code >= 0 {
2022 Ok(Some(ret_code))
2023 } else if ret_code as i32 == -ENOENT {
2024 Ok(None)
2025 } else {
2026 Err((ret_code as i32).into())
2027 }
2028 }
2029 }
2030
2031 pub fn rados_reverse_lookup_pool(&self, pool_id: i64) -> RadosResult<String> {
2032 self.conn_guard()?;
2033 let mut buffer: Vec<u8> = Vec::with_capacity(500);
2034
2035 unsafe {
2036 let ret_code = rados_pool_reverse_lookup(
2037 self.rados,
2038 pool_id,
2039 buffer.as_mut_ptr() as *mut c_char,
2040 buffer.capacity(),
2041 );
2042 if ret_code == -ERANGE {
2043 buffer.reserve(1000);
2045 buffer.set_len(1000);
2046 let ret_code = rados_pool_reverse_lookup(
2047 self.rados,
2048 pool_id,
2049 buffer.as_mut_ptr() as *mut c_char,
2050 buffer.capacity(),
2051 );
2052 if ret_code < 0 {
2053 return Err(ret_code.into());
2054 }
2055 Ok(String::from_utf8_lossy(&buffer).into_owned())
2056 } else if ret_code < 0 {
2057 Err(ret_code.into())
2058 } else {
2059 Ok(String::from_utf8_lossy(&buffer).into_owned())
2060 }
2061 }
2062 }
2063}
2064
2065pub fn rados_libversion() -> RadosVersion {
2067 let mut major: c_int = 0;
2068 let mut minor: c_int = 0;
2069 let mut extra: c_int = 0;
2070 unsafe {
2071 rados_version(&mut major, &mut minor, &mut extra);
2072 }
2073 RadosVersion {
2074 major,
2075 minor,
2076 extra,
2077 }
2078}
2079
2080impl Rados {
2081 pub fn rados_stat_cluster(&self) -> RadosResult<Struct_rados_cluster_stat_t> {
2088 self.conn_guard()?;
2089 let mut cluster_stat = Struct_rados_cluster_stat_t::default();
2090 unsafe {
2091 let ret_code = rados_cluster_stat(self.rados, &mut cluster_stat);
2092 if ret_code < 0 {
2093 return Err(ret_code.into());
2094 }
2095 }
2096
2097 Ok(cluster_stat)
2098 }
2099
2100 pub fn rados_fsid(&self) -> RadosResult<Uuid> {
2101 self.conn_guard()?;
2102 let mut fsid_buffer: Vec<u8> = Vec::with_capacity(37);
2103 unsafe {
2104 let ret_code = rados_cluster_fsid(
2105 self.rados,
2106 fsid_buffer.as_mut_ptr() as *mut c_char,
2107 fsid_buffer.capacity(),
2108 );
2109 if ret_code < 0 {
2110 return Err(ret_code.into());
2111 }
2112 fsid_buffer.set_len(ret_code as usize);
2114 }
2115 let fsid_str = String::from_utf8(fsid_buffer)?;
2117 Ok(fsid_str.parse()?)
2119 }
2120
2121 pub fn ping_monitor(&self, mon_id: &str) -> RadosResult<String> {
2126 self.conn_guard()?;
2127
2128 let mon_id_str = CString::new(mon_id)?;
2129 let mut out_str: *mut c_char = ptr::null_mut();
2130 let mut str_length: usize = 0;
2131 unsafe {
2132 let ret_code = rados_ping_monitor(
2133 self.rados,
2134 mon_id_str.as_ptr(),
2135 &mut out_str,
2136 &mut str_length,
2137 );
2138 if ret_code < 0 {
2139 return Err(ret_code.into());
2140 }
2141 if !out_str.is_null() {
2142 let s_bytes = std::slice::from_raw_parts(out_str, str_length);
2144 let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
2146 rados_buffer_free(out_str);
2148 Ok(String::from_utf8_lossy(&bytes).into_owned())
2149 } else {
2150 Ok("".into())
2151 }
2152 }
2153 }
2154}
2155
2156pub fn ceph_version(socket: &str) -> Option<String> {
2162 let cmd = "version";
2163
2164 admin_socket_command(&cmd, socket).ok().and_then(|json| {
2165 json_data(&json)
2166 .and_then(|jsondata| json_find(jsondata, &[cmd]).map(|data| json_as_string(&data)))
2167 })
2168}
2169
2170pub fn ceph_version_parse() -> Option<String> {
2174 match run_cli("ceph --version") {
2175 Ok(output) => {
2176 let n = output.status.code().unwrap();
2177 if n == 0 {
2178 Some(String::from_utf8_lossy(&output.stdout).to_string())
2179 } else {
2180 Some(String::from_utf8_lossy(&output.stderr).to_string())
2181 }
2182 }
2183 Err(_) => None,
2184 }
2185}
2186
2187impl Rados {
2188 pub fn ceph_status(&self, keys: &[&str]) -> RadosResult<String> {
2190 self.conn_guard()?;
2191 match self.ceph_mon_command("prefix", "status", Some("json")) {
2192 Ok((json, _)) => match json {
2193 Some(json) => match json_data(&json) {
2194 Some(jsondata) => {
2195 if let Some(data) = json_find(jsondata, keys) {
2196 Ok(json_as_string(&data))
2197 } else {
2198 Err(RadosError::new(
2199 "The attributes were not found in the output.".to_string(),
2200 ))
2201 }
2202 }
2203 _ => Err(RadosError::new("JSON data not found.".to_string())),
2204 },
2205 _ => Err(RadosError::new("JSON data not found.".to_string())),
2206 },
2207 Err(e) => Err(e),
2208 }
2209 }
2210
2211 pub fn ceph_health_string(&self) -> RadosResult<String> {
2214 self.conn_guard()?;
2215 match self.ceph_mon_command("prefix", "health", None) {
2216 Ok((data, _)) => Ok(data.unwrap().replace("\n", "")),
2217 Err(e) => Err(e),
2218 }
2219 }
2220
2221 pub fn ceph_health(&self) -> CephHealth {
2226 match self.ceph_health_string() {
2227 Ok(health) => {
2228 if health.contains("HEALTH_OK") {
2229 CephHealth::Ok
2230 } else if health.contains("HEALTH_WARN") {
2231 CephHealth::Warning
2232 } else {
2233 CephHealth::Error
2234 }
2235 }
2236 Err(_) => CephHealth::Error,
2237 }
2238 }
2239
2240 pub fn ceph_command(
2242 &self,
2243 name: &str,
2244 value: &str,
2245 cmd_type: CephCommandTypes,
2246 keys: &[&str],
2247 ) -> RadosResult<JsonData> {
2248 self.conn_guard()?;
2249 match cmd_type {
2250 CephCommandTypes::Osd => Err(RadosError::new("OSD CMDs Not implemented.".to_string())),
2251 CephCommandTypes::Pgs => Err(RadosError::new("PGS CMDS Not implemented.".to_string())),
2252 _ => match self.ceph_mon_command(name, value, Some("json")) {
2253 Ok((json, _)) => match json {
2254 Some(json) => match json_data(&json) {
2255 Some(jsondata) => {
2256 if let Some(data) = json_find(jsondata, keys) {
2257 Ok(data)
2258 } else {
2259 Err(RadosError::new(
2260 "The attributes were not found in the output.".to_string(),
2261 ))
2262 }
2263 }
2264 _ => Err(RadosError::new("JSON data not found.".to_string())),
2265 },
2266 _ => Err(RadosError::new("JSON data not found.".to_string())),
2267 },
2268 Err(e) => Err(e),
2269 },
2270 }
2271 }
2272
2273 pub fn ceph_commands(&self, keys: Option<&[&str]>) -> RadosResult<JsonData> {
2275 self.conn_guard()?;
2276 match self.ceph_mon_command("prefix", "get_command_descriptions", Some("json")) {
2277 Ok((json, _)) => match json {
2278 Some(json) => match json_data(&json) {
2279 Some(jsondata) => {
2280 if let Some(k) = keys {
2281 if let Some(data) = json_find(jsondata, k) {
2282 Ok(data)
2283 } else {
2284 Err(RadosError::new(
2285 "The attributes were not found in the output.".to_string(),
2286 ))
2287 }
2288 } else {
2289 Ok(jsondata)
2290 }
2291 }
2292 _ => Err(RadosError::new("JSON data not found.".to_string())),
2293 },
2294 _ => Err(RadosError::new("JSON data not found.".to_string())),
2295 },
2296 Err(e) => Err(e),
2297 }
2298 }
2299
2300 pub fn ceph_mon_command(
2302 &self,
2303 name: &str,
2304 value: &str,
2305 format: Option<&str>,
2306 ) -> RadosResult<(Option<String>, Option<String>)> {
2307 let data: Vec<*mut c_char> = Vec::with_capacity(1);
2308 self.ceph_mon_command_with_data(name, value, format, data)
2309 }
2310
2311 pub fn ceph_mon_command_without_data(
2312 &self,
2313 cmd: &serde_json::Value,
2314 ) -> RadosResult<(Vec<u8>, Option<String>)> {
2315 self.conn_guard()?;
2316 let cmd_string = cmd.to_string();
2317 debug!("ceph_mon_command_without_data: {}", cmd_string);
2318 let data: Vec<*mut c_char> = Vec::with_capacity(1);
2319 let cmds = CString::new(cmd_string).unwrap();
2320
2321 let mut outbuf_len = 0;
2322 let mut outs = ptr::null_mut();
2323 let mut outs_len = 0;
2324
2325 let mut outbuf = ptr::null_mut();
2329 let mut out: Vec<u8> = vec![];
2330 let mut status_string: Option<String> = None;
2331
2332 debug!("Calling rados_mon_command with {:?}", cmd);
2333
2334 unsafe {
2335 let ret_code = rados_mon_command(
2337 self.rados,
2338 &mut cmds.as_ptr(),
2339 1,
2340 data.as_ptr() as *mut c_char,
2341 data.len() as usize,
2342 &mut outbuf,
2343 &mut outbuf_len,
2344 &mut outs,
2345 &mut outs_len,
2346 );
2347 debug!("return code: {}", ret_code);
2348 if ret_code < 0 {
2349 if outs_len > 0 && !outs.is_null() {
2350 let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
2351 rados_buffer_free(outs);
2352 return Err(RadosError::new(String::from_utf8_lossy(slice).into_owned()));
2353 }
2354 return Err(ret_code.into());
2355 }
2356
2357 if outbuf_len > 0 && !outbuf.is_null() {
2359 let slice = ::std::slice::from_raw_parts(outbuf as *const u8, outbuf_len as usize);
2360 out = slice.to_vec();
2361
2362 rados_buffer_free(outbuf);
2363 }
2364 if outs_len > 0 && !outs.is_null() {
2365 let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
2366 status_string = Some(String::from_utf8(slice.to_vec())?);
2367 rados_buffer_free(outs);
2368 }
2369 }
2370
2371 Ok((out, status_string))
2372 }
2373
2374 pub fn ceph_mon_command_with_data(
2377 &self,
2378 name: &str,
2379 value: &str,
2380 format: Option<&str>,
2381 data: Vec<*mut c_char>,
2382 ) -> RadosResult<(Option<String>, Option<String>)> {
2383 self.conn_guard()?;
2384
2385 let mut cmd_strings: Vec<String> = Vec::new();
2386 match format {
2387 Some(fmt) => cmd_strings.push(format!(
2388 "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2389 name, value, fmt
2390 )),
2391 None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2392 }
2393
2394 let cstrings: Vec<CString> = cmd_strings[..]
2395 .iter()
2396 .map(|s| CString::new(s.clone()).unwrap())
2397 .collect();
2398 let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2399
2400 let mut outbuf = ptr::null_mut();
2401 let mut outs = ptr::null_mut();
2402 let mut outbuf_len = 0;
2403 let mut outs_len = 0;
2404
2405 let mut str_outbuf: Option<String> = None;
2409 let mut str_outs: Option<String> = None;
2410
2411 debug!("Calling rados_mon_command with {:?}", cstrings);
2412
2413 unsafe {
2414 let ret_code = rados_mon_command(
2416 self.rados,
2417 cmds.as_mut_ptr(),
2418 1,
2419 data.as_ptr() as *mut c_char,
2420 data.len() as usize,
2421 &mut outbuf,
2422 &mut outbuf_len,
2423 &mut outs,
2424 &mut outs_len,
2425 );
2426 if ret_code < 0 {
2427 return Err(ret_code.into());
2428 }
2429
2430 if outbuf_len > 0 {
2432 let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2433 let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2434 let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2435 str_outbuf = Some(str_slice_outbuf.to_owned());
2436
2437 rados_buffer_free(outbuf);
2438 }
2439
2440 if outs_len > 0 {
2441 let c_str_outs: &CStr = CStr::from_ptr(outs);
2442 let buf_outs: &[u8] = c_str_outs.to_bytes();
2443 let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2444 str_outs = Some(str_slice_outs.to_owned());
2445
2446 rados_buffer_free(outs);
2447 }
2448 }
2449
2450 Ok((str_outbuf, str_outs))
2451 }
2452
2453 pub fn ceph_osd_command(
2455 &self,
2456 id: i32,
2457 name: &str,
2458 value: &str,
2459 format: Option<&str>,
2460 ) -> RadosResult<(Option<String>, Option<String>)> {
2461 let data: Vec<*mut c_char> = Vec::with_capacity(1);
2462 self.ceph_osd_command_with_data(id, name, value, format, data)
2463 }
2464
2465 pub fn ceph_osd_command_with_data(
2467 &self,
2468 id: i32,
2469 name: &str,
2470 value: &str,
2471 format: Option<&str>,
2472 data: Vec<*mut c_char>,
2473 ) -> RadosResult<(Option<String>, Option<String>)> {
2474 self.conn_guard()?;
2475
2476 let mut cmd_strings: Vec<String> = Vec::new();
2477 match format {
2478 Some(fmt) => cmd_strings.push(format!(
2479 "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2480 name, value, fmt
2481 )),
2482 None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2483 }
2484
2485 let cstrings: Vec<CString> = cmd_strings[..]
2486 .iter()
2487 .map(|s| CString::new(s.clone()).unwrap())
2488 .collect();
2489 let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2490
2491 let mut outbuf = ptr::null_mut();
2492 let mut outs = ptr::null_mut();
2493 let mut outbuf_len = 0;
2494 let mut outs_len = 0;
2495
2496 let mut str_outbuf: Option<String> = None;
2500 let mut str_outs: Option<String> = None;
2501
2502 unsafe {
2503 let ret_code = rados_osd_command(
2505 self.rados,
2506 id,
2507 cmds.as_mut_ptr(),
2508 1,
2509 data.as_ptr() as *mut c_char,
2510 data.len() as usize,
2511 &mut outbuf,
2512 &mut outbuf_len,
2513 &mut outs,
2514 &mut outs_len,
2515 );
2516 if ret_code < 0 {
2517 return Err(ret_code.into());
2518 }
2519
2520 if outbuf_len > 0 {
2522 let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2523 let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2524 let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2525 str_outbuf = Some(str_slice_outbuf.to_owned());
2526
2527 rados_buffer_free(outbuf);
2528 }
2529
2530 if outs_len > 0 {
2531 let c_str_outs: &CStr = CStr::from_ptr(outs);
2532 let buf_outs: &[u8] = c_str_outs.to_bytes();
2533 let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2534 str_outs = Some(str_slice_outs.to_owned());
2535
2536 rados_buffer_free(outs);
2537 }
2538 }
2539
2540 Ok((str_outbuf, str_outs))
2541 }
2542
2543 pub fn ceph_pgs_command(
2545 &self,
2546 pg: &str,
2547 name: &str,
2548 value: &str,
2549 format: Option<&str>,
2550 ) -> RadosResult<(Option<String>, Option<String>)> {
2551 let data: Vec<*mut c_char> = Vec::with_capacity(1);
2552 self.ceph_pgs_command_with_data(pg, name, value, format, data)
2553 }
2554
2555 pub fn ceph_pgs_command_with_data(
2557 &self,
2558 pg: &str,
2559 name: &str,
2560 value: &str,
2561 format: Option<&str>,
2562 data: Vec<*mut c_char>,
2563 ) -> RadosResult<(Option<String>, Option<String>)> {
2564 self.conn_guard()?;
2565
2566 let mut cmd_strings: Vec<String> = Vec::new();
2567 match format {
2568 Some(fmt) => cmd_strings.push(format!(
2569 "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2570 name, value, fmt
2571 )),
2572 None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2573 }
2574
2575 let pg_str = CString::new(pg).unwrap();
2576 let cstrings: Vec<CString> = cmd_strings[..]
2577 .iter()
2578 .map(|s| CString::new(s.clone()).unwrap())
2579 .collect();
2580 let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2581
2582 let mut outbuf = ptr::null_mut();
2583 let mut outs = ptr::null_mut();
2584 let mut outbuf_len = 0;
2585 let mut outs_len = 0;
2586
2587 let mut str_outbuf: Option<String> = None;
2591 let mut str_outs: Option<String> = None;
2592
2593 unsafe {
2594 let ret_code = rados_pg_command(
2596 self.rados,
2597 pg_str.as_ptr(),
2598 cmds.as_mut_ptr(),
2599 1,
2600 data.as_ptr() as *mut c_char,
2601 data.len() as usize,
2602 &mut outbuf,
2603 &mut outbuf_len,
2604 &mut outs,
2605 &mut outs_len,
2606 );
2607 if ret_code < 0 {
2608 return Err(ret_code.into());
2609 }
2610
2611 if outbuf_len > 0 {
2613 let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2614 let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2615 let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2616 str_outbuf = Some(str_slice_outbuf.to_owned());
2617
2618 rados_buffer_free(outbuf);
2619 }
2620
2621 if outs_len > 0 {
2622 let c_str_outs: &CStr = CStr::from_ptr(outs);
2623 let buf_outs: &[u8] = c_str_outs.to_bytes();
2624 let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2625 str_outs = Some(str_slice_outs.to_owned());
2626
2627 rados_buffer_free(outs);
2628 }
2629 }
2630
2631 Ok((str_outbuf, str_outs))
2632 }
2633}
2634
2635#[cfg(feature = "rados_striper")]
2636impl RadosStriper {
2637 pub fn inner(&self) -> &rados_striper_t {
2638 &self.rados_striper
2639 }
2640
2641 pub fn destroy_rados_striper(&self) {
2643 if self.rados_striper.is_null() {
2644 return;
2646 }
2647 unsafe {
2648 rados_striper_destroy(self.rados_striper);
2649 }
2650 }
2651
2652 fn rados_striper_guard(&self) -> RadosResult<()> {
2653 if self.rados_striper.is_null() {
2654 return Err(RadosError::new(
2655 "Rados striper not created. Please initialize first".to_string(),
2656 ));
2657 }
2658 Ok(())
2659 }
2660
2661 pub fn rados_object_write(
2664 &self,
2665 object_name: &str,
2666 buffer: &[u8],
2667 offset: u64,
2668 ) -> RadosResult<()> {
2669 self.rados_striper_guard()?;
2670 let obj_name_str = CString::new(object_name)?;
2671
2672 unsafe {
2673 let ret_code = rados_striper_write(
2674 self.rados_striper,
2675 obj_name_str.as_ptr(),
2676 buffer.as_ptr() as *const c_char,
2677 buffer.len(),
2678 offset,
2679 );
2680 if ret_code < 0 {
2681 return Err(ret_code.into());
2682 }
2683 }
2684 Ok(())
2685 }
2686
2687 pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
2691 self.rados_striper_guard()?;
2692 let obj_name_str = CString::new(object_name)?;
2693
2694 unsafe {
2695 let ret_code = rados_striper_write_full(
2696 self.rados_striper,
2697 obj_name_str.as_ptr(),
2698 buffer.as_ptr() as *const ::libc::c_char,
2699 buffer.len(),
2700 );
2701 if ret_code < 0 {
2702 return Err(ret_code.into());
2703 }
2704 }
2705 Ok(())
2706 }
2707
2708 pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
2710 self.rados_striper_guard()?;
2711 let obj_name_str = CString::new(object_name)?;
2712
2713 unsafe {
2714 let ret_code = rados_striper_append(
2715 self.rados_striper,
2716 obj_name_str.as_ptr(),
2717 buffer.as_ptr() as *const c_char,
2718 buffer.len(),
2719 );
2720 if ret_code < 0 {
2721 return Err(ret_code.into());
2722 }
2723 }
2724 Ok(())
2725 }
2726
2727 pub fn rados_object_read(
2734 &self,
2735 object_name: &str,
2736 fill_buffer: &mut Vec<u8>,
2737 read_offset: u64,
2738 ) -> RadosResult<i32> {
2739 self.rados_striper_guard()?;
2740 let object_name_str = CString::new(object_name)?;
2741 let mut len = fill_buffer.capacity();
2742 if len == 0 {
2743 fill_buffer.reserve_exact(1024 * 64);
2744 len = fill_buffer.capacity();
2745 }
2746
2747 unsafe {
2748 let ret_code = rados_striper_read(
2749 self.rados_striper,
2750 object_name_str.as_ptr(),
2751 fill_buffer.as_mut_ptr() as *mut c_char,
2752 len,
2753 read_offset,
2754 );
2755 if ret_code < 0 {
2756 return Err(ret_code.into());
2757 }
2758 fill_buffer.set_len(ret_code as usize);
2759 Ok(ret_code)
2760 }
2761 }
2762
2763 pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
2766 self.rados_striper_guard()?;
2767 let object_name_str = CString::new(object_name)?;
2768
2769 unsafe {
2770 let ret_code = rados_striper_remove(
2771 self.rados_striper,
2772 object_name_str.as_ptr() as *const c_char,
2773 );
2774 if ret_code < 0 {
2775 return Err(ret_code.into());
2776 }
2777 }
2778 Ok(())
2779 }
2780
2781 pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
2785 self.rados_striper_guard()?;
2786 let object_name_str = CString::new(object_name)?;
2787
2788 unsafe {
2789 let ret_code =
2790 rados_striper_trunc(self.rados_striper, object_name_str.as_ptr(), new_size);
2791 if ret_code < 0 {
2792 return Err(ret_code.into());
2793 }
2794 }
2795 Ok(())
2796 }
2797
2798 pub fn rados_object_getxattr(
2800 &self,
2801 object_name: &str,
2802 attr_name: &str,
2803 fill_buffer: &mut [u8],
2804 ) -> RadosResult<i32> {
2805 self.rados_striper_guard()?;
2806 let object_name_str = CString::new(object_name)?;
2807 let attr_name_str = CString::new(attr_name)?;
2808
2809 unsafe {
2810 let ret_code = rados_striper_getxattr(
2811 self.rados_striper,
2812 object_name_str.as_ptr() as *const c_char,
2813 attr_name_str.as_ptr() as *const c_char,
2814 fill_buffer.as_mut_ptr() as *mut c_char,
2815 fill_buffer.len(),
2816 );
2817 if ret_code < 0 {
2818 return Err(ret_code.into());
2819 }
2820 Ok(ret_code)
2821 }
2822 }
2823
2824 pub fn rados_object_setxattr(
2826 &self,
2827 object_name: &str,
2828 attr_name: &str,
2829 attr_value: &mut [u8],
2830 ) -> RadosResult<()> {
2831 self.rados_striper_guard()?;
2832 let object_name_str = CString::new(object_name)?;
2833 let attr_name_str = CString::new(attr_name)?;
2834
2835 unsafe {
2836 let ret_code = rados_striper_setxattr(
2837 self.rados_striper,
2838 object_name_str.as_ptr() as *const c_char,
2839 attr_name_str.as_ptr() as *const c_char,
2840 attr_value.as_mut_ptr() as *mut c_char,
2841 attr_value.len(),
2842 );
2843 if ret_code < 0 {
2844 return Err(ret_code.into());
2845 }
2846 }
2847 Ok(())
2848 }
2849
2850 pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
2852 self.rados_striper_guard()?;
2853 let object_name_str = CString::new(object_name)?;
2854 let attr_name_str = CString::new(attr_name)?;
2855
2856 unsafe {
2857 let ret_code = rados_striper_rmxattr(
2858 self.rados_striper,
2859 object_name_str.as_ptr() as *const c_char,
2860 attr_name_str.as_ptr() as *const c_char,
2861 );
2862 if ret_code < 0 {
2863 return Err(ret_code.into());
2864 }
2865 }
2866 Ok(())
2867 }
2868
2869 pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
2872 self.rados_striper_guard()?;
2873 let object_name_str = CString::new(object_name)?;
2874 let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
2875
2876 unsafe {
2877 let ret_code = rados_striper_getxattrs(
2878 self.rados_striper,
2879 object_name_str.as_ptr(),
2880 &mut xattr_iterator_handle,
2881 );
2882 if ret_code < 0 {
2883 return Err(ret_code.into());
2884 }
2885 }
2886 Ok(xattr_iterator_handle)
2887 }
2888
2889 pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
2891 self.rados_striper_guard()?;
2892 let object_name_str = CString::new(object_name)?;
2893 let mut psize: u64 = 0;
2894 let mut time: ::libc::time_t = 0;
2895
2896 unsafe {
2897 let ret_code = rados_striper_stat(
2898 self.rados_striper,
2899 object_name_str.as_ptr(),
2900 &mut psize,
2901 &mut time,
2902 );
2903 if ret_code < 0 {
2904 return Err(ret_code.into());
2905 }
2906 }
2907 Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
2908 }
2909}