iceoryx2_bb_posix/
barrier.rs

1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13//! Inter-process capable [`Barrier`] which blocks a previously defined number of waiters
14//! untilthe all the waiters reached [`Barrier::wait()`]
15//!
16//! # Examples
17//!
18//! ```
19//! # extern crate iceoryx2_loggers;
20//!
21//! use iceoryx2_bb_posix::barrier::*;
22//! use std::thread;
23//!
24//! let number_of_waiters = 2;
25//! let handle = BarrierHandle::new();
26//! let barrier = BarrierBuilder::new(number_of_waiters)
27//!                                    .is_interprocess_capable(false)
28//!                                    .create(&handle).unwrap();
29//! thread::scope(|s| {
30//!   s.spawn(|| {
31//!     println!("Thread: waiting ...");
32//!     barrier.wait();
33//!     println!("Thread: lets start!");
34//!   });
35//!
36//!   println!("main: waiting ...");
37//!   barrier.wait();
38//!   println!("all systems ready!");
39//! });
40//! ```
41pub use crate::ipc_capable::{Handle, IpcCapable};
42
43use iceoryx2_bb_elementary::scope_guard::ScopeGuardBuilder;
44use iceoryx2_log::{fail, fatal_panic, warn};
45use iceoryx2_pal_posix::posix::errno::Errno;
46use iceoryx2_pal_posix::posix::MemZeroedStruct;
47use iceoryx2_pal_posix::*;
48
49use crate::handle_errno;
50use crate::ipc_capable::internal::{Capability, HandleStorage, IpcConstructible};
51
52#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
53pub enum BarrierCreationError {
54    InsufficientMemory,
55    SystemWideBarrierLimitReached,
56    UnknownError(i32),
57}
58
59/// Builder for the [`Barrier`]. The default values for number_of_waiters is 1 and it is
60/// interprocess capable unless it is configured otherwise.
61#[derive(Debug)]
62pub struct BarrierBuilder {
63    number_of_waiters: u32,
64    is_interprocess_capable: bool,
65}
66
67impl BarrierBuilder {
68    /// Creates a new [`BarrierBuilder`] for a [`Barrier`] which is waiting for the provided number of waiters
69    pub fn new(number_of_waiters: u32) -> BarrierBuilder {
70        BarrierBuilder {
71            number_of_waiters,
72            is_interprocess_capable: false,
73        }
74    }
75
76    /// Defines if the [`Barrier`] is inter-process capable or not.
77    pub fn is_interprocess_capable(mut self, value: bool) -> Self {
78        self.is_interprocess_capable = value;
79        self
80    }
81
82    fn initialize_barrier(
83        &self,
84        barrier: *mut posix::pthread_barrier_t,
85    ) -> Result<Capability, BarrierCreationError> {
86        let msg = "Unable to create barrier";
87
88        let mut attr =
89            ScopeGuardBuilder::new( posix::pthread_barrierattr_t::new_zeroed() )
90                .on_init(|attr| {
91                    let msg = "Unable to create barrier attributes";
92                    handle_errno!(BarrierCreationError, from self,
93                        errno_source unsafe { posix::pthread_barrierattr_init(attr).into() },
94                        success Errno::ESUCCES => (),
95                        Errno::ENOMEM => (InsufficientMemory, "{} due to insufficient memory.", msg),
96                        v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
97                    );})
98                    .on_drop(|attr| {
99                        if unsafe{ posix::pthread_barrierattr_destroy(attr)} != 0 {
100                            fatal_panic!(from self, "This should never happen! Unable to cleanup barrier attribute.");
101                        }})
102                .create()?;
103
104        match unsafe {
105            posix::pthread_barrierattr_setpshared(
106                attr.get_mut(),
107                if self.is_interprocess_capable {
108                    posix::PTHREAD_PROCESS_SHARED
109                } else {
110                    posix::PTHREAD_PROCESS_PRIVATE
111                },
112            )
113        } {
114            0 => (),
115            v => {
116                fatal_panic!(from self, "This should never happen! Unable to set pshared attribute ({}).",v);
117            }
118        }
119
120        match unsafe { posix::pthread_barrier_init(barrier, attr.get(), self.number_of_waiters) }
121            .into()
122        {
123            Errno::ESUCCES => (),
124            Errno::ENOMEM => {
125                fail!(from self, with BarrierCreationError::InsufficientMemory, "{} due to insufficient memory.", msg);
126            }
127            Errno::EAGAIN => {
128                fail!(from self, with BarrierCreationError::SystemWideBarrierLimitReached,
129                    "{} since system-wide maximum of barriers is reached.", msg
130                );
131            }
132            v => {
133                fail!(from self, with BarrierCreationError::UnknownError(v as i32),
134                    "{} since an unknown error occurred ({}).", msg, v
135                );
136            }
137        };
138
139        match self.is_interprocess_capable {
140            true => Ok(Capability::InterProcess),
141            false => Ok(Capability::ProcessLocal),
142        }
143    }
144
145    /// Creates a new [`Barrier`]
146    pub fn create(self, handle: &BarrierHandle) -> Result<Barrier<'_>, BarrierCreationError> {
147        unsafe {
148            handle
149                .handle
150                .initialize(|barrier| self.initialize_barrier(barrier))?;
151        }
152
153        Ok(Barrier::new(handle))
154    }
155}
156
157#[derive(Debug)]
158pub struct BarrierHandle {
159    handle: HandleStorage<posix::pthread_barrier_t>,
160}
161
162unsafe impl Send for BarrierHandle {}
163unsafe impl Sync for BarrierHandle {}
164
165impl Handle for BarrierHandle {
166    fn new() -> Self {
167        Self {
168            handle: HandleStorage::new(posix::pthread_barrier_t::new_zeroed()),
169        }
170    }
171
172    fn is_inter_process_capable(&self) -> bool {
173        self.handle.is_inter_process_capable()
174    }
175
176    fn is_initialized(&self) -> bool {
177        self.handle.is_initialized()
178    }
179}
180
181impl Drop for BarrierHandle {
182    fn drop(&mut self) {
183        if self.handle.is_initialized() {
184            unsafe {
185                self.handle.cleanup(|barrier| {
186                if posix::pthread_barrier_destroy(barrier) != 0 {
187                    warn!(from self,
188                        "Unable to destroy barrier. Was it already destroyed by another instance in another process?");
189                }
190            });
191            };
192        }
193    }
194}
195
196/// Barrier which blocks a previously defined number of waiters until all the waiters
197/// reached [`Barrier::wait()`]
198#[derive(Debug)]
199pub struct Barrier<'a> {
200    handle: &'a BarrierHandle,
201}
202
203unsafe impl Sync for Barrier<'_> {}
204unsafe impl Send for Barrier<'_> {}
205
206impl Barrier<'_> {
207    pub fn wait(&self) {
208        match unsafe { posix::pthread_barrier_wait(self.handle.handle.get()) } {
209            0 | posix::PTHREAD_BARRIER_SERIAL_THREAD => (),
210            v => {
211                fatal_panic!(from self, "This should never happen! Unable to wait on barrier ({}).", v);
212            }
213        }
214    }
215}
216
217impl<'a> IpcConstructible<'a, BarrierHandle> for Barrier<'a> {
218    fn new(handle: &BarrierHandle) -> Barrier<'_> {
219        Barrier { handle }
220    }
221}
222
223impl<'a> IpcCapable<'a, BarrierHandle> for Barrier<'a> {
224    fn is_interprocess_capable(&self) -> bool {
225        self.handle.is_inter_process_capable()
226    }
227}