Skip to main content

hashtree_cli/server/
blossom.rs

1//! Blossom protocol implementation (BUD-01, BUD-02)
2//!
3//! Implements blob storage endpoints with Nostr-based authentication.
4//! See: https://github.com/hzrd149/blossom
5
6use axum::{
7    body::Body,
8    extract::{Path, Query, State},
9    http::{header, HeaderMap, Response, StatusCode},
10    response::IntoResponse,
11    Json,
12};
13use base64::Engine;
14use hashtree_core::from_hex;
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use std::collections::HashSet;
18use std::sync::{Mutex, OnceLock};
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
21use super::auth::AppState;
22use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
23use super::ingest_filter::{
24    content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
25};
26use super::mime::get_mime_type;
27
/// Blossom authorization event kind (NIP-98 style)
const BLOSSOM_AUTH_KIND: u16 = 24242;

/// Cache-Control header for immutable content-addressed data (1 year)
const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
/// Cache-Control value that forbids caching; `head_blob` sends it with its
/// 503 "read busy" reply.
const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
/// Cache-Control value `head_blob` sends with 404 replies: no browser caching
/// (`max-age=0`) and a 5 second shared-cache lifetime (`s-maxage=5`).
const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
/// Environment variable that overrides the optimistic upload queue wait, in milliseconds.
const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
/// Queue wait (15 seconds) used when the environment variable is unset,
/// unparsable, or zero.
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;

