gstd/sync/mutex.rs
1// This file is part of Gear.
2
3// Copyright (C) 2021-2025 Gear Technologies Inc.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use super::access::AccessQueue;
20use crate::{
21 BlockCount, BlockNumber, Config, MessageId, async_runtime,
22 errors::{Error, Result, UsageError},
23 exec, format, msg,
24};
25use core::{
26 cell::UnsafeCell,
27 future::Future,
28 ops::{Deref, DerefMut},
29 pin::Pin,
30 task::{Context, Poll},
31};
32
33static mut NEXT_MUTEX_ID: MutexId = MutexId::new();
34
35#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
36pub(crate) struct MutexId(u32);
37
38impl MutexId {
39 pub const fn new() -> Self {
40 MutexId(0)
41 }
42
43 pub fn next(self) -> Self {
44 Self(self.0.wrapping_add(1))
45 }
46}
47
48/// A mutual exclusion primitive useful for protecting shared data.
49///
50/// This mutex will block the execution waiting for the lock to become
51/// available. The mutex can be created via a [`new`](Mutex::new) constructor.
52/// Each mutex has a type parameter which represents the data that it is
53/// protecting. The data can only be accessed through the RAII guard
54/// [`MutexGuard`] returned from [`lock`](Mutex::lock),
55/// which guarantees that data access only occurs when the mutex is
56/// locked.
57///
58/// # Examples
59///
60/// This example (program A), after locking the mutex, sends the `PING` message
61/// to another program (program B) and waits for a reply. If any other program
62/// (program C) tries to invoke program A, it will wait until program A receives
63/// the `PONG` reply from program B and unlocks the mutex.
64///
65/// ```ignored
66/// use gstd::{msg, sync::Mutex, ActorId};
67///
68/// static mut DEST: ActorId = ActorId::zero();
69/// static MUTEX: Mutex<()> = Mutex::new(());
70///
71/// #[unsafe(no_mangle)]
72/// extern "C" fn init() {
73/// // `some_address` can be obtained from the init payload
74/// # let some_address = ActorId::zero();
75/// unsafe { DEST = some_address };
76/// }
77///
78/// #[gstd::async_main]
79/// async fn main() {
80/// let payload = msg::load_bytes().expect("Unable to load payload bytes");
81/// if payload == b"START" {
82/// let _unused = MUTEX.lock().await;
83///
84/// let reply = msg::send_bytes_for_reply(unsafe { DEST }, b"PING", 0, 0)
85/// .expect("Unable to send bytes")
86/// .await
87/// .expect("Error in async message processing");
88///
89/// if reply == b"PONG" {
90/// msg::reply(b"SUCCESS", 0).unwrap();
91/// } else {
92/// msg::reply(b"FAIL", 0).unwrap();
93/// }
94/// }
95/// }
96/// # fn main() {}
97/// ```
98pub struct Mutex<T> {
99 id: UnsafeCell<Option<MutexId>>,
100 locked: UnsafeCell<Option<(MessageId, BlockNumber)>>,
101 value: UnsafeCell<T>,
102 queue: AccessQueue,
103}
104
105impl<T> From<T> for Mutex<T> {
106 fn from(t: T) -> Self {
107 Mutex::new(t)
108 }
109}
110
111impl<T: Default> Default for Mutex<T> {
112 fn default() -> Self {
113 <T as Default>::default().into()
114 }
115}
116
117impl<T> Mutex<T> {
118 /// Create a new mutex in an unlocked state ready for use.
119 pub const fn new(t: T) -> Mutex<T> {
120 Mutex {
121 id: UnsafeCell::new(None),
122 value: UnsafeCell::new(t),
123 locked: UnsafeCell::new(None),
124 queue: AccessQueue::new(),
125 }
126 }
127
128 /// Acquire a mutex, protecting the subsequent code from execution by other
129 /// actors until the mutex hasn't been unlocked.
130 ///
131 /// This function will block access to the section of code by
132 /// other programs or users that invoke the same program. If another
133 /// actor reaches the code blocked by the mutex, it goes to the wait
134 /// state until the mutex unlocks. RAII guard wrapped in the future is
135 /// returned to allow scoped unlock of the lock. When the guard goes out
136 /// of scope, the mutex will be unlocked.
137 pub fn lock(&self) -> MutexLockFuture<'_, T> {
138 MutexLockFuture {
139 mutex_id: self.get_or_assign_id(),
140 mutex: self,
141 own_up_for: None,
142 }
143 }
144
145 // Returns a mutable reference to the mutex lock owner. The function uses unsafe
146 // code because it is called from the places where there is only non-mutable
147 // reference to the mutex exists, and the latter can't be turned into a
148 // mutable one as it will break logic around the `Mutex.lock` function which
149 // must be called on a non-mutable reference to the mutex.
150 #[allow(clippy::mut_from_ref)]
151 fn locked_by_mut(&self) -> &mut Option<(MessageId, BlockNumber)> {
152 unsafe { &mut *self.locked.get() }
153 }
154
155 fn get_or_assign_id(&self) -> MutexId {
156 let id = unsafe { &mut *self.id.get() };
157 *id.get_or_insert_with(|| unsafe {
158 let id = NEXT_MUTEX_ID;
159 NEXT_MUTEX_ID = NEXT_MUTEX_ID.next();
160 id
161 })
162 }
163}
164
165/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
166/// dropped (falls out of scope), the lock will be unlocked.
167///
168/// The data protected by the mutex is accessible through this guard via its
169/// [`Deref`] and [`DerefMut`] implementations.
170///
171/// This structure wrapped in the future is returned by the
172/// [`lock`](Mutex::lock) method on [`Mutex`].
173pub struct MutexGuard<'a, T> {
174 mutex: &'a Mutex<T>,
175 holder_msg_id: MessageId,
176}
177
178impl<T> MutexGuard<'_, T> {
179 #[track_caller]
180 fn ensure_access_by_holder(&self) {
181 let current_msg_id = msg::id();
182 if self.holder_msg_id != current_msg_id {
183 panic!(
184 "Mutex guard held by message 0x{} is being accessed by message 0x{}",
185 hex::encode(self.holder_msg_id),
186 hex::encode(current_msg_id)
187 );
188 }
189 }
190}
191
192impl<T> Drop for MutexGuard<'_, T> {
193 fn drop(&mut self) {
194 let is_holder_msg_signal_handler = match () {
195 #[cfg(not(feature = "ethexe"))]
196 () => msg::signal_from() == Ok(self.holder_msg_id),
197 #[cfg(feature = "ethexe")]
198 () => false,
199 };
200
201 if !is_holder_msg_signal_handler {
202 self.ensure_access_by_holder();
203 }
204
205 let locked_by = self.mutex.locked_by_mut();
206 let owner_msg_id = locked_by.map(|v| v.0);
207
208 if owner_msg_id != Some(self.holder_msg_id) && !is_holder_msg_signal_handler {
209 // If owner_msg_id is None or not equal to the holder_msg_id, firstly, it means
210 // we are in the message signal handler and, secondly, the lock was seized by
211 // some other message. In this case, the next rival message was
212 // awoken by the ousting mechanism in the MutexLockFuture::poll
213 panic!(
214 "Mutex guard held by message 0x{} does not match lock owner message {}",
215 hex::encode(self.holder_msg_id),
216 owner_msg_id.map_or("None".into(), |v| format!("0x{}", hex::encode(v)))
217 );
218 }
219
220 if owner_msg_id == Some(self.holder_msg_id) {
221 if let Some(message_id) = self.mutex.queue.dequeue() {
222 exec::wake(message_id).expect("Failed to wake the message");
223 }
224 *locked_by = None;
225 }
226 }
227}
228
229impl<'a, T> AsRef<T> for MutexGuard<'a, T> {
230 fn as_ref(&self) -> &'a T {
231 self.ensure_access_by_holder();
232 unsafe { &*self.mutex.value.get() }
233 }
234}
235
236impl<'a, T> AsMut<T> for MutexGuard<'a, T> {
237 fn as_mut(&mut self) -> &'a mut T {
238 self.ensure_access_by_holder();
239 unsafe { &mut *self.mutex.value.get() }
240 }
241}
242
243impl<T> Deref for MutexGuard<'_, T> {
244 type Target = T;
245
246 fn deref(&self) -> &T {
247 self.ensure_access_by_holder();
248 unsafe { &*self.mutex.value.get() }
249 }
250}
251
252impl<T> DerefMut for MutexGuard<'_, T> {
253 fn deref_mut(&mut self) -> &mut T {
254 self.ensure_access_by_holder();
255 unsafe { &mut *self.mutex.value.get() }
256 }
257}
258
259// we are always single-threaded
260unsafe impl<T> Sync for Mutex<T> {}
261
262/// The future returned by the [`lock`](Mutex::lock) method.
263///
264/// The output of the future is the [`MutexGuard`] that can be obtained by using
265/// `await` syntax.
266///
267/// # Examples
268///
269/// In the following example, variable types are annotated explicitly for
270/// demonstration purposes only. Usually, annotating them is unnecessary because
271/// they can be inferred automatically.
272///
273/// ```
274/// use gstd::sync::{Mutex, MutexGuard, MutexLockFuture};
275///
276/// #[gstd::async_main]
277/// async fn main() {
278/// let mutex: Mutex<i32> = Mutex::new(42);
279/// let future: MutexLockFuture<i32> = mutex.lock();
280/// let guard: MutexGuard<i32> = future.await;
281/// let value: i32 = *guard;
282/// assert_eq!(value, 42);
283/// }
284/// # fn main() {}
285/// ```
286pub struct MutexLockFuture<'a, T> {
287 mutex_id: MutexId,
288 mutex: &'a Mutex<T>,
289 // The maximum number of blocks the mutex lock can be owned.
290 // If the value is None, the default value taken from the `Config::mx_lock_duration` is used.
291 own_up_for: Option<BlockCount>,
292}
293
294impl<'a, T> MutexLockFuture<'a, T> {
295 /// Sets the maximum number of blocks the mutex lock can be owned by
296 /// some message before the ownership can be seized by another rival
297 pub fn own_up_for(self, block_count: BlockCount) -> Result<Self> {
298 if block_count == 0 {
299 Err(Error::Gstd(UsageError::ZeroMxLockDuration))
300 } else {
301 Ok(MutexLockFuture {
302 mutex_id: self.mutex_id,
303 mutex: self.mutex,
304 own_up_for: Some(block_count),
305 })
306 }
307 }
308
309 fn acquire_lock_ownership(
310 &mut self,
311 owner_msg_id: MessageId,
312 current_block: BlockNumber,
313 ) -> Poll<MutexGuard<'a, T>> {
314 let owner_deadline_block =
315 current_block.saturating_add(self.own_up_for.unwrap_or_else(Config::mx_lock_duration));
316 async_runtime::locks().remove_mx_lock_monitor(owner_msg_id, self.mutex_id);
317 if let Some(next_rival_msg_id) = self.mutex.queue.first() {
318 // Give the next rival message a chance to own the lock after this owner
319 // exceeds the lock ownership duration
320 async_runtime::locks().insert_mx_lock_monitor(
321 *next_rival_msg_id,
322 self.mutex_id,
323 owner_deadline_block,
324 );
325 }
326 let locked_by = self.mutex.locked_by_mut();
327 *locked_by = Some((owner_msg_id, owner_deadline_block));
328 Poll::Ready(MutexGuard {
329 mutex: self.mutex,
330 holder_msg_id: owner_msg_id,
331 })
332 }
333
334 fn queue_for_lock_ownership(
335 &mut self,
336 rival_msg_id: MessageId,
337 owner_deadline_block: Option<BlockNumber>,
338 ) -> Poll<MutexGuard<'a, T>> {
339 // If the message is already in the access queue, and we come here,
340 // it means the message has just been woken up from the waitlist.
341 // In that case we do not want to register yet another access attempt
342 // and just go back to the waitlist
343 if !self.mutex.queue.contains(&rival_msg_id) {
344 self.mutex.queue.enqueue(rival_msg_id);
345 if let Some(owner_deadline_block) = owner_deadline_block {
346 // Lock owner did not know about this message when it was getting into
347 // lock ownership. We have to take care of ourselves and give us a chance
348 // to oust the lock owner when the lock ownership duration expires
349 if self.mutex.queue.len() == 1 {
350 async_runtime::locks().insert_mx_lock_monitor(
351 rival_msg_id,
352 self.mutex_id,
353 owner_deadline_block,
354 );
355 }
356 }
357 }
358 Poll::Pending
359 }
360}
361
362impl<'a, T> Future for MutexLockFuture<'a, T> {
363 type Output = MutexGuard<'a, T>;
364
365 // In case of locked mutex and an `.await`, function `poll` checks if the
366 // mutex can be taken, else it waits (goes into *waiting queue*).
367 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
368 let current_msg_id = msg::id();
369 let current_block = exec::block_height();
370 let locked_by = self.mutex.locked_by_mut();
371
372 if locked_by.is_none() {
373 return self
374 .get_mut()
375 .acquire_lock_ownership(current_msg_id, current_block);
376 }
377
378 let (lock_owner_msg_id, deadline_block) =
379 (*locked_by).unwrap_or_else(|| unreachable!("Checked above"));
380
381 if current_block < deadline_block {
382 return self
383 .get_mut()
384 .queue_for_lock_ownership(current_msg_id, Some(deadline_block));
385 }
386
387 if let Some(msg_future_task) = async_runtime::futures().get_mut(&lock_owner_msg_id) {
388 msg_future_task.set_lock_exceeded();
389 exec::wake(lock_owner_msg_id).expect("Failed to wake the message");
390 }
391
392 while let Some(next_msg_id) = self.mutex.queue.dequeue() {
393 if next_msg_id == lock_owner_msg_id {
394 continue;
395 }
396 if next_msg_id == current_msg_id {
397 break;
398 }
399 exec::wake(next_msg_id).expect("Failed to wake the message");
400 *locked_by = None;
401 // We have just woken up the next lock owner, but we don't know its ownership
402 // duration, thus we pass None as owner_deadline_block. The woken up message
403 // will give us a chance to own the lock itself by registering a
404 // lock monitor for us
405 return self
406 .get_mut()
407 .queue_for_lock_ownership(current_msg_id, None);
408 }
409
410 self.get_mut()
411 .acquire_lock_ownership(current_msg_id, current_block)
412 }
413}