use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::{cell::UnsafeCell, fmt::Debug};
pub use crate::unmovable_ipc_handle::IpcCapable;
use crate::unmovable_ipc_handle::{internal::*, IpcHandleState};
use iceoryx2_bb_container::semantic_string::*;
use iceoryx2_bb_elementary::enum_gen;
use iceoryx2_bb_log::{debug, fail, fatal_panic};
use iceoryx2_bb_system_types::file_name::FileName;
use iceoryx2_bb_system_types::file_path::*;
use iceoryx2_bb_system_types::path::*;
use iceoryx2_pal_posix::posix::errno::Errno;
use iceoryx2_pal_posix::posix::Struct;
use iceoryx2_pal_posix::*;
use crate::{
adaptive_wait::*,
clock::{AsTimespec, Time, TimeError},
config::MAX_INITIAL_SEMAPHORE_VALUE,
handle_errno,
system_configuration::*,
};
use std::time::Duration;
pub use crate::clock::ClockType;
pub use crate::creation_mode::CreationMode;
pub use crate::permission::Permission;
// Error type returned when a `NamedSemaphore` is created, opened or unlinked.
// The enum itself is generated by the `enum_gen!` macro from the entries below.
// `UnknownError` carries the raw errno value of a failure that has no dedicated entry.
enum_gen! { NamedSemaphoreCreationError
entry:
InsufficientPermissions,
InitialValueTooLarge,
PerProcessFileHandleLimitReached,
SystemWideFileHandleLimitReached,
AlreadyExists,
MaxFilePathLengthExceeded,
Interrupt,
NotSupportForGivenName,
DoesNotExist,
NoSpaceLeft,
UnknownError(i32)
}
/// Failures reported by `UnnamedSemaphoreBuilder::create`.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum UnnamedSemaphoreCreationError {
/// The requested initial value is larger than `MAX_INITIAL_SEMAPHORE_VALUE`, or `sem_init`
/// rejected it with `EINVAL`.
InitialValueTooLarge,
/// `sem_init` failed with `ENOSPC`.
ExceedsMaximumNumberOfSemaphores,
/// `sem_init` failed with `EPERM`.
InsufficientPermissions,
/// The provided handle was not in the uninitialized state.
HandleAlreadyInitialized,
/// Any other errno value; the raw value is stored in the payload.
UnknownError(i32),
}
/// Failures reported by `SemaphoreInterface::post`.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum SemaphorePostError {
/// `sem_post` failed with `EOVERFLOW`.
Overflow,
/// Any other errno value; the raw value is stored in the payload.
UnknownError(i32),
}
/// Failures reported by the waiting operations of `SemaphoreInterface`.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum SemaphoreWaitError {
/// The wait call failed with `ENOSYS`.
NotSupported,
/// The wait call failed with `EDEADLK`.
DeadlockConditionDetected,
/// The wait call failed with `EINTR`.
Interrupt,
/// Any other errno value; the raw value is stored in the payload.
UnknownError(i32),
}
/// Errors describing why an `UnnamedSemaphoreHandle` could not be opened from another process.
///
/// NOTE(review): this type is not referenced anywhere else in this file; presumably it is
/// consumed by code built on top of `IpcCapable` - confirm against the callers.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum UnnamedSemaphoreOpenIpcHandleError {
IsNotInterProcessCapable,
Uninitialized,
}
// Error type of `SemaphoreInterface::timed_wait`, generated by `enum_gen!`.
// `WaitingTimeExceedsSystemLimits` is its own entry; the `mapping` section adds one wrapping
// variant per listed error type (used below as `SemaphoreWaitError(..)` and via `From`).
enum_gen! {
SemaphoreTimedWaitError
entry:
WaitingTimeExceedsSystemLimits
mapping:
SemaphoreWaitError,
AdaptiveWaitError,
TimeError
}
// Coarse-grained error type generated by `enum_gen!` that groups the specific semaphore errors
// into `FailedToCreate`, `FailedToPost` and `FailedToWait`.
// NOTE(review): presumably each error type on the right-hand side of `<=` converts into the
// variant on the left - the exact conversion is defined by the macro, confirm there.
enum_gen! {
SemaphoreError
generalization:
FailedToCreate <= NamedSemaphoreCreationError; UnnamedSemaphoreCreationError,
FailedToPost <= SemaphorePostError,
FailedToWait <= SemaphoreWaitError; SemaphoreTimedWaitError
}
/// Selects how `NamedSemaphore::unlink` treats a semaphore that is already gone.
#[derive(PartialEq, Eq)]
enum UnlinkMode {
/// Used before a purge-and-create: a missing semaphore is acceptable.
IgnoreNonExistingSemaphore,
/// Used while dropping an owned semaphore: a missing semaphore is an error.
FailWhenSemaphoreDoesNotExist,
}
/// Selects which underlying call `NamedSemaphore::open` performs.
#[derive(PartialEq, Eq)]
enum InitMode {
/// Create a new semaphore exclusively (`O_CREAT | O_EXCL`).
Create,
/// Open an existing semaphore.
Open,
/// Open an existing semaphore as the first step of `CreationMode::OpenOrCreate`, where a
/// missing semaphore is an expected outcome.
TryOpen,
}
/// Private module that keeps the raw-handle accessors out of the public documentation.
mod internal {
use super::*;
/// Low-level accessors every semaphore type has to provide so that the default methods of
/// `SemaphoreInterface` can operate on it.
#[doc(hidden)]
pub trait SemaphoreHandle {
/// Returns the raw `sem_t` pointer that is passed to the posix calls.
fn handle(&self) -> *mut posix::sem_t;
/// Returns the clock type the semaphore was configured with.
fn get_clock_type(&self) -> ClockType;
}
}
/// Operations shared by every semaphore type in this file.
///
/// All methods have default implementations that work on the raw `sem_t` pointer and the clock
/// type provided through `internal::SemaphoreHandle`.
pub trait SemaphoreInterface: internal::SemaphoreHandle + Debug {
    /// Increments the semaphore with `sem_post`.
    ///
    /// # Errors
    ///
    /// * [`SemaphorePostError::Overflow`] when `sem_post` reports `EOVERFLOW`
    /// * [`SemaphorePostError::UnknownError`] for every other errno value
    ///
    /// `EINVAL` (invalid handle) is declared `fatal` in the errno mapping.
    fn post(&self) -> Result<(), SemaphorePostError> {
        if unsafe { posix::sem_post(self.handle()) } == 0 {
            return Ok(());
        }

        let msg = "Unable to post semaphore";
        handle_errno!(SemaphorePostError, from self,
            fatal Errno::EINVAL => ("This should never happen! {} since an invalid handle was provided.", msg),
            Errno::EOVERFLOW => (Overflow, "{} since the operation would cause an overflow.", msg),
            v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
        );
    }

