use std::{net::SocketAddr, sync::Arc, time::Duration};
use netstack::{netcore::Channel, netsock::TcpStream};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::{Semaphore, watch},
time::timeout,
};
use crate::{
funnel::{FunnelIngressSlot, IngressConn},
magic_dns::DnsView,
peerapi_doh::{find_header_end, write_status},
taildrop::{TaildropError, TaildropStore},
};
/// Node attribute that `gate_taildrop` requires on the self node before a
/// Taildrop upload is allowed.
pub(crate) const FILE_SHARING_NODE_CAP: &str = "https://tailscale.com/cap/file-sharing";
/// Number of permits in the semaphore `serve` uses to bound concurrently
/// handled connections.
const MAX_INFLIGHT: usize = 512;
/// Upper bound `serve` places on the handling of a single accepted connection.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
/// Once more than this many bytes are buffered without finding the end of the
/// headers, `read_headers` stops reading.
const MAX_HEADERS: usize = 16 * 1024;
pub(crate) async fn serve(
channel: Channel,
port: u16,
view_rx: watch::Receiver<Arc<DnsView>>,
forward_exit_egress: bool,
taildrop: Option<Arc<TaildropStore>>,
funnel_ingress: FunnelIngressSlot,
) {
use std::net::Ipv4Addr;
use netstack::CreateSocket;
let addr = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port);
let listener = match channel.tcp_listen(addr).await {
Ok(l) => l,
Err(e) => {
tracing::error!(error = %e, %addr, "peerapi: tcp listen failed; server inert");
return;
}
};
tracing::debug!(%addr, taildrop = taildrop.is_some(), "peerapi server accepting");
let inflight = Arc::new(Semaphore::new(MAX_INFLIGHT));
loop {
let stream = match listener.accept().await {
Ok(s) => s,
Err(e) => {
tracing::warn!(error = %e, "peerapi: accept failed, stopping server");
return;
}
};
let Ok(permit) = inflight.clone().try_acquire_owned() else {
tracing::warn!(
peer = %stream.remote_addr(),
"peerapi drop: at max in-flight requests ({MAX_INFLIGHT})"
);
continue;
};
let channel = channel.clone();
let view_rx = view_rx.clone();
let taildrop = taildrop.clone();
let funnel_ingress = funnel_ingress.clone();
tokio::spawn(async move {
let _permit = permit;
if let Err(e) = timeout(
REQUEST_TIMEOUT,
route_conn(
stream,
&channel,
&view_rx,
forward_exit_egress,
taildrop,
&funnel_ingress,
),
)
.await
{
tracing::debug!(error = %e, "peerapi: connection timed out");
}
});
}
}
/// Reads one request head from `stream` and hands the connection to the
/// handler selected by `classify_route`.
///
/// Returns `Ok(())` without responding when `read_headers` yields nothing
/// (the peer closed before sending any byte). Answers `400 Bad Request` when
/// `httparse` cannot parse a complete request head (at most 32 headers) from
/// the buffered bytes, and `405 Method Not Allowed` for a wrong method on the
/// Taildrop or ingress paths.
///
/// NOTE(review): `read_headers` reports `header_end == 0` when it stopped
/// without locating the end of the headers. If `httparse` still accepts such
/// input, the handlers below receive `0` as the body offset — confirm this is
/// either impossible or handled downstream.
async fn route_conn(
    mut stream: TcpStream,
    channel: &Channel,
    view_rx: &watch::Receiver<Arc<DnsView>>,
    forward_exit_egress: bool,
    taildrop: Option<Arc<TaildropStore>>,
    funnel_ingress: &FunnelIngressSlot,
) -> std::io::Result<()> {
    let Some((seed, header_end)) = read_headers(&mut stream).await? else {
        return Ok(());
    };

    let mut header_slots = [httparse::EMPTY_HEADER; 32];
    let mut req = httparse::Request::new(&mut header_slots);
    if !matches!(req.parse(&seed), Ok(httparse::Status::Complete(_))) {
        return write_status(&mut stream, "400 Bad Request").await;
    }

    let route = classify_route(req.method.unwrap_or(""), req.path.unwrap_or(""));
    match route {
        Route::TaildropMethodNotAllowed | Route::IngressMethodNotAllowed => {
            write_status(&mut stream, "405 Method Not Allowed").await
        }
        Route::TaildropPut { name } => {
            // Everything borrowed from `req` is extracted before `seed` and
            // `stream` are moved into the handler.
            let peer = stream.remote_addr();
            let resume_from = parse_range_offset(&req);
            let declared_len = header_value(&req, "content-length")
                .map(str::trim)
                .and_then(|raw| raw.parse::<u64>().ok());
            handle_taildrop_put(
                stream,
                seed,
                header_end,
                name,
                resume_from,
                declared_len,
                peer,
                view_rx,
                taildrop,
            )
            .await
        }
        Route::Ingress => {
            let peer = stream.remote_addr();
            let target = header_value(&req, INGRESS_TARGET_HEADER).map(str::to_owned);
            let claimed_src = header_value(&req, INGRESS_SRC_HEADER).map(str::to_owned);
            handle_ingress(stream, target, claimed_src, peer, view_rx, funnel_ingress).await
        }
        Route::DohOrOther => {
            crate::peerapi_doh::handle_conn(
                stream,
                seed,
                header_end,
                channel,
                view_rx,
                forward_exit_egress,
            )
            .await
        }
    }
}
/// Request header whose value `route_conn` forwards to `handle_ingress` as the
/// ingress target; `handle_ingress` answers 400 when it is absent.
const INGRESS_TARGET_HEADER: &str = "Tailscale-Ingress-Target";
/// Request header whose value ends up in `IngressConn::src` (empty string when
/// the header is absent).
const INGRESS_SRC_HEADER: &str = "Tailscale-Ingress-Src";
/// Outcome of `classify_route`: which handler a request belongs to.
#[derive(Debug, PartialEq, Eq)]
enum Route {
// `PUT` under `/v0/put/`; `name` is the percent-decoded remainder of the path.
TaildropPut { name: String },
// Any other method under `/v0/put/`.
TaildropMethodNotAllowed,
// `POST /v0/ingress`.
Ingress,
// Any other method on `/v0/ingress`.
IngressMethodNotAllowed,
// Everything else; `route_conn` passes these to `peerapi_doh::handle_conn`.
DohOrOther,
}
/// Maps a request method and request target onto a [`Route`].
///
/// Anything from the first `?` onward is ignored. Paths under `/v0/put/`
/// are Taildrop uploads (`PUT` only) and `/v0/ingress` is funnel ingress
/// (`POST` only); a wrong method on either yields the matching
/// "method not allowed" variant. All other paths are `Route::DohOrOther`.
fn classify_route(method: &str, full_path: &str) -> Route {
    let path = match full_path.split_once('?') {
        Some((before_query, _)) => before_query,
        None => full_path,
    };

    match path.strip_prefix("/v0/put/") {
        Some(encoded) if method == "PUT" => Route::TaildropPut {
            name: percent_decode(encoded),
        },
        Some(_) => Route::TaildropMethodNotAllowed,
        None if path == "/v0/ingress" => {
            if method == "POST" {
                Route::Ingress
            } else {
                Route::IngressMethodNotAllowed
            }
        }
        None => Route::DohOrOther,
    }
}
/// Buffers bytes from `stream` until `find_header_end` locates the end of the
/// request head.
///
/// Returns:
/// * `Ok(Some((bytes, end)))` with the offset reported by `find_header_end`
///   once the head is complete (bytes past `end` may already be buffered);
/// * `Ok(Some((bytes, 0)))` when more than `MAX_HEADERS` bytes were buffered
///   without a match, or the stream ended after some bytes were received;
/// * `Ok(None)` when the stream ended before any byte arrived.
///
/// Read errors are propagated.
async fn read_headers(stream: &mut TcpStream) -> std::io::Result<Option<(Vec<u8>, usize)>> {
    let mut collected = Vec::with_capacity(1024);
    let mut chunk = [0u8; 1024];
    loop {
        match find_header_end(&collected) {
            Some(end) => return Ok(Some((collected, end))),
            None if collected.len() > MAX_HEADERS => return Ok(Some((collected, 0))),
            None => {}
        }
        match stream.read(&mut chunk).await? {
            0 if collected.is_empty() => return Ok(None),
            0 => return Ok(Some((collected, 0))),
            got => collected.extend_from_slice(&chunk[..got]),
        }
    }
}
/// Returns the value of the first header named `name` (ASCII
/// case-insensitive), or `None` when there is no such header or that first
/// match is not valid UTF-8.
fn header_value<'a>(req: &'a httparse::Request<'_, '_>, name: &str) -> Option<&'a str> {
    for header in req.headers.iter() {
        if header.name.eq_ignore_ascii_case(name) {
            // Only the first match is considered, even if it fails to decode.
            return std::str::from_utf8(header.value).ok();
        }
    }
    None
}
/// Result of an access check (`gate_taildrop` / `gate_ingress`).
#[derive(Debug, PartialEq, Eq)]
enum GateDecision {
// The request may proceed.
Allow,
// The request is refused; the handlers answer 403.
Deny,
}
/// Decides whether a Taildrop upload coming from `src` is permitted.
///
/// All of the following must hold, checked in this order: a store is
/// configured, the self node carries `FILE_SHARING_NODE_CAP`, and the source
/// IP belongs to a known node (see `source_is_known_node`).
fn gate_taildrop(view: &DnsView, src: SocketAddr, store_configured: bool) -> GateDecision {
    let self_may_share = || {
        view.self_node
            .as_ref()
            .is_some_and(|node| node.has_node_attr(FILE_SHARING_NODE_CAP))
    };

    if store_configured && self_may_share() && source_is_known_node(view, src.ip()) {
        GateDecision::Allow
    } else {
        GateDecision::Deny
    }
}
/// Reports whether `ip` is present in the view's peer database or equals one
/// of the self node's tailnet addresses (IPv4 or IPv6).
fn source_is_known_node(view: &DnsView, ip: std::net::IpAddr) -> bool {
    let is_peer = view
        .peers
        .as_ref()
        .is_some_and(|db| db.get(&ip).is_some());
    if is_peer {
        return true;
    }

    match view.self_node.as_ref() {
        Some(me) => {
            let own = &me.tailnet_address;
            std::net::IpAddr::from(own.ipv4.addr()) == ip
                || std::net::IpAddr::from(own.ipv6.addr()) == ip
        }
        None => false,
    }
}
/// Allows an ingress request only when its source IP is a known node
/// (see `source_is_known_node`).
fn gate_ingress(view: &DnsView, src: SocketAddr) -> GateDecision {
    match source_is_known_node(view, src.ip()) {
        true => GateDecision::Allow,
        false => GateDecision::Deny,
    }
}
/// Raw bytes `handle_ingress` sends before handing the connection over:
/// a bare `101 Switching Protocols` status line with no headers.
fn ingress_101_response() -> &'static [u8] {
    const SWITCHING: &[u8] = b"HTTP/1.1 101 Switching Protocols\r\n\r\n";
    SWITCHING
}
/// Handles `POST /v0/ingress`: after the checks pass, answers `101` and hands
/// the raw connection to the registered funnel ingress sink.
///
/// Responses, in the order the checks run:
/// * `400 Bad Request` — no ingress target was supplied;
/// * `404 Not Found` — no sink is currently registered in `funnel_ingress`;
/// * `403 Forbidden` — `gate_ingress` denies the source address.
///
/// A failed hand-off to the sink is only logged; the function still returns
/// `Ok(())`.
async fn handle_ingress(
    mut stream: TcpStream,
    target: Option<String>,
    src_header: Option<String>,
    src: SocketAddr,
    view_rx: &watch::Receiver<Arc<DnsView>>,
    funnel_ingress: &FunnelIngressSlot,
) -> std::io::Result<()> {
    let target = match target {
        Some(value) => value,
        None => return write_status(&mut stream, "400 Bad Request").await,
    };

    // Copy the sink out inside a block so the lock guard is released before
    // any `.await`; a poisoned lock is still read.
    let registered = {
        let slot = funnel_ingress
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        slot.clone()
    };
    let sink = match registered {
        Some(sink) => sink,
        None => return write_status(&mut stream, "404 Not Found").await,
    };

    let decision = {
        let view = view_rx.borrow().clone();
        gate_ingress(&view, src)
    };
    if decision == GateDecision::Deny {
        return write_status(&mut stream, "403 Forbidden").await;
    }

    // The 101 goes out before the hand-off, so the sink receives a stream
    // that is already switched.
    stream.write_all(ingress_101_response()).await?;
    stream.flush().await?;

    let conn = IngressConn {
        target,
        src: src_header.unwrap_or_default(),
        stream,
    };
    let rejected = sink.send(conn).await.is_err();
    if rejected {
        tracing::debug!(%src, "funnel ingress: sink closed/full; dropping hijacked conn");
    }
    Ok(())
}
/// Extracts the start offset from a `Range: bytes=<start>-…` header.
///
/// Returns `0` when the header is missing, does not start with `bytes=`,
/// contains no `-`, or the start is not a valid `u64`.
fn parse_range_offset(req: &httparse::Request<'_, '_>) -> u64 {
    header_value(req, "range")
        .map(str::trim)
        .and_then(|value| value.strip_prefix("bytes="))
        .and_then(|spec| spec.split_once('-'))
        .and_then(|(first, _last)| first.trim().parse::<u64>().ok())
        .unwrap_or(0)
}
/// Decodes `%XX` escapes in `s`.
///
/// A `%` that is not followed by two hexadecimal digits is copied through
/// unchanged. The decoded bytes are converted to a `String` lossily, so
/// invalid UTF-8 sequences become U+FFFD.
fn percent_decode(s: &str) -> String {
    /// Value of a single ASCII hex digit, if `byte` is one.
    fn hex_value(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    let mut pending = s.as_bytes();
    let mut decoded = Vec::with_capacity(pending.len());
    while let Some((&first, tail)) = pending.split_first() {
        if first == b'%' {
            if let [hi, lo, after @ ..] = tail {
                if let (Some(hi), Some(lo)) = (hex_value(*hi), hex_value(*lo)) {
                    // Both nibbles are <= 15, so this cannot overflow a u8.
                    decoded.push(hi * 16 + lo);
                    pending = after;
                    continue;
                }
            }
        }
        decoded.push(first);
        pending = tail;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
/// Handles a Taildrop `PUT`: streams the request body into `taildrop` under
/// `name`, starting at `offset`.
///
/// Responses:
/// * `403 Forbidden` — no store is configured or `gate_taildrop` denies `src`;
/// * `400 Bad Request` — no usable `Content-Length`, or the store rejects the
///   file name;
/// * `409 Conflict` — the store reports the file already exists;
/// * `500 Internal Server Error` — the store reports an I/O error (logged);
/// * `200 OK` with a `{}` JSON body on success.
///
/// `seed[header_end..]` is the part of the body that was already buffered
/// while reading the headers; the rest is read from `stream`, capped at
/// `content_length` bytes in total.
#[allow(clippy::too_many_arguments)]
async fn handle_taildrop_put(
    mut stream: TcpStream,
    seed: Vec<u8>,
    header_end: usize,
    name: String,
    offset: u64,
    content_length: Option<u64>,
    src: SocketAddr,
    view_rx: &watch::Receiver<Arc<DnsView>>,
    taildrop: Option<Arc<TaildropStore>>,
) -> std::io::Result<()> {
    let store = match taildrop {
        Some(store) => store,
        None => return write_status(&mut stream, "403 Forbidden").await,
    };

    let decision = {
        let view = view_rx.borrow().clone();
        gate_taildrop(&view, src, true)
    };
    if decision == GateDecision::Deny {
        return write_status(&mut stream, "403 Forbidden").await;
    }

    let declared_len = match content_length {
        Some(len) => len,
        None => return write_status(&mut stream, "400 Bad Request").await,
    };

    let already_buffered = Vec::from(&seed[header_end..]);
    let body = BodyReader::new(already_buffered, &mut stream, declared_len);
    let outcome = store.put_file(&name, body, offset).await;

    let status = match outcome {
        Ok(_written) => return write_taildrop_ok(&mut stream).await,
        Err(TaildropError::InvalidFileName) => "400 Bad Request",
        Err(TaildropError::FileExists) => "409 Conflict",
        Err(TaildropError::Io(e)) => {
            tracing::warn!(error = %e, %src, "taildrop put: I/O error");
            "500 Internal Server Error"
        }
    };
    write_status(&mut stream, status).await
}
/// Builds the full HTTP response sent after a successful Taildrop upload:
/// a `200 OK` head declaring a JSON body, followed by the body `{}\n`.
fn taildrop_ok_response() -> Vec<u8> {
    const BODY: &[u8] = b"{}\n";
    let head = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
        BODY.len()
    );
    [head.as_bytes(), BODY].concat()
}
/// Writes the Taildrop success response (see `taildrop_ok_response`) to
/// `stream` and flushes it.
async fn write_taildrop_ok(stream: &mut TcpStream) -> std::io::Result<()> {
    let payload = taildrop_ok_response();
    stream.write_all(payload.as_slice()).await?;
    stream.flush().await
}
/// Reader over a request body of declared length: first the bytes that were
/// already buffered together with the headers, then a bounded number of
/// bytes from the underlying stream.
struct BodyReader<S> {
    /// Body bytes that arrived in the same reads as the headers.
    seed: std::io::Cursor<Vec<u8>>,
    /// Source of the rest of the body.
    stream: S,
    /// Bytes still to be taken from `stream`.
    remaining: u64,
}

impl<S> BodyReader<S> {
    /// Creates a reader that yields exactly `content_length` bytes in total.
    ///
    /// `seed` is cut down to `content_length` when it is longer; whatever the
    /// seed does not cover is left to be read from `stream`.
    fn new(seed: Vec<u8>, stream: S, content_length: u64) -> Self {
        let mut seed = seed;
        // A length that does not fit in `usize` necessarily exceeds the seed,
        // so there is nothing to cut in that case.
        if let Ok(limit) = usize::try_from(content_length) {
            seed.truncate(limit);
        }
        let buffered = seed.len() as u64;
        Self {
            remaining: content_length.saturating_sub(buffered),
            seed: std::io::Cursor::new(seed),
            stream,
        }
    }
}
/// Yields the buffered seed first, then up to `remaining` bytes from the
/// underlying stream, and reports end-of-file once the declared length has
/// been delivered.
impl<S: tokio::io::AsyncRead + Unpin> tokio::io::AsyncRead for BodyReader<S> {
    /// Reads the next part of the body into `buf`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying stream, and fails with
    /// `UnexpectedEof` when the stream ends before the declared length has
    /// been received, so a truncated body is never mistaken for a complete
    /// one.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        use std::task::Poll;

        // `S: Unpin` makes the whole reader `Unpin`, so plain field access is fine.
        let this = self.get_mut();

        // Phase 1: hand out whatever was buffered together with the headers.
        if (this.seed.position() as usize) < this.seed.get_ref().len() {
            let dst = buf.initialize_unfilled();
            let n = std::io::Read::read(&mut this.seed, dst)?;
            buf.advance(n);
            return Poll::Ready(Ok(()));
        }

        // Declared length fully delivered: filling nothing signals EOF.
        if this.remaining == 0 {
            return Poll::Ready(Ok(()));
        }

        let want = this.remaining.min(buf.remaining() as u64) as usize;
        if want == 0 {
            // The caller's buffer is full; a zero-length read here must not
            // be confused with the premature-EOF case below.
            return Poll::Ready(Ok(()));
        }

        // Phase 2: read at most `want` bytes from the stream. The target
        // region is initialized first so that advancing `buf` afterwards is
        // valid even when the caller supplied an uninitialized buffer
        // (`ReadBuf::advance` panics if `filled` would exceed `initialized`).
        let n = {
            let dst = buf.initialize_unfilled_to(want);
            let mut limited = tokio::io::ReadBuf::new(dst);
            let stream = std::pin::Pin::new(&mut this.stream);
            match tokio::io::AsyncRead::poll_read(stream, cx, &mut limited) {
                Poll::Ready(Ok(())) => limited.filled().len(),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            }
        };

        if n == 0 {
            // The stream ended while body bytes were still owed.
            return Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "request body ended before the declared Content-Length",
            )));
        }

        buf.advance(n);
        this.remaining -= n as u64;
        Poll::Ready(Ok(()))
    }
}
// Unit tests for request routing, the Taildrop/ingress gates, the fixed
// responses, and `BodyReader`.
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr};
use super::*;
// Parses `buf` into a request backed by `headers`; the parse status is
// ignored, so callers must pass well-formed input.
fn req_with<'a>(
buf: &'a [u8],
headers: &'a mut [httparse::Header<'a>],
) -> httparse::Request<'a, 'a> {
let mut req = httparse::Request::new(headers);
let _ = req.parse(buf);
req
}
// Valid escapes are decoded; a `%` without two hex digits is left alone.
#[test]
fn percent_decode_decodes_escapes() {
assert_eq!(percent_decode("photo.jpg"), "photo.jpg");
assert_eq!(percent_decode("my%20file.txt"), "my file.txt");
assert_eq!(percent_decode("a%2Fb"), "a/b"); assert_eq!(percent_decode("100%done"), "100%done");
}
#[test]
fn parse_range_offset_reads_resume_start() {
let buf = b"PUT /v0/put/x HTTP/1.1\r\nRange: bytes=1024-\r\n\r\n";
let mut headers = [httparse::EMPTY_HEADER; 8];
let req = req_with(buf, &mut headers);
assert_eq!(parse_range_offset(&req), 1024);
}
// Missing `Range` header and a non-`bytes` unit both fall back to 0.
#[test]
fn parse_range_offset_defaults_zero() {
let buf = b"PUT /v0/put/x HTTP/1.1\r\nContent-Length: 3\r\n\r\n";
let mut headers = [httparse::EMPTY_HEADER; 8];
let req = req_with(buf, &mut headers);
assert_eq!(parse_range_offset(&req), 0);
let buf2 = b"PUT /v0/put/x HTTP/1.1\r\nRange: items=1-2\r\n\r\n";
let mut headers2 = [httparse::EMPTY_HEADER; 8];
let req2 = req_with(buf2, &mut headers2);
assert_eq!(parse_range_offset(&req2), 0);
}
// Builds a minimal node whose stable id and hostname are `stable` and whose
// IPv4 tailnet address is `ipv4`; every other field is a fixed placeholder.
fn node_at(stable: &str, ipv4: &str) -> ts_control::Node {
use ts_control::{Node, NodeCapMap, StableNodeId, TailnetAddress};
Node {
id: 1,
stable_id: StableNodeId(stable.to_string()),
hostname: stable.to_string(),
user_id: 0,
tailnet: Some("user.ts.net".to_string()),
tags: vec![],
tailnet_address: TailnetAddress {
ipv4: format!("{ipv4}/32").parse().unwrap(),
ipv6: "fd7a::1/128".parse().unwrap(),
},
node_key: [0u8; 32].into(),
node_key_expiry: None,
online: None,
last_seen: None,
key_signature: vec![],
machine_key: None,
disco_key: None,
accepted_routes: vec![],
underlay_addresses: vec![],
derp_region: None,
cap: Default::default(),
cap_map: NodeCapMap::new(),
peerapi_port: None,
peerapi_dns_proxy: false,
is_wireguard_only: false,
exit_node_dns_resolvers: vec![],
peer_relay: false,
service_vips: Default::default(),
}
}
// Builds a view whose self node is 100.64.0.1 (optionally carrying the
// file-sharing capability) and whose peer database holds `peer_ips`.
fn view_with(node_has_cap: bool, peer_ips: &[Ipv4Addr]) -> DnsView {
use crate::peer_tracker::PeerDb;
let mut self_node = node_at("self", "100.64.0.1");
if node_has_cap {
self_node
.cap_map
.insert(FILE_SHARING_NODE_CAP.to_string(), Vec::new());
}
let mut db = PeerDb::default();
for (i, ip) in peer_ips.iter().enumerate() {
db.upsert(&node_at(&format!("peer{i}"), &ip.to_string()));
}
DnsView {
self_node: Some(self_node),
peers: Some(Arc::new(db)),
..Default::default()
}
}
// Source socket address with the given IPv4 address and a fixed port.
fn src(ip: &str) -> SocketAddr {
SocketAddr::new(IpAddr::V4(ip.parse().unwrap()), 41234)
}
#[test]
fn gate_denies_without_store() {
let v = view_with(true, &["100.64.0.9".parse().unwrap()]);
assert_eq!(
gate_taildrop(&v, src("100.64.0.9"), false),
GateDecision::Deny
);
}
#[test]
fn gate_denies_when_node_lacks_file_sharing_cap() {
let v = view_with(false, &["100.64.0.9".parse().unwrap()]);
assert_eq!(
gate_taildrop(&v, src("100.64.0.9"), true),
GateDecision::Deny
);
}
#[test]
fn gate_denies_unknown_source_ip() {
let v = view_with(true, &["100.64.0.9".parse().unwrap()]);
assert_eq!(
gate_taildrop(&v, src("198.51.100.7"), true),
GateDecision::Deny
);
}
#[test]
fn gate_allows_known_peer_with_node_cap_and_store() {
let v = view_with(true, &["100.64.0.9".parse().unwrap()]);
assert_eq!(
gate_taildrop(&v, src("100.64.0.9"), true),
GateDecision::Allow
);
}
// Plain name, percent-encoded name, and a name followed by a query string.
#[test]
fn classify_route_maps_put_to_taildrop() {
assert_eq!(
classify_route("PUT", "/v0/put/photo.jpg"),
Route::TaildropPut {
name: "photo.jpg".to_string()
}
);
assert_eq!(
classify_route("PUT", "/v0/put/my%20file.txt"),
Route::TaildropPut {
name: "my file.txt".to_string()
}
);
assert_eq!(
classify_route("PUT", "/v0/put/a.bin?x=1"),
Route::TaildropPut {
name: "a.bin".to_string()
}
);
}
#[test]
fn classify_route_non_put_on_taildrop_is_405() {
assert_eq!(
classify_route("GET", "/v0/put/photo.jpg"),
Route::TaildropMethodNotAllowed
);
assert_eq!(
classify_route("POST", "/v0/put/photo.jpg"),
Route::TaildropMethodNotAllowed
);
}
#[test]
fn classify_route_dns_query_falls_through_to_doh() {
assert_eq!(classify_route("POST", "/dns-query"), Route::DohOrOther);
assert_eq!(
classify_route("GET", "/dns-query?dns=abc"),
Route::DohOrOther
);
assert_eq!(classify_route("GET", "/something-else"), Route::DohOrOther);
}
#[test]
fn taildrop_ok_response_is_exact() {
let resp = taildrop_ok_response();
let text = String::from_utf8(resp).unwrap();
assert!(text.starts_with("HTTP/1.1 200 OK\r\n"));
assert!(text.contains("Content-Length: 3\r\n"));
assert!(text.ends_with("\r\n\r\n{}\n"));
}
use crate::taildrop::TaildropStore;
// Creates a store rooted in a fresh temp directory named after the process
// id and the current time; callers remove the directory themselves.
fn tmp_store() -> (std::path::PathBuf, Arc<TaildropStore>) {
let mut root = std::env::temp_dir();
root.push(format!(
"peerapi-test-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
let store = Arc::new(TaildropStore::new(&root).unwrap());
(root, store)
}
// Test-only re-creation of the routing + Taildrop PUT decision flow that does
// not need a real `TcpStream`: the body is fed through an in-memory duplex
// pipe. Returns the status line and the response body. Non-Taildrop routes
// are reported with the placeholder codes "INGRESS" and "DOH".
async fn taildrop_request_path(
method: &str,
full_path: &str,
content_length: Option<u64>,
body: &[u8],
store: Option<Arc<TaildropStore>>,
view: &DnsView,
src: SocketAddr,
) -> (String, Vec<u8>) {
fn status(code: &str) -> (String, Vec<u8>) {
(format!("HTTP/1.1 {code}"), Vec::new())
}
match classify_route(method, full_path) {
Route::TaildropMethodNotAllowed | Route::IngressMethodNotAllowed => {
status("405 Method Not Allowed")
}
Route::Ingress => status("INGRESS"),
Route::DohOrOther => status("DOH"), Route::TaildropPut { name } => {
let Some(store) = store else {
return status("403 Forbidden");
};
if gate_taildrop(view, src, true) == GateDecision::Deny {
return status("403 Forbidden");
}
let Some(content_length) = content_length else {
return status("400 Bad Request");
};
let (mut client, server) = tokio::io::duplex(64 * 1024);
let body = body.to_vec();
tokio::spawn(async move {
client.write_all(&body).await.ok();
client.shutdown().await.ok();
});
let reader = BodyReader::new(Vec::new(), server, content_length);
match store.put_file(&name, reader, 0).await {
Ok(_) => {
let resp = taildrop_ok_response();
let text = String::from_utf8_lossy(&resp).into_owned();
let line = text.lines().next().unwrap_or("").to_string();
(line, b"{}\n".to_vec())
}
Err(TaildropError::InvalidFileName) => status("400 Bad Request"),
Err(TaildropError::FileExists) => status("409 Conflict"),
Err(TaildropError::Io(_)) => status("500 Internal Server Error"),
}
}
}
}
#[tokio::test]
async fn put_request_allowed_writes_file_and_200() {
let (root, store) = tmp_store();
let view = view_with(true, &["100.64.0.9".parse().unwrap()]);
let body = b"hello over the wire";
let (line, resp_body) = taildrop_request_path(
"PUT",
"/v0/put/wire.txt",
Some(body.len() as u64),
body,
Some(store.clone()),
&view,
src("100.64.0.9"),
)
.await;
assert_eq!(line, "HTTP/1.1 200 OK");
assert_eq!(resp_body, b"{}\n");
assert_eq!(std::fs::read(root.join("wire.txt")).unwrap(), body);
std::fs::remove_dir_all(&root).ok();
}
// Two denial paths: no store configured, and a self node without the
// file-sharing capability.
#[tokio::test]
async fn put_request_denied_by_gate_is_403() {
let view = view_with(true, &["100.64.0.9".parse().unwrap()]);
let (line, _) = taildrop_request_path(
"PUT",
"/v0/put/x.txt",
Some(1),
b"x",
None,
&view,
src("100.64.0.9"),
)
.await;
assert_eq!(line, "HTTP/1.1 403 Forbidden");
let (root, store) = tmp_store();
let no_cap = view_with(false, &["100.64.0.9".parse().unwrap()]);
let (line, _) = taildrop_request_path(
"PUT",
"/v0/put/x.txt",
Some(1),
b"x",
Some(store),
&no_cap,
src("100.64.0.9"),
)
.await;
assert_eq!(line, "HTTP/1.1 403 Forbidden");
std::fs::remove_dir_all(&root).ok();
}
// The encoded name decodes to "../escape", which the store must reject.
#[tokio::test]
async fn put_request_with_bad_name_is_400() {
let (root, store) = tmp_store();
let view = view_with(true, &["100.64.0.9".parse().unwrap()]);
let (line, _) = taildrop_request_path(
"PUT",
"/v0/put/%2E%2E%2Fescape",
Some(1),
b"x",
Some(store),
&view,
src("100.64.0.9"),
)
.await;
assert_eq!(line, "HTTP/1.1 400 Bad Request");
std::fs::remove_dir_all(&root).ok();
}
#[tokio::test]
async fn get_on_taildrop_path_is_405() {
let view = view_with(true, &["100.64.0.9".parse().unwrap()]);
let (root, store) = tmp_store();
let (line, _) = taildrop_request_path(
"GET",
"/v0/put/x.txt",
Some(1),
b"x",
Some(store),
&view,
src("100.64.0.9"),
)
.await;
assert_eq!(line, "HTTP/1.1 405 Method Not Allowed");
std::fs::remove_dir_all(&root).ok();
}
#[test]
fn classify_route_maps_post_to_ingress() {
assert_eq!(classify_route("POST", "/v0/ingress"), Route::Ingress);
assert_eq!(classify_route("POST", "/v0/ingress?x=1"), Route::Ingress);
}
#[test]
fn classify_route_non_post_on_ingress_is_405() {
assert_eq!(
classify_route("GET", "/v0/ingress"),
Route::IngressMethodNotAllowed
);
assert_eq!(
classify_route("PUT", "/v0/ingress"),
Route::IngressMethodNotAllowed
);
}
#[test]
fn ingress_101_response_is_exact() {
assert_eq!(
ingress_101_response(),
b"HTTP/1.1 101 Switching Protocols\r\n\r\n"
);
}
// A peer address and the self node's own address are allowed; an address
// that belongs to neither is denied.
#[test]
fn gate_ingress_allows_known_node_denies_unknown() {
let v = view_with(true, &["100.64.0.9".parse().unwrap()]);
assert_eq!(gate_ingress(&v, src("100.64.0.9")), GateDecision::Allow);
assert_eq!(gate_ingress(&v, src("100.64.0.1")), GateDecision::Allow);
assert_eq!(gate_ingress(&v, src("198.51.100.7")), GateDecision::Deny);
}
#[test]
fn gate_ingress_denies_when_no_peers_and_no_self() {
let v = DnsView::default();
assert_eq!(gate_ingress(&v, src("100.64.0.9")), GateDecision::Deny);
}
#[test]
fn dns_query_post_is_not_taildrop_405_or_404() {
assert_eq!(classify_route("POST", "/dns-query"), Route::DohOrOther);
assert_eq!(classify_route("POST", "/dns-query?x=1"), Route::DohOrOther);
}
// The peer sends more than the declared 8 bytes; only 8 may come out.
#[tokio::test]
async fn body_reader_caps_at_content_length() {
let declared: u64 = 8;
let (mut client, server) = tokio::io::duplex(64 * 1024);
tokio::spawn(async move {
client.write_all(b"01234567OVERFLOW1234").await.ok();
client.shutdown().await.ok();
});
let mut reader = BodyReader::new(Vec::new(), server, declared);
let mut out = Vec::new();
let mut chunk = [0u8; 4];
loop {
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut chunk)
.await
.unwrap();
if n == 0 {
break;
}
out.extend_from_slice(&chunk[..n]);
}
assert_eq!(out, b"01234567");
assert_eq!(out.len() as u64, declared);
}
// The 3 seed bytes count toward the declared 6, leaving 3 for the stream.
#[tokio::test]
async fn body_reader_includes_seed_then_caps() {
let declared: u64 = 6;
let (mut client, server) = tokio::io::duplex(64 * 1024);
tokio::spawn(async move {
client.write_all(b"XXXXXXXXXX").await.ok(); client.shutdown().await.ok();
});
let mut reader = BodyReader::new(b"abc".to_vec(), server, declared);
let mut out = Vec::new();
let mut chunk = [0u8; 4];
loop {
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut chunk)
.await
.unwrap();
if n == 0 {
break;
}
out.extend_from_slice(&chunk[..n]);
}
assert_eq!(out.len() as u64, declared);
assert_eq!(out, b"abcXXX");
}
}