1use std::{
2 collections::VecDeque,
3 fs::{File, OpenOptions},
4 io::{Cursor, Read, Seek, SeekFrom, Write},
5 path::Path,
6};
7
8use fwob_core::{FrameRef, FwobError, Key, KeyType, Schema};
9
10use crate::{
11 encoding::{decode_page_payload, encode_page_payload},
12 file_header::{
13 update_counts, write_file_header, FileHeader, FILE_HEADER_LEN, MAX_PAGE_SIZE, MIN_PAGE_SIZE,
14 },
15 page::{Encoding, PageHeader, PAGE_HEADER_LEN},
16 Codec, Result, V2Error,
17};
18
/// NOTE(review): not referenced in the visible part of this file; presumably a tuning knob
/// for the interpolated probe search — confirm before changing.
const INTERPOLATED_PROBE_WINDOW_MARGIN_DIVISOR: usize = 0;
/// Length of every per-window array stored in `PackingStats`.
const RECORDED_WINDOW_SPANS: usize = 10;
/// Multiplier applied to the raw-page frame capacity. It sets the minimum number of pending
/// frames before compressed compaction is tried and the first probe size of the gradient
/// prefix search when no frame hint exists yet.
const INITIAL_COMPRESSED_PROBE_RAW_PAGES: usize = 4;
/// Maximum number of trailing pages `Writer::open_append` inspects when the configured codec
/// is not `Codec::None`.
const MAX_APPEND_TAIL_PAGES: usize = 10;
26
/// Strategy used to pick the codec for a page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecSelection {
    /// Always use the given codec.
    Fixed(Codec),
    /// NOTE(review): presumably tries the available codecs and keeps the smallest output —
    /// the selection logic is outside this view, confirm there.
    Smallest,
}
32
/// Strategy used to pick the payload encoding for a page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodingSelection {
    /// Always use the given encoding.
    Fixed(Encoding),
    /// NOTE(review): presumably tries the available encodings and keeps the smallest output —
    /// the selection logic is outside this view, confirm there.
    Smallest,
}
38
/// Search strategy used to decide how many pending frames go into one compressed page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PagePacking {
    /// Dispatches to `find_estimated_fitting_prefix`.
    EstimateShrink,
    /// Dispatches to `find_gradient_fitting_prefix`.
    TightFit,
}
44
/// Page size used by `WriterOptions::new`: 512 KiB.
pub const DEFAULT_PAGE_SIZE: u32 = 512 * 1024;
/// Codec used by `WriterOptions::new`.
pub const DEFAULT_CODEC: Codec = Codec::Zstd;
/// Zstd compression level used by `WriterOptions::new`.
pub const DEFAULT_ZSTD_LEVEL: i32 = 6;
/// Page payload encoding used by `WriterOptions::new`.
pub const DEFAULT_ENCODING: Encoding = Encoding::ColumnarBasicV1;
/// Page packing strategy used by `WriterOptions::new`.
pub const DEFAULT_PAGE_PACKING: PagePacking = PagePacking::EstimateShrink;
50
51#[derive(Debug, Clone, Copy, Default)]
52pub struct PackingStats {
53 pub first_page_attempts: u64,
54 pub subsequent_page_attempts: u64,
55 pub subsequent_pages: u64,
56 pub subsequent_min_attempts: u64,
57 pub subsequent_max_attempts: u64,
58 pub initial_window_attempts: u64,
59 pub window_frame_spans: [u64; RECORDED_WINDOW_SPANS],
60 pub window_span_counts: [u64; RECORDED_WINDOW_SPANS],
61 pub initial_windows: u64,
62 pub window_final_position_sums: [f64; RECORDED_WINDOW_SPANS],
63 pub window_final_position_counts: [u64; RECORDED_WINDOW_SPANS],
64}
65
66impl PackingStats {
67 pub fn subsequent_average_attempts(self) -> f64 {
68 if self.subsequent_pages == 0 {
69 0.0
70 } else {
71 self.subsequent_page_attempts as f64 / self.subsequent_pages as f64
72 }
73 }
74
75 pub fn average_initial_window_attempts(self) -> f64 {
76 if self.initial_windows == 0 {
77 0.0
78 } else {
79 self.initial_window_attempts as f64 / self.initial_windows as f64
80 }
81 }
82
83 pub fn average_window_frame_span(self, index: usize) -> f64 {
84 if index >= RECORDED_WINDOW_SPANS || self.window_span_counts[index] == 0 {
85 0.0
86 } else {
87 self.window_frame_spans[index] as f64 / self.window_span_counts[index] as f64
88 }
89 }
90
91 pub fn average_window_final_position(self, index: usize) -> f64 {
92 if index >= RECORDED_WINDOW_SPANS || self.window_final_position_counts[index] == 0 {
93 0.0
94 } else {
95 self.window_final_position_sums[index] / self.window_final_position_counts[index] as f64
96 }
97 }
98}
99
100#[derive(Debug, Clone)]
101pub struct WriterOptions {
102 pub title: String,
103 pub page_size: u32,
104 pub codec: Codec,
105 pub codec_selection: CodecSelection,
106 pub zstd_level: i32,
107 pub encoding: Encoding,
108 pub encoding_selection: EncodingSelection,
109 pub string_table: Vec<String>,
110 pub compress_partial_page: bool,
111 pub page_packing: PagePacking,
112}
113
114impl WriterOptions {
115 pub fn new(title: impl Into<String>) -> Self {
116 Self {
117 title: title.into(),
118 page_size: DEFAULT_PAGE_SIZE,
119 codec: DEFAULT_CODEC,
120 codec_selection: CodecSelection::Fixed(DEFAULT_CODEC),
121 zstd_level: DEFAULT_ZSTD_LEVEL,
122 encoding: DEFAULT_ENCODING,
123 encoding_selection: EncodingSelection::Fixed(DEFAULT_ENCODING),
124 string_table: Vec::new(),
125 compress_partial_page: false,
126 page_packing: DEFAULT_PAGE_PACKING,
127 }
128 }
129}
130
/// Page-oriented writer over a sink `W`.
///
/// Accepted frames are buffered in `pending` and written out as fixed-size pages.
pub struct Writer<W> {
    /// Underlying sink that receives the file header and the pages.
    inner: W,
    /// In-memory file header; its page and frame counts are updated as pages are written.
    header: FileHeader,
    /// Key type derived from the schema's key field.
    key_type: KeyType,
    /// Effective writer options.
    options: WriterOptions,
    /// Frame bytes accepted but not yet written to a page.
    pending: PendingFrames,
    /// Key of the most recently accepted frame; used to reject decreasing keys.
    last_key: Option<Key>,
    /// Minimum logical pending frame count before compaction is tried again; `0` means no
    /// deferral is in effect.
    next_compaction_frame_count: usize,
    /// Estimated number of frames that fit into one compressed page, derived from the last
    /// compressed page written; `0` means no estimate yet.
    compressed_page_frame_hint: usize,
    /// Zstd compressor created once at construction time from the codec selection and level.
    zstd_compressor: Option<zstd::bulk::Compressor<'static>>,
    /// Accumulated packing statistics, returned by `packing_stats` and `finish_with_stats`.
    packing_stats: PackingStats,
    /// NOTE(review): presumably counts compression attempts for the page currently being
    /// packed — it is only initialised to 0 in the visible code, confirm at its use sites.
    current_page_attempts: u64,
    /// Description of the trailing uncompressed pages found by `open_append`, if any.
    append_tail: Option<AppendTail>,
}
145
/// Result of compressing a prefix of the pending frames:
/// `(codec, encoding, compressed bytes, encoded length)`.
type CompressedCandidate = (Codec, Encoding, Vec<u8>, usize);
/// A compressed prefix together with its frame count:
/// `(frame count, codec, encoding, compressed bytes, encoded length)`.
type FittingCandidate = (usize, Codec, Encoding, Vec<u8>, usize);
148
/// Storage whose total length can be set explicitly, truncating or extending it.
pub trait Resize {
    /// Sets the length of the underlying storage to exactly `len` bytes.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the storage cannot be resized to `len`.
    fn resize_len(&mut self, len: u64) -> std::io::Result<()>;
}

