1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use futures_util::{Stream, TryStreamExt};
use vortex::array::ChunkedArray;
use vortex::stream::ArrayStream;
use vortex::Array;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::io::VortexWrite;
use crate::MessageWriter;

pub struct ArrayWriter<W: VortexWrite> {
    msgs: MessageWriter<W>,

    array_layouts: Vec<ArrayLayout>,
    page_ranges: Vec<ByteRange>,
}

impl<W: VortexWrite> ArrayWriter<W> {
    pub fn new(write: W) -> Self {
        Self {
            msgs: MessageWriter::new(write),
            array_layouts: vec![],
            page_ranges: vec![],
        }
    }

    pub fn array_layouts(&self) -> &[ArrayLayout] {
        &self.array_layouts
    }

    pub fn page_ranges(&self) -> &[ByteRange] {
        &self.page_ranges
    }

    pub fn into_inner(self) -> W {
        self.msgs.into_inner()
    }

    async fn write_dtype(&mut self, dtype: &DType) -> VortexResult<ByteRange> {
        let begin = self.msgs.tell();
        self.msgs.write_dtype(dtype).await?;
        let end = self.msgs.tell();
        Ok(ByteRange { begin, end })
    }

    async fn write_array_chunks<S>(&mut self, mut stream: S) -> VortexResult<ChunkOffsets>
    where
        S: Stream<Item = VortexResult<Array>> + Unpin,
    {
        let mut byte_offsets = vec![self.msgs.tell()];
        let mut row_offsets = vec![0];
        let mut row_offset = 0;

        while let Some(chunk) = stream.try_next().await? {
            row_offset += chunk.len() as u64;
            row_offsets.push(row_offset);
            self.msgs.write_batch(chunk).await?;
            byte_offsets.push(self.msgs.tell());
        }

        Ok(ChunkOffsets {
            byte_offsets,
            row_offsets,
        })
    }

    pub async fn write_array_stream<S: ArrayStream + Unpin>(
        mut self,
        mut array_stream: S,
    ) -> VortexResult<Self> {
        let dtype_pos = self.write_dtype(array_stream.dtype()).await?;
        let chunk_pos = self.write_array_chunks(&mut array_stream).await?;
        self.array_layouts.push(ArrayLayout {
            dtype: dtype_pos,
            chunks: chunk_pos,
        });
        Ok(self)
    }

    pub async fn write_array(self, array: Array) -> VortexResult<Self> {
        if let Ok(chunked) = ChunkedArray::try_from(&array) {
            self.write_array_stream(chunked.array_stream()).await
        } else {
            self.write_array_stream(array.into_array_stream()).await
        }
    }

    pub async fn write_page(mut self, buffer: Buffer) -> VortexResult<Self> {
        let begin = self.msgs.tell();
        self.msgs.write_page(buffer).await?;
        let end = self.msgs.tell();
        self.page_ranges.push(ByteRange { begin, end });
        Ok(self)
    }
}

#[derive(Copy, Clone, Debug)]
pub struct ByteRange {
    pub begin: u64,
    pub end: u64,
}

#[allow(clippy::len_without_is_empty)]
impl ByteRange {
    pub fn len(&self) -> usize {
        (self.end - self.begin) as usize
    }
}

#[derive(Clone, Debug)]
pub struct ArrayLayout {
    pub dtype: ByteRange,
    pub chunks: ChunkOffsets,
}

#[derive(Clone, Debug)]
pub struct ChunkOffsets {
    pub byte_offsets: Vec<u64>,
    pub row_offsets: Vec<u64>,
}