Skip to main content

hashtree_cli/server/
blossom.rs

1//! Blossom protocol implementation (BUD-01, BUD-02)
2//!
3//! Implements blob storage endpoints with Nostr-based authentication.
4//! See: https://github.com/hzrd149/blossom
5
6use axum::{
7    body::Body,
8    extract::{Path, Query, State},
9    http::{header, HeaderMap, Response, StatusCode},
10    response::IntoResponse,
11    Json,
12};
13use base64::Engine;
14use hashtree_core::from_hex;
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use std::collections::HashSet;
18use std::sync::{Mutex, OnceLock};
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use super::auth::AppState;
22use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
23use super::ingest_filter::{
24    content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
25};
26use super::mime::get_mime_type;
27
28/// Blossom authorization event kind (NIP-98 style)
29const BLOSSOM_AUTH_KIND: u16 = 24242;
30
31/// Cache-Control header for immutable content-addressed data (1 year)
32const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
33const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
34const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
35const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
36const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;
37
38/// Default maximum upload size in bytes (5 MB)
39pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
40const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
41const MAX_BATCH_UPLOAD_BLOBS: usize = 1024;
42const MAX_BATCH_UPLOAD_BYTES: usize = 64 * 1024 * 1024;
43const MAX_UPLOAD_CHECK_HASHES: usize = 10_000;
44const SLOW_BATCH_UPLOAD_LOG_MS_ENV: &str = "HTREE_SLOW_BATCH_UPLOAD_LOG_MS";
45
46fn slow_batch_upload_log_ms() -> Option<u128> {
47    std::env::var(SLOW_BATCH_UPLOAD_LOG_MS_ENV)
48        .ok()
49        .and_then(|value| value.parse::<u128>().ok())
50        .filter(|value| *value > 0)
51}
52
53#[derive(Debug, Clone, Copy)]
54pub(super) struct OptimisticUploadQueueSnapshot {
55    pub enabled: bool,
56    pub max_bytes: usize,
57    pub available_bytes: usize,
58    pub reserved_bytes: usize,
59    pub in_flight: usize,
60    pub queue_timeout_ms: u64,
61}
62
63/// Check if a pubkey has write access based on allowed_npubs config or social graph
64/// Returns Ok(()) if allowed, Err with JSON error body if denied
65#[allow(clippy::result_large_err)]
66fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
67    // Check if pubkey is in the allowed list (converted from npub to hex)
68    if is_allowed_write_author(state, pubkey) {
69        tracing::debug!(
70            "Blossom write allowed for {}... (allowed writer)",
71            &pubkey[..8.min(pubkey.len())]
72        );
73        return Ok(());
74    }
75
76    // Not in allowed list or social graph
77    tracing::info!(
78        "Blossom write denied for {}... (not in allowed_npubs or social graph)",
79        &pubkey[..8.min(pubkey.len())]
80    );
81    Err(Response::builder()
82        .status(StatusCode::FORBIDDEN)
83        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
84        .header(header::CONTENT_TYPE, "application/json")
85        .body(Body::from(
86            r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
87        ))
88        .unwrap())
89}
90
91fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
92    if state.allowed_pubkeys.contains(pubkey) {
93        return true;
94    }
95
96    state
97        .social_graph
98        .as_ref()
99        .map(|sg| sg.check_write_access(pubkey))
100        .unwrap_or(false)
101}
102
103fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
104    state.public_writes || is_allowed_write_author(state, pubkey)
105}
106
107fn validate_upload_payload(
108    body: &[u8],
109    content_type: &str,
110    can_upload_author: bool,
111    require_random_untrusted_ingest: bool,
112) -> Result<(), (StatusCode, String)> {
113    let is_chk_upload = is_chk_content_type(content_type);
114
115    if !is_chk_upload && !can_upload_author {
116        return Err((
117            StatusCode::FORBIDDEN,
118            "Raw media uploads require write access".to_string(),
119        ));
120    }
121
122    if is_chk_upload {
123        let require_random = require_random_untrusted_ingest && !can_upload_author;
124        validate_untrusted_blob(body, require_random)
125            .map_err(|IngestRejection { status, reason }| (status, reason))?;
126    }
127
128    Ok(())
129}
130
131fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
132    let reason = reason.into();
133    Response::builder()
134        .status(status)
135        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
136        .header("X-Reason", reason.as_str())
137        .header(header::CONTENT_TYPE, "application/json")
138        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
139        .unwrap()
140}
141
142fn blossom_retryable_json_error(
143    status: StatusCode,
144    reason: impl Into<String>,
145    retry_after_seconds: u64,
146) -> Response<Body> {
147    let reason = reason.into();
148    Response::builder()
149        .status(status)
150        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
151        .header("X-Reason", reason.as_str())
152        .header(header::RETRY_AFTER, retry_after_seconds.to_string())
153        .header(header::CONTENT_TYPE, "application/json")
154        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
155        .unwrap()
156}
157
158/// Blob descriptor returned by upload and list endpoints
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct BlobDescriptor {
161    pub url: String,
162    pub sha256: String,
163    pub size: u64,
164    #[serde(rename = "type")]
165    pub mime_type: String,
166    pub uploaded: u64,
167}
168
169#[derive(Debug, Deserialize)]
170pub struct BatchUploadBlob {
171    pub sha256: String,
172    #[serde(default, alias = "contentType")]
173    pub content_type: Option<String>,
174    pub data: String,
175}
176
177#[derive(Debug, Deserialize)]
178pub struct BatchUploadRequest {
179    pub blobs: Vec<BatchUploadBlob>,
180}
181
182#[derive(Debug, Serialize)]
183pub struct BatchUploadResponse {
184    pub uploaded: usize,
185    pub blobs: Vec<BlobDescriptor>,
186}
187
188#[derive(Debug, Deserialize)]
189pub struct UploadCheckRequest {
190    pub hashes: Vec<String>,
191}
192
193#[derive(Debug, Serialize, Deserialize)]
194pub struct UploadCheckResponse {
195    pub count: usize,
196    pub present: String,
197}
198
199/// Query parameters for list endpoint
200#[derive(Debug, Deserialize)]
201pub struct ListQuery {
202    pub since: Option<u64>,
203    pub until: Option<u64>,
204    pub limit: Option<usize>,
205    pub cursor: Option<String>,
206}
207
208/// Parsed Nostr authorization event
209#[derive(Debug)]
210pub struct BlossomAuth {
211    pub pubkey: String,
212    pub kind: u16,
213    pub created_at: u64,
214    pub expiration: Option<u64>,
215    pub action: Option<String>,   // "upload", "delete", "list", "get"
216    pub blob_hashes: Vec<String>, // x tags
217    pub server: Option<String>,   // server tag
218}
219
220/// Parse and verify Nostr authorization from header
221/// Returns the verified auth or an error response
222pub fn verify_blossom_auth(
223    headers: &HeaderMap,
224    required_action: &str,
225    required_hash: Option<&str>,
226) -> Result<BlossomAuth, (StatusCode, &'static str)> {
227    let auth_header = headers
228        .get(header::AUTHORIZATION)
229        .and_then(|v| v.to_str().ok())
230        .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
231
232    let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
233        StatusCode::UNAUTHORIZED,
234        "Invalid auth scheme, expected 'Nostr'",
235    ))?;
236
237    // Decode base64 event
238    let engine = base64::engine::general_purpose::STANDARD;
239    let event_bytes = engine
240        .decode(nostr_event)
241        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
242
243    let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
244        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
245
246    // Extract event fields
247    let kind = event_json["kind"]
248        .as_u64()
249        .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
250
251    if kind != BLOSSOM_AUTH_KIND as u64 {
252        return Err((
253            StatusCode::BAD_REQUEST,
254            "Invalid event kind, expected 24242",
255        ));
256    }
257
258    let pubkey = event_json["pubkey"]
259        .as_str()
260        .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
261        .to_string();
262
263    let created_at = event_json["created_at"]
264        .as_u64()
265        .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
266
267    let sig = event_json["sig"]
268        .as_str()
269        .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
270
271    // Verify signature
272    if !verify_nostr_signature(&event_json, &pubkey, sig) {
273        return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
274    }
275
276    // Parse tags
277    let tags = event_json["tags"]
278        .as_array()
279        .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
280
281    let mut expiration: Option<u64> = None;
282    let mut action: Option<String> = None;
283    let mut blob_hashes: Vec<String> = Vec::new();
284    let mut server: Option<String> = None;
285
286    for tag in tags {
287        let tag_arr = tag.as_array();
288        if let Some(arr) = tag_arr {
289            if arr.len() >= 2 {
290                let tag_name = arr[0].as_str().unwrap_or("");
291                let tag_value = arr[1].as_str().unwrap_or("");
292
293                match tag_name {
294                    "t" => action = Some(tag_value.to_string()),
295                    "x" => blob_hashes.push(tag_value.to_lowercase()),
296                    "expiration" => expiration = tag_value.parse().ok(),
297                    "server" => server = Some(tag_value.to_string()),
298                    _ => {}
299                }
300            }
301        }
302    }
303
304    // Validate expiration
305    let now = SystemTime::now()
306        .duration_since(UNIX_EPOCH)
307        .unwrap()
308        .as_secs();
309
310    if let Some(exp) = expiration {
311        if exp < now {
312            return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
313        }
314    }
315
316    // Validate created_at is not in the future (with 60s tolerance)
317    if created_at > now + 60 {
318        return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
319    }
320
321    // Validate action matches
322    if let Some(ref act) = action {
323        if act != required_action {
324            return Err((StatusCode::FORBIDDEN, "Action mismatch"));
325        }
326    } else {
327        return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
328    }
329
330    // Validate hash if required
331    if let Some(hash) = required_hash {
332        if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
333            return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
334        }
335    }
336
337    Ok(BlossomAuth {
338        pubkey,
339        kind: kind as u16,
340        created_at,
341        expiration,
342        action,
343        blob_hashes,
344        server,
345    })
346}
347
348/// Verify Nostr event signature using secp256k1
349fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
350    use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
351
352    // Compute event ID (sha256 of serialized event)
353    let content = event["content"].as_str().unwrap_or("");
354    let full_serialized = format!(
355        "[0,\"{}\",{},{},{},\"{}\"]",
356        pubkey,
357        event["created_at"],
358        event["kind"],
359        event["tags"],
360        escape_json_string(content),
361    );
362
363    let mut hasher = Sha256::new();
364    hasher.update(full_serialized.as_bytes());
365    let event_id = hasher.finalize();
366
367    // Parse pubkey and signature
368    let pubkey_bytes = match hex::decode(pubkey) {
369        Ok(b) => b,
370        Err(_) => return false,
371    };
372
373    let sig_bytes = match hex::decode(sig) {
374        Ok(b) => b,
375        Err(_) => return false,
376    };
377
378    let secp = Secp256k1::verification_only();
379
380    let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
381        Ok(pk) => pk,
382        Err(_) => return false,
383    };
384
385    let signature = match Signature::from_slice(&sig_bytes) {
386        Ok(s) => s,
387        Err(_) => return false,
388    };
389
390    let message = match Message::from_digest_slice(&event_id) {
391        Ok(m) => m,
392        Err(_) => return false,
393    };
394
395    secp.verify_schnorr(&signature, &message, &xonly_pubkey)
396        .is_ok()
397}
398
399/// Escape string for JSON serialization
400fn escape_json_string(s: &str) -> String {
401    let mut result = String::new();
402    for c in s.chars() {
403        match c {
404            '"' => result.push_str("\\\""),
405            '\\' => result.push_str("\\\\"),
406            '\n' => result.push_str("\\n"),
407            '\r' => result.push_str("\\r"),
408            '\t' => result.push_str("\\t"),
409            c if c.is_control() => {
410                result.push_str(&format!("\\u{:04x}", c as u32));
411            }
412            c => result.push(c),
413        }
414    }
415    result
416}
417
418/// CORS preflight handler for all Blossom endpoints
419/// Echoes back Access-Control-Request-Headers to allow any headers
420pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
421    // Echo back requested headers, or use sensible defaults that cover common Blossom headers
422    let allowed_headers = headers
423        .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
424        .and_then(|v| v.to_str().ok())
425        .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
426
427    // Always include common headers in addition to what was requested
428    let full_allowed = format!(
429        "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
430        allowed_headers
431    );
432
433    Response::builder()
434        .status(StatusCode::NO_CONTENT)
435        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
436        .header(
437            header::ACCESS_CONTROL_ALLOW_METHODS,
438            "GET, HEAD, POST, PUT, DELETE, OPTIONS",
439        )
440        .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
441        .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
442        .body(Body::empty())
443        .unwrap()
444}
445
446fn encode_upload_check_bitset(bits: &[bool]) -> String {
447    let mut bytes = vec![0u8; bits.len().div_ceil(8)];
448    for (index, present) in bits.iter().enumerate() {
449        if *present {
450            bytes[index / 8] |= 1 << (index % 8);
451        }
452    }
453    base64::engine::general_purpose::STANDARD.encode(bytes)
454}
455
456/// POST /upload/check - Batch-check blob presence for upload planning.
457pub async fn upload_check(
458    State(state): State<AppState>,
459    Json(payload): Json<UploadCheckRequest>,
460) -> impl IntoResponse {
461    if payload.hashes.len() > MAX_UPLOAD_CHECK_HASHES {
462        return blossom_json_error(
463            StatusCode::PAYLOAD_TOO_LARGE,
464            format!("Too many hashes; maximum is {}", MAX_UPLOAD_CHECK_HASHES),
465        );
466    }
467
468    let mut requested = Vec::with_capacity(payload.hashes.len());
469    let mut unique = Vec::new();
470    for hash in payload.hashes {
471        let hash = hash.trim().to_ascii_lowercase();
472        if !is_valid_sha256(&hash) {
473            return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid SHA256 hash");
474        }
475        let bytes: [u8; 32] = match from_hex(&hash) {
476            Ok(bytes) => bytes,
477            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid SHA256 hash"),
478        };
479        requested.push(bytes);
480        unique.push(bytes);
481    }
482
483    unique.sort_unstable();
484    unique.dedup();
485
486    let existing = if unique.is_empty() {
487        Vec::new()
488    } else {
489        let permit = match acquire_blob_read().await {
490            Ok(permit) => permit,
491            Err(_) => {
492                return blossom_retryable_json_error(
493                    StatusCode::SERVICE_UNAVAILABLE,
494                    BLOB_READ_BUSY,
495                    1,
496                );
497            }
498        };
499        let store = state.store.clone();
500        let lookup_hashes = unique.clone();
501        let lookup = tokio::task::spawn_blocking(move || {
502            store
503                .router()
504                .existing_local_hashes_in_sorted_candidates(&lookup_hashes)
505        });
506        let result = tokio::time::timeout(blob_read_timeout(), lookup).await;
507        drop(permit);
508        match result {
509            Ok(Ok(Ok(existing))) => existing,
510            Ok(Ok(Err(error))) => {
511                tracing::debug!("Blossom upload check failed: {}", error);
512                return blossom_json_error(StatusCode::INTERNAL_SERVER_ERROR, "Storage error");
513            }
514            Ok(Err(error)) => {
515                tracing::debug!("Blossom upload check task failed: {}", error);
516                return blossom_json_error(StatusCode::INTERNAL_SERVER_ERROR, "Storage error");
517            }
518            Err(_) => {
519                return blossom_retryable_json_error(
520                    StatusCode::SERVICE_UNAVAILABLE,
521                    "Blob check timed out",
522                    1,
523                );
524            }
525        }
526    };
527
528    let present_unique: HashSet<[u8; 32]> = unique
529        .into_iter()
530        .zip(existing)
531        .filter_map(|(hash, present)| present.then_some(hash))
532        .collect();
533    let present_bits: Vec<bool> = requested
534        .iter()
535        .map(|hash| present_unique.contains(hash))
536        .collect();
537
538    Response::builder()
539        .status(StatusCode::OK)
540        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
541        .header(header::CONTENT_TYPE, "application/json")
542        .body(Body::from(
543            serde_json::to_string(&UploadCheckResponse {
544                count: present_bits.len(),
545                present: encode_upload_check_bitset(&present_bits),
546            })
547            .unwrap(),
548        ))
549        .unwrap()
550}
551
552/// HEAD /<sha256> - Check if blob exists
553pub async fn head_blob(
554    State(state): State<AppState>,
555    Path(id): Path<String>,
556    connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
557) -> impl IntoResponse {
558    let is_localhost = connect_info.0.ip().is_loopback();
559    let (hash_part, ext) = parse_hash_and_extension(&id);
560
561    if !is_valid_sha256(hash_part) {
562        return Response::builder()
563            .status(StatusCode::BAD_REQUEST)
564            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
565            .header("X-Reason", "Invalid SHA256 hash")
566            .body(Body::empty())
567            .unwrap();
568    }
569
570    let sha256_hex = hash_part.to_lowercase();
571    let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
572        Ok(b) => b,
573        Err(_) => {
574            return Response::builder()
575                .status(StatusCode::BAD_REQUEST)
576                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
577                .header("X-Reason", "Invalid SHA256 format")
578                .body(Body::empty())
579                .unwrap();
580        }
581    };
582
583    // Blossom HEAD only needs metadata; avoid reading the full blob body just to
584    // answer cache probes and CDN revalidation. The read permit keeps CDN probe
585    // storms from filling Tokio's blocking thread pool while old blobs without
586    // metadata are still being normalized.
587    let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
588        Ok(Ok(cached))
589    } else {
590        let permit = match acquire_blob_read().await {
591            Ok(permit) => permit,
592            Err(_) => {
593                return Response::builder()
594                    .status(StatusCode::SERVICE_UNAVAILABLE)
595                    .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
596                    .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
597                    .header("Retry-After", "1")
598                    .header("X-Reason", BLOB_READ_BUSY)
599                    .body(Body::empty())
600                    .unwrap();
601            }
602        };
603        let store = state.store.clone();
604        let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
605        let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
606        drop(permit);
607        let result = match timed {
608            Ok(result) => result.map_err(|_| ()),
609            Err(_) => Err(()),
610        };
611        if let Ok(Ok(size)) = result {
612            state.blob_cache.put_size(sha256_hex.clone(), size);
613        }
614        result
615    };
616
617    match blob_size {
618        Ok(Ok(Some(size))) => {
619            let mime_type = ext
620                .map(|e| get_mime_type(&format!("file{}", e)))
621                .unwrap_or("application/octet-stream");
622
623            let mut builder = Response::builder()
624                .status(StatusCode::OK)
625                .header(header::CONTENT_TYPE, mime_type)
626                .header(header::CONTENT_LENGTH, size)
627                .header(header::ACCEPT_RANGES, "bytes")
628                .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
629                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
630            if is_localhost {
631                builder = builder.header("X-Source", "local");
632            }
633            builder.body(Body::empty()).unwrap()
634        }
635        Ok(Ok(None)) => Response::builder()
636            .status(StatusCode::NOT_FOUND)
637            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
638            .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
639            .header("X-Reason", "Blob not found")
640            .body(Body::empty())
641            .unwrap(),
642        _ => Response::builder()
643            .status(StatusCode::INTERNAL_SERVER_ERROR)
644            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
645            .body(Body::empty())
646            .unwrap(),
647    }
648}
649
650async fn store_blossom_blob_without_blocking_runtime(
651    state: &AppState,
652    data: axum::body::Bytes,
653    pubkey: [u8; 32],
654    track_ownership: bool,
655) -> anyhow::Result<()> {
656    let mut hasher = Sha256::new();
657    hasher.update(&data);
658    let hash_hex = hex::encode(hasher.finalize());
659    let data_for_cache = data.clone();
660    let permit = acquire_blob_write()
661        .await
662        .map_err(|err| anyhow::anyhow!(err))?;
663    let store = state.store.clone();
664    tokio::task::spawn_blocking(move || {
665        let _permit = permit;
666        if track_ownership {
667            store.put_owned_blob(&data, &pubkey)?;
668        } else {
669            store.put_cached_blob(&data)?;
670        }
671        Ok::<(), anyhow::Error>(())
672    })
673    .await
674    .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
675    state
676        .blob_cache
677        .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
678    state.blob_cache.put_body(hash_hex, &data_for_cache);
679    Ok(())
680}
681
682fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
683    Response::builder()
684        .status(status)
685        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
686        .header(header::CONTENT_TYPE, "application/json")
687        .body(Body::from(serde_json::to_string(descriptor).unwrap()))
688        .unwrap()
689}
690
691fn optimistic_upload_queue_timeout() -> Duration {
692    let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
693        .ok()
694        .and_then(|value| value.parse::<u64>().ok())
695        .filter(|value| *value > 0)
696        .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
697    Duration::from_millis(millis)
698}
699
700fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
701    static INFLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
702    INFLIGHT.get_or_init(|| Mutex::new(HashSet::new()))
703}
704
705fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
706    optimistic_upload_inflight()
707        .lock()
708        .is_ok_and(|inflight| inflight.contains(hash_hex))
709}
710
711fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
712    optimistic_upload_inflight()
713        .lock()
714        .map(|mut inflight| inflight.insert(hash_hex.to_string()))
715        .unwrap_or(true)
716}
717
718fn clear_optimistic_upload_inflight(hash_hex: &str) {
719    if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
720        inflight.remove(hash_hex);
721    }
722}
723
724pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
725    let max_bytes = state.optimistic_upload_queue_bytes;
726    let available_bytes = state
727        .optimistic_upload_queue
728        .available_permits()
729        .min(max_bytes);
730    let in_flight = optimistic_upload_inflight()
731        .lock()
732        .map(|inflight| inflight.len())
733        .unwrap_or(0);
734
735    OptimisticUploadQueueSnapshot {
736        enabled: state.optimistic_blossom_uploads,
737        max_bytes,
738        available_bytes,
739        reserved_bytes: max_bytes.saturating_sub(available_bytes),
740        in_flight,
741        queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
742    }
743}
744
745fn duration_millis_u64(duration: Duration) -> u64 {
746    duration.as_millis().min(u128::from(u64::MAX)) as u64
747}
748
749async fn acquire_optimistic_upload_queue(
750    state: &AppState,
751    permits: u32,
752) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
753    match tokio::time::timeout(
754        optimistic_upload_queue_timeout(),
755        state
756            .optimistic_upload_queue
757            .clone()
758            .acquire_many_owned(permits),
759    )
760    .await
761    {
762        Ok(Ok(permit)) => Ok(permit),
763        Ok(Err(_)) => Err("Optimistic upload queue is closed"),
764        Err(_) => Err("Optimistic upload queue is full"),
765    }
766}
767
768async fn uploaded_blob_already_exists(
769    state: &AppState,
770    sha256_hash: [u8; 32],
771    sha256_hex: &str,
772) -> Result<bool, String> {
773    if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
774        return Ok(true);
775    }
776
777    let permit = acquire_blob_read().await.map_err(str::to_string)?;
778    let store = state.store.clone();
779    let size_read = tokio::task::spawn_blocking(move || {
780        store
781            .blob_size(&sha256_hash)
782            .map_err(|error| error.to_string())
783    });
784
785    let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
786    drop(permit);
787    match result {
788        Ok(Ok(Ok(size))) => {
789            state.blob_cache.put_size(sha256_hex.to_string(), size);
790            Ok(size.is_some())
791        }
792        Ok(Ok(Err(error))) => Err(error),
793        Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
794        Err(_) => Err("blob existence check timed out".to_string()),
795    }
796}
797
798async fn set_existing_blob_owner_without_body_write(
799    state: AppState,
800    sha256_hash: [u8; 32],
801    pubkey: [u8; 32],
802) -> anyhow::Result<()> {
803    tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
804        .await
805        .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
806    Ok(())
807}
808
809/// PUT /upload - Upload a new blob (BUD-02)
810pub async fn upload_blob(
811    State(state): State<AppState>,
812    headers: HeaderMap,
813    body: axum::body::Bytes,
814) -> impl IntoResponse {
815    // Check size limit first (before auth to save resources)
816    let max_size = state.max_upload_bytes;
817    if body.len() > max_size {
818        return Response::builder()
819            .status(StatusCode::PAYLOAD_TOO_LARGE)
820            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
821            .header(header::CONTENT_TYPE, "application/json")
822            .body(Body::from(format!(
823                r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
824                body.len(),
825                max_size,
826                max_size / 1024 / 1024
827            )))
828            .unwrap();
829    }
830
831    // Verify authorization
832    let auth = match verify_blossom_auth(&headers, "upload", None) {
833        Ok(a) => a,
834        Err((status, reason)) => {
835            return Response::builder()
836                .status(status)
837                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
838                .header("X-Reason", reason)
839                .header(header::CONTENT_TYPE, "application/json")
840                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
841                .unwrap();
842        }
843    };
844
845    // Get content type from header
846    let content_type = headers
847        .get(header::CONTENT_TYPE)
848        .and_then(|v| v.to_str().ok())
849        .unwrap_or("application/octet-stream")
850        .to_string();
851
852    // Check write access: either in allowed_npubs/social graph OR public_writes is enabled.
853    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
854    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
855    if !can_upload {
856        let _ = check_write_access(&state, &auth.pubkey);
857        return Response::builder()
858            .status(StatusCode::FORBIDDEN)
859            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
860            .header(header::CONTENT_TYPE, "application/json")
861            .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
862            .unwrap();
863    }
864
865    if let Err((status, reason)) = validate_upload_payload(
866        &body,
867        &content_type,
868        can_upload,
869        state.require_random_untrusted_ingest,
870    ) {
871        return blossom_json_error(status, reason);
872    }
873
874    // Compute SHA256 of uploaded data
875    let mut hasher = Sha256::new();
876    hasher.update(&body);
877    let sha256_hash: [u8; 32] = hasher.finalize().into();
878    let sha256_hex = hex::encode(sha256_hash);
879
880    // If auth has x tags, verify hash matches
881    if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
882        return Response::builder()
883            .status(StatusCode::FORBIDDEN)
884            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
885            .header(
886                "X-Reason",
887                "Uploaded blob hash does not match authorized hash",
888            )
889            .header(header::CONTENT_TYPE, "application/json")
890            .body(Body::from(r#"{"error":"Hash mismatch"}"#))
891            .unwrap();
892    }
893
894    // Convert pubkey hex to bytes
895    let pubkey_bytes = match from_hex(&auth.pubkey) {
896        Ok(b) => b,
897        Err(_) => {
898            return Response::builder()
899                .status(StatusCode::BAD_REQUEST)
900                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
901                .header("X-Reason", "Invalid pubkey format")
902                .body(Body::empty())
903                .unwrap();
904        }
905    };
906
907    let size = body.len() as u64;
908    let now = SystemTime::now()
909        .duration_since(UNIX_EPOCH)
910        .unwrap()
911        .as_secs();
912    let ext = mime_to_extension(&content_type_base(&content_type));
913    let descriptor = BlobDescriptor {
914        url: format!("/{}{}", sha256_hex, ext),
915        sha256: sha256_hex.clone(),
916        size,
917        mime_type: content_type,
918        uploaded: now,
919    };
920
921    if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
922        return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
923    }
924
925    // Store public-write blobs in cache storage unless the writer is explicitly
926    // allowed, so untrusted public uploads do not become protected owned data.
927    if state.optimistic_blossom_uploads {
928        let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
929        if queued_bytes <= state.optimistic_upload_queue_bytes {
930            let permits = queued_bytes as u32;
931            let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
932            if !marked_inflight {
933                return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
934            }
935            let permit = match acquire_optimistic_upload_queue(&state, permits).await {
936                Ok(permit) => permit,
937                Err(_) => {
938                    clear_optimistic_upload_inflight(&sha256_hex);
939                    return blossom_retryable_json_error(
940                        StatusCode::SERVICE_UNAVAILABLE,
941                        "Optimistic upload queue is full",
942                        2,
943                    );
944                }
945            };
946            let state_for_write = state.clone();
947            let hash_for_log = sha256_hex.clone();
948            tokio::spawn(async move {
949                let _permit = permit;
950                if let Err(error) = store_blossom_blob_without_blocking_runtime(
951                    &state_for_write,
952                    body,
953                    pubkey_bytes,
954                    is_allowed_author,
955                )
956                .await
957                {
958                    tracing::error!(
959                        "Background Blossom storage failed for {}: {:#}",
960                        hash_for_log,
961                        error
962                    );
963                }
964                clear_optimistic_upload_inflight(&hash_for_log);
965            });
966
967            return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
968        }
969
970        tracing::warn!(
971            "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
972            queued_bytes,
973            state.optimistic_upload_queue_bytes
974        );
975    }
976
977    match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
978        Ok(true) => {
979            if is_allowed_author {
980                if let Err(error) = set_existing_blob_owner_without_body_write(
981                    state.clone(),
982                    sha256_hash,
983                    pubkey_bytes,
984                )
985                .await
986                {
987                    return Response::builder()
988                        .status(StatusCode::INTERNAL_SERVER_ERROR)
989                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
990                        .header("X-Reason", "Storage error")
991                        .header(header::CONTENT_TYPE, "application/json")
992                        .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
993                        .unwrap();
994                }
995            }
996            return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
997        }
998        Ok(false) => {}
999        Err(error) => {
1000            tracing::debug!(
1001                "Could not preflight Blossom upload {} before synchronous storage: {}",
1002                sha256_hex,
1003                error
1004            );
1005        }
1006    }
1007
1008    let store_result = store_blossom_blob_without_blocking_runtime(
1009        &state,
1010        body.clone(),
1011        pubkey_bytes,
1012        is_allowed_author,
1013    )
1014    .await;
1015
1016    match store_result {
1017        Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
1018        Err(e) => Response::builder()
1019            .status(StatusCode::INTERNAL_SERVER_ERROR)
1020            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1021            .header("X-Reason", "Storage error")
1022            .header(header::CONTENT_TYPE, "application/json")
1023            .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
1024            .unwrap(),
1025    }
1026}
1027
1028/// POST /upload/batch - Upload multiple blobs with one auth event and one storage batch.
1029pub async fn upload_blob_batch(
1030    State(state): State<AppState>,
1031    headers: HeaderMap,
1032    Json(payload): Json<BatchUploadRequest>,
1033) -> impl IntoResponse {
1034    let started_at = Instant::now();
1035    let slow_log_ms = slow_batch_upload_log_ms();
1036    let payload_blobs = payload.blobs.len();
1037    if payload.blobs.is_empty() {
1038        return blossom_json_error(StatusCode::BAD_REQUEST, "Batch is empty");
1039    }
1040    if payload.blobs.len() > MAX_BATCH_UPLOAD_BLOBS {
1041        return blossom_json_error(
1042            StatusCode::PAYLOAD_TOO_LARGE,
1043            "Batch contains too many blobs",
1044        );
1045    }
1046
1047    let auth = match verify_blossom_auth(&headers, "upload", None) {
1048        Ok(a) => a,
1049        Err((status, reason)) => {
1050            return Response::builder()
1051                .status(status)
1052                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1053                .header("X-Reason", reason)
1054                .header(header::CONTENT_TYPE, "application/json")
1055                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
1056                .unwrap();
1057        }
1058    };
1059    let auth_ms = started_at.elapsed().as_millis();
1060
1061    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
1062    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
1063    if !can_upload {
1064        let _ = check_write_access(&state, &auth.pubkey);
1065        return blossom_json_error(StatusCode::FORBIDDEN, "Write access denied");
1066    }
1067
1068    let pubkey_bytes = match from_hex(&auth.pubkey) {
1069        Ok(bytes) => bytes,
1070        Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid pubkey format"),
1071    };
1072
1073    let now = SystemTime::now()
1074        .duration_since(UNIX_EPOCH)
1075        .unwrap()
1076        .as_secs();
1077    let mut total_bytes = 0usize;
1078    let mut items = Vec::with_capacity(payload.blobs.len());
1079    let mut descriptors = Vec::with_capacity(payload.blobs.len());
1080    let mut decode_hash_ms = 0u128;
1081    let mut validate_ms = 0u128;
1082
1083    for blob in payload.blobs {
1084        let decode_started = Instant::now();
1085        let sha256_hex = blob.sha256.to_lowercase();
1086        let expected_hash: [u8; 32] = match from_hex(&sha256_hex) {
1087            Ok(hash) => hash,
1088            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob hash"),
1089        };
1090        if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
1091            return blossom_json_error(
1092                StatusCode::FORBIDDEN,
1093                "Uploaded blob hash does not match authorized hash",
1094            );
1095        }
1096
1097        let data = match base64::engine::general_purpose::STANDARD.decode(blob.data.as_bytes()) {
1098            Ok(data) => data,
1099            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob data"),
1100        };
1101        if data.len() > state.max_upload_bytes {
1102            return blossom_json_error(
1103                StatusCode::PAYLOAD_TOO_LARGE,
1104                "Blob exceeds maximum upload size",
1105            );
1106        }
1107        total_bytes = total_bytes.saturating_add(data.len());
1108        if total_bytes > MAX_BATCH_UPLOAD_BYTES {
1109            return blossom_json_error(
1110                StatusCode::PAYLOAD_TOO_LARGE,
1111                "Batch exceeds maximum upload size",
1112            );
1113        }
1114
1115        let mut hasher = Sha256::new();
1116        hasher.update(&data);
1117        let actual_hash: [u8; 32] = hasher.finalize().into();
1118        if actual_hash != expected_hash {
1119            return blossom_json_error(StatusCode::FORBIDDEN, "Hash mismatch");
1120        }
1121        decode_hash_ms += decode_started.elapsed().as_millis();
1122
1123        let validate_started = Instant::now();
1124        let content_type = blob
1125            .content_type
1126            .as_deref()
1127            .map(content_type_base)
1128            .unwrap_or_else(|| "application/octet-stream".to_string());
1129        if let Err((status, reason)) = validate_upload_payload(
1130            &data,
1131            &content_type,
1132            can_upload,
1133            state.require_random_untrusted_ingest,
1134        ) {
1135            return blossom_json_error(status, reason);
1136        }
1137        validate_ms += validate_started.elapsed().as_millis();
1138
1139        let ext = mime_to_extension(&content_type);
1140        descriptors.push(BlobDescriptor {
1141            url: format!("/{}{}", sha256_hex, ext),
1142            sha256: sha256_hex,
1143            size: data.len() as u64,
1144            mime_type: content_type,
1145            uploaded: now,
1146        });
1147        items.push((actual_hash, data));
1148    }
1149    let prepare_ms = started_at.elapsed().as_millis();
1150
1151    let store = state.store.clone();
1152    let store_started = Instant::now();
1153    let stored = tokio::task::spawn_blocking(move || {
1154        if is_allowed_author {
1155            store.put_owned_blobs(&items, &pubkey_bytes)
1156        } else {
1157            store.put_cached_blobs(&items)
1158        }
1159    })
1160    .await
1161    .map_err(|error| anyhow::anyhow!("blob batch write task failed: {}", error));
1162    let store_ms = store_started.elapsed().as_millis();
1163    let total_ms = started_at.elapsed().as_millis();
1164
1165    match stored {
1166        Ok(Ok(uploaded)) => {
1167            if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1168                tracing::warn!(
1169                    blobs = payload_blobs,
1170                    uploaded,
1171                    total_bytes,
1172                    total_ms,
1173                    auth_ms,
1174                    prepare_ms,
1175                    decode_hash_ms,
1176                    validate_ms,
1177                    store_ms,
1178                    allowed_author = is_allowed_author,
1179                    "slow Blossom batch upload"
1180                );
1181            }
1182            Response::builder()
1183                .status(StatusCode::OK)
1184                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1185                .header(header::CONTENT_TYPE, "application/json")
1186                .body(Body::from(
1187                    serde_json::to_string(&BatchUploadResponse {
1188                        uploaded,
1189                        blobs: descriptors,
1190                    })
1191                    .unwrap(),
1192                ))
1193                .unwrap()
1194        }
1195        Ok(Err(error)) | Err(error) => Response::builder()
1196            .status(StatusCode::INTERNAL_SERVER_ERROR)
1197            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1198            .header("X-Reason", "Storage error")
1199            .header(header::CONTENT_TYPE, "application/json")
1200            .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
1201            .unwrap(),
1202    }
1203}
1204
1205/// DELETE /<sha256> - Delete a blob (BUD-02)
1206/// Note: Blob is only fully deleted when ALL owners have removed it
1207pub async fn delete_blob(
1208    State(state): State<AppState>,
1209    Path(id): Path<String>,
1210    headers: HeaderMap,
1211) -> impl IntoResponse {
1212    let (hash_part, _) = parse_hash_and_extension(&id);
1213
1214    if !is_valid_sha256(hash_part) {
1215        return Response::builder()
1216            .status(StatusCode::BAD_REQUEST)
1217            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1218            .header("X-Reason", "Invalid SHA256 hash")
1219            .body(Body::empty())
1220            .unwrap();
1221    }
1222
1223    let sha256_hex = hash_part.to_lowercase();
1224
1225    // Convert hash to bytes
1226    let sha256_bytes = match from_hex(&sha256_hex) {
1227        Ok(b) => b,
1228        Err(_) => {
1229            return Response::builder()
1230                .status(StatusCode::BAD_REQUEST)
1231                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1232                .header("X-Reason", "Invalid SHA256 hash format")
1233                .body(Body::empty())
1234                .unwrap();
1235        }
1236    };
1237
1238    // Verify authorization with hash requirement
1239    let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
1240        Ok(a) => a,
1241        Err((status, reason)) => {
1242            return Response::builder()
1243                .status(status)
1244                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1245                .header("X-Reason", reason)
1246                .body(Body::empty())
1247                .unwrap();
1248        }
1249    };
1250
1251    // Convert pubkey hex to bytes
1252    let pubkey_bytes = match from_hex(&auth.pubkey) {
1253        Ok(b) => b,
1254        Err(_) => {
1255            return Response::builder()
1256                .status(StatusCode::BAD_REQUEST)
1257                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1258                .header("X-Reason", "Invalid pubkey format")
1259                .body(Body::empty())
1260                .unwrap();
1261        }
1262    };
1263
1264    // Check ownership - user must be one of the owners (O(1) lookup with composite key)
1265    match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
1266        Ok(true) => {
1267            // User is an owner, proceed with delete
1268        }
1269        Ok(false) => {
1270            // Check if blob exists at all (for proper error message)
1271            match state.store.blob_has_owners(&sha256_bytes) {
1272                Ok(true) => {
1273                    return Response::builder()
1274                        .status(StatusCode::FORBIDDEN)
1275                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1276                        .header("X-Reason", "Not a blob owner")
1277                        .body(Body::empty())
1278                        .unwrap();
1279                }
1280                Ok(false) => {
1281                    return Response::builder()
1282                        .status(StatusCode::NOT_FOUND)
1283                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1284                        .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
1285                        .header("X-Reason", "Blob not found")
1286                        .body(Body::empty())
1287                        .unwrap();
1288                }
1289                Err(_) => {
1290                    return Response::builder()
1291                        .status(StatusCode::INTERNAL_SERVER_ERROR)
1292                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1293                        .body(Body::empty())
1294                        .unwrap();
1295                }
1296            }
1297        }
1298        Err(_) => {
1299            return Response::builder()
1300                .status(StatusCode::INTERNAL_SERVER_ERROR)
1301                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1302                .body(Body::empty())
1303                .unwrap();
1304        }
1305    }
1306
1307    // Remove this user's ownership (blob only deleted when no owners remain)
1308    match state
1309        .store
1310        .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
1311    {
1312        Ok(fully_deleted) => {
1313            // Return 200 OK whether blob was fully deleted or just removed from user's list
1314            // The client doesn't need to know if other owners still exist
1315            Response::builder()
1316                .status(StatusCode::OK)
1317                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1318                .header(
1319                    "X-Blob-Deleted",
1320                    if fully_deleted { "true" } else { "false" },
1321                )
1322                .body(Body::empty())
1323                .unwrap()
1324        }
1325        Err(_) => Response::builder()
1326            .status(StatusCode::INTERNAL_SERVER_ERROR)
1327            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1328            .body(Body::empty())
1329            .unwrap(),
1330    }
1331}
1332
1333/// GET /list/<pubkey> - List blobs for a pubkey (BUD-02)
1334pub async fn list_blobs(
1335    State(state): State<AppState>,
1336    Path(pubkey): Path<String>,
1337    Query(query): Query<ListQuery>,
1338    headers: HeaderMap,
1339) -> impl IntoResponse {
1340    // Validate pubkey format (64 hex chars)
1341    if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1342        return Response::builder()
1343            .status(StatusCode::BAD_REQUEST)
1344            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1345            .header("X-Reason", "Invalid pubkey format")
1346            .header(header::CONTENT_TYPE, "application/json")
1347            .body(Body::from("[]"))
1348            .unwrap();
1349    }
1350
1351    let pubkey_hex = pubkey.to_lowercase();
1352    let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1353        Ok(b) => b,
1354        Err(_) => {
1355            return Response::builder()
1356                .status(StatusCode::BAD_REQUEST)
1357                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1358                .header("X-Reason", "Invalid pubkey format")
1359                .header(header::CONTENT_TYPE, "application/json")
1360                .body(Body::from("[]"))
1361                .unwrap();
1362        }
1363    };
1364
1365    let auth = match verify_blossom_auth(&headers, "list", None) {
1366        Ok(auth) => auth,
1367        Err((status, reason)) => {
1368            return Response::builder()
1369                .status(status)
1370                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1371                .header("X-Reason", reason)
1372                .header(header::CONTENT_TYPE, "application/json")
1373                .body(Body::from("[]"))
1374                .unwrap();
1375        }
1376    };
1377
1378    if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1379        return Response::builder()
1380            .status(StatusCode::FORBIDDEN)
1381            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1382            .header("X-Reason", "Pubkey mismatch")
1383            .header(header::CONTENT_TYPE, "application/json")
1384            .body(Body::from("[]"))
1385            .unwrap();
1386    }
1387
1388    // Get blobs for this pubkey
1389    match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1390        Ok(blobs) => {
1391            // Apply filters
1392            let mut filtered: Vec<_> = blobs
1393                .into_iter()
1394                .filter(|b| {
1395                    if let Some(since) = query.since {
1396                        if b.uploaded < since {
1397                            return false;
1398                        }
1399                    }
1400                    if let Some(until) = query.until {
1401                        if b.uploaded > until {
1402                            return false;
1403                        }
1404                    }
1405                    true
1406                })
1407                .collect();
1408
1409            // Sort by uploaded descending (most recent first)
1410            filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1411
1412            // Apply limit
1413            let limit = query.limit.unwrap_or(100).min(1000);
1414            filtered.truncate(limit);
1415
1416            Response::builder()
1417                .status(StatusCode::OK)
1418                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1419                .header(header::CONTENT_TYPE, "application/json")
1420                .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1421                .unwrap()
1422        }
1423        Err(_) => Response::builder()
1424            .status(StatusCode::INTERNAL_SERVER_ERROR)
1425            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1426            .header(header::CONTENT_TYPE, "application/json")
1427            .body(Body::from("[]"))
1428            .unwrap(),
1429    }
1430}
1431
1432// Helper functions
1433
1434fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
1435    if let Some(dot_pos) = id.rfind('.') {
1436        (&id[..dot_pos], Some(&id[dot_pos..]))
1437    } else {
1438        (id, None)
1439    }
1440}
1441
1442fn is_valid_sha256(s: &str) -> bool {
1443    s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit())
1444}
1445
1446#[cfg(test)]
1447fn store_blossom_blob(
1448    state: &AppState,
1449    data: &[u8],
1450    _sha256: &[u8; 32],
1451    pubkey: &[u8; 32],
1452    track_ownership: bool,
1453) -> anyhow::Result<()> {
1454    if track_ownership {
1455        state.store.put_owned_blob(data, pubkey)?;
1456    } else {
1457        state.store.put_cached_blob(data)?;
1458    }
1459
1460    Ok(())
1461}
1462
1463fn mime_to_extension(mime: &str) -> &'static str {
1464    match mime {
1465        "image/png" => ".png",
1466        "image/jpeg" => ".jpg",
1467        "image/gif" => ".gif",
1468        "image/webp" => ".webp",
1469        "image/svg+xml" => ".svg",
1470        "video/mp4" => ".mp4",
1471        "video/webm" => ".webm",
1472        "audio/mpeg" => ".mp3",
1473        "audio/ogg" => ".ogg",
1474        "application/pdf" => ".pdf",
1475        "text/plain" => ".txt",
1476        "text/html" => ".html",
1477        "application/json" => ".json",
1478        _ => "",
1479    }
1480}
1481
1482#[cfg(test)]
1483mod tests {
1484    use super::*;
1485    use crate::server::auth::WsRelayState;
1486    use crate::storage::HashtreeStore;
1487    use axum::response::IntoResponse;
1488    use base64::Engine;
1489    use hashtree_core::sha256;
1490    use std::collections::HashSet;
1491    use std::sync::{Arc, Mutex as StdMutex};
1492    use std::time::Duration;
1493    use tempfile::TempDir;
1494
1495    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
1496        AppState {
1497            store,
1498            auth: None,
1499            daemon_started_at: 1_700_000_000,
1500            peer_mode: crate::config::ServerMode::Normal,
1501            hash_get_enabled: true,
1502            http_webrtc_fetch: true,
1503            webrtc_peers: None,
1504            fips_transport: None,
1505            fetch_from_fips_peers: true,
1506            ws_relay: Arc::new(WsRelayState::new()),
1507            max_upload_bytes: 5 * 1024 * 1024,
1508            public_writes: true,
1509            public_plaintext_reads: true,
1510            require_random_untrusted_ingest: true,
1511            optimistic_blossom_uploads: false,
1512            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
1513            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
1514            allowed_pubkeys: HashSet::new(),
1515            upstream_blossom: Vec::new(),
1516            social_graph: None,
1517            social_graph_store: None,
1518            social_graph_root: None,
1519            socialgraph_snapshot_public: false,
1520            nostr_relay: None,
1521            nostr_relay_urls: Vec::new(),
1522            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
1523            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
1524                std::collections::HashMap::new(),
1525            )),
1526            inflight_blob_reads: Arc::new(
1527                tokio::sync::Mutex::new(std::collections::HashMap::new()),
1528            ),
1529            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
1530            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1531            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1532            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1533            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1534        }
1535    }
1536
1537    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
1538        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};
1539
1540        let now = Timestamp::now();
1541        let event = EventBuilder::new(Kind::Custom(BLOSSOM_AUTH_KIND), "")
1542            .tags(vec![
1543                Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
1544                Tag::custom(
1545                    TagKind::Custom("expiration".into()),
1546                    vec![(now.as_secs() + 300).to_string()],
1547                ),
1548            ])
1549            .custom_created_at(now)
1550            .sign_with_keys(keys)
1551            .expect("sign blossom auth");
1552        let json = serde_json::to_vec(&event).expect("serialize auth event");
1553        format!(
1554            "Nostr {}",
1555            base64::engine::general_purpose::STANDARD.encode(json)
1556        )
1557    }
1558
1559    fn upload_check_bits(response: UploadCheckResponse) -> Vec<bool> {
1560        let bytes = base64::engine::general_purpose::STANDARD
1561            .decode(response.present)
1562            .expect("decode upload check bitset");
1563        (0..response.count)
1564            .map(|index| bytes[index / 8] & (1 << (index % 8)) != 0)
1565            .collect()
1566    }
1567
1568    #[test]
1569    fn test_is_valid_sha256() {
1570        assert!(is_valid_sha256(
1571            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1572        ));
1573        assert!(is_valid_sha256(
1574            "0000000000000000000000000000000000000000000000000000000000000000"
1575        ));
1576
1577        // Too short
1578        assert!(!is_valid_sha256("e2bab35b5296ec2242ded0a01f6d6723"));
1579        // Too long
1580        assert!(!is_valid_sha256(
1581            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa"
1582        ));
1583        // Invalid chars
1584        assert!(!is_valid_sha256(
1585            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1586        ));
1587        // Empty
1588        assert!(!is_valid_sha256(""));
1589    }
1590
1591    #[test]
1592    fn test_parse_hash_and_extension() {
1593        let (hash, ext) = parse_hash_and_extension("abc123.png");
1594        assert_eq!(hash, "abc123");
1595        assert_eq!(ext, Some(".png"));
1596
1597        let (hash2, ext2) = parse_hash_and_extension("abc123");
1598        assert_eq!(hash2, "abc123");
1599        assert_eq!(ext2, None);
1600
1601        let (hash3, ext3) = parse_hash_and_extension("abc.123.jpg");
1602        assert_eq!(hash3, "abc.123");
1603        assert_eq!(ext3, Some(".jpg"));
1604    }
1605
1606    #[test]
1607    fn test_mime_to_extension() {
1608        assert_eq!(mime_to_extension("image/png"), ".png");
1609        assert_eq!(mime_to_extension("image/jpeg"), ".jpg");
1610        assert_eq!(mime_to_extension("video/mp4"), ".mp4");
1611        assert_eq!(mime_to_extension("application/octet-stream"), "");
1612        assert_eq!(mime_to_extension("unknown/type"), "");
1613    }
1614
1615    #[tokio::test]
1616    async fn upload_check_reports_present_hashes_in_request_order() {
1617        let temp_dir = TempDir::new().expect("temp dir");
1618        let store = Arc::new(
1619            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1620        );
1621
1622        let present = b"present blob";
1623        let missing = b"missing blob";
1624        let present_hash = sha256(present);
1625        let missing_hash = sha256(missing);
1626        store.put_cached_blob(present).expect("seed blob");
1627
1628        let state = test_app_state(store);
1629        let response = upload_check(
1630            State(state),
1631            Json(UploadCheckRequest {
1632                hashes: vec![
1633                    hex::encode(missing_hash),
1634                    hex::encode(present_hash),
1635                    hex::encode(present_hash),
1636                ],
1637            }),
1638        )
1639        .await
1640        .into_response();
1641
1642        assert_eq!(response.status(), StatusCode::OK);
1643        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
1644            .await
1645            .expect("read response body");
1646        let parsed: UploadCheckResponse =
1647            serde_json::from_slice(&body).expect("parse upload check response");
1648        assert_eq!(upload_check_bits(parsed), vec![false, true, true]);
1649    }
1650
1651    #[tokio::test]
1652    async fn upload_check_rejects_invalid_hash() {
1653        let temp_dir = TempDir::new().expect("temp dir");
1654        let store = Arc::new(
1655            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1656        );
1657        let state = test_app_state(store);
1658        let response = upload_check(
1659            State(state),
1660            Json(UploadCheckRequest {
1661                hashes: vec!["not-a-sha256".to_string()],
1662            }),
1663        )
1664        .await
1665        .into_response();
1666
1667        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
1668    }
1669
1670    #[tokio::test]
1671    async fn upload_check_rejects_too_many_hashes() {
1672        let temp_dir = TempDir::new().expect("temp dir");
1673        let store = Arc::new(
1674            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1675        );
1676        let state = test_app_state(store);
1677        let response = upload_check(
1678            State(state),
1679            Json(UploadCheckRequest {
1680                hashes: vec!["00".repeat(32); MAX_UPLOAD_CHECK_HASHES + 1],
1681            }),
1682        )
1683        .await
1684        .into_response();
1685
1686        assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
1687    }
1688
1689    #[tokio::test]
1690    async fn optimistic_uploads_return_accepted_and_store_in_background() {
1691        let temp_dir = TempDir::new().expect("temp dir");
1692        let store = Arc::new(
1693            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1694        );
1695        let mut state = test_app_state(Arc::clone(&store));
1696        state.optimistic_blossom_uploads = true;
1697
1698        let keys = nostr::Keys::generate();
1699        let mut headers = HeaderMap::new();
1700        headers.insert(
1701            header::AUTHORIZATION,
1702            create_upload_auth_header(&keys)
1703                .parse()
1704                .expect("auth header value"),
1705        );
1706        headers.insert(
1707            header::CONTENT_TYPE,
1708            "application/octet-stream"
1709                .parse()
1710                .expect("content type header value"),
1711        );
1712
1713        let body = axum::body::Bytes::from((0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>());
1714        let hash = sha256(&body);
1715        let response = upload_blob(State(state), headers, body)
1716            .await
1717            .into_response();
1718        assert_eq!(response.status(), StatusCode::ACCEPTED);
1719
1720        for _ in 0..50 {
1721            if store.blob_exists(&hash).expect("blob exists check") {
1722                return;
1723            }
1724            tokio::time::sleep(Duration::from_millis(10)).await;
1725        }
1726
1727        panic!("optimistic upload was not stored in the background");
1728    }
1729
1730    #[tokio::test]
1731    async fn optimistic_upload_existing_blob_skips_queue() {
1732        let temp_dir = TempDir::new().expect("temp dir");
1733        let store = Arc::new(
1734            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1735        );
1736        let body = axum::body::Bytes::from(
1737            (0u16..=255)
1738                .map(|value| ((value * 73 + 19) % 256) as u8)
1739                .collect::<Vec<_>>(),
1740        );
1741        store.put_cached_blob(&body).expect("seed blob");
1742
1743        let mut state = test_app_state(Arc::clone(&store));
1744        state.optimistic_blossom_uploads = true;
1745        state.optimistic_upload_queue_bytes = 1;
1746        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1747
1748        let keys = nostr::Keys::generate();
1749        let mut headers = HeaderMap::new();
1750        headers.insert(
1751            header::AUTHORIZATION,
1752            create_upload_auth_header(&keys)
1753                .parse()
1754                .expect("auth header value"),
1755        );
1756        headers.insert(
1757            header::CONTENT_TYPE,
1758            "application/octet-stream"
1759                .parse()
1760                .expect("content type header value"),
1761        );
1762
1763        let response = upload_blob(State(state), headers, body)
1764            .await
1765            .into_response();
1766        assert_eq!(response.status(), StatusCode::CONFLICT);
1767    }
1768
1769    #[tokio::test]
1770    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
1771        let temp_dir = TempDir::new().expect("temp dir");
1772        let store = Arc::new(
1773            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1774        );
1775        let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
1776        let hash_hex = hex::encode(sha256(&body));
1777        store.put_cached_blob(&body).expect("seed blob");
1778
1779        let mut state = test_app_state(store);
1780        state.optimistic_blossom_uploads = true;
1781
1782        let keys = nostr::Keys::generate();
1783        let mut headers = HeaderMap::new();
1784        headers.insert(
1785            header::AUTHORIZATION,
1786            create_upload_auth_header(&keys)
1787                .parse()
1788                .expect("auth header value"),
1789        );
1790        headers.insert(
1791            header::CONTENT_TYPE,
1792            "application/octet-stream"
1793                .parse()
1794                .expect("content type header value"),
1795        );
1796
1797        let response = upload_blob(State(state), headers, body)
1798            .await
1799            .into_response();
1800        assert_eq!(response.status(), StatusCode::ACCEPTED);
1801
1802        for _ in 0..50 {
1803            if !optimistic_upload_is_inflight(&hash_hex) {
1804                return;
1805            }
1806            tokio::time::sleep(Duration::from_millis(10)).await;
1807        }
1808
1809        clear_optimistic_upload_inflight(&hash_hex);
1810        panic!("optimistic upload in-flight marker was not cleared");
1811    }
1812
1813    #[tokio::test]
1814    async fn optimistic_upload_inflight_duplicate_skips_queue() {
1815        let temp_dir = TempDir::new().expect("temp dir");
1816        let store = Arc::new(
1817            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1818        );
1819        let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
1820        let hash_hex = hex::encode(sha256(&body));
1821        assert!(mark_optimistic_upload_inflight(&hash_hex));
1822
1823        let mut state = test_app_state(store);
1824        state.optimistic_blossom_uploads = true;
1825        state.optimistic_upload_queue_bytes = 1;
1826        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1827
1828        let keys = nostr::Keys::generate();
1829        let mut headers = HeaderMap::new();
1830        headers.insert(
1831            header::AUTHORIZATION,
1832            create_upload_auth_header(&keys)
1833                .parse()
1834                .expect("auth header value"),
1835        );
1836        headers.insert(
1837            header::CONTENT_TYPE,
1838            "application/octet-stream"
1839                .parse()
1840                .expect("content type header value"),
1841        );
1842
1843        let response = upload_blob(State(state), headers, body)
1844            .await
1845            .into_response();
1846        clear_optimistic_upload_inflight(&hash_hex);
1847        assert_eq!(response.status(), StatusCode::ACCEPTED);
1848    }
1849
1850    #[test]
1851    fn public_writes_accept_unlisted_authors_for_uploads() {
1852        let temp_dir = TempDir::new().expect("temp dir");
1853        let store =
1854            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1855        let mut state = test_app_state(store);
1856        let pubkey = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";
1857
1858        state.public_writes = true;
1859        assert!(can_accept_upload_author(&state, pubkey));
1860        assert!(!is_allowed_write_author(&state, pubkey));
1861
1862        state.public_writes = false;
1863        assert!(!can_accept_upload_author(&state, pubkey));
1864    }
1865
1866    #[test]
1867    fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
1868        let encrypted_block: Vec<u8> = (0..=255).collect();
1869
1870        assert_eq!(
1871            validate_upload_payload(&encrypted_block, "application/octet-stream", false, true,),
1872            Ok(())
1873        );
1874
1875        assert_eq!(
1876            validate_upload_payload(b"audio bytes", "audio/mpeg", true, true,),
1877            Ok(())
1878        );
1879
1880        assert_eq!(
1881            validate_upload_payload(b"audio bytes", "audio/mpeg", false, true,),
1882            Err((
1883                StatusCode::FORBIDDEN,
1884                "Raw media uploads require write access".to_string(),
1885            ))
1886        );
1887    }
1888
1889    #[test]
1890    fn authenticated_chk_uploads_skip_entropy_heuristic() {
1891        let low_unique_block: Vec<u8> = (0..256).map(|i| (i % 139) as u8).collect();
1892
1893        assert_eq!(
1894            validate_upload_payload(&low_unique_block, "application/octet-stream", true, true,),
1895            Ok(())
1896        );
1897
1898        assert_eq!(
1899            validate_upload_payload(&low_unique_block, "application/octet-stream", false, true,),
1900            Err((
1901                StatusCode::UNSUPPORTED_MEDIA_TYPE,
1902                "Data not encrypted. Unique: 139 (min: 140)".to_string(),
1903            ))
1904        );
1905    }
1906
1907    #[test]
1908    fn unowned_public_uploads_use_cache_storage_semantics() {
1909        let temp_dir = TempDir::new().expect("temp dir");
1910        let store =
1911            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1912        let state = test_app_state(Arc::clone(&store));
1913
1914        let owned = vec![1u8; 280];
1915        let owned_hash = sha256(&owned);
1916        store_blossom_blob(&state, &owned, &owned_hash, &[2u8; 32], true).expect("owned upload");
1917
1918        let public_upload = vec![3u8; 280];
1919        let public_hash = sha256(&public_upload);
1920        store_blossom_blob(&state, &public_upload, &public_hash, &[4u8; 32], false)
1921            .expect("public upload");
1922
1923        let replacement = vec![5u8; 280];
1924        let replacement_hash = sha256(&replacement);
1925        state
1926            .store
1927            .put_cached_blob(&replacement)
1928            .expect("replacement cached blob");
1929
1930        assert!(state.store.blob_exists(&owned_hash).expect("owned exists"));
1931        assert!(!state
1932            .store
1933            .blob_exists(&public_hash)
1934            .expect("public upload evicted"));
1935        assert!(state
1936            .store
1937            .blob_exists(&replacement_hash)
1938            .expect("replacement exists"));
1939        assert!(state
1940            .store
1941            .is_blob_owner(&owned_hash, &[2u8; 32])
1942            .expect("owned tracked"));
1943        assert!(!state
1944            .store
1945            .blob_has_owners(&public_hash)
1946            .expect("public upload unowned"));
1947    }
1948
1949    #[test]
1950    fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
1951        let temp_dir = TempDir::new().expect("temp dir");
1952        let store =
1953            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
1954        let state = test_app_state(Arc::clone(&store));
1955
1956        let first = vec![1u8; 300];
1957        let first_hash = sha256(&first);
1958        let owner = [2u8; 32];
1959        store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");
1960
1961        let second = vec![3u8; 300];
1962        let second_hash = sha256(&second);
1963        let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
1964            .expect_err("second owned upload should exceed the storage limit");
1965
1966        assert!(
1967            error.to_string().contains("storage limit"),
1968            "unexpected error: {error}"
1969        );
1970        assert!(state
1971            .store
1972            .blob_exists(&first_hash)
1973            .expect("first blob remains"));
1974        assert!(!state
1975            .store
1976            .blob_exists(&second_hash)
1977            .expect("second blob rejected"));
1978        assert!(state
1979            .store
1980            .is_blob_owner(&first_hash, &owner)
1981            .expect("first owner tracked"));
1982        assert!(!state
1983            .store
1984            .is_blob_owner(&second_hash, &owner)
1985            .expect("second owner not tracked"));
1986    }
1987}