1use std::collections::HashMap;
22use std::sync::Arc;
23
24use axum::Json;
25use axum::Router;
26use axum::extract::{Path, Query, State};
27use axum::http::{StatusCode, header};
28use axum::response::IntoResponse;
29use axum::routing::get;
30use base64::Engine;
31use base64::engine::general_purpose::STANDARD as BASE64;
32use serde::{Deserialize, Serialize};
33use sha2::{Digest, Sha256};
34
35use scp_core::context::broadcast::BroadcastAdmission;
36use scp_core::context::params::{ProjectionPolicy, ProjectionRule};
37use scp_core::crypto::sender_keys::{BroadcastEnvelope, BroadcastKey, open_broadcast_trusted};
38use scp_core::crypto::ucan::CapabilityUri;
39use scp_core::crypto::ucan::validate::parse_ucan;
40use scp_transport::native::storage::BlobStorage;
41
42use crate::error::ApiError;
43use crate::http::NodeState;
44
45#[must_use]
56pub fn compute_routing_id(context_id: &str) -> [u8; 32] {
57 let normalized = context_id.to_ascii_lowercase();
58 let mut hasher = Sha256::new();
59 hasher.update(normalized.as_bytes());
60 hasher.finalize().into()
61}
62
/// A broadcast context registered for HTTP projection.
///
/// Bundles what the projection handlers read for one context: its
/// identifiers, its broadcast keys by epoch, its admission mode, and an
/// optional projection policy.
#[derive(Debug)]
pub struct ProjectedContext {
    /// Routing ID derived from `context_id` via [`compute_routing_id`].
    pub(crate) routing_id: [u8; 32],
    /// Context ID exactly as passed to [`ProjectedContext::new`].
    pub(crate) context_id: String,
    /// Broadcast keys indexed by the epoch each key reports.
    pub(crate) keys: HashMap<u64, BroadcastKey>,
    /// Admission mode of the context.
    pub(crate) admission: BroadcastAdmission,
    /// Optional projection policy (default rule plus per-author overrides).
    pub(crate) projection_policy: Option<ProjectionPolicy>,
}
105
106impl ProjectedContext {
107 #[must_use]
115 pub fn new(
116 context_id: &str,
117 broadcast_key: BroadcastKey,
118 admission: BroadcastAdmission,
119 projection_policy: Option<ProjectionPolicy>,
120 ) -> Self {
121 let routing_id = compute_routing_id(context_id);
122 let epoch = broadcast_key.epoch();
123 let mut keys = HashMap::new();
124 keys.insert(epoch, broadcast_key);
125 Self {
126 routing_id,
127 context_id: context_id.to_owned(),
128 keys,
129 admission,
130 projection_policy,
131 }
132 }
133
134 #[must_use]
136 pub const fn routing_id(&self) -> &[u8; 32] {
137 &self.routing_id
138 }
139
140 #[must_use]
142 pub fn context_id(&self) -> &str {
143 &self.context_id
144 }
145
146 #[must_use]
148 pub const fn keys(&self) -> &HashMap<u64, BroadcastKey> {
149 &self.keys
150 }
151
152 #[must_use]
154 pub const fn admission(&self) -> BroadcastAdmission {
155 self.admission
156 }
157
158 #[must_use]
160 pub const fn projection_policy(&self) -> Option<&ProjectionPolicy> {
161 self.projection_policy.as_ref()
162 }
163
164 pub fn insert_key(&mut self, broadcast_key: BroadcastKey) {
181 let epoch = broadcast_key.epoch();
182 self.keys.insert(epoch, broadcast_key);
183 }
184
185 pub fn retain_only_epochs(&mut self, epochs: &std::collections::HashSet<u64>) {
197 self.keys.retain(|e, _| epochs.contains(e));
198 }
199
200 #[must_use]
202 pub fn key_for_epoch(&self, epoch: u64) -> Option<&BroadcastKey> {
203 self.keys.get(&epoch)
204 }
205}
206
207#[must_use]
215pub fn hex_encode(bytes: &[u8; 32]) -> String {
216 hex::encode(bytes)
217}
218
219#[must_use]
224pub fn hex_decode(s: &str) -> Option<[u8; 32]> {
225 let bytes = hex::decode(s).ok()?;
226 <[u8; 32]>::try_from(bytes.as_slice()).ok()
227}
228
229pub fn validate_projection_policy(
246 admission: BroadcastAdmission,
247 policy: Option<&ProjectionPolicy>,
248) -> Result<(), String> {
249 if let (BroadcastAdmission::Gated, Some(p)) = (admission, policy) {
250 if p.default_rule == ProjectionRule::Public {
251 return Err("gated context cannot have public projection rule".into());
252 }
253 for override_entry in &p.overrides {
254 if override_entry.rule == ProjectionRule::Public {
255 return Err(
256 "gated context cannot have public per-author projection override".into(),
257 );
258 }
259 }
260 }
261 Ok(())
262}
263
264fn extract_bearer_token(headers: &axum::http::HeaderMap) -> Option<&str> {
274 let value = headers.get(header::AUTHORIZATION)?.to_str().ok()?;
275 if value.len() > 7 && value[..7].eq_ignore_ascii_case("bearer ") {
276 Some(&value[7..])
277 } else {
278 None
279 }
280}
281
282fn validate_projection_ucan(
303 token: &str,
304 context_id: &str,
305) -> Result<(), Box<axum::response::Response>> {
306 let ucan = parse_ucan(token).map_err(|e| {
307 tracing::debug!(error = %e, "UCAN parse failed for projection auth");
308 Box::new(ApiError::unauthorized_with("invalid UCAN token").into_response())
309 })?;
310
311 let now = std::time::SystemTime::now()
313 .duration_since(std::time::UNIX_EPOCH)
314 .map(|d| d.as_secs())
315 .unwrap_or(0);
316
317 if ucan.payload.exp <= now {
318 tracing::debug!(
319 exp = ucan.payload.exp,
320 now = now,
321 "UCAN expired for projection auth"
322 );
323 return Err(Box::new(
324 ApiError::unauthorized_with("UCAN token expired").into_response(),
325 ));
326 }
327
328 if let Some(nbf) = ucan.payload.nbf
329 && nbf > now
330 {
331 tracing::debug!(
332 nbf = nbf,
333 now = now,
334 "UCAN not yet valid for projection auth"
335 );
336 return Err(Box::new(
337 ApiError::unauthorized_with("UCAN token not yet valid").into_response(),
338 ));
339 }
340
341 let required = CapabilityUri::new(context_id, "messages", "read");
343
344 let has_capability = ucan.payload.att.iter().any(|att| {
346 att.with
347 .parse::<CapabilityUri>()
348 .is_ok_and(|cap| cap.matches(&required))
349 });
350
351 if !has_capability {
352 tracing::debug!(
353 context_id = context_id,
354 "UCAN missing messages:read capability for context"
355 );
356 return Err(Box::new(
357 ApiError::unauthorized_with("UCAN missing messages:read capability").into_response(),
358 ));
359 }
360
361 Ok(())
362}
363
364fn effective_projection_rule(
375 admission: BroadcastAdmission,
376 policy: Option<&ProjectionPolicy>,
377 author_did: Option<&str>,
378) -> ProjectionRule {
379 match policy {
380 Some(p) => {
381 if let Some(author) = author_did
383 && let Some(ov) = p.overrides.iter().find(|o| o.did.as_ref() == author)
384 {
385 return ov.rule;
386 }
387 p.default_rule
388 }
389 None => match admission {
390 BroadcastAdmission::Gated => ProjectionRule::Gated,
391 BroadcastAdmission::Open => ProjectionRule::Public,
392 },
393 }
394}
395
396fn check_projection_auth(
407 headers: &axum::http::HeaderMap,
408 context_id: &str,
409 rule: ProjectionRule,
410) -> Result<(), Box<axum::response::Response>> {
411 match rule {
412 ProjectionRule::Public => Ok(()),
413 ProjectionRule::Gated | ProjectionRule::AuthorChoice => {
414 let token = extract_bearer_token(headers).ok_or_else(|| {
415 Box::new(
416 ApiError::unauthorized_with("Authorization required for gated broadcast")
417 .into_response(),
418 )
419 })?;
420 validate_projection_ucan(token, context_id)
421 }
422 }
423}
424
/// Formats a Unix timestamp (whole seconds since 1970-01-01T00:00:00Z) as an
/// ISO 8601 / RFC 3339 UTC string such as `2025-01-15T10:30:00Z`.
///
/// The calendar date is computed in constant time with the closed-form
/// "civil from days" conversion for the proleptic Gregorian calendar, so the
/// cost does not depend on how far `ts` is from the epoch. Walking year by
/// year would need hundreds of billions of iterations for values near
/// `u64::MAX`.
///
/// Years beyond 9999 are printed with as many digits as they need.
#[must_use]
pub fn unix_to_iso8601(ts: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    /// Days in one 400-year Gregorian cycle.
    const DAYS_PER_ERA: u64 = 146_097;
    /// Days from 0000-03-01 to 1970-01-01. Shifting the origin to 1 March
    /// puts the leap day at the end of the computational year.
    const EPOCH_SHIFT_DAYS: u64 = 719_468;

    let secs_of_day = ts % SECS_PER_DAY;
    let hours = secs_of_day / 3_600;
    let minutes = secs_of_day / 60 % 60;
    let seconds = secs_of_day % 60;

    let shifted_days = ts / SECS_PER_DAY + EPOCH_SHIFT_DAYS;
    let era = shifted_days / DAYS_PER_ERA;
    let day_of_era = shifted_days % DAYS_PER_ERA; // 0..=146_096
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // 0..=399
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // 0..=365, March-based
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March .. 11 = February
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    // January and February belong to the next civil year in the shifted calendar.
    let year = era * 400 + year_of_era + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
}
480
/// JSON body returned by the feed endpoint.
#[derive(Debug, Clone, Serialize)]
pub struct FeedResponse {
    /// Context ID of the projected context the feed belongs to.
    pub context_id: String,
    /// Author DID of the first message in `messages`, or an empty string
    /// when no messages are returned.
    pub author_did: String,
    /// Decrypted messages included in this page of the feed.
    pub messages: Vec<FeedMessage>,
}
504
/// One decrypted broadcast message as exposed over HTTP.
#[derive(Debug, Clone, Serialize)]
pub struct FeedMessage {
    /// Hex-encoded blob ID of the stored blob this message came from.
    pub id: String,
    /// Context ID taken from the broadcast envelope.
    pub context_id: String,
    /// Author DID taken from the broadcast envelope.
    pub author_did: String,
    /// Sequence number taken from the broadcast envelope.
    pub sequence: u64,
    /// Timestamp taken from the broadcast envelope.
    // NOTE(review): unit is not established in this file — confirm against the envelope spec.
    pub timestamp: u64,
    /// Key epoch the envelope was sealed under.
    pub key_epoch: u64,
    /// The blob's `stored_at` value formatted by [`unix_to_iso8601`].
    pub published_at: String,
    /// Decrypted payload, base64-encoded with the standard alphabet.
    pub content: String,
}
533
/// Query-string parameters accepted by the feed endpoint.
#[derive(Debug, Clone, Deserialize)]
pub struct FeedQuery {
    /// Optional hex-encoded blob ID; its `stored_at` value is passed to the
    /// storage query as the lower time bound.
    pub since: Option<String>,
    /// Optional page size; defaults to `DEFAULT_FEED_LIMIT` and is capped at
    /// `MAX_FEED_LIMIT`.
    pub limit: Option<u32>,
}
548
/// Page size used by the feed endpoint when the request omits `limit`.
const DEFAULT_FEED_LIMIT: u32 = 20;

