flowprep 0.1.0

Convert network telemetry (pcap, flow CSVs, vendor exports) into ML-ready canonical NetFlow parquet
flowprep-0.1.0 is not a library.

flowprep converts the network telemetry you actually have — packet captures, flow CSVs, vendor exports — into a single, clean, typed, unit-normalized parquet table that you can hand directly to a model, a notebook, or a data pipeline.

It is built and maintained by DeepTempo, where it is used in production as the ingestion front door for our LogLM: flow telemetry arriving from many different sources and formats is normalized through this same canonicalization layer, at scale, before it ever reaches inference. We open-sourced it because everyone working with network flow data ends up rebuilding this exact step, usually badly, usually more than once.

The problem

Every network-ML paper and every SOC data pipeline starts by solving the same unglamorous problem. Flow data arrives as:

  • raw pcap/pcapng captures,
  • CSV exports from research datasets (CIC-IDS, UNSW-NB15, …),
  • nfdump/nfcapd binaries, Zeek logs, VPC flow logs,
  • and a long tail of vendor formats, each with its own column names, duration units, and timestamp encodings.

The same field might be called src_ip, Source IP, srcaddr, ipv4_src_addr, or client_ip. A duration might be in seconds, milliseconds, microseconds, or nanoseconds — and nothing tells you which. A timestamp might be a datetime string, epoch seconds, or epoch nanoseconds.

Flow collectors (goflow2, nfdump) solve transport and storage, but they stop at JSON, protobuf, or their own binary formats. Feature-extraction tooling in the research world is fragmented and hard to reproduce. Nothing hands you a clean parquet table with one schema, ready for training or inference — including for the new generation of network foundation models.

flowprep is that missing step.

Quick start

# install from crates.io (Rust stable, no system dependencies — no libpcap, no JVM)
cargo install flowprep

# or build from source
cargo build --release

# or just run the demo against the bundled examples
./demo.sh

Three subcommands:

# 1. Raw packet captures -> bidirectional flow records
flowprep pcap capture.pcap flows.parquet

# 2. Any aliased flow table (CSV or parquet) -> the canonical schema
flowprep canonicalize cic_export.csv flows.parquet

# 3. Inspect any parquet file from the terminal, no Python required
flowprep peek flows.parquet -n 20

The output is ordinary parquet. There are no bindings to install and no client library to learn — pandas, polars, DuckDB, Spark, and Arrow in any language read it natively. The file format is the API.

import pandas as pd
df = pd.read_parquet("flows.parquet")   # that's the whole integration

What it does

Field alias resolution

flowprep knows 100+ column spellings observed in the wild — CICIDS's Total Fwd Bytes, nfdump's in_bytes, IPFIX's l4_src_port, Zeek-style names, and many vendor variants — and maps them all onto one canonical schema. The alias map is not hard-coded: it is loaded at compile time from a declarative schema file (schemas/netflow/v1/schema.json), the same artifact DeepTempo's production ingestion uses. Adding support for a new vendor's column names is a JSON edit, not a code change.

Unit and encoding normalization

  • Durations arrive in seconds, milliseconds, microseconds, or nanoseconds; the unit is inferred from the source column's name (duration_ms, flow_duration_microseconds, …) and everything lands as float64 seconds.
  • Timestamps arrive as datetime strings, typed timestamps, or epoch values at any precision; epoch precision is inferred from magnitude and everything lands as int64 epoch microseconds.
  • Protocols arrive as IANA numbers or names (tcp, udp, icmp); names are mapped to numbers.

Bidirectional flow aggregation (pcap)

Packets are grouped by a direction-normalized 5-tuple, so both halves of a conversation aggregate into a single flow record with separate forward/backward byte and packet counters. Flows split on a 60s idle timeout and a 1h maximum duration. The reader streams pcap and pcapng, keeps constant memory on the packet path, and is robust to the slightly-out-of-order packets real captures contain.

Label passthrough

Ground-truth columns (label, attack, attack_type, …) survive canonicalization unchanged, so labeled research datasets stay labeled — convert once, train immediately.

Canonical schema

field type notes
timestamp int64 flow start, epoch microseconds
src_ip / dest_ip string
src_port / dest_port int32
fwd_bytes / bwd_bytes int64 bwd zero-filled for single-counter sources
fwd_pkts / bwd_pkts int64 nullable
flow_dur float64 seconds
protocol int32 IANA number; names auto-mapped

Plus any passthrough label columns present in the source.

