Skip to main content

ntex_util/services/
counter.rs

1use std::{cell::Cell, cell::RefCell, future::poll_fn, rc::Rc, task::Context, task::Poll};
2
3use crate::task::LocalWaker;
4
5/// Simple counter with ability to notify task on reaching specific number
6///
7/// Counter could be cloned, total count is shared across all clones.
8#[derive(Debug)]
9pub struct Counter(usize, Rc<CounterInner>);
10
11#[derive(Debug)]
12struct CounterInner {
13    count: Cell<usize>,
14    capacity: Cell<usize>,
15    tasks: RefCell<slab::Slab<LocalWaker>>,
16}
17
18impl Counter {
19    /// Create `Counter` instance and set max value.
20    pub fn new(capacity: usize) -> Self {
21        let mut tasks = slab::Slab::new();
22        let idx = tasks.insert(LocalWaker::new());
23
24        Counter(
25            idx,
26            Rc::new(CounterInner {
27                count: Cell::new(0),
28                capacity: Cell::new(capacity),
29                tasks: RefCell::new(tasks),
30            }),
31        )
32    }
33
34    /// Get counter guard.
35    pub fn get(&self) -> CounterGuard {
36        CounterGuard::new(self.1.clone())
37    }
38
39    /// Set counter capacity
40    pub fn set_capacity(&self, cap: usize) {
41        self.1.capacity.set(cap);
42        self.1.notify();
43    }
44
45    /// Check is counter has free capacity.
46    pub fn is_available(&self) -> bool {
47        self.1.count.get() < self.1.capacity.get()
48    }
49
50    /// Waits until the counter has free capacity.
51    ///
52    /// Returns immediately if there is capacity available. Otherwise,
53    /// registers the current task for wakeup and waits until a slot is freed.
54    pub async fn available(&self) {
55        poll_fn(|cx| {
56            if self.poll_available(cx) {
57                Poll::Ready(())
58            } else {
59                Poll::Pending
60            }
61        })
62        .await;
63    }
64
65    /// Waits until the counter reaches its capacity (i.e., becomes unavailable).
66    pub async fn unavailable(&self) {
67        poll_fn(|cx| {
68            if self.poll_available(cx) {
69                Poll::Pending
70            } else {
71                Poll::Ready(())
72            }
73        })
74        .await;
75    }
76
77    /// Check if counter is not at capacity. If counter at capacity
78    /// it registers notification for current task.
79    fn poll_available(&self, cx: &mut Context<'_>) -> bool {
80        if self.1.count.get() < self.1.capacity.get() {
81            true
82        } else {
83            let tasks = self.1.tasks.borrow();
84            tasks[self.0].register(cx.waker());
85            false
86        }
87    }
88
89    /// Get total number of acquired counts
90    pub fn total(&self) -> usize {
91        self.1.count.get()
92    }
93}
94
95impl Clone for Counter {
96    fn clone(&self) -> Self {
97        let idx = self.1.tasks.borrow_mut().insert(LocalWaker::new());
98        Self(idx, self.1.clone())
99    }
100}
101
102impl Drop for Counter {
103    fn drop(&mut self) {
104        self.1.tasks.borrow_mut().remove(self.0);
105    }
106}
107
108#[derive(Debug)]
109pub struct CounterGuard(Rc<CounterInner>);
110
111impl CounterGuard {
112    fn new(inner: Rc<CounterInner>) -> Self {
113        inner.inc();
114        CounterGuard(inner)
115    }
116}
117
118impl Unpin for CounterGuard {}
119
120impl Drop for CounterGuard {
121    fn drop(&mut self) {
122        self.0.dec();
123    }
124}
125
126impl CounterInner {
127    fn inc(&self) {
128        self.count.set(self.count.get() + 1);
129    }
130
131    fn dec(&self) {
132        let num = self.count.get();
133        self.count.set(num - 1);
134        if num == self.capacity.get() {
135            self.notify();
136        }
137    }
138
139    fn notify(&self) {
140        let tasks = self.tasks.borrow();
141        for (_, task) in &*tasks {
142            task.wake();
143        }
144    }
145}