pingora_cache/
lock.rs

// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cache lock

use crate::{hashtable::ConcurrentHashTable, key::CacheHashKey, CacheKey};

use pingora_timeout::timeout;
use std::sync::Arc;

pub type CacheKeyLockImpl = (dyn CacheKeyLock + Send + Sync);

pub trait CacheKeyLock {
    /// Try to lock a cache fetch
    ///
    /// Users should call this after a cache miss, before fetching the asset.
    /// The returned [Locked] will tell the caller either to fetch or wait.
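    ///
    /// A minimal usage sketch (illustrative only; `cache_lock`, `key`, and the fetch
    /// step are assumed to exist in the caller's context):
    ///
    /// ```ignore
    /// match cache_lock.lock(&key) {
    ///     Locked::Write(permit) => {
    ///         // this request is the writer: fetch and cache the asset, then release
    ///         cache_lock.release(&key, permit, LockStatus::Done);
    ///     }
    ///     Locked::Read(read_lock) => {
    ///         // another request is already fetching: wait, then retry the cache lookup
    ///         read_lock.wait().await;
    ///     }
    /// }
    /// ```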
    fn lock(&self, key: &CacheKey) -> Locked;

    /// Release a lock for the given key
    ///
    /// When the write lock is dropped without being released, the read lock holders will consider
    /// it to have failed, so they will compete for the write lock again.
    fn release(&self, key: &CacheKey, permit: WritePermit, reason: LockStatus);
}

const N_SHARDS: usize = 16;

/// The global cache locking manager
pub struct CacheLock {
    lock_table: ConcurrentHashTable<LockStub, N_SHARDS>,
    timeout: Duration, // fixed timeout value for now
}

/// The result of locking a cache fetch
#[derive(Debug)]
pub enum Locked {
    /// The writer is allowed to fetch the asset
    Write(WritePermit),
    /// The reader waits for the writer to fetch the asset
    Read(ReadLock),
}

impl Locked {
    /// Is this a write lock
    pub fn is_write(&self) -> bool {
        matches!(self, Self::Write(_))
    }
}

impl CacheLock {
    /// Create a new [CacheLock] with the given lock timeout
    ///
    /// When the timeout is reached, the read locks are automatically unlocked
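    ///
    /// A short construction sketch (the 1-second timeout below is an arbitrary example value):
    ///
    /// ```ignore
    /// let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
    /// ```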
    pub fn new_boxed(timeout: Duration) -> Box<Self> {
        Box::new(CacheLock {
            lock_table: ConcurrentHashTable::new(),
            timeout,
        })
    }

    /// Create a new [CacheLock] with the given lock timeout
    ///
    /// When the timeout is reached, the read locks are automatically unlocked
    pub fn new(timeout: Duration) -> Self {
        CacheLock {
            lock_table: ConcurrentHashTable::new(),
            timeout,
        }
    }
}

impl CacheKeyLock for CacheLock {
    fn lock(&self, key: &CacheKey) -> Locked {
        let hash = key.combined_bin();
        let key = u128::from_be_bytes(hash); // endianness doesn't matter
        let table = self.lock_table.get(key);
        if let Some(lock) = table.read().get(&key) {
            // already has an ongoing request
            if lock.0.lock_status() != LockStatus::Dangling {
                return Locked::Read(lock.read_lock());
            }
            // Dangling: the previous writer quit without unlocking the lock. Requests should
            // compete for the write lock again.
        }

        let mut table = table.write();
        // check again in case another request already added it
        if let Some(lock) = table.get(&key) {
            if lock.0.lock_status() != LockStatus::Dangling {
                return Locked::Read(lock.read_lock());
            }
        }
        let (permit, stub) = WritePermit::new(self.timeout);
        table.insert(key, stub);
        Locked::Write(permit)
    }

    fn release(&self, key: &CacheKey, mut permit: WritePermit, reason: LockStatus) {
        let hash = key.combined_bin();
        let key = u128::from_be_bytes(hash); // endianness doesn't matter
        if let Some(_lock) = self.lock_table.write(key).remove(&key) {
            // make sure that the caller didn't forget to unlock it
            permit.unlock(reason);
        }
    }
}

use log::warn;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::{Duration, Instant};
use strum::IntoStaticStr;
use tokio::sync::Semaphore;

/// Status values that the read locks could possibly see.
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)]
pub enum LockStatus {
    /// Waiting for the writer to populate the asset
    Waiting,
    /// The writer finished; readers can start
    Done,
    /// The writer encountered an error, such as a network issue. A new writer will be elected.
    TransientError,
    /// The writer observed that no cache lock is needed (e.g., uncacheable), so readers should
    /// start to fetch independently without a new writer
    GiveUp,
    /// The write lock is dropped without being unlocked
    Dangling,
    /// The lock is held for too long
    Timeout,
}

impl From<LockStatus> for u8 {
    fn from(l: LockStatus) -> u8 {
        match l {
            LockStatus::Waiting => 0,
            LockStatus::Done => 1,
            LockStatus::TransientError => 2,
            LockStatus::GiveUp => 3,
            LockStatus::Dangling => 4,
            LockStatus::Timeout => 5,
        }
    }
}

impl From<u8> for LockStatus {
    fn from(v: u8) -> Self {
        match v {
            0 => Self::Waiting,
            1 => Self::Done,
            2 => Self::TransientError,
            3 => Self::GiveUp,
            4 => Self::Dangling,
            5 => Self::Timeout,
            _ => Self::GiveUp, // placeholder
        }
    }
}

#[derive(Debug)]
pub struct LockCore {
    pub lock_start: Instant,
    pub timeout: Duration,
    pub(super) lock: Semaphore,
    // use u8 for Atomic enum
    lock_status: AtomicU8,
}

impl LockCore {
    pub fn new_arc(timeout: Duration) -> Arc<Self> {
        Arc::new(LockCore {
            lock: Semaphore::new(0),
            timeout,
            lock_start: Instant::now(),
            lock_status: AtomicU8::new(LockStatus::Waiting.into()),
        })
    }

    pub fn locked(&self) -> bool {
        self.lock.available_permits() == 0
    }

    pub fn unlock(&self, reason: LockStatus) {
        self.lock_status.store(reason.into(), Ordering::SeqCst);
        // Any small positive number will do, 10 is used for RwLock as well.
        // No need to wake up all at once.
        self.lock.add_permits(10);
    }

    pub fn lock_status(&self) -> LockStatus {
        self.lock_status.load(Ordering::SeqCst).into()
    }
}

// all 3 structs below are just Arc<LockCore> with different interfaces

/// ReadLock: requests that get it need to wait until it is released
#[derive(Debug)]
pub struct ReadLock(Arc<LockCore>);

impl ReadLock {
    /// Wait for the writer to release the lock
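    ///
    /// A usage sketch (illustrative; `read_lock` here comes from a [Locked::Read]):
    ///
    /// ```ignore
    /// read_lock.wait().await;
    /// match read_lock.lock_status() {
    ///     LockStatus::Done => { /* the asset should now be cached; look it up again */ }
    ///     LockStatus::Timeout => { /* the writer took too long; compete for the lock again */ }
    ///     _ => { /* handle GiveUp, TransientError, etc. */ }
    /// }
    /// ```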
    pub async fn wait(&self) {
        if !self.locked() || self.expired() {
            return;
        }

        // TODO: need to be careful not to wake everyone up at the same time
        // (maybe not an issue because regular cache lock release behaves that way)
        if let Some(duration) = self.0.timeout.checked_sub(self.0.lock_start.elapsed()) {
            match timeout(duration, self.0.lock.acquire()).await {
                Ok(Ok(_)) => { // permit is returned to Semaphore right away
                }
                Ok(Err(e)) => {
                    warn!("error acquiring semaphore {e:?}")
                }
                Err(_) => {
                    self.0
                        .lock_status
                        .store(LockStatus::Timeout.into(), Ordering::SeqCst);
                }
            }
        }
    }

    /// Test if it is still locked
    pub fn locked(&self) -> bool {
        self.0.locked()
    }

    /// Whether the lock is expired, e.g., the writer has been holding the lock for too long
    pub fn expired(&self) -> bool {
        // NOTE: this is whether the lock is currently expired
        // not whether it was timed out during wait()
        self.0.lock_start.elapsed() >= self.0.timeout
    }

    /// The current status of the lock
    pub fn lock_status(&self) -> LockStatus {
        let status = self.0.lock_status();
        if matches!(status, LockStatus::Waiting) && self.expired() {
            LockStatus::Timeout
        } else {
            status
        }
    }
}

/// WritePermit: requests that get it need to populate the cache and then release it
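///
/// A minimal sketch of the writer path (illustrative; in the typical flow the permit is handed
/// back through [CacheKeyLock::release] instead of being unlocked directly):
///
/// ```ignore
/// // populate the cache with the fetched asset first, then wake up the waiting readers
/// permit.unlock(LockStatus::Done);
/// ```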
#[derive(Debug)]
pub struct WritePermit {
    lock: Arc<LockCore>,
    finished: bool,
}

impl WritePermit {
    pub fn new(timeout: Duration) -> (WritePermit, LockStub) {
        let lock = LockCore::new_arc(timeout);
        let stub = LockStub(lock.clone());
        (
            WritePermit {
                lock,
                finished: false,
            },
            stub,
        )
    }

    pub fn unlock(&mut self, reason: LockStatus) {
        self.finished = true;
        self.lock.unlock(reason);
    }
}

impl Drop for WritePermit {
    fn drop(&mut self) {
        // Writer exited without properly unlocking. We let others compete for the write lock again
        if !self.finished {
            debug_assert!(false, "Dangling cache lock started!");
            self.unlock(LockStatus::Dangling);
        }
    }
}

pub struct LockStub(pub Arc<LockCore>);
impl LockStub {
    pub fn read_lock(&self) -> ReadLock {
        ReadLock(self.0.clone())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    #[test]
    fn test_get_release() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1000));
        let key1 = CacheKey::new("", "a", "1");
        let locked1 = cache_lock.lock(&key1);
        assert!(locked1.is_write()); // write permit
        let locked2 = cache_lock.lock(&key1);
        assert!(!locked2.is_write()); // read lock
        if let Locked::Write(permit) = locked1 {
            cache_lock.release(&key1, permit, LockStatus::Done);
        }
        let locked3 = cache_lock.lock(&key1);
        assert!(locked3.is_write()); // write permit again
        if let Locked::Write(permit) = locked3 {
            cache_lock.release(&key1, permit, LockStatus::Done);
        }
    }

    #[tokio::test]
    async fn test_lock() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1000));
        let key1 = CacheKey::new("", "a", "1");
        let mut permit = match cache_lock.lock(&key1) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        let lock = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock.locked());
        let handle = tokio::spawn(async move {
            lock.wait().await;
            assert_eq!(lock.lock_status(), LockStatus::Done);
        });
        permit.unlock(LockStatus::Done);
        handle.await.unwrap(); // check that the lock is unlocked and the task returns
    }

    #[tokio::test]
    async fn test_lock_timeout() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");
        let mut permit = match cache_lock.lock(&key1) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        let lock = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock.locked());

        let handle = tokio::spawn(async move {
            // timed out
            lock.wait().await;
            assert_eq!(lock.lock_status(), LockStatus::Timeout);
        });

        tokio::time::sleep(Duration::from_secs(2)).await;

        // expired lock
        let lock2 = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock2.locked());
        assert_eq!(lock2.lock_status(), LockStatus::Timeout);
        lock2.wait().await;
        assert_eq!(lock2.lock_status(), LockStatus::Timeout);

        permit.unlock(LockStatus::Done);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn test_lock_concurrent() {
        let _ = env_logger::builder().is_test(true).try_init();
        // Test that concurrent attempts to compete for a lock run without issues
        let cache_lock = Arc::new(CacheLock::new_boxed(Duration::from_secs(1)));
        let key1 = CacheKey::new("", "a", "1");

        let mut handles = vec![];

        const READERS: usize = 30;
        for _ in 0..READERS {
            let key1 = key1.clone();
            let cache_lock = cache_lock.clone();
            // simulate a cache lookup / lock attempt loop
            handles.push(tokio::spawn(async move {
                loop {
                    match cache_lock.lock(&key1) {
                        Locked::Write(permit) => {
                            tokio::time::sleep(Duration::from_millis(5)).await;
                            cache_lock.release(&key1, permit, LockStatus::Done);
                            break;
                        }
                        Locked::Read(r) => {
                            r.wait().await;
                        }
                    }
                }
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }
}