use std::sync::Mutex;
use crate::error::AcdpError;
use crate::types::{
body::{Body, FullContext, RegistryState},
primitives::{AgentDid, CtxId, LineageId, Status, Visibility},
publish::{PublishRequest, PublishResponse},
search::{SearchParams, SearchResponse, SearchResult},
};
/// Storage backend used by the registry: context bodies, lineage ordering,
/// search, and (optionally) idempotency bookkeeping.
///
/// Every method takes `&self` and the trait requires `Send + Sync`, so an
/// implementation has to supply its own interior synchronization.
pub trait RegistryStore: Send + Sync {
    /// Stores `body` as a new context.
    ///
    /// The in-memory implementation in this file records it with
    /// `Status::Active` and rejects a `ctx_id` that is already present.
    fn put(&self, body: Body) -> Result<(), AcdpError>;

    /// Looks up one context by id; `Ok(None)` when it is not stored.
    fn get(&self, ctx_id: &CtxId) -> Result<Option<FullContext>, AcdpError>;

    /// Returns the contexts recorded under `lineage_id`.
    ///
    /// The in-memory implementation returns them in insertion order and
    /// yields an empty vector for an unknown lineage.
    fn lineage(&self, lineage_id: &LineageId) -> Result<Vec<FullContext>, AcdpError>;

    /// Returns the current head of a lineage, if any.
    ///
    /// The in-memory implementation picks the most recently inserted entry
    /// whose projected status is not `Status::Superseded`.
    fn current(&self, lineage_id: &LineageId) -> Result<Option<FullContext>, AcdpError>;

    /// Flags the context identified by `ctx_id` as superseded.
    fn mark_superseded(&self, ctx_id: &CtxId) -> Result<(), AcdpError>;

    /// Returns the id of the first context recorded for `lineage_id`.
    fn first_version_ctx_id(&self, lineage_id: &LineageId) -> Result<Option<CtxId>, AcdpError>;

    /// Runs a filtered search.
    ///
    /// `requester` is the caller's identity (if any) and
    /// `anonymous_public_reads` states whether unauthenticated callers may see
    /// public contexts; both feed the visibility decision.
    fn search(
        &self,
        params: &SearchParams,
        requester: Option<&AgentDid>,
        anonymous_public_reads: bool,
    ) -> Result<SearchResponse, AcdpError>;

    /// Looks up a remembered idempotency record for `(agent, key)`.
    ///
    /// Default: the store keeps no idempotency state and returns `Ok(None)`.
    fn idempotency_lookup(
        &self,
        _agent_id: &AgentDid,
        _key: &str,
    ) -> Result<Option<IdempotencyRecord>, AcdpError> {
        Ok(None)
    }

    /// Remembers the response produced for `(agent, key)` until `_expires_at`.
    ///
    /// Default: does nothing and returns `Ok(())`.
    fn idempotency_record(
        &self,
        _agent_id: &AgentDid,
        _key: &str,
        _hash: &crate::types::primitives::ContentHash,
        _response: &PublishResponse,
        _expires_at: chrono::DateTime<chrono::Utc>,
    ) -> Result<(), AcdpError> {
        Ok(())
    }

    /// Drops idempotency records that have expired as of `_now`.
    ///
    /// Default: does nothing and returns `Ok(())`.
    fn idempotency_evict_expired(
        &self,
        _now: chrono::DateTime<chrono::Utc>,
    ) -> Result<(), AcdpError> {
        Ok(())
    }

