use std::collections::HashMap;
use std::sync::Arc;
use axum::Json;
use axum::Router;
use axum::extract::{Path, Query, State};
use axum::http::{StatusCode, header};
use axum::response::IntoResponse;
use axum::routing::get;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use scp_core::context::broadcast::BroadcastAdmission;
use scp_core::context::params::{ProjectionPolicy, ProjectionRule};
use scp_core::crypto::sender_keys::{BroadcastEnvelope, BroadcastKey, open_broadcast_trusted};
use scp_core::crypto::ucan::CapabilityUri;
use scp_core::crypto::ucan::validate::parse_ucan;
use scp_transport::native::storage::BlobStorage;
use crate::error::ApiError;
use crate::http::NodeState;
/// Derives the 32-byte routing identifier for a broadcast context.
///
/// The context id is ASCII-lowercased first, so ids that differ only in the
/// case of ASCII letters map to the same routing id. The result is the
/// SHA-256 digest of the lowercased bytes.
#[must_use]
pub fn compute_routing_id(context_id: &str) -> [u8; 32] {
    let canonical = context_id.to_ascii_lowercase();
    Sha256::digest(canonical.as_bytes()).into()
}
/// State kept for one broadcast context that is exposed through the HTTP
/// projection endpoints in this file.
#[derive(Debug)]
pub struct ProjectedContext {
/// Routing id; `ProjectedContext::new` computes it from `context_id` with
/// `compute_routing_id`.
pub(crate) routing_id: [u8; 32],
/// Context id exactly as supplied to `ProjectedContext::new`.
pub(crate) context_id: String,
/// Broadcast keys indexed by the epoch each key reports.
pub(crate) keys: HashMap<u64, BroadcastKey>,
/// Admission mode; used as the fallback projection rule when no policy is set.
pub(crate) admission: BroadcastAdmission,
/// Optional projection policy (default rule plus per-author overrides).
pub(crate) projection_policy: Option<ProjectionPolicy>,
}
impl ProjectedContext {
    /// Builds a projected context seeded with a single broadcast key.
    ///
    /// The routing id is derived from `context_id` via [`compute_routing_id`],
    /// and `broadcast_key` is stored under the epoch it reports.
    #[must_use]
    pub fn new(
        context_id: &str,
        broadcast_key: BroadcastKey,
        admission: BroadcastAdmission,
        projection_policy: Option<ProjectionPolicy>,
    ) -> Self {
        let initial_epoch = broadcast_key.epoch();
        Self {
            routing_id: compute_routing_id(context_id),
            context_id: context_id.to_owned(),
            keys: HashMap::from([(initial_epoch, broadcast_key)]),
            admission,
            projection_policy,
        }
    }

    /// Routing id of this context.
    #[must_use]
    pub const fn routing_id(&self) -> &[u8; 32] {
        &self.routing_id
    }

    /// Context id as it was supplied at construction.
    #[must_use]
    pub fn context_id(&self) -> &str {
        self.context_id.as_str()
    }

    /// All broadcast keys currently held, indexed by epoch.
    #[must_use]
    pub const fn keys(&self) -> &HashMap<u64, BroadcastKey> {
        &self.keys
    }

    /// Admission mode of the context.
    #[must_use]
    pub const fn admission(&self) -> BroadcastAdmission {
        self.admission
    }

    /// Projection policy, if one was configured.
    #[must_use]
    pub const fn projection_policy(&self) -> Option<&ProjectionPolicy> {
        self.projection_policy.as_ref()
    }

    /// Stores `broadcast_key` under its own epoch, replacing any key already
    /// held for that epoch.
    pub fn insert_key(&mut self, broadcast_key: BroadcastKey) {
        self.keys.insert(broadcast_key.epoch(), broadcast_key);
    }

    /// Drops every key whose epoch is not contained in `epochs`.
    pub fn retain_only_epochs(&mut self, epochs: &std::collections::HashSet<u64>) {
        self.keys.retain(|epoch, _key| epochs.contains(epoch));
    }

