Skip to main content

pcap_toolkit/sort/
mod.rs

1//! Two-pass chronological sorting for legacy PCAP files.
2//!
3//! ## Algorithm
4//!
5//! **First pass** — scan the input file sequentially, tracking byte offsets.
6//! For each packet emit one [`PacketIndex`] record (20 bytes). The index is
7//! held in memory (default) or streamed to a `.idx` sidecar file (`--on-disk`).
8//!
9//! **Second pass** — sort the index by `timestamp_ns`, then seek-and-stream
10//! packets in chronological order into one or more output PCAP files.
11//! Time-slicing (`--slice`) splits output into separate files at regular
12//! intervals.
13
14pub mod index;
15mod writer;
16
17use std::fs::File;
18use std::path::{Path, PathBuf};
19
20use pcap_parser::traits::PcapReaderIterator;
21use pcap_parser::{LegacyPcapReader, PcapBlockOwned, PcapError};
22use rayon::prelude::*;
23
24use crate::bpf::BpfExpr;
25use crate::error::SortError;
26use crate::filter::{Filter, PacketMeta};
27use crate::transform::{self, TransformOptions};
28use index::{FilePacketIndex, IndexStore, PacketIndex, read_packet_at, sidecar_path};
29use writer::{GlobalHeader, SlicedWriter};
30
/// Capacity, in bytes, of the read buffer handed to `LegacyPcapReader::new`.
const BUF_SIZE: usize = 64 * 1024;
32
33// ── Public types ─────────────────────────────────────────────────────────────
34
35/// Options for [`sort_file`].
36#[derive(Debug, Default)]
37pub struct SortOptions {
38    /// Destination: a file path when no slicing, a directory path when slicing.
39    pub output: PathBuf,
40    /// Time-slice interval in seconds. `None` produces a single output file.
41    pub slice_secs: Option<u64>,
42    /// When `true`, write the index to a `.idx` sidecar file instead of RAM.
43    pub on_disk: bool,
44    /// Structured packet filter applied during the second pass.
45    pub filter: Filter,
46    /// BPF expression filter AND-ed with `filter`. `None` = no BPF filter.
47    pub bpf_filter: Option<BpfExpr>,
48    /// Packet-level transformations (truncation, timestamp shift, IP mapping).
49    pub transform: TransformOptions,
50}
51
/// Summary returned by [`sort_file`] and [`sort_files`] on success.
///
/// `Default` is the empty report (zero packets, no files). `Clone`,
/// `PartialEq` and `Eq` are derived so callers and tests can copy and compare
/// reports directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SortReport {
    /// Total packets written across all output files.
    pub packets_written: u64,
    /// Paths of output PCAP files, in creation order.
    pub files_written: Vec<PathBuf>,
}
60
61// ── Public entry point ───────────────────────────────────────────────────────
62
63/// Sort `input` chronologically and write the result according to `opts`.
64///
65/// Thin wrapper around [`sort_files`] for the common single-input case.
66///
67/// PCAPng input is not yet supported and returns [`SortError::PcapNgNotSupported`].
68///
69/// # Errors
70/// Returns [`SortError`] on I/O failure, unsupported input format, or
71/// invalid slice specification.
72pub fn sort_file(input: &Path, opts: &SortOptions) -> Result<SortReport, SortError> {
73    sort_files(&[input], opts)
74}
75
76/// Sort and merge one or more input files chronologically.
77///
78/// First passes are run in **parallel** across all inputs using Rayon; the
79/// resulting per-file indexes are merged and sorted by timestamp before a
80/// single sequential second pass writes the output.
81///
82/// All input files must share the same link-layer type; if they differ
83/// [`SortError::IncompatibleLinkType`] is returned.
84///
85/// # Errors
86/// Returns [`SortError`] on I/O failure, unsupported input format,
87/// incompatible link types, or invalid slice specification.
88pub fn sort_files(inputs: &[&Path], opts: &SortOptions) -> Result<SortReport, SortError> {
89    if inputs.is_empty() {
90        return Ok(SortReport {
91            packets_written: 0,
92            files_written: vec![],
93        });
94    }
95
96    // ── Parallel first pass ───────────────────────────────────────────────
97    let results: Vec<Result<(GlobalHeader, IndexStore), SortError>> = inputs
98        .par_iter()
99        .map(|p| first_pass(p, opts.on_disk, &opts.filter, opts.bpf_filter.as_ref()))
100        .collect();
101
102    let mut headers_and_stores: Vec<(GlobalHeader, IndexStore)> = Vec::with_capacity(inputs.len());
103    for r in results {
104        headers_and_stores.push(r?);
105    }
106
107    // ── Link-type compatibility check ─────────────────────────────────────
108    let reference_hdr = headers_and_stores[0].0;
109    for (i, (hdr, _)) in headers_and_stores.iter().enumerate().skip(1) {
110        if hdr.network != reference_hdr.network {
111            // Clean up any on-disk sidecar files before returning.
112            for (_, store) in &headers_and_stores {
113                if let Some(p) = store.sidecar_path() {
114                    let _ = std::fs::remove_file(p);
115                }
116            }
117            return Err(SortError::IncompatibleLinkType {
118                path: inputs[i].to_owned(),
119                first: reference_hdr.network,
120                found: hdr.network,
121            });
122        }
123    }
124
125    // ── Merge per-file indexes ────────────────────────────────────────────
126    let mut merged: Vec<FilePacketIndex> = Vec::new();
127    for (file_id, (_, store)) in headers_and_stores.into_iter().enumerate() {
128        let sidecar = store.sidecar_path().map(Path::to_owned);
129        let entries = store.into_sorted()?;
130        for entry in entries {
131            merged.push(FilePacketIndex { entry, file_id });
132        }
133        if let Some(ref p) = sidecar {
134            let _ = std::fs::remove_file(p);
135        }
136    }
137
138    merged.sort_unstable_by_key(|e| e.entry.timestamp_ns);
139
140    second_pass_multi(inputs, reference_hdr, &merged, opts)
141}
142
143/// Parse a human-readable slice duration string into seconds.
144///
145/// Supported suffixes: `s` (seconds), `m` (minutes), `h` (hours), `d` (days).
146/// A bare integer is treated as seconds.
147///
148/// # Errors
149/// Returns [`SortError::InvalidSlice`] if the string cannot be parsed.
150///
151/// # Examples
152/// ```
153/// use pcap_toolkit::sort::parse_slice;
154/// assert_eq!(parse_slice("1h").unwrap(), 3600);
155/// assert_eq!(parse_slice("30m").unwrap(), 1800);
156/// assert_eq!(parse_slice("1d").unwrap(), 86400);
157/// assert_eq!(parse_slice("120").unwrap(), 120);
158/// ```
159pub fn parse_slice(s: &str) -> Result<u64, SortError> {
160    let (digits, multiplier) = if let Some(n) = s.strip_suffix('s') {
161        (n, 1u64)
162    } else if let Some(n) = s.strip_suffix('m') {
163        (n, 60)
164    } else if let Some(n) = s.strip_suffix('h') {
165        (n, 3600)
166    } else if let Some(n) = s.strip_suffix('d') {
167        (n, 86400)
168    } else {
169        (s, 1)
170    };
171
172    digits
173        .parse::<u64>()
174        .map(|n| n * multiplier)
175        .map_err(|_| SortError::InvalidSlice(s.to_owned()))
176}
177
178// ── First pass ───────────────────────────────────────────────────────────────
179
180/// Scan a single input file to build its packet index and capture the global header.
181///
182/// When `filter` / `bpf` are active (7.4 pre-filter) packets that cannot
183/// match are skipped from the index entirely, reducing second-pass seeks.
184fn first_pass(
185    input: &Path,
186    on_disk: bool,
187    filter: &Filter,
188    bpf: Option<&BpfExpr>,
189) -> Result<(GlobalHeader, IndexStore), SortError> {
190    let magic = read_magic(input)?;
191    if u32::from_le_bytes(magic) == 0x0a0d_0d0a {
192        return Err(SortError::PcapNgNotSupported);
193    }
194
195    let file = File::open(input)?;
196    let mut reader =
197        LegacyPcapReader::new(BUF_SIZE, file).map_err(|e| SortError::Parse(format!("{e:?}")))?;
198
199    let store = if on_disk {
200        IndexStore::disk(&sidecar_path(input))?
201    } else {
202        IndexStore::memory()
203    };
204
205    first_pass_inner(&mut reader, store, filter, bpf)
206}
207
208fn first_pass_inner<R: std::io::Read>(
209    reader: &mut LegacyPcapReader<R>,
210    mut store: IndexStore,
211    filter: &Filter,
212    bpf: Option<&BpfExpr>,
213) -> Result<(GlobalHeader, IndexStore), SortError> {
214    let mut global_hdr: Option<GlobalHeader> = None;
215    let mut file_offset: u64 = 0;
216    let has_filter = !filter.is_empty() || bpf.is_some();
217
218    loop {
219        match reader.next() {
220            Ok((consumed, block)) => {
221                match block {
222                    PcapBlockOwned::LegacyHeader(hdr) => {
223                        global_hdr = Some(GlobalHeader {
224                            magic_number: hdr.magic_number,
225                            version_major: hdr.version_major,
226                            version_minor: hdr.version_minor,
227                            thiszone: hdr.thiszone,
228                            sigfigs: hdr.sigfigs,
229                            snaplen: hdr.snaplen,
230                            network: hdr.network.0,
231                        });
232                        file_offset += consumed as u64;
233                    }
234                    PcapBlockOwned::Legacy(pkt) => {
235                        let ns_precision = global_hdr
236                            .as_ref()
237                            .map(|h| h.is_nanosecond())
238                            .unwrap_or(false);
239
240                        let timestamp_ns = if ns_precision {
241                            u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec)
242                        } else {
243                            u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec) * 1_000
244                        };
245
246                        // 7.4 pre-filter: skip packets that cannot match the
247                        // active filter, avoiding unnecessary second-pass seeks.
248                        let should_index = if has_filter {
249                            let meta = PacketMeta::from_packet(timestamp_ns, pkt.caplen, pkt.data);
250                            let structured = filter.is_empty() || filter.matches(&meta);
251                            let bpf_ok = bpf.is_none_or(|b| b.eval(&meta));
252                            structured && bpf_ok
253                        } else {
254                            true
255                        };
256
257                        if should_index {
258                            store.push(PacketIndex {
259                                timestamp_ns,
260                                byte_offset: file_offset,
261                                caplen: pkt.caplen,
262                            })?;
263                        }
264
265                        file_offset += consumed as u64;
266                    }
267                    _ => {
268                        file_offset += consumed as u64;
269                    }
270                }
271                reader.consume(consumed);
272            }
273            Err(PcapError::Eof) => break,
274            Err(PcapError::Incomplete(_)) => {
275                reader
276                    .refill()
277                    .map_err(|e| SortError::Parse(format!("{e:?}")))?;
278            }
279            Err(e) => return Err(SortError::Parse(format!("{e:?}"))),
280        }
281    }
282
283    let hdr = global_hdr.ok_or(SortError::Parse("missing PCAP global header".into()))?;
284    Ok((hdr, store))
285}
286
287// ── Second pass ───────────────────────────────────────────────────────────────
288
289/// Multi-file second pass: open all source files, seek-and-stream packets in
290/// sorted order, apply transforms, and write output.
291fn second_pass_multi(
292    inputs: &[&Path],
293    header: GlobalHeader,
294    sorted: &[FilePacketIndex],
295    opts: &SortOptions,
296) -> Result<SortReport, SortError> {
297    let mut src_files: Vec<File> = inputs
298        .iter()
299        .map(|p| File::open(p).map_err(SortError::Io))
300        .collect::<Result<_, _>>()?;
301
302    let big_endian = header.is_big_endian();
303    let mut writer = SlicedWriter::new(opts.output.clone(), header, opts.slice_secs);
304
305    let ts_delta: i64 = if let Some(target_ns) = opts.transform.timestamp_start_ns {
306        sorted
307            .first()
308            .map(|p| target_ns as i64 - p.entry.timestamp_ns as i64)
309            .unwrap_or(0)
310    } else {
311        0
312    };
313    let has_transform = !opts.transform.is_empty();
314    // Second-pass filter is kept for correctness when pre-filter was not
315    // active (no filter) or when the caller bypasses sort_files directly.
316    let has_filter = !opts.filter.is_empty() || opts.bpf_filter.is_some();
317
318    for entry in sorted {
319        let (origlen, mut data) = read_packet_at(
320            &mut src_files[entry.file_id],
321            entry.entry.byte_offset,
322            entry.entry.caplen,
323            big_endian,
324        )?;
325
326        if has_filter {
327            let meta = PacketMeta::from_packet(entry.entry.timestamp_ns, entry.entry.caplen, &data);
328            let structured_pass = opts.filter.is_empty() || opts.filter.matches(&meta);
329            let bpf_pass = opts
330                .bpf_filter
331                .as_ref()
332                .map(|b| b.eval(&meta))
333                .unwrap_or(true);
334            if !structured_pass || !bpf_pass {
335                continue;
336            }
337        }
338
339        let (ts, caplen, new_origlen) = if has_transform {
340            transform::apply(
341                &mut data,
342                entry.entry.timestamp_ns,
343                ts_delta,
344                origlen,
345                &opts.transform,
346            )
347        } else {
348            (entry.entry.timestamp_ns, data.len() as u32, origlen)
349        };
350
351        writer.write_packet(ts, caplen, new_origlen, &data)?;
352    }
353
354    let (files_written, packets_written) = writer.finish()?;
355    Ok(SortReport {
356        packets_written,
357        files_written,
358    })
359}
360
361// ── Helper ───────────────────────────────────────────────────────────────────
362
363fn read_magic(path: &Path) -> Result<[u8; 4], SortError> {
364    use std::io::Read;
365    let mut magic = [0u8; 4];
366    let mut f = File::open(path)?;
367    f.read_exact(&mut magic)?;
368    Ok(magic)
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that every `(input, expected_seconds)` pair parses successfully.
    fn assert_parses(cases: &[(&str, u64)]) {
        for &(text, secs) in cases {
            assert_eq!(parse_slice(text).unwrap(), secs);
        }
    }

    #[test]
    fn test_parse_slice_hours() {
        assert_parses(&[("1h", 3600), ("2h", 7200)]);
    }

    #[test]
    fn test_parse_slice_minutes() {
        assert_parses(&[("30m", 1800)]);
    }

    #[test]
    fn test_parse_slice_days() {
        assert_parses(&[("1d", 86400)]);
    }

    #[test]
    fn test_parse_slice_seconds_explicit() {
        assert_parses(&[("120s", 120)]);
    }

    #[test]
    fn test_parse_slice_bare_number() {
        assert_parses(&[("3600", 3600)]);
    }

    #[test]
    fn test_parse_slice_invalid() {
        // Non-numeric text, the empty string, and an unknown unit suffix.
        for text in ["abc", "", "1x"] {
            assert!(parse_slice(text).is_err());
        }
    }
}