re_data_loader/
loader_rrd.rs

1use re_log_encoding::decoder::Decoder;
2
3#[cfg(not(target_arch = "wasm32"))]
4use crossbeam::channel::Receiver;
5use re_log_types::{ApplicationId, StoreId};
6
7use crate::{DataLoader as _, LoadedData};
8
9// ---
10
/// Loads data from any `rrd` file or in-memory contents.
///
/// Both recordings (`.rrd`) and blueprints (`.rbl`) are handled, since they share the same
/// file format; any other extension is rejected as incompatible.
pub struct RrdLoader;
13
14impl crate::DataLoader for RrdLoader {
15    #[inline]
16    fn name(&self) -> String {
17        "rerun.data_loaders.Rrd".into()
18    }
19
20    #[cfg(not(target_arch = "wasm32"))]
21    fn load_from_path(
22        &self,
23        settings: &crate::DataLoaderSettings,
24        filepath: std::path::PathBuf,
25        tx: std::sync::mpsc::Sender<crate::LoadedData>,
26    ) -> Result<(), crate::DataLoaderError> {
27        use anyhow::Context as _;
28
29        re_tracing::profile_function!(filepath.display().to_string());
30
31        let extension = crate::extension(&filepath);
32        if !matches!(extension.as_str(), "rbl" | "rrd") {
33            // NOTE: blueprints and recordings has the same file format
34            return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
35        }
36
37        re_log::debug!(
38            ?filepath,
39            loader = self.name(),
40            "Loading rrd data from filesystem…",
41        );
42
43        match extension.as_str() {
44            "rbl" => {
45                // We assume .rbl is not streamed and no retrying after seeing EOF is needed.
46                // Otherwise we'd risk retrying to read .rbl file that has no end-of-stream header and
47                // blocking the UI update thread indefinitely and making the viewer unresponsive (as .rbl
48                // files are sometimes read on UI update).
49                let file = std::fs::File::open(&filepath)
50                    .with_context(|| format!("Failed to open file {filepath:?}"))?;
51                let file = std::io::BufReader::new(file);
52
53                let decoder = Decoder::new(file)?;
54
55                // NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
56                std::thread::Builder::new()
57                    .name(format!("decode_and_stream({filepath:?})"))
58                    .spawn({
59                        let filepath = filepath.clone();
60                        let settings = settings.clone();
61                        move || {
62                            decode_and_stream(
63                                &filepath,
64                                &tx,
65                                decoder,
66                                settings.opened_application_id.as_ref(),
67                                // We never want to patch blueprints' store IDs, only their app IDs.
68                                None,
69                            );
70                        }
71                    })
72                    .with_context(|| format!("Failed to spawn IO thread for {filepath:?}"))?;
73            }
74
75            "rrd" => {
76                // For .rrd files we retry reading despite reaching EOF to support live (writer) streaming.
77                // Decoder will give up when it sees end of file marker (i.e. end-of-stream message header)
78                let retryable_reader = RetryableFileReader::new(&filepath).with_context(|| {
79                    format!("failed to create retryable file reader for {filepath:?}")
80                })?;
81                let decoder = Decoder::new(retryable_reader)?;
82
83                // NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
84                std::thread::Builder::new()
85                    .name(format!("decode_and_stream({filepath:?})"))
86                    .spawn({
87                        let filepath = filepath.clone();
88                        move || {
89                            decode_and_stream(
90                                &filepath, &tx, decoder,
91                                // Never use import semantics for .rrd files
92                                None, None,
93                            );
94                        }
95                    })
96                    .with_context(|| format!("Failed to spawn IO thread for {filepath:?}"))?;
97            }
98            _ => unreachable!(),
99        }
100
101        Ok(())
102    }
103
104    fn load_from_file_contents(
105        &self,
106        settings: &crate::DataLoaderSettings,
107        filepath: std::path::PathBuf,
108        contents: std::borrow::Cow<'_, [u8]>,
109        tx: std::sync::mpsc::Sender<crate::LoadedData>,
110    ) -> Result<(), crate::DataLoaderError> {
111        re_tracing::profile_function!(filepath.display().to_string());
112
113        let extension = crate::extension(&filepath);
114        if !matches!(extension.as_str(), "rbl" | "rrd") {
115            // NOTE: blueprints and recordings has the same file format
116            return Err(crate::DataLoaderError::Incompatible(filepath));
117        }
118
119        let contents = std::io::Cursor::new(contents);
120        let decoder = match re_log_encoding::decoder::Decoder::new(contents) {
121            Ok(decoder) => decoder,
122            Err(err) => match err {
123                // simply not interested
124                re_log_encoding::decoder::DecodeError::NotAnRrd
125                | re_log_encoding::decoder::DecodeError::Options(_) => return Ok(()),
126                _ => return Err(err.into()),
127            },
128        };
129
130        // * We never want to patch blueprints' store IDs, only their app IDs.
131        // * We neer use import semantics at all for .rrd files.
132        let forced_application_id = if extension == "rbl" {
133            settings.opened_application_id.as_ref()
134        } else {
135            None
136        };
137        let forced_recording_id = None;
138
139        decode_and_stream(
140            &filepath,
141            &tx,
142            decoder,
143            forced_application_id,
144            forced_recording_id,
145        );
146
147        Ok(())
148    }
149}
150
151fn decode_and_stream<R: std::io::Read>(
152    filepath: &std::path::Path,
153    tx: &std::sync::mpsc::Sender<crate::LoadedData>,
154    decoder: Decoder<R>,
155    forced_application_id: Option<&ApplicationId>,
156    forced_store_id: Option<&StoreId>,
157) {
158    re_tracing::profile_function!(filepath.display().to_string());
159
160    for msg in decoder {
161        let msg = match msg {
162            Ok(msg) => msg,
163            Err(err) => {
164                re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
165                continue;
166            }
167        };
168
169        let msg = if forced_application_id.is_some() || forced_store_id.is_some() {
170            match msg {
171                re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
172                    re_log_types::LogMsg::SetStoreInfo(re_log_types::SetStoreInfo {
173                        info: re_log_types::StoreInfo {
174                            application_id: forced_application_id
175                                .cloned()
176                                .unwrap_or(set_store_info.info.application_id),
177                            store_id: forced_store_id
178                                .cloned()
179                                .unwrap_or(set_store_info.info.store_id),
180                            ..set_store_info.info
181                        },
182                        ..set_store_info
183                    })
184                }
185
186                re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
187                    re_log_types::LogMsg::ArrowMsg(
188                        forced_store_id.cloned().unwrap_or(store_id),
189                        arrow_msg,
190                    )
191                }
192
193                re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
194                    re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command)
195                }
196            }
197        } else {
198            msg
199        };
200
201        let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg);
202        if tx.send(data).is_err() {
203            break; // The other end has decided to hang up, not our problem.
204        }
205    }
206}
207
/// Retryable file reader that keeps retrying to read more data despite
/// reading zero bytes or reaching EOF.
///
/// This is what allows `.rrd` files to be loaded while they are still being written to:
/// instead of reporting EOF, reads block until a file-system notification for the file
/// arrives, and give up only on errors (e.g. SIGINT was caught, or the file was removed).
#[cfg(not(target_arch = "wasm32"))]
struct RetryableFileReader {
    /// Buffered handle to the file being read.
    reader: std::io::BufReader<std::fs::File>,

