lance_core/cache/entry_io.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Streaming readers/writers for cache entry bodies.
5//!
6//! [`CacheCodecImpl`](super::CacheCodecImpl) bodies are written and read
7//! through these wrappers. They keep serialization streaming (no buffering of
8//! the whole entry) and reads zero-copy (sections borrow from the input
9//! [`Bytes`]), while tracking the byte position needed to keep Arrow IPC
10//! sections 64-byte aligned (see [`lance_arrow::ipc`]).
11//!
12//! Body layout primitives:
13//!
14//! ```text
15//! HEADER : [header_len: u32 LE][header proto bytes]
16//! ARROW_IPC : [pad to 64B][self-delimiting IPC stream]
17//! RAW_BLOB : [len: u64 LE][bytes]
18//! ```
19
20use std::io::Write;
21
22use arrow_array::RecordBatch;
23use bytes::Bytes;
24use prost::Message;
25
26use crate::{Error, Result};
27
/// Streams a cache entry body (a header followed by sections) straight into
/// the underlying writer, without buffering the whole entry.
///
/// Emitting the envelope is not this type's job: the
/// [`CacheCodec`](super::CacheCodec) wrapper writes it first and only then
/// hands this writer to
/// [`CacheCodecImpl::serialize`](super::CacheCodecImpl::serialize).
pub struct CacheEntryWriter<'a> {
    /// Running byte offset from the start of the entry, including any bytes
    /// written ahead of the body; IPC sections are aligned against it.
    pos: usize,
    /// Sink that receives every body byte.
    writer: &'a mut dyn Write,
}
39
40impl<'a> CacheEntryWriter<'a> {
41 /// Create a writer positioned at the start of an entry (offset 0).
42 ///
43 /// Use this for nested serialization into a standalone buffer. The
44 /// envelope-aware entry point is [`CacheCodec::serialize`](super::CacheCodec::serialize).
45 pub fn new(writer: &'a mut dyn Write) -> Self {
46 Self { writer, pos: 0 }
47 }
48
49 /// Create a writer whose section alignment accounts for `pos` bytes
50 /// already written ahead of the body (i.e. the envelope).
51 pub(crate) fn with_pos(writer: &'a mut dyn Write, pos: usize) -> Self {
52 Self { writer, pos }
53 }
54
55 /// Write a single discriminant byte (e.g. a variant tag).
56 pub fn write_u8(&mut self, value: u8) -> Result<()> {
57 self.writer.write_all(&[value])?;
58 self.pos += 1;
59 Ok(())
60 }
61
62 /// Write a protobuf header as `[len: u32 LE][bytes]`.
63 pub fn write_header<P: Message>(&mut self, header: &P) -> Result<()> {
64 let bytes = header.encode_to_vec();
65 let len = u32::try_from(bytes.len())
66 .map_err(|_| Error::io(format!("cache header too large: {} bytes", bytes.len())))?;
67 self.writer.write_all(&len.to_le_bytes())?;
68 self.writer.write_all(&bytes)?;
69 self.pos += 4 + bytes.len();
70 Ok(())
71 }
72
73 /// Write `batch` as a 64-byte-aligned Arrow IPC section.
74 pub fn write_ipc(&mut self, batch: &RecordBatch) -> Result<()> {
75 lance_arrow::ipc::write_ipc_section(self.writer, &mut self.pos, batch)
76 .map_err(|e| Error::io(e.to_string()))
77 }
78
79 /// Write `batches` as a single 64-byte-aligned multi-batch Arrow IPC
80 /// section. The iterator must yield at least one batch.
81 pub fn write_ipc_batches<I>(&mut self, batches: I) -> Result<()>
82 where
83 I: IntoIterator<Item = RecordBatch>,
84 {
85 lance_arrow::ipc::write_ipc_section_batches(self.writer, &mut self.pos, batches)
86 .map_err(|e| Error::io(e.to_string()))
87 }
88
89 /// Write a raw blob as `[len: u64 LE][bytes]`.
90 ///
91 /// Only for byte payloads that already have their own stable, portable
92 /// encoding (e.g. a roaring bitmap, a varint-packed stream).
93 pub fn write_raw(&mut self, bytes: &[u8]) -> Result<()> {
94 lance_arrow::ipc::write_len_prefixed_bytes(self.writer, bytes)
95 .map_err(|e| Error::io(e.to_string()))?;
96 self.pos += 8 + bytes.len();
97 Ok(())
98 }
99
100 /// The underlying writer, for a payload that carries its own framing.
101 ///
102 /// Use this only when the codec writes a self-delimiting or whole-body
103 /// payload — e.g. streaming a roaring bitmap as the entire body, where the
104 /// length prefix of [`write_raw`](Self::write_raw) would be redundant and
105 /// buffering to measure that length would force an extra copy. For
106 /// structured bodies prefer [`write_header`](Self::write_header) /
107 /// [`write_ipc`](Self::write_ipc) / [`write_raw`](Self::write_raw), which
108 /// give you versioning and 64-byte IPC alignment.
109 ///
110 /// Bytes written through this do **not** advance the section-alignment
111 /// position, so it must not be interleaved with [`write_ipc`](Self::write_ipc).
112 pub fn raw_writer(&mut self) -> &mut dyn Write {
113 self.writer
114 }
115}
116
117/// Reads a cache entry body, tracking an offset into the input and exposing
118/// the entry's `type_version` so implementors can branch for backward compat.
119///
120/// All reads are zero-copy: returned [`Bytes`] and the buffers behind decoded
121/// [`RecordBatch`]es borrow from the input allocation.
122pub struct CacheEntryReader<'a> {
123 data: &'a Bytes,
124 offset: usize,
125 version: u32,
126}
127
128impl<'a> CacheEntryReader<'a> {
129 /// Create a reader over `data`, starting at body byte `offset`, for an
130 /// entry written at `version`.
131 pub fn new(data: &'a Bytes, offset: usize, version: u32) -> Self {
132 Self {
133 data,
134 offset,
135 version,
136 }
137 }
138
139 /// The `type_version` from the envelope. Branch on this for backward compat.
140 pub fn version(&self) -> u32 {
141 self.version
142 }
143
144 /// Read a single discriminant byte written by [`CacheEntryWriter::write_u8`].
145 pub fn read_u8(&mut self) -> Result<u8> {
146 let bytes = self.data.as_ref();
147 let v = *bytes
148 .get(self.offset)
149 .ok_or_else(|| Error::io("cache entry: truncated, missing tag byte".to_string()))?;
150 self.offset += 1;
151 Ok(v)
152 }
153
154 /// Read a protobuf header written by [`CacheEntryWriter::write_header`].
155 pub fn read_header<P: Message + Default>(&mut self) -> Result<P> {
156 let bytes = self.data.as_ref();
157 let len_end = self
158 .offset
159 .checked_add(4)
160 .filter(|&e| e <= bytes.len())
161 .ok_or_else(|| Error::io("cache header: truncated length prefix".to_string()))?;
162 let len = u32::from_le_bytes(bytes[self.offset..len_end].try_into().unwrap()) as usize;
163 let data_end = len_end
164 .checked_add(len)
165 .filter(|&e| e <= bytes.len())
166 .ok_or_else(|| Error::io("cache header: truncated body".to_string()))?;
167 let msg = P::decode(&bytes[len_end..data_end])
168 .map_err(|e| Error::io(format!("cache header decode failed: {e}")))?;
169 self.offset = data_end;
170 Ok(msg)
171 }
172
173 /// Read one [`RecordBatch`] from a 64-byte-aligned IPC section.
174 pub fn read_ipc(&mut self) -> Result<RecordBatch> {
175 lance_arrow::ipc::read_ipc_section_at(self.data, &mut self.offset)
176 .map_err(|e| Error::io(e.to_string()))
177 }
178
179 /// Read all [`RecordBatch`]es from a 64-byte-aligned multi-batch IPC
180 /// section written by [`CacheEntryWriter::write_ipc_batches`].
181 pub fn read_ipc_batches(&mut self) -> Result<Vec<RecordBatch>> {
182 lance_arrow::ipc::read_ipc_section_batches_at(self.data, &mut self.offset)
183 .map_err(|e| Error::io(e.to_string()))
184 }
185
186 /// Read a raw blob written by [`CacheEntryWriter::write_raw`], zero-copy.
187 pub fn read_raw(&mut self) -> Result<Bytes> {
188 lance_arrow::ipc::read_len_prefixed_bytes_at(self.data, &mut self.offset)
189 .map_err(|e| Error::io(e.to_string()))
190 }
191
192 /// The not-yet-consumed body bytes as a zero-copy slice.
193 ///
194 /// For a payload that carries its own framing and is parsed with the
195 /// codec's own cursor — the read counterpart of
196 /// [`CacheEntryWriter::raw_writer`]. For structured bodies prefer
197 /// [`read_header`](Self::read_header) / [`read_ipc`](Self::read_ipc) /
198 /// [`read_raw`](Self::read_raw).
199 pub fn body(&self) -> Bytes {
200 self.data.slice(self.offset..)
201 }
202}