1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use futures::Stream;
use k8s_openapi::api::core::v1::Event as KubeEventResource;
use kube::{
    api::ListParams,
    runtime::watcher::{watcher, Error as KubeWatcherError, Event as KubeWatcherEvent},
    Api, Client, Error as KubeError,
};
use tokio_stream::StreamExt;

pub async fn event_stream(
    client: Client,
    resource_name: &str,
    namespace: &str,
) -> Result<impl Stream<Item = Result<KubeEventResource, KubeWatcherError>>, KubeError> {
    let api = Api::<KubeEventResource>::namespaced(client, &namespace);

    let list_params = ListParams {
        field_selector: Some(format!(
            "involvedObject.name={},involvedObject.kind=SessionLivedBackend",
            resource_name
        )),
        ..ListParams::default()
    };

    let event_stream = watcher(api.clone(), list_params.clone()).filter_map(|event| match event {
        Ok(event) => match event {
            KubeWatcherEvent::Applied(event) => Some(Ok(event)),
            _ => None,
        },
        Err(err) => Some(Err(err)),
    });

    let mut pre_events = api.list(&list_params).await?.items;
    pre_events.sort_by_key(|event| event.event_time.clone());

    let pre_events = futures::stream::iter(pre_events.into_iter().map(|d| Ok(d)));

    Ok(pre_events.merge(event_stream))
}