use std::collections::HashMap;
use async_trait::async_trait;
use k8s_openapi::api::core::v1::{ConfigMap, EnvVarSource, Secret};
use kube::api::Api;
use std::sync::Arc;
use thiserror::Error;
use tracing::{error, info};
use crate::container::Container;
use crate::log::Sender;
use crate::node::Builder;
use crate::plugin_watcher::PluginRegistry;
use crate::pod::Pod;
use crate::pod::Status as PodStatus;
use krator::{ObjectState, State};
#[async_trait]
pub trait Provider: Sized + Send + Sync + 'static {
type ProviderState: 'static + Send + Sync;
type PodState: ObjectState<
Manifest = Pod,
Status = PodStatus,
SharedState = Self::ProviderState,
>;
type InitialState: Default + State<Self::PodState>;
type TerminatedState: Default + State<Self::PodState>;
const ARCH: &'static str;
fn provider_state(&self) -> krator::SharedState<Self::ProviderState>;
async fn node(&self, _builder: &mut Builder) -> anyhow::Result<()> {
Ok(())
}
async fn initialize_pod_state(&self, pod: &Pod) -> anyhow::Result<Self::PodState>;
async fn logs(
&self,
namespace: String,
pod: String,
container: String,
sender: Sender,
) -> anyhow::Result<()>;
async fn exec(&self, _pod: Pod, _command: String) -> anyhow::Result<Vec<String>> {
Err(NotImplementedError.into())
}
fn volume_path(&self) -> Option<std::path::PathBuf> {
None
}
fn plugin_registry(&self) -> Option<Arc<PluginRegistry>> {
None
}
async fn env_vars(
container: &Container,
pod: &Pod,
client: &kube::Client,
) -> HashMap<String, String> {
let mut env = HashMap::new();
let vars = match container.env().as_ref() {
Some(e) => e,
None => return env,
};
for env_var in vars.clone().into_iter() {
let key = env_var.name;
let value = match env_var.value {
Some(v) => v,
None => {
on_missing_env_value(
env_var.value_from,
client,
pod.namespace(),
&field_map(pod),
)
.await
}
};
env.insert(key, value);
}
env
}
}
pub async fn env_vars(
container: &Container,
pod: &Pod,
client: &kube::Client,
) -> HashMap<String, String> {
let mut env = HashMap::new();
let vars = match container.env().as_ref() {
Some(e) => e,
None => return env,
};
for env_var in vars.clone().into_iter() {
let key = env_var.name;
let value = match env_var.value {
Some(v) => v,
None => {
on_missing_env_value(env_var.value_from, client, pod.namespace(), &field_map(pod))
.await
}
};
env.insert(key, value);
}
env
}
#[doc(hidden)]
async fn on_missing_env_value(
env_var_source: Option<EnvVarSource>,
client: &kube::Client,
ns: &str,
fields: &HashMap<String, String>,
) -> String {
let env_src = match env_var_source {
Some(env_src) => env_src,
None => return String::new(),
};
if let Some(cfkey) = env_src.config_map_key_ref.as_ref() {
let name = cfkey.name.as_deref().unwrap_or_default();
match Api::<ConfigMap>::namespaced(client.clone(), ns)
.get(name)
.await
{
Ok(cfgmap) => {
return cfgmap
.data
.unwrap_or_default()
.get(&cfkey.key)
.cloned()
.unwrap_or_default();
}
Err(e) => {
error!("Error fetching config map {}: {}", name, e);
return "".to_string();
}
}
}
if let Some(seckey) = env_src.secret_key_ref.as_ref() {
let name = seckey.name.as_deref().unwrap_or_default();
match Api::<Secret>::namespaced(client.clone(), ns)
.get(name)
.await
{
Ok(secret) => {
return secret
.data
.unwrap_or_default()
.remove(&seckey.key)
.map(|s| String::from_utf8(s.0).unwrap_or_default())
.unwrap_or_default();
}
Err(e) => {
error!("Error fetching secret {}: {}", name, e);
return String::new();
}
}
}
if let Some(cfkey) = env_src.field_ref.as_ref() {
return fields.get(&cfkey.field_path).cloned().unwrap_or_default();
}
String::new()
}
fn field_map(pod: &Pod) -> HashMap<String, String> {
let mut map: HashMap<String, String> = HashMap::new();
map.insert("metadata.name".into(), pod.name().to_owned());
map.insert("metadata.namespace".into(), pod.namespace().to_owned());
map.insert(
"spec.serviceAccountName".into(),
pod.service_account_name().unwrap_or_default().to_owned(),
);
map.insert(
"status.hostIP".into(),
pod.host_ip().unwrap_or_default().to_owned(),
);
map.insert(
"status.podIP".into(),
pod.pod_ip().unwrap_or_default().to_owned(),
);
pod.labels().iter().for_each(|(k, v)| {
info!("adding {} to labels", k);
map.insert(format!("metadata.labels.{}", k), v.clone());
});
pod.annotations().iter().for_each(|(k, v)| {
map.insert(format!("metadata.annotations.{}", k), v.clone());
});
map
}
#[derive(Debug, Error)]
pub enum ProviderError {
#[error("cannot find pod {}", pod_name)]
PodNotFound {
pod_name: String,
},
#[error("cannot find container {} in pod {}", container_name, pod_name)]
ContainerNotFound {
pod_name: String,
container_name: String,
},
}
#[derive(Error, Debug)]
#[error("Operation not supported")]
pub struct NotImplementedError;