Skip to main content

hashtree_cli/server/
blossom.rs

1//! Blossom protocol implementation (BUD-01, BUD-02)
2//!
3//! Implements blob storage endpoints with Nostr-based authentication.
4//! See: https://github.com/hzrd149/blossom
5
6use axum::{
7    body::Body,
8    extract::{Path, Query, State},
9    http::{header, HeaderMap, Response, StatusCode},
10    response::IntoResponse,
11    Json,
12};
13use base64::Engine;
14use hashtree_core::from_hex;
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use std::collections::HashSet;
18use std::sync::{Mutex, OnceLock};
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use super::auth::AppState;
22use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
23use super::ingest_filter::{
24    content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
25};
26use super::mime::get_mime_type;
27
/// Blossom authorization event kind (NIP-98 style)
const BLOSSOM_AUTH_KIND: u16 = 24242;

/// Cache-Control header for immutable content-addressed data (1 year)
const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
/// Cache-Control value that forbids caching; `head_blob` sends it with its
/// 503 "busy" response.
const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
/// Cache-Control value `head_blob` sends with a 404: private caches must
/// revalidate immediately (`max-age=0`) while shared caches may keep the miss
/// for five seconds (`s-maxage=5`).
const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
/// Name of the environment variable read by `optimistic_upload_queue_timeout`
/// to override the queue timeout (value in milliseconds).
const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
/// Queue timeout in milliseconds used when the environment variable is unset,
/// unparsable, or zero (15 seconds).
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;

