1pub mod index;
15mod writer;
16
17use std::fs::File;
18use std::path::{Path, PathBuf};
19
20use pcap_parser::traits::PcapReaderIterator;
21use pcap_parser::{LegacyPcapReader, PcapBlockOwned, PcapError};
22use rayon::prelude::*;
23
24use crate::bpf::BpfExpr;
25use crate::error::SortError;
26use crate::filter::{Filter, PacketMeta};
27use crate::transform::{self, TransformOptions};
28use index::{FilePacketIndex, IndexStore, PacketIndex, read_packet_at, sidecar_path};
29use writer::{GlobalHeader, SlicedWriter};
30
/// Buffer capacity handed to `LegacyPcapReader::new` when scanning an input file.
const BUF_SIZE: usize = 65536;
32
/// Options accepted by [`sort_file`] and [`sort_files`].
#[derive(Debug, Default)]
pub struct SortOptions {
    /// Output path handed to the `SlicedWriter` that produces the sorted capture.
    pub output: PathBuf,
    /// Optional slice length forwarded to the writer.
    ///
    /// NOTE(review): presumably a duration in seconds (see [`parse_slice`]) — confirm against
    /// the `writer` module.
    pub slice_secs: Option<u64>,
    /// When `true`, the first pass keeps its packet index in an on-disk sidecar file instead
    /// of in memory.
    pub on_disk: bool,
    /// Structured packet filter; when it reports itself empty, no structured filtering is done.
    pub filter: Filter,
    /// Optional BPF expression that a packet must additionally satisfy to be kept.
    pub bpf_filter: Option<BpfExpr>,
    /// Transformations applied to each packet in the second pass before it is written.
    pub transform: TransformOptions,
}
51
/// Summary of a completed sort run.
///
/// Derives `Default`, `Clone`, `PartialEq` and `Eq` so callers can build an empty report,
/// keep copies, and compare reports (for example in tests).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SortReport {
    /// Number of packets written, as reported by the writer when it finishes.
    pub packets_written: u64,
    /// Paths of the output files, as reported by the writer when it finishes.
    pub files_written: Vec<PathBuf>,
}
60
61pub fn sort_file(input: &Path, opts: &SortOptions) -> Result<SortReport, SortError> {
73 sort_files(&[input], opts)
74}
75
76pub fn sort_files(inputs: &[&Path], opts: &SortOptions) -> Result<SortReport, SortError> {
89 if inputs.is_empty() {
90 return Ok(SortReport {
91 packets_written: 0,
92 files_written: vec![],
93 });
94 }
95
96 let results: Vec<Result<(GlobalHeader, IndexStore), SortError>> = inputs
98 .par_iter()
99 .map(|p| first_pass(p, opts.on_disk, &opts.filter, opts.bpf_filter.as_ref()))
100 .collect();
101
102 let mut headers_and_stores: Vec<(GlobalHeader, IndexStore)> = Vec::with_capacity(inputs.len());
103 for r in results {
104 headers_and_stores.push(r?);
105 }
106
107 let reference_hdr = headers_and_stores[0].0;
109 for (i, (hdr, _)) in headers_and_stores.iter().enumerate().skip(1) {
110 if hdr.network != reference_hdr.network {
111 for (_, store) in &headers_and_stores {
113 if let Some(p) = store.sidecar_path() {
114 let _ = std::fs::remove_file(p);
115 }
116 }
117 return Err(SortError::IncompatibleLinkType {
118 path: inputs[i].to_owned(),
119 first: reference_hdr.network,
120 found: hdr.network,
121 });
122 }
123 }
124
125 let mut merged: Vec<FilePacketIndex> = Vec::new();
127 for (file_id, (_, store)) in headers_and_stores.into_iter().enumerate() {
128 let sidecar = store.sidecar_path().map(Path::to_owned);
129 let entries = store.into_sorted()?;
130 for entry in entries {
131 merged.push(FilePacketIndex { entry, file_id });
132 }
133 if let Some(ref p) = sidecar {
134 let _ = std::fs::remove_file(p);
135 }
136 }
137
138 merged.sort_unstable_by_key(|e| e.entry.timestamp_ns);
139
140 second_pass_multi(inputs, reference_hdr, &merged, opts)
141}
142
143pub fn parse_slice(s: &str) -> Result<u64, SortError> {
160 let (digits, multiplier) = if let Some(n) = s.strip_suffix('s') {
161 (n, 1u64)
162 } else if let Some(n) = s.strip_suffix('m') {
163 (n, 60)
164 } else if let Some(n) = s.strip_suffix('h') {
165 (n, 3600)
166 } else if let Some(n) = s.strip_suffix('d') {
167 (n, 86400)
168 } else {
169 (s, 1)
170 };
171
172 digits
173 .parse::<u64>()
174 .map(|n| n * multiplier)
175 .map_err(|_| SortError::InvalidSlice(s.to_owned()))
176}
177
178fn first_pass(
185 input: &Path,
186 on_disk: bool,
187 filter: &Filter,
188 bpf: Option<&BpfExpr>,
189) -> Result<(GlobalHeader, IndexStore), SortError> {
190 let magic = read_magic(input)?;
191 if u32::from_le_bytes(magic) == 0x0a0d_0d0a {
192 return Err(SortError::PcapNgNotSupported);
193 }
194
195 let file = File::open(input)?;
196 let mut reader =
197 LegacyPcapReader::new(BUF_SIZE, file).map_err(|e| SortError::Parse(format!("{e:?}")))?;
198
199 let store = if on_disk {
200 IndexStore::disk(&sidecar_path(input))?
201 } else {
202 IndexStore::memory()
203 };
204
205 first_pass_inner(&mut reader, store, filter, bpf)
206}
207
208fn first_pass_inner<R: std::io::Read>(
209 reader: &mut LegacyPcapReader<R>,
210 mut store: IndexStore,
211 filter: &Filter,
212 bpf: Option<&BpfExpr>,
213) -> Result<(GlobalHeader, IndexStore), SortError> {
214 let mut global_hdr: Option<GlobalHeader> = None;
215 let mut file_offset: u64 = 0;
216 let has_filter = !filter.is_empty() || bpf.is_some();
217
218 loop {
219 match reader.next() {
220 Ok((consumed, block)) => {
221 match block {
222 PcapBlockOwned::LegacyHeader(hdr) => {
223 global_hdr = Some(GlobalHeader {
224 magic_number: hdr.magic_number,
225 version_major: hdr.version_major,
226 version_minor: hdr.version_minor,
227 thiszone: hdr.thiszone,
228 sigfigs: hdr.sigfigs,
229 snaplen: hdr.snaplen,
230 network: hdr.network.0,
231 });
232 file_offset += consumed as u64;
233 }
234 PcapBlockOwned::Legacy(pkt) => {
235 let ns_precision = global_hdr
236 .as_ref()
237 .map(|h| h.is_nanosecond())
238 .unwrap_or(false);
239
240 let timestamp_ns = if ns_precision {
241 u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec)
242 } else {
243 u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec) * 1_000
244 };
245
246 let should_index = if has_filter {
249 let meta = PacketMeta::from_packet(timestamp_ns, pkt.caplen, pkt.data);
250 let structured = filter.is_empty() || filter.matches(&meta);
251 let bpf_ok = bpf.is_none_or(|b| b.eval(&meta));
252 structured && bpf_ok
253 } else {
254 true
255 };
256
257 if should_index {
258 store.push(PacketIndex {
259 timestamp_ns,
260 byte_offset: file_offset,
261 caplen: pkt.caplen,
262 })?;
263 }
264
265 file_offset += consumed as u64;
266 }
267 _ => {
268 file_offset += consumed as u64;
269 }
270 }
271 reader.consume(consumed);
272 }
273 Err(PcapError::Eof) => break,
274 Err(PcapError::Incomplete(_)) => {
275 reader
276 .refill()
277 .map_err(|e| SortError::Parse(format!("{e:?}")))?;
278 }
279 Err(e) => return Err(SortError::Parse(format!("{e:?}"))),
280 }
281 }
282
283 let hdr = global_hdr.ok_or(SortError::Parse("missing PCAP global header".into()))?;
284 Ok((hdr, store))
285}
286
287fn second_pass_multi(
292 inputs: &[&Path],
293 header: GlobalHeader,
294 sorted: &[FilePacketIndex],
295 opts: &SortOptions,
296) -> Result<SortReport, SortError> {
297 let mut src_files: Vec<File> = inputs
298 .iter()
299 .map(|p| File::open(p).map_err(SortError::Io))
300 .collect::<Result<_, _>>()?;
301
302 let big_endian = header.is_big_endian();
303 let mut writer = SlicedWriter::new(opts.output.clone(), header, opts.slice_secs);
304
305 let ts_delta: i64 = if let Some(target_ns) = opts.transform.timestamp_start_ns {
306 sorted
307 .first()
308 .map(|p| target_ns as i64 - p.entry.timestamp_ns as i64)
309 .unwrap_or(0)
310 } else {
311 0
312 };
313 let has_transform = !opts.transform.is_empty();
314 let has_filter = !opts.filter.is_empty() || opts.bpf_filter.is_some();
317
318 for entry in sorted {
319 let (origlen, mut data) = read_packet_at(
320 &mut src_files[entry.file_id],
321 entry.entry.byte_offset,
322 entry.entry.caplen,
323 big_endian,
324 )?;
325
326 if has_filter {
327 let meta = PacketMeta::from_packet(entry.entry.timestamp_ns, entry.entry.caplen, &data);
328 let structured_pass = opts.filter.is_empty() || opts.filter.matches(&meta);
329 let bpf_pass = opts
330 .bpf_filter
331 .as_ref()
332 .map(|b| b.eval(&meta))
333 .unwrap_or(true);
334 if !structured_pass || !bpf_pass {
335 continue;
336 }
337 }
338
339 let (ts, caplen, new_origlen) = if has_transform {
340 transform::apply(
341 &mut data,
342 entry.entry.timestamp_ns,
343 ts_delta,
344 origlen,
345 &opts.transform,
346 )
347 } else {
348 (entry.entry.timestamp_ns, data.len() as u32, origlen)
349 };
350
351 writer.write_packet(ts, caplen, new_origlen, &data)?;
352 }
353
354 let (files_written, packets_written) = writer.finish()?;
355 Ok(SortReport {
356 packets_written,
357 files_written,
358 })
359}
360
361fn read_magic(path: &Path) -> Result<[u8; 4], SortError> {
364 use std::io::Read;
365 let mut magic = [0u8; 4];
366 let mut f = File::open(path)?;
367 f.read_exact(&mut magic)?;
368 Ok(magic)
369}
370
#[cfg(test)]
mod tests {
    use super::parse_slice;

    /// Asserts that `input` parses successfully to exactly `secs` seconds.
    #[track_caller]
    fn expect_secs(input: &str, secs: u64) {
        assert_eq!(parse_slice(input).unwrap(), secs);
    }

    #[test]
    fn test_parse_slice_hours() {
        expect_secs("1h", 3600);
        expect_secs("2h", 7200);
    }

    #[test]
    fn test_parse_slice_minutes() {
        expect_secs("30m", 1800);
    }

    #[test]
    fn test_parse_slice_days() {
        expect_secs("1d", 86400);
    }

    #[test]
    fn test_parse_slice_seconds_explicit() {
        expect_secs("120s", 120);
    }

    #[test]
    fn test_parse_slice_bare_number() {
        expect_secs("3600", 3600);
    }

    #[test]
    fn test_parse_slice_invalid() {
        // Non-numeric input, empty input, and an unknown unit suffix are all rejected.
        for bad in ["abc", "", "1x"] {
            assert!(parse_slice(bad).is_err());
        }
    }
}