re_data_loader/
loader_external.rs

1use std::{
2    io::Read as _,
3    path::PathBuf,
4    sync::{Arc, LazyLock, atomic::AtomicBool},
5};
6
7use ahash::HashMap;
8use indexmap::IndexSet;
9
10use crate::{DataLoader as _, LoadedData};
11
12// ---
13
/// File-name prefix that marks an executable as an external data loader.
///
/// Any executable found in a `$PATH` directory whose name begins with this string is
/// picked up and registered automatically.
// NOTE: this constant is duplicated in `rerun` to avoid an extra dependency there.
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";

/// Exit code an external [`crate::DataLoader`] must return when it is handed data it cannot
/// load, so that Rerun can tell "not for me" apart from a genuine failure.
// NOTE: Always keep in sync with other languages.
// NOTE: this constant is duplicated in `rerun` to avoid an extra dependency there.
pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66;
24
25/// Keeps track of the paths all external executable [`crate::DataLoader`]s.
26///
27/// Lazy initialized the first time a file is opened by running a full scan of the `$PATH`.
28///
29/// External loaders are _not_ registered on a per-extension basis: we want users to be able to
30/// filter data on a much more fine-grained basis that just file extensions (e.g. checking the file
31/// itself for magic bytes).
32pub static EXTERNAL_LOADER_PATHS: LazyLock<Vec<PathBuf>> = LazyLock::new(|| {
33    re_tracing::profile_scope!("initialize-external-loaders");
34
35    let dir_separator = if cfg!(target_os = "windows") {
36        ';'
37    } else {
38        ':'
39    };
40
41    let dirpaths = std::env::var("PATH")
42        .ok()
43        .into_iter()
44        .flat_map(|paths| {
45            paths
46                .split(dir_separator)
47                .map(ToOwned::to_owned)
48                .collect::<Vec<_>>()
49        })
50        .map(PathBuf::from);
51
52    // For each file name, collect the paths to all executables that match that name.
53    let mut executables = HashMap::<String, IndexSet<PathBuf>>::default();
54    for dirpath in dirpaths {
55        re_tracing::profile_scope!("dir", dirpath.to_string_lossy());
56        let Ok(dir) = std::fs::read_dir(dirpath) else {
57            continue;
58        };
59        let paths = dir.into_iter().filter_map(|entry| {
60            let Ok(entry) = entry else {
61                return None;
62            };
63            let filepath = entry.path();
64            let is_rerun_loader = filepath.file_name().is_some_and(|filename| {
65                filename
66                    .to_string_lossy()
67                    .starts_with(EXTERNAL_DATA_LOADER_PREFIX)
68            });
69            (is_executable(&filepath) && is_rerun_loader).then_some(filepath)
70        });
71
72        for path in paths {
73            if let Some(filename) = path.file_name() {
74                let exe_paths = executables
75                    .entry(filename.to_string_lossy().to_string())
76                    .or_default();
77                exe_paths.insert(path.clone());
78            }
79        }
80    }
81
82    executables
83        .into_iter()
84        .filter_map(|(name, paths)| {
85            if paths.len() > 1 {
86                re_log::debug!(name, ?paths, "Found duplicated data-loader in $PATH");
87            }
88
89            // Only keep the first entry according to PATH order.
90            paths.into_iter().next()
91        })
92        .collect()
93});
94
95/// Iterator over all registered external [`crate::DataLoader`]s.
96#[inline]
97pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = PathBuf> {
98    re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
99    EXTERNAL_LOADER_PATHS.iter().cloned()
100}
101
102// ---
103
/// A [`crate::DataLoader`] that forwards the path to load to all executables present in
/// the user's `PATH` with a name that starts with [`EXTERNAL_DATA_LOADER_PREFIX`].
///
/// The external loaders are expected to log rrd data to their standard output. A loader that
/// does not know how to handle the given file should exit with
/// [`EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE`].
///
/// Refer to our `external_data_loader` example for more information.
///
/// Check out our [guide](https://www.rerun.io/docs/reference/data-loaders/overview) on
/// how to implement external loaders.
pub struct ExternalLoader;
114
115impl crate::DataLoader for ExternalLoader {
116    #[inline]
117    fn name(&self) -> String {
118        "rerun.data_loaders.External".into()
119    }
120
121    fn load_from_path(
122        &self,
123        settings: &crate::DataLoaderSettings,
124        filepath: PathBuf,
125        tx: std::sync::mpsc::Sender<crate::LoadedData>,
126    ) -> Result<(), crate::DataLoaderError> {
127        use std::process::{Command, Stdio};
128
129        re_tracing::profile_function!(filepath.display().to_string());
130
131        let external_loaders = {
132            re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
133            EXTERNAL_LOADER_PATHS.iter()
134        };
135
136        #[derive(PartialEq, Eq)]
137        struct CompatibleLoaderFound;
138        let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();
139
140        let args = settings.to_cli_args();
141        for exe in external_loaders {
142            let args = args.clone();
143            let filepath = filepath.clone();
144            let tx = tx.clone();
145            let tx_feedback = tx_feedback.clone();
146
147            // NOTE: This is completely IO bound (spawning and waiting for child process), it must run on a
148            // dedicated thread, not the shared rayon thread pool.
149            _ = std::thread::Builder::new().name(exe.to_string_lossy().to_string()).spawn(move || {
150                re_tracing::profile_function!(exe.to_string_lossy());
151
152                let child = Command::new(exe)
153                    // Make sure the child dataloader doesn't think it's a Rerun Viewer, otherwise
154                    // it's never gonna be able to log anything.
155                    .env_remove("RERUN_APP_ONLY")
156                    .arg(filepath.clone())
157                    .args(args)
158                    .stdout(Stdio::piped())
159                    .stderr(Stdio::piped())
160                    .spawn();
161
162                let mut child = match child {
163                    Ok(child) => child,
164                    Err(err) => {
165                        re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
166                        return;
167                    }
168                };
169
170                let Some(stdout) = child.stdout.take() else {
171                    let reason = "stdout unreachable";
172                    re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
173                    return;
174                };
175                let Some(stderr) = child.stderr.take() else {
176                    let reason = "stderr unreachable";
177                    re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
178                    return;
179                };
180
181                re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);
182
183                // A single value will be sent on this channel as soon as the child process starts
184                // streaming data to stdout.
185                let is_sending_data = Arc::new(AtomicBool::new(false));
186
187                let stdout = std::io::BufReader::new(stdout);
188                match re_log_encoding::Decoder::decode_eager(stdout) {
189                    Ok(decoder) => {
190                        let filepath = filepath.clone();
191                        let tx = tx.clone();
192                        // NOTE: This is completely IO bound, it must run on a dedicated thread, not the shared
193                        // rayon thread pool.
194                        if let Err(err) = std::thread::Builder::new()
195                            .name(format!("decode_and_stream({filepath:?})"))
196                            .spawn({
197                                let filepath = filepath.clone();
198                                let is_sending_data = Arc::clone(&is_sending_data);
199                                move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
200                            })
201                        {
202                            re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
203                            return;
204                        }
205                    }
206                    Err(re_log_encoding::DecodeError::Read(_)) => {
207                        // The child was not interested in that file and left without logging
208                        // anything.
209                        // That's fine, we just need to make sure to check its exit status further
210                        // down, still.
211                    }
212                    Err(err) => {
213                        re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
214                        return;
215                    }
216                }
217
218                // We have to wait in order to know whether the child process is a compatible loader.
219                //
220                // This can manifest itself in two distinct ways:
221                // 1. If it exits immediately with an INCOMPATIBLE exit code, then we have our
222                //   answer straight away.
223                // - If it starts streaming data, then we immediately assume it's compatible.
224                loop {
225                    re_tracing::profile_scope!("waiting for compatibility");
226
227                    match child.try_wait() {
228                        Ok(Some(_)) => break,
229                        Ok(None) => {
230                            if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
231                                // The child process has started streaming data, it is therefore compatible.
232                                // Let's get out ASAP.
233                                re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
234                                tx_feedback.send(CompatibleLoaderFound).ok();
235                                break; // we still want to check for errors once it finally exits!
236                            }
237
238                            // NOTE: This will busy loop if there's no work available in the native OS thread-pool.
239                            std::thread::yield_now();
240                        }
241                        Err(err) => {
242                            re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
243                            return;
244                        }
245                    }
246                }
247
248                // NOTE: `try_wait` and `wait` are idempotent.
249                let status = match child.wait() {
250                    Ok(output) => output,
251                    Err(err) => {
252                        re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
253                        return;
254                    }
255                };
256
257                // NOTE: We assume that plugins are compatible until proven otherwise.
258                let is_compatible =
259                    status.code() != Some(crate::EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE);
260
261                if is_compatible {
262                    let mut stderr = std::io::BufReader::new(stderr);
263                    let mut stderr_str = String::new();
264                    stderr.read_to_string(&mut stderr_str).ok();
265
266                    if status.success() {
267                        re_log::debug!(loader = ?exe, ?filepath, "Compatible external loader found");
268
269                        // Include any log output of the external loader in the console, because maybe it has useful information:
270                        let stderr_indented = stderr_str.lines().map(|line| format!("  {line}")).collect::<Vec<_>>().join("\n");
271                        re_log::debug!("Dataloader stderr:\n{stderr_indented}");
272
273                        tx_feedback.send(CompatibleLoaderFound).ok();
274                    } else {
275                        re_log::error!(?filepath, loader = ?exe, %stderr_str, "Failed to execute external loader");
276                    }
277                }
278            })?;
279        }
280
281        re_tracing::profile_wait!("compatible_loader");
282
283        drop(tx_feedback);
284
285        let any_compatible_loader = rx_feedback.recv() == Ok(CompatibleLoaderFound);
286        if !any_compatible_loader {
287            // NOTE: The only way to get here is if all loaders closed then sending end of the
288            // channel without sending anything, i.e. none of them are compatible.
289            return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
290        }
291
292        Ok(())
293    }
294
295    #[inline]
296    fn load_from_file_contents(
297        &self,
298        _settings: &crate::DataLoaderSettings,
299        path: PathBuf,
300        _contents: std::borrow::Cow<'_, [u8]>,
301        _tx: std::sync::mpsc::Sender<crate::LoadedData>,
302    ) -> Result<(), crate::DataLoaderError> {
303        // TODO(#5324): You could imagine a world where plugins can be streamed rrd data via their
304        // standard input… but today is not world.
305        Err(crate::DataLoaderError::Incompatible(path))
306    }
307}
308
309#[expect(clippy::needless_pass_by_value)]
310fn decode_and_stream(
311    filepath: &std::path::Path,
312    tx: &std::sync::mpsc::Sender<crate::LoadedData>,
313    is_sending_data: Arc<AtomicBool>,
314    msgs: impl Iterator<Item = Result<re_log_types::LogMsg, re_log_encoding::DecodeError>>,
315) {
316    re_tracing::profile_function!(filepath.display().to_string());
317
318    for msg in msgs {
319        is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);
320
321        let msg = match msg {
322            Ok(msg) => msg,
323            Err(err) => {
324                re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
325                continue;
326            }
327        };
328
329        let data = LoadedData::LogMsg(ExternalLoader::name(&ExternalLoader), msg);
330        if tx.send(data).is_err() {
331            break; // The other end has decided to hang up, not our problem.
332        }
333    }
334}
335
/// Returns `true` if `path` is a regular file (symlinks are followed) that looks executable.
///
/// This helpfully ignores files with the correct prefix that are not executable,
/// e.g. debug symbols for compiled binaries.
///
/// - On Unix, at least one of the owner/group/other execute bits must be set.
/// - On Windows, the extension must be `exe`, `bat` or `cmd` (case-insensitive).
/// - On any other platform there is no permission model to consult, so every regular file
///   is accepted.
fn is_executable(path: &std::path::Path) -> bool {
    // A single `metadata` call (which follows symlinks, just like `Path::is_file`) answers both
    // "is it a regular file?" and "what are its permissions?".
    let Ok(metadata) = std::fs::metadata(path) else {
        return false;
    };
    if !metadata.is_file() {
        return false;
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt as _;
        metadata.permissions().mode() & 0o111 != 0
    }

    #[cfg(windows)]
    {
        path.extension()
            .and_then(|ext| ext.to_str())
            .is_some_and(|ext| matches!(ext.to_lowercase().as_str(), "exe" | "bat" | "cmd"))
    }

    // Without this branch the function has no tail expression (and fails to compile) on
    // targets that are neither Unix nor Windows.
    #[cfg(not(any(unix, windows)))]
    {
        true
    }
}