Skip to main content

CortexAdapter

Struct CortexAdapter 

Source
pub struct CortexAdapter<State> { /* private fields */ }
Expand description

One-file CortEX adapter: projects envelopes into RedEX payloads, tails the same file, drives a RedexFold implementation, and exposes the materialized state as a read handle.

Created via Self::open.

Implementations§

Source§

impl<State> CortexAdapter<State>

Source

pub fn state(&self) -> Arc<RwLock<State>>

Read-only access to the materialized state. The returned Arc is cheap to clone; all readers and the fold task share the same RwLock.

Source

pub fn folded_through_seq(&self) -> Option<u64>

Highest RedEX sequence the fold task has processed — applied successfully OR skipped via recoverable_decode under Stop policy.

Use Self::applied_through_seq if you need “state actually reflects this seq” semantics; the difference matters under Stop+recoverable-decode where the skip-and-continue path advances this watermark (so wait_for_seq doesn’t deadlock on a bad event) but leaves applied_through_seq behind.

None if no event has been folded yet since open.

Source

pub fn applied_through_seq(&self) -> Option<u64>

Highest RedEX sequence K such that every seq in start_seq..=K was successfully applied to state via Ok(()) RedexFold::apply. A strict-prefix watermark: any skip (recoverable_decode under Stop, or any error under LogAndContinue) at seq M permanently caps this watermark at M-1, even if subsequent seqs apply successfully.

snapshot persists this value, so a restore tails from applied_through_seq + 1 and re-attempts every seq from the first skip onwards — preserving “log is the source of truth” even across the skip-and-continue path.

Distinct from Self::folded_through_seq, which is the highest processed seq and advances over skips so live consumers don’t deadlock.

None if no event has been applied yet since open (start_seq=0 with no successful applies, or start_seq=0 with the very first event having been skipped).

Source

pub fn fold_errors(&self) -> u64

Cumulative count of fold errors (only ever increases under FoldErrorPolicy::LogAndContinue; under Stop it is 0 or 1, with the task exiting after the first error).

Source

pub fn is_running(&self) -> bool

True if the fold task is currently running (has not observed shutdown, an error under Stop, or a tail-end signal).

Source

pub async fn wait_for_seq(&self, seq: u64) -> Result<(), Option<u64>>

Block until the fold task has processed every event up through seq (applied successfully OR skipped via recoverable_decode under Stop policy), or until the fold task stops (e.g. close, non-recoverable fold error under Stop).

Returns Ok(()) when the folded watermark reaches seq, or Err(folded) where folded is the highest seq processed before the fold task stopped (None if it stopped without processing anything). Pre-fix this returned () for both outcomes, so a caller waiting on a seq the fold task never reached (close, Stop-policy halt, retention-evicted tail lag) silently observed stale state. Mirrors Self::wait_for_applied_seq’s shape.

Use pattern:

let seq = adapter.ingest(envelope)?;
adapter.wait_for_seq(seq).await?;
let state = adapter.state().read();
// state reflects the ingest, UNLESS the event at `seq`
// was skipped via recoverable_decode under `Stop`.

Subtle point. This method waits on Self::folded_through_seq, which advances over events the fold task processed but skipped via RedexError::is_recoverable_decode. The skip-and- continue path is the documented DoS-resistance contract (a single corrupt event must not wedge the task forever). If you need to confirm state actually reflects the ingest at seq, follow up with adapter.applied_through_seq() >= Some(seq).

Source

pub async fn wait_for_applied_seq(&self, seq: u64) -> Result<(), Option<u64>>

RYW-strength wait. Resolves when the applied watermark (events that actually ran through the fold body, not recoverable-skipped) catches up to seq, OR when the fold task stops before reaching seq.

Returns Ok(()) on a real apply-through, Err(applied) where applied is the last successfully-applied seq on stop. Differs from Self::wait_for_seq which resolves on the folded watermark (including skipped events). RYW requires applied, not folded — otherwise a producer whose write hit a recoverable-decode skip would observe Ok(()) and then read state that doesn’t reflect the write.

Source

pub fn close(&self) -> Result<(), CortexAdapterError>

Close the adapter. Stops the fold task (after it finishes any in-progress apply), leaves the RedEX file open so other adapters / callers can continue using it, and leaves the state handle readable. Idempotent.

Source

pub fn changes(&self) -> impl Stream<Item = u64> + Send + 'static

Stream of RedEX sequences, one per successful (or LogAndContinue-skipped) fold application. Used by reactive queries: on each emission, the caller re-reads Self::state to compute its current view.

Lag semantics: if a subscriber falls more than 64 events behind (the internal broadcast channel capacity), the channel drops intermediate events. This implementation filters lag errors out silently — by the time the subscriber catches up, state() reflects the latest applied events regardless of how many signals were missed. Subscribers that need to observe lag (e.g. for telemetry or reactive-backpressure) should use Self::changes_with_lag instead.

The stream ends when all adapter handles have been dropped and the fold task has exited.

Source

pub fn changes_with_lag( &self, ) -> impl Stream<Item = ChangeEvent> + Send + 'static

Stream of changes that surfaces broadcast-channel lag as a Lagged(n) event interleaved with the sequence emissions.

The yielded items are ChangeEvents — Seq(u64) for a successful fold-apply notification, and Lagged(n) when the subscriber fell n events behind the broadcast channel (capacity 64). Pre-fix Self::changes silently dropped Lagged errors via filter_map(|r| r.ok()); downstream telemetry consumers had no way to surface “you missed N changes.” This method is the lossless counterpart — by the time a subscriber sees Lagged(n), state() already reflects past those n events, so the subscriber can react (re-read state, log lag, apply backpressure) without missing data.

