use crate::config::Config;
use crate::node;
use crate::operator::PodOperator;
use crate::plugin_watcher::PluginRegistry;
use crate::provider::Provider;
use crate::webserver::start as start_webserver;
use futures::future::{FutureExt, TryFutureExt};
use kube::api::ListParams;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::signal::ctrl_c;
use tokio::task;
use tracing::{error, info, warn};
use krator::OperatorRuntime;
pub struct Kubelet<P> {
provider: Arc<P>,
kube_config: kube::Config,
config: Box<Config>,
}
impl<P: Provider> Kubelet<P> {
pub async fn new(
provider: P,
kube_config: kube::Config,
config: Config,
) -> anyhow::Result<Self> {
Ok(Self {
provider: Arc::new(provider),
kube_config,
config: Box::new(config),
})
}
pub async fn start(&self) -> anyhow::Result<()> {
let client = kube::Client::new(self.kube_config.clone());
node::create(&client, &self.config, self.provider.clone()).await;
let signal = Arc::new(AtomicBool::new(false));
let signal_task = start_signal_task(Arc::clone(&signal)).fuse().boxed();
let plugin_registrar = start_plugin_registry(self.provider.plugin_registry())
.fuse()
.boxed();
let webserver = start_webserver(self.provider.clone(), &self.config.server_config)
.fuse()
.boxed();
let node_updater = start_node_updater(client.clone(), self.config.node_name.clone())
.fuse()
.boxed();
let services = Box::pin(async {
tokio::select! {
res = signal_task => if let Err(e) = res {
error!("Signal task completed with error {:?}", &e);
},
res = webserver => error!("Webserver task completed with result {:?}", &res),
res = node_updater => if let Err(e) = res {
error!("Node updater task completed with error {:?}", &e);
},
res = plugin_registrar => if let Err(e) = res {
error!("Plugin registrar task completed with error {:?}", &e);
}
};
signal.store(true, Ordering::Relaxed);
Ok::<(), anyhow::Error>(())
});
let signal_handler = start_signal_handler(
Arc::clone(&signal),
client.clone(),
self.config.node_name.clone(),
)
.fuse()
.boxed();
let operator = PodOperator::new(Arc::clone(&self.provider), client.clone());
let node_selector = format!("spec.nodeName={}", &self.config.node_name);
let params = ListParams {
field_selector: Some(node_selector),
..Default::default()
};
let mut operator_runtime = OperatorRuntime::new(&self.kube_config, operator, Some(params));
let operator_task = operator_runtime.start().fuse().boxed();
let core = Box::pin(async {
tokio::select! {
res = signal_handler => res.map_err(|e| {
error!("Signal handler task joined with error {:?}", &e);
e
}),
_ = operator_task => {
warn!("Pod operator has completed");
Ok(())
}
}
});
tokio::try_join!(core, services)?;
Ok(())
}
}
impl<P> Clone for Kubelet<P> {
fn clone(&self) -> Self {
Self {
provider: self.provider.clone(),
kube_config: self.kube_config.clone(),
config: self.config.clone(),
}
}
}
async fn start_signal_task(signal: Arc<AtomicBool>) -> anyhow::Result<()> {
ctrl_c().await?;
warn!("Caught keyboard interrupt.");
signal.store(true, Ordering::Relaxed);
Ok(())
}
async fn start_plugin_registry(registrar: Option<Arc<PluginRegistry>>) -> anyhow::Result<()> {
match registrar {
Some(r) => r.run().await,
None => {
task::spawn(async {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(std::u64::MAX)).await;
}
})
.map_err(anyhow::Error::from)
.await
}
}
}
async fn start_node_updater(client: kube::Client, node_name: String) -> anyhow::Result<()> {
let sleep_interval = std::time::Duration::from_secs(10);
loop {
node::update(&client, &node_name).await;
tokio::time::sleep(sleep_interval).await;
}
}
async fn start_signal_handler(
signal: Arc<AtomicBool>,
client: kube::Client,
node_name: String,
) -> anyhow::Result<()> {
let duration = std::time::Duration::from_millis(100);
loop {
if signal.load(Ordering::Relaxed) {
info!("Signal caught.");
node::drain(&client, &node_name).await?;
break Ok(());
}
tokio::time::sleep(duration).await;
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::container::Container;
use crate::plugin_watcher::PluginRegistry;
use crate::pod::{Pod, Status};
use k8s_openapi::api::core::v1::{
Container as KubeContainer, EnvVar, EnvVarSource, ObjectFieldSelector, Pod as KubePod,
PodSpec, PodStatus,
};
use krator::ObjectState;
use kube::api::ObjectMeta;
use std::collections::BTreeMap;
use tokio::sync::RwLock;
fn mock_client() -> kube::Client {
kube::Client::new(kube::Config::new(
reqwest::Url::parse("http://127.0.0.1:8080").unwrap(),
))
}
struct MockProvider;
struct ProviderState;
struct PodState;
#[async_trait::async_trait]
impl ObjectState for PodState {
type Manifest = Pod;
type Status = Status;
type SharedState = ProviderState;
async fn async_drop(self, _provider_state: &mut ProviderState) {}
}
#[async_trait::async_trait]
impl Provider for MockProvider {
type ProviderState = ProviderState;
type InitialState = crate::pod::state::Stub;
type TerminatedState = crate::pod::state::Stub;
type PodState = PodState;
const ARCH: &'static str = "mock";
async fn initialize_pod_state(&self, _pod: &Pod) -> anyhow::Result<Self::PodState> {
Ok(PodState)
}
fn provider_state(&self) -> krator::SharedState<ProviderState> {
Arc::new(RwLock::new(ProviderState {}))
}
fn plugin_registry(&self) -> Option<Arc<PluginRegistry>> {
Some(Arc::new(PluginRegistry::default()))
}
async fn logs(
&self,
_namespace: String,
_pod: String,
_container: String,
_sender: crate::log::Sender,
) -> anyhow::Result<()> {
Ok(())
}
}
#[tokio::test]
async fn test_env_vars() {
let container = Container::new(&KubeContainer {
env: Some(vec![
EnvVar {
name: "first".into(),
value: Some("value".into()),
..Default::default()
},
EnvVar {
name: "second".into(),
value_from: Some(EnvVarSource {
field_ref: Some(ObjectFieldSelector {
field_path: "metadata.labels.label".into(),
..Default::default()
}),
..Default::default()
}),
..Default::default()
},
EnvVar {
name: "third".into(),
value_from: Some(EnvVarSource {
field_ref: Some(ObjectFieldSelector {
field_path: "metadata.annotations.annotation".into(),
..Default::default()
}),
..Default::default()
}),
..Default::default()
},
EnvVar {
name: "NAME".into(),
value_from: Some(EnvVarSource {
field_ref: Some(ObjectFieldSelector {
field_path: "metadata.name".into(),
..Default::default()
}),
..Default::default()
}),
..Default::default()
},
EnvVar {
name: "NAMESPACE".into(),
value_from: Some(EnvVarSource {
field_ref: Some(ObjectFieldSelector {
field_path: "metadata.namespace".into(),
..Default::default()
}),
..Default::default()
}),
..Default::default()
},
EnvVar {
name: "HOST_IP".into(),
value_from: Some(EnvVarSource {
field_ref: Some(ObjectFieldSelector {
field_path: "status.hostIP".into(),
..Default::default()
}),
..Default::default()
}),
..Default::default()
},
EnvVar {
name: "POD_IP".into(),
value_from: Some(EnvVarSource {
field_ref: Some(ObjectFieldSelector {
field_path: "status.podIP".into(),
..Default::default()
}),
..Default::default()
}),
..Default::default()
},
]),
..Default::default()
});
let name = "my-name".to_string();
let namespace = Some("my-namespace".to_string());
let mut labels = BTreeMap::new();
labels.insert("label".to_string(), "value".to_string());
let mut annotations = BTreeMap::new();
annotations.insert("annotation".to_string(), "value".to_string());
let pod = Pod::from(KubePod {
metadata: ObjectMeta {
labels: Some(labels),
annotations: Some(annotations),
name: Some(name),
namespace,
..Default::default()
},
spec: Some(PodSpec {
service_account_name: Some("svc".to_string()),
..Default::default()
}),
status: Some(PodStatus {
host_ip: Some("10.21.77.1".to_string()),
pod_ip: Some("10.21.77.2".to_string()),
..Default::default()
}),
});
let env = MockProvider::env_vars(&container, &pod, &mock_client()).await;
assert_eq!(
"value",
env.get("first").expect("key first should exist").as_str()
);
assert_eq!(
"value",
env.get("second").expect("metadata.labels.label").as_str()
);
assert_eq!(
"value",
env.get("third")
.expect("metadata.annotations.annotation")
.as_str()
);
assert_eq!("my-name", env.get("NAME").expect("metadata.name").as_str());
assert_eq!(
"my-namespace",
env.get("NAMESPACE").expect("metadata.namespace").as_str()
);
assert_eq!("10.21.77.2", env.get("POD_IP").expect("pod_ip").as_str());
assert_eq!("10.21.77.1", env.get("HOST_IP").expect("host_ip").as_str());
}
}