Skip to main content

re_data_loader/
load_file.rs

1use std::borrow::Cow;
2
3use ahash::{HashMap, HashMapExt as _};
4use re_log_channel::LogSender;
5use re_log_types::{FileSource, LogMsg};
6
7use crate::{DataLoader as _, DataLoaderError, LoadedData, RrdLoader};
8
9// ---
10
11/// Loads the given `path` using all [`crate::DataLoader`]s available.
12///
13/// A single `path` might be handled by more than one loader.
14///
15/// Synchronously checks whether the file exists and can be loaded. Beyond that, all
16/// errors are asynchronous and handled directly by the [`crate::DataLoader`]s themselves
17/// (i.e. they're logged).
18#[cfg(not(target_arch = "wasm32"))]
19pub fn load_from_path(
20    settings: &crate::DataLoaderSettings,
21    file_source: FileSource,
22    path: &std::path::Path,
23    // NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
24    tx: &LogSender,
25) -> Result<(), DataLoaderError> {
26    use re_log_types::ApplicationId;
27
28    re_tracing::profile_function!(path.to_string_lossy());
29
30    if !path.exists() {
31        return Err(std::io::Error::new(
32            std::io::ErrorKind::NotFound,
33            format!("path does not exist: {path:?}"),
34        )
35        .into());
36    }
37
38    re_log::info!("Loading {path:?}…");
39
40    // If no application ID was specified, we derive one from the filename.
41    let application_id = settings.application_id.clone().or_else(|| {
42        path.file_name()
43            .map(|f| f.to_string_lossy().to_string())
44            .map(ApplicationId::from)
45    });
46    let settings = &crate::DataLoaderSettings {
47        // When loading a LeRobot dataset, avoid sending a `SetStoreInfo` message since the LeRobot loader handles this automatically.
48        force_store_info: !crate::lerobot::is_lerobot_dataset(path),
49        application_id,
50        ..settings.clone()
51    };
52
53    let rx = load(settings, path, None)?;
54
55    send(settings.clone(), file_source, rx, tx);
56
57    Ok(())
58}
59
60/// Loads the given `contents` using all [`crate::DataLoader`]s available.
61///
62/// A single file might be handled by more than one loader.
63///
64/// Synchronously checks that the file can be loaded. Beyond that, all errors are asynchronous
65/// and handled directly by the [`crate::DataLoader`]s themselves (i.e. they're logged).
66///
67/// `path` is only used for informational purposes, no data is ever read from the filesystem.
68pub fn load_from_file_contents(
69    settings: &crate::DataLoaderSettings,
70    file_source: FileSource,
71    filepath: &std::path::Path,
72    contents: std::borrow::Cow<'_, [u8]>,
73    // NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
74    tx: &LogSender,
75) -> Result<(), DataLoaderError> {
76    re_tracing::profile_function!(filepath.to_string_lossy());
77
78    re_log::info!("Loading {filepath:?}…");
79
80    let data = load(settings, filepath, Some(contents))?;
81
82    send(settings.clone(), file_source, data, tx);
83
84    Ok(())
85}
86
87// ---
88
89/// Prepares an adequate [`re_log_types::StoreInfo`] [`LogMsg`] given the input.
90pub(crate) fn prepare_store_info(
91    store_id: &re_log_types::StoreId,
92    file_source: FileSource,
93) -> LogMsg {
94    re_tracing::profile_function!();
95
96    use re_log_types::SetStoreInfo;
97
98    let store_source = re_log_types::StoreSource::File { file_source };
99
100    LogMsg::SetStoreInfo(SetStoreInfo {
101        row_id: *re_chunk::RowId::new(),
102        info: re_log_types::StoreInfo::new(store_id.clone(), store_source),
103    })
104}
105
/// Loads the data at `path` using all available [`crate::DataLoader`]s.
///
/// Every candidate loader is spawned on the `rayon` thread pool and tries to load the data,
/// either from the in-memory `contents` (when provided) or from `path` itself. External loaders
/// are skipped whenever the file extension is natively supported.
///
/// This call blocks until the first loader reports itself compatible, or until every loader
/// has bailed out as incompatible. Remaining loaders keep running in the background and keep
/// feeding the returned channel.
///
/// On success, returns a channel with all the [`LoadedData`]:
/// - On native, this is filled asynchronously from other threads.
/// - On wasm, this is pre-filled synchronously.
///
/// There is only one way this function can return an error: not a single [`crate::DataLoader`]
/// (whether it is builtin, custom or external) was capable of loading the data, in which case
/// [`DataLoaderError::Incompatible`] will be returned. Any other loader error is only logged,
/// and that loader still counts as compatible.
///
/// NOTE(review): the data channel is bounded (1024) and its receiver is only handed back to the
/// caller once a compatible loader has *returned* from `load_from_file_contents` /
/// `load_from_path`. A loader that synchronously sends more than 1024 items before returning
/// would block until another compatible loader reports in (or forever, if it is the only one).
/// Presumably loaders stream large outputs from their own threads — confirm.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn load(
    settings: &crate::DataLoaderSettings,
    path: &std::path::Path,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<crossbeam::channel::Receiver<LoadedData>, DataLoaderError> {
    re_tracing::profile_function!(path.display().to_string());

    // On native we run loaders in parallel so this needs to become static.
    let contents: Option<std::sync::Arc<std::borrow::Cow<'static, [u8]>>> =
        contents.map(|contents| std::sync::Arc::new(Cow::Owned(contents.into_owned())));

    let rx_loader = {
        let (tx_loader, rx_loader) = crossbeam::channel::bounded(1024);

        let any_compatible_loader = {
            // Marker sent by a loader task once it has accepted the data.
            #[derive(PartialEq, Eq)]
            struct CompatibleLoaderFound;
            let (tx_feedback, rx_feedback) =
                crossbeam::channel::bounded::<CompatibleLoaderFound>(128);

            // When loading a file type with native support (.rrd, .mcap, .png, …)
            // then we don't need the overhead and noise of external data loaders:
            // See <https://github.com/rerun-io/rerun/issues/6530>.
            let loaders = {
                use rayon::iter::Either;

                use crate::DataLoader as _;

                let extension = crate::extension(path);
                if crate::is_supported_file_extension(&extension) {
                    Either::Left(
                        crate::iter_loaders()
                            .filter(|loader| loader.name() != crate::ExternalLoader.name()),
                    )
                } else {
                    // We need to use an external dataloader
                    Either::Right(crate::iter_loaders())
                }
            };

            for loader in loaders {
                let loader = std::sync::Arc::clone(&loader);

                // Everything moved into the task must be owned (`rayon::spawn` needs `'static`).
                let settings = settings.clone();
                let path = path.to_owned();
                let contents = contents.clone(); // arc

                let tx_loader = tx_loader.clone();
                let tx_feedback = tx_feedback.clone();

                rayon::spawn(move || {
                    re_tracing::profile_scope!("inner", loader.name());

                    if let Some(contents) = contents.as_deref() {
                        let contents = Cow::Borrowed(contents.as_ref());

                        if let Err(err) = loader.load_from_file_contents(
                            &settings,
                            path.clone(),
                            contents,
                            tx_loader,
                        ) {
                            // Incompatible: bail out without reporting, so this loader doesn't count.
                            if err.is_incompatible() {
                                return;
                            }
                            // Any other error is logged, but the loader still counts as compatible.
                            re_log::error!(?path, loader = loader.name(), %err, "Failed to load data");
                        }
                    } else if let Err(err) =
                        loader.load_from_path(&settings, path.clone(), tx_loader)
                    {
                        if err.is_incompatible() {
                            return;
                        }
                        re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
                    }

                    re_log::debug!(loader = loader.name(), ?path, "compatible loader found");
                    // Ignored on failure: once `load` has returned, `rx_feedback` is gone.
                    tx_feedback.send(CompatibleLoaderFound).ok();
                });
            }

            re_tracing::profile_wait!("compatible_loader");

            // Drop our own sender so that `recv` returns `Err` once every spawned task has
            // finished without reporting compatibility.
            drop(tx_feedback);

            // Only the first report matters: we don't wait for the remaining loaders.
            rx_feedback.recv() == Ok(CompatibleLoaderFound)
        };

        // Implicitly closing `tx_loader`!

        any_compatible_loader.then_some(rx_loader)
    };

    if let Some(rx_loader) = rx_loader {
        Ok(rx_loader)
    } else {
        Err(DataLoaderError::Incompatible(path.to_owned()))
    }
}
215
/// Loads the data at `path` by handing `contents` to every available [`crate::DataLoader`],
/// one after the other.
///
/// On success, returns a channel that has already been filled synchronously with all the
/// [`LoadedData`].
///
/// When `contents` is `None`, no loader is invoked at all and the data is reported as
/// incompatible.
///
/// There is only one way this function can return an error: not a single [`crate::DataLoader`]
/// (whether it is builtin, custom or external) was capable of loading the data, in which case
/// [`DataLoaderError::Incompatible`] will be returned. Any other loader failure is logged and
/// that loader still counts as compatible.
#[cfg(target_arch = "wasm32")]
#[expect(clippy::needless_pass_by_value)]
pub(crate) fn load(
    settings: &crate::DataLoaderSettings,
    path: &std::path::Path,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<crossbeam::channel::Receiver<LoadedData>, DataLoaderError> {
    re_tracing::profile_function!(path.display().to_string());

    let (tx_loader, rx_loader) = crossbeam::channel::unbounded();

    // Eager `fold` rather than a short-circuiting `any`: every loader must get its turn, even
    // after another one has already accepted the data.
    let any_compatible_loader = crate::iter_loaders().fold(false, |found_one, loader| {
        let Some(bytes) = contents.as_deref() else {
            return found_one;
        };

        let accepted = match loader.load_from_file_contents(
            settings,
            path.to_owned(),
            Cow::Borrowed(bytes),
            tx_loader.clone(),
        ) {
            Ok(()) => true,
            Err(err) if err.is_incompatible() => false,
            Err(err) => {
                re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
                true
            }
        };

        found_one || accepted
    });

    // Release our own sender so the channel closes once the loaders are done with theirs.
    drop(tx_loader);

    if any_compatible_loader {
        Ok(rx_loader)
    } else {
        Err(DataLoaderError::Incompatible(path.to_owned()))
    }
}
268
269/// Forwards the data in `rx_loader` to `tx`, taking care of necessary conversions, if any.
270///
271/// Runs asynchronously from another thread on native, synchronously on wasm.
272pub(crate) fn send(
273    settings: crate::DataLoaderSettings,
274    file_source: FileSource,
275    rx_loader: crossbeam::channel::Receiver<LoadedData>,
276    tx: &LogSender,
277) {
278    spawn({
279        re_tracing::profile_function!();
280
281        #[derive(Default, Debug)]
282        struct Tracked {
283            is_rrd_or_rbl: bool,
284            already_has_store_info: bool,
285        }
286
287        let mut store_info_tracker: HashMap<re_log_types::StoreId, Tracked> = HashMap::new();
288
289        let tx = tx.clone();
290        move || {
291            // ## Ignoring channel errors
292            //
293            // Not our problem whether or not the other end has hung up, but we still want to
294            // poll the channel in any case so as to make sure that the data producer
295            // doesn't get stuck.
296            for data in rx_loader {
297                let data_loader_name = data.data_loader_name().clone();
298                let msg = match data.into_log_msg() {
299                    Ok(msg) => {
300                        let store_info = match &msg {
301                            LogMsg::SetStoreInfo(set_store_info) => {
302                                Some((set_store_info.info.store_id.clone(), true))
303                            }
304                            LogMsg::ArrowMsg(store_id, _arrow_msg) => {
305                                Some((store_id.clone(), false))
306                            }
307                            LogMsg::BlueprintActivationCommand(_) => None,
308                        };
309
310                        if let Some((store_id, store_info_created)) = store_info {
311                            let tracked = store_info_tracker.entry(store_id).or_default();
312                            tracked.is_rrd_or_rbl =
313                                *data_loader_name == RrdLoader::name(&RrdLoader);
314                            tracked.already_has_store_info |= store_info_created;
315                        }
316
317                        msg
318                    }
319                    Err(err) => {
320                        re_log::error!(%err, "Couldn't serialize component data");
321                        continue;
322                    }
323                };
324                tx.send(msg.into()).ok();
325            }
326
327            for (store_id, tracked) in store_info_tracker {
328                let is_a_preexisting_recording =
329                    Some(&store_id) == settings.opened_store_id.as_ref();
330
331                // Never try to send custom store info for RRDs and RBLs, they always have their own, and
332                // it's always right.
333                let should_force_store_info = settings.force_store_info && !tracked.is_rrd_or_rbl;
334
335                let should_send_new_store_info = should_force_store_info
336                    || (!tracked.already_has_store_info && !is_a_preexisting_recording);
337
338                if should_send_new_store_info {
339                    let store_info = prepare_store_info(&store_id, file_source.clone());
340                    tx.send(store_info.into()).ok();
341                }
342            }
343
344            tx.quit(None).ok();
345        }
346    });
347}
348
349// NOTE:
350// - On native, we parallelize using `rayon`.
351// - On wasm, we serialize everything, which works because the data-loading channels are unbounded.
352
353#[cfg(not(target_arch = "wasm32"))]
354fn spawn<F>(f: F)
355where
356    F: FnOnce() + Send + 'static,
357{
358    rayon::spawn(f);
359}
360
/// Everything is serialized on wasm: runs `f` to completion on the calling thread.
#[cfg(target_arch = "wasm32")]
fn spawn<F: FnOnce()>(f: F) {
    f()
}