Source

pub fn ingest<E: IntoRedexPayload>( &self, envelope: E, ) -> Result<u64, CortexAdapterError>

Append an envelope. Projects to (EventMeta, tail), builds the concatenated payload, calls RedexFile::append, and returns the assigned RedEX sequence.

Source

pub fn ingest_with_token<E: IntoRedexPayload>( &self, envelope: E, ) -> Result<WriteToken, CortexAdapterError>

Append an envelope and return a WriteToken addressing the resulting write. The token is the typed handle the read-your-writes API consumes via Self::wait_for_token; equivalent to calling Self::ingest and pairing the returned seq with the envelope’s meta.origin_hash, but does both in one shot so the binding surface can round-trip a single value.

Source

pub async fn wait_for_token( &self, token: WriteToken, deadline: Duration, ) -> Result<(), WaitForTokenError>

Block until the fold task has processed every event up through token.seq, or deadline elapses. Returns Err(WaitForTokenError::Timeout) on deadline; Ok(()) once the watermark catches up (or the fold task stops — see Self::wait_for_seq for the same caveat).

The token’s origin_hash is informational at this layer — the generic CortexAdapter folds every event in its RedEX file regardless of origin. Origin-bound adapters (e.g. super::tasks::TasksAdapter, super::memories::MemoriesAdapter) layer their own origin assertion on top.

Source

pub fn ryw_metrics(&self) -> RywMetricsSnapshot

Snapshot the RYW counters for this adapter. Cheap; reads four atomics under Relaxed.

Source§

impl<State: Send + Sync + 'static> CortexAdapter<State>

Source

pub fn open<F>( redex: &Redex, name: &ChannelName, redex_config: RedexFileConfig, adapter_config: CortexAdapterConfig, fold: F, initial_state: State, ) -> Result<Self, CortexAdapterError>
where F: RedexFold<State> + Send + 'static,

Open an adapter against a RedEX file.

Opens (or reuses) <redex>/<name> via Redex::open_file, spawns a background task that tails the file and drives fold, and returns the handle.

Source§

impl<State> CortexAdapter<State>
where State: Serialize + Send + Sync + 'static,

Source

pub fn snapshot(&self) -> Result<(Vec<u8>, Option<u64>), CortexAdapterError>

Capture a point-in-time snapshot of the materialized state.

Returns (state_bytes, last_seq) where state_bytes is the postcard-serialized state and last_seq is the highest RedEX sequence successfully applied to it as a strict prefix (i.e. Self::applied_through_seq). Persist both together — they form a consistent pair, guaranteed by the adapter holding the state write lock while advancing the watermark.

Restore via Self::open_from_snapshot on a State that also implements DeserializeOwned. Restore tails from last_seq + 1, so any events that were processed but skipped via recoverable_decode between snapshots are re-attempted on restore — preserving “log is the source of truth” even across the skip-and-continue path. (Pre-fix, snapshot read folded_through_seq, which advances over skipped events; restore tailed from past them and the gap became permanent in durable state.)

Re-apply double-counting. state_bytes reflects in-memory state at snapshot time, which includes the effects of every successful fold including applies past any prior skip. Restore tails from the strict-prefix last_seq + 1, so seqs past the skip are re-fed to the fold function — fold functions that are not idempotent against re-application will produce divergent state on restore. The mitigations: (1) make the fold idempotent (the standard event-sourcing recommendation); (2) snapshot only when applied_through_seq() == folded_through_seq() (no gap → no re-apply); or (3) accept best-effort restore semantics for adapters that have ever observed a recoverable_decode skip. The trade-off vs. the pre-fix behavior is asymmetric: pre-fix, the skipped seq was permanently lost; post-fix, the skipped seq is re-attempted, at the cost of double-applying intervening successful seqs for non-idempotent folds.

last_seq is None if no event has been applied yet since open (the snapshot is still meaningful — it represents the initial State — but callers typically wait until Self::wait_for_seq has returned and Self::applied_through_seq has advanced before snapshotting).

Source§

impl<State> CortexAdapter<State>
where State: DeserializeOwned + Send + Sync + 'static,

Source

pub fn open_from_snapshot<F>( redex: &Redex, name: &ChannelName, redex_config: RedexFileConfig, adapter_config: CortexAdapterConfig, fold: F, state_bytes: &[u8], last_seq: Option<u64>, ) -> Result<Self, CortexAdapterError>
where F: RedexFold<State> + Send + 'static,

Open an adapter from a previously-captured snapshot, skipping the [0, last_seq] replay.

state_bytes is the blob returned from Self::snapshot. last_seq is its companion sequence. The tail starts at last_seq + 1; the initial state is deserialized from the blob; the fold task is spawned as usual.

If last_seq is None (no events had been folded at snapshot time), the tail starts at seq 0 — equivalent to StartPosition::FromBeginning with the deserialized initial state.

Trait Implementations§

Source§

impl<State> Clone for CortexAdapter<State>

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<State> Debug for CortexAdapter<State>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<State> Freeze for CortexAdapter<State>

§

impl<State> !RefUnwindSafe for CortexAdapter<State>

§

impl<State> Send for CortexAdapter<State>
where State: Send + Sync,

§

impl<State> Sync for CortexAdapter<State>
where State: Send + Sync,

§

impl<State> Unpin for CortexAdapter<State>

§

impl<State> UnsafeUnpin for CortexAdapter<State>

§

impl<State> !UnwindSafe for CortexAdapter<State>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more