Skip to main content

hashtree_cli/server/
blossom.rs

1//! Blossom protocol implementation (BUD-01, BUD-02)
2//!
3//! Implements blob storage endpoints with Nostr-based authentication.
4//! See: https://github.com/hzrd149/blossom
5
6use axum::{
7    body::Body,
8    extract::{Path, Query, State},
9    http::{header, HeaderMap, Response, StatusCode},
10    response::IntoResponse,
11};
12use base64::Engine;
13use hashtree_core::from_hex;
14use serde::{Deserialize, Serialize};
15use sha2::{Digest, Sha256};
16use std::collections::HashSet;
17use std::sync::{Mutex, OnceLock};
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19
20use super::auth::AppState;
21use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
22use super::ingest_filter::{
23    content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
24};
25use super::mime::get_mime_type;
26
/// Blossom authorization event kind (NIP-98 style)
const BLOSSOM_AUTH_KIND: u16 = 24242;

/// Cache-Control header for immutable content-addressed data (1 year)
const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
/// Cache-Control value that forbids caching; in this file it is attached to
/// the `503` "read busy" response of `head_blob`.
const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
/// Cache-Control value sent with the `404` of `head_blob`: browsers must not
/// reuse it (`max-age=0`), shared caches may keep it for 5 seconds.
const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
/// Name of the environment variable that overrides the optimistic upload
/// queue wait, in milliseconds.
const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
/// Queue wait (15 s) used when the environment variable is unset, unparsable or zero.
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;

