use crate::agent::log_line::LogLine;
use crate::agent::task_logger::TaskLogger;
use crate::context::ContainerEngine;
use crate::context::ResolvedProxyConfig;
use crate::context::docker_client::DockerClient;
use crate::context::tsk_env::TskEnv;
use crate::tui::events::{ServerEvent, ServerEventSender};
use anyhow::{Context, Result};
use bollard::models::{ContainerCreateBody, HostConfig};
use bollard::query_parameters::RemoveContainerOptions;
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
/// Name prefix shared by every per-task agent network; also used to recognise
/// agent networks when counting the proxy's connections.
const TSK_AGENT_NETWORK_PREFIX: &str = "tsk-agent-";
/// Tag given to the proxy image when it is built and used when the proxy container is created.
const PROXY_IMAGE: &str = "tsk/proxy";
/// Port/protocol entry listed in the proxy container's exposed ports.
const PROXY_PORT: &str = "3128/tcp";
/// Serializes proxy lifecycle operations per proxy-configuration fingerprint.
///
/// `with_lock` takes an in-process async mutex keyed by fingerprint and then an
/// exclusive `flock` on a lock file located under `runtime_dir`.
#[derive(Clone)]
pub struct ProxySyncManager {
/// Lazily populated map from fingerprint to the in-process mutex guarding it.
fingerprint_locks: Arc<RwLock<HashMap<String, Arc<Mutex<()>>>>>,
/// Directory in which the `proxy-{fingerprint}.lock` files are created.
runtime_dir: PathBuf,
}
impl ProxySyncManager {
    /// Creates a manager with no locks yet, storing lock files under `runtime_dir`.
    fn new(runtime_dir: &Path) -> Self {
        Self {
            fingerprint_locks: Arc::new(RwLock::new(HashMap::new())),
            runtime_dir: runtime_dir.to_owned(),
        }
    }

    /// Runs `operation` while holding both the in-process mutex and the exclusive
    /// file lock associated with `fingerprint`, returning the operation's output.
    ///
    /// # Panics
    /// Panics if the lock file cannot be opened or locked.
    async fn with_lock<F, Fut, T>(&self, fingerprint: &str, operation: F) -> T
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = T>,
    {
        // In-process exclusion first, so tasks of this process queue on the async
        // mutex before attempting the file lock.
        let task_mutex = self.get_or_create_lock(fingerprint).await;
        let _guard = task_mutex.lock().await;

        let lock_path = self.lock_path(fingerprint);
        if let Some(parent) = lock_path.parent() {
            // Best effort: if the directory is missing, opening the lock file fails below.
            let _ = std::fs::create_dir_all(parent);
        }

        // The open file is kept alive until after `operation` has completed.
        let _flock_file = match acquire_flock(lock_path.clone()).await {
            Ok(file) => file,
            Err(e) => panic!(
                "Failed to acquire proxy file lock at {}: {}",
                lock_path.display(),
                e
            ),
        };

        operation().await
    }

