1use crate::{hashtable::ConcurrentHashTable, key::CacheHashKey, CacheKey};
18
19use pingora_timeout::timeout;
20use std::sync::Arc;
21use std::time::Duration;
22
23pub type CacheKeyLockImpl = (dyn CacheKeyLock + Send + Sync);
24
25pub trait CacheKeyLock {
26 fn lock(&self, key: &CacheKey) -> Locked;
31
32 fn release(&self, key: &CacheKey, permit: WritePermit, reason: LockStatus);
37}
38
/// Number of shards used for the lock table inside `CacheLock`.
const N_SHARDS: usize = 16;
40
41#[derive(Debug)]
43pub struct CacheLock {
44 lock_table: ConcurrentHashTable<LockStub, N_SHARDS>,
45 age_timeout_default: Duration,
47}
48
49#[derive(Debug)]
51pub enum Locked {
52 Write(WritePermit),
54 Read(ReadLock),
56}
57
58impl Locked {
59 pub fn is_write(&self) -> bool {
61 matches!(self, Self::Write(_))
62 }
63}
64
65impl CacheLock {
66 pub fn new_boxed(age_timeout: Duration) -> Box<Self> {
72 Box::new(CacheLock {
73 lock_table: ConcurrentHashTable::new(),
74 age_timeout_default: age_timeout,
75 })
76 }
77
78 pub fn new(age_timeout_default: Duration) -> Self {
84 CacheLock {
85 lock_table: ConcurrentHashTable::new(),
86 age_timeout_default,
87 }
88 }
89}
90
91impl CacheKeyLock for CacheLock {
92 fn lock(&self, key: &CacheKey) -> Locked {
93 let hash = key.combined_bin();
94 let key = u128::from_be_bytes(hash); let table = self.lock_table.get(key);
96 if let Some(lock) = table.read().get(&key) {
97 if !matches!(
105 lock.0.lock_status(),
106 LockStatus::Dangling | LockStatus::Timeout
107 ) {
108 return Locked::Read(lock.read_lock());
109 }
110 }
113
114 let mut table = table.write();
115 if let Some(lock) = table.get(&key) {
117 if !matches!(
118 lock.0.lock_status(),
119 LockStatus::Dangling | LockStatus::Timeout
120 ) {
121 return Locked::Read(lock.read_lock());
122 }
123 }
124 let (permit, stub) = WritePermit::new(self.age_timeout_default);
125 table.insert(key, stub);
126 Locked::Write(permit)
127 }
128
129 fn release(&self, key: &CacheKey, mut permit: WritePermit, reason: LockStatus) {
130 let hash = key.combined_bin();
131 let key = u128::from_be_bytes(hash); if permit.lock.lock_status() == LockStatus::Timeout {
133 permit.unlock(LockStatus::Timeout);
139 } else if let Some(_lock) = self.lock_table.write(key).remove(&key) {
140 permit.unlock(reason);
141 }
142 }
145}
146
147use log::warn;
148use std::sync::atomic::{AtomicU8, Ordering};
149use std::time::Instant;
150use strum::IntoStaticStr;
151use tokio::sync::Semaphore;
152
153#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)]
155pub enum LockStatus {
156 Waiting,
158 Done,
160 TransientError,
162 GiveUp,
165 Dangling,
167 Timeout,
169}
170
171impl From<LockStatus> for u8 {
172 fn from(l: LockStatus) -> u8 {
173 match l {
174 LockStatus::Waiting => 0,
175 LockStatus::Done => 1,
176 LockStatus::TransientError => 2,
177 LockStatus::GiveUp => 3,
178 LockStatus::Dangling => 4,
179 LockStatus::Timeout => 5,
180 }
181 }
182}
183
184impl From<u8> for LockStatus {
185 fn from(v: u8) -> Self {
186 match v {
187 0 => Self::Waiting,
188 1 => Self::Done,
189 2 => Self::TransientError,
190 3 => Self::GiveUp,
191 4 => Self::Dangling,
192 5 => Self::Timeout,
193 _ => Self::GiveUp, }
195 }
196}
197
198#[derive(Debug)]
199pub struct LockCore {
200 pub lock_start: Instant,
201 pub age_timeout: Duration,
202 pub(super) lock: Semaphore,
203 lock_status: AtomicU8,
205}
206
207impl LockCore {
208 pub fn new_arc(timeout: Duration) -> Arc<Self> {
209 Arc::new(LockCore {
210 lock: Semaphore::new(0),
211 age_timeout: timeout,
212 lock_start: Instant::now(),
213 lock_status: AtomicU8::new(LockStatus::Waiting.into()),
214 })
215 }
216
217 pub fn locked(&self) -> bool {
218 self.lock.available_permits() == 0
219 }
220
221 pub fn unlock(&self, reason: LockStatus) {
222 self.lock_status.store(reason.into(), Ordering::SeqCst);
223 self.lock.add_permits(10);
226 }
227
228 pub fn lock_status(&self) -> LockStatus {
229 self.lock_status.load(Ordering::SeqCst).into()
230 }
231}
232
233#[derive(Debug)]
237pub struct ReadLock(Arc<LockCore>);
238
239impl ReadLock {
240 pub async fn wait(&self) {
242 if !self.locked() {
243 return;
244 }
245
246 if let Some(duration) = self.0.age_timeout.checked_sub(self.0.lock_start.elapsed()) {
252 match timeout(duration, self.0.lock.acquire()).await {
253 Ok(Ok(_)) => { }
255 Ok(Err(e)) => {
256 warn!("error acquiring semaphore {e:?}")
257 }
258 Err(_) => {
259 self.0
260 .lock_status
261 .store(LockStatus::Timeout.into(), Ordering::SeqCst);
262 }
263 }
264 } else {
265 self.0
267 .lock_status
268 .store(LockStatus::Timeout.into(), Ordering::SeqCst);
269 }
270 }
271
272 pub fn locked(&self) -> bool {
274 self.0.locked()
275 }
276
277 pub fn expired(&self) -> bool {
279 self.0.lock_start.elapsed() >= self.0.age_timeout
282 }
283
284 pub fn lock_status(&self) -> LockStatus {
286 let status = self.0.lock_status();
287 if matches!(status, LockStatus::Waiting) && self.expired() {
288 LockStatus::Timeout
289 } else {
290 status
291 }
292 }
293}
294
295#[derive(Debug)]
297pub struct WritePermit {
298 lock: Arc<LockCore>,
299 finished: bool,
300}
301
302impl WritePermit {
303 pub fn new(timeout: Duration) -> (WritePermit, LockStub) {
304 let lock = LockCore::new_arc(timeout);
305 let stub = LockStub(lock.clone());
306 (
307 WritePermit {
308 lock,
309 finished: false,
310 },
311 stub,
312 )
313 }
314
315 pub fn unlock(&mut self, reason: LockStatus) {
316 self.finished = true;
317 self.lock.unlock(reason);
318 }
319}
320
321impl Drop for WritePermit {
322 fn drop(&mut self) {
323 if !self.finished {
325 debug_assert!(false, "Dangling cache lock started!");
326 self.unlock(LockStatus::Dangling);
327 }
328 }
329}
330
331#[derive(Debug)]
332pub struct LockStub(pub Arc<LockCore>);
333impl LockStub {
334 pub fn read_lock(&self) -> ReadLock {
335 ReadLock(self.0.clone())
336 }
337}
338
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    /// Lock `key` and return the write permit; panics if a read lock is returned.
    fn must_write(cache_lock: &CacheLock, key: &CacheKey) -> WritePermit {
        match cache_lock.lock(key) {
            Locked::Write(permit) => permit,
            Locked::Read(_) => panic!(),
        }
    }

    /// Lock `key` and return the read lock; panics if a write permit is returned.
    fn must_read(cache_lock: &CacheLock, key: &CacheKey) -> ReadLock {
        match cache_lock.lock(key) {
            Locked::Read(reader) => reader,
            Locked::Write(_) => panic!(),
        }
    }

    #[test]
    fn test_get_release() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1000));
        let key1 = CacheKey::new("", "a", "1");

        // The first caller gets the write permit.
        let first = cache_lock.lock(&key1);
        assert!(first.is_write());

        // A second caller on the same key only gets a read lock.
        let second = cache_lock.lock(&key1);
        assert!(!second.is_write());

        if let Locked::Write(permit) = first {
            cache_lock.release(&key1, permit, LockStatus::Done);
        }

        // After the release the key can be write-locked again.
        let third = cache_lock.lock(&key1);
        assert!(third.is_write());
        if let Locked::Write(permit) = third {
            cache_lock.release(&key1, permit, LockStatus::Done);
        }
    }

    #[tokio::test]
    async fn test_lock() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1000));
        let key1 = CacheKey::new("", "a", "1");

        let mut permit = must_write(&cache_lock, &key1);
        let reader = must_read(&cache_lock, &key1);
        assert!(reader.locked());

        let handle = tokio::spawn(async move {
            reader.wait().await;
            assert_eq!(reader.lock_status(), LockStatus::Done);
        });

        permit.unlock(LockStatus::Done);
        // The reader task must observe the unlock.
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn test_lock_timeout() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");

        let mut permit = must_write(&cache_lock, &key1);
        let reader = must_read(&cache_lock, &key1);
        assert!(reader.locked());

        let handle = tokio::spawn(async move {
            // The writer never unlocks in time, so the wait times out.
            reader.wait().await;
            assert_eq!(reader.lock_status(), LockStatus::Timeout);
        });

        tokio::time::sleep(Duration::from_millis(2100)).await;

        handle.await.unwrap();

        // The timed-out lock can be replaced by a new one.
        let mut permit2 = must_write(&cache_lock, &key1);
        let reader2 = must_read(&cache_lock, &key1);
        assert!(reader2.locked());

        let handle = tokio::spawn(async move {
            reader2.wait().await;
            assert_eq!(reader2.lock_status(), LockStatus::Done);
        });

        permit.unlock(LockStatus::Done);
        permit2.unlock(LockStatus::Done);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn test_lock_expired_release() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");

        let permit = must_write(&cache_lock, &key1);
        let reader = must_read(&cache_lock, &key1);
        assert!(reader.locked());

        let handle = tokio::spawn(async move {
            // The writer never unlocks in time, so the wait times out.
            reader.wait().await;
            assert_eq!(reader.lock_status(), LockStatus::Timeout);
        });

        tokio::time::sleep(Duration::from_millis(1100)).await;
        handle.await.unwrap();

        // Releasing a timed-out permit must still allow a fresh lock afterwards.
        cache_lock.release(&key1, permit, LockStatus::Done);

        let mut permit = must_write(&cache_lock, &key1);
        assert_eq!(permit.lock.lock_status(), LockStatus::Waiting);

        let reader2 = must_read(&cache_lock, &key1);
        assert!(reader2.locked());

        let handle = tokio::spawn(async move {
            reader2.wait().await;
            assert_eq!(reader2.lock_status(), LockStatus::Done);
        });

        permit.unlock(LockStatus::Done);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn test_lock_expired_no_reader() {
        let cache_lock = CacheLock::new_boxed(Duration::from_secs(1));
        let key1 = CacheKey::new("", "a", "1");

        let mut permit = must_write(&cache_lock, &key1);
        tokio::time::sleep(Duration::from_millis(1100)).await;
        // With no reader having waited, the stored status is still `Waiting`.
        assert_eq!(permit.lock.lock_status(), LockStatus::Waiting);

        let reader = must_read(&cache_lock, &key1);
        // The lock is already expired, so waiting marks it as timed out.
        reader.wait().await;
        assert_eq!(reader.lock_status(), LockStatus::Timeout);
        assert_eq!(permit.lock.lock_status(), LockStatus::Timeout);
        permit.unlock(LockStatus::Timeout);
    }

    #[tokio::test]
    async fn test_lock_concurrent() {
        let _ = env_logger::builder().is_test(true).try_init();
        let cache_lock = Arc::new(CacheLock::new_boxed(Duration::from_secs(1)));
        let key1 = CacheKey::new("", "a", "1");

        const READERS: usize = 30;
        let mut handles = vec![];

        for _ in 0..READERS {
            let key1 = key1.clone();
            let cache_lock = cache_lock.clone();
            handles.push(tokio::spawn(async move {
                // Keep trying until this task has held the write permit once.
                loop {
                    match cache_lock.lock(&key1) {
                        Locked::Write(permit) => {
                            let _ = tokio::time::sleep(Duration::from_millis(5)).await;
                            cache_lock.release(&key1, permit, LockStatus::Done);
                            break;
                        }
                        Locked::Read(r) => {
                            r.wait().await;
                        }
                    }
                }
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }
}