use colored::Colorize;
use tokio::process::Command;
use crate::cli::commands;
use crate::cli::error::{CliResult, ErrorFactory};
use crate::cli::ui;
use crate::commands::kafka_logs::LogConfig;
#[cfg(feature = "docker")]
use crate::commands::print_docker_ps;
use crate::commands::{
load_xbp_config, pm2_available, pm2_env, pm2_flush, pm2_list, pm2_logs, pm2_monitor,
pm2_resurrect, pm2_save, pm2_snapshot, pm2_start_wrapper, run_cloudflared_tcp,
run_interactive_shell, run_ports, start_log_shipping, tail_kafka_topic, CloudflaredTcpOptions,
InteractiveShellOptions,
};
#[cfg(any(feature = "docker", feature = "systemd"))]
use crate::logging::log_warn;
use crate::logging::{get_log_directory, log_error, log_info};
use crate::strategies::{get_all_services, ServiceConfig, XbpConfig};
#[derive(Debug, Clone, PartialEq, Eq)]
enum ConfiguredLogsRoute {
Pm2 {
process_name: String,
service_name: String,
},
#[cfg(feature = "systemd")]
Systemd {
unit_name: String,
service_name: String,
},
Unsupported {
service_name: String,
has_log_files: bool,
},
}
pub async fn handle_start(args: Vec<String>, debug: bool) -> CliResult<()> {
if let Err(e) =
ui::with_loader("Starting process with PM2", pm2_start_wrapper(args, debug)).await
{
let _ = log_error("start", "Failed to start process", Some(&e)).await;
return Err(ErrorFactory::operation(
"start",
"start process via PM2",
e,
Some("Pass a valid start command, e.g. `xbp start \"./binary --port 3000\"`."),
));
}
if let Err(e) = ui::with_loader("Saving PM2 process table", pm2_save(debug)).await {
let _ = log_error("start", "pm2 save failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"start",
"persist PM2 process list",
e,
Some("Ensure PM2 has permission to write its dump file."),
));
}
if let Err(e) = ui::with_loader("Refreshing PM2 process list", pm2_list(debug)).await {
let _ = log_error("start", "pm2 list failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"start",
"list PM2 processes",
e,
None,
));
}
Ok(())
}
pub async fn handle_logs_flag() -> CliResult<()> {
let log_dir = get_log_directory().await.map_err(|e| {
ErrorFactory::operation("logs", "resolve log directory", e.to_string(), None)
})?;
ui::section("Logs");
ui::divider(56);
println!(
" {} {}",
"Path:".bright_white(),
log_dir.display().to_string().cyan()
);
if cfg!(target_os = "windows") {
println!(" {}", "Opening in Explorer...".dimmed());
let _ = Command::new("explorer").arg(log_dir).spawn();
} else {
println!(" {}", "Quick view:".bright_blue());
println!(" cd {}", log_dir.display().to_string().cyan());
println!(" tail -f xbp-*.log");
}
Ok(())
}
pub async fn handle_ports(
cmd: commands::PortsCmd,
global_port: Option<u16>,
debug: bool,
) -> CliResult<()> {
let mut args: Vec<String> = Vec::new();
let port: Option<u16> = global_port.or(cmd.port);
if let Some(p) = port {
args.push("-p".to_string());
args.push(p.to_string());
}
if cmd.kill {
args.push("--kill".to_string());
args.push("-k".to_string());
}
if cmd.nginx {
args.push("-n".to_string());
args.push("--nginx".to_string());
}
if cmd.full {
args.push("--full".to_string());
}
if cmd.no_local {
args.push("--no-local".to_string());
}
if cmd.exposure {
args.push("--exposure".to_string());
}
if let Err(e) = ui::with_loader("Scanning host ports", run_ports(&args, debug)).await {
let _ = log_error("ports", "Error running ports", Some(&e)).await;
return Err(ErrorFactory::operation(
"ports",
"inspect ports",
e,
Some("Use `xbp ports -h` to verify valid flags and arguments."),
));
}
Ok(())
}
pub async fn handle_logs(cmd: commands::LogsCmd, debug: bool) -> CliResult<()> {
let commands::LogsCmd {
project,
ssh_host,
ssh_username,
ssh_password,
} = cmd;
let remote_requested = ssh_host.is_some() || ssh_username.is_some() || ssh_password.is_some();
if remote_requested {
if let Err(e) = ui::with_loader(
"Opening remote log stream",
crate::commands::ssh_logs::run_remote_logs(
project.clone(),
ssh_host,
ssh_username,
ssh_password,
debug,
),
)
.await
{
let _ = log_error("logs", "Remote logs failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"logs",
"stream remote logs",
e,
Some("Verify SSH host, username, and password parameters."),
));
}
return Ok(());
}
let loaded_config = if project.is_some() {
load_xbp_config().await.ok()
} else {
None
};
#[cfg(feature = "docker")]
{
if let Some(ref target) = project {
match crate::commands::try_stream_docker_logs(target, debug).await {
Ok(Some(())) => return Ok(()),
Ok(None) => {}
Err(e) => {
let _ = log_error("docker", "docker logs failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"docker",
"stream docker logs",
e,
Some("Check Docker daemon health and container identifiers."),
));
}
}
}
}
if let Some(requested) = project.as_deref() {
if let Some(config) = loaded_config.as_ref() {
match resolve_configured_logs_route(config, requested) {
Ok(Some(ConfiguredLogsRoute::Pm2 { process_name, .. })) => {
if let Err(e) = ui::with_loader(
"Opening PM2 log stream",
pm2_logs(Some(process_name), debug),
)
.await
{
let _ = log_error("pm2", "pm2 logs failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"pm2",
"stream PM2 logs",
e,
Some("Run `xbp list` to inspect available PM2 process names."),
));
}
return Ok(());
}
#[cfg(feature = "systemd")]
Ok(Some(ConfiguredLogsRoute::Systemd {
unit_name,
service_name,
})) => {
let runtime = crate::commands::SystemdRuntime::detect();
if !runtime.is_available() {
return Err(ErrorFactory::config(
"logs",
format!(
"Service `{}` is configured to use systemd unit `{}`, but systemd log streaming is unavailable on this host/build.",
service_name, unit_name
),
Some(
"Use `xbp ssh` on the Linux host, or rebuild XBP with `--features systemd` where applicable.",
),
));
}
if let Err(e) = ui::with_loader(
"Opening systemd log stream",
crate::commands::stream_systemd_logs(&unit_name, 200),
)
.await
{
let _ = log_error("systemd", "systemd logs failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"systemd",
"stream systemd logs",
e,
Some("Check `journalctl` access and the configured systemd service name."),
));
}
return Ok(());
}
Ok(Some(ConfiguredLogsRoute::Unsupported {
service_name,
has_log_files,
})) => {
let message = if has_log_files {
format!(
"Service `{}` is configured in this project, but `xbp logs` does not directly stream project `log_files` targets.",
service_name
)
} else {
format!(
"Service `{}` is configured in this project, but it does not use `start_wrapper: pm2` or `systemd_service_name` for direct `xbp logs` streaming.",
service_name
)
};
let hint = if has_log_files {
"Run `xbp tail` to follow configured log files, or add `start_wrapper: pm2` / `systemd_service_name` for direct `xbp logs <service>` support."
} else {
"Use `xbp service dev <name>` for foreground logs, or configure `start_wrapper: pm2`, `systemd_service_name`, or `log_files`."
};
return Err(ErrorFactory::config("logs", message, Some(hint)));
}
Ok(None) => {}
Err(error) => {
return Err(ErrorFactory::config(
"logs",
error,
Some("Run `xbp services` to inspect configured service names."),
));
}
}
}
}
if let Some(requested) = project.as_deref() {
if let Some(config) = loaded_config.as_ref() {
if pm2_available().await.is_err() {
let configured_services = configured_service_names(config);
if !configured_services.is_empty() {
return Err(ErrorFactory::config(
"logs",
format!(
"`{}` did not match any configured service in this project, and PM2 is not available for raw process log streaming. Configured services: {}.",
requested,
configured_services.join(", ")
),
Some(
"Run `xbp services` to inspect service names, or install PM2 for raw `xbp logs <pm2-process>` usage.",
),
));
}
}
}
}
if let Err(e) = ui::with_loader("Opening PM2 log stream", pm2_logs(project, debug)).await {
let _ = log_error("pm2", "pm2 logs failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"pm2",
"stream PM2 logs",
e,
Some("Run `xbp list` to inspect available PM2 process names."),
));
}
Ok(())
}
fn resolve_configured_logs_route(
config: &XbpConfig,
requested: &str,
) -> Result<Option<ConfiguredLogsRoute>, String> {
let Some(service) = resolve_configured_log_service(config, requested)? else {
return Ok(None);
};
if matches!(service.start_wrapper.as_deref(), Some("pm2")) {
return Ok(Some(ConfiguredLogsRoute::Pm2 {
process_name: service.name.clone(),
service_name: service.name,
}));
}
#[cfg(feature = "systemd")]
if let Some(unit_name) = service.systemd_service_name.clone() {
return Ok(Some(ConfiguredLogsRoute::Systemd {
unit_name,
service_name: service.name,
}));
}
Ok(Some(ConfiguredLogsRoute::Unsupported {
service_name: service.name,
has_log_files: config
.log_files
.as_ref()
.is_some_and(|log_files| !log_files.is_empty()),
}))
}
fn resolve_configured_log_service(
config: &XbpConfig,
requested: &str,
) -> Result<Option<ServiceConfig>, String> {
let services = get_all_services(config);
if services.is_empty() {
return Ok(None);
}
if let Some(service) = services.iter().find(|service| service.name == requested) {
return Ok(Some(service.clone()));
}
let suffix = format!("-{}", requested);
let mut suffix_matches = services
.iter()
.filter(|service| service.name.ends_with(&suffix))
.cloned()
.collect::<Vec<_>>();
match suffix_matches.len() {
0 => Ok(None),
1 => Ok(suffix_matches.pop()),
_ => Err(format!(
"Service alias `{}` is ambiguous in this project: {}.",
requested,
suffix_matches
.iter()
.map(|service| service.name.as_str())
.collect::<Vec<_>>()
.join(", ")
)),
}
}
fn configured_service_names(config: &XbpConfig) -> Vec<String> {
get_all_services(config)
.into_iter()
.map(|service| service.name)
.collect()
}
pub async fn handle_ssh(cmd: commands::SshCmd, debug: bool) -> CliResult<()> {
let options = InteractiveShellOptions {
ssh_host: cmd.ssh_host,
ssh_port: cmd.ssh_port,
ssh_username: cmd.ssh_username,
ssh_password: cmd.ssh_password,
private_key: cmd.private_key,
private_key_passphrase: cmd.private_key_passphrase,
command: cmd.command,
term: cmd.term,
no_host_key_check: cmd.no_host_key_check,
host_key: cmd.host_key,
known_hosts_file: cmd.known_hosts_file,
cloudflared_hostname: cmd.cloudflared_hostname,
cloudflared_binary: cmd.cloudflared_binary,
cloudflared_destination: cmd.cloudflared_destination,
};
if let Err(e) = run_interactive_shell(options, debug).await {
let _ = log_error("shell", "Interactive shell failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"ssh",
"open interactive SSH shell",
e,
Some(
"Use `--host-key` when tunneling through cloudflared, or `--no-host-key-check` for explicit insecure fallback.",
),
));
}
Ok(())
}
pub async fn handle_cloudflared(cmd: commands::CloudflaredCmd, debug: bool) -> CliResult<()> {
match cmd.command {
commands::CloudflaredSubCommand::Tcp(tcp_cmd) => {
let hostname = match tcp_cmd.hostname.as_deref().map(str::trim) {
Some(hostname) if !hostname.is_empty() => hostname.to_string(),
_ => {
return Err(ErrorFactory::validation(
"cloudflared",
"`xbp cloudflared tcp` requires `--hostname <access-hostname>`.",
Some("Example: `xbp cloudflared tcp --hostname bastion.example.com`."),
));
}
};
let options = CloudflaredTcpOptions {
hostname,
listener: tcp_cmd.listener,
destination: tcp_cmd.destination,
binary_path: tcp_cmd.binary,
};
if let Err(e) = run_cloudflared_tcp(options, debug).await {
let _ = log_error("cloudflared", "TCP forwarder failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"cloudflared",
"run Access TCP forwarder",
e,
Some(
"Verify the Access hostname, local listener, and cloudflared install path.",
),
));
}
}
}
Ok(())
}
pub async fn handle_list(debug: bool) -> CliResult<()> {
if let Err(e) = ui::with_loader("Loading PM2 processes", pm2_list(debug)).await {
let _ = log_error("pm2", "pm2 list failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"pm2",
"list PM2 processes",
e,
Some("Ensure PM2 is installed and running."),
));
}
#[cfg(feature = "docker")]
{
if let Err(e) = print_docker_ps(debug).await {
let _ = log_warn("docker", "docker ps snapshot failed", Some(&e)).await;
}
}
#[cfg(feature = "systemd")]
if let Err(e) = crate::commands::show_systemd_status(debug).await {
if debug {
let _ = log_warn(
"systemd",
"Could not check systemd status",
Some(&e.to_string()),
)
.await;
}
}
Ok(())
}
pub async fn handle_snapshot(debug: bool) -> CliResult<()> {
match ui::with_loader("Creating PM2 snapshot", pm2_snapshot(debug)).await {
Ok(path) => {
let _ = log_info(
"snapshot",
"Saved PM2 snapshot",
Some(&path.display().to_string()),
)
.await;
println!("Saved PM2 snapshot to {}", path.display());
Ok(())
}
Err(e) => {
let _ = log_error("snapshot", "PM2 snapshot failed", Some(&e)).await;
Err(ErrorFactory::operation(
"snapshot",
"create PM2 snapshot",
e,
Some("Verify PM2_HOME is writable."),
))
}
}
}
pub async fn handle_resurrect(debug: bool) -> CliResult<()> {
if let Err(e) = ui::with_loader("Restoring PM2 snapshot", pm2_resurrect(debug)).await {
let _ = log_error("pm2", "pm2 resurrect failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"pm2",
"resurrect PM2 processes",
e,
Some("Create a snapshot first with `xbp snapshot`."),
));
}
Ok(())
}
pub async fn handle_stop(target: Option<String>, debug: bool) -> CliResult<()> {
let target = target.unwrap_or_else(|| "all".to_string());
if let Err(e) = ui::with_loader(
&format!("Stopping PM2 target `{}`", target),
crate::commands::pm2_stop(&target, debug),
)
.await
{
let _ = log_error("pm2", "pm2 stop failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"pm2",
&format!("stop `{}`", target),
e,
Some("Use `xbp list` to confirm process names/ids."),
));
}
Ok(())
}
pub async fn handle_flush(target: Option<String>, debug: bool) -> CliResult<()> {
if let Err(e) = ui::with_loader("Flushing PM2 logs", pm2_flush(target.as_deref(), debug)).await
{
let _ = log_error("pm2", "pm2 flush failed", Some(&e)).await;
return Err(ErrorFactory::operation("pm2", "flush PM2 logs", e, None));
}
Ok(())
}
pub async fn handle_env(target: String, debug: bool) -> CliResult<()> {
if let Err(e) = ui::with_loader(
&format!("Inspecting PM2 env for `{}`", target),
pm2_env(&target, debug),
)
.await
{
let _ = log_error("pm2", "pm2 env failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"pm2",
&format!("inspect env for `{}`", target),
e,
Some("Use PM2 process name or numeric id."),
));
}
Ok(())
}
pub async fn handle_monitor(cmd: commands::MonitorCmd, debug: bool) -> CliResult<()> {
match cmd.command {
Some(commands::MonitorSubCommand::Check) => {
if !crate::commands::service::is_xbp_project().await {
return crate::cli::handlers::project::handle_project_selection().await;
}
if let Err(e) = ui::with_loader(
"Running monitor check",
crate::commands::monitor::run_single_check(),
)
.await
{
let _ = log_error("monitor", "Monitor check failed", Some(&e.to_string())).await;
return Err(ErrorFactory::operation(
"monitor",
"run monitor check",
e.to_string(),
Some("Confirm monitoring config fields are present."),
));
}
}
Some(commands::MonitorSubCommand::Start) => {
if !crate::commands::service::is_xbp_project().await {
return crate::cli::handlers::project::handle_project_selection().await;
}
if let Err(e) = ui::with_loader(
"Starting monitor daemon",
crate::commands::monitor::start_monitor_daemon(),
)
.await
{
let _ = log_error("monitor", "Monitor daemon failed", Some(&e.to_string())).await;
return Err(ErrorFactory::operation(
"monitor",
"start monitor daemon",
e.to_string(),
Some("Validate monitor config and process permissions."),
));
}
}
None => {
if let Err(e) = ui::with_loader("Opening PM2 monitor UI", pm2_monitor(debug)).await {
let _ = log_error("monitor", "PM2 monitor failed", Some(&e)).await;
return Err(ErrorFactory::operation(
"monitor",
"open PM2 monitor",
e,
Some("Ensure PM2 daemon is available."),
));
}
}
}
Ok(())
}
pub async fn handle_tail(cmd: commands::TailCmd, _debug: bool) -> CliResult<()> {
if cmd.kafka {
match LogConfig::from_xbp_config().await {
Ok(Some(config)) => {
if let Err(e) =
ui::with_loader("Tailing Kafka topic", tail_kafka_topic(&config)).await
{
let _ =
log_error("tail", "Failed to tail Kafka topic", Some(&e.to_string())).await;
return Err(ErrorFactory::operation(
"tail",
"tail Kafka topic",
e.to_string(),
Some("Check Kafka broker/topic values in config."),
));
}
}
Ok(None) => {
let _ = log_error(
"tail",
"No log configuration found in xbp.yaml/xbp.json",
None,
)
.await;
return Err(ErrorFactory::config(
"tail",
"No log configuration found in xbp.yaml/xbp.json",
Some("Configure kafka logging fields or run `xbp tail` without `--kafka`."),
));
}
Err(e) => {
let _ =
log_error("tail", "Failed to load configuration", Some(&e.to_string())).await;
return Err(ErrorFactory::operation(
"tail",
"load log configuration",
e.to_string(),
Some("Fix xbp config syntax/path issues and retry."),
));
}
}
} else if cmd.ship {
if let Err(e) = ui::with_loader("Shipping logs to Kafka", start_log_shipping()).await {
let _ = log_error("tail", "Failed to ship logs", Some(&e.to_string())).await;
return Err(ErrorFactory::operation(
"tail",
"ship logs",
e.to_string(),
Some("Ensure log shipping backend is configured correctly."),
));
}
} else if let Err(e) = ui::with_loader("Starting live log tail", start_log_shipping()).await {
let _ = log_error("tail", "Failed to tail logs", Some(&e.to_string())).await;
return Err(ErrorFactory::operation(
"tail",
"tail logs",
e.to_string(),
Some("Ensure runtime log shipping prerequisites are met."),
));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::{resolve_configured_logs_route, ConfiguredLogsRoute};
use crate::strategies::XbpConfig;
fn parse_config(yaml: &str) -> XbpConfig {
serde_yaml::from_str(yaml).expect("parse test config")
}
#[test]
fn configured_logs_route_uses_unique_suffix_alias_for_pm2_service() {
let config = parse_config(
r#"
project_name: athena
version: 1.0.0
port: 4052
build_dir: .
services:
- name: athena-api
target: rust
branch: main
port: 4052
start_wrapper: pm2
"#,
);
let route = resolve_configured_logs_route(&config, "api")
.expect("resolve route")
.expect("configured route");
assert_eq!(
route,
ConfiguredLogsRoute::Pm2 {
process_name: "athena-api".to_string(),
service_name: "athena-api".to_string(),
}
);
}
#[test]
fn configured_logs_route_marks_non_pm2_service_as_unsupported() {
let config = parse_config(
r#"
project_name: athena
version: 1.0.0
port: 4052
build_dir: .
services:
- name: athena-api
target: rust
branch: main
port: 4052
start_wrapper: null
"#,
);
let route = resolve_configured_logs_route(&config, "api")
.expect("resolve route")
.expect("configured route");
assert_eq!(
route,
ConfiguredLogsRoute::Unsupported {
service_name: "athena-api".to_string(),
has_log_files: false,
}
);
}
#[test]
fn configured_logs_route_rejects_ambiguous_suffix_aliases() {
let config = parse_config(
r#"
project_name: demo
version: 1.0.0
port: 3000
build_dir: .
services:
- name: athena-api
target: rust
branch: main
port: 4052
start_wrapper: pm2
- name: billing-api
target: rust
branch: main
port: 4053
start_wrapper: pm2
"#,
);
let error = resolve_configured_logs_route(&config, "api").expect_err("ambiguous alias");
assert!(error.contains("ambiguous"));
assert!(error.contains("athena-api"));
assert!(error.contains("billing-api"));
}
}