cedros-login-server 0.0.45

Authentication server for cedros-login with email/password, Google OAuth, and Solana wallet sign-in
Documentation
//! Cedros Login Server - Standalone binary
//!
//! Run with:
//! ```bash
//! cargo run
//! # or
//! cedros-login-server
//! ```

#[cfg(feature = "redis-rate-limit")]
use cedros_login::middleware::rate_limit::RedisRateLimitStore;
use cedros_login::services::{
    init_logging, init_metrics, DiscordNotificationService, LogEmailService,
    LogNotificationService, OutboxWorker, OutboxWorkerConfig, PostmarkEmailService,
    TelegramNotificationService,
};
use cedros_login::utils::TokenCipher;
use cedros_login::{
    create_micro_batch_worker, create_referral_payout_worker, create_withdrawal_worker,
    router_with_storage, Config, NoopCallback, Storage,
};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::signal;
use tokio_util::sync::CancellationToken;
use tracing::info;

fn is_production_like_environment(environment: &str) -> bool {
    let env_lc = environment.trim().to_ascii_lowercase();
    !matches!(env_lc.as_str(), "dev" | "development" | "local" | "test")
}

#[cfg(feature = "redis-rate-limit")]
async fn verify_rate_limit_backend(config: &Config) -> Result<(), std::io::Error> {
    if !config.rate_limit.enabled || config.rate_limit.store != "redis" {
        return Ok(());
    }

    let redis_url = config.rate_limit.redis_url.as_deref().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "REDIS_URL is required when RATE_LIMIT_STORE=redis",
        )
    })?;

    let store = RedisRateLimitStore::new(redis_url).map_err(|e| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("Failed to configure Redis rate limit backend: {}", e),
        )
    })?;

    store.ping().await.map_err(|e| {
        std::io::Error::new(
            std::io::ErrorKind::ConnectionRefused,
            format!("Failed to connect to Redis rate limit backend: {}", e),
        )
    })?;

    Ok(())
}

