Skip to main content

re_data_loader/
loader_lerobot.rs

1use std::thread;
2
3use anyhow::{Context as _, anyhow};
4use crossbeam::channel::Sender;
5
6use crate::lerobot::{LeRobotDatasetVersion, datasetv2, datasetv3, is_lerobot_dataset};
7use crate::{DataLoader, DataLoaderError, LoadedData};
8
/// A [`DataLoader`] for `LeRobot` datasets.
///
/// This is a stateless unit type: it carries no configuration of its own, so it is trivially
/// copyable and can be created with [`Default`].
///
/// An example dataset which can be loaded can be found on Hugging Face: [lerobot/pusht_image](https://huggingface.co/datasets/lerobot/pusht_image)
#[derive(Debug, Clone, Copy, Default)]
pub struct LeRobotDatasetLoader;
13
14impl DataLoader for LeRobotDatasetLoader {
15    fn name(&self) -> String {
16        "LeRobotDatasetLoader".into()
17    }
18
19    fn load_from_path(
20        &self,
21        settings: &crate::DataLoaderSettings,
22        filepath: std::path::PathBuf,
23        tx: Sender<LoadedData>,
24    ) -> Result<(), DataLoaderError> {
25        if !is_lerobot_dataset(&filepath) {
26            return Err(DataLoaderError::Incompatible(filepath));
27        }
28
29        let version = LeRobotDatasetVersion::find_version(&filepath)
30            .ok_or_else(|| anyhow!("Could not determine LeRobot dataset version"))?;
31
32        match version {
33            LeRobotDatasetVersion::V1 => {
34                re_log::error!("LeRobot 'v1.x' dataset format is unsupported.");
35                Ok(())
36            }
37            LeRobotDatasetVersion::V2 => Self::load_v2_dataset(settings, filepath, tx),
38            LeRobotDatasetVersion::V3 => Self::load_v3_dataset(settings, filepath, tx),
39        }
40    }
41
42    fn load_from_file_contents(
43        &self,
44        _settings: &crate::DataLoaderSettings,
45        filepath: std::path::PathBuf,
46        _contents: std::borrow::Cow<'_, [u8]>,
47        _tx: Sender<LoadedData>,
48    ) -> Result<(), DataLoaderError> {
49        Err(DataLoaderError::Incompatible(filepath))
50    }
51}
52
53impl LeRobotDatasetLoader {
54    fn load_v2_dataset(
55        settings: &crate::DataLoaderSettings,
56        filepath: impl AsRef<std::path::Path>,
57        tx: Sender<LoadedData>,
58    ) -> Result<(), DataLoaderError> {
59        let filepath = filepath.as_ref().to_owned();
60        let dataset = datasetv2::LeRobotDatasetV2::load_from_directory(&filepath)
61            .map_err(|err| anyhow!("Loading LeRobot v2 dataset failed: {err}"))?;
62
63        let application_id = settings
64            .application_id
65            .clone()
66            .unwrap_or_else(|| filepath.display().to_string().into());
67
68        let loader_name = Self.name();
69
70        // NOTE(1): `spawn` is fine, this whole function is native-only.
71        // NOTE(2): this must spawned on a dedicated thread to avoid a deadlock!
72        // `load` will spawn a bunch of loaders on the common rayon thread pool and wait for
73        // their response via channels: we cannot be waiting for these responses on the
74        // common rayon thread pool.
75        thread::Builder::new()
76            .name(format!("load_and_stream_v2({filepath:?})"))
77            .spawn(move || {
78                re_log::info!(
79                    "Loading LeRobot v2 dataset from {:?}, with {} episode(s)",
80                    dataset.path,
81                    dataset.metadata.episode_count(),
82                );
83                datasetv2::load_and_stream(&dataset, &application_id, &tx, &loader_name);
84            })
85            .with_context(|| {
86                format!("Failed to spawn IO thread to load LeRobot v2 dataset {filepath:?}")
87            })?;
88
89        Ok(())
90    }
91
92    fn load_v3_dataset(
93        settings: &crate::DataLoaderSettings,
94        filepath: impl AsRef<std::path::Path>,
95        tx: Sender<LoadedData>,
96    ) -> Result<(), DataLoaderError> {
97        let filepath = filepath.as_ref().to_owned();
98        let dataset = datasetv3::LeRobotDatasetV3::load_from_directory(&filepath)
99            .map_err(|err| anyhow!("Loading LeRobot v3 dataset failed: {err}"))?;
100
101        let application_id = settings
102            .application_id
103            .clone()
104            .unwrap_or_else(|| filepath.display().to_string().into());
105
106        let loader_name = Self.name();
107
108        // NOTE(1): `spawn` is fine, this whole function is native-only.
109        // NOTE(2): this must spawned on a dedicated thread to avoid a deadlock!
110        // `load` will spawn a bunch of loaders on the common rayon thread pool and wait for
111        // their response via channels: we cannot be waiting for these responses on the
112        // common rayon thread pool.
113        thread::Builder::new()
114            .name(format!("load_and_stream_v3({filepath:?})"))
115            .spawn(move || {
116                re_log::info!(
117                    "Loading LeRobot v3 dataset from {:?}, with {} episode(s)",
118                    dataset.path,
119                    dataset.metadata.episode_count(),
120                );
121                datasetv3::load_and_stream(&dataset, &application_id, &tx, &loader_name);
122            })
123            .with_context(|| {
124                format!("Failed to spawn IO thread to load LeRobot v3 dataset {filepath:?}")
125            })?;
126
127        Ok(())
128    }
129}