1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#[cfg(any(test, feature = "coop"))]
mod coop_impl {
use crate::coop::Operation;
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;
/// A wrapper around tokio's Mutex that integrates with Bach's coop system.
///
/// This mutex implementation ensures proper interleaving simulation with Bach's
/// cooperative scheduling system.
///
/// The asynchronous acquisition methods (`lock`, `lock_owned`) first await
/// `lock_op` and only then await the underlying tokio mutex. The non-blocking
/// `try_lock` / `try_lock_owned` methods go straight to the tokio mutex.
pub struct Mutex<T: ?Sized> {
// Coop operation registered in `new` and awaited before each async lock.
lock_op: Operation,
// Store the inner mutex in an Arc to enable the `lock_owned` / `try_lock_owned`
// functionality: tokio's owned-lock methods take `Arc<Mutex<T>>`, not `&Mutex<T>`,
// so a clone of this Arc is handed to them.
inner: Arc<TokioMutex<T>>,
}
impl<T: ?Sized> Mutex<T> {
    /// Creates a new mutex with the given value.
    ///
    /// The value is placed in a fresh tokio mutex and a coop operation is
    /// registered for this mutex's lock acquisitions.
    pub fn new(value: T) -> Self
    where
        T: Sized,
    {
        Self {
            inner: Arc::new(TokioMutex::new(value)),
            lock_op: Operation::register(),
        }
    }

    /// Acquires the mutex, waiting until it becomes available.
    ///
    /// The lock operation is first acquired through Bach's coop system and
    /// only then is the underlying tokio mutex awaited, so the coop scheduler
    /// gets to order competing lockers during simulation. The whole
    /// acquisition runs inside a `mutex::lock` debug span.
    pub async fn lock(&self) -> MutexGuard<'_, T> {
        use crate::tracing::Instrument;

        let span = crate::tracing::debug_span!("mutex::lock");
        async {
            // First acquire the operation through the coop system
            self.lock_op.acquire().await;
            // Then acquire the actual lock
            let guard = self.inner.lock().await;
            MutexGuard { guard }
        }
        .instrument(span)
        .await
    }

    /// Attempts to acquire the lock without waiting.
    ///
    /// This is a synchronous call, so it does not go through the coop
    /// operation; it only queries the underlying tokio mutex.
    ///
    /// # Errors
    ///
    /// Returns the `TryLockError` produced by tokio's `try_lock` unchanged
    /// when the lock cannot be taken immediately.
    pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, tokio::sync::TryLockError> {
        self.inner.try_lock().map(|guard| MutexGuard { guard })
    }

    /// Acquires the mutex and returns an owned guard.
    ///
    /// Unlike [`Mutex::lock`], the returned guard carries no borrowed
    /// lifetime: it keeps the underlying mutex alive through a clone of the
    /// inner `Arc`. As with `lock`, the coop operation is acquired before the
    /// tokio mutex, and the acquisition runs inside a `mutex::lock_owned`
    /// debug span.
    pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T>
    where
        T: Sized,
    {
        use crate::tracing::Instrument;

        let span = crate::tracing::debug_span!("mutex::lock_owned");
        async {
            // First acquire the operation through the coop system
            self.lock_op.acquire().await;
            // Use tokio's lock_owned method with our already Arc-wrapped inner mutex
            let guard = self.inner.clone().lock_owned().await;
            OwnedMutexGuard { guard }
        }
        .instrument(span)
        .await
    }

    /// Attempts to acquire the lock in an owned fashion without waiting.
    ///
    /// Like [`Mutex::try_lock`], this bypasses the coop operation and only
    /// queries the underlying tokio mutex.
    ///
    /// # Errors
    ///
    /// Returns the `TryLockError` produced by tokio's `try_lock_owned`
    /// unchanged when the lock cannot be taken immediately.
    pub fn try_lock_owned(
        self: Arc<Self>,
    ) -> Result<OwnedMutexGuard<T>, tokio::sync::TryLockError>
    where
        T: Sized,
    {
        self.inner
            .clone()
            .try_lock_owned()
            .map(|guard| OwnedMutexGuard { guard })
    }
}
/// A guard that releases the mutex when dropped.
///
/// This is a thin wrapper around tokio's `MutexGuard`. It is marked
/// `#[must_use]` and forwards `Debug` / `Display` to the wrapped guard so
/// that it can be used the same way as the tokio guard that is re-exported
/// when the coop wrapper is not compiled in.
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct MutexGuard<'a, T: ?Sized> {
    guard: tokio::sync::MutexGuard<'a, T>,
}

/// Debug-formats through the wrapped tokio guard.
impl<T: ?Sized + std::fmt::Debug> std::fmt::Debug for MutexGuard<'_, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(&self.guard, f)
    }
}

/// Display-formats through the wrapped tokio guard.
impl<T: ?Sized + std::fmt::Display> std::fmt::Display for MutexGuard<'_, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.guard, f)
    }
}
/// Shared access to the protected value while the guard is held.
impl<T: ?Sized> std::ops::Deref for MutexGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        // Forward to the wrapped tokio guard's own `Deref` implementation.
        std::ops::Deref::deref(&self.guard)
    }
}
/// Exclusive access to the protected value while the guard is held.
impl<T: ?Sized> std::ops::DerefMut for MutexGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        // Forward to the wrapped tokio guard's own `DerefMut` implementation.
        std::ops::DerefMut::deref_mut(&mut self.guard)
    }
}
/// An owned guard that releases the mutex when dropped.
///
/// This is a thin wrapper around tokio's `OwnedMutexGuard` and has no
/// borrowed lifetime. It is marked `#[must_use]` and forwards `Debug` /
/// `Display` to the wrapped guard so that it can be used the same way as
/// tokio's own owned guard.
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct OwnedMutexGuard<T: ?Sized> {
    guard: tokio::sync::OwnedMutexGuard<T>,
}

/// Debug-formats through the wrapped tokio guard.
impl<T: ?Sized + std::fmt::Debug> std::fmt::Debug for OwnedMutexGuard<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(&self.guard, f)
    }
}

/// Display-formats through the wrapped tokio guard.
impl<T: ?Sized + std::fmt::Display> std::fmt::Display for OwnedMutexGuard<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.guard, f)
    }
}
/// Shared access to the protected value while the owned guard is held.
impl<T: ?Sized> std::ops::Deref for OwnedMutexGuard<T> {
    type Target = T;

    fn deref(&self) -> &T {
        // Forward to the wrapped tokio guard's own `Deref` implementation.
        std::ops::Deref::deref(&self.guard)
    }
}
/// Exclusive access to the protected value while the owned guard is held.
impl<T: ?Sized> std::ops::DerefMut for OwnedMutexGuard<T> {
    fn deref_mut(&mut self) -> &mut T {
        // Forward to the wrapped tokio guard's own `DerefMut` implementation.
        std::ops::DerefMut::deref_mut(&mut self.guard)
    }
}
}
// In test builds, or when the `coop` feature is enabled, export the wrapped implementation.
#[cfg(any(test, feature = "coop"))]
pub use coop_impl::*;
// Otherwise, re-export tokio's mutex types directly. `OwnedMutexGuard` is included so
// that both configurations expose the same set of names (`Mutex`, `MutexGuard`,
// `OwnedMutexGuard`).
#[cfg(not(any(test, feature = "coop")))]
pub use tokio::sync::{Mutex, MutexGuard, OwnedMutexGuard};