use super::kumiho_client::KumihoError;
use anyhow::{Context, Result};
use reqwest::{Client, Method, StatusCode};
use serde_json::Value;
use std::fs::OpenOptions;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::OnceLock;
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
const BRIDGE_SCRIPT: &str = include_str!("../../resources/sidecars/kumiho_sdk_bridge.py");
const BRIDGE_SCRIPT_NAME: &str = "kumiho_sdk_bridge.py";
#[derive(Debug)]
struct BridgeState {
base_url: String,
child: Child,
}
static BRIDGE: OnceLock<Mutex<Option<BridgeState>>> = OnceLock::new();
#[derive(Debug, Clone)]
pub struct BridgeResponse {
pub status: StatusCode,
pub body: String,
}
fn bridge_enabled() -> bool {
!matches!(
std::env::var("CONSTRUCT_KUMIHO_SDK_BRIDGE")
.unwrap_or_else(|_| "1".to_string())
.trim()
.to_ascii_lowercase()
.as_str(),
"0" | "false" | "no" | "off"
)
}
fn kumiho_dir() -> Result<PathBuf> {
Ok(crate::sidecars::construct_root()?.join("kumiho"))
}
fn bridge_script_path() -> Result<PathBuf> {
Ok(kumiho_dir()?.join(BRIDGE_SCRIPT_NAME))
}
fn venv_python(dir: &Path) -> Option<PathBuf> {
if cfg!(windows) {
let candidate = dir.join("venv").join("Scripts").join("python.exe");
candidate.exists().then_some(candidate)
} else {
let python3 = dir.join("venv").join("bin").join("python3");
if python3.exists() {
return Some(python3);
}
let python = dir.join("venv").join("bin").join("python");
python.exists().then_some(python)
}
}
fn materialize_bridge_script() -> Result<PathBuf> {
let dir = kumiho_dir()?;
std::fs::create_dir_all(&dir).with_context(|| format!("creating {}", dir.display()))?;
let script = bridge_script_path()?;
let write = match std::fs::read_to_string(&script) {
Ok(existing) => existing != BRIDGE_SCRIPT,
Err(_) => true,
};
if write {
std::fs::write(&script, BRIDGE_SCRIPT)
.with_context(|| format!("writing {}", script.display()))?;
}
Ok(script)
}
fn reserve_loopback_port() -> Result<u16> {
let listener = TcpListener::bind("127.0.0.1:0").context("binding loopback bridge port")?;
let port = listener.local_addr()?.port();
drop(listener);
Ok(port)
}
fn log_file(name: &str) -> Option<std::fs::File> {
let root = crate::sidecars::construct_root().ok()?;
let dir = root.join("logs");
std::fs::create_dir_all(&dir).ok()?;
OpenOptions::new()
.create(true)
.append(true)
.open(dir.join(name))
.ok()
}
async fn poll_health(client: &Client, base_url: &str) -> bool {
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
while tokio::time::Instant::now() < deadline {
if let Ok(resp) = client
.get(format!("{base_url}/health"))
.timeout(Duration::from_millis(500))
.send()
.await
{
if resp.status().is_success() {
return true;
}
}
tokio::time::sleep(Duration::from_millis(150)).await;
}
false
}
async fn start_bridge(client: &Client) -> Result<BridgeState> {
let dir = kumiho_dir()?;
let python = venv_python(&dir).ok_or_else(|| {
anyhow::anyhow!(
"Kumiho sidecar venv not found under {}. Run `construct install --sidecars-only`.",
dir.display()
)
})?;
let script = materialize_bridge_script()?;
let port = reserve_loopback_port()?;
let base_url = format!("http://127.0.0.1:{port}");
let stderr = log_file("kumiho-sdk-bridge.stderr.log").map(Stdio::from);
let stdout = log_file("kumiho-sdk-bridge.stdout.log").map(Stdio::from);
let mut cmd = Command::new(python);
cmd.arg(script)
.env("KUMIHO_SDK_BRIDGE_HOST", "127.0.0.1")
.env("KUMIHO_SDK_BRIDGE_PORT", port.to_string())
.env("PYTHONUNBUFFERED", "1")
.env_remove("KUMIHO_AUTO_CONFIGURE")
.stdin(Stdio::null())
.stdout(stdout.unwrap_or_else(Stdio::null))
.stderr(stderr.unwrap_or_else(Stdio::null));
#[cfg(windows)]
{
cmd.creation_flags(0x0800_0000);
}
let mut child = cmd.spawn().context("spawning Kumiho SDK bridge")?;
if poll_health(client, &base_url).await {
tracing::info!(%base_url, "Kumiho SDK bridge started");
return Ok(BridgeState { base_url, child });
}
let _ = child.kill().await;
anyhow::bail!("Kumiho SDK bridge did not become healthy");
}
async fn ensure_bridge(client: &Client) -> Option<String> {
if !bridge_enabled() {
return None;
}
let lock = BRIDGE.get_or_init(|| Mutex::new(None));
let mut guard = lock.lock().await;
if let Some(state) = guard.as_mut() {
match state.child.try_wait() {
Ok(None) => return Some(state.base_url.clone()),
Ok(Some(status)) => {
tracing::warn!(?status, "Kumiho SDK bridge exited; restarting on demand");
*guard = None;
}
Err(err) => {
tracing::warn!(error = %err, "Kumiho SDK bridge status check failed");
*guard = None;
}
}
}
match start_bridge(client).await {
Ok(state) => {
let base_url = state.base_url.clone();
*guard = Some(state);
Some(base_url)
}
Err(err) => {
tracing::warn!(error = %err, "Kumiho SDK bridge unavailable; falling back to FastAPI");
None
}
}
}
async fn mark_dead() {
let Some(lock) = BRIDGE.get() else {
return;
};
let mut guard = lock.lock().await;
if let Some(mut state) = guard.take() {
let _ = state.child.kill().await;
}
}
fn is_unsupported_bridge_response(status: StatusCode, body: &str) -> bool {
if status != StatusCode::NOT_IMPLEMENTED {
return false;
}
serde_json::from_str::<Value>(body)
.ok()
.and_then(|v| {
v.get("error_code")
.and_then(|c| c.as_str())
.map(str::to_string)
})
.is_some_and(|code| code == "kumiho_sdk_bridge_unsupported")
}
pub async fn send_raw(
client: &Client,
method: Method,
path: &str,
query: Vec<(String, String)>,
token: &str,
body: Option<Value>,
) -> Option<std::result::Result<BridgeResponse, KumihoError>> {
let token = token.trim();
if token.is_empty() {
return None;
}
let base_url = ensure_bridge(client).await?;
let url = if path == "/health" {
format!("{}/health", base_url.trim_end_matches('/'))
} else {
format!("{}/api/v1{}", base_url.trim_end_matches('/'), path)
};
let mut req = client
.request(method, &url)
.header("X-Kumiho-Token", token)
.timeout(Duration::from_secs(10));
if !query.is_empty() {
req = req.query(&query);
}
if let Some(body) = body {
req = req.json(&body);
}
let resp = match req.send().await {
Ok(resp) => resp,
Err(err) => {
tracing::warn!(error = %err, path = %path, "Kumiho SDK bridge request failed");
mark_dead().await;
return None;
}
};
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
if is_unsupported_bridge_response(status, &body) {
return None;
}
if status.is_server_error() {
tracing::warn!(
upstream_status = status.as_u16(),
path = %path,
body = %body,
"Kumiho SDK bridge returned 5xx; falling back to FastAPI"
);
return None;
}
if !status.is_success() {
return Some(Err(KumihoError::Api {
status: status.as_u16(),
body,
}));
}
Some(Ok(BridgeResponse { status, body }))
}