/// Default maximum upload size in bytes (5 MiB)
pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
/// Smallest number of queue bytes (256 KiB) charged to an optimistic upload,
/// regardless of how small the body is.
const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
40
/// Point-in-time view of the optimistic upload queue, as produced by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Whether optimistic Blossom uploads are switched on in the app state.
    pub enabled: bool,
    /// Configured byte budget of the queue.
    pub max_bytes: usize,
    /// Bytes currently free in the queue (never reported above `max_bytes`).
    pub available_bytes: usize,
    /// Bytes currently charged to queued uploads (`max_bytes - available_bytes`).
    pub reserved_bytes: usize,
    /// Number of blob hashes currently marked as in flight.
    pub in_flight: usize,
    /// How long an upload may wait for queue capacity, in milliseconds.
    pub queue_timeout_ms: u64,
}
50
51/// Check if a pubkey has write access based on allowed_npubs config or social graph
52/// Returns Ok(()) if allowed, Err with JSON error body if denied
53#[allow(clippy::result_large_err)]
54fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
55    // Check if pubkey is in the allowed list (converted from npub to hex)
56    if is_allowed_write_author(state, pubkey) {
57        tracing::debug!(
58            "Blossom write allowed for {}... (allowed writer)",
59            &pubkey[..8.min(pubkey.len())]
60        );
61        return Ok(());
62    }
63
64    // Not in allowed list or social graph
65    tracing::info!(
66        "Blossom write denied for {}... (not in allowed_npubs or social graph)",
67        &pubkey[..8.min(pubkey.len())]
68    );
69    Err(Response::builder()
70        .status(StatusCode::FORBIDDEN)
71        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
72        .header(header::CONTENT_TYPE, "application/json")
73        .body(Body::from(
74            r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
75        ))
76        .unwrap())
77}
78
79fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
80    if state.allowed_pubkeys.contains(pubkey) {
81        return true;
82    }
83
84    state
85        .social_graph
86        .as_ref()
87        .map(|sg| sg.check_write_access(pubkey))
88        .unwrap_or(false)
89}
90
91fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
92    state.public_writes || is_allowed_write_author(state, pubkey)
93}
94
95fn validate_upload_payload(
96    body: &[u8],
97    content_type: &str,
98    is_allowed_writer: bool,
99    require_random_untrusted_ingest: bool,
100) -> Result<(), (StatusCode, String)> {
101    let is_chk_upload = is_chk_content_type(content_type);
102
103    if !is_chk_upload && !is_allowed_writer {
104        return Err((
105            StatusCode::FORBIDDEN,
106            "Raw media uploads require write access".to_string(),
107        ));
108    }
109
110    if is_chk_upload {
111        validate_untrusted_blob(body, require_random_untrusted_ingest)
112            .map_err(|IngestRejection { status, reason }| (status, reason))?;
113    }
114
115    Ok(())
116}
117
118fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
119    let reason = reason.into();
120    Response::builder()
121        .status(status)
122        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
123        .header("X-Reason", reason.as_str())
124        .header(header::CONTENT_TYPE, "application/json")
125        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
126        .unwrap()
127}
128
129fn blossom_retryable_json_error(
130    status: StatusCode,
131    reason: impl Into<String>,
132    retry_after_seconds: u64,
133) -> Response<Body> {
134    let reason = reason.into();
135    Response::builder()
136        .status(status)
137        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
138        .header("X-Reason", reason.as_str())
139        .header(header::RETRY_AFTER, retry_after_seconds.to_string())
140        .header(header::CONTENT_TYPE, "application/json")
141        .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
142        .unwrap()
143}
144
/// Blob descriptor returned by upload and list endpoints
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Location of the blob; `upload_blob` fills it with `/<sha256><ext>`.
    pub url: String,
    /// Lowercase hex SHA-256 of the blob contents.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Upload time as seconds since the Unix epoch.
    pub uploaded: u64,
}
155
/// Query parameters for list endpoint
///
/// All fields are optional. The handler that consumes them is not part of
/// this section, so the notes below are assumptions to confirm against it.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Presumably a lower bound on upload time (Unix seconds) — TODO confirm.
    pub since: Option<u64>,
    /// Presumably an upper bound on upload time (Unix seconds) — TODO confirm.
    pub until: Option<u64>,
    /// Presumably the maximum number of descriptors to return — TODO confirm.
    pub limit: Option<usize>,
    /// Presumably an opaque pagination cursor — TODO confirm.
    pub cursor: Option<String>,
}
164
/// Parsed Nostr authorization event
///
/// Produced by `verify_blossom_auth` once the signature and the basic
/// time/action checks have passed.
#[derive(Debug)]
pub struct BlossomAuth {
    /// Signer's public key as taken from the event's `pubkey` field.
    pub pubkey: String,
    /// Event kind; always the Blossom auth kind once verification succeeded.
    pub kind: u16,
    /// Event creation time (`created_at` field of the event).
    pub created_at: u64,
    /// Value of the `expiration` tag, if the event carried one.
    pub expiration: Option<u64>,
    pub action: Option<String>,   // "upload", "delete", "list", "get"
    pub blob_hashes: Vec<String>, // x tags
    pub server: Option<String>,   // server tag
}
176
177/// Parse and verify Nostr authorization from header
178/// Returns the verified auth or an error response
179pub fn verify_blossom_auth(
180    headers: &HeaderMap,
181    required_action: &str,
182    required_hash: Option<&str>,
183) -> Result<BlossomAuth, (StatusCode, &'static str)> {
184    let auth_header = headers
185        .get(header::AUTHORIZATION)
186        .and_then(|v| v.to_str().ok())
187        .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
188
189    let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
190        StatusCode::UNAUTHORIZED,
191        "Invalid auth scheme, expected 'Nostr'",
192    ))?;
193
194    // Decode base64 event
195    let engine = base64::engine::general_purpose::STANDARD;
196    let event_bytes = engine
197        .decode(nostr_event)
198        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
199
200    let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
201        .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
202
203    // Extract event fields
204    let kind = event_json["kind"]
205        .as_u64()
206        .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
207
208    if kind != BLOSSOM_AUTH_KIND as u64 {
209        return Err((
210            StatusCode::BAD_REQUEST,
211            "Invalid event kind, expected 24242",
212        ));
213    }
214
215    let pubkey = event_json["pubkey"]
216        .as_str()
217        .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
218        .to_string();
219
220    let created_at = event_json["created_at"]
221        .as_u64()
222        .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
223
224    let sig = event_json["sig"]
225        .as_str()
226        .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
227
228    // Verify signature
229    if !verify_nostr_signature(&event_json, &pubkey, sig) {
230        return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
231    }
232
233    // Parse tags
234    let tags = event_json["tags"]
235        .as_array()
236        .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
237
238    let mut expiration: Option<u64> = None;
239    let mut action: Option<String> = None;
240    let mut blob_hashes: Vec<String> = Vec::new();
241    let mut server: Option<String> = None;
242
243    for tag in tags {
244        let tag_arr = tag.as_array();
245        if let Some(arr) = tag_arr {
246            if arr.len() >= 2 {
247                let tag_name = arr[0].as_str().unwrap_or("");
248                let tag_value = arr[1].as_str().unwrap_or("");
249
250                match tag_name {
251                    "t" => action = Some(tag_value.to_string()),
252                    "x" => blob_hashes.push(tag_value.to_lowercase()),
253                    "expiration" => expiration = tag_value.parse().ok(),
254                    "server" => server = Some(tag_value.to_string()),
255                    _ => {}
256                }
257            }
258        }
259    }
260
261    // Validate expiration
262    let now = SystemTime::now()
263        .duration_since(UNIX_EPOCH)
264        .unwrap()
265        .as_secs();
266
267    if let Some(exp) = expiration {
268        if exp < now {
269            return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
270        }
271    }
272
273    // Validate created_at is not in the future (with 60s tolerance)
274    if created_at > now + 60 {
275        return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
276    }
277
278    // Validate action matches
279    if let Some(ref act) = action {
280        if act != required_action {
281            return Err((StatusCode::FORBIDDEN, "Action mismatch"));
282        }
283    } else {
284        return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
285    }
286
287    // Validate hash if required
288    if let Some(hash) = required_hash {
289        if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
290            return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
291        }
292    }
293
294    Ok(BlossomAuth {
295        pubkey,
296        kind: kind as u16,
297        created_at,
298        expiration,
299        action,
300        blob_hashes,
301        server,
302    })
303}
304
305/// Verify Nostr event signature using secp256k1
306fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
307    use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
308
309    // Compute event ID (sha256 of serialized event)
310    let content = event["content"].as_str().unwrap_or("");
311    let full_serialized = format!(
312        "[0,\"{}\",{},{},{},\"{}\"]",
313        pubkey,
314        event["created_at"],
315        event["kind"],
316        event["tags"],
317        escape_json_string(content),
318    );
319
320    let mut hasher = Sha256::new();
321    hasher.update(full_serialized.as_bytes());
322    let event_id = hasher.finalize();
323
324    // Parse pubkey and signature
325    let pubkey_bytes = match hex::decode(pubkey) {
326        Ok(b) => b,
327        Err(_) => return false,
328    };
329
330    let sig_bytes = match hex::decode(sig) {
331        Ok(b) => b,
332        Err(_) => return false,
333    };
334
335    let secp = Secp256k1::verification_only();
336
337    let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
338        Ok(pk) => pk,
339        Err(_) => return false,
340    };
341
342    let signature = match Signature::from_slice(&sig_bytes) {
343        Ok(s) => s,
344        Err(_) => return false,
345    };
346
347    let message = match Message::from_digest_slice(&event_id) {
348        Ok(m) => m,
349        Err(_) => return false,
350    };
351
352    secp.verify_schnorr(&signature, &message, &xonly_pubkey)
353        .is_ok()
354}
355
/// Escape string for JSON serialization
///
/// Produces the escaping used when serializing a Nostr event for hashing:
/// `"`, `\`, line feed, carriage return, tab, backspace and form feed use
/// their short escapes; the remaining C0 control characters use `\u00XX`;
/// every other character (including DEL and non-ASCII text) is copied
/// verbatim. Emitting anything else changes the computed event id and makes
/// otherwise valid signatures fail.
fn escape_json_string(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => result.push_str("\\\""),
            '\\' => result.push_str("\\\\"),
            '\n' => result.push_str("\\n"),
            '\r' => result.push_str("\\r"),
            '\t' => result.push_str("\\t"),
            '\u{08}' => result.push_str("\\b"),
            '\u{0c}' => result.push_str("\\f"),
            // Only C0 controls need escaping to stay valid JSON; DEL and the
            // C1 range must pass through unchanged.
            c if (c as u32) < 0x20 => {
                result.push_str(&format!("\\u{:04x}", c as u32));
            }
            c => result.push(c),
        }
    }
    result
}
374
375/// CORS preflight handler for all Blossom endpoints
376/// Echoes back Access-Control-Request-Headers to allow any headers
377pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
378    // Echo back requested headers, or use sensible defaults that cover common Blossom headers
379    let allowed_headers = headers
380        .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
381        .and_then(|v| v.to_str().ok())
382        .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
383
384    // Always include common headers in addition to what was requested
385    let full_allowed = format!(
386        "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
387        allowed_headers
388    );
389
390    Response::builder()
391        .status(StatusCode::NO_CONTENT)
392        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
393        .header(
394            header::ACCESS_CONTROL_ALLOW_METHODS,
395            "GET, HEAD, PUT, DELETE, OPTIONS",
396        )
397        .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
398        .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
399        .body(Body::empty())
400        .unwrap()
401}
402
403/// HEAD /<sha256> - Check if blob exists
404pub async fn head_blob(
405    State(state): State<AppState>,
406    Path(id): Path<String>,
407    connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
408) -> impl IntoResponse {
409    let is_localhost = connect_info.0.ip().is_loopback();
410    let (hash_part, ext) = parse_hash_and_extension(&id);
411
412    if !is_valid_sha256(hash_part) {
413        return Response::builder()
414            .status(StatusCode::BAD_REQUEST)
415            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
416            .header("X-Reason", "Invalid SHA256 hash")
417            .body(Body::empty())
418            .unwrap();
419    }
420
421    let sha256_hex = hash_part.to_lowercase();
422    let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
423        Ok(b) => b,
424        Err(_) => {
425            return Response::builder()
426                .status(StatusCode::BAD_REQUEST)
427                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
428                .header("X-Reason", "Invalid SHA256 format")
429                .body(Body::empty())
430                .unwrap()
431        }
432    };
433
434    // Blossom HEAD only needs metadata; avoid reading the full blob body just to
435    // answer cache probes and CDN revalidation. The read permit keeps CDN probe
436    // storms from filling Tokio's blocking thread pool while old blobs without
437    // metadata are still being normalized.
438    let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
439        Ok(Ok(cached))
440    } else {
441        let permit = match acquire_blob_read().await {
442            Ok(permit) => permit,
443            Err(_) => {
444                return Response::builder()
445                    .status(StatusCode::SERVICE_UNAVAILABLE)
446                    .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
447                    .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
448                    .header("Retry-After", "1")
449                    .header("X-Reason", BLOB_READ_BUSY)
450                    .body(Body::empty())
451                    .unwrap()
452            }
453        };
454        let store = state.store.clone();
455        let size_read = tokio::task::spawn_blocking(move || {
456            let _permit = permit;
457            store.blob_size(&sha256_bytes)
458        });
459        let result = match tokio::time::timeout(blob_read_timeout(), size_read).await {
460            Ok(result) => result.map_err(|_| ()),
461            Err(_) => Err(()),
462        };
463        if let Ok(Ok(size)) = result {
464            state.blob_cache.put_size(sha256_hex.clone(), size);
465        }
466        result
467    };
468
469    match blob_size {
470        Ok(Ok(Some(size))) => {
471            let mime_type = ext
472                .map(|e| get_mime_type(&format!("file{}", e)))
473                .unwrap_or("application/octet-stream");
474
475            let mut builder = Response::builder()
476                .status(StatusCode::OK)
477                .header(header::CONTENT_TYPE, mime_type)
478                .header(header::CONTENT_LENGTH, size)
479                .header(header::ACCEPT_RANGES, "bytes")
480                .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
481                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
482            if is_localhost {
483                builder = builder.header("X-Source", "local");
484            }
485            builder.body(Body::empty()).unwrap()
486        }
487        Ok(Ok(None)) => Response::builder()
488            .status(StatusCode::NOT_FOUND)
489            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
490            .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
491            .header("X-Reason", "Blob not found")
492            .body(Body::empty())
493            .unwrap(),
494        _ => Response::builder()
495            .status(StatusCode::INTERNAL_SERVER_ERROR)
496            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
497            .body(Body::empty())
498            .unwrap(),
499    }
500}
501
502async fn store_blossom_blob_without_blocking_runtime(
503    state: &AppState,
504    data: axum::body::Bytes,
505    pubkey: [u8; 32],
506    track_ownership: bool,
507) -> anyhow::Result<()> {
508    let mut hasher = Sha256::new();
509    hasher.update(&data);
510    let hash_hex = hex::encode(hasher.finalize());
511    let data_for_cache = data.clone();
512    let permit = acquire_blob_write()
513        .await
514        .map_err(|err| anyhow::anyhow!(err))?;
515    let store = state.store.clone();
516    tokio::task::spawn_blocking(move || {
517        let _permit = permit;
518        if track_ownership {
519            store.put_owned_blob(&data, &pubkey)?;
520        } else {
521            store.put_cached_blob(&data)?;
522        }
523        Ok::<(), anyhow::Error>(())
524    })
525    .await
526    .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
527    state
528        .blob_cache
529        .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
530    state.blob_cache.put_body(hash_hex, &data_for_cache);
531    Ok(())
532}
533
534fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
535    Response::builder()
536        .status(status)
537        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
538        .header(header::CONTENT_TYPE, "application/json")
539        .body(Body::from(serde_json::to_string(descriptor).unwrap()))
540        .unwrap()
541}
542
543fn optimistic_upload_queue_timeout() -> Duration {
544    let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
545        .ok()
546        .and_then(|value| value.parse::<u64>().ok())
547        .filter(|value| *value > 0)
548        .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
549    Duration::from_millis(millis)
550}
551
/// Returns the process-wide set of blob hashes whose optimistic upload is
/// still being written, creating it empty on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static REGISTRY: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    REGISTRY.get_or_init(Mutex::default)
}
556
557fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
558    optimistic_upload_inflight()
559        .lock()
560        .is_ok_and(|inflight| inflight.contains(hash_hex))
561}
562
563fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
564    optimistic_upload_inflight()
565        .lock()
566        .map(|mut inflight| inflight.insert(hash_hex.to_string()))
567        .unwrap_or(true)
568}
569
570fn clear_optimistic_upload_inflight(hash_hex: &str) {
571    if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
572        inflight.remove(hash_hex);
573    }
574}
575
576pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
577    let max_bytes = state.optimistic_upload_queue_bytes;
578    let available_bytes = state
579        .optimistic_upload_queue
580        .available_permits()
581        .min(max_bytes);
582    let in_flight = optimistic_upload_inflight()
583        .lock()
584        .map(|inflight| inflight.len())
585        .unwrap_or(0);
586
587    OptimisticUploadQueueSnapshot {
588        enabled: state.optimistic_blossom_uploads,
589        max_bytes,
590        available_bytes,
591        reserved_bytes: max_bytes.saturating_sub(available_bytes),
592        in_flight,
593        queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
594    }
595}
596
/// Converts a duration to whole milliseconds, saturating at `u64::MAX`.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
600
601async fn acquire_optimistic_upload_queue(
602    state: &AppState,
603    permits: u32,
604) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
605    match tokio::time::timeout(
606        optimistic_upload_queue_timeout(),
607        state
608            .optimistic_upload_queue
609            .clone()
610            .acquire_many_owned(permits),
611    )
612    .await
613    {
614        Ok(Ok(permit)) => Ok(permit),
615        Ok(Err(_)) => Err("Optimistic upload queue is closed"),
616        Err(_) => Err("Optimistic upload queue is full"),
617    }
618}
619
620async fn uploaded_blob_already_exists(
621    state: &AppState,
622    sha256_hash: [u8; 32],
623    sha256_hex: &str,
624) -> Result<bool, String> {
625    if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
626        return Ok(true);
627    }
628
629    let permit = acquire_blob_read().await.map_err(str::to_string)?;
630    let store = state.store.clone();
631    let size_read = tokio::task::spawn_blocking(move || {
632        let _permit = permit;
633        store
634            .blob_size(&sha256_hash)
635            .map_err(|error| error.to_string())
636    });
637
638    match tokio::time::timeout(blob_read_timeout(), size_read).await {
639        Ok(Ok(Ok(size))) => {
640            state.blob_cache.put_size(sha256_hex.to_string(), size);
641            Ok(size.is_some())
642        }
643        Ok(Ok(Err(error))) => Err(error),
644        Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
645        Err(_) => Err("blob existence check timed out".to_string()),
646    }
647}
648
649async fn set_existing_blob_owner_without_body_write(
650    state: AppState,
651    sha256_hash: [u8; 32],
652    pubkey: [u8; 32],
653) -> anyhow::Result<()> {
654    tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
655        .await
656        .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
657    Ok(())
658}
659
660/// PUT /upload - Upload a new blob (BUD-02)
661pub async fn upload_blob(
662    State(state): State<AppState>,
663    headers: HeaderMap,
664    body: axum::body::Bytes,
665) -> impl IntoResponse {
666    // Check size limit first (before auth to save resources)
667    let max_size = state.max_upload_bytes;
668    if body.len() > max_size {
669        return Response::builder()
670            .status(StatusCode::PAYLOAD_TOO_LARGE)
671            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
672            .header(header::CONTENT_TYPE, "application/json")
673            .body(Body::from(format!(
674                r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
675                body.len(),
676                max_size,
677                max_size / 1024 / 1024
678            )))
679            .unwrap();
680    }
681
682    // Verify authorization
683    let auth = match verify_blossom_auth(&headers, "upload", None) {
684        Ok(a) => a,
685        Err((status, reason)) => {
686            return Response::builder()
687                .status(status)
688                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
689                .header("X-Reason", reason)
690                .header(header::CONTENT_TYPE, "application/json")
691                .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
692                .unwrap();
693        }
694    };
695
696    // Get content type from header
697    let content_type = headers
698        .get(header::CONTENT_TYPE)
699        .and_then(|v| v.to_str().ok())
700        .unwrap_or("application/octet-stream")
701        .to_string();
702
703    // Check write access: either in allowed_npubs/social graph OR public_writes is enabled.
704    let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
705    let can_upload = can_accept_upload_author(&state, &auth.pubkey);
706    if !can_upload {
707        let _ = check_write_access(&state, &auth.pubkey);
708        return Response::builder()
709            .status(StatusCode::FORBIDDEN)
710            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
711            .header(header::CONTENT_TYPE, "application/json")
712            .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
713            .unwrap();
714    }
715
716    if let Err((status, reason)) = validate_upload_payload(
717        &body,
718        &content_type,
719        can_upload,
720        state.require_random_untrusted_ingest,
721    ) {
722        return blossom_json_error(status, reason);
723    }
724
725    // Compute SHA256 of uploaded data
726    let mut hasher = Sha256::new();
727    hasher.update(&body);
728    let sha256_hash: [u8; 32] = hasher.finalize().into();
729    let sha256_hex = hex::encode(sha256_hash);
730
731    // If auth has x tags, verify hash matches
732    if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
733        return Response::builder()
734            .status(StatusCode::FORBIDDEN)
735            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
736            .header(
737                "X-Reason",
738                "Uploaded blob hash does not match authorized hash",
739            )
740            .header(header::CONTENT_TYPE, "application/json")
741            .body(Body::from(r#"{"error":"Hash mismatch"}"#))
742            .unwrap();
743    }
744
745    // Convert pubkey hex to bytes
746    let pubkey_bytes = match from_hex(&auth.pubkey) {
747        Ok(b) => b,
748        Err(_) => {
749            return Response::builder()
750                .status(StatusCode::BAD_REQUEST)
751                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
752                .header("X-Reason", "Invalid pubkey format")
753                .body(Body::empty())
754                .unwrap();
755        }
756    };
757
758    let size = body.len() as u64;
759    let now = SystemTime::now()
760        .duration_since(UNIX_EPOCH)
761        .unwrap()
762        .as_secs();
763    let ext = mime_to_extension(&content_type_base(&content_type));
764    let descriptor = BlobDescriptor {
765        url: format!("/{}{}", sha256_hex, ext),
766        sha256: sha256_hex.clone(),
767        size,
768        mime_type: content_type,
769        uploaded: now,
770    };
771
772    if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
773        return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
774    }
775
776    // Store public-write blobs in cache storage unless the writer is explicitly
777    // allowed, so untrusted public uploads do not become protected owned data.
778    if state.optimistic_blossom_uploads {
779        let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
780        if queued_bytes <= state.optimistic_upload_queue_bytes {
781            let permits = queued_bytes as u32;
782            let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
783            if !marked_inflight {
784                return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
785            }
786            let permit = match acquire_optimistic_upload_queue(&state, permits).await {
787                Ok(permit) => permit,
788                Err(_) => {
789                    clear_optimistic_upload_inflight(&sha256_hex);
790                    return blossom_retryable_json_error(
791                        StatusCode::SERVICE_UNAVAILABLE,
792                        "Optimistic upload queue is full",
793                        2,
794                    );
795                }
796            };
797            let state_for_write = state.clone();
798            let hash_for_log = sha256_hex.clone();
799            tokio::spawn(async move {
800                let _permit = permit;
801                if let Err(error) = store_blossom_blob_without_blocking_runtime(
802                    &state_for_write,
803                    body,
804                    pubkey_bytes,
805                    is_allowed_author,
806                )
807                .await
808                {
809                    tracing::error!(
810                        "Background Blossom storage failed for {}: {:#}",
811                        hash_for_log,
812                        error
813                    );
814                }
815                clear_optimistic_upload_inflight(&hash_for_log);
816            });
817
818            return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
819        }
820
821        tracing::warn!(
822            "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
823            queued_bytes,
824            state.optimistic_upload_queue_bytes
825        );
826    }
827
828    match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
829        Ok(true) => {
830            if is_allowed_author {
831                if let Err(error) = set_existing_blob_owner_without_body_write(
832                    state.clone(),
833                    sha256_hash,
834                    pubkey_bytes,
835                )
836                .await
837                {
838                    return Response::builder()
839                        .status(StatusCode::INTERNAL_SERVER_ERROR)
840                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
841                        .header("X-Reason", "Storage error")
842                        .header(header::CONTENT_TYPE, "application/json")
843                        .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
844                        .unwrap();
845                }
846            }
847            return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
848        }
849        Ok(false) => {}
850        Err(error) => {
851            tracing::debug!(
852                "Could not preflight Blossom upload {} before synchronous storage: {}",
853                sha256_hex,
854                error
855            );
856        }
857    }
858
859    let store_result = store_blossom_blob_without_blocking_runtime(
860        &state,
861        body.clone(),
862        pubkey_bytes,
863        is_allowed_author,
864    )
865    .await;
866
867    match store_result {
868        Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
869        Err(e) => Response::builder()
870            .status(StatusCode::INTERNAL_SERVER_ERROR)
871            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
872            .header("X-Reason", "Storage error")
873            .header(header::CONTENT_TYPE, "application/json")
874            .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
875            .unwrap(),
876    }
877}
878
879/// DELETE /<sha256> - Delete a blob (BUD-02)
880/// Note: Blob is only fully deleted when ALL owners have removed it
881pub async fn delete_blob(
882    State(state): State<AppState>,
883    Path(id): Path<String>,
884    headers: HeaderMap,
885) -> impl IntoResponse {
886    let (hash_part, _) = parse_hash_and_extension(&id);
887
888    if !is_valid_sha256(hash_part) {
889        return Response::builder()
890            .status(StatusCode::BAD_REQUEST)
891            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
892            .header("X-Reason", "Invalid SHA256 hash")
893            .body(Body::empty())
894            .unwrap();
895    }
896
897    let sha256_hex = hash_part.to_lowercase();
898
899    // Convert hash to bytes
900    let sha256_bytes = match from_hex(&sha256_hex) {
901        Ok(b) => b,
902        Err(_) => {
903            return Response::builder()
904                .status(StatusCode::BAD_REQUEST)
905                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
906                .header("X-Reason", "Invalid SHA256 hash format")
907                .body(Body::empty())
908                .unwrap();
909        }
910    };
911
912    // Verify authorization with hash requirement
913    let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
914        Ok(a) => a,
915        Err((status, reason)) => {
916            return Response::builder()
917                .status(status)
918                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
919                .header("X-Reason", reason)
920                .body(Body::empty())
921                .unwrap();
922        }
923    };
924
925    // Convert pubkey hex to bytes
926    let pubkey_bytes = match from_hex(&auth.pubkey) {
927        Ok(b) => b,
928        Err(_) => {
929            return Response::builder()
930                .status(StatusCode::BAD_REQUEST)
931                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
932                .header("X-Reason", "Invalid pubkey format")
933                .body(Body::empty())
934                .unwrap();
935        }
936    };
937
938    // Check ownership - user must be one of the owners (O(1) lookup with composite key)
939    match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
940        Ok(true) => {
941            // User is an owner, proceed with delete
942        }
943        Ok(false) => {
944            // Check if blob exists at all (for proper error message)
945            match state.store.blob_has_owners(&sha256_bytes) {
946                Ok(true) => {
947                    return Response::builder()
948                        .status(StatusCode::FORBIDDEN)
949                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
950                        .header("X-Reason", "Not a blob owner")
951                        .body(Body::empty())
952                        .unwrap();
953                }
954                Ok(false) => {
955                    return Response::builder()
956                        .status(StatusCode::NOT_FOUND)
957                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
958                        .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
959                        .header("X-Reason", "Blob not found")
960                        .body(Body::empty())
961                        .unwrap();
962                }
963                Err(_) => {
964                    return Response::builder()
965                        .status(StatusCode::INTERNAL_SERVER_ERROR)
966                        .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
967                        .body(Body::empty())
968                        .unwrap();
969                }
970            }
971        }
972        Err(_) => {
973            return Response::builder()
974                .status(StatusCode::INTERNAL_SERVER_ERROR)
975                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
976                .body(Body::empty())
977                .unwrap();
978        }
979    }
980
981    // Remove this user's ownership (blob only deleted when no owners remain)
982    match state
983        .store
984        .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
985    {
986        Ok(fully_deleted) => {
987            // Return 200 OK whether blob was fully deleted or just removed from user's list
988            // The client doesn't need to know if other owners still exist
989            Response::builder()
990                .status(StatusCode::OK)
991                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
992                .header(
993                    "X-Blob-Deleted",
994                    if fully_deleted { "true" } else { "false" },
995                )
996                .body(Body::empty())
997                .unwrap()
998        }
999        Err(_) => Response::builder()
1000            .status(StatusCode::INTERNAL_SERVER_ERROR)
1001            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1002            .body(Body::empty())
1003            .unwrap(),
1004    }
1005}
1006
1007/// GET /list/<pubkey> - List blobs for a pubkey (BUD-02)
1008pub async fn list_blobs(
1009    State(state): State<AppState>,
1010    Path(pubkey): Path<String>,
1011    Query(query): Query<ListQuery>,
1012    headers: HeaderMap,
1013) -> impl IntoResponse {
1014    // Validate pubkey format (64 hex chars)
1015    if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1016        return Response::builder()
1017            .status(StatusCode::BAD_REQUEST)
1018            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1019            .header("X-Reason", "Invalid pubkey format")
1020            .header(header::CONTENT_TYPE, "application/json")
1021            .body(Body::from("[]"))
1022            .unwrap();
1023    }
1024
1025    let pubkey_hex = pubkey.to_lowercase();
1026    let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1027        Ok(b) => b,
1028        Err(_) => {
1029            return Response::builder()
1030                .status(StatusCode::BAD_REQUEST)
1031                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1032                .header("X-Reason", "Invalid pubkey format")
1033                .header(header::CONTENT_TYPE, "application/json")
1034                .body(Body::from("[]"))
1035                .unwrap()
1036        }
1037    };
1038
1039    let auth = match verify_blossom_auth(&headers, "list", None) {
1040        Ok(auth) => auth,
1041        Err((status, reason)) => {
1042            return Response::builder()
1043                .status(status)
1044                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1045                .header("X-Reason", reason)
1046                .header(header::CONTENT_TYPE, "application/json")
1047                .body(Body::from("[]"))
1048                .unwrap();
1049        }
1050    };
1051
1052    if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1053        return Response::builder()
1054            .status(StatusCode::FORBIDDEN)
1055            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1056            .header("X-Reason", "Pubkey mismatch")
1057            .header(header::CONTENT_TYPE, "application/json")
1058            .body(Body::from("[]"))
1059            .unwrap();
1060    }
1061
1062    // Get blobs for this pubkey
1063    match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1064        Ok(blobs) => {
1065            // Apply filters
1066            let mut filtered: Vec<_> = blobs
1067                .into_iter()
1068                .filter(|b| {
1069                    if let Some(since) = query.since {
1070                        if b.uploaded < since {
1071                            return false;
1072                        }
1073                    }
1074                    if let Some(until) = query.until {
1075                        if b.uploaded > until {
1076                            return false;
1077                        }
1078                    }
1079                    true
1080                })
1081                .collect();
1082
1083            // Sort by uploaded descending (most recent first)
1084            filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1085
1086            // Apply limit
1087            let limit = query.limit.unwrap_or(100).min(1000);
1088            filtered.truncate(limit);
1089
1090            Response::builder()
1091                .status(StatusCode::OK)
1092                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1093                .header(header::CONTENT_TYPE, "application/json")
1094                .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1095                .unwrap()
1096        }
1097        Err(_) => Response::builder()
1098            .status(StatusCode::INTERNAL_SERVER_ERROR)
1099            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1100            .header(header::CONTENT_TYPE, "application/json")
1101            .body(Body::from("[]"))
1102            .unwrap(),
1103    }
1104}
1105
1106// Helper functions
1107
/// Splits a request id such as `<hash>.<ext>` at its last `.`.
///
/// Returns the text before the last dot together with the extension, which keeps
/// its leading dot. An id without any dot is returned whole with `None`.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    match id.rfind('.') {
        Some(split_index) => {
            let (stem, extension) = id.split_at(split_index);
            (stem, Some(extension))
        }
        None => (id, None),
    }
}
1115
/// Returns `true` when `s` consists of exactly 64 ASCII hexadecimal digits
/// (upper or lower case), i.e. the hex encoding of a 32-byte digest.
fn is_valid_sha256(s: &str) -> bool {
    const SHA256_HEX_LEN: usize = 64;

    if s.len() != SHA256_HEX_LEN {
        return false;
    }
    // Any byte of a multi-byte UTF-8 character is non-ASCII and fails this check.
    s.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1119
/// Test-only helper that writes `data` into the store behind `state`.
///
/// With `track_ownership` set the blob is stored via `put_owned_blob` for
/// `pubkey`; otherwise it is stored via `put_cached_blob` and `pubkey` is not
/// used. The `_sha256` argument is accepted but ignored.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    if !track_ownership {
        state.store.put_cached_blob(data)?;
        return Ok(());
    }

    state.store.put_owned_blob(data, pubkey)?;
    Ok(())
}
1136
/// Maps a MIME type to the file extension (leading dot included) used for it.
///
/// Matching is ASCII case-insensitive and ignores surrounding whitespace as well
/// as any parameters after `;`, so `"IMAGE/PNG"` and `"text/html; charset=utf-8"`
/// resolve like their bare lowercase forms. Unknown types yield `""`.
fn mime_to_extension(mime: &str) -> &'static str {
    const EXTENSIONS: &[(&str, &str)] = &[
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    // Keep only the `type/subtype` part; parameters do not affect the extension.
    let essence = mime.split(';').next().unwrap_or(mime).trim();

    EXTENSIONS
        .iter()
        .find(|(known, _)| known.eq_ignore_ascii_case(essence))
        .map_or("", |&(_, extension)| extension)
}
1155
#[cfg(test)]
mod tests {
    //! Unit tests for the Blossom helpers and the optimistic upload path.