/// Upper bound applied to any requested feed `limit`.
const MAX_FEED_LIMIT: u32 = 100;
558
559fn decrypt_blobs(
567 blobs: &[scp_transport::native::storage::StoredBlob],
568 keys: &HashMap<u64, BroadcastKey>,
569) -> (Vec<FeedMessage>, Option<[u8; 32]>) {
570 let mut messages = Vec::with_capacity(blobs.len());
571 let mut latest_blob_id: Option<[u8; 32]> = None;
572
573 for stored in blobs {
574 let envelope: BroadcastEnvelope = match rmp_serde::from_slice(&stored.blob) {
576 Ok(env) => env,
577 Err(e) => {
578 tracing::warn!(
579 blob_id = hex_encode(&stored.blob_id),
580 error = %e,
581 "failed to deserialize BroadcastEnvelope, skipping"
582 );
583 continue;
584 }
585 };
586
587 let Some(key) = keys.get(&envelope.key_epoch) else {
589 tracing::warn!(
590 blob_id = hex_encode(&stored.blob_id),
591 epoch = envelope.key_epoch,
592 "no broadcast key for epoch, skipping"
593 );
594 continue;
595 };
596
597 let plaintext = match open_broadcast_trusted(key, &envelope) {
599 Ok(pt) => pt,
600 Err(e) => {
601 tracing::warn!(
602 blob_id = hex_encode(&stored.blob_id),
603 epoch = envelope.key_epoch,
604 error = %e,
605 "decryption failed, skipping"
606 );
607 continue;
608 }
609 };
610
611 latest_blob_id = Some(stored.blob_id);
612
613 messages.push(FeedMessage {
614 id: hex_encode(&stored.blob_id),
615 context_id: envelope.context_id,
616 author_did: envelope.author_did,
617 sequence: envelope.sequence,
618 timestamp: envelope.timestamp,
619 key_epoch: envelope.key_epoch,
620 published_at: unix_to_iso8601(stored.stored_at),
621 content: BASE64.encode(&plaintext),
622 });
623 }
624
625 (messages, latest_blob_id)
626}
627
628#[allow(clippy::too_many_lines)]
658pub async fn feed_handler(
659 State(state): State<Arc<NodeState>>,
660 Path(routing_id_hex): Path<String>,
661 Query(params): Query<FeedQuery>,
662 headers: axum::http::HeaderMap,
663) -> impl IntoResponse {
664 let Some(routing_id) = hex_decode(&routing_id_hex) else {
666 return ApiError::bad_request("invalid routing_id hex").into_response();
667 };
668
669 let projected_contexts = state.projected_contexts.read().await;
671 let Some(projected) = projected_contexts.get(&routing_id) else {
672 return ApiError::not_found("unknown routing_id").into_response();
673 };
674
675 let context_id = projected.context_id.clone();
677 let admission = projected.admission;
678 let projection_policy = projected.projection_policy.clone();
679 let keys: HashMap<u64, BroadcastKey> = projected.keys.clone();
681 drop(projected_contexts);
682
683 let rule = effective_projection_rule(admission, projection_policy.as_ref(), None);
687 if let Err(resp) = check_projection_auth(&headers, &context_id, rule) {
688 return *resp;
689 }
690
691 let since_ts: Option<u64> = if let Some(ref since_hex) = params.since {
694 let Some(since_blob_id) = hex_decode(since_hex) else {
695 return ApiError::bad_request("invalid since blob_id hex").into_response();
696 };
697 match state.blob_storage.get(&since_blob_id).await {
699 Ok(Some(blob)) => {
700 if blob.routing_id != routing_id {
703 return ApiError::bad_request("since blob_id does not belong to this context")
704 .into_response();
705 }
706 Some(blob.stored_at)
707 }
708 Ok(None) => {
709 Some(u64::MAX)
713 }
714 Err(e) => {
715 tracing::warn!(
716 error = %e,
717 since_blob_id = since_hex,
718 "failed to look up since blob"
719 );
720 Some(u64::MAX)
722 }
723 }
724 } else {
725 None
726 };
727
728 let limit = params
730 .limit
731 .unwrap_or(DEFAULT_FEED_LIMIT)
732 .min(MAX_FEED_LIMIT);
733
734 let blobs = match state.blob_storage.query(&routing_id, since_ts, limit).await {
736 Ok(blobs) => blobs,
737 Err(e) => {
738 tracing::error!(
739 error = %e,
740 routing_id = routing_id_hex,
741 "blob storage query failed"
742 );
743 return ApiError::internal_error("storage error").into_response();
744 }
745 };
746
747 let (mut messages, latest_blob_id) = decrypt_blobs(&blobs, &keys);
749
750 if let Some(ref policy) = projection_policy
762 && !policy.overrides.is_empty()
763 {
764 let auth_public = true; let mut auth_gated: Option<bool> = None;
770 let mut auth_author_choice: Option<bool> = None;
771
772 match rule {
774 ProjectionRule::Public => {} ProjectionRule::Gated => {
776 auth_gated = Some(true);
777 }
778 ProjectionRule::AuthorChoice => {
779 auth_author_choice = Some(true);
780 }
781 }
782
783 for ov in &policy.overrides {
786 if ov.rule == ProjectionRule::Gated && auth_gated.is_none() {
787 auth_gated = Some(
788 check_projection_auth(&headers, &context_id, ProjectionRule::Gated).is_ok(),
789 );
790 } else if ov.rule == ProjectionRule::AuthorChoice && auth_author_choice.is_none() {
791 auth_author_choice = Some(
792 check_projection_auth(&headers, &context_id, ProjectionRule::AuthorChoice)
793 .is_ok(),
794 );
795 }
796 }
799
800 messages.retain(|msg| {
801 let author_rule =
802 effective_projection_rule(admission, Some(policy), Some(&msg.author_did));
803 match author_rule {
804 ProjectionRule::Public => auth_public,
805 ProjectionRule::Gated => auth_gated.unwrap_or(true),
806 ProjectionRule::AuthorChoice => auth_author_choice.unwrap_or(true),
807 }
808 });
809 }
810
811 let author_did = messages
813 .first()
814 .map(|m| m.author_did.clone())
815 .unwrap_or_default();
816
817 let response = FeedResponse {
818 context_id,
819 author_did,
820 messages,
821 };
822
823 let etag = latest_blob_id
827 .map(|id| format!("\"{}\"", hex_encode(&id)))
828 .unwrap_or_default();
829
830 let has_per_author_overrides = projection_policy
833 .as_ref()
834 .is_some_and(|p| !p.overrides.is_empty());
835
836 let cache_control = match rule {
837 ProjectionRule::Public if !has_per_author_overrides => {
838 "public, max-age=30, stale-while-revalidate=300"
839 }
840 _ => "private, max-age=30",
841 };
842
843 let mut resp_headers = axum::http::HeaderMap::new();
844 resp_headers.insert(
845 header::CACHE_CONTROL,
846 axum::http::HeaderValue::from_static(cache_control),
847 );
848 if let (false, Ok(val)) = (etag.is_empty(), axum::http::HeaderValue::from_str(&etag)) {
849 resp_headers.insert(header::ETAG, val);
850 }
851
852 (StatusCode::OK, resp_headers, Json(response)).into_response()
853}
854
855#[allow(clippy::too_many_lines)]
888pub async fn message_handler(
889 State(state): State<Arc<NodeState>>,
890 Path((routing_id_hex, blob_id_hex_raw)): Path<(String, String)>,
891 headers: axum::http::HeaderMap,
892) -> impl IntoResponse {
893 let blob_id_hex = blob_id_hex_raw.to_ascii_lowercase();
895
896 let Some(routing_id) = hex_decode(&routing_id_hex) else {
898 return ApiError::bad_request("invalid routing_id hex").into_response();
899 };
900
901 let Some(blob_id) = hex_decode(&blob_id_hex) else {
903 return ApiError::bad_request("invalid blob_id hex").into_response();
904 };
905
906 let projected_contexts = state.projected_contexts.read().await;
909 let Some(projected) = projected_contexts.get(&routing_id) else {
910 return ApiError::not_found("unknown routing_id").into_response();
911 };
912
913 let context_id = projected.context_id.clone();
915 let admission = projected.admission;
916 let projection_policy = projected.projection_policy.clone();
917 let keys: HashMap<u64, BroadcastKey> = projected.keys.clone();
918 drop(projected_contexts);
919
920 let default_rule = effective_projection_rule(admission, projection_policy.as_ref(), None);
924 if matches!(
925 default_rule,
926 ProjectionRule::Gated | ProjectionRule::AuthorChoice
927 ) && let Err(resp) = check_projection_auth(&headers, &context_id, default_rule)
928 {
929 return *resp;
930 }
931
932 let stored = match state.blob_storage.get(&blob_id).await {
934 Ok(Some(blob)) => blob,
935 Ok(None) => {
936 return ApiError::not_found("unknown blob_id").into_response();
937 }
938 Err(e) => {
939 tracing::error!(
940 error = %e,
941 blob_id = blob_id_hex,
942 "blob storage get failed"
943 );
944 return ApiError::internal_error("storage error").into_response();
945 }
946 };
947
948 if stored.routing_id != routing_id {
954 return ApiError::not_found("unknown blob_id").into_response();
955 }
956
957 if let Some(inm) = headers.get(header::IF_NONE_MATCH)
961 && let Ok(inm_str) = inm.to_str()
962 {
963 let expected_etag = format!("\"{blob_id_hex}\"");
964 if inm_str == expected_etag {
965 return StatusCode::NOT_MODIFIED.into_response();
966 }
967 }
968
969 let envelope: BroadcastEnvelope = match rmp_serde::from_slice(&stored.blob) {
971 Ok(env) => env,
972 Err(e) => {
973 tracing::error!(
974 error = %e,
975 blob_id = blob_id_hex,
976 "failed to deserialize BroadcastEnvelope"
977 );
978 return ApiError::internal_error("decryption failure").into_response();
979 }
980 };
981
982 let effective_rule = effective_projection_rule(
987 admission,
988 projection_policy.as_ref(),
989 Some(&envelope.author_did),
990 );
991 if effective_rule != default_rule
992 && let Err(resp) = check_projection_auth(&headers, &context_id, effective_rule)
993 {
994 return *resp;
995 }
996
997 let Some(key) = keys.get(&envelope.key_epoch) else {
1001 tracing::warn!(
1002 blob_id = blob_id_hex,
1003 epoch = envelope.key_epoch,
1004 "no broadcast key for epoch (likely purged after governance ban)"
1005 );
1006 return ApiError::gone("content revoked").into_response();
1007 };
1008
1009 let plaintext = match open_broadcast_trusted(key, &envelope) {
1011 Ok(pt) => pt,
1012 Err(e) => {
1013 tracing::error!(
1014 error = %e,
1015 blob_id = blob_id_hex,
1016 epoch = envelope.key_epoch,
1017 "decryption failed"
1018 );
1019 return ApiError::internal_error("decryption failure").into_response();
1020 }
1021 };
1022
1023 let message = FeedMessage {
1024 id: hex_encode(&stored.blob_id),
1025 context_id: envelope.context_id,
1026 author_did: envelope.author_did,
1027 sequence: envelope.sequence,
1028 timestamp: envelope.timestamp,
1029 key_epoch: envelope.key_epoch,
1030 published_at: unix_to_iso8601(stored.stored_at),
1031 content: BASE64.encode(&plaintext),
1032 };
1033
1034 let cache_control = match effective_rule {
1038 ProjectionRule::Public => "public, immutable, max-age=31536000",
1039 ProjectionRule::Gated | ProjectionRule::AuthorChoice => {
1040 "private, immutable, max-age=31536000"
1041 }
1042 };
1043
1044 let mut resp_headers = axum::http::HeaderMap::new();
1045 resp_headers.insert(
1046 header::CACHE_CONTROL,
1047 axum::http::HeaderValue::from_static(cache_control),
1048 );
1049 let etag = format!("\"{blob_id_hex}\"");
1050 if let Ok(val) = axum::http::HeaderValue::from_str(&etag) {
1051 resp_headers.insert(header::ETAG, val);
1052 }
1053
1054 (StatusCode::OK, resp_headers, Json(message)).into_response()
1055}
1056
1057pub fn broadcast_projection_router(state: Arc<NodeState>) -> Router {
1074 let limiter = state.projection_rate_limiter.clone();
1075 Router::new()
1076 .route("/scp/broadcast/{routing_id}/feed", get(feed_handler))
1077 .route(
1078 "/scp/broadcast/{routing_id}/messages/{blob_id}",
1079 get(message_handler),
1080 )
1081 .layer(axum::middleware::from_fn(move |req, next| {
1082 projection_rate_limit_middleware(req, next, limiter.clone())
1083 }))
1084 .with_state(state)
1085}
1086
1087async fn projection_rate_limit_middleware(
1096 request: axum::extract::Request,
1097 next: axum::middleware::Next,
1098 limiter: scp_transport::relay::rate_limit::PublishRateLimiter,
1099) -> axum::response::Response {
1100 let ip = request
1101 .extensions()
1102 .get::<axum::extract::ConnectInfo<std::net::SocketAddr>>()
1103 .map_or_else(
1104 || {
1105 tracing::warn!(
1106 "ConnectInfo missing from request extensions; \
1107 projection rate limiting falls back to shared 0.0.0.0 bucket \
1108 (per-IP isolation lost)"
1109 );
1110 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)
1111 },
1112 |ci| ci.0.ip(),
1113 );
1114 if !limiter.check(ip).await {
1115 return (
1116 axum::http::StatusCode::TOO_MANY_REQUESTS,
1117 "rate limit exceeded",
1118 )
1119 .into_response();
1120 }
1121 next.run(request).await
1122}
1123
1124#[cfg(test)]
1129#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1130mod tests {
1131 use super::*;
1132 use ed25519_dalek::Signer;
1133 use scp_core::crypto::sender_keys::{
1134 SealBroadcastParams, SigningPayloadFields, build_broadcast_signing_payload,
1135 compute_provenance_hash, generate_broadcast_key, generate_broadcast_nonce,
1136 rotate_broadcast_key, seal_broadcast,
1137 };
1138
1139 fn test_seal(key: &BroadcastKey, payload: &[u8]) -> BroadcastEnvelope {
1141 let sk = ed25519_dalek::SigningKey::from_bytes(&[0xAA; 32]);
1142 let nonce = generate_broadcast_nonce();
1143 let provenance_hash = compute_provenance_hash(None).unwrap();
1144 let signature = sk.sign(&build_broadcast_signing_payload(&SigningPayloadFields {
1145 version: scp_core::envelope::SCP_PROTOCOL_VERSION,
1146 context_id: "test-ctx",
1147 author_did: key.author_did(),
1148 sequence: 1,
1149 key_epoch: key.epoch(),
1150 timestamp: 1_700_000_000_000,
1151 nonce: &nonce,
1152 provenance_hash: &provenance_hash,
1153 }));
1154 let params = SealBroadcastParams {
1155 context_id: "test-ctx",
1156 sequence: 1,
1157 timestamp: 1_700_000_000_000,
1158 provenance: None,
1159 signature,
1160 };
1161 seal_broadcast(key, payload, &nonce, ¶ms).unwrap()
1162 }
1163
1164 #[test]
1169 fn hex_encode_produces_64_char_lowercase() {
1170 let bytes = [0xab; 32];
1171 let encoded = hex_encode(&bytes);
1172 assert_eq!(encoded.len(), 64);
1173 assert_eq!(encoded, "ab".repeat(32));
1174 }
1175
1176 #[test]
1177 fn hex_encode_all_zeros() {
1178 let bytes = [0u8; 32];
1179 assert_eq!(hex_encode(&bytes), "00".repeat(32));
1180 }
1181
1182 #[test]
1183 fn hex_decode_roundtrip() {
1184 let bytes = [0xde; 32];
1185 let encoded = hex_encode(&bytes);
1186 let decoded = hex_decode(&encoded).unwrap();
1187 assert_eq!(decoded, bytes);
1188 }
1189
1190 #[test]
1191 fn hex_decode_rejects_wrong_length() {
1192 assert!(hex_decode("abcd").is_none());
1193 assert!(hex_decode("").is_none());
1194 assert!(hex_decode(&"a".repeat(63)).is_none());
1195 assert!(hex_decode(&"a".repeat(65)).is_none());
1196 }
1197
1198 #[test]
1199 fn hex_decode_rejects_invalid_chars() {
1200 let mut s = "0".repeat(64);
1201 s.replace_range(0..1, "g"); assert!(hex_decode(&s).is_none());
1203 }
1204
1205 #[test]
1206 fn hex_decode_accepts_uppercase() {
1207 let lower = hex_encode(&[0xAB; 32]);
1208 let upper = lower.to_uppercase();
1209 let decoded = hex_decode(&upper).unwrap();
1210 assert_eq!(decoded, [0xAB; 32]);
1211 }
1212
1213 #[test]
1218 fn unix_to_iso8601_epoch_zero() {
1219 assert_eq!(unix_to_iso8601(0), "1970-01-01T00:00:00Z");
1220 }
1221
1222 #[test]
1223 fn unix_to_iso8601_known_timestamp() {
1224 assert_eq!(unix_to_iso8601(1_736_937_000), "2025-01-15T10:30:00Z");
1226 }
1227
1228 #[test]
1229 fn unix_to_iso8601_y2k() {
1230 assert_eq!(unix_to_iso8601(946_684_800), "2000-01-01T00:00:00Z");
1232 }
1233
1234 #[test]
1235 fn unix_to_iso8601_leap_year_feb_29() {
1236 assert_eq!(unix_to_iso8601(1_709_208_000), "2024-02-29T12:00:00Z");
1238 }
1239
1240 #[test]
1245 fn decrypt_blobs_with_valid_envelope() {
1246 let key = generate_broadcast_key("did:dht:alice");
1247 let envelope = test_seal(&key, b"hello world");
1248 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1249
1250 let blob_id = {
1251 let mut hasher = Sha256::new();
1252 hasher.update(&blob_bytes);
1253 let h: [u8; 32] = hasher.finalize().into();
1254 h
1255 };
1256
1257 let stored = scp_transport::native::storage::StoredBlob {
1258 routing_id: [0xAA; 32],
1259 blob_id,
1260 recipient_hint: None,
1261 blob_ttl: 3600,
1262 stored_at: 1_736_937_000,
1263 blob: blob_bytes,
1264 };
1265
1266 let mut keys = HashMap::new();
1267 keys.insert(0, key);
1268
1269 let (messages, latest_id) = decrypt_blobs(&[stored], &keys);
1270 assert_eq!(messages.len(), 1);
1271 assert_eq!(messages[0].author_did, "did:dht:alice");
1272 assert_eq!(messages[0].key_epoch, 0);
1273 assert_eq!(messages[0].published_at, "2025-01-15T10:30:00Z");
1274 let decoded = BASE64.decode(&messages[0].content).unwrap();
1276 assert_eq!(decoded, b"hello world");
1277 assert_eq!(latest_id, Some(blob_id));
1278 }
1279
1280 #[test]
1281 fn decrypt_blobs_skips_invalid_msgpack() {
1282 let stored = scp_transport::native::storage::StoredBlob {
1283 routing_id: [0xAA; 32],
1284 blob_id: [0xBB; 32],
1285 recipient_hint: None,
1286 blob_ttl: 3600,
1287 stored_at: 100,
1288 blob: vec![0xFF, 0xFE], };
1290
1291 let keys = HashMap::new();
1292 let (messages, latest_id) = decrypt_blobs(&[stored], &keys);
1293 assert!(messages.is_empty());
1294 assert!(latest_id.is_none());
1295 }
1296
1297 #[test]
1298 fn decrypt_blobs_skips_missing_epoch_key() {
1299 let key = generate_broadcast_key("did:dht:alice");
1300 let envelope = test_seal(&key, b"secret");
1301 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1302
1303 let stored = scp_transport::native::storage::StoredBlob {
1304 routing_id: [0xAA; 32],
1305 blob_id: [0xCC; 32],
1306 recipient_hint: None,
1307 blob_ttl: 3600,
1308 stored_at: 100,
1309 blob: blob_bytes,
1310 };
1311
1312 let (rotated_key, _) = rotate_broadcast_key(&key, 1000).unwrap();
1314 let mut keys = HashMap::new();
1315 keys.insert(1, rotated_key);
1316
1317 let (messages, latest_id) = decrypt_blobs(&[stored], &keys);
1318 assert!(messages.is_empty());
1319 assert!(latest_id.is_none());
1320 }
1321
1322 use std::net::SocketAddr;
1327 use std::time::Instant;
1328
1329 use axum::body::Body;
1330 use axum::http::{Request, StatusCode as HttpStatus};
1331 use http_body_util::BodyExt;
1332 use scp_transport::native::storage::{BlobStorageBackend, InMemoryBlobStorage};
1333 use tokio::sync::RwLock;
1334 use tower::ServiceExt;
1335
1336 use crate::http::NodeState;
1337
1338 fn test_state_with(
1341 projected: HashMap<[u8; 32], ProjectedContext>,
1342 storage: InMemoryBlobStorage,
1343 ) -> Arc<NodeState> {
1344 test_state_with_rate(projected, storage, 1000)
1345 }
1346
1347 fn test_state_with_rate(
1350 projected: HashMap<[u8; 32], ProjectedContext>,
1351 storage: InMemoryBlobStorage,
1352 rate_limit: u32,
1353 ) -> Arc<NodeState> {
1354 Arc::new(NodeState {
1355 did: "did:dht:test".to_owned(),
1356 relay_url: "wss://localhost/scp/v1".to_owned(),
1357 broadcast_contexts: RwLock::new(HashMap::new()),
1358 relay_addr: "127.0.0.1:9000".parse::<SocketAddr>().unwrap(),
1359 bridge_secret: zeroize::Zeroizing::new([0u8; 32]),
1360 dev_token: None,
1361 dev_bind_addr: None,
1362 projected_contexts: RwLock::new(projected),
1363 blob_storage: Arc::new(BlobStorageBackend::from(storage)),
1364 relay_config: scp_transport::native::server::RelayConfig::default(),
1365 start_time: Instant::now(),
1366 http_bind_addr: SocketAddr::from(([0, 0, 0, 0], 8443)),
1367 shutdown_token: tokio_util::sync::CancellationToken::new(),
1368 cors_origins: None,
1369 projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
1370 rate_limit,
1371 ),
1372 tls_config: None,
1373 cert_resolver: None,
1374 did_document: scp_identity::document::DidDocument {
1375 context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
1376 id: "did:dht:test".to_owned(),
1377 verification_method: vec![],
1378 authentication: vec![],
1379 assertion_method: vec![],
1380 also_known_as: vec![],
1381 service: vec![],
1382 },
1383 connection_tracker: scp_transport::relay::rate_limit::new_connection_tracker(),
1384 subscription_registry: scp_transport::relay::subscription::new_registry(),
1385 acme_challenges: None,
1386 bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
1387 })
1388 }
1389
1390 #[tokio::test]
1391 async fn feed_unknown_routing_id_returns_404() {
1392 let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
1393 let router = broadcast_projection_router(state);
1394
1395 let routing_hex = hex_encode(&[0xAA; 32]);
1396 let req = Request::builder()
1397 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
1398 .body(Body::empty())
1399 .unwrap();
1400
1401 let resp = router.oneshot(req).await.unwrap();
1402 assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
1403
1404 let body = resp.into_body().collect().await.unwrap().to_bytes();
1405 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1406 assert_eq!(json["code"], "NOT_FOUND");
1407 }
1408
1409 #[tokio::test]
1410 async fn feed_invalid_hex_returns_400() {
1411 let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
1412 let router = broadcast_projection_router(state);
1413
1414 let req = Request::builder()
1415 .uri("/scp/broadcast/not_hex/feed")
1416 .body(Body::empty())
1417 .unwrap();
1418
1419 let resp = router.oneshot(req).await.unwrap();
1420 assert_eq!(resp.status(), HttpStatus::BAD_REQUEST);
1421 }
1422
1423 #[tokio::test]
1424 async fn feed_returns_decrypted_messages_with_cache_headers() {
1425 let key = generate_broadcast_key("did:dht:alice");
1426 let context_id = "test_ctx_001";
1427 let projected =
1428 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1429 let routing_id = projected.routing_id;
1430
1431 let mut projected_map = HashMap::new();
1432 projected_map.insert(routing_id, projected);
1433
1434 let storage = InMemoryBlobStorage::new();
1435
1436 let envelope = test_seal(&key, b"hello feed");
1438 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1439 let blob_id = {
1440 let mut h = Sha256::new();
1441 h.update(&blob_bytes);
1442 let r: [u8; 32] = h.finalize().into();
1443 r
1444 };
1445
1446 storage
1447 .store(routing_id, blob_id, None, 3600, blob_bytes)
1448 .await
1449 .unwrap();
1450
1451 let state = test_state_with(projected_map, storage);
1452 let router = broadcast_projection_router(state);
1453
1454 let routing_hex = hex_encode(&routing_id);
1455 let req = Request::builder()
1456 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
1457 .body(Body::empty())
1458 .unwrap();
1459
1460 let resp = router.oneshot(req).await.unwrap();
1461 assert_eq!(resp.status(), HttpStatus::OK);
1462
1463 let cache_control = resp
1465 .headers()
1466 .get(header::CACHE_CONTROL)
1467 .unwrap()
1468 .to_str()
1469 .unwrap();
1470 assert_eq!(
1471 cache_control,
1472 "public, max-age=30, stale-while-revalidate=300"
1473 );
1474
1475 let etag = resp.headers().get(header::ETAG).unwrap().to_str().unwrap();
1477 assert!(etag.contains(&hex_encode(&blob_id)));
1478
1479 let body = resp.into_body().collect().await.unwrap().to_bytes();
1481 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1482
1483 assert_eq!(json["context_id"], "test_ctx_001");
1484 assert_eq!(json["author_did"], "did:dht:alice");
1485
1486 let messages = json["messages"].as_array().unwrap();
1487 assert_eq!(messages.len(), 1);
1488 assert_eq!(messages[0]["author_did"], "did:dht:alice");
1489 assert_eq!(messages[0]["key_epoch"], 0);
1490
1491 let content_b64 = messages[0]["content"].as_str().unwrap();
1493 let decoded = BASE64.decode(content_b64).unwrap();
1494 assert_eq!(decoded, b"hello feed");
1495 }
1496
1497 #[tokio::test]
1498 async fn feed_respects_limit_parameter() {
1499 let key = generate_broadcast_key("did:dht:alice");
1500 let context_id = "limit_ctx";
1501 let projected =
1502 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1503 let routing_id = projected.routing_id;
1504
1505 let mut projected_map = HashMap::new();
1506 projected_map.insert(routing_id, projected);
1507
1508 let storage = InMemoryBlobStorage::new();
1509
1510 for i in 0u8..5 {
1512 let envelope = test_seal(&key, &[i; 10]);
1513 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1514 let blob_id = {
1515 let mut h = Sha256::new();
1516 h.update(&blob_bytes);
1517 let r: [u8; 32] = h.finalize().into();
1518 r
1519 };
1520 storage
1521 .store(routing_id, blob_id, None, 3600, blob_bytes)
1522 .await
1523 .unwrap();
1524 }
1525
1526 let state = test_state_with(projected_map, storage);
1527 let router = broadcast_projection_router(state);
1528
1529 let routing_hex = hex_encode(&routing_id);
1530 let req = Request::builder()
1531 .uri(format!("/scp/broadcast/{routing_hex}/feed?limit=2"))
1532 .body(Body::empty())
1533 .unwrap();
1534
1535 let resp = router.oneshot(req).await.unwrap();
1536 assert_eq!(resp.status(), HttpStatus::OK);
1537
1538 let body = resp.into_body().collect().await.unwrap().to_bytes();
1539 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1540 let messages = json["messages"].as_array().unwrap();
1541 assert_eq!(messages.len(), 2);
1542 }
1543
1544 #[tokio::test]
1545 async fn feed_limit_clamped_to_100() {
1546 let key = generate_broadcast_key("did:dht:alice");
1547 let context_id = "clamp_ctx";
1548 let projected =
1549 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1550 let routing_id = projected.routing_id;
1551
1552 let mut projected_map = HashMap::new();
1553 projected_map.insert(routing_id, projected);
1554
1555 let storage = InMemoryBlobStorage::new();
1556 let state = test_state_with(projected_map, storage);
1557 let router = broadcast_projection_router(state);
1558
1559 let routing_hex = hex_encode(&routing_id);
1561 let req = Request::builder()
1562 .uri(format!("/scp/broadcast/{routing_hex}/feed?limit=999"))
1563 .body(Body::empty())
1564 .unwrap();
1565
1566 let resp = router.oneshot(req).await.unwrap();
1567 assert_eq!(resp.status(), HttpStatus::OK);
1568
1569 let body = resp.into_body().collect().await.unwrap().to_bytes();
1570 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1571 assert!(json["messages"].as_array().unwrap().is_empty());
1573 }
1574
1575 #[tokio::test]
1576 async fn feed_empty_context_returns_empty_messages() {
1577 let key = generate_broadcast_key("did:dht:alice");
1578 let context_id = "empty_ctx";
1579 let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Open, None);
1580 let routing_id = projected.routing_id;
1581
1582 let mut projected_map = HashMap::new();
1583 projected_map.insert(routing_id, projected);
1584
1585 let state = test_state_with(projected_map, InMemoryBlobStorage::new());
1586 let router = broadcast_projection_router(state);
1587
1588 let routing_hex = hex_encode(&routing_id);
1589 let req = Request::builder()
1590 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
1591 .body(Body::empty())
1592 .unwrap();
1593
1594 let resp = router.oneshot(req).await.unwrap();
1595 assert_eq!(resp.status(), HttpStatus::OK);
1596
1597 let body = resp.into_body().collect().await.unwrap().to_bytes();
1598 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1599 assert_eq!(json["context_id"], "empty_ctx");
1600 assert_eq!(json["author_did"], "");
1601 assert!(json["messages"].as_array().unwrap().is_empty());
1602
1603 }
1606
1607 #[tokio::test]
1612 async fn message_unknown_routing_id_returns_404() {
1613 let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
1614 let router = broadcast_projection_router(state);
1615
1616 let routing_hex = hex_encode(&[0xAA; 32]);
1617 let blob_hex = hex_encode(&[0xBB; 32]);
1618 let req = Request::builder()
1619 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1620 .body(Body::empty())
1621 .unwrap();
1622
1623 let resp = router.oneshot(req).await.unwrap();
1624 assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
1625
1626 let body = resp.into_body().collect().await.unwrap().to_bytes();
1627 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1628 assert_eq!(json["code"], "NOT_FOUND");
1629 }
1630
1631 #[tokio::test]
1632 async fn message_unknown_blob_id_returns_404() {
1633 let key = generate_broadcast_key("did:dht:alice");
1634 let context_id = "msg_ctx_404";
1635 let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Open, None);
1636 let routing_id = projected.routing_id;
1637
1638 let mut projected_map = HashMap::new();
1639 projected_map.insert(routing_id, projected);
1640
1641 let storage = InMemoryBlobStorage::new();
1642 let state = test_state_with(projected_map, storage);
1643 let router = broadcast_projection_router(state);
1644
1645 let routing_hex = hex_encode(&routing_id);
1646 let blob_hex = hex_encode(&[0xCC; 32]); let req = Request::builder()
1648 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1649 .body(Body::empty())
1650 .unwrap();
1651
1652 let resp = router.oneshot(req).await.unwrap();
1653 assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
1654
1655 let body = resp.into_body().collect().await.unwrap().to_bytes();
1656 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1657 assert_eq!(json["code"], "NOT_FOUND");
1658 }
1659
1660 #[tokio::test]
1661 async fn message_returns_decrypted_single_message() {
1662 let key = generate_broadcast_key("did:dht:alice");
1663 let context_id = "msg_ctx_ok";
1664 let projected =
1665 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1666 let routing_id = projected.routing_id;
1667
1668 let mut projected_map = HashMap::new();
1669 projected_map.insert(routing_id, projected);
1670
1671 let storage = InMemoryBlobStorage::new();
1672
1673 let envelope = test_seal(&key, b"single message");
1675 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1676 let blob_id = {
1677 let mut h = Sha256::new();
1678 h.update(&blob_bytes);
1679 let r: [u8; 32] = h.finalize().into();
1680 r
1681 };
1682
1683 storage
1684 .store(routing_id, blob_id, None, 3600, blob_bytes)
1685 .await
1686 .unwrap();
1687
1688 let state = test_state_with(projected_map, storage);
1689 let router = broadcast_projection_router(state);
1690
1691 let routing_hex = hex_encode(&routing_id);
1692 let blob_hex = hex_encode(&blob_id);
1693 let req = Request::builder()
1694 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1695 .body(Body::empty())
1696 .unwrap();
1697
1698 let resp = router.oneshot(req).await.unwrap();
1699 assert_eq!(resp.status(), HttpStatus::OK);
1700
1701 let cache_control = resp
1703 .headers()
1704 .get(header::CACHE_CONTROL)
1705 .unwrap()
1706 .to_str()
1707 .unwrap();
1708 assert_eq!(cache_control, "public, immutable, max-age=31536000");
1709
1710 let etag = resp.headers().get(header::ETAG).unwrap().to_str().unwrap();
1712 assert_eq!(etag, format!("\"{blob_hex}\""));
1713
1714 let body = resp.into_body().collect().await.unwrap().to_bytes();
1716 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1717
1718 assert_eq!(json["id"], blob_hex);
1719 assert_eq!(json["author_did"], "did:dht:alice");
1720 assert_eq!(json["key_epoch"], 0);
1721
1722 let content_b64 = json["content"].as_str().unwrap();
1724 let decoded = BASE64.decode(content_b64).unwrap();
1725 assert_eq!(decoded, b"single message");
1726 }
1727
1728 #[tokio::test]
1729 async fn message_conditional_get_returns_304() {
1730 let key = generate_broadcast_key("did:dht:alice");
1731 let context_id = "msg_ctx_304";
1732 let projected =
1733 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1734 let routing_id = projected.routing_id;
1735
1736 let mut projected_map = HashMap::new();
1737 projected_map.insert(routing_id, projected);
1738
1739 let storage = InMemoryBlobStorage::new();
1740
1741 let envelope = test_seal(&key, b"cached msg");
1742 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1743 let blob_id = {
1744 let mut h = Sha256::new();
1745 h.update(&blob_bytes);
1746 let r: [u8; 32] = h.finalize().into();
1747 r
1748 };
1749
1750 storage
1751 .store(routing_id, blob_id, None, 3600, blob_bytes)
1752 .await
1753 .unwrap();
1754
1755 let state = test_state_with(projected_map, storage);
1756 let router = broadcast_projection_router(state);
1757
1758 let routing_hex = hex_encode(&routing_id);
1759 let blob_hex = hex_encode(&blob_id);
1760 let etag_value = format!("\"{blob_hex}\"");
1761 let req = Request::builder()
1762 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1763 .header("If-None-Match", &etag_value)
1764 .body(Body::empty())
1765 .unwrap();
1766
1767 let resp = router.oneshot(req).await.unwrap();
1768 assert_eq!(resp.status(), HttpStatus::NOT_MODIFIED);
1769
1770 let body = resp.into_body().collect().await.unwrap().to_bytes();
1772 assert!(body.is_empty());
1773 }
1774
1775 #[tokio::test]
1776 async fn message_conditional_get_non_matching_returns_200() {
1777 let key = generate_broadcast_key("did:dht:alice");
1778 let context_id = "msg_ctx_200";
1779 let projected =
1780 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1781 let routing_id = projected.routing_id;
1782
1783 let mut projected_map = HashMap::new();
1784 projected_map.insert(routing_id, projected);
1785
1786 let storage = InMemoryBlobStorage::new();
1787
1788 let envelope = test_seal(&key, b"fresh msg");
1789 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1790 let blob_id = {
1791 let mut h = Sha256::new();
1792 h.update(&blob_bytes);
1793 let r: [u8; 32] = h.finalize().into();
1794 r
1795 };
1796
1797 storage
1798 .store(routing_id, blob_id, None, 3600, blob_bytes)
1799 .await
1800 .unwrap();
1801
1802 let state = test_state_with(projected_map, storage);
1803 let router = broadcast_projection_router(state);
1804
1805 let routing_hex = hex_encode(&routing_id);
1806 let blob_hex = hex_encode(&blob_id);
1807 let wrong_etag = format!("\"{}\"", hex_encode(&[0xFF; 32]));
1809 let req = Request::builder()
1810 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1811 .header("If-None-Match", &wrong_etag)
1812 .body(Body::empty())
1813 .unwrap();
1814
1815 let resp = router.oneshot(req).await.unwrap();
1816 assert_eq!(resp.status(), HttpStatus::OK);
1817
1818 let body = resp.into_body().collect().await.unwrap().to_bytes();
1819 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1820 assert_eq!(json["id"], blob_hex);
1821 }
1822
1823 #[tokio::test]
1824 async fn message_invalid_hex_returns_400() {
1825 let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
1826 let router = broadcast_projection_router(state);
1827
1828 let req = Request::builder()
1830 .uri("/scp/broadcast/not_valid_hex/messages/also_not_hex")
1831 .body(Body::empty())
1832 .unwrap();
1833
1834 let resp = router.oneshot(req).await.unwrap();
1835 assert_eq!(resp.status(), HttpStatus::BAD_REQUEST);
1836 }
1837
1838 #[test]
1843 fn compute_routing_id_is_sha256_of_context_id_bytes() {
1844 let context_id = "abc123deadbeef";
1845 let routing_id = compute_routing_id(context_id);
1846
1847 let mut hasher = Sha256::new();
1849 hasher.update(context_id.as_bytes());
1850 let expected: [u8; 32] = hasher.finalize().into();
1851
1852 assert_eq!(routing_id, expected);
1853 }
1854
1855 #[test]
1856 fn compute_routing_id_deterministic() {
1857 let id = "deadbeefcafe0123456789abcdef";
1858 assert_eq!(compute_routing_id(id), compute_routing_id(id));
1859 }
1860
1861 #[test]
1862 fn compute_routing_id_distinct_for_different_inputs() {
1863 let a = compute_routing_id("context_a");
1864 let b = compute_routing_id("context_b");
1865 assert_ne!(a, b);
1866 }
1867
1868 #[test]
1869 fn projected_context_new_sets_routing_id() {
1870 let key = generate_broadcast_key("did:dht:alice");
1871 let ctx = ProjectedContext::new("abc123", key, BroadcastAdmission::Open, None);
1872
1873 let expected_routing_id = compute_routing_id("abc123");
1874 assert_eq!(ctx.routing_id, expected_routing_id);
1875 assert_eq!(ctx.context_id(), "abc123");
1876 }
1877
1878 #[test]
1879 fn projected_context_new_inserts_key_at_epoch() {
1880 let key = generate_broadcast_key("did:dht:alice");
1881 let ctx = ProjectedContext::new("abc123", key, BroadcastAdmission::Open, None);
1882
1883 assert!(ctx.key_for_epoch(0).is_some());
1884 assert_eq!(ctx.keys().len(), 1);
1885 }
1886
1887 #[test]
1888 fn enable_then_disable_roundtrip() {
1889 let mut registry: HashMap<[u8; 32], ProjectedContext> = HashMap::new();
1891
1892 let context_id = "test_context_001";
1893 let key = generate_broadcast_key("did:dht:alice");
1894 let routing_id = compute_routing_id(context_id);
1895
1896 let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Open, None);
1897 registry.insert(routing_id, projected);
1898 assert!(registry.contains_key(&routing_id));
1899
1900 registry.remove(&routing_id);
1902 assert!(!registry.contains_key(&routing_id));
1903 }
1904
1905 #[test]
1906 fn multiple_epochs_stored_and_retrievable() {
1907 let key0 = generate_broadcast_key("did:dht:alice");
1908 let mut ctx =
1909 ProjectedContext::new("multi_epoch_ctx", key0, BroadcastAdmission::Open, None);
1910
1911 let key0_ref = ctx.key_for_epoch(0).expect("epoch 0 should exist");
1913 let (key1, _advance) = rotate_broadcast_key(key0_ref, 1000).expect("rotate should succeed");
1914 assert_eq!(key1.epoch(), 1);
1915 ctx.insert_key(key1);
1916
1917 let key1_ref = ctx.key_for_epoch(1).expect("epoch 1 should exist");
1919 let (key2, _advance) = rotate_broadcast_key(key1_ref, 2000).expect("rotate should succeed");
1920 assert_eq!(key2.epoch(), 2);
1921 ctx.insert_key(key2);
1922
1923 assert!(ctx.key_for_epoch(0).is_some(), "epoch 0 retained");
1925 assert!(ctx.key_for_epoch(1).is_some(), "epoch 1 retained");
1926 assert!(ctx.key_for_epoch(2).is_some(), "epoch 2 retained");
1927 assert_eq!(ctx.keys().len(), 3);
1928
1929 assert!(ctx.key_for_epoch(99).is_none());
1931 }
1932
1933 #[test]
1934 fn insert_key_replaces_existing_epoch() {
1935 let key0 = generate_broadcast_key("did:dht:alice");
1936 let mut ctx = ProjectedContext::new("replace_test", key0, BroadcastAdmission::Open, None);
1937
1938 let replacement = generate_broadcast_key("did:dht:alice");
1940 ctx.insert_key(replacement);
1941
1942 assert_eq!(ctx.keys().len(), 1);
1943 assert!(ctx.key_for_epoch(0).is_some());
1944 }
1945
1946 #[test]
1947 fn retain_only_epochs_keeps_specified_purges_rest() {
1948 let key0 = generate_broadcast_key("did:dht:alice");
1949 let mut ctx = ProjectedContext::new("purge_test", key0, BroadcastAdmission::Open, None);
1950
1951 let k0 = ctx.key_for_epoch(0).unwrap().clone();
1953 let (k1, _) = rotate_broadcast_key(&k0, 1000).unwrap();
1954 ctx.insert_key(k1.clone());
1955 let (k2, _) = rotate_broadcast_key(&k1, 2000).unwrap();
1956 ctx.insert_key(k2);
1957 assert_eq!(ctx.keys().len(), 3);
1958
1959 let retain = std::collections::HashSet::from([2]);
1962 ctx.retain_only_epochs(&retain);
1963
1964 assert!(ctx.key_for_epoch(0).is_none(), "epoch 0 purged");
1965 assert!(ctx.key_for_epoch(1).is_none(), "epoch 1 purged");
1966 assert!(ctx.key_for_epoch(2).is_some(), "epoch 2 retained");
1967 assert_eq!(ctx.keys().len(), 1);
1968 }
1969
1970 #[test]
1971 fn retain_only_epochs_handles_divergent_authors() {
1972 let alice_key = generate_broadcast_key("did:dht:alice");
1975 let mut ctx = ProjectedContext::new(
1976 "divergent_test",
1977 alice_key.clone(),
1978 BroadcastAdmission::Open,
1979 None,
1980 );
1981
1982 let (a1, _) = rotate_broadcast_key(&alice_key, 1000).unwrap();
1984 ctx.insert_key(a1.clone());
1985 let (a2, _) = rotate_broadcast_key(&a1, 2000).unwrap();
1986 ctx.insert_key(a2.clone());
1987 let (a3, _) = rotate_broadcast_key(&a2, 3000).unwrap();
1988 ctx.insert_key(a3);
1989
1990 let bob_key = generate_broadcast_key("did:dht:bob");
1993 let (b1, _) = rotate_broadcast_key(&bob_key, 1000).unwrap();
1994 let (b2, _) = rotate_broadcast_key(&b1, 2000).unwrap();
1995 let (b3, _) = rotate_broadcast_key(&b2, 3000).unwrap();
1996 let (b4, _) = rotate_broadcast_key(&b3, 4000).unwrap();
1997 ctx.insert_key(b4.clone());
1998 let (b5, _) = rotate_broadcast_key(&b4, 5000).unwrap();
1999 ctx.insert_key(b5);
2000
2001 assert_eq!(ctx.keys().len(), 6); let retain = std::collections::HashSet::from([3, 5]);
2005 ctx.retain_only_epochs(&retain);
2006
2007 assert_eq!(ctx.keys().len(), 2);
2008 assert!(ctx.key_for_epoch(3).is_some(), "alice new epoch retained");
2009 assert!(ctx.key_for_epoch(5).is_some(), "bob new epoch retained");
2010 assert!(ctx.key_for_epoch(0).is_none(), "old alice epoch purged");
2011 assert!(ctx.key_for_epoch(1).is_none(), "old alice epoch purged");
2012 assert!(ctx.key_for_epoch(2).is_none(), "old alice epoch purged");
2013 assert!(ctx.key_for_epoch(4).is_none(), "old bob epoch purged");
2014 }
2015
2016 #[tokio::test]
2021 #[allow(clippy::similar_names)]
2022 async fn message_cross_context_routing_id_mismatch_returns_404() {
2023 let key_a = generate_broadcast_key("did:dht:alice");
2025 let key_b = generate_broadcast_key("did:dht:bob");
2026 let ctx_a =
2027 ProjectedContext::new("context_a", key_a.clone(), BroadcastAdmission::Open, None);
2028 let ctx_b = ProjectedContext::new("context_b", key_b, BroadcastAdmission::Open, None);
2029 let routing_id_a = ctx_a.routing_id;
2030 let routing_id_b = ctx_b.routing_id;
2031
2032 let mut projected_map = HashMap::new();
2033 projected_map.insert(routing_id_a, ctx_a);
2034 projected_map.insert(routing_id_b, ctx_b);
2035
2036 let storage = InMemoryBlobStorage::new();
2037
2038 let envelope = test_seal(&key_a, b"belongs to context_a");
2040 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2041 let blob_id = {
2042 let mut h = Sha256::new();
2043 h.update(&blob_bytes);
2044 let r: [u8; 32] = h.finalize().into();
2045 r
2046 };
2047 storage
2048 .store(routing_id_a, blob_id, None, 3600, blob_bytes)
2049 .await
2050 .unwrap();
2051
2052 let state = test_state_with(projected_map, storage);
2053
2054 let router_b = broadcast_projection_router(Arc::clone(&state));
2056 let routing_b_hex = hex_encode(&routing_id_b);
2057 let blob_hex = hex_encode(&blob_id);
2058 let req = Request::builder()
2059 .uri(format!(
2060 "/scp/broadcast/{routing_b_hex}/messages/{blob_hex}"
2061 ))
2062 .body(Body::empty())
2063 .unwrap();
2064
2065 let resp = router_b.oneshot(req).await.unwrap();
2066 assert_eq!(
2067 resp.status(),
2068 HttpStatus::NOT_FOUND,
2069 "cross-context request must return 404, not leak blob existence"
2070 );
2071
2072 let body = resp.into_body().collect().await.unwrap().to_bytes();
2073 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2074 assert_eq!(
2075 json["error"], "unknown blob_id",
2076 "error message must be indistinguishable from genuinely missing blob"
2077 );
2078
2079 let router_a = broadcast_projection_router(state);
2081 let routing_a_hex = hex_encode(&routing_id_a);
2082 let req = Request::builder()
2083 .uri(format!(
2084 "/scp/broadcast/{routing_a_hex}/messages/{blob_hex}"
2085 ))
2086 .body(Body::empty())
2087 .unwrap();
2088
2089 let resp = router_a.oneshot(req).await.unwrap();
2090 assert_eq!(
2091 resp.status(),
2092 HttpStatus::OK,
2093 "correct context should return 200"
2094 );
2095 }
2096
2097 #[tokio::test]
2102 async fn feed_since_parameter_filters_messages() {
2103 let key = generate_broadcast_key("did:dht:alice");
2104 let context_id = "since_ctx";
2105 let projected =
2106 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
2107 let routing_id = projected.routing_id;
2108
2109 let mut projected_map = HashMap::new();
2110 projected_map.insert(routing_id, projected);
2111
2112 let storage = InMemoryBlobStorage::new();
2113
2114 let mut blob_ids = Vec::new();
2118 for i in 0u8..3 {
2119 let envelope = test_seal(&key, &[i; 16]);
2120 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2121 let blob_id = {
2122 let mut h = Sha256::new();
2123 h.update(&blob_bytes);
2124 let r: [u8; 32] = h.finalize().into();
2125 r
2126 };
2127 storage
2128 .store(routing_id, blob_id, None, 3600, blob_bytes)
2129 .await
2130 .unwrap();
2131 blob_ids.push(blob_id);
2132 }
2133
2134 let state = test_state_with(projected_map, storage);
2135 let routing_hex = hex_encode(&routing_id);
2136
2137 let router = broadcast_projection_router(Arc::clone(&state));
2142 let since_hex = hex_encode(&blob_ids[0]);
2143 let req = Request::builder()
2144 .uri(format!(
2145 "/scp/broadcast/{routing_hex}/feed?since={since_hex}"
2146 ))
2147 .body(Body::empty())
2148 .unwrap();
2149
2150 let resp = router.oneshot(req).await.unwrap();
2151 assert_eq!(resp.status(), HttpStatus::OK);
2152
2153 let router = broadcast_projection_router(Arc::clone(&state));
2157 let nonexistent = hex_encode(&[0xFF; 32]);
2158 let req = Request::builder()
2159 .uri(format!(
2160 "/scp/broadcast/{routing_hex}/feed?since={nonexistent}"
2161 ))
2162 .body(Body::empty())
2163 .unwrap();
2164
2165 let resp = router.oneshot(req).await.unwrap();
2166 assert_eq!(resp.status(), HttpStatus::OK);
2167 let body = resp.into_body().collect().await.unwrap().to_bytes();
2168 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2169 let messages = json["messages"].as_array().unwrap();
2170 assert_eq!(
2171 messages.len(),
2172 0,
2173 "nonexistent since blob_id should return empty feed (cross-context oracle prevention)"
2174 );
2175
2176 let router = broadcast_projection_router(state);
2178 let req = Request::builder()
2179 .uri(format!(
2180 "/scp/broadcast/{routing_hex}/feed?since=not_valid_hex"
2181 ))
2182 .body(Body::empty())
2183 .unwrap();
2184
2185 let resp = router.oneshot(req).await.unwrap();
2186 assert_eq!(resp.status(), HttpStatus::BAD_REQUEST);
2187
2188 let body = resp.into_body().collect().await.unwrap().to_bytes();
2189 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2190 assert_eq!(json["code"], "BAD_REQUEST");
2191 }
2192
2193 #[tokio::test]
2198 async fn feed_multi_epoch_decryption() {
2199 let key0 = generate_broadcast_key("did:dht:alice");
2200 let (key1, _advance) = rotate_broadcast_key(&key0, 1000).unwrap();
2201
2202 let context_id = "multi_epoch_feed_ctx";
2203 let mut projected =
2204 ProjectedContext::new(context_id, key0.clone(), BroadcastAdmission::Open, None);
2205 projected.insert_key(key1.clone());
2206 let routing_id = projected.routing_id;
2207
2208 let mut projected_map = HashMap::new();
2209 projected_map.insert(routing_id, projected);
2210
2211 let storage = InMemoryBlobStorage::new();
2212
2213 let envelope_0 = test_seal(&key0, b"epoch zero message");
2215 let blob_bytes_0 = rmp_serde::to_vec(&envelope_0).unwrap();
2216 let blob_id_0 = {
2217 let mut h = Sha256::new();
2218 h.update(&blob_bytes_0);
2219 let r: [u8; 32] = h.finalize().into();
2220 r
2221 };
2222 storage
2223 .store(routing_id, blob_id_0, None, 3600, blob_bytes_0)
2224 .await
2225 .unwrap();
2226
2227 let envelope_1 = test_seal(&key1, b"epoch one message");
2229 let blob_bytes_1 = rmp_serde::to_vec(&envelope_1).unwrap();
2230 let blob_id_1 = {
2231 let mut h = Sha256::new();
2232 h.update(&blob_bytes_1);
2233 let r: [u8; 32] = h.finalize().into();
2234 r
2235 };
2236 storage
2237 .store(routing_id, blob_id_1, None, 3600, blob_bytes_1)
2238 .await
2239 .unwrap();
2240
2241 let state = test_state_with(projected_map, storage);
2242 let router = broadcast_projection_router(state);
2243
2244 let routing_hex = hex_encode(&routing_id);
2245 let req = Request::builder()
2246 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2247 .body(Body::empty())
2248 .unwrap();
2249
2250 let resp = router.oneshot(req).await.unwrap();
2251 assert_eq!(resp.status(), HttpStatus::OK);
2252
2253 let body = resp.into_body().collect().await.unwrap().to_bytes();
2254 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2255 let messages = json["messages"].as_array().unwrap();
2256 assert_eq!(
2257 messages.len(),
2258 2,
2259 "both epoch-0 and epoch-1 messages returned"
2260 );
2261
2262 let contents: Vec<String> = messages
2264 .iter()
2265 .map(|m| {
2266 let b64 = m["content"].as_str().unwrap();
2267 String::from_utf8(BASE64.decode(b64).unwrap()).unwrap()
2268 })
2269 .collect();
2270 assert!(contents.contains(&"epoch zero message".to_owned()));
2271 assert!(contents.contains(&"epoch one message".to_owned()));
2272
2273 let epochs: Vec<u64> = messages
2275 .iter()
2276 .map(|m| m["key_epoch"].as_u64().unwrap())
2277 .collect();
2278 assert!(epochs.contains(&0));
2279 assert!(epochs.contains(&1));
2280 }
2281
2282 #[tokio::test]
2287 async fn message_tampered_ciphertext_returns_500() {
2288 let key = generate_broadcast_key("did:dht:alice");
2289 let context_id = "tamper_ctx";
2290 let projected =
2291 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
2292 let routing_id = projected.routing_id;
2293
2294 let mut projected_map = HashMap::new();
2295 projected_map.insert(routing_id, projected);
2296
2297 let storage = InMemoryBlobStorage::new();
2298
2299 let mut envelope = test_seal(&key, b"tamper target");
2301 if envelope.encrypted_content.len() > 13 {
2303 envelope.encrypted_content[13] ^= 0xFF;
2304 }
2305 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2306 let blob_id = {
2307 let mut h = Sha256::new();
2308 h.update(&blob_bytes);
2309 let r: [u8; 32] = h.finalize().into();
2310 r
2311 };
2312 storage
2313 .store(routing_id, blob_id, None, 3600, blob_bytes)
2314 .await
2315 .unwrap();
2316
2317 let valid_envelope = test_seal(&key, b"valid message");
2319 let valid_blob_bytes = rmp_serde::to_vec(&valid_envelope).unwrap();
2320 let valid_blob_id = {
2321 let mut h = Sha256::new();
2322 h.update(&valid_blob_bytes);
2323 let r: [u8; 32] = h.finalize().into();
2324 r
2325 };
2326 storage
2327 .store(routing_id, valid_blob_id, None, 3600, valid_blob_bytes)
2328 .await
2329 .unwrap();
2330
2331 let state = test_state_with(projected_map, storage);
2332
2333 let router = broadcast_projection_router(Arc::clone(&state));
2335 let routing_hex = hex_encode(&routing_id);
2336 let blob_hex = hex_encode(&blob_id);
2337 let req = Request::builder()
2338 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
2339 .body(Body::empty())
2340 .unwrap();
2341
2342 let resp = router.oneshot(req).await.unwrap();
2343 assert_eq!(resp.status(), HttpStatus::INTERNAL_SERVER_ERROR);
2344
2345 let body = resp.into_body().collect().await.unwrap().to_bytes();
2346 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2347 assert_eq!(json["error"], "decryption failure");
2348
2349 let router = broadcast_projection_router(state);
2352 let req = Request::builder()
2353 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2354 .body(Body::empty())
2355 .unwrap();
2356
2357 let resp = router.oneshot(req).await.unwrap();
2358 assert_eq!(resp.status(), HttpStatus::OK);
2359
2360 let body = resp.into_body().collect().await.unwrap().to_bytes();
2361 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2362 let messages = json["messages"].as_array().unwrap();
2363 assert_eq!(
2364 messages.len(),
2365 1,
2366 "tampered message should be skipped, valid message returned"
2367 );
2368
2369 let content_b64 = messages[0]["content"].as_str().unwrap();
2370 let decoded = BASE64.decode(content_b64).unwrap();
2371 assert_eq!(decoded, b"valid message");
2372 }
2373
2374 #[tokio::test]
2379 async fn message_purged_epoch_returns_410_gone() {
2380 let key = generate_broadcast_key("did:dht:alice");
2384 let context_id = "purge_410_ctx";
2385 let mut projected =
2386 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
2387
2388 let (key1, _) = rotate_broadcast_key(&key, 1000).unwrap();
2390 projected.insert_key(key1);
2391 let retain = std::collections::HashSet::from([1]);
2392 projected.retain_only_epochs(&retain);
2393 assert!(projected.key_for_epoch(0).is_none());
2394 assert!(projected.key_for_epoch(1).is_some());
2395
2396 let routing_id = projected.routing_id;
2397 let mut projected_map = HashMap::new();
2398 projected_map.insert(routing_id, projected);
2399
2400 let storage = InMemoryBlobStorage::new();
2401
2402 let envelope = test_seal(&key, b"old content");
2404 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2405 let blob_id = {
2406 let mut h = Sha256::new();
2407 h.update(&blob_bytes);
2408 let r: [u8; 32] = h.finalize().into();
2409 r
2410 };
2411 storage
2412 .store(routing_id, blob_id, None, 3600, blob_bytes)
2413 .await
2414 .unwrap();
2415
2416 let state = test_state_with(projected_map, storage);
2417 let router = broadcast_projection_router(state);
2418 let routing_hex = hex_encode(&routing_id);
2419 let blob_hex = hex_encode(&blob_id);
2420 let req = Request::builder()
2421 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
2422 .body(Body::empty())
2423 .unwrap();
2424
2425 let resp = router.oneshot(req).await.unwrap();
2426 assert_eq!(resp.status(), HttpStatus::GONE);
2427
2428 let body = resp.into_body().collect().await.unwrap().to_bytes();
2429 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2430 assert_eq!(json["error"], "content revoked");
2431 assert_eq!(json["code"], "GONE");
2432 }
2433
2434 #[tokio::test]
2439 async fn message_conditional_get_cross_context_returns_404_not_304() {
2440 let key_a = generate_broadcast_key("did:dht:alice");
2445 let key_b = generate_broadcast_key("did:dht:bob");
2446 let ctx_a =
2447 ProjectedContext::new("ctx_a_304", key_a.clone(), BroadcastAdmission::Open, None);
2448 let ctx_b = ProjectedContext::new("ctx_b_304", key_b, BroadcastAdmission::Open, None);
2449 let routing_id_a = ctx_a.routing_id;
2450 let routing_id_b = ctx_b.routing_id;
2451
2452 let mut projected_map = HashMap::new();
2453 projected_map.insert(routing_id_a, ctx_a);
2454 projected_map.insert(routing_id_b, ctx_b);
2455
2456 let storage = InMemoryBlobStorage::new();
2457
2458 let envelope = test_seal(&key_a, b"secret of A");
2460 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2461 let blob_id = {
2462 let mut h = Sha256::new();
2463 h.update(&blob_bytes);
2464 let r: [u8; 32] = h.finalize().into();
2465 r
2466 };
2467 storage
2468 .store(routing_id_a, blob_id, None, 3600, blob_bytes)
2469 .await
2470 .unwrap();
2471
2472 let state = test_state_with(projected_map, storage);
2473 let router = broadcast_projection_router(state);
2474
2475 let routing_b_hex = hex_encode(&routing_id_b);
2477 let blob_hex = hex_encode(&blob_id);
2478 let etag_value = format!("\"{blob_hex}\"");
2479 let req = Request::builder()
2480 .uri(format!(
2481 "/scp/broadcast/{routing_b_hex}/messages/{blob_hex}"
2482 ))
2483 .header("If-None-Match", &etag_value)
2484 .body(Body::empty())
2485 .unwrap();
2486
2487 let resp = router.oneshot(req).await.unwrap();
2488 assert_eq!(
2489 resp.status(),
2490 HttpStatus::NOT_FOUND,
2491 "cross-context conditional GET must return 404, not 304"
2492 );
2493
2494 let body = resp.into_body().collect().await.unwrap().to_bytes();
2495 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2496 assert_eq!(json["error"], "unknown blob_id");
2497 }
2498
2499 #[tokio::test]
2504 async fn rate_limit_returns_429_when_exceeded() {
2505 let state = test_state_with_rate(HashMap::new(), InMemoryBlobStorage::new(), 2);
2507 let routing_hex = hex_encode(&[0xAA; 32]);
2508 let uri = format!("/scp/broadcast/{routing_hex}/feed");
2509
2510 for i in 0..2 {
2512 let router = broadcast_projection_router(Arc::clone(&state));
2513 let req = Request::builder().uri(&uri).body(Body::empty()).unwrap();
2514 let resp = router.oneshot(req).await.unwrap();
2515 assert_ne!(
2516 resp.status(),
2517 HttpStatus::TOO_MANY_REQUESTS,
2518 "request {i} should not be rate-limited"
2519 );
2520 }
2521
2522 let router = broadcast_projection_router(Arc::clone(&state));
2524 let req = Request::builder().uri(&uri).body(Body::empty()).unwrap();
2525 let resp = router.oneshot(req).await.unwrap();
2526 assert_eq!(
2527 resp.status(),
2528 HttpStatus::TOO_MANY_REQUESTS,
2529 "third request should be rate-limited"
2530 );
2531 }
2532
2533 #[tokio::test]
2534 async fn rate_limit_allows_different_ips() {
2535 let limiter = scp_transport::relay::rate_limit::PublishRateLimiter::new(1);
2537 let ip_a: std::net::IpAddr = "10.0.0.1".parse().unwrap();
2538 let ip_b: std::net::IpAddr = "10.0.0.2".parse().unwrap();
2539
2540 assert!(limiter.check(ip_a).await, "ip_a first request should pass");
2542 assert!(limiter.check(ip_b).await, "ip_b first request should pass");
2543
2544 assert!(
2546 !limiter.check(ip_a).await,
2547 "ip_a second request should be rate-limited"
2548 );
2549
2550 assert!(
2552 !limiter.check(ip_b).await,
2553 "ip_b second request should be rate-limited"
2554 );
2555 }
2556
2557 use scp_core::context::params::{ProjectionOverride, ProjectionPolicy, ProjectionRule};
2562 use scp_identity::DID;
2563
2564 #[tokio::test]
2565 async fn feed_open_context_serves_without_auth() {
2566 let key = generate_broadcast_key("did:dht:alice");
2567 let context_id = "open_no_auth_ctx";
2568 let projected =
2569 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
2570 let routing_id = projected.routing_id;
2571
2572 let mut projected_map = HashMap::new();
2573 projected_map.insert(routing_id, projected);
2574
2575 let storage = InMemoryBlobStorage::new();
2576
2577 let envelope = test_seal(&key, b"public content");
2579 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2580 let blob_id = {
2581 let mut h = Sha256::new();
2582 h.update(&blob_bytes);
2583 let r: [u8; 32] = h.finalize().into();
2584 r
2585 };
2586 storage
2587 .store(routing_id, blob_id, None, 3600, blob_bytes)
2588 .await
2589 .unwrap();
2590
2591 let state = test_state_with(projected_map, storage);
2592 let router = broadcast_projection_router(state);
2593
2594 let routing_hex = hex_encode(&routing_id);
2596 let req = Request::builder()
2597 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2598 .body(Body::empty())
2599 .unwrap();
2600
2601 let resp = router.oneshot(req).await.unwrap();
2602 assert_eq!(resp.status(), HttpStatus::OK);
2603
2604 let cache_control = resp
2606 .headers()
2607 .get(header::CACHE_CONTROL)
2608 .unwrap()
2609 .to_str()
2610 .unwrap();
2611 assert_eq!(
2612 cache_control,
2613 "public, max-age=30, stale-while-revalidate=300"
2614 );
2615 }
2616
2617 #[tokio::test]
2618 async fn feed_gated_context_rejects_without_auth() {
2619 let key = generate_broadcast_key("did:dht:alice");
2620 let context_id = "gated_no_auth_ctx";
2621 let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Gated, None);
2622 let routing_id = projected.routing_id;
2623
2624 let mut projected_map = HashMap::new();
2625 projected_map.insert(routing_id, projected);
2626
2627 let state = test_state_with(projected_map, InMemoryBlobStorage::new());
2628 let router = broadcast_projection_router(state);
2629
2630 let routing_hex = hex_encode(&routing_id);
2631 let req = Request::builder()
2632 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2633 .body(Body::empty())
2634 .unwrap();
2635
2636 let resp = router.oneshot(req).await.unwrap();
2637 assert_eq!(resp.status(), HttpStatus::UNAUTHORIZED);
2638
2639 let body = resp.into_body().collect().await.unwrap().to_bytes();
2640 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2641 assert_eq!(json["error"], "Authorization required for gated broadcast");
2642 assert_eq!(json["code"], "UNAUTHORIZED");
2643 }
2644
2645 #[tokio::test]
2646 async fn feed_gated_context_rejects_malformed_auth() {
2647 let key = generate_broadcast_key("did:dht:alice");
2648 let context_id = "gated_bad_auth_ctx";
2649 let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Gated, None);
2650 let routing_id = projected.routing_id;
2651
2652 let mut projected_map = HashMap::new();
2653 projected_map.insert(routing_id, projected);
2654
2655 let state = test_state_with(projected_map, InMemoryBlobStorage::new());
2656 let router = broadcast_projection_router(state);
2657
2658 let routing_hex = hex_encode(&routing_id);
2659
2660 let req = Request::builder()
2662 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2663 .header("Authorization", "Basic dXNlcjpwYXNz")
2664 .body(Body::empty())
2665 .unwrap();
2666
2667 let resp = router.oneshot(req).await.unwrap();
2668 assert_eq!(resp.status(), HttpStatus::UNAUTHORIZED);
2669
2670 let router = broadcast_projection_router(test_state_with(
2672 {
2673 let mut m = HashMap::new();
2674 let key2 = generate_broadcast_key("did:dht:alice");
2675 let p2 = ProjectedContext::new(context_id, key2, BroadcastAdmission::Gated, None);
2676 m.insert(p2.routing_id, p2);
2677 m
2678 },
2679 InMemoryBlobStorage::new(),
2680 ));
2681
2682 let req = Request::builder()
2683 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2684 .header("Authorization", "Bearer not-a-valid-jwt")
2685 .body(Body::empty())
2686 .unwrap();
2687
2688 let resp = router.oneshot(req).await.unwrap();
2689 assert_eq!(resp.status(), HttpStatus::UNAUTHORIZED);
2690
2691 let body = resp.into_body().collect().await.unwrap().to_bytes();
2692 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2693 assert_eq!(json["code"], "UNAUTHORIZED");
2694 }
2695
2696 #[tokio::test]
2697 async fn message_gated_context_rejects_without_auth() {
2698 let key = generate_broadcast_key("did:dht:alice");
2699 let context_id = "gated_msg_no_auth";
2700 let projected =
2701 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Gated, None);
2702 let routing_id = projected.routing_id;
2703
2704 let mut projected_map = HashMap::new();
2705 projected_map.insert(routing_id, projected);
2706
2707 let storage = InMemoryBlobStorage::new();
2708
2709 let envelope = test_seal(&key, b"secret content");
2710 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2711 let blob_id = {
2712 let mut h = Sha256::new();
2713 h.update(&blob_bytes);
2714 let r: [u8; 32] = h.finalize().into();
2715 r
2716 };
2717 storage
2718 .store(routing_id, blob_id, None, 3600, blob_bytes)
2719 .await
2720 .unwrap();
2721
2722 let state = test_state_with(projected_map, storage);
2723 let router = broadcast_projection_router(state);
2724
2725 let routing_hex = hex_encode(&routing_id);
2726 let blob_hex = hex_encode(&blob_id);
2727 let req = Request::builder()
2728 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
2729 .body(Body::empty())
2730 .unwrap();
2731
2732 let resp = router.oneshot(req).await.unwrap();
2733 assert_eq!(resp.status(), HttpStatus::UNAUTHORIZED);
2734
2735 let body = resp.into_body().collect().await.unwrap().to_bytes();
2736 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2737 assert_eq!(json["error"], "Authorization required for gated broadcast");
2738 assert_eq!(json["code"], "UNAUTHORIZED");
2739 }
2740
2741 #[tokio::test]
2742 async fn gated_context_returns_private_cache_headers() {
2743 let key = generate_broadcast_key("did:dht:alice");
2745 let context_id = "gated_cache_ctx";
2746 let projected =
2747 ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Gated, None);
2748 let routing_id = projected.routing_id;
2749
2750 let mut projected_map = HashMap::new();
2751 projected_map.insert(routing_id, projected);
2752
2753 let storage = InMemoryBlobStorage::new();
2754
2755 let envelope = test_seal(&key, b"gated content");
2756 let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2757 let blob_id = {
2758 let mut h = Sha256::new();
2759 h.update(&blob_bytes);
2760 let r: [u8; 32] = h.finalize().into();
2761 r
2762 };
2763 storage
2764 .store(routing_id, blob_id, None, 3600, blob_bytes)
2765 .await
2766 .unwrap();
2767
2768 let state = test_state_with(projected_map, storage);
2769
2770 let ucan_token = build_test_ucan(context_id);
2772
2773 let router = broadcast_projection_router(Arc::clone(&state));
2775 let routing_hex = hex_encode(&routing_id);
2776 let req = Request::builder()
2777 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2778 .header("Authorization", format!("Bearer {ucan_token}"))
2779 .body(Body::empty())
2780 .unwrap();
2781
2782 let resp = router.oneshot(req).await.unwrap();
2783 assert_eq!(resp.status(), HttpStatus::OK);
2784
2785 let cache_control = resp
2786 .headers()
2787 .get(header::CACHE_CONTROL)
2788 .unwrap()
2789 .to_str()
2790 .unwrap();
2791 assert_eq!(cache_control, "private, max-age=30");
2792
2793 let router = broadcast_projection_router(state);
2795 let blob_hex = hex_encode(&blob_id);
2796 let req = Request::builder()
2797 .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
2798 .header("Authorization", format!("Bearer {ucan_token}"))
2799 .body(Body::empty())
2800 .unwrap();
2801
2802 let resp = router.oneshot(req).await.unwrap();
2803 assert_eq!(resp.status(), HttpStatus::OK);
2804
2805 let cache_control = resp
2806 .headers()
2807 .get(header::CACHE_CONTROL)
2808 .unwrap()
2809 .to_str()
2810 .unwrap();
2811 assert_eq!(cache_control, "private, immutable, max-age=31536000");
2812 }
2813
#[test]
/// With no policy and no author, an `Open` admission resolves to
/// `ProjectionRule::Public`.
fn effective_rule_open_no_policy() {
    assert_eq!(
        effective_projection_rule(BroadcastAdmission::Open, None, None),
        ProjectionRule::Public
    );
}
2823
#[test]
/// With no policy and no author, a `Gated` admission resolves to
/// `ProjectionRule::Gated`.
fn effective_rule_gated_no_policy() {
    assert_eq!(
        effective_projection_rule(BroadcastAdmission::Gated, None, None),
        ProjectionRule::Gated
    );
}
2829
#[test]
/// A policy's `default_rule` takes precedence over the admission mode: an
/// `Open` context with a `Gated` default and no overrides resolves to `Gated`.
fn effective_rule_with_policy_default() {
    let gated_by_default = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: Vec::new(),
    };

    let resolved =
        effective_projection_rule(BroadcastAdmission::Open, Some(&gated_by_default), None);

    assert_eq!(resolved, ProjectionRule::Gated);
}
2839
#[test]
/// With a `Gated` default and a `Public` override for
/// `did:dht:special_author`, an unrelated author resolves to the default
/// while the overridden author resolves to `Public`.
fn effective_rule_per_author_override() {
    let policy = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: vec![ProjectionOverride {
            did: DID::from("did:dht:special_author"),
            rule: ProjectionRule::Public,
        }],
    };

    // (author DID, expected rule) — the non-matching author is checked first.
    for (author, expected) in [
        ("did:dht:other", ProjectionRule::Gated),
        ("did:dht:special_author", ProjectionRule::Public),
    ] {
        let resolved =
            effective_projection_rule(BroadcastAdmission::Open, Some(&policy), Some(author));
        assert_eq!(resolved, expected);
    }
}
2865
#[test]
/// When no author DID is supplied, per-author overrides are not consulted and
/// the policy's `default_rule` (`Gated` here) is returned.
fn effective_rule_no_author_uses_default() {
    let public_for_someone = ProjectionOverride {
        did: DID::from("did:dht:someone"),
        rule: ProjectionRule::Public,
    };
    let policy = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: vec![public_for_someone],
    };

    assert_eq!(
        effective_projection_rule(BroadcastAdmission::Open, Some(&policy), None),
        ProjectionRule::Gated
    );
}
2879
#[test]
/// A `Gated` context combined with a policy whose `default_rule` is `Public`
/// is rejected with the exact error text asserted below.
fn validate_policy_rejects_public_default_on_gated() {
    let public_default = ProjectionPolicy {
        default_rule: ProjectionRule::Public,
        overrides: Vec::new(),
    };

    let outcome = validate_projection_policy(BroadcastAdmission::Gated, Some(&public_default));

    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert_eq!(message, "gated context cannot have public projection rule");
}
2897
#[test]
/// A `Gated` context whose policy default is `Gated` but which carries a
/// `Public` per-author override is rejected with the exact error text
/// asserted below.
fn validate_policy_rejects_public_override_on_gated() {
    let public_override = ProjectionOverride {
        did: DID::from("did:dht:some_author"),
        rule: ProjectionRule::Public,
    };
    let policy = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: vec![public_override],
    };

    let outcome = validate_projection_policy(BroadcastAdmission::Gated, Some(&policy));

    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert_eq!(
        message,
        "gated context cannot have public per-author projection override"
    );
}
2914
#[test]
/// A `Gated` context with a `Gated` default rule and no overrides validates
/// successfully.
fn validate_policy_accepts_gated_default_on_gated() {
    let gated_default = ProjectionPolicy {
        default_rule: ProjectionRule::Gated,
        overrides: Vec::new(),
    };

    let outcome = validate_projection_policy(BroadcastAdmission::Gated, Some(&gated_default));

    assert!(outcome.is_ok());
}
2923
#[test]
/// An `Open` context with a `Public` default rule and no overrides validates
/// successfully.
fn validate_policy_accepts_public_on_open() {
    let public_default = ProjectionPolicy {
        default_rule: ProjectionRule::Public,
        overrides: Vec::new(),
    };

    let outcome = validate_projection_policy(BroadcastAdmission::Open, Some(&public_default));

    assert!(outcome.is_ok());
}
2932
#[test]
/// A `Gated` context with no projection policy at all validates successfully.
fn validate_policy_accepts_none_on_gated() {
    let outcome = validate_projection_policy(BroadcastAdmission::Gated, None);
    assert!(outcome.is_ok());
}
2937
2938 #[tokio::test]
2943 async fn feed_filters_per_author_gated_messages_without_auth() {
2944 use scp_core::crypto::sender_keys::broadcast::rotate_broadcast_key;
2945
2946 let alice_key = generate_broadcast_key("did:dht:alice");
2949 let bob_key_epoch0 = generate_broadcast_key("did:dht:bob");
2950 let (bob_key, _) = rotate_broadcast_key(&bob_key_epoch0, 1_000).unwrap();
2951 let context_id = "feed_per_author_filter_ctx";
2952
2953 let policy = ProjectionPolicy {
2954 default_rule: ProjectionRule::Public,
2955 overrides: vec![ProjectionOverride {
2956 did: DID::from("did:dht:alice"),
2957 rule: ProjectionRule::Gated,
2958 }],
2959 };
2960
2961 let mut projected = ProjectedContext::new(
2962 context_id,
2963 alice_key.clone(),
2964 BroadcastAdmission::Open,
2965 Some(policy),
2966 );
2967 projected.keys.insert(bob_key.epoch(), bob_key.clone());
2968 let routing_id = projected.routing_id;
2969
2970 let mut projected_map = HashMap::new();
2971 projected_map.insert(routing_id, projected);
2972
2973 let storage = InMemoryBlobStorage::new();
2974
2975 let alice_env = test_seal(&alice_key, b"alice secret");
2977 let alice_bytes = rmp_serde::to_vec(&alice_env).unwrap();
2978 let alice_blob_id = {
2979 let mut h = Sha256::new();
2980 h.update(&alice_bytes);
2981 let r: [u8; 32] = h.finalize().into();
2982 r
2983 };
2984 storage
2985 .store(routing_id, alice_blob_id, None, 3600, alice_bytes)
2986 .await
2987 .unwrap();
2988
2989 let bob_env = test_seal(&bob_key, b"bob public");
2991 let bob_bytes = rmp_serde::to_vec(&bob_env).unwrap();
2992 let bob_blob_id = {
2993 let mut h = Sha256::new();
2994 h.update(&bob_bytes);
2995 let r: [u8; 32] = h.finalize().into();
2996 r
2997 };
2998 storage
2999 .store(routing_id, bob_blob_id, None, 3600, bob_bytes)
3000 .await
3001 .unwrap();
3002
3003 let state = test_state_with(projected_map, storage);
3004 let routing_hex = hex_encode(&routing_id);
3005
3006 let router = broadcast_projection_router(Arc::clone(&state));
3008 let req = Request::builder()
3009 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
3010 .body(Body::empty())
3011 .unwrap();
3012
3013 let resp = router.oneshot(req).await.unwrap();
3014 assert_eq!(resp.status(), HttpStatus::OK);
3015
3016 let cache_control = resp
3018 .headers()
3019 .get(header::CACHE_CONTROL)
3020 .unwrap()
3021 .to_str()
3022 .unwrap();
3023 assert_eq!(cache_control, "private, max-age=30");
3024
3025 let body = resp.into_body().collect().await.unwrap().to_bytes();
3026 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3027 let messages = json["messages"].as_array().unwrap();
3028
3029 assert_eq!(
3031 messages.len(),
3032 1,
3033 "feed should filter alice's gated message"
3034 );
3035 assert_eq!(messages[0]["author_did"], "did:dht:bob");
3036
3037 let ucan_token = build_test_ucan(context_id);
3039 let router = broadcast_projection_router(state);
3040 let req = Request::builder()
3041 .uri(format!("/scp/broadcast/{routing_hex}/feed"))
3042 .header("Authorization", format!("Bearer {ucan_token}"))
3043 .body(Body::empty())
3044 .unwrap();
3045
3046 let resp = router.oneshot(req).await.unwrap();
3047 assert_eq!(resp.status(), HttpStatus::OK);
3048
3049 let body = resp.into_body().collect().await.unwrap().to_bytes();
3050 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3051 let messages = json["messages"].as_array().unwrap();
3052 assert_eq!(
3053 messages.len(),
3054 2,
3055 "authenticated feed should include both messages"
3056 );
3057 }
3058
#[test]
/// A token whose `exp` is `1` and which grants `messages:read` on `test_ctx`
/// must be rejected by `validate_projection_ucan`.
///
/// The token is assembled by hand as three dot-separated base64url segments:
/// JSON header, JSON payload, and 64 zero bytes in the signature position.
fn validate_ucan_rejects_expired_token() {
    use base64::engine::general_purpose::URL_SAFE_NO_PAD;
    use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};

    let read_grant = Attenuation {
        with: "scp:ctx:test_ctx/messages:read".to_owned(),
        can: "read".to_owned(),
    };
    let payload = UcanPayload {
        iss: "did:dht:test".to_owned(),
        aud: "did:dht:test".to_owned(),
        exp: 1,
        nbf: None,
        nnc: "nonce".to_owned(),
        att: vec![read_grant],
        prf: Vec::new(),
        fct: None,
    };
    let header = UcanHeader {
        alg: "EdDSA".to_owned(),
        typ: "JWT".to_owned(),
        ucv: "0.10.0".to_owned(),
        kid: None,
    };

    let segments = [
        serde_json::to_vec(&header).unwrap(),
        serde_json::to_vec(&payload).unwrap(),
        vec![0u8; 64],
    ];
    let token = segments
        .map(|bytes| URL_SAFE_NO_PAD.encode(bytes))
        .join(".");

    let outcome = validate_projection_ucan(&token, "test_ctx");
    assert!(outcome.is_err(), "expired UCAN should be rejected");
}
3101
#[test]
/// A token whose `nbf` is `u64::MAX - 1` (with `exp` at `u64::MAX`) and which
/// grants `messages:read` on `test_ctx` must be rejected by
/// `validate_projection_ucan`.
///
/// The token is assembled by hand as three dot-separated base64url segments:
/// JSON header, JSON payload, and 64 zero bytes in the signature position.
fn validate_ucan_rejects_not_yet_valid_token() {
    use base64::engine::general_purpose::URL_SAFE_NO_PAD;
    use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};

    let read_grant = Attenuation {
        with: "scp:ctx:test_ctx/messages:read".to_owned(),
        can: "read".to_owned(),
    };
    let payload = UcanPayload {
        iss: "did:dht:test".to_owned(),
        aud: "did:dht:test".to_owned(),
        exp: u64::MAX,
        nbf: Some(u64::MAX - 1),
        nnc: "nonce".to_owned(),
        att: vec![read_grant],
        prf: Vec::new(),
        fct: None,
    };
    let header = UcanHeader {
        alg: "EdDSA".to_owned(),
        typ: "JWT".to_owned(),
        ucv: "0.10.0".to_owned(),
        kid: None,
    };

    let segments = [
        serde_json::to_vec(&header).unwrap(),
        serde_json::to_vec(&payload).unwrap(),
        vec![0u8; 64],
    ];
    let token = segments
        .map(|bytes| URL_SAFE_NO_PAD.encode(bytes))
        .join(".");

    let outcome = validate_projection_ucan(&token, "test_ctx");
    assert!(outcome.is_err(), "not-yet-valid UCAN should be rejected");
}
3139
3140 #[tokio::test]
3145 async fn message_handler_enforces_per_author_gated_override_on_open_context() {
3146 use scp_core::crypto::sender_keys::broadcast::rotate_broadcast_key;
3147
3148 let alice_key = generate_broadcast_key("did:dht:alice");
3152 let bob_key_epoch0 = generate_broadcast_key("did:dht:bob");
3153 let (bob_key, _) = rotate_broadcast_key(&bob_key_epoch0, 1_000).unwrap();
3155 let context_id = "open_per_author_override_ctx";
3156
3157 let policy = ProjectionPolicy {
3158 default_rule: ProjectionRule::Public,
3159 overrides: vec![ProjectionOverride {
3160 did: DID::from("did:dht:alice"),
3161 rule: ProjectionRule::Gated,
3162 }],
3163 };
3164
3165 let mut projected = ProjectedContext::new(
3166 context_id,
3167 alice_key.clone(),
3168 BroadcastAdmission::Open,
3169 Some(policy),
3170 );
3171 projected.keys.insert(bob_key.epoch(), bob_key.clone());
3173 let routing_id = projected.routing_id;
3174
3175 let mut projected_map = HashMap::new();
3176 projected_map.insert(routing_id, projected);
3177
3178 let storage = InMemoryBlobStorage::new();
3179
3180 let alice_envelope = test_seal(&alice_key, b"alice private content");
3182 let alice_blob_bytes = rmp_serde::to_vec(&alice_envelope).unwrap();
3183 let alice_blob_id = {
3184 let mut h = Sha256::new();
3185 h.update(&alice_blob_bytes);
3186 let r: [u8; 32] = h.finalize().into();
3187 r
3188 };
3189 storage
3190 .store(routing_id, alice_blob_id, None, 3600, alice_blob_bytes)
3191 .await
3192 .unwrap();
3193
3194 let bob_envelope = test_seal(&bob_key, b"bob public content");
3196 let bob_blob_bytes = rmp_serde::to_vec(&bob_envelope).unwrap();
3197 let bob_blob_id = {
3198 let mut h = Sha256::new();
3199 h.update(&bob_blob_bytes);
3200 let r: [u8; 32] = h.finalize().into();
3201 r
3202 };
3203 storage
3204 .store(routing_id, bob_blob_id, None, 3600, bob_blob_bytes)
3205 .await
3206 .unwrap();
3207
3208 let state = test_state_with(projected_map, storage);
3209 let routing_hex = hex_encode(&routing_id);
3210
3211 let alice_hex = hex_encode(&alice_blob_id);
3214 let router = broadcast_projection_router(Arc::clone(&state));
3215 let req = Request::builder()
3216 .uri(format!("/scp/broadcast/{routing_hex}/messages/{alice_hex}"))
3217 .body(Body::empty())
3218 .unwrap();
3219
3220 let resp = router.oneshot(req).await.unwrap();
3221 assert_eq!(
3222 resp.status(),
3223 HttpStatus::UNAUTHORIZED,
3224 "alice's content should require auth due to per-author Gated override"
3225 );
3226
3227 let bob_hex = hex_encode(&bob_blob_id);
3230 let router = broadcast_projection_router(Arc::clone(&state));
3231 let req = Request::builder()
3232 .uri(format!("/scp/broadcast/{routing_hex}/messages/{bob_hex}"))
3233 .body(Body::empty())
3234 .unwrap();
3235
3236 let resp = router.oneshot(req).await.unwrap();
3237 assert_eq!(
3238 resp.status(),
3239 HttpStatus::OK,
3240 "bob's content should be public (no per-author override)"
3241 );
3242
3243 let ucan_token = build_test_ucan(context_id);
3245 let router = broadcast_projection_router(state);
3246 let req = Request::builder()
3247 .uri(format!("/scp/broadcast/{routing_hex}/messages/{alice_hex}"))
3248 .header("Authorization", format!("Bearer {ucan_token}"))
3249 .body(Body::empty())
3250 .unwrap();
3251
3252 let resp = router.oneshot(req).await.unwrap();
3253 assert_eq!(
3254 resp.status(),
3255 HttpStatus::OK,
3256 "alice's content should be accessible with valid UCAN"
3257 );
3258
3259 let cache_control = resp
3261 .headers()
3262 .get(header::CACHE_CONTROL)
3263 .unwrap()
3264 .to_str()
3265 .unwrap();
3266 assert_eq!(cache_control, "private, immutable, max-age=31536000");
3267 }
3268
3269 fn build_test_ucan(context_id: &str) -> String {
3278 use base64::engine::general_purpose::URL_SAFE_NO_PAD;
3279 use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};
3280
3281 let header = UcanHeader {
3282 alg: "EdDSA".to_owned(),
3283 typ: "JWT".to_owned(),
3284 ucv: "0.10.0".to_owned(),
3285 kid: None,
3286 };
3287 let payload = UcanPayload {
3288 iss: "did:dht:test_issuer".to_owned(),
3289 aud: "did:dht:test_audience".to_owned(),
3290 exp: u64::MAX,
3291 nbf: None,
3292 nnc: "test-nonce-001".to_owned(),
3293 att: vec![Attenuation {
3294 with: format!("scp:ctx:{context_id}/messages:read"),
3295 can: "read".to_owned(),
3296 }],
3297 prf: vec![],
3298 fct: None,
3299 };
3300
3301 let header_json = serde_json::to_vec(&header).unwrap();
3302 let payload_json = serde_json::to_vec(&payload).unwrap();
3303 let signature = vec![0u8; 64];
3306
3307 format!(
3308 "{}.{}.{}",
3309 URL_SAFE_NO_PAD.encode(&header_json),
3310 URL_SAFE_NO_PAD.encode(&payload_json),
3311 URL_SAFE_NO_PAD.encode(&signature),
3312 )
3313 }
3314}