use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Json};
use axum::routing::{Router, get};
use clap::{Parser, crate_authors, crate_description, crate_version};
use k8s_openapi::api::core::v1::Namespace;
use kaniop_k8s_util::client::new_client_with_metrics;
use kaniop_operator::controller::{
SUBSCRIBE_BUFFER_SIZE, State as KaniopState, check_api_queryable, create_subscriber,
};
use kaniop_operator::kanidm::crd::Kanidm;
use kube::Config;
use prometheus_client::registry::Registry;
use tokio::net::TcpListener;
use tokio::signal::unix::{SignalKind, signal};
use kaniop_operator::telemetry;
use rustls::crypto::aws_lc_rs::default_provider;
async fn metrics(State(state): State<KaniopState>) -> impl IntoResponse {
match state.metrics() {
Ok(metrics) => (
StatusCode::OK,
[(
"content-type",
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)],
metrics,
)
.into_response(),
Err(e) => {
tracing::error!("Failed to get metrics: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
async fn healthz() -> impl IntoResponse {
Json("healthy")
}
#[derive(Parser, Debug)]
#[command(
name="kaniop",
about = crate_description!(),
version = crate_version!(),
author = crate_authors!("\n"),
)]
struct Args {
#[arg(long, default_value = "0.0.0.0", env)]
listen_address: String,
#[arg(short, long, default_value_t = 8080, env)]
port: u16,
#[arg(long, default_value = "info", env)]
log_filter: String,
#[arg(long, value_enum, default_value_t = telemetry::LogFormat::Text, env)]
log_format: telemetry::LogFormat,
#[arg(short, long, env = "OPENTELEMETRY_ENDPOINT_URL")]
tracing_url: Option<String>,
#[arg(short, long, default_value_t = 0.1, env)]
sample_ratio: f64,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
default_provider().install_default().unwrap();
let args: Args = Args::parse();
telemetry::init(
&args.log_filter,
args.log_format,
args.tracing_url.as_deref(),
args.sample_ratio,
)
.await?;
let mut registry = Registry::with_prefix("kaniop");
let config = Config::infer().await?;
let client = new_client_with_metrics(config, &mut registry).await?;
let controllers = [
kaniop_operator::kanidm::controller::CONTROLLER_ID,
kaniop_group::controller::CONTROLLER_ID,
kaniop_oauth2::controller::CONTROLLER_ID,
kaniop_person::controller::CONTROLLER_ID,
kaniop_service_account::controller::CONTROLLER_ID,
];
let namespace = check_api_queryable::<Namespace>(client.clone()).await;
let namespace_r = create_subscriber::<Namespace>(SUBSCRIBE_BUFFER_SIZE);
let kanidm = check_api_queryable::<Kanidm>(client.clone()).await;
let kanidm_r = create_subscriber::<Kanidm>(SUBSCRIBE_BUFFER_SIZE);
let state = KaniopState::new(
registry,
&controllers,
namespace_r.store.clone(),
kanidm_r.store.clone(),
Some(client.clone()),
);
let kanidm_c = kaniop_operator::kanidm::controller::run(
state.clone(),
client.clone(),
namespace,
namespace_r,
kanidm,
kanidm_r,
);
let group_c = kaniop_group::controller::run(state.clone(), client.clone());
let oauth2_c = kaniop_oauth2::controller::run(state.clone(), client.clone());
let person_c = kaniop_person::controller::run(state.clone(), client.clone());
let service_account_c = kaniop_service_account::controller::run(state.clone(), client);
let app = Router::new()
.route("/metrics", get(metrics))
.route("/healthz", get(healthz))
.with_state(state.clone());
let addr = format!("{}:{}", args.listen_address, args.port);
tracing::info!("Starting HTTP server on {}", addr);
let listener = TcpListener::bind(&addr).await?;
let server = axum::serve(listener, app).with_graceful_shutdown(shutdown_signal());
tokio::join!(
kanidm_c,
group_c,
oauth2_c,
person_c,
service_account_c,
server
)
.5?;
Ok(())
}
async fn shutdown_signal() {
let mut sigterm =
signal(SignalKind::terminate()).expect("failed to install SIGTERM signal handler");
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
_ = sigterm.recv() => {},
}
}