ntex_util/services/
counter.rs

1use std::{cell::Cell, cell::RefCell, future::poll_fn, rc::Rc, task::Context, task::Poll};
2
3use crate::task::LocalWaker;
4
5/// Simple counter with ability to notify task on reaching specific number
6///
7/// Counter could be cloned, total count is shared across all clones.
8#[derive(Debug)]
9pub struct Counter(usize, Rc<CounterInner>);
10
11#[derive(Debug)]
12struct CounterInner {
13    count: Cell<usize>,
14    capacity: Cell<usize>,
15    tasks: RefCell<slab::Slab<LocalWaker>>,
16}
17
18impl Counter {
19    /// Create `Counter` instance and set max value.
20    pub fn new(capacity: usize) -> Self {
21        let mut tasks = slab::Slab::new();
22        let idx = tasks.insert(LocalWaker::new());
23
24        Counter(
25            idx,
26            Rc::new(CounterInner {
27                count: Cell::new(0),
28                capacity: Cell::new(capacity),
29                tasks: RefCell::new(tasks),
30            }),
31        )
32    }
33
34    /// Get counter guard.
35    pub fn get(&self) -> CounterGuard {
36        CounterGuard::new(self.1.clone())
37    }
38
39    /// Set counter capacity
40    pub fn set_capacity(&self, cap: usize) {
41        self.1.capacity.set(cap);
42        self.1.notify();
43    }
44
45    /// Check is counter has free capacity.
46    pub fn is_available(&self) -> bool {
47        self.1.count.get() < self.1.capacity.get()
48    }
49
50    /// Check if counter is not at capacity. If counter at capacity
51    /// it registers notification for current task.
52    pub async fn available(&self) {
53        poll_fn(|cx| {
54            if self.poll_available(cx) {
55                Poll::Ready(())
56            } else {
57                Poll::Pending
58            }
59        })
60        .await
61    }
62
63    /// Wait untile counter becomes at capacity.
64    pub async fn unavailable(&self) {
65        poll_fn(|cx| {
66            if self.poll_available(cx) {
67                Poll::Pending
68            } else {
69                Poll::Ready(())
70            }
71        })
72        .await
73    }
74
75    /// Check if counter is not at capacity. If counter at capacity
76    /// it registers notification for current task.
77    fn poll_available(&self, cx: &mut Context<'_>) -> bool {
78        let tasks = self.1.tasks.borrow();
79        tasks[self.0].register(cx.waker());
80        self.1.count.get() < self.1.capacity.get()
81    }
82
83    /// Get total number of acquired counts
84    pub fn total(&self) -> usize {
85        self.1.count.get()
86    }
87}
88
89impl Clone for Counter {
90    fn clone(&self) -> Self {
91        let idx = self.1.tasks.borrow_mut().insert(LocalWaker::new());
92        Self(idx, self.1.clone())
93    }
94}
95
96impl Drop for Counter {
97    fn drop(&mut self) {
98        self.1.tasks.borrow_mut().remove(self.0);
99    }
100}
101
102#[derive(Debug)]
103pub struct CounterGuard(Rc<CounterInner>);
104
105impl CounterGuard {
106    fn new(inner: Rc<CounterInner>) -> Self {
107        inner.inc();
108        CounterGuard(inner)
109    }
110}
111
112impl Unpin for CounterGuard {}
113
114impl Drop for CounterGuard {
115    fn drop(&mut self) {
116        self.0.dec();
117    }
118}
119
120impl CounterInner {
121    fn inc(&self) {
122        let num = self.count.get() + 1;
123        self.count.set(num);
124        if num == self.capacity.get() {
125            self.notify();
126        }
127    }
128
129    fn dec(&self) {
130        let num = self.count.get();
131        self.count.set(num - 1);
132        if num == self.capacity.get() {
133            self.notify();
134        }
135    }
136
137    fn notify(&self) {
138        let tasks = self.tasks.borrow();
139        for (_, task) in &*tasks {
140            task.wake()
141        }
142    }
143}