datafusion_execution/cache/
lru_queue.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    collections::HashMap,
20    hash::Hash,
21    sync::{Arc, Weak},
22};
23
24use parking_lot::Mutex;
25
26#[derive(Default)]
27/// Provides a Least Recently Used queue with unbounded capacity.
28///
29/// # Examples
30///
31/// ```
32/// use datafusion_execution::cache::lru_queue::LruQueue;
33///
34/// let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();
35/// lru_queue.put(1, 10);
36/// lru_queue.put(2, 20);
37/// lru_queue.put(3, 30);
38/// assert_eq!(lru_queue.get(&2), Some(&20));
39/// assert_eq!(lru_queue.pop(), Some((1, 10)));
40/// assert_eq!(lru_queue.pop(), Some((3, 30)));
41/// assert_eq!(lru_queue.pop(), Some((2, 20)));
42/// assert_eq!(lru_queue.pop(), None);
43/// ```
44pub struct LruQueue<K: Eq + Hash + Clone, V> {
45    data: LruData<K, V>,
46    queue: LruList<K>,
47}
48
49/// Maps the key to the [`LruNode`] in queue and the value.
50type LruData<K, V> = HashMap<K, (Arc<Mutex<LruNode<K>>>, V)>;
51
52#[derive(Default)]
53/// Doubly-linked list that maintains the LRU order
54struct LruList<K> {
55    head: Link<K>,
56    tail: Link<K>,
57}
58
59/// Doubly-linked list node.
60struct LruNode<K> {
61    key: K,
62    prev: Link<K>,
63    next: Link<K>,
64}
65
66/// Weak pointer to a [`LruNode`], used to connect nodes in the doubly-linked list.
67/// The strong reference is guaranteed to be stored in the `data` map of the [`LruQueue`].
68type Link<K> = Option<Weak<Mutex<LruNode<K>>>>;
69
70impl<K: Eq + Hash + Clone, V> LruQueue<K, V> {
71    pub fn new() -> Self {
72        Self {
73            data: HashMap::new(),
74            queue: LruList {
75                head: None,
76                tail: None,
77            },
78        }
79    }
80
81    /// Returns a reference to value mapped by `key`, if it exists.
82    /// If the entry exists, it becomes the most recently used.
83    pub fn get(&mut self, key: &K) -> Option<&V> {
84        if let Some(value) = self.remove(key) {
85            self.put(key.clone(), value);
86        }
87        self.data.get(key).map(|(_, value)| value)
88    }
89
90    /// Returns a reference to value mapped by `key`, if it exists.
91    /// Does not affect the queue order.
92    pub fn peek(&self, key: &K) -> Option<&V> {
93        self.data.get(key).map(|(_, value)| value)
94    }
95
96    /// Checks whether there is an entry with key `key` in the queue.
97    /// Does not affect the queue order.
98    pub fn contains_key(&self, key: &K) -> bool {
99        self.data.contains_key(key)
100    }
101
102    /// Inserts an entry in the queue, becoming the most recently used.
103    /// If the entry already exists, returns the previous value.
104    pub fn put(&mut self, key: K, value: V) -> Option<V> {
105        let old_value = self.remove(&key);
106
107        let node = Arc::new(Mutex::new(LruNode {
108            key: key.clone(),
109            prev: None,
110            next: None,
111        }));
112
113        match self.queue.head {
114            // queue is not empty
115            Some(ref old_head) => {
116                old_head
117                    .upgrade()
118                    .expect("value has been unexpectedly dropped")
119                    .lock()
120                    .prev = Some(Arc::downgrade(&node));
121                node.lock().next = Some(Weak::clone(old_head));
122                self.queue.head = Some(Arc::downgrade(&node));
123            }
124            // queue is empty
125            _ => {
126                self.queue.head = Some(Arc::downgrade(&node));
127                self.queue.tail = Some(Arc::downgrade(&node));
128            }
129        }
130
131        self.data.insert(key, (node, value));
132
133        old_value
134    }
135
136    /// Removes and returns the least recently used value.
137    /// Returns `None` if the queue is empty.
138    pub fn pop(&mut self) -> Option<(K, V)> {
139        let key_to_remove = self.queue.tail.as_ref().map(|n| {
140            n.upgrade()
141                .expect("value has been unexpectedly dropped")
142                .lock()
143                .key
144                .clone()
145        });
146        if let Some(k) = key_to_remove {
147            let value = self.remove(&k).unwrap(); // confirmed above that the entry exists
148            Some((k, value))
149        } else {
150            None
151        }
152    }
153
154    /// Removes a specific entry from the queue, if it exists.
155    pub fn remove(&mut self, key: &K) -> Option<V> {
156        if let Some((old_node, old_value)) = self.data.remove(key) {
157            let LruNode { key: _, prev, next } = &*old_node.lock();
158            match (prev, next) {
159                // single node in the queue
160                (None, None) => {
161                    self.queue.head = None;
162                    self.queue.tail = None;
163                }
164                // removed the head node
165                (None, Some(n)) => {
166                    let n_strong =
167                        n.upgrade().expect("value has been unexpectedly dropped");
168                    n_strong.lock().prev = None;
169                    self.queue.head = Some(Weak::clone(n));
170                }
171                // removed the tail node
172                (Some(p), None) => {
173                    let p_strong =
174                        p.upgrade().expect("value has been unexpectedly dropped");
175                    p_strong.lock().next = None;
176                    self.queue.tail = Some(Weak::clone(p));
177                }
178                // removed a middle node
179                (Some(p), Some(n)) => {
180                    let n_strong =
181                        n.upgrade().expect("value has been unexpectedly dropped");
182                    let p_strong =
183                        p.upgrade().expect("value has been unexpectedly dropped");
184                    n_strong.lock().prev = Some(Weak::clone(p));
185                    p_strong.lock().next = Some(Weak::clone(n));
186                }
187            };
188            Some(old_value)
189        } else {
190            None
191        }
192    }
193
194    /// Returns the number of entries in the queue.
195    pub fn len(&self) -> usize {
196        self.data.len()
197    }
198
199    /// Checks whether the queue has no items.
200    pub fn is_empty(&self) -> bool {
201        self.data.is_empty()
202    }
203
204    /// Removes all entries from the queue.
205    pub fn clear(&mut self) {
206        self.queue.head = None;
207        self.queue.tail = None;
208        self.data.clear();
209    }
210
211    /// Returns a reference to the entries currently in the queue.
212    pub fn list_entries(&self) -> HashMap<&K, &V> {
213        self.data.iter().map(|(k, (_, v))| (k, v)).collect()
214    }
215}
216
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rand::seq::IndexedRandom;

    use crate::cache::lru_queue::LruQueue;

    #[test]
    fn test_get() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // value does not exist
        assert_eq!(lru_queue.get(&1), None);

        // value exists
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.get(&1), Some(&10));
        assert_eq!(lru_queue.get(&1), Some(&10));

        // value is removed
        lru_queue.remove(&1);
        assert_eq!(lru_queue.get(&1), None);
    }

    #[test]
    fn test_peek() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // value does not exist
        assert_eq!(lru_queue.peek(&1), None);

        // value exists
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.peek(&1), Some(&10));
        assert_eq!(lru_queue.peek(&1), Some(&10));

        // value is removed
        lru_queue.remove(&1);
        assert_eq!(lru_queue.peek(&1), None);
    }

    #[test]
    fn test_put() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // no previous value
        assert_eq!(lru_queue.put(1, 10), None);

        // update, the previous value is returned
        assert_eq!(lru_queue.put(1, 11), Some(10));
        assert_eq!(lru_queue.put(1, 12), Some(11));
        assert_eq!(lru_queue.put(1, 13), Some(12));
    }

    #[test]
    fn test_remove() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // value does not exist
        assert_eq!(lru_queue.remove(&1), None);

        // value exists and is returned
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.remove(&1), Some(10));

        // value does not exist
        assert_eq!(lru_queue.remove(&1), None);
    }

    #[test]
    fn test_contains_key() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // value does not exist
        assert!(!lru_queue.contains_key(&1));

        // value exists
        lru_queue.put(1, 10);
        assert!(lru_queue.contains_key(&1));

        // value is removed
        lru_queue.remove(&1);
        assert!(!lru_queue.contains_key(&1));
    }

    #[test]
    fn test_len() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty
        assert_eq!(lru_queue.len(), 0);

        // puts
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.len(), 1);
        lru_queue.put(2, 20);
        assert_eq!(lru_queue.len(), 2);
        lru_queue.put(3, 30);
        assert_eq!(lru_queue.len(), 3);
        lru_queue.put(1, 11);
        lru_queue.put(3, 31);
        assert_eq!(lru_queue.len(), 3);

        // removes
        lru_queue.remove(&1);
        assert_eq!(lru_queue.len(), 2);
        lru_queue.remove(&1);
        assert_eq!(lru_queue.len(), 2);
        lru_queue.remove(&4);
        assert_eq!(lru_queue.len(), 2);
        lru_queue.remove(&3);
        assert_eq!(lru_queue.len(), 1);
        lru_queue.remove(&2);
        assert_eq!(lru_queue.len(), 0);
        lru_queue.remove(&2);
        assert_eq!(lru_queue.len(), 0);

        // clear
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        assert_eq!(lru_queue.len(), 3);
        lru_queue.clear();
        assert_eq!(lru_queue.len(), 0);
    }

    #[test]
    fn test_is_empty() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty
        assert!(lru_queue.is_empty());

        // puts
        lru_queue.put(1, 10);
        assert!(!lru_queue.is_empty());
        lru_queue.put(2, 20);
        assert!(!lru_queue.is_empty());

        // removes
        lru_queue.remove(&1);
        assert!(!lru_queue.is_empty());
        lru_queue.remove(&1);
        assert!(!lru_queue.is_empty());
        lru_queue.remove(&2);
        assert!(lru_queue.is_empty());

        // clear
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        assert!(!lru_queue.is_empty());
        lru_queue.clear();
        assert!(lru_queue.is_empty());
    }

    #[test]
    fn test_clear() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty
        lru_queue.clear();

        // filled
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        assert_eq!(lru_queue.get(&1), Some(&10));
        assert_eq!(lru_queue.get(&2), Some(&20));
        assert_eq!(lru_queue.get(&3), Some(&30));
        lru_queue.clear();
        assert_eq!(lru_queue.get(&1), None);
        assert_eq!(lru_queue.get(&2), None);
        assert_eq!(lru_queue.get(&3), None);
        assert_eq!(lru_queue.len(), 0);
    }

    #[test]
    fn test_default() {
        // a default-constructed queue behaves like a new one
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::default();
        assert!(lru_queue.is_empty());
        assert_eq!(lru_queue.pop(), None);
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
    }

    #[test]
    fn test_list_entries() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty
        assert!(lru_queue.list_entries().is_empty());

        // puts, including an update of an existing key
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(1, 11);
        assert_eq!(
            lru_queue.list_entries(),
            HashMap::from([(&1, &11), (&2, &20)])
        );

        // 'get' does not change the set of entries
        lru_queue.get(&2);
        assert_eq!(
            lru_queue.list_entries(),
            HashMap::from([(&1, &11), (&2, &20)])
        );

        // removes
        lru_queue.remove(&1);
        assert_eq!(lru_queue.list_entries(), HashMap::from([(&2, &20)]));

        // clear
        lru_queue.clear();
        assert!(lru_queue.list_entries().is_empty());
    }

    #[test]
    fn test_pop() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty queue
        assert_eq!(lru_queue.pop(), None);

        // simplest case
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'get' changes the order
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.get(&2);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), None);

        // multiple 'gets'
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.get(&2);
        lru_queue.get(&3);
        lru_queue.get(&1);
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), None);

        // 'peek' does not change the order
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.peek(&2);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'contains' does not change the order
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.contains_key(&2);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'put' on the same key promotes it
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.put(2, 21);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), Some((2, 21)));
        assert_eq!(lru_queue.pop(), None);

        // multiple 'puts'
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.put(2, 21);
        lru_queue.put(3, 31);
        lru_queue.put(1, 11);
        assert_eq!(lru_queue.pop(), Some((2, 21)));
        assert_eq!(lru_queue.pop(), Some((3, 31)));
        assert_eq!(lru_queue.pop(), Some((1, 11)));
        assert_eq!(lru_queue.pop(), None);

        // 'remove' an element in the middle of the queue
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.remove(&2);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'remove' the LRU
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.remove(&1);
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'remove' the MRU
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.remove(&3);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), None);
    }

    #[test]
    /// Fuzzy test using a hashmap as the base to check the methods.
    fn test_fuzzy() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();
        let mut map: HashMap<i32, i32> = HashMap::new();
        let max_keys = 1_000;
        let methods = ["get", "peek", "put", "remove", "pop", "contains", "len"];
        let mut rng = rand::rng();

        for i in 0..1_000_000 {
            let key = i % max_keys;
            match *methods.choose(&mut rng).unwrap() {
                "get" => assert_eq!(lru_queue.get(&key), map.get(&key)),
                "peek" => assert_eq!(lru_queue.peek(&key), map.get(&key)),
                "put" => assert_eq!(lru_queue.put(key, i), map.insert(key, i)),
                "remove" => assert_eq!(lru_queue.remove(&key), map.remove(&key)),
                "pop" => match lru_queue.pop() {
                    Some((k, v)) => assert_eq!(Some(v), map.remove(&k)),
                    // the queue may only report itself empty when the model is empty too
                    None => assert!(map.is_empty()),
                },
                "contains" => {
                    assert_eq!(lru_queue.contains_key(&key), map.contains_key(&key))
                }
                "len" => assert_eq!(lru_queue.len(), map.len()),
                _ => unreachable!(),
            }
        }

        // draining the queue must yield exactly the entries left in the model
        while let Some((k, v)) = lru_queue.pop() {
            assert_eq!(Some(v), map.remove(&k));
        }
        assert!(map.is_empty());
        assert!(lru_queue.is_empty());
    }
}