Skip to main content

net/adapter/net/cortex/
adapter.rs

1//! `CortexAdapter<State>` — one RedEX file, one fold, one materialized
2//! state.
3
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7
8use futures::{Stream, StreamExt};
9use parking_lot::RwLock;
10use tokio::sync::{broadcast, Notify, Semaphore};
11use tokio_stream::wrappers::BroadcastStream;
12
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15
16use super::super::channel::ChannelName;
17use super::super::redex::{
18    Redex, RedexError, RedexEvent, RedexFile, RedexFileConfig, RedexFold, WriteToken,
19};
20use super::config::{CortexAdapterConfig, FoldErrorPolicy, StartPosition};
21use super::envelope::IntoRedexPayload;
22use super::error::CortexAdapterError;
23use super::meta::EVENT_META_SIZE;
24
25/// Process-wide cap on in-flight RYW waits across every
26/// `CortexAdapter` in the process. `CortexAdapterConfig::ryw_inflight_cap`
27/// bounds per-adapter; this caps the total. Operators running
28/// thousands of adapters need the process-wide bound so the
29/// cumulative permit count doesn't dwarf the memory budget.
30///
31/// Default: no global cap. Operators install one at startup via
32/// [`set_global_ryw_inflight_cap`]; the install is one-shot
33/// (`OnceLock`), so subsequent calls return `false` and leave the
34/// previous cap in place.
35static GLOBAL_RYW_CAP: OnceLock<Arc<Semaphore>> = OnceLock::new();
36
37/// Install a process-wide cap on concurrent `wait_for_token`
38/// permits across every `CortexAdapter` instance. Call once at
39/// process startup before any RYW traffic. `cap` is clamped to a
40/// minimum of 1.
41///
42/// Returns `true` if the cap was installed by this call, `false`
43/// if a prior call already installed one.
44pub fn set_global_ryw_inflight_cap(cap: usize) -> bool {
45    GLOBAL_RYW_CAP
46        .set(Arc::new(Semaphore::new(cap.max(1))))
47        .is_ok()
48}
49
50/// Lookup the installed global cap; `None` when no cap has been
51/// set (default).
52fn current_global_ryw_semaphore() -> Option<Arc<Semaphore>> {
53    GLOBAL_RYW_CAP.get().cloned()
54}
55
/// One-file CortEX adapter: projects envelopes into RedEX payloads,
/// tails the same file, drives a [`RedexFold`] implementation, and
/// exposes the materialized state as a read handle.
///
/// The handle itself is a thin wrapper around a shared
/// `Arc<AdapterInner<State>>`; all watermarks, counters and the
/// state lock live on the inner value.
///
/// Created via [`Self::open`].
pub struct CortexAdapter<State> {
    /// Shared adapter internals (file handle, state, watermarks,
    /// notification primitives, RYW bookkeeping).
    inner: Arc<AdapterInner<State>>,
}
64
/// Capacity of the post-fold change-notification broadcast channel.
/// A slow subscriber that falls more than this many events behind
/// gets a `Lagged` signal and should re-read state fresh.
///
/// Used as the capacity argument when the adapter creates its
/// `broadcast::channel` at open time.
const CHANGES_BROADCAST_CAP: usize = 64;
69
/// Item type yielded by [`CortexAdapter::changes_with_lag`].
///
/// The plain `changes()` stream discards every `Err` item coming
/// out of the `BroadcastStream` (i.e. the `Lagged(n)` signal), so
/// downstream telemetry consumers have no way to surface "you
/// missed N changes." This enum exposes both shapes; subscribers
/// who need only the latest sequence can stay on
/// [`CortexAdapter::changes`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeEvent {
    /// A fold-apply notification carrying the RedEX sequence that
    /// was just processed.
    Seq(u64),
    /// The subscriber fell `n` events behind the broadcast channel
    /// and `n` change notifications were dropped. By the time the
    /// subscriber sees this, `state()` already reflects past those
    /// events — the lag value is purely observability.
    Lagged(u64),
}
88
/// Shared internals behind every [`CortexAdapter`] handle.
struct AdapterInner<State> {
    /// The RedEX file this adapter appends to (see `ingest`).
    file: RedexFile,
    /// Materialized state; the same `Arc<RwLock<_>>` is handed out
    /// by `CortexAdapter::state`.
    state: Arc<RwLock<State>>,
    /// Highest RedEX seq the fold task has *processed* — applied
    /// successfully OR skipped via `recoverable_decode` under
    /// `Stop` policy. The skip-and-continue path advances this
    /// watermark so live consumers waiting via `wait_for_seq`
    /// don't deadlock on a single permanently-bad event (the
    /// DoS-resistance contract documented on
    /// `RedexError::is_recoverable_decode`).
    ///
    /// **Use `applied_through_seq` instead** if you need
    /// "state actually reflects this seq" semantics — that
    /// watermark only advances on `Ok(())` folds, and is what
    /// `snapshot` persists so restore tails from a position
    /// consistent with the in-memory state.
    ///
    /// `u64::MAX` is the "nothing folded yet" sentinel; safe
    /// because the `open_from_snapshot` overflow guard rejects
    /// `last_seq == u64::MAX`, so no real event can ever occupy
    /// that slot. When the adapter starts at `start_seq > 0` the
    /// initial value is `start_seq - 1` instead.
    folded_through_seq: AtomicU64,
    /// Highest RedEX seq K such that **every** seq in
    /// `start_seq..=K` was applied to state via a successful
    /// `RedexFold::apply`. A *strict-prefix* watermark — any
    /// skip (`recoverable_decode` under `Stop`, or any error
    /// under `LogAndContinue`) breaks the prefix at the skipped
    /// seq, and `applied_through_seq` cannot advance past it
    /// even when later seqs apply successfully. Distinct from
    /// `folded_through_seq`, which is the highest *processed*
    /// seq and advances over skips so live consumers don't
    /// deadlock.
    ///
    /// Snapshot persists this value, so restore tails from
    /// `applied_through_seq + 1` with the guarantee that **every
    /// prior seq is reflected in state**. Without strict-prefix
    /// semantics, a skip at seq M followed by successful applies
    /// at M+1, M+2 would advance `applied` to M+2 (highest
    /// individual apply); snapshot persists M+2; restore tails
    /// from M+3; seq M is never re-attempted on restore — its
    /// mutations are permanently lost from durable state, even
    /// though the log still carries the event. That's the
    /// "violates log is the source of truth" failure audited as
    /// #6+#7.
    ///
    /// Same `u64::MAX` "nothing applied yet" sentinel as
    /// `folded_through_seq`. `start_seq == 0` is the only
    /// configuration that initializes to sentinel; an
    /// `open_from_snapshot` with `last_seq = Some(K)` initializes
    /// to `K` (the snapshot's contract is that `start_seq..=K`
    /// is already in the rehydrated state).
    applied_through_seq: AtomicU64,
    /// First RedEX seq this adapter began folding from. Any seq
    /// strictly below this is conceptually behind us — `wait_for_seq`
    /// short-circuits without blocking on the watermark, even when
    /// `start_seq == 0` puts the watermark at the `u64::MAX`
    /// sentinel. Stored on inner so the wait predicate doesn't
    /// need to reach back into the open-time `start_seq` local.
    start_seq: u64,
    /// Cumulative fold-error count, surfaced by
    /// `CortexAdapter::fold_errors`.
    fold_errors: AtomicU64,
    /// Whether the fold task is running; read by `is_running` and
    /// by the wait loops to detect a stopped task.
    running: AtomicBool,
    /// Set by `close`; once set, `ingest` / `ingest_with_token`
    /// refuse new appends with `CortexAdapterError::Closed`.
    closed: AtomicBool,
    /// Wake-up signal the `wait_for_seq` / `wait_for_applied_seq`
    /// loops await between watermark checks.
    notify: Notify,
    /// Shutdown signal raised by `close` (via `notify_one`, so the
    /// permit is stored if nobody is waiting yet).
    shutdown: Notify,
    /// Broadcast of RedEX seqs after each successful (or LogAndContinue-skipped)
    /// fold apply. Subscribers: see [`CortexAdapter::changes`].
    changes_tx: broadcast::Sender<u64>,
    /// Per-adapter cap on concurrent `wait_for_token` callers. `None`
    /// when `CortexAdapterConfig::ryw_inflight_cap == 0` (unbounded).
    /// Otherwise sized to `ryw_inflight_cap` permits; each pending
    /// wait holds one permit for its duration. Exceeding the cap
    /// returns `WaitForTokenError::QueueFull` immediately.
    ryw_inflight: Option<Arc<Semaphore>>,
    /// Per-adapter RYW counters. See [`RywMetricsSnapshot`].
    ryw_metrics: RywMetricsAtomic,
}
168
/// Atomic counters backing [`CortexAdapter::ryw_metrics`]. One
/// instance lives on each adapter (each adapter is per-channel
/// already, so channel-labeling falls out of the adapter identity).
///
/// Field meanings mirror the same-named fields on
/// [`RywMetricsSnapshot`]; all updates in this file use
/// `Ordering::Relaxed`.
#[derive(Debug, Default)]
struct RywMetricsAtomic {
    /// `wait_for_token` calls that got past permit acquisition.
    waits_total: AtomicU64,
    /// Waits that ended in `WaitForTokenError::Timeout`.
    timeouts_total: AtomicU64,
    /// Waits rejected up-front with `WaitForTokenError::QueueFull`.
    queue_full_total: AtomicU64,
    /// Bumped only through `CortexAdapter::note_wrong_origin`.
    wrong_origin_total: AtomicU64,
    /// Running sum of nanoseconds spent waiting after the permits
    /// were taken.
    wait_duration_nanos_sum: AtomicU64,
}
180
/// Snapshot of the RYW counters on a [`CortexAdapter`].
///
/// All counters are monotonic; computing a rate is the caller's job
/// (divide deltas between two snapshots by the sampling interval).
/// `wait_duration_nanos_sum / waits_total` approximates the mean
/// wait time without a full histogram.
///
/// The fields are read one at a time with relaxed loads, so a
/// snapshot taken while waits are in flight is not an atomic cut
/// across all five counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RywMetricsSnapshot {
    /// Cumulative `wait_for_token` calls that took a permit (i.e.
    /// were not rejected up-front with `QueueFull` / `WrongOrigin`).
    pub waits_total: u64,
    /// Cumulative waits that returned `Timeout`.
    pub timeouts_total: u64,
    /// Cumulative waits rejected with `QueueFull` (either the
    /// process-wide cap or the per-adapter cap was saturated).
    pub queue_full_total: u64,
    /// Cumulative waits rejected with `WrongOrigin` at the bound-
    /// adapter layer. The generic `CortexAdapter::wait_for_token`
    /// never increments this; only `TasksAdapter::wait_for_token`
    /// and `MemoriesAdapter::wait_for_token` do, when their guard
    /// fires.
    pub wrong_origin_total: u64,
    /// Sum of nanoseconds spent inside `wait_for_token` past the
    /// permit acquisition. Divide by `waits_total` for the mean.
    pub wait_duration_nanos_sum: u64,
}
206
207impl<State> CortexAdapter<State> {
208    /// Read-only access to the materialized state. The returned `Arc`
209    /// is cheap to clone; all readers and the fold task share the
210    /// same `RwLock`.
211    pub fn state(&self) -> Arc<RwLock<State>> {
212        self.inner.state.clone()
213    }
214
215    /// Highest RedEX sequence the fold task has *processed* —
216    /// applied successfully OR skipped via `recoverable_decode`
217    /// under `Stop` policy.
218    ///
219    /// Use [`Self::applied_through_seq`] if you need "state
220    /// actually reflects this seq" semantics; the difference
221    /// matters under `Stop`+recoverable-decode where the
222    /// skip-and-continue path advances *this* watermark (so
223    /// `wait_for_seq` doesn't deadlock on a bad event) but
224    /// leaves `applied_through_seq` behind.
225    ///
226    /// `None` if no event has been folded yet since open.
227    pub fn folded_through_seq(&self) -> Option<u64> {
228        let v = self.inner.folded_through_seq.load(Ordering::Acquire);
229        if v == u64::MAX {
230            None
231        } else {
232            Some(v)
233        }
234    }
235
236    /// Highest RedEX sequence K such that *every* seq in
237    /// `start_seq..=K` was successfully applied to state via
238    /// `Ok(()) RedexFold::apply`. A *strict-prefix* watermark:
239    /// any skip (`recoverable_decode` under `Stop`, or any error
240    /// under `LogAndContinue`) at seq M permanently caps this
241    /// watermark at M-1, even if subsequent seqs apply
242    /// successfully.
243    ///
244    /// `snapshot` persists this value, so a restore tails from
245    /// `applied_through_seq + 1` and re-attempts every seq from
246    /// the first skip onwards — preserving "log is the source
247    /// of truth" even across the skip-and-continue path.
248    ///
249    /// Distinct from [`Self::folded_through_seq`], which is the
250    /// highest *processed* seq and advances over skips so live
251    /// consumers don't deadlock.
252    ///
253    /// `None` if no event has been applied yet since open
254    /// (start_seq=0 with no successful applies, or start_seq=0
255    /// with the very first event having been skipped).
256    pub fn applied_through_seq(&self) -> Option<u64> {
257        let v = self.inner.applied_through_seq.load(Ordering::Acquire);
258        if v == u64::MAX {
259            None
260        } else {
261            Some(v)
262        }
263    }
264
265    /// Cumulative count of fold errors (only ever increases under
266    /// [`FoldErrorPolicy::LogAndContinue`]; under `Stop` it is 0 or
267    /// 1, with the task exiting after the first error).
268    pub fn fold_errors(&self) -> u64 {
269        self.inner.fold_errors.load(Ordering::Acquire)
270    }
271
272    /// True if the fold task is currently running (has not observed
273    /// shutdown, an error under `Stop`, or a tail-end signal).
274    pub fn is_running(&self) -> bool {
275        self.inner.running.load(Ordering::Acquire)
276    }
277
278    /// Block until the fold task has *processed* every event up
279    /// through `seq` (applied successfully OR skipped via
280    /// `recoverable_decode` under `Stop` policy), or until the
281    /// fold task stops (e.g. close, non-recoverable fold error
282    /// under `Stop`).
283    ///
284    /// Returns `Ok(())` when the folded watermark reaches `seq`,
285    /// or `Err(folded)` where `folded` is the highest seq processed
286    /// before the fold task stopped (`None` if it stopped without
287    /// processing anything). Pre-fix this returned `()` for both
288    /// outcomes, so a caller waiting on a seq the fold task never
289    /// reached (close, Stop-policy halt, retention-evicted tail
290    /// lag) silently observed stale state. Mirrors
291    /// [`Self::wait_for_applied_seq`]'s shape.
292    ///
293    /// Use pattern:
294    /// ```ignore
295    /// let seq = adapter.ingest(envelope)?;
296    /// adapter.wait_for_seq(seq).await?;
297    /// let state = adapter.state().read();
298    /// // state reflects the ingest, UNLESS the event at `seq`
299    /// // was skipped via recoverable_decode under `Stop`.
300    /// ```
301    ///
302    /// **Subtle point.** This method waits on
303    /// [`Self::folded_through_seq`], which advances over
304    /// events the fold task processed but skipped via
305    /// `RedexError::is_recoverable_decode`. The skip-and-
306    /// continue path is the documented DoS-resistance contract
307    /// (a single corrupt event must not wedge the task forever).
308    /// If you need to confirm `state` actually reflects the
309    /// ingest at `seq`, follow up with
310    /// `adapter.applied_through_seq() >= Some(seq)`.
311    pub async fn wait_for_seq(&self, seq: u64) -> Result<(), Option<u64>> {
312        // Any seq strictly below `start_seq` is conceptually behind
313        // us — those events were applied before we opened the
314        // adapter (or are explicitly past the LiveOnly cutoff).
315        // Short-circuit returning immediately so a caller that
316        // passes a stale seq cannot hang. This also covers the
317        // `start_seq == 0 && seq == 0` blocked-forever-on-empty-
318        // file case for adapters opened with `FromBeginning` on a
319        // freshly-created log: `seq < start_seq` is false, but the
320        // sentinel check below correctly waits for the first event.
321        if seq < self.inner.start_seq {
322            return Ok(());
323        }
324        loop {
325            let notified = self.inner.notify.notified();
326            tokio::pin!(notified);
327            notified.as_mut().enable();
328
329            let watermark = self.inner.folded_through_seq.load(Ordering::Acquire);
330            // `u64::MAX` is the "nothing folded yet" sentinel — any
331            // other value is a real applied seq, so `watermark >= seq`
332            // after the sentinel check returns exactly when seq has
333            // been applied.
334            if watermark != u64::MAX && watermark >= seq {
335                return Ok(());
336            }
337            if !self.inner.running.load(Ordering::Acquire) {
338                return Err(self.folded_through_seq());
339            }
340            notified.await;
341        }
342    }
343
344    /// RYW-strength wait. Resolves when the **applied** watermark
345    /// (events that actually ran through the fold body, not
346    /// recoverable-skipped) catches up to `seq`, OR when the fold
347    /// task stops before reaching `seq`.
348    ///
349    /// Returns `Ok(())` on a real apply-through, `Err(applied)`
350    /// where `applied` is the last successfully-applied seq on
351    /// stop. Differs from [`Self::wait_for_seq`] which resolves
352    /// on the folded watermark (including skipped events). RYW
353    /// requires applied, not folded — otherwise a producer whose
354    /// write hit a recoverable-decode skip would observe
355    /// `Ok(())` and then read state that doesn't reflect the
356    /// write.
357    pub async fn wait_for_applied_seq(&self, seq: u64) -> Result<(), Option<u64>> {
358        if seq < self.inner.start_seq {
359            return Ok(());
360        }
361        loop {
362            let notified = self.inner.notify.notified();
363            tokio::pin!(notified);
364            notified.as_mut().enable();
365
366            let watermark = self.inner.applied_through_seq.load(Ordering::Acquire);
367            if watermark != u64::MAX && watermark >= seq {
368                return Ok(());
369            }
370            if !self.inner.running.load(Ordering::Acquire) {
371                // Stopped without ever reaching seq. Surface the
372                // last-applied watermark so the caller can build
373                // a typed error.
374                let applied = self.applied_through_seq();
375                return Err(applied);
376            }
377            notified.await;
378        }
379    }
380
381    /// Close the adapter. Stops the fold task (after it finishes any
382    /// in-progress apply), leaves the RedEX file open so other
383    /// adapters / callers can continue using it, and leaves the
384    /// state handle readable. Idempotent.
385    pub fn close(&self) -> Result<(), CortexAdapterError> {
386        if self.inner.closed.swap(true, Ordering::AcqRel) {
387            return Ok(());
388        }
389        // `notify_one()` stores a permit if the fold task hasn't yet
390        // reached its `shutdown.notified()` poll, so a close that
391        // races the spawn → first-select window is still observed.
392        // `notify_waiters()` would drop the signal in that window.
393        self.inner.shutdown.notify_one();
394        Ok(())
395    }
396
397    /// Stream of RedEX sequences, one per successful (or
398    /// `LogAndContinue`-skipped) fold application. Used by reactive
399    /// queries: on each emission, the caller re-reads
400    /// [`Self::state`] to compute its current view.
401    ///
402    /// Lag semantics: if a subscriber falls more than 64 events
403    /// behind (the internal broadcast channel capacity), the channel
404    /// drops intermediate events. This implementation filters lag
405    /// errors out silently — by the time the subscriber catches up,
406    /// `state()` reflects the latest applied events regardless of
407    /// how many signals were missed. Subscribers that need to
408    /// observe lag (e.g. for telemetry or reactive-backpressure)
409    /// should use [`Self::changes_with_lag`] instead.
410    ///
411    /// The stream ends when all adapter handles have been dropped
412    /// and the fold task has exited.
413    pub fn changes(&self) -> impl Stream<Item = u64> + Send + 'static {
414        BroadcastStream::new(self.inner.changes_tx.subscribe())
415            .filter_map(|r| async move { r.ok() })
416    }
417
418    /// Stream of changes that surfaces broadcast-channel lag as a
419    /// `Lagged(n)` event interleaved with the sequence emissions.
420    ///
421    /// The yielded items are [`ChangeEvent`]s — `Seq(u64)` for a
422    /// successful fold-apply notification, and `Lagged(n)` when the
423    /// subscriber fell `n` events behind the broadcast channel
424    /// (capacity 64). Pre-fix [`Self::changes`] silently dropped
425    /// `Lagged` errors via `filter_map(|r| r.ok())`; downstream
426    /// telemetry consumers had no way to surface "you missed N
427    /// changes." This method is the lossless counterpart — by the
428    /// time a subscriber sees `Lagged(n)`, `state()` already
429    /// reflects past those n events, so the subscriber can react
430    /// (re-read state, log lag, apply backpressure) without
431    /// missing data.
432    pub fn changes_with_lag(&self) -> impl Stream<Item = ChangeEvent> + Send + 'static {
433        use futures::StreamExt;
434        BroadcastStream::new(self.inner.changes_tx.subscribe()).map(|r| match r {
435            Ok(seq) => ChangeEvent::Seq(seq),
436            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
437                ChangeEvent::Lagged(n)
438            }
439        })
440    }
441
442    /// Append an envelope. Projects to `(EventMeta, tail)`, builds the
443    /// concatenated payload, calls [`RedexFile::append`], and returns
444    /// the assigned RedEX sequence.
445    pub fn ingest<E: IntoRedexPayload>(&self, envelope: E) -> Result<u64, CortexAdapterError> {
446        if self.inner.closed.load(Ordering::Acquire) {
447            return Err(CortexAdapterError::Closed);
448        }
449        let (meta, tail) = envelope.into_redex_payload();
450        let mut buf = Vec::with_capacity(EVENT_META_SIZE + tail.len());
451        buf.extend_from_slice(&meta.to_bytes());
452        buf.extend_from_slice(&tail);
453        Ok(self.inner.file.append(&buf)?)
454    }
455
456    /// Append an envelope and return a [`WriteToken`] addressing
457    /// the resulting write. The token is the typed handle the
458    /// read-your-writes API consumes via
459    /// [`Self::wait_for_token`]; equivalent to calling [`Self::ingest`]
460    /// and pairing the returned seq with the envelope's
461    /// `meta.origin_hash`, but does both in one shot so the binding
462    /// surface can round-trip a single value.
463    pub fn ingest_with_token<E: IntoRedexPayload>(
464        &self,
465        envelope: E,
466    ) -> Result<WriteToken, CortexAdapterError> {
467        if self.inner.closed.load(Ordering::Acquire) {
468            return Err(CortexAdapterError::Closed);
469        }
470        let (meta, tail) = envelope.into_redex_payload();
471        let origin_hash = meta.origin_hash;
472        let mut buf = Vec::with_capacity(EVENT_META_SIZE + tail.len());
473        buf.extend_from_slice(&meta.to_bytes());
474        buf.extend_from_slice(&tail);
475        let seq = self.inner.file.append(&buf)?;
476        Ok(WriteToken::new(origin_hash, seq))
477    }
478
479    /// Block until the fold task has processed every event up
480    /// through `token.seq`, or `deadline` elapses. Returns
481    /// `Err(WaitForTokenError::Timeout)` on deadline; `Ok(())`
482    /// once the watermark catches up (or the fold task stops —
483    /// see [`Self::wait_for_seq`] for the same caveat).
484    ///
485    /// The token's `origin_hash` is informational at this layer
486    /// — the generic [`CortexAdapter`] folds every event in its
487    /// RedEX file regardless of origin. Origin-bound adapters
488    /// (e.g. [`super::tasks::TasksAdapter`],
489    /// [`super::memories::MemoriesAdapter`]) layer their own
490    /// origin assertion on top.
491    pub async fn wait_for_token(
492        &self,
493        token: WriteToken,
494        deadline: Duration,
495    ) -> Result<(), WaitForTokenError> {
496        // Try-acquire FIRST so backpressure surfaces before the timer
497        // arms — under saturation a `QueueFull` is the correct
498        // diagnostic, not a `Timeout` masking it.
499        //
500        // Two-tier acquire: global cap (process-wide; bounds
501        // cumulative permit count across every adapter) then
502        // per-adapter cap. Both must succeed; if either is
503        // saturated we bump the metric and return QueueFull. The
504        // global permit is dropped at function return; tests that
505        // exercise saturation rely on this being held for the
506        // duration of the wait.
507        let _global_permit = match current_global_ryw_semaphore() {
508            Some(sem) => Some(sem.try_acquire_owned().map_err(|_| {
509                self.inner
510                    .ryw_metrics
511                    .queue_full_total
512                    .fetch_add(1, Ordering::Relaxed);
513                WaitForTokenError::QueueFull
514            })?),
515            None => None,
516        };
517        let _permit = match &self.inner.ryw_inflight {
518            Some(sem) => Some(sem.clone().try_acquire_owned().map_err(|_| {
519                self.inner
520                    .ryw_metrics
521                    .queue_full_total
522                    .fetch_add(1, Ordering::Relaxed);
523                WaitForTokenError::QueueFull
524            })?),
525            None => None,
526        };
527        self.inner
528            .ryw_metrics
529            .waits_total
530            .fetch_add(1, Ordering::Relaxed);
531        let started = tokio::time::Instant::now();
532        // RYW waits on the *applied* watermark, not the folded
533        // watermark — events skipped via recoverable_decode under
534        // FoldErrorPolicy::Stop advance folded but not applied,
535        // and a producer whose write hit such a skip must NOT
536        // observe Ok(()) (that would let them read state that
537        // doesn't reflect their write).
538        let outcome =
539            match tokio::time::timeout(deadline, self.wait_for_applied_seq(token.seq)).await {
540                Ok(Ok(())) => Ok(()),
541                Ok(Err(applied_through_seq)) => Err(WaitForTokenError::FoldStopped {
542                    applied_through_seq,
543                }),
544                Err(_) => {
545                    self.inner
546                        .ryw_metrics
547                        .timeouts_total
548                        .fetch_add(1, Ordering::Relaxed);
549                    Err(WaitForTokenError::Timeout)
550                }
551            };
552        let nanos = u64::try_from(started.elapsed().as_nanos()).unwrap_or(u64::MAX);
553        self.inner
554            .ryw_metrics
555            .wait_duration_nanos_sum
556            .fetch_add(nanos, Ordering::Relaxed);
557        outcome
558    }
559
560    /// Snapshot the RYW counters for this adapter. Cheap; reads
561    /// four atomics under `Relaxed`.
562    pub fn ryw_metrics(&self) -> RywMetricsSnapshot {
563        let m = &self.inner.ryw_metrics;
564        RywMetricsSnapshot {
565            waits_total: m.waits_total.load(Ordering::Relaxed),
566            timeouts_total: m.timeouts_total.load(Ordering::Relaxed),
567            queue_full_total: m.queue_full_total.load(Ordering::Relaxed),
568            wrong_origin_total: m.wrong_origin_total.load(Ordering::Relaxed),
569            wait_duration_nanos_sum: m.wait_duration_nanos_sum.load(Ordering::Relaxed),
570        }
571    }
572
573    /// Bump the `wrong_origin_total` RYW counter. Called by the
574    /// origin-bound adapter wrappers when their guard fires; not
575    /// part of the generic adapter's own happy path.
576    pub(super) fn note_wrong_origin(&self) {
577        self.inner
578            .ryw_metrics
579            .wrong_origin_total
580            .fetch_add(1, Ordering::Relaxed);
581    }
582}
583
/// Failure modes of [`CortexAdapter::wait_for_token`] and of the
/// origin-bound adapters that wrap it.
#[derive(Debug, PartialEq, Eq)]
pub enum WaitForTokenError {
    /// The deadline elapsed before the watermark advanced to the
    /// token's seq. The write may still land later; the caller can
    /// retry with a fresh deadline or accept the stale read.
    Timeout,
    /// The token belongs to a different origin than the one this
    /// adapter folds. Origin-bound adapters surface this to catch
    /// caller-side aliasing, where a token from chain A is waited
    /// on against an adapter bound to chain B — a wait that would
    /// otherwise hang on a seq that can never land here.
    WrongOrigin {
        /// Origin the token was issued for.
        token_origin: u64,
        /// Origin this adapter is bound to.
        adapter_origin: u64,
    },
    /// An in-flight cap is saturated — back-pressure for callers
    /// who can shed load instead of stacking unbounded pending
    /// waits. See
    /// [`super::config::CortexAdapterConfig::with_ryw_inflight_cap`].
    QueueFull,
    /// The fold task stopped before the token's seq was reached.
    /// Under `FoldErrorPolicy::Stop` an unrecoverable fold error
    /// halts the task, and any pending RYW wait that observes
    /// `running == false` reports this variant rather than a silent
    /// `Ok(())` — otherwise a producer could not tell "your write
    /// is visible" apart from "the adapter is dead and will never
    /// reach your write."
    FoldStopped {
        /// Last seq the fold task fully applied before stopping.
        /// `None` if the task stopped before applying any event.
        applied_through_seq: Option<u64>,
    },
}

