unified-agent-api 0.2.0

Agent-agnostic facade and registry for wrapper backends
Documentation
use super::*;

pub(super) struct CountingStream<E, BE> {
    pub(super) items: VecDeque<Result<E, BE>>,
    pub(super) consumed: std::sync::Arc<AtomicUsize>,
}

impl<E, BE> Unpin for CountingStream<E, BE> {}

impl<E, BE> Stream for CountingStream<E, BE> {
    type Item = Result<E, BE>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let next = this.items.pop_front();
        if next.is_some() {
            this.consumed.fetch_add(1, Ordering::SeqCst);
        }
        Poll::Ready(next)
    }
}

pub(super) struct GatedCountingStream<E, BE> {
    pub(super) first: Option<Result<E, BE>>,
    pub(super) rest: VecDeque<Result<E, BE>>,
    pub(super) gate_rx: Option<oneshot::Receiver<()>>,
    pub(super) consumed: std::sync::Arc<AtomicUsize>,
}

impl<E, BE> Unpin for GatedCountingStream<E, BE> {}

impl<E, BE> Stream for GatedCountingStream<E, BE> {
    type Item = Result<E, BE>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if let Some(first) = this.first.take() {
            this.consumed.fetch_add(1, Ordering::SeqCst);
            return Poll::Ready(Some(first));
        }

        if let Some(gate_rx) = &mut this.gate_rx {
            match Pin::new(gate_rx).poll(cx) {
                Poll::Ready(_) => {
                    this.gate_rx = None;
                }
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }

        let next = this.rest.pop_front();
        if next.is_some() {
            this.consumed.fetch_add(1, Ordering::SeqCst);
        }
        Poll::Ready(next)
    }
}

pub(super) struct ControlledEndStream<E, BE> {
    pub(super) first: Option<Result<E, BE>>,
    pub(super) finish_rx: oneshot::Receiver<()>,
}

impl<E, BE> Unpin for ControlledEndStream<E, BE> {}

impl<E, BE> Stream for ControlledEndStream<E, BE> {
    type Item = Result<E, BE>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if let Some(first) = this.first.take() {
            return Poll::Ready(Some(first));
        }

        match Pin::new(&mut this.finish_rx).poll(cx) {
            Poll::Ready(_) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}