Skip to main content

objectiveai_sdk/cli/command/
stream_once.rs

1//! A stream that yields exactly one item, then completes. Lets unary
2//! delegations be lifted into a `Stream` return type without machinery.
3
4use futures::Stream;
5
/// A stream holding at most one pending item.
///
/// The wrapped `Option` is `Some` until the item has been handed out and
/// `None` afterwards, so the stream yields the item once and then reports
/// completion.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct StreamOnce<T>(Option<T>);
7
8impl<T> StreamOnce<T> {
9    pub fn new(item: T) -> Self {
10        Self(Some(item))
11    }
12}
13
14impl<T> Stream for StreamOnce<T> {
15    type Item = T;
16
17    fn poll_next(
18        self: std::pin::Pin<&mut Self>,
19        _cx: &mut std::task::Context<'_>,
20    ) -> std::task::Poll<Option<Self::Item>> {
21        // SAFETY: We hold `Option<T>`. Calling `take()` moves the
22        // inner `T` out by value but leaves the option location
23        // (which is what's pinned) in place — we never project into
24        // the still-stored `T`. Soundness only requires that we never
25        // observe the inner `T` through a pinned borrow before taking
26        // ownership, which we don't.
27        let this = unsafe { std::pin::Pin::get_unchecked_mut(self) };
28        std::task::Poll::Ready(this.0.take())
29    }
30}