1use axum::{
7 body::Body,
8 extract::{Path, Query, State},
9 http::{header, HeaderMap, Response, StatusCode},
10 response::IntoResponse,
11 Json,
12};
13use base64::Engine;
14use hashtree_core::from_hex;
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use std::collections::HashSet;
18use std::sync::{Mutex, OnceLock};
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
21use super::auth::AppState;
22use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
23use super::ingest_filter::{
24 content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
25};
26use super::mime::get_mime_type;
27
/// Nostr event kind that a Blossom authorization event must carry
/// (enforced in `verify_blossom_auth`).
const BLOSSOM_AUTH_KIND: u16 = 24242;

/// `Cache-Control` value sent with successful blob `HEAD` responses.
const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
/// `Cache-Control` value for responses that must not be cached
/// (read-busy 503s and the "not found" answer of `delete_blob`).
const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
/// `Cache-Control` value sent when `head_blob` reports a missing blob.
const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
/// Environment variable that overrides the optimistic upload queue timeout (milliseconds).
const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
/// Queue timeout in milliseconds used when the environment variable is unset,
/// unparseable, or zero.
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;

/// Default upload size limit: 5 MiB.
// NOTE(review): the handlers visible here read `state.max_upload_bytes`; presumably this
// constant seeds that field elsewhere — confirm.
pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
/// Minimum number of queue bytes charged to any optimistic upload (256 KiB).
const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
/// Maximum number of blobs accepted in one batch upload request.
const MAX_BATCH_UPLOAD_BLOBS: usize = 1024;
/// Maximum total decoded size of one batch upload request (64 MiB).
const MAX_BATCH_UPLOAD_BYTES: usize = 64 * 1024 * 1024;
43
/// Point-in-time view of the optimistic upload queue, as produced by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Whether optimistic (accept-then-store) uploads are enabled.
    pub enabled: bool,
    /// Configured queue budget in bytes.
    pub max_bytes: usize,
    /// Free queue permits, capped at `max_bytes`.
    pub available_bytes: usize,
    /// `max_bytes` minus `available_bytes` (saturating).
    pub reserved_bytes: usize,
    /// Number of blob hashes currently marked as in flight.
    pub in_flight: usize,
    /// Effective queue acquisition timeout in milliseconds.
    pub queue_timeout_ms: u64,
}
53
54#[allow(clippy::result_large_err)]
57fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
58 if is_allowed_write_author(state, pubkey) {
60 tracing::debug!(
61 "Blossom write allowed for {}... (allowed writer)",
62 &pubkey[..8.min(pubkey.len())]
63 );
64 return Ok(());
65 }
66
67 tracing::info!(
69 "Blossom write denied for {}... (not in allowed_npubs or social graph)",
70 &pubkey[..8.min(pubkey.len())]
71 );
72 Err(Response::builder()
73 .status(StatusCode::FORBIDDEN)
74 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
75 .header(header::CONTENT_TYPE, "application/json")
76 .body(Body::from(
77 r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
78 ))
79 .unwrap())
80}
81
82fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
83 if state.allowed_pubkeys.contains(pubkey) {
84 return true;
85 }
86
87 state
88 .social_graph
89 .as_ref()
90 .map(|sg| sg.check_write_access(pubkey))
91 .unwrap_or(false)
92}
93
94fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
95 state.public_writes || is_allowed_write_author(state, pubkey)
96}
97
98fn validate_upload_payload(
99 body: &[u8],
100 content_type: &str,
101 is_allowed_writer: bool,
102 require_random_untrusted_ingest: bool,
103) -> Result<(), (StatusCode, String)> {
104 let is_chk_upload = is_chk_content_type(content_type);
105
106 if !is_chk_upload && !is_allowed_writer {
107 return Err((
108 StatusCode::FORBIDDEN,
109 "Raw media uploads require write access".to_string(),
110 ));
111 }
112
113 if is_chk_upload {
114 validate_untrusted_blob(body, require_random_untrusted_ingest)
115 .map_err(|IngestRejection { status, reason }| (status, reason))?;
116 }
117
118 Ok(())
119}
120
121fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
122 let reason = reason.into();
123 Response::builder()
124 .status(status)
125 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
126 .header("X-Reason", reason.as_str())
127 .header(header::CONTENT_TYPE, "application/json")
128 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
129 .unwrap()
130}
131
132fn blossom_retryable_json_error(
133 status: StatusCode,
134 reason: impl Into<String>,
135 retry_after_seconds: u64,
136) -> Response<Body> {
137 let reason = reason.into();
138 Response::builder()
139 .status(status)
140 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
141 .header("X-Reason", reason.as_str())
142 .header(header::RETRY_AFTER, retry_after_seconds.to_string())
143 .header(header::CONTENT_TYPE, "application/json")
144 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
145 .unwrap()
146}
147
/// JSON description of a blob, returned by the upload handlers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Location of the blob; the upload handlers build it as `/<sha256><extension>`.
    pub url: String,
    /// Hex-encoded SHA-256 of the blob content.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Unix timestamp (seconds) the upload handlers set when building the descriptor.
    pub uploaded: u64,
}
158
/// One blob inside a batch upload request.
#[derive(Debug, Deserialize)]
pub struct BatchUploadBlob {
    /// Hex SHA-256 the client claims for `data`; the batch handler verifies it.
    pub sha256: String,
    /// Optional content type; also accepted under the JSON key `contentType`.
    #[serde(default, alias = "contentType")]
    pub content_type: Option<String>,
    /// Blob content, base64-encoded (decoded with the standard alphabet by the batch handler).
    pub data: String,
}
166
/// JSON body of a batch upload request.
#[derive(Debug, Deserialize)]
pub struct BatchUploadRequest {
    /// Blobs to store; the handler rejects empty batches and batches above
    /// `MAX_BATCH_UPLOAD_BLOBS`.
    pub blobs: Vec<BatchUploadBlob>,
}
171
/// JSON body returned by a successful batch upload.
#[derive(Debug, Serialize)]
pub struct BatchUploadResponse {
    /// Count reported by the storage layer for this batch.
    pub uploaded: usize,
    /// One descriptor per blob in the request, in request order.
    pub blobs: Vec<BlobDescriptor>,
}
177
/// Query-string parameters accepted by the blob listing endpoint.
// NOTE(review): the field semantics below are inferred from names only; the code that
// consumes them is outside this excerpt — confirm against `list_blobs`.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Presumably a lower bound on upload time (Unix seconds).
    pub since: Option<u64>,
    /// Presumably an upper bound on upload time (Unix seconds).
    pub until: Option<u64>,
    /// Presumably the maximum number of entries to return.
    pub limit: Option<usize>,
    /// Presumably an opaque pagination cursor.
    pub cursor: Option<String>,
}
186
/// Fields extracted from a verified Blossom authorization event.
#[derive(Debug)]
pub struct BlossomAuth {
    /// Signer's public key, as given in the event's `pubkey` field.
    pub pubkey: String,
    /// Event kind (always `BLOSSOM_AUTH_KIND` after verification).
    pub kind: u16,
    /// Event `created_at` timestamp (Unix seconds).
    pub created_at: u64,
    /// Value of the `expiration` tag, if one was present.
    pub expiration: Option<u64>,
    /// Value of the `t` tag (the authorized action).
    pub action: Option<String>,
    /// Lower-cased values of all `x` tags (authorized blob hashes).
    pub blob_hashes: Vec<String>,
    /// Value of the `server` tag, if one was present.
    pub server: Option<String>,
}
198
199pub fn verify_blossom_auth(
202 headers: &HeaderMap,
203 required_action: &str,
204 required_hash: Option<&str>,
205) -> Result<BlossomAuth, (StatusCode, &'static str)> {
206 let auth_header = headers
207 .get(header::AUTHORIZATION)
208 .and_then(|v| v.to_str().ok())
209 .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
210
211 let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
212 StatusCode::UNAUTHORIZED,
213 "Invalid auth scheme, expected 'Nostr'",
214 ))?;
215
216 let engine = base64::engine::general_purpose::STANDARD;
218 let event_bytes = engine
219 .decode(nostr_event)
220 .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
221
222 let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
223 .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
224
225 let kind = event_json["kind"]
227 .as_u64()
228 .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
229
230 if kind != BLOSSOM_AUTH_KIND as u64 {
231 return Err((
232 StatusCode::BAD_REQUEST,
233 "Invalid event kind, expected 24242",
234 ));
235 }
236
237 let pubkey = event_json["pubkey"]
238 .as_str()
239 .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
240 .to_string();
241
242 let created_at = event_json["created_at"]
243 .as_u64()
244 .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
245
246 let sig = event_json["sig"]
247 .as_str()
248 .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
249
250 if !verify_nostr_signature(&event_json, &pubkey, sig) {
252 return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
253 }
254
255 let tags = event_json["tags"]
257 .as_array()
258 .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
259
260 let mut expiration: Option<u64> = None;
261 let mut action: Option<String> = None;
262 let mut blob_hashes: Vec<String> = Vec::new();
263 let mut server: Option<String> = None;
264
265 for tag in tags {
266 let tag_arr = tag.as_array();
267 if let Some(arr) = tag_arr {
268 if arr.len() >= 2 {
269 let tag_name = arr[0].as_str().unwrap_or("");
270 let tag_value = arr[1].as_str().unwrap_or("");
271
272 match tag_name {
273 "t" => action = Some(tag_value.to_string()),
274 "x" => blob_hashes.push(tag_value.to_lowercase()),
275 "expiration" => expiration = tag_value.parse().ok(),
276 "server" => server = Some(tag_value.to_string()),
277 _ => {}
278 }
279 }
280 }
281 }
282
283 let now = SystemTime::now()
285 .duration_since(UNIX_EPOCH)
286 .unwrap()
287 .as_secs();
288
289 if let Some(exp) = expiration {
290 if exp < now {
291 return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
292 }
293 }
294
295 if created_at > now + 60 {
297 return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
298 }
299
300 if let Some(ref act) = action {
302 if act != required_action {
303 return Err((StatusCode::FORBIDDEN, "Action mismatch"));
304 }
305 } else {
306 return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
307 }
308
309 if let Some(hash) = required_hash {
311 if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
312 return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
313 }
314 }
315
316 Ok(BlossomAuth {
317 pubkey,
318 kind: kind as u16,
319 created_at,
320 expiration,
321 action,
322 blob_hashes,
323 server,
324 })
325}
326
327fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
329 use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
330
331 let content = event["content"].as_str().unwrap_or("");
333 let full_serialized = format!(
334 "[0,\"{}\",{},{},{},\"{}\"]",
335 pubkey,
336 event["created_at"],
337 event["kind"],
338 event["tags"],
339 escape_json_string(content),
340 );
341
342 let mut hasher = Sha256::new();
343 hasher.update(full_serialized.as_bytes());
344 let event_id = hasher.finalize();
345
346 let pubkey_bytes = match hex::decode(pubkey) {
348 Ok(b) => b,
349 Err(_) => return false,
350 };
351
352 let sig_bytes = match hex::decode(sig) {
353 Ok(b) => b,
354 Err(_) => return false,
355 };
356
357 let secp = Secp256k1::verification_only();
358
359 let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
360 Ok(pk) => pk,
361 Err(_) => return false,
362 };
363
364 let signature = match Signature::from_slice(&sig_bytes) {
365 Ok(s) => s,
366 Err(_) => return false,
367 };
368
369 let message = match Message::from_digest_slice(&event_id) {
370 Ok(m) => m,
371 Err(_) => return false,
372 };
373
374 secp.verify_schnorr(&signature, &message, &xonly_pubkey)
375 .is_ok()
376}
377
/// Escapes `s` for embedding between double quotes in the serialized Nostr event that is
/// hashed to obtain the event id.
///
/// The output matches what standard JSON serializers produce, so the recomputed id agrees
/// with the signer's:
/// * `"` and `\` are backslash-escaped;
/// * line feed, carriage return, tab, backspace and form feed use their short escapes
///   (`\n`, `\r`, `\t`, `\b`, `\f`);
/// * any other character below U+0020 becomes `\u00XX`;
/// * everything else, including DEL and non-ASCII characters, is copied verbatim.
fn escape_json_string(s: &str) -> String {
    use std::fmt::Write as _;

    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0c}' => escaped.push_str("\\f"),
            // Only C0 controls need escaping; `char::is_control` would also catch DEL and
            // the C1 range, which serializers emit verbatim.
            c if u32::from(c) < 0x20 => {
                // Writing into a `String` cannot fail.
                let _ = write!(escaped, "\\u{:04x}", u32::from(c));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
396
397pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
400 let allowed_headers = headers
402 .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
403 .and_then(|v| v.to_str().ok())
404 .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
405
406 let full_allowed = format!(
408 "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
409 allowed_headers
410 );
411
412 Response::builder()
413 .status(StatusCode::NO_CONTENT)
414 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
415 .header(
416 header::ACCESS_CONTROL_ALLOW_METHODS,
417 "GET, HEAD, PUT, DELETE, OPTIONS",
418 )
419 .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
420 .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
421 .body(Body::empty())
422 .unwrap()
423}
424
425pub async fn head_blob(
427 State(state): State<AppState>,
428 Path(id): Path<String>,
429 connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
430) -> impl IntoResponse {
431 let is_localhost = connect_info.0.ip().is_loopback();
432 let (hash_part, ext) = parse_hash_and_extension(&id);
433
434 if !is_valid_sha256(hash_part) {
435 return Response::builder()
436 .status(StatusCode::BAD_REQUEST)
437 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
438 .header("X-Reason", "Invalid SHA256 hash")
439 .body(Body::empty())
440 .unwrap();
441 }
442
443 let sha256_hex = hash_part.to_lowercase();
444 let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
445 Ok(b) => b,
446 Err(_) => {
447 return Response::builder()
448 .status(StatusCode::BAD_REQUEST)
449 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
450 .header("X-Reason", "Invalid SHA256 format")
451 .body(Body::empty())
452 .unwrap()
453 }
454 };
455
456 let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
461 Ok(Ok(cached))
462 } else {
463 let permit = match acquire_blob_read().await {
464 Ok(permit) => permit,
465 Err(_) => {
466 return Response::builder()
467 .status(StatusCode::SERVICE_UNAVAILABLE)
468 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
469 .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
470 .header("Retry-After", "1")
471 .header("X-Reason", BLOB_READ_BUSY)
472 .body(Body::empty())
473 .unwrap()
474 }
475 };
476 let store = state.store.clone();
477 let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
478 let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
479 drop(permit);
480 let result = match timed {
481 Ok(result) => result.map_err(|_| ()),
482 Err(_) => Err(()),
483 };
484 if let Ok(Ok(size)) = result {
485 state.blob_cache.put_size(sha256_hex.clone(), size);
486 }
487 result
488 };
489
490 match blob_size {
491 Ok(Ok(Some(size))) => {
492 let mime_type = ext
493 .map(|e| get_mime_type(&format!("file{}", e)))
494 .unwrap_or("application/octet-stream");
495
496 let mut builder = Response::builder()
497 .status(StatusCode::OK)
498 .header(header::CONTENT_TYPE, mime_type)
499 .header(header::CONTENT_LENGTH, size)
500 .header(header::ACCEPT_RANGES, "bytes")
501 .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
502 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
503 if is_localhost {
504 builder = builder.header("X-Source", "local");
505 }
506 builder.body(Body::empty()).unwrap()
507 }
508 Ok(Ok(None)) => Response::builder()
509 .status(StatusCode::NOT_FOUND)
510 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
511 .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
512 .header("X-Reason", "Blob not found")
513 .body(Body::empty())
514 .unwrap(),
515 _ => Response::builder()
516 .status(StatusCode::INTERNAL_SERVER_ERROR)
517 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
518 .body(Body::empty())
519 .unwrap(),
520 }
521}
522
523async fn store_blossom_blob_without_blocking_runtime(
524 state: &AppState,
525 data: axum::body::Bytes,
526 pubkey: [u8; 32],
527 track_ownership: bool,
528) -> anyhow::Result<()> {
529 let mut hasher = Sha256::new();
530 hasher.update(&data);
531 let hash_hex = hex::encode(hasher.finalize());
532 let data_for_cache = data.clone();
533 let permit = acquire_blob_write()
534 .await
535 .map_err(|err| anyhow::anyhow!(err))?;
536 let store = state.store.clone();
537 tokio::task::spawn_blocking(move || {
538 let _permit = permit;
539 if track_ownership {
540 store.put_owned_blob(&data, &pubkey)?;
541 } else {
542 store.put_cached_blob(&data)?;
543 }
544 Ok::<(), anyhow::Error>(())
545 })
546 .await
547 .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
548 state
549 .blob_cache
550 .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
551 state.blob_cache.put_body(hash_hex, &data_for_cache);
552 Ok(())
553}
554
555fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
556 Response::builder()
557 .status(status)
558 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
559 .header(header::CONTENT_TYPE, "application/json")
560 .body(Body::from(serde_json::to_string(descriptor).unwrap()))
561 .unwrap()
562}
563
564fn optimistic_upload_queue_timeout() -> Duration {
565 let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
566 .ok()
567 .and_then(|value| value.parse::<u64>().ok())
568 .filter(|value| *value > 0)
569 .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
570 Duration::from_millis(millis)
571}
572
/// Returns the process-wide set of blob hashes whose optimistic upload is still being
/// stored. The set is created empty on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static REGISTRY: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    REGISTRY.get_or_init(Mutex::default)
}
577
578fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
579 optimistic_upload_inflight()
580 .lock()
581 .is_ok_and(|inflight| inflight.contains(hash_hex))
582}
583
584fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
585 optimistic_upload_inflight()
586 .lock()
587 .map(|mut inflight| inflight.insert(hash_hex.to_string()))
588 .unwrap_or(true)
589}
590
591fn clear_optimistic_upload_inflight(hash_hex: &str) {
592 if let Ok(mut inflight) = optimistic_upload_inflight().lock() {
593 inflight.remove(hash_hex);
594 }
595}
596
597pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
598 let max_bytes = state.optimistic_upload_queue_bytes;
599 let available_bytes = state
600 .optimistic_upload_queue
601 .available_permits()
602 .min(max_bytes);
603 let in_flight = optimistic_upload_inflight()
604 .lock()
605 .map(|inflight| inflight.len())
606 .unwrap_or(0);
607
608 OptimisticUploadQueueSnapshot {
609 enabled: state.optimistic_blossom_uploads,
610 max_bytes,
611 available_bytes,
612 reserved_bytes: max_bytes.saturating_sub(available_bytes),
613 in_flight,
614 queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
615 }
616}
617
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX`.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
621
622async fn acquire_optimistic_upload_queue(
623 state: &AppState,
624 permits: u32,
625) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
626 match tokio::time::timeout(
627 optimistic_upload_queue_timeout(),
628 state
629 .optimistic_upload_queue
630 .clone()
631 .acquire_many_owned(permits),
632 )
633 .await
634 {
635 Ok(Ok(permit)) => Ok(permit),
636 Ok(Err(_)) => Err("Optimistic upload queue is closed"),
637 Err(_) => Err("Optimistic upload queue is full"),
638 }
639}
640
641async fn uploaded_blob_already_exists(
642 state: &AppState,
643 sha256_hash: [u8; 32],
644 sha256_hex: &str,
645) -> Result<bool, String> {
646 if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
647 return Ok(true);
648 }
649
650 let permit = acquire_blob_read().await.map_err(str::to_string)?;
651 let store = state.store.clone();
652 let size_read = tokio::task::spawn_blocking(move || {
653 store
654 .blob_size(&sha256_hash)
655 .map_err(|error| error.to_string())
656 });
657
658 let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
659 drop(permit);
660 match result {
661 Ok(Ok(Ok(size))) => {
662 state.blob_cache.put_size(sha256_hex.to_string(), size);
663 Ok(size.is_some())
664 }
665 Ok(Ok(Err(error))) => Err(error),
666 Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
667 Err(_) => Err("blob existence check timed out".to_string()),
668 }
669}
670
671async fn set_existing_blob_owner_without_body_write(
672 state: AppState,
673 sha256_hash: [u8; 32],
674 pubkey: [u8; 32],
675) -> anyhow::Result<()> {
676 tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
677 .await
678 .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
679 Ok(())
680}
681
682pub async fn upload_blob(
684 State(state): State<AppState>,
685 headers: HeaderMap,
686 body: axum::body::Bytes,
687) -> impl IntoResponse {
688 let max_size = state.max_upload_bytes;
690 if body.len() > max_size {
691 return Response::builder()
692 .status(StatusCode::PAYLOAD_TOO_LARGE)
693 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
694 .header(header::CONTENT_TYPE, "application/json")
695 .body(Body::from(format!(
696 r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
697 body.len(),
698 max_size,
699 max_size / 1024 / 1024
700 )))
701 .unwrap();
702 }
703
704 let auth = match verify_blossom_auth(&headers, "upload", None) {
706 Ok(a) => a,
707 Err((status, reason)) => {
708 return Response::builder()
709 .status(status)
710 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
711 .header("X-Reason", reason)
712 .header(header::CONTENT_TYPE, "application/json")
713 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
714 .unwrap();
715 }
716 };
717
718 let content_type = headers
720 .get(header::CONTENT_TYPE)
721 .and_then(|v| v.to_str().ok())
722 .unwrap_or("application/octet-stream")
723 .to_string();
724
725 let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
727 let can_upload = can_accept_upload_author(&state, &auth.pubkey);
728 if !can_upload {
729 let _ = check_write_access(&state, &auth.pubkey);
730 return Response::builder()
731 .status(StatusCode::FORBIDDEN)
732 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
733 .header(header::CONTENT_TYPE, "application/json")
734 .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
735 .unwrap();
736 }
737
738 if let Err((status, reason)) = validate_upload_payload(
739 &body,
740 &content_type,
741 can_upload,
742 state.require_random_untrusted_ingest,
743 ) {
744 return blossom_json_error(status, reason);
745 }
746
747 let mut hasher = Sha256::new();
749 hasher.update(&body);
750 let sha256_hash: [u8; 32] = hasher.finalize().into();
751 let sha256_hex = hex::encode(sha256_hash);
752
753 if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
755 return Response::builder()
756 .status(StatusCode::FORBIDDEN)
757 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
758 .header(
759 "X-Reason",
760 "Uploaded blob hash does not match authorized hash",
761 )
762 .header(header::CONTENT_TYPE, "application/json")
763 .body(Body::from(r#"{"error":"Hash mismatch"}"#))
764 .unwrap();
765 }
766
767 let pubkey_bytes = match from_hex(&auth.pubkey) {
769 Ok(b) => b,
770 Err(_) => {
771 return Response::builder()
772 .status(StatusCode::BAD_REQUEST)
773 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
774 .header("X-Reason", "Invalid pubkey format")
775 .body(Body::empty())
776 .unwrap();
777 }
778 };
779
780 let size = body.len() as u64;
781 let now = SystemTime::now()
782 .duration_since(UNIX_EPOCH)
783 .unwrap()
784 .as_secs();
785 let ext = mime_to_extension(&content_type_base(&content_type));
786 let descriptor = BlobDescriptor {
787 url: format!("/{}{}", sha256_hex, ext),
788 sha256: sha256_hex.clone(),
789 size,
790 mime_type: content_type,
791 uploaded: now,
792 };
793
794 if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
795 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
796 }
797
798 if state.optimistic_blossom_uploads {
801 let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
802 if queued_bytes <= state.optimistic_upload_queue_bytes {
803 let permits = queued_bytes as u32;
804 let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
805 if !marked_inflight {
806 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
807 }
808 let permit = match acquire_optimistic_upload_queue(&state, permits).await {
809 Ok(permit) => permit,
810 Err(_) => {
811 clear_optimistic_upload_inflight(&sha256_hex);
812 return blossom_retryable_json_error(
813 StatusCode::SERVICE_UNAVAILABLE,
814 "Optimistic upload queue is full",
815 2,
816 );
817 }
818 };
819 let state_for_write = state.clone();
820 let hash_for_log = sha256_hex.clone();
821 tokio::spawn(async move {
822 let _permit = permit;
823 if let Err(error) = store_blossom_blob_without_blocking_runtime(
824 &state_for_write,
825 body,
826 pubkey_bytes,
827 is_allowed_author,
828 )
829 .await
830 {
831 tracing::error!(
832 "Background Blossom storage failed for {}: {:#}",
833 hash_for_log,
834 error
835 );
836 }
837 clear_optimistic_upload_inflight(&hash_for_log);
838 });
839
840 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
841 }
842
843 tracing::warn!(
844 "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
845 queued_bytes,
846 state.optimistic_upload_queue_bytes
847 );
848 }
849
850 match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
851 Ok(true) => {
852 if is_allowed_author {
853 if let Err(error) = set_existing_blob_owner_without_body_write(
854 state.clone(),
855 sha256_hash,
856 pubkey_bytes,
857 )
858 .await
859 {
860 return Response::builder()
861 .status(StatusCode::INTERNAL_SERVER_ERROR)
862 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
863 .header("X-Reason", "Storage error")
864 .header(header::CONTENT_TYPE, "application/json")
865 .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
866 .unwrap();
867 }
868 }
869 return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
870 }
871 Ok(false) => {}
872 Err(error) => {
873 tracing::debug!(
874 "Could not preflight Blossom upload {} before synchronous storage: {}",
875 sha256_hex,
876 error
877 );
878 }
879 }
880
881 let store_result = store_blossom_blob_without_blocking_runtime(
882 &state,
883 body.clone(),
884 pubkey_bytes,
885 is_allowed_author,
886 )
887 .await;
888
889 match store_result {
890 Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
891 Err(e) => Response::builder()
892 .status(StatusCode::INTERNAL_SERVER_ERROR)
893 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
894 .header("X-Reason", "Storage error")
895 .header(header::CONTENT_TYPE, "application/json")
896 .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
897 .unwrap(),
898 }
899}
900
901pub async fn upload_blob_batch(
903 State(state): State<AppState>,
904 headers: HeaderMap,
905 Json(payload): Json<BatchUploadRequest>,
906) -> impl IntoResponse {
907 if payload.blobs.is_empty() {
908 return blossom_json_error(StatusCode::BAD_REQUEST, "Batch is empty");
909 }
910 if payload.blobs.len() > MAX_BATCH_UPLOAD_BLOBS {
911 return blossom_json_error(StatusCode::PAYLOAD_TOO_LARGE, "Batch contains too many blobs");
912 }
913
914 let auth = match verify_blossom_auth(&headers, "upload", None) {
915 Ok(a) => a,
916 Err((status, reason)) => {
917 return Response::builder()
918 .status(status)
919 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
920 .header("X-Reason", reason)
921 .header(header::CONTENT_TYPE, "application/json")
922 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
923 .unwrap();
924 }
925 };
926
927 let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
928 let can_upload = can_accept_upload_author(&state, &auth.pubkey);
929 if !can_upload {
930 let _ = check_write_access(&state, &auth.pubkey);
931 return blossom_json_error(StatusCode::FORBIDDEN, "Write access denied");
932 }
933
934 let pubkey_bytes = match from_hex(&auth.pubkey) {
935 Ok(bytes) => bytes,
936 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid pubkey format"),
937 };
938
939 let now = SystemTime::now()
940 .duration_since(UNIX_EPOCH)
941 .unwrap()
942 .as_secs();
943 let mut total_bytes = 0usize;
944 let mut items = Vec::with_capacity(payload.blobs.len());
945 let mut descriptors = Vec::with_capacity(payload.blobs.len());
946
947 for blob in payload.blobs {
948 let sha256_hex = blob.sha256.to_lowercase();
949 let expected_hash: [u8; 32] = match from_hex(&sha256_hex) {
950 Ok(hash) => hash,
951 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob hash"),
952 };
953 if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
954 return blossom_json_error(StatusCode::FORBIDDEN, "Uploaded blob hash does not match authorized hash");
955 }
956
957 let data = match base64::engine::general_purpose::STANDARD.decode(blob.data.as_bytes()) {
958 Ok(data) => data,
959 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob data"),
960 };
961 if data.len() > state.max_upload_bytes {
962 return blossom_json_error(StatusCode::PAYLOAD_TOO_LARGE, "Blob exceeds maximum upload size");
963 }
964 total_bytes = total_bytes.saturating_add(data.len());
965 if total_bytes > MAX_BATCH_UPLOAD_BYTES {
966 return blossom_json_error(StatusCode::PAYLOAD_TOO_LARGE, "Batch exceeds maximum upload size");
967 }
968
969 let mut hasher = Sha256::new();
970 hasher.update(&data);
971 let actual_hash: [u8; 32] = hasher.finalize().into();
972 if actual_hash != expected_hash {
973 return blossom_json_error(StatusCode::FORBIDDEN, "Hash mismatch");
974 }
975
976 let content_type = blob
977 .content_type
978 .as_deref()
979 .map(content_type_base)
980 .unwrap_or_else(|| "application/octet-stream".to_string());
981 if let Err((status, reason)) = validate_upload_payload(
982 &data,
983 &content_type,
984 can_upload,
985 state.require_random_untrusted_ingest,
986 ) {
987 return blossom_json_error(status, reason);
988 }
989
990 let ext = mime_to_extension(&content_type);
991 descriptors.push(BlobDescriptor {
992 url: format!("/{}{}", sha256_hex, ext),
993 sha256: sha256_hex,
994 size: data.len() as u64,
995 mime_type: content_type,
996 uploaded: now,
997 });
998 items.push((actual_hash, data));
999 }
1000
1001 let store = state.store.clone();
1002 let stored = tokio::task::spawn_blocking(move || {
1003 if is_allowed_author {
1004 store.put_owned_blobs(&items, &pubkey_bytes)
1005 } else {
1006 let mut inserted = 0usize;
1007 for (_, data) in &items {
1008 store.put_cached_blob(data)?;
1009 inserted += 1;
1010 }
1011 Ok(inserted)
1012 }
1013 })
1014 .await
1015 .map_err(|error| anyhow::anyhow!("blob batch write task failed: {}", error));
1016
1017 match stored {
1018 Ok(Ok(uploaded)) => Response::builder()
1019 .status(StatusCode::OK)
1020 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1021 .header(header::CONTENT_TYPE, "application/json")
1022 .body(Body::from(
1023 serde_json::to_string(&BatchUploadResponse {
1024 uploaded,
1025 blobs: descriptors,
1026 })
1027 .unwrap(),
1028 ))
1029 .unwrap(),
1030 Ok(Err(error)) | Err(error) => Response::builder()
1031 .status(StatusCode::INTERNAL_SERVER_ERROR)
1032 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1033 .header("X-Reason", "Storage error")
1034 .header(header::CONTENT_TYPE, "application/json")
1035 .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
1036 .unwrap(),
1037 }
1038}
1039
1040pub async fn delete_blob(
1043 State(state): State<AppState>,
1044 Path(id): Path<String>,
1045 headers: HeaderMap,
1046) -> impl IntoResponse {
1047 let (hash_part, _) = parse_hash_and_extension(&id);
1048
1049 if !is_valid_sha256(hash_part) {
1050 return Response::builder()
1051 .status(StatusCode::BAD_REQUEST)
1052 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1053 .header("X-Reason", "Invalid SHA256 hash")
1054 .body(Body::empty())
1055 .unwrap();
1056 }
1057
1058 let sha256_hex = hash_part.to_lowercase();
1059
1060 let sha256_bytes = match from_hex(&sha256_hex) {
1062 Ok(b) => b,
1063 Err(_) => {
1064 return Response::builder()
1065 .status(StatusCode::BAD_REQUEST)
1066 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1067 .header("X-Reason", "Invalid SHA256 hash format")
1068 .body(Body::empty())
1069 .unwrap();
1070 }
1071 };
1072
1073 let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
1075 Ok(a) => a,
1076 Err((status, reason)) => {
1077 return Response::builder()
1078 .status(status)
1079 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1080 .header("X-Reason", reason)
1081 .body(Body::empty())
1082 .unwrap();
1083 }
1084 };
1085
1086 let pubkey_bytes = match from_hex(&auth.pubkey) {
1088 Ok(b) => b,
1089 Err(_) => {
1090 return Response::builder()
1091 .status(StatusCode::BAD_REQUEST)
1092 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1093 .header("X-Reason", "Invalid pubkey format")
1094 .body(Body::empty())
1095 .unwrap();
1096 }
1097 };
1098
1099 match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
1101 Ok(true) => {
1102 }
1104 Ok(false) => {
1105 match state.store.blob_has_owners(&sha256_bytes) {
1107 Ok(true) => {
1108 return Response::builder()
1109 .status(StatusCode::FORBIDDEN)
1110 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1111 .header("X-Reason", "Not a blob owner")
1112 .body(Body::empty())
1113 .unwrap();
1114 }
1115 Ok(false) => {
1116 return Response::builder()
1117 .status(StatusCode::NOT_FOUND)
1118 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1119 .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
1120 .header("X-Reason", "Blob not found")
1121 .body(Body::empty())
1122 .unwrap();
1123 }
1124 Err(_) => {
1125 return Response::builder()
1126 .status(StatusCode::INTERNAL_SERVER_ERROR)
1127 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1128 .body(Body::empty())
1129 .unwrap();
1130 }
1131 }
1132 }
1133 Err(_) => {
1134 return Response::builder()
1135 .status(StatusCode::INTERNAL_SERVER_ERROR)
1136 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1137 .body(Body::empty())
1138 .unwrap();
1139 }
1140 }
1141
1142 match state
1144 .store
1145 .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
1146 {
1147 Ok(fully_deleted) => {
1148 Response::builder()
1151 .status(StatusCode::OK)
1152 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1153 .header(
1154 "X-Blob-Deleted",
1155 if fully_deleted { "true" } else { "false" },
1156 )
1157 .body(Body::empty())
1158 .unwrap()
1159 }
1160 Err(_) => Response::builder()
1161 .status(StatusCode::INTERNAL_SERVER_ERROR)
1162 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1163 .body(Body::empty())
1164 .unwrap(),
1165 }
1166}
1167
1168pub async fn list_blobs(
1170 State(state): State<AppState>,
1171 Path(pubkey): Path<String>,
1172 Query(query): Query<ListQuery>,
1173 headers: HeaderMap,
1174) -> impl IntoResponse {
1175 if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1177 return Response::builder()
1178 .status(StatusCode::BAD_REQUEST)
1179 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1180 .header("X-Reason", "Invalid pubkey format")
1181 .header(header::CONTENT_TYPE, "application/json")
1182 .body(Body::from("[]"))
1183 .unwrap();
1184 }
1185
1186 let pubkey_hex = pubkey.to_lowercase();
1187 let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1188 Ok(b) => b,
1189 Err(_) => {
1190 return Response::builder()
1191 .status(StatusCode::BAD_REQUEST)
1192 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1193 .header("X-Reason", "Invalid pubkey format")
1194 .header(header::CONTENT_TYPE, "application/json")
1195 .body(Body::from("[]"))
1196 .unwrap()
1197 }
1198 };
1199
1200 let auth = match verify_blossom_auth(&headers, "list", None) {
1201 Ok(auth) => auth,
1202 Err((status, reason)) => {
1203 return Response::builder()
1204 .status(status)
1205 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1206 .header("X-Reason", reason)
1207 .header(header::CONTENT_TYPE, "application/json")
1208 .body(Body::from("[]"))
1209 .unwrap();
1210 }
1211 };
1212
1213 if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1214 return Response::builder()
1215 .status(StatusCode::FORBIDDEN)
1216 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1217 .header("X-Reason", "Pubkey mismatch")
1218 .header(header::CONTENT_TYPE, "application/json")
1219 .body(Body::from("[]"))
1220 .unwrap();
1221 }
1222
1223 match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1225 Ok(blobs) => {
1226 let mut filtered: Vec<_> = blobs
1228 .into_iter()
1229 .filter(|b| {
1230 if let Some(since) = query.since {
1231 if b.uploaded < since {
1232 return false;
1233 }
1234 }
1235 if let Some(until) = query.until {
1236 if b.uploaded > until {
1237 return false;
1238 }
1239 }
1240 true
1241 })
1242 .collect();
1243
1244 filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1246
1247 let limit = query.limit.unwrap_or(100).min(1000);
1249 filtered.truncate(limit);
1250
1251 Response::builder()
1252 .status(StatusCode::OK)
1253 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1254 .header(header::CONTENT_TYPE, "application/json")
1255 .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1256 .unwrap()
1257 }
1258 Err(_) => Response::builder()
1259 .status(StatusCode::INTERNAL_SERVER_ERROR)
1260 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1261 .header(header::CONTENT_TYPE, "application/json")
1262 .body(Body::from("[]"))
1263 .unwrap(),
1264 }
1265}
1266
/// Splits a blob id at its last `.` into `(hash, extension)`.
///
/// The extension, when present, keeps its leading dot (`"abc.png"` yields
/// `("abc", Some(".png"))`). An id without any dot is returned unchanged with
/// `None` as the extension.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    match id.rfind('.') {
        Some(last_dot) => {
            let (hash, extension) = id.split_at(last_dot);
            (hash, Some(extension))
        }
        None => (id, None),
    }
}
1276
/// Returns `true` when `s` is exactly 64 ASCII hex digits (either case), i.e.
/// the textual form of a SHA-256 digest.
fn is_valid_sha256(s: &str) -> bool {
    // Any non-ASCII character contributes bytes >= 0x80, none of which is a
    // hex digit, so a byte-wise check accepts exactly the same strings.
    let bytes = s.as_bytes();
    bytes.len() == 64 && bytes.iter().all(u8::is_ascii_hexdigit)
}
1280
/// Test-only helper that writes `data` into the state's store.
///
/// With `track_ownership` set, the blob is stored through `put_owned_blob`
/// for `pubkey`; otherwise it goes through `put_cached_blob` and `pubkey` is
/// not used. `_sha256` is accepted for call-site symmetry but never read.
///
/// # Errors
///
/// Propagates whatever error the chosen store call returns.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    if !track_ownership {
        state.store.put_cached_blob(data)?;
        return Ok(());
    }

    state.store.put_owned_blob(data, pubkey)?;
    Ok(())
}
1297
/// Maps a MIME type to a file extension, including the leading dot.
///
/// The lookup is an exact, case-sensitive string match; any MIME type not in
/// the table yields the empty string.
fn mime_to_extension(mime: &str) -> &'static str {
    const EXTENSIONS: [(&str, &str); 13] = [
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    EXTENSIONS
        .iter()
        .find(|&&(known, _)| known == mime)
        .map_or("", |&(_, extension)| extension)
}
1316
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server::auth::WsRelayState;
    use crate::storage::HashtreeStore;
    use axum::response::IntoResponse;
    use base64::Engine;
    use hashtree_core::sha256;
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex as StdMutex};
    use std::time::Duration;
    use tempfile::TempDir;

    /// Builds an `AppState` around `store` with the defaults the tests rely on:
    /// public writes on, optimistic uploads off, no auth and no upstreams.
    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
        AppState {
            store,
            auth: None,
            daemon_started_at: 1_700_000_000,
            peer_mode: crate::config::ServerMode::Normal,
            hash_get_enabled: true,
            http_webrtc_fetch: true,
            webrtc_peers: None,
            fips_transport: None,
            fetch_from_fips_peers: true,
            ws_relay: Arc::new(WsRelayState::new()),
            max_upload_bytes: 5 * 1024 * 1024,
            public_writes: true,
            require_random_untrusted_ingest: true,
            optimistic_blossom_uploads: false,
            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
            allowed_pubkeys: HashSet::new(),
            upstream_blossom: Vec::new(),
            social_graph: None,
            social_graph_store: None,
            social_graph_root: None,
            socialgraph_snapshot_public: false,
            nostr_relay: None,
            nostr_relay_urls: Vec::new(),
            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
                std::collections::HashMap::new(),
            )),
            inflight_blob_reads: Arc::new(
                tokio::sync::Mutex::new(std::collections::HashMap::new()),
            ),
            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
        }
    }

    /// Like [`test_app_state`], but with optimistic Blossom uploads switched on.
    fn optimistic_app_state(store: Arc<HashtreeStore>) -> AppState {
        let mut state = test_app_state(store);
        state.optimistic_blossom_uploads = true;
        state
    }

    /// Shrinks the optimistic upload queue to a single byte/permit so that any
    /// upload that has to reserve queue space cannot fit.
    fn shrink_upload_queue_to_one_byte(state: &mut AppState) {
        state.optimistic_upload_queue_bytes = 1;
        state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
    }

    /// Opens a store rooted at `dir` with the size argument used by the upload
    /// handler tests (large enough that the limit never interferes).
    fn roomy_store(dir: &TempDir) -> Arc<HashtreeStore> {
        Arc::new(HashtreeStore::with_options(dir.path(), None, 128 * 1024 * 1024).expect("store"))
    }

    /// Produces an `Authorization` header value: a Blossom auth event signed by
    /// `keys`, tagged `t=upload`, expiring 300 seconds from now, base64-encoded
    /// behind the `Nostr ` scheme prefix.
    fn create_upload_auth_header(keys: &nostr::Keys) -> String {
        use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};

        let now = Timestamp::now();
        let expires_at = (now.as_u64() + 300).to_string();
        let tags = vec![
            Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
            Tag::custom(TagKind::Custom("expiration".into()), vec![expires_at]),
        ];
        let event = EventBuilder::new(Kind::Custom(BLOSSOM_AUTH_KIND), "", tags)
            .custom_created_at(now)
            .to_event(keys)
            .expect("sign blossom auth");
        let json = serde_json::to_vec(&event).expect("serialize auth event");
        format!(
            "Nostr {}",
            base64::engine::general_purpose::STANDARD.encode(json)
        )
    }

    /// Request headers for an `application/octet-stream` upload authorized by
    /// a freshly generated key pair.
    fn signed_octet_stream_headers() -> HeaderMap {
        let keys = nostr::Keys::generate();
        let mut headers = HeaderMap::new();
        headers.insert(
            header::AUTHORIZATION,
            create_upload_auth_header(&keys)
                .parse()
                .expect("auth header value"),
        );
        headers.insert(
            header::CONTENT_TYPE,
            "application/octet-stream"
                .parse()
                .expect("content type header value"),
        );
        headers
    }

    /// Runs `upload_blob` with freshly signed octet-stream headers and returns
    /// the concrete response so callers can inspect (and keep alive) the result.
    async fn upload_octet_stream(
        state: AppState,
        body: axum::body::Bytes,
    ) -> axum::response::Response {
        upload_blob(State(state), signed_octet_stream_headers(), body)
            .await
            .into_response()
    }

    #[test]
    fn test_is_valid_sha256() {
        // Well-formed 64-character hex digests.
        assert!(is_valid_sha256(
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
        ));
        assert!(is_valid_sha256(
            "0000000000000000000000000000000000000000000000000000000000000000"
        ));

        // Too short.
        assert!(!is_valid_sha256("e2bab35b5296ec2242ded0a01f6d6723"));
        // Too long.
        assert!(!is_valid_sha256(
            "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa"
        ));
        // Right length, but contains non-hex characters.
        assert!(!is_valid_sha256(
            "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
        ));
        // Empty input.
        assert!(!is_valid_sha256(""));
    }

    #[test]
    fn test_parse_hash_and_extension() {
        assert_eq!(
            parse_hash_and_extension("abc123.png"),
            ("abc123", Some(".png"))
        );
        assert_eq!(parse_hash_and_extension("abc123"), ("abc123", None));
        // Only the last dot separates the extension.
        assert_eq!(
            parse_hash_and_extension("abc.123.jpg"),
            ("abc.123", Some(".jpg"))
        );
    }

    #[test]
    fn test_mime_to_extension() {
        let expectations = [
            ("image/png", ".png"),
            ("image/jpeg", ".jpg"),
            ("video/mp4", ".mp4"),
            ("application/octet-stream", ""),
            ("unknown/type", ""),
        ];
        for (mime, extension) in expectations {
            assert_eq!(mime_to_extension(mime), extension);
        }
    }

    #[tokio::test]
    async fn optimistic_uploads_return_accepted_and_store_in_background() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = roomy_store(&temp_dir);
        let state = optimistic_app_state(Arc::clone(&store));

        let body = axum::body::Bytes::from((0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>());
        let hash = sha256(&body);
        let response = upload_octet_stream(state, body).await;
        assert_eq!(response.status(), StatusCode::ACCEPTED);

        // The blob is written after the response; poll for up to ~500ms.
        for _ in 0..50 {
            if store.blob_exists(&hash).expect("blob exists check") {
                return;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        panic!("optimistic upload was not stored in the background");
    }

    #[tokio::test]
    async fn optimistic_upload_existing_blob_skips_queue() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = roomy_store(&temp_dir);
        let body = axum::body::Bytes::from(
            (0u16..=255)
                .map(|value| ((value * 73 + 19) % 256) as u8)
                .collect::<Vec<_>>(),
        );
        store.put_cached_blob(&body).expect("seed blob");

        // With a one-byte queue the upload cannot be queued, so the handler
        // has to answer from the already-stored blob.
        let mut state = optimistic_app_state(Arc::clone(&store));
        shrink_upload_queue_to_one_byte(&mut state);

        let response = upload_octet_stream(state, body).await;
        assert_eq!(response.status(), StatusCode::CONFLICT);
    }

    #[tokio::test]
    async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = roomy_store(&temp_dir);
        let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
        let hash_hex = hex::encode(sha256(&body));
        store.put_cached_blob(&body).expect("seed blob");

        let state = optimistic_app_state(store);

        let response = upload_octet_stream(state, body).await;
        assert_eq!(response.status(), StatusCode::ACCEPTED);

        // Wait (up to ~500ms) for the background task to drop its in-flight marker.
        for _ in 0..50 {
            if !optimistic_upload_is_inflight(&hash_hex) {
                return;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        // Clean up the process-wide marker before failing the test.
        clear_optimistic_upload_inflight(&hash_hex);
        panic!("optimistic upload in-flight marker was not cleared");
    }

    #[tokio::test]
    async fn optimistic_upload_inflight_duplicate_skips_queue() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = roomy_store(&temp_dir);
        let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
        let hash_hex = hex::encode(sha256(&body));
        // Pretend another request is already uploading this exact blob.
        assert!(mark_optimistic_upload_inflight(&hash_hex));

        let mut state = optimistic_app_state(store);
        shrink_upload_queue_to_one_byte(&mut state);

        let response = upload_octet_stream(state, body).await;
        // Release the marker before asserting so a failure does not leave it set.
        clear_optimistic_upload_inflight(&hash_hex);
        assert_eq!(response.status(), StatusCode::ACCEPTED);
    }

    #[test]
    fn public_writes_accept_unlisted_authors_for_uploads() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let mut state = test_app_state(store);
        let pubkey = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";

        // Public writes admit the upload even though the author is not an
        // allowed writer.
        state.public_writes = true;
        assert!(can_accept_upload_author(&state, pubkey));
        assert!(!is_allowed_write_author(&state, pubkey));

        state.public_writes = false;
        assert!(!can_accept_upload_author(&state, pubkey));
    }

    #[test]
    fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
        let encrypted_block: Vec<u8> = (0..=255).collect();

        // Octet-stream payload is accepted with the third flag unset.
        assert_eq!(
            validate_upload_payload(&encrypted_block, "application/octet-stream", false, true,),
            Ok(())
        );

        // Raw media is accepted when the third flag is set...
        assert_eq!(
            validate_upload_payload(b"audio bytes", "audio/mpeg", true, true,),
            Ok(())
        );

        // ...and refused for lack of write access when it is not.
        assert_eq!(
            validate_upload_payload(b"audio bytes", "audio/mpeg", false, true,),
            Err((
                StatusCode::FORBIDDEN,
                "Raw media uploads require write access".to_string(),
            ))
        );
    }

    #[test]
    fn unowned_public_uploads_use_cache_storage_semantics() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
        let state = test_app_state(Arc::clone(&store));

        // A 700 limit fits two 280-byte blobs but not three, so storing the
        // third is expected to push out the unowned one.
        let owned = vec![1u8; 280];
        let owned_hash = sha256(&owned);
        store_blossom_blob(&state, &owned, &owned_hash, &[2u8; 32], true).expect("owned upload");

        let public_upload = vec![3u8; 280];
        let public_hash = sha256(&public_upload);
        store_blossom_blob(&state, &public_upload, &public_hash, &[4u8; 32], false)
            .expect("public upload");

        let replacement = vec![5u8; 280];
        let replacement_hash = sha256(&replacement);
        state
            .store
            .put_cached_blob(&replacement)
            .expect("replacement cached blob");

        assert!(state.store.blob_exists(&owned_hash).expect("owned exists"));
        assert!(!state
            .store
            .blob_exists(&public_hash)
            .expect("public upload evicted"));
        assert!(state
            .store
            .blob_exists(&replacement_hash)
            .expect("replacement exists"));
        assert!(state
            .store
            .is_blob_owner(&owned_hash, &[2u8; 32])
            .expect("owned tracked"));
        assert!(!state
            .store
            .blob_has_owners(&public_hash)
            .expect("public upload unowned"));
    }

    #[test]
    fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store =
            Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
        let state = test_app_state(Arc::clone(&store));

        let owner = [2u8; 32];

        // A 500 limit fits one 300-byte owned blob but not a second.
        let first = vec![1u8; 300];
        let first_hash = sha256(&first);
        store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");

        let second = vec![3u8; 300];
        let second_hash = sha256(&second);
        let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
            .expect_err("second owned upload should exceed the storage limit");

        assert!(
            error.to_string().contains("storage limit"),
            "unexpected error: {error}"
        );
        assert!(state
            .store
            .blob_exists(&first_hash)
            .expect("first blob remains"));
        assert!(!state
            .store
            .blob_exists(&second_hash)
            .expect("second blob rejected"));
        assert!(state
            .store
            .is_blob_owner(&first_hash, &owner)
            .expect("first owner tracked"));
        assert!(!state
            .store
            .is_blob_owner(&second_hash, &owner)
            .expect("second owner not tracked"));
    }
}