pcap_toolkit/export/
parquet.rs1use std::path::Path;
30use std::sync::Arc;
31
32use parquet::basic::{Compression, Repetition, ZstdLevel};
33use parquet::column::writer::ColumnWriter;
34use parquet::data_type::ByteArray;
35use parquet::file::properties::WriterProperties;
36use parquet::file::writer::SerializedFileWriter;
37use parquet::schema::parser::parse_message_type;
38use parquet::schema::types::Type;
39
40use crate::error::ExportError;
41
42use super::{PacketRecord, PacketSink};
43
/// Number of records buffered in memory before they are flushed to the file
/// as a single Parquet row group.
const BATCH_SIZE: usize = 4_096;
46
/// Parquet message type used for every exported file.
///
/// Field order here *is* the column order. `write_row_group` fills columns
/// positionally (each `write_*` helper takes the next column from the row
/// group), so it must call the helpers in exactly this order and with
/// matching physical types. Reordering or inserting a field here requires the
/// same change there.
const SCHEMA_STR: &str = "
message schema {
 REQUIRED INT64 timestamp_ns;
 OPTIONAL BYTE_ARRAY src_ip (STRING);
 OPTIONAL BYTE_ARRAY dst_ip (STRING);
 OPTIONAL INT32 src_port;
 OPTIONAL INT32 dst_port;
 OPTIONAL INT32 protocol;
 OPTIONAL INT64 flow_id;
 REQUIRED INT32 caplen;
 REQUIRED INT32 origlen;
 OPTIONAL INT32 tcp_flags;
 OPTIONAL BYTE_ARRAY payload;
}
";
62
/// A [`PacketSink`] that buffers packet records in memory and writes them to
/// a Parquet file, one row group per batch of `BATCH_SIZE` records (the final
/// batch written by `close` may be smaller).
pub struct ParquetSink {
    /// Underlying Parquet writer; set to `None` once `close` has taken and
    /// finalized it.
    writer: Option<SerializedFileWriter<std::fs::File>>,
    /// Records accepted by `write` that have not yet been written as a row group.
    buffer: Vec<PacketRecord>,
    /// Number of records accepted by `write`; this is the value `close` returns.
    count: u64,
}
75
76impl ParquetSink {
77 pub fn create(path: &Path, compress_payload: bool) -> Result<Self, ExportError> {
79 let schema: Arc<Type> = Arc::new(
80 parse_message_type(SCHEMA_STR).map_err(|e| ExportError::Parquet(e.to_string()))?,
81 );
82 let compression = if compress_payload {
83 Compression::ZSTD(
84 ZstdLevel::try_new(3).map_err(|e| ExportError::Parquet(e.to_string()))?,
85 )
86 } else {
87 Compression::SNAPPY
88 };
89 let props = Arc::new(
90 WriterProperties::builder()
91 .set_compression(compression)
92 .build(),
93 );
94 let file = std::fs::File::create(path)?;
95 let writer = SerializedFileWriter::new(file, schema, props)
96 .map_err(|e| ExportError::Parquet(e.to_string()))?;
97 Ok(Self {
98 writer: Some(writer),
99 buffer: Vec::with_capacity(BATCH_SIZE),
100 count: 0,
101 })
102 }
103}
104
105impl PacketSink for ParquetSink {
106 fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError> {
107 self.buffer.push(record.clone());
108 self.count += 1;
109 if self.buffer.len() >= BATCH_SIZE {
110 let batch = std::mem::take(&mut self.buffer);
111 if let Some(ref mut w) = self.writer {
112 write_row_group(w, &batch)?;
113 }
114 }
115 Ok(())
116 }
117
118 fn close(&mut self) -> Result<u64, ExportError> {
119 if !self.buffer.is_empty() {
120 let batch = std::mem::take(&mut self.buffer);
121 if let Some(ref mut w) = self.writer {
122 write_row_group(w, &batch)?;
123 }
124 }
125 if let Some(writer) = self.writer.take() {
126 writer
127 .close()
128 .map_err(|e| ExportError::Parquet(e.to_string()))?;
129 }
130 Ok(self.count)
131 }
132}
133
134fn write_row_group(
136 writer: &mut SerializedFileWriter<std::fs::File>,
137 chunk: &[PacketRecord],
138) -> Result<(), ExportError> {
139 let mut rg = writer
140 .next_row_group()
141 .map_err(|e| ExportError::Parquet(e.to_string()))?;
142
143 {
145 let values: Vec<i64> = chunk.iter().map(|r| r.timestamp_ns as i64).collect();
146 write_required_i64(&mut rg, &values)?;
147 }
148
149 {
151 let strings: Vec<Option<String>> = chunk
152 .iter()
153 .map(|r| r.src_ip.map(|ip| ip.to_string()))
154 .collect();
155 write_optional_bytes(&mut rg, &strings)?;
156 }
157
158 {
160 let strings: Vec<Option<String>> = chunk
161 .iter()
162 .map(|r| r.dst_ip.map(|ip| ip.to_string()))
163 .collect();
164 write_optional_bytes(&mut rg, &strings)?;
165 }
166
167 {
169 let values: Vec<Option<i32>> = chunk.iter().map(|r| r.src_port.map(|p| p as i32)).collect();
170 write_optional_i32(&mut rg, &values)?;
171 }
172
173 {
175 let values: Vec<Option<i32>> = chunk.iter().map(|r| r.dst_port.map(|p| p as i32)).collect();
176 write_optional_i32(&mut rg, &values)?;
177 }
178
179 {
181 let values: Vec<Option<i32>> = chunk.iter().map(|r| r.protocol.map(|p| p as i32)).collect();
182 write_optional_i32(&mut rg, &values)?;
183 }
184
185 {
187 let values: Vec<Option<i64>> = chunk
188 .iter()
189 .map(|r| r.flow_id.map(|id| id as i64))
190 .collect();
191 write_optional_i64(&mut rg, &values)?;
192 }
193
194 {
196 let values: Vec<i32> = chunk.iter().map(|r| r.caplen as i32).collect();
197 write_required_i32(&mut rg, &values)?;
198 }
199
200 {
202 let values: Vec<i32> = chunk.iter().map(|r| r.origlen as i32).collect();
203 write_required_i32(&mut rg, &values)?;
204 }
205
206 {
208 let values: Vec<Option<i32>> = chunk
209 .iter()
210 .map(|r| r.tcp_flags.map(|f| f as i32))
211 .collect();
212 write_optional_i32(&mut rg, &values)?;
213 }
214
215 {
217 let payloads: Vec<Option<&[u8]>> = chunk
218 .iter()
219 .map(|r| {
220 if r.payload.is_empty() {
221 None
222 } else {
223 Some(r.payload.as_slice())
224 }
225 })
226 .collect();
227 write_optional_binary(&mut rg, &payloads)?;
228 }
229
230 rg.close()
231 .map_err(|e| ExportError::Parquet(e.to_string()))?;
232 Ok(())
233}
234
235fn write_required_i64(
238 rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
239 values: &[i64],
240) -> Result<(), ExportError> {
241 let mut col = rg
242 .next_column()
243 .map_err(|e| ExportError::Parquet(e.to_string()))?
244 .expect("column count mismatch");
245 match col.untyped() {
246 ColumnWriter::Int64ColumnWriter(w) => {
247 w.write_batch(values, None, None)
248 .map_err(|e| ExportError::Parquet(e.to_string()))?;
249 }
250 _ => return Err(ExportError::Parquet("expected INT64 column".into())),
251 }
252 col.close()
253 .map_err(|e| ExportError::Parquet(e.to_string()))?;
254 Ok(())
255}
256
257fn write_required_i32(
258 rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
259 values: &[i32],
260) -> Result<(), ExportError> {
261 let mut col = rg
262 .next_column()
263 .map_err(|e| ExportError::Parquet(e.to_string()))?
264 .expect("column count mismatch");
265 match col.untyped() {
266 ColumnWriter::Int32ColumnWriter(w) => {
267 w.write_batch(values, None, None)
268 .map_err(|e| ExportError::Parquet(e.to_string()))?;
269 }
270 _ => return Err(ExportError::Parquet("expected INT32 column".into())),
271 }
272 col.close()
273 .map_err(|e| ExportError::Parquet(e.to_string()))?;
274 Ok(())
275}
276
277fn write_optional_i32(
278 rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
279 values: &[Option<i32>],
280) -> Result<(), ExportError> {
281 let non_null: Vec<i32> = values.iter().filter_map(|v| *v).collect();
282 let def_levels: Vec<i16> = values
283 .iter()
284 .map(|v| if v.is_some() { 1 } else { 0 })
285 .collect();
286
287 let mut col = rg
288 .next_column()
289 .map_err(|e| ExportError::Parquet(e.to_string()))?
290 .expect("column count mismatch");
291 match col.untyped() {
292 ColumnWriter::Int32ColumnWriter(w) => {
293 w.write_batch(&non_null, Some(&def_levels), None)
294 .map_err(|e| ExportError::Parquet(e.to_string()))?;
295 }
296 _ => return Err(ExportError::Parquet("expected INT32 column".into())),
297 }
298 col.close()
299 .map_err(|e| ExportError::Parquet(e.to_string()))?;
300 Ok(())
301}
302
303fn write_optional_i64(
304 rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
305 values: &[Option<i64>],
306) -> Result<(), ExportError> {
307 let non_null: Vec<i64> = values.iter().filter_map(|v| *v).collect();
308 let def_levels: Vec<i16> = values
309 .iter()
310 .map(|v| if v.is_some() { 1 } else { 0 })
311 .collect();
312
313 let mut col = rg
314 .next_column()
315 .map_err(|e| ExportError::Parquet(e.to_string()))?
316 .expect("column count mismatch");
317 match col.untyped() {
318 ColumnWriter::Int64ColumnWriter(w) => {
319 w.write_batch(&non_null, Some(&def_levels), None)
320 .map_err(|e| ExportError::Parquet(e.to_string()))?;
321 }
322 _ => return Err(ExportError::Parquet("expected INT64 column".into())),
323 }
324 col.close()
325 .map_err(|e| ExportError::Parquet(e.to_string()))?;
326 Ok(())
327}
328
329fn write_optional_bytes(
330 rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
331 values: &[Option<String>],
332) -> Result<(), ExportError> {
333 let non_null: Vec<ByteArray> = values
334 .iter()
335 .filter_map(|v| v.as_deref())
336 .map(|s| ByteArray::from(s.as_bytes().to_vec()))
337 .collect();
338 let def_levels: Vec<i16> = values
339 .iter()
340 .map(|v| if v.is_some() { 1 } else { 0 })
341 .collect();
342
343 let mut col = rg
344 .next_column()
345 .map_err(|e| ExportError::Parquet(e.to_string()))?
346 .expect("column count mismatch");
347 match col.untyped() {
348 ColumnWriter::ByteArrayColumnWriter(w) => {
349 w.write_batch(&non_null, Some(&def_levels), None)
350 .map_err(|e| ExportError::Parquet(e.to_string()))?;
351 }
352 _ => return Err(ExportError::Parquet("expected BYTE_ARRAY column".into())),
353 }
354 col.close()
355 .map_err(|e| ExportError::Parquet(e.to_string()))?;
356 Ok(())
357}
358
359fn write_optional_binary(
360 rg: &mut parquet::file::writer::SerializedRowGroupWriter<std::fs::File>,
361 values: &[Option<&[u8]>],
362) -> Result<(), ExportError> {
363 let non_null: Vec<ByteArray> = values
364 .iter()
365 .filter_map(|v| *v)
366 .map(|b| ByteArray::from(b.to_vec()))
367 .collect();
368 let def_levels: Vec<i16> = values
369 .iter()
370 .map(|v| if v.is_some() { 1 } else { 0 })
371 .collect();
372
373 let mut col = rg
374 .next_column()
375 .map_err(|e| ExportError::Parquet(e.to_string()))?
376 .expect("column count mismatch");
377 match col.untyped() {
378 ColumnWriter::ByteArrayColumnWriter(w) => {
379 w.write_batch(&non_null, Some(&def_levels), None)
380 .map_err(|e| ExportError::Parquet(e.to_string()))?;
381 }
382 _ => return Err(ExportError::Parquet("expected BYTE_ARRAY column".into())),
383 }
384 col.close()
385 .map_err(|e| ExportError::Parquet(e.to_string()))?;
386 Ok(())
387}
388
389pub fn column_repetitions() -> Vec<(usize, Repetition)> {
392 let schema = parse_message_type(SCHEMA_STR).unwrap();
393 schema
394 .get_fields()
395 .iter()
396 .enumerate()
397 .map(|(i, f)| (i, f.get_basic_info().repetition()))
398 .collect()
399}