1use re_log_encoding::decoder::Decoder;
2
3#[cfg(not(target_arch = "wasm32"))]
4use crossbeam::channel::Receiver;
5use re_log_types::{ApplicationId, StoreId};
6
7use crate::{DataLoader as _, LoadedData};
8
/// Data loader for `.rrd` and `.rbl` files, either read from a path on disk or decoded from
/// in-memory file contents.
pub struct RrdLoader;
13
14impl crate::DataLoader for RrdLoader {
15 #[inline]
16 fn name(&self) -> String {
17 "rerun.data_loaders.Rrd".into()
18 }
19
20 #[cfg(not(target_arch = "wasm32"))]
21 fn load_from_path(
22 &self,
23 settings: &crate::DataLoaderSettings,
24 filepath: std::path::PathBuf,
25 tx: std::sync::mpsc::Sender<crate::LoadedData>,
26 ) -> Result<(), crate::DataLoaderError> {
27 use anyhow::Context as _;
28
29 re_tracing::profile_function!(filepath.display().to_string());
30
31 let extension = crate::extension(&filepath);
32 if !matches!(extension.as_str(), "rbl" | "rrd") {
33 return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
35 }
36
37 re_log::debug!(
38 ?filepath,
39 loader = self.name(),
40 "Loading rrd data from filesystem…",
41 );
42
43 match extension.as_str() {
44 "rbl" => {
45 let file = std::fs::File::open(&filepath)
50 .with_context(|| format!("Failed to open file {filepath:?}"))?;
51 let file = std::io::BufReader::new(file);
52
53 let decoder = Decoder::new(file)?;
54
55 std::thread::Builder::new()
57 .name(format!("decode_and_stream({filepath:?})"))
58 .spawn({
59 let filepath = filepath.clone();
60 let settings = settings.clone();
61 move || {
62 decode_and_stream(
63 &filepath,
64 &tx,
65 decoder,
66 settings.opened_application_id.as_ref(),
67 None,
69 );
70 }
71 })
72 .with_context(|| format!("Failed to spawn IO thread for {filepath:?}"))?;
73 }
74
75 "rrd" => {
76 let retryable_reader = RetryableFileReader::new(&filepath).with_context(|| {
79 format!("failed to create retryable file reader for {filepath:?}")
80 })?;
81 let decoder = Decoder::new(retryable_reader)?;
82
83 std::thread::Builder::new()
85 .name(format!("decode_and_stream({filepath:?})"))
86 .spawn({
87 let filepath = filepath.clone();
88 move || {
89 decode_and_stream(
90 &filepath, &tx, decoder,
91 None, None,
93 );
94 }
95 })
96 .with_context(|| format!("Failed to spawn IO thread for {filepath:?}"))?;
97 }
98 _ => unreachable!(),
99 }
100
101 Ok(())
102 }
103
104 fn load_from_file_contents(
105 &self,
106 settings: &crate::DataLoaderSettings,
107 filepath: std::path::PathBuf,
108 contents: std::borrow::Cow<'_, [u8]>,
109 tx: std::sync::mpsc::Sender<crate::LoadedData>,
110 ) -> Result<(), crate::DataLoaderError> {
111 re_tracing::profile_function!(filepath.display().to_string());
112
113 let extension = crate::extension(&filepath);
114 if !matches!(extension.as_str(), "rbl" | "rrd") {
115 return Err(crate::DataLoaderError::Incompatible(filepath));
117 }
118
119 let contents = std::io::Cursor::new(contents);
120 let decoder = match re_log_encoding::decoder::Decoder::new(contents) {
121 Ok(decoder) => decoder,
122 Err(err) => match err {
123 re_log_encoding::decoder::DecodeError::NotAnRrd
125 | re_log_encoding::decoder::DecodeError::Options(_) => return Ok(()),
126 _ => return Err(err.into()),
127 },
128 };
129
130 let forced_application_id = if extension == "rbl" {
133 settings.opened_application_id.as_ref()
134 } else {
135 None
136 };
137 let forced_recording_id = None;
138
139 decode_and_stream(
140 &filepath,
141 &tx,
142 decoder,
143 forced_application_id,
144 forced_recording_id,
145 );
146
147 Ok(())
148 }
149}
150
151fn decode_and_stream<R: std::io::Read>(
152 filepath: &std::path::Path,
153 tx: &std::sync::mpsc::Sender<crate::LoadedData>,
154 decoder: Decoder<R>,
155 forced_application_id: Option<&ApplicationId>,
156 forced_store_id: Option<&StoreId>,
157) {
158 re_tracing::profile_function!(filepath.display().to_string());
159
160 for msg in decoder {
161 let msg = match msg {
162 Ok(msg) => msg,
163 Err(err) => {
164 re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
165 continue;
166 }
167 };
168
169 let msg = if forced_application_id.is_some() || forced_store_id.is_some() {
170 match msg {
171 re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
172 re_log_types::LogMsg::SetStoreInfo(re_log_types::SetStoreInfo {
173 info: re_log_types::StoreInfo {
174 application_id: forced_application_id
175 .cloned()
176 .unwrap_or(set_store_info.info.application_id),
177 store_id: forced_store_id
178 .cloned()
179 .unwrap_or(set_store_info.info.store_id),
180 ..set_store_info.info
181 },
182 ..set_store_info
183 })
184 }
185
186 re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
187 re_log_types::LogMsg::ArrowMsg(
188 forced_store_id.cloned().unwrap_or(store_id),
189 arrow_msg,
190 )
191 }
192
193 re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
194 re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command)
195 }
196 }
197 } else {
198 msg
199 };
200
201 let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg);
202 if tx.send(data).is_err() {
203 break; }
205 }
206}
207
/// File reader that, upon reaching the end of the file, waits for the file to change and then
/// retries, instead of reporting end-of-file (see its `std::io::Read` implementation).
#[cfg(not(target_arch = "wasm32"))]
struct RetryableFileReader {
    /// Buffered handle to the file being read.
    reader: std::io::BufReader<std::fs::File>,