#[cfg(not(feature = "redis-rate-limit"))]
async fn verify_rate_limit_backend(_config: &Config) -> Result<(), std::io::Error> {
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load environment variables from .env file
    dotenvy::dotenv().ok();

    // Initialize tracing with reloadable filter (allows runtime log level changes)
    let logging_service = init_logging("cedros_login=info,tower_http=info,axum=info");

    // Initialize Prometheus metrics
    let _metrics_handle = init_metrics();

    // Load configuration from environment (infrastructure + defaults)
    let config = Config::from_env()?;
    if config.database.url.is_none()
        && is_production_like_environment(&config.notification.environment)
    {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "DATABASE_URL is required in production-like environments",
        )
        .into());
    }
    verify_rate_limit_backend(&config).await?;

    let addr = format!("{}:{}", config.server.host, config.server.port);

    info!("Starting Cedros Login Server");

    // Initialize storage (Postgres if DATABASE_URL is set, otherwise in-memory)
    let storage = Storage::from_config(&config.database).await?;
    if config.database.url.is_some() {
        info!("Using PostgreSQL storage");
    } else {
        info!("Using in-memory storage (set DATABASE_URL for PostgreSQL)");
    }

    info!("Listening on {}", addr);

    // Create shutdown channel for cleanup task
    let (cleanup_shutdown_tx, cleanup_shutdown_rx) = tokio::sync::watch::channel(false);

    // Start background cleanup task (runs every hour)
    const CLEANUP_INTERVAL_SECS: u64 = 3600;
    let cleanup_handle = storage.start_cleanup_task(CLEANUP_INTERVAL_SECS, cleanup_shutdown_rx);

    // Start outbox worker for async email/notification delivery
    // Configure email service based on environment
    let email_service: Arc<dyn cedros_login::services::EmailService> = match (
        std::env::var("POSTMARK_API_TOKEN"),
        std::env::var("POSTMARK_FROM_EMAIL"),
    ) {
        (Ok(api_token), Ok(from_email)) => {
            info!("Using Postmark email service");
            Arc::new(PostmarkEmailService::try_new(api_token, from_email)?)
        }
        _ => {
            info!("Using log-only email service (set POSTMARK_API_TOKEN and POSTMARK_FROM_EMAIL for real emails)");
            Arc::new(LogEmailService::new())
        }
    };

    // Configure notification service based on environment
    let notification_service: Arc<dyn cedros_login::services::NotificationService> = if config
        .notification
        .discord_enabled()
    {
        info!("Using Discord notification service");
        let webhook_url = config
            .notification
            .discord_webhook_url
            .clone()
            .ok_or("discord_webhook_url required when DISCORD_WEBHOOK_URL is set")?;
        Arc::new(DiscordNotificationService::new(
            webhook_url,
            config.notification.environment.clone(),
        ))
    } else if config.notification.telegram_enabled() {
        info!("Using Telegram notification service");
        let bot_token = config
            .notification
            .telegram_bot_token
            .clone()
            .ok_or("telegram_bot_token required when TELEGRAM_BOT_TOKEN is set")?;
        let chat_id = config
            .notification
            .telegram_chat_id
            .clone()
            .ok_or("telegram_chat_id required when TELEGRAM_CHAT_ID is set")?;
        Arc::new(TelegramNotificationService::new(
            bot_token,
            chat_id,
            config.notification.environment.clone(),
        ))
    } else {
        info!("Using log-only notification service (set DISCORD_WEBHOOK_URL or TELEGRAM_BOT_TOKEN for real notifications)");
        Arc::new(LogNotificationService::new())
    };

    // Create cancellation token for graceful shutdown of background workers
    let cancel_token = CancellationToken::new();

    // Start the outbox worker with cancellation support
    let base_url = config
        .server
        .frontend_url
        .clone()
        .unwrap_or_else(|| "http://localhost:3000".to_string());
    let token_cipher = TokenCipher::new(&config.jwt.secret);
    // Create settings service for runtime-configurable values (from database)
    let settings_service = Arc::new(cedros_login::services::SettingsService::new(
        storage.system_settings_repo.clone(),
    ));

    let outbox_worker_handle = OutboxWorker::new(
        storage.outbox_repo.clone(),
        email_service,
        notification_service.clone(),
        OutboxWorkerConfig::default(),
        base_url,
        token_cipher,
    )
    .with_settings(settings_service.clone())
    .start(cancel_token.clone());

    // Refresh settings cache so runtime values are available immediately
    if let Err(e) = settings_service.refresh().await {
        tracing::warn!("Failed to load initial settings from database: {}", e);
    } else {
        info!("Loaded runtime settings from database (configure via admin dashboard)");

        // Apply logging settings from database (allows runtime log level changes)
        if let Err(e) = logging_service.apply_from_settings(&settings_service).await {
            tracing::warn!("Failed to apply logging settings: {}", e);
        }
    }

    // Start withdrawal worker for Privacy Cash deposits (if enabled)
    let withdrawal_worker_handle = create_withdrawal_worker(
        &config,
        &storage,
        settings_service.clone(),
        notification_service,
        cancel_token.clone(),
    );
    if withdrawal_worker_handle.is_some() {
        info!("Withdrawal worker started for Privacy Cash deposits");
    }

    // Start referral payout worker for automated on-chain referral payouts (if enabled)
    let referral_payout_worker_handle = create_referral_payout_worker(
        &config,
        &storage,
        settings_service.clone(),
        cancel_token.clone(),
    );
    if referral_payout_worker_handle.is_some() {
        info!("Referral payout worker started for automated payouts");
    }

    // Start micro batch worker for SOL micro deposits (if enabled)
    let micro_batch_worker_handle =
        create_micro_batch_worker(&config, &storage, settings_service, cancel_token.clone());
    if micro_batch_worker_handle.is_some() {
        info!("Micro batch worker started for SOL micro deposits");
    }

    // Create router with storage backend
    let callback = Arc::new(NoopCallback);
    let app = router_with_storage(config, callback, storage);

    // Start server with graceful shutdown
    let listener = TcpListener::bind(&addr).await?;
    axum::serve(
        listener,
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .with_graceful_shutdown(shutdown_signal())
    .await?;

    // Signal workers to shut down gracefully
    info!("Signaling workers to shut down...");
    cancel_token.cancel();
    let _ = cleanup_shutdown_tx.send(true);

    // Wait for outbox worker to finish current batch (with timeout)
    let shutdown_timeout = tokio::time::Duration::from_secs(30);
    if tokio::time::timeout(shutdown_timeout, outbox_worker_handle)
        .await
        .is_err()
    {
        info!("Outbox worker shutdown timed out");
    }

    // Wait for withdrawal worker to finish (if running)
    if let Some(handle) = withdrawal_worker_handle {
        if tokio::time::timeout(shutdown_timeout, handle)
            .await
            .is_err()
        {
            info!("Withdrawal worker shutdown timed out");
        }
    }

    // Wait for referral payout worker to finish (if running)
    if let Some(handle) = referral_payout_worker_handle {
        if tokio::time::timeout(shutdown_timeout, handle)
            .await
            .is_err()
        {
            info!("Referral payout worker shutdown timed out");
        }
    }

    // Wait for micro batch worker to finish (if running)
    if let Some(handle) = micro_batch_worker_handle {
        if tokio::time::timeout(shutdown_timeout, handle)
            .await
            .is_err()
        {
            info!("Micro batch worker shutdown timed out");
        }
    }

    // Wait for cleanup task to finish gracefully (with timeout)
    if tokio::time::timeout(shutdown_timeout, cleanup_handle)
        .await
        .is_err()
    {
        info!("Cleanup task shutdown timed out");
    }

    info!("Server shutdown complete");
    Ok(())
}

/// Create a future that resolves when a shutdown signal is received
async fn shutdown_signal() {
    let ctrl_c = async {
        match signal::ctrl_c().await {
            Ok(()) => {}
            Err(e) => {
                tracing::error!(error = %e, "Failed to install Ctrl+C handler, shutdown signal may not work properly");
                // Wait indefinitely since we can't listen for Ctrl+C
                std::future::pending::<()>().await;
            }
        }
    };

    #[cfg(unix)]
    let terminate = async {
        match signal::unix::signal(signal::unix::SignalKind::terminate()) {
            Ok(mut stream) => {
                stream.recv().await;
            }
            Err(e) => {
                tracing::error!(error = %e, "Failed to install SIGTERM handler, SIGTERM signal may not work properly");
                // Wait indefinitely since we can't listen for SIGTERM
                std::future::pending::<()>().await;
            }
        }
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            info!("Received Ctrl+C, initiating graceful shutdown...");
        }
        _ = terminate => {
            info!("Received SIGTERM, initiating graceful shutdown...");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::is_production_like_environment;

    #[test]
    fn production_like_environment_detection() {
        assert!(is_production_like_environment("production"));
        assert!(is_production_like_environment("staging"));
        assert!(!is_production_like_environment("development"));
        assert!(!is_production_like_environment("test"));
    }
}