1use std::sync::Arc;
5
6use futures::future::try_join_all;
7use termtree::Tree;
8use vortex_array::serde::SerializedArray;
9use vortex_error::VortexResult;
10use vortex_utils::aliases::hash_map::HashMap;
11
12use crate::LayoutRef;
13use crate::layouts::flat::Flat;
14use crate::layouts::flat::FlatLayout;
15use crate::segments::SegmentId;
16use crate::segments::SegmentSource;
17
18pub(super) async fn display_tree_with_segment_sizes(
25 layout: LayoutRef,
26 segment_source: Arc<dyn SegmentSource>,
27) -> VortexResult<DisplayLayoutTree> {
28 let mut segments_to_fetch = Vec::new();
30 collect_segments_to_fetch(&layout, &mut segments_to_fetch)?;
31 segments_to_fetch.dedup();
32
33 let fetch_futures = segments_to_fetch.iter().map(|&segment_id| {
35 let segment_source = Arc::clone(&segment_source);
36 async move {
37 let buffer = segment_source.request(segment_id).await?;
38 let parts = SerializedArray::try_from(buffer)?;
39 VortexResult::Ok((segment_id, parts.buffer_lengths()))
40 }
41 });
42 let results = try_join_all(fetch_futures).await?;
43 let segment_buffer_sizes: HashMap<SegmentId, Vec<usize>> = results.into_iter().collect();
44
45 Ok(DisplayLayoutTree {
46 layout,
47 segment_buffer_sizes: Some(segment_buffer_sizes),
48 verbose: true,
49 })
50}
51
52fn collect_segments_to_fetch(
54 layout: &LayoutRef,
55 segment_ids: &mut Vec<SegmentId>,
56) -> VortexResult<()> {
57 if let Some(flat_layout) = layout.as_opt::<Flat>() {
59 if flat_layout.array_tree().is_none() {
60 segment_ids.push(flat_layout.segment_id());
61 }
62 } else {
63 segment_ids.extend(layout.segment_ids());
65 }
66
67 for child in layout.children()? {
69 collect_segments_to_fetch(&child, segment_ids)?;
70 }
71 Ok(())
72}
73
74fn format_flat_layout_buffers(
76 flat_layout: &FlatLayout,
77 segment_buffer_sizes: Option<&HashMap<SegmentId, Vec<usize>>>,
78) -> String {
79 let segment_id = flat_layout.segment_id();
80
81 if let Some(array_tree) = flat_layout.array_tree()
83 && let Ok(parts) = SerializedArray::from_array_tree(array_tree.as_ref().to_vec())
84 {
85 return format_buffer_sizes(&parts.buffer_lengths(), *segment_id);
86 }
87
88 if let Some(sizes_map) = segment_buffer_sizes
90 && let Some(buffer_sizes) = sizes_map.get(&segment_id)
91 {
92 return format_buffer_sizes(buffer_sizes, *segment_id);
93 }
94
95 format!("segment: {}", *segment_id)
97}
98
/// Formats a segment id together with its buffer sizes and their sum.
///
/// Produces `segment <id>, buffers=[<a>B, <b>B, ...], total=<sum>B`; an empty slice yields
/// `buffers=[]` and `total=0B`.
fn format_buffer_sizes(buffer_sizes: &[usize], segment_id: u32) -> String {
    let mut listing = String::new();
    let mut total_bytes: usize = 0;

    for (position, size) in buffer_sizes.iter().enumerate() {
        if position > 0 {
            listing.push_str(", ");
        }
        listing.push_str(&size.to_string());
        listing.push('B');
        total_bytes += *size;
    }

    format!("segment {segment_id}, buffers=[{listing}], total={total_bytes}B")
}
109
/// Renders a layout and its descendants as a text tree through its [`std::fmt::Display`] impl.
pub struct DisplayLayoutTree {
    /// Root layout of the tree to render.
    layout: LayoutRef,
    /// Buffer lengths keyed by segment id, used to annotate flat layouts.
    ///
    /// `None` when constructed through [`DisplayLayoutTree::new`]; populated by
    /// `display_tree_with_segment_sizes`.
    segment_buffer_sizes: Option<HashMap<SegmentId, Vec<usize>>>,
    /// When `true`, nodes also show the metadata size, the row count and, for non-flat
    /// layouts, their segment ids.
    verbose: bool,
}
116
117impl DisplayLayoutTree {
118 pub fn new(layout: LayoutRef, verbose: bool) -> Self {
120 Self {
121 layout,
122 segment_buffer_sizes: None,
123 verbose,
124 }
125 }
126
127 fn make_tree(&self, layout: LayoutRef) -> VortexResult<Tree<String>> {
128 let mut node_parts = vec![
130 format!("{}", layout.encoding()),
131 format!("dtype: {}", layout.dtype()),
132 ];
133
134 let nchildren = layout.nchildren();
136 if nchildren > 0 {
137 node_parts.push(format!("children: {}", nchildren));
138 }
139
140 if self.verbose {
142 let metadata = layout.metadata();
143 if !metadata.is_empty() {
144 node_parts.push(format!("metadata: {} bytes", metadata.len()));
145 }
146 node_parts.push(format!("rows: {}", layout.row_count()));
147 }
148
149 if let Some(flat_layout) = layout.as_opt::<Flat>() {
151 node_parts.push(format_flat_layout_buffers(
152 flat_layout,
153 self.segment_buffer_sizes.as_ref(),
154 ));
155 } else {
156 if self.verbose {
158 let segment_ids = layout.segment_ids();
159 if !segment_ids.is_empty() {
160 node_parts.push(format!(
161 "segments: [{}]",
162 segment_ids
163 .iter()
164 .map(|s| format!("{}", **s))
165 .collect::<Vec<_>>()
166 .join(", ")
167 ));
168 }
169 }
170 }
171
172 let node_name = node_parts.join(", ");
173
174 let children = layout.children()?;
176 let child_names: Vec<_> = layout.child_names().collect();
177
178 let child_trees: VortexResult<Vec<Tree<String>>> =
180 if !children.is_empty() && child_names.len() == children.len() {
181 children
183 .into_iter()
184 .zip(child_names.iter())
185 .map(|(child, name)| {
186 let child_tree = self.make_tree(child)?;
187 Ok(Tree::new(format!("{}: {}", name, child_tree.root))
188 .with_leaves(child_tree.leaves))
189 })
190 .collect()
191 } else if !children.is_empty() {
192 children.into_iter().map(|c| self.make_tree(c)).collect()
194 } else {
195 Ok(Vec::new())
197 };
198
199 Ok(Tree::new(node_name).with_leaves(child_trees?))
200 }
201}
202
203impl std::fmt::Display for DisplayLayoutTree {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 match self.make_tree(Arc::clone(&self.layout)) {
206 Ok(tree) => write!(f, "{}", tree),
207 Err(e) => write!(f, "Error building layout tree: {}", e),
208 }
209 }
210}
211
// Tests for the layout display tree.
//
// Every test body is guarded by `NEXTEST_RUN_ID` and is a no-op when that variable is unset.
// The tests override `FLAT_LAYOUT_INLINE_ARRAY_NODE` through `temp_env`; presumably the guard
// exists because that mutates process-wide state — TODO confirm.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldName;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::Nullability::NonNullable;
    use vortex_array::dtype::PType;
    use vortex_array::dtype::StructFields;
    use vortex_array::serde::SerializedArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBufferMut;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;

    use crate::IntoLayout;
    use crate::OwnedLayoutChildren;
    use crate::layouts::chunked::ChunkedLayout;
    use crate::layouts::flat::Flat;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::struct_::StructLayout;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::strategy::LayoutStrategy;
    use crate::test::SESSION;

    /// Verbose display of a struct layout whose flat leaves are written with
    /// `FLAT_LAYOUT_INLINE_ARRAY_NODE=1`; buffer sizes are expected without a segment source.
    #[test]
    fn test_display_tree_inline_array_tree() {
        if std::env::var("NEXTEST_RUN_ID").is_ok() {
            temp_env::with_var("FLAT_LAYOUT_INLINE_ARRAY_NODE", Some("1"), || {
                block_on(|handle| async move {
                    let session = SESSION.clone().with_handle(handle);
                    let ctx = ArrayContext::empty();
                    let segments = Arc::new(TestSegments::default());

                    // First column: five i64 values with an explicit validity array.
                    let (ptr1, eof1) = SequenceId::root().split();
                    let mut validity_builder = BitBufferMut::with_capacity(5);
                    for b in [true, false, true, true, false] {
                        validity_builder.append(b);
                    }
                    let validity = Validity::Array(
                        BoolArray::new(validity_builder.freeze(), Validity::NonNullable)
                            .into_array(),
                    );
                    let array1 = PrimitiveArray::new(buffer![1i64, 2, 3, 4, 5], validity);
                    let layout1 = FlatLayoutStrategy::default()
                        .write_stream(
                            ctx.clone(),
                            Arc::<TestSegments>::clone(&segments),
                            array1.into_array().to_array_stream().sequenced(ptr1),
                            eof1,
                            &session,
                        )
                        .await
                        .unwrap();

                    // Second column: five non-nullable utf8 strings of varying length.
                    let (ptr2, eof2) = SequenceId::root().split();
                    let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(NonNullable), 5);
                    for s in [
                        "hello world this is long",
                        "another long string",
                        "short",
                        "medium str",
                        "x",
                    ] {
                        builder.append_value(s);
                    }
                    let layout2 = FlatLayoutStrategy::default()
                        .write_stream(
                            ctx.clone(),
                            Arc::<TestSegments>::clone(&segments),
                            builder
                                .finish()
                                .into_array()
                                .to_array_stream()
                                .sequenced(ptr2),
                            eof2,
                            &session,
                        )
                        .await
                        .unwrap();

                    // Struct of { numbers: chunked(flat), strings: flat } with 5 rows.
                    let struct_layout = StructLayout::new(
                        5,
                        DType::Struct(
                            StructFields::new(
                                vec![FieldName::from("numbers"), FieldName::from("strings")].into(),
                                vec![
                                    DType::Primitive(PType::I64, Nullability::Nullable),
                                    DType::Utf8(NonNullable),
                                ],
                            ),
                            NonNullable,
                        ),
                        vec![
                            ChunkedLayout::new(
                                5,
                                DType::Primitive(PType::I64, Nullability::Nullable),
                                OwnedLayoutChildren::layout_children(vec![layout1]),
                            )
                            .into_layout(),
                            layout2,
                        ],
                    )
                    .into_layout();

                    let output = format!("{}", struct_layout.display_tree_verbose(true));

                    let expected = "\
vortex.struct, dtype: {numbers=i64?, strings=utf8}, children: 2, rows: 5
├── numbers: vortex.chunked, dtype: i64?, children: 1, rows: 5
│ └── [0]: vortex.flat, dtype: i64?, metadata: 171 bytes, rows: 5, segment 0, buffers=[40B, 1B], total=41B
└── strings: vortex.flat, dtype: utf8, metadata: 110 bytes, rows: 5, segment 1, buffers=[43B, 80B], total=123B
";
                    assert_eq!(output, expected);
                })
            })
        }
    }

    /// With `FLAT_LAYOUT_INLINE_ARRAY_NODE` unset, buffer sizes are obtained through
    /// `display_tree_with_segments` using the test segment store.
    #[test]
    fn test_display_tree_with_segment_source() {
        if std::env::var("NEXTEST_RUN_ID").is_ok() {
            temp_env::with_var("FLAT_LAYOUT_INLINE_ARRAY_NODE", None::<&str>, || {
                block_on(|handle| async move {
                    let session = SESSION.clone().with_handle(handle);
                    let ctx = ArrayContext::empty();
                    let segments = Arc::new(TestSegments::default());

                    // Two flat chunks of five non-nullable i32 values each.
                    let (ptr1, eof1) = SequenceId::root().split();
                    let array1 =
                        PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::NonNullable);
                    let layout1 = FlatLayoutStrategy::default()
                        .write_stream(
                            ctx.clone(),
                            Arc::<TestSegments>::clone(&segments),
                            array1.into_array().to_array_stream().sequenced(ptr1),
                            eof1,
                            &session,
                        )
                        .await
                        .unwrap();

                    let (ptr2, eof2) = SequenceId::root().split();
                    let array2 =
                        PrimitiveArray::new(buffer![6i32, 7, 8, 9, 10], Validity::NonNullable);
                    let layout2 = FlatLayoutStrategy::default()
                        .write_stream(
                            ctx.clone(),
                            Arc::<TestSegments>::clone(&segments),
                            array2.into_array().to_array_stream().sequenced(ptr2),
                            eof2,
                            &session,
                        )
                        .await
                        .unwrap();

                    let chunked_layout = ChunkedLayout::new(
                        10,
                        DType::Primitive(PType::I32, NonNullable),
                        OwnedLayoutChildren::layout_children(vec![layout1, layout2]),
                    )
                    .into_layout();

                    // The segment store written to above doubles as the segment source.
                    let output = chunked_layout
                        .display_tree_with_segments(segments)
                        .await
                        .unwrap();

                    let expected = "\
vortex.chunked, dtype: i32, children: 2, rows: 10
├── [0]: vortex.flat, dtype: i32, rows: 5, segment 0, buffers=[20B], total=20B
└── [1]: vortex.flat, dtype: i32, rows: 5, segment 1, buffers=[20B], total=20B
";
                    assert_eq!(output.to_string(), expected);
                })
            })
        }
    }

    /// With `FLAT_LAYOUT_INLINE_ARRAY_NODE=1` the flat layout exposes an array tree that can
    /// be parsed for buffer lengths, and the non-verbose display reports those sizes.
    #[test]
    fn test_display_array_tree_with_inline_node() {
        if std::env::var("NEXTEST_RUN_ID").is_ok() {
            temp_env::with_var("FLAT_LAYOUT_INLINE_ARRAY_NODE", Some("1"), || {
                let ctx = ArrayContext::empty();
                let segments = Arc::new(TestSegments::default());
                let (ptr, eof) = SequenceId::root().split();

                let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::AllValid);
                let layout = block_on(|handle| async {
                    let session = SESSION.clone().with_handle(handle);
                    FlatLayoutStrategy::default()
                        .write_stream(
                            ctx.clone(),
                            Arc::<TestSegments>::clone(&segments),
                            array.into_array().to_array_stream().sequenced(ptr),
                            eof,
                            &session,
                        )
                        .await
                        .unwrap()
                });

                let flat_layout = layout.as_::<Flat>();

                let array_tree = flat_layout.array_tree().expect(
                    "array_tree should be populated when FLAT_LAYOUT_INLINE_ARRAY_NODE is set",
                );

                // Five i32 values are expected to occupy a single 20-byte buffer.
                let parts = SerializedArray::from_array_tree(array_tree.as_ref().to_vec())
                    .expect("should parse array_tree");
                assert_eq!(parts.buffer_lengths(), vec![20]);
                assert_eq!(
                    layout.display_tree().to_string(),
                    "\
vortex.flat, dtype: i32?, segment 0, buffers=[20B], total=20B
"
                );
            })
        }
    }

    /// Non-verbose display of a single flat i64 layout.
    ///
    /// NOTE(review): the name says "without inline node", yet the env var is set to `"1"` and
    /// the expected output contains buffer sizes, so this looks like it exercises the inline
    /// path rather than the `segment: <id>` fallback — confirm the intended setup.
    #[test]
    fn test_display_tree_without_inline_node() {
        if std::env::var("NEXTEST_RUN_ID").is_ok() {
            temp_env::with_var("FLAT_LAYOUT_INLINE_ARRAY_NODE", Some("1"), || {
                let ctx = ArrayContext::empty();
                let segments = Arc::new(TestSegments::default());
                let (ptr, eof) = SequenceId::root().split();

                let array = PrimitiveArray::new(buffer![10i64, 20, 30], Validity::NonNullable);
                let layout = block_on(|handle| async {
                    let session = SESSION.clone().with_handle(handle);
                    FlatLayoutStrategy::default()
                        .write_stream(
                            ctx,
                            Arc::<TestSegments>::clone(&segments),
                            array.into_array().to_array_stream().sequenced(ptr),
                            eof,
                            &session,
                        )
                        .await
                        .unwrap()
                });

                assert_eq!(
                    layout.display_tree().to_string(),
                    "\
vortex.flat, dtype: i64, segment 0, buffers=[24B], total=24B
"
                );
            })
        }
    }
}