re_log_encoding/
transport_to_app.rs

//! Everything needed to convert back and forth between transport-level and application-level types.
//!
//! ⚠️ Make sure to familiarize yourself with the [crate-level docs](crate) first. ⚠️
//!
//! This is where all the complex application-level logic that precedes encoding / follows decoding
//! happens: Chunk/Sorbet migrations, data patching (application ID injection, version propagation,
//! backward-compatibility hacks, etc.).
//!
//! To go from a freshly decoded transport-level type to its application-level equivalent, use [`ToApplication`].
//! To prepare an application-level type for encoding, use [`ToTransport`].
11
12use re_build_info::CrateVersion;
13use re_log_types::{BlueprintActivationCommand, SetStoreInfo};
14
15use crate::ApplicationIdInjector;
16use crate::rrd::CodecError;
17
18// TODO(cmc): I'd really like a nice centralized way of communicating this.
19//
20// pub type LogMsgTransport = re_protos::log_msg::v1alpha1::log_msg::Msg;
21// pub type LogMsgApp = re_log_types::LogMsg;
22
23// ---
24
25/// Converts an application-level type to a transport-level type, ready for encoding.
26pub trait ToTransport {
27    type Output;
28    type Context<'a>;
29
30    fn to_transport(&self, context: Self::Context<'_>) -> Result<Self::Output, CodecError>;
31}
32
33impl ToTransport for re_log_types::LogMsg {
34    type Output = re_protos::log_msg::v1alpha1::log_msg::Msg;
35    type Context<'a> = crate::rrd::Compression;
36
37    fn to_transport(&self, compression: Self::Context<'_>) -> Result<Self::Output, CodecError> {
38        log_msg_app_to_transport(self, compression)
39    }
40}
41
42impl ToTransport for re_log_types::ArrowMsg {
43    type Output = re_protos::log_msg::v1alpha1::ArrowMsg;
44    type Context<'a> = (re_log_types::StoreId, crate::rrd::Compression);
45
46    fn to_transport(
47        &self,
48        (store_id, compression): Self::Context<'_>,
49    ) -> Result<Self::Output, CodecError> {
50        arrow_msg_app_to_transport(self, store_id, compression)
51    }
52}
53
54impl ToTransport for crate::RrdFooter {
55    type Output = re_protos::log_msg::v1alpha1::RrdFooter;
56    type Context<'a> = ();
57
58    fn to_transport(&self, _: Self::Context<'_>) -> Result<Self::Output, CodecError> {
59        let manifests: Result<Vec<_>, _> = self
60            .manifests
61            .values()
62            .map(|manifest| manifest.to_transport(()))
63            .collect();
64
65        Ok(Self::Output {
66            manifests: manifests?,
67        })
68    }
69}
70
71impl ToTransport for crate::RrdManifest {
72    type Output = re_protos::log_msg::v1alpha1::RrdManifest;
73    type Context<'a> = ();
74
75    fn to_transport(&self, (): Self::Context<'_>) -> Result<Self::Output, CodecError> {
76        {
77            self.sanity_check_cheap()?;
78
79            // that will only work for tests local to this crate, but that's better than nothing.
80            #[cfg(test)]
81            self.sanity_check_heavy()?;
82        }
83
84        let sorbet_schema = re_protos::common::v1alpha1::Schema::try_from(&self.sorbet_schema)
85            .map_err(CodecError::ArrowSerialization)?;
86
87        Ok(Self::Output {
88            store_id: Some(self.store_id.clone().into()),
89            sorbet_schema_sha256: Some(self.sorbet_schema_sha256.to_vec().into()),
90            sorbet_schema: Some(sorbet_schema),
91            data: Some(self.data.clone().into()),
92        })
93    }
94}
95
96/// Converts a transport-level type to an application-level type, ready for use in the viewer.
97pub trait ToApplication {
98    type Output;
99    type Context<'a>;
100
101    fn to_application(&self, context: Self::Context<'_>) -> Result<Self::Output, CodecError>;
102}
103
104impl ToApplication for re_protos::log_msg::v1alpha1::log_msg::Msg {
105    type Output = re_log_types::LogMsg;
106    type Context<'a> = (&'a mut dyn ApplicationIdInjector, Option<CrateVersion>);
107
108    fn to_application(
109        &self,
110        (app_id_injector, patched_version): Self::Context<'_>,
111    ) -> Result<Self::Output, CodecError> {
112        let mut log_msg = log_msg_transport_to_app(app_id_injector, self)?;
113
114        if let Some(patched_version) = patched_version
115            && let re_log_types::LogMsg::SetStoreInfo(msg) = &mut log_msg
116        {
117            // In the context of a native RRD stream (files, stdio, etc), this is used to patch the
118            // version advertised by the application-level object so that it matches the one advertised
119            // in the stream header.
120            // This in turn is what makes it possible to display the version of the RRD file in the viewer.
121            msg.info.store_version = Some(patched_version);
122        }
123
124        Ok(log_msg)
125    }
126}
127
128impl ToApplication for re_protos::log_msg::v1alpha1::LogMsg {
129    type Output = re_log_types::LogMsg;
130    type Context<'a> = (&'a mut dyn ApplicationIdInjector, Option<CrateVersion>);
131
132    fn to_application(
133        &self,
134        app_id_injector: Self::Context<'_>,
135    ) -> Result<Self::Output, CodecError> {
136        let Some(msg) = self.msg.as_ref() else {
137            return Err(re_protos::missing_field!(Self, "msg").into());
138        };
139
140        msg.to_application(app_id_injector)
141    }
142}
143
144impl ToApplication for re_protos::log_msg::v1alpha1::ArrowMsg {
145    type Output = re_log_types::ArrowMsg;
146    type Context<'a> = ();
147
148    fn to_application(&self, _context: Self::Context<'_>) -> Result<Self::Output, CodecError> {
149        arrow_msg_transport_to_app(self)
150    }
151}
152
153impl ToApplication for re_protos::log_msg::v1alpha1::RrdFooter {
154    type Output = crate::RrdFooter;
155    type Context<'a> = ();
156
157    fn to_application(&self, _context: Self::Context<'_>) -> Result<Self::Output, CodecError> {
158        let manifests: Result<std::collections::HashMap<_, _>, _> = self
159            .manifests
160            .iter()
161            .map(|manifest| {
162                let manifest = manifest.to_application(())?;
163                Ok::<_, CodecError>((manifest.store_id.clone(), manifest))
164            })
165            .collect();
166
167        Ok(Self::Output {
168            manifests: manifests?,
169        })
170    }
171}
172
173impl ToApplication for re_protos::log_msg::v1alpha1::RrdManifest {
174    type Output = crate::RrdManifest;
175    type Context<'a> = ();
176
177    fn to_application(&self, _context: Self::Context<'_>) -> Result<Self::Output, CodecError> {
178        let store_id = self
179            .store_id
180            .as_ref()
181            .ok_or_else(|| re_protos::missing_field!(Self, "store_id"))?;
182
183        let sorbet_schema = self
184            .sorbet_schema
185            .as_ref()
186            .ok_or_else(|| re_protos::missing_field!(Self, "sorbet_schema"))?;
187
188        let sorbet_schema_sha256 = self
189            .sorbet_schema_sha256
190            .as_ref()
191            .ok_or_else(|| re_protos::missing_field!(Self, "sorbet_schema_sha256"))?;
192        let sorbet_schema_sha256: [u8; 32] = (**sorbet_schema_sha256)
193            .try_into()
194            .map_err(|err| re_protos::invalid_field!(Self, "sorbet_schema_sha256", err))?;
195
196        let data = self
197            .data
198            .as_ref()
199            .ok_or_else(|| re_protos::missing_field!(Self, "data"))?;
200
201        let rrd_manifest = Self::Output {
202            store_id: store_id.clone().try_into()?,
203            sorbet_schema: sorbet_schema
204                .try_into()
205                .map_err(CodecError::ArrowDeserialization)?,
206            sorbet_schema_sha256,
207            data: data.try_into()?,
208        };
209
210        {
211            rrd_manifest.sanity_check_cheap()?;
212
213            // that will only work for tests local to this crate, but that's better than nothing
214            #[cfg(test)]
215            rrd_manifest.sanity_check_heavy()?;
216        }
217
218        Ok(rrd_manifest)
219    }
220}
221
222// ---
223
224/// Converts a transport-level `LogMsg` to its application-level counterpart.
225///
226/// This function attempts to migrate legacy `StoreId` with missing application id. It will return
227/// [`CodecError::StoreIdMissingApplicationId`] if a message arrives before the matching
228/// `SetStoreInfo` message.
229///
230/// The provided [`ApplicationIdInjector`] must be shared across all calls for the same stream.
231#[tracing::instrument(level = "trace", skip_all)]
232fn log_msg_transport_to_app<I: ApplicationIdInjector + ?Sized>(
233    app_id_injector: &mut I,
234    message: &re_protos::log_msg::v1alpha1::log_msg::Msg,
235) -> Result<re_log_types::LogMsg, CodecError> {
236    re_tracing::profile_function!();
237
238    use re_protos::log_msg::v1alpha1::log_msg::Msg;
239    use re_protos::missing_field;
240
241    match message {
242        Msg::SetStoreInfo(set_store_info) => {
243            let set_store_info: SetStoreInfo = set_store_info.clone().try_into()?;
244            app_id_injector.store_info_received(&set_store_info.info);
245            Ok(re_log_types::LogMsg::SetStoreInfo(set_store_info))
246        }
247
248        Msg::ArrowMsg(arrow_msg) => {
249            let encoded = arrow_msg_transport_to_app(arrow_msg)?;
250
251            //TODO(#10730): clean that up when removing 0.24 back compat
252            let store_id: re_log_types::StoreId = match arrow_msg
253                .store_id
254                .as_ref()
255                .ok_or_else(|| missing_field!(re_protos::log_msg::v1alpha1::ArrowMsg, "store_id"))?
256                .clone()
257                .try_into()
258            {
259                Ok(store_id) => store_id,
260                Err(err) => {
261                    let Some(store_id) = app_id_injector.recover_store_id(err.clone()) else {
262                        return Err(err.into());
263                    };
264
265                    store_id
266                }
267            };
268
269            Ok(re_log_types::LogMsg::ArrowMsg(store_id, encoded))
270        }
271
272        Msg::BlueprintActivationCommand(blueprint_activation_command) => {
273            //TODO(#10730): clean that up when removing 0.24 back compat
274            let blueprint_id: re_log_types::StoreId = match blueprint_activation_command
275                .blueprint_id
276                .as_ref()
277                .ok_or_else(|| {
278                    missing_field!(
279                        re_protos::log_msg::v1alpha1::BlueprintActivationCommand,
280                        "blueprint_id"
281                    )
282                })?
283                .clone()
284                .try_into()
285            {
286                Ok(store_id) => store_id,
287                Err(err) => {
288                    let Some(store_id) = app_id_injector.recover_store_id(err.clone()) else {
289                        return Err(err.into());
290                    };
291
292                    store_id
293                }
294            };
295
296            Ok(re_log_types::LogMsg::BlueprintActivationCommand(
297                BlueprintActivationCommand {
298                    blueprint_id,
299                    make_active: blueprint_activation_command.make_active,
300                    make_default: blueprint_activation_command.make_default,
301                },
302            ))
303        }
304    }
305}
306
307/// Converts a transport-level `ArrowMsg` to its application-level counterpart.
308#[tracing::instrument(level = "trace", skip_all)]
309fn arrow_msg_transport_to_app(
310    arrow_msg: &re_protos::log_msg::v1alpha1::ArrowMsg,
311) -> Result<re_log_types::ArrowMsg, CodecError> {
312    re_tracing::profile_function!();
313
314    use re_protos::log_msg::v1alpha1::Encoding;
315
316    if arrow_msg.encoding() != Encoding::ArrowIpc {
317        return Err(CodecError::UnsupportedEncoding);
318    }
319
320    let batch = decode_arrow(
321        &arrow_msg.payload,
322        arrow_msg.uncompressed_size as usize,
323        arrow_msg.compression().into(),
324    )?;
325
326    let chunk_id = re_sorbet::chunk_id_of_schema(batch.schema_ref())?.as_tuid();
327
328    // TODO(grtlr): In the future, we should be able to rely on the `chunk_id` to be present in the
329    // protobuf definitions. For now we have to extract it from the `batch`.
330    //
331    // let chunk_id = arrow_msg
332    //     .chunk_id
333    //     .ok_or_else(|| missing_field!(re_protos::log_msg::v1alpha1::ArrowMsg, "chunk_id"))?
334    //     .try_from()?;
335
336    // This also ensures that we perform all required migrations from `re_sorbet`.
337    // TODO(#10343): Would it make sense to change `re_types_core::ArrowMsg` to contain the
338    // `ChunkBatch` directly?
339    let chunk_batch = re_sorbet::ChunkBatch::try_from(&batch)?;
340
341    // TODO(emilk): it would actually be nicer if we could postpone the migration,
342    // so that there is some way to get the original (unmigrated) data out of an .rrd,
343    // which would be very useful for debugging, e.g. using the `print` command.
344
345    Ok(re_log_types::ArrowMsg {
346        chunk_id,
347        batch: chunk_batch.into(),
348        on_release: None,
349    })
350}
351
352/// Converts an application-level `LogMsg` to its transport-level counterpart.
353#[tracing::instrument(level = "trace", skip_all)]
354fn log_msg_app_to_transport(
355    message: &re_log_types::LogMsg,
356    compression: crate::rrd::Compression,
357) -> Result<re_protos::log_msg::v1alpha1::log_msg::Msg, CodecError> {
358    re_tracing::profile_function!();
359
360    let proto_msg = match message {
361        re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
362            re_protos::log_msg::v1alpha1::log_msg::Msg::SetStoreInfo(set_store_info.clone().into())
363        }
364
365        re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
366            let arrow_msg = arrow_msg_app_to_transport(arrow_msg, store_id.clone(), compression)?;
367            re_protos::log_msg::v1alpha1::log_msg::Msg::ArrowMsg(arrow_msg)
368        }
369
370        re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
371            re_protos::log_msg::v1alpha1::log_msg::Msg::BlueprintActivationCommand(
372                blueprint_activation_command.clone().into(),
373            )
374        }
375    };
376
377    Ok(proto_msg)
378}
379
380/// Converts an application-level `ArrowMsg` to its transport-level counterpart.
381#[tracing::instrument(level = "trace", skip_all)]
382fn arrow_msg_app_to_transport(
383    arrow_msg: &re_log_types::ArrowMsg,
384    store_id: re_log_types::StoreId,
385    compression: crate::rrd::Compression,
386) -> Result<re_protos::log_msg::v1alpha1::ArrowMsg, CodecError> {
387    re_tracing::profile_function!();
388
389    let re_log_types::ArrowMsg {
390        chunk_id,
391        batch,
392        on_release: _,
393    } = arrow_msg;
394
395    let payload = encode_arrow(batch, compression)?;
396
397    Ok(re_protos::log_msg::v1alpha1::ArrowMsg {
398        store_id: Some(store_id.into()),
399        chunk_id: Some((*chunk_id).into()),
400        compression: re_protos::log_msg::v1alpha1::Compression::from(compression) as i32,
401        uncompressed_size: payload.uncompressed_size,
402        encoding: re_protos::log_msg::v1alpha1::Encoding::ArrowIpc as i32,
403        payload: payload.data.into(),
404        is_static: re_sorbet::is_static_chunk(batch),
405    })
406}
407
408// ---
409
/// An Arrow IPC payload produced by `encode_arrow`, ready to be placed in a transport message.
struct EncodedArrowRecordBatch {
    /// Length in bytes of the IPC stream *before* compression (equal to `data.len()` when
    /// compression is off).
    uncompressed_size: u64,

