#![allow(dead_code, unused_variables)]
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
/// Outcome of a policy check for a single outbound connection.
///
/// `handle_connection` only opens a tunnel for `Allow`; any other variant is
/// answered with `403 Forbidden`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
/// The connection may proceed.
Allow,
/// The connection is refused.
Deny,
/// Refused like `Deny` by `handle_connection`; `egress_finding` reports it
/// with critical severity.
Escalate,
}
/// What is known about a connection's destination at policy-decision time.
#[derive(Debug, Clone)]
pub struct PeekedHost {
/// Presumably the TLS Server Name Indication host (TODO confirm);
/// `handle_connection` always leaves this `None`.
pub sni: Option<String>,
/// Host named by the client; `handle_connection` fills it from the CONNECT target.
pub requested_host: Option<String>,
/// Destination socket address; `handle_connection` stores the placeholder
/// `0.0.0.0:<port>` here because it does not resolve the host itself.
pub dest: SocketAddr,
}
/// Input handed to a `PolicyDecisionPoint` for one outbound connection attempt.
#[derive(Debug, Clone)]
pub struct ConnectionRequest {
/// Destination details for the connection.
pub host: PeekedHost,
/// Process id associated with the request, if known (presumably the
/// originating process — TODO confirm); `handle_connection` always passes `None`.
pub pid: Option<u32>,
}
/// Pluggable authority that decides whether an outbound connection may proceed.
///
/// Implementations must be `Send + Sync`: `EgressProxy::start` shares a single
/// instance across spawned connection tasks through an `Arc`.
#[async_trait]
pub trait PolicyDecisionPoint: Send + Sync {
/// Evaluates `req` and returns the resulting `Decision`.
///
/// # Errors
/// An `Err` means no decision could be reached; `handle_connection` maps it
/// to `Deny` under `FailMode::Closed` and to `Allow` under `FailMode::Open`.
async fn authorize(&self, req: &ConnectionRequest) -> anyhow::Result<Decision>;
}
/// How to treat a connection when the policy check itself fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FailMode {
/// A failed policy check is treated as a denial. This is the default.
#[default]
Closed,
/// A failed policy check is treated as an allow.
Open,
}
/// Enforcement tier selected for the operating system the binary targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnforcementTier {
    /// Tier chosen for Linux builds.
    KernelDeny,
    /// Tier chosen for macOS builds.
    ObserveOnly,
    /// Tier chosen for every other target OS.
    ProxyOnly,
}

