1use std::sync::{
2 atomic::{AtomicPtr, Ordering},
3 Arc,
4};
5
6use crossbeam_channel::{unbounded, Receiver, Sender};
7use wg::WaitGroup;
8
9#[derive(Debug)]
10struct Canceler {
11 tx: AtomicPtr<()>,
12}
13
14impl Canceler {
15 #[inline]
16 fn cancel(&self) {
17 let tx_ptr = self.tx.swap(std::ptr::null_mut(), Ordering::AcqRel);
19
20 if !tx_ptr.is_null() {
22 unsafe {
25 let _ = Box::from_raw(tx_ptr as *mut Sender<()>);
27 }
29 }
30 }
31}
32
33impl Drop for Canceler {
34 fn drop(&mut self) {
35 self.cancel();
36 }
37}
38
/// Receiving side of the cancellation channel.
///
/// Created together with a `Canceler` by `CancelContext::new`; clones of the
/// inner receiver are handed out through `done`.
#[derive(Debug)]
#[repr(transparent)]
struct CancelContext {
    /// Receiver paired with the sender that the `Canceler` owns.
    rx: Receiver<()>,
}
44
45impl CancelContext {
46 fn new() -> (Self, Canceler) {
47 let (tx, rx) = unbounded();
48 (
49 Self { rx },
50 Canceler {
51 tx: AtomicPtr::new(Box::into_raw(Box::new(tx)) as _),
52 },
53 )
54 }
55
56 #[inline]
57 fn done(&self) -> Receiver<()> {
58 self.rx.clone()
59 }
60}
61
/// Handle bundling a cancellation signal with a wait group.
///
/// Cloning is cheap: every clone points at the same shared `CloserInner`
/// through an `Arc`, so all clones signal and wait on the same state.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Closer {
    /// Shared state (wait group, cancel context, canceler).
    inner: Arc<CloserInner>,
}
70
/// State shared by all clones of a `Closer`.
#[derive(Debug)]
struct CloserInner {
    /// Wait group used by `Closer::add_running`, `Closer::done` and
    /// `Closer::wait`.
    wg: WaitGroup,
    /// Receiving side of the cancellation channel, exposed via
    /// `Closer::listen`.
    ctx: CancelContext,
    /// Owner of the sending side; triggered by `Closer::signal` and, through
    /// its `Drop` impl, when this struct is dropped.
    cancel: Canceler,
}
77
78impl CloserInner {
79 #[inline]
80 fn new() -> Self {
81 let (ctx, cancel) = CancelContext::new();
82 Self {
83 wg: WaitGroup::new(),
84 ctx,
85 cancel,
86 }
87 }
88
89 #[inline]
90 fn with(initial: usize) -> Self {
91 let (ctx, cancel) = CancelContext::new();
92 Self {
93 wg: WaitGroup::from(initial),
94 ctx,
95 cancel,
96 }
97 }
98}
99
100impl Default for Closer {
101 fn default() -> Self {
102 Self {
103 inner: Arc::new(CloserInner::new()),
104 }
105 }
106}
107
108impl Closer {
109 #[inline]
111 pub fn new(initial: usize) -> Self {
112 Self {
113 inner: Arc::new(CloserInner::with(initial)),
114 }
115 }
116
117 #[inline]
119 pub fn add_running(&self, running: usize) {
120 self.inner.wg.add(running);
121 }
122
123 #[inline]
125 pub fn done(&self) {
126 self.inner.wg.done();
127 }
128
129 #[inline]
131 pub fn signal(&self) {
132 self.inner.cancel.cancel();
133 }
134
135 #[inline]
138 pub fn wait(&self) {
139 self.inner.wg.wait();
140 }
141
142 #[inline]
144 pub fn signal_and_wait(&self) {
145 self.signal();
146 self.wait();
147 }
148
149 pub fn listen(&self) -> Receiver<()> {
151 self.inner.ctx.done()
152 }
153}