    /// File-system notifications for the watched file, sent by `watcher`.
    rx_file_notifs: Receiver<notify::Result<notify::Event>>,

    /// Periodic ticks used to wake up and check whether SIGINT was caught.
    rx_ticker: Receiver<std::time::Instant>,

    // Never read: this field only exists to keep the watcher alive for as long as the reader,
    // since notifications on `rx_file_notifs` presumably stop once the watcher is dropped.
    #[allow(dead_code)]
    watcher: notify::RecommendedWatcher,
}
219
220#[cfg(not(target_arch = "wasm32"))]
221impl RetryableFileReader {
222    fn new(filepath: &std::path::Path) -> Result<Self, crate::DataLoaderError> {
223        use anyhow::Context as _;
224        use notify::{RecursiveMode, Watcher as _};
225
226        let file = std::fs::File::open(filepath)
227            .with_context(|| format!("Failed to open file {filepath:?}"))?;
228        let reader = std::io::BufReader::new(file);
229
230        #[cfg(not(any(target_os = "windows", target_arch = "wasm32")))]
231        re_crash_handler::sigint::track_sigint();
232
233        // 50ms is just a nice tradeoff: we just need the delay to not be perceptible by a human
234        // while not needlessly hammering the CPU.
235        let rx_ticker = crossbeam::channel::tick(std::time::Duration::from_millis(50));
236
237        let (tx_file_notifs, rx_file_notifs) = crossbeam::channel::unbounded();
238        let mut watcher = notify::recommended_watcher(tx_file_notifs)
239            .with_context(|| format!("failed to create file watcher for {filepath:?}"))?;
240
241        watcher
242            .watch(filepath, RecursiveMode::NonRecursive)
243            .with_context(|| format!("failed to watch file changes on {filepath:?}"))?;
244
245        Ok(Self {
246            reader,
247            rx_file_notifs,
248            rx_ticker,
249            watcher,
250        })
251    }
252}
253
254#[cfg(not(target_arch = "wasm32"))]
255impl std::io::Read for RetryableFileReader {
256    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
257        loop {
258            match self.reader.read(buf) {
259                Ok(0) => self.block_until_file_changes()?,
260                Ok(n) => {
261                    return Ok(n);
262                }
263                Err(err) => match err.kind() {
264                    std::io::ErrorKind::Interrupted => continue,
265                    _ => return Err(err),
266                },
267            };
268        }
269    }
270}
271
272#[cfg(not(target_arch = "wasm32"))]
273impl RetryableFileReader {
274    fn block_until_file_changes(&self) -> std::io::Result<usize> {
275        loop {
276            crossbeam::select! {
277                // Periodically check for SIGINT.
278                recv(self.rx_ticker) -> _ => {
279                    if re_crash_handler::sigint::was_sigint_ever_caught() {
280                        return Err(std::io::Error::new(std::io::ErrorKind::Interrupted, "SIGINT"));
281                    }
282                }
283
284                // Otherwise check for file notifications.
285                recv(self.rx_file_notifs) -> res => {
286                    return match res {
287                        Ok(Ok(event)) => match event.kind {
288                            notify::EventKind::Remove(_) => Err(std::io::Error::new(
289                                std::io::ErrorKind::NotFound,
290                                "file removed",
291                            )),
292                            _ => Ok(0),
293                        },
294                        Ok(Err(err)) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
295                        Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
296                    }
297                }
298            }
299        }
300    }
301}
302
#[cfg(test)]
mod tests {
    use re_build_info::CrateVersion;
    use re_chunk::RowId;
    use re_log_encoding::encoder::DroppableEncoder;
    use re_log_types::{
        ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource,
    };