impl EnforcementTier {
    /// Returns the tier for the compile-time target operating system.
    pub fn current() -> Self {
        // `cfg!` expands to a compile-time boolean, so exactly one branch survives.
        if cfg!(target_os = "linux") {
            Self::KernelDeny
        } else if cfg!(target_os = "macos") {
            Self::ObserveOnly
        } else {
            Self::ProxyOnly
        }
    }
}
/// HTTP CONNECT egress proxy that consults a `PolicyDecisionPoint` before tunnelling.
pub struct EgressProxy {
// Behaviour when the policy check fails; `new` always sets `FailMode::Closed`.
fail_mode: FailMode,
// Platform enforcement tier captured at construction via `EnforcementTier::current()`.
tier: EnforcementTier,
}
impl EgressProxy {
pub fn new() -> Self {
Self {
fail_mode: FailMode::Closed,
tier: EnforcementTier::current(),
}
}
pub async fn start(
&self,
addr: SocketAddr,
pdp: Arc<dyn PolicyDecisionPoint>,
) -> anyhow::Result<()> {
let listener = TcpListener::bind(addr).await?;
tracing::info!(%addr, tier = ?self.tier, "egress proxy listening");
loop {
let (stream, _peer) = listener.accept().await?;
let pdp = pdp.clone();
let fail_mode = self.fail_mode;
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, pdp, fail_mode).await {
tracing::debug!(error = %e, "egress connection ended");
}
});
}
}
fn hard_rst(&self, decision: Decision) -> anyhow::Result<()> {
tracing::warn!(
?decision,
"egress hard RST - 0 bytes left the box (PRODUCT.md B.5 step 4)"
);
Ok(())
}
fn on_error(&self, err: &anyhow::Error) -> anyhow::Result<()> {
match self.fail_mode {
FailMode::Closed => self.hard_rst(Decision::Deny),
FailMode::Open => {
debug_assert!(
self.tier != EnforcementTier::ObserveOnly,
"PRODUCT.md W0: FailMode::Open is forbidden on observe-only platforms"
);
tracing::warn!(%err, "egress error in FailMode::Open (UNSAFE, debug-only) - allowing");
Ok(())
}
}
}
}
impl Default for EgressProxy {
fn default() -> Self {
Self::new()
}
}
/// Splits a CONNECT request target into `(host, port)`.
///
/// Accepted forms:
/// * `host:port` and `host` (port defaults to 443);
/// * `[v6-literal]:port` and `[v6-literal]` — the brackets are stripped so the
///   host can be handed straight to a resolver or connector.
///
/// Returns `None` when the port is not a valid `u16`, the host is empty, a
/// bracketed literal is malformed, or the host contains a `:` without being
/// bracketed (an ambiguous bare IPv6 literal).
fn parse_connect_target(target: &str) -> Option<(String, u16)> {
    const DEFAULT_PORT: u16 = 443;

    // Bracketed IP literal: the colons inside the brackets are part of the
    // address, so the port separator can only follow the closing bracket.
    if let Some(rest) = target.strip_prefix('[') {
        let (addr, tail) = rest.split_once(']')?;
        if addr.is_empty() {
            return None;
        }
        let port = match tail.strip_prefix(':') {
            Some(p) => p.parse::<u16>().ok()?,
            None if tail.is_empty() => DEFAULT_PORT,
            None => return None,
        };
        return Some((addr.to_string(), port));
    }

    let (host, port) = match target.rsplit_once(':') {
        Some((host, port)) => (host, port.parse::<u16>().ok()?),
        None => (target, DEFAULT_PORT),
    };
    // A remaining ':' means an unbracketed IPv6 literal whose last group was
    // mistaken for the port; refuse instead of guessing.
    if host.is_empty() || host.contains(':') {
        return None;
    }
    Some((host.to_string(), port))
}
pub async fn handle_connection(
mut client: TcpStream,
pdp: Arc<dyn PolicyDecisionPoint>,
fail_mode: FailMode,
) -> anyhow::Result<()> {
let mut buf: Vec<u8> = Vec::with_capacity(1024);
let mut tmp = [0u8; 1024];
loop {
let n = client.read(&mut tmp).await?;
if n == 0 {
return Ok(());
}
buf.extend_from_slice(&tmp[..n]);
if buf.windows(4).any(|w| w == b"\r\n\r\n") || buf.len() > 16 * 1024 {
break;
}
}
let head = String::from_utf8_lossy(&buf);
let first = head.lines().next().unwrap_or("");
let parts: Vec<&str> = first.split_whitespace().collect();
if parts.len() < 2 || parts[0] != "CONNECT" {
let _ = client.write_all(b"HTTP/1.1 400 Bad Request\r\n\r\n").await;
return Ok(());
}
let (host, port) = match parse_connect_target(parts[1]) {
Some(hp) => hp,
None => {
let _ = client.write_all(b"HTTP/1.1 400 Bad Request\r\n\r\n").await;
return Ok(());
}
};
let req = ConnectionRequest {
host: PeekedHost {
sni: None,
requested_host: Some(host.clone()),
dest: SocketAddr::from(([0, 0, 0, 0], port)),
},
pid: None,
};
let decision = match pdp.authorize(&req).await {
Ok(d) => d,
Err(_) => match fail_mode {
FailMode::Closed => Decision::Deny,
FailMode::Open => Decision::Allow,
},
};
if decision != Decision::Allow {
let _ = client.write_all(b"HTTP/1.1 403 Forbidden\r\n\r\n").await;
tracing::warn!(%host, port, ?decision, "egress DENIED - 0 bytes left the box");
return Ok(());
}
let mut upstream = match TcpStream::connect((host.as_str(), port)).await {
Ok(u) => u,
Err(_) => {
let _ = client.write_all(b"HTTP/1.1 502 Bad Gateway\r\n\r\n").await;
return Ok(());
}
};
client
.write_all(b"HTTP/1.1 200 Connection Established\r\n\r\n")
.await?;
tokio::io::copy_bidirectional(&mut client, &mut upstream).await?;
Ok(())
}
/// Policy decision point backed by a fixed set of allowed host names.
///
/// Host names are compared case-insensitively and without a trailing root
/// dot, so `Example.COM.` matches an `example.com` entry.
pub struct AllowlistPdp {
    // Entries are stored in normalised form (see `normalize`).
    allow_hosts: HashSet<String>,
}

