1#![allow(clippy::single_match)]
10#![allow(clippy::needless_range_loop)]
11#![allow(clippy::type_complexity)]
12
13use crate::sys::cpg as ffi;
15
16use std::collections::HashMap;
17use std::ffi::{CStr, CString};
18use std::fmt;
19use std::os::raw::{c_int, c_void};
20use std::ptr::copy_nonoverlapping;
21use std::slice;
22use std::string::String;
23use std::sync::Mutex;
24
25use crate::string_from_bytes;
27use crate::{CsError, DispatchFlags, NodeId, Result};
28
/// Size in bytes of the `value` buffer inside an `ffi::cpg_name`; group names passed to
/// `string_to_cpg_name` must be strictly shorter than this.
const CPG_NAMELEN_MAX: usize = 128;
/// Number of `ffi::cpg_address` slots `membership_get` allocates for the C library to fill.
const CPG_MEMBERS_MAX: usize = 128;
31
/// A ring identifier: a node id paired with a sequence number.
///
/// Values are built from the C `cpg_ring_id` handed to the totem
/// configuration-change callback.
#[derive(Copy, Clone)]
pub struct RingId {
    /// Node id component of the ring identifier.
    pub nodeid: NodeId,
    /// Sequence number component of the ring identifier.
    pub seq: u64,
}
38
/// Delivery guarantee requested when multicasting a message with `mcast_joined`.
///
/// Each variant corresponds to one `ffi::CPG_TYPE_*` constant (see `Guarantee::to_c`).
#[derive(Copy, Clone)]
pub enum Guarantee {
    /// Maps to `CPG_TYPE_UNORDERED`.
    TypeUnordered,
    /// Maps to `CPG_TYPE_FIFO`.
    TypeFifo,
    /// Maps to `CPG_TYPE_AGREED`.
    TypeAgreed,
    /// Maps to `CPG_TYPE_SAFE`.
    TypeSafe,
}
49
50impl Guarantee {
52 pub fn to_c(&self) -> u32 {
53 match self {
54 Guarantee::TypeUnordered => ffi::CPG_TYPE_UNORDERED,
55 Guarantee::TypeFifo => ffi::CPG_TYPE_FIFO,
56 Guarantee::TypeAgreed => ffi::CPG_TYPE_AGREED,
57 Guarantee::TypeSafe => ffi::CPG_TYPE_SAFE,
58 }
59 }
60}
61
/// Two-state flow-control indicator.
///
/// NOTE(review): not referenced by any function in the visible part of this file;
/// `flow_control_state_get` returns a plain `bool` instead.
#[derive(Copy, Clone)]
pub enum FlowControlState {
    /// Flow control is off.
    Disabled,
    /// Flow control is on.
    Enabled,
}
68
/// Flags carried in `Model1Data`.
///
/// Only `None` exists; `initialize` always passes `flags: 0` to the C library.
#[derive(Copy, Clone)]
pub enum Model1Flags {
    /// No flags set.
    None,
}
74
/// Reason code attached to an `Address` entry in a membership list.
///
/// The discriminants are the numeric codes accepted by [`Reason::new`].
#[derive(Copy, Clone)]
pub enum Reason {
    Undefined = 0,
    Join = 1,
    Leave = 2,
    NodeDown = 3,
    NodeUp = 4,
    ProcDown = 5,
}

impl Reason {
    /// Converts a raw numeric reason code into a `Reason`.
    ///
    /// Codes 1 through 5 map to their named variants; 0 and every unrecognised
    /// code map to `Reason::Undefined`.
    pub fn new(r: u32) -> Reason {
        match r {
            1 => Self::Join,
            2 => Self::Leave,
            3 => Self::NodeDown,
            4 => Self::NodeUp,
            5 => Self::ProcDown,
            // Covers the explicit 0 as well as anything out of range.
            _ => Self::Undefined,
        }
    }
}

