use super::{context, flow};
use crate::common::config::env_loader;
use crate::engine::interfaces::{ConnectionObject, TerminatorResult};
use crate::layers::l7::http::httpx;
use crate::resources::kv::KvStore;
use anyhow::{Result, anyhow};
use fancy_log::{LogLevel, log};
use tokio::net::TcpStream;
#[cfg(feature = "lazycert")]
use crate::lazycert::registry::CHALLENGE_REGISTRY;
/// Entry point of the plaintext L4+ resolver for a single accepted TCP connection.
///
/// The function performs these steps in order:
/// 1. Peeks at the first bytes of `stream` and, if they parse as an HTTP/1.x request head,
///    records `http.method`, `http.path` and `http.host` in `kv`.
/// 2. With the `lazycert` feature enabled, answers requests whose path starts with
///    `/.well-known/acme-challenge/` (and carries a non-empty token) directly, then returns.
/// 3. Otherwise injects the common context, looks up the resolver config for `protocol`,
///    and executes its connection flow.
/// 4. If the flow asks for an upgrade to `httpx`, `http/1.1`, `h1` or `h2`, hands the
///    connection over to the L7 engine.
///
/// # Arguments
/// * `stream` - the accepted TCP connection; ownership moves into the flow or the ACME responder.
/// * `kv` - per-connection key/value store that receives the sniffed HTTP context.
/// * `parent_path` - forwarded unchanged to `flow::execute`.
/// * `protocol` - resolver name used for logging, context injection and the config lookup.
///
/// # Errors
/// Returns an error when no resolver config exists for `protocol`, when the flow fails
/// (this is also logged at `Error` level), when the flow requests an upgrade to an
/// unsupported protocol, or when the ACME responder or the L7 engine fails.
pub async fn run(
    stream: TcpStream,
    kv: &mut KvStore,
    parent_path: String,
    protocol: &str,
) -> Result<()> {
    log(
        LogLevel::Debug,
        &format!("➜ Entering Plaintext L4+ Resolver ({protocol})"),
    );

    // NOTE(review): the peek awaits the first inbound bytes with no timeout visible here;
    // confirm that callers bound this for clients that never send anything.
    peek_http_context(&stream, kv).await;

    // ACME HTTP-01 requests are answered here and never reach the configured flow.
    #[cfg(feature = "lazycert")]
    if let Some(path) = kv.get("http.path")
        && let Some(token) = path.strip_prefix("/.well-known/acme-challenge/")
        && !token.is_empty()
    {
        return handle_acme_challenge(stream, token).await;
    }

    let conn = ConnectionObject::Stream(Box::new(stream));
    context::inject_common(kv, protocol);

    // Keep the value returned by `get()` bound for as long as `config` borrows from it.
    let config_manager = crate::config::get();
    let config = config_manager
        .resolvers
        .get(protocol)
        .ok_or_else(|| anyhow!("No resolver config found for '{protocol}'"))?;

    let execution_result = flow::execute(
        &config.connection,
        kv,
        conn,
        parent_path,
        ahash::AHashMap::new(),
    )
    .await;

    match execution_result {
        Ok(TerminatorResult::Finished) => Ok(()),
        Ok(TerminatorResult::Upgrade {
            protocol: target_proto,
            conn,
            parent_path: _,
        }) => {
            // Only HTTP-family targets can be served from a plaintext connection.
            if matches!(target_proto.as_str(), "httpx" | "http/1.1" | "h1" | "h2") {
                handle_plain_handover(conn, target_proto).await
            } else {
                Err(anyhow!(
                    "Unsupported L7 upgrade protocol from Plaintext: {target_proto}"
                ))
            }
        }
        Err(e) => {
            log(
                LogLevel::Error,
                &format!("✗ Plain Flow execution failed: {e:#}"),
            );
            Err(e)
        }
    }
}

/// Peeks at the beginning of `stream` and records any HTTP request metadata found in `kv`.
///
/// The number of bytes inspected is read from the `HTTP_PLAIN_HEADER_BUFFER_SIZE` setting
/// and falls back to 4096 when the value does not parse as `usize`. A peek that yields no
/// bytes is ignored; a peek error is logged at `Warn` level and otherwise ignored.
async fn peek_http_context(stream: &TcpStream, kv: &mut KvStore) {
    let peek_limit = env_loader::get_env("HTTP_PLAIN_HEADER_BUFFER_SIZE", "4096".to_owned())
        .parse::<usize>()
        .unwrap_or(4096);
    let mut buf = vec![0u8; peek_limit];

    match stream.peek(&mut buf).await {
        Ok(0) => {}
        Ok(n) => {
            record_http_request(&buf[..n], kv);
        }
        Err(e) => {
            log(LogLevel::Warn, &format!("⚠ Failed to peek TCP stream: {e}"));
        }
    }
}

