1#![cfg(unix)]
15#![allow(unused_imports)]
16
17use crate::JsonData;
18
19use crate::admin_sockets::*;
20use crate::error::*;
21use crate::json::*;
22use crate::JsonValue;
23use byteorder::{LittleEndian, WriteBytesExt};
24use libc::*;
25use nom::number::complete::le_u32;
26use nom::IResult;
27use serde_json;
28
29use crate::rados::*;
30#[cfg(feature = "rados_striper")]
31use crate::rados_striper::*;
32use crate::status::*;
33use std::ffi::{CStr, CString};
34use std::marker::PhantomData;
35use std::{ptr, str};
36
37use crate::utils::*;
38use std::io::{BufRead, Cursor};
39use std::net::IpAddr;
40use std::time::{Duration, SystemTime, UNIX_EPOCH};
41
42use uuid::Uuid;
43
// One-character opcodes that tag each serialized tmap operation. They are written
// as the first byte by `TmapOperation::serialize` and matched by the `parse_*` parsers.
const CEPH_OSD_TMAP_HDR: char = 'h'; // tags a `TmapOperation::Header` record
const CEPH_OSD_TMAP_SET: char = 's'; // tags a `TmapOperation::Set` record
const CEPH_OSD_TMAP_CREATE: char = 'c'; // tags a `TmapOperation::Create` record
const CEPH_OSD_TMAP_RM: char = 'r'; // tags a `TmapOperation::Remove` record
48
/// Health levels with three variants: `Ok`, `Warning` and `Error`.
// NOTE(review): presumably mirrors the cluster health states reported by Ceph — confirm
// against the code that produces these values.
#[derive(Debug, Clone)]
pub enum CephHealth {
    Ok,
    Warning,
    Error,
}
55
/// Kinds of command target: `Mon`, `Osd` or `Pgs`.
// NOTE(review): presumably selects which daemon class a command is sent to — confirm
// against the callers, which are not part of this section.
#[derive(Debug, Clone)]
pub enum CephCommandTypes {
    Mon,
    Osd,
    Pgs,
}
62
63named!(
64 parse_header<TmapOperation>,
65 do_parse!(
66 char!(CEPH_OSD_TMAP_HDR)
67 >> data_len: le_u32
68 >> data: take!(data_len)
69 >> (TmapOperation::Header {
70 data: data.to_vec()
71 })
72 )
73);
74
75named!(
76 parse_create<TmapOperation>,
77 do_parse!(
78 char!(CEPH_OSD_TMAP_CREATE)
79 >> key_name_len: le_u32
80 >> key_name: take_str!(key_name_len)
81 >> data_len: le_u32
82 >> data: take!(data_len)
83 >> (TmapOperation::Create {
84 name: key_name.to_string(),
85 data: data.to_vec(),
86 })
87 )
88);
89
90named!(
91 parse_set<TmapOperation>,
92 do_parse!(
93 char!(CEPH_OSD_TMAP_SET)
94 >> key_name_len: le_u32
95 >> key_name: take_str!(key_name_len)
96 >> data_len: le_u32
97 >> data: take!(data_len)
98 >> (TmapOperation::Set {
99 key: key_name.to_string(),
100 data: data.to_vec(),
101 })
102 )
103);
104
105named!(
106 parse_remove<TmapOperation>,
107 do_parse!(
108 char!(CEPH_OSD_TMAP_RM)
109 >> key_name_len: le_u32
110 >> key_name: take_str!(key_name_len)
111 >> (TmapOperation::Remove {
112 name: key_name.to_string(),
113 })
114 )
115);
116
/// A single tmap operation, in the form produced by `TmapOperation::deserialize`
/// and consumed by `TmapOperation::serialize`.
#[derive(Debug)]
pub enum TmapOperation {
    /// Header record carrying an opaque byte payload.
    Header { data: Vec<u8> },
    /// Set record: a key and the bytes to store under it.
    Set { key: String, data: Vec<u8> },
    /// Create record: a key name and its initial bytes.
    Create { name: String, data: Vec<u8> },
    /// Remove record: the key name to remove.
    Remove { name: String },
}
124
125impl TmapOperation {
126 fn serialize(&self) -> RadosResult<Vec<u8>> {
127 let mut buffer: Vec<u8> = Vec::new();
128 match *self {
129 TmapOperation::Header { ref data } => {
130 buffer.push(CEPH_OSD_TMAP_HDR as u8);
131 buffer.write_u32::<LittleEndian>(data.len() as u32)?;
132 buffer.extend_from_slice(data);
133 }
134 TmapOperation::Set { ref key, ref data } => {
135 buffer.push(CEPH_OSD_TMAP_SET as u8);
136 buffer.write_u32::<LittleEndian>(key.len() as u32)?;
137 buffer.extend(key.as_bytes());
138 buffer.write_u32::<LittleEndian>(data.len() as u32)?;
139 buffer.extend_from_slice(data);
140 }
141 TmapOperation::Create { ref name, ref data } => {
142 buffer.push(CEPH_OSD_TMAP_CREATE as u8);
143 buffer.write_u32::<LittleEndian>(name.len() as u32)?;
144 buffer.extend(name.as_bytes());
145 buffer.write_u32::<LittleEndian>(data.len() as u32)?;
146 buffer.extend_from_slice(data);
147 }
148 TmapOperation::Remove { ref name } => {
149 buffer.push(CEPH_OSD_TMAP_RM as u8);
150 buffer.write_u32::<LittleEndian>(name.len() as u32)?;
151 buffer.extend(name.as_bytes());
152 }
153 }
154 Ok(buffer)
155 }
156
157 fn deserialize(input: &[u8]) -> IResult<&[u8], Vec<TmapOperation>> {
158 many0!(
159 input,
160 alt!(
161 complete!(parse_header)
162 | complete!(parse_create)
163 | complete!(parse_set)
164 | complete!(parse_remove)
165 )
166 )
167 }
168}
169
/// Iterator over the objects of a pool, driven by a librados object-listing context.
#[derive(Debug)]
pub struct Pool {
    /// Listing context handle passed to `rados_nobjects_list_next` on every step.
    pub ctx: rados_list_ctx_t,
}
175
/// One entry yielded while iterating a `Pool`.
#[derive(Debug)]
pub struct CephObject {
    /// Object name (lossily converted to UTF-8).
    pub name: String,
    /// Locator key reported for the object; empty when librados returned none.
    pub entry_locator: String,
    /// Namespace reported for the object; empty when librados returned none.
    pub namespace: String,
}
182
183impl Iterator for Pool {
184 type Item = CephObject;
185 fn next(&mut self) -> Option<CephObject> {
186 let mut entry_ptr: *mut *const ::libc::c_char = ptr::null_mut();
187 let mut key_ptr: *mut *const ::libc::c_char = ptr::null_mut();
188 let mut nspace_ptr: *mut *const ::libc::c_char = ptr::null_mut();
189
190 unsafe {
191 let ret_code =
192 rados_nobjects_list_next(self.ctx, &mut entry_ptr, &mut key_ptr, &mut nspace_ptr);
193 if ret_code == -ENOENT {
194 rados_nobjects_list_close(self.ctx);
196 None
197 } else if ret_code < 0 {
198 None
200 } else {
201 let object_name = CStr::from_ptr(entry_ptr as *const ::libc::c_char);
202 let mut object_locator = String::new();
203 let mut namespace = String::new();
204 if !key_ptr.is_null() {
205 object_locator.push_str(
206 &CStr::from_ptr(key_ptr as *const ::libc::c_char).to_string_lossy(),
207 );
208 }
209 if !nspace_ptr.is_null() {
210 namespace.push_str(
211 &CStr::from_ptr(nspace_ptr as *const ::libc::c_char).to_string_lossy(),
212 );
213 }
214
215 Some(CephObject {
216 name: object_name.to_string_lossy().into_owned(),
217 entry_locator: object_locator,
218 namespace,
219 })
220 }
221 }
222 }
223}
224
/// A librados read operation together with the object it targets.
#[derive(Debug)]
pub struct ReadOperation {
    /// Name of the object the operation is run against.
    pub object_name: String,
    /// Flags handed to `rados_read_op_operate` (cast to `i32` at the call site).
    pub flags: u32,
    /// Raw librados handle; released when this value is dropped.
    read_op_handle: rados_read_op_t,
}

impl Drop for ReadOperation {
    /// Releases the underlying librados read-op handle.
    fn drop(&mut self) {
        unsafe {
            rados_release_read_op(self.read_op_handle);
        }
    }
}
245
/// A librados write operation together with the object it targets.
#[derive(Debug)]
pub struct WriteOperation {
    /// Name of the object the operation is run against.
    pub object_name: String,
    /// Flags handed to `rados_write_op_operate` (cast to `i32` at the call site).
    pub flags: u32,
    /// Modification time passed by pointer to `rados_write_op_operate`.
    pub mtime: time_t,
    /// Raw librados handle; released when this value is dropped.
    write_op_handle: rados_write_op_t,
}

