re_types_core/
component_batch.rs1use crate::{ArchetypeName, ComponentDescriptor, ComponentType, Loggable, SerializationResult};
2
3use arrow::array::ListArray as ArrowListArray;
4
5#[allow(clippy::allow_attributes, unused_imports, clippy::unused_trait_names)]
7use crate::Archetype;
8
9pub trait ComponentBatch {
21 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef>;
27
28 fn to_arrow_list_array(&self) -> SerializationResult<ArrowListArray> {
30 let array = self.to_arrow()?;
31 let offsets =
32 arrow::buffer::OffsetBuffer::from_lengths(std::iter::repeat_n(1, array.len()));
33 let nullable = true;
34 let field = arrow::datatypes::Field::new("item", array.data_type().clone(), nullable);
35 ArrowListArray::try_new(field.into(), offsets, array, None).map_err(|err| err.into())
36 }
37
38 #[inline]
55 fn serialized(&self, component_descr: ComponentDescriptor) -> Option<SerializedComponentBatch> {
56 match self.try_serialized(component_descr.clone()) {
57 Ok(array) => Some(array),
58
59 #[cfg(debug_assertions)]
60 Err(err) => {
61 panic!(
62 "failed to serialize data for {}: {}",
63 component_descr,
64 re_error::format_ref(&err)
65 )
66 }
67
68 #[cfg(not(debug_assertions))]
69 Err(err) => {
70 re_log::error!(
71 descriptor = %component_descr,
72 "failed to serialize data: {}",
73 re_error::format_ref(&err)
74 );
75 None
76 }
77 }
78 }
79
80 #[inline]
94 fn try_serialized(
95 &self,
96 component_descr: ComponentDescriptor,
97 ) -> SerializationResult<SerializedComponentBatch> {
98 Ok(SerializedComponentBatch {
99 array: self.to_arrow()?,
100 descriptor: component_descr,
101 })
102 }
103}
104
105#[expect(dead_code)]
106fn assert_component_batch_object_safe() {
107 let _: &dyn ComponentBatch;
108}
109
110#[derive(Debug, Clone)]
120pub struct SerializedComponentBatch {
121 pub descriptor: ComponentDescriptor,
123
124 pub array: arrow::array::ArrayRef,
125}
126
127impl re_byte_size::SizeBytes for SerializedComponentBatch {
128 #[inline]
129 fn heap_size_bytes(&self) -> u64 {
130 let Self { array, descriptor } = self;
131 array.heap_size_bytes() + descriptor.heap_size_bytes()
132 }
133}
134
135impl PartialEq for SerializedComponentBatch {
136 #[inline]
137 fn eq(&self, other: &Self) -> bool {
138 let Self { array, descriptor } = self;
139
140 *descriptor == other.descriptor && **array == *other.array
142 }
143}
144
145impl SerializedComponentBatch {
146 #[inline]
147 pub fn new(array: arrow::array::ArrayRef, descriptor: ComponentDescriptor) -> Self {
148 Self { array, descriptor }
149 }
150
151 #[inline]
152 pub fn with_descriptor_override(self, descriptor: ComponentDescriptor) -> Self {
153 Self { descriptor, ..self }
154 }
155
156 #[inline]
158 pub fn with_archetype(mut self, archetype_name: ArchetypeName) -> Self {
159 self.descriptor = self.descriptor.with_archetype(archetype_name);
160 self
161 }
162
163 #[inline]
165 pub fn with_component_type(mut self, component_type: ComponentType) -> Self {
166 self.descriptor = self.descriptor.with_component_type(component_type);
167 self
168 }
169
170 #[inline]
172 pub fn or_with_archetype(mut self, archetype_name: impl Fn() -> ArchetypeName) -> Self {
173 self.descriptor = self.descriptor.or_with_archetype(archetype_name);
174 self
175 }
176
177 #[inline]
179 pub fn or_with_component_type(
180 mut self,
181 component_type: impl FnOnce() -> ComponentType,
182 ) -> Self {
183 self.descriptor = self.descriptor.or_with_component_type(component_type);
184 self
185 }
186}
187
188#[derive(Debug, Clone)]
192pub struct SerializedComponentColumn {
193 pub list_array: arrow::array::ListArray,
194
195 pub descriptor: ComponentDescriptor,
197}
198
199impl SerializedComponentColumn {
200 pub fn repartitioned(
204 self,
205 lengths: impl IntoIterator<Item = usize>,
206 ) -> SerializationResult<Self> {
207 let Self {
208 list_array,
209 descriptor,
210 } = self;
211
212 let list_array = re_arrow_util::repartition_list_array(list_array, lengths)?;
213
214 Ok(Self {
215 list_array,
216 descriptor,
217 })
218 }
219}
220
221impl From<SerializedComponentBatch> for SerializedComponentColumn {
222 #[inline]
223 fn from(batch: SerializedComponentBatch) -> Self {
224 use arrow::{
225 array::{Array as _, ListArray},
226 buffer::OffsetBuffer,
227 datatypes::Field,
228 };
229
230 let list_array = {
231 let nullable = true;
232 let field = Field::new_list_field(batch.array.data_type().clone(), nullable);
233 let offsets = OffsetBuffer::from_lengths(std::iter::once(batch.array.len()));
234 let nulls = None;
235 ListArray::new(field.into(), offsets, batch.array, nulls)
236 };
237
238 Self {
239 list_array,
240 descriptor: batch.descriptor,
241 }
242 }
243}
244
245impl SerializedComponentBatch {
246 #[inline]
254 pub fn partitioned(
255 self,
256 lengths: impl IntoIterator<Item = usize>,
257 ) -> SerializationResult<SerializedComponentColumn> {
258 let column: SerializedComponentColumn = self.into();
259 column.repartitioned(lengths)
260 }
261}
262
263const FIELD_METADATA_KEY_ARCHETYPE_NAME: &str = "rerun:archetype";
273
274const FIELD_METADATA_KEY_COMPONENT: &str = "rerun:component";
276
277const FIELD_METADATA_KEY_COMPONENT_TYPE: &str = "rerun:component_type";
279
280impl From<&SerializedComponentBatch> for arrow::datatypes::Field {
281 #[inline]
282 fn from(batch: &SerializedComponentBatch) -> Self {
283 Self::new(
284 batch.descriptor.component.to_string(),
285 batch.array.data_type().clone(),
286 false,
287 )
288 .with_metadata(
289 [
290 batch.descriptor.archetype.map(|name| {
291 (
292 FIELD_METADATA_KEY_ARCHETYPE_NAME.to_owned(),
293 name.to_string(),
294 )
295 }),
296 Some((
297 FIELD_METADATA_KEY_COMPONENT.to_owned(),
298 batch.descriptor.component.to_string(),
299 )),
300 batch.descriptor.component_type.map(|name| {
301 (
302 FIELD_METADATA_KEY_COMPONENT_TYPE.to_owned(),
303 name.to_string(),
304 )
305 }),
306 ]
307 .into_iter()
308 .flatten()
309 .collect(),
310 )
311 }
312}
313
314impl<L: Clone + Loggable> ComponentBatch for L {
317 #[inline]
318 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
319 L::to_arrow([std::borrow::Cow::Borrowed(self)])
320 }
321}
322
323impl<L: Clone + Loggable> ComponentBatch for Option<L> {
326 #[inline]
327 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
328 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
329 }
330}
331
332impl<L: Clone + Loggable> ComponentBatch for Vec<L> {
335 #[inline]
336 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
337 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
338 }
339}
340
341impl<L: Loggable> ComponentBatch for Vec<Option<L>> {
344 #[inline]
345 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
346 L::to_arrow_opt(
347 self.iter()
348 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
349 )
350 }
351}
352
353impl<L: Loggable, const N: usize> ComponentBatch for [L; N] {
356 #[inline]
357 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
358 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
359 }
360}
361
362impl<L: Loggable, const N: usize> ComponentBatch for [Option<L>; N] {
365 #[inline]
366 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
367 L::to_arrow_opt(
368 self.iter()
369 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
370 )
371 }
372}
373
374impl<L: Loggable> ComponentBatch for [L] {
377 #[inline]
378 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
379 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
380 }
381}
382
383impl<L: Loggable> ComponentBatch for [Option<L>] {
386 #[inline]
387 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
388 L::to_arrow_opt(
389 self.iter()
390 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
391 )
392 }
393}