    /// Commits a publish request and reports whether a new context was
    /// inserted or an earlier response was replayed.
    fn commit_publish(&self, commit: PublishCommit<'_>) -> Result<PublishCommitOutcome, AcdpError>;
}
/// Borrowed inputs for [`RegistryStore::commit_publish`].
pub struct PublishCommit<'a> {
    /// The publish request being committed.
    pub req: &'a PublishRequest,
    /// Registry authority; the in-memory store copies it into the stored
    /// body's `origin_registry` and passes it to identifier assignment.
    pub authority: &'a str,
    /// Idempotency key and TTL, when the caller supplied one.
    pub idempotency: Option<PendingIdempotencyCommit<'a>>,
    /// Optional tenant; the in-memory store in this file ignores it.
    pub tenant: Option<&'a str>,
}
/// Idempotency parameters attached to a pending publish commit.
pub struct PendingIdempotencyCommit<'a> {
    /// Caller-supplied idempotency key.
    pub key: &'a str,
    /// How long the resulting record stays valid, measured from commit time.
    pub ttl: chrono::Duration,
}
/// Result of [`RegistryStore::commit_publish`].
#[derive(Debug)]
pub enum PublishCommitOutcome {
    /// A new context was stored; carries the freshly built response.
    Inserted(PublishResponse),
    /// The idempotency key matched an earlier publish with the same content
    /// hash; carries the response recorded back then.
    IdempotentReplay(PublishResponse),
}
#[derive(Debug, Clone)]
pub struct IdempotencyRecord {
pub content_hash: crate::types::primitives::ContentHash,
pub response: crate::types::publish::PublishResponse,
pub expires_at: chrono::DateTime<chrono::Utc>,
}
/// [`RegistryStore`] implementation that keeps all state in process memory
/// behind a single mutex.
#[derive(Default)]
pub struct InMemoryStore {
    /// All mutable state; every operation locks this.
    inner: Mutex<Inner>,
}
/// Mutex-protected state of [`InMemoryStore`].
#[derive(Default)]
struct Inner {
    /// Stored contexts keyed by the `ctx_id` string.
    by_ctx: std::collections::BTreeMap<String, FullContext>,
    /// For each lineage id string, the member `ctx_id`s in insertion order.
    lineages: std::collections::BTreeMap<String, Vec<String>>,
    /// Idempotency records keyed by `(agent id, idempotency key)`.
    idempotency: std::collections::HashMap<(String, String), IdempotencyRecord>,
}
impl InMemoryStore {
pub fn new() -> Self {
Self::default()
}
fn lock(&self) -> std::sync::MutexGuard<'_, Inner> {
self.inner.lock().expect("InMemoryStore mutex poisoned")
}
}
/// Computes the status to report for a context at instant `now`.
///
/// A stored `Status::Active` becomes `Status::Expired` once the body's
/// `expires_at` is at or before `now`; any other stored status is returned
/// unchanged.
pub(crate) fn project_status(
    stored: &Status,
    body: &Body,
    now: chrono::DateTime<chrono::Utc>,
) -> Status {
    // Only an active record can be re-projected; everything else is final.
    if !matches!(stored, Status::Active) {
        return stored.clone();
    }
    let lapsed = body.expires_at.is_some_and(|deadline| deadline <= now);
    if lapsed {
        Status::Expired
    } else {
        Status::Active
    }
}
/// Returns `ctx` with its registry status replaced by the status projected
/// for instant `now` (see `project_status`).
pub(crate) fn project_context(
    mut ctx: FullContext,
    now: chrono::DateTime<chrono::Utc>,
) -> FullContext {
    let effective = project_status(&ctx.registry_state.status, &ctx.body, now);
    ctx.registry_state.status = effective;
    ctx
}
/// Decides whether `body` may appear in search results for `requester`.
///
/// * Public: visible to any identified requester, and to anonymous callers
///   only when `anonymous_public_reads` is set.
/// * Restricted: visible to the owning agent and to agents listed in
///   `audience`; never to anonymous callers.
/// * Private: visible to the owning agent only.
fn can_surface_in_search(
    body: &Body,
    requester: Option<&AgentDid>,
    anonymous_public_reads: bool,
) -> bool {
    match (&body.visibility, requester) {
        (Visibility::Public, who) => anonymous_public_reads || who.is_some(),
        (Visibility::Restricted, None) => false,
        (Visibility::Restricted, Some(r)) => {
            let owns = r == &body.agent_id;
            owns || body
                .audience
                .as_deref()
                .is_some_and(|a| a.iter().any(|d| d == r))
        }
        (Visibility::Private, who) => who == Some(&body.agent_id),
    }
}
impl RegistryStore for InMemoryStore {
/// Stores `body` as a new `Status::Active` context and appends its id to
/// the body's lineage.
///
/// # Errors
///
/// Returns `AcdpError::SchemaViolation` when the `ctx_id` is already stored;
/// nothing is modified in that case.
fn put(&self, body: Body) -> Result<(), AcdpError> {
    use std::collections::btree_map::Entry;

    let ctx_id = body.ctx_id.0.clone();
    let lineage_key = body.lineage_id.0.clone();
    let record = FullContext {
        body,
        registry_state: RegistryState {
            status: Status::Active,
            extensions: Default::default(),
        },
        registry_receipt: None,
        extensions: Default::default(),
    };

    let mut state = self.lock();
    match state.by_ctx.entry(ctx_id.clone()) {
        Entry::Occupied(_) => {
            return Err(AcdpError::SchemaViolation(format!(
                "duplicate ctx_id '{ctx_id}' in store"
            )));
        }
        Entry::Vacant(slot) => {
            slot.insert(record);
        }
    }
    state.lineages.entry(lineage_key).or_default().push(ctx_id);
    Ok(())
}
/// Returns a copy of the context stored under `ctx_id`, with its status
/// projected to the current time, or `Ok(None)` when the id is unknown.
fn get(&self, ctx_id: &CtxId) -> Result<Option<FullContext>, AcdpError> {
    let now = chrono::Utc::now();
    let state = self.lock();
    let found = state
        .by_ctx
        .get(ctx_id.as_str())
        .map(|stored| project_context(stored.clone(), now));
    Ok(found)
}
/// Returns every stored context of `lineage_id` in insertion order, each
/// with its status projected to the current time.
///
/// An unknown lineage yields an empty vector. Ids listed in the lineage
/// index but missing from the context map are skipped.
fn lineage(&self, lineage_id: &LineageId) -> Result<Vec<FullContext>, AcdpError> {
    let now = chrono::Utc::now();
    let state = self.lock();
    let mut versions = Vec::new();
    if let Some(ids) = state.lineages.get(lineage_id.as_str()) {
        for id in ids {
            if let Some(stored) = state.by_ctx.get(id) {
                versions.push(project_context(stored.clone(), now));
            }
        }
    }
    Ok(versions)
}
/// Returns the newest entry of `lineage_id` whose projected status is not
/// `Status::Superseded`, or `Ok(None)` if the lineage is unknown or every
/// entry has been superseded.
fn current(&self, lineage_id: &LineageId) -> Result<Option<FullContext>, AcdpError> {
    let now = chrono::Utc::now();
    let state = self.lock();
    let newest_live = state.lineages.get(lineage_id.as_str()).and_then(|ids| {
        // Walk from the most recently inserted id backwards; the iterator is
        // lazy, so scanning stops at the first non-superseded entry.
        ids.iter()
            .rev()
            .filter_map(|id| state.by_ctx.get(id))
            .map(|stored| project_context(stored.clone(), now))
            .find(|candidate| !matches!(candidate.registry_state.status, Status::Superseded))
    });
    Ok(newest_live)
}
/// Sets the stored status of `ctx_id` to `Status::Superseded`.
///
/// An unknown `ctx_id` is not an error: the call succeeds without changing
/// anything.
fn mark_superseded(&self, ctx_id: &CtxId) -> Result<(), AcdpError> {
    let mut state = self.lock();
    let Some(entry) = state.by_ctx.get_mut(ctx_id.as_str()) else {
        return Ok(());
    };
    entry.registry_state.status = Status::Superseded;
    Ok(())
}
fn first_version_ctx_id(&self, lineage_id: &LineageId) -> Result<Option<CtxId>, AcdpError> {
let g = self.lock();
Ok(g.lineages
.get(lineage_id.as_str())
.and_then(|ids| ids.first().cloned())
.map(CtxId))
}
/// Returns the live idempotency record for `(agent_id, key)`, if any.
///
/// Expired records are purged first, so a record is only returned while its
/// `expires_at` is still in the future. Eviction and lookup happen under a
/// single lock acquisition so no other writer can interleave between them.
fn idempotency_lookup(
    &self,
    agent_id: &AgentDid,
    key: &str,
) -> Result<Option<IdempotencyRecord>, AcdpError> {
    let now = chrono::Utc::now();
    let mut g = self.lock();
    // Same eviction rule as `idempotency_evict_expired`, applied in place:
    // the mutex is not reentrant, so that method cannot be called while the
    // guard is held.
    g.idempotency.retain(|_, r| r.expires_at > now);
    Ok(g.idempotency
        .get(&(agent_id.as_str().to_string(), key.to_string()))
        .cloned())
}
/// Stores (or overwrites) the idempotency record for `(agent_id, key)` with
/// copies of `hash` and `response`, valid until `expires_at`.
fn idempotency_record(
    &self,
    agent_id: &AgentDid,
    key: &str,
    hash: &crate::types::primitives::ContentHash,
    response: &crate::types::publish::PublishResponse,
    expires_at: chrono::DateTime<chrono::Utc>,
) -> Result<(), AcdpError> {
    let slot = (agent_id.as_str().to_string(), key.to_string());
    let record = IdempotencyRecord {
        content_hash: hash.clone(),
        response: response.clone(),
        expires_at,
    };
    self.lock().idempotency.insert(slot, record);
    Ok(())
}
/// Removes every idempotency record whose `expires_at` is at or before
/// `now`; records expiring strictly later are kept.
fn idempotency_evict_expired(
    &self,
    now: chrono::DateTime<chrono::Utc>,
) -> Result<(), AcdpError> {
    self.lock()
        .idempotency
        .retain(|_, record| now < record.expires_at);
    Ok(())
}
/// Atomically commits a publish request: idempotency check, supersession
/// validation, identifier assignment, insertion, predecessor update and
/// idempotency recording all happen under one lock acquisition.
///
/// Returns `PublishCommitOutcome::IdempotentReplay` when a live idempotency
/// record with the same content hash exists, otherwise
/// `PublishCommitOutcome::Inserted` with the new response. The `tenant`
/// field of `commit` is ignored by this store.
///
/// # Errors
///
/// * `AcdpError::DuplicatePublish` - the idempotency key is live but was
///   used with a different content hash.
/// * `AcdpError::SupersededTarget` - the `supersedes` target is missing or
///   not owned/contributed to by the requester (both reported as `NotFound`),
///   the declared lineage differs from the predecessor's, the version is not
///   predecessor + 1, or the predecessor is already superseded.
/// * `AcdpError::SchemaViolation` - the assigned `ctx_id` already exists.
/// * Any error returned by `assign_identifiers`.
fn commit_publish(&self, commit: PublishCommit<'_>) -> Result<PublishCommitOutcome, AcdpError> {
    use crate::registry::validator::assign_identifiers;

    let PublishCommit {
        req,
        authority,
        idempotency,
        tenant: _,
    } = commit;
    let now = chrono::Utc::now();
    let mut g = self.lock();

    // Build the idempotency map key once; it is needed for the lookup here
    // and again for the insert at the end.
    let idem_key = idempotency
        .as_ref()
        .map(|idem| (req.agent_id.as_str().to_string(), idem.key.to_string()));

    if let (Some(idem), Some(key)) = (&idempotency, &idem_key) {
        if let Some(prior) = g.idempotency.get(key) {
            // An expired record is treated as absent; it gets overwritten
            // below when this publish succeeds.
            if prior.expires_at > now {
                return if prior.content_hash == req.content_hash {
                    Ok(PublishCommitOutcome::IdempotentReplay(
                        prior.response.clone(),
                    ))
                } else {
                    Err(AcdpError::DuplicatePublish(format!(
                        "Idempotency-Key '{}' was previously used by '{}' \
                         with a different content_hash",
                        idem.key, req.agent_id
                    )))
                };
            }
        }
    }

    // When superseding, validate against the predecessor and find the first
    // version of its lineage (passed on to identifier assignment).
    let first_v1 = if let Some(prev) = &req.supersedes {
        // Borrow the predecessor instead of cloning it; only a few fields
        // are read and the borrow ends before any mutation below.
        let prev_full = g.by_ctx.get(prev.as_str()).ok_or_else(|| {
            AcdpError::SupersededTarget {
                reason: crate::error::SupersessionReason::NotFound,
                message: format!("supersedes target '{prev}' not found in this registry"),
            }
        })?;
        let is_owner = req.agent_id == prev_full.body.agent_id
            || prev_full.body.contributors.contains(&req.agent_id);
        if !is_owner {
            // Same reason and message as a missing target, so the response
            // does not reveal that the target exists.
            return Err(AcdpError::SupersededTarget {
                reason: crate::error::SupersessionReason::NotFound,
                message: format!("supersedes target '{prev}' not found in this registry"),
            });
        }
        if let Some(declared) = &req.lineage_id {
            if declared != &prev_full.body.lineage_id {
                return Err(AcdpError::SupersededTarget {
                    reason: crate::error::SupersessionReason::LineageMismatch,
                    message: format!(
                        "declared lineage_id '{declared}' ≠ predecessor's '{}'",
                        prev_full.body.lineage_id
                    ),
                });
            }
        }
        // `checked_add` avoids an overflow panic (debug) or wrap-around
        // (release) when the predecessor is already at the maximum version;
        // a panic here would also poison the store mutex.
        let expected_version = prev_full.body.version.checked_add(1);
        if Some(req.version) != expected_version {
            let expected = expected_version
                .map_or_else(|| String::from("overflow"), |v| v.to_string());
            return Err(AcdpError::SupersededTarget {
                reason: crate::error::SupersessionReason::VersionMismatch,
                message: format!(
                    "version {} ≠ predecessor.version + 1 ({})",
                    req.version, expected
                ),
            });
        }
        if matches!(prev_full.registry_state.status, Status::Superseded) {
            return Err(AcdpError::SupersededTarget {
                reason: crate::error::SupersessionReason::AlreadySuperseded,
                message: format!("supersedes target '{prev}' has already been superseded"),
            });
        }
        g.lineages
            .get(prev_full.body.lineage_id.as_str())
            .and_then(|ids| ids.first().cloned())
            .map(CtxId)
    } else {
        None
    };

    // The request's content hash is passed through as the recomputed hash;
    // no hashing is done in this method.
    let validated = crate::registry::validator::ValidatedPublish {
        recomputed_hash: req.content_hash.clone(),
    };
    let (ctx_id, lineage_id) =
        assign_identifiers(authority, &req.supersedes, first_v1.as_ref(), &validated)?;
    let created_at = crate::time::trunc_ms(now);
    let body = Body {
        ctx_id: ctx_id.clone(),
        lineage_id: lineage_id.clone(),
        origin_registry: authority.to_string(),
        created_at,
        content_hash: req.content_hash.clone(),
        signature: req.signature.clone(),
        version: req.version,
        supersedes: req.supersedes.clone(),
        agent_id: req.agent_id.clone(),
        contributors: req.contributors.clone(),
        title: req.title.clone(),
        context_type: req.context_type.clone(),
        data_refs: req.data_refs.clone(),
        derived_from: req.derived_from.clone(),
        visibility: req.visibility.clone(),
        audience: req.audience.clone(),
        acdp_version: req.acdp_version.clone(),
        description: req.description.clone(),
        summary: req.summary.clone(),
        tags: req.tags.clone(),
        domain: req.domain.clone(),
        expires_at: req.expires_at,
        data_period: req.data_period.clone(),
        metadata: req.metadata.clone(),
        schema_uri: req.schema_uri.clone(),
        extensions: Default::default(),
    };
    let ctx_id_str = body.ctx_id.0.clone();
    let lineage_id_str = body.lineage_id.0.clone();
    if g.by_ctx.contains_key(&ctx_id_str) {
        return Err(AcdpError::SchemaViolation(format!(
            "ctx_id collision: '{ctx_id_str}' already exists"
        )));
    }
    let stored = FullContext {
        body,
        registry_state: RegistryState {
            status: Status::Active,
            extensions: Default::default(),
        },
        registry_receipt: None,
        extensions: Default::default(),
    };
    g.by_ctx.insert(ctx_id_str.clone(), stored);
    g.lineages
        .entry(lineage_id_str)
        .or_default()
        .push(ctx_id_str);

    // Only after the new version is stored is the predecessor flipped.
    if let Some(prev) = &req.supersedes {
        if let Some(prev_ctx) = g.by_ctx.get_mut(prev.as_str()) {
            prev_ctx.registry_state.status = Status::Superseded;
        }
    }

    let response = PublishResponse {
        ctx_id,
        lineage_id,
        version: req.version,
        created_at,
        status: Status::Active,
    };
    if let (Some(idem), Some(key)) = (idempotency, idem_key) {
        g.idempotency.insert(
            key,
            IdempotencyRecord {
                content_hash: req.content_hash.clone(),
                response: response.clone(),
                expires_at: now + idem.ttl,
            },
        );
    }
    Ok(PublishCommitOutcome::Inserted(response))
}
/// Searches stored contexts.
///
/// A context is returned when it is visible to `requester` (see
/// `can_surface_in_search`), satisfies every filter present in `params`, and
/// its projected status equals `params.status` (default `"active"`).
/// Results are ordered newest first by creation time in milliseconds, ties
/// broken by ascending `ctx_id`, and paged with an opaque cursor.
/// `limit` defaults to 50 and is clamped to `1..=100`.
///
/// `total_estimate` counts all matches before the cursor is applied, so it
/// stays the same on every page of one query.
///
/// # Errors
///
/// * `AcdpError::SchemaViolation` - a datetime filter is not valid RFC 3339.
/// * `AcdpError::InvalidCursor` / `AcdpError::CursorExpired` - the cursor is
///   malformed or older than `CURSOR_TTL`.
fn search(
    &self,
    params: &SearchParams,
    requester: Option<&AgentDid>,
    anonymous_public_reads: bool,
) -> Result<SearchResponse, AcdpError> {
    let now = chrono::Utc::now();

    // Borrow the scalar filters once, outside the per-row closure.
    let q_lower = params.q.as_deref().map(str::to_lowercase);
    let domain = params.domain.as_deref();
    let agent = params.agent_id.as_deref();
    let context_type = params.context_type.as_deref();
    let derived_from = params.derived_from.as_deref();
    let schema_uri = params.schema_uri.as_deref();
    let want_status = params.status.as_deref().unwrap_or("active");
    // Comma-separated list; blank entries are dropped.
    let tags: Option<Vec<&str>> = params.tags.as_deref().map(|s| {
        s.split(',')
            .map(str::trim)
            .filter(|t| !t.is_empty())
            .collect()
    });

    // Validate every caller-supplied value before locking or scanning, so a
    // bad request fails fast. Datetime errors keep precedence over cursor
    // errors.
    let created_after = parse_opt_rfc3339(&params.created_after)?;
    let created_before = parse_opt_rfc3339(&params.created_before)?;
    let dp_start_after = parse_opt_rfc3339(&params.data_period_start_after)?;
    let dp_end_before = parse_opt_rfc3339(&params.data_period_end_before)?;
    let expires_after = parse_opt_rfc3339(&params.expires_after)?;
    let expires_before = parse_opt_rfc3339(&params.expires_before)?;
    let cursor_anchor = params
        .cursor
        .as_deref()
        .map(decode_cursor)
        .transpose()?
        .flatten();

    let g = self.lock();
    let mut matches: Vec<&FullContext> = g
        .by_ctx
        .values()
        .filter(|ctx| {
            let body = &ctx.body;
            if !can_surface_in_search(body, requester, anonymous_public_reads) {
                return false;
            }
            if let Some(q) = &q_lower {
                // Case-insensitive substring match over the text fields.
                let haystack = format!(
                    "{} {} {} {} {} {}",
                    body.title,
                    body.description.as_deref().unwrap_or(""),
                    body.summary.as_deref().unwrap_or(""),
                    body.domain.as_deref().unwrap_or(""),
                    body.agent_id.as_str(),
                    body.tags.as_ref().map(|t| t.join(" ")).unwrap_or_default(),
                )
                .to_lowercase();
                if !haystack.contains(q) {
                    return false;
                }
            }
            if let Some(d) = domain {
                if body.domain.as_deref() != Some(d) {
                    return false;
                }
            }
            if let Some(a) = agent {
                if body.agent_id.as_str() != a {
                    return false;
                }
            }
            if let Some(t) = context_type {
                // Compare against the serialized (wire) name of the type.
                let body_type = serde_json::to_value(&body.context_type)
                    .ok()
                    .and_then(|v| v.as_str().map(str::to_string))
                    .unwrap_or_default();
                if body_type != t {
                    return false;
                }
            }
            if let Some(df) = derived_from {
                if !body.derived_from.iter().any(|c| c.as_str() == df) {
                    return false;
                }
            }
            if let Some(req_tags) = &tags {
                // Every requested tag must be present on the body.
                let body_tags = body.tags.as_deref().unwrap_or(&[]);
                if !req_tags.iter().all(|t| body_tags.iter().any(|bt| bt == t)) {
                    return false;
                }
            }
            if let Some(uri) = schema_uri {
                if body.schema_uri.as_deref() != Some(uri) {
                    return false;
                }
            }
            if let Some(after) = created_after {
                if body.created_at < after {
                    return false;
                }
            }
            if let Some(before) = created_before {
                if body.created_at > before {
                    return false;
                }
            }
            // The remaining range filters exclude bodies that lack the field.
            if let Some(after) = dp_start_after {
                match &body.data_period {
                    Some(p) if p.start >= after => {}
                    _ => return false,
                }
            }
            if let Some(before) = dp_end_before {
                match &body.data_period {
                    Some(p) if p.end <= before => {}
                    _ => return false,
                }
            }
            if let Some(after) = expires_after {
                match body.expires_at {
                    Some(e) if e >= after => {}
                    _ => return false,
                }
            }
            if let Some(before) = expires_before {
                match body.expires_at {
                    Some(e) if e <= before => {}
                    _ => return false,
                }
            }
            let effective = project_status(&ctx.registry_state.status, body, now);
            effective.as_str() == want_status
        })
        .collect();

    // Order by the same key the cursor encodes (milliseconds, then ctx_id).
    // Sorting on the full-precision timestamp would disagree with the cursor
    // for rows in the same millisecond and could skip or repeat rows.
    matches.sort_by(|a, b| {
        b.body
            .created_at
            .timestamp_millis()
            .cmp(&a.body.created_at.timestamp_millis())
            .then_with(|| a.body.ctx_id.as_str().cmp(b.body.ctx_id.as_str()))
    });

    // Counted before the cursor is applied so the value is page-independent.
    let total_estimate = Some(matches.len() as u64);

    if let Some((anchor_ms, anchor_id)) = &cursor_anchor {
        // Keep only rows strictly after the anchor in sort order.
        matches.retain(|c| {
            let ms = c.body.created_at.timestamp_millis();
            ms < *anchor_ms || (ms == *anchor_ms && c.body.ctx_id.as_str() > anchor_id.as_str())
        });
    }

    // Clamping to at least 1 keeps `limit - 1` below from underflowing.
    let limit = params.limit.unwrap_or(50).clamp(1, 100) as usize;
    let next_cursor = if matches.len() > limit {
        // Anchor on the last row of this page.
        matches.get(limit - 1).map(|c| {
            encode_cursor(c.body.created_at.timestamp_millis(), c.body.ctx_id.as_str())
        })
    } else {
        None
    };

    let projected: Vec<SearchResult> = matches
        .iter()
        .take(limit)
        .map(|ctx| SearchResult {
            ctx_id: ctx.body.ctx_id.clone(),
            lineage_id: ctx.body.lineage_id.clone(),
            agent_id: ctx.body.agent_id.clone(),
            title: ctx.body.title.clone(),
            summary: ctx.body.summary.clone(),
            context_type: ctx.body.context_type.clone(),
            domain: ctx.body.domain.clone(),
            created_at: ctx.body.created_at,
            status: project_status(&ctx.registry_state.status, &ctx.body, now),
            visibility: Some(ctx.body.visibility.clone()),
        })
        .collect();

    Ok(SearchResponse {
        matches: projected,
        total_estimate,
        next_cursor,
    })
}
}
/// Parses an optional RFC 3339 timestamp and converts it to UTC.
///
/// `None` passes through as `Ok(None)`.
///
/// # Errors
///
/// Returns `AcdpError::SchemaViolation` when the string is present but is
/// not a valid RFC 3339 datetime.
fn parse_opt_rfc3339(
    s: &Option<String>,
) -> Result<Option<chrono::DateTime<chrono::Utc>>, AcdpError> {
    s.as_deref()
        .map(|raw| {
            chrono::DateTime::parse_from_rfc3339(raw)
                .map(|parsed| parsed.with_timezone(&chrono::Utc))
                .map_err(|e| {
                    AcdpError::SchemaViolation(format!("malformed datetime '{raw}': {e}"))
                })
        })
        .transpose()
}
/// Maximum age of a pagination cursor (3600 seconds). `decode_cursor`
/// rejects cursors minted longer ago than this with `AcdpError::CursorExpired`.
const CURSOR_TTL: chrono::Duration = chrono::Duration::seconds(3600);
/// Builds an opaque pagination cursor.
///
/// The cursor is the standard-alphabet base64 encoding of
/// `"<mint_ms>:<created_at_ms>:<ctx_id>"`, where `mint_ms` is the current
/// time in Unix milliseconds.
fn encode_cursor(created_at_ms: i64, ctx_id: &str) -> String {
    use base64::{engine::general_purpose::STANDARD, Engine};

    let mint_ms = chrono::Utc::now().timestamp_millis();
    let payload = format!("{mint_ms}:{created_at_ms}:{ctx_id}");
    STANDARD.encode(payload)
}
/// Decodes a cursor produced by `encode_cursor` into its
/// `(anchor created_at in ms, anchor ctx_id)` pair.
///
/// The payload is split on the first two `:` only, so the `ctx_id` part may
/// itself contain colons. On success the result is always `Some`.
///
/// # Errors
///
/// * `AcdpError::InvalidCursor` - not base64, not UTF-8, a field is missing,
///   or a timestamp field is not an integer.
/// * `AcdpError::CursorExpired` - the mint timestamp is more than
///   `CURSOR_TTL` in the past.
fn decode_cursor(s: &str) -> Result<Option<(i64, String)>, AcdpError> {
    use base64::{engine::general_purpose::STANDARD, Engine};

    let invalid = |why: &'static str| AcdpError::InvalidCursor(why.into());

    let raw = STANDARD
        .decode(s)
        .map_err(|_| invalid("cursor is not valid base64"))?;
    let decoded = String::from_utf8(raw).map_err(|_| invalid("cursor is not utf-8"))?;

    // Presence of all three fields is checked before any number parsing.
    let mut fields = decoded.splitn(3, ':');
    let mint_str = fields
        .next()
        .ok_or_else(|| invalid("cursor missing mint timestamp"))?;
    let anchor_str = fields
        .next()
        .ok_or_else(|| invalid("cursor missing anchor timestamp"))?;
    let ctx_id = fields
        .next()
        .ok_or_else(|| invalid("cursor missing ctx_id"))?;

    let mint_ms = mint_str
        .parse::<i64>()
        .map_err(|_| invalid("cursor mint millis is not an integer"))?;
    let anchor_ms = anchor_str
        .parse::<i64>()
        .map_err(|_| invalid("cursor anchor millis is not an integer"))?;

    // NOTE(review): a mint timestamp in the future gives a negative age and
    // therefore never expires - confirm that is acceptable.
    let age_ms = chrono::Utc::now()
        .timestamp_millis()
        .saturating_sub(mint_ms);
    if age_ms > CURSOR_TTL.num_milliseconds() {
        return Err(AcdpError::CursorExpired);
    }
    Ok(Some((anchor_ms, ctx_id.to_string())))
}
// Unit tests for `InMemoryStore`: basic storage, lineage ordering, status
// projection, search filters, cursor pagination, and one end-to-end publish
// through `RegistryServer`.
#[cfg(test)]
mod tests {
use super::*;
use crate::crypto::SigningKey;
use crate::producer::Producer;
use crate::types::body::{DataPeriod, Signature};
use crate::types::primitives::{AgentDid, ContentHash, ContextType, Visibility};
use chrono::Utc;
// Builds a minimal public, version-1 body with the given ids and title.
// `created_at` is the current time and every optional field is `None`.
fn fake_body(ctx_id: &str, lineage_id: &str, title: &str) -> Body {
Body {
ctx_id: CtxId(ctx_id.into()),
lineage_id: LineageId(lineage_id.into()),
origin_registry: "registry.example.com".into(),
created_at: Utc::now(),
content_hash: ContentHash("sha256:0".into()),
signature: Signature {
algorithm: "ed25519".into(),
key_id: "did:web:agents.example.com:test#key-1".into(),
value: "A".repeat(88),
},
version: 1,
supersedes: None,
agent_id: AgentDid::new("did:web:agents.example.com:test"),
contributors: vec![],
title: title.into(),
context_type: ContextType::DataSnapshot,
data_refs: vec![],
derived_from: vec![],
visibility: Visibility::Public,
audience: None,
acdp_version: None,
description: None,
summary: None,
tags: None,
domain: None,
expires_at: None,
data_period: None,
metadata: None,
schema_uri: None,
extensions: Default::default(),
}
}
// `put` followed by `get` returns the same body with status Active.
#[test]
fn put_get_round_trip() {
let s = InMemoryStore::new();
let id = "acdp://r/12345678-1234-4321-8123-123456781234";
let lin = "lin:sha256:1111111111111111111111111111111111111111111111111111111111111111";
s.put(fake_body(id, lin, "A")).unwrap();
let got = s.get(&CtxId(id.into())).unwrap().unwrap();
assert_eq!(got.body.title, "A");
assert!(matches!(got.registry_state.status, Status::Active));
}
// `lineage` returns entries in the order they were inserted.
#[test]
fn lineage_orders_by_publish_order() {
let s = InMemoryStore::new();
let lin = "lin:sha256:2222222222222222222222222222222222222222222222222222222222222222";
let v1 = "acdp://r/12345678-1234-4321-8123-000000000001";
let v2 = "acdp://r/12345678-1234-4321-8123-000000000002";
s.put(fake_body(v1, lin, "v1")).unwrap();
s.put(fake_body(v2, lin, "v2")).unwrap();
let lineage = s.lineage(&LineageId(lin.into())).unwrap();
assert_eq!(lineage.len(), 2);
assert_eq!(lineage[0].body.title, "v1");
assert_eq!(lineage[1].body.title, "v2");
}
// `mark_superseded` changes the status reported by a later `get`.
#[test]
fn supersession_marks_predecessor() {
let s = InMemoryStore::new();
let lin = "lin:sha256:3333333333333333333333333333333333333333333333333333333333333333";
let v1 = "acdp://r/12345678-1234-4321-8123-000000000003";
s.put(fake_body(v1, lin, "v1")).unwrap();
s.mark_superseded(&CtxId(v1.into())).unwrap();
let got = s.get(&CtxId(v1.into())).unwrap().unwrap();
assert!(matches!(got.registry_state.status, Status::Superseded));
}
// Like `fake_body`, but with `expires_at` set to the given instant (which
// may be in the past or in the future despite the helper's name).
fn expired_body(
ctx_id: &str,
lineage_id: &str,
title: &str,
expires_at: chrono::DateTime<chrono::Utc>,
) -> Body {
let mut b = fake_body(ctx_id, lineage_id, title);
b.expires_at = Some(expires_at);
b
}
// A stored Active context whose `expires_at` is in the past reads back as
// Expired.
#[test]
fn get_projects_active_to_expired_when_past_expires_at() {
use chrono::Duration;
let s = InMemoryStore::new();
let lin = "lin:sha256:5555555555555555555555555555555555555555555555555555555555555555";
let id = "acdp://r/12345678-1234-4321-8123-000000000006";
s.put(expired_body(
id,
lin,
"old",
chrono::Utc::now() - Duration::hours(1),
))
.unwrap();
let got = s.get(&CtxId(id.into())).unwrap().unwrap();
assert!(
matches!(got.registry_state.status, Status::Expired),
"expected Status::Expired projection, got {:?}",
got.registry_state.status
);
}
// A future `expires_at` leaves the status Active.
#[test]
fn get_keeps_active_when_expires_at_in_future() {
use chrono::Duration;
let s = InMemoryStore::new();
let lin = "lin:sha256:6666666666666666666666666666666666666666666666666666666666666666";
let id = "acdp://r/12345678-1234-4321-8123-000000000007";
s.put(expired_body(
id,
lin,
"fresh",
chrono::Utc::now() + Duration::hours(1),
))
.unwrap();
let got = s.get(&CtxId(id.into())).unwrap().unwrap();
assert!(matches!(got.registry_state.status, Status::Active));
}
// The default status filter ("active") hides expired contexts; asking for
// status "expired" returns them.
#[test]
fn search_status_active_filters_out_expired() {
use chrono::Duration;
let s = InMemoryStore::new();
let lin = "lin:sha256:7777777777777777777777777777777777777777777777777777777777777777";
let id = "acdp://r/12345678-1234-4321-8123-000000000008";
s.put(expired_body(
id,
lin,
"old",
chrono::Utc::now() - Duration::hours(1),
))
.unwrap();
let resp = s.search(&SearchParams::default(), None, true).unwrap();
assert!(
resp.matches.is_empty(),
"expired must not surface under status=active default"
);
let resp = s
.search(
&SearchParams {
status: Some("expired".into()),
..Default::default()
},
None,
true,
)
.unwrap();
assert_eq!(resp.matches.len(), 1);
}
// `created_after` excludes bodies created before the bound and keeps those
// created after it.
#[test]
fn search_filters_by_created_after() {
let s = InMemoryStore::new();
let lin = "lin:sha256:8888888888888888888888888888888888888888888888888888888888888888";
let mut body = fake_body(
"acdp://r/12345678-1234-4321-8123-000000000009",
lin,
"match",
);
body.created_at = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00.000Z")
.unwrap()
.with_timezone(&chrono::Utc);
s.put(body).unwrap();
let resp = s
.search(
&SearchParams {
created_after: Some("2026-02-01T00:00:00.000Z".into()),
..Default::default()
},
None,
true,
)
.unwrap();
assert_eq!(resp.matches.len(), 0);
let resp = s
.search(
&SearchParams {
created_after: Some("2025-12-01T00:00:00.000Z".into()),
..Default::default()
},
None,
true,
)
.unwrap();
assert_eq!(resp.matches.len(), 1);
}
// A datetime filter that is not RFC 3339 is rejected as SchemaViolation.
#[test]
fn search_invalid_rfc3339_filter_rejected() {
let s = InMemoryStore::new();
let err = s
.search(
&SearchParams {
created_after: Some("not-a-date".into()),
..Default::default()
},
None,
true,
)
.unwrap_err();
assert!(matches!(err, AcdpError::SchemaViolation(_)));
}
// Two pages of size 2 over 5 rows: the pages do not overlap and
// `total_estimate` is 5 on both.
#[test]
fn search_cursor_pages_results() {
let s = InMemoryStore::new();
let lin = "lin:sha256:9999999999999999999999999999999999999999999999999999999999999999";
let base = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00.000Z")
.unwrap()
.with_timezone(&chrono::Utc);
// Five bodies created one minute apart so the ordering is unambiguous.
for i in 0..5u8 {
let mut body = fake_body(
&format!("acdp://r/12345678-1234-4321-8123-00000000010{i}"),
lin,
"match",
);
body.created_at = base + chrono::Duration::minutes(i as i64);
s.put(body).unwrap();
}
let p1 = s
.search(
&SearchParams {
limit: Some(2),
..Default::default()
},
None,
true,
)
.unwrap();
assert_eq!(p1.matches.len(), 2);
let cursor = p1.next_cursor.expect("page 1 should carry a cursor");
let p2 = s
.search(
&SearchParams {
limit: Some(2),
cursor: Some(cursor.clone()),
..Default::default()
},
None,
true,
)
.unwrap();
assert_eq!(p2.matches.len(), 2);
for r in &p2.matches {
assert!(
!p1.matches.iter().any(|q| q.ctx_id == r.ctx_id),
"page 2 overlapped page 1"
);
}
assert_eq!(
p1.total_estimate, p2.total_estimate,
"total_estimate MUST be stable across pages (BUG-08); \
p1={:?}, p2={:?}",
p1.total_estimate, p2.total_estimate
);
assert_eq!(
p1.total_estimate,
Some(5),
"total_estimate MUST reflect total matches across all pages, got {:?}",
p1.total_estimate
);
}
// `limit: 0` is clamped to 1: one row comes back, with a cursor for the
// remaining rows, and nothing panics.
#[test]
fn search_limit_zero_does_not_underflow() {
let s = InMemoryStore::new();
let lin = "lin:sha256:8888888888888888888888888888888888888888888888888888888888888888";
for i in 0..3u8 {
let body = fake_body(
&format!("acdp://r/12345678-1234-4321-8123-00000000020{i}"),
lin,
"match",
);
s.put(body).unwrap();
}
let page = s
.search(
&SearchParams {
limit: Some(0),
..Default::default()
},
None,
true,
)
.expect("limit=0 must not panic or error");
assert_eq!(page.matches.len(), 1);
assert!(page.next_cursor.is_some());
}
// A cursor that is not valid base64 is rejected as InvalidCursor.
#[test]
fn search_malformed_cursor_rejected() {
let s = InMemoryStore::new();
let err = s
.search(
&SearchParams {
cursor: Some("not_base64!@#".into()),
..Default::default()
},
None,
true,
)
.unwrap_err();
assert!(matches!(err, AcdpError::InvalidCursor(_)));
}
// A well-formed cursor minted two hours ago (beyond the one-hour TTL) is
// rejected as CursorExpired.
#[test]
fn search_aged_cursor_rejected_as_cursor_expired() {
use base64::{engine::general_purpose::STANDARD, Engine};
let s = InMemoryStore::new();
let stale_mint_ms = chrono::Utc::now().timestamp_millis() - 7200 * 1000;
let aged = STANDARD.encode(format!(
"{stale_mint_ms}:0:acdp://r/12345678-1234-4321-8123-1234567812aa"
));
let err = s
.search(
&SearchParams {
cursor: Some(aged),
..Default::default()
},
None,
true,
)
.unwrap_err();
assert!(
matches!(err, AcdpError::CursorExpired),
"expired cursor MUST surface CursorExpired, got {err:?}"
);
}
// With the default status filter, a superseded context is hidden and its
// active sibling is found.
#[test]
fn search_filters_by_status_default_active() {
let s = InMemoryStore::new();
let lin = "lin:sha256:4444444444444444444444444444444444444444444444444444444444444444";
let v1 = "acdp://r/12345678-1234-4321-8123-000000000004";
let v2 = "acdp://r/12345678-1234-4321-8123-000000000005";
s.put(fake_body(v1, lin, "old")).unwrap();
s.put(fake_body(v2, lin, "new")).unwrap();
s.mark_superseded(&CtxId(v1.into())).unwrap();
let resp = s
.search(
&SearchParams {
q: Some("old".into()),
..Default::default()
},
None,
true,
)
.unwrap();
assert_eq!(resp.matches.len(), 0);
let resp = s
.search(
&SearchParams {
q: Some("new".into()),
..Default::default()
},
None,
true,
)
.unwrap();
assert_eq!(resp.matches.len(), 1);
}
// End-to-end: a request built by `Producer` is published through
// `RegistryServer` backed by `InMemoryStore` and can be retrieved again.
#[test]
fn store_round_trip_from_real_publish_request() {
use crate::registry::server::RegistryServer;
use crate::types::capabilities::{CapabilitiesDocument, Limits};
let key = SigningKey::from_bytes(&[7u8; 32]);
let p = Producer::new(
key,
AgentDid::new("did:web:agents.example.com:test"),
"did:web:agents.example.com:test#key-1",
);
let req = p
.publish_request()
.title("hello")
.context_type(ContextType::DataSnapshot)
.visibility(Visibility::Public)
.build()
.unwrap();
let caps = CapabilitiesDocument {
acdp_version: "0.1.0".into(),
registry_did: "did:web:registry.example.com".into(),
supported_signature_algorithms: vec!["ed25519".into()],
supported_did_methods: vec!["did:web".into()],
profiles: vec!["acdp-registry-core".into()],
limits: Limits {
max_payload_bytes: 1_048_576,
max_embedded_bytes: 65_536,
idempotency_key_ttl_seconds: None,
},
read_authentication_methods: vec![],
anonymous_public_reads: true,
supports_idempotency_key: false,
extensions: Default::default(),
};
let server = RegistryServer::new(InMemoryStore::new(), caps, "registry.example.com");
let resp = server.publish_unverified_for_tests(&req).unwrap();
assert_eq!(resp.version, 1);
let ctx = server.retrieve(&resp.ctx_id, None).unwrap().unwrap();
assert_eq!(ctx.body.title, "hello");
// Type-annotated binding: checks the field type and uses the `DataPeriod` import.
let _: Option<DataPeriod> = ctx.body.data_period.clone();
}
}