1#[cfg(not(target_arch = "wasm32"))]
2use crossbeam::channel::Receiver;
3use re_log_encoding::Decoder;
4use re_log_types::ApplicationId;
5
6use crate::{DataLoader as _, LoadedData};
7
/// Data loader for `.rrd` and `.rbl` data, read either from the filesystem or from in-memory
/// file contents.
pub struct RrdLoader;
12
13impl crate::DataLoader for RrdLoader {
14 #[inline]
15 fn name(&self) -> String {
16 "rerun.data_loaders.Rrd".into()
17 }
18
19 #[cfg(not(target_arch = "wasm32"))]
20 fn load_from_path(
21 &self,
22 settings: &crate::DataLoaderSettings,
23 filepath: std::path::PathBuf,
24 tx: std::sync::mpsc::Sender<crate::LoadedData>,
25 ) -> Result<(), crate::DataLoaderError> {
26 use anyhow::Context as _;
27
28 re_tracing::profile_function!(filepath.display().to_string());
29
30 let mut extension = crate::extension(&filepath);
31 if !matches!(extension.as_str(), "rbl" | "rrd") {
32 if filepath.is_file() || filepath.is_dir() {
33 return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
35 } else {
36 extension = "rbl".to_owned();
43 }
44 }
45
46 re_log::debug!(
47 ?filepath,
48 loader = self.name(),
49 "Loading rrd data from filesystem…",
50 );
51
52 match extension.as_str() {
53 "rbl" => {
54 let file = std::fs::File::open(&filepath)
59 .with_context(|| format!("Failed to open file {filepath:?}"))?;
60 let file = std::io::BufReader::new(file);
61
62 let messages = Decoder::decode_eager(file)?;
63
64 std::thread::Builder::new()
66 .name(format!("decode_and_stream({filepath:?})"))
67 .spawn({
68 let filepath = filepath.clone();
69 let settings = settings.clone();
70 move || {
71 decode_and_stream(
72 &filepath,
73 &tx,
74 messages,
75 settings
76 .opened_store_id
77 .as_ref()
78 .map(|store_id| store_id.application_id()),
79 None,
81 );
82 }
83 })
84 .with_context(|| format!("Failed to spawn IO thread for {filepath:?}"))?;
85 }
86
87 "rrd" => {
88 let retryable_reader = RetryableFileReader::new(&filepath).with_context(|| {
91 format!("failed to create retryable file reader for {filepath:?}")
92 })?;
93 let wait_for_eos = true;
94 let messages = Decoder::decode_eager_with_opts(retryable_reader, wait_for_eos)?;
95
96 std::thread::Builder::new()
98 .name(format!("decode_and_stream({filepath:?})"))
99 .spawn({
100 let filepath = filepath.clone();
101 move || {
102 decode_and_stream(
103 &filepath, &tx, messages,
104 None, None,
106 );
107 }
108 })
109 .with_context(|| format!("Failed to spawn IO thread for {filepath:?}"))?;
110 }
111 _ => unreachable!(),
112 }
113
114 Ok(())
115 }
116
117 fn load_from_file_contents(
118 &self,
119 settings: &crate::DataLoaderSettings,
120 filepath: std::path::PathBuf,
121 contents: std::borrow::Cow<'_, [u8]>,
122 tx: std::sync::mpsc::Sender<crate::LoadedData>,
123 ) -> Result<(), crate::DataLoaderError> {
124 re_tracing::profile_function!(filepath.display().to_string());
125
126 let extension = crate::extension(&filepath);
127 if !matches!(extension.as_str(), "rbl" | "rrd") {
128 return Err(crate::DataLoaderError::Incompatible(filepath));
130 }
131
132 let contents = std::io::Cursor::new(contents);
133 let messages = match Decoder::decode_eager(contents) {
134 Ok(decoder) => decoder,
135 Err(err) => match err {
136 re_log_encoding::DecodeError::Codec(
138 re_log_encoding::rrd::CodecError::NotAnRrd(_)
139 | re_log_encoding::rrd::CodecError::InvalidOptions(_),
140 ) => return Ok(()),
141 _ => return Err(err.into()),
142 },
143 };
144
145 let forced_application_id = if extension == "rbl" {
148 settings
149 .opened_store_id
150 .as_ref()
151 .map(|store_id| store_id.application_id())
152 } else {
153 None
154 };
155 let forced_recording_id = None;
156
157 decode_and_stream(
158 &filepath,
159 &tx,
160 messages,
161 forced_application_id,
162 forced_recording_id,
163 );
164
165 Ok(())
166 }
167}
168
169fn decode_and_stream(
170 filepath: &std::path::Path,
171 tx: &std::sync::mpsc::Sender<crate::LoadedData>,
172 msgs: impl Iterator<Item = Result<re_log_types::LogMsg, re_log_encoding::DecodeError>>,
173 forced_application_id: Option<&ApplicationId>,
174 forced_recording_id: Option<&String>,
175) {
176 re_tracing::profile_function!(filepath.display().to_string());
177
178 for msg in msgs {
179 let msg = match msg {
180 Ok(msg) => msg,
181 Err(err) => {
182 re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
183 continue;
184 }
185 };
186
187 let msg = if forced_application_id.is_some() || forced_recording_id.is_some() {
188 match msg {
189 re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
190 let mut store_id = set_store_info.info.store_id.clone();
191 if let Some(forced_application_id) = forced_application_id {
192 store_id = store_id.with_application_id(forced_application_id.clone());
193 }
194 if let Some(forced_recording_id) = forced_recording_id {
195 store_id = store_id.with_recording_id(forced_recording_id.clone());
196 }
197
198 re_log_types::LogMsg::SetStoreInfo(re_log_types::SetStoreInfo {
199 info: re_log_types::StoreInfo {
200 store_id,
201 ..set_store_info.info
202 },
203 ..set_store_info
204 })
205 }
206
207 re_log_types::LogMsg::ArrowMsg(mut store_id, arrow_msg) => {
208 if let Some(forced_application_id) = forced_application_id {
209 store_id = store_id.with_application_id(forced_application_id.clone());
210 }
211 if let Some(forced_recording_id) = forced_recording_id {
212 store_id = store_id.with_recording_id(forced_recording_id.clone());
213 }
214
215 re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg)
216 }
217
218 re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
219 let mut blueprint_id = blueprint_activation_command.blueprint_id.clone();
220 if let Some(forced_application_id) = forced_application_id {
221 blueprint_id =
222 blueprint_id.with_application_id(forced_application_id.clone());
223 }
224 re_log_types::LogMsg::BlueprintActivationCommand(
225 re_log_types::BlueprintActivationCommand {
226 blueprint_id,
227 ..blueprint_activation_command
228 },
229 )
230 }
231 }
232 } else {
233 msg
234 };
235
236 let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg);
237 if tx.send(data).is_err() {
238 break; }
240 }
241}
242
/// File reader whose `std::io::Read` implementation waits for the file to change when it reaches
/// end-of-file, instead of reporting EOF.
#[cfg(not(target_arch = "wasm32"))]
struct RetryableFileReader {
    /// Buffered handle on the file being read.
    reader: std::io::BufReader<std::fs::File>,

