either_slot/
tuple.rs

1mod utils;
2
3use tuple_list::{Tuple, TupleList};
4
5pub use self::utils::{Concat, Construct, Count, InElement, Index};
6use crate::{array::Element, include::*};
7
8#[derive(Debug)]
9struct Inner<T: InElement> {
10    count: AtomicUsize,
11    place: T::Place,
12}
13
14impl<T: InElement> Inner<T> {
15    const LAYOUT: Layout = Layout::new::<Self>();
16
17    fn new() -> NonNull<Self> {
18        let memory = match Global.allocate(Self::LAYOUT) {
19            Ok(memory) => memory.cast::<Self>(),
20            Err(_) => handle_alloc_error(Self::LAYOUT),
21        };
22        let value = Self {
23            count: AtomicUsize::new(T::TUPLE_LIST_SIZE),
24            place: T::init(),
25        };
26        // SAFETY: We own this fresh uninitialized memory whose layout is the same as
27        // this type.
28        unsafe { memory.as_ptr().write(value) }
29        memory
30    }
31
32    /// # Safety
33    ///
34    /// 1. `this` must own a valid `Inner` uniquely (a.k.a. no other references
35    ///    to the structure), and use an [`Acquire`] fence if atomic ordering is
36    ///    desired.
37    /// 2. The caller must not use `this` again since it is consumed and dropped
38    ///    in this function.
39    /// 3. `TAKEN` must be corresponding to the possible previous operation of
40    ///    taking out all the data in the tuple.
41    unsafe fn drop_in_place(this: NonNull<Self>) -> <T::Take as TupleList>::Tuple {
42        // SAFETY: See contract 1.
43        let inner = unsafe { this.as_ref() };
44
45        let tuple = unsafe { T::take(&inner.place) }.into_tuple();
46
47        // SAFETY: See contract 2.
48        unsafe { Global.deallocate(this.cast(), Self::LAYOUT) };
49
50        tuple
51    }
52}
53
54/// The whole tuple of concatenated head, current and tail tuples.
55pub type Whole<Head, Current, Tail> =
56    <<Head as Concat<(Current,)>>::Output as Concat<Tail>>::Output;
57
58/// The whole tuple list of concatenated head, current and tail tuples.
59pub type List<Head, Current, Tail> = <Whole<Head, Current, Tail> as Tuple>::TupleList;
60
61type Ptr<Head, Current, Tail> = NonNull<Inner<List<Head, Current, Tail>>>;
62
63/// The storage place of the tuple slot senders.
64pub type Place<Head, Current, Tail> = <List<Head, Current, Tail> as InElement>::Place;
65
66/// The corresponding tuple list type of the error type of the returned value of
67/// [`send`](Sender::send).
68pub type TakeList<Head, Current, Tail> = <List<Head, Current, Tail> as InElement>::Take;
69
70/// The error type of the returned value of [`send`](Sender::send); a tuple of
71/// [`Option`]s of stored values.
72pub type Take<Head, Current, Tail> = <TakeList<Head, Current, Tail> as TupleList>::Tuple;
73
74/// The placer of an tuple slot.
75///
76/// The 3 generic represents the position of the target element of the tuple
77/// this sender attempts to place. For example, `Sender<(A, B), C, (D, E, F)>`
78/// represents a sender of `C` in `(A, B, C, D, E, F)`.
79///
80/// The user can only access the slot once by this structure.
81#[derive(Debug)]
82pub struct Sender<Head, Current, Tail>(Ptr<Head, Current, Tail>)
83where
84    Head: Concat<(Current,)>,
85    <Head as Concat<(Current,)>>::Output: Concat<Tail>,
86    Tail: Tuple,
87    <Whole<Head, Current, Tail> as Tuple>::TupleList: InElement;
88
89// SAFETY: We satisfy the contract by exposing no reference to any associated
90// function, and provide an atomic algorithm during its access or dropping
91// process, which satisfies the need of `Send`.
92unsafe impl<Head, Current, Tail> Send for Sender<Head, Current, Tail>
93where
94    Head: Concat<(Current,)> + Send,
95    Current: Send,
96    <Head as Concat<(Current,)>>::Output: Concat<Tail>,
97    Tail: Tuple + Send,
98    <Whole<Head, Current, Tail> as Tuple>::TupleList: InElement,
99{
100}
101
102/// The typenum count of a tuple.
103pub type CountOf<T> = <<T as Tuple>::TupleList as Count>::Count;
104
105impl<Head, Current, Tail> Sender<Head, Current, Tail>
106where
107    Head: Concat<(Current,)>,
108    <Head as Concat<(Current,)>>::Output: Concat<Tail>,
109    Tail: Tuple,
110    <Whole<Head, Current, Tail> as Tuple>::TupleList: InElement,
111{
112    /// # Safety
113    ///
114    /// `inner` must hold a valid immutable reference to `Inner`.
115    unsafe fn new(inner: Ptr<Head, Current, Tail>) -> Self {
116        Sender(inner)
117    }
118
119    /// Place the value into the slot, or obtain the resulting tuple if no
120    /// other senders exist any longer.
121    pub fn send(self, value: Current) -> Result<(), Take<Head, Current, Tail>>
122    where
123        <Head as Tuple>::TupleList: Count,
124        Place<Head, Current, Tail>: Index<CountOf<Head>, Output = Element<Current>>,
125    {
126        let pointer = self.0;
127        // SAFETY: See contract 1 in `Self::new`.
128        let inner = unsafe { pointer.as_ref() };
129        let elem: &Element<Current> = Index::<CountOf<Head>>::index(&inner.place);
130
131        // SAFETY: Each sender has its ownership of one `Element` storage in its
132        // `inner`, and thus the placing is safe. Besides, the appending `Release`
133        // ordering is supplied.
134        unsafe { elem.place(value) };
135        let fetch_sub = inner.count.fetch_sub(1, Release);
136
137        // We don't want to call the dropper anymore because it decreases the reference
138        // count once more.
139        mem::forget(self);
140
141        if fetch_sub == 1 {
142            // SAFETY: We use `Acquire` fence here to observe other executions of placing
143            // values. And since the reference count is now 0, we owns `inner`, so it can be
144            // dropped, returning the tuple safely.
145            atomic::fence(Acquire);
146            return Err(unsafe { Inner::drop_in_place(pointer) });
147        }
148        Ok(())
149    }
150}
151
152impl<Head, Current, Tail> Drop for Sender<Head, Current, Tail>
153where
154    Head: Concat<(Current,)>,
155    <Head as Concat<(Current,)>>::Output: Concat<Tail>,
156    Tail: Tuple,
157    <Whole<Head, Current, Tail> as Tuple>::TupleList: InElement,
158{
159    fn drop(&mut self) {
160        let pointer = self.0;
161        // SAFETY: See contract 1 in `Self::new`.
162        let inner = unsafe { pointer.as_ref() };
163        // No additional ordering is used because we now have no more
164        // observations/modifications to slot values, except...
165        if inner.count.fetch_sub(1, Relaxed) == 1 {
166            // SAFETY: ... we now owns our `inner`.
167            atomic::fence(Acquire);
168            unsafe { Inner::drop_in_place(pointer) };
169        }
170    }
171}
172
173/// Create a tuple slot, and return a tuple of senders targeting their own
174/// respective element in the slot.
175///
176/// # Examples
177///
178/// ```rust
179/// let (s1, s2, s3) = either_slot::tuple::<(&str, u8, char)>();
180/// s1.send("1").unwrap();
181/// s2.send(2).unwrap();
182/// let ret = s3.send('3').unwrap_err();
183/// assert_eq!(ret, (Some("1"), Some(2), Some('3')));
184/// ```
185///
186/// ```rust
187/// let (s1, s2, s3) = either_slot::tuple::<(i32, u8, char)>();
188/// drop(s1);
189/// s3.send('3').unwrap();
190/// let ret = s2.send(2).unwrap_err();
191/// assert_eq!(ret, (None, Some(2), Some('3')));
192/// ```
193pub fn tuple<T>() -> <T::Sender as TupleList>::Tuple
194where
195    T: Construct,
196    <T as Tuple>::TupleList: InElement,
197{
198    let inner = Inner::<T::TupleList>::new();
199    unsafe { T::construct(inner) }.into_tuple()
200}
201
202#[cfg(test)]
203mod tests {
204    #[cfg(not(loom))]
205    use std::thread;
206
207    #[cfg(loom)]
208    use loom::thread;
209
210    use super::tuple;
211
212    #[test]
213    fn send() {
214        fn inner() {
215            let (s1, s2, s3) = tuple::<(i32, u8, char)>();
216            let j2 = thread::spawn(|| s2.send(2));
217            let j3 = thread::spawn(|| s3.send('3'));
218
219            let res = s1.send(1).and(j2.join().unwrap()).and(j3.join().unwrap());
220            assert_eq!(res, Err((Some(1), Some(2), Some('3'))));
221        }
222
223        #[cfg(not(loom))]
224        inner();
225        #[cfg(loom)]
226        loom::model(inner);
227    }
228
229    #[test]
230    fn drop_one() {
231        fn inner() {
232            let (s1, s2, s3) = tuple::<(i32, u8, char)>();
233            let j2 = thread::spawn(|| {
234                drop(s2);
235                Ok(())
236            });
237            let j3 = thread::spawn(|| s3.send('3'));
238
239            let res = s1.send(1).and(j2.join().unwrap()).and(j3.join().unwrap());
240            if let Err(r) = res {
241                assert_eq!(r, (Some(1), None, Some('3')));
242            }
243        }
244
245        #[cfg(not(loom))]
246        inner();
247        #[cfg(loom)]
248        loom::model(inner);
249    }
250}