// either_slot/array.rs

1use alloc::vec::Vec;
2use core::{
3    array,
4    iter::{self, FusedIterator, TrustedLen},
5    marker::PhantomData,
6    mem::MaybeUninit,
7    ptr,
8};
9
10use crate::include::*;
11
// Maximum supported slot length: Rust allocations (and hence slice indices)
// may not exceed `isize::MAX` bytes.
const MAX_COUNT: usize = isize::MAX as _;
13
/// The storage of elements in the slot.
///
/// The user should only use this type when constructing the type of custom
/// storing [`Place`]s. Only [`Default::default`] can be used to initialize
/// this type.
#[derive(Debug)]
pub struct Element<T> {
    // Possibly-uninitialized storage for one value. Interior mutability lets a
    // shared `&Element` write/move the value under the protocol in `place`/`take`.
    storage: UnsafeCell<MaybeUninit<T>>,
    // `true` once `storage` holds an initialized value.
    placed: AtomicBool,
}
24
25impl<T> Default for Element<T> {
26    fn default() -> Self {
27        Element {
28            storage: UnsafeCell::new(MaybeUninit::uninit()),
29            placed: AtomicBool::new(false),
30        }
31    }
32}
33
impl<T> Element<T> {
    /// Creates a [`Vec`] of `count` empty element slots.
    pub fn vec(count: usize) -> Vec<Self> {
        iter::repeat_with(Default::default)
            .take(count)
            .collect::<Vec<_>>()
    }

    /// Creates an array of `N` empty element slots.
    pub fn array<const N: usize>() -> [Self; N] {
        array::from_fn(|_| Default::default())
    }

    /// Writes `data` into this element and marks it as placed.
    ///
    /// # Safety
    ///
    /// - This element slot must not hold a value when the function is called.
    /// - The caller must append a [`Release`] fence if atomic ordering is
    ///   desired.
    pub(crate) unsafe fn place(&self, data: T) {
        // SAFETY: The caller guarantees the slot is empty and that it has
        // exclusive write access to this element's storage.
        unsafe { self.storage.with_mut(|ptr| (*ptr).write(data)) };
        self.placed.store(true, Relaxed);
    }