    /// Receiving end of the channel handed to `watcher`; yields file-system notifications.
    rx_file_notifs: Receiver<notify::Result<notify::Event>>,

    /// Periodic tick, used while blocked to check whether a SIGINT has been caught.
    rx_ticker: Receiver<std::time::Instant>,

    // Never read after construction. Presumably it is stored only to keep the watcher (and
    // thereby the notifications on `rx_file_notifs`) alive as long as the reader; confirm
    // against the `notify` documentation.
    #[allow(dead_code)]
    watcher: notify::RecommendedWatcher,
}
219
220#[cfg(not(target_arch = "wasm32"))]
221impl RetryableFileReader {
222 fn new(filepath: &std::path::Path) -> Result<Self, crate::DataLoaderError> {
223 use anyhow::Context as _;
224 use notify::{RecursiveMode, Watcher as _};
225
226 let file = std::fs::File::open(filepath)
227 .with_context(|| format!("Failed to open file {filepath:?}"))?;
228 let reader = std::io::BufReader::new(file);
229
230 #[cfg(not(any(target_os = "windows", target_arch = "wasm32")))]
231 re_crash_handler::sigint::track_sigint();
232
233 let rx_ticker = crossbeam::channel::tick(std::time::Duration::from_millis(50));
236
237 let (tx_file_notifs, rx_file_notifs) = crossbeam::channel::unbounded();
238 let mut watcher = notify::recommended_watcher(tx_file_notifs)
239 .with_context(|| format!("failed to create file watcher for {filepath:?}"))?;
240
241 watcher
242 .watch(filepath, RecursiveMode::NonRecursive)
243 .with_context(|| format!("failed to watch file changes on {filepath:?}"))?;
244
245 Ok(Self {
246 reader,
247 rx_file_notifs,
248 rx_ticker,
249 watcher,
250 })
251 }
252}
253
254#[cfg(not(target_arch = "wasm32"))]
255impl std::io::Read for RetryableFileReader {
256 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
257 loop {
258 match self.reader.read(buf) {
259 Ok(0) => self.block_until_file_changes()?,
260 Ok(n) => {
261 return Ok(n);
262 }
263 Err(err) => match err.kind() {
264 std::io::ErrorKind::Interrupted => continue,
265 _ => return Err(err),
266 },
267 };
268 }
269 }
270}
271
272#[cfg(not(target_arch = "wasm32"))]
273impl RetryableFileReader {
274 fn block_until_file_changes(&self) -> std::io::Result<usize> {
275 loop {
276 crossbeam::select! {
277 recv(self.rx_ticker) -> _ => {
279 if re_crash_handler::sigint::was_sigint_ever_caught() {
280 return Err(std::io::Error::new(std::io::ErrorKind::Interrupted, "SIGINT"));
281 }
282 }
283
284 recv(self.rx_file_notifs) -> res => {
286 return match res {
287 Ok(Ok(event)) => match event.kind {
288 notify::EventKind::Remove(_) => Err(std::io::Error::new(
289 std::io::ErrorKind::NotFound,
290 "file removed",
291 )),
292 _ => Ok(0),
293 },
294 Ok(Err(err)) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
295 Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
296 }
297 }
298 }
299 }
300 }
301}
302
#[cfg(test)]
mod tests {
    use re_build_info::CrateVersion;
    use re_chunk::RowId;
    use re_log_encoding::encoder::DroppableEncoder;
    use re_log_types::{
        ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource,
    };

