1pub use crate::ipc_capable::{Handle, IpcCapable};
50
51use crate::ipc_capable::internal::{Capability, HandleStorage, IpcConstructible};
52
53use core::fmt::Debug;
54use core::marker::PhantomData;
55use core::ops::{Deref, DerefMut};
56use core::time::Duration;
57
58use iceoryx2_bb_concurrency::cell::UnsafeCell;
59use iceoryx2_bb_elementary::scope_guard::*;
60use iceoryx2_log::{fail, fatal_panic, warn};
61use iceoryx2_pal_posix::posix::MemZeroedStruct;
62use iceoryx2_pal_posix::*;
63
64use crate::adaptive_wait::*;
65use crate::clock::{AsTimespec, ClockType, NanosleepError, Time, TimeError};
66use iceoryx2_pal_posix::posix::errno::Errno;
67
68#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
69pub enum MutexCreationError {
70 InsufficientMemory,
71 InsufficientResources,
72 InsufficientPermissions,
73 NoInterProcessSupport,
74 UnableToSetType,
75 UnableToSetProtocol,
76 UnableToSetThreadTerminationBehavior,
77 UnknownError(i32),
78}
79
80#[derive(Debug, PartialEq, Eq)]
81pub enum MutexLockError<'handle, T: Sized + Debug> {
82 ExceededMaximumNumberOfRecursiveLocks,
83 DeadlockDetected,
84 LockAcquiredButOwnerDied(MutexGuard<'handle, T>),
85 UnrecoverableState,
86 UnknownError(i32),
87}
88
89impl<T: Sized + Debug> PartialEq for MutexGuard<'_, T> {
90 fn eq(&self, other: &Self) -> bool {
91 core::ptr::eq(self.handle, other.handle)
92 }
93}
94
95impl<T: Sized + Debug> Eq for MutexGuard<'_, T> {}
96
97#[derive(Debug, PartialEq, Eq)]
98pub enum MutexTimedLockError<'handle, T: Sized + Debug> {
99 TimeoutExceedsMaximumSupportedDuration,
100 MutexLockError(MutexLockError<'handle, T>),
101 NanosleepError(NanosleepError),
102 AdaptiveWaitError(AdaptiveWaitError),
103 FailureInInternalClockWhileWait(TimeError),
104}
105
106impl<T: Debug> From<TimeError> for MutexTimedLockError<'_, T> {
107 fn from(v: TimeError) -> Self {
108 MutexTimedLockError::FailureInInternalClockWhileWait(v)
109 }
110}
111
112impl<T: Debug> From<NanosleepError> for MutexTimedLockError<'_, T> {
113 fn from(v: NanosleepError) -> Self {
114 MutexTimedLockError::NanosleepError(v)
115 }
116}
117
118impl<T: Debug> From<AdaptiveWaitError> for MutexTimedLockError<'_, T> {
119 fn from(v: AdaptiveWaitError) -> Self {
120 MutexTimedLockError::AdaptiveWaitError(v)
121 }
122}
123
124#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
125pub enum MutexUnlockError {
126 OwnedByDifferentEntity,
127 UnknownError(i32),
128}
129
130#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
135pub enum MutexError {
136 CreationFailed,
137 LockFailed,
138 UnlockFailed,
139}
140
141impl<'handle, T: Debug> From<MutexLockError<'handle, T>> for MutexError {
142 fn from(_: MutexLockError<'handle, T>) -> Self {
143 MutexError::LockFailed
144 }
145}
146
147impl<'handle, T: Debug> From<MutexTimedLockError<'handle, T>> for MutexError {
148 fn from(_: MutexTimedLockError<'handle, T>) -> Self {
149 MutexError::LockFailed
150 }
151}
152
153impl From<MutexUnlockError> for MutexError {
154 fn from(_: MutexUnlockError) -> Self {
155 MutexError::UnlockFailed
156 }
157}
158
159impl From<MutexCreationError> for MutexError {
160 fn from(_: MutexCreationError) -> Self {
161 MutexError::CreationFailed
162 }
163}
164
165#[derive(Debug)]
186pub struct MutexGuard<'handle, T: Debug> {
187 handle: &'handle MutexHandle<T>,
188}
189
190impl<T: Debug> Deref for MutexGuard<'_, T> {
191 type Target = T;
192
193 fn deref(&self) -> &Self::Target {
194 unsafe { (*self.handle.value.get()).as_ref().unwrap() }
195 }
196}
197
198impl<T: Debug> DerefMut for MutexGuard<'_, T> {
199 fn deref_mut(&mut self) -> &mut Self::Target {
200 unsafe { (*self.handle.value.get()).as_mut().unwrap() }
201 }
202}
203
204impl<T: Debug> MutexGuard<'_, T> {
205 pub(crate) fn release(&self) -> Result<(), MutexUnlockError> {
206 let msg = "Unable to release lock";
207 handle_errno!(MutexUnlockError, from self,
208 errno_source unsafe { posix::pthread_mutex_unlock(self.handle.handle.get()) }.into(),
209 success Errno::ESUCCES => (),
210 Errno::EPERM => (OwnedByDifferentEntity, "{} since the current thread/process does not own the lock", msg),
211 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
212 );
213 }
214}
215
216impl<T: Debug> Drop for MutexGuard<'_, T> {
217 fn drop(&mut self) {
218 if self.release().is_err() {
219 fatal_panic!(from self.handle, "This should never happen! The MutexGuard is unable to release the mutex.");
220 }
221 }
222}
223
224#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
226#[repr(i32)]
227pub enum MutexType {
228 Normal = posix::PTHREAD_MUTEX_NORMAL,
230 Recursive = posix::PTHREAD_MUTEX_RECURSIVE,
232 WithDeadlockDetection = posix::PTHREAD_MUTEX_ERRORCHECK,
235}
236
237#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
239#[repr(i32)]
240pub enum MutexThreadTerminationBehavior {
241 StallWhenLocked = posix::PTHREAD_MUTEX_STALLED,
244
245 ReleaseWhenLocked = posix::PTHREAD_MUTEX_ROBUST,
258}
259
260#[derive(Debug)]
262pub struct MutexBuilder {
263 pub(crate) is_interprocess_capable: bool,
264 pub(crate) mutex_type: MutexType,
265 pub(crate) thread_termination_behavior: MutexThreadTerminationBehavior,
266 pub(crate) clock_type: ClockType,
267}
268
269impl Default for MutexBuilder {
270 fn default() -> Self {
271 Self {
272 is_interprocess_capable: true,
273 mutex_type: MutexType::Normal,
274 thread_termination_behavior: MutexThreadTerminationBehavior::StallWhenLocked,
275 clock_type: ClockType::default(),
276 }
277 }
278}
279
280impl MutexBuilder {
281 pub fn new() -> Self {
282 Self::default()
283 }
284
285 pub fn clock_type(mut self, clock_type: ClockType) -> Self {
287 self.clock_type = clock_type;
288 self
289 }
290
291 pub fn is_interprocess_capable(mut self, value: bool) -> Self {
293 self.is_interprocess_capable = value;
294 self
295 }
296
297 pub fn mutex_type(mut self, value: MutexType) -> Self {
299 self.mutex_type = value;
300 self
301 }
302
303 pub fn thread_termination_behavior(mut self, value: MutexThreadTerminationBehavior) -> Self {
305 self.thread_termination_behavior = value;
306 self
307 }
308
309 fn initialize_mutex<T: Debug>(
310 &self,
311 mutex: *mut posix::pthread_mutex_t,
312 ) -> Result<Capability, MutexCreationError> {
313 let msg = "Unable to create mutex";
314
315 let mut mutex_attributes = ScopeGuardBuilder::new(posix::pthread_mutexattr_t::new_zeroed())
316 .on_init(
317 |attr| match unsafe { posix::pthread_mutexattr_init(attr) } {
318 0 => Ok(()),
319 _ => {
320 fail!(from self, with MutexCreationError::InsufficientMemory,
321 "{} since the mutex attribute initialization failed.", msg);
322 }
323 },
324 )
325 .on_drop(
326 |attr| match unsafe { posix::pthread_mutexattr_destroy(attr) } {
327 0 => (),
328 _ => {
329 fatal_panic!(
330 "Mutex<{}>, failed to destroy mutex attributes - possible leak?",
331 core::any::type_name::<T>()
332 );
333 }
334 },
335 )
336 .create()?;
337
338 if self.is_interprocess_capable
339 && unsafe {
340 posix::pthread_mutexattr_setpshared(
341 mutex_attributes.get_mut(),
342 posix::PTHREAD_PROCESS_SHARED,
343 )
344 } != 0
345 {
346 fail!(from self, with MutexCreationError::NoInterProcessSupport,
347 "{} due to a failure while setting the inter process flag in mutex attributes.", msg);
348 }
349
350 if unsafe {
351 posix::pthread_mutexattr_settype(mutex_attributes.get_mut(), self.mutex_type as i32)
352 } != 0
353 {
354 fail!(from self, with MutexCreationError::UnableToSetType,
355 "{} due to a failure while setting the mutex type in mutex attributes.", msg);
356 }
357
358 if unsafe {
359 posix::pthread_mutexattr_setprotocol(
360 mutex_attributes.get_mut(),
361 posix::PTHREAD_PRIO_NONE,
362 )
363 } != 0
364 {
365 fail!(from self, with MutexCreationError::UnableToSetProtocol,
366 "{} due to a failure while setting the mutex protocol in mutex attributes.", msg);
367 }
368
369 if unsafe {
370 posix::pthread_mutexattr_setrobust(
371 mutex_attributes.get_mut(),
372 self.thread_termination_behavior as i32,
373 )
374 } != 0
375 {
376 fail!(from self, with MutexCreationError::UnableToSetThreadTerminationBehavior,
377 "{} due to a failure while setting the mutex thread termination behavior in mutex attributes.", msg);
378 }
379
380 match unsafe { posix::pthread_mutex_init(mutex, mutex_attributes.get()) }.into() {
381 Errno::ESUCCES => (),
382 Errno::ENOMEM => {
383 fail!(from self, with MutexCreationError::InsufficientMemory, "{} due to insufficient memory.", msg);
384 }
385 Errno::EAGAIN => {
386 fail!(from self, with MutexCreationError::InsufficientResources,
387 "{} due to insufficient resources.",
388 msg);
389 }
390 Errno::EPERM => {
391 fail!(from self, with MutexCreationError::InsufficientPermissions,
392 "{} due to insufficient permissions.", msg
393 );
394 }
395 v => {
396 fail!(from self, with MutexCreationError::UnknownError(v as i32),
397 "{} since an unknown error occurred ({})", msg, v);
398 }
399 };
400
401 match self.is_interprocess_capable {
402 true => Ok(Capability::InterProcess),
403 false => Ok(Capability::ProcessLocal),
404 }
405 }
406
407 pub fn create<T: Debug>(
409 self,
410 t: T,
411 handle: &MutexHandle<T>,
412 ) -> Result<Mutex<'_, '_, T>, MutexCreationError> {
413 unsafe {
414 handle
415 .handle
416 .initialize(|mtx| self.initialize_mutex::<T>(mtx))?
417 };
418
419 unsafe { *handle.clock_type.get() = self.clock_type };
420 unsafe { *handle.value.get() = Some(t) };
421
422 Ok(Mutex::new(handle))
423 }
424}
425
426#[derive(Debug)]
427pub struct MutexHandle<T: Sized + Debug> {
428 pub(crate) handle: HandleStorage<posix::pthread_mutex_t>,
429 clock_type: UnsafeCell<ClockType>,
430 value: UnsafeCell<Option<T>>,
431}
432
433unsafe impl<T: Sized + Debug> Send for MutexHandle<T> {}
434unsafe impl<T: Sized + Debug> Sync for MutexHandle<T> {}
435
436impl<T: Sized + Debug> Drop for MutexHandle<T> {
437 fn drop(&mut self) {
438 if self.handle.is_initialized() {
439 unsafe {
440 self.handle.cleanup(|mtx| {
441 if posix::pthread_mutex_destroy(mtx) != 0 {
442 warn!(from self,
443 "Unable to destroy mutex. Was it already destroyed by another instance in another process?");
444 }
445 })
446 };
447 }
448 }
449}
450
451impl<T: Sized + Debug> Handle for MutexHandle<T> {
452 fn new() -> Self {
453 Self {
454 handle: HandleStorage::new(posix::pthread_mutex_t::new_zeroed()),
455 clock_type: UnsafeCell::new(ClockType::default()),
456 value: UnsafeCell::new(None),
457 }
458 }
459
460 fn is_initialized(&self) -> bool {
461 self.handle.is_initialized()
462 }
463
464 fn is_inter_process_capable(&self) -> bool {
465 self.handle.is_inter_process_capable()
466 }
467}
468
469impl<T: Sized + Debug> MutexHandle<T> {
470 fn clock_type(&self) -> ClockType {
471 unsafe { *self.clock_type.get() }
472 }
473}
474
475#[derive(Debug)]
507pub struct Mutex<'this, 'handle: 'this, T: Sized + Debug> {
508 pub(crate) handle: &'handle MutexHandle<T>,
509 _lifetime: PhantomData<&'this ()>,
510}
511
512unsafe impl<T: Sized + Send + Debug> Send for Mutex<'_, '_, T> {}
513unsafe impl<T: Sized + Send + Debug> Sync for Mutex<'_, '_, T> {}
514
515impl<'handle, T: Debug> IpcConstructible<'handle, MutexHandle<T>> for Mutex<'_, 'handle, T> {
516 fn new(handle: &'handle MutexHandle<T>) -> Self {
517 Self {
518 handle,
519 _lifetime: PhantomData,
520 }
521 }
522}
523
524impl<'handle, T: Debug> IpcCapable<'handle, MutexHandle<T>> for Mutex<'_, 'handle, T> {
525 fn is_interprocess_capable(&self) -> bool {
526 self.handle.is_inter_process_capable()
527 }
528}
529
530impl<'this, 'handle: 'this, T: Debug> Mutex<'this, 'handle, T> {
531 pub unsafe fn from_handle(handle: &'handle MutexHandle<T>) -> Mutex<'this, 'handle, T> {
539 debug_assert!(handle.is_initialized());
540
541 Self::new(handle)
542 }
543
544 pub fn lock(&'this self) -> Result<MutexGuard<'handle, T>, MutexLockError<'handle, T>> {
553 let msg = "Failed to lock";
554 handle_errno!(MutexLockError, from self,
555 errno_source unsafe { posix::pthread_mutex_lock(self.handle.handle.get()) }.into(),
556 success Errno::ESUCCES => MutexGuard { handle: self.handle },
557 Errno::EAGAIN => (ExceededMaximumNumberOfRecursiveLocks, "{} since the maximum number of recursive locks exceeded.", msg),
558 Errno::EDEADLK => (DeadlockDetected, "{} since the operation would lead to a deadlock.", msg),
559 Errno::EOWNERDEAD => (LockAcquiredButOwnerDied(MutexGuard { handle: self.handle }), "{} since the thread/process holding the mutex died.", msg),
560 Errno::ENOTRECOVERABLE => (UnrecoverableState, "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
561 v => (UnknownError(v as i32), "{} since an unknown error occurred while acquiring the lock ({})", msg, v)
562 );
563 }
564
565 pub fn try_lock(
575 &'this self,
576 ) -> Result<Option<MutexGuard<'handle, T>>, MutexLockError<'handle, T>> {
577 let msg = "Try lock failed";
578 handle_errno!(MutexLockError, from self,
579 errno_source unsafe { posix::pthread_mutex_trylock(self.handle.handle.get()) }.into(),
580 success Errno::ESUCCES => Some(MutexGuard { handle: self.handle });
581 success Errno::EDEADLK => None;
582 success Errno::EBUSY => None,
583 Errno::EAGAIN => (ExceededMaximumNumberOfRecursiveLocks, "{} since the maximum number of recursive locks exceeded.", msg),
584 Errno::EOWNERDEAD => (LockAcquiredButOwnerDied(MutexGuard { handle: self.handle }), "{} since the thread/process holding the mutex dies.", msg),
585 Errno::ENOTRECOVERABLE => (UnrecoverableState, "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
586 v => (UnknownError(v as i32), "{} since unknown error occurred while acquiring the lock ({})", msg, v)
587 );
588 }
589
590 pub fn timed_lock(
600 &'this self,
601 duration: Duration,
602 ) -> Result<Option<MutexGuard<'handle, T>>, MutexTimedLockError<'handle, T>> {
603 let msg = "Timed lock failed";
604
605 match self.handle.clock_type() {
606 ClockType::Realtime => {
607 let now = fail!(from self, when Time::now_with_clock(ClockType::Realtime),
608 "{} due to a failure while acquiring current system time.", msg);
609 let timeout = now.as_duration() + duration;
610 handle_errno!(MutexTimedLockError, from self,
611 errno_source unsafe { posix::pthread_mutex_timedlock(self.handle.handle.get(), &timeout.as_timespec()) }.into(),
612 success Errno::ESUCCES => Some(MutexGuard { handle: self.handle });
613 success Errno::ETIMEDOUT => None;
614 success Errno::EDEADLK => None,
615 Errno::EAGAIN => (MutexLockError(MutexLockError::ExceededMaximumNumberOfRecursiveLocks), "{} since the maximum number of recursive locks exceeded.", msg),
616 Errno::EINVAL => (TimeoutExceedsMaximumSupportedDuration, "{} since the timeout of {:?} exceeds the maximum supported duration.", msg, duration),
617 Errno::ENOTRECOVERABLE => (MutexLockError(MutexLockError::UnrecoverableState), "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
618 v => (MutexLockError(MutexLockError::UnknownError(v as i32)), "{} since unknown error occurred while acquiring the lock ({})", msg, v)
619 )
620 }
621 ClockType::Monotonic => {
622 let time = fail!(from self, when Time::now_with_clock(ClockType::Monotonic),
623 "{} due to a failure while acquiring current system time.", msg);
624 let mut adaptive_wait = fail!(from self, when AdaptiveWaitBuilder::new()
625 .clock_type(self.handle.clock_type())
626 .create(), "{} since the adaptive wait could not be created.", msg);
627
628 loop {
629 match self.try_lock() {
630 Ok(Some(v)) => return Ok(Some(v)),
631 Ok(None) => match fail!(from self, when time.elapsed(),
632 "{} due to a failure while acquiring elapsed system time.", msg)
633 < duration
634 {
635 true => {
636 fail!(from self, when adaptive_wait.wait(), "{} since AdaptiveWait failed.", msg);
637 }
638 false => return Ok(None),
639 },
640 Err(v) => {
641 fail!(from self, with MutexTimedLockError::MutexLockError(v),
642 "{} since timed lock failed for duration {:?}.", msg, duration);
643 }
644 }
645 }
646 }
647 }
648 }
649
650 pub fn make_consistent(&self) {
657 if unsafe { posix::pthread_mutex_consistent(self.handle.handle.get()) } != 0 {
658 warn!(from self, "pthread_mutex_consistent has no effect since either the mutex was not a robust mutex or the mutex was not in an inconsistent state.");
659 }
660 }
661}