1#![allow(clippy::single_match)]
10#![allow(clippy::needless_range_loop)]
11#![allow(clippy::type_complexity)]
12
13use crate::sys::cpg as ffi;
15
16use std::collections::HashMap;
17use std::ffi::{CStr, CString};
18use std::fmt;
19use std::os::raw::{c_int, c_void};
20use std::ptr::copy_nonoverlapping;
21use std::slice;
22use std::string::String;
23use std::sync::Mutex;
24
25use crate::string_from_bytes;
27use crate::{CsError, DispatchFlags, NodeId, Result};
28
29const CPG_NAMELEN_MAX: usize = 128;
30const CPG_MEMBERS_MAX: usize = 128;
31
32#[derive(Copy, Clone)]
34pub struct RingId {
35 pub nodeid: NodeId,
36 pub seq: u64,
37}
38
39#[derive(Copy, Clone)]
43pub enum Guarantee {
44 TypeUnordered,
45 TypeFifo,
46 TypeAgreed,
47 TypeSafe,
48}
49
50impl Guarantee {
52 pub fn to_c(&self) -> u32 {
53 match self {
54 Guarantee::TypeUnordered => ffi::CPG_TYPE_UNORDERED,
55 Guarantee::TypeFifo => ffi::CPG_TYPE_FIFO,
56 Guarantee::TypeAgreed => ffi::CPG_TYPE_AGREED,
57 Guarantee::TypeSafe => ffi::CPG_TYPE_SAFE,
58 }
59 }
60}
61
62#[derive(Copy, Clone)]
64pub enum FlowControlState {
65 Disabled,
66 Enabled,
67}
68
69#[derive(Copy, Clone)]
71pub enum Model1Flags {
72 None,
73}
74
75#[derive(Copy, Clone)]
77pub enum Reason {
78 Undefined = 0,
79 Join = 1,
80 Leave = 2,
81 NodeDown = 3,
82 NodeUp = 4,
83 ProcDown = 5,
84}
85
86impl Reason {
88 pub fn new(r: u32) -> Reason {
89 match r {
90 0 => Reason::Undefined,
91 1 => Reason::Join,
92 2 => Reason::Leave,
93 3 => Reason::NodeDown,
94 4 => Reason::NodeUp,
95 5 => Reason::ProcDown,
96 _ => Reason::Undefined,
97 }
98 }
99}
100impl fmt::Display for Reason {
101 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102 match self {
103 Reason::Undefined => write!(f, "Undefined"),
104 Reason::Join => write!(f, "Join"),
105 Reason::Leave => write!(f, "Leave"),
106 Reason::NodeDown => write!(f, "NodeDown"),
107 Reason::NodeUp => write!(f, "NodeUp"),
108 Reason::ProcDown => write!(f, "ProcDown"),
109 }
110 }
111}
112
113pub struct Address {
115 pub nodeid: NodeId,
116 pub pid: u32,
117 pub reason: Reason,
118}
119impl fmt::Debug for Address {
120 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121 write!(
122 f,
123 "[nodeid: {}, pid: {}, reason: {}]",
124 self.nodeid, self.pid, self.reason
125 )
126 }
127}
128
129#[derive(Copy, Clone)]
131pub struct Model1Data {
132 pub flags: Model1Flags,
133 pub deliver_fn: Option<
134 fn(
135 handle: &Handle,
136 group_name: String,
137 nodeid: NodeId,
138 pid: u32,
139 msg: &[u8],
140 msg_len: usize,
141 ),
142 >,
143 pub confchg_fn: Option<
144 fn(
145 handle: &Handle,
146 group_name: &str,
147 member_list: Vec<Address>,
148 left_list: Vec<Address>,
149 joined_list: Vec<Address>,
150 ),
151 >,
152 pub totem_confchg_fn: Option<fn(handle: &Handle, ring_id: RingId, member_list: Vec<NodeId>)>,
153}
154
155#[derive(Copy, Clone)]
157pub enum ModelData {
158 ModelNone,
159 ModelV1(Model1Data),
160}
161
162#[derive(Copy, Clone)]
164pub struct Handle {
165 cpg_handle: u64, model_data: ModelData,
167}
168
169lazy_static! {
171 static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new());
172}
173
174fn string_to_cpg_name(group: &str) -> Result<ffi::cpg_name> {
176 if group.len() > CPG_NAMELEN_MAX - 1 {
177 return Err(CsError::CsErrInvalidParam);
178 }
179
180 let c_name = match CString::new(group) {
181 Ok(n) => n,
182 Err(_) => return Err(CsError::CsErrLibrary),
183 };
184 let mut c_group = ffi::cpg_name {
185 length: group.len() as u32,
186 value: [0; CPG_NAMELEN_MAX],
187 };
188
189 unsafe {
190 copy_nonoverlapping(c_name.as_ptr(), c_group.value.as_mut_ptr(), group.len());
192 }
193
194 Ok(c_group)
195}
196
197fn cpg_array_to_vec(list: *const ffi::cpg_address, list_entries: usize) -> Vec<Address> {
199 let temp: &[ffi::cpg_address] = unsafe { slice::from_raw_parts(list, list_entries) };
200 let mut r_vec = Vec::<Address>::new();
201
202 for i in 0..list_entries {
203 let a: Address = Address {
204 nodeid: NodeId::from(temp[i].nodeid),
205 pid: temp[i].pid,
206 reason: Reason::new(temp[i].reason),
207 };
208 r_vec.push(a);
209 }
210 r_vec
211}
212
213extern "C" fn rust_deliver_fn(
215 handle: ffi::cpg_handle_t,
216 group_name: *const ffi::cpg_name,
217 nodeid: u32,
218 pid: u32,
219 msg: *mut ::std::os::raw::c_void,
220 msg_len: usize,
221) {
222 if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
223 let r_group_name = unsafe {
225 CStr::from_ptr(&(*group_name).value[0])
226 .to_string_lossy()
227 .into_owned()
228 };
229
230 let data: &[u8] = unsafe { std::slice::from_raw_parts(msg as *const u8, msg_len) };
231
232 match h.model_data {
233 ModelData::ModelV1(md) => {
234 if let Some(cb) = md.deliver_fn {
235 (cb)(h, r_group_name, NodeId::from(nodeid), pid, data, msg_len);
236 }
237 }
238 _ => {}
239 }
240 }
241}
242
243extern "C" fn rust_confchg_fn(
245 handle: ffi::cpg_handle_t,
246 group_name: *const ffi::cpg_name,
247 member_list: *const ffi::cpg_address,
248 member_list_entries: usize,
249 left_list: *const ffi::cpg_address,
250 left_list_entries: usize,
251 joined_list: *const ffi::cpg_address,
252 joined_list_entries: usize,
253) {
254 if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
255 let r_group_name = unsafe {
256 CStr::from_ptr(&(*group_name).value[0])
257 .to_string_lossy()
258 .into_owned()
259 };
260 let r_member_list = cpg_array_to_vec(member_list, member_list_entries);
261 let r_left_list = cpg_array_to_vec(left_list, left_list_entries);
262 let r_joined_list = cpg_array_to_vec(joined_list, joined_list_entries);
263
264 match h.model_data {
265 ModelData::ModelV1(md) => {
266 if let Some(cb) = md.confchg_fn {
267 (cb)(h, &r_group_name, r_member_list, r_left_list, r_joined_list);
268 }
269 }
270 _ => {}
271 }
272 }
273}
274
275extern "C" fn rust_totem_confchg_fn(
277 handle: ffi::cpg_handle_t,
278 ring_id: ffi::cpg_ring_id,
279 member_list_entries: u32,
280 member_list: *const u32,
281) {
282 if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
283 let r_ring_id = RingId {
284 nodeid: NodeId::from(ring_id.nodeid),
285 seq: ring_id.seq,
286 };
287 let mut r_member_list = Vec::<NodeId>::new();
288 let temp_members: &[u32] =
289 unsafe { slice::from_raw_parts(member_list, member_list_entries as usize) };
290 for i in 0..member_list_entries as usize {
291 r_member_list.push(NodeId::from(temp_members[i]));
292 }
293
294 match h.model_data {
295 ModelData::ModelV1(md) => {
296 if let Some(cb) = md.totem_confchg_fn {
297 (cb)(h, r_ring_id, r_member_list);
298 }
299 }
300 _ => {}
301 }
302 }
303}
304
305pub fn initialize(model_data: &ModelData, context: u64) -> Result<Handle> {
309 let mut handle: ffi::cpg_handle_t = 0;
310 let mut m = match model_data {
311 ModelData::ModelV1(_v1) => {
312 ffi::cpg_model_v1_data_t {
313 model: ffi::CPG_MODEL_V1,
314 cpg_deliver_fn: Some(rust_deliver_fn),
315 cpg_confchg_fn: Some(rust_confchg_fn),
316 cpg_totem_confchg_fn: Some(rust_totem_confchg_fn),
317 flags: 0, }
319 }
320 _ => return Err(CsError::CsErrInvalidParam),
321 };
322
323 unsafe {
324 let c_context: *mut c_void = &mut &context as *mut _ as *mut c_void;
325 let c_model: *mut ffi::cpg_model_data_t = &mut m as *mut _ as *mut ffi::cpg_model_data_t;
326 let res = ffi::cpg_model_initialize(&mut handle, m.model, c_model, c_context);
327
328 if res == ffi::CS_OK {
329 let rhandle = Handle {
330 cpg_handle: handle,
331 model_data: *model_data,
332 };
333 HANDLE_HASH.lock().unwrap().insert(handle, rhandle);
334 Ok(rhandle)
335 } else {
336 Err(CsError::from_c(res))
337 }
338 }
339}
340
341pub fn finalize(handle: Handle) -> Result<()> {
343 let res = unsafe { ffi::cpg_finalize(handle.cpg_handle) };
344 if res == ffi::CS_OK {
345 HANDLE_HASH.lock().unwrap().remove(&handle.cpg_handle);
346 Ok(())
347 } else {
348 Err(CsError::from_c(res))
349 }
350}
351
352pub fn fd_get(handle: Handle) -> Result<i32> {
355 let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int;
356 let res = unsafe { ffi::cpg_fd_get(handle.cpg_handle, c_fd) };
357 if res == ffi::CS_OK {
358 Ok(c_fd as i32)
359 } else {
360 Err(CsError::from_c(res))
361 }
362}
363
364pub fn dispatch(handle: Handle, flags: DispatchFlags) -> Result<()> {
366 let res = unsafe { ffi::cpg_dispatch(handle.cpg_handle, flags as u32) };
367 if res == ffi::CS_OK {
368 Ok(())
369 } else {
370 Err(CsError::from_c(res))
371 }
372}
373
374pub fn join(handle: Handle, group: &str) -> Result<()> {
376 let res = unsafe {
377 let c_group = string_to_cpg_name(group)?;
378 ffi::cpg_join(handle.cpg_handle, &c_group)
379 };
380 if res == ffi::CS_OK {
381 Ok(())
382 } else {
383 Err(CsError::from_c(res))
384 }
385}
386
387pub fn leave(handle: Handle, group: &str) -> Result<()> {
390 let res = unsafe {
391 let c_group = string_to_cpg_name(group)?;
392 ffi::cpg_leave(handle.cpg_handle, &c_group)
393 };
394 if res == ffi::CS_OK {
395 Ok(())
396 } else {
397 Err(CsError::from_c(res))
398 }
399}
400
401pub fn local_get(handle: Handle) -> Result<NodeId> {
403 let mut nodeid: u32 = 0;
404 let res = unsafe { ffi::cpg_local_get(handle.cpg_handle, &mut nodeid) };
405 if res == ffi::CS_OK {
406 Ok(NodeId::from(nodeid))
407 } else {
408 Err(CsError::from_c(res))
409 }
410}
411
412pub fn membership_get(handle: Handle, group: &str) -> Result<Vec<Address>> {
414 let mut member_list_entries: i32 = 0;
415 let member_list = [ffi::cpg_address {
416 nodeid: 0,
417 pid: 0,
418 reason: 0,
419 }; CPG_MEMBERS_MAX];
420 let res = unsafe {
421 let mut c_group = string_to_cpg_name(group)?;
422 let c_memlist = member_list.as_ptr() as *mut ffi::cpg_address;
423 ffi::cpg_membership_get(
424 handle.cpg_handle,
425 &mut c_group,
426 &mut *c_memlist,
427 &mut member_list_entries,
428 )
429 };
430 if res == ffi::CS_OK {
431 Ok(cpg_array_to_vec(
432 member_list.as_ptr(),
433 member_list_entries as usize,
434 ))
435 } else {
436 Err(CsError::from_c(res))
437 }
438}
439
440pub fn max_atomic_msgsize_get(handle: Handle) -> Result<u32> {
444 let mut asize: u32 = 0;
445 let res = unsafe { ffi::cpg_max_atomic_msgsize_get(handle.cpg_handle, &mut asize) };
446 if res == ffi::CS_OK {
447 Ok(asize)
448 } else {
449 Err(CsError::from_c(res))
450 }
451}
452
453pub fn context_get(handle: Handle) -> Result<u64> {
457 let mut c_context: *mut c_void = &mut 0u64 as *mut _ as *mut c_void;
458 let (res, context) = unsafe {
459 let r = ffi::cpg_context_get(handle.cpg_handle, &mut c_context);
460 let context: u64 = c_context as u64;
461 (r, context)
462 };
463 if res == ffi::CS_OK {
464 Ok(context)
465 } else {
466 Err(CsError::from_c(res))
467 }
468}
469
470pub fn context_set(handle: Handle, context: u64) -> Result<()> {
475 let res = unsafe {
476 let c_context = context as *mut c_void;
477 ffi::cpg_context_set(handle.cpg_handle, c_context)
478 };
479 if res == ffi::CS_OK {
480 Ok(())
481 } else {
482 Err(CsError::from_c(res))
483 }
484}
485
486pub fn flow_control_state_get(handle: Handle) -> Result<bool> {
488 let mut fc_state: u32 = 0;
489 let res = unsafe { ffi::cpg_flow_control_state_get(handle.cpg_handle, &mut fc_state) };
490 if res == ffi::CS_OK {
491 if fc_state == 1 {
492 Ok(true)
493 } else {
494 Ok(false)
495 }
496 } else {
497 Err(CsError::from_c(res))
498 }
499}
500
501pub fn mcast_joined(handle: Handle, guarantee: Guarantee, msg: &[u8]) -> Result<()> {
503 let c_iovec = ffi::iovec {
504 iov_base: msg.as_ptr() as *mut c_void,
505 iov_len: msg.len(),
506 };
507 let res = unsafe { ffi::cpg_mcast_joined(handle.cpg_handle, guarantee.to_c(), &c_iovec, 1) };
508 if res == ffi::CS_OK {
509 Ok(())
510 } else {
511 Err(CsError::from_c(res))
512 }
513}
514
515#[derive(Copy, Clone)]
517pub enum CpgIterType {
518 NameOnly = 1,
519 OneGroup = 2,
520 All = 3,
521}
522
523pub struct CpgIterStart {
528 iter_handle: u64,
529}
530
531pub struct CpgIter {
533 pub group: String,
534 pub nodeid: NodeId,
535 pub pid: u32,
536}
537
538pub struct CpgIntoIter {
539 iter_handle: u64,
540}
541
542impl fmt::Debug for CpgIter {
543 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
544 write!(
545 f,
546 "[group: {}, nodeid: {}, pid: {}]",
547 self.group, self.nodeid, self.pid
548 )
549 }
550}
551
552impl Iterator for CpgIntoIter {
553 type Item = CpgIter;
554
555 fn next(&mut self) -> Option<CpgIter> {
556 let mut c_iter_description = ffi::cpg_iteration_description_t {
557 nodeid: 0,
558 pid: 0,
559 group: ffi::cpg_name {
560 length: 0_u32,
561 value: [0; CPG_NAMELEN_MAX],
562 },
563 };
564 let res = unsafe { ffi::cpg_iteration_next(self.iter_handle, &mut c_iter_description) };
565
566 if res == ffi::CS_OK {
567 let r_group =
568 match string_from_bytes(c_iter_description.group.value.as_ptr(), CPG_NAMELEN_MAX) {
569 Ok(groupname) => groupname,
570 Err(_) => return None,
571 };
572 Some(CpgIter {
573 group: r_group,
574 nodeid: NodeId::from(c_iter_description.nodeid),
575 pid: c_iter_description.pid,
576 })
577 } else if res == ffi::CS_ERR_NO_SECTIONS {
578 unsafe {
580 ffi::cpg_iteration_finalize(self.iter_handle)
582 };
583 None
584 } else {
585 None
586 }
587 }
588}
589
590impl CpgIterStart {
591 pub fn new(cpg_handle: Handle, group: &str, iter_type: CpgIterType) -> Result<CpgIterStart> {
593 let mut iter_handle: u64 = 0;
594 let res = unsafe {
595 let mut c_group = string_to_cpg_name(group)?;
596 let c_itertype = iter_type as u32;
597 let c_group_ptr = {
599 match iter_type {
600 CpgIterType::All => std::ptr::null_mut(),
601 _ => &mut c_group,
602 }
603 };
604 ffi::cpg_iteration_initialize(
605 cpg_handle.cpg_handle,
606 c_itertype,
607 c_group_ptr,
608 &mut iter_handle,
609 )
610 };
611 if res == ffi::CS_OK {
612 Ok(CpgIterStart { iter_handle })
613 } else {
614 Err(CsError::from_c(res))
615 }
616 }
617}
618
619impl IntoIterator for CpgIterStart {
620 type Item = CpgIter;
621 type IntoIter = CpgIntoIter;
622
623 fn into_iter(self) -> Self::IntoIter {
624 CpgIntoIter {
625 iter_handle: self.iter_handle,
626 }
627 }
628}