    use super::*;

    /// Deletes the file at `path` when dropped; a failed deletion is ignored.
    struct DeleteOnDrop {
        path: std::path::PathBuf,
    }

    impl Drop for DeleteOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Writes messages to a file, reads them back through a `RetryableFileReader`, then appends
    /// more messages and checks that a decoder running in the background picks those up too.
    #[test]
    fn test_loading_with_retryable_reader() {
        /// Builds a `SetStoreInfo` message with a fresh row id and a random recording store id.
        fn new_message() -> LogMsg {
            let info = StoreInfo {
                application_id: ApplicationId("test".to_owned()),
                store_id: StoreId::random(StoreKind::Recording),
                cloned_from: None,
                store_source: StoreSource::RustSdk {
                    rustc_version: String::new(),
                    llvm_version: String::new(),
                },
                store_version: Some(CrateVersion::LOCAL),
            };
            LogMsg::SetStoreInfo(SetStoreInfo {
                row_id: *RowId::new(),
                info,
            })
        }

        let rrd_file_path = std::path::PathBuf::from("testfile.rrd");
        let rrd_file_delete_guard = DeleteOnDrop {
            path: rrd_file_path.clone(),
        };

        // `create_new` fails if the file already exists, so remove any leftover file first.
        std::fs::remove_file(&rrd_file_path).ok();
        let rrd_file = std::fs::OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&rrd_file_path)
            .unwrap();

        let mut encoder = DroppableEncoder::new(
            CrateVersion::LOCAL,
            re_log_encoding::EncodingOptions::PROTOBUF_UNCOMPRESSED,
            rrd_file,
        )
        .unwrap();

        // First batch: written and flushed before the reader is even created.
        let messages: Vec<LogMsg> = std::iter::repeat_with(new_message).take(5).collect();
        for message in &messages {
            encoder.append(message).expect("failed to append message");
        }
        encoder.flush_blocking().expect("failed to flush messages");

        let reader = RetryableFileReader::new(&rrd_file_path).unwrap();
        let mut decoder = Decoder::new(reader).unwrap();

        // Exactly the messages of the first batch must be readable right away.
        let mut decoded_messages = Vec::with_capacity(messages.len());
        for _ in 0..messages.len() {
            decoded_messages.push(decoder.next().unwrap().unwrap());
        }
        assert_eq!(messages, decoded_messages);

        // Everything else is decoded in the background while more messages get appended.
        let decoder_handle = std::thread::Builder::new()
            .name("background decoder".into())
            .spawn(move || decoder.map(|msg| msg.unwrap()).collect::<Vec<_>>())
            .unwrap();

        // Second batch: appended while the background decoder is already running.
        let more_messages: Vec<LogMsg> = std::iter::repeat_with(new_message).take(100).collect();
        for message in &more_messages {
            encoder.append(message).unwrap();
        }
        encoder.finish().expect("failed to finish encoder");
        drop(encoder);

        let remaining_messages = decoder_handle.join().unwrap();
        assert_eq!(more_messages, remaining_messages);

        // Dropped explicitly, so the file is guaranteed to exist until this point.
        drop(rrd_file_delete_guard);
    }
}