use colored::Colorize;
use netstat2::{get_sockets_info, AddressFamilyFlags, ProtocolFlags, ProtocolSocketInfo};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::path::PathBuf;
use std::process::Output;
use std::time::{Duration, Instant};
use tokio::process::Command;
use crate::logging::{log_info, log_timed, LogLevel};
use crate::sdk::nginx::inspect_nginx_configs;
use crate::strategies::XbpConfig;
use crate::utils::{
collect_known_xbp_projects, collect_listening_port_ownership, find_xbp_config_upwards,
parse_config_with_auto_heal,
};
use tracing::{debug, error, info, warn};
/// Returns the display rank of a TCP state label; lower ranks are listed first.
///
/// Only the first whitespace-separated token is inspected, so a label that
/// carries a trailing count suffix (for example `"Established (×3)"`) ranks the
/// same as the bare state. Matching is case-insensitive and accepts the
/// CamelCase spellings as well as the upper-case spellings printed by the
/// `netstat` tools, including `LISTENING` and `SYN_RECV`.
///
/// Any label that is not recognised gets the last rank (`6`).
fn state_sort_order(state: &str) -> u8 {
    let base = state.split_whitespace().next().unwrap_or(state);
    match base.to_uppercase().as_str() {
        "ESTABLISHED" => 0,
        // Windows `netstat` prints LISTENING rather than LISTEN.
        "LISTEN" | "LISTENING" => 1,
        "SYNSENT" | "SYN_SENT" => 2,
        // Linux `netstat` abbreviates this state to SYN_RECV.
        "SYNRECEIVED" | "SYN_RECEIVED" | "SYN_RECV" => 3,
        "TIMEWAIT" | "TIME_WAIT" => 4,
        "CLOSEWAIT" | "CLOSE_WAIT" => 5,
        _ => 6,
    }
}
/// Entry point of the `ports` command.
///
/// Recognised arguments:
/// * `-p <port>`   – restrict the listing to one port (the value is mandatory)
/// * `--kill`      – forwarded to the socket listing as its `kill` flag
/// * `-n`/`--nginx` – print the reconciled table and, when a port was given,
///   search the NGINX site configs for it
/// * `--full`      – print the reconciled table
/// * `--no-local`  – forwarded to the socket listing as its `no_local` flag
///
/// Unrecognised arguments are ignored.
///
/// # Errors
/// Returns `Err` when `-p` is the last argument, or when building the
/// reconciled table fails.
pub async fn run_ports(args: &[String], debug: bool) -> Result<(), String> {
    let mut port_filter: Option<String> = None;
    let mut kill = false;
    let mut nginx_search = false;
    let mut full_view = false;
    let mut no_local = false;

    // Walk the arguments with an iterator so `-p` can pull its value directly.
    let mut pending = args.iter();
    while let Some(flag) = pending.next() {
        match flag.as_str() {
            "-p" => match pending.next() {
                Some(value) => port_filter = Some(value.clone()),
                None => return Err("-p requires a port value".to_string()),
            },
            "--kill" => kill = true,
            "-n" | "--nginx" => nginx_search = true,
            "--full" => full_view = true,
            "--no-local" => no_local = true,
            _ => {}
        }
    }

    if debug {
        info!("Debug mode enabled");
        debug!("Args: {:?}", args);
    }

    // Logging failures are deliberately ignored.
    let _ = log_info("ports", "Executing ports command", port_filter.as_deref()).await;

    let started = Instant::now();
    let rendered =
        execute_ports_command_netstat2(port_filter.clone(), debug, kill, no_local).await;
    let took: Duration = started.elapsed();

    let _ = log_timed(
        LogLevel::Success,
        "ports",
        "Ports command completed",
        took.as_millis() as u64,
    )
    .await;
    if debug {
        debug!("execute_ports_command took: {:.2?}", took);
    }

    if !rendered.trim().is_empty() {
        display_output(rendered);
    } else {
        match port_filter.as_deref() {
            Some(port) => println!("No active processes found on port: {}", port),
            None => println!("No listening TCP sockets found."),
        }
    }

    if !(nginx_search || full_view) {
        return Ok(());
    }

    print_reconciled_ports(port_filter.as_deref()).await?;

    // The NGINX file search only makes sense for a concrete port.
    if let (true, Some(port)) = (nginx_search, port_filter) {
        info!("Searching NGINX configurations for port: {}", port);
        search_nginx_configs(&port).await;
    }
    Ok(())
}
/// Builds the textual socket table for the `ports` command.
///
/// When `port_filter` is set the work is delegated to
/// `get_port_info_with_netstat`. Otherwise all IPv4/IPv6 TCP sockets are
/// fetched through `netstat2`, grouped per local port by
/// `(local address, remote address, state)`, and rendered as one table row per
/// group; groups with more than one socket get a ` (×N)` suffix on the state.
///
/// * `debug`    – emit extra `debug!` diagnostics.
/// * `kill`     – after rendering, try to kill every PID that owns one of the
///   listed sockets. NOTE(review): without a port filter this targets the
///   owner of *every* listed TCP socket — confirm that is intended.
/// * `no_local` – skip sockets whose local and remote address are equal.
///
/// Returns the rendered table (plus one line per killed PID), an empty string
/// when nothing is listed, or a failure message when the socket list cannot be
/// obtained.
async fn execute_ports_command_netstat2(
    port_filter: Option<String>,
    debug: bool,
    kill: bool,
    no_local: bool,
) -> String {
    let start: Instant = Instant::now();
    if let Some(ref port) = port_filter {
        return get_port_info_with_netstat(port, debug, kill, no_local).await;
    }

    let af_flags: AddressFamilyFlags = AddressFamilyFlags::IPV4 | AddressFamilyFlags::IPV6;
    let proto_flags: ProtocolFlags = ProtocolFlags::TCP;
    let sockets: Vec<netstat2::SocketInfo> = match get_sockets_info(af_flags, proto_flags) {
        Ok(s) => s,
        Err(e) => {
            return format!("Failed to get sockets info: {}", e);
        }
    };
    if debug {
        debug!("Fetched {} TCP sockets", sockets.len());
    }

    // Bucket the sockets by local port; the BTreeMap keeps ports ordered.
    let mut port_map: BTreeMap<u16, Vec<(&netstat2::SocketInfo, &netstat2::TcpSocketInfo)>> =
        BTreeMap::new();
    for socket in &sockets {
        if let ProtocolSocketInfo::Tcp(ref tcp_info) = socket.protocol_socket_info {
            port_map
                .entry(tcp_info.local_port)
                .or_default()
                .push((socket, tcp_info));
        }
    }

    // Row layout: (port, pids, local address, remote address, state label).
    let mut all_rows: Vec<(u16, String, String, String, String)> = Vec::new();
    // Ordered by port so that the kill pass below is deterministic.
    let mut all_pids_to_kill: BTreeMap<u16, HashSet<u32>> = BTreeMap::new();

    for (port, entries) in &port_map {
        let mut groups: HashMap<(String, String, String), (usize, HashSet<u32>)> = HashMap::new();
        let mut pids_to_kill: HashSet<u32> = HashSet::new();
        for (socket, tcp_info) in entries {
            if no_local && tcp_info.local_addr == tcp_info.remote_addr {
                continue;
            }
            let key = (
                tcp_info.local_addr.to_string(),
                tcp_info.remote_addr.to_string(),
                format!("{:?}", tcp_info.state),
            );
            let (count, group_pids) = groups.entry(key).or_insert_with(|| (0, HashSet::new()));
            *count += 1;
            for pid in &socket.associated_pids {
                pids_to_kill.insert(*pid);
                group_pids.insert(*pid);
            }
        }
        if groups.is_empty() {
            continue;
        }
        all_pids_to_kill.insert(*port, pids_to_kill);

        for ((local_addr, remote_addr, state), (count, group_pids)) in groups {
            let pids: String = if group_pids.is_empty() {
                "-".to_string()
            } else {
                // Sort numerically; sorting the rendered strings would put "10" before "9".
                let mut ordered: Vec<u32> = group_pids.into_iter().collect();
                ordered.sort_unstable();
                ordered
                    .iter()
                    .map(|pid| pid.to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            };
            let count_suffix = if count > 1 {
                format!(" (×{})", count)
            } else {
                String::new()
            };
            all_rows.push((
                *port,
                pids,
                local_addr,
                remote_addr,
                format!("{}{}", state, count_suffix),
            ));
        }
    }

    let mut table_output: String = String::new();
    if !all_rows.is_empty() {
        all_rows.sort_by(|a, b| {
            state_sort_order(&a.4)
                .cmp(&state_sort_order(&b.4))
                .then_with(|| a.1.cmp(&b.1))
                .then_with(|| a.0.cmp(&b.0))
                .then_with(|| a.2.cmp(&b.2))
                .then_with(|| a.3.cmp(&b.3))
        });

        let sep = "+--------+----------+--------------------+--------------------+------------+";
        table_output.push_str(&format!(" {}\n", sep));
        table_output.push_str(&format!(
            " | {:<6} | {:<8} | {:<18} | {:<18} | {:<10} |\n",
            "Port".dimmed(),
            "PID".dimmed(),
            "LocalAddr".dimmed(),
            "RemoteAddr".dimmed(),
            "State".dimmed()
        ));
        table_output.push_str(&format!(" {}\n", sep));
        for (port, pids, local_addr, remote_addr, state) in &all_rows {
            // Colour only the state name, not the optional count suffix.
            let state_base = state.split_whitespace().next().unwrap_or(state);
            let state_colored = color_state(state_base);
            let suffix = state.strip_prefix(state_base).unwrap_or("").trim();
            let state_display = if suffix.is_empty() {
                state_colored.to_string()
            } else {
                format!("{} {}", state_colored, suffix)
            };
            table_output.push_str(&format!(
                " | {:<6} | {:<8} | {:<18} | {:<18} | {:<10} |\n",
                port, pids, local_addr, remote_addr, state_display
            ));
        }
        table_output.push_str(&format!(" {}\n", sep));
    }

    if kill {
        // A process can own sockets on several ports; signal it only once so
        // the later attempts do not fail and log spurious errors.
        let mut attempted: HashSet<u32> = HashSet::new();
        for (port, pids_to_kill) in &all_pids_to_kill {
            if !pids_to_kill.is_empty() && debug {
                debug!(
                    "Found {} unique PID(s) on port {}: {:?}",
                    pids_to_kill.len(),
                    port,
                    pids_to_kill
                );
            }
            let mut ordered: Vec<u32> = pids_to_kill.iter().copied().collect();
            ordered.sort_unstable();
            for pid in ordered {
                if !attempted.insert(pid) {
                    continue;
                }
                let killed = kill_process_with_debug(&pid.to_string(), debug).await;
                if killed {
                    info!("Successfully killed process with PID: {}", pid);
                    table_output.push_str(&format!("Killed process with PID: {}\n", pid));
                } else {
                    error!("Failed to kill process with PID: {}", pid);
                }
            }
        }
    }

    if debug {
        debug!(
            "execute_ports_command_netstat2 took: {:.2?}",
            start.elapsed()
        );
    }
    table_output
}
/// One socket row extracted from `netstat` output.
struct NetstatRow<'a> {
    /// Owning PID, when the row carries a numeric one.
    pid: Option<u32>,
    /// Local address column, e.g. `0.0.0.0:80`.
    local: &'a str,
    /// Foreign address column.
    foreign: &'a str,
    /// State column, or `"-"` when the row has none (e.g. UDP).
    state: &'a str,
}

