use std::io::Read as _;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, LazyLock};
use ahash::HashMap;
use indexmap::IndexSet;
use crate::{DataLoader as _, LoadedData};
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";
pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66;
pub static EXTERNAL_LOADER_PATHS: LazyLock<Vec<PathBuf>> = LazyLock::new(|| {
re_tracing::profile_scope!("initialize-external-loaders");
let dir_separator = if cfg!(target_os = "windows") {
';'
} else {
':'
};
let dirpaths = std::env::var("PATH")
.ok()
.into_iter()
.flat_map(|paths| {
paths
.split(dir_separator)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
})
.map(PathBuf::from);
let mut executables = HashMap::<String, IndexSet<PathBuf>>::default();
for dirpath in dirpaths {
re_tracing::profile_scope!("dir", dirpath.to_string_lossy());
let Ok(dir) = std::fs::read_dir(dirpath) else {
continue;
};
let paths = dir.into_iter().filter_map(|entry| {
let Ok(entry) = entry else {
return None;
};
let filepath = entry.path();
let is_rerun_loader = filepath.file_name().is_some_and(|filename| {
filename
.to_string_lossy()
.starts_with(EXTERNAL_DATA_LOADER_PREFIX)
});
(is_executable(&filepath) && is_rerun_loader).then_some(filepath)
});
for path in paths {
if let Some(filename) = path.file_name() {
let exe_paths = executables
.entry(filename.to_string_lossy().to_string())
.or_default();
exe_paths.insert(path.clone());
}
}
}
executables
.into_iter()
.filter_map(|(name, paths)| {
if paths.len() > 1 {
re_log::debug!(name, ?paths, "Found duplicated data-loader in $PATH");
}
paths.into_iter().next()
})
.collect()
});
#[inline]
pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = PathBuf> {
re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
EXTERNAL_LOADER_PATHS.iter().cloned()
}
pub struct ExternalLoader;
impl crate::DataLoader for ExternalLoader {
#[inline]
fn name(&self) -> String {
"rerun.data_loaders.External".into()
}
fn load_from_path(
&self,
settings: &crate::DataLoaderSettings,
filepath: PathBuf,
tx: crossbeam::channel::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
use std::process::{Command, Stdio};
re_tracing::profile_function!(filepath.display().to_string());
let external_loaders = {
re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
EXTERNAL_LOADER_PATHS.iter()
};
#[derive(Debug, PartialEq, Eq)]
struct CompatibleLoaderFound;
let (tx_feedback, rx_feedback) = crossbeam::channel::bounded::<CompatibleLoaderFound>(64);
let args = settings.to_cli_args();
for exe in external_loaders {
let args = args.clone();
let filepath = filepath.clone();
let tx = tx.clone();
let tx_feedback = tx_feedback.clone();
_ = std::thread::Builder::new().name(exe.to_string_lossy().to_string()).spawn(move || {
re_tracing::profile_function!(exe.to_string_lossy());
let child = Command::new(exe)
.env_remove("RERUN_APP_ONLY")
.arg(filepath.clone())
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();
let mut child = match child {
Ok(child) => child,
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};
let Some(stdout) = child.stdout.take() else {
let reason = "stdout unreachable";
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
return;
};
let Some(stderr) = child.stderr.take() else {
let reason = "stderr unreachable";
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
return;
};
re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);
let is_sending_data = Arc::new(AtomicBool::new(false));
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::Decoder::decode_eager(stdout) {
Ok(decoder) => {
let filepath = filepath.clone();
let tx = tx.clone();
if let Err(err) = std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
let is_sending_data = Arc::clone(&is_sending_data);
move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
})
{
re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
return;
}
}
Err(re_log_encoding::DecodeError::Read(_)) => {
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
return;
}
}
loop {
re_tracing::profile_scope!("waiting for compatibility");
match child.try_wait() {
Ok(Some(_)) => break,
Ok(None) => {
if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
re_quota_channel::send_crossbeam(&tx_feedback, CompatibleLoaderFound).ok();
break; }
std::thread::yield_now();
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
}
}
let status = match child.wait() {
Ok(output) => output,
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};
let is_compatible =
status.code() != Some(crate::EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE);
if is_compatible {
let mut stderr = std::io::BufReader::new(stderr);
let mut stderr_str = String::new();
stderr.read_to_string(&mut stderr_str).ok();
if status.success() {
re_log::debug!(loader = ?exe, ?filepath, "Compatible external loader found");
let stderr_indented = stderr_str.lines().map(|line| format!(" {line}")).collect::<Vec<_>>().join("\n");
re_log::debug!("Dataloader stderr:\n{stderr_indented}");
re_quota_channel::send_crossbeam(&tx_feedback, CompatibleLoaderFound).ok();
} else {
re_log::error!(?filepath, loader = ?exe, %stderr_str, "Failed to execute external loader");
}
}
})?;
}
re_tracing::profile_wait!("compatible_loader");
drop(tx_feedback);
let any_compatible_loader = rx_feedback.recv() == Ok(CompatibleLoaderFound);
if !any_compatible_loader {
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
}
Ok(())
}
#[inline]
fn load_from_file_contents(
&self,
_settings: &crate::DataLoaderSettings,
path: PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: crossbeam::channel::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Err(crate::DataLoaderError::Incompatible(path))
}
}
#[expect(clippy::needless_pass_by_value)]
fn decode_and_stream(
filepath: &std::path::Path,
tx: &crossbeam::channel::Sender<crate::LoadedData>,
is_sending_data: Arc<AtomicBool>,
msgs: impl Iterator<Item = Result<re_log_types::LogMsg, re_log_encoding::DecodeError>>,
) {
re_tracing::profile_function!(filepath.display().to_string());
for msg in msgs {
is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
re_log::warn!(?filepath, "Failed to decode message: {err}");
continue;
}
};
let data = LoadedData::LogMsg(ExternalLoader::name(&ExternalLoader), msg);
if re_quota_channel::send_crossbeam(tx, data).is_err() {
break; }
}
}
fn is_executable(path: &std::path::Path) -> bool {
if !path.is_file() {
return false;
}
let Ok(_metadata) = std::fs::metadata(path) else {
return false;
};
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt as _;
let permissions = _metadata.permissions();
permissions.mode() & 0o111 != 0
}
#[cfg(windows)]
{
path.extension()
.and_then(|ext| ext.to_str())
.map(|ext| matches!(ext.to_lowercase().as_str(), "exe" | "bat" | "cmd"))
.unwrap_or(false)
}
}