1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use crate::client::Client;
use futures::Stream;
use k8s_openapi::serde::de::DeserializeOwned;
use kube::{
    api::ListParams,
    runtime::{
        reflector::{self, reflector, Store},
        watcher,
    },
    Api, Resource,
};
use std::{convert::Infallible, fmt::Debug, hash::Hash, ops::Deref, pin::Pin};

/// Pairs the read side of a reflector [`Store`] with the watcher event stream
/// associated with it.
///
/// The bounds on `K` and `K::DynamicType` are declared here because the
/// `Store<K>` field cannot be named without them.
pub struct Reflector<K: Resource + 'static>
where
    K::DynamicType: Hash + Eq,
{
    /// Read handle onto the store of `K` objects.
    pub reader: Store<K>,
    /// Pinned, boxed, `Send` stream yielding `watcher::Result<watcher::Event<K>>`
    /// items. Nothing in this type polls it; driving the stream is left to the
    /// owner of the `Reflector`.
    pub stream: Pin<Box<dyn Stream<Item = watcher::Result<watcher::Event<K>>> + Send>>,
}

impl<K> Reflector<K>
where
    K: Resource + Debug + Send + Sync + DeserializeOwned + Clone + 'static,
    K::DynamicType: Clone + Default + Hash + Eq,
{
    /// Builds a `Reflector` for resource type `K` through `client`.
    ///
    /// Inside `client.run`, an `Api<K>` is obtained from
    /// `context.api_namespaced()`, a fresh store reader/writer pair is created,
    /// and a `watcher` over that API (using default `ListParams`) is wrapped by
    /// `reflector` together with the writer. The resulting stream is boxed and
    /// pinned and returned alongside the reader.
    ///
    /// # Errors
    ///
    /// The future handed to `client.run` is itself infallible (`Infallible`
    /// error type), so any error returned here originates from `client.run`
    /// and is converted into `anyhow::Error` by `?`.
    pub async fn new(client: &Client) -> anyhow::Result<Reflector<K>> {
        let built = client
            .run(|context| {
                // Resolve the typed API handle up front so the async block owns it.
                let api: Api<K> = context.api_namespaced();
                async {
                    let (reader, writer) = reflector::store();
                    let events = watcher(api, ListParams::default());
                    let stream = Box::pin(reflector(writer, events));
                    Ok::<_, Infallible>(Reflector { reader, stream })
                }
            })
            .await?;
        Ok(built)
    }
}

impl<K> Deref for Reflector<K>
where
    K: Resource + Debug + Send + DeserializeOwned + Clone + 'static,
    K::DynamicType: Clone + Default + Hash + Eq,
{
    type Target = Store<K>;

    fn deref(&self) -> &Self::Target {
        &self.reader
    }
}