Struct RedexFile 

Source
pub struct RedexFile { /* private fields */ }

A handle to a RedEX file. Cheap to clone.

Created via super::Redex::open_file.

Implementations§

Source§

impl RedexFile

Source

pub fn name(&self) -> &ChannelName

The channel name this file is bound to.

Source

pub fn config(&self) -> &RedexFileConfig

The config this file was opened with.

Source

pub fn len(&self) -> usize

Number of currently retained entries.

Source

pub fn is_empty(&self) -> bool

True if no entries are retained.

Source

pub fn retained_bytes(&self) -> u64

Sum of payload_len across every currently-retained entry. Reflects the substrate’s authoritative view of bytes-on-disk after retention trim; callers that maintain their own monotonic byte counters (e.g. greedy’s GreedyCacheRegistry) use this to resync periodically. O(n) over the retained index; cheap for the typical few-K-entry case but the caller shouldn’t call it per-event.

Source

pub fn next_seq(&self) -> u64

Next sequence to be assigned (== total append count since open, including any evicted head).

Pre-fix, this read next_seq outside the state lock. append / append_batch etc. allocate a seq via fetch_add before the disk write and fetch_sub-rollback on failure — both within the state-lock critical section. A concurrent reader without the lock could observe the temporarily-bumped value: external observers (metrics, snapshot logic, an IndexStart::FromSeq(next_seq()) re-tail) believed a seq existed that was never durably appended. Taking the state lock here serializes the read with the append’s commit-or-rollback, so callers only observe values that have been durably committed (or never assigned).
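The commit-or-rollback behaviour described above can be sketched with a toy model. This is not the RedEX implementation: `ToyLog`, `State`, and the `disk_write_ok` flag are hypothetical names used only to illustrate why a reader that takes the same lock never sees the provisional bump.

```rust
use std::sync::Mutex;

// Hypothetical stand-in for the state guarded by the file's lock.
struct State {
    next_seq: u64,
    entries: Vec<(u64, Vec<u8>)>,
}

struct ToyLog {
    state: Mutex<State>,
}

impl ToyLog {
    fn new() -> Self {
        ToyLog {
            state: Mutex::new(State { next_seq: 0, entries: Vec::new() }),
        }
    }

    // Allocate a seq, attempt the simulated disk write, then commit or roll
    // back, all inside one critical section.
    fn append(&self, payload: &[u8], disk_write_ok: bool) -> Result<u64, &'static str> {
        let mut st = self.state.lock().unwrap();
        let seq = st.next_seq;
        st.next_seq += 1; // provisional bump, invisible to locked readers
        if !disk_write_ok {
            st.next_seq -= 1; // rollback: the seq is not burnt
            return Err("simulated disk failure");
        }
        st.entries.push((seq, payload.to_vec()));
        Ok(seq)
    }

    // Reads under the same lock, so it only sees committed or rolled-back values.
    fn next_seq(&self) -> u64 {
        self.state.lock().unwrap().next_seq
    }

    // Number of committed entries in the toy index.
    fn len(&self) -> usize {
        self.state.lock().unwrap().entries.len()
    }
}
```

In this model a failed append leaves `next_seq()` unchanged, which is the property the locked read is meant to preserve.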

Source

pub fn len_and_next_seq(&self) -> (usize, u64)

Atomic snapshot of (len, next_seq). Observers that need both values consistently with each other (e.g. metrics dashboards comparing “retained count” to “total seqs assigned”) should use this rather than len() followed by next_seq().

Before the fix, observers called the two methods in sequence. Each method takes the state lock individually, so each per-call view is consistent, but two appends could commit between the two reads, and the resulting pair could satisfy len + 1 > next_seq_seen (the reader saw post-append len but pre-append next_seq). Observers downstream of a metrics tick would then double-account the in-flight seqs. This single-lock accessor returns both values from one critical section, so the snapshot is strictly consistent.
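A minimal sketch of the difference between two separate locked reads and one combined read. It assumes a toy `Mutex`-guarded state; `ToyLog`, `State`, `evict_head`, and `len_then_next_seq` are hypothetical names, not part of the RedEX API.

```rust
use std::sync::Mutex;

// Hypothetical model of the locked state; field names are illustrative.
struct State {
    retained: Vec<u64>,
    next_seq: u64,
}

struct ToyLog {
    state: Mutex<State>,
}

impl ToyLog {
    fn new() -> Self {
        ToyLog { state: Mutex::new(State { retained: Vec::new(), next_seq: 0 }) }
    }

    fn append(&self) -> u64 {
        let mut st = self.state.lock().unwrap();
        let seq = st.next_seq;
        st.retained.push(seq);
        st.next_seq += 1;
        seq
    }

    // Simulated retention trim of the oldest entry.
    fn evict_head(&self) {
        let mut st = self.state.lock().unwrap();
        if !st.retained.is_empty() {
            st.retained.remove(0);
        }
    }

    // Two separate lock acquisitions: another thread may append in between,
    // so the two values need not describe the same instant.
    fn len_then_next_seq(&self) -> (usize, u64) {
        let len = self.state.lock().unwrap().retained.len();
        let next = self.state.lock().unwrap().next_seq;
        (len, next)
    }

    // One critical section: both values describe the same instant.
    fn len_and_next_seq(&self) -> (usize, u64) {
        let st = self.state.lock().unwrap();
        (st.retained.len(), st.next_seq)
    }
}
```

Single-threaded, both accessors agree; the combined accessor only matters once writers run concurrently with the observer.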

Source

pub fn is_closed(&self) -> bool

Whether Self::close has run. After close, tail streams terminate with Err(Closed) and append/append_* reject with the same error.

Source

pub fn lowest_retained_seq(&self) -> Option<u64>

Lowest sequence number currently retained in the in-memory index, or None when the file holds nothing.

next_seq() - lowest_retained_seq() is not the count — retention is by event-or-byte-or-age, not by contiguous range. Use this for “where can I safely tail from without triggering Lagged?”: passing the returned seq (or higher) to Self::tail guarantees the backfill does not signal retention-induced history loss.
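As an illustration, the helper below clamps a requested start position to the retained range. It is a sketch over a plain sorted slice of seqs; `safe_tail_start` is a hypothetical helper, and falling back to `next_seq` when nothing is retained is an assumption of this sketch, not documented behaviour.

```rust
// Lowest retained seq of a seq-sorted index, or None when it is empty.
fn lowest_retained_seq(retained: &[u64]) -> Option<u64> {
    retained.first().copied()
}

// Clamp a requested start so the backfill begins inside the retained range.
// Falling back to next_seq for an empty file is an assumption of this sketch.
fn safe_tail_start(wanted: u64, lowest: Option<u64>, next_seq: u64) -> u64 {
    match lowest {
        Some(lo) => wanted.max(lo),
        None => wanted.max(next_seq),
    }
}
```

With retained seqs 3, 7, 8 and next_seq 9 (a layout assumed for illustration), the subtraction gives 6 while only 3 entries are retained, which is why the difference is not a count.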

Source

pub fn append(&self, payload: &[u8]) -> Result<u64, RedexError>

Append one event. Returns the assigned sequence.

Failure modes: PayloadTooLarge if the segment is full or the offset would overflow u32; Io(_) if the disk mirror fails under redex-disk. Failure atomicity: memory is committed only after the disk write succeeds (for persistent files); next_seq is rolled back on disk failure so no seq number is burnt and no in-memory entry diverges from disk.

Notify-vs-fsync ordering (durability contract). Under every FsyncPolicy except a hypothetical “always fsync on append” (which does not exist — the per-write latency penalty would dominate), notify_watchers fires while the new event is still only in the kernel page cache. A crash between notify and the next scheduled fsync (per FsyncPolicy::EveryN, Interval, or IntervalOrBytes) loses the event from disk after subscribers have already observed it.

Watchers persisting derived state past the live event must therefore:

  1. Key durable records idempotently on (channel, seq).
  2. On restart, consult the on-disk file’s Self::next_seq and tail from last_persisted_local_seq + 1, treating any seq above the local watermark and at-or-below the file’s current head as a re-delivery — re-apply only if not already reflected in derived state.

FsyncPolicy::Never (heap-only or fire-and-forget disk) makes the same contract apply to the in-memory state on process restart; persistent files with a non-Never policy narrow the window but do not close it. The cortex adapter’s applied_through_seq watermark + snapshot/restore pair already implements the idempotent-reconcile recipe; ad-hoc subscribers (a future netdb-watcher persisting elsewhere) must mirror it.
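The first step of the recipe above, idempotent keying on (channel, seq), might look like the following. This is a sketch only: `DerivedStore` is a hypothetical in-memory stand-in for the watcher's durable storage, and it does not cover the case where the file's head has regressed below the watcher's watermark after a crash.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// Hypothetical watcher-side store; stands in for whatever durable storage
// the watcher really uses.
struct DerivedStore {
    records: HashMap<(String, u64), u64>,
}

impl DerivedStore {
    fn new() -> Self {
        DerivedStore { records: HashMap::new() }
    }

    // Idempotent apply keyed on (channel, seq). Returns false on re-delivery.
    fn apply(&mut self, channel: &str, seq: u64, value: u64) -> bool {
        match self.records.entry((channel.to_string(), seq)) {
            Entry::Occupied(_) => false,
            Entry::Vacant(slot) => {
                slot.insert(value);
                true
            }
        }
    }

    // Seq to resume tailing from: last persisted seq + 1, or 0 if none.
    fn resume_from(&self, channel: &str) -> u64 {
        self.records
            .keys()
            .filter(|(c, _)| c.as_str() == channel)
            .map(|(_, seq)| seq + 1)
            .max()
            .unwrap_or(0)
    }

    // Derived aggregate; unaffected by duplicate deliveries.
    fn total(&self) -> u64 {
        self.records.values().sum()
    }
}
```

Because a second delivery of the same (channel, seq) is rejected, replaying events after a restart does not change the derived total.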

Source

pub fn append_inline(&self, payload: &[u8; 8]) -> Result<u64, RedexError>

Append a fixed-length 8-byte inline payload. Skips the segment indirection. Returns the assigned sequence. Same failure-atomicity contract as Self::append.

Source

pub fn append_batch( &self, payloads: &[Bytes], ) -> Result<Option<u64>, RedexError>

Append many payloads. Returns Some(seq) of the FIRST event in the batch, or None for an empty input. All entries land contiguously in the index.

§Empty input

Pre-fix the signature was Result<u64, RedexError> and an empty payloads returned Ok(next_seq) — the seq value the next append would receive. Callers couldn’t distinguish “wrote zero, seq N would be next” from “wrote one event with seq N” via the return value alone.

Breaking change: signature is now Result<Option<u64>, RedexError>. None ⇒ empty input, no events appended; Some(seq) ⇒ first seq of the batch. Callers iterating over optionally-empty batches no longer need an is_empty pre-check.

Failure atomicity:

  • seq numbers are allocated after the batch is validated to fit (both segment capacity and u32 offset width);
  • for persistent files, the batch is written to disk before any in-memory commit — on disk failure the seq allocation rolls back and neither memory nor subscribers observe the batch.
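
The Option-returning convention can be modelled as follows. This is a toy sketch that uses Vec<u8> payloads instead of Bytes and a String error instead of RedexError; `ToyLog` and `written_range` are hypothetical names.

```rust
use std::ops::Range;

// Toy in-memory log; models only the return convention, not durability.
struct ToyLog {
    entries: Vec<Vec<u8>>,
}

impl ToyLog {
    // Returns Some(first_seq) for a non-empty batch, None for empty input.
    fn append_batch(&mut self, payloads: &[Vec<u8>]) -> Result<Option<u64>, String> {
        if payloads.is_empty() {
            return Ok(None);
        }
        let first = self.entries.len() as u64;
        self.entries.extend(payloads.iter().cloned());
        Ok(Some(first))
    }
}

// Caller side: turn the return value into the contiguous seq range written.
fn written_range(first: Option<u64>, batch_len: usize) -> Option<Range<u64>> {
    first.map(|seq| seq..seq + batch_len as u64)
}
```

A caller can map over the Option directly, so an empty batch needs no separate is_empty check.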
Source

pub fn append_ordered(&self, payload: &[u8]) -> Result<u64, RedexError>

Strictly-ordered variant of Self::append.

Both Self::append and this method now take the state lock before allocating a sequence number (the failure-atomicity fix required moving fetch_add inside the lock so rollback on disk-write failure is safe). That means append already produces in-seq-order index insertions under contention, and the two paths are functionally equivalent for single writes.

The real distinction is at the wrapper level: this method pairs with Self::append_batch_ordered, which holds one lock across an entire batch. Self::append_batch also holds one lock per batch today, so in v1 the non-ordered and ordered paths are nearly identical. The distinction is kept so that a future optimization of Self::append (e.g. moving the seq allocation back outside the lock with a different rollback scheme) does not affect callers who need guaranteed-ordered appends.

Used by super::OrderedAppender for replay determinism. Same failure-atomicity contract as Self::append.

Source

pub fn append_inline_ordered( &self, payload: &[u8; 8], ) -> Result<u64, RedexError>

Ordered variant of Self::append_inline. See Self::append_ordered. Same failure-atomicity contract.

Source

pub fn append_batch_ordered( &self, payloads: &[Bytes], ) -> Result<Option<u64>, RedexError>

Ordered variant of Self::append_batch. The whole batch is appended under one state-lock acquisition, so it’s both atomic (all-or-nothing within the batch) and strictly seq-ordered relative to any other ordered writers. Same failure-atomicity contract as Self::append_batch.

Returns Some(first_seq) on a non-empty batch and None on empty input — same convention as append_batch.

Source

pub fn append_and_fold<T, F, S>( &self, value: &T, state: &mut S, fold_fn: F, ) -> Result<u64, RedexError>
where T: Serialize, F: FnOnce(&T, &mut S),

Append value (postcard-serialized) AND run fold_fn against caller-supplied state in the same call. Returns the assigned seq.

This is the common “log-an-event and update a materialized view in one step” pattern, without spinning up a full CortEX adapter. Callers maintain state themselves; the fold closure sees the just-appended value and mutates state in place.

Note: the fold runs AFTER the RedEX append. If the append succeeds and the fold panics, the log advances but state is out of sync — callers who need crash-consistency should use the CortEX adapter’s durable snapshot + open_from_snapshot instead.
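The append-then-fold ordering can be sketched with a toy log. This is an illustration only: it stores strings rather than postcard-serialized values, has no error path, and `ToyLog` is a hypothetical type.

```rust
// Toy log used to show the ordering of the two steps.
struct ToyLog {
    entries: Vec<String>,
}

impl ToyLog {
    fn append_and_fold<S, F>(&mut self, value: &str, state: &mut S, fold_fn: F) -> u64
    where
        F: FnOnce(&str, &mut S),
    {
        let seq = self.entries.len() as u64;
        self.entries.push(value.to_string()); // 1. the log advances first
        fold_fn(value, state); // 2. then the caller's view is updated
        seq
    }
}
```

If `fold_fn` panicked in this model, the entry would already be in the log while the state stayed stale, which is the out-of-sync case the note warns about.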

Source

pub fn append_postcard<T: Serialize>( &self, value: &T, ) -> Result<u64, RedexError>

Convenience: serialize value with postcard and append.

Source

pub fn tail( &self, from_seq: u64, ) -> impl Stream<Item = Result<RedexEvent, RedexError>> + Send + 'static

Subscribe to all events with seq >= from_seq, including those already in the index at call time.

Backfill and live registration happen atomically under the state lock: no event can interleave between backfill delivery and live subscription.

Delivery is backed by a per-subscription bounded channel of depth RedexFileConfig::tail_buffer_size.

  • Backfill overflow (requested from_seq produces more retained events than the buffer can hold): pre-flighted under the state lock; the subscriber observes RedexError::Lagged as the first stream item and no truncated history. Guaranteed deliverable because the channel is empty at that point.
  • Live overflow (subscriber falls behind during live delivery): disconnected with a best-effort RedexError::Lagged. Under sustained saturation the signal itself may be dropped (the channel is full when we try to enqueue it), in which case the subscriber sees a clean stream end.
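
The two overflow cases can be modelled with a bounded std channel. This is a synchronous sketch, not the async stream the method returns: `Item`, `backfill`, and `deliver_live` are hypothetical names, and a buffer depth of at least 1 is assumed.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
enum Item {
    Event(u64),
    Lagged,
}

// Backfill pre-flight: if more retained events match than the buffer holds,
// deliver only a Lagged marker, never a truncated history.
fn backfill(retained: &[u64], from_seq: u64, depth: usize) -> Receiver<Item> {
    let (tx, rx) = sync_channel(depth);
    let backlog: Vec<u64> = retained.iter().copied().filter(|&s| s >= from_seq).collect();
    if backlog.len() > depth {
        // The channel is empty here, so this send succeeds for depth >= 1.
        let _ = tx.try_send(Item::Lagged);
    } else {
        for seq in backlog {
            let _ = tx.try_send(Item::Event(seq));
        }
    }
    rx // tx is dropped here; this sketch ends the stream after the backfill
}

// Live delivery: a full buffer disconnects the subscriber. Returns false
// when the subscriber should be dropped.
fn deliver_live(tx: &SyncSender<Item>, seq: u64) -> bool {
    match tx.try_send(Item::Event(seq)) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => {
            // Best effort: this signal is itself lost if the buffer is still full.
            let _ = tx.try_send(Item::Lagged);
            false
        }
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```

In the live case with a full buffer, the Lagged marker cannot be enqueued either, so the receiver in this model just sees the stream end after the buffered events.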
Source

pub fn read_range(&self, start: u64, end: u64) -> Vec<RedexEvent>

One-shot read of the half-open range [start, end) from the in-memory index. Returns only entries currently retained; silently skips any seqs that have been evicted.

state.index is sorted by seq by construction, so the lookup uses slice::partition_point, which is O(log N), to locate both range bounds. Before the fix for perf #52 in docs/performance/net-perf-analysis.md, this iterated the whole index linearly to find start, then walked the tail to find end: O(N) per read regardless of range size.

Source

pub fn read_one(&self, seq: u64) -> Option<RedexEvent>

Read a single event by sequence. Returns None when seq has been evicted or was never appended. Convenience over Self::read_range(seq, seq + 1); saves the Vec allocation when the caller only wants one event.

O(log N) via slice::partition_point — pre-fix this was O(N), scanning the entire index even for trivially-absent seqs (perf #52).
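The binary-search lookups used by read_range and read_one can be sketched over a plain seq-sorted slice. `Entry` here is a hypothetical index record (the real index layout is not shown on this page), and the functions return borrowed slices rather than owned events.

```rust
// Hypothetical index record; only the seq ordering matters for the lookup.
#[derive(Debug, Clone, PartialEq)]
struct Entry {
    seq: u64,
    payload: &'static str,
}

// Half-open range [start, end) over a seq-sorted index, in O(log N).
// Evicted seqs are simply absent, so they are skipped silently.
fn read_range(index: &[Entry], start: u64, end: u64) -> &[Entry] {
    let lo = index.partition_point(|e| e.seq < start);
    let hi = index.partition_point(|e| e.seq < end);
    &index[lo..hi.max(lo)] // an inverted range yields an empty slice
}

// Single lookup: find the first entry with seq >= target, then check equality.
fn read_one(index: &[Entry], seq: u64) -> Option<&Entry> {
    let i = index.partition_point(|e| e.seq < seq);
    index.get(i).filter(|e| e.seq == seq)
}
```

Both functions rely on the predicate `e.seq < bound` being true for a prefix of the slice and false afterwards, which is what sorted-by-seq guarantees.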

Source

pub async fn resolve_one(&self, seq: u64) -> Result<Option<Bytes>, BlobError>

Read a single event and resolve its payload — inline events return their bytes as-is, blob-ref events go through this file’s configured super::super::dataforts::blob::BlobAdapter (looked up by config.blob_adapter_id in the global registry) and have their BLAKE3 hash verified.

Returns:

  • Ok(None): seq is not retained or was never appended.
  • Ok(Some(bytes)): resolved payload.
  • Err(BlobError::*): payload was a malformed BlobRef, the configured adapter id is unset or missing in the registry, the adapter’s fetch failed, or the hash check failed.
Source

pub fn sweep_retention(&self)

Run the retention policy synchronously. Exposed so a background task (heartbeat loop) can drive it; no hot-path cost.

§Disk I/O under parking_lot Mutex

sweep_retention and append_batch call disk operations (disk.compact_to, disk.append_entries_at) while holding state.lock(), a non-yielding parking_lot Mutex. All concurrent appenders, tail registrations, read_range, len, is_empty, and close block on the same lock for the duration of the I/O. This is a latency / starvation concern, not a correctness one. Throughput-critical deployments should drive sweep_retention from a low-priority background task scheduled outside the hot path.

A proper fix would snapshot the state needed for I/O, release the lock, perform I/O, then re-acquire to commit — that’s a substantial restructure (transactional staging area, conflict resolution against concurrent appends) and out of scope here. Documented as a known performance trade-off so future profilers don’t rediscover it as a “bug”.

Source

pub fn skip_to(&self, target_seq: u64) -> Result<(), RedexError>

Skip-ahead: drop every retained event with seq < target_seq and advance next_seq to target_seq. The next append allocates target_seq; the local sequence has a permanent gap in [old_next_seq, target_seq) (those seqs were never assigned on this file).

Used by the replication runtime’s apply_sync_response failure path (ApplyError::GapBeforeChunk): when the leader’s response carries a first_seq strictly above the replica’s local tail (the leader trimmed past the replica’s retained range), the replica skips ahead instead of replaying the missing gap. Plan §8.

Idempotent on target_seq <= current_next_seq — returns Ok(()) without mutation.

Persistent files: returns RedexError::Channel("skip_to not supported on persistent files in v1"). The persistent-tier skip-ahead path needs a disk-segment truncate plus idx/ts rebuild that doesn’t ship in v1; replicas on persistent files fall back to NACK BadRange and heartbeat-cycle recovery (the leader’s next heartbeat reports the new tail, the replica re-issues a request from its old tail, and accepts the gap-before-chunk error as a recoverable retry-with-backoff signal).
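The heap-only skip-ahead semantics can be sketched with a toy log. This models only the documented effects on the retained set and next_seq; `ToyLog` is a hypothetical type, and the persistent-file error path is left out.

```rust
// Toy heap-only log; models the retained seqs and the next seq to assign.
struct ToyLog {
    retained: Vec<u64>,
    next_seq: u64,
}

impl ToyLog {
    fn append(&mut self) -> u64 {
        let seq = self.next_seq;
        self.retained.push(seq);
        self.next_seq += 1;
        seq
    }

    fn skip_to(&mut self, target_seq: u64) {
        if target_seq <= self.next_seq {
            return; // idempotent: nothing to skip, no mutation
        }
        // Drop every retained event below the target, then jump next_seq.
        self.retained.retain(|&seq| seq >= target_seq);
        self.next_seq = target_seq;
    }
}
```

After skipping from next_seq 3 to 10 in this model, the next append is assigned 10 and seqs 3 through 9 are never assigned.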

Source

pub fn close(&self) -> Result<(), RedexError>

Close the file. Outstanding tail streams receive RedexError::Closed. For persistent files, fsyncs the disk segment before returning and signals any background fsync task (Interval or EveryN) to exit. close() always fsyncs regardless of the per-file FsyncPolicy — this is the caller’s explicit durability barrier.

Source

pub fn sync(&self) -> Result<(), RedexError>

Fsync the disk segment (no-op for heap-only files).

Trait Implementations§

Source§

impl Clone for RedexFile

Source§

fn clone(&self) -> RedexFile

Returns a duplicate of the value.
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
Source§

impl Debug for RedexFile

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.