Skip to main content

hashtree_cli/server/
blossom.rs

1//! Blossom protocol implementation (BUD-01, BUD-02)
2//!
3//! Implements blob storage endpoints with Nostr-based authentication.
4//! See: https://github.com/hzrd149/blossom
5
6use axum::{
7    body::Body,
8    extract::{Path, Query, State},
9    http::{header, HeaderMap, Response, StatusCode},
10    response::IntoResponse,
11};
12use base64::Engine;
13use hashtree_core::from_hex;
14use serde::{Deserialize, Serialize};
15use sha2::{Digest, Sha256};
16use std::collections::HashSet;
17use std::sync::{Mutex, OnceLock};
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19
20use super::auth::AppState;
21use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
22use super::ingest_filter::{
23    content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
24};
25use super::mime::get_mime_type;
26
/// Blossom authorization event kind (NIP-98 style)
const BLOSSOM_AUTH_KIND: u16 = 24242;

/// Cache-Control header for immutable content-addressed data (1 year)
const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
/// Cache-Control value that forbids caching; in this file it is attached to
/// the 503 "read busy" HEAD response.
const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
/// Cache-Control value attached to the 404 HEAD response: no client-side
/// caching (`max-age=0`) with a 5 second shared-cache lifetime (`s-maxage=5`).
const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
/// Environment variable that overrides the optimistic upload queue wait, in
/// milliseconds.
const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
/// Queue wait (milliseconds) used when the environment variable is unset,
/// unparseable, or zero.
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;

