1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
#![doc = include_str!("../README.md")]
#![warn(missing_docs, missing_debug_implementations)]

use std::{
    future::Future,
    process::abort,
    sync::atomic::{AtomicBool, AtomicIsize, Ordering},
    task::{Poll, Waker},
    thread::{self, current, Thread},
};

use crossbeam_queue::SegQueue;

/// A HybridFutex is a synchronization primitive that allows threads to wait for
/// a notification from another thread. The HybridFutex maintains a counter that
/// represents the number of waiters, and a queue of waiters. The counter is
/// incremented when a thread calls `wait_sync` or `wait_async` methods, and
/// decremented when a thread calls `notify_one` or `notify_many` methods.
/// A thread calling `wait_sync` or `wait_async` is blocked until it is notified
/// by another thread calling `notify_one` or `notify_many`.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use std::thread;
/// use std::time::Duration;
/// use hybridfutex::HybridFutex;
///
/// let wait_queue = Arc::new(HybridFutex::new());
/// let wait_queue_clone = wait_queue.clone();
///
/// // Spawn a thread that waits for a notification from another thread
/// let handle = thread::spawn(move || {
///     println!("Thread 1 is waiting");
///     wait_queue_clone.wait_sync();
///     println!("Thread 1 is notified");
/// });
///
/// // Wait for a short time before notifying the other thread
/// thread::sleep(Duration::from_millis(100));
///
/// // Notify the other thread
/// wait_queue.notify_one();
///
/// // Wait for the other thread to finish
/// handle.join().unwrap();
/// ```
#[derive(Debug)]
pub struct HybridFutex {
    counter: AtomicIsize,
    queue: SegQueue<Waiter>,
}

impl Default for HybridFutex {
    fn default() -> Self {
        Self::new()
    }
}

