1use crate::{ArchetypeName, ComponentDescriptor, ComponentType, Loggable, SerializationResult};
2
3use arrow::array::{ListArray as ArrowListArray, ListArray};
4use arrow::buffer::OffsetBuffer;
5
6#[allow(clippy::allow_attributes, unused_imports, clippy::unused_trait_names)]
8use crate::Archetype;
9
10pub trait ComponentBatch {
22 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef>;
28
29 fn to_arrow_list_array(&self) -> SerializationResult<ArrowListArray> {
31 let array = self.to_arrow()?;
32 let offsets =
33 arrow::buffer::OffsetBuffer::from_lengths(std::iter::repeat_n(1, array.len()));
34 let nullable = true;
35 let field = arrow::datatypes::Field::new("item", array.data_type().clone(), nullable);
36 ArrowListArray::try_new(field.into(), offsets, array, None).map_err(|err| err.into())
37 }
38
39 #[inline]
56 fn serialized(&self, component_descr: ComponentDescriptor) -> Option<SerializedComponentBatch> {
57 match self.try_serialized(component_descr.clone()) {
58 Ok(array) => Some(array),
59
60 #[cfg(debug_assertions)]
61 Err(err) => {
62 panic!(
63 "failed to serialize data for {}: {}",
64 component_descr,
65 re_error::format_ref(&err)
66 )
67 }
68
69 #[cfg(not(debug_assertions))]
70 Err(err) => {
71 re_log::error!(
72 descriptor = %component_descr,
73 "failed to serialize data: {}",
74 re_error::format_ref(&err)
75 );
76 None
77 }
78 }
79 }
80
81 #[inline]
95 fn try_serialized(
96 &self,
97 component_descr: ComponentDescriptor,
98 ) -> SerializationResult<SerializedComponentBatch> {
99 Ok(SerializedComponentBatch {
100 array: self.to_arrow()?,
101 descriptor: component_descr,
102 })
103 }
104}
105
106#[expect(dead_code)]
107fn assert_component_batch_object_safe() {
108 let _: &dyn ComponentBatch;
109}
110
/// Already-serialized component data: an Arrow array together with the
/// [`ComponentDescriptor`] it was serialized for.
///
/// This is what [`ComponentBatch::try_serialized`] produces.
#[derive(Debug, Clone)]
pub struct SerializedComponentBatch {
    /// Descriptor identifying the component stored in [`Self::array`].
    pub descriptor: ComponentDescriptor,

    /// The serialized component data.
    pub array: arrow::array::ArrayRef,
}
127
128impl re_byte_size::SizeBytes for SerializedComponentBatch {
129 #[inline]
130 fn heap_size_bytes(&self) -> u64 {
131 let Self { array, descriptor } = self;
132 array.heap_size_bytes() + descriptor.heap_size_bytes()
133 }
134}
135
136impl PartialEq for SerializedComponentBatch {
137 #[inline]
138 fn eq(&self, other: &Self) -> bool {
139 let Self { array, descriptor } = self;
140
141 *descriptor == other.descriptor && **array == *other.array
143 }
144}
145
146impl SerializedComponentBatch {
147 #[inline]
148 pub fn new(array: arrow::array::ArrayRef, descriptor: ComponentDescriptor) -> Self {
149 Self { array, descriptor }
150 }
151
152 #[inline]
153 pub fn with_descriptor_override(self, descriptor: ComponentDescriptor) -> Self {
154 Self { descriptor, ..self }
155 }
156
157 #[inline]
159 pub fn with_archetype(mut self, archetype_name: ArchetypeName) -> Self {
160 self.descriptor = self.descriptor.with_archetype(archetype_name);
161 self
162 }
163
164 #[inline]
166 pub fn with_component_type(mut self, component_type: ComponentType) -> Self {
167 self.descriptor = self.descriptor.with_component_type(component_type);
168 self
169 }
170
171 #[inline]
173 pub fn or_with_archetype(mut self, archetype_name: impl Fn() -> ArchetypeName) -> Self {
174 self.descriptor = self.descriptor.or_with_archetype(archetype_name);
175 self
176 }
177
178 #[inline]
180 pub fn or_with_component_type(
181 mut self,
182 component_type: impl FnOnce() -> ComponentType,
183 ) -> Self {
184 self.descriptor = self.descriptor.or_with_component_type(component_type);
185 self
186 }
187}
188
/// Already-serialized component data laid out as an Arrow list array, together with the
/// [`ComponentDescriptor`] it belongs to.
///
/// Can be built from a [`SerializedComponentBatch`] (yielding a single list holding the
/// whole batch) and re-split with [`Self::repartitioned`].
#[derive(Debug, Clone, PartialEq)]
pub struct SerializedComponentColumn {
    /// The serialized data, as a list array.
    pub list_array: arrow::array::ListArray,

