Skip to main content

kube_runtime/utils/
mod.rs

1//! Helpers for manipulating built-in streams
2
3mod backoff_reset_timer;
4pub(crate) mod delayed_init;
5mod event_decode;
6mod event_modify;
7mod predicate;
8mod reflect;
9mod stream_backoff;
10mod watch_ext;
11
12/// Deprecated type alias for `EventDecode`
13#[deprecated(
14    since = "0.96.0",
15    note = "renamed to by `EventDecode`. This alias will be removed in 0.100.0."
16)]
17pub use EventDecode as EventFlatten;
18pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
19pub use event_decode::EventDecode;
20pub use event_modify::EventModify;
21pub use predicate::{Config as PredicateConfig, Predicate, PredicateFilter, predicates};
22pub use reflect::Reflect;
23pub use stream_backoff::StreamBackoff;
24pub use watch_ext::WatchStreamExt;
25
26use futures::{
27    FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
28    stream::{self, Peekable},
29};
30use pin_project::pin_project;
31use std::{
32    fmt::Debug,
33    pin::{Pin, pin},
34    sync::{Arc, Mutex},
35    task::Poll,
36};
37use stream::IntoStream;
38use tokio::{runtime::Handle, task::JoinHandle};
39
40/// Allows splitting a `Stream` into several streams that each emit a disjoint subset of the input stream's items,
41/// like a streaming variant of pattern matching.
42///
43/// NOTE: The cases MUST be reunited into the same final stream (using `futures::stream::select` or similar),
44/// since cases for rejected items will *not* register wakeup correctly, and may otherwise lose items and/or deadlock.
45///
46/// NOTE: The whole set of cases will deadlock if there is ever an item that no live case wants to consume.
47#[pin_project]
48pub(crate) struct SplitCase<S: Stream, Case> {
49    // Future-unaware `Mutex` is OK because it's only taken inside single poll()s
50    inner: Arc<Mutex<Peekable<S>>>,
51    /// Tests whether an item from the stream should be consumed
52    ///
53    /// NOTE: This MUST be total over all `SplitCase`s, otherwise the input stream
54    /// will get stuck deadlocked because no candidate tries to consume the item.
55    should_consume_item: fn(&S::Item) -> bool,
56    /// Narrows the type of the consumed type, using the same precondition as `should_consume_item`.
57    ///
58    /// NOTE: This MUST return `Some` if `should_consume_item` returns `true`, since we can't put
59    /// an item back into the input stream once consumed.
60    try_extract_item_case: fn(S::Item) -> Option<Case>,
61}
62
63impl<S, Case> Stream for SplitCase<S, Case>
64where
65    S: Stream + Unpin,
66    S::Item: Debug,
67{
68    type Item = Case;
69
70    fn poll_next(
71        self: std::pin::Pin<&mut Self>,
72        cx: &mut std::task::Context<'_>,
73    ) -> std::task::Poll<Option<Self::Item>> {
74        let this = self.project();
75        let inner = this.inner.lock().unwrap();
76        let mut inner = Pin::new(inner);
77        let inner_peek = pin!(inner.as_mut().peek());
78        match inner_peek.poll(cx) {
79            Poll::Ready(Some(x_ref)) => {
80                if (this.should_consume_item)(x_ref) {
81                    let item = inner.as_mut().poll_next(cx);
82                    match item {
83                        Poll::Ready(Some(x)) => Poll::Ready(Some((this.try_extract_item_case)(x).expect(
84                            "`try_extract_item_case` returned `None` despite `should_consume_item` returning `true`",
85                        ))),
86                        res => panic!(
87                    "Peekable::poll_next() returned {res:?} when Peekable::peek() returned Ready(Some(_))"
88                ),
89                    }
90                } else {
91                    // Handled by another SplitCase instead
92                    Poll::Pending
93                }
94            }
95            Poll::Ready(None) => Poll::Ready(None),
96            Poll::Pending => Poll::Pending,
97        }
98    }
99}
100
101/// Splits a `TryStream` into separate `Ok` and `Error` streams.
102///
103/// Note: This will deadlock if one branch outlives the other
104#[allow(clippy::type_complexity)]
105fn trystream_split_result<S>(
106    stream: S,
107) -> (
108    SplitCase<IntoStream<S>, S::Ok>,
109    SplitCase<IntoStream<S>, S::Error>,
110)
111where
112    S: TryStream + Unpin,
113    S::Ok: Debug,
114    S::Error: Debug,
115{
116    let stream = Arc::new(Mutex::new(stream.into_stream().peekable()));
117    (
118        SplitCase {
119            inner: stream.clone(),
120            should_consume_item: Result::is_ok,
121            try_extract_item_case: Result::ok,
122        },
123        SplitCase {
124            inner: stream,
125            should_consume_item: Result::is_err,
126            try_extract_item_case: Result::err,
127        },
128    )
129}
130
131/// Forwards Ok elements via a stream built from `make_via_stream`, while passing errors through unmodified
132pub(crate) fn trystream_try_via<S1, S2>(
133    input_stream: S1,
134    make_via_stream: impl FnOnce(SplitCase<IntoStream<S1>, S1::Ok>) -> S2,
135) -> impl Stream<Item = Result<S2::Ok, S1::Error>>
136where
137    S1: TryStream + Unpin,
138    S2: TryStream<Error = S1::Error>,
139    S1::Ok: Debug,
140    S1::Error: Debug,
141{
142    let (oks, errs) = trystream_split_result(input_stream); // the select -> SplitCase
143    let via = make_via_stream(oks); // the map_ok/err function
144    stream::select(via.into_stream(), errs.map(Err)) // recombine
145}
146
147/// A [`JoinHandle`] that cancels the [`Future`] when dropped, rather than detaching it
148pub struct CancelableJoinHandle<T> {
149    inner: JoinHandle<T>,
150}
151
152impl<T> CancelableJoinHandle<T>
153where
154    T: Send + 'static,
155{
156    /// Wrap a future in a cancelable handle, and spawn in a runtime
157    pub fn spawn(future: impl Future<Output = T> + Send + 'static, runtime: &Handle) -> Self {
158        CancelableJoinHandle {
159            inner: runtime.spawn(future),
160        }
161    }
162}
163
164impl<T> Drop for CancelableJoinHandle<T> {
165    fn drop(&mut self) {
166        self.inner.abort()
167    }
168}
169
170impl<T> Future for CancelableJoinHandle<T> {
171    type Output = T;
172
173    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
174        self.inner.poll_unpin(cx).map(
175            // JoinError => underlying future was either aborted (which should only happen when the handle is dropped), or
176            // panicked (which should be propagated)
177            Result::unwrap,
178        )
179    }
180}
181
182#[pin_project]
183pub(crate) struct OnComplete<S, F> {
184    #[pin]
185    stream: stream::Fuse<S>,
186    #[pin]
187    on_complete: F,
188}
189
190impl<S: Stream, F: Future<Output = ()>> Stream for OnComplete<S, F> {
191    type Item = S::Item;
192
193    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
194        let this = self.project();
195        match this.stream.poll_next(cx) {
196            Poll::Ready(None) => match this.on_complete.poll(cx) {
197                Poll::Pending => Poll::Pending,
198                Poll::Ready(()) => Poll::Ready(None),
199            },
200            x => x,
201        }
202    }
203}
204
205pub(crate) trait KubeRuntimeStreamExt: Stream + Sized {
206    /// Runs the [`Future`] `on_complete` once the [`Stream`] finishes (by returning [`None`]).
207    fn on_complete<F: Future<Output = ()>>(self, on_complete: F) -> OnComplete<Self, F> {
208        OnComplete {
209            stream: self.fuse(),
210            on_complete,
211        }
212    }
213}
214
215impl<S: Stream> KubeRuntimeStreamExt for S {}
216
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use futures::stream::{self, StreamExt};

    use super::trystream_try_via;

    // Compile-time check only: it is enough for this to type-check, so it is never executed.
    // It verifies that both the input stream and the via-closure may borrow local values.
    #[allow(dead_code)]
    fn trystream_try_via_should_be_able_to_borrow() {
        // Has a `Drop` impl, so borrows of it are not trivially ignorable by the compiler
        struct WeirdComplexObject {}
        impl Drop for WeirdComplexObject {
            fn drop(&mut self) {}
        }

        let mut borrowed_by_input = WeirdComplexObject {};
        let borrowed_by_via = WeirdComplexObject {};

        // Input stream holding a mutable borrow of a local
        let input = Box::pin(stream::once(async {
            let _ = &mut borrowed_by_input;
            Result::<_, Infallible>::Ok(())
        }));

        drop(trystream_try_via(input, |oks| {
            // Via stream holding a shared borrow of another local
            oks.map(|()| {
                let _ = &borrowed_by_via;
                Ok(())
            })
        }));
    }
}