re_types_core/
component_batch.rs1use crate::{ArchetypeName, ComponentDescriptor, ComponentType, Loggable, SerializationResult};
2
3use arrow::array::ListArray as ArrowListArray;
4
5#[allow(unused_imports, clippy::unused_trait_names)] use crate::Archetype;
7
8pub trait ComponentBatch {
20 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef>;
26
27 fn to_arrow_list_array(&self) -> SerializationResult<ArrowListArray> {
29 let array = self.to_arrow()?;
30 let offsets =
31 arrow::buffer::OffsetBuffer::from_lengths(std::iter::repeat_n(1, array.len()));
32 let nullable = true;
33 let field = arrow::datatypes::Field::new("item", array.data_type().clone(), nullable);
34 ArrowListArray::try_new(field.into(), offsets, array, None).map_err(|err| err.into())
35 }
36
37 #[inline]
54 fn serialized(&self, component_descr: ComponentDescriptor) -> Option<SerializedComponentBatch> {
55 match self.try_serialized(component_descr.clone()) {
56 Ok(array) => Some(array),
57
58 #[cfg(debug_assertions)]
59 Err(err) => {
60 panic!(
61 "failed to serialize data for {}: {}",
62 component_descr,
63 re_error::format_ref(&err)
64 )
65 }
66
67 #[cfg(not(debug_assertions))]
68 Err(err) => {
69 re_log::error!(
70 descriptor = %component_descr,
71 "failed to serialize data: {}",
72 re_error::format_ref(&err)
73 );
74 None
75 }
76 }
77 }
78
79 #[inline]
93 fn try_serialized(
94 &self,
95 component_descr: ComponentDescriptor,
96 ) -> SerializationResult<SerializedComponentBatch> {
97 Ok(SerializedComponentBatch {
98 array: self.to_arrow()?,
99 descriptor: component_descr,
100 })
101 }
102}
103
104#[allow(dead_code)]
105fn assert_component_batch_object_safe() {
106 let _: &dyn ComponentBatch;
107}
108
109#[derive(Debug, Clone)]
119pub struct SerializedComponentBatch {
120 pub descriptor: ComponentDescriptor,
122
123 pub array: arrow::array::ArrayRef,
124}
125
126impl re_byte_size::SizeBytes for SerializedComponentBatch {
127 #[inline]
128 fn heap_size_bytes(&self) -> u64 {
129 let Self { array, descriptor } = self;
130 array.heap_size_bytes() + descriptor.heap_size_bytes()
131 }
132}
133
134impl PartialEq for SerializedComponentBatch {
135 #[inline]
136 fn eq(&self, other: &Self) -> bool {
137 let Self { array, descriptor } = self;
138
139 *descriptor == other.descriptor && **array == *other.array
141 }
142}
143
144impl SerializedComponentBatch {
145 #[inline]
146 pub fn new(array: arrow::array::ArrayRef, descriptor: ComponentDescriptor) -> Self {
147 Self { array, descriptor }
148 }
149
150 #[inline]
151 pub fn with_descriptor_override(self, descriptor: ComponentDescriptor) -> Self {
152 Self { descriptor, ..self }
153 }
154
155 #[inline]
157 pub fn with_archetype(mut self, archetype_name: ArchetypeName) -> Self {
158 self.descriptor = self.descriptor.with_archetype(archetype_name);
159 self
160 }
161
162 #[inline]
164 pub fn with_component_type(mut self, component_type: ComponentType) -> Self {
165 self.descriptor = self.descriptor.with_component_type(component_type);
166 self
167 }
168
169 #[inline]
171 pub fn or_with_archetype(mut self, archetype_name: impl Fn() -> ArchetypeName) -> Self {
172 self.descriptor = self.descriptor.or_with_archetype(archetype_name);
173 self
174 }
175
176 #[inline]
178 pub fn or_with_component_type(
179 mut self,
180 component_type: impl FnOnce() -> ComponentType,
181 ) -> Self {
182 self.descriptor = self.descriptor.or_with_component_type(component_type);
183 self
184 }
185}
186
187#[derive(Debug, Clone)]
191pub struct SerializedComponentColumn {
192 pub list_array: arrow::array::ListArray,
193
194 pub descriptor: ComponentDescriptor,
196}
197
198impl SerializedComponentColumn {
199 pub fn repartitioned(
203 self,
204 lengths: impl IntoIterator<Item = usize>,
205 ) -> SerializationResult<Self> {
206 let Self {
207 list_array,
208 descriptor,
209 } = self;
210
211 let list_array = re_arrow_util::repartition_list_array(list_array, lengths)?;
212
213 Ok(Self {
214 list_array,
215 descriptor,
216 })
217 }
218}
219
220impl From<SerializedComponentBatch> for SerializedComponentColumn {
221 #[inline]
222 fn from(batch: SerializedComponentBatch) -> Self {
223 use arrow::{
224 array::{Array as _, ListArray},
225 buffer::OffsetBuffer,
226 datatypes::Field,
227 };
228
229 let list_array = {
230 let nullable = true;
231 let field = Field::new_list_field(batch.array.data_type().clone(), nullable);
232 let offsets = OffsetBuffer::from_lengths(std::iter::once(batch.array.len()));
233 let nulls = None;
234 ListArray::new(field.into(), offsets, batch.array, nulls)
235 };
236
237 Self {
238 list_array,
239 descriptor: batch.descriptor,
240 }
241 }
242}
243
244impl SerializedComponentBatch {
245 #[inline]
253 pub fn partitioned(
254 self,
255 lengths: impl IntoIterator<Item = usize>,
256 ) -> SerializationResult<SerializedComponentColumn> {
257 let column: SerializedComponentColumn = self.into();
258 column.repartitioned(lengths)
259 }
260}
261
/// Arrow field metadata key under which the archetype name of a component
/// descriptor is stored, if the descriptor has one.
const FIELD_METADATA_KEY_ARCHETYPE_NAME: &str = "rerun:archetype";

