use orign::config::CONFIG;
use orign::create_app;
use orign::db;
use orign::humans::slack::server::run_slack_socket_server;
use orign::state::{AppState, MessageQueue};
use rdkafka::{admin::AdminClient, producer::FutureProducer, ClientConfig};
use rustls;
use sea_orm::DatabaseConnection;
use std::env;
use std::error::Error;
use std::sync::Arc;
use tokio::signal;
use tokio::sync::broadcast;
use tracing::debug;
use url::Url;
pub async fn execute(host: String, port: u16) -> Result<(), Box<dyn Error>> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let app_state = Arc::new(build_app_state().await);
if let Ok(enabled) = std::env::var("ENABLE_SLACK_SOCKET") {
if enabled.to_lowercase() == "true" {
println!("Starting Slack socket mode server");
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("Failed to initialize rustls crypto provider");
let db_pool = app_state.db_pool.clone();
let shutdown_rx = shutdown_tx.subscribe();
run_slack_socket_server(db_pool, shutdown_rx)
.await
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
println!("Slack socket mode server started");
}
}
let app = create_app().await?;
let addr = format!("{}:{}", host, port);
let listener = tokio::net::TcpListener::bind(&addr).await?;
println!("Server running at http://{}", addr);
let shutdown_signal = async {
signal::ctrl_c()
.await
.expect("Failed to install CTRL+C signal handler");
println!("Shutdown signal received, shutting down gracefully...");
let _ = shutdown_tx.send(());
};
tokio::select! {
result = axum::serve(listener, app) => {
if let Err(e) = result {
eprintln!("Server error: {}", e);
}
}
_ = shutdown_signal => {
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
println!("Server shutdown complete");
Ok(())
}
/// Builds the shared [`AppState`]: the database connection plus the message
/// queue backend selected by `CONFIG.message_queue_type` (`"redis"` or
/// `"kafka"`, case-insensitive).
///
/// For Redis, a non-empty `CONFIG.redis_url` is used verbatim; otherwise the
/// URL is assembled by `build_redis_url_from_components`. When the configured
/// URL parses, its host, port (defaulting to `6379`) and password are also
/// exported as the `REDIS_HOST`, `REDIS_PORT` and `REDIS_PASSWORD` environment
/// variables (presumably for code that reads Redis settings from the
/// environment — confirm against consumers).
///
/// # Panics
/// Panics if the database cannot be initialised, if the Redis or Kafka
/// clients cannot be created, or if the configured queue type is unsupported.
async fn build_app_state() -> AppState {
    let db_pool: DatabaseConnection = db::init_db()
        .await
        .expect("Failed to initialize database pool");

    let message_queue = match CONFIG.message_queue_type.to_lowercase().as_str() {
        "redis" => {
            let redis_url = match &CONFIG.redis_url {
                Some(url) if !url.is_empty() => {
                    if let Ok(parsed_url) = Url::parse(url) {
                        if let Some(host) = parsed_url.host_str() {
                            env::set_var("REDIS_HOST", host);
                        }
                        match parsed_url.port() {
                            Some(port) => env::set_var("REDIS_PORT", port.to_string()),
                            None => env::set_var("REDIS_PORT", "6379"),
                        }
                        if let Some(password) = parsed_url.password() {
                            env::set_var("REDIS_PASSWORD", password);
                        }
                    }
                    url.clone()
                }
                _ => build_redis_url_from_components(),
            };

            // Never write the raw URL to the log: it may embed the password.
            debug!("Redis URL: {}", redact_url_password(&redis_url));

            let redis_client = Arc::new(
                redis::Client::open(redis_url.as_str()).expect("Failed to create Redis client"),
            );
            MessageQueue::Redis {
                client: redis_client,
            }
        }
        "kafka" => {
            let mut client_config = ClientConfig::new();
            client_config
                .set("bootstrap.servers", &CONFIG.kafka_bootstrap_servers)
                .set("message.timeout.ms", &CONFIG.kafka_timeout_ms);

            // The producer is created from a clone so the same configuration
            // can then be consumed to create the admin client.
            let producer = Arc::new(
                client_config
                    .clone()
                    .create::<FutureProducer>()
                    .expect("Failed creating Kafka producer"),
            );
            let admin = Arc::new(
                client_config
                    .create::<AdminClient<_>>()
                    .expect("Failed creating Kafka admin client"),
            );
            MessageQueue::Kafka { producer, admin }
        }
        other => {
            panic!("Unsupported message queue type: {}", other);
        }
    };

    AppState {
        db_pool,
        message_queue,
    }
}

/// Returns a copy of `raw` that is safe to log: any password component is
/// replaced by `***`. If the URL cannot be parsed or rewritten, a fixed
/// placeholder is returned instead so that credentials are never emitted.
fn redact_url_password(raw: &str) -> String {
    match Url::parse(raw) {
        Ok(mut parsed) => {
            if parsed.password().is_some() && parsed.set_password(Some("***")).is_err() {
                return String::from("<redacted redis url>");
            }
            parsed.to_string()
        }
        Err(_) => String::from("<unparseable redis url>"),
    }
}
fn build_redis_url_from_components() -> String {
let host = &CONFIG.redis_host;
let port = &CONFIG.redis_port;
match &CONFIG.redis_password {
Some(password) if !password.is_empty() => {
format!("redis://:{}@{}:{}", password, host, port)
}
_ => format!("redis://{}:{}", host, port),
}
}