re_types_core/
component_batch.rs

1use crate::{ArchetypeName, ComponentDescriptor, ComponentType, Loggable, SerializationResult};
2
3use arrow::array::{ListArray as ArrowListArray, ListArray};
4use arrow::buffer::OffsetBuffer;
5
6// used in docstrings:
7#[allow(clippy::allow_attributes, unused_imports, clippy::unused_trait_names)]
8use crate::Archetype;
9
10// ---
11
12/// A [`ComponentBatch`] represents an array's worth of [`Loggable`] instances, ready to be
13/// serialized.
14///
15/// [`ComponentBatch`] is carefully designed to be erasable ("object-safe"), so that it is possible
16/// to build heterogeneous collections of [`ComponentBatch`]s (e.g. `Vec<dyn ComponentBatch>`).
17/// This erasability is what makes extending [`Archetype`]s possible with little effort.
18///
19/// You should almost never need to implement [`ComponentBatch`] manually, as it is already
20/// blanket implemented for most common use cases (arrays/vectors/slices of loggables, etc).
21pub trait ComponentBatch {
22    // NOTE: It'd be tempting to have the following associated type, but that'd be
23    // counterproductive, the whole point of this is to allow for heterogeneous collections!
24    // type Loggable: Loggable;
25
26    /// Serializes the batch into an Arrow array.
27    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef>;
28
29    /// Serializes the batch into an Arrow list array with a single component per list.
30    fn to_arrow_list_array(&self) -> SerializationResult<ArrowListArray> {
31        let array = self.to_arrow()?;
32        let offsets =
33            arrow::buffer::OffsetBuffer::from_lengths(std::iter::repeat_n(1, array.len()));
34        let nullable = true;
35        let field = arrow::datatypes::Field::new("item", array.data_type().clone(), nullable);
36        ArrowListArray::try_new(field.into(), offsets, array, None).map_err(|err| err.into())
37    }
38
39    /// Serializes the contents of this [`ComponentBatch`].
40    ///
41    /// Once serialized, the data is ready to be logged into Rerun via the [`AsComponents`] trait.
42    ///
43    /// # Fallibility
44    ///
45    /// There are very few ways in which serialization can fail, all of which are very rare to hit
46    /// in practice.
47    /// One such example is trying to serialize data with more than 2^31 elements into a `ListArray`.
48    ///
49    /// For that reason, this method favors a nice user experience over error handling: errors will
50    /// merely be logged, not returned (except in debug builds, where all errors panic).
51    ///
52    /// See also [`ComponentBatch::try_serialized`].
53    ///
54    /// [`AsComponents`]: [crate::AsComponents]
55    #[inline]
56    fn serialized(&self, component_descr: ComponentDescriptor) -> Option<SerializedComponentBatch> {
57        match self.try_serialized(component_descr.clone()) {
58            Ok(array) => Some(array),
59
60            #[cfg(debug_assertions)]
61            Err(err) => {
62                panic!(
63                    "failed to serialize data for {}: {}",
64                    component_descr,
65                    re_error::format_ref(&err)
66                )
67            }
68
69            #[cfg(not(debug_assertions))]
70            Err(err) => {
71                re_log::error!(
72                    descriptor = %component_descr,
73                    "failed to serialize data: {}",
74                    re_error::format_ref(&err)
75                );
76                None
77            }
78        }
79    }
80
81    /// Serializes the contents of this [`ComponentBatch`].
82    ///
83    /// Once serialized, the data is ready to be logged into Rerun via the [`AsComponents`] trait.
84    ///
85    /// # Fallibility
86    ///
87    /// There are very few ways in which serialization can fail, all of which are very rare to hit
88    /// in practice.
89    ///
90    /// For that reason, it generally makes sense to favor a nice user experience over error handling
91    /// in most cases, see [`ComponentBatch::serialized`].
92    ///
93    /// [`AsComponents`]: [crate::AsComponents]
94    #[inline]
95    fn try_serialized(
96        &self,
97        component_descr: ComponentDescriptor,
98    ) -> SerializationResult<SerializedComponentBatch> {
99        Ok(SerializedComponentBatch {
100            array: self.to_arrow()?,
101            descriptor: component_descr,
102        })
103    }
104}
105
106#[expect(dead_code)]
107fn assert_component_batch_object_safe() {
108    let _: &dyn ComponentBatch;
109}
110
111// ---
112
/// The serialized contents of a [`ComponentBatch`] with associated [`ComponentDescriptor`].
///
/// This is what gets logged into Rerun:
/// * See [`ComponentBatch`] to easily serialize component data.
/// * See [`AsComponents`] for logging serialized data.
///
/// [`AsComponents`]: [crate::AsComponents]
#[derive(Debug, Clone)]
pub struct SerializedComponentBatch {
    /// Describes what this component data refers to (archetype, component, component type).
    // TODO(cmc): Maybe Cow<> this one if it grows bigger. Or intern descriptors altogether, most likely.
    pub descriptor: ComponentDescriptor,

