use std::path::PathBuf;
use std::sync::Arc;
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use cfgd_csi::cache::Cache;
use cfgd_csi::csi::v1::identity_server::IdentityServer;
use cfgd_csi::csi::v1::node_server::NodeServer;
use cfgd_csi::identity::CfgdIdentity;
use cfgd_csi::metrics::CsiMetrics;
use cfgd_csi::node::CfgdNode;
fn env_or(key: &str, default: &str) -> String {
std::env::var(key).unwrap_or_else(|_| default.to_string())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.json()
.init();
let socket_path = env_or("CSI_ENDPOINT", "/csi/csi.sock");
let cache_dir = PathBuf::from(env_or("CACHE_DIR", "/var/lib/cfgd-csi/cache"));
let cache_max_str = env_or("CACHE_MAX_BYTES", "5368709120");
let cache_max: u64 = cache_max_str.parse().unwrap_or_else(|e| {
tracing::warn!(value = %cache_max_str, error = %e, "invalid CACHE_MAX_BYTES, using default 5GB");
5_368_709_120
});
let metrics_port_str = env_or("METRICS_PORT", "9090");
let metrics_port: u16 = metrics_port_str.parse().unwrap_or_else(|e| {
tracing::warn!(value = %metrics_port_str, error = %e, "invalid METRICS_PORT, using default 9090");
9090
});
let node_id = cfgd_core::hostname_string();
tracing::info!(
socket = %socket_path,
cache_dir = %cache_dir.display(),
cache_max_bytes = cache_max,
node_id = %node_id,
"starting cfgd-csi"
);
let cache = Arc::new(Cache::new(cache_dir.clone(), cache_max)?);
let mut registry = prometheus_client::registry::Registry::default();
let metrics = Arc::new(CsiMetrics::new(&mut registry));
let registry = Arc::new(registry);
let metrics_registry = registry.clone();
tokio::spawn(async move {
let app = axum::Router::new().route(
"/metrics",
axum::routing::get(move || {
let r = metrics_registry.clone();
async move {
let mut buf = String::new();
if prometheus_client::encoding::text::encode(&mut buf, &r).is_err() {
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
"encoding error".to_string(),
);
}
(axum::http::StatusCode::OK, buf)
}
}),
);
let listener = match tokio::net::TcpListener::bind(("0.0.0.0", metrics_port)).await {
Ok(l) => l,
Err(e) => {
tracing::error!(port = metrics_port, error = %e, "failed to bind metrics port");
return;
}
};
tracing::info!(port = metrics_port, "metrics server listening");
if let Err(e) = axum::serve(listener, app).await {
tracing::error!(error = %e, "metrics server failed");
}
});
let _ = std::fs::remove_file(&socket_path);
if let Some(parent) = PathBuf::from(&socket_path).parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(&socket_path)?;
let stream = UnixListenerStream::new(listener);
tracing::info!(socket = %socket_path, "gRPC server listening");
Server::builder()
.add_service(IdentityServer::new(CfgdIdentity::new(cache_dir)))
.add_service(NodeServer::new(CfgdNode::new(cache, metrics, node_id)))
.serve_with_incoming_shutdown(stream, async {
shutdown_signal().await;
tracing::info!("received shutdown signal, draining");
})
.await?;
tracing::info!("cfgd-csi stopped");
Ok(())
}
async fn shutdown_signal() {
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to register SIGTERM handler");
let ctrl_c = tokio::signal::ctrl_c();
tokio::select! {
_ = sigterm.recv() => {}
_ = ctrl_c => {}
}
}