re_types_core/
loggable_batch.rs1use std::borrow::Cow;
2
3use crate::{
4 ArchetypeFieldName, ArchetypeName, Component, ComponentDescriptor, ComponentName, Loggable,
5 SerializationResult,
6};
7
8use arrow::array::ListArray as ArrowListArray;
9
10#[allow(unused_imports)] use crate::Archetype;
12
13pub trait LoggableBatch {
25 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef>;
31}
32
33#[allow(dead_code)]
34fn assert_loggablebatch_object_safe() {
35 let _: &dyn LoggableBatch;
36}
37
38pub trait ComponentBatch: LoggableBatch {
40 fn to_arrow_list_array(&self) -> SerializationResult<ArrowListArray> {
42 let array = self.to_arrow()?;
43 let offsets =
44 arrow::buffer::OffsetBuffer::from_lengths(std::iter::repeat(1).take(array.len()));
45 let nullable = true;
46 let field = arrow::datatypes::Field::new("item", array.data_type().clone(), nullable);
47 ArrowListArray::try_new(field.into(), offsets, array, None).map_err(|err| err.into())
48 }
49
50 fn descriptor(&self) -> Cow<'_, ComponentDescriptor>;
54
55 #[inline]
72 fn serialized(&self) -> Option<SerializedComponentBatch> {
73 match self.try_serialized() {
74 Ok(array) => Some(array),
75
76 #[cfg(debug_assertions)]
77 Err(err) => {
78 panic!(
79 "failed to serialize data for {}: {}",
80 self.descriptor(),
81 re_error::format_ref(&err)
82 )
83 }
84
85 #[cfg(not(debug_assertions))]
86 Err(err) => {
87 re_log::error!(
88 descriptor = %self.descriptor(),
89 "failed to serialize data: {}",
90 re_error::format_ref(&err)
91 );
92 None
93 }
94 }
95 }
96
97 #[inline]
111 fn try_serialized(&self) -> SerializationResult<SerializedComponentBatch> {
112 Ok(SerializedComponentBatch {
113 array: self.to_arrow()?,
114 descriptor: self.descriptor().into_owned(),
115 })
116 }
117
118 #[inline]
127 fn name(&self) -> ComponentName {
128 self.descriptor().component_name
129 }
130}
131
132#[allow(dead_code)]
133fn assert_component_batch_object_safe() {
134 let _: &dyn LoggableBatch;
135}
136
137#[derive(Debug, Clone)]
147pub struct SerializedComponentBatch {
148 pub array: arrow::array::ArrayRef,
149
150 pub descriptor: ComponentDescriptor,
152}
153
154impl re_byte_size::SizeBytes for SerializedComponentBatch {
155 #[inline]
156 fn heap_size_bytes(&self) -> u64 {
157 let Self { array, descriptor } = self;
158 array.heap_size_bytes() + descriptor.heap_size_bytes()
159 }
160}
161
162impl PartialEq for SerializedComponentBatch {
163 #[inline]
164 fn eq(&self, other: &Self) -> bool {
165 let Self { array, descriptor } = self;
166
167 *descriptor == other.descriptor && **array == *other.array
169 }
170}
171
172impl SerializedComponentBatch {
173 #[inline]
174 pub fn new(array: arrow::array::ArrayRef, descriptor: ComponentDescriptor) -> Self {
175 Self { array, descriptor }
176 }
177
178 #[inline]
179 pub fn with_descriptor_override(self, descriptor: ComponentDescriptor) -> Self {
180 Self { descriptor, ..self }
181 }
182
183 #[inline]
185 pub fn with_archetype_name(mut self, archetype_name: ArchetypeName) -> Self {
186 self.descriptor = self.descriptor.with_archetype_name(archetype_name);
187 self
188 }
189
190 #[inline]
192 pub fn with_archetype_field_name(mut self, archetype_field_name: ArchetypeFieldName) -> Self {
193 self.descriptor = self
194 .descriptor
195 .with_archetype_field_name(archetype_field_name);
196 self
197 }
198
199 #[inline]
201 pub fn or_with_archetype_name(mut self, archetype_name: impl Fn() -> ArchetypeName) -> Self {
202 self.descriptor = self.descriptor.or_with_archetype_name(archetype_name);
203 self
204 }
205
206 #[inline]
208 pub fn or_with_archetype_field_name(
209 mut self,
210 archetype_field_name: impl FnOnce() -> ArchetypeFieldName,
211 ) -> Self {
212 self.descriptor = self
213 .descriptor
214 .or_with_archetype_field_name(archetype_field_name);
215 self
216 }
217}
218
219#[derive(Debug, Clone)]
223pub struct SerializedComponentColumn {
224 pub list_array: arrow::array::ListArray,
225
226 pub descriptor: ComponentDescriptor,
228}
229
230impl SerializedComponentColumn {
231 pub fn repartitioned(
235 self,
236 lengths: impl IntoIterator<Item = usize>,
237 ) -> SerializationResult<Self> {
238 let Self {
239 list_array,
240 descriptor,
241 } = self;
242
243 let list_array = re_arrow_util::arrow_util::repartition_list_array(list_array, lengths)?;
244
245 Ok(Self {
246 list_array,
247 descriptor,
248 })
249 }
250}
251
252impl From<SerializedComponentBatch> for SerializedComponentColumn {
253 #[inline]
254 fn from(batch: SerializedComponentBatch) -> Self {
255 use arrow::{
256 array::{Array, ListArray},
257 buffer::OffsetBuffer,
258 datatypes::Field,
259 };
260
261 let list_array = {
262 let nullable = true;
263 let field = Field::new_list_field(batch.array.data_type().clone(), nullable);
264 let offsets = OffsetBuffer::from_lengths(std::iter::once(batch.array.len()));
265 let nulls = None;
266 ListArray::new(field.into(), offsets, batch.array, nulls)
267 };
268
269 Self {
270 list_array,
271 descriptor: batch.descriptor,
272 }
273 }
274}
275
276impl SerializedComponentBatch {
277 #[inline]
285 pub fn partitioned(
286 self,
287 lengths: impl IntoIterator<Item = usize>,
288 ) -> SerializationResult<SerializedComponentColumn> {
289 let column: SerializedComponentColumn = self.into();
290 column.repartitioned(lengths)
291 }
292}
293
/// Arrow field metadata key under which a descriptor's archetype name is stored when a
/// [`SerializedComponentBatch`] is converted into an `arrow::datatypes::Field`.
const FIELD_METADATA_KEY_ARCHETYPE_NAME: &str = "rerun.archetype_name";
304
/// Arrow field metadata key under which a descriptor's archetype field name is stored when
/// a [`SerializedComponentBatch`] is converted into an `arrow::datatypes::Field`.
const FIELD_METADATA_KEY_ARCHETYPE_FIELD_NAME: &str = "rerun.archetype_field_name";
307
308impl From<&SerializedComponentBatch> for arrow::datatypes::Field {
309 #[inline]
310 fn from(batch: &SerializedComponentBatch) -> Self {
311 Self::new(
312 batch.descriptor.component_name.to_string(),
313 batch.array.data_type().clone(),
314 false,
315 )
316 .with_metadata(
317 [
318 batch.descriptor.archetype_name.map(|name| {
319 (
320 FIELD_METADATA_KEY_ARCHETYPE_NAME.to_owned(),
321 name.to_string(),
322 )
323 }),
324 batch.descriptor.archetype_field_name.map(|name| {
325 (
326 FIELD_METADATA_KEY_ARCHETYPE_FIELD_NAME.to_owned(),
327 name.to_string(),
328 )
329 }),
330 ]
331 .into_iter()
332 .flatten()
333 .collect(),
334 )
335 }
336}
337
338impl<L: Clone + Loggable> LoggableBatch for L {
341 #[inline]
342 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
343 L::to_arrow([std::borrow::Cow::Borrowed(self)])
344 }
345}
346
347impl<C: Component> ComponentBatch for C {
348 #[inline]
349 fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
350 C::descriptor().into()
351 }
352}
353
354impl<L: Clone + Loggable> LoggableBatch for Option<L> {
357 #[inline]
358 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
359 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
360 }
361}
362
363impl<C: Component> ComponentBatch for Option<C> {
364 #[inline]
365 fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
366 C::descriptor().into()
367 }
368}
369
370impl<L: Clone + Loggable> LoggableBatch for Vec<L> {
373 #[inline]
374 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
375 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
376 }
377}
378
379impl<C: Component> ComponentBatch for Vec<C> {
380 #[inline]
381 fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
382 C::descriptor().into()
383 }
384}
385
386impl<L: Loggable> LoggableBatch for Vec<Option<L>> {
389 #[inline]
390 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
391 L::to_arrow_opt(
392 self.iter()
393 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
394 )
395 }
396}
397
398impl<C: Component> ComponentBatch for Vec<Option<C>> {
399 #[inline]
400 fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
401 C::descriptor().into()
402 }
403}
404
405impl<L: Loggable, const N: usize> LoggableBatch for [L; N] {
408 #[inline]
409 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
410 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
411 }
412}
413
414impl<C: Component, const N: usize> ComponentBatch for [C; N] {
415 #[inline]
416 fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
417 C::descriptor().into()
418 }
419}
420
421impl<L: Loggable, const N: usize> LoggableBatch for [Option<L>; N] {
424 #[inline]
425 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
426 L::to_arrow_opt(
427 self.iter()
428 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
429 )
430 }
431}
432
433impl<C: Component, const N: usize> ComponentBatch for [Option<C>; N] {
434 #[inline]
435 fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
436 C::descriptor().into()
437 }
438}
439
440impl<L: Loggable> LoggableBatch for [L] {
443 #[inline]
444 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
445 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
446 }
447}
448
449impl<C: Component> ComponentBatch for [C] {
450 #[inline]
451 fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
452 C::descriptor().into()
453 }
454}
455
456impl<L: Loggable> LoggableBatch for [Option<L>] {
459 #[inline]
460 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
461 L::to_arrow_opt(
462 self.iter()
463 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
464 )
465 }
466}
467
468impl<C: Component> ComponentBatch for [Option<C>] {
469 #[inline]
470 fn descriptor(&self) -> Cow<'_, ComponentDescriptor> {
471 C::descriptor().into()
472 }
473}