#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;
use tracing_subscriber::EnvFilter;
use nodedb::ServerConfig;
use nodedb::bridge::dispatch::Dispatcher;
use nodedb::config::server::apply_env_overrides;
use nodedb::control::state::SharedState;
use nodedb::data::runtime::spawn_core;
use nodedb::wal::WalManager;
fn build_tls_acceptor(
tls: &nodedb::config::server::TlsSettings,
) -> anyhow::Result<pgwire::tokio::TlsAcceptor> {
use std::fs::File;
use std::io::BufReader;
let cert_file = File::open(&tls.cert_path)
.map_err(|e| anyhow::anyhow!("failed to open TLS cert {}: {e}", tls.cert_path.display()))?;
let key_file = File::open(&tls.key_path)
.map_err(|e| anyhow::anyhow!("failed to open TLS key {}: {e}", tls.key_path.display()))?;
let certs: Vec<_> = rustls_pemfile::certs(&mut BufReader::new(cert_file))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| anyhow::anyhow!("failed to parse TLS certs: {e}"))?;
let key = rustls_pemfile::private_key(&mut BufReader::new(key_file))
.map_err(|e| anyhow::anyhow!("failed to parse TLS key: {e}"))?
.ok_or_else(|| anyhow::anyhow!("no private key found in {}", tls.key_path.display()))?;
let server_config = tokio_rustls::rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| anyhow::anyhow!("TLS config error: {e}"))?;
Ok(pgwire::tokio::TlsAcceptor::from(Arc::new(server_config)))
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config_path: Option<PathBuf> = std::env::args()
.nth(1)
.map(PathBuf::from)
.or_else(|| std::env::var("NODEDB_CONFIG").ok().map(PathBuf::from));
let mut config = match config_path {
Some(ref path) => ServerConfig::from_file(path)?,
None => ServerConfig::default(),
};
apply_env_overrides(&mut config);
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn"));
if config.log_format == "json" {
tracing_subscriber::fmt()
.with_env_filter(filter)
.json()
.init();
} else {
tracing_subscriber::fmt().with_env_filter(filter).init();
}
apply_env_overrides(&mut config);
match &config_path {
None => info!("no config file provided, using defaults"),
Some(path)
if std::env::var("NODEDB_CONFIG").is_ok() && std::env::args().nth(1).is_none() =>
{
info!(
path = %path.display(),
"config file loaded from NODEDB_CONFIG"
);
}
Some(_) => {}
}
info!(
listen = %config.listen,
cores = config.data_plane_cores,
memory_limit = config.memory_limit,
"starting nodedb"
);
config.engines.validate()?;
let byte_budgets = config.engines.to_byte_budgets(config.memory_limit);
let _governor = nodedb::memory::init_governor(config.memory_limit, &byte_budgets)?;
let wal_segment_target = config.checkpoint.wal_segment_target_bytes();
let wal = {
let mut mgr = WalManager::open_with_tuning(
&config.wal_dir(),
false,
wal_segment_target,
&config.tuning.wal,
)?;
if let Some(ref enc) = config.encryption {
let key = nodedb_wal::crypto::WalEncryptionKey::from_file(&enc.key_path)
.map_err(nodedb::Error::Wal)?;
mgr.set_encryption_ring(nodedb_wal::crypto::KeyRing::new(key));
info!(key_path = %enc.key_path.display(), "WAL encryption enabled");
}
Arc::new(mgr)
};
info!(next_lsn = %wal.next_lsn(), "WAL ready");
let wal_records: Arc<[nodedb_wal::WalRecord]> = match wal.replay() {
Ok(records) => {
if !records.is_empty() {
info!(records = records.len(), "WAL records loaded for replay");
}
Arc::from(records.into_boxed_slice())
}
Err(e) => {
tracing::warn!(error = %e, "WAL replay failed, starting with empty state");
Arc::from(Vec::new().into_boxed_slice())
}
};
let num_cores = config.data_plane_cores;
let (mut dispatcher, data_sides) = Dispatcher::new(num_cores, 1024);
let compaction_cfg = nodedb::data::runtime::CoreCompactionConfig {
interval: config.checkpoint.compaction_interval(),
tombstone_threshold: config.checkpoint.compaction_tombstone_threshold,
query: config.tuning.query.clone(),
};
let system_metrics = Arc::new(nodedb::control::metrics::SystemMetrics::new());
let mut core_handles = Vec::with_capacity(num_cores);
let mut notifiers = Vec::with_capacity(num_cores);
for (core_id, data_side) in data_sides.into_iter().enumerate() {
let (handle, notifier) = spawn_core(
core_id,
data_side.request_rx,
data_side.response_tx,
&config.data_dir,
Arc::clone(&wal_records),
num_cores,
compaction_cfg.clone(),
Some(Arc::clone(&system_metrics)),
)?;
core_handles.push(handle);
notifiers.push((core_id, notifier));
}
for (core_id, notifier) in ¬ifiers {
dispatcher.set_notifier(*core_id, *notifier);
}
info!(num_cores, "data plane cores running (eventfd-driven)");
let cluster_handle = if let Some(ref cluster_cfg) = config.cluster {
cluster_cfg
.validate()
.map_err(|e| anyhow::anyhow!("cluster config: {e}"))?;
let handle = nodedb::control::cluster::init_cluster(
cluster_cfg,
&config.data_dir,
&config.tuning.cluster_transport,
)
.await?;
Some(handle)
} else {
None
};
let mut shared = SharedState::open(
dispatcher,
Arc::clone(&wal),
&config.catalog_path(),
&config.auth,
config.tuning.clone(),
)?;
if let Some(ref jwt_config) = config.auth.jwt
&& !jwt_config.providers.is_empty()
&& let Some(state) = Arc::get_mut(&mut shared)
{
let registry = tokio::runtime::Handle::current().block_on(
nodedb::control::security::jwks::registry::JwksRegistry::init(jwt_config.clone()),
);
state.jwks_registry = Some(std::sync::Arc::new(registry));
info!(
"JWKS registry initialised with {} providers",
jwt_config.providers.len()
);
}
if let Some(ref cold_settings) = config.cold_storage {
let cold_config = cold_settings.to_cold_storage_config();
match nodedb::storage::cold::ColdStorage::new(cold_config) {
Ok(cold) => {
if let Some(state) = Arc::get_mut(&mut shared) {
state.cold_storage = Some(Arc::new(cold));
info!("cold storage (L2 tiering) initialised");
} else {
tracing::warn!(
"cold storage: Arc::get_mut failed (unexpected clone), skipping"
);
}
}
Err(e) => {
tracing::warn!(error = %e, "cold storage init failed, tiering disabled");
}
}
}
let auth_mode = config.auth.mode.clone();
match config.auth.resolve_superuser_password() {
Ok(Some(password)) => {
shared
.credentials
.bootstrap_superuser(&config.auth.superuser_name, &password)?;
info!(
user = config.auth.superuser_name,
mode = ?auth_mode,
"superuser bootstrapped"
);
}
Ok(None) => {
info!(mode = ?auth_mode, "auth mode: trust (no authentication)");
}
Err(e) => {
return Err(e.into());
}
}
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
if let Some(ref handle) = cluster_handle {
nodedb::control::cluster::start_raft(
handle,
Arc::clone(&shared),
&config.data_dir,
shutdown_rx.clone(),
&config.tuning.cluster_transport,
)?;
}
let shared_poller = Arc::clone(&shared);
tokio::spawn(async move {
loop {
shared_poller.poll_and_route_responses();
tokio::task::yield_now().await;
}
});
nodedb::control::event_trigger::spawn_event_trigger_processor(Arc::clone(&shared));
let shared_rate = Arc::clone(&shared);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
shared_rate.reset_tenant_rate_counters();
}
});
let shared_audit = Arc::clone(&shared);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
shared_audit.flush_audit_log();
}
});
let shared_mem = Arc::clone(&shared);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
shared_mem.update_tenant_memory_estimates();
}
});
let shared_ckpt = Arc::clone(&shared);
let shutdown_rx_ckpt = shutdown_rx.clone();
nodedb::control::checkpoint_manager::spawn_checkpoint_task(
shared_ckpt,
num_cores,
config.checkpoint.to_manager_config(),
shutdown_rx_ckpt,
);
if let Some(ref cold_settings) = config.cold_storage {
let shared_cold = Arc::clone(&shared);
let cold_settings_clone = cold_settings.clone();
let data_dir_clone = config.data_dir.clone();
let shutdown_rx_cold = shutdown_rx.clone();
nodedb::control::cold_tier::spawn_cold_tier_task(
shared_cold,
cold_settings_clone,
data_dir_clone,
shutdown_rx_cold,
);
info!("cold tier task spawned");
}
let conn_semaphore = Arc::new(tokio::sync::Semaphore::new(config.max_connections));
info!(
max_connections = config.max_connections,
"connection limit configured"
);
let listener = nodedb::control::server::listener::Listener::bind(config.listen).await?;
let pg_listener =
nodedb::control::server::pgwire::listener::PgListener::bind(config.pg_listen).await?;
let ilp_listener = if let Some(ilp_addr) = config.ilp_listen {
Some(nodedb::control::server::ilp_listener::IlpListener::bind(ilp_addr).await?)
} else {
None
};
let resp_listener = if let Some(resp_addr) = config.resp_listen {
Some(nodedb::control::server::resp::RespListener::bind(resp_addr).await?)
} else {
None
};
eprintln!();
eprintln!(" NodeDB v{}", env!("CARGO_PKG_VERSION"));
eprintln!(" ─────────────────────────────────────");
eprintln!(" Native protocol : {}", config.listen);
eprintln!(" PostgreSQL wire : {}", config.pg_listen);
eprintln!(" HTTP API : {}", config.http_listen);
if let Some(addr) = config.resp_listen {
eprintln!(" RESP (KV) : {addr}");
}
if let Some(addr) = config.ilp_listen {
eprintln!(" ILP ingest : {addr}");
}
eprintln!(" Data Plane cores: {}", config.data_plane_cores);
eprintln!(" Data directory : {}", config.data_dir.display());
eprintln!(" Auth mode : {:?}", config.auth.mode);
eprintln!();
eprintln!(" Press Ctrl+C to stop.");
eprintln!();
let max_conns = config.max_connections;
let sem_clone = Arc::clone(&conn_semaphore);
let shared_signal = Arc::clone(&shared);
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
let active = max_conns - sem_clone.available_permits();
if active > 0 {
eprintln!();
eprintln!(
" {} active connection(s). Draining (30s timeout)...",
active
);
eprintln!(" Press Ctrl+C again to force stop.");
} else {
eprintln!("\n Shutting down...");
}
let shapes = shared_signal.shape_registry.export_all();
if !shapes.is_empty() {
tracing::info!(shapes = shapes.len(), "persisting shape subscriptions");
}
let _ = shutdown_tx.send(true);
tokio::signal::ctrl_c().await.ok();
eprintln!(" Force stop.");
std::process::exit(1);
});
let tls_acceptor = match &config.tls {
Some(tls) => {
let check_interval = Duration::from_secs(tls.cert_reload_interval_secs.unwrap_or(3600));
let (_tls_rx, _tls_tx) = nodedb::control::server::tls_reload::start_tls_reloader(
tls,
check_interval,
Arc::clone(&shared),
)?;
let acceptor = build_tls_acceptor(tls)?;
info!(
reload_interval_secs = check_interval.as_secs(),
"pgwire TLS enabled with hot rotation"
);
Some(acceptor)
}
None => None,
};
let shared_pg = Arc::clone(&shared);
let shutdown_rx_pg = shutdown_rx.clone();
let conn_sem_pg = Arc::clone(&conn_semaphore);
tokio::spawn(async move {
if let Err(e) = pg_listener
.run(
shared_pg,
auth_mode,
tls_acceptor,
conn_sem_pg,
shutdown_rx_pg,
)
.await
{
tracing::error!(error = %e, "pgwire listener failed");
}
});
let shared_http = Arc::clone(&shared);
let http_auth_mode = config.auth.mode.clone();
let http_listen = config.http_listen;
let http_tls = config.tls.clone();
let shutdown_rx_http = shutdown_rx.clone();
tokio::spawn(async move {
if let Err(e) = nodedb::control::server::http::server::run(
http_listen,
shared_http,
http_auth_mode,
http_tls.as_ref(),
shutdown_rx_http,
)
.await
{
tracing::error!(error = %e, "HTTP API server failed");
}
});
if let Some(ilp) = ilp_listener {
let shared_ilp = Arc::clone(&shared);
let conn_sem_ilp = Arc::clone(&conn_semaphore);
let shutdown_rx_ilp = shutdown_rx.clone();
tokio::spawn(async move {
if let Err(e) = ilp.run(shared_ilp, conn_sem_ilp, shutdown_rx_ilp).await {
tracing::error!(error = %e, "ILP listener failed");
}
});
}
if let Some(resp) = resp_listener {
let shared_resp = Arc::clone(&shared);
let conn_sem_resp = Arc::clone(&conn_semaphore);
let shutdown_rx_resp = shutdown_rx.clone();
tokio::spawn(async move {
if let Err(e) = resp.run(shared_resp, conn_sem_resp, shutdown_rx_resp).await {
tracing::error!(error = %e, "RESP listener failed");
}
});
}
let sync_config = nodedb::control::server::sync::listener::SyncListenerConfig::default();
match nodedb::control::server::sync::listener::start_sync_listener(
sync_config,
Some(Arc::clone(&shared)),
)
.await
{
Ok(sync_state) => {
info!(
addr = %sync_state.config.listen_addr,
max_sessions = sync_state.config.max_sessions,
"sync WebSocket listener started"
);
}
Err(e) => {
tracing::warn!(error = %e, "sync listener failed to start (non-fatal)");
}
}
let native_tls: Option<tokio_rustls::TlsAcceptor> = match &config.tls {
Some(tls) => {
let acceptor = build_tls_acceptor(tls)?;
Some(acceptor)
}
None => None,
};
let native_auth_mode = config.auth.mode.clone();
listener
.run(
shared,
native_auth_mode,
native_tls,
conn_semaphore,
shutdown_rx,
)
.await?;
info!("server shutting down");
std::process::exit(0);
}