import json
import unittest
from . import base
from scapy.all import *
from ipaddress import IPv4Address
from pydbus import SessionBus
class CLITest(base.TestCase):
    """Command-line option tests for the helper started via ``run_helper``.

    Test methods are documented with ``#`` comments rather than docstrings
    so that unittest keeps reporting them by method name.
    """

    # Capability names this suite knows about. Anything else reported by
    # ``--print-capabilities`` must carry an "x-" prefix to be accepted.
    KNOWN_CAPABILITIES = frozenset(
        {
            "dbus-address",
            "dhcp",
            "exit-with-parent",
            "ipv4",
            "ipv6",
            "migrate",
            "netns",
            "notify-socket",
            "restrict",
            "tftp",
        }
    )

    def test_help(self):
        # "-h" must print a usage text on stdout and nothing on stderr.
        p = self.run_helper("-h", netns=False, wait_ready=False)
        self.assertFalse(p.stderr_all())
        self.assertIn("usage:", p.stdout_all().lower())

    def test_print_capabilities(self):
        # "--print-capabilities" must emit a JSON object on stdout (and
        # nothing on stderr) whose "type" is "slirp-helper". The optional
        # "features" entry must be a list of known or "x-"-prefixed names.
        p = self.run_helper("--print-capabilities", netns=False, wait_ready=False)
        self.assertFalse(p.stderr_all())
        caps = json.loads(p.stdout_all())
        self.assertEqual(caps["type"], "slirp-helper")
        if "features" not in caps:
            return
        self.assertIsInstance(caps["features"], list)
        unknown = set(caps["features"]).difference(self.KNOWN_CAPABILITIES)
        # Sorted so the first reported offender is deterministic.
        for cap in sorted(unknown):
            if not cap.startswith("x-"):
                self.fail("Unknown capability: %s" % cap)

    def test_restrict(self):
        # The helper must accept --restrict when it advertises "restrict".
        self.skipIfNotCapable("restrict")
        self.run_helper("--restrict")

    def test_ipv4(self):
        # The helper must accept --disable-ipv4 together with an explicit
        # --net when it advertises "ipv4".
        self.skipIfNotCapable("ipv4")
        self.run_helper("--disable-ipv4 --net 12.12.0.1/8")

    def test_ipv6(self):
        # Previously this method was also named test_ipv4, which replaced
        # the IPv4 test above so that it never ran.
        self.skipIfNotCapable("ipv6")
        self.run_helper("--disable-ipv6 --net6 fec0::/64")

    def test_exit_with_parent(self):
        # The helper must accept --exit-with-parent when advertised.
        self.skipIfNotCapable("exit-with-parent")
        self.run_helper("--exit-with-parent")

    def test_tftp(self):
        # The helper must accept a TFTP root directory when advertised.
        self.skipIfNotCapable("tftp")
        self.run_helper("--tftp .")

    def test_net(self):
        # A well-formed --net value: start the helper, then stop it.
        p = self.run_helper("--net 12.12.0.1/23")
        p.graceful_stop()

        # A malformed --net value: the helper is not waited on for
        # readiness and is expected to write something to stderr.
        p = self.run_helper("--net wefo/23", wait_ready=False)
        self.assertTrue(p.stderr_all())
        p.graceful_stop()

    def test_dbus(self):
        # With a session bus available, the helper must expose an object at
        # /org/freedesktop/Slirp1/Helper under a bus name derived from its
        # PID, and GetInfo() must mention "Protocol[State]".
        self.skipIfNotCapable("dbus-address")
        if not base.DBUS_SESSION_BUS_ADDRESS:
            self.skipTest("DBUS_SESSION_BUS_ADDRESS unset")
        p = self.run_helper(
            "--dbus-id TestId --dbus-address %s" % base.DBUS_SESSION_BUS_ADDRESS
        )
        bus = SessionBus()
        iface = bus.get(".Slirp1_%u" % p.p.pid, "/org/freedesktop/Slirp1/Helper")
        info = iface.GetInfo()
        self.assertIn("Protocol[State]", info)
class ConnTest(base.TestCase):
    """Connectivity tests: ICMP ping plus TCP/UDP echo through the helper.

    Test methods are documented with ``#`` comments rather than docstrings
    so that unittest keeps reporting them by method name.
    """

    # Address the guest uses to reach the echo service started by the test.
    _ECHO_HOST = "192.168.1.100"

    @base.withScapy()
    def test_ping(self, s):
        # One ICMP echo request to 10.0.2.2 must be answered by an echo reply.
        request = IP(dst="10.0.2.2") / ICMP()
        reply = s.sr1(request)
        self.assertEqual(reply.sprintf("%ICMP.type%"), "echo-reply")

    @base.isolateHostNetwork()
    def test_restrict(self):
        # With --restrict, the TCP echo attempt from the guest namespace
        # must fail with a connection error.
        echo_port = self.start_echo()
        self.run_helper("--restrict")
        refused = (ConnectionError, ConnectionRefusedError)
        with self.guest_netns(), self.assertRaises(refused):
            self.assertTcpEcho(self._ECHO_HOST, echo_port)

    @base.isolateHostNetwork()
    def test_tcp_echo(self):
        # Without restrictions the guest must reach the TCP echo service.
        echo_port = self.start_echo()
        self.run_helper()
        with self.guest_netns():
            self.assertTcpEcho(self._ECHO_HOST, echo_port)

    @base.isolateHostNetwork()
    def test_udp_echo(self):
        # Same as the TCP case, but against an echo service started in
        # UDP mode.
        echo_port = self.start_echo(udp=True)
        self.run_helper()
        with self.guest_netns():
            self.assertUdpEcho(self._ECHO_HOST, echo_port)