impl HybridFutex {
    /// Creates a new HybridFutex with an initial counter of 0 and an empty
    /// queue of waiters.
    ///
    /// # Examples
    ///
    /// ```
    /// use hybridfutex::HybridFutex;
    ///
    /// let wait_queue = HybridFutex::new();
    /// ```
    pub fn new() -> Self {
        Self {
            counter: AtomicIsize::new(0),
            queue: SegQueue::new(),
        }
    }
    /// Returns the current value of the counter of this HybridFutex.
    ///
    /// # Examples
    ///
    /// ```
    /// use hybridfutex::HybridFutex;
    ///
    /// let wait_queue = HybridFutex::new();
    ///
    /// assert_eq!(wait_queue.get_counter(), 0);
    /// ```
    pub fn get_counter(&self) -> isize {
        self.counter.load(Ordering::Relaxed)
    }
    /// Blocks the current thread until it is notified by another thread using
    /// the `notify_one` or `notify_many` method. The method increments the
    /// counter of the HybridFutex to indicate that the current thread is
    /// waiting. If the counter is already negative, the method does not
    /// block the thread and immediately returns.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use std::thread;
    /// use std::time::Duration;
    /// use hybridfutex::HybridFutex;
    ///
    /// let wait_queue = Arc::new(HybridFutex::new());
    /// let wait_queue_clone = wait_queue.clone();
    ///
    /// // Spawn a thread that waits for a notification from another thread
    /// let handle = thread::spawn(move || {
    ///     println!("Thread 1 is waiting");
    ///     wait_queue_clone.wait_sync();
    ///     println!("Thread 1 is notified");
    /// });
    ///
    /// // Wait for a short time before notifying the other thread
    /// thread::sleep(Duration::from_millis(100));
    ///
    /// // Notify the other thread
    /// wait_queue.notify_one();
    ///
    /// // Wait for the other thread to finish
    /// handle.join().unwrap();
    /// ```
    pub fn wait_sync(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::SeqCst);
        if old_counter >= 0 {
            let awaken = AtomicBool::new(false);
            self.queue.push(Waiter::Sync(SyncWaiter {
                awaken: &awaken,
                thread: current(),
            }));
            while {
                thread::park();
                !awaken.load(Ordering::Acquire)
            } {}
        }
    }
    /// Returns a `WaitFuture` that represents a future that resolves when the
    /// current thread is notified by another thread using the `notify_one` or
    /// `notify_many` method. The method increments the counter of the
    /// HybridFutex to indicate that the current thread is waiting.
    /// If the counter is already negative, the future immediately resolves.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use std::thread;
    /// use std::time::Duration;
    /// use hybridfutex::HybridFutex;
    /// use futures::executor::block_on;
    ///
    /// let wait_queue = Arc::new(HybridFutex::new());
    ///
    /// // Spawn a thread that waits for a notification from another thread
    /// let wqc = wait_queue.clone();
    /// let handle = thread::spawn(move || {
    ///     let fut = wqc.wait_async();
    ///     let _ = block_on(fut);
    ///     println!("Thread 1 is notified");
    /// });
    ///
    /// // Wait for a short time before notifying the other thread
    /// thread::sleep(Duration::from_millis(100));
    ///
    /// // Notify the other thread
    /// wait_queue.notify_one();
    ///
    /// // Wait for the other thread to finish
    /// handle.join().unwrap();
    /// ```
    pub fn wait_async(&self) -> WaitFuture {
        WaitFuture {
            state: 0.into(),
            wq: self,
        }
    }

    /// Notifies one waiting thread that is waiting on this HybridFutex using
    /// the `wait_sync` or `wait_async` method. If there is no current waiting
    /// threads, this function call indirectly notifies future call to
    /// `wait_sync` or `wait_async` using the internal counter.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use std::thread;
    /// use std::time::Duration;
    /// use hybridfutex::HybridFutex;
    ///
    /// let wait_queue = Arc::new(HybridFutex::new());
    /// let wait_queue_clone = wait_queue.clone();
    ///
    /// // Spawn a thread that waits for a notification from another thread
    /// let handle = thread::spawn(move || {
    ///     println!("Thread 1 is waiting");
    ///     wait_queue_clone.wait_sync();
    ///     println!("Thread 1 is notified");
    /// });
    ///
    /// // Wait for a short time before notifying the other thread
    /// thread::sleep(Duration::from_millis(100));
    ///
    /// // Notify the other thread
    /// wait_queue.notify_one();
    ///
    /// // Wait for the other thread to finish
    /// handle.join().unwrap();
    /// ```
    pub fn notify_one(&self) {
        let old_counter = self.counter.fetch_sub(1, Ordering::SeqCst);
        if old_counter > 0 {
            loop {
                if let Some(waker) = self.queue.pop() {
                    waker.wake();
                    break;
                }
            }
        }
    }

    /// Notifies a specified number of waiting threads that are waiting on this
    /// HybridFutex using the `wait_sync` or `wait_async` method. If there are
    /// less waiting threads than provided count, it indirectly notifies
    /// futures calls to to `wait_sync` and `wait_async` using the internal
    /// counter.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use std::thread;
    /// use std::time::Duration;
    /// use hybridfutex::HybridFutex;
    ///
    /// let wait_queue = Arc::new(HybridFutex::new());
    ///
    /// // Spawn multiple threads that wait for a notification from another thread
    /// let handles: Vec<_> = (0..3).map(|i| {
    ///     let wait_queue_clone = wait_queue.clone();
    ///     thread::spawn(move || {
    ///         println!("Thread {} is waiting", i);
    ///         wait_queue_clone.wait_sync();
    ///         println!("Thread {} is notified", i);
    ///     })
    /// }).collect();
    ///
    /// // Wait for a short time before notifying the threads
    /// thread::sleep(Duration::from_millis(100));
    ///
    /// // Notify two threads
    /// wait_queue.notify_many(2);
    ///
    /// // Notify single thread
    /// wait_queue.notify_one();
    ///
    /// // Wait for the other threads to finish
    /// for handle in handles {
    ///     handle.join().unwrap();
    /// }
    /// ```
    pub fn notify_many(&self, count: usize) {
        let count = count as isize;
        let old_counter = self.counter.fetch_sub(count, Ordering::SeqCst);
        if old_counter > 0 {
            for _ in 0..old_counter.min(count) {
                loop {
                    if let Some(waker) = self.queue.pop() {
                        waker.wake();
                        break;
                    }
                }
            }
        }
    }
}

