re_types_core/
loggable_batch.rs

1use std::borrow::Cow;
2
3use crate::{
4    ArchetypeFieldName, ArchetypeName, Component, ComponentDescriptor, ComponentName, Loggable,
5    SerializationResult,
6};
7
8use arrow::array::ListArray as ArrowListArray;
9
10#[allow(unused_imports)] // used in docstrings
11use crate::Archetype;
12
13// ---
14
/// A [`LoggableBatch`] represents an array's worth of [`Loggable`] instances, ready to be
/// serialized.
///
/// [`LoggableBatch`] is carefully designed to be erasable ("object-safe"), so that it is possible
/// to build heterogeneous collections of [`LoggableBatch`]s (e.g. `Vec<Box<dyn LoggableBatch>>`).
/// This erasability is what makes extending [`Archetype`]s possible with little effort.
///
/// You should almost never need to implement [`LoggableBatch`] manually, as it is already
/// blanket implemented for most common use cases (arrays/vectors/slices of loggables, etc).
pub trait LoggableBatch {
    // NOTE: It'd be tempting to have the following associated type, but that'd be
    // counterproductive, the whole point of this is to allow for heterogeneous collections!
    // type Loggable: Loggable;

    /// Serializes the batch into an Arrow array.
    ///
    /// The blanket implementations in this file forward to [`Loggable::to_arrow`] or
    /// [`Loggable::to_arrow_opt`] and return whatever result those produce.
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef>;
}
32
33#[allow(dead_code)]
34fn assert_loggablebatch_object_safe() {
35    let _: &dyn LoggableBatch;
36}
37
38/// A [`ComponentBatch`] represents an array's worth of [`Component`] instances.
39pub trait ComponentBatch: LoggableBatch {
40    /// Serializes the batch into an Arrow list array with a single component per list.
41    fn to_arrow_list_array(&self) -> SerializationResult<ArrowListArray> {
42        let array = self.to_arrow()?;
43        let offsets =
44            arrow::buffer::OffsetBuffer::from_lengths(std::iter::repeat(1).take(array.len()));
45        let nullable = true;
46        let field = arrow::datatypes::Field::new("item", array.data_type().clone(), nullable);
47        ArrowListArray::try_new(field.into(), offsets, array, None).map_err(|err| err.into())
48    }
49
50    /// Returns the complete [`ComponentDescriptor`] for this [`ComponentBatch`].
51    ///
52    /// Every component batch is uniquely identified by its [`ComponentDescriptor`].
53    fn descriptor(&self) -> Cow<'_, ComponentDescriptor>;
54
55    /// Serializes the contents of this [`ComponentBatch`].
56    ///
57    /// Once serialized, the data is ready to be logged into Rerun via the [`AsComponents`] trait.
58    ///
59    /// # Fallibility
60    ///
61    /// There are very few ways in which serialization can fail, all of which are very rare to hit
62    /// in practice.
63    /// One such example is trying to serialize data with more than 2^31 elements into a `ListArray`.
64    ///
65    /// For that reason, this method favors a nice user experience over error handling: errors will
66    /// merely be logged, not returned (except in debug builds, where all errors panic).
67    ///
68    /// See also [`ComponentBatch::try_serialized`].
69    ///
70    /// [`AsComponents`]: [crate::AsComponents]
71    #[inline]
72    fn serialized(&self) -> Option<SerializedComponentBatch> {
73        match self.try_serialized() {
74            Ok(array) => Some(array),
75
76            #[cfg(debug_assertions)]
77            Err(err) => {
78                panic!(
79                    "failed to serialize data for {}: {}",
80                    self.descriptor(),
81                    re_error::format_ref(&err)
82                )
83            }
84
85            #[cfg(not(debug_assertions))]
86            Err(err) => {
87                re_log::error!(
88                    descriptor = %self.descriptor(),
89                    "failed to serialize data: {}",
90                    re_error::format_ref(&err)
91                );
92                None
93            }
94        }
95    }
96
97    /// Serializes the contents of this [`ComponentBatch`].
98    ///
99    /// Once serialized, the data is ready to be logged into Rerun via the [`AsComponents`] trait.
100    ///
101    /// # Fallibility
102    ///
103    /// There are very few ways in which serialization can fail, all of which are very rare to hit
104    /// in practice.
105    ///
106    /// For that reason, it generally makes sense to favor a nice user experience over error handling
107    /// in most cases, see [`ComponentBatch::serialized`].
108    ///
109    /// [`AsComponents`]: [crate::AsComponents]
110    #[inline]
111    fn try_serialized(&self) -> SerializationResult<SerializedComponentBatch> {
112        Ok(SerializedComponentBatch {
113            array: self.to_arrow()?,
114            descriptor: self.descriptor().into_owned(),
115        })
116    }
117
118    /// The fully-qualified name of this component batch, e.g. `rerun.components.Position2D`.
119    ///
120    /// This is a trivial but useful helper for `self.descriptor().component_name`.
121    ///
122    /// The default implementation already does the right thing. Do not override unless you know
123    /// what you're doing.
124    /// `Self::name()` must exactly match the value returned by `self.descriptor().component_name`,
125    /// or undefined behavior ensues.
126    #[inline]
127    fn name(&self) -> ComponentName {
128        self.descriptor().component_name
129    }
130}
131
132#[allow(dead_code)]
133fn assert_component_batch_object_safe() {
134    let _: &dyn LoggableBatch;
135}
136
137// ---
138
/// The serialized contents of a [`ComponentBatch`] with associated [`ComponentDescriptor`].
///
/// This is what gets logged into Rerun:
/// * See [`ComponentBatch`] to easily serialize component data.
/// * See [`AsComponents`] for logging serialized data.
///
/// [`AsComponents`]: crate::AsComponents
#[derive(Debug, Clone)]
pub struct SerializedComponentBatch {
    /// The serialized component data, as an Arrow array.
    pub array: arrow::array::ArrayRef,