impl Drop for WriteOperation {
    /// Releases the underlying librados write-op handle.
    fn drop(&mut self) {
        unsafe {
            rados_release_write_op(self.write_op_handle);
        }
    }
}
267
/// An extended attribute of an object; the same type also acts as the iterator
/// that produces the attributes.
#[derive(Debug)]
pub struct XAttr {
    /// Attribute name (empty on a freshly constructed iterator).
    pub name: String,
    /// Attribute value, lossily converted to UTF-8 (empty on a fresh iterator).
    pub value: String,
    /// librados xattr iterator handle the entries are pulled from.
    iter: rados_xattrs_iter_t,
}
276
/// A three-part version number.
// NOTE(review): presumably the librados library version — the code that fills this
// in is not part of this section, confirm there.
#[derive(Debug)]
pub struct RadosVersion {
    pub major: i32,
    pub minor: i32,
    pub extra: i32,
}
284
285impl XAttr {
286 pub fn new(iter: rados_xattrs_iter_t) -> XAttr {
289 XAttr {
290 name: String::new(),
291 value: String::new(),
292 iter,
293 }
294 }
295}
296
297impl Iterator for XAttr {
298 type Item = XAttr;
299
300 fn next(&mut self) -> Option<Self::Item> {
301 let mut name: *const c_char = ptr::null();
303 let mut value: *const c_char = ptr::null();
305 let mut val_length: usize = 0;
306 unsafe {
307 let ret_code = rados_getxattrs_next(self.iter, &mut name, &mut value, &mut val_length);
308
309 if ret_code < 0 {
310 None
312 }
313 else if value.is_null() && val_length == 0 {
315 rados_getxattrs_end(self.iter);
316 None
317 } else {
318 let name = CStr::from_ptr(name);
319 let s_bytes = std::slice::from_raw_parts(value, val_length);
321 let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
323 Some(XAttr {
324 name: name.to_string_lossy().into_owned(),
325 value: String::from_utf8_lossy(&bytes).into_owned(),
326 iter: self.iter,
327 })
328 }
329 }
330 }
331}
332
333pub struct IoCtx {
335 ioctx: rados_ioctx_t,
336}
337
338unsafe impl Send for IoCtx {}
339unsafe impl Sync for IoCtx {}
340
341impl Drop for IoCtx {
342 fn drop(&mut self) {
343 if !self.ioctx.is_null() {
344 unsafe {
345 rados_ioctx_destroy(self.ioctx);
346 }
347 }
348 }
349}
350
/// Owner of a librados striper handle (created by `IoCtx::get_rados_striper`).
#[cfg(feature = "rados_striper")]
pub struct RadosStriper {
    // Raw striper handle; destroyed on drop unless null.
    rados_striper: rados_ioctx_t,
}

