use std::path::PathBuf;
use anyhow::Context as _;
use crossbeam::channel;
use itertools::Itertools as _;
use re_chunk::external::crossbeam;
use re_log_types::LogMsg;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum InputSource {
Stdin,
File(PathBuf),
}
impl std::fmt::Display for InputSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Stdin => write!(f, "stdin"),
Self::File(path) => write!(f, "{path:?}"),
}
}
}
pub fn read_rrd_streams_from_file_or_stdin(
paths: &[String],
) -> (
channel::Receiver<(InputSource, anyhow::Result<LogMsg>)>,
channel::Receiver<u64>,
) {
let path_to_input_rrds = paths
.iter()
.filter(|s| !s.is_empty()) .map(PathBuf::from)
.collect_vec();
let (tx, rx) = crossbeam::channel::bounded(100);
let (tx_size_bytes, rx_size_bytes) = crossbeam::channel::bounded(1);
_ = std::thread::Builder::new()
.name("rerun-rrd-in".to_owned())
.spawn(move || {
let mut size_bytes = 0;
if path_to_input_rrds.is_empty() {
let stdin = std::io::BufReader::new(std::io::stdin().lock());
let mut decoder = match re_log_encoding::decoder::Decoder::new_concatenated(stdin)
.context("couldn't decode stdin stream -- skipping")
{
Ok(decoder) => decoder,
Err(err) => {
tx.send((InputSource::Stdin, Err(err))).ok();
return;
}
};
for res in &mut decoder {
let res = res.context("couldn't decode message from stdin -- skipping");
tx.send((InputSource::Stdin, res)).ok();
}
size_bytes += decoder.size_bytes();
} else {
for rrd_path in path_to_input_rrds {
let rrd_file = match std::fs::File::open(&rrd_path)
.with_context(|| format!("couldn't open {rrd_path:?} -- skipping"))
{
Ok(file) => file,
Err(err) => {
tx.send((InputSource::File(rrd_path.clone()), Err(err)))
.ok();
continue;
}
};
let mut decoder = match re_log_encoding::decoder::Decoder::new(rrd_file)
.with_context(|| format!("couldn't decode {rrd_path:?} -- skipping"))
{
Ok(decoder) => decoder,
Err(err) => {
tx.send((InputSource::File(rrd_path.clone()), Err(err)))
.ok();
continue;
}
};
for res in &mut decoder {
let res = res.context("decode rrd message").with_context(|| {
format!("couldn't decode message {rrd_path:?} -- skipping")
});
tx.send((InputSource::File(rrd_path.clone()), res)).ok();
}
size_bytes += decoder.size_bytes();
}
}
tx_size_bytes.send(size_bytes).ok();
});
(rx, rx_size_bytes)
}