    /// Decrements the semaphore with `sem_wait`.
    ///
    /// # Errors
    ///
    /// * [`SemaphoreWaitError::NotSupported`] on `ENOSYS`
    /// * [`SemaphoreWaitError::DeadlockConditionDetected`] on `EDEADLK`
    /// * [`SemaphoreWaitError::Interrupt`] on `EINTR`
    /// * [`SemaphoreWaitError::UnknownError`] for every other errno value
    fn wait(&self) -> Result<(), SemaphoreWaitError> {
        if unsafe { posix::sem_wait(self.handle()) } == 0 {
            return Ok(());
        }

        let msg = "Unable to wait on semaphore";
        handle_errno!(SemaphoreWaitError, from self,
            fatal Errno::EINVAL => ("This should never happen! {} since an invalid handle was provided!", msg),
            Errno::ENOSYS => (NotSupported, "{} since sem_wait is not supported by the system.", msg),
            Errno::EDEADLK => (DeadlockConditionDetected, "{} since a deadlock condition was detected.", msg),
            Errno::EINTR => (Interrupt, "{} since an interrupt signal was received.", msg),
            v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
        );
    }

    /// Tries to decrement the semaphore with `sem_trywait`.
    ///
    /// Returns `Ok(true)` when `sem_trywait` succeeded. `EAGAIN` is mapped to the success value
    /// `false`.
    ///
    /// # Errors
    ///
    /// Same mapping as [`SemaphoreInterface::wait`].
    fn try_wait(&self) -> Result<bool, SemaphoreWaitError> {
        if unsafe { posix::sem_trywait(self.handle()) } == 0 {
            return Ok(true);
        }

        let msg = "Unable to wait on semaphore";
        handle_errno!(SemaphoreWaitError, from self,
            success Errno::EAGAIN => false,
            fatal Errno::EINVAL => ("This should never happen! {} since an invalid handle was provided!", msg),
            Errno::ENOSYS => (NotSupported, "{} since sem_wait is not supported by the system.", msg),
            Errno::EDEADLK => (DeadlockConditionDetected, "{} since a deadlock condition was detected.", msg),
            Errno::EINTR => (Interrupt, "{} since an interrupt signal was received.", msg),
            v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
        );
    }

