Skip to main content

hashtree_cli/server/
blossom.rs

1//! Blossom protocol implementation (BUD-01, BUD-02)
2//!
3//! Implements blob storage endpoints with Nostr-based authentication.
4//! See: https://github.com/hzrd149/blossom
5
6use axum::{
7    body::Body,
8    extract::{Path, Query, State},
9    http::{header, HeaderMap, Response, StatusCode},
10    response::IntoResponse,
11    Json,
12};
13use base64::Engine;
14use hashtree_core::from_hex;
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use std::collections::HashSet;
18use std::sync::{Mutex, OnceLock};
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use super::auth::AppState;
22use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
23use super::ingest_filter::{
24    content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
25};
26use super::mime::get_mime_type;
27
28/// Blossom authorization event kind (NIP-98 style)
29const BLOSSOM_AUTH_KIND: u16 = 24242;
30
31/// Cache-Control header for immutable content-addressed data (1 year)
32const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
33const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
34const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
35const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
36const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;
37
38/// Default maximum upload size in bytes (5 MB)
39pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
40const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
41const MAX_BATCH_UPLOAD_BLOBS: usize = 1024;
42const MAX_BATCH_UPLOAD_BYTES: usize = 64 * 1024 * 1024;
43const SLOW_BATCH_UPLOAD_LOG_MS_ENV: &str = "HTREE_SLOW_BATCH_UPLOAD_LOG_MS";
44
45fn slow_batch_upload_log_ms() -> Option<u128> {
46    std::env::var(SLOW_BATCH_UPLOAD_LOG_MS_ENV)
47        .ok()
48        .and_then(|value| value.parse::<u128>().ok())
49        .filter(|value| *value > 0)
50}
51
/// Point-in-time view of the optimistic upload queue, as produced by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Whether optimistic Blossom uploads are enabled in the application state.
    pub enabled: bool,
    /// Configured queue budget (`optimistic_upload_queue_bytes`).
    pub max_bytes: usize,
    /// Currently available queue permits, capped at `max_bytes`.
    pub available_bytes: usize,
    /// `max_bytes` minus `available_bytes` (saturating).
    pub reserved_bytes: usize,
    /// Number of blob hashes currently marked as in flight.
    pub in_flight: usize,
    /// Queue wait timeout in milliseconds.
    pub queue_timeout_ms: u64,
}
61
62/// Check if a pubkey has write access based on allowed_npubs config or social graph
63/// Returns Ok(()) if allowed, Err with JSON error body if denied
64#[allow(clippy::result_large_err)]
65fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
66    // Check if pubkey is in the allowed list (converted from npub to hex)
67    if is_allowed_write_author(state, pubkey) {
68        tracing::debug!(
69            "Blossom write allowed for {}... (allowed writer)",
70            &pubkey[..8.min(pubkey.len())]
71        );
72        return Ok(());
73    }
74
75    // Not in allowed list or social graph
76    tracing::info!(
77        "Blossom write denied for {}... (not in allowed_npubs or social graph)",
78        &pubkey[..8.min(pubkey.len())]
79    );
80    Err(Response::builder()
81        .status(StatusCode::FORBIDDEN)
82        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
83        .header(header::CONTENT_TYPE, "application/json")
84        .body(Body::from(
85            r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
86        ))
87        .unwrap())
88}
89
90fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
91    if state.allowed_pubkeys.contains(pubkey) {
92        return true;
93    }
94
95    state
96        .social_graph
97        .as_ref()
98        .map(|sg| sg.check_write_access(pubkey))
99        .unwrap_or(false)
100}
101
102fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
103    state.public_writes || is_allowed_write_author(state, pubkey)
104}
105
106fn validate_upload_payload(
107    body: &[u8],
108    content_type: &str,
109    can_upload_author: bool,
110    require_random_untrusted_ingest: bool,
111) -> Result<(), (StatusCode, String)> {
112    let is_chk_upload = is_chk_content_type(content_type);
113
114    if !is_chk_upload && !can_upload_author {
115        return Err((
116            StatusCode::FORBIDDEN,
117            "Raw media uploads require write access".to_string(),
118        ));
119    }
120
121    if is_chk_upload {
122        let require_random = require_random_untrusted_ingest && !can_upload_author;
123        validate_untrusted_blob(body, require_random)
124            .map_err(|IngestRejection { status, reason }| (status, reason))?;
125    }
126
127    Ok(())
128}
129
130fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
131    let reason = reason.into();
132    Response::builder()
133        .status(status)
134        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
135        .header("X-Reason", reason.as_str())
136        .header(header::CONTENT_TYPE, "application/json")
137        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
138        .unwrap()
139}
140
141fn blossom_retryable_json_error(
142    status: StatusCode,
143    reason: impl Into<String>,
144    retry_after_seconds: u64,
145) -> Response<Body> {
146    let reason = reason.into();
147    Response::builder()
148        .status(status)
149        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
150        .header("X-Reason", reason.as_str())
151        .header(header::RETRY_AFTER, retry_after_seconds.to_string())
152        .header(header::CONTENT_TYPE, "application/json")
153        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
154        .unwrap()
155}
156
/// Blob descriptor returned by upload and list endpoints
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Location of the blob; the upload handler sets this to `/<sha256><ext>`.
    pub url: String,
    /// Hex-encoded SHA-256 of the blob contents.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Upload time in seconds since the UNIX epoch.
    pub uploaded: u64,
}
167
/// One blob entry inside a batch upload request body.
#[derive(Debug, Deserialize)]
pub struct BatchUploadBlob {
    /// SHA-256 of the blob as supplied by the client (presumably hex — confirm
    /// against the batch handler).
    pub sha256: String,
    /// Optional content type; also accepted under the JSON key `contentType`.
    #[serde(default, alias = "contentType")]
    pub content_type: Option<String>,
    /// Blob payload as a string. NOTE(review): encoding (likely base64) is not
    /// visible here — confirm against the batch handler.
    pub data: String,
}
175
/// JSON request body for the batch upload endpoint.
#[derive(Debug, Deserialize)]
pub struct BatchUploadRequest {
    /// Blobs to upload in this batch.
    pub blobs: Vec<BatchUploadBlob>,
}
180
/// JSON response body for the batch upload endpoint.
#[derive(Debug, Serialize)]
pub struct BatchUploadResponse {
    /// Count of uploaded blobs (presumably the number newly stored — confirm
    /// against the batch handler).
    pub uploaded: usize,
    /// Descriptors for the blobs covered by this response.
    pub blobs: Vec<BlobDescriptor>,
}
186
/// Query parameters for list endpoint
///
/// All fields are optional. NOTE(review): their exact semantics are defined by
/// the list handler, which is not visible here; the notes below are assumptions.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Presumably a lower bound on upload time (UNIX seconds) — confirm.
    pub since: Option<u64>,
    /// Presumably an upper bound on upload time (UNIX seconds) — confirm.
    pub until: Option<u64>,
    /// Presumably the maximum number of entries to return — confirm.
    pub limit: Option<usize>,
    /// Presumably an opaque pagination cursor — confirm.
    pub cursor: Option<String>,
}
195
/// Parsed Nostr authorization event
///
/// Produced by `verify_blossom_auth` after the event's kind, signature,
/// timestamps and action have been checked.
#[derive(Debug)]
pub struct BlossomAuth {
    /// Signer's public key, as the string found in the event's `pubkey` field.
    pub pubkey: String,
    /// Event kind.
    pub kind: u16,
    /// Event creation time (`created_at`).
    pub created_at: u64,
    /// Value of the `expiration` tag, if one was present.
    pub expiration: Option<u64>,
    pub action: Option<String>,   // "upload", "delete", "list", "get"
    pub blob_hashes: Vec<String>, // x tags
    pub server: Option<String>,   // server tag
}
207
208/// Parse and verify Nostr authorization from header
209/// Returns the verified auth or an error response
210pub fn verify_blossom_auth(
211    headers: &HeaderMap,
212    required_action: &str,
213    required_hash: Option<&str>,
214) -> Result<BlossomAuth, (StatusCode, &'static str)> {
215    let auth_header = headers
216        .get(header::AUTHORIZATION)
217        .and_then(|v| v.to_str().ok())
218        .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
219
220    let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
221        StatusCode::UNAUTHORIZED,
222        "Invalid auth scheme, expected 'Nostr'",
223    ))?;
224
225    // Decode base64 event
226    let engine = base64::engine::general_purpose::STANDARD;
227    let event_bytes = engine
228        .decode(nostr_event)
229        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
230
231    let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
232        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
233
234    // Extract event fields
235    let kind = event_json["kind"]
236        .as_u64()
237        .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
238
239    if kind != BLOSSOM_AUTH_KIND as u64 {
240        return Err((
241            StatusCode::BAD_REQUEST,
242            "Invalid event kind, expected 24242",
243        ));
244    }
245
246    let pubkey = event_json["pubkey"]
247        .as_str()
248        .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
249        .to_string();
250
251    let created_at = event_json["created_at"]
252        .as_u64()
253        .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
254
255    let sig = event_json["sig"]
256        .as_str()
257        .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
258
259    // Verify signature
260    if !verify_nostr_signature(&event_json, &pubkey, sig) {
261        return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
262    }
263
264    // Parse tags
265    let tags = event_json["tags"]
266        .as_array()
267        .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
268
269    let mut expiration: Option<u64> = None;
270    let mut action: Option<String> = None;
271    let mut blob_hashes: Vec<String> = Vec::new();
272    let mut server: Option<String> = None;
273
274    for tag in tags {
275        let tag_arr = tag.as_array();
276        if let Some(arr) = tag_arr {
277            if arr.len() >= 2 {
278                let tag_name = arr[0].as_str().unwrap_or("");
279                let tag_value = arr[1].as_str().unwrap_or("");
280
281                match tag_name {
282                    "t" => action = Some(tag_value.to_string()),
283                    "x" => blob_hashes.push(tag_value.to_lowercase()),
284                    "expiration" => expiration = tag_value.parse().ok(),
285                    "server" => server = Some(tag_value.to_string()),
286                    _ => {}
287                }
288            }
289        }
290    }
291
292    // Validate expiration
293    let now = SystemTime::now()
294        .duration_since(UNIX_EPOCH)
295        .unwrap()
296        .as_secs();
297
298    if let Some(exp) = expiration {
299        if exp < now {
300            return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
301        }
302    }
303
304    // Validate created_at is not in the future (with 60s tolerance)
305    if created_at > now + 60 {
306        return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
307    }
308
309    // Validate action matches
310    if let Some(ref act) = action {
311        if act != required_action {
312            return Err((StatusCode::FORBIDDEN, "Action mismatch"));
313        }
314    } else {
315        return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
316    }
317
318    // Validate hash if required
319    if let Some(hash) = required_hash {
320        if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
321            return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
322        }
323    }
324
325    Ok(BlossomAuth {
326        pubkey,
327        kind: kind as u16,
328        created_at,
329        expiration,
330        action,
331        blob_hashes,
332        server,
333    })
334}
335
336/// Verify Nostr event signature using secp256k1
337fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
338    use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
339
340    // Compute event ID (sha256 of serialized event)
341    let content = event["content"].as_str().unwrap_or("");
342    let full_serialized = format!(
343        "[0,\"{}\",{},{},{},\"{}\"]",
344        pubkey,
345        event["created_at"],
346        event["kind"],
347        event["tags"],
348        escape_json_string(content),
349    );
350
351    let mut hasher = Sha256::new();
352    hasher.update(full_serialized.as_bytes());
353    let event_id = hasher.finalize();
354
355    // Parse pubkey and signature
356    let pubkey_bytes = match hex::decode(pubkey) {
357        Ok(b) => b,
358        Err(_) => return false,
359    };
360
361    let sig_bytes = match hex::decode(sig) {
362        Ok(b) => b,
363        Err(_) => return false,
364    };
365
366    let secp = Secp256k1::verification_only();
367
368    let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
369        Ok(pk) => pk,
370        Err(_) => return false,
371    };
372
373    let signature = match Signature::from_slice(&sig_bytes) {
374        Ok(s) => s,
375        Err(_) => return false,
376    };
377
378    let message = match Message::from_digest_slice(&event_id) {
379        Ok(m) => m,
380        Err(_) => return false,
381    };
382
383    secp.verify_schnorr(&signature, &message, &xonly_pubkey)
384        .is_ok()
385}
386
/// Escape string for JSON serialization
///
/// Produces the same escapes as standard JSON serializers so that the event
/// hash computed from the result matches what signers computed:
///
/// * `"` and `\` are backslash-escaped;
/// * line feed, carriage return, tab, backspace and form feed use their short
///   forms (`\n`, `\r`, `\t`, `\b`, `\f`);
/// * any other character below U+0020 is written as `\u00xx`;
/// * everything else — including DEL (U+007F) and other non-ASCII control
///   characters — is emitted verbatim.
fn escape_json_string(s: &str) -> String {
    use std::fmt::Write as _;

    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0c}' => escaped.push_str("\\f"),
            c if c < '\u{20}' => {
                // Writing into a String cannot fail.
                let _ = write!(escaped, "\\u{:04x}", c as u32);
            }
            c => escaped.push(c),
        }
    }
    escaped
}
405
406/// CORS preflight handler for all Blossom endpoints
407/// Echoes back Access-Control-Request-Headers to allow any headers
408pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
409    // Echo back requested headers, or use sensible defaults that cover common Blossom headers
410    let allowed_headers = headers
411        .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
412        .and_then(|v| v.to_str().ok())
413        .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
414
415    // Always include common headers in addition to what was requested
416    let full_allowed = format!(
417        "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
418        allowed_headers
419    );
420
421    Response::builder()
422        .status(StatusCode::NO_CONTENT)
423        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
424        .header(
425            header::ACCESS_CONTROL_ALLOW_METHODS,
426            "GET, HEAD, PUT, DELETE, OPTIONS",
427        )
428        .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
429        .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
430        .body(Body::empty())
431        .unwrap()
432}
433
/// HEAD /<sha256> - Check if blob exists
///
/// The path segment may carry a file extension after the hash; when present it
/// is used to pick the `Content-Type` of a successful response.
///
/// Responses:
/// * `400` — the hash part is not a valid SHA-256 hex string.
/// * `503` with `Retry-After: 1` — no blob read permit could be acquired.
/// * `200` — blob exists; `Content-Length` is its size and the response is
///   marked immutable. `X-Source: local` is added for loopback clients.
/// * `404` — the store reports no such blob.
/// * `500` — the size lookup failed, panicked, or timed out.
pub async fn head_blob(
    State(state): State<AppState>,
    Path(id): Path<String>,
    connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
) -> impl IntoResponse {
    let is_localhost = connect_info.0.ip().is_loopback();
    let (hash_part, ext) = parse_hash_and_extension(&id);

    if !is_valid_sha256(hash_part) {
        return Response::builder()
            .status(StatusCode::BAD_REQUEST)
            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
            .header("X-Reason", "Invalid SHA256 hash")
            .body(Body::empty())
            .unwrap();
    }

    // Normalize to lowercase so cache keys and store lookups are case-insensitive
    // with respect to the request path.
    let sha256_hex = hash_part.to_lowercase();
    let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
        Ok(b) => b,
        Err(_) => {
            return Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
                .header("X-Reason", "Invalid SHA256 format")
                .body(Body::empty())
                .unwrap()
        }
    };

    // Blossom HEAD only needs metadata; avoid reading the full blob body just to
    // answer cache probes and CDN revalidation. The read permit keeps CDN probe
    // storms from filling Tokio's blocking thread pool while old blobs without
    // metadata are still being normalized.
    //
    // `blob_size` is nested: outer `Err(())` means the blocking task failed or
    // timed out, inner `Err` is a store error, and `Ok(Ok(None))` means the blob
    // does not exist.
    let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
        // Cache hit: no permit and no store access needed.
        Ok(Ok(cached))
    } else {
        let permit = match acquire_blob_read().await {
            Ok(permit) => permit,
            Err(_) => {
                return Response::builder()
                    .status(StatusCode::SERVICE_UNAVAILABLE)
                    .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
                    .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
                    .header("Retry-After", "1")
                    .header("X-Reason", BLOB_READ_BUSY)
                    .body(Body::empty())
                    .unwrap()
            }
        };
        let store = state.store.clone();
        let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
        let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
        // NOTE(review): the permit is released here even when the timeout fired,
        // at which point the blocking task may still be running — confirm this
        // is intended.
        drop(permit);
        let result = match timed {
            Ok(result) => result.map_err(|_| ()),
            Err(_) => Err(()),
        };
        // Only successful lookups (found or not found) are cached.
        if let Ok(Ok(size)) = result {
            state.blob_cache.put_size(sha256_hex.clone(), size);
        }
        result
    };

    match blob_size {
        Ok(Ok(Some(size))) => {
            // Derive the MIME type from the requested extension, if any.
            let mime_type = ext
                .map(|e| get_mime_type(&format!("file{}", e)))
                .unwrap_or("application/octet-stream");

            let mut builder = Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, mime_type)
                .header(header::CONTENT_LENGTH, size)
                .header(header::ACCEPT_RANGES, "bytes")
                .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            if is_localhost {
                builder = builder.header("X-Source", "local");
            }
            builder.body(Body::empty()).unwrap()
        }
        Ok(Ok(None)) => Response::builder()
            .status(StatusCode::NOT_FOUND)
            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
            .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
            .header("X-Reason", "Blob not found")
            .body(Body::empty())
            .unwrap(),
        // Store error, task failure, or timeout.
        _ => Response::builder()
            .status(StatusCode::INTERNAL_SERVER_ERROR)
            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
            .body(Body::empty())
            .unwrap(),
    }
}
531
532async fn store_blossom_blob_without_blocking_runtime(
533    state: &AppState,
534    data: axum::body::Bytes,
535    pubkey: [u8; 32],
536    track_ownership: bool,
537) -> anyhow::Result<()> {
538    let mut hasher = Sha256::new();
539    hasher.update(&data);
540    let hash_hex = hex::encode(hasher.finalize());
541    let data_for_cache = data.clone();
542    let permit = acquire_blob_write()
543        .await
544        .map_err(|err| anyhow::anyhow!(err))?;
545    let store = state.store.clone();
546    tokio::task::spawn_blocking(move || {
547        let _permit = permit;
548        if track_ownership {
549            store.put_owned_blob(&data, &pubkey)?;
550        } else {
551            store.put_cached_blob(&data)?;
552        }
553        Ok::<(), anyhow::Error>(())
554    })
555    .await
556    .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
557    state
558        .blob_cache
559        .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
560    state.blob_cache.put_body(hash_hex, &data_for_cache);
561    Ok(())
562}
563
564fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
565    Response::builder()
566        .status(status)
567        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
568        .header(header::CONTENT_TYPE, "application/json")
569        .body(Body::from(serde_json::to_string(descriptor).unwrap()))
570        .unwrap()
571}
572
573fn optimistic_upload_queue_timeout() -> Duration {
574    let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
575        .ok()
576        .and_then(|value| value.parse::<u64>().ok())
577        .filter(|value| *value > 0)
578        .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
579    Duration::from_millis(millis)
580}
581
/// Process-wide set of blob hashes whose optimistic upload is in progress,
/// created empty on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static REGISTRY: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    REGISTRY.get_or_init(Mutex::default)
}
586
587fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
588    optimistic_upload_inflight()
589        .lock()
590        .is_ok_and(|inflight| inflight.contains(hash_hex))
591}
592
593fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
594    optimistic_upload_inflight()
595        .lock()
596        .map(|mut inflight| inflight.insert(hash_hex.to_string()))
597        .unwrap_or(true)
598}
599
600fn clear_optimistic_upload_inflight(hash_hex: &str) {
601    if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
602        inflight.remove(hash_hex);
603    }
604}
605
606pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
607    let max_bytes = state.optimistic_upload_queue_bytes;
608    let available_bytes = state
609        .optimistic_upload_queue
610        .available_permits()
611        .min(max_bytes);
612    let in_flight = optimistic_upload_inflight()
613        .lock()
614        .map(|inflight| inflight.len())
615        .unwrap_or(0);
616
617    OptimisticUploadQueueSnapshot {
618        enabled: state.optimistic_blossom_uploads,
619        max_bytes,
620        available_bytes,
621        reserved_bytes: max_bytes.saturating_sub(available_bytes),
622        in_flight,
623        queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
624    }
625}
626
/// Whole milliseconds in `duration`, saturating at `u64::MAX`.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
630
631async fn acquire_optimistic_upload_queue(
632    state: &AppState,
633    permits: u32,
634) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
635    match tokio::time::timeout(
636        optimistic_upload_queue_timeout(),
637        state
638            .optimistic_upload_queue
639            .clone()
640            .acquire_many_owned(permits),
641    )
642    .await
643    {
644        Ok(Ok(permit)) => Ok(permit),
645        Ok(Err(_)) => Err("Optimistic upload queue is closed"),
646        Err(_) => Err("Optimistic upload queue is full"),
647    }
648}
649
650async fn uploaded_blob_already_exists(
651    state: &AppState,
652    sha256_hash: [u8; 32],
653    sha256_hex: &str,
654) -> Result<bool, String> {
655    if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
656        return Ok(true);
657    }
658
659    let permit = acquire_blob_read().await.map_err(str::to_string)?;
660    let store = state.store.clone();
661    let size_read = tokio::task::spawn_blocking(move || {
662        store
663            .blob_size(&sha256_hash)
664            .map_err(|error| error.to_string())
665    });
666
667    let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
668    drop(permit);
669    match result {
670        Ok(Ok(Ok(size))) => {
671            state.blob_cache.put_size(sha256_hex.to_string(), size);
672            Ok(size.is_some())
673        }
674        Ok(Ok(Err(error))) => Err(error),
675        Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
676        Err(_) => Err("blob existence check timed out".to_string()),
677    }
678}
679
680async fn set_existing_blob_owner_without_body_write(
681    state: AppState,
682    sha256_hash: [u8; 32],
683    pubkey: [u8; 32],
684) -> anyhow::Result<()> {
685    tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
686        .await
687        .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
688    Ok(())
689}
690
691/// PUT /upload - Upload a new blob (BUD-02)
692pub async fn upload_blob(
693    State(state): State<AppState>,
694    headers: HeaderMap,
695    body: axum::body::Bytes,
696) -> impl IntoResponse {
697    // Check size limit first (before auth to save resources)
698    let max_size = state.max_upload_bytes;
699    if body.len() > max_size {
700        return Response::builder()
701            .status(StatusCode::PAYLOAD_TOO_LARGE)
702            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
703            .header(header::CONTENT_TYPE, "application/json")
704            .body(Body::from(format!(
705                r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
706                body.len(),
707                max_size,
708                max_size / 1024 / 1024
709            )))
710            .unwrap();
711    }
712
713    // Verify authorization
714    let auth = match verify_blossom_auth(&headers, "upload", None) {
715        Ok(a) => a,
716        Err((status, reason)) => {
717            return Response::builder()
718                .status(status)
719                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
720                .header("X-Reason", reason)
721                .header(header::CONTENT_TYPE, "application/json")
722                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
723                .unwrap();
724        }
725    };
726
727    // Get content type from header
728    let content_type = headers
729        .get(header::CONTENT_TYPE)
730        .and_then(|v| v.to_str().ok())
731        .unwrap_or("application/octet-stream")
732        .to_string();
733
734    // Check write access: either in allowed_npubs/social graph OR public_writes is enabled.
735    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
736    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
737    if !can_upload {
738        let _ = check_write_access(&state, &auth.pubkey);
739        return Response::builder()
740            .status(StatusCode::FORBIDDEN)
741            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
742            .header(header::CONTENT_TYPE, "application/json")
743            .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
744            .unwrap();
745    }
746
747    if let Err((status, reason)) = validate_upload_payload(
748        &body,
749        &content_type,
750        can_upload,
751        state.require_random_untrusted_ingest,
752    ) {
753        return blossom_json_error(status, reason);
754    }
755
756    // Compute SHA256 of uploaded data
757    let mut hasher = Sha256::new();
758    hasher.update(&body);
759    let sha256_hash: [u8; 32] = hasher.finalize().into();
760    let sha256_hex = hex::encode(sha256_hash);
761
762    // If auth has x tags, verify hash matches
763    if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
764        return Response::builder()
765            .status(StatusCode::FORBIDDEN)
766            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
767            .header(
768                "X-Reason",
769                "Uploaded blob hash does not match authorized hash",
770            )
771            .header(header::CONTENT_TYPE, "application/json")
772            .body(Body::from(r#"{"error":"Hash mismatch"}"#))
773            .unwrap();
774    }
775
776    // Convert pubkey hex to bytes
777    let pubkey_bytes = match from_hex(&auth.pubkey) {
778        Ok(b) => b,
779        Err(_) => {
780            return Response::builder()
781                .status(StatusCode::BAD_REQUEST)
782                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
783                .header("X-Reason", "Invalid pubkey format")
784                .body(Body::empty())
785                .unwrap();
786        }
787    };
788
789    let size = body.len() as u64;
790    let now = SystemTime::now()
791        .duration_since(UNIX_EPOCH)
792        .unwrap()
793        .as_secs();
794    let ext = mime_to_extension(&content_type_base(&content_type));
795    let descriptor = BlobDescriptor {
796        url: format!("/{}{}", sha256_hex, ext),
797        sha256: sha256_hex.clone(),
798        size,
799        mime_type: content_type,
800        uploaded: now,
801    };
802
803    if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
804        return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
805    }
806
807    // Store public-write blobs in cache storage unless the writer is explicitly
808    // allowed, so untrusted public uploads do not become protected owned data.
809    if state.optimistic_blossom_uploads {
810        let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
811        if queued_bytes <= state.optimistic_upload_queue_bytes {
812            let permits = queued_bytes as u32;
813            let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
814            if !marked_inflight {
815                return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
816            }
817            let permit = match acquire_optimistic_upload_queue(&state, permits).await {
818                Ok(permit) => permit,
819                Err(_) => {
820                    clear_optimistic_upload_inflight(&sha256_hex);
821                    return blossom_retryable_json_error(
822                        StatusCode::SERVICE_UNAVAILABLE,
823                        "Optimistic upload queue is full",
824                        2,
825                    );
826                }
827            };
828            let state_for_write = state.clone();
829            let hash_for_log = sha256_hex.clone();
830            tokio::spawn(async move {
831                let _permit = permit;
832                if let Err(error) = store_blossom_blob_without_blocking_runtime(
833                    &state_for_write,
834                    body,
835                    pubkey_bytes,
836                    is_allowed_author,
837                )
838                .await
839                {
840                    tracing::error!(
841                        "Background Blossom storage failed for {}: {:#}",
842                        hash_for_log,
843                        error
844                    );
845                }
846                clear_optimistic_upload_inflight(&hash_for_log);
847            });
848
849            return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
850        }
851
852        tracing::warn!(
853            "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
854            queued_bytes,
855            state.optimistic_upload_queue_bytes
856        );
857    }
858
859    match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
860        Ok(true) => {
861            if is_allowed_author {
862                if let Err(error) = set_existing_blob_owner_without_body_write(
863                    state.clone(),
864                    sha256_hash,
865                    pubkey_bytes,
866                )
867                .await
868                {
869                    return Response::builder()
870                        .status(StatusCode::INTERNAL_SERVER_ERROR)
871                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
872                        .header("X-Reason", "Storage error")
873                        .header(header::CONTENT_TYPE, "application/json")
874                        .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
875                        .unwrap();
876                }
877            }
878            return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
879        }
880        Ok(false) => {}
881        Err(error) => {
882            tracing::debug!(
883                "Could not preflight Blossom upload {} before synchronous storage: {}",
884                sha256_hex,
885                error
886            );
887        }
888    }
889
890    let store_result = store_blossom_blob_without_blocking_runtime(
891        &state,
892        body.clone(),
893        pubkey_bytes,
894        is_allowed_author,
895    )
896    .await;
897
898    match store_result {
899        Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
900        Err(e) => Response::builder()
901            .status(StatusCode::INTERNAL_SERVER_ERROR)
902            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
903            .header("X-Reason", "Storage error")
904            .header(header::CONTENT_TYPE, "application/json")
905            .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
906            .unwrap(),
907    }
908}
909
910/// POST /upload/batch - Upload multiple blobs with one auth event and one storage batch.
911pub async fn upload_blob_batch(
912    State(state): State<AppState>,
913    headers: HeaderMap,
914    Json(payload): Json<BatchUploadRequest>,
915) -> impl IntoResponse {
916    let started_at = Instant::now();
917    let slow_log_ms = slow_batch_upload_log_ms();
918    let payload_blobs = payload.blobs.len();
919    if payload.blobs.is_empty() {
920        return blossom_json_error(StatusCode::BAD_REQUEST, "Batch is empty");
921    }
922    if payload.blobs.len() > MAX_BATCH_UPLOAD_BLOBS {
923        return blossom_json_error(
924            StatusCode::PAYLOAD_TOO_LARGE,
925            "Batch contains too many blobs",
926        );
927    }
928
929    let auth = match verify_blossom_auth(&headers, "upload", None) {
930        Ok(a) => a,
931        Err((status, reason)) => {
932            return Response::builder()
933                .status(status)
934                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
935                .header("X-Reason", reason)
936                .header(header::CONTENT_TYPE, "application/json")
937                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
938                .unwrap();
939        }
940    };
941    let auth_ms = started_at.elapsed().as_millis();
942
943    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
944    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
945    if !can_upload {
946        let _ = check_write_access(&state, &auth.pubkey);
947        return blossom_json_error(StatusCode::FORBIDDEN, "Write access denied");
948    }
949
950    let pubkey_bytes = match from_hex(&auth.pubkey) {
951        Ok(bytes) => bytes,
952        Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid pubkey format"),
953    };
954
955    let now = SystemTime::now()
956        .duration_since(UNIX_EPOCH)
957        .unwrap()
958        .as_secs();
959    let mut total_bytes = 0usize;
960    let mut items = Vec::with_capacity(payload.blobs.len());
961    let mut descriptors = Vec::with_capacity(payload.blobs.len());
962    let mut decode_hash_ms = 0u128;
963    let mut validate_ms = 0u128;
964
965    for blob in payload.blobs {
966        let decode_started = Instant::now();
967        let sha256_hex = blob.sha256.to_lowercase();
968        let expected_hash: [u8; 32] = match from_hex(&sha256_hex) {
969            Ok(hash) => hash,
970            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob hash"),
971        };
972        if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
973            return blossom_json_error(
974                StatusCode::FORBIDDEN,
975                "Uploaded blob hash does not match authorized hash",
976            );
977        }
978
979        let data = match base64::engine::general_purpose::STANDARD.decode(blob.data.as_bytes()) {
980            Ok(data) => data,
981            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob data"),
982        };
983        if data.len() > state.max_upload_bytes {
984            return blossom_json_error(
985                StatusCode::PAYLOAD_TOO_LARGE,
986                "Blob exceeds maximum upload size",
987            );
988        }
989        total_bytes = total_bytes.saturating_add(data.len());
990        if total_bytes > MAX_BATCH_UPLOAD_BYTES {
991            return blossom_json_error(
992                StatusCode::PAYLOAD_TOO_LARGE,
993                "Batch exceeds maximum upload size",
994            );
995        }
996
997        let mut hasher = Sha256::new();
998        hasher.update(&data);
999        let actual_hash: [u8; 32] = hasher.finalize().into();
1000        if actual_hash != expected_hash {
1001            return blossom_json_error(StatusCode::FORBIDDEN, "Hash mismatch");
1002        }
1003        decode_hash_ms += decode_started.elapsed().as_millis();
1004
1005        let validate_started = Instant::now();
1006        let content_type = blob
1007            .content_type
1008            .as_deref()
1009            .map(content_type_base)
1010            .unwrap_or_else(|| "application/octet-stream".to_string());
1011        if let Err((status, reason)) = validate_upload_payload(
1012            &data,
1013            &content_type,
1014            can_upload,
1015            state.require_random_untrusted_ingest,
1016        ) {
1017            return blossom_json_error(status, reason);
1018        }
1019        validate_ms += validate_started.elapsed().as_millis();
1020
1021        let ext = mime_to_extension(&content_type);
1022        descriptors.push(BlobDescriptor {
1023            url: format!("/{}{}", sha256_hex, ext),
1024            sha256: sha256_hex,
1025            size: data.len() as u64,
1026            mime_type: content_type,
1027            uploaded: now,
1028        });
1029        items.push((actual_hash, data));
1030    }
1031    let prepare_ms = started_at.elapsed().as_millis();
1032
1033    let store = state.store.clone();
1034    let store_started = Instant::now();
1035    let stored = tokio::task::spawn_blocking(move || {
1036        if is_allowed_author {
1037            store.put_owned_blobs(&items, &pubkey_bytes)
1038        } else {
1039            store.put_cached_blobs(&items)
1040        }
1041    })
1042    .await
1043    .map_err(|error| anyhow::anyhow!("blob batch write task failed: {}", error));
1044    let store_ms = store_started.elapsed().as_millis();
1045    let total_ms = started_at.elapsed().as_millis();
1046
1047    match stored {
1048        Ok(Ok(uploaded)) => {
1049            if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1050                tracing::warn!(
1051                    blobs = payload_blobs,
1052                    uploaded,
1053                    total_bytes,
1054                    total_ms,
1055                    auth_ms,
1056                    prepare_ms,
1057                    decode_hash_ms,
1058                    validate_ms,
1059                    store_ms,
1060                    allowed_author = is_allowed_author,
1061                    "slow Blossom batch upload"
1062                );
1063            }
1064            Response::builder()
1065                .status(StatusCode::OK)
1066                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1067                .header(header::CONTENT_TYPE, "application/json")
1068                .body(Body::from(
1069                    serde_json::to_string(&BatchUploadResponse {
1070                        uploaded,
1071                        blobs: descriptors,
1072                    })
1073                    .unwrap(),
1074                ))
1075                .unwrap()
1076        }
1077        Ok(Err(error)) | Err(error) => Response::builder()
1078            .status(StatusCode::INTERNAL_SERVER_ERROR)
1079            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1080            .header("X-Reason", "Storage error")
1081            .header(header::CONTENT_TYPE, "application/json")
1082            .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
1083            .unwrap(),
1084    }
1085}
1086
1087/// DELETE /<sha256> - Delete a blob (BUD-02)
1088/// Note: Blob is only fully deleted when ALL owners have removed it
1089pub async fn delete_blob(
1090    State(state): State<AppState>,
1091    Path(id): Path<String>,
1092    headers: HeaderMap,
1093) -> impl IntoResponse {
1094    let (hash_part, _) = parse_hash_and_extension(&id);
1095
1096    if !is_valid_sha256(hash_part) {
1097        return Response::builder()
1098            .status(StatusCode::BAD_REQUEST)
1099            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1100            .header("X-Reason", "Invalid SHA256 hash")
1101            .body(Body::empty())
1102            .unwrap();
1103    }
1104
1105    let sha256_hex = hash_part.to_lowercase();
1106
1107    // Convert hash to bytes
1108    let sha256_bytes = match from_hex(&sha256_hex) {
1109        Ok(b) => b,
1110        Err(_) => {
1111            return Response::builder()
1112                .status(StatusCode::BAD_REQUEST)
1113                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1114                .header("X-Reason", "Invalid SHA256 hash format")
1115                .body(Body::empty())
1116                .unwrap();
1117        }
1118    };
1119
1120    // Verify authorization with hash requirement
1121    let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
1122        Ok(a) => a,
1123        Err((status, reason)) => {
1124            return Response::builder()
1125                .status(status)
1126                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1127                .header("X-Reason", reason)
1128                .body(Body::empty())
1129                .unwrap();
1130        }
1131    };
1132
1133    // Convert pubkey hex to bytes
1134    let pubkey_bytes = match from_hex(&auth.pubkey) {
1135        Ok(b) => b,
1136        Err(_) => {
1137            return Response::builder()
1138                .status(StatusCode::BAD_REQUEST)
1139                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1140                .header("X-Reason", "Invalid pubkey format")
1141                .body(Body::empty())
1142                .unwrap();
1143        }
1144    };
1145
1146    // Check ownership - user must be one of the owners (O(1) lookup with composite key)
1147    match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
1148        Ok(true) => {
1149            // User is an owner, proceed with delete
1150        }
1151        Ok(false) => {
1152            // Check if blob exists at all (for proper error message)
1153            match state.store.blob_has_owners(&sha256_bytes) {
1154                Ok(true) => {
1155                    return Response::builder()
1156                        .status(StatusCode::FORBIDDEN)
1157                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1158                        .header("X-Reason", "Not a blob owner")
1159                        .body(Body::empty())
1160                        .unwrap();
1161                }
1162                Ok(false) => {
1163                    return Response::builder()
1164                        .status(StatusCode::NOT_FOUND)
1165                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1166                        .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
1167                        .header("X-Reason", "Blob not found")
1168                        .body(Body::empty())
1169                        .unwrap();
1170                }
1171                Err(_) => {
1172                    return Response::builder()
1173                        .status(StatusCode::INTERNAL_SERVER_ERROR)
1174                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1175                        .body(Body::empty())
1176                        .unwrap();
1177                }
1178            }
1179        }
1180        Err(_) => {
1181            return Response::builder()
1182                .status(StatusCode::INTERNAL_SERVER_ERROR)
1183                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1184                .body(Body::empty())
1185                .unwrap();
1186        }
1187    }
1188
1189    // Remove this user's ownership (blob only deleted when no owners remain)
1190    match state
1191        .store
1192        .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
1193    {
1194        Ok(fully_deleted) => {
1195            // Return 200 OK whether blob was fully deleted or just removed from user's list
1196            // The client doesn't need to know if other owners still exist
1197            Response::builder()
1198                .status(StatusCode::OK)
1199                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1200                .header(
1201                    "X-Blob-Deleted",
1202                    if fully_deleted { "true" } else { "false" },
1203                )
1204                .body(Body::empty())
1205                .unwrap()
1206        }
1207        Err(_) => Response::builder()
1208            .status(StatusCode::INTERNAL_SERVER_ERROR)
1209            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1210            .body(Body::empty())
1211            .unwrap(),
1212    }
1213}
1214
1215/// GET /list/<pubkey> - List blobs for a pubkey (BUD-02)
1216pub async fn list_blobs(
1217    State(state): State<AppState>,
1218    Path(pubkey): Path<String>,
1219    Query(query): Query<ListQuery>,
1220    headers: HeaderMap,
1221) -> impl IntoResponse {
1222    // Validate pubkey format (64 hex chars)
1223    if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1224        return Response::builder()
1225            .status(StatusCode::BAD_REQUEST)
1226            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1227            .header("X-Reason", "Invalid pubkey format")
1228            .header(header::CONTENT_TYPE, "application/json")
1229            .body(Body::from("[]"))
1230            .unwrap();
1231    }
1232
1233    let pubkey_hex = pubkey.to_lowercase();
1234    let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1235        Ok(b) => b,
1236        Err(_) => {
1237            return Response::builder()
1238                .status(StatusCode::BAD_REQUEST)
1239                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1240                .header("X-Reason", "Invalid pubkey format")
1241                .header(header::CONTENT_TYPE, "application/json")
1242                .body(Body::from("[]"))
1243                .unwrap()
1244        }
1245    };
1246
1247    let auth = match verify_blossom_auth(&headers, "list", None) {
1248        Ok(auth) => auth,
1249        Err((status, reason)) => {
1250            return Response::builder()
1251                .status(status)
1252                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1253                .header("X-Reason", reason)
1254                .header(header::CONTENT_TYPE, "application/json")
1255                .body(Body::from("[]"))
1256                .unwrap();
1257        }
1258    };
1259
1260    if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1261        return Response::builder()
1262            .status(StatusCode::FORBIDDEN)
1263            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1264            .header("X-Reason", "Pubkey mismatch")
1265            .header(header::CONTENT_TYPE, "application/json")
1266            .body(Body::from("[]"))
1267            .unwrap();
1268    }
1269
1270    // Get blobs for this pubkey
1271    match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1272        Ok(blobs) => {
1273            // Apply filters
1274            let mut filtered: Vec<_> = blobs
1275                .into_iter()
1276                .filter(|b| {
1277                    if let Some(since) = query.since {
1278                        if b.uploaded < since {
1279                            return false;
1280                        }
1281                    }
1282                    if let Some(until) = query.until {
1283                        if b.uploaded > until {
1284                            return false;
1285                        }
1286                    }
1287                    true
1288                })
1289                .collect();
1290
1291            // Sort by uploaded descending (most recent first)
1292            filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1293
1294            // Apply limit
1295            let limit = query.limit.unwrap_or(100).min(1000);
1296            filtered.truncate(limit);
1297
1298            Response::builder()
1299                .status(StatusCode::OK)
1300                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1301                .header(header::CONTENT_TYPE, "application/json")
1302                .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1303                .unwrap()
1304        }
1305        Err(_) => Response::builder()
1306            .status(StatusCode::INTERNAL_SERVER_ERROR)
1307            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1308            .header(header::CONTENT_TYPE, "application/json")
1309            .body(Body::from("[]"))
1310            .unwrap(),
1311    }
1312}
1313
1314// Helper functions
1315
/// Splits a path id at its last `.` into the part before the dot and the
/// extension (dot included). Ids without a dot are returned whole with `None`.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    match id.rfind('.') {
        Some(dot_pos) => {
            let (hash, extension) = id.split_at(dot_pos);
            (hash, Some(extension))
        }
        None => (id, None),
    }
}
1323
/// Returns `true` when `s` is exactly 64 ASCII hex digits (either case).
fn is_valid_sha256(s: &str) -> bool {
    s.len() == 64 && s.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1327
/// Test-only helper: writes `data` as an owned blob for `pubkey` when
/// `track_ownership` is set, otherwise as a cached blob. The `_sha256`
/// argument is accepted but unused.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    if !track_ownership {
        state.store.put_cached_blob(data)?;
        return Ok(());
    }

    state.store.put_owned_blob(data, pubkey)?;
    Ok(())
}
1344
/// Maps a MIME type to the file extension (leading dot included) used in blob
/// URLs. Unrecognised types map to the empty string. Matching is exact and
/// case-sensitive.
fn mime_to_extension(mime: &str) -> &'static str {
    const KNOWN_EXTENSIONS: [(&str, &str); 13] = [
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    KNOWN_EXTENSIONS
        .iter()
        .find(|&&(known, _)| known == mime)
        .map_or("", |&(_, extension)| extension)
}
1363
1364#[cfg(test)]
1365mod tests {
1366    use super::*;
1367    use crate::server::auth::WsRelayState;
1368    use crate::storage::HashtreeStore;
1369    use axum::response::IntoResponse;
1370    use base64::Engine;
1371    use hashtree_core::sha256;
1372    use std::collections::HashSet;
1373    use std::sync::{Arc, Mutex as StdMutex};
1374    use std::time::Duration;
1375    use tempfile::TempDir;
1376
1377    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
1378        AppState {
1379            store,
1380            auth: None,
1381            daemon_started_at: 1_700_000_000,
1382            peer_mode: crate::config::ServerMode::Normal,
1383            hash_get_enabled: true,
1384            http_webrtc_fetch: true,
1385            webrtc_peers: None,
1386            fips_transport: None,
1387            fetch_from_fips_peers: true,
1388            ws_relay: Arc::new(WsRelayState::new()),
1389            max_upload_bytes: 5 * 1024 * 1024,
1390            public_writes: true,
1391            require_random_untrusted_ingest: true,
1392            optimistic_blossom_uploads: false,
1393            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
1394            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
1395            allowed_pubkeys: HashSet::new(),
1396            upstream_blossom: Vec::new(),
1397            social_graph: None,
1398            social_graph_store: None,
1399            social_graph_root: None,
1400            socialgraph_snapshot_public: false,
1401            nostr_relay: None,
1402            nostr_relay_urls: Vec::new(),
1403            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
1404            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
1405                std::collections::HashMap::new(),
1406            )),
1407            inflight_blob_reads: Arc::new(
1408                tokio::sync::Mutex::new(std::collections::HashMap::new()),
1409            ),
1410            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
1411            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1412            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1413            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1414            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1415        }
1416    }
1417
1418    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
1419        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};
1420
1421        let now = Timestamp::now();
1422        let event = EventBuilder::new(
1423            Kind::Custom(BLOSSOM_AUTH_KIND),
1424            "",
1425            vec![
1426                Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
1427                Tag::custom(
1428                    TagKind::Custom("expiration".into()),
1429                    vec![(now.as_u64() + 300).to_string()],
1430                ),
1431            ],
1432        )
1433        .custom_created_at(now)
1434        .to_event(keys)
1435        .expect("sign blossom auth");
1436        let json = serde_json::to_vec(&event).expect("serialize auth event");
1437        format!(
1438            "Nostr {}",
1439            base64::engine::general_purpose::STANDARD.encode(json)
1440        )
1441    }
1442
1443    #[test]
1444    fn test_is_valid_sha256() {
1445        assert!(is_valid_sha256(
1446            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1447        ));
1448        assert!(is_valid_sha256(
1449            "0000000000000000000000000000000000000000000000000000000000000000"
1450        ));
1451
1452        // Too short
1453        assert!(!is_valid_sha256("e2bab35b5296ec2242ded0a01f6d6723"));
1454        // Too long
1455        assert!(!is_valid_sha256(
1456            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa"
1457        ));
1458        // Invalid chars
1459        assert!(!is_valid_sha256(
1460            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1461        ));
1462        // Empty
1463        assert!(!is_valid_sha256(""));
1464    }
1465
1466    #[test]
1467    fn test_parse_hash_and_extension() {
1468        let (hash, ext) = parse_hash_and_extension("abc123.png");
1469        assert_eq!(hash, "abc123");
1470        assert_eq!(ext, Some(".png"));
1471
1472        let (hash2, ext2) = parse_hash_and_extension("abc123");
1473        assert_eq!(hash2, "abc123");
1474        assert_eq!(ext2, None);
1475
1476        let (hash3, ext3) = parse_hash_and_extension("abc.123.jpg");
1477        assert_eq!(hash3, "abc.123");
1478        assert_eq!(ext3, Some(".jpg"));
1479    }
1480
1481    #[test]
1482    fn test_mime_to_extension() {
1483        assert_eq!(mime_to_extension("image/png"), ".png");
1484        assert_eq!(mime_to_extension("image/jpeg"), ".jpg");
1485        assert_eq!(mime_to_extension("video/mp4"), ".mp4");
1486        assert_eq!(mime_to_extension("application/octet-stream"), "");
1487        assert_eq!(mime_to_extension("unknown/type"), "");
1488    }
1489
1490    #[tokio::test]
1491    async fn optimistic_uploads_return_accepted_and_store_in_background() {
1492        let temp_dir = TempDir::new().expect("temp dir");
1493        let store = Arc::new(
1494            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1495        );
1496        let mut state = test_app_state(Arc::clone(&store));
1497        state.optimistic_blossom_uploads = true;
1498
1499        let keys = nostr::Keys::generate();
1500        let mut headers = HeaderMap::new();
1501        headers.insert(
1502            header::AUTHORIZATION,
1503            create_upload_auth_header(&keys)
1504                .parse()
1505                .expect("auth header value"),
1506        );
1507        headers.insert(
1508            header::CONTENT_TYPE,
1509            "application/octet-stream"
1510                .parse()
1511                .expect("content type header value"),
1512        );
1513
1514        let body = axum::body::Bytes::from((0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>());
1515        let hash = sha256(&body);
1516        let response = upload_blob(State(state), headers, body)
1517            .await
1518            .into_response();
1519        assert_eq!(response.status(), StatusCode::ACCEPTED);
1520
1521        for _ in 0..50 {
1522            if store.blob_exists(&hash).expect("blob exists check") {
1523                return;
1524            }
1525            tokio::time::sleep(Duration::from_millis(10)).await;
1526        }
1527
1528        panic!("optimistic upload was not stored in the background");
1529    }
1530
1531    #[tokio::test]
1532    async fn optimistic_upload_existing_blob_skips_queue() {
1533        let temp_dir = TempDir::new().expect("temp dir");
1534        let store = Arc::new(
1535            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1536        );
1537        let body = axum::body::Bytes::from(
1538            (0u16..=255)
1539                .map(|value| ((value * 73 + 19) % 256) as u8)
1540                .collect::<Vec<_>>(),
1541        );
1542        store.put_cached_blob(&body).expect("seed blob");
1543
1544        let mut state = test_app_state(Arc::clone(&store));
1545        state.optimistic_blossom_uploads = true;
1546        state.optimistic_upload_queue_bytes = 1;
1547        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1548
1549        let keys = nostr::Keys::generate();
1550        let mut headers = HeaderMap::new();
1551        headers.insert(
1552            header::AUTHORIZATION,
1553            create_upload_auth_header(&keys)
1554                .parse()
1555                .expect("auth header value"),
1556        );
1557        headers.insert(
1558            header::CONTENT_TYPE,
1559            "application/octet-stream"
1560                .parse()
1561                .expect("content type header value"),
1562        );
1563
1564        let response = upload_blob(State(state), headers, body)
1565            .await
1566            .into_response();
1567        assert_eq!(response.status(), StatusCode::CONFLICT);
1568    }
1569
1570    #[tokio::test]
1571    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
1572        let temp_dir = TempDir::new().expect("temp dir");
1573        let store = Arc::new(
1574            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1575        );
1576        let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
1577        let hash_hex = hex::encode(sha256(&body));
1578        store.put_cached_blob(&body).expect("seed blob");
1579
1580        let mut state = test_app_state(store);
1581        state.optimistic_blossom_uploads = true;
1582
1583        let keys = nostr::Keys::generate();
1584        let mut headers = HeaderMap::new();
1585        headers.insert(
1586            header::AUTHORIZATION,
1587            create_upload_auth_header(&keys)
1588                .parse()
1589                .expect("auth header value"),
1590        );
1591        headers.insert(
1592            header::CONTENT_TYPE,
1593            "application/octet-stream"
1594                .parse()
1595                .expect("content type header value"),
1596        );
1597
1598        let response = upload_blob(State(state), headers, body)
1599            .await
1600            .into_response();
1601        assert_eq!(response.status(), StatusCode::ACCEPTED);
1602
1603        for _ in 0..50 {
1604            if !optimistic_upload_is_inflight(&hash_hex) {
1605                return;
1606            }
1607            tokio::time::sleep(Duration::from_millis(10)).await;
1608        }
1609
1610        clear_optimistic_upload_inflight(&hash_hex);
1611        panic!("optimistic upload in-flight marker was not cleared");
1612    }
1613
1614    #[tokio::test]
1615    async fn optimistic_upload_inflight_duplicate_skips_queue() {
1616        let temp_dir = TempDir::new().expect("temp dir");
1617        let store = Arc::new(
1618            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1619        );
1620        let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
1621        let hash_hex = hex::encode(sha256(&body));
1622        assert!(mark_optimistic_upload_inflight(&hash_hex));
1623
1624        let mut state = test_app_state(store);
1625        state.optimistic_blossom_uploads = true;
1626        state.optimistic_upload_queue_bytes = 1;
1627        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1628
1629        let keys = nostr::Keys::generate();
1630        let mut headers = HeaderMap::new();
1631        headers.insert(
1632            header::AUTHORIZATION,
1633            create_upload_auth_header(&keys)
1634                .parse()
1635                .expect("auth header value"),
1636        );
1637        headers.insert(
1638            header::CONTENT_TYPE,
1639            "application/octet-stream"
1640                .parse()
1641                .expect("content type header value"),
1642        );
1643
1644        let response = upload_blob(State(state), headers, body)
1645            .await
1646            .into_response();
1647        clear_optimistic_upload_inflight(&hash_hex);
1648        assert_eq!(response.status(), StatusCode::ACCEPTED);
1649    }
1650
1651    #[test]
1652    fn public_writes_accept_unlisted_authors_for_uploads() {
1653        let temp_dir = TempDir::new().expect("temp dir");
1654        let store =
1655            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1656        let mut state = test_app_state(store);
1657        let pubkey = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";
1658
1659        state.public_writes = true;
1660        assert!(can_accept_upload_author(&state, pubkey));
1661        assert!(!is_allowed_write_author(&state, pubkey));
1662
1663        state.public_writes = false;
1664        assert!(!can_accept_upload_author(&state, pubkey));
1665    }
1666
/// Pins three `validate_upload_payload` outcomes: an all-byte-values block sent as
/// `application/octet-stream` passes, `audio/mpeg` passes when the third argument is
/// `true`, and `audio/mpeg` is rejected with 403 when it is `false`.
///
/// NOTE(review): the third argument looks like a "has write access" flag, judging by
/// the rejection message — confirm against the function's signature.
#[test]
fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
    // One of every possible byte value, 0 through 255.
    let encrypted_block = (u8::MIN..=u8::MAX).collect::<Vec<u8>>();

    let octet_stream =
        validate_upload_payload(&encrypted_block, "application/octet-stream", false, true);
    assert_eq!(octet_stream, Ok(()));

    let media_with_flag = validate_upload_payload(b"audio bytes", "audio/mpeg", true, true);
    assert_eq!(media_with_flag, Ok(()));

    let media_without_flag = validate_upload_payload(b"audio bytes", "audio/mpeg", false, true);
    let expected_rejection = (
        StatusCode::FORBIDDEN,
        "Raw media uploads require write access".to_string(),
    );
    assert_eq!(media_without_flag, Err(expected_rejection));
}
1689
/// Feeds `validate_upload_payload` a 256-byte block holding only 139 distinct byte
/// values. With the third argument `true` the block is accepted; with `false` it is
/// rejected as 415 with the "Data not encrypted" message quoting 139 against a minimum of 140.
#[test]
fn authenticated_chk_uploads_skip_entropy_heuristic() {
    // Bytes 0..=138 repeated until 256 bytes are produced: exactly 139 unique values.
    let low_unique_block = (0u8..139).cycle().take(256).collect::<Vec<u8>>();

    let accepted =
        validate_upload_payload(&low_unique_block, "application/octet-stream", true, true);
    assert_eq!(accepted, Ok(()));

    let rejected =
        validate_upload_payload(&low_unique_block, "application/octet-stream", false, true);
    let expected_rejection = (
        StatusCode::UNSUPPORTED_MEDIA_TYPE,
        "Data not encrypted. Unique: 139 (min: 140)".to_string(),
    );
    assert_eq!(rejected, Err(expected_rejection));
}
1707
/// Stores one owned and one unowned 280-byte blob through `store_blossom_blob` in a
/// store built with a limit argument of 700, then adds a third 280-byte blob via
/// `put_cached_blob`.
///
/// Afterwards the owned blob and the cached blob must exist, the unowned upload must
/// be gone with no owners recorded, and the owned blob must still list its owner.
#[test]
fn unowned_public_uploads_use_cache_storage_semantics() {
    let scratch = TempDir::new().expect("temp dir");
    let backing = HashtreeStore::with_options(scratch.path(), None, 700).expect("store");
    let state = test_app_state(Arc::new(backing));

    let owner_key = [2u8; 32];
    let public_key = [4u8; 32];

    // Upload with the trailing flag set to `true` (the owned case).
    let owned = vec![1u8; 280];
    let owned_hash = sha256(&owned);
    store_blossom_blob(&state, &owned, &owned_hash, &owner_key, true).expect("owned upload");

    // Upload with the trailing flag set to `false` (the unowned/public case).
    let public_upload = vec![3u8; 280];
    let public_hash = sha256(&public_upload);
    store_blossom_blob(&state, &public_upload, &public_hash, &public_key, false)
        .expect("public upload");

    // A third blob written straight to the cache path; the test expects it to displace
    // the unowned upload.
    let replacement = vec![5u8; 280];
    let replacement_hash = sha256(&replacement);
    state
        .store
        .put_cached_blob(&replacement)
        .expect("replacement cached blob");

    let blobs = &state.store;

    let owned_present = blobs.blob_exists(&owned_hash).expect("owned exists");
    assert!(owned_present);

    let public_present = blobs
        .blob_exists(&public_hash)
        .expect("public upload evicted");
    assert!(!public_present);

    let replacement_present = blobs
        .blob_exists(&replacement_hash)
        .expect("replacement exists");
    assert!(replacement_present);

    let owner_recorded = blobs
        .is_blob_owner(&owned_hash, &owner_key)
        .expect("owned tracked");
    assert!(owner_recorded);

    let public_has_owners = blobs
        .blob_has_owners(&public_hash)
        .expect("public upload unowned");
    assert!(!public_has_owners);
}
1749
/// Uploads two owned 300-byte blobs into a store built with a limit argument of 500.
///
/// The first upload must succeed; the second must fail with an error whose text
/// contains "storage limit", leaving the first blob and its owner record intact and
/// no trace of the second blob or its ownership.
#[test]
fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
    let scratch = TempDir::new().expect("temp dir");
    let backing = HashtreeStore::with_options(scratch.path(), None, 500).expect("store");
    let state = test_app_state(Arc::new(backing));
    let owner = [2u8; 32];

    let first = vec![1u8; 300];
    let first_hash = sha256(&first);
    store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");

    let second = vec![3u8; 300];
    let second_hash = sha256(&second);
    let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
        .expect_err("second owned upload should exceed the storage limit");

    let rendered = error.to_string();
    assert!(
        rendered.contains("storage limit"),
        "unexpected error: {error}"
    );

    let blobs = &state.store;

    let first_present = blobs.blob_exists(&first_hash).expect("first blob remains");
    assert!(first_present);

    let second_present = blobs
        .blob_exists(&second_hash)
        .expect("second blob rejected");
    assert!(!second_present);

    let first_owned = blobs
        .is_blob_owner(&first_hash, &owner)
        .expect("first owner tracked");
    assert!(first_owned);

    let second_owned = blobs
        .is_blob_owner(&second_hash, &owner)
        .expect("second owner not tracked");
    assert!(!second_owned);
}
1788}