use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use futures::{StreamExt, TryStreamExt};
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};
use kube::{
api::{Api, ListParams, Resource, ResourceExt},
Client,
};
use kube_runtime::watcher;
use kube_runtime::watcher::Event;
use crate::manifest::Manifest;
use crate::object::ObjectKey;
use crate::object::ObjectState;
use crate::operator::Operator;
use crate::state::{run_to_completion, SharedState};
use crate::store::Store;
use crate::util::PrettyEvent;
/// A change to one watched object, forwarded from the runtime to that object's handler task.
#[derive(Debug)]
enum ObjectEvent<R> {
    /// The object was created or updated; carries its latest manifest.
    Applied(R),
    /// The object is gone; only its identity (name and optional namespace) is kept.
    Deleted { name: String, namespace: Option<String> },
}
impl<R: Resource> From<&ObjectEvent<R>> for PrettyEvent {
    /// Reduces an event to the object's name and namespace so it can be attached to log spans.
    fn from(event: &ObjectEvent<R>) -> Self {
        match event {
            ObjectEvent::Deleted { name, namespace } => PrettyEvent::Deleted {
                name: name.clone(),
                namespace: namespace.clone(),
            },
            ObjectEvent::Applied(resource) => {
                let name = resource.name();
                let namespace = resource.namespace();
                PrettyEvent::Applied { name, namespace }
            }
        }
    }
}
/// Watches objects of type `O::Manifest` (filtered by `list_params`) and runs one set of
/// handler tasks per object.
///
/// Watch events are routed by object identity (`ObjectKey`) to a per-object channel; the
/// receiving end of each channel is owned by the tasks spawned when the object was first seen.
pub struct OperatorRuntime<O: Operator> {
    /// Kubernetes client used for the watch and, on deregistration, to delete objects.
    client: Client,
    /// Sending half of each live object's event channel, keyed by object identity.
    handlers: HashMap<ObjectKey, Sender<ObjectEvent<O::Manifest>>>,
    /// The operator implementation, shared with every per-object task.
    operator: Arc<O>,
    /// List/watch parameters passed to the watcher.
    list_params: ListParams,
    /// Optional shutdown flag; while it reads `true`, `Applied` watch events are dropped.
    /// NOTE(review): nothing visible in this file sets it — confirm where it is wired up.
    signal: Option<Arc<AtomicBool>>,
    /// Store handed (cloned) to every object's `Manifest`.
    store: Store,
}
impl<O: Operator> OperatorRuntime<O> {
pub fn new(kubeconfig: &kube::Config, operator: O, params: Option<ListParams>) -> Self {
let client = Client::try_from(kubeconfig.clone())
.expect("Unable to create kube::Client from kubeconfig.");
let list_params = params.unwrap_or_default();
OperatorRuntime {
client,
handlers: HashMap::new(),
operator: Arc::new(operator),
list_params,
signal: None,
store: Store::new(),
}
}
#[cfg(not(feature = "admission-webhook"))]
pub(crate) fn new_with_store(
kubeconfig: &kube::Config,
operator: O,
params: Option<ListParams>,
store: Store,
) -> Self {
let client = Client::try_from(kubeconfig.clone())
.expect("Unable to create kube::Client from kubeconfig.");
let list_params = params.unwrap_or_default();
OperatorRuntime {
client,
handlers: HashMap::new(),
operator: Arc::new(operator),
list_params,
signal: None,
store,
}
}
#[tracing::instrument(
level="trace",
skip(self, event),
fields(event = ?PrettyEvent::from(&event))
)]
async fn dispatch(&mut self, event: ObjectEvent<O::Manifest>) -> anyhow::Result<()> {
match event {
ObjectEvent::Applied(object) => {
let key: ObjectKey = (&object).into();
match self.handlers.get_mut(&key) {
Some(sender) => {
trace!("Found existing event handler for object.");
match sender.send(ObjectEvent::Applied(object)).await {
Ok(_) => trace!("Successfully sent event to handler for object."),
Err(error) => error!(
name=key.name(),
namespace=?key.namespace(),
?error,
"Error while sending event. Will retry on next event.",
),
}
}
None => {
debug!(
name=key.name(),
namespace=?key.namespace(),
"Creating event handler for object.",
);
self.handlers.insert(
key.clone(),
self.start_object(object).await?,
);
}
}
Ok(())
}
ObjectEvent::Deleted { name, namespace } => {
let key = ObjectKey::new(namespace.clone(), name.clone());
if let Some(sender) = self.handlers.remove(&key) {
debug!(
"Removed event handler for object {} in namespace {:?}.",
key.name(),
key.namespace()
);
sender
.send(ObjectEvent::Deleted { name, namespace })
.await?;
}
Ok(())
}
}
}
async fn start_object(
&self,
manifest: O::Manifest,
) -> anyhow::Result<Sender<ObjectEvent<O::Manifest>>> {
let (sender, mut receiver) = tokio::sync::mpsc::channel::<ObjectEvent<O::Manifest>>(128);
let deleted = Arc::new(RwLock::new(false));
let deleted_event = Arc::new(RwLock::new(false));
let object_state = self.operator.initialize_object_state(&manifest).await?;
let (manifest_tx, manifest_rx) = Manifest::new(manifest, self.store.clone());
let reflector_deleted = Arc::clone(&deleted);
let reflector_deleted_event = Arc::clone(&deleted_event);
tokio::spawn(async move {
while let Some(event) = receiver.recv().await {
match event {
ObjectEvent::Applied(manifest) => {
trace!(
name=%manifest.name(),
namespace=?manifest.namespace(),
"Resource applied.",
);
let meta = manifest.meta();
if meta.deletion_timestamp.is_some() {
{
let mut event = reflector_deleted.write().await;
*event = true;
}
}
match manifest_tx.send(manifest) {
Ok(()) => (),
Err(_) => {
debug!("Manifest receiver hung up, exiting.");
return;
}
}
}
ObjectEvent::Deleted { name, namespace } => {
debug!(
%name,
?namespace,
"Resource deleted.",
);
{
let mut event = reflector_deleted.write().await;
*event = true;
}
{
let mut event = reflector_deleted_event.write().await;
*event = true;
}
break;
}
}
}
});
tokio::spawn(run_object_task::<O>(
self.client.clone(),
manifest_rx,
self.operator.shared_state().await,
object_state,
deleted,
deleted_event,
Arc::clone(&self.operator),
));
Ok(sender)
}
#[tracing::instrument(
level="trace",
skip(self, objects),
fields(count=objects.len())
)]
async fn resync(&mut self, objects: Vec<O::Manifest>) -> anyhow::Result<()> {
let current_objects: HashSet<ObjectKey> = objects.iter().map(|obj| obj.into()).collect();
let objects_in_state: HashSet<ObjectKey> = self.handlers.keys().cloned().collect();
for key in objects_in_state.difference(¤t_objects) {
trace!(
name=key.name(),
namespace=?key.namespace(),
"object_deleted"
);
self.dispatch(ObjectEvent::Deleted {
name: key.name().to_string(),
namespace: key.namespace().cloned(),
})
.await?;
}
for object in objects.into_iter() {
trace!(
name=%object.name(),
namespace=?object.namespace(),
"object_applied"
);
self.dispatch(ObjectEvent::Applied(object)).await?
}
Ok(())
}
#[tracing::instrument(
level="trace",
skip(self, event),
fields(event=?PrettyEvent::from(&event))
)]
pub(crate) async fn handle_event(&mut self, event: Event<O::Manifest>) {
if let Some(ref signal) = self.signal {
if matches!(event, kube_runtime::watcher::Event::Applied(_))
&& signal.load(Ordering::Relaxed)
{
warn!("Controller is shutting down (got signal). Dropping Add event.");
return;
}
}
match event {
Event::Restarted(objects) => {
info!("Got a watch restart. Resyncing queue...");
match self.resync(objects).await {
Ok(()) => info!("Finished resync of objects."),
Err(error) => warn!(?error, "Error resyncing objects."),
};
}
Event::Applied(object) => {
match self.dispatch(ObjectEvent::Applied(object)).await {
Ok(()) => debug!("Dispatched event for processing."),
Err(error) => warn!(?error, "Error dispatching object event."),
};
}
Event::Deleted(object) => {
let key: ObjectKey = (&object).into();
let event = ObjectEvent::<O::Manifest>::Deleted {
name: key.name().to_string(),
namespace: key.namespace().cloned(),
};
match self.dispatch(event).await {
Ok(()) => debug!("Dispatched event for processing."),
Err(error) => warn!(?error, "Error dispatching object event."),
};
}
}
}
pub async fn main_loop(&mut self) {
let api = Api::<O::Manifest>::all(self.client.clone());
let mut informer = watcher(api, self.list_params.clone()).boxed();
loop {
match informer.try_next().await {
Ok(Some(event)) => self.handle_event(event).await,
Ok(None) => break,
Err(error) => warn!(?error, "Error streaming object events."),
}
}
}
#[cfg(not(feature = "admission-webhook"))]
pub async fn start(&mut self) {
self.main_loop().await;
}
#[cfg(feature = "admission-webhook")]
pub async fn start(&mut self) {
let hook = crate::admission::endpoint(Arc::clone(&self.operator));
let main = self.main_loop();
tokio::select!(
_ = main => warn!("Main loop exited"),
_ = hook => warn!("Admission hook exited."),
)
}
}
/// Resolves once the shared flag reads `true`, re-checking it once per second.
///
/// The read guard lives only for the `while` condition, so the lock is never held
/// across the sleep.
async fn wait_event(event: Arc<RwLock<bool>>) {
    let poll_interval = std::time::Duration::from_secs(1);
    while !*event.read().await {
        tokio::time::sleep(poll_interval).await;
    }
}
/// Drive a single object from registration through deregistration.
///
/// Sequence, as implemented below:
/// 1. Run the operator's `registration_hook`; on failure, log and return immediately.
/// 2. Run `run_to_completion` starting from `O::InitialState`. If the `deleted` flag is
///    raised first, that run is abandoned (dropped by `select!`) and `run_to_completion`
///    is run from `O::DeletedState` instead.
/// 3. Wait for `deleted` (returns immediately if already set), then call `async_drop` on
///    the object state while holding the shared-state write lock, and run
///    `deregistration_hook` (a failure is only logged).
/// 4. Delete the object through the API with a zero grace period. A 404 is treated as
///    "already deleted"; other errors are logged.
/// 5. Wait for `deleted_event` (raised by the reflector on a `Deleted` event) before returning.
async fn run_object_task<O: Operator>(
    client: Client,
    manifest: Manifest<O::Manifest>,
    shared: SharedState<<O::ObjectState as ObjectState>::SharedState>,
    mut object_state: O::ObjectState,
    deleted: Arc<RwLock<bool>>,
    deleted_event: Arc<RwLock<bool>>,
    operator: Arc<O>,
) {
    debug!("Running registration hook.");
    let state: O::InitialState = Default::default();
    // Name and namespace are taken from the manifest as seen at task start and are used
    // for logging and for the final API delete.
    let (namespace, name) = {
        let m = manifest.latest();
        match operator.registration_hook(manifest.clone()).await {
            Ok(()) => debug!("Running hook complete."),
            Err(e) => {
                error!(
                    "Operator registration hook for object {} in namespace {:?} failed: {:?}",
                    m.name(),
                    m.namespace(),
                    e
                );
                return;
            }
        }
        (m.namespace(), m.name())
    };
    // Race normal processing against deletion; whichever finishes first wins and the other
    // future is dropped.
    tokio::select! {
        _ = run_to_completion(&client, state, shared.clone(), &mut object_state, manifest.clone()) => (),
        _ = wait_event(Arc::clone(&deleted)) => {
            let state: O::DeletedState = Default::default();
            debug!("Object {} in namespace {:?} terminated. Jumping to state {:?}.", name, &namespace, state);
            run_to_completion(&client, state, shared.clone(), &mut object_state, manifest.clone()).await;
        }
    }
    debug!(
        "Resource {} in namespace {:?} waiting for deregistration.",
        name, namespace
    );
    // If normal processing finished first, block here until the object is marked deleted.
    wait_event(Arc::clone(&deleted)).await;
    {
        // Scoped so the shared-state write lock is released before the hooks below run.
        let mut state_writer = shared.write().await;
        object_state.async_drop(&mut state_writer).await;
    }
    match operator.deregistration_hook(manifest.clone()).await {
        Ok(()) => (),
        Err(e) => warn!(
            "Operator deregistration hook for object {} in namespace {:?} failed: {:?}",
            name, namespace, e
        ),
    }
    let api_client: Api<O::Manifest> = match namespace {
        Some(ref namespace) => kube::Api::namespaced(client, namespace),
        None => kube::Api::all(client),
    };
    // Zero grace period: request immediate removal of the object.
    let dp = kube::api::DeleteParams {
        grace_period_seconds: Some(0),
        ..Default::default()
    };
    match api_client.delete(&name, &dp).await {
        Ok(_) => {
            debug!(
                ?namespace,
                %name,
                "Object deregistered"
            );
        }
        Err(e) => match e {
            kube::error::Error::Api(kube::error::ErrorResponse { code, .. }) if code == 404 => {
                debug!(?namespace, %name, "Object already deleted")
            }
            error => {
                warn!(
                    ?namespace,
                    %name,
                    ?error,
                    "Unable to deregister object with Kubernetes API"
                );
            }
        },
    }
    // Keep the task (and the `manifest` it owns) alive until the watcher reports the deletion.
    wait_event(deleted_event).await;
    debug!(?namespace, %name, "Object deleted");
}