use super::error::{DockerServiceError, DockerServiceResult};
use client_core::constants::docker::DOCKER_SOCKET_PATH;
use ducker::docker::{container::DockerContainer, util::new_local_docker_connection};
use nom::{
IResult, Parser,
branch::alt,
bytes::complete::{tag, take_until, take_while1},
character::complete::char,
combinator::map,
multi::many0,
sequence::{delimited, pair},
};
use serde_yaml::Value;
use std::collections::HashMap;
use std::env;
use std::fs;
use std::net::{SocketAddr, TcpListener};
use std::path::Path;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone)]
pub struct PortMapping {
pub host_port: u16,
pub container_port: u16,
pub protocol: String,
pub service_name: String,
}
#[derive(Debug)]
pub struct PortConflictReport {
pub conflicted_ports: Vec<PortConflict>,
pub total_checked: usize,
pub has_conflicts: bool,
}
#[derive(Debug)]
pub struct PortConflict {
pub port: u16,
pub service_name: String,
pub mapping: String,
}
#[derive(Debug, Clone)]
enum VarExpansion {
Text(String),
Variable(String),
VariableWithDefault(String, String),
}
fn var_name(input: &str) -> IResult<&str, &str> {
take_while1(|c: char| c.is_alphanumeric() || c == '_' || c == '-').parse(input)
}
fn parse_braced_var(input: &str) -> IResult<&str, VarExpansion> {
map(delimited(tag("${"), var_name, char('}')), |var_name| {
VarExpansion::Variable(var_name.to_string())
})
.parse(input)
}
fn parse_braced_var_with_default(input: &str) -> IResult<&str, VarExpansion> {
map(
delimited(
tag("${"),
pair(var_name, pair(tag(":-"), take_until("}"))),
char('}'),
),
|(var_name, (_, default_value))| {
VarExpansion::VariableWithDefault(var_name.to_string(), default_value.to_string())
},
)
.parse(input)
}
fn parse_simple_var(input: &str) -> IResult<&str, VarExpansion> {
map(pair(char('$'), var_name), |(_, var_name)| {
VarExpansion::Variable(var_name.to_string())
})
.parse(input)
}
fn parse_text(input: &str) -> IResult<&str, VarExpansion> {
map(take_while1(|c: char| c != '$'), |text: &str| {
VarExpansion::Text(text.to_string())
})
.parse(input)
}
fn parse_dollar(input: &str) -> IResult<&str, VarExpansion> {
map(char('$'), |_| VarExpansion::Text("$".to_string())).parse(input)
}
fn parse_env_string(input: &str) -> IResult<&str, Vec<VarExpansion>> {
many0(alt((
parse_braced_var_with_default, parse_braced_var, parse_simple_var, parse_text, parse_dollar, )))
.parse(input)
}
#[derive(Debug, Clone)]
pub struct PortManager {
reserved_ports: Vec<u16>,
env_vars: HashMap<String, String>,
}
impl PortManager {
pub fn new() -> Self {
Self {
reserved_ports: Vec::new(),
env_vars: HashMap::new(),
}
}
pub fn load_env_file(&mut self, env_file_path: &Path) -> DockerServiceResult<()> {
if !env_file_path.exists() {
warn!(
".env file does not exist: {path}, skip loading env vars",
path = env_file_path.display()
);
return Ok(());
}
info!(
"Start loading env file: {path}",
path = env_file_path.display()
);
let content = fs::read_to_string(env_file_path).map_err(|e| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.read_env_file_failed",
path = env_file_path.display(),
error = e.to_string()
)
))
})?;
info!(
"Read .env content successfully ({count} chars)",
count = content.len()
);
self.env_vars.clear();
for (line_num, line) in content.lines().enumerate() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if let Some((key, value)) = line.split_once('=') {
let key = key.trim().to_string();
let value = value.trim();
let value = if (value.starts_with('"') && value.ends_with('"'))
|| (value.starts_with('\'') && value.ends_with('\''))
{
&value[1..value.len() - 1]
} else {
value
};
self.env_vars.insert(key.clone(), value.to_string());
info!(
"Line {line}: loaded env var: {key} = {value}",
line = line_num + 1,
key = key,
value = value
);
} else {
warn!(
"Line {line}: invalid env var format: {value}",
line = line_num + 1,
value = line
);
}
}
info!(
"Env loading completed: {count} variables",
count = self.env_vars.len()
);
info!(
"Loaded env var list: {vars}",
vars = format!("{:?}", self.env_vars)
);
Ok(())
}
fn expand_env_vars(&self, input: &str) -> String {
match parse_env_string(input) {
Ok((remaining, expansions)) => {
let mut result = String::new();
for expansion in expansions {
match expansion {
VarExpansion::Text(text) => {
result.push_str(&text);
}
VarExpansion::Variable(var_name) => {
if let Some(value) = self.env_vars.get(&var_name) {
result.push_str(value);
} else if let Ok(value) = env::var(&var_name) {
result.push_str(&value);
} else {
warn!("Environment variable {key} is undefined", key = var_name);
result.push_str(&format!("${{{var_name}}}"));
}
}
VarExpansion::VariableWithDefault(var_name, default_value) => {
if let Some(value) = self.env_vars.get(&var_name) {
result.push_str(value);
} else if let Ok(value) = env::var(&var_name) {
result.push_str(&value);
} else {
debug!(
"Environment variable {key} is undefined, using default: {default}",
key = var_name,
default = default_value
);
result.push_str(&default_value);
}
}
}
}
if !remaining.is_empty() {
result.push_str(remaining);
}
result
}
Err(_) => {
warn!(
"Env parse failed, return raw string: {value}",
value = input
);
input.to_string()
}
}
}
pub fn is_port_available(&self, port: u16) -> bool {
if self.reserved_ports.contains(&port) {
return false;
}
match TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))) {
Ok(listener) => {
drop(listener);
true
}
Err(_) => {
match TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], port))) {
Ok(listener) => {
drop(listener);
warn!(
"Port {port} can only bind to 127.0.0.1, permission may be restricted",
port = port
);
true
}
Err(_) => {
false
}
}
}
}
}
#[allow(dead_code)]
pub fn get_available_port(&self, preferred_port: u16) -> DockerServiceResult<u16> {
if self.is_port_available(preferred_port) {
Ok(preferred_port)
} else {
for port in (preferred_port + 1)..=(preferred_port + 100) {
if self.is_port_available(port) {
return Ok(port);
}
}
Err(DockerServiceError::Configuration(format!(
"{}",
t!("port_manager.no_available_port_from", port = preferred_port)
)))
}
}
#[allow(dead_code)]
pub fn reserve_port(&mut self, port: u16) {
if !self.reserved_ports.contains(&port) {
self.reserved_ports.push(port);
}
}
pub async fn parse_compose_ports(
&mut self,
compose_file_path: &Path,
) -> DockerServiceResult<Vec<PortMapping>> {
info!(
"Start parsing docker-compose ports: {path}",
path = compose_file_path.display()
);
if self.env_vars.is_empty() {
if let Some(parent_dir) = compose_file_path.parent() {
let env_file = parent_dir.join(".env");
if env_file.exists() {
info!(
"Env cache is empty, loading .env: {path}",
path = env_file.display()
);
if let Err(e) = self.load_env_file(&env_file) {
error!(
"Failed to load env file in parse_compose_ports: {error}",
error = e.to_string()
);
return Err(e);
}
} else {
warn!(
"Env cache is empty, but .env not found: {path}",
path = env_file.display()
);
}
}
} else {
info!(
"Env cache not empty ({count} vars), skip loading .env",
count = self.env_vars.len()
);
}
let content = std::fs::read_to_string(compose_file_path).map_err(|e| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.read_compose_file_failed",
path = compose_file_path.display(),
error = e.to_string()
)
))
})?;
let yaml: Value = serde_yaml::from_str(&content).map_err(|e| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.parse_compose_file_failed",
error = e.to_string()
)
))
})?;
let mut port_mappings = Vec::new();
if let Some(services) = yaml.get("services").and_then(|s| s.as_mapping()) {
for (service_name, service_config) in services {
let service_name = service_name.as_str().unwrap_or("unknown").to_string();
if let Some(ports) = service_config.get("ports").and_then(|p| p.as_sequence()) {
for port_def in ports {
if let Some(port_mapping) =
self.parse_port_definition(port_def, &service_name)?
{
port_mappings.push(port_mapping);
}
}
}
}
}
info!(
"Parse completed, found {count} port mappings",
count = port_mappings.len()
);
Ok(port_mappings)
}
fn parse_port_definition(
&self,
port_def: &Value,
service_name: &str,
) -> DockerServiceResult<Option<PortMapping>> {
match port_def {
Value::String(port_str) => {
info!(
"Parse port definition (raw): {value} (service: {service})",
value = port_str,
service = service_name
);
debug!(
"Current env var cache: {vars}",
vars = format!("{:?}", self.env_vars)
);
let port_str = self.expand_env_vars(port_str.trim());
info!(
"Parse port definition (expanded): {value} (service: {service})",
value = port_str,
service = service_name
);
let port_str = port_str.trim();
let (port_part, protocol) = if port_str.contains('/') {
let parts: Vec<&str> = port_str.split('/').collect();
(parts[0], parts.get(1).unwrap_or(&"tcp").to_string())
} else {
(port_str, "tcp".to_string())
};
let ports: Vec<&str> = port_part.split(':').collect();
match ports.len() {
2 => {
let host_port = ports[0].parse::<u16>().map_err(|_| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.invalid_host_port",
port = ports[0],
raw = port_str,
service = service_name
)
))
})?;
let container_port = ports[1].parse::<u16>().map_err(|_| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.invalid_container_port",
port = ports[1],
raw = port_str,
service = service_name
)
))
})?;
Ok(Some(PortMapping {
host_port,
container_port,
protocol,
service_name: service_name.to_string(),
}))
}
3 => {
let host_port = ports[1].parse::<u16>().map_err(|_| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.invalid_host_port",
port = ports[1],
raw = port_str,
service = service_name
)
))
})?;
let container_port = ports[2].parse::<u16>().map_err(|_| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.invalid_container_port",
port = ports[2],
raw = port_str,
service = service_name
)
))
})?;
Ok(Some(PortMapping {
host_port,
container_port,
protocol,
service_name: service_name.to_string(),
}))
}
_ => {
warn!(
"Cannot parse port definition: {value} (raw: {raw}) (service: {service})",
value = port_part,
raw = port_str,
service = service_name
);
Ok(None)
}
}
}
Value::Number(port_num) => {
if let Some(port) = port_num.as_u64() {
if port <= 65535 {
Ok(None)
} else {
Err(DockerServiceError::Configuration(format!(
"{}",
t!("port_manager.port_out_of_range", port = port)
)))
}
} else {
Ok(None)
}
}
_ => {
warn!(
"Unknown port definition format: {value}",
value = format!("{:?}", port_def)
);
Ok(None)
}
}
}
pub async fn smart_check_compose_port_conflicts(
&mut self,
compose_file_path: &Path,
env_file_path: &Path,
) -> DockerServiceResult<PortConflictReport> {
info!(
"Start smart port-conflict check for docker-compose: {path}",
path = compose_file_path.display()
);
if env_file_path.exists() {
info!(
".env found, loading env vars: {path}",
path = env_file_path.display()
);
match self.load_env_file(env_file_path) {
Ok(_) => info!("✅ .env loaded successfully"),
Err(e) => {
error!("❌ Failed to load .env: {error}", error = e.to_string());
return Err(e);
}
}
} else {
warn!("❌ .env not found: {path}", path = env_file_path.display());
}
info!(
"Current loaded env var count: {count}",
count = self.env_vars.len()
);
let compose_services = self.parse_compose_services(compose_file_path)?;
info!(
"Services defined in docker-compose.yml: {services}",
services = format!("{:?}", compose_services)
);
let port_mappings = self.parse_compose_ports(compose_file_path).await?;
let mut conflicted_ports = Vec::new();
let total_checked = port_mappings.len();
let running_containers = self.get_running_containers().await;
for mapping in &port_mappings {
if !self.is_port_available(mapping.host_port) {
let is_related_service = if let Ok(containers) = &running_containers {
self.is_port_used_by_compose_service(
mapping.host_port,
containers,
&mapping.service_name,
&compose_services,
)
} else {
false
};
if is_related_service {
info!(
"Port {port} is used by related compose service (service: {service}) - expected",
port = mapping.host_port,
service = mapping.service_name
);
} else {
warn!(
"Port conflict found: port {port} is occupied by other process (service: {service})",
port = mapping.host_port,
service = mapping.service_name
);
conflicted_ports.push(PortConflict {
port: mapping.host_port,
service_name: mapping.service_name.clone(),
mapping: format!(
"{}:{}/{}",
mapping.host_port, mapping.container_port, mapping.protocol
),
});
}
} else {
debug!(
"Port {port} is available (service: {service})",
port = mapping.host_port,
service = mapping.service_name
);
}
}
let has_conflicts = !conflicted_ports.is_empty();
if has_conflicts {
error!(
"Found {count} real port conflicts, checked {total} ports",
count = conflicted_ports.len(),
total = total_checked
);
} else {
info!(
"Smart port check completed, no conflicts found, checked {total} ports",
total = total_checked
);
}
Ok(PortConflictReport {
conflicted_ports,
total_checked,
has_conflicts,
})
}
fn parse_compose_services(&self, compose_file_path: &Path) -> DockerServiceResult<Vec<String>> {
let content = std::fs::read_to_string(compose_file_path).map_err(|e| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.read_compose_file_failed",
path = compose_file_path.display(),
error = e.to_string()
)
))
})?;
let yaml: Value = serde_yaml::from_str(&content).map_err(|e| {
DockerServiceError::Configuration(format!(
"{}",
t!(
"port_manager.parse_compose_file_failed",
error = e.to_string()
)
))
})?;
let mut services = Vec::new();
if let Some(services_section) = yaml.get("services").and_then(|s| s.as_mapping()) {
for (service_name, _) in services_section {
if let Some(name) = service_name.as_str() {
services.push(name.to_string());
}
}
}
Ok(services)
}
fn is_port_used_by_compose_service(
&self,
port: u16,
containers: &[DockerContainer],
service_name: &str,
compose_services: &[String],
) -> bool {
for container in containers {
if let Some(related_service) =
self.find_related_compose_service(&container.names, compose_services)
{
if related_service == service_name {
if container.ports.contains(&port.to_string()) {
debug!(
"Port {port} is used by compose service {service} container {container}",
port = port,
service = related_service,
container = container.names
);
return true;
}
}
}
}
false
}
fn find_related_compose_service(
&self,
container_name: &str,
compose_services: &[String],
) -> Option<String> {
let container_lower = container_name.to_lowercase();
for service_name in compose_services {
let service_lower = service_name.to_lowercase();
if container_lower.contains(&service_lower) {
return Some(service_name.clone());
}
if container_lower.contains(&format!("_{service_lower}_"))
|| container_lower.contains(&format!("-{service_lower}-"))
|| container_lower.ends_with(&format!("_{service_lower}"))
|| container_lower.ends_with(&format!("-{service_lower}"))
{
return Some(service_name.clone());
}
}
None
}
async fn get_running_containers(&self) -> Result<Vec<DockerContainer>, String> {
match new_local_docker_connection(DOCKER_SOCKET_PATH, None).await {
Ok(docker) => match DockerContainer::list(&docker).await {
Ok(containers) => {
debug!(
"Successfully obtained {count} container infos",
count = containers.len()
);
Ok(containers)
}
Err(e) => {
warn!(
"Failed to get container list: {error}",
error = e.to_string()
);
Err(format!(
"{}",
t!(
"port_manager.get_containers_failed_raw",
error = e.to_string()
)
))
}
},
Err(e) => {
warn!("Failed to connect Docker: {error}", error = e.to_string());
Err(format!(
"{}",
t!(
"port_manager.connect_docker_failed_raw",
error = e.to_string()
)
))
}
}
}
pub fn print_smart_conflict_report(&self, report: &PortConflictReport) {
if report.has_conflicts {
warn!("⚠️ Real port conflicts detected!");
warn!(
"Total checked: {total} port mappings",
total = report.total_checked
);
warn!(
"Conflict count: {count}",
count = report.conflicted_ports.len()
);
warn!("Conflict details:");
for conflict in &report.conflicted_ports {
warn!(
" 🔴 Port {port} is occupied by another process",
port = conflict.port
);
warn!(" Service: {service}", service = conflict.service_name);
warn!(" Mapping: {mapping}", mapping = conflict.mapping);
}
info!("💡 Suggestions:");
info!(" 1. Stop other process occupying the port");
info!(" 2. Modify port mappings in docker-compose.yml");
info!(" 3. Use command below to check port occupancy:");
for conflict in &report.conflicted_ports {
info!(" lsof -i :{port}", port = conflict.port);
}
} else {
info!("✅ Smart port check passed, no conflicts found");
info!(
"Total checked: {total} port mappings",
total = report.total_checked
);
info!("💡 Hint: related-service occupied ports are skipped");
}
}
}
impl Default for PortManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::env;
#[test]
fn test_nom_parse_simple_text() {
let result = parse_env_string("hello world");
assert!(result.is_ok());
let (remaining, expansions) = result.unwrap();
assert_eq!(remaining, "");
assert_eq!(expansions.len(), 1);
match &expansions[0] {
VarExpansion::Text(text) => assert_eq!(text, "hello world"),
_ => panic!("应该是文本"),
}
}
#[test]
fn test_nom_parse_simple_variable() {
let result = parse_env_string("${VAR_NAME}");
assert!(result.is_ok());
let (remaining, expansions) = result.unwrap();
assert_eq!(remaining, "");
assert_eq!(expansions.len(), 1);
match &expansions[0] {
VarExpansion::Variable(var_name) => assert_eq!(var_name, "VAR_NAME"),
_ => panic!("应该是变量"),
}
}
#[test]
fn test_nom_parse_variable_with_default() {
let result = parse_env_string("${VAR_NAME:-default_value}");
assert!(result.is_ok());
let (remaining, expansions) = result.unwrap();
assert_eq!(remaining, "");
assert_eq!(expansions.len(), 1);
match &expansions[0] {
VarExpansion::VariableWithDefault(var_name, default_value) => {
assert_eq!(var_name, "VAR_NAME");
assert_eq!(default_value, "default_value");
}
_ => panic!("应该是带默认值的变量"),
}
}
#[test]
fn test_nom_parse_mixed_content() {
let result = parse_env_string("Hello ${USER}, your port is ${PORT:-8080}!");
assert!(result.is_ok());
let (remaining, expansions) = result.unwrap();
assert_eq!(remaining, "");
assert_eq!(expansions.len(), 5);
match &expansions[0] {
VarExpansion::Text(text) => assert_eq!(text, "Hello "),
_ => panic!("第一个应该是文本"),
}
match &expansions[1] {
VarExpansion::Variable(var_name) => assert_eq!(var_name, "USER"),
_ => panic!("第二个应该是变量"),
}
match &expansions[2] {
VarExpansion::Text(text) => assert_eq!(text, ", your port is "),
_ => panic!("第三个应该是文本"),
}
match &expansions[3] {
VarExpansion::VariableWithDefault(var_name, default_value) => {
assert_eq!(var_name, "PORT");
assert_eq!(default_value, "8080");
}
_ => panic!("第四个应该是带默认值的变量"),
}
match &expansions[4] {
VarExpansion::Text(text) => assert_eq!(text, "!"),
_ => panic!("第五个应该是文本(感叹号)"),
}
}
#[test]
fn test_expand_env_vars_with_nom() {
let mut port_manager = PortManager::new();
port_manager
.env_vars
.insert("TEST_VAR".to_string(), "test_value".to_string());
let result = port_manager.expand_env_vars("${TEST_VAR}");
assert_eq!(result, "test_value");
let result = port_manager.expand_env_vars("${TEST_VAR:-default}");
assert_eq!(result, "test_value");
let result = port_manager.expand_env_vars("${UNDEFINED_VAR:-8080}");
assert_eq!(result, "8080");
let result = port_manager.expand_env_vars("Value: ${TEST_VAR}, Port: ${PORT:-3000}");
assert_eq!(result, "Value: test_value, Port: 3000");
let result = port_manager.expand_env_vars("no variables here");
assert_eq!(result, "no variables here");
}
#[test]
fn test_expand_env_vars_system_env() {
let port_manager = PortManager::new();
unsafe {
env::set_var("TEST_SYSTEM_VAR", "system_value");
}
let result = port_manager.expand_env_vars("${TEST_SYSTEM_VAR}");
assert_eq!(result, "system_value");
unsafe {
env::remove_var("TEST_SYSTEM_VAR");
}
}
#[test]
fn test_expand_env_vars_undefined_variable() {
let port_manager = PortManager::new();
let result = port_manager.expand_env_vars("${UNDEFINED_VAR}");
assert_eq!(result, "${UNDEFINED_VAR}");
}
}