    /// Tries to decrement the semaphore, giving up after `timeout`.
    ///
    /// * `ClockType::Monotonic`: repeatedly calls [`SemaphoreInterface::try_wait`] through an
    ///   adaptive wait and forwards the adaptive wait's result.
    /// * `ClockType::Realtime`: calls `sem_timedwait` with the deadline `now + timeout`;
    ///   returns `Ok(true)` on success, `ETIMEDOUT` is mapped to the success value `false`.
    ///
    /// # Errors
    ///
    /// * [`SemaphoreTimedWaitError::WaitingTimeExceedsSystemLimits`] when `now + timeout` is not
    ///   representable as a `Duration` or `sem_timedwait` rejects the deadline with `EINVAL`
    /// * a wrapped `SemaphoreWaitError`, `AdaptiveWaitError` or `TimeError` for failures of the
    ///   underlying operations
    fn timed_wait(&self, timeout: Duration) -> Result<bool, SemaphoreTimedWaitError> {
        let msg = "Unable to timed wait on semaphore";

        match self.clock_type() {
            ClockType::Monotonic => {
                let mut adaptive_wait = fail!(from self, when AdaptiveWaitBuilder::new()
                    .clock_type(self.clock_type())
                    .create(), "{} since the adaptive wait could not be created.", msg);

                match adaptive_wait.timed_wait_while(
                    || -> Result<bool, SemaphoreWaitError> { Ok(!self.try_wait()?) },
                    timeout,
                ) {
                    Ok(v) => Ok(v),
                    Err(AdaptiveTimedWaitWhileError::PredicateFailure(v)) => {
                        fail!(from self, with SemaphoreTimedWaitError::from(v),
                            "{} since try_wait() failed with ({:?}).", msg, v);
                    }
                    Err(AdaptiveTimedWaitWhileError::AdaptiveWaitError(v)) => {
                        fail!(from self, with SemaphoreTimedWaitError::from(v),
                            "{} since adaptive wait failed with ({:?}).", msg, v);
                    }
                }
            }
            ClockType::Realtime => {
                let now = fail!(from self, when Time::now_with_clock(self.clock_type()),
                    "{} due to a failure while acquiring the current system time.", msg)
                .as_duration();

                // `Duration + Duration` panics on overflow (e.g. `Duration::MAX` as timeout);
                // report such a timeout as an error instead of aborting the caller.
                let wait_time = match timeout.checked_add(now) {
                    Some(deadline) => deadline,
                    None => {
                        fail!(from self, with SemaphoreTimedWaitError::WaitingTimeExceedsSystemLimits,
                            "{} since the provided duration {:?} exceeds the maximum supported limit.", msg, timeout);
                    }
                };

                if unsafe { posix::sem_timedwait(self.handle(), &wait_time.as_timespec()) } == 0 {
                    return Ok(true);
                }

                let msg = "Failed to perform timedwait";
                handle_errno!(SemaphoreTimedWaitError, from self,
                    success Errno::ETIMEDOUT => false,
                    Errno::EINVAL => (WaitingTimeExceedsSystemLimits, "{} since the provided duration {:?} exceeds the maximum supported limit.", msg, timeout),
                    Errno::EDEADLK => (SemaphoreWaitError(SemaphoreWaitError::DeadlockConditionDetected), "{} since a deadlock condition was detected.", msg),
                    Errno::EINTR => (SemaphoreWaitError(SemaphoreWaitError::Interrupt), "{} since an interrupt signal occurred.", msg),
                    v => (SemaphoreWaitError(SemaphoreWaitError::UnknownError(v as i32)), "{} since an unknown error occurred ({}).", msg, v)
                )
            }
        }
    }

