1use axum::{
7 body::Body,
8 extract::{Path, Query, State},
9 http::{header, HeaderMap, Response, StatusCode},
10 response::IntoResponse,
11};
12use base64::Engine;
13use hashtree_core::from_hex;
14use serde::{Deserialize, Serialize};
15use sha2::{Digest, Sha256};
16use std::collections::HashSet;
17use std::sync::{Mutex, OnceLock};
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19
20use super::auth::AppState;
21use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
22use super::ingest_filter::{
23 content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
24};
25use super::mime::get_mime_type;
26
/// Nostr event kind an authorization event must carry to be accepted by
/// `verify_blossom_auth`.
const BLOSSOM_AUTH_KIND: u16 = 24242;

/// `Cache-Control` value sent with successful `HEAD` blob responses.
const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
/// `Cache-Control` value for responses that must not be cached: the "read busy"
/// reply of `head_blob` and the "blob not found" reply of `delete_blob`.
const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
/// `Cache-Control` value for the "blob not found" reply of `head_blob`.
const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
/// Environment variable that overrides how long an optimistic upload may wait
/// for queue capacity, in milliseconds.
const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
/// Queue wait (milliseconds) used when the environment variable is unset,
/// unparsable, or zero.
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;

/// Default upload size limit in bytes (5 MiB).
// NOTE(review): presumably the default for `AppState::max_upload_bytes` — confirm
// against the configuration code, which is not visible here.
pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
/// Minimum number of queue permits (bytes) charged for one optimistic upload,
/// however small the body is (256 KiB).
const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
40
/// Point-in-time view of the optimistic upload queue, as produced by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Whether optimistic (respond-before-stored) uploads are enabled.
    pub enabled: bool,
    /// Configured queue budget in bytes.
    pub max_bytes: usize,
    /// Permits currently free in the queue semaphore, capped at `max_bytes`.
    pub available_bytes: usize,
    /// `max_bytes` minus `available_bytes` (saturating).
    pub reserved_bytes: usize,
    /// Number of blob hashes currently marked as in flight.
    pub in_flight: usize,
    /// How long an upload waits for queue capacity, in milliseconds.
    pub queue_timeout_ms: u64,
}
50
51#[allow(clippy::result_large_err)]
54fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
55 if is_allowed_write_author(state, pubkey) {
57 tracing::debug!(
58 "Blossom write allowed for {}... (allowed writer)",
59 &pubkey[..8.min(pubkey.len())]
60 );
61 return Ok(());
62 }
63
64 tracing::info!(
66 "Blossom write denied for {}... (not in allowed_npubs or social graph)",
67 &pubkey[..8.min(pubkey.len())]
68 );
69 Err(Response::builder()
70 .status(StatusCode::FORBIDDEN)
71 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
72 .header(header::CONTENT_TYPE, "application/json")
73 .body(Body::from(
74 r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
75 ))
76 .unwrap())
77}
78
79fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
80 if state.allowed_pubkeys.contains(pubkey) {
81 return true;
82 }
83
84 state
85 .social_graph
86 .as_ref()
87 .map(|sg| sg.check_write_access(pubkey))
88 .unwrap_or(false)
89}
90
91fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
92 state.public_writes || is_allowed_write_author(state, pubkey)
93}
94
95fn validate_upload_payload(
96 body: &[u8],
97 content_type: &str,
98 is_allowed_writer: bool,
99 require_random_untrusted_ingest: bool,
100) -> Result<(), (StatusCode, String)> {
101 let is_chk_upload = is_chk_content_type(content_type);
102
103 if !is_chk_upload && !is_allowed_writer {
104 return Err((
105 StatusCode::FORBIDDEN,
106 "Raw media uploads require write access".to_string(),
107 ));
108 }
109
110 if is_chk_upload {
111 validate_untrusted_blob(body, require_random_untrusted_ingest)
112 .map_err(|IngestRejection { status, reason }| (status, reason))?;
113 }
114
115 Ok(())
116}
117
118fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
119 let reason = reason.into();
120 Response::builder()
121 .status(status)
122 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
123 .header("X-Reason", reason.as_str())
124 .header(header::CONTENT_TYPE, "application/json")
125 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
126 .unwrap()
127}
128
129fn blossom_retryable_json_error(
130 status: StatusCode,
131 reason: impl Into<String>,
132 retry_after_seconds: u64,
133) -> Response<Body> {
134 let reason = reason.into();
135 Response::builder()
136 .status(status)
137 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
138 .header("X-Reason", reason.as_str())
139 .header(header::RETRY_AFTER, retry_after_seconds.to_string())
140 .header(header::CONTENT_TYPE, "application/json")
141 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
142 .unwrap()
143}
144
/// JSON description of a stored blob, returned by the upload handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Location of the blob; the upload handler fills it with
    /// `/<sha256><extension>`.
    pub url: String,
    /// Hex-encoded SHA-256 digest of the blob content.
    pub sha256: String,
    /// Blob length in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Upload time in seconds since the Unix epoch.
    pub uploaded: u64,
}
155
/// Query-string parameters accepted by the blob listing endpoint.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Keep only blobs whose `uploaded` timestamp is at or after this value.
    pub since: Option<u64>,
    /// Keep only blobs whose `uploaded` timestamp is at or before this value.
    pub until: Option<u64>,
    /// Maximum number of entries to return; `list_blobs` defaults to 100 and
    /// caps the value at 1000.
    pub limit: Option<usize>,
    /// Pagination cursor.
    // NOTE(review): the visible `list_blobs` handler never reads this field —
    // confirm whether cursor pagination is implemented elsewhere.
    pub cursor: Option<String>,
}
164
/// Fields extracted from a verified authorization event by
/// `verify_blossom_auth`.
#[derive(Debug)]
pub struct BlossomAuth {
    /// Signer public key exactly as it appeared in the event (hex string).
    pub pubkey: String,
    /// Event kind; always `BLOSSOM_AUTH_KIND` for a value returned by
    /// `verify_blossom_auth`.
    pub kind: u16,
    /// Event `created_at` timestamp, seconds since the Unix epoch.
    pub created_at: u64,
    /// Value of the `expiration` tag, when present and numeric.
    pub expiration: Option<u64>,
    /// Value of the `t` tag (the authorized action).
    pub action: Option<String>,
    /// Lower-cased values of all `x` tags (authorized blob hashes).
    pub blob_hashes: Vec<String>,
    /// Value of the `server` tag, when present.
    pub server: Option<String>,
}
176
177pub fn verify_blossom_auth(
180 headers: &HeaderMap,
181 required_action: &str,
182 required_hash: Option<&str>,
183) -> Result<BlossomAuth, (StatusCode, &'static str)> {
184 let auth_header = headers
185 .get(header::AUTHORIZATION)
186 .and_then(|v| v.to_str().ok())
187 .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
188
189 let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
190 StatusCode::UNAUTHORIZED,
191 "Invalid auth scheme, expected 'Nostr'",
192 ))?;
193
194 let engine = base64::engine::general_purpose::STANDARD;
196 let event_bytes = engine
197 .decode(nostr_event)
198 .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
199
200 let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
201 .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
202
203 let kind = event_json["kind"]
205 .as_u64()
206 .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
207
208 if kind != BLOSSOM_AUTH_KIND as u64 {
209 return Err((
210 StatusCode::BAD_REQUEST,
211 "Invalid event kind, expected 24242",
212 ));
213 }
214
215 let pubkey = event_json["pubkey"]
216 .as_str()
217 .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
218 .to_string();
219
220 let created_at = event_json["created_at"]
221 .as_u64()
222 .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
223
224 let sig = event_json["sig"]
225 .as_str()
226 .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
227
228 if !verify_nostr_signature(&event_json, &pubkey, sig) {
230 return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
231 }
232
233 let tags = event_json["tags"]
235 .as_array()
236 .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
237
238 let mut expiration: Option<u64> = None;
239 let mut action: Option<String> = None;
240 let mut blob_hashes: Vec<String> = Vec::new();
241 let mut server: Option<String> = None;
242
243 for tag in tags {
244 let tag_arr = tag.as_array();
245 if let Some(arr) = tag_arr {
246 if arr.len() >= 2 {
247 let tag_name = arr[0].as_str().unwrap_or("");
248 let tag_value = arr[1].as_str().unwrap_or("");
249
250 match tag_name {
251 "t" => action = Some(tag_value.to_string()),
252 "x" => blob_hashes.push(tag_value.to_lowercase()),
253 "expiration" => expiration = tag_value.parse().ok(),
254 "server" => server = Some(tag_value.to_string()),
255 _ => {}
256 }
257 }
258 }
259 }
260
261 let now = SystemTime::now()
263 .duration_since(UNIX_EPOCH)
264 .unwrap()
265 .as_secs();
266
267 if let Some(exp) = expiration {
268 if exp < now {
269 return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
270 }
271 }
272
273 if created_at > now + 60 {
275 return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
276 }
277
278 if let Some(ref act) = action {
280 if act != required_action {
281 return Err((StatusCode::FORBIDDEN, "Action mismatch"));
282 }
283 } else {
284 return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
285 }
286
287 if let Some(hash) = required_hash {
289 if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
290 return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
291 }
292 }
293
294 Ok(BlossomAuth {
295 pubkey,
296 kind: kind as u16,
297 created_at,
298 expiration,
299 action,
300 blob_hashes,
301 server,
302 })
303}
304
305fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
307 use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
308
309 let content = event["content"].as_str().unwrap_or("");
311 let full_serialized = format!(
312 "[0,\"{}\",{},{},{},\"{}\"]",
313 pubkey,
314 event["created_at"],
315 event["kind"],
316 event["tags"],
317 escape_json_string(content),
318 );
319
320 let mut hasher = Sha256::new();
321 hasher.update(full_serialized.as_bytes());
322 let event_id = hasher.finalize();
323
324 let pubkey_bytes = match hex::decode(pubkey) {
326 Ok(b) => b,
327 Err(_) => return false,
328 };
329
330 let sig_bytes = match hex::decode(sig) {
331 Ok(b) => b,
332 Err(_) => return false,
333 };
334
335 let secp = Secp256k1::verification_only();
336
337 let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
338 Ok(pk) => pk,
339 Err(_) => return false,
340 };
341
342 let signature = match Signature::from_slice(&sig_bytes) {
343 Ok(s) => s,
344 Err(_) => return false,
345 };
346
347 let message = match Message::from_digest_slice(&event_id) {
348 Ok(m) => m,
349 Err(_) => return false,
350 };
351
352 secp.verify_schnorr(&signature, &message, &xonly_pubkey)
353 .is_ok()
354}
355
/// Escapes `s` for embedding between double quotes in the canonical event
/// serialization that is hashed to obtain the event id.
///
/// The output must match what signers hash byte for byte, so the escape set
/// follows compact JSON serialization: `"` and `\` are backslash-escaped; line
/// feed, carriage return, tab, backspace, and form feed use their short forms
/// (`\n`, `\r`, `\t`, `\b`, `\f`); any other character below U+0020 becomes
/// `\u00XX`. Every other character, including DEL and the C1 controls, is
/// copied verbatim.
fn escape_json_string(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => result.push_str("\\\""),
            '\\' => result.push_str("\\\\"),
            '\n' => result.push_str("\\n"),
            '\r' => result.push_str("\\r"),
            '\t' => result.push_str("\\t"),
            '\u{08}' => result.push_str("\\b"),
            '\u{0c}' => result.push_str("\\f"),
            // Only C0 controls need escaping; `char::is_control` would also
            // catch U+007F..=U+009F, which canonical JSON leaves untouched.
            c if u32::from(c) < 0x20 => {
                result.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => result.push(c),
        }
    }
    result
}
374
375pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
378 let allowed_headers = headers
380 .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
381 .and_then(|v| v.to_str().ok())
382 .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
383
384 let full_allowed = format!(
386 "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
387 allowed_headers
388 );
389
390 Response::builder()
391 .status(StatusCode::NO_CONTENT)
392 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
393 .header(
394 header::ACCESS_CONTROL_ALLOW_METHODS,
395 "GET, HEAD, PUT, DELETE, OPTIONS",
396 )
397 .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
398 .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
399 .body(Body::empty())
400 .unwrap()
401}
402
/// Handles `HEAD` for a blob addressed as `<sha256>[.<ext>]`.
///
/// Responses:
/// * `400` - the hash part is not 64 hex characters or cannot be decoded.
/// * `503` - no blob-read permit could be acquired (`Retry-After: 1`).
/// * `200` - blob exists; `Content-Length` is its size and `Content-Type` is
///   derived from the extension (default `application/octet-stream`).
/// * `404` - the store reports no such blob.
/// * `500` - the size lookup failed, panicked, or timed out.
pub async fn head_blob(
    State(state): State<AppState>,
    Path(id): Path<String>,
    connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
) -> impl IntoResponse {
    // Only loopback clients are told where the blob came from (see `X-Source`).
    let is_localhost = connect_info.0.ip().is_loopback();
    let (hash_part, ext) = parse_hash_and_extension(&id);

    if !is_valid_sha256(hash_part) {
        return Response::builder()
            .status(StatusCode::BAD_REQUEST)
            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
            .header("X-Reason", "Invalid SHA256 hash")
            .body(Body::empty())
            .unwrap();
    }

    let sha256_hex = hash_part.to_lowercase();
    let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
        Ok(b) => b,
        Err(_) => {
            return Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
                .header("X-Reason", "Invalid SHA256 format")
                .body(Body::empty())
                .unwrap()
        }
    };

    // Outer `Result`: task join/timeout outcome. Inner `Result`: store outcome.
    // The innermost `Option` is `None` when the blob does not exist.
    let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
        // Cache hit: no store access and no read permit needed.
        Ok(Ok(cached))
    } else {
        let permit = match acquire_blob_read().await {
            Ok(permit) => permit,
            Err(_) => {
                return Response::builder()
                    .status(StatusCode::SERVICE_UNAVAILABLE)
                    .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
                    .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
                    .header("Retry-After", "1")
                    .header("X-Reason", BLOB_READ_BUSY)
                    .body(Body::empty())
                    .unwrap()
            }
        };
        let store = state.store.clone();
        // The store lookup runs on the blocking pool, bounded by the read timeout.
        let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
        let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
        // The permit is released as soon as the wait ends, even on timeout.
        drop(permit);
        let result = match timed {
            Ok(result) => result.map_err(|_| ()),
            Err(_) => Err(()),
        };
        // Every successful lookup is cached, including "not found" (`None`).
        if let Ok(Ok(size)) = result {
            state.blob_cache.put_size(sha256_hex.clone(), size);
        }
        result
    };

    match blob_size {
        Ok(Ok(Some(size))) => {
            let mime_type = ext
                .map(|e| get_mime_type(&format!("file{}", e)))
                .unwrap_or("application/octet-stream");

            let mut builder = Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, mime_type)
                .header(header::CONTENT_LENGTH, size)
                .header(header::ACCEPT_RANGES, "bytes")
                .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
                .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            if is_localhost {
                builder = builder.header("X-Source", "local");
            }
            builder.body(Body::empty()).unwrap()
        }
        Ok(Ok(None)) => Response::builder()
            .status(StatusCode::NOT_FOUND)
            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
            .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
            .header("X-Reason", "Blob not found")
            .body(Body::empty())
            .unwrap(),
        // Store error, task failure, or timeout.
        _ => Response::builder()
            .status(StatusCode::INTERNAL_SERVER_ERROR)
            .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
            .body(Body::empty())
            .unwrap(),
    }
}
500
501async fn store_blossom_blob_without_blocking_runtime(
502 state: &AppState,
503 data: axum::body::Bytes,
504 pubkey: [u8; 32],
505 track_ownership: bool,
506) -> anyhow::Result<()> {
507 let mut hasher = Sha256::new();
508 hasher.update(&data);
509 let hash_hex = hex::encode(hasher.finalize());
510 let data_for_cache = data.clone();
511 let permit = acquire_blob_write()
512 .await
513 .map_err(|err| anyhow::anyhow!(err))?;
514 let store = state.store.clone();
515 tokio::task::spawn_blocking(move || {
516 let _permit = permit;
517 if track_ownership {
518 store.put_owned_blob(&data, &pubkey)?;
519 } else {
520 store.put_cached_blob(&data)?;
521 }
522 Ok::<(), anyhow::Error>(())
523 })
524 .await
525 .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
526 state
527 .blob_cache
528 .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
529 state.blob_cache.put_body(hash_hex, &data_for_cache);
530 Ok(())
531}
532
533fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
534 Response::builder()
535 .status(status)
536 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
537 .header(header::CONTENT_TYPE, "application/json")
538 .body(Body::from(serde_json::to_string(descriptor).unwrap()))
539 .unwrap()
540}
541
542fn optimistic_upload_queue_timeout() -> Duration {
543 let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
544 .ok()
545 .and_then(|value| value.parse::<u64>().ok())
546 .filter(|value| *value > 0)
547 .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
548 Duration::from_millis(millis)
549}
550
/// Returns the process-wide set of blob hashes that currently have an
/// optimistic upload in progress, creating it on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static INFLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    INFLIGHT.get_or_init(Default::default)
}
555
556fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
557 optimistic_upload_inflight()
558 .lock()
559 .is_ok_and(|inflight| inflight.contains(hash_hex))
560}
561
562fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
563 optimistic_upload_inflight()
564 .lock()
565 .map(|mut inflight| inflight.insert(hash_hex.to_string()))
566 .unwrap_or(true)
567}
568
569fn clear_optimistic_upload_inflight(hash_hex: &str) {
570 if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
571 inflight.remove(hash_hex);
572 }
573}
574
575pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
576 let max_bytes = state.optimistic_upload_queue_bytes;
577 let available_bytes = state
578 .optimistic_upload_queue
579 .available_permits()
580 .min(max_bytes);
581 let in_flight = optimistic_upload_inflight()
582 .lock()
583 .map(|inflight| inflight.len())
584 .unwrap_or(0);
585
586 OptimisticUploadQueueSnapshot {
587 enabled: state.optimistic_blossom_uploads,
588 max_bytes,
589 available_bytes,
590 reserved_bytes: max_bytes.saturating_sub(available_bytes),
591 in_flight,
592 queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
593 }
594}
595
/// Converts a duration to whole milliseconds, saturating at `u64::MAX`.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
599
600async fn acquire_optimistic_upload_queue(
601 state: &AppState,
602 permits: u32,
603) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
604 match tokio::time::timeout(
605 optimistic_upload_queue_timeout(),
606 state
607 .optimistic_upload_queue
608 .clone()
609 .acquire_many_owned(permits),
610 )
611 .await
612 {
613 Ok(Ok(permit)) => Ok(permit),
614 Ok(Err(_)) => Err("Optimistic upload queue is closed"),
615 Err(_) => Err("Optimistic upload queue is full"),
616 }
617}
618
619async fn uploaded_blob_already_exists(
620 state: &AppState,
621 sha256_hash: [u8; 32],
622 sha256_hex: &str,
623) -> Result<bool, String> {
624 if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
625 return Ok(true);
626 }
627
628 let permit = acquire_blob_read().await.map_err(str::to_string)?;
629 let store = state.store.clone();
630 let size_read = tokio::task::spawn_blocking(move || {
631 store
632 .blob_size(&sha256_hash)
633 .map_err(|error| error.to_string())
634 });
635
636 let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
637 drop(permit);
638 match result {
639 Ok(Ok(Ok(size))) => {
640 state.blob_cache.put_size(sha256_hex.to_string(), size);
641 Ok(size.is_some())
642 }
643 Ok(Ok(Err(error))) => Err(error),
644 Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
645 Err(_) => Err("blob existence check timed out".to_string()),
646 }
647}
648
649async fn set_existing_blob_owner_without_body_write(
650 state: AppState,
651 sha256_hash: [u8; 32],
652 pubkey: [u8; 32],
653) -> anyhow::Result<()> {
654 tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
655 .await
656 .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
657 Ok(())
658}
659
660pub async fn upload_blob(
662 State(state): State<AppState>,
663 headers: HeaderMap,
664 body: axum::body::Bytes,
665) -> impl IntoResponse {
666 let max_size = state.max_upload_bytes;
668 if body.len() > max_size {
669 return Response::builder()
670 .status(StatusCode::PAYLOAD_TOO_LARGE)
671 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
672 .header(header::CONTENT_TYPE, "application/json")
673 .body(Body::from(format!(
674 r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
675 body.len(),
676 max_size,
677 max_size / 1024 / 1024
678 )))
679 .unwrap();
680 }
681
682 let auth = match verify_blossom_auth(&headers, "upload", None) {
684 Ok(a) => a,
685 Err((status, reason)) => {
686 return Response::builder()
687 .status(status)
688 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
689 .header("X-Reason", reason)
690 .header(header::CONTENT_TYPE, "application/json")
691 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
692 .unwrap();
693 }
694 };
695
696 let content_type = headers
698 .get(header::CONTENT_TYPE)
699 .and_then(|v| v.to_str().ok())
700 .unwrap_or("application/octet-stream")
701 .to_string();
702
703 let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
705 let can_upload = can_accept_upload_author(&state, &auth.pubkey);
706 if !can_upload {
707 let _ = check_write_access(&state, &auth.pubkey);
708 return Response::builder()
709 .status(StatusCode::FORBIDDEN)
710 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
711 .header(header::CONTENT_TYPE, "application/json")
712 .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
713 .unwrap();
714 }
715
716 if let Err((status, reason)) = validate_upload_payload(
717 &body,
718 &content_type,
719 can_upload,
720 state.require_random_untrusted_ingest,
721 ) {
722 return blossom_json_error(status, reason);
723 }
724
725 let mut hasher = Sha256::new();
727 hasher.update(&body);
728 let sha256_hash: [u8; 32] = hasher.finalize().into();
729 let sha256_hex = hex::encode(sha256_hash);
730
731 if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
733 return Response::builder()
734 .status(StatusCode::FORBIDDEN)
735 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
736 .header(
737 "X-Reason",
738 "Uploaded blob hash does not match authorized hash",
739 )
740 .header(header::CONTENT_TYPE, "application/json")
741 .body(Body::from(r#"{"error":"Hash mismatch"}"#))
742 .unwrap();
743 }
744
745 let pubkey_bytes = match from_hex(&auth.pubkey) {
747 Ok(b) => b,
748 Err(_) => {
749 return Response::builder()
750 .status(StatusCode::BAD_REQUEST)
751 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
752 .header("X-Reason", "Invalid pubkey format")
753 .body(Body::empty())
754 .unwrap();
755 }
756 };
757
758 let size = body.len() as u64;
759 let now = SystemTime::now()
760 .duration_since(UNIX_EPOCH)
761 .unwrap()
762 .as_secs();
763 let ext = mime_to_extension(&content_type_base(&content_type));
764 let descriptor = BlobDescriptor {
765 url: format!("/{}{}", sha256_hex, ext),
766 sha256: sha256_hex.clone(),
767 size,
768 mime_type: content_type,
769 uploaded: now,
770 };
771
772 if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
773 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
774 }
775
776 if state.optimistic_blossom_uploads {
779 let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
780 if queued_bytes <= state.optimistic_upload_queue_bytes {
781 let permits = queued_bytes as u32;
782 let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
783 if !marked_inflight {
784 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
785 }
786 let permit = match acquire_optimistic_upload_queue(&state, permits).await {
787 Ok(permit) => permit,
788 Err(_) => {
789 clear_optimistic_upload_inflight(&sha256_hex);
790 return blossom_retryable_json_error(
791 StatusCode::SERVICE_UNAVAILABLE,
792 "Optimistic upload queue is full",
793 2,
794 );
795 }
796 };
797 let state_for_write = state.clone();
798 let hash_for_log = sha256_hex.clone();
799 tokio::spawn(async move {
800 let _permit = permit;
801 if let Err(error) = store_blossom_blob_without_blocking_runtime(
802 &state_for_write,
803 body,
804 pubkey_bytes,
805 is_allowed_author,
806 )
807 .await
808 {
809 tracing::error!(
810 "Background Blossom storage failed for {}: {:#}",
811 hash_for_log,
812 error
813 );
814 }
815 clear_optimistic_upload_inflight(&hash_for_log);
816 });
817
818 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
819 }
820
821 tracing::warn!(
822 "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
823 queued_bytes,
824 state.optimistic_upload_queue_bytes
825 );
826 }
827
828 match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
829 Ok(true) => {
830 if is_allowed_author {
831 if let Err(error) = set_existing_blob_owner_without_body_write(
832 state.clone(),
833 sha256_hash,
834 pubkey_bytes,
835 )
836 .await
837 {
838 return Response::builder()
839 .status(StatusCode::INTERNAL_SERVER_ERROR)
840 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
841 .header("X-Reason", "Storage error")
842 .header(header::CONTENT_TYPE, "application/json")
843 .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
844 .unwrap();
845 }
846 }
847 return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
848 }
849 Ok(false) => {}
850 Err(error) => {
851 tracing::debug!(
852 "Could not preflight Blossom upload {} before synchronous storage: {}",
853 sha256_hex,
854 error
855 );
856 }
857 }
858
859 let store_result = store_blossom_blob_without_blocking_runtime(
860 &state,
861 body.clone(),
862 pubkey_bytes,
863 is_allowed_author,
864 )
865 .await;
866
867 match store_result {
868 Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
869 Err(e) => Response::builder()
870 .status(StatusCode::INTERNAL_SERVER_ERROR)
871 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
872 .header("X-Reason", "Storage error")
873 .header(header::CONTENT_TYPE, "application/json")
874 .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
875 .unwrap(),
876 }
877}
878
879pub async fn delete_blob(
882 State(state): State<AppState>,
883 Path(id): Path<String>,
884 headers: HeaderMap,
885) -> impl IntoResponse {
886 let (hash_part, _) = parse_hash_and_extension(&id);
887
888 if !is_valid_sha256(hash_part) {
889 return Response::builder()
890 .status(StatusCode::BAD_REQUEST)
891 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
892 .header("X-Reason", "Invalid SHA256 hash")
893 .body(Body::empty())
894 .unwrap();
895 }
896
897 let sha256_hex = hash_part.to_lowercase();
898
899 let sha256_bytes = match from_hex(&sha256_hex) {
901 Ok(b) => b,
902 Err(_) => {
903 return Response::builder()
904 .status(StatusCode::BAD_REQUEST)
905 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
906 .header("X-Reason", "Invalid SHA256 hash format")
907 .body(Body::empty())
908 .unwrap();
909 }
910 };
911
912 let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
914 Ok(a) => a,
915 Err((status, reason)) => {
916 return Response::builder()
917 .status(status)
918 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
919 .header("X-Reason", reason)
920 .body(Body::empty())
921 .unwrap();
922 }
923 };
924
925 let pubkey_bytes = match from_hex(&auth.pubkey) {
927 Ok(b) => b,
928 Err(_) => {
929 return Response::builder()
930 .status(StatusCode::BAD_REQUEST)
931 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
932 .header("X-Reason", "Invalid pubkey format")
933 .body(Body::empty())
934 .unwrap();
935 }
936 };
937
938 match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
940 Ok(true) => {
941 }
943 Ok(false) => {
944 match state.store.blob_has_owners(&sha256_bytes) {
946 Ok(true) => {
947 return Response::builder()
948 .status(StatusCode::FORBIDDEN)
949 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
950 .header("X-Reason", "Not a blob owner")
951 .body(Body::empty())
952 .unwrap();
953 }
954 Ok(false) => {
955 return Response::builder()
956 .status(StatusCode::NOT_FOUND)
957 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
958 .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
959 .header("X-Reason", "Blob not found")
960 .body(Body::empty())
961 .unwrap();
962 }
963 Err(_) => {
964 return Response::builder()
965 .status(StatusCode::INTERNAL_SERVER_ERROR)
966 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
967 .body(Body::empty())
968 .unwrap();
969 }
970 }
971 }
972 Err(_) => {
973 return Response::builder()
974 .status(StatusCode::INTERNAL_SERVER_ERROR)
975 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
976 .body(Body::empty())
977 .unwrap();
978 }
979 }
980
981 match state
983 .store
984 .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
985 {
986 Ok(fully_deleted) => {
987 Response::builder()
990 .status(StatusCode::OK)
991 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
992 .header(
993 "X-Blob-Deleted",
994 if fully_deleted { "true" } else { "false" },
995 )
996 .body(Body::empty())
997 .unwrap()
998 }
999 Err(_) => Response::builder()
1000 .status(StatusCode::INTERNAL_SERVER_ERROR)
1001 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1002 .body(Body::empty())
1003 .unwrap(),
1004 }
1005}
1006
1007pub async fn list_blobs(
1009 State(state): State<AppState>,
1010 Path(pubkey): Path<String>,
1011 Query(query): Query<ListQuery>,
1012 headers: HeaderMap,
1013) -> impl IntoResponse {
1014 if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1016 return Response::builder()
1017 .status(StatusCode::BAD_REQUEST)
1018 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1019 .header("X-Reason", "Invalid pubkey format")
1020 .header(header::CONTENT_TYPE, "application/json")
1021 .body(Body::from("[]"))
1022 .unwrap();
1023 }
1024
1025 let pubkey_hex = pubkey.to_lowercase();
1026 let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1027 Ok(b) => b,
1028 Err(_) => {
1029 return Response::builder()
1030 .status(StatusCode::BAD_REQUEST)
1031 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1032 .header("X-Reason", "Invalid pubkey format")
1033 .header(header::CONTENT_TYPE, "application/json")
1034 .body(Body::from("[]"))
1035 .unwrap()
1036 }
1037 };
1038
1039 let auth = match verify_blossom_auth(&headers, "list", None) {
1040 Ok(auth) => auth,
1041 Err((status, reason)) => {
1042 return Response::builder()
1043 .status(status)
1044 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1045 .header("X-Reason", reason)
1046 .header(header::CONTENT_TYPE, "application/json")
1047 .body(Body::from("[]"))
1048 .unwrap();
1049 }
1050 };
1051
1052 if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1053 return Response::builder()
1054 .status(StatusCode::FORBIDDEN)
1055 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1056 .header("X-Reason", "Pubkey mismatch")
1057 .header(header::CONTENT_TYPE, "application/json")
1058 .body(Body::from("[]"))
1059 .unwrap();
1060 }
1061
1062 match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1064 Ok(blobs) => {
1065 let mut filtered: Vec<_> = blobs
1067 .into_iter()
1068 .filter(|b| {
1069 if let Some(since) = query.since {
1070 if b.uploaded < since {
1071 return false;
1072 }
1073 }
1074 if let Some(until) = query.until {
1075 if b.uploaded > until {
1076 return false;
1077 }
1078 }
1079 true
1080 })
1081 .collect();
1082
1083 filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1085
1086 let limit = query.limit.unwrap_or(100).min(1000);
1088 filtered.truncate(limit);
1089
1090 Response::builder()
1091 .status(StatusCode::OK)
1092 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1093 .header(header::CONTENT_TYPE, "application/json")
1094 .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1095 .unwrap()
1096 }
1097 Err(_) => Response::builder()
1098 .status(StatusCode::INTERNAL_SERVER_ERROR)
1099 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1100 .header(header::CONTENT_TYPE, "application/json")
1101 .body(Body::from("[]"))
1102 .unwrap(),
1103 }
1104}
1105
/// Splits a path id at its last `.` into the part before it and the extension.
///
/// The extension, when present, includes the leading dot (`"abc.png"` yields
/// `("abc", Some(".png"))`). An id without a dot is returned whole with `None`.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    match id.rfind('.') {
        Some(dot_index) => {
            let (stem, extension) = id.split_at(dot_index);
            (stem, Some(extension))
        }
        None => (id, None),
    }
}
1115
/// Reports whether `s` is exactly 64 ASCII hex digits (either case).
fn is_valid_sha256(s: &str) -> bool {
    if s.len() != 64 {
        return false;
    }
    s.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1119
/// Test-only synchronous counterpart of the upload storage path: stores `data`
/// as an owned blob for `pubkey` when `track_ownership` is set, otherwise as a
/// cached blob. The `_sha256` argument is accepted but unused.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    let store = &state.store;
    match track_ownership {
        true => {
            store.put_owned_blob(data, pubkey)?;
        }
        false => {
            store.put_cached_blob(data)?;
        }
    }
    Ok(())
}
1136
/// Maps a MIME type to the file extension (with leading dot) used in blob
/// URLs. The match is exact and case-sensitive; unknown types map to `""`.
fn mime_to_extension(mime: &str) -> &'static str {
    const EXTENSIONS: [(&str, &str); 13] = [
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    EXTENSIONS
        .iter()
        .find(|(known, _)| *known == mime)
        .map_or("", |&(_, extension)| extension)
}
1155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server::auth::WsRelayState;
    use crate::storage::HashtreeStore;
    use axum::response::IntoResponse;
    use base64::Engine;
    use hashtree_core::sha256;
    use std::collections::{HashMap, HashSet};
    use std::sync::{Arc, Mutex as StdMutex};
    use std::time::Duration;
    use tempfile::TempDir;

    /// Number of times a polling test re-checks its condition before giving up.
    const POLL_ATTEMPTS: usize = 50;
    /// Pause between two consecutive polls.
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Builds an `AppState` around `store` with no auth, no peers, no relays, public
    /// writes enabled, optimistic uploads disabled, and empty caches.
    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
        AppState {
            // Storage and identity.
            store,
            auth: None,
            daemon_started_at: 1_700_000_000,
            allowed_pubkeys: HashSet::new(),

            // Networking and peers.
            peer_mode: crate::config::ServerMode::Normal,
            hash_get_enabled: true,
            http_webrtc_fetch: true,
            webrtc_peers: None,
            fips_transport: None,
            fetch_from_fips_peers: true,
            ws_relay: Arc::new(WsRelayState::new()),
            upstream_blossom: Vec::new(),
            nostr_relay: None,
            nostr_relay_urls: Vec::new(),

            // Upload policy.
            max_upload_bytes: 5 * 1024 * 1024,
            public_writes: true,
            require_random_untrusted_ingest: true,
            optimistic_blossom_uploads: false,
            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),

            // Social graph.
            social_graph: None,
            social_graph_store: None,
            social_graph_root: None,
            socialgraph_snapshot_public: false,

            // Caches and in-flight bookkeeping, all starting out empty.
            tree_root_cache: Arc::new(StdMutex::new(HashMap::new())),
            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            inflight_blob_reads: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
        }
    }

    /// Signs a `BLOSSOM_AUTH_KIND` event tagged `t=upload` that expires 300 seconds
    /// after its creation time, and returns it as a `Nostr <base64 json>` header value.
    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};

        let created_at = Timestamp::now();
        let expires_at = created_at.as_u64() + 300;
        let tags = vec![
            Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
            Tag::custom(
                TagKind::Custom("expiration".into()),
                vec![expires_at.to_string()],
            ),
        ];

        let event = EventBuilder::new(Kind::Custom(BLOSSOM_AUTH_KIND), "", tags)
            .custom_created_at(created_at)
            .to_event(keys)
            .expect("sign blossom auth");
        let payload = serde_json::to_vec(&event).expect("serialize auth event");
        let encoded = base64::engine::general_purpose::STANDARD.encode(payload);
        format!("Nostr {encoded}")
    }

    /// Opens a store in `dir` using the 128 MiB option value shared by the upload tests.
    fn large_store(dir: &TempDir) -> Arc<HashtreeStore> {
        Arc::new(HashtreeStore::with_options(dir.path(), None, 128 * 1024 * 1024).expect("store"))
    }

    /// Test state for `store` with optimistic Blossom uploads switched on.
    fn optimistic_state(store: Arc<HashtreeStore>) -> AppState {
        let mut state = test_app_state(store);
        state.optimistic_blossom_uploads = true;
        state
    }

    /// Replaces the optimistic upload queue with a semaphore holding a single permit.
    fn shrink_queue_to_single_permit(state: &mut AppState) {
        state.optimistic_upload_queue_bytes = 1;
        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
    }

    /// Upload request headers: an auth header signed by a freshly generated key plus an
    /// `application/octet-stream` content type.
    fn octet_stream_upload_headers() -> HeaderMap {
        let keys = nostr::Keys::generate();
        let mut headers = HeaderMap::new();
        headers.insert(
            header::AUTHORIZATION,
            create_upload_auth_header(&keys)
                .parse()
                .expect("auth header value"),
        );
        headers.insert(
            header::CONTENT_TYPE,
            "application/octet-stream"
                .parse()
                .expect("content type header value"),
        );
        headers
    }

    /// Runs `upload_blob` with octet-stream upload headers and returns the response status.
    async fn upload_status(state: AppState, body: axum::body::Bytes) -> StatusCode {
        upload_blob(State(state), octet_stream_upload_headers(), body)
            .await
            .into_response()
            .status()
    }

    #[test]
    fn test_is_valid_sha256() {
        let accepted = [
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6",
            "0000000000000000000000000000000000000000000000000000000000000000",
        ];
        for candidate in accepted {
            assert!(is_valid_sha256(candidate));
        }

        // Too short, too long, non-hex characters, and empty input.
        let rejected = [
            "e2bab35b5296ec2242ded0a01f6d6723",
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa",
            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6",
            "",
        ];
        for candidate in rejected {
            assert!(!is_valid_sha256(candidate));
        }
    }

    #[test]
    fn test_parse_hash_and_extension() {
        let (hash, ext) = parse_hash_and_extension("abc123.png");
        assert_eq!((hash, ext), ("abc123", Some(".png")));

        let (hash, ext) = parse_hash_and_extension("abc123");
        assert_eq!((hash, ext), ("abc123", None));

        // Only the last dot separates the extension.
        let (hash, ext) = parse_hash_and_extension("abc.123.jpg");
        assert_eq!((hash, ext), ("abc.123", Some(".jpg")));
    }

    #[test]
    fn test_mime_to_extension() {
        let expectations = [
            ("image/png", ".png"),
            ("image/jpeg", ".jpg"),
            ("video/mp4", ".mp4"),
            ("application/octet-stream", ""),
            ("unknown/type", ""),
        ];
        for (mime, extension) in expectations {
            assert_eq!(mime_to_extension(mime), extension);
        }
    }

    /// An optimistic upload of a new blob answers 202 and the blob shows up in the
    /// store within the polling budget.
    #[tokio::test]
    async fn optimistic_uploads_return_accepted_and_store_in_background() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = large_store(&temp_dir);
        let state = optimistic_state(Arc::clone(&store));

        let payload: Vec<u8> = (0u8..=255).map(|byte| byte ^ 0x55).collect();
        let body = axum::body::Bytes::from(payload);
        let hash = sha256(&body);

        let status = upload_status(state, body).await;
        assert_eq!(status, StatusCode::ACCEPTED);

        for _ in 0..POLL_ATTEMPTS {
            if store.blob_exists(&hash).expect("blob exists check") {
                return;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        panic!("optimistic upload was not stored in the background");
    }

    /// With the blob already stored and the queue reduced to one permit, the upload
    /// answers 409.
    #[tokio::test]
    async fn optimistic_upload_existing_blob_skips_queue() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = large_store(&temp_dir);

        let payload: Vec<u8> = (0u16..=255)
            .map(|value| ((value * 73 + 19) % 256) as u8)
            .collect();
        let body = axum::body::Bytes::from(payload);
        store.put_cached_blob(&body).expect("seed blob");

        let mut state = optimistic_state(Arc::clone(&store));
        shrink_queue_to_single_permit(&mut state);

        let status = upload_status(state, body).await;
        assert_eq!(status, StatusCode::CONFLICT);
    }

    /// With the blob already stored and the default-sized queue, the upload answers 202
    /// and the in-flight marker for its hash is gone within the polling budget.
    #[tokio::test]
    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = large_store(&temp_dir);

        let payload: Vec<u8> = (0u8..=255).rev().collect();
        let body = axum::body::Bytes::from(payload);
        let hash_hex = hex::encode(sha256(&body));
        store.put_cached_blob(&body).expect("seed blob");

        let state = optimistic_state(store);

        let status = upload_status(state, body).await;
        assert_eq!(status, StatusCode::ACCEPTED);

        for _ in 0..POLL_ATTEMPTS {
            if !optimistic_upload_is_inflight(&hash_hex) {
                return;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        // Remove the marker ourselves before failing.
        clear_optimistic_upload_inflight(&hash_hex);
        panic!("optimistic upload in-flight marker was not cleared");
    }

    /// With the hash already marked in flight and the queue reduced to one permit, the
    /// upload still answers 202.
    #[tokio::test]
    async fn optimistic_upload_inflight_duplicate_skips_queue() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = large_store(&temp_dir);

        let payload: Vec<u8> = (0u8..=255).collect();
        let body = axum::body::Bytes::from(payload);
        let hash_hex = hex::encode(sha256(&body));
        assert!(mark_optimistic_upload_inflight(&hash_hex));

        let mut state = optimistic_state(store);
        shrink_queue_to_single_permit(&mut state);

        let status = upload_status(state, body).await;
        // Clear the marker set above before asserting on the status.
        clear_optimistic_upload_inflight(&hash_hex);
        assert_eq!(status, StatusCode::ACCEPTED);
    }

    /// `public_writes` decides whether an author who is not an allowed writer may upload.
    #[test]
    fn public_writes_accept_unlisted_authors_for_uploads() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let mut state = test_app_state(store);
        let author = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";

        state.public_writes = true;
        assert!(can_accept_upload_author(&state, author));
        assert!(!is_allowed_write_author(&state, author));

        state.public_writes = false;
        assert!(!can_accept_upload_author(&state, author));
    }

    /// Checks `validate_upload_payload` for an octet-stream block and for `audio/mpeg`
    /// bytes with the third argument set to `true` and to `false`.
    #[test]
    fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
        let encrypted_block: Vec<u8> = (0..=255).collect();

        let octet_stream =
            validate_upload_payload(&encrypted_block, "application/octet-stream", false, true);
        assert_eq!(octet_stream, Ok(()));

        let media_allowed = validate_upload_payload(b"audio bytes", "audio/mpeg", true, true);
        assert_eq!(media_allowed, Ok(()));

        let media_denied = validate_upload_payload(b"audio bytes", "audio/mpeg", false, true);
        assert_eq!(
            media_denied,
            Err((
                StatusCode::FORBIDDEN,
                "Raw media uploads require write access".to_string(),
            ))
        );
    }

    /// Stores a 280-byte owned blob, a 280-byte unowned blob, and a 280-byte cached
    /// blob in a store opened with the value 700 (presumably its size limit), then
    /// checks that only the unowned upload is gone and that it has no owners.
    #[test]
    fn unowned_public_uploads_use_cache_storage_semantics() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let state = test_app_state(Arc::clone(&store));
        let owner = [2u8; 32];
        let uploader = [4u8; 32];

        let owned = vec![1u8; 280];
        let owned_hash = sha256(&owned);
        store_blossom_blob(&state, &owned, &owned_hash, &owner, true).expect("owned upload");

        let public_upload = vec![3u8; 280];
        let public_hash = sha256(&public_upload);
        store_blossom_blob(&state, &public_upload, &public_hash, &uploader, false)
            .expect("public upload");

        let replacement = vec![5u8; 280];
        let replacement_hash = sha256(&replacement);
        store
            .put_cached_blob(&replacement)
            .expect("replacement cached blob");

        assert!(store.blob_exists(&owned_hash).expect("owned exists"));
        assert!(!store
            .blob_exists(&public_hash)
            .expect("public upload evicted"));
        assert!(store
            .blob_exists(&replacement_hash)
            .expect("replacement exists"));
        assert!(store
            .is_blob_owner(&owned_hash, &owner)
            .expect("owned tracked"));
        assert!(!store
            .blob_has_owners(&public_hash)
            .expect("public upload unowned"));
    }

    /// A second 300-byte owned upload into a store opened with the value 500 fails with
    /// a "storage limit" error and leaves the first blob and its owner untouched.
    #[test]
    fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
        let state = test_app_state(Arc::clone(&store));
        let owner = [2u8; 32];

        let first = vec![1u8; 300];
        let first_hash = sha256(&first);
        store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");

        let second = vec![3u8; 300];
        let second_hash = sha256(&second);
        let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
            .expect_err("second owned upload should exceed the storage limit");

        assert!(
            error.to_string().contains("storage limit"),
            "unexpected error: {error}"
        );
        assert!(store.blob_exists(&first_hash).expect("first blob remains"));
        assert!(!store
            .blob_exists(&second_hash)
            .expect("second blob rejected"));
        assert!(store
            .is_blob_owner(&first_hash, &owner)
            .expect("first owner tracked"));
        assert!(!store
            .is_blob_owner(&second_hash, &owner)
            .expect("second owner not tracked"));
    }
}