Skip to main content

scp_node/
projection.rs

1//! Broadcast projection registry and key management.
2//!
3//! A [`ProjectedContext`] tracks the per-epoch broadcast keys needed to
4//! decrypt messages from a broadcast context that this node projects over
5//! HTTP. The projection endpoints serve decrypted content at:
6//!
7//! - `GET /scp/broadcast/<routing_id_hex>/feed`
8//! - `GET /scp/broadcast/<routing_id_hex>/messages/<blob_id_hex>`
9//!
10//! Where `routing_id = SHA-256(context_id)` per spec section 5.14.6.
11//!
12//! # Activation
13//!
14//! Projection is opt-in per context via
15//! [`ApplicationNode::enable_broadcast_projection`](crate::ApplicationNode::enable_broadcast_projection)
16//! and deactivated via
17//! [`ApplicationNode::disable_broadcast_projection`](crate::ApplicationNode::disable_broadcast_projection).
18//!
19//! See spec sections 18.11.2 and 18.11.5.
20
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use axum::Json;
25use axum::Router;
26use axum::extract::{Path, Query, State};
27use axum::http::{StatusCode, header};
28use axum::response::IntoResponse;
29use axum::routing::get;
30use base64::Engine;
31use base64::engine::general_purpose::STANDARD as BASE64;
32use serde::{Deserialize, Serialize};
33use sha2::{Digest, Sha256};
34
35use scp_core::context::broadcast::BroadcastAdmission;
36use scp_core::context::params::{ProjectionPolicy, ProjectionRule};
37use scp_core::crypto::sender_keys::{BroadcastEnvelope, BroadcastKey, open_broadcast_trusted};
38use scp_core::crypto::ucan::CapabilityUri;
39use scp_core::crypto::ucan::validate::parse_ucan;
40use scp_transport::native::storage::BlobStorage;
41
42use crate::error::ApiError;
43use crate::http::NodeState;
44
45// ---------------------------------------------------------------------------
46// Routing ID derivation
47// ---------------------------------------------------------------------------
48
49/// Computes the 32-byte routing ID for a broadcast context.
50///
51/// `routing_id = SHA-256(context_id)` where `context_id` is the raw bytes
52/// of the **lowercase** hex-encoded context ID string, per spec section 5.14.6.
53/// Normalizes to lowercase before hashing so that mixed-case IDs produce
54/// the same routing ID.
55#[must_use]
56pub fn compute_routing_id(context_id: &str) -> [u8; 32] {
57    let normalized = context_id.to_ascii_lowercase();
58    let mut hasher = Sha256::new();
59    hasher.update(normalized.as_bytes());
60    hasher.finalize().into()
61}
62
63// ---------------------------------------------------------------------------
64// ProjectedContext
65// ---------------------------------------------------------------------------
66
/// A broadcast context that this node projects (decrypts and serves) over
/// its HTTP endpoints.
///
/// Holds one [`BroadcastKey`] per epoch so that projection handlers can
/// decrypt a message by looking up the key matching the envelope's epoch.
/// Several epochs are kept side by side so that messages sealed under an
/// earlier key remain readable.
///
/// Also carries the context's [`BroadcastAdmission`] mode and an optional
/// [`ProjectionPolicy`]; handlers combine the two to decide whether a
/// request must be authenticated (SCP-GG-007, SCP-GG-008).
///
/// See spec section 18.11.5.
#[derive(Debug)]
pub struct ProjectedContext {
    /// The 32-byte routing ID, derived as `SHA-256(context_id)` by
    /// [`compute_routing_id`] when the value is built through
    /// [`ProjectedContext::new`].
    ///
    /// Used to subscribe to this context on the relay and to form the
    /// HTTP endpoint paths.
    pub(crate) routing_id: [u8; 32],
    /// The context ID (hex-encoded) for display and API responses.
    ///
    /// Stored exactly as supplied by the caller; only the routing ID
    /// derivation lowercases it.
    pub(crate) context_id: String,
    /// Broadcast keys indexed by epoch number. Multiple epochs are retained
    /// so messages encrypted under previous keys can still be decrypted
    /// within the blob TTL window.
    pub(crate) keys: HashMap<u64, BroadcastKey>,
    /// Admission mode for the broadcast context (open or gated).
    ///
    /// Determines the floor for projection access control: gated contexts
    /// cannot have public projection (spec section 18.11.2.1).
    pub(crate) admission: BroadcastAdmission,
    /// Optional per-author projection access policy.
    ///
    /// When `Some`, the default rule and per-author overrides control whether
    /// projected content requires authentication. When `None`, the admission
    /// mode alone determines the behavior (open = public, gated = gated).
    pub(crate) projection_policy: Option<ProjectionPolicy>,
}
105
106impl ProjectedContext {
107    /// Creates a new [`ProjectedContext`] from a context ID, initial broadcast key,
108    /// admission mode, and optional projection policy.
109    ///
110    /// The routing ID is computed as `SHA-256(context_id)` per spec section 5.14.6.
111    /// The key is inserted at its own epoch number. The admission mode and
112    /// projection policy are stored for use by projection handlers when deciding
113    /// whether to require authentication (spec section 18.11.2.1).
114    #[must_use]
115    pub fn new(
116        context_id: &str,
117        broadcast_key: BroadcastKey,
118        admission: BroadcastAdmission,
119        projection_policy: Option<ProjectionPolicy>,
120    ) -> Self {
121        let routing_id = compute_routing_id(context_id);
122        let epoch = broadcast_key.epoch();
123        let mut keys = HashMap::new();
124        keys.insert(epoch, broadcast_key);
125        Self {
126            routing_id,
127            context_id: context_id.to_owned(),
128            keys,
129            admission,
130            projection_policy,
131        }
132    }
133
134    /// Returns the routing ID for this projected context.
135    #[must_use]
136    pub const fn routing_id(&self) -> &[u8; 32] {
137        &self.routing_id
138    }
139
140    /// Returns the context ID (hex-encoded string).
141    #[must_use]
142    pub fn context_id(&self) -> &str {
143        &self.context_id
144    }
145
146    /// Returns a reference to the keys map (epoch -> broadcast key).
147    #[must_use]
148    pub const fn keys(&self) -> &HashMap<u64, BroadcastKey> {
149        &self.keys
150    }
151
152    /// Returns the admission mode for this broadcast context.
153    #[must_use]
154    pub const fn admission(&self) -> BroadcastAdmission {
155        self.admission
156    }
157
158    /// Returns the projection policy, if any.
159    #[must_use]
160    pub const fn projection_policy(&self) -> Option<&ProjectionPolicy> {
161        self.projection_policy.as_ref()
162    }
163
164    /// Inserts a broadcast key for the given epoch.
165    ///
166    /// Keys are retained indefinitely rather than pruned after the blob TTL
167    /// window (spec §18.11.5). This is acceptable because:
168    /// - Key rotations only occur on subscriber blocks (uncommon)
169    /// - Each key is ~40 bytes (32-byte secret + epoch + author DID ref)
170    /// - Even hundreds of epochs per context is negligible memory
171    ///
172    /// If pruning becomes necessary, add a `prune_before(epoch)` method
173    /// keyed to the relay's `max_blob_ttl`.
174    ///
175    /// **Important:** After a governance ban (`RevokeReadAccess` /
176    /// `governance_ban_subscriber`), all author keys are rotated in the
177    /// `ContextManager`. The caller MUST propagate the new-epoch keys to
178    /// the projection registry via this method; otherwise the projection
179    /// endpoint cannot decrypt content encrypted under the new keys.
180    pub fn insert_key(&mut self, broadcast_key: BroadcastKey) {
181        let epoch = broadcast_key.epoch();
182        self.keys.insert(epoch, broadcast_key);
183    }
184
185    /// Removes all keys whose epoch is NOT in the given set.
186    ///
187    /// Used after a `Full`-scope governance ban to ensure historical content
188    /// encrypted under pre-ban keys is no longer decryptable by the
189    /// projection endpoint. Messages referencing purged epochs will return
190    /// 410 Gone rather than serving content that a banned subscriber may
191    /// have previously accessed.
192    ///
193    /// Takes a set of epochs to retain (typically the new post-rotation
194    /// epochs). This correctly handles epoch-divergent multi-author contexts
195    /// where authors may be at different epochs.
196    pub fn retain_only_epochs(&mut self, epochs: &std::collections::HashSet<u64>) {
197        self.keys.retain(|e, _| epochs.contains(e));
198    }
199
200    /// Returns the broadcast key for the given epoch, if present.
201    #[must_use]
202    pub fn key_for_epoch(&self, epoch: u64) -> Option<&BroadcastKey> {
203        self.keys.get(&epoch)
204    }
205}
206
207// ---------------------------------------------------------------------------
208// Hex helpers
209// ---------------------------------------------------------------------------
210
211/// Encodes a 32-byte array as a lowercase hex string (64 characters).
212///
213/// Used for formatting routing IDs and blob IDs in API responses.
214#[must_use]
215pub fn hex_encode(bytes: &[u8; 32]) -> String {
216    hex::encode(bytes)
217}
218
219/// Decodes a hex string into a 32-byte array.
220///
221/// Returns `None` if the input is not exactly 64 hex characters or contains
222/// invalid hex digits.
223#[must_use]
224pub fn hex_decode(s: &str) -> Option<[u8; 32]> {
225    let bytes = hex::decode(s).ok()?;
226    <[u8; 32]>::try_from(bytes.as_slice()).ok()
227}
228
229// ---------------------------------------------------------------------------
230// Projection policy validation
231// ---------------------------------------------------------------------------
232
233/// Validates that a projection policy is consistent with the context's
234/// admission mode.
235///
236/// Gated contexts cannot have `ProjectionRule::Public` as the default rule
237/// or in any per-author override, because that would allow unauthenticated
238/// access to content that the context's admission mode requires
239/// authentication for (spec section 18.11.2.1).
240///
241/// # Errors
242///
243/// Returns an error message if a gated context has a `Public` default
244/// projection rule or a `Public` per-author override.
245pub fn validate_projection_policy(
246    admission: BroadcastAdmission,
247    policy: Option<&ProjectionPolicy>,
248) -> Result<(), String> {
249    if let (BroadcastAdmission::Gated, Some(p)) = (admission, policy) {
250        if p.default_rule == ProjectionRule::Public {
251            return Err("gated context cannot have public projection rule".into());
252        }
253        for override_entry in &p.overrides {
254            if override_entry.rule == ProjectionRule::Public {
255                return Err(
256                    "gated context cannot have public per-author projection override".into(),
257                );
258            }
259        }
260    }
261    Ok(())
262}
263
264// ---------------------------------------------------------------------------
265// Bearer token extraction and UCAN validation
266// ---------------------------------------------------------------------------
267
268/// Extracts a bearer token from the `Authorization` header.
269///
270/// Expects the format `Bearer <token>` (case-insensitive scheme per RFC 7235
271/// section 2.1). Returns the raw token string if present and correctly
272/// formatted, or `None` if the header is missing or malformed.
273fn extract_bearer_token(headers: &axum::http::HeaderMap) -> Option<&str> {
274    let value = headers.get(header::AUTHORIZATION)?.to_str().ok()?;
275    if value.len() > 7 && value[..7].eq_ignore_ascii_case("bearer ") {
276        Some(&value[7..])
277    } else {
278        None
279    }
280}
281
282/// Validates a bearer token as a UCAN with `messages:read` capability for the
283/// given context.
284///
285/// Performs structural validation plus temporal checks: parses the JWT,
286/// verifies the UCAN header fields (algorithm, version), checks token
287/// expiry (`exp`) and not-before (`nbf`), and confirms that at least one
288/// attenuation grants `messages:read` for the specified context (or a
289/// wildcard context).
290///
291/// **Note:** This is a simplified validation suitable for projection
292/// endpoints. It does NOT perform the full 11-step UCAN validation pipeline
293/// (DID resolution, Ed25519 signature verification, delegation chain
294/// traversal, revocation check) from
295/// [`scp_core::context::broadcast::validate_messages_read_ucan`], because
296/// the projection layer does not hold DID documents or the full
297/// `BroadcastContext` state. The primary access-control boundary is key
298/// distribution (subscribers must obtain broadcast keys via the gated key
299/// request flow); this check is a secondary defense-in-depth gate.
300///
301/// Returns `Ok(())` on success, or an error response on failure.
302fn validate_projection_ucan(
303    token: &str,
304    context_id: &str,
305) -> Result<(), Box<axum::response::Response>> {
306    let ucan = parse_ucan(token).map_err(|e| {
307        tracing::debug!(error = %e, "UCAN parse failed for projection auth");
308        Box::new(ApiError::unauthorized_with("invalid UCAN token").into_response())
309    })?;
310
311    // Temporal checks: reject expired or not-yet-valid tokens.
312    let now = std::time::SystemTime::now()
313        .duration_since(std::time::UNIX_EPOCH)
314        .map(|d| d.as_secs())
315        .unwrap_or(0);
316
317    if ucan.payload.exp <= now {
318        tracing::debug!(
319            exp = ucan.payload.exp,
320            now = now,
321            "UCAN expired for projection auth"
322        );
323        return Err(Box::new(
324            ApiError::unauthorized_with("UCAN token expired").into_response(),
325        ));
326    }
327
328    if let Some(nbf) = ucan.payload.nbf
329        && nbf > now
330    {
331        tracing::debug!(
332            nbf = nbf,
333            now = now,
334            "UCAN not yet valid for projection auth"
335        );
336        return Err(Box::new(
337            ApiError::unauthorized_with("UCAN token not yet valid").into_response(),
338        ));
339    }
340
341    // Build the required capability URI for this context.
342    let required = CapabilityUri::new(context_id, "messages", "read");
343
344    // Check that at least one attenuation grants the required capability.
345    let has_capability = ucan.payload.att.iter().any(|att| {
346        att.with
347            .parse::<CapabilityUri>()
348            .is_ok_and(|cap| cap.matches(&required))
349    });
350
351    if !has_capability {
352        tracing::debug!(
353            context_id = context_id,
354            "UCAN missing messages:read capability for context"
355        );
356        return Err(Box::new(
357            ApiError::unauthorized_with("UCAN missing messages:read capability").into_response(),
358        ));
359    }
360
361    Ok(())
362}
363
364/// Determines the effective projection rule for a request, considering the
365/// context's admission mode, default projection policy, and optional
366/// per-author overrides.
367///
368/// For gated contexts without an explicit projection policy, returns
369/// `ProjectionRule::Gated`. For open contexts without a policy, returns
370/// `ProjectionRule::Public`.
371///
372/// When `author_did` is `Some`, checks for a per-author override before
373/// falling back to the default rule.
374fn effective_projection_rule(
375    admission: BroadcastAdmission,
376    policy: Option<&ProjectionPolicy>,
377    author_did: Option<&str>,
378) -> ProjectionRule {
379    match policy {
380        Some(p) => {
381            // Check per-author override first, if an author DID is available.
382            if let Some(author) = author_did
383                && let Some(ov) = p.overrides.iter().find(|o| o.did.as_ref() == author)
384            {
385                return ov.rule;
386            }
387            p.default_rule
388        }
389        None => match admission {
390            BroadcastAdmission::Gated => ProjectionRule::Gated,
391            BroadcastAdmission::Open => ProjectionRule::Public,
392        },
393    }
394}
395
396/// Checks authorization for a projection endpoint request.
397///
398/// Returns `Ok(())` if the request is authorized, or an error response if not.
399/// For `ProjectionRule::Public`, no authorization is required. For
400/// `ProjectionRule::Gated`, a valid UCAN with `messages:read` capability must
401/// be present in the `Authorization: Bearer <token>` header.
402///
403/// `ProjectionRule::AuthorChoice` is treated as `Gated` at the projection
404/// layer — the author's own choice is enforced at the context level, not
405/// at HTTP projection.
406fn check_projection_auth(
407    headers: &axum::http::HeaderMap,
408    context_id: &str,
409    rule: ProjectionRule,
410) -> Result<(), Box<axum::response::Response>> {
411    match rule {
412        ProjectionRule::Public => Ok(()),
413        ProjectionRule::Gated | ProjectionRule::AuthorChoice => {
414            let token = extract_bearer_token(headers).ok_or_else(|| {
415                Box::new(
416                    ApiError::unauthorized_with("Authorization required for gated broadcast")
417                        .into_response(),
418                )
419            })?;
420            validate_projection_ucan(token, context_id)
421        }
422    }
423}
424
425// ---------------------------------------------------------------------------
426// Timestamp formatting
427// ---------------------------------------------------------------------------
428
/// Formats a Unix timestamp (seconds since epoch) as an ISO 8601 UTC string.
///
/// Produces the `YYYY-MM-DDThh:mm:ssZ` format used by the feed endpoint's
/// `published_at` field. Returns `"1970-01-01T00:00:00Z"` for timestamp 0.
/// Years beyond 9999 are printed with as many digits as they need.
///
/// The calendar date is computed in constant time with the "civil from
/// days" conversion for the proleptic Gregorian calendar, so even absurdly
/// large timestamps (for example a corrupt `stored_at`) return immediately
/// instead of iterating once per elapsed year.
#[must_use]
pub fn unix_to_iso8601(ts: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    /// Days between 0000-03-01 and 1970-01-01.
    const EPOCH_SHIFT_DAYS: u64 = 719_468;
    /// Days in one 400-year Gregorian cycle ("era").
    const DAYS_PER_ERA: u64 = 146_097;

    // Time of day.
    let secs_of_day = ts % SECS_PER_DAY;
    let hours = secs_of_day / 3_600;
    let minutes = secs_of_day % 3_600 / 60;
    let seconds = secs_of_day % 60;

    // Re-base the day count onto March 1st of year 0 so that the leap day
    // is the last day of the (shifted) year, which keeps the month
    // arithmetic uniform.
    let shifted_days = ts / SECS_PER_DAY + EPOCH_SHIFT_DAYS;
    let era = shifted_days / DAYS_PER_ERA;
    let day_of_era = shifted_days % DAYS_PER_ERA;

    // Year within the era, correcting for the 4/100/400-year leap rules.
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let shifted_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    };

    // January and February belong to the next civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
}
480
481// ---------------------------------------------------------------------------
482// Feed response types
483// ---------------------------------------------------------------------------
484
/// JSON response body for `GET /scp/broadcast/<routing_id>/feed`.
///
/// Contains the context ID, an author DID taken from the first message in
/// the response, and an array of decrypted broadcast messages.
///
/// See spec section 18.11.3.
#[derive(Debug, Clone, Serialize)]
pub struct FeedResponse {
    /// The hex-encoded context ID that this feed belongs to.
    pub context_id: String,
    /// The DID of the broadcast context's primary author.
    ///
    /// Derived from the first message in the response. If no messages are
    /// present, this is an empty string.
    pub author_did: String,
    /// Array of decrypted broadcast messages, in the order the blob storage
    /// query returned them (documented as oldest-first).
    pub messages: Vec<FeedMessage>,
}
504
/// A single decrypted broadcast message in a [`FeedResponse`].
///
/// Each message has been deserialized from `MessagePack` ([`BroadcastEnvelope`])
/// and decrypted with the epoch-matched broadcast key. The content is
/// base64-encoded for JSON transport.
///
/// Includes all envelope metadata fields per §5.14.5 (issue #352).
///
/// See spec section 18.11.3.
#[derive(Debug, Clone, Serialize)]
pub struct FeedMessage {
    /// The hex-encoded blob ID (SHA-256 hash) identifying this message.
    pub id: String,
    /// The context ID this message belongs to, as carried in the envelope.
    pub context_id: String,
    /// The DID of the author who sealed this broadcast envelope.
    pub author_did: String,
    /// The per-author monotonic sequence number (§5.14.5).
    pub sequence: u64,
    /// Unix timestamp in milliseconds when the message was sealed.
    pub timestamp: u64,
    /// The broadcast key epoch used to encrypt this message.
    pub key_epoch: u64,
    /// ISO 8601 UTC timestamp (`YYYY-MM-DDThh:mm:ssZ`) when the relay
    /// stored this blob.
    pub published_at: String,
    /// Base64-encoded (standard alphabet) decrypted content.
    pub content: String,
}
533
/// Query parameters for `GET /scp/broadcast/<routing_id>/feed`.
///
/// - `since` — Optional hex-encoded blob ID. When present, only messages
///   stored after that blob are returned (exclusive).
/// - `limit` — Maximum number of messages to return (default 20, max 100).
///
/// Both parameters are optional; omitting them requests the default page.
///
/// See spec section 18.11.3.
#[derive(Debug, Clone, Deserialize)]
pub struct FeedQuery {
    /// Hex-encoded blob ID. Only messages stored after this blob are returned.
    pub since: Option<String>,
    /// Maximum number of messages (default 20, max 100). Larger values are
    /// clamped by the handler rather than rejected.
    pub limit: Option<u32>,
}
548
549// ---------------------------------------------------------------------------
550// Feed handler
551// ---------------------------------------------------------------------------
552
/// Default number of messages returned when the `limit` query parameter is
/// not specified.
const DEFAULT_FEED_LIMIT: u32 = 20;

