1use crate::registry::rate_limit::{NoopRateLimiter, RateLimiter};
31use crate::registry::store::RegistryStore;
32use crate::registry::validator::PublishValidator;
33use acdp_primitives::error::AcdpError;
34use acdp_types::{
35 body::{Body, FullContext},
36 capabilities::CapabilitiesDocument,
37 primitives::{AgentDid, CtxId, LineageId, Status, Visibility},
38 publish::{PublishRequest, PublishResponse},
39 search::{SearchParams, SearchResponse},
40};
41
42pub struct RegistryServer<S: RegistryStore, L: RateLimiter = NoopRateLimiter> {
48 store: S,
49 caps: CapabilitiesDocument,
50 authority: String,
51 rate_limiter: L,
52 receipt_signer: Option<acdp_types::receipt::ReceiptSigner>,
57}
58
59impl<S: RegistryStore> RegistryServer<S, NoopRateLimiter> {
60 #[doc(hidden)]
64 pub fn new(store: S, caps: CapabilitiesDocument, authority: impl Into<String>) -> Self {
65 Self {
66 store,
67 caps,
68 authority: authority.into(),
69 rate_limiter: NoopRateLimiter,
70 receipt_signer: None,
71 }
72 }
73
74 pub fn try_new(
88 store: S,
89 caps: CapabilitiesDocument,
90 authority: impl Into<String>,
91 ) -> Result<Self, AcdpError> {
92 let authority = authority.into();
93 if !acdp_types::primitives::is_valid_dns_authority(&authority) {
96 return Err(AcdpError::SchemaViolation(format!(
97 "registry authority '{authority}' is not a valid DNS hostname \
98 (must be lowercase labels, e.g. 'registry.example.com'); \
99 use RegistryServer::try_new_for_test_authority for host:port test setups"
100 )));
101 }
102 acdp_validation::validate_capabilities(&caps)?;
103 let expected_did = acdp_did::authority_to_did_web(&authority);
106 if caps.registry_did != expected_did {
107 return Err(AcdpError::SchemaViolation(format!(
108 "capabilities.registry_did '{}' does not match expected '{expected_did}' \
109 for authority '{authority}'",
110 caps.registry_did
111 )));
112 }
113 Ok(Self {
114 store,
115 caps,
116 authority,
117 rate_limiter: NoopRateLimiter,
118 receipt_signer: None,
119 })
120 }
121
122 #[doc(hidden)]
131 pub fn try_new_for_test_authority(
132 store: S,
133 caps: CapabilitiesDocument,
134 authority: impl Into<String>,
135 ) -> Result<Self, AcdpError> {
136 let authority = authority.into();
137 acdp_validation::validate_capabilities(&caps)?;
138 let expected_did = acdp_did::authority_to_did_web(&authority);
139 if caps.registry_did != expected_did {
140 return Err(AcdpError::SchemaViolation(format!(
141 "capabilities.registry_did '{}' does not match expected '{expected_did}' \
142 for authority '{authority}'",
143 caps.registry_did
144 )));
145 }
146 Ok(Self {
147 store,
148 caps,
149 authority,
150 rate_limiter: NoopRateLimiter,
151 receipt_signer: None,
152 })
153 }
154}
155
156impl<S: RegistryStore, L: RateLimiter> RegistryServer<S, L> {
157 pub fn with_rate_limiter<L2: RateLimiter>(self, limiter: L2) -> RegistryServer<S, L2> {
159 RegistryServer {
160 store: self.store,
161 caps: self.caps,
162 authority: self.authority,
163 rate_limiter: limiter,
164 receipt_signer: self.receipt_signer,
165 }
166 }
167
168 pub fn with_receipt_signer(
184 mut self,
185 signer: acdp_types::receipt::ReceiptSigner,
186 ) -> Result<Self, AcdpError> {
187 if signer.registry_did() != self.caps.registry_did {
188 return Err(AcdpError::SchemaViolation(format!(
189 "receipt signer registry_did '{}' ≠ capabilities.registry_did '{}'",
190 signer.registry_did(),
191 self.caps.registry_did
192 )));
193 }
194 let parts: Vec<u64> = self
200 .caps
201 .acdp_version
202 .split('.')
203 .map(|p| p.parse::<u64>())
204 .collect::<Result<_, _>>()
205 .map_err(|_| {
206 AcdpError::SchemaViolation(format!(
207 "capabilities.acdp_version '{}' is not a plain MAJOR.MINOR.PATCH version",
208 self.caps.acdp_version
209 ))
210 })?;
211 let [major, minor, patch] = parts.as_slice() else {
212 return Err(AcdpError::SchemaViolation(format!(
213 "capabilities.acdp_version '{}' is not a plain MAJOR.MINOR.PATCH version",
214 self.caps.acdp_version
215 )));
216 };
217 if (*major, *minor, *patch) < (0, 2, 0) {
218 return Err(AcdpError::SchemaViolation(format!(
219 "acdp-registry-receipts requires capabilities.acdp_version >= 0.2.0, got '{}'",
220 self.caps.acdp_version
221 )));
222 }
223 let profile = acdp_types::profile::Profile::RegistryReceipts.as_str();
224 if !self.caps.profiles.iter().any(|p| p == profile) {
225 self.caps.profiles.push(profile.to_string());
226 }
227 self.receipt_signer = Some(signer);
228 Ok(self)
229 }
230
231 pub fn store(&self) -> &S {
234 &self.store
235 }
236
237 pub fn capabilities(&self) -> &CapabilitiesDocument {
239 &self.caps
240 }
241
242 #[cfg(feature = "client")]
258 pub async fn publish_verified(
259 &self,
260 req: &PublishRequest,
261 idempotency_key: Option<&str>,
262 resolver: &acdp_did::WebResolver,
263 ) -> Result<PublishResponse, AcdpError> {
264 self.publish_verified_in_tenant(req, idempotency_key, resolver, None)
265 .await
266 }
267
268 #[cfg(feature = "client")]
274 pub async fn publish_verified_in_tenant(
275 &self,
276 req: &PublishRequest,
277 idempotency_key: Option<&str>,
278 resolver: &acdp_did::WebResolver,
279 tenant: Option<&str>,
280 ) -> Result<PublishResponse, AcdpError> {
281 self.rate_limiter.check_publish(&req.agent_id)?;
283
284 let raw_bytes = serde_json::to_vec(req)?.len();
285 let validator = PublishValidator::for_authority(&self.caps, &self.authority);
286 let _validated = validator.validate_post_schema(req, raw_bytes)?;
287
288 acdp_verify::verify_publish_request_signature(req, resolver).await?;
290
291 let fingerprint = if self.receipt_signer.is_some() {
295 Some(producer_key_fingerprint(req, resolver).await?)
296 } else {
297 None
298 };
299
300 self.commit_via_store(req, idempotency_key, tenant, fingerprint)
308 }
309
310 pub fn publish_verified_did_key(
324 &self,
325 req: &PublishRequest,
326 idempotency_key: Option<&str>,
327 ) -> Result<PublishResponse, AcdpError> {
328 self.publish_verified_did_key_in_tenant(req, idempotency_key, None)
329 }
330
331 pub fn publish_verified_did_key_in_tenant(
337 &self,
338 req: &PublishRequest,
339 idempotency_key: Option<&str>,
340 tenant: Option<&str>,
341 ) -> Result<PublishResponse, AcdpError> {
342 self.rate_limiter.check_publish(&req.agent_id)?;
343
344 let raw_bytes = serde_json::to_vec(req)?.len();
345 let validator = PublishValidator::for_authority(&self.caps, &self.authority);
346 let _validated = validator.validate_post_schema(req, raw_bytes)?;
347
348 acdp_verify::verify_publish_request_signature_offline(req)?;
350
351 let fingerprint = if self.receipt_signer.is_some() {
354 let material = acdp_did::key::resolve_did_key(req.agent_id.as_str())?;
355 Some(acdp_crypto::fingerprint::fingerprint_did_key_material(
356 &material,
357 )?)
358 } else {
359 None
360 };
361
362 self.commit_via_store(req, idempotency_key, tenant, fingerprint)
363 }
364
365 #[doc(hidden)]
372 pub fn publish_unverified_for_tests(
373 &self,
374 req: &PublishRequest,
375 ) -> Result<PublishResponse, AcdpError> {
376 self.rate_limiter.check_publish(&req.agent_id)?;
380
381 if self.receipt_signer.is_some() {
387 return Err(AcdpError::SchemaViolation(
388 "publish_unverified_for_tests is unavailable on a receipts-advertising \
389 registry (RFC-ACDP-0010 §7: no degraded mode); use publish_verified or \
390 publish_verified_did_key"
391 .into(),
392 ));
393 }
394 let raw_bytes = serde_json::to_vec(req)?.len();
395 let validator = PublishValidator::for_authority(&self.caps, &self.authority);
396 let _validated = validator.validate_post_schema(req, raw_bytes)?;
397 self.commit_via_store(req, None, None, None)
398 }
399
400 fn commit_via_store(
405 &self,
406 req: &PublishRequest,
407 idempotency_key: Option<&str>,
408 tenant: Option<&str>,
409 producer_key_fingerprint: Option<String>,
410 ) -> Result<PublishResponse, AcdpError> {
411 let idempotency = if self.caps.supports_idempotency_key {
412 idempotency_key.map(|key| crate::registry::store::PendingIdempotencyCommit {
413 key,
414 ttl: chrono::Duration::seconds(
415 self.caps
416 .limits
417 .idempotency_key_ttl_seconds
418 .unwrap_or(86_400) as i64,
419 ),
420 })
421 } else {
422 None
423 };
424 #[allow(clippy::type_complexity)]
427 let minter: Option<
428 Box<dyn Fn(&Body) -> Result<serde_json::Value, AcdpError> + Send + Sync>,
429 > = match (&self.receipt_signer, producer_key_fingerprint) {
430 (Some(signer), Some(fp)) => Some(Box::new(move |body: &Body| {
431 let receipt = signer.mint(
432 &body.ctx_id,
433 &body.lineage_id,
434 &body.origin_registry,
435 body.created_at,
436 &body.content_hash,
437 &fp,
438 )?;
439 serde_json::to_value(receipt).map_err(AcdpError::from)
440 })),
441 _ => None,
442 };
443 let minted_expected = minter.is_some();
444 let outcome = self
445 .store
446 .commit_publish(crate::registry::store::PublishCommit {
447 req,
448 authority: &self.authority,
449 idempotency,
450 tenant,
451 receipt_minter: minter.as_deref(),
452 })?;
453 let (response, replayed) = match outcome {
454 crate::registry::store::PublishCommitOutcome::Inserted(r) => (r, false),
455 crate::registry::store::PublishCommitOutcome::IdempotentReplay(r) => (r, true),
456 };
457 if minted_expected && !replayed && response.registry_receipt.is_none() {
471 return Err(AcdpError::RegistryInternal(
472 "receipt signer is configured but the store returned no receipt — \
473 the RegistryStore implementation must invoke PublishCommit::receipt_minter \
474 inside its commit (RFC-ACDP-0010 §7: no degraded mode)"
475 .into(),
476 ));
477 }
478 Ok(response)
479 }
480
481 pub fn retrieve(
494 &self,
495 ctx_id: &CtxId,
496 requester: Option<&AgentDid>,
497 ) -> Result<Option<FullContext>, AcdpError> {
498 let Some(ctx) = self.store.get(ctx_id)? else {
499 return Ok(None);
500 };
501 if !can_retrieve(&ctx.body, requester, &self.caps) {
502 return Ok(None);
503 }
504 Ok(Some(ctx))
505 }
506
507 pub fn retrieve_body(
509 &self,
510 ctx_id: &CtxId,
511 requester: Option<&AgentDid>,
512 ) -> Result<Option<Body>, AcdpError> {
513 Ok(self.retrieve(ctx_id, requester)?.map(|c| c.body))
514 }
515
516 pub fn lineage(
523 &self,
524 lineage_id: &LineageId,
525 requester: Option<&AgentDid>,
526 ) -> Result<Vec<FullContext>, AcdpError> {
527 let all = self.store.lineage(lineage_id)?;
528 Ok(all
529 .into_iter()
530 .filter(|ctx| can_retrieve(&ctx.body, requester, &self.caps))
531 .collect())
532 }
533
534 pub fn current(
541 &self,
542 lineage_id: &LineageId,
543 requester: Option<&AgentDid>,
544 ) -> Result<Option<FullContext>, AcdpError> {
545 let all = self.store.lineage(lineage_id)?;
546 for ctx in all.into_iter().rev() {
552 if !matches!(ctx.registry_state.status, Status::Superseded)
553 && can_retrieve(&ctx.body, requester, &self.caps)
554 {
555 return Ok(Some(ctx));
556 }
557 }
558 Ok(None)
559 }
560
561 pub fn search(
574 &self,
575 params: &SearchParams,
576 requester: Option<&AgentDid>,
577 ) -> Result<SearchResponse, AcdpError> {
578 if requester.is_none() && !self.caps.anonymous_public_reads {
583 return Err(AcdpError::NotAuthorized(
584 "anonymous search requires authentication \
585 (registry caps: anonymous_public_reads=false)"
586 .into(),
587 ));
588 }
589 self.store
594 .search(params, requester, self.caps.anonymous_public_reads)
595 }
596}
597
598pub(crate) fn can_retrieve(
600 body: &Body,
601 requester: Option<&AgentDid>,
602 caps: &CapabilitiesDocument,
603) -> bool {
604 match body.visibility {
605 Visibility::Public => caps.anonymous_public_reads || requester.is_some(),
606 Visibility::Restricted | Visibility::Private => match requester {
607 None => false,
608 Some(r) => {
609 r == &body.agent_id
610 || body
611 .audience
612 .as_deref()
613 .is_some_and(|a| a.iter().any(|d| d == r))
614 }
615 },
616 }
617}
618
/// Computes the fingerprint of the key named by the request's signature,
/// using `resolver` to look the key up.
///
/// Forwards `req.signature.key_id` and `req.signature.algorithm` to
/// `fingerprint_for_key_id` and returns its result unchanged.
#[cfg(feature = "client")]
async fn producer_key_fingerprint(
    req: &PublishRequest,
    resolver: &acdp_did::WebResolver,
) -> Result<String, AcdpError> {
    let signature = &req.signature;
    acdp_crypto::fingerprint::fingerprint_for_key_id(
        &signature.key_id,
        &signature.algorithm,
        resolver,
    )
    .await
}
640
641#[cfg(test)]
642mod tests {
643 use super::*;
644 use crate::registry::store::InMemoryStore;
645 use acdp_crypto::SigningKey;
646 use acdp_producer::Producer;
647 use acdp_types::capabilities::Limits;
648 use acdp_types::primitives::{AgentDid, ContextType, Visibility};
649
650 fn caps() -> CapabilitiesDocument {
651 CapabilitiesDocument {
652 acdp_version: "0.1.0".into(),
653 registry_did: "did:web:registry.example.com".into(),
654 supported_signature_algorithms: vec!["ed25519".into()],
655 supported_did_methods: vec!["did:web".into()],
656 profiles: vec!["acdp-registry-core".into()],
657 limits: Limits {
658 max_payload_bytes: 1_048_576,
659 max_embedded_bytes: 65_536,
660 idempotency_key_ttl_seconds: None,
661 },
662 read_authentication_methods: vec![],
663 anonymous_public_reads: true,
664 supports_idempotency_key: false,
665 extensions: Default::default(),
666 }
667 }
668
669 fn producer() -> Producer {
670 Producer::new(
671 SigningKey::from_bytes(&[1u8; 32]),
672 AgentDid::new("did:web:agents.example.com:test"),
673 "did:web:agents.example.com:test#key-1",
674 )
675 }
676
677 #[test]
678 fn publish_v1_then_retrieve() {
679 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
680 let p = producer();
681 let req = p
682 .publish_request()
683 .title("v1")
684 .context_type(ContextType::DataSnapshot)
685 .visibility(Visibility::Public)
686 .build()
687 .unwrap();
688 let resp = server.publish_unverified_for_tests(&req).unwrap();
689 assert_eq!(resp.version, 1);
690 let ctx = server.retrieve(&resp.ctx_id, None).unwrap().unwrap();
691 assert_eq!(ctx.body.title, "v1");
692 let lineage = server.lineage(&resp.lineage_id, None).unwrap();
694 assert_eq!(lineage.len(), 1);
695 let cur = server.current(&resp.lineage_id, None).unwrap().unwrap();
697 assert_eq!(cur.body.ctx_id, resp.ctx_id);
698 }
699
700 #[test]
701 fn supersession_marks_predecessor_and_returns_v2() {
702 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
703 let p = producer();
704 let v1_req = p
705 .publish_request()
706 .title("v1")
707 .context_type(ContextType::DataSnapshot)
708 .visibility(Visibility::Public)
709 .build()
710 .unwrap();
711 let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
712
713 let v2_req = p
714 .supersede(v1.ctx_id.clone())
715 .version(2)
716 .title("v2")
717 .context_type(ContextType::DataSnapshot)
718 .visibility(Visibility::Public)
719 .build()
720 .unwrap();
721 let v2 = server.publish_unverified_for_tests(&v2_req).unwrap();
722 assert_eq!(v2.version, 2);
723 let v1_ctx = server.retrieve(&v1.ctx_id, None).unwrap().unwrap();
725 assert!(matches!(
726 v1_ctx.registry_state.status,
727 acdp_types::Status::Superseded
728 ));
729 assert_eq!(v1.lineage_id, v2.lineage_id);
731 let cur = server.current(&v1.lineage_id, None).unwrap().unwrap();
733 assert_eq!(cur.body.ctx_id, v2.ctx_id);
734 }
735
736 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
744 async fn concurrent_supersession_exactly_one_succeeds() {
745 use std::sync::Arc;
746 let server = Arc::new(RegistryServer::new(
747 InMemoryStore::new(),
748 caps(),
749 "registry.example.com",
750 ));
751 let p = producer();
752 let v1_req = p
753 .publish_request()
754 .title("v1")
755 .context_type(ContextType::DataSnapshot)
756 .visibility(Visibility::Public)
757 .build()
758 .unwrap();
759 let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
760
761 let v2a_req = p
766 .supersede(v1.ctx_id.clone())
767 .version(2)
768 .title("v2-A")
769 .context_type(ContextType::DataSnapshot)
770 .visibility(Visibility::Public)
771 .build()
772 .unwrap();
773 let v2b_req = p
774 .supersede(v1.ctx_id.clone())
775 .version(2)
776 .title("v2-B")
777 .context_type(ContextType::DataSnapshot)
778 .visibility(Visibility::Public)
779 .build()
780 .unwrap();
781
782 let s1 = Arc::clone(&server);
783 let s2 = Arc::clone(&server);
784 let h1 = tokio::task::spawn_blocking(move || s1.publish_unverified_for_tests(&v2a_req));
785 let h2 = tokio::task::spawn_blocking(move || s2.publish_unverified_for_tests(&v2b_req));
786 let (r1, r2) = (h1.await.unwrap(), h2.await.unwrap());
787
788 let outcomes = [r1, r2];
789 let successes = outcomes.iter().filter(|r| r.is_ok()).count();
790 let failures = outcomes.iter().filter(|r| r.is_err()).count();
791 assert_eq!(
792 successes, 1,
793 "exactly one concurrent supersession MUST succeed; got {successes} successes / {failures} failures"
794 );
795 assert_eq!(failures, 1);
796 for r in &outcomes {
799 if let Err(e) = r {
800 match e {
801 AcdpError::SupersededTarget { reason, .. } => assert_eq!(
802 *reason,
803 acdp_primitives::error::SupersessionReason::AlreadySuperseded,
804 "concurrent loser MUST be AlreadySuperseded"
805 ),
806 other => panic!("concurrent loser had wrong error: {other:?}"),
807 }
808 }
809 }
810 }
811
812 #[test]
813 fn hostile_supersession_by_non_owner_rejected_predecessor_unchanged() {
814 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
819 let victim = producer_for(7, "did:web:agents.example.com:victim");
820 let v1_req = victim
821 .publish_request()
822 .title("v1")
823 .context_type(ContextType::DataSnapshot)
824 .visibility(Visibility::Public)
825 .build()
826 .unwrap();
827 let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
828
829 let attacker = producer_for(9, "did:web:evil.example.com:attacker");
832 let v2_req = attacker
833 .supersede(v1.ctx_id.clone())
834 .version(2)
835 .title("hijacked")
836 .context_type(ContextType::DataSnapshot)
837 .visibility(Visibility::Public)
838 .build()
839 .unwrap();
840 let err = server.publish_unverified_for_tests(&v2_req).unwrap_err();
841 match err {
843 AcdpError::SupersededTarget { reason, .. } => {
844 assert_eq!(reason, acdp_primitives::error::SupersessionReason::NotFound);
845 }
846 other => panic!("expected uniform SupersededTarget::NotFound, got {other:?}"),
847 }
848 let cur = server.current(&v1.lineage_id, None).unwrap().unwrap();
850 assert_eq!(cur.body.ctx_id, v1.ctx_id);
851 assert_eq!(cur.body.title, "v1");
852 assert_eq!(
853 cur.registry_state.status,
854 acdp_types::primitives::Status::Active
855 );
856 }
857
858 #[test]
859 fn owner_supersession_still_succeeds_after_ownership_check() {
860 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
861 let p = producer();
862 let v1_req = p
863 .publish_request()
864 .title("v1")
865 .context_type(ContextType::DataSnapshot)
866 .visibility(Visibility::Public)
867 .build()
868 .unwrap();
869 let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
870 let v2_req = p
871 .supersede(v1.ctx_id.clone())
872 .version(2)
873 .title("v2")
874 .context_type(ContextType::DataSnapshot)
875 .visibility(Visibility::Public)
876 .build()
877 .unwrap();
878 let v2 = server.publish_unverified_for_tests(&v2_req).unwrap();
879 assert_eq!(v2.version, 2);
880 let cur = server.current(&v1.lineage_id, None).unwrap().unwrap();
881 assert_eq!(cur.body.ctx_id, v2.ctx_id);
882 }
883
884 #[test]
885 fn supersession_with_unknown_target_rejected_as_not_found() {
886 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
887 let p = producer();
888 let phantom =
889 CtxId("acdp://registry.example.com/12345678-1234-4321-8123-deadbeefcafe".into());
890 let req = p
891 .supersede(phantom)
892 .version(2)
893 .title("v2-orphan")
894 .context_type(ContextType::DataSnapshot)
895 .visibility(Visibility::Public)
896 .build()
897 .unwrap();
898 let err = server.publish_unverified_for_tests(&req).unwrap_err();
899 match err {
900 AcdpError::SupersededTarget { reason, .. } => {
901 assert_eq!(reason, acdp_primitives::error::SupersessionReason::NotFound);
902 }
903 other => panic!("expected SupersededTarget::NotFound, got {other:?}"),
904 }
905 }
906
907 #[test]
908 fn version_mismatch_rejected() {
909 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
910 let p = producer();
911 let v1_req = p
912 .publish_request()
913 .title("v1")
914 .context_type(ContextType::DataSnapshot)
915 .visibility(Visibility::Public)
916 .build()
917 .unwrap();
918 let v1 = server.publish_unverified_for_tests(&v1_req).unwrap();
919 let v3_req = p
921 .supersede(v1.ctx_id.clone())
922 .version(3)
923 .title("v3-skipped")
924 .context_type(ContextType::DataSnapshot)
925 .visibility(Visibility::Public)
926 .build()
927 .unwrap();
928 let err = server.publish_unverified_for_tests(&v3_req).unwrap_err();
929 match err {
930 AcdpError::SupersededTarget { reason, .. } => {
931 assert_eq!(
932 reason,
933 acdp_primitives::error::SupersessionReason::VersionMismatch
934 );
935 }
936 other => panic!("expected VersionMismatch, got {other:?}"),
937 }
938 }
939
940 #[test]
941 fn search_finds_published_context() {
942 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
943 let p = producer();
944 let req = p
945 .publish_request()
946 .title("Q1 portfolio risk")
947 .context_type(ContextType::DataSnapshot)
948 .visibility(Visibility::Public)
949 .build()
950 .unwrap();
951 server.publish_unverified_for_tests(&req).unwrap();
952 let resp = server
953 .search(
954 &SearchParams {
955 q: Some("portfolio".into()),
956 ..Default::default()
957 },
958 None,
959 )
960 .unwrap();
961 assert_eq!(resp.matches.len(), 1);
962 assert_eq!(resp.matches[0].title, "Q1 portfolio risk");
963 }
964
965 #[test]
971 fn lineage_filters_restricted_for_stranger() {
972 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
973 let p = producer();
974 let audience = AgentDid::new("did:web:audience.example.com:reader");
975 let req = p
976 .publish_request()
977 .title("restricted v1")
978 .context_type(ContextType::DataSnapshot)
979 .visibility(Visibility::Restricted)
980 .audience(vec![audience.clone()])
981 .build()
982 .unwrap();
983 let resp = server.publish_unverified_for_tests(&req).unwrap();
984
985 let stranger = AgentDid::new("did:web:other.example.com:reader");
986 let stranger_view = server.lineage(&resp.lineage_id, Some(&stranger)).unwrap();
987 assert!(
988 stranger_view.is_empty(),
989 "stranger MUST NOT see restricted bodies via lineage(); got {} entries",
990 stranger_view.len()
991 );
992
993 let audience_view = server.lineage(&resp.lineage_id, Some(&audience)).unwrap();
994 assert_eq!(
995 audience_view.len(),
996 1,
997 "audience member MUST see the restricted body via lineage()"
998 );
999 }
1000
1001 #[test]
1004 fn current_filters_private_for_stranger() {
1005 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1006 let p = producer();
1007 let req = p
1008 .publish_request()
1009 .title("private v1")
1010 .context_type(ContextType::DataSnapshot)
1011 .visibility(Visibility::Private)
1012 .build()
1013 .unwrap();
1014 let resp = server.publish_unverified_for_tests(&req).unwrap();
1015
1016 let stranger = AgentDid::new("did:web:other.example.com:reader");
1017 assert!(
1018 server
1019 .current(&resp.lineage_id, Some(&stranger))
1020 .unwrap()
1021 .is_none(),
1022 "stranger MUST NOT see private contexts via current()"
1023 );
1024
1025 let producer_did = AgentDid::new("did:web:agents.example.com:test");
1026 assert!(
1027 server
1028 .current(&resp.lineage_id, Some(&producer_did))
1029 .unwrap()
1030 .is_some(),
1031 "producer MUST see private contexts via current()"
1032 );
1033 }
1034
1035 #[test]
1046 fn current_returns_none_when_all_superseded() {
1047 use crate::registry::store::RegistryStore;
1048 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1049 let p = producer();
1050 let req = p
1051 .publish_request()
1052 .title("v1")
1053 .context_type(ContextType::DataSnapshot)
1054 .visibility(Visibility::Public)
1055 .build()
1056 .unwrap();
1057 let resp = server.publish_unverified_for_tests(&req).unwrap();
1058 server.store().mark_superseded(&resp.ctx_id).unwrap();
1060
1061 let cur = server.current(&resp.lineage_id, None).unwrap();
1062 assert!(
1063 cur.is_none(),
1064 "all-superseded lineage MUST resolve to None per RFC-ACDP-0004 §5; got {cur:?}"
1065 );
1066 }
1067
1068 #[test]
1076 fn search_suppresses_public_when_anonymous_public_reads_false() {
1077 let mut c = caps();
1078 c.anonymous_public_reads = false;
1079 let server = RegistryServer::new(InMemoryStore::new(), c, "registry.example.com");
1080 let p = producer();
1081 let req = p
1082 .publish_request()
1083 .title("public-but-flag-off")
1084 .context_type(ContextType::DataSnapshot)
1085 .visibility(Visibility::Public)
1086 .build()
1087 .unwrap();
1088 server.publish_unverified_for_tests(&req).unwrap();
1089
1090 let err = server
1092 .search(
1093 &SearchParams {
1094 q: Some("public-but-flag-off".into()),
1095 ..Default::default()
1096 },
1097 None,
1098 )
1099 .unwrap_err();
1100 assert!(
1101 matches!(err, AcdpError::NotAuthorized(_)),
1102 "vis-009: anonymous search MUST be NotAuthorized when \
1103 anonymous_public_reads=false; got {err:?}"
1104 );
1105
1106 let stranger = AgentDid::new("did:web:other.example.com:reader");
1109 let authed = server
1110 .search(
1111 &SearchParams {
1112 q: Some("public-but-flag-off".into()),
1113 ..Default::default()
1114 },
1115 Some(&stranger),
1116 )
1117 .unwrap();
1118 assert_eq!(
1119 authed.matches.len(),
1120 1,
1121 "authenticated search MUST see public contexts regardless of anonymous_public_reads"
1122 );
1123 }
1124
1125 #[test]
1128 fn try_new_rejects_did_authority_mismatch() {
1129 let mut c = caps();
1130 c.registry_did = "did:web:other.example.com".into(); let res = RegistryServer::try_new(InMemoryStore::new(), c, "registry.example.com");
1132 match res {
1133 Err(AcdpError::SchemaViolation(msg)) => {
1134 assert!(msg.contains("does not match expected"))
1135 }
1136 Err(other) => panic!("expected SchemaViolation, got {other:?}"),
1137 Ok(_) => panic!("expected Err"),
1138 }
1139 }
1140
1141 #[test]
1142 fn try_new_rejects_caps_missing_ed25519() {
1143 let mut c = caps();
1144 c.supported_signature_algorithms = vec!["ecdsa-p256".into()]; let res = RegistryServer::try_new(InMemoryStore::new(), c, "registry.example.com");
1146 assert!(matches!(res, Err(AcdpError::SchemaViolation(_))));
1147 }
1148
1149 #[test]
1150 fn try_new_accepts_valid_caps() {
1151 RegistryServer::try_new(InMemoryStore::new(), caps(), "registry.example.com").unwrap();
1152 }
1153
1154 #[test]
1157 fn try_new_accepts_valid_dns_authority() {
1158 RegistryServer::try_new(InMemoryStore::new(), caps(), "registry.example.com").unwrap();
1159 }
1160
1161 #[test]
1162 fn try_new_rejects_host_port_authority() {
1163 let res = RegistryServer::try_new(InMemoryStore::new(), caps(), "localhost:8443");
1166 assert!(matches!(res, Err(AcdpError::SchemaViolation(_))));
1167 }
1168
1169 #[test]
1170 fn try_new_rejects_uppercase_authority() {
1171 let res = RegistryServer::try_new(InMemoryStore::new(), caps(), "Registry.Example.Com");
1172 assert!(matches!(res, Err(AcdpError::SchemaViolation(_))));
1173 }
1174
1175 #[test]
1176 fn try_new_rejects_url_form_authority() {
1177 let res =
1178 RegistryServer::try_new(InMemoryStore::new(), caps(), "https://registry.example.com");
1179 assert!(matches!(res, Err(AcdpError::SchemaViolation(_))));
1180 }
1181
1182 #[test]
1183 fn try_new_for_test_accepts_host_port() {
1184 let mut c = caps();
1187 c.registry_did = acdp_did::authority_to_did_web("localhost:8443");
1188 RegistryServer::try_new_for_test_authority(InMemoryStore::new(), c, "localhost:8443")
1189 .unwrap();
1190 }
1191
1192 fn producer_for(seed: u8, did: &str) -> Producer {
1195 Producer::new(
1196 SigningKey::from_bytes(&[seed; 32]),
1197 AgentDid::new(did),
1198 format!("{did}#key-1"),
1199 )
1200 }
1201
1202 #[test]
1203 fn retrieve_restricted_blocks_stranger_returns_none() {
1204 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1205 let owner = AgentDid::new("did:web:agents.example.com:owner");
1206 let audience_member = AgentDid::new("did:web:agents.example.com:friend");
1207 let p = producer_for(2, owner.as_str());
1208 let req = p
1209 .publish_request()
1210 .title("restricted")
1211 .context_type(ContextType::DataSnapshot)
1212 .visibility(Visibility::Restricted)
1213 .audience(vec![audience_member.clone()])
1214 .build()
1215 .unwrap();
1216 let resp = server.publish_unverified_for_tests(&req).unwrap();
1217 let stranger = AgentDid::new("did:web:agents.example.com:stranger");
1218
1219 assert!(server.retrieve(&resp.ctx_id, None).unwrap().is_none());
1220 assert!(server
1221 .retrieve(&resp.ctx_id, Some(&stranger))
1222 .unwrap()
1223 .is_none());
1224 assert!(server
1225 .retrieve(&resp.ctx_id, Some(&owner))
1226 .unwrap()
1227 .is_some());
1228 assert!(server
1229 .retrieve(&resp.ctx_id, Some(&audience_member))
1230 .unwrap()
1231 .is_some());
1232 }
1233
1234 #[test]
1235 fn search_restricted_filters_strangers() {
1236 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1237 let owner = AgentDid::new("did:web:agents.example.com:owner");
1238 let p = producer_for(3, owner.as_str());
1239 let req = p
1240 .publish_request()
1241 .title("hush hush")
1242 .context_type(ContextType::DataSnapshot)
1243 .visibility(Visibility::Restricted)
1244 .audience(vec![AgentDid::new("did:web:agents.example.com:friend")])
1245 .build()
1246 .unwrap();
1247 server.publish_unverified_for_tests(&req).unwrap();
1248
1249 let stranger = AgentDid::new("did:web:agents.example.com:stranger");
1250 let r_anon = server.search(&SearchParams::default(), None).unwrap();
1251 assert!(
1252 r_anon.matches.is_empty(),
1253 "anonymous must not see restricted"
1254 );
1255 let r_stranger = server
1256 .search(&SearchParams::default(), Some(&stranger))
1257 .unwrap();
1258 assert!(r_stranger.matches.is_empty());
1259 let r_owner = server
1260 .search(&SearchParams::default(), Some(&owner))
1261 .unwrap();
1262 assert_eq!(r_owner.matches.len(), 1);
1263 }
1264
1265 #[test]
1269 fn search_private_visible_only_to_producer() {
1270 let server = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
1271 let owner = AgentDid::new("did:web:agents.example.com:owner");
1272 let audience_member = AgentDid::new("did:web:agents.example.com:friend");
1273 let p = producer_for(4, owner.as_str());
1274 let req = p
1275 .publish_request()
1276 .title("private note")
1277 .context_type(ContextType::DataSnapshot)
1278 .visibility(Visibility::Private)
1279 .audience(vec![audience_member.clone()])
1280 .build()
1281 .unwrap();
1282 let resp = server.publish_unverified_for_tests(&req).unwrap();
1283
1284 let r_audience = server
1285 .search(&SearchParams::default(), Some(&audience_member))
1286 .unwrap();
1287 assert!(
1288 r_audience.matches.is_empty(),
1289 "audience must NOT see private in search"
1290 );
1291 let r_owner = server
1292 .search(&SearchParams::default(), Some(&owner))
1293 .unwrap();
1294 assert_eq!(
1295 r_owner.matches.len(),
1296 1,
1297 "owner sees their own private context"
1298 );
1299
1300 assert!(server
1302 .retrieve(&resp.ctx_id, Some(&audience_member))
1303 .unwrap()
1304 .is_some());
1305 }
1306
/// `publish_verified` must refuse a request whose `signature.key_id` has been
/// replaced with a `did:key` URL: the error is `KeyNotAuthorized` and its
/// message mentions `did:web`.
#[cfg(feature = "client")]
#[tokio::test]
async fn publish_verified_rejects_non_did_web_key_id() {
    let registry = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
    let signer = producer();
    let mut request = signer
        .publish_request()
        .title("v1")
        .context_type(ContextType::DataSnapshot)
        .visibility(Visibility::Public)
        .build()
        .unwrap();

    // Swap the key id for a did:key URL derived from a fixed, unrelated key.
    let foreign_key = SigningKey::from_bytes(&[9u8; 32]);
    let did_key = acdp_did::key::did_key_from_ed25519(&foreign_key.verifying_key_bytes());
    request.signature.key_id = acdp_did::key::did_key_url(&did_key).unwrap();

    let resolver = acdp_did::WebResolver::new();
    let outcome = registry.publish_verified(&request, None, &resolver).await;
    let rejection = outcome.unwrap_err();
    if let AcdpError::KeyNotAuthorized(reason) = &rejection {
        assert!(reason.contains("did:web"));
    } else {
        panic!(
            "expected KeyNotAuthorized for non-did:web, got {other:?}",
            other = rejection
        );
    }
}
1350
/// `publish_verified` must refuse a request whose `signature.key_id` is
/// rewritten to a key under `did:web:other.example.com:agent`: the error is
/// `KeyNotAuthorized` and its message mentions `agent_id`.
#[cfg(feature = "client")]
#[tokio::test]
async fn publish_verified_rejects_agent_id_keyid_mismatch() {
    let registry = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
    let signer = producer();
    let mut request = signer
        .publish_request()
        .title("v1")
        .context_type(ContextType::DataSnapshot)
        .visibility(Visibility::Public)
        .build()
        .unwrap();

    // Point the signature at a key that belongs to some other DID.
    request.signature.key_id = "did:web:other.example.com:agent#key-1".into();

    let resolver = acdp_did::WebResolver::new();
    let outcome = registry.publish_verified(&request, None, &resolver).await;
    let rejection = outcome.unwrap_err();
    if let AcdpError::KeyNotAuthorized(reason) = &rejection {
        assert!(reason.contains("agent_id"));
    } else {
        panic!(
            "expected KeyNotAuthorized for agent_id mismatch, got {other:?}",
            other = rejection
        );
    }
}
1374
/// `publish_verified` must reject a `signature.key_id` that is a bare DID with
/// no `#fragment`. Either `SchemaViolation` or `KeyResolution` is accepted as
/// the rejection.
#[cfg(feature = "client")]
#[tokio::test]
async fn publish_verified_rejects_keyid_without_fragment() {
    let registry = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
    let signer = producer();
    let mut request = signer
        .publish_request()
        .title("v1")
        .context_type(ContextType::DataSnapshot)
        .visibility(Visibility::Public)
        .build()
        .unwrap();

    // A DID without a fragment does not identify a specific key.
    request.signature.key_id = "did:web:agents.example.com:test".into();

    let resolver = acdp_did::WebResolver::new();
    let outcome = registry.publish_verified(&request, None, &resolver).await;
    let err = outcome.unwrap_err();
    let rejected_for_fragment = matches!(
        err,
        AcdpError::SchemaViolation(_) | AcdpError::KeyResolution(_)
    );
    assert!(
        rejected_for_fragment,
        "expected fragment-rejection error, got {err:?}"
    );
}
1403
1404 fn caps_with_idempotency() -> CapabilitiesDocument {
1407 let mut c = caps();
1408 c.supports_idempotency_key = true;
1409 c.limits.idempotency_key_ttl_seconds = Some(86_400);
1410 c
1411 }
1412
/// Publishes once, records that response in the store under an idempotency
/// key, and checks that looking the key up returns the same content hash and
/// the same `ctx_id`.
#[test]
fn idempotency_same_hash_returns_original_response() {
    const KEY: &str = "k-001";

    let registry = RegistryServer::new(
        InMemoryStore::new(),
        caps_with_idempotency(),
        "registry.example.com",
    );
    let signer = producer();
    let request = signer
        .publish_request()
        .title("once")
        .context_type(ContextType::DataSnapshot)
        .visibility(Visibility::Public)
        .build()
        .unwrap();
    let original = registry.publish_unverified_for_tests(&request).unwrap();

    // The record expires one advertised TTL from now.
    let ttl_seconds = caps_with_idempotency()
        .limits
        .idempotency_key_ttl_seconds
        .unwrap() as i64;
    let expires_at = chrono::Utc::now() + chrono::Duration::seconds(ttl_seconds);
    registry
        .store()
        .idempotency_record(
            &request.agent_id,
            KEY,
            &request.content_hash,
            &original,
            expires_at,
        )
        .unwrap();

    let looked_up = registry
        .store()
        .idempotency_lookup(&request.agent_id, KEY)
        .unwrap();
    let stored = looked_up.unwrap();
    assert_eq!(stored.content_hash, request.content_hash);
    assert_eq!(stored.response.ctx_id, original.ctx_id);
}
1455
/// Records an idempotency entry whose expiry is one second in the past and
/// checks that a lookup of that key yields `None`.
#[test]
fn idempotency_evicts_after_ttl() {
    const KEY: &str = "expired";

    let store = InMemoryStore::new();
    let agent = AgentDid::new("did:web:agents.example.com:test");

    // Placeholder response; only its presence in the store matters here.
    let ctx_id = acdp_types::CtxId("acdp://r/12345678-1234-4321-8123-000000000099".into());
    let lineage_id = acdp_types::LineageId(
        "lin:sha256:9999999999999999999999999999999999999999999999999999999999999999".into(),
    );
    let response = PublishResponse {
        registry_receipt: None,
        ctx_id,
        lineage_id,
        version: 1,
        created_at: chrono::Utc::now(),
        status: Status::Active,
    };

    let already_expired = chrono::Utc::now() - chrono::Duration::seconds(1);
    let content_hash = acdp_types::ContentHash("sha256:0".into());
    store
        .idempotency_record(&agent, KEY, &content_hash, &response, already_expired)
        .unwrap();

    let looked_up = store.idempotency_lookup(&agent, KEY).unwrap();
    assert!(
        looked_up.is_none(),
        "lazy TTL eviction should drop expired record"
    );
}
1489
1490 struct AlwaysDeny;
1493 impl crate::registry::RateLimiter for AlwaysDeny {
1494 fn check_publish(&self, agent_id: &AgentDid) -> Result<(), AcdpError> {
1495 Err(AcdpError::RateLimited(format!("blocked: {agent_id}")))
1496 }
1497 }
1498
/// With a rate limiter that denies everything, publishing fails with
/// `RateLimited` and a subsequent search finds nothing stored.
#[test]
fn rate_limiter_blocks_publish_before_persist() {
    let registry = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com")
        .with_rate_limiter(AlwaysDeny);
    let signer = producer();
    let request = signer
        .publish_request()
        .title("blocked")
        .context_type(ContextType::DataSnapshot)
        .visibility(Visibility::Public)
        .build()
        .unwrap();

    let rejection = registry
        .publish_unverified_for_tests(&request)
        .unwrap_err();
    assert!(matches!(rejection, AcdpError::RateLimited(_)));

    // The denied request must have left no trace in the store.
    let listing = registry.search(&SearchParams::default(), None).unwrap();
    assert!(
        listing.matches.is_empty(),
        "rate-limited publish must not persist"
    );
}
1520
/// The `created_at` returned by a publish carries no sub-millisecond
/// component.
#[test]
fn created_at_is_ms_truncated() {
    let registry = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
    let signer = producer();
    let request = signer
        .publish_request()
        .title("ms")
        .context_type(ContextType::DataSnapshot)
        .visibility(Visibility::Public)
        .build()
        .unwrap();
    let published = registry.publish_unverified_for_tests(&request).unwrap();

    // Whatever is left after removing whole milliseconds must be zero.
    let sub_ms_nanos = published.created_at.timestamp_subsec_nanos() % 1_000_000;
    assert_eq!(
        sub_ms_nanos, 0,
        "created_at must be millisecond-truncated per RFC-ACDP-0001 §5.3"
    );
}
1540
1541 fn did_key_request() -> acdp_types::publish::PublishRequest {
1544 let p = Producer::new_did_key(SigningKey::from_bytes(&[7u8; 32]));
1545 p.publish_request()
1546 .title("did:key publish")
1547 .context_type(ContextType::DataSnapshot)
1548 .visibility(Visibility::Public)
1549 .build()
1550 .unwrap()
1551 }
1552
/// On a server built from the unmodified `caps()`, the did:key publish path
/// fails with `KeyResolution` and a message naming `supported_did_methods`.
#[test]
fn did_key_publish_rejected_when_not_advertised() {
    let registry = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
    let request = did_key_request();
    let err = registry
        .publish_verified_did_key(&request, None)
        .unwrap_err();

    let names_missing_capability = match &err {
        AcdpError::KeyResolution(detail) => detail.contains("supported_did_methods"),
        _ => false,
    };
    assert!(names_missing_capability, "got {err:?}");
}
1567
/// Once `did:key` is listed in `supported_did_methods`, a did:key request
/// publishes successfully under the registry's authority, while the same
/// request with its title changed after building fails with `HashMismatch`.
#[test]
fn did_key_publish_verified_end_to_end() {
    let mut capabilities = caps();
    capabilities.supported_did_methods.push("did:key".into());
    let registry = RegistryServer::new(InMemoryStore::new(), capabilities, "registry.example.com");

    // Happy path: the untouched request is accepted.
    let genuine = did_key_request();
    let published = registry.publish_verified_did_key(&genuine, None).unwrap();
    assert_eq!(published.ctx_id.authority(), "registry.example.com");

    // Editing a field after the request was built must be detected.
    let mut tampered = did_key_request();
    tampered.title = "tampered".into();
    let outcome = registry.publish_verified_did_key(&tampered, None);
    let err = outcome.unwrap_err();
    assert!(matches!(err, AcdpError::HashMismatch { .. }), "got {err:?}");
}
1588
/// On a server configured with a receipt signer, replaying a request under an
/// idempotency key whose stored response has no receipt succeeds and returns
/// that stored response unchanged; a brand-new publish on the same server
/// does come back with a receipt.
#[test]
fn receiptless_idempotent_replay_survives_enabling_receipts() {
    const KEY: &str = "pre-receipts-key";

    let mut capabilities = caps();
    capabilities.acdp_version = "0.2.0".into();
    capabilities.supported_did_methods.push("did:key".into());
    capabilities.supports_idempotency_key = true;
    capabilities.limits.idempotency_key_ttl_seconds = Some(86_400);

    let receipt_signer = acdp_types::receipt::ReceiptSigner::new(
        SigningKey::from_bytes(&[0x11u8; 32]),
        "did:web:registry.example.com",
        "did:web:registry.example.com#receipt-key-1",
    )
    .unwrap();
    let registry = RegistryServer::new(InMemoryStore::new(), capabilities, "registry.example.com")
        .with_receipt_signer(receipt_signer)
        .unwrap();

    let request = did_key_request();

    // Seed the store directly with a response that carries no receipt.
    let stored_ctx_id = CtxId(format!(
        "acdp://registry.example.com/{}",
        uuid::Uuid::new_v4()
    ));
    let stored_lineage_id =
        acdp_crypto::derive_lineage_id(&CtxId("acdp://registry.example.com/v1".into()));
    let pre_receipts_response = acdp_types::publish::PublishResponse {
        ctx_id: stored_ctx_id,
        lineage_id: stored_lineage_id,
        version: 1,
        created_at: acdp_primitives::time::trunc_ms(chrono::Utc::now()),
        status: Status::Active,
        registry_receipt: None,
    };
    let expires_at = chrono::Utc::now() + chrono::Duration::hours(1);
    registry
        .store()
        .idempotency_record(
            &request.agent_id,
            KEY,
            &request.content_hash,
            &pre_receipts_response,
            expires_at,
        )
        .unwrap();

    // Replaying under the seeded key hands back the stored response as-is.
    let replayed = registry
        .publish_verified_did_key(&request, Some(KEY))
        .expect("replay of a pre-receipts record must succeed");
    assert_eq!(replayed.ctx_id, pre_receipts_response.ctx_id);
    assert!(
        replayed.registry_receipt.is_none(),
        "replay returns the original response verbatim"
    );

    // A different producer publishing without a key gets a fresh receipt.
    let second_producer = Producer::new_did_key(SigningKey::from_bytes(&[8u8; 32]));
    let fresh_request = second_producer
        .publish_request()
        .title("fresh after enabling receipts")
        .context_type(ContextType::DataSnapshot)
        .visibility(Visibility::Public)
        .build()
        .unwrap();
    let fresh_response = registry
        .publish_verified_did_key(&fresh_request, None)
        .unwrap();
    assert!(
        fresh_response.registry_receipt.is_some(),
        "new inserts on a receipts registry must mint"
    );
}
1665
/// A request built by the default `producer()` helper is refused by
/// `publish_verified_did_key` with `KeyResolution`.
#[test]
fn did_key_publish_path_refuses_did_web() {
    let registry = RegistryServer::new(InMemoryStore::new(), caps(), "registry.example.com");
    let signer = producer();
    let request = signer
        .publish_request()
        .title("did:web on the offline path")
        .context_type(ContextType::DataSnapshot)
        .visibility(Visibility::Public)
        .build()
        .unwrap();

    let outcome = registry.publish_verified_did_key(&request, None);
    let err = outcome.unwrap_err();
    let refused = matches!(err, AcdpError::KeyResolution(_));
    assert!(
        refused,
        "did:web on the offline path must be refused, got {err:?}"
    );
}
1685}