Skip to main content

net/adapter/net/cortex/tasks/
adapter.rs

1//! `TasksAdapter` — a typed wrapper around `CortexAdapter<TasksState>`
2//! with domain-level ingest helpers.
3
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use bytes::Bytes;
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11
12use super::super::super::channel::ChannelName;
13use super::super::super::redex::{Redex, RedexError, RedexFileConfig, WriteToken};
14use super::super::adapter::{CortexAdapter, WaitForTokenError};
15use super::super::config::CortexAdapterConfig;
16use super::super::envelope::EventEnvelope;
17use super::super::error::CortexAdapterError;
18use super::super::meta::{compute_checksum_with_meta, EventMeta};
19use super::super::watermark::WatermarkingFold;
20use super::dispatch::{
21    DISPATCH_TASK_COMPLETED, DISPATCH_TASK_CREATED, DISPATCH_TASK_DELETED, DISPATCH_TASK_RENAMED,
22    TASKS_CHANNEL,
23};
24use super::fold::TasksFold;
25use super::state::TasksState;
26use super::types::{
27    TaskCompletedPayload, TaskCreatedPayload, TaskDeletedPayload, TaskId, TaskRenamedPayload,
28};
29use super::watch::TasksWatcher;
30
31/// Return shape of [`TasksAdapter::snapshot_and_watch`]: the
32/// initial filter result plus a boxed stream that emits every
33/// subsequent change (dedup'd, with the initial skipped so the
34/// caller doesn't double-render).
35pub type TasksSnapshotAndWatch = (
36    Vec<super::types::Task>,
37    std::pin::Pin<Box<dyn futures::Stream<Item = Vec<super::types::Task>> + Send + 'static>>,
38);
39
40use futures::StreamExt;
41
42/// Wire format for [`TasksAdapter::snapshot`]: wraps the `TasksState`
43/// postcard blob produced by the underlying [`CortexAdapter`] alongside
44/// the typed adapter's own `app_seq` counter so restore preserves
45/// per-origin monotonicity of `EventMeta::seq_or_ts`.
46#[derive(Serialize, Deserialize)]
47struct TasksSnapshotPayload {
48    /// Next-to-assign `app_seq` value at snapshot time — the adapter
49    /// restores its counter to this so post-restore `EventMeta`
50    /// records continue with monotonic per-origin sequencing.
51    app_seq: u64,
52    /// The `CortexAdapter::snapshot` blob (postcard of `TasksState`).
53    inner: Vec<u8>,
54}
55
56/// Typed wrapper around `CortexAdapter<TasksState>` that exposes
57/// domain-level operations (`create`, `rename`, `complete`, `delete`)
58/// and hides the `EventMeta` + postcard plumbing.
59pub struct TasksAdapter {
60    inner: CortexAdapter<TasksState>,
61    /// Producer identity stamped on every `EventMeta`.
62    origin_hash: u64,
63    /// Monotonic per-origin counter for `EventMeta::seq_or_ts`.
64    /// Shared with the inner `WatermarkingFold` wrapper around
65    /// [`TasksFold`]: the fold task advances this counter via
66    /// `fetch_max(seq_or_ts + 1)` for every replayed event whose
67    /// `origin_hash` matches ours, so reopening against a Redex
68    /// with pre-existing same-origin events produces a counter
69    /// that's already past every assigned `seq_or_ts` by the time
70    /// the constructor returns. `ingest_typed` then
71    /// load-and-CAS-commits against the same atomic.
72    app_seq: Arc<AtomicU64>,
73}
74
75impl TasksAdapter {
76    /// Open the tasks adapter against a `Redex` manager.
77    ///
78    /// Uses [`TASKS_CHANNEL`] (`"cortex/tasks"`). Replays the full
79    /// history into state on open; subsequent events are appended to
80    /// the same channel.
81    ///
82    /// `async` because the constructor awaits the fold task's
83    /// catch-up before returning: the inner `WatermarkingFold`
84    /// observes every replayed event's `EventMeta` and advances
85    /// `app_seq` past any pre-existing same-origin `seq_or_ts`,
86    /// so the first `ingest_typed` after `open` cannot collide
87    /// with an already-stored event.
88    pub async fn open(redex: &Redex, origin_hash: u64) -> Result<Self, CortexAdapterError> {
89        Self::open_with_config(redex, origin_hash, RedexFileConfig::default()).await
90    }
91
92    /// Like [`Self::open`] but with a caller-supplied `RedexFileConfig`
93    /// (useful for `persistent: true` or custom retention).
94    pub async fn open_with_config(
95        redex: &Redex,
96        origin_hash: u64,
97        redex_config: RedexFileConfig,
98    ) -> Result<Self, CortexAdapterError> {
99        let name = ChannelName::new(TASKS_CHANNEL).map_err(|e| {
100            CortexAdapterError::Redex(super::super::super::redex::RedexError::Channel(
101                e.to_string(),
102            ))
103        })?;
104        let app_seq = Arc::new(AtomicU64::new(0));
105        let fold = WatermarkingFold::new(TasksFold, app_seq.clone(), origin_hash);
106        let inner = CortexAdapter::open(
107            redex,
108            &name,
109            redex_config.clone(),
110            CortexAdapterConfig::default(),
111            fold,
112            TasksState::new(),
113        )?;
114
115        // Wait for the fold task to catch up so the wrapper has
116        // observed every pre-existing event before any caller-driven
117        // ingest can race against it. `redex.open_file` is idempotent
118        // (returns the same handle the inner adapter already holds),
119        // so re-opening here is cheap.
120        let file = redex.open_file(&name, redex_config)?;
121        let next_seq = file.next_seq();
122        if next_seq > 0 {
123            inner
124                .wait_for_seq(next_seq - 1)
125                .await
126                .map_err(|folded_through| CortexAdapterError::FoldStoppedBeforeSeq {
127                    wanted: next_seq - 1,
128                    folded_through,
129                })?;
130        }
131
132        Ok(Self {
133            inner,
134            origin_hash,
135            app_seq,
136        })
137    }
138
139    /// Create a new task. Returns the RedEX seq of the append.
140    pub fn create(
141        &self,
142        id: TaskId,
143        title: impl Into<String>,
144        now_ns: u64,
145    ) -> Result<u64, CortexAdapterError> {
146        let payload = TaskCreatedPayload {
147            id,
148            title: title.into(),
149            now_ns,
150        };
151        self.ingest_typed(DISPATCH_TASK_CREATED, &payload)
152    }
153
154    /// Rename an existing task. No-op at fold time if `id` is unknown.
155    pub fn rename(
156        &self,
157        id: TaskId,
158        new_title: impl Into<String>,
159        now_ns: u64,
160    ) -> Result<u64, CortexAdapterError> {
161        let payload = TaskRenamedPayload {
162            id,
163            new_title: new_title.into(),
164            now_ns,
165        };
166        self.ingest_typed(DISPATCH_TASK_RENAMED, &payload)
167    }
168
169    /// Mark a task completed. No-op at fold time if `id` is unknown.
170    pub fn complete(&self, id: TaskId, now_ns: u64) -> Result<u64, CortexAdapterError> {
171        let payload = TaskCompletedPayload { id, now_ns };
172        self.ingest_typed(DISPATCH_TASK_COMPLETED, &payload)
173    }
174
175    /// Delete a task. No-op at fold time if `id` is unknown.
176    pub fn delete(&self, id: TaskId) -> Result<u64, CortexAdapterError> {
177        let payload = TaskDeletedPayload { id };
178        self.ingest_typed(DISPATCH_TASK_DELETED, &payload)
179    }
180
181    /// Read-only access to the materialized state.
182    pub fn state(&self) -> Arc<RwLock<TasksState>> {
183        self.inner.state()
184    }
185
186    /// Total task count in the current state. Cheap; acquires the
187    /// state read lock briefly. Matches the Node/Python SDK surface.
188    pub fn count(&self) -> usize {
189        self.inner.state().read().len()
190    }
191
192    /// Block until every event up through `seq` has been folded.
193    /// Returns `Err(folded)` if the fold task stopped before
194    /// reaching `seq`; see [`CortexAdapter::wait_for_seq`] for the
195    /// stop-vs-success rationale.
196    pub async fn wait_for_seq(&self, seq: u64) -> Result<(), Option<u64>> {
197        self.inner.wait_for_seq(seq).await
198    }
199
200    /// Block until the fold task has processed every event up
201    /// through `token.seq`, or `deadline` elapses. Read-your-writes
202    /// wait: a writer who got `token` from this origin's ingest
203    /// path can call this to make sure the local fold has caught
204    /// up before reading state.
205    ///
206    /// Rejects tokens issued for a different origin with
207    /// [`WaitForTokenError::WrongOrigin`] — protects against the
208    /// `causal_tokens.get(other_origin).wait(my_token)` aliasing
209    /// failure where a wait on this adapter would never resolve
210    /// because the targeted seq belongs to someone else's chain.
211    pub async fn wait_for_token(
212        &self,
213        token: WriteToken,
214        deadline: Duration,
215    ) -> Result<(), WaitForTokenError> {
216        if token.origin_hash != self.origin_hash {
217            self.inner.note_wrong_origin();
218            return Err(WaitForTokenError::WrongOrigin {
219                token_origin: token.origin_hash,
220                adapter_origin: self.origin_hash,
221            });
222        }
223        self.inner.wait_for_token(token, deadline).await
224    }
225
226    /// Non-blocking RYW poll. Synchronously checks origin binding +
227    /// the applied watermark and returns without scheduling any
228    /// wait. Use for "is my write visible yet?" queries where the
229    /// caller doesn't want to block:
230    ///
231    /// - `Ok(())` — the write is observable; subsequent reads see it.
232    /// - `Err(WaitForTokenError::WrongOrigin {..})` — the token's
233    ///   `origin_hash` doesn't match this adapter's bound origin.
234    /// - `Err(WaitForTokenError::FoldStopped {..})` — the fold task
235    ///   has stopped before reaching the target seq; the write will
236    ///   never become observable.
237    /// - `Err(WaitForTokenError::Timeout)` — not yet (try again later).
238    ///
239    /// Mirrors the FFI's `timeout_ms == 0` shape so every binding
240    /// can expose a "poll, don't wait" entry point with consistent
241    /// semantics. No semaphore permit is taken; `QueueFull` is not
242    /// reachable on this path.
243    pub fn poll_for_token(&self, token: WriteToken) -> Result<(), WaitForTokenError> {
244        if token.origin_hash != self.origin_hash {
245            self.inner.note_wrong_origin();
246            return Err(WaitForTokenError::WrongOrigin {
247                token_origin: token.origin_hash,
248                adapter_origin: self.origin_hash,
249            });
250        }
251        match self.inner.applied_through_seq() {
252            Some(applied) if applied >= token.seq => Ok(()),
253            _ if !self.inner.is_running() => Err(WaitForTokenError::FoldStopped {
254                applied_through_seq: self.inner.applied_through_seq(),
255            }),
256            _ => Err(WaitForTokenError::Timeout),
257        }
258    }
259
260    /// Close the adapter. See [`CortexAdapter::close`].
261    pub fn close(&self) -> Result<(), CortexAdapterError> {
262        self.inner.close()
263    }
264
265    /// True if the fold task is currently running.
266    pub fn is_running(&self) -> bool {
267        self.inner.is_running()
268    }
269
270    /// Access the wrapped [`CortexAdapter`] for cases that need the
271    /// lower-level surface.
272    pub fn as_cortex(&self) -> &CortexAdapter<TasksState> {
273        &self.inner
274    }
275
276    /// Origin hash this adapter is bound to. Stamped on every
277    /// outgoing `EventMeta`; tokens with a different origin reject
278    /// at `wait_for_token`.
279    pub fn origin_hash(&self) -> u64 {
280        self.origin_hash
281    }
282
283    /// Start building a reactive watcher. See
284    /// [`TasksWatcher::stream`] for emission semantics (initial +
285    /// deduplicated on filter-result change).
286    pub fn watch(&self) -> TasksWatcher {
287        TasksWatcher::new(self.inner.state(), self.inner.changes().boxed())
288    }
289
290    /// One-shot combo: a snapshot of the current filter result PLUS
291    /// a stream that emits every **subsequent** change to that
292    /// filter. The stream skips the initial emission so the caller
293    /// doesn't see the snapshot twice — the snapshot is the initial
294    /// state; the stream carries deltas from there forward.
295    ///
296    /// Useful for UI-style consumers: "paint what's there now, then
297    /// react to changes" without a manual dedup against the first
298    /// emission.
299    pub fn snapshot_and_watch(&self, watcher: TasksWatcher) -> TasksSnapshotAndWatch {
300        use futures::StreamExt;
301        // Compute the snapshot from the adapter's current state,
302        // reusing the watcher's configured filter. Holding the read
303        // lock only for the execute call keeps it brief.
304        let initial = {
305            let state = self.inner.state();
306            let guard = state.read();
307            watcher.spec_for_snapshot().execute(&guard)
308        };
309        // Skip ONLY the first emission, and only if it equals the
310        // snapshot. Subsequent emissions always forward. A sticky
311        // `skip_while(|c| c == &initial)` would handle the
312        // snapshot-vs-watcher race (state changes between
313        // snapshot read and `watcher.stream()` start, so the
314        // watcher's first emission ≠ snapshot — we want to forward
315        // it) but introduces a starvation hazard: under an
316        // (A → B → A) state oscillation that the single-slot
317        // `tokio::sync::watch` collapses into final A, the
318        // surviving A equals `initial` so it would be skipped —
319        // the consumer would be silent until state diverged from
320        // A. The first-only filter handles both cases:
321        //   - leading match (no state change since snapshot): skip
322        //     the first emission → consumer sees no duplicate
323        //   - leading divergence (state changed during the race):
324        //     first emission ≠ snapshot → forwarded
325        //   - oscillation back to initial (A → B → A): the watch's
326        //     surviving A is forwarded as the first item if state
327        //     hadn't changed since snapshot — caller can dedup
328        //     against their snapshot if they care, or treat it as
329        //     "fold tick observed" signal.
330        // Implemented via `enumerate().filter(...)` rather than a
331        // separate state-carrying skip primitive, since
332        // `futures::StreamExt::filter` doesn't accept a `FnMut`.
333        let initial_for_stream = initial.clone();
334        let stream = watcher
335            .stream()
336            .enumerate()
337            .filter(move |(i, current)| {
338                let drop_first = *i == 0 && current == &initial_for_stream;
339                futures::future::ready(!drop_first)
340            })
341            .map(|(_, current)| current)
342            .boxed();
343        (initial, stream)
344    }
345
346    /// Capture a snapshot suitable for restore. Returns
347    /// `(state_bytes, last_seq)` — persist both together.
348    pub fn snapshot(&self) -> Result<(Vec<u8>, Option<u64>), CortexAdapterError> {
349        let (inner, last_seq) = self.inner.snapshot()?;
350        let payload = TasksSnapshotPayload {
351            app_seq: self.app_seq.load(Ordering::Acquire),
352            inner,
353        };
354        let bytes = postcard::to_allocvec(&payload).map_err(|e| {
355            CortexAdapterError::Redex(RedexError::Encode(format!("tasks snapshot wrap: {}", e)))
356        })?;
357        Ok((bytes, last_seq))
358    }
359
360    /// Open the tasks adapter from a snapshot, skipping replay of
361    /// events up through `last_seq`.
362    ///
363    /// See [`Self::open`] for why this is `async`.
364    pub async fn open_from_snapshot(
365        redex: &Redex,
366        origin_hash: u64,
367        state_bytes: &[u8],
368        last_seq: Option<u64>,
369    ) -> Result<Self, CortexAdapterError> {
370        Self::open_from_snapshot_with_config(
371            redex,
372            origin_hash,
373            RedexFileConfig::default(),
374            state_bytes,
375            last_seq,
376        )
377        .await
378    }
379
380    /// Like [`Self::open_from_snapshot`] but with a caller-supplied
381    /// `RedexFileConfig` (e.g. for `persistent: true`).
382    pub async fn open_from_snapshot_with_config(
383        redex: &Redex,
384        origin_hash: u64,
385        redex_config: RedexFileConfig,
386        state_bytes: &[u8],
387        last_seq: Option<u64>,
388    ) -> Result<Self, CortexAdapterError> {
389        let payload: TasksSnapshotPayload = postcard::from_bytes(state_bytes).map_err(|e| {
390            CortexAdapterError::Redex(RedexError::Encode(format!("tasks snapshot unwrap: {}", e)))
391        })?;
392        let name = ChannelName::new(TASKS_CHANNEL)
393            .map_err(|e| CortexAdapterError::Redex(RedexError::Channel(e.to_string())))?;
394
395        // Pre-load the snapshot's persisted counter into the
396        // shared atomic. The wrapper fold then advances the
397        // counter past any events written between snapshot capture
398        // and close as part of its replay pass. A separate
399        // synchronous post-`last_seq` tail walk would double IO/CPU
400        // on large logs.
401        let app_seq = Arc::new(AtomicU64::new(payload.app_seq));
402        let fold = WatermarkingFold::new(TasksFold, app_seq.clone(), origin_hash);
403        let inner = CortexAdapter::open_from_snapshot(
404            redex,
405            &name,
406            redex_config.clone(),
407            CortexAdapterConfig::default(),
408            fold,
409            &payload.inner,
410            last_seq,
411        )?;
412
413        // Wait for the wrapper fold to observe every replay-tail
414        // event before returning. `next_seq` may be `last_seq + 1`
415        // (no post-snapshot writes) in which case the wait is a
416        // no-op fast path inside `wait_for_seq`.
417        let file = redex.open_file(&name, redex_config)?;
418        let next_seq = file.next_seq();
419        if next_seq > 0 {
420            inner
421                .wait_for_seq(next_seq - 1)
422                .await
423                .map_err(|folded_through| CortexAdapterError::FoldStoppedBeforeSeq {
424                    wanted: next_seq - 1,
425                    folded_through,
426                })?;
427        }
428
429        Ok(Self {
430            inner,
431            origin_hash,
432            app_seq,
433        })
434    }
435
436    /// Build the `EventEnvelope` + ingest. Keeps postcard serialization
437    /// and `EventMeta` assembly in one place.
438    ///
439    /// `app_seq` is reserved with a single atomic `fetch_add`
440    /// before constructing the `EventEnvelope`. `inner.ingest`
441    /// then commits the envelope to the Redex log. If the ingest
442    /// fails, the reserved seq is "lost" — i.e. the per-origin
443    /// `seq_or_ts` space has a one-unit gap — which is harmless:
444    ///
445    ///   * `WatermarkingFold` advances via `fetch_max` against
446    ///     events that actually landed in the log. The gap from
447    ///     a failed ingest is invisible to the watermark.
448    ///   * The next successful ingest gets a strictly-larger seq,
449    ///     so no duplicate is ever stamped.
450    ///   * `seq_or_ts` is not required to be contiguous — it's a
451    ///     monotonic per-origin tag, nothing more.
452    ///
453    /// **Why not load + ingest + CAS-commit?** That shape races
454    /// against the `WatermarkingFold` task: when the fold
455    /// processes the just-ingested event before the foreground
456    /// thread's CAS runs, the watermark advances to the expected
457    /// post-CAS value, the CAS observes the now-stale `app_seq`
458    /// mismatch, and surfaces a phantom "concurrent ingest_typed
459    /// produced duplicate app_seq" error even though no actual
460    /// duplicate happened. Single-adapter timing usually has the
461    /// foreground CAS running first; dual-adapter timing
462    /// (memories + tasks under one NetDb) gives the fold task
463    /// enough head-room to land first and the bug surfaces
464    /// deterministically. The race is in the protocol:
465    /// `fetch_add` sidesteps it.
466    ///
467    /// **Why no `fetch_sub` rollback on ingest failure?** This is
468    /// the chosen design — see the harmlessness rationale above:
469    /// the gap is invisible to `WatermarkingFold` (it advances via
470    /// `fetch_max` against landed events), the next successful
471    /// ingest gets a strictly-larger seq, and `seq_or_ts` is a
472    /// monotonic tag, not a contiguous counter. A `fetch_sub` on
473    /// `Err` would re-introduce the CAS-style race described
474    /// above: two foreground threads each `fetch_add` then `ingest`;
475    /// if A's ingest fails and A `fetch_sub`s after B already
476    /// `fetch_add`-ed, B's reserved seq jumps backwards and the
477    /// next thread can collide. The pre-fix audit doc warned that
478    /// a higher counter could survive a snapshot/restore; in
479    /// practice the second-adapter-on-same-origin recovery via
480    /// on-disk scan is gated by the substrate's already-required
481    /// uniqueness contract (one in-memory adapter per
482    /// `(channel, origin_hash)`), so the cross-adapter collision
483    /// described there is unreachable today.
484    fn ingest_typed<T: serde::Serialize>(
485        &self,
486        dispatch: u8,
487        payload: &T,
488    ) -> Result<u64, CortexAdapterError> {
489        let tail = postcard::to_allocvec(payload).map_err(|e| {
490            CortexAdapterError::Redex(super::super::super::redex::RedexError::Encode(
491                e.to_string(),
492            ))
493        })?;
494        let app_seq = self.app_seq.fetch_add(1, Ordering::AcqRel);
495        // Build the meta with checksum=0 first; `compute_checksum_with_meta`
496        // hashes the header (with the checksum slot zeroed) plus
497        // the tail, closing the audit-#8 dispatch-flip undercoverage
498        // hole that the legacy tail-only `compute_checksum` left
499        // open.
500        let mut meta = EventMeta::new(dispatch, 0, self.origin_hash, app_seq, 0);
501        meta.checksum = compute_checksum_with_meta(&meta, &tail);
502        let payload_bytes = Bytes::from(tail);
503        let env = EventEnvelope::new(meta, payload_bytes);
504        self.inner.ingest(env)
505    }
506}
507
508impl std::fmt::Debug for TasksAdapter {
509    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
510        f.debug_struct("TasksAdapter")
511            .field("origin_hash", &self.origin_hash)
512            .field("app_seq", &self.app_seq.load(Ordering::Acquire))
513            .field("inner", &self.inner)
514            .finish()
515    }
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::redex::Redex;

    /// The RYW surface must refuse tokens minted for another origin.
    ///
    /// A `WriteToken` is `(origin_hash, seq)`. Accepting a foreign
    /// token would either resolve the wait against someone else's
    /// chain (a silent read-your-writes violation) or block forever
    /// because the targeted seq never arrives on this origin. Both
    /// `poll_for_token` and `wait_for_token` therefore short-circuit
    /// with `WrongOrigin` and bump the `wrong_origin_total` RYW
    /// metric so operator dashboards can see the attempt.
    #[tokio::test]
    async fn poll_and_wait_for_token_reject_mismatched_origin() {
        const OUR_ORIGIN: u64 = 0xAAAA_BBBB_CCCC_DDDD;
        const FOREIGN_ORIGIN: u64 = 0x1111_2222_3333_4444;

        let redex = Redex::new();
        let adapter = TasksAdapter::open(&redex, OUR_ORIGIN).await.unwrap();
        assert_eq!(adapter.origin_hash(), OUR_ORIGIN);

        // Reads the wrong-origin counter fresh on every call.
        let wrong_origin_total = || adapter.as_cortex().ryw_metrics().wrong_origin_total;

        // Nothing has been rejected yet.
        assert_eq!(wrong_origin_total(), 0);

        let foreign_token = WriteToken::new(FOREIGN_ORIGIN, 0);

        // Synchronous poll: the error must carry both origins, and the
        // counter must move — that is what proves the origin guard
        // fired rather than some unrelated `Err` branch.
        let polled = adapter.poll_for_token(foreign_token);
        match polled {
            Err(WaitForTokenError::WrongOrigin {
                token_origin,
                adapter_origin,
            }) => {
                assert_eq!(token_origin, FOREIGN_ORIGIN);
                assert_eq!(adapter_origin, OUR_ORIGIN);
            }
            other => panic!("expected WrongOrigin, got {:?}", other),
        }
        assert_eq!(wrong_origin_total(), 1);

        // Async wait: same contract, and the counter moves again.
        let waited = adapter
            .wait_for_token(foreign_token, Duration::from_millis(10))
            .await;
        match waited {
            Err(WaitForTokenError::WrongOrigin { .. }) => {}
            other => panic!("expected WrongOrigin, got {:?}", other),
        }
        assert_eq!(wrong_origin_total(), 2);

        // Control case: a token for OUR origin at a seq the fold has
        // not reached yields `Timeout`, not `WrongOrigin` — the guard
        // keys on origin, not on seq.
        let our_token = WriteToken::new(OUR_ORIGIN, 999);
        match adapter.poll_for_token(our_token) {
            Err(WaitForTokenError::Timeout) => {}
            other => panic!("expected Timeout for matched-origin token, got {:?}", other),
        }
        // The matched-origin call must leave the counter untouched.
        assert_eq!(wrong_origin_total(), 2);

        adapter.close().unwrap();
    }
}
583}