ceph_async/
ceph_client.rs

1use std::collections::HashMap;
2
3use crate::ceph::{connect_to_ceph, Rados};
4use crate::cmd;
5use crate::rados;
6
7use libc::c_char;
8use std::ffi::CString;
9use std::{ptr, str};
10
11use crate::error::RadosError;
12use crate::{CephVersion, MonCommand, OsdOption, PoolOption};
13
14/// A CephClient is a struct that handles communicating with Ceph
15/// in a nicer, Rustier way
16///
17/// ```rust,no_run
18/// # use ceph::CephClient;
19/// # use ceph::cmd::CrushTree;
20/// # use ceph::error::RadosError;
21/// # fn main() {
22/// #   let _ = run();
23/// # }
24/// # fn run() -> Result<CrushTree, RadosError> {
25/// let client = CephClient::new("admin", "/etc/ceph/ceph.conf")?;
26/// let tree = client.osd_tree()?;
27/// # Ok(tree)
28/// # }
29/// ```
30pub struct CephClient {
31    rados_t: Rados,
32    simulate: bool,
33    version: CephVersion,
34}
35
/// Early-returns `Err(RadosError::MinVersion(required, actual))` from the
/// enclosing function when `$client.version` is lower than
/// `CephVersion::$required`; otherwise does nothing.
macro_rules! min_version {
    ($required:ident, $client:ident) => {{
        if $client.version < CephVersion::$required {
            return Err(RadosError::MinVersion(
                CephVersion::$required,
                $client.version,
            ));
        }
    }};
}
43
44impl CephClient {
45    pub fn new<T1: AsRef<str>, T2: AsRef<str>>(
46        user_id: T1,
47        config_file: T2,
48    ) -> Result<CephClient, RadosError> {
49        let rados_t = match connect_to_ceph(&user_id.as_ref(), &config_file.as_ref()) {
50            Ok(rados_t) => rados_t,
51            Err(e) => return Err(e),
52        };
53        let version: CephVersion = match cmd::version(&rados_t)?.parse() {
54            Ok(v) => v,
55            Err(e) => return Err(e),
56        };
57
58        Ok(CephClient {
59            rados_t,
60            simulate: false,
61            version,
62        })
63    }
64
65    pub fn simulate(mut self) -> Self {
66        self.simulate = true;
67        self
68    }
69
70    pub fn osd_out(&self, osd_id: u64) -> Result<(), RadosError> {
71        let osd_id = osd_id.to_string();
72        let cmd = MonCommand::new()
73            .with_prefix("osd out")
74            .with("ids", &osd_id);
75
76        if !self.simulate {
77            self.run_command(cmd)?;
78        }
79        Ok(())
80    }
81
82    pub fn osd_crush_remove(&self, osd_id: u64) -> Result<(), RadosError> {
83        let osd_id = format!("osd.{}", osd_id);
84        let cmd = MonCommand::new()
85            .with_prefix("osd crush remove")
86            .with_name(&osd_id);
87        if !self.simulate {
88            self.run_command(cmd)?;
89        }
90        Ok(())
91    }
92
93    /// Query a ceph pool.
94    pub fn osd_pool_get(&self, pool: &str, choice: &PoolOption) -> Result<String, RadosError> {
95        let cmd = MonCommand::new()
96            .with_prefix("osd pool get")
97            .with("pool", pool)
98            .with("var", choice.as_ref());
99        if let Ok(result) = self.run_command(cmd) {
100            let mut l = result.lines();
101            match l.next() {
102                Some(res) => return Ok(res.into()),
103                None => {
104                    return Err(RadosError::Error(format!(
105                        "Unable to parse osd pool get output: {:?}",
106                        result,
107                    )))
108                }
109            }
110        }
111
112        Err(RadosError::Error(
113            "No response from ceph for osd pool get".into(),
114        ))
115    }
116    /// Set a pool value
117    pub fn osd_pool_set(&self, pool: &str, key: &str, value: &str) -> Result<(), RadosError> {
118        let cmd = MonCommand::new()
119            .with_prefix("osd pool set")
120            .with("pool", pool)
121            .with("var", key)
122            .with("value", value);
123        if !self.simulate {
124            self.run_command(cmd)?;
125        }
126        Ok(())
127    }
128
129    /// Can be used to set options on an OSD
130    ///
131    /// ```rust,no_run
132    /// # use ceph::{OsdOption, CephClient};
133    /// # use ceph::error::RadosError;
134    /// # fn main() {
135    /// #   let _ = run();
136    /// # }
137    /// # fn run() -> Result<(), RadosError> {
138    /// let client = CephClient::new("admin", "/etc/ceph/ceph.conf")?;
139    /// client.osd_set(OsdOption::NoDown, false)?;
140    /// # Ok(())
141    /// # }
142    /// ```
143    pub fn osd_set(&self, key: OsdOption, force: bool) -> Result<(), RadosError> {
144        let key = key.to_string();
145        let cmd = {
146            let mut c = MonCommand::new().with_prefix("osd set").with("key", &key);
147            if force {
148                c = c.with("sure", "--yes-i-really-mean-it");
149            }
150            c
151        };
152        if !self.simulate {
153            self.run_command(cmd)?;
154        }
155        Ok(())
156    }
157
158    /// Can be used to unset options on an OSD
159    ///
160    /// ```rust,no_run
161    /// # use ceph::{OsdOption, CephClient};
162    /// # use ceph::error::RadosError;
163    /// # fn main() {
164    /// #   let _ = run();
165    /// # }
166    /// # fn run() -> Result<(), RadosError> {
167    /// let client = CephClient::new("admin", "/etc/ceph/ceph.conf")?;
168    /// client.osd_unset(OsdOption::NoDown)?;
169    /// # Ok(())
170    /// # }
171    /// ```
172    pub fn osd_unset(&self, key: OsdOption) -> Result<(), RadosError> {
173        cmd::osd_unset(&self.rados_t, &key, self.simulate).map_err(|a| a)
174    }
175
176    pub fn osd_tree(&self) -> Result<cmd::CrushTree, RadosError> {
177        cmd::osd_tree(&self.rados_t).map_err(|a| a)
178    }
179
180    /// Get cluster status
181    pub fn status(&self) -> Result<String, RadosError> {
182        let cmd = MonCommand::new().with_prefix("status").with_format("json");
183        let return_data = self.run_command(cmd)?;
184        let mut l = return_data.lines();
185        match l.next() {
186            Some(res) => Ok(res.into()),
187            None => Err(RadosError::Error("No response from ceph for status".into())),
188        }
189    }
190
191    /// List all the monitors in the cluster and their current rank
192    pub fn mon_dump(&self) -> Result<cmd::MonDump, RadosError> {
193        Ok(cmd::mon_dump(&self.rados_t)?)
194    }
195
196    /// Get the mon quorum
197    pub fn mon_quorum(&self) -> Result<String, RadosError> {
198        Ok(cmd::mon_quorum(&self.rados_t)?)
199    }
200
201    /// Show mon daemon version
202    pub fn version(&self) -> Result<CephVersion, RadosError> {
203        cmd::version(&self.rados_t)?.parse()
204    }
205
206    pub fn osd_pool_quota_get(&self, pool: &str) -> Result<u64, RadosError> {
207        Ok(cmd::osd_pool_quota_get(&self.rados_t, pool)?)
208    }
209
210    pub fn auth_del(&self, osd_id: u64) -> Result<(), RadosError> {
211        Ok(cmd::auth_del(&self.rados_t, osd_id, self.simulate)?)
212    }
213
214    pub fn osd_rm(&self, osd_id: u64) -> Result<(), RadosError> {
215        Ok(cmd::osd_rm(&self.rados_t, osd_id, self.simulate)?)
216    }
217
218    pub fn osd_create(&self, id: Option<u64>) -> Result<u64, RadosError> {
219        Ok(cmd::osd_create(&self.rados_t, id, self.simulate)?)
220    }
221
222    // Add a new mgr to the cluster
223    pub fn mgr_auth_add(&self, mgr_id: &str) -> Result<(), RadosError> {
224        Ok(cmd::mgr_auth_add(&self.rados_t, mgr_id, self.simulate)?)
225    }
226
227    // Add a new osd to the cluster
228    pub fn osd_auth_add(&self, osd_id: u64) -> Result<(), RadosError> {
229        Ok(cmd::osd_auth_add(&self.rados_t, osd_id, self.simulate)?)
230    }
231
232    /// Get a ceph-x key.  The id parameter can be either a number or a string
233    /// depending on the type of client so I went with string.
234    pub fn auth_get_key(&self, client_type: &str, id: &str) -> Result<String, RadosError> {
235        Ok(cmd::auth_get_key(&self.rados_t, client_type, id)?)
236    }
237
238    // ceph osd crush add {id-or-name} {weight}  [{bucket-type}={bucket-name} ...]
239    /// add or update crushmap position and weight for an osd
240    pub fn osd_crush_add(&self, osd_id: u64, weight: f64, host: &str) -> Result<(), RadosError> {
241        Ok(cmd::osd_crush_add(
242            &self.rados_t,
243            osd_id,
244            weight,
245            host,
246            self.simulate,
247        )?)
248    }
249
250    // ceph osd crush reweight {id} {weight}
251    /// reweight an osd in the CRUSH map
252    pub fn osd_crush_reweight(&self, osd_id: u64, weight: f64) -> Result<(), RadosError> {
253        Ok(cmd::osd_crush_reweight(
254            &self.rados_t,
255            osd_id,
256            weight,
257            self.simulate,
258        )?)
259    }
260
261    /// check if a single osd is safe to destroy/remove
262    pub fn osd_safe_to_destroy(&self, osd_id: u64) -> bool {
263        cmd::osd_safe_to_destroy(&self.rados_t, osd_id)
264    }
265
266    // Luminous + only
267
268    pub fn mgr_dump(&self) -> Result<cmd::MgrDump, RadosError> {
269        min_version!(Luminous, self);
270        Ok(cmd::mgr_dump(&self.rados_t)?)
271    }
272
273    pub fn mgr_fail(&self, mgr_id: &str) -> Result<(), RadosError> {
274        min_version!(Luminous, self);
275        Ok(cmd::mgr_fail(&self.rados_t, mgr_id, self.simulate)?)
276    }
277
278    pub fn mgr_list_modules(&self) -> Result<Vec<String>, RadosError> {
279        min_version!(Luminous, self);
280        Ok(cmd::mgr_list_modules(&self.rados_t)?)
281    }
282
283    pub fn mgr_list_services(&self) -> Result<Vec<String>, RadosError> {
284        min_version!(Luminous, self);
285        Ok(cmd::mgr_list_services(&self.rados_t)?)
286    }
287
288    pub fn mgr_enable_module(&self, module: &str, force: bool) -> Result<(), RadosError> {
289        min_version!(Luminous, self);
290        Ok(cmd::mgr_enable_module(
291            &self.rados_t,
292            module,
293            force,
294            self.simulate,
295        )?)
296    }
297
298    pub fn mgr_disable_module(&self, module: &str) -> Result<(), RadosError> {
299        min_version!(Luminous, self);
300        Ok(cmd::mgr_disable_module(
301            &self.rados_t,
302            module,
303            self.simulate,
304        )?)
305    }
306
307    pub fn mgr_metadata(&self) -> Result<Vec<cmd::MgrMetadata>, RadosError> {
308        min_version!(Luminous, self);
309        Ok(cmd::mgr_metadata(&self.rados_t)?)
310    }
311
312    pub fn osd_metadata(&self) -> Result<Vec<cmd::OsdMetadata>, RadosError> {
313        Ok(cmd::osd_metadata(&self.rados_t)?)
314    }
315
316    pub fn mgr_count_metadata(&self, property: &str) -> Result<HashMap<String, u64>, RadosError> {
317        min_version!(Luminous, self);
318        Ok(cmd::mgr_count_metadata(&self.rados_t, property)?)
319    }
320
321    pub fn mgr_versions(&self) -> Result<HashMap<String, u64>, RadosError> {
322        min_version!(Luminous, self);
323        Ok(cmd::mgr_versions(&self.rados_t)?)
324    }
325
326    pub fn run_command(&self, command: MonCommand) -> Result<String, RadosError> {
327        let cmd = command.as_json();
328        let data: Vec<*mut c_char> = Vec::with_capacity(1);
329
330        debug!("Calling rados_mon_command with {:?}", cmd);
331        let cmds = CString::new(cmd).unwrap();
332
333        let mut outbuf = ptr::null_mut();
334        let mut outs = ptr::null_mut();
335        let mut outbuf_len = 0;
336        let mut outs_len = 0;
337
338        // Ceph librados allocates these buffers internally and the pointer that comes
339        // back must be freed by call `rados_buffer_free`
340        let mut str_outbuf: String = String::new();
341        let mut str_outs: String = String::new();
342
343        let ret_code = unsafe {
344            // cmd length is 1 because we only allow one command at a time.
345            rados::rados_mon_command(
346                *self.rados_t.inner(),
347                &mut cmds.as_ptr(),
348                1,
349                data.as_ptr() as *mut c_char,
350                data.len() as usize,
351                &mut outbuf,
352                &mut outbuf_len,
353                &mut outs,
354                &mut outs_len,
355            )
356        };
357        debug!("return code: {}", ret_code);
358        if ret_code < 0 {
359            if outs_len > 0 && !outs.is_null() {
360                let slice =
361                    unsafe { ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize) };
362                str_outs = String::from_utf8_lossy(slice).into_owned();
363
364                unsafe {
365                    rados::rados_buffer_free(outs);
366                }
367            }
368            return Err(RadosError::new(format!(
369                "{:?} : {}",
370                RadosError::from(ret_code),
371                str_outs
372            )));
373        }
374
375        // Copy the data from outbuf and then  call rados_buffer_free instead libc::free
376        if outbuf_len > 0 && !outbuf.is_null() {
377            let slice =
378                unsafe { ::std::slice::from_raw_parts(outbuf as *const u8, outbuf_len as usize) };
379            str_outbuf = String::from_utf8_lossy(slice).into_owned();
380
381            unsafe {
382                rados::rados_buffer_free(outbuf);
383            }
384        }
385
386        // if outs_len > 0 && !outs.is_null() {
387        //     let slice = unsafe {
388        //         ::std::slice::from_raw_parts(outs as *const u8, outs_len as usize)
389        //     };
390        //     str_outs = String::from_utf8_lossy(slice).into_owned();
391
392        //     unsafe { rados::rados_buffer_free(outs); }
393        // }
394        // println!("outs: {}", str_outs);
395
396        // Ok((str_outbuf, str_outs))
397        Ok(str_outbuf)
398    }
399}