1pub use crate::ipc_capable::{Handle, IpcCapable};
48
49use crate::ipc_capable::internal::{Capability, HandleStorage, IpcConstructible};
50use core::cell::UnsafeCell;
51use core::fmt::Debug;
52use core::marker::PhantomData;
53use core::ops::{Deref, DerefMut};
54use core::time::Duration;
55use iceoryx2_bb_elementary::scope_guard::*;
56use iceoryx2_bb_log::{fail, fatal_panic, warn};
57use iceoryx2_pal_posix::posix::MemZeroedStruct;
58use iceoryx2_pal_posix::*;
59
60use crate::adaptive_wait::*;
61use crate::clock::{AsTimespec, ClockType, NanosleepError, Time, TimeError};
62use crate::handle_errno;
63use iceoryx2_pal_posix::posix::errno::Errno;
64
/// Failures that can be reported while a mutex is created with the [`MutexBuilder`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MutexCreationError {
    /// The mutex attributes could not be initialized or `pthread_mutex_init` reported `ENOMEM`.
    InsufficientMemory,
    /// `pthread_mutex_init` reported `EAGAIN`.
    InsufficientResources,
    /// `pthread_mutex_init` reported `EPERM`.
    InsufficientPermissions,
    /// The process-shared flag could not be set in the mutex attributes.
    NoInterProcessSupport,
    /// The mutex type could not be set in the mutex attributes.
    UnableToSetType,
    /// The mutex protocol could not be set in the mutex attributes.
    UnableToSetProtocol,
    /// The robustness setting could not be set in the mutex attributes.
    UnableToSetThreadTerminationBehavior,
    /// Any other error number returned by `pthread_mutex_init`.
    UnknownError(i32),
}
76
77#[derive(Debug, PartialEq, Eq)]
78pub enum MutexLockError<'handle, T: Sized + Debug> {
79 ExceededMaximumNumberOfRecursiveLocks,
80 DeadlockDetected,
81 LockAcquiredButOwnerDied(MutexGuard<'handle, T>),
82 UnrecoverableState,
83 UnknownError(i32),
84}
85
86impl<T: Sized + Debug> PartialEq for MutexGuard<'_, T> {
87 fn eq(&self, other: &Self) -> bool {
88 core::ptr::eq(self.handle, other.handle)
89 }
90}
91
92impl<T: Sized + Debug> Eq for MutexGuard<'_, T> {}
93
94#[derive(Debug, PartialEq, Eq)]
95pub enum MutexTimedLockError<'handle, T: Sized + Debug> {
96 TimeoutExceedsMaximumSupportedDuration,
97 MutexLockError(MutexLockError<'handle, T>),
98 NanosleepError(NanosleepError),
99 AdaptiveWaitError(AdaptiveWaitError),
100 FailureInInternalClockWhileWait(TimeError),
101}
102
103impl<T: Debug> From<TimeError> for MutexTimedLockError<'_, T> {
104 fn from(v: TimeError) -> Self {
105 MutexTimedLockError::FailureInInternalClockWhileWait(v)
106 }
107}
108
109impl<T: Debug> From<NanosleepError> for MutexTimedLockError<'_, T> {
110 fn from(v: NanosleepError) -> Self {
111 MutexTimedLockError::NanosleepError(v)
112 }
113}
114
115impl<T: Debug> From<AdaptiveWaitError> for MutexTimedLockError<'_, T> {
116 fn from(v: AdaptiveWaitError) -> Self {
117 MutexTimedLockError::AdaptiveWaitError(v)
118 }
119}
120
/// Failures that can be reported when a mutex is unlocked.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MutexUnlockError {
    /// `pthread_mutex_unlock` reported `EPERM`: the caller does not own the lock.
    OwnedByDifferentEntity,
    /// Any other error number returned by `pthread_mutex_unlock`.
    UnknownError(i32),
}
126
/// Coarse-grained mutex error that only states which kind of operation failed.
/// The detailed error enums of this file convert into it via `From`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MutexError {
    /// Creating the mutex failed.
    CreationFailed,
    /// Acquiring the lock failed.
    LockFailed,
    /// Releasing the lock failed.
    UnlockFailed,
}
137
138impl<'handle, T: Debug> From<MutexLockError<'handle, T>> for MutexError {
139 fn from(_: MutexLockError<'handle, T>) -> Self {
140 MutexError::LockFailed
141 }
142}
143
144impl<'handle, T: Debug> From<MutexTimedLockError<'handle, T>> for MutexError {
145 fn from(_: MutexTimedLockError<'handle, T>) -> Self {
146 MutexError::LockFailed
147 }
148}
149
150impl From<MutexUnlockError> for MutexError {
151 fn from(_: MutexUnlockError) -> Self {
152 MutexError::UnlockFailed
153 }
154}
155
156impl From<MutexCreationError> for MutexError {
157 fn from(_: MutexCreationError) -> Self {
158 MutexError::CreationFailed
159 }
160}
161
162#[derive(Debug)]
181pub struct MutexGuard<'handle, T: Debug> {
182 handle: &'handle MutexHandle<T>,
183}
184
185impl<T: Debug> Deref for MutexGuard<'_, T> {
186 type Target = T;
187
188 fn deref(&self) -> &Self::Target {
189 unsafe { (*self.handle.value.get()).as_ref().unwrap() }
190 }
191}
192
193impl<T: Debug> DerefMut for MutexGuard<'_, T> {
194 fn deref_mut(&mut self) -> &mut Self::Target {
195 unsafe { (*self.handle.value.get()).as_mut().unwrap() }
196 }
197}
198
199impl<T: Debug> MutexGuard<'_, T> {
200 pub(crate) fn release(&self) -> Result<(), MutexUnlockError> {
201 let msg = "Unable to release lock";
202 handle_errno!(MutexUnlockError, from self,
203 errno_source unsafe { posix::pthread_mutex_unlock(self.handle.handle.get()) }.into(),
204 success Errno::ESUCCES => (),
205 Errno::EPERM => (OwnedByDifferentEntity, "{} since the current thread/process does not own the lock", msg),
206 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
207 );
208 }
209}
210
211impl<T: Debug> Drop for MutexGuard<'_, T> {
212 fn drop(&mut self) {
213 if self.release().is_err() {
214 fatal_panic!(from self.handle, "This should never happen! The MutexGuard is unable to release the mutex.");
215 }
216 }
217}
218
219#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
221#[repr(i32)]
222pub enum MutexType {
223 Normal = posix::PTHREAD_MUTEX_NORMAL,
225 Recursive = posix::PTHREAD_MUTEX_RECURSIVE,
227 WithDeadlockDetection = posix::PTHREAD_MUTEX_ERRORCHECK,
230}
231
232#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
234#[repr(i32)]
235pub enum MutexThreadTerminationBehavior {
236 StallWhenLocked = posix::PTHREAD_MUTEX_STALLED,
239
240 ReleaseWhenLocked = posix::PTHREAD_MUTEX_ROBUST,
253}
254
255#[derive(Debug)]
257pub struct MutexBuilder {
258 pub(crate) is_interprocess_capable: bool,
259 pub(crate) mutex_type: MutexType,
260 pub(crate) thread_termination_behavior: MutexThreadTerminationBehavior,
261 pub(crate) clock_type: ClockType,
262}
263
264impl Default for MutexBuilder {
265 fn default() -> Self {
266 Self {
267 is_interprocess_capable: true,
268 mutex_type: MutexType::Normal,
269 thread_termination_behavior: MutexThreadTerminationBehavior::StallWhenLocked,
270 clock_type: ClockType::default(),
271 }
272 }
273}
274
275impl MutexBuilder {
276 pub fn new() -> Self {
277 Self::default()
278 }
279
280 pub fn clock_type(mut self, clock_type: ClockType) -> Self {
282 self.clock_type = clock_type;
283 self
284 }
285
286 pub fn is_interprocess_capable(mut self, value: bool) -> Self {
288 self.is_interprocess_capable = value;
289 self
290 }
291
292 pub fn mutex_type(mut self, value: MutexType) -> Self {
294 self.mutex_type = value;
295 self
296 }
297
298 pub fn thread_termination_behavior(mut self, value: MutexThreadTerminationBehavior) -> Self {
300 self.thread_termination_behavior = value;
301 self
302 }
303
304 fn initialize_mutex<T: Debug>(
305 &self,
306 mutex: *mut posix::pthread_mutex_t,
307 ) -> Result<Capability, MutexCreationError> {
308 let msg = "Unable to create mutex";
309
310 let mut mutex_attributes = ScopeGuardBuilder::new(posix::pthread_mutexattr_t::new_zeroed())
311 .on_init(
312 |attr| match unsafe { posix::pthread_mutexattr_init(attr) } {
313 0 => Ok(()),
314 _ => {
315 fail!(from self, with MutexCreationError::InsufficientMemory,
316 "{} since the mutex attribute initialization failed.", msg);
317 }
318 },
319 )
320 .on_drop(
321 |attr| match unsafe { posix::pthread_mutexattr_destroy(attr) } {
322 0 => (),
323 _ => {
324 fatal_panic!(
325 "Mutex<{}>, failed to destroy mutex attributes - possible leak?",
326 core::any::type_name::<T>()
327 );
328 }
329 },
330 )
331 .create()?;
332
333 if self.is_interprocess_capable
334 && unsafe {
335 posix::pthread_mutexattr_setpshared(
336 mutex_attributes.get_mut(),
337 posix::PTHREAD_PROCESS_SHARED,
338 )
339 } != 0
340 {
341 fail!(from self, with MutexCreationError::NoInterProcessSupport,
342 "{} due to a failure while setting the inter process flag in mutex attributes.", msg);
343 }
344
345 if unsafe {
346 posix::pthread_mutexattr_settype(mutex_attributes.get_mut(), self.mutex_type as i32)
347 } != 0
348 {
349 fail!(from self, with MutexCreationError::UnableToSetType,
350 "{} due to a failure while setting the mutex type in mutex attributes.", msg);
351 }
352
353 if unsafe {
354 posix::pthread_mutexattr_setprotocol(
355 mutex_attributes.get_mut(),
356 posix::PTHREAD_PRIO_NONE,
357 )
358 } != 0
359 {
360 fail!(from self, with MutexCreationError::UnableToSetProtocol,
361 "{} due to a failure while setting the mutex protocol in mutex attributes.", msg);
362 }
363
364 if unsafe {
365 posix::pthread_mutexattr_setrobust(
366 mutex_attributes.get_mut(),
367 self.thread_termination_behavior as i32,
368 )
369 } != 0
370 {
371 fail!(from self, with MutexCreationError::UnableToSetThreadTerminationBehavior,
372 "{} due to a failure while setting the mutex thread termination behavior in mutex attributes.", msg);
373 }
374
375 match unsafe { posix::pthread_mutex_init(mutex, mutex_attributes.get()) }.into() {
376 Errno::ESUCCES => (),
377 Errno::ENOMEM => {
378 fail!(from self, with MutexCreationError::InsufficientMemory, "{} due to insufficient memory.", msg);
379 }
380 Errno::EAGAIN => {
381 fail!(from self, with MutexCreationError::InsufficientResources,
382 "{} due to insufficient resources.",
383 msg);
384 }
385 Errno::EPERM => {
386 fail!(from self, with MutexCreationError::InsufficientPermissions,
387 "{} due to insufficient permissions.", msg
388 );
389 }
390 v => {
391 fail!(from self, with MutexCreationError::UnknownError(v as i32),
392 "{} since an unknown error occurred ({})", msg, v);
393 }
394 };
395
396 match self.is_interprocess_capable {
397 true => Ok(Capability::InterProcess),
398 false => Ok(Capability::ProcessLocal),
399 }
400 }
401
402 pub fn create<T: Debug>(
404 self,
405 t: T,
406 handle: &MutexHandle<T>,
407 ) -> Result<Mutex<'_, '_, T>, MutexCreationError> {
408 unsafe {
409 handle
410 .handle
411 .initialize(|mtx| self.initialize_mutex::<T>(mtx))?
412 };
413
414 unsafe { *handle.clock_type.get() = self.clock_type };
415 unsafe { *handle.value.get() = Some(t) };
416
417 Ok(Mutex::new(handle))
418 }
419}
420
421#[derive(Debug)]
422pub struct MutexHandle<T: Sized + Debug> {
423 pub(crate) handle: HandleStorage<posix::pthread_mutex_t>,
424 clock_type: UnsafeCell<ClockType>,
425 value: UnsafeCell<Option<T>>,
426}
427
428unsafe impl<T: Sized + Debug> Send for MutexHandle<T> {}
429unsafe impl<T: Sized + Debug> Sync for MutexHandle<T> {}
430
431impl<T: Sized + Debug> Drop for MutexHandle<T> {
432 fn drop(&mut self) {
433 if self.handle.is_initialized() {
434 unsafe {
435 self.handle.cleanup(|mtx| {
436 if posix::pthread_mutex_destroy(mtx) != 0 {
437 warn!(from self,
438 "Unable to destroy mutex. Was it already destroyed by another instance in another process?");
439 }
440 })
441 };
442 }
443 }
444}
445
446impl<T: Sized + Debug> Handle for MutexHandle<T> {
447 fn new() -> Self {
448 Self {
449 handle: HandleStorage::new(posix::pthread_mutex_t::new_zeroed()),
450 clock_type: UnsafeCell::new(ClockType::default()),
451 value: UnsafeCell::new(None),
452 }
453 }
454
455 fn is_initialized(&self) -> bool {
456 self.handle.is_initialized()
457 }
458
459 fn is_inter_process_capable(&self) -> bool {
460 self.handle.is_inter_process_capable()
461 }
462}
463
464impl<T: Sized + Debug> MutexHandle<T> {
465 fn clock_type(&self) -> ClockType {
466 unsafe { *self.clock_type.get() }
467 }
468}
469
470#[derive(Debug)]
500pub struct Mutex<'this, 'handle: 'this, T: Sized + Debug> {
501 pub(crate) handle: &'handle MutexHandle<T>,
502 _lifetime: PhantomData<&'this ()>,
503}
504
505unsafe impl<T: Sized + Send + Debug> Send for Mutex<'_, '_, T> {}
506unsafe impl<T: Sized + Send + Debug> Sync for Mutex<'_, '_, T> {}
507
508impl<'handle, T: Debug> IpcConstructible<'handle, MutexHandle<T>> for Mutex<'_, 'handle, T> {
509 fn new(handle: &'handle MutexHandle<T>) -> Self {
510 Self {
511 handle,
512 _lifetime: PhantomData,
513 }
514 }
515}
516
517impl<'handle, T: Debug> IpcCapable<'handle, MutexHandle<T>> for Mutex<'_, 'handle, T> {
518 fn is_interprocess_capable(&self) -> bool {
519 self.handle.is_inter_process_capable()
520 }
521}
522
523impl<'this, 'handle: 'this, T: Debug> Mutex<'this, 'handle, T> {
524 pub unsafe fn from_handle(handle: &'handle MutexHandle<T>) -> Mutex<'this, 'handle, T> {
532 debug_assert!(handle.is_initialized());
533
534 Self::new(handle)
535 }
536
537 pub fn lock(&'this self) -> Result<MutexGuard<'handle, T>, MutexLockError<'handle, T>> {
546 let msg = "Failed to lock";
547 handle_errno!(MutexLockError, from self,
548 errno_source unsafe { posix::pthread_mutex_lock(self.handle.handle.get()) }.into(),
549 success Errno::ESUCCES => MutexGuard { handle: self.handle },
550 Errno::EAGAIN => (ExceededMaximumNumberOfRecursiveLocks, "{} since the maximum number of recursive locks exceeded.", msg),
551 Errno::EDEADLK => (DeadlockDetected, "{} since the operation would lead to a deadlock.", msg),
552 Errno::EOWNERDEAD => (LockAcquiredButOwnerDied(MutexGuard { handle: self.handle }), "{} since the thread/process holding the mutex died.", msg),
553 Errno::ENOTRECOVERABLE => (UnrecoverableState, "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
554 v => (UnknownError(v as i32), "{} since an unknown error occurred while acquiring the lock ({})", msg, v)
555 );
556 }
557
558 pub fn try_lock(
568 &'this self,
569 ) -> Result<Option<MutexGuard<'handle, T>>, MutexLockError<'handle, T>> {
570 let msg = "Try lock failed";
571 handle_errno!(MutexLockError, from self,
572 errno_source unsafe { posix::pthread_mutex_trylock(self.handle.handle.get()) }.into(),
573 success Errno::ESUCCES => Some(MutexGuard { handle: self.handle });
574 success Errno::EDEADLK => None;
575 success Errno::EBUSY => None,
576 Errno::EAGAIN => (ExceededMaximumNumberOfRecursiveLocks, "{} since the maximum number of recursive locks exceeded.", msg),
577 Errno::EOWNERDEAD => (LockAcquiredButOwnerDied(MutexGuard { handle: self.handle }), "{} since the thread/process holding the mutex dies.", msg),
578 Errno::ENOTRECOVERABLE => (UnrecoverableState, "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
579 v => (UnknownError(v as i32), "{} since unknown error occurred while acquiring the lock ({})", msg, v)
580 );
581 }
582
583 pub fn timed_lock(
593 &'this self,
594 duration: Duration,
595 ) -> Result<Option<MutexGuard<'handle, T>>, MutexTimedLockError<'handle, T>> {
596 let msg = "Timed lock failed";
597
598 match self.handle.clock_type() {
599 ClockType::Realtime => {
600 let now = fail!(from self, when Time::now_with_clock(ClockType::Realtime),
601 "{} due to a failure while acquiring current system time.", msg);
602 let timeout = now.as_duration() + duration;
603 handle_errno!(MutexTimedLockError, from self,
604 errno_source unsafe { posix::pthread_mutex_timedlock(self.handle.handle.get(), &timeout.as_timespec()) }.into(),
605 success Errno::ESUCCES => Some(MutexGuard { handle: self.handle });
606 success Errno::ETIMEDOUT => None;
607 success Errno::EDEADLK => None,
608 Errno::EAGAIN => (MutexLockError(MutexLockError::ExceededMaximumNumberOfRecursiveLocks), "{} since the maximum number of recursive locks exceeded.", msg),
609 Errno::EINVAL => (TimeoutExceedsMaximumSupportedDuration, "{} since the timeout of {:?} exceeds the maximum supported duration.", msg, duration),
610 Errno::ENOTRECOVERABLE => (MutexLockError(MutexLockError::UnrecoverableState), "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
611 v => (MutexLockError(MutexLockError::UnknownError(v as i32)), "{} since unknown error occurred while acquiring the lock ({})", msg, v)
612 )
613 }
614 ClockType::Monotonic => {
615 let time = fail!(from self, when Time::now_with_clock(ClockType::Monotonic),
616 "{} due to a failure while acquiring current system time.", msg);
617 let mut adaptive_wait = fail!(from self, when AdaptiveWaitBuilder::new()
618 .clock_type(self.handle.clock_type())
619 .create(), "{} since the adaptive wait could not be created.", msg);
620
621 loop {
622 match self.try_lock() {
623 Ok(Some(v)) => return Ok(Some(v)),
624 Ok(None) => match fail!(from self, when time.elapsed(),
625 "{} due to a failure while acquiring elapsed system time.", msg)
626 < duration
627 {
628 true => {
629 fail!(from self, when adaptive_wait.wait(), "{} since AdaptiveWait failed.", msg);
630 }
631 false => return Ok(None),
632 },
633 Err(v) => {
634 fail!(from self, with MutexTimedLockError::MutexLockError(v),
635 "{} since timed lock failed for duration {:?}.", msg, duration);
636 }
637 }
638 }
639 }
640 }
641 }
642
643 pub fn make_consistent(&self) {
650 if unsafe { posix::pthread_mutex_consistent(self.handle.handle.get()) } != 0 {
651 warn!(from self, "pthread_mutex_consistent has no effect since either the mutex was not a robust mutex or the mutex was not in an inconsistent state.");
652 }
653 }
654}