use std::collections::VecDeque;
use std::io;
use std::net::{Ipv4Addr, SocketAddr};
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::time::Duration;
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use splicetcp::{FdFrameSource, FrameSource};
use crate::darwin::classifier::{FrameClassifier, InterceptedKind};
use crate::darwin::egress::HostEgress;
use crate::darwin::inbound_relay::InboundCommand;
use crate::darwin::tcp_bridge::TcpBridge;
use crate::datapath::FrameBuf;
use crate::dhcp::DhcpServer;
use crate::dns::DnsForwarder;
use crate::ethernet::{ETH_HEADER_LEN, build_udp_ip_ethernet};
/// Upper bound on the number of frames kept in the pending-write queue.
/// `enqueue_or_write` drops a new frame (with a debug log) once the queue
/// already holds this many entries.
const WRITE_QUEUE_HARD_CAP: usize = 8192;
/// Newtype around an [`OwnedFd`] that implements [`AsRawFd`], which lets the
/// descriptor be registered with `AsyncFd` while the wrapper keeps ownership.
struct FdWrapper(OwnedFd);

impl AsRawFd for FdWrapper {
    /// Returns the raw descriptor of the wrapped [`OwnedFd`]; ownership is not transferred.
    fn as_raw_fd(&self) -> RawFd {
        let Self(owned) = self;
        owned.as_raw_fd()
    }
}
/// Everything the guest network datapath needs to operate; the whole value is
/// consumed by `NetworkDatapath::run`.
pub struct NetworkDatapath {
/// Descriptor of the guest link. `run` switches it to non-blocking mode,
/// reads guest frames from it and writes reply frames to it.
pub guest_fd: OwnedFd,
/// Host-side egress; `run` hands it intercepted UDP/ICMP frames and inbound
/// UDP commands, and calls its `expire_flow` / `maintenance` hooks.
pub egress: HostEgress,
/// Frames received on this channel are forwarded to the guest.
pub reply_rx: mpsc::Receiver<Vec<u8>>,
/// Inbound commands (accepted TCP streams, received UDP) dispatched by
/// `process_inbound_cmd`.
pub cmd_rx: mpsc::Receiver<InboundCommand>,
/// Produces replies for DHCP packets intercepted from the guest.
pub dhcp_server: DhcpServer,
/// Answers DNS queries locally when it can and otherwise supplies the list
/// of upstream servers to forward to.
pub dns_forwarder: DnsForwarder,
/// Log of DNS resolutions; cloned into the egress, the TCP bridge and the
/// DNS forwarding tasks.
pub dns_log: super::dns_log::DnsResolutionLog,
/// Gateway MAC address, given to the classifier and used when building
/// frames addressed to the guest.
pub gateway_mac: [u8; 6],
/// Gateway IPv4 address, passed to the classifier, the TCP bridge and the
/// DHCP/DNS reply builders.
pub gateway_ip: Ipv4Addr,
/// Guest IPv4 address, passed to `TcpBridge::initiate_inbound` for inbound
/// connections.
pub guest_ip: Ipv4Addr,
/// Cancelling this token stops `run` and aborts in-flight DNS forwarding tasks.
pub cancel: CancellationToken,
/// MTU value passed to `FrameClassifier::new`.
pub mtu: usize,
/// Optional sink. When set, frames destined for the guest are handed to the
/// sink instead of being written to `guest_fd`.
pub frame_sink: Option<std::sync::Arc<dyn crate::direct_rx::FrameSink>>,
/// Optional sink that `run` forwards to `TcpBridge::set_conn_sink`.
pub conn_sink: Option<std::sync::Arc<dyn crate::direct_rx::ConnSink>>,
}
// Construction, sink configuration and the main event loop of the datapath.
impl NetworkDatapath {
/// Builds a datapath from its parts.
///
/// The DNS resolution log starts out fresh and neither sink is installed;
/// use `set_frame_sink` / `set_conn_sink` to add them before calling `run`.
#[must_use]
#[allow(clippy::too_many_arguments)]
pub fn new(
guest_fd: OwnedFd,
egress: HostEgress,
reply_rx: mpsc::Receiver<Vec<u8>>,
cmd_rx: mpsc::Receiver<InboundCommand>,
dhcp_server: DhcpServer,
dns_forwarder: DnsForwarder,
gateway_ip: Ipv4Addr,
guest_ip: Ipv4Addr,
gateway_mac: [u8; 6],
cancel: CancellationToken,
mtu: usize,
) -> Self {
Self {
guest_fd,
egress,
reply_rx,
cmd_rx,
dhcp_server,
dns_forwarder,
dns_log: super::dns_log::DnsResolutionLog::new(),
gateway_mac,
gateway_ip,
guest_ip,
cancel,
mtu,
frame_sink: None,
conn_sink: None,
}
}
/// Installs a frame sink. Once set, `run` enables large frames on the TCP
/// bridge and frames for the guest go to the sink instead of `guest_fd`.
pub fn set_frame_sink(&mut self, sink: std::sync::Arc<dyn crate::direct_rx::FrameSink>) {
self.frame_sink = Some(sink);
}
/// Installs a connection sink, which `run` passes to `TcpBridge::set_conn_sink`.
pub fn set_conn_sink(&mut self, sink: std::sync::Arc<dyn crate::direct_rx::ConnSink>) {
self.conn_sink = Some(sink);
}
/// Consumes the datapath and drives its event loop until `cancel` fires.
///
/// # Errors
///
/// Returns an error if the guest descriptor cannot be made non-blocking or
/// registered with `AsyncFd`, or if waiting for its readiness fails.
pub async fn run(self) -> io::Result<()> {
let Self {
guest_fd,
mut egress,
mut reply_rx,
mut cmd_rx,
mut dhcp_server,
dns_forwarder,
dns_log,
gateway_mac,
gateway_ip,
guest_ip,
cancel,
mtu,
frame_sink,
conn_sink,
} = self;
// Capture the raw descriptor before `guest_fd` is moved into the `AsyncFd`
// below; the frame source keeps using this raw value.
let guest_raw_fd = guest_fd.as_raw_fd();
set_nonblocking(guest_raw_fd)?;
let mut source = FdFrameSource::new(guest_raw_fd);
let mut device = FrameClassifier::new(gateway_ip, mtu);
device.set_gateway_mac(gateway_mac);
let mut tcp_bridge = TcpBridge::new(gateway_ip);
// Large frames are only enabled when frames are delivered through a sink.
if frame_sink.is_some() {
tcp_bridge.enable_large_frames();
}
if let Some(ref sink) = conn_sink {
tcp_bridge.set_conn_sink(sink.clone());
}
// Proxy environment is detected once and shared with both the egress and
// the TCP bridge, together with the DNS resolution log.
let proxy_env = super::proxy_detect::ProxyEnvironment::detect();
egress.set_proxy_awareness(dns_log.clone(), proxy_env.clone());
tcp_bridge.set_proxy_awareness(dns_log.clone(), proxy_env);
let guest_async = AsyncFd::new(FdWrapper(guest_fd))?;
let dns_reply_tx = egress.reply_sender();
// Unknown until the classifier reports it while processing guest frames.
let mut guest_mac: Option<[u8; 6]> = None;
// Frames waiting for the guest descriptor to become writable again.
let mut write_queue: VecDeque<FrameBuf> = VecDeque::new();
// NOTE(review): nothing in this function schedules entries on this wheel, so
// `advance()` presumably yields nothing here - confirm whether expiry
// scheduling is meant to be wired in.
let mut timer_wheel =
crate::timer_wheel::TimerWheel::<std::net::SocketAddr>::new(Duration::from_secs(1));
let mut timer_wheel_tick = tokio::time::interval(Duration::from_secs(1));
timer_wheel_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut maintenance = tokio::time::interval(Duration::from_secs(30));
maintenance.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
tracing::info!("Network datapath started (TCP shim + socket proxy mode)");
loop {
let has_pending = !write_queue.is_empty();
tokio::select! {
// `biased` polls the branches in the order written: cancellation first,
// then flushing queued writes, then guest input, channels and timers.
biased;
() = cancel.cancelled() => {
tracing::info!("Network datapath shutting down");
break;
}
// Only wait for writability while there is something queued.
writable = guest_async.writable(), if has_pending => {
let mut guard = writable?;
// Flush from the head of the queue; a frame is removed once it was
// written in full, written short, or failed with a hard error.
while let Some(frame) = write_queue.front() {
match guard.try_io(|inner| fd_write(inner.get_ref().as_raw_fd(), frame)) {
Ok(Ok(n)) if n >= frame.len() => { write_queue.pop_front(); }
Ok(Ok(n)) => {
tracing::error!(
"Guest write: short datagram ({n}/{} bytes), dropping frame",
frame.len(),
);
write_queue.pop_front();
}
Ok(Err(e)) if e.kind() == io::ErrorKind::WouldBlock => break,
Ok(Err(e)) => {
tracing::warn!("Guest write error: {}", e);
write_queue.pop_front();
}
// `try_io` signalled would-block: keep the frame queued and wait for
// the next writable event.
Err(_) => break,
}
}
}
readable = guest_async.readable() => {
let mut guard = readable?;
let prev_mac = guest_mac;
// Classify every frame the source yields; the classifier may fill in
// `guest_mac` as a side effect.
source.drain(|frame| device.classify_frame(frame, &mut guest_mac));
guard.clear_ready();
// Configure the fast path only on the transition from "MAC unknown" to
// "MAC known".
if prev_mac.is_none() {
if let Some(gmac) = guest_mac {
tcp_bridge.set_fast_path_macs(gateway_mac, gmac);
}
}
let fast_acks = device.drain_fast_path(|frame_data| {
tcp_bridge.try_fast_path_intercept(frame_data)
});
for ack in fast_acks {
send_to_guest(frame_sink.as_ref(), &guest_async, &ack, &mut write_queue);
}
let hs_replies = device.drain_handshake(|frame_data| {
tcp_bridge.try_complete_handshake(frame_data)
});
for reply in hs_replies {
send_to_guest(frame_sink.as_ref(), &guest_async, &reply, &mut write_queue);
}
for reply in device.take_arp_replies() {
send_to_guest(frame_sink.as_ref(), &guest_async, &reply, &mut write_queue);
}
device.clear_unmatched_rx();
let intercepted = device.take_intercepted();
for intercepted_frame in &intercepted {
handle_intercepted_frame(
intercepted_frame,
frame_sink.as_ref(),
&guest_async,
&mut write_queue,
&mut egress,
&mut dhcp_server,
&dns_forwarder,
&dns_reply_tx,
&dns_log,
&cancel,
gateway_ip,
gateway_mac,
// Fall back to the broadcast MAC while the guest MAC is unknown.
guest_mac.unwrap_or([0xFF; 6]),
);
}
let gated_syns = device.take_gated_syns();
let gmac = guest_mac.unwrap_or([0xFF; 6]);
for syn in &gated_syns {
// Whatever frame `handle_outbound_syn` returns is sent straight back to
// the guest.
if let Some(rst) = tcp_bridge.handle_outbound_syn(&syn.frame, gateway_mac, gmac) {
send_to_guest(frame_sink.as_ref(), &guest_async, &rst, &mut write_queue);
}
}
}
Some(reply_frame) = reply_rx.recv() => {
send_to_guest(frame_sink.as_ref(), &guest_async, &reply_frame, &mut write_queue);
}
Some(cmd) = cmd_rx.recv() => {
process_inbound_cmd(
cmd,
&mut tcp_bridge,
&mut egress,
guest_ip,
gateway_ip,
guest_mac,
);
}
_ = timer_wheel_tick.tick() => {
let expired = timer_wheel.advance();
for entry in &expired {
tracing::trace!(
"Timer wheel expired: {:?} action={:?}",
entry.key,
entry.action
);
}
for entry in expired {
use crate::timer_wheel::TimerAction;
match entry.action {
TimerAction::UdpFlowExpiry | TimerAction::IcmpTimeout => {
egress.expire_flow(entry.key);
}
// Any other timer action is ignored here.
_ => {}
}
}
}
_ = maintenance.tick() => {
egress.maintenance();
}
}
// After every wake-up, regardless of which branch ran: flush handshake
// frames, drain a bounded batch of commands, flush fast-path frames and
// drain a bounded batch of replies.
let hs_frames = tcp_bridge.poll_handshakes();
for frame in hs_frames {
send_to_guest(frame_sink.as_ref(), &guest_async, &frame, &mut write_queue);
}
drain_cmd_rx(
&mut cmd_rx,
&mut tcp_bridge,
&mut egress,
guest_ip,
gateway_ip,
guest_mac,
);
for frame in tcp_bridge.poll_fast_path() {
send_to_guest(frame_sink.as_ref(), &guest_async, &frame, &mut write_queue);
}
drain_reply_rx(
&mut reply_rx,
frame_sink.as_ref(),
&guest_async,
&mut write_queue,
);
// Hand control back to the runtime once per iteration.
tokio::task::yield_now().await;
}
Ok(())
}
}
/// Dispatches one frame the classifier intercepted to the matching handler.
///
/// DHCP frames go to [`handle_dhcp`], DNS frames to [`handle_dns`], and plain
/// UDP / ICMP frames are handed to the host egress. `guest_mac` is whatever
/// MAC the caller wants used for the guest side of any reply.
#[allow(clippy::too_many_arguments)]
fn handle_intercepted_frame(
    intercepted: &crate::darwin::classifier::InterceptedFrame,
    frame_sink: Option<&std::sync::Arc<dyn crate::direct_rx::FrameSink>>,
    guest_async: &AsyncFd<FdWrapper>,
    write_queue: &mut VecDeque<FrameBuf>,
    egress: &mut HostEgress,
    dhcp_server: &mut DhcpServer,
    dns_forwarder: &DnsForwarder,
    dns_reply_tx: &mpsc::Sender<Vec<u8>>,
    dns_log: &super::dns_log::DnsResolutionLog,
    cancel: &CancellationToken,
    gateway_ip: Ipv4Addr,
    gateway_mac: [u8; 6],
    guest_mac: [u8; 6],
) {
    let payload = &intercepted.frame;
    match intercepted.kind {
        // Generic datagram traffic is proxied through the host egress.
        InterceptedKind::Udp | InterceptedKind::Icmp => {
            egress.handle_outbound(payload, guest_mac);
        }
        // DNS is resolved locally or forwarded; replies arrive via `dns_reply_tx`.
        InterceptedKind::Dns => handle_dns(
            payload,
            dns_forwarder,
            dns_reply_tx,
            dns_log,
            cancel,
            gateway_ip,
            gateway_mac,
            guest_mac,
        ),
        // DHCP is answered synchronously by the built-in server.
        InterceptedKind::Dhcp => handle_dhcp(
            payload,
            frame_sink,
            guest_async,
            write_queue,
            dhcp_server,
            gateway_ip,
            gateway_mac,
            guest_mac,
        ),
    }
}
/// Applies a single inbound command from the host side.
///
/// * `TcpAccepted` — logs the accepted stream and asks the TCP bridge to start
///   an inbound connection towards the guest.
/// * `UdpReceived` — forwards the whole command to the egress, using the
///   broadcast MAC when the guest MAC is not yet known.
fn process_inbound_cmd(
    cmd: InboundCommand,
    tcp_bridge: &mut TcpBridge,
    egress: &mut HostEgress,
    guest_ip: Ipv4Addr,
    gateway_ip: Ipv4Addr,
    guest_mac: Option<[u8; 6]>,
) {
    match cmd {
        udp @ InboundCommand::UdpReceived { .. } => {
            egress.handle_inbound_command(udp, guest_mac.unwrap_or([0xFF; 6]));
        }
        InboundCommand::TcpAccepted {
            host_port, stream, ..
        } => {
            tracing::debug!(
                "Inbound TCP accepted: guest_port={} peer={:?}",
                host_port,
                stream.peer_addr().ok(),
            );
            tcp_bridge.initiate_inbound(host_port, stream, guest_ip, gateway_ip);
        }
    }
}
/// Answers a DHCP packet intercepted from the guest.
///
/// `frame` is the full Ethernet frame. The DHCP payload is located by
/// skipping the Ethernet header, the IPv4 header (length taken from the IHL
/// field) and the 8-byte UDP header. If the DHCP server produces a response it
/// is wrapped with [`build_udp_ip_ethernet`] (ports 67 -> 68, IPv4 broadcast
/// address) and sent to the guest via [`send_to_guest`].
///
/// Frames that are too short to contain an IPv4 header byte, that advertise
/// an IHL below the 20-byte IPv4 minimum, or that carry no DHCP payload are
/// ignored rather than indexed blindly, so a malformed guest frame cannot
/// panic the datapath.
#[allow(clippy::too_many_arguments)]
fn handle_dhcp(
    frame: &[u8],
    frame_sink: Option<&std::sync::Arc<dyn crate::direct_rx::FrameSink>>,
    guest_async: &AsyncFd<FdWrapper>,
    write_queue: &mut VecDeque<FrameBuf>,
    dhcp_server: &mut DhcpServer,
    gateway_ip: Ipv4Addr,
    gateway_mac: [u8; 6],
    guest_mac: [u8; 6],
) {
    /// Smallest legal IPv4 header (IHL = 5 words).
    const MIN_IPV4_HEADER_LEN: usize = 20;
    /// Fixed UDP header size.
    const UDP_HEADER_LEN: usize = 8;

    let ip_start = ETH_HEADER_LEN;
    // Checked access: the frame comes from the guest and may be truncated.
    let Some(&version_ihl) = frame.get(ip_start) else {
        return;
    };
    let ihl = usize::from(version_ihl & 0x0F) * 4;
    if ihl < MIN_IPV4_HEADER_LEN {
        return;
    }
    let dhcp_start = ip_start + ihl + UDP_HEADER_LEN;
    // An absent or empty payload means there is nothing to hand to the server.
    let Some(dhcp_data) = frame.get(dhcp_start..).filter(|payload| !payload.is_empty()) else {
        return;
    };
    tracing::info!("DHCP packet from guest ({} bytes)", dhcp_data.len());
    match dhcp_server.handle_packet(dhcp_data) {
        Ok(Some(response)) => {
            let reply_frame = build_udp_ip_ethernet(
                gateway_ip,
                Ipv4Addr::BROADCAST,
                67,
                68,
                &response,
                gateway_mac,
                guest_mac,
            );
            tracing::info!("Sending DHCP reply frame: {} bytes", reply_frame.len());
            send_to_guest(frame_sink, guest_async, &reply_frame, write_queue);
        }
        Ok(None) => {
            tracing::info!("DHCP: no response needed");
        }
        Err(e) => tracing::warn!("DHCP handling error: {}", e),
    }
}
/// Handles a DNS query intercepted from the guest.
///
/// `frame` is the full Ethernet frame. The query is located by skipping the
/// Ethernet header, the IPv4 header (length from the IHL field) and the
/// 8-byte UDP header. The guest's source IP and UDP source port are read from
/// the headers so the reply can be addressed back to them.
///
/// If the forwarder can answer locally, the reply frame is queued on
/// `dns_reply_tx` from a spawned task. Otherwise a task forwards the query to
/// the upstream servers; on success any A records are recorded in `dns_log`
/// and the response is queued, on failure a SERVFAIL reply is queued when one
/// can be built. The forwarding task stops early if `cancel` fires.
///
/// Frames too short to hold the IPv4 header byte, with an IHL below the
/// 20-byte IPv4 minimum, or without a DNS payload are ignored. Enforcing the
/// minimum IHL guarantees the fixed header offsets read below are in bounds,
/// so a malformed guest frame cannot panic the datapath.
#[allow(clippy::too_many_arguments)]
fn handle_dns(
    frame: &[u8],
    dns_forwarder: &DnsForwarder,
    dns_reply_tx: &mpsc::Sender<Vec<u8>>,
    dns_log: &super::dns_log::DnsResolutionLog,
    cancel: &CancellationToken,
    gateway_ip: Ipv4Addr,
    gateway_mac: [u8; 6],
    guest_mac: [u8; 6],
) {
    /// Smallest legal IPv4 header (IHL = 5 words).
    const MIN_IPV4_HEADER_LEN: usize = 20;
    /// Fixed UDP header size.
    const UDP_HEADER_LEN: usize = 8;

    let ip_start = ETH_HEADER_LEN;
    // Checked access: the frame comes from the guest and may be truncated.
    let Some(&version_ihl) = frame.get(ip_start) else {
        return;
    };
    let ihl = usize::from(version_ihl & 0x0F) * 4;
    if ihl < MIN_IPV4_HEADER_LEN {
        return;
    }
    let l4_start = ip_start + ihl;
    let dns_start = l4_start + UDP_HEADER_LEN;
    if dns_start >= frame.len() {
        return;
    }
    // In bounds: ip_start + 15 < ip_start + 20 <= l4_start < dns_start < frame.len().
    let src_ip = Ipv4Addr::new(
        frame[ip_start + 12],
        frame[ip_start + 13],
        frame[ip_start + 14],
        frame[ip_start + 15],
    );
    let src_port = u16::from_be_bytes([frame[l4_start], frame[l4_start + 1]]);
    let dns_data = &frame[dns_start..];

    // Fast path: the forwarder can answer without going upstream.
    if let Some(response) = dns_forwarder.try_resolve_locally(dns_data) {
        let reply_frame = build_udp_ip_ethernet(
            gateway_ip,
            src_ip,
            53,
            src_port,
            &response,
            gateway_mac,
            guest_mac,
        );
        let tx = dns_reply_tx.clone();
        // The channel send is async, so it is performed from a spawned task.
        tokio::spawn(async move {
            if tx.send(reply_frame).await.is_err() {
                tracing::debug!("DNS reply channel closed");
            }
        });
        tracing::debug!("Queued local DNS response to guest");
        return;
    }

    // Slow path: forward upstream in the background.
    let upstream = dns_forwarder.upstream().to_vec();
    let data = dns_data.to_vec();
    let tx = dns_reply_tx.clone();
    let log = dns_log.clone();
    let cancel = cancel.clone();
    tokio::spawn(async move {
        let result = tokio::select! {
            r = forward_dns_async(&data, &upstream) => r,
            () = cancel.cancelled() => return,
        };
        match result {
            Ok(response) => {
                if let Some((domain, ips)) = super::dns_log::parse_dns_response_a_records(&response)
                {
                    tracing::debug!(
                        domain = %domain,
                        ips = ?ips,
                        "DNS resolution logged"
                    );
                    log.record(&domain, &ips);
                }
                let reply_frame = build_udp_ip_ethernet(
                    gateway_ip,
                    src_ip,
                    53,
                    src_port,
                    &response,
                    gateway_mac,
                    guest_mac,
                );
                // Only report success when the frame was actually queued.
                match tx.send(reply_frame).await {
                    Ok(()) => tracing::debug!("Sent forwarded DNS response to guest"),
                    Err(_) => tracing::debug!("DNS reply channel closed"),
                }
            }
            Err(e) => {
                tracing::warn!("DNS forwarding failed: {e}");
                if let Some(servfail) = build_dns_servfail_response(&data) {
                    let reply_frame = build_udp_ip_ethernet(
                        gateway_ip,
                        src_ip,
                        53,
                        src_port,
                        &servfail,
                        gateway_mac,
                        guest_mac,
                    );
                    if tx.send(reply_frame).await.is_err() {
                        tracing::debug!("DNS reply channel closed");
                    }
                }
            }
        }
    });
}
/// Forwards a raw DNS query to each upstream server in turn and returns the
/// first response whose transaction ID matches the query.
///
/// One UDP socket bound to `0.0.0.0:0` is used for all attempts. Each upstream
/// gets a 2-second window. Datagrams that are too short or carry a different
/// transaction ID are skipped and the wait continues until the window closes,
/// so a single stray packet no longer causes a healthy upstream to be
/// abandoned. A send failure, receive error or timeout moves on to the next
/// upstream.
///
/// # Errors
///
/// Returns a message string when the query is shorter than two bytes, the
/// socket cannot be bound, or no upstream produced a matching response.
async fn forward_dns_async(data: &[u8], upstream: &[SocketAddr]) -> Result<Vec<u8>, String> {
    /// How long to wait for a matching reply from one upstream server.
    const PER_SERVER_TIMEOUT: Duration = Duration::from_secs(2);

    // The first two bytes of a DNS message are its transaction ID.
    let Some(query_id) = data.get(..2) else {
        return Err("query too short".to_string());
    };
    let socket = tokio::net::UdpSocket::bind("0.0.0.0:0")
        .await
        .map_err(|e| format!("bind failed: {e}"))?;
    let mut buf = [0u8; 4096];
    for addr in upstream {
        if socket.send_to(data, addr).await.is_err() {
            continue;
        }
        // A fixed deadline bounds the total wait per upstream no matter how
        // many unrelated datagrams arrive in the meantime.
        let deadline = tokio::time::Instant::now() + PER_SERVER_TIMEOUT;
        loop {
            match tokio::time::timeout_at(deadline, socket.recv_from(&mut buf)).await {
                Ok(Ok((len, _))) if len >= 2 && buf[..2] == *query_id => {
                    return Ok(buf[..len].to_vec());
                }
                // Unrelated or malformed datagram: keep listening.
                Ok(Ok(_)) => {}
                // Receive error or deadline reached: try the next upstream.
                Ok(Err(_)) | Err(_) => break,
            }
        }
    }
    Err("all upstream DNS servers failed".to_string())
}
/// Builds a SERVFAIL reply for a raw DNS query.
///
/// The reply keeps the query's transaction ID, opcode and RD bit, sets the QR
/// and RA bits with RCODE 2, reports one question and zero records in every
/// other section, and echoes the first question (name, type, class). Bytes
/// after the first question are not copied.
///
/// Returns `None` when the query is shorter than the 12-byte header, a label
/// runs past the end of the buffer, or the type/class fields are missing.
fn build_dns_servfail_response(query: &[u8]) -> Option<Vec<u8>> {
    const HEADER_LEN: usize = 12;
    const QTYPE_QCLASS_LEN: usize = 4;

    let header = query.get(..HEADER_LEN)?;

    // Walk the length-prefixed labels of the question name up to the root label.
    let mut cursor = HEADER_LEN;
    while let Some(&len_byte) = query.get(cursor) {
        cursor += 1;
        let label_len = usize::from(len_byte);
        if label_len == 0 {
            break;
        }
        if cursor + label_len > query.len() {
            return None;
        }
        cursor += label_len;
    }

    // The name must be followed by the 2-byte type and 2-byte class.
    let question_end = cursor + QTYPE_QCLASS_LEN;
    let question = query.get(HEADER_LEN..question_end)?;

    let mut response = Vec::with_capacity(question_end);
    response.extend_from_slice(&[
        header[0],
        header[1],
        // QR = 1, opcode and RD copied from the query, AA and TC cleared.
        0x80 | (header[2] & 0x79),
        // RA = 1, RCODE = 2 (SERVFAIL).
        0x80 | 0x02,
        // QDCOUNT = 1
        0,
        1,
        // ANCOUNT = 0
        0,
        0,
        // NSCOUNT = 0
        0,
        0,
        // ARCOUNT = 0
        0,
        0,
    ]);
    response.extend_from_slice(question);
    Some(response)
}
/// Maximum number of reply frames `drain_reply_rx` pulls from the channel per call.
const DRAIN_REPLY_BATCH: usize = 64;
/// Maximum number of inbound commands `drain_cmd_rx` pulls from the channel per call.
const DRAIN_CMD_BATCH: usize = 64;
/// Forwards reply frames that are already waiting on `reply_rx` to the guest.
///
/// Never waits: it stops at the first failed `try_recv` (channel empty or
/// closed) and handles at most `DRAIN_REPLY_BATCH` frames per call.
fn drain_reply_rx(
    reply_rx: &mut mpsc::Receiver<Vec<u8>>,
    frame_sink: Option<&std::sync::Arc<dyn crate::direct_rx::FrameSink>>,
    guest_async: &AsyncFd<FdWrapper>,
    write_queue: &mut VecDeque<FrameBuf>,
) {
    // `take` caps the number of `try_recv` calls; the first `None` ends the loop.
    let ready_frames = std::iter::from_fn(|| reply_rx.try_recv().ok()).take(DRAIN_REPLY_BATCH);
    for pending in ready_frames {
        send_to_guest(frame_sink, guest_async, &pending, write_queue);
    }
}
/// Processes inbound commands that are already waiting on `cmd_rx`.
///
/// Never waits: it stops at the first failed `try_recv` (channel empty or
/// closed) and handles at most `DRAIN_CMD_BATCH` commands per call, each via
/// [`process_inbound_cmd`].
fn drain_cmd_rx(
    cmd_rx: &mut mpsc::Receiver<InboundCommand>,
    tcp_bridge: &mut TcpBridge,
    egress: &mut HostEgress,
    guest_ip: Ipv4Addr,
    gateway_ip: Ipv4Addr,
    guest_mac: Option<[u8; 6]>,
) {
    for _ in 0..DRAIN_CMD_BATCH {
        let Ok(pending) = cmd_rx.try_recv() else {
            break;
        };
        process_inbound_cmd(pending, tcp_bridge, egress, guest_ip, gateway_ip, guest_mac);
    }
}
/// Delivers one frame to the guest.
///
/// With a frame sink installed, a copy of the frame is handed to the sink and
/// the result of `send` is discarded. Without one, the frame is written to the
/// guest descriptor or queued by [`enqueue_or_write`].
fn send_to_guest(
    frame_sink: Option<&std::sync::Arc<dyn crate::direct_rx::FrameSink>>,
    guest_async: &AsyncFd<FdWrapper>,
    frame_data: &[u8],
    write_queue: &mut VecDeque<FrameBuf>,
) {
    match frame_sink {
        Some(sink) => {
            let _ = sink.send(frame_data.to_vec());
        }
        None => {
            let owned = FrameBuf::from(frame_data.to_vec());
            enqueue_or_write(guest_async, owned, write_queue);
        }
    }
}
fn enqueue_or_write(
guest_async: &AsyncFd<FdWrapper>,
frame: FrameBuf,
write_queue: &mut VecDeque<FrameBuf>,
) {
if !write_queue.is_empty() {
if write_queue.len() < WRITE_QUEUE_HARD_CAP {
write_queue.push_back(frame);
} else {
tracing::debug!("Write queue full ({WRITE_QUEUE_HARD_CAP}), dropping frame");
}
return;
}
let fd = guest_async.get_ref().as_raw_fd();
match fd_write(fd, &frame) {
Ok(n) if n >= frame.len() => {}
Ok(n) => {
tracing::error!(
"Guest write: short datagram ({n}/{} bytes), dropping frame",
frame.len(),
);
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
write_queue.push_back(frame);
}
Err(e) => {
tracing::warn!("Guest write error: {}", e);
}
}
}
/// Thin wrapper over `libc::write` for a raw descriptor.
///
/// Returns the number of bytes written, or the OS error when the call returns
/// a negative value.
fn fd_write(fd: RawFd, data: &[u8]) -> io::Result<usize> {
    // SAFETY: the pointer and length both come from the live `data` slice, so
    // the kernel only reads memory that is valid for the duration of the call.
    let written = unsafe { libc::write(fd, data.as_ptr().cast(), data.len()) };
    // The conversion fails exactly when the result is negative, i.e. on error.
    usize::try_from(written).map_err(|_| io::Error::last_os_error())
}
/// Test-only helper: writes `data` to the guest descriptor once and logs the
/// outcome instead of returning it.
#[cfg(test)]
fn write_to_guest(guest_async: &AsyncFd<FdWrapper>, data: &[u8]) {
    let fd = guest_async.get_ref().as_raw_fd();
    // SAFETY: the pointer and length both come from the live `data` slice.
    let n = unsafe { libc::write(fd, data.as_ptr().cast(), data.len()) };
    if n >= 0 {
        tracing::debug!("Guest write OK: {}/{} bytes", n, data.len());
        return;
    }
    let err = io::Error::last_os_error();
    match err.kind() {
        io::ErrorKind::WouldBlock => {
            tracing::warn!("Guest write WouldBlock: {} bytes dropped", data.len());
        }
        _ => {
            tracing::warn!("Guest write error: {}", err);
        }
    }
}
/// Thin wrapper over `libc::read` for a raw descriptor.
///
/// Returns the number of bytes placed in `buf`, or the OS error when the call
/// returns a negative value.
#[allow(dead_code)]
fn fd_read(fd: RawFd, buf: &mut [u8]) -> io::Result<usize> {
    // SAFETY: the pointer and length both come from the live, exclusively
    // borrowed `buf` slice, so the kernel writes only into memory we own.
    let received = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) };
    // The conversion fails exactly when the result is negative, i.e. on error.
    usize::try_from(received).map_err(|_| io::Error::last_os_error())
}
/// Adds `O_NONBLOCK` to the file status flags of `fd`, keeping the other flags.
///
/// # Errors
///
/// Returns the OS error if either `fcntl` call (reading or updating the flags)
/// fails.
fn set_nonblocking(fd: RawFd) -> io::Result<()> {
    // SAFETY: `fcntl(F_GETFL)` takes no pointers; it only queries the descriptor.
    let current = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if current < 0 {
        return Err(io::Error::last_os_error());
    }
    // SAFETY: `fcntl(F_SETFL)` takes an integer argument and no pointers.
    let rc = unsafe { libc::fcntl(fd, libc::F_SETFL, current | libc::O_NONBLOCK) };
    if rc < 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::fd::FromRawFd;

    /// Creates a connected pair of `AF_UNIX` datagram sockets.
    fn socketpair() -> (OwnedFd, OwnedFd) {
        let mut raw_pair = [0i32; 2];
        // SAFETY: `raw_pair` has room for the two descriptors the call writes.
        let rc = unsafe {
            libc::socketpair(libc::AF_UNIX, libc::SOCK_DGRAM, 0, raw_pair.as_mut_ptr())
        };
        assert_eq!(rc, 0, "socketpair() failed");
        let [first, second] = raw_pair;
        // SAFETY: both descriptors were just created and are owned only here.
        unsafe { (OwnedFd::from_raw_fd(first), OwnedFd::from_raw_fd(second)) }
    }

    /// `set_nonblocking` must leave `O_NONBLOCK` set on the descriptor.
    #[test]
    fn test_set_nonblocking() {
        let (sock, _peer) = socketpair();
        set_nonblocking(sock.as_raw_fd()).unwrap();
        // SAFETY: plain flag query on a descriptor we own.
        let status = unsafe { libc::fcntl(sock.as_raw_fd(), libc::F_GETFL) };
        assert!(status >= 0);
        assert_ne!(status & libc::O_NONBLOCK, 0, "O_NONBLOCK should be set");
    }

    /// Bytes written with raw `libc::write` come back through `fd_read`.
    #[test]
    fn test_fd_read_write_roundtrip() {
        let (reader, writer) = socketpair();
        let payload = b"hello network";
        // SAFETY: pointer and length describe the `payload` array.
        let sent =
            unsafe { libc::write(writer.as_raw_fd(), payload.as_ptr().cast(), payload.len()) };
        assert_eq!(sent as usize, payload.len());
        let mut scratch = [0u8; 64];
        let got = fd_read(reader.as_raw_fd(), &mut scratch).unwrap();
        assert_eq!(got, payload.len());
        assert_eq!(&scratch[..got], payload);
    }

    /// `write_to_guest` delivers the frame to the other end of the socket pair.
    #[tokio::test]
    async fn test_write_to_guest_roundtrip() {
        let (guest_side, host_side) = socketpair();
        set_nonblocking(guest_side.as_raw_fd()).unwrap();
        let guest_async = AsyncFd::new(FdWrapper(guest_side)).unwrap();
        let frame = b"test ethernet frame data";
        write_to_guest(&guest_async, frame);
        let mut scratch = [0u8; 128];
        let got = fd_read(host_side.as_raw_fd(), &mut scratch).unwrap();
        assert_eq!(got, frame.len());
        assert_eq!(&scratch[..got], frame.as_slice());
    }

    /// Bytes written with `fd_write` come back through `fd_read`.
    #[test]
    fn test_fd_write_roundtrip() {
        let (reader, writer) = socketpair();
        let payload = b"fd_write test data";
        let sent = fd_write(writer.as_raw_fd(), payload).unwrap();
        assert_eq!(sent, payload.len());
        let mut scratch = [0u8; 64];
        let got = fd_read(reader.as_raw_fd(), &mut scratch).unwrap();
        assert_eq!(&scratch[..got], payload);
    }

    /// With an empty queue the frame is written straight to the descriptor.
    #[tokio::test]
    async fn test_enqueue_or_write_direct() {
        let (guest_side, host_side) = socketpair();
        set_nonblocking(guest_side.as_raw_fd()).unwrap();
        let guest_async = AsyncFd::new(FdWrapper(guest_side)).unwrap();
        let mut pending = VecDeque::new();
        let frame_data = b"direct write frame";
        enqueue_or_write(
            &guest_async,
            FrameBuf::from(frame_data.to_vec()),
            &mut pending,
        );
        assert!(pending.is_empty(), "Queue should be empty after direct write");
        let mut scratch = [0u8; 128];
        let got = fd_read(host_side.as_raw_fd(), &mut scratch).unwrap();
        assert_eq!(&scratch[..got], frame_data.as_slice());
    }

    /// With frames already queued, a new frame is appended rather than written.
    #[tokio::test]
    async fn test_enqueue_or_write_queues_when_nonempty() {
        let (guest_side, _host_side) = socketpair();
        set_nonblocking(guest_side.as_raw_fd()).unwrap();
        let guest_async = AsyncFd::new(FdWrapper(guest_side)).unwrap();
        let mut pending: VecDeque<FrameBuf> = VecDeque::new();
        pending.push_back(FrameBuf::from(b"already queued".to_vec()));
        enqueue_or_write(
            &guest_async,
            FrameBuf::from(b"new frame".to_vec()),
            &mut pending,
        );
        assert_eq!(pending.len(), 2);
        assert_eq!(&pending[1][..], b"new frame");
    }

    /// The SERVFAIL reply keeps the ID and question and sets the expected header fields.
    #[test]
    fn test_build_dns_servfail_response() {
        let query = vec![
            // Transaction ID, flags (RD set)
            0x12, 0x34, 0x01, 0x00,
            // QDCOUNT = 1, ANCOUNT = NSCOUNT = ARCOUNT = 0
            0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            // Question name "a." followed by type A, class IN
            0x01, b'a', 0x00, 0x00, 0x01, 0x00, 0x01,
        ];
        let response = build_dns_servfail_response(&query).expect("should build servfail");
        assert_eq!(response[0..2], query[0..2]);
        assert_eq!(response[2] & 0x80, 0x80);
        assert_eq!(response[3] & 0x0F, 0x02);
        assert_eq!(&response[4..6], &1u16.to_be_bytes());
        assert_eq!(&response[6..8], &0u16.to_be_bytes());
        assert_eq!(&response[8..10], &0u16.to_be_bytes());
        assert_eq!(&response[10..12], &0u16.to_be_bytes());
        assert_eq!(&response[12..], &query[12..]);
    }
}