impl fmt::Display for Reason {
    /// Writes the variant name (for example `NodeDown`) to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match *self {
            Self::Undefined => "Undefined",
            Self::Join => "Join",
            Self::Leave => "Leave",
            Self::NodeDown => "NodeDown",
            Self::NodeUp => "NodeUp",
            Self::ProcDown => "ProcDown",
        };
        f.write_str(label)
    }
}
112
/// One entry of a CPG membership list, converted from the C `cpg_address`.
pub struct Address {
    /// Node id copied from the C entry.
    pub nodeid: NodeId,
    /// Process id copied from the C entry.
    pub pid: u32,
    /// Reason code of the C entry, decoded with `Reason::new`.
    pub reason: Reason,
}
119impl fmt::Debug for Address {
120 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121 write!(
122 f,
123 "[nodeid: {}, pid: {}, reason: {}]",
124 self.nodeid, self.pid, self.reason
125 )
126 }
127}
128
/// Callback set for the V1 CPG model.
///
/// Each callback is optional; a `None` entry means the corresponding C event is
/// received by this module's trampoline and then ignored.
#[derive(Copy, Clone)]
pub struct Model1Data {
    /// Model flags. Not forwarded to the C library by `initialize`, which always passes 0.
    pub flags: Model1Flags,
    /// Called when a message is delivered. Receives the handle, the group name,
    /// the `nodeid` and `pid` reported by the C library for the message, the
    /// message bytes, and their length (equal to `msg.len()`).
    pub deliver_fn: Option<
        fn(
            handle: &Handle,
            group_name: String,
            nodeid: NodeId,
            pid: u32,
            msg: &[u8],
            msg_len: usize,
        ),
    >,
    /// Called on a group configuration change. Receives the handle, the group
    /// name, and the member, left and joined lists converted to `Address` values.
    pub confchg_fn: Option<
        fn(
            handle: &Handle,
            group_name: &str,
            member_list: Vec<Address>,
            left_list: Vec<Address>,
            joined_list: Vec<Address>,
        ),
    >,
    /// Called on a totem configuration change. Receives the handle, the ring id
    /// and the list of member node ids.
    pub totem_confchg_fn: Option<fn(handle: &Handle, ring_id: RingId, member_list: Vec<NodeId>)>,
}
154
/// Model selection passed to `initialize`.
#[derive(Copy, Clone)]
pub enum ModelData {
    /// No model; `initialize` rejects this with `CsError::CsErrInvalidParam`.
    ModelNone,
    /// The V1 model together with its callbacks.
    ModelV1(Model1Data),
}
161
/// Rust-side handle for a CPG connection.
///
/// Dropping a handle that is not a clone calls `finalize` on it (see the `Drop` impl).
pub struct Handle {
    // Raw handle value returned by the C library.
    cpg_handle: u64,
    // Model and callbacks supplied to `initialize`.
    model_data: ModelData,
    // True for copies produced by `Clone`; such copies do not finalize on drop.
    clone: bool,
}
168
169impl Clone for Handle {
170 fn clone(&self) -> Handle {
171 Handle {
172 cpg_handle: self.cpg_handle,
173 model_data: self.model_data,
174 clone: true,
175 }
176 }
177}
178
179impl Drop for Handle {
180 fn drop(self: &mut Handle) {
181 if !self.clone {
182 let _e = finalize(self);
183 }
184 }
185}
186
187impl PartialEq for Handle {
189 fn eq(&self, other: &Handle) -> bool {
190 self.cpg_handle == other.cpg_handle
191 }
192}
193
lazy_static! {
    // Registry mapping each raw CPG handle value to a clone of its `Handle`, so the
    // `extern "C"` callbacks can recover the Rust handle and its callbacks.
    // Entries are inserted by `initialize` and removed by `finalize`.
    static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new());
}
198
199fn string_to_cpg_name(group: &str) -> Result<ffi::cpg_name> {
201 if group.len() > CPG_NAMELEN_MAX - 1 {
202 return Err(CsError::CsErrInvalidParam);
203 }
204
205 let c_name = match CString::new(group) {
206 Ok(n) => n,
207 Err(_) => return Err(CsError::CsErrLibrary),
208 };
209 let mut c_group = ffi::cpg_name {
210 length: group.len() as u32,
211 value: [0; CPG_NAMELEN_MAX],
212 };
213
214 unsafe {
215 copy_nonoverlapping(c_name.as_ptr(), c_group.value.as_mut_ptr(), group.len());
217 }
218
219 Ok(c_group)
220}
221
222fn cpg_array_to_vec(list: *const ffi::cpg_address, list_entries: usize) -> Vec<Address> {
224 let temp: &[ffi::cpg_address] = unsafe { slice::from_raw_parts(list, list_entries) };
225 let mut r_vec = Vec::<Address>::new();
226
227 for i in 0..list_entries {
228 let a: Address = Address {
229 nodeid: NodeId::from(temp[i].nodeid),
230 pid: temp[i].pid,
231 reason: Reason::new(temp[i].reason),
232 };
233 r_vec.push(a);
234 }
235 r_vec
236}
237
238extern "C" fn rust_deliver_fn(
240 handle: ffi::cpg_handle_t,
241 group_name: *const ffi::cpg_name,
242 nodeid: u32,
243 pid: u32,
244 msg: *mut ::std::os::raw::c_void,
245 msg_len: usize,
246) {
247 if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
248 let r_group_name = unsafe {
250 CStr::from_ptr(&(*group_name).value[0])
251 .to_string_lossy()
252 .into_owned()
253 };
254
255 let data: &[u8] = unsafe { std::slice::from_raw_parts(msg as *const u8, msg_len) };
256
257 match h.model_data {
258 ModelData::ModelV1(md) => {
259 if let Some(cb) = md.deliver_fn {
260 (cb)(h, r_group_name, NodeId::from(nodeid), pid, data, msg_len);
261 }
262 }
263 _ => {}
264 }
265 }
266}
267
268extern "C" fn rust_confchg_fn(
270 handle: ffi::cpg_handle_t,
271 group_name: *const ffi::cpg_name,
272 member_list: *const ffi::cpg_address,
273 member_list_entries: usize,
274 left_list: *const ffi::cpg_address,
275 left_list_entries: usize,
276 joined_list: *const ffi::cpg_address,
277 joined_list_entries: usize,
278) {
279 if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
280 let r_group_name = unsafe {
281 CStr::from_ptr(&(*group_name).value[0])
282 .to_string_lossy()
283 .into_owned()
284 };
285 let r_member_list = cpg_array_to_vec(member_list, member_list_entries);
286 let r_left_list = cpg_array_to_vec(left_list, left_list_entries);
287 let r_joined_list = cpg_array_to_vec(joined_list, joined_list_entries);
288
289 match h.model_data {
290 ModelData::ModelV1(md) => {
291 if let Some(cb) = md.confchg_fn {
292 (cb)(h, &r_group_name, r_member_list, r_left_list, r_joined_list);
293 }
294 }
295 _ => {}
296 }
297 }
298}
299
300extern "C" fn rust_totem_confchg_fn(
302 handle: ffi::cpg_handle_t,
303 ring_id: ffi::cpg_ring_id,
304 member_list_entries: u32,
305 member_list: *const u32,
306) {
307 if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
308 let r_ring_id = RingId {
309 nodeid: NodeId::from(ring_id.nodeid),
310 seq: ring_id.seq,
311 };
312 let mut r_member_list = Vec::<NodeId>::new();
313 let temp_members: &[u32] =
314 unsafe { slice::from_raw_parts(member_list, member_list_entries as usize) };
315 for i in 0..member_list_entries as usize {
316 r_member_list.push(NodeId::from(temp_members[i]));
317 }
318
319 match h.model_data {
320 ModelData::ModelV1(md) => {
321 if let Some(cb) = md.totem_confchg_fn {
322 (cb)(h, r_ring_id, r_member_list);
323 }
324 }
325 _ => {}
326 }
327 }
328}
329
330pub fn initialize(model_data: &ModelData, context: u64) -> Result<Handle> {
334 let mut handle: ffi::cpg_handle_t = 0;
335 let mut m = match model_data {
336 ModelData::ModelV1(_v1) => {
337 ffi::cpg_model_v1_data_t {
338 model: ffi::CPG_MODEL_V1,
339 cpg_deliver_fn: Some(rust_deliver_fn),
340 cpg_confchg_fn: Some(rust_confchg_fn),
341 cpg_totem_confchg_fn: Some(rust_totem_confchg_fn),
342 flags: 0, }
344 }
345 _ => return Err(CsError::CsErrInvalidParam),
346 };
347
348 unsafe {
349 let c_context: *mut c_void = &mut &context as *mut _ as *mut c_void;
350 let c_model: *mut ffi::cpg_model_data_t = &mut m as *mut _ as *mut ffi::cpg_model_data_t;
351 let res = ffi::cpg_model_initialize(&mut handle, m.model, c_model, c_context);
352
353 if res == ffi::CS_OK {
354 let rhandle = Handle {
355 cpg_handle: handle,
356 model_data: *model_data,
357 clone: false,
358 };
359 HANDLE_HASH.lock().unwrap().insert(handle, rhandle.clone());
360 Ok(rhandle)
361 } else {
362 Err(CsError::from_c(res))
363 }
364 }
365}
366
367pub fn finalize(handle: &Handle) -> Result<()> {
369 let res = unsafe { ffi::cpg_finalize(handle.cpg_handle) };
370 if res == ffi::CS_OK {
371 HANDLE_HASH.lock().unwrap().remove(&handle.cpg_handle);
372 Ok(())
373 } else {
374 Err(CsError::from_c(res))
375 }
376}
377
378pub fn fd_get(handle: &Handle) -> Result<i32> {
381 let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int;
382 let res = unsafe { ffi::cpg_fd_get(handle.cpg_handle, c_fd) };
383 if res == ffi::CS_OK {
384 Ok(unsafe { *c_fd })
385 } else {
386 Err(CsError::from_c(res))
387 }
388}
389
390pub fn dispatch(handle: &Handle, flags: DispatchFlags) -> Result<()> {
392 let res = unsafe { ffi::cpg_dispatch(handle.cpg_handle, flags as u32) };
393 if res == ffi::CS_OK {
394 Ok(())
395 } else {
396 Err(CsError::from_c(res))
397 }
398}
399
400pub fn join(handle: &Handle, group: &str) -> Result<()> {
402 let res = unsafe {
403 let c_group = string_to_cpg_name(group)?;
404 ffi::cpg_join(handle.cpg_handle, &c_group)
405 };
406 if res == ffi::CS_OK {
407 Ok(())
408 } else {
409 Err(CsError::from_c(res))
410 }
411}
412
413pub fn leave(handle: &Handle, group: &str) -> Result<()> {
416 let res = unsafe {
417 let c_group = string_to_cpg_name(group)?;
418 ffi::cpg_leave(handle.cpg_handle, &c_group)
419 };
420 if res == ffi::CS_OK {
421 Ok(())
422 } else {
423 Err(CsError::from_c(res))
424 }
425}
426
427pub fn local_get(handle: &Handle) -> Result<NodeId> {
429 let mut nodeid: u32 = 0;
430 let res = unsafe { ffi::cpg_local_get(handle.cpg_handle, &mut nodeid) };
431 if res == ffi::CS_OK {
432 Ok(NodeId::from(nodeid))
433 } else {
434 Err(CsError::from_c(res))
435 }
436}
437
438pub fn membership_get(handle: &Handle, group: &str) -> Result<Vec<Address>> {
440 let mut member_list_entries: i32 = 0;
441 let member_list = [ffi::cpg_address {
442 nodeid: 0,
443 pid: 0,
444 reason: 0,
445 }; CPG_MEMBERS_MAX];
446 let res = unsafe {
447 let mut c_group = string_to_cpg_name(group)?;
448 let c_memlist = member_list.as_ptr() as *mut ffi::cpg_address;
449 ffi::cpg_membership_get(
450 handle.cpg_handle,
451 &mut c_group,
452 &mut *c_memlist,
453 &mut member_list_entries,
454 )
455 };
456 if res == ffi::CS_OK {
457 Ok(cpg_array_to_vec(
458 member_list.as_ptr(),
459 member_list_entries as usize,
460 ))
461 } else {
462 Err(CsError::from_c(res))
463 }
464}
465
466pub fn max_atomic_msgsize_get(handle: &Handle) -> Result<u32> {
470 let mut asize: u32 = 0;
471 let res = unsafe { ffi::cpg_max_atomic_msgsize_get(handle.cpg_handle, &mut asize) };
472 if res == ffi::CS_OK {
473 Ok(asize)
474 } else {
475 Err(CsError::from_c(res))
476 }
477}
478
479pub fn context_get(handle: &Handle) -> Result<u64> {
483 let mut c_context: *mut c_void = &mut 0u64 as *mut _ as *mut c_void;
484 let (res, context) = unsafe {
485 let r = ffi::cpg_context_get(handle.cpg_handle, &mut c_context);
486 let context: u64 = c_context as u64;
487 (r, context)
488 };
489 if res == ffi::CS_OK {
490 Ok(context)
491 } else {
492 Err(CsError::from_c(res))
493 }
494}
495
496pub fn context_set(handle: &Handle, context: u64) -> Result<()> {
501 let res = unsafe {
502 let c_context = context as *mut c_void;
503 ffi::cpg_context_set(handle.cpg_handle, c_context)
504 };
505 if res == ffi::CS_OK {
506 Ok(())
507 } else {
508 Err(CsError::from_c(res))
509 }
510}
511
512pub fn flow_control_state_get(handle: &Handle) -> Result<bool> {
514 let mut fc_state: u32 = 0;
515 let res = unsafe { ffi::cpg_flow_control_state_get(handle.cpg_handle, &mut fc_state) };
516 if res == ffi::CS_OK {
517 if fc_state == 1 {
518 Ok(true)
519 } else {
520 Ok(false)
521 }
522 } else {
523 Err(CsError::from_c(res))
524 }
525}
526
527pub fn mcast_joined(handle: &Handle, guarantee: Guarantee, msg: &[u8]) -> Result<()> {
529 let c_iovec = ffi::iovec {
530 iov_base: msg.as_ptr() as *mut c_void,
531 iov_len: msg.len(),
532 };
533 let res = unsafe { ffi::cpg_mcast_joined(handle.cpg_handle, guarantee.to_c(), &c_iovec, 1) };
534 if res == ffi::CS_OK {
535 Ok(())
536 } else {
537 Err(CsError::from_c(res))
538 }
539}
540
/// Kind of iteration requested from `CpgIterStart::new`.
///
/// The discriminant is passed to `cpg_iteration_initialize` as a `u32`.
#[derive(Copy, Clone)]
pub enum CpgIterType {
    /// Passed to C as 1.
    NameOnly = 1,
    /// Passed to C as 2.
    OneGroup = 2,
    /// Passed to C as 3; `CpgIterStart::new` passes a NULL group pointer for this kind.
    All = 3,
}
548
/// An initialised CPG iteration; turn it into an iterator with `into_iter()`.
pub struct CpgIterStart {
    // Iteration handle filled in by `cpg_iteration_initialize`.
    iter_handle: u64,
}
556
/// One item produced by a CPG iteration.
pub struct CpgIter {
    /// Group name converted from the C iteration description.
    pub group: String,
    /// Node id from the C iteration description.
    pub nodeid: NodeId,
    /// Process id from the C iteration description.
    pub pid: u32,
}
563
/// Iterator over CPG iteration results; created from a `CpgIterStart`.
pub struct CpgIntoIter {
    // Iteration handle passed to `cpg_iteration_next` / `cpg_iteration_finalize`.
    iter_handle: u64,
}
567
568impl fmt::Debug for CpgIter {
569 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
570 write!(
571 f,
572 "[group: {}, nodeid: {}, pid: {}]",
573 self.group, self.nodeid, self.pid
574 )
575 }
576}
577
578impl Iterator for CpgIntoIter {
579 type Item = CpgIter;
580
581 fn next(&mut self) -> Option<CpgIter> {
582 let mut c_iter_description = ffi::cpg_iteration_description_t {
583 nodeid: 0,
584 pid: 0,
585 group: ffi::cpg_name {
586 length: 0_u32,
587 value: [0; CPG_NAMELEN_MAX],
588 },
589 };
590 let res = unsafe { ffi::cpg_iteration_next(self.iter_handle, &mut c_iter_description) };
591
592 if res == ffi::CS_OK {
593 let r_group =
594 match string_from_bytes(c_iter_description.group.value.as_ptr(), CPG_NAMELEN_MAX) {
595 Ok(groupname) => groupname,
596 Err(_) => return None,
597 };
598 Some(CpgIter {
599 group: r_group,
600 nodeid: NodeId::from(c_iter_description.nodeid),
601 pid: c_iter_description.pid,
602 })
603 } else if res == ffi::CS_ERR_NO_SECTIONS {
604 unsafe {
606 ffi::cpg_iteration_finalize(self.iter_handle)
608 };
609 None
610 } else {
611 None
612 }
613 }
614}
615
616impl CpgIterStart {
617 pub fn new(cpg_handle: &Handle, group: &str, iter_type: CpgIterType) -> Result<CpgIterStart> {
619 let mut iter_handle: u64 = 0;
620 let res = unsafe {
621 let mut c_group = string_to_cpg_name(group)?;
622 let c_itertype = iter_type as u32;
623 let c_group_ptr = {
625 match iter_type {
626 CpgIterType::All => std::ptr::null_mut(),
627 _ => &mut c_group,
628 }
629 };
630 ffi::cpg_iteration_initialize(
631 cpg_handle.cpg_handle,
632 c_itertype,
633 c_group_ptr,
634 &mut iter_handle,
635 )
636 };
637 if res == ffi::CS_OK {
638 Ok(CpgIterStart { iter_handle })
639 } else {
640 Err(CsError::from_c(res))
641 }
642 }
643}
644
645impl IntoIterator for CpgIterStart {
646 type Item = CpgIter;
647 type IntoIter = CpgIntoIter;
648
649 fn into_iter(self) -> Self::IntoIter {
650 CpgIntoIter {
651 iter_handle: self.iter_handle,
652 }
653 }
654}