Skip to main content

rust_corosync/
cpg.rs

1// libcpg interface for Rust
2// Copyright (c) 2020 Red Hat, Inc.
3//
4// All rights reserved.
5//
6// Author: Christine Caulfield (ccaulfi@redhat.com)
7//
8
9#![allow(clippy::single_match)]
10#![allow(clippy::needless_range_loop)]
11#![allow(clippy::type_complexity)]
12
13// For the code generated by bindgen
14use crate::sys::cpg as ffi;
15
16use std::collections::HashMap;
17use std::ffi::{CStr, CString};
18use std::fmt;
19use std::os::raw::{c_int, c_void};
20use std::ptr::copy_nonoverlapping;
21use std::slice;
22use std::string::String;
23use std::sync::Mutex;
24
25// General corosync things
26use crate::string_from_bytes;
27use crate::{CsError, DispatchFlags, NodeId, Result};
28
/// Size of the `value` buffer in a `cpg_name`; group names must be shorter than this.
const CPG_NAMELEN_MAX: usize = 128;
/// Number of `cpg_address` slots handed to libcpg by [membership_get].
const CPG_MEMBERS_MAX: usize = 128;
31
32/// RingId returned by totem_confchg_fn
33#[derive(Copy, Clone)]
34pub struct RingId {
35    pub nodeid: NodeId,
36    pub seq: u64,
37}
38
39/// Totem delivery guarantee options for [mcast_joined]
40// The C enum doesn't have numbers in the code
41// so don't assume we can match them
42#[derive(Copy, Clone)]
43pub enum Guarantee {
44    TypeUnordered,
45    TypeFifo,
46    TypeAgreed,
47    TypeSafe,
48}
49
50// Convert internal to cpg.h values.
51impl Guarantee {
52    pub fn to_c(&self) -> u32 {
53        match self {
54            Guarantee::TypeUnordered => ffi::CPG_TYPE_UNORDERED,
55            Guarantee::TypeFifo => ffi::CPG_TYPE_FIFO,
56            Guarantee::TypeAgreed => ffi::CPG_TYPE_AGREED,
57            Guarantee::TypeSafe => ffi::CPG_TYPE_SAFE,
58        }
59    }
60}
61
/// Flow control state of corosync CPG.
/// NOTE(review): [flow_control_state_get] currently reports the state as a `bool`
/// rather than this enum.
#[derive(Copy, Clone)]
pub enum FlowControlState {
    Disabled,
    Enabled,
}
68
/// Flags for model1. None are currently specified, so this is always `None`.
#[derive(Copy, Clone)]
pub enum Model1Flags {
    None,
}
74
/// Reason for cpg item callback
#[derive(Copy, Clone)]
pub enum Reason {
    Undefined = 0,
    Join = 1,
    Leave = 2,
    NodeDown = 3,
    NodeUp = 4,
    ProcDown = 5,
}

