Skip to main content

fwob_v2/
writer.rs

1use std::{
2    collections::VecDeque,
3    fs::{File, OpenOptions},
4    io::{Cursor, Read, Seek, SeekFrom, Write},
5    path::Path,
6};
7
8use fwob_core::{FrameRef, FwobError, Key, KeyType, Schema};
9
10use crate::{
11    encoding::{decode_page_payload, encode_page_payload},
12    file_header::{
13        update_counts, write_file_header, FileHeader, FILE_HEADER_LEN, MAX_PAGE_SIZE, MIN_PAGE_SIZE,
14    },
15    page::{Encoding, PageHeader, PAGE_HEADER_LEN},
16    Codec, Result, V2Error,
17};
18
/// Margin divisor for the interpolated probe window.
///
/// NOTE(review): not referenced in the visible part of this file. A divisor of `0` presumably
/// means "no margin" — confirm the consumer never divides by it unguarded.
const INTERPOLATED_PROBE_WINDOW_MARGIN_DIVISOR: usize = 0;
/// Number of per-window slots recorded in [`PackingStats`]; it is the length of every
/// `window_*` array there and the upper bound accepted by the per-window accessors.
const RECORDED_WINDOW_SPANS: usize = 10;
/// For a compressing codec, the number of raw pages' worth of pending frames that must
/// accumulate before the first compaction attempt (see `should_try_compaction`).
const INITIAL_COMPRESSED_PROBE_RAW_PAGES: usize = 4;
/// Maximum number of trailing uncompressed pages a compressing codec will reclaim and repack
/// when opening a file for append. Bounds the work: a file with thousands of raw pages reclaims
/// at most this many. `Codec::None` only ever coalesces the single trailing page.
const MAX_APPEND_TAIL_PAGES: usize = 10;
26
/// How the writer chooses the compression codec for a page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecSelection {
    /// Always use the given codec.
    Fixed(Codec),
    /// NOTE(review): presumably tries the available codecs and keeps the smallest result; the
    /// selection logic is not visible in this part of the file — confirm.
    Smallest,
}
32
/// How the writer chooses the page payload encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodingSelection {
    /// Always use the given encoding.
    Fixed(Encoding),
    /// NOTE(review): presumably tries the available encodings and keeps the smallest result; the
    /// selection logic is not visible in this part of the file — confirm.
    Smallest,
}
38
/// Strategy used to decide how many frames go into a compressed page.
///
/// NOTE(review): the variants are only stored by the code visible here; the descriptions below
/// are inferred from the names and should be confirmed against the packing search.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PagePacking {
    /// Presumably estimates the frame count from observed compression and shrinks until it fits.
    EstimateShrink,
    /// Presumably searches for the tightest prefix that still fits in one page.
    TightFit,
}
44
/// Default page size in bytes (512 KiB), used by [`WriterOptions::new`].
pub const DEFAULT_PAGE_SIZE: u32 = 512 * 1024;
/// Default compression codec, used by [`WriterOptions::new`].
pub const DEFAULT_CODEC: Codec = Codec::Zstd;
/// Default zstd compression level, used by [`WriterOptions::new`].
pub const DEFAULT_ZSTD_LEVEL: i32 = 6;
/// Default page payload encoding, used by [`WriterOptions::new`].
pub const DEFAULT_ENCODING: Encoding = Encoding::ColumnarBasicV1;
/// Default page packing strategy, used by [`WriterOptions::new`].
pub const DEFAULT_PAGE_PACKING: PagePacking = PagePacking::EstimateShrink;
50
/// Counters describing how much work the page-packing search performed.
///
/// The writer hands out copies via `Writer::packing_stats` and `Writer::finish_with_stats`.
/// How each counter is incremented is not visible in this part of the file; the field notes
/// below describe only how the accessors on this type combine them.
#[derive(Debug, Clone, Copy, Default)]
pub struct PackingStats {
    // NOTE(review): presumably attempts spent packing the first page — confirm at the writer.
    pub first_page_attempts: u64,
    // Numerator of `subsequent_average_attempts`.
    pub subsequent_page_attempts: u64,
    // Denominator of `subsequent_average_attempts`.
    pub subsequent_pages: u64,
    // NOTE(review): presumably min/max attempts over the subsequent pages — confirm.
    pub subsequent_min_attempts: u64,
    pub subsequent_max_attempts: u64,
    // Numerator of `average_initial_window_attempts`.
    pub initial_window_attempts: u64,
    // Per-slot sums; numerators of `average_window_frame_span`.
    pub window_frame_spans: [u64; RECORDED_WINDOW_SPANS],
    // Per-slot sample counts; denominators of `average_window_frame_span`.
    pub window_span_counts: [u64; RECORDED_WINDOW_SPANS],
    // Denominator of `average_initial_window_attempts`.
    pub initial_windows: u64,
    // Per-slot sums; numerators of `average_window_final_position`.
    pub window_final_position_sums: [f64; RECORDED_WINDOW_SPANS],
    // Per-slot sample counts; denominators of `average_window_final_position`.
    pub window_final_position_counts: [u64; RECORDED_WINDOW_SPANS],
}
65
66impl PackingStats {
67    pub fn subsequent_average_attempts(self) -> f64 {
68        if self.subsequent_pages == 0 {
69            0.0
70        } else {
71            self.subsequent_page_attempts as f64 / self.subsequent_pages as f64
72        }
73    }
74
75    pub fn average_initial_window_attempts(self) -> f64 {
76        if self.initial_windows == 0 {
77            0.0
78        } else {
79            self.initial_window_attempts as f64 / self.initial_windows as f64
80        }
81    }
82
83    pub fn average_window_frame_span(self, index: usize) -> f64 {
84        if index >= RECORDED_WINDOW_SPANS || self.window_span_counts[index] == 0 {
85            0.0
86        } else {
87            self.window_frame_spans[index] as f64 / self.window_span_counts[index] as f64
88        }
89    }
90
91    pub fn average_window_final_position(self, index: usize) -> f64 {
92        if index >= RECORDED_WINDOW_SPANS || self.window_final_position_counts[index] == 0 {
93            0.0
94        } else {
95            self.window_final_position_sums[index] / self.window_final_position_counts[index] as f64
96        }
97    }
98}
99
/// Configuration for a [`Writer`].
///
/// When a file is opened for append, `title`, `page_size` and `string_table` are overwritten
/// with the values stored in the existing file header.
#[derive(Debug, Clone)]
pub struct WriterOptions {
    // File title; `Writer::new` rejects an empty title.
    pub title: String,
    // Page size in bytes; `Writer::new` requires it to lie in `MIN_PAGE_SIZE..=MAX_PAGE_SIZE`.
    pub page_size: u32,
    // Codec the writer compares against `Codec::None` to choose the raw vs. compressed path.
    pub codec: Codec,
    // Codec selection policy; also passed to the zstd compressor constructor.
    pub codec_selection: CodecSelection,
    // Compression level passed to the zstd compressor constructor.
    pub zstd_level: i32,
    pub encoding: Encoding,
    // Normalised by `normalize_encoding_selection` when a writer is constructed.
    pub encoding_selection: EncodingSelection,
    // Copied into the file header when a new file is created.
    pub string_table: Vec<String>,
    // When true, `finish_with_stats` first tries to write leftover frames as compressed pages
    // before falling back to raw pages.
    pub compress_partial_page: bool,
    pub page_packing: PagePacking,
}
113
114impl WriterOptions {
115    pub fn new(title: impl Into<String>) -> Self {
116        Self {
117            title: title.into(),
118            page_size: DEFAULT_PAGE_SIZE,
119            codec: DEFAULT_CODEC,
120            codec_selection: CodecSelection::Fixed(DEFAULT_CODEC),
121            zstd_level: DEFAULT_ZSTD_LEVEL,
122            encoding: DEFAULT_ENCODING,
123            encoding_selection: EncodingSelection::Fixed(DEFAULT_ENCODING),
124            string_table: Vec::new(),
125            compress_partial_page: false,
126            page_packing: DEFAULT_PAGE_PACKING,
127        }
128    }
129}
130
/// Appends key-ordered frames to a paged file, buffering them until enough have accumulated to
/// fill a page.
pub struct Writer<W> {
    // Underlying storage; the writer seeks, reads, writes and resizes it.
    inner: W,
    // In-memory copy of the file header; page and frame counts are kept in sync with the file.
    header: FileHeader,
    // Key type derived from the schema's key field.
    key_type: KeyType,
    options: WriterOptions,
    // Frames accepted but not yet written to a page.
    pending: PendingFrames,
    // Key of the most recently accepted frame; new frames must not sort below it.
    last_key: Option<Key>,
    // Logical pending frame count below which compaction is not attempted (0 = no deferral).
    next_compaction_frame_count: usize,
    // Estimated frames per compressed page, refreshed after each compressed page is written.
    compressed_page_frame_hint: usize,
    zstd_compressor: Option<zstd::bulk::Compressor<'static>>,
    packing_stats: PackingStats,
    // NOTE(review): not touched by the code visible here; presumably counts packing attempts
    // for the page currently being built — confirm.
    current_page_attempts: u64,
    // Reclaimable trailing raw pages found by `open_append`; `None` for a freshly created file.
    append_tail: Option<AppendTail>,
}
145
// NOTE(review): the element order below is taken from how the results of
// `compress_pending_prefix` and `find_largest_fitting_prefix` are destructured in this file;
// confirm these aliases are the return types of those helpers.

