Skip to main content

pcap_toolkit/sort/
mod.rs

1//! Two-pass chronological sorting for legacy PCAP files.
2//!
3//! ## Algorithm
4//!
5//! **First pass** — scan the input file sequentially, tracking byte offsets.
6//! For each packet emit one [`PacketIndex`] record (20 bytes). The index is
7//! held in memory (default) or streamed to a `.idx` sidecar file (`--on-disk`).
8//!
9//! **Second pass** — sort the index by `timestamp_ns`, then seek-and-stream
10//! packets in chronological order into one or more output PCAP files.
11//! Time-slicing (`--slice`) splits output into separate files at regular
12//! intervals.
13
14pub mod index;
15mod writer;
16
17use std::fs::File;
18use std::path::{Path, PathBuf};
19
20use pcap_parser::traits::PcapReaderIterator;
21use pcap_parser::{LegacyPcapReader, PcapBlockOwned, PcapError};
22use rayon::prelude::*;
23
24use crate::bpf::BpfExpr;
25use crate::error::SortError;
26use crate::filter::{Filter, PacketMeta};
27use crate::transform::{self, TransformOptions};
28use index::{FilePacketIndex, IndexStore, PacketIndex, read_packet_at, sidecar_path};
29use writer::{GlobalHeader, SlicedWriter};
30
/// Capacity in bytes (64 KiB) handed to `LegacyPcapReader::new` for the
/// first-pass scan.
const BUF_SIZE: usize = 64 * 1024;
32
33// ── Public types ─────────────────────────────────────────────────────────────
34
/// Options for [`sort_file`] and [`sort_files`].
///
/// The struct derives `Default`, so callers can set only the fields they
/// care about and fill the rest with `..Default::default()`.
#[derive(Debug, Default)]
pub struct SortOptions {
    /// Destination: a file path when no slicing, a directory path when slicing.
    pub output: PathBuf,
    /// Time-slice interval in seconds. `None` produces a single output file.
    pub slice_secs: Option<u64>,
    /// When `true`, write the index to a `.idx` sidecar file instead of RAM.
    pub on_disk: bool,
    /// Structured packet filter. Applied as a pre-filter while indexing in the
    /// first pass (non-matching packets are left out of the index) and checked
    /// again during the second pass.
    pub filter: Filter,
    /// BPF expression filter AND-ed with `filter`. `None` = no BPF filter.
    pub bpf_filter: Option<BpfExpr>,
    /// Packet-level transformations (truncation, timestamp shift, IP mapping).
    /// Only invoked in the second pass when `transform.is_empty()` is `false`.
    pub transform: TransformOptions,
}
51
/// Summary returned by [`sort_file`] and [`sort_files`] on success.
///
/// `Default` yields the empty report (zero packets, no files). `Clone`,
/// `PartialEq` and `Eq` let callers keep, compare and assert on reports.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SortReport {
    /// Total packets written across all output files.
    pub packets_written: u64,
    /// Paths of output PCAP files, in creation order.
    pub files_written: Vec<PathBuf>,
}
60
61// ── Public entry point ───────────────────────────────────────────────────────
62
63/// Sort `input` chronologically and write the result according to `opts`.
64///
65/// Thin wrapper around [`sort_files`] for the common single-input case.
66///
67/// PCAPng input is not yet supported and returns [`SortError::PcapNgNotSupported`].
68///
69/// # Errors
70/// Returns [`SortError`] on I/O failure, unsupported input format, or
71/// invalid slice specification.
72pub fn sort_file(input: &Path, opts: &SortOptions) -> Result<SortReport, SortError> {
73    sort_files(&[input], opts)
74}
75
76/// Sort and merge one or more input files chronologically.
77///
78/// First passes are run in **parallel** across all inputs using Rayon; the
79/// resulting per-file indexes are merged and sorted by timestamp before a
80/// single sequential second pass writes the output.
81///
82/// All input files must share the same link-layer type; if they differ
83/// [`SortError::IncompatibleLinkType`] is returned.
84///
85/// # Errors
86/// Returns [`SortError`] on I/O failure, unsupported input format,
87/// incompatible link types, or invalid slice specification.
88pub fn sort_files(inputs: &[&Path], opts: &SortOptions) -> Result<SortReport, SortError> {
89    if inputs.is_empty() {
90        return Ok(SortReport {
91            packets_written: 0,
92            files_written: vec![],
93        });
94    }
95
96    // ── Parallel first pass ───────────────────────────────────────────────
97    let results: Vec<Result<(GlobalHeader, IndexStore), SortError>> = inputs
98        .par_iter()
99        .map(|p| first_pass(p, opts.on_disk, &opts.filter, opts.bpf_filter.as_ref()))
100        .collect();
101
102    let mut headers_and_stores: Vec<(GlobalHeader, IndexStore)> = Vec::with_capacity(inputs.len());
103    let mut first_err: Option<SortError> = None;
104    for r in results {
105        match r {
106            Ok(hs) => headers_and_stores.push(hs),
107            Err(e) => {
108                if first_err.is_none() {
109                    first_err = Some(e);
110                }
111            }
112        }
113    }
114    if let Some(e) = first_err {
115        for (_, store) in &headers_and_stores {
116            if let Some(p) = store.sidecar_path() {
117                let _ = std::fs::remove_file(p);
118            }
119        }
120        return Err(e);
121    }
122
123    // ── Link-type compatibility check ─────────────────────────────────────
124    let reference_hdr = headers_and_stores[0].0;
125    for (i, (hdr, _)) in headers_and_stores.iter().enumerate().skip(1) {
126        if hdr.network != reference_hdr.network {
127            // Clean up any on-disk sidecar files before returning.
128            for (_, store) in &headers_and_stores {
129                if let Some(p) = store.sidecar_path() {
130                    let _ = std::fs::remove_file(p);
131                }
132            }
133            return Err(SortError::IncompatibleLinkType {
134                path: inputs[i].to_owned(),
135                first: reference_hdr.network,
136                found: hdr.network,
137            });
138        }
139    }
140
141    // ── Merge per-file indexes ────────────────────────────────────────────
142    let mut merged: Vec<FilePacketIndex> = Vec::new();
143    for (file_id, (_, store)) in headers_and_stores.into_iter().enumerate() {
144        let sidecar = store.sidecar_path().map(Path::to_owned);
145        let entries = store.into_sorted()?;
146        for entry in entries {
147            merged.push(FilePacketIndex { entry, file_id });
148        }
149        if let Some(ref p) = sidecar {
150            let _ = std::fs::remove_file(p);
151        }
152    }
153
154    merged.sort_unstable_by_key(|e| e.entry.timestamp_ns);
155
156    second_pass_multi(inputs, reference_hdr, &merged, opts)
157}
158
159/// Parse a human-readable slice duration string into seconds.
160///
161/// Supported suffixes: `s` (seconds), `m` (minutes), `h` (hours), `d` (days).
162/// A bare integer is treated as seconds.
163///
164/// # Errors
165/// Returns [`SortError::InvalidSlice`] if the string cannot be parsed.
166///
167/// # Examples
168/// ```
169/// use pcap_toolkit::sort::parse_slice;
170/// assert_eq!(parse_slice("1h").unwrap(), 3600);
171/// assert_eq!(parse_slice("30m").unwrap(), 1800);
172/// assert_eq!(parse_slice("1d").unwrap(), 86400);
173/// assert_eq!(parse_slice("120").unwrap(), 120);
174/// ```
175pub fn parse_slice(s: &str) -> Result<u64, SortError> {
176    let (digits, multiplier) = if let Some(n) = s.strip_suffix('s') {
177        (n, 1u64)
178    } else if let Some(n) = s.strip_suffix('m') {
179        (n, 60)
180    } else if let Some(n) = s.strip_suffix('h') {
181        (n, 3600)
182    } else if let Some(n) = s.strip_suffix('d') {
183        (n, 86400)
184    } else {
185        (s, 1)
186    };
187
188    digits
189        .parse::<u64>()
190        .map(|n| n * multiplier)
191        .map_err(|_| SortError::InvalidSlice(s.to_owned()))
192}
193
194// ── First pass ───────────────────────────────────────────────────────────────
195
196/// Scan a single input file to build its packet index and capture the global header.
197///
198/// When `filter` / `bpf` are active (7.4 pre-filter) packets that cannot
199/// match are skipped from the index entirely, reducing second-pass seeks.
200fn first_pass(
201    input: &Path,
202    on_disk: bool,
203    filter: &Filter,
204    bpf: Option<&BpfExpr>,
205) -> Result<(GlobalHeader, IndexStore), SortError> {
206    let magic = read_magic(input)?;
207    if u32::from_le_bytes(magic) == 0x0a0d_0d0a {
208        return Err(SortError::PcapNgNotSupported);
209    }
210
211    let file = File::open(input)?;
212    let mut reader =
213        LegacyPcapReader::new(BUF_SIZE, file).map_err(|e| SortError::Parse(format!("{e:?}")))?;
214
215    let sidecar = on_disk.then(|| sidecar_path(input));
216    let store = match &sidecar {
217        Some(p) => IndexStore::disk(p)?,
218        None => IndexStore::memory(),
219    };
220
221    first_pass_inner(&mut reader, store, filter, bpf).map_err(|e| {
222        if let Some(p) = &sidecar {
223            let _ = std::fs::remove_file(p);
224        }
225        e
226    })
227}
228
229fn first_pass_inner<R: std::io::Read>(
230    reader: &mut LegacyPcapReader<R>,
231    mut store: IndexStore,
232    filter: &Filter,
233    bpf: Option<&BpfExpr>,
234) -> Result<(GlobalHeader, IndexStore), SortError> {
235    let mut global_hdr: Option<GlobalHeader> = None;
236    let mut file_offset: u64 = 0;
237    let has_filter = !filter.is_empty() || bpf.is_some();
238
239    loop {
240        match reader.next() {
241            Ok((consumed, block)) => {
242                match block {
243                    PcapBlockOwned::LegacyHeader(hdr) => {
244                        global_hdr = Some(GlobalHeader {
245                            magic_number: hdr.magic_number,
246                            version_major: hdr.version_major,
247                            version_minor: hdr.version_minor,
248                            thiszone: hdr.thiszone,
249                            sigfigs: hdr.sigfigs,
250                            snaplen: hdr.snaplen,
251                            network: hdr.network.0,
252                        });
253                        file_offset += consumed as u64;
254                    }
255                    PcapBlockOwned::Legacy(pkt) => {
256                        let ns_precision = global_hdr
257                            .as_ref()
258                            .map(|h| h.is_nanosecond())
259                            .unwrap_or(false);
260
261                        let timestamp_ns = if ns_precision {
262                            u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec)
263                        } else {
264                            u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec) * 1_000
265                        };
266
267                        // 7.4 pre-filter: skip packets that cannot match the
268                        // active filter, avoiding unnecessary second-pass seeks.
269                        let should_index = if has_filter {
270                            let meta = PacketMeta::from_packet(timestamp_ns, pkt.caplen, pkt.data);
271                            let structured = filter.is_empty() || filter.matches(&meta);
272                            let bpf_ok = bpf.is_none_or(|b| b.eval(&meta));
273                            structured && bpf_ok
274                        } else {
275                            true
276                        };
277
278                        if should_index {
279                            store.push(PacketIndex {
280                                timestamp_ns,
281                                byte_offset: file_offset,
282                                caplen: pkt.caplen,
283                            })?;
284                        }
285
286                        file_offset += consumed as u64;
287                    }
288                    _ => {
289                        file_offset += consumed as u64;
290                    }
291                }
292                reader.consume(consumed);
293            }
294            Err(PcapError::Eof) => break,
295            Err(PcapError::Incomplete(_)) => {
296                reader
297                    .refill()
298                    .map_err(|e| SortError::Parse(format!("{e:?}")))?;
299            }
300            Err(e) => return Err(SortError::Parse(format!("{e:?}"))),
301        }
302    }
303
304    let hdr = global_hdr.ok_or(SortError::Parse("missing PCAP global header".into()))?;
305    Ok((hdr, store))
306}
307
308// ── Second pass ───────────────────────────────────────────────────────────────
309
310/// Multi-file second pass: open all source files, seek-and-stream packets in
311/// sorted order, apply transforms, and write output.
312fn second_pass_multi(
313    inputs: &[&Path],
314    header: GlobalHeader,
315    sorted: &[FilePacketIndex],
316    opts: &SortOptions,
317) -> Result<SortReport, SortError> {
318    let mut src_files: Vec<File> = inputs
319        .iter()
320        .map(|p| File::open(p).map_err(SortError::Io))
321        .collect::<Result<_, _>>()?;
322
323    let big_endian = header.is_big_endian();
324    let mut writer = SlicedWriter::new(opts.output.clone(), header, opts.slice_secs);
325
326    let ts_delta: i64 = if let Some(target_ns) = opts.transform.timestamp_start_ns {
327        sorted
328            .first()
329            .map(|p| target_ns as i64 - p.entry.timestamp_ns as i64)
330            .unwrap_or(0)
331    } else {
332        0
333    };
334    let has_transform = !opts.transform.is_empty();
335    // Second-pass filter is kept for correctness when pre-filter was not
336    // active (no filter) or when the caller bypasses sort_files directly.
337    let has_filter = !opts.filter.is_empty() || opts.bpf_filter.is_some();
338
339    for entry in sorted {
340        let (origlen, mut data) = read_packet_at(
341            &mut src_files[entry.file_id],
342            entry.entry.byte_offset,
343            entry.entry.caplen,
344            big_endian,
345        )?;
346
347        if has_filter {
348            let meta = PacketMeta::from_packet(entry.entry.timestamp_ns, entry.entry.caplen, &data);
349            let structured_pass = opts.filter.is_empty() || opts.filter.matches(&meta);
350            let bpf_pass = opts
351                .bpf_filter
352                .as_ref()
353                .map(|b| b.eval(&meta))
354                .unwrap_or(true);
355            if !structured_pass || !bpf_pass {
356                continue;
357            }
358        }
359
360        let (ts, caplen, new_origlen) = if has_transform {
361            transform::apply(
362                &mut data,
363                entry.entry.timestamp_ns,
364                ts_delta,
365                origlen,
366                &opts.transform,
367            )
368        } else {
369            (entry.entry.timestamp_ns, data.len() as u32, origlen)
370        };
371
372        writer.write_packet(ts, caplen, new_origlen, &data)?;
373    }
374
375    let (files_written, packets_written) = writer.finish()?;
376    Ok(SortReport {
377        packets_written,
378        files_written,
379    })
380}
381
382// ── Helper ───────────────────────────────────────────────────────────────────
383
384fn read_magic(path: &Path) -> Result<[u8; 4], SortError> {
385    use std::io::Read;
386    let mut magic = [0u8; 4];
387    let mut f = File::open(path)?;
388    f.read_exact(&mut magic)?;
389    Ok(magic)
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `input` parses successfully to exactly `expected` seconds.
    fn assert_secs(input: &str, expected: u64) {
        assert_eq!(parse_slice(input).unwrap(), expected);
    }

    #[test]
    fn test_parse_slice_hours() {
        for (input, expected) in [("1h", 3600), ("2h", 7200)] {
            assert_secs(input, expected);
        }
    }

    #[test]
    fn test_parse_slice_minutes() {
        assert_secs("30m", 1800);
    }

    #[test]
    fn test_parse_slice_days() {
        assert_secs("1d", 86400);
    }

    #[test]
    fn test_parse_slice_seconds_explicit() {
        assert_secs("120s", 120);
    }

    #[test]
    fn test_parse_slice_bare_number() {
        assert_secs("3600", 3600);
    }

    #[test]
    fn test_parse_slice_invalid() {
        // Non-numeric, empty, and unknown-suffix inputs must all be rejected.
        for input in ["abc", "", "1x"] {
            assert!(parse_slice(input).is_err());
        }
    }
}