Skip to main content

acdp_server/registry/
store.rs

1//! Registry persistence abstraction (feature = "server").
2//!
3//! [`RegistryStore`] is the minimal contract a registry implementation
4//! must satisfy: store an immutable [`Body`] under a registry-assigned
5//! [`CtxId`], track the parent lineage, mark predecessors superseded,
6//! and project search/lineage queries. The trait is synchronous and
7//! object-safe so a [`RegistryServer`](super::server::RegistryServer)
8//! can be parameterised over any backend (in-memory for tests, SQLite
9//! for production, etc.) without an async runtime dependency.
10//!
11//! [`InMemoryStore`] is the reference implementation used by the
12//! integration tests and intended as a drop-in for prototyping.
13
14use std::sync::Mutex;
15
16use acdp_primitives::error::AcdpError;
17use acdp_types::{
18    body::{Body, FullContext, RegistryState},
19    primitives::{AgentDid, CtxId, LineageId, Status, Visibility},
20    publish::{PublishRequest, PublishResponse},
21    search::{SearchParams, SearchResponse, SearchResult},
22};
23
/// Abstract registry persistence backend.
///
/// Synchronous — the in-memory implementation is mutex-guarded; async
/// backends should wrap blocking calls with `spawn_blocking` at the
/// HTTP boundary.
///
/// The `Send + Sync` supertraits mean a single store instance can be
/// shared across threads; every method takes `&self`, so implementations
/// supply their own interior synchronisation.
pub trait RegistryStore: Send + Sync {
    /// Persist a freshly-assigned context. `body.ctx_id` and
    /// `body.lineage_id` are already populated by the server.
    ///
    /// The in-memory reference implementation rejects a `ctx_id` that is
    /// already present with [`AcdpError::SchemaViolation`] rather than
    /// overwriting it.
    fn put(&self, body: Body) -> Result<(), AcdpError>;

    /// Retrieve a stored context by `ctx_id`.
    ///
    /// The in-memory reference implementation returns `Ok(None)` when
    /// nothing is stored under `ctx_id`.
    fn get(&self, ctx_id: &CtxId) -> Result<Option<FullContext>, AcdpError>;

    /// All contexts in a lineage, oldest first.
    ///
    /// The in-memory reference implementation returns an empty `Vec` for
    /// an unknown lineage.
    fn lineage(&self, lineage_id: &LineageId) -> Result<Vec<FullContext>, AcdpError>;

    /// Returns the newest non-[`Status::Superseded`] version of a lineage
    /// — either [`Status::Active`] or [`Status::Expired`] (an
    /// expired-but-unreplaced body is still the latest version, and
    /// callers need to see it to know it has lapsed).
    ///
    /// Returns `Ok(None)` when the lineage is unknown or every version is
    /// `Superseded` (RFC-ACDP-0004 §5.2: "if no such version exists,
    /// returns not_found" — fixture `ret-002`). Visibility rules are NOT
    /// applied here — filter at the server layer via
    /// [`crate::registry::server::RegistryServer::current`].
    fn current(&self, lineage_id: &LineageId) -> Result<Option<FullContext>, AcdpError>;

    /// Mark `ctx_id`'s registry state as `superseded`. Idempotent.
    ///
    /// The in-memory reference implementation treats an unknown `ctx_id`
    /// as a successful no-op.
    fn mark_superseded(&self, ctx_id: &CtxId) -> Result<(), AcdpError>;

    /// First-version `ctx_id` for a lineage, used to derive the
    /// lineage_id of a supersession publish per RFC-ACDP-0001 §5.6.
    ///
    /// LINEAGE ANCHORING (WS-D3). Implementations SHOULD answer this
    /// (and the supersession checks in [`Self::commit_publish`]) from
    /// **persisted rows** — the immediate predecessor's stored
    /// `lineage_id`/`version` and a lineage index — NOT by re-walking
    /// the full `supersedes` chain at publish time. A registry's own
    /// storage is trusted; anchoring removes the
    /// `lineage_walk_failed` liveness failure where a v(N+1) publish is
    /// rejected because some deep intermediate is unretrievable even
    /// though the immediate predecessor exists. Reserve the full
    /// chain walk for offline integrity audits, off the publish path.
    /// (`InMemoryStore` implements exactly this pattern.)
    fn first_version_ctx_id(&self, lineage_id: &LineageId) -> Result<Option<CtxId>, AcdpError>;

    /// Keyword/filter search. Implementations MUST apply the RFC-ACDP-0008
    /// §4.5 search-disclosure rules using `requester`:
    ///
    /// | Visibility   | Surfaces in search to                        |
    /// |--------------|----------------------------------------------|
    /// | `public`     | anyone                                       |
    /// | `restricted` | producer (`agent_id`) **or** any DID in `audience` |
    /// | `private`    | producer (`agent_id`) only — audience members must already know the ctx_id |
    ///
    /// `requester == None` represents an anonymous caller. Public
    /// contexts surface only when `anonymous_public_reads` is true (the
    /// capability flag from [`RegistryServer`](super::server::RegistryServer));
    /// the store implements the same predicate as `RegistryServer::retrieve`
    /// so the two endpoints stay symmetric (RFC-ACDP-0008 §4.5).
    ///
    /// Projection follows RFC-ACDP-0005 §2.2 `match_summary`.
    fn search(
        &self,
        params: &SearchParams,
        requester: Option<&AgentDid>,
        anonymous_public_reads: bool,
    ) -> Result<SearchResponse, AcdpError>;

    // ── Idempotency (RFC-ACDP-0003 §6) ─────────────────────────────────
    //
    // Stores supporting the `idempotency_key` capability MUST implement
    // these three methods. The default impls treat the store as
    // non-idempotent: lookup always returns `None`, record is a no-op,
    // evict is a no-op. A `RegistryServer` configured with
    // `caps.supports_idempotency_key = false` MUST never call them
    // (RFC-ACDP-0007 §3.2).
    //
    // ATOMICITY CONTRACT (WS-D4). Keys are scoped per `agent_id`: two
    // different agents using the same key never interact. The
    // idempotency record and the body persistence MUST commit
    // atomically (single transaction / compare-and-swap / one lock —
    // see `commit_publish`); a backend that cannot provide this (e.g.
    // an eventually-consistent store) MUST NOT be paired with
    // `supports_idempotency_key: true`, because concurrent identical-key
    // publishes could mint two ctx_ids and silently defeat the
    // guarantee the capability advertises. Durable backends should
    // enforce a UNIQUE constraint on `(agent_id, idempotency_key)` and
    // insert it in the same transaction as the context row.

    /// Look up a prior publish record for `(agent_id, key)`.
    ///
    /// Returns `Some((content_hash, response))` if a record exists and
    /// has not expired. Scoping by `agent_id` prevents a malicious
    /// producer from poisoning another producer's key namespace
    /// (RFC-ACDP-0003 §6 — idem-004 fixture).
    ///
    /// The default implementation always answers `Ok(None)`.
    fn idempotency_lookup(
        &self,
        _agent_id: &AgentDid,
        _key: &str,
    ) -> Result<Option<IdempotencyRecord>, AcdpError> {
        Ok(None)
    }

    /// Record a successful publish under `(agent_id, key)` with TTL
    /// `expires_at`. Calling on a store that does not support
    /// idempotency is a no-op.
    ///
    /// The default implementation discards its arguments and returns
    /// `Ok(())`.
    fn idempotency_record(
        &self,
        _agent_id: &AgentDid,
        _key: &str,
        _hash: &acdp_types::primitives::ContentHash,
        _response: &acdp_types::publish::PublishResponse,
        _expires_at: chrono::DateTime<chrono::Utc>,
    ) -> Result<(), AcdpError> {
        Ok(())
    }

