ntex_util/services/
counter.rs1use std::{cell::Cell, cell::RefCell, future::poll_fn, rc::Rc, task::Context, task::Poll};
2
3use crate::task::LocalWaker;
4
5#[derive(Debug)]
9pub struct Counter(usize, Rc<CounterInner>);
10
11#[derive(Debug)]
12struct CounterInner {
13 count: Cell<usize>,
14 capacity: Cell<usize>,
15 tasks: RefCell<slab::Slab<LocalWaker>>,
16}
17
18impl Counter {
19 pub fn new(capacity: usize) -> Self {
21 let mut tasks = slab::Slab::new();
22 let idx = tasks.insert(LocalWaker::new());
23
24 Counter(
25 idx,
26 Rc::new(CounterInner {
27 count: Cell::new(0),
28 capacity: Cell::new(capacity),
29 tasks: RefCell::new(tasks),
30 }),
31 )
32 }
33
34 pub fn get(&self) -> CounterGuard {
36 CounterGuard::new(self.1.clone())
37 }
38
39 pub fn set_capacity(&self, cap: usize) {
41 self.1.capacity.set(cap);
42 self.1.notify();
43 }
44
45 pub fn is_available(&self) -> bool {
47 self.1.count.get() < self.1.capacity.get()
48 }
49
50 pub async fn available(&self) {
55 poll_fn(|cx| {
56 if self.poll_available(cx) {
57 Poll::Ready(())
58 } else {
59 Poll::Pending
60 }
61 })
62 .await;
63 }
64
65 pub async fn unavailable(&self) {
67 poll_fn(|cx| {
68 if self.poll_available(cx) {
69 Poll::Pending
70 } else {
71 Poll::Ready(())
72 }
73 })
74 .await;
75 }
76
77 fn poll_available(&self, cx: &mut Context<'_>) -> bool {
80 if self.1.count.get() < self.1.capacity.get() {
81 true
82 } else {
83 let tasks = self.1.tasks.borrow();
84 tasks[self.0].register(cx.waker());
85 false
86 }
87 }
88
89 pub fn total(&self) -> usize {
91 self.1.count.get()
92 }
93}
94
95impl Clone for Counter {
96 fn clone(&self) -> Self {
97 let idx = self.1.tasks.borrow_mut().insert(LocalWaker::new());
98 Self(idx, self.1.clone())
99 }
100}
101
102impl Drop for Counter {
103 fn drop(&mut self) {
104 self.1.tasks.borrow_mut().remove(self.0);
105 }
106}
107
108#[derive(Debug)]
109pub struct CounterGuard(Rc<CounterInner>);
110
111impl CounterGuard {
112 fn new(inner: Rc<CounterInner>) -> Self {
113 inner.inc();
114 CounterGuard(inner)
115 }
116}
117
118impl Unpin for CounterGuard {}
119
120impl Drop for CounterGuard {
121 fn drop(&mut self) {
122 self.0.dec();
123 }
124}
125
126impl CounterInner {
127 fn inc(&self) {
128 self.count.set(self.count.get() + 1);
129 }
130
131 fn dec(&self) {
132 let num = self.count.get();
133 self.count.set(num - 1);
134 if num == self.capacity.get() {
135 self.notify();
136 }
137 }
138
139 fn notify(&self) {
140 let tasks = self.tasks.borrow();
141 for (_, task) in &*tasks {
142 task.wake();
143 }
144 }
145}