    /// Returns the clock type that [`SemaphoreInterface::timed_wait`] uses.
    fn clock_type(&self) -> ClockType {
        self.get_clock_type()
    }
}
/// Builder for a `NamedSemaphore`.
///
/// `open_existing` opens a semaphore that already exists; `creation_mode` switches to the
/// `NamedSemaphoreCreationBuilder`, which exposes the creation-only settings.
#[derive(Debug)]
pub struct NamedSemaphoreBuilder {
// name of the semaphore; the path handed to the posix calls is "/" + name
name: FileName,
// value the semaphore starts with when it is created, defaults to 0
initial_value: u32,
// permissions applied when the semaphore is created, defaults to `Permission::OWNER_ALL`
permission: Permission,
// clock used by `timed_wait`
clock_type: ClockType,
// `None` means "open an existing semaphore"
creation_mode: Option<CreationMode>,
}
impl NamedSemaphoreBuilder {
    /// Starts a builder for the semaphore called `name`.
    ///
    /// Defaults: initial value `0`, `Permission::OWNER_ALL`, the default clock type and no
    /// creation mode.
    pub fn new(name: &FileName) -> Self {
        Self {
            name: *name,
            initial_value: 0,
            permission: Permission::OWNER_ALL,
            clock_type: ClockType::default(),
            creation_mode: None,
        }
    }

    /// Selects the clock that the resulting semaphore uses in `timed_wait`.
    pub fn clock_type(self, value: ClockType) -> Self {
        Self {
            clock_type: value,
            ..self
        }
    }

    /// Opens a semaphore that already exists; nothing is created.
    pub fn open_existing(self) -> Result<NamedSemaphore, NamedSemaphoreCreationError> {
        NamedSemaphore::new(self)
    }

    /// Records how the semaphore shall be created and hands over to the
    /// [`NamedSemaphoreCreationBuilder`] for the creation-only settings.
    pub fn creation_mode(self, creation_mode: CreationMode) -> NamedSemaphoreCreationBuilder {
        let config = Self {
            creation_mode: Some(creation_mode),
            ..self
        };
        NamedSemaphoreCreationBuilder { config }
    }
}
/// Second stage of the [`NamedSemaphoreBuilder`], reached through
/// `NamedSemaphoreBuilder::creation_mode`. It exposes the settings that only matter when the
/// semaphore is created: initial value and permissions.
///
/// Derives `Debug` like the other builders in this file so that every public type is
/// printable in diagnostics.
#[derive(Debug)]
pub struct NamedSemaphoreCreationBuilder {
    config: NamedSemaphoreBuilder,
}
impl NamedSemaphoreCreationBuilder {
    /// Sets the value the semaphore starts with.
    pub fn initial_value(self, value: u32) -> Self {
        let mut config = self.config;
        config.initial_value = value;
        Self { config }
    }

    /// Sets the permissions the semaphore is created with.
    pub fn permission(self, value: Permission) -> Self {
        let mut config = self.config;
        config.permission = value;
        Self { config }
    }

