1use std::sync::Arc;
5
6use futures::future::try_join_all;
7use termtree::Tree;
8use vortex_array::serde::SerializedArray;
9use vortex_error::VortexResult;
10use vortex_utils::aliases::hash_map::HashMap;
11
12use crate::LayoutRef;
13use crate::layouts::flat::Flat;
14use crate::layouts::flat::FlatLayout;
15use crate::segments::SegmentId;
16use crate::segments::SegmentSource;
17
18pub(super) async fn display_tree_with_segment_sizes(
25 layout: LayoutRef,
26 segment_source: Arc<dyn SegmentSource>,
27) -> VortexResult<DisplayLayoutTree> {
28 let mut segments_to_fetch = Vec::new();
30 collect_segments_to_fetch(&layout, &mut segments_to_fetch)?;
31 segments_to_fetch.dedup();
32
33 let fetch_futures = segments_to_fetch.iter().map(|&segment_id| {
35 let segment_source = Arc::clone(&segment_source);
36 async move {
37 let buffer = segment_source.request(segment_id).await?;
38 let parts = SerializedArray::try_from(buffer)?;
39 VortexResult::Ok((segment_id, parts.buffer_lengths()))
40 }
41 });
42 let results = try_join_all(fetch_futures).await?;
43 let segment_buffer_sizes: HashMap<SegmentId, Vec<usize>> = results.into_iter().collect();
44
45 Ok(DisplayLayoutTree {
46 layout,
47 segment_buffer_sizes: Some(segment_buffer_sizes),
48 verbose: true,
49 })
50}
51
52fn collect_segments_to_fetch(
54 layout: &LayoutRef,
55 segment_ids: &mut Vec<SegmentId>,
56) -> VortexResult<()> {
57 if let Some(flat_layout) = layout.as_opt::<Flat>() {
59 if flat_layout.array_tree().is_none() {
60 segment_ids.push(flat_layout.segment_id());
61 }
62 } else {
63 segment_ids.extend(layout.segment_ids());
65 }
66
67 for child in layout.children()? {
69 collect_segments_to_fetch(&child, segment_ids)?;
70 }
71 Ok(())
72}
73
74fn format_flat_layout_buffers(
76 flat_layout: &FlatLayout,
77 segment_buffer_sizes: Option<&HashMap<SegmentId, Vec<usize>>>,
78) -> String {
79 let segment_id = flat_layout.segment_id();
80
81 if let Some(array_tree) = flat_layout.array_tree()
83 && let Ok(parts) = SerializedArray::from_array_tree(array_tree.as_ref().to_vec())
84 {
85 return format_buffer_sizes(&parts.buffer_lengths(), *segment_id);
86 }
87
88 if let Some(sizes_map) = segment_buffer_sizes
90 && let Some(buffer_sizes) = sizes_map.get(&segment_id)
91 {
92 return format_buffer_sizes(buffer_sizes, *segment_id);
93 }
94
95 format!("segment: {}", *segment_id)
97}
98
/// Formats a segment's buffer byte lengths as
/// `segment <id>, buffers=[<a>B, <b>B, ...], total=<sum>B`.
fn format_buffer_sizes(buffer_sizes: &[usize], segment_id: u32) -> String {
    let total = buffer_sizes.iter().sum::<usize>();
    let listed = buffer_sizes
        .iter()
        .map(|size| format!("{size}B"))
        .collect::<Vec<_>>()
        .join(", ");
    format!("segment {segment_id}, buffers=[{listed}], total={total}B")
}
109
110pub struct DisplayLayoutTree {
112 layout: LayoutRef,
113 segment_buffer_sizes: Option<HashMap<SegmentId, Vec<usize>>>,
114 verbose: bool,
115}
116
117impl DisplayLayoutTree {
118 pub fn new(layout: LayoutRef, verbose: bool) -> Self {
120 Self {
121 layout,
122 segment_buffer_sizes: None,
123 verbose,
124 }
125 }
126
127 fn make_tree(&self, layout: LayoutRef) -> VortexResult<Tree<String>> {
128 let mut node_parts = vec![
130 format!("{}", layout.encoding()),
131 format!("dtype: {}", layout.dtype()),
132 ];
133
134 let nchildren = layout.nchildren();
136 if nchildren > 0 {
137 node_parts.push(format!("children: {}", nchildren));
138 }
139
140 if self.verbose {
142 let metadata = layout.metadata();
143 if !metadata.is_empty() {
144 node_parts.push(format!("metadata: {} bytes", metadata.len()));
145 }
146 node_parts.push(format!("rows: {}", layout.row_count()));
147 }
148
149 if let Some(flat_layout) = layout.as_opt::<Flat>() {
151 node_parts.push(format_flat_layout_buffers(
152 flat_layout,
153 self.segment_buffer_sizes.as_ref(),
154 ));
155 } else {
156 if self.verbose {
158 let segment_ids = layout.segment_ids();
159 if !segment_ids.is_empty() {
160 node_parts.push(format!(
161 "segments: [{}]",
162 segment_ids
163 .iter()
164 .map(|s| format!("{}", **s))
165 .collect::<Vec<_>>()
166 .join(", ")
167 ));
168 }
169 }
170 }
171
172 let node_name = node_parts.join(", ");
173
174 let children = layout.children()?;
176 let child_names: Vec<_> = layout.child_names().collect();
177
178 let child_trees: VortexResult<Vec<Tree<String>>> =
180 if !children.is_empty() && child_names.len() == children.len() {
181 children
183 .into_iter()
184 .zip(child_names.iter())
185 .map(|(child, name)| {
186 let child_tree = self.make_tree(child)?;
187 Ok(Tree::new(format!("{}: {}", name, child_tree.root))
188 .with_leaves(child_tree.leaves))
189 })
190 .collect()
191 } else if !children.is_empty() {
192 children.into_iter().map(|c| self.make_tree(c)).collect()
194 } else {
195 Ok(Vec::new())
197 };
198
199 Ok(Tree::new(node_name).with_leaves(child_trees?))
200 }
201}
202
203impl std::fmt::Display for DisplayLayoutTree {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 match self.make_tree(Arc::clone(&self.layout)) {
206 Ok(tree) => write!(f, "{}", tree),
207 Err(e) => write!(f, "Error building layout tree: {}", e),
208 }
209 }
210}
211
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldName;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::Nullability::NonNullable;
    use vortex_array::dtype::PType;
    use vortex_array::dtype::StructFields;
    use vortex_array::serde::SerializedArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBufferMut;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;
    use vortex_utils::env::EnvVarGuard;

    use crate::IntoLayout;
    use crate::OwnedLayoutChildren;
    use crate::layouts::chunked::ChunkedLayout;
    use crate::layouts::flat::Flat;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::struct_::StructLayout;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::strategy::LayoutStrategy;
    use crate::test::SESSION;

    /// Verbose display of a struct -> (chunked -> flat, flat) tree, with buffer sizes
    /// taken from inline array trees.
    #[test]
    fn test_display_tree_inline_array_tree() {
        let _guard = EnvVarGuard::set("FLAT_LAYOUT_INLINE_ARRAY_NODE", "1");
        block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());

            // Nullable i64 column with an explicit validity array.
            let (ptr1, eof1) = SequenceId::root().split();
            let mut validity_builder = BitBufferMut::with_capacity(5);
            for b in [true, false, true, true, false] {
                validity_builder.append(b);
            }
            let validity = Validity::Array(
                BoolArray::new(validity_builder.freeze(), Validity::NonNullable).into_array(),
            );
            let array1 = PrimitiveArray::new(buffer![1i64, 2, 3, 4, 5], validity);
            let layout1 = FlatLayoutStrategy::default()
                .write_stream(
                    ctx.clone(),
                    Arc::<TestSegments>::clone(&segments),
                    array1.into_array().to_array_stream().sequenced(ptr1),
                    eof1,
                    &session,
                )
                .await
                .unwrap();

            // Non-nullable utf8 column built through a VarBinView builder.
            let (ptr2, eof2) = SequenceId::root().split();
            let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(NonNullable), 5);
            for s in [
                "hello world this is long",
                "another long string",
                "short",
                "medium str",
                "x",
            ] {
                builder.append_value(s);
            }
            let layout2 = FlatLayoutStrategy::default()
                .write_stream(
                    ctx.clone(),
                    Arc::<TestSegments>::clone(&segments),
                    builder
                        .finish()
                        .into_array()
                        .to_array_stream()
                        .sequenced(ptr2),
                    eof2,
                    &session,
                )
                .await
                .unwrap();

            let struct_layout = StructLayout::new(
                5,
                DType::Struct(
                    StructFields::new(
                        vec![FieldName::from("numbers"), FieldName::from("strings")].into(),
                        vec![
                            DType::Primitive(PType::I64, Nullability::Nullable),
                            DType::Utf8(NonNullable),
                        ],
                    ),
                    NonNullable,
                ),
                vec![
                    ChunkedLayout::new(
                        5,
                        DType::Primitive(PType::I64, Nullability::Nullable),
                        OwnedLayoutChildren::layout_children(vec![layout1]),
                    )
                    .into_layout(),
                    layout2,
                ],
            )
            .into_layout();

            let output = format!("{}", struct_layout.display_tree_verbose(true));

            let expected = "\
vortex.struct, dtype: {numbers=i64?, strings=utf8}, children: 2, rows: 5
├── numbers: vortex.chunked, dtype: i64?, children: 1, rows: 5
│   └── [0]: vortex.flat, dtype: i64?, metadata: 171 bytes, rows: 5, segment 0, buffers=[40B, 1B], total=41B
└── strings: vortex.flat, dtype: utf8, metadata: 110 bytes, rows: 5, segment 1, buffers=[43B, 80B], total=123B
";
            assert_eq!(output, expected);
        })
    }

    /// Without inline array trees, buffer sizes are obtained by fetching segments.
    #[test]
    fn test_display_tree_with_segment_source() {
        let _guard = EnvVarGuard::remove("FLAT_LAYOUT_INLINE_ARRAY_NODE");
        block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());

            let (ptr1, eof1) = SequenceId::root().split();
            let array1 = PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::NonNullable);
            let layout1 = FlatLayoutStrategy::default()
                .write_stream(
                    ctx.clone(),
                    Arc::<TestSegments>::clone(&segments),
                    array1.into_array().to_array_stream().sequenced(ptr1),
                    eof1,
                    &session,
                )
                .await
                .unwrap();

            let (ptr2, eof2) = SequenceId::root().split();
            let array2 = PrimitiveArray::new(buffer![6i32, 7, 8, 9, 10], Validity::NonNullable);
            let layout2 = FlatLayoutStrategy::default()
                .write_stream(
                    ctx.clone(),
                    Arc::<TestSegments>::clone(&segments),
                    array2.into_array().to_array_stream().sequenced(ptr2),
                    eof2,
                    &session,
                )
                .await
                .unwrap();

            let chunked_layout = ChunkedLayout::new(
                10,
                DType::Primitive(PType::I32, NonNullable),
                OwnedLayoutChildren::layout_children(vec![layout1, layout2]),
            )
            .into_layout();

            let output = chunked_layout
                .display_tree_with_segments(segments)
                .await
                .unwrap();

            let expected = "\
vortex.chunked, dtype: i32, children: 2, rows: 10
├── [0]: vortex.flat, dtype: i32, rows: 5, segment 0, buffers=[20B], total=20B
└── [1]: vortex.flat, dtype: i32, rows: 5, segment 1, buffers=[20B], total=20B
";
            assert_eq!(output.to_string(), expected);
        })
    }

    /// The inline array tree is populated and parseable, and the non-verbose display
    /// reports the buffer sizes it contains.
    #[test]
    fn test_display_array_tree_with_inline_node() {
        let _guard = EnvVarGuard::set("FLAT_LAYOUT_INLINE_ARRAY_NODE", "1");

        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();

        let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::AllValid);
        let layout = block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    array.into_array().to_array_stream().sequenced(ptr),
                    eof,
                    &session,
                )
                .await
                .unwrap()
        });

        let flat_layout = layout.as_::<Flat>();

        let array_tree = flat_layout
            .array_tree()
            .expect("array_tree should be populated when FLAT_LAYOUT_INLINE_ARRAY_NODE is set");

        let parts = SerializedArray::from_array_tree(array_tree.as_ref().to_vec())
            .expect("should parse array_tree");
        assert_eq!(parts.buffer_lengths(), vec![20]);

        assert_eq!(
            layout.display_tree().to_string(),
            "\
vortex.flat, dtype: i32?, segment 0, buffers=[20B], total=20B
"
        );
    }

    /// With no inline array tree and no fetched segment sizes, a flat layout can only
    /// report its segment id.
    #[test]
    fn test_display_tree_without_inline_node() {
        // Previously this *set* the variable, which made it a duplicate of the inline test
        // and left the `segment: <id>` fallback untested.
        let _guard = EnvVarGuard::remove("FLAT_LAYOUT_INLINE_ARRAY_NODE");

        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();

        let array = PrimitiveArray::new(buffer![10i64, 20, 30], Validity::NonNullable);
        let layout = block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    array.into_array().to_array_stream().sequenced(ptr),
                    eof,
                    &session,
                )
                .await
                .unwrap()
        });

        assert_eq!(
            layout.display_tree().to_string(),
            "\
vortex.flat, dtype: i64, segment: 0
"
        );
    }
}