    use super::*;

    /// Best-effort removal of the file at `path` when dropped (errors are ignored).
    struct DeleteOnDrop {
        path: std::path::PathBuf,
    }

    impl Drop for DeleteOnDrop {
        fn drop(&mut self) {
            std::fs::remove_file(&self.path).ok();
        }
    }

    #[test]
    fn test_loading_with_retryable_reader() {
        // We can't use `tempfile` here since it deletes the file on drop and we want to keep it
        // around for a bit longer.
        let rrd_file_path = std::path::PathBuf::from("testfile.rrd");
        let rrd_file_delete_guard = DeleteOnDrop {
            path: rrd_file_path.clone(),
        };
        // Remove the file just in case a previous test run crashed hard and left it behind.
        std::fs::remove_file(&rrd_file_path).ok();
        let rrd_file = std::fs::OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&rrd_file_path)
            .unwrap();

        let mut encoder = DroppableEncoder::new(
            CrateVersion::LOCAL,
            re_log_encoding::EncodingOptions::PROTOBUF_UNCOMPRESSED,
            rrd_file,
        )
        .unwrap();

        /// Builds a fresh `SetStoreInfo` message with a random recording ID.
        fn new_message() -> LogMsg {
            LogMsg::SetStoreInfo(SetStoreInfo {
                row_id: *RowId::new(),
                info: StoreInfo {
                    application_id: ApplicationId("test".to_owned()),
                    store_id: StoreId::random(StoreKind::Recording),
                    cloned_from: None,
                    store_source: StoreSource::RustSdk {
                        rustc_version: String::new(),
                        llvm_version: String::new(),
                    },
                    store_version: Some(CrateVersion::LOCAL),
                },
            })
        }

        let messages = (0..5).map(|_| new_message()).collect::<Vec<_>>();

        for m in &messages {
            encoder.append(m).expect("failed to append message");
        }
        encoder.flush_blocking().expect("failed to flush messages");

        let reader = RetryableFileReader::new(&rrd_file_path).unwrap();
        let mut decoder = Decoder::new(reader).unwrap();

        // We should be able to read the 5 messages that we wrote.
        let decoded_messages = (0..5)
            .map(|_| decoder.next().unwrap().unwrap())
            .collect::<Vec<_>>();
        assert_eq!(messages, decoded_messages);

        // As we're using a retryable reader, we should also be able to read the messages that
        // we're now going to append.
        let decoder_handle = std::thread::Builder::new()
            .name("background decoder".into())
            .spawn(move || {
                let mut remaining = Vec::new();
                for msg in decoder {
                    let msg = msg.unwrap();
                    remaining.push(msg);
                }

                remaining
            })
            .unwrap();

        // Append more messages to the file.
        let more_messages = (0..100).map(|_| new_message()).collect::<Vec<_>>();
        for m in &more_messages {
            encoder.append(m).unwrap();
        }
        // Close the encoder, and thus the file, to make sure that the data is actually written out.
        // Otherwise we can't be sure that the file watcher will ever see those changes.
        // A simple flush works sometimes, but is not as reliable as closing the file, since the
        // OS may still cache the data. (In fact we can't be sure that closing is enough either,
        // but it's the best we can do.)
        // Note that this test is not entirely representative of the real use case of having the
        // reader and writer in different processes, since file read/write visibility across
        // processes may behave differently.
        encoder.finish().expect("failed to finish encoder");
        drop(encoder);

        let remaining_messages = decoder_handle.join().unwrap();
        assert_eq!(more_messages, remaining_messages);

        // Keep the guard alive until the very end of the test: the file must not be deleted
        // while the background decoder may still be reading it.
        drop(rrd_file_delete_guard);
    }
}