re_data_loader/
loader_external.rs1use std::{
2 io::Read as _,
3 path::PathBuf,
4 sync::{Arc, LazyLock, atomic::AtomicBool},
5};
6
7use ahash::HashMap;
8use indexmap::IndexSet;
9
10use crate::{DataLoader as _, LoadedData};
11
/// File-name prefix that marks an executable as an external data loader.
///
/// Discovery (see [`EXTERNAL_LOADER_PATHS`]) only keeps executables found on `$PATH` whose file
/// name starts with this prefix.
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";
18
/// Exit code with which an external data loader reports that it cannot handle the given file.
///
/// A loader process that exits with any other status is considered compatible with the file
/// (whether it then succeeded or failed).
pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66;
24
25pub static EXTERNAL_LOADER_PATHS: LazyLock<Vec<PathBuf>> = LazyLock::new(|| {
33 re_tracing::profile_scope!("initialize-external-loaders");
34
35 let dir_separator = if cfg!(target_os = "windows") {
36 ';'
37 } else {
38 ':'
39 };
40
41 let dirpaths = std::env::var("PATH")
42 .ok()
43 .into_iter()
44 .flat_map(|paths| {
45 paths
46 .split(dir_separator)
47 .map(ToOwned::to_owned)
48 .collect::<Vec<_>>()
49 })
50 .map(PathBuf::from);
51
52 let mut executables = HashMap::<String, IndexSet<PathBuf>>::default();
54 for dirpath in dirpaths {
55 re_tracing::profile_scope!("dir", dirpath.to_string_lossy());
56 let Ok(dir) = std::fs::read_dir(dirpath) else {
57 continue;
58 };
59 let paths = dir.into_iter().filter_map(|entry| {
60 let Ok(entry) = entry else {
61 return None;
62 };
63 let filepath = entry.path();
64 let is_rerun_loader = filepath.file_name().is_some_and(|filename| {
65 filename
66 .to_string_lossy()
67 .starts_with(EXTERNAL_DATA_LOADER_PREFIX)
68 });
69 (is_executable(&filepath) && is_rerun_loader).then_some(filepath)
70 });
71
72 for path in paths {
73 if let Some(filename) = path.file_name() {
74 let exe_paths = executables
75 .entry(filename.to_string_lossy().to_string())
76 .or_default();
77 exe_paths.insert(path.clone());
78 }
79 }
80 }
81
82 executables
83 .into_iter()
84 .filter_map(|(name, paths)| {
85 if paths.len() > 1 {
86 re_log::debug!(name, ?paths, "Found duplicated data-loader in $PATH");
87 }
88
89 paths.into_iter().next()
91 })
92 .collect()
93});
94
95#[inline]
97pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = PathBuf> {
98 re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
99 EXTERNAL_LOADER_PATHS.iter().cloned()
100}
101
/// A [`crate::DataLoader`] that delegates loading to external executables.
///
/// The executables are the ones listed in [`EXTERNAL_LOADER_PATHS`]; each is run as a child
/// process whose standard output is decoded into log messages.
pub struct ExternalLoader;
114
115impl crate::DataLoader for ExternalLoader {
116 #[inline]
117 fn name(&self) -> String {
118 "rerun.data_loaders.External".into()
119 }
120
121 fn load_from_path(
122 &self,
123 settings: &crate::DataLoaderSettings,
124 filepath: PathBuf,
125 tx: std::sync::mpsc::Sender<crate::LoadedData>,
126 ) -> Result<(), crate::DataLoaderError> {
127 use std::process::{Command, Stdio};
128
129 re_tracing::profile_function!(filepath.display().to_string());
130
131 let external_loaders = {
132 re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
133 EXTERNAL_LOADER_PATHS.iter()
134 };
135
136 #[derive(PartialEq, Eq)]
137 struct CompatibleLoaderFound;
138 let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();
139
140 let args = settings.to_cli_args();
141 for exe in external_loaders {
142 let args = args.clone();
143 let filepath = filepath.clone();
144 let tx = tx.clone();
145 let tx_feedback = tx_feedback.clone();
146
147 _ = std::thread::Builder::new().name(exe.to_string_lossy().to_string()).spawn(move || {
150 re_tracing::profile_function!(exe.to_string_lossy());
151
152 let child = Command::new(exe)
153 .env_remove("RERUN_APP_ONLY")
156 .arg(filepath.clone())
157 .args(args)
158 .stdout(Stdio::piped())
159 .stderr(Stdio::piped())
160 .spawn();
161
162 let mut child = match child {
163 Ok(child) => child,
164 Err(err) => {
165 re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
166 return;
167 }
168 };
169
170 let Some(stdout) = child.stdout.take() else {
171 let reason = "stdout unreachable";
172 re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
173 return;
174 };
175 let Some(stderr) = child.stderr.take() else {
176 let reason = "stderr unreachable";
177 re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
178 return;
179 };
180
181 re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);
182
183 let is_sending_data = Arc::new(AtomicBool::new(false));
186
187 let stdout = std::io::BufReader::new(stdout);
188 match re_log_encoding::Decoder::decode_eager(stdout) {
189 Ok(decoder) => {
190 let filepath = filepath.clone();
191 let tx = tx.clone();
192 if let Err(err) = std::thread::Builder::new()
195 .name(format!("decode_and_stream({filepath:?})"))
196 .spawn({
197 let filepath = filepath.clone();
198 let is_sending_data = Arc::clone(&is_sending_data);
199 move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
200 })
201 {
202 re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
203 return;
204 }
205 }
206 Err(re_log_encoding::DecodeError::Read(_)) => {
207 }
212 Err(err) => {
213 re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
214 return;
215 }
216 }
217
218 loop {
225 re_tracing::profile_scope!("waiting for compatibility");
226
227 match child.try_wait() {
228 Ok(Some(_)) => break,
229 Ok(None) => {
230 if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
231 re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
234 tx_feedback.send(CompatibleLoaderFound).ok();
235 break; }
237
238 std::thread::yield_now();
240 }
241 Err(err) => {
242 re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
243 return;
244 }
245 }
246 }
247
248 let status = match child.wait() {
250 Ok(output) => output,
251 Err(err) => {
252 re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
253 return;
254 }
255 };
256
257 let is_compatible =
259 status.code() != Some(crate::EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE);
260
261 if is_compatible {
262 let mut stderr = std::io::BufReader::new(stderr);
263 let mut stderr_str = String::new();
264 stderr.read_to_string(&mut stderr_str).ok();
265
266 if status.success() {
267 re_log::debug!(loader = ?exe, ?filepath, "Compatible external loader found");
268
269 let stderr_indented = stderr_str.lines().map(|line| format!(" {line}")).collect::<Vec<_>>().join("\n");
271 re_log::debug!("Dataloader stderr:\n{stderr_indented}");
272
273 tx_feedback.send(CompatibleLoaderFound).ok();
274 } else {
275 re_log::error!(?filepath, loader = ?exe, %stderr_str, "Failed to execute external loader");
276 }
277 }
278 })?;
279 }
280
281 re_tracing::profile_wait!("compatible_loader");
282
283 drop(tx_feedback);
284
285 let any_compatible_loader = rx_feedback.recv() == Ok(CompatibleLoaderFound);
286 if !any_compatible_loader {
287 return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
290 }
291
292 Ok(())
293 }
294
295 #[inline]
296 fn load_from_file_contents(
297 &self,
298 _settings: &crate::DataLoaderSettings,
299 path: PathBuf,
300 _contents: std::borrow::Cow<'_, [u8]>,
301 _tx: std::sync::mpsc::Sender<crate::LoadedData>,
302 ) -> Result<(), crate::DataLoaderError> {
303 Err(crate::DataLoaderError::Incompatible(path))
306 }
307}
308
309#[expect(clippy::needless_pass_by_value)]
310fn decode_and_stream(
311 filepath: &std::path::Path,
312 tx: &std::sync::mpsc::Sender<crate::LoadedData>,
313 is_sending_data: Arc<AtomicBool>,
314 msgs: impl Iterator<Item = Result<re_log_types::LogMsg, re_log_encoding::DecodeError>>,
315) {
316 re_tracing::profile_function!(filepath.display().to_string());
317
318 for msg in msgs {
319 is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);
320
321 let msg = match msg {
322 Ok(msg) => msg,
323 Err(err) => {
324 re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
325 continue;
326 }
327 };
328
329 let data = LoadedData::LogMsg(ExternalLoader::name(&ExternalLoader), msg);
330 if tx.send(data).is_err() {
331 break; }
333 }
334}
335
/// Returns `true` if `path` is a regular file that the current platform considers executable.
///
/// * Unix: at least one of the owner/group/other execute permission bits is set.
/// * Windows: the file extension is `exe`, `bat` or `cmd` (case-insensitive).
/// * Any other platform: always `false`.
///
/// Symlinks are followed. Paths that do not exist or cannot be inspected yield `false`.
fn is_executable(path: &std::path::Path) -> bool {
    // One `metadata` call answers both "is it a file?" and "what are its permissions?", so the
    // file is not stat'ed twice and cannot change between the two checks.
    let Ok(metadata) = std::fs::metadata(path) else {
        return false;
    };
    if !metadata.is_file() {
        return false;
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt as _;
        metadata.permissions().mode() & 0o111 != 0
    }

    #[cfg(windows)]
    {
        path.extension()
            .and_then(|ext| ext.to_str())
            .is_some_and(|ext| {
                ["exe", "bat", "cmd"]
                    .iter()
                    .any(|known| ext.eq_ignore_ascii_case(known))
            })
    }

    #[cfg(not(any(unix, windows)))]
    {
        false
    }
}