impl AllowlistPdp {
    /// Builds the allowlist from any iterable of string-like host names.
    pub fn new<I, S>(hosts: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            allow_hosts: hosts
                .into_iter()
                .map(|host| Self::normalize(&host.into()))
                .collect(),
        }
    }

    /// Canonical form used for both stored entries and lookups: trailing dots
    /// removed and ASCII letters lower-cased.
    fn normalize(host: &str) -> String {
        host.trim_end_matches('.').to_ascii_lowercase()
    }
}

#[async_trait]
impl PolicyDecisionPoint for AllowlistPdp {
    /// Allows the request when its host is on the allowlist, otherwise denies.
    ///
    /// The host is taken from `requested_host`, falling back to `sni`; a
    /// request with neither is looked up as the empty string. This
    /// implementation never returns `Err`.
    async fn authorize(&self, req: &ConnectionRequest) -> anyhow::Result<Decision> {
        let peeked = &req.host;
        let host = peeked
            .requested_host
            .as_deref()
            .or(peeked.sni.as_deref())
            .unwrap_or("");
        let decision = if self
            .allow_hosts
            .contains(AllowlistPdp::normalize(host).as_str())
        {
            Decision::Allow
        } else {
            Decision::Deny
        };
        Ok(decision)
    }
}
/// UDP DNS responder that forwards queries for allowlisted names to an
/// upstream resolver and answers everything else with NXDOMAIN.
pub struct DnsSinkhole {
// Names whose queries are forwarded; empty after `new`.
allow_hosts: HashSet<String>,
// Resolver that allowed queries are forwarded to; `new` sets `8.8.8.8:53`.
upstream: SocketAddr,
}
impl DnsSinkhole {
    /// Creates a sinkhole with an empty allowlist and `8.8.8.8:53` as upstream.
    pub fn new() -> Self {
        Self {
            allow_hosts: HashSet::new(),
            // Built from parts so there is no parse step that could panic.
            upstream: SocketAddr::from(([8, 8, 8, 8], 53)),
        }
    }

    /// Replaces the allowlist. Names are stored lower-cased and without a
    /// trailing root dot, which is the form lookups use.
    pub fn with_allowlist(mut self, hosts: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.allow_hosts = hosts
            .into_iter()
            .map(|host| Self::normalize_name(&host.into()))
            .collect();
        self
    }

    /// Sets the upstream resolver that allowed queries are forwarded to.
    pub fn with_upstream(mut self, addr: SocketAddr) -> Self {
        self.upstream = addr;
        self
    }

    /// Canonical form for name comparison: no trailing dots, ASCII lower-case.
    fn normalize_name(name: &str) -> String {
        name.trim_end_matches('.').to_ascii_lowercase()
    }

    /// Binds `addr` and answers DNS queries until a socket error occurs.
    ///
    /// A query is forwarded upstream only when *every* question in it names an
    /// allowlisted host; otherwise an NXDOMAIN reply is sent. Unparseable
    /// packets and packets without questions are dropped.
    ///
    /// # Errors
    /// Returns an error when binding fails or when receiving from the socket
    /// fails. Failures to send a reply are ignored.
    pub async fn start(&self, addr: SocketAddr) -> anyhow::Result<()> {
        use hickory_proto::op::Message;
        use tokio::net::UdpSocket;

        let socket = UdpSocket::bind(addr).await?;
        tracing::info!(%addr, "DNS sinkhole listening (PRODUCT.md B.5 step 1)");
        let mut buf = [0u8; 4096];
        loop {
            let (n, src) = socket.recv_from(&mut buf).await?;
            let raw = &buf[..n];
            let msg = match Message::from_vec(raw) {
                Ok(m) => m,
                Err(e) => {
                    tracing::debug!(%src, %e, "DNS parse error - dropping");
                    continue;
                }
            };

            let mut names: Vec<String> = Vec::new();
            for question in &msg.queries {
                names.push(Self::normalize_name(&question.name().to_utf8()));
            }
            if names.is_empty() {
                continue;
            }

            // The raw packet is forwarded verbatim, so a single disallowed
            // question anywhere in it must sinkhole the whole message.
            match names
                .iter()
                .find(|name| !self.allow_hosts.contains(name.as_str()))
            {
                None => {
                    let hostname = names.join(",");
                    match self.forward_to_upstream(raw).await {
                        Ok(reply) => {
                            let _ = socket.send_to(&reply, src).await;
                        }
                        Err(e) => tracing::warn!(%hostname, %e, "DNS upstream forward failed"),
                    }
                }
                Some(hostname) => {
                    tracing::warn!(%hostname, %src, "DNS SINKHOLED - NXDOMAIN (PRODUCT.md W0)");
                    let nxdomain = build_nxdomain(&msg);
                    let _ = socket.send_to(&nxdomain, src).await;
                }
            }
        }
    }

