vortex_layout/layouts/chunked/
writer.rs1use std::sync::Arc;
2
3use vortex_array::arcref::ArcRef;
4use vortex_array::{ArrayContext, ArrayRef};
5use vortex_dtype::DType;
6use vortex_error::{VortexExpect, VortexResult};
7
8use crate::data::Layout;
9use crate::layouts::chunked::ChunkedLayout;
10use crate::layouts::flat::writer::FlatLayoutStrategy;
11use crate::segments::SegmentWriter;
12use crate::strategy::LayoutStrategy;
13use crate::writer::LayoutWriter;
14use crate::{LayoutVTableRef, LayoutWriterExt};
15
16#[derive(Clone)]
17pub struct ChunkedLayoutStrategy {
18 pub chunk_strategy: ArcRef<dyn LayoutStrategy>,
20}
21
22impl Default for ChunkedLayoutStrategy {
23 fn default() -> Self {
24 Self {
25 chunk_strategy: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
26 }
27 }
28}
29
30impl LayoutStrategy for ChunkedLayoutStrategy {
31 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
32 Ok(ChunkedLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
33 }
34}
35
36pub struct ChunkedLayoutWriter {
40 ctx: ArrayContext,
41 options: ChunkedLayoutStrategy,
42 chunks: Vec<Box<dyn LayoutWriter>>,
43 dtype: DType,
44 row_count: u64,
45}
46
47impl ChunkedLayoutWriter {
48 pub fn new(ctx: ArrayContext, dtype: DType, options: ChunkedLayoutStrategy) -> Self {
49 Self {
50 ctx,
51 options,
52 chunks: vec![],
53 dtype,
54 row_count: 0,
55 }
56 }
57}
58
59impl LayoutWriter for ChunkedLayoutWriter {
60 fn push_chunk(
61 &mut self,
62 segment_writer: &mut dyn SegmentWriter,
63 chunk: ArrayRef,
64 ) -> VortexResult<()> {
65 self.row_count += chunk.len() as u64;
66
67 let mut chunk_writer = self
70 .options
71 .chunk_strategy
72 .new_writer(&self.ctx, chunk.dtype())?;
73 chunk_writer.push_chunk(segment_writer, chunk)?;
74 chunk_writer.flush(segment_writer)?;
75 self.chunks.push(chunk_writer);
76
77 Ok(())
78 }
79
80 fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
81 Ok(())
83 }
84
85 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
86 let mut children = vec![];
88 for writer in self.chunks.iter_mut() {
89 children.push(writer.finish(segment_writer)?);
91 }
92
93 if children.len() == 1 {
96 return Ok(children.pop().vortex_expect("child layout"));
97 }
98 Ok(chunked_layout(self.dtype.clone(), self.row_count, children))
99 }
100}
101
102pub(crate) fn chunked_layout(dtype: DType, row_count: u64, children: Vec<Layout>) -> Layout {
103 Layout::new_owned(
104 "chunked".into(),
105 LayoutVTableRef::new_ref(&ChunkedLayout),
106 dtype,
107 row_count,
108 vec![],
109 children,
110 None,
111 )
112}