1use std::sync::Arc;
2
3use arrow::array::Array as _;
4use arrow::array::ArrayRef as ArrowArrayRef;
5
6use re_log_types::{TimeInt, TimelineName};
7use re_types_core::{Component, ComponentDescriptor};
8
9use crate::{Chunk, ChunkResult, RowId};
10
11impl Chunk {
14 #[inline]
20 pub fn component_batch_raw(
21 &self,
22 component_descr: &ComponentDescriptor,
23 row_index: usize,
24 ) -> Option<ChunkResult<ArrowArrayRef>> {
25 let list_array = self.components.get(component_descr)?;
26 if list_array.len() > row_index {
27 list_array
28 .is_valid(row_index)
29 .then(|| Ok(list_array.value(row_index)))
30 } else {
31 Some(Err(crate::ChunkError::IndexOutOfBounds {
32 kind: "row".to_owned(),
33 len: list_array.len(),
34 index: row_index,
35 }))
36 }
37 }
38
39 #[inline]
43 pub fn component_batch<C: Component>(
44 &self,
45 component_descr: &ComponentDescriptor,
46 row_index: usize,
47 ) -> Option<ChunkResult<Vec<C>>> {
48 let res = self.component_batch_raw(component_descr, row_index)?;
49
50 let array = match res {
51 Ok(array) => array,
52 Err(err) => return Some(Err(err)),
53 };
54
55 let data = C::from_arrow(&*array);
56 Some(data.map_err(Into::into))
57 }
58
59 #[inline]
65 pub fn component_instance_raw(
66 &self,
67 component_descr: &ComponentDescriptor,
68 row_index: usize,
69 instance_index: usize,
70 ) -> Option<ChunkResult<ArrowArrayRef>> {
71 let res = self.component_batch_raw(component_descr, row_index)?;
72
73 let array = match res {
74 Ok(array) => array,
75 Err(err) => return Some(Err(err)),
76 };
77
78 if array.len() > instance_index {
79 Some(Ok(array.slice(instance_index, 1)))
80 } else {
81 Some(Err(crate::ChunkError::IndexOutOfBounds {
82 kind: "instance".to_owned(),
83 len: array.len(),
84 index: instance_index,
85 }))
86 }
87 }
88
89 #[inline]
94 pub fn component_instance<C: Component>(
95 &self,
96 component_descr: &ComponentDescriptor,
97 row_index: usize,
98 instance_index: usize,
99 ) -> Option<ChunkResult<C>> {
100 let res = self.component_instance_raw(component_descr, row_index, instance_index)?;
101
102 let array = match res {
103 Ok(array) => array,
104 Err(err) => return Some(Err(err)),
105 };
106
107 match C::from_arrow(&*array) {
108 Ok(data) => data.into_iter().next().map(Ok), Err(err) => Some(Err(err.into())),
110 }
111 }
112
113 #[inline]
120 pub fn component_mono_raw(
121 &self,
122 component_descr: &ComponentDescriptor,
123 row_index: usize,
124 ) -> Option<ChunkResult<ArrowArrayRef>> {
125 let res = self.component_batch_raw(component_descr, row_index)?;
126
127 let array = match res {
128 Ok(array) => array,
129 Err(err) => return Some(Err(err)),
130 };
131
132 if array.len() == 1 {
133 Some(Ok(array.slice(0, 1)))
134 } else {
135 Some(Err(crate::ChunkError::IndexOutOfBounds {
136 kind: "mono".to_owned(),
137 len: array.len(),
138 index: 0,
139 }))
140 }
141 }
142
143 #[inline]
148 pub fn component_mono<C: Component>(
149 &self,
150 component_descr: &ComponentDescriptor,
151 row_index: usize,
152 ) -> Option<ChunkResult<C>> {
153 let res = self.component_mono_raw(component_descr, row_index)?;
154
155 let array = match res {
156 Ok(array) => array,
157 Err(err) => return Some(Err(err)),
158 };
159
160 match C::from_arrow(&*array) {
161 Ok(data) => data.into_iter().next().map(Ok), Err(err) => Some(Err(err.into())),
163 }
164 }
165}
166
167pub type ChunkShared = Arc<Chunk>;
171
172#[derive(Debug, Clone)]
174pub struct UnitChunkShared(ChunkShared);
175
176impl std::ops::Deref for UnitChunkShared {
177 type Target = Chunk;
178
179 #[inline]
180 fn deref(&self) -> &Self::Target {
181 &self.0
182 }
183}
184
185impl re_byte_size::SizeBytes for UnitChunkShared {
186 #[inline]
187 fn heap_size_bytes(&self) -> u64 {
188 Chunk::heap_size_bytes(&self.0)
189 }
190}
191
192impl Chunk {
193 #[inline]
195 pub fn to_unit(self: &ChunkShared) -> Option<UnitChunkShared> {
196 (self.num_rows() == 1).then(|| UnitChunkShared(Arc::clone(self)))
197 }
198
199 #[inline]
201 pub fn into_unit(self) -> Option<UnitChunkShared> {
202 (self.num_rows() == 1).then(|| UnitChunkShared(Arc::new(self)))
203 }
204}
205
206impl UnitChunkShared {
207 #[inline]
209 pub fn into_chunk(self) -> ChunkShared {
210 self.0
211 }
212}
213
214impl UnitChunkShared {
215 #[inline]
219 pub fn index(&self, timeline: &TimelineName) -> Option<(TimeInt, RowId)> {
220 debug_assert!(self.num_rows() == 1);
221 if self.is_static() {
222 self.row_ids()
223 .next()
224 .map(|row_id| (TimeInt::STATIC, row_id))
225 } else {
226 let time_column = self.timelines.get(timeline)?;
227 let time = time_column.times().next()?;
228 self.row_ids().next().map(|row_id| (time, row_id))
229 }
230 }
231
232 #[inline]
236 pub fn row_id(&self) -> Option<RowId> {
237 debug_assert!(self.num_rows() == 1);
238 self.row_ids().next()
239 }
240
241 #[inline]
243 pub fn num_instances(&self, component_descr: &ComponentDescriptor) -> u64 {
244 debug_assert!(self.num_rows() == 1);
245 self.components
246 .iter()
247 .filter(|&(descr, _list_array)| descr == component_descr)
248 .map(|(_descr, list_array)| {
249 let array = list_array.value(0);
250 array.nulls().map_or_else(
251 || array.len(),
252 |validity| validity.len() - validity.null_count(),
253 )
254 })
255 .max()
256 .unwrap_or(0) as u64
257 }
258}
259
260impl UnitChunkShared {
263 #[inline]
267 pub fn component_batch_raw(
268 &self,
269 component_descr: &ComponentDescriptor,
270 ) -> Option<ArrowArrayRef> {
271 debug_assert!(self.num_rows() == 1);
272 let list_array = self.components.get(component_descr)?;
273 list_array.is_valid(0).then(|| list_array.value(0))
274 }
275
276 #[inline]
281 pub fn component_batch<C: Component>(
282 &self,
283 component_descr: &ComponentDescriptor,
284 ) -> Option<ChunkResult<Vec<C>>> {
285 debug_assert_eq!(Some(C::name()), component_descr.component_type);
286 let data = C::from_arrow(&*self.component_batch_raw(component_descr)?);
287 Some(data.map_err(Into::into))
288 }
289
290 #[inline]
296 pub fn component_instance_raw(
297 &self,
298 component_descr: &ComponentDescriptor,
299 instance_index: usize,
300 ) -> Option<ChunkResult<ArrowArrayRef>> {
301 let array = self.component_batch_raw(component_descr)?;
302 if array.len() > instance_index {
303 Some(Ok(array.slice(instance_index, 1)))
304 } else {
305 Some(Err(crate::ChunkError::IndexOutOfBounds {
306 kind: "instance".to_owned(),
307 len: array.len(),
308 index: instance_index,
309 }))
310 }
311 }
312
313 #[inline]
318 pub fn component_instance<C: Component>(
319 &self,
320 component_descr: &ComponentDescriptor,
321 instance_index: usize,
322 ) -> Option<ChunkResult<C>> {
323 debug_assert_eq!(Some(C::name()), component_descr.component_type);
324 let res = self.component_instance_raw(component_descr, instance_index)?;
325
326 let array = match res {
327 Ok(array) => array,
328 Err(err) => return Some(Err(err)),
329 };
330
331 match C::from_arrow(&*array) {
332 Ok(data) => data.into_iter().next().map(Ok), Err(err) => Some(Err(err.into())),
334 }
335 }
336
337 #[inline]
343 pub fn component_mono_raw(
344 &self,
345 component_descr: &ComponentDescriptor,
346 ) -> Option<ChunkResult<ArrowArrayRef>> {
347 let array = self.component_batch_raw(component_descr)?;
348 if array.len() == 1 {
349 Some(Ok(array.slice(0, 1)))
350 } else {
351 Some(Err(crate::ChunkError::IndexOutOfBounds {
352 kind: "mono".to_owned(),
353 len: array.len(),
354 index: 0,
355 }))
356 }
357 }
358
359 #[inline]
364 pub fn component_mono<C: Component>(
365 &self,
366 component_descr: &ComponentDescriptor,
367 ) -> Option<ChunkResult<C>> {
368 debug_assert_eq!(Some(C::name()), component_descr.component_type);
369 let res = self.component_mono_raw(component_descr)?;
370
371 let array = match res {
372 Ok(array) => array,
373 Err(err) => return Some(Err(err)),
374 };
375
376 match C::from_arrow(&*array) {
377 Ok(data) => data.into_iter().next().map(Ok), Err(err) => Some(Err(err.into())),
379 }
380 }
381}