#[cfg(feature = "rados_striper")]
impl Drop for RadosStriper {
    /// Destroys the striper if a handle is present.
    fn drop(&mut self) {
        if self.rados_striper.is_null() {
            return;
        }
        unsafe { rados_striper_destroy(self.rados_striper) };
    }
}
367
368pub struct Rados {
370 rados: rados_t,
371 phantom: PhantomData<IoCtx>,
372}
373
374unsafe impl Sync for Rados {}
375
376impl Drop for Rados {
377 fn drop(&mut self) {
378 if !self.rados.is_null() {
379 unsafe {
380 rados_shutdown(self.rados);
381 }
382 }
383 }
384}
385
386pub fn connect_to_ceph(user_id: &str, config_file: &str) -> RadosResult<Rados> {
388 let connect_id = CString::new(user_id)?;
389 let conf_file = CString::new(config_file)?;
390 unsafe {
391 let mut cluster_handle: rados_t = ptr::null_mut();
392 let ret_code = rados_create(&mut cluster_handle, connect_id.as_ptr());
393 if ret_code < 0 {
394 return Err(ret_code.into());
395 }
396 let ret_code = rados_conf_read_file(cluster_handle, conf_file.as_ptr());
397 if ret_code < 0 {
398 return Err(ret_code.into());
399 }
400 let ret_code = rados_connect(cluster_handle);
401 if ret_code < 0 {
402 return Err(ret_code.into());
403 }
404 Ok(Rados {
405 rados: cluster_handle,
406 phantom: PhantomData,
407 })
408 }
409}
410
411impl Rados {
    /// Borrows the raw librados cluster handle wrapped by this `Rados`.
    pub fn inner(&self) -> &rados_t {
        &self.rados
    }
415
416 pub fn disconnect_from_ceph(&self) {
420 if self.rados.is_null() {
421 return;
423 }
424 unsafe {
425 rados_shutdown(self.rados);
426 }
427 }
428
429 fn conn_guard(&self) -> RadosResult<()> {
430 if self.rados.is_null() {
431 return Err(RadosError::new(
432 "Rados not connected. Please initialize cluster".to_string(),
433 ));
434 }
435 Ok(())
436 }
437
438 pub fn config_set(&self, name: &str, value: &str) -> RadosResult<()> {
440 if !self.rados.is_null() {
441 return Err(RadosError::new(
442 "Rados should not be connected when this function is called".into(),
443 ));
444 }
445 let name_str = CString::new(name)?;
446 let value_str = CString::new(value)?;
447 unsafe {
448 let ret_code = rados_conf_set(self.rados, name_str.as_ptr(), value_str.as_ptr());
449 if ret_code < 0 {
450 return Err(ret_code.into());
451 }
452 }
453 Ok(())
454 }
455
456 pub fn config_get(&self, name: &str) -> RadosResult<String> {
458 let name_str = CString::new(name)?;
459 let mut buffer: Vec<u8> = Vec::with_capacity(5120);
461 unsafe {
462 let ret_code = rados_conf_get(
463 self.rados,
464 name_str.as_ptr(),
465 buffer.as_mut_ptr() as *mut c_char,
466 buffer.capacity(),
467 );
468 if ret_code < 0 {
469 return Err(ret_code.into());
470 }
471 buffer.set_len(5120);
473 let num_bytes = buffer.iter().position(|x| x == &0u8);
475 buffer.set_len(num_bytes.unwrap_or(0));
476 Ok(String::from_utf8_lossy(&buffer).into_owned())
477 }
478 }
479
480 pub fn get_rados_ioctx(&self, pool_name: &str) -> RadosResult<IoCtx> {
484 self.conn_guard()?;
485 let pool_name_str = CString::new(pool_name)?;
486 unsafe {
487 let mut ioctx: rados_ioctx_t = ptr::null_mut();
488 let ret_code = rados_ioctx_create(self.rados, pool_name_str.as_ptr(), &mut ioctx);
489 if ret_code < 0 {
490 return Err(ret_code.into());
491 }
492 Ok(IoCtx { ioctx })
493 }
494 }
495
496 pub fn get_rados_ioctx2(&self, pool_id: i64) -> RadosResult<IoCtx> {
500 self.conn_guard()?;
501 unsafe {
502 let mut ioctx: rados_ioctx_t = ptr::null_mut();
503 let ret_code = rados_ioctx_create2(self.rados, pool_id, &mut ioctx);
504 if ret_code < 0 {
505 return Err(ret_code.into());
506 }
507 Ok(IoCtx { ioctx })
508 }
509 }
510}
511
512impl IoCtx {
    /// Borrows the raw librados I/O context handle wrapped by this `IoCtx`.
    pub fn inner(&self) -> &rados_ioctx_t {
        &self.ioctx
    }
516
517 pub fn destroy_rados_ioctx(&self) {
525 if self.ioctx.is_null() {
526 return;
528 }
529 unsafe {
530 rados_ioctx_destroy(self.ioctx);
531 }
532 }
533 fn ioctx_guard(&self) -> RadosResult<()> {
534 if self.ioctx.is_null() {
535 return Err(RadosError::new(
536 "Rados ioctx not created. Please initialize first".to_string(),
537 ));
538 }
539 Ok(())
540 }
541 pub fn rados_stat_pool(&self) -> RadosResult<Struct_rados_pool_stat_t> {
543 self.ioctx_guard()?;
544 let mut pool_stat = Struct_rados_pool_stat_t::default();
545 unsafe {
546 let ret_code = rados_ioctx_pool_stat(self.ioctx, &mut pool_stat);
547 if ret_code < 0 {
548 return Err(ret_code.into());
549 }
550 Ok(pool_stat)
551 }
552 }
553
554 pub fn rados_pool_set_auid(&self, auid: u64) -> RadosResult<()> {
555 self.ioctx_guard()?;
556 unsafe {
557 let ret_code = rados_ioctx_pool_set_auid(self.ioctx, auid);
558 if ret_code < 0 {
559 return Err(ret_code.into());
560 }
561 Ok(())
562 }
563 }
564
565 pub fn rados_pool_get_auid(&self) -> RadosResult<u64> {
566 self.ioctx_guard()?;
567 let mut auid: u64 = 0;
568 unsafe {
569 let ret_code = rados_ioctx_pool_get_auid(self.ioctx, &mut auid);
570 if ret_code < 0 {
571 return Err(ret_code.into());
572 }
573 Ok(auid)
574 }
575 }
576
577 pub fn rados_pool_requires_alignment(&self) -> RadosResult<bool> {
579 self.ioctx_guard()?;
580 unsafe {
581 let ret_code = rados_ioctx_pool_requires_alignment(self.ioctx);
582 if ret_code < 0 {
583 return Err(ret_code.into());
584 }
585 if ret_code == 0 {
586 Ok(false)
587 } else {
588 Ok(true)
589 }
590 }
591 }
592
593 pub fn rados_pool_required_alignment(&self) -> RadosResult<u64> {
595 self.ioctx_guard()?;
596 unsafe {
597 let ret_code = rados_ioctx_pool_required_alignment(self.ioctx);
598 Ok(ret_code)
599 }
600 }
601
602 pub fn rados_object_get_id(&self) -> RadosResult<i64> {
604 self.ioctx_guard()?;
605 unsafe {
606 let pool_id = rados_ioctx_get_id(self.ioctx);
607 Ok(pool_id)
608 }
609 }
610
611 pub fn rados_get_pool_name(&self) -> RadosResult<String> {
613 self.ioctx_guard()?;
614 let mut buffer: Vec<u8> = Vec::with_capacity(500);
615
616 unsafe {
617 let ret_code = rados_ioctx_get_pool_name(
619 self.ioctx,
620 buffer.as_mut_ptr() as *mut c_char,
621 buffer.capacity() as c_uint,
622 );
623 if ret_code == -ERANGE {
624 buffer.reserve(1000);
626 buffer.set_len(1000);
627 let ret_code = rados_ioctx_get_pool_name(
628 self.ioctx,
629 buffer.as_mut_ptr() as *mut c_char,
630 buffer.capacity() as c_uint,
631 );
632 if ret_code < 0 {
633 return Err(ret_code.into());
634 }
635 Ok(String::from_utf8_lossy(&buffer).into_owned())
636 } else if ret_code < 0 {
637 Err(ret_code.into())
638 } else {
639 buffer.set_len(ret_code as usize);
640 Ok(String::from_utf8_lossy(&buffer).into_owned())
641 }
642 }
643 }
644
645 pub fn rados_locator_set_key(&self, key: &str) -> RadosResult<()> {
647 self.ioctx_guard()?;
648 let key_str = CString::new(key)?;
649 unsafe {
650 rados_ioctx_locator_set_key(self.ioctx, key_str.as_ptr());
651 }
652 Ok(())
653 }
654
655 pub fn rados_set_namespace(&self, namespace: &str) -> RadosResult<()> {
659 self.ioctx_guard()?;
660 let namespace_str = CString::new(namespace)?;
661 unsafe {
662 rados_ioctx_set_namespace(self.ioctx, namespace_str.as_ptr());
663 }
664 Ok(())
665 }
666
667 pub fn rados_list_pool_objects(&self) -> RadosResult<rados_list_ctx_t> {
669 self.ioctx_guard()?;
670 let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
671 unsafe {
672 let ret_code = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
673 if ret_code < 0 {
674 return Err(ret_code.into());
675 }
676 }
677 Ok(rados_list_ctx)
678 }
679
680 pub fn rados_snap_create(&self, snap_name: &str) -> RadosResult<()> {
682 self.ioctx_guard()?;
683
684 let snap_name_str = CString::new(snap_name)?;
685 unsafe {
686 let ret_code = rados_ioctx_snap_create(self.ioctx, snap_name_str.as_ptr());
687 if ret_code < 0 {
688 return Err(ret_code.into());
689 }
690 }
691 Ok(())
692 }
693
694 pub fn rados_snap_remove(&self, snap_name: &str) -> RadosResult<()> {
696 self.ioctx_guard()?;
697 let snap_name_str = CString::new(snap_name)?;
698
699 unsafe {
700 let ret_code = rados_ioctx_snap_remove(self.ioctx, snap_name_str.as_ptr());
701 if ret_code < 0 {
702 return Err(ret_code.into());
703 }
704 }
705 Ok(())
706 }
707
708 pub fn rados_snap_rollback(&self, object_name: &str, snap_name: &str) -> RadosResult<()> {
712 self.ioctx_guard()?;
713 let snap_name_str = CString::new(snap_name)?;
714 let object_name_str = CString::new(object_name)?;
715
716 unsafe {
717 let ret_code = rados_ioctx_snap_rollback(
718 self.ioctx,
719 object_name_str.as_ptr(),
720 snap_name_str.as_ptr(),
721 );
722 if ret_code < 0 {
723 return Err(ret_code.into());
724 }
725 }
726 Ok(())
727 }
728
729 pub fn rados_snap_set_read(&self, snap_id: u64) -> RadosResult<()> {
733 self.ioctx_guard()?;
734
735 unsafe {
736 rados_ioctx_snap_set_read(self.ioctx, snap_id);
737 }
738 Ok(())
739 }
740
741 pub fn rados_selfmanaged_snap_create(&self) -> RadosResult<u64> {
746 self.ioctx_guard()?;
747 let mut snap_id: u64 = 0;
748 unsafe {
749 let ret_code = rados_ioctx_selfmanaged_snap_create(self.ioctx, &mut snap_id);
750 if ret_code < 0 {
751 return Err(ret_code.into());
752 }
753 }
754 Ok(snap_id)
755 }
756
757 pub fn rados_selfmanaged_snap_remove(&self, snap_id: u64) -> RadosResult<()> {
761 self.ioctx_guard()?;
762
763 unsafe {
764 let ret_code = rados_ioctx_selfmanaged_snap_remove(self.ioctx, snap_id);
765 if ret_code < 0 {
766 return Err(ret_code.into());
767 }
768 }
769 Ok(())
770 }
771
772 pub fn rados_selfmanaged_snap_rollback(
776 &self,
777 object_name: &str,
778 snap_id: u64,
779 ) -> RadosResult<()> {
780 self.ioctx_guard()?;
781 let object_name_str = CString::new(object_name)?;
782
783 unsafe {
784 let ret_code = rados_ioctx_selfmanaged_snap_rollback(
785 self.ioctx,
786 object_name_str.as_ptr(),
787 snap_id,
788 );
789 if ret_code < 0 {
790 return Err(ret_code.into());
791 }
792 }
793 Ok(())
794 }
795
796 pub fn rados_snap_lookup(&self, snap_name: &str) -> RadosResult<u64> {
830 self.ioctx_guard()?;
831 let snap_name_str = CString::new(snap_name)?;
832 let mut snap_id: u64 = 0;
833 unsafe {
834 let ret_code =
835 rados_ioctx_snap_lookup(self.ioctx, snap_name_str.as_ptr(), &mut snap_id);
836 if ret_code < 0 {
837 return Err(ret_code.into());
838 }
839 }
840 Ok(snap_id)
841 }
842
843 pub fn rados_snap_get_name(&self, snap_id: u64) -> RadosResult<String> {
845 self.ioctx_guard()?;
846
847 let out_buffer: Vec<u8> = Vec::with_capacity(500);
848 let out_buff_size = out_buffer.capacity();
849 let out_str = CString::new(out_buffer)?;
850 unsafe {
851 let ret_code = rados_ioctx_snap_get_name(
852 self.ioctx,
853 snap_id,
854 out_str.as_ptr() as *mut c_char,
855 out_buff_size as c_int,
856 );
857 if ret_code == -ERANGE {}
858 if ret_code < 0 {
859 return Err(ret_code.into());
860 }
861 }
862 Ok(out_str.to_string_lossy().into_owned())
863 }
864
865 pub fn rados_snap_get_stamp(&self, snap_id: u64) -> RadosResult<time_t> {
867 self.ioctx_guard()?;
868
869 let mut time_id: time_t = 0;
870 unsafe {
871 let ret_code = rados_ioctx_snap_get_stamp(self.ioctx, snap_id, &mut time_id);
872 if ret_code < 0 {
873 return Err(ret_code.into());
874 }
875 }
876 Ok(time_id)
877 }
878
879 pub fn rados_get_object_last_version(&self) -> RadosResult<u64> {
883 self.ioctx_guard()?;
884 unsafe {
885 let obj_id = rados_get_last_version(self.ioctx);
886 Ok(obj_id)
887 }
888 }
889
890 pub fn rados_object_write(
893 &self,
894 object_name: &str,
895 buffer: &[u8],
896 offset: u64,
897 ) -> RadosResult<()> {
898 self.ioctx_guard()?;
899 let obj_name_str = CString::new(object_name)?;
900
901 unsafe {
902 let ret_code = rados_write(
903 self.ioctx,
904 obj_name_str.as_ptr(),
905 buffer.as_ptr() as *const c_char,
906 buffer.len(),
907 offset,
908 );
909 if ret_code < 0 {
910 return Err(ret_code.into());
911 }
912 }
913 Ok(())
914 }
915
916 pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
920 self.ioctx_guard()?;
921 let obj_name_str = CString::new(object_name)?;
922
923 unsafe {
924 let ret_code = rados_write_full(
925 self.ioctx,
926 obj_name_str.as_ptr(),
927 buffer.as_ptr() as *const ::libc::c_char,
928 buffer.len(),
929 );
930 if ret_code < 0 {
931 return Err(ret_code.into());
932 }
933 }
934 Ok(())
935 }
936
937 pub fn rados_object_clone_range(
944 &self,
945 dst_object_name: &str,
946 dst_offset: u64,
947 src_object_name: &str,
948 src_offset: u64,
949 length: usize,
950 ) -> RadosResult<()> {
951 self.ioctx_guard()?;
952 let dst_name_str = CString::new(dst_object_name)?;
953 let src_name_str = CString::new(src_object_name)?;
954
955 unsafe {
956 let ret_code = rados_clone_range(
957 self.ioctx,
958 dst_name_str.as_ptr(),
959 dst_offset,
960 src_name_str.as_ptr(),
961 src_offset,
962 length,
963 );
964 if ret_code < 0 {
965 return Err(ret_code.into());
966 }
967 }
968 Ok(())
969 }
970
971 pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
973 self.ioctx_guard()?;
974 let obj_name_str = CString::new(object_name)?;
975
976 unsafe {
977 let ret_code = rados_append(
978 self.ioctx,
979 obj_name_str.as_ptr(),
980 buffer.as_ptr() as *const c_char,
981 buffer.len(),
982 );
983 if ret_code < 0 {
984 return Err(ret_code.into());
985 }
986 }
987 Ok(())
988 }
989
990 pub fn rados_object_read(
997 &self,
998 object_name: &str,
999 fill_buffer: &mut Vec<u8>,
1000 read_offset: u64,
1001 ) -> RadosResult<i32> {
1002 self.ioctx_guard()?;
1003 let object_name_str = CString::new(object_name)?;
1004 let mut len = fill_buffer.capacity();
1005 if len == 0 {
1006 fill_buffer.reserve_exact(1024 * 64);
1007 len = fill_buffer.capacity();
1008 }
1009
1010 unsafe {
1011 let ret_code = rados_read(
1012 self.ioctx,
1013 object_name_str.as_ptr(),
1014 fill_buffer.as_mut_ptr() as *mut c_char,
1015 len,
1016 read_offset,
1017 );
1018 if ret_code < 0 {
1019 return Err(ret_code.into());
1020 }
1021 fill_buffer.set_len(ret_code as usize);
1022 Ok(ret_code)
1023 }
1024 }
1025
1026 pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
1029 self.ioctx_guard()?;
1030 let object_name_str = CString::new(object_name)?;
1031
1032 unsafe {
1033 let ret_code = rados_remove(self.ioctx, object_name_str.as_ptr() as *const c_char);
1034 if ret_code < 0 {
1035 return Err(ret_code.into());
1036 }
1037 }
1038 Ok(())
1039 }
1040
1041 pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
1045 self.ioctx_guard()?;
1046 let object_name_str = CString::new(object_name)?;
1047
1048 unsafe {
1049 let ret_code = rados_trunc(self.ioctx, object_name_str.as_ptr(), new_size);
1050 if ret_code < 0 {
1051 return Err(ret_code.into());
1052 }
1053 }
1054 Ok(())
1055 }
1056
1057 pub fn rados_object_getxattr(
1059 &self,
1060 object_name: &str,
1061 attr_name: &str,
1062 fill_buffer: &mut [u8],
1063 ) -> RadosResult<i32> {
1064 self.ioctx_guard()?;
1065 let object_name_str = CString::new(object_name)?;
1066 let attr_name_str = CString::new(attr_name)?;
1067
1068 unsafe {
1069 let ret_code = rados_getxattr(
1070 self.ioctx,
1071 object_name_str.as_ptr() as *const c_char,
1072 attr_name_str.as_ptr() as *const c_char,
1073 fill_buffer.as_mut_ptr() as *mut c_char,
1074 fill_buffer.len(),
1075 );
1076 if ret_code < 0 {
1077 return Err(ret_code.into());
1078 }
1079 Ok(ret_code)
1080 }
1081 }
1082
1083 pub fn rados_object_setxattr(
1085 &self,
1086 object_name: &str,
1087 attr_name: &str,
1088 attr_value: &mut [u8],
1089 ) -> RadosResult<()> {
1090 self.ioctx_guard()?;
1091 let object_name_str = CString::new(object_name)?;
1092 let attr_name_str = CString::new(attr_name)?;
1093
1094 unsafe {
1095 let ret_code = rados_setxattr(
1096 self.ioctx,
1097 object_name_str.as_ptr() as *const c_char,
1098 attr_name_str.as_ptr() as *const c_char,
1099 attr_value.as_mut_ptr() as *mut c_char,
1100 attr_value.len(),
1101 );
1102 if ret_code < 0 {
1103 return Err(ret_code.into());
1104 }
1105 }
1106 Ok(())
1107 }
1108
1109 pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
1111 self.ioctx_guard()?;
1112 let object_name_str = CString::new(object_name)?;
1113 let attr_name_str = CString::new(attr_name)?;
1114
1115 unsafe {
1116 let ret_code = rados_rmxattr(
1117 self.ioctx,
1118 object_name_str.as_ptr() as *const c_char,
1119 attr_name_str.as_ptr() as *const c_char,
1120 );
1121 if ret_code < 0 {
1122 return Err(ret_code.into());
1123 }
1124 }
1125 Ok(())
1126 }
1127
1128 pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
1131 self.ioctx_guard()?;
1132 let object_name_str = CString::new(object_name)?;
1133 let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
1134
1135 unsafe {
1136 let ret_code = rados_getxattrs(
1137 self.ioctx,
1138 object_name_str.as_ptr(),
1139 &mut xattr_iterator_handle,
1140 );
1141 if ret_code < 0 {
1142 return Err(ret_code.into());
1143 }
1144 }
1145 Ok(xattr_iterator_handle)
1146 }
1147
1148 pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
1150 self.ioctx_guard()?;
1151 let object_name_str = CString::new(object_name)?;
1152 let mut psize: u64 = 0;
1153 let mut time: ::libc::time_t = 0;
1154
1155 unsafe {
1156 let ret_code = rados_stat(self.ioctx, object_name_str.as_ptr(), &mut psize, &mut time);
1157 if ret_code < 0 {
1158 return Err(ret_code.into());
1159 }
1160 }
1161 Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
1162 }
1163
1164 pub fn rados_object_tmap_update(
1166 &self,
1167 object_name: &str,
1168 update: TmapOperation,
1169 ) -> RadosResult<()> {
1170 self.ioctx_guard()?;
1171 let object_name_str = CString::new(object_name)?;
1172 let buffer = update.serialize()?;
1173 unsafe {
1174 let ret_code = rados_tmap_update(
1175 self.ioctx,
1176 object_name_str.as_ptr(),
1177 buffer.as_ptr() as *const c_char,
1178 buffer.len(),
1179 );
1180 if ret_code < 0 {
1181 return Err(ret_code.into());
1182 }
1183 }
1184 Ok(())
1185 }
1186
1187 pub fn rados_object_tmap_get(&self, object_name: &str) -> RadosResult<Vec<TmapOperation>> {
1189 self.ioctx_guard()?;
1190 let object_name_str = CString::new(object_name)?;
1191 let mut buffer: Vec<u8> = Vec::with_capacity(500);
1192
1193 unsafe {
1194 let ret_code = rados_tmap_get(
1195 self.ioctx,
1196 object_name_str.as_ptr(),
1197 buffer.as_mut_ptr() as *mut c_char,
1198 buffer.capacity(),
1199 );
1200 if ret_code == -ERANGE {
1201 buffer.reserve(1000);
1202 buffer.set_len(1000);
1203 let ret_code = rados_tmap_get(
1204 self.ioctx,
1205 object_name_str.as_ptr(),
1206 buffer.as_mut_ptr() as *mut c_char,
1207 buffer.capacity(),
1208 );
1209 if ret_code < 0 {
1210 return Err(ret_code.into());
1211 }
1212 } else if ret_code < 0 {
1213 return Err(ret_code.into());
1214 }
1215 }
1216 match TmapOperation::deserialize(&buffer) {
1217 Ok((_, tmap)) => Ok(tmap),
1218 Err(nom::Err::Incomplete(needed)) => Err(RadosError::new(format!(
1219 "deserialize of ceph tmap failed.
1220 Input from Ceph was too small. Needed: {:?} more bytes",
1221 needed
1222 ))),
1223 Err(nom::Err::Error((e, _))) => {
1224 Err(RadosError::new(String::from_utf8_lossy(e).to_string()))
1225 }
1226 Err(nom::Err::Failure((e, _))) => {
1227 Err(RadosError::new(String::from_utf8_lossy(e).to_string()))
1228 }
1229 }
1230 }
1231
1232 pub fn rados_object_exec(
1241 &self,
1242 object_name: &str,
1243 class_name: &str,
1244 method_name: &str,
1245 input_buffer: &[u8],
1246 output_buffer: &mut [u8],
1247 ) -> RadosResult<()> {
1248 self.ioctx_guard()?;
1249 let object_name_str = CString::new(object_name)?;
1250 let class_name_str = CString::new(class_name)?;
1251 let method_name_str = CString::new(method_name)?;
1252
1253 unsafe {
1254 let ret_code = rados_exec(
1255 self.ioctx,
1256 object_name_str.as_ptr(),
1257 class_name_str.as_ptr(),
1258 method_name_str.as_ptr(),
1259 input_buffer.as_ptr() as *const c_char,
1260 input_buffer.len(),
1261 output_buffer.as_mut_ptr() as *mut c_char,
1262 output_buffer.len(),
1263 );
1264 if ret_code < 0 {
1265 return Err(ret_code.into());
1266 }
1267 }
1268 Ok(())
1269 }
1270
1271 pub fn rados_object_notify(&self, object_name: &str, data: &[u8]) -> RadosResult<()> {
1275 self.ioctx_guard()?;
1276 let object_name_str = CString::new(object_name)?;
1277
1278 unsafe {
1279 let ret_code = rados_notify(
1280 self.ioctx,
1281 object_name_str.as_ptr(),
1282 0,
1283 data.as_ptr() as *const c_char,
1284 data.len() as i32,
1285 );
1286 if ret_code < 0 {
1287 return Err(ret_code.into());
1288 }
1289 }
1290 Ok(())
1291 }
1292 pub fn rados_object_notify_ack(
1305 &self,
1306 object_name: &str,
1307 notify_id: u64,
1308 cookie: u64,
1309 buffer: Option<&[u8]>,
1310 ) -> RadosResult<()> {
1311 self.ioctx_guard()?;
1312 let object_name_str = CString::new(object_name)?;
1313
1314 match buffer {
1315 Some(buf) => unsafe {
1316 let ret_code = rados_notify_ack(
1317 self.ioctx,
1318 object_name_str.as_ptr(),
1319 notify_id,
1320 cookie,
1321 buf.as_ptr() as *const c_char,
1322 buf.len() as i32,
1323 );
1324 if ret_code < 0 {
1325 return Err(ret_code.into());
1326 }
1327 },
1328 None => unsafe {
1329 let ret_code = rados_notify_ack(
1330 self.ioctx,
1331 object_name_str.as_ptr(),
1332 notify_id,
1333 cookie,
1334 ptr::null(),
1335 0,
1336 );
1337 if ret_code < 0 {
1338 return Err(ret_code.into());
1339 }
1340 },
1341 }
1342 Ok(())
1343 }
1344 pub fn rados_object_set_alloc_hint(
1350 &self,
1351 object_name: &str,
1352 expected_object_size: u64,
1353 expected_write_size: u64,
1354 ) -> RadosResult<()> {
1355 self.ioctx_guard()?;
1356 let object_name_str = CString::new(object_name)?;
1357
1358 unsafe {
1359 let ret_code = rados_set_alloc_hint(
1360 self.ioctx,
1361 object_name_str.as_ptr(),
1362 expected_object_size,
1363 expected_write_size,
1364 );
1365 if ret_code < 0 {
1366 return Err(ret_code.into());
1367 }
1368 }
1369 Ok(())
1370 }
1371
1372 pub fn rados_perform_read_operations(&self, read_op: ReadOperation) -> RadosResult<()> {
1374 self.ioctx_guard()?;
1375 let object_name_str = CString::new(read_op.object_name.clone())?;
1376
1377 unsafe {
1378 let ret_code = rados_read_op_operate(
1379 read_op.read_op_handle,
1380 self.ioctx,
1381 object_name_str.as_ptr(),
1382 read_op.flags as i32,
1383 );
1384 if ret_code < 0 {
1385 return Err(ret_code.into());
1386 }
1387 }
1388 Ok(())
1389 }
1390
1391 pub fn rados_commit_write_operations(&self, write_op: &mut WriteOperation) -> RadosResult<()> {
1393 self.ioctx_guard()?;
1394 let object_name_str = CString::new(write_op.object_name.clone())?;
1395
1396 unsafe {
1397 let ret_code = rados_write_op_operate(
1398 write_op.write_op_handle,
1399 self.ioctx,
1400 object_name_str.as_ptr(),
1401 &mut write_op.mtime,
1402 write_op.flags as i32,
1403 );
1404 if ret_code < 0 {
1405 return Err(ret_code.into());
1406 }
1407 }
1408 Ok(())
1409 }
1410
1411 pub fn rados_object_lock_exclusive(
1413 &self,
1414 object_name: &str,
1415 lock_name: &str,
1416 cookie_name: &str,
1417 description: &str,
1418 duration_time: &mut timeval,
1419 lock_flags: u8,
1420 ) -> RadosResult<()> {
1421 self.ioctx_guard()?;
1422 let object_name_str = CString::new(object_name)?;
1423 let lock_name_str = CString::new(lock_name)?;
1424 let cookie_name_str = CString::new(cookie_name)?;
1425 let description_str = CString::new(description)?;
1426
1427 unsafe {
1428 let ret_code = rados_lock_exclusive(
1429 self.ioctx,
1430 object_name_str.as_ptr(),
1431 lock_name_str.as_ptr(),
1432 cookie_name_str.as_ptr(),
1433 description_str.as_ptr(),
1434 duration_time,
1435 lock_flags,
1436 );
1437 if ret_code < 0 {
1438 return Err(ret_code.into());
1439 }
1440 }
1441 Ok(())
1442 }
1443
1444 pub fn rados_object_lock_shared(
1446 &self,
1447 object_name: &str,
1448 lock_name: &str,
1449 cookie_name: &str,
1450 description: &str,
1451 tag_name: &str,
1452 duration_time: &mut timeval,
1453 lock_flags: u8,
1454 ) -> RadosResult<()> {
1455 self.ioctx_guard()?;
1456 let object_name_str = CString::new(object_name)?;
1457 let lock_name_str = CString::new(lock_name)?;
1458 let cookie_name_str = CString::new(cookie_name)?;
1459 let description_str = CString::new(description)?;
1460 let tag_name_str = CString::new(tag_name)?;
1461
1462 unsafe {
1463 let ret_code = rados_lock_shared(
1464 self.ioctx,
1465 object_name_str.as_ptr(),
1466 lock_name_str.as_ptr(),
1467 cookie_name_str.as_ptr(),
1468 tag_name_str.as_ptr(),
1469 description_str.as_ptr(),
1470 duration_time,
1471 lock_flags,
1472 );
1473 if ret_code < 0 {
1474 return Err(ret_code.into());
1475 }
1476 }
1477 Ok(())
1478 }
1479
1480 pub fn rados_object_unlock(
1482 &self,
1483 object_name: &str,
1484 lock_name: &str,
1485 cookie_name: &str,
1486 ) -> RadosResult<()> {
1487 self.ioctx_guard()?;
1488 let object_name_str = CString::new(object_name)?;
1489 let lock_name_str = CString::new(lock_name)?;
1490 let cookie_name_str = CString::new(cookie_name)?;
1491
1492 unsafe {
1493 let ret_code = rados_unlock(
1494 self.ioctx,
1495 object_name_str.as_ptr(),
1496 lock_name_str.as_ptr(),
1497 cookie_name_str.as_ptr(),
1498 );
1499 if ret_code < 0 {
1500 return Err(ret_code.into());
1501 }
1502 }
1503 Ok(())
1504 }
1505
1506 pub fn rados_object_break_lock(
1539 &self,
1540 object_name: &str,
1541 lock_name: &str,
1542 client_name: &str,
1543 cookie_name: &str,
1544 ) -> RadosResult<()> {
1545 self.ioctx_guard()?;
1546 let object_name_str = CString::new(object_name)?;
1547 let lock_name_str = CString::new(lock_name)?;
1548 let cookie_name_str = CString::new(cookie_name)?;
1549 let client_name_str = CString::new(client_name)?;
1550
1551 unsafe {
1552 let ret_code = rados_break_lock(
1553 self.ioctx,
1554 object_name_str.as_ptr(),
1555 lock_name_str.as_ptr(),
1556 client_name_str.as_ptr(),
1557 cookie_name_str.as_ptr(),
1558 );
1559 if ret_code < 0 {
1560 return Err(ret_code.into());
1561 }
1562 }
1563 Ok(())
1564 }
1565
1566 #[cfg(feature = "rados_striper")]
1569 pub fn get_rados_striper(self) -> RadosResult<RadosStriper> {
1570 self.ioctx_guard()?;
1571 unsafe {
1572 let mut rados_striper: rados_striper_t = ptr::null_mut();
1573 let ret_code = rados_striper_create(self.ioctx, &mut rados_striper);
1574 if ret_code < 0 {
1575 return Err(ret_code.into());
1576 }
1577 Ok(RadosStriper { rados_striper })
1578 }
1579 }
1580}
1581
1582impl Rados {
1583 pub fn rados_blacklist_client(&self, client: IpAddr, expire_seconds: u32) -> RadosResult<()> {
1584 self.conn_guard()?;
1585 let client_address = CString::new(client.to_string())?;
1586 unsafe {
1587 let ret_code = rados_blacklist_add(
1588 self.rados,
1589 client_address.as_ptr() as *mut c_char,
1590 expire_seconds,
1591 );
1592
1593 if ret_code < 0 {
1594 return Err(ret_code.into());
1595 }
1596 }
1597 Ok(())
1598 }
1599
1600 #[allow(unused_variables)]
1611 pub fn rados_pools(&self) -> RadosResult<Vec<String>> {
1612 self.conn_guard()?;
1613 let mut pools: Vec<String> = Vec::new();
1614 let pool_slice: &[u8];
1615 let mut pool_buffer: Vec<u8> = Vec::with_capacity(500);
1616
1617 unsafe {
1618 let len = rados_pool_list(
1619 self.rados,
1620 pool_buffer.as_mut_ptr() as *mut c_char,
1621 pool_buffer.capacity(),
1622 );
1623 if len > pool_buffer.capacity() as i32 {
1624 pool_buffer.reserve(len as usize);
1626 let len = rados_pool_list(
1627 self.rados,
1628 pool_buffer.as_mut_ptr() as *mut c_char,
1629 pool_buffer.capacity(),
1630 );
1631 pool_buffer.set_len(len as usize);
1633 } else {
1634 pool_buffer.set_len(len as usize);
1636 }
1637 }
1638 let mut cursor = Cursor::new(&pool_buffer);
1639 loop {
1640 let mut string_buf: Vec<u8> = Vec::new();
1641 let read = cursor.read_until(0x00, &mut string_buf)?;
1642 if read == 0 || read == 1 {
1645 break;
1646 } else {
1647 pools.push(String::from_utf8_lossy(&string_buf[..read - 1]).into_owned());
1649 }
1650 }
1651
1652 Ok(pools)
1653 }
1654
1655 pub fn rados_create_pool(&self, pool_name: &str) -> RadosResult<()> {
1659 self.conn_guard()?;
1660 let pool_name_str = CString::new(pool_name)?;
1661 unsafe {
1662 let ret_code = rados_pool_create(self.rados, pool_name_str.as_ptr());
1663 if ret_code < 0 {
1664 return Err(ret_code.into());
1665 }
1666 }
1667 Ok(())
1668 }
1669 pub fn rados_delete_pool(&self, pool_name: &str) -> RadosResult<()> {
1674 self.conn_guard()?;
1675 let pool_name_str = CString::new(pool_name)?;
1676 unsafe {
1677 let ret_code = rados_pool_delete(self.rados, pool_name_str.as_ptr());
1678 if ret_code < 0 {
1679 return Err(ret_code.into());
1680 }
1681 }
1682 Ok(())
1683 }
1684
1685 pub fn rados_lookup_pool(&self, pool_name: &str) -> RadosResult<Option<i64>> {
1688 self.conn_guard()?;
1689 let pool_name_str = CString::new(pool_name)?;
1690 unsafe {
1691 let ret_code: i64 = rados_pool_lookup(self.rados, pool_name_str.as_ptr());
1692 if ret_code >= 0 {
1693 Ok(Some(ret_code))
1694 } else if ret_code as i32 == -ENOENT {
1695 Ok(None)
1696 } else {
1697 Err((ret_code as i32).into())
1698 }
1699 }
1700 }
1701
1702 pub fn rados_reverse_lookup_pool(&self, pool_id: i64) -> RadosResult<String> {
1703 self.conn_guard()?;
1704 let mut buffer: Vec<u8> = Vec::with_capacity(500);
1705
1706 unsafe {
1707 let ret_code = rados_pool_reverse_lookup(
1708 self.rados,
1709 pool_id,
1710 buffer.as_mut_ptr() as *mut c_char,
1711 buffer.capacity(),
1712 );
1713 if ret_code == -ERANGE {
1714 buffer.reserve(1000);
1716 buffer.set_len(1000);
1717 let ret_code = rados_pool_reverse_lookup(
1718 self.rados,
1719 pool_id,
1720 buffer.as_mut_ptr() as *mut c_char,
1721 buffer.capacity(),
1722 );
1723 if ret_code < 0 {
1724 return Err(ret_code.into());
1725 }
1726 Ok(String::from_utf8_lossy(&buffer).into_owned())
1727 } else if ret_code < 0 {
1728 Err(ret_code.into())
1729 } else {
1730 Ok(String::from_utf8_lossy(&buffer).into_owned())
1731 }
1732 }
1733 }
1734}
1735
1736pub fn rados_libversion() -> RadosVersion {
1738 let mut major: c_int = 0;
1739 let mut minor: c_int = 0;
1740 let mut extra: c_int = 0;
1741 unsafe {
1742 rados_version(&mut major, &mut minor, &mut extra);
1743 }
1744 RadosVersion {
1745 major,
1746 minor,
1747 extra,
1748 }
1749}
1750
1751impl Rados {
1752 pub fn rados_stat_cluster(&self) -> RadosResult<Struct_rados_cluster_stat_t> {
1759 self.conn_guard()?;
1760 let mut cluster_stat = Struct_rados_cluster_stat_t::default();
1761 unsafe {
1762 let ret_code = rados_cluster_stat(self.rados, &mut cluster_stat);
1763 if ret_code < 0 {
1764 return Err(ret_code.into());
1765 }
1766 }
1767
1768 Ok(cluster_stat)
1769 }
1770
1771 pub fn rados_fsid(&self) -> RadosResult<Uuid> {
1772 self.conn_guard()?;
1773 let mut fsid_buffer: Vec<u8> = Vec::with_capacity(37);
1774 unsafe {
1775 let ret_code = rados_cluster_fsid(
1776 self.rados,
1777 fsid_buffer.as_mut_ptr() as *mut c_char,
1778 fsid_buffer.capacity(),
1779 );
1780 if ret_code < 0 {
1781 return Err(ret_code.into());
1782 }
1783 fsid_buffer.set_len(ret_code as usize);
1785 }
1786 let fsid_str = String::from_utf8(fsid_buffer)?;
1788 Ok(fsid_str.parse()?)
1790 }
1791
1792 pub fn ping_monitor(&self, mon_id: &str) -> RadosResult<String> {
1797 self.conn_guard()?;
1798
1799 let mon_id_str = CString::new(mon_id)?;
1800 let mut out_str: *mut c_char = ptr::null_mut();
1801 let mut str_length: usize = 0;
1802 unsafe {
1803 let ret_code = rados_ping_monitor(
1804 self.rados,
1805 mon_id_str.as_ptr(),
1806 &mut out_str,
1807 &mut str_length,
1808 );
1809 if ret_code < 0 {
1810 return Err(ret_code.into());
1811 }
1812 if !out_str.is_null() {
1813 let s_bytes = std::slice::from_raw_parts(out_str, str_length);
1815 let bytes: Vec<u8> = s_bytes.iter().map(|c| *c as u8).collect();
1817 rados_buffer_free(out_str);
1819 Ok(String::from_utf8_lossy(&bytes).into_owned())
1820 } else {
1821 Ok("".into())
1822 }
1823 }
1824 }
1825}
1826
1827pub fn ceph_version(socket: &str) -> Option<String> {
1833 let cmd = "version";
1834
1835 admin_socket_command(&cmd, socket).ok().and_then(|json| {
1836 json_data(&json)
1837 .and_then(|jsondata| json_find(jsondata, &[cmd]).map(|data| json_as_string(&data)))
1838 })
1839}
1840
1841pub fn ceph_version_parse() -> Option<String> {
1845 match run_cli("ceph --version") {
1846 Ok(output) => {
1847 let n = output.status.code().unwrap();
1848 if n == 0 {
1849 Some(String::from_utf8_lossy(&output.stdout).to_string())
1850 } else {
1851 Some(String::from_utf8_lossy(&output.stderr).to_string())
1852 }
1853 }
1854 Err(_) => None,
1855 }
1856}
1857
1858impl Rados {
1859 pub fn ceph_status(&self, keys: &[&str]) -> RadosResult<String> {
1861 self.conn_guard()?;
1862 match self.ceph_mon_command("prefix", "status", Some("json")) {
1863 Ok((json, _)) => match json {
1864 Some(json) => match json_data(&json) {
1865 Some(jsondata) => {
1866 if let Some(data) = json_find(jsondata, keys) {
1867 Ok(json_as_string(&data))
1868 } else {
1869 Err(RadosError::new(
1870 "The attributes were not found in the output.".to_string(),
1871 ))
1872 }
1873 }
1874 _ => Err(RadosError::new("JSON data not found.".to_string())),
1875 },
1876 _ => Err(RadosError::new("JSON data not found.".to_string())),
1877 },
1878 Err(e) => Err(e),
1879 }
1880 }
1881
1882 pub fn ceph_health_string(&self) -> RadosResult<String> {
1885 self.conn_guard()?;
1886 match self.ceph_mon_command("prefix", "health", None) {
1887 Ok((data, _)) => Ok(data.unwrap().replace("\n", "")),
1888 Err(e) => Err(e),
1889 }
1890 }
1891
1892 pub fn ceph_health(&self) -> CephHealth {
1897 match self.ceph_health_string() {
1898 Ok(health) => {
1899 if health.contains("HEALTH_OK") {
1900 CephHealth::Ok
1901 } else if health.contains("HEALTH_WARN") {
1902 CephHealth::Warning
1903 } else {
1904 CephHealth::Error
1905 }
1906 }
1907 Err(_) => CephHealth::Error,
1908 }
1909 }
1910
1911 pub fn ceph_command(
1913 &self,
1914 name: &str,
1915 value: &str,
1916 cmd_type: CephCommandTypes,
1917 keys: &[&str],
1918 ) -> RadosResult<JsonData> {
1919 self.conn_guard()?;
1920 match cmd_type {
1921 CephCommandTypes::Osd => Err(RadosError::new("OSD CMDs Not implemented.".to_string())),
1922 CephCommandTypes::Pgs => Err(RadosError::new("PGS CMDS Not implemented.".to_string())),
1923 _ => match self.ceph_mon_command(name, value, Some("json")) {
1924 Ok((json, _)) => match json {
1925 Some(json) => match json_data(&json) {
1926 Some(jsondata) => {
1927 if let Some(data) = json_find(jsondata, keys) {
1928 Ok(data)
1929 } else {
1930 Err(RadosError::new(
1931 "The attributes were not found in the output.".to_string(),
1932 ))
1933 }
1934 }
1935 _ => Err(RadosError::new("JSON data not found.".to_string())),
1936 },
1937 _ => Err(RadosError::new("JSON data not found.".to_string())),
1938 },
1939 Err(e) => Err(e),
1940 },
1941 }
1942 }
1943
1944 pub fn ceph_commands(&self, keys: Option<&[&str]>) -> RadosResult<JsonData> {
1946 self.conn_guard()?;
1947 match self.ceph_mon_command("prefix", "get_command_descriptions", Some("json")) {
1948 Ok((json, _)) => match json {
1949 Some(json) => match json_data(&json) {
1950 Some(jsondata) => {
1951 if let Some(k) = keys {
1952 if let Some(data) = json_find(jsondata, k) {
1953 Ok(data)
1954 } else {
1955 Err(RadosError::new(
1956 "The attributes were not found in the output.".to_string(),
1957 ))
1958 }
1959 } else {
1960 Ok(jsondata)
1961 }
1962 }
1963 _ => Err(RadosError::new("JSON data not found.".to_string())),
1964 },
1965 _ => Err(RadosError::new("JSON data not found.".to_string())),
1966 },
1967 Err(e) => Err(e),
1968 }
1969 }
1970
1971 pub fn ceph_mon_command(
1973 &self,
1974 name: &str,
1975 value: &str,
1976 format: Option<&str>,
1977 ) -> RadosResult<(Option<String>, Option<String>)> {
1978 let data: Vec<*mut c_char> = Vec::with_capacity(1);
1979 self.ceph_mon_command_with_data(name, value, format, data)
1980 }
1981
1982 pub fn ceph_mon_command_without_data(
1983 &self,
1984 cmd: &serde_json::Value,
1985 ) -> RadosResult<(Vec<u8>, Option<String>)> {
1986 self.conn_guard()?;
1987 let cmd_string = cmd.to_string();
1988 debug!("ceph_mon_command_without_data: {}", cmd_string);
1989 let data: Vec<*mut c_char> = Vec::with_capacity(1);
1990 let cmds = CString::new(cmd_string).unwrap();
1991
1992 let mut outbuf_len = 0;
1993 let mut outs = ptr::null_mut();
1994 let mut outs_len = 0;
1995
1996 let mut outbuf = ptr::null_mut();
2000 let mut out: Vec<u8> = vec![];
2001 let mut status_string: Option<String> = None;
2002
2003 debug!("Calling rados_mon_command with {:?}", cmd);
2004
2005 unsafe {
2006 let ret_code = rados_mon_command(
2008 self.rados,
2009 &mut cmds.as_ptr(),
2010 1,
2011 data.as_ptr() as *mut c_char,
2012 data.len() as usize,
2013 &mut outbuf,
2014 &mut outbuf_len,
2015 &mut outs,
2016 &mut outs_len,
2017 );
2018 debug!("return code: {}", ret_code);
2019 if ret_code < 0 {
2020 if outs_len > 0 && !outs.is_null() {
2021 let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
2022 rados_buffer_free(outs);
2023 return Err(RadosError::new(String::from_utf8_lossy(slice).into_owned()));
2024 }
2025 return Err(ret_code.into());
2026 }
2027
2028 if outbuf_len > 0 && !outbuf.is_null() {
2030 let slice = ::std::slice::from_raw_parts(outbuf as *const u8, outbuf_len as usize);
2031 out = slice.to_vec();
2032
2033 rados_buffer_free(outbuf);
2034 }
2035 if outs_len > 0 && !outs.is_null() {
2036 let slice = ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize);
2037 status_string = Some(String::from_utf8(slice.to_vec())?);
2038 rados_buffer_free(outs);
2039 }
2040 }
2041
2042 Ok((out, status_string))
2043 }
2044
2045 pub fn ceph_mon_command_with_data(
2048 &self,
2049 name: &str,
2050 value: &str,
2051 format: Option<&str>,
2052 data: Vec<*mut c_char>,
2053 ) -> RadosResult<(Option<String>, Option<String>)> {
2054 self.conn_guard()?;
2055
2056 let mut cmd_strings: Vec<String> = Vec::new();
2057 match format {
2058 Some(fmt) => cmd_strings.push(format!(
2059 "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2060 name, value, fmt
2061 )),
2062 None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2063 }
2064
2065 let cstrings: Vec<CString> = cmd_strings[..]
2066 .iter()
2067 .map(|s| CString::new(s.clone()).unwrap())
2068 .collect();
2069 let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2070
2071 let mut outbuf = ptr::null_mut();
2072 let mut outs = ptr::null_mut();
2073 let mut outbuf_len = 0;
2074 let mut outs_len = 0;
2075
2076 let mut str_outbuf: Option<String> = None;
2080 let mut str_outs: Option<String> = None;
2081
2082 debug!("Calling rados_mon_command with {:?}", cstrings);
2083
2084 unsafe {
2085 let ret_code = rados_mon_command(
2087 self.rados,
2088 cmds.as_mut_ptr(),
2089 1,
2090 data.as_ptr() as *mut c_char,
2091 data.len() as usize,
2092 &mut outbuf,
2093 &mut outbuf_len,
2094 &mut outs,
2095 &mut outs_len,
2096 );
2097 if ret_code < 0 {
2098 return Err(ret_code.into());
2099 }
2100
2101 if outbuf_len > 0 {
2103 let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2104 let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2105 let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2106 str_outbuf = Some(str_slice_outbuf.to_owned());
2107
2108 rados_buffer_free(outbuf);
2109 }
2110
2111 if outs_len > 0 {
2112 let c_str_outs: &CStr = CStr::from_ptr(outs);
2113 let buf_outs: &[u8] = c_str_outs.to_bytes();
2114 let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2115 str_outs = Some(str_slice_outs.to_owned());
2116
2117 rados_buffer_free(outs);
2118 }
2119 }
2120
2121 Ok((str_outbuf, str_outs))
2122 }
2123
2124 pub fn ceph_osd_command(
2126 &self,
2127 id: i32,
2128 name: &str,
2129 value: &str,
2130 format: Option<&str>,
2131 ) -> RadosResult<(Option<String>, Option<String>)> {
2132 let data: Vec<*mut c_char> = Vec::with_capacity(1);
2133 self.ceph_osd_command_with_data(id, name, value, format, data)
2134 }
2135
2136 pub fn ceph_osd_command_with_data(
2138 &self,
2139 id: i32,
2140 name: &str,
2141 value: &str,
2142 format: Option<&str>,
2143 data: Vec<*mut c_char>,
2144 ) -> RadosResult<(Option<String>, Option<String>)> {
2145 self.conn_guard()?;
2146
2147 let mut cmd_strings: Vec<String> = Vec::new();
2148 match format {
2149 Some(fmt) => cmd_strings.push(format!(
2150 "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2151 name, value, fmt
2152 )),
2153 None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2154 }
2155
2156 let cstrings: Vec<CString> = cmd_strings[..]
2157 .iter()
2158 .map(|s| CString::new(s.clone()).unwrap())
2159 .collect();
2160 let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2161
2162 let mut outbuf = ptr::null_mut();
2163 let mut outs = ptr::null_mut();
2164 let mut outbuf_len = 0;
2165 let mut outs_len = 0;
2166
2167 let mut str_outbuf: Option<String> = None;
2171 let mut str_outs: Option<String> = None;
2172
2173 unsafe {
2174 let ret_code = rados_osd_command(
2176 self.rados,
2177 id,
2178 cmds.as_mut_ptr(),
2179 1,
2180 data.as_ptr() as *mut c_char,
2181 data.len() as usize,
2182 &mut outbuf,
2183 &mut outbuf_len,
2184 &mut outs,
2185 &mut outs_len,
2186 );
2187 if ret_code < 0 {
2188 return Err(ret_code.into());
2189 }
2190
2191 if outbuf_len > 0 {
2193 let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2194 let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2195 let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2196 str_outbuf = Some(str_slice_outbuf.to_owned());
2197
2198 rados_buffer_free(outbuf);
2199 }
2200
2201 if outs_len > 0 {
2202 let c_str_outs: &CStr = CStr::from_ptr(outs);
2203 let buf_outs: &[u8] = c_str_outs.to_bytes();
2204 let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2205 str_outs = Some(str_slice_outs.to_owned());
2206
2207 rados_buffer_free(outs);
2208 }
2209 }
2210
2211 Ok((str_outbuf, str_outs))
2212 }
2213
2214 pub fn ceph_pgs_command(
2216 &self,
2217 pg: &str,
2218 name: &str,
2219 value: &str,
2220 format: Option<&str>,
2221 ) -> RadosResult<(Option<String>, Option<String>)> {
2222 let data: Vec<*mut c_char> = Vec::with_capacity(1);
2223 self.ceph_pgs_command_with_data(pg, name, value, format, data)
2224 }
2225
2226 pub fn ceph_pgs_command_with_data(
2228 &self,
2229 pg: &str,
2230 name: &str,
2231 value: &str,
2232 format: Option<&str>,
2233 data: Vec<*mut c_char>,
2234 ) -> RadosResult<(Option<String>, Option<String>)> {
2235 self.conn_guard()?;
2236
2237 let mut cmd_strings: Vec<String> = Vec::new();
2238 match format {
2239 Some(fmt) => cmd_strings.push(format!(
2240 "{{\"{}\": \"{}\", \"format\": \"{}\"}}",
2241 name, value, fmt
2242 )),
2243 None => cmd_strings.push(format!("{{\"{}\": \"{}\"}}", name, value)),
2244 }
2245
2246 let pg_str = CString::new(pg).unwrap();
2247 let cstrings: Vec<CString> = cmd_strings[..]
2248 .iter()
2249 .map(|s| CString::new(s.clone()).unwrap())
2250 .collect();
2251 let mut cmds: Vec<*const c_char> = cstrings.iter().map(|c| c.as_ptr()).collect();
2252
2253 let mut outbuf = ptr::null_mut();
2254 let mut outs = ptr::null_mut();
2255 let mut outbuf_len = 0;
2256 let mut outs_len = 0;
2257
2258 let mut str_outbuf: Option<String> = None;
2262 let mut str_outs: Option<String> = None;
2263
2264 unsafe {
2265 let ret_code = rados_pg_command(
2267 self.rados,
2268 pg_str.as_ptr(),
2269 cmds.as_mut_ptr(),
2270 1,
2271 data.as_ptr() as *mut c_char,
2272 data.len() as usize,
2273 &mut outbuf,
2274 &mut outbuf_len,
2275 &mut outs,
2276 &mut outs_len,
2277 );
2278 if ret_code < 0 {
2279 return Err(ret_code.into());
2280 }
2281
2282 if outbuf_len > 0 {
2284 let c_str_outbuf: &CStr = CStr::from_ptr(outbuf);
2285 let buf_outbuf: &[u8] = c_str_outbuf.to_bytes();
2286 let str_slice_outbuf: &str = str::from_utf8(buf_outbuf).unwrap();
2287 str_outbuf = Some(str_slice_outbuf.to_owned());
2288
2289 rados_buffer_free(outbuf);
2290 }
2291
2292 if outs_len > 0 {
2293 let c_str_outs: &CStr = CStr::from_ptr(outs);
2294 let buf_outs: &[u8] = c_str_outs.to_bytes();
2295 let str_slice_outs: &str = str::from_utf8(buf_outs).unwrap();
2296 str_outs = Some(str_slice_outs.to_owned());
2297
2298 rados_buffer_free(outs);
2299 }
2300 }
2301
2302 Ok((str_outbuf, str_outs))
2303 }
2304}
2305
2306#[cfg(feature = "rados_striper")]
2307impl RadosStriper {
2308 pub fn inner(&self) -> &rados_striper_t {
2309 &self.rados_striper
2310 }
2311
2312 pub fn destroy_rados_striper(&self) {
2314 if self.rados_striper.is_null() {
2315 return;
2317 }
2318 unsafe {
2319 rados_striper_destroy(self.rados_striper);
2320 }
2321 }
2322
2323 fn rados_striper_guard(&self) -> RadosResult<()> {
2324 if self.rados_striper.is_null() {
2325 return Err(RadosError::new(
2326 "Rados striper not created. Please initialize first".to_string(),
2327 ));
2328 }
2329 Ok(())
2330 }
2331
2332 pub fn rados_object_write(
2335 &self,
2336 object_name: &str,
2337 buffer: &[u8],
2338 offset: u64,
2339 ) -> RadosResult<()> {
2340 self.rados_striper_guard()?;
2341 let obj_name_str = CString::new(object_name)?;
2342
2343 unsafe {
2344 let ret_code = rados_striper_write(
2345 self.rados_striper,
2346 obj_name_str.as_ptr(),
2347 buffer.as_ptr() as *const c_char,
2348 buffer.len(),
2349 offset,
2350 );
2351 if ret_code < 0 {
2352 return Err(ret_code.into());
2353 }
2354 }
2355 Ok(())
2356 }
2357
2358 pub fn rados_object_write_full(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
2362 self.rados_striper_guard()?;
2363 let obj_name_str = CString::new(object_name)?;
2364
2365 unsafe {
2366 let ret_code = rados_striper_write_full(
2367 self.rados_striper,
2368 obj_name_str.as_ptr(),
2369 buffer.as_ptr() as *const ::libc::c_char,
2370 buffer.len(),
2371 );
2372 if ret_code < 0 {
2373 return Err(ret_code.into());
2374 }
2375 }
2376 Ok(())
2377 }
2378
2379 pub fn rados_object_append(&self, object_name: &str, buffer: &[u8]) -> RadosResult<()> {
2381 self.rados_striper_guard()?;
2382 let obj_name_str = CString::new(object_name)?;
2383
2384 unsafe {
2385 let ret_code = rados_striper_append(
2386 self.rados_striper,
2387 obj_name_str.as_ptr(),
2388 buffer.as_ptr() as *const c_char,
2389 buffer.len(),
2390 );
2391 if ret_code < 0 {
2392 return Err(ret_code.into());
2393 }
2394 }
2395 Ok(())
2396 }
2397
2398 pub fn rados_object_read(
2405 &self,
2406 object_name: &str,
2407 fill_buffer: &mut Vec<u8>,
2408 read_offset: u64,
2409 ) -> RadosResult<i32> {
2410 self.rados_striper_guard()?;
2411 let object_name_str = CString::new(object_name)?;
2412 let mut len = fill_buffer.capacity();
2413 if len == 0 {
2414 fill_buffer.reserve_exact(1024 * 64);
2415 len = fill_buffer.capacity();
2416 }
2417
2418 unsafe {
2419 let ret_code = rados_striper_read(
2420 self.rados_striper,
2421 object_name_str.as_ptr(),
2422 fill_buffer.as_mut_ptr() as *mut c_char,
2423 len,
2424 read_offset,
2425 );
2426 if ret_code < 0 {
2427 return Err(ret_code.into());
2428 }
2429 fill_buffer.set_len(ret_code as usize);
2430 Ok(ret_code)
2431 }
2432 }
2433
2434 pub fn rados_object_remove(&self, object_name: &str) -> RadosResult<()> {
2437 self.rados_striper_guard()?;
2438 let object_name_str = CString::new(object_name)?;
2439
2440 unsafe {
2441 let ret_code = rados_striper_remove(
2442 self.rados_striper,
2443 object_name_str.as_ptr() as *const c_char,
2444 );
2445 if ret_code < 0 {
2446 return Err(ret_code.into());
2447 }
2448 }
2449 Ok(())
2450 }
2451
2452 pub fn rados_object_trunc(&self, object_name: &str, new_size: u64) -> RadosResult<()> {
2456 self.rados_striper_guard()?;
2457 let object_name_str = CString::new(object_name)?;
2458
2459 unsafe {
2460 let ret_code =
2461 rados_striper_trunc(self.rados_striper, object_name_str.as_ptr(), new_size);
2462 if ret_code < 0 {
2463 return Err(ret_code.into());
2464 }
2465 }
2466 Ok(())
2467 }
2468
2469 pub fn rados_object_getxattr(
2471 &self,
2472 object_name: &str,
2473 attr_name: &str,
2474 fill_buffer: &mut [u8],
2475 ) -> RadosResult<i32> {
2476 self.rados_striper_guard()?;
2477 let object_name_str = CString::new(object_name)?;
2478 let attr_name_str = CString::new(attr_name)?;
2479
2480 unsafe {
2481 let ret_code = rados_striper_getxattr(
2482 self.rados_striper,
2483 object_name_str.as_ptr() as *const c_char,
2484 attr_name_str.as_ptr() as *const c_char,
2485 fill_buffer.as_mut_ptr() as *mut c_char,
2486 fill_buffer.len(),
2487 );
2488 if ret_code < 0 {
2489 return Err(ret_code.into());
2490 }
2491 Ok(ret_code)
2492 }
2493 }
2494
2495 pub fn rados_object_setxattr(
2497 &self,
2498 object_name: &str,
2499 attr_name: &str,
2500 attr_value: &mut [u8],
2501 ) -> RadosResult<()> {
2502 self.rados_striper_guard()?;
2503 let object_name_str = CString::new(object_name)?;
2504 let attr_name_str = CString::new(attr_name)?;
2505
2506 unsafe {
2507 let ret_code = rados_striper_setxattr(
2508 self.rados_striper,
2509 object_name_str.as_ptr() as *const c_char,
2510 attr_name_str.as_ptr() as *const c_char,
2511 attr_value.as_mut_ptr() as *mut c_char,
2512 attr_value.len(),
2513 );
2514 if ret_code < 0 {
2515 return Err(ret_code.into());
2516 }
2517 }
2518 Ok(())
2519 }
2520
2521 pub fn rados_object_rmxattr(&self, object_name: &str, attr_name: &str) -> RadosResult<()> {
2523 self.rados_striper_guard()?;
2524 let object_name_str = CString::new(object_name)?;
2525 let attr_name_str = CString::new(attr_name)?;
2526
2527 unsafe {
2528 let ret_code = rados_striper_rmxattr(
2529 self.rados_striper,
2530 object_name_str.as_ptr() as *const c_char,
2531 attr_name_str.as_ptr() as *const c_char,
2532 );
2533 if ret_code < 0 {
2534 return Err(ret_code.into());
2535 }
2536 }
2537 Ok(())
2538 }
2539
2540 pub fn rados_get_xattr_iterator(&self, object_name: &str) -> RadosResult<rados_xattrs_iter_t> {
2543 self.rados_striper_guard()?;
2544 let object_name_str = CString::new(object_name)?;
2545 let mut xattr_iterator_handle: rados_xattrs_iter_t = ptr::null_mut();
2546
2547 unsafe {
2548 let ret_code = rados_striper_getxattrs(
2549 self.rados_striper,
2550 object_name_str.as_ptr(),
2551 &mut xattr_iterator_handle,
2552 );
2553 if ret_code < 0 {
2554 return Err(ret_code.into());
2555 }
2556 }
2557 Ok(xattr_iterator_handle)
2558 }
2559
2560 pub fn rados_object_stat(&self, object_name: &str) -> RadosResult<(u64, SystemTime)> {
2562 self.rados_striper_guard()?;
2563 let object_name_str = CString::new(object_name)?;
2564 let mut psize: u64 = 0;
2565 let mut time: ::libc::time_t = 0;
2566
2567 unsafe {
2568 let ret_code = rados_striper_stat(
2569 self.rados_striper,
2570 object_name_str.as_ptr(),
2571 &mut psize,
2572 &mut time,
2573 );
2574 if ret_code < 0 {
2575 return Err(ret_code.into());
2576 }
2577 }
2578 Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
2579 }
2580}