    /// The component data itself, serialized as an Arrow array.
    pub array: arrow::array::ArrayRef,
}
127
128impl re_byte_size::SizeBytes for SerializedComponentBatch {
129    #[inline]
130    fn heap_size_bytes(&self) -> u64 {
131        let Self { array, descriptor } = self;
132        array.heap_size_bytes() + descriptor.heap_size_bytes()
133    }
134}
135
136impl PartialEq for SerializedComponentBatch {
137    #[inline]
138    fn eq(&self, other: &Self) -> bool {
139        let Self { array, descriptor } = self;
140
141        // Descriptor first!
142        *descriptor == other.descriptor && **array == *other.array
143    }
144}
145
146impl SerializedComponentBatch {
147    #[inline]
148    pub fn new(array: arrow::array::ArrayRef, descriptor: ComponentDescriptor) -> Self {
149        Self { array, descriptor }
150    }
151
152    #[inline]
153    pub fn with_descriptor_override(self, descriptor: ComponentDescriptor) -> Self {
154        Self { descriptor, ..self }
155    }
156
157    /// Unconditionally sets the descriptor's `archetype_name` to the given one.
158    #[inline]
159    pub fn with_archetype(mut self, archetype_name: ArchetypeName) -> Self {
160        self.descriptor = self.descriptor.with_archetype(archetype_name);
161        self
162    }
163
164    /// Unconditionally sets the descriptor's `component_type` to the given one.
165    #[inline]
166    pub fn with_component_type(mut self, component_type: ComponentType) -> Self {
167        self.descriptor = self.descriptor.with_component_type(component_type);
168        self
169    }
170
171    /// Sets the descriptor's `archetype_name` to the given one iff it's not already set.
172    #[inline]
173    pub fn or_with_archetype(mut self, archetype_name: impl Fn() -> ArchetypeName) -> Self {
174        self.descriptor = self.descriptor.or_with_archetype(archetype_name);
175        self
176    }
177
178    /// Sets the descriptor's `component` to the given one iff it's not already set.
179    #[inline]
180    pub fn or_with_component_type(
181        mut self,
182        component_type: impl FnOnce() -> ComponentType,
183    ) -> Self {
184        self.descriptor = self.descriptor.or_with_component_type(component_type);
185        self
186    }
187}
188
/// A column's worth of component data.
///
/// If a [`SerializedComponentBatch`] represents one row's worth of data, a
/// [`SerializedComponentColumn`] represents many such rows bundled together: a list array in
/// which each entry holds one row's worth of component data (see the `From` impl below).
#[derive(Debug, Clone, PartialEq)]
pub struct SerializedComponentColumn {
    /// The component data itself, one list entry per row.
    pub list_array: arrow::array::ListArray,

