1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
//! Resizable queue implementation

use std::convert::TryInto;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::sync::atomic::{AtomicUsize, Ordering};

use tokio::sync::{Mutex, Semaphore};

use crate::unlimited::Queue as UnlimitedQueue;

/// Queue that is limited in size and supports resizing.
///
/// This queue implementation has the following characteristics:
///
///   - Resizable (`deadqueue::resizable::Queue`)
///   - Based on `deadqueue::unlimited::Queue`
///   - Has limited capacity with back pressure on push
///   - Supports resizing
///   - Enabled via the `resizable` feature in your `Cargo.toml`
pub struct Queue<T> {
    queue: UnlimitedQueue<T>,
    capacity: AtomicUsize,
    push_semaphore: Semaphore,
    resize_mutex: Mutex<()>,
}

impl<T> Queue<T> {
    /// Create new empty queue
    pub fn new(max_size: usize) -> Self {
        Self {
            queue: UnlimitedQueue::new(),
            capacity: AtomicUsize::new(max_size),
            push_semaphore: Semaphore::new(max_size),
            resize_mutex: Mutex::default(),
        }
    }
    /// Get an item from the queue. If the queue is currently empty
    /// this method blocks until an item is available.
    pub async fn pop(&self) -> T {
        let item = self.queue.pop().await;
        self.push_semaphore.add_permits(1);
        item
    }
    /// Try to get an item from the queue. If the queue is currently
    /// empty return None instead.
    pub fn try_pop(&self) -> Option<T> {
        let item = self.queue.try_pop();
        if item.is_some() {
            self.push_semaphore.add_permits(1);
        }
        item
    }
    /// Push an item into the queue
    pub async fn push(&self, item: T) {
        let permit = self.push_semaphore.acquire().await.unwrap();
        self.queue.push(item);
        permit.forget();
    }
    /// Try to push an item to the queue. If the queue is currently
    /// full return the object as `Err<T>`.
    pub fn try_push(&self, item: T) -> Result<(), T> {
        match self.push_semaphore.try_acquire() {
            Ok(permit) => {
                self.queue.push(item);
                permit.forget();
                Ok(())
            }
            Err(_) => Err(item),
        }
    }
    /// Get capacity of the queue (maximum number of items queue can store).
    pub fn capacity(&self) -> usize {
        self.capacity.load(Ordering::Relaxed)
    }
    /// Get current length of queue (number of items currently stored).
    pub fn len(&self) -> usize {
        self.queue.len()
    }
    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
    /// Returns `true` if the queue is full.
    /// **Note:** this can give an incorrect result if a simultaneous push/pop
    /// and resize ocurr while this function is executing. try_push() is the
    /// reccomended and safer mechanism in most circumstances. This method
    /// is provided as a convenience API.
    pub fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }
    /// The number of available items in the queue. If there are no
    /// items in the queue this number can become negative and stores the
    /// number of futures waiting for an item.
    pub fn available(&self) -> isize {
        self.queue.available()
    }
    /// Resize queue. This increases or decreases the queue
    /// capacity accordingly.
    ///
    /// **Note:** Increasing the capacity of a queue happens without
    /// blocking unless a resize operation is already in progress.
    /// Decreasing the capacity can block if there are futures waiting to
    /// push items to the queue.
    pub async fn resize(&self, target_capacity: usize) {
        let _guard = self.resize_mutex.lock().await;
        match target_capacity.cmp(&self.capacity()) {
            std::cmp::Ordering::Greater => {
                let diff = target_capacity - self.capacity();
                self.capacity.fetch_add(diff, Ordering::Relaxed);
                self.push_semaphore.add_permits(diff);
            }
            std::cmp::Ordering::Less => {
                // Shrinking the queue is a bit more involved
                // as there are two cases that need to be covered.
                for _ in target_capacity..self.capacity() {
                    tokio::select! {
                        biased;
                        // If there are push permits available consume
                        // them first making sure no new items are pushed
                        // to the queue.
                        push_permit = self.push_semaphore.acquire() => {
                            push_permit.unwrap().forget();
                        }
                        // If the queue contains more elements than the
                        // target capacity those need to be removed from
                        // the queue.
                        // Important: Call `self.queue.pop` and not
                        // `self.pop` as the former would add permits to
                        // the `push_semaphore` which we don't want to
                        // happen since the queue is being shrinked.
                        _ = self.queue.pop() => {}
                    };
                    self.capacity.fetch_sub(1, Ordering::Relaxed);
                }
            }
            _ => {}
        }
    }
}

impl<T> Debug for Queue<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Queue")
            .field("queue", &self.queue)
            .field("capacity", &self.capacity)
            .field("push_semaphore", &self.push_semaphore)
            .field("resize_mutex", &self.resize_mutex)
            .finish()
    }
}

impl<T> FromIterator<T> for Queue<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let queue = UnlimitedQueue::from_iter(iter);
        let len = queue.len();
        Self {
            queue,
            capacity: len.try_into().unwrap(),
            push_semaphore: Semaphore::new(0),
            resize_mutex: Mutex::default(),
        }
    }
}