    /// The IPC stream bytes, LZ4-compressed if compression was requested.
    data: Vec<u8>,
}
414
415/// Encodes a native `RecordBatch` to an IPC payload, optionally compressed.
416#[tracing::instrument(level = "debug", skip_all)]
417fn encode_arrow(
418    batch: &arrow::array::RecordBatch,
419    compression: crate::rrd::Compression,
420) -> Result<EncodedArrowRecordBatch, CodecError> {
421    re_tracing::profile_function!();
422
423    let mut uncompressed = Vec::new();
424    {
425        let schema = batch.schema_ref().as_ref();
426
427        let mut sw = {
428            let _span = tracing::trace_span!("schema").entered();
429            ::arrow::ipc::writer::StreamWriter::try_new(&mut uncompressed, schema)
430                .map_err(CodecError::ArrowSerialization)?
431        };
432
433        {
434            let _span = tracing::trace_span!("data").entered();
435            sw.write(batch).map_err(CodecError::ArrowSerialization)?;
436        }
437
438        sw.finish().map_err(CodecError::ArrowSerialization)?;
439    }
440
441    // This will never fail until we have 128bit-native CPUs, but `as` is too dangerous and very
442    // sensitive to refactorings.
443    let uncompressed_size = uncompressed.len().try_into()?;
444
445    let data = match compression {
446        crate::rrd::Compression::Off => uncompressed,
447        crate::rrd::Compression::LZ4 => {
448            re_tracing::profile_scope!("lz4::compress");
449            let _span = tracing::trace_span!("lz4::compress").entered();
450            lz4_flex::block::compress(&uncompressed)
451        }
452    };
453
454    Ok(EncodedArrowRecordBatch {
455        uncompressed_size,
456        data,
457    })
458}
459
460/// Decodes a potentially compressed IPC payload into a native `RecordBatch`.
461//
462// TODO(cmc): can we use the File-oriented APIs in order to re-use the transport buffer as backing
463// storage for the final RecordBatch?
464// See e.g. https://github.com/apache/arrow-rs/blob/b8b2f21f6a8254224d37a1e2d231b6b1e1767648/arrow/examples/zero_copy_ipc.rs
465#[tracing::instrument(level = "debug", skip_all)]
466fn decode_arrow(
467    data: &[u8],
468    uncompressed_size: usize,
469    compression: crate::rrd::Compression,
470) -> Result<arrow::array::RecordBatch, CodecError> {
471    let mut uncompressed = Vec::new();
472    let data = match compression {
473        crate::rrd::Compression::Off => data,
474        crate::rrd::Compression::LZ4 => {
475            re_tracing::profile_scope!("LZ4-decompress");
476            let _span = tracing::trace_span!("lz4::decompress").entered();
477            uncompressed.resize(uncompressed_size, 0);
478            lz4_flex::block::decompress_into(data, &mut uncompressed)?;
479            uncompressed.as_slice()
480        }
481    };
482
483    let mut stream = {
484        let _span = tracing::trace_span!("schema").entered();
485        ::arrow::ipc::reader::StreamReader::try_new(data, None)
486            .map_err(CodecError::ArrowDeserialization)?
487    };
488
489    let _span = tracing::trace_span!("data").entered();
490    stream
491        .next()
492        .ok_or(CodecError::MissingRecordBatch)?
493        .map_err(CodecError::ArrowDeserialization)
494}