use std::path::PathBuf;
use anyhow::Context as _;
use crossbeam::channel;
use itertools::Itertools as _;
use re_chunk::external::crossbeam;
use re_log_encoding::RawRrdManifest;
use re_quota_channel::send_crossbeam;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum InputSource {
Stdin,
File(PathBuf),
}
impl std::fmt::Display for InputSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Stdin => write!(f, "stdin"),
Self::File(path) => write!(f, "{path:?}"),
}
}
}
#[expect(clippy::type_complexity)] pub fn read_rrd_streams_from_file_or_stdin(
paths: &[String],
) -> (
channel::Receiver<(InputSource, anyhow::Result<re_log_types::LogMsg>)>,
channel::Receiver<(u64, Vec<(InputSource, anyhow::Result<RawRrdManifest>)>)>,
) {
read_any_rrd_streams_from_file_or_stdin::<re_log_types::LogMsg>(paths)
}
#[expect(clippy::type_complexity)] pub fn read_raw_rrd_streams_from_file_or_stdin(
paths: &[String],
) -> (
channel::Receiver<(
InputSource,
anyhow::Result<re_protos::log_msg::v1alpha1::log_msg::Msg>,
)>,
channel::Receiver<(u64, Vec<(InputSource, anyhow::Result<RawRrdManifest>)>)>,
) {
read_any_rrd_streams_from_file_or_stdin::<re_protos::log_msg::v1alpha1::log_msg::Msg>(paths)
}
#[expect(clippy::type_complexity)] fn read_any_rrd_streams_from_file_or_stdin<
T: re_log_encoding::DecoderEntrypoint + Send + std::fmt::Debug + 'static,
>(
paths: &[String],
) -> (
channel::Receiver<(InputSource, anyhow::Result<T>)>,
channel::Receiver<(u64, Vec<(InputSource, anyhow::Result<RawRrdManifest>)>)>,
) {
let path_to_input_rrds = paths
.iter()
.filter(|s| !s.is_empty()) .map(PathBuf::from)
.collect_vec();
let (tx_msgs, rx_msgs) = crossbeam::channel::bounded(100);
let (tx_metadata, rx_metadata) = crossbeam::channel::bounded(1);
_ = std::thread::Builder::new()
.name("rerun-rrd-in".to_owned())
.spawn(move || {
let mut rrd_manifests = Vec::new();
let mut size_bytes = 0;
if path_to_input_rrds.is_empty() {
let source = InputSource::Stdin;
let stdin = std::io::BufReader::new(std::io::stdin().lock());
let mut decoder = re_log_encoding::Decoder::decode_lazy(stdin);
for res in &mut decoder {
let res = res.context("couldn't decode message from stdin -- skipping");
send_crossbeam(&tx_msgs, (source.clone(), res)).ok();
}
size_bytes += decoder.num_bytes_processed();
match decoder.rrd_manifests().context("couldn't decode footers") {
Ok(v) => rrd_manifests.extend(v.into_iter().map(|m| (source.clone(), Ok(m)))),
Err(err) => rrd_manifests.push((source.clone(), Err(err))),
}
} else {
for rrd_path in path_to_input_rrds {
let rrd_file = match std::fs::File::open(&rrd_path)
.with_context(|| format!("couldn't open {rrd_path:?} -- skipping"))
{
Ok(file) => file,
Err(err) => {
send_crossbeam(
&tx_msgs,
(InputSource::File(rrd_path.clone()), Err(err)),
)
.ok();
continue;
}
};
let source = InputSource::File(rrd_path.clone());
let rrd_file = std::io::BufReader::new(rrd_file);
let mut decoder = re_log_encoding::Decoder::decode_lazy(rrd_file);
for res in &mut decoder {
let res = res.context("decode rrd message").with_context(|| {
format!("couldn't decode message {rrd_path:?} -- skipping")
});
send_crossbeam(&tx_msgs, (source.clone(), res)).ok();
}
size_bytes += decoder.num_bytes_processed();
match decoder.rrd_manifests().context("couldn't decode footers") {
Ok(v) => {
rrd_manifests.extend(v.into_iter().map(|m| (source.clone(), Ok(m))));
}
Err(err) => rrd_manifests.push((source.clone(), Err(err))),
}
}
}
send_crossbeam(&tx_metadata, (size_bytes, rrd_manifests)).ok();
});
(rx_msgs, rx_metadata)
}