Struct Log 

pub struct Log { /* private fields */ }

A Kafka-format log: a sorted collection of Segments plus a single active segment that accepts appends.

Log is single-writer (methods that mutate take &mut self) and supports concurrent readers (read, log_start_offset, and the other accessors take &self). Construct one with Log::open.

Implementations

impl Log

pub fn open(dir: impl AsRef<Path>, config: LogConfig) -> Result<Self, LogError>

Open or create a Log at dir. Discovers existing segments by .log filename, marks all but the latest as sealed, and (if the directory is empty) creates a fresh active segment at offset 0.
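The discovery step can be sketched as follows, assuming Apache Kafka's naming convention (`<base offset, zero-padded to 20 digits>.log`). This page does not state which convention Log uses, and `discover` and `Discovered` are hypothetical names. The sketch only classifies file names; it does not open files or rebuild indexes.

```rust
/// Hypothetical result of classifying the `.log` files found in a log directory.
#[derive(Debug, PartialEq)]
pub struct Discovered {
    /// Base offsets of sealed (read-only) segments, oldest first.
    pub sealed: Vec<i64>,
    /// Base offset of the single active segment that accepts appends.
    pub active: i64,
}

/// Assumes Kafka-style names such as `00000000000000000100.log`.
pub fn discover(file_names: &[&str]) -> Discovered {
    // Keep only `*.log` files whose stem parses as a base offset;
    // index files and anything else are ignored.
    let mut bases: Vec<i64> = file_names
        .iter()
        .filter_map(|name| name.strip_suffix(".log"))
        .filter_map(|stem| stem.parse::<i64>().ok())
        .collect();
    bases.sort_unstable();
    match bases.pop() {
        // The highest base offset is the latest segment, so it stays active.
        Some(active) => Discovered { sealed: bases, active },
        // Empty directory: start a fresh active segment at offset 0.
        None => Discovered { sealed: Vec::new(), active: 0 },
    }
}
```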

pub fn dir(&self) -> &Path

Directory this log was opened against. The broker's intra-broker log-dir reassignment (KIP-113) reads this to find which log.dir currently owns a partition, without re-implementing the directory-layout convention.

pub fn log_start_offset(&self) -> i64

First absolute offset still in the log.

pub fn set_log_start_offset(&mut self, new_start: i64) -> Result<(), LogError>

Advance log_start_offset to new_start, which must lie in [log_start_offset(), log_end_offset()]. Used by trim_to_offset for the active-segment case and by the broker's DeleteRecords handler. This does NOT physically truncate on-disk segments; it only shifts the in-memory start pointer.

new_start must be non-negative.

Errors

Returns LogError::InvalidArgument if new_start is negative.
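A minimal sketch of the validation described above, modeling only the two in-memory pointers. `Pointers` and `StartError` are hypothetical. This page documents only the negative-value error, so reporting an out-of-range value as `OutOfRange` is an assumption.

```rust
#[derive(Debug, PartialEq)]
pub enum StartError {
    /// Stand-in for LogError::InvalidArgument on a negative offset.
    Negative(i64),
    /// Assumed error for a value outside [log_start, log_end].
    OutOfRange { new_start: i64, start: i64, end: i64 },
}

/// Hypothetical model of the pointers touched by set_log_start_offset.
pub struct Pointers {
    pub log_start: i64,
    pub log_end: i64,
}

impl Pointers {
    pub fn set_log_start_offset(&mut self, new_start: i64) -> Result<(), StartError> {
        if new_start < 0 {
            return Err(StartError::Negative(new_start));
        }
        if new_start < self.log_start || new_start > self.log_end {
            return Err(StartError::OutOfRange { new_start, start: self.log_start, end: self.log_end });
        }
        // Only the pointer moves; segment files stay on disk.
        self.log_start = new_start;
        Ok(())
    }
}
```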

pub fn reset_to(&mut self, new_base: i64) -> Result<(), LogError>

Reset the log to be empty, starting at new_base. Drops every segment and its on-disk files, then creates a fresh active segment at new_base. Used by the replicator's OFFSET_OUT_OF_RANGE recovery path when the follower has fallen behind the leader's log_start. truncate_to cannot help in that case: it only removes records from the tail, whereas recovery must move log_start forward past offsets for which there is no local data.
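The choice between the two recovery calls can be sketched as below. This is an assumption loosely modeled on Apache Kafka's follower behavior (truncate when the follower is ahead of the leader's log end, otherwise restart at the leader's log start); `on_offset_out_of_range` and `Recovery` are hypothetical and not part of this crate's documented API.

```rust
#[derive(Debug, PartialEq)]
pub enum Recovery {
    /// Follower is ahead of the leader: drop the local tail (truncate_to).
    TruncateTo(i64),
    /// Follower is behind the leader's log start: discard everything (reset_to).
    ResetTo(i64),
}

/// Hypothetical decision made on an OFFSET_OUT_OF_RANGE fetch error.
pub fn on_offset_out_of_range(local_leo: i64, leader_log_start: i64, leader_leo: i64) -> Recovery {
    if local_leo > leader_leo {
        Recovery::TruncateTo(leader_leo)
    } else {
        // Assumed case: local_leo < leader_log_start. Removing records cannot
        // bridge the gap, so the log restarts empty at the leader's start.
        Recovery::ResetTo(leader_log_start)
    }
}
```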

pub fn log_end_offset(&self) -> i64

Next offset that append will assign.

pub fn size_bytes(&self) -> u64

Total .log byte size across sealed and active segments. Read from the segments’ tracked logical size rather than a filesystem stat, so it reflects buffered appends immediately and consistently across platforms (a directory stat can lag an open, unflushed write handle on some OSes).

pub fn lso(&self) -> i64

Last Stable Offset: the highest offset that consumers in read_committed isolation may see. It advances only while no transaction is in flight; while any transactional batch is open (neither committed nor aborted), it is held back at the first offset of the earliest such batch.
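A minimal sketch of the rule above. It assumes the log tracks the first offset of each still-open transactional batch; `last_stable_offset` is a hypothetical helper, and it ignores any additional bound (such as a high watermark) the real method may apply.

```rust
/// Hypothetical inputs: the log end offset and the first offset of every
/// transactional batch that is still open (neither committed nor aborted).
pub fn last_stable_offset(log_end: i64, open_txn_first_offsets: &[i64]) -> i64 {
    open_txn_first_offsets
        .iter()
        .copied()
        // The earliest open transaction holds the LSO back.
        .min()
        // With no open transaction, nothing holds it back.
        .unwrap_or(log_end)
}
```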

pub fn close(self)

Close all segments. Because close takes self by value, the Log and every segment it owns are dropped when the call completes; this method only names that operation explicitly.

pub fn set_config(&self, new: LogConfig)

Atomically swap the active LogConfig. The next retention/roll check reads the new value; in-flight append calls hold the lock for trivially short windows and will not see a half-applied config.

Callable through &self (the Arc<RwLock<…>> wrapping lets us mutate the inner value without an exclusive borrow on the Log).

pub fn config_snapshot(&self) -> LogConfig

Snapshot the current config. Allocates a clone; cheap because LogConfig is small and Clone.
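The &self config swap and the snapshot can be sketched with std's `Arc<RwLock<…>>`. `SketchConfig` and its fields are hypothetical stand-ins, since the real LogConfig's fields are not shown on this page.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for LogConfig.
#[derive(Clone, Debug, PartialEq)]
pub struct SketchConfig {
    pub segment_bytes: u64,
    pub retention_ms: i64,
}

pub struct ConfigHolder {
    config: Arc<RwLock<SketchConfig>>,
}

impl ConfigHolder {
    pub fn new(config: SketchConfig) -> Self {
        Self { config: Arc::new(RwLock::new(config)) }
    }

    /// Replace the whole value under the write lock: readers see either the
    /// old or the new config, never a mix. Panics if the lock is poisoned.
    pub fn set_config(&self, new: SketchConfig) {
        *self.config.write().unwrap() = new;
    }

    /// Clone the current value, holding the read lock only for the clone.
    pub fn config_snapshot(&self) -> SketchConfig {
        self.config.read().unwrap().clone()
    }
}
```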

pub fn aborted_in_range(&self, start: i64, end: i64) -> Vec<AbortedTxn>

Return all aborted transactions from the active segment’s .txnindex whose offset range overlaps [start, end).

Only the active segment’s index is consulted (older sealed segments’ .txnindex files are not loaded into memory). The window [fetch_offset, lso) always falls within the active segment in practice because LSO can only advance past a committed/aborted marker, which lands in the same segment as the corresponding transactional batches.
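The overlap test can be sketched as follows, assuming each index entry records an inclusive [first_offset, last_offset] range (as Kafka's transaction index does). `AbortedRange` and its fields are hypothetical; the real AbortedTxn layout is not shown here.

```rust
/// Hypothetical index entry for one aborted transaction.
#[derive(Clone, Debug, PartialEq)]
pub struct AbortedRange {
    pub producer_id: i64,
    /// First offset of the aborted transaction (inclusive).
    pub first_offset: i64,
    /// Offset of the abort marker (inclusive).
    pub last_offset: i64,
}

/// Entries whose inclusive range overlaps the half-open window [start, end).
pub fn aborted_in_range(index: &[AbortedRange], start: i64, end: i64) -> Vec<AbortedRange> {
    index
        .iter()
        .filter(|t| t.first_offset < end && t.last_offset >= start)
        .cloned()
        .collect()
}
```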

pub fn append(&mut self, batch: &mut RecordBatch) -> Result<i64, LogError>

Append a RecordBatch. The log overwrites the batch's base_offset with the next offset to assign (Log::log_end_offset), and last_offset_delta determines how many absolute offsets the batch consumes (last_offset_delta + 1). Returns the assigned base_offset.
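The offset bookkeeping can be sketched with just the two header fields involved. `BatchHeader` and `OffsetAssigner` are hypothetical stand-ins; writing the batch bytes is omitted.

```rust
/// Hypothetical stand-in for the RecordBatch header fields used here.
pub struct BatchHeader {
    pub base_offset: i64,
    pub last_offset_delta: i32,
}

pub struct OffsetAssigner {
    pub log_end_offset: i64,
}

impl OffsetAssigner {
    /// Stamp the batch with the next offset and advance the log end past it.
    pub fn append(&mut self, batch: &mut BatchHeader) -> i64 {
        let base = self.log_end_offset;
        // Whatever base_offset the producer sent is overwritten.
        batch.base_offset = base;
        // The batch covers base..=base + last_offset_delta, i.e. delta + 1 offsets.
        self.log_end_offset = base + i64::from(batch.last_offset_delta) + 1;
        base
    }
}
```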

pub fn epoch_checkpoint(&self) -> &LeaderEpochCheckpoint

Access the per-partition leader-epoch checkpoint.

pub fn append_at( &mut self, batch: &mut RecordBatch, offset: i64, ) -> Result<(), LogError>

Append a RecordBatch whose base_offset is chosen by the caller.

Unlike Log::append, the log does not assign a new offset: the broker's replicator uses this method to preserve the leader-assigned offset on the follower's local log.

offset must equal the log's current Log::log_end_offset; otherwise this returns LogError::OffsetMismatch. When the check passes, batch.base_offset is set to offset (it should already match) and the batch is written.
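A minimal sketch of the contiguity check, with hypothetical `FollowerLog`, `ReplicatedBatch`, and `AppendAtError` types. The offset arithmetic assumes a batch covers last_offset_delta + 1 offsets; writing the bytes is omitted.

```rust
#[derive(Debug, PartialEq)]
pub enum AppendAtError {
    /// Stand-in for LogError::OffsetMismatch.
    OffsetMismatch { expected: i64, got: i64 },
}

/// Hypothetical stand-in for the RecordBatch header fields used here.
pub struct ReplicatedBatch {
    pub base_offset: i64,
    pub last_offset_delta: i32,
}

/// Hypothetical follower log that tracks only its end offset.
pub struct FollowerLog {
    pub log_end_offset: i64,
}

impl FollowerLog {
    pub fn append_at(&mut self, batch: &mut ReplicatedBatch, offset: i64) -> Result<(), AppendAtError> {
        // The leader's offset must line up exactly with the local end:
        // both a gap and an overlap are refused.
        if offset != self.log_end_offset {
            return Err(AppendAtError::OffsetMismatch { expected: self.log_end_offset, got: offset });
        }
        // Normally a no-op, because the leader already stamped this offset.
        batch.base_offset = offset;
        self.log_end_offset = offset + i64::from(batch.last_offset_delta) + 1;
        Ok(())
    }
}
```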

pub fn read( &self, offset: i64, max_bytes: usize, ) -> Result<ReadOutput, LogError>

Read batches starting at offset, returning up to roughly max_bytes of .log data. Walks sealed segments first, then the active segment, so reads can span segment boundaries.

pub fn read_raw( &self, fetch_offset: i64, limit_offset: i64, max_bytes: usize, ) -> Result<RawRead, LogError>

Like Log::read but returns verbatim wire bytes (no decode), walking sealed segments then the active segment. Includes only batches with base_offset < limit_offset, up to roughly max_bytes (≥ one batch).
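The batch-selection rule can be sketched over an in-memory list of segments. `StoredBatch` and `select_batches` are hypothetical; the sketch returns base offsets instead of bytes, and it assumes "≥ one batch" means the first qualifying batch is returned even when it alone exceeds max_bytes.

```rust
/// Hypothetical view of one stored batch.
pub struct StoredBatch {
    pub base_offset: i64,
    pub last_offset: i64,
    /// Encoded size in bytes.
    pub size: usize,
}

/// Base offsets of the batches a read_raw-style call would return.
pub fn select_batches(segments: &[Vec<StoredBatch>], fetch_offset: i64, limit_offset: i64, max_bytes: usize) -> Vec<i64> {
    let mut picked = Vec::new();
    let mut bytes = 0;
    // Segments are ordered sealed first, then active, so one loop spans boundaries.
    for batch in segments.iter().flatten() {
        if batch.last_offset < fetch_offset {
            continue; // entirely before the fetch position
        }
        if batch.base_offset >= limit_offset {
            break; // only batches with base_offset < limit_offset qualify
        }
        // The first batch is always taken so an oversized batch cannot stall readers.
        if !picked.is_empty() && bytes + batch.size > max_bytes {
            break;
        }
        bytes += batch.size;
        picked.push(batch.base_offset);
    }
    picked
}
```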

pub fn truncate_to(&mut self, offset: i64) -> Result<(), LogError>

Truncate the tail of the log so that no record with an offset >= offset remains. Used by replication and leader election.

pub fn trim_to_offset(&mut self, target: i64) -> Result<i64, LogError>

Trim from the start of the log: drop every sealed segment whose last offset is < target, and advance log_start_offset if target falls inside the active segment. The active segment is never deleted by this call. Returns the resulting log_start_offset.

target is clamped to at most log_end_offset(): a request to trim past the LEO trims to the LEO. Negative values are rejected rather than clamped (see Errors).

Errors

Returns LogError::InvalidArgument if target < 0.
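A minimal sketch of the trim over (base_offset, last_offset) pairs. `TrimModel` is hypothetical and errors are plain strings. How log_start moves when target lands inside a surviving sealed segment is not documented here; the sketch assumes it moves to that segment's base.

```rust
/// Hypothetical model of the state trim_to_offset touches.
pub struct TrimModel {
    /// (base_offset, last_offset) of each sealed segment, oldest first.
    pub sealed: Vec<(i64, i64)>,
    pub active_base: i64,
    pub log_start: i64,
    pub log_end: i64,
}

impl TrimModel {
    pub fn trim_to_offset(&mut self, target: i64) -> Result<i64, String> {
        if target < 0 {
            return Err(format!("invalid target {target}"));
        }
        // Trimming past the log end trims to the log end.
        let target = target.min(self.log_end);
        // Drop whole sealed segments lying entirely below the target.
        self.sealed.retain(|&(_, last)| last >= target);
        // Assumption: the start moves up to the first surviving segment's base.
        let first_base = self.sealed.first().map_or(self.active_base, |&(base, _)| base);
        self.log_start = self.log_start.max(first_base);
        // Inside the active segment only the pointer moves; the segment stays.
        if target >= self.active_base {
            self.log_start = self.log_start.max(target);
        }
        Ok(self.log_start)
    }
}
```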

pub fn tick(&mut self, now: SystemTime) -> Result<(), LogError>

Periodic maintenance: apply time- and size-based retention to the sealed segments. The active segment is never deleted, and if every segment would otherwise be evicted, at least one is retained. Rolling the active segment by age is not implemented yet.
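The retention pass can be sketched as below, assuming Apache Kafka's rules: a segment expires once `now - max_timestamp` exceeds the time limit, and size retention deletes the oldest segment while the log would still hold at least the byte limit without it. `retention_victims`, `SealedSegment`, and both limits are hypothetical; the real retention settings are not shown here.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical sealed-segment summary.
pub struct SealedSegment {
    pub base_offset: i64,
    pub size_bytes: u64,
    /// Largest record timestamp in the segment.
    pub max_timestamp: SystemTime,
}

/// Base offsets of sealed segments to delete, oldest first. The active
/// segment is never a candidate; it only counts toward the total size.
pub fn retention_victims(sealed: &[SealedSegment], active_size: u64, now: SystemTime,
                         retention: Option<Duration>, retention_bytes: Option<u64>) -> Vec<i64> {
    let mut remaining = active_size + sealed.iter().map(|s| s.size_bytes).sum::<u64>();
    let mut victims = Vec::new();
    for seg in sealed {
        // A timestamp in the future yields Err from duration_since: not expired.
        let expired = retention.map_or(false, |limit| {
            now.duration_since(seg.max_timestamp).map_or(false, |age| age > limit)
        });
        let over_size = retention_bytes.map_or(false, |limit| remaining - seg.size_bytes >= limit);
        // Deletion stops at the first segment that must be kept.
        if !(expired || over_size) {
            break;
        }
        remaining -= seg.size_bytes;
        victims.push(seg.base_offset);
    }
    victims
}
```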

pub fn local_log_start_offset(&self) -> i64

First absolute offset still present on this broker's local disk (KIP-405). It delegates to Log::log_start_offset, because the two pointers advance together.

pub fn offset_for_timestamp(&self, target_ts: i64) -> Option<(i64, i64)>

Earliest local (offset, record_timestamp) whose record timestamp is >= target_ts, searching sealed segments oldest-first then the active segment. The first segment whose max_timestamp >= target_ts holds the answer; the per-segment helper does the index lookup + forward scan. None when no local record qualifies (including an empty log).
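A minimal sketch of the search, with a linear scan standing in for the per-segment index lookup plus forward scan. `TsSegment` and this free function are hypothetical. Record timestamps need not be monotonic, which is why the scan checks every record in the chosen segment.

```rust
/// Hypothetical segment: its max timestamp and (offset, timestamp) records in offset order.
pub struct TsSegment {
    pub max_timestamp: i64,
    pub records: Vec<(i64, i64)>,
}

/// Segments are ordered oldest first, with the active segment last.
pub fn offset_for_timestamp(segments: &[TsSegment], target_ts: i64) -> Option<(i64, i64)> {
    // The first segment whose max timestamp reaches the target holds the answer.
    let seg = segments.iter().find(|s| s.max_timestamp >= target_ts)?;
    // Earliest record in that segment whose timestamp is >= target_ts.
    seg.records.iter().copied().find(|&(_, ts)| ts >= target_ts)
}
```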

pub fn max_timestamp_offset_and_ts(&self) -> Option<(i64, i64)>

Offset and timestamp of the record carrying the partition’s largest timestamp, scanning sealed segments then the active segment. Ties resolve to the earliest offset (the first segment, and the first record within it, wins). Returns None when the log holds no records.

pub fn offset_of_max_timestamp(&self) -> i64

Offset of the record carrying the partition’s largest timestamp, or log_start_offset() when the log holds no records (KIP-734 MAX_TIMESTAMP).
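The tie-breaking and the empty-log fallback can be sketched over a flat list of (offset, timestamp) pairs, assuming the records are listed sealed segments first and in offset order. Both free functions are hypothetical. An explicit loop is used because `Iterator::max_by_key` returns the last maximum on ties, which would pick the latest offset rather than the earliest.

```rust
/// Every record as (offset, timestamp), sealed segments then active, in offset order.
pub fn max_timestamp_offset_and_ts(records: &[(i64, i64)]) -> Option<(i64, i64)> {
    let mut best: Option<(i64, i64)> = None;
    for &(offset, ts) in records {
        // Strict `>` keeps the earliest offset when timestamps tie.
        if best.map_or(true, |(_, best_ts)| ts > best_ts) {
            best = Some((offset, ts));
        }
    }
    best
}

/// Falls back to the log start offset when there are no records.
pub fn offset_of_max_timestamp(records: &[(i64, i64)], log_start_offset: i64) -> i64 {
    max_timestamp_offset_and_ts(records).map_or(log_start_offset, |(offset, _)| offset)
}
```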

pub fn delete_local_segments_through( &mut self, target: i64, ) -> Result<usize, LogError>

Physically delete every sealed segment whose last_offset < target, then advance log_start_offset to target (KIP-405). The active segment is never touched. Returns the count of segments removed; a no-op (returns Ok(0)) when target <= local_log_start_offset().

The caller is responsible for verifying these segments are safely in the remote tier (CopySegmentFinished) before invoking this; Log enforces no tiered-storage invariants. See crates/broker/src/remote_log_manager.rs for the production caller.

Errors

Returns LogError::InvalidArgument if target is negative.
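A minimal sketch of the documented contract: reject negative targets, treat targets at or below the local start as a no-op, and otherwise delete whole sealed segments below the target and move the start pointer. `LocalTier` is hypothetical, and the remote-tier check that the caller must perform is not modeled.

```rust
/// Hypothetical model of the local tier.
pub struct LocalTier {
    /// (base_offset, last_offset) per sealed segment, oldest first.
    pub sealed: Vec<(i64, i64)>,
    pub log_start: i64,
}

impl LocalTier {
    pub fn delete_local_segments_through(&mut self, target: i64) -> Result<usize, String> {
        // Validate first, so a negative target is an error rather than a no-op.
        if target < 0 {
            return Err(format!("invalid target {target}"));
        }
        if target <= self.log_start {
            return Ok(0);
        }
        let before = self.sealed.len();
        // Caller must already have confirmed these segments are in remote storage.
        self.sealed.retain(|&(_, last)| last >= target);
        self.log_start = target;
        Ok(before - self.sealed.len())
    }
}
```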

pub fn tierable_segments(&self) -> Vec<SegmentExport>

Describe every sealed segment for tiered-storage offload (KIP-405). The active segment is never included: only sealed segments are immutable and therefore safe to copy.

last_offset is derived from the next segment’s base_offset (the active segment’s base for the most-recent sealed segment), so it is correct even for segments loaded from disk without a tail scan. max_timestamp falls back to -1 (unknown) when the in-memory value has not been populated.
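The derivation can be sketched as "one less than the next base offset". `ExportSketch` and `tierable` are hypothetical; the real SegmentExport fields are not listed on this page. If compaction has left offset gaps, the derived value is still a valid upper bound for the segment.

```rust
/// Hypothetical stand-in for SegmentExport.
#[derive(Debug, PartialEq)]
pub struct ExportSketch {
    pub base_offset: i64,
    pub last_offset: i64,
    pub max_timestamp: i64,
}

/// `sealed` holds (base_offset, in-memory max timestamp if known), oldest first.
pub fn tierable(sealed: &[(i64, Option<i64>)], active_base: i64) -> Vec<ExportSketch> {
    sealed
        .iter()
        .enumerate()
        .map(|(i, &(base, max_ts))| {
            // Each segment ends just before the next one begins; the newest
            // sealed segment ends just before the active segment's base.
            let next_base = sealed.get(i + 1).map_or(active_base, |&(b, _)| b);
            ExportSketch { base_offset: base, last_offset: next_base - 1, max_timestamp: max_ts.unwrap_or(-1) }
        })
        .collect()
}
```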

pub fn compact(&mut self) -> Result<(), LogError>

Run one compaction pass over the sealed segment list. No-op if fewer than 2 sealed segments exist (nothing to dedup yet).

The active segment is never touched. Output is a single new sealed segment at the lowest input base offset, replacing all consumed sealed segments.
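One compaction pass can be sketched as key-based deduplication that keeps the latest record per key and preserves original offsets, as Kafka's log cleaner does. `KeyedRecord` and `compact` are hypothetical. The sketch keeps tombstones (value `None`) and does not model how or when the real pass removes them.

```rust
use std::collections::HashMap;

/// Hypothetical record; `value: None` is a tombstone.
#[derive(Clone, Debug, PartialEq)]
pub struct KeyedRecord {
    pub offset: i64,
    pub key: String,
    pub value: Option<String>,
}

/// `sealed` holds (base_offset, records) per segment, oldest first. Returns the
/// merged segment's base offset and surviving records, or None if there are fewer than 2 inputs.
pub fn compact(sealed: &[(i64, Vec<KeyedRecord>)]) -> Option<(i64, Vec<KeyedRecord>)> {
    if sealed.len() < 2 {
        return None; // nothing to dedup yet
    }
    // Pass 1: the highest offset seen for each key wins.
    let mut latest: HashMap<&str, i64> = HashMap::new();
    for (_, records) in sealed {
        for r in records {
            latest.insert(r.key.as_str(), r.offset);
        }
    }
    // Pass 2: keep only each key's latest record, in offset order.
    let survivors = sealed
        .iter()
        .flat_map(|(_, records)| records.iter())
        .filter(|r| latest[r.key.as_str()] == r.offset)
        .cloned()
        .collect();
    // The output segment sits at the lowest input base offset.
    Some((sealed[0].0, survivors))
}
```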

Trait Implementations

impl Debug for Log

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

impl Freeze for Log

impl RefUnwindSafe for Log

impl Send for Log

impl Sync for Log

impl Unpin for Log

impl UnsafeUnpin for Log

impl UnwindSafe for Log

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
where ST: ?Sized, DT: ?Sized,

impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
where ST: ?Sized, DT: ?Sized,

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Read<Exclusive, BecauseExclusive> for T
where T: ?Sized,

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.