1use std::borrow::Cow;
2
3use ahash::{HashMap, HashMapExt as _};
4use re_log_channel::LogSender;
5use re_log_types::{FileSource, LogMsg};
6
7use crate::{DataLoader as _, DataLoaderError, LoadedData, RrdLoader};
8
9#[cfg(not(target_arch = "wasm32"))]
19pub fn load_from_path(
20 settings: &crate::DataLoaderSettings,
21 file_source: FileSource,
22 path: &std::path::Path,
23 tx: &LogSender,
25) -> Result<(), DataLoaderError> {
26 use re_log_types::ApplicationId;
27
28 re_tracing::profile_function!(path.to_string_lossy());
29
30 if !path.exists() {
31 return Err(std::io::Error::new(
32 std::io::ErrorKind::NotFound,
33 format!("path does not exist: {path:?}"),
34 )
35 .into());
36 }
37
38 re_log::info!("Loading {path:?}…");
39
40 let application_id = settings.application_id.clone().or_else(|| {
42 path.file_name()
43 .map(|f| f.to_string_lossy().to_string())
44 .map(ApplicationId::from)
45 });
46 let settings = &crate::DataLoaderSettings {
47 force_store_info: !crate::lerobot::is_lerobot_dataset(path),
49 application_id,
50 ..settings.clone()
51 };
52
53 let rx = load(settings, path, None)?;
54
55 send(settings.clone(), file_source, rx, tx);
56
57 Ok(())
58}
59
60pub fn load_from_file_contents(
69 settings: &crate::DataLoaderSettings,
70 file_source: FileSource,
71 filepath: &std::path::Path,
72 contents: std::borrow::Cow<'_, [u8]>,
73 tx: &LogSender,
75) -> Result<(), DataLoaderError> {
76 re_tracing::profile_function!(filepath.to_string_lossy());
77
78 re_log::info!("Loading {filepath:?}…");
79
80 let data = load(settings, filepath, Some(contents))?;
81
82 send(settings.clone(), file_source, data, tx);
83
84 Ok(())
85}
86
87pub(crate) fn prepare_store_info(
91 store_id: &re_log_types::StoreId,
92 file_source: FileSource,
93) -> LogMsg {
94 re_tracing::profile_function!();
95
96 use re_log_types::SetStoreInfo;
97
98 let store_source = re_log_types::StoreSource::File { file_source };
99
100 LogMsg::SetStoreInfo(SetStoreInfo {
101 row_id: *re_chunk::RowId::new(),
102 info: re_log_types::StoreInfo::new(store_id.clone(), store_source),
103 })
104}
105
106#[cfg(not(target_arch = "wasm32"))]
116pub(crate) fn load(
117 settings: &crate::DataLoaderSettings,
118 path: &std::path::Path,
119 contents: Option<std::borrow::Cow<'_, [u8]>>,
120) -> Result<crossbeam::channel::Receiver<LoadedData>, DataLoaderError> {
121 re_tracing::profile_function!(path.display().to_string());
122
123 let contents: Option<std::sync::Arc<std::borrow::Cow<'static, [u8]>>> =
125 contents.map(|contents| std::sync::Arc::new(Cow::Owned(contents.into_owned())));
126
127 let rx_loader = {
128 let (tx_loader, rx_loader) = crossbeam::channel::bounded(1024);
129
130 let any_compatible_loader = {
131 #[derive(PartialEq, Eq)]
132 struct CompatibleLoaderFound;
133 let (tx_feedback, rx_feedback) =
134 crossbeam::channel::bounded::<CompatibleLoaderFound>(128);
135
136 let loaders = {
140 use rayon::iter::Either;
141
142 use crate::DataLoader as _;
143
144 let extension = crate::extension(path);
145 if crate::is_supported_file_extension(&extension) {
146 Either::Left(
147 crate::iter_loaders()
148 .filter(|loader| loader.name() != crate::ExternalLoader.name()),
149 )
150 } else {
151 Either::Right(crate::iter_loaders())
153 }
154 };
155
156 for loader in loaders {
157 let loader = std::sync::Arc::clone(&loader);
158
159 let settings = settings.clone();
160 let path = path.to_owned();
161 let contents = contents.clone(); let tx_loader = tx_loader.clone();
164 let tx_feedback = tx_feedback.clone();
165
166 rayon::spawn(move || {
167 re_tracing::profile_scope!("inner", loader.name());
168
169 if let Some(contents) = contents.as_deref() {
170 let contents = Cow::Borrowed(contents.as_ref());
171
172 if let Err(err) = loader.load_from_file_contents(
173 &settings,
174 path.clone(),
175 contents,
176 tx_loader,
177 ) {
178 if err.is_incompatible() {
179 return;
180 }
181 re_log::error!(?path, loader = loader.name(), %err, "Failed to load data");
182 }
183 } else if let Err(err) =
184 loader.load_from_path(&settings, path.clone(), tx_loader)
185 {
186 if err.is_incompatible() {
187 return;
188 }
189 re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
190 }
191
192 re_log::debug!(loader = loader.name(), ?path, "compatible loader found");
193 tx_feedback.send(CompatibleLoaderFound).ok();
194 });
195 }
196
197 re_tracing::profile_wait!("compatible_loader");
198
199 drop(tx_feedback);
200
201 rx_feedback.recv() == Ok(CompatibleLoaderFound)
202 };
203
204 any_compatible_loader.then_some(rx_loader)
207 };
208
209 if let Some(rx_loader) = rx_loader {
210 Ok(rx_loader)
211 } else {
212 Err(DataLoaderError::Incompatible(path.to_owned()))
213 }
214}
215
216#[cfg(target_arch = "wasm32")]
224#[expect(clippy::needless_pass_by_value)]
225pub(crate) fn load(
226 settings: &crate::DataLoaderSettings,
227 path: &std::path::Path,
228 contents: Option<std::borrow::Cow<'_, [u8]>>,
229) -> Result<crossbeam::channel::Receiver<LoadedData>, DataLoaderError> {
230 re_tracing::profile_function!(path.display().to_string());
231
232 let rx_loader = {
233 let (tx_loader, rx_loader) = crossbeam::channel::unbounded();
234
235 let any_compatible_loader = crate::iter_loaders().map(|loader| {
236 if let Some(contents) = contents.as_deref() {
237 let settings = settings.clone();
238 let tx_loader = tx_loader.clone();
239 let path = path.to_owned();
240 let contents = Cow::Borrowed(contents);
241
242 if let Err(err) = loader.load_from_file_contents(&settings, path.clone(), contents, tx_loader) {
243 if err.is_incompatible() {
244 return false;
245 }
246 re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
247 }
248
249 true
250 } else {
251 false
252 }
253 })
254 .reduce(|any_compatible, is_compatible| any_compatible || is_compatible)
255 .unwrap_or(false);
256
257 any_compatible_loader.then_some(rx_loader)
260 };
261
262 if let Some(rx_loader) = rx_loader {
263 Ok(rx_loader)
264 } else {
265 Err(DataLoaderError::Incompatible(path.to_owned()))
266 }
267}
268
269pub(crate) fn send(
273 settings: crate::DataLoaderSettings,
274 file_source: FileSource,
275 rx_loader: crossbeam::channel::Receiver<LoadedData>,
276 tx: &LogSender,
277) {
278 spawn({
279 re_tracing::profile_function!();
280
281 #[derive(Default, Debug)]
282 struct Tracked {
283 is_rrd_or_rbl: bool,
284 already_has_store_info: bool,
285 }
286
287 let mut store_info_tracker: HashMap<re_log_types::StoreId, Tracked> = HashMap::new();
288
289 let tx = tx.clone();
290 move || {
291 for data in rx_loader {
297 let data_loader_name = data.data_loader_name().clone();
298 let msg = match data.into_log_msg() {
299 Ok(msg) => {
300 let store_info = match &msg {
301 LogMsg::SetStoreInfo(set_store_info) => {
302 Some((set_store_info.info.store_id.clone(), true))
303 }
304 LogMsg::ArrowMsg(store_id, _arrow_msg) => {
305 Some((store_id.clone(), false))
306 }
307 LogMsg::BlueprintActivationCommand(_) => None,
308 };
309
310 if let Some((store_id, store_info_created)) = store_info {
311 let tracked = store_info_tracker.entry(store_id).or_default();
312 tracked.is_rrd_or_rbl =
313 *data_loader_name == RrdLoader::name(&RrdLoader);
314 tracked.already_has_store_info |= store_info_created;
315 }
316
317 msg
318 }
319 Err(err) => {
320 re_log::error!(%err, "Couldn't serialize component data");
321 continue;
322 }
323 };
324 tx.send(msg.into()).ok();
325 }
326
327 for (store_id, tracked) in store_info_tracker {
328 let is_a_preexisting_recording =
329 Some(&store_id) == settings.opened_store_id.as_ref();
330
331 let should_force_store_info = settings.force_store_info && !tracked.is_rrd_or_rbl;
334
335 let should_send_new_store_info = should_force_store_info
336 || (!tracked.already_has_store_info && !is_a_preexisting_recording);
337
338 if should_send_new_store_info {
339 let store_info = prepare_store_info(&store_id, file_source.clone());
340 tx.send(store_info.into()).ok();
341 }
342 }
343
344 tx.quit(None).ok();
345 }
346 });
347}
348
349#[cfg(not(target_arch = "wasm32"))]
354fn spawn<F>(f: F)
355where
356 F: FnOnce() + Send + 'static,
357{
358 rayon::spawn(f);
359}
360
361#[cfg(target_arch = "wasm32")]
362fn spawn<F>(f: F)
363where
364 F: FnOnce(),
365{
366 f();
367}