    // TODO(cmc): Maybe Cow<> this one if it grows bigger. Or intern descriptors altogether, most likely.
    /// The descriptor identifying the component data stored in `array`.
    pub descriptor: ComponentDescriptor,
}
153
154impl re_byte_size::SizeBytes for SerializedComponentBatch {
155    #[inline]
156    fn heap_size_bytes(&self) -> u64 {
157        let Self { array, descriptor } = self;
158        array.heap_size_bytes() + descriptor.heap_size_bytes()
159    }
160}
161
162impl PartialEq for SerializedComponentBatch {
163    #[inline]
164    fn eq(&self, other: &Self) -> bool {
165        let Self { array, descriptor } = self;
166
167        // Descriptor first!
168        *descriptor == other.descriptor && **array == *other.array
169    }
170}
171
172impl SerializedComponentBatch {
173    #[inline]
174    pub fn new(array: arrow::array::ArrayRef, descriptor: ComponentDescriptor) -> Self {
175        Self { array, descriptor }
176    }
177
178    #[inline]
179    pub fn with_descriptor_override(self, descriptor: ComponentDescriptor) -> Self {
180        Self { descriptor, ..self }
181    }
182
183    /// Unconditionally sets the descriptor's `archetype_name` to the given one.
184    #[inline]
185    pub fn with_archetype_name(mut self, archetype_name: ArchetypeName) -> Self {
186        self.descriptor = self.descriptor.with_archetype_name(archetype_name);
187        self
188    }
189
190    /// Unconditionally sets the descriptor's `archetype_field_name` to the given one.
191    #[inline]
192    pub fn with_archetype_field_name(mut self, archetype_field_name: ArchetypeFieldName) -> Self {
193        self.descriptor = self
194            .descriptor
195            .with_archetype_field_name(archetype_field_name);
196        self
197    }
198
199    /// Sets the descriptor's `archetype_name` to the given one iff it's not already set.
200    #[inline]
201    pub fn or_with_archetype_name(mut self, archetype_name: impl Fn() -> ArchetypeName) -> Self {
202        self.descriptor = self.descriptor.or_with_archetype_name(archetype_name);
203        self
204    }
205
206    /// Sets the descriptor's `archetype_field_name` to the given one iff it's not already set.
207    #[inline]
208    pub fn or_with_archetype_field_name(
209        mut self,
210        archetype_field_name: impl FnOnce() -> ArchetypeFieldName,
211    ) -> Self {
212        self.descriptor = self
213            .descriptor
214            .or_with_archetype_field_name(archetype_field_name);
215        self
216    }
217}
218
/// A column's worth of component data.
///
/// If a [`SerializedComponentBatch`] represents one row's worth of data, then a
/// [`SerializedComponentColumn`] represents several rows' worth of data, stored as the entries
/// of an Arrow list array.
#[derive(Debug, Clone)]
pub struct SerializedComponentColumn {
    /// The component data, partitioned into sub-batches (one list entry per sub-batch).
    pub list_array: arrow::array::ListArray,

