re_types_core/
component_batch.rs

1use crate::{ArchetypeName, ComponentDescriptor, ComponentType, Loggable, SerializationResult};
2
3use arrow::array::ListArray as ArrowListArray;
4
5#[allow(unused_imports, clippy::unused_trait_names)] // used in docstrings
6use crate::Archetype;
7
8// ---
9
10/// A [`ComponentBatch`] represents an array's worth of [`Loggable`] instances, ready to be
11/// serialized.
12///
13/// [`ComponentBatch`] is carefully designed to be erasable ("object-safe"), so that it is possible
14/// to build heterogeneous collections of [`ComponentBatch`]s (e.g. `Vec<dyn ComponentBatch>`).
15/// This erasability is what makes extending [`Archetype`]s possible with little effort.
16///
17/// You should almost never need to implement [`ComponentBatch`] manually, as it is already
18/// blanket implemented for most common use cases (arrays/vectors/slices of loggables, etc).
19pub trait ComponentBatch {
20    // NOTE: It'd be tempting to have the following associated type, but that'd be
21    // counterproductive, the whole point of this is to allow for heterogeneous collections!
22    // type Loggable: Loggable;
23
24    /// Serializes the batch into an Arrow array.
25    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef>;
26
27    /// Serializes the batch into an Arrow list array with a single component per list.
28    fn to_arrow_list_array(&self) -> SerializationResult<ArrowListArray> {
29        let array = self.to_arrow()?;
30        let offsets =
31            arrow::buffer::OffsetBuffer::from_lengths(std::iter::repeat_n(1, array.len()));
32        let nullable = true;
33        let field = arrow::datatypes::Field::new("item", array.data_type().clone(), nullable);
34        ArrowListArray::try_new(field.into(), offsets, array, None).map_err(|err| err.into())
35    }
36
37    /// Serializes the contents of this [`ComponentBatch`].
38    ///
39    /// Once serialized, the data is ready to be logged into Rerun via the [`AsComponents`] trait.
40    ///
41    /// # Fallibility
42    ///
43    /// There are very few ways in which serialization can fail, all of which are very rare to hit
44    /// in practice.
45    /// One such example is trying to serialize data with more than 2^31 elements into a `ListArray`.
46    ///
47    /// For that reason, this method favors a nice user experience over error handling: errors will
48    /// merely be logged, not returned (except in debug builds, where all errors panic).
49    ///
50    /// See also [`ComponentBatch::try_serialized`].
51    ///
52    /// [`AsComponents`]: [crate::AsComponents]
53    #[inline]
54    fn serialized(&self, component_descr: ComponentDescriptor) -> Option<SerializedComponentBatch> {
55        match self.try_serialized(component_descr.clone()) {
56            Ok(array) => Some(array),
57
58            #[cfg(debug_assertions)]
59            Err(err) => {
60                panic!(
61                    "failed to serialize data for {}: {}",
62                    component_descr,
63                    re_error::format_ref(&err)
64                )
65            }
66
67            #[cfg(not(debug_assertions))]
68            Err(err) => {
69                re_log::error!(
70                    descriptor = %component_descr,
71                    "failed to serialize data: {}",
72                    re_error::format_ref(&err)
73                );
74                None
75            }
76        }
77    }
78
79    /// Serializes the contents of this [`ComponentBatch`].
80    ///
81    /// Once serialized, the data is ready to be logged into Rerun via the [`AsComponents`] trait.
82    ///
83    /// # Fallibility
84    ///
85    /// There are very few ways in which serialization can fail, all of which are very rare to hit
86    /// in practice.
87    ///
88    /// For that reason, it generally makes sense to favor a nice user experience over error handling
89    /// in most cases, see [`ComponentBatch::serialized`].
90    ///
91    /// [`AsComponents`]: [crate::AsComponents]
92    #[inline]
93    fn try_serialized(
94        &self,
95        component_descr: ComponentDescriptor,
96    ) -> SerializationResult<SerializedComponentBatch> {
97        Ok(SerializedComponentBatch {
98            array: self.to_arrow()?,
99            descriptor: component_descr,
100        })
101    }
102}
103
104#[allow(dead_code)]
105fn assert_component_batch_object_safe() {
106    let _: &dyn ComponentBatch;
107}
108
109// ---
110
111/// The serialized contents of a [`ComponentBatch`] with associated [`ComponentDescriptor`].
112///
113/// This is what gets logged into Rerun:
114/// * See [`ComponentBatch`] to easily serialize component data.
115/// * See [`AsComponents`] for logging serialized data.
116///
117/// [`AsComponents`]: [crate::AsComponents]
118#[derive(Debug, Clone)]
119pub struct SerializedComponentBatch {
120    // TODO(cmc): Maybe Cow<> this one if it grows bigger. Or intern descriptors altogether, most likely.
121    pub descriptor: ComponentDescriptor,
122
123    pub array: arrow::array::ArrayRef,
124}
125
126impl re_byte_size::SizeBytes for SerializedComponentBatch {
127    #[inline]
128    fn heap_size_bytes(&self) -> u64 {
129        let Self { array, descriptor } = self;
130        array.heap_size_bytes() + descriptor.heap_size_bytes()
131    }
132}
133
134impl PartialEq for SerializedComponentBatch {
135    #[inline]
136    fn eq(&self, other: &Self) -> bool {
137        let Self { array, descriptor } = self;
138
139        // Descriptor first!
140        *descriptor == other.descriptor && **array == *other.array
141    }
142}
143
144impl SerializedComponentBatch {
145    #[inline]
146    pub fn new(array: arrow::array::ArrayRef, descriptor: ComponentDescriptor) -> Self {
147        Self { array, descriptor }
148    }
149
150    #[inline]
151    pub fn with_descriptor_override(self, descriptor: ComponentDescriptor) -> Self {
152        Self { descriptor, ..self }
153    }
154
155    /// Unconditionally sets the descriptor's `archetype_name` to the given one.
156    #[inline]
157    pub fn with_archetype(mut self, archetype_name: ArchetypeName) -> Self {
158        self.descriptor = self.descriptor.with_archetype(archetype_name);
159        self
160    }
161
162    /// Unconditionally sets the descriptor's `component_type` to the given one.
163    #[inline]
164    pub fn with_component_type(mut self, component_type: ComponentType) -> Self {
165        self.descriptor = self.descriptor.with_component_type(component_type);
166        self
167    }
168
169    /// Sets the descriptor's `archetype_name` to the given one iff it's not already set.
170    #[inline]
171    pub fn or_with_archetype(mut self, archetype_name: impl Fn() -> ArchetypeName) -> Self {
172        self.descriptor = self.descriptor.or_with_archetype(archetype_name);
173        self
174    }
175
176    /// Sets the descriptor's `component` to the given one iff it's not already set.
177    #[inline]
178    pub fn or_with_component_type(
179        mut self,
180        component_type: impl FnOnce() -> ComponentType,
181    ) -> Self {
182        self.descriptor = self.descriptor.or_with_component_type(component_type);
183        self
184    }
185}
186
187/// A column's worth of component data.
188///
189/// If a [`SerializedComponentBatch`] represents one row's worth of data
190#[derive(Debug, Clone)]
191pub struct SerializedComponentColumn {
192    pub list_array: arrow::array::ListArray,
193
194    // TODO(cmc): Maybe Cow<> this one if it grows bigger. Or intern descriptors altogether, most likely.
195    pub descriptor: ComponentDescriptor,
196}
197
198impl SerializedComponentColumn {
199    /// Repartitions the component data into multiple sub-batches, ignoring the previous partitioning.
200    ///
201    /// The specified `lengths` must sum to the total length of the component batch.
202    pub fn repartitioned(
203        self,
204        lengths: impl IntoIterator<Item = usize>,
205    ) -> SerializationResult<Self> {
206        let Self {
207            list_array,
208            descriptor,
209        } = self;
210
211        let list_array = re_arrow_util::repartition_list_array(list_array, lengths)?;
212
213        Ok(Self {
214            list_array,
215            descriptor,
216        })
217    }
218}
219
220impl From<SerializedComponentBatch> for SerializedComponentColumn {
221    #[inline]
222    fn from(batch: SerializedComponentBatch) -> Self {
223        use arrow::{
224            array::{Array as _, ListArray},
225            buffer::OffsetBuffer,
226            datatypes::Field,
227        };
228
229        let list_array = {
230            let nullable = true;
231            let field = Field::new_list_field(batch.array.data_type().clone(), nullable);
232            let offsets = OffsetBuffer::from_lengths(std::iter::once(batch.array.len()));
233            let nulls = None;
234            ListArray::new(field.into(), offsets, batch.array, nulls)
235        };
236
237        Self {
238            list_array,
239            descriptor: batch.descriptor,
240        }
241    }
242}
243
244impl SerializedComponentBatch {
245    /// Partitions the component data into multiple sub-batches.
246    ///
247    /// Specifically, this transforms the existing [`SerializedComponentBatch`] data into a [`SerializedComponentColumn`].
248    ///
249    /// This makes it possible to use `RecordingStream::send_columns` to send columnar data directly into Rerun.
250    ///
251    /// The specified `lengths` must sum to the total length of the component batch.
252    #[inline]
253    pub fn partitioned(
254        self,
255        lengths: impl IntoIterator<Item = usize>,
256    ) -> SerializationResult<SerializedComponentColumn> {
257        let column: SerializedComponentColumn = self.into();
258        column.repartitioned(lengths)
259    }
260}
261
262// ---
263
// TODO(cmc): This is far from ideal and feels very hackish, but for now the priority is getting
// all things related to tags up and running so we can gather learnings.
// This is only used on the archetype deserialization path, which isn't ever used outside of tests anyway.
// NOTE(review): these keys are also read by `From<&SerializedComponentBatch> for Field` in this
// file — confirm the "only used on the deserialization path" remark above is still accurate.

