mod admin;
mod cmd;
mod config;
#[cfg(unix)]
mod daemon;
mod logic;
mod proxy;
#[allow(dead_code)]
mod proxy_lock;
mod relay_lock;
mod render;
mod state;
// Use mimalloc as the process-wide allocator (replaces the system default).
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use mcpr_core::proxy::forwarding::UpstreamClient;
use axum::Router;
use axum::extract::DefaultBodyLimit;
use tower_http::cors::{Any, CorsLayer};
use config::{CliAction, GatewayConfig, Mode};
use mcpr_core::protocol::schema_manager::{MemorySchemaStore, SchemaManager};
use mcpr_core::protocol::session::MemorySessionStore;
use mcpr_core::proxy::ProxyState;
use mcpr_core::proxy::RewriteConfig;
use mcpr_core::proxy::WidgetSource;
use mcpr_core::proxy::health::{self as proxy_health, SharedProxyHealth};
use proxy::proxy_routes;
use state::AppState;
// Flipped to `true` once a shutdown signal is received; set alongside the
// shutdown broadcast so other code can observe the draining phase.
static IS_DRAINING: AtomicBool = AtomicBool::new(false);
// Defaults used when the corresponding gateway-config fields are absent.
pub const DEFAULT_MAX_REQUEST_BODY_SIZE: usize = 5 * 1024 * 1024; // 5 MiB
pub const DEFAULT_MAX_RESPONSE_BODY_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
pub const DEFAULT_MAX_CONCURRENT_UPSTREAM: usize = 100; // semaphore permits
pub const DEFAULT_MAX_IDLE_UPSTREAM_PER_HOST: usize = 64; // reqwest pool size
pub const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;
pub const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 30;
/// Assemble the gateway's axum `Router`: proxy routes, a request-body size
/// cap taken from the proxy state, and a fully permissive CORS layer.
pub fn build_app(app_state: AppState) -> Router {
    let body_limit = app_state.proxy.max_request_body;
    // Wide-open CORS: the gateway fronts arbitrary browser-based clients.
    let permissive_cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods(Any)
        .allow_headers(Any)
        .expose_headers(Any);
    proxy_routes(Router::new())
        .with_state(app_state)
        .layer(DefaultBodyLimit::max(body_limit))
        .layer(permissive_cors)
}
/// Bridges tunnel lifecycle callbacks into the shared proxy health state.
struct TunnelStatusAdapter(SharedProxyHealth);

impl TunnelStatusAdapter {
    /// Record the latest tunnel connection status under the health lock.
    fn set_status(&self, status: proxy_health::ConnectionStatus) {
        proxy_health::lock_health(&self.0).tunnel_status = status;
    }
}

