1use axum::{
7 body::Body,
8 extract::{Path, Query, State},
9 http::{header, HeaderMap, Response, StatusCode},
10 response::IntoResponse,
11 Json,
12};
13use base64::Engine;
14use hashtree_core::from_hex;
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use std::collections::HashSet;
18use std::sync::{Mutex, OnceLock};
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use super::auth::AppState;
22use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
23use super::ingest_filter::{
24 content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
25};
26use super::mime::get_mime_type;
27
/// Nostr event kind that an authorization event must carry to be accepted.
const BLOSSOM_AUTH_KIND: u16 = 24242;

/// `Cache-Control` value sent with successful blob responses.
const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
/// `Cache-Control` value for responses that must never be cached.
const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
/// `Cache-Control` value sent with the 404 response of a blob `HEAD` lookup.
const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
/// Environment variable overriding how long an upload may wait for
/// optimistic-queue capacity, in milliseconds.
const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
/// Queue wait timeout used when the environment variable is unset, unparsable or zero.
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;

/// 5 MiB. NOTE(review): presumably the default for `AppState::max_upload_bytes`;
/// the place where it is applied is not visible here — confirm.
pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
/// Minimum number of queue bytes charged for one optimistic upload, so small
/// blobs still reserve a meaningful share of the queue.
const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
/// Maximum number of blobs accepted in one batch upload request.
const MAX_BATCH_UPLOAD_BLOBS: usize = 1024;
/// Maximum total decoded size (64 MiB) accepted in one batch upload request.
const MAX_BATCH_UPLOAD_BYTES: usize = 64 * 1024 * 1024;
/// Environment variable holding the threshold (milliseconds) above which a
/// batch upload is logged as slow.
const SLOW_BATCH_UPLOAD_LOG_MS_ENV: &str = "HTREE_SLOW_BATCH_UPLOAD_LOG_MS";
44
45fn slow_batch_upload_log_ms() -> Option<u128> {
46 std::env::var(SLOW_BATCH_UPLOAD_LOG_MS_ENV)
47 .ok()
48 .and_then(|value| value.parse::<u128>().ok())
49 .filter(|value| *value > 0)
50}
51
/// Point-in-time view of the optimistic upload queue, as produced by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Whether optimistic uploads are switched on in the application state.
    pub enabled: bool,
    /// Configured queue budget in bytes.
    pub max_bytes: usize,
    /// Bytes currently free in the queue (never reported above `max_bytes`).
    pub available_bytes: usize,
    /// Bytes currently reserved, i.e. `max_bytes - available_bytes`.
    pub reserved_bytes: usize,
    /// Number of blob hashes currently marked as in flight.
    pub in_flight: usize,
    /// How long an upload waits for queue capacity, in milliseconds.
    pub queue_timeout_ms: u64,
}
61
62#[allow(clippy::result_large_err)]
65fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
66 if is_allowed_write_author(state, pubkey) {
68 tracing::debug!(
69 "Blossom write allowed for {}... (allowed writer)",
70 &pubkey[..8.min(pubkey.len())]
71 );
72 return Ok(());
73 }
74
75 tracing::info!(
77 "Blossom write denied for {}... (not in allowed_npubs or social graph)",
78 &pubkey[..8.min(pubkey.len())]
79 );
80 Err(Response::builder()
81 .status(StatusCode::FORBIDDEN)
82 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
83 .header(header::CONTENT_TYPE, "application/json")
84 .body(Body::from(
85 r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
86 ))
87 .unwrap())
88}
89
90fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
91 if state.allowed_pubkeys.contains(pubkey) {
92 return true;
93 }
94
95 state
96 .social_graph
97 .as_ref()
98 .map(|sg| sg.check_write_access(pubkey))
99 .unwrap_or(false)
100}
101
102fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
103 state.public_writes || is_allowed_write_author(state, pubkey)
104}
105
106fn validate_upload_payload(
107 body: &[u8],
108 content_type: &str,
109 can_upload_author: bool,
110 require_random_untrusted_ingest: bool,
111) -> Result<(), (StatusCode, String)> {
112 let is_chk_upload = is_chk_content_type(content_type);
113
114 if !is_chk_upload && !can_upload_author {
115 return Err((
116 StatusCode::FORBIDDEN,
117 "Raw media uploads require write access".to_string(),
118 ));
119 }
120
121 if is_chk_upload {
122 let require_random = require_random_untrusted_ingest && !can_upload_author;
123 validate_untrusted_blob(body, require_random)
124 .map_err(|IngestRejection { status, reason }| (status, reason))?;
125 }
126
127 Ok(())
128}
129
130fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
131 let reason = reason.into();
132 Response::builder()
133 .status(status)
134 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
135 .header("X-Reason", reason.as_str())
136 .header(header::CONTENT_TYPE, "application/json")
137 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
138 .unwrap()
139}
140
141fn blossom_retryable_json_error(
142 status: StatusCode,
143 reason: impl Into<String>,
144 retry_after_seconds: u64,
145) -> Response<Body> {
146 let reason = reason.into();
147 Response::builder()
148 .status(status)
149 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
150 .header("X-Reason", reason.as_str())
151 .header(header::RETRY_AFTER, retry_after_seconds.to_string())
152 .header(header::CONTENT_TYPE, "application/json")
153 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
154 .unwrap()
155}
156
/// JSON description of a stored blob, returned by the upload endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Server-relative location of the blob: `/<sha256><extension>`.
    pub url: String,
    /// Lowercase hex SHA-256 of the blob contents.
    pub sha256: String,
    /// Blob length in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Upload time as seconds since the Unix epoch.
    pub uploaded: u64,
}
167
/// One blob inside a batch upload request.
#[derive(Debug, Deserialize)]
pub struct BatchUploadBlob {
    /// Hex SHA-256 the client claims for `data`; the batch handler recomputes
    /// the hash and rejects a mismatch.
    pub sha256: String,
    /// Optional content type; also accepted under the JSON key `contentType`.
    #[serde(default, alias = "contentType")]
    pub content_type: Option<String>,
    /// Blob contents, base64-encoded (standard alphabet).
    pub data: String,
}
175
/// JSON body of a batch upload request.
#[derive(Debug, Deserialize)]
pub struct BatchUploadRequest {
    /// Blobs to store; the batch handler rejects an empty list.
    pub blobs: Vec<BatchUploadBlob>,
}
180
/// JSON body returned by a successful batch upload.
#[derive(Debug, Serialize)]
pub struct BatchUploadResponse {
    /// Count reported by the store's batch write.
    pub uploaded: usize,
    /// One descriptor per blob in the request, in request order.
    pub blobs: Vec<BlobDescriptor>,
}
186
/// Query-string parameters for a listing endpoint.
///
/// NOTE(review): the handler consuming this type is not visible here; the
/// field meanings below are inferred from their names — confirm against it.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Presumably a lower time bound (Unix seconds) — confirm.
    pub since: Option<u64>,
    /// Presumably an upper time bound (Unix seconds) — confirm.
    pub until: Option<u64>,
    /// Presumably the maximum number of entries to return — confirm.
    pub limit: Option<usize>,
    /// Presumably an opaque pagination cursor — confirm.
    pub cursor: Option<String>,
}
195
/// Fields extracted from a verified Blossom authorization event.
#[derive(Debug)]
pub struct BlossomAuth {
    /// Signer's public key exactly as it appears in the event (hex string).
    pub pubkey: String,
    /// Event kind; always `BLOSSOM_AUTH_KIND` once verification succeeded.
    pub kind: u16,
    /// Event creation time, seconds since the Unix epoch.
    pub created_at: u64,
    /// Value of the `expiration` tag, if present and parsable.
    pub expiration: Option<u64>,
    /// Value of the `t` tag (the authorized action).
    pub action: Option<String>,
    /// Lowercased values of all `x` tags (authorized blob hashes).
    pub blob_hashes: Vec<String>,
    /// Value of the `server` tag, if present.
    pub server: Option<String>,
}
207
208pub fn verify_blossom_auth(
211 headers: &HeaderMap,
212 required_action: &str,
213 required_hash: Option<&str>,
214) -> Result<BlossomAuth, (StatusCode, &'static str)> {
215 let auth_header = headers
216 .get(header::AUTHORIZATION)
217 .and_then(|v| v.to_str().ok())
218 .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
219
220 let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
221 StatusCode::UNAUTHORIZED,
222 "Invalid auth scheme, expected 'Nostr'",
223 ))?;
224
225 let engine = base64::engine::general_purpose::STANDARD;
227 let event_bytes = engine
228 .decode(nostr_event)
229 .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
230
231 let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
232 .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
233
234 let kind = event_json["kind"]
236 .as_u64()
237 .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
238
239 if kind != BLOSSOM_AUTH_KIND as u64 {
240 return Err((
241 StatusCode::BAD_REQUEST,
242 "Invalid event kind, expected 24242",
243 ));
244 }
245
246 let pubkey = event_json["pubkey"]
247 .as_str()
248 .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
249 .to_string();
250
251 let created_at = event_json["created_at"]
252 .as_u64()
253 .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
254
255 let sig = event_json["sig"]
256 .as_str()
257 .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
258
259 if !verify_nostr_signature(&event_json, &pubkey, sig) {
261 return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
262 }
263
264 let tags = event_json["tags"]
266 .as_array()
267 .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
268
269 let mut expiration: Option<u64> = None;
270 let mut action: Option<String> = None;
271 let mut blob_hashes: Vec<String> = Vec::new();
272 let mut server: Option<String> = None;
273
274 for tag in tags {
275 let tag_arr = tag.as_array();
276 if let Some(arr) = tag_arr {
277 if arr.len() >= 2 {
278 let tag_name = arr[0].as_str().unwrap_or("");
279 let tag_value = arr[1].as_str().unwrap_or("");
280
281 match tag_name {
282 "t" => action = Some(tag_value.to_string()),
283 "x" => blob_hashes.push(tag_value.to_lowercase()),
284 "expiration" => expiration = tag_value.parse().ok(),
285 "server" => server = Some(tag_value.to_string()),
286 _ => {}
287 }
288 }
289 }
290 }
291
292 let now = SystemTime::now()
294 .duration_since(UNIX_EPOCH)
295 .unwrap()
296 .as_secs();
297
298 if let Some(exp) = expiration {
299 if exp < now {
300 return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
301 }
302 }
303
304 if created_at > now + 60 {
306 return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
307 }
308
309 if let Some(ref act) = action {
311 if act != required_action {
312 return Err((StatusCode::FORBIDDEN, "Action mismatch"));
313 }
314 } else {
315 return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
316 }
317
318 if let Some(hash) = required_hash {
320 if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
321 return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
322 }
323 }
324
325 Ok(BlossomAuth {
326 pubkey,
327 kind: kind as u16,
328 created_at,
329 expiration,
330 action,
331 blob_hashes,
332 server,
333 })
334}
335
336fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
338 use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
339
340 let content = event["content"].as_str().unwrap_or("");
342 let full_serialized = format!(
343 "[0,\"{}\",{},{},{},\"{}\"]",
344 pubkey,
345 event["created_at"],
346 event["kind"],
347 event["tags"],
348 escape_json_string(content),
349 );
350
351 let mut hasher = Sha256::new();
352 hasher.update(full_serialized.as_bytes());
353 let event_id = hasher.finalize();
354
355 let pubkey_bytes = match hex::decode(pubkey) {
357 Ok(b) => b,
358 Err(_) => return false,
359 };
360
361 let sig_bytes = match hex::decode(sig) {
362 Ok(b) => b,
363 Err(_) => return false,
364 };
365
366 let secp = Secp256k1::verification_only();
367
368 let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
369 Ok(pk) => pk,
370 Err(_) => return false,
371 };
372
373 let signature = match Signature::from_slice(&sig_bytes) {
374 Ok(s) => s,
375 Err(_) => return false,
376 };
377
378 let message = match Message::from_digest_slice(&event_id) {
379 Ok(m) => m,
380 Err(_) => return false,
381 };
382
383 secp.verify_schnorr(&signature, &message, &xonly_pubkey)
384 .is_ok()
385}
386
/// Escapes `s` for embedding between double quotes in the canonical Nostr
/// event serialization.
///
/// The output matches what standard JSON serializers produce, which is what
/// signers hash: `"` and `\` are backslash-escaped; line feed, carriage
/// return, tab, backspace and form feed use their short escapes (`\n`, `\r`,
/// `\t`, `\b`, `\f`); any other character below U+0020 becomes `\u00XX`
/// (lowercase hex). Every other character, including DEL (U+007F) and the C1
/// control range, is copied verbatim. Escaping those would change the hash
/// and make valid signatures fail.
fn escape_json_string(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0c}' => escaped.push_str("\\f"),
            // Remaining C0 control characters have no short escape.
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
405
406pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
409 let allowed_headers = headers
411 .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
412 .and_then(|v| v.to_str().ok())
413 .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
414
415 let full_allowed = format!(
417 "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
418 allowed_headers
419 );
420
421 Response::builder()
422 .status(StatusCode::NO_CONTENT)
423 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
424 .header(
425 header::ACCESS_CONTROL_ALLOW_METHODS,
426 "GET, HEAD, PUT, DELETE, OPTIONS",
427 )
428 .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
429 .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
430 .body(Body::empty())
431 .unwrap()
432}
433
434pub async fn head_blob(
436 State(state): State<AppState>,
437 Path(id): Path<String>,
438 connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
439) -> impl IntoResponse {
440 let is_localhost = connect_info.0.ip().is_loopback();
441 let (hash_part, ext) = parse_hash_and_extension(&id);
442
443 if !is_valid_sha256(hash_part) {
444 return Response::builder()
445 .status(StatusCode::BAD_REQUEST)
446 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
447 .header("X-Reason", "Invalid SHA256 hash")
448 .body(Body::empty())
449 .unwrap();
450 }
451
452 let sha256_hex = hash_part.to_lowercase();
453 let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
454 Ok(b) => b,
455 Err(_) => {
456 return Response::builder()
457 .status(StatusCode::BAD_REQUEST)
458 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
459 .header("X-Reason", "Invalid SHA256 format")
460 .body(Body::empty())
461 .unwrap()
462 }
463 };
464
465 let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
470 Ok(Ok(cached))
471 } else {
472 let permit = match acquire_blob_read().await {
473 Ok(permit) => permit,
474 Err(_) => {
475 return Response::builder()
476 .status(StatusCode::SERVICE_UNAVAILABLE)
477 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
478 .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
479 .header("Retry-After", "1")
480 .header("X-Reason", BLOB_READ_BUSY)
481 .body(Body::empty())
482 .unwrap()
483 }
484 };
485 let store = state.store.clone();
486 let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
487 let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
488 drop(permit);
489 let result = match timed {
490 Ok(result) => result.map_err(|_| ()),
491 Err(_) => Err(()),
492 };
493 if let Ok(Ok(size)) = result {
494 state.blob_cache.put_size(sha256_hex.clone(), size);
495 }
496 result
497 };
498
499 match blob_size {
500 Ok(Ok(Some(size))) => {
501 let mime_type = ext
502 .map(|e| get_mime_type(&format!("file{}", e)))
503 .unwrap_or("application/octet-stream");
504
505 let mut builder = Response::builder()
506 .status(StatusCode::OK)
507 .header(header::CONTENT_TYPE, mime_type)
508 .header(header::CONTENT_LENGTH, size)
509 .header(header::ACCEPT_RANGES, "bytes")
510 .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
511 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
512 if is_localhost {
513 builder = builder.header("X-Source", "local");
514 }
515 builder.body(Body::empty()).unwrap()
516 }
517 Ok(Ok(None)) => Response::builder()
518 .status(StatusCode::NOT_FOUND)
519 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
520 .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
521 .header("X-Reason", "Blob not found")
522 .body(Body::empty())
523 .unwrap(),
524 _ => Response::builder()
525 .status(StatusCode::INTERNAL_SERVER_ERROR)
526 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
527 .body(Body::empty())
528 .unwrap(),
529 }
530}
531
532async fn store_blossom_blob_without_blocking_runtime(
533 state: &AppState,
534 data: axum::body::Bytes,
535 pubkey: [u8; 32],
536 track_ownership: bool,
537) -> anyhow::Result<()> {
538 let mut hasher = Sha256::new();
539 hasher.update(&data);
540 let hash_hex = hex::encode(hasher.finalize());
541 let data_for_cache = data.clone();
542 let permit = acquire_blob_write()
543 .await
544 .map_err(|err| anyhow::anyhow!(err))?;
545 let store = state.store.clone();
546 tokio::task::spawn_blocking(move || {
547 let _permit = permit;
548 if track_ownership {
549 store.put_owned_blob(&data, &pubkey)?;
550 } else {
551 store.put_cached_blob(&data)?;
552 }
553 Ok::<(), anyhow::Error>(())
554 })
555 .await
556 .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
557 state
558 .blob_cache
559 .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
560 state.blob_cache.put_body(hash_hex, &data_for_cache);
561 Ok(())
562}
563
564fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
565 Response::builder()
566 .status(status)
567 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
568 .header(header::CONTENT_TYPE, "application/json")
569 .body(Body::from(serde_json::to_string(descriptor).unwrap()))
570 .unwrap()
571}
572
573fn optimistic_upload_queue_timeout() -> Duration {
574 let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
575 .ok()
576 .and_then(|value| value.parse::<u64>().ok())
577 .filter(|value| *value > 0)
578 .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
579 Duration::from_millis(millis)
580}
581
/// Process-wide set of blob hashes whose optimistic upload is still being
/// written, created empty on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static REGISTRY: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    REGISTRY.get_or_init(Mutex::default)
}
586
587fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
588 optimistic_upload_inflight()
589 .lock()
590 .is_ok_and(|inflight| inflight.contains(hash_hex))
591}
592
593fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
594 optimistic_upload_inflight()
595 .lock()
596 .map(|mut inflight| inflight.insert(hash_hex.to_string()))
597 .unwrap_or(true)
598}
599
600fn clear_optimistic_upload_inflight(hash_hex: &str) {
601 if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
602 inflight.remove(hash_hex);
603 }
604}
605
606pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
607 let max_bytes = state.optimistic_upload_queue_bytes;
608 let available_bytes = state
609 .optimistic_upload_queue
610 .available_permits()
611 .min(max_bytes);
612 let in_flight = optimistic_upload_inflight()
613 .lock()
614 .map(|inflight| inflight.len())
615 .unwrap_or(0);
616
617 OptimisticUploadQueueSnapshot {
618 enabled: state.optimistic_blossom_uploads,
619 max_bytes,
620 available_bytes,
621 reserved_bytes: max_bytes.saturating_sub(available_bytes),
622 in_flight,
623 queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
624 }
625}
626
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX`.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
630
631async fn acquire_optimistic_upload_queue(
632 state: &AppState,
633 permits: u32,
634) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
635 match tokio::time::timeout(
636 optimistic_upload_queue_timeout(),
637 state
638 .optimistic_upload_queue
639 .clone()
640 .acquire_many_owned(permits),
641 )
642 .await
643 {
644 Ok(Ok(permit)) => Ok(permit),
645 Ok(Err(_)) => Err("Optimistic upload queue is closed"),
646 Err(_) => Err("Optimistic upload queue is full"),
647 }
648}
649
650async fn uploaded_blob_already_exists(
651 state: &AppState,
652 sha256_hash: [u8; 32],
653 sha256_hex: &str,
654) -> Result<bool, String> {
655 if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
656 return Ok(true);
657 }
658
659 let permit = acquire_blob_read().await.map_err(str::to_string)?;
660 let store = state.store.clone();
661 let size_read = tokio::task::spawn_blocking(move || {
662 store
663 .blob_size(&sha256_hash)
664 .map_err(|error| error.to_string())
665 });
666
667 let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
668 drop(permit);
669 match result {
670 Ok(Ok(Ok(size))) => {
671 state.blob_cache.put_size(sha256_hex.to_string(), size);
672 Ok(size.is_some())
673 }
674 Ok(Ok(Err(error))) => Err(error),
675 Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
676 Err(_) => Err("blob existence check timed out".to_string()),
677 }
678}
679
680async fn set_existing_blob_owner_without_body_write(
681 state: AppState,
682 sha256_hash: [u8; 32],
683 pubkey: [u8; 32],
684) -> anyhow::Result<()> {
685 tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
686 .await
687 .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
688 Ok(())
689}
690
691pub async fn upload_blob(
693 State(state): State<AppState>,
694 headers: HeaderMap,
695 body: axum::body::Bytes,
696) -> impl IntoResponse {
697 let max_size = state.max_upload_bytes;
699 if body.len() > max_size {
700 return Response::builder()
701 .status(StatusCode::PAYLOAD_TOO_LARGE)
702 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
703 .header(header::CONTENT_TYPE, "application/json")
704 .body(Body::from(format!(
705 r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
706 body.len(),
707 max_size,
708 max_size / 1024 / 1024
709 )))
710 .unwrap();
711 }
712
713 let auth = match verify_blossom_auth(&headers, "upload", None) {
715 Ok(a) => a,
716 Err((status, reason)) => {
717 return Response::builder()
718 .status(status)
719 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
720 .header("X-Reason", reason)
721 .header(header::CONTENT_TYPE, "application/json")
722 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
723 .unwrap();
724 }
725 };
726
727 let content_type = headers
729 .get(header::CONTENT_TYPE)
730 .and_then(|v| v.to_str().ok())
731 .unwrap_or("application/octet-stream")
732 .to_string();
733
734 let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
736 let can_upload = can_accept_upload_author(&state, &auth.pubkey);
737 if !can_upload {
738 let _ = check_write_access(&state, &auth.pubkey);
739 return Response::builder()
740 .status(StatusCode::FORBIDDEN)
741 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
742 .header(header::CONTENT_TYPE, "application/json")
743 .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
744 .unwrap();
745 }
746
747 if let Err((status, reason)) = validate_upload_payload(
748 &body,
749 &content_type,
750 can_upload,
751 state.require_random_untrusted_ingest,
752 ) {
753 return blossom_json_error(status, reason);
754 }
755
756 let mut hasher = Sha256::new();
758 hasher.update(&body);
759 let sha256_hash: [u8; 32] = hasher.finalize().into();
760 let sha256_hex = hex::encode(sha256_hash);
761
762 if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
764 return Response::builder()
765 .status(StatusCode::FORBIDDEN)
766 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
767 .header(
768 "X-Reason",
769 "Uploaded blob hash does not match authorized hash",
770 )
771 .header(header::CONTENT_TYPE, "application/json")
772 .body(Body::from(r#"{"error":"Hash mismatch"}"#))
773 .unwrap();
774 }
775
776 let pubkey_bytes = match from_hex(&auth.pubkey) {
778 Ok(b) => b,
779 Err(_) => {
780 return Response::builder()
781 .status(StatusCode::BAD_REQUEST)
782 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
783 .header("X-Reason", "Invalid pubkey format")
784 .body(Body::empty())
785 .unwrap();
786 }
787 };
788
789 let size = body.len() as u64;
790 let now = SystemTime::now()
791 .duration_since(UNIX_EPOCH)
792 .unwrap()
793 .as_secs();
794 let ext = mime_to_extension(&content_type_base(&content_type));
795 let descriptor = BlobDescriptor {
796 url: format!("/{}{}", sha256_hex, ext),
797 sha256: sha256_hex.clone(),
798 size,
799 mime_type: content_type,
800 uploaded: now,
801 };
802
803 if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
804 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
805 }
806
807 if state.optimistic_blossom_uploads {
810 let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
811 if queued_bytes <= state.optimistic_upload_queue_bytes {
812 let permits = queued_bytes as u32;
813 let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
814 if !marked_inflight {
815 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
816 }
817 let permit = match acquire_optimistic_upload_queue(&state, permits).await {
818 Ok(permit) => permit,
819 Err(_) => {
820 clear_optimistic_upload_inflight(&sha256_hex);
821 return blossom_retryable_json_error(
822 StatusCode::SERVICE_UNAVAILABLE,
823 "Optimistic upload queue is full",
824 2,
825 );
826 }
827 };
828 let state_for_write = state.clone();
829 let hash_for_log = sha256_hex.clone();
830 tokio::spawn(async move {
831 let _permit = permit;
832 if let Err(error) = store_blossom_blob_without_blocking_runtime(
833 &state_for_write,
834 body,
835 pubkey_bytes,
836 is_allowed_author,
837 )
838 .await
839 {
840 tracing::error!(
841 "Background Blossom storage failed for {}: {:#}",
842 hash_for_log,
843 error
844 );
845 }
846 clear_optimistic_upload_inflight(&hash_for_log);
847 });
848
849 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
850 }
851
852 tracing::warn!(
853 "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
854 queued_bytes,
855 state.optimistic_upload_queue_bytes
856 );
857 }
858
859 match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
860 Ok(true) => {
861 if is_allowed_author {
862 if let Err(error) = set_existing_blob_owner_without_body_write(
863 state.clone(),
864 sha256_hash,
865 pubkey_bytes,
866 )
867 .await
868 {
869 return Response::builder()
870 .status(StatusCode::INTERNAL_SERVER_ERROR)
871 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
872 .header("X-Reason", "Storage error")
873 .header(header::CONTENT_TYPE, "application/json")
874 .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
875 .unwrap();
876 }
877 }
878 return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
879 }
880 Ok(false) => {}
881 Err(error) => {
882 tracing::debug!(
883 "Could not preflight Blossom upload {} before synchronous storage: {}",
884 sha256_hex,
885 error
886 );
887 }
888 }
889
890 let store_result = store_blossom_blob_without_blocking_runtime(
891 &state,
892 body.clone(),
893 pubkey_bytes,
894 is_allowed_author,
895 )
896 .await;
897
898 match store_result {
899 Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
900 Err(e) => Response::builder()
901 .status(StatusCode::INTERNAL_SERVER_ERROR)
902 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
903 .header("X-Reason", "Storage error")
904 .header(header::CONTENT_TYPE, "application/json")
905 .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
906 .unwrap(),
907 }
908}
909
910pub async fn upload_blob_batch(
912 State(state): State<AppState>,
913 headers: HeaderMap,
914 Json(payload): Json<BatchUploadRequest>,
915) -> impl IntoResponse {
916 let started_at = Instant::now();
917 let slow_log_ms = slow_batch_upload_log_ms();
918 let payload_blobs = payload.blobs.len();
919 if payload.blobs.is_empty() {
920 return blossom_json_error(StatusCode::BAD_REQUEST, "Batch is empty");
921 }
922 if payload.blobs.len() > MAX_BATCH_UPLOAD_BLOBS {
923 return blossom_json_error(
924 StatusCode::PAYLOAD_TOO_LARGE,
925 "Batch contains too many blobs",
926 );
927 }
928
929 let auth = match verify_blossom_auth(&headers, "upload", None) {
930 Ok(a) => a,
931 Err((status, reason)) => {
932 return Response::builder()
933 .status(status)
934 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
935 .header("X-Reason", reason)
936 .header(header::CONTENT_TYPE, "application/json")
937 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
938 .unwrap();
939 }
940 };
941 let auth_ms = started_at.elapsed().as_millis();
942
943 let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
944 let can_upload = can_accept_upload_author(&state, &auth.pubkey);
945 if !can_upload {
946 let _ = check_write_access(&state, &auth.pubkey);
947 return blossom_json_error(StatusCode::FORBIDDEN, "Write access denied");
948 }
949
950 let pubkey_bytes = match from_hex(&auth.pubkey) {
951 Ok(bytes) => bytes,
952 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid pubkey format"),
953 };
954
955 let now = SystemTime::now()
956 .duration_since(UNIX_EPOCH)
957 .unwrap()
958 .as_secs();
959 let mut total_bytes = 0usize;
960 let mut items = Vec::with_capacity(payload.blobs.len());
961 let mut descriptors = Vec::with_capacity(payload.blobs.len());
962 let mut decode_hash_ms = 0u128;
963 let mut validate_ms = 0u128;
964
965 for blob in payload.blobs {
966 let decode_started = Instant::now();
967 let sha256_hex = blob.sha256.to_lowercase();
968 let expected_hash: [u8; 32] = match from_hex(&sha256_hex) {
969 Ok(hash) => hash,
970 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob hash"),
971 };
972 if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
973 return blossom_json_error(
974 StatusCode::FORBIDDEN,
975 "Uploaded blob hash does not match authorized hash",
976 );
977 }
978
979 let data = match base64::engine::general_purpose::STANDARD.decode(blob.data.as_bytes()) {
980 Ok(data) => data,
981 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob data"),
982 };
983 if data.len() > state.max_upload_bytes {
984 return blossom_json_error(
985 StatusCode::PAYLOAD_TOO_LARGE,
986 "Blob exceeds maximum upload size",
987 );
988 }
989 total_bytes = total_bytes.saturating_add(data.len());
990 if total_bytes > MAX_BATCH_UPLOAD_BYTES {
991 return blossom_json_error(
992 StatusCode::PAYLOAD_TOO_LARGE,
993 "Batch exceeds maximum upload size",
994 );
995 }
996
997 let mut hasher = Sha256::new();
998 hasher.update(&data);
999 let actual_hash: [u8; 32] = hasher.finalize().into();
1000 if actual_hash != expected_hash {
1001 return blossom_json_error(StatusCode::FORBIDDEN, "Hash mismatch");
1002 }
1003 decode_hash_ms += decode_started.elapsed().as_millis();
1004
1005 let validate_started = Instant::now();
1006 let content_type = blob
1007 .content_type
1008 .as_deref()
1009 .map(content_type_base)
1010 .unwrap_or_else(|| "application/octet-stream".to_string());
1011 if let Err((status, reason)) = validate_upload_payload(
1012 &data,
1013 &content_type,
1014 can_upload,
1015 state.require_random_untrusted_ingest,
1016 ) {
1017 return blossom_json_error(status, reason);
1018 }
1019 validate_ms += validate_started.elapsed().as_millis();
1020
1021 let ext = mime_to_extension(&content_type);
1022 descriptors.push(BlobDescriptor {
1023 url: format!("/{}{}", sha256_hex, ext),
1024 sha256: sha256_hex,
1025 size: data.len() as u64,
1026 mime_type: content_type,
1027 uploaded: now,
1028 });
1029 items.push((actual_hash, data));
1030 }
1031 let prepare_ms = started_at.elapsed().as_millis();
1032
1033 let store = state.store.clone();
1034 let store_started = Instant::now();
1035 let stored = tokio::task::spawn_blocking(move || {
1036 if is_allowed_author {
1037 store.put_owned_blobs(&items, &pubkey_bytes)
1038 } else {
1039 store.put_cached_blobs(&items)
1040 }
1041 })
1042 .await
1043 .map_err(|error| anyhow::anyhow!("blob batch write task failed: {}", error));
1044 let store_ms = store_started.elapsed().as_millis();
1045 let total_ms = started_at.elapsed().as_millis();
1046
1047 match stored {
1048 Ok(Ok(uploaded)) => {
1049 if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1050 tracing::warn!(
1051 blobs = payload_blobs,
1052 uploaded,
1053 total_bytes,
1054 total_ms,
1055 auth_ms,
1056 prepare_ms,
1057 decode_hash_ms,
1058 validate_ms,
1059 store_ms,
1060 allowed_author = is_allowed_author,
1061 "slow Blossom batch upload"
1062 );
1063 }
1064 Response::builder()
1065 .status(StatusCode::OK)
1066 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1067 .header(header::CONTENT_TYPE, "application/json")
1068 .body(Body::from(
1069 serde_json::to_string(&BatchUploadResponse {
1070 uploaded,
1071 blobs: descriptors,
1072 })
1073 .unwrap(),
1074 ))
1075 .unwrap()
1076 }
1077 Ok(Err(error)) | Err(error) => Response::builder()
1078 .status(StatusCode::INTERNAL_SERVER_ERROR)
1079 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1080 .header("X-Reason", "Storage error")
1081 .header(header::CONTENT_TYPE, "application/json")
1082 .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
1083 .unwrap(),
1084 }
1085}
1086
1087pub async fn delete_blob(
1090 State(state): State<AppState>,
1091 Path(id): Path<String>,
1092 headers: HeaderMap,
1093) -> impl IntoResponse {
1094 let (hash_part, _) = parse_hash_and_extension(&id);
1095
1096 if !is_valid_sha256(hash_part) {
1097 return Response::builder()
1098 .status(StatusCode::BAD_REQUEST)
1099 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1100 .header("X-Reason", "Invalid SHA256 hash")
1101 .body(Body::empty())
1102 .unwrap();
1103 }
1104
1105 let sha256_hex = hash_part.to_lowercase();
1106
1107 let sha256_bytes = match from_hex(&sha256_hex) {
1109 Ok(b) => b,
1110 Err(_) => {
1111 return Response::builder()
1112 .status(StatusCode::BAD_REQUEST)
1113 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1114 .header("X-Reason", "Invalid SHA256 hash format")
1115 .body(Body::empty())
1116 .unwrap();
1117 }
1118 };
1119
1120 let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
1122 Ok(a) => a,
1123 Err((status, reason)) => {
1124 return Response::builder()
1125 .status(status)
1126 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1127 .header("X-Reason", reason)
1128 .body(Body::empty())
1129 .unwrap();
1130 }
1131 };
1132
1133 let pubkey_bytes = match from_hex(&auth.pubkey) {
1135 Ok(b) => b,
1136 Err(_) => {
1137 return Response::builder()
1138 .status(StatusCode::BAD_REQUEST)
1139 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1140 .header("X-Reason", "Invalid pubkey format")
1141 .body(Body::empty())
1142 .unwrap();
1143 }
1144 };
1145
1146 match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
1148 Ok(true) => {
1149 }
1151 Ok(false) => {
1152 match state.store.blob_has_owners(&sha256_bytes) {
1154 Ok(true) => {
1155 return Response::builder()
1156 .status(StatusCode::FORBIDDEN)
1157 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1158 .header("X-Reason", "Not a blob owner")
1159 .body(Body::empty())
1160 .unwrap();
1161 }
1162 Ok(false) => {
1163 return Response::builder()
1164 .status(StatusCode::NOT_FOUND)
1165 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1166 .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
1167 .header("X-Reason", "Blob not found")
1168 .body(Body::empty())
1169 .unwrap();
1170 }
1171 Err(_) => {
1172 return Response::builder()
1173 .status(StatusCode::INTERNAL_SERVER_ERROR)
1174 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1175 .body(Body::empty())
1176 .unwrap();
1177 }
1178 }
1179 }
1180 Err(_) => {
1181 return Response::builder()
1182 .status(StatusCode::INTERNAL_SERVER_ERROR)
1183 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1184 .body(Body::empty())
1185 .unwrap();
1186 }
1187 }
1188
1189 match state
1191 .store
1192 .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
1193 {
1194 Ok(fully_deleted) => {
1195 Response::builder()
1198 .status(StatusCode::OK)
1199 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1200 .header(
1201 "X-Blob-Deleted",
1202 if fully_deleted { "true" } else { "false" },
1203 )
1204 .body(Body::empty())
1205 .unwrap()
1206 }
1207 Err(_) => Response::builder()
1208 .status(StatusCode::INTERNAL_SERVER_ERROR)
1209 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1210 .body(Body::empty())
1211 .unwrap(),
1212 }
1213}
1214
1215pub async fn list_blobs(
1217 State(state): State<AppState>,
1218 Path(pubkey): Path<String>,
1219 Query(query): Query<ListQuery>,
1220 headers: HeaderMap,
1221) -> impl IntoResponse {
1222 if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1224 return Response::builder()
1225 .status(StatusCode::BAD_REQUEST)
1226 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1227 .header("X-Reason", "Invalid pubkey format")
1228 .header(header::CONTENT_TYPE, "application/json")
1229 .body(Body::from("[]"))
1230 .unwrap();
1231 }
1232
1233 let pubkey_hex = pubkey.to_lowercase();
1234 let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1235 Ok(b) => b,
1236 Err(_) => {
1237 return Response::builder()
1238 .status(StatusCode::BAD_REQUEST)
1239 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1240 .header("X-Reason", "Invalid pubkey format")
1241 .header(header::CONTENT_TYPE, "application/json")
1242 .body(Body::from("[]"))
1243 .unwrap()
1244 }
1245 };
1246
1247 let auth = match verify_blossom_auth(&headers, "list", None) {
1248 Ok(auth) => auth,
1249 Err((status, reason)) => {
1250 return Response::builder()
1251 .status(status)
1252 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1253 .header("X-Reason", reason)
1254 .header(header::CONTENT_TYPE, "application/json")
1255 .body(Body::from("[]"))
1256 .unwrap();
1257 }
1258 };
1259
1260 if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1261 return Response::builder()
1262 .status(StatusCode::FORBIDDEN)
1263 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1264 .header("X-Reason", "Pubkey mismatch")
1265 .header(header::CONTENT_TYPE, "application/json")
1266 .body(Body::from("[]"))
1267 .unwrap();
1268 }
1269
1270 match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1272 Ok(blobs) => {
1273 let mut filtered: Vec<_> = blobs
1275 .into_iter()
1276 .filter(|b| {
1277 if let Some(since) = query.since {
1278 if b.uploaded < since {
1279 return false;
1280 }
1281 }
1282 if let Some(until) = query.until {
1283 if b.uploaded > until {
1284 return false;
1285 }
1286 }
1287 true
1288 })
1289 .collect();
1290
1291 filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1293
1294 let limit = query.limit.unwrap_or(100).min(1000);
1296 filtered.truncate(limit);
1297
1298 Response::builder()
1299 .status(StatusCode::OK)
1300 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1301 .header(header::CONTENT_TYPE, "application/json")
1302 .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1303 .unwrap()
1304 }
1305 Err(_) => Response::builder()
1306 .status(StatusCode::INTERNAL_SERVER_ERROR)
1307 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1308 .header(header::CONTENT_TYPE, "application/json")
1309 .body(Body::from("[]"))
1310 .unwrap(),
1311 }
1312}
1313
/// Splits `id` at its last `.` into `(hash, extension)`.
///
/// The returned extension keeps its leading dot (`"abc.png"` yields
/// `("abc", Some(".png"))`). When `id` contains no dot the whole string is
/// returned as the hash and the extension is `None`.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    match id.rfind('.') {
        Some(dot_index) => {
            // `.` is a single ASCII byte, so `dot_index` is always a char boundary.
            let (hash, extension) = id.split_at(dot_index);
            (hash, Some(extension))
        }
        None => (id, None),
    }
}
1323
/// Returns `true` when `s` is exactly 64 ASCII hex digits (either case).
fn is_valid_sha256(s: &str) -> bool {
    if s.len() != 64 {
        return false;
    }
    // Any byte of a multi-byte UTF-8 sequence is >= 0x80 and therefore not a hex digit,
    // so a byte-wise check rejects non-ASCII input just like a char-wise one.
    s.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1327
/// Test-only helper that stores `data` through the store attached to `state`.
///
/// With `track_ownership` set the blob is written via `put_owned_blob` for
/// `pubkey`; otherwise it is written via `put_cached_blob` and `pubkey` is not
/// used. `_sha256` is accepted but ignored. Any store error is propagated.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    if !track_ownership {
        state.store.put_cached_blob(data)?;
        return Ok(());
    }

    state.store.put_owned_blob(data, pubkey)?;
    Ok(())
}
1344
/// Maps a MIME type to a file extension including its leading dot.
///
/// The match is exact (case-sensitive, no parameter stripping). Unknown types
/// yield the empty string.
fn mime_to_extension(mime: &str) -> &'static str {
    // (MIME type, extension) pairs recognised by this server.
    const KNOWN_EXTENSIONS: &[(&str, &str)] = &[
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    KNOWN_EXTENSIONS
        .iter()
        .find(|entry| entry.0 == mime)
        .map_or("", |entry| entry.1)
}
1363
#[cfg(test)]
mod tests {
    // Unit tests for the Blossom handlers and their helpers. Shared request
    // setup and polling live in the helper functions at the top of the module.
    use super::*;
    use crate::server::auth::WsRelayState;
    use crate::storage::HashtreeStore;
    use axum::response::IntoResponse;
    use base64::Engine;
    use hashtree_core::sha256;
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex as StdMutex};
    use std::time::Duration;
    use tempfile::TempDir;

    /// Builds an `AppState` around `store` with public writes enabled,
    /// optimistic uploads disabled and no allowed pubkeys configured.
    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
        AppState {
            store,
            auth: None,
            daemon_started_at: 1_700_000_000,
            peer_mode: crate::config::ServerMode::Normal,
            hash_get_enabled: true,
            http_webrtc_fetch: true,
            webrtc_peers: None,
            fips_transport: None,
            fetch_from_fips_peers: true,
            ws_relay: Arc::new(WsRelayState::new()),
            max_upload_bytes: 5 * 1024 * 1024,
            public_writes: true,
            require_random_untrusted_ingest: true,
            optimistic_blossom_uploads: false,
            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
            allowed_pubkeys: HashSet::new(),
            upstream_blossom: Vec::new(),
            social_graph: None,
            social_graph_store: None,
            social_graph_root: None,
            socialgraph_snapshot_public: false,
            nostr_relay: None,
            nostr_relay_urls: Vec::new(),
            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
                std::collections::HashMap::new(),
            )),
            inflight_blob_reads: Arc::new(
                tokio::sync::Mutex::new(std::collections::HashMap::new()),
            ),
            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
        }
    }

    /// Signs a Blossom `upload` auth event with `keys` (expiring 300 seconds
    /// from now) and returns it as a `Nostr <base64>` Authorization value.
    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};

        let now = Timestamp::now();
        let event = EventBuilder::new(
            Kind::Custom(BLOSSOM_AUTH_KIND),
            "",
            vec![
                Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
                Tag::custom(
                    TagKind::Custom("expiration".into()),
                    vec![(now.as_u64() + 300).to_string()],
                ),
            ],
        )
        .custom_created_at(now)
        .to_event(keys)
        .expect("sign blossom auth");
        let json = serde_json::to_vec(&event).expect("serialize auth event");
        format!(
            "Nostr {}",
            base64::engine::general_purpose::STANDARD.encode(json)
        )
    }

    /// Request headers for an authenticated `application/octet-stream` upload
    /// signed by `keys`.
    fn octet_stream_upload_headers(keys: &nostr::Keys) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(
            header::AUTHORIZATION,
            create_upload_auth_header(keys)
                .parse()
                .expect("auth header value"),
        );
        headers.insert(
            header::CONTENT_TYPE,
            "application/octet-stream"
                .parse()
                .expect("content type header value"),
        );
        headers
    }

    /// Evaluates `condition` up to 50 times, sleeping 10 ms after each failed
    /// check, and reports whether it ever held.
    async fn poll_until(mut condition: impl FnMut() -> bool) -> bool {
        for _ in 0..50 {
            if condition() {
                return true;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        false
    }

    #[test]
    fn test_is_valid_sha256() {
        assert!(is_valid_sha256(
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
        ));
        assert!(is_valid_sha256(
            "0000000000000000000000000000000000000000000000000000000000000000"
        ));

        // Too short.
        assert!(!is_valid_sha256("e2bab35b5296ec2242ded0a01f6d6723"));
        // Too long.
        assert!(!is_valid_sha256(
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa"
        ));
        // Right length, non-hex characters.
        assert!(!is_valid_sha256(
            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
        ));
        // Empty.
        assert!(!is_valid_sha256(""));
    }

    #[test]
    fn test_parse_hash_and_extension() {
        let (hash, ext) = parse_hash_and_extension("abc123.png");
        assert_eq!(hash, "abc123");
        assert_eq!(ext, Some(".png"));

        let (hash2, ext2) = parse_hash_and_extension("abc123");
        assert_eq!(hash2, "abc123");
        assert_eq!(ext2, None);

        // Only the last dot separates the extension.
        let (hash3, ext3) = parse_hash_and_extension("abc.123.jpg");
        assert_eq!(hash3, "abc.123");
        assert_eq!(ext3, Some(".jpg"));
    }

    #[test]
    fn test_mime_to_extension() {
        assert_eq!(mime_to_extension("image/png"), ".png");
        assert_eq!(mime_to_extension("image/jpeg"), ".jpg");
        assert_eq!(mime_to_extension("video/mp4"), ".mp4");
        assert_eq!(mime_to_extension("application/octet-stream"), "");
        assert_eq!(mime_to_extension("unknown/type"), "");
    }

    /// With optimistic uploads enabled the handler answers 202 and the blob
    /// appears in the store shortly afterwards.
    #[tokio::test]
    async fn optimistic_uploads_return_accepted_and_store_in_background() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = Arc::new(
            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
        );
        let mut state = test_app_state(Arc::clone(&store));
        state.optimistic_blossom_uploads = true;

        let keys = nostr::Keys::generate();
        let headers = octet_stream_upload_headers(&keys);

        let body = axum::body::Bytes::from((0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>());
        let hash = sha256(&body);
        let response = upload_blob(State(state), headers, body)
            .await
            .into_response();
        assert_eq!(response.status(), StatusCode::ACCEPTED);

        let stored = poll_until(|| store.blob_exists(&hash).expect("blob exists check")).await;
        assert!(
            stored,
            "optimistic upload was not stored in the background"
        );
    }

    /// Uploading a blob that is already stored yields 409 even though the
    /// optimistic queue has only a single permit.
    #[tokio::test]
    async fn optimistic_upload_existing_blob_skips_queue() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = Arc::new(
            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
        );
        let body = axum::body::Bytes::from(
            (0u16..=255)
                .map(|value| ((value * 73 + 19) % 256) as u8)
                .collect::<Vec<_>>(),
        );
        store.put_cached_blob(&body).expect("seed blob");

        let mut state = test_app_state(Arc::clone(&store));
        state.optimistic_blossom_uploads = true;
        state.optimistic_upload_queue_bytes = 1;
        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));

        let keys = nostr::Keys::generate();
        let headers = octet_stream_upload_headers(&keys);

        let response = upload_blob(State(state), headers, body)
            .await
            .into_response();
        assert_eq!(response.status(), StatusCode::CONFLICT);
    }

    /// With the default (roomy) queue an already-stored blob is still answered
    /// with 202, and its in-flight marker is cleared afterwards.
    #[tokio::test]
    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = Arc::new(
            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
        );
        let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
        let hash_hex = hex::encode(sha256(&body));
        store.put_cached_blob(&body).expect("seed blob");

        let mut state = test_app_state(store);
        state.optimistic_blossom_uploads = true;

        let keys = nostr::Keys::generate();
        let headers = octet_stream_upload_headers(&keys);

        let response = upload_blob(State(state), headers, body)
            .await
            .into_response();
        assert_eq!(response.status(), StatusCode::ACCEPTED);

        let marker_cleared = poll_until(|| !optimistic_upload_is_inflight(&hash_hex)).await;
        if !marker_cleared {
            // Remove the marker ourselves so it is not left set after this test fails.
            clear_optimistic_upload_inflight(&hash_hex);
            panic!("optimistic upload in-flight marker was not cleared");
        }
    }

    /// A blob whose hash is already marked in-flight is answered with 202 even
    /// though the optimistic queue has only a single permit.
    #[tokio::test]
    async fn optimistic_upload_inflight_duplicate_skips_queue() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = Arc::new(
            HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
        );
        let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
        let hash_hex = hex::encode(sha256(&body));
        assert!(mark_optimistic_upload_inflight(&hash_hex));

        let mut state = test_app_state(store);
        state.optimistic_blossom_uploads = true;
        state.optimistic_upload_queue_bytes = 1;
        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));

        let keys = nostr::Keys::generate();
        let headers = octet_stream_upload_headers(&keys);

        let response = upload_blob(State(state), headers, body)
            .await
            .into_response();
        // Clear the marker before asserting so a failure does not leave it set.
        clear_optimistic_upload_inflight(&hash_hex);
        assert_eq!(response.status(), StatusCode::ACCEPTED);
    }

    #[test]
    fn public_writes_accept_unlisted_authors_for_uploads() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let mut state = test_app_state(store);
        let pubkey = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";

        state.public_writes = true;
        assert!(can_accept_upload_author(&state, pubkey));
        assert!(!is_allowed_write_author(&state, pubkey));

        state.public_writes = false;
        assert!(!can_accept_upload_author(&state, pubkey));
    }

    #[test]
    fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
        let encrypted_block: Vec<u8> = (0..=255).collect();

        assert_eq!(
            validate_upload_payload(&encrypted_block, "application/octet-stream", false, true),
            Ok(())
        );

        assert_eq!(
            validate_upload_payload(b"audio bytes", "audio/mpeg", true, true),
            Ok(())
        );

        assert_eq!(
            validate_upload_payload(b"audio bytes", "audio/mpeg", false, true),
            Err((
                StatusCode::FORBIDDEN,
                "Raw media uploads require write access".to_string(),
            ))
        );
    }

    #[test]
    fn authenticated_chk_uploads_skip_entropy_heuristic() {
        // 256 bytes containing only 139 distinct values.
        let low_unique_block: Vec<u8> = (0..256).map(|i| (i % 139) as u8).collect();

        assert_eq!(
            validate_upload_payload(&low_unique_block, "application/octet-stream", true, true),
            Ok(())
        );

        assert_eq!(
            validate_upload_payload(&low_unique_block, "application/octet-stream", false, true),
            Err((
                StatusCode::UNSUPPORTED_MEDIA_TYPE,
                "Data not encrypted. Unique: 139 (min: 140)".to_string(),
            ))
        );
    }

    /// An unowned upload is dropped to make room for a later cached blob,
    /// while the owned blob and its ownership record remain.
    #[test]
    fn unowned_public_uploads_use_cache_storage_semantics() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let state = test_app_state(Arc::clone(&store));

        let owned = vec![1u8; 280];
        let owned_hash = sha256(&owned);
        store_blossom_blob(&state, &owned, &owned_hash, &[2u8; 32], true).expect("owned upload");

        let public_upload = vec![3u8; 280];
        let public_hash = sha256(&public_upload);
        store_blossom_blob(&state, &public_upload, &public_hash, &[4u8; 32], false)
            .expect("public upload");

        let replacement = vec![5u8; 280];
        let replacement_hash = sha256(&replacement);
        state
            .store
            .put_cached_blob(&replacement)
            .expect("replacement cached blob");

        assert!(state.store.blob_exists(&owned_hash).expect("owned exists"));
        assert!(!state
            .store
            .blob_exists(&public_hash)
            .expect("public upload evicted"));
        assert!(state
            .store
            .blob_exists(&replacement_hash)
            .expect("replacement exists"));
        assert!(state
            .store
            .is_blob_owner(&owned_hash, &[2u8; 32])
            .expect("owned tracked"));
        assert!(!state
            .store
            .blob_has_owners(&public_hash)
            .expect("public upload unowned"));
    }

    /// A second owned upload that would exceed the store limit fails with a
    /// "storage limit" error and leaves the first blob and its owner intact.
    #[test]
    fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
        let state = test_app_state(Arc::clone(&store));

        let first = vec![1u8; 300];
        let first_hash = sha256(&first);
        let owner = [2u8; 32];
        store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");

        let second = vec![3u8; 300];
        let second_hash = sha256(&second);
        let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
            .expect_err("second owned upload should exceed the storage limit");

        assert!(
            error.to_string().contains("storage limit"),
            "unexpected error: {error}"
        );
        assert!(state
            .store
            .blob_exists(&first_hash)
            .expect("first blob remains"));
        assert!(!state
            .store
            .blob_exists(&second_hash)
            .expect("second blob rejected"));
        assert!(state
            .store
            .is_blob_owner(&first_hash, &owner)
            .expect("first owner tracked"));
        assert!(!state
            .store
            .is_blob_owner(&second_hash, &owner)
            .expect("second owner not tracked"));
    }
}