use std::borrow::Cow;
use ahash::{HashMap, HashMapExt as _};
use re_log_channel::LogSender;
use re_log_types::{ApplicationId, FileSource, LogMsg};
use crate::{ImportedData, Importer as _, ImporterError, RrdImporter};
#[cfg(not(target_arch = "wasm32"))]
pub fn import_from_path(
settings: &crate::ImporterSettings,
file_source: FileSource,
path: &std::path::Path,
tx: &LogSender,
) -> Result<(), ImporterError> {
re_tracing::profile_function!(path.to_string_lossy());
if !path.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("path does not exist: {path:?}"),
)
.into());
}
re_log::info!("Loading {path:?}…");
let application_id = settings.application_id.clone().or_else(|| {
path.file_name()
.map(|f| f.to_string_lossy().to_string())
.map(ApplicationId::from)
});
let settings = crate::ImporterSettings {
force_store_info: !crate::lerobot::is_lerobot_dataset(path),
application_id,
..settings.clone()
};
let rx = import(&settings, path, None)?;
send(settings, file_source, rx, tx);
Ok(())
}
pub fn import_from_file_contents(
settings: &crate::ImporterSettings,
file_source: FileSource,
filepath: &std::path::Path,
contents: std::borrow::Cow<'_, [u8]>,
tx: &LogSender,
) -> Result<(), ImporterError> {
re_tracing::profile_function!(filepath.to_string_lossy());
re_log::info!("Loading {filepath:?}…");
let application_id = settings.application_id.clone().or_else(|| {
filepath
.file_name()
.map(|f| f.to_string_lossy().to_string())
.map(ApplicationId::from)
});
let settings = crate::ImporterSettings {
application_id,
..settings.clone()
};
let data = import(&settings, filepath, Some(contents))?;
send(settings, file_source, data, tx);
Ok(())
}
pub fn prepare_store_info(store_id: &re_log_types::StoreId, file_source: FileSource) -> LogMsg {
re_tracing::profile_function!();
use re_log_types::SetStoreInfo;
let store_source = re_log_types::StoreSource::File { file_source };
LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: re_log_types::StoreInfo::new(store_id.clone(), store_source),
})
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn import(
settings: &crate::ImporterSettings,
path: &std::path::Path,
contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<crossbeam::channel::Receiver<ImportedData>, ImporterError> {
re_tracing::profile_function!(path.display().to_string());
let contents: Option<std::sync::Arc<std::borrow::Cow<'static, [u8]>>> =
contents.map(|contents| std::sync::Arc::new(Cow::Owned(contents.into_owned())));
let rx_importer = {
let (tx_importer, rx_importer) = crossbeam::channel::bounded(1024);
let any_compatible_importer = {
#[derive(Debug, PartialEq, Eq)]
struct CompatibleImporterFound;
let (tx_feedback, rx_feedback) =
crossbeam::channel::bounded::<CompatibleImporterFound>(128);
let importers = {
use rayon::iter::Either;
use crate::Importer as _;
let extension = crate::extension(path);
if crate::is_supported_file_extension(&extension) {
Either::Left(
crate::iter_importers()
.filter(|importer| importer.name() != crate::ExternalImporter.name()),
)
} else {
Either::Right(crate::iter_importers())
}
};
for importer in importers {
let importer = std::sync::Arc::clone(&importer);
let settings = settings.clone();
let path = path.to_owned();
let contents = contents.clone();
let tx_importer = tx_importer.clone();
let tx_feedback = tx_feedback.clone();
rayon::spawn(move || {
re_tracing::profile_scope!("inner", importer.name());
if let Some(contents) = contents.as_deref() {
let contents = Cow::Borrowed(contents.as_ref());
if let Err(err) = importer.import_from_file_contents(
&settings,
path.clone(),
contents,
tx_importer,
) {
if err.is_incompatible() {
return;
}
re_log::error!(?path, importer = importer.name(), %err, "Failed to import data");
}
} else if let Err(err) =
importer.import_from_path(&settings, path.clone(), tx_importer)
{
if err.is_incompatible() {
return;
}
re_log::error!(?path, importer = importer.name(), %err, "Failed to import data from file");
}
re_log::debug!(
importer = importer.name(),
?path,
"compatible importer found"
);
re_quota_channel::send_crossbeam(&tx_feedback, CompatibleImporterFound).ok();
});
}
re_tracing::profile_wait!("compatible_importer");
drop(tx_feedback);
rx_feedback.recv() == Ok(CompatibleImporterFound)
};
any_compatible_importer.then_some(rx_importer)
};
if let Some(rx_importer) = rx_importer {
Ok(rx_importer)
} else {
Err(ImporterError::Incompatible(path.to_owned()))
}
}
#[cfg(target_arch = "wasm32")]
#[expect(clippy::needless_pass_by_value)]
pub(crate) fn import(
settings: &crate::ImporterSettings,
path: &std::path::Path,
contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<crossbeam::channel::Receiver<ImportedData>, ImporterError> {
re_tracing::profile_function!(path.display().to_string());
let rx_importer = {
let (tx_importer, rx_importer) = crossbeam::channel::unbounded();
let any_compatible_importer = crate::iter_importers().any(|importer| {
if let Some(contents) = contents.as_deref() {
let settings = settings.clone();
let tx_importer = tx_importer.clone();
let path = path.to_owned();
let contents = Cow::Borrowed(contents);
if let Err(err) = importer.import_from_file_contents(&settings, path.clone(), contents, tx_importer) {
if err.is_incompatible() {
return false;
}
re_log::error!(?path, importer = importer.name(), %err, "Failed to import data from file");
}
true
} else {
false
}
});
any_compatible_importer.then_some(rx_importer)
};
if let Some(rx_importer) = rx_importer {
Ok(rx_importer)
} else {
Err(ImporterError::Incompatible(path.to_owned()))
}
}
pub(crate) fn send(
settings: crate::ImporterSettings,
file_source: FileSource,
rx_importer: crossbeam::channel::Receiver<ImportedData>,
tx: &LogSender,
) {
spawn({
re_tracing::profile_function!();
#[derive(Default, Debug)]
struct Tracked {
is_rrd_or_rbl: bool,
already_has_store_info: bool,
}
let mut store_info_tracker: HashMap<re_log_types::StoreId, Tracked> = HashMap::new();
let tx = tx.clone();
move || {
for data in rx_importer {
let importer_name = data.importer_name().clone();
let msg = match data.into_log_msg() {
Ok(msg) => {
let store_info = match &msg {
LogMsg::SetStoreInfo(set_store_info) => {
Some((set_store_info.info.store_id.clone(), true))
}
LogMsg::ArrowMsg(store_id, _arrow_msg) => {
Some((store_id.clone(), false))
}
LogMsg::BlueprintActivationCommand(_) => None,
};
if let Some((store_id, store_info_created)) = store_info {
let tracked = store_info_tracker.entry(store_id).or_default();
tracked.is_rrd_or_rbl =
*importer_name == RrdImporter::name(&RrdImporter);
tracked.already_has_store_info |= store_info_created;
}
msg
}
Err(err) => {
re_log::error!(%err, "Couldn't serialize component data");
continue;
}
};
tx.send(msg.into()).ok();
}
for (store_id, tracked) in store_info_tracker {
let is_a_preexisting_recording =
Some(&store_id) == settings.opened_store_id.as_ref();
let should_force_store_info = settings.force_store_info && !tracked.is_rrd_or_rbl;
let should_send_new_store_info = should_force_store_info
|| (!tracked.already_has_store_info && !is_a_preexisting_recording);
if should_send_new_store_info {
let store_info = prepare_store_info(&store_id, file_source.clone());
tx.send(store_info.into()).ok();
}
}
tx.quit(None).ok();
}
});
}
#[cfg(not(target_arch = "wasm32"))]
fn spawn<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
if 1 < rayon::current_num_threads() {
rayon::spawn(f);
} else {
std::thread::Builder::new()
.name("importer".to_owned())
.spawn(f)
.expect("Failed to spawn a thread");
}
}
#[cfg(target_arch = "wasm32")]
fn spawn<F>(f: F)
where
F: FnOnce(),
{
f();
}