Skip to main content

pcap_toolkit/export/
parquet.rs

1//! Apache Parquet columnar writer for packet export.
2//!
3//! Uses the `parquet` crate's column-writer API to produce typed, columnar
4//! Parquet files without requiring the Arrow library.
5//!
6//! Schema
7//! ------
8//! ```text
9//! message schema {
10//!   REQUIRED INT64  timestamp_ns;
11//!   OPTIONAL BINARY src_ip      (STRING);
12//!   OPTIONAL BINARY dst_ip      (STRING);
13//!   OPTIONAL INT32  src_port;
14//!   OPTIONAL INT32  dst_port;
15//!   OPTIONAL INT32  protocol;
16//!   OPTIONAL INT64  flow_id;
17//!   REQUIRED INT32  caplen;
18//!   REQUIRED INT32  origlen;
19//!   OPTIONAL INT32  tcp_flags;
20//!   OPTIONAL BINARY payload;
21//! }
22//! ```
23//!
//! Row groups contain up to [`BATCH_SIZE`] rows. A single codec is configured
//! for the whole file: ZSTD (level 3) when `compress_payload` is `true`,
//! Snappy otherwise. Parquet itself allows a different codec per column, but
//! this writer does not use that; the binary `payload` column is where ZSTD
//! yields the largest savings.
28
29use std::path::Path;
30use std::sync::Arc;
31
32use parquet::basic::{Compression, Repetition, ZstdLevel};
33use parquet::column::writer::ColumnWriter;
34use parquet::data_type::ByteArray;
35use parquet::file::properties::WriterProperties;
36use parquet::file::writer::SerializedFileWriter;
37use parquet::schema::parser::parse_message_type;
38use parquet::schema::types::Type;
39
40use crate::error::ExportError;
41
42use super::{PacketRecord, PacketSink};
43
/// Row-group size: the sink flushes a row group once this many packets are
/// buffered.
const BATCH_SIZE: usize = 4 * 1024;

