1pub use crate::ipc_capable::{Handle, IpcCapable};
48
49use crate::ipc_capable::internal::{Capability, HandleStorage, IpcConstructible};
50use core::cell::UnsafeCell;
51use core::fmt::Debug;
52use core::ops::{Deref, DerefMut};
53use core::time::Duration;
54use iceoryx2_bb_elementary::scope_guard::*;
55use iceoryx2_bb_log::{fail, fatal_panic, warn};
56use iceoryx2_pal_posix::*;
57
58use crate::adaptive_wait::*;
59use crate::clock::{AsTimespec, ClockType, NanosleepError, Time, TimeError};
60use crate::handle_errno;
61use iceoryx2_pal_posix::posix::errno::Errno;
62use iceoryx2_pal_posix::posix::Struct;
63
/// Failures that can be reported while a mutex is created with [`MutexBuilder::create()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MutexCreationError {
    /// The mutex attributes could not be initialized or `pthread_mutex_init` reported `ENOMEM`.
    InsufficientMemory,
    /// `pthread_mutex_init` reported `EAGAIN`.
    InsufficientResources,
    /// `pthread_mutex_init` reported `EPERM`.
    InsufficientPermissions,
    /// The process-shared flag could not be set in the mutex attributes.
    NoInterProcessSupport,
    /// The mutex type could not be set in the mutex attributes.
    UnableToSetType,
    /// The mutex protocol could not be set in the mutex attributes.
    UnableToSetProtocol,
    /// The robustness setting could not be set in the mutex attributes.
    UnableToSetThreadTerminationBehavior,
    /// `pthread_mutex_init` reported an error that is not covered above; contains the raw
    /// error code.
    UnknownError(i32),
}
75
76#[derive(Debug, PartialEq, Eq)]
77pub enum MutexLockError<'mutex, 'handle, T: Sized + Debug> {
78 ExceededMaximumNumberOfRecursiveLocks,
79 DeadlockDetected,
80 LockAcquiredButOwnerDied(MutexGuard<'mutex, 'handle, T>),
81 UnrecoverableState,
82 UnknownError(i32),
83}
84
85impl<T: Sized + Debug> PartialEq for MutexGuard<'_, '_, T> {
86 fn eq(&self, other: &Self) -> bool {
87 core::ptr::eq(self.mutex, other.mutex)
88 }
89}
90
91impl<T: Sized + Debug> Eq for MutexGuard<'_, '_, T> {}
92
93#[derive(Debug, PartialEq, Eq)]
94pub enum MutexTimedLockError<'mutex, 'handle, T: Sized + Debug> {
95 TimeoutExceedsMaximumSupportedDuration,
96 MutexLockError(MutexLockError<'mutex, 'handle, T>),
97 NanosleepError(NanosleepError),
98 AdaptiveWaitError(AdaptiveWaitError),
99 FailureInInternalClockWhileWait(TimeError),
100}
101
102impl<T: Debug> From<TimeError> for MutexTimedLockError<'_, '_, T> {
103 fn from(v: TimeError) -> Self {
104 MutexTimedLockError::FailureInInternalClockWhileWait(v)
105 }
106}
107
108impl<T: Debug> From<NanosleepError> for MutexTimedLockError<'_, '_, T> {
109 fn from(v: NanosleepError) -> Self {
110 MutexTimedLockError::NanosleepError(v)
111 }
112}
113
114impl<T: Debug> From<AdaptiveWaitError> for MutexTimedLockError<'_, '_, T> {
115 fn from(v: AdaptiveWaitError) -> Self {
116 MutexTimedLockError::AdaptiveWaitError(v)
117 }
118}
119
/// Failures that can be reported when a mutex is unlocked.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MutexUnlockError {
    /// `pthread_mutex_unlock` reported `EPERM`.
    OwnedByDifferentEntity,
    /// Any other error code; contains the raw value.
    UnknownError(i32),
}
125
/// Coarse-grained error that only states which kind of mutex operation failed. The detailed
/// creation, lock and unlock errors can be converted into it via `From`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MutexError {
    /// Creating the mutex failed.
    CreationFailed,
    /// A lock or timed-lock operation failed.
    LockFailed,
    /// Unlocking the mutex failed.
    UnlockFailed,
}
136
137impl<'mutex, 'handle, T: Debug> From<MutexLockError<'mutex, 'handle, T>> for MutexError {
138 fn from(_: MutexLockError<'mutex, 'handle, T>) -> Self {
139 MutexError::LockFailed
140 }
141}
142
143impl<'mutex, 'handle, T: Debug> From<MutexTimedLockError<'mutex, 'handle, T>> for MutexError {
144 fn from(_: MutexTimedLockError<'mutex, 'handle, T>) -> Self {
145 MutexError::LockFailed
146 }
147}
148
149impl From<MutexUnlockError> for MutexError {
150 fn from(_: MutexUnlockError) -> Self {
151 MutexError::UnlockFailed
152 }
153}
154
155impl From<MutexCreationError> for MutexError {
156 fn from(_: MutexCreationError) -> Self {
157 MutexError::CreationFailed
158 }
159}
160
161#[derive(Debug)]
180pub struct MutexGuard<'mutex, 'handle, T: Debug> {
181 mutex: &'mutex Mutex<'handle, T>,
182}
183
184unsafe impl<T: Send + Debug> Send for MutexGuard<'_, '_, T> {}
185unsafe impl<T: Send + Sync + Debug> Sync for MutexGuard<'_, '_, T> {}
186
187impl<T: Debug> Deref for MutexGuard<'_, '_, T> {
188 type Target = T;
189
190 fn deref(&self) -> &Self::Target {
191 unsafe { (*self.mutex.handle.value.get()).as_ref().unwrap() }
192 }
193}
194
195impl<T: Debug> DerefMut for MutexGuard<'_, '_, T> {
196 fn deref_mut(&mut self) -> &mut Self::Target {
197 unsafe { (*self.mutex.handle.value.get()).as_mut().unwrap() }
198 }
199}
200
201impl<T: Debug> Drop for MutexGuard<'_, '_, T> {
202 fn drop(&mut self) {
203 if self.mutex.release().is_err() {
204 fatal_panic!(from self.mutex, "This should never happen! The MutexGuard is unable to release the mutex.");
205 }
206 }
207}
208
209#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
211#[repr(i32)]
212pub enum MutexType {
213 Normal = posix::PTHREAD_MUTEX_NORMAL,
215 Recursive = posix::PTHREAD_MUTEX_RECURSIVE,
217 WithDeadlockDetection = posix::PTHREAD_MUTEX_ERRORCHECK,
220}
221
222#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
224#[repr(i32)]
225pub enum MutexThreadTerminationBehavior {
226 StallWhenLocked = posix::PTHREAD_MUTEX_STALLED,
229
230 ReleaseWhenLocked = posix::PTHREAD_MUTEX_ROBUST,
243}
244
245#[derive(Debug)]
247pub struct MutexBuilder {
248 pub(crate) is_interprocess_capable: bool,
249 pub(crate) mutex_type: MutexType,
250 pub(crate) thread_termination_behavior: MutexThreadTerminationBehavior,
251 pub(crate) clock_type: ClockType,
252}
253
254impl Default for MutexBuilder {
255 fn default() -> Self {
256 Self {
257 is_interprocess_capable: true,
258 mutex_type: MutexType::Normal,
259 thread_termination_behavior: MutexThreadTerminationBehavior::StallWhenLocked,
260 clock_type: ClockType::default(),
261 }
262 }
263}
264
265impl MutexBuilder {
266 pub fn new() -> Self {
267 Self::default()
268 }
269
270 pub fn clock_type(mut self, clock_type: ClockType) -> Self {
272 self.clock_type = clock_type;
273 self
274 }
275
276 pub fn is_interprocess_capable(mut self, value: bool) -> Self {
278 self.is_interprocess_capable = value;
279 self
280 }
281
282 pub fn mutex_type(mut self, value: MutexType) -> Self {
284 self.mutex_type = value;
285 self
286 }
287
288 pub fn thread_termination_behavior(mut self, value: MutexThreadTerminationBehavior) -> Self {
290 self.thread_termination_behavior = value;
291 self
292 }
293
294 fn initialize_mutex<T: Debug>(
295 &self,
296 mutex: *mut posix::pthread_mutex_t,
297 ) -> Result<Capability, MutexCreationError> {
298 let msg = "Unable to create mutex";
299
300 let mut mutex_attributes = ScopeGuardBuilder::new(posix::pthread_mutexattr_t::new())
301 .on_init(
302 |attr| match unsafe { posix::pthread_mutexattr_init(attr) } {
303 0 => Ok(()),
304 _ => {
305 fail!(from self, with MutexCreationError::InsufficientMemory,
306 "{} since the mutex attribute initialization failed.", msg);
307 }
308 },
309 )
310 .on_drop(
311 |attr| match unsafe { posix::pthread_mutexattr_destroy(attr) } {
312 0 => (),
313 _ => {
314 fatal_panic!(
315 "Mutex<{}>, failed to destroy mutex attributes - possible leak?",
316 core::any::type_name::<T>()
317 );
318 }
319 },
320 )
321 .create()?;
322
323 if self.is_interprocess_capable
324 && unsafe {
325 posix::pthread_mutexattr_setpshared(
326 mutex_attributes.get_mut(),
327 posix::PTHREAD_PROCESS_SHARED,
328 )
329 } != 0
330 {
331 fail!(from self, with MutexCreationError::NoInterProcessSupport,
332 "{} due to a failure while setting the inter process flag in mutex attributes.", msg);
333 }
334
335 if unsafe {
336 posix::pthread_mutexattr_settype(mutex_attributes.get_mut(), self.mutex_type as i32)
337 } != 0
338 {
339 fail!(from self, with MutexCreationError::UnableToSetType,
340 "{} due to a failure while setting the mutex type in mutex attributes.", msg);
341 }
342
343 if unsafe {
344 posix::pthread_mutexattr_setprotocol(
345 mutex_attributes.get_mut(),
346 posix::PTHREAD_PRIO_NONE,
347 )
348 } != 0
349 {
350 fail!(from self, with MutexCreationError::UnableToSetProtocol,
351 "{} due to a failure while setting the mutex protocol in mutex attributes.", msg);
352 }
353
354 if unsafe {
355 posix::pthread_mutexattr_setrobust(
356 mutex_attributes.get_mut(),
357 self.thread_termination_behavior as i32,
358 )
359 } != 0
360 {
361 fail!(from self, with MutexCreationError::UnableToSetThreadTerminationBehavior,
362 "{} due to a failure while setting the mutex thread termination behavior in mutex attributes.", msg);
363 }
364
365 match unsafe { posix::pthread_mutex_init(mutex, mutex_attributes.get()) }.into() {
366 Errno::ESUCCES => (),
367 Errno::ENOMEM => {
368 fail!(from self, with MutexCreationError::InsufficientMemory, "{} due to insufficient memory.", msg);
369 }
370 Errno::EAGAIN => {
371 fail!(from self, with MutexCreationError::InsufficientResources,
372 "{} due to insufficient resources.",
373 msg);
374 }
375 Errno::EPERM => {
376 fail!(from self, with MutexCreationError::InsufficientPermissions,
377 "{} due to insufficient permissions.", msg
378 );
379 }
380 v => {
381 fail!(from self, with MutexCreationError::UnknownError(v as i32),
382 "{} since an unknown error occurred ({})", msg, v);
383 }
384 };
385
386 match self.is_interprocess_capable {
387 true => Ok(Capability::InterProcess),
388 false => Ok(Capability::ProcessLocal),
389 }
390 }
391
392 pub fn create<T: Debug>(
394 self,
395 t: T,
396 handle: &MutexHandle<T>,
397 ) -> Result<Mutex<T>, MutexCreationError> {
398 unsafe {
399 handle
400 .handle
401 .initialize(|mtx| self.initialize_mutex::<T>(mtx))?
402 };
403
404 unsafe { *handle.clock_type.get() = self.clock_type };
405 unsafe { *handle.value.get() = Some(t) };
406
407 Ok(Mutex::new(handle))
408 }
409}
410
411#[derive(Debug)]
412pub struct MutexHandle<T: Sized + Debug> {
413 pub(crate) handle: HandleStorage<posix::pthread_mutex_t>,
414 clock_type: UnsafeCell<ClockType>,
415 value: UnsafeCell<Option<T>>,
416}
417
418unsafe impl<T: Sized + Debug> Send for MutexHandle<T> {}
419unsafe impl<T: Sized + Debug> Sync for MutexHandle<T> {}
420
421impl<T: Sized + Debug> Drop for MutexHandle<T> {
422 fn drop(&mut self) {
423 if self.handle.is_initialized() {
424 unsafe {
425 self.handle.cleanup(|mtx| {
426 if posix::pthread_mutex_destroy(mtx) != 0 {
427 warn!(from self,
428 "Unable to destroy mutex. Was it already destroyed by another instance in another process?");
429 }
430 })
431 };
432 }
433 }
434}
435
436impl<T: Sized + Debug> Handle for MutexHandle<T> {
437 fn new() -> Self {
438 Self {
439 handle: HandleStorage::new(posix::pthread_mutex_t::new()),
440 clock_type: UnsafeCell::new(ClockType::default()),
441 value: UnsafeCell::new(None),
442 }
443 }
444
445 fn is_initialized(&self) -> bool {
446 self.handle.is_initialized()
447 }
448
449 fn is_inter_process_capable(&self) -> bool {
450 self.handle.is_inter_process_capable()
451 }
452}
453
454impl<T: Sized + Debug> MutexHandle<T> {
455 fn clock_type(&self) -> ClockType {
456 unsafe { *self.clock_type.get() }
457 }
458}
459
460#[derive(Debug)]
490pub struct Mutex<'a, T: Sized + Debug> {
491 pub(crate) handle: &'a MutexHandle<T>,
492}
493
494unsafe impl<T: Sized + Send + Debug> Send for Mutex<'_, T> {}
495unsafe impl<T: Sized + Send + Debug> Sync for Mutex<'_, T> {}
496
497impl<'a, T: Debug> IpcConstructible<'a, MutexHandle<T>> for Mutex<'a, T> {
498 fn new(handle: &'a MutexHandle<T>) -> Self {
499 Self { handle }
500 }
501}
502
503impl<'a, T: Debug> IpcCapable<'a, MutexHandle<T>> for Mutex<'a, T> {
504 fn is_interprocess_capable(&self) -> bool {
505 self.handle.is_inter_process_capable()
506 }
507}
508
509impl<T: Debug> Mutex<'_, T> {
510 pub fn lock(&self) -> Result<MutexGuard<'_, '_, T>, MutexLockError<'_, '_, T>> {
519 let msg = "Failed to lock";
520 handle_errno!(MutexLockError, from self,
521 errno_source unsafe { posix::pthread_mutex_lock(self.handle.handle.get()) }.into(),
522 success Errno::ESUCCES => MutexGuard { mutex: self },
523 Errno::EAGAIN => (ExceededMaximumNumberOfRecursiveLocks, "{} since the maximum number of recursive locks exceeded.", msg),
524 Errno::EDEADLK => (DeadlockDetected, "{} since the operation would lead to a deadlock.", msg),
525 Errno::EOWNERDEAD => (LockAcquiredButOwnerDied(MutexGuard { mutex: self }), "{} since the thread/process holding the mutex died.", msg),
526 Errno::ENOTRECOVERABLE => (UnrecoverableState, "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
527 v => (UnknownError(v as i32), "{} since an unknown error occurred while acquiring the lock ({})", msg, v)
528 );
529 }
530
531 pub fn try_lock(&self) -> Result<Option<MutexGuard<'_, '_, T>>, MutexLockError<'_, '_, T>> {
541 let msg = "Try lock failed";
542 handle_errno!(MutexLockError, from self,
543 errno_source unsafe { posix::pthread_mutex_trylock(self.handle.handle.get()) }.into(),
544 success Errno::ESUCCES => Some(MutexGuard { mutex: self });
545 success Errno::EDEADLK => None;
546 success Errno::EBUSY => None,
547 Errno::EAGAIN => (ExceededMaximumNumberOfRecursiveLocks, "{} since the maximum number of recursive locks exceeded.", msg),
548 Errno::EOWNERDEAD => (LockAcquiredButOwnerDied(MutexGuard { mutex: self }), "{} since the thread/process holding the mutex dies.", msg),
549 Errno::ENOTRECOVERABLE => (UnrecoverableState, "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
550 v => (UnknownError(v as i32), "{} since unknown error occurred while acquiring the lock ({})", msg, v)
551 );
552 }
553
554 pub fn timed_lock(
564 &self,
565 duration: Duration,
566 ) -> Result<Option<MutexGuard<'_, '_, T>>, MutexTimedLockError<'_, '_, T>> {
567 let msg = "Timed lock failed";
568
569 match self.handle.clock_type() {
570 ClockType::Realtime => {
571 let now = fail!(from self, when Time::now_with_clock(ClockType::Realtime),
572 "{} due to a failure while acquiring current system time.", msg);
573 let timeout = now.as_duration() + duration;
574 handle_errno!(MutexTimedLockError, from self,
575 errno_source unsafe { posix::pthread_mutex_timedlock(self.handle.handle.get(), &timeout.as_timespec()) }.into(),
576 success Errno::ESUCCES => Some(MutexGuard { mutex: self });
577 success Errno::ETIMEDOUT => None;
578 success Errno::EDEADLK => None,
579 Errno::EAGAIN => (MutexLockError(MutexLockError::ExceededMaximumNumberOfRecursiveLocks), "{} since the maximum number of recursive locks exceeded.", msg),
580 Errno::EINVAL => (TimeoutExceedsMaximumSupportedDuration, "{} since the timeout of {:?} exceeds the maximum supported duration.", msg, duration),
581 Errno::ENOTRECOVERABLE => (MutexLockError(MutexLockError::UnrecoverableState), "{} since the thread/process holding the mutex died and the next owner did not repair the state with Mutex::make_consistent.", msg),
582 v => (MutexLockError(MutexLockError::UnknownError(v as i32)), "{} since unknown error occurred while acquiring the lock ({})", msg, v)
583 )
584 }
585 ClockType::Monotonic => {
586 let time = fail!(from self, when Time::now_with_clock(ClockType::Monotonic),
587 "{} due to a failure while acquiring current system time.", msg);
588 let mut adaptive_wait = fail!(from self, when AdaptiveWaitBuilder::new()
589 .clock_type(self.handle.clock_type())
590 .create(), "{} since the adaptive wait could not be created.", msg);
591
592 loop {
593 match self.try_lock() {
594 Ok(Some(v)) => return Ok(Some(v)),
595 Ok(None) => match fail!(from self, when time.elapsed(),
596 "{} due to a failure while acquiring elapsed system time.", msg)
597 < duration
598 {
599 true => {
600 fail!(from self, when adaptive_wait.wait(), "{} since AdaptiveWait failed.", msg);
601 }
602 false => return Ok(None),
603 },
604 Err(v) => {
605 fail!(from self, with MutexTimedLockError::MutexLockError(v),
606 "{} since timed lock failed for duration {:?}.", msg, duration);
607 }
608 }
609 }
610 }
611 }
612 }
613
614 pub fn make_consistent(&self) {
621 if unsafe { posix::pthread_mutex_consistent(self.handle.handle.get()) } != 0 {
622 warn!(from self, "pthread_mutex_consistent has no effect since either the mutex was not a robust mutex or the mutex was not in an inconsistent state.");
623 }
624 }
625
626 pub(crate) fn release(&self) -> Result<(), MutexUnlockError> {
627 let msg = "Unable to release lock";
628 handle_errno!(MutexUnlockError, from self,
629 errno_source unsafe { posix::pthread_mutex_unlock(self.handle.handle.get()) }.into(),
630 success Errno::ESUCCES => (),
631 Errno::EPERM => (OwnedByDifferentEntity, "{} since the current thread/process does not own the lock", msg),
632 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
633 );
634 }
635}