#[cfg(feature = "redis-rate-limit")]
use cedros_login::middleware::rate_limit::RedisRateLimitStore;
use cedros_login::services::{
init_logging, init_metrics, DiscordNotificationService, LogEmailService,
LogNotificationService, OutboxWorker, OutboxWorkerConfig, PostmarkEmailService,
TelegramNotificationService,
};
use cedros_login::utils::TokenCipher;
use cedros_login::{
create_micro_batch_worker, create_referral_payout_worker, create_withdrawal_worker,
router_with_storage, Config, NoopCallback, Storage,
};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::signal;
use tokio_util::sync::CancellationToken;
use tracing::info;
/// Reports whether `environment` should be treated as production-like.
///
/// The name is trimmed and compared ASCII-case-insensitively against the
/// known non-production names (`dev`, `development`, `local`, `test`).
/// Every other value, including an empty or unrecognised name, counts as
/// production-like.
fn is_production_like_environment(environment: &str) -> bool {
    const NON_PRODUCTION: [&str; 4] = ["dev", "development", "local", "test"];

    let name = environment.trim();
    !NON_PRODUCTION
        .iter()
        .any(|&known| name.eq_ignore_ascii_case(known))
}
/// Checks at startup that the Redis rate-limit backend can be reached.
///
/// Returns `Ok(())` immediately unless rate limiting is enabled and the
/// configured store is `"redis"`. Otherwise a [`RedisRateLimitStore`] is built
/// from the configured URL and pinged once.
///
/// # Errors
///
/// * `InvalidInput` when no Redis URL is configured, or when the store cannot
///   be constructed from it.
/// * `ConnectionRefused` when the ping fails.
#[cfg(feature = "redis-rate-limit")]
async fn verify_rate_limit_backend(config: &Config) -> Result<(), std::io::Error> {
    use std::io::{Error, ErrorKind};

    let settings = &config.rate_limit;
    if !(settings.enabled && settings.store == "redis") {
        return Ok(());
    }

    let redis_url = match settings.redis_url.as_deref() {
        Some(url) => url,
        None => {
            return Err(Error::new(
                ErrorKind::InvalidInput,
                "REDIS_URL is required when RATE_LIMIT_STORE=redis",
            ));
        }
    };

    let store = match RedisRateLimitStore::new(redis_url) {
        Ok(store) => store,
        Err(e) => {
            return Err(Error::new(
                ErrorKind::InvalidInput,
                format!("Failed to configure Redis rate limit backend: {}", e),
            ));
        }
    };

    if let Err(e) = store.ping().await {
        return Err(Error::new(
            ErrorKind::ConnectionRefused,
            format!("Failed to connect to Redis rate limit backend: {}", e),
        ));
    }

    Ok(())
}
/// No-op variant compiled when the `redis-rate-limit` feature is disabled.
///
/// Ignores `_config` and always returns `Ok(())`, so callers can invoke the
/// check unconditionally regardless of which features are enabled.
#[cfg(not(feature = "redis-rate-limit"))]
async fn verify_rate_limit_backend(_config: &Config) -> Result<(), std::io::Error> {
Ok(())
}
/// Entry point of the Cedros Login server.
///
/// Loads configuration, validates the environment, starts the background
/// workers, serves HTTP until a shutdown signal arrives, and then gives the
/// workers a bounded amount of time to stop.
///
/// # Errors
///
/// Returns an error when configuration loading fails, when `DATABASE_URL` is
/// missing in a production-like environment, when the rate-limit backend check
/// fails, when storage, the Postmark client or a notification service cannot
/// be set up, or when binding/serving the listener fails. A bind/serve error
/// is only returned after the workers have been asked to shut down.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any error from loading a `.env` file is deliberately discarded.
    dotenvy::dotenv().ok();

    let logging_service = init_logging("cedros_login=info,tower_http=info,axum=info");
    let _metrics_handle = init_metrics();

    let config = Config::from_env()?;

    // Refuse to start without a database URL unless the environment is one of
    // the recognised non-production names.
    if config.database.url.is_none()
        && is_production_like_environment(&config.notification.environment)
    {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "DATABASE_URL is required in production-like environments",
        )
        .into());
    }

    verify_rate_limit_backend(&config).await?;

    // Computed up front because `config` is moved into the router later on.
    let addr = format!("{}:{}", config.server.host, config.server.port);

    info!("Starting Cedros Login Server");

    let storage = Storage::from_config(&config.database).await?;
    if config.database.url.is_some() {
        info!("Using PostgreSQL storage");
    } else {
        info!("Using in-memory storage (set DATABASE_URL for PostgreSQL)");
    }

    // The cleanup task has its own shutdown channel, separate from the
    // cancellation token shared by the workers below.
    let (cleanup_shutdown_tx, cleanup_shutdown_rx) = tokio::sync::watch::channel(false);
    const CLEANUP_INTERVAL_SECS: u64 = 3600;
    let cleanup_handle = storage.start_cleanup_task(CLEANUP_INTERVAL_SECS, cleanup_shutdown_rx);

    let email_service: Arc<dyn cedros_login::services::EmailService> = match (
        std::env::var("POSTMARK_API_TOKEN"),
        std::env::var("POSTMARK_FROM_EMAIL"),
    ) {
        (Ok(api_token), Ok(from_email)) => {
            info!("Using Postmark email service");
            Arc::new(PostmarkEmailService::try_new(api_token, from_email)?)
        }
        (api_token, from_email) => {
            // Exactly one usable variable means a half-finished configuration;
            // surface it instead of silently dropping real email delivery.
            if api_token.is_ok() != from_email.is_ok() {
                tracing::warn!(
                    "Postmark is only partially configured: both POSTMARK_API_TOKEN and POSTMARK_FROM_EMAIL must be set; falling back to log-only email"
                );
            }
            info!("Using log-only email service (set POSTMARK_API_TOKEN and POSTMARK_FROM_EMAIL for real emails)");
            Arc::new(LogEmailService::new())
        }
    };

    // Discord takes precedence over Telegram when both report as enabled.
    let notification_service: Arc<dyn cedros_login::services::NotificationService> =
        if config.notification.discord_enabled() {
            info!("Using Discord notification service");
            let webhook_url = config
                .notification
                .discord_webhook_url
                .clone()
                .ok_or("discord_webhook_url required when DISCORD_WEBHOOK_URL is set")?;
            Arc::new(DiscordNotificationService::new(
                webhook_url,
                config.notification.environment.clone(),
            ))
        } else if config.notification.telegram_enabled() {
            info!("Using Telegram notification service");
            let bot_token = config
                .notification
                .telegram_bot_token
                .clone()
                .ok_or("telegram_bot_token required when TELEGRAM_BOT_TOKEN is set")?;
            let chat_id = config
                .notification
                .telegram_chat_id
                .clone()
                .ok_or("telegram_chat_id required when TELEGRAM_CHAT_ID is set")?;
            Arc::new(TelegramNotificationService::new(
                bot_token,
                chat_id,
                config.notification.environment.clone(),
            ))
        } else {
            info!("Using log-only notification service (set DISCORD_WEBHOOK_URL or TELEGRAM_BOT_TOKEN for real notifications)");
            Arc::new(LogNotificationService::new())
        };

    // One token is shared by every worker so a single `cancel()` reaches all of them.
    let cancel_token = CancellationToken::new();

    let base_url = config
        .server
        .frontend_url
        .clone()
        .unwrap_or_else(|| "http://localhost:3000".to_string());
    let token_cipher = TokenCipher::new(&config.jwt.secret);
    let settings_service = Arc::new(cedros_login::services::SettingsService::new(
        storage.system_settings_repo.clone(),
    ));

    let outbox_worker_handle = OutboxWorker::new(
        storage.outbox_repo.clone(),
        email_service,
        notification_service.clone(),
        OutboxWorkerConfig::default(),
        base_url,
        token_cipher,
    )
    .with_settings(settings_service.clone())
    .start(cancel_token.clone());

    // A failed settings load is not fatal: startup continues after a warning.
    if let Err(e) = settings_service.refresh().await {
        tracing::warn!("Failed to load initial settings from database: {}", e);
    } else {
        info!("Loaded runtime settings from database (configure via admin dashboard)");
        if let Err(e) = logging_service.apply_from_settings(&settings_service).await {
            tracing::warn!("Failed to apply logging settings: {}", e);
        }
    }

    // Each of the following workers is optional; the factory returns `None`
    // when the worker is not started.
    let withdrawal_worker_handle = create_withdrawal_worker(
        &config,
        &storage,
        settings_service.clone(),
        notification_service,
        cancel_token.clone(),
    );
    if withdrawal_worker_handle.is_some() {
        info!("Withdrawal worker started for Privacy Cash deposits");
    }

    let referral_payout_worker_handle = create_referral_payout_worker(
        &config,
        &storage,
        settings_service.clone(),
        cancel_token.clone(),
    );
    if referral_payout_worker_handle.is_some() {
        info!("Referral payout worker started for automated payouts");
    }

    let micro_batch_worker_handle =
        create_micro_batch_worker(&config, &storage, settings_service, cancel_token.clone());
    if micro_batch_worker_handle.is_some() {
        info!("Micro batch worker started for SOL micro deposits");
    }

    let callback = Arc::new(NoopCallback);
    let app = router_with_storage(config, callback, storage);

    // Bind and serve as one fallible step. The result is held back rather than
    // propagated with `?` so that a bind or serve failure still goes through
    // the worker shutdown sequence below instead of dropping running workers.
    let serve_result = async {
        let listener = TcpListener::bind(&addr).await?;
        // Logged only once the socket is actually bound.
        info!("Listening on {}", addr);
        axum::serve(
            listener,
            app.into_make_service_with_connect_info::<SocketAddr>(),
        )
        .with_graceful_shutdown(shutdown_signal())
        .await?;
        Ok::<(), Box<dyn std::error::Error>>(())
    }
    .await;

    info!("Signaling workers to shut down...");
    cancel_token.cancel();
    // A send error only means the cleanup task's receiver is already gone.
    let _ = cleanup_shutdown_tx.send(true);

    // All workers were signalled at the same moment, so they share a single
    // deadline: the total wait is bounded by 30 seconds rather than 30 seconds
    // per handle.
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(30);

    await_worker_shutdown("Outbox worker", deadline, outbox_worker_handle).await;
    if let Some(handle) = withdrawal_worker_handle {
        await_worker_shutdown("Withdrawal worker", deadline, handle).await;
    }
    if let Some(handle) = referral_payout_worker_handle {
        await_worker_shutdown("Referral payout worker", deadline, handle).await;
    }
    if let Some(handle) = micro_batch_worker_handle {
        await_worker_shutdown("Micro batch worker", deadline, handle).await;
    }
    await_worker_shutdown("Cleanup task", deadline, cleanup_handle).await;

    serve_result?;
    info!("Server shutdown complete");
    Ok(())
}

