1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
// MIT/Apache2 License

//! This crate provides an abstraction for outsourcing a piece of work to another thread. This is a common
//! pattern in not only async programs, but OS-specific programs where it is expected that a certain piece of
//! work is done on a certain thread. To accomplish this, the crate provides a `Two`, short for "Two Way
//! Oneshot" (not a recursive acronym). It is implemented as a sender and a receiver, similar to other channels
//! in the Rust ecosystem. The primary difference is that the sender comes bundled with an input value of a
//! type of your choice, and that the sender and receiver are both consumed upon data transmission.
//!
//! # Example
//!
//! Creates a thread that processes the work of squaring a number.
//!
//! ```
//! use orphan_crippler::two;
//! use std::thread;
//!
//! let (mut sender, receiver) = two::<i32, i32>(5);
//!
//! thread::spawn(move || {
//!     let input = sender.input().unwrap();
//!     let result = input * input;
//!     sender.send::<i32>(result);
//! });
//!
//! assert_eq!(receiver.recv(), 25);
//! ```
//!
//! # Features
//!
//! If the `parking_lot` feature is enabled, this crate uses mutexes from the `parking_lot` crate internally,
//! rather than `std::sync` mutexes. This is recommended if you are using `parking_lot` mutexes elsewhere in your
//! application.

#![deprecated = "use crossbeam_channel instead"]
#![warn(clippy::pedantic)]
#![allow(clippy::match_wild_err_arm)]
#![allow(clippy::single_match_else)]
#![allow(unused_unsafe)]

use std::{
    any::{self, Any},
    cell::UnsafeCell,
    future::Future,
    marker::PhantomData,
    mem::{self, MaybeUninit},
    pin::Pin,
    ptr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll, Waker},
    thread::{self, Thread},
};

#[cfg(not(feature = "parking_lot"))]
use std::sync;

/// Creates a sender and a receiver for the two-way oneshot (`two`) channel.
///
/// # Example
///
/// ```
/// use orphan_crippler::two;
///
/// let (sender, receiver) = two::<i32, i32>(16);
/// ```
#[inline]
pub fn two<I, R: Any + Send>(input: I) -> (Sender<I>, Receiver<R>) {
    let inner = Arc::new(Inner {
        result: UnsafeCell::new(MaybeUninit::uninit()),
        is_result_set: AtomicBool::new(false),
        tow: Mutex::new(ThreadOrWaker::None),
    });

    let sender = Sender {
        inner: inner.clone(),
        input: Some(input),
    };

    let receiver = Receiver {
        inner,
        _marker: PhantomData,
    };

    (sender, receiver)
}

/// Like [`complete`], but accepts an already-boxed `R`, so a value you hold in a `Box` does not have to be
/// boxed a second time.
#[inline]
pub fn complete_boxed<R: Any + Send>(boxed: Box<R>) -> Receiver<R> {
    // The state is created with the result already in place and the flag already raised, so no sender
    // half is involved.
    let resolved = Inner {
        tow: Mutex::new(ThreadOrWaker::None),
        is_result_set: AtomicBool::new(true),
        result: UnsafeCell::new(MaybeUninit::new(boxed)),
    };

    Receiver {
        inner: Arc::new(resolved),
        _marker: PhantomData,
    }
}

/// Builds a receiver whose result is already available, so receiving from it completes right away.
///
/// # Example
///
/// ```
/// use orphan_crippler::complete;
///
/// let recv = complete::<i32>(6);
/// assert_eq!(recv.recv(), 6);
/// ```
#[inline]
pub fn complete<R: Any + Send>(result: R) -> Receiver<R> {
    // The channel stores its result boxed; box it here and defer to the boxed constructor.
    let boxed = Box::new(result);
    complete_boxed(boxed)
}

/* Sender and Receiver structs, for actually using the channel */

/// The sending half of the two-way oneshot channel.
///
/// It carries the input value handed to [`two`] and is consumed by [`Sender::send`] when the result is
/// delivered.
#[must_use]
pub struct Sender<I> {
    // shared channel state, held as a type-erased `InnerGeneric` trait object so that `Sender` does not
    // carry the result type as a generic parameter
    inner: Arc<dyn InnerGeneric + Send + Sync + 'static>,
    // the input for the worker; becomes `None` once it has been taken out through `input()`
    input: Option<I>,
}

impl<I> Sender<I> {
    /// Takes the input value out of this sender.
    ///
    /// The value is moved out, so only the first call returns `Some`; every later call returns `None`.
    ///
    /// # Example
    ///
    /// ```
    /// use orphan_crippler::two;
    ///
    /// let (mut sender, _) = two::<i32, ()>(43);
    /// assert_eq!(sender.input(), Some(43));
    /// ```
    #[inline]
    pub fn input(&mut self) -> Option<I> {
        Option::take(&mut self.input)
    }

