// pingora_cache/lock.rs
// Copyright 2026 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cache lock
17use crate::{hashtable::ConcurrentHashTable, key::CacheHashKey, CacheKey};
18use crate::{Span, Tag};
19
20use http::Extensions;
21use pingora_timeout::timeout;
22use std::sync::Arc;
23use std::time::Duration;
24
/// Object-safe alias for a shared [CacheKeyLock] implementation (`Send + Sync` for cross-thread use).
pub type CacheKeyLockImpl = dyn CacheKeyLock + Send + Sync;
26
/// Coordination of concurrent fetches of the same cache asset: one writer fetches,
/// other requests wait as readers.
pub trait CacheKeyLock {
    /// Try to lock a cache fetch
    ///
    /// If `stale_writer` is true, this fetch is to revalidate an asset already in cache.
    /// Else this fetch was a cache miss (i.e. not found via lookup, or force missed).
    ///
    /// Users should call after a cache miss before fetching the asset.
    /// The returned [Locked] will tell the caller either to fetch or wait.
    fn lock(&self, key: &CacheKey, stale_writer: bool) -> Locked;

    /// Release a lock for the given key
    ///
    /// When the write lock is dropped without being released, the read lock holders will consider
    /// it to be failed so that they will compete for the write lock again.
    fn release(&self, key: &CacheKey, permit: WritePermit, reason: LockStatus);

    /// Set tags on a trace span for the cache lock wait.
    ///
    /// Default: records the final [LockStatus] variant name as a `"status"` tag.
    fn trace_lock_wait(&self, span: &mut Span, _read_lock: &ReadLock, lock_status: LockStatus) {
        // IntoStaticStr derive on LockStatus yields the variant name as &'static str
        let tag_value: &'static str = lock_status.into();
        span.set_tag(|| Tag::new("status", tag_value));
    }

    /// Set a lock status for a custom `NoCacheReason`.
    fn custom_lock_status(&self, _custom_no_cache: &'static str) -> LockStatus {
        // treat custom no cache reasons as GiveUp by default
        // (like OriginNotCache)
        LockStatus::GiveUp
    }
}
56
// Number of shards in the lock table; sharding reduces write-lock contention.
const N_SHARDS: usize = 16;
58
/// The global cache locking manager
#[derive(Debug)]
pub struct CacheLock {
    // sharded map from combined cache key hash (u128) to the lock stub for that asset
    lock_table: ConcurrentHashTable<LockStub, N_SHARDS>,
    // fixed lock timeout values for now
    age_timeout_default: Duration,
}
66
/// The result of a cache lock attempt: either write access or a handle to wait on.
#[derive(Debug)]
pub enum Locked {
    /// The writer is allowed to fetch the asset
    Write(WritePermit),
    /// The reader waits for the writer to fetch the asset
    Read(ReadLock),
}
75
76impl Locked {
77    /// Is this a write lock
78    pub fn is_write(&self) -> bool {
79        matches!(self, Self::Write(_))
80    }
81}
82
83impl CacheLock {
84    /// Create a new [CacheLock] with the given lock timeout
85    ///
86    /// Age timeout refers to how long a writer has been holding onto a particular lock, and wait
87    /// timeout refers to how long a reader may hold onto any number of locks before giving up.
88    /// When either timeout is reached, the read locks are automatically unlocked.
89    pub fn new_boxed(age_timeout: Duration) -> Box<Self> {
90        Box::new(CacheLock {
91            lock_table: ConcurrentHashTable::new(),
92            age_timeout_default: age_timeout,
93        })
94    }
95
96    /// Create a new [CacheLock] with the given lock timeout
97    ///
98    /// Age timeout refers to how long a writer has been holding onto a particular lock, and wait
99    /// timeout refers to how long a reader may hold onto any number of locks before giving up.
100    /// When either timeout is reached, the read locks are automatically unlocked.
101    pub fn new(age_timeout_default: Duration) -> Self {
102        CacheLock {
103            lock_table: ConcurrentHashTable::new(),
104            age_timeout_default,
105        }
106    }
107}
108
impl CacheKeyLock for CacheLock {
    fn lock(&self, key: &CacheKey, stale_writer: bool) -> Locked {
        // collapse the full cache key hash into a u128 used as the table key
        let hash = key.combined_bin();
        let key = u128::from_be_bytes(hash); // endianness doesn't matter
        let table = self.lock_table.get(key);
        // fast path: check for an existing lock under the shard's read lock
        if let Some(lock) = table.read().get(&key) {
            // already has an ongoing request
            // If the lock status is dangling or timeout, the lock will _remain_ in the table
            // and readers should attempt to replace it.
            // In the case of writer timeout, any remaining readers that were waiting on THIS
            // LockCore should have (or are about to) timed out on their own.
            // Finding a Timeout status means that THIS writer's lock already expired, so future
            // requests ought to recreate the lock.
            if !matches!(
                lock.0.lock_status(),
                LockStatus::Dangling | LockStatus::AgeTimeout
            ) {
                return Locked::Read(lock.read_lock());
            }
            // Dangling: the previous writer quit without unlocking the lock. Requests should
            // compete for the write lock again.
        }

        // slow path: take the shard's write lock to install (or replace) the entry
        let mut table = table.write();
        // check again in case another request already added it
        if let Some(lock) = table.get(&key) {
            if !matches!(
                lock.0.lock_status(),
                LockStatus::Dangling | LockStatus::AgeTimeout
            ) {
                return Locked::Read(lock.read_lock());
            }
        }
        // this request becomes the writer; the stub stays in the table so that
        // subsequent requests become readers on the same lock
        let (permit, stub) =
            WritePermit::new(self.age_timeout_default, stale_writer, Extensions::new());
        table.insert(key, stub);
        Locked::Write(permit)
    }

