use koda_core::{
agent::KodaAgent, config::ProviderType, providers, session::KodaSession, tools::ToolRegistry,
trust::TrustMode,
};
use koda_test_utils::Env;
use std::sync::Arc;
fn build_agent(root: std::path::PathBuf, max_context_tokens: usize) -> Arc<KodaAgent> {
let tools = ToolRegistry::new(root.clone(), max_context_tokens);
let tool_defs = ToolRegistry::new(root.clone(), max_context_tokens).get_definitions(&[], &[]);
Arc::new(KodaAgent {
project_root: root,
tools,
tool_defs,
system_prompt: "You are a test assistant.".to_string(),
})
}
async fn make_real_session(env: &Env) -> KodaSession {
let agent = build_agent(env.root.clone(), env.config.max_context_tokens);
let _ = providers::create_provider(&env.config); KodaSession::new(
env.session_id.clone(),
agent,
env.db.clone(),
&env.config,
TrustMode::Auto,
)
.await
}
#[tokio::test]
async fn session_new_auto_spawns_proxy() {
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let session = make_real_session(&env).await;
assert!(
session.proxy.is_some(),
"KodaSession::new must auto-spawn the built-in proxy"
);
let port = session.proxy.as_ref().unwrap().port;
assert!(port > 0, "proxy must report a real ephemeral port");
assert_eq!(
session.agent.tools.proxy_port(),
Some(port),
"agent's ToolRegistry must hold the same port the session spawned"
);
}
#[tokio::test]
async fn bash_sees_proxy_env_pointing_at_session_proxy() {
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let session = make_real_session(&env).await;
let port = session.proxy.as_ref().expect("proxy auto-spawned").port;
let result = session
.agent
.tools
.execute("Bash", r#"{"command":"echo \"$HTTPS_PROXY\""}"#, None, None)
.await;
assert!(
result.success,
"Bash dispatch must succeed; got output: {}",
result.output
);
let combined = format!(
"{}\n{}",
result.output,
result.full_output.as_deref().unwrap_or("")
);
let prefix = "http://127.0.0.1:";
let url_line = combined
.lines()
.find(|l| l.starts_with(prefix))
.unwrap_or_else(|| panic!("no HTTPS_PROXY url in {combined:?}"));
let port_str = &url_line[prefix.len()..];
let observed_port: u16 = port_str
.trim()
.parse()
.unwrap_or_else(|_| panic!("HTTPS_PROXY port not parseable: {url_line:?}"));
assert!(
observed_port > 0,
"HTTPS_PROXY must contain a non-zero port (host proxy is {port}); got {url_line:?}"
);
}
#[tokio::test]
async fn session_proxy_accepts_tcp_connections() {
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let session = make_real_session(&env).await;
let port = session.proxy.as_ref().expect("proxy spawned").port;
let conn = tokio::net::TcpStream::connect(("127.0.0.1", port)).await;
assert!(
conn.is_ok(),
"proxy must accept connections on 127.0.0.1:{port}; got: {conn:?}"
);
}
#[tokio::test]
async fn distinct_sessions_get_distinct_proxies() {
let env_a = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let env_b = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let sess_a = make_real_session(&env_a).await;
let sess_b = make_real_session(&env_b).await;
let port_a = sess_a.proxy.as_ref().expect("a spawned").port;
let port_b = sess_b.proxy.as_ref().expect("b spawned").port;
assert_ne!(
port_a, port_b,
"each session must get its own ephemeral proxy port"
);
}
#[tokio::test]
async fn dropping_session_releases_proxy_port() {
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let port = {
let session = make_real_session(&env).await;
session.proxy.as_ref().expect("spawned").port
};
let mut bound = None;
for _ in 0..50 {
match std::net::TcpListener::bind(("127.0.0.1", port)) {
Ok(l) => {
bound = Some(l);
break;
}
Err(_) => tokio::time::sleep(std::time::Duration::from_millis(20)).await,
}
}
assert!(
bound.is_some(),
"after session drop, port {port} must be re-bindable (proxy task did not abort)"
);
}
#[cfg(target_os = "macos")]
#[tokio::test]
async fn macos_kernel_blocks_direct_tcp_to_non_proxy_port() {
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let session = make_real_session(&env).await;
let proxy_port = session.proxy.as_ref().expect("proxy spawned").port;
let target_port = if proxy_port == 1 { 2 } else { 1 };
let cmd = format!(
r#"{{"command":"if exec 3<>/dev/tcp/127.0.0.1/{target_port} 2>/dev/null; then echo connected; exec 3<&-; else echo blocked; fi"}}"#,
);
let result = session.agent.tools.execute("Bash", &cmd, None, None).await;
let combined = format!(
"{}\n{}",
result.output,
result.full_output.as_deref().unwrap_or("")
);
assert!(
combined.contains("blocked"),
"kernel-enforced sandbox must refuse direct TCP to non-proxy port \
{target_port} (proxy is on {proxy_port}); got:\n{combined}"
);
}
#[cfg(target_os = "macos")]
#[tokio::test]
async fn macos_kernel_allows_tcp_to_proxy_port() {
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let session = make_real_session(&env).await;
let proxy_port = session.proxy.as_ref().expect("proxy spawned").port;
let cmd = format!(
r#"{{"command":"if exec 3<>/dev/tcp/127.0.0.1/{proxy_port} 2>/dev/null; then echo connected; exec 3<&-; else echo blocked; fi"}}"#,
);
let result = session.agent.tools.execute("Bash", &cmd, None, None).await;
let combined = format!(
"{}\n{}",
result.output,
result.full_output.as_deref().unwrap_or("")
);
assert!(
combined.contains("connected"),
"kernel-enforced sandbox must permit TCP to the proxy port {proxy_port}; got:\n{combined}"
);
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn linux_kernel_blocks_direct_tcp_to_non_proxy_port() {
if !koda_sandbox::bwrap::is_available() {
eprintln!("bwrap not available; skipping Linux kernel-enforcement test");
return;
}
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let session = make_real_session(&env).await;
let proxy_port = session.proxy.as_ref().expect("proxy spawned").port;
let uds = koda_sandbox::bwrap_proxy::proxy_uds_path(std::process::id(), proxy_port);
assert!(
uds.exists(),
"Phase 3c.1.b regression: UDS bridge {} should exist for the kernel-enforced \
path to activate. Without it, this test would pass spuriously via the \
env-var fallback.",
uds.display()
);
let target_port = if proxy_port == 1 { 2 } else { 1 };
let cmd = format!(
r#"{{"command":"bash -c 'if exec 3<>/dev/tcp/127.0.0.1/{target_port} 2>/dev/null; then echo connected; exec 3<&-; else echo blocked; fi'"}}"#,
);
let result = session.agent.tools.execute("Bash", &cmd, None, None).await;
let combined = format!(
"{}\n{}",
result.output,
result.full_output.as_deref().unwrap_or("")
);
assert!(
combined.contains("blocked"),
"kernel-enforced sandbox must refuse direct TCP to non-proxy port \
{target_port} (proxy is on {proxy_port}); got:\n{combined}"
);
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn linux_kernel_allows_tcp_to_proxy_port() {
if !koda_sandbox::bwrap::is_available() {
eprintln!("bwrap not available; skipping Linux kernel-allow test");
return;
}
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let session = make_real_session(&env).await;
let proxy_port = session.proxy.as_ref().expect("proxy spawned").port;
let uds = koda_sandbox::bwrap_proxy::proxy_uds_path(std::process::id(), proxy_port);
if !uds.exists() {
eprintln!(
"UDS bridge {} not present — kernel-enforced path inactive; skipping",
uds.display()
);
return;
}
let cmd = r#"{"command":"bash -c 'port=\"${HTTPS_PROXY##*:}\"; if exec 3<>/dev/tcp/127.0.0.1/$port 2>/dev/null; then echo connected; exec 3<&-; else echo blocked; fi'"}"#;
let result = session.agent.tools.execute("Bash", cmd, None, None).await;
let combined = format!(
"{}\n{}",
result.output,
result.full_output.as_deref().unwrap_or("")
);
assert!(
combined.contains("connected"),
"kernel-enforced sandbox must permit TCP to the in-netns proxy port \
(rewritten by stage 2 from host port {proxy_port}); got:\n{combined}"
);
}
#[tokio::test]
#[ignore = "needs curl; run with --ignored"]
async fn curl_to_blocked_host_returns_403_via_proxy() {
let env = Env::builder()
.provider_type(ProviderType::Mock)
.build()
.await;
let session = make_real_session(&env).await;
let result = session
.agent
.tools
.execute(
"Bash",
r#"{"command":"curl --max-time 5 -sS -o /dev/null -w '%{http_code}\\n' https://blocked.test/ 2>&1; echo exit=$?"}"#,
None,
None,
)
.await;
let combined = format!(
"{}\n{}",
result.output,
result.full_output.as_deref().unwrap_or("")
);
assert!(
combined.contains("403") || combined.contains("CONNECT") || combined.contains("proxy"),
"expected proxy denial signal in output; got:\n{combined}"
);
assert!(
!combined.contains("exit=0"),
"curl must NOT succeed (host is blocked); output:\n{combined}"
);
}