ntex_util/services/
counter.rs1use std::{cell::Cell, cell::RefCell, future::poll_fn, rc::Rc, task::Context, task::Poll};
2
3use crate::task::LocalWaker;
4
5#[derive(Debug)]
9pub struct Counter(usize, Rc<CounterInner>);
10
11#[derive(Debug)]
12struct CounterInner {
13 count: Cell<usize>,
14 capacity: Cell<usize>,
15 tasks: RefCell<slab::Slab<LocalWaker>>,
16}
17
18impl Counter {
19 pub fn new(capacity: usize) -> Self {
21 let mut tasks = slab::Slab::new();
22 let idx = tasks.insert(LocalWaker::new());
23
24 Counter(
25 idx,
26 Rc::new(CounterInner {
27 count: Cell::new(0),
28 capacity: Cell::new(capacity),
29 tasks: RefCell::new(tasks),
30 }),
31 )
32 }
33
34 pub fn get(&self) -> CounterGuard {
36 CounterGuard::new(self.1.clone())
37 }
38
39 pub fn set_capacity(&self, cap: usize) {
41 self.1.capacity.set(cap);
42 self.1.notify();
43 }
44
45 pub fn is_available(&self) -> bool {
47 self.1.count.get() < self.1.capacity.get()
48 }
49
50 pub async fn available(&self) {
53 poll_fn(|cx| {
54 if self.poll_available(cx) {
55 Poll::Ready(())
56 } else {
57 Poll::Pending
58 }
59 })
60 .await
61 }
62
63 pub async fn unavailable(&self) {
65 poll_fn(|cx| {
66 if self.poll_available(cx) {
67 Poll::Pending
68 } else {
69 Poll::Ready(())
70 }
71 })
72 .await
73 }
74
75 fn poll_available(&self, cx: &mut Context<'_>) -> bool {
78 let tasks = self.1.tasks.borrow();
79 tasks[self.0].register(cx.waker());
80 self.1.count.get() < self.1.capacity.get()
81 }
82
83 pub fn total(&self) -> usize {
85 self.1.count.get()
86 }
87}
88
89impl Clone for Counter {
90 fn clone(&self) -> Self {
91 let idx = self.1.tasks.borrow_mut().insert(LocalWaker::new());
92 Self(idx, self.1.clone())
93 }
94}
95
96impl Drop for Counter {
97 fn drop(&mut self) {
98 self.1.tasks.borrow_mut().remove(self.0);
99 }
100}
101
102#[derive(Debug)]
103pub struct CounterGuard(Rc<CounterInner>);
104
105impl CounterGuard {
106 fn new(inner: Rc<CounterInner>) -> Self {
107 inner.inc();
108 CounterGuard(inner)
109 }
110}
111
112impl Unpin for CounterGuard {}
113
114impl Drop for CounterGuard {
115 fn drop(&mut self) {
116 self.0.dec();
117 }
118}
119
120impl CounterInner {
121 fn inc(&self) {
122 let num = self.count.get() + 1;
123 self.count.set(num);
124 if num == self.capacity.get() {
125 self.notify();
126 }
127 }
128
129 fn dec(&self) {
130 let num = self.count.get();
131 self.count.set(num - 1);
132 if num == self.capacity.get() {
133 self.notify();
134 }
135 }
136
137 fn notify(&self) {
138 let tasks = self.tasks.borrow();
139 for (_, task) in &*tasks {
140 task.wake()
141 }
142 }
143}