iceoryx2_bb_posix/
barrier.rs

1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13//! Inter-process capable [`Barrier`] which blocks a previously defined number of waiters
14//! untilthe all the waiters reached [`Barrier::wait()`]
15//!
16//! # Examples
17//!
18//! ```
19//! use iceoryx2_bb_posix::barrier::*;
20//! use std::thread;
21//!
22//! let number_of_waiters = 2;
23//! let handle = BarrierHandle::new();
24//! let barrier = BarrierBuilder::new(number_of_waiters)
25//!                                    .is_interprocess_capable(false)
26//!                                    .create(&handle).unwrap();
27//! thread::scope(|s| {
28//!   s.spawn(|| {
29//!     println!("Thread: waiting ...");
30//!     barrier.wait();
31//!     println!("Thread: lets start!");
32//!   });
33//!
34//!   println!("main: waiting ...");
35//!   barrier.wait();
36//!   println!("all systems ready!");
37//! });
38//! ```
39pub use crate::ipc_capable::{Handle, IpcCapable};
40
41use iceoryx2_bb_elementary::scope_guard::ScopeGuardBuilder;
42use iceoryx2_bb_log::{fail, fatal_panic, warn};
43use iceoryx2_pal_posix::posix::errno::Errno;
44use iceoryx2_pal_posix::posix::Struct;
45use iceoryx2_pal_posix::*;
46
47use crate::handle_errno;
48use crate::ipc_capable::internal::{Capability, HandleStorage, IpcConstructible};
49
50#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
51pub enum BarrierCreationError {
52    InsufficientMemory,
53    SystemWideBarrierLimitReached,
54    UnknownError(i32),
55}
56
57/// Builder for the [`Barrier`]. The default values for number_of_waiters is 1 and it is
58/// interprocess capable unless it is configured otherwise.
59#[derive(Debug)]
60pub struct BarrierBuilder {
61    number_of_waiters: u32,
62    is_interprocess_capable: bool,
63}
64
65impl BarrierBuilder {
66    /// Creates a new [`BarrierBuilder`] for a [`Barrier`] which is waiting for the provided number of waiters
67    pub fn new(number_of_waiters: u32) -> BarrierBuilder {
68        BarrierBuilder {
69            number_of_waiters,
70            is_interprocess_capable: false,
71        }
72    }
73
74    /// Defines if the [`Barrier`] is inter-process capable or not.
75    pub fn is_interprocess_capable(mut self, value: bool) -> Self {
76        self.is_interprocess_capable = value;
77        self
78    }
79
80    fn initialize_barrier(
81        &self,
82        barrier: *mut posix::pthread_barrier_t,
83    ) -> Result<Capability, BarrierCreationError> {
84        let msg = "Unable to create barrier";
85
86        let mut attr =
87            ScopeGuardBuilder::new( posix::pthread_barrierattr_t::new() )
88                .on_init(|attr| {
89                    let msg = "Unable to create barrier attributes";
90                    handle_errno!(BarrierCreationError, from self,
91                        errno_source unsafe { posix::pthread_barrierattr_init(attr).into() },
92                        success Errno::ESUCCES => (),
93                        Errno::ENOMEM => (InsufficientMemory, "{} due to insufficient memory.", msg),
94                        v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
95                    );})
96                    .on_drop(|attr| {
97                        if unsafe{ posix::pthread_barrierattr_destroy(attr)} != 0 {
98                            fatal_panic!(from self, "This should never happen! Unable to cleanup barrier attribute.");
99                        }})
100                .create()?;
101
102        match unsafe {
103            posix::pthread_barrierattr_setpshared(
104                attr.get_mut(),
105                if self.is_interprocess_capable {
106                    posix::PTHREAD_PROCESS_SHARED
107                } else {
108                    posix::PTHREAD_PROCESS_PRIVATE
109                },
110            )
111        } {
112            0 => (),
113            v => {
114                fatal_panic!(from self, "This should never happen! Unable to set pshared attribute ({}).",v);
115            }
116        }
117
118        match unsafe { posix::pthread_barrier_init(barrier, attr.get(), self.number_of_waiters) }
119            .into()
120        {
121            Errno::ESUCCES => (),
122            Errno::ENOMEM => {
123                fail!(from self, with BarrierCreationError::InsufficientMemory, "{} due to insufficient memory.", msg);
124            }
125            Errno::EAGAIN => {
126                fail!(from self, with BarrierCreationError::SystemWideBarrierLimitReached,
127                    "{} since system-wide maximum of barriers is reached.", msg
128                );
129            }
130            v => {
131                fail!(from self, with BarrierCreationError::UnknownError(v as i32),
132                    "{} since an unknown error occurred ({}).", msg, v
133                );
134            }
135        };
136
137        match self.is_interprocess_capable {
138            true => Ok(Capability::InterProcess),
139            false => Ok(Capability::ProcessLocal),
140        }
141    }
142
143    /// Creates a new [`Barrier`]
144    pub fn create(self, handle: &BarrierHandle) -> Result<Barrier, BarrierCreationError> {
145        unsafe {
146            handle
147                .handle
148                .initialize(|barrier| self.initialize_barrier(barrier))?;
149        }
150
151        Ok(Barrier::new(handle))
152    }
153}
154
155#[derive(Debug)]
156pub struct BarrierHandle {
157    handle: HandleStorage<posix::pthread_barrier_t>,
158}
159
160unsafe impl Send for BarrierHandle {}
161unsafe impl Sync for BarrierHandle {}
162
163impl Handle for BarrierHandle {
164    fn new() -> Self {
165        Self {
166            handle: HandleStorage::new(posix::pthread_barrier_t::new()),
167        }
168    }
169
170    fn is_inter_process_capable(&self) -> bool {
171        self.handle.is_inter_process_capable()
172    }
173
174    fn is_initialized(&self) -> bool {
175        self.handle.is_initialized()
176    }
177}
178
179impl Drop for BarrierHandle {
180    fn drop(&mut self) {
181        if self.handle.is_initialized() {
182            unsafe {
183                self.handle.cleanup(|barrier| {
184                if posix::pthread_barrier_destroy(barrier) != 0 {
185                    warn!(from self,
186                        "Unable to destroy barrier. Was it already destroyed by another instance in another process?");
187                }
188            });
189            };
190        }
191    }
192}
193
194/// Barrier which blocks a previously defined number of waiters until all the waiters
195/// reached [`Barrier::wait()`]
196#[derive(Debug)]
197pub struct Barrier<'a> {
198    handle: &'a BarrierHandle,
199}
200
201unsafe impl Sync for Barrier<'_> {}
202unsafe impl Send for Barrier<'_> {}
203
204impl Barrier<'_> {
205    pub fn wait(&self) {
206        match unsafe { posix::pthread_barrier_wait(self.handle.handle.get()) } {
207            0 | posix::PTHREAD_BARRIER_SERIAL_THREAD => (),
208            v => {
209                fatal_panic!(from self, "This should never happen! Unable to wait on barrier ({}).", v);
210            }
211        }
212    }
213}
214
215impl<'a> IpcConstructible<'a, BarrierHandle> for Barrier<'a> {
216    fn new(handle: &BarrierHandle) -> Barrier {
217        Barrier { handle }
218    }
219}
220
221impl<'a> IpcCapable<'a, BarrierHandle> for Barrier<'a> {
222    fn is_interprocess_capable(&self) -> bool {
223        self.handle.is_inter_process_capable()
224    }
225}