    /// Sends `query` to the upstream resolver and returns its raw reply.
    ///
    /// # Errors
    /// Fails on socket errors or when no reply arrives within three seconds.
    async fn forward_to_upstream(&self, query: &[u8]) -> anyhow::Result<Vec<u8>> {
        use tokio::net::UdpSocket;
        use tokio::time::{timeout, Duration};

        // Bind in the upstream's address family; an IPv4 wildcard socket
        // cannot reach an IPv6 resolver.
        let local = if self.upstream.is_ipv6() {
            SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, 0))
        } else {
            SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 0))
        };
        let temp = UdpSocket::bind(local).await?;
        // Connecting makes the kernel discard datagrams that do not come from
        // the upstream, so another host cannot inject a forged reply.
        temp.connect(self.upstream).await?;
        temp.send(query).await?;
        let mut resp = [0u8; 4096];
        let n = timeout(Duration::from_secs(3), temp.recv(&mut resp)).await??;
        Ok(resp[..n].to_vec())
    }
}
/// Serialises an NXDOMAIN response for `query`.
///
/// The reply reuses the query's id and recursion-desired flag, marks
/// recursion as unavailable and echoes every question. If serialisation
/// fails an empty byte vector is returned.
fn build_nxdomain(query: &hickory_proto::op::Message) -> Vec<u8> {
    use hickory_proto::op::{Message, MessageType, OpCode, ResponseCode};

    let request_meta = &query.metadata;
    let mut reply = Message::new(request_meta.id, MessageType::Response, OpCode::Query);
    reply.metadata.response_code = ResponseCode::NXDomain;
    reply.metadata.recursion_available = false;
    reply.metadata.recursion_desired = request_meta.recursion_desired;
    for question in &query.queries {
        reply.add_query(question.clone());
    }
    match reply.to_vec() {
        Ok(bytes) => bytes,
        Err(_) => Vec::new(),
    }
}
impl Default for DnsSinkhole {
fn default() -> Self {
Self::new()
}
}
/// Converts an egress policy outcome into an audit finding.
///
/// The rule id is `SC-EGRESS-000` for allow, `-001` for deny and `-002` for
/// escalate. Severity, OWASP tag, title and description all follow the
/// decision; the NIST attack type is attached only to non-allow outcomes.
pub fn egress_finding(req: &ConnectionRequest, decision: Decision) -> secureops_core::AuditFinding {
    use secureops_core::{AuditFinding, MaestroLayer, NistAttackType, Severity};

    let peeked = &req.host;
    // Prefer the explicitly requested host, then SNI, then a placeholder.
    let host = peeked
        .requested_host
        .as_deref()
        .or(peeked.sni.as_deref())
        .unwrap_or("<unknown>");
    let pid_str = match req.pid {
        Some(pid) => pid.to_string(),
        None => String::from("?"),
    };

    // Everything that varies with the decision is chosen in one place.
    let (rule_no, severity, owasp_asi, title, description) = match decision {
        Decision::Allow => (
            0,
            Severity::Info,
            "ASI01",
            "Egress connection allowed",
            format!("host={host} pid={pid_str} - allowed by policy"),
        ),
        Decision::Deny => (
            1,
            Severity::High,
            "ASI05",
            "Egress connection BLOCKED - potential data exfiltration",
            format!("host={host} pid={pid_str} - denied by policy, 0 bytes left the box"),
        ),
        Decision::Escalate => (
            2,
            Severity::Critical,
            "ASI05",
            "Egress connection ESCALATED - suspicious exfil pattern",
            format!("host={host} pid={pid_str} - escalated (exfil chain suspected, circuit breaker tripped)"),
        ),
    };
    let nist = match decision {
        Decision::Allow => None,
        Decision::Deny | Decision::Escalate => Some(NistAttackType::Evasion),
    };
    let rule_id = format!("SC-EGRESS-{:03}", rule_no);
    let evidence = format!(
        "SNI={:?} requested_host={:?} pid={pid_str} dest={}",
        peeked.sni, peeked.requested_host, peeked.dest
    );

    AuditFinding::builder(rule_id, severity, "egress")
        .title(title)
        .description(description)
        .evidence(evidence)
        .remediation("Review egress allowlist and check for prompt injection (PRODUCT.md B.5/B.6)")
        .references(["PRODUCT.md B.5", "OWASP ASI-05"])
        .owasp_asi(owasp_asi)
        .maestro(MaestroLayer::L4)
        .nist(nist)
        .build()
}
// Unit tests for the compile-time defaults of this module.
#[cfg(test)]
mod tests {
use super::*;
// The derived `Default` for `FailMode` must be the fail-closed variant.
#[test]
fn defaults_are_fail_closed() {
assert_eq!(FailMode::default(), FailMode::Closed);
}
// Only compiled on macOS: the tier reported there must be `ObserveOnly`.
#[cfg(target_os = "macos")]
#[test]
fn macos_is_observe_only_tier() {
assert_eq!(EnforcementTier::current(), EnforcementTier::ObserveOnly);
}
}
// End-to-end tests that drive `handle_connection` over loopback sockets.
#[cfg(test)]
mod connect_tests {
use super::*;
// Starts a fail-closed proxy on an ephemeral loopback port and returns its
// address; each accepted client is served by `handle_connection` on its own task.
async fn spawn_proxy(pdp: Arc<dyn PolicyDecisionPoint>) -> SocketAddr {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
if let Ok((s, _)) = listener.accept().await {
let pdp = pdp.clone();
tokio::spawn(async move {
let _ = handle_connection(s, pdp, FailMode::Closed).await;
});
}
}
});
addr
}
// Starts a TCP echo server on an ephemeral loopback port; it plays the role
// of the upstream the proxy tunnels to.
async fn spawn_echo() -> SocketAddr {
let l = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = l.local_addr().unwrap();
tokio::spawn(async move {
loop {
if let Ok((s, _)) = l.accept().await {
tokio::spawn(async move {
// Copy everything read from the socket straight back to it.
let (mut r, mut w) = s.into_split();
let _ = tokio::io::copy(&mut r, &mut w).await;
});
}
}
});
addr
}
// A CONNECT to a host that is not on the allowlist must be answered with 403.
#[tokio::test]
async fn denies_non_allowlisted_host_403() {
let pdp = Arc::new(AllowlistPdp::new(["allowed.example"]));
let addr = spawn_proxy(pdp).await;
let mut c = TcpStream::connect(addr).await.unwrap();
c.write_all(b"CONNECT evil.com:443 HTTP/1.1\r\nHost: evil.com:443\r\n\r\n")
.await
.unwrap();
let mut b = [0u8; 128];
let n = c.read(&mut b).await.unwrap();
let resp = String::from_utf8_lossy(&b[..n]);
assert!(resp.contains("403"), "expected 403, got: {resp}");
}
// A CONNECT to an allowlisted host must yield 200 and then relay bytes to
// and from the upstream (checked via the echo server).
#[tokio::test]
async fn allows_and_tunnels_to_upstream() {
let echo = spawn_echo().await;
// The echo server's IP address doubles as the allowlisted host name.
let host = echo.ip().to_string(); let pdp = Arc::new(AllowlistPdp::new([host.clone()]));
let paddr = spawn_proxy(pdp).await;
let mut c = TcpStream::connect(paddr).await.unwrap();
let connect = format!("CONNECT {}:{} HTTP/1.1\r\n\r\n", host, echo.port());
c.write_all(connect.as_bytes()).await.unwrap();
let mut b = [0u8; 128];
let n = c.read(&mut b).await.unwrap();
let resp = String::from_utf8_lossy(&b[..n]);
assert!(resp.contains("200"), "expected 200, got: {resp}");
// Payload sent through the tunnel must come back unchanged.
c.write_all(b"ping").await.unwrap();
let mut got = [0u8; 4];
c.read_exact(&mut got).await.unwrap();
assert_eq!(&got, b"ping");
}
// An explicit port is parsed; a missing port defaults to 443.
#[test]
fn parse_connect_target_defaults_port() {
assert_eq!(parse_connect_target("h:8443"), Some(("h".into(), 8443)));
assert_eq!(parse_connect_target("h"), Some(("h".into(), 443)));
}
}