use std::borrow::Cow;
use re_log_types::{FileSource, LogMsg};
use re_smart_channel::Sender;
use crate::{DataLoaderError, LoadedData};
#[cfg(not(target_arch = "wasm32"))]
pub fn load_from_path(
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
tx: &Sender<LogMsg>,
) -> Result<(), DataLoaderError> {
re_tracing::profile_function!(path.to_string_lossy());
if !path.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"path does not exist: {path:?}",
)
.into());
}
re_log::info!("Loading {path:?}…");
let data = load(store_id, path, None)?;
let store_info = prepare_store_info(store_id, file_source, path);
if let Some(store_info) = store_info {
if tx.send(store_info).is_err() {
return Ok(()); }
}
send(store_id, data, tx);
Ok(())
}
pub fn load_from_file_contents(
store_id: &re_log_types::StoreId,
file_source: FileSource,
filepath: &std::path::Path,
contents: std::borrow::Cow<'_, [u8]>,
tx: &Sender<LogMsg>,
) -> Result<(), DataLoaderError> {
re_tracing::profile_function!(filepath.to_string_lossy());
re_log::info!("Loading {filepath:?}…");
let data = load(store_id, filepath, Some(contents))?;
let store_info = prepare_store_info(store_id, file_source, filepath);
if let Some(store_info) = store_info {
if tx.send(store_info).is_err() {
return Ok(()); }
}
send(store_id, data, tx);
Ok(())
}
#[inline]
pub fn extension(path: &std::path::Path) -> String {
path.extension()
.unwrap_or_default()
.to_ascii_lowercase()
.to_string_lossy()
.to_string()
}
pub(crate) fn prepare_store_info(
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
) -> Option<LogMsg> {
re_tracing::profile_function!(path.display().to_string());
use re_log_types::SetStoreInfo;
let app_id = re_log_types::ApplicationId(path.display().to_string());
let store_source = re_log_types::StoreSource::File { file_source };
let is_rrd = crate::SUPPORTED_RERUN_EXTENSIONS.contains(&extension(path).as_str());
(!is_rrd).then(|| {
LogMsg::SetStoreInfo(SetStoreInfo {
row_id: re_log_types::RowId::new(),
info: re_log_types::StoreInfo {
application_id: app_id.clone(),
store_id: store_id.clone(),
is_official_example: false,
started: re_log_types::Time::now(),
store_source,
store_kind: re_log_types::StoreKind::Recording,
},
})
})
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn load(
store_id: &re_log_types::StoreId,
path: &std::path::Path,
contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
re_tracing::profile_function!(path.display().to_string());
let contents: Option<std::sync::Arc<std::borrow::Cow<'static, [u8]>>> =
contents.map(|contents| std::sync::Arc::new(Cow::Owned(contents.into_owned())));
let rx_loader = {
let (tx_loader, rx_loader) = std::sync::mpsc::channel();
let any_compatible_loader = {
#[derive(PartialEq, Eq)]
struct CompatibleLoaderFound;
let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();
for loader in crate::iter_loaders() {
let loader = std::sync::Arc::clone(&loader);
let store_id = store_id.clone();
let path = path.to_owned();
let contents = contents.clone();
let tx_loader = tx_loader.clone();
let tx_feedback = tx_feedback.clone();
rayon::spawn(move || {
re_tracing::profile_scope!("inner", loader.name());
if let Some(contents) = contents.as_deref() {
let contents = Cow::Borrowed(contents.as_ref());
if let Err(err) = loader.load_from_file_contents(
store_id,
path.clone(),
contents,
tx_loader,
) {
if err.is_incompatible() {
return;
}
re_log::error!(?path, loader = loader.name(), %err, "Failed to load data");
}
} else if let Err(err) =
loader.load_from_path(store_id, path.clone(), tx_loader)
{
if err.is_incompatible() {
return;
}
re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
}
re_log::debug!(loader = loader.name(), ?path, "compatible loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
});
}
re_tracing::profile_wait!("compatible_loader");
drop(tx_feedback);
rx_feedback.recv() == Ok(CompatibleLoaderFound)
};
any_compatible_loader.then_some(rx_loader)
};
if let Some(rx_loader) = rx_loader {
Ok(rx_loader)
} else {
Err(DataLoaderError::Incompatible(path.to_owned()))
}
}
#[cfg(target_arch = "wasm32")]
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn load(
store_id: &re_log_types::StoreId,
path: &std::path::Path,
contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
re_tracing::profile_function!(path.display().to_string());
let rx_loader = {
let (tx_loader, rx_loader) = std::sync::mpsc::channel();
let any_compatible_loader = crate::iter_loaders().map(|loader| {
if let Some(contents) = contents.as_deref() {
let store_id = store_id.clone();
let tx_loader = tx_loader.clone();
let path = path.to_owned();
let contents = Cow::Borrowed(contents);
if let Err(err) = loader.load_from_file_contents(store_id, path.clone(), contents, tx_loader) {
if err.is_incompatible() {
return false;
}
re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
}
true
} else {
false
}
})
.reduce(|any_compatible, is_compatible| any_compatible || is_compatible)
.unwrap_or(false);
any_compatible_loader.then_some(rx_loader)
};
if let Some(rx_loader) = rx_loader {
Ok(rx_loader)
} else {
Err(DataLoaderError::Incompatible(path.to_owned()))
}
}
pub(crate) fn send(
store_id: &re_log_types::StoreId,
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
tx: &Sender<LogMsg>,
) {
spawn({
re_tracing::profile_function!();
let tx = tx.clone();
let store_id = store_id.clone();
move || {
for data in rx_loader {
let msg = match data.into_log_msg(&store_id) {
Ok(msg) => msg,
Err(err) => {
re_log::error!(%err, %store_id, "Couldn't serialize component data");
continue;
}
};
tx.send(msg).ok();
}
tx.quit(None).ok();
}
});
}
#[cfg(not(target_arch = "wasm32"))]
fn spawn<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
rayon::spawn(f);
}
#[cfg(target_arch = "wasm32")]
fn spawn<F>(f: F)
where
F: FnOnce(),
{
f();
}