impl mcpr_tunnel::TunnelStatusCallback for TunnelStatusAdapter {
    fn on_connected(&self, _url: &str) {
        self.set_status(proxy_health::ConnectionStatus::Connected);
    }
    fn on_disconnected(&self) {
        self.set_status(proxy_health::ConnectionStatus::Disconnected);
    }
    fn on_evicted(&self) {
        self.set_status(proxy_health::ConnectionStatus::Evicted);
    }
}
/// Process entry point.
///
/// Parses the CLI action, then performs the daemonization / lockfile work
/// that must happen *before* the tokio runtime exists (daemonizing forks the
/// process, which must precede runtime-thread creation). Finally builds a
/// multi-threaded runtime and hands off to `async_main`.
fn main() {
    let mut action = config::load();
    // Readiness-pipe fd handed to us by a daemonizing parent, if any; the
    // long-running mode writes to it once it is actually serving.
    let ready_fd: Option<i32> = match &action {
        CliAction::Start { foreground: true } => {
            // A foreground start supersedes any background daemon.
            #[cfg(unix)]
            daemon::stop_daemon_if_running();
            None
        }
        CliAction::Start { foreground: false } => {
            #[cfg(unix)]
            {
                // Already running => treat start as an idempotent success.
                if daemon::ensure_not_running() {
                    std::process::exit(0);
                }
                let fd = daemon::daemonize(Duration::from_secs(10)).unwrap_or_else(|e| {
                    eprintln!("error: {e}");
                    std::process::exit(1);
                });
                Some(fd)
            }
            #[cfg(not(unix))]
            {
                eprintln!("error: daemon mode is not supported on this platform");
                eprintln!(" Use `mcpr start --foreground` or a service manager.");
                std::process::exit(1);
            }
        }
        CliAction::Restart { .. } => {
            #[cfg(unix)]
            {
                // Record what is currently running so async_main can bring
                // the same set back up after the new daemon starts.
                let running_names: Vec<String> = proxy_lock::list_proxies()
                    .into_iter()
                    .filter(|(_, s)| matches!(s, proxy_lock::LockStatus::Held(_)))
                    .map(|(name, _)| name)
                    .collect();
                let relay_was_running =
                    matches!(relay_lock::check_lock(), relay_lock::LockStatus::Held(_));
                let stopped = proxy_lock::stop_all_proxies();
                if !stopped.is_empty() {
                    eprintln!(
                        "Stopped {} managed proxy(ies): {}",
                        stopped.len(),
                        stopped.join(", ")
                    );
                }
                if relay_lock::stop_relay() {
                    eprintln!("Stopped relay.");
                }
                // Stash the restart set back into the action for async_main.
                if let CliAction::Restart {
                    restart_proxies,
                    restart_relay,
                    ..
                } = &mut action
                {
                    *restart_proxies = running_names;
                    *restart_relay = relay_was_running;
                }
                daemon::stop_daemon_if_running();
                let fd = daemon::daemonize(Duration::from_secs(10)).unwrap_or_else(|e| {
                    eprintln!("error: {e}");
                    std::process::exit(1);
                });
                Some(fd)
            }
            #[cfg(not(unix))]
            {
                eprintln!("error: daemon management not supported on this platform");
                std::process::exit(1);
            }
        }
        CliAction::ProxyRun {
            mode: Mode::Gateway(cfg),
            replace,
            config_content,
            config_path: _,
        } => {
            #[cfg(unix)]
            {
                // Background proxies are supervised by the daemon.
                if !matches!(daemon::check_status(), daemon::DaemonStatus::Running(_)) {
                    eprintln!("error: daemon not running — run `mcpr start` first");
                    std::process::exit(1);
                }
                let proxy_name = &cfg.name;
                match proxy_lock::check_lock(proxy_name) {
                    proxy_lock::LockStatus::Free => {}
                    proxy_lock::LockStatus::Stale(_) => {
                        // Lock left behind by a dead process; safe to clear.
                        proxy_lock::remove_lock(proxy_name);
                    }
                    proxy_lock::LockStatus::Held(info) => {
                        if *replace {
                            eprintln!("Stopping old \"{}\" (pid {})...", proxy_name, info.pid);
                            proxy_lock::stop_proxy(proxy_name);
                        } else {
                            eprintln!(
                                "error: proxy \"{}\" is already running (pid {}).",
                                proxy_name, info.pid
                            );
                            eprintln!(" Use --replace to stop the old one and start this one.");
                            std::process::exit(1);
                        }
                    }
                }
                // Persist the config so the daemon can respawn this proxy.
                if let Err(e) = proxy_lock::snapshot_config(proxy_name, config_content) {
                    eprintln!("error: failed to snapshot config: {e}");
                    std::process::exit(1);
                }
                let fd = daemon::daemonize_proxy(proxy_name, Duration::from_secs(10))
                    .unwrap_or_else(|e| {
                        eprintln!("error: {e}");
                        std::process::exit(1);
                    });
                Some(fd)
            }
            #[cfg(not(unix))]
            {
                eprintln!("error: background proxy mode not supported on this platform");
                std::process::exit(1);
            }
        }
        CliAction::RelayRun {
            foreground: false,
            config_content,
            ..
        } => {
            #[cfg(unix)]
            {
                if !matches!(daemon::check_status(), daemon::DaemonStatus::Running(_)) {
                    eprintln!("error: daemon not running — run `mcpr start` first");
                    std::process::exit(1);
                }
                match relay_lock::check_lock() {
                    relay_lock::LockStatus::Free => {}
                    relay_lock::LockStatus::Stale(_) => relay_lock::remove_lock(),
                    relay_lock::LockStatus::Held(info) => {
                        // Unlike proxies, there is no --replace for the relay.
                        eprintln!("error: relay already running (pid {})", info.pid);
                        std::process::exit(1);
                    }
                }
                if let Err(e) = relay_lock::snapshot_config(config_content) {
                    eprintln!("error: failed to snapshot config: {e}");
                    std::process::exit(1);
                }
                let fd = daemon::daemonize_relay(Duration::from_secs(10)).unwrap_or_else(|e| {
                    eprintln!("error: {e}");
                    std::process::exit(1);
                });
                Some(fd)
            }
            #[cfg(not(unix))]
            {
                eprintln!("error: background relay mode not supported on this platform");
                std::process::exit(1);
            }
        }
        CliAction::RelayRun {
            foreground: true, ..
        } => None,
        // Remaining actions need no pre-runtime work.
        _ => None,
    };
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create tokio runtime")
        .block_on(async_main(action, ready_fd));
}
/// Async half of startup: dispatch the parsed CLI action.
///
/// Runs inside the runtime built by `main`. `ready_fd` is forwarded to the
/// long-running modes (supervisor, gateway, relay) so they can signal daemon
/// readiness once actually serving.
async fn async_main(action: CliAction, ready_fd: Option<i32>) {
    match action {
        CliAction::Start { .. } => {
            #[cfg(unix)]
            daemon::run_supervisor(ready_fd).await;
            #[cfg(not(unix))]
            {
                eprintln!("error: daemon mode not supported on this platform");
                std::process::exit(1);
            }
        }
        CliAction::Stop => {
            // Stop managed proxies first, then the daemon itself.
            let stopped = proxy_lock::stop_all_proxies();
            for name in &stopped {
                eprintln!("Stopped proxy \"{}\".", name);
            }
            #[cfg(unix)]
            daemon::stop_daemon();
            #[cfg(not(unix))]
            {
                eprintln!("error: daemon management not supported on this platform");
                std::process::exit(1);
            }
        }
        CliAction::Restart {
            restart_proxies,
            restart_relay,
        } => {
            // `main` already stopped everything and recorded what was
            // running; re-launch that set shortly after the supervisor is up.
            if !restart_proxies.is_empty() || restart_relay {
                tokio::spawn(async move {
                    // Brief delay to let the supervisor finish initializing.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    for name in &restart_proxies {
                        match logic::proxy::start_proxy(name) {
                            Ok(_) => eprintln!("Restarted proxy \"{}\".", name),
                            Err(e) => eprintln!("Failed to restart proxy \"{}\": {}", name, e),
                        }
                    }
                    if restart_relay {
                        match logic::relay::start_relay_from_snapshot() {
                            Ok(_) => eprintln!("Restarted relay."),
                            Err(e) => eprintln!("Failed to restart relay: {}", e),
                        }
                    }
                });
            }
            #[cfg(unix)]
            daemon::run_supervisor(ready_fd).await;
            #[cfg(not(unix))]
            {
                eprintln!("error: daemon mode not supported on this platform");
                std::process::exit(1);
            }
        }
        CliAction::Status => {
            #[cfg(unix)]
            {
                let status = logic::daemon::get_status();
                let exit_code = render::daemon_status(status);
                if exit_code != 0 {
                    std::process::exit(exit_code);
                }
            }
            #[cfg(not(unix))]
            {
                eprintln!("error: daemon management not supported on this platform");
                std::process::exit(1);
            }
        }
        CliAction::Validate(args) => {
            let issues = config::validate_config(args.config.as_deref());
            let has_error = issues.iter().any(|(s, _)| *s == "error");
            render::validate_issues(&issues);
            // Warnings alone still exit 0; only errors fail validation.
            std::process::exit(if has_error { 1 } else { 0 });
        }
        CliAction::Version => {
            render::version_info();
        }
        CliAction::Update => {
            // Self-update via the hosted install script, then restart the
            // daemon (if running) so it picks up the new binary.
            eprintln!("Updating mcpr to the latest version...");
            let status = std::process::Command::new("sh")
                .args(["-c", "curl -fsSL https://mcpr.app/install.sh | sh"])
                .status();
            match status {
                Ok(s) if s.success() => {
                    #[cfg(unix)]
                    if matches!(daemon::check_status(), daemon::DaemonStatus::Running(_)) {
                        eprintln!("Restarting daemon with updated binary...");
                        let exe = std::env::current_exe().unwrap_or_else(|_| "mcpr".into());
                        let _ = std::process::Command::new(exe).arg("restart").status();
                    }
                }
                // Propagate the installer's exit code on failure.
                Ok(s) => std::process::exit(s.code().unwrap_or(1)),
                Err(e) => {
                    eprintln!("update failed: {e}");
                    std::process::exit(1);
                }
            }
        }
        CliAction::ProxySetup { cloud_url, output } => {
            if let Err(e) = cmd::setup::run_setup(&cloud_url, output.as_deref()).await {
                eprintln!("error: {e}");
                std::process::exit(1);
            }
        }
        CliAction::Proxy(cmd) => {
            cmd::handle_proxy_command(cmd);
        }
        CliAction::ProxyRun {
            mode, config_path, ..
        } => match mode {
            Mode::Relay(_) => {
                eprintln!("error: use `mcpr relay run` instead of `mcpr proxy run` for relay mode");
                std::process::exit(1);
            }
            Mode::Gateway(cfg) => {
                run_gateway_inner(*cfg, ready_fd, config_path).await;
            }
        },
        CliAction::Store(cmd) => {
            cmd::handle_store_command(cmd);
        }
        CliAction::RelayRun {
            relay_config,
            config_path,
            ..
        } => {
            run_relay_inner(relay_config, ready_fd, config_path).await;
        }
        CliAction::Relay(cmd) => {
            cmd::handle_relay_command(cmd);
        }
    }
}
/// Run a gateway proxy to completion.
///
/// Binds the HTTP listener, optionally establishes a tunnel through the
/// relay, wires up event sinks (stderr, sqlite, cloud), starts the proxy
/// server plus admin/health/watchdog background tasks, then blocks until a
/// shutdown signal arrives and drains before exiting.
///
/// * `cfg` — parsed gateway section of mcpr.toml.
/// * `ready_fd` — unix daemonization pipe to signal readiness on, if any.
/// * `config_path` — recorded in the proxy lockfile for tooling.
#[allow(unused_variables)]
async fn run_gateway_inner(cfg: GatewayConfig, ready_fd: Option<i32>, config_path: String) {
    let proxy_health_ref = proxy_health::new_shared_health();
    // The upstream MCP server URL is mandatory.
    let mcp = match cfg.mcp {
        Some(url) => url,
        None => {
            eprintln!(
                "{}: `mcp` is required in mcpr.toml. Set it to your upstream MCP server URL, e.g. mcp = \"http://localhost:8080\"",
                colored::Colorize::red("error"),
            );
            std::process::exit(1);
        }
    };
    validate_mcp_url(&mcp);
    let proxy_name = cfg.name.clone();
    let proxy_name_for_shutdown = proxy_name.clone();
    // Widgets may be proxied from a remote URL or served from a local path.
    let widget_source = cfg.widgets.as_ref().map(|w| {
        if w.starts_with("http://") || w.starts_with("https://") {
            WidgetSource::Proxy(w.clone())
        } else {
            WidgetSource::Static(w.clone())
        }
    });
    // With a tunnel an ephemeral port (0) is fine; otherwise default to 3000.
    let bind_port = cfg.port.unwrap_or(if cfg.tunnel { 0 } else { 3000 });
    let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{bind_port}"))
        .await
        .expect("Failed to bind");
    let actual_port = listener
        .local_addr()
        .expect("bound listener has a local address")
        .port();
    #[cfg(unix)]
    let daemon_pid = daemon::read_pid_file().map(|i| i.pid);
    #[cfg(not(unix))]
    let daemon_pid: Option<u32> = None;
    #[cfg(unix)]
    {
        if let Err(e) = proxy_lock::write_lock(&proxy_name, actual_port, &config_path, daemon_pid) {
            eprintln!("error: failed to write lockfile: {e}");
            std::process::exit(1);
        }
        // Tell the daemonizing parent we are up.
        if let Some(fd) = ready_fd {
            daemon::signal_ready(fd);
        }
    }
    let public_url = if !cfg.tunnel {
        // No tunnel: immediately reachable on localhost.
        proxy_health::lock_health(&proxy_health_ref).tunnel_status =
            proxy_health::ConnectionStatus::Connected;
        format!("http://localhost:{actual_port}")
    } else {
        // Config loading normally fills relay_url when tunneling is enabled;
        // fail with a clear message instead of panicking if it is absent.
        let Some(relay_url) = cfg.relay_url.as_deref() else {
            eprintln!(
                "{}: tunnel enabled but no relay URL is configured. Set `tunnel.relay_url` in mcpr.toml.",
                colored::Colorize::red("error"),
            );
            std::process::exit(1);
        };
        if cfg.tunnel_token.is_none() {
            eprintln!(
                "{}: No tunnel token configured. Register at https://mcpr.app to get one, then set `tunnel.token` in mcpr.toml.",
                colored::Colorize::red("error"),
            );
            std::process::exit(1);
        }
        let (token, desired_subdomain) =
            GatewayConfig::resolve_tunnel_identity(cfg.tunnel_subdomain, cfg.tunnel_token);
        proxy_health::lock_health(&proxy_health_ref).tunnel_status =
            proxy_health::ConnectionStatus::Connecting;
        match mcpr_tunnel::start_tunnel_client(
            actual_port,
            relay_url,
            &token,
            desired_subdomain.as_deref(),
            TunnelStatusAdapter(proxy_health_ref.clone()),
        )
        .await
        {
            Ok(url) => url,
            Err(e) => {
                eprintln!(
                    "{}: Failed to connect to relay: {}",
                    colored::Colorize::red("error"),
                    e
                );
                eprintln!("Set `tunnel.enabled = false` in mcpr.toml to use proxy-only mode");
                std::process::exit(1);
            }
        }
    };
    // Bare domain (no scheme, no trailing slash) for response rewriting.
    let proxy_domain = public_url
        .trim_start_matches("https://")
        .trim_start_matches("http://")
        .trim_end_matches('/')
        .to_string();
    let rewrite_config = RewriteConfig {
        proxy_url: public_url.clone(),
        proxy_domain,
        mcp_upstream: mcp.clone(),
        csp: cfg.csp.clone(),
    };
    let connect_timeout =
        Duration::from_secs(cfg.connect_timeout.unwrap_or(DEFAULT_CONNECT_TIMEOUT_SECS));
    let request_timeout =
        Duration::from_secs(cfg.request_timeout.unwrap_or(DEFAULT_REQUEST_TIMEOUT_SECS));
    let max_concurrent = cfg
        .max_concurrent_upstream
        .unwrap_or(DEFAULT_MAX_CONCURRENT_UPSTREAM);
    // Shared HTTP client plus a semaphore bounding concurrent upstream calls.
    let upstream = UpstreamClient {
        http_client: reqwest::Client::builder()
            .connect_timeout(connect_timeout)
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(DEFAULT_MAX_IDLE_UPSTREAM_PER_HOST)
            .build()
            .expect("Failed to build HTTP client"),
        semaphore: Arc::new(tokio::sync::Semaphore::new(max_concurrent)),
        request_timeout,
    };
    // Event sinks: stderr always; sqlite and cloud only when configured.
    let mut event_manager = mcpr_core::event::EventManager::new();
    event_manager.register(Box::new(mcpr_integrations::StderrSink::new(
        cfg.runtime.log_format,
    )));
    let mut sqlite_db_path: Option<std::path::PathBuf> = None;
    if let Some(db_path) = mcpr_integrations::store::path::resolve_db_path(None) {
        match mcpr_integrations::store::Store::open(mcpr_integrations::store::StoreConfig {
            db_path: db_path.clone(),
            mcpr_version: env!("CARGO_PKG_VERSION").to_string(),
        }) {
            Ok(store) => {
                eprintln!(
                    " {} storage: {}",
                    colored::Colorize::dimmed("store"),
                    db_path.display()
                );
                event_manager.register(Box::new(mcpr_integrations::SqliteSink::new(store)));
                sqlite_db_path = Some(db_path);
            }
            Err(e) => {
                // Persistence is best-effort: warn and continue without it.
                eprintln!(
                    " {}: failed to open store: {e}",
                    colored::Colorize::yellow("warn"),
                );
            }
        }
    }
    if let Some(ref token) = cfg.cloud_token {
        let endpoint = cfg
            .cloud_endpoint
            .clone()
            .unwrap_or_else(|| "https://api.mcpr.app".to_string());
        let cloud_endpoint = format!("{}/api/ingest-events", endpoint.trim_end_matches('/'));
        proxy_health::lock_health(&proxy_health_ref).cloud_endpoint = Some(cloud_endpoint.clone());
        let cloud_health = proxy_health_ref.clone();
        event_manager.register(Box::new(mcpr_integrations::CloudSink::new(
            mcpr_integrations::CloudSinkConfig {
                endpoint: cloud_endpoint,
                token: token.clone(),
                server: cfg.cloud_server.clone(),
                batch_size: cfg.cloud_batch_size.unwrap_or(100),
                flush_interval: Duration::from_millis(cfg.cloud_flush_interval_ms.unwrap_or(5000)),
                // Mirror each flush result into health state so status
                // reporting can show cloud sync success/failure.
                on_flush: Some(Arc::new(move |status| {
                    use mcpr_integrations::sinks::cloud_sink::SyncStatus;
                    let mut h = proxy_health::lock_health(&cloud_health);
                    h.cloud_sync = Some(match status {
                        SyncStatus::Ok { count } => proxy_health::CloudSyncStatus::Ok { count },
                        SyncStatus::Failed { message } => {
                            proxy_health::CloudSyncStatus::Failed { message }
                        }
                    });
                })),
            },
        )));
    }
    let event_bus_handle = event_manager.start();
    let proxy = Arc::new(ProxyState {
        name: proxy_name.clone(),
        mcp_upstream: mcp.clone(),
        upstream: upstream.clone(),
        max_request_body: cfg
            .max_request_body_size
            .unwrap_or(DEFAULT_MAX_REQUEST_BODY_SIZE),
        max_response_body: cfg
            .max_response_body_size
            .unwrap_or(DEFAULT_MAX_RESPONSE_BODY_SIZE),
        rewrite_config: rewrite_config.into_swap(),
        widget_source,
        sessions: MemorySessionStore::new(),
        schema_manager: Arc::new(SchemaManager::new(
            proxy_name.clone(),
            MemorySchemaStore::new(),
        )),
        health: proxy_health_ref.clone(),
        event_bus: event_bus_handle.bus.clone(),
    });
    let app_state = AppState {
        proxy: proxy.clone(),
    };
    // Warm the schema manager from previously captured schemas, if any.
    if let Some(ref db_path) = sqlite_db_path {
        hydrate_schema_manager_from_sqlite(&proxy.schema_manager, &proxy.name, db_path).await;
    }
    probe_mcp_upstream(&mcp, &upstream.http_client, &proxy_health_ref).await;
    let health_proxy = proxy.clone();
    let app = build_app(app_state);
    render::log_startup(
        &proxy_health_ref,
        actual_port,
        &public_url,
        &mcp,
        cfg.widgets.as_deref(),
        cfg.cloud_server.as_deref(),
    );
    #[cfg(unix)]
    {
        let _ = proxy_lock::write_tunnel_url(&proxy_name_for_shutdown, &public_url);
        let _ = proxy_lock::write_upstream_url(&proxy_name_for_shutdown, &mcp);
    }
    let drain_timeout = cfg.runtime.drain_timeout;
    let admin_bind = cfg.runtime.admin_bind.clone();
    // Watch channel broadcast: flipping to `true` tells every task to stop.
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
    let shutdown_for_server = shutdown_tx.subscribe();
    tokio::spawn(async move {
        axum::serve(listener, app)
            .with_graceful_shutdown(async move {
                let mut rx = shutdown_for_server;
                let _ = rx.changed().await;
            })
            .await
            .expect("Server failed");
    });
    if admin_bind != "none" {
        let admin_health_ref = proxy_health_ref.clone();
        let admin_shutdown = shutdown_tx.subscribe();
        tokio::spawn(async move {
            admin::start_admin_server(&admin_bind, admin_health_ref, admin_shutdown).await;
        });
    }
    tokio::spawn(async move {
        health_check_loop(health_proxy).await;
    });
    // Translate SIGINT/SIGTERM into a shutdown broadcast.
    let shutdown_trigger = shutdown_tx.clone();
    tokio::spawn(async move {
        let ctrl_c = tokio::signal::ctrl_c();
        #[cfg(unix)]
        {
            use tokio::signal::unix::{SignalKind, signal};
            let mut sigterm = signal(SignalKind::terminate()).expect("Failed to register SIGTERM");
            tokio::select! {
                _ = ctrl_c => {},
                _ = sigterm.recv() => {},
            }
        }
        #[cfg(not(unix))]
        {
            ctrl_c.await.expect("Failed to listen for ctrl-c");
        }
        eprintln!("[mcpr] Received shutdown signal, draining...");
        IS_DRAINING.store(true, Ordering::SeqCst);
        let _ = shutdown_trigger.send(true);
    });
    // Watchdog: if the supervising daemon dies, shut this proxy down too.
    if let Some(dpid) = daemon_pid {
        let watchdog_shutdown = shutdown_tx.clone();
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(5)).await;
                #[cfg(unix)]
                if !daemon::is_process_alive(dpid) {
                    eprintln!("[mcpr] daemon died, shutting down...");
                    IS_DRAINING.store(true, Ordering::SeqCst);
                    let _ = watchdog_shutdown.send(true);
                    break;
                }
            }
        });
    }
    let _ = shutdown_rx.changed().await;
    // The drain window is capped at 5s regardless of the configured timeout;
    // log the actual wait so the message matches the behavior.
    let drain_wait = drain_timeout.min(5);
    eprintln!("[mcpr] Waiting up to {drain_wait}s for in-flight requests...");
    tokio::time::sleep(Duration::from_secs(drain_wait)).await;
    event_bus_handle.shutdown().await;
    proxy_lock::remove_lock(&proxy_name_for_shutdown);
    eprintln!("[mcpr] Shutdown complete.");
}
/// Run the relay server to completion: bind, write the relay lockfile,
/// serve until SIGINT/SIGTERM (or supervising-daemon death), then remove
/// the lock. `ready_fd` is the unix daemonization readiness pipe, if any.
#[allow(unused_variables)]
async fn run_relay_inner(
    cfg: mcpr_tunnel::RelayConfig,
    ready_fd: Option<i32>,
    config_path: String,
) {
    let (app, port) = mcpr_tunnel::build_relay_app(cfg);
    let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}"))
        .await
        .unwrap_or_else(|e| {
            eprintln!("error: failed to bind relay on port {port}: {e}");
            std::process::exit(1);
        });
    // Record the actual port (relevant when `port` was 0 = ephemeral).
    let actual_port = listener.local_addr().unwrap().port();
    #[cfg(unix)]
    let daemon_pid = daemon::read_pid_file().map(|i| i.pid);
    #[cfg(unix)]
    if let Err(e) = relay_lock::write_lock(actual_port, &config_path) {
        eprintln!("error: failed to write relay lockfile: {e}");
        std::process::exit(1);
    }
    // Signal the daemonizing parent that we are serving.
    #[cfg(unix)]
    if let Some(fd) = ready_fd {
        daemon::signal_ready(fd);
    }
    eprintln!(
        " {} relay listening on :{actual_port}",
        colored::Colorize::green("mcpr")
    );
    // Watch channel broadcast: `true` tells every task to stop.
    let (shutdown_tx, _) = tokio::sync::watch::channel(false);
    let mut shutdown_rx = shutdown_tx.subscribe();
    let shutdown_trigger = shutdown_tx.clone();
    // Translate SIGINT/SIGTERM into a shutdown broadcast.
    tokio::spawn(async move {
        let ctrl_c = tokio::signal::ctrl_c();
        #[cfg(unix)]
        {
            use tokio::signal::unix::{SignalKind, signal};
            let mut sigterm = signal(SignalKind::terminate()).expect("Failed to register SIGTERM");
            tokio::select! {
                _ = ctrl_c => {},
                _ = sigterm.recv() => {},
            }
        }
        #[cfg(not(unix))]
        {
            ctrl_c.await.expect("Failed to listen for ctrl-c");
        }
        eprintln!("[mcpr] Received shutdown signal, stopping relay...");
        let _ = shutdown_trigger.send(true);
    });
    let shutdown_for_server = shutdown_tx.subscribe();
    tokio::spawn(async move {
        axum::serve(listener, app)
            .with_graceful_shutdown(async move {
                let mut rx = shutdown_for_server;
                let _ = rx.changed().await;
            })
            .await
            .expect("Relay server failed");
    });
    // Watchdog: shut down if the supervising daemon process disappears.
    #[cfg(unix)]
    if let Some(dpid) = daemon_pid {
        let watchdog_shutdown = shutdown_tx.clone();
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(5)).await;
                if !daemon::is_process_alive(dpid) {
                    eprintln!("[mcpr] daemon died, shutting down relay...");
                    let _ = watchdog_shutdown.send(true);
                    break;
                }
            }
        });
    }
    let _ = shutdown_rx.changed().await;
    relay_lock::remove_lock();
    eprintln!("[mcpr] Relay shutdown complete.");
}
fn validate_mcp_url(url: &str) {
let parsed = match url::Url::parse(url) {
Ok(u) => u,
Err(e) => {
eprintln!(
"\n {}: invalid MCP URL \"{}\": {}",
colored::Colorize::red("error"),
url,
e,
);
eprintln!(
" {} Expected format: http://host:port or https://host/path\n",
colored::Colorize::dimmed("hint"),
);
std::process::exit(1);
}
};
match parsed.scheme() {
"http" | "https" => {}
scheme => {
eprintln!(
"\n {}: unsupported scheme \"{}\" in MCP URL \"{}\"",
colored::Colorize::red("error"),
scheme,
url,
);
eprintln!(
" {} MCP URLs must use http:// or https://\n",
colored::Colorize::dimmed("hint"),
);
std::process::exit(1);
}
}
if parsed.host_str().is_none() {
eprintln!(
"\n {}: MCP URL \"{}\" has no host",
colored::Colorize::red("error"),
url,
);
eprintln!(
" {} Expected format: http://host:port or https://host/path\n",
colored::Colorize::dimmed("hint"),
);
std::process::exit(1);
}
}
/// Preload the in-memory schema manager with the most recently captured
/// schema for each tracked MCP list method from the sqlite store, so schema
/// state survives proxy restarts. Best-effort: every failure path logs a
/// warning (or silently skips a bad row) and continues.
async fn hydrate_schema_manager_from_sqlite(
    manager: &mcpr_core::protocol::schema_manager::SchemaManager<
        mcpr_core::protocol::schema_manager::MemorySchemaStore,
    >,
    proxy_name: &str,
    db_path: &std::path::Path,
) {
    use mcpr_core::protocol::schema_manager::{SchemaVersion, SchemaVersionId};
    use mcpr_integrations::store::QueryEngine;
    let engine = match QueryEngine::open(db_path) {
        Ok(e) => e,
        Err(e) => {
            eprintln!(
                " {}: schema hydration skipped — query engine open failed: {e}",
                colored::Colorize::yellow("warn"),
            );
            return;
        }
    };
    // The schema-bearing MCP methods this proxy tracks.
    let methods = [
        "initialize",
        "tools/list",
        "resources/list",
        "resources/templates/list",
        "prompts/list",
    ];
    let mut hydrated = 0usize;
    for method in methods {
        let row = match engine.latest_schema_row(proxy_name, method) {
            Ok(Some(r)) => r,
            // Nothing captured yet for this method.
            Ok(None) => continue,
            Err(e) => {
                eprintln!(
                    " {}: schema hydration query failed for {method}: {e}",
                    colored::Colorize::yellow("warn"),
                );
                continue;
            }
        };
        // Silently skip rows whose stored payload is no longer valid JSON.
        let Ok(payload): Result<serde_json::Value, _> = serde_json::from_str(&row.payload) else {
            continue;
        };
        let version = SchemaVersion {
            // Short version id: first 16 chars of the content hash.
            id: SchemaVersionId(row.schema_hash.chars().take(16).collect()),
            upstream_id: proxy_name.to_string(),
            method: method.to_string(),
            version: 1,
            payload: std::sync::Arc::new(payload),
            content_hash: row.schema_hash,
            // Fall back to "now" if the stored timestamp is out of range.
            captured_at: chrono::DateTime::from_timestamp_millis(row.captured_at)
                .unwrap_or_else(chrono::Utc::now),
        };
        manager.preload(version).await;
        hydrated += 1;
    }
    if hydrated > 0 {
        eprintln!(
            " {} hydrated {hydrated} schema method(s) from {}",
            colored::Colorize::dimmed("schema"),
            db_path.display(),
        );
    }
}
/// One-shot startup probe of the MCP upstream; stores the resulting
/// connection status and optional warning in the shared health state.
async fn probe_mcp_upstream(url: &str, client: &reqwest::Client, health_ref: &SharedProxyHealth) {
    let (probe_status, probe_warning) = check_mcp_endpoint(url, client).await;
    let mut health = proxy_health::lock_health(health_ref);
    health.mcp_status = probe_status;
    health.mcp_warning = probe_warning;
}
/// Probe `url` with a JSON-RPC `initialize` request and classify the result.
///
/// Returns the inferred connection status plus an optional human-readable
/// warning. The heuristics are ordered: transport errors, auth challenges
/// (401/403), unreadable bodies, non-JSON responses (unwrapping SSE `data:`
/// framing first), JSON-RPC error objects, and finally the shape of the
/// `initialize` result.
async fn check_mcp_endpoint(
    url: &str,
    client: &reqwest::Client,
) -> (proxy_health::ConnectionStatus, Option<String>) {
    // Minimal MCP `initialize` handshake payload.
    let init_body = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 0,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {
                "name": "mcpr-probe",
                "version": "0.1.0"
            }
        }
    });
    let resp = match client
        .post(url)
        .header("Content-Type", "application/json")
        .header("Accept", "application/json, text/event-stream")
        .json(&init_body)
        .timeout(std::time::Duration::from_secs(5))
        .send()
        .await
    {
        Ok(r) => r,
        Err(e) => {
            // Transport-level failure: pick the most specific hint we can.
            let hint = if e.is_connect() {
                "Cannot connect. Is the MCP server running?"
            } else if e.is_timeout() {
                "Connection timed out. Check host and port."
            } else {
                "Cannot reach server. Check the URL."
            };
            return (
                proxy_health::ConnectionStatus::Disconnected,
                Some(hint.to_string()),
            );
        }
    };
    let status_code = resp.status().as_u16();
    // An auth challenge still proves something is listening there.
    if status_code == 401 || status_code == 403 {
        return (
            proxy_health::ConnectionStatus::Connected,
            Some(
                "Server requires authentication. Status will update on first client connection."
                    .to_string(),
            ),
        );
    }
    let body_bytes = match resp.bytes().await {
        Ok(b) => b,
        Err(_) => {
            return (
                proxy_health::ConnectionStatus::Connected,
                Some("Server reachable but response unreadable".to_string()),
            );
        }
    };
    let body_text = String::from_utf8_lossy(&body_bytes);
    // Servers may answer over SSE: use the first non-empty `data:` line if
    // present, otherwise treat the whole body as the JSON payload.
    let json_str = body_text
        .lines()
        .find_map(|line| {
            let data = line.strip_prefix("data:")?.trim();
            if data.is_empty() {
                None
            } else {
                Some(data.to_string())
            }
        })
        .unwrap_or_else(|| body_text.to_string());
    let parsed: serde_json::Value = match serde_json::from_str(&json_str) {
        Ok(v) => v,
        Err(_) => {
            // Not JSON at all — explain the most likely misconfiguration.
            let hint = if status_code == 404 {
                "Server returned 404. Check the MCP endpoint path."
            } else if (300..400).contains(&status_code) {
                "Server returned a redirect. Check the URL."
            } else if body_text.trim_start().starts_with('<') {
                "Server returned HTML, not JSON-RPC. Not an MCP endpoint."
            } else {
                "Did not return JSON-RPC. Not an MCP endpoint?"
            };
            return (
                proxy_health::ConnectionStatus::NotMcp,
                Some(hint.to_string()),
            );
        }
    };
    if parsed.get("jsonrpc").and_then(|v| v.as_str()) != Some("2.0") {
        return (
            proxy_health::ConnectionStatus::NotMcp,
            Some("Response is JSON but not JSON-RPC 2.0.".to_string()),
        );
    }
    if let Some(err) = parsed.get("error") {
        let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
        let msg = err
            .get("message")
            .and_then(|m| m.as_str())
            .unwrap_or("unknown error");
        // -32601 (method not found): a JSON-RPC server, but not MCP.
        if code == -32601 {
            return (
                proxy_health::ConnectionStatus::NotMcp,
                Some("JSON-RPC server but 'initialize' method not found.".to_string()),
            );
        }
        return (
            proxy_health::ConnectionStatus::Connected,
            Some(format!("MCP init error: {msg}")),
        );
    }
    if let Some(result) = parsed.get("result") {
        if result.get("serverInfo").is_some() || result.get("capabilities").is_some() {
            return (proxy_health::ConnectionStatus::Connected, None);
        }
        return (
            proxy_health::ConnectionStatus::Connected,
            Some("Server responded but missing serverInfo in initialize result.".to_string()),
        );
    }
    // Valid JSON-RPC with neither error nor result: treat as reachable.
    (proxy_health::ConnectionStatus::Connected, None)
}
/// Background loop (every 10s): re-probe the MCP upstream, refresh widget
/// discovery, update the shared health state, and emit a heartbeat event.
/// Never returns; runs in its own task for the proxy's lifetime.
async fn health_check_loop(proxy: Arc<ProxyState>) {
    // Dedicated short-timeout client so a slow probe can't stall for long.
    let http = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(5))
        .build()
        .unwrap();
    loop {
        let (mcp_status, mcp_warning) = check_mcp_endpoint(&proxy.mcp_upstream, &http).await;
        let names = mcpr_core::proxy::widgets::discover_widget_names(&proxy).await;
        // No widget source configured => widget status is simply unknown.
        let widgets_status = if proxy.widget_source.is_none() {
            proxy_health::ConnectionStatus::Unknown
        } else if names.is_empty() {
            proxy_health::ConnectionStatus::Disconnected
        } else {
            proxy_health::ConnectionStatus::Connected
        };
        let widget_count = if names.is_empty() {
            None
        } else {
            Some(names.len())
        };
        // First lock scope: write the fresh probe results into health state.
        {
            let mut h = proxy_health::lock_health(&proxy.health);
            h.mcp_status = mcp_status;
            h.mcp_warning = mcp_warning;
            h.widgets_status = widgets_status;
            h.widget_count = widget_count;
            h.widget_names = names;
        }
        // Second lock scope: build and emit a heartbeat from the updated
        // health snapshot.
        {
            let h = proxy_health::lock_health(&proxy.health);
            proxy
                .event_bus
                .emit(mcpr_core::event::ProxyEvent::Heartbeat(
                    mcpr_core::event::HeartbeatEvent {
                        ts: chrono::Utc::now().timestamp_millis(),
                        proxy: proxy.name.clone(),
                        mcp_status: h.mcp_status.label().to_string(),
                        tunnel_status: h.tunnel_status.label().to_string(),
                        widgets_status: h.widgets_status.label().to_string(),
                        uptime_secs: h.started_at.elapsed().as_secs(),
                        request_count: h.request_count,
                    },
                ));
        }
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
    }
}