vortex_layout/layouts/chunked/
writer.rs1use std::sync::Arc;
2
3use arcref::ArcRef;
4use vortex_array::{ArrayContext, ArrayRef};
5use vortex_dtype::DType;
6use vortex_error::{VortexExpect, VortexResult};
7
8use crate::data::Layout;
9use crate::layouts::chunked::ChunkedLayout;
10use crate::layouts::flat::writer::FlatLayoutStrategy;
11use crate::segments::SegmentWriter;
12use crate::strategy::LayoutStrategy;
13use crate::writer::LayoutWriter;
14use crate::{LayoutVTableRef, LayoutWriterExt};
15
16#[derive(Clone)]
17pub struct ChunkedLayoutStrategy {
18 pub chunk_strategy: ArcRef<dyn LayoutStrategy>,
20}
21
22impl Default for ChunkedLayoutStrategy {
23 fn default() -> Self {
24 Self {
25 chunk_strategy: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
26 }
27 }
28}
29
30impl LayoutStrategy for ChunkedLayoutStrategy {
31 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
32 Ok(ChunkedLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
33 }
34}
35
36pub struct ChunkedLayoutWriter {
40 ctx: ArrayContext,
41 options: ChunkedLayoutStrategy,
42 chunks: Vec<Box<dyn LayoutWriter>>,
43 dtype: DType,
44 row_count: u64,
45}
46
47impl ChunkedLayoutWriter {
48 pub fn new(ctx: ArrayContext, dtype: DType, options: ChunkedLayoutStrategy) -> Self {
49 Self {
50 ctx,
51 options,
52 chunks: vec![],
53 dtype,
54 row_count: 0,
55 }
56 }
57}
58
59impl LayoutWriter for ChunkedLayoutWriter {
60 fn push_chunk(
61 &mut self,
62 segment_writer: &mut dyn SegmentWriter,
63 chunk: ArrayRef,
64 ) -> VortexResult<()> {
65 assert_eq!(
66 chunk.dtype(),
67 &self.dtype,
68 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
69 chunk.dtype(),
70 self.dtype
71 );
72
73 self.row_count += chunk.len() as u64;
74
75 let mut chunk_writer = self
78 .options
79 .chunk_strategy
80 .new_writer(&self.ctx, chunk.dtype())?;
81 chunk_writer.push_chunk(segment_writer, chunk)?;
82 chunk_writer.flush(segment_writer)?;
83 self.chunks.push(chunk_writer);
84
85 Ok(())
86 }
87
88 fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
89 Ok(())
91 }
92
93 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
94 let mut children = vec![];
96 for writer in self.chunks.iter_mut() {
97 children.push(writer.finish(segment_writer)?);
99 }
100
101 if children.len() == 1 {
104 return Ok(children.pop().vortex_expect("child layout"));
105 }
106 Ok(chunked_layout(self.dtype.clone(), self.row_count, children))
107 }
108}
109
110pub(crate) fn chunked_layout(dtype: DType, row_count: u64, children: Vec<Layout>) -> Layout {
111 Layout::new_owned(
112 "chunked".into(),
113 LayoutVTableRef::new_ref(&ChunkedLayout),
114 dtype,
115 row_count,
116 vec![],
117 children,
118 None,
119 )
120}