pub struct CortexAdapter<State> { /* private fields */ }
One-file CortEX adapter: projects envelopes into RedEX payloads,
tails the same file, drives a RedexFold implementation, and
exposes the materialized state as a read handle.
Created via Self::open.
Implementations
impl<State> CortexAdapter<State>
pub fn state(&self) -> Arc<RwLock<State>>
Read-only access to the materialized state. The returned Arc
is cheap to clone; all readers and the fold task share the
same RwLock.
pub fn folded_through_seq(&self) -> Option<u64>
Highest RedEX sequence the fold task has processed —
applied successfully OR skipped via recoverable_decode
under Stop policy.
Use Self::applied_through_seq if you need “state
actually reflects this seq” semantics; the difference
matters under Stop+recoverable-decode where the
skip-and-continue path advances this watermark (so
wait_for_seq doesn’t deadlock on a bad event) but
leaves applied_through_seq behind.
None if no event has been folded yet since open.
pub fn applied_through_seq(&self) -> Option<u64>
Highest RedEX sequence K such that every seq in
start_seq..=K was successfully applied to state via
Ok(()) RedexFold::apply. A strict-prefix watermark:
any skip (recoverable_decode under Stop, or any error
under LogAndContinue) at seq M permanently caps this
watermark at M-1, even if subsequent seqs apply
successfully.
snapshot persists this value, so a restore tails from
applied_through_seq + 1 and re-attempts every seq from
the first skip onwards — preserving “log is the source
of truth” even across the skip-and-continue path.
Distinct from Self::folded_through_seq, which is the
highest processed seq and advances over skips so live
consumers don’t deadlock.
None if no event has been applied yet since open
(start_seq=0 with no successful applies, or start_seq=0
with the very first event having been skipped).
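The relationship between the two watermarks can be sketched with a small standalone model. This is only an illustration of the rules described above, not the adapter's code: `Outcome` and `watermarks` are hypothetical names, and the model assumes one outcome per consecutive seq starting at `start_seq`.

```rust
/// Outcome of processing one event (hypothetical stand-in for this sketch).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Outcome {
    Applied,
    Skipped,
}

/// Returns `(folded_through, applied_through)` after processing `outcomes`
/// in order, where the first outcome belongs to `start_seq`.
fn watermarks(start_seq: u64, outcomes: &[Outcome]) -> (Option<u64>, Option<u64>) {
    let mut folded = None;
    let mut applied = None;
    let mut prefix_intact = true; // becomes false once any seq is skipped
    for (i, outcome) in outcomes.iter().enumerate() {
        let seq = start_seq + i as u64;
        folded = Some(seq); // advances over applies and skips alike
        match outcome {
            Outcome::Applied if prefix_intact => applied = Some(seq),
            Outcome::Applied => {} // applied, but past a skip: watermark stays capped
            Outcome::Skipped => prefix_intact = false,
        }
    }
    (folded, applied)
}
```

With outcomes `[Applied, Applied, Skipped, Applied]` from seq 0, the model yields a folded watermark of `Some(3)` and an applied watermark of `Some(1)`: the skip at seq 2 caps the strict prefix at 1 even though seq 3 applied.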
pub fn fold_errors(&self) -> u64
Cumulative count of fold errors. Under
FoldErrorPolicy::LogAndContinue the count can keep growing; under
Stop it is 0 or 1, because the task exits after the first error.
pub fn is_running(&self) -> bool
True if the fold task is currently running (has not observed
shutdown, an error under Stop, or a tail-end signal).
pub async fn wait_for_seq(&self, seq: u64) -> Result<(), Option<u64>>
Block until the fold task has processed every event up
through seq (applied successfully OR skipped via
recoverable_decode under Stop policy), or until the
fold task stops (e.g. close, non-recoverable fold error
under Stop).
Returns Ok(()) when the folded watermark reaches seq,
or Err(folded) where folded is the highest seq processed
before the fold task stopped (None if it stopped without
processing anything). Pre-fix this returned () for both
outcomes, so a caller waiting on a seq the fold task never
reached (close, Stop-policy halt, retention-evicted tail
lag) silently observed stale state. Mirrors
Self::wait_for_applied_seq’s shape.
Use pattern:
```rust
let seq = adapter.ingest(envelope)?;
adapter.wait_for_seq(seq).await?;
// Bind the Arc first so the read guard does not borrow a temporary.
let state = adapter.state();
let guard = state.read();
// `guard` reflects the ingest, UNLESS the event at `seq`
// was skipped via recoverable_decode under `Stop`.
```
Subtle point. This method waits on
Self::folded_through_seq, which advances over
events the fold task processed but skipped via
RedexError::is_recoverable_decode. The skip-and-
continue path is the documented DoS-resistance contract
(a single corrupt event must not wedge the task forever).
If you need to confirm state actually reflects the
ingest at seq, follow up with
adapter.applied_through_seq() >= Some(seq).
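The follow-up check relies on how `Option<u64>` is ordered in Rust: `None` compares less than every `Some(_)`, so an adapter that has applied nothing never passes. A minimal sketch, where `state_reflects` is a hypothetical helper and not part of the adapter API:

```rust
/// True when the applied watermark covers `seq`. Relies on `Option`'s
/// ordering, in which `None` sorts before every `Some(_)`.
fn state_reflects(applied_through: Option<u64>, seq: u64) -> bool {
    applied_through >= Some(seq)
}
```

A caller would pass `adapter.applied_through_seq()` as the first argument after `wait_for_seq` returns `Ok(())`.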
pub async fn wait_for_applied_seq(&self, seq: u64) -> Result<(), Option<u64>>
Read-your-writes (RYW) strength wait. Resolves when the applied watermark
(events that actually ran through the fold body, not
recoverable-skipped) catches up to seq, OR when the fold
task stops before reaching seq.
Returns Ok(()) on a real apply-through, Err(applied)
where applied is the last successfully-applied seq on
stop. Differs from Self::wait_for_seq which resolves
on the folded watermark (including skipped events). RYW
requires applied, not folded — otherwise a producer whose
write hit a recoverable-decode skip would observe
Ok(()) and then read state that doesn’t reflect the
write.
pub fn close(&self) -> Result<(), CortexAdapterError>
Close the adapter. Stops the fold task (after it finishes any in-progress apply), leaves the RedEX file open so other adapters / callers can continue using it, and leaves the state handle readable. Idempotent.
pub fn changes(&self) -> impl Stream<Item = u64> + Send + 'static
Stream of RedEX sequences, one per successful (or
LogAndContinue-skipped) fold application. Used by reactive
queries: on each emission, the caller re-reads
Self::state to compute its current view.
Lag semantics: if a subscriber falls more than 64 events
behind (the internal broadcast channel capacity), the channel
drops intermediate events. This implementation filters lag
errors out silently — by the time the subscriber catches up,
state() reflects the latest applied events regardless of
how many signals were missed. Subscribers that need to
observe lag (e.g. for telemetry or reactive-backpressure)
should use Self::changes_with_lag instead.
The stream ends when all adapter handles have been dropped and the fold task has exited.
pub fn changes_with_lag(&self) -> impl Stream<Item = ChangeEvent> + Send + 'static
Stream of changes that surfaces broadcast-channel lag as a
Lagged(n) event interleaved with the sequence emissions.
The yielded items are ChangeEvents — Seq(u64) for a
successful fold-apply notification, and Lagged(n) when the
subscriber fell n events behind the broadcast channel
(capacity 64). Pre-fix Self::changes silently dropped
Lagged errors via filter_map(|r| r.ok()); downstream
telemetry consumers had no way to surface “you missed N
changes.” This method is the lossless counterpart — by the
time a subscriber sees Lagged(n), state() already
reflects past those n events, so the subscriber can react
(re-read state, log lag, apply backpressure) without
missing data.
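A consumer of this stream typically treats both variants as a cue to re-read state, and additionally records the lag. The sketch below models that handling over a plain slice so it stays self-contained. The `ChangeEvent` enum here is a local stand-in that mirrors the two variants described above (the crate's own type may differ in field types), and `LagStats` and `consume` are hypothetical names.

```rust
/// Local stand-in mirroring the two variants described above.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ChangeEvent {
    Seq(u64),
    Lagged(u64),
}

/// What a telemetry-minded subscriber might track (illustrative only).
#[derive(Debug, Default, PartialEq)]
struct LagStats {
    /// Most recent sequence notification seen.
    last_seq: Option<u64>,
    /// Total number of notifications reported as missed.
    missed: u64,
}

/// Folds a batch of change events into lag statistics.
fn consume(events: &[ChangeEvent]) -> LagStats {
    let mut stats = LagStats::default();
    for event in events {
        match *event {
            ChangeEvent::Seq(seq) => stats.last_seq = Some(seq),
            // State already reflects the missed events, so only count them.
            ChangeEvent::Lagged(n) => stats.missed += n,
        }
    }
    stats
}
```

In a real subscriber the loop body would run once per item yielded by the stream, and the `Lagged` arm is where logging or backpressure would go.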
pub fn ingest<E: IntoRedexPayload>(&self, envelope: E) -> Result<u64, CortexAdapterError>
Append an envelope. Projects to (EventMeta, tail), builds the
concatenated payload, calls RedexFile::append, and returns
the assigned RedEX sequence.
pub fn ingest_with_token<E: IntoRedexPayload>(&self, envelope: E) -> Result<WriteToken, CortexAdapterError>
Append an envelope and return a WriteToken addressing
the resulting write. The token is the typed handle the
read-your-writes API consumes via
Self::wait_for_token; equivalent to calling Self::ingest
and pairing the returned seq with the envelope’s
meta.origin_hash, but does both in one shot so the binding
surface can round-trip a single value.
pub async fn wait_for_token(&self, token: WriteToken, deadline: Duration) -> Result<(), WaitForTokenError>
Block until the fold task has processed every event up
through token.seq, or deadline elapses. Returns
Err(WaitForTokenError::Timeout) on deadline; Ok(())
once the watermark catches up (or the fold task stops —
see Self::wait_for_seq for the same caveat).
The token’s origin_hash is informational at this layer
— the generic CortexAdapter folds every event in its
RedEX file regardless of origin. Origin-bound adapters
(e.g. super::tasks::TasksAdapter,
super::memories::MemoriesAdapter) layer their own
origin assertion on top.
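The deadline behaviour can be illustrated with a synchronous analogue built on the standard library. This is a sketch of the general "wait for a watermark or time out" pattern, not the adapter's async implementation: `Watermark`, `TimedOut`, `advance`, and `wait_for` are all hypothetical names, and the model does not cover the case where the fold task stops.

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Stand-in for a timeout error in this sketch.
#[derive(Debug, PartialEq)]
struct TimedOut;

/// Shared watermark: highest processed seq, or `None` before the first event.
#[derive(Default)]
struct Watermark {
    seq: Mutex<Option<u64>>,
    advanced: Condvar,
}

impl Watermark {
    /// Record that `seq` has been processed and wake all waiters.
    fn advance(&self, seq: u64) {
        let mut guard = self.seq.lock().unwrap();
        if *guard < Some(seq) {
            *guard = Some(seq);
        }
        self.advanced.notify_all();
    }

    /// Wait until the watermark reaches `seq`, or fail once `deadline` elapses.
    fn wait_for(&self, seq: u64, deadline: Duration) -> Result<(), TimedOut> {
        let give_up_at = Instant::now() + deadline;
        let mut guard = self.seq.lock().unwrap();
        while *guard < Some(seq) {
            let remaining = give_up_at.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return Err(TimedOut);
            }
            // Re-check the condition after every wake-up (spurious or not).
            let (next, _timeout) = self.advanced.wait_timeout(guard, remaining).unwrap();
            guard = next;
        }
        Ok(())
    }
}
```

The loop recomputes the remaining time on each wake-up so that repeated notifications for lower seqs cannot extend the overall deadline.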
pub fn ryw_metrics(&self) -> RywMetricsSnapshot
Snapshot the RYW counters for this adapter. Cheap; reads
four atomics under Relaxed.
impl<State: Send + Sync + 'static> CortexAdapter<State>
pub fn open<F>(
    redex: &Redex,
    name: &ChannelName,
    redex_config: RedexFileConfig,
    adapter_config: CortexAdapterConfig,
    fold: F,
    initial_state: State,
) -> Result<Self, CortexAdapterError>
Open an adapter against a RedEX file.
Opens (or reuses) <redex>/<name> via
Redex::open_file,
spawns a background task that tails the file and drives
fold, and returns the handle.
impl<State> CortexAdapter<State>
pub fn snapshot(&self) -> Result<(Vec<u8>, Option<u64>), CortexAdapterError>
Capture a point-in-time snapshot of the materialized state.
Returns (state_bytes, last_seq) where state_bytes is the
postcard-serialized state and last_seq is the highest RedEX
sequence successfully applied to it as a strict prefix
(i.e. Self::applied_through_seq). Persist both together —
they form a consistent pair, guaranteed by the adapter
holding the state write lock while advancing the watermark.
Restore via Self::open_from_snapshot on a State that
also implements DeserializeOwned. Restore tails from
last_seq + 1, so any events that were processed but
skipped via recoverable_decode between snapshots are
re-attempted on restore — preserving “log is the source of
truth” even across the skip-and-continue path. (Pre-fix,
snapshot read folded_through_seq, which advances over
skipped events; restore tailed from past them and the gap
became permanent in durable state.)
Re-apply double-counting. state_bytes reflects
in-memory state at snapshot time, which includes the
effects of every successful fold including applies past
any prior skip. Restore tails from the strict-prefix
last_seq + 1, so seqs past the skip are re-fed to the
fold function — fold functions that are not idempotent
against re-application will produce divergent state on
restore. The mitigations: (1) make the fold idempotent
(the standard event-sourcing recommendation); (2)
snapshot only when applied_through_seq() == folded_through_seq() (no gap → no re-apply); or (3)
accept best-effort restore semantics for adapters that
have ever observed a recoverable_decode skip. The
trade-off vs. the pre-fix behavior is asymmetric:
pre-fix, the skipped seq was permanently lost; post-fix,
the skipped seq is re-attempted, at the cost of
double-applying intervening successful seqs for
non-idempotent folds.
last_seq is None if no event has been applied yet
since open (the snapshot is still meaningful — it
represents the initial State — but callers typically wait
until Self::wait_for_seq has returned and
Self::applied_through_seq has advanced before
snapshotting).
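The re-apply double-counting described above can be reproduced with a small simulation. It is a sketch under stated assumptions: two toy folds (`fold_count`, `fold_set`, both hypothetical) stand in for a real RedexFold, seq 2 is skipped during the live run, and on restore seq 2 is assumed to apply cleanly.

```rust
use std::collections::BTreeSet;

/// Non-idempotent fold: every application bumps a counter.
fn fold_count(state: &mut u64, _seq: u64) {
    *state += 1;
}

/// Idempotent fold: re-applying a seq that is already present is a no-op.
fn fold_set(state: &mut BTreeSet<u64>, seq: u64) {
    state.insert(seq);
}

/// Simulates a live run over seqs 0..=4 with seq 2 skipped, a snapshot, and
/// a restore that tails from `last_seq + 1`.
/// Returns `(counter, set_len)` after the restore has caught up.
fn restore_after_skip() -> (u64, usize) {
    let (mut counter, mut set) = (0u64, BTreeSet::new());
    // Live run: seq 2 was skipped, so it never reaches the fold.
    for seq in [0u64, 1, 3, 4] {
        fold_count(&mut counter, seq);
        fold_set(&mut set, seq);
    }
    // Snapshot: state includes seqs 3 and 4, but the strict-prefix
    // watermark is capped just before the skip.
    let last_seq = 1u64;

    // Restore: state comes back as captured and the tail re-feeds 2, 3, 4.
    for seq in (last_seq + 1)..=4 {
        fold_count(&mut counter, seq);
        fold_set(&mut set, seq);
    }
    (counter, set.len())
}
```

Five distinct events exist, yet the counter ends at 7 because seqs 3 and 4 were applied twice; the set-based fold ends with the expected 5 entries.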
impl<State> CortexAdapter<State>
pub fn open_from_snapshot<F>(
    redex: &Redex,
    name: &ChannelName,
    redex_config: RedexFileConfig,
    adapter_config: CortexAdapterConfig,
    fold: F,
    state_bytes: &[u8],
    last_seq: Option<u64>,
) -> Result<Self, CortexAdapterError>
Open an adapter from a previously-captured snapshot, skipping
the [0, last_seq] replay.
state_bytes is the blob returned from Self::snapshot.
last_seq is its companion sequence. The tail starts at
last_seq + 1; the initial state is deserialized from the
blob; the fold task is spawned as usual.
If last_seq is None (no events had been applied at
snapshot time), the tail starts at seq 0 — equivalent to
StartPosition::FromBeginning with the deserialized initial
state.
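The tail-start rule is small enough to state as a function. `tail_start` is a hypothetical helper written for illustration; it shows why `None` and `Some(0)` must be kept distinct when persisting the snapshot pair.

```rust
/// First seq a restored adapter would read, per the rule described above.
fn tail_start(last_seq: Option<u64>) -> u64 {
    match last_seq {
        // Everything up to and including `seq` is already in the snapshot.
        Some(seq) => seq + 1,
        // Nothing was applied, so replay starts at the beginning.
        None => 0,
    }
}
```

Storing `last_seq` as a bare integer with 0 as the "nothing applied" default would conflate the two cases and cause seq 0 to be skipped on restore.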