Skip to main content

crabka_log/
segment.rs

//! A single log segment: the `.log`, `.index`, and `.timeindex` files that
//! share one base offset.
3
4use std::fs::{File, OpenOptions};
5use std::io::{Seek, SeekFrom};
6use std::path::{Path, PathBuf};
7
8use bytes::Bytes;
9use crabka_protocol::records::{HEADER_LEN, RecordBatch, RecordBatchHeader};
10use zerocopy::FromBytes;
11
12use crate::error::LogError;
13use crate::index::{OffsetIndex, TimeIndex};
14use crate::name;
15
16/// Positioned read: fill `buf` from `offset` in `file` without moving the
17/// file's cursor, looping over short reads until `buf` is full or EOF.
18/// Returns the number of bytes read. Lets readers share the writer's
19/// `File` handle (`&self`) with no `dup(2)`/`lseek(2)` per call — the
20/// hot fetch path runs this for every read.
21fn read_full_at(file: &File, mut offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
22    let mut total = 0;
23    while total < buf.len() {
24        match read_at(file, offset, &mut buf[total..]) {
25            Ok(0) => break, // EOF
26            Ok(n) => {
27                total += n;
28                offset += n as u64;
29            }
30            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
31            Err(e) => return Err(e),
32        }
33    }
34    Ok(total)
35}
36
/// Unix backend for `read_full_at`: one positioned read via
/// `std::os::unix::fs::FileExt::read_at`, which leaves the file cursor
/// untouched. May return fewer bytes than `buf.len()`.
#[cfg(unix)]
fn read_at(file: &File, offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
    use std::os::unix::fs::FileExt;
    file.read_at(buf, offset)
}
41
/// Windows backend for `read_full_at`: one read at `offset` via
/// `std::os::windows::fs::FileExt::seek_read`. Unlike the Unix `read_at`,
/// `seek_read` is documented to update the file cursor, so writers must
/// position the cursor explicitly before writing. May return fewer bytes
/// than `buf.len()`.
#[cfg(windows)]
fn read_at(file: &File, offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
    use std::os::windows::fs::FileExt;
    file.seek_read(buf, offset)
}
46
47/// A single log segment: the `.log` data file paired with its sparse
48/// `.index` (offset → byte position) and `.timeindex` (timestamp →
49/// relative offset) sidecars.
50///
51/// A segment is identified by its `base_offset`: the absolute offset of
52/// its first record, encoded into the segment's 20-digit zero-padded
53/// filename. Segments are created via [`Segment::create`] (new active
54/// segment) or opened via [`Segment::open`] (read-only sealed segment)
55/// or [`Segment::open_active`] (active segment with tail recovery).
56#[derive(Debug)]
57pub struct Segment {
58    #[allow(dead_code)] // used by later phases (Log retention, recovery).
59    dir: PathBuf,
60    base_offset: i64,
61    log_file: File,
62    log_size: u64,
63    offset_index: OffsetIndex,
64    time_index: TimeIndex,
65    /// `true` once a new segment has been started after this one. Sealed
66    /// segments don't accept appends.
67    sealed: bool,
68    /// Highest timestamp observed across all batches written here.
69    max_timestamp: i64,
70    /// Last absolute offset (inclusive) of any batch in this segment.
71    last_offset: i64,
72}
73
74/// Verbatim, decode-free output of [`Segment::read_raw`].
75#[derive(Debug, Clone)]
76pub struct RawSegmentRead {
77    /// `base_offset` of the first included batch (≤ requested offset).
78    pub start_offset: i64,
79    /// Last absolute offset covered by `bytes` (`start_offset - 1` if empty).
80    pub last_offset: i64,
81    /// Verbatim `.log` bytes — one or more complete v2 batches.
82    pub bytes: Bytes,
83}
84
85impl RawSegmentRead {
86    fn empty() -> Self {
87        Self {
88            start_offset: 0,
89            last_offset: -1,
90            bytes: Bytes::new(),
91        }
92    }
93
94    /// `true` when no batch bytes were returned.
95    #[must_use]
96    pub fn is_empty(&self) -> bool {
97        self.bytes.is_empty()
98    }
99}
100
101impl Segment {
102    /// Create a fresh active segment at the given base offset. Fails if
103    /// the `.log` file already exists.
104    pub fn create(dir: &Path, base_offset: i64) -> Result<Self, LogError> {
105        let log_path = name::log_path(dir, base_offset);
106        let log_file = OpenOptions::new()
107            .read(true)
108            .write(true)
109            .create_new(true)
110            .open(&log_path)?;
111        let offset_index = OffsetIndex::open(&name::index_path(dir, base_offset))?;
112        let time_index = TimeIndex::open(&name::timeindex_path(dir, base_offset))?;
113        Ok(Self {
114            dir: dir.to_path_buf(),
115            base_offset,
116            log_file,
117            log_size: 0,
118            offset_index,
119            time_index,
120            sealed: false,
121            max_timestamp: i64::MIN,
122            last_offset: base_offset - 1,
123        })
124    }
125
126    /// Open as the active segment, scanning from the last-indexed position
127    /// to EOF when `validate` is true. A partial trailing batch (or one
128    /// that fails to decode) is truncated; cleanly decoded batches update
129    /// `last_offset` and `max_timestamp`.
130    pub fn open_active(dir: &Path, base_offset: i64, validate: bool) -> Result<Self, LogError> {
131        let mut seg = Self::open(dir, base_offset)?;
132        if validate {
133            seg.recover_active_tail()?;
134        }
135        Ok(seg)
136    }
137
138    fn recover_active_tail(&mut self) -> Result<(), LogError> {
139        let scan_start = self
140            .offset_index
141            .last_entry()
142            .map_or(0u64, |(_, pos)| u64::from(pos));
143        if scan_start >= self.log_size {
144            return Ok(());
145        }
146
147        let mut buf = Vec::new();
148        let to_read = usize::try_from(self.log_size - scan_start).unwrap_or(usize::MAX);
149        self.read_log_range(scan_start, &mut buf, to_read)?;
150
151        let mut cur: &[u8] = &buf;
152        let mut consumed: u64 = 0;
153        let mut last_offset = self.last_offset;
154        let mut max_ts = self.max_timestamp;
155        while !cur.is_empty() {
156            let before = cur.len();
157            let Ok(batch) = RecordBatch::decode(&mut cur) else {
158                break;
159            };
160            consumed += (before - cur.len()) as u64;
161            last_offset = batch.base_offset + i64::from(batch.last_offset_delta);
162            if batch.max_timestamp > max_ts {
163                max_ts = batch.max_timestamp;
164            }
165        }
166
167        let valid_end = scan_start + consumed;
168        if valid_end < self.log_size {
169            self.log_file.set_len(valid_end)?;
170            self.log_size = valid_end;
171        }
172        self.last_offset = last_offset;
173        self.max_timestamp = max_ts;
174        Ok(())
175    }
176
177    /// Open an existing segment for reading. Lightweight — no full scan.
178    /// Open an existing segment for reading. The log and index files
179    /// must already exist on disk; the segment is initialized with
180    /// `last_offset = base_offset - 1` and `max_timestamp = i64::MIN`
181    /// until tail recovery (via [`Segment::open_active`]) populates them.
182    pub fn open(dir: &Path, base_offset: i64) -> Result<Self, LogError> {
183        let log_path = name::log_path(dir, base_offset);
184        let log_file = OpenOptions::new().read(true).write(true).open(&log_path)?;
185        let log_size = log_file.metadata()?.len();
186        let offset_index = OffsetIndex::open(&name::index_path(dir, base_offset))?;
187        let time_index = TimeIndex::open(&name::timeindex_path(dir, base_offset))?;
188        Ok(Self {
189            dir: dir.to_path_buf(),
190            base_offset,
191            log_file,
192            log_size,
193            offset_index,
194            time_index,
195            sealed: false,
196            max_timestamp: i64::MIN,
197            last_offset: base_offset - 1,
198        })
199    }
200
201    /// Absolute offset of the first record this segment can hold.
202    #[must_use]
203    pub fn base_offset(&self) -> i64 {
204        self.base_offset
205    }
206
207    /// Path to this segment's `.txnindex` file (may not exist yet).
208    #[must_use]
209    pub fn txn_index_path(&self) -> std::path::PathBuf {
210        crate::name::txnindex_path(&self.dir, self.base_offset)
211    }
212
213    /// Path to the per-partition `.leader-epoch-checkpoint` file in this
214    /// segment's directory. The checkpoint is shared across all segments
215    /// in a partition — epoch history accumulates over the log's lifetime.
216    #[must_use]
217    pub fn leader_epoch_checkpoint_path(&self) -> std::path::PathBuf {
218        crate::name::leader_epoch_checkpoint_path(&self.dir)
219    }
220
221    /// Highest absolute offset (inclusive) of any batch appended to this
222    /// segment. Returns `base_offset - 1` for an empty segment.
223    #[must_use]
224    pub fn last_offset(&self) -> i64 {
225        self.last_offset
226    }
227
228    /// Current `.log` file size in bytes.
229    #[must_use]
230    pub fn size_bytes(&self) -> u64 {
231        self.log_size
232    }
233
234    /// Highest timestamp observed across all batches in this segment.
235    /// Returns `i64::MIN` for an empty segment.
236    #[must_use]
237    pub fn max_timestamp(&self) -> i64 {
238        self.max_timestamp
239    }
240
241    /// Absolute offset and record timestamp of the first record in this
242    /// segment whose timestamp is `>= target_ts`. Uses the sparse time
243    /// index for a floor position, then scans `.log` batches forward
244    /// (the index is sparse, so an exact answer needs the post-index
245    /// scan — matching Kafka's `LogSegment.findOffsetByTimestamp`).
246    /// Returns `None` when no record in this segment qualifies.
247    #[must_use]
248    pub fn offset_for_timestamp(&self, target_ts: i64) -> Option<(i64, i64)> {
249        let floor_rel = self.time_index.lookup(target_ts);
250        let scan_from = self.base_offset + i64::from(floor_rel);
251        self.scan_from_floor(scan_from, |ts| ts >= target_ts)
252    }
253
254    /// Absolute offset and timestamp of the record carrying this
255    /// segment's `max_timestamp`. Ties resolve to the earliest offset
256    /// (Kafka). Returns `None` for an empty segment. Uses the time
257    /// index's floor for the max to start the scan, then scans forward
258    /// for the first record whose timestamp equals the segment max.
259    #[must_use]
260    pub fn offset_of_max_timestamp(&self) -> Option<(i64, i64)> {
261        if self.max_timestamp == i64::MIN {
262            return None;
263        }
264        let floor_rel = self.time_index.lookup(self.max_timestamp);
265        let scan_from = self.base_offset + i64::from(floor_rel);
266        // Equality against `max_timestamp` is safe because Kafka's batch
267        // `max_timestamp` is always a real record timestamp (the largest
268        // among the batch's records), so some record's timestamp equals
269        // the segment max exactly.
270        self.scan_from_floor(scan_from, |ts| ts == self.max_timestamp)
271    }
272
273    /// Scan `.log` batches forward from `floor_offset`, returning the
274    /// (absolute offset, timestamp) of the first record whose timestamp
275    /// satisfies `pred`, or `None` at end of segment.
276    ///
277    /// Reads in bounded windows rather than slurping the whole segment
278    /// tail: the common case (a match within the first window) costs one
279    /// small read. When a window yields no match, the cursor advances
280    /// past the last batch read and the next window is fetched. The loop
281    /// terminates once the cursor passes `last_offset` (see termination
282    /// argument in [`Segment::scan_from_floor_windowed`]).
283    fn scan_from_floor(&self, floor_offset: i64, pred: impl Fn(i64) -> bool) -> Option<(i64, i64)> {
284        // One window roughly covers a default index interval's worth of
285        // log bytes, so a floor lookup typically lands a match in the
286        // first read.
287        const SCAN_WINDOW_BYTES: usize = 64 * 1024;
288        self.scan_from_floor_windowed(floor_offset, SCAN_WINDOW_BYTES, pred)
289    }
290
291    /// Window-size-parameterized core of [`Segment::scan_from_floor`].
292    /// Split out so tests can force multi-window scans with a tiny window.
293    ///
294    /// Termination: each iteration either (a) returns a match, (b) returns
295    /// `None` because `cursor > last_offset`, or (c) decodes at least one
296    /// full batch and advances `cursor` strictly past it. `read` caps
297    /// reads at `max_bytes` and (unlike `read_raw`) has no anti-stall
298    /// guarantee, so a single batch larger than the window decodes to an
299    /// empty `Vec`; we detect that (empty result while `cursor` is still
300    /// within the segment) and double the window before retrying, so the
301    /// window is bounded by the largest batch rather than the whole tail.
302    fn scan_from_floor_windowed(
303        &self,
304        floor_offset: i64,
305        window_bytes: usize,
306        pred: impl Fn(i64) -> bool,
307    ) -> Option<(i64, i64)> {
308        let mut cursor = floor_offset;
309        let mut window = window_bytes.max(1);
310        loop {
311            if cursor > self.last_offset {
312                return None;
313            }
314            let batches = self.read(cursor, window).ok()?;
315            if batches.is_empty() {
316                // The batch at `cursor` is larger than the window, so it
317                // could not be fully decoded. Grow the window and retry
318                // the same cursor; bounded by the largest batch size.
319                window = window.saturating_mul(2);
320                continue;
321            }
322            for batch in &batches {
323                for rec in &batch.records {
324                    let ts = batch.base_timestamp + rec.timestamp_delta;
325                    if pred(ts) {
326                        return Some((batch.base_offset + i64::from(rec.offset_delta), ts));
327                    }
328                }
329            }
330            // No match in this window; resume just past the last batch
331            // read. `read` includes the batch covering `cursor`, so
332            // `last_read` >= cursor and the cursor strictly advances.
333            let last = batches.last().expect("non-empty checked above");
334            let last_read = last.base_offset + i64::from(last.last_offset_delta);
335            cursor = last_read + 1;
336        }
337    }
338
339    /// `true` once the segment has been sealed via [`Segment::seal`];
340    /// sealed segments reject appends.
341    #[must_use]
342    pub fn is_sealed(&self) -> bool {
343        self.sealed
344    }
345
346    /// Read batches starting at or just before `offset`, up to roughly
347    /// `max_bytes` of `.log` data. Returns an empty `Vec` when `offset`
348    /// is past `last_offset`.
349    pub fn read(&self, offset: i64, max_bytes: usize) -> Result<Vec<RecordBatch>, LogError> {
350        if offset > self.last_offset {
351            return Ok(vec![]);
352        }
353        let target_rel = u32::try_from((offset - self.base_offset).max(0))
354            .map_err(|_| LogError::BadSegmentName("target offset out of range".into()))?;
355        let start_pos = u64::from(self.offset_index.lookup(target_rel));
356
357        let initial_cap = max_bytes.min(4 * 1024 * 1024);
358        let mut buf: Vec<u8> = Vec::with_capacity(initial_cap);
359        self.read_log_range(start_pos, &mut buf, max_bytes)?;
360
361        let mut out: Vec<RecordBatch> = Vec::new();
362        let mut total: usize = 0;
363        let mut cursor: &[u8] = &buf;
364        while !cursor.is_empty() {
365            let before = cursor.len();
366            let Ok(batch) = RecordBatch::decode(&mut cursor) else {
367                break; // partial trailing batch — stop.
368            };
369            let consumed = before - cursor.len();
370            let batch_last = batch.base_offset + i64::from(batch.last_offset_delta);
371            if batch_last >= offset {
372                out.push(batch);
373                total += consumed;
374                if total >= max_bytes {
375                    break;
376                }
377            }
378        }
379        Ok(out)
380    }
381
382    /// Read a contiguous run of **complete, verbatim** record-batch bytes
383    /// beginning at the batch containing `fetch_offset`, including only
384    /// batches whose `base_offset < limit_offset`, up to roughly `max_bytes`
385    /// (always at least one batch — Kafka's anti-stall rule). No record
386    /// decoding: only fixed batch headers are read to find boundaries.
387    pub fn read_raw(
388        &self,
389        fetch_offset: i64,
390        limit_offset: i64,
391        max_bytes: usize,
392    ) -> Result<RawSegmentRead, LogError> {
393        if fetch_offset > self.last_offset || fetch_offset >= limit_offset {
394            return Ok(RawSegmentRead::empty());
395        }
396        let target_rel = u32::try_from((fetch_offset - self.base_offset).max(0))
397            .map_err(|_| LogError::Corrupt("read_raw target offset out of range".into()))?;
398        let start_pos = u64::from(self.offset_index.lookup(target_rel));
399
400        let first_read = max_bytes.max(HEADER_LEN);
401        let mut buf: Vec<u8> = Vec::with_capacity(first_read.min(4 * 1024 * 1024));
402        self.read_log_range(start_pos, &mut buf, first_read)?;
403
404        let mut pos = 0usize;
405        let mut range_start: Option<usize> = None;
406        let mut range_end = 0usize;
407        let mut start_offset = fetch_offset;
408        let mut last_offset = fetch_offset - 1;
409
410        loop {
411            if pos + HEADER_LEN > buf.len() {
412                break;
413            }
414            let hdr = RecordBatchHeader::ref_from_bytes(&buf[pos..pos + HEADER_LEN])
415                .map_err(|_| LogError::Corrupt("record batch header".into()))?;
416            let base = hdr.base_offset.get();
417            let batch_len = usize::try_from(hdr.batch_length.get().max(0)).unwrap_or(0);
418            let total = 12 + batch_len;
419            let batch_last = base + i64::from(hdr.last_offset_delta.get());
420
421            if batch_last < fetch_offset {
422                pos += total;
423                continue;
424            }
425            if base >= limit_offset {
426                break;
427            }
428            if pos + total > buf.len() {
429                if range_start.is_none() {
430                    let mut one: Vec<u8> = Vec::with_capacity(total);
431                    self.read_log_range(start_pos + pos as u64, &mut one, total)?;
432                    if one.len() < total {
433                        break;
434                    }
435                    return Ok(RawSegmentRead {
436                        start_offset: base,
437                        last_offset: batch_last,
438                        bytes: Bytes::from(one),
439                    });
440                }
441                break;
442            }
443
444            if range_start.is_none() {
445                range_start = Some(pos);
446                start_offset = base;
447            }
448            range_end = pos + total;
449            last_offset = batch_last;
450            pos += total;
451
452            if range_end - range_start.expect("set above") >= max_bytes {
453                break;
454            }
455        }
456
457        match range_start {
458            Some(s) => Ok(RawSegmentRead {
459                start_offset,
460                last_offset,
461                bytes: Bytes::from(buf).slice(s..range_end),
462            }),
463            None => Ok(RawSegmentRead::empty()),
464        }
465    }
466
467    fn read_log_range(
468        &self,
469        start_pos: u64,
470        buf: &mut Vec<u8>,
471        max_bytes: usize,
472    ) -> Result<(), LogError> {
473        let available = self.log_size.saturating_sub(start_pos);
474        let to_read = available.min(u64::try_from(max_bytes).unwrap_or(u64::MAX));
475        let to_read = usize::try_from(to_read).unwrap_or(usize::MAX);
476        let base = buf.len();
477        buf.resize(base + to_read, 0);
478        let n = read_full_at(&self.log_file, start_pos, &mut buf[base..])?;
479        buf.truncate(base + n);
480        Ok(())
481    }
482
483    /// Append a record batch. Returns the byte position where the batch
484    /// starts.
485    ///
486    /// Side effects:
487    /// - Updates `log_size`, `max_timestamp`, `last_offset`.
488    /// - Adds sparse index entries when bytes-since-last-entry exceeds
489    ///   `index_interval_bytes` (or for the first batch).
490    pub fn append(
491        &mut self,
492        batch: &RecordBatch,
493        index_interval_bytes: u32,
494    ) -> Result<u64, LogError> {
495        use std::io::Write;
496
497        if self.sealed {
498            return Err(LogError::Io(std::io::Error::other("segment is sealed")));
499        }
500
501        let mut buf = bytes::BytesMut::with_capacity(batch.encoded_len());
502        batch.encode(&mut buf)?;
503        let bytes = buf.freeze();
504
505        let position = self.log_size;
506        self.log_file.seek(SeekFrom::End(0))?;
507        self.log_file.write_all(&bytes)?;
508        self.log_size += bytes.len() as u64;
509
510        let last_offset = batch.base_offset + i64::from(batch.last_offset_delta);
511        self.last_offset = last_offset;
512        if batch.max_timestamp > self.max_timestamp {
513            self.max_timestamp = batch.max_timestamp;
514        }
515
516        let should_index = match self.offset_index.last_entry() {
517            None => true,
518            Some((_, last_pos)) => {
519                position.saturating_sub(u64::from(last_pos)) >= u64::from(index_interval_bytes)
520            }
521        };
522        if should_index {
523            let rel = u32::try_from(batch.base_offset - self.base_offset)
524                .map_err(|_| LogError::BadSegmentName("offset overflow in segment".into()))?;
525            let pos_u32 = u32::try_from(position)
526                .map_err(|_| LogError::BadSegmentName("position overflow in segment".into()))?;
527            self.offset_index.append(rel, pos_u32)?;
528            self.time_index.append(self.max_timestamp, rel)?;
529        }
530
531        Ok(position)
532    }
533
534    /// Mark this segment as sealed. No more appends.
535    pub fn seal(&mut self) {
536        self.sealed = true;
537    }
538
539    /// Directory holding this segment's `.log`/`.index`/`.timeindex` files.
540    /// Used by the compactor to read the underlying `.log` file directly,
541    /// bypassing the `Segment::read` path which depends on the in-memory
542    /// `last_offset` (which is stale for sealed segments loaded via
543    /// `Segment::open`).
544    #[must_use]
545    pub fn dir(&self) -> &Path {
546        &self.dir
547    }
548
549    /// Force-sync everything to disk.
550    pub fn flush(&mut self) -> Result<(), LogError> {
551        self.log_file.sync_data()?;
552        self.offset_index.flush()?;
553        self.time_index.flush()?;
554        Ok(())
555    }
556
557    /// Truncate `.log` and indexes so no batches at `relative_offset` `>= rel`
558    /// remain. Used by `Log::truncate_to`. Leaves the segment unsealed.
559    pub fn truncate_to_relative(&mut self, rel: u32) -> Result<(), LogError> {
560        // Read only as far as the cut can be: every kept batch lives below
561        // the first index entry at or after `rel`. When `rel` is past the
562        // last index entry, fall back to the whole file. This avoids
563        // slurping the discarded tail on each truncate.
564        let read_limit = self
565            .offset_index
566            .position_at_or_after(rel)
567            .map_or(self.log_size, u64::from);
568        let mut buf = Vec::new();
569        let to_read = usize::try_from(read_limit).unwrap_or(usize::MAX);
570        self.read_log_range(0, &mut buf, to_read)?;
571
572        let target_abs = self.base_offset + i64::from(rel);
573        let mut cur: &[u8] = &buf;
574        let mut pos: u64 = 0;
575        let mut last_kept_offset = self.base_offset - 1;
576        let mut last_kept_ts = i64::MIN;
577        while !cur.is_empty() {
578            let before = cur.len();
579            let Ok(batch) = RecordBatch::decode(&mut cur) else {
580                break;
581            };
582            let batch_last_offset = batch.base_offset + i64::from(batch.last_offset_delta);
583            if batch_last_offset >= target_abs {
584                break;
585            }
586            pos += (before - cur.len()) as u64;
587            last_kept_offset = batch_last_offset;
588            if batch.max_timestamp > last_kept_ts {
589                last_kept_ts = batch.max_timestamp;
590            }
591        }
592
593        self.log_file.set_len(pos)?;
594        self.log_size = pos;
595        self.last_offset = last_kept_offset;
596        self.max_timestamp = last_kept_ts;
597
598        let pos_u32 =
599            u32::try_from(pos).map_err(|_| LogError::BadSegmentName("position overflow".into()))?;
600        self.offset_index.truncate_by_position(pos_u32)?;
601        self.time_index.truncate_by_relative_offset(rel)?;
602        self.sealed = false;
603        Ok(())
604    }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::Bytes;
    use crabka_protocol::records::{Record, RecordBatch};
    use tempfile::tempdir;

    /// Build a batch of `n` records at offsets `base_offset..base_offset + n`.
    /// Record `i` is stamped `ts_base + i` with key `k{i}` and value `v{i}`.
    fn sample_batch(base_offset: i64, n: i32, ts_base: i64) -> RecordBatch {
        let mut b = RecordBatch {
            base_offset,
            base_timestamp: ts_base,
            max_timestamp: ts_base + i64::from(n - 1),
            last_offset_delta: n - 1,
            ..RecordBatch::default()
        };
        for i in 0..n {
            b.records.push(Record {
                offset_delta: i,
                timestamp_delta: i64::from(i),
                key: Some(Bytes::from(format!("k{i}"))),
                value: Some(Bytes::from(format!("v{i}"))),
                ..Default::default()
            });
        }
        b
    }

    #[test]
    fn offset_for_timestamp_finds_first_ge() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        // Two batches: offsets 0..=2 ts 100..=102, offsets 3..=4 ts 200..=201.
        seg.append(&sample_batch(0, 3, 100), 0).unwrap();
        seg.append(&sample_batch(3, 2, 200), 0).unwrap();
        // sample_batch sets per-record timestamp_delta = i, base_timestamp = ts_base.
        // Batch 1 records: (off0,ts100),(off1,ts101),(off2,ts102).
        // Batch 2 records: (off3,ts200),(off4,ts201).
        assert!(seg.offset_for_timestamp(100) == Some((0, 100)));
        assert!(seg.offset_for_timestamp(101) == Some((1, 101)));
        assert!(seg.offset_for_timestamp(150) == Some((3, 200)));
        assert!(seg.offset_for_timestamp(201) == Some((4, 201)));
        assert!(seg.offset_for_timestamp(202) == None);
        drop(dir);
    }

    #[test]
    fn scan_from_floor_finds_match_beyond_first_window() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        // Many single-record batches with increasing timestamps, so a match
        // at the tail forces the windowed loop to advance many times.
        let n = 50i64;
        for off in 0..n {
            let mut b = RecordBatch {
                base_offset: off,
                base_timestamp: 1_000 + off,
                max_timestamp: 1_000 + off,
                last_offset_delta: 0,
                ..RecordBatch::default()
            };
            b.records.push(Record {
                offset_delta: 0,
                timestamp_delta: 0,
                value: Some(Bytes::from(format!("v{off}"))),
                ..Default::default()
            });
            seg.append(&b, 0).unwrap();
        }
        // A 1-byte window is smaller than any batch. `read` has no anti-stall
        // rule, so the scan must first grow (double) the window before it
        // decodes anything, then advance through many small windows. The
        // target ts is the very last record's, so every window is visited.
        let target = 1_000 + (n - 1);
        let got = seg.scan_from_floor_windowed(0, 1, |ts| ts >= target);
        assert!(got == Some((n - 1, target)));
        // No-match case must terminate (cursor passes last_offset) → None.
        let none = seg.scan_from_floor_windowed(0, 1, |ts| ts > 10_000);
        assert!(none == None);
        drop(dir);
    }

    #[test]
    fn offset_of_max_timestamp_in_later_batch_and_none_when_empty() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        // Batch records ts: 100,101,102 (max in batch = 102 at offset 2).
        seg.append(&sample_batch(0, 3, 100), 0).unwrap();
        // Second batch: offsets 3,4 ts 200,201 — segment max becomes 201 @4.
        seg.append(&sample_batch(3, 2, 200), 0).unwrap();
        assert!(seg.offset_of_max_timestamp() == Some((4, 201)));

        // Empty segment → None.
        let dir2 = tempdir().unwrap();
        let empty = Segment::create(dir2.path(), 0).unwrap();
        assert!(empty.offset_of_max_timestamp() == None);
        drop(dir);
        drop(dir2);
    }

    #[test]
    fn offset_of_max_timestamp_tie_picks_earliest() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        // All three records share timestamp 500; earliest offset is 0.
        let mut b = RecordBatch {
            base_offset: 0,
            base_timestamp: 500,
            max_timestamp: 500,
            last_offset_delta: 2,
            ..RecordBatch::default()
        };
        for i in 0..3 {
            b.records.push(Record {
                offset_delta: i,
                timestamp_delta: 0,
                value: Some(Bytes::from("v")),
                ..Default::default()
            });
        }
        seg.append(&b, 0).unwrap();
        assert!(seg.offset_of_max_timestamp() == Some((0, 500)));
        drop(dir);
    }

    #[test]
    fn append_then_read_back() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        let b1 = sample_batch(0, 3, 1_000_000);
        let b2 = sample_batch(3, 2, 2_000_000);
        seg.append(&b1, 4096).unwrap();
        seg.append(&b2, 4096).unwrap();
        assert!(seg.last_offset() == 4);
        let read = seg.read(0, usize::MAX).unwrap();
        assert!(read.len() == 2);
        assert!(read[0].records.len() == 3);
        assert!(read[1].records.len() == 2);
    }

    #[test]
    fn read_at_higher_offset_skips_earlier_batches() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 3, 1_000_000), 4096).unwrap();
        seg.append(&sample_batch(3, 2, 2_000_000), 4096).unwrap();
        let read = seg.read(4, usize::MAX).unwrap();
        // Offset 4 falls inside the second batch (offsets 3..=4).
        assert!(read.len() == 1);
        assert!(read[0].base_offset == 3);
    }

    #[test]
    fn append_to_sealed_segment_errors() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.seal();
        assert!(seg.is_sealed());
        let err = seg.append(&sample_batch(0, 1, 0), 4096).unwrap_err();
        assert!(matches!(err, LogError::Io(_)));
    }

    #[test]
    fn read_past_last_offset_returns_empty() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 2, 1_000), 4096).unwrap();
        let read = seg.read(100, usize::MAX).unwrap();
        assert!(read.is_empty());
    }

    #[test]
    fn flush_succeeds() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 1, 42), 4096).unwrap();
        seg.flush().unwrap();
    }

    #[test]
    fn truncate_to_relative_keeps_only_earlier_batches() {
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 3, 100), 0).unwrap();
        // The second batch starts exactly where the first one ends.
        let first_len = seg.append(&sample_batch(3, 2, 200), 0).unwrap();
        seg.truncate_to_relative(3).unwrap();
        assert!(seg.last_offset() == 2);
        assert!(seg.max_timestamp() == 102);
        assert!(seg.size_bytes() == first_len);
        assert!(!seg.is_sealed());
        let read = seg.read(0, usize::MAX).unwrap();
        assert!(read.len() == 1);
        assert!(read[0].base_offset == 0);
        drop(dir);
    }

    #[test]
    fn open_active_truncates_torn_tail() {
        use std::io::Write;
        let dir = tempdir().unwrap();
        let mut seg = Segment::create(dir.path(), 0).unwrap();
        seg.append(&sample_batch(0, 3, 100), 0).unwrap();
        seg.append(&sample_batch(3, 2, 200), 0).unwrap();
        seg.flush().unwrap();
        let valid_size = seg.size_bytes();
        drop(seg);

        // Simulate a crash mid-append: stray bytes after the last batch.
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(crate::name::log_path(dir.path(), 0))
            .unwrap();
        f.write_all(&[0u8; 7]).unwrap();
        drop(f);

        let seg = Segment::open_active(dir.path(), 0, true).unwrap();
        assert!(seg.size_bytes() == valid_size);
        assert!(seg.last_offset() == 4);
        assert!(seg.max_timestamp() == 201);
        drop(seg);
        drop(dir);
    }

    // ---- read_raw (decode-free) tests ----

    /// Fresh, empty segment at base offset 0 in a temp dir (kept alive by
    /// the returned guard).
    fn test_segment() -> (tempfile::TempDir, Segment) {
        let dir = tempdir().unwrap();
        let seg = Segment::create(dir.path(), 0).unwrap();
        (dir, seg)
    }

    /// Single-record batch at `off`, timestamp 1000, value `v{off}`.
    fn test_batch_at(off: i64) -> RecordBatch {
        let mut b = RecordBatch {
            base_offset: off,
            base_timestamp: 1_000,
            max_timestamp: 1_000,
            last_offset_delta: 0,
            ..RecordBatch::default()
        };
        b.records.push(Record {
            offset_delta: 0,
            timestamp_delta: 0,
            value: Some(Bytes::from(format!("v{off}"))),
            ..Default::default()
        });
        b
    }

    #[test]
    fn read_raw_is_byte_exact_and_multi_batch() {
        let (dir, mut seg) = test_segment();
        let mut wire = bytes::BytesMut::new();
        for off in 0..3i64 {
            let b = test_batch_at(off);
            seg.append(&b, 0).unwrap();
            b.encode(&mut wire).unwrap();
        }
        let wire = wire.freeze();
        let r = seg.read_raw(0, 3, 10 * 1024 * 1024).unwrap();
        assert!(r.start_offset == 0);
        assert!(r.last_offset == 2);
        assert!(
            &r.bytes[..] == &wire[..],
            "raw bytes must equal the on-disk concatenation"
        );
        let mut cur: &[u8] = &r.bytes;
        let mut n = 0;
        while !cur.is_empty() {
            crabka_protocol::records::RecordBatch::decode(&mut cur).unwrap();
            n += 1;
        }
        assert!(n == 3);
        drop(dir);
    }

    #[test]
    fn read_raw_clamps_at_limit_offset() {
        let (dir, mut seg) = test_segment();
        for off in 0..3i64 {
            seg.append(&test_batch_at(off), 0).unwrap();
        }
        let r = seg.read_raw(0, 2, 10 * 1024 * 1024).unwrap();
        assert!(r.last_offset == 1);
        drop(dir);
    }

    #[test]
    fn read_raw_returns_at_least_one_batch_over_budget() {
        let (dir, mut seg) = test_segment();
        seg.append(&test_batch_at(0), 0).unwrap();
        let r = seg.read_raw(0, 1, 1).unwrap();
        assert!(r.start_offset == 0);
        assert!(r.last_offset == 0);
        assert!(!r.bytes.is_empty());
        drop(dir);
    }
}