@unittest.skipUnless(base.has_cap("dhcp"), "Missing 'dhcp' feature")
class DHCPTest(base.TestCase):
    """DHCP tests, skipped unless the helper has the 'dhcp' capability.

    Each scenario broadcasts a DHCPDISCOVER and inspects the reply. The
    ``dhcp_*`` methods are wrapped by ``base.withScapy()`` and called from
    the matching ``test_*`` method with a ``parg`` keyword carrying extra
    helper options (presumably consumed by the decorator -- confirm in
    ``base``).

    Test methods are documented with ``#`` comments rather than docstrings
    so that unittest keeps reporting them by method name.
    """

    def _discover(self, s):
        # Broadcast a DHCPDISCOVER through *s* (assumed to be the scapy
        # socket supplied by base.withScapy) and return the reply packet.
        discover = (
            IP(src="0.0.0.0", dst="255.255.255.255")
            / UDP(sport=68, dport=67)
            / BOOTP(xid=RandInt())
            / DHCP(options=[("message-type", "discover"), "end"])
        )
        # The reply does not come from the broadcast destination address,
        # so source/destination address matching is turned off.
        reply = s.sr1(discover, checkIPaddr=False)
        # Fail clearly here rather than with an AttributeError on None later.
        self.assertIsNotNone(reply, "No reply to DHCPDISCOVER")
        return reply

    @base.withScapy()
    def test_dhcp_v4(self, s):
        # Default configuration: the offer must be a BOOTREPLY with an
        # address in [10.0.2.15, 10.0.2.100), name 10.0.2.2 as router and
        # server id, and carry the basic set of options.
        pkt = self._discover(s)
        self.assertEqual(pkt.sprintf("%BOOTP.op%"), "BOOTREPLY")
        addr = IPv4Address(pkt[BOOTP].yiaddr)
        self.assertGreaterEqual(addr, IPv4Address("10.0.2.15"))
        self.assertLess(addr, IPv4Address("10.0.2.100"))
        for o in pkt[DHCP].options:
            if o[0] in ("router", "server_id"):
                self.assertEqual(o[1], "10.0.2.2")
        # Only tuple entries are real (name, value) options; bare strings
        # such as "end" are markers.
        opts = [o[0] for o in pkt[DHCP].options if isinstance(o, tuple)]
        self.assertIn("router", opts)
        self.assertIn("name_server", opts)
        self.assertIn("lease_time", opts)
        self.assertIn("server_id", opts)

    @base.withScapy()
    def dhcp_and_net(self, s):
        # With a custom network the offered address must be in
        # [12.34.56.15, 12.34.56.100).
        pkt = self._discover(s)
        self.assertEqual(pkt.sprintf("%BOOTP.op%"), "BOOTREPLY")
        addr = IPv4Address(pkt[BOOTP].yiaddr)
        self.assertGreaterEqual(addr, IPv4Address("12.34.56.15"))
        self.assertLess(addr, IPv4Address("12.34.56.100"))

    def test_dhcp_and_net(self):
        self.dhcp_and_net(parg="--net 12.34.56.1/24")

    @base.withScapy()
    def dhcp_dns(self, s):
        # The name_server option must be present and equal to 8.8.8.8.
        pkt = self._discover(s)
        for o in pkt[DHCP].options:
            if o[0] == "name_server":
                self.assertEqual(o[1], "8.8.8.8")
                return
        self.fail("DHCP reply carries no name_server option")

    def test_dhcp_dns(self):
        self.dhcp_dns(parg="--dhcp-dns 8.8.8.8")

    @base.withScapy()
    def dhcp_nbp(self, s):
        # The reply must advertise TFTP server 10.0.0.1 and boot file
        # "/my-nbp".
        pkt = self._discover(s)
        # Start from the BOOTP "file" field (cut at the first NUL); a
        # "boot-file-name" DHCP option, when present, overrides it.
        boot_file_name = pkt[BOOTP].file.partition(b"\0")[0].decode()
        tftp_server_name = None
        for o in pkt[DHCP].options:
            if o[0] == "boot-file-name":
                boot_file_name = o[1].decode()
            elif o[0] in (
                # The option may be keyed by its number or by either
                # spelling of its name.
                66,
                "tftp-server-name",
                "tftp_server_name",
            ):
                tftp_server_name = o[1].decode()
        self.assertEqual(tftp_server_name, "10.0.0.1")
        self.assertEqual(boot_file_name, "/my-nbp")

    def test_dhcp_nbp(self):
        self.dhcp_nbp(parg="--dhcp-nbp tftp://10.0.0.1/my-nbp")

    @base.withScapy()
    def dhcp_bootfile(self, s):
        # The BOOTP "file" field (cut at the first NUL) must be the
        # configured boot file URL.
        pkt = self._discover(s)
        self.assertEqual(
            pkt[BOOTP].file.partition(b"\0")[0].decode(), "http://boot.netboot.xyz/"
        )

    def test_dhcp_bootfile(self):
        self.dhcp_bootfile(parg="--dhcp-bootfile http://boot.netboot.xyz/")