1pub mod index;
15mod writer;
16
17use std::fs::File;
18use std::path::{Path, PathBuf};
19
20use pcap_parser::traits::PcapReaderIterator;
21use pcap_parser::{LegacyPcapReader, PcapBlockOwned, PcapError};
22use rayon::prelude::*;
23
24use crate::bpf::BpfExpr;
25use crate::error::SortError;
26use crate::filter::{Filter, PacketMeta};
27use crate::transform::{self, TransformOptions};
28use index::{FilePacketIndex, IndexStore, PacketIndex, read_packet_at, sidecar_path};
29use writer::{GlobalHeader, SlicedWriter};
30
/// Capacity in bytes (64 KiB) of the read buffer handed to `LegacyPcapReader::new` in the
/// first pass.
/// NOTE(review): presumably a single record larger than this cannot be parsed in one piece —
/// confirm against pcap-parser's buffer handling before relying on huge snaplens.
const BUF_SIZE: usize = 64 * 1024;
32
/// Options controlling [`sort_file`] / [`sort_files`].
#[derive(Debug, Default)]
pub struct SortOptions {
    /// Output location, passed as-is to the sliced writer.
    /// NOTE(review): when `slice_secs` is set this is presumably a base name from which
    /// per-slice file names are derived — confirm in the `writer` module.
    pub output: PathBuf,
    /// Optional slice length in seconds (as produced by [`parse_slice`]), forwarded to the
    /// sliced writer.
    pub slice_secs: Option<u64>,
    /// When `true`, each input's packet index is kept in a sidecar file (location given by
    /// `index::sidecar_path`) instead of in memory during the first pass.
    pub on_disk: bool,
    /// Structured packet filter. A packet is kept only if it matches this filter (when
    /// non-empty) and `bpf_filter` (when set).
    pub filter: Filter,
    /// Optional BPF expression evaluated against each packet's metadata in addition to
    /// `filter`.
    pub bpf_filter: Option<BpfExpr>,
    /// Per-packet transformations applied while writing; skipped entirely when empty.
    /// If `timestamp_start_ns` is set, every timestamp is shifted by the same delta so that
    /// the earliest packet starts at that value.
    pub transform: TransformOptions,
}
51
/// Summary of a completed sort, as reported by the output writer.
///
/// Derives `Clone`/`PartialEq`/`Eq`/`Default` so callers can compare, copy and
/// default-construct reports (e.g. in tests or when aggregating several runs).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SortReport {
    /// Total number of packets written across all output files.
    pub packets_written: u64,
    /// Paths of the output files that were written, in the order the writer reported them.
    pub files_written: Vec<PathBuf>,
}
60
61pub fn sort_file(input: &Path, opts: &SortOptions) -> Result<SortReport, SortError> {
73 sort_files(&[input], opts)
74}
75
76pub fn sort_files(inputs: &[&Path], opts: &SortOptions) -> Result<SortReport, SortError> {
89 if inputs.is_empty() {
90 return Ok(SortReport {
91 packets_written: 0,
92 files_written: vec![],
93 });
94 }
95
96 let results: Vec<Result<(GlobalHeader, IndexStore), SortError>> = inputs
98 .par_iter()
99 .map(|p| first_pass(p, opts.on_disk, &opts.filter, opts.bpf_filter.as_ref()))
100 .collect();
101
102 let mut headers_and_stores: Vec<(GlobalHeader, IndexStore)> = Vec::with_capacity(inputs.len());
103 let mut first_err: Option<SortError> = None;
104 for r in results {
105 match r {
106 Ok(hs) => headers_and_stores.push(hs),
107 Err(e) => {
108 if first_err.is_none() {
109 first_err = Some(e);
110 }
111 }
112 }
113 }
114 if let Some(e) = first_err {
115 for (_, store) in &headers_and_stores {
116 if let Some(p) = store.sidecar_path() {
117 let _ = std::fs::remove_file(p);
118 }
119 }
120 return Err(e);
121 }
122
123 let reference_hdr = headers_and_stores[0].0;
125 for (i, (hdr, _)) in headers_and_stores.iter().enumerate().skip(1) {
126 if hdr.network != reference_hdr.network {
127 for (_, store) in &headers_and_stores {
129 if let Some(p) = store.sidecar_path() {
130 let _ = std::fs::remove_file(p);
131 }
132 }
133 return Err(SortError::IncompatibleLinkType {
134 path: inputs[i].to_owned(),
135 first: reference_hdr.network,
136 found: hdr.network,
137 });
138 }
139 }
140
141 let mut merged: Vec<FilePacketIndex> = Vec::new();
143 for (file_id, (_, store)) in headers_and_stores.into_iter().enumerate() {
144 let sidecar = store.sidecar_path().map(Path::to_owned);
145 let entries = store.into_sorted()?;
146 for entry in entries {
147 merged.push(FilePacketIndex { entry, file_id });
148 }
149 if let Some(ref p) = sidecar {
150 let _ = std::fs::remove_file(p);
151 }
152 }
153
154 merged.sort_unstable_by_key(|e| e.entry.timestamp_ns);
155
156 second_pass_multi(inputs, reference_hdr, &merged, opts)
157}
158
159pub fn parse_slice(s: &str) -> Result<u64, SortError> {
176 let (digits, multiplier) = if let Some(n) = s.strip_suffix('s') {
177 (n, 1u64)
178 } else if let Some(n) = s.strip_suffix('m') {
179 (n, 60)
180 } else if let Some(n) = s.strip_suffix('h') {
181 (n, 3600)
182 } else if let Some(n) = s.strip_suffix('d') {
183 (n, 86400)
184 } else {
185 (s, 1)
186 };
187
188 digits
189 .parse::<u64>()
190 .map(|n| n * multiplier)
191 .map_err(|_| SortError::InvalidSlice(s.to_owned()))
192}
193
194fn first_pass(
201 input: &Path,
202 on_disk: bool,
203 filter: &Filter,
204 bpf: Option<&BpfExpr>,
205) -> Result<(GlobalHeader, IndexStore), SortError> {
206 let magic = read_magic(input)?;
207 if u32::from_le_bytes(magic) == 0x0a0d_0d0a {
208 return Err(SortError::PcapNgNotSupported);
209 }
210
211 let file = File::open(input)?;
212 let mut reader =
213 LegacyPcapReader::new(BUF_SIZE, file).map_err(|e| SortError::Parse(format!("{e:?}")))?;
214
215 let sidecar = on_disk.then(|| sidecar_path(input));
216 let store = match &sidecar {
217 Some(p) => IndexStore::disk(p)?,
218 None => IndexStore::memory(),
219 };
220
221 first_pass_inner(&mut reader, store, filter, bpf).map_err(|e| {
222 if let Some(p) = &sidecar {
223 let _ = std::fs::remove_file(p);
224 }
225 e
226 })
227}
228
229fn first_pass_inner<R: std::io::Read>(
230 reader: &mut LegacyPcapReader<R>,
231 mut store: IndexStore,
232 filter: &Filter,
233 bpf: Option<&BpfExpr>,
234) -> Result<(GlobalHeader, IndexStore), SortError> {
235 let mut global_hdr: Option<GlobalHeader> = None;
236 let mut file_offset: u64 = 0;
237 let has_filter = !filter.is_empty() || bpf.is_some();
238
239 loop {
240 match reader.next() {
241 Ok((consumed, block)) => {
242 match block {
243 PcapBlockOwned::LegacyHeader(hdr) => {
244 global_hdr = Some(GlobalHeader {
245 magic_number: hdr.magic_number,
246 version_major: hdr.version_major,
247 version_minor: hdr.version_minor,
248 thiszone: hdr.thiszone,
249 sigfigs: hdr.sigfigs,
250 snaplen: hdr.snaplen,
251 network: hdr.network.0,
252 });
253 file_offset += consumed as u64;
254 }
255 PcapBlockOwned::Legacy(pkt) => {
256 let ns_precision = global_hdr
257 .as_ref()
258 .map(|h| h.is_nanosecond())
259 .unwrap_or(false);
260
261 let timestamp_ns = if ns_precision {
262 u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec)
263 } else {
264 u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec) * 1_000
265 };
266
267 let should_index = if has_filter {
270 let meta = PacketMeta::from_packet(timestamp_ns, pkt.caplen, pkt.data);
271 let structured = filter.is_empty() || filter.matches(&meta);
272 let bpf_ok = bpf.is_none_or(|b| b.eval(&meta));
273 structured && bpf_ok
274 } else {
275 true
276 };
277
278 if should_index {
279 store.push(PacketIndex {
280 timestamp_ns,
281 byte_offset: file_offset,
282 caplen: pkt.caplen,
283 })?;
284 }
285
286 file_offset += consumed as u64;
287 }
288 _ => {
289 file_offset += consumed as u64;
290 }
291 }
292 reader.consume(consumed);
293 }
294 Err(PcapError::Eof) => break,
295 Err(PcapError::Incomplete(_)) => {
296 reader
297 .refill()
298 .map_err(|e| SortError::Parse(format!("{e:?}")))?;
299 }
300 Err(e) => return Err(SortError::Parse(format!("{e:?}"))),
301 }
302 }
303
304 let hdr = global_hdr.ok_or(SortError::Parse("missing PCAP global header".into()))?;
305 Ok((hdr, store))
306}
307
308fn second_pass_multi(
313 inputs: &[&Path],
314 header: GlobalHeader,
315 sorted: &[FilePacketIndex],
316 opts: &SortOptions,
317) -> Result<SortReport, SortError> {
318 let mut src_files: Vec<File> = inputs
319 .iter()
320 .map(|p| File::open(p).map_err(SortError::Io))
321 .collect::<Result<_, _>>()?;
322
323 let big_endian = header.is_big_endian();
324 let mut writer = SlicedWriter::new(opts.output.clone(), header, opts.slice_secs);
325
326 let ts_delta: i64 = if let Some(target_ns) = opts.transform.timestamp_start_ns {
327 sorted
328 .first()
329 .map(|p| target_ns as i64 - p.entry.timestamp_ns as i64)
330 .unwrap_or(0)
331 } else {
332 0
333 };
334 let has_transform = !opts.transform.is_empty();
335 let has_filter = !opts.filter.is_empty() || opts.bpf_filter.is_some();
338
339 for entry in sorted {
340 let (origlen, mut data) = read_packet_at(
341 &mut src_files[entry.file_id],
342 entry.entry.byte_offset,
343 entry.entry.caplen,
344 big_endian,
345 )?;
346
347 if has_filter {
348 let meta = PacketMeta::from_packet(entry.entry.timestamp_ns, entry.entry.caplen, &data);
349 let structured_pass = opts.filter.is_empty() || opts.filter.matches(&meta);
350 let bpf_pass = opts
351 .bpf_filter
352 .as_ref()
353 .map(|b| b.eval(&meta))
354 .unwrap_or(true);
355 if !structured_pass || !bpf_pass {
356 continue;
357 }
358 }
359
360 let (ts, caplen, new_origlen) = if has_transform {
361 transform::apply(
362 &mut data,
363 entry.entry.timestamp_ns,
364 ts_delta,
365 origlen,
366 &opts.transform,
367 )
368 } else {
369 (entry.entry.timestamp_ns, data.len() as u32, origlen)
370 };
371
372 writer.write_packet(ts, caplen, new_origlen, &data)?;
373 }
374
375 let (files_written, packets_written) = writer.finish()?;
376 Ok(SortReport {
377 packets_written,
378 files_written,
379 })
380}
381
382fn read_magic(path: &Path) -> Result<[u8; 4], SortError> {
385 use std::io::Read;
386 let mut magic = [0u8; 4];
387 let mut f = File::open(path)?;
388 f.read_exact(&mut magic)?;
389 Ok(magic)
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every `(input, expected_seconds)` pair parses to the expected value.
    fn assert_parses(cases: &[(&str, u64)]) {
        for &(input, expected) in cases {
            assert_eq!(parse_slice(input).unwrap(), expected, "input: {input:?}");
        }
    }

    #[test]
    fn test_parse_slice_hours() {
        assert_parses(&[("1h", 3600), ("2h", 7200)]);
    }

    #[test]
    fn test_parse_slice_minutes() {
        assert_parses(&[("30m", 1800)]);
    }

    #[test]
    fn test_parse_slice_days() {
        assert_parses(&[("1d", 86400)]);
    }

    #[test]
    fn test_parse_slice_seconds_explicit() {
        assert_parses(&[("120s", 120)]);
    }

    #[test]
    fn test_parse_slice_bare_number() {
        assert_parses(&[("3600", 3600)]);
    }

    #[test]
    fn test_parse_slice_invalid() {
        for input in ["abc", "", "1x"] {
            assert!(parse_slice(input).is_err(), "expected {input:?} to be rejected");
        }
    }
}