re_data_loader/
loader_external.rs

1use std::{
2    io::Read as _,
3    sync::{atomic::AtomicBool, Arc},
4};
5
6use ahash::HashMap;
7use once_cell::sync::Lazy;
8
9use crate::{DataLoader as _, LoadedData};
10
11// ---
12
/// Filename prefix that identifies an executable as an external data loader.
///
/// To register a new external data loader, simply add an executable in your `$PATH` whose name
/// starts with this prefix.
// NOTE: this constant is duplicated in `rerun` to avoid an extra dependency there.
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";
17
/// When an external [`crate::DataLoader`] is asked to load some data that it doesn't know
/// how to load, it should exit with this exit code.
///
/// Any other exit status is interpreted as coming from a compatible loader: a non-successful
/// status other than this one is reported as a loading error rather than as an incompatibility.
// NOTE: Always keep in sync with other languages.
// NOTE: this constant is duplicated in `rerun` to avoid an extra dependency there.
pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66;
23
24/// Keeps track of the paths all external executable [`crate::DataLoader`]s.
25///
26/// Lazy initialized the first time a file is opened by running a full scan of the `$PATH`.
27///
28/// External loaders are _not_ registered on a per-extension basis: we want users to be able to
29/// filter data on a much more fine-grained basis that just file extensions (e.g. checking the file
30/// itself for magic bytes).
31pub static EXTERNAL_LOADER_PATHS: Lazy<Vec<std::path::PathBuf>> = Lazy::new(|| {
32    re_tracing::profile_scope!("initialize-external-loaders");
33
34    let dir_separator = if cfg!(target_os = "windows") {
35        ';'
36    } else {
37        ':'
38    };
39
40    let dirpaths = std::env::var("PATH")
41        .ok()
42        .into_iter()
43        .flat_map(|paths| {
44            paths
45                .split(dir_separator)
46                .map(ToOwned::to_owned)
47                .collect::<Vec<_>>()
48        })
49        .map(std::path::PathBuf::from);
50
51    let mut executables = HashMap::<String, Vec<std::path::PathBuf>>::default();
52    for dirpath in dirpaths {
53        re_tracing::profile_scope!("dir", dirpath.to_string_lossy());
54        let Ok(dir) = std::fs::read_dir(dirpath) else {
55            continue;
56        };
57        let paths = dir.into_iter().filter_map(|entry| {
58            let Ok(entry) = entry else {
59                return None;
60            };
61            let filepath = entry.path();
62            let is_rerun_loader = filepath.file_name().is_some_and(|filename| {
63                filename
64                    .to_string_lossy()
65                    .starts_with(EXTERNAL_DATA_LOADER_PREFIX)
66            });
67            (filepath.is_file() && is_rerun_loader).then_some(filepath)
68        });
69
70        for path in paths {
71            if let Some(filename) = path.file_name() {
72                let exe_paths = executables
73                    .entry(filename.to_string_lossy().to_string())
74                    .or_default();
75                exe_paths.push(path.clone());
76            }
77        }
78    }
79
80    executables
81        .into_iter()
82        .filter_map(|(name, paths)| {
83            if paths.len() > 1 {
84                re_log::debug!(name, ?paths, "Found duplicated data-loader in $PATH");
85            }
86
87            // Only keep the first entry according to PATH order.
88            paths.into_iter().next()
89        })
90        .collect()
91});
92
93/// Iterator over all registered external [`crate::DataLoader`]s.
94#[inline]
95pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = std::path::PathBuf> {
96    re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
97    EXTERNAL_LOADER_PATHS.iter().cloned()
98}
99
100// ---
101
/// A [`crate::DataLoader`] that forwards the path to load to all executables present in
/// the user's `PATH` with a name that starts with [`EXTERNAL_DATA_LOADER_PREFIX`].
///
/// The external loaders are expected to log rrd data to their standard output.
///
/// Only loading from a filesystem path is supported: loading from in-memory file contents
/// always reports this loader as incompatible.
///
/// Refer to our `external_data_loader` example for more information.
///
/// Check out our [guide](https://www.rerun.io/docs/reference/data-loaders/overview) on
/// how to implement external loaders.
pub struct ExternalLoader;
112
113impl crate::DataLoader for ExternalLoader {
114    #[inline]
115    fn name(&self) -> String {
116        "rerun.data_loaders.External".into()
117    }
118
119    fn load_from_path(
120        &self,
121        settings: &crate::DataLoaderSettings,
122        filepath: std::path::PathBuf,
123        tx: std::sync::mpsc::Sender<crate::LoadedData>,
124    ) -> Result<(), crate::DataLoaderError> {
125        use std::process::{Command, Stdio};
126
127        re_tracing::profile_function!(filepath.display().to_string());
128
129        let external_loaders = {
130            re_tracing::profile_wait!("EXTERNAL_LOADER_PATHS");
131            EXTERNAL_LOADER_PATHS.iter()
132        };
133
134        #[derive(PartialEq, Eq)]
135        struct CompatibleLoaderFound;
136        let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();
137
138        let args = settings.to_cli_args();
139        for exe in external_loaders {
140            let args = args.clone();
141            let filepath = filepath.clone();
142            let tx = tx.clone();
143            let tx_feedback = tx_feedback.clone();
144
145            // NOTE: This is completely IO bound (spawning and waiting for child process), it must run on a
146            // dedicated thread, not the shared rayon thread pool.
147            _ = std::thread::Builder::new().name(exe.to_string_lossy().to_string()).spawn(move || {
148                re_tracing::profile_function!(exe.to_string_lossy());
149
150                let child = Command::new(exe)
151                    // Make sure the child dataloader doesn't think it's a Rerun Viewer, otherwise
152                    // it's never gonna be able to log anything.
153                    .env_remove("RERUN_APP_ONLY")
154                    .arg(filepath.clone())
155                    .args(args)
156                    .stdout(Stdio::piped())
157                    .stderr(Stdio::piped())
158                    .spawn();
159
160                let mut child = match child {
161                    Ok(child) => child,
162                    Err(err) => {
163                        re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
164                        return;
165                    }
166                };
167
168                let Some(stdout) = child.stdout.take() else {
169                    let reason = "stdout unreachable";
170                    re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
171                    return;
172                };
173                let Some(stderr) = child.stderr.take() else {
174                    let reason = "stderr unreachable";
175                    re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
176                    return;
177                };
178
179                re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);
180
181                // A single value will be sent on this channel as soon as the child process starts
182                // streaming data to stdout.
183                let is_sending_data = Arc::new(AtomicBool::new(false));
184
185
186                let stdout = std::io::BufReader::new(stdout);
187                match re_log_encoding::decoder::Decoder::new(stdout) {
188                    Ok(decoder) => {
189                        let filepath = filepath.clone();
190                        let tx = tx.clone();
191                        // NOTE: This is completely IO bound, it must run on a dedicated thread, not the shared
192                        // rayon thread pool.
193                        if let Err(err) = std::thread::Builder::new()
194                            .name(format!("decode_and_stream({filepath:?})"))
195                            .spawn({
196                                let filepath = filepath.clone();
197                                let is_sending_data = Arc::clone(&is_sending_data);
198                                move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
199                            })
200                        {
201                            re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
202                            return;
203                        }
204                    }
205                    Err(re_log_encoding::decoder::DecodeError::Read(_)) => {
206                        // The child was not interested in that file and left without logging
207                        // anything.
208                        // That's fine, we just need to make sure to check its exit status further
209                        // down, still.
210                    }
211                    Err(err) => {
212                        re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
213                        return;
214                    }
215                };
216
217                // We have to wait in order to know whether the child process is a compatible loader.
218                //
219                // This can manifest itself in two distinct ways:
220                // 1. If it exits immediately with an INCOMPATIBLE exit code, then we have our
221                //   answer straight away.
222                // - If it starts streaming data, then we immediately assume it's compatible.
223                loop {
224                    re_tracing::profile_scope!("waiting for compatibility");
225
226                    match child.try_wait() {
227                        Ok(Some(_)) => break,
228                        Ok(None) => {
229                            if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
230                                // The child process has started streaming data, it is therefore compatible.
231                                // Let's get out ASAP.
232                                re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
233                                tx_feedback.send(CompatibleLoaderFound).ok();
234                                break; // we still want to check for errors once it finally exits!
235                            }
236
237                            // NOTE: This will busy loop if there's no work available in the native OS thread-pool.
238                            std::thread::yield_now();
239
240                            continue;
241                        }
242                        Err(err) => {
243                            re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
244                            return;
245                        }
246                    };
247                }
248
249                // NOTE: `try_wait` and `wait` are idempotent.
250                let status = match child.wait() {
251                    Ok(output) => output,
252                    Err(err) => {
253                        re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
254                        return;
255                    }
256                };
257
258                // NOTE: We assume that plugins are compatible until proven otherwise.
259                let is_compatible =
260                    status.code() != Some(crate::EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE);
261
262                if is_compatible && !status.success() {
263                    let mut stderr = std::io::BufReader::new(stderr);
264                    let mut reason = String::new();
265                    stderr.read_to_string(&mut reason).ok();
266                    re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
267                }
268
269                if is_compatible {
270                    re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
271                    tx_feedback.send(CompatibleLoaderFound).ok();
272                }
273            })?;
274        }
275
276        re_tracing::profile_wait!("compatible_loader");
277
278        drop(tx_feedback);
279
280        let any_compatible_loader = rx_feedback.recv() == Ok(CompatibleLoaderFound);
281        if !any_compatible_loader {
282            // NOTE: The only way to get here is if all loaders closed then sending end of the
283            // channel without sending anything, i.e. none of them are compatible.
284            return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
285        }
286
287        Ok(())
288    }
289
290    #[inline]
291    fn load_from_file_contents(
292        &self,
293        _settings: &crate::DataLoaderSettings,
294        path: std::path::PathBuf,
295        _contents: std::borrow::Cow<'_, [u8]>,
296        _tx: std::sync::mpsc::Sender<crate::LoadedData>,
297    ) -> Result<(), crate::DataLoaderError> {
298        // TODO(#5324): You could imagine a world where plugins can be streamed rrd data via their
299        // standard input… but today is not world.
300        Err(crate::DataLoaderError::Incompatible(path))
301    }
302}
303
304#[allow(clippy::needless_pass_by_value)]
305fn decode_and_stream<R: std::io::Read>(
306    filepath: &std::path::Path,
307    tx: &std::sync::mpsc::Sender<crate::LoadedData>,
308    is_sending_data: Arc<AtomicBool>,
309    decoder: re_log_encoding::decoder::Decoder<R>,
310) {
311    re_tracing::profile_function!(filepath.display().to_string());
312
313    for msg in decoder {
314        is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);
315
316        let msg = match msg {
317            Ok(msg) => msg,
318            Err(err) => {
319                re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
320                continue;
321            }
322        };
323
324        let data = LoadedData::LogMsg(ExternalLoader::name(&ExternalLoader), msg);
325        if tx.send(data).is_err() {
326            break; // The other end has decided to hang up, not our problem.
327        }
328    }
329}