1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::scope::in_scope;

/// A handle for sending items into the related stream.
#[derive(Debug)]
pub struct Yielder<T> {
    id: u64,
    _marker: PhantomData<fn(T)>,
}

impl<T> Yielder<T> {
    pub(crate) fn new(id: u64) -> Self {
        Self {
            id,
            _marker: PhantomData,
        }
    }

    /// Send a item into the related stream.
    pub fn yield_item(&mut self, val: T) -> Yield<'_, T> {
        let place = unsafe { in_scope(self.id) };
        let value = Some(val);
        Yield { place, value }
    }
}

impl<T, E> Yielder<Result<T, E>> {
    /// Send `Ok(value)` into the related stream.
    pub fn yield_ok(&mut self, val: T) -> Yield<'_, Result<T, E>> {
        let place = unsafe { in_scope(self.id) };
        let value = Some(Ok(val));
        Yield { place, value }
    }
}

#[derive(Debug)]
#[must_use]
pub struct Yield<'a, T> {
    place: &'a mut Option<T>,
    value: Option<T>,
}

impl<T> Future for Yield<'_, T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        let this = unsafe { self.get_unchecked_mut() };
        if this.value.is_none() {
            return Poll::Ready(());
        }
        if this.place.is_none() {
            *this.place = this.value.take();
        }
        Poll::Pending
    }
}