    /// Moves the value out of this element, if one has been placed.
    ///
    /// # Safety
    ///
    /// - This function must be called only once if this element slot holds a
    ///   value.
    /// - The caller must prepend an [`Acquire`] fence if atomic ordering is
    ///   desired.
    pub(crate) unsafe fn take(&self) -> Option<T> {
        self.placed
            .load(Relaxed)
            .then(|| unsafe { self.storage.with_mut(|ptr| (*ptr).assume_init_read()) })
    }
}
67
/// The custom storage place of [`Element`]s in the slot.
///
/// This trait should not be directly implemented; users should implement
/// [`AsRef`] to `[Element<T>]` instead. We don't make this trait an alias of
/// [`core::ops::Deref`] because arrays don't implement this trait.
pub trait Place<T>: AsRef<[Element<T>]> {}
// Blanket implementation: anything viewable as a slice of elements is a place.
impl<T, P> Place<T> for P where P: AsRef<[Element<T>]> {}
75
/// The heap-allocated state shared by every sender of one slot.
struct Inner<T, P>
where
    P: Place<T>,
{
    // Number of live `Sender`s. Whoever decrements this to zero owns the
    // structure and must consume or drop it.
    count: AtomicUsize,
    // The element storage itself.
    place: P,
    // Marks logical ownership of the `T` values stored in `place`.
    marker: PhantomData<[T]>,
}
84
impl<T, P> Inner<T, P>
where
    P: Place<T>,
{
    const LAYOUT: Layout = Layout::new::<Self>();

    /// Allocates an `Inner` on the heap, with the reference count set to the
    /// length of `place` (one unit per future sender).
    ///
    /// # Panics
    ///
    /// Panics if `place` is empty or longer than `isize::MAX`.
    fn new(place: P) -> NonNull<Self> {
        let count = place.as_ref().len();
        assert!(
            count <= MAX_COUNT,
            "the length of the slot must not exceed `isize::MAX`"
        );
        assert!(count > 0, "the slot must not be empty");

        let memory = match Global.allocate(Self::LAYOUT) {
            Ok(memory) => memory.cast::<Self>(),
            Err(_) => handle_alloc_error(Self::LAYOUT),
        };
        let value = Self {
            count: AtomicUsize::new(count),
            place,
            marker: PhantomData,
        };
        // SAFETY: We own this fresh uninitialized memory whose layout is the same as
        // this type.
        unsafe { memory.as_ptr().write(value) }
        memory
    }

    /// Drops every value still stored at index `start` and onward, then drops
    /// and deallocates the `Inner` itself.
    ///
    /// # Safety
    ///
    /// 1. `this` must own a valid `Inner` uniquely (a.k.a. no other references
    ///    to the structure), and use an [`Acquire`] fence if atomic ordering is
    ///    desired.
    /// 2. The caller must not use `this` again since it is consumed and dropped
    ///    in this function.
    unsafe fn drop_in_place(this: NonNull<Self>, start: usize) {
        // SAFETY: See contract 1.
        let inner = unsafe { this.as_ref() };

        for elem in inner.place.as_ref().get(start..).into_iter().flatten() {
            // SAFETY: See contract 1.
            unsafe { drop(elem.take()) }
        }
        // SAFETY: See contract 2.
        unsafe { ptr::drop_in_place(this.as_ptr()) };
        // SAFETY: See contract 2.
        unsafe { Global.deallocate(this.cast(), Inner::<T, P>::LAYOUT) };
    }
}
135
/// The placer of an array slot.
///
/// The user can only access the slot once by this structure.
#[derive(Debug)]
pub struct Sender<T, P>
where
    P: Place<T>,
{
    // Shared state; each live sender accounts for one unit of `inner.count`.
    inner: NonNull<Inner<T, P>>,
    // The index of the one element storage this sender may write.
    index: usize,
}
147
// SAFETY: We satisfy the contract by exposing no reference to any associated
// function, and provide an atomic algorithm during its access or dropping
// process, which satisfies the need of `Send`.
unsafe impl<T: Send, P: Place<T>> Send for Sender<T, P> {}
152
impl<T, P> Sender<T, P>
where
    P: Place<T>,
{
    /// # Safety
    ///
    /// 1. `inner` must hold a valid immutable reference to `Inner`.
    /// 2. `index` must be less than the length of `place` in `inner`.
    unsafe fn new(inner: NonNull<Inner<T, P>>, index: usize) -> Self {
        Sender { inner, index }
    }

    /// Place the value into the slot, or obtain the resulting iterator if no
    /// other senders exist any longer.
    pub fn send(self, value: T) -> Result<(), SenderIter<T, P>> {
        // SAFETY: See contract 1 in `Self::new`.
        let inner = unsafe { self.inner.as_ref() };
        // SAFETY: See contract 2 in `Self::new`.
        let elem = unsafe { inner.place.as_ref().get_unchecked(self.index) };

        // SAFETY: Each sender has its ownership of one `Element` storage in its
        // `inner`, and thus the placing is safe. Besides, the appending `Release`
        // ordering is supplied.
        unsafe { elem.place(value) };
        let fetch_sub = inner.count.fetch_sub(1, Release);

        let pointer = self.inner;
        // We don't want to call the dropper anymore because it decreases the reference
        // count once more.
        mem::forget(self);

        if fetch_sub == 1 {
            // SAFETY: We use `Acquire` fence here to observe other executions of placing
            // values. And since the reference count is now 0, we own `inner`, so it can be
            // handed to the iterator safely.
            atomic::fence(Acquire);
            return Err(unsafe { SenderIter::new(pointer) });
        }
        Ok(())
    }
}
194
impl<T, P: Place<T>> Drop for Sender<T, P> {
    // Give up this sender's claim; the last sender to go away (without having
    // handed ownership to a `SenderIter`) tears down the shared state.
    fn drop(&mut self) {
        // SAFETY: See contract 1 in `Self::new`.
        let inner = unsafe { self.inner.as_ref() };
        // No additional ordering is used because we now have no more
        // observations/modifications to slot values, except...
        if inner.count.fetch_sub(1, Relaxed) == 1 {
            // SAFETY: ... we now own our `inner`.
            atomic::fence(Acquire);
            unsafe { Inner::drop_in_place(self.inner, 0) }
        }
    }
}
208
/// The resulting iterator of values that all the senders have placed into the
/// slot.
///
/// Obtaining this structure means other senders all have been consumed or
/// dropped, which causes the inconsistency of the count of values yielded.
#[derive(Debug)]
pub struct SenderIter<T, P>
where
    P: Place<T>,
{
    // Uniquely owned shared state (the reference count has reached zero).
    inner: NonNull<Inner<T, P>>,
    // The next element index to scan for a placed value.
    index: usize,
}
222
// SAFETY: We now own `inner` exclusively.
unsafe impl<T: Send, P: Place<T>> Send for SenderIter<T, P> {}
225
226impl<T, P: Place<T>> SenderIter<T, P> {
227    /// # Safety
228    ///
229    /// `inner` must owns a valid `Inner`.
230    unsafe fn new(inner: NonNull<Inner<T, P>>) -> Self {
231        Self { inner, index: 0 }
232    }
233}
234
235impl<T, P: Place<T>> Iterator for SenderIter<T, P> {
236    type Item = T;
237
238    fn next(&mut self) -> Option<Self::Item> {
239        // SAFETY: See contract 1 in `Sender::new`.
240        let inner = unsafe { self.inner.as_ref() };
241
242        // `index` in the iterator is not always less than its length, so we use the
243        // safe `get` to access the element storage.
244        while let Some(elem) = inner.place.as_ref().get(self.index) {
245            self.index += 1;
246
247            // SAFETY: We now owns `inner`, so no atomic ordering is needed; each element is
248            // only taken once since `index` is incremented at every yield.
249            if let Some(data) = unsafe { elem.take() } {
250                return Some(data);
251            }
252        }
253        None
254    }
255
256    fn size_hint(&self) -> (usize, Option<usize>) {
257        // SAFETY: See contract 1 in `Sender::new`.
258        let inner = unsafe { self.inner.as_ref() };
259        let len = inner.place.as_ref().len();
260        (0, Some(len))
261    }
262}
263
// Once `next` returns `None`, `index` has reached the end of `place`, so every
// subsequent call also returns `None`.
impl<T, P: Place<T>> FusedIterator for SenderIter<T, P> {}
265
impl<T, P: Place<T>> Drop for SenderIter<T, P> {
    fn drop(&mut self) {
        // SAFETY: We now own `inner`, so no atomic ordering is needed; `index` is
        // always less than or equal to the length of `place`, so dropping from
        // `index` onward only touches values not yet yielded.
        unsafe { Inner::drop_in_place(self.inner, self.index) }
    }
}
273
/// The initialization iterator for senders.
///
/// The senders are ALREADY initialized upon the construction of this iterator.
/// This structure is implemented to get rid of additional potential memory
/// allocations.
///
/// When the iterator is dropped, it will drop all the senders yet to be
/// yielded.
#[derive(Debug)]
pub struct InitIter<T, P: Place<T>> {
    // Shared state; the slots at `index..len` are still unclaimed by senders.
    inner: NonNull<Inner<T, P>>,
    // The index of the next sender to yield.
    index: usize,
}
287
// SAFETY: As with `Sender`, no reference to the shared state is exposed, and
// all access to it is atomic; the senders yielded are themselves `Send` under
// the same bounds.
unsafe impl<T: Send, P: Place<T>> Send for InitIter<T, P> {}
289
290impl<T, P: Place<T>> InitIter<T, P> {
291    /// # Safety
292    ///
293    /// `inner` must owns a valid `Inner`.
294    unsafe fn new(inner: NonNull<Inner<T, P>>) -> Self {
295        InitIter { inner, index: 0 }
296    }
297}
298
299impl<T, P: Place<T>> Iterator for InitIter<T, P> {
300    type Item = Sender<T, P>;
301
302    fn next(&mut self) -> Option<Self::Item> {
303        // SAFETY: See contract 1 in `Sender::new`.
304        let inner = unsafe { self.inner.as_ref() };
305        let len = inner.place.as_ref().len();
306        if self.index < len {
307            // SAFETY: `inner` is immutable; `index` is in (0..len).
308            let s = unsafe { Sender::new(self.inner, self.index) };
309            self.index += 1;
310            Some(s)
311        } else {
312            None
313        }
314    }
315
316    fn size_hint(&self) -> (usize, Option<usize>) {
317        // SAFETY: See contract 1 in `Sender::new`.
318        let inner = unsafe { self.inner.as_ref() };
319        let len = inner.place.as_ref().len();
320        (len, Some(len))
321    }
322}
323
324impl<T, P: Place<T>> Drop for InitIter<T, P> {
325    fn drop(&mut self) {
326        self.for_each(drop)
327    }
328}
329
impl<T, P: Place<T>> ExactSizeIterator for InitIter<T, P> {}

