gstd/sync/
mutex.rs

1// This file is part of Gear.
2
3// Copyright (C) 2021-2025 Gear Technologies Inc.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use super::access::AccessQueue;
20use crate::{
21    BlockCount, BlockNumber, Config, MessageId, async_runtime,
22    errors::{Error, Result, UsageError},
23    exec, format, msg,
24};
25use core::{
26    cell::UnsafeCell,
27    future::Future,
28    ops::{Deref, DerefMut},
29    pin::Pin,
30    task::{Context, Poll},
31};
32
/// Identifier that will be handed out to the next mutex requesting one.
static mut NEXT_MUTEX_ID: MutexId = MutexId::new();

/// Numeric identifier of a mutex, backed by a `u32` counter value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct MutexId(u32);

impl MutexId {
    /// Returns the initial identifier, `0`.
    pub const fn new() -> Self {
        Self(0)
    }

    /// Returns the identifier that follows `self`, wrapping around to `0`
    /// after `u32::MAX`.
    pub fn next(self) -> Self {
        let Self(raw) = self;
        Self(raw.wrapping_add(1))
    }
}
47
/// A mutual exclusion primitive useful for protecting shared data.
///
/// This mutex will block the execution waiting for the lock to become
/// available. The mutex can be created via a [`new`](Mutex::new) constructor.
/// Each mutex has a type parameter which represents the data that it is
/// protecting. The data can only be accessed through the RAII guard
/// [`MutexGuard`] returned from [`lock`](Mutex::lock),
/// which guarantees that data access only occurs when the mutex is
/// locked.
///
/// # Examples
///
/// This example (program A), after locking the mutex, sends the `PING` message
/// to another program (program B) and waits for a reply. If any other program
/// (program C) tries to invoke program A, it will wait until program A receives
/// the `PONG` reply from program B and unlocks the mutex.
///
/// ```ignore
/// use gstd::{msg, sync::Mutex, ActorId};
///
/// static mut DEST: ActorId = ActorId::zero();
/// static MUTEX: Mutex<()> = Mutex::new(());
///
/// #[unsafe(no_mangle)]
/// extern "C" fn init() {
///     // `some_address` can be obtained from the init payload
///     # let some_address = ActorId::zero();
///     unsafe { DEST = some_address };
/// }
///
/// #[gstd::async_main]
/// async fn main() {
///     let payload = msg::load_bytes().expect("Unable to load payload bytes");
///     if payload == b"START" {
///         let _unused = MUTEX.lock().await;
///
///         let reply = msg::send_bytes_for_reply(unsafe { DEST }, b"PING", 0, 0)
///             .expect("Unable to send bytes")
///             .await
///             .expect("Error in async message processing");
///
///         if reply == b"PONG" {
///             msg::reply(b"SUCCESS", 0).unwrap();
///         } else {
///             msg::reply(b"FAIL", 0).unwrap();
///         }
///     }
/// }
/// # fn main() {}
/// ```
pub struct Mutex<T> {
    // Identifier of this mutex. It stays `None` until the first call to `lock`,
    // which assigns the next free `MutexId`; the id is then passed to the lock
    // monitor calls made while acquiring or queueing for the lock.
    id: UnsafeCell<Option<MutexId>>,
    // Current lock owner: the owning message together with the block number at
    // which its ownership deadline expires. `None` means the mutex is unlocked.
    locked: UnsafeCell<Option<(MessageId, BlockNumber)>>,
    // The data protected by the mutex.
    value: UnsafeCell<T>,
    // Messages waiting for the lock, in the order they will be dequeued.
    queue: AccessQueue,
}
104
105impl<T> From<T> for Mutex<T> {
106    fn from(t: T) -> Self {
107        Mutex::new(t)
108    }
109}
110
111impl<T: Default> Default for Mutex<T> {
112    fn default() -> Self {
113        <T as Default>::default().into()
114    }
115}
116
117impl<T> Mutex<T> {
118    /// Create a new mutex in an unlocked state ready for use.
119    pub const fn new(t: T) -> Mutex<T> {
120        Mutex {
121            id: UnsafeCell::new(None),
122            value: UnsafeCell::new(t),
123            locked: UnsafeCell::new(None),
124            queue: AccessQueue::new(),
125        }
126    }
127
128    /// Acquire a mutex, protecting the subsequent code from execution by other
129    /// actors until the mutex hasn't been unlocked.
130    ///
131    /// This function will block access to the section of code by
132    /// other programs or users that invoke the same program. If another
133    /// actor reaches the code blocked by the mutex, it goes to the wait
134    /// state until the mutex unlocks. RAII guard wrapped in the future is
135    /// returned to allow scoped unlock of the lock. When the guard goes out
136    /// of scope, the mutex will be unlocked.
137    pub fn lock(&self) -> MutexLockFuture<'_, T> {
138        MutexLockFuture {
139            mutex_id: self.get_or_assign_id(),
140            mutex: self,
141            own_up_for: None,
142        }
143    }
144
145    // Returns a mutable reference to the mutex lock owner. The function uses unsafe
146    // code because it is called from the places where there is only non-mutable
147    // reference to the mutex exists, and the latter can't be turned into a
148    // mutable one as it will break logic around the `Mutex.lock` function which
149    // must be called on a non-mutable reference to the mutex.
150    #[allow(clippy::mut_from_ref)]
151    fn locked_by_mut(&self) -> &mut Option<(MessageId, BlockNumber)> {
152        unsafe { &mut *self.locked.get() }
153    }
154
155    fn get_or_assign_id(&self) -> MutexId {
156        let id = unsafe { &mut *self.id.get() };
157        *id.get_or_insert_with(|| unsafe {
158            let id = NEXT_MUTEX_ID;
159            NEXT_MUTEX_ID = NEXT_MUTEX_ID.next();
160            id
161        })
162    }
163}
164
/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
///
/// The data protected by the mutex is accessible through this guard via its
/// [`Deref`] and [`DerefMut`] implementations. Every access checks that the
/// currently executing message is the one that obtained the guard and panics
/// otherwise.
///
/// This structure wrapped in the future is returned by the
/// [`lock`](Mutex::lock) method on [`Mutex`].
pub struct MutexGuard<'a, T> {
    // The mutex this guard keeps locked.
    mutex: &'a Mutex<T>,
    // The message that acquired the lock; only this message may use the guard.
    holder_msg_id: MessageId,
}
177
178impl<T> MutexGuard<'_, T> {
179    #[track_caller]
180    fn ensure_access_by_holder(&self) {
181        let current_msg_id = msg::id();
182        if self.holder_msg_id != current_msg_id {
183            panic!(
184                "Mutex guard held by message 0x{} is being accessed by message 0x{}",
185                hex::encode(self.holder_msg_id),
186                hex::encode(current_msg_id)
187            );
188        }
189    }
190}
191
impl<T> Drop for MutexGuard<'_, T> {
    /// Releases the lock if this guard's holder still owns it and wakes the
    /// next message waiting in the mutex queue, if any.
    ///
    /// # Panics
    ///
    /// Unless the guard is dropped from the holder message's signal handler,
    /// panics when the current message is not the holder or when the mutex is
    /// not (or no longer) owned by the holder message.
    fn drop(&mut self) {
        // Whether the signal currently being handled originates from the holder
        // message (presumably meaning we run inside that message's signal
        // handler). Always `false` when the `ethexe` feature is enabled.
        let is_holder_msg_signal_handler = match () {
            #[cfg(not(feature = "ethexe"))]
            () => msg::signal_from() == Ok(self.holder_msg_id),
            #[cfg(feature = "ethexe")]
            () => false,
        };

        // In the signal handler the current message id is not the holder's,
        // so the holder check is skipped there.
        if !is_holder_msg_signal_handler {
            self.ensure_access_by_holder();
        }

        let locked_by = self.mutex.locked_by_mut();
        let owner_msg_id = locked_by.map(|v| v.0);

        if owner_msg_id != Some(self.holder_msg_id) && !is_holder_msg_signal_handler {
            // The lock is unowned or owned by another message although we are
            // not in the holder's signal handler: this is treated as a broken
            // invariant. The mismatch is tolerated only in the signal handler,
            // where the lock may already have been seized by some other
            // message and the next rival message was awoken by the ousting
            // mechanism in the MutexLockFuture::poll
            panic!(
                "Mutex guard held by message 0x{} does not match lock owner message {}",
                hex::encode(self.holder_msg_id),
                owner_msg_id.map_or("None".into(), |v| format!("0x{}", hex::encode(v)))
            );
        }

        // Release only a lock that the holder still owns: wake the next queued
        // message (if any) and then mark the mutex as unlocked.
        if owner_msg_id == Some(self.holder_msg_id) {
            if let Some(message_id) = self.mutex.queue.dequeue() {
                exec::wake(message_id).expect("Failed to wake the message");
            }
            *locked_by = None;
        }
    }
}
228
229impl<'a, T> AsRef<T> for MutexGuard<'a, T> {
230    fn as_ref(&self) -> &'a T {
231        self.ensure_access_by_holder();
232        unsafe { &*self.mutex.value.get() }
233    }
234}
235
236impl<'a, T> AsMut<T> for MutexGuard<'a, T> {
237    fn as_mut(&mut self) -> &'a mut T {
238        self.ensure_access_by_holder();
239        unsafe { &mut *self.mutex.value.get() }
240    }
241}
242
243impl<T> Deref for MutexGuard<'_, T> {
244    type Target = T;
245
246    fn deref(&self) -> &T {
247        self.ensure_access_by_holder();
248        unsafe { &*self.mutex.value.get() }
249    }
250}
251
252impl<T> DerefMut for MutexGuard<'_, T> {
253    fn deref_mut(&mut self) -> &mut T {
254        self.ensure_access_by_holder();
255        unsafe { &mut *self.mutex.value.get() }
256    }
257}
258
// SAFETY: the mutex is declared shareable between threads (which allows it to
// be stored in a `static`) on the grounds that we are always single-threaded,
// so its interior mutability is never exercised concurrently.
unsafe impl<T> Sync for Mutex<T> {}
261
/// The future returned by the [`lock`](Mutex::lock) method.
///
/// The output of the future is the [`MutexGuard`] that can be obtained by using
/// `await` syntax.
///
/// # Examples
///
/// In the following example, variable types are annotated explicitly for
/// demonstration purposes only. Usually, annotating them is unnecessary because
/// they can be inferred automatically.
///
/// ```
/// use gstd::sync::{Mutex, MutexGuard, MutexLockFuture};
///
/// #[gstd::async_main]
/// async fn main() {
///     let mutex: Mutex<i32> = Mutex::new(42);
///     let future: MutexLockFuture<i32> = mutex.lock();
///     let guard: MutexGuard<i32> = future.await;
///     let value: i32 = *guard;
///     assert_eq!(value, 42);
/// }
/// # fn main() {}
/// ```
pub struct MutexLockFuture<'a, T> {
    // Identifier of the mutex being locked; passed to the lock monitor
    // registration and removal calls.
    mutex_id: MutexId,
    // The mutex this future is trying to lock.
    mutex: &'a Mutex<T>,
    // The maximum number of blocks the mutex lock can be owned.
    // If the value is None, the default value taken from the `Config::mx_lock_duration` is used.
    own_up_for: Option<BlockCount>,
}
293
294impl<'a, T> MutexLockFuture<'a, T> {
295    /// Sets the maximum number of blocks the mutex lock can be owned by
296    /// some message before the ownership can be seized by another rival
297    pub fn own_up_for(self, block_count: BlockCount) -> Result<Self> {
298        if block_count == 0 {
299            Err(Error::Gstd(UsageError::ZeroMxLockDuration))
300        } else {
301            Ok(MutexLockFuture {
302                mutex_id: self.mutex_id,
303                mutex: self.mutex,
304                own_up_for: Some(block_count),
305            })
306        }
307    }
308
309    fn acquire_lock_ownership(
310        &mut self,
311        owner_msg_id: MessageId,
312        current_block: BlockNumber,
313    ) -> Poll<MutexGuard<'a, T>> {
314        let owner_deadline_block =
315            current_block.saturating_add(self.own_up_for.unwrap_or_else(Config::mx_lock_duration));
316        async_runtime::locks().remove_mx_lock_monitor(owner_msg_id, self.mutex_id);
317        if let Some(next_rival_msg_id) = self.mutex.queue.first() {
318            // Give the next rival message a chance to own the lock after this owner
319            // exceeds the lock ownership duration
320            async_runtime::locks().insert_mx_lock_monitor(
321                *next_rival_msg_id,
322                self.mutex_id,
323                owner_deadline_block,
324            );
325        }
326        let locked_by = self.mutex.locked_by_mut();
327        *locked_by = Some((owner_msg_id, owner_deadline_block));
328        Poll::Ready(MutexGuard {
329            mutex: self.mutex,
330            holder_msg_id: owner_msg_id,
331        })
332    }
333
334    fn queue_for_lock_ownership(
335        &mut self,
336        rival_msg_id: MessageId,
337        owner_deadline_block: Option<BlockNumber>,
338    ) -> Poll<MutexGuard<'a, T>> {
339        // If the message is already in the access queue, and we come here,
340        // it means the message has just been woken up from the waitlist.
341        // In that case we do not want to register yet another access attempt
342        // and just go back to the waitlist
343        if !self.mutex.queue.contains(&rival_msg_id) {
344            self.mutex.queue.enqueue(rival_msg_id);
345            if let Some(owner_deadline_block) = owner_deadline_block {
346                // Lock owner did not know about this message when it was getting into
347                // lock ownership. We have to take care of ourselves and give us a chance
348                // to oust the lock owner when the lock ownership duration expires
349                if self.mutex.queue.len() == 1 {
350                    async_runtime::locks().insert_mx_lock_monitor(
351                        rival_msg_id,
352                        self.mutex_id,
353                        owner_deadline_block,
354                    );
355                }
356            }
357        }
358        Poll::Pending
359    }
360}
361
impl<'a, T> Future for MutexLockFuture<'a, T> {
    type Output = MutexGuard<'a, T>;

