Skip to main content

lance_core/cache/
entry_io.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Streaming readers/writers for cache entry bodies.
5//!
6//! [`CacheCodecImpl`](super::CacheCodecImpl) bodies are written and read
7//! through these wrappers. They keep serialization streaming (no buffering of
8//! the whole entry) and reads zero-copy (sections borrow from the input
9//! [`Bytes`]), while tracking the byte position needed to keep Arrow IPC
10//! sections 64-byte aligned (see [`lance_arrow::ipc`]).
11//!
12//! Body layout primitives:
13//!
14//! ```text
15//! HEADER    : [header_len: u32 LE][header proto bytes]
16//! ARROW_IPC : [pad to 64B][self-delimiting IPC stream]
17//! RAW_BLOB  : [len: u64 LE][bytes]
18//! ```
19
20use std::io::Write;
21
22use arrow_array::RecordBatch;
23use bytes::Bytes;
24use prost::Message;
25
26use crate::{Error, Result};
27
/// Streaming writer for the body of a cache entry.
///
/// A body is a header followed by sections. Every byte is forwarded straight
/// to the wrapped writer, so the whole entry is never buffered in memory.
///
/// The [`CacheCodec`](super::CacheCodec) wrapper emits the envelope first and
/// only then hands this writer to
/// [`CacheCodecImpl::serialize`](super::CacheCodecImpl::serialize).
pub struct CacheEntryWriter<'a> {
    /// Absolute byte offset from the start of the entry; IPC sections are
    /// aligned relative to this value.
    pos: usize,
    /// Destination that receives every body byte.
    writer: &'a mut dyn Write,
}
39
40impl<'a> CacheEntryWriter<'a> {
41    /// Create a writer positioned at the start of an entry (offset 0).
42    ///
43    /// Use this for nested serialization into a standalone buffer. The
44    /// envelope-aware entry point is [`CacheCodec::serialize`](super::CacheCodec::serialize).
45    pub fn new(writer: &'a mut dyn Write) -> Self {
46        Self { writer, pos: 0 }
47    }
48
49    /// Create a writer whose section alignment accounts for `pos` bytes
50    /// already written ahead of the body (i.e. the envelope).
51    pub(crate) fn with_pos(writer: &'a mut dyn Write, pos: usize) -> Self {
52        Self { writer, pos }
53    }
54
55    /// Write a single discriminant byte (e.g. a variant tag).
56    pub fn write_u8(&mut self, value: u8) -> Result<()> {
57        self.writer.write_all(&[value])?;
58        self.pos += 1;
59        Ok(())
60    }
61
62    /// Write a protobuf header as `[len: u32 LE][bytes]`.
63    pub fn write_header<P: Message>(&mut self, header: &P) -> Result<()> {
64        let bytes = header.encode_to_vec();
65        let len = u32::try_from(bytes.len())
66            .map_err(|_| Error::io(format!("cache header too large: {} bytes", bytes.len())))?;
67        self.writer.write_all(&len.to_le_bytes())?;
68        self.writer.write_all(&bytes)?;
69        self.pos += 4 + bytes.len();
70        Ok(())
71    }
72
73    /// Write `batch` as a 64-byte-aligned Arrow IPC section.
74    pub fn write_ipc(&mut self, batch: &RecordBatch) -> Result<()> {
75        lance_arrow::ipc::write_ipc_section(self.writer, &mut self.pos, batch)
76            .map_err(|e| Error::io(e.to_string()))
77    }
78
79    /// Write `batches` as a single 64-byte-aligned multi-batch Arrow IPC
80    /// section. The iterator must yield at least one batch.
81    pub fn write_ipc_batches<I>(&mut self, batches: I) -> Result<()>
82    where
83        I: IntoIterator<Item = RecordBatch>,
84    {
85        lance_arrow::ipc::write_ipc_section_batches(self.writer, &mut self.pos, batches)
86            .map_err(|e| Error::io(e.to_string()))
87    }
88
89    /// Write a raw blob as `[len: u64 LE][bytes]`.
90    ///
91    /// Only for byte payloads that already have their own stable, portable
92    /// encoding (e.g. a roaring bitmap, a varint-packed stream).
93    pub fn write_raw(&mut self, bytes: &[u8]) -> Result<()> {
94        lance_arrow::ipc::write_len_prefixed_bytes(self.writer, bytes)
95            .map_err(|e| Error::io(e.to_string()))?;
96        self.pos += 8 + bytes.len();
97        Ok(())
98    }
99
100    /// The underlying writer, for a payload that carries its own framing.
101    ///
102    /// Use this only when the codec writes a self-delimiting or whole-body
103    /// payload — e.g. streaming a roaring bitmap as the entire body, where the
104    /// length prefix of [`write_raw`](Self::write_raw) would be redundant and
105    /// buffering to measure that length would force an extra copy. For
106    /// structured bodies prefer [`write_header`](Self::write_header) /
107    /// [`write_ipc`](Self::write_ipc) / [`write_raw`](Self::write_raw), which
108    /// give you versioning and 64-byte IPC alignment.
109    ///
110    /// Bytes written through this do **not** advance the section-alignment
111    /// position, so it must not be interleaved with [`write_ipc`](Self::write_ipc).
112    pub fn raw_writer(&mut self) -> &mut dyn Write {
113        self.writer
114    }
115}
116
117/// Reads a cache entry body, tracking an offset into the input and exposing
118/// the entry's `type_version` so implementors can branch for backward compat.
119///
120/// All reads are zero-copy: returned [`Bytes`] and the buffers behind decoded
121/// [`RecordBatch`]es borrow from the input allocation.
122pub struct CacheEntryReader<'a> {
123    data: &'a Bytes,
124    offset: usize,
125    version: u32,
126}
127
128impl<'a> CacheEntryReader<'a> {
129    /// Create a reader over `data`, starting at body byte `offset`, for an
130    /// entry written at `version`.
131    pub fn new(data: &'a Bytes, offset: usize, version: u32) -> Self {
132        Self {
133            data,
134            offset,
135            version,
136        }
137    }
138
139    /// The `type_version` from the envelope. Branch on this for backward compat.
140    pub fn version(&self) -> u32 {
141        self.version
142    }
143
144    /// Read a single discriminant byte written by [`CacheEntryWriter::write_u8`].
145    pub fn read_u8(&mut self) -> Result<u8> {
146        let bytes = self.data.as_ref();
147        let v = *bytes
148            .get(self.offset)
149            .ok_or_else(|| Error::io("cache entry: truncated, missing tag byte".to_string()))?;
150        self.offset += 1;
151        Ok(v)
152    }
153
154    /// Read a protobuf header written by [`CacheEntryWriter::write_header`].
155    pub fn read_header<P: Message + Default>(&mut self) -> Result<P> {
156        let bytes = self.data.as_ref();
157        let len_end = self
158            .offset
159            .checked_add(4)
160            .filter(|&e| e <= bytes.len())
161            .ok_or_else(|| Error::io("cache header: truncated length prefix".to_string()))?;
162        let len = u32::from_le_bytes(bytes[self.offset..len_end].try_into().unwrap()) as usize;
163        let data_end = len_end
164            .checked_add(len)
165            .filter(|&e| e <= bytes.len())
166            .ok_or_else(|| Error::io("cache header: truncated body".to_string()))?;
167        let msg = P::decode(&bytes[len_end..data_end])
168            .map_err(|e| Error::io(format!("cache header decode failed: {e}")))?;
169        self.offset = data_end;
170        Ok(msg)
171    }
172
173    /// Read one [`RecordBatch`] from a 64-byte-aligned IPC section.
174    pub fn read_ipc(&mut self) -> Result<RecordBatch> {
175        lance_arrow::ipc::read_ipc_section_at(self.data, &mut self.offset)
176            .map_err(|e| Error::io(e.to_string()))
177    }
178
179    /// Read all [`RecordBatch`]es from a 64-byte-aligned multi-batch IPC
180    /// section written by [`CacheEntryWriter::write_ipc_batches`].
181    pub fn read_ipc_batches(&mut self) -> Result<Vec<RecordBatch>> {
182        lance_arrow::ipc::read_ipc_section_batches_at(self.data, &mut self.offset)
183            .map_err(|e| Error::io(e.to_string()))
184    }
185
186    /// Read a raw blob written by [`CacheEntryWriter::write_raw`], zero-copy.
187    pub fn read_raw(&mut self) -> Result<Bytes> {
188        lance_arrow::ipc::read_len_prefixed_bytes_at(self.data, &mut self.offset)
189            .map_err(|e| Error::io(e.to_string()))
190    }
191
192    /// The not-yet-consumed body bytes as a zero-copy slice.
193    ///
194    /// For a payload that carries its own framing and is parsed with the
195    /// codec's own cursor — the read counterpart of
196    /// [`CacheEntryWriter::raw_writer`]. For structured bodies prefer
197    /// [`read_header`](Self::read_header) / [`read_ipc`](Self::read_ipc) /
198    /// [`read_raw`](Self::read_raw).
199    pub fn body(&self) -> Bytes {
200        self.data.slice(self.offset..)
201    }
202}