    /// Describes what this component data refers to (archetype, component, component type).
    // TODO(cmc): Maybe Cow<> this one if it grows bigger. Or intern descriptors altogether, most likely.
    pub descriptor: ComponentDescriptor,
}
199
200impl SerializedComponentColumn {
201    #[inline]
202    pub fn new(list_array: arrow::array::ListArray, descriptor: ComponentDescriptor) -> Self {
203        Self {
204            list_array,
205            descriptor,
206        }
207    }
208
209    /// Repartitions the component data into multiple sub-batches, ignoring the previous partitioning.
210    ///
211    /// The specified `lengths` must sum to the total length of the component batch.
212    pub fn repartitioned(
213        self,
214        lengths: impl IntoIterator<Item = usize>,
215    ) -> SerializationResult<Self> {
216        let Self {
217            list_array,
218            descriptor,
219        } = self;
220
221        let list_array = repartition_list_array(list_array, lengths)?;
222
223        Ok(Self {
224            list_array,
225            descriptor,
226        })
227    }
228}
229
230impl re_byte_size::SizeBytes for SerializedComponentColumn {
231    #[inline]
232    fn heap_size_bytes(&self) -> u64 {
233        self.list_array.heap_size_bytes() + self.descriptor.heap_size_bytes()
234    }
235}
236
237impl From<SerializedComponentBatch> for SerializedComponentColumn {
238    #[inline]
239    fn from(batch: SerializedComponentBatch) -> Self {
240        use arrow::{
241            array::{Array as _, ListArray},
242            buffer::OffsetBuffer,
243            datatypes::Field,
244        };
245
246        let list_array = {
247            let nullable = true;
248            let field = Field::new_list_field(batch.array.data_type().clone(), nullable);
249            let offsets = OffsetBuffer::from_lengths(std::iter::once(batch.array.len()));
250            let nulls = None;
251            ListArray::new(field.into(), offsets, batch.array, nulls)
252        };
253
254        Self {
255            list_array,
256            descriptor: batch.descriptor,
257        }
258    }
259}
260
261/// Repartitions a [`ListArray`] according to the specified `lengths`, ignoring previous partitioning.
262///
263/// The specified `lengths` must sum to the total length underlying values (i.e. the child array).
264///
265/// The validity of the values is ignored.
266#[inline]
267pub fn repartition_list_array(
268    list_array: ListArray,
269    lengths: impl IntoIterator<Item = usize>,
270) -> arrow::error::Result<ListArray> {
271    let (field, _offsets, values, _nulls) = list_array.into_parts();
272
273    let offsets = OffsetBuffer::from_lengths(lengths);
274    let nulls = None;
275
276    ListArray::try_new(field, offsets, values, nulls)
277}
278
279impl SerializedComponentBatch {
280    /// Partitions the component data into multiple sub-batches.
281    ///
282    /// Specifically, this transforms the existing [`SerializedComponentBatch`] data into a [`SerializedComponentColumn`].
283    ///
284    /// This makes it possible to use `RecordingStream::send_columns` to send columnar data directly into Rerun.
285    ///
286    /// The specified `lengths` must sum to the total length of the component batch.
287    #[inline]
288    pub fn partitioned(
289        self,
290        lengths: impl IntoIterator<Item = usize>,
291    ) -> SerializationResult<SerializedComponentColumn> {
292        let column: SerializedComponentColumn = self.into();
293        column.repartitioned(lengths)
294    }
295}
296
297// ---
298
299// TODO(cmc): This is far from ideal and feels very hackish, but for now the priority is getting
300// all things related to tags up and running so we can gather learnings.
301// This is only used on the archetype deserialization path, which isn't ever used outside of tests anyway.
302
303impl From<&SerializedComponentBatch> for arrow::datatypes::Field {
304    #[inline]
305    fn from(batch: &SerializedComponentBatch) -> Self {
306        Self::new(
307            batch.descriptor.component.to_string(),
308            batch.array.data_type().clone(),
309            false,
310        )
311        .with_metadata(
312            [
313                batch.descriptor.archetype.map(|name| {
314                    (
315                        crate::FIELD_METADATA_KEY_ARCHETYPE.to_owned(),
316                        name.to_string(),
317                    )
318                }),
319                Some((
320                    crate::FIELD_METADATA_KEY_COMPONENT.to_owned(),
321                    batch.descriptor.component.to_string(),
322                )),
323                batch.descriptor.component_type.map(|name| {
324                    (
325                        crate::FIELD_METADATA_KEY_COMPONENT_TYPE.to_owned(),
326                        name.to_string(),
327                    )
328                }),
329            ]
330            .into_iter()
331            .flatten()
332            .collect(),
333        )
334    }
335}
336
337// --- Unary ---
338
339impl<L: Clone + Loggable> ComponentBatch for L {
340    #[inline]
341    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
342        L::to_arrow([std::borrow::Cow::Borrowed(self)])
343    }
344}
345
346// --- Unary Option ---
347
348impl<L: Clone + Loggable> ComponentBatch for Option<L> {
349    #[inline]
350    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
351        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
352    }
353}
354
355// --- Vec ---
356
357impl<L: Clone + Loggable> ComponentBatch for Vec<L> {
358    #[inline]
359    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
360        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
361    }
362}
363
364// --- Vec<Option> ---
365
366impl<L: Loggable> ComponentBatch for Vec<Option<L>> {
367    #[inline]
368    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
369        L::to_arrow_opt(
370            self.iter()
371                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
372        )
373    }
374}
375
376// --- Array ---
377
378impl<L: Loggable, const N: usize> ComponentBatch for [L; N] {
379    #[inline]
380    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
381        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
382    }
383}
384
385// --- Array<Option> ---
386
387impl<L: Loggable, const N: usize> ComponentBatch for [Option<L>; N] {
388    #[inline]
389    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
390        L::to_arrow_opt(
391            self.iter()
392                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
393        )
394    }
395}
396
397// --- Slice ---
398
399impl<L: Loggable> ComponentBatch for [L] {
400    #[inline]
401    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
402        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
403    }
404}
405
406// --- Slice<Option> ---
407
408impl<L: Loggable> ComponentBatch for [Option<L>] {
409    #[inline]
410    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
411        L::to_arrow_opt(
412            self.iter()
413                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
414        )
415    }
416}