    /// Returns the mutex registered for `fingerprint`, creating and registering it
    /// on first use.
    async fn get_or_create_lock(&self, fingerprint: &str) -> Arc<Mutex<()>> {
        // Fast path: look the mutex up under the read lock.
        if let Some(existing) = self.fingerprint_locks.read().await.get(fingerprint) {
            return Arc::clone(existing);
        }

        // Slow path: take the write lock; `entry` re-checks in case another task
        // inserted the mutex in the meantime.
        let mut registry = self.fingerprint_locks.write().await;
        let slot = registry
            .entry(fingerprint.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(())));
        Arc::clone(slot)
    }

    /// Path of the lock file used for `fingerprint`.
    fn lock_path(&self, fingerprint: &str) -> PathBuf {
        let file_name = format!("proxy-{fingerprint}.lock");
        self.runtime_dir.join(file_name)
    }
}
/// Opens (creating it if necessary) the lock file at `lock_path` and takes an
/// exclusive `flock` on it, returning the open file.
///
/// The blocking `flock` call runs on tokio's blocking pool so it does not stall the
/// async runtime. A call interrupted by a signal (`EINTR`) is retried instead of
/// being reported as a failure.
///
/// # Errors
/// Returns the underlying I/O error if the file cannot be opened, or if `flock`
/// fails for any reason other than interruption.
///
/// # Panics
/// Panics if the blocking task itself fails to complete.
async fn acquire_flock(lock_path: PathBuf) -> std::io::Result<File> {
    tokio::task::spawn_blocking(move || {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&lock_path)?;
        loop {
            // SAFETY: `file` stays open for the duration of the call, so the raw
            // descriptor is valid; `flock` does not access Rust-managed memory.
            let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
            if ret == 0 {
                return Ok(file);
            }
            let err = std::io::Error::last_os_error();
            // A signal arriving while blocked is not a real failure: wait again.
            if err.kind() != std::io::ErrorKind::Interrupted {
                return Err(err);
            }
        }
    })
    .await
    .expect("flock spawn_blocking task panicked")
}
/// Describes one task's use of a proxy: produced by `ProxyManager::acquire_proxy`
/// and passed back to `ProxyManager::release_proxy`.
pub struct ProxySession {
/// Name of the proxy container serving this session.
pub proxy_container_name: String,
/// Name of the task's agent network that the proxy was connected to.
pub network_name: String,
/// Proxy IP address on `network_name`, or `None` when it could not be resolved.
pub proxy_ip: Option<String>,
/// Proxy configuration fingerprint; used as the lock key when releasing.
fingerprint: String,
}
/// Builds, starts, shares and stops the proxy container and the per-task agent
/// networks attached to it.
pub struct ProxyManager {
/// Client used for all container, image and network operations.
docker_client: Arc<dyn DockerClient>,
/// Environment providing the runtime, config and proxy-config directories.
tsk_env: Arc<TskEnv>,
/// Container engine in use; consulted when building the proxy image.
container_engine: ContainerEngine,
/// Optional event sink handed to `emit_or_print` for status and warning events.
event_sender: Option<ServerEventSender>,
/// Per-fingerprint locking used by `acquire_proxy` and `release_proxy`.
sync_manager: ProxySyncManager,
}
impl ProxyManager {
/// Creates a proxy manager that talks to `client`, reads its directories from
/// `tsk_env`, and reports through `event_sender` when one is supplied.
pub fn new(
    client: Arc<dyn DockerClient>,
    tsk_env: Arc<TskEnv>,
    container_engine: ContainerEngine,
    event_sender: Option<ServerEventSender>,
) -> Self {
    Self {
        // Listed first: it only borrows `tsk_env`, which is moved into the struct below.
        sync_manager: ProxySyncManager::new(tsk_env.runtime_dir()),
        docker_client: client,
        tsk_env,
        container_engine,
        event_sender,
    }
}
/// Forwards `event` to `emit_or_print` together with this manager's optional event sender.
fn emit(&self, event: ServerEvent) {
crate::tui::events::emit_or_print(&self.event_sender, event);
}
/// Makes sure the proxy container for `proxy_config` is up and healthy and returns
/// its container name.
///
/// Steps, in order: build the proxy image when the container is not already
/// running, ensure the external network exists, ensure the container is created
/// and started, then wait for it to report healthy.
///
/// # Errors
/// Returns the first failing step's error with context naming that step.
pub(crate) async fn ensure_proxy(
    &self,
    proxy_config: &ResolvedProxyConfig,
    logger: &TaskLogger,
) -> Result<String> {
    let container_name = proxy_config.proxy_container_name();

    let already_running = self.is_proxy_running(&container_name).await?;
    if !already_running {
        let built = self.build_proxy(false, logger).await;
        built.context("Failed to build proxy image")?;
    }

    let external_network = proxy_config.external_network_name();
    let network_ready = self.ensure_network(&external_network).await;
    network_ready.context("Failed to ensure network exists")?;

    let container_ready = self.ensure_proxy_container(proxy_config).await;
    container_ready.context("Failed to ensure proxy container is running")?;

    let healthy = self.wait_for_proxy_health(&container_name, logger).await;
    healthy.context("Failed to wait for proxy health")?;

    Ok(container_name)
}
/// Looks up the IP address of `container_name` on `network_name` from the
/// container's inspect data (`NetworkSettings.Networks.<network>.IPAddress`).
///
/// # Errors
/// Fails when the container cannot be inspected, the inspect output is not valid
/// JSON, or the address is missing, not a string, or empty.
pub(crate) async fn resolve_proxy_ip(
    &self,
    container_name: &str,
    network_name: &str,
) -> Result<String> {
    let raw = match self.docker_client.inspect_container(container_name).await {
        Ok(raw) => raw,
        Err(e) => return Err(anyhow::anyhow!("Failed to inspect proxy container: {e}")),
    };
    let inspect: serde_json::Value =
        serde_json::from_str(&raw).context("Failed to parse proxy container inspect data")?;

    // A missing or non-string address is folded into "" and rejected with the empty case.
    let address = inspect
        .get("NetworkSettings")
        .and_then(|settings| settings.get("Networks"))
        .and_then(|networks| networks.get(network_name))
        .and_then(|network| network.get("IPAddress"))
        .and_then(serde_json::Value::as_str)
        .unwrap_or_default();

    if address.is_empty() {
        return Err(anyhow::anyhow!(
            "Could not resolve IP for proxy container '{container_name}' on network '{network_name}'"
        ));
    }
    Ok(address.to_owned())
}
/// Builds the proxy image (`PROXY_IMAGE`) from the embedded `tsk-proxy` Dockerfile.
///
/// `no_cache` is forwarded to the image build. On Podman the build uses host
/// networking. A warning is logged when a legacy `squid.conf` exists in the config
/// directory. Build output is buffered and only logged if the build stream
/// reports an error.
///
/// # Errors
/// Fails when the Dockerfile cannot be extracted, the build context cannot be
/// archived, or the build cannot be started or reports an error.
pub async fn build_proxy(&self, no_cache: bool, logger: &TaskLogger) -> Result<()> {
    use crate::assets::utils::extract_dockerfile_to_temp;
    use futures_util::StreamExt;

    logger.log(LogLine::tsk_message(format!(
        "Building proxy image: {PROXY_IMAGE}"
    )));

    let legacy_squid_conf = self.tsk_env.config_dir().join("squid.conf");
    if legacy_squid_conf.exists() {
        logger.log(LogLine::tsk_warning(format!(
            "Warning: {} is deprecated. Use squid_conf or squid_conf_path in tsk.toml instead.",
            legacy_squid_conf.display()
        )));
    }

    let dockerfile_dir = extract_dockerfile_to_temp("tsk-proxy")
        .context("Failed to extract proxy Dockerfile")?;

    // Archive first, then remove the temporary directory unconditionally, so a
    // failed archive no longer leaks the extracted files. Only afterwards is the
    // archive result propagated.
    let archive_result = self.create_tar_archive_from_directory(&dockerfile_dir);
    let _ = std::fs::remove_dir_all(&dockerfile_dir);
    let tar_archive =
        archive_result.context("Failed to create tar archive for proxy build")?;

    let mut options_builder = bollard::query_parameters::BuildImageOptionsBuilder::default()
        .dockerfile("Dockerfile")
        .t(PROXY_IMAGE)
        .nocache(no_cache);
    if self.container_engine == ContainerEngine::Podman {
        options_builder = options_builder.networkmode("host");
    }
    let options = options_builder.build();

    let mut build_stream = self
        .docker_client
        .build_image(options, tar_archive)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to build proxy image: {e}"))?;

    let mut build_output = String::new();
    while let Some(result) = build_stream.next().await {
        match result {
            Ok(line) => build_output.push_str(&line),
            Err(e) => {
                // Surface everything collected so far to help diagnose the failure.
                logger.log(LogLine::tsk_message(&build_output));
                return Err(anyhow::anyhow!("Failed to build proxy image: {e}"));
            }
        }
    }
    Ok(())
}
/// Force-removes the container `container_name`, emitting status events before and
/// after. A "no such container" error is treated as success.
///
/// # Errors
/// Returns an error for any other removal failure.
pub async fn stop_proxy(&self, container_name: &str) -> Result<()> {
    self.emit(ServerEvent::StatusMessage(format!(
        "Stopping tsk proxy container {container_name}..."
    )));

    let options = RemoveContainerOptions {
        force: true,
        ..Default::default()
    };
    let outcome = self
        .docker_client
        .remove_container(container_name, Some(options))
        .await;

    let status = match outcome {
        Ok(_) => format!("Proxy container {container_name} stopped successfully"),
        Err(e) if e.to_lowercase().contains("no such container") => {
            format!("Proxy container {container_name} was not running")
        }
        Err(e) => {
            return Err(anyhow::anyhow!(
                "Failed to stop proxy container {container_name}: {e}"
            ));
        }
    };
    self.emit(ServerEvent::StatusMessage(status));
    Ok(())
}
/// Reports whether `container_name` exists and its inspect data has
/// `State.Running == true`.
///
/// A "no such container" inspect error yields `Ok(false)`; a missing or
/// non-boolean `Running` field also counts as not running.
///
/// # Errors
/// Fails on any other inspect error or when the inspect output is not valid JSON.
pub async fn is_proxy_running(&self, container_name: &str) -> Result<bool> {
    let raw = match self.docker_client.inspect_container(container_name).await {
        Ok(raw) => raw,
        Err(e) if e.to_lowercase().contains("no such container") => return Ok(false),
        Err(e) => return Err(anyhow::anyhow!(e)),
    };
    let inspect: serde_json::Value = serde_json::from_str(&raw)
        .map_err(|e| anyhow::anyhow!("Failed to parse container info: {e}"))?;

    let running = inspect
        .get("State")
        .and_then(|state| state.get("Running"))
        .and_then(serde_json::Value::as_bool);
    Ok(running == Some(true))
}
/// Counts the networks attached to `container_name` whose name starts with
/// `TSK_AGENT_NETWORK_PREFIX`.
///
/// Returns 0 when the container does not exist or its inspect data has no
/// `NetworkSettings.Networks` object.
///
/// # Errors
/// Fails on any other inspect error or when the inspect output is not valid JSON.
pub async fn count_connected_agents(&self, container_name: &str) -> Result<usize> {
    let raw = match self.docker_client.inspect_container(container_name).await {
        Ok(raw) => raw,
        Err(e) if e.to_lowercase().contains("no such container") => return Ok(0),
        Err(e) => return Err(anyhow::anyhow!(e)),
    };
    let inspect: serde_json::Value = serde_json::from_str(&raw)
        .map_err(|e| anyhow::anyhow!("Failed to parse container info: {e}"))?;

    let networks = inspect
        .get("NetworkSettings")
        .and_then(|settings| settings.get("Networks"))
        .and_then(serde_json::Value::as_object);
    let agent_networks = match networks {
        Some(networks) => networks
            .keys()
            .filter(|name| name.starts_with(TSK_AGENT_NETWORK_PREFIX))
            .count(),
        None => 0,
    };
    Ok(agent_networks)
}
/// Acquires the shared proxy for the task `task_id` and returns a session
/// describing it.
///
/// Under the per-fingerprint lock this ensures the proxy is running and healthy,
/// creates the task's agent network, connects the proxy to it, and resolves the
/// proxy's IP on that network. If connecting fails, the freshly created network is
/// cleaned up before the error is returned. Failure to resolve the IP is only
/// logged as a warning and yields `proxy_ip: None`.
///
/// # Errors
/// Fails when the proxy cannot be ensured, the agent network cannot be created,
/// or the proxy cannot be connected to it.
pub async fn acquire_proxy(
    &self,
    task_id: &str,
    proxy_config: &ResolvedProxyConfig,
    logger: &TaskLogger,
) -> Result<ProxySession> {
    let fingerprint = proxy_config.fingerprint();
    // The closure only borrows its captures, so `task_id` and `proxy_config` are
    // used directly instead of being cloned first.
    self.sync_manager
        .with_lock(&fingerprint, || async {
            let proxy_container_name = self.ensure_proxy(proxy_config, logger).await?;
            let network_name = self.create_agent_network(task_id).await?;

            if let Err(e) = self
                .connect_proxy_to_network(&proxy_container_name, &network_name)
                .await
            {
                // Do not leave behind the network created just above.
                self.cleanup_agent_network(&proxy_container_name, &network_name)
                    .await;
                return Err(e);
            }

            let proxy_ip = match self
                .resolve_proxy_ip(&proxy_container_name, &network_name)
                .await
            {
                Ok(ip) => Some(ip),
                Err(e) => {
                    logger.log(LogLine::tsk_warning(format!(
                        "Warning: Could not resolve proxy IP for extra_hosts: {e}"
                    )));
                    None
                }
            };

            Ok(ProxySession {
                proxy_container_name,
                network_name,
                proxy_ip,
                fingerprint: fingerprint.clone(),
            })
        })
        .await
}
pub async fn release_proxy(&self, session: &ProxySession) {
self.sync_manager
.with_lock(&session.fingerprint, || async {
self.cleanup_agent_network(&session.proxy_container_name, &session.network_name)
.await;
self.maybe_stop_proxy_by_name(&session.proxy_container_name)
.await;
})
.await;
}
/// Stops the proxy for `proxy_config` if it is running and no agent networks are
/// attached to it.
///
/// Returns `Ok(true)` when the proxy was stopped, and `Ok(false)` when it was not
/// running or still has agents connected.
///
/// # Errors
/// Propagates failures from inspecting or stopping the container.
pub async fn force_stop_proxy(&self, proxy_config: &ResolvedProxyConfig) -> Result<bool> {
    let container_name = proxy_config.proxy_container_name();

    let running = self.is_proxy_running(&container_name).await?;
    if !running {
        return Ok(false);
    }

    let connected_agents = self.count_connected_agents(&container_name).await?;
    if connected_agents != 0 {
        return Ok(false);
    }

    self.stop_proxy(&container_name).await?;
    Ok(true)
}
async fn maybe_stop_proxy_by_name(&self, container_name: &str) {
match self.is_proxy_running(container_name).await {
Ok(true) => {}
_ => return,
}
if let Ok(0) = self.count_connected_agents(container_name).await
&& let Err(e) = self.stop_proxy(container_name).await
{
self.emit(ServerEvent::WarningMessage(format!(
"Warning: Failed to stop idle proxy {container_name}: {e}"
)));
}
}
/// Creates the network `network_name` unless it already exists.
///
/// # Errors
/// Fails when the existence check or the creation fails.
async fn ensure_network(&self, network_name: &str) -> Result<()> {
    let already_exists = self
        .docker_client
        .network_exists(network_name)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to check if network exists: {e}"))?;
    if already_exists {
        return Ok(());
    }

    self.docker_client
        .create_network(network_name)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to create network: {e}"))?;
    Ok(())
}
/// Creates and starts the proxy container for `proxy_config`, or starts the
/// existing one when the name is already in use.
///
/// When the configuration carries custom squid settings they are written to
/// `squid.conf` in the fingerprint's proxy-config directory and bind-mounted
/// read-only into the container. The container is attached to the external
/// network and runs with a read-only root filesystem, all capabilities dropped
/// except the four listed, `no-new-privileges`, and tmpfs mounts for squid's
/// cache, log and run directories.
///
/// # Errors
/// Fails when the squid configuration cannot be written, or when the container
/// cannot be created or started. "already in use" on create and "already started"
/// on the follow-up start are tolerated.
async fn ensure_proxy_container(&self, proxy_config: &ResolvedProxyConfig) -> Result<()> {
    let container_name = proxy_config.proxy_container_name();
    let network_name = proxy_config.external_network_name();
    let host_ports_env = format!("TSK_HOST_PORTS={}", proxy_config.host_ports_env());

    // Optional bind mount for a custom squid.conf.
    let binds = match proxy_config.squid_conf.as_ref() {
        None => None,
        Some(squid_conf_content) => {
            let conf_dir = self.tsk_env.proxy_config_dir(&proxy_config.fingerprint());
            std::fs::create_dir_all(&conf_dir)
                .context("Failed to create proxy config directory")?;
            let conf_file = conf_dir.join("squid.conf");
            std::fs::write(&conf_file, squid_conf_content)
                .context("Failed to write squid.conf")?;
            Some(vec![format!(
                "{}:/etc/squid/squid.conf:ro",
                conf_file.display()
            )])
        }
    };

    let restart_policy = bollard::models::RestartPolicy {
        name: Some(bollard::models::RestartPolicyNameEnum::UNLESS_STOPPED),
        maximum_retry_count: None,
    };
    let cap_add: Vec<String> = ["NET_ADMIN", "SETUID", "SETGID", "CHOWN"]
        .into_iter()
        .map(String::from)
        .collect();
    let tmpfs: HashMap<String, String> = [
        ("/var/cache/squid", "size=10m"),
        ("/var/log/squid", "size=50m"),
        ("/var/run/squid", "size=1m"),
    ]
    .into_iter()
    .map(|(mount_point, opts)| (mount_point.to_string(), opts.to_string()))
    .collect();

    let host_config = HostConfig {
        network_mode: Some(network_name),
        extra_hosts: Some(vec!["host.docker.internal:host-gateway".to_string()]),
        restart_policy: Some(restart_policy),
        binds,
        readonly_rootfs: Some(true),
        cap_drop: Some(vec!["ALL".to_string()]),
        cap_add: Some(cap_add),
        security_opt: Some(vec!["no-new-privileges:true".to_string()]),
        tmpfs: Some(tmpfs),
        ..Default::default()
    };
    let container_config = ContainerCreateBody {
        image: Some(PROXY_IMAGE.to_string()),
        exposed_ports: Some(vec![PROXY_PORT.to_string()]),
        env: Some(vec![host_ports_env]),
        host_config: Some(host_config),
        ..Default::default()
    };
    let create_options = bollard::query_parameters::CreateContainerOptionsBuilder::default()
        .name(&container_name)
        .build();

    let created = self
        .docker_client
        .create_container(Some(create_options), container_config)
        .await;

    if let Err(e) = created {
        if !e.to_lowercase().contains("already in use") {
            return Err(anyhow::anyhow!("Failed to create proxy container: {e}"));
        }
        // The container already exists: make sure it is started, accepting that
        // it may already be running.
        return match self.docker_client.start_container(&container_name).await {
            Ok(_) => Ok(()),
            Err(e) if e.to_lowercase().contains("already started") => Ok(()),
            Err(e) => Err(anyhow::anyhow!("Failed to start proxy container: {e}")),
        };
    }

    // Freshly created container: a start failure is always an error.
    self.docker_client
        .start_container(&container_name)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to start proxy container: {e}"))?;
    Ok(())
}
/// Polls the inspect data of `container_name` until the proxy is usable.
///
/// Up to 30 attempts are made, one second apart. Per attempt:
/// - `State.Running == false` fails immediately;
/// - no `State.Health` entry is treated as success (no health check configured);
/// - health status `healthy` succeeds and `unhealthy` fails;
/// - any other status, unparsable inspect data, a missing `State`, and inspect
///   errors other than "no such container" lead to another attempt.
///
/// # Errors
/// Fails when the container is missing, not running, unhealthy, or has not become
/// healthy after the last attempt.
async fn wait_for_proxy_health(&self, container_name: &str, logger: &TaskLogger) -> Result<()> {
    const MAX_RETRIES: u32 = 30;
    const RETRY_DELAY_MS: u64 = 1000;

    for attempt in 1..=MAX_RETRIES {
        // Extract the `State` object, if this attempt produced one.
        let state = match self.docker_client.inspect_container(container_name).await {
            Ok(json_data) => serde_json::from_str::<serde_json::Value>(&json_data)
                .ok()
                .and_then(|data| data.get("State").cloned()),
            Err(e) if e.to_lowercase().contains("no such container") => {
                return Err(anyhow::anyhow!("Proxy container not found"));
            }
            Err(_) => None,
        };

        if let Some(state) = state {
            let running = state.get("Running").and_then(|v| v.as_bool());
            if running == Some(false) {
                return Err(anyhow::anyhow!("Proxy container is not running"));
            }

            match state.get("Health") {
                None => {
                    logger.log(LogLine::tsk_message(
                        "Proxy container is running (no health check configured)",
                    ));
                    return Ok(());
                }
                Some(health) => match health.get("Status").and_then(|v| v.as_str()) {
                    Some("healthy") => {
                        logger.log(LogLine::tsk_message("Proxy container is healthy"));
                        return Ok(());
                    }
                    Some("unhealthy") => {
                        return Err(anyhow::anyhow!("Proxy container is unhealthy"));
                    }
                    // Announce the wait once, on the first attempt only.
                    Some("starting") if attempt == 1 => {
                        logger.log(LogLine::tsk_message(
                            "Waiting for proxy container to become healthy...",
                        ));
                    }
                    _ => {}
                },
            }
        }

        // No pause after the final attempt.
        if attempt < MAX_RETRIES {
            tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
        }
    }

    Err(anyhow::anyhow!(
        "Proxy container failed to become healthy after {} seconds",
        MAX_RETRIES
    ))
}
fn create_tar_archive_from_directory(&self, dir_path: &Path) -> Result<Vec<u8>> {
use tar::Builder;
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
builder.append_dir_all(".", dir_path)?;
builder.finish()?;
}
Ok(tar_data)
}
/// Creates the internal network for the task `task_id` and returns its name.
///
/// # Errors
/// Fails when the network cannot be created.
pub(crate) async fn create_agent_network(&self, task_id: &str) -> Result<String> {
    let network_name = Self::agent_network_name(task_id);
    match self
        .docker_client
        .create_internal_network(&network_name)
        .await
    {
        Ok(_) => Ok(network_name),
        Err(e) => Err(anyhow::anyhow!("Failed to create agent network: {e}")),
    }
}
/// Connects the container `proxy_container_name` to the network `network_name`.
///
/// # Errors
/// Fails when the connection request fails.
pub(crate) async fn connect_proxy_to_network(
    &self,
    proxy_container_name: &str,
    network_name: &str,
) -> Result<()> {
    let connected = self
        .docker_client
        .connect_container_to_network(proxy_container_name, network_name)
        .await;
    connected.map_err(|e| anyhow::anyhow!("Failed to connect proxy to network: {e}"))
}
/// Disconnects the proxy from `network_name` and then removes that network.
///
/// Both steps are always attempted; a failure in either is reported as a warning
/// event and never returned to the caller.
pub(crate) async fn cleanup_agent_network(
    &self,
    proxy_container_name: &str,
    network_name: &str,
) {
    let disconnected = self
        .docker_client
        .disconnect_container_from_network(proxy_container_name, network_name)
        .await;
    if let Err(e) = disconnected {
        self.emit(ServerEvent::WarningMessage(format!(
            "Warning: Failed to disconnect proxy from network {network_name}: {e}"
        )));
    }

    let removed = self.docker_client.remove_network(network_name).await;
    if let Err(e) = removed {
        self.emit(ServerEvent::WarningMessage(format!(
            "Warning: Failed to remove network {network_name}: {e}"
        )));
    }
}
/// Returns the agent network name for `task_id`: `TSK_AGENT_NETWORK_PREFIX`
/// followed by the task id.
pub fn agent_network_name(task_id: &str) -> String {
    [TSK_AGENT_NETWORK_PREFIX, task_id].concat()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::agent::task_logger::TaskLogger;
use crate::context::AppContext;
use crate::test_utils::TrackedDockerClient;
/// Proxy configuration with no host ports and no custom squid configuration.
fn default_proxy_config() -> ResolvedProxyConfig {
    ResolvedProxyConfig {
        host_ports: Vec::new(),
        squid_conf: None,
    }
}
/// `ensure_proxy` returns the configured container name and creates and starts
/// exactly one container carrying the host-gateway entry and the host-ports variable.
#[tokio::test]
async fn test_ensure_proxy_success() {
    let docker = Arc::new(TrackedDockerClient::default());
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let proxy_config = default_proxy_config();

    let result = manager
        .ensure_proxy(&proxy_config, &TaskLogger::no_file())
        .await;
    assert!(result.is_ok());
    let returned_name = result.unwrap();
    assert_eq!(returned_name, proxy_config.proxy_container_name());

    // Exactly one container was created, under the expected name.
    let created = docker.create_container_calls.lock().unwrap();
    assert_eq!(created.len(), 1);
    let (options, config) = &created[0];
    assert_eq!(
        options.as_ref().unwrap().name,
        Some(proxy_config.proxy_container_name())
    );

    let host_config = config.host_config.as_ref().unwrap();
    let extra_hosts = host_config.extra_hosts.as_ref().unwrap();
    assert!(extra_hosts.contains(&"host.docker.internal:host-gateway".to_string()));

    let env = config.env.as_ref().unwrap();
    assert!(env.iter().any(|e| e.starts_with("TSK_HOST_PORTS=")));

    // ...and it was started exactly once.
    let started = docker.start_container_calls.lock().unwrap();
    assert_eq!(started.len(), 1);
    assert_eq!(started[0], proxy_config.proxy_container_name());
}
/// Configured host ports are passed to the container as a comma-separated
/// `TSK_HOST_PORTS` variable.
#[tokio::test]
async fn test_ensure_proxy_with_host_ports() {
    let docker = Arc::new(TrackedDockerClient::default());
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let proxy_config = ResolvedProxyConfig {
        host_ports: vec![5432, 6379],
        squid_conf: None,
    };

    let result = manager
        .ensure_proxy(&proxy_config, &TaskLogger::no_file())
        .await;
    assert!(result.is_ok());

    let created = docker.create_container_calls.lock().unwrap();
    let (_, config) = &created[0];
    let env = config.env.as_ref().unwrap();
    assert!(env.contains(&"TSK_HOST_PORTS=5432,6379".to_string()));
}
/// `stop_proxy` issues a single forced removal of the named container.
#[tokio::test]
async fn test_stop_proxy_success() {
    let docker = Arc::new(TrackedDockerClient::default());
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let container_name = default_proxy_config().proxy_container_name();

    let result = manager.stop_proxy(&container_name).await;
    assert!(result.is_ok());

    let removed = docker.remove_container_calls.lock().unwrap();
    assert_eq!(removed.len(), 1);
    let (removed_name, removed_options) = &removed[0];
    assert_eq!(*removed_name, container_name);
    assert!(removed_options.as_ref().unwrap().force);
}
/// `stop_proxy` succeeds when removal fails with a "No such container" error.
///
/// Uses a local `DockerClient` implementation whose `remove_container` always
/// returns that error; every other method returns a fixed successful value.
#[tokio::test]
async fn test_stop_proxy_container_not_found() {
use crate::context::docker_client::DockerClient;
use async_trait::async_trait;
use bollard::models::ContainerCreateBody;
use bollard::query_parameters::*;
use futures_util::Stream;
// Stub client: only `remove_container` fails.
struct NoContainerDockerClient;
#[async_trait]
impl DockerClient for NoContainerDockerClient {
// The behavior under test: removal reports that the container does not exist.
async fn remove_container(
&self,
_id: &str,
_options: Option<RemoveContainerOptions>,
) -> Result<(), String> {
Err("No such container: tsk-proxy-abcd1234".to_string())
}
// The remaining methods are fixed-value stubs needed to satisfy the trait.
async fn create_container(
&self,
_options: Option<CreateContainerOptions>,
_config: ContainerCreateBody,
) -> Result<String, String> {
Ok("test-id".to_string())
}
async fn start_container(&self, _id: &str) -> Result<(), String> {
Ok(())
}
async fn wait_container(&self, _id: &str) -> Result<i64, String> {
Ok(0)
}
async fn kill_container(&self, _id: &str) -> Result<(), String> {
Ok(())
}
async fn logs(
&self,
_id: &str,
_options: Option<LogsOptions>,
) -> Result<String, String> {
Ok("".to_string())
}
async fn logs_stream(
&self,
_id: &str,
_options: Option<LogsOptions>,
) -> Result<Box<dyn Stream<Item = Result<String, String>> + Send + Unpin>, String>
{
use futures_util::stream;
let stream = stream::once(async { Ok("".to_string()) });
Ok(Box::new(Box::pin(stream)))
}
async fn create_network(&self, _name: &str) -> Result<String, String> {
Ok("network-id".to_string())
}
async fn network_exists(&self, _name: &str) -> Result<bool, String> {
Ok(true)
}
async fn build_image(
&self,
_options: BuildImageOptions,
_tar_archive: Vec<u8>,
) -> Result<Box<dyn Stream<Item = Result<String, String>> + Send + Unpin>, String>
{
use futures_util::stream;
let stream = stream::once(async { Ok("Building...".to_string()) });
Ok(Box::new(Box::pin(stream)))
}
async fn image_exists(&self, _tag: &str) -> Result<bool, String> {
Ok(true)
}
async fn inspect_container(&self, _id: &str) -> Result<String, String> {
Ok(r#"{"State": {"Health": {"Status": "healthy"}}}"#.to_string())
}
async fn attach_container(&self, _id: &str) -> Result<(), String> {
Ok(())
}
async fn upload_to_container(
&self,
_id: &str,
_dest_path: &str,
_tar_data: Vec<u8>,
) -> Result<(), String> {
Ok(())
}
async fn create_internal_network(&self, _name: &str) -> Result<String, String> {
Ok("internal-network-id".to_string())
}
async fn connect_container_to_network(
&self,
_container: &str,
_network: &str,
) -> Result<(), String> {
Ok(())
}
async fn disconnect_container_from_network(
&self,
_container: &str,
_network: &str,
) -> Result<(), String> {
Ok(())
}
async fn remove_network(&self, _name: &str) -> Result<(), String> {
Ok(())
}
async fn ping(&self) -> Result<String, String> {
Ok("OK".to_string())
}
}
let mock_client: Arc<dyn DockerClient> = Arc::new(NoContainerDockerClient);
let ctx = AppContext::builder().build();
let manager = ProxyManager::new(mock_client, ctx.tsk_env(), ContainerEngine::Docker, None);
let proxy_config = default_proxy_config();
let result = manager
.stop_proxy(&proxy_config.proxy_container_name())
.await;
// The "No such container" error must be mapped to success.
assert!(result.is_ok());
}
/// A running container reporting health status `healthy` passes the health wait.
#[tokio::test]
async fn test_wait_for_proxy_health_success() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Running": true,
            "Health": {
                "Status": "healthy"
            }
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );

    let result = manager
        .wait_for_proxy_health("tsk-proxy-test", &TaskLogger::no_file())
        .await;
    assert!(result.is_ok());
}
/// Health status `unhealthy` fails the health wait with an error mentioning it.
#[tokio::test]
async fn test_wait_for_proxy_health_unhealthy() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Running": true,
            "Health": {
                "Status": "unhealthy"
            }
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );

    let result = manager
        .wait_for_proxy_health("tsk-proxy-test", &TaskLogger::no_file())
        .await;
    assert!(result.is_err());
    let message = result.unwrap_err().to_string();
    assert!(message.contains("unhealthy"));
}
/// A running container without any `Health` entry is accepted by the health wait.
#[tokio::test]
async fn test_wait_for_proxy_health_no_health_check() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Running": true
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );

    let result = manager
        .wait_for_proxy_health("tsk-proxy-test", &TaskLogger::no_file())
        .await;
    assert!(result.is_ok());
}
/// A container whose state says it is not running fails the health wait with a
/// "not running" error.
#[tokio::test]
async fn test_wait_for_proxy_health_not_running() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Running": false
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );

    let result = manager
        .wait_for_proxy_health("tsk-proxy-test", &TaskLogger::no_file())
        .await;
    assert!(result.is_err());
    let message = result.unwrap_err().to_string();
    assert!(message.contains("not running"));
}
/// Agent network names are the `tsk-agent-` prefix followed by the task id.
#[test]
fn test_agent_network_name() {
    let from_simple_id = ProxyManager::agent_network_name("test-task-123");
    assert_eq!(from_simple_id, "tsk-agent-test-task-123");

    let from_dated_id = ProxyManager::agent_network_name("2024-01-15-feat-auth");
    assert_eq!(from_dated_id, "tsk-agent-2024-01-15-feat-auth");
}
/// `create_agent_network` creates one internal network named after the task and
/// returns that name.
#[tokio::test]
async fn test_create_agent_network() {
    let docker = Arc::new(TrackedDockerClient::default());
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );

    let result = manager.create_agent_network("test-task-123").await;
    assert!(result.is_ok());
    assert_eq!(result.unwrap(), "tsk-agent-test-task-123");

    let created = docker.create_internal_network_calls.lock().unwrap();
    assert_eq!(created.len(), 1);
    assert_eq!(created[0], "tsk-agent-test-task-123");
}
/// A failing internal-network creation surfaces as a "Failed to create agent
/// network" error.
#[tokio::test]
async fn test_create_agent_network_error() {
    let docker = Arc::new(TrackedDockerClient {
        create_internal_network_error: Some("Network creation failed".to_string()),
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );

    let result = manager.create_agent_network("test-task-123").await;
    assert!(result.is_err());
    let message = result.unwrap_err().to_string();
    assert!(message.contains("Failed to create agent network"));
}
/// `connect_proxy_to_network` issues one connect call with the given container
/// and network names.
#[tokio::test]
async fn test_connect_proxy_to_network() {
    let docker = Arc::new(TrackedDockerClient::default());
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let container_name = default_proxy_config().proxy_container_name();

    let result = manager
        .connect_proxy_to_network(&container_name, "tsk-agent-test-123")
        .await;
    assert!(result.is_ok());

    let connected = docker.connect_network_calls.lock().unwrap();
    assert_eq!(connected.len(), 1);
    assert_eq!(
        connected[0],
        (container_name, "tsk-agent-test-123".to_string())
    );
}
/// `cleanup_agent_network` disconnects the proxy from the network and then
/// removes the network, once each.
#[tokio::test]
async fn test_cleanup_agent_network() {
    let docker = Arc::new(TrackedDockerClient::default());
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let container_name = default_proxy_config().proxy_container_name();

    manager
        .cleanup_agent_network(&container_name, "tsk-agent-test-123")
        .await;

    let disconnected = docker.disconnect_network_calls.lock().unwrap();
    assert_eq!(disconnected.len(), 1);
    assert_eq!(
        disconnected[0],
        (container_name, "tsk-agent-test-123".to_string())
    );

    let removed = docker.remove_network_calls.lock().unwrap();
    assert_eq!(removed.len(), 1);
    assert_eq!(removed[0], "tsk-agent-test-123");
}
/// A failing network removal does not abort cleanup: both the disconnect and the
/// removal are still attempted exactly once.
#[tokio::test]
async fn test_cleanup_agent_network_handles_errors_gracefully() {
    let docker = Arc::new(TrackedDockerClient {
        remove_network_error: Some("Network in use".to_string()),
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let proxy_config = default_proxy_config();

    manager
        .cleanup_agent_network(&proxy_config.proxy_container_name(), "tsk-agent-test-123")
        .await;

    let disconnected = docker.disconnect_network_calls.lock().unwrap();
    assert_eq!(disconnected.len(), 1);
    let removed = docker.remove_network_calls.lock().unwrap();
    assert_eq!(removed.len(), 1);
}
/// Building the proxy image succeeds against the default tracked client.
#[tokio::test]
async fn test_build_proxy_without_custom_squid_conf() {
    let docker = Arc::new(TrackedDockerClient::default());
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );

    let outcome = manager.build_proxy(false, &TaskLogger::no_file()).await;
    assert!(outcome.is_ok());
}
/// `is_proxy_running` is true when the inspect data says `Running: true`.
#[tokio::test]
async fn test_is_proxy_running_true() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Running": true
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(docker, app.tsk_env(), ContainerEngine::Docker, None);

    let result = manager.is_proxy_running("tsk-proxy-test").await;
    assert!(result.is_ok());
    assert!(result.unwrap());
}
/// `is_proxy_running` is false when the inspect data says `Running: false`.
#[tokio::test]
async fn test_is_proxy_running_false() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Running": false
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(docker, app.tsk_env(), ContainerEngine::Docker, None);

    let result = manager.is_proxy_running("tsk-proxy-test").await;
    assert!(result.is_ok());
    assert!(!result.unwrap());
}
/// Only networks with the `tsk-agent-` prefix are counted as connected agents.
#[tokio::test]
async fn test_count_connected_agents() {
    use serde_json::json;

    // One external network plus three agent networks.
    let inspect = json!({
        "NetworkSettings": {
            "Networks": {
                "tsk-external": {},
                "tsk-agent-task1": {},
                "tsk-agent-task2": {},
                "tsk-agent-task3": {}
            }
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(docker, app.tsk_env(), ContainerEngine::Docker, None);

    let result = manager.count_connected_agents("tsk-proxy-test").await;
    assert!(result.is_ok());
    assert_eq!(result.unwrap(), 3);
}
/// A running proxy with no agent networks attached is removed and `true` is returned.
#[tokio::test]
async fn test_force_stop_proxy_no_agents() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Running": true
        },
        "NetworkSettings": {
            "Networks": {
                "tsk-external": {}
            }
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let proxy_config = default_proxy_config();

    let result = manager.force_stop_proxy(&proxy_config).await;
    assert!(result.is_ok());
    assert!(result.unwrap());

    let removed = docker.remove_container_calls.lock().unwrap();
    assert_eq!(removed.len(), 1);
    assert_eq!(removed[0].0, proxy_config.proxy_container_name());
}
/// A running proxy that still has agent networks attached is left alone and
/// `false` is returned.
#[tokio::test]
async fn test_force_stop_proxy_with_agents() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Running": true
        },
        "NetworkSettings": {
            "Networks": {
                "tsk-external": {},
                "tsk-agent-task1": {},
                "tsk-agent-task2": {}
            }
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let proxy_config = default_proxy_config();

    let result = manager.force_stop_proxy(&proxy_config).await;
    assert!(result.is_ok());
    assert!(!result.unwrap());

    let removed = docker.remove_container_calls.lock().unwrap();
    assert_eq!(removed.len(), 0);
}
/// A custom squid configuration is written into the fingerprint's proxy-config
/// directory and bind-mounted read-only into the container.
#[tokio::test]
async fn test_ensure_proxy_with_squid_conf_mount() {
    let docker = Arc::new(TrackedDockerClient::default());
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let proxy_config = ResolvedProxyConfig {
        host_ports: vec![],
        squid_conf: Some("http_port 3128\nacl custom src all".to_string()),
    };

    let result = manager
        .ensure_proxy(&proxy_config, &TaskLogger::no_file())
        .await;
    assert!(result.is_ok());

    // The container carries exactly one bind: the squid.conf mount.
    let created = docker.create_container_calls.lock().unwrap();
    let (_, config) = &created[0];
    let host_config = config.host_config.as_ref().unwrap();
    let binds = host_config.binds.as_ref().unwrap();
    assert_eq!(binds.len(), 1);
    assert!(binds[0].ends_with("squid.conf:/etc/squid/squid.conf:ro"));

    // The file on disk holds the configured content.
    let fingerprint = proxy_config.fingerprint();
    let squid_path = app
        .tsk_env()
        .proxy_config_dir(&fingerprint)
        .join("squid.conf");
    assert!(squid_path.exists());
    let written = std::fs::read_to_string(&squid_path).unwrap();
    assert_eq!(written, "http_port 3128\nacl custom src all");
}
/// `resolve_proxy_ip` returns the `IPAddress` recorded for the requested network.
#[tokio::test]
async fn test_resolve_proxy_ip() {
    use serde_json::json;

    let agent_network = "tsk-agent-test-task";
    let inspect = json!({
        "State": {
            "Health": {
                "Status": "healthy"
            }
        },
        "NetworkSettings": {
            "Networks": {
                agent_network: {
                    "IPAddress": "172.18.0.2"
                }
            }
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(docker, app.tsk_env(), ContainerEngine::Docker, None);

    let result = manager
        .resolve_proxy_ip("tsk-proxy-test", agent_network)
        .await;
    assert!(result.is_ok());
    assert_eq!(result.unwrap(), "172.18.0.2");
}
/// `acquire_proxy` creates the task's agent network, connects the proxy to it and
/// returns a session carrying the container name, network name and resolved IP.
#[tokio::test]
async fn test_acquire_proxy_creates_network_and_connects() {
    use serde_json::json;

    let inspect = json!({
        "State": {
            "Health": {
                "Status": "healthy"
            }
        },
        "NetworkSettings": {
            "Networks": {
                "tsk-agent-test-task": {
                    "IPAddress": "172.18.0.5"
                }
            }
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let proxy_config = default_proxy_config();

    let acquired = manager
        .acquire_proxy("test-task", &proxy_config, &TaskLogger::no_file())
        .await;
    assert!(acquired.is_ok());
    let session = acquired.unwrap();

    // Session contents.
    assert_eq!(
        session.proxy_container_name,
        proxy_config.proxy_container_name()
    );
    assert_eq!(session.network_name, "tsk-agent-test-task");
    assert_eq!(session.proxy_ip, Some("172.18.0.5".to_string()));

    // Recorded client calls.
    let created_networks = docker.create_internal_network_calls.lock().unwrap();
    assert_eq!(created_networks.len(), 1);
    assert_eq!(created_networks[0], "tsk-agent-test-task");

    let connected = docker.connect_network_calls.lock().unwrap();
    assert_eq!(connected.len(), 1);
    assert_eq!(
        connected[0],
        (
            proxy_config.proxy_container_name(),
            "tsk-agent-test-task".to_string()
        )
    );
}
/// `release_proxy` disconnects and removes the session's network and, with no
/// agent networks left on the running proxy, removes the proxy container as well.
#[tokio::test]
async fn test_release_proxy_cleans_up_network_and_stops_idle_proxy() {
    use serde_json::json;

    // Running proxy attached only to the external network, i.e. idle.
    let inspect = json!({
        "State": {
            "Running": true
        },
        "NetworkSettings": {
            "Networks": {
                "tsk-external": {}
            }
        }
    })
    .to_string();
    let docker = Arc::new(TrackedDockerClient {
        inspect_container_response: inspect,
        ..Default::default()
    });
    let app = AppContext::builder().build();
    let proxy_config = default_proxy_config();
    let manager = ProxyManager::new(
        docker.clone(),
        app.tsk_env(),
        ContainerEngine::Docker,
        None,
    );
    let session = ProxySession {
        proxy_container_name: proxy_config.proxy_container_name(),
        network_name: "tsk-agent-test-task".to_string(),
        proxy_ip: Some("172.18.0.5".to_string()),
        fingerprint: proxy_config.fingerprint(),
    };

    manager.release_proxy(&session).await;

    let disconnected = docker.disconnect_network_calls.lock().unwrap();
    assert_eq!(disconnected.len(), 1);
    assert_eq!(
        disconnected[0],
        (
            proxy_config.proxy_container_name(),
            "tsk-agent-test-task".to_string()
        )
    );

    let removed_networks = docker.remove_network_calls.lock().unwrap();
    assert_eq!(removed_networks.len(), 1);
    assert_eq!(removed_networks[0], "tsk-agent-test-task");

    let removed_containers = docker.remove_container_calls.lock().unwrap();
    assert_eq!(removed_containers.len(), 1);
    assert_eq!(
        removed_containers[0].0,
        proxy_config.proxy_container_name()
    );
}
}