Skip to main content

crabka_log/
log.rs

1//! `Log` — a sorted collection of `Segment`s with append/read/truncate.
2
3use std::collections::{HashMap, HashSet};
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::time::SystemTime;
7
8use bytes::{Bytes, BytesMut};
9use crabka_protocol::records::{HEADER_LEN, RecordBatch};
10
11use crate::config::LogConfig;
12use crate::error::LogError;
13use crate::leader_epoch_checkpoint::LeaderEpochCheckpoint;
14use crate::name;
15use crate::retention;
16use crate::segment::{RawSegmentRead, Segment};
17use crate::txn_index::{AbortedTxn, TxnIndex};
18
/// A Kafka-format log: a sorted collection of sealed [`Segment`]s plus a
/// single active segment that accepts appends.
///
/// `Log` is single-writer (`&mut self` for mutation) and supports
/// concurrent readers (`&self` for `read`/`log_start_offset`/etc.).
/// Construct one with [`Log::open`].
#[derive(Debug)]
// `log_start_override` mirrors Kafka's `log_start_offset` terminology;
// renaming to drop the `log_` prefix would obscure the field's role.
#[allow(clippy::struct_field_names)]
pub struct Log {
    /// Directory holding this partition's segment files (the `dir` passed
    /// to [`Log::open`]).
    dir: PathBuf,
    /// Runtime-swappable configuration. Wrapped in a lock so
    /// [`Log::set_config`] can replace it through `&self`; readers copy the
    /// values they need out of the guard.
    config: std::sync::Arc<std::sync::RwLock<LogConfig>>,
    /// Sealed segments in ascending base-offset order (oldest first).
    segments: Vec<Segment>,
    /// The writable tail segment. Expected to be `Some` after
    /// [`Log::open`]; the mutators visible in this file (segment roll,
    /// `truncate_to`, `reset_to`) only take it out transiently while
    /// swapping in a replacement.
    active: Option<Segment>,
    /// Override for `log_start_offset()`. When `Some(n)`, the effective
    /// `log_start` is `max(derived_from_segments, n)`. Used by
    /// `trim_to_offset` (and in tests) to advance the log start pointer
    /// without physically deleting segments (active-segment case) or to
    /// simulate retention-driven truncation in integration tests. KIP-405's
    /// `local_log_start_offset` co-advances with this pointer, so
    /// [`Log::local_log_start_offset`] delegates here — there is a single
    /// source of truth.
    log_start_override: Option<i64>,

    /// Last-Stable-Offset: the offset before the first record of any
    /// in-flight transaction. Equals `log_end_offset()` when no
    /// transactions are in flight.
    lso: i64,

    /// In-flight transactions: `producer_id` → first offset of this
    /// producer's currently-open txn. An entry is removed when a control
    /// (commit/abort) marker for that `producer_id` is applied.
    pending: HashMap<i64, i64>,

    /// The active segment's `TxnIndex` (aborted-transaction index).
    /// Reopened whenever the active segment changes (roll, truncation,
    /// reset).
    active_txn_index: TxnIndex,

