kube_runtime/utils/
mod.rs1mod backoff_reset_timer;
4pub(crate) mod delayed_init;
5mod event_decode;
6mod event_modify;
7mod predicate;
8mod reflect;
9mod stream_backoff;
10mod watch_ext;
11
12#[deprecated(
14 since = "0.96.0",
15 note = "renamed to by `EventDecode`. This alias will be removed in 0.100.0."
16)]
17pub use EventDecode as EventFlatten;
18pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
19pub use event_decode::EventDecode;
20pub use event_modify::EventModify;
21pub use predicate::{Config as PredicateConfig, Predicate, PredicateFilter, predicates};
22pub use reflect::Reflect;
23pub use stream_backoff::StreamBackoff;
24pub use watch_ext::WatchStreamExt;
25
26use futures::{
27 FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
28 stream::{self, Peekable},
29};
30use pin_project::pin_project;
31use std::{
32 fmt::Debug,
33 pin::{Pin, pin},
34 sync::{Arc, Mutex},
35 task::Poll,
36};
37use stream::IntoStream;
38use tokio::{runtime::Handle, task::JoinHandle};
39
40#[pin_project]
48pub(crate) struct SplitCase<S: Stream, Case> {
49 inner: Arc<Mutex<Peekable<S>>>,
51 should_consume_item: fn(&S::Item) -> bool,
56 try_extract_item_case: fn(S::Item) -> Option<Case>,
61}
62
63impl<S, Case> Stream for SplitCase<S, Case>
64where
65 S: Stream + Unpin,
66 S::Item: Debug,
67{
68 type Item = Case;
69
70 fn poll_next(
71 self: std::pin::Pin<&mut Self>,
72 cx: &mut std::task::Context<'_>,
73 ) -> std::task::Poll<Option<Self::Item>> {
74 let this = self.project();
75 let inner = this.inner.lock().unwrap();
76 let mut inner = Pin::new(inner);
77 let inner_peek = pin!(inner.as_mut().peek());
78 match inner_peek.poll(cx) {
79 Poll::Ready(Some(x_ref)) => {
80 if (this.should_consume_item)(x_ref) {
81 let item = inner.as_mut().poll_next(cx);
82 match item {
83 Poll::Ready(Some(x)) => Poll::Ready(Some((this.try_extract_item_case)(x).expect(
84 "`try_extract_item_case` returned `None` despite `should_consume_item` returning `true`",
85 ))),
86 res => panic!(
87 "Peekable::poll_next() returned {res:?} when Peekable::peek() returned Ready(Some(_))"
88 ),
89 }
90 } else {
91 Poll::Pending
93 }
94 }
95 Poll::Ready(None) => Poll::Ready(None),
96 Poll::Pending => Poll::Pending,
97 }
98 }
99}
100
101#[allow(clippy::type_complexity)]
105fn trystream_split_result<S>(
106 stream: S,
107) -> (
108 SplitCase<IntoStream<S>, S::Ok>,
109 SplitCase<IntoStream<S>, S::Error>,
110)
111where
112 S: TryStream + Unpin,
113 S::Ok: Debug,
114 S::Error: Debug,
115{
116 let stream = Arc::new(Mutex::new(stream.into_stream().peekable()));
117 (
118 SplitCase {
119 inner: stream.clone(),
120 should_consume_item: Result::is_ok,
121 try_extract_item_case: Result::ok,
122 },
123 SplitCase {
124 inner: stream,
125 should_consume_item: Result::is_err,
126 try_extract_item_case: Result::err,
127 },
128 )
129}
130
131pub(crate) fn trystream_try_via<S1, S2>(
133 input_stream: S1,
134 make_via_stream: impl FnOnce(SplitCase<IntoStream<S1>, S1::Ok>) -> S2,
135) -> impl Stream<Item = Result<S2::Ok, S1::Error>>
136where
137 S1: TryStream + Unpin,
138 S2: TryStream<Error = S1::Error>,
139 S1::Ok: Debug,
140 S1::Error: Debug,
141{
142 let (oks, errs) = trystream_split_result(input_stream); let via = make_via_stream(oks); stream::select(via.into_stream(), errs.map(Err)) }
146
147pub struct CancelableJoinHandle<T> {
149 inner: JoinHandle<T>,
150}
151
152impl<T> CancelableJoinHandle<T>
153where
154 T: Send + 'static,
155{
156 pub fn spawn(future: impl Future<Output = T> + Send + 'static, runtime: &Handle) -> Self {
158 CancelableJoinHandle {
159 inner: runtime.spawn(future),
160 }
161 }
162}
163
164impl<T> Drop for CancelableJoinHandle<T> {
165 fn drop(&mut self) {
166 self.inner.abort()
167 }
168}
169
170impl<T> Future for CancelableJoinHandle<T> {
171 type Output = T;
172
173 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
174 self.inner.poll_unpin(cx).map(
175 Result::unwrap,
178 )
179 }
180}
181
182#[pin_project]
183pub(crate) struct OnComplete<S, F> {
184 #[pin]
185 stream: stream::Fuse<S>,
186 #[pin]
187 on_complete: F,
188}
189
190impl<S: Stream, F: Future<Output = ()>> Stream for OnComplete<S, F> {
191 type Item = S::Item;
192
193 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
194 let this = self.project();
195 match this.stream.poll_next(cx) {
196 Poll::Ready(None) => match this.on_complete.poll(cx) {
197 Poll::Pending => Poll::Pending,
198 Poll::Ready(()) => Poll::Ready(None),
199 },
200 x => x,
201 }
202 }
203}
204
205pub(crate) trait KubeRuntimeStreamExt: Stream + Sized {
206 fn on_complete<F: Future<Output = ()>>(self, on_complete: F) -> OnComplete<Self, F> {
208 OnComplete {
209 stream: self.fuse(),
210 on_complete,
211 }
212 }
213}
214
215impl<S: Stream> KubeRuntimeStreamExt for S {}
216
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use futures::stream::{self, StreamExt};

    use super::trystream_try_via;

    // Compile-only check: the function carries no `#[test]` attribute and is never called,
    // hence `dead_code` is allowed. It only has to type-check.
    #[allow(dead_code)]
    fn trystream_try_via_should_be_able_to_borrow() {
        // A type with a `Drop` impl, borrowed by both the input stream and the via-closure.
        struct WeirdComplexObject {}
        impl Drop for WeirdComplexObject {
            fn drop(&mut self) {}
        }

        let mut borrowed_mutably = WeirdComplexObject {};
        let borrowed_shared = WeirdComplexObject {};

        // Input stream whose future takes a mutable borrow of a local.
        let input = Box::pin(stream::once(async {
            let _ = &mut borrowed_mutably;
            Result::<_, Infallible>::Ok(())
        }));
        // Via-stream whose closure takes a shared borrow of another local.
        let combined = trystream_try_via(input, |oks| {
            oks.map(|()| {
                let _ = &borrowed_shared;
                Ok(())
            })
        });
        drop(combined);
    }
}