    /// Creates the semaphore according to the collected settings.
    pub fn create(self) -> Result<NamedSemaphore, NamedSemaphoreCreationError> {
        let Self { config } = self;
        NamedSemaphore::new(config)
    }
}
/// A semaphore that is identified by a name and obtained via `sem_open`/`sem_create`.
///
/// Dropping it closes the handle; when the instance owns the semaphore it is unlinked as well.
#[derive(Debug)]
pub struct NamedSemaphore {
// name of the semaphore; the path handed to the posix calls is "/" + name
name: FileName,
// handle returned by the posix call; `posix::SEM_FAILED` until `open` succeeded
handle: *mut posix::sem_t,
// true when this instance created the semaphore and therefore unlinks it on drop
has_ownership: bool,
// clock used by `timed_wait`
clock_type: ClockType,
}
// The raw pointer field prevents the automatic `Send`/`Sync` implementations.
// NOTE(review): presumably sound because the posix semaphore calls may be issued from any
// thread - confirm against the platform abstraction.
unsafe impl Send for NamedSemaphore {}
unsafe impl Sync for NamedSemaphore {}
impl Drop for NamedSemaphore {
    /// Closes the semaphore and, when this instance owns it, removes it from the system.
    ///
    /// An instance whose handle is still `SEM_FAILED` was never opened, so there is nothing to
    /// clean up.
    fn drop(&mut self) {
        if self.handle != posix::SEM_FAILED {
            let close_result = unsafe { posix::sem_close(self.handle) };
            if close_result != 0 {
                fatal_panic!(from self, "This should never happen! The semaphore handle is invalid and cannot be closed.");
            }

            // the unlink is only attempted by the owner of the semaphore
            if self.has_ownership {
                let unlink_result = self.unlink(UnlinkMode::FailWhenSemaphoreDoesNotExist);
                if unlink_result.is_err() {
                    fatal_panic!(from self, "Failed to cleanup semaphore. Something else removed a managed semaphore which should never happen!");
                }
            }
        }
    }
}
impl NamedSemaphore {
fn new(config: NamedSemaphoreBuilder) -> Result<NamedSemaphore, NamedSemaphoreCreationError> {
let mut new_sem = NamedSemaphore {
name: config.name,
handle: posix::SEM_FAILED,
has_ownership: false,
clock_type: config.clock_type,
};
match config.creation_mode {
None => {
new_sem.open(Permission::none(), InitMode::Open, 0)?;
}
Some(CreationMode::PurgeAndCreate) => {
new_sem.has_ownership = true;
fail!(from new_sem, when new_sem.unlink(UnlinkMode::IgnoreNonExistingSemaphore), "Failed to remove semaphore before creating a new one.");
new_sem.open(config.permission, InitMode::Create, config.initial_value)?;
}
Some(CreationMode::CreateExclusive) => {
new_sem.has_ownership = true;
new_sem.open(config.permission, InitMode::Create, config.initial_value)?;
}
Some(CreationMode::OpenOrCreate) => {
match new_sem.open(Permission::none(), InitMode::TryOpen, 0) {
Ok(()) => (),
Err(NamedSemaphoreCreationError::DoesNotExist) => {
new_sem.has_ownership = true;
new_sem.open(config.permission, InitMode::Create, config.initial_value)?;
}
Err(v) => return Err(v),
}
}
};
Ok(new_sem)
}
fn unlink(&mut self, mode: UnlinkMode) -> Result<(), NamedSemaphoreCreationError> {
let file_path =
FilePath::from_path_and_file(&Path::new(b"/").unwrap(), &self.name).unwrap();
if unsafe { posix::sem_unlink(file_path.as_c_str()) } == 0 {
debug!(from self, "semaphore removed.");
return Ok(());
}
let msg = "Unable to unlink semaphore";
let ignore_non_existing_semaphore = mode == UnlinkMode::IgnoreNonExistingSemaphore;
handle_errno!(NamedSemaphoreCreationError, from self,
success_when ignore_non_existing_semaphore,
Errno::ENOENT => ((), AlreadyExists, "{} since no semaphore with the given name exists.", msg),
Errno::EACCES => (InsufficientPermissions, "{} due to insufficient permissions.", msg),
Errno::ENAMETOOLONG => (MaxFilePathLengthExceeded, "{} since the name exceeds the maximum supported length.", msg),
v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
)
}
fn open(
&mut self,
permission: Permission,
mode: InitMode,
initial_value: u32,
) -> Result<(), NamedSemaphoreCreationError> {
let msg;
if initial_value > MAX_INITIAL_SEMAPHORE_VALUE {
fail!(from self, with NamedSemaphoreCreationError::InitialValueTooLarge,
"Unable to create semaphore since the initial semaphore value {} is greater than the maximum supported value of {}.", initial_value, MAX_INITIAL_SEMAPHORE_VALUE);
}
let file_path =
FilePath::from_path_and_file(&Path::new(b"/").unwrap(), &self.name).unwrap();
Errno::reset();
self.handle = match mode {
InitMode::Create => unsafe {
msg = "Unable to create semaphore";
posix::sem_create(
file_path.as_c_str(),
posix::O_CREAT | posix::O_EXCL,
permission.as_mode(),
initial_value,
)
},
InitMode::Open | InitMode::TryOpen => unsafe {
msg = "Unable to open semaphore";
posix::sem_open(file_path.as_c_str(), 0)
},
};
if self.handle != posix::SEM_FAILED {
match mode {
InitMode::Create => debug!(from self, "semaphore created."),
_ => debug!(from self, "semaphore opened."),
}
return Ok(());
}
let has_try_open_mode = mode == InitMode::TryOpen;
handle_errno!(NamedSemaphoreCreationError, from self,
success_when has_try_open_mode,
Errno::ENOENT => ((), DoesNotExist, "{} since the semaphore does not exist." ,msg),
Errno::EACCES => (InsufficientPermissions, "{} due to insufficient permissions.", msg),
Errno::EEXIST => (AlreadyExists, "{} since the semaphore already exists.", msg),
Errno::EINTR => (Interrupt, "{} since an interrupt signal was received.", msg),
Errno::EINVAL => (NotSupportForGivenName, "{} since the operation is not supported for the given name.", msg),
Errno::EMFILE => (PerProcessFileHandleLimitReached, "{} since the current process already holds the maximum amount of semaphore or file descriptos.", msg),
Errno::ENAMETOOLONG => (MaxFilePathLengthExceeded, "{} since the name exceeds the maximum supported length.", msg),
Errno::ENFILE => (SystemWideFileHandleLimitReached, "{} since the system-wide semaphore or file-handle limit is reached.", msg),
Errno::ENOSPC => (NoSpaceLeft, "{} due to insufficient space on the target.", msg),
v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg,v)
);
}
pub fn name(&self) -> &FileName {
&self.name
}
}
impl internal::SemaphoreHandle for NamedSemaphore {
    /// Returns the pointer stored by `open`.
    fn handle(&self) -> *mut posix::sem_t {
        let Self { handle, .. } = self;
        *handle
    }

