transform_stream/
yielder.rs

1use crate::scope::in_scope;
2
3use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
/// A handle for sending items into the related stream.
///
/// The handle stores no `T` itself: it only carries the `id` that is handed
/// to `in_scope` when an item is yielded.
pub struct Yielder<T> {
    /// Identifier passed to `in_scope` to obtain the slot items are written to.
    id: u64,
    /// Records the item type without owning a `T`.
    _marker: PhantomData<fn(T)>,
}

// Implemented by hand rather than with `#[derive(Debug)]`: the derive would
// require `T: Debug`, although no `T` is stored in the handle. The output has
// the same shape the derive produces.
impl<T> std::fmt::Debug for Yielder<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Yielder")
            .field("id", &self.id)
            .field("_marker", &self._marker)
            .finish()
    }
}
14
15impl<T> Yielder<T> {
16    pub(crate) fn new(id: u64) -> Self {
17        Self {
18            id,
19            _marker: PhantomData,
20        }
21    }
22
23    /// Send a item into the related stream.
24    pub fn yield_(&mut self, val: T) -> Yield<'_, T> {
25        let place = unsafe { in_scope(self.id) };
26        let value = Some(val);
27        Yield { place, value }
28    }
29}
30
31impl<T, E> Yielder<Result<T, E>> {
32    /// Send `Ok(val)` into the related stream.
33    pub fn yield_ok(&mut self, val: T) -> Yield<'_, Result<T, E>> {
34        self.yield_(Ok(val))
35    }
36
37    /// Send `Err(err)` into the related stream.
38    pub fn yield_err(&mut self, err: E) -> Yield<'_, Result<T, E>> {
39        self.yield_(Err(err))
40    }
41}
42
/// Future returned when an item is yielded through a `Yielder`.
///
/// It holds the pending item together with a mutable borrow of the slot the
/// item is moved into when the future is polled.
#[derive(Debug)]
#[must_use]
pub struct Yield<'a, T> {
    /// Destination slot; the item is written here only while it is `None`.
    place: &'a mut Option<T>,
    /// The item still waiting to be handed over; `None` once it has been moved out.
    value: Option<T>,
}
49
50impl<T> Future for Yield<'_, T> {
51    type Output = ();
52
53    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
54        let this = unsafe { self.get_unchecked_mut() };
55        if this.value.is_none() {
56            return Poll::Ready(());
57        }
58        if this.place.is_none() {
59            *this.place = this.value.take();
60        }
61        Poll::Pending
62    }
63}