1use core::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5use std::{fmt::Debug, sync::Arc};
6
7use educe::Educe;
8use futures::Stream;
9use pin_project::pin_project;
10use std::task::ready;
11
12use crate::reflector::{ObjectRef, Store};
13use async_broadcast::{InactiveReceiver, Receiver, Sender};
14
15use super::Lookup;
16
17#[derive(Educe)]
18#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
19pub(crate) struct Dispatcher<K>
22where
23 K: Lookup + Clone + 'static,
24 K::DynamicType: Eq + std::hash::Hash + Clone,
25{
26 dispatch_tx: Sender<ObjectRef<K>>,
27 _dispatch_rx: InactiveReceiver<ObjectRef<K>>,
30}
31
32impl<K> Dispatcher<K>
33where
34 K: Lookup + Clone + 'static,
35 K::DynamicType: Eq + std::hash::Hash + Clone,
36{
37 #[cfg(feature = "unstable-runtime-subscribe")]
48 pub(crate) fn new(buf_size: usize) -> Dispatcher<K> {
49 let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
51 dispatch_tx.set_await_active(false);
55 Self {
56 dispatch_tx,
57 _dispatch_rx: dispatch_rx.deactivate(),
58 }
59 }
60
61 pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef<K>) {
64 let _ = self.dispatch_tx.broadcast_direct(obj_ref).await;
65 }
66
67 #[cfg(feature = "unstable-runtime-subscribe")]
72 pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> {
73 ReflectHandle::new(reader, self.dispatch_tx.new_receiver())
74 }
75}
76
77#[pin_project]
92pub struct ReflectHandle<K>
93where
94 K: Lookup + Clone + 'static,
95 K::DynamicType: Eq + std::hash::Hash + Clone,
96{
97 #[pin]
98 rx: Receiver<ObjectRef<K>>,
99 reader: Store<K>,
100}
101
102impl<K> Clone for ReflectHandle<K>
103where
104 K: Lookup + Clone + 'static,
105 K::DynamicType: Eq + std::hash::Hash + Clone,
106{
107 fn clone(&self) -> Self {
108 ReflectHandle::new(self.reader.clone(), self.rx.clone())
109 }
110}
111
112impl<K> ReflectHandle<K>
113where
114 K: Lookup + Clone,
115 K::DynamicType: Eq + std::hash::Hash + Clone,
116{
117 pub(super) fn new(reader: Store<K>, rx: Receiver<ObjectRef<K>>) -> ReflectHandle<K> {
118 Self { rx, reader }
119 }
120
121 #[must_use]
123 pub fn reader(&self) -> Store<K> {
124 self.reader.clone()
125 }
126}
127
128impl<K> Stream for ReflectHandle<K>
129where
130 K: Lookup + Clone,
131 K::DynamicType: Eq + std::hash::Hash + Clone + Default,
132{
133 type Item = Arc<K>;
134
135 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136 let mut this = self.project();
137 match ready!(this.rx.as_mut().poll_next(cx)) {
138 Some(obj_ref) => this
139 .reader
140 .get(&obj_ref)
141 .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
142 None => Poll::Ready(None),
143 }
144 }
145}
146
#[cfg(feature = "unstable-runtime-subscribe")]
#[cfg(test)]
pub(crate) mod test {
    use crate::{
        WatchStreamExt,
        watcher::{Error, Event},
    };
    use std::{pin::pin, sync::Arc, task::Poll};

    use crate::reflector;
    use futures::{StreamExt, poll, stream};
    use k8s_openapi::api::core::v1::Pod;

    /// Builds a default [`Pod`] whose only populated field is `metadata.name`.
    fn testpod(name: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some(name.to_owned());
        pod
    }

    /// Every watcher event (errors included) comes out of the shared
    /// reflector unchanged, and the reader length evolves as asserted.
    #[tokio::test]
    async fn events_are_passed_through() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo)),
            Ok(Event::InitApply(bar)),
            Ok(Event::InitDone),
        ]);

        let (reader, writer) = reflector::store_shared(10);
        let mut reflect = pin!(events.reflect_shared(writer));

        // Nothing has been polled yet, so the store is expected to be empty.
        assert_eq!(reader.len(), 0);
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));

        // The applied object is expected to be visible through the reader.
        assert_eq!(reader.len(), 1);
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert_eq!(reader.len(), 1);

        // The reader length is expected to stay at 1 for the whole init
        // sequence and to become 2 only once `InitDone` has been seen.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));
        assert_eq!(reader.len(), 1);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(reader.len(), 1);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(reader.len(), 1);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));
        assert_eq!(reader.len(), 2);

        // Input exhausted: the reflector ends and the store is unchanged.
        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(reader.len(), 2);
    }

    /// A subscriber is expected to see an object after `Apply` and after
    /// `InitDone`, and nothing after `Delete`, errors or the other init events.
    #[tokio::test]
    async fn readers_yield_touched_objects() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Delete(foo.clone())),
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        let mut reflect = pin!(events.reflect_shared(writer));

        // A delete is passed through but yields nothing to the subscriber.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Delete(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        // An apply makes the object available to the subscriber.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // Errors are not propagated to the subscriber.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert!(poll!(subscriber.next()).is_pending());

        // Drive the complete init sequence through the reflector.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));

        // After `InitDone` the subscriber is expected to receive two objects.
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));

        // When the reflector finishes, the subscriber stream ends too.
        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
    }

    /// Objects already dispatched are still delivered after the reflector is
    /// dropped, after which the subscriber stream terminates.
    #[tokio::test]
    async fn readers_yield_when_tx_drops() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        // Boxed rather than stack-pinned so the reflector can be dropped below.
        let mut reflect = Box::pin(events.reflect_shared(writer));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // Nothing further has been dispatched yet.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        // None of the init events before `InitDone` reach the subscriber.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));
        drop(reflect);

        // Two objects are still delivered, then the stream ends.
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
    }

    /// With a store created via `store_shared(1)`, the reflector is expected
    /// to stall until the slowest subscriber has consumed the pending object.
    #[tokio::test]
    async fn reflect_applies_backpressure() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Apply(bar.clone())),
            Ok(Event::Apply(foo.clone())),
        ]);

        let foo = Arc::new(foo);
        let bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(1);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        let mut subscriber_slow = pin!(writer.subscribe().unwrap());
        let mut reflect = pin!(events.reflect_shared(writer));

        // Nothing has been dispatched yet.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);
        assert_eq!(poll!(subscriber_slow.next()), Poll::Pending);

        // First apply goes through; only the fast subscriber consumes it.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // The slow subscriber has not read yet, so the reflector stalls.
        assert!(poll!(reflect.next()).is_pending());

        // The fast subscriber has nothing new to read meanwhile.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        // Once the slow subscriber catches up, the reflector can progress.
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
        assert!(poll!(reflect.next()).is_pending());
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

        // Both subscribers observe the end of the stream.
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None));
    }
}