    /// Returns the clock type chosen in the builder.
    fn get_clock_type(&self) -> ClockType {
        let Self { clock_type, .. } = self;
        *clock_type
    }
}

// All operations come from the default methods of the trait.
impl SemaphoreInterface for NamedSemaphore {}
/// Builder for an `UnnamedSemaphore`, which lives inside a caller-provided
/// `UnnamedSemaphoreHandle`.
#[derive(Debug)]
pub struct UnnamedSemaphoreBuilder {
// clock used by `timed_wait`
clock_type: ClockType,
// forwarded as the `pshared` argument of `sem_init`, defaults to true
is_interprocess_capable: bool,
// value the semaphore starts with, defaults to 0
initial_value: u32,
}
impl Default for UnnamedSemaphoreBuilder {
    /// Inter-process capable semaphore with initial value `0` and the default clock type.
    fn default() -> Self {
        let clock_type = ClockType::default();
        Self {
            initial_value: 0,
            is_interprocess_capable: true,
            clock_type,
        }
    }
}
impl UnnamedSemaphoreBuilder {
    /// Creates a builder with the default settings.
    pub fn new() -> UnnamedSemaphoreBuilder {
        Self::default()
    }

    /// Sets the value the semaphore starts with.
    pub fn initial_value(mut self, value: u32) -> Self {
        self.initial_value = value;
        self
    }

    /// Defines whether the semaphore can be shared between processes.
    pub fn is_interprocess_capable(mut self, value: bool) -> Self {
        self.is_interprocess_capable = value;
        self
    }

    /// Selects the clock that the semaphore uses in `timed_wait`.
    pub fn clock_type(mut self, value: ClockType) -> Self {
        self.clock_type = value;
        self
    }

