import math
import os
import subprocess
import sys
import tempfile

import dpkt
import pyarrow.parquet as pq
# Default location of the flowprep binary: target/release/flowprep, resolved
# relative to the directory one level above this script.
_DEFAULT_BIN = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..",
    "target",
    "release",
    "flowprep",
)
# The FLOWPREP_BIN environment variable, when set, overrides the default path.
FLOWPREP_BIN = os.getenv("FLOWPREP_BIN", _DEFAULT_BIN)
def build_test_pcap(path):
with open(path, "wb") as f:
writer = dpkt.pcap.Writer(f)
base = 1750000000.0
def packet(src, dst, sport, dport, proto_cls, payload):
eth = dpkt.ethernet.Ethernet(
src=b"\x00\x01\x02\x03\x04\x05", dst=b"\x06\x07\x08\x09\x0a\x0b"
)
ip = dpkt.ip.IP(
src=bytes(map(int, src.split("."))),
dst=bytes(map(int, dst.split("."))),
)
transport = proto_cls(sport=sport, dport=dport)
transport.data = payload
ip.data = transport
ip.p = 6 if proto_cls is dpkt.tcp.TCP else 17
ip.len = len(bytes(ip))
eth.data = ip
eth.type = dpkt.ethernet.ETH_TYPE_IP
return bytes(eth)
for i in range(3):
writer.writepkt(
packet("10.0.0.1", "10.0.0.2", 44321, 443, dpkt.tcp.TCP, b"x" * 100),
ts=base + i,
)
for i in range(2):
writer.writepkt(
packet("10.0.0.2", "10.0.0.1", 443, 44321, dpkt.tcp.TCP, b"y" * 500),
ts=base + 0.5 + i,
)
writer.writepkt(
packet("10.0.0.3", "8.8.8.8", 5353, 53, dpkt.udp.UDP, b"z" * 60),
ts=base + 10,
)
def build_cicids_csv(path):
    """Write a two-record CICIDS-style CSV fixture to *path*.

    The header row keeps a space after each comma (presumably to exercise
    header-name trimming in ``flowprep canonicalize`` -- confirm). The
    "Flow Duration_Milliseconds" values are 2500 and 150. Records are
    newline-separated, and the file has no trailing newline.
    """
    header = "Source IP, Destination IP, Source Port, Destination Port, Flow Duration_Milliseconds, Total Fwd Bytes, Total Bwd Bytes, Protocol, Timestamp, Label"
    records = (
        "192.168.1.5,10.9.9.9,51000,80,2500,1200,34000,tcp,1750000000,BENIGN",
        "192.168.1.6,10.9.9.9,51001,80,150,90,0,udp,1750000060,DDoS",
    )
    with open(path, "w") as out:
        out.write(header)
        for record in records:
            # Prefix each record with the separator so no trailing newline is written.
            out.write("\n" + record)
def _check(condition, message):
    """Raise ``AssertionError(message)`` unless *condition* is truthy.

    Used instead of bare ``assert`` statements so the checks still run when
    the script is executed with ``python -O``. That flag strips asserts, which
    would let a broken build print "ALL TESTS PASSED".
    """
    if not condition:
        raise AssertionError(message)


def _close(value, expected):
    """Return True when *value* is not None and float-close to *expected*."""
    return value is not None and math.isclose(value, expected)


def _run_flowprep(*args):
    """Run ``FLOWPREP_BIN`` with *args*, echo its stdout/stderr, return the result.

    Raises:
        SystemExit: if no executable exists at ``FLOWPREP_BIN``.
    """
    try:
        result = subprocess.run(
            [FLOWPREP_BIN, *args], capture_output=True, text=True
        )
    except FileNotFoundError as err:
        raise SystemExit(
            f"flowprep binary not found at {FLOWPREP_BIN!r}; "
            "build it or set FLOWPREP_BIN"
        ) from err
    print(result.stdout.strip(), result.stderr.strip())
    return result


def _check_pcap(workdir):
    """Convert a synthetic pcap with ``flowprep pcap`` and verify the flows."""
    pcap_path = os.path.join(workdir, "flowprep_test.pcap")
    parquet_path = os.path.join(workdir, "flowprep_pcap.parquet")
    build_test_pcap(pcap_path)

    result = _run_flowprep("pcap", pcap_path, parquet_path)
    _check(result.returncode == 0, "pcap conversion failed")

    table = pq.read_table(parquet_path)
    print(table.to_pydict())
    # build_test_pcap writes one TCP conversation and one UDP packet.
    _check(table.num_rows == 2, f"expected 2 flows, got {table.num_rows}")
    tcp = next((row for row in table.to_pylist() if row["protocol"] == 6), None)
    _check(tcp is not None, "no TCP flow (protocol 6) in pcap output")
    _check(tcp["fwd_pkts"] == 3 and tcp["bwd_pkts"] == 2, "direction split wrong")
    _check(_close(tcp["flow_dur"], 2.0), f"flow_dur wrong: {tcp['flow_dur']}")


def _check_canonicalize(workdir):
    """Canonicalize a CICIDS-style CSV with ``flowprep canonicalize`` and verify it."""
    csv_path = os.path.join(workdir, "flowprep_test.csv")
    parquet_path = os.path.join(workdir, "flowprep_csv.parquet")
    build_cicids_csv(csv_path)

    result = _run_flowprep("canonicalize", csv_path, parquet_path)
    _check(result.returncode == 0, "canonicalize failed")

    table = pq.read_table(parquet_path)
    print(table.to_pydict())
    rows = table.to_pylist()
    _check(len(rows) >= 2, f"expected at least 2 canonicalized rows, got {len(rows)}")
    first, second = rows[0], rows[1]
    _check(_close(first["flow_dur"], 2.5), f"ms->s conversion wrong: {first['flow_dur']}")
    _check(first["protocol"] == 6 and second["protocol"] == 17, "protocol mapping wrong")
    # The CSV holds epoch seconds; the output is expected in microseconds.
    _check(first["timestamp"] == 1750000000_000000, "epoch-seconds detection wrong")
    _check(second["timestamp"] == 1750000060_000000, "epoch-seconds detection wrong")
    _check(first["label"] == "BENIGN", "label passthrough wrong")


def main(workdir=None):
    """Run the flowprep end-to-end checks and print "ALL TESTS PASSED".

    Args:
        workdir: directory for the fixture and output files. When None (the
            default), a fresh temporary directory is used and removed
            afterwards. Pass a path to keep the artifacts for inspection.

    Raises:
        AssertionError: if any check fails.
        SystemExit: if the flowprep binary cannot be found.
    """
    if workdir is None:
        # A private temp dir avoids the non-portable, collision-prone fixed /tmp paths.
        with tempfile.TemporaryDirectory(prefix="flowprep_test_") as tmp:
            _check_pcap(tmp)
            _check_canonicalize(tmp)
    else:
        _check_pcap(workdir)
        _check_canonicalize(workdir)
    print("ALL TESTS PASSED")
if __name__ == "__main__":
    # Use main()'s return value as the process exit status (None -> 0).
    raise SystemExit(main())