Skip to main content

matrix_sdk/event_cache/caches/
lock.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A read-write, cross-thread, and cross-process lock.
16//!
17//! Such a lock is usually used to manage states of the various caches in the
18//! Event Cache, e.g. [`RoomEventCache`].
19//!
20//! [`RoomEventCache`]: super::super::RoomEventCache
21
22use std::ops::{Deref, DerefMut};
23
24use matrix_sdk_base::event_cache::store::{
25    EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState,
26};
27use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};
28
29use super::super::Result;
30
31/// State for a single cache.
32///
33/// This aims at containing all the inner mutable states that ought to be
34/// updated at the same time.
35pub struct StateLock<S> {
36    /// The per-thread lock around the real state.
37    locked_state: RwLock<S>,
38
39    /// A lock taken to avoid multiple attempts to upgrade from a read lock
40    /// to a write lock.
41    ///
42    /// Please see inline comment of [`Self::read`] to understand why it
43    /// exists.
44    state_lock_upgrade_mutex: Mutex<()>,
45}
46
47impl<S> StateLock<S> {
48    /// Construct a new [`StateLock`].
49    pub fn new_inner(state: S) -> Self {
50        Self { locked_state: RwLock::new(state), state_lock_upgrade_mutex: Mutex::new(()) }
51    }
52
53    /// Lock this [`StateLock`] with per-thread shared access.
54    ///
55    /// This method locks the per-thread lock over the state, and then locks
56    /// the cross-process lock over the store. It returns an RAII guard
57    /// which will drop the read access to the state and to the store when
58    /// dropped.
59    ///
60    /// If the cross-process lock over the store is dirty (see
61    /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
62    pub async fn read<'a>(&'a self) -> Result<StateLockReadGuard<'a, S>>
63    where
64        S: Store,
65        StateLockWriteGuard<'a, S>: Reload,
66    {
67        // Only one call at a time to `read` is allowed.
68        //
69        // Why? Because in case the cross-process lock over the store is dirty, we need
70        // to upgrade the read lock over the state to a write lock.
71        //
72        // ## Upgradable read lock
73        //
74        // One may argue that this upgrades can be done with an _upgradable read lock_
75        // [^1] [^2]. We don't want to use this solution: an upgradable read lock is
76        // basically a mutex because we are losing the shared access property, i.e.
77        // having multiple read locks at the same time. This is an important property to
78        // hold for performance concerns.
79        //
80        // ## Downgradable write lock
81        //
82        // One may also argue we could first obtain a write lock over the state from the
83        // beginning, thus removing the need to upgrade the read lock to a write lock.
84        // The write lock is then downgraded to a read lock once the dirty is cleaned
85        // up. It can potentially create a deadlock in the following situation:
86        //
87        // - `read` is called once, it takes a write lock, then downgrades it to a read
88        //   lock: the guard is kept alive somewhere,
89        // - `read` is called again, and waits to obtain the write lock, which is
90        //   impossible as long as the guard from the previous call is not dropped.
91        //
92        // ## “Atomic” read and write
93        //
94        // One may finally argue to first obtain a read lock over the state, then drop
95        // it if the cross-process lock over the store is dirty, and immediately obtain
96        // a write lock (which can later be downgraded to a read lock). The problem is
97        // that this write lock is async: anything can happen between the drop and the
98        // new lock acquisition, and it's not possible to pause the runtime in the
99        // meantime.
100        //
101        // ## Semaphore with 1 permit, aka a Mutex
102        //
103        // The chosen idea is to allow only one execution at a time of this method: it
104        // becomes a critical section. That way we are free to “upgrade” the read lock
105        // by dropping it and obtaining a new write lock. All callers to this method are
106        // waiting, so nothing can happen in the meantime.
107        //
108        // Note that it doesn't conflict with the `write` method because this latter
109        // immediately obtains a write lock, which avoids any conflict with this method.
110        //
111        // [^1]: https://docs.rs/lock_api/0.4.14/lock_api/struct.RwLock.html#method.upgradable_read
112        // [^2]: https://docs.rs/async-lock/3.4.1/async_lock/struct.RwLock.html#method.upgradable_read
113        let _state_lock_upgrade_guard = self.state_lock_upgrade_mutex.lock().await;
114
115        // Obtain a read lock.
116        let state_guard = self.locked_state.read().await;
117
118        match state_guard.store().lock().await? {
119            EventCacheStoreLockState::Clean(store_guard) => {
120                Ok(StateLockReadGuard { state: state_guard, store: store_guard })
121            }
122            EventCacheStoreLockState::Dirty(store_guard) => {
123                // Drop the read lock, and take a write lock to modify the state.
124                // This is safe because only one reader at a time (see
125                // `Self::state_lock_upgrade_mutex`) is allowed.
126                drop(state_guard);
127                let state_guard = self.locked_state.write().await;
128
129                let mut guard = StateLockWriteGuard { state: state_guard, store: store_guard };
130
131                // Reload the state.
132                guard.reload().await?;
133
134                // All good now, mark the cross-process lock as non-dirty.
135                EventCacheStoreLockGuard::clear_dirty(&guard.store);
136
137                // Downgrade the write guard to a read guard.
138                let guard = guard.downgrade();
139
140                Ok(guard)
141            }
142        }
143    }
144
145    /// Lock this [`StateLock`] with exclusive per-thread write access.
146    ///
147    /// This method locks the per-thread lock over the state, and then locks
148    /// the cross-process lock over the store. It returns an RAII guard
149    /// which will drop the write access to the state and to the store when
150    /// dropped.
151    ///
152    /// If the cross-process lock over the store is dirty (see
153    /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
154    pub async fn write<'a>(&'a self) -> Result<StateLockWriteGuard<'a, S>>
155    where
156        S: Store,
157        StateLockWriteGuard<'a, S>: Reload,
158    {
159        let state_guard = self.locked_state.write().await;
160
161        match state_guard.store().lock().await? {
162            EventCacheStoreLockState::Clean(store_guard) => {
163                Ok(StateLockWriteGuard { state: state_guard, store: store_guard })
164            }
165            EventCacheStoreLockState::Dirty(store_guard) => {
166                let mut guard = StateLockWriteGuard { state: state_guard, store: store_guard };
167
168                // Reload the state.
169                guard.reload().await?;
170
171                // All good now, mark the cross-process lock as non-dirty.
172                EventCacheStoreLockGuard::clear_dirty(&guard.store);
173
174                Ok(guard)
175            }
176        }
177    }
178}
179
180/// The read lock guard returned by [`StateLock::read`].
181pub struct StateLockReadGuard<'a, S> {
182    /// The per-thread read lock guard over the state `S`.
183    pub state: tokio::sync::RwLockReadGuard<'a, S>,
184
185    /// The cross-process lock guard over the store.
186    pub store: EventCacheStoreLockGuard,
187}
188
189impl<'a, S> Deref for StateLockReadGuard<'a, S> {
190    type Target = S;
191
192    fn deref(&self) -> &Self::Target {
193        &self.state
194    }
195}
196
197/// The write lock guard return by [`StateLock::write`].
198pub struct StateLockWriteGuard<'a, S> {
199    /// The per-thread write lock guard over the state `S`.
200    pub state: RwLockWriteGuard<'a, S>,
201
202    /// The cross-process lock guard over the store.
203    pub store: EventCacheStoreLockGuard,
204}
205
206impl<'a, S> Deref for StateLockWriteGuard<'a, S> {
207    type Target = S;
208
209    fn deref(&self) -> &Self::Target {
210        &self.state
211    }
212}
213
214impl<'a, S> DerefMut for StateLockWriteGuard<'a, S> {
215    fn deref_mut(&mut self) -> &mut Self::Target {
216        &mut self.state
217    }
218}
219
220impl<'a, S> StateLockWriteGuard<'a, S> {
221    /// Synchronously downgrades a write lock into a read lock.
222    ///
223    /// The per-thread/state lock is downgraded atomically, without allowing
224    /// any writers to take exclusive access of the lock in the meantime.
225    ///
226    /// It returns an RAII guard which will drop the read access to the
227    /// state and to the store when dropped.
228    fn downgrade(self) -> StateLockReadGuard<'a, S> {
229        StateLockReadGuard { state: self.state.downgrade(), store: self.store }
230    }
231}
232
233/// Trait to give access to the [`EventCacheStoreLock`].
234pub trait Store {
235    /// Return a reference to [`EventCacheStoreLock`].
236    fn store(&self) -> &EventCacheStoreLock;
237}
238
239/// Trait to reload the state `S` in [`StateLock`].
240pub trait Reload {
241    /// Reload the state entirely from zero.
242    async fn reload(&mut self) -> Result<()>;
243}