impl Resize for File {
    /// Delegates to [`File::set_len`].
    fn resize_len(&mut self, len: u64) -> std::io::Result<()> {
        self.set_len(len)
    }
}

impl Resize for Cursor<Vec<u8>> {
    /// Resizes the backing vector, zero-filling any new bytes, and pulls the cursor position
    /// back to the new end if it would otherwise point past it.
    ///
    /// # Errors
    ///
    /// Returns [`std::io::ErrorKind::InvalidInput`] when `len` does not fit in `usize`
    /// (possible on targets where `usize` is narrower than 64 bits). A plain `as` cast
    /// would silently truncate the requested length there.
    fn resize_len(&mut self, len: u64) -> std::io::Result<()> {
        let new_len = usize::try_from(len).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "requested length does not fit in usize",
            )
        })?;
        self.get_mut().resize(new_len, 0);
        if self.position() > len {
            self.set_position(len);
        }
        Ok(())
    }
}

impl<T: Resize + ?Sized> Resize for &mut T {
    /// Forwards to the referenced value so a writer can borrow its sink.
    fn resize_len(&mut self, len: u64) -> std::io::Result<()> {
        (**self).resize_len(len)
    }
}
174
/// Description of the run of trailing `Codec::None` pages found when a file is opened for
/// appending.
struct AppendTail {
    /// Index of the first page of the tail run.
    start_page: u64,
    /// Number of pages in the tail run (file page count minus `start_page`).
    page_count: u64,
    /// Total number of frames stored in the tail pages.
    frame_count: u64,
    /// Index of the first page of the trailing run of pages holding fewer frames than the
    /// raw page capacity; equals the file page count when there is no such page.
    underfilled_start_page: u64,
    /// Number of frames stored in pages from `underfilled_start_page` onwards.
    underfilled_frame_count: u64,
    /// Whether the tail frames have already been read back into the pending buffer.
    loaded: bool,
}
187
/// FIFO byte buffer holding frame data that has not been written to a page yet.
///
/// Data is kept as a queue of owned segments so appends never copy earlier data, and
/// consuming from the front only advances an offset into the first segment.
struct PendingFrames {
    /// Queued segments, oldest first. Empty segments are never stored.
    segments: VecDeque<Vec<u8>>,
    /// Number of already-consumed bytes at the start of the front segment.
    head_offset: usize,
    /// Total number of unconsumed bytes across all segments.
    len: usize,
    /// Reusable buffer backing `prefix_contiguous` when a prefix spans several segments.
    scratch: Vec<u8>,
}

impl PendingFrames {
    /// Creates an empty buffer.
    fn new() -> Self {
        Self {
            segments: VecDeque::new(),
            head_offset: 0,
            len: 0,
            scratch: Vec::new(),
        }
    }

    /// Returns `true` when no unconsumed bytes remain.
    fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Number of whole frames of `frame_len` bytes currently buffered.
    ///
    /// Panics if `frame_len` is zero.
    fn frame_count(&self, frame_len: usize) -> usize {
        self.len / frame_len
    }

    /// Number of unconsumed bytes.
    fn byte_len(&self) -> usize {
        self.len
    }

    /// Appends a copy of `bytes`; an empty slice is ignored.
    fn append_copy(&mut self, bytes: &[u8]) {
        // Converting an empty slice does not allocate, and `append_owned` drops it.
        self.append_owned(bytes.to_vec());
    }

    /// Appends `bytes` without copying; an empty vector is ignored.
    fn append_owned(&mut self, bytes: Vec<u8>) {
        if bytes.is_empty() {
            return;
        }
        self.len += bytes.len();
        self.segments.push_back(bytes);
    }

    /// Drops the first `count` bytes from the buffer.
    ///
    /// `count` must not exceed `byte_len()`; this is checked in debug builds. In release
    /// builds an oversized `count` is clamped, which empties the buffer. Without the clamp
    /// `len` would wrap around and the loop would spin forever once the segments run out.
    fn consume_front(&mut self, count: usize) {
        debug_assert!(count <= self.len);
        let mut remaining = count.min(self.len);
        self.len -= remaining;
        while remaining > 0 {
            // Defensive: with a consistent `len` the queue cannot run dry here.
            let Some(front) = self.segments.front() else {
                break;
            };
            let front_len = front.len() - self.head_offset;
            if remaining < front_len {
                self.head_offset += remaining;
                return;
            }

            remaining -= front_len;
            self.segments.pop_front();
            self.head_offset = 0;
        }
    }

    /// Returns the first `count` bytes as one contiguous slice.
    ///
    /// When the prefix lies entirely inside the front segment it is borrowed directly;
    /// otherwise it is assembled in the internal scratch buffer.
    fn prefix_contiguous(&mut self, count: usize) -> &[u8] {
        debug_assert!(count <= self.len);
        if count == 0 {
            return &[];
        }
        if let Some(front) = self.segments.front() {
            let available = front.len() - self.head_offset;
            if count <= available {
                return &front[self.head_offset..self.head_offset + count];
            }
        }

        self.scratch.clear();
        self.scratch.reserve(count);
        let mut remaining = count;
        let mut first = true;
        for segment in &self.segments {
            // Only the front segment has a consumed part to skip.
            let start = if first { self.head_offset } else { 0 };
            first = false;
            let available = segment.len() - start;
            let take = remaining.min(available);
            self.scratch
                .extend_from_slice(&segment[start..start + take]);
            remaining -= take;
            if remaining == 0 {
                break;
            }
        }
        &self.scratch
    }

    /// Copies `len` bytes starting `offset` bytes into the unconsumed data.
    fn copy_range(&self, offset: usize, len: usize) -> Vec<u8> {
        debug_assert!(offset + len <= self.len);
        let mut out = Vec::with_capacity(len);
        let mut skip = offset;
        let mut remaining = len;
        let mut first = true;
        for segment in &self.segments {
            // Only the front segment has a consumed part to skip.
            let start = if first { self.head_offset } else { 0 };
            first = false;
            let available = segment.len() - start;
            if skip >= available {
                skip -= available;
                continue;
            }
            let segment_start = start + skip;
            let take = remaining.min(segment.len() - segment_start);
            out.extend_from_slice(&segment[segment_start..segment_start + take]);
            remaining -= take;
            skip = 0;
            if remaining == 0 {
                break;
            }
        }
        out
    }
}
310
impl Writer<File> {
    /// Creates the file at `path` (via `File::create`) and returns a writer for a new file
    /// with the given `schema` and `options`.
    pub fn create(path: impl AsRef<Path>, schema: Schema, options: WriterOptions) -> Result<Self> {
        let file = File::create(path)?;
        Self::new(file, schema, options)
    }

