1pub use crate::ipc_capable::{Handle, IpcCapable};
49
50use crate::ipc_capable::internal::{Capability, HandleStorage, IpcConstructible};
51use iceoryx2_bb_elementary::scope_guard::*;
52use iceoryx2_bb_log::{fail, fatal_panic, warn};
53use iceoryx2_pal_posix::*;
54use std::cell::UnsafeCell;
55use std::fmt::Debug;
56use std::ops::{Deref, DerefMut};
57use std::time::Duration;
58
59use crate::adaptive_wait::*;
60use crate::clock::{AsTimespec, ClockType, NanosleepError, Time, TimeError};
61use crate::handle_errno;
62use iceoryx2_pal_posix::posix::errno::Errno;
63use iceoryx2_pal_posix::posix::Struct;
64
65#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
66pub enum MutexCreationError {
67 InsufficientMemory,
68 InsufficientResources,
69 InsufficientPermissions,
70 NoInterProcessSupport,
71 UnableToSetType,
72 UnableToSetProtocol,
73 UnableToSetThreadTerminationBehavior,
74 UnableToSetPriorityCeiling,
75 UnknownError(i32),
76}
77
78#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
79pub enum MutexGetPrioCeilingError {
80 InsufficientPermissions,
81 UnknownError(i32),
82}
83
84#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
85pub enum MutexSetPrioCeilingError {
86 ValueOutOfRange,
87 InsufficientPermissions,
88 UnknownError(i32),
89}
90
91#[derive(Debug, PartialEq, Eq)]
92pub enum MutexLockError<'mutex, 'handle, T: Sized + Debug> {
93 ExceededMaximumNumberOfRecursiveLocks,
94 DeadlockDetected,
95 LockAcquiredButOwnerDied(MutexGuard<'mutex, 'handle, T>),
96 UnrecoverableState,
97 UnknownError(i32),
98}
99
100impl<T: Sized + Debug> PartialEq for MutexGuard<'_, '_, T> {
101 fn eq(&self, other: &Self) -> bool {
102 core::ptr::eq(self.mutex, other.mutex)
103 }
104}
105
106impl<T: Sized + Debug> Eq for MutexGuard<'_, '_, T> {}
107
108#[derive(Debug, PartialEq, Eq)]
109pub enum MutexTimedLockError<'mutex, 'handle, T: Sized + Debug> {
110 TimeoutExceedsMaximumSupportedDuration,
111 MutexLockError(MutexLockError<'mutex, 'handle, T>),
112 NanosleepError(NanosleepError),
113 AdaptiveWaitError(AdaptiveWaitError),
114 FailureInInternalClockWhileWait(TimeError),
115}
116
117impl<T: Debug> From<TimeError> for MutexTimedLockError<'_, '_, T> {
118 fn from(v: TimeError) -> Self {
119 MutexTimedLockError::FailureInInternalClockWhileWait(v)
120 }
121}
122
123impl<T: Debug> From<NanosleepError> for MutexTimedLockError<'_, '_, T> {
124 fn from(v: NanosleepError) -> Self {
125 MutexTimedLockError::NanosleepError(v)
126 }
127}
128
129impl<T: Debug> From<AdaptiveWaitError> for MutexTimedLockError<'_, '_, T> {
130 fn from(v: AdaptiveWaitError) -> Self {
131 MutexTimedLockError::AdaptiveWaitError(v)
132 }
133}
134
135#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
136pub enum MutexUnlockError {
137 OwnedByDifferentEntity,
138 UnknownError(i32),
139}
140
141#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
146pub enum MutexError {
147 CreationFailed,
148 LockFailed,
149 UnlockFailed,
150}
151
152impl<'mutex, 'handle, T: Debug> From<MutexLockError<'mutex, 'handle, T>> for MutexError {
153 fn from(_: MutexLockError<'mutex, 'handle, T>) -> Self {
154 MutexError::LockFailed
155 }
156}
157
158impl<'mutex, 'handle, T: Debug> From<MutexTimedLockError<'mutex, 'handle, T>> for MutexError {
159 fn from(_: MutexTimedLockError<'mutex, 'handle, T>) -> Self {
160 MutexError::LockFailed
161 }
162}
163
164impl From<MutexUnlockError> for MutexError {
165 fn from(_: MutexUnlockError) -> Self {
166 MutexError::UnlockFailed
167 }
168}
169
170impl From<MutexCreationError> for MutexError {
171 fn from(_: MutexCreationError) -> Self {
172 MutexError::CreationFailed
173 }
174}
175
176#[derive(Debug)]
195pub struct MutexGuard<'mutex, 'handle, T: Debug> {
196 mutex: &'mutex Mutex<'handle, T>,
197}
198
199unsafe impl<T: Send + Debug> Send for MutexGuard<'_, '_, T> {}
200unsafe impl<T: Send + Sync + Debug> Sync for MutexGuard<'_, '_, T> {}
201
202impl<T: Debug> Deref for MutexGuard<'_, '_, T> {
203 type Target = T;
204
205 fn deref(&self) -> &Self::Target {
206 unsafe { (*self.mutex.handle.value.get()).as_ref().unwrap() }
207 }
208}
209
210impl<T: Debug> DerefMut for MutexGuard<'_, '_, T> {
211 fn deref_mut(&mut self) -> &mut Self::Target {
212 unsafe { (*self.mutex.handle.value.get()).as_mut().unwrap() }
213 }
214}
215
216impl<T: Debug> Drop for MutexGuard<'_, '_, T> {
217 fn drop(&mut self) {
218 if self.mutex.release().is_err() {
219 fatal_panic!(from self.mutex, "This should never happen! The MutexGuard is unable to release the mutex.");
220 }
221 }
222}
223
224#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
226#[repr(i32)]
227pub enum MutexType {
228 Normal = posix::PTHREAD_MUTEX_NORMAL,
230 Recursive = posix::PTHREAD_MUTEX_RECURSIVE,
232 WithDeadlockDetection = posix::PTHREAD_MUTEX_ERRORCHECK,
235}
236
237#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
240#[repr(i32)]
241pub enum MutexPriorityInheritance {
242 None = posix::PTHREAD_PRIO_NONE,
244 Inherit = posix::PTHREAD_PRIO_INHERIT,
247 Protect = posix::PTHREAD_PRIO_PROTECT,
250}
251
252#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
254#[repr(i32)]
255pub enum MutexThreadTerminationBehavior {
256 StallWhenLocked = posix::PTHREAD_MUTEX_STALLED,
259
260 ReleaseWhenLocked = posix::PTHREAD_MUTEX_ROBUST,
273}
274
275#[derive(Debug)]
277pub struct MutexBuilder {
278 pub(crate) is_interprocess_capable: bool,
279 pub(crate) mutex_type: MutexType,
280 pub(crate) thread_termination_behavior: MutexThreadTerminationBehavior,
281 pub(crate) priority_inheritance: MutexPriorityInheritance,
282 pub(crate) priority_ceiling: Option<i32>,
283 pub(crate) clock_type: ClockType,
284}
285
286impl Default for MutexBuilder {
287 fn default() -> Self {
288 Self {
289 is_interprocess_capable: true,
290 mutex_type: MutexType::Normal,
291 priority_inheritance: MutexPriorityInheritance::None,
292 priority_ceiling: None,
293 thread_termination_behavior: MutexThreadTerminationBehavior::StallWhenLocked,
294 clock_type: ClockType::default(),
295 }
296 }
297}
298
299impl MutexBuilder {
300 pub fn new() -> Self {
301 Self::default()
302 }
303
304 pub fn clock_type(mut self, clock_type: ClockType) -> Self {
306 self.clock_type = clock_type;
307 self
308 }
309
310 pub fn is_interprocess_capable(mut self, value: bool) -> Self {
312 self.is_interprocess_capable = value;
313 self
314 }
315
316 pub fn mutex_type(mut self, value: MutexType) -> Self {
318 self.mutex_type = value;
319 self
320 }
321
322 pub fn thread_termination_behavior(mut self, value: MutexThreadTerminationBehavior) -> Self {
324 self.thread_termination_behavior = value;
325 self
326 }
327
328 pub fn priority_inheritance(mut self, value: MutexPriorityInheritance) -> Self {
330 self.priority_inheritance = value;
331 self
332 }
333
334 pub fn priority_ceiling(mut self, value: i32) -> Self {
337 self.priority_ceiling = Some(value);
338 self
339 }
340
341 fn initialize_mutex<T: Debug>(
342 &self,
343 mutex: *mut posix::pthread_mutex_t,
344 ) -> Result<Capability, MutexCreationError> {
345 let msg = "Unable to create mutex";
346
347 let mut mutex_attributes = ScopeGuardBuilder::new(posix::pthread_mutexattr_t::new())
348 .on_init(
349 |attr| match unsafe { posix::pthread_mutexattr_init(attr) } {
350 0 => Ok(()),
351 _ => {
352 fail!(from self, with MutexCreationError::InsufficientMemory,
353 "{} since the mutex attribute initialization failed.", msg);
354 }
355 },
356 )
357 .on_drop(
358 |attr| match unsafe { posix::pthread_mutexattr_destroy(attr) } {
359 0 => (),
360 _ => {
361 fatal_panic!(
362 "Mutex<{}>, failed to destroy mutex attributes - possible leak?",
363 std::any::type_name::<T>()
364 );
365 }
366 },
367 )
368 .create()?;
369
370 if self.is_interprocess_capable
371 && unsafe {
372 posix::pthread_mutexattr_setpshared(
373 mutex_attributes.get_mut(),
374 posix::PTHREAD_PROCESS_SHARED,
375 )
376 } != 0
377 {
378 fail!(from self, with MutexCreationError::NoInterProcessSupport,
379 "{} due to a failure while setting the inter process flag in mutex attributes.", msg);
380 }
381
382 if unsafe {
383 posix::pthread_mutexattr_settype(mutex_attributes.get_mut(), self.mutex_type as i32)
384 } != 0
385 {
386 fail!(from self, with MutexCreationError::UnableToSetType,
387 "{} due to a failure while setting the mutex type in mutex attributes.", msg);
388 }
389
390 if unsafe {
391 posix::pthread_mutexattr_setprotocol(
392 mutex_attributes.get_mut(),
393 self.priority_inheritance as i32,
394 )
395 } != 0
396 {
397 fail!(from self, with MutexCreationError::UnableToSetProtocol,
398 "{} due to a failure while setting the mutex protocol in mutex attributes.", msg);
399 }
400
401 if unsafe {
402 posix::pthread_mutexattr_setrobust(
403 mutex_attributes.get_mut(),
404 self.thread_termination_behavior as i32,
405 )
406 } != 0
407 {
408 fail!(from self, with MutexCreationError::UnableToSetThreadTerminationBehavior,
409 "{} due to a failure while setting the mutex thread termination behavior in mutex attributes.", msg);
410 }
411
412 if self.priority_ceiling.is_some() {
413 let msg = "Failed to set the mutex priority ceiling";
414 handle_errno!(MutexCreationError, from self,
415 errno_source unsafe {
416 posix::pthread_mutexattr_setprioceiling(
417 mutex_attributes
418 .get_mut(), self.priority_ceiling.unwrap())
419 }.into(),
420 continue_on_success,
421 success Errno::ESUCCES => (),
422 Errno::EPERM => (UnableToSetPriorityCeiling, "{} since the user does not have enough permissions to set them.", msg),
423 v => (UnableToSetPriorityCeiling, "{} since an unknown error has occurred ({}).",msg, v)
424 );
425 }
426
427 match unsafe { posix::pthread_mutex_init(mutex, mutex_attributes.get()) }.into() {
428 Errno::ESUCCES => (),
429 Errno::ENOMEM => {
430 fail!(from self, with MutexCreationError::InsufficientMemory, "{} due to insufficient memory.", msg);
431 }
432 Errno::EAGAIN => {
433 fail!(from self, with MutexCreationError::InsufficientResources,
434 "{} due to insufficient resources.",
435 msg);
436 }
437 Errno::EPERM => {
438 fail!(from self, with MutexCreationError::InsufficientPermissions,
439 "{} due to insufficient permissions.", msg
440 );
441 }
442 v => {
443 fail!(from self, with MutexCreationError::UnknownError(v as i32),
444 "{} since an unknown error occurred ({})", msg, v);
445 }
446 };
447
448 match self.is_interprocess_capable {
449 true => Ok(Capability::InterProcess),
450 false => Ok(Capability::ProcessLocal),
451 }
452 }
453
454 pub fn create<T: Debug>(
456 self,
457 t: T,
458 handle: &MutexHandle<T>,
459 ) -> Result<Mutex<T>, MutexCreationError> {
460 unsafe {
461 handle
462 .handle
463 .initialize(|mtx| self.initialize_mutex::<T>(mtx))?
464 };
465
466 unsafe { *handle.clock_type.get() = self.clock_type };
467 unsafe { *handle.value.get() = Some(t) };
468
469 Ok(Mutex::new(handle))
470 }
471}
472
473#[derive(Debug)]
474pub struct MutexHandle<T: Sized + Debug> {
475 pub(crate) handle: HandleStorage<posix::pthread_mutex_t>,
476 clock_type: UnsafeCell<ClockType>,
477 value: UnsafeCell<Option<T>>,
478}
479
480unsafe impl<T: Sized + Debug> Send for MutexHandle<T> {}
481unsafe impl<T: Sized + Debug> Sync for MutexHandle<T> {}
482
483impl<T: Sized + Debug> Drop for MutexHandle<T> {
484 fn drop(&mut self) {
485 if self.handle.is_initialized() {
486 unsafe {
487 self.handle.cleanup(|mtx| {
488 if posix::pthread_mutex_destroy(mtx) != 0 {
489 warn!(from self,
490 "Unable to destroy mutex. Was it already destroyed by another instance in another process?");
491 }
492 })
493 };
494 }
495 }
496}
497
498impl<T: Sized + Debug> Handle for MutexHandle<T> {
499 fn new() -> Self {
500 Self {
501 handle: HandleStorage::new(posix::pthread_mutex_t::new()),
502 clock_type: UnsafeCell::new(ClockType::default()),
503 value: UnsafeCell::new(None),
504 }
505 }
506
507 fn is_initialized(&self) -> bool {
508 self.handle.is_initialized()
509 }
510
511 fn is_inter_process_capable(&self) -> bool {
512 self.handle.is_inter_process_capable()
513 }
514}
515
516impl<T: Sized + Debug> MutexHandle<T> {
517 fn clock_type(&self) -> ClockType {
518 unsafe { *self.clock_type.get() }
519 }
520}
521
522#[derive(Debug)]
552pub struct Mutex<'a, T: Sized + Debug> {
553 pub(crate) handle: &'a MutexHandle<T>,
554}
555
556unsafe impl<T: Sized + Send + Debug> Send for Mutex<'_, T> {}
557unsafe impl<T: Sized + Send + Debug> Sync for Mutex<'_, T> {}
558
559impl<'a, T: Debug> IpcConstructible<'a, MutexHandle<T>> for Mutex<'a, T> {
560 fn new(handle: &'a MutexHandle<T>) -> Self {
561 Self { handle }
562 }
563}
564
565impl<'a, T: Debug> IpcCapable<'a, MutexHandle<T>> for Mutex<'a, T> {
566 fn is_interprocess_capable(&self) -> bool {
567 self.handle.is_inter_process_capable()
568 }
569}
570
571impl<T: Debug> Mutex<'_, T> {
572 pub fn lock(&self) -> Result<MutexGuard<'_, '_, T>, MutexLockError<'_, '_, T>> {
581 let msg = "Failed to lock";
582 handle_errno!(MutexLockError, from self,
583 errno_source unsafe { posix::pthread_mutex_lock(self.handle.handle.get()) }.into(),
584 success Errno::ESUCCES => MutexGuard { mutex: self },
585 Errno::EAGAIN => (ExceededMaximumNumberOfRecursiveLocks, "{} since the maximum number of recursive locks exceeded.", msg),
586 Errno::EDEADLK => (DeadlockDetected, "{} since the operation would lead to a deadlock.", msg),
587 Errno::EOWNERDEAD => (LockAcquiredButOwnerDied(MutexGuard { mutex: self }), "{} since the thread/process holding the mutex died.", msg),
588 Errno::ENOTRECOVERABLE => (UnrecoverableState, "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
589 v => (UnknownError(v as i32), "{} since an unknown error occurred while acquiring the lock ({})", msg, v)
590 );
591 }
592
593 pub fn try_lock(&self) -> Result<Option<MutexGuard<'_, '_, T>>, MutexLockError<'_, '_, T>> {
603 let msg = "Try lock failed";
604 handle_errno!(MutexLockError, from self,
605 errno_source unsafe { posix::pthread_mutex_trylock(self.handle.handle.get()) }.into(),
606 success Errno::ESUCCES => Some(MutexGuard { mutex: self });
607 success Errno::EDEADLK => None;
608 success Errno::EBUSY => None,
609 Errno::EAGAIN => (ExceededMaximumNumberOfRecursiveLocks, "{} since the maximum number of recursive locks exceeded.", msg),
610 Errno::EOWNERDEAD => (LockAcquiredButOwnerDied(MutexGuard { mutex: self }), "{} since the thread/process holding the mutex dies.", msg),
611 Errno::ENOTRECOVERABLE => (UnrecoverableState, "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
612 v => (UnknownError(v as i32), "{} since unknown error occurred while acquiring the lock ({})", msg, v)
613 );
614 }
615
616 pub fn timed_lock(
626 &self,
627 duration: Duration,
628 ) -> Result<Option<MutexGuard<'_, '_, T>>, MutexTimedLockError<'_, '_, T>> {
629 let msg = "Timed lock failed";
630
631 match self.handle.clock_type() {
632 ClockType::Realtime => {
633 let now = fail!(from self, when Time::now_with_clock(ClockType::Realtime),
634 "{} due to a failure while acquiring current system time.", msg);
635 let timeout = now.as_duration() + duration;
636 handle_errno!(MutexTimedLockError, from self,
637 errno_source unsafe { posix::pthread_mutex_timedlock(self.handle.handle.get(), &timeout.as_timespec()) }.into(),
638 success Errno::ESUCCES => Some(MutexGuard { mutex: self });
639 success Errno::ETIMEDOUT => None;
640 success Errno::EDEADLK => None,
641 Errno::EAGAIN => (MutexLockError(MutexLockError::ExceededMaximumNumberOfRecursiveLocks), "{} since the maximum number of recursive locks exceeded.", msg),
642 Errno::EINVAL => (TimeoutExceedsMaximumSupportedDuration, "{} since the timeout of {:?} exceeds the maximum supported duration.", msg, duration),
643 Errno::ENOTRECOVERABLE => (MutexLockError(MutexLockError::UnrecoverableState), "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
644 v => (MutexLockError(MutexLockError::UnknownError(v as i32)), "{} since unknown error occurred while acquiring the lock ({})", msg, v)
645 )
646 }
647 ClockType::Monotonic => {
648 let time = fail!(from self, when Time::now_with_clock(ClockType::Monotonic),
649 "{} due to a failure while acquiring current system time.", msg);
650 let mut adaptive_wait = fail!(from self, when AdaptiveWaitBuilder::new()
651 .clock_type(self.handle.clock_type())
652 .create(), "{} since the adaptive wait could not be created.", msg);
653
654 loop {
655 match self.try_lock() {
656 Ok(Some(v)) => return Ok(Some(v)),
657 Ok(None) => match fail!(from self, when time.elapsed(),
658 "{} due to a failure while acquiring elapsed system time.", msg)
659 < duration
660 {
661 true => {
662 fail!(from self, when adaptive_wait.wait(), "{} since AdaptiveWait failed.", msg);
663 }
664 false => return Ok(None),
665 },
666 Err(v) => {
667 fail!(from self, with MutexTimedLockError::MutexLockError(v),
668 "{} since timed lock failed for duration {:?}.", msg, duration);
669 }
670 }
671 }
672 }
673 }
674 }
675
676 pub fn make_consistent(&self) {
683 if unsafe { posix::pthread_mutex_consistent(self.handle.handle.get()) } != 0 {
684 warn!(from self, "pthread_mutex_consistent has no effect since either the mutex was not a robust mutex or the mutex was not in an inconsistent state.");
685 }
686 }
687
688 pub fn priority_ceiling(&self) -> Result<i32, MutexGetPrioCeilingError> {
690 let mut value: i32 = 0;
691 let msg = "Unable to acquire priority ceiling";
692 handle_errno!(MutexGetPrioCeilingError, from self,
693 errno_source unsafe { posix::pthread_mutex_getprioceiling(self.handle.handle.get(), &mut value) }.into(),
694 success Errno::ESUCCES => value,
695 Errno::EPERM => (InsufficientPermissions, "{} due to insufficient permissions.", msg),
696 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
697 );
698 }
699
700 pub fn set_priority_ceiling(&self, value: i32) -> Result<i32, MutexSetPrioCeilingError> {
702 let mut previous_value: i32 = 0;
703 let msg = "Unable to set priority ceiling";
704 handle_errno!(MutexSetPrioCeilingError, from self,
705 errno_source unsafe { posix::pthread_mutex_setprioceiling(self.handle.handle.get(), value, &mut previous_value) }.into(),
706 success Errno::ESUCCES => previous_value,
707 Errno::EINVAL => (ValueOutOfRange, "{} since the new priority ceiling value {} is out of range.", msg, value),
708 Errno::EPERM => (InsufficientPermissions, "{} due to insufficient permissions.", msg),
709 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
710 );
711 }
712
713 pub(crate) fn release(&self) -> Result<(), MutexUnlockError> {
714 let msg = "Unable to release lock";
715 handle_errno!(MutexUnlockError, from self,
716 errno_source unsafe { posix::pthread_mutex_unlock(self.handle.handle.get()) }.into(),
717 success Errno::ESUCCES => (),
718 Errno::EPERM => (OwnedByDifferentEntity, "{} since the current thread/process does not own the lock", msg),
719 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
720 );
721 }
722}