    /// Initializes the semaphore inside `handle` with `sem_init`.
    ///
    /// The handle state is moved from `Uninitialized` to `PerformingInitialization` first, so
    /// only one creator can set up a handle. On success the state becomes `Initialized`. On
    /// every failure after the state was claimed it is put back to `Uninitialized`, so that the
    /// handle can be used for another creation attempt.
    ///
    /// # Errors
    ///
    /// * `HandleAlreadyInitialized` when the handle is not in the `Uninitialized` state
    /// * `InitialValueTooLarge` when the initial value exceeds `MAX_INITIAL_SEMAPHORE_VALUE`
    ///   or `sem_init` reports `EINVAL`
    /// * `ExceedsMaximumNumberOfSemaphores` on `ENOSPC`
    /// * `InsufficientPermissions` on `EPERM`
    /// * `UnknownError` for every other errno value
    pub fn create(
        self,
        handle: &UnnamedSemaphoreHandle,
    ) -> Result<UnnamedSemaphore, UnnamedSemaphoreCreationError> {
        let msg = "Unable to create semaphore";

        if handle
            .reference_counter
            .compare_exchange(
                IpcHandleState::Uninitialized as _,
                IpcHandleState::PerformingInitialization as _,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_err()
        {
            fail!(from self, with UnnamedSemaphoreCreationError::HandleAlreadyInitialized,
                "{} since the handle is already initialized with another semaphore.", msg);
        }

        // The settings are written before the `Release` store below publishes the handle as
        // initialized.
        handle
            .is_interprocess_capable
            .store(self.is_interprocess_capable, Ordering::Relaxed);
        unsafe { *handle.clock_type.get() = self.clock_type };

        if self.initial_value > MAX_INITIAL_SEMAPHORE_VALUE {
            // give the claimed handle back
            handle
                .reference_counter
                .store(IpcHandleState::Uninitialized as _, Ordering::Relaxed);
            fail!(from self, with UnnamedSemaphoreCreationError::InitialValueTooLarge,
                "{} since the initial value {} is too large.", msg, self.initial_value);
        }

        if unsafe {
            posix::sem_init(
                handle.as_ptr(),
                if self.is_interprocess_capable { 1 } else { 0 },
                self.initial_value,
            )
        } != -1
        {
            handle
                .reference_counter
                .store(IpcHandleState::Initialized as _, Ordering::Release);
            return Ok(UnnamedSemaphore::new(handle));
        }

        // `sem_init` failed: without this reset the handle would stay in
        // `PerformingInitialization` forever and every further `create` would fail with
        // `HandleAlreadyInitialized`. The atomic store does not touch errno, which is
        // evaluated right below.
        handle
            .reference_counter
            .store(IpcHandleState::Uninitialized as _, Ordering::Relaxed);

        handle_errno!(UnnamedSemaphoreCreationError, from self,
            Errno::EINVAL => (InitialValueTooLarge, "{} since the initial value {} is too large. Please verify posix configuration!", msg, self.initial_value),
            Errno::ENOSPC => (ExceedsMaximumNumberOfSemaphores, "{} since it exceeds the maximum amount of semaphores {}.", msg, Limit::MaxNumberOfSemaphores.value()),
            Errno::EPERM => (InsufficientPermissions, "{} due to insufficient permissions.", msg),
            v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
        );
    }
}
/// Storage for an `UnnamedSemaphore`: the `sem_t` itself plus the settings and the state that
/// `UnnamedSemaphoreBuilder::create` writes into it.
#[derive(Debug)]
pub struct UnnamedSemaphoreHandle {
// the semaphore; initialized by `sem_init`, destroyed by `sem_destroy`
handle: UnsafeCell<posix::sem_t>,
// written once in `UnnamedSemaphoreBuilder::create`, read by `get_clock_type`
clock_type: UnsafeCell<ClockType>,
// whether the semaphore was initialized as process-shared
is_interprocess_capable: AtomicBool,
// holds an `IpcHandleState` value while the handle is set up; decremented when an
// `UnnamedSemaphore` is dropped
reference_counter: AtomicI64,
}
// The `UnsafeCell` fields prevent the automatic `Sync` implementation.
// NOTE(review): presumably sound because the cells are only written during initialization,
// before the state is published - confirm.
unsafe impl Send for UnnamedSemaphoreHandle {}
unsafe impl Sync for UnnamedSemaphoreHandle {}
impl crate::unmovable_ipc_handle::internal::UnmovableIpcHandle for UnnamedSemaphoreHandle {
    /// Gives access to the state/reference counter of the handle.
    fn reference_counter(&self) -> &AtomicI64 {
        let Self {
            reference_counter, ..
        } = self;
        reference_counter
    }

