pingora_cache/
lock.rs

// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cache lock

use crate::{hashtable::ConcurrentHashTable, key::CacheHashKey, CacheKey};

use pingora_timeout::timeout;
use std::sync::Arc;
use std::time::Duration;

pub type CacheKeyLockImpl = (dyn CacheKeyLock + Send + Sync);

pub trait CacheKeyLock {
    /// Try to lock a cache fetch
    ///
    /// Users should call this after a cache miss, before fetching the asset.
    /// The returned [Locked] tells the caller whether to fetch or wait.
    fn lock(&self, key: &CacheKey) -> Locked;

    /// Release a lock for the given key
    ///
    /// When the write lock is dropped without being released, the read lock holders will consider
    /// it to have failed and will compete for the write lock again.
    fn release(&self, key: &CacheKey, permit: WritePermit, reason: LockStatus);
}
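
// A minimal usage sketch of the flow described above. The `lock_usage_sketch`
// helper is hypothetical and shown for illustration only: on a cache miss,
// lock the key; a write permit means this request should fetch the asset and
// release the lock, while a read lock means it should wait for the writer.
#[allow(dead_code)]
async fn lock_usage_sketch(locks: &CacheKeyLockImpl, key: &CacheKey) {
    match locks.lock(key) {
        Locked::Write(permit) => {
            // ... fetch the asset from the origin and populate the cache ...
            locks.release(key, permit, LockStatus::Done);
        }
        Locked::Read(read_lock) => {
            // wait for the writer, then re-check the cache / lock status
            read_lock.wait().await;
        }
    }
}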

const N_SHARDS: usize = 16;

/// The global cache locking manager
#[derive(Debug)]
pub struct CacheLock {
    lock_table: ConcurrentHashTable<LockStub, N_SHARDS>,
    // fixed lock timeout values for now
    age_timeout_default: Duration,
}

/// A struct representing locked cache access
#[derive(Debug)]
pub enum Locked {
    /// The writer is allowed to fetch the asset
    Write(WritePermit),
    /// The reader waits for the writer to fetch the asset
    Read(ReadLock),
}

impl Locked {
    /// Is this a write lock
    pub fn is_write(&self) -> bool {
        matches!(self, Self::Write(_))
    }
}

impl CacheLock {
    /// Create a new boxed [CacheLock] with the given age timeout
    ///
    /// The age timeout limits how long a writer may hold onto a particular lock.
    /// When it is reached, the read locks waiting on that writer are automatically unlocked.
    pub fn new_boxed(age_timeout: Duration) -> Box<Self> {
        Box::new(CacheLock {
            lock_table: ConcurrentHashTable::new(),
            age_timeout_default: age_timeout,
        })
    }

    /// Create a new [CacheLock] with the given age timeout
    ///
    /// The age timeout limits how long a writer may hold onto a particular lock.
    /// When it is reached, the read locks waiting on that writer are automatically unlocked.
    pub fn new(age_timeout_default: Duration) -> Self {
        CacheLock {
            lock_table: ConcurrentHashTable::new(),
            age_timeout_default,
        }
    }
}
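
// A construction sketch (illustrative): `new_boxed` pairs with the
// `CacheKeyLockImpl` trait-object alias above, so the manager can be stored
// and shared behind the trait:
//
// let locks: Box<CacheKeyLockImpl> = CacheLock::new_boxed(Duration::from_secs(5));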

impl CacheKeyLock for CacheLock {
    fn lock(&self, key: &CacheKey) -> Locked {
        let hash = key.combined_bin();
        let key = u128::from_be_bytes(hash); // endianness doesn't matter
        let table = self.lock_table.get(key);
        if let Some(lock) = table.read().get(&key) {
            // already has an ongoing request
            // If the lock status is dangling or timeout, the lock will _remain_ in the table
            // and readers should attempt to replace it.
            // In the case of writer timeout, any remaining readers that were waiting on THIS
            // LockCore should have timed out on their own (or are about to).
            // Finding a Timeout status means that THIS writer's lock already expired, so future
            // requests ought to recreate the lock.
            if !matches!(
                lock.0.lock_status(),
                LockStatus::Dangling | LockStatus::Timeout
            ) {
                return Locked::Read(lock.read_lock());
            }
            // Dangling: the previous writer quit without unlocking the lock. Requests should
            // compete for the write lock again.
        }

        let mut table = table.write();
        // check again in case another request already added it
        if let Some(lock) = table.get(&key) {
            if !matches!(
                lock.0.lock_status(),
                LockStatus::Dangling | LockStatus::Timeout
            ) {
                return Locked::Read(lock.read_lock());
            }
        }
        let (permit, stub) = WritePermit::new(self.age_timeout_default);
        table.insert(key, stub);
        Locked::Write(permit)
    }

    fn release(&self, key: &CacheKey, mut permit: WritePermit, reason: LockStatus) {
        let hash = key.combined_bin();
        let key = u128::from_be_bytes(hash); // endianness doesn't matter
        if permit.lock.lock_status() == LockStatus::Timeout {
            // If the lock age timed out, readers are capable of
            // replacing the lock associated with this permit in the lock table
            // (see the lock() implementation),
            // so keep the lock status as Timeout when unlocking
            // (because we aren't removing it from the lock_table).
            permit.unlock(LockStatus::Timeout);
        } else if let Some(_lock) = self.lock_table.write(key).remove(&key) {
            permit.unlock(reason);
        }
        // The cases above should cover all possibilities;
        // otherwise a dangling cache lock may result.
    }
}

use log::warn;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::Instant;
use strum::IntoStaticStr;
use tokio::sync::Semaphore;

/// Status which the read locks could possibly see.
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)]
pub enum LockStatus {
    /// Waiting for the writer to populate the asset
    Waiting,
    /// The writer finished; readers can start
    Done,
    /// The writer encountered an error, such as a network issue. A new writer will be elected.
    TransientError,
    /// The writer observed that no cache lock is needed (e.g., uncacheable), readers should start
    /// to fetch independently without a new writer
    GiveUp,
    /// The write lock is dropped without being unlocked
    Dangling,
    /// The lock is held for too long
    Timeout,
}

impl From<LockStatus> for u8 {
    fn from(l: LockStatus) -> u8 {
        match l {
            LockStatus::Waiting => 0,
            LockStatus::Done => 1,
            LockStatus::TransientError => 2,
            LockStatus::GiveUp => 3,
            LockStatus::Dangling => 4,
            LockStatus::Timeout => 5,
        }
    }
}

impl From<u8> for LockStatus {
    fn from(v: u8) -> Self {
        match v {
            0 => Self::Waiting,
            1 => Self::Done,
            2 => Self::TransientError,
            3 => Self::GiveUp,
            4 => Self::Dangling,
            5 => Self::Timeout,
            _ => Self::GiveUp, // placeholder
        }
    }
}

#[derive(Debug)]
pub struct LockCore {
    pub lock_start: Instant,
    pub age_timeout: Duration,
    pub(super) lock: Semaphore,
    // use u8 for Atomic enum
    lock_status: AtomicU8,
}

impl LockCore {
    pub fn new_arc(timeout: Duration) -> Arc<Self> {
        Arc::new(LockCore {
            lock: Semaphore::new(0),
            age_timeout: timeout,
            lock_start: Instant::now(),
            lock_status: AtomicU8::new(LockStatus::Waiting.into()),
        })
    }

    pub fn locked(&self) -> bool {
        self.lock.available_permits() == 0
    }

    pub fn unlock(&self, reason: LockStatus) {
        self.lock_status.store(reason.into(), Ordering::SeqCst);
        // Any small positive number will do, 10 is used for RwLock as well.
        // No need to wake up all at once.
        self.lock.add_permits(10);
    }

    pub fn lock_status(&self) -> LockStatus {
        self.lock_status.load(Ordering::SeqCst).into()
    }
}
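
// The pattern above, as a sketch for illustration (not used by this module):
// a Semaphore created with zero permits acts like a one-shot event. Waiters
// park on acquire(); add_permits(n) wakes up to n of them at a time, and each
// permit is returned when its guard drops, which in turn wakes the rest.
//
// let event = Arc::new(Semaphore::new(0)); // "locked": no permits available
// let waiter = event.clone();
// tokio::spawn(async move {
//     let _permit = waiter.acquire().await; // parks until permits are added
// });
// event.add_permits(10); // "unlock": wake waiters in small batches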

// all 3 structs below are just Arc<LockCore> with different interfaces

/// ReadLock: the requests that get it need to wait until it is released
#[derive(Debug)]
pub struct ReadLock(Arc<LockCore>);

impl ReadLock {
    /// Wait for the writer to release the lock
    pub async fn wait(&self) {
        if !self.locked() {
            return;
        }

        // FIXME: for now it is the awkward responsibility of the ReadLock to set the
        // timeout status on the lock itself because the write permit cannot enforce
        // the lock age timeout on its own
        // TODO: need to be careful not to wake everyone up at the same time
        // (maybe not an issue because regular cache lock release behaves that way)
        if let Some(duration) = self.0.age_timeout.checked_sub(self.0.lock_start.elapsed()) {
            match timeout(duration, self.0.lock.acquire()).await {
                Ok(Ok(_)) => {
                    // permit is returned to the Semaphore right away
                }
                Ok(Err(e)) => {
                    warn!("error acquiring semaphore {e:?}")
                }
                Err(_) => {
                    self.0
                        .lock_status
                        .store(LockStatus::Timeout.into(), Ordering::SeqCst);
                }
            }
        } else {
            // expiration has already occurred, store timeout status
            self.0
                .lock_status
                .store(LockStatus::Timeout.into(), Ordering::SeqCst);
        }
    }

    /// Test if it is still locked
    pub fn locked(&self) -> bool {
        self.0.locked()
    }

    /// Whether the lock is expired, e.g., the writer has been holding the lock for too long
    pub fn expired(&self) -> bool {
        // NOTE: this is whether the lock is currently expired,
        // not whether it was timed out during wait()
        self.0.lock_start.elapsed() >= self.0.age_timeout
    }

    /// The current status of the lock
    pub fn lock_status(&self) -> LockStatus {
        let status = self.0.lock_status();
        if matches!(status, LockStatus::Waiting) && self.expired() {
            LockStatus::Timeout
        } else {
            status
        }
    }
}
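
// Reader-side sketch (illustrative; the tests below use the same loop): after
// wait() returns, consult lock_status() to decide the next step.
//
// read_lock.wait().await;
// match read_lock.lock_status() {
//     LockStatus::Done => { /* the asset should be cached now: look it up */ }
//     LockStatus::GiveUp => { /* fetch independently without a new writer */ }
//     // TransientError / Timeout / Dangling: compete for the write lock again
//     _ => { /* retry CacheKeyLock::lock() */ }
// }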

/// WritePermit: the requests that get it need to populate the cache and then release it
#[derive(Debug)]
pub struct WritePermit {
    lock: Arc<LockCore>,
    finished: bool,
}

impl WritePermit {
    pub fn new(timeout: Duration) -> (WritePermit, LockStub) {
        let lock = LockCore::new_arc(timeout);
        let stub = LockStub(lock.clone());
        (
            WritePermit {
                lock,
                finished: false,
            },
            stub,
        )
    }

    pub fn unlock(&mut self, reason: LockStatus) {
        self.finished = true;
        self.lock.unlock(reason);
    }
}

impl Drop for WritePermit {
    fn drop(&mut self) {
        // Writer exited without properly unlocking. We let others compete for the write lock again
        if !self.finished {
            debug_assert!(false, "Dangling cache lock started!");
            self.unlock(LockStatus::Dangling);
        }
    }
}

#[derive(Debug)]
pub struct LockStub(pub Arc<LockCore>);
impl LockStub {
    pub fn read_lock(&self) -> ReadLock {
        ReadLock(self.0.clone())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    #[test]
    fn test_get_release() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1000));
        let key1 = CacheKey::new("", "a", "1");
        let locked1 = cache_lock.lock(&key1);
        assert!(locked1.is_write()); // write permit
        let locked2 = cache_lock.lock(&key1);
        assert!(!locked2.is_write()); // read lock
        if let Locked::Write(permit) = locked1 {
            cache_lock.release(&key1, permit, LockStatus::Done);
        }
        let locked3 = cache_lock.lock(&key1);
        assert!(locked3.is_write()); // write permit again
        if let Locked::Write(permit) = locked3 {
            cache_lock.release(&key1, permit, LockStatus::Done);
        }
    }

    #[tokio::test]
    async fn test_lock() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1000));
        let key1 = CacheKey::new("", "a", "1");
        let mut permit = match cache_lock.lock(&key1) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        let lock = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock.locked());
        let handle = tokio::spawn(async move {
            lock.wait().await;
            assert_eq!(lock.lock_status(), LockStatus::Done);
        });
        permit.unlock(LockStatus::Done);
        handle.await.unwrap(); // check lock is unlocked and the task is returned
    }

    #[tokio::test]
    async fn test_lock_timeout() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");
        let mut permit = match cache_lock.lock(&key1) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        let lock = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock.locked());

        let handle = tokio::spawn(async move {
            // timed out
            lock.wait().await;
            assert_eq!(lock.lock_status(), LockStatus::Timeout);
        });

        tokio::time::sleep(Duration::from_millis(2100)).await; // let the lock age time out

        handle.await.unwrap(); // check lock is timed out

        // expired lock - we will be able to install a new lock instead
        let mut permit2 = match cache_lock.lock(&key1) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        let lock2 = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock2.locked());
        let handle = tokio::spawn(async move {
            // the new writer finishes in time
            lock2.wait().await;
            assert_eq!(lock2.lock_status(), LockStatus::Done);
        });

        permit.unlock(LockStatus::Done);
        permit2.unlock(LockStatus::Done);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn test_lock_expired_release() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");
        let permit = match cache_lock.lock(&key1) {
            Locked::Write(w) => w,
            _ => panic!(),
        };

        let lock = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock.locked());
        let handle = tokio::spawn(async move {
            // timed out
            lock.wait().await;
            assert_eq!(lock.lock_status(), LockStatus::Timeout);
        });

        tokio::time::sleep(Duration::from_millis(1100)).await; // let lock age time out
        handle.await.unwrap(); // check lock is timed out

        // writer finally finishes
        cache_lock.release(&key1, permit, LockStatus::Done);

        // can reacquire after release
        let mut permit = match cache_lock.lock(&key1) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        assert_eq!(permit.lock.lock_status(), LockStatus::Waiting);

        let lock2 = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock2.locked());
        let handle = tokio::spawn(async move {
            // the new writer finishes in time
            lock2.wait().await;
            assert_eq!(lock2.lock_status(), LockStatus::Done);
        });

        permit.unlock(LockStatus::Done);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn test_lock_expired_no_reader() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");
        let mut permit = match cache_lock.lock(&key1) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        tokio::time::sleep(Duration::from_millis(1100)).await; // let lock age time out

        // lock expired without reader, but status is not yet set
        assert_eq!(permit.lock.lock_status(), LockStatus::Waiting);

        let lock = match cache_lock.lock(&key1) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        // reader expires write permit
        lock.wait().await;
        assert_eq!(lock.lock_status(), LockStatus::Timeout);
        assert_eq!(permit.lock.lock_status(), LockStatus::Timeout);
        permit.unlock(LockStatus::Timeout);
    }

    #[tokio::test]
    async fn test_lock_concurrent() {
        let _ = env_logger::builder().is_test(true).try_init();
        // Test that concurrent attempts to compete for a lock run without issues
        let cache_lock = Arc::new(CacheLock::new_boxed(Duration::from_secs(1)));
        let key1 = CacheKey::new("", "a", "1");

        let mut handles = vec![];

        const READERS: usize = 30;
        for _ in 0..READERS {
            let key1 = key1.clone();
            let cache_lock = cache_lock.clone();
            // simulate a cache lookup / lock attempt loop
            handles.push(tokio::spawn(async move {
                loop {
                    match cache_lock.lock(&key1) {
                        Locked::Write(permit) => {
                            tokio::time::sleep(Duration::from_millis(5)).await;
                            cache_lock.release(&key1, permit, LockStatus::Done);
                            break;
                        }
                        Locked::Read(r) => {
                            r.wait().await;
                        }
                    }
                }
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }
}
535}