1use arrow::array::ArrayRef;
2use arrow::datatypes::DataType as ArrowDatatype;
3use itertools::Itertools as _;
4use nohash_hasher::IntMap;
5use re_log_types::{EntityPath, NonMinI64, TimePoint, Timeline, TimelineName};
6use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor, SerializedComponentBatch};
7
8use crate::{Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
9
10pub struct ChunkBuilder {
16 id: ChunkId,
17 entity_path: EntityPath,
18
19 row_ids: Vec<RowId>,
20 timelines: IntMap<TimelineName, TimeColumnBuilder>,
21 components: IntMap<ComponentDescriptor, Vec<Option<ArrayRef>>>,
22}
23
24impl Chunk {
25 #[inline]
27 pub fn builder(entity_path: impl Into<EntityPath>) -> ChunkBuilder {
28 ChunkBuilder::new(ChunkId::new(), entity_path.into())
29 }
30
31 #[inline]
35 pub fn builder_with_id(id: ChunkId, entity_path: impl Into<EntityPath>) -> ChunkBuilder {
36 ChunkBuilder::new(id, entity_path.into())
37 }
38}
39
40impl ChunkBuilder {
41 #[inline]
45 pub fn new(id: ChunkId, entity_path: EntityPath) -> Self {
46 Self {
47 id,
48 entity_path,
49
50 row_ids: Vec::new(),
51 timelines: IntMap::default(),
52 components: IntMap::default(),
53 }
54 }
55
56 pub fn with_sparse_row(
58 mut self,
59 row_id: RowId,
60 timepoint: impl Into<TimePoint>,
61 components: impl IntoIterator<Item = (ComponentDescriptor, Option<ArrayRef>)>,
62 ) -> Self {
63 let components = components.into_iter().collect_vec();
64
65 for (component_desc, _) in &components {
67 let arrays = self.components.entry(component_desc.clone()).or_default();
68 arrays.extend(std::iter::repeat_n(
69 None,
70 self.row_ids.len().saturating_sub(arrays.len()),
71 ));
72 }
73
74 self.row_ids.push(row_id);
75
76 for (timeline, cell) in timepoint.into() {
77 self.timelines
78 .entry(timeline)
79 .or_insert_with(|| TimeColumn::builder(Timeline::new(timeline, cell.typ())))
80 .with_row(cell.value);
81 }
82
83 for (component_descr, array) in components {
84 self.components
85 .entry(component_descr)
86 .or_default()
87 .push(array);
88 }
89
90 for arrays in self.components.values_mut() {
92 arrays.extend(std::iter::repeat_n(
93 None,
94 self.row_ids.len().saturating_sub(arrays.len()),
95 ));
96 }
97
98 self
99 }
100
101 #[inline]
103 pub fn with_row(
104 self,
105 row_id: RowId,
106 timepoint: impl Into<TimePoint>,
107 components: impl IntoIterator<Item = (ComponentDescriptor, ArrayRef)>,
108 ) -> Self {
109 self.with_sparse_row(
110 row_id,
111 timepoint,
112 components
113 .into_iter()
114 .map(|(component_descr, array)| (component_descr, Some(array))),
115 )
116 }
117
118 #[inline]
120 pub fn with_archetype(
121 self,
122 row_id: RowId,
123 timepoint: impl Into<TimePoint>,
124 as_components: &dyn AsComponents,
125 ) -> Self {
126 let batches = as_components.as_serialized_batches();
127 self.with_serialized_batches(row_id, timepoint, batches)
128 }
129
130 #[inline]
132 pub fn with_archetype_auto_row(
133 self,
134 timepoint: impl Into<TimePoint>,
135 as_components: &dyn AsComponents,
136 ) -> Self {
137 let batches = as_components.as_serialized_batches();
138 self.with_serialized_batches(RowId::new(), timepoint, batches)
139 }
140
141 pub fn with_component<Component: re_types_core::Component>(
143 self,
144 row_id: RowId,
145 timepoint: impl Into<TimePoint>,
146 component_descr: re_types_core::ComponentDescriptor,
147 value: &Component,
148 ) -> re_types_core::SerializationResult<Self> {
149 debug_assert_eq!(component_descr.component_type, Some(Component::name()));
150 Ok(self.with_serialized_batches(
151 row_id,
152 timepoint,
153 vec![re_types_core::SerializedComponentBatch {
154 descriptor: component_descr,
155 array: Component::to_arrow([std::borrow::Cow::Borrowed(value)])?,
156 }],
157 ))
158 }
159
160 #[inline]
162 pub fn with_component_batch(
163 self,
164 row_id: RowId,
165 timepoint: impl Into<TimePoint>,
166 component_batch: (ComponentDescriptor, &dyn ComponentBatch),
167 ) -> Self {
168 self.with_row(
169 row_id,
170 timepoint,
171 component_batch
172 .1
173 .to_arrow()
174 .ok()
175 .map(|array| (component_batch.0, array)),
176 )
177 }
178
179 #[inline]
181 pub fn with_component_batches<'a>(
182 self,
183 row_id: RowId,
184 timepoint: impl Into<TimePoint>,
185 component_batches: impl IntoIterator<Item = (ComponentDescriptor, &'a dyn ComponentBatch)>,
186 ) -> Self {
187 self.with_row(
188 row_id,
189 timepoint,
190 component_batches
191 .into_iter()
192 .filter_map(|(component_descr, component_batch)| {
193 component_batch
194 .to_arrow()
195 .ok()
196 .map(|array| (component_descr, array))
197 }),
198 )
199 }
200
201 #[inline]
203 pub fn with_sparse_component_batches<'a>(
204 self,
205 row_id: RowId,
206 timepoint: impl Into<TimePoint>,
207 component_batches: impl IntoIterator<
208 Item = (ComponentDescriptor, Option<&'a dyn ComponentBatch>),
209 >,
210 ) -> Self {
211 self.with_sparse_row(
212 row_id,
213 timepoint,
214 component_batches
215 .into_iter()
216 .map(|(component_desc, component_batch)| {
217 (
218 component_desc,
219 component_batch.and_then(|batch| batch.to_arrow().ok()),
220 )
221 }),
222 )
223 }
224
225 #[inline]
227 pub fn with_serialized_batch(
228 self,
229 row_id: RowId,
230 timepoint: impl Into<TimePoint>,
231 component_batch: SerializedComponentBatch,
232 ) -> Self {
233 self.with_row(
234 row_id,
235 timepoint,
236 [(component_batch.descriptor, component_batch.array)],
237 )
238 }
239
240 #[inline]
242 pub fn with_serialized_batches(
243 self,
244 row_id: RowId,
245 timepoint: impl Into<TimePoint>,
246 component_batches: impl IntoIterator<Item = SerializedComponentBatch>,
247 ) -> Self {
248 self.with_row(
249 row_id,
250 timepoint,
251 component_batches
252 .into_iter()
253 .map(|component_batch| (component_batch.descriptor, component_batch.array)),
254 )
255 }
256
257 #[inline]
259 pub fn with_sparse_serialized_batches(
260 self,
261 row_id: RowId,
262 timepoint: impl Into<TimePoint>,
263 component_batches: impl IntoIterator<
264 Item = (ComponentDescriptor, Option<SerializedComponentBatch>),
265 >,
266 ) -> Self {
267 self.with_sparse_row(
268 row_id,
269 timepoint,
270 component_batches
271 .into_iter()
272 .map(|(component_desc, component_batch)| {
273 (component_desc, component_batch.map(|batch| batch.array))
274 }),
275 )
276 }
277
278 #[inline]
291 pub fn build(self) -> ChunkResult<Chunk> {
292 re_tracing::profile_function!();
293 let Self {
294 id,
295 entity_path,
296 row_ids,
297 timelines,
298 components,
299 } = self;
300
301 let timelines = {
302 re_tracing::profile_scope!("timelines");
303 timelines
304 .into_iter()
305 .map(|(timeline, time_column)| (timeline, time_column.build()))
306 .collect()
307 };
308
309 let components = {
310 re_tracing::profile_scope!("components");
311 components
312 .into_iter()
313 .filter_map(|(component_desc, arrays)| {
314 let arrays = arrays.iter().map(|array| array.as_deref()).collect_vec();
315 re_arrow_util::arrays_to_list_array_opt(&arrays)
316 .map(|list_array| (component_desc, list_array))
317 })
318 .collect()
319 };
320
321 Chunk::from_native_row_ids(id, entity_path, None, &row_ids, timelines, components)
322 }
323
324 #[inline]
340 pub fn build_with_datatypes(
341 self,
342 datatypes: &IntMap<ComponentDescriptor, ArrowDatatype>,
343 ) -> ChunkResult<Chunk> {
344 let Self {
345 id,
346 entity_path,
347 row_ids,
348 timelines,
349 components,
350 } = self;
351
352 Chunk::from_native_row_ids(
353 id,
354 entity_path,
355 None,
356 &row_ids,
357 timelines
358 .into_iter()
359 .map(|(timeline, time_column)| (timeline, time_column.build()))
360 .collect(),
361 {
362 components
363 .into_iter()
364 .filter_map(|(component_desc, arrays)| {
365 let arrays = arrays.iter().map(|array| array.as_deref()).collect_vec();
366 if let Some(datatype) = datatypes.get(&component_desc) {
369 re_arrow_util::arrays_to_list_array(datatype.clone(), &arrays)
370 .map(|list_array| (component_desc, list_array))
371 } else {
372 re_arrow_util::arrays_to_list_array_opt(&arrays)
373 .map(|list_array| (component_desc, list_array))
374 }
375 })
376 .collect()
377 },
378 )
379 }
380}
381
382pub struct TimeColumnBuilder {
388 timeline: Timeline,
389
390 times: Vec<i64>,
391}
392
393impl TimeColumn {
394 #[inline]
396 pub fn builder(timeline: Timeline) -> TimeColumnBuilder {
397 TimeColumnBuilder::new(timeline)
398 }
399}
400
401impl TimeColumnBuilder {
402 #[inline]
406 pub fn new(timeline: Timeline) -> Self {
407 Self {
408 timeline,
409 times: Vec::new(),
410 }
411 }
412
413 #[inline]
415 pub fn with_row(&mut self, time: NonMinI64) -> &mut Self {
416 self.times.push(time.into());
417 self
418 }
419
420 #[inline]
422 pub fn build(self) -> TimeColumn {
423 let Self { timeline, times } = self;
424 TimeColumn::new(None, timeline, times.into())
425 }
426}