// kube_runtime/reflector/dispatcher.rs

1use core::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5use std::{fmt::Debug, sync::Arc};
6
7use educe::Educe;
8use futures::Stream;
9use pin_project::pin_project;
10use std::task::ready;
11
12use crate::reflector::{ObjectRef, Store};
13use async_broadcast::{InactiveReceiver, Receiver, Sender};
14
15use super::Lookup;
16
17#[derive(Educe)]
18#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
19// A helper type that holds a broadcast transmitter and a broadcast receiver,
20// used to fan-out events from a root stream to multiple listeners.
21pub(crate) struct Dispatcher<K>
22where
23    K: Lookup + Clone + 'static,
24    K::DynamicType: Eq + std::hash::Hash + Clone,
25{
26    dispatch_tx: Sender<ObjectRef<K>>,
27    // An inactive reader that prevents the channel from closing until the
28    // writer is dropped.
29    _dispatch_rx: InactiveReceiver<ObjectRef<K>>,
30}
31
32impl<K> Dispatcher<K>
33where
34    K: Lookup + Clone + 'static,
35    K::DynamicType: Eq + std::hash::Hash + Clone,
36{
37    /// Creates and returns a new self that wraps a broadcast sender and an
38    /// inactive broadcast receiver
39    ///
40    /// A buffer size is required to create the underlying broadcast channel.
41    /// Messages will be buffered until all active readers have received a copy
42    /// of the message. When the channel is full, senders will apply
43    /// backpressure by waiting for space to free up.
44    //
45    // N.B messages are eagerly broadcasted, meaning no active receivers are
46    // required for a message to be broadcasted.
47    #[cfg(feature = "unstable-runtime-subscribe")]
48    pub(crate) fn new(buf_size: usize) -> Dispatcher<K> {
49        // Create a broadcast (tx, rx) pair
50        let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
51        // The tx half will not wait for any receivers to be active before
52        // broadcasting events. If no receivers are active, events will be
53        // buffered.
54        dispatch_tx.set_await_active(false);
55        Self {
56            dispatch_tx,
57            _dispatch_rx: dispatch_rx.deactivate(),
58        }
59    }
60
61    // Calls broadcast on the channel. Will return when the channel has enough
62    // space to send an event.
63    pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef<K>) {
64        let _ = self.dispatch_tx.broadcast_direct(obj_ref).await;
65    }
66
67    // Creates a `ReflectHandle` by creating a receiver from the tx half.
68    // N.B: the new receiver will be fast-forwarded to the _latest_ event.
69    // The receiver won't have access to any events that are currently waiting
70    // to be acked by listeners.
71    #[cfg(feature = "unstable-runtime-subscribe")]
72    pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> {
73        ReflectHandle::new(reader, self.dispatch_tx.new_receiver())
74    }
75}
76
77/// A handle to a shared stream reader
78///
79/// [`ReflectHandle`]s are created by calling [`subscribe()`] on a [`Writer`],
80/// or by calling `clone()` on an already existing [`ReflectHandle`]. Each
81/// shared stream reader should be polled independently and driven to readiness
82/// to avoid deadlocks. When the [`Writer`]'s buffer is filled, backpressure
83/// will be applied on the root stream side.
84///
85/// When the root stream is dropped, or it ends, all [`ReflectHandle`]s
86/// subscribed to the stream will also terminate after all events yielded by
87/// the root stream have been observed. This means [`ReflectHandle`] streams
88/// can still be polled after the root stream has been dropped.
89///
90/// [`Writer`]: crate::reflector::Writer
91#[pin_project]
92pub struct ReflectHandle<K>
93where
94    K: Lookup + Clone + 'static,
95    K::DynamicType: Eq + std::hash::Hash + Clone,
96{
97    #[pin]
98    rx: Receiver<ObjectRef<K>>,
99    reader: Store<K>,
100}
101
102impl<K> Clone for ReflectHandle<K>
103where
104    K: Lookup + Clone + 'static,
105    K::DynamicType: Eq + std::hash::Hash + Clone,
106{
107    fn clone(&self) -> Self {
108        ReflectHandle::new(self.reader.clone(), self.rx.clone())
109    }
110}
111
112impl<K> ReflectHandle<K>
113where
114    K: Lookup + Clone,
115    K::DynamicType: Eq + std::hash::Hash + Clone,
116{
117    pub(super) fn new(reader: Store<K>, rx: Receiver<ObjectRef<K>>) -> ReflectHandle<K> {
118        Self { rx, reader }
119    }
120
121    /// Get a reader from a reflect handle
122    #[must_use]
123    pub fn reader(&self) -> Store<K> {
124        self.reader.clone()
125    }
126}
127
128impl<K> Stream for ReflectHandle<K>
129where
130    K: Lookup + Clone,
131    K::DynamicType: Eq + std::hash::Hash + Clone + Default,
132{
133    type Item = Arc<K>;
134
135    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136        let mut this = self.project();
137        match ready!(this.rx.as_mut().poll_next(cx)) {
138            Some(obj_ref) => this
139                .reader
140                .get(&obj_ref)
141                .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
142            None => Poll::Ready(None),
143        }
144    }
145}
146
#[cfg(feature = "unstable-runtime-subscribe")]
#[cfg(test)]
pub(crate) mod test {
    use crate::{
        WatchStreamExt,
        watcher::{Error, Event},
    };
    use std::{pin::pin, sync::Arc, task::Poll};