    /// Opens the existing file at `path` for appending.
    ///
    /// The file header is read back, and `title`, `page_size` and `string_table` in
    /// `options` are replaced by the values stored in the file. The trailing pages are then
    /// scanned to describe the run of uncompressed pages that a later flush may rewrite; the
    /// result is stored as the writer's append tail. No page payload is read here.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be opened for reading and writing, or when
    /// reading the file header, a page header, or setting up the compressor fails.
    pub fn open_append(path: impl AsRef<Path>, options: WriterOptions) -> Result<Self> {
        let mut file = OpenOptions::new().read(true).write(true).open(path)?;
        let header = crate::file_header::read_file_header(&mut file)?;
        let key_type = KeyType::from_field(header.schema.key_field())?;

        // Number of frames that fit into one page payload, never less than one.
        let raw_page_capacity = ((header.page_size as usize - PAGE_HEADER_LEN)
            / header.schema.frame_len as usize)
            .max(1);
        // Without compression only the last page is inspected.
        let max_tail_pages = if options.codec == Codec::None {
            1
        } else {
            MAX_APPEND_TAIL_PAGES
        };
        let min_tail_start = header.page_count.saturating_sub(max_tail_pages as u64);
        let mut tail_start = header.page_count;
        let mut underfilled_start = header.page_count;
        // `run_open` stays true while every page seen so far (walking backwards from the
        // end) holds fewer frames than `raw_page_capacity`.
        let mut run_open = true;
        while tail_start > min_tail_start {
            file.seek(SeekFrom::Start(header.page_offset(tail_start - 1)))?;
            let page = PageHeader::read(&mut file, tail_start - 1)?;
            if page.codec != Codec::None {
                // The tail only covers pages stored with `Codec::None`.
                break;
            }
            if run_open && page.frame_count as usize >= raw_page_capacity {
                run_open = false;
            }
            if options.codec == Codec::None && !run_open {
                // A page at capacity is not included in the tail when not compressing.
                break;
            }
            tail_start -= 1;
            if run_open {
                underfilled_start = tail_start;
            }
        }

        let mut tail_frames = 0u64;
        let mut underfilled_frames = 0u64;
        // Start from the last key of the page preceding the tail, if there is one.
        let mut last_key = if tail_start > 0 {
            file.seek(SeekFrom::Start(header.page_offset(tail_start - 1)))?;
            Some(PageHeader::read(&mut file, tail_start - 1)?.last_key)
        } else {
            None
        };
        // Second pass over the tail: total the frame counts and pick up the final key.
        for page_index in tail_start..header.page_count {
            file.seek(SeekFrom::Start(header.page_offset(page_index)))?;
            let page = PageHeader::read(&mut file, page_index)?;
            tail_frames += u64::from(page.frame_count);
            if page_index >= underfilled_start {
                underfilled_frames += u64::from(page.frame_count);
            }
            last_key = Some(page.last_key);
        }

        let append_tail = if tail_frames == 0 {
            None
        } else {
            Some(AppendTail {
                start_page: tail_start,
                page_count: header.page_count - tail_start,
                frame_count: tail_frames,
                underfilled_start_page: underfilled_start,
                underfilled_frame_count: underfilled_frames,
                loaded: false,
            })
        };
        // Position the file just past the last existing page.
        let append_offset = FILE_HEADER_LEN + header.page_count * u64::from(header.page_size);
        file.seek(SeekFrom::Start(append_offset))?;

        // Settings stored in the file take precedence over the caller's options.
        let mut append_options = options;
        append_options.title = header.title.clone();
        append_options.page_size = header.page_size;
        append_options.string_table = header.string_table.clone();
        normalize_encoding_selection(&mut append_options);

        let zstd_compressor =
            new_zstd_compressor(append_options.codec_selection, append_options.zstd_level)?;
        Ok(Self {
            inner: file,
            header,
            key_type,
            options: append_options,
            pending: PendingFrames::new(),
            last_key,
            next_compaction_frame_count: 0,
            compressed_page_frame_hint: 0,
            zstd_compressor,
            packing_stats: PackingStats::default(),
            current_page_attempts: 0,
            append_tail,
        })
    }
}
419
420impl<W: Read + Write + Seek + Resize> Writer<W> {
421 pub fn new(mut inner: W, schema: Schema, options: WriterOptions) -> Result<Self> {
422 let mut options = options;
423 normalize_encoding_selection(&mut options);
424 if !(MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&options.page_size) || options.title.is_empty()
425 {
426 return Err(V2Error::InvalidFileHeader);
427 }
428 let key_type = KeyType::from_field(schema.key_field())?;
429 let header = FileHeader {
430 page_size: options.page_size,
431 page_count: 0,
432 frame_count: 0,
433 key_field_index: schema.key_field_index as u16,
434 title: options.title.clone(),
435 schema,
436 string_table: options.string_table.clone(),
437 };
438 write_file_header(&mut inner, &header)?;
439 let zstd_compressor = new_zstd_compressor(options.codec_selection, options.zstd_level)?;
440 Ok(Self {
441 inner,
442 header,
443 key_type,
444 options,
445 pending: PendingFrames::new(),
446 last_key: None,
447 next_compaction_frame_count: 0,
448 compressed_page_frame_hint: 0,
449 zstd_compressor,
450 packing_stats: PackingStats::default(),
451 current_page_attempts: 0,
452 append_tail: None,
453 })
454 }
455
    /// Returns a copy of the packing statistics accumulated so far.
    pub fn packing_stats(&self) -> PackingStats {
        self.packing_stats
    }
459
    /// Returns the writer's in-memory file header.
    pub fn header(&self) -> &FileHeader {
        &self.header
    }
463
    /// Returns the schema stored in the file header.
    pub fn schema(&self) -> &Schema {
        &self.header.schema
    }
467
468 pub fn frame_count(&self) -> u64 {
469 let loaded_tail_frames = self
470 .append_tail
471 .as_ref()
472 .filter(|tail| tail.loaded)
473 .map(|tail| tail.frame_count)
474 .unwrap_or(0);
475 self.header.frame_count + self.pending_frame_count() as u64 - loaded_tail_frames
476 }
477
478 pub fn append_frame(&mut self, bytes: &[u8]) -> Result<()> {
479 let frame = FrameRef::new(&self.header.schema, bytes)?;
480 let key = frame.key(&self.header.schema, self.key_type)?;
481 if let Some(last_key) = self.last_key {
482 if key < last_key {
483 return Err(V2Error::KeyOrderViolation);
484 }
485 }
486
487 self.pending.append_copy(bytes);
488 self.last_key = Some(key);
489
490 self.compact_overflowing_tail()?;
491 Ok(())
492 }
493
494 pub fn append_raw_frames(&mut self, bytes: &[u8]) -> Result<()> {
495 let frame_len = self.header.schema.frame_len as usize;
496 if bytes.len() % frame_len != 0 {
497 return Err(V2Error::Core(FwobError::InvalidFrameLength {
498 expected: frame_len,
499 actual: bytes.len(),
500 }));
501 }
502 if bytes.is_empty() {
503 return Ok(());
504 }
505
506 let mut last_key = self.last_key;
507 for frame_bytes in bytes.chunks_exact(frame_len) {
508 let frame = FrameRef::new(&self.header.schema, frame_bytes)?;
509 let key = frame.key(&self.header.schema, self.key_type)?;
510 if let Some(previous) = last_key {
511 if key < previous {
512 return Err(V2Error::KeyOrderViolation);
513 }
514 }
515 last_key = Some(key);
516 }
517
518 self.pending.append_copy(bytes);
519 self.last_key = last_key;
520 self.compact_overflowing_tail()?;
521 Ok(())
522 }
523
524 pub fn append_presorted_raw_frames(&mut self, bytes: &[u8]) -> Result<()> {
525 let frame_len = self.header.schema.frame_len as usize;
526 if bytes.len() % frame_len != 0 {
527 return Err(V2Error::Core(FwobError::InvalidFrameLength {
528 expected: frame_len,
529 actual: bytes.len(),
530 }));
531 }
532 if bytes.is_empty() {
533 return Ok(());
534 }
535
536 let first = FrameRef::new(&self.header.schema, &bytes[..frame_len])?;
537 let first_key = first.key(&self.header.schema, self.key_type)?;
538 if let Some(previous) = self.last_key {
539 if first_key < previous {
540 return Err(V2Error::KeyOrderViolation);
541 }
542 }
543
544 let last_offset = bytes.len() - frame_len;
545 let last = FrameRef::new(&self.header.schema, &bytes[last_offset..])?;
546 self.last_key = Some(last.key(&self.header.schema, self.key_type)?);
547
548 self.pending.append_copy(bytes);
549 self.compact_overflowing_tail()?;
550 Ok(())
551 }
552
553 pub fn append_frames<I, B>(&mut self, frames: I) -> Result<()>
554 where
555 I: IntoIterator<Item = B>,
556 B: AsRef<[u8]>,
557 {
558 for frame in frames {
559 self.append_frame(frame.as_ref())?;
560 }
561 Ok(())
562 }
563
564 pub fn finish(self) -> Result<()> {
565 self.finish_with_stats().map(|_| ())
566 }
567
568 pub fn finish_with_stats(mut self) -> Result<PackingStats> {
569 self.compact_overflowing_tail()?;
570 if self.options.compress_partial_page {
571 while !self.pending.is_empty() {
572 if !self.flush_one_compressed_page_allow_partial()? {
573 break;
574 }
575 }
576 }
577 while !self.pending.is_empty() {
578 self.flush_one_raw_page()?;
579 }
580 self.inner.flush()?;
581 Ok(self.packing_stats)
582 }
583
584 fn compact_overflowing_tail(&mut self) -> Result<()> {
585 while self.should_try_compaction()? {
586 if self.options.codec == Codec::None {
587 self.flush_one_raw_page()?;
588 } else if !self.flush_one_compressed_page()? {
589 self.defer_next_compaction_attempt();
590 break;
591 }
592 }
593 Ok(())
594 }
595
596 fn should_try_compaction(&mut self) -> Result<bool> {
597 if self.pending_frame_count() == 0 {
598 return Ok(false);
599 }
600 let pending_frames = self.logical_pending_frame_count();
601 let raw_page_frames = self.raw_page_frame_capacity();
602 let minimum = if self.options.codec == Codec::None {
603 raw_page_frames + 1
604 } else {
605 raw_page_frames * INITIAL_COMPRESSED_PROBE_RAW_PAGES
606 };
607 let minimum = self.next_compaction_frame_count.max(minimum);
608 if pending_frames < minimum {
609 return Ok(false);
610 }
611 if self.raw_tail_overflows()? {
612 Ok(true)
613 } else {
614 self.defer_next_compaction_attempt();
615 Ok(false)
616 }
617 }
618
619 fn raw_tail_overflows(&mut self) -> Result<bool> {
620 if self.pending_frame_count() == 0 {
621 return Ok(false);
622 }
623 if let Some(tail) = self.append_tail.as_ref().filter(|tail| !tail.loaded) {
624 let total_frames = tail.frame_count as usize + self.pending_frame_count();
625 let encoded_len = self.raw_encoded_len_for_frames(total_frames);
626 return Ok(encoded_len > tail.page_count as usize * self.payload_capacity());
627 }
628
629 let frame_len = self.header.schema.frame_len as usize;
630 let pending_frames = self.pending_frame_count();
631 let raw_len = pending_frames * frame_len;
632 let (_encoding, encoded) = self.encode_pending_prefix(raw_len, pending_frames)?;
633 Ok(encoded.len() > self.payload_capacity())
634 }
635
636 fn defer_next_compaction_attempt(&mut self) {
637 self.next_compaction_frame_count =
638 self.logical_pending_frame_count() + self.raw_page_frame_capacity().max(1);
639 }
640
641 fn defer_next_compressed_fit_attempt(&mut self, pending_frames: usize, compressed_len: usize) {
642 if compressed_len == 0 {
643 self.defer_next_compaction_attempt();
644 return;
645 }
646
647 let capacity = self.payload_capacity();
648 let estimated_full_frames = pending_frames
649 .saturating_mul(capacity)
650 .div_ceil(compressed_len)
651 .saturating_add(1);
652 self.next_compaction_frame_count = estimated_full_frames
653 .max(pending_frames + self.raw_page_frame_capacity().max(1))
654 .max(self.next_compaction_frame_count);
655 }
656
657 fn flush_one_compressed_page(&mut self) -> Result<bool> {
658 if self.pending.is_empty() {
659 return Ok(false);
660 }
661 self.load_append_tail_for_compression()?;
662
663 let frame_len = self.header.schema.frame_len as usize;
664 let pending_frames = self.pending.frame_count(frame_len);
665 if pending_frames == 0 {
666 return Ok(false);
667 }
668 let (frame_count, codec, encoding, compressed, encoded_len) =
669 self.find_largest_fitting_prefix(pending_frames)?;
670 if frame_count == pending_frames {
671 self.defer_next_compressed_fit_attempt(pending_frames, compressed.len());
672 return Ok(false);
673 }
674 self.write_compressed_page(frame_count, codec, encoding, compressed, encoded_len)?;
675 Ok(true)
676 }
677
678 fn flush_one_compressed_page_allow_partial(&mut self) -> Result<bool> {
679 if self.pending.is_empty() {
680 return Ok(false);
681 }
682 self.load_append_tail_for_compression()?;
683 let frame_len = self.header.schema.frame_len as usize;
684 let pending_frames = self.pending.frame_count(frame_len);
685 if pending_frames == 0 {
686 return Ok(false);
687 }
688 let raw_len = pending_frames * frame_len;
689 let (codec, encoding, compressed, encoded_len) =
690 self.compress_pending_prefix(raw_len, pending_frames)?;
691 if compressed.len() <= self.payload_capacity() {
692 self.write_compressed_page(pending_frames, codec, encoding, compressed, encoded_len)?;
693 } else {
694 let (frame_count, codec, encoding, compressed, encoded_len) =
695 self.find_largest_fitting_prefix(pending_frames)?;
696 self.write_compressed_page(frame_count, codec, encoding, compressed, encoded_len)?;
697 }
698 Ok(true)
699 }
700
    /// Writes the first `frame_count` pending frames as one page using the already
    /// compressed payload `compressed`, then removes those frames from the pending buffer.
    ///
    /// A loaded append tail is reclaimed first, so the new page overwrites the old tail
    /// pages. The page is padded with zeros to the full page size, the header counts are
    /// updated, and the frame hint for the next page is re-estimated from the achieved
    /// compression ratio.
    ///
    /// Callers must pass a `compressed` payload that fits the page payload capacity and a
    /// `frame_count` of at least one; otherwise the subtractions below underflow.
    fn write_compressed_page(
        &mut self,
        frame_count: usize,
        codec: Codec,
        encoding: Encoding,
        compressed: Vec<u8>,
        encoded_len: usize,
    ) -> Result<()> {
        self.reclaim_append_tail_for_rewrite()?;
        let frame_len = self.header.schema.frame_len as usize;
        let raw_len = frame_count * frame_len;
        // Keys of the first and last frame that go into this page.
        let first_key = self.key_at_offset(0)?;
        let last_key = self.key_at_offset(raw_len - frame_len)?;

        let page_header = PageHeader::new(
            codec,
            encoding,
            first_key,
            last_key,
            frame_count as u32,
            encoded_len as u32,
            compressed.len() as u32,
            self.header.frame_count,
            &compressed,
        );

        // The new page goes directly after the last page counted in the header.
        let page_offset =
            FILE_HEADER_LEN + self.header.page_count * u64::from(self.header.page_size);
        self.inner.seek(SeekFrom::Start(page_offset))?;
        page_header.write(&mut self.inner)?;
        self.inner.write_all(&compressed)?;
        // Zero-pad so that every page occupies exactly `page_size` bytes.
        let written = PAGE_HEADER_LEN + compressed.len();
        self.inner
            .write_all(&vec![0u8; self.header.page_size as usize - written])?;

        self.header.page_count += 1;
        self.header.frame_count += frame_count as u64;
        update_counts(
            &mut self.inner,
            self.header.page_count,
            self.header.frame_count,
        )?;

        self.pending.consume_front(raw_len);
        // A page was written, so any deferral no longer applies.
        self.next_compaction_frame_count = 0;
        self.compressed_page_frame_hint = frame_count;
        self.record_compressed_page_attempts();
        if !compressed.is_empty() {
            // Scale the frame count by capacity / compressed size, never below the count
            // that just fit.
            self.compressed_page_frame_hint = frame_count
                .saturating_mul(self.payload_capacity())
                .div_ceil(compressed.len())
                .max(frame_count);
        }
        Ok(())
    }
756
    /// Writes one uncompressed (`Codec::None`) page from the front of the pending buffer.
    ///
    /// Any under-filled append tail is reclaimed first so its frames are rewritten together
    /// with the new ones. Does nothing when no data is pending.
    ///
    /// # Errors
    ///
    /// Returns `V2Error::FrameTooLarge` when the encoded payload exceeds the page payload
    /// capacity, and propagates encoding and I/O errors.
    fn flush_one_raw_page(&mut self) -> Result<()> {
        if self.pending.is_empty() {
            return Ok(());
        }
        self.reclaim_underfilled_tail_for_raw_rewrite()?;
        // Reclaiming can consume pending bytes, so check again.
        if self.pending.is_empty() {
            return Ok(());
        }

        let frame_len = self.header.schema.frame_len as usize;
        let pending_frames = self.pending.frame_count(frame_len);
        let frame_count = self.find_largest_raw_page_prefix(pending_frames)?;
        let raw_len = frame_count * frame_len;
        let (encoding, encoded) = self.encode_pending_prefix(raw_len, frame_count)?;
        if encoded.len() > self.payload_capacity() {
            return Err(V2Error::FrameTooLarge);
        }

        // Keys of the first and last frame that go into this page.
        let first_key = self.key_at_offset(0)?;
        let last_key = self.key_at_offset(raw_len - frame_len)?;
        // The payload is stored uncompressed, so both length fields carry the encoded length.
        let page_header = PageHeader::new(
            Codec::None,
            encoding,
            first_key,
            last_key,
            frame_count as u32,
            encoded.len() as u32,
            encoded.len() as u32,
            self.header.frame_count,
            &encoded,
        );

        // The new page goes directly after the last page counted in the header.
        let page_offset =
            FILE_HEADER_LEN + self.header.page_count * u64::from(self.header.page_size);
        self.inner.seek(SeekFrom::Start(page_offset))?;
        page_header.write(&mut self.inner)?;
        self.inner.write_all(&encoded)?;
        // Zero-pad so that every page occupies exactly `page_size` bytes.
        let written = PAGE_HEADER_LEN + encoded.len();
        self.inner
            .write_all(&vec![0u8; self.header.page_size as usize - written])?;

        self.header.page_count += 1;
        self.header.frame_count += frame_count as u64;
        update_counts(
            &mut self.inner,
            self.header.page_count,
            self.header.frame_count,
        )?;

        self.pending.consume_front(raw_len);
        // A page was written, so any deferral no longer applies.
        self.next_compaction_frame_count = 0;
        Ok(())
    }
814
815 fn reclaim_append_tail_for_rewrite(&mut self) -> Result<()> {
819 let Some(tail) = self.append_tail.as_ref() else {
820 return Ok(());
821 };
822 if !tail.loaded {
823 return Ok(());
824 }
825 let tail = self.append_tail.take().expect("tail exists");
826 self.header.page_count = tail.start_page;
827 self.header.frame_count = self.header.frame_count.saturating_sub(tail.frame_count);
828 let new_len = FILE_HEADER_LEN + self.header.page_count * u64::from(self.header.page_size);
829 self.inner.resize_len(new_len)?;
830 update_counts(
831 &mut self.inner,
832 self.header.page_count,
833 self.header.frame_count,
834 )?;
835 self.inner.seek(SeekFrom::Start(new_len))?;
836 Ok(())
837 }
838
    /// Prepares the append tail for being rewritten as raw pages.
    ///
    /// Only the under-filled trailing pages are rewritten; tail pages before them stay on
    /// disk. Afterwards the pending buffer starts with the frames of the under-filled pages
    /// followed by the newly appended frames, the tail description is cleared, and the sink
    /// is truncated to the start of the under-filled pages.
    fn reclaim_underfilled_tail_for_raw_rewrite(&mut self) -> Result<()> {
        let Some(tail) = self.append_tail.as_ref() else {
            return Ok(());
        };
        let frame_len = self.header.schema.frame_len as usize;
        let underfilled_start = tail.underfilled_start_page;
        let underfilled_frames = tail.underfilled_frame_count;
        // Frames in tail pages that precede the under-filled run.
        let leading_full_frames = tail.frame_count - underfilled_frames;
        let loaded = tail.loaded;

        if loaded {
            // The whole tail is already in the pending buffer; drop the part that stays on
            // disk.
            if leading_full_frames > 0 {
                self.pending
                    .consume_front(leading_full_frames as usize * frame_len);
            }
        } else if underfilled_frames > 0 {
            // Read the under-filled pages back and put their frames in front of the newly
            // appended data.
            let new_pending = self.pending.copy_range(0, self.pending.byte_len());
            let mut merged = PendingFrames::new();
            for page_index in underfilled_start..self.header.page_count {
                self.inner
                    .seek(SeekFrom::Start(self.header.page_offset(page_index)))?;
                let page = PageHeader::read(&mut self.inner, page_index)?;
                let mut raw = vec![0u8; page.compressed_len as usize];
                self.inner.read_exact(&mut raw)?;
                page.validate_payload(&raw)?;
                let decoded = decode_page_payload(
                    &self.header.schema,
                    page.encoding,
                    &raw,
                    page.frame_count as usize,
                )?;
                merged.append_owned(decoded);
            }
            merged.append_owned(new_pending);
            self.pending = merged;
        }

        self.append_tail = None;
        if underfilled_start < self.header.page_count {
            // Roll the header back and truncate the sink to the first under-filled page.
            self.header.page_count = underfilled_start;
            self.header.frame_count = self.header.frame_count.saturating_sub(underfilled_frames);
            let new_len =
                FILE_HEADER_LEN + self.header.page_count * u64::from(self.header.page_size);
            self.inner.resize_len(new_len)?;
            update_counts(
                &mut self.inner,
                self.header.page_count,
                self.header.frame_count,
            )?;
            self.inner.seek(SeekFrom::Start(new_len))?;
        }
        self.next_compaction_frame_count = 0;
        Ok(())
    }
902
    /// Reads the frames of all append-tail pages back into the pending buffer, in front of
    /// the newly appended data, and marks the tail as loaded.
    ///
    /// Does nothing when there is no tail or it has already been loaded. The pages stay on
    /// disk and the header counts are left untouched here.
    fn load_append_tail_for_compression(&mut self) -> Result<()> {
        let Some(tail) = self.append_tail.as_mut() else {
            return Ok(());
        };
        if tail.loaded {
            return Ok(());
        }

        // Snapshot the newly appended bytes so they can be re-queued after the tail frames.
        let new_pending = self.pending.copy_range(0, self.pending.byte_len());
        let mut loaded = PendingFrames::new();
        for page_index in tail.start_page..self.header.page_count {
            self.inner
                .seek(SeekFrom::Start(self.header.page_offset(page_index)))?;
            let page = PageHeader::read(&mut self.inner, page_index)?;
            let mut raw = vec![0u8; page.compressed_len as usize];
            self.inner.read_exact(&mut raw)?;
            page.validate_payload(&raw)?;
            let decoded = decode_page_payload(
                &self.header.schema,
                page.encoding,
                &raw,
                page.frame_count as usize,
            )?;
            loaded.append_owned(decoded);
        }
        loaded.append_owned(new_pending);
        self.pending = loaded;
        // Set only after every page was read successfully.
        tail.loaded = true;
        Ok(())
    }
933
934 fn find_largest_raw_page_prefix(&mut self, pending_frames: usize) -> Result<usize> {
935 let frame_len = self.header.schema.frame_len as usize;
936 let payload_capacity = self.payload_capacity();
937 let per_page_overhead = self.raw_page_encoding_overhead();
938 if payload_capacity <= per_page_overhead {
939 return Err(V2Error::FrameTooLarge);
940 }
941 let frame_count = pending_frames.min((payload_capacity - per_page_overhead) / frame_len);
942 if frame_count == 0 {
943 return Err(V2Error::FrameTooLarge);
944 }
945 Ok(frame_count)
946 }
947
948 fn raw_page_encoding_overhead(&self) -> usize {
949 match self.options.encoding_selection {
950 EncodingSelection::Fixed(Encoding::ColumnarDeltaV1) => self.header.schema.fields.len(),
951 EncodingSelection::Fixed(Encoding::RowRawV1 | Encoding::ColumnarBasicV1)
952 | EncodingSelection::Smallest => 0,
953 }
954 }
955
956 fn raw_encoded_len_for_frames(&self, frame_count: usize) -> usize {
957 frame_count * self.header.schema.frame_len as usize + self.raw_page_encoding_overhead()
958 }
959
960 fn find_largest_fitting_prefix(&mut self, candidate_frames: usize) -> Result<FittingCandidate> {
961 match self.options.page_packing {
962 PagePacking::EstimateShrink => self.find_estimated_fitting_prefix(candidate_frames),
963 PagePacking::TightFit => self.find_gradient_fitting_prefix(candidate_frames),
964 }
965 }
966
967 fn find_estimated_fitting_prefix(
968 &mut self,
969 candidate_frames: usize,
970 ) -> Result<FittingCandidate> {
971 let frame_len = self.header.schema.frame_len as usize;
972 if self.compressed_page_frame_hint == 0 {
973 return self.find_gradient_fitting_prefix(candidate_frames);
974 }
975
976 let probe = self.compressed_page_frame_hint.min(candidate_frames).max(1);
977 let raw_len = probe * frame_len;
978 let (codec, encoding, compressed, encoded_len) =
979 self.compress_pending_prefix(raw_len, probe)?;
980 if compressed.len() <= self.payload_capacity() {
981 return Ok((probe, codec, encoding, compressed, encoded_len));
982 }
983
984 let mut probe = probe;
985 let mut compressed_len = compressed.len();
986 loop {
987 let estimated = probe
988 .saturating_mul(self.payload_capacity())
989 .checked_div(compressed_len.max(1))
990 .unwrap_or(0)
991 .min(probe.saturating_sub(1))
992 .max(1);
993 if estimated == probe {
994 return Err(V2Error::FrameTooLarge);
995 }
996
997 let raw_len = estimated * frame_len;
998 let (codec, encoding, compressed, encoded_len) =
999 self.compress_pending_prefix(raw_len, estimated)?;
1000 if compressed.len() <= self.payload_capacity() {
1001 return Ok((estimated, codec, encoding, compressed, encoded_len));
1002 }
1003
1004 probe = estimated;
1005 compressed_len = compressed.len();
1006 }
1007 }
1008
1009 fn find_gradient_fitting_prefix(
1010 &mut self,
1011 candidate_frames: usize,
1012 ) -> Result<FittingCandidate> {
1013 let frame_len = self.header.schema.frame_len as usize;
1014 let raw_page_frames = self.raw_page_frame_capacity();
1015 let initial_probe = if self.compressed_page_frame_hint > 0 {
1016 self.compressed_page_frame_hint
1017 } else {
1018 raw_page_frames.saturating_mul(INITIAL_COMPRESSED_PROBE_RAW_PAGES)
1019 };
1020 let mut probe = initial_probe.min(candidate_frames).max(1);
1021 let mut best: Option<FittingCandidate> = None;
1022 let mut overflow: Option<(usize, usize)> = None;
1023 let mut initial_window_attempts = 0u64;
1024 let mut window_bounds = [None; RECORDED_WINDOW_SPANS];
1025 let mut recorded_window_spans = 0usize;
1026
1027 for _ in 0..32 {
1028 let recorded_window_spans_before_attempt = recorded_window_spans;
1029 initial_window_attempts += 1;
1030 let raw_len = probe * frame_len;
1031 let (codec, encoding, compressed, encoded_len) =
1032 self.compress_pending_prefix(raw_len, probe)?;
1033 if compressed.len() <= self.payload_capacity() {
1034 best = Some((probe, codec, encoding, compressed, encoded_len));
1035 if recorded_window_spans == 0 {
1036 if let Some((overflow_frame, _)) = overflow {
1037 self.record_initial_window(overflow_frame - probe, initial_window_attempts);
1038 window_bounds[0] = Some((probe, overflow_frame));
1039 recorded_window_spans = 1;
1040 }
1041 }
1042 if probe == candidate_frames {
1043 self.record_window_final_positions(&window_bounds, probe);
1044 return Ok(best.expect("best is set"));
1045 }
1046
1047 let mut ratio_next = probe
1048 .saturating_mul(self.payload_capacity())
1049 .checked_div(best.as_ref().expect("best is set").3.len().max(1))
1050 .unwrap_or(probe)
1051 .min(candidate_frames);
1052 if let Some((overflow_frame, _)) = overflow {
1053 ratio_next = ratio_next.min(overflow_frame.saturating_sub(1));
1054 }
1055 let next = ratio_next;
1056 if next == probe {
1057 self.record_window_final_positions(&window_bounds, probe);
1058 return Ok(best.expect("best is set"));
1059 }
1060 probe = next;
1061 } else {
1062 overflow = Some((probe, compressed.len()));
1063 if let Some((fit, _, _, _, _)) = best.as_ref() {
1064 if recorded_window_spans == 0 {
1065 self.record_initial_window(probe - *fit, initial_window_attempts);
1066 window_bounds[0] = Some((*fit, probe));
1067 recorded_window_spans = 1;
1068 }
1069 if probe <= fit + 1 {
1070 break;
1071 }
1072 let ratio_next = interpolated_probe(
1073 *fit,
1074 best.as_ref().expect("best is set").3.len(),
1075 probe,
1076 compressed.len(),
1077 self.payload_capacity(),
1078 *fit + 1,
1079 probe - 1,
1080 );
1081 if ratio_next == probe {
1082 break;
1083 }
1084 probe = ratio_next;
1085 } else {
1086 let ratio_next = probe
1087 .saturating_mul(self.payload_capacity())
1088 .checked_div(compressed.len().max(1))
1089 .unwrap_or(0)
1090 .min(probe.saturating_sub(1))
1091 .max(1);
1092 let next = ratio_next.max(1);
1093 if next == probe {
1094 break;
1095 }
1096 probe = next;
1097 }
1098 }
1099 if recorded_window_spans_before_attempt > 0 {
1100 if let Some(bounds) =
1101 self.record_next_window_span(&best, overflow, &mut recorded_window_spans)
1102 {
1103 window_bounds[recorded_window_spans - 1] = Some(bounds);
1104 }
1105 }
1106 }
1107
1108 if best.is_none() {
1109 return Err(V2Error::FrameTooLarge);
1110 }
1111
1112 if let Some((overflow, overflow_len)) = overflow {
1113 let (mut lo_frame, mut lo_len) = {
1114 let (frames, _, _, compressed, _) = best.as_ref().expect("best is set");
1115 (*frames, compressed.len())
1116 };
1117 let mut lo = lo_frame + 1;
1118 let mut hi = overflow - 1;
1119 let mut hi_frame = overflow;
1120 let mut hi_len = overflow_len;
1121 while lo <= hi {
1122 let ratio_probe = interpolated_probe(
1123 lo_frame,
1124 lo_len,
1125 hi_frame,
1126 hi_len,
1127 self.payload_capacity(),
1128 lo,
1129 hi,
1130 );
1131 let raw_len = ratio_probe * frame_len;
1132 let (codec, encoding, compressed, encoded_len) =
1133 self.compress_pending_prefix(raw_len, ratio_probe)?;
1134 if compressed.len() <= self.payload_capacity() {
1135 lo_frame = ratio_probe;
1136 lo_len = compressed.len();
1137 best = Some((ratio_probe, codec, encoding, compressed, encoded_len));
1138 lo = ratio_probe + 1;
1139 } else if ratio_probe == 1 {
1140 if recorded_window_spans < RECORDED_WINDOW_SPANS {
1141 self.record_window_span(recorded_window_spans, hi_frame - lo_frame);
1142 window_bounds[recorded_window_spans] = Some((lo_frame, hi_frame));
1143 }
1144 break;
1145 } else {
1146 hi = ratio_probe - 1;
1147 hi_frame = ratio_probe;
1148 hi_len = compressed.len();
1149 }
1150 if recorded_window_spans < RECORDED_WINDOW_SPANS {
1151 self.record_window_span(recorded_window_spans, hi_frame - lo_frame);
1152 window_bounds[recorded_window_spans] = Some((lo_frame, hi_frame));
1153 recorded_window_spans += 1;
1154 }
1155 }
1156 }
1157
1158 if let Some((final_fit, _, _, _, _)) = best.as_ref() {
1159 self.record_window_final_positions(&window_bounds, *final_fit);
1160 }
1161
1162 best.ok_or(V2Error::FrameTooLarge)
1163 }
1164
1165 fn compress_pending_prefix(
1166 &mut self,
1167 raw_len: usize,
1168 frame_count: usize,
1169 ) -> Result<CompressedCandidate> {
1170 self.current_page_attempts += 1;
1171 let schema = self.header.schema.clone();
1172 match self.options.encoding_selection {
1173 EncodingSelection::Fixed(encoding) => {
1174 let raw = self.pending.prefix_contiguous(raw_len);
1175 let encoded = encode_page_payload(&schema, encoding, raw, frame_count)?;
1176 self.compress_encoded(encoding, encoded)
1177 }
1178 EncodingSelection::Smallest => {
1179 let mut best: Option<CompressedCandidate> = None;
1180 for encoding in [Encoding::ColumnarBasicV1, Encoding::ColumnarDeltaV1] {
1181 let raw = self.pending.prefix_contiguous(raw_len);
1182 let encoded = encode_page_payload(&schema, encoding, raw, frame_count)?;
1183 let candidate = self.compress_encoded(encoding, encoded)?;
1184 if best.as_ref().is_none_or(|(_, _, best_compressed, _)| {
1185 candidate.2.len() < best_compressed.len()
1186 }) {
1187 best = Some(candidate);
1188 }
1189 }
1190 Ok(best.expect("smallest encoding has candidates"))
1191 }
1192 }
1193 }
1194
1195 fn compress_encoded(
1196 &mut self,
1197 encoding: Encoding,
1198 encoded: Vec<u8>,
1199 ) -> Result<CompressedCandidate> {
1200 let encoded_len = encoded.len();
1201 match self.options.codec_selection {
1202 CodecSelection::Fixed(codec) => {
1203 let compressed = match codec {
1204 Codec::Zstd => self
1205 .zstd_compressor
1206 .as_mut()
1207 .expect("zstd compressor is initialized")
1208 .compress(&encoded)?,
1209 _ => codec.compress_with_zstd_level(&encoded, self.options.zstd_level)?,
1210 };
1211 Ok((codec, encoding, compressed, encoded_len))
1212 }
1213 CodecSelection::Smallest => {
1214 let mut best_codec = Codec::None;
1215 let mut best =
1216 Codec::None.compress_with_zstd_level(&encoded, self.options.zstd_level)?;
1217 for codec in [Codec::Lz4, Codec::Zstd] {
1218 let compressed = match codec {
1219 Codec::Zstd => self
1220 .zstd_compressor
1221 .as_mut()
1222 .expect("zstd compressor is initialized")
1223 .compress(&encoded)?,
1224 _ => codec.compress_with_zstd_level(&encoded, self.options.zstd_level)?,
1225 };
1226 if compressed.len() < best.len() {
1227 best_codec = codec;
1228 best = compressed;
1229 }
1230 }
1231 Ok((best_codec, encoding, best, encoded_len))
1232 }
1233 }
1234 }
1235
1236 fn encode_pending_prefix(
1237 &mut self,
1238 raw_len: usize,
1239 frame_count: usize,
1240 ) -> Result<(Encoding, Vec<u8>)> {
1241 let schema = self.header.schema.clone();
1242 match self.options.encoding_selection {
1243 EncodingSelection::Fixed(encoding) => Ok((
1244 encoding,
1245 encode_page_payload(
1246 &schema,
1247 encoding,
1248 self.pending.prefix_contiguous(raw_len),
1249 frame_count,
1250 )?,
1251 )),
1252 EncodingSelection::Smallest => {
1253 let mut best: Option<(Encoding, Vec<u8>)> = None;
1254 for encoding in [Encoding::ColumnarBasicV1, Encoding::ColumnarDeltaV1] {
1255 let raw = self.pending.prefix_contiguous(raw_len);
1256 let encoded = encode_page_payload(&schema, encoding, raw, frame_count)?;
1257 if best
1258 .as_ref()
1259 .is_none_or(|(_, best_encoded)| encoded.len() < best_encoded.len())
1260 {
1261 best = Some((encoding, encoded));
1262 }
1263 }
1264 Ok(best.expect("smallest encoding has candidates"))
1265 }
1266 }
1267 }
1268
1269 fn key_at_offset(&self, offset: usize) -> Result<Key> {
1270 let frame_len = self.header.schema.frame_len as usize;
1271 let frame_bytes = self.pending.copy_range(offset, frame_len);
1272 let frame = FrameRef::new(&self.header.schema, &frame_bytes)?;
1273 Ok(frame.key(&self.header.schema, self.key_type)?)
1274 }
1275
1276 fn payload_capacity(&self) -> usize {
1277 self.header.page_size as usize - PAGE_HEADER_LEN
1278 }
1279
1280 fn pending_frame_count(&self) -> usize {
1281 self.pending
1282 .frame_count(self.header.schema.frame_len as usize)
1283 }
1284
1285 fn logical_pending_frame_count(&self) -> usize {
1286 self.pending_frame_count()
1287 + self
1288 .append_tail
1289 .as_ref()
1290 .filter(|tail| !tail.loaded)
1291 .map(|tail| tail.frame_count as usize)
1292 .unwrap_or(0)
1293 }
1294
1295 fn raw_page_frame_capacity(&self) -> usize {
1296 let frame_len = self.header.schema.frame_len as usize;
1297 (self.payload_capacity() / frame_len).max(1)
1298 }
1299
1300 fn record_compressed_page_attempts(&mut self) {
1301 let attempts = self.current_page_attempts;
1302 self.current_page_attempts = 0;
1303 if attempts == 0 {
1304 return;
1305 }
1306 if self.packing_stats.first_page_attempts == 0 {
1307 self.packing_stats.first_page_attempts = attempts;
1308 } else {
1309 self.packing_stats.subsequent_page_attempts += attempts;
1310 self.packing_stats.subsequent_pages += 1;
1311 if self.packing_stats.subsequent_min_attempts == 0 {
1312 self.packing_stats.subsequent_min_attempts = attempts;
1313 } else {
1314 self.packing_stats.subsequent_min_attempts =
1315 self.packing_stats.subsequent_min_attempts.min(attempts);
1316 }
1317 self.packing_stats.subsequent_max_attempts =
1318 self.packing_stats.subsequent_max_attempts.max(attempts);
1319 }
1320 }
1321
1322 fn record_initial_window(&mut self, frame_count: usize, attempts: u64) {
1323 self.packing_stats.initial_windows += 1;
1324 self.packing_stats.initial_window_attempts += attempts;
1325 self.record_window_span(0, frame_count);
1326 }
1327
1328 fn record_window_span(&mut self, index: usize, frame_count: usize) {
1329 if index >= RECORDED_WINDOW_SPANS {
1330 return;
1331 }
1332 self.packing_stats.window_span_counts[index] += 1;
1333 self.packing_stats.window_frame_spans[index] += frame_count as u64;
1334 }
1335
1336 fn record_next_window_span(
1337 &mut self,
1338 best: &Option<FittingCandidate>,
1339 overflow: Option<(usize, usize)>,
1340 recorded_window_spans: &mut usize,
1341 ) -> Option<(usize, usize)> {
1342 if *recorded_window_spans >= RECORDED_WINDOW_SPANS {
1343 return None;
1344 }
1345 if let (Some((fit, _, _, _, _)), Some((overflow, _))) = (best.as_ref(), overflow) {
1346 self.record_window_span(*recorded_window_spans, overflow - *fit);
1347 *recorded_window_spans += 1;
1348 Some((*fit, overflow))
1349 } else {
1350 None
1351 }
1352 }
1353
1354 fn record_window_final_position(
1355 &mut self,
1356 index: usize,
1357 fit: usize,
1358 overflow: usize,
1359 final_fit: usize,
1360 ) {
1361 if index >= RECORDED_WINDOW_SPANS || overflow <= fit {
1362 return;
1363 }
1364 let position = (final_fit - fit) as f64 / (overflow - fit) as f64;
1365 self.packing_stats.window_final_position_sums[index] += position;
1366 self.packing_stats.window_final_position_counts[index] += 1;
1367 }
1368
1369 fn record_window_final_positions(
1370 &mut self,
1371 window_bounds: &[Option<(usize, usize)>; RECORDED_WINDOW_SPANS],
1372 final_fit: usize,
1373 ) {
1374 for (index, bounds) in window_bounds.iter().enumerate() {
1375 if let Some((fit, overflow)) = bounds {
1376 self.record_window_final_position(index, *fit, *overflow, final_fit);
1377 }
1378 }
1379 }
1380}
1381
1382fn interpolated_probe(
1383 fit_frame: usize,
1384 _fit_len: usize,
1385 overflow_frame: usize,
1386 overflow_len: usize,
1387 target_len: usize,
1388 min_frame: usize,
1389 max_frame: usize,
1390) -> usize {
1391 if min_frame >= max_frame {
1392 return min_frame;
1393 }
1394 if overflow_frame <= fit_frame || overflow_len == 0 {
1395 return max_frame;
1396 }
1397
1398 let estimated = overflow_frame
1399 .saturating_mul(target_len)
1400 .checked_div(overflow_len)
1401 .unwrap_or(min_frame)
1402 .max(fit_frame + 1)
1403 .min(overflow_frame - 1);
1404 let window = max_frame - min_frame + 1;
1405 if window <= 2 {
1406 return estimated.clamp(min_frame, max_frame);
1407 }
1408 if INTERPOLATED_PROBE_WINDOW_MARGIN_DIVISOR == 0 {
1409 return estimated.clamp(min_frame, max_frame);
1410 }
1411 let margin = (window / INTERPOLATED_PROBE_WINDOW_MARGIN_DIVISOR).max(1);
1412 let clamped_min = min_frame.saturating_add(margin).min(max_frame);
1413 let clamped_max = max_frame.saturating_sub(margin).max(min_frame);
1414 if clamped_min > clamped_max {
1415 min_frame + ((max_frame - min_frame) >> 1)
1416 } else {
1417 estimated.clamp(clamped_min, clamped_max)
1418 }
1419}
1420
1421fn normalize_encoding_selection(options: &mut WriterOptions) {
1422 if matches!(options.encoding_selection, EncodingSelection::Fixed(_)) {
1423 options.encoding_selection = EncodingSelection::Fixed(options.encoding);
1424 }
1425}
1426
1427fn new_zstd_compressor(
1428 codec_selection: CodecSelection,
1429 zstd_level: i32,
1430) -> Result<Option<zstd::bulk::Compressor<'static>>> {
1431 let needs_zstd = matches!(codec_selection, CodecSelection::Fixed(Codec::Zstd))
1432 || matches!(codec_selection, CodecSelection::Smallest);
1433 if needs_zstd {
1434 Ok(Some(zstd::bulk::Compressor::new(zstd_level)?))
1435 } else {
1436 Ok(None)
1437 }
1438}