wmark/closer/
std_.rs

1use std::sync::{
2  atomic::{AtomicPtr, Ordering},
3  Arc,
4};
5
6use crossbeam_channel::{unbounded, Receiver, Sender};
7use wg::WaitGroup;
8
9#[derive(Debug)]
10struct Canceler {
11  tx: AtomicPtr<()>,
12}
13
14impl Canceler {
15  #[inline]
16  fn cancel(&self) {
17    // Safely take the sender out of the AtomicPtr.
18    let tx_ptr = self.tx.swap(std::ptr::null_mut(), Ordering::AcqRel);
19
20    // Check if the pointer is not null (indicating it hasn't been taken already).
21    if !tx_ptr.is_null() {
22      // Safe because we ensure that this is the only place that takes ownership of the pointer,
23      // and it is done only once.
24      unsafe {
25        // Convert the pointer back to a Box to take ownership and drop the sender.
26        let _ = Box::from_raw(tx_ptr as *mut Sender<()>);
27        // Sender is dropped here when `_tx_boxed` goes out of scope.
28      }
29    }
30  }
31}
32
33impl Drop for Canceler {
34  fn drop(&mut self) {
35    self.cancel();
36  }
37}
38
39#[derive(Debug)]
40#[repr(transparent)]
41struct CancelContext {
42  rx: Receiver<()>,
43}
44
45impl CancelContext {
46  fn new() -> (Self, Canceler) {
47    let (tx, rx) = unbounded();
48    (
49      Self { rx },
50      Canceler {
51        tx: AtomicPtr::new(Box::into_raw(Box::new(tx)) as _),
52      },
53    )
54  }
55
56  #[inline]
57  fn done(&self) -> Receiver<()> {
58    self.rx.clone()
59  }
60}
61
62/// Closer holds the two things we need to close a thread and wait for it to
63/// finish: a chan to tell the thread to shut down, and a WaitGroup with
64/// which to wait for it to finish shutting down.
65#[derive(Debug, Clone)]
66#[repr(transparent)]
67pub struct Closer {
68  inner: Arc<CloserInner>,
69}
70
71#[derive(Debug)]
72struct CloserInner {
73  wg: WaitGroup,
74  ctx: CancelContext,
75  cancel: Canceler,
76}
77
78impl CloserInner {
79  #[inline]
80  fn new() -> Self {
81    let (ctx, cancel) = CancelContext::new();
82    Self {
83      wg: WaitGroup::new(),
84      ctx,
85      cancel,
86    }
87  }
88
89  #[inline]
90  fn with(initial: usize) -> Self {
91    let (ctx, cancel) = CancelContext::new();
92    Self {
93      wg: WaitGroup::from(initial),
94      ctx,
95      cancel,
96    }
97  }
98}
99
100impl Default for Closer {
101  fn default() -> Self {
102    Self {
103      inner: Arc::new(CloserInner::new()),
104    }
105  }
106}
107
108impl Closer {
109  /// Constructs a new [`Closer`], with an initial count on the [`WaitGroup`].
110  #[inline]
111  pub fn new(initial: usize) -> Self {
112    Self {
113      inner: Arc::new(CloserInner::with(initial)),
114    }
115  }
116
117  /// Adds delta to the [`WaitGroup`].
118  #[inline]
119  pub fn add_running(&self, running: usize) {
120    self.inner.wg.add(running);
121  }
122
123  /// Calls [`WaitGroup::done`] on the [`WaitGroup`].
124  #[inline]
125  pub fn done(&self) {
126    self.inner.wg.done();
127  }
128
129  /// Signals the [`Closer::has_been_closed`] signal.
130  #[inline]
131  pub fn signal(&self) {
132    self.inner.cancel.cancel();
133  }
134
135  /// Waits on the [`WaitGroup`]. (It waits for the Closer's initial value, [`Closer::add_running`], and [`Closer::done`]
136  /// calls to balance out.)
137  #[inline]
138  pub fn wait(&self) {
139    self.inner.wg.wait();
140  }
141
142  /// Calls [`Closer::signal`], then [`Closer::wait`].
143  #[inline]
144  pub fn signal_and_wait(&self) {
145    self.signal();
146    self.wait();
147  }
148
149  /// Listens for the [`Closer::signal`] signal.
150  pub fn listen(&self) -> Receiver<()> {
151    self.inner.ctx.done()
152  }
153}