use std::net::SocketAddr;
use std::time::Duration;
use clap::Parser;
use kameo::actor::Spawn;
use libp2p::identity::Keypair;
use sierradb::database::DatabaseBuilder;
use sierradb_cluster::{ClusterActor, ClusterArgs};
use sierradb_server::config::{AppConfig, Args};
use sierradb_server::server::Server;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
use tracing_subscriber::EnvFilter;
#[cfg(debug_assertions)]
const DEFAULT_ENV_FILTER: &str = "sierradb_cluster=DEBUG,sierradb_server=DEBUG,sierradb=DEBUG,INFO";
#[cfg(not(debug_assertions))]
const DEFAULT_ENV_FILTER: &str = "INFO";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
tracing_subscriber::fmt::SubscriberBuilder::default()
.without_time()
.with_target(true)
.with_env_filter(EnvFilter::new(
args.log.as_deref().unwrap_or(DEFAULT_ENV_FILTER),
))
.init();
let config = AppConfig::load(args)?;
debug!("configuration:\n{config}");
let errs = config.validate()?;
let has_errs = !errs.is_empty();
for err in errs {
error!("config error: {err}");
}
if has_errs {
std::process::exit(1);
}
let assigned_buckets = config.assigned_buckets()?;
let assigned_partitions = config.assigned_partitions(&assigned_buckets);
let mut builder = DatabaseBuilder::new();
builder
.segment_size_bytes(config.segment.size_bytes)
.compression(config.segment.compression)
.total_buckets(config.bucket.count)
.bucket_ids(assigned_buckets.into_iter().collect::<Vec<_>>())
.sync_interval(Duration::from_millis(config.sync.interval_ms))
.sync_idle_interval(config.effective_idle_interval())
.max_batch_size(config.sync.max_batch_size)
.min_sync_bytes(config.sync.min_bytes)
.cache_capacity_bytes(config.cache.capacity_bytes);
if let Some(count) = config.threads.read {
builder.reader_threads(count);
}
if let Some(count) = config.threads.write {
builder.writer_threads(count);
}
let node_count = config.node_count()?;
let database = builder.open(config.dir)?;
let caches = database.reader_pool().caches().clone();
let keypair = Keypair::generate_ed25519();
let listen_addrs = if config.network.cluster_enabled {
vec![config.network.cluster_address]
} else {
vec![]
};
let cluster_ref = ClusterActor::spawn(ClusterArgs {
keypair,
database: database.clone(),
listen_addrs,
node_count,
node_index: config.node.index as usize,
bucket_count: config.bucket.count,
partition_count: config.partition.count,
replication_factor: config.replication.factor,
assigned_partitions,
heartbeat_timeout: Duration::from_millis(config.heartbeat.timeout_ms),
heartbeat_interval: Duration::from_millis(config.heartbeat.interval_ms),
replication_buffer_size: config.replication.buffer_size,
replication_buffer_timeout: Duration::from_millis(config.replication.buffer_timeout_ms),
replication_catchup_timeout: Duration::from_millis(config.replication.catchup_timeout_ms),
mdns: config.network.mdns,
});
let client_addr: SocketAddr = config.network.client_address.parse()?;
let shutdown = CancellationToken::new();
let server_handle = tokio::spawn({
let database = database.clone();
let shutdown = shutdown.clone();
async move {
match Server::new(
cluster_ref,
caches,
config.partition.count,
config.cache.capacity_bytes,
config.append.strict_versioning,
shutdown,
)
.listen(client_addr)
.await
{
Ok(conns) => conns,
Err(err) => {
error!("server failed: {err}");
database.shutdown().await;
std::process::abort();
}
}
}
});
info!("ready to receive connections on {client_addr}");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("received SIGINT, shutting down gracefully");
shutdown.cancel();
}
_ = sigterm() => {
info!("received SIGTERM, shutting down gracefully");
shutdown.cancel();
}
}
match server_handle.await {
Ok(conns) => {
match tokio::time::timeout(Duration::from_secs(5), conns.join_all()).await {
Ok(results) => {
let conn_err = results.into_iter().find_map(|res| res.err());
if let Some(err) = conn_err {
error!("failed to gracefully stop connection(s): {err}");
}
}
Err(_) => {
error!("timed out waiting for connections to gracefully stop");
}
}
}
Err(_) => {
error!("server task exited unexpectedly");
}
}
database.shutdown().await;
info!("goodbye :)");
Ok(())
}
async fn sigterm() {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sigterm = signal(SignalKind::terminate()).expect("failed to setup SIGTERM handler");
sigterm.recv().await;
}
#[cfg(not(unix))]
{
std::future::pending::<()>().await
}
}