// re_chunk/builder.rs

1use arrow::array::ArrayRef;
2use arrow::datatypes::DataType as ArrowDatatype;
3use itertools::Itertools as _;
4use nohash_hasher::IntMap;
5use re_log_types::{EntityPath, NonMinI64, TimePoint, Timeline, TimelineName};
6use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor, SerializedComponentBatch};
7
8use crate::{Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
9
10// ---
11
/// Helper to incrementally build a [`Chunk`].
///
/// Can be created using [`Chunk::builder`].
pub struct ChunkBuilder {
    /// The ID the final [`Chunk`] will carry.
    id: ChunkId,
    /// The entity path that every row in this chunk refers to.
    entity_path: EntityPath,

    /// One [`RowId`] per row appended so far.
    row_ids: Vec<RowId>,
    /// One in-progress time column per timeline seen so far.
    timelines: IntMap<TimelineName, TimeColumnBuilder>,
    /// Per component: one optional array per row (`None` marks a hole in a sparse column).
    components: IntMap<ComponentDescriptor, Vec<Option<ArrayRef>>>,
}
23
24impl Chunk {
25    /// Initializes a new [`ChunkBuilder`].
26    #[inline]
27    pub fn builder(entity_path: impl Into<EntityPath>) -> ChunkBuilder {
28        ChunkBuilder::new(ChunkId::new(), entity_path.into())
29    }
30
31    /// Initializes a new [`ChunkBuilder`].
32    ///
33    /// The final [`Chunk`] will have the specified `id`.
34    #[inline]
35    pub fn builder_with_id(id: ChunkId, entity_path: impl Into<EntityPath>) -> ChunkBuilder {
36        ChunkBuilder::new(id, entity_path.into())
37    }
38}
39
40impl ChunkBuilder {
41    /// Initializes a new [`ChunkBuilder`].
42    ///
43    /// See also [`Chunk::builder`].
44    #[inline]
45    pub fn new(id: ChunkId, entity_path: EntityPath) -> Self {
46        Self {
47            id,
48            entity_path,
49
50            row_ids: Vec::new(),
51            timelines: IntMap::default(),
52            components: IntMap::default(),
53        }
54    }
55
    /// Add a row's worth of data using the given sparse component data.
    ///
    /// `None` entries mark components that carry no data for this particular row; all component
    /// columns are kept null-padded so that each one always has exactly one entry per row.
    pub fn with_sparse_row(
        mut self,
        row_id: RowId,
        timepoint: impl Into<TimePoint>,
        components: impl IntoIterator<Item = (ComponentDescriptor, Option<ArrayRef>)>,
    ) -> Self {
        let components = components.into_iter().collect_vec();

        // Backfill: a component showing up for the first time on row N must first have its column
        // padded with N nulls, so the value pushed below lands on the correct row.
        for (component_desc, _) in &components {
            let arrays = self.components.entry(component_desc.clone()).or_default();
            arrays.extend(std::iter::repeat_n(
                None,
                self.row_ids.len().saturating_sub(arrays.len()),
            ));
        }

        self.row_ids.push(row_id);

        for (timeline, cell) in timepoint.into() {
            self.timelines
                .entry(timeline)
                // A timeline first seen here gets a fresh column builder typed after this cell.
                .or_insert_with(|| TimeColumn::builder(Timeline::new(timeline, cell.typ())))
                .with_row(cell.value);
        }

        for (component_descr, array) in components {
            self.components
                .entry(component_descr)
                .or_default()
                .push(array);
        }

        // Forward-fill: components known from earlier rows but absent from this one get a null
        // appended, keeping every column aligned with `row_ids`.
        for arrays in self.components.values_mut() {
            arrays.extend(std::iter::repeat_n(
                None,
                self.row_ids.len().saturating_sub(arrays.len()),
            ));
        }

        self
    }
100
101    /// Add a row's worth of data using the given component data.
102    #[inline]
103    pub fn with_row(
104        self,
105        row_id: RowId,
106        timepoint: impl Into<TimePoint>,
107        components: impl IntoIterator<Item = (ComponentDescriptor, ArrayRef)>,
108    ) -> Self {
109        self.with_sparse_row(
110            row_id,
111            timepoint,
112            components
113                .into_iter()
114                .map(|(component_descr, array)| (component_descr, Some(array))),
115        )
116    }
117
118    /// Add a row's worth of data by destructuring an archetype into component columns.
119    #[inline]
120    pub fn with_archetype(
121        self,
122        row_id: RowId,
123        timepoint: impl Into<TimePoint>,
124        as_components: &dyn AsComponents,
125    ) -> Self {
126        let batches = as_components.as_serialized_batches();
127        self.with_serialized_batches(row_id, timepoint, batches)
128    }
129
130    /// Add a row's worth of data by destructuring an archetype into component columns, using an automatically generated row ID.
131    #[inline]
132    pub fn with_archetype_auto_row(
133        self,
134        timepoint: impl Into<TimePoint>,
135        as_components: &dyn AsComponents,
136    ) -> Self {
137        let batches = as_components.as_serialized_batches();
138        self.with_serialized_batches(RowId::new(), timepoint, batches)
139    }
140
141    /// Add the serialized value of a single component to the chunk.
142    pub fn with_component<Component: re_types_core::Component>(
143        self,
144        row_id: RowId,
145        timepoint: impl Into<TimePoint>,
146        component_descr: re_types_core::ComponentDescriptor,
147        value: &Component,
148    ) -> re_types_core::SerializationResult<Self> {
149        debug_assert_eq!(component_descr.component_type, Some(Component::name()));
150        Ok(self.with_serialized_batches(
151            row_id,
152            timepoint,
153            vec![re_types_core::SerializedComponentBatch {
154                descriptor: component_descr,
155                array: Component::to_arrow([std::borrow::Cow::Borrowed(value)])?,
156            }],
157        ))
158    }
159
160    /// Add a row's worth of data by serializing a single [`ComponentBatch`].
161    #[inline]
162    pub fn with_component_batch(
163        self,
164        row_id: RowId,
165        timepoint: impl Into<TimePoint>,
166        component_batch: (ComponentDescriptor, &dyn ComponentBatch),
167    ) -> Self {
168        self.with_row(
169            row_id,
170            timepoint,
171            component_batch
172                .1
173                .to_arrow()
174                .ok()
175                .map(|array| (component_batch.0, array)),
176        )
177    }
178
179    /// Add a row's worth of data by serializing many [`ComponentBatch`]es.
180    #[inline]
181    pub fn with_component_batches<'a>(
182        self,
183        row_id: RowId,
184        timepoint: impl Into<TimePoint>,
185        component_batches: impl IntoIterator<Item = (ComponentDescriptor, &'a dyn ComponentBatch)>,
186    ) -> Self {
187        self.with_row(
188            row_id,
189            timepoint,
190            component_batches
191                .into_iter()
192                .filter_map(|(component_descr, component_batch)| {
193                    component_batch
194                        .to_arrow()
195                        .ok()
196                        .map(|array| (component_descr, array))
197                }),
198        )
199    }
200
201    /// Add a row's worth of data by serializing many sparse [`ComponentBatch`]es.
202    #[inline]
203    pub fn with_sparse_component_batches<'a>(
204        self,
205        row_id: RowId,
206        timepoint: impl Into<TimePoint>,
207        component_batches: impl IntoIterator<
208            Item = (ComponentDescriptor, Option<&'a dyn ComponentBatch>),
209        >,
210    ) -> Self {
211        self.with_sparse_row(
212            row_id,
213            timepoint,
214            component_batches
215                .into_iter()
216                .map(|(component_desc, component_batch)| {
217                    (
218                        component_desc,
219                        component_batch.and_then(|batch| batch.to_arrow().ok()),
220                    )
221                }),
222        )
223    }
224
225    /// Add a row's worth of data by serializing a single [`ComponentBatch`].
226    #[inline]
227    pub fn with_serialized_batch(
228        self,
229        row_id: RowId,
230        timepoint: impl Into<TimePoint>,
231        component_batch: SerializedComponentBatch,
232    ) -> Self {
233        self.with_row(
234            row_id,
235            timepoint,
236            [(component_batch.descriptor, component_batch.array)],
237        )
238    }
239
240    /// Add a row's worth of data by serializing many [`ComponentBatch`]es.
241    #[inline]
242    pub fn with_serialized_batches(
243        self,
244        row_id: RowId,
245        timepoint: impl Into<TimePoint>,
246        component_batches: impl IntoIterator<Item = SerializedComponentBatch>,
247    ) -> Self {
248        self.with_row(
249            row_id,
250            timepoint,
251            component_batches
252                .into_iter()
253                .map(|component_batch| (component_batch.descriptor, component_batch.array)),
254        )
255    }
256
257    /// Add a row's worth of data by serializing many sparse [`ComponentBatch`]es.
258    #[inline]
259    pub fn with_sparse_serialized_batches(
260        self,
261        row_id: RowId,
262        timepoint: impl Into<TimePoint>,
263        component_batches: impl IntoIterator<
264            Item = (ComponentDescriptor, Option<SerializedComponentBatch>),
265        >,
266    ) -> Self {
267        self.with_sparse_row(
268            row_id,
269            timepoint,
270            component_batches
271                .into_iter()
272                .map(|(component_desc, component_batch)| {
273                    (component_desc, component_batch.map(|batch| batch.array))
274                }),
275        )
276    }
277
278    /// Builds and returns the final [`Chunk`].
279    ///
280    /// The arrow datatype of each individual column will be guessed by inspecting the data.
281    ///
282    /// If any component column turns out to be fully sparse (i.e. only null values), that column
283    /// will be stripped out (how could we guess its datatype without any single value to inspect)?
284    ///
285    /// This is generally the desired behavior but, if you want to make sure to keep fully sparse
286    /// columns (can be useful e.g. for testing purposes), see [`ChunkBuilder::build_with_datatypes`]
287    /// instead.
288    ///
289    /// This returns an error if the chunk fails to `sanity_check`.
290    #[inline]
291    pub fn build(self) -> ChunkResult<Chunk> {
292        re_tracing::profile_function!();
293        let Self {
294            id,
295            entity_path,
296            row_ids,
297            timelines,
298            components,
299        } = self;
300
301        let timelines = {
302            re_tracing::profile_scope!("timelines");
303            timelines
304                .into_iter()
305                .map(|(timeline, time_column)| (timeline, time_column.build()))
306                .collect()
307        };
308
309        let components = {
310            re_tracing::profile_scope!("components");
311            components
312                .into_iter()
313                .filter_map(|(component_desc, arrays)| {
314                    let arrays = arrays.iter().map(|array| array.as_deref()).collect_vec();
315                    re_arrow_util::arrays_to_list_array_opt(&arrays)
316                        .map(|list_array| (component_desc, list_array))
317                })
318                .collect()
319        };
320
321        Chunk::from_native_row_ids(id, entity_path, None, &row_ids, timelines, components)
322    }
323
324    /// Builds and returns the final [`Chunk`].
325    ///
326    /// The arrow datatype of each individual column will be guessed by inspecting the data.
327    ///
328    /// If any component column turns out to be fully sparse (i.e. only null values), `datatypes`
329    /// will be used as a fallback.
330    ///
331    /// If any component column turns out to be fully sparse (i.e. only null values) _and_ doesn't
332    /// have an explicit datatype passed in, that column will be stripped out (how could we guess
333    /// its datatype without any single value to inspect)?
334    ///
335    /// You should rarely want to keep fully sparse columns around outside of testing scenarios.
336    /// See [`Self::build`].
337    ///
338    /// This returns an error if the chunk fails to `sanity_check`.
339    #[inline]
340    pub fn build_with_datatypes(
341        self,
342        datatypes: &IntMap<ComponentDescriptor, ArrowDatatype>,
343    ) -> ChunkResult<Chunk> {
344        let Self {
345            id,
346            entity_path,
347            row_ids,
348            timelines,
349            components,
350        } = self;
351
352        Chunk::from_native_row_ids(
353            id,
354            entity_path,
355            None,
356            &row_ids,
357            timelines
358                .into_iter()
359                .map(|(timeline, time_column)| (timeline, time_column.build()))
360                .collect(),
361            {
362                components
363                    .into_iter()
364                    .filter_map(|(component_desc, arrays)| {
365                        let arrays = arrays.iter().map(|array| array.as_deref()).collect_vec();
366                        // If we know the datatype in advance, we're able to keep even fully sparse
367                        // columns around.
368                        if let Some(datatype) = datatypes.get(&component_desc) {
369                            re_arrow_util::arrays_to_list_array(datatype.clone(), &arrays)
370                                .map(|list_array| (component_desc, list_array))
371                        } else {
372                            re_arrow_util::arrays_to_list_array_opt(&arrays)
373                                .map(|list_array| (component_desc, list_array))
374                        }
375                    })
376                    .collect()
377            },
378        )
379    }
380}
381
382// ---
383
/// Helper to incrementally build a [`TimeColumn`].
///
/// Can be created using [`TimeColumn::builder`].
pub struct TimeColumnBuilder {
    /// The timeline this column belongs to.
    timeline: Timeline,

    /// One raw time value per row appended so far.
    times: Vec<i64>,
}
392
393impl TimeColumn {
394    /// Initializes a new [`TimeColumnBuilder`].
395    #[inline]
396    pub fn builder(timeline: Timeline) -> TimeColumnBuilder {
397        TimeColumnBuilder::new(timeline)
398    }
399}
400
401impl TimeColumnBuilder {
402    /// Initializes a new [`TimeColumnBuilder`].
403    ///
404    /// See also [`TimeColumn::builder`].
405    #[inline]
406    pub fn new(timeline: Timeline) -> Self {
407        Self {
408            timeline,
409            times: Vec::new(),
410        }
411    }
412
413    /// Add a row's worth of time data using the given timestamp.
414    #[inline]
415    pub fn with_row(&mut self, time: NonMinI64) -> &mut Self {
416        self.times.push(time.into());
417        self
418    }
419
420    /// Builds and returns the final [`TimeColumn`].
421    #[inline]
422    pub fn build(self) -> TimeColumn {
423        let Self { timeline, times } = self;
424        TimeColumn::new(None, timeline, times.into())
425    }
426}