Skip to main content

acdp_server/registry/
server.rs

1//! Logical registry handler (feature = "server").
2//!
3//! Wires [`PublishValidator`] together with a [`RegistryStore`] backend
4//! to provide the seven core registry operations enumerated in
5//! RFC-ACDP-0003 §2.1 and RFC-ACDP-0005:
6//!
7//! - capabilities — return the [`CapabilitiesDocument`].
8//! - publish — validate, verify signature, assign identifiers, persist.
9//! - retrieve — fetch a stored body + registry_state (visibility-filtered).
10//! - retrieve_body — fetch just the body (visibility-filtered).
11//! - lineage / current — lineage graph queries.
12//! - search — keyword + filter projection (visibility-filtered).
13//!
14//! This is the building block an HTTP-binding layer can sit on top of;
15//! the integration tests in this crate exercise it directly without
16//! mocking.
17//!
18//! # Conformant publish
19//!
20//! [`RegistryServer::publish_verified`] runs the full RFC-ACDP-0003 §2.1
21//! algorithm — structural validation, hash recomputation, DID resolution,
22//! signature verification — before persistence. It requires the `client`
23//! feature for [`acdp_did::WebResolver`].
24//!
25//! [`RegistryServer::publish_unverified_for_tests`] performs only steps
26//! 1–6 (skipping DID resolution + signature verification) and is
27//! intentionally **not** RFC-conformant; use only in tests where DID
28//! resolution would require a live network or mock server.
29
30use crate::registry::rate_limit::{NoopRateLimiter, RateLimiter};
31use crate::registry::store::RegistryStore;
32use crate::registry::validator::PublishValidator;
33use acdp_primitives::error::AcdpError;
34use acdp_types::{
35    body::{Body, FullContext},
36    capabilities::CapabilitiesDocument,
37    primitives::{AgentDid, CtxId, LineageId, Status, Visibility},
38    publish::{PublishRequest, PublishResponse},
39    search::{SearchParams, SearchResponse},
40};
41
42/// Logical registry handler over an arbitrary [`RegistryStore`].
43///
44/// `L` is the rate-limiting policy (RFC-ACDP-0008 §4.3). The default
45/// [`NoopRateLimiter`] accepts every publish; operators that need a
46/// real limiter construct via [`Self::with_rate_limiter`].
47pub struct RegistryServer<S: RegistryStore, L: RateLimiter = NoopRateLimiter> {
48    store: S,
49    caps: CapabilitiesDocument,
50    authority: String,
51    rate_limiter: L,
52    /// Receipt minting identity (ACDP 0.2, RFC-ACDP-0010). `None` =
53    /// 0.1.0-mode registry (no receipts). Set via
54    /// [`Self::with_receipt_signer`], which also advertises the
55    /// `acdp-registry-receipts` profile.
56    receipt_signer: Option<acdp_types::receipt::ReceiptSigner>,
57}
58
59impl<S: RegistryStore> RegistryServer<S, NoopRateLimiter> {
60    /// Unchecked constructor. Skips capabilities and DID-authority binding
61    /// validation; prefer [`Self::try_new`] in production. Retained for
62    /// tests that build a server from known-good fixtures.
63    #[doc(hidden)]
64    pub fn new(store: S, caps: CapabilitiesDocument, authority: impl Into<String>) -> Self {
65        Self {
66            store,
67            caps,
68            authority: authority.into(),
69            rate_limiter: NoopRateLimiter,
70            receipt_signer: None,
71        }
72    }
73
74    /// Production constructor.
75    ///
76    /// Validates that `authority` is a bare lowercase DNS hostname,
77    /// validates capabilities against RFC-ACDP-0007 §3, and enforces that
78    /// `caps.registry_did` equals `did:web:<authority>` (per
79    /// RFC-ACDP-0006 §4.1 step 3 — the registry's DID document binds it
80    /// to the authority it claims).
81    ///
82    /// A `host:port`, scheme-prefixed, or uppercase authority is rejected:
83    /// the server uses `authority` to mint `ctx_id` (`acdp://<authority>/…`)
84    /// and `origin_registry`, and a colon or slash there violates the
85    /// `acdp://` URI authority rule (RFC-ACDP-0002 §3.1). For `host:port`
86    /// test setups use [`Self::try_new_for_test_authority`].
87    pub fn try_new(
88        store: S,
89        caps: CapabilitiesDocument,
90        authority: impl Into<String>,
91    ) -> Result<Self, AcdpError> {
92        let authority = authority.into();
93        // Production authority MUST be a bare lowercase DNS hostname — no
94        // port, no scheme, no DID prefix (RFC-ACDP-0002 §3.1).
95        if !acdp_types::primitives::is_valid_dns_authority(&authority) {
96            return Err(AcdpError::SchemaViolation(format!(
97                "registry authority '{authority}' is not a valid DNS hostname \
98                 (must be lowercase labels, e.g. 'registry.example.com'); \
99                 use RegistryServer::try_new_for_test_authority for host:port test setups"
100            )));
101        }
102        acdp_validation::validate_capabilities(&caps)?;
103        // BUG-06: percent-encode `:` in `host:port` authorities — the
104        // colon is a structural separator in did:web.
105        let expected_did = acdp_did::authority_to_did_web(&authority);
106        if caps.registry_did != expected_did {
107            return Err(AcdpError::SchemaViolation(format!(
108                "capabilities.registry_did '{}' does not match expected '{expected_did}' \
109                 for authority '{authority}'",
110                caps.registry_did
111            )));
112        }
113        Ok(Self {
114            store,
115            caps,
116            authority,
117            rate_limiter: NoopRateLimiter,
118            receipt_signer: None,
119        })
120    }
121
122    /// Test-only constructor that accepts a `host:port` authority such as
123    /// `"localhost:8443"`. The authority is **not** validated as a DNS
124    /// hostname; capabilities and the DID binding are still checked.
125    ///
126    /// **Non-production only.** A server built with this constructor will
127    /// mint `ctx_id` and `origin_registry` values that do not conform to
128    /// the `acdp://` URI syntax rules (a colon in the authority segment).
129    /// Use [`Self::try_new`] for production registries.
130    #[doc(hidden)]
131    pub fn try_new_for_test_authority(
132        store: S,
133        caps: CapabilitiesDocument,
134        authority: impl Into<String>,
135    ) -> Result<Self, AcdpError> {
136        let authority = authority.into();
137        acdp_validation::validate_capabilities(&caps)?;
138        let expected_did = acdp_did::authority_to_did_web(&authority);
139        if caps.registry_did != expected_did {
140            return Err(AcdpError::SchemaViolation(format!(
141                "capabilities.registry_did '{}' does not match expected '{expected_did}' \
142                 for authority '{authority}'",
143                caps.registry_did
144            )));
145        }
146        Ok(Self {
147            store,
148            caps,
149            authority,
150            rate_limiter: NoopRateLimiter,
151            receipt_signer: None,
152        })
153    }
154}
155
156impl<S: RegistryStore, L: RateLimiter> RegistryServer<S, L> {
157    /// Replace the rate-limiting policy (RFC-ACDP-0008 §4.3).
158    pub fn with_rate_limiter<L2: RateLimiter>(self, limiter: L2) -> RegistryServer<S, L2> {
159        RegistryServer {
160            store: self.store,
161            caps: self.caps,
162            authority: self.authority,
163            rate_limiter: limiter,
164            receipt_signer: self.receipt_signer,
165        }
166    }
167
168    /// Configure receipt minting (ACDP 0.2, RFC-ACDP-0010). Every
169    /// subsequent verified publish mints a registry-signed receipt
170    /// atomically with persistence, returns it in the publish response,
171    /// and serves it on retrieval.
172    ///
173    /// Also advertises the `acdp-registry-receipts` profile — a
174    /// registry without a signing key MUST NOT advertise it, so the
175    /// profile is bound to this call rather than to raw capabilities
176    /// input. Fails if the signer's `registry_did` does not match
177    /// `caps.registry_did` (a receipt minted under a foreign DID would
178    /// fail every consumer's serving-authority cross-check).
179    ///
180    /// Note: [`Self::publish_unverified_for_tests`] never mints — the
181    /// producer key is not resolved on that path, so a fingerprint
182    /// attestation would be false.
183    pub fn with_receipt_signer(
184        mut self,
185        signer: acdp_types::receipt::ReceiptSigner,
186    ) -> Result<Self, AcdpError> {
187        if signer.registry_did() != self.caps.registry_did {
188            return Err(AcdpError::SchemaViolation(format!(
189                "receipt signer registry_did '{}' ≠ capabilities.registry_did '{}'",
190                signer.registry_did(),
191                self.caps.registry_did
192            )));
193        }
194        // RFC-ACDP-0010 §11: registries advertising the receipts
195        // profile MUST advertise acdp_version >= 0.2.0. The version
196        // string must be exactly the `^\d+\.\d+\.\d+$` form the
197        // capabilities schema mandates — malformed input is an error,
198        // never coerced.
199        let parts: Vec<u64> = self
200            .caps
201            .acdp_version
202            .split('.')
203            .map(|p| p.parse::<u64>())
204            .collect::<Result<_, _>>()
205            .map_err(|_| {
206                AcdpError::SchemaViolation(format!(
207                    "capabilities.acdp_version '{}' is not a plain MAJOR.MINOR.PATCH version",
208                    self.caps.acdp_version
209                ))
210            })?;
211        let [major, minor, patch] = parts.as_slice() else {
212            return Err(AcdpError::SchemaViolation(format!(
213                "capabilities.acdp_version '{}' is not a plain MAJOR.MINOR.PATCH version",
214                self.caps.acdp_version
215            )));
216        };
217        if (*major, *minor, *patch) < (0, 2, 0) {
218            return Err(AcdpError::SchemaViolation(format!(
219                "acdp-registry-receipts requires capabilities.acdp_version >= 0.2.0, got '{}'",
220                self.caps.acdp_version
221            )));
222        }
223        let profile = acdp_types::profile::Profile::RegistryReceipts.as_str();
224        if !self.caps.profiles.iter().any(|p| p == profile) {
225            self.caps.profiles.push(profile.to_string());
226        }
227        self.receipt_signer = Some(signer);
228        Ok(self)
229    }
230
231    /// Borrow the underlying store. Useful for tests that want to
232    /// inspect side-effects directly.
233    pub fn store(&self) -> &S {
234        &self.store
235    }
236
237    /// `GET /.well-known/acdp.json`.
238    pub fn capabilities(&self) -> &CapabilitiesDocument {
239        &self.caps
240    }
241
242    /// **RFC-conformant publish.**
243    ///
244    /// Runs RFC-ACDP-0003 §2.1 steps 1–11:
245    ///
246    /// - **1–6.** [`PublishValidator::validate_post_schema`] — schema,
247    ///   payload + embedded size, hash recomputation, algorithm /
248    ///   key_id binding.
249    /// - **7–8.** [`acdp_verify::verify_publish_request_signature`] —
250    ///   DID resolution + signature verification.
251    /// - **9.** Identifier assignment (`ctx_id`, `lineage_id`).
252    /// - **10.** Lineage coherence on supersession.
253    /// - **11.** Persistence and predecessor supersession.
254    ///
255    /// Steps 7–8 require a [`acdp_did::WebResolver`], so this method
256    /// is gated on the `client` feature.
257    #[cfg(feature = "client")]
258    pub async fn publish_verified(
259        &self,
260        req: &PublishRequest,
261        idempotency_key: Option<&str>,
262        resolver: &acdp_did::WebResolver,
263    ) -> Result<PublishResponse, AcdpError> {
264        self.publish_verified_in_tenant(req, idempotency_key, resolver, None)
265            .await
266    }
267
268    /// Like [`Self::publish_verified`] but binds the publish to a tenant so a
269    /// multi-tenant store persists `tenant_id` atomically with the context row
270    /// (rather than via a separate, non-transactional stamping UPDATE that a
271    /// crash could leave stranded in the default bucket). `tenant = None` is
272    /// identical to [`Self::publish_verified`].
273    #[cfg(feature = "client")]
274    pub async fn publish_verified_in_tenant(
275        &self,
276        req: &PublishRequest,
277        idempotency_key: Option<&str>,
278        resolver: &acdp_did::WebResolver,
279        tenant: Option<&str>,
280    ) -> Result<PublishResponse, AcdpError> {
281        // Rate-limit gate runs before any expensive work — RFC-ACDP-0008 §4.3.
282        self.rate_limiter.check_publish(&req.agent_id)?;
283
284        let raw_bytes = serde_json::to_vec(req)?.len();
285        let validator = PublishValidator::for_authority(&self.caps, &self.authority);
286        let _validated = validator.validate_post_schema(req, raw_bytes)?;
287
288        // Steps 7–8: DID resolution + signature verification.
289        acdp_verify::verify_publish_request_signature(req, resolver).await?;
290
291        // RFC-ACDP-0010: fingerprint the key that was just resolved and
292        // verified, for the receipt's `key_fingerprint` binding. Only
293        // resolved when a receipt will actually be minted.
294        let fingerprint = if self.receipt_signer.is_some() {
295            Some(producer_key_fingerprint(req, resolver).await?)
296        } else {
297            None
298        };
299
300        // FEAT-01: hand the rest of the pipeline to the store as a
301        // single atomic commit. Idempotency lookup, predecessor
302        // verification, body insertion, predecessor supersession
303        // marking, and idempotency record writing all happen under one
304        // critical section. Two concurrent publishes against the same
305        // `supersedes` (or the same `Idempotency-Key`) can no longer
306        // both succeed.
307        self.commit_via_store(req, idempotency_key, tenant, fingerprint)
308    }
309
310    /// **RFC-conformant publish for `did:key` producers — no resolver.**
311    ///
312    /// Runs the same RFC-ACDP-0003 §2.1 pipeline as
313    /// [`Self::publish_verified`], but performs steps 7–8 via the pure
314    /// did:key verifier
315    /// ([`acdp_verify::verify_publish_request_signature_offline`]),
316    /// so it is available without the `client` feature. Rejects
317    /// `did:web` (and any other method) producers with
318    /// `key_resolution_failed` — those need the resolver-backed
319    /// [`Self::publish_verified`].
320    ///
321    /// The capabilities gate still applies: the request is refused
322    /// unless `supported_did_methods` includes `"did:key"`.
323    pub fn publish_verified_did_key(
324        &self,
325        req: &PublishRequest,
326        idempotency_key: Option<&str>,
327    ) -> Result<PublishResponse, AcdpError> {
328        self.publish_verified_did_key_in_tenant(req, idempotency_key, None)
329    }
330
331    /// Like [`Self::publish_verified_did_key`] but binds the publish to a
332    /// tenant so a multi-tenant store persists `tenant_id` atomically with
333    /// the context row — the same contract as
334    /// [`Self::publish_verified_in_tenant`]. `tenant = None` is identical
335    /// to [`Self::publish_verified_did_key`].
336    pub fn publish_verified_did_key_in_tenant(
337        &self,
338        req: &PublishRequest,
339        idempotency_key: Option<&str>,
340        tenant: Option<&str>,
341    ) -> Result<PublishResponse, AcdpError> {
342        self.rate_limiter.check_publish(&req.agent_id)?;
343
344        let raw_bytes = serde_json::to_vec(req)?.len();
345        let validator = PublishValidator::for_authority(&self.caps, &self.authority);
346        let _validated = validator.validate_post_schema(req, raw_bytes)?;
347
348        // Steps 7–8, pure: did:key resolution + signature verification.
349        acdp_verify::verify_publish_request_signature_offline(req)?;
350
351        // did:key fingerprints are derivable from the DID itself — no
352        // resolver needed for the receipt binding.
353        let fingerprint = if self.receipt_signer.is_some() {
354            let material = acdp_did::key::resolve_did_key(req.agent_id.as_str())?;
355            Some(acdp_crypto::fingerprint::fingerprint_did_key_material(
356                &material,
357            )?)
358        } else {
359            None
360        };
361
362        self.commit_via_store(req, idempotency_key, tenant, fingerprint)
363    }
364
365    /// **NOT RFC-conformant.** Skips DID resolution and signature
366    /// verification (RFC-ACDP-0003 §2.1 steps 7–8).
367    ///
368    /// Intended for integration tests where DID resolution would require
369    /// a live network or mock server. Production callers MUST use
370    /// [`Self::publish_verified`].
371    #[doc(hidden)]
372    pub fn publish_unverified_for_tests(
373        &self,
374        req: &PublishRequest,
375    ) -> Result<PublishResponse, AcdpError> {
376        // Rate-limit gate fires here too — the limiter is intentionally
377        // wired BEFORE validation so it works as a defensive cap even
378        // when the test path is used.
379        self.rate_limiter.check_publish(&req.agent_id)?;
380
381        // RFC-ACDP-0010 §7: a receipts-advertising registry has no
382        // degraded mode — every persisted context must carry a receipt,
383        // and minting here would attest a `key_fingerprint` that was
384        // never resolved. Refuse outright rather than persist a
385        // receipt-less context.
386        if self.receipt_signer.is_some() {
387            return Err(AcdpError::SchemaViolation(
388                "publish_unverified_for_tests is unavailable on a receipts-advertising \
389                 registry (RFC-ACDP-0010 §7: no degraded mode); use publish_verified or \
390                 publish_verified_did_key"
391                    .into(),
392            ));
393        }
394        let raw_bytes = serde_json::to_vec(req)?.len();
395        let validator = PublishValidator::for_authority(&self.caps, &self.authority);
396        let _validated = validator.validate_post_schema(req, raw_bytes)?;
397        self.commit_via_store(req, None, None, None)
398    }
399
400    /// Drive `RegistryStore::commit_publish` from a validated request.
401    /// Unwraps `PublishCommitOutcome::Inserted` and `IdempotentReplay`
402    /// to the same `PublishResponse` for the caller (the distinction
403    /// only matters internally for logging/tracing).
404    fn commit_via_store(
405        &self,
406        req: &PublishRequest,
407        idempotency_key: Option<&str>,
408        tenant: Option<&str>,
409        producer_key_fingerprint: Option<String>,
410    ) -> Result<PublishResponse, AcdpError> {
411        let idempotency = if self.caps.supports_idempotency_key {
412            idempotency_key.map(|key| crate::registry::store::PendingIdempotencyCommit {
413                key,
414                ttl: chrono::Duration::seconds(
415                    self.caps
416                        .limits
417                        .idempotency_key_ttl_seconds
418                        .unwrap_or(86_400) as i64,
419                ),
420            })
421        } else {
422            None
423        };
424        // RFC-ACDP-0010 minting hook — runs inside the store's critical
425        // section so the receipt persists atomically with the context.
426        #[allow(clippy::type_complexity)]
427        let minter: Option<
428            Box<dyn Fn(&Body) -> Result<serde_json::Value, AcdpError> + Send + Sync>,
429        > = match (&self.receipt_signer, producer_key_fingerprint) {
430            (Some(signer), Some(fp)) => Some(Box::new(move |body: &Body| {
431                let receipt = signer.mint(
432                    &body.ctx_id,
433                    &body.lineage_id,
434                    &body.origin_registry,
435                    body.created_at,
436                    &body.content_hash,
437                    &fp,
438                )?;
439                serde_json::to_value(receipt).map_err(AcdpError::from)
440            })),
441            _ => None,
442        };
443        let minted_expected = minter.is_some();
444        let outcome = self
445            .store
446            .commit_publish(crate::registry::store::PublishCommit {
447                req,
448                authority: &self.authority,
449                idempotency,
450                tenant,
451                receipt_minter: minter.as_deref(),
452            })?;
453        let (response, replayed) = match outcome {
454            crate::registry::store::PublishCommitOutcome::Inserted(r) => (r, false),
455            crate::registry::store::PublishCommitOutcome::IdempotentReplay(r) => (r, true),
456        };
457        // RFC-ACDP-0010 §7 belt-and-braces: a receipts-advertising
458        // registry has no degraded mode. A store implementation that
459        // ignores `receipt_minter` (e.g. compiled against the older
460        // trait shape) must fail loudly here, not silently persist a
461        // receipt-less context.
462        //
463        // Scoped to NEWLY INSERTED contexts only: an idempotent replay
464        // returns the ORIGINAL publish response verbatim, and that
465        // original may legitimately predate receipts (a record minted
466        // before the registry enabled its signer, still inside the
467        // idempotency TTL). Failing such a replay would turn a correct
468        // producer retry into a 500 across the upgrade boundary — §7
469        // attests what was persisted at publish time, not re-mint time.
470        if minted_expected && !replayed && response.registry_receipt.is_none() {
471            return Err(AcdpError::RegistryInternal(
472                "receipt signer is configured but the store returned no receipt — \
473                 the RegistryStore implementation must invoke PublishCommit::receipt_minter \
474                 inside its commit (RFC-ACDP-0010 §7: no degraded mode)"
475                    .into(),
476            ));
477        }
478        Ok(response)
479    }
480
481    /// `GET /contexts/{ctx_id}`.
482    ///
483    /// Applies the RFC-ACDP-0008 §4.5 disclosure rules:
484    ///
485    /// | Visibility   | Authorized requester for retrieval                  |
486    /// |--------------|-----------------------------------------------------|
487    /// | `public`     | anyone (when `caps.anonymous_public_reads` is true) |
488    /// | `restricted` | producer (`agent_id`) **or** any DID in `audience`  |
489    /// | `private`    | producer (`agent_id`) **or** any DID in `audience`  |
490    ///
491    /// Returns `Ok(None)` (not `Err`) for unauthorized callers — prevents
492    /// existence leakage via error codes.
493    pub fn retrieve(
494        &self,
495        ctx_id: &CtxId,
496        requester: Option<&AgentDid>,
497    ) -> Result<Option<FullContext>, AcdpError> {
498        let Some(ctx) = self.store.get(ctx_id)? else {
499            return Ok(None);
500        };
501        if !can_retrieve(&ctx.body, requester, &self.caps) {
502            return Ok(None);
503        }
504        Ok(Some(ctx))
505    }
506
507    /// `GET /contexts/{ctx_id}/body`. See [`Self::retrieve`] for visibility rules.
508    pub fn retrieve_body(
509        &self,
510        ctx_id: &CtxId,
511        requester: Option<&AgentDid>,
512    ) -> Result<Option<Body>, AcdpError> {
513        Ok(self.retrieve(ctx_id, requester)?.map(|c| c.body))
514    }
515
516    /// `GET /lineages/{lineage_id}`.
517    ///
518    /// BUG-03: applies the same visibility filter as `retrieve`. A
519    /// caller who knows or guesses a `lineage_id` must not be able to
520    /// surface restricted or private bodies through the lineage
521    /// endpoint when `retrieve(ctx_id, requester)` would deny them.
522    pub fn lineage(
523        &self,
524        lineage_id: &LineageId,
525        requester: Option<&AgentDid>,
526    ) -> Result<Vec<FullContext>, AcdpError> {
527        let all = self.store.lineage(lineage_id)?;
528        Ok(all
529            .into_iter()
530            .filter(|ctx| can_retrieve(&ctx.body, requester, &self.caps))
531            .collect())
532    }
533
534    /// `GET /lineages/{lineage_id}/current`.
535    ///
536    /// BUG-03 + BUG-04: returns the newest non-`Superseded` version
537    /// visible to the requester. `None` when the lineage is unknown,
538    /// when every version is superseded (RFC-ACDP-0004 §5), or when no
539    /// visible version exists.
540    pub fn current(
541        &self,
542        lineage_id: &LineageId,
543        requester: Option<&AgentDid>,
544    ) -> Result<Option<FullContext>, AcdpError> {
545        let all = self.store.lineage(lineage_id)?;
546        // `lineage` returns versions ordered from v1 → vN; iterate in
547        // reverse to find the newest non-superseded version. `Active`
548        // and `Expired` both qualify as valid current heads (a body
549        // that expired without being superseded is still the latest
550        // and the consumer needs to see it to know it has lapsed).
551        for ctx in all.into_iter().rev() {
552            if !matches!(ctx.registry_state.status, Status::Superseded)
553                && can_retrieve(&ctx.body, requester, &self.caps)
554            {
555                return Ok(Some(ctx));
556            }
557        }
558        Ok(None)
559    }
560
561    /// `GET /contexts/search`.
562    ///
563    /// Applies the RFC-ACDP-0008 §4.5 search disclosure rules (note the
564    /// asymmetry vs retrieval): private contexts surface in search only
565    /// to their producer (audience members must already know the ctx_id).
566    ///
567    /// When `caps.anonymous_public_reads` is `false`, an anonymous search
568    /// request is rejected outright with [`AcdpError::NotAuthorized`]
569    /// (HTTP 403) rather than returning an empty `200`. An empty result
570    /// set would still leak the registry's existence and confirm that
571    /// the keyword query ran; the required response is `not_authorized`
572    /// (RFC-ACDP-0005 §2.5.5, RFC-ACDP-0008 §6.3, fixture `vis-009`).
573    pub fn search(
574        &self,
575        params: &SearchParams,
576        requester: Option<&AgentDid>,
577    ) -> Result<SearchResponse, AcdpError> {
578        // BUG-01 + vis-009: reject anonymous search when the registry
579        // does not allow anonymous reads. An empty 200 would still leak
580        // the registry's existence (and that the query executed); the
581        // normative response is 403 not_authorized.
582        if requester.is_none() && !self.caps.anonymous_public_reads {
583            return Err(AcdpError::NotAuthorized(
584                "anonymous search requires authentication \
585                 (registry caps: anonymous_public_reads=false)"
586                    .into(),
587            ));
588        }
589        // BUG-02: pass `anonymous_public_reads` to the store so search
590        // and retrieve agree. A registry advertising the flag as false
591        // MUST suppress public contexts for anonymous callers in BOTH
592        // endpoints (RFC-ACDP-0008 §4.5).
593        self.store
594            .search(params, requester, self.caps.anonymous_public_reads)
595    }
596}
597
598/// RFC-ACDP-0008 §4.5 retrieval disclosure rule.
599pub(crate) fn can_retrieve(
600    body: &Body,
601    requester: Option<&AgentDid>,
602    caps: &CapabilitiesDocument,
603) -> bool {
604    match body.visibility {
605        Visibility::Public => caps.anonymous_public_reads || requester.is_some(),
606        Visibility::Restricted | Visibility::Private => match requester {
607            None => false,
608            Some(r) => {
609                r == &body.agent_id
610                    || body
611                        .audience
612                        .as_deref()
613                        .is_some_and(|a| a.iter().any(|d| d == r))
614            }
615        },
616    }
617}
618
619/// Resolve and fingerprint the producer key named by
620/// `signature.key_id` — the binding recorded in a receipt's
621/// `key_fingerprint` (RFC-ACDP-0010). Delegates to the same
622/// [`acdp_crypto::fingerprint::fingerprint_for_key_id`] the consumer
623/// cross-check uses, so mint-time and verify-time fingerprints cannot
624/// drift. Callers MUST invoke this only after
625/// `verify_publish_request_signature` succeeded, so the fingerprinted
626/// key is the one that actually verified (the resolver's cache makes
627/// the second resolution cheap).
628#[cfg(feature = "client")]
629async fn producer_key_fingerprint(
630    req: &PublishRequest,
631    resolver: &acdp_did::WebResolver,
632) -> Result<String, AcdpError> {
633    acdp_crypto::fingerprint::fingerprint_for_key_id(
634        &req.signature.key_id,
635        &req.signature.algorithm,
636        resolver,
637    )
638    .await
639}
640
641#[cfg(test)]
642mod tests {
643    use super::*;
644    use crate::registry::store::InMemoryStore;
645    use acdp_crypto::SigningKey;
646    use acdp_producer::Producer;
647    use acdp_types::capabilities::Limits;
648    use acdp_types::primitives::{AgentDid, ContextType, Visibility};
649
650    fn caps() -> CapabilitiesDocument {
651        CapabilitiesDocument {
652            acdp_version: "0.1.0".into(),
653            registry_did: "did:web:registry.example.com".into(),
654            supported_signature_algorithms: vec!["ed25519".into()],
655            supported_did_methods: vec!["did:web".into()],
656            profiles: vec!["acdp-registry-core".into()],
657            limits: Limits {
658                max_payload_bytes: 1_048_576,
659                max_embedded_bytes: 65_536,
660                idempotency_key_ttl_seconds: None,
661            },
662            read_authentication_methods: vec![],
663            anonymous_public_reads: true,
664            supports_idempotency_key: false,
665            extensions: Default::default(),
666        }
667    }
668
669    fn producer() -> Producer {
670        Producer::new(
671            SigningKey::from_bytes(&[1u8; 32]),
672            AgentDid::new("did:web:agents.example.com:test"),
673            "did:web:agents.example.com:test#key-1",
674        )
675    }
676
677    #[test]
678    fn publish_v1_then_retrieve() {
679        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
680        let p = producer();
681        let req = p
682            .publish_request()
683            .title("v1")
684            .context_type(ContextType::DataSnapshot)
685            .visibility(Visibility::Public)
686            .build()
687            .unwrap();
688        let resp = server.publish_unverified_for_tests(&req).unwrap();
689        assert_eq!(resp.version, 1);
690        let ctx = server.retrieve(&resp.ctx_id, None).unwrap().unwrap();
691        assert_eq!(ctx.body.title, "v1");
692        // Lineage round-trip
693        let lineage = server.lineage(&resp.lineage_id, None).unwrap();
694        assert_eq!(lineage.len(), 1);
695        // Current points at the same record
696        let cur = server.current(&resp.lineage_id, None).unwrap().unwrap();
697        assert_eq!(cur.body.ctx_id, resp.ctx_id);
698    }
699
700    #[test]
701    fn supersession_marks_predecessor_and_returns_v2() {
702        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
703        let p = producer();
704        let v1_req = p
705            .publish_request()
706            .title("v1")
707            .context_type(ContextType::DataSnapshot)
708            .visibility(Visibility::Public)
709            .build()
710            .unwrap();
711        let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
712
713        let v2_req = p
714            .supersede(v1.ctx_id.clone())
715            .version(2)
716            .title("v2")
717            .context_type(ContextType::DataSnapshot)
718            .visibility(Visibility::Public)
719            .build()
720            .unwrap();
721        let v2 = server.publish_unverified_for_tests(&v2_req).unwrap();
722        assert_eq!(v2.version, 2);
723        // v1 was marked superseded
724        let v1_ctx = server.retrieve(&v1.ctx_id, None).unwrap().unwrap();
725        assert!(matches!(
726            v1_ctx.registry_state.status,
727            acdp_types::Status::Superseded
728        ));
729        // Same lineage
730        assert_eq!(v1.lineage_id, v2.lineage_id);
731        // Current resolves to v2
732        let cur = server.current(&v1.lineage_id, None).unwrap().unwrap();
733        assert_eq!(cur.body.ctx_id, v2.ctx_id);
734    }
735
736    /// FEAT-01: two concurrent publishes that both supersede the same
737    /// v1 MUST resolve to exactly one success + one
738    /// `SupersededTarget { AlreadySuperseded }`. The race was possible
739    /// when the supersedes check, body insert, and predecessor mark
740    /// lived in separate mutex acquisitions; `commit_publish` puts
741    /// them under one critical section so only one of two contenders
742    /// wins (RFC-ACDP-0003 §6).
743    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
744    async fn concurrent_supersession_exactly_one_succeeds() {
745        use std::sync::Arc;
746        let server = Arc::new(RegistryServer::new(
747            InMemoryStore::new(),
748            caps(),
749            "registry.example.com",
750        ));
751        let p = producer();
752        let v1_req = p
753            .publish_request()
754            .title("v1")
755            .context_type(ContextType::DataSnapshot)
756            .visibility(Visibility::Public)
757            .build()
758            .unwrap();
759        let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
760
761        // Pre-build BOTH v2 requests up front, then fire them in
762        // parallel on a multi-threaded runtime. With the prior
763        // non-atomic sequence the test would fail intermittently;
764        // with `commit_publish` it's deterministic.
765        let v2a_req = p
766            .supersede(v1.ctx_id.clone())
767            .version(2)
768            .title("v2-A")
769            .context_type(ContextType::DataSnapshot)
770            .visibility(Visibility::Public)
771            .build()
772            .unwrap();
773        let v2b_req = p
774            .supersede(v1.ctx_id.clone())
775            .version(2)
776            .title("v2-B")
777            .context_type(ContextType::DataSnapshot)
778            .visibility(Visibility::Public)
779            .build()
780            .unwrap();
781
782        let s1 = Arc::clone(&server);
783        let s2 = Arc::clone(&server);
784        let h1 = tokio::task::spawn_blocking(move || s1.publish_unverified_for_tests(&v2a_req));
785        let h2 = tokio::task::spawn_blocking(move || s2.publish_unverified_for_tests(&v2b_req));
786        let (r1, r2) = (h1.await.unwrap(), h2.await.unwrap());
787
788        let outcomes = [r1, r2];
789        let successes = outcomes.iter().filter(|r| r.is_ok()).count();
790        let failures = outcomes.iter().filter(|r| r.is_err()).count();
791        assert_eq!(
792            successes, 1,
793            "exactly one concurrent supersession MUST succeed; got {successes} successes / {failures} failures"
794        );
795        assert_eq!(failures, 1);
796        // The loser MUST get AlreadySuperseded — the predecessor was
797        // marked under the same lock the winner used.
798        for r in &outcomes {
799            if let Err(e) = r {
800                match e {
801                    AcdpError::SupersededTarget { reason, .. } => assert_eq!(
802                        *reason,
803                        acdp_primitives::error::SupersessionReason::AlreadySuperseded,
804                        "concurrent loser MUST be AlreadySuperseded"
805                    ),
806                    other => panic!("concurrent loser had wrong error: {other:?}"),
807                }
808            }
809        }
810    }
811
812    #[test]
813    fn hostile_supersession_by_non_owner_rejected_predecessor_unchanged() {
814        // P0-2: an attacker controlling their own DID must not be able to
815        // supersede a victim's context. Without the producer-continuity
816        // check this marks the victim's context `Superseded` and re-points
817        // `current(lineage)` at the attacker's body — a lineage takeover.
818        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
819        let victim = producer_for(7, "did:web:agents.example.com:victim");
820        let v1_req = victim
821            .publish_request()
822            .title("v1")
823            .context_type(ContextType::DataSnapshot)
824            .visibility(Visibility::Public)
825            .build()
826            .unwrap();
827        let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
828
829        // Attacker signs their own valid v2, omitting lineage_id (the only
830        // self-declared coherence arm), supersedes = victim's v1.
831        let attacker = producer_for(9, "did:web:evil.example.com:attacker");
832        let v2_req = attacker
833            .supersede(v1.ctx_id.clone())
834            .version(2)
835            .title("hijacked")
836            .context_type(ContextType::DataSnapshot)
837            .visibility(Visibility::Public)
838            .build()
839            .unwrap();
840        let err = server.publish_unverified_for_tests(&v2_req).unwrap_err();
841        // Uniform with not-found: no existence / version / status oracle.
842        match err {
843            AcdpError::SupersededTarget { reason, .. } => {
844                assert_eq!(reason, acdp_primitives::error::SupersessionReason::NotFound);
845            }
846            other => panic!("expected uniform SupersededTarget::NotFound, got {other:?}"),
847        }
848        // Predecessor MUST be untouched: still current, not superseded.
849        let cur = server.current(&v1.lineage_id, None).unwrap().unwrap();
850        assert_eq!(cur.body.ctx_id, v1.ctx_id);
851        assert_eq!(cur.body.title, "v1");
852        assert_eq!(
853            cur.registry_state.status,
854            acdp_types::primitives::Status::Active
855        );
856    }
857
858    #[test]
859    fn owner_supersession_still_succeeds_after_ownership_check() {
860        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
861        let p = producer();
862        let v1_req = p
863            .publish_request()
864            .title("v1")
865            .context_type(ContextType::DataSnapshot)
866            .visibility(Visibility::Public)
867            .build()
868            .unwrap();
869        let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
870        let v2_req = p
871            .supersede(v1.ctx_id.clone())
872            .version(2)
873            .title("v2")
874            .context_type(ContextType::DataSnapshot)
875            .visibility(Visibility::Public)
876            .build()
877            .unwrap();
878        let v2 = server.publish_unverified_for_tests(&v2_req).unwrap();
879        assert_eq!(v2.version, 2);
880        let cur = server.current(&v1.lineage_id, None).unwrap().unwrap();
881        assert_eq!(cur.body.ctx_id, v2.ctx_id);
882    }
883
884    #[test]
885    fn supersession_with_unknown_target_rejected_as_not_found() {
886        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
887        let p = producer();
888        let phantom =
889            CtxId("acdp://registry.example.com/12345678-1234-4321-8123-deadbeefcafe".into());
890        let req = p
891            .supersede(phantom)
892            .version(2)
893            .title("v2-orphan")
894            .context_type(ContextType::DataSnapshot)
895            .visibility(Visibility::Public)
896            .build()
897            .unwrap();
898        let err = server.publish_unverified_for_tests(&req).unwrap_err();
899        match err {
900            AcdpError::SupersededTarget { reason, .. } => {
901                assert_eq!(reason, acdp_primitives::error::SupersessionReason::NotFound);
902            }
903            other => panic!("expected SupersededTarget::NotFound, got {other:?}"),
904        }
905    }
906
907    #[test]
908    fn version_mismatch_rejected() {
909        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
910        let p = producer();
911        let v1_req = p
912            .publish_request()
913            .title("v1")
914            .context_type(ContextType::DataSnapshot)
915            .visibility(Visibility::Public)
916            .build()
917            .unwrap();
918        let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
919        // Build a v3 (wrong) supersession
920        let v3_req = p
921            .supersede(v1.ctx_id.clone())
922            .version(3)
923            .title("v3-skipped")
924            .context_type(ContextType::DataSnapshot)
925            .visibility(Visibility::Public)
926            .build()
927            .unwrap();
928        let err = server.publish_unverified_for_tests(&v3_req).unwrap_err();
929        match err {
930            AcdpError::SupersededTarget { reason, .. } => {
931                assert_eq!(
932                    reason,
933                    acdp_primitives::error::SupersessionReason::VersionMismatch
934                );
935            }
936            other => panic!("expected VersionMismatch, got {other:?}"),
937        }
938    }
939
940    #[test]
941    fn search_finds_published_context() {
942        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
943        let p = producer();
944        let req = p
945            .publish_request()
946            .title("Q1 portfolio risk")
947            .context_type(ContextType::DataSnapshot)
948            .visibility(Visibility::Public)
949            .build()
950            .unwrap();
951        server.publish_unverified_for_tests(&req).unwrap();
952        let resp = server
953            .search(
954                &SearchParams {
955                    q: Some("portfolio".into()),
956                    ..Default::default()
957                },
958                None,
959            )
960            .unwrap();
961        assert_eq!(resp.matches.len(), 1);
962        assert_eq!(resp.matches[0].title, "Q1 portfolio risk");
963    }
964
965    // ── BUG-03 — lineage/current visibility filtering ──────────────────
966
967    /// BUG-03: a stranger calling `lineage()` MUST NOT see restricted
968    /// bodies they aren't on the audience for. The retrieval predicate
969    /// is now mirrored here.
970    #[test]
971    fn lineage_filters_restricted_for_stranger() {
972        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
973        let p = producer();
974        let audience = AgentDid::new("did:web:audience.example.com:reader");
975        let req = p
976            .publish_request()
977            .title("restricted v1")
978            .context_type(ContextType::DataSnapshot)
979            .visibility(Visibility::Restricted)
980            .audience(vec![audience.clone()])
981            .build()
982            .unwrap();
983        let resp = server.publish_unverified_for_tests(&req).unwrap();
984
985        let stranger = AgentDid::new("did:web:other.example.com:reader");
986        let stranger_view = server.lineage(&resp.lineage_id, Some(&stranger)).unwrap();
987        assert!(
988            stranger_view.is_empty(),
989            "stranger MUST NOT see restricted bodies via lineage(); got {} entries",
990            stranger_view.len()
991        );
992
993        let audience_view = server.lineage(&resp.lineage_id, Some(&audience)).unwrap();
994        assert_eq!(
995            audience_view.len(),
996            1,
997            "audience member MUST see the restricted body via lineage()"
998        );
999    }
1000
1001    /// BUG-03: `current()` also filters by requester visibility.
1002    /// A stranger gets `None` for a private lineage.
1003    #[test]
1004    fn current_filters_private_for_stranger() {
1005        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1006        let p = producer();
1007        let req = p
1008            .publish_request()
1009            .title("private v1")
1010            .context_type(ContextType::DataSnapshot)
1011            .visibility(Visibility::Private)
1012            .build()
1013            .unwrap();
1014        let resp = server.publish_unverified_for_tests(&req).unwrap();
1015
1016        let stranger = AgentDid::new("did:web:other.example.com:reader");
1017        assert!(
1018            server
1019                .current(&resp.lineage_id, Some(&stranger))
1020                .unwrap()
1021                .is_none(),
1022            "stranger MUST NOT see private contexts via current()"
1023        );
1024
1025        let producer_did = AgentDid::new("did:web:agents.example.com:test");
1026        assert!(
1027            server
1028                .current(&resp.lineage_id, Some(&producer_did))
1029                .unwrap()
1030                .is_some(),
1031            "producer MUST see private contexts via current()"
1032        );
1033    }
1034
1035    // ── BUG-04 — current() superseded fallback ─────────────────────────
1036
1037    /// BUG-04: when every version of a lineage is `Superseded`,
1038    /// `current()` MUST return `None`. Previously the fallback returned
1039    /// the last entry projected, which is a protocol violation
1040    /// (RFC-ACDP-0004 §5: "If no such version exists, returns not_found").
1041    ///
1042    /// Constructing an all-superseded lineage requires a direct store
1043    /// mark — there's no publish path that produces this state today,
1044    /// but the registry's `current()` MUST not implicitly fall through.
1045    #[test]
1046    fn current_returns_none_when_all_superseded() {
1047        use crate::registry::store::RegistryStore;
1048        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1049        let p = producer();
1050        let req = p
1051            .publish_request()
1052            .title("v1")
1053            .context_type(ContextType::DataSnapshot)
1054            .visibility(Visibility::Public)
1055            .build()
1056            .unwrap();
1057        let resp = server.publish_unverified_for_tests(&req).unwrap();
1058        // Force the only version into Superseded directly.
1059        server.store().mark_superseded(&resp.ctx_id).unwrap();
1060
1061        let cur = server.current(&resp.lineage_id, None).unwrap();
1062        assert!(
1063            cur.is_none(),
1064            "all-superseded lineage MUST resolve to None per RFC-ACDP-0004 §5; got {cur:?}"
1065        );
1066    }
1067
1068    // ── BUG-01 / vis-009 — anonymous search honors anonymous_public_reads ──
1069
1070    /// BUG-01 + vis-009: a registry advertising `anonymous_public_reads:
1071    /// false` MUST reject an anonymous search with `not_authorized`
1072    /// (HTTP 403) — not an empty `200`, which would still leak the
1073    /// registry's existence. The same context surfaces with a `200`
1074    /// once the requester authenticates.
1075    #[test]
1076    fn search_suppresses_public_when_anonymous_public_reads_false() {
1077        let mut c = caps();
1078        c.anonymous_public_reads = false;
1079        let server = RegistryServer::new(InMemoryStore::new(), c, "registry.example.com");
1080        let p = producer();
1081        let req = p
1082            .publish_request()
1083            .title("public-but-flag-off")
1084            .context_type(ContextType::DataSnapshot)
1085            .visibility(Visibility::Public)
1086            .build()
1087            .unwrap();
1088        server.publish_unverified_for_tests(&req).unwrap();
1089
1090        // Anonymous: MUST be rejected with NotAuthorized (vis-009 s1).
1091        let err = server
1092            .search(
1093                &SearchParams {
1094                    q: Some("public-but-flag-off".into()),
1095                    ..Default::default()
1096                },
1097                None,
1098            )
1099            .unwrap_err();
1100        assert!(
1101            matches!(err, AcdpError::NotAuthorized(_)),
1102            "vis-009: anonymous search MUST be NotAuthorized when \
1103             anonymous_public_reads=false; got {err:?}"
1104        );
1105
1106        // Authenticated requester (any DID — public is universally visible
1107        // once authenticated): MUST see the context.
1108        let stranger = AgentDid::new("did:web:other.example.com:reader");
1109        let authed = server
1110            .search(
1111                &SearchParams {
1112                    q: Some("public-but-flag-off".into()),
1113                    ..Default::default()
1114                },
1115                Some(&stranger),
1116            )
1117            .unwrap();
1118        assert_eq!(
1119            authed.matches.len(),
1120            1,
1121            "authenticated search MUST see public contexts regardless of anonymous_public_reads"
1122        );
1123    }
1124
1125    // ── try_new validation tests ────────────────────────────────────────
1126
1127    #[test]
1128    fn try_new_rejects_did_authority_mismatch() {
1129        let mut c = caps();
1130        c.registry_did = "did:web:other.example.com".into(); // wrong authority
1131        let res = RegistryServer::try_new(InMemoryStore::new(), c, "registry.example.com");
1132        match res {
1133            Err(AcdpError::SchemaViolation(msg)) => {
1134                assert!(msg.contains("does not match expected"))
1135            }
1136            Err(other) => panic!("expected SchemaViolation, got {other:?}"),
1137            Ok(_) => panic!("expected Err"),
1138        }
1139    }
1140
1141    #[test]
1142    fn try_new_rejects_caps_missing_ed25519() {
1143        let mut c = caps();
1144        c.supported_signature_algorithms = vec!["ecdsa-p256".into()]; // missing ed25519
1145        let res = RegistryServer::try_new(InMemoryStore::new(), c, "registry.example.com");
1146        assert!(matches!(res, Err(AcdpError::SchemaViolation(_))));
1147    }
1148
1149    #[test]
1150    fn try_new_accepts_valid_caps() {
1151        RegistryServer::try_new(InMemoryStore::new(), caps(), "registry.example.com").unwrap();
1152    }
1153
1154    // ── WIRE-04 — try_new authority-format validation ───────────────────
1155
1156    #[test]
1157    fn try_new_accepts_valid_dns_authority() {
1158        RegistryServer::try_new(InMemoryStore::new(), caps(), "registry.example.com").unwrap();
1159    }
1160
1161    #[test]
1162    fn try_new_rejects_host_port_authority() {
1163        // A `host:port` authority would mint `acdp://localhost:8443/<uuid>`
1164        // ctx_ids — a colon violates the acdp:// authority rule.
1165        let res = RegistryServer::try_new(InMemoryStore::new(), caps(), "localhost:8443");
1166        assert!(matches!(res, Err(AcdpError::SchemaViolation(_))));
1167    }
1168
1169    #[test]
1170    fn try_new_rejects_uppercase_authority() {
1171        let res = RegistryServer::try_new(InMemoryStore::new(), caps(), "Registry.Example.Com");
1172        assert!(matches!(res, Err(AcdpError::SchemaViolation(_))));
1173    }
1174
1175    #[test]
1176    fn try_new_rejects_url_form_authority() {
1177        let res =
1178            RegistryServer::try_new(InMemoryStore::new(), caps(), "https://registry.example.com");
1179        assert!(matches!(res, Err(AcdpError::SchemaViolation(_))));
1180    }
1181
1182    #[test]
1183    fn try_new_for_test_accepts_host_port() {
1184        // The test constructor skips the DNS-authority check; it still
1185        // enforces the DID binding, so the caps DID must match.
1186        let mut c = caps();
1187        c.registry_did = acdp_did::authority_to_did_web("localhost:8443");
1188        RegistryServer::try_new_for_test_authority(InMemoryStore::new(), c, "localhost:8443")
1189            .unwrap();
1190    }
1191
1192    // ── Visibility-enforcement tests (RFC-ACDP-0008 §4.5) ───────────────
1193
1194    fn producer_for(seed: u8, did: &str) -> Producer {
1195        Producer::new(
1196            SigningKey::from_bytes(&[seed; 32]),
1197            AgentDid::new(did),
1198            format!("{did}#key-1"),
1199        )
1200    }
1201
1202    #[test]
1203    fn retrieve_restricted_blocks_stranger_returns_none() {
1204        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1205        let owner = AgentDid::new("did:web:agents.example.com:owner");
1206        let audience_member = AgentDid::new("did:web:agents.example.com:friend");
1207        let p = producer_for(2, owner.as_str());
1208        let req = p
1209            .publish_request()
1210            .title("restricted")
1211            .context_type(ContextType::DataSnapshot)
1212            .visibility(Visibility::Restricted)
1213            .audience(vec![audience_member.clone()])
1214            .build()
1215            .unwrap();
1216        let resp = server.publish_unverified_for_tests(&req).unwrap();
1217        let stranger = AgentDid::new("did:web:agents.example.com:stranger");
1218
1219        assert!(server.retrieve(&resp.ctx_id, None).unwrap().is_none());
1220        assert!(server
1221            .retrieve(&resp.ctx_id, Some(&stranger))
1222            .unwrap()
1223            .is_none());
1224        assert!(server
1225            .retrieve(&resp.ctx_id, Some(&owner))
1226            .unwrap()
1227            .is_some());
1228        assert!(server
1229            .retrieve(&resp.ctx_id, Some(&audience_member))
1230            .unwrap()
1231            .is_some());
1232    }
1233
1234    #[test]
1235    fn search_restricted_filters_strangers() {
1236        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1237        let owner = AgentDid::new("did:web:agents.example.com:owner");
1238        let p = producer_for(3, owner.as_str());
1239        let req = p
1240            .publish_request()
1241            .title("hush hush")
1242            .context_type(ContextType::DataSnapshot)
1243            .visibility(Visibility::Restricted)
1244            .audience(vec![AgentDid::new("did:web:agents.example.com:friend")])
1245            .build()
1246            .unwrap();
1247        server.publish_unverified_for_tests(&req).unwrap();
1248
1249        let stranger = AgentDid::new("did:web:agents.example.com:stranger");
1250        let r_anon = server.search(&SearchParams::default(), None).unwrap();
1251        assert!(
1252            r_anon.matches.is_empty(),
1253            "anonymous must not see restricted"
1254        );
1255        let r_stranger = server
1256            .search(&SearchParams::default(), Some(&stranger))
1257            .unwrap();
1258        assert!(r_stranger.matches.is_empty());
1259        let r_owner = server
1260            .search(&SearchParams::default(), Some(&owner))
1261            .unwrap();
1262        assert_eq!(r_owner.matches.len(), 1);
1263    }
1264
1265    /// RFC-ACDP-0008 §4.5 asymmetry: a private context surfaces in search
1266    /// only to its producer — audience members can retrieve by id but can't
1267    /// discover via search.
1268    #[test]
1269    fn search_private_visible_only_to_producer() {
1270        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1271        let owner = AgentDid::new("did:web:agents.example.com:owner");
1272        let audience_member = AgentDid::new("did:web:agents.example.com:friend");
1273        let p = producer_for(4, owner.as_str());
1274        let req = p
1275            .publish_request()
1276            .title("private note")
1277            .context_type(ContextType::DataSnapshot)
1278            .visibility(Visibility::Private)
1279            .audience(vec![audience_member.clone()])
1280            .build()
1281            .unwrap();
1282        let resp = server.publish_unverified_for_tests(&req).unwrap();
1283
1284        let r_audience = server
1285            .search(&SearchParams::default(), Some(&audience_member))
1286            .unwrap();
1287        assert!(
1288            r_audience.matches.is_empty(),
1289            "audience must NOT see private in search"
1290        );
1291        let r_owner = server
1292            .search(&SearchParams::default(), Some(&owner))
1293            .unwrap();
1294        assert_eq!(
1295            r_owner.matches.len(),
1296            1,
1297            "owner sees their own private context"
1298        );
1299
1300        // Audience CAN retrieve directly by id.
1301        assert!(server
1302            .retrieve(&resp.ctx_id, Some(&audience_member))
1303            .unwrap()
1304            .is_some());
1305    }
1306
1307    // ── publish_verified offline-rejection tests ────────────────────────
1308    //
1309    // Full end-to-end `publish_verified` requires a TLS-mocked DID
1310    // document (because `WebResolver` is HTTPS-only). These tests cover
1311    // the rejection paths that fire BEFORE the resolver call so they
1312    // don't need a network: malformed key_id, non-did:web key_id,
1313    // agent_id ≠ key_id DID portion. Together with the existing
1314    // `verify_signature_envelope` algorithm-downgrade unit test, they
1315    // pin the entry checks of RFC-ACDP-0003 §2.1 steps 7–8 without
1316    // requiring a TLS mock harness.
1317
1318    #[cfg(feature = "client")]
1319    #[tokio::test]
1320    async fn publish_verified_rejects_non_did_web_key_id() {
1321        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1322        let p = producer();
1323        let mut req = p
1324            .publish_request()
1325            .title("v1")
1326            .context_type(ContextType::DataSnapshot)
1327            .visibility(Visibility::Public)
1328            .build()
1329            .unwrap();
1330        // Mutate post-build — validation already ran and accepted did:web.
1331        // Re-sign isn't necessary: the verifier rejects before signature
1332        // check. Use a *well-formed* did:key URL (a malformed one is
1333        // caught earlier by schema validation as of ACDP 0.2): the
1334        // key_id DID portion no longer matches the did:web agent_id, so
1335        // the binding check refuses it.
1336        let did_key = acdp_did::key::did_key_from_ed25519(
1337            &SigningKey::from_bytes(&[9u8; 32]).verifying_key_bytes(),
1338        );
1339        req.signature.key_id = acdp_did::key::did_key_url(&did_key).unwrap();
1340        let resolver = acdp_did::WebResolver::new();
1341        let err = server
1342            .publish_verified(&req, None, &resolver)
1343            .await
1344            .unwrap_err();
1345        match err {
1346            AcdpError::KeyNotAuthorized(msg) => assert!(msg.contains("did:web")),
1347            other => panic!("expected KeyNotAuthorized for non-did:web, got {other:?}"),
1348        }
1349    }
1350
1351    #[cfg(feature = "client")]
1352    #[tokio::test]
1353    async fn publish_verified_rejects_agent_id_keyid_mismatch() {
1354        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1355        let p = producer();
1356        let mut req = p
1357            .publish_request()
1358            .title("v1")
1359            .context_type(ContextType::DataSnapshot)
1360            .visibility(Visibility::Public)
1361            .build()
1362            .unwrap();
1363        req.signature.key_id = "did:web:other.example.com:agent#key-1".into();
1364        let resolver = acdp_did::WebResolver::new();
1365        let err = server
1366            .publish_verified(&req, None, &resolver)
1367            .await
1368            .unwrap_err();
1369        match err {
1370            AcdpError::KeyNotAuthorized(msg) => assert!(msg.contains("agent_id")),
1371            other => panic!("expected KeyNotAuthorized for agent_id mismatch, got {other:?}"),
1372        }
1373    }
1374
1375    #[cfg(feature = "client")]
1376    #[tokio::test]
1377    async fn publish_verified_rejects_keyid_without_fragment() {
1378        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1379        let p = producer();
1380        let mut req = p
1381            .publish_request()
1382            .title("v1")
1383            .context_type(ContextType::DataSnapshot)
1384            .visibility(Visibility::Public)
1385            .build()
1386            .unwrap();
1387        req.signature.key_id = "did:web:agents.example.com:test".into(); // no '#'
1388        let resolver = acdp_did::WebResolver::new();
1389        let err = server
1390            .publish_verified(&req, None, &resolver)
1391            .await
1392            .unwrap_err();
1393        // Schema validation (step 1) catches missing-fragment before
1394        // step 7 fires, so the surface error is SchemaViolation.
1395        assert!(
1396            matches!(
1397                err,
1398                AcdpError::SchemaViolation(_) | AcdpError::KeyResolution(_)
1399            ),
1400            "expected fragment-rejection error, got {err:?}"
1401        );
1402    }
1403
1404    // ── FEAT-04 idempotency tests ──────────────────────────────────────
1405
1406    fn caps_with_idempotency() -> CapabilitiesDocument {
1407        let mut c = caps();
1408        c.supports_idempotency_key = true;
1409        c.limits.idempotency_key_ttl_seconds = Some(86_400);
1410        c
1411    }
1412
1413    #[test]
1414    fn idempotency_same_hash_returns_original_response() {
1415        let server = RegistryServer::new(
1416            InMemoryStore::new(),
1417            caps_with_idempotency(),
1418            "registry.example.com",
1419        );
1420        let p = producer();
1421        let req = p
1422            .publish_request()
1423            .title("once")
1424            .context_type(ContextType::DataSnapshot)
1425            .visibility(Visibility::Public)
1426            .build()
1427            .unwrap();
1428        // First publish (using the offline path; idempotency works either way).
1429        let first = server.publish_unverified_for_tests(&req).unwrap();
1430        // Record the idempotency entry as if it had come in through
1431        // publish_verified — we test only the lookup logic here, so
1432        // simulate via the store API.
1433        let ttl = caps_with_idempotency()
1434            .limits
1435            .idempotency_key_ttl_seconds
1436            .unwrap() as i64;
1437        server
1438            .store()
1439            .idempotency_record(
1440                &req.agent_id,
1441                "k-001",
1442                &req.content_hash,
1443                &first,
1444                chrono::Utc::now() + chrono::Duration::seconds(ttl),
1445            )
1446            .unwrap();
1447        let prior = server
1448            .store()
1449            .idempotency_lookup(&req.agent_id, "k-001")
1450            .unwrap()
1451            .unwrap();
1452        assert_eq!(prior.content_hash, req.content_hash);
1453        assert_eq!(prior.response.ctx_id, first.ctx_id);
1454    }
1455
1456    #[test]
1457    fn idempotency_evicts_after_ttl() {
1458        let store = InMemoryStore::new();
1459        let agent = AgentDid::new("did:web:agents.example.com:test");
1460        let resp = PublishResponse {
1461            registry_receipt: None,
1462            ctx_id: acdp_types::CtxId("acdp://r/12345678-1234-4321-8123-000000000099".into()),
1463            lineage_id: acdp_types::LineageId(
1464                "lin:sha256:9999999999999999999999999999999999999999999999999999999999999999"
1465                    .into(),
1466            ),
1467            version: 1,
1468            created_at: chrono::Utc::now(),
1469            status: Status::Active,
1470        };
1471        // Already-past expiration.
1472        let past = chrono::Utc::now() - chrono::Duration::seconds(1);
1473        store
1474            .idempotency_record(
1475                &agent,
1476                "expired",
1477                &acdp_types::ContentHash("sha256:0".into()),
1478                &resp,
1479                past,
1480            )
1481            .unwrap();
1482        // Lookup runs lazy eviction; the expired record MUST be gone.
1483        let prior = store.idempotency_lookup(&agent, "expired").unwrap();
1484        assert!(
1485            prior.is_none(),
1486            "lazy TTL eviction should drop expired record"
1487        );
1488    }
1489
1490    // ── FEAT-05 rate limiter tests ─────────────────────────────────────
1491
1492    struct AlwaysDeny;
1493    impl crate::registry::RateLimiter for AlwaysDeny {
1494        fn check_publish(&self, agent_id: &AgentDid) -> Result<(), AcdpError> {
1495            Err(AcdpError::RateLimited(format!("blocked: {agent_id}")))
1496        }
1497    }
1498
1499    #[test]
1500    fn rate_limiter_blocks_publish_before_persist() {
1501        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com")
1502            .with_rate_limiter(AlwaysDeny);
1503        let p = producer();
1504        let req = p
1505            .publish_request()
1506            .title("blocked")
1507            .context_type(ContextType::DataSnapshot)
1508            .visibility(Visibility::Public)
1509            .build()
1510            .unwrap();
1511        let err = server.publish_unverified_for_tests(&req).unwrap_err();
1512        assert!(matches!(err, AcdpError::RateLimited(_)));
1513        // And the store is empty — the limiter MUST short-circuit before persist.
1514        let resp = server.search(&SearchParams::default(), None).unwrap();
1515        assert!(
1516            resp.matches.is_empty(),
1517            "rate-limited publish must not persist"
1518        );
1519    }
1520
1521    #[test]
1522    fn created_at_is_ms_truncated() {
1523        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1524        let p = producer();
1525        let req = p
1526            .publish_request()
1527            .title("ms")
1528            .context_type(ContextType::DataSnapshot)
1529            .visibility(Visibility::Public)
1530            .build()
1531            .unwrap();
1532        let resp = server.publish_unverified_for_tests(&req).unwrap();
1533        // Nanosecond component of a ms-truncated timestamp is always a multiple of 1_000_000.
1534        assert_eq!(
1535            resp.created_at.timestamp_subsec_nanos() % 1_000_000,
1536            0,
1537            "created_at must be millisecond-truncated per RFC-ACDP-0001 §5.3"
1538        );
1539    }
1540
1541    // ── did:key publish (ACDP 0.2) ───────────────────────────────────────
1542
1543    fn did_key_request() -> acdp_types::publish::PublishRequest {
1544        let p = Producer::new_did_key(SigningKey::from_bytes(&[7u8; 32]));
1545        p.publish_request()
1546            .title("did:key publish")
1547            .context_type(ContextType::DataSnapshot)
1548            .visibility(Visibility::Public)
1549            .build()
1550            .unwrap()
1551    }
1552
1553    /// A registry that does NOT advertise `did:key` in
1554    /// `supported_did_methods` refuses a did:key publish with
1555    /// `key_resolution_failed` (permanent) — the anchor-plan decision.
1556    #[test]
1557    fn did_key_publish_rejected_when_not_advertised() {
1558        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1559        let err = server
1560            .publish_verified_did_key(&did_key_request(), None)
1561            .unwrap_err();
1562        assert!(
1563            matches!(err, AcdpError::KeyResolution(ref m) if m.contains("supported_did_methods")),
1564            "got {err:?}"
1565        );
1566    }
1567
1568    /// With `did:key` advertised, the offline pipeline runs end-to-end:
1569    /// schema → hash → pure key resolution → signature → persistence.
1570    /// No resolver, no network — works in a `server`-only build.
1571    #[test]
1572    fn did_key_publish_verified_end_to_end() {
1573        let mut c = caps();
1574        c.supported_did_methods.push("did:key".into());
1575        let server = RegistryServer::new(InMemoryStore::new(), c, "registry.example.com");
1576        let req = did_key_request();
1577        let resp = server.publish_verified_did_key(&req, None).unwrap();
1578        assert_eq!(resp.ctx_id.authority(), "registry.example.com");
1579
1580        // Tampered title → hash mismatch caught before signature.
1581        let mut tampered = did_key_request();
1582        tampered.title = "tampered".into();
1583        let err = server
1584            .publish_verified_did_key(&tampered, None)
1585            .unwrap_err();
1586        assert!(matches!(err, AcdpError::HashMismatch { .. }), "got {err:?}");
1587    }
1588
1589    /// Upgrade boundary: a registry that enables receipts must still
1590    /// honor idempotent replays of records minted BEFORE the signer
1591    /// existed. The §7 no-degraded-mode check applies to newly inserted
1592    /// contexts only — a replayed pre-receipts response (no
1593    /// `registry_receipt`) is returned verbatim, not failed as a 500.
1594    #[test]
1595    fn receiptless_idempotent_replay_survives_enabling_receipts() {
1596        let mut c = caps();
1597        c.acdp_version = "0.2.0".into();
1598        c.supported_did_methods.push("did:key".into());
1599        c.supports_idempotency_key = true;
1600        c.limits.idempotency_key_ttl_seconds = Some(86_400);
1601        let server = RegistryServer::new(InMemoryStore::new(), c, "registry.example.com")
1602            .with_receipt_signer(
1603                acdp_types::receipt::ReceiptSigner::new(
1604                    SigningKey::from_bytes(&[0x11u8; 32]),
1605                    "did:web:registry.example.com",
1606                    "did:web:registry.example.com#receipt-key-1",
1607                )
1608                .unwrap(),
1609            )
1610            .unwrap();
1611
1612        // Simulate a record persisted before receipts were enabled: the
1613        // stored response carries no `registry_receipt`.
1614        let req = did_key_request();
1615        let pre_receipts_response = acdp_types::publish::PublishResponse {
1616            ctx_id: CtxId(format!(
1617                "acdp://registry.example.com/{}",
1618                uuid::Uuid::new_v4()
1619            )),
1620            lineage_id: acdp_crypto::derive_lineage_id(&CtxId(
1621                "acdp://registry.example.com/v1".into(),
1622            )),
1623            version: 1,
1624            created_at: acdp_primitives::time::trunc_ms(chrono::Utc::now()),
1625            status: Status::Active,
1626            registry_receipt: None,
1627        };
1628        server
1629            .store()
1630            .idempotency_record(
1631                &req.agent_id,
1632                "pre-receipts-key",
1633                &req.content_hash,
1634                &pre_receipts_response,
1635                chrono::Utc::now() + chrono::Duration::hours(1),
1636            )
1637            .unwrap();
1638
1639        // Same agent + key + content_hash → the replay must return the
1640        // original receipt-less response, not RegistryInternal.
1641        let resp = server
1642            .publish_verified_did_key(&req, Some("pre-receipts-key"))
1643            .expect("replay of a pre-receipts record must succeed");
1644        assert_eq!(resp.ctx_id, pre_receipts_response.ctx_id);
1645        assert!(
1646            resp.registry_receipt.is_none(),
1647            "replay returns the original response verbatim"
1648        );
1649
1650        // A FRESH publish on the same server still enforces minting.
1651        let p2 = Producer::new_did_key(SigningKey::from_bytes(&[8u8; 32]));
1652        let fresh = p2
1653            .publish_request()
1654            .title("fresh after enabling receipts")
1655            .context_type(ContextType::DataSnapshot)
1656            .visibility(Visibility::Public)
1657            .build()
1658            .unwrap();
1659        let fresh_resp = server.publish_verified_did_key(&fresh, None).unwrap();
1660        assert!(
1661            fresh_resp.registry_receipt.is_some(),
1662            "new inserts on a receipts registry must mint"
1663        );
1664    }
1665
1666    /// `publish_verified_did_key` refuses did:web producers — they need
1667    /// the resolver-backed `publish_verified`.
1668    #[test]
1669    fn did_key_publish_path_refuses_did_web() {
1670        let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1671        let p = producer();
1672        let req = p
1673            .publish_request()
1674            .title("did:web on the offline path")
1675            .context_type(ContextType::DataSnapshot)
1676            .visibility(Visibility::Public)
1677            .build()
1678            .unwrap();
1679        let err = server.publish_verified_did_key(&req, None).unwrap_err();
1680        assert!(
1681            matches!(err, AcdpError::KeyResolution(_)),
1682            "did:web on the offline path must be refused, got {err:?}"
1683        );
1684    }
1685}