re_data_loader/
loader_rrd.rs

1#[cfg(not(target_arch = "wasm32"))]
2use crossbeam::channel::Receiver;
3use re_log_encoding::Decoder;
4use re_log_types::ApplicationId;
5
6use crate::{DataLoader as _, LoadedData};
7
8// ---
9
10/// Loads data from any `rrd` file or in-memory contents.
11pub struct RrdLoader;
12
13impl crate::DataLoader for RrdLoader {
14    #[inline]
15    fn name(&self) -> String {
16        "rerun.data_loaders.Rrd".into()
17    }
18
19    #[cfg(not(target_arch = "wasm32"))]
20    fn load_from_path(
21        &self,
22        settings: &crate::DataLoaderSettings,
23        filepath: std::path::PathBuf,
24        tx: std::sync::mpsc::Sender<crate::LoadedData>,
25    ) -> Result<(), crate::DataLoaderError> {
26        use anyhow::Context as _;
27
28        re_tracing::profile_function!(filepath.display().to_string());
29
30        let mut extension = crate::extension(&filepath);
31        if !matches!(extension.as_str(), "rbl" | "rrd") {
32            if filepath.is_file() || filepath.is_dir() {
33                // NOTE: blueprints and recordings have the same file format
34                return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
35            } else {
36                // NOTE(1): If this is some kind of virtual file (fifo, socket, pipe, etc), then we
37                // always assume it's an RRD stream by default.
38                //
39                // NOTE(2): Because waiting for an end-of-stream marker on a pipe doesn't make sense,
40                // we tag it as `rbl` instead of `rrd` (but really this just means: please don't block
41                // indefinitely).
42                extension = "rbl".to_owned();
43            }
44        }
45
46        re_log::debug!(
47            ?filepath,
48            loader = self.name(),
49            "Loading rrd data from filesystem…",
50        );
51
52        match extension.as_str() {
53            "rbl" => {
54                // We assume .rbl is not streamed and no retrying after seeing EOF is needed.
55                // Otherwise we'd risk retrying to read .rbl file that has no end-of-stream header and
56                // blocking the UI update thread indefinitely and making the viewer unresponsive (as .rbl
57                // files are sometimes read on UI update).
58                let file = std::fs::File::open(&filepath)
59                    .with_context(|| format!("Failed to open file {filepath:?}"))?;
60                let file = std::io::BufReader::new(file);
61
62                let messages = Decoder::decode_eager(file)?;
63
64                // NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
65                std::thread::Builder::new()
66                    .name(format!("decode_and_stream({filepath:?})"))
67                    .spawn({
68                        let filepath = filepath.clone();
69                        let settings = settings.clone();
70                        move || {
71                            decode_and_stream(
72                                &filepath,
73                                &tx,
74                                messages,
75                                settings
76                                    .opened_store_id
77                                    .as_ref()
78                                    .map(|store_id| store_id.application_id()),
79                                // We never want to patch blueprints' store IDs, only their app IDs.
80                                None,
81                            );
82                        }
83                    })
84                    .with_context(|| format!("Failed to spawn IO thread for {filepath:?}"))?;
85            }
86
87            "rrd" => {
88                // For .rrd files we retry reading despite reaching EOF to support live (writer) streaming.
89                // Decoder will give up when it sees end of file marker (i.e. end-of-stream message header)
90                let retryable_reader = RetryableFileReader::new(&filepath).with_context(|| {
91                    format!("failed to create retryable file reader for {filepath:?}")
92                })?;
93                let wait_for_eos = true;
94                let messages = Decoder::decode_eager_with_opts(retryable_reader, wait_for_eos)?;
95
96                // NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
97                std::thread::Builder::new()
98                    .name(format!("decode_and_stream({filepath:?})"))
99                    .spawn({
100                        let filepath = filepath.clone();
101                        move || {
102                            decode_and_stream(
103                                &filepath, &tx, messages,
104                                // Never use import semantics for .rrd files
105                                None, None,
106                            );
107                        }
108                    })
109                    .with_context(|| format!("Failed to spawn IO thread for {filepath:?}"))?;
110            }
111            _ => unreachable!(),
112        }
113
114        Ok(())
115    }
116
117    fn load_from_file_contents(
118        &self,
119        settings: &crate::DataLoaderSettings,
120        filepath: std::path::PathBuf,
121        contents: std::borrow::Cow<'_, [u8]>,
122        tx: std::sync::mpsc::Sender<crate::LoadedData>,
123    ) -> Result<(), crate::DataLoaderError> {
124        re_tracing::profile_function!(filepath.display().to_string());
125
126        let extension = crate::extension(&filepath);
127        if !matches!(extension.as_str(), "rbl" | "rrd") {
128            // NOTE: blueprints and recordings has the same file format
129            return Err(crate::DataLoaderError::Incompatible(filepath));
130        }
131
132        let contents = std::io::Cursor::new(contents);
133        let messages = match Decoder::decode_eager(contents) {
134            Ok(decoder) => decoder,
135            Err(err) => match err {
136                // simply not interested
137                re_log_encoding::DecodeError::Codec(
138                    re_log_encoding::rrd::CodecError::NotAnRrd(_)
139                    | re_log_encoding::rrd::CodecError::InvalidOptions(_),
140                ) => return Ok(()),
141                _ => return Err(err.into()),
142            },
143        };
144
145        // * We never want to patch blueprints' store IDs, only their app IDs.
146        // * We neer use import semantics at all for .rrd files.
147        let forced_application_id = if extension == "rbl" {
148            settings
149                .opened_store_id
150                .as_ref()
151                .map(|store_id| store_id.application_id())
152        } else {
153            None
154        };
155        let forced_recording_id = None;
156
157        decode_and_stream(
158            &filepath,
159            &tx,
160            messages,
161            forced_application_id,
162            forced_recording_id,
163        );
164
165        Ok(())
166    }
167}
168
169fn decode_and_stream(
170    filepath: &std::path::Path,
171    tx: &std::sync::mpsc::Sender<crate::LoadedData>,
172    msgs: impl Iterator<Item = Result<re_log_types::LogMsg, re_log_encoding::DecodeError>>,
173    forced_application_id: Option<&ApplicationId>,
174    forced_recording_id: Option<&String>,
175) {
176    re_tracing::profile_function!(filepath.display().to_string());
177
178    for msg in msgs {
179        let msg = match msg {
180            Ok(msg) => msg,
181            Err(err) => {
182                re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
183                continue;
184            }
185        };
186
187        let msg = if forced_application_id.is_some() || forced_recording_id.is_some() {
188            match msg {
189                re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
190                    let mut store_id = set_store_info.info.store_id.clone();
191                    if let Some(forced_application_id) = forced_application_id {
192                        store_id = store_id.with_application_id(forced_application_id.clone());
193                    }
194                    if let Some(forced_recording_id) = forced_recording_id {
195                        store_id = store_id.with_recording_id(forced_recording_id.clone());
196                    }
197
198                    re_log_types::LogMsg::SetStoreInfo(re_log_types::SetStoreInfo {
199                        info: re_log_types::StoreInfo {
200                            store_id,
201                            ..set_store_info.info
202                        },
203                        ..set_store_info
204                    })
205                }
206
207                re_log_types::LogMsg::ArrowMsg(mut store_id, arrow_msg) => {
208                    if let Some(forced_application_id) = forced_application_id {
209                        store_id = store_id.with_application_id(forced_application_id.clone());
210                    }
211                    if let Some(forced_recording_id) = forced_recording_id {
212                        store_id = store_id.with_recording_id(forced_recording_id.clone());
213                    }
214
215                    re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg)
216                }
217
218                re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
219                    let mut blueprint_id = blueprint_activation_command.blueprint_id.clone();
220                    if let Some(forced_application_id) = forced_application_id {
221                        blueprint_id =
222                            blueprint_id.with_application_id(forced_application_id.clone());
223                    }
224                    re_log_types::LogMsg::BlueprintActivationCommand(
225                        re_log_types::BlueprintActivationCommand {
226                            blueprint_id,
227                            ..blueprint_activation_command
228                        },
229                    )
230                }
231            }
232        } else {
233            msg
234        };
235
236        let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg);
237        if tx.send(data).is_err() {
238            break; // The other end has decided to hang up, not our problem.
239        }
240    }
241}
242
243// Retryable file reader that keeps retrying to read more data despite
244// reading zero bytes or reaching EOF.
245#[cfg(not(target_arch = "wasm32"))]
246struct RetryableFileReader {
247    reader: std::io::BufReader<std::fs::File>,
248    rx_file_notifs: Receiver<notify::Result<notify::Event>>,
249    rx_ticker: Receiver<std::time::Instant>,
250
251    #[expect(dead_code)]
252    watcher: notify::RecommendedWatcher,
253}
254
255#[cfg(not(target_arch = "wasm32"))]
256impl RetryableFileReader {
257    fn new(filepath: &std::path::Path) -> Result<Self, crate::DataLoaderError> {
258        use anyhow::Context as _;
259        use notify::{RecursiveMode, Watcher as _};
260
261        let file = std::fs::File::open(filepath)
262            .with_context(|| format!("Failed to open file {filepath:?}"))?;
263        let reader = std::io::BufReader::new(file);
264
265        #[cfg(not(any(target_os = "windows", target_arch = "wasm32")))]
266        re_crash_handler::sigint::track_sigint();
267
268        // 50ms is just a nice tradeoff: we just need the delay to not be perceptible by a human
269        // while not needlessly hammering the CPU.
270        let rx_ticker = crossbeam::channel::tick(std::time::Duration::from_millis(50));
271
272        let (tx_file_notifs, rx_file_notifs) = crossbeam::channel::unbounded();
273        let mut watcher = notify::recommended_watcher(tx_file_notifs)
274            .with_context(|| format!("failed to create file watcher for {filepath:?}"))?;
275
276        watcher
277            .watch(filepath, RecursiveMode::NonRecursive)
278            .with_context(|| format!("failed to watch file changes on {filepath:?}"))?;
279
280        Ok(Self {
281            reader,
282            rx_file_notifs,
283            rx_ticker,
284            watcher,
285        })
286    }
287}
288
289#[cfg(not(target_arch = "wasm32"))]
290impl std::io::Read for RetryableFileReader {
291    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
292        loop {
293            match self.reader.read(buf) {
294                Ok(0) => {
295                    self.block_until_file_changes()?;
296                }
297                Ok(n) => {
298                    return Ok(n);
299                }
300                Err(err) => {
301                    if err.kind() == std::io::ErrorKind::Interrupted {
302                        return Err(err);
303                    }
304                }
305            }
306        }
307    }
308}
309
310#[cfg(not(target_arch = "wasm32"))]
311impl std::io::BufRead for RetryableFileReader {
312    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
313        self.reader.fill_buf()
314    }
315
316    fn consume(&mut self, amount: usize) {
317        self.reader.consume(amount);
318    }
319}
320
321#[cfg(not(target_arch = "wasm32"))]
322impl RetryableFileReader {
323    fn block_until_file_changes(&self) -> std::io::Result<usize> {
324        loop {
325            crossbeam::select! {
326                // Periodically check for SIGINT.
327                recv(self.rx_ticker) -> _ => {
328                    if re_crash_handler::sigint::was_sigint_ever_caught() {
329                        return Err(std::io::Error::new(std::io::ErrorKind::Interrupted, "SIGINT"));
330                    }
331                }
332
333                // Otherwise check for file notifications.
334                recv(self.rx_file_notifs) -> res => {
335                    return match res {
336                        Ok(Ok(event)) => match event.kind {
337                            notify::EventKind::Remove(_) => Err(std::io::Error::new(
338                                std::io::ErrorKind::NotFound,
339                                "file removed",
340                            )),
341                            _ => Ok(0),
342                        },
343                        Ok(Err(err)) => Err(std::io::Error::other(err)),
344                        Err(err) => Err(std::io::Error::other(err)),
345                    }
346                }
347            }
348        }
349    }
350}
351
#[cfg(test)]
mod tests {
    use re_chunk::RowId;
    use re_log_encoding::Encoder;
    use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};

    use super::*;

    /// Removes the file at `path` when dropped (best effort).
    struct DeleteOnDrop {
        path: std::path::PathBuf,
    }

    impl Drop for DeleteOnDrop {
        fn drop(&mut self) {
            // Failing to delete is fine: the file may already be gone.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Builds a `SetStoreInfo` message with a fresh row ID and a random recording store ID.
    fn new_message() -> LogMsg {
        let store_id = StoreId::random(StoreKind::Recording, "test_app");
        let store_source = StoreSource::RustSdk {
            rustc_version: String::new(),
            llvm_version: String::new(),
        };

        LogMsg::SetStoreInfo(SetStoreInfo {
            row_id: *RowId::new(),
            info: StoreInfo::new(store_id, store_source),
        })
    }

    /// Writes messages to a file, reads them back through a `RetryableFileReader`, then checks
    /// that messages appended afterwards are picked up by the same decoder.
    #[test]
    fn test_loading_with_retryable_reader() {
        // We can't use `tempfile` here since it deletes the file on drop and we want to keep it around for a bit longer.
        let rrd_file_path = std::path::PathBuf::from("testfile.rrd");
        let rrd_file_delete_guard = DeleteOnDrop {
            path: rrd_file_path.clone(),
        };
        // Remove the file just in case a previous test run crashed hard.
        let _ = std::fs::remove_file(&rrd_file_path);
        let rrd_file = std::fs::OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&rrd_file_path)
            .unwrap();

        let mut encoder = Encoder::new_eager(
            re_build_info::CrateVersion::LOCAL,
            re_log_encoding::rrd::EncodingOptions::PROTOBUF_UNCOMPRESSED,
            rrd_file,
        )
        .unwrap();

        let messages = std::iter::repeat_with(new_message)
            .take(5)
            .collect::<Vec<_>>();
        for message in &messages {
            encoder.append(message).expect("failed to append message");
        }
        encoder.flush_blocking().expect("failed to flush messages");

        let reader = RetryableFileReader::new(&rrd_file_path).unwrap();
        let mut decoder =
            Decoder::decode_eager_with_opts(reader, /* wait_for_eos */ true).unwrap();

        // We should be able to read back the 5 messages that we wrote.
        let mut decoded_messages = Vec::with_capacity(5);
        for _ in 0..5 {
            decoded_messages.push(decoder.next().unwrap().unwrap());
        }
        assert_eq!(messages, decoded_messages);

        // As we're using a retryable reader, we should be able to read the messages that we're
        // now going to append.
        let decoder_handle = std::thread::Builder::new()
            .name("background decoder".into())
            .spawn(move || decoder.map(|msg| msg.unwrap()).collect::<Vec<_>>())
            .unwrap();

        // Append more messages to the file.
        let more_messages = std::iter::repeat_with(new_message)
            .take(100)
            .collect::<Vec<_>>();
        for message in &more_messages {
            encoder.append(message).unwrap();
        }

        // Close the encoder and thus the file to make sure that the file is actually written out.
        // Otherwise we can't be sure that the file watcher will ever see those changes.
        // A simple flush works sometimes, but is not as reliable as closing the file since the OS may still cache the data.
        // (In fact we can't be sure that closing is enough either, but it's the best we can do.)
        // Note that this test is not entirely representative of the real use case of having reader and writer in
        // different processes, since file read/write visibility across processes may behave differently.
        encoder.finish().expect("failed to finish encoder");
        drop(encoder);

        let remaining_messages = decoder_handle.join().unwrap();
        assert_eq!(more_messages, remaining_messages);

        // Drop explicitly to make sure that rustc doesn't drop it earlier.
        drop(rrd_file_delete_guard);
    }
}