1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
use crate::event_driver::{EventDriver, EventType};
use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams, WatchEvent},
Client,
};
use log::{debug, error, warn};
mod event_driver;
mod util;
pub async fn watch(client: Client) -> Result<()> {
let pods: Api<Pod> = Api::all(client.clone());
let wp = ListParams::default().timeout(0);
let mut event_driver = EventDriver::new();
loop {
debug!("Creating stream with Pods API abstraction...");
let mut stream = pods.watch(&wp, "0").await?.boxed();
debug!("Watching events...");
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::<Pod>::Added(pod) => {
event_driver.new_event(pod, EventType::Added).await
}
WatchEvent::<Pod>::Deleted(pod) => {
event_driver.new_event(pod, EventType::Deleted).await
}
WatchEvent::<Pod>::Error(report) => error!("{}", report),
_ => {}
}
}
warn!("Restarting watcher...");
}
}