/// Extracts the numeric port after the last `:` of a netstat address column.
fn netstat_local_port(addr: &str) -> Option<u16> {
    addr.rsplit_once(':')?.1.parse().ok()
}

/// Parses one line of `netstat -ano` (`windows == true`) or `netstat -tulpen`
/// output. Header lines and anything that does not start with a `tcp`/`udp`
/// protocol token yield `None`.
fn parse_netstat_row(line: &str, windows: bool) -> Option<NetstatRow<'_>> {
    let parts: Vec<&str> = line.split_whitespace().collect();
    let proto = parts.first()?.to_ascii_lowercase();
    if !(proto.starts_with("tcp") || proto.starts_with("udp")) {
        return None;
    }
    if windows {
        // Columns: Proto Local Foreign [State] PID — UDP rows have no state.
        let local = *parts.get(1)?;
        let foreign = *parts.get(2)?;
        let state = if parts.len() >= 5 { parts[3] } else { "-" };
        let pid = parts.last().and_then(|token| token.parse::<u32>().ok());
        Some(NetstatRow {
            pid,
            local,
            foreign,
            state,
        })
    } else {
        // Columns: Proto Recv-Q Send-Q Local Foreign [State] User Inode PID/Program.
        let local = *parts.get(3)?;
        let foreign = *parts.get(4)?;
        // With `-n` the User column is numeric, so a numeric sixth token means
        // the State column is absent for this row.
        let state = match parts.get(5) {
            Some(token) if token.parse::<u64>().is_err() => *token,
            _ => "-",
        };
        // Program names may contain spaces, so look for the `PID/…` token
        // instead of trusting the last column.
        let pid = parts.iter().skip(5).find_map(|token| {
            let (id, _program) = token.split_once('/')?;
            id.parse::<u32>().ok()
        });
        Some(NetstatRow {
            pid,
            local,
            foreign,
            state,
        })
    }
}

