mod filter;
mod index;
mod meta;
use super::{
Block, BlockOffset, DataBlock, KeyedBlockHandle, block::Header as BlockHeader,
filter::BloomConstructionPolicy,
};
use crate::{
Checksum, CompressionType, InternalValue, TableId, UserKey, ValueType,
checksum::{ChecksumType, ChecksummedWriter},
coding::Encode,
encryption::EncryptionProvider,
fs::{Fs, FsFile, FsOpenOptions},
prefix::PrefixExtractor,
range_tombstone::RangeTombstone,
table::{
BlockHandle,
writer::{
filter::{FilterWriter, FullFilterWriter},
index::FullIndexWriter,
},
},
time::unix_timestamp,
vlog::BlobFileId,
};
use index::BlockIndexWriter;
use std::{io::BufWriter, path::PathBuf, sync::Arc};
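/// Reference to a blob file used by this table, together with its size statistics.
///
/// Links are recorded via [`Writer::link_blob_file`] and persisted in the
/// `linked_blob_files` section when the table is finished.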
#[derive(Copy, Clone, PartialEq, Eq, Debug, std::hash::Hash)]
pub struct LinkedFile {
pub blob_file_id: BlobFileId,
pub bytes: u64,
pub on_disk_bytes: u64,
pub len: usize,
}
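/// Streams a table file to disk, one data block at a time.
///
/// The writer is configured through the builder-style `use_*` methods before the
/// first [`write`](Self::write) and finalized with [`finish`](Self::finish), which
/// appends the index, filter, and metadata sections. A minimal sketch, mirroring
/// the tests in this module (the path, table ID, and level are illustrative):
///
/// ```ignore
/// let mut writer = Writer::new(path, /* table_id */ 1, /* initial_level */ 0, Arc::new(StdFs))?
///     .use_data_block_size(4_096)
///     .use_data_block_compression(CompressionType::None);
///
/// writer.write(InternalValue::from_components(b"a", b"v", 0, ValueType::Value))?;
///
/// // Yields the table ID and file checksum, or None if nothing was written.
/// let _result = writer.finish()?;
/// ```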
pub struct Writer {
fs: Arc<dyn Fs>,
pub(crate) path: PathBuf,
table_id: TableId,
data_block_restart_interval: u8,
index_block_restart_interval: u8,
meta_partition_size: u32,
data_block_size: u32,
data_block_hash_ratio: f32,
data_block_compression: CompressionType,
index_block_compression: CompressionType,
block_buffer: Vec<u8>,
#[expect(clippy::struct_field_names)]
file_writer: sfa::Writer<ChecksummedWriter<BufWriter<Box<dyn FsFile>>>>,
#[expect(clippy::struct_field_names)]
index_writer: Box<dyn BlockIndexWriter<BufWriter<Box<dyn FsFile>>>>,
#[expect(clippy::struct_field_names)]
filter_writer: Box<dyn FilterWriter<BufWriter<Box<dyn FsFile>>>>,
chunk: Vec<InternalValue>,
chunk_size: usize,
pub(crate) meta: meta::Metadata,
prev_pos: (BlockOffset, BlockOffset),
current_key: Option<UserKey>,
bloom_policy: BloomConstructionPolicy,
prefix_extractor: Option<Arc<dyn PrefixExtractor>>,
previous_item: Option<(UserKey, ValueType)>,
linked_blob_files: Vec<LinkedFile>,
range_tombstones: Vec<RangeTombstone>,
initial_level: u8,
encryption: Option<Arc<dyn EncryptionProvider>>,
#[cfg(zstd_any)]
zstd_dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
}
impl Writer {
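/// Creates a new writer for the given table ID, creating the file at `path`
/// (the call fails if the file already exists) and opening its `data` section.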
pub fn new(
path: PathBuf,
table_id: TableId,
initial_level: u8,
fs: Arc<dyn Fs>,
) -> crate::Result<Self> {
let path = std::path::absolute(path)?;
let file = fs.open(&path, &FsOpenOptions::new().write(true).create_new(true))?;
let writer = BufWriter::with_capacity(u16::MAX.into(), file);
let writer = ChecksummedWriter::new(writer);
let mut writer = sfa::Writer::from_writer(writer);
writer.start("data")?;
Ok(Self {
fs,
initial_level,
meta: meta::Metadata::default(),
table_id,
data_block_restart_interval: 16,
index_block_restart_interval: 1,
data_block_hash_ratio: 0.0,
meta_partition_size: 4_096,
data_block_size: 4_096,
data_block_compression: CompressionType::None,
index_block_compression: CompressionType::None,
path,
index_writer: Box::new(FullIndexWriter::new()),
filter_writer: Box::new(FullFilterWriter::new(BloomConstructionPolicy::default())),
block_buffer: Vec::new(),
file_writer: writer,
chunk: Vec::new(),
prev_pos: (BlockOffset(0), BlockOffset(0)),
chunk_size: 0,
current_key: None,
bloom_policy: BloomConstructionPolicy::default(),
prefix_extractor: None,
previous_item: None,
linked_blob_files: Vec::new(),
range_tombstones: Vec::new(),
encryption: None,
#[cfg(zstd_any)]
zstd_dictionary: None,
})
}
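/// Records a reference to a blob file so the link is persisted in the
/// `linked_blob_files` section when the table is finished.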
pub fn link_blob_file(
&mut self,
blob_file_id: BlobFileId,
len: usize,
bytes: u64,
on_disk_bytes: u64,
) {
self.linked_blob_files.push(LinkedFile {
blob_file_id,
bytes,
on_disk_bytes,
len,
});
}
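/// Panics if data has already been buffered or written; settings guarded by this
/// check must be configured before the first write.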
fn assert_not_started(&self, setting: &str) {
assert!(
self.meta.data_block_count == 0 && self.chunk.is_empty(),
"{setting} must be configured before writing starts",
);
}
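/// Replaces the default full filter with a partitioned filter writer, carrying
/// over the currently configured bloom policy, index block (TLI) compression,
/// partition size, prefix extractor, and encryption.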
#[must_use]
pub fn use_partitioned_filter(mut self) -> Self {
self.assert_not_started("partitioned filter");
self.filter_writer = Box::new(filter::PartitionedFilterWriter::new(self.bloom_policy))
.use_tli_compression(self.index_block_compression)
.use_partition_size(self.meta_partition_size)
.set_prefix_extractor(self.prefix_extractor.clone())
.use_encryption(self.encryption.clone());
self
}
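/// Replaces the default full index with a partitioned index writer, carrying over
/// the currently configured index block compression, partition size, restart
/// interval, and encryption.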
#[must_use]
pub fn use_partitioned_index(mut self) -> Self {
self.assert_not_started("partitioned index");
self.index_writer = Box::new(index::PartitionedIndexWriter::new())
.use_compression(self.index_block_compression)
.use_partition_size(self.meta_partition_size)
.use_restart_interval(self.index_block_restart_interval)
.use_encryption(self.encryption.clone());
self
}
#[must_use]
pub fn use_data_block_restart_interval(mut self, interval: u8) -> Self {
assert!(
interval > 0,
"data block restart interval must be greater than zero",
);
self.assert_not_started("data block restart interval");
self.data_block_restart_interval = interval;
self
}
#[must_use]
pub fn use_index_block_restart_interval(mut self, interval: u8) -> Self {
assert!(
interval > 0,
"index block restart interval must be greater than zero",
);
self.assert_not_started("index block restart interval");
self.index_block_restart_interval = interval;
self.index_writer = self.index_writer.use_restart_interval(interval);
self
}
#[must_use]
pub fn use_data_block_hash_ratio(mut self, ratio: f32) -> Self {
self.data_block_hash_ratio = ratio;
self
}
#[must_use]
pub fn use_data_block_size(mut self, size: u32) -> Self {
assert!(
size <= 4 * 1_024 * 1_024,
"data block size must be <= 4 MiB",
);
self.data_block_size = size;
self
}
#[must_use]
pub fn use_meta_partition_size(mut self, size: u32) -> Self {
assert!(
size <= 4 * 1_024 * 1_024,
"data block size must be <= 4 MiB",
);
self.meta_partition_size = size;
self.index_writer = self.index_writer.use_partition_size(size);
self.filter_writer = self.filter_writer.use_partition_size(size);
self
}
#[must_use]
pub fn use_data_block_compression(mut self, compression: CompressionType) -> Self {
self.data_block_compression = compression;
self
}
#[must_use]
pub fn use_index_block_compression(mut self, compression: CompressionType) -> Self {
#[cfg(zstd_any)]
let compression = match compression {
CompressionType::ZstdDict { level, .. } => CompressionType::Zstd(level),
other => other,
};
self.index_block_compression = compression;
self.index_writer = self.index_writer.use_compression(compression);
self.filter_writer = self.filter_writer.use_tli_compression(compression);
self
}
#[must_use]
pub fn use_encryption(mut self, encryption: Option<Arc<dyn EncryptionProvider>>) -> Self {
self.index_writer = self.index_writer.use_encryption(encryption.clone());
self.filter_writer = self.filter_writer.use_encryption(encryption.clone());
self.encryption = encryption;
self
}
#[cfg(zstd_any)]
#[must_use]
pub fn use_zstd_dictionary(
mut self,
dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
) -> Self {
self.zstd_dictionary = dictionary;
self
}
#[must_use]
pub fn use_bloom_policy(mut self, bloom_policy: BloomConstructionPolicy) -> Self {
self.bloom_policy = bloom_policy;
self.filter_writer = self.filter_writer.set_filter_policy(bloom_policy);
self
}
#[must_use]
pub fn use_prefix_extractor(mut self, extractor: Option<Arc<dyn PrefixExtractor>>) -> Self {
self.prefix_extractor.clone_from(&extractor);
self.filter_writer = self.filter_writer.set_prefix_extractor(extractor);
self
}
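/// Buffers a range tombstone and widens the table's seqno range; tombstones are
/// serialized into their own section when the table is finished.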
pub(crate) fn write_range_tombstone(&mut self, rt: RangeTombstone) {
self.meta.lowest_seqno = self.meta.lowest_seqno.min(rt.seqno);
self.meta.highest_seqno = self.meta.highest_seqno.max(rt.seqno);
self.range_tombstones.push(rt);
}
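/// Writes an item into the table.
///
/// Items are expected in key order (the block index records the last key of each
/// spilled block); once the buffered chunk reaches the configured data block
/// size, it is spilled to disk.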
pub fn write(&mut self, item: InternalValue) -> crate::Result<()> {
let value_type = item.key.value_type;
let seqno = item.key.seqno;
let user_key = item.key.user_key.clone();
let value_len = item.value.len();
if item.is_tombstone() {
self.meta.tombstone_count += 1;
}
if value_type == ValueType::WeakTombstone {
self.meta.weak_tombstone_count += 1;
}
if value_type == ValueType::Value
&& let Some((prev_key, prev_type)) = &self.previous_item
&& prev_type == &ValueType::WeakTombstone
&& prev_key.as_ref() == user_key.as_ref()
{
self.meta.weak_tombstone_reclaimable_count += 1;
}
if Some(&user_key) != self.current_key.as_ref() {
self.meta.key_count += 1;
self.current_key = Some(user_key.clone());
if self.bloom_policy.is_active() {
self.filter_writer.register_key(&user_key)?;
}
}
if self.meta.first_key.is_none() {
self.meta.first_key = Some(user_key.clone());
}
self.chunk_size += user_key.len() + value_len;
self.chunk.push(item);
self.previous_item = Some((user_key, value_type));
if self.chunk_size >= self.data_block_size as usize {
self.spill_block()?;
}
self.meta.lowest_seqno = self.meta.lowest_seqno.min(seqno);
self.meta.highest_seqno = self.meta.highest_seqno.max(seqno);
self.meta.highest_kv_seqno = self.meta.highest_kv_seqno.max(seqno);
Ok(())
}
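/// Encodes the currently buffered chunk as a data block, writes it to the file,
/// and registers it with the block index. Does nothing if the chunk is empty.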
pub(crate) fn spill_block(&mut self) -> crate::Result<()> {
let Some(last) = self.chunk.last() else {
return Ok(());
};
self.block_buffer.clear();
DataBlock::encode_into(
&mut self.block_buffer,
&self.chunk,
self.data_block_restart_interval,
self.data_block_hash_ratio,
)?;
let header = Block::write_into(
&mut self.file_writer,
&self.block_buffer,
super::block::BlockType::Data,
self.data_block_compression,
self.encryption.as_deref(),
#[cfg(zstd_any)]
self.zstd_dictionary.as_deref(),
)?;
self.meta.uncompressed_size += u64::from(header.uncompressed_length);
#[expect(
clippy::cast_possible_truncation,
reason = "block header is a couple of bytes only, so cast is fine"
)]
let bytes_written = BlockHeader::serialized_len() as u32 + header.data_length;
self.index_writer
.register_data_block(KeyedBlockHandle::new(
last.key.user_key.clone(),
last.key.seqno,
BlockHandle::new(self.meta.file_pos, bytes_written),
))?;
self.meta.file_pos += u64::from(bytes_written);
self.meta.item_count += self.chunk.len();
self.meta.data_block_count += 1;
self.prev_pos.0 = self.prev_pos.1;
self.prev_pos.1 += u64::from(bytes_written);
self.meta.last_key = Some(
#[expect(clippy::expect_used, reason = "chunk is not empty")]
self.chunk
.pop()
.expect("chunk should not be empty")
.key
.user_key,
);
self.chunk.clear();
self.chunk_size = 0;
Ok(())
}
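/// Finalizes the table: spills any buffered items and appends the index, filter,
/// range tombstone, linked blob file, version, and metadata sections, then syncs
/// the file and its parent directory.
///
/// Returns the table ID and file checksum, or `None` if nothing was written (in
/// which case the file is removed).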
#[expect(clippy::too_many_lines)]
pub fn finish(mut self) -> crate::Result<Option<(TableId, Checksum)>> {
use std::io::Write;
self.spill_block()?;
if self.meta.item_count == 0 && self.range_tombstones.is_empty() {
self.fs.remove_file(&self.path)?;
return Ok(None);
}
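// Table contains only range tombstones: write a single sentinel weak tombstone
// (the tombstone with the lowest seqno, ties broken by start key) so there is
// at least one data block, then restore the seqno bounds and set the key range
// to span all tombstones.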
if self.meta.item_count == 0 {
let mut min_start: Option<UserKey> = None;
let mut max_end: Option<UserKey> = None;
let mut sentinel_start: Option<UserKey> = None;
let mut sentinel_seqno: Option<crate::SeqNo> = None;
for rt in &self.range_tombstones {
match &min_start {
None => min_start = Some(rt.start.clone()),
Some(cur_min) if rt.start < *cur_min => min_start = Some(rt.start.clone()),
_ => {}
}
match &max_end {
None => max_end = Some(rt.end.clone()),
Some(cur_max) if rt.end > *cur_max => max_end = Some(rt.end.clone()),
_ => {}
}
match (sentinel_seqno, &sentinel_start) {
(None, _) => {
sentinel_seqno = Some(rt.seqno);
sentinel_start = Some(rt.start.clone());
}
(Some(cur_seqno), Some(cur_start))
if rt.seqno < cur_seqno
|| (rt.seqno == cur_seqno && rt.start < *cur_start) =>
{
sentinel_seqno = Some(rt.seqno);
sentinel_start = Some(rt.start.clone());
}
_ => {}
}
}
if let (Some(start), Some(end), Some(sentinel_key), Some(sentinel_seqno)) =
(min_start, max_end, sentinel_start, sentinel_seqno)
{
let saved_lo = self.meta.lowest_seqno;
let saved_hi = self.meta.highest_seqno;
let saved_kv_hi = self.meta.highest_kv_seqno;
self.write(InternalValue::new_weak_tombstone(
sentinel_key,
sentinel_seqno,
))?;
self.spill_block()?;
self.meta.lowest_seqno = saved_lo;
self.meta.highest_seqno = saved_hi;
self.meta.highest_kv_seqno = saved_kv_hi;
self.meta.first_key = Some(start);
self.meta.last_key = Some(end);
}
}
log::trace!("Finishing index writer");
let index_block_count = self.index_writer.finish(&mut self.file_writer)?;
log::trace!("Finishing filter writer");
let filter_block_count = self.filter_writer.finish(&mut self.file_writer)?;
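// Range tombstones are stored uncompressed in their own section; each entry is
// encoded as start key length (LE u16), start key, end key length (LE u16),
// end key, and seqno (LE u64).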
if !self.range_tombstones.is_empty() {
use byteorder::{LE, WriteBytesExt};
self.file_writer.start("range_tombstones")?;
self.block_buffer.clear();
for rt in &self.range_tombstones {
let start_len = u16::try_from(rt.start.len()).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"range tombstone start key length exceeds u16::MAX",
)
})?;
let end_len = u16::try_from(rt.end.len()).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"range tombstone end key length exceeds u16::MAX",
)
})?;
self.block_buffer.write_u16::<LE>(start_len)?;
self.block_buffer.extend_from_slice(&rt.start);
self.block_buffer.write_u16::<LE>(end_len)?;
self.block_buffer.extend_from_slice(&rt.end);
self.block_buffer.write_u64::<LE>(rt.seqno)?;
}
Block::write_into(
&mut self.file_writer,
&self.block_buffer,
crate::table::block::BlockType::RangeTombstone,
CompressionType::None,
self.encryption.as_deref(),
#[cfg(zstd_any)]
None,
)?;
}
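// Linked blob files are stored as a LE u32 count followed by, per file, the
// blob file ID, `len`, `bytes`, and `on_disk_bytes` as LE u64 values.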
if !self.linked_blob_files.is_empty() {
use byteorder::{LE, WriteBytesExt};
self.file_writer.start("linked_blob_files")?;
#[expect(
clippy::cast_possible_truncation,
reason = "there are never 4 billion blob files linked to a single table"
)]
self.file_writer
.write_u32::<LE>(self.linked_blob_files.len() as u32)?;
for file in self.linked_blob_files {
self.file_writer.write_u64::<LE>(file.blob_file_id)?;
self.file_writer.write_u64::<LE>(file.len as u64)?;
self.file_writer.write_u64::<LE>(file.bytes)?;
self.file_writer.write_u64::<LE>(file.on_disk_bytes)?;
}
}
self.file_writer.start("table_version")?;
self.file_writer.write_all(&[0x3])?;
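// Table metadata is encoded as a sorted list of key-value pairs inside a single
// uncompressed block.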
self.file_writer.start("meta")?;
{
fn meta(key: &str, value: &[u8]) -> InternalValue {
InternalValue::from_components(key, value, 0, crate::ValueType::Value)
}
let meta_items = [
meta(
"block_count#data",
&(self.meta.data_block_count as u64).to_le_bytes(),
),
meta(
"block_count#filter",
&(filter_block_count as u64).to_le_bytes(),
),
meta(
"block_count#index",
&(index_block_count as u64).to_le_bytes(),
),
meta("checksum_type", &[u8::from(ChecksumType::Xxh3)]),
meta(
"compression#data",
&self.data_block_compression.encode_into_vec(),
),
meta(
"compression#index",
&self.index_block_compression.encode_into_vec(),
),
meta("crate_version", env!("CARGO_PKG_VERSION").as_bytes()),
meta("created_at", &unix_timestamp().as_nanos().to_le_bytes()),
meta(
"data_block_hash_ratio",
&self.data_block_hash_ratio.to_le_bytes(),
),
meta("file_size", &self.meta.file_pos.to_le_bytes()),
meta("filter_hash_type", &[u8::from(ChecksumType::Xxh3)]),
meta("index_keys_have_seqno", &[0x1]),
meta("initial_level", &self.initial_level.to_le_bytes()),
meta("item_count", &(self.meta.item_count as u64).to_le_bytes()),
meta(
"key#max",
#[expect(clippy::expect_used)]
self.meta.last_key.as_ref().expect("should exist"),
),
meta(
"key#min",
#[expect(clippy::expect_used)]
self.meta.first_key.as_ref().expect("should exist"),
),
meta("key_count", &(self.meta.key_count as u64).to_le_bytes()),
meta("prefix_truncation#data", &[1]), meta("prefix_truncation#index", &[1]), meta(
"range_tombstone_count",
&(self.range_tombstones.len() as u64).to_le_bytes(),
),
meta(
"restart_interval#data",
&self.data_block_restart_interval.to_le_bytes(),
),
meta(
"restart_interval#index",
&self.index_block_restart_interval.to_le_bytes(),
),
meta("seqno#kv_max", &self.meta.highest_kv_seqno.to_le_bytes()),
meta("seqno#max", &self.meta.highest_seqno.to_le_bytes()),
meta("seqno#min", &self.meta.lowest_seqno.to_le_bytes()),
meta("table_id", &self.table_id.to_le_bytes()),
meta("table_version", &[3u8]),
meta(
"tombstone_count",
&(self.meta.tombstone_count as u64).to_le_bytes(),
),
meta("user_data_size", &self.meta.uncompressed_size.to_le_bytes()),
meta(
"weak_tombstone_count",
&(self.meta.weak_tombstone_count as u64).to_le_bytes(),
),
meta(
"weak_tombstone_reclaimable",
&(self.meta.weak_tombstone_reclaimable_count as u64).to_le_bytes(),
),
];
#[cfg(debug_assertions)]
{
let is_sorted = meta_items.iter().is_sorted_by_key(|kv| &kv.key);
assert!(is_sorted, "meta items not sorted correctly");
}
self.block_buffer.clear();
DataBlock::encode_into(&mut self.block_buffer, &meta_items, 1, 0.0)?;
Block::write_into(
&mut self.file_writer,
&self.block_buffer,
crate::table::block::BlockType::Meta,
CompressionType::None,
self.encryption.as_deref(),
#[cfg(zstd_any)]
None,
)?;
};
let mut checksum = self.file_writer.into_inner()?;
FsFile::sync_all(&**checksum.inner_mut().get_mut())?;
let checksum = checksum.checksum();
#[expect(
clippy::expect_used,
reason = "if there's no parent folder, something has gone horribly wrong"
)]
crate::file::fsync_directory(self.path.parent().expect("should have folder"), &*self.fs)?;
log::debug!(
"Written {} items in {} blocks into new table file #{}, written {} MiB",
self.meta.item_count,
self.meta.data_block_count,
self.table_id,
*self.meta.file_pos / 1_024 / 1_024,
);
Ok(Some((self.table_id, checksum)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fs::StdFs;
use test_log::test;
#[test]
fn table_writer_count() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("1");
let mut writer = Writer::new(path, 1, 0, Arc::new(StdFs))?;
assert_eq!(0, writer.meta.key_count);
assert_eq!(0, writer.chunk_size);
writer.write(InternalValue::from_components(
b"a",
b"a",
0,
ValueType::Value,
))?;
assert_eq!(1, writer.meta.key_count);
assert_eq!(2, writer.chunk_size);
writer.write(InternalValue::from_components(
b"b",
b"b",
0,
ValueType::Value,
))?;
assert_eq!(2, writer.meta.key_count);
assert_eq!(4, writer.chunk_size);
writer.write(InternalValue::from_components(
b"c",
b"c",
0,
ValueType::Value,
))?;
assert_eq!(3, writer.meta.key_count);
assert_eq!(6, writer.chunk_size);
writer.spill_block()?;
assert_eq!(0, writer.chunk_size);
Ok(())
}
#[test]
#[should_panic(expected = "index block restart interval must be greater than zero")]
fn writer_rejects_zero_index_block_restart_interval() {
let dir = match tempfile::tempdir() {
Ok(dir) => dir,
Err(e) => panic!("tempdir should be created: {e}"),
};
let path = dir.path().join("1");
let writer = match Writer::new(path, 1, 0, Arc::new(StdFs)) {
Ok(writer) => writer,
Err(e) => panic!("writer should be created: {e}"),
};
let _writer = writer.use_index_block_restart_interval(0);
}
#[test]
#[should_panic(expected = "data block restart interval must be greater than zero")]
fn writer_rejects_zero_data_block_restart_interval() {
let dir = match tempfile::tempdir() {
Ok(dir) => dir,
Err(e) => panic!("tempdir should be created: {e}"),
};
let path = dir.path().join("1");
let writer = match Writer::new(path, 1, 0, Arc::new(StdFs)) {
Ok(writer) => writer,
Err(e) => panic!("writer should be created: {e}"),
};
let _writer = writer.use_data_block_restart_interval(0);
}
#[test]
#[should_panic(
expected = "data block restart interval must be configured before writing starts"
)]
fn writer_rejects_data_block_restart_interval_change_after_write() {
let dir = match tempfile::tempdir() {
Ok(dir) => dir,
Err(e) => panic!("tempdir should be created: {e}"),
};
let path = dir.path().join("1");
let mut writer = match Writer::new(path, 1, 0, Arc::new(StdFs)) {
Ok(writer) => writer,
Err(e) => panic!("writer should be created: {e}"),
};
if let Err(e) = writer.write(InternalValue::from_components(
b"a",
b"v",
0,
ValueType::Value,
)) {
panic!("write should succeed: {e}");
}
let _writer = writer.use_data_block_restart_interval(2);
}
#[test]
#[should_panic(
expected = "index block restart interval must be configured before writing starts"
)]
fn writer_rejects_index_block_restart_interval_change_after_write() {
let dir = match tempfile::tempdir() {
Ok(dir) => dir,
Err(e) => panic!("tempdir should be created: {e}"),
};
let path = dir.path().join("1");
let mut writer = match Writer::new(path, 1, 0, Arc::new(StdFs)) {
Ok(writer) => writer,
Err(e) => panic!("writer should be created: {e}"),
};
if let Err(e) = writer.write(InternalValue::from_components(
b"a",
b"v",
0,
ValueType::Value,
)) {
panic!("write should succeed: {e}");
}
let _writer = writer.use_index_block_restart_interval(2);
}
#[test]
#[should_panic(expected = "partitioned index must be configured before writing starts")]
fn writer_rejects_partitioned_index_switch_after_write() {
let dir = match tempfile::tempdir() {
Ok(dir) => dir,
Err(e) => panic!("tempdir should be created: {e}"),
};
let path = dir.path().join("1");
let mut writer = match Writer::new(path, 1, 0, Arc::new(StdFs)) {
Ok(writer) => writer,
Err(e) => panic!("writer should be created: {e}"),
};
if let Err(e) = writer.write(InternalValue::from_components(
b"a",
b"v",
0,
ValueType::Value,
)) {
panic!("write should succeed: {e}");
}
let _writer = writer.use_partitioned_index();
}
#[test]
fn writer_meta_partition_size_is_chainable_with_full_index_writer() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("full-index");
let mut writer = Writer::new(path, 1, 0, Arc::new(StdFs))?.use_meta_partition_size(8_192);
writer.write(InternalValue::from_components(
b"k",
b"v",
0,
ValueType::Value,
))?;
writer.spill_block()?;
Ok(())
}
#[test]
#[should_panic(expected = "partitioned filter must be configured before writing starts")]
fn writer_rejects_partitioned_filter_switch_after_write() {
let dir = match tempfile::tempdir() {
Ok(dir) => dir,
Err(e) => panic!("tempdir should be created: {e}"),
};
let path = dir.path().join("1");
let mut writer = match Writer::new(path, 1, 0, Arc::new(StdFs)) {
Ok(writer) => writer,
Err(e) => panic!("writer should be created: {e}"),
};
if let Err(e) = writer.write(InternalValue::from_components(
b"a",
b"v",
0,
ValueType::Value,
)) {
panic!("write should succeed: {e}");
}
let _writer = writer.use_partitioned_filter();
}
}