    /// Delivers `res` to the receiver and notifies it, consuming the sender.
    ///
    /// # Panics
    ///
    /// Panics if `T` is not the result type the channel was created with.
    ///
    /// # Example
    ///
    /// ```
    /// use orphan_crippler::two;
    ///
    /// let (sender, receiver) = two::<(), i32>(());
    /// sender.send::<i32>(37);
    /// assert_eq!(receiver.recv(), 37);
    /// ```
    #[inline]
    pub fn send<T: Any + Send>(self, res: T) {
        // The shared state is type-erased, so the value travels as `dyn Any` and is downcast on arrival.
        let payload: Box<dyn Any + Send> = Box::new(res);

        // SAFETY: the result is stored before wake() is called, so the receiving side is not told that
        //         a value exists until it is actually in place
        unsafe {
            self.inner.set_result(payload);
        }
        self.inner.wake();
    }
}

/// The receiving half of the two-way oneshot channel.
///
/// The result can be obtained by blocking on [`Receiver::recv`], which consumes the receiver, or by
/// polling the receiver as a [`Future`].
#[must_use]
pub struct Receiver<R> {
    // shared channel state; when built through `two`, the `Sender` holds a handle to the same allocation
    inner: Arc<Inner<R>>,
    // zero-sized marker that mentions the result type `R`
    _marker: PhantomData<Option<R>>,
}

impl<R: Any + Send> Receiver<R> {
    /// Blocks the calling thread until the result has been sent, then returns it.
    ///
    /// # Blocking
    ///
    /// This only returns once a result has been stored. A value of the wrong type is rejected on the
    /// sending side ([`Sender::send`] panics there), so in that case, or if the sender is dropped
    /// without sending, no result is ever stored and this call keeps waiting.
    ///
    /// # Example
    ///
    /// ```
    /// use orphan_crippler::two;
    ///
    /// let (mut sender, receiver) = two::<i32, i32>(2);
    /// let result = sender.input().unwrap() + 3;
    /// sender.send(result);
    /// assert_eq!(receiver.recv(), result);
    /// ```
    #[inline]
    #[must_use]
    pub fn recv(self) -> R {
        // the shared state hands the value back boxed; unbox it for the caller
        *self.inner.park_until_result()
    }

    /// Same as [`Receiver::recv`], under a different name.
    #[inline]
    #[must_use]
    pub fn wait(self) -> R {
        Self::recv(self)
    }
}