    use crate::reflector;
    use futures::{StreamExt, poll, stream};
    use k8s_openapi::api::core::v1::Pod;

    // Minimal Pod fixture carrying only a name.
    fn testpod(name: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some(name.to_string());
        pod
    }

    #[tokio::test]
    async fn events_are_passed_through() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo)),
            Ok(Event::InitApply(bar)),
            Ok(Event::InitDone),
        ]);

        let (reader, writer) = reflector::store_shared(10);
        let mut reflected = pin!(events.reflect_shared(writer));

        // Nothing has been polled yet, so the store must still be empty.
        assert_eq!(reader.len(), 0);
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));

        // Drive the stream forward and check that every event is observed.
        assert_eq!(reader.len(), 1);
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert_eq!(reader.len(), 1);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));
        assert_eq!(reader.len(), 1);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(reader.len(), 1);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(reader.len(), 1);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));
        assert_eq!(reader.len(), 2);

        assert!(matches!(poll!(reflected.next()), Poll::Ready(None)));
        assert_eq!(reader.len(), 2);
    }

    #[tokio::test]
    async fn readers_yield_touched_objects() {
        // Readers should yield the touched objects carried by stream events.
        //
        // NOTE: a Delete(_) event is ignored when the item is absent from the
        // cache; the same holds for a Restarted(vec![delete_item]).
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Delete(foo.clone())),
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        let mut reflected = pin!(events.reflect_shared(writer));

        // Delete events are skipped by the subscriber.
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Delete(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // Errors never reach subscribers.
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert!(matches!(poll!(subscriber.next()), Poll::Pending));

        // A restart yields every object in the list.
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));

        // These are not guaranteed to come back in order at the moment.
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));

        // Closing the main channel propagates to subscribers.
        assert!(matches!(poll!(reflected.next()), Poll::Ready(None)));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn readers_yield_when_tx_drops() {
        // After the main stream is dropped, readers must still make progress
        // and drain whatever was already sent on the channel.
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        let mut reflected = Box::pin(events.reflect_shared(writer));

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // Restart yields all listed objects. Broadcast values without polling
        // the subscriber, then drop the root stream.
        //
        // Until then, the subscriber stays pending.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));
        drop(reflected);

        // Both foo and bar arrive here, but page events have no guaranteed
        // ordering.
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn reflect_applies_backpressure() {
        // A full channel must manifest as backpressure: the reflector stream
        // returns Poll::Pending while a reader lags behind, and resumes once
        // the buffer drains.
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            //TODO: include a ready event here to avoid dealing with Init?
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Apply(bar.clone())),
            Ok(Event::Apply(foo.clone())),
        ]);

        let foo = Arc::new(foo);
        let bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(1);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        let mut subscriber_slow = pin!(writer.subscribe().unwrap());
        let mut reflected = pin!(events.reflect_shared(writer));

        assert_eq!(poll!(subscriber.next()), Poll::Pending);
        assert_eq!(poll!(subscriber_slow.next()), Poll::Pending);

        // Poll the first subscriber only.
        //
        // The buffer holds one object, so even with a slow subscriber we
        // still receive one event from the root.
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // The second subscriber is not reading, so backpressure applies until
        // the channel regains capacity.
        //
        // The buffer is now full; polling again exercises the backpressure
        // path.
        assert!(matches!(poll!(reflected.next()), Poll::Pending));

        // The "fast" subscriber also has nothing left until the slow one
        // advances its cursor in the buffer.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        // Advance the slow reader.
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

        // Room for exactly one more item again; repeat the same pattern.
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
        assert!(matches!(poll!(reflected.next()), Poll::Pending));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
        assert!(matches!(
            poll!(reflected.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        // One more poll drains the queue.
        assert!(matches!(poll!(reflected.next()), Poll::Ready(None)));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None));
    }

    // TODO (matei): tests around cloning subscribers once a watch stream has
    // already been established. These depend on the interfaces & impl, so
    // they are left out for now.
}