    /// Per-partition leader-epoch checkpoint. Shared across segments —
    /// epoch history accumulates over the log's lifetime.
    epoch_checkpoint: LeaderEpochCheckpoint,
}
61
/// Result of [`Log::read`]: the absolute offset of the first batch
/// returned and the batches themselves.
///
/// When no batches are returned, `start_offset` falls back to the requested
/// offset — or to `log_end_offset()` when the request was at or past the
/// log end — so callers can resume from the value without special-casing
/// emptiness.
#[derive(Debug)]
pub struct ReadOutput {
    /// Absolute base offset of the first batch in [`Self::batches`]; see the
    /// type-level docs for the value used when `batches` is empty.
    pub start_offset: i64,
    /// Decoded batches in offset order. May be empty if the log has no
    /// data at or after the requested offset.
    pub batches: Vec<RecordBatch>,
}
77
/// Verbatim, decode-free output of [`Log::read_raw`]: the raw `.log` bytes
/// of zero or more complete batches, plus the offset they start at.
#[derive(Debug, Clone)]
pub struct RawRead {
    /// Absolute offset of the first batch in [`Self::bytes`], or the
    /// requested fetch offset when no bytes were returned.
    pub start_offset: i64,
    /// Verbatim `.log` bytes — zero or more complete v2 batches. Chunks read
    /// from several segments are concatenated, so the bytes may span
    /// segment boundaries.
    pub bytes: Bytes,
    /// Length of [`Self::bytes`] in bytes (always `bytes.len()`).
    pub total: usize,
}
90
91impl RawRead {
92    fn empty(off: i64) -> Self {
93        Self {
94            start_offset: off,
95            bytes: Bytes::new(),
96            total: 0,
97        }
98    }
99}
100
/// A sealed segment described for tiered-storage offload (KIP-405).
///
/// Carries the on-disk file paths plus the offset / timestamp / size
/// metadata and the leader-epoch ranges a `RemoteLogManager` needs to build
/// remote-segment metadata. Produced by [`Log::tierable_segments`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentExport {
    /// First absolute offset in the segment.
    pub base_offset: i64,
    /// Last absolute offset (inclusive) in the segment.
    pub last_offset: i64,
    /// Highest record timestamp in the segment, or `-1` when unknown
    /// (a sealed segment loaded from disk without a tail scan).
    pub max_timestamp: i64,
    /// `.log` file size in bytes.
    pub size_bytes: u64,
    /// Path to the `.log` data file.
    pub log_path: PathBuf,
    /// Path to the `.index` (offset index) file.
    pub offset_index_path: PathBuf,
    /// Path to the `.timeindex` file.
    pub time_index_path: PathBuf,
    /// Path to the `.txnindex` file; `Some` only when that file exists on
    /// disk.
    pub transaction_index_path: Option<PathBuf>,
    /// Leader epochs whose coverage overlaps `[base_offset, last_offset]`,
    /// as `(epoch, start_offset)` with `start_offset` clamped to
    /// `base_offset`, ordered by offset. May be empty when no epochs were
    /// recorded for this log.
    pub leader_epochs: Vec<(i32, i64)>,
}
129
130impl Log {
131    /// Open or create a `Log` at `dir`. Discovers existing segments by
132    /// `.log` filename, marks all but the latest as sealed, and (if the
133    /// directory is empty) creates a fresh active segment at offset 0.
134    pub fn open(dir: impl AsRef<Path>, config: LogConfig) -> Result<Self, LogError> {
135        let dir = dir.as_ref().to_path_buf();
136        fs::create_dir_all(&dir)?;
137
138        // Heal any orphaned compaction `.swap` files before
139        // we scan the directory for segments.
140        crate::recovery::swap_orphan_recover(&dir)?;
141
142        let mut base_offsets: Vec<i64> = Vec::new();
143        for entry in fs::read_dir(&dir)? {
144            let entry = entry?;
145            let Ok(file_name) = entry.file_name().into_string() else {
146                continue; // non-UTF-8 names: ignore (unlikely)
147            };
148            if let Ok(base) = name::parse_log_filename(&file_name) {
149                base_offsets.push(base);
150            }
151        }
152        base_offsets.sort_unstable();
153        base_offsets.dedup();
154
155        let mut segments: Vec<Segment> = Vec::with_capacity(base_offsets.len());
156        let mut active: Option<Segment> = None;
157        for (i, base) in base_offsets.iter().enumerate() {
158            if i + 1 < base_offsets.len() {
159                let mut seg = Segment::open(&dir, *base)?;
160                seg.seal();
161                segments.push(seg);
162            } else {
163                active = Some(Segment::open_active(&dir, *base, config.validate_on_open)?);
164            }
165        }
166
167        let active = match active {
168            Some(s) => s,
169            None => Segment::create(&dir, 0)?,
170        };
171
172        let active_txn_index = TxnIndex::open(active.txn_index_path())?;
173        let epoch_checkpoint = LeaderEpochCheckpoint::open(active.leader_epoch_checkpoint_path())?;
174        // LSO starts at log_end_offset(); computed before moving `active`.
175        let lso = active.last_offset() + 1;
176
177        let config = std::sync::Arc::new(std::sync::RwLock::new(config));
178
179        Ok(Self {
180            dir,
181            config,
182            segments,
183            active: Some(active),
184            log_start_override: None,
185            lso,
186            pending: HashMap::new(),
187            active_txn_index,
188            epoch_checkpoint,
189        })
190    }
191
192    /// Directory this log was opened against. The broker's intra-broker
193    /// log-dir reassignment (KIP-113) reads this to determine the
194    /// current owning `log.dir` of a partition without re-implementing
195    /// the directory-layout convention.
196    #[must_use]
197    pub fn dir(&self) -> &Path {
198        &self.dir
199    }
200
201    /// First absolute offset still in the log.
202    #[must_use]
203    pub fn log_start_offset(&self) -> i64 {
204        let derived = if let Some(first) = self.segments.first() {
205            first.base_offset()
206        } else if let Some(active) = &self.active {
207            active.base_offset()
208        } else {
209            0
210        };
211        if let Some(o) = self.log_start_override {
212            return derived.max(o);
213        }
214        derived
215    }
216
217    /// Advance `log_start_offset` to `new_start`. Must be in
218    /// `[current log_start, log_end]`. Used by `trim_to_offset` for the
219    /// active-segment case and by the broker's `DeleteRecords` handler.
220    /// Does NOT physically truncate on-disk segments — only shifts the
221    /// in-memory start pointer.
222    ///
223    /// `new_start` must be non-negative.
224    ///
225    /// # Errors
226    ///
227    /// Returns [`LogError::InvalidArgument`] if `new_start` is negative.
228    pub fn set_log_start_offset(&mut self, new_start: i64) -> Result<(), LogError> {
229        if new_start < 0 {
230            return Err(LogError::InvalidArgument(
231                "set_log_start_offset: new_start must be >= 0".into(),
232            ));
233        }
234        self.log_start_override = Some(new_start);
235        Ok(())
236    }
237
238    /// Deprecated alias kept for existing test/feature-helpers callers.
239    #[deprecated(note = "use set_log_start_offset")]
240    #[cfg(any(test, feature = "test-helpers"))]
241    pub fn test_set_log_start_offset(&mut self, new_start: i64) -> Result<(), LogError> {
242        self.set_log_start_offset(new_start)
243    }
244
245    /// Reset the log to be empty starting at `new_base`. Drops every
246    /// segment + on-disk file and creates a fresh active segment at
247    /// `new_base`. Used by the replicator's `OFFSET_OUT_OF_RANGE`
248    /// recovery path when the follower has fallen behind the leader's
249    /// `log_start` — `truncate_to` can't help here because we need to
250    /// move `log_start` *forward* past where there is no local data.
251    pub fn reset_to(&mut self, new_base: i64) -> Result<(), LogError> {
252        if new_base < 0 {
253            return Err(LogError::OffsetMismatch {
254                expected: 0,
255                actual: new_base,
256            });
257        }
258
259        // Drop every sealed segment + its on-disk files.
260        while let Some(popped) = self.segments.pop() {
261            let base = popped.base_offset();
262            drop(popped);
263            let _ = fs::remove_file(name::log_path(&self.dir, base));
264            let _ = fs::remove_file(name::index_path(&self.dir, base));
265            let _ = fs::remove_file(name::timeindex_path(&self.dir, base));
266        }
267
268        // Drop the active segment + its on-disk files.
269        if let Some(active) = self.active.take() {
270            let base = active.base_offset();
271            drop(active);
272            let _ = fs::remove_file(name::log_path(&self.dir, base));
273            let _ = fs::remove_file(name::index_path(&self.dir, base));
274            let _ = fs::remove_file(name::timeindex_path(&self.dir, base));
275        }
276
277        // Clear the start override so the derived value takes over.
278        self.log_start_override = None;
279
280        let new_active = Segment::create(&self.dir, new_base)?;
281        self.active_txn_index = TxnIndex::open(new_active.txn_index_path())?;
282        self.pending.clear(); // reset_to is a hard reset (after divergence)
283        self.lso = new_active.last_offset() + 1; // = new_base (empty segment)
284        self.active = Some(new_active);
285        Ok(())
286    }
287
288    /// Next offset that `append` will assign.
289    #[must_use]
290    pub fn log_end_offset(&self) -> i64 {
291        if let Some(active) = &self.active {
292            return active.last_offset() + 1;
293        }
294        0
295    }
296
297    /// Total `.log` byte size across sealed and active segments. Read from
298    /// the segments' tracked logical size rather than a filesystem stat,
299    /// so it reflects buffered appends immediately and consistently across
300    /// platforms (a directory stat can lag an open, unflushed write handle
301    /// on some OSes).
302    #[must_use]
303    pub fn size_bytes(&self) -> u64 {
304        let sealed: u64 = self
305            .segments
306            .iter()
307            .map(super::segment::Segment::size_bytes)
308            .sum();
309        sealed + self.active.as_ref().map_or(0, Segment::size_bytes)
310    }
311
312    /// Last-Stable-Offset: the highest offset that consumers in
313    /// `read_committed` isolation may see. Advances only when no
314    /// transactions are in flight; held back at the first offset of any
315    /// open (uncommitted/unaborted) transactional batch.
316    #[must_use]
317    pub fn lso(&self) -> i64 {
318        self.lso
319    }
320
321    /// Close all segments. Drop runs automatically when `self` moves;
322    /// this method just names the operation explicitly.
323    pub fn close(self) {
324        drop(self);
325    }
326
327    /// Atomically swap the active `LogConfig`. The next retention/roll check
328    /// reads the new value; in-flight `append` calls hold the lock for
329    /// trivially short windows and will not see a half-applied config.
330    ///
331    /// Callable through `&self` (the `Arc<RwLock<…>>` wrapping lets us
332    /// mutate the inner value without an exclusive borrow on the `Log`).
333    pub fn set_config(&self, new: LogConfig) {
334        *self.config.write().unwrap() = new;
335    }
336
337    /// Snapshot the current config. Allocates a clone; cheap because
338    /// `LogConfig` is small and `Clone`.
339    #[must_use]
340    pub fn config_snapshot(&self) -> LogConfig {
341        self.config.read().unwrap().clone()
342    }
343
344    /// Return all aborted transactions from the active segment's
345    /// `.txnindex` whose offset range overlaps `[start, end)`.
346    ///
347    /// Only the active segment's index is consulted (older sealed
348    /// segments' `.txnindex` files are not loaded into
349    /// memory). The window `[fetch_offset, lso)` always falls within
350    /// the active segment in practice because LSO can only advance past
351    /// a committed/aborted marker, which lands in the same segment as
352    /// the corresponding transactional batches.
353    #[must_use]
354    pub fn aborted_in_range(&self, start: i64, end: i64) -> Vec<crate::txn_index::AbortedTxn> {
355        self.active_txn_index
356            .aborted_in_range(start, end)
357            .copied()
358            .collect()
359    }
360
361    /// Append a `RecordBatch`. The batch's `base_offset` is overwritten
362    /// by the log to be the next assigned offset; `last_offset_delta`
363    /// determines how many absolute offsets this batch consumes.
364    /// Returns the assigned `base_offset`.
365    pub fn append(&mut self, batch: &mut RecordBatch) -> Result<i64, LogError> {
366        let leader_epoch = batch.partition_leader_epoch;
367        let assigned_base = self.log_end_offset();
368        batch.base_offset = assigned_base;
369        self.append_preserving_offset(batch)?;
370        // Record epoch transition when the epoch is valid and exceeds the
371        // previously recorded epoch (or no epoch has been recorded yet).
372        if leader_epoch >= 0
373            && self
374                .epoch_checkpoint
375                .latest_epoch()
376                .is_none_or(|e| leader_epoch > e)
377        {
378            self.epoch_checkpoint.append(leader_epoch, assigned_base)?;
379        }
380        Ok(assigned_base)
381    }
382
383    /// Access the per-partition leader-epoch checkpoint.
384    #[must_use]
385    pub fn epoch_checkpoint(&self) -> &LeaderEpochCheckpoint {
386        &self.epoch_checkpoint
387    }
388
389    /// Append a `RecordBatch` whose `base_offset` is set by the caller.
390    ///
391    /// Unlike [`Log::append`], this does NOT overwrite `batch.base_offset`
392    /// — it is used by the broker's replicator to preserve the
393    /// leader-assigned offset on the follower's local log.
394    ///
395    /// `offset` must equal the log's current [`Log::log_end_offset`];
396    /// otherwise this returns
397    /// [`LogError::OffsetMismatch`]. On success, `batch.base_offset` is
398    /// set to `offset` (it should already match) before the batch is
399    /// written.
400    pub fn append_at(&mut self, batch: &mut RecordBatch, offset: i64) -> Result<(), LogError> {
401        let expected = self.log_end_offset();
402        if offset != expected {
403            return Err(LogError::OffsetMismatch {
404                expected,
405                actual: offset,
406            });
407        }
408        let leader_epoch = batch.partition_leader_epoch;
409        batch.base_offset = offset;
410        self.append_preserving_offset(batch)?;
411        // Mirror the leader-side epoch bookkeeping in [`Log::append`]: record the
412        // batch's leader epoch when it advances past the latest recorded epoch,
413        // so a follower's leader-epoch checkpoint tracks replicated epochs.
414        if leader_epoch >= 0
415            && self
416                .epoch_checkpoint
417                .latest_epoch()
418                .is_none_or(|e| leader_epoch > e)
419        {
420            self.epoch_checkpoint.append(leader_epoch, offset)?;
421        }
422        Ok(())
423    }
424
425    /// Internal helper shared by [`Log::append`] and [`Log::append_at`].
426    /// Performs segment-roll-if-needed, appends to the active segment, and
427    /// honors `config.flush_on_append` — but does NOT reassign
428    /// `batch.base_offset`. Callers are responsible for setting it first.
429    /// Also updates LSO and the active `.txnindex` based on batch attributes.
430    fn append_preserving_offset(&mut self, batch: &mut RecordBatch) -> Result<(), LogError> {
431        let (segment_bytes, index_interval_bytes, flush_on_append) = {
432            let cfg = self.config.read().unwrap();
433            (
434                cfg.segment_bytes,
435                cfg.index_interval_bytes,
436                cfg.flush_on_append,
437            )
438        };
439
440        let should_roll = match &self.active {
441            Some(seg) => seg.size_bytes() >= segment_bytes,
442            None => false,
443        };
444        if should_roll {
445            self.roll_active_segment()?;
446        }
447
448        let active = self
449            .active
450            .as_mut()
451            .expect("active segment must exist after Log::open");
452        active.append(batch, index_interval_bytes)?;
453
454        if flush_on_append {
455            active.flush()?;
456        }
457
458        // --- LSO tracking + .txnindex writes ---
459        let pid = batch.producer_id;
460        if batch.attributes.is_control_batch() {
461            // Parse the inner control record: key = (version: i16, type: i16) BE.
462            // type=0 → ABORT; type=1 → COMMIT.
463            let marker_type = batch
464                .records
465                .first()
466                .and_then(|r| r.key.as_deref())
467                .and_then(parse_control_marker_type);
468            if let Some(start) = self.pending.remove(&pid) {
469                let last = batch.base_offset + i64::from(batch.last_offset_delta);
470                if marker_type == Some(0)
471                /* ABORT */
472                {
473                    self.active_txn_index.append(AbortedTxn {
474                        start_offset: start,
475                        last_offset: last,
476                        producer_id: pid,
477                    })?;
478                }
479            }
480            // LSO can advance only when no pending txns remain.
481            if self.pending.is_empty() {
482                self.lso = self.log_end_offset();
483            }
484        } else if batch.attributes.is_transactional() && pid >= 0 {
485            // Record the first offset of this txn on this partition.
486            self.pending.entry(pid).or_insert(batch.base_offset);
487            // LSO stays where it is until commit/abort.
488        } else {
489            // Non-transactional batch. LSO advances only when no in-flight txns.
490            if self.pending.is_empty() {
491                self.lso = self.log_end_offset();
492            }
493        }
494
495        Ok(())
496    }
497
498    fn roll_active_segment(&mut self) -> Result<(), LogError> {
499        let new_base = self.log_end_offset();
500        let mut old = self
501            .active
502            .take()
503            .expect("active segment must exist before rolling");
504        old.seal();
505        self.segments.push(old);
506        let new_seg = Segment::create(&self.dir, new_base)?;
507        self.active_txn_index = TxnIndex::open(new_seg.txn_index_path())?;
508        self.active = Some(new_seg);
509        Ok(())
510    }
511
512    /// Read batches starting at `offset`, returning up to roughly
513    /// `max_bytes` of `.log` data. Walks sealed segments first, then the
514    /// active segment, so reads can span segment boundaries.
515    pub fn read(&self, offset: i64, max_bytes: usize) -> Result<ReadOutput, LogError> {
516        let log_start = self.log_start_offset();
517        let log_end = self.log_end_offset();
518        if offset < log_start {
519            return Err(LogError::OffsetTooLow {
520                requested: offset,
521                log_start,
522            });
523        }
524        if offset >= log_end {
525            return Ok(ReadOutput {
526                start_offset: log_end,
527                batches: Vec::new(),
528            });
529        }
530
531        let mut batches: Vec<RecordBatch> = Vec::new();
532        let mut current_offset = offset;
533        let mut remaining = max_bytes;
534
535        for seg in &self.segments {
536            if seg.last_offset() < current_offset {
537                continue;
538            }
539            let bs = seg.read(current_offset, remaining)?;
540            if !bs.is_empty() {
541                let consumed: usize = bs.iter().map(RecordBatch::encoded_len).sum();
542                remaining = remaining.saturating_sub(consumed);
543                let last = bs.last().expect("non-empty by branch");
544                current_offset = last.base_offset + i64::from(last.last_offset_delta) + 1;
545                batches.extend(bs);
546                if remaining == 0 {
547                    break;
548                }
549            }
550        }
551
552        if (remaining > 0 || batches.is_empty())
553            && let Some(active) = &self.active
554            && current_offset <= active.last_offset()
555        {
556            let bs = active.read(current_offset, remaining.max(1))?;
557            batches.extend(bs);
558        }
559
560        let start_offset = batches.first().map_or(offset, |b| b.base_offset);
561        Ok(ReadOutput {
562            start_offset,
563            batches,
564        })
565    }
566
567    /// Like [`Log::read`] but returns verbatim wire bytes (no decode), walking
568    /// sealed segments then the active segment. Includes only batches with
569    /// `base_offset < limit_offset`, up to roughly `max_bytes` (≥ one batch).
570    pub fn read_raw(
571        &self,
572        fetch_offset: i64,
573        limit_offset: i64,
574        max_bytes: usize,
575    ) -> Result<RawRead, LogError> {
576        let log_start = self.log_start_offset();
577        if fetch_offset < log_start {
578            return Err(LogError::OffsetTooLow {
579                requested: fetch_offset,
580                log_start,
581            });
582        }
583        if fetch_offset >= limit_offset {
584            return Ok(RawRead::empty(fetch_offset));
585        }
586
587        let mut chunks: Vec<Bytes> = Vec::new();
588        let mut start_offset = fetch_offset;
589        let mut current = fetch_offset;
590        let mut remaining = max_bytes;
591        let mut got_first = false;
592
593        for seg in &self.segments {
594            if seg.last_offset() < current {
595                continue;
596            }
597            let r: RawSegmentRead =
598                seg.read_raw(current, limit_offset, remaining.max(HEADER_LEN))?;
599            if !r.is_empty() {
600                if !got_first {
601                    start_offset = r.start_offset;
602                    got_first = true;
603                }
604                remaining = remaining.saturating_sub(r.bytes.len());
605                current = r.last_offset + 1;
606                chunks.push(r.bytes);
607                if remaining == 0 || current >= limit_offset {
608                    break;
609                }
610            }
611        }
612
613        if (remaining > 0 || !got_first)
614            && current < limit_offset
615            && let Some(active) = &self.active
616            && current <= active.last_offset()
617        {
618            let r = active.read_raw(current, limit_offset, remaining.max(HEADER_LEN))?;
619            if !r.is_empty() {
620                if !got_first {
621                    start_offset = r.start_offset;
622                }
623                chunks.push(r.bytes);
624            }
625        }
626
627        let bytes = match chunks.len() {
628            0 => Bytes::new(),
629            1 => chunks.pop().expect("len==1"),
630            _ => {
631                let total: usize = chunks.iter().map(Bytes::len).sum();
632                let mut b = BytesMut::with_capacity(total);
633                for c in &chunks {
634                    b.extend_from_slice(c);
635                }
636                b.freeze()
637            }
638        };
639        let total = bytes.len();
640        Ok(RawRead {
641            start_offset,
642            bytes,
643            total,
644        })
645    }
646
647    /// Truncate the log so no records at offset `>= offset` remain. Used
648    /// by replication / leader election.
649    pub fn truncate_to(&mut self, offset: i64) -> Result<(), LogError> {
650        let log_start = self.log_start_offset();
651        let log_end = self.log_end_offset();
652        if offset >= log_end {
653            return Ok(()); // nothing to truncate
654        }
655        if offset < log_start {
656            return Err(LogError::OffsetTooLow {
657                requested: offset,
658                log_start,
659            });
660        }
661
662        // Drop sealed segments whose base_offset >= offset.
663        while let Some(last_sealed) = self.segments.last() {
664            if last_sealed.base_offset() >= offset {
665                let popped = self.segments.pop().expect("non-empty by while-let");
666                let base = popped.base_offset();
667                drop(popped);
668                let _ = fs::remove_file(name::log_path(&self.dir, base));
669                let _ = fs::remove_file(name::index_path(&self.dir, base));
670                let _ = fs::remove_file(name::timeindex_path(&self.dir, base));
671            } else {
672                break;
673            }
674        }
675
676        // Drop the active segment if its base_offset >= offset.
677        if let Some(active) = &self.active
678            && active.base_offset() >= offset
679        {
680            let base = active.base_offset();
681            self.active = None;
682            let _ = fs::remove_file(name::log_path(&self.dir, base));
683            let _ = fs::remove_file(name::index_path(&self.dir, base));
684            let _ = fs::remove_file(name::timeindex_path(&self.dir, base));
685        }
686
687        // If no active segment, promote the last sealed one (if any) and
688        // truncate it in place. Otherwise, create a fresh one at `offset`.
689        if self.active.is_none() {
690            if let Some(mut seg) = self.segments.pop() {
691                let rel = u32::try_from(offset - seg.base_offset())
692                    .map_err(|_| LogError::BadSegmentName("offset overflow".into()))?;
693                seg.truncate_to_relative(rel)?;
694                self.active_txn_index = TxnIndex::open(seg.txn_index_path())?;
695                self.active = Some(seg);
696            } else {
697                let new_seg = Segment::create(&self.dir, offset)?;
698                self.active_txn_index = TxnIndex::open(new_seg.txn_index_path())?;
699                self.active = Some(new_seg);
700            }
701        } else if let Some(active) = self.active.as_mut()
702            && active.last_offset() >= offset
703        {
704            // The surviving active segment contains records at or past
705            // `offset`; truncate them in place.
706            let rel = u32::try_from(offset - active.base_offset())
707                .map_err(|_| LogError::BadSegmentName("offset overflow".into()))?;
708            active.truncate_to_relative(rel)?;
709            self.active_txn_index = TxnIndex::open(active.txn_index_path())?;
710        }
711        // After truncation, LSO can't exceed log_end_offset.
712        self.lso = self.lso.min(self.log_end_offset());
713        // Drop leader-epoch checkpoint entries for the truncated-away tail so
714        // latest_epoch()/end_offset_for_epoch() don't report epochs that no
715        // longer have records (mirrors Kafka's truncateFromEnd).
716        self.epoch_checkpoint
717            .truncate_from_end(self.log_end_offset())?;
718        Ok(())
719    }
720
721    /// Trim from the start of the log: drop every sealed segment whose
722    /// last offset is `< target`, advance `log_start_offset` if `target`
723    /// falls inside the active segment. Active segment is never deleted
724    /// by this call. Returns the resulting `log_start_offset`.
725    ///
726    /// `target` is clamped to `[0, log_end_offset()]`. Caller asks for
727    /// trim past LEO → trim to LEO.
728    ///
729    /// # Errors
730    ///
731    /// Returns `LogError::InvalidArgument` if `target < 0`.
732    pub fn trim_to_offset(&mut self, target: i64) -> Result<i64, LogError> {
733        if target < 0 {
734            return Err(LogError::InvalidArgument(
735                "trim_to_offset: target must be >= 0".into(),
736            ));
737        }
738        let leo = self.log_end_offset();
739        let target = target.min(leo);
740        let log_start = self.log_start_offset();
741        if target <= log_start {
742            return Ok(log_start);
743        }
744
745        // Drop sealed segments whose last record is < target. A sealed
746        // segment covers [base_offset, next_segment_base_offset). The
747        // "last offset" of a sealed segment equals `next_base - 1`
748        // where `next_base` is the next segment's `base_offset`
749        // (or, for the most-recent sealed segment, the active segment's
750        // `base_offset`).
751        let active_base = self.active.as_ref().map_or(leo, Segment::base_offset);
752        let next_bases: Vec<i64> = self
753            .segments
754            .iter()
755            .map(Segment::base_offset)
756            .skip(1)
757            .chain(std::iter::once(active_base))
758            .collect();
759
760        let mut to_drop: Vec<i64> = Vec::new();
761        for (seg, next_base) in self.segments.iter().zip(next_bases.iter()) {
762            if *next_base <= target {
763                to_drop.push(seg.base_offset());
764            } else {
765                break;
766            }
767        }
768
769        let drop_set: HashSet<i64> = to_drop.iter().copied().collect();
770        self.segments
771            .retain(|s| !drop_set.contains(&s.base_offset()));
772        for base in &to_drop {
773            let _ = retention::delete_segment_files(&self.dir, *base);
774        }
775
776        // If target falls inside the active segment (or between the first
777        // remaining sealed segment's base and `target`), advance the
778        // start override.
779        let new_log_start = self
780            .segments
781            .first()
782            .map_or(active_base, Segment::base_offset);
783        if target > new_log_start {
784            self.set_log_start_offset(target)?;
785        }
786        Ok(self.log_start_offset())
787    }
788
789    /// Periodic maintenance: apply time- and size-based retention to the
790    /// sealed segments. The active segment is never deleted, and if every
791    /// segment would otherwise be evicted we retain at least one.
792    /// (Active-roll-on-age is a placeholder per the plan; skip it.)
793    pub fn tick(&mut self, now: SystemTime) -> Result<(), LogError> {
794        // Tiered topics' segment lifecycle is owned by the RemoteLogManager.
795        if self.config.read().unwrap().remote_storage_enable {
796            return Ok(());
797        }
798        let sealed_refs: Vec<&Segment> = self.segments.iter().collect();
799        let active_size = self.active.as_ref().map_or(0, Segment::size_bytes);
800
801        let cfg_guard = self.config.read().unwrap();
802        let time_evict = retention::time_based_evict(&sealed_refs, &cfg_guard, now);
803        let size_evict = retention::size_based_evict(&sealed_refs, active_size, &cfg_guard);
804        drop(cfg_guard);
805
806        // Union preserving order: time first (oldest first), then size.
807        let mut to_evict: Vec<i64> = time_evict;
808        let mut seen: HashSet<i64> = to_evict.iter().copied().collect();
809        for base in size_evict {
810            if seen.insert(base) {
811                to_evict.push(base);
812            }
813        }
814
815        // Guard: never drop the only remaining segment. `total_segments`
816        // includes the active one.
817        let total_segments = self.segments.len() + usize::from(self.active.is_some());
818        if to_evict.len() >= total_segments {
819            to_evict.truncate(total_segments.saturating_sub(1));
820        }
821
822        let evict: HashSet<i64> = to_evict.iter().copied().collect();
823        self.segments.retain(|s| !evict.contains(&s.base_offset()));
824        for base in to_evict {
825            let _ = retention::delete_segment_files(&self.dir, base);
826        }
827        Ok(())
828    }
829
830    /// First absolute offset still present on this
831    /// broker's local disk (KIP-405). This delegates to
832    /// [`Log::log_start_offset`] — the two pointers co-advance.
833    #[must_use]
834    pub fn local_log_start_offset(&self) -> i64 {
835        self.log_start_offset()
836    }
837
838    /// Earliest local `(offset, record_timestamp)` whose record
839    /// timestamp is `>= target_ts`, searching sealed segments
840    /// oldest-first then the active segment. The first segment whose
841    /// `max_timestamp >= target_ts` holds the answer; the per-segment
842    /// helper does the index lookup + forward scan. `None` when no
843    /// local record qualifies (including an empty log).
844    #[must_use]
845    pub fn offset_for_timestamp(&self, target_ts: i64) -> Option<(i64, i64)> {
846        for seg in &self.segments {
847            if seg.max_timestamp() >= target_ts
848                && let Some(hit) = seg.offset_for_timestamp(target_ts)
849            {
850                return Some(hit);
851            }
852        }
853        if let Some(active) = &self.active
854            && active.max_timestamp() >= target_ts
855        {
856            return active.offset_for_timestamp(target_ts);
857        }
858        None
859    }
860
861    /// Offset and timestamp of the record carrying the partition's
862    /// largest timestamp, scanning sealed segments then the active
863    /// segment. Ties resolve to the earliest offset (the first segment,
864    /// and the first record within it, wins). Returns `None` when the
865    /// log holds no records.
866    #[must_use]
867    pub fn max_timestamp_offset_and_ts(&self) -> Option<(i64, i64)> {
868        let mut best: Option<(i64, i64)> = None; // (timestamp, offset)
869        let candidates = self.segments.iter().chain(self.active.as_ref());
870        for seg in candidates {
871            if let Some((offset, ts)) = seg.offset_of_max_timestamp()
872                && best.is_none_or(|(best_ts, _)| ts > best_ts)
873            {
874                best = Some((ts, offset));
875            }
876        }
877        best.map(|(ts, offset)| (offset, ts))
878    }
879
880    /// Offset of the record carrying the partition's largest timestamp,
881    /// or `log_start_offset()` when the log holds no records (KIP-734
882    /// `MAX_TIMESTAMP`).
883    #[must_use]
884    pub fn offset_of_max_timestamp(&self) -> i64 {
885        self.max_timestamp_offset_and_ts()
886            .map_or_else(|| self.log_start_offset(), |(offset, _)| offset)
887    }
888
889    /// Physically delete every sealed segment whose
890    /// `last_offset < target`, then advance `log_start_offset` to `target`
891    /// (KIP-405). The active segment is never touched. Returns the count of
892    /// segments removed; a no-op (returns `Ok(0)`) when
893    /// `target <= local_log_start_offset()`.
894    ///
895    /// The caller is responsible for verifying these segments are safely
896    /// in the remote tier (`CopySegmentFinished`) before invoking this;
897    /// `Log` enforces no tiered-storage invariants. See
898    /// `crates/broker/src/remote_log_manager.rs` for the production caller.
899    ///
900    /// # Errors
901    ///
902    /// Returns [`LogError::InvalidArgument`] if `target` is negative.
903    pub fn delete_local_segments_through(&mut self, target: i64) -> Result<usize, LogError> {
904        if target < 0 {
905            return Err(LogError::InvalidArgument(
906                "delete_local_segments_through: target must be >= 0".into(),
907            ));
908        }
909        if target <= self.local_log_start_offset() {
910            return Ok(0);
911        }
912
913        // Mirror `tierable_segments`: each sealed segment's last offset is
914        // `next.base_offset - 1`, where `next` is the next sealed segment
915        // or — for the most-recent sealed segment — the active segment.
916        let active_base = self
917            .active
918            .as_ref()
919            .map_or_else(|| self.log_end_offset(), Segment::base_offset);
920        let next_bases: Vec<i64> = self
921            .segments
922            .iter()
923            .map(Segment::base_offset)
924            .skip(1)
925            .chain(std::iter::once(active_base))
926            .collect();
927
928        let to_drop: Vec<i64> = self
929            .segments
930            .iter()
931            .zip(next_bases.iter())
932            .filter_map(|(seg, next_base)| {
933                let last = *next_base - 1;
934                (last < target).then(|| seg.base_offset())
935            })
936            .collect();
937
938        let removed = to_drop.len();
939        let drop_set: HashSet<i64> = to_drop.iter().copied().collect();
940        self.segments
941            .retain(|s| !drop_set.contains(&s.base_offset()));
942        for base in &to_drop {
943            let _ = retention::delete_segment_files(&self.dir, *base);
944        }
945
946        // Advance the (single) log-start pointer. `local_log_start_offset`
947        // delegates here, so the local floor moves in lockstep.
948        self.log_start_override = Some(target);
949
950        Ok(removed)
951    }
952
953    /// Describe every sealed segment for
954    /// tiered-storage offload (KIP-405). The active segment is never included — only
955    /// sealed segments are immutable and safe to copy.
956    ///
957    /// `last_offset` is derived from the next segment's `base_offset` (the
958    /// active segment's base for the most-recent sealed segment), so it is
959    /// correct even for segments loaded from disk without a tail scan.
960    /// `max_timestamp` falls back to `-1` (unknown) when the in-memory
961    /// value has not been populated.
962    #[must_use]
963    pub fn tierable_segments(&self) -> Vec<SegmentExport> {
964        // Sort the epoch entries once here rather than per-segment inside
965        // `epochs_for_range`.
966        let mut epoch_entries = self.epoch_checkpoint.entries().to_vec();
967        epoch_entries.sort_by_key(|e| e.start_offset);
968        let active_base = self
969            .active
970            .as_ref()
971            .map_or_else(|| self.log_end_offset(), Segment::base_offset);
972        let next_bases: Vec<i64> = self
973            .segments
974            .iter()
975            .map(Segment::base_offset)
976            .skip(1)
977            .chain(std::iter::once(active_base))
978            .collect();
979
980        self.segments
981            .iter()
982            .zip(next_bases)
983            .map(|(seg, next_base)| {
984                let base = seg.base_offset();
985                let last = next_base - 1;
986                let max_ts = seg.max_timestamp();
987                let txn = name::txnindex_path(&self.dir, base);
988                SegmentExport {
989                    base_offset: base,
990                    last_offset: last,
991                    max_timestamp: if max_ts == i64::MIN { -1 } else { max_ts },
992                    size_bytes: seg.size_bytes(),
993                    log_path: name::log_path(&self.dir, base),
994                    offset_index_path: name::index_path(&self.dir, base),
995                    time_index_path: name::timeindex_path(&self.dir, base),
996                    transaction_index_path: txn.exists().then_some(txn),
997                    leader_epochs: epochs_for_range(&epoch_entries, base, last),
998                }
999            })
1000            .collect()
1001    }
1002
1003    /// Run one compaction pass over the sealed segment list. No-op if
1004    /// fewer than 2 sealed segments exist (nothing to dedup yet).
1005    ///
1006    /// The active segment is never touched. Output is a single new
1007    /// sealed segment at the lowest input base offset, replacing all
1008    /// consumed sealed segments.
1009    pub fn compact(&mut self) -> Result<(), LogError> {
1010        if self.segments.is_empty() {
1011            return Ok(());
1012        }
1013
1014        let cfg_guard = self.config.read().unwrap();
1015        if cfg_guard.cleanup_policy != crate::CleanupPolicy::Compact {
1016            return Ok(());
1017        }
1018        let index_interval = cfg_guard.index_interval_bytes;
1019        drop(cfg_guard);
1020
1021        let consumed_bases: Vec<i64> = self.segments.iter().map(Segment::base_offset).collect();
1022
1023        // Borrow sealed segments to run map + rewrite (which open
1024        // additional file handles internally for reading). Then drop the
1025        // borrows and clear self.segments so the original segments'
1026        // file handles close before atomic_swap deletes/renames
1027        // (Windows requires no open handle on a file before remove/rename).
1028        let rewrite = {
1029            let sealed_refs: Vec<&Segment> = self.segments.iter().collect();
1030            let offset_map = crate::compact::build_offset_map(&sealed_refs)?;
1031            crate::compact::rewrite_segments(&self.dir, &sealed_refs, &offset_map, index_interval)?
1032        };
1033
1034        self.segments.clear();
1035        crate::compact::atomic_swap(&self.dir, &consumed_bases, &rewrite)?;
1036
1037        // open_active(validate=true) tail-scans the new .log to populate
1038        // last_offset + max_timestamp; then seal() flips the flag.
1039        let mut new_seg = Segment::open_active(&self.dir, rewrite.new_base_offset, true)?;
1040        new_seg.seal();
1041        self.segments.push(new_seg);
1042        Ok(())
1043    }
1044}
1045
1046/// Leader epochs whose coverage `[start_e, start_{e+1})` overlaps the
1047/// segment range `[base, last]`, returned as `(epoch, start_offset)` with
1048/// the start clamped up to `base` and ordered by offset. An epoch with no
1049/// recorded entries yields an empty result.
1050///
1051/// `sorted` must be ordered by `start_offset` ascending (the caller sorts
1052/// once and reuses the slice across segments).
1053fn epochs_for_range(
1054    sorted: &[crate::leader_epoch_checkpoint::EpochEntry],
1055    base: i64,
1056    last: i64,
1057) -> Vec<(i32, i64)> {
1058    let mut out = Vec::new();
1059    for (i, e) in sorted.iter().enumerate() {
1060        // Coverage of this epoch is [start_offset, next.start_offset).
1061        let end = sorted.get(i + 1).map_or(i64::MAX, |n| n.start_offset);
1062        if e.start_offset <= last && end > base {
1063            out.push((e.epoch, e.start_offset.max(base)));
1064        }
1065    }
1066    out
1067}
1068
/// Extract the control-marker type from the key of a control batch's first
/// record.
///
/// The key is laid out as `(version: i16, type: i16)`, both big-endian. Only
/// the type is decoded (`0` = ABORT, `1` = COMMIT); the version and any
/// trailing bytes are ignored. Returns `None` for keys shorter than 4 bytes.
fn parse_control_marker_type(key: &[u8]) -> Option<i16> {
    match key {
        [_, _, hi, lo, ..] => Some(i16::from_be_bytes([*hi, *lo])),
        _ => None,
    }
}
1080
1081#[cfg(test)]
1082mod tests {
1083    use super::*;
1084    use crate::leader_epoch_checkpoint::EpochEntry;
1085    use assert2::assert;
1086    use bytes::Bytes;
1087    use crabka_protocol::records::{Attributes, Record};
1088    use tempfile::tempdir;
1089
1090    fn sample_batch(n: i32) -> RecordBatch {
1091        let mut b = RecordBatch {
1092            base_offset: 0, // overwritten by Log::append
1093            max_timestamp: 0,
1094            last_offset_delta: n - 1,
1095            ..RecordBatch::default()
1096        };
1097        for i in 0..n {
1098            b.records.push(Record {
1099                offset_delta: i,
1100                key: Some(Bytes::from(format!("k{i}"))),
1101                value: Some(Bytes::from(format!("v{i}"))),
1102                ..Default::default()
1103            });
1104        }
1105        b
1106    }
1107
1108    fn test_log() -> (tempfile::TempDir, Log) {
1109        let dir = tempdir().unwrap();
1110        let log = Log::open(dir.path(), LogConfig::default()).unwrap();
1111        (dir, log)
1112    }
1113
1114    fn test_batch_at(_off: i64) -> RecordBatch {
1115        // `Log::append` overwrites `base_offset`; one record per batch.
1116        let mut b = RecordBatch {
1117            base_offset: 0,
1118            base_timestamp: 1_000,
1119            max_timestamp: 1_000,
1120            last_offset_delta: 0,
1121            ..RecordBatch::default()
1122        };
1123        b.records.push(Record {
1124            offset_delta: 0,
1125            value: Some(Bytes::from("v")),
1126            ..Default::default()
1127        });
1128        b
1129    }
1130
1131    #[test]
1132    fn log_read_raw_spans_and_is_byte_exact() {
1133        let (dir, mut log) = test_log();
1134        let mut wire = bytes::BytesMut::new();
1135        for off in 0..4i64 {
1136            let mut b = test_batch_at(off);
1137            log.append(&mut b).unwrap();
1138            b.encode(&mut wire).unwrap();
1139        }
1140        let wire = wire.freeze();
1141        let log_end = log.log_end_offset();
1142        let r = log.read_raw(0, log_end, 10 * 1024 * 1024).unwrap();
1143        assert!(r.start_offset == 0);
1144        assert!(r.total == wire.len());
1145        assert!(&r.bytes[..] == &wire[..]);
1146        drop(dir);
1147    }
1148
1149    #[test]
1150    fn log_read_raw_spans_multiple_segments() {
1151        // A tiny `segment_bytes` forces a roll partway through, so the
1152        // read must walk at least one sealed segment AND the active
1153        // segment — exercising the multi-chunk `BytesMut` concat path
1154        // that `log_read_raw_spans_and_is_byte_exact` (default ~1 GiB
1155        // segments) never reaches.
1156        let dir = tempdir().unwrap();
1157        let config = LogConfig {
1158            segment_bytes: 100, // tiny: roll after roughly each batch
1159            ..LogConfig::default()
1160        };
1161        let mut log = Log::open(dir.path(), config).unwrap();
1162
1163        let n: i64 = 6;
1164        let mut wire = bytes::BytesMut::new();
1165        let mut expected_bases = Vec::new();
1166        for off in 0..n {
1167            let mut b = test_batch_at(off);
1168            let base = log.append(&mut b).unwrap();
1169            expected_bases.push(base);
1170            b.encode(&mut wire).unwrap();
1171        }
1172        let wire = wire.freeze();
1173
1174        // The roll must actually have happened: at least one sealed
1175        // segment plus the active segment.
1176        assert!(
1177            !log.segments.is_empty(),
1178            "expected >=1 sealed segment (segment roll); got 0"
1179        );
1180        assert!(log.active.is_some());
1181
1182        let log_end = log.log_end_offset();
1183        let r = log.read_raw(0, log_end, 10 * 1024 * 1024).unwrap();
1184        assert!(r.start_offset == 0);
1185        assert!(r.total == wire.len());
1186        assert!(
1187            &r.bytes[..] == &wire[..],
1188            "raw bytes must be byte-exact across the segment seam"
1189        );
1190
1191        // Decode back to N batches with the expected base offsets.
1192        let mut cur: &[u8] = &r.bytes;
1193        let mut bases = Vec::new();
1194        while !cur.is_empty() {
1195            let b = crabka_protocol::records::RecordBatch::decode(&mut cur).unwrap();
1196            bases.push(b.base_offset);
1197        }
1198        assert!(bases == expected_bases);
1199        drop(dir);
1200    }
1201
1202    #[test]
1203    fn open_empty_dir_creates_first_segment() {
1204        let dir = tempdir().unwrap();
1205        let log = Log::open(dir.path(), LogConfig::default()).unwrap();
1206        assert!(log.log_start_offset() == 0);
1207        assert!(log.log_end_offset() == 0);
1208        log.close();
1209    }
1210
1211    #[test]
1212    fn dir_returns_open_path() {
1213        // The broker's KIP-113 move machinery reads this back to
1214        // determine a partition's current owning `log.dir` without
1215        // re-implementing the directory-layout convention.
1216        let dir = tempdir().unwrap();
1217        let log = Log::open(dir.path(), LogConfig::default()).unwrap();
1218        assert!(log.dir() == dir.path());
1219    }
1220
1221    #[test]
1222    fn open_creates_log_file() {
1223        let dir = tempdir().unwrap();
1224        let log = Log::open(dir.path(), LogConfig::default()).unwrap();
1225        drop(log);
1226        let log_path = dir.path().join("00000000000000000000.log");
1227        assert!(log_path.exists());
1228    }
1229
1230    #[test]
1231    fn append_assigns_monotonic_offsets() {
1232        let dir = tempdir().unwrap();
1233        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1234        let mut b1 = sample_batch(3);
1235        let mut b2 = sample_batch(2);
1236        assert!(log.append(&mut b1).unwrap() == 0);
1237        assert!(log.append(&mut b2).unwrap() == 3);
1238        assert!(log.log_end_offset() == 5);
1239    }
1240
1241    #[test]
1242    fn append_at_matching_offset_preserves_caller_offset() {
1243        let dir = tempdir().unwrap();
1244        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1245        let mut b = sample_batch(3);
1246        // Pretend the caller (a replicator) already knows the leader's
1247        // assigned offset for this batch is 0.
1248        log.append_at(&mut b, 0).unwrap();
1249        assert!(b.base_offset == 0);
1250        assert!(log.log_end_offset() == 3);
1251
1252        let mut b2 = sample_batch(2);
1253        log.append_at(&mut b2, 3).unwrap();
1254        assert!(b2.base_offset == 3);
1255        assert!(log.log_end_offset() == 5);
1256    }
1257
1258    #[test]
1259    fn append_at_with_mismatched_offset_errors() {
1260        let dir = tempdir().unwrap();
1261        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1262        let mut b = sample_batch(2);
1263        let err = log.append_at(&mut b, 7).unwrap_err();
1264        assert!(matches!(
1265            err,
1266            LogError::OffsetMismatch {
1267                expected: 0,
1268                actual: 7
1269            }
1270        ));
1271        // Failure must not advance the log.
1272        assert!(log.log_end_offset() == 0);
1273    }
1274
1275    #[test]
1276    fn append_then_read_back_in_order() {
1277        let dir = tempdir().unwrap();
1278        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1279        for _ in 0..3 {
1280            let mut b = sample_batch(2);
1281            log.append(&mut b).unwrap();
1282        }
1283        let out = log.read(0, usize::MAX).unwrap();
1284        assert!(out.batches.len() == 3);
1285        assert!(out.start_offset == 0);
1286    }
1287
1288    #[test]
1289    fn read_offset_too_low_errors() {
1290        let dir = tempdir().unwrap();
1291        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1292        let mut b = sample_batch(2);
1293        log.append(&mut b).unwrap();
1294        assert!(matches!(
1295            log.read(-1, 1024),
1296            Err(LogError::OffsetTooLow { .. })
1297        ));
1298    }
1299
1300    #[test]
1301    fn read_at_log_end_returns_empty() {
1302        let dir = tempdir().unwrap();
1303        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1304        let mut b = sample_batch(2);
1305        log.append(&mut b).unwrap();
1306        let out = log.read(log.log_end_offset(), 1024).unwrap();
1307        assert!(out.batches.is_empty());
1308    }
1309
1310    #[test]
1311    fn truncate_to_drops_later_records() {
1312        let dir = tempdir().unwrap();
1313        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1314        let mut b1 = sample_batch(3);
1315        let mut b2 = sample_batch(2);
1316        log.append(&mut b1).unwrap();
1317        log.append(&mut b2).unwrap();
1318        assert!(log.log_end_offset() == 5);
1319        log.truncate_to(3).unwrap();
1320        // First batch (offsets 0..=2) survives; last_offset == 2, end == 3.
1321        assert!(log.log_end_offset() == 3);
1322    }
1323
1324    #[test]
1325    fn truncate_to_log_end_is_noop() {
1326        let dir = tempdir().unwrap();
1327        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1328        let mut b = sample_batch(2);
1329        log.append(&mut b).unwrap();
1330        let before = log.log_end_offset();
1331        log.truncate_to(before + 100).unwrap();
1332        assert!(log.log_end_offset() == before);
1333    }
1334
1335    #[test]
1336    fn open_recovers_partial_trailing_batch() {
1337        let dir = tempdir().unwrap();
1338        {
1339            let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1340            let mut b1 = sample_batch(3);
1341            let mut b2 = sample_batch(2);
1342            log.append(&mut b1).unwrap();
1343            log.append(&mut b2).unwrap();
1344        }
1345        // Append 10 bytes of garbage to the .log file.
1346        let log_path = dir.path().join("00000000000000000000.log");
1347        let mut f = std::fs::OpenOptions::new()
1348            .append(true)
1349            .open(&log_path)
1350            .unwrap();
1351        std::io::Write::write_all(&mut f, &[0xAB; 10]).unwrap();
1352        f.sync_data().unwrap();
1353        drop(f);
1354        let log = Log::open(dir.path(), LogConfig::default()).unwrap();
1355        assert!(log.log_end_offset() == 5);
1356    }
1357
1358    #[test]
1359    fn tick_with_no_retention_is_noop() {
1360        let dir = tempdir().unwrap();
1361        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1362        let mut b1 = sample_batch(2);
1363        let mut b2 = sample_batch(3);
1364        log.append(&mut b1).unwrap();
1365        log.append(&mut b2).unwrap();
1366        let before = log.log_end_offset();
1367        log.tick(SystemTime::now()).unwrap();
1368        assert!(log.log_end_offset() == before);
1369    }
1370
1371    #[test]
1372    fn tick_never_deletes_only_segment() {
1373        use std::time::Duration;
1374        let dir = tempdir().unwrap();
1375        let config = LogConfig {
1376            retention_ms: Some(Duration::from_secs(1)),
1377            retention_bytes: Some(0),
1378            ..LogConfig::default()
1379        };
1380        let mut log = Log::open(dir.path(), config).unwrap();
1381        let mut b1 = sample_batch(2);
1382        log.append(&mut b1).unwrap();
1383        // Advance "now" 30 days into the future.
1384        let now = SystemTime::now() + Duration::from_hours(30 * 24);
1385        log.tick(now).unwrap();
1386        assert!(log.log_end_offset() == 2);
1387    }
1388
1389    #[test]
1390    fn segment_rolls_when_bytes_exceeded() {
1391        let dir = tempdir().unwrap();
1392        let config = LogConfig {
1393            segment_bytes: 200, // tiny so we roll fast
1394            ..LogConfig::default()
1395        };
1396        let mut log = Log::open(dir.path(), config).unwrap();
1397        for _ in 0..5 {
1398            let mut b = sample_batch(2);
1399            log.append(&mut b).unwrap();
1400        }
1401        // Multiple .log files should exist now.
1402        let log_files: Vec<_> = std::fs::read_dir(dir.path())
1403            .unwrap()
1404            .filter_map(Result::ok)
1405            .filter(|e| e.path().extension().and_then(|s| s.to_str()) == Some("log"))
1406            .collect();
1407        assert!(
1408            log_files.len() >= 2,
1409            "expected segment roll; got {} .log files",
1410            log_files.len()
1411        );
1412    }
1413
1414    // ---- helpers for transactional tests ----
1415
1416    /// A transactional (non-control) batch for the given pid/epoch containing `values`.
1417    fn transactional_batch(pid: i64, epoch: i16, values: &[&str]) -> RecordBatch {
1418        let last_offset_delta = i32::try_from(values.len()).unwrap() - 1;
1419        let mut records = Vec::new();
1420        for (i, v) in values.iter().enumerate() {
1421            records.push(Record {
1422                offset_delta: i32::try_from(i).unwrap(),
1423                value: Some(Bytes::from(v.to_string())),
1424                ..Default::default()
1425            });
1426        }
1427        RecordBatch {
1428            base_offset: 0, // overwritten by Log::append
1429            last_offset_delta,
1430            producer_id: pid,
1431            producer_epoch: epoch,
1432            attributes: Attributes::default().with_transactional(true),
1433            records,
1434            ..RecordBatch::default()
1435        }
1436    }
1437
1438    /// Build a 4-byte control-marker key: (version=0: i16, `marker_type`: i16) BE.
1439    fn control_key(marker_type: i16) -> Bytes {
1440        let mut buf = [0u8; 4];
1441        buf[0..2].copy_from_slice(&0i16.to_be_bytes()); // version = 0
1442        buf[2..4].copy_from_slice(&marker_type.to_be_bytes());
1443        Bytes::from(buf.to_vec())
1444    }
1445
1446    /// A commit control batch (`marker_type=1`) for the given pid/epoch.
1447    /// Offsets are rewritten by `Log::append`.
1448    fn commit_marker(pid: i64, epoch: i16) -> RecordBatch {
1449        RecordBatch {
1450            base_offset: 0,
1451            last_offset_delta: 0,
1452            producer_id: pid,
1453            producer_epoch: epoch,
1454            attributes: Attributes::default()
1455                .with_transactional(true)
1456                .with_control(true),
1457            records: vec![Record {
1458                offset_delta: 0,
1459                key: Some(control_key(1 /* COMMIT */)),
1460                ..Default::default()
1461            }],
1462            ..RecordBatch::default()
1463        }
1464    }
1465
1466    /// An abort control batch (`marker_type=0`) for the given pid/epoch.
1467    /// Offsets are rewritten by `Log::append`.
1468    fn abort_marker(pid: i64, epoch: i16) -> RecordBatch {
1469        RecordBatch {
1470            base_offset: 0,
1471            last_offset_delta: 0,
1472            producer_id: pid,
1473            producer_epoch: epoch,
1474            attributes: Attributes::default()
1475                .with_transactional(true)
1476                .with_control(true),
1477            records: vec![Record {
1478                offset_delta: 0,
1479                key: Some(control_key(0 /* ABORT */)),
1480                ..Default::default()
1481            }],
1482            ..RecordBatch::default()
1483        }
1484    }
1485
1486    // ---- transactional LSO / txnindex tests ----
1487
1488    #[test]
1489    fn transactional_batch_holds_lso() {
1490        let dir = tempdir().unwrap();
1491        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1492        // First, a non-txn batch — LSO advances past it.
1493        let mut b0 = sample_batch(1);
1494        log.append(&mut b0).unwrap();
1495        assert!(log.lso() == log.log_end_offset());
1496
1497        // Now an in-flight txn batch — LSO stays.
1498        let mut b1 = transactional_batch(1000, 0, &["a", "b"]); // pid=1000 epoch=0
1499        let old_lso = log.lso();
1500        log.append(&mut b1).unwrap();
1501        assert!(
1502            log.lso() == old_lso,
1503            "LSO must not advance while txn in flight"
1504        );
1505
1506        // Commit marker — LSO catches up.
1507        let mut commit = commit_marker(1000, 0);
1508        log.append(&mut commit).unwrap();
1509        assert!(log.lso() == log.log_end_offset());
1510    }
1511
1512    #[test]
1513    fn abort_marker_writes_txnindex_entry() {
1514        let dir = tempdir().unwrap();
1515        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1516        let mut t = transactional_batch(1000, 0, &["a", "b", "c"]);
1517        log.append(&mut t).unwrap();
1518
1519        let mut a = abort_marker(1000, 0);
1520        log.append(&mut a).unwrap();
1521
1522        let idx = TxnIndex::open(dir.path().join("00000000000000000000.txnindex")).unwrap();
1523        let entries = idx.entries();
1524        assert!(entries.len() == 1);
1525        assert!(entries[0].producer_id == 1000);
1526        // Txn batch was the first append: start_offset = 0.
1527        assert!(entries[0].start_offset == 0);
1528        // last_offset = abort marker's base_offset + last_offset_delta = 3 + 0 = 3.
1529        // (The 3-record txn batch occupies offsets 0-2; the marker lands at offset 3.)
1530        assert!(entries[0].last_offset == 3);
1531    }
1532
1533    #[test]
1534    fn lso_held_by_remaining_producer_after_partial_commit() {
1535        use tempfile::TempDir;
1536        let dir = TempDir::new().unwrap();
1537        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1538
1539        // Open two producers' transactions in parallel.
1540        let mut t1 = transactional_batch(1000, 0, &["a", "b"]);
1541        log.append(&mut t1).unwrap();
1542        let mut t2 = transactional_batch(2000, 0, &["c"]);
1543        log.append(&mut t2).unwrap();
1544        let lso_after_open = log.lso();
1545
1546        // Commit producer 1000. LSO must still be held back by 2000.
1547        let mut c1 = commit_marker(1000, 0);
1548        log.append(&mut c1).unwrap();
1549        assert!(log.lso() == lso_after_open, "LSO held by producer 2000");
1550
1551        // Commit producer 2000. LSO advances to log_end_offset.
1552        let mut c2 = commit_marker(2000, 0);
1553        log.append(&mut c2).unwrap();
1554        assert!(log.lso() == log.log_end_offset());
1555    }
1556
1557    fn sample_batch_with_epoch(n: i32, epoch: i32) -> RecordBatch {
1558        let mut b = sample_batch(n);
1559        b.partition_leader_epoch = epoch;
1560        b
1561    }
1562
1563    #[test]
1564    fn append_records_epoch_transition() {
1565        use tempfile::TempDir;
1566        let dir = TempDir::new().unwrap();
1567        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1568        let mut b = sample_batch_with_epoch(3, 0);
1569        log.append(&mut b).unwrap();
1570        let mut b2 = sample_batch_with_epoch(2, 1); // 2 records at epoch 1
1571        log.append(&mut b2).unwrap();
1572        assert!(
1573            log.epoch_checkpoint().entries()
1574                == &[
1575                    EpochEntry {
1576                        epoch: 0,
1577                        start_offset: 0
1578                    },
1579                    EpochEntry {
1580                        epoch: 1,
1581                        start_offset: 3
1582                    }
1583                ]
1584        );
1585    }
1586
1587    #[test]
1588    fn truncate_to_drops_stale_epoch_checkpoint_entries() {
1589        use tempfile::TempDir;
1590        let dir = TempDir::new().unwrap();
1591        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1592        // Epoch 1 at offsets 0..3, then epoch 7 starting at offset 3.
1593        let mut b1 = sample_batch_with_epoch(3, 1);
1594        log.append(&mut b1).unwrap();
1595        let epoch7_start = log.log_end_offset();
1596        let mut b2 = sample_batch_with_epoch(2, 7);
1597        log.append(&mut b2).unwrap();
1598        assert!(log.epoch_checkpoint().latest_epoch() == Some(7));
1599
1600        // Truncate away the entire epoch-7 tail.
1601        log.truncate_to(epoch7_start).unwrap();
1602
1603        assert!(log.epoch_checkpoint().latest_epoch() == Some(1));
1604        let leo = log.log_end_offset();
1605        assert!(log.epoch_checkpoint().end_offset_for_epoch(7, leo) == -1);
1606        assert!(log.epoch_checkpoint().end_offset_for_epoch(1, leo) == leo);
1607    }
1608
1609    #[test]
1610    fn set_config_swaps_active_config() {
1611        let dir = tempdir().expect("tempdir");
1612        let log = Log::open(
1613            dir.path(),
1614            LogConfig {
1615                retention_ms: Some(std::time::Duration::from_mins(1)),
1616                ..LogConfig::default()
1617            },
1618        )
1619        .expect("open");
1620        log.set_config(LogConfig {
1621            retention_ms: Some(std::time::Duration::from_mins(2)),
1622            ..LogConfig::default()
1623        });
1624        assert!(log.config_snapshot().retention_ms == Some(std::time::Duration::from_mins(2)));
1625    }
1626
1627    #[test]
1628    fn trim_to_offset_drops_old_segments() {
1629        let dir = tempdir().expect("tempdir");
1630        let mut log = Log::open(
1631            dir.path(),
1632            LogConfig {
1633                segment_bytes: 200, // small so we roll fast
1634                ..LogConfig::default()
1635            },
1636        )
1637        .expect("open");
1638        // Append 30 records to force multiple sealed segments.
1639        for _ in 0..30 {
1640            let mut b = sample_batch(1);
1641            log.append(&mut b).expect("append");
1642        }
1643        let leo = log.log_end_offset();
1644        let new_start = log.trim_to_offset(15).expect("trim");
1645        // Trim clamps to next segment boundary <= target; new_start may
1646        // be less than 15 if 15 falls inside a sealed segment that we
1647        // can't drop without losing in-range records. LEO is unaffected.
1648        assert!(new_start <= 15);
1649        assert!(log.log_end_offset() == leo);
1650        // If target landed inside the active segment, log_start advanced
1651        // exactly to target. Otherwise it advanced to a sealed boundary.
1652        assert!(log.log_start_offset() >= 0);
1653    }
1654
1655    #[test]
1656    fn trim_to_offset_clamps_to_leo() {
1657        let dir = tempdir().expect("tempdir");
1658        let mut log = Log::open(dir.path(), LogConfig::default()).expect("open");
1659        for _ in 0..3 {
1660            let mut b = sample_batch(1);
1661            log.append(&mut b).expect("append");
1662        }
1663        let leo = log.log_end_offset();
1664        let new_start = log.trim_to_offset(999).expect("trim");
1665        // Asking to trim past LEO means trim to LEO.
1666        assert!(new_start == leo);
1667    }
1668
1669    #[test]
1670    fn trim_to_offset_rejects_negative() {
1671        let dir = tempdir().expect("tempdir");
1672        let mut log = Log::open(dir.path(), LogConfig::default()).expect("open");
1673        assert!(log.trim_to_offset(-5).is_err());
1674    }
1675
1676    #[test]
1677    fn trim_to_offset_idempotent_at_or_below_log_start() {
1678        let dir = tempdir().expect("tempdir");
1679        let mut log = Log::open(dir.path(), LogConfig::default()).expect("open");
1680        for _ in 0..3 {
1681            let mut b = sample_batch(1);
1682            log.append(&mut b).expect("append");
1683        }
1684        // Trim to 0 on a fresh log → no change.
1685        let r = log.trim_to_offset(0).expect("trim");
1686        assert!(r == log.log_start_offset());
1687    }
1688
1689    fn keyed_batch(base: i64, items: &[(i32, &[u8], &[u8])]) -> RecordBatch {
1690        let records: Vec<Record> = items
1691            .iter()
1692            .map(|(d, k, v)| Record {
1693                offset_delta: *d,
1694                key: Some(Bytes::copy_from_slice(k)),
1695                value: Some(Bytes::copy_from_slice(v)),
1696                ..Default::default()
1697            })
1698            .collect();
1699        let last_delta = items.iter().map(|(d, _, _)| *d).max().unwrap_or(0);
1700        RecordBatch {
1701            base_offset: base,
1702            last_offset_delta: last_delta,
1703            max_timestamp: 0,
1704            records,
1705            ..RecordBatch::default()
1706        }
1707    }
1708
1709    #[test]
1710    fn compact_no_op_when_only_one_segment() {
1711        let dir = tempdir().unwrap();
1712        let cfg = LogConfig {
1713            cleanup_policy: crate::CleanupPolicy::Compact,
1714            ..Default::default()
1715        };
1716        let mut log = Log::open(dir.path(), cfg).unwrap();
1717        let mut b = keyed_batch(0, &[(0, b"k1", b"v1")]);
1718        log.append(&mut b).unwrap();
1719        // Only the active segment exists; sealed list is empty.
1720        log.compact().unwrap();
1721        assert!(log.log_end_offset() == 1);
1722    }
1723
1724    #[test]
1725    fn compact_dedupes_sealed_segments_keeps_active_intact() {
1726        let dir = tempdir().unwrap();
1727        let cfg = LogConfig {
1728            cleanup_policy: crate::CleanupPolicy::Compact,
1729            segment_bytes: 256, // force rolls
1730            ..Default::default()
1731        };
1732        let mut log = Log::open(dir.path(), cfg).unwrap();
1733
1734        // Write 3 sealed segments, each with one record under "k1".
1735        for i in 0..3 {
1736            let v = format!("v{i}");
1737            let mut b = keyed_batch(0, &[(0, b"k1", v.as_bytes())]);
1738            log.append(&mut b).unwrap();
1739            // Roll the active segment by forcing a tick or a large pad batch.
1740            // Easiest: call set_segment_bytes or rely on the small segment_bytes.
1741        }
1742        // Add one more append to ensure the last write is in a fresh active
1743        // segment (not part of what compaction touches).
1744        let mut b = keyed_batch(0, &[(0, b"active-key", b"active-value")]);
1745        log.append(&mut b).unwrap();
1746
1747        let active_leo_before = log.log_end_offset();
1748        log.compact().unwrap();
1749        assert!(
1750            log.log_end_offset() == active_leo_before,
1751            "compaction must not change LEO"
1752        );
1753
1754        // After compaction: read everything, assert only the newest k1 plus
1755        // the active "active-key" survive.
1756        let out = log.read(0, 1024 * 1024).unwrap();
1757        let all_records: Vec<_> = out.batches.iter().flat_map(|b| b.records.iter()).collect();
1758        let keys: Vec<&[u8]> = all_records
1759            .iter()
1760            .map(|r| r.key.as_ref().unwrap().as_ref())
1761            .collect();
1762        assert!(keys.contains(&b"k1".as_ref()), "k1 must survive as newest");
1763        assert!(
1764            keys.contains(&b"active-key".as_ref()),
1765            "active segment record must survive"
1766        );
1767    }
1768
1769    #[test]
1770    fn tierable_segments_excludes_active_and_reports_paths() {
1771        let dir = tempdir().unwrap();
1772        let config = LogConfig {
1773            segment_bytes: 200, // small so we roll fast
1774            ..LogConfig::default()
1775        };
1776        let mut log = Log::open(dir.path(), config).unwrap();
1777        for _ in 0..10 {
1778            let mut b = sample_batch(2);
1779            log.append(&mut b).unwrap();
1780        }
1781        let sealed_count = std::fs::read_dir(dir.path())
1782            .unwrap()
1783            .filter_map(Result::ok)
1784            .filter(|e| e.path().extension().and_then(|s| s.to_str()) == Some("log"))
1785            .count()
1786            - 1; // minus the active segment's .log
1787
1788        let exports = log.tierable_segments();
1789        assert!(
1790            exports.len() == sealed_count,
1791            "one export per sealed segment"
1792        );
1793
1794        let active_base = log.log_end_offset(); // not literally, but exports must be below it
1795        let mut prev_last = -1;
1796        for ex in &exports {
1797            assert!(ex.log_path.exists(), "log file present: {:?}", ex.log_path);
1798            assert!(ex.offset_index_path.exists());
1799            assert!(ex.time_index_path.exists());
1800            assert!(ex.last_offset >= ex.base_offset);
1801            assert!(ex.base_offset > prev_last, "segments are offset-ordered");
1802            prev_last = ex.last_offset;
1803            assert!(
1804                ex.last_offset < active_base,
1805                "sealed segments end before the log end"
1806            );
1807        }
1808    }
1809
1810    #[test]
1811    fn tierable_segments_empty_for_single_active_segment() {
1812        let dir = tempdir().unwrap();
1813        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1814        let mut b = sample_batch(3);
1815        log.append(&mut b).unwrap();
1816        // No roll happened: the only segment is active and never tierable.
1817        assert!(log.tierable_segments().is_empty());
1818    }
1819
1820    #[test]
1821    fn tierable_segments_last_offset_matches_next_base() {
1822        let dir = tempdir().unwrap();
1823        let config = LogConfig {
1824            segment_bytes: 200,
1825            ..LogConfig::default()
1826        };
1827        let mut log = Log::open(dir.path(), config).unwrap();
1828        for _ in 0..8 {
1829            let mut b = sample_batch(2);
1830            log.append(&mut b).unwrap();
1831        }
1832        let exports = log.tierable_segments();
1833        // Each sealed segment's last_offset is exactly one below the next
1834        // segment's base — contiguous coverage with no gaps.
1835        for pair in exports.windows(2) {
1836            assert!(pair[0].last_offset + 1 == pair[1].base_offset);
1837        }
1838    }
1839
1840    #[test]
1841    fn tierable_segments_carry_leader_epochs() {
1842        let dir = tempdir().unwrap();
1843        let config = LogConfig {
1844            segment_bytes: 200,
1845            ..LogConfig::default()
1846        };
1847        let mut log = Log::open(dir.path(), config).unwrap();
1848        // epoch 0 for the first few, then epoch 1.
1849        for _ in 0..4 {
1850            let mut b = sample_batch_with_epoch(2, 0);
1851            log.append(&mut b).unwrap();
1852        }
1853        for _ in 0..4 {
1854            let mut b = sample_batch_with_epoch(2, 1);
1855            log.append(&mut b).unwrap();
1856        }
1857        let exports = log.tierable_segments();
1858        assert!(!exports.is_empty());
1859        // Every export carries at least one epoch, and each recorded start
1860        // offset is clamped to >= the segment base.
1861        for ex in &exports {
1862            assert!(!ex.leader_epochs.is_empty(), "export has leader epochs");
1863            for (_epoch, start) in &ex.leader_epochs {
1864                assert!(*start >= ex.base_offset);
1865                assert!(*start <= ex.last_offset);
1866            }
1867        }
1868    }
1869
1870    #[test]
1871    fn epochs_for_range_clamps_and_filters() {
1872        use crate::leader_epoch_checkpoint::EpochEntry;
1873        let entries = vec![
1874            EpochEntry {
1875                epoch: 0,
1876                start_offset: 0,
1877            },
1878            EpochEntry {
1879                epoch: 1,
1880                start_offset: 50,
1881            },
1882            EpochEntry {
1883                epoch: 2,
1884                start_offset: 100,
1885            },
1886        ];
1887        // Segment [60, 90] sits entirely in epoch 1.
1888        assert!(epochs_for_range(&entries, 60, 90) == vec![(1, 60)]);
1889        // Segment [40, 60] straddles epoch 0 (->clamped to 40) and epoch 1.
1890        assert!(epochs_for_range(&entries, 40, 60) == vec![(0, 40), (1, 50)]);
1891        // Segment [0, 200] covers all three.
1892        assert!(epochs_for_range(&entries, 0, 200) == vec![(0, 0), (1, 50), (2, 100)]);
1893        // No entries -> empty.
1894        assert!(epochs_for_range(&[], 0, 100).is_empty());
1895    }
1896
1897    #[test]
1898    fn compact_is_idempotent() {
1899        let dir = tempdir().unwrap();
1900        let cfg = LogConfig {
1901            cleanup_policy: crate::CleanupPolicy::Compact,
1902            segment_bytes: 256,
1903            ..Default::default()
1904        };
1905        let mut log = Log::open(dir.path(), cfg).unwrap();
1906        for i in 0..3 {
1907            let v = format!("v{i}");
1908            let mut b = keyed_batch(0, &[(0, b"k1", v.as_bytes())]);
1909            log.append(&mut b).unwrap();
1910        }
1911        let mut b = keyed_batch(0, &[(0, b"active", b"x")]);
1912        log.append(&mut b).unwrap();
1913        log.compact().unwrap();
1914        let leo1 = log.log_end_offset();
1915        log.compact().unwrap();
1916        let leo2 = log.log_end_offset();
1917        assert!(leo1 == leo2);
1918    }
1919
1920    // ---- Local-retention helpers (KIP-405) ----
1921
1922    /// Build a log rolled into several sealed segments under `dir`. Mirror
1923    /// of the `remote_log_manager` test helper, kept local to this module.
1924    #[allow(clippy::needless_pass_by_value)]
1925    fn rolled_log(dir: &std::path::Path, extra: LogConfig) -> Log {
1926        let mut log = Log::open(
1927            dir,
1928            LogConfig {
1929                segment_bytes: 200,
1930                ..extra
1931            },
1932        )
1933        .unwrap();
1934        for _ in 0..16 {
1935            let mut b = sample_batch(2);
1936            log.append(&mut b).unwrap();
1937        }
1938        log
1939    }
1940
1941    #[test]
1942    fn local_log_start_offset_matches_log_start_offset() {
1943        let dir = tempdir().unwrap();
1944        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
1945        for _ in 0..3 {
1946            let mut b = sample_batch(2);
1947            log.append(&mut b).unwrap();
1948        }
1949        assert!(log.local_log_start_offset() == log.log_start_offset());
1950    }
1951
1952    #[test]
1953    fn delete_local_segments_through_drops_sealed_below_target() {
1954        let dir = tempdir().unwrap();
1955        let mut log = rolled_log(dir.path(), LogConfig::default());
1956        let exports = log.tierable_segments();
1957        assert!(
1958            exports.len() >= 3,
1959            "test needs multiple sealed segments; got {}",
1960            exports.len()
1961        );
1962
1963        // Pick a target strictly between two sealed-segment boundaries:
1964        // one past the second sealed segment's last_offset. Every sealed
1965        // segment whose last_offset < target should be deleted.
1966        let target = exports[1].last_offset + 1;
1967        let expected_deleted: Vec<i64> = exports
1968            .iter()
1969            .filter(|e| e.last_offset < target)
1970            .map(|e| e.base_offset)
1971            .collect();
1972        let active_base_before = log.log_end_offset();
1973
1974        let removed = log.delete_local_segments_through(target).unwrap();
1975        assert!(removed == expected_deleted.len());
1976
1977        // (a) sealed segments below target are gone from the in-memory list.
1978        let remaining_bases: Vec<i64> = log
1979            .tierable_segments()
1980            .iter()
1981            .map(|e| e.base_offset)
1982            .collect();
1983        for base in &expected_deleted {
1984            assert!(
1985                !remaining_bases.contains(base),
1986                "base {base} should be dropped"
1987            );
1988        }
1989
1990        // (b) on-disk files for deleted segments are gone.
1991        for base in &expected_deleted {
1992            assert!(!name::log_path(dir.path(), *base).exists());
1993            assert!(!name::index_path(dir.path(), *base).exists());
1994            assert!(!name::timeindex_path(dir.path(), *base).exists());
1995        }
1996
1997        // (c) the active segment is untouched.
1998        assert!(log.log_end_offset() == active_base_before);
1999    }
2000
2001    #[test]
2002    fn delete_local_segments_through_keeps_active_segment() {
2003        let dir = tempdir().unwrap();
2004        let mut log = rolled_log(dir.path(), LogConfig::default());
2005        let leo_before = log.log_end_offset();
2006        let active_log = dir.path().join(format!(
2007            "{:020}.log",
2008            log.tierable_segments().last().unwrap().last_offset + 1
2009        ));
2010        // The active segment's .log file should exist before and after.
2011        assert!(active_log.exists());
2012
2013        // First: target far beyond every sealed segment but well past
2014        // active.base_offset. The active segment must not be removed.
2015        let huge_target = leo_before + 1_000_000;
2016        let _ = log.delete_local_segments_through(huge_target).unwrap();
2017        assert!(active_log.exists(), "active segment must survive");
2018        assert!(
2019            log.log_end_offset() == leo_before,
2020            "active segment untouched (LEO unchanged)"
2021        );
2022        // Sealed-segment pointer should have advanced past everything.
2023        assert!(log.tierable_segments().is_empty());
2024    }
2025
2026    #[test]
2027    fn delete_local_segments_through_advances_local_start_pointer() {
2028        let dir = tempdir().unwrap();
2029        let mut log = rolled_log(dir.path(), LogConfig::default());
2030        let exports = log.tierable_segments();
2031        let target = exports[1].last_offset + 1;
2032        log.delete_local_segments_through(target).unwrap();
2033        assert!(log.local_log_start_offset() == target);
2034        assert!(log.log_start_offset() == target);
2035    }
2036
2037    #[test]
2038    fn delete_local_segments_through_is_noop_at_or_below_current_start() {
2039        let dir = tempdir().unwrap();
2040        let mut log = rolled_log(dir.path(), LogConfig::default());
2041        let start_before = log.log_start_offset();
2042        let sealed_before = log.tierable_segments().len();
2043
2044        let removed = log.delete_local_segments_through(start_before).unwrap();
2045        assert!(removed == 0);
2046        let removed_below = log
2047            .delete_local_segments_through((start_before - 1).max(0))
2048            .unwrap();
2049        assert!(removed_below == 0);
2050        assert!(log.log_start_offset() == start_before);
2051        assert!(log.tierable_segments().len() == sealed_before);
2052    }
2053
2054    #[test]
2055    fn delete_local_segments_through_rejects_negative_target() {
2056        let dir = tempdir().unwrap();
2057        let mut log = Log::open(dir.path(), LogConfig::default()).unwrap();
2058        let err = log.delete_local_segments_through(-1).unwrap_err();
2059        assert!(matches!(err, LogError::InvalidArgument(_)));
2060    }
2061
2062    #[test]
2063    fn tick_skips_retention_when_remote_storage_enable_is_true() {
2064        use std::time::Duration;
2065        let far_future = SystemTime::now() + Duration::from_hours(365 * 24);
2066
2067        // Tiered topic: tick must NOT delete anything.
2068        let dir_tiered = tempdir().unwrap();
2069        let mut tiered = rolled_log(
2070            dir_tiered.path(),
2071            LogConfig {
2072                remote_storage_enable: true,
2073                retention_ms: Some(Duration::from_millis(1)),
2074                ..LogConfig::default()
2075            },
2076        );
2077        let sealed_before = tiered.tierable_segments().len();
2078        assert!(sealed_before > 0, "test setup must roll multiple segments");
2079        tiered.tick(far_future).unwrap();
2080        assert!(
2081            tiered.tierable_segments().len() == sealed_before,
2082            "tiered topics' retention is the RemoteLogManager's job"
2083        );
2084
2085        // Non-tiered baseline: tick should still evict aggressively.
2086        let dir_plain = tempdir().unwrap();
2087        let mut plain = rolled_log(
2088            dir_plain.path(),
2089            LogConfig {
2090                remote_storage_enable: false,
2091                retention_ms: Some(Duration::from_millis(1)),
2092                ..LogConfig::default()
2093            },
2094        );
2095        assert!(!plain.tierable_segments().is_empty());
2096        plain.tick(far_future).unwrap();
2097        // Non-tiered path keeps at least one segment (the active one); every
2098        // sealed segment is evicted.
2099        assert!(
2100            plain.tierable_segments().len() == 0,
2101            "standard retention deletes all sealed segments"
2102        );
2103    }
2104
2105    fn ts_batch(ts: i64) -> RecordBatch {
2106        let mut b = RecordBatch {
2107            base_offset: 0, // overwritten by Log::append
2108            base_timestamp: ts,
2109            max_timestamp: ts,
2110            last_offset_delta: 0,
2111            ..RecordBatch::default()
2112        };
2113        b.records.push(Record {
2114            offset_delta: 0,
2115            timestamp_delta: 0,
2116            value: Some(Bytes::from("v")),
2117            ..Default::default()
2118        });
2119        b
2120    }
2121
2122    #[test]
2123    fn log_offset_for_timestamp_across_segments() {
2124        let dir = tempdir().unwrap();
2125        let config = LogConfig {
2126            segment_bytes: 1, // roll after every batch → each record its own segment
2127            ..LogConfig::default()
2128        };
2129        let mut log = Log::open(dir.path(), config).unwrap();
2130        // offsets 0..=4 with timestamps 100,200,300,400,500.
2131        for (i, ts) in [100, 200, 300, 400, 500].into_iter().enumerate() {
2132            let mut b = ts_batch(ts);
2133            assert!(log.append(&mut b).unwrap() == i64::try_from(i).unwrap());
2134        }
2135        // before-first → offset 0.
2136        assert!(log.offset_for_timestamp(50) == Some((0, 100)));
2137        // exact match on a sealed segment.
2138        assert!(log.offset_for_timestamp(300) == Some((2, 300)));
2139        // between records → next record up.
2140        assert!(log.offset_for_timestamp(350) == Some((3, 400)));
2141        // landing on the active segment's record.
2142        assert!(log.offset_for_timestamp(500) == Some((4, 500)));
2143        // after-last → None.
2144        assert!(log.offset_for_timestamp(600) == None);
2145        log.close();
2146        drop(dir);
2147    }
2148
2149    #[test]
2150    fn log_offset_for_timestamp_empty_log_is_none() {
2151        let dir = tempdir().unwrap();
2152        let log = Log::open(dir.path(), LogConfig::default()).unwrap();
2153        assert!(log.offset_for_timestamp(0) == None);
2154        log.close();
2155        drop(dir);
2156    }
2157
2158    #[test]
2159    fn log_offset_of_max_timestamp_in_active() {
2160        let dir = tempdir().unwrap();
2161        let config = LogConfig {
2162            segment_bytes: 1, // each record its own segment
2163            ..LogConfig::default()
2164        };
2165        let mut log = Log::open(dir.path(), config).unwrap();
2166        // timestamps 100,300,200 at offsets 0,1,2. Max is 300 @ offset 1.
2167        for ts in [100, 300, 200] {
2168            let mut b = ts_batch(ts);
2169            log.append(&mut b).unwrap();
2170        }
2171        assert!(log.offset_of_max_timestamp() == 1);
2172        log.close();
2173        drop(dir);
2174    }
2175
2176    #[test]
2177    fn log_offset_of_max_timestamp_empty_is_log_start() {
2178        let dir = tempdir().unwrap();
2179        let log = Log::open(dir.path(), LogConfig::default()).unwrap();
2180        assert!(log.offset_of_max_timestamp() == log.log_start_offset());
2181        assert!(log.max_timestamp_offset_and_ts() == None);
2182        log.close();
2183        drop(dir);
2184    }
2185
2186    #[test]
2187    fn log_max_timestamp_offset_and_ts_returns_pair() {
2188        let dir = tempdir().unwrap();
2189        let config = LogConfig {
2190            segment_bytes: 1,
2191            ..LogConfig::default()
2192        };
2193        let mut log = Log::open(dir.path(), config).unwrap();
2194        for ts in [100, 300, 200] {
2195            let mut b = ts_batch(ts);
2196            log.append(&mut b).unwrap();
2197        }
2198        // Max timestamp 300 lives at offset 1.
2199        assert!(log.max_timestamp_offset_and_ts() == Some((1, 300)));
2200        log.close();
2201        drop(dir);
2202    }
2203}