    /// Reports whether the semaphore was configured as inter-process capable.
    fn is_interprocess_capable(&self) -> bool {
        let flag = &self.is_interprocess_capable;
        flag.load(Ordering::Relaxed)
    }
}
impl Default for UnnamedSemaphoreHandle {
    /// Creates a handle in the `Uninitialized` state that does not yet contain a usable
    /// semaphore.
    fn default() -> Self {
        let semaphore = posix::sem_t::new();
        let initial_state = AtomicI64::new(IpcHandleState::Uninitialized as _);

        Self {
            handle: UnsafeCell::new(semaphore),
            clock_type: UnsafeCell::new(ClockType::default()),
            is_interprocess_capable: AtomicBool::new(false),
            reference_counter: initial_state,
        }
    }
}
impl UnnamedSemaphoreHandle {
    /// Creates an uninitialized handle; identical to [`Default::default`].
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Pointer to the contained `sem_t` for the posix calls.
    fn as_ptr(&self) -> *mut posix::sem_t {
        let cell = &self.handle;
        cell.get()
    }
}
/// A semaphore that lives inside a borrowed `UnnamedSemaphoreHandle`.
///
/// Dropping it decrements the reference counter of the handle; the holder of the last
/// reference destroys the semaphore.
#[derive(Debug)]
pub struct UnnamedSemaphore<'a> {
// storage of the semaphore, must outlive this object
handle: &'a UnnamedSemaphoreHandle,
}
// NOTE(review): explicit markers; the soundness argument is presumably the one of
// `UnnamedSemaphoreHandle` - confirm.
unsafe impl Send for UnnamedSemaphore<'_> {}
unsafe impl Sync for UnnamedSemaphore<'_> {}
impl Drop for UnnamedSemaphore<'_> {
    /// Releases one reference; the holder of the last reference destroys the semaphore and
    /// stores `-1` as the new state of the handle.
    fn drop(&mut self) {
        let counter = &self.handle.reference_counter;

        // only the last reference performs the cleanup
        let previous_count = counter.fetch_sub(1, Ordering::AcqRel);
        if previous_count != 1 {
            return;
        }

        let destroy_result = unsafe { posix::sem_destroy(self.handle.as_ptr()) };
        if destroy_result != 0 {
            fatal_panic!(from self, "This should never happen! Unable to destroy semaphore since the file-descriptor was invalid.");
        }

        counter.store(-1, Ordering::Release);
    }
}
impl<'a> CreateIpcConstruct<'a, UnnamedSemaphoreHandle> for UnnamedSemaphore<'a> {
fn new(handle: &'a UnnamedSemaphoreHandle) -> Self {
Self { handle }
}
}
impl<'a> IpcCapable<'a, UnnamedSemaphoreHandle> for UnnamedSemaphore<'a> {}
impl<'a> UnnamedSemaphore<'a> {
    /// Returns true when the semaphore was configured as inter-process capable.
    pub fn is_interprocess_capable(&self) -> bool {
        let flag = &self.handle.is_interprocess_capable;
        flag.load(Ordering::Relaxed)
    }
}
impl internal::SemaphoreHandle for UnnamedSemaphore<'_> {
    /// Pointer to the `sem_t` stored in the underlying handle.
    fn handle(&self) -> *mut posix::sem_t {
        let storage = self.handle;
        storage.as_ptr()
    }

    /// Reads the clock type that the builder stored in the handle.
    fn get_clock_type(&self) -> ClockType {
        let clock_type = self.handle.clock_type.get();
        unsafe { *clock_type }
    }
}

// All operations come from the default methods of the trait.
impl SemaphoreInterface for UnnamedSemaphore<'_> {}