use clap::Parser;
use modelexpress_common::grpc::{
api::api_service_server::ApiServiceServer, health::health_service_server::HealthServiceServer,
model::model_service_server::ModelServiceServer, p2p::p2p_service_server::P2pServiceServer,
};
use modelexpress_server::{
cache::CacheEvictionService,
config::{ServerArgs, ServerConfig},
p2p::{service::P2pServiceImpl, state::P2pStateManager},
registry::state::RegistryManager,
services::{
ApiServiceImpl, HealthServiceImpl, ModelDownloadTracker, ModelServiceImpl,
init_model_tracker,
},
};
use std::sync::Arc;
use tonic::transport::Server;
use tracing::{error, info};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
/// Message size limit in bytes (100 MiB), applied to both the decoding and the
/// encoding side of the P2P gRPC service.
const MAX_MESSAGE_SIZE: usize = 100 << 20;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = ServerArgs::parse();
if args.validate_config {
match ServerConfig::load_and_validate_strict(args) {
Ok(config) => {
println!("Configuration is valid ✓");
config.print_config();
return Ok(());
}
Err(e) => {
eprintln!("Configuration validation failed: {e}");
std::process::exit(1);
}
}
}
let config = ServerConfig::load(args)?;
let log_level = config.log_level();
let subscriber = FmtSubscriber::builder()
.with_env_filter(EnvFilter::from_default_env())
.with_max_level(log_level)
.finish();
tracing::subscriber::set_global_default(subscriber)?;
info!("Starting ModelExpress server...");
config.print_config();
let addr = config.socket_addr().map_err(|e| {
error!("Invalid server address: {e}");
e
})?;
let registry = Arc::new(RegistryManager::new());
match tokio::time::timeout(std::time::Duration::from_secs(10), registry.connect()).await {
Ok(Ok(backend)) => info!("Model registry connected (backend: {backend})"),
Ok(Err(e)) => {
error!("Failed to connect to model registry backend: {}", e);
return Err(e.to_string().into());
}
Err(_) => {
error!("Timed out connecting to model registry backend");
return Err("model registry backend connection timed out".into());
}
}
let tracker = Arc::new(ModelDownloadTracker::new(registry.clone()));
init_model_tracker(tracker)
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { e.into() })?;
let cache_service = CacheEvictionService::new(
registry.clone(),
config.cache.eviction.clone(),
config.cache.directory.clone(),
);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let cache_handle = if config.cache.eviction.enabled {
info!("Starting cache eviction service...");
Some(tokio::spawn(async move {
if let Err(e) = cache_service.start(shutdown_rx).await {
error!("Cache eviction service error: {e}");
}
}))
} else {
info!("Cache eviction service is disabled");
None
};
let health_service = HealthServiceImpl;
let api_service = ApiServiceImpl;
let model_service = ModelServiceImpl;
let (health_reporter, health_service_v1) = tonic_health::server::health_reporter();
health_reporter
.set_serving::<HealthServiceServer<HealthServiceImpl>>()
.await;
health_reporter
.set_serving::<ApiServiceServer<ApiServiceImpl>>()
.await;
health_reporter
.set_serving::<ModelServiceServer<ModelServiceImpl>>()
.await;
health_reporter
.set_serving::<P2pServiceServer<P2pServiceImpl>>()
.await;
let p2p_state = Arc::new(P2pStateManager::new());
match tokio::time::timeout(std::time::Duration::from_secs(10), p2p_state.connect()).await {
Ok(Ok(backend)) => info!("P2P state manager connected (backend: {backend})"),
Ok(Err(e)) => {
error!("Failed to connect to P2P metadata backend: {}", e);
return Err(e);
}
Err(_) => {
error!("Timed out connecting to P2P metadata backend");
return Err("P2P metadata backend connection timed out".into());
}
}
let p2p_service = P2pServiceImpl::new(p2p_state.clone());
let (reaper_shutdown_tx, reaper_shutdown_rx) = tokio::sync::oneshot::channel();
let reaper_state = p2p_state.clone();
let reaper_handle = tokio::spawn(async move {
modelexpress_server::p2p::reaper::run_reaper(reaper_state, reaper_shutdown_rx).await;
});
let shutdown_signal = async move {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to install CTRL+C signal handler: {e}");
return;
}
info!("Received CTRL+C, shutting down gracefully...");
if shutdown_tx.send(()).is_err() {
error!("Failed to send shutdown signal to cache eviction service");
}
if reaper_shutdown_tx.send(()).is_err() {
error!("Failed to send shutdown signal to reaper");
}
};
info!("Starting gRPC server on: {addr}");
let server_result = Server::builder()
.add_service(health_service_v1)
.add_service(HealthServiceServer::new(health_service))
.add_service(ApiServiceServer::new(api_service))
.add_service(ModelServiceServer::new(model_service))
.add_service(
P2pServiceServer::new(p2p_service)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.max_encoding_message_size(MAX_MESSAGE_SIZE),
)
.serve_with_shutdown(addr, shutdown_signal)
.await;
if let Some(handle) = cache_handle
&& let Err(e) = handle.await
{
error!("Cache eviction service join error: {e}");
}
if let Err(e) = reaper_handle.await {
error!("Reaper join error: {e}");
}
server_result?;
info!("Server shutdown complete");
Ok(())
}