    // TODO(cmc): Maybe Cow<> this one if it grows bigger. Or intern descriptors altogether, most likely.
    /// The descriptor identifying the component data stored in `list_array`.
    pub descriptor: ComponentDescriptor,
}
229
230impl SerializedComponentColumn {
231    /// Repartitions the component data into multiple sub-batches, ignoring the previous partitioning.
232    ///
233    /// The specified `lengths` must sum to the total length of the component batch.
234    pub fn repartitioned(
235        self,
236        lengths: impl IntoIterator<Item = usize>,
237    ) -> SerializationResult<Self> {
238        let Self {
239            list_array,
240            descriptor,
241        } = self;
242
243        let list_array = re_arrow_util::arrow_util::repartition_list_array(list_array, lengths)?;
244
245        Ok(Self {
246            list_array,
247            descriptor,
248        })
249    }
250}
251
252impl From<SerializedComponentBatch> for SerializedComponentColumn {
253    #[inline]
254    fn from(batch: SerializedComponentBatch) -> Self {
255        use arrow::{
256            array::{Array, ListArray},
257            buffer::OffsetBuffer,
258            datatypes::Field,
259        };
260
261        let list_array = {
262            let nullable = true;
263            let field = Field::new_list_field(batch.array.data_type().clone(), nullable);
264            let offsets = OffsetBuffer::from_lengths(std::iter::once(batch.array.len()));
265            let nulls = None;
266            ListArray::new(field.into(), offsets, batch.array, nulls)
267        };
268
269        Self {
270            list_array,
271            descriptor: batch.descriptor,
272        }
273    }
274}
275
276impl SerializedComponentBatch {
277    /// Partitions the component data into multiple sub-batches.
278    ///
279    /// Specifically, this transforms the existing [`SerializedComponentBatch`] data into a [`SerializedComponentColumn`].
280    ///
281    /// This makes it possible to use `RecordingStream::send_columns` to send columnar data directly into Rerun.
282    ///
283    /// The specified `lengths` must sum to the total length of the component batch.
284    #[inline]
285    pub fn partitioned(
286        self,
287        lengths: impl IntoIterator<Item = usize>,
288    ) -> SerializationResult<SerializedComponentColumn> {
289        let column: SerializedComponentColumn = self.into();
290        column.repartitioned(lengths)
291    }
292}
293
294// ---
295
296// TODO(cmc): This is far from ideal and feels very hackish, but for now the priority is getting
297// all things related to tags up and running so we can gather learnings.
298// This is only used on the archetype deserialization path, which isn't ever used outside of tests anyway.
299
300// TODO(cmc): we really shouldn't be duplicating these.
301
/// The key used to identify the [`crate::ArchetypeName`] in field-level metadata.
///
/// Written by the `From<&SerializedComponentBatch>` conversion into an Arrow field whenever the
/// descriptor carries an archetype name.
const FIELD_METADATA_KEY_ARCHETYPE_NAME: &str = "rerun.archetype_name";

