orign 0.2.3

A globally distributed container orchestrator
Documentation
// src/commands/serve.rs

use orign::config::CONFIG;
use orign::create_app;
use orign::db;
use orign::humans::slack::server::run_slack_socket_server;
use orign::state::{AppState, MessageQueue};
use rdkafka::{admin::AdminClient, producer::FutureProducer, ClientConfig};
use rustls;
use sea_orm::DatabaseConnection;
use std::env;
use std::error::Error;
use std::sync::Arc;
use tokio::signal;
use tokio::sync::broadcast;
use tracing::debug;
use url::Url;

pub async fn execute(host: String, port: u16) -> Result<(), Box<dyn Error>> {
    // Create a shutdown signal channel
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    // Build the app
    let app_state = Arc::new(build_app_state().await);

    // Start the Slack socket mode server if enabled
    if let Ok(enabled) = std::env::var("ENABLE_SLACK_SOCKET") {
        if enabled.to_lowercase() == "true" {
            println!("Starting Slack socket mode server");

            rustls::crypto::aws_lc_rs::default_provider()
                .install_default()
                .expect("Failed to initialize rustls crypto provider");

            // Clone the database connection for the Slack socket
            let db_pool = app_state.db_pool.clone();
            let shutdown_rx = shutdown_tx.subscribe();
            run_slack_socket_server(db_pool, shutdown_rx)
                .await
                .map_err(|e| Box::new(e) as Box<dyn Error>)?;
            println!("Slack socket mode server started");
        }
    }

    let app = create_app().await?;
    // Run it
    let addr = format!("{}:{}", host, port);
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    println!("Server running at http://{}", addr);

    // Handle shutdown gracefully
    let shutdown_signal = async {
        signal::ctrl_c()
            .await
            .expect("Failed to install CTRL+C signal handler");
        println!("Shutdown signal received, shutting down gracefully...");
        let _ = shutdown_tx.send(());
    };

    tokio::select! {
        result = axum::serve(listener, app) => {
            if let Err(e) = result {
                eprintln!("Server error: {}", e);
            }
        }
        _ = shutdown_signal => {
            // Will trigger the shutdown
        }
    }

    // Give a moment for shutdown tasks to complete
    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
    println!("Server shutdown complete");
    Ok(())
}

async fn build_app_state() -> AppState {
    // Initialize DB pool
    let db_pool: DatabaseConnection = db::init_db()
        .await
        .expect("Failed to initialize database pool");

    // Pick a queue type, e.g. "redis" or "kafka"
    let message_queue = match CONFIG.message_queue_type.to_lowercase().as_str() {
        "redis" => {
            let redis_url = if let Some(url) = &CONFIG.redis_url {
                if !url.is_empty() {
                    // Redis URL exists, so use it directly but also parse it to set env vars
                    if let Ok(parsed_url) = Url::parse(url) {
                        // Extract and set host
                        if let Some(host) = parsed_url.host_str() {
                            env::set_var("REDIS_HOST", host);
                        }

                        // Extract and set port
                        if let Some(port) = parsed_url.port() {
                            env::set_var("REDIS_PORT", port.to_string());
                        } else {
                            // Default redis port if not specified in URL
                            env::set_var("REDIS_PORT", "6379");
                        }

                        // Extract and set password if present
                        if let Some(password) = parsed_url.password() {
                            env::set_var("REDIS_PASSWORD", password);
                        }
                    }

                    url.clone()
                } else {
                    build_redis_url_from_components()
                }
            } else {
                build_redis_url_from_components()
            };

            debug!("Redis URL: {}", redis_url);

            // Create the Redis client using the constructed URL
            let redis_client = Arc::new(
                redis::Client::open(redis_url.as_str()).expect("Failed to create Redis client"),
            );

            MessageQueue::Redis {
                client: redis_client,
            }
        }
        "kafka" => {
            let mut client_config = ClientConfig::new();
            client_config
                .set("bootstrap.servers", &CONFIG.kafka_bootstrap_servers)
                .set("message.timeout.ms", &CONFIG.kafka_timeout_ms);

            let producer = Arc::new(
                client_config
                    .clone()
                    .create::<FutureProducer>()
                    .expect("Failed creating Kafka producer"),
            );
            let admin = Arc::new(
                client_config
                    .create::<AdminClient<_>>()
                    .expect("Failed creating Kafka admin client"),
            );

            MessageQueue::Kafka { producer, admin }
        }
        other => {
            panic!("Unsupported message queue type: {}", other);
        }
    };

    AppState {
        db_pool,
        message_queue,
    }
}

fn build_redis_url_from_components() -> String {
    // Redis URL not present or empty, build from components
    let host = &CONFIG.redis_host;
    let port = &CONFIG.redis_port;

    match &CONFIG.redis_password {
        Some(password) if !password.is_empty() => {
            format!("redis://:{}@{}:{}", password, host, port)
        }
        _ => format!("redis://{}:{}", host, port),
    }
}