impl<T, P: Place<T>> FusedIterator for InitIter<T, P> {}

// SAFETY(review): `TrustedLen` requires `size_hint` to report the EXACT
// remaining length as `(n, Some(n))` — verify that `size_hint` above accounts
// for already-yielded senders (i.e. subtracts `index`).
unsafe impl<T, P: Place<T>> TrustedLen for InitIter<T, P> {}
335
336/// Construct an iterator of senders to a slot, whose values will be placed on
337/// `place`.
338pub fn from_place<T, P: Place<T>>(place: P) -> InitIter<T, P> {
339    let inner = Inner::new(place);
340    // SAFETY: `inner` owns `Inner`.
341    unsafe { InitIter::new(inner) }
342}
343
344/// Construct an iterator of senders to a slot, whose values will be placed on a
345/// [`Vec`].
346pub fn vec<T>(count: usize) -> InitIter<T, Vec<Element<T>>> {
347    from_place(Element::vec(count))
348}
349
/// Construct an array of senders to a slot, whose values will be placed on an
/// array.
///
/// This function is specialized to return an array of senders instead of an
/// iterator in order to keep the resulting length constant.
///
/// # Examples
///
/// ```rust
/// let [s1, s2, s3] = either_slot::array();
/// s1.send(1).unwrap();
/// s2.send(2).unwrap();
/// let iter = s3.send(3).unwrap_err();
/// assert_eq!(iter.collect::<Vec<_>>(), [1, 2, 3]);
/// ```
///
/// ```rust
/// let [s1, s2, s3] = either_slot::array();
/// drop(s1);
/// s3.send(3).unwrap();
/// let iter = s2.send(2).unwrap_err();
/// assert_eq!(iter.collect::<Vec<_>>(), [2, 3]);
/// ```
pub fn array<T, const N: usize>() -> [Sender<T, [Element<T>; N]>; N] {
    let inner = Inner::new(Element::array());
    // SAFETY: `inner` is immutable; index is in (0..N).
    array::from_fn(move |index| unsafe { Sender::new(inner, index) })
}
378
#[cfg(test)]
mod tests {
    use alloc::vec::Vec;
    #[cfg(not(loom))]
    use std::thread;