    /// Descriptor identifying the component stored in [`Self::list_array`].
    pub descriptor: ComponentDescriptor,
}
199
200impl SerializedComponentColumn {
201 #[inline]
202 pub fn new(list_array: arrow::array::ListArray, descriptor: ComponentDescriptor) -> Self {
203 Self {
204 list_array,
205 descriptor,
206 }
207 }
208
209 pub fn repartitioned(
213 self,
214 lengths: impl IntoIterator<Item = usize>,
215 ) -> SerializationResult<Self> {
216 let Self {
217 list_array,
218 descriptor,
219 } = self;
220
221 let list_array = repartition_list_array(list_array, lengths)?;
222
223 Ok(Self {
224 list_array,
225 descriptor,
226 })
227 }
228}
229
230impl re_byte_size::SizeBytes for SerializedComponentColumn {
231 #[inline]
232 fn heap_size_bytes(&self) -> u64 {
233 self.list_array.heap_size_bytes() + self.descriptor.heap_size_bytes()
234 }
235}
236
237impl From<SerializedComponentBatch> for SerializedComponentColumn {
238 #[inline]
239 fn from(batch: SerializedComponentBatch) -> Self {
240 use arrow::{
241 array::{Array as _, ListArray},
242 buffer::OffsetBuffer,
243 datatypes::Field,
244 };
245
246 let list_array = {
247 let nullable = true;
248 let field = Field::new_list_field(batch.array.data_type().clone(), nullable);
249 let offsets = OffsetBuffer::from_lengths(std::iter::once(batch.array.len()));
250 let nulls = None;
251 ListArray::new(field.into(), offsets, batch.array, nulls)
252 };
253
254 Self {
255 list_array,
256 descriptor: batch.descriptor,
257 }
258 }
259}
260
261#[inline]
267pub fn repartition_list_array(
268 list_array: ListArray,
269 lengths: impl IntoIterator<Item = usize>,
270) -> arrow::error::Result<ListArray> {
271 let (field, _offsets, values, _nulls) = list_array.into_parts();
272
273 let offsets = OffsetBuffer::from_lengths(lengths);
274 let nulls = None;
275
276 ListArray::try_new(field, offsets, values, nulls)
277}
278
279impl SerializedComponentBatch {
280 #[inline]
288 pub fn partitioned(
289 self,
290 lengths: impl IntoIterator<Item = usize>,
291 ) -> SerializationResult<SerializedComponentColumn> {
292 let column: SerializedComponentColumn = self.into();
293 column.repartitioned(lengths)
294 }
295}
296
297impl From<&SerializedComponentBatch> for arrow::datatypes::Field {
304 #[inline]
305 fn from(batch: &SerializedComponentBatch) -> Self {
306 Self::new(
307 batch.descriptor.component.to_string(),
308 batch.array.data_type().clone(),
309 false,
310 )
311 .with_metadata(
312 [
313 batch.descriptor.archetype.map(|name| {
314 (
315 crate::FIELD_METADATA_KEY_ARCHETYPE.to_owned(),
316 name.to_string(),
317 )
318 }),
319 Some((
320 crate::FIELD_METADATA_KEY_COMPONENT.to_owned(),
321 batch.descriptor.component.to_string(),
322 )),
323 batch.descriptor.component_type.map(|name| {
324 (
325 crate::FIELD_METADATA_KEY_COMPONENT_TYPE.to_owned(),
326 name.to_string(),
327 )
328 }),
329 ]
330 .into_iter()
331 .flatten()
332 .collect(),
333 )
334 }
335}
336
337impl<L: Clone + Loggable> ComponentBatch for L {
340 #[inline]
341 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
342 L::to_arrow([std::borrow::Cow::Borrowed(self)])
343 }
344}
345
346impl<L: Clone + Loggable> ComponentBatch for Option<L> {
349 #[inline]
350 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
351 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
352 }
353}
354
355impl<L: Clone + Loggable> ComponentBatch for Vec<L> {
358 #[inline]
359 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
360 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
361 }
362}
363
364impl<L: Loggable> ComponentBatch for Vec<Option<L>> {
367 #[inline]
368 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
369 L::to_arrow_opt(
370 self.iter()
371 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
372 )
373 }
374}
375
376impl<L: Loggable, const N: usize> ComponentBatch for [L; N] {
379 #[inline]
380 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
381 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
382 }
383}
384
385impl<L: Loggable, const N: usize> ComponentBatch for [Option<L>; N] {
388 #[inline]
389 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
390 L::to_arrow_opt(
391 self.iter()
392 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
393 )
394 }
395}
396
397impl<L: Loggable> ComponentBatch for [L] {
400 #[inline]
401 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
402 L::to_arrow(self.iter().map(|v| std::borrow::Cow::Borrowed(v)))
403 }
404}
405
406impl<L: Loggable> ComponentBatch for [Option<L>] {
409 #[inline]
410 fn to_arrow(&self) -> SerializationResult<arrow::array::ArrayRef> {
411 L::to_arrow_opt(
412 self.iter()
413 .map(|opt| opt.as_ref().map(|v| std::borrow::Cow::Borrowed(v))),
414 )
415 }
416}