    fn release(&self, key: &CacheKey, mut permit: WritePermit, reason: LockStatus) {
        let hash = key.combined_bin();
        let key = u128::from_be_bytes(hash); // endianness doesn't matter
        if permit.lock.lock_status() == LockStatus::AgeTimeout {
            // if lock age timed out, then readers are capable of
            // replacing the lock associated with this permit from the lock table
            // (see lock() implementation)
            // keep the lock status as Timeout accordingly when unlocking
            // (because we aren't removing it from the lock_table)
            permit.unlock(LockStatus::AgeTimeout);
        } else if let Some(_lock) = self.lock_table.write(key).remove(&key) {
            permit.unlock(reason);
        }
        // these situations above should capture all possible options,
        // else dangling cache lock may start
    }
}
165
166use log::warn;
167use std::sync::atomic::{AtomicU8, Ordering};
168use std::time::Instant;
169use strum::{FromRepr, IntoStaticStr};
170use tokio::sync::Semaphore;
171
/// Status which the read locks could possibly see.
///
/// NOTE: discriminant values are stable API within this module — they must stay
/// in sync with the `From` conversions to/from `u8` below.
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr, FromRepr)]
#[repr(u8)]
pub enum LockStatus {
    /// Waiting for the writer to populate the asset
    Waiting = 0,
    /// The writer finishes, readers can start
    Done = 1,
    /// The writer encountered error, such as network issue. A new writer will be elected.
    TransientError = 2,
    /// The writer observed that no cache lock is needed (e.g., uncacheable), readers should start
    /// to fetch independently without a new writer
    GiveUp = 3,
    /// The write lock is dropped without being unlocked
    Dangling = 4,
    /// Reader has held onto cache locks for too long, give up
    WaitTimeout = 5,
    /// The lock is held for too long by the writer
    AgeTimeout = 6,
}
192
193impl From<LockStatus> for u8 {
194    fn from(l: LockStatus) -> u8 {
195        match l {
196            LockStatus::Waiting => 0,
197            LockStatus::Done => 1,
198            LockStatus::TransientError => 2,
199            LockStatus::GiveUp => 3,
200            LockStatus::Dangling => 4,
201            LockStatus::WaitTimeout => 5,
202            LockStatus::AgeTimeout => 6,
203        }
204    }
205}
206
207impl From<u8> for LockStatus {
208    fn from(v: u8) -> Self {
209        Self::from_repr(v).unwrap_or(Self::GiveUp)
210    }
211}
212
/// The shared state behind a single cache lock. Wrapped in an [Arc] and viewed
/// through the [ReadLock], [WritePermit], and [LockStub] interfaces below.
#[derive(Debug)]
pub struct LockCore {
    /// When the writer acquired the lock
    pub lock_start: Instant,
    /// How long the writer may hold the lock before readers consider it expired
    pub age_timeout: Duration,
    // readers block on acquire(); unlock() adds permits to wake them
    pub(super) lock: Semaphore,
    // use u8 for Atomic enum
    lock_status: AtomicU8,
    // whether this writer is revalidating a stale asset (vs. a plain miss)
    stale_writer: bool,
    // user data attached at lock creation
    extensions: Extensions,
}
223
impl LockCore {
    /// Create a new lock core in the `Waiting` state, wrapped in an [Arc].
    pub fn new_arc(timeout: Duration, stale_writer: bool, extensions: Extensions) -> Arc<Self> {
        Arc::new(LockCore {
            // zero permits: readers block on acquire() until unlock() adds permits
            lock: Semaphore::new(0),
            age_timeout: timeout,
            lock_start: Instant::now(),
            lock_status: AtomicU8::new(LockStatus::Waiting.into()),
            stale_writer,
            extensions,
        })
    }