    /// Receiving end of the channel handed to `watcher` when it was created.
    rx_file_notifs: Receiver<notify::Result<notify::Event>>,

    /// Periodic ticker, used to regularly check for SIGINT while waiting for file changes.
    rx_ticker: Receiver<std::time::Instant>,

    // Never read directly, only stored.
    // NOTE(review): presumably dropping the watcher would stop the notifications — confirm
    // against the `notify` documentation.
    #[expect(dead_code)]
    watcher: notify::RecommendedWatcher,
}
254
255#[cfg(not(target_arch = "wasm32"))]
256impl RetryableFileReader {
257 fn new(filepath: &std::path::Path) -> Result<Self, crate::DataLoaderError> {
258 use anyhow::Context as _;
259 use notify::{RecursiveMode, Watcher as _};
260
261 let file = std::fs::File::open(filepath)
262 .with_context(|| format!("Failed to open file {filepath:?}"))?;
263 let reader = std::io::BufReader::new(file);
264
265 #[cfg(not(any(target_os = "windows", target_arch = "wasm32")))]
266 re_crash_handler::sigint::track_sigint();
267
268 let rx_ticker = crossbeam::channel::tick(std::time::Duration::from_millis(50));
271
272 let (tx_file_notifs, rx_file_notifs) = crossbeam::channel::unbounded();
273 let mut watcher = notify::recommended_watcher(tx_file_notifs)
274 .with_context(|| format!("failed to create file watcher for {filepath:?}"))?;
275
276 watcher
277 .watch(filepath, RecursiveMode::NonRecursive)
278 .with_context(|| format!("failed to watch file changes on {filepath:?}"))?;
279
280 Ok(Self {
281 reader,
282 rx_file_notifs,
283 rx_ticker,
284 watcher,
285 })
286 }
287}
288
289#[cfg(not(target_arch = "wasm32"))]
290impl std::io::Read for RetryableFileReader {
291 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
292 loop {
293 match self.reader.read(buf) {
294 Ok(0) => {
295 self.block_until_file_changes()?;
296 }
297 Ok(n) => {
298 return Ok(n);
299 }
300 Err(err) => {
301 if err.kind() == std::io::ErrorKind::Interrupted {
302 return Err(err);
303 }
304 }
305 }
306 }
307 }
308}
309
310#[cfg(not(target_arch = "wasm32"))]
311impl std::io::BufRead for RetryableFileReader {
312 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
313 self.reader.fill_buf()
314 }
315
316 fn consume(&mut self, amount: usize) {
317 self.reader.consume(amount);
318 }
319}
320
321#[cfg(not(target_arch = "wasm32"))]
322impl RetryableFileReader {
323 fn block_until_file_changes(&self) -> std::io::Result<usize> {
324 loop {
325 crossbeam::select! {
326 recv(self.rx_ticker) -> _ => {
328 if re_crash_handler::sigint::was_sigint_ever_caught() {
329 return Err(std::io::Error::new(std::io::ErrorKind::Interrupted, "SIGINT"));
330 }
331 }
332
333 recv(self.rx_file_notifs) -> res => {
335 return match res {
336 Ok(Ok(event)) => match event.kind {
337 notify::EventKind::Remove(_) => Err(std::io::Error::new(
338 std::io::ErrorKind::NotFound,
339 "file removed",
340 )),
341 _ => Ok(0),
342 },
343 Ok(Err(err)) => Err(std::io::Error::other(err)),
344 Err(err) => Err(std::io::Error::other(err)),
345 }
346 }
347 }
348 }
349 }
350}
351
#[cfg(test)]
mod tests {
    use re_chunk::RowId;
    use re_log_encoding::Encoder;
    use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};

    use super::*;

    /// Removes the file at `path` when dropped, ignoring any failure to do so.
    struct DeleteOnDrop {
        path: std::path::PathBuf,
    }

    impl Drop for DeleteOnDrop {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).ok();
        }
    }

    /// Writes messages to a file while reading them back through a `RetryableFileReader`, and
    /// checks that messages appended after the reader was created are decoded as well.
    #[test]
    fn test_loading_with_retryable_reader() {
        // The file lives in the current working directory and is cleaned up by the guard.
        let rrd_file_path = std::path::PathBuf::from("testfile.rrd");
        let rrd_file_delete_guard = DeleteOnDrop {
            path: rrd_file_path.clone(),
        };
        // Best-effort removal of any leftover file: `create_new(true)` below fails if the file
        // already exists.
        std::fs::remove_file(&rrd_file_path).ok();
        let rrd_file = std::fs::OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(rrd_file_path.to_str().unwrap())
            .unwrap();

        let mut encoder = Encoder::new_eager(
            re_build_info::CrateVersion::LOCAL,
            re_log_encoding::rrd::EncodingOptions::PROTOBUF_UNCOMPRESSED,
            rrd_file,
        )
        .unwrap();

        /// Builds a `SetStoreInfo` message with a fresh row ID and a random recording store ID.
        fn new_message() -> LogMsg {
            LogMsg::SetStoreInfo(SetStoreInfo {
                row_id: *RowId::new(),
                info: StoreInfo::new(
                    StoreId::random(StoreKind::Recording, "test_app"),
                    StoreSource::RustSdk {
                        rustc_version: String::new(),
                        llvm_version: String::new(),
                    },
                ),
            })
        }

        // First batch: written and flushed before the reader is even created.
        let messages = (0..5).map(|_| new_message()).collect::<Vec<_>>();

        for m in &messages {
            encoder.append(m).expect("failed to append message");
        }
        encoder.flush_blocking().expect("failed to flush messages");

        let reader = RetryableFileReader::new(&rrd_file_path).unwrap();
        let wait_for_eos = true;
        let mut decoder = Decoder::decode_eager_with_opts(reader, wait_for_eos).unwrap();

        // The 5 messages written so far must be readable right away.
        let decoded_messages = (0..5)
            .map(|_| decoder.next().unwrap().unwrap())
            .collect::<Vec<_>>();
        assert_eq!(messages, decoded_messages);

        // Keep decoding in the background: the reader blocks at end-of-file until the file
        // changes, so this thread picks up whatever gets appended below.
        let decoder_handle = std::thread::Builder::new()
            .name("background decoder".into())
            .spawn(move || {
                let mut remaining = Vec::new();
                for msg in decoder {
                    let msg = msg.unwrap();
                    remaining.push(msg);
                }

                remaining
            })
            .unwrap();

        // Second batch: appended while the background decoder is running.
        let more_messages = (0..100).map(|_| new_message()).collect::<Vec<_>>();
        for m in &more_messages {
            encoder.append(m).unwrap();
        }
        // NOTE(review): presumably `finish` writes the end-of-stream marker that lets the
        // background decoder terminate, and dropping the encoder closes the file — confirm.
        encoder.finish().expect("failed to finish encoder");
        drop(encoder);

        let remaining_messages = decoder_handle.join().unwrap();
        assert_eq!(more_messages, remaining_messages);

        // Explicit drop: the guard (and thus the file) stays alive until this point.
        drop(rrd_file_delete_guard);
    }
}