    use super::*;
    use crate::server::auth::WsRelayState;
    use crate::storage::HashtreeStore;
    use axum::response::IntoResponse;
    use base64::Engine;
    use hashtree_core::sha256;
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex as StdMutex};
    use std::time::Duration;
    use tempfile::TempDir;

    /// How often background work is polled before a test gives up.
    const BACKGROUND_POLL_ATTEMPTS: usize = 50;
    /// Pause between two polls of background work.
    const BACKGROUND_POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Builds an `AppState` around `store` with no auth, public writes enabled,
    /// optimistic uploads disabled and empty caches.
    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
        AppState {
            store,
            auth: None,
            daemon_started_at: 1_700_000_000,
            peer_mode: crate::config::ServerMode::Normal,
            hash_get_enabled: true,
            http_webrtc_fetch: true,
            webrtc_peers: None,
            ws_relay: Arc::new(WsRelayState::new()),
            max_upload_bytes: 5 * 1024 * 1024,
            public_writes: true,
            require_random_untrusted_ingest: true,
            optimistic_blossom_uploads: false,
            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
            allowed_pubkeys: HashSet::new(),
            upstream_blossom: Vec::new(),
            social_graph: None,
            social_graph_store: None,
            social_graph_root: None,
            socialgraph_snapshot_public: false,
            nostr_relay: None,
            nostr_relay_urls: Vec::new(),
            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
                std::collections::HashMap::new(),
            )),
            inflight_blob_reads: Arc::new(
                tokio::sync::Mutex::new(std::collections::HashMap::new()),
            ),
            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
        }
    }

    /// Signs a Blossom `upload` authorization event that expires in 300 seconds
    /// and encodes it as a `Nostr <base64>` Authorization header value.
    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};

        let now = Timestamp::now();
        let event = EventBuilder::new(
            Kind::Custom(BLOSSOM_AUTH_KIND),
            "",
            vec![
                Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
                Tag::custom(
                    TagKind::Custom("expiration".into()),
                    vec![(now.as_u64() + 300).to_string()],
                ),
            ],
        )
        .custom_created_at(now)
        .to_event(keys)
        .expect("sign blossom auth");
        let json = serde_json::to_vec(&event).expect("serialize auth event");
        format!(
            "Nostr {}",
            base64::engine::general_purpose::STANDARD.encode(json)
        )
    }

    /// Opens a store in a fresh temp dir using the 128 MiB size argument shared by
    /// the upload tests. The returned `TempDir` must be kept alive by the caller.
    fn roomy_store() -> (TempDir, Arc<HashtreeStore>) {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = Arc::new(
            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
        );
        (temp_dir, store)
    }

    /// Headers for an authorized `application/octet-stream` upload signed by `keys`.
    fn octet_stream_upload_headers(keys: &nostr::Keys) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(
            header::AUTHORIZATION,
            create_upload_auth_header(keys)
                .parse()
                .expect("auth header value"),
        );
        headers.insert(
            header::CONTENT_TYPE,
            "application/octet-stream"
                .parse()
                .expect("content type header value"),
        );
        headers
    }

    /// Uploads `body` as a freshly generated author and returns the response status.
    async fn upload_as_fresh_author(state: AppState, body: axum::body::Bytes) -> StatusCode {
        let keys = nostr::Keys::generate();
        let headers = octet_stream_upload_headers(&keys);
        upload_blob(State(state), headers, body)
            .await
            .into_response()
            .status()
    }

    /// Shrinks the optimistic upload queue of `state` to a single byte.
    fn shrink_optimistic_queue(state: &mut AppState) {
        state.optimistic_upload_queue_bytes = 1;
        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
    }

    /// Polls `done` until it returns `true`, sleeping between attempts.
    /// Returns `false` when the attempts are used up first.
    async fn poll_until(mut done: impl FnMut() -> bool) -> bool {
        for _ in 0..BACKGROUND_POLL_ATTEMPTS {
            if done() {
                return true;
            }
            tokio::time::sleep(BACKGROUND_POLL_INTERVAL).await;
        }
        false
    }

    #[test]
    fn test_is_valid_sha256() {
        assert!(is_valid_sha256(
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
        ));
        assert!(is_valid_sha256(
            "0000000000000000000000000000000000000000000000000000000000000000"
        ));

        // Too short
        assert!(!is_valid_sha256("e2bab35b5296ec2242ded0a01f6d6723"));
        // Too long
        assert!(!is_valid_sha256(
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa"
        ));
        // Invalid chars
        assert!(!is_valid_sha256(
            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
        ));
        // Empty
        assert!(!is_valid_sha256(""));
    }

    #[test]
    fn test_parse_hash_and_extension() {
        let (hash, ext) = parse_hash_and_extension("abc123.png");
        assert_eq!(hash, "abc123");
        assert_eq!(ext, Some(".png"));

        let (hash2, ext2) = parse_hash_and_extension("abc123");
        assert_eq!(hash2, "abc123");
        assert_eq!(ext2, None);

        let (hash3, ext3) = parse_hash_and_extension("abc.123.jpg");
        assert_eq!(hash3, "abc.123");
        assert_eq!(ext3, Some(".jpg"));
    }

    #[test]
    fn test_mime_to_extension() {
        assert_eq!(mime_to_extension("image/png"), ".png");
        assert_eq!(mime_to_extension("image/jpeg"), ".jpg");
        assert_eq!(mime_to_extension("video/mp4"), ".mp4");
        assert_eq!(mime_to_extension("application/octet-stream"), "");
        assert_eq!(mime_to_extension("unknown/type"), "");
    }

    #[tokio::test]
    async fn optimistic_uploads_return_accepted_and_store_in_background() {
        let (_temp_dir, store) = roomy_store();
        let mut state = test_app_state(Arc::clone(&store));
        state.optimistic_blossom_uploads = true;

        let body = axum::body::Bytes::from((0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>());
        let hash = sha256(&body);

        let status = upload_as_fresh_author(state, body).await;
        assert_eq!(status, StatusCode::ACCEPTED);

        let stored = poll_until(|| store.blob_exists(&hash).expect("blob exists check")).await;
        if !stored {
            panic!("optimistic upload was not stored in the background");
        }
    }

    #[tokio::test]
    async fn optimistic_upload_existing_blob_skips_queue() {
        let (_temp_dir, store) = roomy_store();
        let body = axum::body::Bytes::from(
            (0u16..=255)
                .map(|value| ((value * 73 + 19) % 256) as u8)
                .collect::<Vec<_>>(),
        );
        store.put_cached_blob(&body).expect("seed blob");

        let mut state = test_app_state(Arc::clone(&store));
        state.optimistic_blossom_uploads = true;
        shrink_optimistic_queue(&mut state);

        let status = upload_as_fresh_author(state, body).await;
        assert_eq!(status, StatusCode::CONFLICT);
    }

    #[tokio::test]
    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
        let (_temp_dir, store) = roomy_store();
        let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
        let hash_hex = hex::encode(sha256(&body));
        store.put_cached_blob(&body).expect("seed blob");

        let mut state = test_app_state(store);
        state.optimistic_blossom_uploads = true;

        let status = upload_as_fresh_author(state, body).await;
        assert_eq!(status, StatusCode::ACCEPTED);

        let cleared = poll_until(|| !optimistic_upload_is_inflight(&hash_hex)).await;
        if !cleared {
            // Do not leave the marker behind for other tests before failing.
            clear_optimistic_upload_inflight(&hash_hex);
            panic!("optimistic upload in-flight marker was not cleared");
        }
    }

    #[tokio::test]
    async fn optimistic_upload_inflight_duplicate_skips_queue() {
        let (_temp_dir, store) = roomy_store();
        let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
        let hash_hex = hex::encode(sha256(&body));
        assert!(mark_optimistic_upload_inflight(&hash_hex));

        let mut state = test_app_state(store);
        state.optimistic_blossom_uploads = true;
        shrink_optimistic_queue(&mut state);

        let status = upload_as_fresh_author(state, body).await;
        // Clear the marker before asserting so a failure cannot leak it.
        clear_optimistic_upload_inflight(&hash_hex);
        assert_eq!(status, StatusCode::ACCEPTED);
    }

    #[test]
    fn public_writes_accept_unlisted_authors_for_uploads() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let mut state = test_app_state(store);
        let pubkey = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";

        state.public_writes = true;
        assert!(can_accept_upload_author(&state, pubkey));
        assert!(!is_allowed_write_author(&state, pubkey));

        state.public_writes = false;
        assert!(!can_accept_upload_author(&state, pubkey));
    }

    #[test]
    fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
        let encrypted_block: Vec<u8> = (0..=255).collect();

        assert_eq!(
            validate_upload_payload(&encrypted_block, "application/octet-stream", false, true),
            Ok(())
        );

        assert_eq!(
            validate_upload_payload(b"audio bytes", "audio/mpeg", true, true),
            Ok(())
        );

        assert_eq!(
            validate_upload_payload(b"audio bytes", "audio/mpeg", false, true),
            Err((
                StatusCode::FORBIDDEN,
                "Raw media uploads require write access".to_string(),
            ))
        );
    }

    #[test]
    fn unowned_public_uploads_use_cache_storage_semantics() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let state = test_app_state(Arc::clone(&store));

        let owned = vec![1u8; 280];
        let owned_hash = sha256(&owned);
        store_blossom_blob(&state, &owned, &owned_hash, &[2u8; 32], true).expect("owned upload");

        let public_upload = vec![3u8; 280];
        let public_hash = sha256(&public_upload);
        store_blossom_blob(&state, &public_upload, &public_hash, &[4u8; 32], false)
            .expect("public upload");

        let replacement = vec![5u8; 280];
        let replacement_hash = sha256(&replacement);
        state
            .store
            .put_cached_blob(&replacement)
            .expect("replacement cached blob");

        assert!(state.store.blob_exists(&owned_hash).expect("owned exists"));
        assert!(!state
            .store
            .blob_exists(&public_hash)
            .expect("public upload evicted"));
        assert!(state
            .store
            .blob_exists(&replacement_hash)
            .expect("replacement exists"));
        assert!(state
            .store
            .is_blob_owner(&owned_hash, &[2u8; 32])
            .expect("owned tracked"));
        assert!(!state
            .store
            .blob_has_owners(&public_hash)
            .expect("public upload unowned"));
    }

    #[test]
    fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
        let state = test_app_state(Arc::clone(&store));

        let first = vec![1u8; 300];
        let first_hash = sha256(&first);
        let owner = [2u8; 32];
        store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");

        let second = vec![3u8; 300];
        let second_hash = sha256(&second);
        let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
            .expect_err("second owned upload should exceed the storage limit");

        assert!(
            error.to_string().contains("storage limit"),
            "unexpected error: {error}"
        );
        assert!(state
            .store
            .blob_exists(&first_hash)
            .expect("first blob remains"));
        assert!(!state
            .store
            .blob_exists(&second_hash)
            .expect("second blob rejected"));
        assert!(state
            .store
            .is_blob_owner(&first_hash, &owner)
            .expect("first owner tracked"));
        assert!(!state
            .store
            .is_blob_owner(&second_hash, &owner)
            .expect("second owner not tracked"));
    }
}