/// `(codec, encoding, compressed payload, encoded length)`.
type CompressedCandidate = (Codec, Encoding, Vec<u8>, usize);
/// `(frame count, codec, encoding, compressed payload, encoded length)`.
type FittingCandidate = (usize, Codec, Encoding, Vec<u8>, usize);
148
/// Storage whose total length can be set explicitly (grown or truncated).
pub trait Resize {
    /// Sets the length of the underlying storage to exactly `len` bytes.
    ///
    /// # Errors
    /// Returns the underlying I/O error if the storage cannot be resized.
    fn resize_len(&mut self, len: u64) -> std::io::Result<()>;
}

impl Resize for File {
    /// Delegates to [`File::set_len`].
    fn resize_len(&mut self, len: u64) -> std::io::Result<()> {
        self.set_len(len)
    }
}

impl Resize for Cursor<Vec<u8>> {
    /// Resizes the backing vector, zero-filling any newly added bytes, and pulls the cursor
    /// position back to the new end if it would otherwise point past it.
    ///
    /// # Errors
    /// Returns `InvalidInput` if `len` does not fit in `usize` on this target, instead of
    /// silently truncating the requested length.
    fn resize_len(&mut self, len: u64) -> std::io::Result<()> {
        let new_len = usize::try_from(len).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "requested length does not fit in usize",
            )
        })?;
        self.get_mut().resize(new_len, 0);
        if self.position() > len {
            self.set_position(len);
        }
        Ok(())
    }
}

impl<T: Resize + ?Sized> Resize for &mut T {
    /// Forwards to the referenced value so a writer can borrow its storage.
    fn resize_len(&mut self, len: u64) -> std::io::Result<()> {
        (**self).resize_len(len)
    }
}
174
/// Description of the trailing run of uncompressed pages found when a file is opened for
/// append, which the writer may rewrite instead of leaving half-empty pages behind.
struct AppendTail {
    /// First page of the whole reclaimable tail (full raw pages included). Used by the
    /// compression path, which may recompress the entire run.
    start_page: u64,
    /// Number of pages in the tail, i.e. from `start_page` to the end of the file.
    page_count: u64,
    /// Total number of frames stored in the tail pages.
    frame_count: u64,
    /// First page of the trailing run of under-filled pages (`>= start_page`). A raw flush only
    /// ever rewrites pages from here on, leaving the dense leading pages in place.
    underfilled_start_page: u64,
    /// Number of frames stored in the pages from `underfilled_start_page` onward.
    underfilled_frame_count: u64,
    /// Whether the tail's frames have already been pulled into the writer's pending buffer.
    /// Starts as `false`; while `true`, these frames are counted both in the header and in
    /// `pending`, which `Writer::frame_count` compensates for.
    loaded: bool,
}
187
/// FIFO byte buffer holding frames that have been accepted but not yet written to a page.
///
/// Bytes are stored as a queue of owned segments so appends never copy existing data and
/// consuming from the front is cheap.
struct PendingFrames {
    /// Queued segments, oldest first. Empty segments are never stored.
    segments: VecDeque<Vec<u8>>,
    /// Number of bytes at the start of the front segment that were already consumed.
    head_offset: usize,
    /// Total number of unconsumed bytes across all segments.
    len: usize,
    /// Reusable buffer for `prefix_contiguous` when the prefix spans several segments.
    scratch: Vec<u8>,
}

impl PendingFrames {
    /// Creates an empty buffer.
    fn new() -> Self {
        Self {
            segments: VecDeque::new(),
            head_offset: 0,
            len: 0,
            scratch: Vec::new(),
        }
    }

    /// Returns `true` when no unconsumed bytes remain.
    fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Number of whole frames of `frame_len` bytes currently buffered.
    ///
    /// `frame_len` must be non-zero.
    fn frame_count(&self, frame_len: usize) -> usize {
        self.len / frame_len
    }

    /// Number of unconsumed bytes.
    fn byte_len(&self) -> usize {
        self.len
    }

    /// Appends a copy of `bytes` as a new segment; empty input is ignored.
    fn append_copy(&mut self, bytes: &[u8]) {
        if bytes.is_empty() {
            return;
        }
        self.segments.push_back(bytes.to_vec());
        self.len += bytes.len();
    }

    /// Appends `bytes` as a new segment without copying; empty input is ignored.
    fn append_owned(&mut self, bytes: Vec<u8>) {
        if bytes.is_empty() {
            return;
        }
        self.len += bytes.len();
        self.segments.push_back(bytes);
    }

    /// Discards the first `count` bytes.
    ///
    /// Callers must pass `count <= byte_len()`; this is asserted in debug builds. In release
    /// builds an over-large `count` is clamped to the buffered length, so `len` cannot
    /// underflow and the loop cannot spin forever on an empty queue.
    fn consume_front(&mut self, count: usize) {
        debug_assert!(count <= self.len);
        let mut remaining = count.min(self.len);
        self.len -= remaining;
        while remaining > 0 {
            let Some(front) = self.segments.front() else {
                break;
            };
            let front_len = front.len() - self.head_offset;
            if remaining < front_len {
                // The front segment is only partly consumed; remember how far we got.
                self.head_offset += remaining;
                return;
            }

            remaining -= front_len;
            self.segments.pop_front();
            self.head_offset = 0;
        }
    }

    /// Returns the first `count` bytes as one contiguous slice.
    ///
    /// When the prefix lies entirely inside the front segment it is borrowed directly;
    /// otherwise it is gathered into the internal scratch buffer.
    fn prefix_contiguous(&mut self, count: usize) -> &[u8] {
        debug_assert!(count <= self.len);
        if count == 0 {
            return &[];
        }
        if let Some(front) = self.segments.front() {
            let available = front.len() - self.head_offset;
            if count <= available {
                return &front[self.head_offset..self.head_offset + count];
            }
        }

        self.scratch.clear();
        self.scratch.reserve(count);
        let mut remaining = count;
        // Only the front segment has a consumed prefix to skip.
        let mut start = self.head_offset;
        for segment in &self.segments {
            let take = remaining.min(segment.len() - start);
            self.scratch
                .extend_from_slice(&segment[start..start + take]);
            remaining -= take;
            if remaining == 0 {
                break;
            }
            start = 0;
        }
        &self.scratch
    }

