net/adapter/net/cortex/adapter.rs
1//! `CortexAdapter<State>` — one RedEX file, one fold, one materialized
2//! state.
3
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7
8use futures::{Stream, StreamExt};
9use parking_lot::RwLock;
10use tokio::sync::{broadcast, Notify, Semaphore};
11use tokio_stream::wrappers::BroadcastStream;
12
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15
16use super::super::channel::ChannelName;
17use super::super::redex::{
18 Redex, RedexError, RedexEvent, RedexFile, RedexFileConfig, RedexFold, WriteToken,
19};
20use super::config::{CortexAdapterConfig, FoldErrorPolicy, StartPosition};
21use super::envelope::IntoRedexPayload;
22use super::error::CortexAdapterError;
23use super::meta::EVENT_META_SIZE;
24
/// Process-wide cap on in-flight RYW waits across every
/// `CortexAdapter` in the process. `CortexAdapterConfig::ryw_inflight_cap`
/// bounds each adapter; this caps the total. Operators running
/// thousands of adapters need the process-wide bound so the
/// cumulative permit count doesn't dwarf the memory budget.
///
/// `CortexAdapter::wait_for_token` tries to take a permit here first,
/// then tries to take one from the per-adapter semaphore.
///
/// Default: no global cap. Operators install one at startup via
/// [`set_global_ryw_inflight_cap`]. The install is one-shot
/// (`OnceLock`), so later calls return `false` and leave the
/// previous cap in place.
static GLOBAL_RYW_CAP: OnceLock<Arc<Semaphore>> = OnceLock::new();
36
37/// Install a process-wide cap on concurrent `wait_for_token`
38/// permits across every `CortexAdapter` instance. Call once at
39/// process startup before any RYW traffic. `cap` is clamped to a
40/// minimum of 1.
41///
42/// Returns `true` if the cap was installed by this call, `false`
43/// if a prior call already installed one.
44pub fn set_global_ryw_inflight_cap(cap: usize) -> bool {
45 GLOBAL_RYW_CAP
46 .set(Arc::new(Semaphore::new(cap.max(1))))
47 .is_ok()
48}
49
50/// Lookup the installed global cap; `None` when no cap has been
51/// set (default).
52fn current_global_ryw_semaphore() -> Option<Arc<Semaphore>> {
53 GLOBAL_RYW_CAP.get().cloned()
54}
55
/// One-file CortEX adapter. It:
/// - projects envelopes into RedEX payloads;
/// - tails the same file;
/// - drives a [`RedexFold`] implementation;
/// - exposes the materialized state as a read handle.
///
/// All state lives behind a shared `AdapterInner`. The background fold
/// task spawned at open time holds its own clone of that `Arc`.
///
/// Created via [`Self::open`].
pub struct CortexAdapter<State> {
    inner: Arc<AdapterInner<State>>,
}
64
/// Capacity of the post-fold change-notification broadcast channel
/// backing [`CortexAdapter::changes`] and
/// [`CortexAdapter::changes_with_lag`].
///
/// A slow subscriber that falls more than this many events behind
/// gets a `Lagged` signal and should re-read state fresh.
const CHANGES_BROADCAST_CAP: usize = 64;
69
/// Item type yielded by [`CortexAdapter::changes_with_lag`].
///
/// The plain `changes()` stream uses `filter_map(|r| r.ok())`, which
/// silently drops `BroadcastStream` `Lagged(n)` errors. Downstream
/// telemetry consumers therefore cannot surface "you missed N changes."
/// This enum exposes both shapes. Subscribers who need only the latest
/// sequence can stay on [`CortexAdapter::changes`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeEvent {
    /// A fold-apply notification carrying this RedEX sequence.
    Seq(u64),
    /// The subscriber fell `n` events behind the broadcast channel,
    /// and `n` change notifications were dropped.
    ///
    /// By the time the subscriber sees this, `state()` already
    /// reflects those events, so the lag value is purely for
    /// observability.
    Lagged(u64),
}
88
/// Shared state behind a [`CortexAdapter`] handle. The background fold
/// task holds its own `Arc` clone of this struct.
struct AdapterInner<State> {
    /// The RedEX file this adapter appends to (`ingest*`) and tails
    /// (fold task).
    file: RedexFile,
    /// Materialized fold state. Readers get a clone of this `Arc` via
    /// [`CortexAdapter::state`].
    state: Arc<RwLock<State>>,
    /// Highest RedEX seq the fold task has *processed*: applied
    /// successfully OR skipped via `recoverable_decode` under
    /// `Stop` policy.
    ///
    /// The skip-and-continue path advances this watermark so that
    /// live consumers waiting via `wait_for_seq` don't deadlock on a
    /// single permanently bad event. This is the DoS-resistance
    /// contract documented on `RedexError::is_recoverable_decode`.
    ///
    /// **Use `applied_through_seq` instead** if you need
    /// "state actually reflects this seq" semantics. That
    /// watermark only advances on `Ok(())` folds, and it is what
    /// `snapshot` persists, so restore tails from a position
    /// consistent with the in-memory state.
    ///
    /// `u64::MAX` is the "nothing folded yet" sentinel. It is used
    /// as the initial value only when `start_seq == 0`; otherwise the
    /// watermark starts at `start_seq - 1`. The sentinel is safe
    /// because the `open_from_snapshot` overflow guard rejects
    /// `last_seq == u64::MAX`, so no real event can ever occupy
    /// that slot.
    folded_through_seq: AtomicU64,
    /// Highest RedEX seq K such that **every** seq in
    /// `start_seq..=K` was applied to state via a successful
    /// `RedexFold::apply`.
    ///
    /// This is a *strict-prefix* watermark. Any skip
    /// (`recoverable_decode` under `Stop`, or any error under
    /// `LogAndContinue`) breaks the prefix at the skipped seq, and
    /// `applied_through_seq` cannot advance past it even when later
    /// seqs apply successfully. Distinct from `folded_through_seq`,
    /// which is the highest *processed* seq and advances over skips
    /// so live consumers don't deadlock.
    ///
    /// Snapshot persists this value, so restore tails from
    /// `applied_through_seq + 1` with the guarantee that **every
    /// prior seq is reflected in state**.
    ///
    /// Without strict-prefix semantics, consider a skip at seq M
    /// followed by successful applies at M+1 and M+2:
    /// - `applied` would advance to M+2 (the highest individual apply);
    /// - snapshot would persist M+2, and restore would tail from M+3;
    /// - seq M would never be re-attempted on restore, so its mutations
    ///   would be permanently lost from durable state even though the
    ///   log still carries the event.
    ///
    /// That is the "violates log is the source of truth" failure
    /// audited as #6+#7.
    ///
    /// Same `u64::MAX` "nothing applied yet" sentinel as
    /// `folded_through_seq`. `start_seq == 0` is the only
    /// configuration that initializes to the sentinel. An
    /// `open_from_snapshot` with `last_seq = Some(K)` initializes
    /// to `K`; the snapshot's contract is that `start_seq..=K` is
    /// already in the rehydrated state.
    applied_through_seq: AtomicU64,
    /// First RedEX seq this adapter began folding from.
    ///
    /// Any seq strictly below this predates the adapter, so
    /// `wait_for_seq` / `wait_for_applied_seq` return immediately for
    /// it without consulting the watermark. With `start_seq == 0` no
    /// seq qualifies, and the waits rely on the `u64::MAX` sentinel
    /// check instead.
    ///
    /// Stored on inner so the wait predicate doesn't need to reach
    /// back into the open-time `start_seq` local.
    start_seq: u64,
    /// Count of fold errors, exposed via [`CortexAdapter::fold_errors`].
    /// The incrementing side lives in the fold task.
    fold_errors: AtomicU64,
    /// `true` while the fold task is considered live; initialized to
    /// `true` at open. The wait loops treat `false` as "the watermark
    /// will not advance any further".
    running: AtomicBool,
    /// Set once by [`CortexAdapter::close`]. Subsequent `ingest` /
    /// `ingest_with_token` calls fail with `CortexAdapterError::Closed`.
    closed: AtomicBool,
    /// Wake-up channel for `wait_for_seq` / `wait_for_applied_seq`
    /// waiters, which re-check the watermarks and `running` on every
    /// wake.
    ///
    /// NOTE(review): presumably signalled by the fold task after
    /// watermark advances and on exit. The signalling side is not
    /// visible here.
    notify: Notify,
    /// Shutdown signal. `close()` calls `notify_one` (which stores a
    /// permit if nobody is waiting yet), and the fold task's `select!`
    /// breaks out of its loop on `notified()`.
    shutdown: Notify,
    /// Broadcast of RedEX seqs after each successful (or
    /// `LogAndContinue`-skipped) fold apply. Capacity is
    /// `CHANGES_BROADCAST_CAP`. Subscribers: see
    /// [`CortexAdapter::changes`].
    changes_tx: broadcast::Sender<u64>,
    /// Per-adapter cap on concurrent `wait_for_token` callers.
    ///
    /// `None` when `CortexAdapterConfig::ryw_inflight_cap == 0`
    /// (unbounded). Otherwise sized to `ryw_inflight_cap` permits, and
    /// each pending wait holds one permit for its duration. Exceeding
    /// the cap returns `WaitForTokenError::QueueFull` immediately.
    ryw_inflight: Option<Arc<Semaphore>>,
    /// Per-adapter RYW counters. See [`RywMetricsSnapshot`].
    ryw_metrics: RywMetricsAtomic,
}
168
/// Atomic counters backing [`CortexAdapter::ryw_metrics`].
///
/// One instance lives on each adapter. Each adapter is per-channel
/// already, so channel labeling falls out of the adapter identity.
/// All updates and reads use `Ordering::Relaxed`: the counters are
/// independent and carry no synchronization duties.
#[derive(Debug, Default)]
struct RywMetricsAtomic {
    /// `wait_for_token` calls that obtained every required permit.
    waits_total: AtomicU64,
    /// Waits that hit their deadline.
    timeouts_total: AtomicU64,
    /// Calls rejected because the global or per-adapter cap was full.
    queue_full_total: AtomicU64,
    /// Origin-guard rejections reported by bound adapters via
    /// `note_wrong_origin`.
    wrong_origin_total: AtomicU64,
    /// Nanoseconds spent waiting after permit acquisition, summed
    /// across all waits.
    wait_duration_nanos_sum: AtomicU64,
}
180
/// Snapshot of the RYW counters on a [`CortexAdapter`].
///
/// All counters are monotonic; computing a rate is the caller's job
/// (divide deltas between two snapshots by the sampling interval).
/// `wait_duration_nanos_sum / waits_total` approximates the mean
/// wait time without a full histogram.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RywMetricsSnapshot {
    /// Cumulative `wait_for_token` calls that took a permit, i.e.
    /// were not rejected up front with `QueueFull` / `WrongOrigin`.
    pub waits_total: u64,
    /// Cumulative waits that returned `Timeout`.
    pub timeouts_total: u64,
    /// Cumulative waits rejected with `QueueFull`, whether by the
    /// process-wide cap or the per-adapter cap.
    pub queue_full_total: u64,
    /// Cumulative waits rejected with `WrongOrigin` at the
    /// bound-adapter layer.
    ///
    /// The generic `CortexAdapter::wait_for_token` never increments
    /// this. Only `TasksAdapter::wait_for_token` and
    /// `MemoriesAdapter::wait_for_token` do, when their guard fires.
    pub wrong_origin_total: u64,
    /// Sum of nanoseconds spent inside `wait_for_token` after permit
    /// acquisition.
    ///
    /// Every outcome is included: success, `Timeout` and
    /// `FoldStopped`. Divide by `waits_total` for the mean.
    pub wait_duration_nanos_sum: u64,
}
206
207impl<State> CortexAdapter<State> {
208 /// Read-only access to the materialized state. The returned `Arc`
209 /// is cheap to clone; all readers and the fold task share the
210 /// same `RwLock`.
211 pub fn state(&self) -> Arc<RwLock<State>> {
212 self.inner.state.clone()
213 }
214
215 /// Highest RedEX sequence the fold task has *processed* —
216 /// applied successfully OR skipped via `recoverable_decode`
217 /// under `Stop` policy.
218 ///
219 /// Use [`Self::applied_through_seq`] if you need "state
220 /// actually reflects this seq" semantics; the difference
221 /// matters under `Stop`+recoverable-decode where the
222 /// skip-and-continue path advances *this* watermark (so
223 /// `wait_for_seq` doesn't deadlock on a bad event) but
224 /// leaves `applied_through_seq` behind.
225 ///
226 /// `None` if no event has been folded yet since open.
227 pub fn folded_through_seq(&self) -> Option<u64> {
228 let v = self.inner.folded_through_seq.load(Ordering::Acquire);
229 if v == u64::MAX {
230 None
231 } else {
232 Some(v)
233 }
234 }
235
236 /// Highest RedEX sequence K such that *every* seq in
237 /// `start_seq..=K` was successfully applied to state via
238 /// `Ok(()) RedexFold::apply`. A *strict-prefix* watermark:
239 /// any skip (`recoverable_decode` under `Stop`, or any error
240 /// under `LogAndContinue`) at seq M permanently caps this
241 /// watermark at M-1, even if subsequent seqs apply
242 /// successfully.
243 ///
244 /// `snapshot` persists this value, so a restore tails from
245 /// `applied_through_seq + 1` and re-attempts every seq from
246 /// the first skip onwards — preserving "log is the source
247 /// of truth" even across the skip-and-continue path.
248 ///
249 /// Distinct from [`Self::folded_through_seq`], which is the
250 /// highest *processed* seq and advances over skips so live
251 /// consumers don't deadlock.
252 ///
253 /// `None` if no event has been applied yet since open
254 /// (start_seq=0 with no successful applies, or start_seq=0
255 /// with the very first event having been skipped).
256 pub fn applied_through_seq(&self) -> Option<u64> {
257 let v = self.inner.applied_through_seq.load(Ordering::Acquire);
258 if v == u64::MAX {
259 None
260 } else {
261 Some(v)
262 }
263 }
264
265 /// Cumulative count of fold errors (only ever increases under
266 /// [`FoldErrorPolicy::LogAndContinue`]; under `Stop` it is 0 or
267 /// 1, with the task exiting after the first error).
268 pub fn fold_errors(&self) -> u64 {
269 self.inner.fold_errors.load(Ordering::Acquire)
270 }
271
272 /// True if the fold task is currently running (has not observed
273 /// shutdown, an error under `Stop`, or a tail-end signal).
274 pub fn is_running(&self) -> bool {
275 self.inner.running.load(Ordering::Acquire)
276 }
277
278 /// Block until the fold task has *processed* every event up
279 /// through `seq` (applied successfully OR skipped via
280 /// `recoverable_decode` under `Stop` policy), or until the
281 /// fold task stops (e.g. close, non-recoverable fold error
282 /// under `Stop`).
283 ///
284 /// Returns `Ok(())` when the folded watermark reaches `seq`,
285 /// or `Err(folded)` where `folded` is the highest seq processed
286 /// before the fold task stopped (`None` if it stopped without
287 /// processing anything). Pre-fix this returned `()` for both
288 /// outcomes, so a caller waiting on a seq the fold task never
289 /// reached (close, Stop-policy halt, retention-evicted tail
290 /// lag) silently observed stale state. Mirrors
291 /// [`Self::wait_for_applied_seq`]'s shape.
292 ///
293 /// Use pattern:
294 /// ```ignore
295 /// let seq = adapter.ingest(envelope)?;
296 /// adapter.wait_for_seq(seq).await?;
297 /// let state = adapter.state().read();
298 /// // state reflects the ingest, UNLESS the event at `seq`
299 /// // was skipped via recoverable_decode under `Stop`.
300 /// ```
301 ///
302 /// **Subtle point.** This method waits on
303 /// [`Self::folded_through_seq`], which advances over
304 /// events the fold task processed but skipped via
305 /// `RedexError::is_recoverable_decode`. The skip-and-
306 /// continue path is the documented DoS-resistance contract
307 /// (a single corrupt event must not wedge the task forever).
308 /// If you need to confirm `state` actually reflects the
309 /// ingest at `seq`, follow up with
310 /// `adapter.applied_through_seq() >= Some(seq)`.
311 pub async fn wait_for_seq(&self, seq: u64) -> Result<(), Option<u64>> {
312 // Any seq strictly below `start_seq` is conceptually behind
313 // us — those events were applied before we opened the
314 // adapter (or are explicitly past the LiveOnly cutoff).
315 // Short-circuit returning immediately so a caller that
316 // passes a stale seq cannot hang. This also covers the
317 // `start_seq == 0 && seq == 0` blocked-forever-on-empty-
318 // file case for adapters opened with `FromBeginning` on a
319 // freshly-created log: `seq < start_seq` is false, but the
320 // sentinel check below correctly waits for the first event.
321 if seq < self.inner.start_seq {
322 return Ok(());
323 }
324 loop {
325 let notified = self.inner.notify.notified();
326 tokio::pin!(notified);
327 notified.as_mut().enable();
328
329 let watermark = self.inner.folded_through_seq.load(Ordering::Acquire);
330 // `u64::MAX` is the "nothing folded yet" sentinel — any
331 // other value is a real applied seq, so `watermark >= seq`
332 // after the sentinel check returns exactly when seq has
333 // been applied.
334 if watermark != u64::MAX && watermark >= seq {
335 return Ok(());
336 }
337 if !self.inner.running.load(Ordering::Acquire) {
338 return Err(self.folded_through_seq());
339 }
340 notified.await;
341 }
342 }
343
344 /// RYW-strength wait. Resolves when the **applied** watermark
345 /// (events that actually ran through the fold body, not
346 /// recoverable-skipped) catches up to `seq`, OR when the fold
347 /// task stops before reaching `seq`.
348 ///
349 /// Returns `Ok(())` on a real apply-through, `Err(applied)`
350 /// where `applied` is the last successfully-applied seq on
351 /// stop. Differs from [`Self::wait_for_seq`] which resolves
352 /// on the folded watermark (including skipped events). RYW
353 /// requires applied, not folded — otherwise a producer whose
354 /// write hit a recoverable-decode skip would observe
355 /// `Ok(())` and then read state that doesn't reflect the
356 /// write.
357 pub async fn wait_for_applied_seq(&self, seq: u64) -> Result<(), Option<u64>> {
358 if seq < self.inner.start_seq {
359 return Ok(());
360 }
361 loop {
362 let notified = self.inner.notify.notified();
363 tokio::pin!(notified);
364 notified.as_mut().enable();
365
366 let watermark = self.inner.applied_through_seq.load(Ordering::Acquire);
367 if watermark != u64::MAX && watermark >= seq {
368 return Ok(());
369 }
370 if !self.inner.running.load(Ordering::Acquire) {
371 // Stopped without ever reaching seq. Surface the
372 // last-applied watermark so the caller can build
373 // a typed error.
374 let applied = self.applied_through_seq();
375 return Err(applied);
376 }
377 notified.await;
378 }
379 }
380
381 /// Close the adapter. Stops the fold task (after it finishes any
382 /// in-progress apply), leaves the RedEX file open so other
383 /// adapters / callers can continue using it, and leaves the
384 /// state handle readable. Idempotent.
385 pub fn close(&self) -> Result<(), CortexAdapterError> {
386 if self.inner.closed.swap(true, Ordering::AcqRel) {
387 return Ok(());
388 }
389 // `notify_one()` stores a permit if the fold task hasn't yet
390 // reached its `shutdown.notified()` poll, so a close that
391 // races the spawn → first-select window is still observed.
392 // `notify_waiters()` would drop the signal in that window.
393 self.inner.shutdown.notify_one();
394 Ok(())
395 }
396
397 /// Stream of RedEX sequences, one per successful (or
398 /// `LogAndContinue`-skipped) fold application. Used by reactive
399 /// queries: on each emission, the caller re-reads
400 /// [`Self::state`] to compute its current view.
401 ///
402 /// Lag semantics: if a subscriber falls more than 64 events
403 /// behind (the internal broadcast channel capacity), the channel
404 /// drops intermediate events. This implementation filters lag
405 /// errors out silently — by the time the subscriber catches up,
406 /// `state()` reflects the latest applied events regardless of
407 /// how many signals were missed. Subscribers that need to
408 /// observe lag (e.g. for telemetry or reactive-backpressure)
409 /// should use [`Self::changes_with_lag`] instead.
410 ///
411 /// The stream ends when all adapter handles have been dropped
412 /// and the fold task has exited.
413 pub fn changes(&self) -> impl Stream<Item = u64> + Send + 'static {
414 BroadcastStream::new(self.inner.changes_tx.subscribe())
415 .filter_map(|r| async move { r.ok() })
416 }
417
418 /// Stream of changes that surfaces broadcast-channel lag as a
419 /// `Lagged(n)` event interleaved with the sequence emissions.
420 ///
421 /// The yielded items are [`ChangeEvent`]s — `Seq(u64)` for a
422 /// successful fold-apply notification, and `Lagged(n)` when the
423 /// subscriber fell `n` events behind the broadcast channel
424 /// (capacity 64). Pre-fix [`Self::changes`] silently dropped
425 /// `Lagged` errors via `filter_map(|r| r.ok())`; downstream
426 /// telemetry consumers had no way to surface "you missed N
427 /// changes." This method is the lossless counterpart — by the
428 /// time a subscriber sees `Lagged(n)`, `state()` already
429 /// reflects past those n events, so the subscriber can react
430 /// (re-read state, log lag, apply backpressure) without
431 /// missing data.
432 pub fn changes_with_lag(&self) -> impl Stream<Item = ChangeEvent> + Send + 'static {
433 use futures::StreamExt;
434 BroadcastStream::new(self.inner.changes_tx.subscribe()).map(|r| match r {
435 Ok(seq) => ChangeEvent::Seq(seq),
436 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
437 ChangeEvent::Lagged(n)
438 }
439 })
440 }
441
442 /// Append an envelope. Projects to `(EventMeta, tail)`, builds the
443 /// concatenated payload, calls [`RedexFile::append`], and returns
444 /// the assigned RedEX sequence.
445 pub fn ingest<E: IntoRedexPayload>(&self, envelope: E) -> Result<u64, CortexAdapterError> {
446 if self.inner.closed.load(Ordering::Acquire) {
447 return Err(CortexAdapterError::Closed);
448 }
449 let (meta, tail) = envelope.into_redex_payload();
450 let mut buf = Vec::with_capacity(EVENT_META_SIZE + tail.len());
451 buf.extend_from_slice(&meta.to_bytes());
452 buf.extend_from_slice(&tail);
453 Ok(self.inner.file.append(&buf)?)
454 }
455
456 /// Append an envelope and return a [`WriteToken`] addressing
457 /// the resulting write. The token is the typed handle the
458 /// read-your-writes API consumes via
459 /// [`Self::wait_for_token`]; equivalent to calling [`Self::ingest`]
460 /// and pairing the returned seq with the envelope's
461 /// `meta.origin_hash`, but does both in one shot so the binding
462 /// surface can round-trip a single value.
463 pub fn ingest_with_token<E: IntoRedexPayload>(
464 &self,
465 envelope: E,
466 ) -> Result<WriteToken, CortexAdapterError> {
467 if self.inner.closed.load(Ordering::Acquire) {
468 return Err(CortexAdapterError::Closed);
469 }
470 let (meta, tail) = envelope.into_redex_payload();
471 let origin_hash = meta.origin_hash;
472 let mut buf = Vec::with_capacity(EVENT_META_SIZE + tail.len());
473 buf.extend_from_slice(&meta.to_bytes());
474 buf.extend_from_slice(&tail);
475 let seq = self.inner.file.append(&buf)?;
476 Ok(WriteToken::new(origin_hash, seq))
477 }
478
479 /// Block until the fold task has processed every event up
480 /// through `token.seq`, or `deadline` elapses. Returns
481 /// `Err(WaitForTokenError::Timeout)` on deadline; `Ok(())`
482 /// once the watermark catches up (or the fold task stops —
483 /// see [`Self::wait_for_seq`] for the same caveat).
484 ///
485 /// The token's `origin_hash` is informational at this layer
486 /// — the generic [`CortexAdapter`] folds every event in its
487 /// RedEX file regardless of origin. Origin-bound adapters
488 /// (e.g. [`super::tasks::TasksAdapter`],
489 /// [`super::memories::MemoriesAdapter`]) layer their own
490 /// origin assertion on top.
491 pub async fn wait_for_token(
492 &self,
493 token: WriteToken,
494 deadline: Duration,
495 ) -> Result<(), WaitForTokenError> {
496 // Try-acquire FIRST so backpressure surfaces before the timer
497 // arms — under saturation a `QueueFull` is the correct
498 // diagnostic, not a `Timeout` masking it.
499 //
500 // Two-tier acquire: global cap (process-wide; bounds
501 // cumulative permit count across every adapter) then
502 // per-adapter cap. Both must succeed; if either is
503 // saturated we bump the metric and return QueueFull. The
504 // global permit is dropped at function return; tests that
505 // exercise saturation rely on this being held for the
506 // duration of the wait.
507 let _global_permit = match current_global_ryw_semaphore() {
508 Some(sem) => Some(sem.try_acquire_owned().map_err(|_| {
509 self.inner
510 .ryw_metrics
511 .queue_full_total
512 .fetch_add(1, Ordering::Relaxed);
513 WaitForTokenError::QueueFull
514 })?),
515 None => None,
516 };
517 let _permit = match &self.inner.ryw_inflight {
518 Some(sem) => Some(sem.clone().try_acquire_owned().map_err(|_| {
519 self.inner
520 .ryw_metrics
521 .queue_full_total
522 .fetch_add(1, Ordering::Relaxed);
523 WaitForTokenError::QueueFull
524 })?),
525 None => None,
526 };
527 self.inner
528 .ryw_metrics
529 .waits_total
530 .fetch_add(1, Ordering::Relaxed);
531 let started = tokio::time::Instant::now();
532 // RYW waits on the *applied* watermark, not the folded
533 // watermark — events skipped via recoverable_decode under
534 // FoldErrorPolicy::Stop advance folded but not applied,
535 // and a producer whose write hit such a skip must NOT
536 // observe Ok(()) (that would let them read state that
537 // doesn't reflect their write).
538 let outcome =
539 match tokio::time::timeout(deadline, self.wait_for_applied_seq(token.seq)).await {
540 Ok(Ok(())) => Ok(()),
541 Ok(Err(applied_through_seq)) => Err(WaitForTokenError::FoldStopped {
542 applied_through_seq,
543 }),
544 Err(_) => {
545 self.inner
546 .ryw_metrics
547 .timeouts_total
548 .fetch_add(1, Ordering::Relaxed);
549 Err(WaitForTokenError::Timeout)
550 }
551 };
552 let nanos = u64::try_from(started.elapsed().as_nanos()).unwrap_or(u64::MAX);
553 self.inner
554 .ryw_metrics
555 .wait_duration_nanos_sum
556 .fetch_add(nanos, Ordering::Relaxed);
557 outcome
558 }
559
560 /// Snapshot the RYW counters for this adapter. Cheap; reads
561 /// four atomics under `Relaxed`.
562 pub fn ryw_metrics(&self) -> RywMetricsSnapshot {
563 let m = &self.inner.ryw_metrics;
564 RywMetricsSnapshot {
565 waits_total: m.waits_total.load(Ordering::Relaxed),
566 timeouts_total: m.timeouts_total.load(Ordering::Relaxed),
567 queue_full_total: m.queue_full_total.load(Ordering::Relaxed),
568 wrong_origin_total: m.wrong_origin_total.load(Ordering::Relaxed),
569 wait_duration_nanos_sum: m.wait_duration_nanos_sum.load(Ordering::Relaxed),
570 }
571 }
572
573 /// Bump the `wrong_origin_total` RYW counter. Called by the
574 /// origin-bound adapter wrappers when their guard fires; not
575 /// part of the generic adapter's own happy path.
576 pub(super) fn note_wrong_origin(&self) {
577 self.inner
578 .ryw_metrics
579 .wrong_origin_total
580 .fetch_add(1, Ordering::Relaxed);
581 }
582}
583
/// Errors surfaced by [`CortexAdapter::wait_for_token`] and the
/// origin-bound adapters that wrap it.
#[derive(Debug, PartialEq, Eq)]
pub enum WaitForTokenError {
    /// Deadline elapsed before the *applied* fold watermark advanced
    /// to the token's seq.
    ///
    /// The write may still land later. The caller can retry with a
    /// fresh deadline or accept the stale read.
    Timeout,
    /// Token belongs to a different origin than this adapter folds.
    ///
    /// Origin-bound adapters surface this to catch caller-side
    /// aliasing, where a token from chain A is waited on against an
    /// adapter bound to chain B. That wait would otherwise hang on a
    /// seq that can never land here.
    WrongOrigin {
        /// Origin the token was issued for.
        token_origin: u64,
        /// Origin this adapter is bound to.
        adapter_origin: u64,
    },
    /// An in-flight cap is saturated: either the per-channel cap (see
    /// [`super::config::CortexAdapterConfig::with_ryw_inflight_cap`])
    /// or the process-wide cap installed via
    /// [`set_global_ryw_inflight_cap`].
    ///
    /// Returned immediately, as back-pressure for callers who can shed
    /// load instead of stacking unbounded pending waits.
    QueueFull,
    /// Fold task stopped before the token's seq was reached.
    ///
    /// Under `FoldErrorPolicy::Stop`, an unrecoverable fold error halts
    /// the task, and any pending RYW wait observing `running == false`
    /// surfaces this variant rather than a silent `Ok(())`. Otherwise a
    /// producer could not distinguish "your write is visible" from
    /// "the adapter is dead and will never reach your write."
    FoldStopped {
        /// Last seq the fold task fully applied before stopping.
        /// `None` if the task stopped before applying any event.
        applied_through_seq: Option<u64>,
    },
}
622
623impl std::fmt::Display for WaitForTokenError {
624 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
625 match self {
626 Self::Timeout => f.write_str("read-your-writes wait timed out"),
627 Self::WrongOrigin {
628 token_origin,
629 adapter_origin,
630 } => write!(
631 f,
632 "token origin {:016x} != adapter origin {:016x}",
633 token_origin, adapter_origin
634 ),
635 Self::QueueFull => f.write_str("read-your-writes wait-queue saturated; retry later"),
636 Self::FoldStopped {
637 applied_through_seq,
638 } => match applied_through_seq {
639 Some(seq) => write!(
640 f,
641 "fold task stopped before reaching token seq (applied through {})",
642 seq
643 ),
644 None => f.write_str("fold task stopped before applying any event"),
645 },
646 }
647 }
648}
649
650impl std::error::Error for WaitForTokenError {}
651
652impl<State: Send + Sync + 'static> CortexAdapter<State> {
653 /// Open an adapter against a RedEX file.
654 ///
655 /// Opens (or reuses) `<redex>/<name>` via
656 /// [`Redex::open_file`](super::super::redex::Redex::open_file),
657 /// spawns a background task that tails the file and drives
658 /// `fold`, and returns the handle.
659 pub fn open<F>(
660 redex: &Redex,
661 name: &ChannelName,
662 redex_config: RedexFileConfig,
663 adapter_config: CortexAdapterConfig,
664 fold: F,
665 initial_state: State,
666 ) -> Result<Self, CortexAdapterError>
667 where
668 F: RedexFold<State> + Send + 'static,
669 {
670 // Positions that skip a non-empty event prefix require
671 // externally-rehydrated state — the watermark would
672 // otherwise advance past events the adapter never saw,
673 // making `wait_for_seq(k)` return immediately for skipped
674 // k while `state` has never observed those events.
675 // Callers using these positions must use
676 // `open_from_snapshot` (which carries the matching
677 // `last_seq` + serialized state) and routes through
678 // `open_unchecked` below.
679 match adapter_config.start {
680 StartPosition::FromBeginning => {}
681 StartPosition::LiveOnly => {
682 return Err(CortexAdapterError::InvalidStartPosition("LiveOnly"));
683 }
684 StartPosition::FromSeq(n) if n > 0 => {
685 return Err(CortexAdapterError::InvalidStartPosition("FromSeq(n>0)"));
686 }
687 StartPosition::FromSeq(_) => {} // FromSeq(0) is equivalent to FromBeginning
688 }
689 Self::open_unchecked(
690 redex,
691 name,
692 redex_config,
693 adapter_config,
694 fold,
695 initial_state,
696 )
697 }
698
699 /// Internal open path that bypasses the start-position
700 /// guard. Used by `open_from_snapshot`, where the externally-
701 /// rehydrated state is the legitimate reason to skip the
702 /// event prefix.
703 fn open_unchecked<F>(
704 redex: &Redex,
705 name: &ChannelName,
706 redex_config: RedexFileConfig,
707 adapter_config: CortexAdapterConfig,
708 mut fold: F,
709 initial_state: State,
710 ) -> Result<Self, CortexAdapterError>
711 where
712 F: RedexFold<State> + Send + 'static,
713 {
714 let file = redex.open_file(name, redex_config)?;
715
716 let start_seq = match adapter_config.start {
717 StartPosition::FromBeginning => 0,
718 StartPosition::LiveOnly => file.next_seq(),
719 StartPosition::FromSeq(n) => n,
720 };
721
722 let state = Arc::new(RwLock::new(initial_state));
723 // Initial watermark encodes "applied through start_seq-1", so
724 // `wait_for_seq(start_seq-1)` returns immediately after open
725 // (those seqs are conceptually behind us) while
726 // `wait_for_seq(start_seq)` blocks until the first event
727 // actually folds. `start_seq == 0` encodes the "nothing
728 // folded yet" state with the `u64::MAX` sentinel.
729 let initial_watermark: u64 = if start_seq == 0 {
730 u64::MAX
731 } else {
732 start_seq - 1
733 };
734 let (changes_tx, _) = broadcast::channel(CHANGES_BROADCAST_CAP);
735 let ryw_inflight = if adapter_config.ryw_inflight_cap == 0 {
736 None
737 } else {
738 Some(Arc::new(Semaphore::new(adapter_config.ryw_inflight_cap)))
739 };
740 let inner = Arc::new(AdapterInner {
741 file: file.clone(),
742 state: state.clone(),
743 folded_through_seq: AtomicU64::new(initial_watermark),
744 // Mirror folded's initial watermark: the snapshot/restore
745 // contract treats the sentinel as "nothing applied yet"
746 // and the (start_seq - 1) seed as "applied through the
747 // pre-snapshot prefix" — same semantics, applied to the
748 // strict watermark.
749 applied_through_seq: AtomicU64::new(initial_watermark),
750 start_seq,
751 fold_errors: AtomicU64::new(0),
752 running: AtomicBool::new(true),
753 closed: AtomicBool::new(false),
754 notify: Notify::new(),
755 shutdown: Notify::new(),
756 changes_tx,
757 ryw_inflight,
758 ryw_metrics: RywMetricsAtomic::default(),
759 });
760
761 let policy = adapter_config.on_fold_error;
762 let inner_task = inner.clone();
763
764 tokio::spawn(async move {
765 // Registration and consumption share a task. Pre-fix
766 // `file.tail(start_seq)` ran on the caller's task and the
767 // `tokio::spawn` then queued; concurrent appends in that
768 // window could saturate the bounded tail channel before
769 // this task polled even once, evicting the watcher with
770 // `Lagged` and killing the fold loop on its first item.
771 // Doing both inside the spawn pins them adjacent in
772 // scheduler order.
773 let mut stream = Box::pin(inner_task.file.tail(start_seq));
774 'outer: loop {
775 tokio::select! {
776 biased;
777 _ = inner_task.shutdown.notified() => {
778 break 'outer;
779 }
780 next = stream.next() => {
781 match next {
782 None => break 'outer,
783 Some(Err(RedexError::Lagged)) => {
784 // Subscriber fell behind the bounded
785 // tail buffer. Pre-fix this broke the
786 // fold loop permanently and a watcher
787 // sitting on `wait_for_seq` would
788 // silently observe stale state past
789 // the lag point.
790 //
791 // Recover by catching up the gap via
792 // direct reads from the in-memory
793 // index, then resubscribing live.
794 // A naive `file.tail(folded+1)`
795 // resubscribe would deadlock when the
796 // gap exceeds `tail_buffer_size`:
797 // backfill pre-flight would signal
798 // `Lagged` again and we'd loop on the
799 // same signal indefinitely.
800 let resume_head = 'catchup: loop {
801 let folded = inner_task
802 .folded_through_seq
803 .load(Ordering::Acquire);
804 let resume = if folded == u64::MAX {
805 start_seq
806 } else {
807 folded.saturating_add(1)
808 };
809 let head = inner_task.file.next_seq();
810 if resume >= head {
811 break 'catchup Some(head);
812 }
813 let events = inner_task
814 .file
815 .read_range(resume, head);
816 if events.is_empty() {
817 // Retention has evicted the
818 // gap. The fold cannot recover
819 // what is no longer durable;
820 // halt rather than silently
821 // skip ahead.
822 tracing::error!(
823 gap_start = resume,
824 gap_end = head,
825 "cortex fold task cannot recover \
826 from tail lag: events evicted by \
827 retention"
828 );
829 break 'catchup None;
830 }
831 let mut halted = false;
832 for event in events {
833 if handle_event(
834 &inner_task,
835 &mut fold,
836 &event,
837 policy,
838 ) {
839 halted = true;
840 break;
841 }
842 }
843 if halted {
844 break 'catchup None;
845 }
846 };
847 match resume_head {
848 Some(head) => {
849 stream = Box::pin(
850 inner_task.file.tail(head),
851 );
852 tracing::warn!(
853 "cortex fold task recovered \
854 from tail lag"
855 );
856 }
857 None => break 'outer,
858 }
859 }
860 Some(Err(_)) => {
861 // Closed / Io / other terminal error.
862 break 'outer;
863 }
864 Some(Ok(event)) => {
865 if handle_event(
866 &inner_task,
867 &mut fold,
868 &event,
869 policy,
870 ) {
871 break 'outer;
872 }
873 }
874 }
875 }
876 }
877 }
878 inner_task.running.store(false, Ordering::Release);
879 inner_task.notify.notify_waiters();
880 });
881
882 Ok(Self { inner })
883 }
884}
885
886impl<State> CortexAdapter<State>
887where
888 State: Serialize + Send + Sync + 'static,
889{
890 /// Capture a point-in-time snapshot of the materialized state.
891 ///
892 /// Returns `(state_bytes, last_seq)` where `state_bytes` is the
893 /// postcard-serialized state and `last_seq` is the highest RedEX
894 /// sequence successfully *applied* to it as a strict prefix
895 /// (i.e. [`Self::applied_through_seq`]). Persist both together —
896 /// they form a consistent pair, guaranteed by the adapter
897 /// holding the state write lock while advancing the watermark.
898 ///
899 /// Restore via [`Self::open_from_snapshot`] on a State that
900 /// also implements `DeserializeOwned`. Restore tails from
901 /// `last_seq + 1`, so any events that were processed but
902 /// *skipped* via `recoverable_decode` between snapshots are
903 /// re-attempted on restore — preserving "log is the source of
904 /// truth" even across the skip-and-continue path. (Pre-fix,
905 /// `snapshot` read `folded_through_seq`, which advances over
906 /// skipped events; restore tailed from past them and the gap
907 /// became permanent in durable state.)
908 ///
909 /// **Re-apply double-counting.** `state_bytes` reflects
910 /// in-memory state at snapshot time, which includes the
911 /// effects of every successful fold *including* applies past
912 /// any prior skip. Restore tails from the strict-prefix
913 /// `last_seq + 1`, so seqs past the skip are re-fed to the
914 /// fold function — fold functions that are not idempotent
915 /// against re-application will produce divergent state on
916 /// restore. The mitigations: (1) make the fold idempotent
917 /// (the standard event-sourcing recommendation); (2)
918 /// snapshot only when `applied_through_seq() ==
919 /// folded_through_seq()` (no gap → no re-apply); or (3)
920 /// accept best-effort restore semantics for adapters that
921 /// have ever observed a recoverable_decode skip. The
922 /// trade-off vs. the pre-fix behavior is asymmetric:
923 /// pre-fix, the skipped seq was permanently lost; post-fix,
924 /// the skipped seq is re-attempted, at the cost of
925 /// double-applying intervening successful seqs for
926 /// non-idempotent folds.
927 ///
928 /// `last_seq` is `None` if no event has been applied yet
929 /// since open (the snapshot is still meaningful — it
930 /// represents the initial State — but callers typically wait
931 /// until [`Self::wait_for_seq`] has returned and
932 /// [`Self::applied_through_seq`] has advanced before
933 /// snapshotting).
934 pub fn snapshot(&self) -> Result<(Vec<u8>, Option<u64>), CortexAdapterError> {
935 let state = self.inner.state.read();
936 let bytes = postcard::to_allocvec(&*state).map_err(|e| {
937 CortexAdapterError::Redex(RedexError::Encode(format!("snapshot serialize: {}", e)))
938 })?;
939 let watermark = self.inner.applied_through_seq.load(Ordering::Acquire);
940 let last_seq = if watermark == u64::MAX {
941 None
942 } else {
943 Some(watermark)
944 };
945 Ok((bytes, last_seq))
946 }
947}
948
949impl<State> CortexAdapter<State>
950where
951 State: DeserializeOwned + Send + Sync + 'static,
952{
953 /// Open an adapter from a previously-captured snapshot, skipping
954 /// the `[0, last_seq]` replay.
955 ///
956 /// `state_bytes` is the blob returned from [`Self::snapshot`].
957 /// `last_seq` is its companion sequence. The tail starts at
958 /// `last_seq + 1`; the initial state is deserialized from the
959 /// blob; the fold task is spawned as usual.
960 ///
961 /// If `last_seq` is `None` (no events had been folded at
962 /// snapshot time), the tail starts at seq 0 — equivalent to
963 /// `StartPosition::FromBeginning` with the deserialized initial
964 /// state.
965 pub fn open_from_snapshot<F>(
966 redex: &Redex,
967 name: &ChannelName,
968 redex_config: RedexFileConfig,
969 adapter_config: CortexAdapterConfig,
970 fold: F,
971 state_bytes: &[u8],
972 last_seq: Option<u64>,
973 ) -> Result<Self, CortexAdapterError>
974 where
975 F: RedexFold<State> + Send + 'static,
976 {
977 let initial_state: State = postcard::from_bytes(state_bytes).map_err(|e| {
978 CortexAdapterError::Redex(RedexError::Encode(format!("deserialize snapshot: {}", e)))
979 })?;
980 let start = match last_seq {
981 Some(n) => {
982 let next = n.checked_add(1).ok_or_else(|| {
983 CortexAdapterError::Redex(RedexError::Encode(
984 "snapshot last_seq at u64::MAX; cannot resume".to_string(),
985 ))
986 })?;
987 StartPosition::FromSeq(next)
988 }
989 None => StartPosition::FromBeginning,
990 };
991 let config = CortexAdapterConfig {
992 start,
993 on_fold_error: adapter_config.on_fold_error,
994 ryw_inflight_cap: adapter_config.ryw_inflight_cap,
995 };
996 // Route through `open_unchecked` so the externally-
997 // rehydrated state can skip its event prefix.
998 Self::open_unchecked(redex, name, redex_config, config, fold, initial_state)
999 }
1000}
1001
1002impl<State> Clone for CortexAdapter<State> {
1003 fn clone(&self) -> Self {
1004 Self {
1005 inner: self.inner.clone(),
1006 }
1007 }
1008}
1009
1010impl<State> std::fmt::Debug for CortexAdapter<State> {
1011 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1012 f.debug_struct("CortexAdapter")
1013 .field("folded_through_seq", &self.folded_through_seq())
1014 .field("applied_through_seq", &self.applied_through_seq())
1015 .field("fold_errors", &self.fold_errors())
1016 .field("running", &self.is_running())
1017 .field("closed", &self.inner.closed.load(Ordering::Acquire))
1018 .finish()
1019 }
1020}
1021
1022/// Apply one event. Returns `true` if the task should exit
1023/// (Stop policy + error).
1024fn handle_event<State, F>(
1025 inner: &Arc<AdapterInner<State>>,
1026 fold: &mut F,
1027 event: &RedexEvent,
1028 policy: FoldErrorPolicy,
1029) -> bool
1030where
1031 F: RedexFold<State>,
1032{
1033 let seq = event.entry.seq;
1034 // Guard the `u64::MAX` sentinel. `applied_through_seq` /
1035 // `folded_through_seq` use `u64::MAX` to encode "nothing
1036 // folded yet"; a real applied seq of `u64::MAX` (only
1037 // reachable after the file's `next_seq` wraps a full 2^64
1038 // append range, but `wrapping_add(1)` in the strict-prefix
1039 // advance below can hit `u64::MAX` from `prev == u64::MAX -
1040 // 1`) would silently fold into the sentinel and `snapshot`
1041 // would persist `None`, making restore re-replay the entire
1042 // chain. `watermark.rs::Watermark` guards the analogous
1043 // `seq_or_ts` case the same way; the redex-seq path needs
1044 // the same gate. Halt rather than overflow the sentinel.
1045 if seq == u64::MAX {
1046 tracing::error!(
1047 "cortex fold halting: redex seq reached u64::MAX (sentinel \
1048 collision); this means the file has assigned every possible \
1049 seq value, an effectively impossible-without-bug condition"
1050 );
1051 return true;
1052 }
1053 // Hold the write lock across both the fold and the watermark
1054 // update so that a `snapshot()` holding `state.read()` observes
1055 // a consistent `(state, folded_through_seq)` pair — otherwise
1056 // the state could reflect seq N while the watermark still reads
1057 // N-1, causing restore to double-apply event N.
1058 let result = {
1059 let mut state = inner.state.write();
1060 let r = fold.apply(event, &mut state);
1061 // Under `Stop` policy, a per-event recoverable decode
1062 // failure (postcard error, EventMeta shape mismatch —
1063 // anything `RedexError::is_recoverable_decode` flags) is
1064 // treated as skip-and-continue rather than halting. Halting
1065 // on every such failure would let a single bad event (disk
1066 // corruption past the 32-bit checksum, or a
1067 // deliberately-crafted matching-collision tail) wedge the
1068 // fold task permanently — a DoS vector against multi-tenant
1069 // cortex instances via one bad event. Stream-level errors
1070 // (`Io`, `Closed`, `Lagged`) and authorization /
1071 // configuration errors still halt under `Stop` as
1072 // documented.
1073 //
1074 // Two watermarks; both written under the state write lock
1075 // so a concurrent `snapshot` reading either one observes a
1076 // value consistent with the state it sees:
1077 // - `applied_through_seq` is a STRICT-PREFIX watermark:
1078 // it advances by exactly one only when this seq is
1079 // the immediate successor of the current value (or
1080 // equals `start_seq` from the sentinel state). Any
1081 // skip — `recoverable_decode` under `Stop`, OR any
1082 // error under `LogAndContinue` — breaks the prefix
1083 // at the skipped seq; further successful applies
1084 // cannot heal it. `snapshot` persists this watermark
1085 // so restore tails from a position guaranteed to
1086 // have every prior seq reflected in state.
1087 // - `folded_through_seq` is the HIGHEST PROCESSED
1088 // watermark: advances on `Ok(())` AND on every skip
1089 // path. Live consumers waiting via `wait_for_seq` use
1090 // this watermark so a single bad event doesn't
1091 // deadlock them — the DoS-resistance contract.
1092 // Why two watermarks, not one strict-prefix that lives
1093 // consumers also wait on: collapsing them re-introduces
1094 // the deadlock — a consumer waiting for the skipped seq
1095 // would block forever even though the fold task has
1096 // moved on.
1097 // Order: `applied` is stored first so any reader that
1098 // observes the `folded` Release also synchronizes-with
1099 // the `applied` Release that preceded it — `applied <=
1100 // folded` is therefore an invariant at every observable
1101 // point.
1102 let applied = matches!(&r, Ok(()));
1103 let recoverable_decode = matches!(&r, Err(e) if e.is_recoverable_decode());
1104 let advance = applied
1105 || matches!((&r, policy), (Err(_), FoldErrorPolicy::LogAndContinue))
1106 || recoverable_decode;
1107 if applied {
1108 // Strict-prefix advance: bump by exactly one if this
1109 // seq is the immediate successor of the current
1110 // applied watermark (or this is the very first event
1111 // and matches `start_seq`). Otherwise the prefix is
1112 // broken by a prior skip; leave the watermark alone
1113 // and let restore re-attempt from the gap.
1114 let prev = inner.applied_through_seq.load(Ordering::Acquire);
1115 let advance_applied = if prev == u64::MAX {
1116 seq == inner.start_seq
1117 } else {
1118 seq == prev.wrapping_add(1)
1119 };
1120 if advance_applied {
1121 inner.applied_through_seq.store(seq, Ordering::Release);
1122 }
1123 }
1124 if advance {
1125 inner.folded_through_seq.store(seq, Ordering::Release);
1126 }
1127 r
1128 };
1129
1130 match result {
1131 Ok(()) => {
1132 inner.notify.notify_waiters();
1133 let _ = inner.changes_tx.send(seq);
1134 false
1135 }
1136 Err(err) => {
1137 inner.fold_errors.fetch_add(1, Ordering::AcqRel);
1138 tracing::warn!(seq = seq, error = %err, "cortex fold error");
1139 // Per-event decode errors always skip-and-continue;
1140 // only stream-level / configuration errors halt under
1141 // `Stop`.
1142 let recoverable_decode = err.is_recoverable_decode();
1143 match policy {
1144 FoldErrorPolicy::Stop if !recoverable_decode => {
1145 // Wake subscribers via `notify_waiters` so
1146 // anything parked on `inner.notify` unblocks
1147 // and can observe the halt via
1148 // `is_running()`. Do NOT broadcast `seq` on
1149 // `changes_tx`: this branch did not advance
1150 // `folded_through_seq` (the `advance` gate
1151 // above is false for `Stop + non-recoverable`),
1152 // and `changes_tx` is documented as carrying
1153 // *successful fold-apply* notifications. A
1154 // `ChangeEvent::Seq(seq)` for an unapplied
1155 // sequence would mislead consumers into
1156 // thinking the watermark advanced past the
1157 // failure — the very mis-routing the broadcast
1158 // contract was designed to avoid.
1159 //
1160 // The trade-off: subscribers using
1161 // `changes_with_lag()` won't see a terminal
1162 // event in the stream on halt; they need to
1163 // poll `is_running()` separately (or rely on
1164 // the broadcast channel ending when all adapter
1165 // handles are dropped). That's the documented
1166 // failure mode for non-recoverable halts —
1167 // surfacing a phantom seq was not.
1168 inner.notify.notify_waiters();
1169 true
1170 }
1171 FoldErrorPolicy::Stop | FoldErrorPolicy::LogAndContinue => {
1172 // Watermark was already advanced inside the lock
1173 // above; just notify waiters.
1174 inner.notify.notify_waiters();
1175 let _ = inner.changes_tx.send(seq);
1176 false
1177 }
1178 }
1179 }
1180 }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185 use super::super::super::channel::ChannelName;
1186 use super::super::super::redex::{RedexError, RedexFold};
1187 use super::super::envelope::EventEnvelope;
1188 use super::super::meta::EventMeta;
1189 use super::*;
1190 use bytes::Bytes;
1191
1192 fn cn(s: &str) -> ChannelName {
1193 ChannelName::new(s).unwrap()
1194 }
1195
1196 struct CountFold;
1197 impl RedexFold<u64> for CountFold {
1198 fn apply(&mut self, _ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
1199 *state += 1;
1200 Ok(())
1201 }
1202 }
1203
1204 /// Pin: `wait_for_seq(seq)` short-circuits without blocking
1205 /// when `seq < start_seq` — those events were applied before
1206 /// the adapter opened (e.g. they're folded into the snapshot
1207 /// the caller passed to `open_from_snapshot`). Pre-fix the
1208 /// function used only the `watermark >= seq` check; until at
1209 /// least one event landed under the new adapter, the
1210 /// `u64::MAX` "nothing folded yet" sentinel kept the
1211 /// comparison false and a caller waiting for a stale seq
1212 /// would block forever.
1213 #[tokio::test]
1214 async fn wait_for_seq_short_circuits_below_start_seq() {
1215 let redex = Redex::new();
1216 // Pre-populate the file with 5 events via a temporary
1217 // FromBeginning adapter, then snapshot.
1218 let bytes;
1219 let last_seq;
1220 {
1221 let pre = CortexAdapter::<u64>::open(
1222 &redex,
1223 &cn("cortex/short-circuit"),
1224 RedexFileConfig::default(),
1225 CortexAdapterConfig::default(),
1226 CountFold,
1227 0u64,
1228 )
1229 .unwrap();
1230 for i in 0..5u64 {
1231 let meta = EventMeta::new(1, 0, 1, i, 0);
1232 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1233 let seq = pre.ingest(env).unwrap();
1234 pre.wait_for_seq(seq).await.unwrap();
1235 }
1236 let (b, ls) = pre.snapshot().unwrap();
1237 bytes = b;
1238 last_seq = ls;
1239 pre.close().unwrap();
1240 }
1241
1242 // Restore from snapshot: `start_seq` is `last_seq + 1 =
1243 // 5` (the snapshot already absorbed seqs 0..=4). Any
1244 // wait_for_seq below 5 is conceptually behind us.
1245 let adapter = CortexAdapter::<u64>::open_from_snapshot(
1246 &redex,
1247 &cn("cortex/short-circuit"),
1248 RedexFileConfig::default(),
1249 CortexAdapterConfig::default(),
1250 CountFold,
1251 &bytes,
1252 last_seq,
1253 )
1254 .unwrap();
1255
1256 // wait_for_seq(0..5) must all return immediately.
1257 // Wrap each in a tight timeout — pre-fix behavior was
1258 // an indefinite block.
1259 for seq in 0..5u64 {
1260 tokio::time::timeout(std::time::Duration::from_secs(2), adapter.wait_for_seq(seq))
1261 .await
1262 .unwrap_or_else(|_| {
1263 panic!(
1264 "wait_for_seq({}) blocked past start_seq=5 — \
1265 short-circuit regressed",
1266 seq
1267 )
1268 })
1269 .expect("wait_for_seq must Ok-return when seq < start_seq");
1270 }
1271 }
1272
1273 #[tokio::test]
1274 async fn test_open_ingest_wait_query() {
1275 let redex = Redex::new();
1276 let adapter = CortexAdapter::<u64>::open(
1277 &redex,
1278 &cn("cortex/counts"),
1279 RedexFileConfig::default(),
1280 CortexAdapterConfig::default(),
1281 CountFold,
1282 0u64,
1283 )
1284 .unwrap();
1285
1286 for i in 0..10u64 {
1287 let meta = EventMeta::new(1, 0, 1, i, 0);
1288 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1289 let seq = adapter.ingest(env).unwrap();
1290 adapter.wait_for_seq(seq).await.unwrap();
1291 }
1292
1293 assert_eq!(*adapter.state().read(), 10);
1294 assert_eq!(adapter.fold_errors(), 0);
1295 assert!(adapter.is_running());
1296 }
1297
1298 #[tokio::test]
1299 async fn ingest_with_token_carries_envelope_origin_and_assigned_seq() {
1300 let redex = Redex::new();
1301 let adapter = CortexAdapter::<u64>::open(
1302 &redex,
1303 &cn("cortex/ryw-token"),
1304 RedexFileConfig::default(),
1305 CortexAdapterConfig::default(),
1306 CountFold,
1307 0u64,
1308 )
1309 .unwrap();
1310
1311 let origin: u64 = 0xCAFE_F00D_DEAD_BEEF;
1312 let meta = EventMeta::new(1, 0, origin, 0, 0);
1313 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1314 let token = adapter.ingest_with_token(env).unwrap();
1315
1316 assert_eq!(token.origin_hash, origin);
1317 assert_eq!(token.seq, 0);
1318
1319 adapter
1320 .wait_for_token(token, Duration::from_secs(2))
1321 .await
1322 .unwrap();
1323 assert_eq!(*adapter.state().read(), 1);
1324 }
1325
1326 #[tokio::test]
1327 async fn wait_for_token_returns_queue_full_above_cap() {
1328 let redex = Redex::new();
1329 let cfg = CortexAdapterConfig::default().with_ryw_inflight_cap(2);
1330 let adapter = Arc::new(
1331 CortexAdapter::<u64>::open(
1332 &redex,
1333 &cn("cortex/ryw-queue"),
1334 RedexFileConfig::default(),
1335 cfg,
1336 CountFold,
1337 0u64,
1338 )
1339 .unwrap(),
1340 );
1341
1342 // Pin two waiters on a seq that never lands — they hold the
1343 // permits until their deadline elapses.
1344 let token = WriteToken::new(0xABCD_EF01, 999);
1345 let a = adapter.clone();
1346 let h1 = tokio::spawn(async move { a.wait_for_token(token, Duration::from_secs(5)).await });
1347 let a = adapter.clone();
1348 let h2 = tokio::spawn(async move { a.wait_for_token(token, Duration::from_secs(5)).await });
1349
1350 // Give both tasks a moment to claim their permits.
1351 tokio::time::sleep(Duration::from_millis(50)).await;
1352
1353 // A third waiter on the same adapter should be rejected
1354 // immediately with QueueFull — no wait, no timeout.
1355 let started = tokio::time::Instant::now();
1356 let err = adapter
1357 .wait_for_token(token, Duration::from_secs(5))
1358 .await
1359 .unwrap_err();
1360 assert_eq!(err, WaitForTokenError::QueueFull);
1361 assert!(
1362 started.elapsed() < Duration::from_millis(100),
1363 "QueueFull must return immediately, not wait for deadline"
1364 );
1365
1366 // Drop the holders so their tasks finish on their own
1367 // schedule.
1368 let _ = (h1, h2);
1369 }
1370
1371 #[tokio::test]
1372 async fn ryw_metrics_track_waits_timeouts_and_queue_full() {
1373 let redex = Redex::new();
1374 let cfg = CortexAdapterConfig::default().with_ryw_inflight_cap(1);
1375 let adapter = Arc::new(
1376 CortexAdapter::<u64>::open(
1377 &redex,
1378 &cn("cortex/ryw-metrics"),
1379 RedexFileConfig::default(),
1380 cfg,
1381 CountFold,
1382 0u64,
1383 )
1384 .unwrap(),
1385 );
1386
1387 // No waits yet → all zeros.
1388 let s0 = adapter.ryw_metrics();
1389 assert_eq!(s0, RywMetricsSnapshot::default());
1390
1391 // Pin a long waiter to hold the only permit, then attempt a
1392 // second wait — it must hit QueueFull and bump that counter.
1393 let token = WriteToken::new(0xABCD_EF01, 999);
1394 let a = adapter.clone();
1395 let holder = tokio::spawn(async move {
1396 let _ = a.wait_for_token(token, Duration::from_secs(2)).await;
1397 });
1398 tokio::time::sleep(Duration::from_millis(50)).await;
1399 let err = adapter
1400 .wait_for_token(token, Duration::from_secs(1))
1401 .await
1402 .unwrap_err();
1403 assert_eq!(err, WaitForTokenError::QueueFull);
1404
1405 // Wait for the holder to time out, then check the counters.
1406 holder.await.unwrap();
1407 let s1 = adapter.ryw_metrics();
1408 assert_eq!(s1.waits_total, 1, "holder takes one permit + one wait slot");
1409 assert_eq!(s1.timeouts_total, 1, "holder's deadline elapsed");
1410 assert_eq!(s1.queue_full_total, 1, "saturating attempt was rejected");
1411 assert_eq!(s1.wrong_origin_total, 0);
1412 assert!(
1413 s1.wait_duration_nanos_sum >= 1_000_000_000,
1414 "holder waited at least its 2s deadline, observed {}ns",
1415 s1.wait_duration_nanos_sum
1416 );
1417 }
1418
1419 #[tokio::test]
1420 async fn wait_for_token_with_zero_cap_skips_queue_check() {
1421 let redex = Redex::new();
1422 let cfg = CortexAdapterConfig::default().with_ryw_inflight_cap(0);
1423 let adapter = CortexAdapter::<u64>::open(
1424 &redex,
1425 &cn("cortex/ryw-uncapped"),
1426 RedexFileConfig::default(),
1427 cfg,
1428 CountFold,
1429 0u64,
1430 )
1431 .unwrap();
1432
1433 // With cap=0 the semaphore is None; the path goes straight
1434 // to the deadline.
1435 let token = WriteToken::new(0xABCD_EF01, 999);
1436 let err = adapter
1437 .wait_for_token(token, Duration::from_millis(20))
1438 .await
1439 .unwrap_err();
1440 assert_eq!(err, WaitForTokenError::Timeout);
1441 }
1442
1443 #[tokio::test]
1444 async fn wait_for_token_times_out_when_seq_never_lands() {
1445 let redex = Redex::new();
1446 let adapter = CortexAdapter::<u64>::open(
1447 &redex,
1448 &cn("cortex/ryw-timeout"),
1449 RedexFileConfig::default(),
1450 CortexAdapterConfig::default(),
1451 CountFold,
1452 0u64,
1453 )
1454 .unwrap();
1455
1456 let unreachable = WriteToken::new(0xDEAD_BEEF, 999);
1457 let err = adapter
1458 .wait_for_token(unreachable, Duration::from_millis(50))
1459 .await
1460 .unwrap_err();
1461 assert_eq!(err, WaitForTokenError::Timeout);
1462 }
1463
1464 #[tokio::test]
1465 async fn test_close_stops_fold_task() {
1466 let redex = Redex::new();
1467 let adapter = CortexAdapter::<u64>::open(
1468 &redex,
1469 &cn("cortex/close"),
1470 RedexFileConfig::default(),
1471 CortexAdapterConfig::default(),
1472 CountFold,
1473 0u64,
1474 )
1475 .unwrap();
1476
1477 adapter.close().unwrap();
1478 // Close is idempotent.
1479 adapter.close().unwrap();
1480
1481 // Ingest after close returns Closed.
1482 let meta = EventMeta::new(0, 0, 0, 0, 0);
1483 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1484 let err = adapter.ingest(env).unwrap_err();
1485 assert!(matches!(err, CortexAdapterError::Closed));
1486
1487 // State handle still readable.
1488 assert_eq!(*adapter.state().read(), 0);
1489 }
1490
1491 struct FailAtSeq(u64);
1492 impl RedexFold<u64> for FailAtSeq {
1493 fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
1494 if ev.entry.seq == self.0 {
1495 Err(RedexError::Encode(format!(
1496 "deliberate failure at seq {}",
1497 ev.entry.seq
1498 )))
1499 } else {
1500 *state += 1;
1501 Ok(())
1502 }
1503 }
1504 }
1505
1506 /// Returns `RedexError::Decode` (recoverable) at the
1507 /// configured seqs; counts the attempt so tests can assert
1508 /// re-attempt after restore. Apply on any other seq bumps
1509 /// state.
1510 struct FailDecodeAtSeqs {
1511 skip: std::collections::HashSet<u64>,
1512 attempts: Arc<AtomicU64>,
1513 }
1514 impl RedexFold<u64> for FailDecodeAtSeqs {
1515 fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
1516 self.attempts.fetch_add(1, Ordering::AcqRel);
1517 if self.skip.contains(&ev.entry.seq) {
1518 Err(RedexError::Decode(format!(
1519 "deliberate decode skip at seq {}",
1520 ev.entry.seq
1521 )))
1522 } else {
1523 *state += 1;
1524 Ok(())
1525 }
1526 }
1527 }
1528
1529 #[tokio::test]
1530 async fn test_stop_policy_halts_on_first_error() {
1531 let redex = Redex::new();
1532 let adapter = CortexAdapter::<u64>::open(
1533 &redex,
1534 &cn("cortex/stop"),
1535 RedexFileConfig::default(),
1536 CortexAdapterConfig::default(), // Stop is default
1537 FailAtSeq(3),
1538 0u64,
1539 )
1540 .unwrap();
1541
1542 for i in 0..10u64 {
1543 let meta = EventMeta::new(0, 0, 0, i, 0);
1544 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1545 adapter.ingest(env).unwrap();
1546 }
1547
1548 // Wait until fold task stops. wait_for_seq surfaces the
1549 // halt as Err(Some(folded_through)) — pre-fix it returned
1550 // silently and the caller had no way to distinguish a
1551 // halt from a successful fold-through.
1552 let folded = adapter
1553 .wait_for_seq(10)
1554 .await
1555 .expect_err("Stop policy must surface the halt to wait_for_seq");
1556 // Seqs 0..=2 folded; seq 3 errored; the watermark caps at
1557 // 2 under Stop+non-recoverable so `folded_through` reflects
1558 // the successful prefix.
1559 assert_eq!(folded, Some(2));
1560 assert!(!adapter.is_running());
1561 assert_eq!(adapter.fold_errors(), 1);
1562 // Seqs 0..=2 folded; seq 3 errored; seqs 4..=9 never folded.
1563 assert_eq!(*adapter.state().read(), 3);
1564 }
1565
1566 /// `changes_tx` is the broadcast channel `changes_with_lag`
1567 /// surfaces as `ChangeEvent::Seq(u64)` — documented as
1568 /// "successful fold-apply notification". On the
1569 /// Stop+non-recoverable halt path, the watermark
1570 /// (`folded_through_seq`) is NOT advanced, so emitting the
1571 /// failing seq on `changes_tx` would mis-represent an
1572 /// unapplied event as if it were folded. Subscribers reading
1573 /// the broadcast and trusting the contract would advance
1574 /// their own state machines past the failure.
1575 ///
1576 /// Pin: after a Stop-policy halt, the broadcast must contain
1577 /// the prefix that *did* apply (seqs 0..=2), and must NOT
1578 /// contain the failing seq (3) or any later seq.
1579 #[tokio::test]
1580 async fn stop_policy_does_not_broadcast_failing_seq() {
1581 use futures::StreamExt;
1582
1583 let redex = Redex::new();
1584 let adapter = CortexAdapter::<u64>::open(
1585 &redex,
1586 &cn("cortex/stop-no-phantom-seq"),
1587 RedexFileConfig::default(),
1588 CortexAdapterConfig::default(), // Stop is default
1589 FailAtSeq(3),
1590 0u64,
1591 )
1592 .unwrap();
1593
1594 // Subscribe BEFORE ingesting so we capture every seq.
1595 let mut changes = adapter.changes_with_lag();
1596
1597 for i in 0..10u64 {
1598 let meta = EventMeta::new(0, 0, 0, i, 0);
1599 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1600 adapter.ingest(env).unwrap();
1601 }
1602
1603 // Wait for halt. wait_for_seq now surfaces the halt as
1604 // `Err(Some(folded))` rather than silently returning.
1605 let _ = adapter
1606 .wait_for_seq(10)
1607 .await
1608 .expect_err("Stop policy halt");
1609 assert!(!adapter.is_running(), "Stop policy must halt the task");
1610 assert_eq!(adapter.fold_errors(), 1);
1611
1612 // Drain the broadcast with a short bound so a regression
1613 // that re-emits the phantom seq doesn't hang the test.
1614 let mut received: Vec<u64> = Vec::new();
1615 loop {
1616 match tokio::time::timeout(std::time::Duration::from_millis(50), changes.next()).await {
1617 Ok(Some(ChangeEvent::Seq(s))) => received.push(s),
1618 Ok(Some(ChangeEvent::Lagged(_))) => continue,
1619 Ok(None) | Err(_) => break,
1620 }
1621 }
1622
1623 // Successful prefix (0, 1, 2) must be visible. The failing
1624 // seq (3) and any later seq must NOT.
1625 assert_eq!(
1626 received,
1627 vec![0, 1, 2],
1628 "broadcast must carry only successfully-folded seqs; \
1629 pre-fix this would include 3 (the failing seq) as a \
1630 phantom Seq(3) event, mis-routing subscribers' state"
1631 );
1632 }
1633
1634 // ========================================================================
1635 // applied_through_seq vs folded_through_seq
1636 // ========================================================================
1637
1638 /// Under `Stop` policy, a per-event recoverable decode error
1639 /// advances `folded_through_seq` (so live consumers don't
1640 /// deadlock on a permanently-bad event) but must NOT advance
1641 /// `applied_through_seq` (state at the skipped seq was never
1642 /// written). When the two were the same atomic,
1643 /// `wait_for_seq(seq)` returned claiming "state reflects seq"
1644 /// for events whose mutation never landed.
1645 #[tokio::test]
1646 async fn recoverable_decode_skip_advances_folded_but_not_applied() {
1647 let redex = Redex::new();
1648 let attempts = Arc::new(AtomicU64::new(0));
1649 let fold = FailDecodeAtSeqs {
1650 skip: [3u64].into_iter().collect(),
1651 attempts: attempts.clone(),
1652 };
1653 let adapter = CortexAdapter::<u64>::open(
1654 &redex,
1655 &cn("cortex/audit-6-skip"),
1656 RedexFileConfig::default(),
1657 CortexAdapterConfig::default(), // Stop is default
1658 fold,
1659 0u64,
1660 )
1661 .unwrap();
1662
1663 for i in 0..6u64 {
1664 let meta = EventMeta::new(0, 0, 0, i, 0);
1665 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1666 adapter.ingest(env).unwrap();
1667 }
1668 adapter.wait_for_seq(5).await.unwrap();
1669
1670 // Live consumer didn't deadlock — folded reached the tail.
1671 assert_eq!(
1672 adapter.folded_through_seq(),
1673 Some(5),
1674 "folded must advance over the recoverable_decode skip",
1675 );
1676 // STRICT-PREFIX: applied caps at 2 (the last consecutive
1677 // successful seq before the gap at seq 3). Subsequent
1678 // successful applies at 4 and 5 do NOT heal the prefix.
1679 assert_eq!(
1680 adapter.applied_through_seq(),
1681 Some(2),
1682 "strict-prefix watermark caps at the last consecutive Ok before the skip",
1683 );
1684 // State count = 5 (seqs 0,1,2,4,5 applied; 3 skipped).
1685 // The fold function still mutated state for the seqs it
1686 // saw — only the WATERMARK is strict-prefix.
1687 assert_eq!(*adapter.state().read(), 5);
1688 assert_eq!(adapter.fold_errors(), 1);
1689 // Task still running — recoverable decode does not halt
1690 // under Stop.
1691 assert!(adapter.is_running());
1692 // Six attempts (one per event). The `attempts` counter
1693 // pins that the fold function was invoked for the
1694 // skipped seq (skip is detected inside the fold, not
1695 // upstream).
1696 assert_eq!(attempts.load(Ordering::Acquire), 6);
1697 }
1698
1699 /// When the FIRST event is skipped via
1700 /// recoverable_decode, `applied_through_seq`
1701 /// stays at the "nothing applied yet" sentinel until a
1702 /// subsequent successful fold. This is the strict shape: an
1703 /// adapter freshly opened that has only seen skipped events
1704 /// reports `None` from `applied_through_seq`, even though
1705 /// `folded_through_seq` reports the highest skipped seq.
1706 #[tokio::test]
1707 async fn applied_stays_at_sentinel_when_only_skipped_events_processed() {
1708 let redex = Redex::new();
1709 let attempts = Arc::new(AtomicU64::new(0));
1710 let fold = FailDecodeAtSeqs {
1711 skip: [0u64, 1, 2].into_iter().collect(),
1712 attempts: attempts.clone(),
1713 };
1714 let adapter = CortexAdapter::<u64>::open(
1715 &redex,
1716 &cn("cortex/audit-6-only-skips"),
1717 RedexFileConfig::default(),
1718 CortexAdapterConfig::default(),
1719 fold,
1720 0u64,
1721 )
1722 .unwrap();
1723
1724 for i in 0..3u64 {
1725 let meta = EventMeta::new(0, 0, 0, i, 0);
1726 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1727 adapter.ingest(env).unwrap();
1728 }
1729 adapter.wait_for_seq(2).await.unwrap();
1730
1731 assert_eq!(adapter.folded_through_seq(), Some(2));
1732 assert_eq!(
1733 adapter.applied_through_seq(),
1734 None,
1735 "no successful apply ⇒ applied_through_seq stays at sentinel",
1736 );
1737 assert_eq!(*adapter.state().read(), 0);
1738 }
1739
1740 /// `snapshot()` must use `applied_through_seq`, not
1741 /// `folded_through_seq`. When the two were the same atomic,
1742 /// snapshot persisted the highest folded seq — including
1743 /// skipped events — so a restore tailed from past the skip
1744 /// and the skipped seq became permanently lost from durable
1745 /// state, even though the in-memory state at snapshot time
1746 /// never reflected it.
1747 #[tokio::test]
1748 async fn snapshot_last_seq_reflects_applied_not_folded_after_skip() {
1749 let redex = Redex::new();
1750 let attempts = Arc::new(AtomicU64::new(0));
1751 let fold = FailDecodeAtSeqs {
1752 skip: [5u64].into_iter().collect(),
1753 attempts: attempts.clone(),
1754 };
1755 let adapter = CortexAdapter::<u64>::open(
1756 &redex,
1757 &cn("cortex/audit-7-snapshot"),
1758 RedexFileConfig::default(),
1759 CortexAdapterConfig::default(),
1760 fold,
1761 0u64,
1762 )
1763 .unwrap();
1764
1765 // 8 events; seq 5 will be skipped via recoverable_decode.
1766 for i in 0..8u64 {
1767 let meta = EventMeta::new(0, 0, 0, i, 0);
1768 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1769 adapter.ingest(env).unwrap();
1770 }
1771 adapter.wait_for_seq(7).await.unwrap();
1772
1773 let folded = adapter.folded_through_seq();
1774 let applied = adapter.applied_through_seq();
1775 assert_eq!(folded, Some(7), "folded includes the skipped seq's slot");
1776 assert_eq!(
1777 applied,
1778 Some(4),
1779 "strict-prefix applied caps at the last consecutive Ok (seq 4) before the gap at seq 5",
1780 );
1781
1782 let (_bytes, last_seq) = adapter.snapshot().unwrap();
1783 // snapshot reflects applied (strict-prefix), not folded.
1784 // Pre-fix snapshot returned Some(7) — restore would tail
1785 // from seq 8 and seq 5 would be permanently lost from
1786 // durable state.
1787 assert_eq!(last_seq, applied);
1788 assert_eq!(last_seq, Some(4));
1789 }
1790
1791 /// Strict shape: when the LAST processed event is skipped,
1792 /// snapshot's `last_seq` must stay at the highest *applied*
1793 /// seq — not advance to the skipped one. Without the split,
1794 /// snapshot returned the skipped seq; restore tailed from
1795 /// past it; skipped event was lost.
1796 #[tokio::test]
1797 async fn snapshot_last_seq_does_not_advance_to_a_skipped_tail() {
1798 let redex = Redex::new();
1799 let attempts = Arc::new(AtomicU64::new(0));
1800 let fold = FailDecodeAtSeqs {
1801 // Skip the LAST event so applied < folded at snapshot time.
1802 skip: [4u64].into_iter().collect(),
1803 attempts: attempts.clone(),
1804 };
1805 let adapter = CortexAdapter::<u64>::open(
1806 &redex,
1807 &cn("cortex/audit-7-tail-skip"),
1808 RedexFileConfig::default(),
1809 CortexAdapterConfig::default(),
1810 fold,
1811 0u64,
1812 )
1813 .unwrap();
1814
1815 for i in 0..5u64 {
1816 let meta = EventMeta::new(0, 0, 0, i, 0);
1817 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1818 adapter.ingest(env).unwrap();
1819 }
1820 adapter.wait_for_seq(4).await.unwrap();
1821
1822 assert_eq!(adapter.folded_through_seq(), Some(4));
1823 assert_eq!(
1824 adapter.applied_through_seq(),
1825 Some(3),
1826 "tail apply was skipped ⇒ applied stays at the prior successful seq",
1827 );
1828
1829 let (_bytes, last_seq) = adapter.snapshot().unwrap();
1830 assert_eq!(
1831 last_seq,
1832 Some(3),
1833 "pre-fix this would be Some(4) and a restore would tail from 5, \
1834 dropping seq 4 permanently from durable state",
1835 );
1836 }
1837
1838 /// End-to-end: an adapter whose fold previously skipped seq
1839 /// N via recoverable_decode is snapshotted, closed, and
1840 /// reopened via `open_from_snapshot` with a fold that NOW
1841 /// handles seq N successfully (e.g. a software upgrade fixed
1842 /// the decoder). On restore, seq N must be re-fed to the
1843 /// fold function — without the strict-prefix watermark,
1844 /// snapshot's `last_seq` would be the highest folded
1845 /// (advanced over skips), restore would tail from past the
1846 /// skipped seq, and the previously-skipped event would be
1847 /// permanently lost.
1848 ///
1849 /// Asserts the strict-prefix `last_seq` and that the skipped
1850 /// seq is re-fed on restore. State-count after restore is
1851 /// fold-dependent (see `snapshot` doc on the re-apply
1852 /// double-counting trade-off) and is not pinned here.
1853 #[tokio::test]
1854 async fn restore_after_skip_re_attempts_skipped_event() {
1855 let redex = Redex::new();
1856 let pre_attempts = Arc::new(AtomicU64::new(0));
1857 let pre_fold = FailDecodeAtSeqs {
1858 skip: [2u64].into_iter().collect(),
1859 attempts: pre_attempts.clone(),
1860 };
1861 let adapter = CortexAdapter::<u64>::open(
1862 &redex,
1863 &cn("cortex/audit-6-7-restore-reattempt"),
1864 RedexFileConfig::default(),
1865 CortexAdapterConfig::default(),
1866 pre_fold,
1867 0u64,
1868 )
1869 .unwrap();
1870
1871 for i in 0..5u64 {
1872 let meta = EventMeta::new(0, 0, 0, i, 0);
1873 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1874 adapter.ingest(env).unwrap();
1875 }
1876 adapter.wait_for_seq(4).await.unwrap();
1877
1878 // Pre-snapshot state: STRICT-PREFIX applied caps at 1
1879 // (seq 2 was skipped; the prefix breaks there). Folded
1880 // reached 4. The fold function still wrote state for
1881 // every Ok (0,1,3,4) so state count is 4.
1882 assert_eq!(adapter.applied_through_seq(), Some(1));
1883 assert_eq!(adapter.folded_through_seq(), Some(4));
1884 assert_eq!(*adapter.state().read(), 4);
1885 let (bytes, last_seq) = adapter.snapshot().unwrap();
1886 // snapshot must reflect applied (strict-prefix), so
1887 // restore tails from seq 2 and re-attempts the skipped
1888 // event plus everything after.
1889 assert_eq!(last_seq, Some(1));
1890 adapter.close().unwrap();
1891
1892 // Reopen with a fold that handles seq 2 successfully
1893 // this time (no entries in `skip`). The previously-
1894 // skipped event must be re-fed, AND so must seqs 3 and
1895 // 4 (because their prior application is not part of the
1896 // snapshot's strict-prefix watermark — only seqs 0..=1
1897 // are guaranteed reflected in the rehydrated state).
1898 let post_attempts = Arc::new(AtomicU64::new(0));
1899 let post_fold = FailDecodeAtSeqs {
1900 skip: std::collections::HashSet::new(),
1901 attempts: post_attempts.clone(),
1902 };
1903 let restored = CortexAdapter::<u64>::open_from_snapshot(
1904 &redex,
1905 &cn("cortex/audit-6-7-restore-reattempt"),
1906 RedexFileConfig::default(),
1907 CortexAdapterConfig::default(),
1908 post_fold,
1909 &bytes,
1910 last_seq,
1911 )
1912 .unwrap();
1913
1914 restored.wait_for_seq(4).await.unwrap();
1915
1916 // Pre-fix this assertion would FAIL: snapshot would have
1917 // returned Some(4) (folded), restore would tail from
1918 // seq 5, the post_fold would never see seq 2, and the
1919 // skipped event would be permanently lost.
1920 let post_invocations = post_attempts.load(Ordering::Acquire);
1921 assert_eq!(
1922 post_invocations, 3,
1923 "post-restore fold must be re-fed seqs 2, 3, 4 (everything past the \
1924 snapshot's strict-prefix watermark)",
1925 );
1926 // After restore, the strict-prefix watermark heals all
1927 // the way to seq 4 because every event from start_seq=2
1928 // onwards applied successfully on the second pass.
1929 assert_eq!(restored.applied_through_seq(), Some(4));
1930 // State count is fold-dependent on idempotency. CountFold
1931 // is *not* idempotent (each apply increments), so seqs
1932 // 3 and 4 are double-counted: pre-restore in-memory
1933 // state was 4 (seqs 0,1,3,4 applied), snapshot bytes
1934 // serialized that 4, restore deserializes 4 + applies
1935 // seqs 2,3,4 = 7. The audit-fix's *correctness* claim is
1936 // not "state matches what a re-fold from scratch would
1937 // produce" — that requires either an idempotent fold or
1938 // snapshotting only when there's no gap (see `snapshot`
1939 // doc). The claim is "the skipped seq is re-fed instead
1940 // of being permanently lost from durable state" — pinned
1941 // by `post_invocations == 3` above.
1942 assert_eq!(
1943 *restored.state().read(),
1944 7,
1945 "non-idempotent CountFold double-counts seqs past the gap; this is the \
1946 documented re-apply trade-off, not a bug",
1947 );
1948 }
1949
1950 /// Pin: `wait_for_seq(seq)` returns promptly even when seq
1951 /// was skipped via recoverable_decode. The DoS-resistance
1952 /// contract (one bad event must not deadlock live consumers)
1953 /// is preserved by the `applied`/`folded` split — `wait_for_seq`
1954 /// still waits on `folded`, which advances over skips.
1955 #[tokio::test]
1956 async fn wait_for_seq_returns_after_recoverable_decode_skip() {
1957 let redex = Redex::new();
1958 let attempts = Arc::new(AtomicU64::new(0));
1959 let fold = FailDecodeAtSeqs {
1960 skip: [0u64].into_iter().collect(),
1961 attempts: attempts.clone(),
1962 };
1963 let adapter = CortexAdapter::<u64>::open(
1964 &redex,
1965 &cn("cortex/audit-6-no-deadlock"),
1966 RedexFileConfig::default(),
1967 CortexAdapterConfig::default(),
1968 fold,
1969 0u64,
1970 )
1971 .unwrap();
1972
1973 let meta = EventMeta::new(0, 0, 0, 0, 0);
1974 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
1975 let seq = adapter.ingest(env).unwrap();
1976
1977 // wait_for_seq must NOT block on the skipped seq.
1978 // Tight timeout — pre-fix shape never had this hazard;
1979 // the post-fix split must preserve it. (If `wait_for_seq`
1980 // ever silently switches to `applied_through_seq`, this
1981 // regresses with an indefinite hang.)
1982 tokio::time::timeout(std::time::Duration::from_secs(2), adapter.wait_for_seq(seq))
1983 .await
1984 .expect("wait_for_seq must return promptly even for a recoverable-decode-skipped seq")
1985 .expect("recoverable-decode skip still advances the folded watermark");
1986
1987 // Confirm what was actually observable at return:
1988 // folded reached seq, applied did not.
1989 assert_eq!(adapter.folded_through_seq(), Some(0));
1990 assert_eq!(adapter.applied_through_seq(), None);
1991 }
1992
1993 // ========================================================================
1994 // open must reject FromSeq(n>0) / LiveOnly
1995 // ========================================================================
1996
1997 /// `open` rejects `StartPosition::FromSeq(n)` for n > 0
1998 /// because the watermark would advance past events the adapter
1999 /// never folded, leaving `wait_for_seq` lying about applied
2000 /// state. Callers that intentionally skip a prefix must use
2001 /// `open_from_snapshot`.
2002 #[test]
2003 fn open_rejects_from_seq_n_greater_than_zero() {
2004 let redex = Redex::new();
2005 let cfg = CortexAdapterConfig::new().with_start(StartPosition::FromSeq(5));
2006 let result = CortexAdapter::<u64>::open(
2007 &redex,
2008 &cn("cortex/from-seq-guard"),
2009 RedexFileConfig::default(),
2010 cfg,
2011 CountFold,
2012 0u64,
2013 );
2014 assert!(
2015 matches!(result, Err(CortexAdapterError::InvalidStartPosition(_))),
2016 "open must reject FromSeq(n>0), got {:?}",
2017 result.map(|_| "Ok"),
2018 );
2019 }
2020
2021 /// `open` rejects `StartPosition::LiveOnly` for the same
2022 /// reason — the start_seq is `file.next_seq()`, so any prior
2023 /// events go un-folded but the watermark advances past them.
2024 #[test]
2025 fn open_rejects_live_only_start_position() {
2026 let redex = Redex::new();
2027 let cfg = CortexAdapterConfig::new().with_start(StartPosition::LiveOnly);
2028 let result = CortexAdapter::<u64>::open(
2029 &redex,
2030 &cn("cortex/live-only-guard"),
2031 RedexFileConfig::default(),
2032 cfg,
2033 CountFold,
2034 0u64,
2035 );
2036 assert!(
2037 matches!(result, Err(CortexAdapterError::InvalidStartPosition(_))),
2038 "open must reject LiveOnly, got {:?}",
2039 result.map(|_| "Ok"),
2040 );
2041 }
2042
2043 // ========================================================================
2044 // Stop policy must skip-and-continue on per-event Decode errors
2045 // ========================================================================
2046
2047 struct FailDecodeAtSeq(u64);
2048 impl RedexFold<u64> for FailDecodeAtSeq {
2049 fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
2050 if ev.entry.seq == self.0 {
2051 // Decode-class error: simulates a corrupt postcard
2052 // tail / EventMeta shape mismatch / checksum miss
2053 // — exactly what the cortex fold paths surface as
2054 // RedexError::Decode.
2055 Err(RedexError::Decode(format!(
2056 "deliberate decode failure at seq {}",
2057 ev.entry.seq
2058 )))
2059 } else {
2060 *state += 1;
2061 Ok(())
2062 }
2063 }
2064 }
2065
2066 /// Under `Stop` policy, a `RedexError::Decode` MUST NOT halt
2067 /// the fold task — it's a per-event recoverable failure
2068 /// (corrupt event payload past the 32-bit checksum, or an
2069 /// attacker-crafted matching collision). Pre-fix this hung
2070 /// the task on the first bad event, DoSing the cortex via one
2071 /// payload. Post-fix: the bad event is logged + skipped, the
2072 /// watermark advances, and subsequent events still fold.
2073 #[tokio::test]
2074 async fn stop_policy_skips_recoverable_decode_error() {
2075 let redex = Redex::new();
2076 let adapter = CortexAdapter::<u64>::open(
2077 &redex,
2078 &cn("cortex/decode-skip"),
2079 RedexFileConfig::default(),
2080 CortexAdapterConfig::default(), // Stop is default
2081 FailDecodeAtSeq(3),
2082 0u64,
2083 )
2084 .unwrap();
2085
2086 for i in 0..10u64 {
2087 let meta = EventMeta::new(0, 0, 0, i, 0);
2088 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
2089 let seq = adapter.ingest(env).unwrap();
2090 adapter.wait_for_seq(seq).await.unwrap();
2091 }
2092
2093 // Fold task is still running — the decode error didn't
2094 // halt it. fold_errors counts the one bad event.
2095 assert!(
2096 adapter.is_running(),
2097 "Stop policy must NOT halt on RedexError::Decode"
2098 );
2099 assert_eq!(adapter.fold_errors(), 1);
2100 // Seqs 0,1,2,4,5,6,7,8,9 folded; seq 3 skipped.
2101 assert_eq!(*adapter.state().read(), 9);
2102 }
2103
2104 /// `Encode` errors (storage / user-fold-level) STILL halt
2105 /// under `Stop` — pins the conservative boundary so the
2106 /// recoverable-decode carve-out is strictly limited to per-event
2107 /// decode failures. The pre-existing `test_stop_policy_halts_on_first_error`
2108 /// already exercises this with `RedexError::Encode`, but we
2109 /// pin the contract explicitly here so a future expansion of
2110 /// `is_recoverable_decode` (e.g. accidentally including
2111 /// `Encode`) is caught.
2112 #[test]
2113 fn redex_error_recoverable_decode_classification_is_decode_only() {
2114 assert!(RedexError::Decode("x".into()).is_recoverable_decode());
2115 assert!(!RedexError::Encode("x".into()).is_recoverable_decode());
2116 assert!(!RedexError::Closed.is_recoverable_decode());
2117 assert!(!RedexError::Io("x".into()).is_recoverable_decode());
2118 assert!(!RedexError::Lagged.is_recoverable_decode());
2119 assert!(!RedexError::Unauthorized.is_recoverable_decode());
2120 }
2121
2122 /// `FromSeq(0)` is equivalent to `FromBeginning` (no events
2123 /// skipped) and must still be accepted — pins the boundary so
2124 /// the start-position guard doesn't accidentally lock out the
2125 /// degenerate-but-valid `FromSeq(0)` form.
2126 #[tokio::test]
2127 async fn open_accepts_from_seq_zero() {
2128 let redex = Redex::new();
2129 let cfg = CortexAdapterConfig::new().with_start(StartPosition::FromSeq(0));
2130 CortexAdapter::<u64>::open(
2131 &redex,
2132 &cn("cortex/from-seq-zero"),
2133 RedexFileConfig::default(),
2134 cfg,
2135 CountFold,
2136 0u64,
2137 )
2138 .expect("FromSeq(0) is equivalent to FromBeginning and must be accepted");
2139 }
2140
2141 // ========================================================================
2142 // changes_with_lag must surface BroadcastStream::Lagged
2143 // ========================================================================
2144
2145 /// `changes_with_lag` yields a `ChangeEvent::Lagged(n)` when a
2146 /// subscriber falls behind the broadcast channel capacity. Pre-
2147 /// fix `changes()` silently dropped these events via
2148 /// `filter_map(|r| r.ok())`; downstream telemetry consumers had
2149 /// no way to detect or count missed change notifications.
2150 ///
2151 /// Setup: subscribe, then ingest more than CHANGES_BROADCAST_CAP
2152 /// (64) events without polling the stream. The broadcast channel
2153 /// drops the oldest, and the next stream poll surfaces a
2154 /// `Lagged(n)` for the dropped count.
2155 #[tokio::test]
2156 async fn changes_with_lag_yields_lagged_when_subscriber_falls_behind() {
2157 let redex = Redex::new();
2158 let adapter = CortexAdapter::<u64>::open(
2159 &redex,
2160 &cn("cortex/lag"),
2161 RedexFileConfig::default(),
2162 CortexAdapterConfig::default(),
2163 CountFold,
2164 0u64,
2165 )
2166 .unwrap();
2167
2168 let stream = adapter.changes_with_lag();
2169 tokio::pin!(stream);
2170
2171 // Ingest CHANGES_BROADCAST_CAP + 16 events without polling
2172 // the stream. The broadcast channel will drop the oldest 16
2173 // (or thereabouts — the exact count depends on broadcast
2174 // semantics; we just need at least one Lagged emission).
2175 let total = (CHANGES_BROADCAST_CAP + 16) as u64;
2176 for i in 0..total {
2177 let meta = EventMeta::new(0, 0, 0, i, 0);
2178 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
2179 let seq = adapter.ingest(env).unwrap();
2180 adapter.wait_for_seq(seq).await.unwrap();
2181 }
2182
2183 // First poll should see a Lagged event (the broadcast channel
2184 // has overflowed). Drain the stream up to a reasonable cap and
2185 // assert at least one Lagged event was observed.
2186 let mut saw_lagged = false;
2187 let mut saw_seq = false;
2188 for _ in 0..(total as usize + 4) {
2189 match tokio::time::timeout(std::time::Duration::from_millis(50), stream.next()).await {
2190 Ok(Some(ChangeEvent::Lagged(n))) => {
2191 saw_lagged = true;
2192 assert!(n > 0, "Lagged count must be positive");
2193 }
2194 Ok(Some(ChangeEvent::Seq(_))) => {
2195 saw_seq = true;
2196 }
2197 Ok(None) | Err(_) => break,
2198 }
2199 }
2200 assert!(
2201 saw_lagged,
2202 "subscriber that fell behind {} events must observe Lagged",
2203 CHANGES_BROADCAST_CAP + 16,
2204 );
2205 assert!(
2206 saw_seq,
2207 "the stream should still emit Seq events after the lag",
2208 );
2209 }
2210
2211 /// Pin: when the tail stream surfaces `RedexError::Lagged` the
2212 /// fold task catches the gap up via direct in-memory reads and
2213 /// resubscribes live, rather than silently exiting. Pre-fix
2214 /// the match arm broke out of the fold loop the first time the
2215 /// subscriber fell behind `tail_buffer_size`; state then never
2216 /// advanced past the lag point and `wait_for_seq` returned
2217 /// immediately on the `running == false` branch with no
2218 /// indication anything went wrong.
2219 ///
2220 /// Triggers `Lagged` deterministically via the backfill
2221 /// pre-flight: 50 events in the index with `tail_buffer_size =
2222 /// 4` exceeds the buffer, so `RedexFile::tail` signals
2223 /// `Lagged` as the very first stream item.
2224 #[tokio::test]
2225 async fn fold_task_recovers_from_tail_lagged() {
2226 let redex = Redex::new();
2227 let cfg = RedexFileConfig::default().with_tail_buffer_size(4);
2228
2229 // Stage the gap in the file's in-memory index without
2230 // going through an adapter (no watchers → no lag pressure
2231 // during stage). `CountFold` ignores payload, so empty
2232 // bytes are fine.
2233 let file = redex
2234 .open_file(&cn("cortex/tail-lag-recovery"), cfg)
2235 .unwrap();
2236 for _ in 0..50u64 {
2237 file.append(b"").unwrap();
2238 }
2239
2240 // Open a FromBeginning adapter on the same file. The
2241 // tail()'s backfill pre-flight sees 50 retained events vs.
2242 // buffer=4 and signals Lagged first thing. Post-fix the
2243 // recovery loop reads the gap from the index and tails
2244 // live; pre-fix the fold task exits and state stays at 0.
2245 let adapter = CortexAdapter::<u64>::open(
2246 &redex,
2247 &cn("cortex/tail-lag-recovery"),
2248 RedexFileConfig::default(), // ignored on reopen
2249 CortexAdapterConfig::default(),
2250 CountFold,
2251 0u64,
2252 )
2253 .unwrap();
2254
2255 tokio::time::timeout(std::time::Duration::from_secs(5), adapter.wait_for_seq(49))
2256 .await
2257 .expect("wait_for_seq(49) timed out — tail-Lagged recovery regressed")
2258 .expect("fold task must remain alive through Lagged recovery");
2259
2260 assert_eq!(*adapter.state().read(), 50);
2261 assert!(
2262 adapter.is_running(),
2263 "fold task must stay alive after recovering from tail lag",
2264 );
2265 assert_eq!(adapter.fold_errors(), 0);
2266 }
2267
2268 /// `changes()` continues to silently drop lag (the documented
2269 /// best-effort behavior) — pins the contract so a future
2270 /// refactor doesn't accidentally surface `Lagged` through the
2271 /// simple stream and break consumers that don't want it.
2272 #[tokio::test]
2273 async fn changes_filters_out_lag_silently() {
2274 let redex = Redex::new();
2275 let adapter = CortexAdapter::<u64>::open(
2276 &redex,
2277 &cn("cortex/lag-silent"),
2278 RedexFileConfig::default(),
2279 CortexAdapterConfig::default(),
2280 CountFold,
2281 0u64,
2282 )
2283 .unwrap();
2284
2285 let stream = adapter.changes();
2286 tokio::pin!(stream);
2287
2288 // Same overflow setup.
2289 let total = (CHANGES_BROADCAST_CAP + 16) as u64;
2290 for i in 0..total {
2291 let meta = EventMeta::new(0, 0, 0, i, 0);
2292 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
2293 let seq = adapter.ingest(env).unwrap();
2294 adapter.wait_for_seq(seq).await.unwrap();
2295 }
2296
2297 // Drain everything we can from the stream. Item type is `u64`
2298 // (not Result), so we can't observe Lagged in any form. Just
2299 // verify the stream still produces some seqs without errors.
2300 let mut got_seq = false;
2301 for _ in 0..(total as usize + 4) {
2302 match tokio::time::timeout(std::time::Duration::from_millis(50), stream.next()).await {
2303 Ok(Some(_seq)) => {
2304 got_seq = true;
2305 }
2306 Ok(None) | Err(_) => break,
2307 }
2308 }
2309 assert!(got_seq, "changes() must still emit seqs after lag");
2310 }
2311
2312 #[tokio::test]
2313 async fn test_log_and_continue_skips_errors() {
2314 let redex = Redex::new();
2315 let cfg =
2316 CortexAdapterConfig::new().with_fold_error_policy(FoldErrorPolicy::LogAndContinue);
2317 let adapter = CortexAdapter::<u64>::open(
2318 &redex,
2319 &cn("cortex/lc"),
2320 RedexFileConfig::default(),
2321 cfg,
2322 FailAtSeq(3),
2323 0u64,
2324 )
2325 .unwrap();
2326
2327 for i in 0..10u64 {
2328 let meta = EventMeta::new(0, 0, 0, i, 0);
2329 let env = EventEnvelope::new(meta, Bytes::from_static(b""));
2330 let seq = adapter.ingest(env).unwrap();
2331 adapter.wait_for_seq(seq).await.unwrap();
2332 }
2333
2334 assert!(adapter.is_running());
2335 assert_eq!(adapter.fold_errors(), 1);
2336 // All seqs except 3 were folded → state == 9.
2337 assert_eq!(*adapter.state().read(), 9);
2338 }
2339}