net/adapter/net/cortex/tasks/adapter.rs
1//! `TasksAdapter` — a typed wrapper around `CortexAdapter<TasksState>`
2//! with domain-level ingest helpers.
3
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use bytes::Bytes;
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11
12use super::super::super::channel::ChannelName;
13use super::super::super::redex::{Redex, RedexError, RedexFileConfig, WriteToken};
14use super::super::adapter::{CortexAdapter, WaitForTokenError};
15use super::super::config::CortexAdapterConfig;
16use super::super::envelope::EventEnvelope;
17use super::super::error::CortexAdapterError;
18use super::super::meta::{compute_checksum_with_meta, EventMeta};
19use super::super::watermark::WatermarkingFold;
20use super::dispatch::{
21 DISPATCH_TASK_COMPLETED, DISPATCH_TASK_CREATED, DISPATCH_TASK_DELETED, DISPATCH_TASK_RENAMED,
22 TASKS_CHANNEL,
23};
24use super::fold::TasksFold;
25use super::state::TasksState;
26use super::types::{
27 TaskCompletedPayload, TaskCreatedPayload, TaskDeletedPayload, TaskId, TaskRenamedPayload,
28};
29use super::watch::TasksWatcher;
30
/// Return shape of [`TasksAdapter::snapshot_and_watch`].
///
/// * `.0` — the initial filter result, computed from the adapter's
///   state at call time.
/// * `.1` — a boxed, `Send + 'static` stream of later filter results.
///   The stream's leading emission is dropped when it equals `.0`, so
///   the caller doesn't double-render the snapshot.
pub type TasksSnapshotAndWatch = (
    Vec<super::types::Task>,
    std::pin::Pin<Box<dyn futures::Stream<Item = Vec<super::types::Task>> + Send + 'static>>,
);
39
40use futures::StreamExt;
41
/// Wire format for [`TasksAdapter::snapshot`]: wraps the `TasksState`
/// postcard blob produced by the underlying [`CortexAdapter`] alongside
/// the typed adapter's own `app_seq` counter so restore preserves
/// per-origin monotonicity of `EventMeta::seq_or_ts`.
///
/// The struct is itself encoded with postcard (see
/// [`TasksAdapter::snapshot`] / `open_from_snapshot_with_config`), which
/// is not a self-describing format — reordering, adding or removing
/// fields changes the bytes that previously persisted snapshots expect.
#[derive(Serialize, Deserialize)]
struct TasksSnapshotPayload {
    /// Value of the adapter's `app_seq` counter at snapshot time, i.e.
    /// the next value that would have been assigned. Restore seeds the
    /// counter with this so post-restore `EventMeta` records continue
    /// with monotonic per-origin sequencing.
    app_seq: u64,
    /// The `CortexAdapter::snapshot` blob (postcard of `TasksState`),
    /// carried through opaquely.
    inner: Vec<u8>,
}
55
/// Typed wrapper around `CortexAdapter<TasksState>` that exposes
/// domain-level operations (`create`, `rename`, `complete`, `delete`)
/// and hides the `EventMeta` + postcard plumbing.
pub struct TasksAdapter {
    /// The wrapped generic adapter; owns the fold task and the
    /// materialized `TasksState`.
    inner: CortexAdapter<TasksState>,
    /// Producer identity stamped on every `EventMeta`.
    origin_hash: u64,
    /// Monotonic per-origin counter for `EventMeta::seq_or_ts`.
    /// Shared with the inner `WatermarkingFold` wrapper around
    /// [`TasksFold`]: the fold task advances this counter via
    /// `fetch_max(seq_or_ts + 1)` for every replayed event whose
    /// `origin_hash` matches ours, so reopening against a Redex
    /// with pre-existing same-origin events produces a counter
    /// that's already past every assigned `seq_or_ts` by the time
    /// the constructor returns. `ingest_typed` reserves each new
    /// value with a single `fetch_add(1)` on the same atomic (it
    /// deliberately does NOT load-then-CAS; see its docs).
    app_seq: Arc<AtomicU64>,
}
74
75impl TasksAdapter {
76 /// Open the tasks adapter against a `Redex` manager.
77 ///
78 /// Uses [`TASKS_CHANNEL`] (`"cortex/tasks"`). Replays the full
79 /// history into state on open; subsequent events are appended to
80 /// the same channel.
81 ///
82 /// `async` because the constructor awaits the fold task's
83 /// catch-up before returning: the inner `WatermarkingFold`
84 /// observes every replayed event's `EventMeta` and advances
85 /// `app_seq` past any pre-existing same-origin `seq_or_ts`,
86 /// so the first `ingest_typed` after `open` cannot collide
87 /// with an already-stored event.
88 pub async fn open(redex: &Redex, origin_hash: u64) -> Result<Self, CortexAdapterError> {
89 Self::open_with_config(redex, origin_hash, RedexFileConfig::default()).await
90 }
91
92 /// Like [`Self::open`] but with a caller-supplied `RedexFileConfig`
93 /// (useful for `persistent: true` or custom retention).
94 pub async fn open_with_config(
95 redex: &Redex,
96 origin_hash: u64,
97 redex_config: RedexFileConfig,
98 ) -> Result<Self, CortexAdapterError> {
99 let name = ChannelName::new(TASKS_CHANNEL).map_err(|e| {
100 CortexAdapterError::Redex(super::super::super::redex::RedexError::Channel(
101 e.to_string(),
102 ))
103 })?;
104 let app_seq = Arc::new(AtomicU64::new(0));
105 let fold = WatermarkingFold::new(TasksFold, app_seq.clone(), origin_hash);
106 let inner = CortexAdapter::open(
107 redex,
108 &name,
109 redex_config.clone(),
110 CortexAdapterConfig::default(),
111 fold,
112 TasksState::new(),
113 )?;
114
115 // Wait for the fold task to catch up so the wrapper has
116 // observed every pre-existing event before any caller-driven
117 // ingest can race against it. `redex.open_file` is idempotent
118 // (returns the same handle the inner adapter already holds),
119 // so re-opening here is cheap.
120 let file = redex.open_file(&name, redex_config)?;
121 let next_seq = file.next_seq();
122 if next_seq > 0 {
123 inner
124 .wait_for_seq(next_seq - 1)
125 .await
126 .map_err(|folded_through| CortexAdapterError::FoldStoppedBeforeSeq {
127 wanted: next_seq - 1,
128 folded_through,
129 })?;
130 }
131
132 Ok(Self {
133 inner,
134 origin_hash,
135 app_seq,
136 })
137 }
138
139 /// Create a new task. Returns the RedEX seq of the append.
140 pub fn create(
141 &self,
142 id: TaskId,
143 title: impl Into<String>,
144 now_ns: u64,
145 ) -> Result<u64, CortexAdapterError> {
146 let payload = TaskCreatedPayload {
147 id,
148 title: title.into(),
149 now_ns,
150 };
151 self.ingest_typed(DISPATCH_TASK_CREATED, &payload)
152 }
153
154 /// Rename an existing task. No-op at fold time if `id` is unknown.
155 pub fn rename(
156 &self,
157 id: TaskId,
158 new_title: impl Into<String>,
159 now_ns: u64,
160 ) -> Result<u64, CortexAdapterError> {
161 let payload = TaskRenamedPayload {
162 id,
163 new_title: new_title.into(),
164 now_ns,
165 };
166 self.ingest_typed(DISPATCH_TASK_RENAMED, &payload)
167 }
168
169 /// Mark a task completed. No-op at fold time if `id` is unknown.
170 pub fn complete(&self, id: TaskId, now_ns: u64) -> Result<u64, CortexAdapterError> {
171 let payload = TaskCompletedPayload { id, now_ns };
172 self.ingest_typed(DISPATCH_TASK_COMPLETED, &payload)
173 }
174
175 /// Delete a task. No-op at fold time if `id` is unknown.
176 pub fn delete(&self, id: TaskId) -> Result<u64, CortexAdapterError> {
177 let payload = TaskDeletedPayload { id };
178 self.ingest_typed(DISPATCH_TASK_DELETED, &payload)
179 }
180
    /// Shared handle to the materialized [`TasksState`], as returned by
    /// the wrapped adapter.
    ///
    /// Intended for read-only use. Note that the handle is an
    /// `Arc<RwLock<_>>`, so the type itself does not stop a caller
    /// from taking the write lock.
    pub fn state(&self) -> Arc<RwLock<TasksState>> {
        self.inner.state()
    }
185
186 /// Total task count in the current state. Cheap; acquires the
187 /// state read lock briefly. Matches the Node/Python SDK surface.
188 pub fn count(&self) -> usize {
189 self.inner.state().read().len()
190 }
191
    /// Wait (asynchronously) until every event up through `seq` has
    /// been folded.
    ///
    /// Returns `Err(folded)` if the fold task stopped before
    /// reaching `seq`; see [`CortexAdapter::wait_for_seq`] for the
    /// stop-vs-success rationale. This is a direct delegation to the
    /// wrapped adapter — no origin check is involved on this path.
    pub async fn wait_for_seq(&self, seq: u64) -> Result<(), Option<u64>> {
        self.inner.wait_for_seq(seq).await
    }
199
200 /// Block until the fold task has processed every event up
201 /// through `token.seq`, or `deadline` elapses. Read-your-writes
202 /// wait: a writer who got `token` from this origin's ingest
203 /// path can call this to make sure the local fold has caught
204 /// up before reading state.
205 ///
206 /// Rejects tokens issued for a different origin with
207 /// [`WaitForTokenError::WrongOrigin`] — protects against the
208 /// `causal_tokens.get(other_origin).wait(my_token)` aliasing
209 /// failure where a wait on this adapter would never resolve
210 /// because the targeted seq belongs to someone else's chain.
211 pub async fn wait_for_token(
212 &self,
213 token: WriteToken,
214 deadline: Duration,
215 ) -> Result<(), WaitForTokenError> {
216 if token.origin_hash != self.origin_hash {
217 self.inner.note_wrong_origin();
218 return Err(WaitForTokenError::WrongOrigin {
219 token_origin: token.origin_hash,
220 adapter_origin: self.origin_hash,
221 });
222 }
223 self.inner.wait_for_token(token, deadline).await
224 }
225
226 /// Non-blocking RYW poll. Synchronously checks origin binding +
227 /// the applied watermark and returns without scheduling any
228 /// wait. Use for "is my write visible yet?" queries where the
229 /// caller doesn't want to block:
230 ///
231 /// - `Ok(())` — the write is observable; subsequent reads see it.
232 /// - `Err(WaitForTokenError::WrongOrigin {..})` — the token's
233 /// `origin_hash` doesn't match this adapter's bound origin.
234 /// - `Err(WaitForTokenError::FoldStopped {..})` — the fold task
235 /// has stopped before reaching the target seq; the write will
236 /// never become observable.
237 /// - `Err(WaitForTokenError::Timeout)` — not yet (try again later).
238 ///
239 /// Mirrors the FFI's `timeout_ms == 0` shape so every binding
240 /// can expose a "poll, don't wait" entry point with consistent
241 /// semantics. No semaphore permit is taken; `QueueFull` is not
242 /// reachable on this path.
243 pub fn poll_for_token(&self, token: WriteToken) -> Result<(), WaitForTokenError> {
244 if token.origin_hash != self.origin_hash {
245 self.inner.note_wrong_origin();
246 return Err(WaitForTokenError::WrongOrigin {
247 token_origin: token.origin_hash,
248 adapter_origin: self.origin_hash,
249 });
250 }
251 match self.inner.applied_through_seq() {
252 Some(applied) if applied >= token.seq => Ok(()),
253 _ if !self.inner.is_running() => Err(WaitForTokenError::FoldStopped {
254 applied_through_seq: self.inner.applied_through_seq(),
255 }),
256 _ => Err(WaitForTokenError::Timeout),
257 }
258 }
259
    /// Close the adapter by delegating to the wrapped adapter; its
    /// result is returned unchanged. See [`CortexAdapter::close`].
    pub fn close(&self) -> Result<(), CortexAdapterError> {
        self.inner.close()
    }
264
    /// True if the fold task is currently running, as reported by the
    /// wrapped [`CortexAdapter`].
    pub fn is_running(&self) -> bool {
        self.inner.is_running()
    }
269
    /// Borrow the wrapped [`CortexAdapter`] for cases that need the
    /// lower-level surface (for example reading its RYW metrics).
    pub fn as_cortex(&self) -> &CortexAdapter<TasksState> {
        &self.inner
    }
275
    /// Origin hash this adapter is bound to. Stamped on every
    /// outgoing `EventMeta`; tokens carrying a different origin are
    /// rejected by both `wait_for_token` and `poll_for_token`.
    pub fn origin_hash(&self) -> u64 {
        self.origin_hash
    }
282
283 /// Start building a reactive watcher. See
284 /// [`TasksWatcher::stream`] for emission semantics (initial +
285 /// deduplicated on filter-result change).
286 pub fn watch(&self) -> TasksWatcher {
287 TasksWatcher::new(self.inner.state(), self.inner.changes().boxed())
288 }
289
290 /// One-shot combo: a snapshot of the current filter result PLUS
291 /// a stream that emits every **subsequent** change to that
292 /// filter. The stream skips the initial emission so the caller
293 /// doesn't see the snapshot twice — the snapshot is the initial
294 /// state; the stream carries deltas from there forward.
295 ///
296 /// Useful for UI-style consumers: "paint what's there now, then
297 /// react to changes" without a manual dedup against the first
298 /// emission.
299 pub fn snapshot_and_watch(&self, watcher: TasksWatcher) -> TasksSnapshotAndWatch {
300 use futures::StreamExt;
301 // Compute the snapshot from the adapter's current state,
302 // reusing the watcher's configured filter. Holding the read
303 // lock only for the execute call keeps it brief.
304 let initial = {
305 let state = self.inner.state();
306 let guard = state.read();
307 watcher.spec_for_snapshot().execute(&guard)
308 };
309 // Skip ONLY the first emission, and only if it equals the
310 // snapshot. Subsequent emissions always forward. A sticky
311 // `skip_while(|c| c == &initial)` would handle the
312 // snapshot-vs-watcher race (state changes between
313 // snapshot read and `watcher.stream()` start, so the
314 // watcher's first emission ≠ snapshot — we want to forward
315 // it) but introduces a starvation hazard: under an
316 // (A → B → A) state oscillation that the single-slot
317 // `tokio::sync::watch` collapses into final A, the
318 // surviving A equals `initial` so it would be skipped —
319 // the consumer would be silent until state diverged from
320 // A. The first-only filter handles both cases:
321 // - leading match (no state change since snapshot): skip
322 // the first emission → consumer sees no duplicate
323 // - leading divergence (state changed during the race):
324 // first emission ≠ snapshot → forwarded
325 // - oscillation back to initial (A → B → A): the watch's
326 // surviving A is forwarded as the first item if state
327 // hadn't changed since snapshot — caller can dedup
328 // against their snapshot if they care, or treat it as
329 // "fold tick observed" signal.
330 // Implemented via `enumerate().filter(...)` rather than a
331 // separate state-carrying skip primitive, since
332 // `futures::StreamExt::filter` doesn't accept a `FnMut`.
333 let initial_for_stream = initial.clone();
334 let stream = watcher
335 .stream()
336 .enumerate()
337 .filter(move |(i, current)| {
338 let drop_first = *i == 0 && current == &initial_for_stream;
339 futures::future::ready(!drop_first)
340 })
341 .map(|(_, current)| current)
342 .boxed();
343 (initial, stream)
344 }
345
346 /// Capture a snapshot suitable for restore. Returns
347 /// `(state_bytes, last_seq)` — persist both together.
348 pub fn snapshot(&self) -> Result<(Vec<u8>, Option<u64>), CortexAdapterError> {
349 let (inner, last_seq) = self.inner.snapshot()?;
350 let payload = TasksSnapshotPayload {
351 app_seq: self.app_seq.load(Ordering::Acquire),
352 inner,
353 };
354 let bytes = postcard::to_allocvec(&payload).map_err(|e| {
355 CortexAdapterError::Redex(RedexError::Encode(format!("tasks snapshot wrap: {}", e)))
356 })?;
357 Ok((bytes, last_seq))
358 }
359
360 /// Open the tasks adapter from a snapshot, skipping replay of
361 /// events up through `last_seq`.
362 ///
363 /// See [`Self::open`] for why this is `async`.
364 pub async fn open_from_snapshot(
365 redex: &Redex,
366 origin_hash: u64,
367 state_bytes: &[u8],
368 last_seq: Option<u64>,
369 ) -> Result<Self, CortexAdapterError> {
370 Self::open_from_snapshot_with_config(
371 redex,
372 origin_hash,
373 RedexFileConfig::default(),
374 state_bytes,
375 last_seq,
376 )
377 .await
378 }
379
380 /// Like [`Self::open_from_snapshot`] but with a caller-supplied
381 /// `RedexFileConfig` (e.g. for `persistent: true`).
382 pub async fn open_from_snapshot_with_config(
383 redex: &Redex,
384 origin_hash: u64,
385 redex_config: RedexFileConfig,
386 state_bytes: &[u8],
387 last_seq: Option<u64>,
388 ) -> Result<Self, CortexAdapterError> {
389 let payload: TasksSnapshotPayload = postcard::from_bytes(state_bytes).map_err(|e| {
390 CortexAdapterError::Redex(RedexError::Encode(format!("tasks snapshot unwrap: {}", e)))
391 })?;
392 let name = ChannelName::new(TASKS_CHANNEL)
393 .map_err(|e| CortexAdapterError::Redex(RedexError::Channel(e.to_string())))?;
394
395 // Pre-load the snapshot's persisted counter into the
396 // shared atomic. The wrapper fold then advances the
397 // counter past any events written between snapshot capture
398 // and close as part of its replay pass. A separate
399 // synchronous post-`last_seq` tail walk would double IO/CPU
400 // on large logs.
401 let app_seq = Arc::new(AtomicU64::new(payload.app_seq));
402 let fold = WatermarkingFold::new(TasksFold, app_seq.clone(), origin_hash);
403 let inner = CortexAdapter::open_from_snapshot(
404 redex,
405 &name,
406 redex_config.clone(),
407 CortexAdapterConfig::default(),
408 fold,
409 &payload.inner,
410 last_seq,
411 )?;
412
413 // Wait for the wrapper fold to observe every replay-tail
414 // event before returning. `next_seq` may be `last_seq + 1`
415 // (no post-snapshot writes) in which case the wait is a
416 // no-op fast path inside `wait_for_seq`.
417 let file = redex.open_file(&name, redex_config)?;
418 let next_seq = file.next_seq();
419 if next_seq > 0 {
420 inner
421 .wait_for_seq(next_seq - 1)
422 .await
423 .map_err(|folded_through| CortexAdapterError::FoldStoppedBeforeSeq {
424 wanted: next_seq - 1,
425 folded_through,
426 })?;
427 }
428
429 Ok(Self {
430 inner,
431 origin_hash,
432 app_seq,
433 })
434 }
435
436 /// Build the `EventEnvelope` + ingest. Keeps postcard serialization
437 /// and `EventMeta` assembly in one place.
438 ///
439 /// `app_seq` is reserved with a single atomic `fetch_add`
440 /// before constructing the `EventEnvelope`. `inner.ingest`
441 /// then commits the envelope to the Redex log. If the ingest
442 /// fails, the reserved seq is "lost" — i.e. the per-origin
443 /// `seq_or_ts` space has a one-unit gap — which is harmless:
444 ///
445 /// * `WatermarkingFold` advances via `fetch_max` against
446 /// events that actually landed in the log. The gap from
447 /// a failed ingest is invisible to the watermark.
448 /// * The next successful ingest gets a strictly-larger seq,
449 /// so no duplicate is ever stamped.
450 /// * `seq_or_ts` is not required to be contiguous — it's a
451 /// monotonic per-origin tag, nothing more.
452 ///
453 /// **Why not load + ingest + CAS-commit?** That shape races
454 /// against the `WatermarkingFold` task: when the fold
455 /// processes the just-ingested event before the foreground
456 /// thread's CAS runs, the watermark advances to the expected
457 /// post-CAS value, the CAS observes the now-stale `app_seq`
458 /// mismatch, and surfaces a phantom "concurrent ingest_typed
459 /// produced duplicate app_seq" error even though no actual
460 /// duplicate happened. Single-adapter timing usually has the
461 /// foreground CAS running first; dual-adapter timing
462 /// (memories + tasks under one NetDb) gives the fold task
463 /// enough head-room to land first and the bug surfaces
464 /// deterministically. The race is in the protocol:
465 /// `fetch_add` sidesteps it.
466 ///
467 /// **Why no `fetch_sub` rollback on ingest failure?** This is
468 /// the chosen design — see the harmlessness rationale above:
469 /// the gap is invisible to `WatermarkingFold` (it advances via
470 /// `fetch_max` against landed events), the next successful
471 /// ingest gets a strictly-larger seq, and `seq_or_ts` is a
472 /// monotonic tag, not a contiguous counter. A `fetch_sub` on
473 /// `Err` would re-introduce the CAS-style race described
474 /// above: two foreground threads each `fetch_add` then `ingest`;
475 /// if A's ingest fails and A `fetch_sub`s after B already
476 /// `fetch_add`-ed, B's reserved seq jumps backwards and the
477 /// next thread can collide. The pre-fix audit doc warned that
478 /// a higher counter could survive a snapshot/restore; in
479 /// practice the second-adapter-on-same-origin recovery via
480 /// on-disk scan is gated by the substrate's already-required
481 /// uniqueness contract (one in-memory adapter per
482 /// `(channel, origin_hash)`), so the cross-adapter collision
483 /// described there is unreachable today.
484 fn ingest_typed<T: serde::Serialize>(
485 &self,
486 dispatch: u8,
487 payload: &T,
488 ) -> Result<u64, CortexAdapterError> {
489 let tail = postcard::to_allocvec(payload).map_err(|e| {
490 CortexAdapterError::Redex(super::super::super::redex::RedexError::Encode(
491 e.to_string(),
492 ))
493 })?;
494 let app_seq = self.app_seq.fetch_add(1, Ordering::AcqRel);
495 // Build the meta with checksum=0 first; `compute_checksum_with_meta`
496 // hashes the header (with the checksum slot zeroed) plus
497 // the tail, closing the audit-#8 dispatch-flip undercoverage
498 // hole that the legacy tail-only `compute_checksum` left
499 // open.
500 let mut meta = EventMeta::new(dispatch, 0, self.origin_hash, app_seq, 0);
501 meta.checksum = compute_checksum_with_meta(&meta, &tail);
502 let payload_bytes = Bytes::from(tail);
503 let env = EventEnvelope::new(meta, payload_bytes);
504 self.inner.ingest(env)
505 }
506}
507
508impl std::fmt::Debug for TasksAdapter {
509 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
510 f.debug_struct("TasksAdapter")
511 .field("origin_hash", &self.origin_hash)
512 .field("app_seq", &self.app_seq.load(Ordering::Acquire))
513 .field("inner", &self.inner)
514 .finish()
515 }
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::redex::Redex;

    /// Cross-origin aliasing protection on the RYW surface.
    ///
    /// A `WriteToken` is `(origin_hash, seq)`. Were the adapter to
    /// accept a token bound to another origin, a wait would either
    /// resolve against someone else's chain (a silent RYW violation)
    /// or hang forever (the targeted seq never arrives on this
    /// origin). Both `wait_for_token` and `poll_for_token` must
    /// short-circuit with `WrongOrigin` and bump the
    /// `wrong_origin_total` RYW metric so operator dashboards see the
    /// leak attempt.
    #[tokio::test]
    async fn poll_and_wait_for_token_reject_mismatched_origin() {
        const OUR_ORIGIN: u64 = 0xAAAA_BBBB_CCCC_DDDD;
        const FOREIGN_ORIGIN: u64 = 0x1111_2222_3333_4444;

        let redex = Redex::new();
        let adapter = TasksAdapter::open(&redex, OUR_ORIGIN).await.unwrap();
        assert_eq!(adapter.origin_hash(), OUR_ORIGIN);

        // Shorthand for reading the rejection counter.
        let wrong_origin_total = || adapter.as_cortex().ryw_metrics().wrong_origin_total;

        // Nothing has been rejected yet.
        assert_eq!(wrong_origin_total(), 0);

        let foreign_token = WriteToken::new(FOREIGN_ORIGIN, 0);

        // Synchronous poll: rejected with WrongOrigin, and the counter
        // moves — proving the guard fired rather than some unrelated
        // branch returning Err.
        let polled = adapter.poll_for_token(foreign_token);
        match polled {
            Err(WaitForTokenError::WrongOrigin {
                token_origin,
                adapter_origin,
            }) => {
                assert_eq!(token_origin, FOREIGN_ORIGIN);
                assert_eq!(adapter_origin, OUR_ORIGIN);
            }
            other => panic!("expected WrongOrigin, got {:?}", other),
        }
        assert_eq!(wrong_origin_total(), 1);

        // Async wait: same contract, counter moves again.
        let waited = adapter
            .wait_for_token(foreign_token, Duration::from_millis(10))
            .await;
        if !matches!(waited, Err(WaitForTokenError::WrongOrigin { .. })) {
            panic!("expected WrongOrigin, got {:?}", waited);
        }
        assert_eq!(wrong_origin_total(), 2);

        // Sanity: a token with the right origin — even at a seq that
        // has not been reached — yields Timeout, not WrongOrigin. The
        // guard is keyed on origin, not on seq.
        let our_token = WriteToken::new(OUR_ORIGIN, 999);
        let polled_own = adapter.poll_for_token(our_token);
        if !matches!(polled_own, Err(WaitForTokenError::Timeout)) {
            panic!(
                "expected Timeout for matched-origin token, got {:?}",
                polled_own
            );
        }
        // The matched-origin call must leave the counter untouched.
        assert_eq!(wrong_origin_total(), 2);

        adapter.close().unwrap();
    }
}