Example: a real research dataset

The repo ships a ~100k-row slice of CIC-IDS-2017 with its original quirks intact — aliased packet columns (total_fwd_pkts), typed datetime timestamps, and attack labels:

$ flowprep canonicalize examples/cic2017_sample.parquet /tmp/cic.parquet
Wrote 101094 flows to /tmp/cic.parquet        # ~60 ms

$ flowprep peek /tmp/cic.parquet -n 3
+------------------+---------------+--------------+----------+-----------+-----------+-----------+----------+----------+----------+----------+--------+-------+
| timestamp        | src_ip        | dest_ip      | src_port | dest_port | fwd_bytes | bwd_bytes | fwd_pkts | bwd_pkts | flow_dur | protocol | attack | label |
+------------------+---------------+--------------+----------+-----------+-----------+-----------+----------+----------+----------+----------+--------+-------+
| 1488876958000000 | 8.254.250.126 | 192.168.10.5 | 80       | 49188     | 12        | 0         | 2        | 0        | 4e-6     |          | benign | 0     |
| 1488877019000000 | 192.168.10.9  | 192.168.10.3 | 1056     | 88        | 2812      | 2820      | 7        | 4        | 0.000655 |          | benign | 0     |
| 1488877062000000 | 192.168.10.17 | 192.168.10.3 | 35504    | 88        | 3150      | 3152      | 10       | 6        | 0.001122 |          | benign | 0     |
+------------------+---------------+--------------+----------+-----------+-----------+-----------+----------+----------+----------+----------+--------+-------+

Performance

The per-packet decode path is exactly where interpreted languages pay the serialization tax, and it's why flowprep is a compiled tool. On a 500k-packet capture (Apple Silicon laptop, single thread):

implementation throughput wall time
flowprep (Rust) ~1.4M packets/s 0.35 s
equivalent Python (dpkt + pyarrow) ~78k packets/s 6.4 s

That's an 18.5x difference producing byte-identical flow output (tests/bench_pcap.py reproduces the measurement). Canonicalizing the 101k-row CIC sample takes ~60 ms end to end, including parquet read and zstd-compressed write.

Design notes

  • Apache Arrow is the data plane. flowprep is built on arrow-rs and the parquet crate; columnar data goes from reader to writer without detours through row-by-row object representations.
  • Schema as data, not code. The canonical field set, alias mappings, and unit-detection rules live in one JSON artifact embedded at compile time. Tools and pipelines in other languages can consume the same file.
  • Single static binary. No venv, no JVM, no libpcap, no runtime dependencies. cargo build --release produces one file you can copy to an air-gapped sensor box.

Production use at DeepTempo

DeepTempo builds LogLM — a log language model for network security that turns flow telemetry into embeddings and incident classifications. Customer flow data arrives in wildly different shapes (cloud flow logs, IPFIX exporters, pcaps, SIEM exports), and this canonicalization layer is how all of them converge to the one schema the model consumes, at production scale, ahead of inference. flowprep is that layer, maintained in the open: if it mangles a format you care about, the fix benefits our pipeline and yours equally.

Roadmap

  • nfcapd (nfdump binary) reader
  • OCSF Network Activity event reader
  • Zeek conn.log reader
  • IPv6 flow-tuple test coverage and pcapng per-interface timestamp resolutions
  • Published canonical-parquet versions of common research datasets

Contributions welcome — especially "here is a flow export flowprep can't parse" issues with a small sample attached.

Development

cargo build --release

# end-to-end tests (python harness generates fixtures; needs dpkt + pyarrow)
python3 -m venv .venv && .venv/bin/pip install dpkt pyarrow
.venv/bin/python tests/test_e2e.py

# throughput benchmark
.venv/bin/python tests/bench_pcap.py

Release

Releases are tag-driven. Pushing a v* tag builds Linux, macOS, and Windows binaries, attaches them to a GitHub release, and publishes the crate to crates.io using CARGO_REGISTRY_TOKEN.

Acknowledgments

The bundled example slice derives from the CIC-IDS-2017 dataset: Sharafaldin, Lashkari & Ghorbani, "Toward Generating a New Intrusion Detection Dataset and Intrusion Traffic Characterization", ICISSP 2018 (Canadian Institute for Cybersecurity, University of New Brunswick). It is included solely as a small conversion example; for research use, obtain the full dataset from UNB CIC and cite accordingly.

License

Apache-2.0 © DeepTempo