    // Under loom, exhaustively explore thread interleavings instead of running
    // real OS threads.
    #[cfg(loom)]
    use loom::thread;

    use crate::array::{from_place, Element};

    // All three senders send concurrently; exactly one of them must receive
    // the resulting iterator, which yields every sent value in slot order.
    #[test]
    fn send() {
        fn inner() {
            let j = from_place(Element::array::<3>())
                .enumerate()
                .map(|(i, s)| thread::spawn(move || s.send(i)))
                .collect::<Vec<_>>();

            // Exactly one `send` returns `Err(iter)`; `Result::and` propagates it.
            let iter = j
                .into_iter()
                .map(|j| j.join().unwrap())
                .fold(Ok(()), Result::and)
                .unwrap_err();

            assert_eq!(iter.collect::<Vec<_>>(), [0, 1, 2]);
        }

        #[cfg(not(loom))]
        inner();
        #[cfg(loom)]
        loom::model(inner);
    }

    // One sender is dropped without sending; if a sending thread ends up last,
    // it gets the iterator, which skips the dropped sender's empty slot.
    #[test]
    fn drop_one() {
        fn inner() {
            let j = from_place(Element::vec(3))
                .enumerate()
                .map(|(i, s)| {
                    if i != 1 {
                        thread::spawn(move || s.send(i))
                    } else {
                        thread::spawn(move || {
                            drop(s);
                            Ok(())
                        })
                    }
                })
                .collect::<Vec<_>>();

            let res = j
                .into_iter()
                .map(|j| j.join().unwrap())
                .fold(Ok(()), Result::and);

            // If the dropping thread finished last, cleanup happened in `Drop`
            // and nobody received an iterator — so the check is conditional.
            if let Err(iter) = res {
                assert_eq!(iter.collect::<Vec<_>>(), [0, 2]);
            }
        }

        #[cfg(not(loom))]
        inner();
        #[cfg(loom)]
        loom::model(inner);
    }
}