    // In case of locked mutex and an `.await`, function `poll` checks if the
    // mutex can be taken, else it waits (goes into *waiting queue*).
    //
    // Outcomes, in the order they are checked:
    // 1. the mutex is unlocked - the current message becomes the owner;
    // 2. the owner's deadline block has not been reached - the current message
    //    is queued and the future stays pending;
    // 3. the deadline has passed - the owner is ousted and the lock goes either
    //    to the first other queued message or to the current message.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let current_msg_id = msg::id();
        let current_block = exec::block_height();
        let locked_by = self.mutex.locked_by_mut();

        // Nobody owns the lock: take it right away.
        if locked_by.is_none() {
            return self
                .get_mut()
                .acquire_lock_ownership(current_msg_id, current_block);
        }

        let (lock_owner_msg_id, deadline_block) =
            (*locked_by).unwrap_or_else(|| unreachable!("Checked above"));

        // The owner is still within its ownership duration: wait in the queue.
        if current_block < deadline_block {
            return self
                .get_mut()
                .queue_for_lock_ownership(current_msg_id, Some(deadline_block));
        }

        // The owner exceeded its ownership duration. If its future task is
        // still registered, flag it as lock-exceeded and wake the owner message.
        if let Some(msg_future_task) = async_runtime::futures().get_mut(&lock_owner_msg_id) {
            msg_future_task.set_lock_exceeded();
            exec::wake(lock_owner_msg_id).expect("Failed to wake the message");
        }

        // Pick the next owner from the queue: entries of the ousted owner are
        // dropped, and reaching our own entry means it is our turn (handled
        // after the loop). Any other message is woken up to take the lock.
        while let Some(next_msg_id) = self.mutex.queue.dequeue() {
            if next_msg_id == lock_owner_msg_id {
                continue;
            }
            if next_msg_id == current_msg_id {
                break;
            }
            exec::wake(next_msg_id).expect("Failed to wake the message");
            *locked_by = None;
            // We have just woken up the next lock owner, but we don't know its ownership
            // duration, thus we pass None as owner_deadline_block. The woken up message
            // will give us a chance to own the lock itself by registering a
            // lock monitor for us
            return self
                .get_mut()
                .queue_for_lock_ownership(current_msg_id, None);
        }

        // Either the queue ran out or our own entry was reached: the current
        // message takes the lock over.
        self.get_mut()
            .acquire_lock_ownership(current_msg_id, current_block)
    }
}