use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use futures_util::StreamExt;
use kube_core::{Resource, ResourceExt};
use kube_runtime::watcher::Event;
use parking_lot::RwLock;
use std::{collections::hash_map::Entry, sync::Arc};
pub type ClusterRemoved = HashSet<String>;
pub type NamespacedRemoved = HashMap<String, HashSet<String>>;
pub trait IndexClusterResource<T> {
fn apply(&mut self, resource: T);
fn delete(&mut self, name: String);
fn reset(&mut self, resources: Vec<T>, removed: ClusterRemoved) {
for resource in resources.into_iter() {
self.apply(resource);
}
for name in removed.into_iter() {
self.delete(name);
}
}
}
pub trait IndexNamespacedResource<T> {
fn apply(&mut self, resource: T);
fn delete(&mut self, namespace: String, name: String);
fn reset(&mut self, resources: Vec<T>, removed: NamespacedRemoved) {
for resource in resources.into_iter() {
self.apply(resource);
}
for (ns, names) in removed.into_iter() {
for name in names.into_iter() {
self.delete(ns.clone(), name);
}
}
}
}
pub async fn namespaced<T, R>(
index: Arc<RwLock<T>>,
events: impl futures_core::Stream<Item = Event<R>>,
) where
T: IndexNamespacedResource<R>,
R: Resource + std::fmt::Debug,
{
tokio::pin!(events);
let mut keys = HashMap::new();
while let Some(event) = events.next().await {
tracing::trace!(?event);
match event {
Event::Applied(resource) => {
let namespace = resource
.namespace()
.expect("resource must have a namespace");
let name = resource.name();
keys.entry(namespace)
.or_insert_with(HashSet::new)
.insert(name);
index.write().apply(resource);
}
Event::Deleted(resource) => {
let namespace = resource
.namespace()
.expect("resource must have a namespace");
let name = resource.name();
if let Entry::Occupied(mut entry) = keys.entry(namespace.clone()) {
entry.get_mut().remove(&name);
if entry.get().is_empty() {
entry.remove();
}
}
index.write().delete(namespace, name);
}
Event::Restarted(resources) => {
let mut removed = keys.clone();
for resource in resources.iter() {
let namespace = resource
.namespace()
.expect("resource must have a namespace");
let name = resource.name();
if let Some(names) = removed.get_mut(&namespace) {
names.remove(&name);
}
keys.entry(namespace).or_default().insert(name);
}
for (namespace, names) in removed.iter() {
for name in names.iter() {
if let Entry::Occupied(mut entry) = keys.entry(namespace.clone()) {
entry.get_mut().remove(name);
if entry.get().is_empty() {
entry.remove();
}
}
}
}
index.write().reset(resources, removed);
}
}
}
}
pub async fn cluster<T, R>(
index: Arc<RwLock<T>>,
events: impl futures_core::Stream<Item = Event<R>>,
) where
T: IndexClusterResource<R>,
R: Resource + std::fmt::Debug,
{
tokio::pin!(events);
let mut keys = HashSet::new();
while let Some(event) = events.next().await {
tracing::trace!(?event);
match event {
Event::Applied(resource) => {
keys.insert(resource.name());
index.write().apply(resource);
}
Event::Deleted(resource) => {
let name = resource.name();
keys.remove(&name);
index.write().delete(name);
}
Event::Restarted(resources) => {
let mut removed = keys.clone();
for resource in resources.iter() {
let name = resource.name();
removed.remove(&name);
keys.insert(name);
}
for name in removed.iter() {
keys.remove(name);
}
index.write().reset(resources, removed);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_test::{assert_pending, task};
#[test]
fn namespaced_restart() {
let state = Arc::new(RwLock::new(NsCache(HashMap::new())));
let (tx, rx) = mpsc::channel(1);
let mut task = task::spawn(namespaced(state.clone(), ReceiverStream::new(rx)));
tx.try_send(kube::runtime::watcher::Event::Restarted(
(0..2)
.map(|i| k8s_openapi::api::core::v1::Pod {
metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
namespace: Some("default".to_string()),
name: Some(format!("pod-{}", i)),
..Default::default()
},
..Default::default()
})
.collect(),
))
.unwrap();
assert_pending!(task.poll());
assert_eq!(
state.read().0,
Some((
"default".to_string(),
vec!["pod-0".to_string(), "pod-1".to_string(),]
.into_iter()
.collect()
))
.into_iter()
.collect()
);
tx.try_send(kube::runtime::watcher::Event::Restarted(
(1..3)
.map(|i| k8s_openapi::api::core::v1::Pod {
metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
namespace: Some("default".to_string()),
name: Some(format!("pod-{}", i)),
..Default::default()
},
..Default::default()
})
.collect(),
))
.unwrap();
assert_pending!(task.poll());
assert_eq!(
state.read().0,
Some((
"default".to_string(),
vec!["pod-1".to_string(), "pod-2".to_string(),]
.into_iter()
.collect()
))
.into_iter()
.collect()
);
}
#[test]
fn clustered_restart() {
let state = Arc::new(RwLock::new(ClusterCache(HashSet::new())));
let (tx, rx) = mpsc::channel(1);
let mut task = task::spawn(cluster(state.clone(), ReceiverStream::new(rx)));
tx.try_send(kube::runtime::watcher::Event::Restarted(
(0..2)
.map(|i| k8s_openapi::api::core::v1::Namespace {
metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
namespace: Some("default".to_string()),
name: Some(format!("ns-{}", i)),
..Default::default()
},
..Default::default()
})
.collect(),
))
.unwrap();
assert_pending!(task.poll());
assert_eq!(
state.read().0,
vec!["ns-0".to_string(), "ns-1".to_string()]
.into_iter()
.collect()
);
tx.try_send(kube::runtime::watcher::Event::Restarted(
(1..3)
.map(|i| k8s_openapi::api::core::v1::Namespace {
metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
namespace: Some("default".to_string()),
name: Some(format!("ns-{}", i)),
..Default::default()
},
..Default::default()
})
.collect(),
))
.unwrap();
assert_pending!(task.poll());
assert_eq!(
state.read().0,
vec!["ns-1".to_string(), "ns-2".to_string()]
.into_iter()
.collect(),
);
}
struct ClusterCache(HashSet<String>);
struct NsCache(HashMap<String, HashSet<String>>);
impl<T: Resource> IndexClusterResource<T> for ClusterCache {
fn apply(&mut self, resource: T) {
self.0.insert(resource.name());
}
fn delete(&mut self, name: String) {
self.0.remove(&*name);
}
}
impl<T: Resource> IndexNamespacedResource<T> for NsCache {
fn apply(&mut self, resource: T) {
let namespace = resource
.namespace()
.expect("resource must have a namespace");
let name = resource.name();
self.0
.entry(namespace)
.or_insert_with(HashSet::new)
.insert(name);
}
fn delete(&mut self, namespace: String, name: String) {
if let Entry::Occupied(mut entry) = self.0.entry(namespace) {
entry.get_mut().remove(&name);
if entry.get().is_empty() {
entry.remove();
}
}
}
}
}