    /// Copies `len` bytes starting `offset` bytes after the current front into a new vector.
    fn copy_range(&self, offset: usize, len: usize) -> Vec<u8> {
        debug_assert!(offset + len <= self.len);
        let mut out = Vec::with_capacity(len);
        let mut skip = offset;
        let mut remaining = len;
        let mut start = self.head_offset;
        for segment in &self.segments {
            let available = segment.len() - start;
            if skip >= available {
                // The requested range begins after this segment.
                skip -= available;
                start = 0;
                continue;
            }
            let from = start + skip;
            let take = remaining.min(segment.len() - from);
            out.extend_from_slice(&segment[from..from + take]);
            remaining -= take;
            skip = 0;
            start = 0;
            if remaining == 0 {
                break;
            }
        }
        out
    }
}
310
311impl Writer<File> {
312    pub fn create(path: impl AsRef<Path>, schema: Schema, options: WriterOptions) -> Result<Self> {
313        let file = File::create(path)?;
314        Self::new(file, schema, options)
315    }
316
317    pub fn open_append(path: impl AsRef<Path>, options: WriterOptions) -> Result<Self> {
318        let mut file = OpenOptions::new().read(true).write(true).open(path)?;
319        let header = crate::file_header::read_file_header(&mut file)?;
320        let key_type = KeyType::from_field(header.schema.key_field())?;
321
322        // Identify the trailing run of uncompressed pages that can be reclaimed on append, and
323        // within it the trailing run of *under-filled* pages. Two boundaries are tracked:
324        //   - `tail_start`: start of the whole reclaimable tail (full raw pages included). A
325        //     compressing codec may recompress this entire run, up to MAX_APPEND_TAIL_PAGES,
326        //     stopping only at the first compressed page.
327        //   - `underfilled_start`: start of the trailing run of under-filled pages. A plain raw
328        //     flush (no full compressed page produced) only ever rewrites this run, so already
329        //     dense pages are never touched.
330        // `Codec::None` never compresses, so its tail is just the single trailing under-filled
331        // page ("append to the last page until full").
332        let raw_page_capacity = ((header.page_size as usize - PAGE_HEADER_LEN)
333            / header.schema.frame_len as usize)
334            .max(1);
335        let max_tail_pages = if options.codec == Codec::None {
336            1
337        } else {
338            MAX_APPEND_TAIL_PAGES
339        };
340        let min_tail_start = header.page_count.saturating_sub(max_tail_pages as u64);
341        let mut tail_start = header.page_count;
342        let mut underfilled_start = header.page_count;
343        let mut run_open = true;
344        while tail_start > min_tail_start {
345            file.seek(SeekFrom::Start(header.page_offset(tail_start - 1)))?;
346            let page = PageHeader::read(&mut file, tail_start - 1)?;
347            if page.codec != Codec::None {
348                break; // compressed page: cannot grow / repack across it
349            }
350            if run_open && page.frame_count as usize >= raw_page_capacity {
351                run_open = false; // first full page (from the end) closes the under-filled run
352            }
353            if options.codec == Codec::None && !run_open {
354                break; // None reclaims only the trailing under-filled page, never a full one
355            }
356            tail_start -= 1;
357            if run_open {
358                underfilled_start = tail_start;
359            }
360        }
361
362        let mut tail_frames = 0u64;
363        let mut underfilled_frames = 0u64;
364        let mut last_key = if tail_start > 0 {
365            file.seek(SeekFrom::Start(header.page_offset(tail_start - 1)))?;
366            Some(PageHeader::read(&mut file, tail_start - 1)?.last_key)
367        } else {
368            None
369        };
370        for page_index in tail_start..header.page_count {
371            file.seek(SeekFrom::Start(header.page_offset(page_index)))?;
372            let page = PageHeader::read(&mut file, page_index)?;
373            tail_frames += u64::from(page.frame_count);
374            if page_index >= underfilled_start {
375                underfilled_frames += u64::from(page.frame_count);
376            }
377            last_key = Some(page.last_key);
378        }
379
380        let append_tail = if tail_frames == 0 {
381            None
382        } else {
383            Some(AppendTail {
384                start_page: tail_start,
385                page_count: header.page_count - tail_start,
386                frame_count: tail_frames,
387                underfilled_start_page: underfilled_start,
388                underfilled_frame_count: underfilled_frames,
389                loaded: false,
390            })
391        };
392        let append_offset = FILE_HEADER_LEN + header.page_count * u64::from(header.page_size);
393        file.seek(SeekFrom::Start(append_offset))?;
394
395        let mut append_options = options;
396        append_options.title = header.title.clone();
397        append_options.page_size = header.page_size;
398        append_options.string_table = header.string_table.clone();
399        normalize_encoding_selection(&mut append_options);
400
401        let zstd_compressor =
402            new_zstd_compressor(append_options.codec_selection, append_options.zstd_level)?;
403        Ok(Self {
404            inner: file,
405            header,
406            key_type,
407            options: append_options,
408            pending: PendingFrames::new(),
409            last_key,
410            next_compaction_frame_count: 0,
411            compressed_page_frame_hint: 0,
412            zstd_compressor,
413            packing_stats: PackingStats::default(),
414            current_page_attempts: 0,
415            append_tail,
416        })
417    }
418}
419
420impl<W: Read + Write + Seek + Resize> Writer<W> {
421    pub fn new(mut inner: W, schema: Schema, options: WriterOptions) -> Result<Self> {
422        let mut options = options;
423        normalize_encoding_selection(&mut options);
424        if !(MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&options.page_size) || options.title.is_empty()
425        {
426            return Err(V2Error::InvalidFileHeader);
427        }
428        let key_type = KeyType::from_field(schema.key_field())?;
429        let header = FileHeader {
430            page_size: options.page_size,
431            page_count: 0,
432            frame_count: 0,
433            key_field_index: schema.key_field_index as u16,
434            title: options.title.clone(),
435            schema,
436            string_table: options.string_table.clone(),
437        };
438        write_file_header(&mut inner, &header)?;
439        let zstd_compressor = new_zstd_compressor(options.codec_selection, options.zstd_level)?;
440        Ok(Self {
441            inner,
442            header,
443            key_type,
444            options,
445            pending: PendingFrames::new(),
446            last_key: None,
447            next_compaction_frame_count: 0,
448            compressed_page_frame_hint: 0,
449            zstd_compressor,
450            packing_stats: PackingStats::default(),
451            current_page_attempts: 0,
452            append_tail: None,
453        })
454    }
455
    /// Returns a copy of the packing statistics collected so far.
    pub fn packing_stats(&self) -> PackingStats {
        self.packing_stats
    }
459
    /// Returns the writer's in-memory file header. Its page and frame counts cover only what
    /// has been written to pages, not frames that are still pending.
    pub fn header(&self) -> &FileHeader {
        &self.header
    }
463
    /// Returns the schema stored in the file header.
    pub fn schema(&self) -> &Schema {
        &self.header.schema
    }
467
468    pub fn frame_count(&self) -> u64 {
469        let loaded_tail_frames = self
470            .append_tail
471            .as_ref()
472            .filter(|tail| tail.loaded)
473            .map(|tail| tail.frame_count)
474            .unwrap_or(0);
475        self.header.frame_count + self.pending_frame_count() as u64 - loaded_tail_frames
476    }
477
478    pub fn append_frame(&mut self, bytes: &[u8]) -> Result<()> {
479        let frame = FrameRef::new(&self.header.schema, bytes)?;
480        let key = frame.key(&self.header.schema, self.key_type)?;
481        if let Some(last_key) = self.last_key {
482            if key < last_key {
483                return Err(V2Error::KeyOrderViolation);
484            }
485        }
486
487        self.pending.append_copy(bytes);
488        self.last_key = Some(key);
489
490        self.compact_overflowing_tail()?;
491        Ok(())
492    }
493
494    pub fn append_raw_frames(&mut self, bytes: &[u8]) -> Result<()> {
495        let frame_len = self.header.schema.frame_len as usize;
496        if bytes.len() % frame_len != 0 {
497            return Err(V2Error::Core(FwobError::InvalidFrameLength {
498                expected: frame_len,
499                actual: bytes.len(),
500            }));
501        }
502        if bytes.is_empty() {
503            return Ok(());
504        }
505
506        let mut last_key = self.last_key;
507        for frame_bytes in bytes.chunks_exact(frame_len) {
508            let frame = FrameRef::new(&self.header.schema, frame_bytes)?;
509            let key = frame.key(&self.header.schema, self.key_type)?;
510            if let Some(previous) = last_key {
511                if key < previous {
512                    return Err(V2Error::KeyOrderViolation);
513                }
514            }
515            last_key = Some(key);
516        }
517
518        self.pending.append_copy(bytes);
519        self.last_key = last_key;
520        self.compact_overflowing_tail()?;
521        Ok(())
522    }
523
524    pub fn append_presorted_raw_frames(&mut self, bytes: &[u8]) -> Result<()> {
525        let frame_len = self.header.schema.frame_len as usize;
526        if bytes.len() % frame_len != 0 {
527            return Err(V2Error::Core(FwobError::InvalidFrameLength {
528                expected: frame_len,
529                actual: bytes.len(),
530            }));
531        }
532        if bytes.is_empty() {
533            return Ok(());
534        }
535
536        let first = FrameRef::new(&self.header.schema, &bytes[..frame_len])?;
537        let first_key = first.key(&self.header.schema, self.key_type)?;
538        if let Some(previous) = self.last_key {
539            if first_key < previous {
540                return Err(V2Error::KeyOrderViolation);
541            }
542        }
543
544        let last_offset = bytes.len() - frame_len;
545        let last = FrameRef::new(&self.header.schema, &bytes[last_offset..])?;
546        self.last_key = Some(last.key(&self.header.schema, self.key_type)?);
547
548        self.pending.append_copy(bytes);
549        self.compact_overflowing_tail()?;
550        Ok(())
551    }
552
553    pub fn append_frames<I, B>(&mut self, frames: I) -> Result<()>
554    where
555        I: IntoIterator<Item = B>,
556        B: AsRef<[u8]>,
557    {
558        for frame in frames {
559            self.append_frame(frame.as_ref())?;
560        }
561        Ok(())
562    }
563
564    pub fn finish(self) -> Result<()> {
565        self.finish_with_stats().map(|_| ())
566    }
567
568    pub fn finish_with_stats(mut self) -> Result<PackingStats> {
569        self.compact_overflowing_tail()?;
570        if self.options.compress_partial_page {
571            while !self.pending.is_empty() {
572                if !self.flush_one_compressed_page_allow_partial()? {
573                    break;
574                }
575            }
576        }
577        while !self.pending.is_empty() {
578            self.flush_one_raw_page()?;
579        }
580        self.inner.flush()?;
581        Ok(self.packing_stats)
582    }
583
584    fn compact_overflowing_tail(&mut self) -> Result<()> {
585        while self.should_try_compaction()? {
586            if self.options.codec == Codec::None {
587                self.flush_one_raw_page()?;
588            } else if !self.flush_one_compressed_page()? {
589                self.defer_next_compaction_attempt();
590                break;
591            }
592        }
593        Ok(())
594    }
595
596    fn should_try_compaction(&mut self) -> Result<bool> {
597        if self.pending_frame_count() == 0 {
598            return Ok(false);
599        }
600        let pending_frames = self.logical_pending_frame_count();
601        let raw_page_frames = self.raw_page_frame_capacity();
602        let minimum = if self.options.codec == Codec::None {
603            raw_page_frames + 1
604        } else {
605            raw_page_frames * INITIAL_COMPRESSED_PROBE_RAW_PAGES
606        };
607        let minimum = self.next_compaction_frame_count.max(minimum);
608        if pending_frames < minimum {
609            return Ok(false);
610        }
611        if self.raw_tail_overflows()? {
612            Ok(true)
613        } else {
614            self.defer_next_compaction_attempt();
615            Ok(false)
616        }
617    }
618
619    fn raw_tail_overflows(&mut self) -> Result<bool> {
620        if self.pending_frame_count() == 0 {
621            return Ok(false);
622        }
623        if let Some(tail) = self.append_tail.as_ref().filter(|tail| !tail.loaded) {
624            let total_frames = tail.frame_count as usize + self.pending_frame_count();
625            let encoded_len = self.raw_encoded_len_for_frames(total_frames);
626            return Ok(encoded_len > tail.page_count as usize * self.payload_capacity());
627        }
628
629        let frame_len = self.header.schema.frame_len as usize;
630        let pending_frames = self.pending_frame_count();
631        let raw_len = pending_frames * frame_len;
632        let (_encoding, encoded) = self.encode_pending_prefix(raw_len, pending_frames)?;
633        Ok(encoded.len() > self.payload_capacity())
634    }
635
636    fn defer_next_compaction_attempt(&mut self) {
637        self.next_compaction_frame_count =
638            self.logical_pending_frame_count() + self.raw_page_frame_capacity().max(1);
639    }
640
641    fn defer_next_compressed_fit_attempt(&mut self, pending_frames: usize, compressed_len: usize) {
642        if compressed_len == 0 {
643            self.defer_next_compaction_attempt();
644            return;
645        }
646
647        let capacity = self.payload_capacity();
648        let estimated_full_frames = pending_frames
649            .saturating_mul(capacity)
650            .div_ceil(compressed_len)
651            .saturating_add(1);
652        self.next_compaction_frame_count = estimated_full_frames
653            .max(pending_frames + self.raw_page_frame_capacity().max(1))
654            .max(self.next_compaction_frame_count);
655    }
656
657    fn flush_one_compressed_page(&mut self) -> Result<bool> {
658        if self.pending.is_empty() {
659            return Ok(false);
660        }
661        self.load_append_tail_for_compression()?;
662
663        let frame_len = self.header.schema.frame_len as usize;
664        let pending_frames = self.pending.frame_count(frame_len);
665        if pending_frames == 0 {
666            return Ok(false);
667        }
668        let (frame_count, codec, encoding, compressed, encoded_len) =
669            self.find_largest_fitting_prefix(pending_frames)?;
670        if frame_count == pending_frames {
671            self.defer_next_compressed_fit_attempt(pending_frames, compressed.len());
672            return Ok(false);
673        }
674        self.write_compressed_page(frame_count, codec, encoding, compressed, encoded_len)?;
675        Ok(true)
676    }
677
678    fn flush_one_compressed_page_allow_partial(&mut self) -> Result<bool> {
679        if self.pending.is_empty() {
680            return Ok(false);
681        }
682        self.load_append_tail_for_compression()?;
683        let frame_len = self.header.schema.frame_len as usize;
684        let pending_frames = self.pending.frame_count(frame_len);
685        if pending_frames == 0 {
686            return Ok(false);
687        }
688        let raw_len = pending_frames * frame_len;
689        let (codec, encoding, compressed, encoded_len) =
690            self.compress_pending_prefix(raw_len, pending_frames)?;
691        if compressed.len() <= self.payload_capacity() {
692            self.write_compressed_page(pending_frames, codec, encoding, compressed, encoded_len)?;
693        } else {
694            let (frame_count, codec, encoding, compressed, encoded_len) =
695                self.find_largest_fitting_prefix(pending_frames)?;
696            self.write_compressed_page(frame_count, codec, encoding, compressed, encoded_len)?;
697        }
698        Ok(true)
699    }
700
701    fn write_compressed_page(
702        &mut self,
703        frame_count: usize,
704        codec: Codec,
705        encoding: Encoding,
706        compressed: Vec<u8>,
707        encoded_len: usize,
708    ) -> Result<()> {
709        self.reclaim_append_tail_for_rewrite()?;
710        let frame_len = self.header.schema.frame_len as usize;
711        let raw_len = frame_count * frame_len;
712        let first_key = self.key_at_offset(0)?;
713        let last_key = self.key_at_offset(raw_len - frame_len)?;
714
715        let page_header = PageHeader::new(
716            codec,
717            encoding,
718            first_key,
719            last_key,
720            frame_count as u32,
721            encoded_len as u32,
722            compressed.len() as u32,
723            self.header.frame_count,
724            &compressed,
725        );
726
727        let page_offset =
728            FILE_HEADER_LEN + self.header.page_count * u64::from(self.header.page_size);
729        self.inner.seek(SeekFrom::Start(page_offset))?;
730        page_header.write(&mut self.inner)?;
731        self.inner.write_all(&compressed)?;
732        let written = PAGE_HEADER_LEN + compressed.len();
733        self.inner
734            .write_all(&vec![0u8; self.header.page_size as usize - written])?;
735
736        self.header.page_count += 1;
737        self.header.frame_count += frame_count as u64;
738        update_counts(
739            &mut self.inner,
740            self.header.page_count,
741            self.header.frame_count,
742        )?;
743
744        self.pending.consume_front(raw_len);
745        self.next_compaction_frame_count = 0;
746        self.compressed_page_frame_hint = frame_count;
747        self.record_compressed_page_attempts();
748        if !compressed.is_empty() {
749            self.compressed_page_frame_hint = frame_count
750                .saturating_mul(self.payload_capacity())
751                .div_ceil(compressed.len())
752                .max(frame_count);
753        }
754        Ok(())
755    }
756
757    fn flush_one_raw_page(&mut self) -> Result<()> {
758        if self.pending.is_empty() {
759            return Ok(());
760        }
761        // A raw flush rewrites only the trailing run of under-filled pages (for the compressed
762        // codec, this is the residual left when compression did not yield a full page; for
763        // `Codec::None`, the single trailing under-filled page). Dense leading pages — and any
764        // full pages a deferred compression attempt had pulled in — are left on disk untouched.
765        self.reclaim_underfilled_tail_for_raw_rewrite()?;
766        if self.pending.is_empty() {
767            return Ok(());
768        }
769
770        let frame_len = self.header.schema.frame_len as usize;
771        let pending_frames = self.pending.frame_count(frame_len);
772        let frame_count = self.find_largest_raw_page_prefix(pending_frames)?;
773        let raw_len = frame_count * frame_len;
774        let (encoding, encoded) = self.encode_pending_prefix(raw_len, frame_count)?;
775        if encoded.len() > self.payload_capacity() {
776            return Err(V2Error::FrameTooLarge);
777        }
778
779        let first_key = self.key_at_offset(0)?;
780        let last_key = self.key_at_offset(raw_len - frame_len)?;
781        let page_header = PageHeader::new(
782            Codec::None,
783            encoding,
784            first_key,
785            last_key,
786            frame_count as u32,
787            encoded.len() as u32,
788            encoded.len() as u32,
789            self.header.frame_count,
790            &encoded,
791        );
792
793        let page_offset =
794            FILE_HEADER_LEN + self.header.page_count * u64::from(self.header.page_size);
795        self.inner.seek(SeekFrom::Start(page_offset))?;
796        page_header.write(&mut self.inner)?;
797        self.inner.write_all(&encoded)?;
798        let written = PAGE_HEADER_LEN + encoded.len();
799        self.inner
800            .write_all(&vec![0u8; self.header.page_size as usize - written])?;
801
802        self.header.page_count += 1;
803        self.header.frame_count += frame_count as u64;
804        update_counts(
805            &mut self.inner,
806            self.header.page_count,
807            self.header.frame_count,
808        )?;
809
810        self.pending.consume_front(raw_len);
811        self.next_compaction_frame_count = 0;
812        Ok(())
813    }
814
815    /// Drop the whole reclaimed (loaded) append tail from the file and truncate to its start, so
816    /// the compressed pages written next replace the entire raw tail (full pages included). Used
817    /// only when a compression attempt produced a full compressed page.
818    fn reclaim_append_tail_for_rewrite(&mut self) -> Result<()> {
819        let Some(tail) = self.append_tail.as_ref() else {
820            return Ok(());
821        };
822        if !tail.loaded {
823            return Ok(());
824        }
825        let tail = self.append_tail.take().expect("tail exists");
826        self.header.page_count = tail.start_page;
827        self.header.frame_count = self.header.frame_count.saturating_sub(tail.frame_count);
828        let new_len = FILE_HEADER_LEN + self.header.page_count * u64::from(self.header.page_size);
829        self.inner.resize_len(new_len)?;
830        update_counts(
831            &mut self.inner,
832            self.header.page_count,
833            self.header.frame_count,
834        )?;
835        self.inner.seek(SeekFrom::Start(new_len))?;
836        Ok(())
837    }
838
    /// Reclaim only the trailing run of under-filled pages for an in-place raw rewrite, leaving
    /// the dense leading pages of the tail on disk. Handles both an unloaded tail (reads just the
    /// under-filled run) and a tail that a deferred compression attempt already pulled wholesale
    /// into `pending` (drops the dense leading-page frames back off the front).
    ///
    /// Returns `Ok(())` without touching anything when there is no append tail. On success the
    /// append tail is cleared, `next_compaction_frame_count` is reset to 0 and, if any pages were
    /// reclaimed, the file is truncated to the start of the under-filled run with the header
    /// counts rewritten to match.
    ///
    /// # Errors
    /// Propagates seek/read failures, page-header and payload validation failures, decode
    /// failures, and failures from resizing the file or rewriting the counts.
    fn reclaim_underfilled_tail_for_raw_rewrite(&mut self) -> Result<()> {
        let Some(tail) = self.append_tail.as_ref() else {
            return Ok(());
        };
        // Copy everything needed out of `tail` up front so the borrow of `self.append_tail`
        // ends before `self` is mutated below.
        let frame_len = self.header.schema.frame_len as usize;
        let underfilled_start = tail.underfilled_start_page;
        let underfilled_frames = tail.underfilled_frame_count;
        // Frames held by the tail pages that precede the under-filled run.
        let leading_full_frames = tail.frame_count - underfilled_frames;
        let loaded = tail.loaded;

        if loaded {
            // The whole tail sits in `pending` ahead of the new frames; the dense leading pages
            // stay on disk, so drop their frames from the front.
            if leading_full_frames > 0 {
                self.pending
                    .consume_front(leading_full_frames as usize * frame_len);
            }
        } else if underfilled_frames > 0 {
            // Load just the under-filled run, ahead of the pending new frames.
            let new_pending = self.pending.copy_range(0, self.pending.byte_len());
            let mut merged = PendingFrames::new();
            for page_index in underfilled_start..self.header.page_count {
                self.inner
                    .seek(SeekFrom::Start(self.header.page_offset(page_index)))?;
                let page = PageHeader::read(&mut self.inner, page_index)?;
                // The stored payload is `compressed_len` bytes long; it is validated before
                // being decoded back into raw frame bytes.
                let mut raw = vec![0u8; page.compressed_len as usize];
                self.inner.read_exact(&mut raw)?;
                page.validate_payload(&raw)?;
                let decoded = decode_page_payload(
                    &self.header.schema,
                    page.encoding,
                    &raw,
                    page.frame_count as usize,
                )?;
                merged.append_owned(decoded);
            }
            // Reclaimed frames first, then the frames that were already pending.
            merged.append_owned(new_pending);
            self.pending = merged;
        }

        self.append_tail = None;
        // Reclaim only the under-filled run; dense leading pages (`< underfilled_start`) stay put.
        // Truncate the freed pages so a rewrite that shrinks the run leaves no stale pages behind.
        if underfilled_start < self.header.page_count {
            self.header.page_count = underfilled_start;
            self.header.frame_count = self.header.frame_count.saturating_sub(underfilled_frames);
            let new_len =
                FILE_HEADER_LEN + self.header.page_count * u64::from(self.header.page_size);
            self.inner.resize_len(new_len)?;
            update_counts(
                &mut self.inner,
                self.header.page_count,
                self.header.frame_count,
            )?;
            // Leave the cursor at the new end of file, where the next page will be written.
            self.inner.seek(SeekFrom::Start(new_len))?;
        }
        self.next_compaction_frame_count = 0;
        Ok(())
    }
902
903    fn load_append_tail_for_compression(&mut self) -> Result<()> {
904        let Some(tail) = self.append_tail.as_mut() else {
905            return Ok(());
906        };
907        if tail.loaded {
908            return Ok(());
909        }
910
911        let new_pending = self.pending.copy_range(0, self.pending.byte_len());
912        let mut loaded = PendingFrames::new();
913        for page_index in tail.start_page..self.header.page_count {
914            self.inner
915                .seek(SeekFrom::Start(self.header.page_offset(page_index)))?;
916            let page = PageHeader::read(&mut self.inner, page_index)?;
917            let mut raw = vec![0u8; page.compressed_len as usize];
918            self.inner.read_exact(&mut raw)?;
919            page.validate_payload(&raw)?;
920            let decoded = decode_page_payload(
921                &self.header.schema,
922                page.encoding,
923                &raw,
924                page.frame_count as usize,
925            )?;
926            loaded.append_owned(decoded);
927        }
928        loaded.append_owned(new_pending);
929        self.pending = loaded;
930        tail.loaded = true;
931        Ok(())
932    }
933
934    fn find_largest_raw_page_prefix(&mut self, pending_frames: usize) -> Result<usize> {
935        let frame_len = self.header.schema.frame_len as usize;
936        let payload_capacity = self.payload_capacity();
937        let per_page_overhead = self.raw_page_encoding_overhead();
938        if payload_capacity <= per_page_overhead {
939            return Err(V2Error::FrameTooLarge);
940        }
941        let frame_count = pending_frames.min((payload_capacity - per_page_overhead) / frame_len);
942        if frame_count == 0 {
943            return Err(V2Error::FrameTooLarge);
944        }
945        Ok(frame_count)
946    }
947
948    fn raw_page_encoding_overhead(&self) -> usize {
949        match self.options.encoding_selection {
950            EncodingSelection::Fixed(Encoding::ColumnarDeltaV1) => self.header.schema.fields.len(),
951            EncodingSelection::Fixed(Encoding::RowRawV1 | Encoding::ColumnarBasicV1)
952            | EncodingSelection::Smallest => 0,
953        }
954    }
955
956    fn raw_encoded_len_for_frames(&self, frame_count: usize) -> usize {
957        frame_count * self.header.schema.frame_len as usize + self.raw_page_encoding_overhead()
958    }
959
960    fn find_largest_fitting_prefix(&mut self, candidate_frames: usize) -> Result<FittingCandidate> {
961        match self.options.page_packing {
962            PagePacking::EstimateShrink => self.find_estimated_fitting_prefix(candidate_frames),
963            PagePacking::TightFit => self.find_gradient_fitting_prefix(candidate_frames),
964        }
965    }
966
967    fn find_estimated_fitting_prefix(
968        &mut self,
969        candidate_frames: usize,
970    ) -> Result<FittingCandidate> {
971        let frame_len = self.header.schema.frame_len as usize;
972        if self.compressed_page_frame_hint == 0 {
973            return self.find_gradient_fitting_prefix(candidate_frames);
974        }
975
976        let probe = self.compressed_page_frame_hint.min(candidate_frames).max(1);
977        let raw_len = probe * frame_len;
978        let (codec, encoding, compressed, encoded_len) =
979            self.compress_pending_prefix(raw_len, probe)?;
980        if compressed.len() <= self.payload_capacity() {
981            return Ok((probe, codec, encoding, compressed, encoded_len));
982        }
983
984        let mut probe = probe;
985        let mut compressed_len = compressed.len();
986        loop {
987            let estimated = probe
988                .saturating_mul(self.payload_capacity())
989                .checked_div(compressed_len.max(1))
990                .unwrap_or(0)
991                .min(probe.saturating_sub(1))
992                .max(1);
993            if estimated == probe {
994                return Err(V2Error::FrameTooLarge);
995            }
996
997            let raw_len = estimated * frame_len;
998            let (codec, encoding, compressed, encoded_len) =
999                self.compress_pending_prefix(raw_len, estimated)?;
1000            if compressed.len() <= self.payload_capacity() {
1001                return Ok((estimated, codec, encoding, compressed, encoded_len));
1002            }
1003
1004            probe = estimated;
1005            compressed_len = compressed.len();
1006        }
1007    }
1008
    /// Searches for a large prefix of the pending frames (at most `candidate_frames`) whose
    /// compressed form fits within `payload_capacity()`.
    ///
    /// The search runs in two phases:
    /// 1. Up to 32 ratio-guided probes. The first probe is `compressed_page_frame_hint` when it
    ///    is non-zero, otherwise `raw_page_frame_capacity() * INITIAL_COMPRESSED_PROBE_RAW_PAGES`,
    ///    clamped to `1..=candidate_frames`. A fitting probe becomes `best` and the next probe is
    ///    scaled up; an overflowing probe becomes `overflow` and the next probe is scaled or
    ///    interpolated down.
    /// 2. If an overflow was seen, a refinement loop probes between `best` and that overflow
    ///    using `interpolated_probe` until the window is exhausted.
    ///
    /// Along the way the widths of the successive fit/overflow windows are recorded in the
    /// packing statistics, and on exit the final fit's relative position inside each recorded
    /// window is recorded as well.
    ///
    /// # Errors
    /// `V2Error::FrameTooLarge` when no probe fit, plus anything `compress_pending_prefix`
    /// reports.
    fn find_gradient_fitting_prefix(
        &mut self,
        candidate_frames: usize,
    ) -> Result<FittingCandidate> {
        let frame_len = self.header.schema.frame_len as usize;
        let raw_page_frames = self.raw_page_frame_capacity();
        let initial_probe = if self.compressed_page_frame_hint > 0 {
            self.compressed_page_frame_hint
        } else {
            raw_page_frames.saturating_mul(INITIAL_COMPRESSED_PROBE_RAW_PAGES)
        };
        let mut probe = initial_probe.min(candidate_frames).max(1);
        // Largest fitting candidate seen so far.
        let mut best: Option<FittingCandidate> = None;
        // Most recent overflowing probe as `(frame_count, compressed_len)`.
        let mut overflow: Option<(usize, usize)> = None;
        let mut initial_window_attempts = 0u64;
        // `(fit, overflow)` frame bounds of each recorded window, used for the final-position
        // statistics.
        let mut window_bounds = [None; RECORDED_WINDOW_SPANS];
        let mut recorded_window_spans = 0usize;

        // Phase 1: bounded number of ratio-guided probes.
        for _ in 0..32 {
            let recorded_window_spans_before_attempt = recorded_window_spans;
            initial_window_attempts += 1;
            let raw_len = probe * frame_len;
            let (codec, encoding, compressed, encoded_len) =
                self.compress_pending_prefix(raw_len, probe)?;
            if compressed.len() <= self.payload_capacity() {
                best = Some((probe, codec, encoding, compressed, encoded_len));
                // First time both a fit and an overflow are known: record the initial window.
                if recorded_window_spans == 0 {
                    if let Some((overflow_frame, _)) = overflow {
                        self.record_initial_window(overflow_frame - probe, initial_window_attempts);
                        window_bounds[0] = Some((probe, overflow_frame));
                        recorded_window_spans = 1;
                    }
                }
                // Everything that was offered fits; nothing larger can be tried.
                if probe == candidate_frames {
                    self.record_window_final_positions(&window_bounds, probe);
                    return Ok(best.expect("best is set"));
                }

                // Scale the probe up by capacity / compressed size, staying within the
                // candidates and strictly below any known overflow.
                let mut ratio_next = probe
                    .saturating_mul(self.payload_capacity())
                    .checked_div(best.as_ref().expect("best is set").3.len().max(1))
                    .unwrap_or(probe)
                    .min(candidate_frames);
                if let Some((overflow_frame, _)) = overflow {
                    ratio_next = ratio_next.min(overflow_frame.saturating_sub(1));
                }
                let next = ratio_next;
                // No room left to grow: the current fit is the answer.
                if next == probe {
                    self.record_window_final_positions(&window_bounds, probe);
                    return Ok(best.expect("best is set"));
                }
                probe = next;
            } else {
                overflow = Some((probe, compressed.len()));
                if let Some((fit, _, _, _, _)) = best.as_ref() {
                    if recorded_window_spans == 0 {
                        self.record_initial_window(probe - *fit, initial_window_attempts);
                        window_bounds[0] = Some((*fit, probe));
                        recorded_window_spans = 1;
                    }
                    // The overflow is adjacent to the fit: nothing lies in between.
                    if probe <= fit + 1 {
                        break;
                    }
                    // Pick the next probe strictly between the fit and this overflow.
                    let ratio_next = interpolated_probe(
                        *fit,
                        best.as_ref().expect("best is set").3.len(),
                        probe,
                        compressed.len(),
                        self.payload_capacity(),
                        *fit + 1,
                        probe - 1,
                    );
                    if ratio_next == probe {
                        break;
                    }
                    probe = ratio_next;
                } else {
                    // No fit yet: scale down by the observed ratio, dropping at least one frame
                    // and never going below a single frame.
                    let ratio_next = probe
                        .saturating_mul(self.payload_capacity())
                        .checked_div(compressed.len().max(1))
                        .unwrap_or(0)
                        .min(probe.saturating_sub(1))
                        .max(1);
                    let next = ratio_next.max(1);
                    // Already at one frame and it still overflows.
                    if next == probe {
                        break;
                    }
                    probe = next;
                }
            }
            // Once the initial window exists, every later attempt records the current window.
            if recorded_window_spans_before_attempt > 0 {
                if let Some(bounds) =
                    self.record_next_window_span(&best, overflow, &mut recorded_window_spans)
                {
                    window_bounds[recorded_window_spans - 1] = Some(bounds);
                }
            }
        }

        if best.is_none() {
            return Err(V2Error::FrameTooLarge);
        }

        // Phase 2: refine between the best fit and the last overflow.
        if let Some((overflow, overflow_len)) = overflow {
            let (mut lo_frame, mut lo_len) = {
                let (frames, _, _, compressed, _) = best.as_ref().expect("best is set");
                (*frames, compressed.len())
            };
            // `lo..=hi` is the range of frame counts still untested; `lo_frame`/`hi_frame` are
            // the nearest known fit and overflow with their compressed sizes.
            let mut lo = lo_frame + 1;
            let mut hi = overflow - 1;
            let mut hi_frame = overflow;
            let mut hi_len = overflow_len;
            while lo <= hi {
                let ratio_probe = interpolated_probe(
                    lo_frame,
                    lo_len,
                    hi_frame,
                    hi_len,
                    self.payload_capacity(),
                    lo,
                    hi,
                );
                let raw_len = ratio_probe * frame_len;
                let (codec, encoding, compressed, encoded_len) =
                    self.compress_pending_prefix(raw_len, ratio_probe)?;
                if compressed.len() <= self.payload_capacity() {
                    lo_frame = ratio_probe;
                    lo_len = compressed.len();
                    best = Some((ratio_probe, codec, encoding, compressed, encoded_len));
                    lo = ratio_probe + 1;
                } else if ratio_probe == 1 {
                    // A single frame overflowed: record the window and stop refining.
                    if recorded_window_spans < RECORDED_WINDOW_SPANS {
                        self.record_window_span(recorded_window_spans, hi_frame - lo_frame);
                        window_bounds[recorded_window_spans] = Some((lo_frame, hi_frame));
                    }
                    break;
                } else {
                    hi = ratio_probe - 1;
                    hi_frame = ratio_probe;
                    hi_len = compressed.len();
                }
                if recorded_window_spans < RECORDED_WINDOW_SPANS {
                    self.record_window_span(recorded_window_spans, hi_frame - lo_frame);
                    window_bounds[recorded_window_spans] = Some((lo_frame, hi_frame));
                    recorded_window_spans += 1;
                }
            }
        }

        if let Some((final_fit, _, _, _, _)) = best.as_ref() {
            self.record_window_final_positions(&window_bounds, *final_fit);
        }

        best.ok_or(V2Error::FrameTooLarge)
    }
1164
1165    fn compress_pending_prefix(
1166        &mut self,
1167        raw_len: usize,
1168        frame_count: usize,
1169    ) -> Result<CompressedCandidate> {
1170        self.current_page_attempts += 1;
1171        let schema = self.header.schema.clone();
1172        match self.options.encoding_selection {
1173            EncodingSelection::Fixed(encoding) => {
1174                let raw = self.pending.prefix_contiguous(raw_len);
1175                let encoded = encode_page_payload(&schema, encoding, raw, frame_count)?;
1176                self.compress_encoded(encoding, encoded)
1177            }
1178            EncodingSelection::Smallest => {
1179                let mut best: Option<CompressedCandidate> = None;
1180                for encoding in [Encoding::ColumnarBasicV1, Encoding::ColumnarDeltaV1] {
1181                    let raw = self.pending.prefix_contiguous(raw_len);
1182                    let encoded = encode_page_payload(&schema, encoding, raw, frame_count)?;
1183                    let candidate = self.compress_encoded(encoding, encoded)?;
1184                    if best.as_ref().is_none_or(|(_, _, best_compressed, _)| {
1185                        candidate.2.len() < best_compressed.len()
1186                    }) {
1187                        best = Some(candidate);
1188                    }
1189                }
1190                Ok(best.expect("smallest encoding has candidates"))
1191            }
1192        }
1193    }
1194
1195    fn compress_encoded(
1196        &mut self,
1197        encoding: Encoding,
1198        encoded: Vec<u8>,
1199    ) -> Result<CompressedCandidate> {
1200        let encoded_len = encoded.len();
1201        match self.options.codec_selection {
1202            CodecSelection::Fixed(codec) => {
1203                let compressed = match codec {
1204                    Codec::Zstd => self
1205                        .zstd_compressor
1206                        .as_mut()
1207                        .expect("zstd compressor is initialized")
1208                        .compress(&encoded)?,
1209                    _ => codec.compress_with_zstd_level(&encoded, self.options.zstd_level)?,
1210                };
1211                Ok((codec, encoding, compressed, encoded_len))
1212            }
1213            CodecSelection::Smallest => {
1214                let mut best_codec = Codec::None;
1215                let mut best =
1216                    Codec::None.compress_with_zstd_level(&encoded, self.options.zstd_level)?;
1217                for codec in [Codec::Lz4, Codec::Zstd] {
1218                    let compressed = match codec {
1219                        Codec::Zstd => self
1220                            .zstd_compressor
1221                            .as_mut()
1222                            .expect("zstd compressor is initialized")
1223                            .compress(&encoded)?,
1224                        _ => codec.compress_with_zstd_level(&encoded, self.options.zstd_level)?,
1225                    };
1226                    if compressed.len() < best.len() {
1227                        best_codec = codec;
1228                        best = compressed;
1229                    }
1230                }
1231                Ok((best_codec, encoding, best, encoded_len))
1232            }
1233        }
1234    }
1235
1236    fn encode_pending_prefix(
1237        &mut self,
1238        raw_len: usize,
1239        frame_count: usize,
1240    ) -> Result<(Encoding, Vec<u8>)> {
1241        let schema = self.header.schema.clone();
1242        match self.options.encoding_selection {
1243            EncodingSelection::Fixed(encoding) => Ok((
1244                encoding,
1245                encode_page_payload(
1246                    &schema,
1247                    encoding,
1248                    self.pending.prefix_contiguous(raw_len),
1249                    frame_count,
1250                )?,
1251            )),
1252            EncodingSelection::Smallest => {
1253                let mut best: Option<(Encoding, Vec<u8>)> = None;
1254                for encoding in [Encoding::ColumnarBasicV1, Encoding::ColumnarDeltaV1] {
1255                    let raw = self.pending.prefix_contiguous(raw_len);
1256                    let encoded = encode_page_payload(&schema, encoding, raw, frame_count)?;
1257                    if best
1258                        .as_ref()
1259                        .is_none_or(|(_, best_encoded)| encoded.len() < best_encoded.len())
1260                    {
1261                        best = Some((encoding, encoded));
1262                    }
1263                }
1264                Ok(best.expect("smallest encoding has candidates"))
1265            }
1266        }
1267    }
1268
1269    fn key_at_offset(&self, offset: usize) -> Result<Key> {
1270        let frame_len = self.header.schema.frame_len as usize;
1271        let frame_bytes = self.pending.copy_range(offset, frame_len);
1272        let frame = FrameRef::new(&self.header.schema, &frame_bytes)?;
1273        Ok(frame.key(&self.header.schema, self.key_type)?)
1274    }
1275
1276    fn payload_capacity(&self) -> usize {
1277        self.header.page_size as usize - PAGE_HEADER_LEN
1278    }
1279
1280    fn pending_frame_count(&self) -> usize {
1281        self.pending
1282            .frame_count(self.header.schema.frame_len as usize)
1283    }
1284
1285    fn logical_pending_frame_count(&self) -> usize {
1286        self.pending_frame_count()
1287            + self
1288                .append_tail
1289                .as_ref()
1290                .filter(|tail| !tail.loaded)
1291                .map(|tail| tail.frame_count as usize)
1292                .unwrap_or(0)
1293    }
1294
1295    fn raw_page_frame_capacity(&self) -> usize {
1296        let frame_len = self.header.schema.frame_len as usize;
1297        (self.payload_capacity() / frame_len).max(1)
1298    }
1299
1300    fn record_compressed_page_attempts(&mut self) {
1301        let attempts = self.current_page_attempts;
1302        self.current_page_attempts = 0;
1303        if attempts == 0 {
1304            return;
1305        }
1306        if self.packing_stats.first_page_attempts == 0 {
1307            self.packing_stats.first_page_attempts = attempts;
1308        } else {
1309            self.packing_stats.subsequent_page_attempts += attempts;
1310            self.packing_stats.subsequent_pages += 1;
1311            if self.packing_stats.subsequent_min_attempts == 0 {
1312                self.packing_stats.subsequent_min_attempts = attempts;
1313            } else {
1314                self.packing_stats.subsequent_min_attempts =
1315                    self.packing_stats.subsequent_min_attempts.min(attempts);
1316            }
1317            self.packing_stats.subsequent_max_attempts =
1318                self.packing_stats.subsequent_max_attempts.max(attempts);
1319        }
1320    }
1321
1322    fn record_initial_window(&mut self, frame_count: usize, attempts: u64) {
1323        self.packing_stats.initial_windows += 1;
1324        self.packing_stats.initial_window_attempts += attempts;
1325        self.record_window_span(0, frame_count);
1326    }
1327
1328    fn record_window_span(&mut self, index: usize, frame_count: usize) {
1329        if index >= RECORDED_WINDOW_SPANS {
1330            return;
1331        }
1332        self.packing_stats.window_span_counts[index] += 1;
1333        self.packing_stats.window_frame_spans[index] += frame_count as u64;
1334    }
1335
1336    fn record_next_window_span(
1337        &mut self,
1338        best: &Option<FittingCandidate>,
1339        overflow: Option<(usize, usize)>,
1340        recorded_window_spans: &mut usize,
1341    ) -> Option<(usize, usize)> {
1342        if *recorded_window_spans >= RECORDED_WINDOW_SPANS {
1343            return None;
1344        }
1345        if let (Some((fit, _, _, _, _)), Some((overflow, _))) = (best.as_ref(), overflow) {
1346            self.record_window_span(*recorded_window_spans, overflow - *fit);
1347            *recorded_window_spans += 1;
1348            Some((*fit, overflow))
1349        } else {
1350            None
1351        }
1352    }
1353
1354    fn record_window_final_position(
1355        &mut self,
1356        index: usize,
1357        fit: usize,
1358        overflow: usize,
1359        final_fit: usize,
1360    ) {
1361        if index >= RECORDED_WINDOW_SPANS || overflow <= fit {
1362            return;
1363        }
1364        let position = (final_fit - fit) as f64 / (overflow - fit) as f64;
1365        self.packing_stats.window_final_position_sums[index] += position;
1366        self.packing_stats.window_final_position_counts[index] += 1;
1367    }
1368
1369    fn record_window_final_positions(
1370        &mut self,
1371        window_bounds: &[Option<(usize, usize)>; RECORDED_WINDOW_SPANS],
1372        final_fit: usize,
1373    ) {
1374        for (index, bounds) in window_bounds.iter().enumerate() {
1375            if let Some((fit, overflow)) = bounds {
1376                self.record_window_final_position(index, *fit, *overflow, final_fit);
1377            }
1378        }
1379    }
1380}
1381
1382fn interpolated_probe(
1383    fit_frame: usize,
1384    _fit_len: usize,
1385    overflow_frame: usize,
1386    overflow_len: usize,
1387    target_len: usize,
1388    min_frame: usize,
1389    max_frame: usize,
1390) -> usize {
1391    if min_frame >= max_frame {
1392        return min_frame;
1393    }
1394    if overflow_frame <= fit_frame || overflow_len == 0 {
1395        return max_frame;
1396    }
1397
1398    let estimated = overflow_frame
1399        .saturating_mul(target_len)
1400        .checked_div(overflow_len)
1401        .unwrap_or(min_frame)
1402        .max(fit_frame + 1)
1403        .min(overflow_frame - 1);
1404    let window = max_frame - min_frame + 1;
1405    if window <= 2 {
1406        return estimated.clamp(min_frame, max_frame);
1407    }
1408    if INTERPOLATED_PROBE_WINDOW_MARGIN_DIVISOR == 0 {
1409        return estimated.clamp(min_frame, max_frame);
1410    }
1411    let margin = (window / INTERPOLATED_PROBE_WINDOW_MARGIN_DIVISOR).max(1);
1412    let clamped_min = min_frame.saturating_add(margin).min(max_frame);
1413    let clamped_max = max_frame.saturating_sub(margin).max(min_frame);
1414    if clamped_min > clamped_max {
1415        min_frame + ((max_frame - min_frame) >> 1)
1416    } else {
1417        estimated.clamp(clamped_min, clamped_max)
1418    }
1419}
1420
1421fn normalize_encoding_selection(options: &mut WriterOptions) {
1422    if matches!(options.encoding_selection, EncodingSelection::Fixed(_)) {
1423        options.encoding_selection = EncodingSelection::Fixed(options.encoding);
1424    }
1425}
1426
1427fn new_zstd_compressor(
1428    codec_selection: CodecSelection,
1429    zstd_level: i32,
1430) -> Result<Option<zstd::bulk::Compressor<'static>>> {
1431    let needs_zstd = matches!(codec_selection, CodecSelection::Fixed(Codec::Zstd))
1432        || matches!(codec_selection, CodecSelection::Smallest);
1433    if needs_zstd {
1434        Ok(Some(zstd::bulk::Compressor::new(zstd_level)?))
1435    } else {
1436        Ok(None)
1437    }
1438}