1use std::sync::Mutex;
15
16use acdp_primitives::error::AcdpError;
17use acdp_types::{
18 body::{Body, FullContext, RegistryState},
19 primitives::{AgentDid, CtxId, LineageId, Status, Visibility},
20 publish::{PublishRequest, PublishResponse},
21 search::{SearchParams, SearchResponse, SearchResult},
22};
23
24pub trait RegistryStore: Send + Sync {
30 fn put(&self, body: Body) -> Result<(), AcdpError>;
33
34 fn get(&self, ctx_id: &CtxId) -> Result<Option<FullContext>, AcdpError>;
36
37 fn lineage(&self, lineage_id: &LineageId) -> Result<Vec<FullContext>, AcdpError>;
39
40 fn current(&self, lineage_id: &LineageId) -> Result<Option<FullContext>, AcdpError>;
51
52 fn mark_superseded(&self, ctx_id: &CtxId) -> Result<(), AcdpError>;
54
55 fn first_version_ctx_id(&self, lineage_id: &LineageId) -> Result<Option<CtxId>, AcdpError>;
70
71 fn search(
88 &self,
89 params: &SearchParams,
90 requester: Option<&AgentDid>,
91 anonymous_public_reads: bool,
92 ) -> Result<SearchResponse, AcdpError>;
93
94 fn idempotency_lookup(
122 &self,
123 _agent_id: &AgentDid,
124 _key: &str,
125 ) -> Result<Option<IdempotencyRecord>, AcdpError> {
126 Ok(None)
127 }
128
129 fn idempotency_record(
133 &self,
134 _agent_id: &AgentDid,
135 _key: &str,
136 _hash: &acdp_types::primitives::ContentHash,
137 _response: &acdp_types::publish::PublishResponse,
138 _expires_at: chrono::DateTime<chrono::Utc>,
139 ) -> Result<(), AcdpError> {
140 Ok(())
141 }
142
143 fn idempotency_evict_expired(
146 &self,
147 _now: chrono::DateTime<chrono::Utc>,
148 ) -> Result<(), AcdpError> {
149 Ok(())
150 }
151
152 fn commit_publish(&self, commit: PublishCommit<'_>) -> Result<PublishCommitOutcome, AcdpError>;
175}
176
177pub struct PublishCommit<'a> {
186 pub req: &'a PublishRequest,
188 pub authority: &'a str,
191 pub idempotency: Option<PendingIdempotencyCommit<'a>>,
195 pub tenant: Option<&'a str>,
202 #[allow(clippy::type_complexity)]
212 pub receipt_minter:
213 Option<&'a (dyn Fn(&Body) -> Result<serde_json::Value, AcdpError> + Send + Sync)>,
214}
215
216pub struct PendingIdempotencyCommit<'a> {
218 pub key: &'a str,
220 pub ttl: chrono::Duration,
223}
224
225#[derive(Debug)]
227pub enum PublishCommitOutcome {
228 Inserted(PublishResponse),
231 IdempotentReplay(PublishResponse),
234}
235
236#[derive(Debug, Clone)]
239pub struct IdempotencyRecord {
240 pub content_hash: acdp_types::primitives::ContentHash,
243 pub response: acdp_types::publish::PublishResponse,
245 pub expires_at: chrono::DateTime<chrono::Utc>,
247}
248
249#[derive(Default)]
254pub struct InMemoryStore {
255 inner: Mutex<Inner>,
256}
257
258#[derive(Default)]
259struct Inner {
260 by_ctx: std::collections::BTreeMap<String, FullContext>,
263 lineages: std::collections::BTreeMap<String, Vec<String>>,
265 idempotency: std::collections::HashMap<(String, String), IdempotencyRecord>,
267}
268
269impl InMemoryStore {
270 pub fn new() -> Self {
272 Self::default()
273 }
274
275 fn lock(&self) -> std::sync::MutexGuard<'_, Inner> {
276 self.inner.lock().expect("InMemoryStore mutex poisoned")
277 }
278}
279
280pub(crate) fn project_status(
288 stored: &Status,
289 body: &Body,
290 now: chrono::DateTime<chrono::Utc>,
291) -> Status {
292 match stored {
293 Status::Active => match body.expires_at {
294 Some(exp) if exp <= now => Status::Expired,
295 _ => Status::Active,
296 },
297 other => other.clone(),
298 }
299}
300
301pub(crate) fn project_context(
304 mut ctx: FullContext,
305 now: chrono::DateTime<chrono::Utc>,
306) -> FullContext {
307 ctx.registry_state.status = project_status(&ctx.registry_state.status, &ctx.body, now);
308 ctx
309}
310
311fn can_surface_in_search(
324 body: &Body,
325 requester: Option<&AgentDid>,
326 anonymous_public_reads: bool,
327) -> bool {
328 match body.visibility {
329 Visibility::Public => anonymous_public_reads || requester.is_some(),
330 Visibility::Restricted => match requester {
331 None => false,
332 Some(r) => {
333 r == &body.agent_id
334 || body
335 .audience
336 .as_deref()
337 .is_some_and(|a| a.iter().any(|d| d == r))
338 }
339 },
340 Visibility::Private => requester == Some(&body.agent_id),
341 }
342}
343
344impl RegistryStore for InMemoryStore {
345 fn put(&self, body: Body) -> Result<(), AcdpError> {
346 let ctx_id = body.ctx_id.0.clone();
347 let lineage_id = body.lineage_id.0.clone();
348 let ctx = FullContext {
349 body,
350 registry_state: RegistryState {
351 status: Status::Active,
352 extensions: Default::default(),
353 },
354 registry_receipt: None,
355 extensions: Default::default(),
356 };
357 let mut g = self.lock();
358 if g.by_ctx.contains_key(&ctx_id) {
359 return Err(AcdpError::SchemaViolation(format!(
360 "duplicate ctx_id '{ctx_id}' in store"
361 )));
362 }
363 g.by_ctx.insert(ctx_id.clone(), ctx);
364 g.lineages.entry(lineage_id).or_default().push(ctx_id);
365 Ok(())
366 }
367
368 fn get(&self, ctx_id: &CtxId) -> Result<Option<FullContext>, AcdpError> {
369 let now = chrono::Utc::now();
370 Ok(self
371 .lock()
372 .by_ctx
373 .get(ctx_id.as_str())
374 .cloned()
375 .map(|c| project_context(c, now)))
376 }
377
378 fn lineage(&self, lineage_id: &LineageId) -> Result<Vec<FullContext>, AcdpError> {
379 let now = chrono::Utc::now();
380 let g = self.lock();
381 let Some(ids) = g.lineages.get(lineage_id.as_str()) else {
382 return Ok(Vec::new());
383 };
384 Ok(ids
385 .iter()
386 .filter_map(|id| g.by_ctx.get(id).cloned().map(|c| project_context(c, now)))
387 .collect())
388 }
389
390 fn current(&self, lineage_id: &LineageId) -> Result<Option<FullContext>, AcdpError> {
391 let now = chrono::Utc::now();
392 let g = self.lock();
393 let Some(ids) = g.lineages.get(lineage_id.as_str()) else {
394 return Ok(None);
395 };
396 for id in ids.iter().rev() {
408 if let Some(ctx) = g.by_ctx.get(id) {
409 let projected = project_context(ctx.clone(), now);
410 if !matches!(projected.registry_state.status, Status::Superseded) {
411 return Ok(Some(projected));
412 }
413 }
414 }
415 Ok(None)
416 }
417
418 fn mark_superseded(&self, ctx_id: &CtxId) -> Result<(), AcdpError> {
419 let mut g = self.lock();
420 if let Some(ctx) = g.by_ctx.get_mut(ctx_id.as_str()) {
421 ctx.registry_state.status = Status::Superseded;
422 }
423 Ok(())
424 }
425
426 fn first_version_ctx_id(&self, lineage_id: &LineageId) -> Result<Option<CtxId>, AcdpError> {
427 let g = self.lock();
428 Ok(g.lineages
429 .get(lineage_id.as_str())
430 .and_then(|ids| ids.first().cloned())
431 .map(CtxId))
432 }
433
434 fn idempotency_lookup(
435 &self,
436 agent_id: &AgentDid,
437 key: &str,
438 ) -> Result<Option<IdempotencyRecord>, AcdpError> {
439 self.idempotency_evict_expired(chrono::Utc::now())?;
442 let g = self.lock();
443 Ok(g.idempotency
444 .get(&(agent_id.as_str().to_string(), key.to_string()))
445 .cloned())
446 }
447
448 fn idempotency_record(
449 &self,
450 agent_id: &AgentDid,
451 key: &str,
452 hash: &acdp_types::primitives::ContentHash,
453 response: &acdp_types::publish::PublishResponse,
454 expires_at: chrono::DateTime<chrono::Utc>,
455 ) -> Result<(), AcdpError> {
456 let mut g = self.lock();
457 g.idempotency.insert(
458 (agent_id.as_str().to_string(), key.to_string()),
459 IdempotencyRecord {
460 content_hash: hash.clone(),
461 response: response.clone(),
462 expires_at,
463 },
464 );
465 Ok(())
466 }
467
468 fn idempotency_evict_expired(
469 &self,
470 now: chrono::DateTime<chrono::Utc>,
471 ) -> Result<(), AcdpError> {
472 let mut g = self.lock();
473 g.idempotency.retain(|_, r| r.expires_at > now);
474 Ok(())
475 }
476
477 fn commit_publish(&self, commit: PublishCommit<'_>) -> Result<PublishCommitOutcome, AcdpError> {
478 use crate::registry::validator::assign_identifiers;
479
480 let PublishCommit {
481 req,
482 authority,
483 idempotency,
484 tenant: _,
487 receipt_minter,
488 } = commit;
489 let now = chrono::Utc::now();
490 let mut g = self.lock();
491
492 if let Some(idem) = &idempotency {
494 let idem_key = (req.agent_id.as_str().to_string(), idem.key.to_string());
495 if let Some(prior) = g.idempotency.get(&idem_key) {
496 if prior.expires_at > now {
497 return if prior.content_hash == req.content_hash {
498 Ok(PublishCommitOutcome::IdempotentReplay(
500 prior.response.clone(),
501 ))
502 } else {
503 Err(AcdpError::DuplicatePublish(format!(
505 "Idempotency-Key '{}' was previously used by '{}' \
506 with a different content_hash",
507 idem.key, req.agent_id
508 )))
509 };
510 }
511 }
513 }
514
515 let first_v1 = if let Some(prev) = &req.supersedes {
517 let prev_full = g.by_ctx.get(prev.as_str()).cloned().ok_or_else(|| {
518 AcdpError::SupersededTarget {
519 reason: acdp_primitives::error::SupersessionReason::NotFound,
520 message: format!("supersedes target '{prev}' not found in this registry"),
521 }
522 })?;
523
524 let is_owner = req.agent_id == prev_full.body.agent_id
534 || prev_full.body.contributors.contains(&req.agent_id);
535 if !is_owner {
536 return Err(AcdpError::SupersededTarget {
542 reason: acdp_primitives::error::SupersessionReason::NotFound,
543 message: format!("supersedes target '{prev}' not found in this registry"),
544 });
545 }
546
547 if let Some(declared) = &req.lineage_id {
549 if declared != &prev_full.body.lineage_id {
550 return Err(AcdpError::SupersededTarget {
551 reason: acdp_primitives::error::SupersessionReason::LineageMismatch,
552 message: format!(
553 "declared lineage_id '{declared}' ≠ predecessor's '{}'",
554 prev_full.body.lineage_id
555 ),
556 });
557 }
558 }
559 if req.version != prev_full.body.version + 1 {
561 return Err(AcdpError::SupersededTarget {
562 reason: acdp_primitives::error::SupersessionReason::VersionMismatch,
563 message: format!(
564 "version {} ≠ predecessor.version + 1 ({})",
565 req.version,
566 prev_full.body.version + 1
567 ),
568 });
569 }
570 if matches!(prev_full.registry_state.status, Status::Superseded) {
574 return Err(AcdpError::SupersededTarget {
575 reason: acdp_primitives::error::SupersessionReason::AlreadySuperseded,
576 message: format!("supersedes target '{prev}' has already been superseded"),
577 });
578 }
579
580 g.lineages
584 .get(prev_full.body.lineage_id.as_str())
585 .and_then(|ids| ids.first().cloned())
586 .map(CtxId)
587 } else {
588 None
589 };
590
591 let validated = crate::registry::validator::ValidatedPublish {
593 recomputed_hash: req.content_hash.clone(),
594 };
595 let (ctx_id, lineage_id) =
596 assign_identifiers(authority, &req.supersedes, first_v1.as_ref(), &validated)?;
597
598 let created_at = acdp_primitives::time::trunc_ms(now);
600 let body = Body {
601 ctx_id: ctx_id.clone(),
602 lineage_id: lineage_id.clone(),
603 origin_registry: authority.to_string(),
604 created_at,
605 content_hash: req.content_hash.clone(),
606 signature: req.signature.clone(),
607 version: req.version,
608 supersedes: req.supersedes.clone(),
609 agent_id: req.agent_id.clone(),
610 contributors: req.contributors.clone(),
611 title: req.title.clone(),
612 context_type: req.context_type.clone(),
613 data_refs: req.data_refs.clone(),
614 derived_from: req.derived_from.clone(),
615 visibility: req.visibility.clone(),
616 audience: req.audience.clone(),
617 acdp_version: req.acdp_version.clone(),
618 description: req.description.clone(),
619 summary: req.summary.clone(),
620 tags: req.tags.clone(),
621 domain: req.domain.clone(),
622 expires_at: req.expires_at,
623 data_period: req.data_period.clone(),
624 metadata: req.metadata.clone(),
625 schema_uri: req.schema_uri.clone(),
626 extensions: Default::default(),
627 };
628
629 let ctx_id_str = body.ctx_id.0.clone();
631 let lineage_id_str = body.lineage_id.0.clone();
632 if g.by_ctx.contains_key(&ctx_id_str) {
633 return Err(AcdpError::SchemaViolation(format!(
637 "ctx_id collision: '{ctx_id_str}' already exists"
638 )));
639 }
640 let registry_receipt = receipt_minter.map(|mint| mint(&body)).transpose()?;
644
645 let stored = FullContext {
646 body,
647 registry_state: RegistryState {
648 status: Status::Active,
649 extensions: Default::default(),
650 },
651 registry_receipt: registry_receipt.clone(),
652 extensions: Default::default(),
653 };
654 g.by_ctx.insert(ctx_id_str.clone(), stored);
655 g.lineages
656 .entry(lineage_id_str)
657 .or_default()
658 .push(ctx_id_str);
659
660 if let Some(prev) = &req.supersedes {
662 if let Some(prev_ctx) = g.by_ctx.get_mut(prev.as_str()) {
663 prev_ctx.registry_state.status = Status::Superseded;
664 }
665 }
666
667 let response = PublishResponse {
668 ctx_id,
669 lineage_id,
670 version: req.version,
671 created_at,
672 status: Status::Active,
673 registry_receipt,
674 };
675
676 if let Some(idem) = idempotency {
678 let expires_at = now + idem.ttl;
679 g.idempotency.insert(
680 (req.agent_id.as_str().to_string(), idem.key.to_string()),
681 IdempotencyRecord {
682 content_hash: req.content_hash.clone(),
683 response: response.clone(),
684 expires_at,
685 },
686 );
687 }
688
689 Ok(PublishCommitOutcome::Inserted(response))
690 }
691
692 fn search(
693 &self,
694 params: &SearchParams,
695 requester: Option<&AgentDid>,
696 anonymous_public_reads: bool,
697 ) -> Result<SearchResponse, AcdpError> {
698 let g = self.lock();
699 let now = chrono::Utc::now();
700
701 let q_lower = params.q.as_deref().map(str::to_lowercase);
702 let domain = params.domain.as_deref();
703 let agent = params.agent_id.as_deref();
704 let context_type = params.context_type.as_deref();
705 let derived_from = params.derived_from.as_deref();
706 let schema_uri = params.schema_uri.as_deref();
707 let tags: Option<Vec<&str>> = params.tags.as_deref().map(|s| {
708 s.split(',')
709 .map(str::trim)
710 .filter(|t| !t.is_empty())
711 .collect()
712 });
713
714 let created_after = parse_opt_rfc3339(¶ms.created_after)?;
717 let created_before = parse_opt_rfc3339(¶ms.created_before)?;
718 let dp_start_after = parse_opt_rfc3339(¶ms.data_period_start_after)?;
719 let dp_end_before = parse_opt_rfc3339(¶ms.data_period_end_before)?;
720 let expires_after = parse_opt_rfc3339(¶ms.expires_after)?;
721 let expires_before = parse_opt_rfc3339(¶ms.expires_before)?;
722
723 let mut matches: Vec<&FullContext> = g
724 .by_ctx
725 .values()
726 .filter(|ctx| {
727 let body = &ctx.body;
728
729 if !can_surface_in_search(body, requester, anonymous_public_reads) {
733 return false;
734 }
735
736 if let Some(q) = &q_lower {
737 let haystack = format!(
738 "{} {} {} {} {} {}",
739 body.title,
740 body.description.as_deref().unwrap_or(""),
741 body.summary.as_deref().unwrap_or(""),
742 body.domain.as_deref().unwrap_or(""),
743 body.agent_id.as_str(),
744 body.tags.as_ref().map(|t| t.join(" ")).unwrap_or_default(),
745 )
746 .to_lowercase();
747 if !haystack.contains(q) {
748 return false;
749 }
750 }
751 if let Some(d) = domain {
752 if body.domain.as_deref() != Some(d) {
753 return false;
754 }
755 }
756 if let Some(a) = agent {
757 if body.agent_id.as_str() != a {
758 return false;
759 }
760 }
761 if let Some(t) = context_type {
762 let body_type = serde_json::to_value(&body.context_type)
763 .ok()
764 .and_then(|v| v.as_str().map(str::to_string))
765 .unwrap_or_default();
766 if body_type != t {
767 return false;
768 }
769 }
770 if let Some(df) = derived_from {
771 if !body.derived_from.iter().any(|c| c.as_str() == df) {
772 return false;
773 }
774 }
775 if let Some(req_tags) = &tags {
776 let body_tags = body.tags.as_deref().unwrap_or(&[]);
777 if !req_tags.iter().all(|t| body_tags.iter().any(|bt| bt == t)) {
778 return false;
779 }
780 }
781 if let Some(uri) = schema_uri {
782 if body.schema_uri.as_deref() != Some(uri) {
783 return false;
784 }
785 }
786 if let Some(after) = created_after {
787 if body.created_at < after {
788 return false;
789 }
790 }
791 if let Some(before) = created_before {
792 if body.created_at > before {
793 return false;
794 }
795 }
796 if let Some(after) = dp_start_after {
797 match &body.data_period {
798 Some(p) if p.start >= after => {}
799 _ => return false,
800 }
801 }
802 if let Some(before) = dp_end_before {
803 match &body.data_period {
804 Some(p) if p.end <= before => {}
805 _ => return false,
806 }
807 }
808 if let Some(after) = expires_after {
809 match body.expires_at {
810 Some(e) if e >= after => {}
811 _ => return false,
812 }
813 }
814 if let Some(before) = expires_before {
815 match body.expires_at {
816 Some(e) if e <= before => {}
817 _ => return false,
818 }
819 }
820 let want_status = params.status.as_deref().unwrap_or("active");
824 let effective = project_status(&ctx.registry_state.status, body, now);
825 if effective.as_str() != want_status {
826 return false;
827 }
828 true
829 })
830 .collect();
831
832 matches.sort_by(|a, b| {
835 b.body
836 .created_at
837 .cmp(&a.body.created_at)
838 .then_with(|| a.body.ctx_id.as_str().cmp(b.body.ctx_id.as_str()))
839 });
840
841 let total_estimate = Some(matches.len() as u64);
847
848 let cursor_anchor = params
852 .cursor
853 .as_deref()
854 .map(decode_cursor)
855 .transpose()?
856 .flatten();
857 if let Some((anchor_ms, anchor_id)) = &cursor_anchor {
858 matches.retain(|c| {
859 let ms = c.body.created_at.timestamp_millis();
860 ms < *anchor_ms || (ms == *anchor_ms && c.body.ctx_id.as_str() > anchor_id.as_str())
861 });
862 }
863
864 let limit = params.limit.unwrap_or(50).clamp(1, 100) as usize;
869 let next_cursor = if matches.len() > limit {
870 matches.get(limit - 1).map(|c| {
871 encode_cursor(c.body.created_at.timestamp_millis(), c.body.ctx_id.as_str())
872 })
873 } else {
874 None
875 };
876
877 let projected: Vec<SearchResult> = matches
878 .iter()
879 .take(limit)
880 .map(|ctx| SearchResult {
881 ctx_id: ctx.body.ctx_id.clone(),
882 lineage_id: ctx.body.lineage_id.clone(),
883 agent_id: ctx.body.agent_id.clone(),
884 title: ctx.body.title.clone(),
885 summary: ctx.body.summary.clone(),
886 context_type: ctx.body.context_type.clone(),
887 domain: ctx.body.domain.clone(),
888 created_at: ctx.body.created_at,
889 status: project_status(&ctx.registry_state.status, &ctx.body, now),
890 visibility: Some(ctx.body.visibility.clone()),
896 })
897 .collect();
898
899 Ok(SearchResponse {
900 matches: projected,
901 total_estimate,
902 next_cursor,
903 })
904 }
905}
906
907fn parse_opt_rfc3339(
910 s: &Option<String>,
911) -> Result<Option<chrono::DateTime<chrono::Utc>>, AcdpError> {
912 let Some(raw) = s.as_deref() else {
913 return Ok(None);
914 };
915 let dt = chrono::DateTime::parse_from_rfc3339(raw)
916 .map_err(|e| AcdpError::SchemaViolation(format!("malformed datetime '{raw}': {e}")))?;
917 Ok(Some(dt.with_timezone(&chrono::Utc)))
918}
919
920const CURSOR_TTL: chrono::Duration = chrono::Duration::seconds(3600);
924
925fn encode_cursor(created_at_ms: i64, ctx_id: &str) -> String {
933 use base64::{engine::general_purpose::STANDARD, Engine};
934 let mint_ms = chrono::Utc::now().timestamp_millis();
935 STANDARD.encode(format!("{mint_ms}:{created_at_ms}:{ctx_id}"))
936}
937
938fn decode_cursor(s: &str) -> Result<Option<(i64, String)>, AcdpError> {
939 use base64::{engine::general_purpose::STANDARD, Engine};
940 let bytes = STANDARD
941 .decode(s)
942 .map_err(|_| AcdpError::InvalidCursor("cursor is not valid base64".into()))?;
943 let decoded = String::from_utf8(bytes)
944 .map_err(|_| AcdpError::InvalidCursor("cursor is not utf-8".into()))?;
945 let mut parts = decoded.splitn(3, ':');
948 let mint_str = parts
949 .next()
950 .ok_or_else(|| AcdpError::InvalidCursor("cursor missing mint timestamp".into()))?;
951 let anchor_str = parts
952 .next()
953 .ok_or_else(|| AcdpError::InvalidCursor("cursor missing anchor timestamp".into()))?;
954 let ctx_id = parts
955 .next()
956 .ok_or_else(|| AcdpError::InvalidCursor("cursor missing ctx_id".into()))?;
957 let mint_ms: i64 = mint_str
958 .parse()
959 .map_err(|_| AcdpError::InvalidCursor("cursor mint millis is not an integer".into()))?;
960 let anchor_ms: i64 = anchor_str
961 .parse()
962 .map_err(|_| AcdpError::InvalidCursor("cursor anchor millis is not an integer".into()))?;
963
964 let now = chrono::Utc::now().timestamp_millis();
969 let age_ms = now.saturating_sub(mint_ms);
970 if age_ms > CURSOR_TTL.num_milliseconds() {
971 return Err(AcdpError::CursorExpired);
976 }
977 Ok(Some((anchor_ms, ctx_id.to_string())))
978}
979
980#[cfg(test)]
981mod tests {
982 use super::*;
983 use acdp_crypto::SigningKey;
984 use acdp_producer::Producer;
985 use acdp_types::body::{DataPeriod, Signature};
986 use acdp_types::primitives::{AgentDid, ContentHash, ContextType, Visibility};
987 use chrono::Utc;
988
989 fn fake_body(ctx_id: &str, lineage_id: &str, title: &str) -> Body {
990 Body {
991 ctx_id: CtxId(ctx_id.into()),
992 lineage_id: LineageId(lineage_id.into()),
993 origin_registry: "registry.example.com".into(),
994 created_at: Utc::now(),
995 content_hash: ContentHash("sha256:0".into()),
996 signature: Signature {
997 algorithm: "ed25519".into(),
998 key_id: "did:web:agents.example.com:test#key-1".into(),
999 value: "A".repeat(88),
1000 },
1001 version: 1,
1002 supersedes: None,
1003 agent_id: AgentDid::new("did:web:agents.example.com:test"),
1004 contributors: vec![],
1005 title: title.into(),
1006 context_type: ContextType::DataSnapshot,
1007 data_refs: vec![],
1008 derived_from: vec![],
1009 visibility: Visibility::Public,
1010 audience: None,
1011 acdp_version: None,
1012 description: None,
1013 summary: None,
1014 tags: None,
1015 domain: None,
1016 expires_at: None,
1017 data_period: None,
1018 metadata: None,
1019 schema_uri: None,
1020 extensions: Default::default(),
1021 }
1022 }
1023
1024 #[test]
1025 fn put_get_round_trip() {
1026 let s = InMemoryStore::new();
1027 let id = "acdp://r/12345678-1234-4321-8123-123456781234";
1028 let lin = "lin:sha256:1111111111111111111111111111111111111111111111111111111111111111";
1029 s.put(fake_body(id, lin, "A")).unwrap();
1030 let got = s.get(&CtxId(id.into())).unwrap().unwrap();
1031 assert_eq!(got.body.title, "A");
1032 assert!(matches!(got.registry_state.status, Status::Active));
1033 }
1034
1035 #[test]
1036 fn lineage_orders_by_publish_order() {
1037 let s = InMemoryStore::new();
1038 let lin = "lin:sha256:2222222222222222222222222222222222222222222222222222222222222222";
1039 let v1 = "acdp://r/12345678-1234-4321-8123-000000000001";
1040 let v2 = "acdp://r/12345678-1234-4321-8123-000000000002";
1041 s.put(fake_body(v1, lin, "v1")).unwrap();
1042 s.put(fake_body(v2, lin, "v2")).unwrap();
1043 let lineage = s.lineage(&LineageId(lin.into())).unwrap();
1044 assert_eq!(lineage.len(), 2);
1045 assert_eq!(lineage[0].body.title, "v1");
1046 assert_eq!(lineage[1].body.title, "v2");
1047 }
1048
1049 #[test]
1050 fn supersession_marks_predecessor() {
1051 let s = InMemoryStore::new();
1052 let lin = "lin:sha256:3333333333333333333333333333333333333333333333333333333333333333";
1053 let v1 = "acdp://r/12345678-1234-4321-8123-000000000003";
1054 s.put(fake_body(v1, lin, "v1")).unwrap();
1055 s.mark_superseded(&CtxId(v1.into())).unwrap();
1056 let got = s.get(&CtxId(v1.into())).unwrap().unwrap();
1057 assert!(matches!(got.registry_state.status, Status::Superseded));
1058 }
1059
1060 fn expired_body(
1063 ctx_id: &str,
1064 lineage_id: &str,
1065 title: &str,
1066 expires_at: chrono::DateTime<chrono::Utc>,
1067 ) -> Body {
1068 let mut b = fake_body(ctx_id, lineage_id, title);
1069 b.expires_at = Some(expires_at);
1070 b
1071 }
1072
1073 #[test]
1074 fn get_projects_active_to_expired_when_past_expires_at() {
1075 use chrono::Duration;
1076 let s = InMemoryStore::new();
1077 let lin = "lin:sha256:5555555555555555555555555555555555555555555555555555555555555555";
1078 let id = "acdp://r/12345678-1234-4321-8123-000000000006";
1079 s.put(expired_body(
1080 id,
1081 lin,
1082 "old",
1083 chrono::Utc::now() - Duration::hours(1),
1084 ))
1085 .unwrap();
1086 let got = s.get(&CtxId(id.into())).unwrap().unwrap();
1087 assert!(
1088 matches!(got.registry_state.status, Status::Expired),
1089 "expected Status::Expired projection, got {:?}",
1090 got.registry_state.status
1091 );
1092 }
1093
1094 #[test]
1095 fn get_keeps_active_when_expires_at_in_future() {
1096 use chrono::Duration;
1097 let s = InMemoryStore::new();
1098 let lin = "lin:sha256:6666666666666666666666666666666666666666666666666666666666666666";
1099 let id = "acdp://r/12345678-1234-4321-8123-000000000007";
1100 s.put(expired_body(
1101 id,
1102 lin,
1103 "fresh",
1104 chrono::Utc::now() + Duration::hours(1),
1105 ))
1106 .unwrap();
1107 let got = s.get(&CtxId(id.into())).unwrap().unwrap();
1108 assert!(matches!(got.registry_state.status, Status::Active));
1109 }
1110
1111 #[test]
1112 fn search_status_active_filters_out_expired() {
1113 use chrono::Duration;
1114 let s = InMemoryStore::new();
1115 let lin = "lin:sha256:7777777777777777777777777777777777777777777777777777777777777777";
1116 let id = "acdp://r/12345678-1234-4321-8123-000000000008";
1117 s.put(expired_body(
1118 id,
1119 lin,
1120 "old",
1121 chrono::Utc::now() - Duration::hours(1),
1122 ))
1123 .unwrap();
1124 let resp = s.search(&SearchParams::default(), None, true).unwrap();
1125 assert!(
1126 resp.matches.is_empty(),
1127 "expired must not surface under status=active default"
1128 );
1129 let resp = s
1131 .search(
1132 &SearchParams {
1133 status: Some("expired".into()),
1134 ..Default::default()
1135 },
1136 None,
1137 true,
1138 )
1139 .unwrap();
1140 assert_eq!(resp.matches.len(), 1);
1141 }
1142
1143 #[test]
1145 fn search_filters_by_created_after() {
1146 let s = InMemoryStore::new();
1147 let lin = "lin:sha256:8888888888888888888888888888888888888888888888888888888888888888";
1148 let mut body = fake_body(
1149 "acdp://r/12345678-1234-4321-8123-000000000009",
1150 lin,
1151 "match",
1152 );
1153 body.created_at = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00.000Z")
1154 .unwrap()
1155 .with_timezone(&chrono::Utc);
1156 s.put(body).unwrap();
1157 let resp = s
1159 .search(
1160 &SearchParams {
1161 created_after: Some("2026-02-01T00:00:00.000Z".into()),
1162 ..Default::default()
1163 },
1164 None,
1165 true,
1166 )
1167 .unwrap();
1168 assert_eq!(resp.matches.len(), 0);
1169 let resp = s
1171 .search(
1172 &SearchParams {
1173 created_after: Some("2025-12-01T00:00:00.000Z".into()),
1174 ..Default::default()
1175 },
1176 None,
1177 true,
1178 )
1179 .unwrap();
1180 assert_eq!(resp.matches.len(), 1);
1181 }
1182
1183 #[test]
1184 fn search_invalid_rfc3339_filter_rejected() {
1185 let s = InMemoryStore::new();
1186 let err = s
1187 .search(
1188 &SearchParams {
1189 created_after: Some("not-a-date".into()),
1190 ..Default::default()
1191 },
1192 None,
1193 true,
1194 )
1195 .unwrap_err();
1196 assert!(matches!(err, AcdpError::SchemaViolation(_)));
1197 }
1198
1199 #[test]
1201 fn search_cursor_pages_results() {
1202 let s = InMemoryStore::new();
1203 let lin = "lin:sha256:9999999999999999999999999999999999999999999999999999999999999999";
1204 let base = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00.000Z")
1206 .unwrap()
1207 .with_timezone(&chrono::Utc);
1208 for i in 0..5u8 {
1209 let mut body = fake_body(
1210 &format!("acdp://r/12345678-1234-4321-8123-00000000010{i}"),
1211 lin,
1212 "match",
1213 );
1214 body.created_at = base + chrono::Duration::minutes(i as i64);
1215 s.put(body).unwrap();
1216 }
1217 let p1 = s
1218 .search(
1219 &SearchParams {
1220 limit: Some(2),
1221 ..Default::default()
1222 },
1223 None,
1224 true,
1225 )
1226 .unwrap();
1227 assert_eq!(p1.matches.len(), 2);
1228 let cursor = p1.next_cursor.expect("page 1 should carry a cursor");
1229 let p2 = s
1230 .search(
1231 &SearchParams {
1232 limit: Some(2),
1233 cursor: Some(cursor.clone()),
1234 ..Default::default()
1235 },
1236 None,
1237 true,
1238 )
1239 .unwrap();
1240 assert_eq!(p2.matches.len(), 2);
1241 for r in &p2.matches {
1243 assert!(
1244 !p1.matches.iter().any(|q| q.ctx_id == r.ctx_id),
1245 "page 2 overlapped page 1"
1246 );
1247 }
1248 assert_eq!(
1252 p1.total_estimate, p2.total_estimate,
1253 "total_estimate MUST be stable across pages (BUG-08); \
1254 p1={:?}, p2={:?}",
1255 p1.total_estimate, p2.total_estimate
1256 );
1257 assert_eq!(
1258 p1.total_estimate,
1259 Some(5),
1260 "total_estimate MUST reflect total matches across all pages, got {:?}",
1261 p1.total_estimate
1262 );
1263 }
1264
1265 #[test]
1266 fn search_limit_zero_does_not_underflow() {
1267 let s = InMemoryStore::new();
1270 let lin = "lin:sha256:8888888888888888888888888888888888888888888888888888888888888888";
1271 for i in 0..3u8 {
1272 let body = fake_body(
1273 &format!("acdp://r/12345678-1234-4321-8123-00000000020{i}"),
1274 lin,
1275 "match",
1276 );
1277 s.put(body).unwrap();
1278 }
1279 let page = s
1280 .search(
1281 &SearchParams {
1282 limit: Some(0),
1283 ..Default::default()
1284 },
1285 None,
1286 true,
1287 )
1288 .expect("limit=0 must not panic or error");
1289 assert_eq!(page.matches.len(), 1);
1291 assert!(page.next_cursor.is_some());
1292 }
1293
1294 #[test]
1295 fn search_malformed_cursor_rejected() {
1296 let s = InMemoryStore::new();
1297 let err = s
1298 .search(
1299 &SearchParams {
1300 cursor: Some("not_base64!@#".into()),
1301 ..Default::default()
1302 },
1303 None,
1304 true,
1305 )
1306 .unwrap_err();
1307 assert!(matches!(err, AcdpError::InvalidCursor(_)));
1308 }
1309
1310 #[test]
1315 fn search_aged_cursor_rejected_as_cursor_expired() {
1316 use base64::{engine::general_purpose::STANDARD, Engine};
1317 let s = InMemoryStore::new();
1318 let stale_mint_ms = chrono::Utc::now().timestamp_millis() - 7200 * 1000;
1320 let aged = STANDARD.encode(format!(
1321 "{stale_mint_ms}:0:acdp://r/12345678-1234-4321-8123-1234567812aa"
1322 ));
1323 let err = s
1324 .search(
1325 &SearchParams {
1326 cursor: Some(aged),
1327 ..Default::default()
1328 },
1329 None,
1330 true,
1331 )
1332 .unwrap_err();
1333 assert!(
1334 matches!(err, AcdpError::CursorExpired),
1335 "expired cursor MUST surface CursorExpired, got {err:?}"
1336 );
1337 }
1338
1339 #[test]
1340 fn search_filters_by_status_default_active() {
1341 let s = InMemoryStore::new();
1342 let lin = "lin:sha256:4444444444444444444444444444444444444444444444444444444444444444";
1343 let v1 = "acdp://r/12345678-1234-4321-8123-000000000004";
1344 let v2 = "acdp://r/12345678-1234-4321-8123-000000000005";
1345 s.put(fake_body(v1, lin, "old")).unwrap();
1346 s.put(fake_body(v2, lin, "new")).unwrap();
1347 s.mark_superseded(&CtxId(v1.into())).unwrap();
1348 let resp = s
1349 .search(
1350 &SearchParams {
1351 q: Some("old".into()),
1352 ..Default::default()
1353 },
1354 None,
1355 true,
1356 )
1357 .unwrap();
1358 assert_eq!(resp.matches.len(), 0);
1360 let resp = s
1361 .search(
1362 &SearchParams {
1363 q: Some("new".into()),
1364 ..Default::default()
1365 },
1366 None,
1367 true,
1368 )
1369 .unwrap();
1370 assert_eq!(resp.matches.len(), 1);
1371 }
1372
1373 #[test]
1377 fn store_round_trip_from_real_publish_request() {
1378 use crate::registry::server::RegistryServer;
1379 use acdp_types::capabilities::{CapabilitiesDocument, Limits};
1380
1381 let key = SigningKey::from_bytes(&[7u8; 32]);
1382 let p = Producer::new(
1383 key,
1384 AgentDid::new("did:web:agents.example.com:test"),
1385 "did:web:agents.example.com:test#key-1",
1386 );
1387 let req = p
1388 .publish_request()
1389 .title("hello")
1390 .context_type(ContextType::DataSnapshot)
1391 .visibility(Visibility::Public)
1392 .build()
1393 .unwrap();
1394
1395 let caps = CapabilitiesDocument {
1396 acdp_version: "0.1.0".into(),
1397 registry_did: "did:web:registry.example.com".into(),
1398 supported_signature_algorithms: vec!["ed25519".into()],
1399 supported_did_methods: vec!["did:web".into()],
1400 profiles: vec!["acdp-registry-core".into()],
1401 limits: Limits {
1402 max_payload_bytes: 1_048_576,
1403 max_embedded_bytes: 65_536,
1404 idempotency_key_ttl_seconds: None,
1405 },
1406 read_authentication_methods: vec![],
1407 anonymous_public_reads: true,
1408 supports_idempotency_key: false,
1409 extensions: Default::default(),
1410 };
1411
1412 let server = RegistryServer::new(InMemoryStore::new(), caps, "registry.example.com");
1413 let resp = server.publish_unverified_for_tests(&req).unwrap();
1414 assert_eq!(resp.version, 1);
1415 let ctx = server.retrieve(&resp.ctx_id, None).unwrap().unwrap();
1416 assert_eq!(ctx.body.title, "hello");
1417
1418 let _: Option<DataPeriod> = ctx.body.data_period.clone();
1420 }
1421}