// re_types_core/component_batch.rs

use crate::{ArchetypeName, ComponentDescriptor, ComponentType, Loggable, SerializationResult};

use arrow::array::ListArray as ArrowListArray;

// used in docstrings:
#[allow(clippy::allow_attributes, unused_imports, clippy::unused_trait_names)]
use crate::Archetype;

// ---

/// A [`ComponentBatch`] represents an array's worth of [`Loggable`] instances, ready to be
/// serialized.
///
/// [`ComponentBatch`] is carefully designed to be erasable ("object-safe"), so that it is possible
/// to build heterogeneous collections of [`ComponentBatch`]s (e.g. `Vec<Box<dyn ComponentBatch>>`).
/// This erasability is what makes extending [`Archetype`]s possible with little effort.
///
/// You should almost never need to implement [`ComponentBatch`] manually, as it is already
/// blanket implemented for most common use cases (arrays/vectors/slices of loggables, etc).
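///
/// The sketch below (not compiled) illustrates the erasure; `MyPoint` and `MyColor` are
/// hypothetical [`Loggable`] types standing in for real component types defined elsewhere.
///
/// ```ignore
/// // Thanks to the blanket impls further down this file, plain values, options, vectors,
/// // arrays and slices of loggables can all be erased to `&dyn ComponentBatch`.
/// let points: Vec<MyPoint> = vec![MyPoint::new(0.0, 0.0), MyPoint::new(1.0, 1.0)];
/// let color: Option<MyColor> = Some(MyColor(0xFF0000FF));
///
/// let batches: Vec<&dyn ComponentBatch> = vec![&points, &color];
/// for batch in &batches {
///     let _array = batch.to_arrow(); // SerializationResult<ArrayRef>
/// }
/// ```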
pub trait ComponentBatch {
    // NOTE: It'd be tempting to have the following associated type, but that'd be
    // counterproductive: the whole point of this is to allow for heterogeneous collections!
    // type Loggable: Loggable;

    /// Serializes the batch into an Arrow array.
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef>;

    /// Serializes the batch into an Arrow list array with a single component instance per list entry.
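    ///
    /// A rough sketch of the resulting shape (not compiled); `MyColor` is a hypothetical
    /// [`Loggable`]:
    ///
    /// ```ignore
    /// let batch = vec![MyColor(1), MyColor(2), MyColor(3)];
    /// let list_array = batch.to_arrow_list_array()?;
    /// assert_eq!(list_array.len(), 3);          // one list entry per instance...
    /// assert_eq!(list_array.value(0).len(), 1); // ...each containing exactly one component
    /// ```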
    fn to_arrow_list_array(&self) -> SerializationResult<ArrowListArray> {
        let array = self.to_arrow()?;
        let offsets =
            arrow::buffer::OffsetBuffer::from_lengths(std::iter::repeat_n(1, array.len()));
        let nullable = true;
        let field = arrow::datatypes::Field::new("item", array.data_type().clone(), nullable);
        ArrowListArray::try_new(field.into(), offsets, array, None).map_err(|err| err.into())
    }

    /// Serializes the contents of this [`ComponentBatch`].
    ///
    /// Once serialized, the data is ready to be logged into Rerun via the [`AsComponents`] trait.
    ///
    /// # Fallibility
    ///
    /// There are very few ways in which serialization can fail, all of which are very rare to hit
    /// in practice.
    /// One such example is trying to serialize data with more than 2^31 elements into a `ListArray`.
    ///
    /// For that reason, this method favors a nice user experience over error handling: errors will
    /// merely be logged, not returned (except in debug builds, where all errors panic).
    ///
    /// See also [`ComponentBatch::try_serialized`].
    ///
    /// [`AsComponents`]: [crate::AsComponents]
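    ///
    /// A minimal sketch (not compiled), assuming a hypothetical `MyColor` component and an
    /// existing `descriptor`:
    ///
    /// ```ignore
    /// let colors = vec![MyColor(1), MyColor(2)];
    /// if let Some(serialized) = colors.serialized(descriptor) {
    ///     // `serialized` is a `SerializedComponentBatch`, ready to be logged.
    /// }
    /// ```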
    #[inline]
    fn serialized(&self, component_descr: ComponentDescriptor) -> Option<SerializedComponentBatch> {
        match self.try_serialized(component_descr.clone()) {
            Ok(array) => Some(array),

            #[cfg(debug_assertions)]
            Err(err) => {
                panic!(
                    "failed to serialize data for {}: {}",
                    component_descr,
                    re_error::format_ref(&err)
                )
            }

            #[cfg(not(debug_assertions))]
            Err(err) => {
                re_log::error!(
                    descriptor = %component_descr,
                    "failed to serialize data: {}",
                    re_error::format_ref(&err)
                );
                None
            }
        }
    }

    /// Serializes the contents of this [`ComponentBatch`].
    ///
    /// Once serialized, the data is ready to be logged into Rerun via the [`AsComponents`] trait.
    ///
    /// # Fallibility
    ///
    /// There are very few ways in which serialization can fail, all of which are very rare to hit
    /// in practice.
    ///
    /// For that reason, it generally makes sense to favor a nice user experience over error handling
    /// in most cases, see [`ComponentBatch::serialized`].
    ///
    /// [`AsComponents`]: [crate::AsComponents]
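    ///
    /// A minimal sketch (not compiled), assuming a hypothetical `MyColor` component, an existing
    /// `descriptor`, and a caller that returns [`SerializationResult`]:
    ///
    /// ```ignore
    /// let colors = vec![MyColor(1), MyColor(2)];
    /// let serialized = colors.try_serialized(descriptor)?; // propagate rare failures to the caller
    /// ```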
    #[inline]
    fn try_serialized(
        &self,
        component_descr: ComponentDescriptor,
    ) -> SerializationResult<SerializedComponentBatch> {
        Ok(SerializedComponentBatch {
            array: self.to_arrow()?,
            descriptor: component_descr,
        })
    }
}

#[expect(dead_code)]
fn assert_component_batch_object_safe() {
    let _: &dyn ComponentBatch;
}

// ---

/// The serialized contents of a [`ComponentBatch`] with associated [`ComponentDescriptor`].
///
/// This is what gets logged into Rerun:
/// * See [`ComponentBatch`] to easily serialize component data.
/// * See [`AsComponents`] for logging serialized data.
///
/// [`AsComponents`]: [crate::AsComponents]
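///
/// A minimal sketch (not compiled) of manual construction and descriptor tweaking; the `array`,
/// `descriptor` and `my_archetype_name` values are assumed to exist already:
///
/// ```ignore
/// let batch = SerializedComponentBatch::new(array, descriptor)
///     // Only fills in the archetype if the descriptor doesn't already specify one:
///     .or_with_archetype(|| my_archetype_name);
/// ```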
#[derive(Debug, Clone)]
pub struct SerializedComponentBatch {
    // TODO(cmc): Maybe Cow<> this one if it grows bigger. Or intern descriptors altogether, most likely.
    pub descriptor: ComponentDescriptor,

    pub array: arrow::array::ArrayRef,
}

impl re_byte_size::SizeBytes for SerializedComponentBatch {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self { array, descriptor } = self;
        array.heap_size_bytes() + descriptor.heap_size_bytes()
    }
}

impl PartialEq for SerializedComponentBatch {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        let Self { array, descriptor } = self;

        // Descriptor first!
        *descriptor == other.descriptor && **array == *other.array
    }
}

impl SerializedComponentBatch {
    /// Creates a new [`SerializedComponentBatch`] from the given Arrow array and [`ComponentDescriptor`].
    #[inline]
    pub fn new(array: arrow::array::ArrayRef, descriptor: ComponentDescriptor) -> Self {
        Self { array, descriptor }
    }

    /// Unconditionally replaces the existing [`ComponentDescriptor`] with the given one.
    #[inline]
    pub fn with_descriptor_override(self, descriptor: ComponentDescriptor) -> Self {
        Self { descriptor, ..self }
    }

    /// Unconditionally sets the descriptor's `archetype` to the given one.
    #[inline]
    pub fn with_archetype(mut self, archetype_name: ArchetypeName) -> Self {
        self.descriptor = self.descriptor.with_archetype(archetype_name);
        self
    }

    /// Unconditionally sets the descriptor's `component_type` to the given one.
    #[inline]
    pub fn with_component_type(mut self, component_type: ComponentType) -> Self {
        self.descriptor = self.descriptor.with_component_type(component_type);
        self
    }

    /// Sets the descriptor's `archetype` to the given one iff it's not already set.
    #[inline]
    pub fn or_with_archetype(mut self, archetype_name: impl Fn() -> ArchetypeName) -> Self {
        self.descriptor = self.descriptor.or_with_archetype(archetype_name);
        self
    }

    /// Sets the descriptor's `component_type` to the given one iff it's not already set.
    #[inline]
    pub fn or_with_component_type(
        mut self,
        component_type: impl FnOnce() -> ComponentType,
    ) -> Self {
        self.descriptor = self.descriptor.or_with_component_type(component_type);
        self
    }
}

/// A column's worth of component data.
///
/// If a [`SerializedComponentBatch`] represents one row's worth of data, a
/// [`SerializedComponentColumn`] represents a column's worth of data, i.e. any number of such rows.
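///
/// A rough sketch (not compiled), assuming an existing [`SerializedComponentBatch`] named `batch`:
///
/// ```ignore
/// let column: SerializedComponentColumn = batch.into();
/// // The whole batch becomes a single row, i.e. a list array with a single entry.
/// assert_eq!(column.list_array.len(), 1);
/// ```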
#[derive(Debug, Clone)]
pub struct SerializedComponentColumn {
    pub list_array: arrow::array::ListArray,

    // TODO(cmc): Maybe Cow<> this one if it grows bigger. Or intern descriptors altogether, most likely.
    pub descriptor: ComponentDescriptor,
}

impl SerializedComponentColumn {
    /// Repartitions the component data into multiple sub-batches, ignoring the previous partitioning.
    ///
    /// The specified `lengths` must sum to the total length of the component batch.
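    ///
    /// A rough sketch (not compiled): a column whose underlying data holds 6 component instances
    /// can be repartitioned into rows of lengths `[2, 1, 3]`, but not `[2, 2]`:
    ///
    /// ```ignore
    /// let column = column.repartitioned([2, 1, 3])?;
    /// assert_eq!(column.list_array.len(), 3); // three rows
    /// ```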
    pub fn repartitioned(
        self,
        lengths: impl IntoIterator<Item = usize>,
    ) -> SerializationResult<Self> {
        let Self {
            list_array,
            descriptor,
        } = self;

        let list_array = re_arrow_util::repartition_list_array(list_array, lengths)?;

        Ok(Self {
            list_array,
            descriptor,
        })
    }
}

impl From<SerializedComponentBatch> for SerializedComponentColumn {
    #[inline]
    fn from(batch: SerializedComponentBatch) -> Self {
        use arrow::{
            array::{Array as _, ListArray},
            buffer::OffsetBuffer,
            datatypes::Field,
        };

        let list_array = {
            let nullable = true;
            let field = Field::new_list_field(batch.array.data_type().clone(), nullable);
            let offsets = OffsetBuffer::from_lengths(std::iter::once(batch.array.len()));
            let nulls = None;
            ListArray::new(field.into(), offsets, batch.array, nulls)
        };

        Self {
            list_array,
            descriptor: batch.descriptor,
        }
    }
}

impl SerializedComponentBatch {
    /// Partitions the component data into multiple sub-batches.
    ///
    /// Specifically, this transforms the existing [`SerializedComponentBatch`] data into a [`SerializedComponentColumn`].
    ///
    /// This makes it possible to use `RecordingStream::send_columns` to send columnar data directly into Rerun.
    ///
    /// The specified `lengths` must sum to the total length of the component batch.
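    ///
    /// A minimal sketch (not compiled), assuming an existing [`SerializedComponentBatch`] holding
    /// 5 component instances:
    ///
    /// ```ignore
    /// // Split the batch into three rows of 2, 2 and 1 instances respectively,
    /// // e.g. so it can be sent via `RecordingStream::send_columns`.
    /// let column = batch.partitioned([2, 2, 1])?;
    /// ```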
    #[inline]
    pub fn partitioned(
        self,
        lengths: impl IntoIterator<Item = usize>,
    ) -> SerializationResult<SerializedComponentColumn> {
        let column: SerializedComponentColumn = self.into();
        column.repartitioned(lengths)
    }
}

// ---

// TODO(cmc): This is far from ideal and feels very hackish, but for now the priority is getting
// all things related to tags up and running so we can gather learnings.
// This is only used on the archetype deserialization path, which isn't ever used outside of tests anyway.

// TODO(cmc): we really shouldn't be duplicating these.

/// The key used to identify the [`ArchetypeName`] in field-level metadata.
const FIELD_METADATA_KEY_ARCHETYPE_NAME: &str = "rerun:archetype";

/// The key used to identify the [`crate::ComponentIdentifier`] in field-level metadata.
const FIELD_METADATA_KEY_COMPONENT: &str = "rerun:component";

/// The key used to identify the [`ComponentType`] in field-level metadata.
const FIELD_METADATA_KEY_COMPONENT_TYPE: &str = "rerun:component_type";

impl From<&SerializedComponentBatch> for arrow::datatypes::Field {
    #[inline]
    fn from(batch: &SerializedComponentBatch) -> Self {
        Self::new(
            batch.descriptor.component.to_string(),
            batch.array.data_type().clone(),
            false,
        )
        .with_metadata(
            [
                batch.descriptor.archetype.map(|name| {
                    (
                        FIELD_METADATA_KEY_ARCHETYPE_NAME.to_owned(),
                        name.to_string(),
                    )
                }),
                Some((
                    FIELD_METADATA_KEY_COMPONENT.to_owned(),
                    batch.descriptor.component.to_string(),
                )),
                batch.descriptor.component_type.map(|name| {
                    (
                        FIELD_METADATA_KEY_COMPONENT_TYPE.to_owned(),
                        name.to_string(),
                    )
                }),
            ]
            .into_iter()
            .flatten()
            .collect(),
        )
    }
}
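//
// Illustrative sketch (not actual output): a batch whose descriptor carries an archetype,
// a component identifier and a component type ends up with field-level metadata shaped like:
//
//     "rerun:archetype"      -> "<archetype name>"
//     "rerun:component"      -> "<component identifier>"
//     "rerun:component_type" -> "<component type>"
//
// Optional descriptor parts that are unset are simply omitted from the metadata.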

// --- Unary ---

impl<L: Clone + Loggable> ComponentBatch for L {
    #[inline]
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
        L::to_arrow([std::borrow::Cow::Borrowed(self)])
    }
}

// --- Unary Option ---

impl<L: Clone + Loggable> ComponentBatch for Option<L> {
    #[inline]
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
    }
}

// --- Vec ---

impl<L: Clone + Loggable> ComponentBatch for Vec<L> {
    #[inline]
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
    }
}

// --- Vec<Option> ---

impl<L: Loggable> ComponentBatch for Vec<Option<L>> {
    #[inline]
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
        L::to_arrow_opt(
            self.iter()
                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
        )
    }
}

// --- Array ---

impl<L: Loggable, const N: usize> ComponentBatch for [L; N] {
    #[inline]
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
    }
}

// --- Array<Option> ---

impl<L: Loggable, const N: usize> ComponentBatch for [Option<L>; N] {
    #[inline]
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
        L::to_arrow_opt(
            self.iter()
                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
        )
    }
}

// --- Slice ---

impl<L: Loggable> ComponentBatch for [L] {
    #[inline]
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
        L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
    }
}

// --- Slice<Option> ---

impl<L: Loggable> ComponentBatch for [Option<L>] {
    #[inline]
    fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
        L::to_arrow_opt(
            self.iter()
                .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
        )
    }
}
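
// ---
//
// Illustrative sketch (not compiled): thanks to the blanket impls above, all of the following
// containers can be erased to `&dyn ComponentBatch`. `MyColor` is a hypothetical `Loggable`.
//
//     let single: MyColor = MyColor(1);
//     let optional: Option<MyColor> = Some(MyColor(2));
//     let many: Vec<MyColor> = vec![MyColor(3), MyColor(4)];
//     let sparse: Vec<Option<MyColor>> = vec![Some(MyColor(5)), None];
//     let fixed: [MyColor; 2] = [MyColor(6), MyColor(7)];
//
//     let batches: Vec<&dyn ComponentBatch> = vec![&single, &optional, &many, &sparse, &fixed];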