dis_spawner/
event_stream.rs1use futures::Stream;
2use k8s_openapi::api::core::v1::Event as KubeEventResource;
3use kube::{
4 api::ListParams,
5 runtime::watcher::{watcher, Error as KubeWatcherError, Event as KubeWatcherEvent},
6 Api, Client, Error as KubeError,
7};
8use tokio_stream::StreamExt;
9
10fn list_params_for_resource(resource_name: &str) -> ListParams {
11 ListParams {
12 field_selector: Some(format!(
13 "involvedObject.name={},involvedObject.kind=SessionLivedBackend",
14 resource_name
15 )),
16 ..ListParams::default()
17 }
18}
19
20pub async fn past_events(
21 client: Client,
22 resource_name: &str,
23 namespace: &str,
24) -> Result<Vec<KubeEventResource>, KubeError> {
25 let api = Api::<KubeEventResource>::namespaced(client, &namespace);
26
27 let list_params = list_params_for_resource(resource_name);
28
29 Ok(api.list(&list_params).await?.items)
30}
31
32pub async fn event_stream(
33 client: Client,
34 resource_name: &str,
35 namespace: &str,
36) -> Result<impl Stream<Item = Result<KubeEventResource, KubeWatcherError>>, KubeError> {
37 let api = Api::<KubeEventResource>::namespaced(client, &namespace);
38
39 let list_params = list_params_for_resource(resource_name);
40
41 let event_stream = watcher(api.clone(), list_params.clone()).filter_map(|event| match event {
42 Ok(event) => match event {
43 KubeWatcherEvent::Applied(event) => Some(Ok(event)),
44 _ => None,
45 },
46 Err(err) => Some(Err(err)),
47 });
48
49 let mut pre_events = api.list(&list_params).await?.items;
50 pre_events.sort_by_key(|event| event.event_time.clone());
51
52 let pre_events = futures::stream::iter(pre_events.into_iter().map(|d| Ok(d)));
53
54 Ok(pre_events.merge(event_stream))
55}