#![allow(missing_docs)]
use anyhow::Result;
use clap::{Args, Parser, builder::styling};
use dotenvy::dotenv;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use std::env;
use std::path::PathBuf;
use std::str::FromStr;
use torc::config::TorcConfig;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tracing_timing::{Builder, Histogram};
use torc::server::http_server;
use torc::server::logging;
use torc::server::service;
#[derive(Args, Clone, Default)]
struct ServerConfig {
#[arg(short, long, default_value = "info", env = "RUST_LOG")]
log_level: String,
#[arg(long)]
https: bool,
#[arg(long, env = "TORC_TLS_CERT")]
tls_cert: Option<String>,
#[arg(long, env = "TORC_TLS_KEY")]
tls_key: Option<String>,
#[arg(
short = 'H',
long,
visible_alias = "url",
visible_short_alias = 'u',
default_value = "0.0.0.0"
)]
host: String,
#[arg(short, long, default_value_t = 8080)]
port: u16,
#[arg(short, long, default_value_t = 1)]
threads: u32,
#[arg(short, long)]
database: Option<String>,
#[arg(long, env = "TORC_AUTH_FILE")]
auth_file: Option<String>,
#[arg(long, default_value_t = false)]
require_auth: bool,
#[arg(long, default_value_t = 60, env = "TORC_CREDENTIAL_CACHE_TTL_SECS")]
credential_cache_ttl_secs: u64,
#[arg(long, default_value_t = false)]
enforce_access_control: bool,
#[arg(long, env = "TORC_LOG_DIR")]
log_dir: Option<PathBuf>,
#[arg(long, default_value_t = false)]
json_logs: bool,
#[arg(long, default_value_t = false)]
daemon: bool,
#[arg(long, default_value = "/var/run/torc-server.pid")]
pid_file: PathBuf,
#[arg(short, long, env = "TORC_COMPLETION_CHECK_INTERVAL_SECS")]
completion_check_interval_secs: Option<f64>,
#[arg(long = "admin-user", env = "TORC_ADMIN_USERS")]
admin_users: Vec<String>,
#[arg(long, default_value_t = false, hide = true)]
shutdown_on_stdin_eof: bool,
}
const STYLES: styling::Styles = styling::Styles::styled()
.header(styling::AnsiColor::Green.on_default().bold())
.usage(styling::AnsiColor::Green.on_default().bold())
.literal(styling::AnsiColor::Cyan.on_default().bold())
.placeholder(styling::AnsiColor::Cyan.on_default());
#[derive(Parser)]
#[command(name = "torc-server")]
#[command(about = "Torc workflow orchestration server")]
#[command(styles = STYLES)]
#[command(
after_help = "Use 'torc-server run --help' to see server configuration options.\n\
Use 'torc-server service --help' to see service management options."
)]
struct Cli {
#[command(subcommand)]
command: Option<Commands>,
}
#[derive(clap::Subcommand)]
enum Commands {
Run {
#[command(flatten)]
config: ServerConfig,
},
Service {
#[command(subcommand)]
action: ServiceAction,
},
}
#[derive(clap::Subcommand)]
#[allow(clippy::large_enum_variant)]
enum ServiceAction {
Install {
#[arg(long)]
user: bool,
#[command(flatten)]
config: ServerConfig,
},
Uninstall {
#[arg(long)]
user: bool,
},
Start {
#[arg(long)]
user: bool,
},
Stop {
#[arg(long)]
user: bool,
},
Status {
#[arg(long)]
user: bool,
},
}
#[cfg(unix)]
fn daemonize_process(pid_file: &std::path::Path) -> Result<()> {
use daemonize::Daemonize;
let daemonize = Daemonize::new()
.pid_file(pid_file)
.working_directory(env::current_dir()?)
.umask(0o027);
daemonize
.start()
.map_err(|e| anyhow::anyhow!("Failed to daemonize: {}", e))?;
Ok(())
}
#[cfg(not(unix))]
fn daemonize_process(_pid_file: &std::path::Path) -> Result<()> {
anyhow::bail!("Daemon mode is only supported on Unix/Linux systems");
}
const DEFAULT_RUN_INTERVAL_SECS: f64 = 30.0;
fn main() -> Result<()> {
dotenv().ok();
let cli = Cli::parse();
match cli.command {
Some(Commands::Service { action }) => handle_service_action(action),
Some(Commands::Run { config }) => run_server(config),
None => {
let cli = Cli::parse_from(["torc-server", "run"]);
if let Some(Commands::Run { config }) = cli.command {
run_server(config)
} else {
unreachable!()
}
}
}
}
fn handle_service_action(action: ServiceAction) -> Result<()> {
let (command, user_level, config) = match action {
ServiceAction::Install { user, config } => {
let svc_config = service::ServiceConfig {
log_dir: config.log_dir,
database: config.database,
host: config.host,
port: config.port,
threads: config.threads,
auth_file: config.auth_file,
require_auth: config.require_auth,
credential_cache_ttl_secs: config.credential_cache_ttl_secs,
enforce_access_control: config.enforce_access_control,
log_level: config.log_level,
json_logs: config.json_logs,
https: config.https,
tls_cert: config.tls_cert,
tls_key: config.tls_key,
admin_users: config.admin_users,
completion_check_interval_secs: config.completion_check_interval_secs,
};
(service::ServiceCommand::Install, user, Some(svc_config))
}
ServiceAction::Uninstall { user } => (service::ServiceCommand::Uninstall, user, None),
ServiceAction::Start { user } => (service::ServiceCommand::Start, user, None),
ServiceAction::Stop { user } => (service::ServiceCommand::Stop, user, None),
ServiceAction::Status { user } => (service::ServiceCommand::Status, user, None),
};
service::execute_service_command(command, config.as_ref(), user_level)
}
fn run_server(cli_config: ServerConfig) -> Result<()> {
let file_config = TorcConfig::load().unwrap_or_default();
let server_file_config = &file_config.server;
let config = ServerConfig {
log_level: if cli_config.log_level != "info" {
cli_config.log_level
} else {
server_file_config.log_level.clone()
},
https: cli_config.https || server_file_config.https,
tls_cert: cli_config
.tls_cert
.or_else(|| server_file_config.tls_cert.clone()),
tls_key: cli_config
.tls_key
.or_else(|| server_file_config.tls_key.clone()),
host: if cli_config.host != "0.0.0.0" {
cli_config.host
} else {
server_file_config.host.clone()
},
port: if cli_config.port != 8080 {
cli_config.port
} else {
server_file_config.port
},
threads: if cli_config.threads != 1 {
cli_config.threads
} else {
server_file_config.threads
},
database: cli_config
.database
.or_else(|| server_file_config.database.clone()),
auth_file: cli_config
.auth_file
.or_else(|| server_file_config.auth_file.clone()),
require_auth: cli_config.require_auth || server_file_config.require_auth,
credential_cache_ttl_secs: if cli_config.credential_cache_ttl_secs != 60 {
cli_config.credential_cache_ttl_secs
} else {
server_file_config.credential_cache_ttl_secs
},
enforce_access_control: cli_config.enforce_access_control
|| server_file_config.enforce_access_control,
log_dir: cli_config
.log_dir
.or_else(|| server_file_config.logging.log_dir.clone()),
json_logs: cli_config.json_logs || server_file_config.logging.json_logs,
daemon: cli_config.daemon,
pid_file: cli_config.pid_file,
completion_check_interval_secs: cli_config
.completion_check_interval_secs
.or(Some(server_file_config.completion_check_interval_secs)),
admin_users: cli_config.admin_users,
shutdown_on_stdin_eof: cli_config.shutdown_on_stdin_eof,
};
if config.daemon {
daemonize_process(&config.pid_file)?;
}
let timing_enabled = env::var("TORC_TIMING_ENABLED")
.map(|v| v == "1" || v.to_lowercase() == "true")
.unwrap_or(false);
let _log_guard;
if timing_enabled {
let timing_layer = Builder::default()
.no_span_recursion()
.layer(|| Histogram::new_with_max(60_000_000_000, 2).unwrap());
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| config.log_level.clone().into());
if let Some(ref log_dir) = config.log_dir {
let file_writer = logging::create_rotating_writer(log_dir)?;
let (non_blocking, guard) = tracing_appender::non_blocking(file_writer);
_log_guard = Some(guard);
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_target(true)
.with_level(true)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE),
)
.with(
tracing_subscriber::fmt::layer()
.with_writer(non_blocking)
.with_ansi(false)
.with_target(true)
.with_level(true)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE),
)
.with(env_filter)
.with(timing_layer)
.init();
} else {
_log_guard = None;
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_target(true)
.with_level(true)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE),
)
.with(env_filter)
.with(timing_layer)
.init();
}
info!("Timing instrumentation enabled - timing data is being collected");
if config.log_dir.is_some() {
info!(
"File logging configured with size-based rotation (10 MiB per file, 5 files max)"
);
}
info!(
"Use external tools like tokio-console or OpenTelemetry exporters to view timing data"
);
} else {
_log_guard = logging::init_logging(
config.log_dir.as_deref(),
&config.log_level,
config.json_logs,
)?;
}
let database_url = if let Some(db_path) = &config.database {
format!("sqlite:{}", db_path)
} else {
env::var("DATABASE_URL").expect("DATABASE_URL must be set or --database must be provided")
};
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.threads as usize)
.enable_all()
.build()?;
runtime.block_on(async {
let connect_options = SqliteConnectOptions::from_str(&database_url)?
.journal_mode(SqliteJournalMode::Wal)
.foreign_keys(true)
.create_if_missing(true)
.busy_timeout(std::time::Duration::from_secs(45))
.pragma("synchronous", "NORMAL")
.pragma("cache_size", "-16000");
let max_connections = config.threads.max(2) + 2;
let pool = SqlitePoolOptions::new()
.max_connections(max_connections)
.connect_with(connect_options)
.await?;
let version = env!("CARGO_PKG_VERSION");
let git_hash = env!("GIT_HASH");
info!(
"Starting torc-server version={} ({})",
version, git_hash
);
info!("Connected to database: {}", database_url);
info!("Database configured with WAL journal mode and foreign key constraints");
info!("Running database migrations...");
sqlx::migrate!("./torc-server/migrations")
.run(&pool)
.await
.expect("Failed to run migrations");
info!("Database migrations completed successfully");
let htpasswd = if let Some(auth_file_path) = &config.auth_file {
info!("Loading htpasswd file from: {}", auth_file_path);
match torc::server::htpasswd::HtpasswdFile::load(auth_file_path) {
Ok(htpasswd) => {
info!("Loaded {} users from htpasswd file", htpasswd.user_count());
Some(htpasswd)
}
Err(e) => {
eprintln!("Error loading htpasswd file: {}", e);
std::process::exit(1);
}
}
} else {
if config.require_auth {
eprintln!("Error: --require-auth specified but no --auth-file provided");
std::process::exit(1);
}
info!("No htpasswd file configured, authentication disabled");
None
};
if config.require_auth {
info!("Authentication is REQUIRED for all requests");
} else if htpasswd.is_some() {
info!("Authentication is OPTIONAL (backward compatible mode)");
}
let addr = format!("{}:{}", config.host, config.port);
info!(
"Tokio runtime configured with {} worker threads",
config.threads
);
info!("Listening on {}", addr);
let completion_check_interval_secs = config
.completion_check_interval_secs
.unwrap_or(DEFAULT_RUN_INTERVAL_SECS);
if config.enforce_access_control {
info!("Access control is ENABLED - users can only access their own workflows and workflows shared via access groups");
}
let mut admin_users = server_file_config.admin_users.clone();
for user in &config.admin_users {
if !admin_users.contains(user) {
admin_users.push(user.clone());
}
}
if !admin_users.is_empty() {
info!("Admin users configured: {:?}", admin_users);
}
http_server::create(
&addr,
config.https,
pool,
htpasswd,
config.require_auth,
config.credential_cache_ttl_secs,
config.enforce_access_control,
completion_check_interval_secs,
admin_users,
config.tls_cert,
config.tls_key,
config.auth_file.clone(),
config.shutdown_on_stdin_eof,
)
.await;
Ok(())
})
}