/// Default maximum upload size in bytes (5 MB)
pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
/// Minimum number of queue bytes charged to a single optimistic upload, so
/// tiny blobs still consume a meaningful share of the queue budget.
const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
40
/// Point-in-time view of the optimistic upload queue, as assembled by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Whether optimistic (respond-before-store) uploads are enabled.
    pub enabled: bool,
    /// Configured total queue budget in bytes.
    pub max_bytes: usize,
    /// Bytes of budget currently free (semaphore permits, capped at `max_bytes`).
    pub available_bytes: usize,
    /// Bytes of budget currently held: `max_bytes - available_bytes`, saturating.
    pub reserved_bytes: usize,
    /// Number of blob hashes currently marked as in flight.
    pub in_flight: usize,
    /// How long an upload waits for queue budget before giving up, in milliseconds.
    pub queue_timeout_ms: u64,
}
50
51/// Check if a pubkey has write access based on allowed_npubs config or social graph
52/// Returns Ok(()) if allowed, Err with JSON error body if denied
53#[allow(clippy::result_large_err)]
54fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
55    // Check if pubkey is in the allowed list (converted from npub to hex)
56    if is_allowed_write_author(state, pubkey) {
57        tracing::debug!(
58            "Blossom write allowed for {}... (allowed writer)",
59            &pubkey[..8.min(pubkey.len())]
60        );
61        return Ok(());
62    }
63
64    // Not in allowed list or social graph
65    tracing::info!(
66        "Blossom write denied for {}... (not in allowed_npubs or social graph)",
67        &pubkey[..8.min(pubkey.len())]
68    );
69    Err(Response::builder()
70        .status(StatusCode::FORBIDDEN)
71        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
72        .header(header::CONTENT_TYPE, "application/json")
73        .body(Body::from(
74            r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
75        ))
76        .unwrap())
77}
78
79fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
80    if state.allowed_pubkeys.contains(pubkey) {
81        return true;
82    }
83
84    state
85        .social_graph
86        .as_ref()
87        .map(|sg| sg.check_write_access(pubkey))
88        .unwrap_or(false)
89}
90
91fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
92    state.public_writes || is_allowed_write_author(state, pubkey)
93}
94
95fn validate_upload_payload(
96    body: &[u8],
97    content_type: &str,
98    is_allowed_writer: bool,
99    require_random_untrusted_ingest: bool,
100) -> Result<(), (StatusCode, String)> {
101    let is_chk_upload = is_chk_content_type(content_type);
102
103    if !is_chk_upload && !is_allowed_writer {
104        return Err((
105            StatusCode::FORBIDDEN,
106            "Raw media uploads require write access".to_string(),
107        ));
108    }
109
110    if is_chk_upload {
111        validate_untrusted_blob(body, require_random_untrusted_ingest)
112            .map_err(|IngestRejection { status, reason }| (status, reason))?;
113    }
114
115    Ok(())
116}
117
118fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
119    let reason = reason.into();
120    Response::builder()
121        .status(status)
122        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
123        .header("X-Reason", reason.as_str())
124        .header(header::CONTENT_TYPE, "application/json")
125        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
126        .unwrap()
127}
128
129fn blossom_retryable_json_error(
130    status: StatusCode,
131    reason: impl Into<String>,
132    retry_after_seconds: u64,
133) -> Response<Body> {
134    let reason = reason.into();
135    Response::builder()
136        .status(status)
137        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
138        .header("X-Reason", reason.as_str())
139        .header(header::RETRY_AFTER, retry_after_seconds.to_string())
140        .header(header::CONTENT_TYPE, "application/json")
141        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
142        .unwrap()
143}
144
/// Blob descriptor returned by upload and list endpoints
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Location of the blob; the upload handler fills this with
    /// `/<sha256><extension>`.
    pub url: String,
    /// Hex-encoded SHA-256 of the blob contents.
    pub sha256: String,
    /// Blob length in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Upload time as Unix seconds.
    pub uploaded: u64,
}
155
/// Query parameters for list endpoint
///
/// All fields are optional. How they are interpreted is decided by the list
/// handler, which is not shown alongside this definition.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Presumably a lower time bound (Unix seconds) — confirm against the handler.
    pub since: Option<u64>,
    /// Presumably an upper time bound (Unix seconds) — confirm against the handler.
    pub until: Option<u64>,
    /// Presumably the maximum number of entries to return — confirm against the handler.
    pub limit: Option<usize>,
    /// Presumably an opaque pagination cursor — confirm against the handler.
    pub cursor: Option<String>,
}
164
/// Parsed Nostr authorization event
///
/// Produced by `verify_blossom_auth` after the event's signature and basic
/// constraints have been checked.
#[derive(Debug)]
pub struct BlossomAuth {
    /// The event's `pubkey` field, as sent by the client.
    pub pubkey: String,
    /// The event kind (always the Blossom auth kind once verified).
    pub kind: u16,
    /// The event's `created_at` timestamp (Unix seconds).
    pub created_at: u64,
    /// Value of the `expiration` tag, if one was present.
    pub expiration: Option<u64>,
    /// Value of the `t` tag: "upload", "delete", "list", "get"
    pub action: Option<String>,
    /// Values of all `x` tags, lowercased.
    pub blob_hashes: Vec<String>,
    /// Value of the `server` tag, if one was present.
    pub server: Option<String>,
}
176
177/// Parse and verify Nostr authorization from header
178/// Returns the verified auth or an error response
179pub fn verify_blossom_auth(
180    headers: &HeaderMap,
181    required_action: &str,
182    required_hash: Option<&str>,
183) -> Result<BlossomAuth, (StatusCode, &'static str)> {
184    let auth_header = headers
185        .get(header::AUTHORIZATION)
186        .and_then(|v| v.to_str().ok())
187        .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
188
189    let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
190        StatusCode::UNAUTHORIZED,
191        "Invalid auth scheme, expected 'Nostr'",
192    ))?;
193
194    // Decode base64 event
195    let engine = base64::engine::general_purpose::STANDARD;
196    let event_bytes = engine
197        .decode(nostr_event)
198        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
199
200    let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
201        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
202
203    // Extract event fields
204    let kind = event_json["kind"]
205        .as_u64()
206        .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
207
208    if kind != BLOSSOM_AUTH_KIND as u64 {
209        return Err((
210            StatusCode::BAD_REQUEST,
211            "Invalid event kind, expected 24242",
212        ));
213    }
214
215    let pubkey = event_json["pubkey"]
216        .as_str()
217        .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
218        .to_string();
219
220    let created_at = event_json["created_at"]
221        .as_u64()
222        .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
223
224    let sig = event_json["sig"]
225        .as_str()
226        .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
227
228    // Verify signature
229    if !verify_nostr_signature(&event_json, &pubkey, sig) {
230        return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
231    }
232
233    // Parse tags
234    let tags = event_json["tags"]
235        .as_array()
236        .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
237
238    let mut expiration: Option<u64> = None;
239    let mut action: Option<String> = None;
240    let mut blob_hashes: Vec<String> = Vec::new();
241    let mut server: Option<String> = None;
242
243    for tag in tags {
244        let tag_arr = tag.as_array();
245        if let Some(arr) = tag_arr {
246            if arr.len() >= 2 {
247                let tag_name = arr[0].as_str().unwrap_or("");
248                let tag_value = arr[1].as_str().unwrap_or("");
249
250                match tag_name {
251                    "t" => action = Some(tag_value.to_string()),
252                    "x" => blob_hashes.push(tag_value.to_lowercase()),
253                    "expiration" => expiration = tag_value.parse().ok(),
254                    "server" => server = Some(tag_value.to_string()),
255                    _ => {}
256                }
257            }
258        }
259    }
260
261    // Validate expiration
262    let now = SystemTime::now()
263        .duration_since(UNIX_EPOCH)
264        .unwrap()
265        .as_secs();
266
267    if let Some(exp) = expiration {
268        if exp < now {
269            return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
270        }
271    }
272
273    // Validate created_at is not in the future (with 60s tolerance)
274    if created_at > now + 60 {
275        return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
276    }
277
278    // Validate action matches
279    if let Some(ref act) = action {
280        if act != required_action {
281            return Err((StatusCode::FORBIDDEN, "Action mismatch"));
282        }
283    } else {
284        return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
285    }
286
287    // Validate hash if required
288    if let Some(hash) = required_hash {
289        if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
290            return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
291        }
292    }
293
294    Ok(BlossomAuth {
295        pubkey,
296        kind: kind as u16,
297        created_at,
298        expiration,
299        action,
300        blob_hashes,
301        server,
302    })
303}
304
305/// Verify Nostr event signature using secp256k1
306fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
307    use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
308
309    // Compute event ID (sha256 of serialized event)
310    let content = event["content"].as_str().unwrap_or("");
311    let full_serialized = format!(
312        "[0,\"{}\",{},{},{},\"{}\"]",
313        pubkey,
314        event["created_at"],
315        event["kind"],
316        event["tags"],
317        escape_json_string(content),
318    );
319
320    let mut hasher = Sha256::new();
321    hasher.update(full_serialized.as_bytes());
322    let event_id = hasher.finalize();
323
324    // Parse pubkey and signature
325    let pubkey_bytes = match hex::decode(pubkey) {
326        Ok(b) => b,
327        Err(_) => return false,
328    };
329
330    let sig_bytes = match hex::decode(sig) {
331        Ok(b) => b,
332        Err(_) => return false,
333    };
334
335    let secp = Secp256k1::verification_only();
336
337    let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
338        Ok(pk) => pk,
339        Err(_) => return false,
340    };
341
342    let signature = match Signature::from_slice(&sig_bytes) {
343        Ok(s) => s,
344        Err(_) => return false,
345    };
346
347    let message = match Message::from_digest_slice(&event_id) {
348        Ok(m) => m,
349        Err(_) => return false,
350    };
351
352    secp.verify_schnorr(&signature, &message, &xonly_pubkey)
353        .is_ok()
354}
355
/// Escape a string for the canonical Nostr event serialization.
///
/// The output must match byte-for-byte what the signer hashed, so the escape
/// set follows NIP-01 / standard JSON serializers:
///
/// * `"` `\` newline, carriage return, tab, backspace and form feed use their
///   two-character escapes (`\"`, `\\`, `\n`, `\r`, `\t`, `\b`, `\f`);
/// * the remaining characters below U+0020 are written as `\u00XX`
///   (lowercase hex);
/// * everything else — including DEL (U+007F) and the C1 control range — is
///   copied verbatim.
///
/// Escaping backspace/form feed as `\u0008`/`\u000c`, or escaping characters
/// at or above U+007F, would change the hash and make valid signatures fail.
fn escape_json_string(s: &str) -> String {
    use std::fmt::Write as _;

    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0c}' => escaped.push_str("\\f"),
            c if u32::from(c) < 0x20 => {
                // Writing into a String cannot fail.
                let _ = write!(escaped, "\\u{:04x}", u32::from(c));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
374
375/// CORS preflight handler for all Blossom endpoints
376/// Echoes back Access-Control-Request-Headers to allow any headers
377pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
378    // Echo back requested headers, or use sensible defaults that cover common Blossom headers
379    let allowed_headers = headers
380        .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
381        .and_then(|v| v.to_str().ok())
382        .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
383
384    // Always include common headers in addition to what was requested
385    let full_allowed = format!(
386        "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
387        allowed_headers
388    );
389
390    Response::builder()
391        .status(StatusCode::NO_CONTENT)
392        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
393        .header(
394            header::ACCESS_CONTROL_ALLOW_METHODS,
395            "GET, HEAD, PUT, DELETE, OPTIONS",
396        )
397        .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
398        .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
399        .body(Body::empty())
400        .unwrap()
401}
402
403/// HEAD /<sha256> - Check if blob exists
404pub async fn head_blob(
405    State(state): State<AppState>,
406    Path(id): Path<String>,
407    connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
408) -> impl IntoResponse {
409    let is_localhost = connect_info.0.ip().is_loopback();
410    let (hash_part, ext) = parse_hash_and_extension(&id);
411
412    if !is_valid_sha256(hash_part) {
413        return Response::builder()
414            .status(StatusCode::BAD_REQUEST)
415            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
416            .header("X-Reason", "Invalid SHA256 hash")
417            .body(Body::empty())
418            .unwrap();
419    }
420
421    let sha256_hex = hash_part.to_lowercase();
422    let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
423        Ok(b) => b,
424        Err(_) => {
425            return Response::builder()
426                .status(StatusCode::BAD_REQUEST)
427                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
428                .header("X-Reason", "Invalid SHA256 format")
429                .body(Body::empty())
430                .unwrap()
431        }
432    };
433
434    // Blossom HEAD only needs metadata; avoid reading the full blob body just to
435    // answer cache probes and CDN revalidation. The read permit keeps CDN probe
436    // storms from filling Tokio's blocking thread pool while old blobs without
437    // metadata are still being normalized.
438    let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
439        Ok(Ok(cached))
440    } else {
441        let permit = match acquire_blob_read().await {
442            Ok(permit) => permit,
443            Err(_) => {
444                return Response::builder()
445                    .status(StatusCode::SERVICE_UNAVAILABLE)
446                    .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
447                    .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
448                    .header("Retry-After", "1")
449                    .header("X-Reason", BLOB_READ_BUSY)
450                    .body(Body::empty())
451                    .unwrap()
452            }
453        };
454        let store = state.store.clone();
455        let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
456        let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
457        drop(permit);
458        let result = match timed {
459            Ok(result) => result.map_err(|_| ()),
460            Err(_) => Err(()),
461        };
462        if let Ok(Ok(size)) = result {
463            state.blob_cache.put_size(sha256_hex.clone(), size);
464        }
465        result
466    };
467
468    match blob_size {
469        Ok(Ok(Some(size))) => {
470            let mime_type = ext
471                .map(|e| get_mime_type(&format!("file{}", e)))
472                .unwrap_or("application/octet-stream");
473
474            let mut builder = Response::builder()
475                .status(StatusCode::OK)
476                .header(header::CONTENT_TYPE, mime_type)
477                .header(header::CONTENT_LENGTH, size)
478                .header(header::ACCEPT_RANGES, "bytes")
479                .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
480                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
481            if is_localhost {
482                builder = builder.header("X-Source", "local");
483            }
484            builder.body(Body::empty()).unwrap()
485        }
486        Ok(Ok(None)) => Response::builder()
487            .status(StatusCode::NOT_FOUND)
488            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
489            .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
490            .header("X-Reason", "Blob not found")
491            .body(Body::empty())
492            .unwrap(),
493        _ => Response::builder()
494            .status(StatusCode::INTERNAL_SERVER_ERROR)
495            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
496            .body(Body::empty())
497            .unwrap(),
498    }
499}
500
501async fn store_blossom_blob_without_blocking_runtime(
502    state: &AppState,
503    data: axum::body::Bytes,
504    pubkey: [u8; 32],
505    track_ownership: bool,
506) -> anyhow::Result<()> {
507    let mut hasher = Sha256::new();
508    hasher.update(&data);
509    let hash_hex = hex::encode(hasher.finalize());
510    let data_for_cache = data.clone();
511    let permit = acquire_blob_write()
512        .await
513        .map_err(|err| anyhow::anyhow!(err))?;
514    let store = state.store.clone();
515    tokio::task::spawn_blocking(move || {
516        let _permit = permit;
517        if track_ownership {
518            store.put_owned_blob(&data, &pubkey)?;
519        } else {
520            store.put_cached_blob(&data)?;
521        }
522        Ok::<(), anyhow::Error>(())
523    })
524    .await
525    .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
526    state
527        .blob_cache
528        .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
529    state.blob_cache.put_body(hash_hex, &data_for_cache);
530    Ok(())
531}
532
533fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
534    Response::builder()
535        .status(status)
536        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
537        .header(header::CONTENT_TYPE, "application/json")
538        .body(Body::from(serde_json::to_string(descriptor).unwrap()))
539        .unwrap()
540}
541
542fn optimistic_upload_queue_timeout() -> Duration {
543    let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
544        .ok()
545        .and_then(|value| value.parse::<u64>().ok())
546        .filter(|value| *value > 0)
547        .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
548    Duration::from_millis(millis)
549}
550
/// Process-wide set of blob hashes whose optimistic upload is still being
/// stored. Created empty on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static ACTIVE_HASHES: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    ACTIVE_HASHES.get_or_init(Mutex::default)
}
555
556fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
557    optimistic_upload_inflight()
558        .lock()
559        .is_ok_and(|inflight| inflight.contains(hash_hex))
560}
561
562fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
563    optimistic_upload_inflight()
564        .lock()
565        .map(|mut inflight| inflight.insert(hash_hex.to_string()))
566        .unwrap_or(true)
567}
568
569fn clear_optimistic_upload_inflight(hash_hex: &str) {
570    if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
571        inflight.remove(hash_hex);
572    }
573}
574
575pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
576    let max_bytes = state.optimistic_upload_queue_bytes;
577    let available_bytes = state
578        .optimistic_upload_queue
579        .available_permits()
580        .min(max_bytes);
581    let in_flight = optimistic_upload_inflight()
582        .lock()
583        .map(|inflight| inflight.len())
584        .unwrap_or(0);
585
586    OptimisticUploadQueueSnapshot {
587        enabled: state.optimistic_blossom_uploads,
588        max_bytes,
589        available_bytes,
590        reserved_bytes: max_bytes.saturating_sub(available_bytes),
591        in_flight,
592        queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
593    }
594}
595
/// Convert a duration to whole milliseconds, saturating at `u64::MAX` when
/// the value does not fit.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
599
600async fn acquire_optimistic_upload_queue(
601    state: &AppState,
602    permits: u32,
603) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
604    match tokio::time::timeout(
605        optimistic_upload_queue_timeout(),
606        state
607            .optimistic_upload_queue
608            .clone()
609            .acquire_many_owned(permits),
610    )
611    .await
612    {
613        Ok(Ok(permit)) => Ok(permit),
614        Ok(Err(_)) => Err("Optimistic upload queue is closed"),
615        Err(_) => Err("Optimistic upload queue is full"),
616    }
617}
618
619async fn uploaded_blob_already_exists(
620    state: &AppState,
621    sha256_hash: [u8; 32],
622    sha256_hex: &str,
623) -> Result<bool, String> {
624    if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
625        return Ok(true);
626    }
627
628    let permit = acquire_blob_read().await.map_err(str::to_string)?;
629    let store = state.store.clone();
630    let size_read = tokio::task::spawn_blocking(move || {
631        store
632            .blob_size(&sha256_hash)
633            .map_err(|error| error.to_string())
634    });
635
636    let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
637    drop(permit);
638    match result {
639        Ok(Ok(Ok(size))) => {
640            state.blob_cache.put_size(sha256_hex.to_string(), size);
641            Ok(size.is_some())
642        }
643        Ok(Ok(Err(error))) => Err(error),
644        Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
645        Err(_) => Err("blob existence check timed out".to_string()),
646    }
647}
648
649async fn set_existing_blob_owner_without_body_write(
650    state: AppState,
651    sha256_hash: [u8; 32],
652    pubkey: [u8; 32],
653) -> anyhow::Result<()> {
654    tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
655        .await
656        .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
657    Ok(())
658}
659
660/// PUT /upload - Upload a new blob (BUD-02)
661pub async fn upload_blob(
662    State(state): State<AppState>,
663    headers: HeaderMap,
664    body: axum::body::Bytes,
665) -> impl IntoResponse {
666    // Check size limit first (before auth to save resources)
667    let max_size = state.max_upload_bytes;
668    if body.len() > max_size {
669        return Response::builder()
670            .status(StatusCode::PAYLOAD_TOO_LARGE)
671            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
672            .header(header::CONTENT_TYPE, "application/json")
673            .body(Body::from(format!(
674                r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
675                body.len(),
676                max_size,
677                max_size / 1024 / 1024
678            )))
679            .unwrap();
680    }
681
682    // Verify authorization
683    let auth = match verify_blossom_auth(&headers, "upload", None) {
684        Ok(a) => a,
685        Err((status, reason)) => {
686            return Response::builder()
687                .status(status)
688                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
689                .header("X-Reason", reason)
690                .header(header::CONTENT_TYPE, "application/json")
691                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
692                .unwrap();
693        }
694    };
695
696    // Get content type from header
697    let content_type = headers
698        .get(header::CONTENT_TYPE)
699        .and_then(|v| v.to_str().ok())
700        .unwrap_or("application/octet-stream")
701        .to_string();
702
703    // Check write access: either in allowed_npubs/social graph OR public_writes is enabled.
704    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
705    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
706    if !can_upload {
707        let _ = check_write_access(&state, &auth.pubkey);
708        return Response::builder()
709            .status(StatusCode::FORBIDDEN)
710            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
711            .header(header::CONTENT_TYPE, "application/json")
712            .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
713            .unwrap();
714    }
715
716    if let Err((status, reason)) = validate_upload_payload(
717        &body,
718        &content_type,
719        can_upload,
720        state.require_random_untrusted_ingest,
721    ) {
722        return blossom_json_error(status, reason);
723    }
724
725    // Compute SHA256 of uploaded data
726    let mut hasher = Sha256::new();
727    hasher.update(&body);
728    let sha256_hash: [u8; 32] = hasher.finalize().into();
729    let sha256_hex = hex::encode(sha256_hash);
730
731    // If auth has x tags, verify hash matches
732    if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
733        return Response::builder()
734            .status(StatusCode::FORBIDDEN)
735            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
736            .header(
737                "X-Reason",
738                "Uploaded blob hash does not match authorized hash",
739            )
740            .header(header::CONTENT_TYPE, "application/json")
741            .body(Body::from(r#"{"error":"Hash mismatch"}"#))
742            .unwrap();
743    }
744
745    // Convert pubkey hex to bytes
746    let pubkey_bytes = match from_hex(&auth.pubkey) {
747        Ok(b) => b,
748        Err(_) => {
749            return Response::builder()
750                .status(StatusCode::BAD_REQUEST)
751                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
752                .header("X-Reason", "Invalid pubkey format")
753                .body(Body::empty())
754                .unwrap();
755        }
756    };
757
758    let size = body.len() as u64;
759    let now = SystemTime::now()
760        .duration_since(UNIX_EPOCH)
761        .unwrap()
762        .as_secs();
763    let ext = mime_to_extension(&content_type_base(&content_type));
764    let descriptor = BlobDescriptor {
765        url: format!("/{}{}", sha256_hex, ext),
766        sha256: sha256_hex.clone(),
767        size,
768        mime_type: content_type,
769        uploaded: now,
770    };
771
772    if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
773        return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
774    }
775
776    // Store public-write blobs in cache storage unless the writer is explicitly
777    // allowed, so untrusted public uploads do not become protected owned data.
778    if state.optimistic_blossom_uploads {
779        let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
780        if queued_bytes <= state.optimistic_upload_queue_bytes {
781            let permits = queued_bytes as u32;
782            let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
783            if !marked_inflight {
784                return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
785            }
786            let permit = match acquire_optimistic_upload_queue(&state, permits).await {
787                Ok(permit) => permit,
788                Err(_) => {
789                    clear_optimistic_upload_inflight(&sha256_hex);
790                    return blossom_retryable_json_error(
791                        StatusCode::SERVICE_UNAVAILABLE,
792                        "Optimistic upload queue is full",
793                        2,
794                    );
795                }
796            };
797            let state_for_write = state.clone();
798            let hash_for_log = sha256_hex.clone();
799            tokio::spawn(async move {
800                let _permit = permit;
801                if let Err(error) = store_blossom_blob_without_blocking_runtime(
802                    &state_for_write,
803                    body,
804                    pubkey_bytes,
805                    is_allowed_author,
806                )
807                .await
808                {
809                    tracing::error!(
810                        "Background Blossom storage failed for {}: {:#}",
811                        hash_for_log,
812                        error
813                    );
814                }
815                clear_optimistic_upload_inflight(&hash_for_log);
816            });
817
818            return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
819        }
820
821        tracing::warn!(
822            "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
823            queued_bytes,
824            state.optimistic_upload_queue_bytes
825        );
826    }
827
828    match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
829        Ok(true) => {
830            if is_allowed_author {
831                if let Err(error) = set_existing_blob_owner_without_body_write(
832                    state.clone(),
833                    sha256_hash,
834                    pubkey_bytes,
835                )
836                .await
837                {
838                    return Response::builder()
839                        .status(StatusCode::INTERNAL_SERVER_ERROR)
840                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
841                        .header("X-Reason", "Storage error")
842                        .header(header::CONTENT_TYPE, "application/json")
843                        .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
844                        .unwrap();
845                }
846            }
847            return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
848        }
849        Ok(false) => {}
850        Err(error) => {
851            tracing::debug!(
852                "Could not preflight Blossom upload {} before synchronous storage: {}",
853                sha256_hex,
854                error
855            );
856        }
857    }
858
859    let store_result = store_blossom_blob_without_blocking_runtime(
860        &state,
861        body.clone(),
862        pubkey_bytes,
863        is_allowed_author,
864    )
865    .await;
866
867    match store_result {
868        Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
869        Err(e) => Response::builder()
870            .status(StatusCode::INTERNAL_SERVER_ERROR)
871            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
872            .header("X-Reason", "Storage error")
873            .header(header::CONTENT_TYPE, "application/json")
874            .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
875            .unwrap(),
876    }
877}
878
879/// DELETE /<sha256> - Delete a blob (BUD-02)
880/// Note: Blob is only fully deleted when ALL owners have removed it
881pub async fn delete_blob(
882    State(state): State<AppState>,
883    Path(id): Path<String>,
884    headers: HeaderMap,
885) -> impl IntoResponse {
886    let (hash_part, _) = parse_hash_and_extension(&id);
887
888    if !is_valid_sha256(hash_part) {
889        return Response::builder()
890            .status(StatusCode::BAD_REQUEST)
891            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
892            .header("X-Reason", "Invalid SHA256 hash")
893            .body(Body::empty())
894            .unwrap();
895    }
896
897    let sha256_hex = hash_part.to_lowercase();
898
899    // Convert hash to bytes
900    let sha256_bytes = match from_hex(&sha256_hex) {
901        Ok(b) => b,
902        Err(_) => {
903            return Response::builder()
904                .status(StatusCode::BAD_REQUEST)
905                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
906                .header("X-Reason", "Invalid SHA256 hash format")
907                .body(Body::empty())
908                .unwrap();
909        }
910    };
911
912    // Verify authorization with hash requirement
913    let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
914        Ok(a) => a,
915        Err((status, reason)) => {
916            return Response::builder()
917                .status(status)
918                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
919                .header("X-Reason", reason)
920                .body(Body::empty())
921                .unwrap();
922        }
923    };
924
925    // Convert pubkey hex to bytes
926    let pubkey_bytes = match from_hex(&auth.pubkey) {
927        Ok(b) => b,
928        Err(_) => {
929            return Response::builder()
930                .status(StatusCode::BAD_REQUEST)
931                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
932                .header("X-Reason", "Invalid pubkey format")
933                .body(Body::empty())
934                .unwrap();
935        }
936    };
937
938    // Check ownership - user must be one of the owners (O(1) lookup with composite key)
939    match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
940        Ok(true) => {
941            // User is an owner, proceed with delete
942        }
943        Ok(false) => {
944            // Check if blob exists at all (for proper error message)
945            match state.store.blob_has_owners(&sha256_bytes) {
946                Ok(true) => {
947                    return Response::builder()
948                        .status(StatusCode::FORBIDDEN)
949                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
950                        .header("X-Reason", "Not a blob owner")
951                        .body(Body::empty())
952                        .unwrap();
953                }
954                Ok(false) => {
955                    return Response::builder()
956                        .status(StatusCode::NOT_FOUND)
957                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
958                        .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
959                        .header("X-Reason", "Blob not found")
960                        .body(Body::empty())
961                        .unwrap();
962                }
963                Err(_) => {
964                    return Response::builder()
965                        .status(StatusCode::INTERNAL_SERVER_ERROR)
966                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
967                        .body(Body::empty())
968                        .unwrap();
969                }
970            }
971        }
972        Err(_) => {
973            return Response::builder()
974                .status(StatusCode::INTERNAL_SERVER_ERROR)
975                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
976                .body(Body::empty())
977                .unwrap();
978        }
979    }
980
981    // Remove this user's ownership (blob only deleted when no owners remain)
982    match state
983        .store
984        .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
985    {
986        Ok(fully_deleted) => {
987            // Return 200 OK whether blob was fully deleted or just removed from user's list
988            // The client doesn't need to know if other owners still exist
989            Response::builder()
990                .status(StatusCode::OK)
991                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
992                .header(
993                    "X-Blob-Deleted",
994                    if fully_deleted { "true" } else { "false" },
995                )
996                .body(Body::empty())
997                .unwrap()
998        }
999        Err(_) => Response::builder()
1000            .status(StatusCode::INTERNAL_SERVER_ERROR)
1001            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1002            .body(Body::empty())
1003            .unwrap(),
1004    }
1005}
1006
1007/// GET /list/<pubkey> - List blobs for a pubkey (BUD-02)
1008pub async fn list_blobs(
1009    State(state): State<AppState>,
1010    Path(pubkey): Path<String>,
1011    Query(query): Query<ListQuery>,
1012    headers: HeaderMap,
1013) -> impl IntoResponse {
1014    // Validate pubkey format (64 hex chars)
1015    if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1016        return Response::builder()
1017            .status(StatusCode::BAD_REQUEST)
1018            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1019            .header("X-Reason", "Invalid pubkey format")
1020            .header(header::CONTENT_TYPE, "application/json")
1021            .body(Body::from("[]"))
1022            .unwrap();
1023    }
1024
1025    let pubkey_hex = pubkey.to_lowercase();
1026    let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1027        Ok(b) => b,
1028        Err(_) => {
1029            return Response::builder()
1030                .status(StatusCode::BAD_REQUEST)
1031                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1032                .header("X-Reason", "Invalid pubkey format")
1033                .header(header::CONTENT_TYPE, "application/json")
1034                .body(Body::from("[]"))
1035                .unwrap()
1036        }
1037    };
1038
1039    let auth = match verify_blossom_auth(&headers, "list", None) {
1040        Ok(auth) => auth,
1041        Err((status, reason)) => {
1042            return Response::builder()
1043                .status(status)
1044                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1045                .header("X-Reason", reason)
1046                .header(header::CONTENT_TYPE, "application/json")
1047                .body(Body::from("[]"))
1048                .unwrap();
1049        }
1050    };
1051
1052    if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1053        return Response::builder()
1054            .status(StatusCode::FORBIDDEN)
1055            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1056            .header("X-Reason", "Pubkey mismatch")
1057            .header(header::CONTENT_TYPE, "application/json")
1058            .body(Body::from("[]"))
1059            .unwrap();
1060    }
1061
1062    // Get blobs for this pubkey
1063    match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1064        Ok(blobs) => {
1065            // Apply filters
1066            let mut filtered: Vec<_> = blobs
1067                .into_iter()
1068                .filter(|b| {
1069                    if let Some(since) = query.since {
1070                        if b.uploaded < since {
1071                            return false;
1072                        }
1073                    }
1074                    if let Some(until) = query.until {
1075                        if b.uploaded > until {
1076                            return false;
1077                        }
1078                    }
1079                    true
1080                })
1081                .collect();
1082
1083            // Sort by uploaded descending (most recent first)
1084            filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1085
1086            // Apply limit
1087            let limit = query.limit.unwrap_or(100).min(1000);
1088            filtered.truncate(limit);
1089
1090            Response::builder()
1091                .status(StatusCode::OK)
1092                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1093                .header(header::CONTENT_TYPE, "application/json")
1094                .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1095                .unwrap()
1096        }
1097        Err(_) => Response::builder()
1098            .status(StatusCode::INTERNAL_SERVER_ERROR)
1099            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1100            .header(header::CONTENT_TYPE, "application/json")
1101            .body(Body::from("[]"))
1102            .unwrap(),
1103    }
1104}
1105
1106// Helper functions
1107
/// Splits `id` at its last `.` into `(stem, Some(extension))`, where the
/// extension keeps its leading dot. Returns `(id, None)` when `id` contains
/// no dot.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    match id.rfind('.') {
        Some(split_at) => {
            let (stem, extension) = id.split_at(split_at);
            (stem, Some(extension))
        }
        None => (id, None),
    }
}
1115
/// Returns `true` when `s` is exactly 64 ASCII hex digits (either case).
fn is_valid_sha256(s: &str) -> bool {
    if s.len() != 64 {
        return false;
    }
    // Any byte of a multi-byte UTF-8 character is >= 0x80, hence never a hex digit.
    s.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1119
/// Test-only helper that writes `data` into the store.
///
/// With `track_ownership` set, the blob is stored via `put_owned_blob` for
/// `pubkey`; otherwise it is stored via `put_cached_blob` and `pubkey` is
/// ignored. The `_sha256` argument is unused.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    if !track_ownership {
        state.store.put_cached_blob(data)?;
        return Ok(());
    }

    state.store.put_owned_blob(data, pubkey)?;
    Ok(())
}
1136
/// Maps a MIME type to a file extension with its leading dot.
///
/// Matching is exact; any MIME type not in the table yields an empty string.
fn mime_to_extension(mime: &str) -> &'static str {
    const EXTENSIONS: [(&str, &str); 13] = [
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    EXTENSIONS
        .iter()
        .find(|&&(known, _)| known == mime)
        .map_or("", |&(_, extension)| extension)
}
1155
1156#[cfg(test)]
1157mod tests {
1158    use super::*;
1159    use crate::server::auth::WsRelayState;
1160    use crate::storage::HashtreeStore;
1161    use axum::response::IntoResponse;
1162    use base64::Engine;
1163    use hashtree_core::sha256;
1164    use std::collections::HashSet;
1165    use std::sync::{Arc, Mutex as StdMutex};
1166    use std::time::Duration;
1167    use tempfile::TempDir;
1168
1169    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
1170        AppState {
1171            store,
1172            auth: None,
1173            daemon_started_at: 1_700_000_000,
1174            peer_mode: crate::config::ServerMode::Normal,
1175            hash_get_enabled: true,
1176            http_webrtc_fetch: true,
1177            webrtc_peers: None,
1178            fips_transport: None,
1179            fetch_from_fips_peers: true,
1180            ws_relay: Arc::new(WsRelayState::new()),
1181            max_upload_bytes: 5 * 1024 * 1024,
1182            public_writes: true,
1183            require_random_untrusted_ingest: true,
1184            optimistic_blossom_uploads: false,
1185            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
1186            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
1187            allowed_pubkeys: HashSet::new(),
1188            upstream_blossom: Vec::new(),
1189            social_graph: None,
1190            social_graph_store: None,
1191            social_graph_root: None,
1192            socialgraph_snapshot_public: false,
1193            nostr_relay: None,
1194            nostr_relay_urls: Vec::new(),
1195            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
1196            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
1197                std::collections::HashMap::new(),
1198            )),
1199            inflight_blob_reads: Arc::new(
1200                tokio::sync::Mutex::new(std::collections::HashMap::new()),
1201            ),
1202            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
1203            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1204            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1205            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1206            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
1207        }
1208    }
1209
1210    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
1211        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};
1212
1213        let now = Timestamp::now();
1214        let event = EventBuilder::new(
1215            Kind::Custom(BLOSSOM_AUTH_KIND),
1216            "",
1217            vec![
1218                Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
1219                Tag::custom(
1220                    TagKind::Custom("expiration".into()),
1221                    vec![(now.as_u64() + 300).to_string()],
1222                ),
1223            ],
1224        )
1225        .custom_created_at(now)
1226        .to_event(keys)
1227        .expect("sign blossom auth");
1228        let json = serde_json::to_vec(&event).expect("serialize auth event");
1229        format!(
1230            "Nostr {}",
1231            base64::engine::general_purpose::STANDARD.encode(json)
1232        )
1233    }
1234
1235    #[test]
1236    fn test_is_valid_sha256() {
1237        assert!(is_valid_sha256(
1238            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1239        ));
1240        assert!(is_valid_sha256(
1241            "0000000000000000000000000000000000000000000000000000000000000000"
1242        ));
1243
1244        // Too short
1245        assert!(!is_valid_sha256("e2bab35b5296ec2242ded0a01f6d6723"));
1246        // Too long
1247        assert!(!is_valid_sha256(
1248            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa"
1249        ));
1250        // Invalid chars
1251        assert!(!is_valid_sha256(
1252            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1253        ));
1254        // Empty
1255        assert!(!is_valid_sha256(""));
1256    }
1257
1258    #[test]
1259    fn test_parse_hash_and_extension() {
1260        let (hash, ext) = parse_hash_and_extension("abc123.png");
1261        assert_eq!(hash, "abc123");
1262        assert_eq!(ext, Some(".png"));
1263
1264        let (hash2, ext2) = parse_hash_and_extension("abc123");
1265        assert_eq!(hash2, "abc123");
1266        assert_eq!(ext2, None);
1267
1268        let (hash3, ext3) = parse_hash_and_extension("abc.123.jpg");
1269        assert_eq!(hash3, "abc.123");
1270        assert_eq!(ext3, Some(".jpg"));
1271    }
1272
1273    #[test]
1274    fn test_mime_to_extension() {
1275        assert_eq!(mime_to_extension("image/png"), ".png");
1276        assert_eq!(mime_to_extension("image/jpeg"), ".jpg");
1277        assert_eq!(mime_to_extension("video/mp4"), ".mp4");
1278        assert_eq!(mime_to_extension("application/octet-stream"), "");
1279        assert_eq!(mime_to_extension("unknown/type"), "");
1280    }
1281
1282    #[tokio::test]
1283    async fn optimistic_uploads_return_accepted_and_store_in_background() {
1284        let temp_dir = TempDir::new().expect("temp dir");
1285        let store = Arc::new(
1286            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1287        );
1288        let mut state = test_app_state(Arc::clone(&store));
1289        state.optimistic_blossom_uploads = true;
1290
1291        let keys = nostr::Keys::generate();
1292        let mut headers = HeaderMap::new();
1293        headers.insert(
1294            header::AUTHORIZATION,
1295            create_upload_auth_header(&keys)
1296                .parse()
1297                .expect("auth header value"),
1298        );
1299        headers.insert(
1300            header::CONTENT_TYPE,
1301            "application/octet-stream"
1302                .parse()
1303                .expect("content type header value"),
1304        );
1305
1306        let body = axum::body::Bytes::from((0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>());
1307        let hash = sha256(&body);
1308        let response = upload_blob(State(state), headers, body)
1309            .await
1310            .into_response();
1311        assert_eq!(response.status(), StatusCode::ACCEPTED);
1312
1313        for _ in 0..50 {
1314            if store.blob_exists(&hash).expect("blob exists check") {
1315                return;
1316            }
1317            tokio::time::sleep(Duration::from_millis(10)).await;
1318        }
1319
1320        panic!("optimistic upload was not stored in the background");
1321    }
1322
1323    #[tokio::test]
1324    async fn optimistic_upload_existing_blob_skips_queue() {
1325        let temp_dir = TempDir::new().expect("temp dir");
1326        let store = Arc::new(
1327            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1328        );
1329        let body = axum::body::Bytes::from(
1330            (0u16..=255)
1331                .map(|value| ((value * 73 + 19) % 256) as u8)
1332                .collect::<Vec<_>>(),
1333        );
1334        store.put_cached_blob(&body).expect("seed blob");
1335
1336        let mut state = test_app_state(Arc::clone(&store));
1337        state.optimistic_blossom_uploads = true;
1338        state.optimistic_upload_queue_bytes = 1;
1339        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1340
1341        let keys = nostr::Keys::generate();
1342        let mut headers = HeaderMap::new();
1343        headers.insert(
1344            header::AUTHORIZATION,
1345            create_upload_auth_header(&keys)
1346                .parse()
1347                .expect("auth header value"),
1348        );
1349        headers.insert(
1350            header::CONTENT_TYPE,
1351            "application/octet-stream"
1352                .parse()
1353                .expect("content type header value"),
1354        );
1355
1356        let response = upload_blob(State(state), headers, body)
1357            .await
1358            .into_response();
1359        assert_eq!(response.status(), StatusCode::CONFLICT);
1360    }
1361
1362    #[tokio::test]
1363    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
1364        let temp_dir = TempDir::new().expect("temp dir");
1365        let store = Arc::new(
1366            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1367        );
1368        let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
1369        let hash_hex = hex::encode(sha256(&body));
1370        store.put_cached_blob(&body).expect("seed blob");
1371
1372        let mut state = test_app_state(store);
1373        state.optimistic_blossom_uploads = true;
1374
1375        let keys = nostr::Keys::generate();
1376        let mut headers = HeaderMap::new();
1377        headers.insert(
1378            header::AUTHORIZATION,
1379            create_upload_auth_header(&keys)
1380                .parse()
1381                .expect("auth header value"),
1382        );
1383        headers.insert(
1384            header::CONTENT_TYPE,
1385            "application/octet-stream"
1386                .parse()
1387                .expect("content type header value"),
1388        );
1389
1390        let response = upload_blob(State(state), headers, body)
1391            .await
1392            .into_response();
1393        assert_eq!(response.status(), StatusCode::ACCEPTED);
1394
1395        for _ in 0..50 {
1396            if !optimistic_upload_is_inflight(&hash_hex) {
1397                return;
1398            }
1399            tokio::time::sleep(Duration::from_millis(10)).await;
1400        }
1401
1402        clear_optimistic_upload_inflight(&hash_hex);
1403        panic!("optimistic upload in-flight marker was not cleared");
1404    }
1405
1406    #[tokio::test]
1407    async fn optimistic_upload_inflight_duplicate_skips_queue() {
1408        let temp_dir = TempDir::new().expect("temp dir");
1409        let store = Arc::new(
1410            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1411        );
1412        let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
1413        let hash_hex = hex::encode(sha256(&body));
1414        assert!(mark_optimistic_upload_inflight(&hash_hex));
1415
1416        let mut state = test_app_state(store);
1417        state.optimistic_blossom_uploads = true;
1418        state.optimistic_upload_queue_bytes = 1;
1419        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1420
1421        let keys = nostr::Keys::generate();
1422        let mut headers = HeaderMap::new();
1423        headers.insert(
1424            header::AUTHORIZATION,
1425            create_upload_auth_header(&keys)
1426                .parse()
1427                .expect("auth header value"),
1428        );
1429        headers.insert(
1430            header::CONTENT_TYPE,
1431            "application/octet-stream"
1432                .parse()
1433                .expect("content type header value"),
1434        );
1435
1436        let response = upload_blob(State(state), headers, body)
1437            .await
1438            .into_response();
1439        clear_optimistic_upload_inflight(&hash_hex);
1440        assert_eq!(response.status(), StatusCode::ACCEPTED);
1441    }
1442
1443    #[test]
1444    fn public_writes_accept_unlisted_authors_for_uploads() {
1445        let temp_dir = TempDir::new().expect("temp dir");
1446        let store =
1447            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1448        let mut state = test_app_state(store);
1449        let pubkey = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";
1450
1451        state.public_writes = true;
1452        assert!(can_accept_upload_author(&state, pubkey));
1453        assert!(!is_allowed_write_author(&state, pubkey));
1454
1455        state.public_writes = false;
1456        assert!(!can_accept_upload_author(&state, pubkey));
1457    }
1458
1459    #[test]
1460    fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
1461        let encrypted_block: Vec<u8> = (0..=255).collect();
1462
1463        assert_eq!(
1464            validate_upload_payload(&encrypted_block, "application/octet-stream", false, true,),
1465            Ok(())
1466        );
1467
1468        assert_eq!(
1469            validate_upload_payload(b"audio bytes", "audio/mpeg", true, true,),
1470            Ok(())
1471        );
1472
1473        assert_eq!(
1474            validate_upload_payload(b"audio bytes", "audio/mpeg", false, true,),
1475            Err((
1476                StatusCode::FORBIDDEN,
1477                "Raw media uploads require write access".to_string(),
1478            ))
1479        );
1480    }
1481
1482    #[test]
1483    fn unowned_public_uploads_use_cache_storage_semantics() {
1484        let temp_dir = TempDir::new().expect("temp dir");
1485        let store =
1486            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1487        let state = test_app_state(Arc::clone(&store));
1488
1489        let owned = vec![1u8; 280];
1490        let owned_hash = sha256(&owned);
1491        store_blossom_blob(&state, &owned, &owned_hash, &[2u8; 32], true).expect("owned upload");
1492
1493        let public_upload = vec![3u8; 280];
1494        let public_hash = sha256(&public_upload);
1495        store_blossom_blob(&state, &public_upload, &public_hash, &[4u8; 32], false)
1496            .expect("public upload");
1497
1498        let replacement = vec![5u8; 280];
1499        let replacement_hash = sha256(&replacement);
1500        state
1501            .store
1502            .put_cached_blob(&replacement)
1503            .expect("replacement cached blob");
1504
1505        assert!(state.store.blob_exists(&owned_hash).expect("owned exists"));
1506        assert!(!state
1507            .store
1508            .blob_exists(&public_hash)
1509            .expect("public upload evicted"));
1510        assert!(state
1511            .store
1512            .blob_exists(&replacement_hash)
1513            .expect("replacement exists"));
1514        assert!(state
1515            .store
1516            .is_blob_owner(&owned_hash, &[2u8; 32])
1517            .expect("owned tracked"));
1518        assert!(!state
1519            .store
1520            .blob_has_owners(&public_hash)
1521            .expect("public upload unowned"));
1522    }
1523
1524    #[test]
1525    fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
1526        let temp_dir = TempDir::new().expect("temp dir");
1527        let store =
1528            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
1529        let state = test_app_state(Arc::clone(&store));
1530
1531        let first = vec![1u8; 300];
1532        let first_hash = sha256(&first);
1533        let owner = [2u8; 32];
1534        store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");
1535
1536        let second = vec![3u8; 300];
1537        let second_hash = sha256(&second);
1538        let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
1539            .expect_err("second owned upload should exceed the storage limit");
1540
1541        assert!(
1542            error.to_string().contains("storage limit"),
1543            "unexpected error: {error}"
1544        );
1545        assert!(state
1546            .store
1547            .blob_exists(&first_hash)
1548            .expect("first blob remains"));
1549        assert!(!state
1550            .store
1551            .blob_exists(&second_hash)
1552            .expect("second blob rejected"));
1553        assert!(state
1554            .store
1555            .is_blob_owner(&first_hash, &owner)
1556            .expect("first owner tracked"));
1557        assert!(!state
1558            .store
1559            .is_blob_owner(&second_hash, &owner)
1560            .expect("second owner not tracked"));
1561    }
1562}