use crate::{
api::{Api, ListParams, Meta, WatchEvent},
Error, Result,
};
use futures::{future::FutureExt, lock::Mutex, pin_mut, select, TryStreamExt};
use serde::de::DeserializeOwned;
use tokio::{signal::ctrl_c, time::sleep};
#[cfg(not(target_family = "windows"))] use tokio::signal;
#[cfg(target_family = "windows")] use tokio::sync::mpsc::{channel, Receiver};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
#[derive(Clone)]
#[deprecated(note = "Replaced by kube_runtime::reflector", since = "0.38.0")]
pub struct Reflector<K>
where
K: Clone + DeserializeOwned + Meta,
{
state: Arc<Mutex<State<K>>>,
params: ListParams,
api: Api<K>,
}
impl<K> Reflector<K>
where
K: Clone + DeserializeOwned + Meta,
{
pub fn new(api: Api<K>) -> Self {
Reflector {
api,
params: ListParams::default(),
state: Default::default(),
}
}
pub fn params(mut self, lp: ListParams) -> Self {
self.params = lp;
self
}
pub async fn run(self) -> Result<()> {
self.reset().await?;
loop {
let ctrlc_fut = ctrl_c().fuse();
#[cfg(not(target_family = "windows"))] use signal::unix::{signal, SignalKind};
#[cfg(not(target_family = "windows"))]
let mut sigterm = signal(SignalKind::terminate()).unwrap();
#[cfg(not(target_family = "windows"))]
let sigterm_fut = sigterm.recv().fuse();
#[cfg(target_family = "windows")]
let (_tx, mut rx): (_, Receiver<()>) = channel(1);
#[cfg(target_family = "windows")]
let sigterm_fut = rx.recv().fuse();
let poll_fut = self.poll().fuse();
pin_mut!(ctrlc_fut, sigterm_fut, poll_fut);
select! {
_ctrlc = ctrlc_fut => {
info!("Received ctrl_c, exiting");
return Ok(());
},
_sigterm = sigterm_fut => {
info!("Received SIGTERM, exiting");
return Ok(());
}
poll = poll_fut => {
if let Err(e) = poll {
warn!("Poll error on {}: {}: {:?}", self.api.resource.kind, e, e);
let dur = Duration::from_secs(10);
sleep(dur).await;
self.reset().await?; }
}
}
}
}
async fn poll(&self) -> Result<()> {
let kind = &self.api.resource.kind;
let resource_version = self.state.lock().await.version.clone();
trace!("Polling {} from resourceVersion={}", kind, resource_version);
let stream = self.api.watch(&self.params, &resource_version).await?;
pin_mut!(stream);
while let Some(ev) = stream.try_next().await? {
let mut state = self.state.lock().await;
match &ev {
WatchEvent::Added(o) | WatchEvent::Modified(o) | WatchEvent::Deleted(o) => {
if let Some(nv) = Meta::resource_ver(o) {
trace!("Updating reflector version for {} to {}", kind, nv);
state.version = nv.clone();
}
}
WatchEvent::Bookmark(bm) => {
let rv = &bm.metadata.resource_version;
trace!("Updating reflector version for {} to {}", kind, rv);
state.version = rv.clone();
}
_ => {}
}
let data = &mut state.data;
match ev {
WatchEvent::Added(o) => {
debug!("Adding {} to {}", Meta::name(&o), kind);
data.entry(ObjectId::key_for(&o)).or_insert_with(|| o.clone());
}
WatchEvent::Modified(o) => {
debug!("Modifying {} in {}", Meta::name(&o), kind);
data.entry(ObjectId::key_for(&o)).and_modify(|e| *e = o.clone());
}
WatchEvent::Deleted(o) => {
debug!("Removing {} from {}", Meta::name(&o), kind);
data.remove(&ObjectId::key_for(&o));
}
WatchEvent::Bookmark(bm) => {
debug!("Bookmarking {}", &bm.types.kind);
}
WatchEvent::Error(e) => {
warn!("Failed to watch {}: {:?}", kind, e);
return Err(Error::Api(e));
}
}
}
Ok(())
}
pub async fn reset(&self) -> Result<()> {
trace!("Resetting {}", self.api.resource.kind);
let (data, version) = self.get_full_resource_entries().await?;
*self.state.lock().await = State { data, version };
Ok(())
}
async fn get_full_resource_entries(&self) -> Result<(Cache<K>, String)> {
let res = self.api.list(&self.params).await?;
let version = res.metadata.resource_version.unwrap_or_default();
trace!(
"Got {} {} at resourceVersion={:?}",
res.items.len(),
self.api.resource.kind,
version
);
let mut data = BTreeMap::new();
for i in res.items {
data.insert(ObjectId::key_for(&i), i);
}
let keys = data
.keys()
.map(ObjectId::to_string)
.collect::<Vec<_>>()
.join(", ");
debug!("Initialized with: [{}]", keys);
Ok((data, version))
}
pub async fn state(&self) -> Result<Vec<K>> {
let state = self.state.lock().await;
Ok(state.data.values().cloned().collect::<Vec<K>>())
}
pub async fn get(&self, name: &str) -> Result<Option<K>> {
let id = ObjectId {
name: name.into(),
namespace: self.api.resource.namespace.clone(),
};
Ok(self.state.lock().await.data.get(&id).map(Clone::clone))
}
pub async fn get_within(&self, name: &str, ns: &str) -> Result<Option<K>> {
let id = ObjectId {
name: name.into(),
namespace: Some(ns.into()),
};
Ok(self.state.lock().await.data.get(&id).map(Clone::clone))
}
}
#[derive(Ord, PartialOrd, Hash, Eq, PartialEq, Clone)]
struct ObjectId {
name: String,
namespace: Option<String>,
}
impl ToString for ObjectId {
fn to_string(&self) -> String {
match &self.namespace {
Some(ns) => format!("{} [{}]", self.name, ns),
None => self.name.clone(),
}
}
}
impl ObjectId {
fn key_for<K: Meta>(o: &K) -> Self {
ObjectId {
name: Meta::name(o),
namespace: Meta::namespace(o),
}
}
}
struct State<K> {
data: Cache<K>,
version: String,
}
impl<K> Default for State<K> {
fn default() -> Self {
State {
data: Default::default(),
version: 0.to_string(),
}
}
}
type Cache<K> = BTreeMap<ObjectId, K>;