/// Lists the sockets bound to local port `port` by running the system
/// `netstat` (`netstat -ano` on Windows, `sudo netstat -tulpen` elsewhere) and
/// renders them as a table grouped by `(local address, foreign address, state)`.
///
/// * `port`     – must parse as a `u16`; anything else is rejected before a
///   process is spawned, so the value never reaches a command line.
/// * `debug`    – emit extra `debug!` diagnostics.
/// * `kill`     – after rendering, try to kill every PID found on the port.
/// * `no_local` – skip rows whose local and foreign address are equal.
///
/// Returns the rendered table (plus one line per killed PID) or a
/// human-readable message when the port is invalid, netstat cannot be run, or
/// no row matches.
async fn get_port_info_with_netstat(port: &str, debug: bool, kill: bool, no_local: bool) -> String {
    let Ok(port_number) = port.trim().parse::<u16>() else {
        return format!("Invalid port: {}", port);
    };
    if debug {
        debug!("Using netstat to get PIDs for port: {}", port);
    }

    let is_windows = cfg!(target_os = "windows");
    // No shell and no `grep`: rows are filtered below by exact local port, so
    // port 80 no longer matches :8080 or a remote port.
    let output: Result<Output, std::io::Error> = if is_windows {
        Command::new("netstat").arg("-ano").output().await
    } else {
        Command::new("sudo")
            .arg("netstat")
            .arg("-tulpen")
            .output()
            .await
    };
    let output = match output {
        Ok(o) => o,
        Err(e) => {
            return format!("Failed to execute netstat: {}", e);
        }
    };
    if !output.status.success() && output.stdout.is_empty() {
        return format!("No processes found on port: {}", port);
    }
    let stdout: std::borrow::Cow<'_, str> = String::from_utf8_lossy(&output.stdout);

    let mut groups: HashMap<(String, String, String), (usize, HashSet<u32>)> = HashMap::new();
    let mut pids_to_kill: HashSet<u32> = HashSet::new();
    let mut matched_rows: usize = 0;
    for row in stdout
        .lines()
        .filter_map(|line| parse_netstat_row(line, is_windows))
    {
        if netstat_local_port(row.local) != Some(port_number) {
            continue;
        }
        matched_rows += 1;
        if no_local && row.local == row.foreign {
            continue;
        }
        let key = (
            row.local.to_string(),
            row.foreign.to_string(),
            row.state.to_string(),
        );
        let (count, pids) = groups.entry(key).or_insert_with(|| (0, HashSet::new()));
        *count += 1;
        if let Some(pid) = row.pid {
            pids.insert(pid);
            // PID 0 is never a killable user process; keep it out of the kill set.
            if pid != 0 {
                pids_to_kill.insert(pid);
            }
        }
    }
    if matched_rows == 0 {
        return format!("No processes found on port: {}", port);
    }
    if groups.is_empty() {
        return format!(
            "No processes found on port: {} (filtered by --no-local)",
            port
        );
    }

    let sep = "+--------+----------+--------------------+--------------------+------------+";
    let mut table_output = String::new();
    table_output.push_str(&format!(" {}\n", sep));
    table_output.push_str(&format!(
        " | {:<6} | {:<8} | {:<18} | {:<18} | {:<10} |\n",
        "Port".dimmed(),
        "PID".dimmed(),
        "LocalAddr".dimmed(),
        "RemoteAddr".dimmed(),
        "State".dimmed()
    ));
    table_output.push_str(&format!(" {}\n", sep));

    let mut group_vec: Vec<_> = groups.into_iter().collect();
    group_vec.sort_by(|a, b| {
        state_sort_order(&a.0 .2)
            .cmp(&state_sort_order(&b.0 .2))
            .then_with(|| a.0 .0.cmp(&b.0 .0))
            .then_with(|| a.0 .1.cmp(&b.0 .1))
    });
    for ((local_addr, remote_addr, state), (count, group_pids)) in group_vec {
        let pids_str = if group_pids.is_empty() {
            "-".to_string()
        } else {
            // Numeric order; string order would put "10" before "9".
            let mut ordered: Vec<u32> = group_pids.into_iter().collect();
            ordered.sort_unstable();
            ordered
                .iter()
                .map(|pid| pid.to_string())
                .collect::<Vec<_>>()
                .join(",")
        };
        let count_suffix = if count > 1 {
            format!(" (×{})", count)
        } else {
            String::new()
        };
        // `state` is a single whitespace-free token here, so it is coloured whole.
        let state_display = format!("{}{}", color_state(&state), count_suffix);
        table_output.push_str(&format!(
            " | {:<6} | {:<8} | {:<18} | {:<18} | {:<10} |\n",
            port_number, pids_str, local_addr, remote_addr, state_display
        ));
    }
    table_output.push_str(&format!(" {}\n", sep));

    if kill && !pids_to_kill.is_empty() {
        if debug {
            debug!(
                "Found {} unique PID(s) to kill: {:?}",
                pids_to_kill.len(),
                pids_to_kill
            );
        }
        let mut ordered: Vec<u32> = pids_to_kill.into_iter().collect();
        ordered.sort_unstable();
        for pid in ordered {
            let killed = kill_process_with_debug(&pid.to_string(), debug).await;
            if killed {
                info!("Successfully killed process with PID: {}", pid);
                table_output.push_str(&format!("Killed process with PID: {}\n", pid));
            } else {
                error!("Failed to kill process with PID: {}", pid);
            }
        }
    }
    table_output
}
/// Force-terminates the process `pid` with `taskkill /PID <pid> /F` (Windows build).
///
/// Returns `true` only when `taskkill` could be started and exited
/// successfully. With `debug` set, the attempt, any spawn error, and the
/// command's status/stdout/stderr/duration are logged.
#[cfg(target_os = "windows")]
async fn kill_process_with_debug(pid: &str, debug: bool) -> bool {
    if debug {
        debug!("Attempting to kill PID: {}", pid);
    }
    let started: Instant = Instant::now();
    let outcome = Command::new("taskkill")
        .args(["/PID", pid, "/F"])
        .output()
        .await;

    match outcome {
        Err(spawn_err) => {
            if debug {
                debug!("Failed to execute taskkill: {}", spawn_err);
            }
            false
        }
        Ok(finished) => {
            let took = started.elapsed();
            if debug {
                debug!(
                    "taskkill output: status={:?}, stdout='{}', stderr='{}', took: {:.2?}",
                    finished.status,
                    String::from_utf8_lossy(&finished.stdout),
                    String::from_utf8_lossy(&finished.stderr),
                    took
                );
            }
            finished.status.success()
        }
    }
}
/// Sends `SIGKILL` to the process `pid` via `sudo kill -9 <pid>` (non-Windows build).
///
/// `pid` must be a positive decimal integer. Anything else is refused without
/// spawning a command: `0` would address the caller's whole process group, and
/// arbitrary text must never reach a privileged command line.
///
/// Returns `true` only when the command could be started and exited
/// successfully; a failure to start it returns `false` instead of panicking.
/// With `debug` set, the attempt and its outcome are logged.
#[cfg(not(target_os = "windows"))]
async fn kill_process_with_debug(pid: &str, debug: bool) -> bool {
    if debug {
        debug!("Attempting to kill PID: {}", pid);
    }
    let pid_number = match pid.trim().parse::<u32>() {
        Ok(n) if n > 0 => n,
        _ => {
            if debug {
                debug!("Refusing to kill invalid PID: {:?}", pid);
            }
            return false;
        }
    };

    let start: Instant = Instant::now();
    // Arguments are passed directly, without a shell in between.
    let kill_output = Command::new("sudo")
        .arg("kill")
        .arg("-9")
        .arg(pid_number.to_string())
        .output()
        .await;
    let kill_output: Output = match kill_output {
        Ok(o) => o,
        Err(e) => {
            if debug {
                debug!("Failed to execute kill: {}", e);
            }
            return false;
        }
    };
    let elapsed = start.elapsed();
    if debug {
        debug!(
            "kill_process output: status={:?}, stdout='{}', stderr='{}', took: {:.2?}",
            kill_output.status,
            String::from_utf8_lossy(&kill_output.stdout),
            String::from_utf8_lossy(&kill_output.stderr),
            elapsed
        );
    }
    kill_output.status.success()
}
/// Colours a TCP state name for terminal output.
///
/// Both the CamelCase spellings and the upper-case spellings printed by the
/// `netstat` tools are recognised: listening states are green, established is
/// cyan, the SYN handshake states are yellow, and the closing/winding-down
/// states are dimmed. Anything else is returned unstyled.
fn color_state(state: &str) -> colored::ColoredString {
    match state {
        "Listen" | "LISTEN" | "LISTENING" => state.green(),
        "Established" | "ESTABLISHED" => state.cyan(),
        "SynReceived" | "SYN_RECEIVED" | "SYN_RECV" | "SynSent" | "SYN_SENT" => state.yellow(),
        "TimeWait" | "TIME_WAIT" | "CloseWait" | "CLOSE_WAIT" | "FinWait1" | "FIN_WAIT1"
        | "FIN_WAIT_1" | "FinWait2" | "FIN_WAIT2" | "FIN_WAIT_2" | "Closing" | "CLOSING"
        | "LastAck" | "LAST_ACK" => state.dimmed(),
        _ => state.normal(),
    }
}
/// Writes `output` to standard output, followed by a newline.
fn display_output(output: String) {
    println!("{output}");
}
/// Returns `true` when `content` contains `needle` at a position where it is
/// not immediately followed by another ASCII digit, so a needle ending in
/// port `80` does not match text that continues as `8080`.
fn config_mentions_port(content: &str, needle: &str) -> bool {
    content.match_indices(needle).any(|(idx, _)| {
        !content[idx + needle.len()..].starts_with(|c: char| c.is_ascii_digit())
    })
}