/// Parses `data` as an HTTP/1.x request head and stores method, path and Host in `kv`.
///
/// A head that is still incomplete within `data` (`Status::Partial`) is handled exactly like
/// a complete one: whatever fields the parser exposes are recorded. Input that fails to
/// parse only produces a debug log entry and leaves `kv` untouched.
fn record_http_request(data: &[u8], kv: &mut KvStore) {
    let mut headers = [httparse::EMPTY_HEADER; 32];
    let mut req = httparse::Request::new(&mut headers);

    if req.parse(data).is_err() {
        log(
            LogLevel::Debug,
            "⚙ Failed to parse HTTP headers in L4+ peek (Non-HTTP traffic?)",
        );
        return;
    }

    if let Some(m) = req.method {
        kv.insert("http.method".to_owned(), m.to_owned());
    }
    if let Some(p) = req.path {
        kv.insert("http.path".to_owned(), p.to_owned());
    }

    // Only the first Host header is recorded; non-UTF-8 bytes are replaced lossily.
    if let Some(host) = req
        .headers
        .iter()
        .find(|h| h.name.eq_ignore_ascii_case("Host"))
    {
        kv.insert(
            "http.host".to_owned(),
            String::from_utf8_lossy(host.value).to_string(),
        );
    }

    log(
        LogLevel::Debug,
        &format!(
            "⚙ L4+ HTTP Context: Host={:?}, Method={:?}",
            kv.get("http.host"),
            kv.get("http.method")
        ),
    );
}
async fn handle_plain_handover(conn: ConnectionObject, target_protocol: String) -> Result<()> {
log(
LogLevel::Debug,
&format!("➜ Handing over to L7 Engine ({target_protocol})..."),
);
httpx::handle_connection(conn, target_protocol)
.await
.map_err(|e| anyhow!("L7 Engine Error: {e}"))
}
/// Answers an ACME HTTP-01 challenge request directly on the raw TCP stream.
///
/// Looks `token` up in `CHALLENGE_REGISTRY`. A hit is answered with `200 OK` and the stored
/// key authorization as a plain-text body; a miss is answered with `404 Not Found` and the
/// body `Challenge not found`. Both responses carry `Connection: close`, after which the
/// write side of the stream is shut down.
///
/// # Errors
/// Returns the I/O error if writing the response or shutting the stream down fails.
#[cfg(feature = "lazycert")]
async fn handle_acme_challenge(mut stream: TcpStream, token: &str) -> Result<()> {
    use tokio::io::AsyncWriteExt;

    /// Renders a complete plain-text HTTP/1.1 response that closes the connection.
    fn render(status: &str, body_len: usize, body: impl std::fmt::Display) -> String {
        format!(
            "HTTP/1.1 {status}\r\n\
             Content-Type: text/plain\r\n\
             Content-Length: {body_len}\r\n\
             Connection: close\r\n\
             \r\n\
             {body}"
        )
    }

    let response = if let Some(entry) = CHALLENGE_REGISTRY.get(token) {
        let body = &entry.key_authorization;
        render("200 OK", body.len(), body)
    } else {
        let body = "Challenge not found";
        render("404 Not Found", body.len(), body)
    };

    stream.write_all(response.as_bytes()).await?;

    // The caller only peeked at the request, so its bytes are still sitting in the socket's
    // receive buffer. Closing a TCP socket with unread inbound data makes the stack send RST
    // instead of an orderly close, which can cause the validator to lose the response.
    // Discard what has already arrived: best effort, non-blocking, and bounded so a peer
    // that keeps sending cannot hold this task in the loop.
    {
        let mut scratch = [0u8; 1024];
        for _ in 0..64 {
            match stream.try_read(&mut scratch) {
                Ok(n) if n > 0 => {}
                // End of stream, nothing more buffered (`WouldBlock`), or a read error:
                // in every case there is nothing further to drain.
                _ => break,
            }
        }
    }

    stream.shutdown().await?;
    log(
        LogLevel::Debug,
        &format!("ACME HTTP-01 challenge response for token: {token}"),
    );
    Ok(())
}