matrix_sdk/event_cache/caches/lock.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A read-write, cross-thread, and cross-process lock.
16//!
17//! Such a lock is usually used to manage states of the various caches in the
18//! Event Cache, e.g. [`RoomEventCache`].
19//!
20//! [`RoomEventCache`]: super::super::RoomEventCache
21
22use std::ops::{Deref, DerefMut};
23
24use matrix_sdk_base::event_cache::store::{
25 EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState,
26};
27use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};
28
29use super::super::Result;
30
/// State for a single cache.
///
/// This aims at containing all the inner mutable states that ought to be
/// updated at the same time.
///
/// The state `S` sits behind an async read-write lock. Guards are obtained
/// with [`Self::read`] and [`Self::write`], which additionally lock the
/// cross-process store lock that `S` exposes through the [`Store`] trait.
pub struct StateLock<S> {
    /// The per-thread (i.e. in-process) read-write lock around the real
    /// state.
    locked_state: RwLock<S>,

    /// A lock taken to avoid multiple attempts to upgrade from a read lock
    /// to a write lock.
    ///
    /// It is acquired at the very beginning of [`Self::read`] and held until
    /// that method returns, so calls to [`Self::read`] run one at a time.
    ///
    /// Please see the inline comment of [`Self::read`] to understand why it
    /// exists.
    state_lock_upgrade_mutex: Mutex<()>,
}
46
47impl<S> StateLock<S> {
48 /// Construct a new [`StateLock`].
49 pub fn new_inner(state: S) -> Self {
50 Self { locked_state: RwLock::new(state), state_lock_upgrade_mutex: Mutex::new(()) }
51 }
52
53 /// Lock this [`StateLock`] with per-thread shared access.
54 ///
55 /// This method locks the per-thread lock over the state, and then locks
56 /// the cross-process lock over the store. It returns an RAII guard
57 /// which will drop the read access to the state and to the store when
58 /// dropped.
59 ///
60 /// If the cross-process lock over the store is dirty (see
61 /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
62 pub async fn read<'a>(&'a self) -> Result<StateLockReadGuard<'a, S>>
63 where
64 S: Store,
65 StateLockWriteGuard<'a, S>: Reload,
66 {
67 // Only one call at a time to `read` is allowed.
68 //
69 // Why? Because in case the cross-process lock over the store is dirty, we need
70 // to upgrade the read lock over the state to a write lock.
71 //
72 // ## Upgradable read lock
73 //
74 // One may argue that this upgrades can be done with an _upgradable read lock_
75 // [^1] [^2]. We don't want to use this solution: an upgradable read lock is
76 // basically a mutex because we are losing the shared access property, i.e.
77 // having multiple read locks at the same time. This is an important property to
78 // hold for performance concerns.
79 //
80 // ## Downgradable write lock
81 //
82 // One may also argue we could first obtain a write lock over the state from the
83 // beginning, thus removing the need to upgrade the read lock to a write lock.
84 // The write lock is then downgraded to a read lock once the dirty is cleaned
85 // up. It can potentially create a deadlock in the following situation:
86 //
87 // - `read` is called once, it takes a write lock, then downgrades it to a read
88 // lock: the guard is kept alive somewhere,
89 // - `read` is called again, and waits to obtain the write lock, which is
90 // impossible as long as the guard from the previous call is not dropped.
91 //
92 // ## “Atomic” read and write
93 //
94 // One may finally argue to first obtain a read lock over the state, then drop
95 // it if the cross-process lock over the store is dirty, and immediately obtain
96 // a write lock (which can later be downgraded to a read lock). The problem is
97 // that this write lock is async: anything can happen between the drop and the
98 // new lock acquisition, and it's not possible to pause the runtime in the
99 // meantime.
100 //
101 // ## Semaphore with 1 permit, aka a Mutex
102 //
103 // The chosen idea is to allow only one execution at a time of this method: it
104 // becomes a critical section. That way we are free to “upgrade” the read lock
105 // by dropping it and obtaining a new write lock. All callers to this method are
106 // waiting, so nothing can happen in the meantime.
107 //
108 // Note that it doesn't conflict with the `write` method because this latter
109 // immediately obtains a write lock, which avoids any conflict with this method.
110 //
111 // [^1]: https://docs.rs/lock_api/0.4.14/lock_api/struct.RwLock.html#method.upgradable_read
112 // [^2]: https://docs.rs/async-lock/3.4.1/async_lock/struct.RwLock.html#method.upgradable_read
113 let _state_lock_upgrade_guard = self.state_lock_upgrade_mutex.lock().await;
114
115 // Obtain a read lock.
116 let state_guard = self.locked_state.read().await;
117
118 match state_guard.store().lock().await? {
119 EventCacheStoreLockState::Clean(store_guard) => {
120 Ok(StateLockReadGuard { state: state_guard, store: store_guard })
121 }
122 EventCacheStoreLockState::Dirty(store_guard) => {
123 // Drop the read lock, and take a write lock to modify the state.
124 // This is safe because only one reader at a time (see
125 // `Self::state_lock_upgrade_mutex`) is allowed.
126 drop(state_guard);
127 let state_guard = self.locked_state.write().await;
128
129 let mut guard = StateLockWriteGuard { state: state_guard, store: store_guard };
130
131 // Reload the state.
132 guard.reload().await?;
133
134 // All good now, mark the cross-process lock as non-dirty.
135 EventCacheStoreLockGuard::clear_dirty(&guard.store);
136
137 // Downgrade the write guard to a read guard.
138 let guard = guard.downgrade();
139
140 Ok(guard)
141 }
142 }
143 }
144
145 /// Lock this [`StateLock`] with exclusive per-thread write access.
146 ///
147 /// This method locks the per-thread lock over the state, and then locks
148 /// the cross-process lock over the store. It returns an RAII guard
149 /// which will drop the write access to the state and to the store when
150 /// dropped.
151 ///
152 /// If the cross-process lock over the store is dirty (see
153 /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
154 pub async fn write<'a>(&'a self) -> Result<StateLockWriteGuard<'a, S>>
155 where
156 S: Store,
157 StateLockWriteGuard<'a, S>: Reload,
158 {
159 let state_guard = self.locked_state.write().await;
160
161 match state_guard.store().lock().await? {
162 EventCacheStoreLockState::Clean(store_guard) => {
163 Ok(StateLockWriteGuard { state: state_guard, store: store_guard })
164 }
165 EventCacheStoreLockState::Dirty(store_guard) => {
166 let mut guard = StateLockWriteGuard { state: state_guard, store: store_guard };
167
168 // Reload the state.
169 guard.reload().await?;
170
171 // All good now, mark the cross-process lock as non-dirty.
172 EventCacheStoreLockGuard::clear_dirty(&guard.store);
173
174 Ok(guard)
175 }
176 }
177 }
178}
179
/// The read lock guard returned by [`StateLock::read`].
///
/// It bundles the read guard over the state with the cross-process guard
/// over the store, and dereferences to the state `S`.
///
/// Fields are dropped in declaration order: the state guard is dropped
/// first, then the store guard.
pub struct StateLockReadGuard<'a, S> {
    /// The per-thread read lock guard over the state `S`.
    pub state: tokio::sync::RwLockReadGuard<'a, S>,

    /// The cross-process lock guard over the store.
    pub store: EventCacheStoreLockGuard,
}
188
189impl<'a, S> Deref for StateLockReadGuard<'a, S> {
190 type Target = S;
191
192 fn deref(&self) -> &Self::Target {
193 &self.state
194 }
195}
196
/// The write lock guard returned by [`StateLock::write`].
///
/// It bundles the write guard over the state with the cross-process guard
/// over the store, and dereferences (mutably or not) to the state `S`.
///
/// Fields are dropped in declaration order: the state guard is dropped
/// first, then the store guard.
pub struct StateLockWriteGuard<'a, S> {
    /// The per-thread write lock guard over the state `S`.
    pub state: RwLockWriteGuard<'a, S>,

    /// The cross-process lock guard over the store.
    pub store: EventCacheStoreLockGuard,
}
205
206impl<'a, S> Deref for StateLockWriteGuard<'a, S> {
207 type Target = S;
208
209 fn deref(&self) -> &Self::Target {
210 &self.state
211 }
212}
213
214impl<'a, S> DerefMut for StateLockWriteGuard<'a, S> {
215 fn deref_mut(&mut self) -> &mut Self::Target {
216 &mut self.state
217 }
218}
219
220impl<'a, S> StateLockWriteGuard<'a, S> {
221 /// Synchronously downgrades a write lock into a read lock.
222 ///
223 /// The per-thread/state lock is downgraded atomically, without allowing
224 /// any writers to take exclusive access of the lock in the meantime.
225 ///
226 /// It returns an RAII guard which will drop the read access to the
227 /// state and to the store when dropped.
228 fn downgrade(self) -> StateLockReadGuard<'a, S> {
229 StateLockReadGuard { state: self.state.downgrade(), store: self.store }
230 }
231}
232
/// Trait to give access to the [`EventCacheStoreLock`].
///
/// [`StateLock::read`] and [`StateLock::write`] require the state `S` to
/// implement it: they call [`Store::store`] on the state to lock the
/// cross-process store lock right after locking the state itself.
pub trait Store {
    /// Return a reference to [`EventCacheStoreLock`].
    fn store(&self) -> &EventCacheStoreLock;
}
238
/// Trait to reload the state `S` in [`StateLock`].
///
/// [`StateLock::read`] and [`StateLock::write`] require it to be implemented
/// for [`StateLockWriteGuard`], and call [`Reload::reload`] when the
/// cross-process lock over the store is reported as dirty.
pub trait Reload {
    /// Reload the state entirely from zero.
    ///
    /// An error returned here is propagated by [`StateLock::read`] and
    /// [`StateLock::write`]; in that case the cross-process lock is not
    /// marked as non-dirty.
    async fn reload(&mut self) -> Result<()>;
}