/// Default maximum upload size in bytes (5 MB)
pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
/// Minimum number of queue bytes charged to an optimistic upload, however
/// small its body is (256 KiB).
const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
/// NOTE(review): presumably the maximum number of blobs accepted in one batch
/// upload request; the code using it is outside this excerpt, so confirm there.
const MAX_BATCH_UPLOAD_BLOBS: usize = 1024;
/// NOTE(review): presumably the maximum total size of one batch upload request
/// (64 MiB); the code using it is outside this excerpt, so confirm there.
const MAX_BATCH_UPLOAD_BYTES: usize = 64 * 1024 * 1024;
/// Maximum number of hashes `upload_check` accepts in a single request.
const MAX_UPLOAD_CHECK_HASHES: usize = 10_000;
/// Name of the environment variable read by `slow_batch_upload_log_ms`.
const SLOW_BATCH_UPLOAD_LOG_MS_ENV: &str = "HTREE_SLOW_BATCH_UPLOAD_LOG_MS";
45
46fn slow_batch_upload_log_ms() -> Option<u128> {
47    std::env::var(SLOW_BATCH_UPLOAD_LOG_MS_ENV)
48        .ok()
49        .and_then(|value| value.parse::<u128>().ok())
50        .filter(|value| *value > 0)
51}
52
/// Point-in-time view of the optimistic upload queue, as assembled by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Whether optimistic Blossom uploads are turned on in the app state.
    pub enabled: bool,
    /// Configured queue capacity in bytes.
    pub max_bytes: usize,
    /// Permits currently available on the queue semaphore, capped at `max_bytes`.
    pub available_bytes: usize,
    /// `max_bytes` minus `available_bytes` (saturating at zero).
    pub reserved_bytes: usize,
    /// Number of blob hashes currently recorded in the in-flight set.
    pub in_flight: usize,
    /// Queue acquisition timeout in milliseconds.
    pub queue_timeout_ms: u64,
}
62
63/// Check if a pubkey has write access based on allowed_npubs config or social graph
64/// Returns Ok(()) if allowed, Err with JSON error body if denied
65#[allow(clippy::result_large_err)]
66fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
67    // Check if pubkey is in the allowed list (converted from npub to hex)
68    if is_allowed_write_author(state, pubkey) {
69        tracing::debug!(
70            "Blossom write allowed for {}... (allowed writer)",
71            &pubkey[..8.min(pubkey.len())]
72        );
73        return Ok(());
74    }
75
76    // Not in allowed list or social graph
77    tracing::info!(
78        "Blossom write denied for {}... (not in allowed_npubs or social graph)",
79        &pubkey[..8.min(pubkey.len())]
80    );
81    Err(Response::builder()
82        .status(StatusCode::FORBIDDEN)
83        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
84        .header(header::CONTENT_TYPE, "application/json")
85        .body(Body::from(
86            r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
87        ))
88        .unwrap())
89}
90
91fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
92    if state.allowed_pubkeys.contains(pubkey) {
93        return true;
94    }
95
96    state
97        .social_graph
98        .as_ref()
99        .map(|sg| sg.check_write_access(pubkey))
100        .unwrap_or(false)
101}
102
103fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
104    state.public_writes || is_allowed_write_author(state, pubkey)
105}
106
107fn validate_upload_payload(
108    body: &[u8],
109    content_type: &str,
110    can_upload_author: bool,
111    require_random_untrusted_ingest: bool,
112) -> Result<(), (StatusCode, String)> {
113    let is_chk_upload = is_chk_content_type(content_type);
114
115    if !is_chk_upload && !can_upload_author {
116        return Err((
117            StatusCode::FORBIDDEN,
118            "Raw media uploads require write access".to_string(),
119        ));
120    }
121
122    if is_chk_upload {
123        let require_random = require_random_untrusted_ingest && !can_upload_author;
124        validate_untrusted_blob(body, require_random)
125            .map_err(|IngestRejection { status, reason }| (status, reason))?;
126    }
127
128    Ok(())
129}
130
131fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
132    let reason = reason.into();
133    Response::builder()
134        .status(status)
135        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
136        .header("X-Reason", reason.as_str())
137        .header(header::CONTENT_TYPE, "application/json")
138        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
139        .unwrap()
140}
141
142fn blossom_retryable_json_error(
143    status: StatusCode,
144    reason: impl Into<String>,
145    retry_after_seconds: u64,
146) -> Response<Body> {
147    let reason = reason.into();
148    Response::builder()
149        .status(status)
150        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
151        .header("X-Reason", reason.as_str())
152        .header(header::RETRY_AFTER, retry_after_seconds.to_string())
153        .header(header::CONTENT_TYPE, "application/json")
154        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
155        .unwrap()
156}
157
/// Blob descriptor returned by upload and list endpoints
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Location of the blob. `upload_blob` fills this with a server-relative
    /// path of the form `/<sha256><extension>`.
    pub url: String,
    /// Hex-encoded SHA-256 digest of the blob contents.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Upload time in seconds since the Unix epoch.
    pub uploaded: u64,
}
168
/// One blob entry inside a [`BatchUploadRequest`].
#[derive(Debug, Deserialize)]
pub struct BatchUploadBlob {
    /// SHA-256 digest the client claims for this blob.
    /// NOTE(review): presumably hex-encoded like the other endpoints; confirm
    /// against the batch upload handler, which is outside this excerpt.
    pub sha256: String,
    /// Optional content type. Accepted under either `content_type` or
    /// `contentType`; `None` when the key is absent.
    #[serde(default, alias = "contentType")]
    pub content_type: Option<String>,
    /// Blob payload carried as a string.
    /// NOTE(review): the encoding is not visible here (presumably base64) —
    /// confirm against the batch upload handler.
    pub data: String,
}
176
/// JSON request body carrying several blobs to upload at once.
#[derive(Debug, Deserialize)]
pub struct BatchUploadRequest {
    /// The blobs to upload.
    pub blobs: Vec<BatchUploadBlob>,
}
181
/// JSON response body for a batch upload.
#[derive(Debug, Serialize)]
pub struct BatchUploadResponse {
    /// Count of uploaded blobs.
    /// NOTE(review): whether already-present blobs are counted is decided by
    /// the batch upload handler, which is outside this excerpt.
    pub uploaded: usize,
    /// Descriptors for the blobs reported in this response.
    pub blobs: Vec<BlobDescriptor>,
}
187
/// JSON request body for `POST /upload/check`.
#[derive(Debug, Deserialize)]
pub struct UploadCheckRequest {
    /// SHA-256 hashes to look up. `upload_check` trims and lowercases each
    /// entry, rejects the request if any entry is not a valid hash, and
    /// accepts at most `MAX_UPLOAD_CHECK_HASHES` entries.
    pub hashes: Vec<String>,
}
192
/// JSON response body for `POST /upload/check`.
#[derive(Debug, Serialize, Deserialize)]
pub struct UploadCheckResponse {
    /// Number of hashes in the request (duplicates included); equals the
    /// number of meaningful bits in `present`.
    pub count: usize,
    /// Standard-alphabet base64 of a bitset with one bit per requested hash,
    /// in request order. Bit `i` lives in byte `i / 8` at position `i % 8`
    /// (least significant bit first) and is set when hash `i` is present.
    pub present: String,
}
198
/// Query parameters for list endpoint
///
/// NOTE(review): the list handler is outside this excerpt, so the field notes
/// below are assumptions based on names and types — confirm against it.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Presumably a lower bound on upload time (Unix seconds).
    pub since: Option<u64>,
    /// Presumably an upper bound on upload time (Unix seconds).
    pub until: Option<u64>,
    /// Presumably the maximum number of entries to return.
    pub limit: Option<usize>,
    /// Presumably an opaque pagination cursor.
    pub cursor: Option<String>,
}
207
/// Parsed Nostr authorization event
///
/// Produced by `verify_blossom_auth` after the event's signature and tags
/// have been checked.
#[derive(Debug)]
pub struct BlossomAuth {
    /// The event's `pubkey` field (hex string of the signer's public key).
    pub pubkey: String,
    /// The event kind; `verify_blossom_auth` only accepts 24242.
    pub kind: u16,
    /// The event's `created_at` field, in seconds since the Unix epoch.
    pub created_at: u64,
    /// Value of the `expiration` tag (Unix seconds), if one was supplied.
    pub expiration: Option<u64>,
    /// Value of the `t` tag: "upload", "delete", "list", "get".
    pub action: Option<String>,
    /// Lowercased values of all `x` tags (authorized blob hashes).
    pub blob_hashes: Vec<String>,
    /// Value of the `server` tag, if present.
    pub server: Option<String>,
}
219
220/// Parse and verify Nostr authorization from header
221/// Returns the verified auth or an error response
222pub fn verify_blossom_auth(
223    headers: &HeaderMap,
224    required_action: &str,
225    required_hash: Option<&str>,
226) -> Result<BlossomAuth, (StatusCode, &'static str)> {
227    let auth_header = headers
228        .get(header::AUTHORIZATION)
229        .and_then(|v| v.to_str().ok())
230        .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
231
232    let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
233        StatusCode::UNAUTHORIZED,
234        "Invalid auth scheme, expected 'Nostr'",
235    ))?;
236
237    // Decode base64 event
238    let engine = base64::engine::general_purpose::STANDARD;
239    let event_bytes = engine
240        .decode(nostr_event)
241        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
242
243    let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
244        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
245
246    // Extract event fields
247    let kind = event_json["kind"]
248        .as_u64()
249        .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
250
251    if kind != BLOSSOM_AUTH_KIND as u64 {
252        return Err((
253            StatusCode::BAD_REQUEST,
254            "Invalid event kind, expected 24242",
255        ));
256    }
257
258    let pubkey = event_json["pubkey"]
259        .as_str()
260        .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
261        .to_string();
262
263    let created_at = event_json["created_at"]
264        .as_u64()
265        .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
266
267    let sig = event_json["sig"]
268        .as_str()
269        .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
270
271    // Verify signature
272    if !verify_nostr_signature(&event_json, &pubkey, sig) {
273        return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
274    }
275
276    // Parse tags
277    let tags = event_json["tags"]
278        .as_array()
279        .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
280
281    let mut expiration: Option<u64> = None;
282    let mut action: Option<String> = None;
283    let mut blob_hashes: Vec<String> = Vec::new();
284    let mut server: Option<String> = None;
285
286    for tag in tags {
287        let tag_arr = tag.as_array();
288        if let Some(arr) = tag_arr {
289            if arr.len() >= 2 {
290                let tag_name = arr[0].as_str().unwrap_or("");
291                let tag_value = arr[1].as_str().unwrap_or("");
292
293                match tag_name {
294                    "t" => action = Some(tag_value.to_string()),
295                    "x" => blob_hashes.push(tag_value.to_lowercase()),
296                    "expiration" => expiration = tag_value.parse().ok(),
297                    "server" => server = Some(tag_value.to_string()),
298                    _ => {}
299                }
300            }
301        }
302    }
303
304    // Validate expiration
305    let now = SystemTime::now()
306        .duration_since(UNIX_EPOCH)
307        .unwrap()
308        .as_secs();
309
310    if let Some(exp) = expiration {
311        if exp < now {
312            return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
313        }
314    }
315
316    // Validate created_at is not in the future (with 60s tolerance)
317    if created_at > now + 60 {
318        return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
319    }
320
321    // Validate action matches
322    if let Some(ref act) = action {
323        if act != required_action {
324            return Err((StatusCode::FORBIDDEN, "Action mismatch"));
325        }
326    } else {
327        return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
328    }
329
330    // Validate hash if required
331    if let Some(hash) = required_hash {
332        if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
333            return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
334        }
335    }
336
337    Ok(BlossomAuth {
338        pubkey,
339        kind: kind as u16,
340        created_at,
341        expiration,
342        action,
343        blob_hashes,
344        server,
345    })
346}
347
348/// Verify Nostr event signature using secp256k1
349fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
350    use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
351
352    // Compute event ID (sha256 of serialized event)
353    let content = event["content"].as_str().unwrap_or("");
354    let full_serialized = format!(
355        "[0,\"{}\",{},{},{},\"{}\"]",
356        pubkey,
357        event["created_at"],
358        event["kind"],
359        event["tags"],
360        escape_json_string(content),
361    );
362
363    let mut hasher = Sha256::new();
364    hasher.update(full_serialized.as_bytes());
365    let event_id = hasher.finalize();
366
367    // Parse pubkey and signature
368    let pubkey_bytes = match hex::decode(pubkey) {
369        Ok(b) => b,
370        Err(_) => return false,
371    };
372
373    let sig_bytes = match hex::decode(sig) {
374        Ok(b) => b,
375        Err(_) => return false,
376    };
377
378    let secp = Secp256k1::verification_only();
379
380    let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
381        Ok(pk) => pk,
382        Err(_) => return false,
383    };
384
385    let signature = match Signature::from_slice(&sig_bytes) {
386        Ok(s) => s,
387        Err(_) => return false,
388    };
389
390    let message = match Message::from_digest_slice(&event_id) {
391        Ok(m) => m,
392        Err(_) => return false,
393    };
394
395    secp.verify_schnorr(&signature, &message, &xonly_pubkey)
396        .is_ok()
397}
398
/// Escape string for JSON serialization
///
/// Produces the contents of a JSON string literal (without the surrounding
/// quotes) using the same escaping as standard JSON serializers, which is what
/// signers use when computing a Nostr event id:
///
/// * `"` and `\` are backslash-escaped;
/// * line feed, carriage return, tab, backspace and form feed use the short
///   forms `\n`, `\r`, `\t`, `\b`, `\f`;
/// * every other character below U+0020 becomes `\u00XX`;
/// * everything else, including U+007F and the C1 control range, is copied
///   verbatim. Escaping those would change the hashed bytes and make valid
///   signatures fail to verify.
fn escape_json_string(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => result.push_str("\\\""),
            '\\' => result.push_str("\\\\"),
            '\n' => result.push_str("\\n"),
            '\r' => result.push_str("\\r"),
            '\t' => result.push_str("\\t"),
            '\u{08}' => result.push_str("\\b"),
            '\u{0c}' => result.push_str("\\f"),
            // Only the C0 range must be escaped; `char::is_control` would also
            // match U+007F..=U+009F, which JSON leaves untouched.
            c if (c as u32) < 0x20 => {
                result.push_str(&format!("\\u{:04x}", c as u32));
            }
            c => result.push(c),
        }
    }
    result
}
417
418/// CORS preflight handler for all Blossom endpoints
419/// Echoes back Access-Control-Request-Headers to allow any headers
420pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
421    // Echo back requested headers, or use sensible defaults that cover common Blossom headers
422    let allowed_headers = headers
423        .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
424        .and_then(|v| v.to_str().ok())
425        .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
426
427    // Always include common headers in addition to what was requested
428    let full_allowed = format!(
429        "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
430        allowed_headers
431    );
432
433    Response::builder()
434        .status(StatusCode::NO_CONTENT)
435        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
436        .header(
437            header::ACCESS_CONTROL_ALLOW_METHODS,
438            "GET, HEAD, POST, PUT, DELETE, OPTIONS",
439        )
440        .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
441        .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
442        .body(Body::empty())
443        .unwrap()
444}
445
446fn encode_upload_check_bitset(bits: &[bool]) -> String {
447    let mut bytes = vec![0u8; bits.len().div_ceil(8)];
448    for (index, present) in bits.iter().enumerate() {
449        if *present {
450            bytes[index / 8] |= 1 << (index % 8);
451        }
452    }
453    base64::engine::general_purpose::STANDARD.encode(bytes)
454}
455
456/// POST /upload/check - Batch-check blob presence for upload planning.
457pub async fn upload_check(
458    State(state): State<AppState>,
459    Json(payload): Json<UploadCheckRequest>,
460) -> impl IntoResponse {
461    if payload.hashes.len() > MAX_UPLOAD_CHECK_HASHES {
462        return blossom_json_error(
463            StatusCode::PAYLOAD_TOO_LARGE,
464            format!("Too many hashes; maximum is {}", MAX_UPLOAD_CHECK_HASHES),
465        );
466    }
467
468    let mut requested = Vec::with_capacity(payload.hashes.len());
469    let mut unique = Vec::new();
470    for hash in payload.hashes {
471        let hash = hash.trim().to_ascii_lowercase();
472        if !is_valid_sha256(&hash) {
473            return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid SHA256 hash");
474        }
475        let bytes: [u8; 32] = match from_hex(&hash) {
476            Ok(bytes) => bytes,
477            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid SHA256 hash"),
478        };
479        requested.push(bytes);
480        unique.push(bytes);
481    }
482
483    unique.sort_unstable();
484    unique.dedup();
485
486    let existing = if unique.is_empty() {
487        Vec::new()
488    } else {
489        let permit = match acquire_blob_read().await {
490            Ok(permit) => permit,
491            Err(_) => {
492                return blossom_retryable_json_error(
493                    StatusCode::SERVICE_UNAVAILABLE,
494                    BLOB_READ_BUSY,
495                    1,
496                );
497            }
498        };
499        let store = state.store.clone();
500        let lookup_hashes = unique.clone();
501        let lookup = tokio::task::spawn_blocking(move || {
502            store
503                .router()
504                .existing_local_hashes_in_sorted_candidates(&lookup_hashes)
505        });
506        let result = tokio::time::timeout(blob_read_timeout(), lookup).await;
507        drop(permit);
508        match result {
509            Ok(Ok(Ok(existing))) => existing,
510            Ok(Ok(Err(error))) => {
511                tracing::debug!("Blossom upload check failed: {}", error);
512                return blossom_json_error(StatusCode::INTERNAL_SERVER_ERROR, "Storage error");
513            }
514            Ok(Err(error)) => {
515                tracing::debug!("Blossom upload check task failed: {}", error);
516                return blossom_json_error(StatusCode::INTERNAL_SERVER_ERROR, "Storage error");
517            }
518            Err(_) => {
519                return blossom_retryable_json_error(
520                    StatusCode::SERVICE_UNAVAILABLE,
521                    "Blob check timed out",
522                    1,
523                );
524            }
525        }
526    };
527
528    let present_unique: HashSet<[u8; 32]> = unique
529        .into_iter()
530        .zip(existing)
531        .filter_map(|(hash, present)| present.then_some(hash))
532        .collect();
533    let present_bits: Vec<bool> = requested
534        .iter()
535        .map(|hash| present_unique.contains(hash))
536        .collect();
537
538    Response::builder()
539        .status(StatusCode::OK)
540        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
541        .header(header::CONTENT_TYPE, "application/json")
542        .body(Body::from(
543            serde_json::to_string(&UploadCheckResponse {
544                count: present_bits.len(),
545                present: encode_upload_check_bitset(&present_bits),
546            })
547            .unwrap(),
548        ))
549        .unwrap()
550}
551
552/// HEAD /<sha256> - Check if blob exists
553pub async fn head_blob(
554    State(state): State<AppState>,
555    Path(id): Path<String>,
556    connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
557) -> impl IntoResponse {
558    let is_localhost = connect_info.0.ip().is_loopback();
559    let (hash_part, ext) = parse_hash_and_extension(&id);
560
561    if !is_valid_sha256(hash_part) {
562        return Response::builder()
563            .status(StatusCode::BAD_REQUEST)
564            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
565            .header("X-Reason", "Invalid SHA256 hash")
566            .body(Body::empty())
567            .unwrap();
568    }
569
570    let sha256_hex = hash_part.to_lowercase();
571    let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
572        Ok(b) => b,
573        Err(_) => {
574            return Response::builder()
575                .status(StatusCode::BAD_REQUEST)
576                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
577                .header("X-Reason", "Invalid SHA256 format")
578                .body(Body::empty())
579                .unwrap();
580        }
581    };
582
583    // Blossom HEAD only needs metadata; avoid reading the full blob body just to
584    // answer cache probes and CDN revalidation. The read permit keeps CDN probe
585    // storms from filling Tokio's blocking thread pool while old blobs without
586    // metadata are still being normalized.
587    let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
588        Ok(Ok(cached))
589    } else {
590        let permit = match acquire_blob_read().await {
591            Ok(permit) => permit,
592            Err(_) => {
593                return Response::builder()
594                    .status(StatusCode::SERVICE_UNAVAILABLE)
595                    .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
596                    .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
597                    .header("Retry-After", "1")
598                    .header("X-Reason", BLOB_READ_BUSY)
599                    .body(Body::empty())
600                    .unwrap();
601            }
602        };
603        let store = state.store.clone();
604        let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
605        let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
606        drop(permit);
607        let result = match timed {
608            Ok(result) => result.map_err(|_| ()),
609            Err(_) => Err(()),
610        };
611        if let Ok(Ok(size)) = result {
612            state.blob_cache.put_size(sha256_hex.clone(), size);
613        }
614        result
615    };
616
617    match blob_size {
618        Ok(Ok(Some(size))) => {
619            let mime_type = ext
620                .map(|e| get_mime_type(&format!("file{}", e)))
621                .unwrap_or("application/octet-stream");
622
623            let mut builder = Response::builder()
624                .status(StatusCode::OK)
625                .header(header::CONTENT_TYPE, mime_type)
626                .header(header::CONTENT_LENGTH, size)
627                .header(header::ACCEPT_RANGES, "bytes")
628                .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
629                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
630            if is_localhost {
631                builder = builder.header("X-Source", "local");
632            }
633            builder.body(Body::empty()).unwrap()
634        }
635        Ok(Ok(None)) => Response::builder()
636            .status(StatusCode::NOT_FOUND)
637            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
638            .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
639            .header("X-Reason", "Blob not found")
640            .body(Body::empty())
641            .unwrap(),
642        _ => Response::builder()
643            .status(StatusCode::INTERNAL_SERVER_ERROR)
644            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
645            .body(Body::empty())
646            .unwrap(),
647    }
648}
649
650async fn store_blossom_blob_without_blocking_runtime(
651    state: &AppState,
652    data: axum::body::Bytes,
653    pubkey: [u8; 32],
654    track_ownership: bool,
655) -> anyhow::Result<()> {
656    let mut hasher = Sha256::new();
657    hasher.update(&data);
658    let hash_hex = hex::encode(hasher.finalize());
659    let data_for_cache = data.clone();
660    let permit = acquire_blob_write()
661        .await
662        .map_err(|err| anyhow::anyhow!(err))?;
663    let store = state.store.clone();
664    tokio::task::spawn_blocking(move || {
665        let _permit = permit;
666        if track_ownership {
667            store.put_owned_blob(&data, &pubkey)?;
668        } else {
669            store.put_cached_blob(&data)?;
670        }
671        Ok::<(), anyhow::Error>(())
672    })
673    .await
674    .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
675    state
676        .blob_cache
677        .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
678    state.blob_cache.put_body(hash_hex, &data_for_cache);
679    Ok(())
680}
681
682fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
683    Response::builder()
684        .status(status)
685        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
686        .header(header::CONTENT_TYPE, "application/json")
687        .body(Body::from(serde_json::to_string(descriptor).unwrap()))
688        .unwrap()
689}
690
691fn optimistic_upload_queue_timeout() -> Duration {
692    let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
693        .ok()
694        .and_then(|value| value.parse::<u64>().ok())
695        .filter(|value| *value > 0)
696        .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
697    Duration::from_millis(millis)
698}
699
/// Returns the process-wide set of blob hashes that currently have an
/// optimistic upload in flight, creating it empty on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static INFLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    INFLIGHT.get_or_init(Mutex::default)
}
704
705fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
706    optimistic_upload_inflight()
707        .lock()
708        .is_ok_and(|inflight| inflight.contains(hash_hex))
709}
710
711fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
712    optimistic_upload_inflight()
713        .lock()
714        .map(|mut inflight| inflight.insert(hash_hex.to_string()))
715        .unwrap_or(true)
716}
717
718fn clear_optimistic_upload_inflight(hash_hex: &str) {
719    if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
720        inflight.remove(hash_hex);
721    }
722}
723
724pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
725    let max_bytes = state.optimistic_upload_queue_bytes;
726    let available_bytes = state
727        .optimistic_upload_queue
728        .available_permits()
729        .min(max_bytes);
730    let in_flight = optimistic_upload_inflight()
731        .lock()
732        .map(|inflight| inflight.len())
733        .unwrap_or(0);
734
735    OptimisticUploadQueueSnapshot {
736        enabled: state.optimistic_blossom_uploads,
737        max_bytes,
738        available_bytes,
739        reserved_bytes: max_bytes.saturating_sub(available_bytes),
740        in_flight,
741        queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
742    }
743}
744
/// Converts a duration to whole milliseconds, saturating at `u64::MAX` when
/// the value does not fit.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
748
749async fn acquire_optimistic_upload_queue(
750    state: &AppState,
751    permits: u32,
752) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
753    match tokio::time::timeout(
754        optimistic_upload_queue_timeout(),
755        state
756            .optimistic_upload_queue
757            .clone()
758            .acquire_many_owned(permits),
759    )
760    .await
761    {
762        Ok(Ok(permit)) => Ok(permit),
763        Ok(Err(_)) => Err("Optimistic upload queue is closed"),
764        Err(_) => Err("Optimistic upload queue is full"),
765    }
766}
767
768async fn uploaded_blob_already_exists(
769    state: &AppState,
770    sha256_hash: [u8; 32],
771    sha256_hex: &str,
772) -> Result<bool, String> {
773    if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
774        return Ok(true);
775    }
776
777    let permit = acquire_blob_read().await.map_err(str::to_string)?;
778    let store = state.store.clone();
779    let size_read = tokio::task::spawn_blocking(move || {
780        store
781            .blob_size(&sha256_hash)
782            .map_err(|error| error.to_string())
783    });
784
785    let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
786    drop(permit);
787    match result {
788        Ok(Ok(Ok(size))) => {
789            state.blob_cache.put_size(sha256_hex.to_string(), size);
790            Ok(size.is_some())
791        }
792        Ok(Ok(Err(error))) => Err(error),
793        Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
794        Err(_) => Err("blob existence check timed out".to_string()),
795    }
796}
797
798async fn set_existing_blob_owner_without_body_write(
799    state: AppState,
800    sha256_hash: [u8; 32],
801    pubkey: [u8; 32],
802) -> anyhow::Result<()> {
803    tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
804        .await
805        .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
806    Ok(())
807}
808
809/// PUT /upload - Upload a new blob (BUD-02)
810pub async fn upload_blob(
811    State(state): State<AppState>,
812    headers: HeaderMap,
813    body: axum::body::Bytes,
814) -> impl IntoResponse {
815    // Check size limit first (before auth to save resources)
816    let max_size = state.max_upload_bytes;
817    if body.len() > max_size {
818        return Response::builder()
819            .status(StatusCode::PAYLOAD_TOO_LARGE)
820            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
821            .header(header::CONTENT_TYPE, "application/json")
822            .body(Body::from(format!(
823                r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
824                body.len(),
825                max_size,
826                max_size / 1024 / 1024
827            )))
828            .unwrap();
829    }
830
831    // Verify authorization
832    let auth = match verify_blossom_auth(&headers, "upload", None) {
833        Ok(a) => a,
834        Err((status, reason)) => {
835            return Response::builder()
836                .status(status)
837                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
838                .header("X-Reason", reason)
839                .header(header::CONTENT_TYPE, "application/json")
840                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
841                .unwrap();
842        }
843    };
844
845    // Get content type from header
846    let content_type = headers
847        .get(header::CONTENT_TYPE)
848        .and_then(|v| v.to_str().ok())
849        .unwrap_or("application/octet-stream")
850        .to_string();
851
852    // Check write access: either in allowed_npubs/social graph OR public_writes is enabled.
853    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
854    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
855    if !can_upload {
856        let _ = check_write_access(&state, &auth.pubkey);
857        return Response::builder()
858            .status(StatusCode::FORBIDDEN)
859            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
860            .header(header::CONTENT_TYPE, "application/json")
861            .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
862            .unwrap();
863    }
864
865    if let Err((status, reason)) = validate_upload_payload(
866        &body,
867        &content_type,
868        can_upload,
869        state.require_random_untrusted_ingest,
870    ) {
871        return blossom_json_error(status, reason);
872    }
873
874    // Compute SHA256 of uploaded data
875    let mut hasher = Sha256::new();
876    hasher.update(&body);
877    let sha256_hash: [u8; 32] = hasher.finalize().into();
878    let sha256_hex = hex::encode(sha256_hash);
879
880    // If auth has x tags, verify hash matches
881    if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
882        return Response::builder()
883            .status(StatusCode::FORBIDDEN)
884            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
885            .header(
886                "X-Reason",
887                "Uploaded blob hash does not match authorized hash",
888            )
889            .header(header::CONTENT_TYPE, "application/json")
890            .body(Body::from(r#"{"error":"Hash mismatch"}"#))
891            .unwrap();
892    }
893
894    // Convert pubkey hex to bytes
895    let pubkey_bytes = match from_hex(&auth.pubkey) {
896        Ok(b) => b,
897        Err(_) => {
898            return Response::builder()
899                .status(StatusCode::BAD_REQUEST)
900                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
901                .header("X-Reason", "Invalid pubkey format")
902                .body(Body::empty())
903                .unwrap();
904        }
905    };
906
907    let size = body.len() as u64;
908    let now = SystemTime::now()
909        .duration_since(UNIX_EPOCH)
910        .unwrap()
911        .as_secs();
912    let ext = mime_to_extension(&content_type_base(&content_type));
913    let descriptor = BlobDescriptor {
914        url: format!("/{}{}", sha256_hex, ext),
915        sha256: sha256_hex.clone(),
916        size,
917        mime_type: content_type,
918        uploaded: now,
919    };
920
921    if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
922        return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
923    }
924
925    // Store public-write blobs in cache storage unless the writer is explicitly
926    // allowed, so untrusted public uploads do not become protected owned data.
927    if state.optimistic_blossom_uploads {
928        let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
929        if queued_bytes <= state.optimistic_upload_queue_bytes {
930            let permits = queued_bytes as u32;
931            let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
932            if !marked_inflight {
933                return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
934            }
935            let permit = match acquire_optimistic_upload_queue(&state, permits).await {
936                Ok(permit) => permit,
937                Err(_) => {
938                    clear_optimistic_upload_inflight(&sha256_hex);
939                    return blossom_retryable_json_error(
940                        StatusCode::SERVICE_UNAVAILABLE,
941                        "Optimistic upload queue is full",
942                        2,
943                    );
944                }
945            };
946            let state_for_write = state.clone();
947            let hash_for_log = sha256_hex.clone();
948            tokio::spawn(async move {
949                let _permit = permit;
950                if let Err(error) = store_blossom_blob_without_blocking_runtime(
951                    &state_for_write,
952                    body,
953                    pubkey_bytes,
954                    is_allowed_author,
955                )
956                .await
957                {
958                    tracing::error!(
959                        "Background Blossom storage failed for {}: {:#}",
960                        hash_for_log,
961                        error
962                    );
963                }
964                clear_optimistic_upload_inflight(&hash_for_log);
965            });
966
967            return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
968        }
969
970        tracing::warn!(
971            "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
972            queued_bytes,
973            state.optimistic_upload_queue_bytes
974        );
975    }
976
977    match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
978        Ok(true) => {
979            if is_allowed_author {
980                if let Err(error) = set_existing_blob_owner_without_body_write(
981                    state.clone(),
982                    sha256_hash,
983                    pubkey_bytes,
984                )
985                .await
986                {
987                    return Response::builder()
988                        .status(StatusCode::INTERNAL_SERVER_ERROR)
989                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
990                        .header("X-Reason", "Storage error")
991                        .header(header::CONTENT_TYPE, "application/json")
992                        .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
993                        .unwrap();
994                }
995            }
996            return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
997        }
998        Ok(false) => {}
999        Err(error) => {
1000            tracing::debug!(
1001                "Could not preflight Blossom upload {} before synchronous storage: {}",
1002                sha256_hex,
1003                error
1004            );
1005        }
1006    }
1007
1008    let store_result = store_blossom_blob_without_blocking_runtime(
1009        &state,
1010        body.clone(),
1011        pubkey_bytes,
1012        is_allowed_author,
1013    )
1014    .await;
1015
1016    match store_result {
1017        Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
1018        Err(e) => Response::builder()
1019            .status(StatusCode::INTERNAL_SERVER_ERROR)
1020            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1021            .header("X-Reason", "Storage error")
1022            .header(header::CONTENT_TYPE, "application/json")
1023            .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
1024            .unwrap(),
1025    }
1026}
1027
1028/// POST /upload/batch - Upload multiple blobs with one auth event and one storage batch.
1029pub async fn upload_blob_batch(
1030    State(state): State<AppState>,
1031    headers: HeaderMap,
1032    Json(payload): Json<BatchUploadRequest>,
1033) -> impl IntoResponse {
1034    let started_at = Instant::now();
1035    let slow_log_ms = slow_batch_upload_log_ms();
1036    let payload_blobs = payload.blobs.len();
1037    if payload.blobs.is_empty() {
1038        return blossom_json_error(StatusCode::BAD_REQUEST, "Batch is empty");
1039    }
1040    if payload.blobs.len() > MAX_BATCH_UPLOAD_BLOBS {
1041        return blossom_json_error(
1042            StatusCode::PAYLOAD_TOO_LARGE,
1043            "Batch contains too many blobs",
1044        );
1045    }
1046
1047    let auth = match verify_blossom_auth(&headers, "upload", None) {
1048        Ok(a) => a,
1049        Err((status, reason)) => {
1050            return Response::builder()
1051                .status(status)
1052                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1053                .header("X-Reason", reason)
1054                .header(header::CONTENT_TYPE, "application/json")
1055                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
1056                .unwrap();
1057        }
1058    };
1059    let auth_ms = started_at.elapsed().as_millis();
1060
1061    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
1062    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
1063    if !can_upload {
1064        let _ = check_write_access(&state, &auth.pubkey);
1065        return blossom_json_error(StatusCode::FORBIDDEN, "Write access denied");
1066    }
1067
1068    let pubkey_bytes = match from_hex(&auth.pubkey) {
1069        Ok(bytes) => bytes,
1070        Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid pubkey format"),
1071    };
1072
1073    let now = SystemTime::now()
1074        .duration_since(UNIX_EPOCH)
1075        .unwrap()
1076        .as_secs();
1077    let mut total_bytes = 0usize;
1078    let mut items = Vec::with_capacity(payload.blobs.len());
1079    let mut descriptors = Vec::with_capacity(payload.blobs.len());
1080    let mut decode_hash_ms = 0u128;
1081    let mut validate_ms = 0u128;
1082
1083    for blob in payload.blobs {
1084        let decode_started = Instant::now();
1085        let sha256_hex = blob.sha256.to_lowercase();
1086        let expected_hash: [u8; 32] = match from_hex(&sha256_hex) {
1087            Ok(hash) => hash,
1088            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob hash"),
1089        };
1090        if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
1091            return blossom_json_error(
1092                StatusCode::FORBIDDEN,
1093                "Uploaded blob hash does not match authorized hash",
1094            );
1095        }
1096
1097        let data = match base64::engine::general_purpose::STANDARD.decode(blob.data.as_bytes()) {
1098            Ok(data) => data,
1099            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob data"),
1100        };
1101        if data.len() > state.max_upload_bytes {
1102            return blossom_json_error(
1103                StatusCode::PAYLOAD_TOO_LARGE,
1104                "Blob exceeds maximum upload size",
1105            );
1106        }
1107        total_bytes = total_bytes.saturating_add(data.len());
1108        if total_bytes > MAX_BATCH_UPLOAD_BYTES {
1109            return blossom_json_error(
1110                StatusCode::PAYLOAD_TOO_LARGE,
1111                "Batch exceeds maximum upload size",
1112            );
1113        }
1114
1115        let mut hasher = Sha256::new();
1116        hasher.update(&data);
1117        let actual_hash: [u8; 32] = hasher.finalize().into();
1118        if actual_hash != expected_hash {
1119            return blossom_json_error(StatusCode::FORBIDDEN, "Hash mismatch");
1120        }
1121        decode_hash_ms += decode_started.elapsed().as_millis();
1122
1123        let validate_started = Instant::now();
1124        let content_type = blob
1125            .content_type
1126            .as_deref()
1127            .map(content_type_base)
1128            .unwrap_or_else(|| "application/octet-stream".to_string());
1129        if let Err((status, reason)) = validate_upload_payload(
1130            &data,
1131            &content_type,
1132            can_upload,
1133            state.require_random_untrusted_ingest,
1134        ) {
1135            return blossom_json_error(status, reason);
1136        }
1137        validate_ms += validate_started.elapsed().as_millis();
1138
1139        let ext = mime_to_extension(&content_type);
1140        descriptors.push(BlobDescriptor {
1141            url: format!("/{}{}", sha256_hex, ext),
1142            sha256: sha256_hex,
1143            size: data.len() as u64,
1144            mime_type: content_type,
1145            uploaded: now,
1146        });
1147        items.push((actual_hash, data));
1148    }
1149    let prepare_ms = started_at.elapsed().as_millis();
1150
1151    let store = state.store.clone();
1152    let store_started = Instant::now();
1153    let stored = tokio::task::spawn_blocking(move || {
1154        if is_allowed_author {
1155            store.put_owned_blobs(&items, &pubkey_bytes)
1156        } else {
1157            store.put_cached_blobs(&items)
1158        }
1159    })
1160    .await
1161    .map_err(|error| anyhow::anyhow!("blob batch write task failed: {}", error));
1162    let store_ms = store_started.elapsed().as_millis();
1163    let total_ms = started_at.elapsed().as_millis();
1164
1165    match stored {
1166        Ok(Ok(uploaded)) => {
1167            if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1168                tracing::warn!(
1169                    blobs = payload_blobs,
1170                    uploaded,
1171                    total_bytes,
1172                    total_ms,
1173                    auth_ms,
1174                    prepare_ms,
1175                    decode_hash_ms,
1176                    validate_ms,
1177                    store_ms,
1178                    allowed_author = is_allowed_author,
1179                    "slow Blossom batch upload"
1180                );
1181            }
1182            Response::builder()
1183                .status(StatusCode::OK)
1184                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1185                .header(header::CONTENT_TYPE, "application/json")
1186                .body(Body::from(
1187                    serde_json::to_string(&BatchUploadResponse {
1188                        uploaded,
1189                        blobs: descriptors,
1190                    })
1191                    .unwrap(),
1192                ))
1193                .unwrap()
1194        }
1195        Ok(Err(error)) | Err(error) => Response::builder()
1196            .status(StatusCode::INTERNAL_SERVER_ERROR)
1197            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1198            .header("X-Reason", "Storage error")
1199            .header(header::CONTENT_TYPE, "application/json")
1200            .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
1201            .unwrap(),
1202    }
1203}
1204
1205/// DELETE /<sha256> - Delete a blob (BUD-02)
1206/// Note: Blob is only fully deleted when ALL owners have removed it
1207pub async fn delete_blob(
1208    State(state): State<AppState>,
1209    Path(id): Path<String>,
1210    headers: HeaderMap,
1211) -> impl IntoResponse {
1212    let (hash_part, _) = parse_hash_and_extension(&id);
1213
1214    if !is_valid_sha256(hash_part) {
1215        return Response::builder()
1216            .status(StatusCode::BAD_REQUEST)
1217            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1218            .header("X-Reason", "Invalid SHA256 hash")
1219            .body(Body::empty())
1220            .unwrap();
1221    }
1222
1223    let sha256_hex = hash_part.to_lowercase();
1224
1225    // Convert hash to bytes
1226    let sha256_bytes = match from_hex(&sha256_hex) {
1227        Ok(b) => b,
1228        Err(_) => {
1229            return Response::builder()
1230                .status(StatusCode::BAD_REQUEST)
1231                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1232                .header("X-Reason", "Invalid SHA256 hash format")
1233                .body(Body::empty())
1234                .unwrap();
1235        }
1236    };
1237
1238    // Verify authorization with hash requirement
1239    let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
1240        Ok(a) => a,
1241        Err((status, reason)) => {
1242            return Response::builder()
1243                .status(status)
1244                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1245                .header("X-Reason", reason)
1246                .body(Body::empty())
1247                .unwrap();
1248        }
1249    };
1250
1251    // Convert pubkey hex to bytes
1252    let pubkey_bytes = match from_hex(&auth.pubkey) {
1253        Ok(b) => b,
1254        Err(_) => {
1255            return Response::builder()
1256                .status(StatusCode::BAD_REQUEST)
1257                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1258                .header("X-Reason", "Invalid pubkey format")
1259                .body(Body::empty())
1260                .unwrap();
1261        }
1262    };
1263
1264    // Check ownership - user must be one of the owners (O(1) lookup with composite key)
1265    match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
1266        Ok(true) => {
1267            // User is an owner, proceed with delete
1268        }
1269        Ok(false) => {
1270            // Check if blob exists at all (for proper error message)
1271            match state.store.blob_has_owners(&sha256_bytes) {
1272                Ok(true) => {
1273                    return Response::builder()
1274                        .status(StatusCode::FORBIDDEN)
1275                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1276                        .header("X-Reason", "Not a blob owner")
1277                        .body(Body::empty())
1278                        .unwrap();
1279                }
1280                Ok(false) => {
1281                    return Response::builder()
1282                        .status(StatusCode::NOT_FOUND)
1283                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1284                        .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
1285                        .header("X-Reason", "Blob not found")
1286                        .body(Body::empty())
1287                        .unwrap();
1288                }
1289                Err(_) => {
1290                    return Response::builder()
1291                        .status(StatusCode::INTERNAL_SERVER_ERROR)
1292                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1293                        .body(Body::empty())
1294                        .unwrap();
1295                }
1296            }
1297        }
1298        Err(_) => {
1299            return Response::builder()
1300                .status(StatusCode::INTERNAL_SERVER_ERROR)
1301                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1302                .body(Body::empty())
1303                .unwrap();
1304        }
1305    }
1306
1307    // Remove this user's ownership (blob only deleted when no owners remain)
1308    match state
1309        .store
1310        .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
1311    {
1312        Ok(fully_deleted) => {
1313            // Return 200 OK whether blob was fully deleted or just removed from user's list
1314            // The client doesn't need to know if other owners still exist
1315            Response::builder()
1316                .status(StatusCode::OK)
1317                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1318                .header(
1319                    "X-Blob-Deleted",
1320                    if fully_deleted { "true" } else { "false" },
1321                )
1322                .body(Body::empty())
1323                .unwrap()
1324        }
1325        Err(_) => Response::builder()
1326            .status(StatusCode::INTERNAL_SERVER_ERROR)
1327            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1328            .body(Body::empty())
1329            .unwrap(),
1330    }
1331}
1332
1333/// GET /list/<pubkey> - List blobs for a pubkey (BUD-02)
1334pub async fn list_blobs(
1335    State(state): State<AppState>,
1336    Path(pubkey): Path<String>,
1337    Query(query): Query<ListQuery>,
1338    headers: HeaderMap,
1339) -> impl IntoResponse {
1340    // Validate pubkey format (64 hex chars)
1341    if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1342        return Response::builder()
1343            .status(StatusCode::BAD_REQUEST)
1344            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1345            .header("X-Reason", "Invalid pubkey format")
1346            .header(header::CONTENT_TYPE, "application/json")
1347            .body(Body::from("[]"))
1348            .unwrap();
1349    }
1350
1351    let pubkey_hex = pubkey.to_lowercase();
1352    let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1353        Ok(b) => b,
1354        Err(_) => {
1355            return Response::builder()
1356                .status(StatusCode::BAD_REQUEST)
1357                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1358                .header("X-Reason", "Invalid pubkey format")
1359                .header(header::CONTENT_TYPE, "application/json")
1360                .body(Body::from("[]"))
1361                .unwrap();
1362        }
1363    };
1364
1365    let auth = match verify_blossom_auth(&headers, "list", None) {
1366        Ok(auth) => auth,
1367        Err((status, reason)) => {
1368            return Response::builder()
1369                .status(status)
1370                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1371                .header("X-Reason", reason)
1372                .header(header::CONTENT_TYPE, "application/json")
1373                .body(Body::from("[]"))
1374                .unwrap();
1375        }
1376    };
1377
1378    if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1379        return Response::builder()
1380            .status(StatusCode::FORBIDDEN)
1381            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1382            .header("X-Reason", "Pubkey mismatch")
1383            .header(header::CONTENT_TYPE, "application/json")
1384            .body(Body::from("[]"))
1385            .unwrap();
1386    }
1387
1388    // Get blobs for this pubkey
1389    match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1390        Ok(blobs) => {
1391            // Apply filters
1392            let mut filtered: Vec<_> = blobs
1393                .into_iter()
1394                .filter(|b| {
1395                    if let Some(since) = query.since {
1396                        if b.uploaded < since {
1397                            return false;
1398                        }
1399                    }
1400                    if let Some(until) = query.until {
1401                        if b.uploaded > until {
1402                            return false;
1403                        }
1404                    }
1405                    true
1406                })
1407                .collect();
1408
1409            // Sort by uploaded descending (most recent first)
1410            filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1411
1412            // Apply limit
1413            let limit = query.limit.unwrap_or(100).min(1000);
1414            filtered.truncate(limit);
1415
1416            Response::builder()
1417                .status(StatusCode::OK)
1418                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1419                .header(header::CONTENT_TYPE, "application/json")
1420                .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1421                .unwrap()
1422        }
1423        Err(_) => Response::builder()
1424            .status(StatusCode::INTERNAL_SERVER_ERROR)
1425            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1426            .header(header::CONTENT_TYPE, "application/json")
1427            .body(Body::from("[]"))
1428            .unwrap(),
1429    }
1430}
1431
1432// Helper functions
1433
/// Splits `id` at its last `.` into the part before the dot and the extension.
///
/// The returned extension keeps its leading dot. When `id` has no dot, the
/// whole input is returned with `None`.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    match id.rfind('.') {
        Some(dot_index) => {
            let (stem, suffix) = id.split_at(dot_index);
            (stem, Some(suffix))
        }
        None => (id, None),
    }
}
1441
/// Returns `true` when `s` is exactly 64 ASCII hex digits (either case).
fn is_valid_sha256(s: &str) -> bool {
    if s.len() != 64 {
        return false;
    }
    s.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1445
/// Test-only helper that writes `data` into the store.
///
/// With `track_ownership` set the blob is stored as owned by `pubkey`;
/// otherwise it is stored as a cached blob. `_sha256` is accepted but unused.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    let store = &state.store;
    match track_ownership {
        true => {
            store.put_owned_blob(data, pubkey)?;
        }
        false => {
            store.put_cached_blob(data)?;
        }
    }
    Ok(())
}
1462
/// Maps a MIME type to the file extension (including the leading dot) used in
/// blob URLs. The match is exact; unrecognized types yield an empty string.
fn mime_to_extension(mime: &str) -> &'static str {
    const KNOWN_EXTENSIONS: &[(&str, &str)] = &[
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    KNOWN_EXTENSIONS
        .iter()
        .find(|&&(known, _)| known == mime)
        .map_or("", |&(_, extension)| extension)
}
1481
1482#[cfg(test)]
1483mod tests {
1484    use super::*;
1485    use crate::server::auth::WsRelayState;
1486    use crate::storage::HashtreeStore;
1487    use axum::response::IntoResponse;
1488    use base64::Engine;
1489    use hashtree_core::sha256;
1490    use std::collections::HashSet;
1491    use std::sync::{Arc, Mutex as StdMutex};
1492    use std::time::Duration;
1493    use tempfile::TempDir;
1494
1495    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
1496        AppState {
1497            store,
1498            auth: None,
1499            daemon_started_at: 1_700_000_000,
1500            peer_mode: crate::config::ServerMode::Normal,
1501            hash_get_enabled: true,
1502            http_webrtc_fetch: true,
1503            webrtc_peers: None,
1504            fips_transport: None,
1505            fetch_from_fips_peers: true,
1506            ws_relay: Arc::new(WsRelayState::new()),
1507            max_upload_bytes: 5 * 1024 * 1024,
1508            public_writes: true,
1509            require_random_untrusted_ingest: true,
1510            optimistic_blossom_uploads: false,
1511            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
1512            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
1513            allowed_pubkeys: HashSet::new(),
1514            upstream_blossom: Vec::new(),
1515            social_graph: None,
1516            social_graph_store: None,
1517            social_graph_root: None,
1518            socialgraph_snapshot_public: false,
1519            nostr_relay: None,
1520            nostr_relay_urls: Vec::new(),
1521            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
1522            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
1523                std::collections::HashMap::new(),
1524            )),
1525            inflight_blob_reads: Arc::new(
1526                tokio::sync::Mutex::new(std::collections::HashMap::new()),
1527            ),
1528            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
1529            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1530            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1531            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1532            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1533        }
1534    }
1535
1536    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
1537        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};
1538
1539        let now = Timestamp::now();
1540        let event = EventBuilder::new(
1541            Kind::Custom(BLOSSOM_AUTH_KIND),
1542            "",
1543            vec![
1544                Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
1545                Tag::custom(
1546                    TagKind::Custom("expiration".into()),
1547                    vec![(now.as_u64() + 300).to_string()],
1548                ),
1549            ],
1550        )
1551        .custom_created_at(now)
1552        .to_event(keys)
1553        .expect("sign blossom auth");
1554        let json = serde_json::to_vec(&event).expect("serialize auth event");
1555        format!(
1556            "Nostr {}",
1557            base64::engine::general_purpose::STANDARD.encode(json)
1558        )
1559    }
1560
1561    fn upload_check_bits(response: UploadCheckResponse) -> Vec<bool> {
1562        let bytes = base64::engine::general_purpose::STANDARD
1563            .decode(response.present)
1564            .expect("decode upload check bitset");
1565        (0..response.count)
1566            .map(|index| bytes[index / 8] & (1 << (index % 8)) != 0)
1567            .collect()
1568    }
1569
1570    #[test]
1571    fn test_is_valid_sha256() {
1572        assert!(is_valid_sha256(
1573            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1574        ));
1575        assert!(is_valid_sha256(
1576            "0000000000000000000000000000000000000000000000000000000000000000"
1577        ));
1578
1579        // Too short
1580        assert!(!is_valid_sha256("e2bab35b5296ec2242ded0a01f6d6723"));
1581        // Too long
1582        assert!(!is_valid_sha256(
1583            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa"
1584        ));
1585        // Invalid chars
1586        assert!(!is_valid_sha256(
1587            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1588        ));
1589        // Empty
1590        assert!(!is_valid_sha256(""));
1591    }
1592
1593    #[test]
1594    fn test_parse_hash_and_extension() {
1595        let (hash, ext) = parse_hash_and_extension("abc123.png");
1596        assert_eq!(hash, "abc123");
1597        assert_eq!(ext, Some(".png"));
1598
1599        let (hash2, ext2) = parse_hash_and_extension("abc123");
1600        assert_eq!(hash2, "abc123");
1601        assert_eq!(ext2, None);
1602
1603        let (hash3, ext3) = parse_hash_and_extension("abc.123.jpg");
1604        assert_eq!(hash3, "abc.123");
1605        assert_eq!(ext3, Some(".jpg"));
1606    }
1607
1608    #[test]
1609    fn test_mime_to_extension() {
1610        assert_eq!(mime_to_extension("image/png"), ".png");
1611        assert_eq!(mime_to_extension("image/jpeg"), ".jpg");
1612        assert_eq!(mime_to_extension("video/mp4"), ".mp4");
1613        assert_eq!(mime_to_extension("application/octet-stream"), "");
1614        assert_eq!(mime_to_extension("unknown/type"), "");
1615    }
1616
1617    #[tokio::test]
1618    async fn upload_check_reports_present_hashes_in_request_order() {
1619        let temp_dir = TempDir::new().expect("temp dir");
1620        let store = Arc::new(
1621            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1622        );
1623
1624        let present = b"present blob";
1625        let missing = b"missing blob";
1626        let present_hash = sha256(present);
1627        let missing_hash = sha256(missing);
1628        store.put_cached_blob(present).expect("seed blob");
1629
1630        let state = test_app_state(store);
1631        let response = upload_check(
1632            State(state),
1633            Json(UploadCheckRequest {
1634                hashes: vec![
1635                    hex::encode(missing_hash),
1636                    hex::encode(present_hash),
1637                    hex::encode(present_hash),
1638                ],
1639            }),
1640        )
1641        .await
1642        .into_response();
1643
1644        assert_eq!(response.status(), StatusCode::OK);
1645        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
1646            .await
1647            .expect("read response body");
1648        let parsed: UploadCheckResponse =
1649            serde_json::from_slice(&body).expect("parse upload check response");
1650        assert_eq!(upload_check_bits(parsed), vec![false, true, true]);
1651    }
1652
1653    #[tokio::test]
1654    async fn upload_check_rejects_invalid_hash() {
1655        let temp_dir = TempDir::new().expect("temp dir");
1656        let store = Arc::new(
1657            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1658        );
1659        let state = test_app_state(store);
1660        let response = upload_check(
1661            State(state),
1662            Json(UploadCheckRequest {
1663                hashes: vec!["not-a-sha256".to_string()],
1664            }),
1665        )
1666        .await
1667        .into_response();
1668
1669        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
1670    }
1671
1672    #[tokio::test]
1673    async fn upload_check_rejects_too_many_hashes() {
1674        let temp_dir = TempDir::new().expect("temp dir");
1675        let store = Arc::new(
1676            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1677        );
1678        let state = test_app_state(store);
1679        let response = upload_check(
1680            State(state),
1681            Json(UploadCheckRequest {
1682                hashes: vec!["00".repeat(32); MAX_UPLOAD_CHECK_HASHES + 1],
1683            }),
1684        )
1685        .await
1686        .into_response();
1687
1688        assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
1689    }
1690
1691    #[tokio::test]
1692    async fn optimistic_uploads_return_accepted_and_store_in_background() {
1693        let temp_dir = TempDir::new().expect("temp dir");
1694        let store = Arc::new(
1695            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1696        );
1697        let mut state = test_app_state(Arc::clone(&store));
1698        state.optimistic_blossom_uploads = true;
1699
1700        let keys = nostr::Keys::generate();
1701        let mut headers = HeaderMap::new();
1702        headers.insert(
1703            header::AUTHORIZATION,
1704            create_upload_auth_header(&keys)
1705                .parse()
1706                .expect("auth header value"),
1707        );
1708        headers.insert(
1709            header::CONTENT_TYPE,
1710            "application/octet-stream"
1711                .parse()
1712                .expect("content type header value"),
1713        );
1714
1715        let body = axum::body::Bytes::from((0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>());
1716        let hash = sha256(&body);
1717        let response = upload_blob(State(state), headers, body)
1718            .await
1719            .into_response();
1720        assert_eq!(response.status(), StatusCode::ACCEPTED);
1721
1722        for _ in 0..50 {
1723            if store.blob_exists(&hash).expect("blob exists check") {
1724                return;
1725            }
1726            tokio::time::sleep(Duration::from_millis(10)).await;
1727        }
1728
1729        panic!("optimistic upload was not stored in the background");
1730    }
1731
1732    #[tokio::test]
1733    async fn optimistic_upload_existing_blob_skips_queue() {
1734        let temp_dir = TempDir::new().expect("temp dir");
1735        let store = Arc::new(
1736            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1737        );
1738        let body = axum::body::Bytes::from(
1739            (0u16..=255)
1740                .map(|value| ((value * 73 + 19) % 256) as u8)
1741                .collect::<Vec<_>>(),
1742        );
1743        store.put_cached_blob(&body).expect("seed blob");
1744
1745        let mut state = test_app_state(Arc::clone(&store));
1746        state.optimistic_blossom_uploads = true;
1747        state.optimistic_upload_queue_bytes = 1;
1748        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1749
1750        let keys = nostr::Keys::generate();
1751        let mut headers = HeaderMap::new();
1752        headers.insert(
1753            header::AUTHORIZATION,
1754            create_upload_auth_header(&keys)
1755                .parse()
1756                .expect("auth header value"),
1757        );
1758        headers.insert(
1759            header::CONTENT_TYPE,
1760            "application/octet-stream"
1761                .parse()
1762                .expect("content type header value"),
1763        );
1764
1765        let response = upload_blob(State(state), headers, body)
1766            .await
1767            .into_response();
1768        assert_eq!(response.status(), StatusCode::CONFLICT);
1769    }
1770
1771    #[tokio::test]
1772    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
1773        let temp_dir = TempDir::new().expect("temp dir");
1774        let store = Arc::new(
1775            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1776        );
1777        let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
1778        let hash_hex = hex::encode(sha256(&body));
1779        store.put_cached_blob(&body).expect("seed blob");
1780
1781        let mut state = test_app_state(store);
1782        state.optimistic_blossom_uploads = true;
1783
1784        let keys = nostr::Keys::generate();
1785        let mut headers = HeaderMap::new();
1786        headers.insert(
1787            header::AUTHORIZATION,
1788            create_upload_auth_header(&keys)
1789                .parse()
1790                .expect("auth header value"),
1791        );
1792        headers.insert(
1793            header::CONTENT_TYPE,
1794            "application/octet-stream"
1795                .parse()
1796                .expect("content type header value"),
1797        );
1798
1799        let response = upload_blob(State(state), headers, body)
1800            .await
1801            .into_response();
1802        assert_eq!(response.status(), StatusCode::ACCEPTED);
1803
1804        for _ in 0..50 {
1805            if !optimistic_upload_is_inflight(&hash_hex) {
1806                return;
1807            }
1808            tokio::time::sleep(Duration::from_millis(10)).await;
1809        }
1810
1811        clear_optimistic_upload_inflight(&hash_hex);
1812        panic!("optimistic upload in-flight marker was not cleared");
1813    }
1814
1815    #[tokio::test]
1816    async fn optimistic_upload_inflight_duplicate_skips_queue() {
1817        let temp_dir = TempDir::new().expect("temp dir");
1818        let store = Arc::new(
1819            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1820        );
1821        let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
1822        let hash_hex = hex::encode(sha256(&body));
1823        assert!(mark_optimistic_upload_inflight(&hash_hex));
1824
1825        let mut state = test_app_state(store);
1826        state.optimistic_blossom_uploads = true;
1827        state.optimistic_upload_queue_bytes = 1;
1828        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1829
1830        let keys = nostr::Keys::generate();
1831        let mut headers = HeaderMap::new();
1832        headers.insert(
1833            header::AUTHORIZATION,
1834            create_upload_auth_header(&keys)
1835                .parse()
1836                .expect("auth header value"),
1837        );
1838        headers.insert(
1839            header::CONTENT_TYPE,
1840            "application/octet-stream"
1841                .parse()
1842                .expect("content type header value"),
1843        );
1844
1845        let response = upload_blob(State(state), headers, body)
1846            .await
1847            .into_response();
1848        clear_optimistic_upload_inflight(&hash_hex);
1849        assert_eq!(response.status(), StatusCode::ACCEPTED);
1850    }
1851
1852    #[test]
1853    fn public_writes_accept_unlisted_authors_for_uploads() {
1854        let temp_dir = TempDir::new().expect("temp dir");
1855        let store =
1856            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1857        let mut state = test_app_state(store);
1858        let pubkey = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";
1859
1860        state.public_writes = true;
1861        assert!(can_accept_upload_author(&state, pubkey));
1862        assert!(!is_allowed_write_author(&state, pubkey));
1863
1864        state.public_writes = false;
1865        assert!(!can_accept_upload_author(&state, pubkey));
1866    }
1867
1868    #[test]
1869    fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
1870        let encrypted_block: Vec<u8> = (0..=255).collect();
1871
1872        assert_eq!(
1873            validate_upload_payload(&encrypted_block, "application/octet-stream", false, true,),
1874            Ok(())
1875        );
1876
1877        assert_eq!(
1878            validate_upload_payload(b"audio bytes", "audio/mpeg", true, true,),
1879            Ok(())
1880        );
1881
1882        assert_eq!(
1883            validate_upload_payload(b"audio bytes", "audio/mpeg", false, true,),
1884            Err((
1885                StatusCode::FORBIDDEN,
1886                "Raw media uploads require write access".to_string(),
1887            ))
1888        );
1889    }
1890
1891    #[test]
1892    fn authenticated_chk_uploads_skip_entropy_heuristic() {
1893        let low_unique_block: Vec<u8> = (0..256).map(|i| (i % 139) as u8).collect();
1894
1895        assert_eq!(
1896            validate_upload_payload(&low_unique_block, "application/octet-stream", true, true,),
1897            Ok(())
1898        );
1899
1900        assert_eq!(
1901            validate_upload_payload(&low_unique_block, "application/octet-stream", false, true,),
1902            Err((
1903                StatusCode::UNSUPPORTED_MEDIA_TYPE,
1904                "Data not encrypted. Unique: 139 (min: 140)".to_string(),
1905            ))
1906        );
1907    }
1908
1909    #[test]
1910    fn unowned_public_uploads_use_cache_storage_semantics() {
1911        let temp_dir = TempDir::new().expect("temp dir");
1912        let store =
1913            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1914        let state = test_app_state(Arc::clone(&store));
1915
1916        let owned = vec![1u8; 280];
1917        let owned_hash = sha256(&owned);
1918        store_blossom_blob(&state, &owned, &owned_hash, &[2u8; 32], true).expect("owned upload");
1919
1920        let public_upload = vec![3u8; 280];
1921        let public_hash = sha256(&public_upload);
1922        store_blossom_blob(&state, &public_upload, &public_hash, &[4u8; 32], false)
1923            .expect("public upload");
1924
1925        let replacement = vec![5u8; 280];
1926        let replacement_hash = sha256(&replacement);
1927        state
1928            .store
1929            .put_cached_blob(&replacement)
1930            .expect("replacement cached blob");
1931
1932        assert!(state.store.blob_exists(&owned_hash).expect("owned exists"));
1933        assert!(!state
1934            .store
1935            .blob_exists(&public_hash)
1936            .expect("public upload evicted"));
1937        assert!(state
1938            .store
1939            .blob_exists(&replacement_hash)
1940            .expect("replacement exists"));
1941        assert!(state
1942            .store
1943            .is_blob_owner(&owned_hash, &[2u8; 32])
1944            .expect("owned tracked"));
1945        assert!(!state
1946            .store
1947            .blob_has_owners(&public_hash)
1948            .expect("public upload unowned"));
1949    }
1950
1951    #[test]
1952    fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
1953        let temp_dir = TempDir::new().expect("temp dir");
1954        let store =
1955            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
1956        let state = test_app_state(Arc::clone(&store));
1957
1958        let first = vec![1u8; 300];
1959        let first_hash = sha256(&first);
1960        let owner = [2u8; 32];
1961        store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");
1962
1963        let second = vec![3u8; 300];
1964        let second_hash = sha256(&second);
1965        let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
1966            .expect_err("second owned upload should exceed the storage limit");
1967
1968        assert!(
1969            error.to_string().contains("storage limit"),
1970            "unexpected error: {error}"
1971        );
1972        assert!(state
1973            .store
1974            .blob_exists(&first_hash)
1975            .expect("first blob remains"));
1976        assert!(!state
1977            .store
1978            .blob_exists(&second_hash)
1979            .expect("second blob rejected"));
1980        assert!(state
1981            .store
1982            .is_blob_owner(&first_hash, &owner)
1983            .expect("first owner tracked"));
1984        assert!(!state
1985            .store
1986            .is_blob_owner(&second_hash, &owner)
1987            .expect("second owner not tracked"));
1988    }
1989}