dis_spawner/
event_stream.rs

1use futures::Stream;
2use k8s_openapi::api::core::v1::Event as KubeEventResource;
3use kube::{
4    api::ListParams,
5    runtime::watcher::{watcher, Error as KubeWatcherError, Event as KubeWatcherEvent},
6    Api, Client, Error as KubeError,
7};
8use tokio_stream::StreamExt;
9
10fn list_params_for_resource(resource_name: &str) -> ListParams {
11    ListParams {
12        field_selector: Some(format!(
13            "involvedObject.name={},involvedObject.kind=SessionLivedBackend",
14            resource_name
15        )),
16        ..ListParams::default()
17    }
18}
19
20pub async fn past_events(
21    client: Client,
22    resource_name: &str,
23    namespace: &str,
24) -> Result<Vec<KubeEventResource>, KubeError> {
25    let api = Api::<KubeEventResource>::namespaced(client, &namespace);
26
27    let list_params = list_params_for_resource(resource_name);
28
29    Ok(api.list(&list_params).await?.items)
30}
31
32pub async fn event_stream(
33    client: Client,
34    resource_name: &str,
35    namespace: &str,
36) -> Result<impl Stream<Item = Result<KubeEventResource, KubeWatcherError>>, KubeError> {
37    let api = Api::<KubeEventResource>::namespaced(client, &namespace);
38
39    let list_params = list_params_for_resource(resource_name);
40
41    let event_stream = watcher(api.clone(), list_params.clone()).filter_map(|event| match event {
42        Ok(event) => match event {
43            KubeWatcherEvent::Applied(event) => Some(Ok(event)),
44            _ => None,
45        },
46        Err(err) => Some(Err(err)),
47    });
48
49    let mut pre_events = api.list(&list_params).await?.items;
50    pre_events.sort_by_key(|event| event.event_time.clone());
51
52    let pre_events = futures::stream::iter(pre_events.into_iter().map(|d| Ok(d)));
53
54    Ok(pre_events.merge(event_stream))
55}