// Convert from cpg.h values
impl Reason {
    /// Build a [Reason] from the raw numeric reason code.
    /// Any value outside 1..=5 (including 0) becomes [Reason::Undefined].
    pub fn new(r: u32) -> Reason {
        match r {
            1 => Reason::Join,
            2 => Reason::Leave,
            3 => Reason::NodeDown,
            4 => Reason::NodeUp,
            5 => Reason::ProcDown,
            _ => Reason::Undefined,
        }
    }
}
impl fmt::Display for Reason {
    /// Writes the variant name, e.g. `Join` or `NodeDown`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Reason::Undefined => "Undefined",
            Reason::Join => "Join",
            Reason::Leave => "Leave",
            Reason::NodeDown => "NodeDown",
            Reason::NodeUp => "NodeUp",
            Reason::ProcDown => "ProcDown",
        };
        f.write_str(label)
    }
}
112
113/// A CPG address entry returned in the callbacks
114pub struct Address {
115    pub nodeid: NodeId,
116    pub pid: u32,
117    pub reason: Reason,
118}
119impl fmt::Debug for Address {
120    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121        write!(
122            f,
123            "[nodeid: {}, pid: {}, reason: {}]",
124            self.nodeid, self.pid, self.reason
125        )
126    }
127}
128
129/// Data for model1 [initialize]
130#[derive(Copy, Clone)]
131pub struct Model1Data {
132    pub flags: Model1Flags,
133    pub deliver_fn: Option<
134        fn(
135            handle: &Handle,
136            group_name: String,
137            nodeid: NodeId,
138            pid: u32,
139            msg: &[u8],
140            msg_len: usize,
141        ),
142    >,
143    pub confchg_fn: Option<
144        fn(
145            handle: &Handle,
146            group_name: &str,
147            member_list: Vec<Address>,
148            left_list: Vec<Address>,
149            joined_list: Vec<Address>,
150        ),
151    >,
152    pub totem_confchg_fn: Option<fn(handle: &Handle, ring_id: RingId, member_list: Vec<NodeId>)>,
153}
154
155/// Modeldata for [initialize], only v1 supported at the moment
156#[derive(Copy, Clone)]
157pub enum ModelData {
158    ModelNone,
159    ModelV1(Model1Data),
160}
161
162/// A handle into the cpg library. Returned from [initialize] and needed for all other calls
163pub struct Handle {
164    cpg_handle: u64, // Corosync library handle
165    model_data: ModelData,
166    clone: bool,
167}
168
169impl Clone for Handle {
170    fn clone(&self) -> Handle {
171        Handle {
172            cpg_handle: self.cpg_handle,
173            model_data: self.model_data,
174            clone: true,
175        }
176    }
177}
178
179impl Drop for Handle {
180    fn drop(self: &mut Handle) {
181        if !self.clone {
182            let _e = finalize(self);
183        }
184    }
185}
186
187// Clones count as equivalent
188impl PartialEq for Handle {
189    fn eq(&self, other: &Handle) -> bool {
190        self.cpg_handle == other.cpg_handle
191    }
192}
193
194// Used to convert a CPG handle into one of ours
195lazy_static! {
196    static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new());
197}
198
199// Convert a Rust String into a cpg_name struct for libcpg
200fn string_to_cpg_name(group: &str) -> Result<ffi::cpg_name> {
201    if group.len() > CPG_NAMELEN_MAX - 1 {
202        return Err(CsError::CsErrInvalidParam);
203    }
204
205    let c_name = match CString::new(group) {
206        Ok(n) => n,
207        Err(_) => return Err(CsError::CsErrLibrary),
208    };
209    let mut c_group = ffi::cpg_name {
210        length: group.len() as u32,
211        value: [0; CPG_NAMELEN_MAX],
212    };
213
214    unsafe {
215        // NOTE param order is 'wrong-way round' from C
216        copy_nonoverlapping(c_name.as_ptr(), c_group.value.as_mut_ptr(), group.len());
217    }
218
219    Ok(c_group)
220}
221
222// Convert an array of cpg_addresses to a Vec<cpg::Address> - used in callbacks
223fn cpg_array_to_vec(list: *const ffi::cpg_address, list_entries: usize) -> Vec<Address> {
224    let temp: &[ffi::cpg_address] = unsafe { slice::from_raw_parts(list, list_entries) };
225    let mut r_vec = Vec::<Address>::new();
226
227    for i in 0..list_entries {
228        let a: Address = Address {
229            nodeid: NodeId::from(temp[i].nodeid),
230            pid: temp[i].pid,
231            reason: Reason::new(temp[i].reason),
232        };
233        r_vec.push(a);
234    }
235    r_vec
236}
237
238// Called from CPG callback function - munge params back to Rust from C
239extern "C" fn rust_deliver_fn(
240    handle: ffi::cpg_handle_t,
241    group_name: *const ffi::cpg_name,
242    nodeid: u32,
243    pid: u32,
244    msg: *mut ::std::os::raw::c_void,
245    msg_len: usize,
246) {
247    if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
248        // Convert group_name into a Rust str.
249        let r_group_name = unsafe {
250            CStr::from_ptr(&(*group_name).value[0])
251                .to_string_lossy()
252                .into_owned()
253        };
254
255        let data: &[u8] = unsafe { std::slice::from_raw_parts(msg as *const u8, msg_len) };
256
257        match h.model_data {
258            ModelData::ModelV1(md) => {
259                if let Some(cb) = md.deliver_fn {
260                    (cb)(h, r_group_name, NodeId::from(nodeid), pid, data, msg_len);
261                }
262            }
263            _ => {}
264        }
265    }
266}
267
268// Called from CPG callback function - munge params back to Rust from C
269extern "C" fn rust_confchg_fn(
270    handle: ffi::cpg_handle_t,
271    group_name: *const ffi::cpg_name,
272    member_list: *const ffi::cpg_address,
273    member_list_entries: usize,
274    left_list: *const ffi::cpg_address,
275    left_list_entries: usize,
276    joined_list: *const ffi::cpg_address,
277    joined_list_entries: usize,
278) {
279    if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
280        let r_group_name = unsafe {
281            CStr::from_ptr(&(*group_name).value[0])
282                .to_string_lossy()
283                .into_owned()
284        };
285        let r_member_list = cpg_array_to_vec(member_list, member_list_entries);
286        let r_left_list = cpg_array_to_vec(left_list, left_list_entries);
287        let r_joined_list = cpg_array_to_vec(joined_list, joined_list_entries);
288
289        match h.model_data {
290            ModelData::ModelV1(md) => {
291                if let Some(cb) = md.confchg_fn {
292                    (cb)(h, &r_group_name, r_member_list, r_left_list, r_joined_list);
293                }
294            }
295            _ => {}
296        }
297    }
298}
299
300// Called from CPG callback function - munge params back to Rust from C
301extern "C" fn rust_totem_confchg_fn(
302    handle: ffi::cpg_handle_t,
303    ring_id: ffi::cpg_ring_id,
304    member_list_entries: u32,
305    member_list: *const u32,
306) {
307    if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
308        let r_ring_id = RingId {
309            nodeid: NodeId::from(ring_id.nodeid),
310            seq: ring_id.seq,
311        };
312        let mut r_member_list = Vec::<NodeId>::new();
313        let temp_members: &[u32] =
314            unsafe { slice::from_raw_parts(member_list, member_list_entries as usize) };
315        for i in 0..member_list_entries as usize {
316            r_member_list.push(NodeId::from(temp_members[i]));
317        }
318
319        match h.model_data {
320            ModelData::ModelV1(md) => {
321                if let Some(cb) = md.totem_confchg_fn {
322                    (cb)(h, r_ring_id, r_member_list);
323                }
324            }
325            _ => {}
326        }
327    }
328}
329
330/// Initialize a connection to the cpg library. You must call this before doing anything
331/// else and use the passed back [Handle].
332/// Remember to free the handle using [finalize] when finished.
333pub fn initialize(model_data: &ModelData, context: u64) -> Result<Handle> {
334    let mut handle: ffi::cpg_handle_t = 0;
335    let mut m = match model_data {
336        ModelData::ModelV1(_v1) => {
337            ffi::cpg_model_v1_data_t {
338                model: ffi::CPG_MODEL_V1,
339                cpg_deliver_fn: Some(rust_deliver_fn),
340                cpg_confchg_fn: Some(rust_confchg_fn),
341                cpg_totem_confchg_fn: Some(rust_totem_confchg_fn),
342                flags: 0, // No supported flags (yet)
343            }
344        }
345        _ => return Err(CsError::CsErrInvalidParam),
346    };
347
348    unsafe {
349        let c_context: *mut c_void = &mut &context as *mut _ as *mut c_void;
350        let c_model: *mut ffi::cpg_model_data_t = &mut m as *mut _ as *mut ffi::cpg_model_data_t;
351        let res = ffi::cpg_model_initialize(&mut handle, m.model, c_model, c_context);
352
353        if res == ffi::CS_OK {
354            let rhandle = Handle {
355                cpg_handle: handle,
356                model_data: *model_data,
357                clone: false,
358            };
359            HANDLE_HASH.lock().unwrap().insert(handle, rhandle.clone());
360            Ok(rhandle)
361        } else {
362            Err(CsError::from_c(res))
363        }
364    }
365}
366
367/// Finish with a connection to corosync
368pub fn finalize(handle: &Handle) -> Result<()> {
369    let res = unsafe { ffi::cpg_finalize(handle.cpg_handle) };
370    if res == ffi::CS_OK {
371        HANDLE_HASH.lock().unwrap().remove(&handle.cpg_handle);
372        Ok(())
373    } else {
374        Err(CsError::from_c(res))
375    }
376}
377
378// Not sure if an FD is the right thing to return here, but it will do for now.
379/// Returns a file descriptor to use for poll/select on the CPG handle
380pub fn fd_get(handle: &Handle) -> Result<i32> {
381    let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int;
382    let res = unsafe { ffi::cpg_fd_get(handle.cpg_handle, c_fd) };
383    if res == ffi::CS_OK {
384        Ok(unsafe { *c_fd })
385    } else {
386        Err(CsError::from_c(res))
387    }
388}
389
390/// Call any/all active CPG callbacks for this [Handle] see [DispatchFlags] for details
391pub fn dispatch(handle: &Handle, flags: DispatchFlags) -> Result<()> {
392    let res = unsafe { ffi::cpg_dispatch(handle.cpg_handle, flags as u32) };
393    if res == ffi::CS_OK {
394        Ok(())
395    } else {
396        Err(CsError::from_c(res))
397    }
398}
399
400/// Joins a CPG group for sending and receiving messages
401pub fn join(handle: &Handle, group: &str) -> Result<()> {
402    let res = unsafe {
403        let c_group = string_to_cpg_name(group)?;
404        ffi::cpg_join(handle.cpg_handle, &c_group)
405    };
406    if res == ffi::CS_OK {
407        Ok(())
408    } else {
409        Err(CsError::from_c(res))
410    }
411}
412
413/// Leave the currently joined CPG group, another group can now be joined on
414/// the same [Handle] or [finalize] can be called to finish using CPG
415pub fn leave(handle: &Handle, group: &str) -> Result<()> {
416    let res = unsafe {
417        let c_group = string_to_cpg_name(group)?;
418        ffi::cpg_leave(handle.cpg_handle, &c_group)
419    };
420    if res == ffi::CS_OK {
421        Ok(())
422    } else {
423        Err(CsError::from_c(res))
424    }
425}
426
427/// Get the local node ID
428pub fn local_get(handle: &Handle) -> Result<NodeId> {
429    let mut nodeid: u32 = 0;
430    let res = unsafe { ffi::cpg_local_get(handle.cpg_handle, &mut nodeid) };
431    if res == ffi::CS_OK {
432        Ok(NodeId::from(nodeid))
433    } else {
434        Err(CsError::from_c(res))
435    }
436}
437
438/// Get a list of members of a CPG group as a vector of [Address] structs
439pub fn membership_get(handle: &Handle, group: &str) -> Result<Vec<Address>> {
440    let mut member_list_entries: i32 = 0;
441    let member_list = [ffi::cpg_address {
442        nodeid: 0,
443        pid: 0,
444        reason: 0,
445    }; CPG_MEMBERS_MAX];
446    let res = unsafe {
447        let mut c_group = string_to_cpg_name(group)?;
448        let c_memlist = member_list.as_ptr() as *mut ffi::cpg_address;
449        ffi::cpg_membership_get(
450            handle.cpg_handle,
451            &mut c_group,
452            &mut *c_memlist,
453            &mut member_list_entries,
454        )
455    };
456    if res == ffi::CS_OK {
457        Ok(cpg_array_to_vec(
458            member_list.as_ptr(),
459            member_list_entries as usize,
460        ))
461    } else {
462        Err(CsError::from_c(res))
463    }
464}
465
466/// Get the maximum size that CPG can send in one corosync message,
467/// any messages sent via [mcast_joined] that are larger than this
468/// will be fragmented
469pub fn max_atomic_msgsize_get(handle: &Handle) -> Result<u32> {
470    let mut asize: u32 = 0;
471    let res = unsafe { ffi::cpg_max_atomic_msgsize_get(handle.cpg_handle, &mut asize) };
472    if res == ffi::CS_OK {
473        Ok(asize)
474    } else {
475        Err(CsError::from_c(res))
476    }
477}
478
479/// Get the current 'context' value for this handle.
480/// The context value is an arbitrary value that is always passed
481/// back to callbacks to help identify the source
482pub fn context_get(handle: &Handle) -> Result<u64> {
483    let mut c_context: *mut c_void = &mut 0u64 as *mut _ as *mut c_void;
484    let (res, context) = unsafe {
485        let r = ffi::cpg_context_get(handle.cpg_handle, &mut c_context);
486        let context: u64 = c_context as u64;
487        (r, context)
488    };
489    if res == ffi::CS_OK {
490        Ok(context)
491    } else {
492        Err(CsError::from_c(res))
493    }
494}
495
496/// Set the current 'context' value for this handle.
497/// The context value is an arbitrary value that is always passed
498/// back to callbacks to help identify the source.
499/// Normally this is set in [initialize], but this allows it to be changed
500pub fn context_set(handle: &Handle, context: u64) -> Result<()> {
501    let res = unsafe {
502        let c_context = context as *mut c_void;
503        ffi::cpg_context_set(handle.cpg_handle, c_context)
504    };
505    if res == ffi::CS_OK {
506        Ok(())
507    } else {
508        Err(CsError::from_c(res))
509    }
510}
511
512/// Get the flow control state of corosync CPG
513pub fn flow_control_state_get(handle: &Handle) -> Result<bool> {
514    let mut fc_state: u32 = 0;
515    let res = unsafe { ffi::cpg_flow_control_state_get(handle.cpg_handle, &mut fc_state) };
516    if res == ffi::CS_OK {
517        if fc_state == 1 {
518            Ok(true)
519        } else {
520            Ok(false)
521        }
522    } else {
523        Err(CsError::from_c(res))
524    }
525}
526
527/// Send a message to the currently joined CPG group
528pub fn mcast_joined(handle: &Handle, guarantee: Guarantee, msg: &[u8]) -> Result<()> {
529    let c_iovec = ffi::iovec {
530        iov_base: msg.as_ptr() as *mut c_void,
531        iov_len: msg.len(),
532    };
533    let res = unsafe { ffi::cpg_mcast_joined(handle.cpg_handle, guarantee.to_c(), &c_iovec, 1) };
534    if res == ffi::CS_OK {
535        Ok(())
536    } else {
537        Err(CsError::from_c(res))
538    }
539}
540
/// Type of iteration for [CpgIterStart].
/// The discriminant of each variant is the value passed to libcpg.
#[derive(Copy, Clone)]
pub enum CpgIterType {
    NameOnly = 1,
    OneGroup = 2,
    All = 3,
}
548
549// Iterator based on information on this page. thank you!
550// https://stackoverflow.com/questions/30218886/how-to-implement-iterator-and-intoiterator-for-a-simple-struct
551// Object to iterate over
552/// An object to iterate over a list of CPG groups, create one of these and then use 'for' over it
553pub struct CpgIterStart {
554    iter_handle: u64,
555}
556
557/// struct returned from iterating over a [CpgIterStart]
558pub struct CpgIter {
559    pub group: String,
560    pub nodeid: NodeId,
561    pub pid: u32,
562}
563
564pub struct CpgIntoIter {
565    iter_handle: u64,
566}
567
568impl fmt::Debug for CpgIter {
569    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
570        write!(
571            f,
572            "[group: {}, nodeid: {}, pid: {}]",
573            self.group, self.nodeid, self.pid
574        )
575    }
576}
577
578impl Iterator for CpgIntoIter {
579    type Item = CpgIter;
580
581    fn next(&mut self) -> Option<CpgIter> {
582        let mut c_iter_description = ffi::cpg_iteration_description_t {
583            nodeid: 0,
584            pid: 0,
585            group: ffi::cpg_name {
586                length: 0_u32,
587                value: [0; CPG_NAMELEN_MAX],
588            },
589        };
590        let res = unsafe { ffi::cpg_iteration_next(self.iter_handle, &mut c_iter_description) };
591
592        if res == ffi::CS_OK {
593            let r_group =
594                match string_from_bytes(c_iter_description.group.value.as_ptr(), CPG_NAMELEN_MAX) {
595                    Ok(groupname) => groupname,
596                    Err(_) => return None,
597                };
598            Some(CpgIter {
599                group: r_group,
600                nodeid: NodeId::from(c_iter_description.nodeid),
601                pid: c_iter_description.pid,
602            })
603        } else if res == ffi::CS_ERR_NO_SECTIONS {
604            // End of list
605            unsafe {
606                // Yeah, we don't check this return code. There's nowhere to report it.
607                ffi::cpg_iteration_finalize(self.iter_handle)
608            };
609            None
610        } else {
611            None
612        }
613    }
614}
615
616impl CpgIterStart {
617    /// Create a new [CpgIterStart] object for iterating over a list of active CPG groups
618    pub fn new(cpg_handle: &Handle, group: &str, iter_type: CpgIterType) -> Result<CpgIterStart> {
619        let mut iter_handle: u64 = 0;
620        let res = unsafe {
621            let mut c_group = string_to_cpg_name(group)?;
622            let c_itertype = iter_type as u32;
623            // IterType 'All' requires that the group pointer is passed in as NULL
624            let c_group_ptr = {
625                match iter_type {
626                    CpgIterType::All => std::ptr::null_mut(),
627                    _ => &mut c_group,
628                }
629            };
630            ffi::cpg_iteration_initialize(
631                cpg_handle.cpg_handle,
632                c_itertype,
633                c_group_ptr,
634                &mut iter_handle,
635            )
636        };
637        if res == ffi::CS_OK {
638            Ok(CpgIterStart { iter_handle })
639        } else {
640            Err(CsError::from_c(res))
641        }
642    }
643}
644
645impl IntoIterator for CpgIterStart {
646    type Item = CpgIter;
647    type IntoIter = CpgIntoIter;
648
649    fn into_iter(self) -> Self::IntoIter {
650        CpgIntoIter {
651            iter_handle: self.iter_handle,
652        }
653    }
654}