impl<R: Any + Send> Future for Receiver<R> {
    type Output = R;

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
        if self.inner.is_result_set() {
            // SAFETY: we know the result is set!
            Poll::Ready(*unsafe { self.inner.get_result() })
        } else {
            *self.inner.tow.lock() = ThreadOrWaker::Waker(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// The state shared between the two halves of a `Two`.
///
/// NOTE(review): no `Drop` impl for this type is visible in this file, and `MaybeUninit` does not drop
/// its contents, so a result that is stored but never taken out is presumably leaked — confirm.
struct Inner<T> {
    // the thread or task waker to notify once the result arrives (`None` when nobody is registered)
    tow: Mutex<ThreadOrWaker>,
    // whether or not "result" is set; stored with `Release` after the write and loaded with `Acquire`
    // before the read
    is_result_set: AtomicBool,
    // the result to be delivered to the user; uninitialized until "is_result_set" becomes true
    result: UnsafeCell<MaybeUninit<Box<T>>>,
}

// SAFETY: in the code visible here, "result" is written once before "is_result_set" is stored and is
//         only read after "is_result_set" has been observed as true; "tow" is only touched through
//         its mutex
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}

impl<T: Any + Send> Inner<T> {
    /// Reports whether a result has been stored (acquire load of the flag).
    #[inline]
    fn is_result_set(&self) -> bool {
        self.is_result_set.load(Ordering::Acquire)
    }

    /// Moves the boxed result out of its slot.
    ///
    /// # Safety
    ///
    /// The slot must already hold a result. This is a bitwise read that does not mark the slot as
    /// empty, so the caller must also make sure the value is taken out only once.
    #[inline]
    unsafe fn get_result(&self) -> Box<T> {
        let slot: *mut MaybeUninit<Box<T>> = self.result.get();
        // SAFETY: per this function's contract the slot is initialized and nobody else is reading or
        //         writing it at this point
        unsafe { ptr::read(slot).assume_init() }
    }

    /// Blocks the current thread until a result has been stored, then moves it out.
    #[inline]
    fn park_until_result(&self) -> Box<T> {
        while !self.is_result_set() {
            // register this thread as the one to unpark
            let me = thread::current();
            *self.tow.lock() = ThreadOrWaker::Thread(me);

            // The result may have been stored while we were registering; check again before parking.
            if self.is_result_set() {
                break;
            }

            // park() may also return without a matching unpark; the loop condition covers that
            thread::park();
        }

        // SAFETY: the loop is only left after the flag has been observed as set
        unsafe { self.get_result() }
    }
}

/// Type-erased interface to the shared channel state, used by `Sender`, which does not carry the result
/// type as a generic parameter.
trait InnerGeneric {
    /// Stores `item` as the channel's result.
    ///
    /// # Safety
    ///
    /// NOTE(review): the implementation in this file writes the result slot without checking whether
    /// it is already filled, so this presumably must be called at most once, before any result has
    /// been set — confirm against callers.
    unsafe fn set_result(&self, item: Box<dyn Any + Send>);
    /// Notifies whoever is registered as waiting for the result, if anyone.
    fn wake(&self);
}

impl<T: Any + Send> InnerGeneric for Inner<T> {
    /// Downcasts `item` to `T`, writes it into the result slot and then raises the flag.
    ///
    /// # Safety
    ///
    /// Must only be called while no result has been set: the slot is overwritten without dropping or
    /// checking any previous contents.
    ///
    /// # Panics
    ///
    /// Panics if `item` is not a `T`. Nothing is written and the flag stays unset in that case.
    #[inline]
    unsafe fn set_result(&self, item: Box<dyn Any + Send>) {
        // first, check to ensure we're using the proper type
        let item = match item.downcast::<T>() {
            Ok(item) => item,
            Err(_) => panic!(
                "Passed item is not of expected type \"{}\"",
                any::type_name::<T>()
            ),
        };

        // SAFETY: only called when is_result_set has yet to be set, so no reader touches the slot yet
        unsafe { ptr::write(self.result.get(), MaybeUninit::new(item)) };
        // Release pairs with the acquire load on the reading side, publishing the write above.
        self.is_result_set.store(true, Ordering::Release);
    }

    /// Notifies the registered thread or task, if any, and clears the registration.
    #[inline]
    fn wake(&self) {
        // Take the registration out under the lock, but let the guard go (end of this statement)
        // before notifying: `Waker::wake` runs caller-supplied code, and running it with the lock held
        // would poison the mutex if it panics and could deadlock if it re-enters this channel.
        let registered = mem::take(&mut *self.tow.lock());

        match registered {
            ThreadOrWaker::None => (),
            ThreadOrWaker::Thread(t) => t.unpark(),
            ThreadOrWaker::Waker(w) => w.wake(),
        }
    }
}

/// Who should be notified when the result arrives.
enum ThreadOrWaker {
    /// Nobody is registered.
    None,
    /// A thread to unpark.
    Thread(Thread),
    /// A task waker to wake.
    Waker(Waker),
}

impl Default for ThreadOrWaker {
    /// The default is the empty registration, [`ThreadOrWaker::None`].
    #[inline]
    fn default() -> Self {
        ThreadOrWaker::None
    }
}

/// Mutex used by the channel when the `parking_lot` feature is enabled: a thin wrapper that exposes the
/// same `new`/`lock` surface as the `std`-backed variant.
#[cfg(feature = "parking_lot")]
#[repr(transparent)]
struct Mutex<T>(parking_lot::Mutex<T>);

#[cfg(feature = "parking_lot")]
impl<T> Mutex<T> {
    /// Wraps `val` in a new mutex.
    #[inline]
    fn new(val: T) -> Self {
        Mutex(parking_lot::Mutex::new(val))
    }

    /// Acquires the lock and returns its guard.
    #[inline]
    fn lock(&self) -> parking_lot::MutexGuard<'_, T> {
        let Mutex(raw) = self;
        raw.lock()
    }
}

/// Mutex used by the channel when the `parking_lot` feature is disabled: a thin wrapper over
/// `std::sync::Mutex` that exposes the same `new`/`lock` surface as the `parking_lot`-backed variant.
#[cfg(not(feature = "parking_lot"))]
struct Mutex<T>(sync::Mutex<T>);

#[cfg(not(feature = "parking_lot"))]
impl<T> Mutex<T> {
    /// Wraps `val` in a new mutex.
    #[inline]
    fn new(val: T) -> Self {
        Self(sync::Mutex::new(val))
    }

    /// Acquires the lock and returns its guard, ignoring poisoning.
    ///
    /// A panic on another thread while the guard was held does not make this call fail. This matches
    /// the `parking_lot` variant, which has no poisoning. It is sound for this crate because the
    /// protected value is only ever replaced as a whole, so it cannot be observed half-updated.
    #[inline]
    fn lock(&self) -> sync::MutexGuard<'_, T> {
        self.0
            .lock()
            .unwrap_or_else(sync::PoisonError::into_inner)
    }
}