poddy/k8s/
reflector.rs

1use crate::client::Client;
2use futures::Stream;
3use k8s_openapi::serde::de::DeserializeOwned;
4use kube::{
5    api::ListParams,
6    runtime::{
7        reflector::{self, reflector, Store},
8        watcher,
9    },
10    Api, Resource,
11};
12use std::{convert::Infallible, fmt::Debug, hash::Hash, ops::Deref, pin::Pin};
13
14pub struct Reflector<K>
15where
16    K: Resource + 'static,
17    K::DynamicType: Hash + Eq,
18{
19    pub reader: Store<K>,
20    pub stream: Pin<Box<dyn Stream<Item = watcher::Result<watcher::Event<K>>> + Send>>,
21}
22
23impl<K> Reflector<K>
24where
25    K: Resource + Debug + Send + Sync + DeserializeOwned + Clone + 'static,
26    K::DynamicType: Clone + Default + Hash + Eq,
27{
28    pub async fn new(client: &Client) -> anyhow::Result<Reflector<K>> {
29        Ok(client
30            .run(|context| {
31                let pods: Api<K> = context.api_namespaced();
32                async {
33                    let (reader, writer) = reflector::store();
34                    let lp = ListParams::default();
35                    let stream = Box::pin(reflector(writer, watcher(pods, lp)));
36                    Ok::<_, Infallible>(Reflector { reader, stream })
37                }
38            })
39            .await?)
40    }
41}
42
43impl<K> Deref for Reflector<K>
44where
45    K: Resource + Debug + Send + DeserializeOwned + Clone + 'static,
46    K::DynamicType: Clone + Default + Hash + Eq,
47{
48    type Target = Store<K>;
49
50    fn deref(&self) -> &Self::Target {
51        &self.reader
52    }
53}