re_data_loader/
loader_external.rs1use std::{
2 io::Read as _,
3 sync::{atomic::AtomicBool, Arc},
4};
5
6use ahash::HashMap;
7use once_cell::sync::Lazy;
8
9use crate::{DataLoader as _, LoadedData};
10
11pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";
17
18pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66;
23
24pub static EXTERNAL_LOADER_PATHS: Lazy<Vec<std::path::PathBuf>> = Lazy::new(|| {
32 re_tracing::profile_scope!("initialize-external-loaders");
33
34 let dir_separator = if cfg!(target_os = "windows") {
35 ';'
36 } else {
37 ':'
38 };
39
40 let dirpaths = std::env::var("PATH")
41 .ok()
42 .into_iter()
43 .flat_map(|paths| {
44 paths
45 .split(dir_separator)
46 .map(ToOwned::to_owned)
47 .collect::<Vec<_>>()
48 })
49 .map(std::path::PathBuf::from);
50
51 let mut executables = HashMap::<String, Vec<std::path::PathBuf>>::default();
52 for dirpath in dirpaths {
53 re_tracing::profile_scope!("dir", dirpath.to_string_lossy());
54 let Ok(dir) = std::fs::read_dir(dirpath) else {
55 continue;
56 };
57 let paths = dir.into_iter().filter_map(|entry| {
58 let Ok(entry) = entry else {
59 return None;
60 };
61 let filepath = entry.path();
62 let is_rerun_loader = filepath.file_name().is_some_and(|filename| {
63 filename
64 .to_string_lossy()
65 .starts_with(EXTERNAL_DATA_LOADER_PREFIX)
66 });
67 (filepath.is_file() && is_rerun_loader).then_some(filepath)
68 });
69
70 for path in paths {
71 if let Some(filename) = path.file_name() {
72 let exe_paths = executables
73 .entry(filename.to_string_lossy().to_string())
74 .or_default();
75 exe_paths.push(path.clone());
76 }
77 }
78 }
79
80 executables
81 .into_iter()
82 .filter_map(|(name, paths)| {
83 if paths.len() > 1 {
84 re_log::debug!(name, ?paths, "Found duplicated data-loader in $PATH");
85 }
86
87 paths.into_iter().next()
89 })
90 .collect()
91});
92
93#[inline]
95pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = std::path::PathBuf> {
96 re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
97 EXTERNAL_LOADER_PATHS.iter().cloned()
98}
99
100pub struct ExternalLoader;
112
113impl crate::DataLoader for ExternalLoader {
114 #[inline]
115 fn name(&self) -> String {
116 "rerun.data_loaders.External".into()
117 }
118
119 fn load_from_path(
120 &self,
121 settings: &crate::DataLoaderSettings,
122 filepath: std::path::PathBuf,
123 tx: std::sync::mpsc::Sender<crate::LoadedData>,
124 ) -> Result<(), crate::DataLoaderError> {
125 use std::process::{Command, Stdio};
126
127 re_tracing::profile_function!(filepath.display().to_string());
128
129 let external_loaders = {
130 re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
131 EXTERNAL_LOADER_PATHS.iter()
132 };
133
134 #[derive(PartialEq, Eq)]
135 struct CompatibleLoaderFound;
136 let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();
137
138 let args = settings.to_cli_args();
139 for exe in external_loaders {
140 let args = args.clone();
141 let filepath = filepath.clone();
142 let tx = tx.clone();
143 let tx_feedback = tx_feedback.clone();
144
145 _ = std::thread::Builder::new().name(exe.to_string_lossy().to_string()).spawn(move || {
148 re_tracing::profile_function!(exe.to_string_lossy());
149
150 let child = Command::new(exe)
151 .env_remove("RERUN_APP_ONLY")
154 .arg(filepath.clone())
155 .args(args)
156 .stdout(Stdio::piped())
157 .stderr(Stdio::piped())
158 .spawn();
159
160 let mut child = match child {
161 Ok(child) => child,
162 Err(err) => {
163 re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
164 return;
165 }
166 };
167
168 let Some(stdout) = child.stdout.take() else {
169 let reason = "stdout unreachable";
170 re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
171 return;
172 };
173 let Some(stderr) = child.stderr.take() else {
174 let reason = "stderr unreachable";
175 re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
176 return;
177 };
178
179 re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);
180
181 let is_sending_data = Arc::new(AtomicBool::new(false));
184
185
186 let stdout = std::io::BufReader::new(stdout);
187 match re_log_encoding::decoder::Decoder::new(stdout) {
188 Ok(decoder) => {
189 let filepath = filepath.clone();
190 let tx = tx.clone();
191 if let Err(err) = std::thread::Builder::new()
194 .name(format!("decode_and_stream({filepath:?})"))
195 .spawn({
196 let filepath = filepath.clone();
197 let is_sending_data = Arc::clone(&is_sending_data);
198 move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
199 })
200 {
201 re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
202 return;
203 }
204 }
205 Err(re_log_encoding::decoder::DecodeError::Read(_)) => {
206 }
211 Err(err) => {
212 re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
213 return;
214 }
215 };
216
217 loop {
224 re_tracing::profile_scope!("waiting for compatibility");
225
226 match child.try_wait() {
227 Ok(Some(_)) => break,
228 Ok(None) => {
229 if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
230 re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
233 tx_feedback.send(CompatibleLoaderFound).ok();
234 break; }
236
237 std::thread::yield_now();
239
240 continue;
241 }
242 Err(err) => {
243 re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
244 return;
245 }
246 };
247 }
248
249 let status = match child.wait() {
251 Ok(output) => output,
252 Err(err) => {
253 re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
254 return;
255 }
256 };
257
258 let is_compatible =
260 status.code() != Some(crate::EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE);
261
262 if is_compatible && !status.success() {
263 let mut stderr = std::io::BufReader::new(stderr);
264 let mut reason = String::new();
265 stderr.read_to_string(&mut reason).ok();
266 re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
267 }
268
269 if is_compatible {
270 re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
271 tx_feedback.send(CompatibleLoaderFound).ok();
272 }
273 })?;
274 }
275
276 re_tracing::profile_wait!("compatible_loader");
277
278 drop(tx_feedback);
279
280 let any_compatible_loader = rx_feedback.recv() == Ok(CompatibleLoaderFound);
281 if !any_compatible_loader {
282 return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
285 }
286
287 Ok(())
288 }
289
290 #[inline]
291 fn load_from_file_contents(
292 &self,
293 _settings: &crate::DataLoaderSettings,
294 path: std::path::PathBuf,
295 _contents: std::borrow::Cow<'_, [u8]>,
296 _tx: std::sync::mpsc::Sender<crate::LoadedData>,
297 ) -> Result<(), crate::DataLoaderError> {
298 Err(crate::DataLoaderError::Incompatible(path))
301 }
302}
303
304#[allow(clippy::needless_pass_by_value)]
305fn decode_and_stream<R: std::io::Read>(
306 filepath: &std::path::Path,
307 tx: &std::sync::mpsc::Sender<crate::LoadedData>,
308 is_sending_data: Arc<AtomicBool>,
309 decoder: re_log_encoding::decoder::Decoder<R>,
310) {
311 re_tracing::profile_function!(filepath.display().to_string());
312
313 for msg in decoder {
314 is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);
315
316 let msg = match msg {
317 Ok(msg) => msg,
318 Err(err) => {
319 re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
320 continue;
321 }
322 };
323
324 let data = LoadedData::LogMsg(ExternalLoader::name(&ExternalLoader), msg);
325 if tx.send(data).is_err() {
326 break; }
328 }
329}