// TODO(cmc): we really shouldn't be duplicating these.

/// The key used to identify the [`ArchetypeName`] in field-level metadata.
const FIELD_METADATA_KEY_ARCHETYPE_NAME: &str = "rerun:archetype";

/// The key used to identify the [`crate::ComponentIdentifier`] in field-level metadata.
const FIELD_METADATA_KEY_COMPONENT: &str = "rerun:component";

/// The key used to identify the [`ComponentType`] in field-level metadata.
const FIELD_METADATA_KEY_COMPONENT_TYPE: &str = "rerun:component_type";
278
279impl From<&SerializedComponentBatch> for arrow::datatypes::Field {
280    #[inline]
281    fn from(batch: &SerializedComponentBatch) -> Self {
282        Self::new(
283            batch.descriptor.component.to_string(),
284            batch.array.data_type().clone(),
285            false,
286        )
287        .with_metadata(
288            [
289                batch.descriptor.archetype.map(|name| {
290                    (
291                        FIELD_METADATA_KEY_ARCHETYPE_NAME.to_owned(),
292                        name.to_string(),
293                    )
294                }),
295                Some((
296                    FIELD_METADATA_KEY_COMPONENT.to_owned(),
297                    batch.descriptor.component.to_string(),
298                )),
299                batch.descriptor.component_type.map(|name| {
300                    (
301                        FIELD_METADATA_KEY_COMPONENT_TYPE.to_owned(),
302                        name.to_string(),
303                    )
304                }),
305            ]
306            .into_iter()
307            .flatten()
308            .collect(),
309        )
310    }
311}
312
313// --- Unary ---
314
315impl<L: Clone + Loggable> ComponentBatch for L {
316    #[inline]
317    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
318        L::to_arrow([std::borrow::Cow::Borrowed(self)])
319    }
320}
321
322// --- Unary Option ---
323
324impl<L: Clone + Loggable> ComponentBatch for Option<L> {
325    #[inline]
326    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
327        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
328    }
329}
330
331// --- Vec ---
332
333impl<L: Clone + Loggable> ComponentBatch for Vec<L> {
334    #[inline]
335    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
336        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
337    }
338}
339
340// --- Vec<Option> ---
341
342impl<L: Loggable> ComponentBatch for Vec<Option<L>> {
343    #[inline]
344    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
345        L::to_arrow_opt(
346            self.iter()
347                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
348        )
349    }
350}
351
352// --- Array ---
353
354impl<L: Loggable, const N: usize> ComponentBatch for [L; N] {
355    #[inline]
356    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
357        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
358    }
359}
360
361// --- Array<Option> ---
362
363impl<L: Loggable, const N: usize> ComponentBatch for [Option<L>; N] {
364    #[inline]
365    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
366        L::to_arrow_opt(
367            self.iter()
368                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
369        )
370    }
371}
372
373// --- Slice ---
374
375impl<L: Loggable> ComponentBatch for [L] {
376    #[inline]
377    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
378        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
379    }
380}
381
382// --- Slice<Option> ---
383
384impl<L: Loggable> ComponentBatch for [Option<L>] {
385    #[inline]
386    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
387        L::to_arrow_opt(
388            self.iter()
389                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
390        )
391    }
392}