use k8s_openapi::api::core::v1::Namespace;
use kaniop_k8s_util::client::new_client_with_metrics;
use kaniop_operator::controller::{
SUBSCRIBE_BUFFER_SIZE, State as KaniopState, check_api_queryable, create_subscriber,
};
use kaniop_operator::kanidm::crd::Kanidm;
use kaniop_operator::telemetry;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Json};
use axum::routing::{Router, get};
use clap::{Parser, crate_authors, crate_description, crate_version};
use kube::Config;
use prometheus_client::registry::Registry;
use tokio::net::TcpListener;
use tokio::signal::unix::{SignalKind, signal};
async fn metrics(State(state): State<KaniopState>) -> impl IntoResponse {
match state.metrics() {
Ok(metrics) => (
StatusCode::OK,
[(
"content-type",
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)],
metrics,
)
.into_response(),
Err(e) => {
tracing::error!("Failed to get metrics: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
async fn health() -> impl IntoResponse {
Json("healthy")
}
#[derive(Parser, Debug)]
#[command(
name="kaniop",
about = crate_description!(),
version = crate_version!(),
author = crate_authors!("\n"),
)]
struct Args {
#[arg(short, long, default_value_t = 8080, env)]
port: u16,
#[arg(long, default_value = "info", env)]
log_filter: String,
#[arg(long, value_enum, default_value_t = telemetry::LogFormat::Text, env)]
log_format: telemetry::LogFormat,
#[arg(short, long, env = "OPENTELEMETRY_ENDPOINT_URL")]
tracing_url: Option<String>,
#[arg(short, long, default_value_t = 0.1, env)]
sample_ratio: f64,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Args = Args::parse();
telemetry::init(
&args.log_filter,
args.log_format,
args.tracing_url.as_deref(),
args.sample_ratio,
)
.await?;
let mut registry = Registry::with_prefix("kaniop");
let config = Config::infer().await?;
let client = new_client_with_metrics(config, &mut registry).await?;
let controllers = [
kaniop_group::controller::CONTROLLER_ID,
kaniop_operator::kanidm::controller::CONTROLLER_ID,
kaniop_oauth2::controller::CONTROLLER_ID,
kaniop_person::controller::CONTROLLER_ID,
];
let namespace = check_api_queryable::<Namespace>(client.clone()).await;
let namespace_r = create_subscriber::<Namespace>(SUBSCRIBE_BUFFER_SIZE);
let kanidm = check_api_queryable::<Kanidm>(client.clone()).await;
let kanidm_r = create_subscriber::<Kanidm>(SUBSCRIBE_BUFFER_SIZE);
let state = KaniopState::new(
registry,
&controllers,
namespace_r.store.clone(),
kanidm_r.store.clone(),
);
let kanidm_c = kaniop_operator::kanidm::controller::run(
state.clone(),
client.clone(),
namespace,
namespace_r,
kanidm,
kanidm_r,
);
let group_c = kaniop_group::controller::run(state.clone(), client.clone());
let oauth2_c = kaniop_oauth2::controller::run(state.clone(), client.clone());
let person_c = kaniop_person::controller::run(state.clone(), client);
let app = Router::new()
.route("/metrics", get(metrics))
.route("/health", get(health))
.with_state(state.clone());
let listener = TcpListener::bind(format!("0.0.0.0:{}", args.port)).await?;
let server = axum::serve(listener, app).with_graceful_shutdown(shutdown_signal());
tokio::join!(group_c, kanidm_c, oauth2_c, person_c, server).4?;
Ok(())
}
async fn shutdown_signal() {
let mut sigterm =
signal(SignalKind::terminate()).expect("failed to install SIGTERM signal handler");
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
_ = sigterm.recv() => {},
}
}