pub mod auth;
pub mod buffers;
pub mod config;
pub mod db;
pub mod entities;
pub mod errors;
pub mod handlers;
pub mod humans;
pub mod middleware;
pub mod models;
pub mod mutation;
pub mod org;
pub mod query;
pub mod resources;
pub mod routes;
pub mod state;
pub mod validate;
use crate::config::CONFIG;
use axum::Router;
use db::init_db;
use rdkafka::admin::AdminClient;
use rdkafka::producer::FutureProducer;
use rdkafka::ClientConfig;
use routes::create_routes;
use state::AppState;
use state::MessageQueue;
use std::env;
use std::sync::Arc;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use tracing::debug;
use url::Url;
pub async fn create_app() -> Result<Router, Box<dyn std::error::Error>> {
println!("Creating app");
let db_pool = init_db().await?;
println!("Database pool created");
let message_queue = match CONFIG.message_queue_type.to_lowercase().as_str() {
"redis" => {
let redis_url = if let Some(url) = &CONFIG.redis_url {
if !url.is_empty() {
if let Ok(parsed_url) = Url::parse(url) {
if let Some(host) = parsed_url.host_str() {
env::set_var("REDIS_HOST", host);
}
if let Some(port) = parsed_url.port() {
env::set_var("REDIS_PORT", port.to_string());
} else {
env::set_var("REDIS_PORT", "6379");
}
if let Some(password) = parsed_url.password() {
env::set_var("REDIS_PASSWORD", password);
}
}
url.clone()
} else {
build_redis_url_from_components()
}
} else {
build_redis_url_from_components()
};
debug!("Redis URL: {}", redis_url);
let redis_client = Arc::new(redis::Client::open(redis_url.as_str())?);
MessageQueue::Redis {
client: redis_client,
}
}
"kafka" => {
let mut client_config = ClientConfig::new();
let kafka_config = client_config
.set("bootstrap.servers", &CONFIG.kafka_bootstrap_servers)
.set("message.timeout.ms", &CONFIG.kafka_timeout_ms);
let producer = Arc::new(kafka_config.clone().create::<FutureProducer>()?);
let admin = Arc::new(kafka_config.create::<AdminClient<_>>()?);
MessageQueue::Kafka { producer, admin }
}
unsupported => {
return Err(format!("Unsupported message queue type: {}", unsupported).into())
}
};
let app_state = AppState {
db_pool: db_pool.clone(),
message_queue,
};
let routes = create_routes(app_state.clone());
let cors = CorsLayer::new()
.allow_origin(Any) .allow_methods(Any)
.allow_headers(Any);
let app = routes
.layer(TraceLayer::new_for_http())
.layer(cors)
.with_state(app_state);
Ok(app)
}
fn build_redis_url_from_components() -> String {
let host = &CONFIG.redis_host;
let port = &CONFIG.redis_port;
match &CONFIG.redis_password {
Some(password) if !password.is_empty() => {
format!("redis://:{}@{}:{}", password, host, port)
}
_ => format!("redis://{}:{}", host, port),
}
}