use anyhow::Result;
use axum::{
Json, Router,
body::Body,
extract::{Path, State},
http::{HeaderMap, Request, StatusCode, header},
response::{
Html, IntoResponse, Response,
sse::{Event, KeepAlive, Sse},
},
routing::{get, post},
};
use clap::Parser;
use rmcp::{ServiceExt, model::CallToolRequestParams, transport::child_process::TokioChildProcess};
use rust_embed::Embed;
use serde::{Deserialize, Serialize};
use std::path::Path as FsPath;
use std::process::Command as StdCommand;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::{Mutex, RwLock};
use torc::config::TorcConfig;
use torc::network_utils::find_available_port;
use tracing::{error, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Embed)]
#[folder = "torc-dash/static/"]
struct Assets;
#[derive(Default)]
struct ManagedServer {
pid: Option<u32>,
port: Option<u16>,
output_lines: Vec<String>,
}
struct McpClient {
peer: rmcp::service::Peer<rmcp::service::RoleClient>,
tools: Vec<rmcp::model::Tool>,
}
struct AppState {
api_url: String,
client: reqwest::Client,
torc_bin: String,
torc_server_bin: String,
torc_mcp_server_bin: String,
managed_server: Mutex<ManagedServer>,
anthropic_api_key: RwLock<Option<String>>,
anthropic_base_url: RwLock<String>,
anthropic_auth_header: RwLock<String>,
mcp_client: Mutex<Option<McpClient>>,
anthropic_model: String,
}
#[derive(Parser)]
#[command(name = "torc-dash")]
#[command(about = "Torc workflow dashboard with CLI integration")]
struct Cli {
#[arg(short, long, default_value_t = 8090)]
port: u16,
#[arg(long, default_value = "127.0.0.1")]
host: String,
#[arg(
short,
long,
default_value = "http://localhost:8080/torc-service/v1",
env = "TORC_API_URL"
)]
api_url: String,
#[arg(long, default_value = "torc", env = "TORC_BIN")]
torc_bin: String,
#[arg(long, default_value = "torc-server", env = "TORC_SERVER_BIN")]
torc_server_bin: String,
#[arg(long)]
standalone: bool,
#[arg(long, default_value_t = 0)]
server_port: u16,
#[arg(long, default_value = "0.0.0.0")]
server_host: String,
#[arg(long, env = "DATABASE_URL")]
database: Option<String>,
#[arg(long, default_value_t = 5)]
completion_check_interval_secs: u32,
#[cfg(unix)]
#[arg(long, value_name = "PATH")]
socket: Option<std::path::PathBuf>,
#[arg(long, env = "TORC_TLS_CA_CERT")]
tls_ca_cert: Option<String>,
#[arg(long, env = "TORC_TLS_INSECURE")]
tls_insecure: bool,
#[arg(long, env = "ANTHROPIC_API_KEY")]
anthropic_api_key: Option<String>,
#[arg(long, env = "ANTHROPIC_FOUNDRY_API_KEY")]
anthropic_foundry_api_key: Option<String>,
#[arg(long, env = "ANTHROPIC_FOUNDRY_RESOURCE")]
anthropic_foundry_resource: Option<String>,
#[arg(long, env = "ANTHROPIC_BASE_URL")]
anthropic_base_url: Option<String>,
#[arg(long, env = "ANTHROPIC_AUTH_HEADER")]
anthropic_auth_header: Option<String>,
#[arg(long, default_value = "torc-mcp-server", env = "TORC_MCP_SERVER_BIN")]
torc_mcp_server_bin: String,
#[arg(
long,
default_value = "claude-sonnet-4-20250514",
env = "ANTHROPIC_MODEL"
)]
anthropic_model: String,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("torc_dash=info".parse().unwrap()),
)
.init();
let cli = Cli::parse();
let file_config = TorcConfig::load().unwrap_or_default();
let dash_config = &file_config.dash;
let host = if cli.host != "127.0.0.1" {
cli.host.clone()
} else {
dash_config.host.clone()
};
let port = if cli.port != 8090 {
cli.port
} else {
dash_config.port
};
let api_url = if cli.api_url != "http://localhost:8080/torc-service/v1" {
cli.api_url.clone()
} else {
dash_config.api_url.clone()
};
let torc_bin = if cli.torc_bin != "torc" {
cli.torc_bin.clone()
} else {
dash_config.torc_bin.clone()
};
let torc_server_bin = if cli.torc_server_bin != "torc-server" {
cli.torc_server_bin.clone()
} else {
dash_config.torc_server_bin.clone()
};
let standalone = cli.standalone || dash_config.standalone;
let server_port = if cli.server_port != 0 {
cli.server_port
} else {
dash_config.server_port
};
let server_host = if cli.server_host != "0.0.0.0" {
cli.server_host.clone()
} else {
dash_config.server_host.clone()
};
let database = cli
.database
.clone()
.or_else(|| dash_config.database.clone());
let completion_check_interval_secs = if cli.completion_check_interval_secs != 5 {
cli.completion_check_interval_secs
} else {
dash_config.completion_check_interval_secs
};
#[cfg(unix)]
let socket_path = cli
.socket
.clone()
.or_else(|| dash_config.socket.as_ref().map(std::path::PathBuf::from));
#[cfg(unix)]
if let Some(ref socket_path) = socket_path {
info!(
"Starting torc-dash v{} ({}{}) on unix:{} torc_bin={} server_bin={}",
env!("CARGO_PKG_VERSION"),
env!("GIT_HASH"),
env!("GIT_DIRTY"),
socket_path.display(),
torc_bin,
torc_server_bin
);
} else {
info!(
"Starting torc-dash v{} ({}{}) on {}:{} torc_bin={} server_bin={}",
env!("CARGO_PKG_VERSION"),
env!("GIT_HASH"),
env!("GIT_DIRTY"),
host,
port,
torc_bin,
torc_server_bin
);
}
#[cfg(not(unix))]
info!(
"Starting torc-dash v{} ({}{}) on {}:{} torc_bin={} server_bin={}",
env!("CARGO_PKG_VERSION"),
env!("GIT_HASH"),
env!("GIT_DIRTY"),
host,
port,
torc_bin,
torc_server_bin
);
let mut actual_server_port = server_port;
let managed_server = if standalone {
if api_url != "http://localhost:8080/torc-service/v1" {
warn!(
"--api-url is ignored in standalone mode. Use --server-host and --server-port to configure the managed server."
);
}
info!(
"Standalone mode: starting torc-server on {}:{} (port 0 = auto-detect)",
server_host, server_port
);
let mut args = vec![
"run".to_string(),
"--host".to_string(),
server_host.clone(),
"--port".to_string(),
server_port.to_string(),
"--completion-check-interval-secs".to_string(),
completion_check_interval_secs.to_string(),
];
if let Some(ref db) = database {
args.push("--database".to_string());
args.push(db.clone());
}
let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
match Command::new(&torc_server_bin)
.args(&args_refs)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(mut child) => {
let pid = child.id();
info!("Started torc-server with PID {:?}", pid);
let mut port_reported = false;
if let Some(stdout) = child.stdout.take() {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
let timeout = tokio::time::Duration::from_secs(10);
let start = std::time::Instant::now();
loop {
if start.elapsed() > timeout {
error!("Timeout waiting for server to report port");
break;
}
match tokio::time::timeout(
tokio::time::Duration::from_millis(100),
reader.read_line(&mut line),
)
.await
{
Ok(Ok(0)) => break, Ok(Ok(_)) => {
if let Some(port_str) = line.strip_prefix("TORC_SERVER_PORT=")
&& let Ok(port) = port_str.trim().parse::<u16>()
{
actual_server_port = port;
port_reported = true;
info!("Server reported actual port: {}", actual_server_port);
break;
}
line.clear();
}
Ok(Err(e)) => {
error!("Error reading server output: {}", e);
break;
}
Err(_) => {
continue;
}
}
}
}
if server_port == 0 && !port_reported {
let mut stderr_output = String::new();
if let Some(mut stderr) = child.stderr.take() {
let _ = stderr.read_to_string(&mut stderr_output).await;
}
if let Ok(Some(status)) = child.try_wait() {
let stderr_output = stderr_output.trim();
let details = if stderr_output.is_empty() {
format!("torc-server exited with status {}", status)
} else {
format!(
"torc-server exited with status {}: {}",
status, stderr_output
)
};
return Err(anyhow::anyhow!(
"Managed server failed to start before reporting a port: {}",
details
));
}
return Err(anyhow::anyhow!(
"Managed server did not report an assigned port within 10 seconds"
));
}
ManagedServer {
pid,
port: Some(actual_server_port),
output_lines: vec![format!(
"Server started with PID {} on port {}",
pid.unwrap_or(0),
actual_server_port
)],
}
}
Err(e) => {
error!("Failed to start torc-server: {}", e);
error!("Make sure torc-server is in your PATH or specify --torc-server-bin");
return Err(anyhow::anyhow!("Failed to start torc-server: {}", e));
}
}
} else {
ManagedServer::default()
};
let final_api_url = if standalone {
let connect_host = if server_host == "0.0.0.0" || server_host == "::" {
"localhost".to_string()
} else {
server_host.clone()
};
format!(
"http://{}:{}/torc-service/v1",
connect_host, actual_server_port
)
} else {
api_url.clone()
};
info!("API URL: {}", final_api_url);
let tls = torc::client::apis::configuration::TlsConfig {
ca_cert_path: cli.tls_ca_cert.as_ref().map(std::path::PathBuf::from),
insecure: cli.tls_insecure,
};
let http_client = tls
.build_async_client()
.expect("Failed to build HTTP client with TLS config");
let (resolved_api_key, mut anthropic_base_url, mut anthropic_auth_header) =
if let (Some(foundry_key), Some(foundry_resource)) = (
cli.anthropic_foundry_api_key.as_ref(),
cli.anthropic_foundry_resource.as_ref(),
) {
info!(
"AI Chat: using Azure AI Foundry (resource={})",
foundry_resource
);
(
Some(foundry_key.clone()),
format!(
"https://{}.services.ai.azure.com/anthropic/v1",
foundry_resource
),
"x-api-key".to_string(),
)
} else if let Some(ref api_key) = cli.anthropic_api_key {
info!("AI Chat: using direct Anthropic API");
(
Some(api_key.clone()),
"https://api.anthropic.com/v1".to_string(),
"x-api-key".to_string(),
)
} else {
info!("AI Chat: disabled (no API key configured)");
(
None,
"https://api.anthropic.com/v1".to_string(),
"x-api-key".to_string(),
)
};
if let Some(ref url_override) = cli.anthropic_base_url {
info!("AI Chat: base URL overridden to {}", url_override);
anthropic_base_url = url_override.clone();
}
if let Some(ref header_override) = cli.anthropic_auth_header {
info!("AI Chat: auth header overridden to {}", header_override);
anthropic_auth_header = header_override.clone();
}
let state = Arc::new(AppState {
api_url: final_api_url,
client: http_client,
torc_bin,
torc_server_bin: torc_server_bin.clone(),
torc_mcp_server_bin: cli.torc_mcp_server_bin.clone(),
managed_server: Mutex::new(managed_server),
anthropic_api_key: RwLock::new(resolved_api_key),
anthropic_base_url: RwLock::new(anthropic_base_url),
anthropic_auth_header: RwLock::new(anthropic_auth_header),
mcp_client: Mutex::new(None),
anthropic_model: cli.anthropic_model.clone(),
});
let app = Router::new()
.route("/", get(index_handler))
.route("/static/*path", get(static_handler))
.route("/api/cli/create", post(cli_create_handler))
.route("/api/cli/create-slurm", post(cli_create_slurm_handler))
.route("/api/cli/validate", post(cli_validate_handler))
.route("/api/cli/run", post(cli_run_handler))
.route("/api/cli/submit", post(cli_submit_handler))
.route("/api/cli/initialize", post(cli_initialize_handler))
.route(
"/api/cli/check-initialize",
post(cli_check_initialize_handler),
)
.route("/api/cli/delete", post(cli_delete_handler))
.route("/api/cli/cancel", post(cli_cancel_handler))
.route("/api/cli/reinitialize", post(cli_reinitialize_handler))
.route("/api/cli/reset-status", post(cli_reset_status_handler))
.route("/api/cli/execution-plan", post(cli_execution_plan_handler))
.route("/api/cli/run-stream", get(cli_run_stream_handler))
.route("/api/cli/recover", post(cli_recover_handler))
.route("/api/cli/sync-status", post(cli_sync_status_handler))
.route("/api/cli/export", post(cli_export_handler))
.route("/api/cli/import", post(cli_import_handler))
.route("/api/cli/read-file", post(cli_read_file_handler))
.route("/api/cli/plot-resources", post(cli_plot_resources_handler))
.route(
"/api/cli/list-resource-dbs",
post(cli_list_resource_dbs_handler),
)
.route(
"/api/cli/slurm-parse-logs",
post(cli_slurm_parse_logs_handler),
)
.route("/api/cli/slurm-sacct", post(cli_slurm_sacct_handler))
.route("/api/cli/hpc-profiles", get(cli_hpc_profiles_handler))
.route("/api/server/start", post(server_start_handler))
.route("/api/server/stop", post(server_stop_handler))
.route("/api/server/status", get(server_status_handler))
.route("/api/chat", post(chat_handler))
.route("/api/chat/status", get(chat_status_handler))
.route("/api/chat/configure", post(configure_chat_handler))
.route("/api/version", get(version_handler))
.route("/api/user", get(user_handler))
.route(
"/torc-service/*path",
get(proxy_handler)
.post(proxy_handler)
.put(proxy_handler)
.patch(proxy_handler)
.delete(proxy_handler),
)
.with_state(state);
#[cfg(unix)]
if let Some(ref socket_path) = socket_path {
if socket_path.exists() {
std::fs::remove_file(socket_path)?;
}
let uds = tokio::net::UnixListener::bind(socket_path)?;
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o600))?;
info!("Dashboard available at unix:{}", socket_path.display());
info!(
"To connect via SSH tunnel:\n ssh -L 8090:{} user@this-host\n Then open http://localhost:8090",
socket_path.display()
);
let mut make_service = app.into_make_service();
loop {
let (stream, _addr) = uds.accept().await?;
let tower_service = tower::Service::call(&mut make_service, &stream)
.await
.unwrap_or_else(|e| match e {});
let hyper_service = hyper_util::service::TowerToHyperService::new(tower_service);
tokio::spawn(async move {
let io = hyper_util::rt::TokioIo::new(stream);
if let Err(err) = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.serve_connection(io, hyper_service)
.await
{
error!("Error serving Unix socket connection: {err}");
}
});
}
}
let (std_listener, actual_port) = find_available_port(&host, port)?;
info!("Dashboard available at http://{}:{}", host, actual_port);
if actual_port != port {
info!(
"Note: Requested port {} was in use, using port {} instead",
port, actual_port
);
}
std_listener.set_nonblocking(true)?;
let listener = tokio::net::TcpListener::from_std(std_listener)?;
axum::serve(listener, app).await?;
Ok(())
}
async fn index_handler() -> impl IntoResponse {
match Assets::get("index.html") {
Some(content) => Html(content.data.into_owned()).into_response(),
None => (StatusCode::NOT_FOUND, "index.html not found").into_response(),
}
}
async fn static_handler(Path(path): Path<String>) -> impl IntoResponse {
let path = path.trim_start_matches('/');
match Assets::get(path) {
Some(content) => {
let mime = mime_guess::from_path(path).first_or_octet_stream();
(
[(header::CONTENT_TYPE, mime.as_ref())],
content.data.into_owned(),
)
.into_response()
}
None => (StatusCode::NOT_FOUND, format!("File not found: {}", path)).into_response(),
}
}
async fn proxy_handler(
State(state): State<Arc<AppState>>,
req: Request<Body>,
) -> impl IntoResponse {
let path = req.uri().path();
let query = req
.uri()
.query()
.map(|q| format!("?{}", q))
.unwrap_or_default();
let method = req.method().clone();
let target_path = path.strip_prefix("/torc-service/v1").unwrap_or(path);
let target_url = format!("{}{}{}", state.api_url, target_path, query);
let mut proxy_req = state.client.request(method, &target_url);
for (name, value) in req.headers() {
if name != header::HOST {
proxy_req = proxy_req.header(name, value);
}
}
let body_bytes = match axum::body::to_bytes(req.into_body(), usize::MAX).await {
Ok(bytes) => bytes,
Err(e) => {
error!("Failed to read request body: {}", e);
return (
StatusCode::BAD_REQUEST,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
if !body_bytes.is_empty() {
proxy_req = proxy_req.body(body_bytes);
}
match proxy_req.send().await {
Ok(resp) => {
let status = StatusCode::from_u16(resp.status().as_u16())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let headers = resp.headers().clone();
let mut response = Response::builder().status(status);
for (name, value) in headers.iter() {
response = response.header(name, value);
}
let stream = resp.bytes_stream();
response
.body(Body::from_stream(stream))
.unwrap()
.into_response()
}
Err(e) => {
error!("Proxy request failed: {}", e);
(StatusCode::BAD_GATEWAY, format!("Proxy error: {}", e)).into_response()
}
}
}
fn extract_workflow_id(stdout: &str) -> Option<String> {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(stdout)
&& let Some(id) = json.get("workflow_id")
{
return Some(id.to_string().trim_matches('"').to_string());
}
for line in stdout.lines() {
if line.contains("Created workflow") {
if let Some(id) = line.split_whitespace().last() {
return Some(id.trim().to_string());
}
} else if let Some(pos) = line.find("ID:") {
let after = &line[pos + "ID:".len()..];
if let Some(id) = after.split_whitespace().next() {
return Some(id.to_string());
}
}
}
None
}
fn validate_file_extension(ext: &str) -> Option<&'static str> {
match ext {
".json" | "json" => Some(".json"),
".yaml" | "yaml" => Some(".yaml"),
".yml" | "yml" => Some(".yml"),
".kdl" | "kdl" => Some(".kdl"),
".json5" | "json5" => Some(".json5"),
_ => None,
}
}
#[derive(Deserialize)]
struct CreateRequest {
spec: String,
#[serde(default)]
is_file: bool,
#[serde(default)]
file_extension: Option<String>,
}
#[derive(Deserialize)]
struct CreateSlurmRequest {
spec: String,
#[serde(default)]
is_file: bool,
#[serde(default)]
file_extension: Option<String>,
account: String,
#[serde(default)]
profile: Option<String>,
}
#[derive(Deserialize)]
struct WorkflowIdRequest {
workflow_id: String,
}
#[derive(Deserialize)]
struct InitializeRequest {
workflow_id: String,
#[serde(default)]
force: bool,
}
#[derive(Deserialize)]
struct RecoverRequest {
workflow_id: String,
#[serde(default)]
dry_run: bool,
#[serde(default = "default_memory_multiplier")]
memory_multiplier: f64,
#[serde(default = "default_runtime_multiplier")]
runtime_multiplier: f64,
#[serde(default)]
retry_unknown: bool,
#[serde(default = "default_output_dir")]
output_dir: String,
}
fn default_memory_multiplier() -> f64 {
1.5
}
fn default_runtime_multiplier() -> f64 {
1.5
}
fn default_output_dir() -> String {
"torc_output".to_string()
}
#[derive(Deserialize)]
struct SyncStatusRequest {
workflow_id: String,
#[serde(default)]
dry_run: bool,
}
#[derive(Serialize)]
struct CliResponse {
success: bool,
stdout: String,
stderr: String,
exit_code: Option<i32>,
}
#[derive(Deserialize)]
struct ExportRequest {
workflow_id: String,
#[serde(default)]
output: Option<String>,
#[serde(default)]
include_results: bool,
#[serde(default)]
include_events: bool,
}
#[derive(Deserialize)]
struct ImportRequest {
#[serde(default)]
file_path: Option<String>,
#[serde(default)]
content: Option<String>,
#[serde(default)]
name: Option<String>,
#[serde(default)]
skip_results: bool,
#[serde(default)]
skip_events: bool,
}
#[derive(Deserialize)]
struct ReadFileRequest {
path: String,
}
#[derive(Serialize)]
struct ReadFileResponse {
success: bool,
content: Option<String>,
error: Option<String>,
is_json: bool,
exists: bool,
}
async fn cli_create_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<CreateRequest>,
) -> impl IntoResponse {
let spec_content = req.spec.clone();
let raw_extension = req.file_extension.as_deref().unwrap_or(".json");
let file_extension = match validate_file_extension(raw_extension) {
Some(ext) => ext,
None => {
return Json(CliResponse {
success: false,
stdout: String::new(),
stderr: format!(
"Invalid file extension '{}'. Allowed: .json, .yaml, .yml, .kdl, .json5",
raw_extension
),
exit_code: None,
});
}
};
let result = if req.is_file {
run_torc_command(
&state.torc_bin,
&["-f", "json", "workflows", "create", &req.spec],
&state.api_url,
)
.await
} else {
let unique_id = uuid::Uuid::new_v4();
let temp_path = format!("torc_spec_{}{}", unique_id, file_extension);
if let Err(e) = tokio::fs::write(&temp_path, &spec_content).await {
return Json(CliResponse {
success: false,
stdout: String::new(),
stderr: format!("Failed to write spec file: {}", e),
exit_code: None,
});
}
let result = run_torc_command(
&state.torc_bin,
&["-f", "json", "workflows", "create", &temp_path],
&state.api_url,
)
.await;
if result.success {
if let Some(workflow_id) = extract_workflow_id(&result.stdout) {
let workflow_name = serde_json::from_str::<serde_json::Value>(&spec_content)
.ok()
.and_then(|spec| spec.get("name").and_then(|v| v.as_str()).map(String::from))
.unwrap_or_else(|| "workflow".to_string());
let sanitized_name: String = workflow_name
.chars()
.map(|c| {
if c.is_alphanumeric() || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect();
let final_path = format!("{}_{}{}", sanitized_name, workflow_id, file_extension);
match tokio::fs::rename(&temp_path, &final_path).await {
Ok(_) => {
info!("Saved workflow spec to: {}", final_path);
}
Err(e) => {
warn!(
"Failed to rename workflow spec from {} to {}: {}. Keeping original file.",
temp_path, final_path, e
);
}
}
} else {
let fallback_path = format!("workflow_{}{}", uuid::Uuid::new_v4(), file_extension);
if let Err(e) = tokio::fs::rename(&temp_path, &fallback_path).await {
warn!(
"Failed to preserve workflow spec as {}: {}. File remains at {}.",
fallback_path, e, temp_path
);
} else {
info!(
"Saved workflow spec to: {} (ID extraction failed but workflow was created)",
fallback_path
);
}
}
} else {
if let Err(e) = tokio::fs::remove_file(&temp_path).await {
warn!("Failed to clean up temp file {}: {}", temp_path, e);
}
}
result
};
Json(result)
}
async fn cli_create_slurm_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<CreateSlurmRequest>,
) -> impl IntoResponse {
let spec_content = req.spec.clone();
let raw_extension = req.file_extension.as_deref().unwrap_or(".json");
let file_extension = match validate_file_extension(raw_extension) {
Some(ext) => ext,
None => {
return Json(CliResponse {
success: false,
stdout: String::new(),
stderr: format!(
"Invalid file extension '{}'. Allowed: .json, .yaml, .yml, .kdl, .json5",
raw_extension
),
exit_code: None,
});
}
};
let result = if req.is_file {
let mut args = vec![
"-f",
"json",
"workflows",
"create-slurm",
"--account",
&req.account,
];
if let Some(ref profile) = req.profile {
args.push("--hpc-profile");
args.push(profile);
}
args.push(&req.spec);
run_torc_command(&state.torc_bin, &args, &state.api_url).await
} else {
let unique_id = uuid::Uuid::new_v4();
let temp_path = format!("torc_spec_{}{}", unique_id, file_extension);
if let Err(e) = tokio::fs::write(&temp_path, &spec_content).await {
return Json(CliResponse {
success: false,
stdout: String::new(),
stderr: format!("Failed to write spec file: {}", e),
exit_code: None,
});
}
let mut args = vec![
"-f",
"json",
"workflows",
"create-slurm",
"--account",
&req.account,
];
if let Some(ref profile) = req.profile {
args.push("--hpc-profile");
args.push(profile);
}
args.push(&temp_path);
let result = run_torc_command(&state.torc_bin, &args, &state.api_url).await;
if result.success {
if let Some(workflow_id) = extract_workflow_id(&result.stdout) {
let workflow_name = serde_json::from_str::<serde_json::Value>(&spec_content)
.ok()
.and_then(|spec| spec.get("name").and_then(|v| v.as_str()).map(String::from))
.unwrap_or_else(|| "workflow".to_string());
let sanitized_name: String = workflow_name
.chars()
.map(|c| {
if c.is_alphanumeric() || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect();
let final_path = format!("{}_{}{}", sanitized_name, workflow_id, file_extension);
match tokio::fs::rename(&temp_path, &final_path).await {
Ok(_) => {
info!("Saved workflow spec to: {}", final_path);
}
Err(e) => {
warn!(
"Failed to rename workflow spec from {} to {}: {}. Keeping original file.",
temp_path, final_path, e
);
}
}
} else {
let fallback_path = format!("workflow_{}{}", uuid::Uuid::new_v4(), file_extension);
if let Err(e) = tokio::fs::rename(&temp_path, &fallback_path).await {
warn!(
"Failed to preserve workflow spec as {}: {}. File remains at {}.",
fallback_path, e, temp_path
);
} else {
info!(
"Saved workflow spec to: {} (ID extraction failed but workflow was created)",
fallback_path
);
}
}
} else {
if let Err(e) = tokio::fs::remove_file(&temp_path).await {
warn!("Failed to clean up temp file {}: {}", temp_path, e);
}
}
result
};
Json(result)
}
async fn cli_validate_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<CreateRequest>,
) -> impl IntoResponse {
let result = if req.is_file {
run_torc_command(
&state.torc_bin,
&["-f", "json", "workflows", "create", &req.spec, "--dry-run"],
&state.api_url,
)
.await
} else {
let extension = req.file_extension.as_deref().unwrap_or(".json");
let unique_id = uuid::Uuid::new_v4();
let temp_path = format!("/tmp/torc_spec_{}{}", unique_id, extension);
if let Err(e) = tokio::fs::write(&temp_path, &req.spec).await {
return Json(CliResponse {
success: false,
stdout: String::new(),
stderr: format!("Failed to write temp file: {}", e),
exit_code: None,
});
}
let result = run_torc_command(
&state.torc_bin,
&["-f", "json", "workflows", "create", &temp_path, "--dry-run"],
&state.api_url,
)
.await;
let _ = tokio::fs::remove_file(&temp_path).await;
result
};
Json(result)
}
async fn cli_run_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<WorkflowIdRequest>,
) -> impl IntoResponse {
let result = run_torc_command(
&state.torc_bin,
&["workflows", "run", &req.workflow_id],
&state.api_url,
)
.await;
Json(result)
}
async fn cli_submit_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<WorkflowIdRequest>,
) -> impl IntoResponse {
let result = run_torc_command(
&state.torc_bin,
&["workflows", "submit", &req.workflow_id],
&state.api_url,
)
.await;
Json(result)
}
async fn cli_initialize_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<InitializeRequest>,
) -> impl IntoResponse {
let mut args = vec!["workflows", "initialize", &req.workflow_id];
if req.force {
args.push("--force");
}
let result = run_torc_command(&state.torc_bin, &args, &state.api_url).await;
Json(result)
}
async fn cli_check_initialize_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<WorkflowIdRequest>,
) -> impl IntoResponse {
let result = run_torc_command(
&state.torc_bin,
&[
"-f",
"json",
"workflows",
"initialize",
&req.workflow_id,
"--dry-run",
],
&state.api_url,
)
.await;
Json(result)
}
async fn cli_delete_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<WorkflowIdRequest>,
) -> impl IntoResponse {
let result = run_torc_command(
&state.torc_bin,
&["workflows", "delete", "--no-prompts", &req.workflow_id],
&state.api_url,
)
.await;
Json(result)
}
async fn cli_cancel_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<WorkflowIdRequest>,
) -> impl IntoResponse {
let result = run_torc_command(
&state.torc_bin,
&["workflows", "cancel", &req.workflow_id],
&state.api_url,
)
.await;
Json(result)
}
async fn cli_reinitialize_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<InitializeRequest>,
) -> impl IntoResponse {
let mut args = vec!["workflows", "reinitialize", &req.workflow_id];
if req.force {
args.push("--force");
}
let result = run_torc_command(&state.torc_bin, &args, &state.api_url).await;
Json(result)
}
async fn cli_reset_status_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<WorkflowIdRequest>,
) -> impl IntoResponse {
let result = run_torc_command(
&state.torc_bin,
&[
"workflows",
"reset-status",
"--no-prompts",
&req.workflow_id,
],
&state.api_url,
)
.await;
Json(result)
}
#[derive(Serialize)]
struct ExecutionPlanResponse {
success: bool,
data: Option<serde_json::Value>,
error: Option<String>,
}
async fn cli_execution_plan_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<WorkflowIdRequest>,
) -> impl IntoResponse {
let args = vec![
"-f",
"json",
"workflows",
"execution-plan",
&req.workflow_id,
];
info!("Running: {} {}", state.torc_bin, args.join(" "));
let output = Command::new(&state.torc_bin)
.args(&args)
.env("TORC_API_URL", &state.api_url)
.output()
.await;
match output {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if !output.status.success() {
return Json(ExecutionPlanResponse {
success: false,
data: None,
error: Some(format!("Command failed: {}", stderr)),
});
}
match serde_json::from_str::<serde_json::Value>(&stdout) {
Ok(data) => Json(ExecutionPlanResponse {
success: true,
data: Some(data),
error: None,
}),
Err(e) => Json(ExecutionPlanResponse {
success: false,
data: None,
error: Some(format!(
"Failed to parse JSON output: {}. Output: {}",
e, stdout
)),
}),
}
}
Err(e) => Json(ExecutionPlanResponse {
success: false,
data: None,
error: Some(format!("Failed to execute command: {}", e)),
}),
}
}
async fn cli_run_stream_handler(
State(state): State<Arc<AppState>>,
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
let workflow_id = match params.get("workflow_id") {
Some(id) => id.clone(),
None => {
return Err((StatusCode::BAD_REQUEST, "Missing workflow_id parameter"));
}
};
info!("Starting streaming run for workflow: {}", workflow_id);
let torc_bin = state.torc_bin.clone();
let api_url = state.api_url.clone();
let http_client = state.client.clone();
let stream = async_stream::stream! {
yield Ok::<_, std::convert::Infallible>(Event::default()
.event("start")
.data(format!("Running workflow {}", workflow_id)));
let mut child = match Command::new(&torc_bin)
.args(["workflows", "run", &workflow_id])
.env("TORC_API_URL", &api_url)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(child) => child,
Err(e) => {
yield Ok(Event::default()
.event("error")
.data(format!("Failed to start command: {}", e)));
yield Ok(Event::default()
.event("end")
.data("error"));
return;
}
};
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let (tx, mut rx) = tokio::sync::mpsc::channel::<(String, String)>(100);
if let Some(stdout) = stdout {
let tx_stdout = tx.clone();
tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let _ = tx_stdout.send(("stdout".to_string(), line)).await;
}
});
}
if let Some(stderr) = stderr {
let tx_stderr = tx.clone();
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let _ = tx_stderr.send(("stderr".to_string(), line)).await;
}
});
}
drop(tx);
let api_url_status = api_url.clone();
let workflow_id_status = workflow_id.clone();
let http_client_status = http_client.clone();
let (status_tx, mut status_rx) = tokio::sync::mpsc::channel::<String>(10);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(3));
loop {
interval.tick().await;
let url = format!("{}/jobs?workflow_id={}&limit={}", api_url_status, workflow_id_status, torc::MAX_RECORD_TRANSFER_COUNT);
match http_client_status.get(&url).send().await {
Ok(resp) if resp.status().is_success() => {
if let Ok(json) = resp.json::<serde_json::Value>().await {
let items = json.get("items").and_then(|v| v.as_array());
if let Some(jobs) = items {
let mut uninitialized = 0i64;
let mut blocked = 0i64;
let mut ready = 0i64;
let mut pending = 0i64;
let mut running = 0i64;
let mut completed = 0i64;
let mut failed = 0i64;
let mut canceled = 0i64;
let mut terminated = 0i64;
let mut disabled = 0i64;
for job in jobs {
let status = job.get("status").and_then(|v| v.as_i64()).unwrap_or(-1);
match status {
0 => uninitialized += 1,
1 => blocked += 1,
2 => ready += 1,
3 => pending += 1,
4 => running += 1,
5 => completed += 1,
6 => failed += 1,
7 => canceled += 1,
8 => terminated += 1,
9 => disabled += 1,
_ => {} }
}
let total = jobs.len() as i64;
let mut parts = Vec::new();
if completed > 0 { parts.push(format!("{} completed", completed)); }
if running > 0 { parts.push(format!("{} running", running)); }
if ready > 0 { parts.push(format!("{} ready", ready)); }
if pending > 0 { parts.push(format!("{} pending", pending)); }
if blocked > 0 { parts.push(format!("{} blocked", blocked)); }
if failed > 0 { parts.push(format!("{} failed", failed)); }
if uninitialized > 0 { parts.push(format!("{} uninitialized", uninitialized)); }
if canceled > 0 { parts.push(format!("{} canceled", canceled)); }
if terminated > 0 { parts.push(format!("{} terminated", terminated)); }
if disabled > 0 { parts.push(format!("{} disabled", disabled)); }
let status_msg = format!(
"Jobs: {} (total: {})",
if parts.is_empty() { "none".to_string() } else { parts.join(", ") },
total
);
if status_tx.send(status_msg).await.is_err() {
break; }
if running == 0 && ready == 0 && pending == 0 && blocked == 0 && uninitialized == 0 {
break;
}
}
}
}
_ => {} }
}
});
loop {
tokio::select! {
msg = rx.recv() => {
match msg {
Some((event_type, line)) => {
yield Ok(Event::default()
.event(&event_type)
.data(line));
}
None => {
break;
}
}
}
status = status_rx.recv() => {
if let Some(status_msg) = status {
yield Ok(Event::default()
.event("status")
.data(status_msg));
}
}
}
}
match child.wait().await {
Ok(status) => {
let exit_code = status.code().unwrap_or(-1);
yield Ok(Event::default()
.event("end")
.data(if status.success() { "success" } else { "failed" }));
yield Ok(Event::default()
.event("exit_code")
.data(exit_code.to_string()));
}
Err(e) => {
yield Ok(Event::default()
.event("error")
.data(format!("Failed to wait for command: {}", e)));
yield Ok(Event::default()
.event("end")
.data("error"));
}
}
};
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}
async fn cli_recover_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<RecoverRequest>,
) -> impl IntoResponse {
info!(
"Running recover for workflow: {} (dry_run={})",
req.workflow_id, req.dry_run
);
let torc_bin = state.torc_bin.clone();
let api_url = state.api_url.clone();
let mut args = vec![
"-f".to_string(),
"json".to_string(),
"recover".to_string(),
req.workflow_id.clone(),
"-o".to_string(),
req.output_dir.clone(),
"--memory-multiplier".to_string(),
req.memory_multiplier.to_string(),
"--runtime-multiplier".to_string(),
req.runtime_multiplier.to_string(),
];
if req.dry_run {
args.push("--dry-run".to_string());
}
if req.retry_unknown {
args.push("--retry-unknown".to_string());
}
let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
let output = Command::new(&torc_bin)
.args(&args_refs)
.env("TORC_API_URL", &api_url)
.output()
.await;
match output {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
let exit_code = output.status.code();
if output.status.success() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&stdout) {
Json(serde_json::json!({
"success": true,
"data": json,
"stderr": stderr,
"exit_code": exit_code
}))
} else {
Json(serde_json::json!({
"success": true,
"stdout": stdout,
"stderr": stderr,
"exit_code": exit_code
}))
}
} else {
let error_msg = if let Ok(json) = serde_json::from_str::<serde_json::Value>(&stdout)
{
json.get("error")
.and_then(|e| e.as_str())
.unwrap_or(&stderr)
.to_string()
} else {
stderr.clone()
};
Json(serde_json::json!({
"success": false,
"error": error_msg,
"stdout": stdout,
"stderr": stderr,
"exit_code": exit_code
}))
}
}
Err(e) => Json(serde_json::json!({
"success": false,
"error": format!("Failed to execute recover command: {}", e)
})),
}
}
async fn cli_sync_status_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<SyncStatusRequest>,
) -> impl IntoResponse {
info!(
"Running sync-status for workflow: {} (dry_run={})",
req.workflow_id, req.dry_run
);
let mut args = vec![
"-f".to_string(),
"json".to_string(),
"workflows".to_string(),
"sync-status".to_string(),
req.workflow_id.clone(),
];
if req.dry_run {
args.push("--dry-run".to_string());
}
let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
let output = Command::new(&state.torc_bin)
.args(&args_refs)
.env("TORC_API_URL", &state.api_url)
.output()
.await;
match output {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
let exit_code = output.status.code();
if output.status.success() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&stdout) {
Json(serde_json::json!({
"success": true,
"data": json,
"stderr": stderr,
"exit_code": exit_code
}))
} else {
Json(serde_json::json!({
"success": true,
"stdout": stdout,
"stderr": stderr,
"exit_code": exit_code
}))
}
} else {
let error_msg = if let Ok(json) = serde_json::from_str::<serde_json::Value>(&stdout)
{
json.get("error")
.and_then(|e| e.as_str())
.unwrap_or(&stderr)
.to_string()
} else {
stderr.clone()
};
Json(serde_json::json!({
"success": false,
"error": error_msg,
"stdout": stdout,
"stderr": stderr,
"exit_code": exit_code
}))
}
}
Err(e) => Json(serde_json::json!({
"success": false,
"error": format!("Failed to execute sync-status command: {}", e)
})),
}
}
async fn cli_read_file_handler(Json(req): Json<ReadFileRequest>) -> impl IntoResponse {
let path = FsPath::new(&req.path);
let exists = path.exists();
if !exists {
return Json(ReadFileResponse {
success: true,
content: None,
error: None,
is_json: false,
exists: false,
});
}
if !path.is_file() {
return Json(ReadFileResponse {
success: false,
content: None,
error: Some("Path is not a file".to_string()),
is_json: false,
exists: true,
});
}
match tokio::fs::read_to_string(&req.path).await {
Ok(content) => {
let is_json = req.path.to_lowercase().ends_with(".json")
|| req.path.to_lowercase().ends_with(".json5");
let content = if is_json {
match serde_json::from_str::<serde_json::Value>(&content) {
Ok(value) => serde_json::to_string_pretty(&value).unwrap_or(content),
Err(_) => content, }
} else {
content
};
Json(ReadFileResponse {
success: true,
content: Some(content),
error: None,
is_json,
exists: true,
})
}
Err(e) => Json(ReadFileResponse {
success: false,
content: None,
error: Some(format!("Failed to read file: {}", e)),
is_json: false,
exists: true,
}),
}
}
async fn cli_export_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<ExportRequest>,
) -> impl IntoResponse {
let default_output = format!("workflow_{}.json", req.workflow_id);
let output_raw = req.output.as_deref().unwrap_or(&default_output);
let output_path = FsPath::new(output_raw);
let output_abs = if output_path.is_relative() {
std::env::current_dir()
.map(|cwd| cwd.join(output_path).to_string_lossy().to_string())
.unwrap_or_else(|_| output_raw.to_string())
} else {
output_raw.to_string()
};
let mut args = vec![
"-f",
"json",
"workflows",
"export",
&req.workflow_id,
"-o",
&output_abs,
];
if req.include_results {
args.push("--include-results");
}
if req.include_events {
args.push("--include-events");
}
let result = run_torc_command(&state.torc_bin, &args, &state.api_url).await;
Json(result)
}
async fn cli_import_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<ImportRequest>,
) -> impl IntoResponse {
let temp_path_owned;
let import_path = if let Some(ref path) = req.file_path {
path.as_str()
} else if let Some(ref content) = req.content {
let unique_id = uuid::Uuid::new_v4();
temp_path_owned = format!("/tmp/torc_import_{}.json", unique_id);
if let Err(e) = tokio::fs::write(&temp_path_owned, content).await {
return Json(CliResponse {
success: false,
stdout: String::new(),
stderr: format!("Failed to write temp file: {}", e),
exit_code: None,
});
}
temp_path_owned.as_str()
} else {
return Json(CliResponse {
success: false,
stdout: String::new(),
stderr: "Either file_path or content must be provided".to_string(),
exit_code: None,
});
};
let name_str;
let mut args = vec!["-f", "json", "workflows", "import", import_path];
if let Some(ref name) = req.name {
name_str = name.clone();
args.push("--name");
args.push(&name_str);
}
if req.skip_results {
args.push("--skip-results");
}
if req.skip_events {
args.push("--skip-events");
}
let result = run_torc_command(&state.torc_bin, &args, &state.api_url).await;
if req.file_path.is_none() && req.content.is_some() {
let _ = tokio::fs::remove_file(import_path).await;
}
Json(result)
}
#[derive(Deserialize)]
struct PlotResourcesRequest {
db_paths: Vec<String>,
#[serde(default)]
output_dir: Option<String>,
#[serde(default = "default_prefix")]
prefix: String,
}
fn default_prefix() -> String {
"resource_plot".to_string()
}
#[derive(Serialize)]
struct PlotResourcesResponse {
success: bool,
plots: Vec<PlotData>,
error: Option<String>,
}
#[derive(Serialize)]
struct PlotData {
name: String,
data: serde_json::Value,
}
#[derive(Deserialize)]
struct ListResourceDbsRequest {
base_dir: String,
}
#[derive(Serialize)]
struct ListResourceDbsResponse {
success: bool,
databases: Vec<ResourceDbInfo>,
error: Option<String>,
}
#[derive(Serialize)]
struct ResourceDbInfo {
path: String,
name: String,
size_bytes: u64,
modified: String,
}
async fn cli_plot_resources_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<PlotResourcesRequest>,
) -> impl IntoResponse {
if req.db_paths.is_empty() {
return Json(PlotResourcesResponse {
success: false,
plots: vec![],
error: Some("No database paths provided".to_string()),
});
}
let temp_dir = std::env::temp_dir().join(format!("torc_plots_{}", std::process::id()));
let has_custom_output_dir = req.output_dir.is_some();
let output_dir = req
.output_dir
.map(std::path::PathBuf::from)
.unwrap_or(temp_dir.clone());
if let Err(e) = tokio::fs::create_dir_all(&output_dir).await {
return Json(PlotResourcesResponse {
success: false,
plots: vec![],
error: Some(format!("Failed to create output directory: {}", e)),
});
}
let mut args: Vec<&str> = vec!["plot-resources"];
let db_paths: Vec<&str> = req.db_paths.iter().map(|s| s.as_str()).collect();
args.extend(db_paths.iter());
let output_dir_str = output_dir.to_string_lossy().to_string();
args.push("--output-dir");
args.push(&output_dir_str);
args.push("--prefix");
args.push(&req.prefix);
args.push("--format");
args.push("json");
info!("Running: {} {}", state.torc_bin, args.join(" "));
let output = Command::new(&state.torc_bin).args(&args).output().await;
match output {
Ok(output) => {
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Json(PlotResourcesResponse {
success: false,
plots: vec![],
error: Some(format!("Command failed: {}", stderr)),
});
}
let mut plots = Vec::new();
match tokio::fs::read_dir(&output_dir).await {
Ok(mut entries) => {
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
if let Some(name) = path.file_name().and_then(|n| n.to_str())
&& name.starts_with(&req.prefix)
&& name.ends_with(".json")
{
match tokio::fs::read_to_string(&path).await {
Ok(content) => {
match serde_json::from_str::<serde_json::Value>(&content) {
Ok(data) => {
plots.push(PlotData {
name: name.to_string(),
data,
});
}
Err(e) => {
warn!("Failed to parse plot JSON {}: {}", name, e);
}
}
}
Err(e) => {
warn!("Failed to read plot file {}: {}", name, e);
}
}
}
}
}
Err(e) => {
return Json(PlotResourcesResponse {
success: false,
plots: vec![],
error: Some(format!("Failed to read output directory: {}", e)),
});
}
}
if !has_custom_output_dir {
let _ = tokio::fs::remove_dir_all(&temp_dir).await;
}
plots.sort_by(|a, b| a.name.cmp(&b.name));
Json(PlotResourcesResponse {
success: true,
plots,
error: None,
})
}
Err(e) => Json(PlotResourcesResponse {
success: false,
plots: vec![],
error: Some(format!("Failed to execute command: {}", e)),
}),
}
}
async fn cli_list_resource_dbs_handler(
Json(req): Json<ListResourceDbsRequest>,
) -> impl IntoResponse {
let base_path = FsPath::new(&req.base_dir);
if !base_path.exists() {
return Json(ListResourceDbsResponse {
success: true,
databases: vec![],
error: None,
});
}
if !base_path.is_dir() {
return Json(ListResourceDbsResponse {
success: false,
databases: vec![],
error: Some("Path is not a directory".to_string()),
});
}
let mut databases = Vec::new();
match tokio::fs::read_dir(base_path).await {
Ok(mut entries) => {
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
if let Some(ext) = path.extension()
&& ext == "db"
&& let Ok(metadata) = tokio::fs::metadata(&path).await
{
let modified = metadata
.modified()
.ok()
.and_then(|t| {
t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
chrono::DateTime::from_timestamp(d.as_secs() as i64, 0)
.map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
.unwrap_or_default()
})
})
.unwrap_or_default();
databases.push(ResourceDbInfo {
path: path.to_string_lossy().to_string(),
name: path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default(),
size_bytes: metadata.len(),
modified,
});
}
}
}
Err(e) => {
return Json(ListResourceDbsResponse {
success: false,
databases: vec![],
error: Some(format!("Failed to read directory: {}", e)),
});
}
}
databases.sort_by(|a, b| b.modified.cmp(&a.modified));
Json(ListResourceDbsResponse {
success: true,
databases,
error: None,
})
}
#[derive(Deserialize)]
struct SlurmParseLogsRequest {
workflow_id: i64,
#[serde(default = "default_output_dir")]
output_dir: String,
#[serde(default)]
errors_only: bool,
}
#[derive(Serialize)]
struct SlurmParseLogsResponse {
success: bool,
data: Option<serde_json::Value>,
error: Option<String>,
}
#[derive(Deserialize)]
struct SlurmSacctRequest {
workflow_id: i64,
#[serde(default = "default_output_dir")]
output_dir: String,
}
#[derive(Serialize)]
struct SlurmSacctResponse {
success: bool,
data: Option<serde_json::Value>,
error: Option<String>,
}
async fn cli_slurm_parse_logs_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<SlurmParseLogsRequest>,
) -> impl IntoResponse {
let workflow_id_str = req.workflow_id.to_string();
let mut args = vec![
"-f",
"json",
"slurm",
"parse-logs",
&req.output_dir,
"--workflow-id",
&workflow_id_str,
];
if req.errors_only {
args.push("--errors-only");
}
info!("Running: {} {}", state.torc_bin, args.join(" "));
let output = Command::new(&state.torc_bin)
.args(&args)
.env("TORC_API_URL", &state.api_url)
.output()
.await;
match output {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if !output.status.success() {
return Json(SlurmParseLogsResponse {
success: false,
data: None,
error: Some(format!("Command failed: {}", stderr)),
});
}
match serde_json::from_str::<serde_json::Value>(&stdout) {
Ok(data) => Json(SlurmParseLogsResponse {
success: true,
data: Some(data),
error: None,
}),
Err(e) => Json(SlurmParseLogsResponse {
success: false,
data: None,
error: Some(format!(
"Failed to parse JSON output: {}. Output: {}",
e, stdout
)),
}),
}
}
Err(e) => Json(SlurmParseLogsResponse {
success: false,
data: None,
error: Some(format!("Failed to execute command: {}", e)),
}),
}
}
async fn cli_slurm_sacct_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<SlurmSacctRequest>,
) -> impl IntoResponse {
let workflow_id_str = req.workflow_id.to_string();
let args = vec![
"-f",
"json",
"slurm",
"sacct",
&workflow_id_str,
"--output-dir",
&req.output_dir,
];
info!("Running: {} {}", state.torc_bin, args.join(" "));
let output = Command::new(&state.torc_bin)
.args(&args)
.env("TORC_API_URL", &state.api_url)
.output()
.await;
match output {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if !output.status.success() {
return Json(SlurmSacctResponse {
success: false,
data: None,
error: Some(format!("Command failed: {}", stderr)),
});
}
match serde_json::from_str::<serde_json::Value>(&stdout) {
Ok(data) => Json(SlurmSacctResponse {
success: true,
data: Some(data),
error: None,
}),
Err(e) => Json(SlurmSacctResponse {
success: false,
data: None,
error: Some(format!(
"Failed to parse JSON output: {}. Output: {}",
e, stdout
)),
}),
}
}
Err(e) => Json(SlurmSacctResponse {
success: false,
data: None,
error: Some(format!("Failed to execute command: {}", e)),
}),
}
}
#[derive(Serialize)]
struct HpcProfileInfo {
name: String,
display_name: String,
description: String,
is_detected: bool,
}
#[derive(Serialize)]
struct HpcProfilesResponse {
success: bool,
profiles: Vec<HpcProfileInfo>,
detected_profile: Option<String>,
error: Option<String>,
}
async fn cli_hpc_profiles_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let args = vec!["-f", "json", "hpc", "list"];
info!("Running: {} {}", state.torc_bin, args.join(" "));
let output = Command::new(&state.torc_bin)
.args(&args)
.env("TORC_API_URL", &state.api_url)
.output()
.await;
match output {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if !output.status.success() {
return Json(HpcProfilesResponse {
success: false,
profiles: vec![],
detected_profile: None,
error: Some(format!("Command failed: {}", stderr)),
});
}
match serde_json::from_str::<Vec<serde_json::Value>>(&stdout) {
Ok(items) => {
let mut profiles = Vec::new();
let mut detected_profile = None;
for item in items {
let name = item
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let display_name = item
.get("display_name")
.and_then(|v| v.as_str())
.unwrap_or(&name)
.to_string();
let description = item
.get("description")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let is_detected = item
.get("detected")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if is_detected {
detected_profile = Some(name.clone());
}
profiles.push(HpcProfileInfo {
name,
display_name,
description,
is_detected,
});
}
Json(HpcProfilesResponse {
success: true,
profiles,
detected_profile,
error: None,
})
}
Err(e) => Json(HpcProfilesResponse {
success: false,
profiles: vec![],
detected_profile: None,
error: Some(format!(
"Failed to parse JSON output: {}. Output: {}",
e, stdout
)),
}),
}
}
Err(e) => Json(HpcProfilesResponse {
success: false,
profiles: vec![],
detected_profile: None,
error: Some(format!("Failed to execute command: {}", e)),
}),
}
}
#[derive(Deserialize)]
struct ServerStartRequest {
#[serde(default = "default_server_port")]
port: u16,
#[serde(default)]
database: Option<String>,
#[serde(default = "default_completion_interval")]
completion_check_interval_secs: u32,
#[serde(default = "default_log_level")]
log_level: String,
}
fn default_server_port() -> u16 {
8080
}
fn default_completion_interval() -> u32 {
5
}
fn default_log_level() -> String {
"info".to_string()
}
#[derive(Serialize)]
struct ServerStartResponse {
success: bool,
message: String,
pid: Option<u32>,
port: Option<u16>,
}
#[derive(Serialize)]
struct ServerStopResponse {
success: bool,
message: String,
}
#[derive(Serialize)]
struct ServerStatusResponse {
running: bool,
managed: bool,
pid: Option<u32>,
port: Option<u16>,
output_lines: Vec<String>,
}
async fn server_start_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<ServerStartRequest>,
) -> impl IntoResponse {
let mut managed = state.managed_server.lock().await;
if managed.pid.is_some() {
return Json(ServerStartResponse {
success: false,
message: "Server is already running".to_string(),
pid: managed.pid,
port: managed.port,
});
}
let mut args = vec![
"run".to_string(),
"--port".to_string(),
req.port.to_string(),
"--log-level".to_string(),
req.log_level.clone(),
"--completion-check-interval-secs".to_string(),
req.completion_check_interval_secs.to_string(),
];
if let Some(ref db) = req.database
&& !db.is_empty()
{
args.push("--database".to_string());
args.push(db.clone());
}
info!(
"Starting torc-server: {} {}",
state.torc_server_bin,
args.join(" ")
);
let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
match Command::new(&state.torc_server_bin)
.args(&args_refs)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(mut child) => {
let pid = child.id();
let mut actual_port = req.port;
if let Some(stdout) = child.stdout.take() {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
let timeout = tokio::time::Duration::from_secs(10);
let start = std::time::Instant::now();
loop {
if start.elapsed() > timeout {
warn!("Timeout waiting for server to report port, using requested port");
break;
}
match tokio::time::timeout(
tokio::time::Duration::from_millis(100),
reader.read_line(&mut line),
)
.await
{
Ok(Ok(0)) => break, Ok(Ok(_)) => {
if let Some(port_str) = line.strip_prefix("TORC_SERVER_PORT=")
&& let Ok(port) = port_str.trim().parse::<u16>()
{
actual_port = port;
info!("Server reported actual port: {}", actual_port);
break;
}
line.clear();
}
Ok(Err(e)) => {
warn!("Error reading server output: {}", e);
break;
}
Err(_) => {
continue;
}
}
}
}
managed.pid = pid;
managed.port = Some(actual_port);
managed.output_lines.clear();
managed.output_lines.push(format!(
"Server started with PID {} on port {}",
pid.unwrap_or(0),
actual_port
));
Json(ServerStartResponse {
success: true,
message: format!("Server started on port {}", actual_port),
pid,
port: Some(actual_port),
})
}
Err(e) => Json(ServerStartResponse {
success: false,
message: format!("Failed to start server: {}", e),
pid: None,
port: None,
}),
}
}
async fn server_stop_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let mut managed = state.managed_server.lock().await;
if let Some(pid) = managed.pid {
#[cfg(unix)]
{
let result = StdCommand::new("kill").arg(pid.to_string()).status();
match result {
Ok(status) if status.success() => {
managed.pid = None;
managed.port = None;
managed.output_lines.push("Server stopped".to_string());
Json(ServerStopResponse {
success: true,
message: "Server stopped".to_string(),
})
}
Ok(_) => {
let _ = StdCommand::new("kill")
.args(["-9", &pid.to_string()])
.status();
managed.pid = None;
managed.port = None;
Json(ServerStopResponse {
success: true,
message: "Server force stopped".to_string(),
})
}
Err(e) => Json(ServerStopResponse {
success: false,
message: format!("Failed to stop server: {}", e),
}),
}
}
#[cfg(not(unix))]
{
let result = StdCommand::new("taskkill")
.args(["/PID", &pid.to_string(), "/F"])
.status();
match result {
Ok(status) if status.success() => {
managed.pid = None;
managed.port = None;
Json(ServerStopResponse {
success: true,
message: "Server stopped".to_string(),
})
}
_ => Json(ServerStopResponse {
success: false,
message: "Failed to stop server".to_string(),
}),
}
}
} else {
Json(ServerStopResponse {
success: false,
message: "No managed server is running".to_string(),
})
}
}
async fn server_status_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let managed = state.managed_server.lock().await;
let mut running = false;
if let Some(pid) = managed.pid {
#[cfg(unix)]
{
if let Ok(status) = StdCommand::new("kill")
.args(["-0", &pid.to_string()])
.status()
{
running = status.success();
}
}
#[cfg(not(unix))]
{
if let Ok(output) = StdCommand::new("tasklist")
.args(["/FI", &format!("PID eq {}", pid)])
.output()
{
let output_str = String::from_utf8_lossy(&output.stdout);
running = output_str.contains(&pid.to_string());
}
}
}
Json(ServerStatusResponse {
running,
managed: managed.pid.is_some(),
pid: if running { managed.pid } else { None },
port: if running { managed.port } else { None },
output_lines: managed.output_lines.clone(),
})
}
#[derive(Serialize)]
struct VersionResponse {
version: String,
api_version: String,
server_version: Option<String>,
server_api_version: Option<String>,
version_mismatch: Option<String>,
mismatch_severity: Option<String>,
}
async fn version_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
use torc::client::version_check;
let api_url = state.api_url.clone();
let result = tokio::task::spawn_blocking(move || {
let config = torc::client::apis::configuration::Configuration {
base_path: api_url,
..Default::default()
};
version_check::check_version(&config)
})
.await
.ok();
let (server_version, server_api_version, version_mismatch, mismatch_severity) = match result {
Some(result) => match &result.server_version {
Some(server_ver) => {
let severity_str = match result.severity {
version_check::VersionMismatchSeverity::None => None,
version_check::VersionMismatchSeverity::Patch => Some("patch".to_string()),
version_check::VersionMismatchSeverity::Minor => Some("minor".to_string()),
version_check::VersionMismatchSeverity::Major => Some("major".to_string()),
};
let mismatch_msg = if result.severity.has_warning() {
Some(result.message.clone())
} else {
None
};
(
Some(server_ver.clone()),
result.server_api_version.clone(),
mismatch_msg,
severity_str,
)
}
None => (None, None, None, None),
},
None => (None, None, None, None),
};
let server_version_display = server_version
.as_ref()
.map(|v| v.split(" (").next().unwrap_or(v).to_string());
Json(VersionResponse {
version: env!("CARGO_PKG_VERSION").to_string(),
api_version: version_check::CLIENT_API_VERSION.to_string(),
server_version: server_version_display,
server_api_version,
version_mismatch,
mismatch_severity,
})
}
#[derive(Serialize)]
struct UserResponse {
user: String,
}
async fn user_handler() -> impl IntoResponse {
let user = std::env::var("USER")
.or_else(|_| std::env::var("USERNAME"))
.unwrap_or_else(|_| "unknown".to_string());
Json(UserResponse { user })
}
#[derive(Deserialize)]
struct ChatRequest {
messages: Vec<ChatMessage>,
#[serde(default)]
workflow_id: Option<i64>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
struct ChatMessage {
role: String,
content: ChatContent,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(untagged)]
enum ChatContent {
Text(String),
Blocks(Vec<serde_json::Value>),
}
#[derive(Serialize)]
struct ChatStatusResponse {
available: bool,
#[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
}
async fn chat_status_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let available = state.anthropic_api_key.read().await.is_some();
let reason = if !available {
Some(
"No API key configured. Set ANTHROPIC_API_KEY or \
ANTHROPIC_FOUNDRY_API_KEY + ANTHROPIC_FOUNDRY_RESOURCE"
.to_string(),
)
} else {
None
};
Json(ChatStatusResponse { available, reason })
}
#[derive(Deserialize)]
struct ConfigureChatRequest {
api_key: String,
#[serde(default = "default_provider")]
provider: String,
#[serde(default)]
foundry_resource: Option<String>,
#[serde(default)]
base_url: Option<String>,
#[serde(default)]
auth_header: Option<String>,
}
fn default_provider() -> String {
"anthropic".to_string()
}
async fn configure_chat_handler(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(req): Json<ConfigureChatRequest>,
) -> impl IntoResponse {
validate_same_origin(&headers)
.map_err(|status| (status, "Cross-origin requests are not allowed"))?;
let key = req.api_key.trim().to_string();
if key.is_empty() {
return Err((StatusCode::BAD_REQUEST, "API key must not be empty"));
}
let (base_url, auth_header) = match req.provider.as_str() {
"foundry" => {
let resource = req
.foundry_resource
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
.ok_or((StatusCode::BAD_REQUEST, "Foundry resource name is required"))?;
info!(
"AI Chat: configured via dashboard UI as Azure AI Foundry (resource={})",
resource
);
(
format!("https://{}.services.ai.azure.com/anthropic/v1", resource),
"x-api-key".to_string(),
)
}
"custom" => {
let url = req
.base_url
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
.ok_or((StatusCode::BAD_REQUEST, "Base URL is required"))?
.to_string();
let header = req
.auth_header
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
.unwrap_or("x-api-key")
.to_string();
info!(
"AI Chat: configured via dashboard UI as custom endpoint (url={})",
url
);
(url, header)
}
_ => {
info!("AI Chat: configured via dashboard UI as direct Anthropic API");
(
"https://api.anthropic.com/v1".to_string(),
"x-api-key".to_string(),
)
}
};
*state.anthropic_api_key.write().await = Some(key);
*state.anthropic_base_url.write().await = base_url;
*state.anthropic_auth_header.write().await = auth_header;
Ok(Json(ChatStatusResponse {
available: true,
reason: None,
}))
}
async fn ensure_mcp_client(
state: &AppState,
) -> Result<(
rmcp::service::Peer<rmcp::service::RoleClient>,
Vec<rmcp::model::Tool>,
)> {
let mut guard = state.mcp_client.lock().await;
if let Some(ref client) = *guard {
return Ok((client.peer.clone(), client.tools.clone()));
}
info!(
"Spawning torc-mcp-server: {} --api-url {}",
state.torc_mcp_server_bin, state.api_url
);
let mut command = tokio::process::Command::new(&state.torc_mcp_server_bin);
command
.arg("--api-url")
.arg(&state.api_url)
.stderr(std::process::Stdio::inherit());
let child_process = TokioChildProcess::new(command)
.map_err(|e| anyhow::anyhow!("Failed to spawn torc-mcp-server: {}", e))?;
let running_service: rmcp::service::RunningService<rmcp::service::RoleClient, _> = ()
.serve(child_process)
.await
.map_err(|e| anyhow::anyhow!("MCP handshake failed: {}", e))?;
let peer = running_service.peer().clone();
let tools = peer
.list_all_tools()
.await
.map_err(|e| anyhow::anyhow!("Failed to list MCP tools: {}", e))?;
info!("MCP client connected, discovered {} tools", tools.len());
*guard = Some(McpClient {
peer: peer.clone(),
tools: tools.clone(),
});
Ok((peer, tools))
}
fn mcp_tools_to_claude_tools(tools: &[rmcp::model::Tool]) -> Vec<serde_json::Value> {
tools
.iter()
.map(|tool| {
serde_json::json!({
"name": tool.name.as_ref(),
"description": tool.description.as_ref(),
"input_schema": tool.schema_as_json_value(),
})
})
.collect()
}
fn chat_system_prompt(workflow_id: Option<i64>) -> String {
let mut prompt = String::from(
"You are an AI assistant for the Torc workflow orchestration system. \
You help users manage, monitor, debug, and recover computational workflows. \
You have access to tools that let you interact with the Torc server.\n\n\
When a user asks about workflows or jobs, use the available tools to get real data \
rather than speculating. Be concise and helpful.\n\n\
When showing job or workflow data, format it clearly. \
If a tool returns an error, explain what went wrong and suggest alternatives.",
);
if let Some(wf_id) = workflow_id {
prompt.push_str(&format!(
"\n\nThe user is currently viewing workflow {}. \
Use this workflow_id by default when calling tools, unless they specify a different one.",
wf_id
));
}
prompt
}
fn extract_tool_result_text(result: &rmcp::model::CallToolResult) -> String {
result
.content
.iter()
.filter_map(|c| c.as_text().map(|t| t.text.clone()))
.collect::<Vec<_>>()
.join("\n")
}
fn validate_same_origin(headers: &HeaderMap) -> Result<(), StatusCode> {
let origin = match headers.get(header::ORIGIN).and_then(|v| v.to_str().ok()) {
Some(origin) => origin,
None => return Ok(()),
};
let host = headers
.get(header::HOST)
.and_then(|v| v.to_str().ok())
.ok_or(StatusCode::FORBIDDEN)?;
let expected_http = format!("http://{}", host);
let expected_https = format!("https://{}", host);
if origin == expected_http || origin == expected_https {
Ok(())
} else {
Err(StatusCode::FORBIDDEN)
}
}
async fn chat_handler(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(req): Json<ChatRequest>,
) -> impl IntoResponse {
validate_same_origin(&headers)
.map_err(|status| (status, "Cross-origin requests are not allowed"))?;
let api_key = match state.anthropic_api_key.read().await.clone() {
Some(key) => key,
None => {
return Err((StatusCode::SERVICE_UNAVAILABLE, "No API key configured"));
}
};
let model = state.anthropic_model.clone();
let messages_url = format!("{}/messages", state.anthropic_base_url.read().await);
let auth_header = state.anthropic_auth_header.read().await.clone();
let workflow_id = req.workflow_id;
let initial_messages = req.messages;
let (peer, tools) = match ensure_mcp_client(&state).await {
Ok(result) => result,
Err(e) => {
*state.mcp_client.lock().await = None;
error!("MCP client error: {}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to connect to MCP server",
));
}
};
let claude_tools = mcp_tools_to_claude_tools(&tools);
let system_prompt = chat_system_prompt(workflow_id);
let http_client = state.client.clone();
let stream = async_stream::stream! {
let mut messages: Vec<serde_json::Value> = initial_messages
.iter()
.map(|m| {
serde_json::json!({
"role": m.role,
"content": m.content,
})
})
.collect();
let max_tool_rounds = 20;
let mut round = 0;
loop {
if round >= max_tool_rounds {
yield Ok::<_, std::convert::Infallible>(Event::default()
.event("error")
.data("Maximum tool call rounds reached"));
break;
}
round += 1;
let api_body = serde_json::json!({
"model": model,
"max_tokens": 8192,
"system": system_prompt,
"messages": messages,
"tools": claude_tools,
});
let response = match http_client
.post(&messages_url)
.header(&auth_header, &api_key)
.header("anthropic-version", "2023-06-01")
.header("content-type", "application/json")
.json(&api_body)
.send()
.await
{
Ok(resp) => resp,
Err(e) => {
yield Ok(Event::default()
.event("error")
.data(format!("API request failed: {}", e)));
break;
}
};
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
yield Ok(Event::default()
.event("error")
.data(format!("Claude API error ({}): {}", status, body)));
break;
}
let resp_json: serde_json::Value = match response.json().await {
Ok(json) => json,
Err(e) => {
yield Ok(Event::default()
.event("error")
.data(format!("Failed to parse API response: {}", e)));
break;
}
};
let stop_reason = resp_json["stop_reason"].as_str().unwrap_or("end_turn");
let content_blocks = resp_json["content"].as_array().cloned().unwrap_or_default();
let mut text_parts = Vec::new();
let mut tool_uses = Vec::new();
for block in &content_blocks {
match block["type"].as_str() {
Some("text") => {
if let Some(text) = block["text"].as_str() {
let text_owned = text.to_string();
let json_text = serde_json::to_string(&text_owned)
.unwrap_or_else(|_| format!("\"{}\"", text_owned));
yield Ok(Event::default()
.event("text")
.data(json_text));
text_parts.push(text_owned);
}
}
Some("tool_use") => {
tool_uses.push(block.clone());
yield Ok(Event::default()
.event("tool_use")
.data(serde_json::json!({
"id": block["id"],
"name": block["name"],
"input": block["input"],
}).to_string()));
}
_ => {}
}
}
if stop_reason != "tool_use" || tool_uses.is_empty() {
yield Ok(Event::default()
.event("done")
.data(""));
break;
}
let mut tool_results = Vec::new();
for tool_use in &tool_uses {
let tool_name = tool_use["name"].as_str().unwrap_or("");
let tool_id = tool_use["id"].as_str().unwrap_or("");
let tool_input = tool_use["input"].as_object();
let arguments = tool_input.cloned();
let request = arguments.map_or_else(
|| CallToolRequestParams::new(tool_name.to_string()),
|arguments| {
CallToolRequestParams::new(tool_name.to_string()).with_arguments(arguments)
},
);
match peer
.call_tool(request)
.await
{
Ok(result) => {
let result_text = extract_tool_result_text(&result);
let is_error = result.is_error.unwrap_or(false);
yield Ok(Event::default()
.event("tool_result")
.data(serde_json::json!({
"id": tool_id,
"name": tool_name,
"result": result_text,
"is_error": is_error,
}).to_string()));
tool_results.push(serde_json::json!({
"type": "tool_result",
"tool_use_id": tool_id,
"content": result_text,
"is_error": is_error,
}));
}
Err(e) => {
let error_msg = format!("Tool call failed: {}", e);
yield Ok(Event::default()
.event("tool_result")
.data(serde_json::json!({
"id": tool_id,
"name": tool_name,
"result": error_msg,
"is_error": true,
}).to_string()));
tool_results.push(serde_json::json!({
"type": "tool_result",
"tool_use_id": tool_id,
"content": error_msg,
"is_error": true,
}));
}
}
}
messages.push(serde_json::json!({
"role": "assistant",
"content": content_blocks,
}));
messages.push(serde_json::json!({
"role": "user",
"content": tool_results,
}));
}
};
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}
async fn run_torc_command(torc_bin: &str, args: &[&str], api_url: &str) -> CliResponse {
info!("Running: {} {}", torc_bin, args.join(" "));
let output = Command::new(torc_bin)
.args(args)
.env("TORC_API_URL", api_url)
.output()
.await;
match output {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
let success = output.status.success();
if !success {
warn!("Command failed: {} {}", torc_bin, args.join(" "));
warn!("stderr: {}", stderr);
}
CliResponse {
success,
stdout,
stderr,
exit_code: output.status.code(),
}
}
Err(e) => {
error!("Failed to execute command: {}", e);
CliResponse {
success: false,
stdout: String::new(),
stderr: format!("Failed to execute command: {}", e),
exit_code: None,
}
}
}
}