enum Waiter {
    Sync(SyncWaiter),
    Async(AsyncWaiter),
}

unsafe impl Send for Waiter {}

impl Waiter {
    fn wake(self) {
        match self {
            Waiter::Sync(w) => w.wake(),
            Waiter::Async(w) => w.wake(),
        }
    }
}

struct SyncWaiter {
    awaken: *const AtomicBool,
    thread: Thread,
}

impl SyncWaiter {
    fn wake(self) {
        unsafe {
            (*self.awaken).store(true, Ordering::Release);
        }
        self.thread.unpark();
    }
}

struct AsyncWaiter {
    state: *const AtomicIsize,
    waker: Waker,
}

impl AsyncWaiter {
    fn wake(self) {
        unsafe {
            (*self.state).store(!0, Ordering::Release);
        }
        self.waker.wake();
    }
}
#[derive(Debug)]
/// A future representing a thread that is waiting for a notification from
/// another thread using a `HybridFutex` synchronization primitive.
pub struct WaitFuture<'a> {
    /// The current state of the future, represented as an `AtomicIsize`.
    /// The value of this field is `0` if the future has not yet been polled,
    /// `1` if the future is waiting for a notification, and `!0` if the future
    /// has been notified.
    state: AtomicIsize,

    /// A reference to the `HybridFutex` that this future is waiting on.
    wq: &'a HybridFutex,
}

impl<'a> Future for WaitFuture<'a> {
    type Output = ();
    /// Polls the future, returning `Poll::Pending` if the future is still waiting
    /// for a notification, and `Poll::Ready(())` if the future has been notified.
    ///
    /// If the future has not yet been polled, this method increments the counter
    /// of the `HybridFutex` that the future is waiting on to indicate that the
    /// current thread is waiting. If the counter is already negative, the future
    /// immediately resolves and returns `Poll::Ready(())`. Otherwise, the method
    /// pushes a new `AsyncWaiter` onto the queue of waiters for the `HybridFutex`,
    /// and returns `Poll::Pending`.
    ///
    /// If the future has already been polled and the value of the `state` field is
    /// `1`, this method simply returns `Poll::Pending` without modifying the state
    /// or the queue of waiters.
    ///
    /// If the future has already been notified and the value of the `state` field
    /// is `!0`, this method returns `Poll::Ready(())` without modifying the state
    /// or the queue of waiters.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<()> {
        match self.state.load(Ordering::Acquire) {
            0 => {
                // If the future has not yet been polled, increment the counter
                // of the HybridFutex and push a new AsyncWaiter onto the queue.
                let old_counter = self.wq.counter.fetch_add(1, Ordering::SeqCst);
                if old_counter >= 0 {
                    self.state.store(1, Ordering::Relaxed);
                    self.wq.queue.push(Waiter::Async(AsyncWaiter {
                        state: &self.state,
                        waker: cx.waker().clone(),
                    }));
                    Poll::Pending
                } else {
                    // If the counter is negative, the future has already been
                    // notified, so set the state to !0 and return Poll::Ready(()).
                    self.state.store(!0, Ordering::Relaxed);
                    Poll::Ready(())
                }
            }
            1 => Poll::Pending,
            _ => Poll::Ready(()),
        }
    }
}

impl<'a> Drop for WaitFuture<'a> {
    /// Drops the future, checking whether it has been polled before and
    /// panicking if it has not. This is to prevent potential memory leaks
    /// if the future is dropped before being polled.
    fn drop(&mut self) {
        if self.state.load(Ordering::Relaxed) == 1 {
            abort();
        }
    }
}