/// Maximum allowed value for the `limit` query parameter; requests asking
/// for more are clamped down to this value.
const MAX_FEED_LIMIT: u32 = 100;
558
559/// Decrypts stored blobs into [`FeedMessage`] values.
560///
561/// For each blob, deserializes the `MessagePack`-encoded [`BroadcastEnvelope`],
562/// looks up the epoch-matched broadcast key, and decrypts the content.
563/// Blobs that fail deserialization or decryption are logged at `warn` level
564/// and skipped. Returns the messages and the blob ID of the last successfully
565/// decrypted message (for the `ETag` header).
566fn decrypt_blobs(
567    blobs: &[scp_transport::native::storage::StoredBlob],
568    keys: &HashMap<u64, BroadcastKey>,
569) -> (Vec<FeedMessage>, Option<[u8; 32]>) {
570    let mut messages = Vec::with_capacity(blobs.len());
571    let mut latest_blob_id: Option<[u8; 32]> = None;
572
573    for stored in blobs {
574        // Deserialize BroadcastEnvelope from MessagePack.
575        let envelope: BroadcastEnvelope = match rmp_serde::from_slice(&stored.blob) {
576            Ok(env) => env,
577            Err(e) => {
578                tracing::warn!(
579                    blob_id = hex_encode(&stored.blob_id),
580                    error = %e,
581                    "failed to deserialize BroadcastEnvelope, skipping"
582                );
583                continue;
584            }
585        };
586
587        // Find the matching broadcast key for this epoch.
588        let Some(key) = keys.get(&envelope.key_epoch) else {
589            tracing::warn!(
590                blob_id = hex_encode(&stored.blob_id),
591                epoch = envelope.key_epoch,
592                "no broadcast key for epoch, skipping"
593            );
594            continue;
595        };
596
597        // Decrypt.
598        let plaintext = match open_broadcast_trusted(key, &envelope) {
599            Ok(pt) => pt,
600            Err(e) => {
601                tracing::warn!(
602                    blob_id = hex_encode(&stored.blob_id),
603                    epoch = envelope.key_epoch,
604                    error = %e,
605                    "decryption failed, skipping"
606                );
607                continue;
608            }
609        };
610
611        latest_blob_id = Some(stored.blob_id);
612
613        messages.push(FeedMessage {
614            id: hex_encode(&stored.blob_id),
615            context_id: envelope.context_id,
616            author_did: envelope.author_did,
617            sequence: envelope.sequence,
618            timestamp: envelope.timestamp,
619            key_epoch: envelope.key_epoch,
620            published_at: unix_to_iso8601(stored.stored_at),
621            content: BASE64.encode(&plaintext),
622        });
623    }
624
625    (messages, latest_blob_id)
626}
627
628/// Axum handler for `GET /scp/broadcast/<routing_id>/feed`.
629///
630/// Looks up the projected context by routing ID, queries stored blobs,
631/// deserializes each blob as a [`BroadcastEnvelope`], decrypts it with the
632/// epoch-matched broadcast key, and returns the result as a JSON
633/// [`FeedResponse`]. Blobs that fail deserialization or decryption are
634/// logged and skipped (not a 500).
635///
636/// # Headers
637///
638/// - `Cache-Control: public, max-age=30, stale-while-revalidate=300`
639/// - `ETag: "<latest_blob_id_hex>"` (the blob ID of the last message)
640///
641/// # Cursor expiry
642///
643/// When a `since` blob ID refers to a blob that has expired or been purged,
644/// the feed returns **empty** (no messages) rather than the full feed. Clients
645/// should treat an empty response to a previously-valid cursor as a signal to
646/// reset their cursor (omit `since`) and re-fetch from the beginning.
647///
648/// A `since` blob ID that belongs to a different context returns **400**.
649///
650/// # Errors
651///
652/// - **404** — Unknown routing ID (no projected context registered).
653/// - **400** — Invalid routing ID hex, invalid `since` blob ID hex, or
654///   `since` blob belongs to a different context.
655///
656/// See spec section 18.11.3.
657#[allow(clippy::too_many_lines)]
658pub async fn feed_handler(
659    State(state): State<Arc<NodeState>>,
660    Path(routing_id_hex): Path<String>,
661    Query(params): Query<FeedQuery>,
662    headers: axum::http::HeaderMap,
663) -> impl IntoResponse {
664    // Parse routing_id from hex.
665    let Some(routing_id) = hex_decode(&routing_id_hex) else {
666        return ApiError::bad_request("invalid routing_id hex").into_response();
667    };
668
669    // Look up projected context.
670    let projected_contexts = state.projected_contexts.read().await;
671    let Some(projected) = projected_contexts.get(&routing_id) else {
672        return ApiError::not_found("unknown routing_id").into_response();
673    };
674
675    // Extract context_id, keys, and auth-related fields before dropping the read lock.
676    let context_id = projected.context_id.clone();
677    let admission = projected.admission;
678    let projection_policy = projected.projection_policy.clone();
679    // We need a snapshot of the keys to avoid holding the lock during async I/O.
680    let keys: HashMap<u64, BroadcastKey> = projected.keys.clone();
681    drop(projected_contexts);
682
683    // Enforce projection auth. Feed endpoint uses the default rule (no
684    // per-author override) since the feed may contain messages from
685    // multiple authors.
686    let rule = effective_projection_rule(admission, projection_policy.as_ref(), None);
687    if let Err(resp) = check_projection_auth(&headers, &context_id, rule) {
688        return *resp;
689    }
690
691    // Resolve `since` parameter: if a blob_id hex is provided, look it up
692    // to get its stored_at timestamp for the query filter.
693    let since_ts: Option<u64> = if let Some(ref since_hex) = params.since {
694        let Some(since_blob_id) = hex_decode(since_hex) else {
695            return ApiError::bad_request("invalid since blob_id hex").into_response();
696        };
697        // Look up the blob to get its stored_at timestamp.
698        match state.blob_storage.get(&since_blob_id).await {
699            Ok(Some(blob)) => {
700                // Verify the blob belongs to this routing_id to prevent
701                // cross-context timestamp oracle (BLACK-HTTP-005).
702                if blob.routing_id != routing_id {
703                    return ApiError::bad_request("since blob_id does not belong to this context")
704                        .into_response();
705                }
706                Some(blob.stored_at)
707            }
708            Ok(None) => {
709                // Blob expired or purged — return empty feed rather than all
710                // messages. Returning all would be a surprising behavior change
711                // when a previously-valid cursor expires.
712                Some(u64::MAX)
713            }
714            Err(e) => {
715                tracing::warn!(
716                    error = %e,
717                    since_blob_id = since_hex,
718                    "failed to look up since blob"
719                );
720                // Storage error — conservative: return empty feed.
721                Some(u64::MAX)
722            }
723        }
724    } else {
725        None
726    };
727
728    // Clamp limit.
729    let limit = params
730        .limit
731        .unwrap_or(DEFAULT_FEED_LIMIT)
732        .min(MAX_FEED_LIMIT);
733
734    // Query blobs.
735    let blobs = match state.blob_storage.query(&routing_id, since_ts, limit).await {
736        Ok(blobs) => blobs,
737        Err(e) => {
738            tracing::error!(
739                error = %e,
740                routing_id = routing_id_hex,
741                "blob storage query failed"
742            );
743            return ApiError::internal_error("storage error").into_response();
744        }
745    };
746
747    // Decrypt each blob into a FeedMessage.
748    let (mut messages, latest_blob_id) = decrypt_blobs(&blobs, &keys);
749
750    // Per-author override filtering: if any per-author override makes an
751    // author's content Gated (or stricter than the default rule), filter
752    // out their messages when the request lacks valid auth. This prevents
753    // the feed from leaking per-author-gated content to unauthenticated
754    // clients (RED-302).
755    //
756    // Optimization (#355): pre-compute auth decisions for each distinct
757    // ProjectionRule found in the overrides. This parses the UCAN at most
758    // once per distinct rule instead of re-parsing per message in the
759    // retain loop. ProjectionRule has only 3 variants, so we check each
760    // at most once.
761    if let Some(ref policy) = projection_policy
762        && !policy.overrides.is_empty()
763    {
764        // Pre-compute auth result for each ProjectionRule variant that
765        // appears in the overrides. The default rule already passed auth
766        // above, so we mark it as authorized. For other rules, we call
767        // check_projection_auth once per distinct variant.
768        let auth_public = true; // Public never requires auth
769        let mut auth_gated: Option<bool> = None;
770        let mut auth_author_choice: Option<bool> = None;
771
772        // Mark the default rule as already authorized.
773        match rule {
774            ProjectionRule::Public => {} // auth_public is already true
775            ProjectionRule::Gated => {
776                auth_gated = Some(true);
777            }
778            ProjectionRule::AuthorChoice => {
779                auth_author_choice = Some(true);
780            }
781        }
782
783        // Pre-compute only the rules that actually appear in overrides
784        // and haven't been computed yet.
785        for ov in &policy.overrides {
786            if ov.rule == ProjectionRule::Gated && auth_gated.is_none() {
787                auth_gated = Some(
788                    check_projection_auth(&headers, &context_id, ProjectionRule::Gated).is_ok(),
789                );
790            } else if ov.rule == ProjectionRule::AuthorChoice && auth_author_choice.is_none() {
791                auth_author_choice = Some(
792                    check_projection_auth(&headers, &context_id, ProjectionRule::AuthorChoice)
793                        .is_ok(),
794                );
795            }
796            // ProjectionRule::Public is always authorized; already-computed
797            // rules are skipped by the is_none() guards above.
798        }
799
800        messages.retain(|msg| {
801            let author_rule =
802                effective_projection_rule(admission, Some(policy), Some(&msg.author_did));
803            match author_rule {
804                ProjectionRule::Public => auth_public,
805                ProjectionRule::Gated => auth_gated.unwrap_or(true),
806                ProjectionRule::AuthorChoice => auth_author_choice.unwrap_or(true),
807            }
808        });
809    }
810
811    // Determine author_did for the top-level response.
812    let author_did = messages
813        .first()
814        .map(|m| m.author_did.clone())
815        .unwrap_or_default();
816
817    let response = FeedResponse {
818        context_id,
819        author_did,
820        messages,
821    };
822
823    // Build response with caching headers. Gated contexts use `private`
824    // to prevent shared caches from serving authenticated content to
825    // unauthorized clients.
826    let etag = latest_blob_id
827        .map(|id| format!("\"{}\"", hex_encode(&id)))
828        .unwrap_or_default();
829
830    // If any per-author override exists, the response content varies based on
831    // auth state — use `private` even if the default rule is Public.
832    let has_per_author_overrides = projection_policy
833        .as_ref()
834        .is_some_and(|p| !p.overrides.is_empty());
835
836    let cache_control = match rule {
837        ProjectionRule::Public if !has_per_author_overrides => {
838            "public, max-age=30, stale-while-revalidate=300"
839        }
840        _ => "private, max-age=30",
841    };
842
843    let mut resp_headers = axum::http::HeaderMap::new();
844    resp_headers.insert(
845        header::CACHE_CONTROL,
846        axum::http::HeaderValue::from_static(cache_control),
847    );
848    if let (false, Ok(val)) = (etag.is_empty(), axum::http::HeaderValue::from_str(&etag)) {
849        resp_headers.insert(header::ETAG, val);
850    }
851
852    (StatusCode::OK, resp_headers, Json(response)).into_response()
853}
854
855// ---------------------------------------------------------------------------
856// Per-message handler
857// ---------------------------------------------------------------------------
858
859/// Axum handler for `GET /scp/broadcast/<routing_id>/messages/<blob_id>`.
860///
861/// Retrieves a single stored blob, deserializes it as a [`BroadcastEnvelope`],
862/// decrypts it with the epoch-matched broadcast key, and returns the result
863/// as a JSON [`FeedMessage`].
864///
865/// # Headers
866///
867/// - `Cache-Control: public, immutable, max-age=31536000` — broadcast
868///   messages are content-addressed and never change.
869/// - `ETag: "<blob_id_hex>"` — enables conditional GET.
870///
871/// # Conditional GET
872///
873/// If the client sends `If-None-Match: "<blob_id_hex>"`, the server returns
874/// **304 Not Modified** with no body, saving bandwidth for repeated fetches
875/// of the same message.
876///
877/// # Errors
878///
879/// - **400** — Invalid hex in `routing_id` or `blob_id` path segment.
880/// - **404** — Unknown routing ID (no projected context registered) or
881///   unknown blob ID (not in storage or routing ID mismatch).
882/// - **410** — Content revoked (epoch key purged after a `Full`-scope
883///   governance ban).
884/// - **500** — Decryption failure (corrupt envelope or AEAD open failure).
885///
886/// See spec section 18.11.4.
887#[allow(clippy::too_many_lines)]
888pub async fn message_handler(
889    State(state): State<Arc<NodeState>>,
890    Path((routing_id_hex, blob_id_hex_raw)): Path<(String, String)>,
891    headers: axum::http::HeaderMap,
892) -> impl IntoResponse {
893    // Normalize blob_id_hex to lowercase for consistent ETag generation.
894    let blob_id_hex = blob_id_hex_raw.to_ascii_lowercase();
895
896    // Parse routing_id from hex.
897    let Some(routing_id) = hex_decode(&routing_id_hex) else {
898        return ApiError::bad_request("invalid routing_id hex").into_response();
899    };
900
901    // Parse blob_id from hex.
902    let Some(blob_id) = hex_decode(&blob_id_hex) else {
903        return ApiError::bad_request("invalid blob_id hex").into_response();
904    };
905
906    // Look up projected context (before conditional GET to avoid
907    // cross-context blob existence oracle — BLACK-HTTP-005).
908    let projected_contexts = state.projected_contexts.read().await;
909    let Some(projected) = projected_contexts.get(&routing_id) else {
910        return ApiError::not_found("unknown routing_id").into_response();
911    };
912
913    // Snapshot keys and auth-related fields before dropping the read lock.
914    let context_id = projected.context_id.clone();
915    let admission = projected.admission;
916    let projection_policy = projected.projection_policy.clone();
917    let keys: HashMap<u64, BroadcastKey> = projected.keys.clone();
918    drop(projected_contexts);
919
920    // Pre-auth check with default rule (before we know the author). For
921    // gated contexts this rejects unauthenticated requests early. The
922    // per-author override is checked after decryption reveals the author DID.
923    let default_rule = effective_projection_rule(admission, projection_policy.as_ref(), None);
924    if matches!(
925        default_rule,
926        ProjectionRule::Gated | ProjectionRule::AuthorChoice
927    ) && let Err(resp) = check_projection_auth(&headers, &context_id, default_rule)
928    {
929        return *resp;
930    }
931
932    // Fetch the blob from storage.
933    let stored = match state.blob_storage.get(&blob_id).await {
934        Ok(Some(blob)) => blob,
935        Ok(None) => {
936            return ApiError::not_found("unknown blob_id").into_response();
937        }
938        Err(e) => {
939            tracing::error!(
940                error = %e,
941                blob_id = blob_id_hex,
942                "blob storage get failed"
943            );
944            return ApiError::internal_error("storage error").into_response();
945        }
946    };
947
948    // Verify the blob belongs to this routing_id. This MUST happen before
949    // the conditional GET check to prevent a cross-context blob existence
950    // oracle (BLACK-HTTP-005): without this ordering, an attacker could
951    // send If-None-Match with a blob_id from routing_A to routing_B and
952    // receive 304 (confirming blob existence) instead of 404.
953    if stored.routing_id != routing_id {
954        return ApiError::not_found("unknown blob_id").into_response();
955    }
956
957    // Conditional GET: check If-None-Match header. Placed after both
958    // routing_id validation and blob ownership verification so that
959    // unknown routing IDs and cross-context probes always get 404.
960    if let Some(inm) = headers.get(header::IF_NONE_MATCH)
961        && let Ok(inm_str) = inm.to_str()
962    {
963        let expected_etag = format!("\"{blob_id_hex}\"");
964        if inm_str == expected_etag {
965            return StatusCode::NOT_MODIFIED.into_response();
966        }
967    }
968
969    // Deserialize BroadcastEnvelope from MessagePack.
970    let envelope: BroadcastEnvelope = match rmp_serde::from_slice(&stored.blob) {
971        Ok(env) => env,
972        Err(e) => {
973            tracing::error!(
974                error = %e,
975                blob_id = blob_id_hex,
976                "failed to deserialize BroadcastEnvelope"
977            );
978            return ApiError::internal_error("decryption failure").into_response();
979        }
980    };
981
982    // Per-author override: the envelope exposes `author_did` without
983    // requiring decryption (it's in the MessagePack header, not the
984    // ciphertext). Check per-author auth BEFORE decrypting — authenticate
985    // before processing is a fundamental security principle.
986    let effective_rule = effective_projection_rule(
987        admission,
988        projection_policy.as_ref(),
989        Some(&envelope.author_did),
990    );
991    if effective_rule != default_rule
992        && let Err(resp) = check_projection_auth(&headers, &context_id, effective_rule)
993    {
994        return *resp;
995    }
996
997    // Find the matching broadcast key for this epoch. A missing key
998    // indicates the epoch was purged after a Full-scope governance ban
999    // (intentional revocation) — return 410 Gone rather than 500.
1000    let Some(key) = keys.get(&envelope.key_epoch) else {
1001        tracing::warn!(
1002            blob_id = blob_id_hex,
1003            epoch = envelope.key_epoch,
1004            "no broadcast key for epoch (likely purged after governance ban)"
1005        );
1006        return ApiError::gone("content revoked").into_response();
1007    };
1008
1009    // Decrypt.
1010    let plaintext = match open_broadcast_trusted(key, &envelope) {
1011        Ok(pt) => pt,
1012        Err(e) => {
1013            tracing::error!(
1014                error = %e,
1015                blob_id = blob_id_hex,
1016                epoch = envelope.key_epoch,
1017                "decryption failed"
1018            );
1019            return ApiError::internal_error("decryption failure").into_response();
1020        }
1021    };
1022
1023    let message = FeedMessage {
1024        id: hex_encode(&stored.blob_id),
1025        context_id: envelope.context_id,
1026        author_did: envelope.author_did,
1027        sequence: envelope.sequence,
1028        timestamp: envelope.timestamp,
1029        key_epoch: envelope.key_epoch,
1030        published_at: unix_to_iso8601(stored.stored_at),
1031        content: BASE64.encode(&plaintext),
1032    };
1033
1034    // Build response with immutable caching headers. Gated contexts use
1035    // `private` to prevent shared caches from serving authenticated
1036    // content to unauthorized clients.
1037    let cache_control = match effective_rule {
1038        ProjectionRule::Public => "public, immutable, max-age=31536000",
1039        ProjectionRule::Gated | ProjectionRule::AuthorChoice => {
1040            "private, immutable, max-age=31536000"
1041        }
1042    };
1043
1044    let mut resp_headers = axum::http::HeaderMap::new();
1045    resp_headers.insert(
1046        header::CACHE_CONTROL,
1047        axum::http::HeaderValue::from_static(cache_control),
1048    );
1049    let etag = format!("\"{blob_id_hex}\"");
1050    if let Ok(val) = axum::http::HeaderValue::from_str(&etag) {
1051        resp_headers.insert(header::ETAG, val);
1052    }
1053
1054    (StatusCode::OK, resp_headers, Json(message)).into_response()
1055}
1056
1057// ---------------------------------------------------------------------------
1058// Router constructors
1059// ---------------------------------------------------------------------------
1060
1061/// Returns an axum [`Router`] serving both broadcast projection endpoints.
1062///
1063/// Mounts:
1064/// - `GET /scp/broadcast/{routing_id}/feed` — paginated feed of decrypted
1065///   messages ([`feed_handler`], spec section 18.11.3).
1066/// - `GET /scp/broadcast/{routing_id}/messages/{blob_id}` — single decrypted
1067///   message with conditional GET ([`message_handler`], spec section 18.11.4).
1068///
1069/// The router shares the node's [`NodeState`] for access to projected
1070/// contexts and blob storage.
1071///
1072/// See spec sections 18.11.3 and 18.11.4.
1073pub fn broadcast_projection_router(state: Arc<NodeState>) -> Router {
1074    let limiter = state.projection_rate_limiter.clone();
1075    Router::new()
1076        .route("/scp/broadcast/{routing_id}/feed", get(feed_handler))
1077        .route(
1078            "/scp/broadcast/{routing_id}/messages/{blob_id}",
1079            get(message_handler),
1080        )
1081        .layer(axum::middleware::from_fn(move |req, next| {
1082            projection_rate_limit_middleware(req, next, limiter.clone())
1083        }))
1084        .with_state(state)
1085}
1086
1087/// Middleware that enforces per-IP rate limiting on projection endpoints.
1088///
1089/// Extracts the client IP from [`axum::extract::ConnectInfo<SocketAddr>`]
1090/// (injected by `axum::serve` for plain HTTP, or manually for TLS connections
1091/// in [`crate::tls::serve_tls`]). Falls back to `0.0.0.0` if unavailable.
1092///
1093/// Returns HTTP 429 Too Many Requests when the per-IP token bucket is exhausted.
1094/// See spec section 18.11.6.
1095async fn projection_rate_limit_middleware(
1096    request: axum::extract::Request,
1097    next: axum::middleware::Next,
1098    limiter: scp_transport::relay::rate_limit::PublishRateLimiter,
1099) -> axum::response::Response {
1100    let ip = request
1101        .extensions()
1102        .get::<axum::extract::ConnectInfo<std::net::SocketAddr>>()
1103        .map_or_else(
1104            || {
1105                tracing::warn!(
1106                    "ConnectInfo missing from request extensions; \
1107                     projection rate limiting falls back to shared 0.0.0.0 bucket \
1108                     (per-IP isolation lost)"
1109                );
1110                std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)
1111            },
1112            |ci| ci.0.ip(),
1113        );
1114    if !limiter.check(ip).await {
1115        return (
1116            axum::http::StatusCode::TOO_MANY_REQUESTS,
1117            "rate limit exceeded",
1118        )
1119            .into_response();
1120    }
1121    next.run(request).await
1122}
1123
1124// ---------------------------------------------------------------------------
1125// Unit tests
1126// ---------------------------------------------------------------------------
1127
1128#[cfg(test)]
1129#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1130mod tests {
1131    use super::*;
1132    use ed25519_dalek::Signer;
1133    use scp_core::crypto::sender_keys::{
1134        SealBroadcastParams, SigningPayloadFields, build_broadcast_signing_payload,
1135        compute_provenance_hash, generate_broadcast_key, generate_broadcast_nonce,
1136        rotate_broadcast_key, seal_broadcast,
1137    };
1138
1139    /// Seals a broadcast envelope with test defaults for projection tests.
1140    fn test_seal(key: &BroadcastKey, payload: &[u8]) -> BroadcastEnvelope {
1141        let sk = ed25519_dalek::SigningKey::from_bytes(&[0xAA; 32]);
1142        let nonce = generate_broadcast_nonce();
1143        let provenance_hash = compute_provenance_hash(None).unwrap();
1144        let signature = sk.sign(&build_broadcast_signing_payload(&SigningPayloadFields {
1145            version: scp_core::envelope::SCP_PROTOCOL_VERSION,
1146            context_id: "test-ctx",
1147            author_did: key.author_did(),
1148            sequence: 1,
1149            key_epoch: key.epoch(),
1150            timestamp: 1_700_000_000_000,
1151            nonce: &nonce,
1152            provenance_hash: &provenance_hash,
1153        }));
1154        let params = SealBroadcastParams {
1155            context_id: "test-ctx",
1156            sequence: 1,
1157            timestamp: 1_700_000_000_000,
1158            provenance: None,
1159            signature,
1160        };
1161        seal_broadcast(key, payload, &nonce, &params).unwrap()
1162    }
1163
1164    // -----------------------------------------------------------------------
1165    // Hex helpers
1166    // -----------------------------------------------------------------------
1167
#[test]
fn hex_encode_produces_64_char_lowercase() {
    // 32 bytes of 0xab must render as 64 lowercase hex digits.
    let hex = hex_encode(&[0xab; 32]);
    assert_eq!(hex.len(), 64);
    assert_eq!(hex, "ab".repeat(32));
}
1175
#[test]
fn hex_encode_all_zeros() {
    // Zero bytes keep their leading zero digit.
    let hex = hex_encode(&[0u8; 32]);
    assert_eq!(hex, "00".repeat(32));
}
1181
#[test]
fn hex_decode_roundtrip() {
    // Encoding then decoding must give back the original bytes.
    let original = [0xde_u8; 32];
    let roundtripped = hex_decode(&hex_encode(&original)).unwrap();
    assert_eq!(roundtripped, original);
}
1189
#[test]
fn hex_decode_rejects_wrong_length() {
    // Too short, empty, one below and one above the 64-char length.
    let wrong_lengths = [
        String::from("abcd"),
        String::new(),
        "a".repeat(63),
        "a".repeat(65),
    ];
    for candidate in wrong_lengths {
        assert!(hex_decode(&candidate).is_none());
    }
}
1197
#[test]
fn hex_decode_rejects_invalid_chars() {
    // Correct length, but the first character ('g') is not a hex digit.
    let malformed = format!("g{}", "0".repeat(63));
    assert!(hex_decode(&malformed).is_none());
}
1204
#[test]
fn hex_decode_accepts_uppercase() {
    // Upper-casing the encoder's output must still decode to the same bytes.
    let uppercase_hex = hex_encode(&[0xAB; 32]).to_uppercase();
    let bytes = hex_decode(&uppercase_hex).unwrap();
    assert_eq!(bytes, [0xAB; 32]);
}
1212
1213    // -----------------------------------------------------------------------
1214    // Timestamp formatting
1215    // -----------------------------------------------------------------------
1216
#[test]
fn unix_to_iso8601_epoch_zero() {
    // Timestamp 0 is the Unix epoch itself.
    let formatted = unix_to_iso8601(0);
    assert_eq!(formatted, "1970-01-01T00:00:00Z");
}
1221
#[test]
fn unix_to_iso8601_known_timestamp() {
    // 1736937000 seconds after the epoch is 2025-01-15T10:30:00Z.
    let formatted = unix_to_iso8601(1_736_937_000);
    assert_eq!(formatted, "2025-01-15T10:30:00Z");
}
1227
#[test]
fn unix_to_iso8601_y2k() {
    // 946684800 seconds after the epoch is 2000-01-01T00:00:00Z.
    let formatted = unix_to_iso8601(946_684_800);
    assert_eq!(formatted, "2000-01-01T00:00:00Z");
}
1233
#[test]
fn unix_to_iso8601_leap_year_feb_29() {
    // 1709208000 seconds after the epoch falls on the 2024 leap day.
    let formatted = unix_to_iso8601(1_709_208_000);
    assert_eq!(formatted, "2024-02-29T12:00:00Z");
}
1239
1240    // -----------------------------------------------------------------------
1241    // decrypt_blobs
1242    // -----------------------------------------------------------------------
1243
#[test]
fn decrypt_blobs_with_valid_envelope() {
    let alice_key = generate_broadcast_key("did:dht:alice");
    let serialized = rmp_serde::to_vec(&test_seal(&alice_key, b"hello world")).unwrap();

    // The blob ID used here is the SHA-256 of the serialized envelope.
    let blob_id: [u8; 32] = Sha256::digest(&serialized).into();

    let blob = scp_transport::native::storage::StoredBlob {
        routing_id: [0xAA; 32],
        blob_id,
        recipient_hint: None,
        blob_ttl: 3600,
        stored_at: 1_736_937_000,
        blob: serialized,
    };

    let keys = HashMap::from([(0, alice_key)]);

    let (decrypted, newest_id) = decrypt_blobs(&[blob], &keys);
    assert_eq!(decrypted.len(), 1);

    let only = &decrypted[0];
    assert_eq!(only.author_did, "did:dht:alice");
    assert_eq!(only.key_epoch, 0);
    assert_eq!(only.published_at, "2025-01-15T10:30:00Z");

    // The content field is base64 of the original plaintext.
    let plaintext = BASE64.decode(&only.content).unwrap();
    assert_eq!(plaintext, b"hello world");
    assert_eq!(newest_id, Some(blob_id));
}
1279
#[test]
fn decrypt_blobs_skips_invalid_msgpack() {
    // Two bytes that do not deserialize as a BroadcastEnvelope.
    let garbage = vec![0xFF, 0xFE];
    let blob = scp_transport::native::storage::StoredBlob {
        routing_id: [0xAA; 32],
        blob_id: [0xBB; 32],
        recipient_hint: None,
        blob_ttl: 3600,
        stored_at: 100,
        blob: garbage,
    };

    let no_keys = HashMap::new();
    let (decrypted, newest_id) = decrypt_blobs(&[blob], &no_keys);

    // The blob is dropped and no latest ID is reported.
    assert!(decrypted.is_empty());
    assert!(newest_id.is_none());
}
1296
#[test]
fn decrypt_blobs_skips_missing_epoch_key() {
    let epoch0_key = generate_broadcast_key("did:dht:alice");
    let serialized = rmp_serde::to_vec(&test_seal(&epoch0_key, b"secret")).unwrap();

    let blob = scp_transport::native::storage::StoredBlob {
        routing_id: [0xAA; 32],
        blob_id: [0xCC; 32],
        recipient_hint: None,
        blob_ttl: 3600,
        stored_at: 100,
        blob: serialized,
    };

    // Only the rotated key is registered (under epoch 1); the envelope was
    // sealed with the epoch-0 key, so no key matches it.
    let (epoch1_key, _) = rotate_broadcast_key(&epoch0_key, 1000).unwrap();
    let keys = HashMap::from([(1, epoch1_key)]);

    let (decrypted, newest_id) = decrypt_blobs(&[blob], &keys);
    assert!(decrypted.is_empty());
    assert!(newest_id.is_none());
}
1321
1322    // -----------------------------------------------------------------------
1323    // Feed handler integration tests (via axum test harness)
1324    // -----------------------------------------------------------------------
1325
1326    use std::net::SocketAddr;
1327    use std::time::Instant;
1328
1329    use axum::body::Body;
1330    use axum::http::{Request, StatusCode as HttpStatus};
1331    use http_body_util::BodyExt;
1332    use scp_transport::native::storage::{BlobStorageBackend, InMemoryBlobStorage};
1333    use tokio::sync::RwLock;
1334    use tower::ServiceExt;
1335
1336    use crate::http::NodeState;
1337
1338    /// Creates a test `NodeState` with the given projected contexts and
1339    /// blob storage.
1340    fn test_state_with(
1341        projected: HashMap<[u8; 32], ProjectedContext>,
1342        storage: InMemoryBlobStorage,
1343    ) -> Arc<NodeState> {
1344        test_state_with_rate(projected, storage, 1000)
1345    }
1346
1347    /// Creates a test `NodeState` with the given projected contexts,
1348    /// blob storage, and projection rate limit.
1349    fn test_state_with_rate(
1350        projected: HashMap<[u8; 32], ProjectedContext>,
1351        storage: InMemoryBlobStorage,
1352        rate_limit: u32,
1353    ) -> Arc<NodeState> {
1354        Arc::new(NodeState {
1355            did: "did:dht:test".to_owned(),
1356            relay_url: "wss://localhost/scp/v1".to_owned(),
1357            broadcast_contexts: RwLock::new(HashMap::new()),
1358            relay_addr: "127.0.0.1:9000".parse::<SocketAddr>().unwrap(),
1359            bridge_secret: zeroize::Zeroizing::new([0u8; 32]),
1360            dev_token: None,
1361            dev_bind_addr: None,
1362            projected_contexts: RwLock::new(projected),
1363            blob_storage: Arc::new(BlobStorageBackend::from(storage)),
1364            relay_config: scp_transport::native::server::RelayConfig::default(),
1365            start_time: Instant::now(),
1366            http_bind_addr: SocketAddr::from(([0, 0, 0, 0], 8443)),
1367            shutdown_token: tokio_util::sync::CancellationToken::new(),
1368            cors_origins: None,
1369            projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
1370                rate_limit,
1371            ),
1372            tls_config: None,
1373            cert_resolver: None,
1374            did_document: scp_identity::document::DidDocument {
1375                context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
1376                id: "did:dht:test".to_owned(),
1377                verification_method: vec![],
1378                authentication: vec![],
1379                assertion_method: vec![],
1380                also_known_as: vec![],
1381                service: vec![],
1382            },
1383            connection_tracker: scp_transport::relay::rate_limit::new_connection_tracker(),
1384            subscription_registry: scp_transport::relay::subscription::new_registry(),
1385            acme_challenges: None,
1386            bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
1387        })
1388    }
1389
1390    #[tokio::test]
1391    async fn feed_unknown_routing_id_returns_404() {
1392        let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
1393        let router = broadcast_projection_router(state);
1394
1395        let routing_hex = hex_encode(&[0xAA; 32]);
1396        let req = Request::builder()
1397            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
1398            .body(Body::empty())
1399            .unwrap();
1400
1401        let resp = router.oneshot(req).await.unwrap();
1402        assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
1403
1404        let body = resp.into_body().collect().await.unwrap().to_bytes();
1405        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1406        assert_eq!(json["code"], "NOT_FOUND");
1407    }
1408
1409    #[tokio::test]
1410    async fn feed_invalid_hex_returns_400() {
1411        let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
1412        let router = broadcast_projection_router(state);
1413
1414        let req = Request::builder()
1415            .uri("/scp/broadcast/not_hex/feed")
1416            .body(Body::empty())
1417            .unwrap();
1418
1419        let resp = router.oneshot(req).await.unwrap();
1420        assert_eq!(resp.status(), HttpStatus::BAD_REQUEST);
1421    }
1422
1423    #[tokio::test]
1424    async fn feed_returns_decrypted_messages_with_cache_headers() {
1425        let key = generate_broadcast_key("did:dht:alice");
1426        let context_id = "test_ctx_001";
1427        let projected =
1428            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1429        let routing_id = projected.routing_id;
1430
1431        let mut projected_map = HashMap::new();
1432        projected_map.insert(routing_id, projected);
1433
1434        let storage = InMemoryBlobStorage::new();
1435
1436        // Seal and store a broadcast message.
1437        let envelope = test_seal(&key, b"hello feed");
1438        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1439        let blob_id = {
1440            let mut h = Sha256::new();
1441            h.update(&blob_bytes);
1442            let r: [u8; 32] = h.finalize().into();
1443            r
1444        };
1445
1446        storage
1447            .store(routing_id, blob_id, None, 3600, blob_bytes)
1448            .await
1449            .unwrap();
1450
1451        let state = test_state_with(projected_map, storage);
1452        let router = broadcast_projection_router(state);
1453
1454        let routing_hex = hex_encode(&routing_id);
1455        let req = Request::builder()
1456            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
1457            .body(Body::empty())
1458            .unwrap();
1459
1460        let resp = router.oneshot(req).await.unwrap();
1461        assert_eq!(resp.status(), HttpStatus::OK);
1462
1463        // Check Cache-Control header.
1464        let cache_control = resp
1465            .headers()
1466            .get(header::CACHE_CONTROL)
1467            .unwrap()
1468            .to_str()
1469            .unwrap();
1470        assert_eq!(
1471            cache_control,
1472            "public, max-age=30, stale-while-revalidate=300"
1473        );
1474
1475        // Check ETag header is present and contains the blob_id.
1476        let etag = resp.headers().get(header::ETAG).unwrap().to_str().unwrap();
1477        assert!(etag.contains(&hex_encode(&blob_id)));
1478
1479        // Parse response body.
1480        let body = resp.into_body().collect().await.unwrap().to_bytes();
1481        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1482
1483        assert_eq!(json["context_id"], "test_ctx_001");
1484        assert_eq!(json["author_did"], "did:dht:alice");
1485
1486        let messages = json["messages"].as_array().unwrap();
1487        assert_eq!(messages.len(), 1);
1488        assert_eq!(messages[0]["author_did"], "did:dht:alice");
1489        assert_eq!(messages[0]["key_epoch"], 0);
1490
1491        // Verify decrypted content.
1492        let content_b64 = messages[0]["content"].as_str().unwrap();
1493        let decoded = BASE64.decode(content_b64).unwrap();
1494        assert_eq!(decoded, b"hello feed");
1495    }
1496
1497    #[tokio::test]
1498    async fn feed_respects_limit_parameter() {
1499        let key = generate_broadcast_key("did:dht:alice");
1500        let context_id = "limit_ctx";
1501        let projected =
1502            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1503        let routing_id = projected.routing_id;
1504
1505        let mut projected_map = HashMap::new();
1506        projected_map.insert(routing_id, projected);
1507
1508        let storage = InMemoryBlobStorage::new();
1509
1510        // Store 5 messages.
1511        for i in 0u8..5 {
1512            let envelope = test_seal(&key, &[i; 10]);
1513            let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1514            let blob_id = {
1515                let mut h = Sha256::new();
1516                h.update(&blob_bytes);
1517                let r: [u8; 32] = h.finalize().into();
1518                r
1519            };
1520            storage
1521                .store(routing_id, blob_id, None, 3600, blob_bytes)
1522                .await
1523                .unwrap();
1524        }
1525
1526        let state = test_state_with(projected_map, storage);
1527        let router = broadcast_projection_router(state);
1528
1529        let routing_hex = hex_encode(&routing_id);
1530        let req = Request::builder()
1531            .uri(format!("/scp/broadcast/{routing_hex}/feed?limit=2"))
1532            .body(Body::empty())
1533            .unwrap();
1534
1535        let resp = router.oneshot(req).await.unwrap();
1536        assert_eq!(resp.status(), HttpStatus::OK);
1537
1538        let body = resp.into_body().collect().await.unwrap().to_bytes();
1539        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1540        let messages = json["messages"].as_array().unwrap();
1541        assert_eq!(messages.len(), 2);
1542    }
1543
1544    #[tokio::test]
1545    async fn feed_limit_clamped_to_100() {
1546        let key = generate_broadcast_key("did:dht:alice");
1547        let context_id = "clamp_ctx";
1548        let projected =
1549            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1550        let routing_id = projected.routing_id;
1551
1552        let mut projected_map = HashMap::new();
1553        projected_map.insert(routing_id, projected);
1554
1555        let storage = InMemoryBlobStorage::new();
1556        let state = test_state_with(projected_map, storage);
1557        let router = broadcast_projection_router(state);
1558
1559        // Request limit=999 — should not crash, just clamp to 100.
1560        let routing_hex = hex_encode(&routing_id);
1561        let req = Request::builder()
1562            .uri(format!("/scp/broadcast/{routing_hex}/feed?limit=999"))
1563            .body(Body::empty())
1564            .unwrap();
1565
1566        let resp = router.oneshot(req).await.unwrap();
1567        assert_eq!(resp.status(), HttpStatus::OK);
1568
1569        let body = resp.into_body().collect().await.unwrap().to_bytes();
1570        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1571        // Empty messages is fine; the point is it didn't error.
1572        assert!(json["messages"].as_array().unwrap().is_empty());
1573    }
1574
1575    #[tokio::test]
1576    async fn feed_empty_context_returns_empty_messages() {
1577        let key = generate_broadcast_key("did:dht:alice");
1578        let context_id = "empty_ctx";
1579        let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Open, None);
1580        let routing_id = projected.routing_id;
1581
1582        let mut projected_map = HashMap::new();
1583        projected_map.insert(routing_id, projected);
1584
1585        let state = test_state_with(projected_map, InMemoryBlobStorage::new());
1586        let router = broadcast_projection_router(state);
1587
1588        let routing_hex = hex_encode(&routing_id);
1589        let req = Request::builder()
1590            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
1591            .body(Body::empty())
1592            .unwrap();
1593
1594        let resp = router.oneshot(req).await.unwrap();
1595        assert_eq!(resp.status(), HttpStatus::OK);
1596
1597        let body = resp.into_body().collect().await.unwrap().to_bytes();
1598        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1599        assert_eq!(json["context_id"], "empty_ctx");
1600        assert_eq!(json["author_did"], "");
1601        assert!(json["messages"].as_array().unwrap().is_empty());
1602
1603        // No ETag when there are no messages.
1604        // (The resp was consumed, but we checked status above.)
1605    }
1606
1607    // -----------------------------------------------------------------------
1608    // Per-message handler integration tests
1609    // -----------------------------------------------------------------------
1610
1611    #[tokio::test]
1612    async fn message_unknown_routing_id_returns_404() {
1613        let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
1614        let router = broadcast_projection_router(state);
1615
1616        let routing_hex = hex_encode(&[0xAA; 32]);
1617        let blob_hex = hex_encode(&[0xBB; 32]);
1618        let req = Request::builder()
1619            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1620            .body(Body::empty())
1621            .unwrap();
1622
1623        let resp = router.oneshot(req).await.unwrap();
1624        assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
1625
1626        let body = resp.into_body().collect().await.unwrap().to_bytes();
1627        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1628        assert_eq!(json["code"], "NOT_FOUND");
1629    }
1630
1631    #[tokio::test]
1632    async fn message_unknown_blob_id_returns_404() {
1633        let key = generate_broadcast_key("did:dht:alice");
1634        let context_id = "msg_ctx_404";
1635        let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Open, None);
1636        let routing_id = projected.routing_id;
1637
1638        let mut projected_map = HashMap::new();
1639        projected_map.insert(routing_id, projected);
1640
1641        let storage = InMemoryBlobStorage::new();
1642        let state = test_state_with(projected_map, storage);
1643        let router = broadcast_projection_router(state);
1644
1645        let routing_hex = hex_encode(&routing_id);
1646        let blob_hex = hex_encode(&[0xCC; 32]); // not in storage
1647        let req = Request::builder()
1648            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1649            .body(Body::empty())
1650            .unwrap();
1651
1652        let resp = router.oneshot(req).await.unwrap();
1653        assert_eq!(resp.status(), HttpStatus::NOT_FOUND);
1654
1655        let body = resp.into_body().collect().await.unwrap().to_bytes();
1656        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1657        assert_eq!(json["code"], "NOT_FOUND");
1658    }
1659
1660    #[tokio::test]
1661    async fn message_returns_decrypted_single_message() {
1662        let key = generate_broadcast_key("did:dht:alice");
1663        let context_id = "msg_ctx_ok";
1664        let projected =
1665            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1666        let routing_id = projected.routing_id;
1667
1668        let mut projected_map = HashMap::new();
1669        projected_map.insert(routing_id, projected);
1670
1671        let storage = InMemoryBlobStorage::new();
1672
1673        // Seal and store a broadcast message.
1674        let envelope = test_seal(&key, b"single message");
1675        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1676        let blob_id = {
1677            let mut h = Sha256::new();
1678            h.update(&blob_bytes);
1679            let r: [u8; 32] = h.finalize().into();
1680            r
1681        };
1682
1683        storage
1684            .store(routing_id, blob_id, None, 3600, blob_bytes)
1685            .await
1686            .unwrap();
1687
1688        let state = test_state_with(projected_map, storage);
1689        let router = broadcast_projection_router(state);
1690
1691        let routing_hex = hex_encode(&routing_id);
1692        let blob_hex = hex_encode(&blob_id);
1693        let req = Request::builder()
1694            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1695            .body(Body::empty())
1696            .unwrap();
1697
1698        let resp = router.oneshot(req).await.unwrap();
1699        assert_eq!(resp.status(), HttpStatus::OK);
1700
1701        // Check Cache-Control header (immutable for per-message).
1702        let cache_control = resp
1703            .headers()
1704            .get(header::CACHE_CONTROL)
1705            .unwrap()
1706            .to_str()
1707            .unwrap();
1708        assert_eq!(cache_control, "public, immutable, max-age=31536000");
1709
1710        // Check ETag header contains the blob_id.
1711        let etag = resp.headers().get(header::ETAG).unwrap().to_str().unwrap();
1712        assert_eq!(etag, format!("\"{blob_hex}\""));
1713
1714        // Parse response body.
1715        let body = resp.into_body().collect().await.unwrap().to_bytes();
1716        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1717
1718        assert_eq!(json["id"], blob_hex);
1719        assert_eq!(json["author_did"], "did:dht:alice");
1720        assert_eq!(json["key_epoch"], 0);
1721
1722        // Verify decrypted content.
1723        let content_b64 = json["content"].as_str().unwrap();
1724        let decoded = BASE64.decode(content_b64).unwrap();
1725        assert_eq!(decoded, b"single message");
1726    }
1727
1728    #[tokio::test]
1729    async fn message_conditional_get_returns_304() {
1730        let key = generate_broadcast_key("did:dht:alice");
1731        let context_id = "msg_ctx_304";
1732        let projected =
1733            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1734        let routing_id = projected.routing_id;
1735
1736        let mut projected_map = HashMap::new();
1737        projected_map.insert(routing_id, projected);
1738
1739        let storage = InMemoryBlobStorage::new();
1740
1741        let envelope = test_seal(&key, b"cached msg");
1742        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1743        let blob_id = {
1744            let mut h = Sha256::new();
1745            h.update(&blob_bytes);
1746            let r: [u8; 32] = h.finalize().into();
1747            r
1748        };
1749
1750        storage
1751            .store(routing_id, blob_id, None, 3600, blob_bytes)
1752            .await
1753            .unwrap();
1754
1755        let state = test_state_with(projected_map, storage);
1756        let router = broadcast_projection_router(state);
1757
1758        let routing_hex = hex_encode(&routing_id);
1759        let blob_hex = hex_encode(&blob_id);
1760        let etag_value = format!("\"{blob_hex}\"");
1761        let req = Request::builder()
1762            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1763            .header("If-None-Match", &etag_value)
1764            .body(Body::empty())
1765            .unwrap();
1766
1767        let resp = router.oneshot(req).await.unwrap();
1768        assert_eq!(resp.status(), HttpStatus::NOT_MODIFIED);
1769
1770        // Body should be empty.
1771        let body = resp.into_body().collect().await.unwrap().to_bytes();
1772        assert!(body.is_empty());
1773    }
1774
1775    #[tokio::test]
1776    async fn message_conditional_get_non_matching_returns_200() {
1777        let key = generate_broadcast_key("did:dht:alice");
1778        let context_id = "msg_ctx_200";
1779        let projected =
1780            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
1781        let routing_id = projected.routing_id;
1782
1783        let mut projected_map = HashMap::new();
1784        projected_map.insert(routing_id, projected);
1785
1786        let storage = InMemoryBlobStorage::new();
1787
1788        let envelope = test_seal(&key, b"fresh msg");
1789        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
1790        let blob_id = {
1791            let mut h = Sha256::new();
1792            h.update(&blob_bytes);
1793            let r: [u8; 32] = h.finalize().into();
1794            r
1795        };
1796
1797        storage
1798            .store(routing_id, blob_id, None, 3600, blob_bytes)
1799            .await
1800            .unwrap();
1801
1802        let state = test_state_with(projected_map, storage);
1803        let router = broadcast_projection_router(state);
1804
1805        let routing_hex = hex_encode(&routing_id);
1806        let blob_hex = hex_encode(&blob_id);
1807        // Send a non-matching ETag.
1808        let wrong_etag = format!("\"{}\"", hex_encode(&[0xFF; 32]));
1809        let req = Request::builder()
1810            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
1811            .header("If-None-Match", &wrong_etag)
1812            .body(Body::empty())
1813            .unwrap();
1814
1815        let resp = router.oneshot(req).await.unwrap();
1816        assert_eq!(resp.status(), HttpStatus::OK);
1817
1818        let body = resp.into_body().collect().await.unwrap().to_bytes();
1819        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1820        assert_eq!(json["id"], blob_hex);
1821    }
1822
1823    #[tokio::test]
1824    async fn message_invalid_hex_returns_400() {
1825        let state = test_state_with(HashMap::new(), InMemoryBlobStorage::new());
1826        let router = broadcast_projection_router(state);
1827
1828        // Invalid routing_id hex.
1829        let req = Request::builder()
1830            .uri("/scp/broadcast/not_valid_hex/messages/also_not_hex")
1831            .body(Body::empty())
1832            .unwrap();
1833
1834        let resp = router.oneshot(req).await.unwrap();
1835        assert_eq!(resp.status(), HttpStatus::BAD_REQUEST);
1836    }
1837
1838    // -----------------------------------------------------------------------
1839    // Original tests
1840    // -----------------------------------------------------------------------
1841
1842    #[test]
1843    fn compute_routing_id_is_sha256_of_context_id_bytes() {
1844        let context_id = "abc123deadbeef";
1845        let routing_id = compute_routing_id(context_id);
1846
1847        // Manually compute expected SHA-256.
1848        let mut hasher = Sha256::new();
1849        hasher.update(context_id.as_bytes());
1850        let expected: [u8; 32] = hasher.finalize().into();
1851
1852        assert_eq!(routing_id, expected);
1853    }
1854
1855    #[test]
1856    fn compute_routing_id_deterministic() {
1857        let id = "deadbeefcafe0123456789abcdef";
1858        assert_eq!(compute_routing_id(id), compute_routing_id(id));
1859    }
1860
1861    #[test]
1862    fn compute_routing_id_distinct_for_different_inputs() {
1863        let a = compute_routing_id("context_a");
1864        let b = compute_routing_id("context_b");
1865        assert_ne!(a, b);
1866    }
1867
1868    #[test]
1869    fn projected_context_new_sets_routing_id() {
1870        let key = generate_broadcast_key("did:dht:alice");
1871        let ctx = ProjectedContext::new("abc123", key, BroadcastAdmission::Open, None);
1872
1873        let expected_routing_id = compute_routing_id("abc123");
1874        assert_eq!(ctx.routing_id, expected_routing_id);
1875        assert_eq!(ctx.context_id(), "abc123");
1876    }
1877
1878    #[test]
1879    fn projected_context_new_inserts_key_at_epoch() {
1880        let key = generate_broadcast_key("did:dht:alice");
1881        let ctx = ProjectedContext::new("abc123", key, BroadcastAdmission::Open, None);
1882
1883        assert!(ctx.key_for_epoch(0).is_some());
1884        assert_eq!(ctx.keys().len(), 1);
1885    }
1886
1887    #[test]
1888    fn enable_then_disable_roundtrip() {
1889        // Simulate the registry lifecycle: insert then remove.
1890        let mut registry: HashMap<[u8; 32], ProjectedContext> = HashMap::new();
1891
1892        let context_id = "test_context_001";
1893        let key = generate_broadcast_key("did:dht:alice");
1894        let routing_id = compute_routing_id(context_id);
1895
1896        let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Open, None);
1897        registry.insert(routing_id, projected);
1898        assert!(registry.contains_key(&routing_id));
1899
1900        // Disable: remove from registry.
1901        registry.remove(&routing_id);
1902        assert!(!registry.contains_key(&routing_id));
1903    }
1904
1905    #[test]
1906    fn multiple_epochs_stored_and_retrievable() {
1907        let key0 = generate_broadcast_key("did:dht:alice");
1908        let mut ctx =
1909            ProjectedContext::new("multi_epoch_ctx", key0, BroadcastAdmission::Open, None);
1910
1911        // Rotate to epoch 1.
1912        let key0_ref = ctx.key_for_epoch(0).expect("epoch 0 should exist");
1913        let (key1, _advance) = rotate_broadcast_key(key0_ref, 1000).expect("rotate should succeed");
1914        assert_eq!(key1.epoch(), 1);
1915        ctx.insert_key(key1);
1916
1917        // Rotate to epoch 2 from epoch 1.
1918        let key1_ref = ctx.key_for_epoch(1).expect("epoch 1 should exist");
1919        let (key2, _advance) = rotate_broadcast_key(key1_ref, 2000).expect("rotate should succeed");
1920        assert_eq!(key2.epoch(), 2);
1921        ctx.insert_key(key2);
1922
1923        // All three epochs are retained (no pruning on advance).
1924        assert!(ctx.key_for_epoch(0).is_some(), "epoch 0 retained");
1925        assert!(ctx.key_for_epoch(1).is_some(), "epoch 1 retained");
1926        assert!(ctx.key_for_epoch(2).is_some(), "epoch 2 retained");
1927        assert_eq!(ctx.keys().len(), 3);
1928
1929        // Non-existent epoch returns None.
1930        assert!(ctx.key_for_epoch(99).is_none());
1931    }
1932
1933    #[test]
1934    fn insert_key_replaces_existing_epoch() {
1935        let key0 = generate_broadcast_key("did:dht:alice");
1936        let mut ctx = ProjectedContext::new("replace_test", key0, BroadcastAdmission::Open, None);
1937
1938        // Insert a different key at epoch 0 (replacement).
1939        let replacement = generate_broadcast_key("did:dht:alice");
1940        ctx.insert_key(replacement);
1941
1942        assert_eq!(ctx.keys().len(), 1);
1943        assert!(ctx.key_for_epoch(0).is_some());
1944    }
1945
1946    #[test]
1947    fn retain_only_epochs_keeps_specified_purges_rest() {
1948        let key0 = generate_broadcast_key("did:dht:alice");
1949        let mut ctx = ProjectedContext::new("purge_test", key0, BroadcastAdmission::Open, None);
1950
1951        // Build up epochs 0, 1, 2.
1952        let k0 = ctx.key_for_epoch(0).unwrap().clone();
1953        let (k1, _) = rotate_broadcast_key(&k0, 1000).unwrap();
1954        ctx.insert_key(k1.clone());
1955        let (k2, _) = rotate_broadcast_key(&k1, 2000).unwrap();
1956        ctx.insert_key(k2);
1957        assert_eq!(ctx.keys().len(), 3);
1958
1959        // Retain only epoch 2 (simulates Full-scope ban where author
1960        // rotated to epoch 2).
1961        let retain = std::collections::HashSet::from([2]);
1962        ctx.retain_only_epochs(&retain);
1963
1964        assert!(ctx.key_for_epoch(0).is_none(), "epoch 0 purged");
1965        assert!(ctx.key_for_epoch(1).is_none(), "epoch 1 purged");
1966        assert!(ctx.key_for_epoch(2).is_some(), "epoch 2 retained");
1967        assert_eq!(ctx.keys().len(), 1);
1968    }
1969
1970    #[test]
1971    fn retain_only_epochs_handles_divergent_authors() {
1972        // Simulate two authors at different epochs: alice at epoch 3,
1973        // bob at epoch 1. Full-scope ban should retain only {3, 1}.
1974        let alice_key = generate_broadcast_key("did:dht:alice");
1975        let mut ctx = ProjectedContext::new(
1976            "divergent_test",
1977            alice_key.clone(),
1978            BroadcastAdmission::Open,
1979            None,
1980        );
1981
1982        // Alice: epochs 0, 1, 2, 3.
1983        let (a1, _) = rotate_broadcast_key(&alice_key, 1000).unwrap();
1984        ctx.insert_key(a1.clone());
1985        let (a2, _) = rotate_broadcast_key(&a1, 2000).unwrap();
1986        ctx.insert_key(a2.clone());
1987        let (a3, _) = rotate_broadcast_key(&a2, 3000).unwrap();
1988        ctx.insert_key(a3);
1989
1990        // Bob: epochs 4, 5 (start at epoch 4 to avoid collision with
1991        // alice's epochs in the HashMap).
1992        let bob_key = generate_broadcast_key("did:dht:bob");
1993        let (b1, _) = rotate_broadcast_key(&bob_key, 1000).unwrap();
1994        let (b2, _) = rotate_broadcast_key(&b1, 2000).unwrap();
1995        let (b3, _) = rotate_broadcast_key(&b2, 3000).unwrap();
1996        let (b4, _) = rotate_broadcast_key(&b3, 4000).unwrap();
1997        ctx.insert_key(b4.clone());
1998        let (b5, _) = rotate_broadcast_key(&b4, 5000).unwrap();
1999        ctx.insert_key(b5);
2000
2001        assert_eq!(ctx.keys().len(), 6); // epochs 0,1,2,3 (alice) + 4,5 (bob)
2002
2003        // Full-scope ban: alice rotated to 3, bob rotated to 5.
2004        let retain = std::collections::HashSet::from([3, 5]);
2005        ctx.retain_only_epochs(&retain);
2006
2007        assert_eq!(ctx.keys().len(), 2);
2008        assert!(ctx.key_for_epoch(3).is_some(), "alice new epoch retained");
2009        assert!(ctx.key_for_epoch(5).is_some(), "bob new epoch retained");
2010        assert!(ctx.key_for_epoch(0).is_none(), "old alice epoch purged");
2011        assert!(ctx.key_for_epoch(1).is_none(), "old alice epoch purged");
2012        assert!(ctx.key_for_epoch(2).is_none(), "old alice epoch purged");
2013        assert!(ctx.key_for_epoch(4).is_none(), "old bob epoch purged");
2014    }
2015
2016    // -------------------------------------------------------------------
2017    // Test A: Cross-context routing_id mismatch (BLACK-HTTP-005 defense)
2018    // -------------------------------------------------------------------
2019
2020    #[tokio::test]
2021    #[allow(clippy::similar_names)]
2022    async fn message_cross_context_routing_id_mismatch_returns_404() {
2023        // Create two projected contexts with different routing IDs.
2024        let key_a = generate_broadcast_key("did:dht:alice");
2025        let key_b = generate_broadcast_key("did:dht:bob");
2026        let ctx_a =
2027            ProjectedContext::new("context_a", key_a.clone(), BroadcastAdmission::Open, None);
2028        let ctx_b = ProjectedContext::new("context_b", key_b, BroadcastAdmission::Open, None);
2029        let routing_id_a = ctx_a.routing_id;
2030        let routing_id_b = ctx_b.routing_id;
2031
2032        let mut projected_map = HashMap::new();
2033        projected_map.insert(routing_id_a, ctx_a);
2034        projected_map.insert(routing_id_b, ctx_b);
2035
2036        let storage = InMemoryBlobStorage::new();
2037
2038        // Seal and store a blob under routing_A.
2039        let envelope = test_seal(&key_a, b"belongs to context_a");
2040        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2041        let blob_id = {
2042            let mut h = Sha256::new();
2043            h.update(&blob_bytes);
2044            let r: [u8; 32] = h.finalize().into();
2045            r
2046        };
2047        storage
2048            .store(routing_id_a, blob_id, None, 3600, blob_bytes)
2049            .await
2050            .unwrap();
2051
2052        let state = test_state_with(projected_map, storage);
2053
2054        // Request the blob via routing_B (wrong context) → must be 404.
2055        let router_b = broadcast_projection_router(Arc::clone(&state));
2056        let routing_b_hex = hex_encode(&routing_id_b);
2057        let blob_hex = hex_encode(&blob_id);
2058        let req = Request::builder()
2059            .uri(format!(
2060                "/scp/broadcast/{routing_b_hex}/messages/{blob_hex}"
2061            ))
2062            .body(Body::empty())
2063            .unwrap();
2064
2065        let resp = router_b.oneshot(req).await.unwrap();
2066        assert_eq!(
2067            resp.status(),
2068            HttpStatus::NOT_FOUND,
2069            "cross-context request must return 404, not leak blob existence"
2070        );
2071
2072        let body = resp.into_body().collect().await.unwrap().to_bytes();
2073        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2074        assert_eq!(
2075            json["error"], "unknown blob_id",
2076            "error message must be indistinguishable from genuinely missing blob"
2077        );
2078
2079        // Verify the same blob is accessible via routing_A (correct context) → 200.
2080        let router_a = broadcast_projection_router(state);
2081        let routing_a_hex = hex_encode(&routing_id_a);
2082        let req = Request::builder()
2083            .uri(format!(
2084                "/scp/broadcast/{routing_a_hex}/messages/{blob_hex}"
2085            ))
2086            .body(Body::empty())
2087            .unwrap();
2088
2089        let resp = router_a.oneshot(req).await.unwrap();
2090        assert_eq!(
2091            resp.status(),
2092            HttpStatus::OK,
2093            "correct context should return 200"
2094        );
2095    }
2096
2097    // -------------------------------------------------------------------
2098    // Test B: Feed with `since` parameter
2099    // -------------------------------------------------------------------
2100
2101    #[tokio::test]
2102    async fn feed_since_parameter_filters_messages() {
2103        let key = generate_broadcast_key("did:dht:alice");
2104        let context_id = "since_ctx";
2105        let projected =
2106            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
2107        let routing_id = projected.routing_id;
2108
2109        let mut projected_map = HashMap::new();
2110        projected_map.insert(routing_id, projected);
2111
2112        let storage = InMemoryBlobStorage::new();
2113
2114        // Store 3 blobs. Because InMemoryBlobStorage uses real timestamps,
2115        // all blobs may get the same stored_at. We rely on the since
2116        // parameter resolving via blob lookup → stored_at filtering.
2117        let mut blob_ids = Vec::new();
2118        for i in 0u8..3 {
2119            let envelope = test_seal(&key, &[i; 16]);
2120            let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2121            let blob_id = {
2122                let mut h = Sha256::new();
2123                h.update(&blob_bytes);
2124                let r: [u8; 32] = h.finalize().into();
2125                r
2126            };
2127            storage
2128                .store(routing_id, blob_id, None, 3600, blob_bytes)
2129                .await
2130                .unwrap();
2131            blob_ids.push(blob_id);
2132        }
2133
2134        let state = test_state_with(projected_map, storage);
2135        let routing_hex = hex_encode(&routing_id);
2136
2137        // Test 1: since=blob_1 — all blobs have the same stored_at (within
2138        // the same second), so since filtering returns blobs with
2139        // stored_at > since_blob.stored_at. Since they're all equal, this
2140        // returns 0. This validates the since lookup path works without error.
2141        let router = broadcast_projection_router(Arc::clone(&state));
2142        let since_hex = hex_encode(&blob_ids[0]);
2143        let req = Request::builder()
2144            .uri(format!(
2145                "/scp/broadcast/{routing_hex}/feed?since={since_hex}"
2146            ))
2147            .body(Body::empty())
2148            .unwrap();
2149
2150        let resp = router.oneshot(req).await.unwrap();
2151        assert_eq!(resp.status(), HttpStatus::OK);
2152
2153        // Test 2: since=nonexistent_id — returns empty feed (not all blobs).
2154        // A nonexistent blob_id could be a cross-context oracle probe, so we
2155        // treat it as "nothing new" rather than returning the full feed.
2156        let router = broadcast_projection_router(Arc::clone(&state));
2157        let nonexistent = hex_encode(&[0xFF; 32]);
2158        let req = Request::builder()
2159            .uri(format!(
2160                "/scp/broadcast/{routing_hex}/feed?since={nonexistent}"
2161            ))
2162            .body(Body::empty())
2163            .unwrap();
2164
2165        let resp = router.oneshot(req).await.unwrap();
2166        assert_eq!(resp.status(), HttpStatus::OK);
2167        let body = resp.into_body().collect().await.unwrap().to_bytes();
2168        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2169        let messages = json["messages"].as_array().unwrap();
2170        assert_eq!(
2171            messages.len(),
2172            0,
2173            "nonexistent since blob_id should return empty feed (cross-context oracle prevention)"
2174        );
2175
2176        // Test 3: since=invalid_hex — should return 400.
2177        let router = broadcast_projection_router(state);
2178        let req = Request::builder()
2179            .uri(format!(
2180                "/scp/broadcast/{routing_hex}/feed?since=not_valid_hex"
2181            ))
2182            .body(Body::empty())
2183            .unwrap();
2184
2185        let resp = router.oneshot(req).await.unwrap();
2186        assert_eq!(resp.status(), HttpStatus::BAD_REQUEST);
2187
2188        let body = resp.into_body().collect().await.unwrap().to_bytes();
2189        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2190        assert_eq!(json["code"], "BAD_REQUEST");
2191    }
2192
2193    // -------------------------------------------------------------------
2194    // Test C: Multi-epoch feed decryption
2195    // -------------------------------------------------------------------
2196
2197    #[tokio::test]
2198    async fn feed_multi_epoch_decryption() {
2199        let key0 = generate_broadcast_key("did:dht:alice");
2200        let (key1, _advance) = rotate_broadcast_key(&key0, 1000).unwrap();
2201
2202        let context_id = "multi_epoch_feed_ctx";
2203        let mut projected =
2204            ProjectedContext::new(context_id, key0.clone(), BroadcastAdmission::Open, None);
2205        projected.insert_key(key1.clone());
2206        let routing_id = projected.routing_id;
2207
2208        let mut projected_map = HashMap::new();
2209        projected_map.insert(routing_id, projected);
2210
2211        let storage = InMemoryBlobStorage::new();
2212
2213        // Seal message_1 with epoch 0 key.
2214        let envelope_0 = test_seal(&key0, b"epoch zero message");
2215        let blob_bytes_0 = rmp_serde::to_vec(&envelope_0).unwrap();
2216        let blob_id_0 = {
2217            let mut h = Sha256::new();
2218            h.update(&blob_bytes_0);
2219            let r: [u8; 32] = h.finalize().into();
2220            r
2221        };
2222        storage
2223            .store(routing_id, blob_id_0, None, 3600, blob_bytes_0)
2224            .await
2225            .unwrap();
2226
2227        // Seal message_2 with epoch 1 key.
2228        let envelope_1 = test_seal(&key1, b"epoch one message");
2229        let blob_bytes_1 = rmp_serde::to_vec(&envelope_1).unwrap();
2230        let blob_id_1 = {
2231            let mut h = Sha256::new();
2232            h.update(&blob_bytes_1);
2233            let r: [u8; 32] = h.finalize().into();
2234            r
2235        };
2236        storage
2237            .store(routing_id, blob_id_1, None, 3600, blob_bytes_1)
2238            .await
2239            .unwrap();
2240
2241        let state = test_state_with(projected_map, storage);
2242        let router = broadcast_projection_router(state);
2243
2244        let routing_hex = hex_encode(&routing_id);
2245        let req = Request::builder()
2246            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2247            .body(Body::empty())
2248            .unwrap();
2249
2250        let resp = router.oneshot(req).await.unwrap();
2251        assert_eq!(resp.status(), HttpStatus::OK);
2252
2253        let body = resp.into_body().collect().await.unwrap().to_bytes();
2254        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2255        let messages = json["messages"].as_array().unwrap();
2256        assert_eq!(
2257            messages.len(),
2258            2,
2259            "both epoch-0 and epoch-1 messages returned"
2260        );
2261
2262        // Verify both messages decrypted correctly by checking their content.
2263        let contents: Vec<String> = messages
2264            .iter()
2265            .map(|m| {
2266                let b64 = m["content"].as_str().unwrap();
2267                String::from_utf8(BASE64.decode(b64).unwrap()).unwrap()
2268            })
2269            .collect();
2270        assert!(contents.contains(&"epoch zero message".to_owned()));
2271        assert!(contents.contains(&"epoch one message".to_owned()));
2272
2273        // Verify epoch values are correct.
2274        let epochs: Vec<u64> = messages
2275            .iter()
2276            .map(|m| m["key_epoch"].as_u64().unwrap())
2277            .collect();
2278        assert!(epochs.contains(&0));
2279        assert!(epochs.contains(&1));
2280    }
2281
2282    // -------------------------------------------------------------------
2283    // Test D: Tampered ciphertext (AEAD authentication failure)
2284    // -------------------------------------------------------------------
2285
2286    #[tokio::test]
2287    async fn message_tampered_ciphertext_returns_500() {
2288        let key = generate_broadcast_key("did:dht:alice");
2289        let context_id = "tamper_ctx";
2290        let projected =
2291            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
2292        let routing_id = projected.routing_id;
2293
2294        let mut projected_map = HashMap::new();
2295        projected_map.insert(routing_id, projected);
2296
2297        let storage = InMemoryBlobStorage::new();
2298
2299        // Seal a message, then tamper with the ciphertext.
2300        let mut envelope = test_seal(&key, b"tamper target");
2301        // Flip a byte in the ciphertext (after the 12-byte nonce).
2302        if envelope.encrypted_content.len() > 13 {
2303            envelope.encrypted_content[13] ^= 0xFF;
2304        }
2305        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2306        let blob_id = {
2307            let mut h = Sha256::new();
2308            h.update(&blob_bytes);
2309            let r: [u8; 32] = h.finalize().into();
2310            r
2311        };
2312        storage
2313            .store(routing_id, blob_id, None, 3600, blob_bytes)
2314            .await
2315            .unwrap();
2316
2317        // Also store a valid message.
2318        let valid_envelope = test_seal(&key, b"valid message");
2319        let valid_blob_bytes = rmp_serde::to_vec(&valid_envelope).unwrap();
2320        let valid_blob_id = {
2321            let mut h = Sha256::new();
2322            h.update(&valid_blob_bytes);
2323            let r: [u8; 32] = h.finalize().into();
2324            r
2325        };
2326        storage
2327            .store(routing_id, valid_blob_id, None, 3600, valid_blob_bytes)
2328            .await
2329            .unwrap();
2330
2331        let state = test_state_with(projected_map, storage);
2332
2333        // Per-message endpoint for tampered blob → 500 "decryption failure".
2334        let router = broadcast_projection_router(Arc::clone(&state));
2335        let routing_hex = hex_encode(&routing_id);
2336        let blob_hex = hex_encode(&blob_id);
2337        let req = Request::builder()
2338            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
2339            .body(Body::empty())
2340            .unwrap();
2341
2342        let resp = router.oneshot(req).await.unwrap();
2343        assert_eq!(resp.status(), HttpStatus::INTERNAL_SERVER_ERROR);
2344
2345        let body = resp.into_body().collect().await.unwrap().to_bytes();
2346        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2347        assert_eq!(json["error"], "decryption failure");
2348
2349        // Feed endpoint → tampered message should be silently skipped,
2350        // valid message still returned.
2351        let router = broadcast_projection_router(state);
2352        let req = Request::builder()
2353            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2354            .body(Body::empty())
2355            .unwrap();
2356
2357        let resp = router.oneshot(req).await.unwrap();
2358        assert_eq!(resp.status(), HttpStatus::OK);
2359
2360        let body = resp.into_body().collect().await.unwrap().to_bytes();
2361        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2362        let messages = json["messages"].as_array().unwrap();
2363        assert_eq!(
2364            messages.len(),
2365            1,
2366            "tampered message should be skipped, valid message returned"
2367        );
2368
2369        let content_b64 = messages[0]["content"].as_str().unwrap();
2370        let decoded = BASE64.decode(content_b64).unwrap();
2371        assert_eq!(decoded, b"valid message");
2372    }
2373
2374    // -------------------------------------------------------------------
2375    // Test D2: Purged epoch key returns 410 Gone (not 500)
2376    // -------------------------------------------------------------------
2377
2378    #[tokio::test]
2379    async fn message_purged_epoch_returns_410_gone() {
2380        // Seal a message at epoch 0, then purge epoch 0 from the
2381        // projected context (simulating a Full-scope governance ban).
2382        // The message_handler should return 410 Gone.
2383        let key = generate_broadcast_key("did:dht:alice");
2384        let context_id = "purge_410_ctx";
2385        let mut projected =
2386            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
2387
2388        // Rotate to epoch 1 and retain only epoch 1 (purge epoch 0).
2389        let (key1, _) = rotate_broadcast_key(&key, 1000).unwrap();
2390        projected.insert_key(key1);
2391        let retain = std::collections::HashSet::from([1]);
2392        projected.retain_only_epochs(&retain);
2393        assert!(projected.key_for_epoch(0).is_none());
2394        assert!(projected.key_for_epoch(1).is_some());
2395
2396        let routing_id = projected.routing_id;
2397        let mut projected_map = HashMap::new();
2398        projected_map.insert(routing_id, projected);
2399
2400        let storage = InMemoryBlobStorage::new();
2401
2402        // Store a blob encrypted at epoch 0 (the purged epoch).
2403        let envelope = test_seal(&key, b"old content");
2404        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2405        let blob_id = {
2406            let mut h = Sha256::new();
2407            h.update(&blob_bytes);
2408            let r: [u8; 32] = h.finalize().into();
2409            r
2410        };
2411        storage
2412            .store(routing_id, blob_id, None, 3600, blob_bytes)
2413            .await
2414            .unwrap();
2415
2416        let state = test_state_with(projected_map, storage);
2417        let router = broadcast_projection_router(state);
2418        let routing_hex = hex_encode(&routing_id);
2419        let blob_hex = hex_encode(&blob_id);
2420        let req = Request::builder()
2421            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
2422            .body(Body::empty())
2423            .unwrap();
2424
2425        let resp = router.oneshot(req).await.unwrap();
2426        assert_eq!(resp.status(), HttpStatus::GONE);
2427
2428        let body = resp.into_body().collect().await.unwrap().to_bytes();
2429        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2430        assert_eq!(json["error"], "content revoked");
2431        assert_eq!(json["code"], "GONE");
2432    }
2433
2434    // -------------------------------------------------------------------
2435    // Test E: Conditional GET after routing_id fix (BLACK-HTTP-005)
2436    // -------------------------------------------------------------------
2437
2438    #[tokio::test]
2439    async fn message_conditional_get_cross_context_returns_404_not_304() {
2440        // This test verifies that the conditional GET (If-None-Match) does
2441        // NOT short-circuit before the routing_id ownership check. An
2442        // attacker sending If-None-Match for a blob from routing_A to
2443        // routing_B must get 404, not 304.
2444        let key_a = generate_broadcast_key("did:dht:alice");
2445        let key_b = generate_broadcast_key("did:dht:bob");
2446        let ctx_a =
2447            ProjectedContext::new("ctx_a_304", key_a.clone(), BroadcastAdmission::Open, None);
2448        let ctx_b = ProjectedContext::new("ctx_b_304", key_b, BroadcastAdmission::Open, None);
2449        let routing_id_a = ctx_a.routing_id;
2450        let routing_id_b = ctx_b.routing_id;
2451
2452        let mut projected_map = HashMap::new();
2453        projected_map.insert(routing_id_a, ctx_a);
2454        projected_map.insert(routing_id_b, ctx_b);
2455
2456        let storage = InMemoryBlobStorage::new();
2457
2458        // Store blob under routing_A.
2459        let envelope = test_seal(&key_a, b"secret of A");
2460        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2461        let blob_id = {
2462            let mut h = Sha256::new();
2463            h.update(&blob_bytes);
2464            let r: [u8; 32] = h.finalize().into();
2465            r
2466        };
2467        storage
2468            .store(routing_id_a, blob_id, None, 3600, blob_bytes)
2469            .await
2470            .unwrap();
2471
2472        let state = test_state_with(projected_map, storage);
2473        let router = broadcast_projection_router(state);
2474
2475        // Request via routing_B with If-None-Match matching the blob_id.
2476        let routing_b_hex = hex_encode(&routing_id_b);
2477        let blob_hex = hex_encode(&blob_id);
2478        let etag_value = format!("\"{blob_hex}\"");
2479        let req = Request::builder()
2480            .uri(format!(
2481                "/scp/broadcast/{routing_b_hex}/messages/{blob_hex}"
2482            ))
2483            .header("If-None-Match", &etag_value)
2484            .body(Body::empty())
2485            .unwrap();
2486
2487        let resp = router.oneshot(req).await.unwrap();
2488        assert_eq!(
2489            resp.status(),
2490            HttpStatus::NOT_FOUND,
2491            "cross-context conditional GET must return 404, not 304"
2492        );
2493
2494        let body = resp.into_body().collect().await.unwrap().to_bytes();
2495        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2496        assert_eq!(json["error"], "unknown blob_id");
2497    }
2498
2499    // -----------------------------------------------------------------------
2500    // Rate limiting tests
2501    // -----------------------------------------------------------------------
2502
2503    #[tokio::test]
2504    async fn rate_limit_returns_429_when_exceeded() {
2505        // Set rate to 2 req/s so the third request is rate-limited.
2506        let state = test_state_with_rate(HashMap::new(), InMemoryBlobStorage::new(), 2);
2507        let routing_hex = hex_encode(&[0xAA; 32]);
2508        let uri = format!("/scp/broadcast/{routing_hex}/feed");
2509
2510        // First two requests should succeed (404 for unknown routing, but not 429).
2511        for i in 0..2 {
2512            let router = broadcast_projection_router(Arc::clone(&state));
2513            let req = Request::builder().uri(&uri).body(Body::empty()).unwrap();
2514            let resp = router.oneshot(req).await.unwrap();
2515            assert_ne!(
2516                resp.status(),
2517                HttpStatus::TOO_MANY_REQUESTS,
2518                "request {i} should not be rate-limited"
2519            );
2520        }
2521
2522        // Third request should be rate-limited (429).
2523        let router = broadcast_projection_router(Arc::clone(&state));
2524        let req = Request::builder().uri(&uri).body(Body::empty()).unwrap();
2525        let resp = router.oneshot(req).await.unwrap();
2526        assert_eq!(
2527            resp.status(),
2528            HttpStatus::TOO_MANY_REQUESTS,
2529            "third request should be rate-limited"
2530        );
2531    }
2532
2533    #[tokio::test]
2534    async fn rate_limit_allows_different_ips() {
2535        // Verify the limiter uses per-IP buckets via the PublishRateLimiter API directly.
2536        let limiter = scp_transport::relay::rate_limit::PublishRateLimiter::new(1);
2537        let ip_a: std::net::IpAddr = "10.0.0.1".parse().unwrap();
2538        let ip_b: std::net::IpAddr = "10.0.0.2".parse().unwrap();
2539
2540        // First request from each IP should be allowed.
2541        assert!(limiter.check(ip_a).await, "ip_a first request should pass");
2542        assert!(limiter.check(ip_b).await, "ip_b first request should pass");
2543
2544        // Second request from ip_a should be rate-limited.
2545        assert!(
2546            !limiter.check(ip_a).await,
2547            "ip_a second request should be rate-limited"
2548        );
2549
2550        // Second request from ip_b should also be rate-limited (separate bucket, same rate).
2551        assert!(
2552            !limiter.check(ip_b).await,
2553            "ip_b second request should be rate-limited"
2554        );
2555    }
2556
2557    // -----------------------------------------------------------------------
2558    // SCP-GG-008: UCAN validation for gated projection endpoints
2559    // -----------------------------------------------------------------------
2560
2561    use scp_core::context::params::{ProjectionOverride, ProjectionPolicy, ProjectionRule};
2562    use scp_identity::DID;
2563
2564    #[tokio::test]
2565    async fn feed_open_context_serves_without_auth() {
2566        let key = generate_broadcast_key("did:dht:alice");
2567        let context_id = "open_no_auth_ctx";
2568        let projected =
2569            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Open, None);
2570        let routing_id = projected.routing_id;
2571
2572        let mut projected_map = HashMap::new();
2573        projected_map.insert(routing_id, projected);
2574
2575        let storage = InMemoryBlobStorage::new();
2576
2577        // Store a message.
2578        let envelope = test_seal(&key, b"public content");
2579        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2580        let blob_id = {
2581            let mut h = Sha256::new();
2582            h.update(&blob_bytes);
2583            let r: [u8; 32] = h.finalize().into();
2584            r
2585        };
2586        storage
2587            .store(routing_id, blob_id, None, 3600, blob_bytes)
2588            .await
2589            .unwrap();
2590
2591        let state = test_state_with(projected_map, storage);
2592        let router = broadcast_projection_router(state);
2593
2594        // Request without any Authorization header — should succeed.
2595        let routing_hex = hex_encode(&routing_id);
2596        let req = Request::builder()
2597            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2598            .body(Body::empty())
2599            .unwrap();
2600
2601        let resp = router.oneshot(req).await.unwrap();
2602        assert_eq!(resp.status(), HttpStatus::OK);
2603
2604        // Verify public cache-control header.
2605        let cache_control = resp
2606            .headers()
2607            .get(header::CACHE_CONTROL)
2608            .unwrap()
2609            .to_str()
2610            .unwrap();
2611        assert_eq!(
2612            cache_control,
2613            "public, max-age=30, stale-while-revalidate=300"
2614        );
2615    }
2616
2617    #[tokio::test]
2618    async fn feed_gated_context_rejects_without_auth() {
2619        let key = generate_broadcast_key("did:dht:alice");
2620        let context_id = "gated_no_auth_ctx";
2621        let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Gated, None);
2622        let routing_id = projected.routing_id;
2623
2624        let mut projected_map = HashMap::new();
2625        projected_map.insert(routing_id, projected);
2626
2627        let state = test_state_with(projected_map, InMemoryBlobStorage::new());
2628        let router = broadcast_projection_router(state);
2629
2630        let routing_hex = hex_encode(&routing_id);
2631        let req = Request::builder()
2632            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2633            .body(Body::empty())
2634            .unwrap();
2635
2636        let resp = router.oneshot(req).await.unwrap();
2637        assert_eq!(resp.status(), HttpStatus::UNAUTHORIZED);
2638
2639        let body = resp.into_body().collect().await.unwrap().to_bytes();
2640        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2641        assert_eq!(json["error"], "Authorization required for gated broadcast");
2642        assert_eq!(json["code"], "UNAUTHORIZED");
2643    }
2644
2645    #[tokio::test]
2646    async fn feed_gated_context_rejects_malformed_auth() {
2647        let key = generate_broadcast_key("did:dht:alice");
2648        let context_id = "gated_bad_auth_ctx";
2649        let projected = ProjectedContext::new(context_id, key, BroadcastAdmission::Gated, None);
2650        let routing_id = projected.routing_id;
2651
2652        let mut projected_map = HashMap::new();
2653        projected_map.insert(routing_id, projected);
2654
2655        let state = test_state_with(projected_map, InMemoryBlobStorage::new());
2656        let router = broadcast_projection_router(state);
2657
2658        let routing_hex = hex_encode(&routing_id);
2659
2660        // Test 1: "Basic" scheme (not Bearer).
2661        let req = Request::builder()
2662            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2663            .header("Authorization", "Basic dXNlcjpwYXNz")
2664            .body(Body::empty())
2665            .unwrap();
2666
2667        let resp = router.oneshot(req).await.unwrap();
2668        assert_eq!(resp.status(), HttpStatus::UNAUTHORIZED);
2669
2670        // Test 2: "Bearer" with invalid JWT (not 3 segments).
2671        let router = broadcast_projection_router(test_state_with(
2672            {
2673                let mut m = HashMap::new();
2674                let key2 = generate_broadcast_key("did:dht:alice");
2675                let p2 = ProjectedContext::new(context_id, key2, BroadcastAdmission::Gated, None);
2676                m.insert(p2.routing_id, p2);
2677                m
2678            },
2679            InMemoryBlobStorage::new(),
2680        ));
2681
2682        let req = Request::builder()
2683            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2684            .header("Authorization", "Bearer not-a-valid-jwt")
2685            .body(Body::empty())
2686            .unwrap();
2687
2688        let resp = router.oneshot(req).await.unwrap();
2689        assert_eq!(resp.status(), HttpStatus::UNAUTHORIZED);
2690
2691        let body = resp.into_body().collect().await.unwrap().to_bytes();
2692        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2693        assert_eq!(json["code"], "UNAUTHORIZED");
2694    }
2695
2696    #[tokio::test]
2697    async fn message_gated_context_rejects_without_auth() {
2698        let key = generate_broadcast_key("did:dht:alice");
2699        let context_id = "gated_msg_no_auth";
2700        let projected =
2701            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Gated, None);
2702        let routing_id = projected.routing_id;
2703
2704        let mut projected_map = HashMap::new();
2705        projected_map.insert(routing_id, projected);
2706
2707        let storage = InMemoryBlobStorage::new();
2708
2709        let envelope = test_seal(&key, b"secret content");
2710        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2711        let blob_id = {
2712            let mut h = Sha256::new();
2713            h.update(&blob_bytes);
2714            let r: [u8; 32] = h.finalize().into();
2715            r
2716        };
2717        storage
2718            .store(routing_id, blob_id, None, 3600, blob_bytes)
2719            .await
2720            .unwrap();
2721
2722        let state = test_state_with(projected_map, storage);
2723        let router = broadcast_projection_router(state);
2724
2725        let routing_hex = hex_encode(&routing_id);
2726        let blob_hex = hex_encode(&blob_id);
2727        let req = Request::builder()
2728            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
2729            .body(Body::empty())
2730            .unwrap();
2731
2732        let resp = router.oneshot(req).await.unwrap();
2733        assert_eq!(resp.status(), HttpStatus::UNAUTHORIZED);
2734
2735        let body = resp.into_body().collect().await.unwrap().to_bytes();
2736        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
2737        assert_eq!(json["error"], "Authorization required for gated broadcast");
2738        assert_eq!(json["code"], "UNAUTHORIZED");
2739    }
2740
2741    #[tokio::test]
2742    async fn gated_context_returns_private_cache_headers() {
2743        // Create a gated context with a synthetic but structurally valid UCAN.
2744        let key = generate_broadcast_key("did:dht:alice");
2745        let context_id = "gated_cache_ctx";
2746        let projected =
2747            ProjectedContext::new(context_id, key.clone(), BroadcastAdmission::Gated, None);
2748        let routing_id = projected.routing_id;
2749
2750        let mut projected_map = HashMap::new();
2751        projected_map.insert(routing_id, projected);
2752
2753        let storage = InMemoryBlobStorage::new();
2754
2755        let envelope = test_seal(&key, b"gated content");
2756        let blob_bytes = rmp_serde::to_vec(&envelope).unwrap();
2757        let blob_id = {
2758            let mut h = Sha256::new();
2759            h.update(&blob_bytes);
2760            let r: [u8; 32] = h.finalize().into();
2761            r
2762        };
2763        storage
2764            .store(routing_id, blob_id, None, 3600, blob_bytes)
2765            .await
2766            .unwrap();
2767
2768        let state = test_state_with(projected_map, storage);
2769
2770        // Build a structurally valid UCAN JWT with messages:read capability.
2771        let ucan_token = build_test_ucan(context_id);
2772
2773        // Test feed endpoint.
2774        let router = broadcast_projection_router(Arc::clone(&state));
2775        let routing_hex = hex_encode(&routing_id);
2776        let req = Request::builder()
2777            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
2778            .header("Authorization", format!("Bearer {ucan_token}"))
2779            .body(Body::empty())
2780            .unwrap();
2781
2782        let resp = router.oneshot(req).await.unwrap();
2783        assert_eq!(resp.status(), HttpStatus::OK);
2784
2785        let cache_control = resp
2786            .headers()
2787            .get(header::CACHE_CONTROL)
2788            .unwrap()
2789            .to_str()
2790            .unwrap();
2791        assert_eq!(cache_control, "private, max-age=30");
2792
2793        // Test per-message endpoint.
2794        let router = broadcast_projection_router(state);
2795        let blob_hex = hex_encode(&blob_id);
2796        let req = Request::builder()
2797            .uri(format!("/scp/broadcast/{routing_hex}/messages/{blob_hex}"))
2798            .header("Authorization", format!("Bearer {ucan_token}"))
2799            .body(Body::empty())
2800            .unwrap();
2801
2802        let resp = router.oneshot(req).await.unwrap();
2803        assert_eq!(resp.status(), HttpStatus::OK);
2804
2805        let cache_control = resp
2806            .headers()
2807            .get(header::CACHE_CONTROL)
2808            .unwrap()
2809            .to_str()
2810            .unwrap();
2811        assert_eq!(cache_control, "private, immutable, max-age=31536000");
2812    }
2813
2814    // -----------------------------------------------------------------------
2815    // SCP-GG-008: effective_projection_rule unit tests
2816    // -----------------------------------------------------------------------
2817
2818    #[test]
2819    fn effective_rule_open_no_policy() {
2820        let rule = effective_projection_rule(BroadcastAdmission::Open, None, None);
2821        assert_eq!(rule, ProjectionRule::Public);
2822    }
2823
2824    #[test]
2825    fn effective_rule_gated_no_policy() {
2826        let rule = effective_projection_rule(BroadcastAdmission::Gated, None, None);
2827        assert_eq!(rule, ProjectionRule::Gated);
2828    }
2829
2830    #[test]
2831    fn effective_rule_with_policy_default() {
2832        let policy = ProjectionPolicy {
2833            default_rule: ProjectionRule::Gated,
2834            overrides: vec![],
2835        };
2836        let rule = effective_projection_rule(BroadcastAdmission::Open, Some(&policy), None);
2837        assert_eq!(rule, ProjectionRule::Gated);
2838    }
2839
2840    #[test]
2841    fn effective_rule_per_author_override() {
2842        let policy = ProjectionPolicy {
2843            default_rule: ProjectionRule::Gated,
2844            overrides: vec![ProjectionOverride {
2845                did: DID::from("did:dht:special_author"),
2846                rule: ProjectionRule::Public,
2847            }],
2848        };
2849        // Non-matching author falls back to default.
2850        let rule = effective_projection_rule(
2851            BroadcastAdmission::Open,
2852            Some(&policy),
2853            Some("did:dht:other"),
2854        );
2855        assert_eq!(rule, ProjectionRule::Gated);
2856
2857        // Matching author gets override.
2858        let rule = effective_projection_rule(
2859            BroadcastAdmission::Open,
2860            Some(&policy),
2861            Some("did:dht:special_author"),
2862        );
2863        assert_eq!(rule, ProjectionRule::Public);
2864    }
2865
2866    #[test]
2867    fn effective_rule_no_author_uses_default() {
2868        let policy = ProjectionPolicy {
2869            default_rule: ProjectionRule::Gated,
2870            overrides: vec![ProjectionOverride {
2871                did: DID::from("did:dht:someone"),
2872                rule: ProjectionRule::Public,
2873            }],
2874        };
2875        // No author DID → default rule.
2876        let rule = effective_projection_rule(BroadcastAdmission::Open, Some(&policy), None);
2877        assert_eq!(rule, ProjectionRule::Gated);
2878    }
2879
2880    // -----------------------------------------------------------------------
2881    // SCP-GG-008: validate_projection_policy tests
2882    // -----------------------------------------------------------------------
2883
2884    #[test]
2885    fn validate_policy_rejects_public_default_on_gated() {
2886        let policy = ProjectionPolicy {
2887            default_rule: ProjectionRule::Public,
2888            overrides: vec![],
2889        };
2890        let result = validate_projection_policy(BroadcastAdmission::Gated, Some(&policy));
2891        assert!(result.is_err());
2892        assert_eq!(
2893            result.unwrap_err(),
2894            "gated context cannot have public projection rule"
2895        );
2896    }
2897
2898    #[test]
2899    fn validate_policy_rejects_public_override_on_gated() {
2900        let policy = ProjectionPolicy {
2901            default_rule: ProjectionRule::Gated,
2902            overrides: vec![ProjectionOverride {
2903                did: DID::from("did:dht:some_author"),
2904                rule: ProjectionRule::Public,
2905            }],
2906        };
2907        let result = validate_projection_policy(BroadcastAdmission::Gated, Some(&policy));
2908        assert!(result.is_err());
2909        assert_eq!(
2910            result.unwrap_err(),
2911            "gated context cannot have public per-author projection override"
2912        );
2913    }
2914
2915    #[test]
2916    fn validate_policy_accepts_gated_default_on_gated() {
2917        let policy = ProjectionPolicy {
2918            default_rule: ProjectionRule::Gated,
2919            overrides: vec![],
2920        };
2921        assert!(validate_projection_policy(BroadcastAdmission::Gated, Some(&policy),).is_ok());
2922    }
2923
2924    #[test]
2925    fn validate_policy_accepts_public_on_open() {
2926        let policy = ProjectionPolicy {
2927            default_rule: ProjectionRule::Public,
2928            overrides: vec![],
2929        };
2930        assert!(validate_projection_policy(BroadcastAdmission::Open, Some(&policy),).is_ok());
2931    }
2932
2933    #[test]
2934    fn validate_policy_accepts_none_on_gated() {
2935        assert!(validate_projection_policy(BroadcastAdmission::Gated, None).is_ok());
2936    }
2937
2938    // -----------------------------------------------------------------------
2939    // SCP-GG-008: feed per-author override filtering (RED-302)
2940    // -----------------------------------------------------------------------
2941
2942    #[tokio::test]
2943    async fn feed_filters_per_author_gated_messages_without_auth() {
2944        use scp_core::crypto::sender_keys::broadcast::rotate_broadcast_key;
2945
2946        // Open context with per-author Gated override for alice.
2947        // Bob's messages should be visible without auth; alice's should not.
2948        let alice_key = generate_broadcast_key("did:dht:alice");
2949        let bob_key_epoch0 = generate_broadcast_key("did:dht:bob");
2950        let (bob_key, _) = rotate_broadcast_key(&bob_key_epoch0, 1_000).unwrap();
2951        let context_id = "feed_per_author_filter_ctx";
2952
2953        let policy = ProjectionPolicy {
2954            default_rule: ProjectionRule::Public,
2955            overrides: vec![ProjectionOverride {
2956                did: DID::from("did:dht:alice"),
2957                rule: ProjectionRule::Gated,
2958            }],
2959        };
2960
2961        let mut projected = ProjectedContext::new(
2962            context_id,
2963            alice_key.clone(),
2964            BroadcastAdmission::Open,
2965            Some(policy),
2966        );
2967        projected.keys.insert(bob_key.epoch(), bob_key.clone());
2968        let routing_id = projected.routing_id;
2969
2970        let mut projected_map = HashMap::new();
2971        projected_map.insert(routing_id, projected);
2972
2973        let storage = InMemoryBlobStorage::new();
2974
2975        // Store alice's message (should be filtered from unauthenticated feed).
2976        let alice_env = test_seal(&alice_key, b"alice secret");
2977        let alice_bytes = rmp_serde::to_vec(&alice_env).unwrap();
2978        let alice_blob_id = {
2979            let mut h = Sha256::new();
2980            h.update(&alice_bytes);
2981            let r: [u8; 32] = h.finalize().into();
2982            r
2983        };
2984        storage
2985            .store(routing_id, alice_blob_id, None, 3600, alice_bytes)
2986            .await
2987            .unwrap();
2988
2989        // Store bob's message (should be visible without auth).
2990        let bob_env = test_seal(&bob_key, b"bob public");
2991        let bob_bytes = rmp_serde::to_vec(&bob_env).unwrap();
2992        let bob_blob_id = {
2993            let mut h = Sha256::new();
2994            h.update(&bob_bytes);
2995            let r: [u8; 32] = h.finalize().into();
2996            r
2997        };
2998        storage
2999            .store(routing_id, bob_blob_id, None, 3600, bob_bytes)
3000            .await
3001            .unwrap();
3002
3003        let state = test_state_with(projected_map, storage);
3004        let routing_hex = hex_encode(&routing_id);
3005
3006        // Request feed WITHOUT auth — should only include bob's message.
3007        let router = broadcast_projection_router(Arc::clone(&state));
3008        let req = Request::builder()
3009            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
3010            .body(Body::empty())
3011            .unwrap();
3012
3013        let resp = router.oneshot(req).await.unwrap();
3014        assert_eq!(resp.status(), HttpStatus::OK);
3015
3016        // Verify private cache-control (response varies by auth).
3017        let cache_control = resp
3018            .headers()
3019            .get(header::CACHE_CONTROL)
3020            .unwrap()
3021            .to_str()
3022            .unwrap();
3023        assert_eq!(cache_control, "private, max-age=30");
3024
3025        let body = resp.into_body().collect().await.unwrap().to_bytes();
3026        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3027        let messages = json["messages"].as_array().unwrap();
3028
3029        // Only bob's message should be in the feed.
3030        assert_eq!(
3031            messages.len(),
3032            1,
3033            "feed should filter alice's gated message"
3034        );
3035        assert_eq!(messages[0]["author_did"], "did:dht:bob");
3036
3037        // Request feed WITH valid UCAN — should include both messages.
3038        let ucan_token = build_test_ucan(context_id);
3039        let router = broadcast_projection_router(state);
3040        let req = Request::builder()
3041            .uri(format!("/scp/broadcast/{routing_hex}/feed"))
3042            .header("Authorization", format!("Bearer {ucan_token}"))
3043            .body(Body::empty())
3044            .unwrap();
3045
3046        let resp = router.oneshot(req).await.unwrap();
3047        assert_eq!(resp.status(), HttpStatus::OK);
3048
3049        let body = resp.into_body().collect().await.unwrap().to_bytes();
3050        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3051        let messages = json["messages"].as_array().unwrap();
3052        assert_eq!(
3053            messages.len(),
3054            2,
3055            "authenticated feed should include both messages"
3056        );
3057    }
3058
3059    // -----------------------------------------------------------------------
3060    // SCP-GG-008: UCAN temporal validation tests
3061    // -----------------------------------------------------------------------
3062
3063    #[test]
3064    fn validate_ucan_rejects_expired_token() {
3065        // Build a UCAN with exp in the past.
3066        use base64::engine::general_purpose::URL_SAFE_NO_PAD;
3067        use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};
3068
3069        let header = UcanHeader {
3070            alg: "EdDSA".to_owned(),
3071            typ: "JWT".to_owned(),
3072            ucv: "0.10.0".to_owned(),
3073            kid: None,
3074        };
3075        let payload = UcanPayload {
3076            iss: "did:dht:test".to_owned(),
3077            aud: "did:dht:test".to_owned(),
3078            exp: 1, // expired (1970-01-01T00:00:01)
3079            nbf: None,
3080            nnc: "nonce".to_owned(),
3081            att: vec![Attenuation {
3082                with: "scp:ctx:test_ctx/messages:read".to_owned(),
3083                can: "read".to_owned(),
3084            }],
3085            prf: vec![],
3086            fct: None,
3087        };
3088
3089        let h = serde_json::to_vec(&header).unwrap();
3090        let p = serde_json::to_vec(&payload).unwrap();
3091        let token = format!(
3092            "{}.{}.{}",
3093            URL_SAFE_NO_PAD.encode(&h),
3094            URL_SAFE_NO_PAD.encode(&p),
3095            URL_SAFE_NO_PAD.encode(vec![0u8; 64]),
3096        );
3097
3098        let result = validate_projection_ucan(&token, "test_ctx");
3099        assert!(result.is_err(), "expired UCAN should be rejected");
3100    }
3101
3102    #[test]
3103    fn validate_ucan_rejects_not_yet_valid_token() {
3104        use base64::engine::general_purpose::URL_SAFE_NO_PAD;
3105        use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};
3106
3107        let header = UcanHeader {
3108            alg: "EdDSA".to_owned(),
3109            typ: "JWT".to_owned(),
3110            ucv: "0.10.0".to_owned(),
3111            kid: None,
3112        };
3113        let payload = UcanPayload {
3114            iss: "did:dht:test".to_owned(),
3115            aud: "did:dht:test".to_owned(),
3116            exp: u64::MAX,
3117            nbf: Some(u64::MAX - 1), // not valid until far future
3118            nnc: "nonce".to_owned(),
3119            att: vec![Attenuation {
3120                with: "scp:ctx:test_ctx/messages:read".to_owned(),
3121                can: "read".to_owned(),
3122            }],
3123            prf: vec![],
3124            fct: None,
3125        };
3126
3127        let h = serde_json::to_vec(&header).unwrap();
3128        let p = serde_json::to_vec(&payload).unwrap();
3129        let token = format!(
3130            "{}.{}.{}",
3131            URL_SAFE_NO_PAD.encode(&h),
3132            URL_SAFE_NO_PAD.encode(&p),
3133            URL_SAFE_NO_PAD.encode(vec![0u8; 64]),
3134        );
3135
3136        let result = validate_projection_ucan(&token, "test_ctx");
3137        assert!(result.is_err(), "not-yet-valid UCAN should be rejected");
3138    }
3139
3140    // -----------------------------------------------------------------------
3141    // SCP-GG-008: per-author override handler integration test
3142    // -----------------------------------------------------------------------
3143
3144    #[tokio::test]
3145    async fn message_handler_enforces_per_author_gated_override_on_open_context() {
3146        use scp_core::crypto::sender_keys::broadcast::rotate_broadcast_key;
3147
3148        // Open context with a per-author Gated override for "did:dht:alice".
3149        // Default is Public (because Open admission + no default_rule override),
3150        // but alice's content requires auth due to the per-author override.
3151        let alice_key = generate_broadcast_key("did:dht:alice");
3152        let bob_key_epoch0 = generate_broadcast_key("did:dht:bob");
3153        // Rotate bob's key to epoch 1 so it doesn't collide with alice's epoch 0.
3154        let (bob_key, _) = rotate_broadcast_key(&bob_key_epoch0, 1_000).unwrap();
3155        let context_id = "open_per_author_override_ctx";
3156
3157        let policy = ProjectionPolicy {
3158            default_rule: ProjectionRule::Public,
3159            overrides: vec![ProjectionOverride {
3160                did: DID::from("did:dht:alice"),
3161                rule: ProjectionRule::Gated,
3162            }],
3163        };
3164
3165        let mut projected = ProjectedContext::new(
3166            context_id,
3167            alice_key.clone(),
3168            BroadcastAdmission::Open,
3169            Some(policy),
3170        );
3171        // Insert bob's key (epoch 1) so messages from both authors can be decrypted.
3172        projected.keys.insert(bob_key.epoch(), bob_key.clone());
3173        let routing_id = projected.routing_id;
3174
3175        let mut projected_map = HashMap::new();
3176        projected_map.insert(routing_id, projected);
3177
3178        let storage = InMemoryBlobStorage::new();
3179
3180        // Store a message from alice (who has a Gated override).
3181        let alice_envelope = test_seal(&alice_key, b"alice private content");
3182        let alice_blob_bytes = rmp_serde::to_vec(&alice_envelope).unwrap();
3183        let alice_blob_id = {
3184            let mut h = Sha256::new();
3185            h.update(&alice_blob_bytes);
3186            let r: [u8; 32] = h.finalize().into();
3187            r
3188        };
3189        storage
3190            .store(routing_id, alice_blob_id, None, 3600, alice_blob_bytes)
3191            .await
3192            .unwrap();
3193
3194        // Store a message from bob (no override — default Public applies).
3195        let bob_envelope = test_seal(&bob_key, b"bob public content");
3196        let bob_blob_bytes = rmp_serde::to_vec(&bob_envelope).unwrap();
3197        let bob_blob_id = {
3198            let mut h = Sha256::new();
3199            h.update(&bob_blob_bytes);
3200            let r: [u8; 32] = h.finalize().into();
3201            r
3202        };
3203        storage
3204            .store(routing_id, bob_blob_id, None, 3600, bob_blob_bytes)
3205            .await
3206            .unwrap();
3207
3208        let state = test_state_with(projected_map, storage);
3209        let routing_hex = hex_encode(&routing_id);
3210
3211        // Request alice's message without auth → should be rejected (401)
3212        // because the per-author override makes alice's content Gated.
3213        let alice_hex = hex_encode(&alice_blob_id);
3214        let router = broadcast_projection_router(Arc::clone(&state));
3215        let req = Request::builder()
3216            .uri(format!("/scp/broadcast/{routing_hex}/messages/{alice_hex}"))
3217            .body(Body::empty())
3218            .unwrap();
3219
3220        let resp = router.oneshot(req).await.unwrap();
3221        assert_eq!(
3222            resp.status(),
3223            HttpStatus::UNAUTHORIZED,
3224            "alice's content should require auth due to per-author Gated override"
3225        );
3226
3227        // Request bob's message without auth → should succeed (200)
3228        // because bob has no override and the default is Public.
3229        let bob_hex = hex_encode(&bob_blob_id);
3230        let router = broadcast_projection_router(Arc::clone(&state));
3231        let req = Request::builder()
3232            .uri(format!("/scp/broadcast/{routing_hex}/messages/{bob_hex}"))
3233            .body(Body::empty())
3234            .unwrap();
3235
3236        let resp = router.oneshot(req).await.unwrap();
3237        assert_eq!(
3238            resp.status(),
3239            HttpStatus::OK,
3240            "bob's content should be public (no per-author override)"
3241        );
3242
3243        // Request alice's message with a valid UCAN → should succeed (200).
3244        let ucan_token = build_test_ucan(context_id);
3245        let router = broadcast_projection_router(state);
3246        let req = Request::builder()
3247            .uri(format!("/scp/broadcast/{routing_hex}/messages/{alice_hex}"))
3248            .header("Authorization", format!("Bearer {ucan_token}"))
3249            .body(Body::empty())
3250            .unwrap();
3251
3252        let resp = router.oneshot(req).await.unwrap();
3253        assert_eq!(
3254            resp.status(),
3255            HttpStatus::OK,
3256            "alice's content should be accessible with valid UCAN"
3257        );
3258
3259        // Verify private cache-control on alice's gated content.
3260        let cache_control = resp
3261            .headers()
3262            .get(header::CACHE_CONTROL)
3263            .unwrap()
3264            .to_str()
3265            .unwrap();
3266        assert_eq!(cache_control, "private, immutable, max-age=31536000");
3267    }
3268
3269    // -----------------------------------------------------------------------
3270    // Helper: build a structurally valid test UCAN JWT
3271    // -----------------------------------------------------------------------
3272
3273    /// Builds a structurally valid (but not cryptographically verified) UCAN JWT
3274    /// with a `messages:read` capability for the given context. The signature is
3275    /// a dummy — suitable for testing the structural validation path in
3276    /// `validate_projection_ucan`, not the full 11-step pipeline.
3277    fn build_test_ucan(context_id: &str) -> String {
3278        use base64::engine::general_purpose::URL_SAFE_NO_PAD;
3279        use scp_core::crypto::ucan::{Attenuation, UcanHeader, UcanPayload};
3280
3281        let header = UcanHeader {
3282            alg: "EdDSA".to_owned(),
3283            typ: "JWT".to_owned(),
3284            ucv: "0.10.0".to_owned(),
3285            kid: None,
3286        };
3287        let payload = UcanPayload {
3288            iss: "did:dht:test_issuer".to_owned(),
3289            aud: "did:dht:test_audience".to_owned(),
3290            exp: u64::MAX,
3291            nbf: None,
3292            nnc: "test-nonce-001".to_owned(),
3293            att: vec![Attenuation {
3294                with: format!("scp:ctx:{context_id}/messages:read"),
3295                can: "read".to_owned(),
3296            }],
3297            prf: vec![],
3298            fct: None,
3299        };
3300
3301        let header_json = serde_json::to_vec(&header).unwrap();
3302        let payload_json = serde_json::to_vec(&payload).unwrap();
3303        // Dummy signature (32 bytes of zeros). Not cryptographically valid,
3304        // but parse_ucan only checks structural format.
3305        let signature = vec![0u8; 64];
3306
3307        format!(
3308            "{}.{}.{}",
3309            URL_SAFE_NO_PAD.encode(&header_json),
3310            URL_SAFE_NO_PAD.encode(&payload_json),
3311            URL_SAFE_NO_PAD.encode(&signature),
3312        )
3313    }
3314}