/// The key used to identify the [`crate::ArchetypeFieldName`] in field-level metadata.
///
/// Written by the `From<&SerializedComponentBatch>` conversion into an Arrow field whenever the
/// descriptor carries an archetype field name.
const FIELD_METADATA_KEY_ARCHETYPE_FIELD_NAME: &str = "rerun.archetype_field_name";
307
308impl From<&SerializedComponentBatch> for arrow::datatypes::Field {
309    #[inline]
310    fn from(batch: &SerializedComponentBatch) -> Self {
311        Self::new(
312            batch.descriptor.component_name.to_string(),
313            batch.array.data_type().clone(),
314            false,
315        )
316        .with_metadata(
317            [
318                batch.descriptor.archetype_name.map(|name| {
319                    (
320                        FIELD_METADATA_KEY_ARCHETYPE_NAME.to_owned(),
321                        name.to_string(),
322                    )
323                }),
324                batch.descriptor.archetype_field_name.map(|name| {
325                    (
326                        FIELD_METADATA_KEY_ARCHETYPE_FIELD_NAME.to_owned(),
327                        name.to_string(),
328                    )
329                }),
330            ]
331            .into_iter()
332            .flatten()
333            .collect(),
334        )
335    }
336}
337
338// --- Unary ---
339
340impl<L: Clone + Loggable> LoggableBatch for L {
341    #[inline]
342    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
343        L::to_arrow([std::borrow::Cow::Borrowed(self)])
344    }
345}
346
347impl<C: Component> ComponentBatch for C {
348    #[inline]
349    fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
350        C::descriptor().into()
351    }
352}
353
354// --- Unary Option ---
355
356impl<L: Clone + Loggable> LoggableBatch for Option<L> {
357    #[inline]
358    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
359        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
360    }
361}
362
363impl<C: Component> ComponentBatch for Option<C> {
364    #[inline]
365    fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
366        C::descriptor().into()
367    }
368}
369
370// --- Vec ---
371
372impl<L: Clone + Loggable> LoggableBatch for Vec<L> {
373    #[inline]
374    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
375        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
376    }
377}
378
379impl<C: Component> ComponentBatch for Vec<C> {
380    #[inline]
381    fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
382        C::descriptor().into()
383    }
384}
385
386// --- Vec<Option> ---
387
388impl<L: Loggable> LoggableBatch for Vec<Option<L>> {
389    #[inline]
390    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
391        L::to_arrow_opt(
392            self.iter()
393                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
394        )
395    }
396}
397
398impl<C: Component> ComponentBatch for Vec<Option<C>> {
399    #[inline]
400    fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
401        C::descriptor().into()
402    }
403}
404
405// --- Array ---
406
407impl<L: Loggable, const N: usize> LoggableBatch for [L; N] {
408    #[inline]
409    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
410        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
411    }
412}
413
414impl<C: Component, const N: usize> ComponentBatch for [C; N] {
415    #[inline]
416    fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
417        C::descriptor().into()
418    }
419}
420
421// --- Array<Option> ---
422
423impl<L: Loggable, const N: usize> LoggableBatch for [Option<L>; N] {
424    #[inline]
425    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
426        L::to_arrow_opt(
427            self.iter()
428                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
429        )
430    }
431}
432
433impl<C: Component, const N: usize> ComponentBatch for [Option<C>; N] {
434    #[inline]
435    fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
436        C::descriptor().into()
437    }
438}
439
440// --- Slice ---
441
442impl<L: Loggable> LoggableBatch for [L] {
443    #[inline]
444    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
445        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
446    }
447}
448
449impl<C: Component> ComponentBatch for [C] {
450    #[inline]
451    fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
452        C::descriptor().into()
453    }
454}
455
456// --- Slice<Option> ---
457
458impl<L: Loggable> LoggableBatch for [Option<L>] {
459    #[inline]
460    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
461        L::to_arrow_opt(
462            self.iter()
463                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
464        )
465    }
466}
467
468impl<C: Component> ComponentBatch for [Option<C>] {
469    #[inline]
470    fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
471        C::descriptor().into()
472    }
473}