    /// Looks up the key for `epoch`, if it is still held.
    #[must_use]
    pub fn key_for_epoch(&self, epoch: u64) -> Option<&BroadcastKey> {
        self.keys.get(&epoch)
    }
}
/// Renders a 32-byte identifier as a hex string using `hex::encode`.
#[must_use]
pub fn hex_encode(bytes: &[u8; 32]) -> String {
    hex::encode(bytes.as_slice())
}
/// Parses a hex string into a 32-byte identifier.
///
/// Returns `None` when `s` is not valid hex or does not decode to exactly
/// 32 bytes.
#[must_use]
pub fn hex_decode(s: &str) -> Option<[u8; 32]> {
    let raw = hex::decode(s).ok()?;
    raw.try_into().ok()
}
/// Checks that a projection policy does not make a gated context public.
///
/// Only the combination "gated admission + a policy is present" is
/// constrained; every other combination is accepted.
///
/// # Errors
///
/// Returns a human-readable message when the admission is gated and either
/// the policy's default rule or any per-author override is `Public`.
pub fn validate_projection_policy(
    admission: BroadcastAdmission,
    policy: Option<&ProjectionPolicy>,
) -> Result<(), String> {
    let (BroadcastAdmission::Gated, Some(gated_policy)) = (admission, policy) else {
        return Ok(());
    };
    if gated_policy.default_rule == ProjectionRule::Public {
        return Err("gated context cannot have public projection rule".into());
    }
    let has_public_override = gated_policy
        .overrides
        .iter()
        .any(|entry| entry.rule == ProjectionRule::Public);
    if has_public_override {
        return Err("gated context cannot have public per-author projection override".into());
    }
    Ok(())
}
/// Pulls the token out of an `Authorization: Bearer <token>` header.
///
/// The scheme is matched case-insensitively. Returns `None` when the header
/// is absent, cannot be read as a string, uses another scheme, or carries
/// nothing after the `"bearer "` prefix.
fn extract_bearer_token(headers: &axum::http::HeaderMap) -> Option<&str> {
    // Length of the "bearer " prefix, including the separating space.
    const PREFIX_LEN: usize = 7;
    let raw = headers.get(header::AUTHORIZATION)?.to_str().ok()?;
    let has_bearer_prefix =
        raw.len() > PREFIX_LEN && raw[..PREFIX_LEN].eq_ignore_ascii_case("bearer ");
    has_bearer_prefix.then(|| &raw[PREFIX_LEN..])
}
fn validate_projection_ucan(
token: &str,
context_id: &str,
) -> Result<(), Box<axum::response::Response>> {
let ucan = parse_ucan(token).map_err(|e| {
tracing::debug!(error = %e, "UCAN parse failed for projection auth");
Box::new(ApiError::unauthorized_with("invalid UCAN token").into_response())
})?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
if ucan.payload.exp <= now {
tracing::debug!(
exp = ucan.payload.exp,
now = now,
"UCAN expired for projection auth"
);
return Err(Box::new(
ApiError::unauthorized_with("UCAN token expired").into_response(),
));
}
if let Some(nbf) = ucan.payload.nbf
&& nbf > now
{
tracing::debug!(
nbf = nbf,
now = now,
"UCAN not yet valid for projection auth"
);
return Err(Box::new(
ApiError::unauthorized_with("UCAN token not yet valid").into_response(),
));
}
let required = CapabilityUri::new(context_id, "messages", "read");
let has_capability = ucan.payload.att.iter().any(|att| {
att.with
.parse::<CapabilityUri>()
.is_ok_and(|cap| cap.matches(&required))
});
if !has_capability {
tracing::debug!(
context_id = context_id,
"UCAN missing messages:read capability for context"
);
return Err(Box::new(
ApiError::unauthorized_with("UCAN missing messages:read capability").into_response(),
));
}
Ok(())
}
/// Resolves the projection rule that applies to a request.
///
/// * No policy: the rule mirrors the admission mode (`Gated` -> `Gated`,
///   `Open` -> `Public`).
/// * Policy present: the first override whose DID equals `author_did` wins;
///   otherwise (or when `author_did` is `None`) the policy's default rule
///   applies.
fn effective_projection_rule(
    admission: BroadcastAdmission,
    policy: Option<&ProjectionPolicy>,
    author_did: Option<&str>,
) -> ProjectionRule {
    let Some(active_policy) = policy else {
        return match admission {
            BroadcastAdmission::Gated => ProjectionRule::Gated,
            BroadcastAdmission::Open => ProjectionRule::Public,
        };
    };
    author_did
        .and_then(|author| {
            active_policy
                .overrides
                .iter()
                .find(|o| o.did.as_ref() == author)
        })
        .map_or(active_policy.default_rule, |matched| matched.rule)
}
/// Enforces `rule` against the request headers.
///
/// `Public` always passes. `Gated` and `AuthorChoice` both require a bearer
/// token that passes [`validate_projection_ucan`] for `context_id`.
///
/// # Errors
///
/// Returns the boxed error response to send when the token is missing or
/// rejected.
fn check_projection_auth(
    headers: &axum::http::HeaderMap,
    context_id: &str,
    rule: ProjectionRule,
) -> Result<(), Box<axum::response::Response>> {
    let requires_token = match rule {
        ProjectionRule::Public => false,
        ProjectionRule::Gated | ProjectionRule::AuthorChoice => true,
    };
    if !requires_token {
        return Ok(());
    }
    let Some(token) = extract_bearer_token(headers) else {
        return Err(Box::new(
            ApiError::unauthorized_with("Authorization required for gated broadcast")
                .into_response(),
        ));
    };
    validate_projection_ucan(token, context_id)
}
/// Formats a Unix timestamp (whole seconds since 1970-01-01T00:00:00Z) as an
/// ISO 8601 / RFC 3339 UTC string of the form `YYYY-MM-DDTHH:MM:SSZ`.
///
/// Uses the proleptic Gregorian calendar. Years are zero-padded to at least
/// four digits and simply grow wider beyond year 9999.
///
/// Whole 400-year Gregorian cycles (146 097 days each) are skipped
/// arithmetically before the per-year walk, so the loop runs at most 400
/// times regardless of how large `ts` is.
#[must_use]
pub fn unix_to_iso8601(ts: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    // Any 400 consecutive Gregorian years contain exactly 97 leap years.
    const DAYS_PER_400_YEARS: u64 = 146_097;
    const DAYS_IN_MONTH: [[u64; 12]; 2] = [
        [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
        [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
    ];
    const fn is_leap(y: u64) -> bool {
        (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400)
    }
    const fn days_in_year(y: u64) -> u64 {
        if is_leap(y) { 366 } else { 365 }
    }

    let secs_of_day = ts % SECS_PER_DAY;
    let hours = secs_of_day / 3_600;
    let minutes = (secs_of_day / 60) % 60;
    let seconds = secs_of_day % 60;

    let total_days = ts / SECS_PER_DAY;
    // The leap-year pattern repeats every 400 years, so jumping by whole
    // cycles from 1970 lands on a year with the same calendar layout.
    let mut year = 1970 + 400 * (total_days / DAYS_PER_400_YEARS);
    let mut days = total_days % DAYS_PER_400_YEARS;
    loop {
        let dy = days_in_year(year);
        if days < dy {
            break;
        }
        days -= dy;
        year += 1;
    }

    // `days` is now the zero-based day of the year; peel off whole months.
    let month_lengths = &DAYS_IN_MONTH[usize::from(is_leap(year))];
    let mut month = 0usize;
    while month < 11 && days >= month_lengths[month] {
        days -= month_lengths[month];
        month += 1;
    }

    format!(
        "{year:04}-{:02}-{:02}T{hours:02}:{minutes:02}:{seconds:02}Z",
        month + 1,
        days + 1
    )
}
/// JSON body returned by the feed endpoint.
#[derive(Debug, Clone, Serialize)]
pub struct FeedResponse {
/// Context id of the projected context the feed was read from.
pub context_id: String,
/// Author DID of the first message in `messages`; empty when there are none.
pub author_did: String,
/// Decrypted messages, in the order the storage query returned them.
pub messages: Vec<FeedMessage>,
}
/// One decrypted broadcast message as serialized to clients.
#[derive(Debug, Clone, Serialize)]
pub struct FeedMessage {
/// Hex encoding of the stored blob's id.
pub id: String,
/// Context id copied from the broadcast envelope.
pub context_id: String,
/// Author DID copied from the broadcast envelope.
pub author_did: String,
/// Sequence number copied from the broadcast envelope.
pub sequence: u64,
/// Timestamp copied from the broadcast envelope (unit is defined by the
/// envelope format, not by this file).
pub timestamp: u64,
/// Key epoch copied from the envelope; selects the decryption key.
pub key_epoch: u64,
/// The blob's `stored_at` value rendered by `unix_to_iso8601`.
pub published_at: String,
/// Decrypted payload, base64-encoded with the standard alphabet.
pub content: String,
}
/// Query-string parameters accepted by the feed endpoint.
#[derive(Debug, Clone, Deserialize)]
pub struct FeedQuery {
/// Optional cursor: hex-encoded blob id. The feed handler looks the blob up
/// and passes its `stored_at` to the storage query.
pub since: Option<String>,
/// Optional page size; defaults to `DEFAULT_FEED_LIMIT` and is capped at
/// `MAX_FEED_LIMIT` by the feed handler.
pub limit: Option<u32>,
}
/// Page size used by the feed handler when the request carries no `limit`.
const DEFAULT_FEED_LIMIT: u32 = 20;
/// Upper bound the feed handler applies to a requested `limit`.
const MAX_FEED_LIMIT: u32 = 100;
/// Decodes and decrypts stored blobs into feed messages.
///
/// Blobs are processed in slice order. A blob is skipped (with a warning
/// log) when it is not a valid MessagePack `BroadcastEnvelope`, when no key
/// is held for its epoch, or when decryption fails.
///
/// Returns the messages that decrypted successfully together with the blob
/// id of the last such message (`None` when nothing decrypted).
fn decrypt_blobs(
    blobs: &[scp_transport::native::storage::StoredBlob],
    keys: &HashMap<u64, BroadcastKey>,
) -> (Vec<FeedMessage>, Option<[u8; 32]>) {
    /// Turns one stored blob into a feed message, or logs why it was skipped.
    fn project_blob(
        stored: &scp_transport::native::storage::StoredBlob,
        keys: &HashMap<u64, BroadcastKey>,
    ) -> Option<FeedMessage> {
        let envelope = match rmp_serde::from_slice::<BroadcastEnvelope>(&stored.blob) {
            Ok(env) => env,
            Err(e) => {
                tracing::warn!(
                    blob_id = hex_encode(&stored.blob_id),
                    error = %e,
                    "failed to deserialize BroadcastEnvelope, skipping"
                );
                return None;
            }
        };

        let Some(key) = keys.get(&envelope.key_epoch) else {
            tracing::warn!(
                blob_id = hex_encode(&stored.blob_id),
                epoch = envelope.key_epoch,
                "no broadcast key for epoch, skipping"
            );
            return None;
        };

        let plaintext = match open_broadcast_trusted(key, &envelope) {
            Ok(pt) => pt,
            Err(e) => {
                tracing::warn!(
                    blob_id = hex_encode(&stored.blob_id),
                    epoch = envelope.key_epoch,
                    error = %e,
                    "decryption failed, skipping"
                );
                return None;
            }
        };

        Some(FeedMessage {
            id: hex_encode(&stored.blob_id),
            context_id: envelope.context_id,
            author_did: envelope.author_did,
            sequence: envelope.sequence,
            timestamp: envelope.timestamp,
            key_epoch: envelope.key_epoch,
            published_at: unix_to_iso8601(stored.stored_at),
            content: BASE64.encode(&plaintext),
        })
    }

    let mut feed = Vec::with_capacity(blobs.len());
    let mut last_decrypted: Option<[u8; 32]> = None;
    for stored in blobs {
        if let Some(message) = project_blob(stored, keys) {
            last_decrypted = Some(stored.blob_id);
            feed.push(message);
        }
    }
    (feed, last_decrypted)
}
/// `GET /scp/broadcast/{routing_id}/feed` — returns decrypted broadcast
/// messages for one projected context.
///
/// Steps:
/// 1. Decode the hex routing id (400 on failure) and snapshot the projected
///    context (404 when unknown). The read lock is released before any
///    storage I/O.
/// 2. Enforce the context's default projection rule with
///    [`check_projection_auth`].
/// 3. Resolve the optional `since` cursor to a `stored_at` value. A cursor
///    that belongs to another routing id is a 400; an unknown cursor or a
///    failed lookup is mapped to `u64::MAX` (presumably so the query yields
///    an empty page — confirm against the storage backend).
/// 4. Query storage with the clamped limit, decrypt, and drop messages whose
///    author's override rule the caller is not authorized for.
/// 5. Respond with JSON plus `Cache-Control` and, when at least one message
///    is returned, an `ETag`.
///
/// The `ETag` is the id of the last message that is actually returned. It is
/// computed after per-author filtering so that it never carries the id of a
/// message the caller was not allowed to see.
#[allow(clippy::too_many_lines)]
pub async fn feed_handler(
    State(state): State<Arc<NodeState>>,
    Path(routing_id_hex): Path<String>,
    Query(params): Query<FeedQuery>,
    headers: axum::http::HeaderMap,
) -> impl IntoResponse {
    let Some(routing_id) = hex_decode(&routing_id_hex) else {
        return ApiError::bad_request("invalid routing_id hex").into_response();
    };

    // Copy what we need out of the shared map, then release the read lock.
    let projected_contexts = state.projected_contexts.read().await;
    let Some(projected) = projected_contexts.get(&routing_id) else {
        return ApiError::not_found("unknown routing_id").into_response();
    };
    let context_id = projected.context_id.clone();
    let admission = projected.admission;
    let projection_policy = projected.projection_policy.clone();
    let keys: HashMap<u64, BroadcastKey> = projected.keys.clone();
    drop(projected_contexts);

    // Context-wide (default) rule: no author is known yet.
    let rule = effective_projection_rule(admission, projection_policy.as_ref(), None);
    if let Err(resp) = check_projection_auth(&headers, &context_id, rule) {
        return *resp;
    }

    let since_ts: Option<u64> = if let Some(ref since_hex) = params.since {
        let Some(since_blob_id) = hex_decode(since_hex) else {
            return ApiError::bad_request("invalid since blob_id hex").into_response();
        };
        match state.blob_storage.get(&since_blob_id).await {
            Ok(Some(blob)) => {
                if blob.routing_id != routing_id {
                    return ApiError::bad_request("since blob_id does not belong to this context")
                        .into_response();
                }
                Some(blob.stored_at)
            }
            // Unknown cursor: use the largest possible bound.
            Ok(None) => Some(u64::MAX),
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    since_blob_id = since_hex,
                    "failed to look up since blob"
                );
                Some(u64::MAX)
            }
        }
    } else {
        None
    };

    let limit = params
        .limit
        .unwrap_or(DEFAULT_FEED_LIMIT)
        .min(MAX_FEED_LIMIT);
    let blobs = match state.blob_storage.query(&routing_id, since_ts, limit).await {
        Ok(blobs) => blobs,
        Err(e) => {
            tracing::error!(
                error = %e,
                routing_id = routing_id_hex,
                "blob storage query failed"
            );
            return ApiError::internal_error("storage error").into_response();
        }
    };

    let (mut messages, _) = decrypt_blobs(&blobs, &keys);

    if let Some(ref policy) = projection_policy
        && !policy.overrides.is_empty()
    {
        // Authorization outcome per rule class, evaluated at most once each.
        // The default rule already passed above, so its class starts as `true`.
        let mut auth_gated: Option<bool> = None;
        let mut auth_author_choice: Option<bool> = None;
        match rule {
            ProjectionRule::Public => {}
            ProjectionRule::Gated => {
                auth_gated = Some(true);
            }
            ProjectionRule::AuthorChoice => {
                auth_author_choice = Some(true);
            }
        }
        for ov in &policy.overrides {
            if ov.rule == ProjectionRule::Gated && auth_gated.is_none() {
                auth_gated = Some(
                    check_projection_auth(&headers, &context_id, ProjectionRule::Gated).is_ok(),
                );
            } else if ov.rule == ProjectionRule::AuthorChoice && auth_author_choice.is_none() {
                auth_author_choice = Some(
                    check_projection_auth(&headers, &context_id, ProjectionRule::AuthorChoice)
                        .is_ok(),
                );
            }
        }
        messages.retain(|msg| {
            let author_rule =
                effective_projection_rule(admission, Some(policy), Some(&msg.author_did));
            match author_rule {
                ProjectionRule::Public => true,
                // A rule class can only be reached here if it is the default
                // rule or appears in an override, so the outcome is always
                // known; if it ever were not, deny rather than allow.
                ProjectionRule::Gated => auth_gated.unwrap_or(false),
                ProjectionRule::AuthorChoice => auth_author_choice.unwrap_or(false),
            }
        });
    }

    let author_did = messages
        .first()
        .map(|m| m.author_did.clone())
        .unwrap_or_default();
    // Derived from the filtered list: describes exactly what is returned.
    let etag = messages.last().map(|m| format!("\"{}\"", m.id));
    let response = FeedResponse {
        context_id,
        author_did,
        messages,
    };

    // With per-author overrides the body depends on the caller's
    // credentials, so it must not be stored by shared caches.
    let has_per_author_overrides = projection_policy
        .as_ref()
        .is_some_and(|p| !p.overrides.is_empty());
    let cache_control = match rule {
        ProjectionRule::Public if !has_per_author_overrides => {
            "public, max-age=30, stale-while-revalidate=300"
        }
        _ => "private, max-age=30",
    };

    let mut resp_headers = axum::http::HeaderMap::new();
    resp_headers.insert(
        header::CACHE_CONTROL,
        axum::http::HeaderValue::from_static(cache_control),
    );
    if let Some(val) = etag.and_then(|tag| axum::http::HeaderValue::from_str(&tag).ok()) {
        resp_headers.insert(header::ETAG, val);
    }
    (StatusCode::OK, resp_headers, Json(response)).into_response()
}
#[allow(clippy::too_many_lines)]
pub async fn message_handler(
State(state): State<Arc<NodeState>>,
Path((routing_id_hex, blob_id_hex_raw)): Path<(String, String)>,
headers: axum::http::HeaderMap,
) -> impl IntoResponse {
let blob_id_hex = blob_id_hex_raw.to_ascii_lowercase();
let Some(routing_id) = hex_decode(&routing_id_hex) else {
return ApiError::bad_request("invalid routing_id hex").into_response();
};
let Some(blob_id) = hex_decode(&blob_id_hex) else {
return ApiError::bad_request("invalid blob_id hex").into_response();
};
let projected_contexts = state.projected_contexts.read().await;
let Some(projected) = projected_contexts.get(&routing_id) else {
return ApiError::not_found("unknown routing_id").into_response();
};
let context_id = projected.context_id.clone();
let admission = projected.admission;
let projection_policy = projected.projection_policy.clone();
let keys: HashMap<u64, BroadcastKey> = projected.keys.clone();
drop(projected_contexts);
let default_rule = effective_projection_rule(admission, projection_policy.as_ref(), None);
if matches!(
default_rule,
ProjectionRule::Gated | ProjectionRule::AuthorChoice
) && let Err(resp) = check_projection_auth(&headers, &context_id, default_rule)
{
return *resp;
}
let stored = match state.blob_storage.get(&blob_id).await {
Ok(Some(blob)) => blob,
Ok(None) => {
return ApiError::not_found("unknown blob_id").into_response();
}
Err(e) => {
tracing::error!(
error = %e,
blob_id = blob_id_hex,
"blob storage get failed"
);
return ApiError::internal_error("storage error").into_response();
}
};
if stored.routing_id != routing_id {
return ApiError::not_found("unknown blob_id").into_response();
}
if let Some(inm) = headers.get(header::IF_NONE_MATCH)
&& let Ok(inm_str) = inm.to_str()
{
let expected_etag = format!("\"{blob_id_hex}\"");
if inm_str == expected_etag {
return StatusCode::NOT_MODIFIED.into_response();
}
}
let envelope: BroadcastEnvelope = match rmp_serde::from_slice(&stored.blob) {
Ok(env) => env,
Err(e) => {
tracing::error!(
error = %e,
blob_id = blob_id_hex,
"failed to deserialize BroadcastEnvelope"
);
return ApiError::internal_error("decryption failure").into_response();
}
};
let effective_rule = effective_projection_rule(
admission,
projection_policy.as_ref(),
Some(&envelope.author_did),
);
if effective_rule != default_rule
&& let Err(resp) = check_projection_auth(&headers, &context_id, effective_rule)
{
return *resp;
}
let Some(key) = keys.get(&envelope.key_epoch) else {
tracing::warn!(
blob_id = blob_id_hex,
epoch = envelope.key_epoch,
"no broadcast key for epoch (likely purged after governance ban)"
);
return ApiError::gone("content revoked").into_response();
};
let plaintext = match open_broadcast_trusted(key, &envelope) {
Ok(pt) => pt,
Err(e) => {
tracing::error!(
error = %e,
blob_id = blob_id_hex,
epoch = envelope.key_epoch,
"decryption failed"
);
return ApiError::internal_error("decryption failure").into_response();
}
};
let message = FeedMessage {
id: hex_encode(&stored.blob_id),
context_id: envelope.context_id,
author_did: envelope.author_did,
sequence: envelope.sequence,
timestamp: envelope.timestamp,
key_epoch: envelope.key_epoch,
published_at: unix_to_iso8601(stored.stored_at),
content: BASE64.encode(&plaintext),
};
let cache_control = match effective_rule {
ProjectionRule::Public => "public, immutable, max-age=31536000",
ProjectionRule::Gated | ProjectionRule::AuthorChoice => {
"private, immutable, max-age=31536000"
}
};
let mut resp_headers = axum::http::HeaderMap::new();
resp_headers.insert(
header::CACHE_CONTROL,
axum::http::HeaderValue::from_static(cache_control),
);
let etag = format!("\"{blob_id_hex}\"");
if let Ok(val) = axum::http::HeaderValue::from_str(&etag) {
resp_headers.insert(header::ETAG, val);
}
(StatusCode::OK, resp_headers, Json(message)).into_response()
}
/// Builds the router for the broadcast projection endpoints.
///
/// Registers the feed and single-message routes and wraps both in
/// [`projection_rate_limit_middleware`], using the limiter stored in `state`.
pub fn broadcast_projection_router(state: Arc<NodeState>) -> Router {
    let limiter = state.projection_rate_limiter.clone();
    let rate_limit = axum::middleware::from_fn(
        move |req: axum::extract::Request, next: axum::middleware::Next| {
            projection_rate_limit_middleware(req, next, limiter.clone())
        },
    );
    Router::new()
        .route("/scp/broadcast/{routing_id}/feed", get(feed_handler))
        .route(
            "/scp/broadcast/{routing_id}/messages/{blob_id}",
            get(message_handler),
        )
        .layer(rate_limit)
        .with_state(state)
}
/// Rate-limits projection requests by the client IP address.
///
/// The address is read from the `ConnectInfo<SocketAddr>` request extension.
/// When that extension is absent, a warning is logged and all such requests
/// share the `0.0.0.0` bucket. Requests rejected by the limiter receive
/// `429 Too Many Requests`; all others are passed to `next`.
async fn projection_rate_limit_middleware(
    request: axum::extract::Request,
    next: axum::middleware::Next,
    limiter: scp_transport::relay::rate_limit::PublishRateLimiter,
) -> axum::response::Response {
    let ip = match request
        .extensions()
        .get::<axum::extract::ConnectInfo<std::net::SocketAddr>>()
    {
        Some(connect_info) => connect_info.0.ip(),
        None => {
            tracing::warn!(
                "ConnectInfo missing from request extensions; \
                 projection rate limiting falls back to shared 0.0.0.0 bucket \
                 (per-IP isolation lost)"
            );
            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)
        }
    };

    if limiter.check(ip).await {
        return next.run(request).await;
    }
    (
        axum::http::StatusCode::TOO_MANY_REQUESTS,
        "rate limit exceeded",
    )
        .into_response()
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use ed25519_dalek::Signer;
use scp_core::crypto::sender_keys::{
SealBroadcastParams, SigningPayloadFields, build_broadcast_signing_payload,
compute_provenance_hash, generate_broadcast_key, generate_broadcast_nonce,
rotate_broadcast_key, seal_broadcast,
};
/// Seals `payload` under `key` as a broadcast envelope for the fixed test
/// context `"test-ctx"` (sequence 1, fixed timestamp, no provenance), signed
/// with a deterministic Ed25519 key.
fn test_seal(key: &BroadcastKey, payload: &[u8]) -> BroadcastEnvelope {
    let sk = ed25519_dalek::SigningKey::from_bytes(&[0xAA; 32]);
    let nonce = generate_broadcast_nonce();
    let provenance_hash = compute_provenance_hash(None).unwrap();
    // The signed fields must agree with the values passed to `seal_broadcast`.
    let signature = sk.sign(&build_broadcast_signing_payload(&SigningPayloadFields {
        version: scp_core::envelope::SCP_PROTOCOL_VERSION,
        context_id: "test-ctx",
        author_did: key.author_did(),
        sequence: 1,
        key_epoch: key.epoch(),
        timestamp: 1_700_000_000_000,
        nonce: &nonce,
        provenance_hash: &provenance_hash,
    }));
    let params = SealBroadcastParams {
        context_id: "test-ctx",
        sequence: 1,
        timestamp: 1_700_000_000_000,
        provenance: None,
        signature,
    };
    // Pass the parameter struct by reference (the previous text here was a
    // mis-encoded `&params` that did not compile).
    seal_broadcast(key, payload, &nonce, &params).unwrap()
}
/// 32 bytes encode to 64 lowercase hex characters.
#[test]
fn hex_encode_produces_64_char_lowercase() {
    let rendered = hex_encode(&[0xab; 32]);
    assert_eq!(rendered.len(), 64);
    assert_eq!(rendered, "ab".repeat(32));
}
/// Zero bytes keep their leading zeros in the encoding.
#[test]
fn hex_encode_all_zeros() {
    let rendered = hex_encode(&[0u8; 32]);
    assert_eq!(rendered, "00".repeat(32));
}
/// Decoding an encoded value yields the original bytes.
#[test]
fn hex_decode_roundtrip() {
    let original = [0xde; 32];
    let text = hex_encode(&original);
    assert_eq!(hex_decode(&text).unwrap(), original);
}
/// Inputs that do not decode to exactly 32 bytes are rejected.
#[test]
fn hex_decode_rejects_wrong_length() {
    let one_short = "a".repeat(63);
    let one_long = "a".repeat(65);
    for candidate in ["abcd", "", one_short.as_str(), one_long.as_str()] {
        assert!(hex_decode(candidate).is_none());
    }
}
/// A non-hex character makes an otherwise well-sized input invalid.
#[test]
fn hex_decode_rejects_invalid_chars() {
    // 64 characters: a leading 'g' followed by 63 zeros.
    let candidate = format!("g{}", "0".repeat(63));
    assert!(hex_decode(&candidate).is_none());
}
/// Upper-case hex digits decode to the same bytes as lower-case ones.
#[test]
fn hex_decode_accepts_uppercase() {
    let shouted = hex_encode(&[0xAB; 32]).to_uppercase();
    let bytes = hex_decode(&shouted).unwrap();
    assert_eq!(bytes, [0xAB; 32]);
}
/// Timestamp zero is the Unix epoch.
#[test]
fn unix_to_iso8601_epoch_zero() {
    let rendered = unix_to_iso8601(0);
    assert_eq!(rendered, "1970-01-01T00:00:00Z");
}
/// A mid-month, mid-day timestamp renders with the expected fields.
#[test]
fn unix_to_iso8601_known_timestamp() {
    let rendered = unix_to_iso8601(1_736_937_000);
    assert_eq!(rendered, "2025-01-15T10:30:00Z");
}
/// The first second of the year 2000 (a leap year divisible by 400).
#[test]
fn unix_to_iso8601_y2k() {
    let rendered = unix_to_iso8601(946_684_800);
    assert_eq!(rendered, "2000-01-01T00:00:00Z");
}
/// February 29 is produced in a leap year.
#[test]
fn unix_to_iso8601_leap_year_feb_29() {
    let rendered = unix_to_iso8601(1_709_208_000);
    assert_eq!(rendered, "2024-02-29T12:00:00Z");
}
/// A well-formed envelope with a matching key decrypts to one message and
/// its blob id is reported as the latest.
#[test]
fn decrypt_blobs_with_valid_envelope() {
    let key = generate_broadcast_key("did:dht:alice");
    let blob_bytes = rmp_serde::to_vec(&test_seal(&key, b"hello world")).unwrap();
    let blob_id: [u8; 32] = {
        let mut hasher = Sha256::new();
        hasher.update(&blob_bytes);
        hasher.finalize().into()
    };
    let stored = scp_transport::native::storage::StoredBlob {
        routing_id: [0xAA; 32],
        blob_id,
        recipient_hint: None,
        blob_ttl: 3600,
        stored_at: 1_736_937_000,
        blob: blob_bytes,
    };
    let mut keys = HashMap::new();
    keys.insert(0, key);

    let (messages, latest_id) = decrypt_blobs(&[stored], &keys);

    assert_eq!(messages.len(), 1);
    let only = &messages[0];
    assert_eq!(only.author_did, "did:dht:alice");
    assert_eq!(only.key_epoch, 0);
    assert_eq!(only.published_at, "2025-01-15T10:30:00Z");
    let plaintext = BASE64.decode(&only.content).unwrap();
    assert_eq!(plaintext, b"hello world");
    assert_eq!(latest_id, Some(blob_id));
}
/// Bytes that are not a valid envelope are skipped without producing a
/// message or a latest id.
#[test]
fn decrypt_blobs_skips_invalid_msgpack() {
    let garbage = scp_transport::native::storage::StoredBlob {
        routing_id: [0xAA; 32],
        blob_id: [0xBB; 32],
        recipient_hint: None,
        blob_ttl: 3600,
        stored_at: 100,
        blob: vec![0xFF, 0xFE],
    };
    let no_keys = HashMap::new();

    let (messages, latest_id) = decrypt_blobs(&[garbage], &no_keys);

    assert!(messages.is_empty());
    assert!(latest_id.is_none());
}
/// An envelope sealed under epoch 0 is skipped when only the rotated
/// (epoch 1) key is available.
#[test]
fn decrypt_blobs_skips_missing_epoch_key() {
    let key = generate_broadcast_key("did:dht:alice");
    let sealed = rmp_serde::to_vec(&test_seal(&key, b"secret")).unwrap();
    let stored = scp_transport::native::storage::StoredBlob {
        routing_id: [0xAA; 32],
        blob_id: [0xCC; 32],
        recipient_hint: None,
        blob_ttl: 3600,
        stored_at: 100,
        blob: sealed,
    };
    let (rotated_key, _) = rotate_broadcast_key(&key, 1000).unwrap();
    let mut keys = HashMap::new();
    keys.insert(1, rotated_key);

    let (messages, latest_id) = decrypt_blobs(&[stored], &keys);

    assert!(messages.is_empty());
    assert!(latest_id.is_none());
}
use std::net::SocketAddr;
use std::time::Instant;
use axum::body::Body;
use axum::http::{Request, StatusCode as HttpStatus};
use http_body_util::BodyExt;
use scp_transport::native::storage::{BlobStorageBackend, InMemoryBlobStorage};
use tokio::sync::RwLock;
use tower::ServiceExt;
use crate::http::NodeState;
/// Builds a test `NodeState` with a projection rate limit of 1000, high
/// enough that the tests using it are not throttled.
fn test_state_with(
    projected: HashMap<[u8; 32], ProjectedContext>,
    storage: InMemoryBlobStorage,
) -> Arc<NodeState> {
    const GENEROUS_RATE_LIMIT: u32 = 1000;
    test_state_with_rate(projected, storage, GENEROUS_RATE_LIMIT)
}
/// Builds a test `NodeState` around the given projected contexts and blob
/// storage, with the projection rate limiter constructed from `rate_limit`.
/// All other fields are fixed placeholder values.
fn test_state_with_rate(
projected: HashMap<[u8; 32], ProjectedContext>,
storage: InMemoryBlobStorage,
rate_limit: u32,
) -> Arc<NodeState> {
Arc::new(NodeState {
did: "did:dht:test".to_owned(),
relay_url: "wss://localhost/scp/v1".to_owned(),
broadcast_contexts: RwLock::new(HashMap::new()),
relay_addr: "127.0.0.1:9000".parse::<SocketAddr>().unwrap(),
bridge_secret: zeroize::Zeroizing::new([0u8; 32]),
dev_token: None,
dev_bind_addr: None,
// The two inputs the projection handlers actually read.
projected_contexts: RwLock::new(projected),
blob_storage: Arc::new(BlobStorageBackend::from(storage)),
relay_config: scp_transport::native::server::RelayConfig::default(),
start_time: Instant::now(),
http_bind_addr: SocketAddr::from(([0, 0, 0, 0], 8443)),
shutdown_token: tokio_util::sync::CancellationToken::new(),
cors_origins: None,
projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
rate_limit,
),
tls_config: None,
cert_resolver: None,
did_document: scp_identity::document::DidDocument {
context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
id: "did:dht:test".to_owned(),
verification_method: vec![],
authentication: vec![],
assertion_method: vec![],
also_known_as: vec![],
service: vec![],
},
connection_tracker: scp_transport::relay::rate_limit::new_connection_tracker(),
subscription_registry: scp_transport::relay::subscription::new_registry(),
acme_challenges: None,
bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
})
}
/// The feed endpoint answers 404 with code `NOT_FOUND` when no projected
/// context is registered for the routing id.
#[tokio::test]
async fn feed_unknown_routing_id_returns_404() {
let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
let router = broadcast_projection_router(state);
let routing_hex = hex_encode(&[0xAA; 32]);
let req = Request::builder()
.uri(format!("/scp/broadcast/{routing_hex}/feed"))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["code"], "NOT_FOUND");
}
/// The feed endpoint answers 400 when the routing id path segment is not
/// valid hex.
#[tokio::test]
async fn feed_invalid_hex_returns_400() {
let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
let router = broadcast_projection_router(state);
let req = Request::builder()
.uri("/scp/broadcast/not_hex/feed")
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::BAD_REQUEST);
}
/// For an open context without a policy, the feed returns the decrypted
/// message, a public `Cache-Control`, and an `ETag` containing the blob id.
#[tokio::test]
async fn feed_returns_decrypted_messages_with_cache_headers() {
let key = generate_broadcast_key("did:dht:alice");
let context_id = "test_ctx_001";
let projected =
ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
let routing_id = projected.routing_id;
let mut projected_map = HashMap::new();
projected_map.insert(routing_id, projected);
let storage = InMemoryBlobStorage::new();
let envelope = test_seal(&key, b"hello feed");
let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
// The test uses the SHA-256 of the serialized envelope as the blob id.
let blob_id = {
let mut h = Sha256::new();
h.update(&blob_bytes);
let r: [u8; 32] = h.finalize().into();
r
};
storage
.store(routing_id, blob_id, None, 3600, blob_bytes)
.await
.unwrap();
let state = test_state_with(projected_map, storage);
let router = broadcast_projection_router(state);
let routing_hex = hex_encode(&routing_id);
let req = Request::builder()
.uri(format!("/scp/broadcast/{routing_hex}/feed"))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::OK);
let cache_control = resp
.headers()
.get(header::CACHE_CONTROL)
.unwrap()
.to_str()
.unwrap();
assert_eq!(
cache_control,
"public, max-age=30, stale-while-revalidate=300"
);
let etag = resp.headers().get(header::ETAG).unwrap().to_str().unwrap();
assert!(etag.contains(&hex_encode(&blob_id)));
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["context_id"], "test_ctx_001");
assert_eq!(json["author_did"], "did:dht:alice");
let messages = json["messages"].as_array().unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0]["author_did"], "did:dht:alice");
assert_eq!(messages[0]["key_epoch"], 0);
let content_b64 = messages[0]["content"].as_str().unwrap();
let decoded = BASE64.decode(content_b64).unwrap();
assert_eq!(decoded, b"hello feed");
}
/// With five stored messages and `limit=2`, the feed returns exactly two.
#[tokio::test]
async fn feed_respects_limit_parameter() {
let key = generate_broadcast_key("did:dht:alice");
let context_id = "limit_ctx";
let projected =
ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
let routing_id = projected.routing_id;
let mut projected_map = HashMap::new();
projected_map.insert(routing_id, projected);
let storage = InMemoryBlobStorage::new();
// Each iteration seals a different payload, giving five distinct blobs.
for i in 0u8..5 {
let envelope = test_seal(&key, &[i; 10]);
let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
let blob_id = {
let mut h = Sha256::new();
h.update(&blob_bytes);
let r: [u8; 32] = h.finalize().into();
r
};
storage
.store(routing_id, blob_id, None, 3600, blob_bytes)
.await
.unwrap();
}
let state = test_state_with(projected_map, storage);
let router = broadcast_projection_router(state);
let routing_hex = hex_encode(&routing_id);
let req = Request::builder()
.uri(format!("/scp/broadcast/{routing_hex}/feed?limit=2"))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
let messages = json["messages"].as_array().unwrap();
assert_eq!(messages.len(), 2);
}
/// A `limit` above the maximum is accepted rather than rejected.
/// NOTE(review): storage is empty here, so this only shows that `limit=999`
/// yields 200 with no messages; it does not observe the clamp value itself.
#[tokio::test]
async fn feed_limit_clamped_to_100() {
let key = generate_broadcast_key("did:dht:alice");
let context_id = "clamp_ctx";
let projected =
ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
let routing_id = projected.routing_id;
let mut projected_map = HashMap::new();
projected_map.insert(routing_id, projected);
let storage = InMemoryBlobStorage::new();
let state = test_state_with(projected_map, storage);
let router = broadcast_projection_router(state);
let routing_hex = hex_encode(&routing_id);
let req = Request::builder()
.uri(format!("/scp/broadcast/{routing_hex}/feed?limit=999"))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert!(json["messages"].as_array().unwrap().is_empty());
}
/// A known context with no stored blobs yields 200, an empty `messages`
/// array, and an empty top-level `author_did`.
#[tokio::test]
async fn feed_empty_context_returns_empty_messages() {
let key = generate_broadcast_key("did:dht:alice");
let context_id = "empty_ctx";
let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Open, None);
let routing_id = projected.routing_id;
let mut projected_map = HashMap::new();
projected_map.insert(routing_id, projected);
let state = test_state_with(projected_map, InMemoryBlobStorage::new());
let router = broadcast_projection_router(state);
let routing_hex = hex_encode(&routing_id);
let req = Request::builder()
.uri(format!("/scp/broadcast/{routing_hex}/feed"))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["context_id"], "empty_ctx");
assert_eq!(json["author_did"], "");
assert!(json["messages"].as_array().unwrap().is_empty());
}
/// The single-message endpoint answers 404 with code `NOT_FOUND` when the
/// routing id is not a projected context.
#[tokio::test]
async fn message_unknown_routing_id_returns_404() {
let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
let router = broadcast_projection_router(state);
let routing_hex = hex_encode(&[0xAA; 32]);
let blob_hex = hex_encode(&[0xBB; 32]);
let req = Request::builder()
.uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["code"], "NOT_FOUND");
}
/// The single-message endpoint answers 404 with code `NOT_FOUND` when the
/// context exists but the blob id is not in storage.
#[tokio::test]
async fn message_unknown_blob_id_returns_404() {
let key = generate_broadcast_key("did:dht:alice");
let context_id = "msg_ctx_404";
let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Open, None);
let routing_id = projected.routing_id;
let mut projected_map = HashMap::new();
projected_map.insert(routing_id, projected);
let storage = InMemoryBlobStorage::new();
let state = test_state_with(projected_map, storage);
let router = broadcast_projection_router(state);
let routing_hex = hex_encode(&routing_id);
// Well-formed blob id that was never stored.
let blob_hex = hex_encode(&[0xCC; 32]); let req = Request::builder()
.uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["code"], "NOT_FOUND");
}
/// For an open context without a policy, the single-message endpoint returns
/// the decrypted message with a public immutable `Cache-Control` and an
/// `ETag` equal to the quoted blob id.
#[tokio::test]
async fn message_returns_decrypted_single_message() {
let key = generate_broadcast_key("did:dht:alice");
let context_id = "msg_ctx_ok";
let projected =
ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
let routing_id = projected.routing_id;
let mut projected_map = HashMap::new();
projected_map.insert(routing_id, projected);
let storage = InMemoryBlobStorage::new();
let envelope = test_seal(&key, b"single message");
let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
// The test uses the SHA-256 of the serialized envelope as the blob id.
let blob_id = {
let mut h = Sha256::new();
h.update(&blob_bytes);
let r: [u8; 32] = h.finalize().into();
r
};
storage
.store(routing_id, blob_id, None, 3600, blob_bytes)
.await
.unwrap();
let state = test_state_with(projected_map, storage);
let router = broadcast_projection_router(state);
let routing_hex = hex_encode(&routing_id);
let blob_hex = hex_encode(&blob_id);
let req = Request::builder()
.uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), HttpStatus::OK);
let cache_control = resp
.headers()
.get(header::CACHE_CONTROL)
.unwrap()
.to_str()
.unwrap();
assert_eq!(cache_control, "public, immutable, max-age=31536000");
let etag = resp.headers().get(header::ETAG).unwrap().to_str().unwrap();
assert_eq!(etag, format!("\"{blob_hex}\""));
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["id"], blob_hex);
assert_eq!(json["author_did"], "did:dht:alice");
assert_eq!(json["key_epoch"], 0);
let content_b64 = json["content"].as_str().unwrap();
let decoded = BASE64.decode(content_b64).unwrap();
assert_eq!(decoded, b"single message");
}
/// A conditional GET whose `If-None-Match` equals the blob's quoted hex id
/// is expected to be answered with `304 Not Modified` and an empty body.
#[tokio::test]
async fn message_conditional_get_returns_304() {
    let author_key = generate_broadcast_key("did:dht:alice");
    let ctx = ProjectedContext::new(
        "msg_ctx_304",
        author_key.clone(),
        BroadcastAdmission::Open,
        None,
    );
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);

    let storage = InMemoryBlobStorage::new();
    let sealed = rmp_serde::to_vec(&test_seal(&author_key, b"cached msg")).unwrap();
    let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
    storage
        .store(routing_id, blob_id, None, 3600, sealed)
        .await
        .unwrap();

    let app = broadcast_projection_router(test_state_with(contexts, storage));
    let blob_hex = hex_encode(&blob_id);
    // The validator sent by the client is the blob id wrapped in double quotes.
    let matching_etag = format!("\"{blob_hex}\"");
    let request = Request::builder()
        .uri(format!(
            "/scp/broadcast/{}/messages/{blob_hex}",
            hex_encode(&routing_id)
        ))
        .header("If-None-Match", &matching_etag)
        .body(Body::empty())
        .unwrap();

    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::NOT_MODIFIED);
    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    assert!(bytes.is_empty());
}
/// A conditional GET whose `If-None-Match` names a different id is expected
/// to fall through to a normal `200 OK` response carrying the message.
#[tokio::test]
async fn message_conditional_get_non_matching_returns_200() {
    let author_key = generate_broadcast_key("did:dht:alice");
    let ctx = ProjectedContext::new(
        "msg_ctx_200",
        author_key.clone(),
        BroadcastAdmission::Open,
        None,
    );
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);

    let storage = InMemoryBlobStorage::new();
    let sealed = rmp_serde::to_vec(&test_seal(&author_key, b"fresh msg")).unwrap();
    let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
    storage
        .store(routing_id, blob_id, None, 3600, sealed)
        .await
        .unwrap();

    let app = broadcast_projection_router(test_state_with(contexts, storage));
    let blob_hex = hex_encode(&blob_id);
    // A quoted validator that does not correspond to the stored blob.
    let stale_etag = format!("\"{}\"", hex_encode(&[0xFF; 32]));
    let request = Request::builder()
        .uri(format!(
            "/scp/broadcast/{}/messages/{blob_hex}",
            hex_encode(&routing_id)
        ))
        .header("If-None-Match", &stale_etag)
        .body(Body::empty())
        .unwrap();

    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::OK);
    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(payload["id"], blob_hex);
}
/// Path segments that are not valid hex are expected to be rejected with
/// `400 Bad Request`.
#[tokio::test]
async fn message_invalid_hex_returns_400() {
    let app = broadcast_projection_router(test_state_with(
        HashMap::new(),
        InMemoryBlobStorage::new(),
    ));
    let request = Request::builder()
        .uri("/scp/broadcast/not_valid_hex/messages/also_not_hex")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::BAD_REQUEST);
}
/// The routing id is the SHA-256 digest of the ASCII-lowercased context id.
///
/// The lowercase input pins the plain digest; the mixed-case input pins the
/// case normalisation that `compute_routing_id` applies before hashing.
#[test]
fn compute_routing_id_is_sha256_of_context_id_bytes() {
    let context_id = "abc123deadbeef";
    let routing_id = compute_routing_id(context_id);
    let mut hasher = Sha256::new();
    hasher.update(context_id.as_bytes());
    let expected: [u8; 32] = hasher.finalize().into();
    assert_eq!(routing_id, expected);
    // Differently-cased spellings of the same id must map to the same routing id.
    assert_eq!(
        compute_routing_id("ABC123DeadBeef"),
        expected,
        "context id must be ASCII-lowercased before hashing"
    );
}
/// Hashing the same context id twice yields the same routing id.
#[test]
fn compute_routing_id_deterministic() {
    let context_id = "deadbeefcafe0123456789abcdef";
    let first = compute_routing_id(context_id);
    let second = compute_routing_id(context_id);
    assert_eq!(first, second);
}
/// Two different context ids produce different routing ids.
#[test]
fn compute_routing_id_distinct_for_different_inputs() {
    assert_ne!(
        compute_routing_id("context_a"),
        compute_routing_id("context_b")
    );
}
/// `ProjectedContext::new` derives the routing id from the context id and
/// keeps the context id itself.
#[test]
fn projected_context_new_sets_routing_id() {
    let ctx = ProjectedContext::new(
        "abc123",
        generate_broadcast_key("did:dht:alice"),
        BroadcastAdmission::Open,
        None,
    );
    assert_eq!(ctx.routing_id, compute_routing_id("abc123"));
    assert_eq!(ctx.context_id(), "abc123");
}
/// A freshly constructed context holds exactly one key, retrievable at
/// epoch 0 for a newly generated broadcast key.
#[test]
fn projected_context_new_inserts_key_at_epoch() {
    let ctx = ProjectedContext::new(
        "abc123",
        generate_broadcast_key("did:dht:alice"),
        BroadcastAdmission::Open,
        None,
    );
    let epoch_zero = ctx.key_for_epoch(0);
    assert!(epoch_zero.is_some());
    assert_eq!(ctx.keys().len(), 1);
}
/// Inserting a projected context into a registry map and removing it again
/// leaves no entry behind for its routing id.
#[test]
fn enable_then_disable_roundtrip() {
    let context_id = "test_context_001";
    let routing_id = compute_routing_id(context_id);
    let mut registry: HashMap<[u8; 32], ProjectedContext> = HashMap::new();

    // "Enable": register the context under its routing id.
    registry.insert(
        routing_id,
        ProjectedContext::new(
            context_id,
            generate_broadcast_key("did:dht:alice"),
            BroadcastAdmission::Open,
            None,
        ),
    );
    assert!(registry.contains_key(&routing_id));

    // "Disable": drop the registration.
    registry.remove(&routing_id);
    assert!(!registry.contains_key(&routing_id));
}
/// Keys for successive epochs accumulate in the context and each stays
/// retrievable; an epoch that was never inserted is absent.
#[test]
fn multiple_epochs_stored_and_retrievable() {
    let mut ctx = ProjectedContext::new(
        "multi_epoch_ctx",
        generate_broadcast_key("did:dht:alice"),
        BroadcastAdmission::Open,
        None,
    );

    // Epoch 0 -> 1, rotating from the key held by the context.
    let stored_epoch0 = ctx.key_for_epoch(0).expect("epoch 0 should exist");
    let (epoch1_key, _advance) =
        rotate_broadcast_key(stored_epoch0, 1000).expect("rotate should succeed");
    assert_eq!(epoch1_key.epoch(), 1);
    ctx.insert_key(epoch1_key);

    // Epoch 1 -> 2, again rotating from the stored key.
    let stored_epoch1 = ctx.key_for_epoch(1).expect("epoch 1 should exist");
    let (epoch2_key, _advance) =
        rotate_broadcast_key(stored_epoch1, 2000).expect("rotate should succeed");
    assert_eq!(epoch2_key.epoch(), 2);
    ctx.insert_key(epoch2_key);

    assert!(ctx.key_for_epoch(0).is_some(), "epoch 0 retained");
    assert!(ctx.key_for_epoch(1).is_some(), "epoch 1 retained");
    assert!(ctx.key_for_epoch(2).is_some(), "epoch 2 retained");
    assert_eq!(ctx.keys().len(), 3);
    assert!(ctx.key_for_epoch(99).is_none());
}
/// Inserting a second key with the same epoch replaces the entry instead of
/// adding another one.
#[test]
fn insert_key_replaces_existing_epoch() {
    let mut ctx = ProjectedContext::new(
        "replace_test",
        generate_broadcast_key("did:dht:alice"),
        BroadcastAdmission::Open,
        None,
    );
    // A second freshly generated key shares the first key's epoch.
    ctx.insert_key(generate_broadcast_key("did:dht:alice"));
    assert_eq!(ctx.keys().len(), 1);
    assert!(ctx.key_for_epoch(0).is_some());
}
/// `retain_only_epochs` keeps only the listed epochs and drops every other
/// key from the context.
#[test]
fn retain_only_epochs_keeps_specified_purges_rest() {
    let mut ctx = ProjectedContext::new(
        "purge_test",
        generate_broadcast_key("did:dht:alice"),
        BroadcastAdmission::Open,
        None,
    );

    // Populate epochs 0, 1 and 2.
    let epoch0 = ctx.key_for_epoch(0).unwrap().clone();
    let (epoch1, _) = rotate_broadcast_key(&epoch0, 1000).unwrap();
    ctx.insert_key(epoch1.clone());
    let (epoch2, _) = rotate_broadcast_key(&epoch1, 2000).unwrap();
    ctx.insert_key(epoch2);
    assert_eq!(ctx.keys().len(), 3);

    // Keep only the newest epoch.
    let keep = std::collections::HashSet::from([2]);
    ctx.retain_only_epochs(&keep);

    assert!(ctx.key_for_epoch(0).is_none(), "epoch 0 purged");
    assert!(ctx.key_for_epoch(1).is_none(), "epoch 1 purged");
    assert!(ctx.key_for_epoch(2).is_some(), "epoch 2 retained");
    assert_eq!(ctx.keys().len(), 1);
}
/// With keys from two authors stored at different epochs, retaining epochs
/// 3 and 5 leaves exactly those two and purges the other four.
#[test]
fn retain_only_epochs_handles_divergent_authors() {
    let alice_epoch0 = generate_broadcast_key("did:dht:alice");
    let mut ctx = ProjectedContext::new(
        "divergent_test",
        alice_epoch0.clone(),
        BroadcastAdmission::Open,
        None,
    );

    // Alice contributes epochs 0 through 3.
    let (alice_epoch1, _) = rotate_broadcast_key(&alice_epoch0, 1000).unwrap();
    ctx.insert_key(alice_epoch1.clone());
    let (alice_epoch2, _) = rotate_broadcast_key(&alice_epoch1, 2000).unwrap();
    ctx.insert_key(alice_epoch2.clone());
    let (alice_epoch3, _) = rotate_broadcast_key(&alice_epoch2, 3000).unwrap();
    ctx.insert_key(alice_epoch3);

    // Bob's chain is rotated off-context; only his epochs 4 and 5 are stored.
    let bob_epoch0 = generate_broadcast_key("did:dht:bob");
    let (bob_epoch1, _) = rotate_broadcast_key(&bob_epoch0, 1000).unwrap();
    let (bob_epoch2, _) = rotate_broadcast_key(&bob_epoch1, 2000).unwrap();
    let (bob_epoch3, _) = rotate_broadcast_key(&bob_epoch2, 3000).unwrap();
    let (bob_epoch4, _) = rotate_broadcast_key(&bob_epoch3, 4000).unwrap();
    ctx.insert_key(bob_epoch4.clone());
    let (bob_epoch5, _) = rotate_broadcast_key(&bob_epoch4, 5000).unwrap();
    ctx.insert_key(bob_epoch5);
    assert_eq!(ctx.keys().len(), 6);

    let keep = std::collections::HashSet::from([3, 5]);
    ctx.retain_only_epochs(&keep);

    assert_eq!(ctx.keys().len(), 2);
    assert!(ctx.key_for_epoch(3).is_some(), "alice new epoch retained");
    assert!(ctx.key_for_epoch(5).is_some(), "bob new epoch retained");
    assert!(ctx.key_for_epoch(0).is_none(), "old alice epoch purged");
    assert!(ctx.key_for_epoch(1).is_none(), "old alice epoch purged");
    assert!(ctx.key_for_epoch(2).is_none(), "old alice epoch purged");
    assert!(ctx.key_for_epoch(4).is_none(), "old bob epoch purged");
}
/// A blob stored under context A must not be reachable through context B's
/// routing id: the test expects the same `unknown blob_id` 404 as for a
/// missing blob, while the owning context still serves it with 200.
#[tokio::test]
#[allow(clippy::similar_names)]
async fn message_cross_context_routing_id_mismatch_returns_404() {
    let alice_key = generate_broadcast_key("did:dht:alice");
    let owner_ctx = ProjectedContext::new(
        "context_a",
        alice_key.clone(),
        BroadcastAdmission::Open,
        None,
    );
    let other_ctx = ProjectedContext::new(
        "context_b",
        generate_broadcast_key("did:dht:bob"),
        BroadcastAdmission::Open,
        None,
    );
    let owner_routing = owner_ctx.routing_id;
    let other_routing = other_ctx.routing_id;
    let contexts = HashMap::from([(owner_routing, owner_ctx), (other_routing, other_ctx)]);

    // The blob is stored under context A only.
    let storage = InMemoryBlobStorage::new();
    let sealed = rmp_serde::to_vec(&test_seal(&alice_key, b"belongs to context_a")).unwrap();
    let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
    storage
        .store(owner_routing, blob_id, None, 3600, sealed)
        .await
        .unwrap();

    let state = test_state_with(contexts, storage);
    let blob_hex = hex_encode(&blob_id);

    // Asking context B for A's blob.
    let other_hex = hex_encode(&other_routing);
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{other_hex}/messages/{blob_hex}"))
        .body(Body::empty())
        .unwrap();
    let response = broadcast_projection_router(Arc::clone(&state))
        .oneshot(request)
        .await
        .unwrap();
    assert_eq!(
        response.status(),
        HttpStatus::NOT_FOUND,
        "cross-context request must return 404, not leak blob existence"
    );
    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(
        payload["error"], "unknown blob_id",
        "error message must be indistinguishable from genuinely missing blob"
    );

    // Asking the owning context A.
    let owner_hex = hex_encode(&owner_routing);
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{owner_hex}/messages/{blob_hex}"))
        .body(Body::empty())
        .unwrap();
    let response = broadcast_projection_router(state)
        .oneshot(request)
        .await
        .unwrap();
    assert_eq!(
        response.status(),
        HttpStatus::OK,
        "correct context should return 200"
    );
}
/// Exercises the feed's `since` query parameter with a known blob id, an
/// unknown blob id and a malformed value.
///
/// For the known id only the status code is checked; the returned messages
/// are not inspected here.
#[tokio::test]
async fn feed_since_parameter_filters_messages() {
    let author_key = generate_broadcast_key("did:dht:alice");
    let ctx = ProjectedContext::new(
        "since_ctx",
        author_key.clone(),
        BroadcastAdmission::Open,
        None,
    );
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);

    // Store three messages with distinct 16-byte payloads.
    let storage = InMemoryBlobStorage::new();
    let mut stored_ids = Vec::new();
    for fill in 0u8..3 {
        let sealed = rmp_serde::to_vec(&test_seal(&author_key, &[fill; 16])).unwrap();
        let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
        storage
            .store(routing_id, blob_id, None, 3600, sealed)
            .await
            .unwrap();
        stored_ids.push(blob_id);
    }

    let state = test_state_with(contexts, storage);
    let routing_hex = hex_encode(&routing_id);

    // 1. `since` names a stored blob.
    let known = hex_encode(&stored_ids[0]);
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{routing_hex}/feed?since={known}"))
        .body(Body::empty())
        .unwrap();
    let response = broadcast_projection_router(Arc::clone(&state))
        .oneshot(request)
        .await
        .unwrap();
    assert_eq!(response.status(), HttpStatus::OK);

    // 2. `since` is well-formed hex but names no stored blob.
    let unknown = hex_encode(&[0xFF; 32]);
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{routing_hex}/feed?since={unknown}"))
        .body(Body::empty())
        .unwrap();
    let response = broadcast_projection_router(Arc::clone(&state))
        .oneshot(request)
        .await
        .unwrap();
    assert_eq!(response.status(), HttpStatus::OK);
    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    let feed = payload["messages"].as_array().unwrap();
    assert_eq!(
        feed.len(),
        0,
        "nonexistent since blob_id should return empty feed (cross-context oracle prevention)"
    );

    // 3. `since` is not hex at all.
    let request = Request::builder()
        .uri(format!(
            "/scp/broadcast/{routing_hex}/feed?since=not_valid_hex"
        ))
        .body(Body::empty())
        .unwrap();
    let response = broadcast_projection_router(state)
        .oneshot(request)
        .await
        .unwrap();
    assert_eq!(response.status(), HttpStatus::BAD_REQUEST);
    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(payload["code"], "BAD_REQUEST");
}
/// When the context holds keys for epochs 0 and 1, the feed is expected to
/// return messages sealed under either epoch, decrypted.
#[tokio::test]
async fn feed_multi_epoch_decryption() {
    let epoch0_key = generate_broadcast_key("did:dht:alice");
    let (epoch1_key, _advance) = rotate_broadcast_key(&epoch0_key, 1000).unwrap();

    let mut ctx = ProjectedContext::new(
        "multi_epoch_feed_ctx",
        epoch0_key.clone(),
        BroadcastAdmission::Open,
        None,
    );
    ctx.insert_key(epoch1_key.clone());
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);

    let storage = InMemoryBlobStorage::new();

    // One message sealed under the epoch-0 key ...
    let sealed_epoch0 = rmp_serde::to_vec(&test_seal(&epoch0_key, b"epoch zero message")).unwrap();
    let id_epoch0: [u8; 32] = Sha256::digest(&sealed_epoch0).into();
    storage
        .store(routing_id, id_epoch0, None, 3600, sealed_epoch0)
        .await
        .unwrap();

    // ... and one under the epoch-1 key.
    let sealed_epoch1 = rmp_serde::to_vec(&test_seal(&epoch1_key, b"epoch one message")).unwrap();
    let id_epoch1: [u8; 32] = Sha256::digest(&sealed_epoch1).into();
    storage
        .store(routing_id, id_epoch1, None, 3600, sealed_epoch1)
        .await
        .unwrap();

    let app = broadcast_projection_router(test_state_with(contexts, storage));
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{}/feed", hex_encode(&routing_id)))
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::OK);

    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    let feed = payload["messages"].as_array().unwrap();
    assert_eq!(
        feed.len(),
        2,
        "both epoch-0 and epoch-1 messages returned"
    );

    // Order is not asserted; only membership of plaintexts and epochs.
    let mut plaintexts: Vec<String> = Vec::new();
    let mut seen_epochs: Vec<u64> = Vec::new();
    for entry in feed {
        let encoded = entry["content"].as_str().unwrap();
        plaintexts.push(String::from_utf8(BASE64.decode(encoded).unwrap()).unwrap());
    }
    for entry in feed {
        seen_epochs.push(entry["key_epoch"].as_u64().unwrap());
    }
    assert!(plaintexts.iter().any(|text| text == "epoch zero message"));
    assert!(plaintexts.iter().any(|text| text == "epoch one message"));
    assert!(seen_epochs.contains(&0));
    assert!(seen_epochs.contains(&1));
}
/// A stored envelope whose ciphertext was modified is expected to yield
/// `500` with `"decryption failure"` on the single-message route, while the
/// feed route skips it and still returns the untouched message.
#[tokio::test]
async fn message_tampered_ciphertext_returns_500() {
    let key = generate_broadcast_key("did:dht:alice");
    let context_id = "tamper_ctx";
    let projected =
        ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
    let routing_id = projected.routing_id;
    let mut projected_map = HashMap::new();
    projected_map.insert(routing_id, projected);
    let storage = InMemoryBlobStorage::new();

    // Corrupt one byte of the sealed content. The precondition is asserted so
    // that a too-short envelope fails here with a clear message, rather than
    // silently skipping the tampering and failing on the status check below.
    let mut envelope = test_seal(&key, b"tamper target");
    assert!(
        envelope.encrypted_content.len() > 13,
        "sealed envelope is too short to tamper with at offset 13"
    );
    envelope.encrypted_content[13] ^= 0xFF;
    let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
    let blob_id = {
        let mut h = Sha256::new();
        h.update(&blob_bytes);
        let r: [u8; 32] = h.finalize().into();
        r
    };
    storage
        .store(routing_id, blob_id, None, 3600, blob_bytes)
        .await
        .unwrap();

    // A second, untouched message stored under the same context.
    let valid_envelope = test_seal(&key, b"valid message");
    let valid_blob_bytes = rmp_serde::to_vec(&valid_envelope).unwrap();
    let valid_blob_id = {
        let mut h = Sha256::new();
        h.update(&valid_blob_bytes);
        let r: [u8; 32] = h.finalize().into();
        r
    };
    storage
        .store(routing_id, valid_blob_id, None, 3600, valid_blob_bytes)
        .await
        .unwrap();

    let state = test_state_with(projected_map, storage);

    // Single-message route: the tampered blob must surface as a 500.
    let router = broadcast_projection_router(Arc::clone(&state));
    let routing_hex = hex_encode(&routing_id);
    let blob_hex = hex_encode(&blob_id);
    let req = Request::builder()
        .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
        .body(Body::empty())
        .unwrap();
    let resp = router.oneshot(req).await.unwrap();
    assert_eq!(resp.status(), HttpStatus::INTERNAL_SERVER_ERROR);
    let body = resp.into_body().collect().await.unwrap().to_bytes();
    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
    assert_eq!(json["error"], "decryption failure");

    // Feed route: the tampered blob is skipped, the valid one is returned.
    let router = broadcast_projection_router(state);
    let req = Request::builder()
        .uri(format!("/scp/broadcast/{routing_hex}/feed"))
        .body(Body::empty())
        .unwrap();
    let resp = router.oneshot(req).await.unwrap();
    assert_eq!(resp.status(), HttpStatus::OK);
    let body = resp.into_body().collect().await.unwrap().to_bytes();
    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
    let messages = json["messages"].as_array().unwrap();
    assert_eq!(
        messages.len(),
        1,
        "tampered message should be skipped, valid message returned"
    );
    let content_b64 = messages[0]["content"].as_str().unwrap();
    let decoded = BASE64.decode(content_b64).unwrap();
    assert_eq!(decoded, b"valid message");
}
/// A message sealed under an epoch whose key has since been purged from the
/// context is expected to be answered with `410 Gone` / `"content revoked"`.
#[tokio::test]
async fn message_purged_epoch_returns_410_gone() {
    let epoch0_key = generate_broadcast_key("did:dht:alice");
    let mut ctx = ProjectedContext::new(
        "purge_410_ctx",
        epoch0_key.clone(),
        BroadcastAdmission::Open,
        None,
    );

    // Rotate to epoch 1, then drop everything except epoch 1.
    let (epoch1_key, _) = rotate_broadcast_key(&epoch0_key, 1000).unwrap();
    ctx.insert_key(epoch1_key);
    let keep = std::collections::HashSet::from([1]);
    ctx.retain_only_epochs(&keep);
    assert!(ctx.key_for_epoch(0).is_none());
    assert!(ctx.key_for_epoch(1).is_some());

    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);

    // The stored blob is sealed with the (now purged) epoch-0 key.
    let storage = InMemoryBlobStorage::new();
    let sealed = rmp_serde::to_vec(&test_seal(&epoch0_key, b"old content")).unwrap();
    let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
    storage
        .store(routing_id, blob_id, None, 3600, sealed)
        .await
        .unwrap();

    let app = broadcast_projection_router(test_state_with(contexts, storage));
    let request = Request::builder()
        .uri(format!(
            "/scp/broadcast/{}/messages/{}",
            hex_encode(&routing_id),
            hex_encode(&blob_id)
        ))
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::GONE);

    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(payload["error"], "content revoked");
    assert_eq!(payload["code"], "GONE");
}
/// A conditional GET through context B for a blob that belongs to context A
/// must be answered with 404, even though the supplied `If-None-Match`
/// equals the blob's quoted id.
#[tokio::test]
async fn message_conditional_get_cross_context_returns_404_not_304() {
    let alice_key = generate_broadcast_key("did:dht:alice");
    let owner_ctx = ProjectedContext::new(
        "ctx_a_304",
        alice_key.clone(),
        BroadcastAdmission::Open,
        None,
    );
    let other_ctx = ProjectedContext::new(
        "ctx_b_304",
        generate_broadcast_key("did:dht:bob"),
        BroadcastAdmission::Open,
        None,
    );
    let owner_routing = owner_ctx.routing_id;
    let other_routing = other_ctx.routing_id;
    let contexts = HashMap::from([(owner_routing, owner_ctx), (other_routing, other_ctx)]);

    // Stored under context A only.
    let storage = InMemoryBlobStorage::new();
    let sealed = rmp_serde::to_vec(&test_seal(&alice_key, b"secret of A")).unwrap();
    let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
    storage
        .store(owner_routing, blob_id, None, 3600, sealed)
        .await
        .unwrap();

    let app = broadcast_projection_router(test_state_with(contexts, storage));
    let other_hex = hex_encode(&other_routing);
    let blob_hex = hex_encode(&blob_id);
    let guessed_etag = format!("\"{blob_hex}\"");
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{other_hex}/messages/{blob_hex}"))
        .header("If-None-Match", &guessed_etag)
        .body(Body::empty())
        .unwrap();

    let response = app.oneshot(request).await.unwrap();
    assert_eq!(
        response.status(),
        HttpStatus::NOT_FOUND,
        "cross-context conditional GET must return 404, not 304"
    );
    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(payload["error"], "unknown blob_id");
}
/// With the test state built with a rate value of 2, the first two feed
/// requests must not be rate-limited and the third must get `429`.
#[tokio::test]
async fn rate_limit_returns_429_when_exceeded() {
    let state = test_state_with_rate(HashMap::new(), InMemoryBlobStorage::new(), 2);
    let uri = format!("/scp/broadcast/{}/feed", hex_encode(&[0xAA; 32]));

    // Requests within the allowance: any status except 429 is acceptable.
    for i in 0..2 {
        let request = Request::builder().uri(&uri).body(Body::empty()).unwrap();
        let response = broadcast_projection_router(Arc::clone(&state))
            .oneshot(request)
            .await
            .unwrap();
        assert_ne!(
            response.status(),
            HttpStatus::TOO_MANY_REQUESTS,
            "request {i} should not be rate-limited"
        );
    }

    // One request past the allowance.
    let request = Request::builder().uri(&uri).body(Body::empty()).unwrap();
    let response = broadcast_projection_router(Arc::clone(&state))
        .oneshot(request)
        .await
        .unwrap();
    assert_eq!(
        response.status(),
        HttpStatus::TOO_MANY_REQUESTS,
        "third request should be rate-limited"
    );
}
/// The limiter tracks each IP address separately: with a limit of 1, each
/// address passes once and is rejected on its second check.
#[tokio::test]
async fn rate_limit_allows_different_ips() {
    let limiter = scp_transport::relay::rate_limit::PublishRateLimiter::new(1);
    let ip_a = std::net::IpAddr::from([10, 0, 0, 1]);
    let ip_b = std::net::IpAddr::from([10, 0, 0, 2]);

    let first_a = limiter.check(ip_a).await;
    assert!(first_a, "ip_a first request should pass");
    let first_b = limiter.check(ip_b).await;
    assert!(first_b, "ip_b first request should pass");

    let second_a = limiter.check(ip_a).await;
    assert!(!second_a, "ip_a second request should be rate-limited");
    let second_b = limiter.check(ip_b).await;
    assert!(!second_b, "ip_b second request should be rate-limited");
}
use scp_core::context::params::{ProjectionOverride, ProjectionPolicy, ProjectionRule};
use scp_identity::DID;
/// The feed of an open context is expected to be served without any
/// `Authorization` header and with public, short-lived caching headers.
#[tokio::test]
async fn feed_open_context_serves_without_auth() {
    let author_key = generate_broadcast_key("did:dht:alice");
    let ctx = ProjectedContext::new(
        "open_no_auth_ctx",
        author_key.clone(),
        BroadcastAdmission::Open,
        None,
    );
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);

    let storage = InMemoryBlobStorage::new();
    let sealed = rmp_serde::to_vec(&test_seal(&author_key, b"public content")).unwrap();
    let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
    storage
        .store(routing_id, blob_id, None, 3600, sealed)
        .await
        .unwrap();

    let app = broadcast_projection_router(test_state_with(contexts, storage));
    // No Authorization header is attached.
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{}/feed", hex_encode(&routing_id)))
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::OK);

    let cache_control = response.headers().get(header::CACHE_CONTROL).unwrap();
    assert_eq!(
        cache_control.to_str().unwrap(),
        "public, max-age=30, stale-while-revalidate=300"
    );
}
/// The feed of a gated context is expected to answer `401 Unauthorized`
/// when the request carries no `Authorization` header.
#[tokio::test]
async fn feed_gated_context_rejects_without_auth() {
    let ctx = ProjectedContext::new(
        "gated_no_auth_ctx",
        generate_broadcast_key("did:dht:alice"),
        BroadcastAdmission::Gated,
        None,
    );
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);
    let app = broadcast_projection_router(test_state_with(contexts, InMemoryBlobStorage::new()));

    let request = Request::builder()
        .uri(format!("/scp/broadcast/{}/feed", hex_encode(&routing_id)))
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::UNAUTHORIZED);

    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(payload["error"], "Authorization required for gated broadcast");
    assert_eq!(payload["code"], "UNAUTHORIZED");
}
/// A gated feed is expected to answer `401 Unauthorized` both for a
/// non-Bearer `Authorization` scheme and for a Bearer token that is not a
/// valid JWT. Each request runs against its own freshly built state.
#[tokio::test]
async fn feed_gated_context_rejects_malformed_auth() {
    let context_id = "gated_bad_auth_ctx";

    // Case 1: wrong scheme (`Basic`).
    let ctx = ProjectedContext::new(
        context_id,
        generate_broadcast_key("did:dht:alice"),
        BroadcastAdmission::Gated,
        None,
    );
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);
    let app = broadcast_projection_router(test_state_with(contexts, InMemoryBlobStorage::new()));
    let routing_hex = hex_encode(&routing_id);
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{routing_hex}/feed"))
        .header("Authorization", "Basic dXNlcjpwYXNz")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::UNAUTHORIZED);

    // Case 2: Bearer scheme with a token that does not parse.
    let mut second_contexts = HashMap::new();
    let second_ctx = ProjectedContext::new(
        context_id,
        generate_broadcast_key("did:dht:alice"),
        BroadcastAdmission::Gated,
        None,
    );
    second_contexts.insert(second_ctx.routing_id, second_ctx);
    let app = broadcast_projection_router(test_state_with(
        second_contexts,
        InMemoryBlobStorage::new(),
    ));
    let request = Request::builder()
        .uri(format!("/scp/broadcast/{routing_hex}/feed"))
        .header("Authorization", "Bearer not-a-valid-jwt")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::UNAUTHORIZED);

    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(payload["code"], "UNAUTHORIZED");
}
/// The single-message route of a gated context is expected to answer
/// `401 Unauthorized` for an existing blob when no `Authorization` header
/// is sent.
#[tokio::test]
async fn message_gated_context_rejects_without_auth() {
    let author_key = generate_broadcast_key("did:dht:alice");
    let ctx = ProjectedContext::new(
        "gated_msg_no_auth",
        author_key.clone(),
        BroadcastAdmission::Gated,
        None,
    );
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);

    // The blob exists, so the rejection is not caused by a missing blob.
    let storage = InMemoryBlobStorage::new();
    let sealed = rmp_serde::to_vec(&test_seal(&author_key, b"secret content")).unwrap();
    let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
    storage
        .store(routing_id, blob_id, None, 3600, sealed)
        .await
        .unwrap();

    let app = broadcast_projection_router(test_state_with(contexts, storage));
    let request = Request::builder()
        .uri(format!(
            "/scp/broadcast/{}/messages/{}",
            hex_encode(&routing_id),
            hex_encode(&blob_id)
        ))
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), HttpStatus::UNAUTHORIZED);

    let bytes = response.into_body().collect().await.unwrap().to_bytes();
    let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(payload["error"], "Authorization required for gated broadcast");
    assert_eq!(payload["code"], "UNAUTHORIZED");
}
/// Authorized requests against a gated context are expected to succeed with
/// `private` cache directives on both the feed and single-message routes.
#[tokio::test]
async fn gated_context_returns_private_cache_headers() {
    let context_id = "gated_cache_ctx";
    let author_key = generate_broadcast_key("did:dht:alice");
    let ctx = ProjectedContext::new(
        context_id,
        author_key.clone(),
        BroadcastAdmission::Gated,
        None,
    );
    let routing_id = ctx.routing_id;
    let contexts = HashMap::from([(routing_id, ctx)]);

    let storage = InMemoryBlobStorage::new();
    let sealed = rmp_serde::to_vec(&test_seal(&author_key, b"gated content")).unwrap();
    let blob_id: [u8; 32] = Sha256::digest(&sealed).into();
    storage
        .store(routing_id, blob_id, None, 3600, sealed)
        .await
        .unwrap();

    let state = test_state_with(contexts, storage);
    // Token produced by the test helper for this context id.
    let ucan_token = build_test_ucan(context_id);
    let routing_hex = hex_encode(&routing_id);

    // Feed route.
    let feed_request = Request::builder()
        .uri(format!("/scp/broadcast/{routing_hex}/feed"))
        .header("Authorization", format!("Bearer {ucan_token}"))
        .body(Body::empty())
        .unwrap();
    let feed_response = broadcast_projection_router(Arc::clone(&state))
        .oneshot(feed_request)
        .await
        .unwrap();
    assert_eq!(feed_response.status(), HttpStatus::OK);
    let feed_cache = feed_response
        .headers()
        .get(header::CACHE_CONTROL)
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(feed_cache, "private, max-age=30");

    // Single-message route.
    let blob_hex = hex_encode(&blob_id);
    let message_request = Request::builder()
        .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
        .header("Authorization", format!("Bearer {ucan_token}"))
        .body(Body::empty())
        .unwrap();
    let message_response = broadcast_projection_router(state)
        .oneshot(message_request)
        .await
        .unwrap();
    assert_eq!(message_response.status(), HttpStatus::OK);
    let message_cache = message_response
        .headers()
        .get(header::CACHE_CONTROL)
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(message_cache, "private, immutable, max-age=31536000");
}
/// Open admission with no policy and no author resolves to `Public`.
#[test]
fn effective_rule_open_no_policy() {
    assert_eq!(
        effective_projection_rule(BroadcastAdmission::Open, None, None),
        ProjectionRule::Public
    );
}
/// Gated admission with no policy and no author resolves to `Gated`.
#[test]
fn effective_rule_gated_no_policy() {
    assert_eq!(
        effective_projection_rule(BroadcastAdmission::Gated, None, None),
        ProjectionRule::Gated
    );
}
/// A policy's default rule takes precedence over what open admission alone
/// would give.
#[test]
fn effective_rule_with_policy_default() {
    let gated_by_default = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: Vec::new(),
    };
    let resolved =
        effective_projection_rule(BroadcastAdmission::Open, Some(&gated_by_default), None);
    assert_eq!(resolved, ProjectionRule::Gated);
}
/// A per-author override applies only to the matching DID; any other author
/// gets the policy's default rule.
#[test]
fn effective_rule_per_author_override() {
    let policy = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: vec![ProjectionOverride {
            did: DID::from("did:dht:special_author"),
            rule: ProjectionRule::Public,
        }],
    };

    // (author DID, rule expected for that author)
    let cases = [
        ("did:dht:other", ProjectionRule::Gated),
        ("did:dht:special_author", ProjectionRule::Public),
    ];
    for (author, expected) in cases {
        let resolved =
            effective_projection_rule(BroadcastAdmission::Open, Some(&policy), Some(author));
        assert_eq!(resolved, expected);
    }
}
/// When no author is supplied, overrides are not consulted and the policy's
/// default rule is returned.
#[test]
fn effective_rule_no_author_uses_default() {
    let public_override = ProjectionOverride {
        did: DID::from("did:dht:someone"),
        rule: ProjectionRule::Public,
    };
    let policy = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: vec![public_override],
    };
    assert_eq!(
        effective_projection_rule(BroadcastAdmission::Open, Some(&policy), None),
        ProjectionRule::Gated
    );
}
/// A gated context must reject a policy whose default rule is `Public`.
#[test]
fn validate_policy_rejects_public_default_on_gated() {
    let public_by_default = ProjectionPolicy {
        default_rule: ProjectionRule::Public,
        overrides: Vec::new(),
    };
    let outcome = validate_projection_policy(BroadcastAdmission::Gated, Some(&public_by_default));
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert_eq!(message, "gated context cannot have public projection rule");
}
/// A gated context must reject a policy that contains a `Public` per-author
/// override, even when the default rule is `Gated`.
#[test]
fn validate_policy_rejects_public_override_on_gated() {
    let public_override = ProjectionOverride {
        did: DID::from("did:dht:some_author"),
        rule: ProjectionRule::Public,
    };
    let policy = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: vec![public_override],
    };
    let outcome = validate_projection_policy(BroadcastAdmission::Gated, Some(&policy));
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert_eq!(
        message,
        "gated context cannot have public per-author projection override"
    );
}
/// A `Gated` default rule with no overrides is a valid policy for a `Gated`
/// context.
#[test]
fn validate_policy_accepts_gated_default_on_gated() {
    let gated_default = ProjectionPolicy {
        overrides: Vec::new(),
        default_rule: ProjectionRule::Gated,
    };
    let verdict = validate_projection_policy(BroadcastAdmission::Gated, Some(&gated_default));
    assert!(verdict.is_ok());
}
/// A `Public` default rule with no overrides is a valid policy for an `Open`
/// context.
#[test]
fn validate_policy_accepts_public_on_open() {
    let public_default = ProjectionPolicy {
        overrides: Vec::new(),
        default_rule: ProjectionRule::Public,
    };
    let verdict = validate_projection_policy(BroadcastAdmission::Open, Some(&public_default));
    assert!(verdict.is_ok());
}
/// Having no projection policy at all is valid for a `Gated` context.
#[test]
fn validate_policy_accepts_none_on_gated() {
    let verdict = validate_projection_policy(BroadcastAdmission::Gated, None);
    assert!(verdict.is_ok());
}
/// Feed endpoint on an `Open` context whose policy gates one author (alice).
///
/// Two envelopes are stored, one sealed with alice's key and one with bob's.
/// The test then checks that:
/// - an anonymous request gets `200`, `Cache-Control: private, max-age=30`,
///   and exactly one message, authored by bob;
/// - a request carrying a bearer UCAN for the context gets `200` and both
///   messages.
#[tokio::test]
async fn feed_filters_per_author_gated_messages_without_auth() {
    use scp_core::crypto::sender_keys::broadcast::rotate_broadcast_key;

    // Blob ids used for storage are the SHA-256 digest of the serialized envelope.
    let digest_of = |bytes: &[u8]| -> [u8; 32] {
        let mut hasher = Sha256::new();
        hasher.update(bytes);
        hasher.finalize().into()
    };

    let context_id = "feed_per_author_filter_ctx";
    let alice_key = generate_broadcast_key("did:dht:alice");
    // Bob's key is rotated once before use; presumably this puts it at a
    // different epoch than alice's key so both fit in the epoch-keyed map.
    let bob_initial_key = generate_broadcast_key("did:dht:bob");
    let (bob_key, _) = rotate_broadcast_key(&bob_initial_key, 1_000).unwrap();

    // Public by default, but alice's messages are gated.
    let alice_gated = ProjectionPolicy {
        overrides: vec![ProjectionOverride {
            rule: ProjectionRule::Gated,
            did: DID::from("did:dht:alice"),
        }],
        default_rule: ProjectionRule::Public,
    };
    let mut projected = ProjectedContext::new(
        context_id,
        alice_key.clone(),
        BroadcastAdmission::Open,
        Some(alice_gated),
    );
    projected.keys.insert(bob_key.epoch(), bob_key.clone());
    let routing_id = projected.routing_id;
    let contexts = HashMap::from([(routing_id, projected)]);

    // Store alice's envelope first, then bob's.
    let storage = InMemoryBlobStorage::new();
    let alice_bytes = rmp_serde::to_vec(&test_seal(&alice_key, b"alice secret")).unwrap();
    let alice_blob_id = digest_of(&alice_bytes);
    storage
        .store(routing_id, alice_blob_id, None, 3600, alice_bytes)
        .await
        .unwrap();
    let bob_bytes = rmp_serde::to_vec(&test_seal(&bob_key, b"bob public")).unwrap();
    let bob_blob_id = digest_of(&bob_bytes);
    storage
        .store(routing_id, bob_blob_id, None, 3600, bob_bytes)
        .await
        .unwrap();

    let state = test_state_with(contexts, storage);
    let routing_hex = hex_encode(&routing_id);
    let feed_uri = format!("/scp/broadcast/{routing_hex}/feed");

    // Anonymous request: alice's gated message must be filtered out.
    let anonymous_req = Request::builder()
        .uri(feed_uri.as_str())
        .body(Body::empty())
        .unwrap();
    let anonymous_resp = broadcast_projection_router(Arc::clone(&state))
        .oneshot(anonymous_req)
        .await
        .unwrap();
    assert_eq!(anonymous_resp.status(), HttpStatus::OK);
    let cache_control = anonymous_resp
        .headers()
        .get(header::CACHE_CONTROL)
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(cache_control, "private, max-age=30");
    let anonymous_body = anonymous_resp
        .into_body()
        .collect()
        .await
        .unwrap()
        .to_bytes();
    let anonymous_json: serde_json::Value = serde_json::from_slice(&anonymous_body).unwrap();
    let visible = anonymous_json["messages"].as_array().unwrap();
    assert_eq!(
        visible.len(),
        1,
        "feed should filter alice's gated message"
    );
    assert_eq!(visible[0]["author_did"], "did:dht:bob");

    // Authenticated request: the bearer UCAN unlocks alice's message as well.
    let ucan_token = build_test_ucan(context_id);
    let authed_req = Request::builder()
        .uri(feed_uri.as_str())
        .header("Authorization", format!("Bearer {ucan_token}"))
        .body(Body::empty())
        .unwrap();
    let authed_resp = broadcast_projection_router(state)
        .oneshot(authed_req)
        .await
        .unwrap();
    assert_eq!(authed_resp.status(), HttpStatus::OK);
    let authed_body = authed_resp.into_body().collect().await.unwrap().to_bytes();
    let authed_json: serde_json::Value = serde_json::from_slice(&authed_body).unwrap();
    let everything = authed_json["messages"].as_array().unwrap();
    assert_eq!(
        everything.len(),
        2,
        "authenticated feed should include both messages"
    );
}
/// `validate_projection_ucan` must return an error for a token whose `exp`
/// is `1`, even though the attenuation names the requested context.
#[test]
fn validate_ucan_rejects_expired_token() {
    use base64::engine::general_purpose::URL_SAFE_NO_PAD;
    use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};

    let header_json = serde_json::to_vec(&UcanHeader {
        alg: String::from("EdDSA"),
        typ: String::from("JWT"),
        ucv: String::from("0.10.0"),
        kid: None,
    })
    .unwrap();

    let read_capability = Attenuation {
        with: String::from("scp:ctx:test_ctx/messages:read"),
        can: String::from("read"),
    };
    let payload_json = serde_json::to_vec(&UcanPayload {
        iss: String::from("did:dht:test"),
        aud: String::from("did:dht:test"),
        // Presumably a Unix timestamp; 1 is far in the past, so the token is expired.
        exp: 1,
        nbf: None,
        nnc: String::from("nonce"),
        att: vec![read_capability],
        prf: Vec::new(),
        fct: None,
    })
    .unwrap();

    // header.payload.signature, each URL-safe base64 without padding; the
    // signature is a 64-byte all-zero placeholder.
    let token = [
        URL_SAFE_NO_PAD.encode(&header_json),
        URL_SAFE_NO_PAD.encode(&payload_json),
        URL_SAFE_NO_PAD.encode([0u8; 64]),
    ]
    .join(".");

    let verdict = validate_projection_ucan(&token, "test_ctx");
    assert!(verdict.is_err(), "expired UCAN should be rejected");
}
/// `validate_projection_ucan` must return an error for a token whose `nbf`
/// is `u64::MAX - 1`, even though `exp` is `u64::MAX` and the attenuation
/// names the requested context.
#[test]
fn validate_ucan_rejects_not_yet_valid_token() {
    use base64::engine::general_purpose::URL_SAFE_NO_PAD;
    use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};

    let header_json = serde_json::to_vec(&UcanHeader {
        alg: String::from("EdDSA"),
        typ: String::from("JWT"),
        ucv: String::from("0.10.0"),
        kid: None,
    })
    .unwrap();

    let read_capability = Attenuation {
        with: String::from("scp:ctx:test_ctx/messages:read"),
        can: String::from("read"),
    };
    let payload_json = serde_json::to_vec(&UcanPayload {
        iss: String::from("did:dht:test"),
        aud: String::from("did:dht:test"),
        exp: u64::MAX,
        // Not-before is set just below the maximum, i.e. effectively never reached.
        nbf: Some(u64::MAX - 1),
        nnc: String::from("nonce"),
        att: vec![read_capability],
        prf: Vec::new(),
        fct: None,
    })
    .unwrap();

    // header.payload.signature, each URL-safe base64 without padding; the
    // signature is a 64-byte all-zero placeholder.
    let token = [
        URL_SAFE_NO_PAD.encode(&header_json),
        URL_SAFE_NO_PAD.encode(&payload_json),
        URL_SAFE_NO_PAD.encode([0u8; 64]),
    ]
    .join(".");

    let verdict = validate_projection_ucan(&token, "test_ctx");
    assert!(verdict.is_err(), "not-yet-valid UCAN should be rejected");
}
/// Single-message endpoint on an `Open` context whose policy gates one
/// author (alice).
///
/// Two envelopes are stored, one sealed with alice's key and one with bob's.
/// The test then checks that:
/// - fetching alice's blob anonymously yields `401 Unauthorized`;
/// - fetching bob's blob anonymously yields `200 OK`;
/// - fetching alice's blob with a bearer UCAN for the context yields
///   `200 OK` with `Cache-Control: private, immutable, max-age=31536000`.
#[tokio::test]
async fn message_handler_enforces_per_author_gated_override_on_open_context() {
    use scp_core::crypto::sender_keys::broadcast::rotate_broadcast_key;

    // Blob ids used for storage are the SHA-256 digest of the serialized envelope.
    let digest_of = |bytes: &[u8]| -> [u8; 32] {
        let mut hasher = Sha256::new();
        hasher.update(bytes);
        hasher.finalize().into()
    };

    let context_id = "open_per_author_override_ctx";
    let alice_key = generate_broadcast_key("did:dht:alice");
    // Bob's key is rotated once before use; presumably this puts it at a
    // different epoch than alice's key so both fit in the epoch-keyed map.
    let bob_initial_key = generate_broadcast_key("did:dht:bob");
    let (bob_key, _) = rotate_broadcast_key(&bob_initial_key, 1_000).unwrap();

    // Public by default, but alice's messages are gated.
    let alice_gated = ProjectionPolicy {
        overrides: vec![ProjectionOverride {
            rule: ProjectionRule::Gated,
            did: DID::from("did:dht:alice"),
        }],
        default_rule: ProjectionRule::Public,
    };
    let mut projected = ProjectedContext::new(
        context_id,
        alice_key.clone(),
        BroadcastAdmission::Open,
        Some(alice_gated),
    );
    projected.keys.insert(bob_key.epoch(), bob_key.clone());
    let routing_id = projected.routing_id;
    let contexts = HashMap::from([(routing_id, projected)]);

    // Store alice's envelope first, then bob's.
    let storage = InMemoryBlobStorage::new();
    let alice_blob_bytes =
        rmp_serde::to_vec(&test_seal(&alice_key, b"alice private content")).unwrap();
    let alice_blob_id = digest_of(&alice_blob_bytes);
    storage
        .store(routing_id, alice_blob_id, None, 3600, alice_blob_bytes)
        .await
        .unwrap();
    let bob_blob_bytes = rmp_serde::to_vec(&test_seal(&bob_key, b"bob public content")).unwrap();
    let bob_blob_id = digest_of(&bob_blob_bytes);
    storage
        .store(routing_id, bob_blob_id, None, 3600, bob_blob_bytes)
        .await
        .unwrap();

    let state = test_state_with(contexts, storage);
    let routing_hex = hex_encode(&routing_id);
    // Path of the single-message endpoint for a given blob id.
    let message_uri = |blob_id: &[u8; 32]| {
        format!(
            "/scp/broadcast/{routing_hex}/messages/{}",
            hex_encode(blob_id)
        )
    };

    // 1. Alice's blob without credentials.
    let alice_anonymous = Request::builder()
        .uri(message_uri(&alice_blob_id))
        .body(Body::empty())
        .unwrap();
    let resp = broadcast_projection_router(Arc::clone(&state))
        .oneshot(alice_anonymous)
        .await
        .unwrap();
    assert_eq!(
        resp.status(),
        HttpStatus::UNAUTHORIZED,
        "alice's content should require auth due to per-author Gated override"
    );

    // 2. Bob's blob without credentials.
    let bob_anonymous = Request::builder()
        .uri(message_uri(&bob_blob_id))
        .body(Body::empty())
        .unwrap();
    let resp = broadcast_projection_router(Arc::clone(&state))
        .oneshot(bob_anonymous)
        .await
        .unwrap();
    assert_eq!(
        resp.status(),
        HttpStatus::OK,
        "bob's content should be public (no per-author override)"
    );

    // 3. Alice's blob again, this time with a bearer UCAN for the context.
    let ucan_token = build_test_ucan(context_id);
    let alice_authed = Request::builder()
        .uri(message_uri(&alice_blob_id))
        .header("Authorization", format!("Bearer {ucan_token}"))
        .body(Body::empty())
        .unwrap();
    let resp = broadcast_projection_router(state)
        .oneshot(alice_authed)
        .await
        .unwrap();
    assert_eq!(
        resp.status(),
        HttpStatus::OK,
        "alice's content should be accessible with valid UCAN"
    );
    let cache_control = resp
        .headers()
        .get(header::CACHE_CONTROL)
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(cache_control, "private, immutable, max-age=31536000");
}
/// Builds a JWT-shaped UCAN string (`header.payload.signature`) for tests.
///
/// The payload grants `read` on `scp:ctx:{context_id}/messages:read`, has
/// `exp` set to `u64::MAX` and no `nbf`. Each segment is URL-safe base64
/// without padding; the signature segment is 64 zero bytes, i.e. a
/// placeholder rather than a real signature.
fn build_test_ucan(context_id: &str) -> String {
    use base64::engine::general_purpose::URL_SAFE_NO_PAD;
    use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};

    let header_json = serde_json::to_vec(&UcanHeader {
        alg: String::from("EdDSA"),
        typ: String::from("JWT"),
        ucv: String::from("0.10.0"),
        kid: None,
    })
    .unwrap();

    let read_capability = Attenuation {
        with: format!("scp:ctx:{context_id}/messages:read"),
        can: String::from("read"),
    };
    let payload_json = serde_json::to_vec(&UcanPayload {
        iss: String::from("did:dht:test_issuer"),
        aud: String::from("did:dht:test_audience"),
        exp: u64::MAX,
        nbf: None,
        nnc: String::from("test-nonce-001"),
        att: vec![read_capability],
        prf: Vec::new(),
        fct: None,
    })
    .unwrap();

    let placeholder_signature = [0u8; 64];
    [
        URL_SAFE_NO_PAD.encode(&header_json),
        URL_SAFE_NO_PAD.encode(&payload_json),
        URL_SAFE_NO_PAD.encode(placeholder_signature),
    ]
    .join(".")
}
}