/// Scans the files directly inside `/etc/nginx/sites-available` for references
/// to `port` and logs every file that contains one.
///
/// A file counts as a match when it contains `proxy_pass http://127.0.0.1:<port>`
/// or `listen <port>` with the port not followed by further digits. Files that
/// cannot be read as text are skipped. A missing directory produces a warning,
/// an unreadable directory an error; in both cases the search is abandoned.
async fn search_nginx_configs(port: &str) {
    let nginx_sites_available_path = PathBuf::from("/etc/nginx/sites-available");
    if !nginx_sites_available_path.exists() {
        warn!("Warning: /etc/nginx/sites-available/ not found. Skipping NGINX config search.");
        return;
    }
    let entries = match fs::read_dir(&nginx_sites_available_path) {
        Ok(entries) => entries,
        Err(e) => {
            error!("Error reading NGINX sites-available directory: {}", e);
            return;
        }
    };

    // Build the search strings once instead of once per file.
    let proxy_needle = format!("proxy_pass http://127.0.0.1:{}", port);
    let listen_needle = format!("listen {}", port);

    let mut found_configs = false;
    for entry in entries.flatten() {
        let path = entry.path();
        if !path.is_file() {
            continue;
        }
        let Ok(config_content) = fs::read_to_string(&path) else {
            continue;
        };
        if config_mentions_port(&config_content, &proxy_needle)
            || config_mentions_port(&config_content, &listen_needle)
        {
            info!("Found port {} in NGINX config: {}", port, path.display());
            found_configs = true;
        }
    }
    if !found_configs {
        info!("No NGINX configurations found for port {}.", port);
    }
}
/// Prints the "Reconciled Ports" table, merging three sources per port:
/// listening-port ownership, NGINX site upstream ports, and the ports declared
/// by known XBP projects.
///
/// When `port_filter` parses as a `u16`, only that port's row is printed;
/// otherwise every row is printed. Rows that have at least one XBP project are
/// listed first and highlighted.
///
/// # Errors
/// Propagates failures from collecting port ownership or inspecting the NGINX
/// configuration.
async fn print_reconciled_ports(port_filter: Option<&str>) -> Result<(), String> {
    let active_ports = collect_listening_port_ownership()?;
    let nginx_sites = inspect_nginx_configs(false).map_err(|e| e.to_string())?;
    let xbp_ports = collect_xbp_project_ports();

    let mut rows: BTreeMap<u16, PortRow> = BTreeMap::new();

    for (port, owner) in active_ports {
        let entry = rows.entry(port).or_default();
        entry.active = true;
        for pid in owner.pids {
            entry.pids.push(pid.to_string());
        }
        entry.projects.extend(owner.xbp_projects);
    }

    for site in nginx_sites {
        // The label is the same for every upstream port of a site.
        let listens = if site.listen_ports.is_empty() {
            "-".to_string()
        } else {
            let rendered: Vec<String> = site.listen_ports.iter().map(|p| p.to_string()).collect();
            rendered.join(",")
        };
        let label = format!("{} (listen {})", site.domain, listens);
        for upstream in site.upstream_ports {
            rows.entry(upstream).or_default().nginx.push(label.clone());
        }
    }

    for (port, projects) in xbp_ports {
        rows.entry(port).or_default().projects.extend(projects);
    }

    let rule = "-".repeat(110);
    println!("\nReconciled Ports");
    println!("{}", rule);
    println!(
        "{:<8} {:<8} {:<18} {:<34} XBP PROJECTS",
        "PORT", "ACTIVE", "PIDS", "NGINX"
    );
    println!("{}", rule);

    let requested = port_filter.and_then(|port| port.parse::<u16>().ok());
    // Keep port order within each half, XBP-owned rows first.
    let (xbp_rows, other_rows): (Vec<_>, Vec<_>) = rows
        .into_iter()
        .filter(|(port, _)| !matches!(requested, Some(wanted) if wanted != *port))
        .partition(|(_, row)| row.is_xbp());

    for (port, row) in xbp_rows.into_iter().chain(other_rows) {
        let active_label = if row.active { "yes" } else { "no" };
        let line = format!(
            "{:<8} {:<8} {:<18} {:<34} {}",
            port,
            active_label,
            join_strings(&row.pids),
            join_strings(&row.nginx),
            join_strings(&row.projects),
        );
        if row.is_xbp() {
            println!("{}", line.bright_magenta());
        } else {
            println!("{}", line);
        }
    }
    println!("{}", rule);
    Ok(())
}
/// Maps each port declared by a known XBP project to the names that claim it.
///
/// For every known project whose config can be located, read and parsed, the
/// project's own port is recorded under the project name and each service
/// port under `<project>/<service>`. Projects whose config is missing,
/// unreadable or unparsable are skipped. Each name list is sorted and
/// de-duplicated.
fn collect_xbp_project_ports() -> BTreeMap<u16, Vec<String>> {
    let mut by_port: BTreeMap<u16, Vec<String>> = BTreeMap::new();

    for project in collect_known_xbp_projects() {
        let found = match find_xbp_config_upwards(&project.root) {
            Some(location) => location,
            None => continue,
        };
        let content = match fs::read_to_string(&found.config_path) {
            Ok(text) => text,
            Err(_) => continue,
        };
        let config = match parse_config_with_auto_heal::<XbpConfig>(&content, found.kind) {
            Ok((parsed, _)) => parsed,
            Err(_) => continue,
        };

        by_port
            .entry(config.port)
            .or_default()
            .push(project.name.clone());

        // An absent service list simply contributes nothing.
        for service in config.services.into_iter().flatten() {
            let label = format!("{}/{}", project.name, service.name);
            by_port.entry(service.port).or_default().push(label);
        }
    }

    by_port.values_mut().for_each(|names| {
        names.sort();
        names.dedup();
    });
    by_port
}
/// Renders `values` as a sorted, duplicate-free, `", "`-separated list.
///
/// An empty slice is rendered as `"-"`.
fn join_strings(values: &[String]) -> String {
    if values.is_empty() {
        return "-".to_string();
    }
    let mut unique: Vec<&str> = values.iter().map(String::as_str).collect();
    unique.sort_unstable();
    unique.dedup();
    unique.join(", ")
}
/// One row of the reconciled ports table.
#[derive(Debug, Default)]
struct PortRow {
    /// Whether the port was reported as currently listening.
    active: bool,
    /// PIDs (as text) attached to the port.
    pids: Vec<String>,
    /// NGINX site labels that proxy to this port.
    nginx: Vec<String>,
    /// XBP project (or `project/service`) names associated with the port.
    projects: Vec<String>,
}

impl PortRow {
    /// Returns `true` when at least one XBP project is associated with the row.
    fn is_xbp(&self) -> bool {
        self.projects.first().is_some()
    }
}