impl std::fmt::Display for WaitForTokenError {
    /// Render a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Timeout => f.write_str("read-your-writes wait timed out"),
            Self::QueueFull => f.write_str("read-your-writes wait-queue saturated; retry later"),
            Self::WrongOrigin {
                token_origin: token,
                adapter_origin: adapter,
            } => write!(
                f,
                "token origin {:016x} != adapter origin {:016x}",
                token, adapter
            ),
            Self::FoldStopped {
                applied_through_seq: None,
            } => f.write_str("fold task stopped before applying any event"),
            Self::FoldStopped {
                applied_through_seq: Some(last_applied),
            } => write!(
                f,
                "fold task stopped before reaching token seq (applied through {})",
                last_applied
            ),
        }
    }
}

impl std::error::Error for WaitForTokenError {}
651
652impl<State: Send + Sync + 'static> CortexAdapter<State> {
653    /// Open an adapter against a RedEX file.
654    ///
655    /// Opens (or reuses) `<redex>/<name>` via
656    /// [`Redex::open_file`](super::super::redex::Redex::open_file),
657    /// spawns a background task that tails the file and drives
658    /// `fold`, and returns the handle.
659    pub fn open<F>(
660        redex: &Redex,
661        name: &ChannelName,
662        redex_config: RedexFileConfig,
663        adapter_config: CortexAdapterConfig,
664        fold: F,
665        initial_state: State,
666    ) -> Result<Self, CortexAdapterError>
667    where
668        F: RedexFold<State> + Send + 'static,
669    {
670        // Positions that skip a non-empty event prefix require
671        // externally-rehydrated state — the watermark would
672        // otherwise advance past events the adapter never saw,
673        // making `wait_for_seq(k)` return immediately for skipped
674        // k while `state` has never observed those events.
675        // Callers using these positions must use
676        // `open_from_snapshot` (which carries the matching
677        // `last_seq` + serialized state) and routes through
678        // `open_unchecked` below.
679        match adapter_config.start {
680            StartPosition::FromBeginning => {}
681            StartPosition::LiveOnly => {
682                return Err(CortexAdapterError::InvalidStartPosition("LiveOnly"));
683            }
684            StartPosition::FromSeq(n) if n > 0 => {
685                return Err(CortexAdapterError::InvalidStartPosition("FromSeq(n>0)"));
686            }
687            StartPosition::FromSeq(_) => {} // FromSeq(0) is equivalent to FromBeginning
688        }
689        Self::open_unchecked(
690            redex,
691            name,
692            redex_config,
693            adapter_config,
694            fold,
695            initial_state,
696        )
697    }
698
699    /// Internal open path that bypasses the start-position
700    /// guard. Used by `open_from_snapshot`, where the externally-
701    /// rehydrated state is the legitimate reason to skip the
702    /// event prefix.
703    fn open_unchecked<F>(
704        redex: &Redex,
705        name: &ChannelName,
706        redex_config: RedexFileConfig,
707        adapter_config: CortexAdapterConfig,
708        mut fold: F,
709        initial_state: State,
710    ) -> Result<Self, CortexAdapterError>
711    where
712        F: RedexFold<State> + Send + 'static,
713    {
714        let file = redex.open_file(name, redex_config)?;
715
716        let start_seq = match adapter_config.start {
717            StartPosition::FromBeginning => 0,
718            StartPosition::LiveOnly => file.next_seq(),
719            StartPosition::FromSeq(n) => n,
720        };
721
722        let state = Arc::new(RwLock::new(initial_state));
723        // Initial watermark encodes "applied through start_seq-1", so
724        // `wait_for_seq(start_seq-1)` returns immediately after open
725        // (those seqs are conceptually behind us) while
726        // `wait_for_seq(start_seq)` blocks until the first event
727        // actually folds. `start_seq == 0` encodes the "nothing
728        // folded yet" state with the `u64::MAX` sentinel.
729        let initial_watermark: u64 = if start_seq == 0 {
730            u64::MAX
731        } else {
732            start_seq - 1
733        };
734        let (changes_tx, _) = broadcast::channel(CHANGES_BROADCAST_CAP);
735        let ryw_inflight = if adapter_config.ryw_inflight_cap == 0 {
736            None
737        } else {
738            Some(Arc::new(Semaphore::new(adapter_config.ryw_inflight_cap)))
739        };
740        let inner = Arc::new(AdapterInner {
741            file: file.clone(),
742            state: state.clone(),
743            folded_through_seq: AtomicU64::new(initial_watermark),
744            // Mirror folded's initial watermark: the snapshot/restore
745            // contract treats the sentinel as "nothing applied yet"
746            // and the (start_seq - 1) seed as "applied through the
747            // pre-snapshot prefix" — same semantics, applied to the
748            // strict watermark.
749            applied_through_seq: AtomicU64::new(initial_watermark),
750            start_seq,
751            fold_errors: AtomicU64::new(0),
752            running: AtomicBool::new(true),
753            closed: AtomicBool::new(false),
754            notify: Notify::new(),
755            shutdown: Notify::new(),
756            changes_tx,
757            ryw_inflight,
758            ryw_metrics: RywMetricsAtomic::default(),
759        });
760
761        let policy = adapter_config.on_fold_error;
762        let inner_task = inner.clone();
763
764        tokio::spawn(async move {
765            // Registration and consumption share a task. Pre-fix
766            // `file.tail(start_seq)` ran on the caller's task and the
767            // `tokio::spawn` then queued; concurrent appends in that
768            // window could saturate the bounded tail channel before
769            // this task polled even once, evicting the watcher with
770            // `Lagged` and killing the fold loop on its first item.
771            // Doing both inside the spawn pins them adjacent in
772            // scheduler order.
773            let mut stream = Box::pin(inner_task.file.tail(start_seq));
774            'outer: loop {
775                tokio::select! {
776                    biased;
777                    _ = inner_task.shutdown.notified() => {
778                        break 'outer;
779                    }
780                    next = stream.next() => {
781                        match next {
782                            None => break 'outer,
783                            Some(Err(RedexError::Lagged)) => {
784                                // Subscriber fell behind the bounded
785                                // tail buffer. Pre-fix this broke the
786                                // fold loop permanently and a watcher
787                                // sitting on `wait_for_seq` would
788                                // silently observe stale state past
789                                // the lag point.
790                                //
791                                // Recover by catching up the gap via
792                                // direct reads from the in-memory
793                                // index, then resubscribing live.
794                                // A naive `file.tail(folded+1)`
795                                // resubscribe would deadlock when the
796                                // gap exceeds `tail_buffer_size`:
797                                // backfill pre-flight would signal
798                                // `Lagged` again and we'd loop on the
799                                // same signal indefinitely.
800                                let resume_head = 'catchup: loop {
801                                    let folded = inner_task
802                                        .folded_through_seq
803                                        .load(Ordering::Acquire);
804                                    let resume = if folded == u64::MAX {
805                                        start_seq
806                                    } else {
807                                        folded.saturating_add(1)
808                                    };
809                                    let head = inner_task.file.next_seq();
810                                    if resume >= head {
811                                        break 'catchup Some(head);
812                                    }
813                                    let events = inner_task
814                                        .file
815                                        .read_range(resume, head);
816                                    if events.is_empty() {
817                                        // Retention has evicted the
818                                        // gap. The fold cannot recover
819                                        // what is no longer durable;
820                                        // halt rather than silently
821                                        // skip ahead.
822                                        tracing::error!(
823                                            gap_start = resume,
824                                            gap_end = head,
825                                            "cortex fold task cannot recover \
826                                             from tail lag: events evicted by \
827                                             retention"
828                                        );
829                                        break 'catchup None;
830                                    }
831                                    let mut halted = false;
832                                    for event in events {
833                                        if handle_event(
834                                            &inner_task,
835                                            &mut fold,
836                                            &event,
837                                            policy,
838                                        ) {
839                                            halted = true;
840                                            break;
841                                        }
842                                    }
843                                    if halted {
844                                        break 'catchup None;
845                                    }
846                                };
847                                match resume_head {
848                                    Some(head) => {
849                                        stream = Box::pin(
850                                            inner_task.file.tail(head),
851                                        );
852                                        tracing::warn!(
853                                            "cortex fold task recovered \
854                                             from tail lag"
855                                        );
856                                    }
857                                    None => break 'outer,
858                                }
859                            }
860                            Some(Err(_)) => {
861                                // Closed / Io / other terminal error.
862                                break 'outer;
863                            }
864                            Some(Ok(event)) => {
865                                if handle_event(
866                                    &inner_task,
867                                    &mut fold,
868                                    &event,
869                                    policy,
870                                ) {
871                                    break 'outer;
872                                }
873                            }
874                        }
875                    }
876                }
877            }
878            inner_task.running.store(false, Ordering::Release);
879            inner_task.notify.notify_waiters();
880        });
881
882        Ok(Self { inner })
883    }
884}
885
886impl<State> CortexAdapter<State>
887where
888    State: Serialize + Send + Sync + 'static,
889{
890    /// Capture a point-in-time snapshot of the materialized state.
891    ///
892    /// Returns `(state_bytes, last_seq)` where `state_bytes` is the
893    /// postcard-serialized state and `last_seq` is the highest RedEX
894    /// sequence successfully *applied* to it as a strict prefix
895    /// (i.e. [`Self::applied_through_seq`]). Persist both together —
896    /// they form a consistent pair, guaranteed by the adapter
897    /// holding the state write lock while advancing the watermark.
898    ///
899    /// Restore via [`Self::open_from_snapshot`] on a State that
900    /// also implements `DeserializeOwned`. Restore tails from
901    /// `last_seq + 1`, so any events that were processed but
902    /// *skipped* via `recoverable_decode` between snapshots are
903    /// re-attempted on restore — preserving "log is the source of
904    /// truth" even across the skip-and-continue path. (Pre-fix,
905    /// `snapshot` read `folded_through_seq`, which advances over
906    /// skipped events; restore tailed from past them and the gap
907    /// became permanent in durable state.)
908    ///
909    /// **Re-apply double-counting.** `state_bytes` reflects
910    /// in-memory state at snapshot time, which includes the
911    /// effects of every successful fold *including* applies past
912    /// any prior skip. Restore tails from the strict-prefix
913    /// `last_seq + 1`, so seqs past the skip are re-fed to the
914    /// fold function — fold functions that are not idempotent
915    /// against re-application will produce divergent state on
916    /// restore. The mitigations: (1) make the fold idempotent
917    /// (the standard event-sourcing recommendation); (2)
918    /// snapshot only when `applied_through_seq() ==
919    /// folded_through_seq()` (no gap → no re-apply); or (3)
920    /// accept best-effort restore semantics for adapters that
921    /// have ever observed a recoverable_decode skip. The
922    /// trade-off vs. the pre-fix behavior is asymmetric:
923    /// pre-fix, the skipped seq was permanently lost; post-fix,
924    /// the skipped seq is re-attempted, at the cost of
925    /// double-applying intervening successful seqs for
926    /// non-idempotent folds.
927    ///
928    /// `last_seq` is `None` if no event has been applied yet
929    /// since open (the snapshot is still meaningful — it
930    /// represents the initial State — but callers typically wait
931    /// until [`Self::wait_for_seq`] has returned and
932    /// [`Self::applied_through_seq`] has advanced before
933    /// snapshotting).
934    pub fn snapshot(&self) -> Result<(Vec<u8>, Option<u64>), CortexAdapterError> {
935        let state = self.inner.state.read();
936        let bytes = postcard::to_allocvec(&*state).map_err(|e| {
937            CortexAdapterError::Redex(RedexError::Encode(format!("snapshot serialize: {}", e)))
938        })?;
939        let watermark = self.inner.applied_through_seq.load(Ordering::Acquire);
940        let last_seq = if watermark == u64::MAX {
941            None
942        } else {
943            Some(watermark)
944        };
945        Ok((bytes, last_seq))
946    }
947}
948
949impl<State> CortexAdapter<State>
950where
951    State: DeserializeOwned + Send + Sync + 'static,
952{
953    /// Open an adapter from a previously-captured snapshot, skipping
954    /// the `[0, last_seq]` replay.
955    ///
956    /// `state_bytes` is the blob returned from [`Self::snapshot`].
957    /// `last_seq` is its companion sequence. The tail starts at
958    /// `last_seq + 1`; the initial state is deserialized from the
959    /// blob; the fold task is spawned as usual.
960    ///
961    /// If `last_seq` is `None` (no events had been folded at
962    /// snapshot time), the tail starts at seq 0 — equivalent to
963    /// `StartPosition::FromBeginning` with the deserialized initial
964    /// state.
965    pub fn open_from_snapshot<F>(
966        redex: &Redex,
967        name: &ChannelName,
968        redex_config: RedexFileConfig,
969        adapter_config: CortexAdapterConfig,
970        fold: F,
971        state_bytes: &[u8],
972        last_seq: Option<u64>,
973    ) -> Result<Self, CortexAdapterError>
974    where
975        F: RedexFold<State> + Send + 'static,
976    {
977        let initial_state: State = postcard::from_bytes(state_bytes).map_err(|e| {
978            CortexAdapterError::Redex(RedexError::Encode(format!("deserialize snapshot: {}", e)))
979        })?;
980        let start = match last_seq {
981            Some(n) => {
982                let next = n.checked_add(1).ok_or_else(|| {
983                    CortexAdapterError::Redex(RedexError::Encode(
984                        "snapshot last_seq at u64::MAX; cannot resume".to_string(),
985                    ))
986                })?;
987                StartPosition::FromSeq(next)
988            }
989            None => StartPosition::FromBeginning,
990        };
991        let config = CortexAdapterConfig {
992            start,
993            on_fold_error: adapter_config.on_fold_error,
994            ryw_inflight_cap: adapter_config.ryw_inflight_cap,
995        };
996        // Route through `open_unchecked` so the externally-
997        // rehydrated state can skip its event prefix.
998        Self::open_unchecked(redex, name, redex_config, config, fold, initial_state)
999    }
1000}
1001
1002impl<State> Clone for CortexAdapter<State> {
1003    fn clone(&self) -> Self {
1004        Self {
1005            inner: self.inner.clone(),
1006        }
1007    }
1008}
1009
1010impl<State> std::fmt::Debug for CortexAdapter<State> {
1011    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1012        f.debug_struct("CortexAdapter")
1013            .field("folded_through_seq", &self.folded_through_seq())
1014            .field("applied_through_seq", &self.applied_through_seq())
1015            .field("fold_errors", &self.fold_errors())
1016            .field("running", &self.is_running())
1017            .field("closed", &self.inner.closed.load(Ordering::Acquire))
1018            .finish()
1019    }
1020}
1021
1022/// Apply one event. Returns `true` if the task should exit
1023/// (Stop policy + error).
1024fn handle_event<State, F>(
1025    inner: &Arc<AdapterInner<State>>,
1026    fold: &mut F,
1027    event: &RedexEvent,
1028    policy: FoldErrorPolicy,
1029) -> bool
1030where
1031    F: RedexFold<State>,
1032{
1033    let seq = event.entry.seq;
1034    // Guard the `u64::MAX` sentinel. `applied_through_seq` /
1035    // `folded_through_seq` use `u64::MAX` to encode "nothing
1036    // folded yet"; a real applied seq of `u64::MAX` (only
1037    // reachable after the file's `next_seq` wraps a full 2^64
1038    // append range, but `wrapping_add(1)` in the strict-prefix
1039    // advance below can hit `u64::MAX` from `prev == u64::MAX -
1040    // 1`) would silently fold into the sentinel and `snapshot`
1041    // would persist `None`, making restore re-replay the entire
1042    // chain. `watermark.rs::Watermark` guards the analogous
1043    // `seq_or_ts` case the same way; the redex-seq path needs
1044    // the same gate. Halt rather than overflow the sentinel.
1045    if seq == u64::MAX {
1046        tracing::error!(
1047            "cortex fold halting: redex seq reached u64::MAX (sentinel \
1048             collision); this means the file has assigned every possible \
1049             seq value, an effectively impossible-without-bug condition"
1050        );
1051        return true;
1052    }
1053    // Hold the write lock across both the fold and the watermark
1054    // update so that a `snapshot()` holding `state.read()` observes
1055    // a consistent `(state, folded_through_seq)` pair — otherwise
1056    // the state could reflect seq N while the watermark still reads
1057    // N-1, causing restore to double-apply event N.
1058    let result = {
1059        let mut state = inner.state.write();
1060        let r = fold.apply(event, &mut state);
1061        // Under `Stop` policy, a per-event recoverable decode
1062        // failure (postcard error, EventMeta shape mismatch —
1063        // anything `RedexError::is_recoverable_decode` flags) is
1064        // treated as skip-and-continue rather than halting. Halting
1065        // on every such failure would let a single bad event (disk
1066        // corruption past the 32-bit checksum, or a
1067        // deliberately-crafted matching-collision tail) wedge the
1068        // fold task permanently — a DoS vector against multi-tenant
1069        // cortex instances via one bad event. Stream-level errors
1070        // (`Io`, `Closed`, `Lagged`) and authorization /
1071        // configuration errors still halt under `Stop` as
1072        // documented.
1073        //
1074        // Two watermarks; both written under the state write lock
1075        // so a concurrent `snapshot` reading either one observes a
1076        // value consistent with the state it sees:
1077        //   - `applied_through_seq` is a STRICT-PREFIX watermark:
1078        //     it advances by exactly one only when this seq is
1079        //     the immediate successor of the current value (or
1080        //     equals `start_seq` from the sentinel state). Any
1081        //     skip — `recoverable_decode` under `Stop`, OR any
1082        //     error under `LogAndContinue` — breaks the prefix
1083        //     at the skipped seq; further successful applies
1084        //     cannot heal it. `snapshot` persists this watermark
1085        //     so restore tails from a position guaranteed to
1086        //     have every prior seq reflected in state.
1087        //   - `folded_through_seq` is the HIGHEST PROCESSED
1088        //     watermark: advances on `Ok(())` AND on every skip
1089        //     path. Live consumers waiting via `wait_for_seq` use
1090        //     this watermark so a single bad event doesn't
1091        //     deadlock them — the DoS-resistance contract.
1092        // Why two watermarks, not one strict-prefix that lives
1093        // consumers also wait on: collapsing them re-introduces
1094        // the deadlock — a consumer waiting for the skipped seq
1095        // would block forever even though the fold task has
1096        // moved on.
1097        // Order: `applied` is stored first so any reader that
1098        // observes the `folded` Release also synchronizes-with
1099        // the `applied` Release that preceded it — `applied <=
1100        // folded` is therefore an invariant at every observable
1101        // point.
1102        let applied = matches!(&r, Ok(()));
1103        let recoverable_decode = matches!(&r, Err(e) if e.is_recoverable_decode());
1104        let advance = applied
1105            || matches!((&r, policy), (Err(_), FoldErrorPolicy::LogAndContinue))
1106            || recoverable_decode;
1107        if applied {
1108            // Strict-prefix advance: bump by exactly one if this
1109            // seq is the immediate successor of the current
1110            // applied watermark (or this is the very first event
1111            // and matches `start_seq`). Otherwise the prefix is
1112            // broken by a prior skip; leave the watermark alone
1113            // and let restore re-attempt from the gap.
1114            let prev = inner.applied_through_seq.load(Ordering::Acquire);
1115            let advance_applied = if prev == u64::MAX {
1116                seq == inner.start_seq
1117            } else {
1118                seq == prev.wrapping_add(1)
1119            };
1120            if advance_applied {
1121                inner.applied_through_seq.store(seq, Ordering::Release);
1122            }
1123        }
1124        if advance {
1125            inner.folded_through_seq.store(seq, Ordering::Release);
1126        }
1127        r
1128    };
1129
1130    match result {
1131        Ok(()) => {
1132            inner.notify.notify_waiters();
1133            let _ = inner.changes_tx.send(seq);
1134            false
1135        }
1136        Err(err) => {
1137            inner.fold_errors.fetch_add(1, Ordering::AcqRel);
1138            tracing::warn!(seq = seq, error = %err, "cortex fold error");
1139            // Per-event decode errors always skip-and-continue;
1140            // only stream-level / configuration errors halt under
1141            // `Stop`.
1142            let recoverable_decode = err.is_recoverable_decode();
1143            match policy {
1144                FoldErrorPolicy::Stop if !recoverable_decode => {
1145                    // Wake subscribers via `notify_waiters` so
1146                    // anything parked on `inner.notify` unblocks
1147                    // and can observe the halt via
1148                    // `is_running()`. Do NOT broadcast `seq` on
1149                    // `changes_tx`: this branch did not advance
1150                    // `folded_through_seq` (the `advance` gate
1151                    // above is false for `Stop + non-recoverable`),
1152                    // and `changes_tx` is documented as carrying
1153                    // *successful fold-apply* notifications. A
1154                    // `ChangeEvent::Seq(seq)` for an unapplied
1155                    // sequence would mislead consumers into
1156                    // thinking the watermark advanced past the
1157                    // failure — the very mis-routing the broadcast
1158                    // contract was designed to avoid.
1159                    //
1160                    // The trade-off: subscribers using
1161                    // `changes_with_lag()` won't see a terminal
1162                    // event in the stream on halt; they need to
1163                    // poll `is_running()` separately (or rely on
1164                    // the broadcast channel ending when all adapter
1165                    // handles are dropped). That's the documented
1166                    // failure mode for non-recoverable halts —
1167                    // surfacing a phantom seq was not.
1168                    inner.notify.notify_waiters();
1169                    true
1170                }
1171                FoldErrorPolicy::Stop | FoldErrorPolicy::LogAndContinue => {
1172                    // Watermark was already advanced inside the lock
1173                    // above; just notify waiters.
1174                    inner.notify.notify_waiters();
1175                    let _ = inner.changes_tx.send(seq);
1176                    false
1177                }
1178            }
1179        }
1180    }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185    use super::super::super::channel::ChannelName;
1186    use super::super::super::redex::{RedexError, RedexFold};
1187    use super::super::envelope::EventEnvelope;
1188    use super::super::meta::EventMeta;
1189    use super::*;
1190    use bytes::Bytes;
1191
1192    fn cn(s: &str) -> ChannelName {
1193        ChannelName::new(s).unwrap()
1194    }
1195
1196    struct CountFold;
1197    impl RedexFold<u64> for CountFold {
1198        fn apply(&mut self, _ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
1199            *state += 1;
1200            Ok(())
1201        }
1202    }
1203
1204    /// Pin: `wait_for_seq(seq)` short-circuits without blocking
1205    /// when `seq < start_seq` — those events were applied before
1206    /// the adapter opened (e.g. they're folded into the snapshot
1207    /// the caller passed to `open_from_snapshot`). Pre-fix the
1208    /// function used only the `watermark >= seq` check; until at
1209    /// least one event landed under the new adapter, the
1210    /// `u64::MAX` "nothing folded yet" sentinel kept the
1211    /// comparison false and a caller waiting for a stale seq
1212    /// would block forever.
1213    #[tokio::test]
1214    async fn wait_for_seq_short_circuits_below_start_seq() {
1215        let redex = Redex::new();
1216        // Pre-populate the file with 5 events via a temporary
1217        // FromBeginning adapter, then snapshot.
1218        let bytes;
1219        let last_seq;
1220        {
1221            let pre = CortexAdapter::<u64>::open(
1222                &redex,
1223                &cn("cortex/short-circuit"),
1224                RedexFileConfig::default(),
1225                CortexAdapterConfig::default(),
1226                CountFold,
1227                0u64,
1228            )
1229            .unwrap();
1230            for i in 0..5u64 {
1231                let meta = EventMeta::new(1, 0, 1, i, 0);
1232                let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1233                let seq = pre.ingest(env).unwrap();
1234                pre.wait_for_seq(seq).await.unwrap();
1235            }
1236            let (b, ls) = pre.snapshot().unwrap();
1237            bytes = b;
1238            last_seq = ls;
1239            pre.close().unwrap();
1240        }
1241
1242        // Restore from snapshot: `start_seq` is `last_seq + 1 =
1243        // 5` (the snapshot already absorbed seqs 0..=4). Any
1244        // wait_for_seq below 5 is conceptually behind us.
1245        let adapter = CortexAdapter::<u64>::open_from_snapshot(
1246            &redex,
1247            &cn("cortex/short-circuit"),
1248            RedexFileConfig::default(),
1249            CortexAdapterConfig::default(),
1250            CountFold,
1251            &bytes,
1252            last_seq,
1253        )
1254        .unwrap();
1255
1256        // wait_for_seq(0..5) must all return immediately.
1257        // Wrap each in a tight timeout — pre-fix behavior was
1258        // an indefinite block.
1259        for seq in 0..5u64 {
1260            tokio::time::timeout(std::time::Duration::from_secs(2), adapter.wait_for_seq(seq))
1261                .await
1262                .unwrap_or_else(|_| {
1263                    panic!(
1264                        "wait_for_seq({}) blocked past start_seq=5 — \
1265                     short-circuit regressed",
1266                        seq
1267                    )
1268                })
1269                .expect("wait_for_seq must Ok-return when seq < start_seq");
1270        }
1271    }
1272
1273    #[tokio::test]
1274    async fn test_open_ingest_wait_query() {
1275        let redex = Redex::new();
1276        let adapter = CortexAdapter::<u64>::open(
1277            &redex,
1278            &cn("cortex/counts"),
1279            RedexFileConfig::default(),
1280            CortexAdapterConfig::default(),
1281            CountFold,
1282            0u64,
1283        )
1284        .unwrap();
1285
1286        for i in 0..10u64 {
1287            let meta = EventMeta::new(1, 0, 1, i, 0);
1288            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1289            let seq = adapter.ingest(env).unwrap();
1290            adapter.wait_for_seq(seq).await.unwrap();
1291        }
1292
1293        assert_eq!(*adapter.state().read(), 10);
1294        assert_eq!(adapter.fold_errors(), 0);
1295        assert!(adapter.is_running());
1296    }
1297
1298    #[tokio::test]
1299    async fn ingest_with_token_carries_envelope_origin_and_assigned_seq() {
1300        let redex = Redex::new();
1301        let adapter = CortexAdapter::<u64>::open(
1302            &redex,
1303            &cn("cortex/ryw-token"),
1304            RedexFileConfig::default(),
1305            CortexAdapterConfig::default(),
1306            CountFold,
1307            0u64,
1308        )
1309        .unwrap();
1310
1311        let origin: u64 = 0xCAFE_F00D_DEAD_BEEF;
1312        let meta = EventMeta::new(1, 0, origin, 0, 0);
1313        let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1314        let token = adapter.ingest_with_token(env).unwrap();
1315
1316        assert_eq!(token.origin_hash, origin);
1317        assert_eq!(token.seq, 0);
1318
1319        adapter
1320            .wait_for_token(token, Duration::from_secs(2))
1321            .await
1322            .unwrap();
1323        assert_eq!(*adapter.state().read(), 1);
1324    }
1325
1326    #[tokio::test]
1327    async fn wait_for_token_returns_queue_full_above_cap() {
1328        let redex = Redex::new();
1329        let cfg = CortexAdapterConfig::default().with_ryw_inflight_cap(2);
1330        let adapter = Arc::new(
1331            CortexAdapter::<u64>::open(
1332                &redex,
1333                &cn("cortex/ryw-queue"),
1334                RedexFileConfig::default(),
1335                cfg,
1336                CountFold,
1337                0u64,
1338            )
1339            .unwrap(),
1340        );
1341
1342        // Pin two waiters on a seq that never lands — they hold the
1343        // permits until their deadline elapses.
1344        let token = WriteToken::new(0xABCD_EF01, 999);
1345        let a = adapter.clone();
1346        let h1 = tokio::spawn(async move { a.wait_for_token(token, Duration::from_secs(5)).await });
1347        let a = adapter.clone();
1348        let h2 = tokio::spawn(async move { a.wait_for_token(token, Duration::from_secs(5)).await });
1349
1350        // Give both tasks a moment to claim their permits.
1351        tokio::time::sleep(Duration::from_millis(50)).await;
1352
1353        // A third waiter on the same adapter should be rejected
1354        // immediately with QueueFull — no wait, no timeout.
1355        let started = tokio::time::Instant::now();
1356        let err = adapter
1357            .wait_for_token(token, Duration::from_secs(5))
1358            .await
1359            .unwrap_err();
1360        assert_eq!(err, WaitForTokenError::QueueFull);
1361        assert!(
1362            started.elapsed() < Duration::from_millis(100),
1363            "QueueFull must return immediately, not wait for deadline"
1364        );
1365
1366        // Drop the holders so their tasks finish on their own
1367        // schedule.
1368        let _ = (h1, h2);
1369    }
1370
1371    #[tokio::test]
1372    async fn ryw_metrics_track_waits_timeouts_and_queue_full() {
1373        let redex = Redex::new();
1374        let cfg = CortexAdapterConfig::default().with_ryw_inflight_cap(1);
1375        let adapter = Arc::new(
1376            CortexAdapter::<u64>::open(
1377                &redex,
1378                &cn("cortex/ryw-metrics"),
1379                RedexFileConfig::default(),
1380                cfg,
1381                CountFold,
1382                0u64,
1383            )
1384            .unwrap(),
1385        );
1386
1387        // No waits yet → all zeros.
1388        let s0 = adapter.ryw_metrics();
1389        assert_eq!(s0, RywMetricsSnapshot::default());
1390
1391        // Pin a long waiter to hold the only permit, then attempt a
1392        // second wait — it must hit QueueFull and bump that counter.
1393        let token = WriteToken::new(0xABCD_EF01, 999);
1394        let a = adapter.clone();
1395        let holder = tokio::spawn(async move {
1396            let _ = a.wait_for_token(token, Duration::from_secs(2)).await;
1397        });
1398        tokio::time::sleep(Duration::from_millis(50)).await;
1399        let err = adapter
1400            .wait_for_token(token, Duration::from_secs(1))
1401            .await
1402            .unwrap_err();
1403        assert_eq!(err, WaitForTokenError::QueueFull);
1404
1405        // Wait for the holder to time out, then check the counters.
1406        holder.await.unwrap();
1407        let s1 = adapter.ryw_metrics();
1408        assert_eq!(s1.waits_total, 1, "holder takes one permit + one wait slot");
1409        assert_eq!(s1.timeouts_total, 1, "holder's deadline elapsed");
1410        assert_eq!(s1.queue_full_total, 1, "saturating attempt was rejected");
1411        assert_eq!(s1.wrong_origin_total, 0);
1412        assert!(
1413            s1.wait_duration_nanos_sum >= 1_000_000_000,
1414            "holder waited at least its 2s deadline, observed {}ns",
1415            s1.wait_duration_nanos_sum
1416        );
1417    }
1418
1419    #[tokio::test]
1420    async fn wait_for_token_with_zero_cap_skips_queue_check() {
1421        let redex = Redex::new();
1422        let cfg = CortexAdapterConfig::default().with_ryw_inflight_cap(0);
1423        let adapter = CortexAdapter::<u64>::open(
1424            &redex,
1425            &cn("cortex/ryw-uncapped"),
1426            RedexFileConfig::default(),
1427            cfg,
1428            CountFold,
1429            0u64,
1430        )
1431        .unwrap();
1432
1433        // With cap=0 the semaphore is None; the path goes straight
1434        // to the deadline.
1435        let token = WriteToken::new(0xABCD_EF01, 999);
1436        let err = adapter
1437            .wait_for_token(token, Duration::from_millis(20))
1438            .await
1439            .unwrap_err();
1440        assert_eq!(err, WaitForTokenError::Timeout);
1441    }
1442
1443    #[tokio::test]
1444    async fn wait_for_token_times_out_when_seq_never_lands() {
1445        let redex = Redex::new();
1446        let adapter = CortexAdapter::<u64>::open(
1447            &redex,
1448            &cn("cortex/ryw-timeout"),
1449            RedexFileConfig::default(),
1450            CortexAdapterConfig::default(),
1451            CountFold,
1452            0u64,
1453        )
1454        .unwrap();
1455
1456        let unreachable = WriteToken::new(0xDEAD_BEEF, 999);
1457        let err = adapter
1458            .wait_for_token(unreachable, Duration::from_millis(50))
1459            .await
1460            .unwrap_err();
1461        assert_eq!(err, WaitForTokenError::Timeout);
1462    }
1463
1464    #[tokio::test]
1465    async fn test_close_stops_fold_task() {
1466        let redex = Redex::new();
1467        let adapter = CortexAdapter::<u64>::open(
1468            &redex,
1469            &cn("cortex/close"),
1470            RedexFileConfig::default(),
1471            CortexAdapterConfig::default(),
1472            CountFold,
1473            0u64,
1474        )
1475        .unwrap();
1476
1477        adapter.close().unwrap();
1478        // Close is idempotent.
1479        adapter.close().unwrap();
1480
1481        // Ingest after close returns Closed.
1482        let meta = EventMeta::new(0, 0, 0, 0, 0);
1483        let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1484        let err = adapter.ingest(env).unwrap_err();
1485        assert!(matches!(err, CortexAdapterError::Closed));
1486
1487        // State handle still readable.
1488        assert_eq!(*adapter.state().read(), 0);
1489    }
1490
1491    struct FailAtSeq(u64);
1492    impl RedexFold<u64> for FailAtSeq {
1493        fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
1494            if ev.entry.seq == self.0 {
1495                Err(RedexError::Encode(format!(
1496                    "deliberate failure at seq {}",
1497                    ev.entry.seq
1498                )))
1499            } else {
1500                *state += 1;
1501                Ok(())
1502            }
1503        }
1504    }
1505
1506    /// Returns `RedexError::Decode` (recoverable) at the
1507    /// configured seqs; counts the attempt so tests can assert
1508    /// re-attempt after restore. Apply on any other seq bumps
1509    /// state.
1510    struct FailDecodeAtSeqs {
1511        skip: std::collections::HashSet<u64>,
1512        attempts: Arc<AtomicU64>,
1513    }
1514    impl RedexFold<u64> for FailDecodeAtSeqs {
1515        fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
1516            self.attempts.fetch_add(1, Ordering::AcqRel);
1517            if self.skip.contains(&ev.entry.seq) {
1518                Err(RedexError::Decode(format!(
1519                    "deliberate decode skip at seq {}",
1520                    ev.entry.seq
1521                )))
1522            } else {
1523                *state += 1;
1524                Ok(())
1525            }
1526        }
1527    }
1528
1529    #[tokio::test]
1530    async fn test_stop_policy_halts_on_first_error() {
1531        let redex = Redex::new();
1532        let adapter = CortexAdapter::<u64>::open(
1533            &redex,
1534            &cn("cortex/stop"),
1535            RedexFileConfig::default(),
1536            CortexAdapterConfig::default(), // Stop is default
1537            FailAtSeq(3),
1538            0u64,
1539        )
1540        .unwrap();
1541
1542        for i in 0..10u64 {
1543            let meta = EventMeta::new(0, 0, 0, i, 0);
1544            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1545            adapter.ingest(env).unwrap();
1546        }
1547
1548        // Wait until fold task stops. wait_for_seq surfaces the
1549        // halt as Err(Some(folded_through)) — pre-fix it returned
1550        // silently and the caller had no way to distinguish a
1551        // halt from a successful fold-through.
1552        let folded = adapter
1553            .wait_for_seq(10)
1554            .await
1555            .expect_err("Stop policy must surface the halt to wait_for_seq");
1556        // Seqs 0..=2 folded; seq 3 errored; the watermark caps at
1557        // 2 under Stop+non-recoverable so `folded_through` reflects
1558        // the successful prefix.
1559        assert_eq!(folded, Some(2));
1560        assert!(!adapter.is_running());
1561        assert_eq!(adapter.fold_errors(), 1);
1562        // Seqs 0..=2 folded; seq 3 errored; seqs 4..=9 never folded.
1563        assert_eq!(*adapter.state().read(), 3);
1564    }
1565
1566    /// `changes_tx` is the broadcast channel `changes_with_lag`
1567    /// surfaces as `ChangeEvent::Seq(u64)` — documented as
1568    /// "successful fold-apply notification". On the
1569    /// Stop+non-recoverable halt path, the watermark
1570    /// (`folded_through_seq`) is NOT advanced, so emitting the
1571    /// failing seq on `changes_tx` would mis-represent an
1572    /// unapplied event as if it were folded. Subscribers reading
1573    /// the broadcast and trusting the contract would advance
1574    /// their own state machines past the failure.
1575    ///
1576    /// Pin: after a Stop-policy halt, the broadcast must contain
1577    /// the prefix that *did* apply (seqs 0..=2), and must NOT
1578    /// contain the failing seq (3) or any later seq.
1579    #[tokio::test]
1580    async fn stop_policy_does_not_broadcast_failing_seq() {
1581        use futures::StreamExt;
1582
1583        let redex = Redex::new();
1584        let adapter = CortexAdapter::<u64>::open(
1585            &redex,
1586            &cn("cortex/stop-no-phantom-seq"),
1587            RedexFileConfig::default(),
1588            CortexAdapterConfig::default(), // Stop is default
1589            FailAtSeq(3),
1590            0u64,
1591        )
1592        .unwrap();
1593
1594        // Subscribe BEFORE ingesting so we capture every seq.
1595        let mut changes = adapter.changes_with_lag();
1596
1597        for i in 0..10u64 {
1598            let meta = EventMeta::new(0, 0, 0, i, 0);
1599            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1600            adapter.ingest(env).unwrap();
1601        }
1602
1603        // Wait for halt. wait_for_seq now surfaces the halt as
1604        // `Err(Some(folded))` rather than silently returning.
1605        let _ = adapter
1606            .wait_for_seq(10)
1607            .await
1608            .expect_err("Stop policy halt");
1609        assert!(!adapter.is_running(), "Stop policy must halt the task");
1610        assert_eq!(adapter.fold_errors(), 1);
1611
1612        // Drain the broadcast with a short bound so a regression
1613        // that re-emits the phantom seq doesn't hang the test.
1614        let mut received: Vec<u64> = Vec::new();
1615        loop {
1616            match tokio::time::timeout(std::time::Duration::from_millis(50), changes.next()).await {
1617                Ok(Some(ChangeEvent::Seq(s))) => received.push(s),
1618                Ok(Some(ChangeEvent::Lagged(_))) => continue,
1619                Ok(None) | Err(_) => break,
1620            }
1621        }
1622
1623        // Successful prefix (0, 1, 2) must be visible. The failing
1624        // seq (3) and any later seq must NOT.
1625        assert_eq!(
1626            received,
1627            vec![0, 1, 2],
1628            "broadcast must carry only successfully-folded seqs; \
1629             pre-fix this would include 3 (the failing seq) as a \
1630             phantom Seq(3) event, mis-routing subscribers' state"
1631        );
1632    }
1633
1634    // ========================================================================
1635    // applied_through_seq vs folded_through_seq
1636    // ========================================================================
1637
1638    /// Under `Stop` policy, a per-event recoverable decode error
1639    /// advances `folded_through_seq` (so live consumers don't
1640    /// deadlock on a permanently-bad event) but must NOT advance
1641    /// `applied_through_seq` (state at the skipped seq was never
1642    /// written). When the two were the same atomic,
1643    /// `wait_for_seq(seq)` returned claiming "state reflects seq"
1644    /// for events whose mutation never landed.
1645    #[tokio::test]
1646    async fn recoverable_decode_skip_advances_folded_but_not_applied() {
1647        let redex = Redex::new();
1648        let attempts = Arc::new(AtomicU64::new(0));
1649        let fold = FailDecodeAtSeqs {
1650            skip: [3u64].into_iter().collect(),
1651            attempts: attempts.clone(),
1652        };
1653        let adapter = CortexAdapter::<u64>::open(
1654            &redex,
1655            &cn("cortex/audit-6-skip"),
1656            RedexFileConfig::default(),
1657            CortexAdapterConfig::default(), // Stop is default
1658            fold,
1659            0u64,
1660        )
1661        .unwrap();
1662
1663        for i in 0..6u64 {
1664            let meta = EventMeta::new(0, 0, 0, i, 0);
1665            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1666            adapter.ingest(env).unwrap();
1667        }
1668        adapter.wait_for_seq(5).await.unwrap();
1669
1670        // Live consumer didn't deadlock — folded reached the tail.
1671        assert_eq!(
1672            adapter.folded_through_seq(),
1673            Some(5),
1674            "folded must advance over the recoverable_decode skip",
1675        );
1676        // STRICT-PREFIX: applied caps at 2 (the last consecutive
1677        // successful seq before the gap at seq 3). Subsequent
1678        // successful applies at 4 and 5 do NOT heal the prefix.
1679        assert_eq!(
1680            adapter.applied_through_seq(),
1681            Some(2),
1682            "strict-prefix watermark caps at the last consecutive Ok before the skip",
1683        );
1684        // State count = 5 (seqs 0,1,2,4,5 applied; 3 skipped).
1685        // The fold function still mutated state for the seqs it
1686        // saw — only the WATERMARK is strict-prefix.
1687        assert_eq!(*adapter.state().read(), 5);
1688        assert_eq!(adapter.fold_errors(), 1);
1689        // Task still running — recoverable decode does not halt
1690        // under Stop.
1691        assert!(adapter.is_running());
1692        // Six attempts (one per event). The `attempts` counter
1693        // pins that the fold function was invoked for the
1694        // skipped seq (skip is detected inside the fold, not
1695        // upstream).
1696        assert_eq!(attempts.load(Ordering::Acquire), 6);
1697    }
1698
1699    /// When the FIRST event is skipped via
1700    /// recoverable_decode, `applied_through_seq`
1701    /// stays at the "nothing applied yet" sentinel until a
1702    /// subsequent successful fold. This is the strict shape: an
1703    /// adapter freshly opened that has only seen skipped events
1704    /// reports `None` from `applied_through_seq`, even though
1705    /// `folded_through_seq` reports the highest skipped seq.
1706    #[tokio::test]
1707    async fn applied_stays_at_sentinel_when_only_skipped_events_processed() {
1708        let redex = Redex::new();
1709        let attempts = Arc::new(AtomicU64::new(0));
1710        let fold = FailDecodeAtSeqs {
1711            skip: [0u64, 1, 2].into_iter().collect(),
1712            attempts: attempts.clone(),
1713        };
1714        let adapter = CortexAdapter::<u64>::open(
1715            &redex,
1716            &cn("cortex/audit-6-only-skips"),
1717            RedexFileConfig::default(),
1718            CortexAdapterConfig::default(),
1719            fold,
1720            0u64,
1721        )
1722        .unwrap();
1723
1724        for i in 0..3u64 {
1725            let meta = EventMeta::new(0, 0, 0, i, 0);
1726            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1727            adapter.ingest(env).unwrap();
1728        }
1729        adapter.wait_for_seq(2).await.unwrap();
1730
1731        assert_eq!(adapter.folded_through_seq(), Some(2));
1732        assert_eq!(
1733            adapter.applied_through_seq(),
1734            None,
1735            "no successful apply ⇒ applied_through_seq stays at sentinel",
1736        );
1737        assert_eq!(*adapter.state().read(), 0);
1738    }
1739
1740    /// `snapshot()` must use `applied_through_seq`, not
1741    /// `folded_through_seq`. When the two were the same atomic,
1742    /// snapshot persisted the highest folded seq — including
1743    /// skipped events — so a restore tailed from past the skip
1744    /// and the skipped seq became permanently lost from durable
1745    /// state, even though the in-memory state at snapshot time
1746    /// never reflected it.
1747    #[tokio::test]
1748    async fn snapshot_last_seq_reflects_applied_not_folded_after_skip() {
1749        let redex = Redex::new();
1750        let attempts = Arc::new(AtomicU64::new(0));
1751        let fold = FailDecodeAtSeqs {
1752            skip: [5u64].into_iter().collect(),
1753            attempts: attempts.clone(),
1754        };
1755        let adapter = CortexAdapter::<u64>::open(
1756            &redex,
1757            &cn("cortex/audit-7-snapshot"),
1758            RedexFileConfig::default(),
1759            CortexAdapterConfig::default(),
1760            fold,
1761            0u64,
1762        )
1763        .unwrap();
1764
1765        // 8 events; seq 5 will be skipped via recoverable_decode.
1766        for i in 0..8u64 {
1767            let meta = EventMeta::new(0, 0, 0, i, 0);
1768            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1769            adapter.ingest(env).unwrap();
1770        }
1771        adapter.wait_for_seq(7).await.unwrap();
1772
1773        let folded = adapter.folded_through_seq();
1774        let applied = adapter.applied_through_seq();
1775        assert_eq!(folded, Some(7), "folded includes the skipped seq's slot");
1776        assert_eq!(
1777            applied,
1778            Some(4),
1779            "strict-prefix applied caps at the last consecutive Ok (seq 4) before the gap at seq 5",
1780        );
1781
1782        let (_bytes, last_seq) = adapter.snapshot().unwrap();
1783        // snapshot reflects applied (strict-prefix), not folded.
1784        // Pre-fix snapshot returned Some(7) — restore would tail
1785        // from seq 8 and seq 5 would be permanently lost from
1786        // durable state.
1787        assert_eq!(last_seq, applied);
1788        assert_eq!(last_seq, Some(4));
1789    }
1790
1791    /// Strict shape: when the LAST processed event is skipped,
1792    /// snapshot's `last_seq` must stay at the highest *applied*
1793    /// seq — not advance to the skipped one. Without the split,
1794    /// snapshot returned the skipped seq; restore tailed from
1795    /// past it; skipped event was lost.
1796    #[tokio::test]
1797    async fn snapshot_last_seq_does_not_advance_to_a_skipped_tail() {
1798        let redex = Redex::new();
1799        let attempts = Arc::new(AtomicU64::new(0));
1800        let fold = FailDecodeAtSeqs {
1801            // Skip the LAST event so applied < folded at snapshot time.
1802            skip: [4u64].into_iter().collect(),
1803            attempts: attempts.clone(),
1804        };
1805        let adapter = CortexAdapter::<u64>::open(
1806            &redex,
1807            &cn("cortex/audit-7-tail-skip"),
1808            RedexFileConfig::default(),
1809            CortexAdapterConfig::default(),
1810            fold,
1811            0u64,
1812        )
1813        .unwrap();
1814
1815        for i in 0..5u64 {
1816            let meta = EventMeta::new(0, 0, 0, i, 0);
1817            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1818            adapter.ingest(env).unwrap();
1819        }
1820        adapter.wait_for_seq(4).await.unwrap();
1821
1822        assert_eq!(adapter.folded_through_seq(), Some(4));
1823        assert_eq!(
1824            adapter.applied_through_seq(),
1825            Some(3),
1826            "tail apply was skipped ⇒ applied stays at the prior successful seq",
1827        );
1828
1829        let (_bytes, last_seq) = adapter.snapshot().unwrap();
1830        assert_eq!(
1831            last_seq,
1832            Some(3),
1833            "pre-fix this would be Some(4) and a restore would tail from 5, \
1834             dropping seq 4 permanently from durable state",
1835        );
1836    }
1837
1838    /// End-to-end: an adapter whose fold previously skipped seq
1839    /// N via recoverable_decode is snapshotted, closed, and
1840    /// reopened via `open_from_snapshot` with a fold that NOW
1841    /// handles seq N successfully (e.g. a software upgrade fixed
1842    /// the decoder). On restore, seq N must be re-fed to the
1843    /// fold function — without the strict-prefix watermark,
1844    /// snapshot's `last_seq` would be the highest folded
1845    /// (advanced over skips), restore would tail from past the
1846    /// skipped seq, and the previously-skipped event would be
1847    /// permanently lost.
1848    ///
1849    /// Asserts the strict-prefix `last_seq` and that the skipped
1850    /// seq is re-fed on restore. State-count after restore is
1851    /// fold-dependent (see `snapshot` doc on the re-apply
1852    /// double-counting trade-off) and is not pinned here.
1853    #[tokio::test]
1854    async fn restore_after_skip_re_attempts_skipped_event() {
1855        let redex = Redex::new();
1856        let pre_attempts = Arc::new(AtomicU64::new(0));
1857        let pre_fold = FailDecodeAtSeqs {
1858            skip: [2u64].into_iter().collect(),
1859            attempts: pre_attempts.clone(),
1860        };
1861        let adapter = CortexAdapter::<u64>::open(
1862            &redex,
1863            &cn("cortex/audit-6-7-restore-reattempt"),
1864            RedexFileConfig::default(),
1865            CortexAdapterConfig::default(),
1866            pre_fold,
1867            0u64,
1868        )
1869        .unwrap();
1870
1871        for i in 0..5u64 {
1872            let meta = EventMeta::new(0, 0, 0, i, 0);
1873            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1874            adapter.ingest(env).unwrap();
1875        }
1876        adapter.wait_for_seq(4).await.unwrap();
1877
1878        // Pre-snapshot state: STRICT-PREFIX applied caps at 1
1879        // (seq 2 was skipped; the prefix breaks there). Folded
1880        // reached 4. The fold function still wrote state for
1881        // every Ok (0,1,3,4) so state count is 4.
1882        assert_eq!(adapter.applied_through_seq(), Some(1));
1883        assert_eq!(adapter.folded_through_seq(), Some(4));
1884        assert_eq!(*adapter.state().read(), 4);
1885        let (bytes, last_seq) = adapter.snapshot().unwrap();
1886        // snapshot must reflect applied (strict-prefix), so
1887        // restore tails from seq 2 and re-attempts the skipped
1888        // event plus everything after.
1889        assert_eq!(last_seq, Some(1));
1890        adapter.close().unwrap();
1891
1892        // Reopen with a fold that handles seq 2 successfully
1893        // this time (no entries in `skip`). The previously-
1894        // skipped event must be re-fed, AND so must seqs 3 and
1895        // 4 (because their prior application is not part of the
1896        // snapshot's strict-prefix watermark — only seqs 0..=1
1897        // are guaranteed reflected in the rehydrated state).
1898        let post_attempts = Arc::new(AtomicU64::new(0));
1899        let post_fold = FailDecodeAtSeqs {
1900            skip: std::collections::HashSet::new(),
1901            attempts: post_attempts.clone(),
1902        };
1903        let restored = CortexAdapter::<u64>::open_from_snapshot(
1904            &redex,
1905            &cn("cortex/audit-6-7-restore-reattempt"),
1906            RedexFileConfig::default(),
1907            CortexAdapterConfig::default(),
1908            post_fold,
1909            &bytes,
1910            last_seq,
1911        )
1912        .unwrap();
1913
1914        restored.wait_for_seq(4).await.unwrap();
1915
1916        // Pre-fix this assertion would FAIL: snapshot would have
1917        // returned Some(4) (folded), restore would tail from
1918        // seq 5, the post_fold would never see seq 2, and the
1919        // skipped event would be permanently lost.
1920        let post_invocations = post_attempts.load(Ordering::Acquire);
1921        assert_eq!(
1922            post_invocations, 3,
1923            "post-restore fold must be re-fed seqs 2, 3, 4 (everything past the \
1924             snapshot's strict-prefix watermark)",
1925        );
1926        // After restore, the strict-prefix watermark heals all
1927        // the way to seq 4 because every event from start_seq=2
1928        // onwards applied successfully on the second pass.
1929        assert_eq!(restored.applied_through_seq(), Some(4));
1930        // State count is fold-dependent on idempotency. CountFold
1931        // is *not* idempotent (each apply increments), so seqs
1932        // 3 and 4 are double-counted: pre-restore in-memory
1933        // state was 4 (seqs 0,1,3,4 applied), snapshot bytes
1934        // serialized that 4, restore deserializes 4 + applies
1935        // seqs 2,3,4 = 7. The audit-fix's *correctness* claim is
1936        // not "state matches what a re-fold from scratch would
1937        // produce" — that requires either an idempotent fold or
1938        // snapshotting only when there's no gap (see `snapshot`
1939        // doc). The claim is "the skipped seq is re-fed instead
1940        // of being permanently lost from durable state" — pinned
1941        // by `post_invocations == 3` above.
1942        assert_eq!(
1943            *restored.state().read(),
1944            7,
1945            "non-idempotent CountFold double-counts seqs past the gap; this is the \
1946             documented re-apply trade-off, not a bug",
1947        );
1948    }
1949
1950    /// Pin: `wait_for_seq(seq)` returns promptly even when seq
1951    /// was skipped via recoverable_decode. The DoS-resistance
1952    /// contract (one bad event must not deadlock live consumers)
1953    /// is preserved by the `applied`/`folded` split — `wait_for_seq`
1954    /// still waits on `folded`, which advances over skips.
1955    #[tokio::test]
1956    async fn wait_for_seq_returns_after_recoverable_decode_skip() {
1957        let redex = Redex::new();
1958        let attempts = Arc::new(AtomicU64::new(0));
1959        let fold = FailDecodeAtSeqs {
1960            skip: [0u64].into_iter().collect(),
1961            attempts: attempts.clone(),
1962        };
1963        let adapter = CortexAdapter::<u64>::open(
1964            &redex,
1965            &cn("cortex/audit-6-no-deadlock"),
1966            RedexFileConfig::default(),
1967            CortexAdapterConfig::default(),
1968            fold,
1969            0u64,
1970        )
1971        .unwrap();
1972
1973        let meta = EventMeta::new(0, 0, 0, 0, 0);
1974        let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1975        let seq = adapter.ingest(env).unwrap();
1976
1977        // wait_for_seq must NOT block on the skipped seq.
1978        // Tight timeout — pre-fix shape never had this hazard;
1979        // the post-fix split must preserve it. (If `wait_for_seq`
1980        // ever silently switches to `applied_through_seq`, this
1981        // regresses with an indefinite hang.)
1982        tokio::time::timeout(std::time::Duration::from_secs(2), adapter.wait_for_seq(seq))
1983            .await
1984            .expect("wait_for_seq must return promptly even for a recoverable-decode-skipped seq")
1985            .expect("recoverable-decode skip still advances the folded watermark");
1986
1987        // Confirm what was actually observable at return:
1988        // folded reached seq, applied did not.
1989        assert_eq!(adapter.folded_through_seq(), Some(0));
1990        assert_eq!(adapter.applied_through_seq(), None);
1991    }
1992
1993    // ========================================================================
1994    // open must reject FromSeq(n>0) / LiveOnly
1995    // ========================================================================
1996
1997    /// `open` rejects `StartPosition::FromSeq(n)` for n > 0
1998    /// because the watermark would advance past events the adapter
1999    /// never folded, leaving `wait_for_seq` lying about applied
2000    /// state. Callers that intentionally skip a prefix must use
2001    /// `open_from_snapshot`.
2002    #[test]
2003    fn open_rejects_from_seq_n_greater_than_zero() {
2004        let redex = Redex::new();
2005        let cfg = CortexAdapterConfig::new().with_start(StartPosition::FromSeq(5));
2006        let result = CortexAdapter::<u64>::open(
2007            &redex,
2008            &cn("cortex/from-seq-guard"),
2009            RedexFileConfig::default(),
2010            cfg,
2011            CountFold,
2012            0u64,
2013        );
2014        assert!(
2015            matches!(result, Err(CortexAdapterError::InvalidStartPosition(_))),
2016            "open must reject FromSeq(n>0), got {:?}",
2017            result.map(|_| "Ok"),
2018        );
2019    }
2020
2021    /// `open` rejects `StartPosition::LiveOnly` for the same
2022    /// reason — the start_seq is `file.next_seq()`, so any prior
2023    /// events go un-folded but the watermark advances past them.
2024    #[test]
2025    fn open_rejects_live_only_start_position() {
2026        let redex = Redex::new();
2027        let cfg = CortexAdapterConfig::new().with_start(StartPosition::LiveOnly);
2028        let result = CortexAdapter::<u64>::open(
2029            &redex,
2030            &cn("cortex/live-only-guard"),
2031            RedexFileConfig::default(),
2032            cfg,
2033            CountFold,
2034            0u64,
2035        );
2036        assert!(
2037            matches!(result, Err(CortexAdapterError::InvalidStartPosition(_))),
2038            "open must reject LiveOnly, got {:?}",
2039            result.map(|_| "Ok"),
2040        );
2041    }
2042
2043    // ========================================================================
2044    // Stop policy must skip-and-continue on per-event Decode errors
2045    // ========================================================================
2046
2047    struct FailDecodeAtSeq(u64);
2048    impl RedexFold<u64> for FailDecodeAtSeq {
2049        fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
2050            if ev.entry.seq == self.0 {
2051                // Decode-class error: simulates a corrupt postcard
2052                // tail / EventMeta shape mismatch / checksum miss
2053                // — exactly what the cortex fold paths surface as
2054                // RedexError::Decode.
2055                Err(RedexError::Decode(format!(
2056                    "deliberate decode failure at seq {}",
2057                    ev.entry.seq
2058                )))
2059            } else {
2060                *state += 1;
2061                Ok(())
2062            }
2063        }
2064    }
2065
2066    /// Under `Stop` policy, a `RedexError::Decode` MUST NOT halt
2067    /// the fold task — it's a per-event recoverable failure
2068    /// (corrupt event payload past the 32-bit checksum, or an
2069    /// attacker-crafted matching collision). Pre-fix this hung
2070    /// the task on the first bad event, DoSing the cortex via one
2071    /// payload. Post-fix: the bad event is logged + skipped, the
2072    /// watermark advances, and subsequent events still fold.
2073    #[tokio::test]
2074    async fn stop_policy_skips_recoverable_decode_error() {
2075        let redex = Redex::new();
2076        let adapter = CortexAdapter::<u64>::open(
2077            &redex,
2078            &cn("cortex/decode-skip"),
2079            RedexFileConfig::default(),
2080            CortexAdapterConfig::default(), // Stop is default
2081            FailDecodeAtSeq(3),
2082            0u64,
2083        )
2084        .unwrap();
2085
2086        for i in 0..10u64 {
2087            let meta = EventMeta::new(0, 0, 0, i, 0);
2088            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
2089            let seq = adapter.ingest(env).unwrap();
2090            adapter.wait_for_seq(seq).await.unwrap();
2091        }
2092
2093        // Fold task is still running — the decode error didn't
2094        // halt it. fold_errors counts the one bad event.
2095        assert!(
2096            adapter.is_running(),
2097            "Stop policy must NOT halt on RedexError::Decode"
2098        );
2099        assert_eq!(adapter.fold_errors(), 1);
2100        // Seqs 0,1,2,4,5,6,7,8,9 folded; seq 3 skipped.
2101        assert_eq!(*adapter.state().read(), 9);
2102    }
2103
/// Pins the conservative boundary of the recoverable-decode
/// carve-out: only `RedexError::Decode` classifies as recoverable.
/// `Encode` errors (storage / user-fold-level) must STILL halt under
/// `Stop`. The pre-existing `test_stop_policy_halts_on_first_error`
/// already exercises that path with `RedexError::Encode`; this test
/// states the contract explicitly so a future widening of
/// `is_recoverable_decode` (e.g. accidentally including `Encode`)
/// is caught.
#[test]
fn redex_error_recoverable_decode_classification_is_decode_only() {
    // The single variant that is allowed to be skipped.
    assert!(RedexError::Decode("x".into()).is_recoverable_decode());

    // Every other variant exercised here must stay non-recoverable.
    let non_decode = [
        RedexError::Encode("x".into()),
        RedexError::Closed,
        RedexError::Io("x".into()),
        RedexError::Lagged,
        RedexError::Unauthorized,
    ];
    for err in non_decode {
        assert!(!err.is_recoverable_decode());
    }
}
2121
2122    /// `FromSeq(0)` is equivalent to `FromBeginning` (no events
2123    /// skipped) and must still be accepted — pins the boundary so
2124    /// the start-position guard doesn't accidentally lock out the
2125    /// degenerate-but-valid `FromSeq(0)` form.
2126    #[tokio::test]
2127    async fn open_accepts_from_seq_zero() {
2128        let redex = Redex::new();
2129        let cfg = CortexAdapterConfig::new().with_start(StartPosition::FromSeq(0));
2130        CortexAdapter::<u64>::open(
2131            &redex,
2132            &cn("cortex/from-seq-zero"),
2133            RedexFileConfig::default(),
2134            cfg,
2135            CountFold,
2136            0u64,
2137        )
2138        .expect("FromSeq(0) is equivalent to FromBeginning and must be accepted");
2139    }
2140
2141    // ========================================================================
2142    // changes_with_lag must surface BroadcastStream::Lagged
2143    // ========================================================================
2144
2145    /// `changes_with_lag` yields a `ChangeEvent::Lagged(n)` when a
2146    /// subscriber falls behind the broadcast channel capacity. Pre-
2147    /// fix `changes()` silently dropped these events via
2148    /// `filter_map(|r| r.ok())`; downstream telemetry consumers had
2149    /// no way to detect or count missed change notifications.
2150    ///
2151    /// Setup: subscribe, then ingest more than CHANGES_BROADCAST_CAP
2152    /// (64) events without polling the stream. The broadcast channel
2153    /// drops the oldest, and the next stream poll surfaces a
2154    /// `Lagged(n)` for the dropped count.
2155    #[tokio::test]
2156    async fn changes_with_lag_yields_lagged_when_subscriber_falls_behind() {
2157        let redex = Redex::new();
2158        let adapter = CortexAdapter::<u64>::open(
2159            &redex,
2160            &cn("cortex/lag"),
2161            RedexFileConfig::default(),
2162            CortexAdapterConfig::default(),
2163            CountFold,
2164            0u64,
2165        )
2166        .unwrap();
2167
2168        let stream = adapter.changes_with_lag();
2169        tokio::pin!(stream);
2170
2171        // Ingest CHANGES_BROADCAST_CAP + 16 events without polling
2172        // the stream. The broadcast channel will drop the oldest 16
2173        // (or thereabouts — the exact count depends on broadcast
2174        // semantics; we just need at least one Lagged emission).
2175        let total = (CHANGES_BROADCAST_CAP + 16) as u64;
2176        for i in 0..total {
2177            let meta = EventMeta::new(0, 0, 0, i, 0);
2178            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
2179            let seq = adapter.ingest(env).unwrap();
2180            adapter.wait_for_seq(seq).await.unwrap();
2181        }
2182
2183        // First poll should see a Lagged event (the broadcast channel
2184        // has overflowed). Drain the stream up to a reasonable cap and
2185        // assert at least one Lagged event was observed.
2186        let mut saw_lagged = false;
2187        let mut saw_seq = false;
2188        for _ in 0..(total as usize + 4) {
2189            match tokio::time::timeout(std::time::Duration::from_millis(50), stream.next()).await {
2190                Ok(Some(ChangeEvent::Lagged(n))) => {
2191                    saw_lagged = true;
2192                    assert!(n > 0, "Lagged count must be positive");
2193                }
2194                Ok(Some(ChangeEvent::Seq(_))) => {
2195                    saw_seq = true;
2196                }
2197                Ok(None) | Err(_) => break,
2198            }
2199        }
2200        assert!(
2201            saw_lagged,
2202            "subscriber that fell behind {} events must observe Lagged",
2203            CHANGES_BROADCAST_CAP + 16,
2204        );
2205        assert!(
2206            saw_seq,
2207            "the stream should still emit Seq events after the lag",
2208        );
2209    }
2210
2211    /// Pin: when the tail stream surfaces `RedexError::Lagged` the
2212    /// fold task catches the gap up via direct in-memory reads and
2213    /// resubscribes live, rather than silently exiting. Pre-fix
2214    /// the match arm broke out of the fold loop the first time the
2215    /// subscriber fell behind `tail_buffer_size`; state then never
2216    /// advanced past the lag point and `wait_for_seq` returned
2217    /// immediately on the `running == false` branch with no
2218    /// indication anything went wrong.
2219    ///
2220    /// Triggers `Lagged` deterministically via the backfill
2221    /// pre-flight: 50 events in the index with `tail_buffer_size =
2222    /// 4` exceeds the buffer, so `RedexFile::tail` signals
2223    /// `Lagged` as the very first stream item.
2224    #[tokio::test]
2225    async fn fold_task_recovers_from_tail_lagged() {
2226        let redex = Redex::new();
2227        let cfg = RedexFileConfig::default().with_tail_buffer_size(4);
2228
2229        // Stage the gap in the file's in-memory index without
2230        // going through an adapter (no watchers → no lag pressure
2231        // during stage). `CountFold` ignores payload, so empty
2232        // bytes are fine.
2233        let file = redex
2234            .open_file(&cn("cortex/tail-lag-recovery"), cfg)
2235            .unwrap();
2236        for _ in 0..50u64 {
2237            file.append(b"").unwrap();
2238        }
2239
2240        // Open a FromBeginning adapter on the same file. The
2241        // tail()'s backfill pre-flight sees 50 retained events vs.
2242        // buffer=4 and signals Lagged first thing. Post-fix the
2243        // recovery loop reads the gap from the index and tails
2244        // live; pre-fix the fold task exits and state stays at 0.
2245        let adapter = CortexAdapter::<u64>::open(
2246            &redex,
2247            &cn("cortex/tail-lag-recovery"),
2248            RedexFileConfig::default(), // ignored on reopen
2249            CortexAdapterConfig::default(),
2250            CountFold,
2251            0u64,
2252        )
2253        .unwrap();
2254
2255        tokio::time::timeout(std::time::Duration::from_secs(5), adapter.wait_for_seq(49))
2256            .await
2257            .expect("wait_for_seq(49) timed out — tail-Lagged recovery regressed")
2258            .expect("fold task must remain alive through Lagged recovery");
2259
2260        assert_eq!(*adapter.state().read(), 50);
2261        assert!(
2262            adapter.is_running(),
2263            "fold task must stay alive after recovering from tail lag",
2264        );
2265        assert_eq!(adapter.fold_errors(), 0);
2266    }
2267
2268    /// `changes()` continues to silently drop lag (the documented
2269    /// best-effort behavior) — pins the contract so a future
2270    /// refactor doesn't accidentally surface `Lagged` through the
2271    /// simple stream and break consumers that don't want it.
2272    #[tokio::test]
2273    async fn changes_filters_out_lag_silently() {
2274        let redex = Redex::new();
2275        let adapter = CortexAdapter::<u64>::open(
2276            &redex,
2277            &cn("cortex/lag-silent"),
2278            RedexFileConfig::default(),
2279            CortexAdapterConfig::default(),
2280            CountFold,
2281            0u64,
2282        )
2283        .unwrap();
2284
2285        let stream = adapter.changes();
2286        tokio::pin!(stream);
2287
2288        // Same overflow setup.
2289        let total = (CHANGES_BROADCAST_CAP + 16) as u64;
2290        for i in 0..total {
2291            let meta = EventMeta::new(0, 0, 0, i, 0);
2292            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
2293            let seq = adapter.ingest(env).unwrap();
2294            adapter.wait_for_seq(seq).await.unwrap();
2295        }
2296
2297        // Drain everything we can from the stream. Item type is `u64`
2298        // (not Result), so we can't observe Lagged in any form. Just
2299        // verify the stream still produces some seqs without errors.
2300        let mut got_seq = false;
2301        for _ in 0..(total as usize + 4) {
2302            match tokio::time::timeout(std::time::Duration::from_millis(50), stream.next()).await {
2303                Ok(Some(_seq)) => {
2304                    got_seq = true;
2305                }
2306                Ok(None) | Err(_) => break,
2307            }
2308        }
2309        assert!(got_seq, "changes() must still emit seqs after lag");
2310    }
2311
2312    #[tokio::test]
2313    async fn test_log_and_continue_skips_errors() {
2314        let redex = Redex::new();
2315        let cfg =
2316            CortexAdapterConfig::new().with_fold_error_policy(FoldErrorPolicy::LogAndContinue);
2317        let adapter = CortexAdapter::<u64>::open(
2318            &redex,
2319            &cn("cortex/lc"),
2320            RedexFileConfig::default(),
2321            cfg,
2322            FailAtSeq(3),
2323            0u64,
2324        )
2325        .unwrap();
2326
2327        for i in 0..10u64 {
2328            let meta = EventMeta::new(0, 0, 0, i, 0);
2329            let env = EventEnvelope::new(meta, Bytes::from_static(b""));
2330            let seq = adapter.ingest(env).unwrap();
2331            adapter.wait_for_seq(seq).await.unwrap();
2332        }
2333
2334        assert!(adapter.is_running());
2335        assert_eq!(adapter.fold_errors(), 1);
2336        // All seqs except 3 were folded → state == 9.
2337        assert_eq!(*adapter.state().read(), 9);
2338    }
2339}