pub mod pcap;
pub mod s3;
pub mod tournament_tree;
mod util;
use async_channel::bounded;
use async_compression::futures::bufread::{GzipDecoder, ZstdDecoder};
use bytes::Bytes;
use futures::io::AsyncRead;
use futures::stream::{StreamExt, TryStreamExt};
use tracing::{Instrument, Level};
use util::TakeThenBuffered;
fn download_s3_object_chunks_in_parallel(
    path: &str,
) -> impl futures::AsyncBufRead + std::marker::Unpin {
    // Memory budget: at a 128 kB chunk size with one buffered chunk per file, 4000 files
    // consume 128 kB * 4000 == 512 MB. If the decompression rate requires more than one
    // in-flight chunk per file to achieve goal throughput (say 4 to start), and 1/12th of the
    // files are "active" at a time, the expected peak footprint is
    // 512 MB + (512 MB / 12) * 4 == 512 MB + ~170.7 MB == ~682.7 MB.
    let object_chunks = s3::ObjectChunks::new(path, 1024 * 128).unwrap().boxed(); // TODO: proper error on failure
    // TODO: confirm that S3 object downloads actually happen in parallel. If they're merely
    // concurrent, we might need to add a spawn() somewhere.
    let parallel_downloader = TakeThenBuffered::new(object_chunks, 1, 4);
    parallel_downloader.into_async_read()
}
#[tracing::instrument]
pub fn stream_and_decode_pcap_packets(
    path: String,
) -> impl futures::stream::Stream<Item = (u64, Bytes)> {
    // Load the file at `path` from S3 or the local file system, based on the presence or
    // absence of an "s3://" prefix. Wrap the file loader (which implements AsyncRead) in a
    // ZstdDecoder or GzipDecoder if the path ends with .zst or .gz. Continually batch all
    // available packets into a vector using ready_chunks(), then forward them to the receiver
    // of the 1-deep async channel created below.
    // NOTE: the purpose of the 1-deep channel is to allow parallelism and cross-thread
    // communication between the thread/task that decompresses the file and parses out a stream
    // of packets and the thread/task responsible for merging the packets together. Because
    // there is only one merging thread, it is critical for throughput that the design lets
    // merging proceed in parallel with file decompression.
    // NOTE: because the one-deep channel holds all chunks that were ready for the stream, we
    // guarantee that no further downloading, decompression, or file reading will occur until
    // the file's first batch has been consumed and the next has been requested.
    let (sender, receiver) = bounded(1);
    smol::spawn(async move {
        async fn decode_pcap_packets_to_channel<T: AsyncRead + std::marker::Unpin>(
            reader: T,
            channel: async_channel::Sender<Vec<(u64, Bytes)>>,
        ) {
            /* TODO: create a "tracing" span tree associated with this file. It would be nice
            to see this tree build all the way up to the merge function in the single-threaded
            case. */
            // TODO: is this better than stream.forward()?
            let mut packet_stream = crate::pcap::Packets::new(1024 * 64, reader)
                .await
                .unwrap() // TODO: nice error indicating what the issue is, then bail
                .map(std::result::Result::unwrap) // TODO: surface per-packet parse errors instead of panicking
                .ready_chunks(2048); // batch all currently-ready packets (up to 2048) into one vector
            // TODO: validate the choice of 2048, or pick a number that makes more sense. Is a
            // smaller value like 1024 any better? It's a trade-off between parallelism and
            // memory usage; a hand-rolled "collect everything that's ready" loop could avoid
            // the fixed cap entirely.
            while let Some(packets) = packet_stream
                .next()
                .instrument(tracing::trace_span!("NextPacket"))
                .await
            {
                tracing::event!(Level::TRACE, ts = packets[0].0);
                channel.send(packets).await.unwrap();
            }
            channel.close();
        }
        // TODO: ask the Rust users' forum for ideas on removing the redundancy below; perhaps
        // implement a .decompressed() method on an enum type that returns a decompressed stream?
        if path.starts_with("s3://") {
            let s3_object_stream = download_s3_object_chunks_in_parallel(&path);
            if path.ends_with(".zst") {
                // TODO: consider a From impl for the enum to unify this code?
                decode_pcap_packets_to_channel(ZstdDecoder::new(s3_object_stream), sender).await
            } else if path.ends_with(".gz") {
                decode_pcap_packets_to_channel(GzipDecoder::new(s3_object_stream), sender).await
            } else {
                // uncompressed (e.g. .pcap)
                decode_pcap_packets_to_channel(s3_object_stream, sender).await
            }
        } else {
            // Local file loader. TODO: consider switching to io_uring w/ Tokio for this?
            // TODO: proper error handling if the file doesn't exist
            let file = std::fs::OpenOptions::new().read(true).open(&path).unwrap();
            let loader = smol::io::BufReader::with_capacity(
                1024 * 128,
                smol::Unblock::with_capacity(1024 * 128, file),
            );
            if path.ends_with(".zst") {
                decode_pcap_packets_to_channel(ZstdDecoder::new(loader), sender).await;
            } else if path.ends_with(".gz") {
                decode_pcap_packets_to_channel(GzipDecoder::new(loader), sender).await;
            } else {
                // uncompressed (e.g. .pcap)
                decode_pcap_packets_to_channel(loader, sender).await;
            }
        }
    })
    .detach();
    receiver.map(futures::stream::iter).flatten() // hide the vector batching used to minimize atomic operations in inter-thread communication
}
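// A minimal, self-contained sketch (illustration only, not part of the pipeline) of the
// batch-and-flatten pattern used above: packets are grouped into `Vec`s with ready_chunks(),
// sent through a 1-deep channel, and flattened back into a per-packet stream on the consumer
// side. It uses only crates this module already depends on (smol, futures, async_channel,
// bytes); the synthetic timestamps stand in for real parsed packets.
#[cfg(test)]
mod channel_batching_sketch {
    use async_channel::bounded;
    use bytes::Bytes;
    use futures::stream::{self, StreamExt};

    #[test]
    fn batches_are_flattened_in_order() {
        smol::block_on(async {
            let (sender, receiver) = bounded::<Vec<(u64, Bytes)>>(1);
            // Producer: batch a synthetic packet stream into vectors, mirroring what
            // decode_pcap_packets_to_channel() does with ready_chunks().
            let producer = smol::spawn(async move {
                let mut batches =
                    stream::iter((0u64..10).map(|ts| (ts, Bytes::from_static(b"pkt"))))
                        .ready_chunks(4);
                while let Some(batch) = batches.next().await {
                    sender.send(batch).await.unwrap();
                }
                // The sender is dropped here, closing the channel and ending the consumer stream.
            });
            // Consumer: hide the vector batching by flattening, just as the stream returned by
            // stream_and_decode_pcap_packets() does.
            let timestamps: Vec<u64> = receiver
                .map(stream::iter)
                .flatten()
                .map(|(ts, _payload)| ts)
                .collect()
                .await;
            producer.await;
            assert_eq!(timestamps, (0u64..10).collect::<Vec<_>>());
        });
    }
}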