/// Waits for `worker` to finish until `deadline`, logging a warning that names
/// `label` if the deadline passes first. The worker's output is discarded.
async fn await_worker_shutdown<F>(label: &str, deadline: tokio::time::Instant, worker: F)
where
    F: std::future::Future,
{
    if tokio::time::timeout_at(deadline, worker).await.is_err() {
        tracing::warn!("{} shutdown timed out", label);
    }
}
async fn shutdown_signal() {
let ctrl_c = async {
match signal::ctrl_c().await {
Ok(()) => {}
Err(e) => {
tracing::error!(error = %e, "Failed to install Ctrl+C handler, shutdown signal may not work properly");
std::future::pending::<()>().await;
}
}
};
#[cfg(unix)]
let terminate = async {
match signal::unix::signal(signal::unix::SignalKind::terminate()) {
Ok(mut stream) => {
stream.recv().await;
}
Err(e) => {
tracing::error!(error = %e, "Failed to install SIGTERM handler, SIGTERM signal may not work properly");
std::future::pending::<()>().await;
}
}
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
info!("Received Ctrl+C, initiating graceful shutdown...");
}
_ = terminate => {
info!("Received SIGTERM, initiating graceful shutdown...");
}
}
}
#[cfg(test)]
mod tests {
    use super::is_production_like_environment;

    /// Deployed-style names count as production-like; the long-form
    /// non-production names do not.
    #[test]
    fn production_like_environment_detection() {
        assert!(is_production_like_environment("production"));
        assert!(is_production_like_environment("staging"));
        assert!(!is_production_like_environment("development"));
        assert!(!is_production_like_environment("test"));
    }

    /// The remaining recognised non-production names are covered too.
    #[test]
    fn short_non_production_names_are_recognised() {
        assert!(!is_production_like_environment("dev"));
        assert!(!is_production_like_environment("local"));
    }

    /// Matching trims surrounding whitespace and ignores ASCII case.
    #[test]
    fn detection_ignores_case_and_surrounding_whitespace() {
        assert!(!is_production_like_environment(" Development "));
        assert!(!is_production_like_environment("TEST"));
        assert!(!is_production_like_environment("\tLocal\n"));
        assert!(is_production_like_environment(" Production "));
    }
}