/// Default maximum upload size in bytes (5 MB)
pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
/// Minimum number of queue bytes (256 KiB) charged to an optimistic upload,
/// even when its body is smaller.
const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
/// Maximum number of blobs accepted in one batch upload request.
const MAX_BATCH_UPLOAD_BLOBS: usize = 1024;
/// 64 MiB. NOTE(review): by its name a total byte cap for one batch upload;
/// its use is outside the visible part of this file — confirm.
const MAX_BATCH_UPLOAD_BYTES: usize = 64 * 1024 * 1024;
43
/// Point-in-time view of the optimistic upload queue, as produced by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Whether optimistic (background) Blossom uploads are enabled.
    pub enabled: bool,
    /// Configured queue budget in bytes.
    pub max_bytes: usize,
    /// Currently available queue permits, capped at `max_bytes`.
    pub available_bytes: usize,
    /// `max_bytes` minus `available_bytes` (saturating).
    pub reserved_bytes: usize,
    /// Number of blob hashes currently tracked as in-flight uploads.
    pub in_flight: usize,
    /// Effective queue wait timeout in milliseconds.
    pub queue_timeout_ms: u64,
}
53
54/// Check if a pubkey has write access based on allowed_npubs config or social graph
55/// Returns Ok(()) if allowed, Err with JSON error body if denied
56#[allow(clippy::result_large_err)]
57fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
58    // Check if pubkey is in the allowed list (converted from npub to hex)
59    if is_allowed_write_author(state, pubkey) {
60        tracing::debug!(
61            "Blossom write allowed for {}... (allowed writer)",
62            &pubkey[..8.min(pubkey.len())]
63        );
64        return Ok(());
65    }
66
67    // Not in allowed list or social graph
68    tracing::info!(
69        "Blossom write denied for {}... (not in allowed_npubs or social graph)",
70        &pubkey[..8.min(pubkey.len())]
71    );
72    Err(Response::builder()
73        .status(StatusCode::FORBIDDEN)
74        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
75        .header(header::CONTENT_TYPE, "application/json")
76        .body(Body::from(
77            r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
78        ))
79        .unwrap())
80}
81
82fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
83    if state.allowed_pubkeys.contains(pubkey) {
84        return true;
85    }
86
87    state
88        .social_graph
89        .as_ref()
90        .map(|sg| sg.check_write_access(pubkey))
91        .unwrap_or(false)
92}
93
94fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
95    state.public_writes || is_allowed_write_author(state, pubkey)
96}
97
98fn validate_upload_payload(
99    body: &[u8],
100    content_type: &str,
101    is_allowed_writer: bool,
102    require_random_untrusted_ingest: bool,
103) -> Result<(), (StatusCode, String)> {
104    let is_chk_upload = is_chk_content_type(content_type);
105
106    if !is_chk_upload && !is_allowed_writer {
107        return Err((
108            StatusCode::FORBIDDEN,
109            "Raw media uploads require write access".to_string(),
110        ));
111    }
112
113    if is_chk_upload {
114        validate_untrusted_blob(body, require_random_untrusted_ingest)
115            .map_err(|IngestRejection { status, reason }| (status, reason))?;
116    }
117
118    Ok(())
119}
120
121fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
122    let reason = reason.into();
123    Response::builder()
124        .status(status)
125        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
126        .header("X-Reason", reason.as_str())
127        .header(header::CONTENT_TYPE, "application/json")
128        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
129        .unwrap()
130}
131
132fn blossom_retryable_json_error(
133    status: StatusCode,
134    reason: impl Into<String>,
135    retry_after_seconds: u64,
136) -> Response<Body> {
137    let reason = reason.into();
138    Response::builder()
139        .status(status)
140        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
141        .header("X-Reason", reason.as_str())
142        .header(header::RETRY_AFTER, retry_after_seconds.to_string())
143        .header(header::CONTENT_TYPE, "application/json")
144        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
145        .unwrap()
146}
147
/// Blob descriptor returned by upload and list endpoints
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Location of the blob; `upload_blob` fills it with `/<sha256><extension>`.
    pub url: String,
    /// Hex-encoded SHA-256 of the blob contents.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Upload time as Unix seconds.
    pub uploaded: u64,
}
158
/// One blob entry inside a batch upload request.
#[derive(Debug, Deserialize)]
pub struct BatchUploadBlob {
    /// SHA-256 the client declares for this blob.
    /// NOTE(review): presumably hex-encoded — confirm against the batch handler.
    pub sha256: String,
    /// Optional content type; also accepted under the JSON key `contentType`.
    #[serde(default, alias = "contentType")]
    pub content_type: Option<String>,
    /// Blob payload carried as a string.
    /// NOTE(review): the encoding (likely base64) is not visible here — confirm.
    pub data: String,
}
166
/// JSON body of a batch upload request.
#[derive(Debug, Deserialize)]
pub struct BatchUploadRequest {
    /// Blobs to upload; the batch handler rejects an empty list and lists
    /// longer than `MAX_BATCH_UPLOAD_BLOBS`.
    pub blobs: Vec<BatchUploadBlob>,
}
171
/// JSON body returned for a batch upload.
#[derive(Debug, Serialize)]
pub struct BatchUploadResponse {
    /// Count of uploaded blobs.
    /// NOTE(review): whether already-present blobs are counted is not visible here — confirm.
    pub uploaded: usize,
    /// Descriptors for the blobs in this batch.
    pub blobs: Vec<BlobDescriptor>,
}
177
/// Query parameters for list endpoint
///
/// All parameters are optional. NOTE(review): the list handler is outside the
/// visible part of this file, so the semantics below are inferred from the
/// field names — confirm.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Presumably a lower bound on upload time (Unix seconds).
    pub since: Option<u64>,
    /// Presumably an upper bound on upload time (Unix seconds).
    pub until: Option<u64>,
    /// Presumably the maximum number of entries to return.
    pub limit: Option<usize>,
    /// Presumably an opaque pagination cursor.
    pub cursor: Option<String>,
}
186
/// Parsed Nostr authorization event
///
/// Produced by `verify_blossom_auth` once the event's kind, signature and
/// tags have been checked.
#[derive(Debug)]
pub struct BlossomAuth {
    /// The event's `pubkey` field, as sent by the client.
    pub pubkey: String,
    /// The event kind.
    pub kind: u16,
    /// The event's `created_at` field (Unix seconds).
    pub created_at: u64,
    /// Value of the `expiration` tag parsed as Unix seconds, when present.
    pub expiration: Option<u64>,
    /// Value of the `t` tag: "upload", "delete", "list", "get".
    pub action: Option<String>,
    /// Values of all `x` tags, lowercased.
    pub blob_hashes: Vec<String>,
    /// Value of the `server` tag, when present.
    pub server: Option<String>,
}
198
199/// Parse and verify Nostr authorization from header
200/// Returns the verified auth or an error response
201pub fn verify_blossom_auth(
202    headers: &HeaderMap,
203    required_action: &str,
204    required_hash: Option<&str>,
205) -> Result<BlossomAuth, (StatusCode, &'static str)> {
206    let auth_header = headers
207        .get(header::AUTHORIZATION)
208        .and_then(|v| v.to_str().ok())
209        .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
210
211    let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
212        StatusCode::UNAUTHORIZED,
213        "Invalid auth scheme, expected 'Nostr'",
214    ))?;
215
216    // Decode base64 event
217    let engine = base64::engine::general_purpose::STANDARD;
218    let event_bytes = engine
219        .decode(nostr_event)
220        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
221
222    let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
223        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
224
225    // Extract event fields
226    let kind = event_json["kind"]
227        .as_u64()
228        .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
229
230    if kind != BLOSSOM_AUTH_KIND as u64 {
231        return Err((
232            StatusCode::BAD_REQUEST,
233            "Invalid event kind, expected 24242",
234        ));
235    }
236
237    let pubkey = event_json["pubkey"]
238        .as_str()
239        .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
240        .to_string();
241
242    let created_at = event_json["created_at"]
243        .as_u64()
244        .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
245
246    let sig = event_json["sig"]
247        .as_str()
248        .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
249
250    // Verify signature
251    if !verify_nostr_signature(&event_json, &pubkey, sig) {
252        return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
253    }
254
255    // Parse tags
256    let tags = event_json["tags"]
257        .as_array()
258        .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
259
260    let mut expiration: Option<u64> = None;
261    let mut action: Option<String> = None;
262    let mut blob_hashes: Vec<String> = Vec::new();
263    let mut server: Option<String> = None;
264
265    for tag in tags {
266        let tag_arr = tag.as_array();
267        if let Some(arr) = tag_arr {
268            if arr.len() >= 2 {
269                let tag_name = arr[0].as_str().unwrap_or("");
270                let tag_value = arr[1].as_str().unwrap_or("");
271
272                match tag_name {
273                    "t" => action = Some(tag_value.to_string()),
274                    "x" => blob_hashes.push(tag_value.to_lowercase()),
275                    "expiration" => expiration = tag_value.parse().ok(),
276                    "server" => server = Some(tag_value.to_string()),
277                    _ => {}
278                }
279            }
280        }
281    }
282
283    // Validate expiration
284    let now = SystemTime::now()
285        .duration_since(UNIX_EPOCH)
286        .unwrap()
287        .as_secs();
288
289    if let Some(exp) = expiration {
290        if exp < now {
291            return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
292        }
293    }
294
295    // Validate created_at is not in the future (with 60s tolerance)
296    if created_at > now + 60 {
297        return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
298    }
299
300    // Validate action matches
301    if let Some(ref act) = action {
302        if act != required_action {
303            return Err((StatusCode::FORBIDDEN, "Action mismatch"));
304        }
305    } else {
306        return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
307    }
308
309    // Validate hash if required
310    if let Some(hash) = required_hash {
311        if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
312            return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
313        }
314    }
315
316    Ok(BlossomAuth {
317        pubkey,
318        kind: kind as u16,
319        created_at,
320        expiration,
321        action,
322        blob_hashes,
323        server,
324    })
325}
326
327/// Verify Nostr event signature using secp256k1
328fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
329    use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
330
331    // Compute event ID (sha256 of serialized event)
332    let content = event["content"].as_str().unwrap_or("");
333    let full_serialized = format!(
334        "[0,\"{}\",{},{},{},\"{}\"]",
335        pubkey,
336        event["created_at"],
337        event["kind"],
338        event["tags"],
339        escape_json_string(content),
340    );
341
342    let mut hasher = Sha256::new();
343    hasher.update(full_serialized.as_bytes());
344    let event_id = hasher.finalize();
345
346    // Parse pubkey and signature
347    let pubkey_bytes = match hex::decode(pubkey) {
348        Ok(b) => b,
349        Err(_) => return false,
350    };
351
352    let sig_bytes = match hex::decode(sig) {
353        Ok(b) => b,
354        Err(_) => return false,
355    };
356
357    let secp = Secp256k1::verification_only();
358
359    let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
360        Ok(pk) => pk,
361        Err(_) => return false,
362    };
363
364    let signature = match Signature::from_slice(&sig_bytes) {
365        Ok(s) => s,
366        Err(_) => return false,
367    };
368
369    let message = match Message::from_digest_slice(&event_id) {
370        Ok(m) => m,
371        Err(_) => return false,
372    };
373
374    secp.verify_schnorr(&signature, &message, &xonly_pubkey)
375        .is_ok()
376}
377
/// Escape a string for the canonical JSON serialization used to hash Nostr events.
///
/// Follows the same rules as standard compact JSON encoders: `"` and `\` are
/// backslash-escaped; line feed, carriage return, tab, backspace and form feed
/// use their short forms (`\n`, `\r`, `\t`, `\b`, `\f`); the remaining
/// characters below U+0020 become `\u00xx` with lowercase hex. Every other
/// character, including DEL and the C1 control range, is copied verbatim.
/// Escaping those differently would change the hash, so signatures over such
/// content would fail to verify.
fn escape_json_string(s: &str) -> String {
    use std::fmt::Write as _;

    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            c if (c as u32) < 0x20 => {
                // Writing into a String cannot fail.
                let _ = write!(escaped, "\\u{:04x}", c as u32);
            }
            c => escaped.push(c),
        }
    }
    escaped
}
396
397/// CORS preflight handler for all Blossom endpoints
398/// Echoes back Access-Control-Request-Headers to allow any headers
399pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
400    // Echo back requested headers, or use sensible defaults that cover common Blossom headers
401    let allowed_headers = headers
402        .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
403        .and_then(|v| v.to_str().ok())
404        .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
405
406    // Always include common headers in addition to what was requested
407    let full_allowed = format!(
408        "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
409        allowed_headers
410    );
411
412    Response::builder()
413        .status(StatusCode::NO_CONTENT)
414        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
415        .header(
416            header::ACCESS_CONTROL_ALLOW_METHODS,
417            "GET, HEAD, PUT, DELETE, OPTIONS",
418        )
419        .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
420        .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
421        .body(Body::empty())
422        .unwrap()
423}
424
425/// HEAD /<sha256> - Check if blob exists
426pub async fn head_blob(
427    State(state): State<AppState>,
428    Path(id): Path<String>,
429    connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
430) -> impl IntoResponse {
431    let is_localhost = connect_info.0.ip().is_loopback();
432    let (hash_part, ext) = parse_hash_and_extension(&id);
433
434    if !is_valid_sha256(hash_part) {
435        return Response::builder()
436            .status(StatusCode::BAD_REQUEST)
437            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
438            .header("X-Reason", "Invalid SHA256 hash")
439            .body(Body::empty())
440            .unwrap();
441    }
442
443    let sha256_hex = hash_part.to_lowercase();
444    let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
445        Ok(b) => b,
446        Err(_) => {
447            return Response::builder()
448                .status(StatusCode::BAD_REQUEST)
449                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
450                .header("X-Reason", "Invalid SHA256 format")
451                .body(Body::empty())
452                .unwrap()
453        }
454    };
455
456    // Blossom HEAD only needs metadata; avoid reading the full blob body just to
457    // answer cache probes and CDN revalidation. The read permit keeps CDN probe
458    // storms from filling Tokio's blocking thread pool while old blobs without
459    // metadata are still being normalized.
460    let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
461        Ok(Ok(cached))
462    } else {
463        let permit = match acquire_blob_read().await {
464            Ok(permit) => permit,
465            Err(_) => {
466                return Response::builder()
467                    .status(StatusCode::SERVICE_UNAVAILABLE)
468                    .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
469                    .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
470                    .header("Retry-After", "1")
471                    .header("X-Reason", BLOB_READ_BUSY)
472                    .body(Body::empty())
473                    .unwrap()
474            }
475        };
476        let store = state.store.clone();
477        let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
478        let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
479        drop(permit);
480        let result = match timed {
481            Ok(result) => result.map_err(|_| ()),
482            Err(_) => Err(()),
483        };
484        if let Ok(Ok(size)) = result {
485            state.blob_cache.put_size(sha256_hex.clone(), size);
486        }
487        result
488    };
489
490    match blob_size {
491        Ok(Ok(Some(size))) => {
492            let mime_type = ext
493                .map(|e| get_mime_type(&format!("file{}", e)))
494                .unwrap_or("application/octet-stream");
495
496            let mut builder = Response::builder()
497                .status(StatusCode::OK)
498                .header(header::CONTENT_TYPE, mime_type)
499                .header(header::CONTENT_LENGTH, size)
500                .header(header::ACCEPT_RANGES, "bytes")
501                .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
502                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
503            if is_localhost {
504                builder = builder.header("X-Source", "local");
505            }
506            builder.body(Body::empty()).unwrap()
507        }
508        Ok(Ok(None)) => Response::builder()
509            .status(StatusCode::NOT_FOUND)
510            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
511            .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
512            .header("X-Reason", "Blob not found")
513            .body(Body::empty())
514            .unwrap(),
515        _ => Response::builder()
516            .status(StatusCode::INTERNAL_SERVER_ERROR)
517            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
518            .body(Body::empty())
519            .unwrap(),
520    }
521}
522
523async fn store_blossom_blob_without_blocking_runtime(
524    state: &AppState,
525    data: axum::body::Bytes,
526    pubkey: [u8; 32],
527    track_ownership: bool,
528) -> anyhow::Result<()> {
529    let mut hasher = Sha256::new();
530    hasher.update(&data);
531    let hash_hex = hex::encode(hasher.finalize());
532    let data_for_cache = data.clone();
533    let permit = acquire_blob_write()
534        .await
535        .map_err(|err| anyhow::anyhow!(err))?;
536    let store = state.store.clone();
537    tokio::task::spawn_blocking(move || {
538        let _permit = permit;
539        if track_ownership {
540            store.put_owned_blob(&data, &pubkey)?;
541        } else {
542            store.put_cached_blob(&data)?;
543        }
544        Ok::<(), anyhow::Error>(())
545    })
546    .await
547    .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
548    state
549        .blob_cache
550        .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
551    state.blob_cache.put_body(hash_hex, &data_for_cache);
552    Ok(())
553}
554
555fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
556    Response::builder()
557        .status(status)
558        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
559        .header(header::CONTENT_TYPE, "application/json")
560        .body(Body::from(serde_json::to_string(descriptor).unwrap()))
561        .unwrap()
562}
563
564fn optimistic_upload_queue_timeout() -> Duration {
565    let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
566        .ok()
567        .and_then(|value| value.parse::<u64>().ok())
568        .filter(|value| *value > 0)
569        .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
570    Duration::from_millis(millis)
571}
572
/// Process-wide set of blob hashes whose optimistic upload is still being
/// written in the background. Created empty on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static IN_FLIGHT_HASHES: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    IN_FLIGHT_HASHES.get_or_init(Mutex::default)
}
577
578fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
579    optimistic_upload_inflight()
580        .lock()
581        .is_ok_and(|inflight| inflight.contains(hash_hex))
582}
583
584fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
585    optimistic_upload_inflight()
586        .lock()
587        .map(|mut inflight| inflight.insert(hash_hex.to_string()))
588        .unwrap_or(true)
589}
590
591fn clear_optimistic_upload_inflight(hash_hex: &str) {
592    if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
593        inflight.remove(hash_hex);
594    }
595}
596
597pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
598    let max_bytes = state.optimistic_upload_queue_bytes;
599    let available_bytes = state
600        .optimistic_upload_queue
601        .available_permits()
602        .min(max_bytes);
603    let in_flight = optimistic_upload_inflight()
604        .lock()
605        .map(|inflight| inflight.len())
606        .unwrap_or(0);
607
608    OptimisticUploadQueueSnapshot {
609        enabled: state.optimistic_blossom_uploads,
610        max_bytes,
611        available_bytes,
612        reserved_bytes: max_bytes.saturating_sub(available_bytes),
613        in_flight,
614        queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
615    }
616}
617
/// Convert a duration to whole milliseconds, saturating at `u64::MAX`.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
621
622async fn acquire_optimistic_upload_queue(
623    state: &AppState,
624    permits: u32,
625) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
626    match tokio::time::timeout(
627        optimistic_upload_queue_timeout(),
628        state
629            .optimistic_upload_queue
630            .clone()
631            .acquire_many_owned(permits),
632    )
633    .await
634    {
635        Ok(Ok(permit)) => Ok(permit),
636        Ok(Err(_)) => Err("Optimistic upload queue is closed"),
637        Err(_) => Err("Optimistic upload queue is full"),
638    }
639}
640
641async fn uploaded_blob_already_exists(
642    state: &AppState,
643    sha256_hash: [u8; 32],
644    sha256_hex: &str,
645) -> Result<bool, String> {
646    if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
647        return Ok(true);
648    }
649
650    let permit = acquire_blob_read().await.map_err(str::to_string)?;
651    let store = state.store.clone();
652    let size_read = tokio::task::spawn_blocking(move || {
653        store
654            .blob_size(&sha256_hash)
655            .map_err(|error| error.to_string())
656    });
657
658    let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
659    drop(permit);
660    match result {
661        Ok(Ok(Ok(size))) => {
662            state.blob_cache.put_size(sha256_hex.to_string(), size);
663            Ok(size.is_some())
664        }
665        Ok(Ok(Err(error))) => Err(error),
666        Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
667        Err(_) => Err("blob existence check timed out".to_string()),
668    }
669}
670
671async fn set_existing_blob_owner_without_body_write(
672    state: AppState,
673    sha256_hash: [u8; 32],
674    pubkey: [u8; 32],
675) -> anyhow::Result<()> {
676    tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
677        .await
678        .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
679    Ok(())
680}
681
682/// PUT /upload - Upload a new blob (BUD-02)
683pub async fn upload_blob(
684    State(state): State<AppState>,
685    headers: HeaderMap,
686    body: axum::body::Bytes,
687) -> impl IntoResponse {
688    // Check size limit first (before auth to save resources)
689    let max_size = state.max_upload_bytes;
690    if body.len() > max_size {
691        return Response::builder()
692            .status(StatusCode::PAYLOAD_TOO_LARGE)
693            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
694            .header(header::CONTENT_TYPE, "application/json")
695            .body(Body::from(format!(
696                r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
697                body.len(),
698                max_size,
699                max_size / 1024 / 1024
700            )))
701            .unwrap();
702    }
703
704    // Verify authorization
705    let auth = match verify_blossom_auth(&headers, "upload", None) {
706        Ok(a) => a,
707        Err((status, reason)) => {
708            return Response::builder()
709                .status(status)
710                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
711                .header("X-Reason", reason)
712                .header(header::CONTENT_TYPE, "application/json")
713                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
714                .unwrap();
715        }
716    };
717
718    // Get content type from header
719    let content_type = headers
720        .get(header::CONTENT_TYPE)
721        .and_then(|v| v.to_str().ok())
722        .unwrap_or("application/octet-stream")
723        .to_string();
724
725    // Check write access: either in allowed_npubs/social graph OR public_writes is enabled.
726    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
727    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
728    if !can_upload {
729        let _ = check_write_access(&state, &auth.pubkey);
730        return Response::builder()
731            .status(StatusCode::FORBIDDEN)
732            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
733            .header(header::CONTENT_TYPE, "application/json")
734            .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
735            .unwrap();
736    }
737
738    if let Err((status, reason)) = validate_upload_payload(
739        &body,
740        &content_type,
741        can_upload,
742        state.require_random_untrusted_ingest,
743    ) {
744        return blossom_json_error(status, reason);
745    }
746
747    // Compute SHA256 of uploaded data
748    let mut hasher = Sha256::new();
749    hasher.update(&body);
750    let sha256_hash: [u8; 32] = hasher.finalize().into();
751    let sha256_hex = hex::encode(sha256_hash);
752
753    // If auth has x tags, verify hash matches
754    if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
755        return Response::builder()
756            .status(StatusCode::FORBIDDEN)
757            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
758            .header(
759                "X-Reason",
760                "Uploaded blob hash does not match authorized hash",
761            )
762            .header(header::CONTENT_TYPE, "application/json")
763            .body(Body::from(r#"{"error":"Hash mismatch"}"#))
764            .unwrap();
765    }
766
767    // Convert pubkey hex to bytes
768    let pubkey_bytes = match from_hex(&auth.pubkey) {
769        Ok(b) => b,
770        Err(_) => {
771            return Response::builder()
772                .status(StatusCode::BAD_REQUEST)
773                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
774                .header("X-Reason", "Invalid pubkey format")
775                .body(Body::empty())
776                .unwrap();
777        }
778    };
779
780    let size = body.len() as u64;
781    let now = SystemTime::now()
782        .duration_since(UNIX_EPOCH)
783        .unwrap()
784        .as_secs();
785    let ext = mime_to_extension(&content_type_base(&content_type));
786    let descriptor = BlobDescriptor {
787        url: format!("/{}{}", sha256_hex, ext),
788        sha256: sha256_hex.clone(),
789        size,
790        mime_type: content_type,
791        uploaded: now,
792    };
793
794    if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
795        return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
796    }
797
798    // Store public-write blobs in cache storage unless the writer is explicitly
799    // allowed, so untrusted public uploads do not become protected owned data.
800    if state.optimistic_blossom_uploads {
801        let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
802        if queued_bytes <= state.optimistic_upload_queue_bytes {
803            let permits = queued_bytes as u32;
804            let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
805            if !marked_inflight {
806                return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
807            }
808            let permit = match acquire_optimistic_upload_queue(&state, permits).await {
809                Ok(permit) => permit,
810                Err(_) => {
811                    clear_optimistic_upload_inflight(&sha256_hex);
812                    return blossom_retryable_json_error(
813                        StatusCode::SERVICE_UNAVAILABLE,
814                        "Optimistic upload queue is full",
815                        2,
816                    );
817                }
818            };
819            let state_for_write = state.clone();
820            let hash_for_log = sha256_hex.clone();
821            tokio::spawn(async move {
822                let _permit = permit;
823                if let Err(error) = store_blossom_blob_without_blocking_runtime(
824                    &state_for_write,
825                    body,
826                    pubkey_bytes,
827                    is_allowed_author,
828                )
829                .await
830                {
831                    tracing::error!(
832                        "Background Blossom storage failed for {}: {:#}",
833                        hash_for_log,
834                        error
835                    );
836                }
837                clear_optimistic_upload_inflight(&hash_for_log);
838            });
839
840            return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
841        }
842
843        tracing::warn!(
844            "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
845            queued_bytes,
846            state.optimistic_upload_queue_bytes
847        );
848    }
849
850    match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
851        Ok(true) => {
852            if is_allowed_author {
853                if let Err(error) = set_existing_blob_owner_without_body_write(
854                    state.clone(),
855                    sha256_hash,
856                    pubkey_bytes,
857                )
858                .await
859                {
860                    return Response::builder()
861                        .status(StatusCode::INTERNAL_SERVER_ERROR)
862                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
863                        .header("X-Reason", "Storage error")
864                        .header(header::CONTENT_TYPE, "application/json")
865                        .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
866                        .unwrap();
867                }
868            }
869            return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
870        }
871        Ok(false) => {}
872        Err(error) => {
873            tracing::debug!(
874                "Could not preflight Blossom upload {} before synchronous storage: {}",
875                sha256_hex,
876                error
877            );
878        }
879    }
880
881    let store_result = store_blossom_blob_without_blocking_runtime(
882        &state,
883        body.clone(),
884        pubkey_bytes,
885        is_allowed_author,
886    )
887    .await;
888
889    match store_result {
890        Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
891        Err(e) => Response::builder()
892            .status(StatusCode::INTERNAL_SERVER_ERROR)
893            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
894            .header("X-Reason", "Storage error")
895            .header(header::CONTENT_TYPE, "application/json")
896            .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
897            .unwrap(),
898    }
899}
900
901/// POST /upload/batch - Upload multiple blobs with one auth event and one storage batch.
902pub async fn upload_blob_batch(
903    State(state): State<AppState>,
904    headers: HeaderMap,
905    Json(payload): Json<BatchUploadRequest>,
906) -> impl IntoResponse {
907    if payload.blobs.is_empty() {
908        return blossom_json_error(StatusCode::BAD_REQUEST, "Batch is empty");
909    }
910    if payload.blobs.len() > MAX_BATCH_UPLOAD_BLOBS {
911        return blossom_json_error(StatusCode::PAYLOAD_TOO_LARGE, "Batch contains too many blobs");
912    }
913
914    let auth = match verify_blossom_auth(&headers, "upload", None) {
915        Ok(a) => a,
916        Err((status, reason)) => {
917            return Response::builder()
918                .status(status)
919                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
920                .header("X-Reason", reason)
921                .header(header::CONTENT_TYPE, "application/json")
922                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
923                .unwrap();
924        }
925    };
926
927    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
928    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
929    if !can_upload {
930        let _ = check_write_access(&state, &auth.pubkey);
931        return blossom_json_error(StatusCode::FORBIDDEN, "Write access denied");
932    }
933
934    let pubkey_bytes = match from_hex(&auth.pubkey) {
935        Ok(bytes) => bytes,
936        Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid pubkey format"),
937    };
938
939    let now = SystemTime::now()
940        .duration_since(UNIX_EPOCH)
941        .unwrap()
942        .as_secs();
943    let mut total_bytes = 0usize;
944    let mut items = Vec::with_capacity(payload.blobs.len());
945    let mut descriptors = Vec::with_capacity(payload.blobs.len());
946
947    for blob in payload.blobs {
948        let sha256_hex = blob.sha256.to_lowercase();
949        let expected_hash: [u8; 32] = match from_hex(&sha256_hex) {
950            Ok(hash) => hash,
951            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob hash"),
952        };
953        if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
954            return blossom_json_error(StatusCode::FORBIDDEN, "Uploaded blob hash does not match authorized hash");
955        }
956
957        let data = match base64::engine::general_purpose::STANDARD.decode(blob.data.as_bytes()) {
958            Ok(data) => data,
959            Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob data"),
960        };
961        if data.len() > state.max_upload_bytes {
962            return blossom_json_error(StatusCode::PAYLOAD_TOO_LARGE, "Blob exceeds maximum upload size");
963        }
964        total_bytes = total_bytes.saturating_add(data.len());
965        if total_bytes > MAX_BATCH_UPLOAD_BYTES {
966            return blossom_json_error(StatusCode::PAYLOAD_TOO_LARGE, "Batch exceeds maximum upload size");
967        }
968
969        let mut hasher = Sha256::new();
970        hasher.update(&data);
971        let actual_hash: [u8; 32] = hasher.finalize().into();
972        if actual_hash != expected_hash {
973            return blossom_json_error(StatusCode::FORBIDDEN, "Hash mismatch");
974        }
975
976        let content_type = blob
977            .content_type
978            .as_deref()
979            .map(content_type_base)
980            .unwrap_or_else(|| "application/octet-stream".to_string());
981        if let Err((status, reason)) = validate_upload_payload(
982            &data,
983            &content_type,
984            can_upload,
985            state.require_random_untrusted_ingest,
986        ) {
987            return blossom_json_error(status, reason);
988        }
989
990        let ext = mime_to_extension(&content_type);
991        descriptors.push(BlobDescriptor {
992            url: format!("/{}{}", sha256_hex, ext),
993            sha256: sha256_hex,
994            size: data.len() as u64,
995            mime_type: content_type,
996            uploaded: now,
997        });
998        items.push((actual_hash, data));
999    }
1000
1001    let store = state.store.clone();
1002    let stored = tokio::task::spawn_blocking(move || {
1003        if is_allowed_author {
1004            store.put_owned_blobs(&items, &pubkey_bytes)
1005        } else {
1006            let mut inserted = 0usize;
1007            for (_, data) in &items {
1008                store.put_cached_blob(data)?;
1009                inserted += 1;
1010            }
1011            Ok(inserted)
1012        }
1013    })
1014    .await
1015    .map_err(|error| anyhow::anyhow!("blob batch write task failed: {}", error));
1016
1017    match stored {
1018        Ok(Ok(uploaded)) => Response::builder()
1019            .status(StatusCode::OK)
1020            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1021            .header(header::CONTENT_TYPE, "application/json")
1022            .body(Body::from(
1023                serde_json::to_string(&BatchUploadResponse {
1024                    uploaded,
1025                    blobs: descriptors,
1026                })
1027                .unwrap(),
1028            ))
1029            .unwrap(),
1030        Ok(Err(error)) | Err(error) => Response::builder()
1031            .status(StatusCode::INTERNAL_SERVER_ERROR)
1032            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1033            .header("X-Reason", "Storage error")
1034            .header(header::CONTENT_TYPE, "application/json")
1035            .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
1036            .unwrap(),
1037    }
1038}
1039
1040/// DELETE /<sha256> - Delete a blob (BUD-02)
1041/// Note: Blob is only fully deleted when ALL owners have removed it
1042pub async fn delete_blob(
1043    State(state): State<AppState>,
1044    Path(id): Path<String>,
1045    headers: HeaderMap,
1046) -> impl IntoResponse {
1047    let (hash_part, _) = parse_hash_and_extension(&id);
1048
1049    if !is_valid_sha256(hash_part) {
1050        return Response::builder()
1051            .status(StatusCode::BAD_REQUEST)
1052            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1053            .header("X-Reason", "Invalid SHA256 hash")
1054            .body(Body::empty())
1055            .unwrap();
1056    }
1057
1058    let sha256_hex = hash_part.to_lowercase();
1059
1060    // Convert hash to bytes
1061    let sha256_bytes = match from_hex(&sha256_hex) {
1062        Ok(b) => b,
1063        Err(_) => {
1064            return Response::builder()
1065                .status(StatusCode::BAD_REQUEST)
1066                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1067                .header("X-Reason", "Invalid SHA256 hash format")
1068                .body(Body::empty())
1069                .unwrap();
1070        }
1071    };
1072
1073    // Verify authorization with hash requirement
1074    let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
1075        Ok(a) => a,
1076        Err((status, reason)) => {
1077            return Response::builder()
1078                .status(status)
1079                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1080                .header("X-Reason", reason)
1081                .body(Body::empty())
1082                .unwrap();
1083        }
1084    };
1085
1086    // Convert pubkey hex to bytes
1087    let pubkey_bytes = match from_hex(&auth.pubkey) {
1088        Ok(b) => b,
1089        Err(_) => {
1090            return Response::builder()
1091                .status(StatusCode::BAD_REQUEST)
1092                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1093                .header("X-Reason", "Invalid pubkey format")
1094                .body(Body::empty())
1095                .unwrap();
1096        }
1097    };
1098
1099    // Check ownership - user must be one of the owners (O(1) lookup with composite key)
1100    match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
1101        Ok(true) => {
1102            // User is an owner, proceed with delete
1103        }
1104        Ok(false) => {
1105            // Check if blob exists at all (for proper error message)
1106            match state.store.blob_has_owners(&sha256_bytes) {
1107                Ok(true) => {
1108                    return Response::builder()
1109                        .status(StatusCode::FORBIDDEN)
1110                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1111                        .header("X-Reason", "Not a blob owner")
1112                        .body(Body::empty())
1113                        .unwrap();
1114                }
1115                Ok(false) => {
1116                    return Response::builder()
1117                        .status(StatusCode::NOT_FOUND)
1118                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1119                        .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
1120                        .header("X-Reason", "Blob not found")
1121                        .body(Body::empty())
1122                        .unwrap();
1123                }
1124                Err(_) => {
1125                    return Response::builder()
1126                        .status(StatusCode::INTERNAL_SERVER_ERROR)
1127                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1128                        .body(Body::empty())
1129                        .unwrap();
1130                }
1131            }
1132        }
1133        Err(_) => {
1134            return Response::builder()
1135                .status(StatusCode::INTERNAL_SERVER_ERROR)
1136                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1137                .body(Body::empty())
1138                .unwrap();
1139        }
1140    }
1141
1142    // Remove this user's ownership (blob only deleted when no owners remain)
1143    match state
1144        .store
1145        .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
1146    {
1147        Ok(fully_deleted) => {
1148            // Return 200 OK whether blob was fully deleted or just removed from user's list
1149            // The client doesn't need to know if other owners still exist
1150            Response::builder()
1151                .status(StatusCode::OK)
1152                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1153                .header(
1154                    "X-Blob-Deleted",
1155                    if fully_deleted { "true" } else { "false" },
1156                )
1157                .body(Body::empty())
1158                .unwrap()
1159        }
1160        Err(_) => Response::builder()
1161            .status(StatusCode::INTERNAL_SERVER_ERROR)
1162            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1163            .body(Body::empty())
1164            .unwrap(),
1165    }
1166}
1167
1168/// GET /list/<pubkey> - List blobs for a pubkey (BUD-02)
1169pub async fn list_blobs(
1170    State(state): State<AppState>,
1171    Path(pubkey): Path<String>,
1172    Query(query): Query<ListQuery>,
1173    headers: HeaderMap,
1174) -> impl IntoResponse {
1175    // Validate pubkey format (64 hex chars)
1176    if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1177        return Response::builder()
1178            .status(StatusCode::BAD_REQUEST)
1179            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1180            .header("X-Reason", "Invalid pubkey format")
1181            .header(header::CONTENT_TYPE, "application/json")
1182            .body(Body::from("[]"))
1183            .unwrap();
1184    }
1185
1186    let pubkey_hex = pubkey.to_lowercase();
1187    let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1188        Ok(b) => b,
1189        Err(_) => {
1190            return Response::builder()
1191                .status(StatusCode::BAD_REQUEST)
1192                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1193                .header("X-Reason", "Invalid pubkey format")
1194                .header(header::CONTENT_TYPE, "application/json")
1195                .body(Body::from("[]"))
1196                .unwrap()
1197        }
1198    };
1199
1200    let auth = match verify_blossom_auth(&headers, "list", None) {
1201        Ok(auth) => auth,
1202        Err((status, reason)) => {
1203            return Response::builder()
1204                .status(status)
1205                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1206                .header("X-Reason", reason)
1207                .header(header::CONTENT_TYPE, "application/json")
1208                .body(Body::from("[]"))
1209                .unwrap();
1210        }
1211    };
1212
1213    if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1214        return Response::builder()
1215            .status(StatusCode::FORBIDDEN)
1216            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1217            .header("X-Reason", "Pubkey mismatch")
1218            .header(header::CONTENT_TYPE, "application/json")
1219            .body(Body::from("[]"))
1220            .unwrap();
1221    }
1222
1223    // Get blobs for this pubkey
1224    match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1225        Ok(blobs) => {
1226            // Apply filters
1227            let mut filtered: Vec<_> = blobs
1228                .into_iter()
1229                .filter(|b| {
1230                    if let Some(since) = query.since {
1231                        if b.uploaded < since {
1232                            return false;
1233                        }
1234                    }
1235                    if let Some(until) = query.until {
1236                        if b.uploaded > until {
1237                            return false;
1238                        }
1239                    }
1240                    true
1241                })
1242                .collect();
1243
1244            // Sort by uploaded descending (most recent first)
1245            filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1246
1247            // Apply limit
1248            let limit = query.limit.unwrap_or(100).min(1000);
1249            filtered.truncate(limit);
1250
1251            Response::builder()
1252                .status(StatusCode::OK)
1253                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1254                .header(header::CONTENT_TYPE, "application/json")
1255                .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1256                .unwrap()
1257        }
1258        Err(_) => Response::builder()
1259            .status(StatusCode::INTERNAL_SERVER_ERROR)
1260            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1261            .header(header::CONTENT_TYPE, "application/json")
1262            .body(Body::from("[]"))
1263            .unwrap(),
1264    }
1265}
1266
1267// Helper functions
1268
/// Splits a path segment at its last `.` into `(stem, extension)`.
///
/// The returned extension keeps its leading dot. Input without any `.` comes back
/// unchanged, paired with `None`.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    id.rfind('.').map_or((id, None), |split_at| {
        let (stem, extension) = id.split_at(split_at);
        (stem, Some(extension))
    })
}
1276
/// Returns `true` when `s` is exactly 64 ASCII hex digits (either case).
fn is_valid_sha256(s: &str) -> bool {
    const HEX_DIGEST_LEN: usize = 64;
    // Bytes of non-ASCII characters are never hex digits, so a byte-wise check is enough.
    s.len() == HEX_DIGEST_LEN && s.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1280
/// Test-only helper that writes `data` straight to the store.
///
/// With `track_ownership` set, the blob is stored via `put_owned_blob` for `pubkey`;
/// otherwise it is stored via `put_cached_blob`. The `_sha256` argument is unused.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    if !track_ownership {
        state.store.put_cached_blob(data)?;
        return Ok(());
    }

    state.store.put_owned_blob(data, pubkey)?;
    Ok(())
}
1297
/// Maps a MIME type to the file extension (with leading dot) used in blob URLs.
///
/// The lookup is an exact, case-sensitive string match; unknown types map to `""`.
fn mime_to_extension(mime: &str) -> &'static str {
    const KNOWN_EXTENSIONS: &[(&str, &str)] = &[
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    KNOWN_EXTENSIONS
        .iter()
        .find(|&&(known, _)| known == mime)
        .map_or("", |&(_, extension)| extension)
}
1316
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server::auth::WsRelayState;
    use crate::storage::HashtreeStore;
    use axum::response::IntoResponse;
    use base64::Engine;
    use hashtree_core::sha256;
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex as StdMutex};
    use std::time::Duration;
    use tempfile::TempDir;

    /// Builds an `AppState` around `store` with public writes enabled, optimistic uploads
    /// disabled, and no auth, peers, relays or social graph configured.
    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
        AppState {
            // Storage and upload policy.
            store,
            max_upload_bytes: 5 * 1024 * 1024,
            public_writes: true,
            require_random_untrusted_ingest: true,
            allowed_pubkeys: HashSet::new(),
            optimistic_blossom_uploads: false,
            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),

            // Server identity and mode.
            auth: None,
            daemon_started_at: 1_700_000_000,
            peer_mode: crate::config::ServerMode::Normal,
            hash_get_enabled: true,

            // Network-facing collaborators.
            http_webrtc_fetch: true,
            webrtc_peers: None,
            fips_transport: None,
            fetch_from_fips_peers: true,
            ws_relay: Arc::new(WsRelayState::new()),
            upstream_blossom: Vec::new(),
            nostr_relay: None,
            nostr_relay_urls: Vec::new(),

            // Social graph.
            social_graph: None,
            social_graph_store: None,
            social_graph_root: None,
            socialgraph_snapshot_public: false,

            // Caches and in-flight bookkeeping.
            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
                std::collections::HashMap::new(),
            )),
            inflight_blob_reads: Arc::new(tokio::sync::Mutex::new(
                std::collections::HashMap::new(),
            )),
            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
        }
    }

    /// Signs an "upload" Blossom auth event that expires in 300 seconds and returns it as
    /// an `Authorization` header value (`Nostr <base64 event JSON>`).
    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};

        let created_at = Timestamp::now();
        let expires_at = (created_at.as_u64() + 300).to_string();
        let tags = vec![
            Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
            Tag::custom(TagKind::Custom("expiration".into()), vec![expires_at]),
        ];

        let event = EventBuilder::new(Kind::Custom(BLOSSOM_AUTH_KIND), "", tags)
            .custom_created_at(created_at)
            .to_event(keys)
            .expect("sign blossom auth");
        let serialized = serde_json::to_vec(&event).expect("serialize auth event");
        let encoded = base64::engine::general_purpose::STANDARD.encode(serialized);
        format!("Nostr {}", encoded)
    }

    /// Opens a store in a fresh temporary directory, passing `128 * 1024 * 1024` as the
    /// third `with_options` argument. The `TempDir` is returned so it outlives the store.
    fn open_temp_store() -> (TempDir, Arc<HashtreeStore>) {
        let dir = TempDir::new().expect("temp dir");
        let store = Arc::new(
            HashtreeStore::with_options(dir.path(), None, 128 * 1024 * 1024).expect("store"),
        );
        (dir, store)
    }

    /// Request headers for an octet-stream upload signed by a freshly generated key.
    fn octet_stream_upload_headers() -> HeaderMap {
        let keys = nostr::Keys::generate();
        let mut headers = HeaderMap::new();
        headers.insert(
            header::AUTHORIZATION,
            create_upload_auth_header(&keys)
                .parse()
                .expect("auth header value"),
        );
        headers.insert(
            header::CONTENT_TYPE,
            "application/octet-stream"
                .parse()
                .expect("content type header value"),
        );
        headers
    }

    /// Runs `upload_blob` with freshly signed headers and returns the response status.
    async fn submit_upload(state: AppState, body: axum::body::Bytes) -> StatusCode {
        upload_blob(State(state), octet_stream_upload_headers(), body)
            .await
            .into_response()
            .status()
    }

    /// Polls `condition` up to 50 times, sleeping 10 ms after each miss.
    /// Returns whether the condition ever held.
    async fn eventually(mut condition: impl FnMut() -> bool) -> bool {
        for _ in 0..50 {
            if condition() {
                return true;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        false
    }

    #[test]
    fn test_is_valid_sha256() {
        let accepted = [
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6",
            "0000000000000000000000000000000000000000000000000000000000000000",
        ];
        for candidate in accepted.iter() {
            assert!(is_valid_sha256(candidate));
        }

        let rejected = [
            // Too short
            "e2bab35b5296ec2242ded0a01f6d6723",
            // Too long
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa",
            // Invalid chars
            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6",
            // Empty
            "",
        ];
        for candidate in rejected.iter() {
            assert!(!is_valid_sha256(candidate));
        }
    }

    #[test]
    fn test_parse_hash_and_extension() {
        let cases = [
            ("abc123.png", "abc123", Some(".png")),
            ("abc123", "abc123", None),
            ("abc.123.jpg", "abc.123", Some(".jpg")),
        ];
        for &(input, expected_hash, expected_ext) in cases.iter() {
            let (hash, ext) = parse_hash_and_extension(input);
            assert_eq!(hash, expected_hash);
            assert_eq!(ext, expected_ext);
        }
    }

    #[test]
    fn test_mime_to_extension() {
        let cases = [
            ("image/png", ".png"),
            ("image/jpeg", ".jpg"),
            ("video/mp4", ".mp4"),
            ("application/octet-stream", ""),
            ("unknown/type", ""),
        ];
        for &(mime, expected) in cases.iter() {
            assert_eq!(mime_to_extension(mime), expected);
        }
    }

    /// With optimistic uploads on, the handler answers 202 and the blob shows up in the
    /// store shortly afterwards.
    #[tokio::test]
    async fn optimistic_uploads_return_accepted_and_store_in_background() {
        let (_temp_dir, store) = open_temp_store();
        let mut state = test_app_state(Arc::clone(&store));
        state.optimistic_blossom_uploads = true;

        let payload = (0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>();
        let body = axum::body::Bytes::from(payload);
        let hash = sha256(&body);

        let status = submit_upload(state, body).await;
        assert_eq!(status, StatusCode::ACCEPTED);

        let stored = eventually(|| store.blob_exists(&hash).expect("blob exists check")).await;
        if !stored {
            panic!("optimistic upload was not stored in the background");
        }
    }

    /// A blob that is already stored is answered with 409 even though the optimistic
    /// queue (budget of 1) could never hold it.
    #[tokio::test]
    async fn optimistic_upload_existing_blob_skips_queue() {
        let (_temp_dir, store) = open_temp_store();
        let body = axum::body::Bytes::from(
            (0u16..=255)
                .map(|value| ((value * 73 + 19) % 256) as u8)
                .collect::<Vec<_>>(),
        );
        store.put_cached_blob(&body).expect("seed blob");

        let mut state = test_app_state(Arc::clone(&store));
        state.optimistic_blossom_uploads = true;
        state.optimistic_upload_queue_bytes = 1;
        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));

        let status = submit_upload(state, body).await;
        assert_eq!(status, StatusCode::CONFLICT);
    }

    /// When the queue has room, an already-stored blob is still accepted optimistically
    /// (202) and its in-flight marker is cleared once the background task finishes.
    #[tokio::test]
    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
        let (_temp_dir, store) = open_temp_store();
        let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
        let hash_hex = hex::encode(sha256(&body));
        store.put_cached_blob(&body).expect("seed blob");

        let mut state = test_app_state(store);
        state.optimistic_blossom_uploads = true;

        let status = submit_upload(state, body).await;
        assert_eq!(status, StatusCode::ACCEPTED);

        if eventually(|| !optimistic_upload_is_inflight(&hash_hex)).await {
            return;
        }

        // Clean up the marker before failing.
        clear_optimistic_upload_inflight(&hash_hex);
        panic!("optimistic upload in-flight marker was not cleared");
    }

    /// A hash that is already marked in-flight is answered with 202 without needing
    /// queue capacity (the queue budget here is 1).
    #[tokio::test]
    async fn optimistic_upload_inflight_duplicate_skips_queue() {
        let (_temp_dir, store) = open_temp_store();
        let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
        let hash_hex = hex::encode(sha256(&body));
        assert!(mark_optimistic_upload_inflight(&hash_hex));

        let mut state = test_app_state(store);
        state.optimistic_blossom_uploads = true;
        state.optimistic_upload_queue_bytes = 1;
        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));

        let status = submit_upload(state, body).await;
        // Clear the marker before asserting so a failure does not leave it set.
        clear_optimistic_upload_inflight(&hash_hex);
        assert_eq!(status, StatusCode::ACCEPTED);
    }

    #[test]
    fn public_writes_accept_unlisted_authors_for_uploads() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let mut state = test_app_state(store);
        let unlisted = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";

        // Public writes: the author may upload without being an allowed write author.
        state.public_writes = true;
        assert!(can_accept_upload_author(&state, unlisted));
        assert!(!is_allowed_write_author(&state, unlisted));

        // Without public writes the same author is refused.
        state.public_writes = false;
        assert!(!can_accept_upload_author(&state, unlisted));
    }

    #[test]
    fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
        let encrypted_block: Vec<u8> = (0..=255).collect();
        let audio = b"audio bytes";

        let octet_stream_result =
            validate_upload_payload(&encrypted_block, "application/octet-stream", false, true);
        assert_eq!(octet_stream_result, Ok(()));

        let media_with_flag = validate_upload_payload(audio, "audio/mpeg", true, true);
        assert_eq!(media_with_flag, Ok(()));

        let media_without_flag = validate_upload_payload(audio, "audio/mpeg", false, true);
        assert_eq!(
            media_without_flag,
            Err((
                StatusCode::FORBIDDEN,
                "Raw media uploads require write access".to_string(),
            ))
        );
    }

    /// In a store opened with a limit argument of 700, a later cached write displaces the
    /// unowned upload while the owned blob stays.
    #[test]
    fn unowned_public_uploads_use_cache_storage_semantics() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let state = test_app_state(Arc::clone(&store));
        let owner = [2u8; 32];

        let owned = vec![1u8; 280];
        let owned_hash = sha256(&owned);
        store_blossom_blob(&state, &owned, &owned_hash, &owner, true).expect("owned upload");

        let public_upload = vec![3u8; 280];
        let public_hash = sha256(&public_upload);
        store_blossom_blob(&state, &public_upload, &public_hash, &[4u8; 32], false)
            .expect("public upload");

        let replacement = vec![5u8; 280];
        let replacement_hash = sha256(&replacement);
        state
            .store
            .put_cached_blob(&replacement)
            .expect("replacement cached blob");

        let owned_present = state.store.blob_exists(&owned_hash).expect("owned exists");
        assert!(owned_present);

        let public_present = state
            .store
            .blob_exists(&public_hash)
            .expect("public upload evicted");
        assert!(!public_present);

        let replacement_present = state
            .store
            .blob_exists(&replacement_hash)
            .expect("replacement exists");
        assert!(replacement_present);

        let owner_tracked = state
            .store
            .is_blob_owner(&owned_hash, &owner)
            .expect("owned tracked");
        assert!(owner_tracked);

        let public_has_owners = state
            .store
            .blob_has_owners(&public_hash)
            .expect("public upload unowned");
        assert!(!public_has_owners);
    }

    /// In a store opened with a limit argument of 500, a second 300-byte owned upload
    /// fails with a "storage limit" error and leaves the first blob untouched.
    #[test]
    fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
        let state = test_app_state(Arc::clone(&store));
        let owner = [2u8; 32];

        let first = vec![1u8; 300];
        let first_hash = sha256(&first);
        store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");

        let second = vec![3u8; 300];
        let second_hash = sha256(&second);
        let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
            .expect_err("second owned upload should exceed the storage limit");
        assert!(
            error.to_string().contains("storage limit"),
            "unexpected error: {error}"
        );

        let first_present = state
            .store
            .blob_exists(&first_hash)
            .expect("first blob remains");
        assert!(first_present);

        let second_present = state
            .store
            .blob_exists(&second_hash)
            .expect("second blob rejected");
        assert!(!second_present);

        let first_owned = state
            .store
            .is_blob_owner(&first_hash, &owner)
            .expect("first owner tracked");
        assert!(first_owned);

        let second_owned = state
            .store
            .is_blob_owner(&second_hash, &owner)
            .expect("second owner not tracked");
        assert!(!second_owned);
    }
}