    /// Evict records whose `expires_at` is past `now`. Implementations
    /// may call this on a janitor schedule or lazily at lookup time.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn idempotency_evict_expired(
        &self,
        _now: chrono::DateTime<chrono::Utc>,
    ) -> Result<(), AcdpError> {
        Ok(())
    }

    // ── Atomic publish commit (FEAT-01) ────────────────────────────────

    /// Atomically commit a publish: idempotency lookup, supersession
    /// validation, body insertion, predecessor supersession marking,
    /// and idempotency record write — all under a single critical
    /// section so two concurrent publishes targeting the same
    /// `supersedes` (or sharing an `idempotency_key`) cannot both
    /// succeed.
    ///
    /// Eliminates the TOCTOU races that the old
    /// `put → mark_superseded → idempotency_record` sequence allowed.
    /// Returns:
    /// - `Inserted(response)` — the body was newly persisted.
    /// - `IdempotentReplay(response)` — a prior record with the same
    ///   `(agent_id, key, content_hash)` was found and its response is
    ///   replayed verbatim (idem-002).
    ///
    /// On supersession contention (predecessor already marked
    /// `Superseded`, lineage mismatch, etc.) returns
    /// `AcdpError::SupersededTarget { reason, … }`. On idempotency-key
    /// collision with a different `content_hash` returns
    /// `AcdpError::DuplicatePublish` (idem-003).
    ///
    /// This method has no default body: every backend must provide its
    /// own atomic implementation.
    fn commit_publish(&self, commit: PublishCommit<'_>) -> Result<PublishCommitOutcome, AcdpError>;
}
176
/// Single-shot atomic publish input (FEAT-01).
///
/// Passed to [`RegistryStore::commit_publish`] so the predecessor
/// lookup, supersession check, body insertion, predecessor
/// supersession marking, and idempotency record are all done under one
/// critical section. Eliminates TOCTOU races between two concurrent
/// publishes that target the same `supersedes` ctx_id or share an
/// `idempotency_key`.
///
/// Every field borrows for `'a`, so the value is a cheap bundle of
/// references that is consumed by the commit call.
pub struct PublishCommit<'a> {
    /// The validated, signature-verified publish request.
    pub req: &'a PublishRequest,
    /// The hostname the registry serves — used to mint `ctx_id`s and
    /// stored verbatim into `body.origin_registry` (BUG-01).
    pub authority: &'a str,
    /// Idempotency wiring, present iff the registry advertises
    /// `caps.supports_idempotency_key` and the request carries an
    /// `Idempotency-Key`.
    pub idempotency: Option<PendingIdempotencyCommit<'a>>,
    /// Tenant this publish is scoped to, if the registry is multi-tenant.
    /// `None` means untenanted (single-tenant / V0). A multi-tenant store
    /// MUST persist this atomically with the context row so the tenancy is
    /// never observable as the default bucket (and never stranded there on a
    /// crash between insert and a separate stamping UPDATE). Stores that do
    /// not implement tenancy ignore it.
    pub tenant: Option<&'a str>,
    /// Receipt minting hook (ACDP 0.2, RFC-ACDP-0010). When present,
    /// the store MUST invoke it with the fully assigned [`Body`]
    /// (ctx_id / lineage_id / created_at populated) **inside the same
    /// critical section / transaction as the insert**, persist the
    /// returned receipt with the context, and include it in the
    /// response. A context published under the
    /// `acdp-registry-receipts` profile must never exist without its
    /// receipt — a crash between insert and mint must not be
    /// observable. `None` for receipt-less (0.1.0-mode) registries.
    // Silences clippy's `type_complexity` lint for the `dyn Fn` signature
    // spelled inline below instead of behind a type alias.
    #[allow(clippy::type_complexity)]
    pub receipt_minter:
        Option<&'a (dyn Fn(&Body) -> Result<serde_json::Value, AcdpError> + Send + Sync)>,
}
215
/// Idempotency parameters threaded through [`PublishCommit`].
///
/// Carries a relative TTL rather than an absolute deadline; the
/// in-memory store turns it into `expires_at` by adding it to the
/// commit-time clock reading.
pub struct PendingIdempotencyCommit<'a> {
    /// The key the producer supplied in the `Idempotency-Key` header.
    pub key: &'a str,
    /// TTL after which the idempotency record may be evicted (typically
    /// `caps.limits.idempotency_key_ttl_seconds`).
    pub ttl: chrono::Duration,
}
224
/// Outcome of an atomic [`RegistryStore::commit_publish`].
///
/// Both variants carry a [`PublishResponse`]; the variant tells the
/// caller whether storage was actually modified.
#[derive(Debug)]
pub enum PublishCommitOutcome {
    /// Fresh publish — the body was newly persisted and the response
    /// describes the just-assigned identifiers.
    Inserted(PublishResponse),
    /// `(agent_id, idempotency_key)` had a prior record with the same
    /// `content_hash` — return the original response per idem-002.
    IdempotentReplay(PublishResponse),
}
235
/// Cached publish response keyed by `(agent_id, idempotency_key)`
/// (RFC-ACDP-0003 §6).
///
/// The key itself is not stored in the record; it lives in whatever
/// index the backend keeps (the in-memory store uses a map keyed by the
/// `(agent_did, idempotency_key)` pair).
#[derive(Debug, Clone)]
pub struct IdempotencyRecord {
    /// The original request's `content_hash`. A retry with the same key
    /// but a different hash MUST be rejected as `duplicate_publish`.
    pub content_hash: acdp_types::primitives::ContentHash,
    /// The response the registry returned on the first acceptance.
    pub response: acdp_types::publish::PublishResponse,
    /// Eviction time (TTL window from caps.limits.idempotency_key_ttl_seconds).
    /// A record counts as live only while `expires_at` is strictly later
    /// than the current time.
    pub expires_at: chrono::DateTime<chrono::Utc>,
}
248
249// ── In-memory reference implementation ───────────────────────────────────────
250
/// Minimal in-memory backend. Not durable; intended for tests and
/// prototyping. Concurrency-safe (a single `Mutex` over the table).
#[derive(Default)]
pub struct InMemoryStore {
    /// All mutable state (contexts, lineage index, idempotency records)
    /// behind one lock, so a multi-step operation such as
    /// `commit_publish` can run as a single critical section.
    inner: Mutex<Inner>,
}
257
/// Lock-protected state of [`InMemoryStore`]. Only reachable through the
/// store's mutex guard.
#[derive(Default)]
struct Inner {
    /// All contexts keyed by `ctx_id`. Insertion-ordered per lineage
    /// thanks to the parallel `lineages` index.
    by_ctx: std::collections::BTreeMap<String, FullContext>,
    /// `lineage_id -> [ctx_id, ctx_id, ...]` in publish order.
    /// The first element is the lineage's first version; new ids are
    /// only ever appended.
    lineages: std::collections::BTreeMap<String, Vec<String>>,
    /// `(agent_did, idempotency_key) -> record` (RFC-ACDP-0003 §6).
    idempotency: std::collections::HashMap<(String, String), IdempotencyRecord>,
}
268
269impl InMemoryStore {
270    /// Construct an empty store.
271    pub fn new() -> Self {
272        Self::default()
273    }
274
275    fn lock(&self) -> std::sync::MutexGuard<'_, Inner> {
276        self.inner.lock().expect("InMemoryStore mutex poisoned")
277    }
278}
279
280/// RFC-ACDP-0004 §4 — derive `Status::Expired` from `body.expires_at` at
281/// read time so a registry that does not run a janitor still surfaces the
282/// correct lifecycle status.
283///
284/// `Superseded` outranks `Expired` (consistent with the lifecycle precedence
285/// in RFC-ACDP-0004 §4.1 — once a successor replaces a context, expiry of
286/// the predecessor is irrelevant to the lineage's current view).
287pub(crate) fn project_status(
288    stored: &Status,
289    body: &Body,
290    now: chrono::DateTime<chrono::Utc>,
291) -> Status {
292    match stored {
293        Status::Active => match body.expires_at {
294            Some(exp) if exp <= now => Status::Expired,
295            _ => Status::Active,
296        },
297        other => other.clone(),
298    }
299}
300
301/// Materialize the effective view of a stored context: applies
302/// [`project_status`] to override the stored status when expired.
303pub(crate) fn project_context(
304    mut ctx: FullContext,
305    now: chrono::DateTime<chrono::Utc>,
306) -> FullContext {
307    ctx.registry_state.status = project_status(&ctx.registry_state.status, &ctx.body, now);
308    ctx
309}
310
311/// RFC-ACDP-0008 §4.5 search-disclosure rule.
312///
313/// Note the asymmetry vs retrieval: a `Private` context surfaces in search
314/// **only** to its producer — audience members must already know the
315/// `ctx_id` to fetch it. `Restricted` surfaces to producer + audience.
316///
317/// `anonymous_public_reads` mirrors the capability advertisement
318/// (RFC-ACDP-0008 §4.5): a registry that does NOT permit anonymous
319/// public reads MUST suppress public contexts for unauthenticated
320/// callers in both `retrieve` and `search`. The retrieval helper
321/// already consults this flag; this function pulls it through to the
322/// store-side search path (BUG-02).
323fn can_surface_in_search(
324    body: &Body,
325    requester: Option<&AgentDid>,
326    anonymous_public_reads: bool,
327) -> bool {
328    match body.visibility {
329        Visibility::Public => anonymous_public_reads || requester.is_some(),
330        Visibility::Restricted => match requester {
331            None => false,
332            Some(r) => {
333                r == &body.agent_id
334                    || body
335                        .audience
336                        .as_deref()
337                        .is_some_and(|a| a.iter().any(|d| d == r))
338            }
339        },
340        Visibility::Private => requester == Some(&body.agent_id),
341    }
342}
343
344impl RegistryStore for InMemoryStore {
345    fn put(&self, body: Body) -> Result<(), AcdpError> {
346        let ctx_id = body.ctx_id.0.clone();
347        let lineage_id = body.lineage_id.0.clone();
348        let ctx = FullContext {
349            body,
350            registry_state: RegistryState {
351                status: Status::Active,
352                extensions: Default::default(),
353            },
354            registry_receipt: None,
355            extensions: Default::default(),
356        };
357        let mut g = self.lock();
358        if g.by_ctx.contains_key(&ctx_id) {
359            return Err(AcdpError::SchemaViolation(format!(
360                "duplicate ctx_id '{ctx_id}' in store"
361            )));
362        }
363        g.by_ctx.insert(ctx_id.clone(), ctx);
364        g.lineages.entry(lineage_id).or_default().push(ctx_id);
365        Ok(())
366    }
367
368    fn get(&self, ctx_id: &CtxId) -> Result<Option<FullContext>, AcdpError> {
369        let now = chrono::Utc::now();
370        Ok(self
371            .lock()
372            .by_ctx
373            .get(ctx_id.as_str())
374            .cloned()
375            .map(|c| project_context(c, now)))
376    }
377
378    fn lineage(&self, lineage_id: &LineageId) -> Result<Vec<FullContext>, AcdpError> {
379        let now = chrono::Utc::now();
380        let g = self.lock();
381        let Some(ids) = g.lineages.get(lineage_id.as_str()) else {
382            return Ok(Vec::new());
383        };
384        Ok(ids
385            .iter()
386            .filter_map(|id| g.by_ctx.get(id).cloned().map(|c| project_context(c, now)))
387            .collect())
388    }
389
390    fn current(&self, lineage_id: &LineageId) -> Result<Option<FullContext>, AcdpError> {
391        let now = chrono::Utc::now();
392        let g = self.lock();
393        let Some(ids) = g.lineages.get(lineage_id.as_str()) else {
394            return Ok(None);
395        };
396        // RFC-ACDP-0004 §5: "Returns the unique version that has no
397        // successor. If no such version exists, returns not_found."
398        // Walk newest-to-oldest and return the first non-`Superseded`
399        // version. Both `Active` and `Expired` count — an expired
400        // body that hasn't been replaced is still the latest, and the
401        // consumer needs to see it (with status=Expired) to know it
402        // has lapsed.
403        //
404        // BUG-04: an earlier fallback returned the last entry even when
405        // every version was `Superseded`; that's a protocol violation.
406        // Now we return `None` instead.
407        for id in ids.iter().rev() {
408            if let Some(ctx) = g.by_ctx.get(id) {
409                let projected = project_context(ctx.clone(), now);
410                if !matches!(projected.registry_state.status, Status::Superseded) {
411                    return Ok(Some(projected));
412                }
413            }
414        }
415        Ok(None)
416    }
417
418    fn mark_superseded(&self, ctx_id: &CtxId) -> Result<(), AcdpError> {
419        let mut g = self.lock();
420        if let Some(ctx) = g.by_ctx.get_mut(ctx_id.as_str()) {
421            ctx.registry_state.status = Status::Superseded;
422        }
423        Ok(())
424    }
425
426    fn first_version_ctx_id(&self, lineage_id: &LineageId) -> Result<Option<CtxId>, AcdpError> {
427        let g = self.lock();
428        Ok(g.lineages
429            .get(lineage_id.as_str())
430            .and_then(|ids| ids.first().cloned())
431            .map(CtxId))
432    }
433
434    fn idempotency_lookup(
435        &self,
436        agent_id: &AgentDid,
437        key: &str,
438    ) -> Result<Option<IdempotencyRecord>, AcdpError> {
439        // Lazy TTL eviction at lookup time keeps the table bounded
440        // without requiring a janitor — see idempotency_evict_expired.
441        self.idempotency_evict_expired(chrono::Utc::now())?;
442        let g = self.lock();
443        Ok(g.idempotency
444            .get(&(agent_id.as_str().to_string(), key.to_string()))
445            .cloned())
446    }
447
448    fn idempotency_record(
449        &self,
450        agent_id: &AgentDid,
451        key: &str,
452        hash: &acdp_types::primitives::ContentHash,
453        response: &acdp_types::publish::PublishResponse,
454        expires_at: chrono::DateTime<chrono::Utc>,
455    ) -> Result<(), AcdpError> {
456        let mut g = self.lock();
457        g.idempotency.insert(
458            (agent_id.as_str().to_string(), key.to_string()),
459            IdempotencyRecord {
460                content_hash: hash.clone(),
461                response: response.clone(),
462                expires_at,
463            },
464        );
465        Ok(())
466    }
467
468    fn idempotency_evict_expired(
469        &self,
470        now: chrono::DateTime<chrono::Utc>,
471    ) -> Result<(), AcdpError> {
472        let mut g = self.lock();
473        g.idempotency.retain(|_, r| r.expires_at > now);
474        Ok(())
475    }
476
477    fn commit_publish(&self, commit: PublishCommit<'_>) -> Result<PublishCommitOutcome, AcdpError> {
478        use crate::registry::validator::assign_identifiers;
479
480        let PublishCommit {
481            req,
482            authority,
483            idempotency,
484            // InMemoryStore does not model tenancy (it is a single-tenant
485            // reference/test backend); the durable backends honor this.
486            tenant: _,
487            receipt_minter,
488        } = commit;
489        let now = chrono::Utc::now();
490        let mut g = self.lock();
491
492        // ── 1. Idempotency replay / collision ────────────────────────
493        if let Some(idem) = &idempotency {
494            let idem_key = (req.agent_id.as_str().to_string(), idem.key.to_string());
495            if let Some(prior) = g.idempotency.get(&idem_key) {
496                if prior.expires_at > now {
497                    return if prior.content_hash == req.content_hash {
498                        // idem-002: same key + same hash → replay.
499                        Ok(PublishCommitOutcome::IdempotentReplay(
500                            prior.response.clone(),
501                        ))
502                    } else {
503                        // idem-003: same key + different hash → duplicate_publish.
504                        Err(AcdpError::DuplicatePublish(format!(
505                            "Idempotency-Key '{}' was previously used by '{}' \
506                             with a different content_hash",
507                            idem.key, req.agent_id
508                        )))
509                    };
510                }
511                // Expired record — fall through and overwrite below.
512            }
513        }
514
515        // ── 2. Supersession lookups + coherence checks ──────────────
516        let first_v1 = if let Some(prev) = &req.supersedes {
517            let prev_full = g.by_ctx.get(prev.as_str()).cloned().ok_or_else(|| {
518                AcdpError::SupersededTarget {
519                    reason: acdp_primitives::error::SupersessionReason::NotFound,
520                    message: format!("supersedes target '{prev}' not found in this registry"),
521                }
522            })?;
523
524            // Producer-continuity: only the predecessor's producer (or a
525            // declared contributor) may publish a successor in its lineage.
526            // Signature verification only proves the *requester* signed
527            // their own request — it does not bind `supersedes` to the
528            // predecessor's owner. Without this check any signer could
529            // supersede another producer's context (`Superseded` side
530            // effect below + `current(lineage)` re-pointing), a full
531            // lineage takeover. RFC-ACDP-0001 §5.9 supersession is
532            // producer-scoped.
533            let is_owner = req.agent_id == prev_full.body.agent_id
534                || prev_full.body.contributors.contains(&req.agent_id);
535            if !is_owner {
536                // Uniform with the genuine not-found case above: a
537                // non-owner learns neither that the predecessor exists nor
538                // its version / superseded status (supersession existence
539                // oracle). Anyone who can legitimately read the
540                // predecessor learns nothing new from this shape.
541                return Err(AcdpError::SupersededTarget {
542                    reason: acdp_primitives::error::SupersessionReason::NotFound,
543                    message: format!("supersedes target '{prev}' not found in this registry"),
544                });
545            }
546
547            // Lineage coherence — when the producer self-verifies.
548            if let Some(declared) = &req.lineage_id {
549                if declared != &prev_full.body.lineage_id {
550                    return Err(AcdpError::SupersededTarget {
551                        reason: acdp_primitives::error::SupersessionReason::LineageMismatch,
552                        message: format!(
553                            "declared lineage_id '{declared}' ≠ predecessor's '{}'",
554                            prev_full.body.lineage_id
555                        ),
556                    });
557                }
558            }
559            // Version coherence: new.version MUST be predecessor.version + 1.
560            if req.version != prev_full.body.version + 1 {
561                return Err(AcdpError::SupersededTarget {
562                    reason: acdp_primitives::error::SupersessionReason::VersionMismatch,
563                    message: format!(
564                        "version {} ≠ predecessor.version + 1 ({})",
565                        req.version,
566                        prev_full.body.version + 1
567                    ),
568                });
569            }
570            // FEAT-01 atomicity: the check that previously raced with
571            // another concurrent publish. Now under the same lock as
572            // the insert below — exactly one of two contenders succeeds.
573            if matches!(prev_full.registry_state.status, Status::Superseded) {
574                return Err(AcdpError::SupersededTarget {
575                    reason: acdp_primitives::error::SupersessionReason::AlreadySuperseded,
576                    message: format!("supersedes target '{prev}' has already been superseded"),
577                });
578            }
579
580            // Derive the v1 ctx_id from the predecessor's lineage —
581            // same logic as `first_version_ctx_id`, inlined to stay
582            // under the existing lock.
583            g.lineages
584                .get(prev_full.body.lineage_id.as_str())
585                .and_then(|ids| ids.first().cloned())
586                .map(CtxId)
587        } else {
588            None
589        };
590
591        // ── 3. Identifier assignment ────────────────────────────────
592        let validated = crate::registry::validator::ValidatedPublish {
593            recomputed_hash: req.content_hash.clone(),
594        };
595        let (ctx_id, lineage_id) =
596            assign_identifiers(authority, &req.supersedes, first_v1.as_ref(), &validated)?;
597
598        // ── 4. Build the stored Body ────────────────────────────────
599        let created_at = acdp_primitives::time::trunc_ms(now);
600        let body = Body {
601            ctx_id: ctx_id.clone(),
602            lineage_id: lineage_id.clone(),
603            origin_registry: authority.to_string(),
604            created_at,
605            content_hash: req.content_hash.clone(),
606            signature: req.signature.clone(),
607            version: req.version,
608            supersedes: req.supersedes.clone(),
609            agent_id: req.agent_id.clone(),
610            contributors: req.contributors.clone(),
611            title: req.title.clone(),
612            context_type: req.context_type.clone(),
613            data_refs: req.data_refs.clone(),
614            derived_from: req.derived_from.clone(),
615            visibility: req.visibility.clone(),
616            audience: req.audience.clone(),
617            acdp_version: req.acdp_version.clone(),
618            description: req.description.clone(),
619            summary: req.summary.clone(),
620            tags: req.tags.clone(),
621            domain: req.domain.clone(),
622            expires_at: req.expires_at,
623            data_period: req.data_period.clone(),
624            metadata: req.metadata.clone(),
625            schema_uri: req.schema_uri.clone(),
626            extensions: Default::default(),
627        };
628
629        // ── 5. Insert (mirrors `put` but inline so we keep the lock) ─
630        let ctx_id_str = body.ctx_id.0.clone();
631        let lineage_id_str = body.lineage_id.0.clone();
632        if g.by_ctx.contains_key(&ctx_id_str) {
633            // UUID collision is astronomically unlikely but we still
634            // surface it as a SchemaViolation rather than silently
635            // overwriting.
636            return Err(AcdpError::SchemaViolation(format!(
637                "ctx_id collision: '{ctx_id_str}' already exists"
638            )));
639        }
640        // Receipt minting (RFC-ACDP-0010) — inside the critical section,
641        // before the insert becomes visible, so a context published
642        // under the receipts profile never exists without its receipt.
643        let registry_receipt = receipt_minter.map(|mint| mint(&body)).transpose()?;
644
645        let stored = FullContext {
646            body,
647            registry_state: RegistryState {
648                status: Status::Active,
649                extensions: Default::default(),
650            },
651            registry_receipt: registry_receipt.clone(),
652            extensions: Default::default(),
653        };
654        g.by_ctx.insert(ctx_id_str.clone(), stored);
655        g.lineages
656            .entry(lineage_id_str)
657            .or_default()
658            .push(ctx_id_str);
659
660        // ── 6. Mark predecessor superseded ──────────────────────────
661        if let Some(prev) = &req.supersedes {
662            if let Some(prev_ctx) = g.by_ctx.get_mut(prev.as_str()) {
663                prev_ctx.registry_state.status = Status::Superseded;
664            }
665        }
666
667        let response = PublishResponse {
668            ctx_id,
669            lineage_id,
670            version: req.version,
671            created_at,
672            status: Status::Active,
673            registry_receipt,
674        };
675
676        // ── 7. Idempotency record ───────────────────────────────────
677        if let Some(idem) = idempotency {
678            let expires_at = now + idem.ttl;
679            g.idempotency.insert(
680                (req.agent_id.as_str().to_string(), idem.key.to_string()),
681                IdempotencyRecord {
682                    content_hash: req.content_hash.clone(),
683                    response: response.clone(),
684                    expires_at,
685                },
686            );
687        }
688
689        Ok(PublishCommitOutcome::Inserted(response))
690    }
691
692    fn search(
693        &self,
694        params: &SearchParams,
695        requester: Option<&AgentDid>,
696        anonymous_public_reads: bool,
697    ) -> Result<SearchResponse, AcdpError> {
698        let g = self.lock();
699        let now = chrono::Utc::now();
700
701        let q_lower = params.q.as_deref().map(str::to_lowercase);
702        let domain = params.domain.as_deref();
703        let agent = params.agent_id.as_deref();
704        let context_type = params.context_type.as_deref();
705        let derived_from = params.derived_from.as_deref();
706        let schema_uri = params.schema_uri.as_deref();
707        let tags: Option<Vec<&str>> = params.tags.as_deref().map(|s| {
708            s.split(',')
709                .map(str::trim)
710                .filter(|t| !t.is_empty())
711                .collect()
712        });
713
714        // BUG-10: parse date-time filter params at the boundary so the
715        // hot loop just compares DateTime<Utc> values.
716        let created_after = parse_opt_rfc3339(&params.created_after)?;
717        let created_before = parse_opt_rfc3339(&params.created_before)?;
718        let dp_start_after = parse_opt_rfc3339(&params.data_period_start_after)?;
719        let dp_end_before = parse_opt_rfc3339(&params.data_period_end_before)?;
720        let expires_after = parse_opt_rfc3339(&params.expires_after)?;
721        let expires_before = parse_opt_rfc3339(&params.expires_before)?;
722
723        let mut matches: Vec<&FullContext> = g
724            .by_ctx
725            .values()
726            .filter(|ctx| {
727                let body = &ctx.body;
728
729                // RFC-ACDP-0008 §4.5 search-disclosure gate (note the
730                // private/restricted asymmetry: private contexts surface
731                // in search only to their producer).
732                if !can_surface_in_search(body, requester, anonymous_public_reads) {
733                    return false;
734                }
735
736                if let Some(q) = &q_lower {
737                    let haystack = format!(
738                        "{} {} {} {} {} {}",
739                        body.title,
740                        body.description.as_deref().unwrap_or(""),
741                        body.summary.as_deref().unwrap_or(""),
742                        body.domain.as_deref().unwrap_or(""),
743                        body.agent_id.as_str(),
744                        body.tags.as_ref().map(|t| t.join(" ")).unwrap_or_default(),
745                    )
746                    .to_lowercase();
747                    if !haystack.contains(q) {
748                        return false;
749                    }
750                }
751                if let Some(d) = domain {
752                    if body.domain.as_deref() != Some(d) {
753                        return false;
754                    }
755                }
756                if let Some(a) = agent {
757                    if body.agent_id.as_str() != a {
758                        return false;
759                    }
760                }
761                if let Some(t) = context_type {
762                    let body_type = serde_json::to_value(&body.context_type)
763                        .ok()
764                        .and_then(|v| v.as_str().map(str::to_string))
765                        .unwrap_or_default();
766                    if body_type != t {
767                        return false;
768                    }
769                }
770                if let Some(df) = derived_from {
771                    if !body.derived_from.iter().any(|c| c.as_str() == df) {
772                        return false;
773                    }
774                }
775                if let Some(req_tags) = &tags {
776                    let body_tags = body.tags.as_deref().unwrap_or(&[]);
777                    if !req_tags.iter().all(|t| body_tags.iter().any(|bt| bt == t)) {
778                        return false;
779                    }
780                }
781                if let Some(uri) = schema_uri {
782                    if body.schema_uri.as_deref() != Some(uri) {
783                        return false;
784                    }
785                }
786                if let Some(after) = created_after {
787                    if body.created_at < after {
788                        return false;
789                    }
790                }
791                if let Some(before) = created_before {
792                    if body.created_at > before {
793                        return false;
794                    }
795                }
796                if let Some(after) = dp_start_after {
797                    match &body.data_period {
798                        Some(p) if p.start >= after => {}
799                        _ => return false,
800                    }
801                }
802                if let Some(before) = dp_end_before {
803                    match &body.data_period {
804                        Some(p) if p.end <= before => {}
805                        _ => return false,
806                    }
807                }
808                if let Some(after) = expires_after {
809                    match body.expires_at {
810                        Some(e) if e >= after => {}
811                        _ => return false,
812                    }
813                }
814                if let Some(before) = expires_before {
815                    match body.expires_at {
816                        Some(e) if e <= before => {}
817                        _ => return false,
818                    }
819                }
820                // Status filter — registry default is `active`. Compare
821                // against PROJECTED status so a stored-Active body whose
822                // expires_at has passed is filtered out (RFC-ACDP-0004 §4).
823                let want_status = params.status.as_deref().unwrap_or("active");
824                let effective = project_status(&ctx.registry_state.status, body, now);
825                if effective.as_str() != want_status {
826                    return false;
827                }
828                true
829            })
830            .collect();
831
832        // Newest first; IMP-03 — fall back to ctx_id for a deterministic
833        // total order when many contexts share a millisecond.
834        matches.sort_by(|a, b| {
835            b.body
836                .created_at
837                .cmp(&a.body.created_at)
838                .then_with(|| a.body.ctx_id.as_str().cmp(b.body.ctx_id.as_str()))
839        });
840
841        // BUG-08: capture `total_estimate` BEFORE cursor filtering so
842        // it represents the total count across all pages (RFC-ACDP-0005
843        // §3 — clients use this for "page 1 of N" UIs). If we captured
844        // it after `retain`, page 2 would show "80 matches" for a
845        // 100-item search, page 3 "60", and so on.
846        let total_estimate = Some(matches.len() as u64);
847
848        // BUG-10 cursor: opaque base64 of "<created_at_ms>:<ctx_id>".
849        // ≥1h validity is implicit — cursors do not embed a timestamp,
850        // so they remain valid until the underlying context is deleted.
851        let cursor_anchor = params
852            .cursor
853            .as_deref()
854            .map(decode_cursor)
855            .transpose()?
856            .flatten();
857        if let Some((anchor_ms, anchor_id)) = &cursor_anchor {
858            matches.retain(|c| {
859                let ms = c.body.created_at.timestamp_millis();
860                ms < *anchor_ms || (ms == *anchor_ms && c.body.ctx_id.as_str() > anchor_id.as_str())
861            });
862        }
863
864        // Clamp into [1, 100]: `limit` is attacker-controlled and never
865        // lower-bounded. `limit=0` with ≥1 match would compute `limit - 1`
866        // below → debug-build subtraction panic (request-thread DoS) /
867        // release-build wrap to usize::MAX → broken pagination.
868        let limit = params.limit.unwrap_or(50).clamp(1, 100) as usize;
869        let next_cursor = if matches.len() > limit {
870            matches.get(limit - 1).map(|c| {
871                encode_cursor(c.body.created_at.timestamp_millis(), c.body.ctx_id.as_str())
872            })
873        } else {
874            None
875        };
876
877        let projected: Vec<SearchResult> = matches
878            .iter()
879            .take(limit)
880            .map(|ctx| SearchResult {
881                ctx_id: ctx.body.ctx_id.clone(),
882                lineage_id: ctx.body.lineage_id.clone(),
883                agent_id: ctx.body.agent_id.clone(),
884                title: ctx.body.title.clone(),
885                summary: ctx.body.summary.clone(),
886                context_type: ctx.body.context_type.clone(),
887                domain: ctx.body.domain.clone(),
888                created_at: ctx.body.created_at,
889                status: project_status(&ctx.registry_state.status, &ctx.body, now),
890                // RFC-ACDP-0008 §4.5: only disclose visibility when the
891                // requester is authorized for it. Public is always safe.
892                // For restricted/private, the search filter above guarantees
893                // the requester is producer-or-audience, so it's safe to
894                // surface the label.
895                visibility: Some(ctx.body.visibility.clone()),
896            })
897            .collect();
898
899        Ok(SearchResponse {
900            matches: projected,
901            total_estimate,
902            next_cursor,
903        })
904    }
905}
906
907/// Parse an optional RFC 3339 string parameter; surface a
908/// [`AcdpError::SchemaViolation`] on malformed input.
909fn parse_opt_rfc3339(
910    s: &Option<String>,
911) -> Result<Option<chrono::DateTime<chrono::Utc>>, AcdpError> {
912    let Some(raw) = s.as_deref() else {
913        return Ok(None);
914    };
915    let dt = chrono::DateTime::parse_from_rfc3339(raw)
916        .map_err(|e| AcdpError::SchemaViolation(format!("malformed datetime '{raw}': {e}")))?;
917    Ok(Some(dt.with_timezone(&chrono::Utc)))
918}
919
/// Cursor TTL — clients SHOULD re-fetch after this window.
/// RFC-ACDP-0005 §3 leaves the exact value to implementations; 1 hour
/// matches the common "≥1h" cursor-validity expectation.
///
/// [`decode_cursor`] compares the mint timestamp embedded in a cursor
/// against this window and returns `AcdpError::CursorExpired` once the
/// cursor's age exceeds it.
const CURSOR_TTL: chrono::Duration = chrono::Duration::seconds(3600);
924
925/// Opaque cursor encoding — base64 of
926/// `<mint_unix_ms>:<created_at_millis>:<ctx_id>`.
927///
928/// The `mint_unix_ms` prefix lets [`decode_cursor`] enforce the
929/// `CURSOR_TTL` window and surface `AcdpError::CursorExpired` rather
930/// than silently accepting an ancient cursor (FEAT-04). Plain
931/// `STANDARD` engine so cursors are stable across machines.
932fn encode_cursor(created_at_ms: i64, ctx_id: &str) -> String {
933    use base64::{engine::general_purpose::STANDARD, Engine};
934    let mint_ms = chrono::Utc::now().timestamp_millis();
935    STANDARD.encode(format!("{mint_ms}:{created_at_ms}:{ctx_id}"))
936}
937
938fn decode_cursor(s: &str) -> Result<Option<(i64, String)>, AcdpError> {
939    use base64::{engine::general_purpose::STANDARD, Engine};
940    let bytes = STANDARD
941        .decode(s)
942        .map_err(|_| AcdpError::InvalidCursor("cursor is not valid base64".into()))?;
943    let decoded = String::from_utf8(bytes)
944        .map_err(|_| AcdpError::InvalidCursor("cursor is not utf-8".into()))?;
945    // Format: "<mint_ms>:<anchor_ms>:<ctx_id>". The mint timestamp
946    // prefix is FEAT-04 expiry tracking.
947    let mut parts = decoded.splitn(3, ':');
948    let mint_str = parts
949        .next()
950        .ok_or_else(|| AcdpError::InvalidCursor("cursor missing mint timestamp".into()))?;
951    let anchor_str = parts
952        .next()
953        .ok_or_else(|| AcdpError::InvalidCursor("cursor missing anchor timestamp".into()))?;
954    let ctx_id = parts
955        .next()
956        .ok_or_else(|| AcdpError::InvalidCursor("cursor missing ctx_id".into()))?;
957    let mint_ms: i64 = mint_str
958        .parse()
959        .map_err(|_| AcdpError::InvalidCursor("cursor mint millis is not an integer".into()))?;
960    let anchor_ms: i64 = anchor_str
961        .parse()
962        .map_err(|_| AcdpError::InvalidCursor("cursor anchor millis is not an integer".into()))?;
963
964    // FEAT-04: reject cursors older than CURSOR_TTL with the
965    // dedicated `cursor_expired` wire code so clients can distinguish
966    // "you typo'd the cursor" (InvalidCursor) from "your cursor aged
967    // out, restart the scan from page 1" (CursorExpired).
968    let now = chrono::Utc::now().timestamp_millis();
969    let age_ms = now.saturating_sub(mint_ms);
970    if age_ms > CURSOR_TTL.num_milliseconds() {
971        // CursorExpired is a unit variant in the wire-error mapping;
972        // the diagnostic ("aged Xms ago") is intentionally surfaced
973        // via Display rather than a payload so it round-trips through
974        // `AcdpError::from_wire_error`.
975        return Err(AcdpError::CursorExpired);
976    }
977    Ok(Some((anchor_ms, ctx_id.to_string())))
978}
979
980#[cfg(test)]
981mod tests {
982    use super::*;
983    use acdp_crypto::SigningKey;
984    use acdp_producer::Producer;
985    use acdp_types::body::{DataPeriod, Signature};
986    use acdp_types::primitives::{AgentDid, ContentHash, ContextType, Visibility};
987    use chrono::Utc;
988
989    fn fake_body(ctx_id: &str, lineage_id: &str, title: &str) -> Body {
990        Body {
991            ctx_id: CtxId(ctx_id.into()),
992            lineage_id: LineageId(lineage_id.into()),
993            origin_registry: "registry.example.com".into(),
994            created_at: Utc::now(),
995            content_hash: ContentHash("sha256:0".into()),
996            signature: Signature {
997                algorithm: "ed25519".into(),
998                key_id: "did:web:agents.example.com:test#key-1".into(),
999                value: "A".repeat(88),
1000            },
1001            version: 1,
1002            supersedes: None,
1003            agent_id: AgentDid::new("did:web:agents.example.com:test"),
1004            contributors: vec![],
1005            title: title.into(),
1006            context_type: ContextType::DataSnapshot,
1007            data_refs: vec![],
1008            derived_from: vec![],
1009            visibility: Visibility::Public,
1010            audience: None,
1011            acdp_version: None,
1012            description: None,
1013            summary: None,
1014            tags: None,
1015            domain: None,
1016            expires_at: None,
1017            data_period: None,
1018            metadata: None,
1019            schema_uri: None,
1020            extensions: Default::default(),
1021        }
1022    }
1023
1024    #[test]
1025    fn put_get_round_trip() {
1026        let s = InMemoryStore::new();
1027        let id = "acdp://r/12345678-1234-4321-8123-123456781234";
1028        let lin = "lin:sha256:1111111111111111111111111111111111111111111111111111111111111111";
1029        s.put(fake_body(id, lin, "A")).unwrap();
1030        let got = s.get(&CtxId(id.into())).unwrap().unwrap();
1031        assert_eq!(got.body.title, "A");
1032        assert!(matches!(got.registry_state.status, Status::Active));
1033    }
1034
1035    #[test]
1036    fn lineage_orders_by_publish_order() {
1037        let s = InMemoryStore::new();
1038        let lin = "lin:sha256:2222222222222222222222222222222222222222222222222222222222222222";
1039        let v1 = "acdp://r/12345678-1234-4321-8123-000000000001";
1040        let v2 = "acdp://r/12345678-1234-4321-8123-000000000002";
1041        s.put(fake_body(v1, lin, "v1")).unwrap();
1042        s.put(fake_body(v2, lin, "v2")).unwrap();
1043        let lineage = s.lineage(&LineageId(lin.into())).unwrap();
1044        assert_eq!(lineage.len(), 2);
1045        assert_eq!(lineage[0].body.title, "v1");
1046        assert_eq!(lineage[1].body.title, "v2");
1047    }
1048
1049    #[test]
1050    fn supersession_marks_predecessor() {
1051        let s = InMemoryStore::new();
1052        let lin = "lin:sha256:3333333333333333333333333333333333333333333333333333333333333333";
1053        let v1 = "acdp://r/12345678-1234-4321-8123-000000000003";
1054        s.put(fake_body(v1, lin, "v1")).unwrap();
1055        s.mark_superseded(&CtxId(v1.into())).unwrap();
1056        let got = s.get(&CtxId(v1.into())).unwrap().unwrap();
1057        assert!(matches!(got.registry_state.status, Status::Superseded));
1058    }
1059
1060    // BUG-11 — Status::Expired derived from body.expires_at at read time.
1061
1062    fn expired_body(
1063        ctx_id: &str,
1064        lineage_id: &str,
1065        title: &str,
1066        expires_at: chrono::DateTime<chrono::Utc>,
1067    ) -> Body {
1068        let mut b = fake_body(ctx_id, lineage_id, title);
1069        b.expires_at = Some(expires_at);
1070        b
1071    }
1072
1073    #[test]
1074    fn get_projects_active_to_expired_when_past_expires_at() {
1075        use chrono::Duration;
1076        let s = InMemoryStore::new();
1077        let lin = "lin:sha256:5555555555555555555555555555555555555555555555555555555555555555";
1078        let id = "acdp://r/12345678-1234-4321-8123-000000000006";
1079        s.put(expired_body(
1080            id,
1081            lin,
1082            "old",
1083            chrono::Utc::now() - Duration::hours(1),
1084        ))
1085        .unwrap();
1086        let got = s.get(&CtxId(id.into())).unwrap().unwrap();
1087        assert!(
1088            matches!(got.registry_state.status, Status::Expired),
1089            "expected Status::Expired projection, got {:?}",
1090            got.registry_state.status
1091        );
1092    }
1093
1094    #[test]
1095    fn get_keeps_active_when_expires_at_in_future() {
1096        use chrono::Duration;
1097        let s = InMemoryStore::new();
1098        let lin = "lin:sha256:6666666666666666666666666666666666666666666666666666666666666666";
1099        let id = "acdp://r/12345678-1234-4321-8123-000000000007";
1100        s.put(expired_body(
1101            id,
1102            lin,
1103            "fresh",
1104            chrono::Utc::now() + Duration::hours(1),
1105        ))
1106        .unwrap();
1107        let got = s.get(&CtxId(id.into())).unwrap().unwrap();
1108        assert!(matches!(got.registry_state.status, Status::Active));
1109    }
1110
1111    #[test]
1112    fn search_status_active_filters_out_expired() {
1113        use chrono::Duration;
1114        let s = InMemoryStore::new();
1115        let lin = "lin:sha256:7777777777777777777777777777777777777777777777777777777777777777";
1116        let id = "acdp://r/12345678-1234-4321-8123-000000000008";
1117        s.put(expired_body(
1118            id,
1119            lin,
1120            "old",
1121            chrono::Utc::now() - Duration::hours(1),
1122        ))
1123        .unwrap();
1124        let resp = s.search(&SearchParams::default(), None, true).unwrap();
1125        assert!(
1126            resp.matches.is_empty(),
1127            "expired must not surface under status=active default"
1128        );
1129        // Asking for `expired` SHOULD surface it.
1130        let resp = s
1131            .search(
1132                &SearchParams {
1133                    status: Some("expired".into()),
1134                    ..Default::default()
1135                },
1136                None,
1137                true,
1138            )
1139            .unwrap();
1140        assert_eq!(resp.matches.len(), 1);
1141    }
1142
1143    /// BUG-10 — date/time filter is honored.
1144    #[test]
1145    fn search_filters_by_created_after() {
1146        let s = InMemoryStore::new();
1147        let lin = "lin:sha256:8888888888888888888888888888888888888888888888888888888888888888";
1148        let mut body = fake_body(
1149            "acdp://r/12345678-1234-4321-8123-000000000009",
1150            lin,
1151            "match",
1152        );
1153        body.created_at = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00.000Z")
1154            .unwrap()
1155            .with_timezone(&chrono::Utc);
1156        s.put(body).unwrap();
1157        // `created_after` AFTER body.created_at → 0 matches
1158        let resp = s
1159            .search(
1160                &SearchParams {
1161                    created_after: Some("2026-02-01T00:00:00.000Z".into()),
1162                    ..Default::default()
1163                },
1164                None,
1165                true,
1166            )
1167            .unwrap();
1168        assert_eq!(resp.matches.len(), 0);
1169        // `created_after` BEFORE body.created_at → 1 match
1170        let resp = s
1171            .search(
1172                &SearchParams {
1173                    created_after: Some("2025-12-01T00:00:00.000Z".into()),
1174                    ..Default::default()
1175                },
1176                None,
1177                true,
1178            )
1179            .unwrap();
1180        assert_eq!(resp.matches.len(), 1);
1181    }
1182
1183    #[test]
1184    fn search_invalid_rfc3339_filter_rejected() {
1185        let s = InMemoryStore::new();
1186        let err = s
1187            .search(
1188                &SearchParams {
1189                    created_after: Some("not-a-date".into()),
1190                    ..Default::default()
1191                },
1192                None,
1193                true,
1194            )
1195            .unwrap_err();
1196        assert!(matches!(err, AcdpError::SchemaViolation(_)));
1197    }
1198
1199    /// BUG-10 cursor round-trips and pages correctly.
1200    #[test]
1201    fn search_cursor_pages_results() {
1202        let s = InMemoryStore::new();
1203        let lin = "lin:sha256:9999999999999999999999999999999999999999999999999999999999999999";
1204        // Insert 5 contexts with distinct created_at so order is deterministic.
1205        let base = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00.000Z")
1206            .unwrap()
1207            .with_timezone(&chrono::Utc);
1208        for i in 0..5u8 {
1209            let mut body = fake_body(
1210                &format!("acdp://r/12345678-1234-4321-8123-00000000010{i}"),
1211                lin,
1212                "match",
1213            );
1214            body.created_at = base + chrono::Duration::minutes(i as i64);
1215            s.put(body).unwrap();
1216        }
1217        let p1 = s
1218            .search(
1219                &SearchParams {
1220                    limit: Some(2),
1221                    ..Default::default()
1222                },
1223                None,
1224                true,
1225            )
1226            .unwrap();
1227        assert_eq!(p1.matches.len(), 2);
1228        let cursor = p1.next_cursor.expect("page 1 should carry a cursor");
1229        let p2 = s
1230            .search(
1231                &SearchParams {
1232                    limit: Some(2),
1233                    cursor: Some(cursor.clone()),
1234                    ..Default::default()
1235                },
1236                None,
1237                true,
1238            )
1239            .unwrap();
1240        assert_eq!(p2.matches.len(), 2);
1241        // No overlap between page 1 and page 2.
1242        for r in &p2.matches {
1243            assert!(
1244                !p1.matches.iter().any(|q| q.ctx_id == r.ctx_id),
1245                "page 2 overlapped page 1"
1246            );
1247        }
1248        // BUG-08: total_estimate MUST be stable across pages — captured
1249        // BEFORE cursor filtering. Before the fix, page 2 reported a
1250        // smaller total than page 1 (the remaining-from-cursor count).
1251        assert_eq!(
1252            p1.total_estimate, p2.total_estimate,
1253            "total_estimate MUST be stable across pages (BUG-08); \
1254             p1={:?}, p2={:?}",
1255            p1.total_estimate, p2.total_estimate
1256        );
1257        assert_eq!(
1258            p1.total_estimate,
1259            Some(5),
1260            "total_estimate MUST reflect total matches across all pages, got {:?}",
1261            p1.total_estimate
1262        );
1263    }
1264
1265    #[test]
1266    fn search_limit_zero_does_not_underflow() {
1267        // P1-1: limit=0 with ≥1 match previously computed `limit - 1`
1268        // (debug panic / release wrap). It MUST be clamped to ≥1.
1269        let s = InMemoryStore::new();
1270        let lin = "lin:sha256:8888888888888888888888888888888888888888888888888888888888888888";
1271        for i in 0..3u8 {
1272            let body = fake_body(
1273                &format!("acdp://r/12345678-1234-4321-8123-00000000020{i}"),
1274                lin,
1275                "match",
1276            );
1277            s.put(body).unwrap();
1278        }
1279        let page = s
1280            .search(
1281                &SearchParams {
1282                    limit: Some(0),
1283                    ..Default::default()
1284                },
1285                None,
1286                true,
1287            )
1288            .expect("limit=0 must not panic or error");
1289        // Clamped to 1: one result, and a cursor since more remain.
1290        assert_eq!(page.matches.len(), 1);
1291        assert!(page.next_cursor.is_some());
1292    }
1293
1294    #[test]
1295    fn search_malformed_cursor_rejected() {
1296        let s = InMemoryStore::new();
1297        let err = s
1298            .search(
1299                &SearchParams {
1300                    cursor: Some("not_base64!@#".into()),
1301                    ..Default::default()
1302                },
1303                None,
1304                true,
1305            )
1306            .unwrap_err();
1307        assert!(matches!(err, AcdpError::InvalidCursor(_)));
1308    }
1309
1310    /// FEAT-04: a cursor whose embedded mint timestamp is older than
1311    /// `CURSOR_TTL` MUST surface as `CursorExpired`, not `InvalidCursor`.
1312    /// Clients distinguish "you typo'd the cursor" from "your cursor
1313    /// aged out, restart the scan from page 1".
1314    #[test]
1315    fn search_aged_cursor_rejected_as_cursor_expired() {
1316        use base64::{engine::general_purpose::STANDARD, Engine};
1317        let s = InMemoryStore::new();
1318        // Mint a cursor 7200s in the past — twice the 3600s TTL.
1319        let stale_mint_ms = chrono::Utc::now().timestamp_millis() - 7200 * 1000;
1320        let aged = STANDARD.encode(format!(
1321            "{stale_mint_ms}:0:acdp://r/12345678-1234-4321-8123-1234567812aa"
1322        ));
1323        let err = s
1324            .search(
1325                &SearchParams {
1326                    cursor: Some(aged),
1327                    ..Default::default()
1328                },
1329                None,
1330                true,
1331            )
1332            .unwrap_err();
1333        assert!(
1334            matches!(err, AcdpError::CursorExpired),
1335            "expired cursor MUST surface CursorExpired, got {err:?}"
1336        );
1337    }
1338
1339    #[test]
1340    fn search_filters_by_status_default_active() {
1341        let s = InMemoryStore::new();
1342        let lin = "lin:sha256:4444444444444444444444444444444444444444444444444444444444444444";
1343        let v1 = "acdp://r/12345678-1234-4321-8123-000000000004";
1344        let v2 = "acdp://r/12345678-1234-4321-8123-000000000005";
1345        s.put(fake_body(v1, lin, "old")).unwrap();
1346        s.put(fake_body(v2, lin, "new")).unwrap();
1347        s.mark_superseded(&CtxId(v1.into())).unwrap();
1348        let resp = s
1349            .search(
1350                &SearchParams {
1351                    q: Some("old".into()),
1352                    ..Default::default()
1353                },
1354                None,
1355                true,
1356            )
1357            .unwrap();
1358        // Only `active` matches — superseded "old" filtered out.
1359        assert_eq!(resp.matches.len(), 0);
1360        let resp = s
1361            .search(
1362                &SearchParams {
1363                    q: Some("new".into()),
1364                    ..Default::default()
1365                },
1366                None,
1367                true,
1368            )
1369            .unwrap();
1370        assert_eq!(resp.matches.len(), 1);
1371    }
1372
1373    /// End-to-end: producer → server pipeline using the actual signing
1374    /// path. Uses a builder and the `RegistryServer` (see server.rs)
1375    /// to confirm the integration story.
1376    #[test]
1377    fn store_round_trip_from_real_publish_request() {
1378        use crate::registry::server::RegistryServer;
1379        use acdp_types::capabilities::{CapabilitiesDocument, Limits};
1380
1381        let key = SigningKey::from_bytes(&[7u8; 32]);
1382        let p = Producer::new(
1383            key,
1384            AgentDid::new("did:web:agents.example.com:test"),
1385            "did:web:agents.example.com:test#key-1",
1386        );
1387        let req = p
1388            .publish_request()
1389            .title("hello")
1390            .context_type(ContextType::DataSnapshot)
1391            .visibility(Visibility::Public)
1392            .build()
1393            .unwrap();
1394
1395        let caps = CapabilitiesDocument {
1396            acdp_version: "0.1.0".into(),
1397            registry_did: "did:web:registry.example.com".into(),
1398            supported_signature_algorithms: vec!["ed25519".into()],
1399            supported_did_methods: vec!["did:web".into()],
1400            profiles: vec!["acdp-registry-core".into()],
1401            limits: Limits {
1402                max_payload_bytes: 1_048_576,
1403                max_embedded_bytes: 65_536,
1404                idempotency_key_ttl_seconds: None,
1405            },
1406            read_authentication_methods: vec![],
1407            anonymous_public_reads: true,
1408            supports_idempotency_key: false,
1409            extensions: Default::default(),
1410        };
1411
1412        let server = RegistryServer::new(InMemoryStore::new(), caps, "registry.example.com");
1413        let resp = server.publish_unverified_for_tests(&req).unwrap();
1414        assert_eq!(resp.version, 1);
1415        let ctx = server.retrieve(&resp.ctx_id, None).unwrap().unwrap();
1416        assert_eq!(ctx.body.title, "hello");
1417
1418        // Ignore unused imports under different feature combinations
1419        let _: Option<DataPeriod> = ctx.body.data_period.clone();
1420    }
1421}