use std::io::{self, BufRead, BufReader, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::process::Command;
use std::time::{Duration, Instant};
use colored::Colorize;
use crate::utils::safe_truncate;
use rayon::prelude::*;
use serde::Serialize;
use thiserror::Error;
/// Errors returned by the port listing and reporting functions in this file.
#[derive(Error, Debug)]
pub enum PortsError {
/// Listing ports failed; the payload is the reason interpolated into the
/// "Failed to list ports" message. In this file it carries command spawn
/// errors, non-zero exit notices, the unsupported-platform notice, and the
/// "no process listening on port" notice produced by `run`.
#[error("Failed to list ports: {0}")]
CommandFailed(String),
/// An I/O error; `#[from]` provides the `From<io::Error>` conversion.
/// `run` also wraps JSON serialization failures in this variant.
#[error("IO error: {0}")]
Io(#[from] io::Error),
}
/// One listening socket as reported by the platform-specific listing commands.
#[derive(Debug, Serialize, Clone)]
pub struct PortEntry {
/// Local port number the socket listens on.
pub port: u16,
/// Protocol label: the first `netstat` column on Windows, the literal
/// "TCP" on Linux and macOS.
pub proto: String,
/// Owning process id; the parsers in this file fall back to 0 when it
/// cannot be parsed.
pub pid: u32,
/// Owning process name; "—" when the parsers cannot determine it.
pub process: String,
/// Local address part of the listening endpoint (text before the last ':').
pub address: String,
}
/// Result of probing a single TCP port with `quick_scan` or `scan_range`.
#[derive(Debug, Serialize, Clone)]
pub struct QuickScanEntry {
/// Port number that was probed.
pub port: u16,
/// Whether a TCP connection to the port succeeded within the probe timeout.
pub open: bool,
/// Service label: taken from the scan table or `service_hint`, or
/// "port-<n>" when `scan_range` has no hint for the port.
pub service: String,
/// Elapsed milliseconds for the probe; the scanners only set it when
/// `open` is true.
pub latency_ms: Option<u64>,
/// Banner text collected by `grab_banner`, if the port was open and the
/// peer answered.
pub banner: Option<String>,
}
/// `(port, service label)` pairs probed by `quick_scan`; the label is copied
/// into `QuickScanEntry::service` for the matching port.
const QUICK_SCAN_PORTS: &[(u16, &str)] = &[
(22, "ssh"),
(80, "http"),
(443, "https"),
(3000, "dev-server"),
(3306, "mysql"),
(5000, "flask"),
(5173, "vite"),
(5432, "postgres"),
(6379, "redis"),
(8000, "django"),
(8080, "http-alt"),
(8443, "https-alt"),
(8888, "jupyter"),
(9090, "prometheus"),
(9200, "elasticsearch"),
(27017, "mongodb"),
];
/// Connect timeout applied to each address tried while probing a port.
const PROBE_TIMEOUT: Duration = Duration::from_millis(200);

/// Probes every port in [`QUICK_SCAN_PORTS`] on `host` in parallel.
///
/// Returns one [`QuickScanEntry`] per table row, in table order, labelled
/// with the service name from the table.
pub fn quick_scan(host: &str) -> Vec<QuickScanEntry> {
    QUICK_SCAN_PORTS
        .par_iter()
        .map(|&(port, service)| probe_port(host, port, service.to_string()))
        .collect()
}

/// Probes every port in `start_port..=end_port` on `host` in parallel.
///
/// Entries are returned in ascending port order. Ports without a
/// [`service_hint`] are labelled `port-<n>`. An inverted range
/// (`start_port > end_port`) is empty and yields an empty vector.
pub fn scan_range(host: &str, start_port: u16, end_port: u16) -> Vec<QuickScanEntry> {
    let ports: Vec<u16> = (start_port..=end_port).collect();
    ports
        .par_iter()
        .map(|&port| {
            let service = service_hint(port)
                .map_or_else(|| format!("port-{port}"), |name| name.to_string());
            probe_port(host, port, service)
        })
        .collect()
}

/// Probes one TCP port on `host` and builds its scan entry.
///
/// `latency_ms` and `banner` are only populated when the port is open.
fn probe_port(host: &str, port: u16, service: String) -> QuickScanEntry {
    let target = format!("{host}:{port}");
    // The clock starts before name resolution, so the reported latency covers
    // resolution plus every connection attempt made for this port.
    let started = Instant::now();
    // A name such as `localhost` may resolve to several addresses (for
    // example `::1` and `127.0.0.1`); the port is open if any of them accepts.
    let open = target
        .to_socket_addrs()
        .map(|mut addrs| {
            addrs.any(|addr| TcpStream::connect_timeout(&addr, PROBE_TIMEOUT).is_ok())
        })
        .unwrap_or(false);
    let latency_ms =
        open.then(|| u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX));
    let banner = if open { grab_banner(host, port) } else { None };
    QuickScanEntry {
        port,
        open,
        service,
        latency_ms,
        banner,
    }
}
fn grab_banner(host: &str, port: u16) -> Option<String> {
let addr_str = format!("{host}:{port}");
let addr = addr_str.to_socket_addrs().ok()?.next()?;
let stream = TcpStream::connect_timeout(&addr, Duration::from_millis(500)).ok()?;
stream
.set_read_timeout(Some(Duration::from_millis(500)))
.ok()?;
stream
.set_write_timeout(Some(Duration::from_millis(500)))
.ok()?;
let mut stream = stream;
match port {
80 | 3000 | 4200 | 4321 | 5000 | 5173 | 5500 | 8000 | 8080 | 8443 | 8888 | 9090 => {
let req = format!("HEAD / HTTP/1.0\r\nHost: {host}\r\n\r\n");
if stream.write_all(req.as_bytes()).is_err() {
return None;
}
let _ = stream.flush();
let mut reader = BufReader::new(&stream);
let mut first_line = String::new();
if reader.read_line(&mut first_line).is_ok() && !first_line.is_empty() {
let mut server = None;
for _ in 0..15 {
let mut header_line = String::new();
if reader.read_line(&mut header_line).is_err() || header_line.trim().is_empty()
{
break;
}
if let Some(val) = header_line.strip_prefix("Server: ") {
server = Some(val.trim().to_string());
break;
}
let lower = header_line.to_lowercase();
if lower.starts_with("server:") {
let val = header_line[7..].trim().to_string();
server = Some(val);
break;
}
}
let status = first_line.trim().to_string();
match server {
Some(srv) => Some(format!("{status} | {srv}")),
None => Some(status),
}
} else {
None
}
}
6379 => {
if stream.write_all(b"PING\r\n").is_err() {
return None;
}
let _ = stream.flush();
let mut reader = BufReader::new(&stream);
let mut line = String::new();
if reader.read_line(&mut line).is_ok() && !line.is_empty() {
Some(format!("Redis: {}", line.trim()))
} else {
None
}
}
_ => {
let mut reader = BufReader::new(&stream);
let mut line = String::new();
if reader.read_line(&mut line).is_ok() && !line.is_empty() {
let trimmed = line.trim().to_string();
Some(safe_truncate(&trimmed, 120))
} else {
None
}
}
}
}
/// Runs `quick_scan` against `host` and returns its entries unchanged.
pub fn collect_quick_scan(host: &str) -> Vec<QuickScanEntry> {
quick_scan(host)
}
/// Looks up a short service label for a well-known TCP port.
///
/// Returns `None` when `port` is not present in the built-in table.
fn service_hint(port: u16) -> Option<&'static str> {
    const KNOWN_SERVICES: &[(u16, &str)] = &[
        (21, "ftp"),
        (22, "ssh"),
        (23, "telnet"),
        (25, "smtp"),
        (53, "dns"),
        (80, "http"),
        (110, "pop3"),
        (143, "imap"),
        (443, "https"),
        (445, "smb"),
        (993, "imaps"),
        (995, "pop3s"),
        (1433, "mssql"),
        (1521, "oracle"),
        (2375, "docker"),
        (2376, "docker-tls"),
        (3000, "dev-server"),
        (3306, "mysql"),
        (4200, "angular"),
        (4321, "astro"),
        (5000, "flask"),
        (5173, "vite"),
        (5432, "postgres"),
        (5500, "live-server"),
        (5672, "rabbitmq"),
        (6379, "redis"),
        (8000, "django"),
        (8080, "http-alt"),
        (8443, "https-alt"),
        (8888, "jupyter"),
        (9090, "prometheus"),
        (9200, "elasticsearch"),
        (9418, "git-daemon"),
        (15672, "rabbitmq-mgmt"),
        (27017, "mongodb"),
    ];

    KNOWN_SERVICES
        .iter()
        .find(|&&(known, _)| known == port)
        .map(|&(_, label)| label)
}
/// Public wrapper around the private `service_hint` lookup; returns the same
/// label (or `None`) for `port`.
pub fn service_hint_pub(port: u16) -> Option<&'static str> {
service_hint(port)
}
/// Returns the listening ports produced by `list_ports`, propagating its error.
pub fn collect_ports() -> Result<Vec<PortEntry>, PortsError> {
list_ports()
}
/// Dispatches to the listing implementation for the compile-time target OS.
///
/// Exactly one of the `cfg`-gated blocks below survives compilation and
/// becomes the function's tail expression. On targets other than Windows,
/// Linux and macOS it returns `PortsError::CommandFailed("Unsupported platform")`.
fn list_ports() -> Result<Vec<PortEntry>, PortsError> {
#[cfg(target_os = "windows")]
{
list_ports_windows()
}
#[cfg(target_os = "linux")]
{
list_ports_linux()
}
#[cfg(target_os = "macos")]
{
list_ports_macos()
}
#[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
{
Err(PortsError::CommandFailed(
"Unsupported platform".to_string(),
))
}
}
/// Lists listening TCP ports on Windows by parsing `netstat -ano`.
///
/// Only rows whose state column is `LISTENING` are kept, and each port is
/// reported once (the first row seen wins). Process names are resolved from
/// the PID through `sysinfo`. Entries are sorted by port.
///
/// # Errors
///
/// Returns [`PortsError::CommandFailed`] when `netstat` cannot be spawned or
/// exits unsuccessfully.
#[cfg(target_os = "windows")]
fn list_ports_windows() -> Result<Vec<PortEntry>, PortsError> {
    use sysinfo::System;

    let output = Command::new("netstat")
        .args(["-ano"])
        .output()
        .map_err(|e| PortsError::CommandFailed(e.to_string()))?;
    if !output.status.success() {
        return Err(PortsError::CommandFailed("netstat failed".to_string()));
    }
    let stdout = String::from_utf8_lossy(&output.stdout);

    let mut sys = System::new();
    sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);

    let mut seen_ports = std::collections::HashSet::new();
    let mut entries = Vec::new();
    for line in stdout.lines() {
        // Expected columns: Proto, Local Address, Foreign Address, State, PID.
        let parts: Vec<&str> = line.split_whitespace().collect();
        if parts.len() < 5 || parts[3] != "LISTENING" {
            continue;
        }
        let Some((addr, port_str)) = parts[1].rsplit_once(':') else {
            continue;
        };
        let Ok(port) = port_str.parse::<u16>() else {
            continue;
        };
        if !seen_ports.insert(port) {
            continue;
        }
        let pid = parts[4].parse::<u32>().unwrap_or(0);
        entries.push(PortEntry {
            port,
            proto: parts[0].to_string(),
            pid,
            process: resolve_process_name(&sys, pid),
            // IPv6 endpoints are printed as `[addr]:port`; drop the brackets
            // so the address matches what the Linux parser reports.
            address: addr
                .trim_start_matches('[')
                .trim_end_matches(']')
                .to_string(),
        });
    }
    entries.sort_by_key(|e| e.port);
    Ok(entries)
}
/// Resolves `pid` to a process name using the already-refreshed `sys`
/// snapshot, returning "—" when the snapshot has no such process.
#[cfg(target_os = "windows")]
fn resolve_process_name(sys: &sysinfo::System, pid: u32) -> String {
    match sys.process(sysinfo::Pid::from_u32(pid)) {
        Some(found) => found.name().to_string_lossy().into_owned(),
        None => "—".to_string(),
    }
}
/// Lists listening TCP ports on Linux by parsing `ss -tlnp`.
///
/// The header row is skipped. For each remaining row the local endpoint
/// (4th column) is split into address and port, and the process column, when
/// present, is parsed with `extract_linux_process`. Entries are sorted by port.
///
/// # Errors
///
/// Returns [`PortsError::CommandFailed`] when `ss` cannot be spawned or exits
/// unsuccessfully.
#[cfg(target_os = "linux")]
fn list_ports_linux() -> Result<Vec<PortEntry>, PortsError> {
    let output = Command::new("ss")
        .args(["-tlnp"])
        .output()
        .map_err(|e| PortsError::CommandFailed(e.to_string()))?;
    if !output.status.success() {
        return Err(PortsError::CommandFailed("ss command failed".to_string()));
    }
    let stdout = String::from_utf8_lossy(&output.stdout);

    let mut entries = Vec::new();
    for line in stdout.lines().skip(1) {
        let parts: Vec<&str> = line.split_whitespace().collect();
        if parts.len() < 5 {
            continue;
        }
        let Some((addr, port_str)) = parts[3].rsplit_once(':') else {
            continue;
        };
        let Ok(port) = port_str.parse::<u16>() else {
            continue;
        };
        let (process, pid) = if parts.len() > 5 {
            // A process name containing spaces is split across several
            // whitespace-separated fields; rejoin everything after the peer
            // column so the name and `pid=` stay in one parsable string.
            extract_linux_process(&parts[5..].join(" "))
        } else {
            ("—".to_string(), 0)
        };
        entries.push(PortEntry {
            port,
            proto: "TCP".to_string(),
            pid,
            process,
            // IPv6 endpoints are printed as `[addr]:port`; report the bare address.
            address: addr
                .trim_start_matches('[')
                .trim_end_matches(']')
                .to_string(),
        });
    }
    entries.sort_by_key(|e| e.port);
    Ok(entries)
}
/// Extracts the process name and PID from an `ss` process field such as
/// `users:(("nginx",pid=1234,fd=6))`.
///
/// The name is the text between the first `(("` and the following `"`; the
/// PID is the run of ASCII digits after the first `pid=`. Missing or
/// unparsable parts fall back to "—" and 0 respectively.
#[cfg(target_os = "linux")]
fn extract_linux_process(field: &str) -> (String, u32) {
    let name = field
        .split_once("((\"")
        .and_then(|(_, rest)| rest.split_once('"'))
        .map_or_else(|| "—".to_string(), |(name, _)| name.to_string());

    let pid = field
        .split_once("pid=")
        .map(|(_, rest)| {
            // The digits may run to the end of the field; treat the end of the
            // string as the terminator instead of discarding the PID.
            let digits_end = rest
                .find(|c: char| !c.is_ascii_digit())
                .unwrap_or(rest.len());
            rest[..digits_end].parse().unwrap_or(0)
        })
        .unwrap_or(0);

    (name, pid)
}
/// Lists listening TCP ports on macOS by parsing `lsof -iTCP -sTCP:LISTEN -n -P`.
///
/// The header row is skipped. For each remaining row the command name and
/// PID come from the first two columns, and the endpoint is the first field
/// from the 9th column onward that looks like `<addr>:<port>`. A `*` address
/// is reported as `0.0.0.0`. Entries are sorted by port.
///
/// # Errors
///
/// Returns [`PortsError::CommandFailed`] when `lsof` cannot be spawned or
/// fails for a reason other than "nothing matched".
#[cfg(target_os = "macos")]
fn list_ports_macos() -> Result<Vec<PortEntry>, PortsError> {
    let output = Command::new("lsof")
        .args(["-iTCP", "-sTCP:LISTEN", "-n", "-P"])
        .output()
        .map_err(|e| PortsError::CommandFailed(e.to_string()))?;
    if !output.status.success() {
        // lsof exits with status 1 when nothing matched the selection, which
        // simply means there are no listeners. Only treat the run as "no
        // results" when it was also completely silent; anything else is still
        // reported as a failure.
        let silent = |bytes: &[u8]| bytes.iter().all(u8::is_ascii_whitespace);
        if output.status.code() == Some(1) && silent(&output.stdout) && silent(&output.stderr) {
            return Ok(Vec::new());
        }
        return Err(PortsError::CommandFailed("lsof failed".to_string()));
    }
    let stdout = String::from_utf8_lossy(&output.stdout);

    let mut entries = Vec::new();
    for line in stdout.lines().skip(1) {
        let parts: Vec<&str> = line.split_whitespace().collect();
        if parts.len() < 9 {
            continue;
        }
        let pid = parts[1].parse::<u32>().unwrap_or(0);
        // First trailing field that splits into `<addr>:<numeric port>`.
        let endpoint = parts[8..].iter().find_map(|part| {
            let (addr, port_str) = part.rsplit_once(':')?;
            let port = port_str.parse::<u16>().ok()?;
            Some((addr, port))
        });
        let Some((addr, port)) = endpoint else {
            continue;
        };
        let address = if addr == "*" {
            "0.0.0.0".to_string()
        } else {
            // IPv6 endpoints are printed as `[addr]:port`; drop the brackets
            // so the address matches what the Linux parser reports.
            addr.trim_start_matches('[')
                .trim_end_matches(']')
                .to_string()
        };
        entries.push(PortEntry {
            port,
            proto: "TCP".to_string(),
            pid,
            process: parts[0].to_string(),
            address,
        });
    }
    entries.sort_by_key(|e| e.port);
    Ok(entries)
}
/// Lists listening ports and prints them as pretty JSON or as a text table.
///
/// * `port_filter` — when set, only entries for that port are kept.
/// * `json` — when true, prints the entries as pretty-printed JSON instead of
///   the table.
///
/// # Errors
///
/// * Propagates any error from `list_ports`.
/// * Returns [`PortsError::Io`] when JSON serialization fails.
/// * Returns [`PortsError::CommandFailed`] when `port_filter` is set and no
///   entry matches it. The output (an empty JSON array, or the "no process"
///   message) is still printed before the error is returned.
pub fn run(port_filter: Option<u16>, json: bool) -> Result<(), PortsError> {
    let mut entries = list_ports()?;
    if let Some(wanted) = port_filter {
        entries.retain(|entry| entry.port == wanted);
    }

    // A filter that matched nothing is reported as an error, but only after
    // the regular output for the chosen format has been printed.
    let outcome = match port_filter {
        Some(wanted) if entries.is_empty() => Err(PortsError::CommandFailed(format!(
            "No process listening on port {wanted}"
        ))),
        _ => Ok(()),
    };

    if json {
        let json_str = serde_json::to_string_pretty(&entries)
            .map_err(|e| PortsError::Io(io::Error::other(e)))?;
        println!("{json_str}");
        return outcome;
    }

    println!();
    println!(
        " {} {} {} {} {}",
        "devpulse".bold(),
        "──".dimmed(),
        "Ports".bold(),
        "──".dimmed(),
        "Listening".dimmed()
    );
    println!();

    if entries.is_empty() {
        match port_filter {
            Some(p) => println!(" No process listening on port {p}."),
            None => println!(" No listening ports found."),
        }
        println!();
        return outcome;
    }

    println!(
        " {:<8} {:<8} {:<8} {:<17} {}",
        "Port".bold(),
        "Proto".bold(),
        "PID".bold(),
        "Process".bold(),
        "Address".bold()
    );
    println!(" {}", "─".repeat(55).dimmed());
    for entry in &entries {
        // Keep the styled value as-is instead of rendering it to a `String`
        // first: a pre-rendered string contains the ANSI escape sequences, so
        // the `{:<8}` width would count them and the column would not line up
        // with the header when colors are enabled.
        let port_cell = entry.port.to_string().bold().white();
        let hint = service_hint(entry.port)
            .map(|h| format!(" ({})", h.dimmed()))
            .unwrap_or_default();
        println!(
            " {:<8} {:<8} {:<8} {:<17} {}{}",
            port_cell, entry.proto, entry.pid, entry.process, entry.address, hint,
        );
    }
    println!();
    println!(
        " {} listening port(s) found",
        entries.len().to_string().bold()
    );
    println!();
    Ok(())
}
// Unit tests for the service table, serialization of the entry types, and a
// live quick scan against the loopback address.
#[cfg(test)]
mod tests {
use super::*;
// Ports present in the `service_hint` table map to their labels.
#[test]
fn test_service_hint_known_ports() {
assert_eq!(service_hint(80), Some("http"));
assert_eq!(service_hint(443), Some("https"));
assert_eq!(service_hint(3000), Some("dev-server"));
assert_eq!(service_hint(5432), Some("postgres"));
}
// A port missing from the table yields `None`.
#[test]
fn test_service_hint_unknown_port() {
assert_eq!(service_hint(12345), None);
}
// `PortEntry` serializes with its field names as JSON keys.
#[test]
fn test_port_entry_serialization() {
let entry = PortEntry {
port: 8080,
proto: "TCP".to_string(),
pid: 1234,
process: "node".to_string(),
address: "127.0.0.1".to_string(),
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"port\":8080"));
assert!(json.contains("\"process\":\"node\""));
}
// Performs real connection attempts to 127.0.0.1: one entry per table row,
// latency only on open ports, and no latency or banner on closed ones.
#[test]
fn test_quick_scan_returns_all_ports() {
let results = quick_scan("127.0.0.1");
assert_eq!(results.len(), QUICK_SCAN_PORTS.len());
for entry in &results {
assert!(!entry.service.is_empty());
if entry.open {
assert!(entry.latency_ms.is_some());
} else {
assert!(entry.latency_ms.is_none());
assert!(entry.banner.is_none());
}
}
}
// `QuickScanEntry` serializes with its field names as JSON keys.
#[test]
fn test_quick_scan_entry_serialization() {
let entry = QuickScanEntry {
port: 8080,
open: true,
service: "http-alt".to_string(),
latency_ms: Some(2),
banner: Some("HTTP/1.1 200 OK | nginx".to_string()),
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"open\":true"));
assert!(json.contains("\"service\":\"http-alt\""));
assert!(json.contains("\"banner\""));
}
}