/// Arrow field metadata key under which the component of a component descriptor
/// is stored.
const FIELD_METADATA_KEY_COMPONENT: &str = "rerun:component";

/// Arrow field metadata key under which the component type of a component
/// descriptor is stored, if the descriptor has one.
const FIELD_METADATA_KEY_COMPONENT_TYPE: &str = "rerun:component_type";
278
279impl From<&SerializedComponentBatch> for arrow::datatypes::Field {
280 #[inline]
281 fn from(batch: &SerializedComponentBatch) -> Self {
282 Self::new(
283 batch.descriptor.component.to_string(),
284 batch.array.data_type().clone(),
285 false,
286 )
287 .with_metadata(
288 [
289 batch.descriptor.archetype.map(|name| {
290 (
291 FIELD_METADATA_KEY_ARCHETYPE_NAME.to_owned(),
292 name.to_string(),
293 )
294 }),
295 Some((
296 FIELD_METADATA_KEY_COMPONENT.to_owned(),
297 batch.descriptor.component.to_string(),
298 )),
299 batch.descriptor.component_type.map(|name| {
300 (
301 FIELD_METADATA_KEY_COMPONENT_TYPE.to_owned(),
302 name.to_string(),
303 )
304 }),
305 ]
306 .into_iter()
307 .flatten()
308 .collect(),
309 )
310 }
311}
312
313impl<L: Clone + Loggable> ComponentBatch for L {
316 #[inline]
317 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
318 L::to_arrow([std::borrow::Cow::Borrowed(self)])
319 }
320}
321
322impl<L: Clone + Loggable> ComponentBatch for Option<L> {
325 #[inline]
326 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
327 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
328 }
329}
330
331impl<L: Clone + Loggable> ComponentBatch for Vec<L> {
334 #[inline]
335 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
336 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
337 }
338}
339
340impl<L: Loggable> ComponentBatch for Vec<Option<L>> {
343 #[inline]
344 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
345 L::to_arrow_opt(
346 self.iter()
347 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
348 )
349 }
350}
351
352impl<L: Loggable, const N: usize> ComponentBatch for [L; N] {
355 #[inline]
356 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
357 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
358 }
359}
360
361impl<L: Loggable, const N: usize> ComponentBatch for [Option<L>; N] {
364 #[inline]
365 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
366 L::to_arrow_opt(
367 self.iter()
368 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
369 )
370 }
371}
372
373impl<L: Loggable> ComponentBatch for [L] {
376 #[inline]
377 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
378 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
379 }
380}
381
382impl<L: Loggable> ComponentBatch for [Option<L>] {
385 #[inline]
386 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
387 L::to_arrow_opt(
388 self.iter()
389 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
390 )
391 }
392}