1use alloc::vec::Vec;
2use core::{
3 array,
4 iter::{self, FusedIterator, TrustedLen},
5 marker::PhantomData,
6 mem::MaybeUninit,
7 ptr,
8};
9
10use crate::include::*;
11
12const MAX_COUNT: usize = isize::MAX as _;
13
14#[derive(Debug)]
20pub struct Element<T> {
21 storage: UnsafeCell<MaybeUninit<T>>,
22 placed: AtomicBool,
23}
24
25impl<T> Default for Element<T> {
26 fn default() -> Self {
27 Element {
28 storage: UnsafeCell::new(MaybeUninit::uninit()),
29 placed: AtomicBool::new(false),
30 }
31 }
32}
33
34impl<T> Element<T> {
35 pub fn vec(count: usize) -> Vec<Self> {
36 iter::repeat_with(Default::default)
37 .take(count)
38 .collect::<Vec<_>>()
39 }
40
41 pub fn array<const N: usize>() -> [Self; N] {
42 array::from_fn(|_| Default::default())
43 }
44
45 pub(crate) unsafe fn place(&self, data: T) {
51 unsafe { self.storage.with_mut(|ptr| (*ptr).write(data)) };
52 self.placed.store(true, Relaxed);
53 }
54
55 pub(crate) unsafe fn take(&self) -> Option<T> {
62 self.placed
63 .load(Relaxed)
64 .then(|| unsafe { self.storage.with_mut(|ptr| (*ptr).assume_init_read()) })
65 }
66}
67
68pub trait Place<T>: AsRef<[Element<T>]> {}
74impl<T, P> Place<T> for P where P: AsRef<[Element<T>]> {}
75
76struct Inner<T, P>
77where
78 P: Place<T>,
79{
80 count: AtomicUsize,
81 place: P,
82 marker: PhantomData<[T]>,
83}
84
85impl<T, P> Inner<T, P>
86where
87 P: Place<T>,
88{
89 const LAYOUT: Layout = Layout::new::<Self>();
90
91 fn new(place: P) -> NonNull<Self> {
92 let count = place.as_ref().len();
93 assert!(
94 count <= MAX_COUNT,
95 "the length of the slot must not exceed `isize::MAX`"
96 );
97 assert!(count > 0, "the slot must not be empty");
98
99 let memory = match Global.allocate(Self::LAYOUT) {
100 Ok(memory) => memory.cast::<Self>(),
101 Err(_) => handle_alloc_error(Self::LAYOUT),
102 };
103 let value = Self {
104 count: AtomicUsize::new(count),
105 place,
106 marker: PhantomData,
107 };
108 unsafe { memory.as_ptr().write(value) }
111 memory
112 }
113
114 unsafe fn drop_in_place(this: NonNull<Self>, start: usize) {
122 let inner = unsafe { this.as_ref() };
124
125 for elem in inner.place.as_ref().get(start..).into_iter().flatten() {
126 unsafe { drop(elem.take()) }
128 }
129 unsafe { ptr::drop_in_place(this.as_ptr()) };
131 unsafe { Global.deallocate(this.cast(), Inner::<T, P>::LAYOUT) };
133 }
134}
135
136#[derive(Debug)]
140pub struct Sender<T, P>
141where
142 P: Place<T>,
143{
144 inner: NonNull<Inner<T, P>>,
145 index: usize,
146}
147
148unsafe impl<T: Send, P: Place<T>> Send for Sender<T, P> {}
152
153impl<T, P> Sender<T, P>
154where
155 P: Place<T>,
156{
157 unsafe fn new(inner: NonNull<Inner<T, P>>, index: usize) -> Self {
162 Sender { inner, index }
163 }
164
165 pub fn send(self, value: T) -> Result<(), SenderIter<T, P>> {
168 let inner = unsafe { self.inner.as_ref() };
170 let elem = unsafe { inner.place.as_ref().get_unchecked(self.index) };
172
173 unsafe { elem.place(value) };
177 let fetch_sub = inner.count.fetch_sub(1, Release);
178
179 let pointer = self.inner;
180 mem::forget(self);
183
184 if fetch_sub == 1 {
185 atomic::fence(Acquire);
189 return Err(unsafe { SenderIter::new(pointer) });
190 }
191 Ok(())
192 }
193}
194
195impl<T, P: Place<T>> Drop for Sender<T, P> {
196 fn drop(&mut self) {
197 let inner = unsafe { self.inner.as_ref() };
199 if inner.count.fetch_sub(1, Relaxed) == 1 {
202 atomic::fence(Acquire);
204 unsafe { Inner::drop_in_place(self.inner, 0) }
205 }
206 }
207}
208
209#[derive(Debug)]
215pub struct SenderIter<T, P>
216where
217 P: Place<T>,
218{
219 inner: NonNull<Inner<T, P>>,
220 index: usize,
221}
222
223unsafe impl<T: Send, P: Place<T>> Send for SenderIter<T, P> {}
225
226impl<T, P: Place<T>> SenderIter<T, P> {
227 unsafe fn new(inner: NonNull<Inner<T, P>>) -> Self {
231 Self { inner, index: 0 }
232 }
233}
234
235impl<T, P: Place<T>> Iterator for SenderIter<T, P> {
236 type Item = T;
237
238 fn next(&mut self) -> Option<Self::Item> {
239 let inner = unsafe { self.inner.as_ref() };
241
242 while let Some(elem) = inner.place.as_ref().get(self.index) {
245 self.index += 1;
246
247 if let Some(data) = unsafe { elem.take() } {
250 return Some(data);
251 }
252 }
253 None
254 }
255
256 fn size_hint(&self) -> (usize, Option<usize>) {
257 let inner = unsafe { self.inner.as_ref() };
259 let len = inner.place.as_ref().len();
260 (0, Some(len))
261 }
262}
263
264impl<T, P: Place<T>> FusedIterator for SenderIter<T, P> {}
265
266impl<T, P: Place<T>> Drop for SenderIter<T, P> {
267 fn drop(&mut self) {
268 unsafe { Inner::drop_in_place(self.inner, self.index) }
271 }
272}
273
274#[derive(Debug)]
283pub struct InitIter<T, P: Place<T>> {
284 inner: NonNull<Inner<T, P>>,
285 index: usize,
286}
287
288unsafe impl<T: Send, P: Place<T>> Send for InitIter<T, P> {}
289
290impl<T, P: Place<T>> InitIter<T, P> {
291 unsafe fn new(inner: NonNull<Inner<T, P>>) -> Self {
295 InitIter { inner, index: 0 }
296 }
297}
298
299impl<T, P: Place<T>> Iterator for InitIter<T, P> {
300 type Item = Sender<T, P>;
301
302 fn next(&mut self) -> Option<Self::Item> {
303 let inner = unsafe { self.inner.as_ref() };
305 let len = inner.place.as_ref().len();
306 if self.index < len {
307 let s = unsafe { Sender::new(self.inner, self.index) };
309 self.index += 1;
310 Some(s)
311 } else {
312 None
313 }
314 }
315
316 fn size_hint(&self) -> (usize, Option<usize>) {
317 let inner = unsafe { self.inner.as_ref() };
319 let len = inner.place.as_ref().len();
320 (len, Some(len))
321 }
322}
323
324impl<T, P: Place<T>> Drop for InitIter<T, P> {
325 fn drop(&mut self) {
326 self.for_each(drop)
327 }
328}
329
330impl<T, P: Place<T>> ExactSizeIterator for InitIter<T, P> {}
331
332impl<T, P: Place<T>> FusedIterator for InitIter<T, P> {}
333
334unsafe impl<T, P: Place<T>> TrustedLen for InitIter<T, P> {}
335
336pub fn from_place<T, P: Place<T>>(place: P) -> InitIter<T, P> {
339 let inner = Inner::new(place);
340 unsafe { InitIter::new(inner) }
342}
343
344pub fn vec<T>(count: usize) -> InitIter<T, Vec<Element<T>>> {
347 from_place(Element::vec(count))
348}
349
350pub fn array<T, const N: usize>() -> [Sender<T, [Element<T>; N]>; N] {
374 let inner = Inner::new(Element::array());
375 array::from_fn(move |index| unsafe { Sender::new(inner, index) })
377}
378
379#[cfg(test)]
380mod tests {
381 use alloc::vec::Vec;
382 #[cfg(not(loom))]
383 use std::thread;
384
385 #[cfg(loom)]
386 use loom::thread;
387
388 use crate::array::{from_place, Element};
389
390 #[test]
391 fn send() {
392 fn inner() {
393 let j = from_place(Element::array::<3>())
394 .enumerate()
395 .map(|(i, s)| thread::spawn(move || s.send(i)))
396 .collect::<Vec<_>>();
397
398 let iter = j
399 .into_iter()
400 .map(|j| j.join().unwrap())
401 .fold(Ok(()), Result::and)
402 .unwrap_err();
403
404 assert_eq!(iter.collect::<Vec<_>>(), [0, 1, 2]);
405 }
406
407 #[cfg(not(loom))]
408 inner();
409 #[cfg(loom)]
410 loom::model(inner);
411 }
412
413 #[test]
414 fn drop_one() {
415 fn inner() {
416 let j = from_place(Element::vec(3))
417 .enumerate()
418 .map(|(i, s)| {
419 if i != 1 {
420 thread::spawn(move || s.send(i))
421 } else {
422 thread::spawn(move || {
423 drop(s);
424 Ok(())
425 })
426 }
427 })
428 .collect::<Vec<_>>();
429
430 let res = j
431 .into_iter()
432 .map(|j| j.join().unwrap())
433 .fold(Ok(()), Result::and);
434
435 if let Err(iter) = res {
436 assert_eq!(iter.collect::<Vec<_>>(), [0, 2]);
437 }
438 }
439
440 #[cfg(not(loom))]
441 inner();
442 #[cfg(loom)]
443 loom::model(inner);
444 }
445}