    /// Whether the writer still holds the lock (no permits released yet).
    pub fn locked(&self) -> bool {
        self.lock.available_permits() == 0
    }

    /// Release the lock with the final `reason` and wake waiting readers.
    ///
    /// Panics if `reason` is [LockStatus::WaitTimeout]: that status is
    /// reader-local and is never stored in the shared core.
    pub fn unlock(&self, reason: LockStatus) {
        assert!(
            reason != LockStatus::WaitTimeout,
            "WaitTimeout is not stored in LockCore"
        );
        self.lock_status.store(reason.into(), Ordering::SeqCst);
        // Any small positive number will do, 10 is used for RwLock as well.
        // No need to wake up all at once.
        self.lock.add_permits(10);
    }

    /// The current status of the lock.
    pub fn lock_status(&self) -> LockStatus {
        self.lock_status.load(Ordering::SeqCst).into()
    }

    /// Was this lock for a stale cache fetch writer?
    pub fn stale_writer(&self) -> bool {
        self.stale_writer
    }

    /// Extensions attached when the lock was created.
    pub fn extensions(&self) -> &Extensions {
        &self.extensions
    }
}
264
// all 3 structs below are just Arc<LockCore> with different interfaces

/// ReadLock: the requests who get it need to wait until it is released
#[derive(Debug)]
pub struct ReadLock(Arc<LockCore>);
270
271impl ReadLock {
272    /// Wait for the writer to release the lock
273    pub async fn wait(&self) {
274        if !self.locked() {
275            return;
276        }
277
278        // FIXME: for now it is the awkward responsibility of the ReadLock to set the
279        // timeout status on the lock itself because the write permit cannot lock age
280        // timeout on its own
281        // TODO: need to be careful not to wake everyone up at the same time
282        // (maybe not an issue because regular cache lock release behaves that way)
283        if let Some(duration) = self.0.age_timeout.checked_sub(self.0.lock_start.elapsed()) {
284            match timeout(duration, self.0.lock.acquire()).await {
285                Ok(Ok(_)) => { // permit is returned to Semaphore right away
286                }
287                Ok(Err(e)) => {
288                    warn!("error acquiring semaphore {e:?}")
289                }
290                Err(_) => {
291                    self.0
292                        .lock_status
293                        .store(LockStatus::AgeTimeout.into(), Ordering::SeqCst);
294                }
295            }
296        } else {
297            // expiration has already occurred, store timeout status
298            self.0
299                .lock_status
300                .store(LockStatus::AgeTimeout.into(), Ordering::SeqCst);
301        }
302    }
303
304    /// Test if it is still locked
305    pub fn locked(&self) -> bool {
306        self.0.locked()
307    }
308
309    /// Whether the lock is expired, e.g., the writer has been holding the lock for too long
310    pub fn expired(&self) -> bool {
311        // NOTE: this is whether the lock is currently expired
312        // not whether it was timed out during wait()
313        self.0.lock_start.elapsed() >= self.0.age_timeout
314    }
315
316    /// The current status of the lock
317    pub fn lock_status(&self) -> LockStatus {
318        let status = self.0.lock_status();
319        if matches!(status, LockStatus::Waiting) && self.expired() {
320            LockStatus::AgeTimeout
321        } else {
322            status
323        }
324    }
325
326    pub fn extensions(&self) -> &Extensions {
327        self.0.extensions()
328    }
329}
330
/// WritePermit: the request that gets it needs to populate the cache and then release it
#[derive(Debug)]
pub struct WritePermit {
    // the shared lock state this permit controls
    lock: Arc<LockCore>,
    // set once unlock() runs; Drop treats `false` as a dangling lock
    finished: bool,
}
337
338impl WritePermit {
339    /// Create a new lock, with a permit to be given to the associated writer.
340    pub fn new(
341        timeout: Duration,
342        stale_writer: bool,
343        extensions: Extensions,
344    ) -> (WritePermit, LockStub) {
345        let lock = LockCore::new_arc(timeout, stale_writer, extensions);
346        let stub = LockStub(lock.clone());
347        (
348            WritePermit {
349                lock,
350                finished: false,
351            },
352            stub,
353        )
354    }
355
356    /// Was this lock for a stale cache fetch writer?
357    pub fn stale_writer(&self) -> bool {
358        self.lock.stale_writer()
359    }
360
361    pub fn unlock(&mut self, reason: LockStatus) {
362        self.finished = true;
363        self.lock.unlock(reason);
364    }
365
366    pub fn lock_status(&self) -> LockStatus {
367        self.lock.lock_status()
368    }
369
370    pub fn extensions(&self) -> &Extensions {
371        self.lock.extensions()
372    }
373}
374
375impl Drop for WritePermit {
376    fn drop(&mut self) {
377        // Writer exited without properly unlocking. We let others to compete for the write lock again
378        if !self.finished {
379            debug_assert!(false, "Dangling cache lock started!");
380            self.unlock(LockStatus::Dangling);
381        }
382    }
383}
384
385#[derive(Debug)]
386pub struct LockStub(pub Arc<LockCore>);
387impl LockStub {
388    pub fn read_lock(&self) -> ReadLock {
389        ReadLock(self.0.clone())
390    }
391
392    pub fn extensions(&self) -> &Extensions {
393        &self.0.extensions
394    }
395}
396
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    // A released write lock can be re-acquired as a write lock by the next request.
    #[test]
    fn test_get_release() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1000));
        let key1 = CacheKey::new("", "a", "1");
        let locked1 = cache_lock.lock(&key1, false);
        assert!(locked1.is_write()); // write permit
        let locked2 = cache_lock.lock(&key1, false);
        assert!(!locked2.is_write()); // read lock
        if let Locked::Write(permit) = locked1 {
            cache_lock.release(&key1, permit, LockStatus::Done);
        }
        let locked3 = cache_lock.lock(&key1, false);
        assert!(locked3.is_write()); // write permit again
        if let Locked::Write(permit) = locked3 {
            cache_lock.release(&key1, permit, LockStatus::Done);
        }
    }

    // A waiting reader wakes up and observes Done once the writer unlocks.
    #[tokio::test]
    async fn test_lock() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1000));
        let key1 = CacheKey::new("", "a", "1");
        let mut permit = match cache_lock.lock(&key1, false) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        let lock = match cache_lock.lock(&key1, false) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock.locked());
        let handle = tokio::spawn(async move {
            lock.wait().await;
            assert_eq!(lock.lock_status(), LockStatus::Done);
        });
        permit.unlock(LockStatus::Done);
        handle.await.unwrap(); // check lock is unlocked and the task is returned
    }

    // Readers observe AgeTimeout once the writer exceeds the age timeout, and a
    // fresh lock can then replace the expired entry in the table.
    #[tokio::test]
    async fn test_lock_timeout() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");
        let mut permit = match cache_lock.lock(&key1, false) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        let lock = match cache_lock.lock(&key1, false) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock.locked());

        let handle = tokio::spawn(async move {
            // timed out
            lock.wait().await;
            assert_eq!(lock.lock_status(), LockStatus::AgeTimeout);
        });

        // sleep well past the 1s age timeout so the reader gives up
        tokio::time::sleep(Duration::from_millis(2100)).await;

        handle.await.unwrap(); // check lock is timed out

        // expired lock - we will be able to install a new lock instead
        let mut permit2 = match cache_lock.lock(&key1, false) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        let lock2 = match cache_lock.lock(&key1, false) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock2.locked());
        let handle = tokio::spawn(async move {
            // timed out
            lock2.wait().await;
            assert_eq!(lock2.lock_status(), LockStatus::Done);
        });

        permit.unlock(LockStatus::Done);
        permit2.unlock(LockStatus::Done);
        handle.await.unwrap();
    }

    // Releasing an age-timed-out permit leaves the table usable: the key can be
    // locked again afterwards with a fresh Waiting lock.
    #[tokio::test]
    async fn test_lock_expired_release() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");
        let permit = match cache_lock.lock(&key1, false) {
            Locked::Write(w) => w,
            _ => panic!(),
        };

        let lock = match cache_lock.lock(&key1, false) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock.locked());
        let handle = tokio::spawn(async move {
            // timed out
            lock.wait().await;
            assert_eq!(lock.lock_status(), LockStatus::AgeTimeout);
        });

        tokio::time::sleep(Duration::from_millis(1100)).await; // let lock age time out
        handle.await.unwrap(); // check lock is timed out

        // writer finally finishes
        cache_lock.release(&key1, permit, LockStatus::Done);

        // can reacquire after release
        let mut permit = match cache_lock.lock(&key1, false) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        assert_eq!(permit.lock.lock_status(), LockStatus::Waiting);

        let lock2 = match cache_lock.lock(&key1, false) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        assert!(lock2.locked());
        let handle = tokio::spawn(async move {
            // timed out
            lock2.wait().await;
            assert_eq!(lock2.lock_status(), LockStatus::Done);
        });

        permit.unlock(LockStatus::Done);
        handle.await.unwrap();
    }

    // The AgeTimeout status is only stored once a reader observes the expiration
    // inside wait(); an expired writer with no readers still reads Waiting.
    #[tokio::test]
    async fn test_lock_expired_no_reader() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");
        let mut permit = match cache_lock.lock(&key1, false) {
            Locked::Write(w) => w,
            _ => panic!(),
        };
        tokio::time::sleep(Duration::from_millis(1100)).await; // let lock age time out

        // lock expired without reader, but status is not yet set
        assert_eq!(permit.lock.lock_status(), LockStatus::Waiting);

        let lock = match cache_lock.lock(&key1, false) {
            Locked::Read(r) => r,
            _ => panic!(),
        };
        // reader expires write permit
        lock.wait().await;
        assert_eq!(lock.lock_status(), LockStatus::AgeTimeout);
        assert_eq!(permit.lock.lock_status(), LockStatus::AgeTimeout);
        permit.unlock(LockStatus::AgeTimeout);
    }

    // Many tasks competing for the same key all eventually get to be the writer
    // (or see the writer finish) without deadlock.
    #[tokio::test]
    async fn test_lock_concurrent() {
        let _ = env_logger::builder().is_test(true).try_init();
        // Test that concurrent attempts to compete for a lock run without issues
        let cache_lock = Arc::new(CacheLock::new_boxed(Duration::from_secs(1)));
        let key1 = CacheKey::new("", "a", "1");

        let mut handles = vec![];

        const READERS: usize = 30;
        for _ in 0..READERS {
            let key1 = key1.clone();
            let cache_lock = cache_lock.clone();
            // simulate a cache lookup / lock attempt loop
            handles.push(tokio::spawn(async move {
                // timed out
                loop {
                    match cache_lock.lock(&key1, false) {
                        Locked::Write(permit) => {
                            let _ = tokio::time::sleep(Duration::from_millis(5)).await;
                            cache_lock.release(&key1, permit, LockStatus::Done);
                            break;
                        }
                        Locked::Read(r) => {
                            r.wait().await;
                        }
                    }
                }
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }
}