use std::{
io::Read,
sync::{atomic::AtomicBool, Arc},
};
use ahash::HashMap;
use once_cell::sync::Lazy;
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";
pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66;
pub static EXTERNAL_LOADER_PATHS: Lazy<Vec<std::path::PathBuf>> = Lazy::new(|| {
re_tracing::profile_function!();
let dir_separator = if cfg!(target_os = "windows") {
';'
} else {
':'
};
let dirpaths = std::env::var("PATH")
.ok()
.into_iter()
.flat_map(|paths| {
paths
.split(dir_separator)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
})
.map(std::path::PathBuf::from);
let mut executables = HashMap::<String, Vec<std::path::PathBuf>>::default();
for dirpath in dirpaths {
let Ok(dir) = std::fs::read_dir(dirpath) else {
continue;
};
let paths = dir.into_iter().filter_map(|entry| {
let Ok(entry) = entry else {
return None;
};
let filepath = entry.path();
let is_rerun_loader = filepath.file_name().map_or(false, |filename| {
filename
.to_string_lossy()
.starts_with(EXTERNAL_DATA_LOADER_PREFIX)
});
(filepath.is_file() && is_rerun_loader).then_some(filepath)
});
for path in paths {
if let Some(filename) = path.file_name() {
let exe_paths = executables
.entry(filename.to_string_lossy().to_string())
.or_default();
exe_paths.push(path.clone());
}
}
}
executables
.into_iter()
.filter_map(|(name, paths)| {
if paths.len() > 1 {
re_log::debug!(name, ?paths, "Found duplicated data-loader in $PATH");
}
paths.into_iter().next()
})
.collect()
});
#[inline]
pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = std::path::PathBuf> {
EXTERNAL_LOADER_PATHS.iter().cloned()
}
pub struct ExternalLoader;
impl crate::DataLoader for ExternalLoader {
#[inline]
fn name(&self) -> String {
"rerun.data_loaders.External".into()
}
fn load_from_path(
&self,
store_id: re_log_types::StoreId,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
use std::process::{Command, Stdio};
re_tracing::profile_function!(filepath.display().to_string());
#[derive(PartialEq, Eq)]
struct CompatibleLoaderFound;
let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();
for exe in EXTERNAL_LOADER_PATHS.iter() {
let store_id = store_id.clone();
let filepath = filepath.clone();
let tx = tx.clone();
let tx_feedback = tx_feedback.clone();
_ = std::thread::Builder::new().name(exe.to_string_lossy().to_string()).spawn(move || {
re_tracing::profile_function!(exe.to_string_lossy());
let child = Command::new(exe)
.arg(filepath.clone())
.args(["--recording-id".to_owned(), store_id.to_string()])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();
let mut child = match child {
Ok(child) => child,
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};
let Some(stdout) = child.stdout.take() else {
let reason = "stdout unreachable";
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
return;
};
let Some(stderr) = child.stderr.take() else {
let reason = "stderr unreachable";
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
return;
};
re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);
let is_sending_data = Arc::new(AtomicBool::new(false));
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Ok(decoder) => {
let filepath = filepath.clone();
let tx = tx.clone();
if let Err(err) = std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
let is_sending_data = Arc::clone(&is_sending_data);
move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
})
{
re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
return;
}
}
Err(re_log_encoding::decoder::DecodeError::Read(_)) => {
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
return;
}
};
loop {
re_tracing::profile_scope!("waiting for compatibility");
match child.try_wait() {
Ok(Some(_)) => break,
Ok(None) => {
if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
break; }
std::thread::yield_now();
continue;
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};
}
let status = match child.wait() {
Ok(output) => output,
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};
let is_compatible =
status.code() != Some(crate::EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE);
if is_compatible && !status.success() {
let mut stderr = std::io::BufReader::new(stderr);
let mut reason = String::new();
stderr.read_to_string(&mut reason).ok();
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
}
if is_compatible {
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
}
})?;
}
re_tracing::profile_wait!("compatible_loader");
drop(tx_feedback);
let any_compatible_loader = rx_feedback.recv() == Ok(CompatibleLoaderFound);
if !any_compatible_loader {
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}
Ok(())
}
#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Err(crate::DataLoaderError::Incompatible(path))
}
}
#[allow(clippy::needless_pass_by_value)]
fn decode_and_stream<R: std::io::Read>(
filepath: &std::path::Path,
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
is_sending_data: Arc<AtomicBool>,
decoder: re_log_encoding::decoder::Decoder<R>,
) {
re_tracing::profile_function!(filepath.display().to_string());
for msg in decoder {
is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
continue;
}
};
if tx.send(msg.into()).is_err() {
break; }
}
}