/// Parquet message-type text handed to `parse_message_type`.
///
/// The field order below is the order in which `write_row_group` emits
/// columns, so the two must be kept in sync.
const SCHEMA_STR: &str = "
message schema {
  REQUIRED INT64  timestamp_ns;
  OPTIONAL BYTE_ARRAY src_ip      (STRING);
  OPTIONAL BYTE_ARRAY dst_ip      (STRING);
  OPTIONAL INT32  src_port;
  OPTIONAL INT32  dst_port;
  OPTIONAL INT32  protocol;
  OPTIONAL INT64  flow_id;
  REQUIRED INT32  caplen;
  REQUIRED INT32  origlen;
  OPTIONAL INT32  tcp_flags;
  OPTIONAL BYTE_ARRAY payload;
}
";
62
63// ── Streaming sink ────────────────────────────────────────────────────────────
64
65/// Streaming Parquet writer that implements [`PacketSink`].
66///
67/// Records are accumulated into an internal buffer; a row group is flushed
68/// every [`BATCH_SIZE`] records. The remaining records are flushed on
69/// [`PacketSink::close`].
70pub struct ParquetSink {
71    writer: Option<SerializedFileWriter<std::fs::File>>,
72    buffer: Vec<PacketRecord>,
73    count: u64,
74}
75
76impl ParquetSink {
77    /// Create a new `ParquetSink` writing to `path`.
78    pub fn create(path: &Path, compress_payload: bool) -> Result<Self, ExportError> {
79        let schema: Arc<Type> = Arc::new(
80            parse_message_type(SCHEMA_STR).map_err(|e| ExportError::Parquet(e.to_string()))?,
81        );
82        let compression = if compress_payload {
83            Compression::ZSTD(
84                ZstdLevel::try_new(3).map_err(|e| ExportError::Parquet(e.to_string()))?,
85            )
86        } else {
87            Compression::SNAPPY
88        };
89        let props = Arc::new(
90            WriterProperties::builder()
91                .set_compression(compression)
92                .build(),
93        );
94        let file = std::fs::File::create(path)?;
95        let writer = SerializedFileWriter::new(file, schema, props)
96            .map_err(|e| ExportError::Parquet(e.to_string()))?;
97        Ok(Self {
98            writer: Some(writer),
99            buffer: Vec::with_capacity(BATCH_SIZE),
100            count: 0,
101        })
102    }
103}
104
105impl PacketSink for ParquetSink {
106    fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError> {
107        self.buffer.push(record.clone());
108        self.count += 1;
109        if self.buffer.len() >= BATCH_SIZE {
110            let batch = std::mem::take(&mut self.buffer);
111            if let Some(ref mut w) = self.writer {
112                write_row_group(w, &batch)?;
113            }
114        }
115        Ok(())
116    }
117
118    fn close(&mut self) -> Result<u64, ExportError> {
119        if !self.buffer.is_empty() {
120            let batch = std::mem::take(&mut self.buffer);
121            if let Some(ref mut w) = self.writer {
122                write_row_group(w, &batch)?;
123            }
124        }
125        if let Some(writer) = self.writer.take() {
126            writer
127                .close()
128                .map_err(|e| ExportError::Parquet(e.to_string()))?;
129        }
130        Ok(self.count)
131    }
132}
133
134/// Write one row group from a slice of records.
135fn write_row_group(
136    writer: &mut SerializedFileWriter<std::fs::File>,
137    chunk: &[PacketRecord],
138) -> Result<(), ExportError> {
139    let mut rg = writer
140        .next_row_group()
141        .map_err(|e| ExportError::Parquet(e.to_string()))?;
142
143    // ── timestamp_ns (REQUIRED INT64) ────────────────────────────────────────
144    {
145        let values: Vec<i64> = chunk.iter().map(|r| r.timestamp_ns as i64).collect();
146        write_required_i64(&mut rg, &values)?;
147    }
148
149    // ── src_ip (OPTIONAL BYTE_ARRAY / STRING) ────────────────────────────────
150    {
151        let strings: Vec<Option<String>> = chunk
152            .iter()
153            .map(|r| r.src_ip.map(|ip| ip.to_string()))
154            .collect();
155        write_optional_bytes(&mut rg, &strings)?;
156    }
157
158    // ── dst_ip ───────────────────────────────────────────────────────────────
159    {
160        let strings: Vec<Option<String>> = chunk
161            .iter()
162            .map(|r| r.dst_ip.map(|ip| ip.to_string()))
163            .collect();
164        write_optional_bytes(&mut rg, &strings)?;
165    }
166
167    // ── src_port (OPTIONAL INT32) ────────────────────────────────────────────
168    {
169        let values: Vec<Option<i32>> = chunk.iter().map(|r| r.src_port.map(|p| p as i32)).collect();
170        write_optional_i32(&mut rg, &values)?;
171    }
172
173    // ── dst_port ─────────────────────────────────────────────────────────────
174    {
175        let values: Vec<Option<i32>> = chunk.iter().map(|r| r.dst_port.map(|p| p as i32)).collect();
176        write_optional_i32(&mut rg, &values)?;
177    }
178
179    // ── protocol ─────────────────────────────────────────────────────────────
180    {
181        let values: Vec<Option<i32>> = chunk.iter().map(|r| r.protocol.map(|p| p as i32)).collect();
182        write_optional_i32(&mut rg, &values)?;
183    }
184
185    // ── flow_id (OPTIONAL INT64) ─────────────────────────────────────────────
186    {
187        let values: Vec<Option<i64>> = chunk
188            .iter()
189            .map(|r| r.flow_id.map(|id| id as i64))
190            .collect();
191        write_optional_i64(&mut rg, &values)?;
192    }
193
194    // ── caplen (REQUIRED INT32) ──────────────────────────────────────────────
195    {
196        let values: Vec<i32> = chunk.iter().map(|r| r.caplen as i32).collect();
197        write_required_i32(&mut rg, &values)?;
198    }
199
200    // ── origlen (REQUIRED INT32) ─────────────────────────────────────────────
201    {
202        let values: Vec<i32> = chunk.iter().map(|r| r.origlen as i32).collect();
203        write_required_i32(&mut rg, &values)?;
204    }
205
206    // ── tcp_flags (OPTIONAL INT32) ───────────────────────────────────────────
207    {
208        let values: Vec<Option<i32>> = chunk
209            .iter()
210            .map(|r| r.tcp_flags.map(|f| f as i32))
211            .collect();
212        write_optional_i32(&mut rg, &values)?;
213    }
214
215    // ── payload (OPTIONAL BYTE_ARRAY) ────────────────────────────────────────
216    {
217        let payloads: Vec<Option<&[u8]>> = chunk
218            .iter()
219            .map(|r| {
220                if r.payload.is_empty() {
221                    None
222                } else {
223                    Some(r.payload.as_slice())
224                }
225            })
226            .collect();
227        write_optional_binary(&mut rg, &payloads)?;
228    }
229
230    rg.close()
231        .map_err(|e| ExportError::Parquet(e.to_string()))?;
232    Ok(())
233}
234
235// ── Column helpers ────────────────────────────────────────────────────────────
236
237fn write_required_i64(
238    rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
239    values: &[i64],
240) -> Result<(), ExportError> {
241    let mut col = rg
242        .next_column()
243        .map_err(|e| ExportError::Parquet(e.to_string()))?
244        .expect("column count mismatch");
245    match col.untyped() {
246        ColumnWriter::Int64ColumnWriter(w) => {
247            w.write_batch(values, None, None)
248                .map_err(|e| ExportError::Parquet(e.to_string()))?;
249        }
250        _ => return Err(ExportError::Parquet("expected INT64 column".into())),
251    }
252    col.close()
253        .map_err(|e| ExportError::Parquet(e.to_string()))?;
254    Ok(())
255}
256
257fn write_required_i32(
258    rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
259    values: &[i32],
260) -> Result<(), ExportError> {
261    let mut col = rg
262        .next_column()
263        .map_err(|e| ExportError::Parquet(e.to_string()))?
264        .expect("column count mismatch");
265    match col.untyped() {
266        ColumnWriter::Int32ColumnWriter(w) => {
267            w.write_batch(values, None, None)
268                .map_err(|e| ExportError::Parquet(e.to_string()))?;
269        }
270        _ => return Err(ExportError::Parquet("expected INT32 column".into())),
271    }
272    col.close()
273        .map_err(|e| ExportError::Parquet(e.to_string()))?;
274    Ok(())
275}
276
277fn write_optional_i32(
278    rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
279    values: &[Option<i32>],
280) -> Result<(), ExportError> {
281    let non_null: Vec<i32> = values.iter().filter_map(|v| *v).collect();
282    let def_levels: Vec<i16> = values
283        .iter()
284        .map(|v| if v.is_some() { 1 } else { 0 })
285        .collect();
286
287    let mut col = rg
288        .next_column()
289        .map_err(|e| ExportError::Parquet(e.to_string()))?
290        .expect("column count mismatch");
291    match col.untyped() {
292        ColumnWriter::Int32ColumnWriter(w) => {
293            w.write_batch(&non_null, Some(&def_levels), None)
294                .map_err(|e| ExportError::Parquet(e.to_string()))?;
295        }
296        _ => return Err(ExportError::Parquet("expected INT32 column".into())),
297    }
298    col.close()
299        .map_err(|e| ExportError::Parquet(e.to_string()))?;
300    Ok(())
301}
302
303fn write_optional_i64(
304    rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
305    values: &[Option<i64>],
306) -> Result<(), ExportError> {
307    let non_null: Vec<i64> = values.iter().filter_map(|v| *v).collect();
308    let def_levels: Vec<i16> = values
309        .iter()
310        .map(|v| if v.is_some() { 1 } else { 0 })
311        .collect();
312
313    let mut col = rg
314        .next_column()
315        .map_err(|e| ExportError::Parquet(e.to_string()))?
316        .expect("column count mismatch");
317    match col.untyped() {
318        ColumnWriter::Int64ColumnWriter(w) => {
319            w.write_batch(&non_null, Some(&def_levels), None)
320                .map_err(|e| ExportError::Parquet(e.to_string()))?;
321        }
322        _ => return Err(ExportError::Parquet("expected INT64 column".into())),
323    }
324    col.close()
325        .map_err(|e| ExportError::Parquet(e.to_string()))?;
326    Ok(())
327}
328
329fn write_optional_bytes(
330    rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
331    values: &[Option<String>],
332) -> Result<(), ExportError> {
333    let non_null: Vec<ByteArray> = values
334        .iter()
335        .filter_map(|v| v.as_deref())
336        .map(|s| ByteArray::from(s.as_bytes().to_vec()))
337        .collect();
338    let def_levels: Vec<i16> = values
339        .iter()
340        .map(|v| if v.is_some() { 1 } else { 0 })
341        .collect();
342
343    let mut col = rg
344        .next_column()
345        .map_err(|e| ExportError::Parquet(e.to_string()))?
346        .expect("column count mismatch");
347    match col.untyped() {
348        ColumnWriter::ByteArrayColumnWriter(w) => {
349            w.write_batch(&non_null, Some(&def_levels), None)
350                .map_err(|e| ExportError::Parquet(e.to_string()))?;
351        }
352        _ => return Err(ExportError::Parquet("expected BYTE_ARRAY column".into())),
353    }
354    col.close()
355        .map_err(|e| ExportError::Parquet(e.to_string()))?;
356    Ok(())
357}
358
359fn write_optional_binary(
360    rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
361    values: &[Option<&[u8]>],
362) -> Result<(), ExportError> {
363    let non_null: Vec<ByteArray> = values
364        .iter()
365        .filter_map(|v| *v)
366        .map(|b| ByteArray::from(b.to_vec()))
367        .collect();
368    let def_levels: Vec<i16> = values
369        .iter()
370        .map(|v| if v.is_some() { 1 } else { 0 })
371        .collect();
372
373    let mut col = rg
374        .next_column()
375        .map_err(|e| ExportError::Parquet(e.to_string()))?
376        .expect("column count mismatch");
377    match col.untyped() {
378        ColumnWriter::ByteArrayColumnWriter(w) => {
379            w.write_batch(&non_null, Some(&def_levels), None)
380                .map_err(|e| ExportError::Parquet(e.to_string()))?;
381        }
382        _ => return Err(ExportError::Parquet("expected BYTE_ARRAY column".into())),
383    }
384    col.close()
385        .map_err(|e| ExportError::Parquet(e.to_string()))?;
386    Ok(())
387}
388
389/// Check which physical type the schema assigns to each column index.
390/// Used only for sanity testing; not called in production.
391pub fn column_repetitions() -> Vec<(usize, Repetition)> {
392    let schema = parse_message_type(SCHEMA_STR).unwrap();
393    schema
394        .get_fields()
395        .iter()
396        .enumerate()
397        .map(|(i, f)| (i, f.get_basic_info().repetition()))
398        .collect()
399}