use crate::{
api::{Api, ListParams, Meta, WatchEvent},
Result,
};
use futures::{lock::Mutex, Stream, StreamExt};
use serde::de::DeserializeOwned;
use std::{sync::Arc, time::Duration};
#[derive(Clone)]
#[deprecated(note = "Replaced by kube_runtime::watcher", since = "0.38.0")]
pub struct Informer<K>
where
K: Clone + DeserializeOwned + Meta,
{
version: Arc<Mutex<String>>,
api: Api<K>,
params: ListParams,
needs_resync: Arc<Mutex<bool>>,
}
impl<K> Informer<K>
where
K: Clone + DeserializeOwned + Meta,
{
pub fn new(api: Api<K>) -> Self {
Informer {
api,
params: ListParams::default(),
version: Arc::new(Mutex::new(0.to_string())),
needs_resync: Arc::new(Mutex::new(false)),
}
}
pub fn params(mut self, lp: ListParams) -> Self {
self.params = lp;
self
}
pub fn set_version(self, v: String) -> Self {
debug!("Setting Informer version for {} to {}", self.api.resource.kind, v);
futures::executor::block_on(async {
*self.version.lock().await = v;
});
self
}
pub async fn reset(&self) {
*self.version.lock().await = 0.to_string();
}
pub fn version(&self) -> String {
futures::executor::block_on(async { self.version.lock().await.clone() })
}
pub async fn poll(&self) -> Result<impl Stream<Item = Result<WatchEvent<K>>>> {
trace!("Watching {}", self.api.resource.kind);
{
let mut needs_resync = self.needs_resync.lock().await;
if *needs_resync {
let dur = Duration::from_secs(10);
tokio::time::sleep(dur).await;
if *needs_resync {
self.reset().await;
}
*needs_resync = false;
}
}
let version = self.version.clone();
let needs_resync = self.needs_resync.clone();
let resource_version = self.version.lock().await.clone();
let stream = self.api.watch(&self.params, &resource_version).await?;
let newstream = stream.then(move |event| {
let needs_resync = needs_resync.clone();
let version = version.clone();
async move {
match &event {
Ok(WatchEvent::Added(o)) | Ok(WatchEvent::Modified(o)) | Ok(WatchEvent::Deleted(o)) => {
if let Some(nv) = Meta::resource_ver(o) {
*version.lock().await = nv.clone();
}
}
Ok(WatchEvent::Bookmark(bm)) => {
*version.lock().await = bm.metadata.resource_version.clone();
}
Ok(WatchEvent::Error(e)) => {
if e.code == 410 {
warn!("Stream desynced: {:?}", e);
*needs_resync.lock().await = true;
}
}
Err(e) => {
warn!("Unexpected watch error: {:?}", e);
}
};
event
}
});
Ok(newstream)
}
}