1use axum::{
7 body::Body,
8 extract::{Path, Query, State},
9 http::{header, HeaderMap, Response, StatusCode},
10 response::IntoResponse,
11 Json,
12};
13use base64::Engine;
14use hashtree_core::from_hex;
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use std::collections::HashSet;
18use std::sync::{Mutex, OnceLock};
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use super::auth::AppState;
22use super::blob_read::{acquire_blob_read, acquire_blob_write, blob_read_timeout, BLOB_READ_BUSY};
23use super::ingest_filter::{
24 content_type_base, is_chk_content_type, validate_untrusted_blob, IngestRejection,
25};
26use super::mime::get_mime_type;
27
/// Nostr event kind that `verify_blossom_auth` requires of authorization events.
const BLOSSOM_AUTH_KIND: u16 = 24242;

/// `Cache-Control` value `head_blob` sends when the blob exists (one-year max-age, immutable).
const IMMUTABLE_CACHE_CONTROL: &str = "public, max-age=31536000, immutable";
/// `Cache-Control` value for responses that must not be cached; `head_blob` uses it on its
/// "read busy" 503 response.
const NOT_FOUND_CACHE_CONTROL: &str = "no-store";
/// `Cache-Control` value `head_blob` sends with its 404 response.
const IMMUTABLE_NOT_FOUND_CACHE_CONTROL: &str = "public, max-age=0, s-maxage=5";
/// Environment variable read by `optimistic_upload_queue_timeout` (milliseconds).
const OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS";
/// Queue wait timeout used when the environment variable is unset, unparsable, or zero.
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS: u64 = 15_000;

/// 5 MiB. NOTE(review): presumably the default for `AppState::max_upload_bytes`; it is not
/// referenced in this part of the file — confirm against the configuration code.
pub const DEFAULT_MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
/// Minimum number of bytes `upload_blob` charges against the optimistic queue per upload,
/// regardless of how small the body is.
const OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES: usize = 256 * 1024;
/// Maximum number of blobs `upload_blob_batch` accepts in one request.
const MAX_BATCH_UPLOAD_BLOBS: usize = 1024;
/// Maximum total decoded size, in bytes, of all blobs in one batch upload.
const MAX_BATCH_UPLOAD_BYTES: usize = 64 * 1024 * 1024;
/// Maximum number of hashes `upload_check` accepts in one request.
const MAX_UPLOAD_CHECK_HASHES: usize = 10_000;
/// Environment variable read by `slow_batch_upload_log_ms` (milliseconds).
const SLOW_BATCH_UPLOAD_LOG_MS_ENV: &str = "HTREE_SLOW_BATCH_UPLOAD_LOG_MS";
45
46fn slow_batch_upload_log_ms() -> Option<u128> {
47 std::env::var(SLOW_BATCH_UPLOAD_LOG_MS_ENV)
48 .ok()
49 .and_then(|value| value.parse::<u128>().ok())
50 .filter(|value| *value > 0)
51}
52
/// Point-in-time view of the optimistic upload queue, as produced by
/// `optimistic_upload_queue_snapshot`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OptimisticUploadQueueSnapshot {
    /// Mirrors `AppState::optimistic_blossom_uploads`.
    pub enabled: bool,
    /// Configured queue budget (`AppState::optimistic_upload_queue_bytes`).
    pub max_bytes: usize,
    /// Permits currently available on the queue semaphore, capped at `max_bytes`.
    pub available_bytes: usize,
    /// `max_bytes - available_bytes` (saturating).
    pub reserved_bytes: usize,
    /// Number of hashes currently recorded in the optimistic in-flight set.
    pub in_flight: usize,
    /// Queue wait timeout in milliseconds at the time of the snapshot.
    pub queue_timeout_ms: u64,
}
62
63#[allow(clippy::result_large_err)]
66fn check_write_access(state: &AppState, pubkey: &str) -> Result<(), Response<Body>> {
67 if is_allowed_write_author(state, pubkey) {
69 tracing::debug!(
70 "Blossom write allowed for {}... (allowed writer)",
71 &pubkey[..8.min(pubkey.len())]
72 );
73 return Ok(());
74 }
75
76 tracing::info!(
78 "Blossom write denied for {}... (not in allowed_npubs or social graph)",
79 &pubkey[..8.min(pubkey.len())]
80 );
81 Err(Response::builder()
82 .status(StatusCode::FORBIDDEN)
83 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
84 .header(header::CONTENT_TYPE, "application/json")
85 .body(Body::from(
86 r#"{"error":"Write access denied. Your pubkey is not in the allowed list."}"#,
87 ))
88 .unwrap())
89}
90
91fn is_allowed_write_author(state: &AppState, pubkey: &str) -> bool {
92 if state.allowed_pubkeys.contains(pubkey) {
93 return true;
94 }
95
96 state
97 .social_graph
98 .as_ref()
99 .map(|sg| sg.check_write_access(pubkey))
100 .unwrap_or(false)
101}
102
103fn can_accept_upload_author(state: &AppState, pubkey: &str) -> bool {
104 state.public_writes || is_allowed_write_author(state, pubkey)
105}
106
107fn validate_upload_payload(
108 body: &[u8],
109 content_type: &str,
110 can_upload_author: bool,
111 require_random_untrusted_ingest: bool,
112) -> Result<(), (StatusCode, String)> {
113 let is_chk_upload = is_chk_content_type(content_type);
114
115 if !is_chk_upload && !can_upload_author {
116 return Err((
117 StatusCode::FORBIDDEN,
118 "Raw media uploads require write access".to_string(),
119 ));
120 }
121
122 if is_chk_upload {
123 let require_random = require_random_untrusted_ingest && !can_upload_author;
124 validate_untrusted_blob(body, require_random)
125 .map_err(|IngestRejection { status, reason }| (status, reason))?;
126 }
127
128 Ok(())
129}
130
131fn blossom_json_error(status: StatusCode, reason: impl Into<String>) -> Response<Body> {
132 let reason = reason.into();
133 Response::builder()
134 .status(status)
135 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
136 .header("X-Reason", reason.as_str())
137 .header(header::CONTENT_TYPE, "application/json")
138 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
139 .unwrap()
140}
141
142fn blossom_retryable_json_error(
143 status: StatusCode,
144 reason: impl Into<String>,
145 retry_after_seconds: u64,
146) -> Response<Body> {
147 let reason = reason.into();
148 Response::builder()
149 .status(status)
150 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
151 .header("X-Reason", reason.as_str())
152 .header(header::RETRY_AFTER, retry_after_seconds.to_string())
153 .header(header::CONTENT_TYPE, "application/json")
154 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
155 .unwrap()
156}
157
/// Description of a stored blob, returned as JSON by the upload handlers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
    /// Location of the blob; the upload handlers build it as `/<sha256><extension>`.
    pub url: String,
    /// Lowercase hex SHA-256 of the blob contents.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// Content type of the blob; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub mime_type: String,
    /// Unix timestamp (seconds) at which the upload request was handled.
    pub uploaded: u64,
}
168
/// One blob inside a batch upload request.
#[derive(Debug, Deserialize)]
pub struct BatchUploadBlob {
    /// Hex SHA-256 the client claims for `data`; `upload_blob_batch` rejects a mismatch.
    pub sha256: String,
    /// Optional content type; also accepted under the JSON key `contentType`.
    #[serde(default, alias = "contentType")]
    pub content_type: Option<String>,
    /// Blob contents, base64-encoded (standard alphabet).
    pub data: String,
}
176
/// JSON body of a batch upload request.
#[derive(Debug, Deserialize)]
pub struct BatchUploadRequest {
    /// Blobs to store; `upload_blob_batch` rejects an empty list and lists longer than
    /// `MAX_BATCH_UPLOAD_BLOBS`.
    pub blobs: Vec<BatchUploadBlob>,
}
181
/// JSON body returned for a batch upload.
#[derive(Debug, Serialize)]
pub struct BatchUploadResponse {
    /// NOTE(review): presumably the number of blobs stored by the request — the code that
    /// fills this in is outside the visible part of the file; confirm.
    pub uploaded: usize,
    /// Descriptors of the blobs in the batch.
    pub blobs: Vec<BlobDescriptor>,
}
187
/// JSON body of an upload-check request.
#[derive(Debug, Deserialize)]
pub struct UploadCheckRequest {
    /// Hex SHA-256 hashes to look up; `upload_check` accepts at most
    /// `MAX_UPLOAD_CHECK_HASHES` entries and trims/lowercases each one.
    pub hashes: Vec<String>,
}
192
/// JSON body returned by `upload_check`.
#[derive(Debug, Serialize, Deserialize)]
pub struct UploadCheckResponse {
    /// Number of hashes in the request (one bit per hash in `present`).
    pub count: usize,
    /// Base64-encoded bitset in request order: bit `i % 8` of byte `i / 8` is set when
    /// hash `i` was found locally.
    pub present: String,
}
198
/// Query-string parameters for a listing endpoint.
///
/// NOTE(review): the handler that consumes this type is outside the visible part of the
/// file; the field notes below are inferred from names only — confirm against the handler.
#[derive(Debug, Deserialize)]
pub struct ListQuery {
    /// Presumably a lower time bound (Unix seconds).
    pub since: Option<u64>,
    /// Presumably an upper time bound (Unix seconds).
    pub until: Option<u64>,
    /// Presumably the maximum number of entries to return.
    pub limit: Option<usize>,
    /// Presumably an opaque pagination cursor.
    pub cursor: Option<String>,
}
207
/// Authorization details extracted from a verified Blossom auth event by
/// `verify_blossom_auth`.
#[derive(Debug)]
pub struct BlossomAuth {
    /// The event's `pubkey` field, as sent by the client.
    pub pubkey: String,
    /// Event kind; always `BLOSSOM_AUTH_KIND` for a value returned by `verify_blossom_auth`.
    pub kind: u16,
    /// The event's `created_at` field (Unix seconds).
    pub created_at: u64,
    /// Value of the `expiration` tag, if present and parsable as an unsigned integer.
    pub expiration: Option<u64>,
    /// Value of the `t` tag (the authorized action).
    pub action: Option<String>,
    /// Lowercased values of all `x` tags (authorized blob hashes).
    pub blob_hashes: Vec<String>,
    /// Value of the `server` tag, if present.
    pub server: Option<String>,
}
219
220pub fn verify_blossom_auth(
223 headers: &HeaderMap,
224 required_action: &str,
225 required_hash: Option<&str>,
226) -> Result<BlossomAuth, (StatusCode, &'static str)> {
227 let auth_header = headers
228 .get(header::AUTHORIZATION)
229 .and_then(|v| v.to_str().ok())
230 .ok_or((StatusCode::UNAUTHORIZED, "Missing Authorization header"))?;
231
232 let nostr_event = auth_header.strip_prefix("Nostr ").ok_or((
233 StatusCode::UNAUTHORIZED,
234 "Invalid auth scheme, expected 'Nostr'",
235 ))?;
236
237 let engine = base64::engine::general_purpose::STANDARD;
239 let event_bytes = engine
240 .decode(nostr_event)
241 .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64 in auth header"))?;
242
243 let event_json: serde_json::Value = serde_json::from_slice(&event_bytes)
244 .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid JSON in auth event"))?;
245
246 let kind = event_json["kind"]
248 .as_u64()
249 .ok_or((StatusCode::BAD_REQUEST, "Missing kind in event"))?;
250
251 if kind != BLOSSOM_AUTH_KIND as u64 {
252 return Err((
253 StatusCode::BAD_REQUEST,
254 "Invalid event kind, expected 24242",
255 ));
256 }
257
258 let pubkey = event_json["pubkey"]
259 .as_str()
260 .ok_or((StatusCode::BAD_REQUEST, "Missing pubkey in event"))?
261 .to_string();
262
263 let created_at = event_json["created_at"]
264 .as_u64()
265 .ok_or((StatusCode::BAD_REQUEST, "Missing created_at in event"))?;
266
267 let sig = event_json["sig"]
268 .as_str()
269 .ok_or((StatusCode::BAD_REQUEST, "Missing signature in event"))?;
270
271 if !verify_nostr_signature(&event_json, &pubkey, sig) {
273 return Err((StatusCode::UNAUTHORIZED, "Invalid signature"));
274 }
275
276 let tags = event_json["tags"]
278 .as_array()
279 .ok_or((StatusCode::BAD_REQUEST, "Missing tags in event"))?;
280
281 let mut expiration: Option<u64> = None;
282 let mut action: Option<String> = None;
283 let mut blob_hashes: Vec<String> = Vec::new();
284 let mut server: Option<String> = None;
285
286 for tag in tags {
287 let tag_arr = tag.as_array();
288 if let Some(arr) = tag_arr {
289 if arr.len() >= 2 {
290 let tag_name = arr[0].as_str().unwrap_or("");
291 let tag_value = arr[1].as_str().unwrap_or("");
292
293 match tag_name {
294 "t" => action = Some(tag_value.to_string()),
295 "x" => blob_hashes.push(tag_value.to_lowercase()),
296 "expiration" => expiration = tag_value.parse().ok(),
297 "server" => server = Some(tag_value.to_string()),
298 _ => {}
299 }
300 }
301 }
302 }
303
304 let now = SystemTime::now()
306 .duration_since(UNIX_EPOCH)
307 .unwrap()
308 .as_secs();
309
310 if let Some(exp) = expiration {
311 if exp < now {
312 return Err((StatusCode::UNAUTHORIZED, "Authorization expired"));
313 }
314 }
315
316 if created_at > now + 60 {
318 return Err((StatusCode::BAD_REQUEST, "Event created_at is in the future"));
319 }
320
321 if let Some(ref act) = action {
323 if act != required_action {
324 return Err((StatusCode::FORBIDDEN, "Action mismatch"));
325 }
326 } else {
327 return Err((StatusCode::BAD_REQUEST, "Missing 't' tag for action"));
328 }
329
330 if let Some(hash) = required_hash {
332 if !blob_hashes.is_empty() && !blob_hashes.contains(&hash.to_lowercase()) {
333 return Err((StatusCode::FORBIDDEN, "Blob hash not authorized"));
334 }
335 }
336
337 Ok(BlossomAuth {
338 pubkey,
339 kind: kind as u16,
340 created_at,
341 expiration,
342 action,
343 blob_hashes,
344 server,
345 })
346}
347
348fn verify_nostr_signature(event: &serde_json::Value, pubkey: &str, sig: &str) -> bool {
350 use secp256k1::{schnorr::Signature, Message, Secp256k1, XOnlyPublicKey};
351
352 let content = event["content"].as_str().unwrap_or("");
354 let full_serialized = format!(
355 "[0,\"{}\",{},{},{},\"{}\"]",
356 pubkey,
357 event["created_at"],
358 event["kind"],
359 event["tags"],
360 escape_json_string(content),
361 );
362
363 let mut hasher = Sha256::new();
364 hasher.update(full_serialized.as_bytes());
365 let event_id = hasher.finalize();
366
367 let pubkey_bytes = match hex::decode(pubkey) {
369 Ok(b) => b,
370 Err(_) => return false,
371 };
372
373 let sig_bytes = match hex::decode(sig) {
374 Ok(b) => b,
375 Err(_) => return false,
376 };
377
378 let secp = Secp256k1::verification_only();
379
380 let xonly_pubkey = match XOnlyPublicKey::from_slice(&pubkey_bytes) {
381 Ok(pk) => pk,
382 Err(_) => return false,
383 };
384
385 let signature = match Signature::from_slice(&sig_bytes) {
386 Ok(s) => s,
387 Err(_) => return false,
388 };
389
390 let message = match Message::from_digest_slice(&event_id) {
391 Ok(m) => m,
392 Err(_) => return false,
393 };
394
395 secp.verify_schnorr(&signature, &message, &xonly_pubkey)
396 .is_ok()
397}
398
/// Escapes `s` for embedding between double quotes in the serialized Nostr event that is
/// hashed to obtain the event id.
///
/// The output must match, byte for byte, what the signer's JSON serializer produced, so the
/// escaping follows the usual JSON serializer conventions (and NIP-01's list of escapes):
///
/// * `"` and `\` are backslash-escaped;
/// * line feed, carriage return, tab, backspace and form feed use their short forms
///   (`\n`, `\r`, `\t`, `\b`, `\f`);
/// * any other character below U+0020 is written as `\u00xx` (lowercase hex);
/// * everything else — including DEL and non-ASCII characters — is emitted verbatim.
fn escape_json_string(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => result.push_str("\\\""),
            '\\' => result.push_str("\\\\"),
            '\n' => result.push_str("\\n"),
            '\r' => result.push_str("\\r"),
            '\t' => result.push_str("\\t"),
            '\u{08}' => result.push_str("\\b"),
            '\u{0C}' => result.push_str("\\f"),
            // Only C0 controls need escaping; `char::is_control` would also match DEL and
            // the C1 range, which serializers leave untouched.
            c if c < '\u{20}' => {
                result.push_str(&format!("\\u{:04x}", c as u32));
            }
            c => result.push(c),
        }
    }
    result
}
417
418pub async fn cors_preflight(headers: HeaderMap) -> impl IntoResponse {
421 let allowed_headers = headers
423 .get(header::ACCESS_CONTROL_REQUEST_HEADERS)
424 .and_then(|v| v.to_str().ok())
425 .unwrap_or("Authorization, Content-Type, X-SHA-256, x-sha-256");
426
427 let full_allowed = format!(
429 "{}, Authorization, Content-Type, X-SHA-256, x-sha-256, Accept, Cache-Control",
430 allowed_headers
431 );
432
433 Response::builder()
434 .status(StatusCode::NO_CONTENT)
435 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
436 .header(
437 header::ACCESS_CONTROL_ALLOW_METHODS,
438 "GET, HEAD, POST, PUT, DELETE, OPTIONS",
439 )
440 .header(header::ACCESS_CONTROL_ALLOW_HEADERS, full_allowed)
441 .header(header::ACCESS_CONTROL_MAX_AGE, "86400")
442 .body(Body::empty())
443 .unwrap()
444}
445
446fn encode_upload_check_bitset(bits: &[bool]) -> String {
447 let mut bytes = vec![0u8; bits.len().div_ceil(8)];
448 for (index, present) in bits.iter().enumerate() {
449 if *present {
450 bytes[index / 8] |= 1 << (index % 8);
451 }
452 }
453 base64::engine::general_purpose::STANDARD.encode(bytes)
454}
455
456pub async fn upload_check(
458 State(state): State<AppState>,
459 Json(payload): Json<UploadCheckRequest>,
460) -> impl IntoResponse {
461 if payload.hashes.len() > MAX_UPLOAD_CHECK_HASHES {
462 return blossom_json_error(
463 StatusCode::PAYLOAD_TOO_LARGE,
464 format!("Too many hashes; maximum is {}", MAX_UPLOAD_CHECK_HASHES),
465 );
466 }
467
468 let mut requested = Vec::with_capacity(payload.hashes.len());
469 let mut unique = Vec::new();
470 for hash in payload.hashes {
471 let hash = hash.trim().to_ascii_lowercase();
472 if !is_valid_sha256(&hash) {
473 return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid SHA256 hash");
474 }
475 let bytes: [u8; 32] = match from_hex(&hash) {
476 Ok(bytes) => bytes,
477 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid SHA256 hash"),
478 };
479 requested.push(bytes);
480 unique.push(bytes);
481 }
482
483 unique.sort_unstable();
484 unique.dedup();
485
486 let existing = if unique.is_empty() {
487 Vec::new()
488 } else {
489 let permit = match acquire_blob_read().await {
490 Ok(permit) => permit,
491 Err(_) => {
492 return blossom_retryable_json_error(
493 StatusCode::SERVICE_UNAVAILABLE,
494 BLOB_READ_BUSY,
495 1,
496 );
497 }
498 };
499 let store = state.store.clone();
500 let lookup_hashes = unique.clone();
501 let lookup = tokio::task::spawn_blocking(move || {
502 store
503 .router()
504 .existing_local_hashes_in_sorted_candidates(&lookup_hashes)
505 });
506 let result = tokio::time::timeout(blob_read_timeout(), lookup).await;
507 drop(permit);
508 match result {
509 Ok(Ok(Ok(existing))) => existing,
510 Ok(Ok(Err(error))) => {
511 tracing::debug!("Blossom upload check failed: {}", error);
512 return blossom_json_error(StatusCode::INTERNAL_SERVER_ERROR, "Storage error");
513 }
514 Ok(Err(error)) => {
515 tracing::debug!("Blossom upload check task failed: {}", error);
516 return blossom_json_error(StatusCode::INTERNAL_SERVER_ERROR, "Storage error");
517 }
518 Err(_) => {
519 return blossom_retryable_json_error(
520 StatusCode::SERVICE_UNAVAILABLE,
521 "Blob check timed out",
522 1,
523 );
524 }
525 }
526 };
527
528 let present_unique: HashSet<[u8; 32]> = unique
529 .into_iter()
530 .zip(existing)
531 .filter_map(|(hash, present)| present.then_some(hash))
532 .collect();
533 let present_bits: Vec<bool> = requested
534 .iter()
535 .map(|hash| present_unique.contains(hash))
536 .collect();
537
538 Response::builder()
539 .status(StatusCode::OK)
540 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
541 .header(header::CONTENT_TYPE, "application/json")
542 .body(Body::from(
543 serde_json::to_string(&UploadCheckResponse {
544 count: present_bits.len(),
545 present: encode_upload_check_bitset(&present_bits),
546 })
547 .unwrap(),
548 ))
549 .unwrap()
550}
551
552pub async fn head_blob(
554 State(state): State<AppState>,
555 Path(id): Path<String>,
556 connect_info: axum::extract::ConnectInfo<std::net::SocketAddr>,
557) -> impl IntoResponse {
558 let is_localhost = connect_info.0.ip().is_loopback();
559 let (hash_part, ext) = parse_hash_and_extension(&id);
560
561 if !is_valid_sha256(hash_part) {
562 return Response::builder()
563 .status(StatusCode::BAD_REQUEST)
564 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
565 .header("X-Reason", "Invalid SHA256 hash")
566 .body(Body::empty())
567 .unwrap();
568 }
569
570 let sha256_hex = hash_part.to_lowercase();
571 let sha256_bytes: [u8; 32] = match from_hex(&sha256_hex) {
572 Ok(b) => b,
573 Err(_) => {
574 return Response::builder()
575 .status(StatusCode::BAD_REQUEST)
576 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
577 .header("X-Reason", "Invalid SHA256 format")
578 .body(Body::empty())
579 .unwrap();
580 }
581 };
582
583 let blob_size = if let Some(cached) = state.blob_cache.get_size(&sha256_hex) {
588 Ok(Ok(cached))
589 } else {
590 let permit = match acquire_blob_read().await {
591 Ok(permit) => permit,
592 Err(_) => {
593 return Response::builder()
594 .status(StatusCode::SERVICE_UNAVAILABLE)
595 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
596 .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
597 .header("Retry-After", "1")
598 .header("X-Reason", BLOB_READ_BUSY)
599 .body(Body::empty())
600 .unwrap();
601 }
602 };
603 let store = state.store.clone();
604 let size_read = tokio::task::spawn_blocking(move || store.blob_size(&sha256_bytes));
605 let timed = tokio::time::timeout(blob_read_timeout(), size_read).await;
606 drop(permit);
607 let result = match timed {
608 Ok(result) => result.map_err(|_| ()),
609 Err(_) => Err(()),
610 };
611 if let Ok(Ok(size)) = result {
612 state.blob_cache.put_size(sha256_hex.clone(), size);
613 }
614 result
615 };
616
617 match blob_size {
618 Ok(Ok(Some(size))) => {
619 let mime_type = ext
620 .map(|e| get_mime_type(&format!("file{}", e)))
621 .unwrap_or("application/octet-stream");
622
623 let mut builder = Response::builder()
624 .status(StatusCode::OK)
625 .header(header::CONTENT_TYPE, mime_type)
626 .header(header::CONTENT_LENGTH, size)
627 .header(header::ACCEPT_RANGES, "bytes")
628 .header(header::CACHE_CONTROL, IMMUTABLE_CACHE_CONTROL)
629 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
630 if is_localhost {
631 builder = builder.header("X-Source", "local");
632 }
633 builder.body(Body::empty()).unwrap()
634 }
635 Ok(Ok(None)) => Response::builder()
636 .status(StatusCode::NOT_FOUND)
637 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
638 .header(header::CACHE_CONTROL, IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
639 .header("X-Reason", "Blob not found")
640 .body(Body::empty())
641 .unwrap(),
642 _ => Response::builder()
643 .status(StatusCode::INTERNAL_SERVER_ERROR)
644 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
645 .body(Body::empty())
646 .unwrap(),
647 }
648}
649
650async fn store_blossom_blob_without_blocking_runtime(
651 state: &AppState,
652 data: axum::body::Bytes,
653 pubkey: [u8; 32],
654 track_ownership: bool,
655) -> anyhow::Result<()> {
656 let mut hasher = Sha256::new();
657 hasher.update(&data);
658 let hash_hex = hex::encode(hasher.finalize());
659 let data_for_cache = data.clone();
660 let permit = acquire_blob_write()
661 .await
662 .map_err(|err| anyhow::anyhow!(err))?;
663 let store = state.store.clone();
664 tokio::task::spawn_blocking(move || {
665 let _permit = permit;
666 if track_ownership {
667 store.put_owned_blob(&data, &pubkey)?;
668 } else {
669 store.put_cached_blob(&data)?;
670 }
671 Ok::<(), anyhow::Error>(())
672 })
673 .await
674 .map_err(|err| anyhow::anyhow!("blob write task failed: {}", err))??;
675 state
676 .blob_cache
677 .put_size(hash_hex.clone(), Some(data_for_cache.len() as u64));
678 state.blob_cache.put_body(hash_hex, &data_for_cache);
679 Ok(())
680}
681
682fn upload_descriptor_response(status: StatusCode, descriptor: &BlobDescriptor) -> Response<Body> {
683 Response::builder()
684 .status(status)
685 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
686 .header(header::CONTENT_TYPE, "application/json")
687 .body(Body::from(serde_json::to_string(descriptor).unwrap()))
688 .unwrap()
689}
690
691fn optimistic_upload_queue_timeout() -> Duration {
692 let millis = std::env::var(OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS_ENV)
693 .ok()
694 .and_then(|value| value.parse::<u64>().ok())
695 .filter(|value| *value > 0)
696 .unwrap_or(DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_TIMEOUT_MS);
697 Duration::from_millis(millis)
698}
699
/// Returns the process-wide set of blob hashes (hex) whose optimistic upload is currently
/// queued or being written. The set is created on first use.
fn optimistic_upload_inflight() -> &'static Mutex<HashSet<String>> {
    static INFLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    INFLIGHT.get_or_init(Default::default)
}

/// Returns `true` when `hash_hex` is recorded as in flight. A poisoned lock reads as
/// "not in flight".
fn optimistic_upload_is_inflight(hash_hex: &str) -> bool {
    match optimistic_upload_inflight().lock() {
        Ok(registry) => registry.contains(hash_hex),
        Err(_) => false,
    }
}

/// Records `hash_hex` as in flight.
///
/// Returns `true` when the hash was newly recorded and `false` when it was already present.
/// When the lock is poisoned nothing is recorded and `true` is returned.
fn mark_optimistic_upload_inflight(hash_hex: &str) -> bool {
    match optimistic_upload_inflight().lock() {
        Ok(mut registry) => registry.insert(hash_hex.to_owned()),
        Err(_) => true,
    }
}

/// Removes `hash_hex` from the in-flight set; does nothing when the lock is poisoned.
fn clear_optimistic_upload_inflight(hash_hex: &str) {
    let Ok(mut registry) = optimistic_upload_inflight().lock() else {
        return;
    };
    registry.remove(hash_hex);
}
723
724pub(super) fn optimistic_upload_queue_snapshot(state: &AppState) -> OptimisticUploadQueueSnapshot {
725 let max_bytes = state.optimistic_upload_queue_bytes;
726 let available_bytes = state
727 .optimistic_upload_queue
728 .available_permits()
729 .min(max_bytes);
730 let in_flight = optimistic_upload_inflight()
731 .lock()
732 .map(|inflight| inflight.len())
733 .unwrap_or(0);
734
735 OptimisticUploadQueueSnapshot {
736 enabled: state.optimistic_blossom_uploads,
737 max_bytes,
738 available_bytes,
739 reserved_bytes: max_bytes.saturating_sub(available_bytes),
740 in_flight,
741 queue_timeout_ms: duration_millis_u64(optimistic_upload_queue_timeout()),
742 }
743}
744
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX`.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
748
749async fn acquire_optimistic_upload_queue(
750 state: &AppState,
751 permits: u32,
752) -> Result<tokio::sync::OwnedSemaphorePermit, &'static str> {
753 match tokio::time::timeout(
754 optimistic_upload_queue_timeout(),
755 state
756 .optimistic_upload_queue
757 .clone()
758 .acquire_many_owned(permits),
759 )
760 .await
761 {
762 Ok(Ok(permit)) => Ok(permit),
763 Ok(Err(_)) => Err("Optimistic upload queue is closed"),
764 Err(_) => Err("Optimistic upload queue is full"),
765 }
766}
767
768async fn uploaded_blob_already_exists(
769 state: &AppState,
770 sha256_hash: [u8; 32],
771 sha256_hex: &str,
772) -> Result<bool, String> {
773 if let Some(Some(_)) = state.blob_cache.get_size(sha256_hex) {
774 return Ok(true);
775 }
776
777 let permit = acquire_blob_read().await.map_err(str::to_string)?;
778 let store = state.store.clone();
779 let size_read = tokio::task::spawn_blocking(move || {
780 store
781 .blob_size(&sha256_hash)
782 .map_err(|error| error.to_string())
783 });
784
785 let result = tokio::time::timeout(blob_read_timeout(), size_read).await;
786 drop(permit);
787 match result {
788 Ok(Ok(Ok(size))) => {
789 state.blob_cache.put_size(sha256_hex.to_string(), size);
790 Ok(size.is_some())
791 }
792 Ok(Ok(Err(error))) => Err(error),
793 Ok(Err(error)) => Err(format!("blob existence task failed: {}", error)),
794 Err(_) => Err("blob existence check timed out".to_string()),
795 }
796}
797
798async fn set_existing_blob_owner_without_body_write(
799 state: AppState,
800 sha256_hash: [u8; 32],
801 pubkey: [u8; 32],
802) -> anyhow::Result<()> {
803 tokio::task::spawn_blocking(move || state.store.set_blob_owner(&sha256_hash, &pubkey))
804 .await
805 .map_err(|error| anyhow::anyhow!("blob owner task failed: {}", error))??;
806 Ok(())
807}
808
809pub async fn upload_blob(
811 State(state): State<AppState>,
812 headers: HeaderMap,
813 body: axum::body::Bytes,
814) -> impl IntoResponse {
815 let max_size = state.max_upload_bytes;
817 if body.len() > max_size {
818 return Response::builder()
819 .status(StatusCode::PAYLOAD_TOO_LARGE)
820 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
821 .header(header::CONTENT_TYPE, "application/json")
822 .body(Body::from(format!(
823 r#"{{"error":"Upload size {} bytes exceeds maximum {} bytes ({} MB)"}}"#,
824 body.len(),
825 max_size,
826 max_size / 1024 / 1024
827 )))
828 .unwrap();
829 }
830
831 let auth = match verify_blossom_auth(&headers, "upload", None) {
833 Ok(a) => a,
834 Err((status, reason)) => {
835 return Response::builder()
836 .status(status)
837 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
838 .header("X-Reason", reason)
839 .header(header::CONTENT_TYPE, "application/json")
840 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
841 .unwrap();
842 }
843 };
844
845 let content_type = headers
847 .get(header::CONTENT_TYPE)
848 .and_then(|v| v.to_str().ok())
849 .unwrap_or("application/octet-stream")
850 .to_string();
851
852 let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
854 let can_upload = can_accept_upload_author(&state, &auth.pubkey);
855 if !can_upload {
856 let _ = check_write_access(&state, &auth.pubkey);
857 return Response::builder()
858 .status(StatusCode::FORBIDDEN)
859 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
860 .header(header::CONTENT_TYPE, "application/json")
861 .body(Body::from(r#"{"error":"Write access denied. Your pubkey is not in the allowed list and public writes are disabled."}"#))
862 .unwrap();
863 }
864
865 if let Err((status, reason)) = validate_upload_payload(
866 &body,
867 &content_type,
868 can_upload,
869 state.require_random_untrusted_ingest,
870 ) {
871 return blossom_json_error(status, reason);
872 }
873
874 let mut hasher = Sha256::new();
876 hasher.update(&body);
877 let sha256_hash: [u8; 32] = hasher.finalize().into();
878 let sha256_hex = hex::encode(sha256_hash);
879
880 if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
882 return Response::builder()
883 .status(StatusCode::FORBIDDEN)
884 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
885 .header(
886 "X-Reason",
887 "Uploaded blob hash does not match authorized hash",
888 )
889 .header(header::CONTENT_TYPE, "application/json")
890 .body(Body::from(r#"{"error":"Hash mismatch"}"#))
891 .unwrap();
892 }
893
894 let pubkey_bytes = match from_hex(&auth.pubkey) {
896 Ok(b) => b,
897 Err(_) => {
898 return Response::builder()
899 .status(StatusCode::BAD_REQUEST)
900 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
901 .header("X-Reason", "Invalid pubkey format")
902 .body(Body::empty())
903 .unwrap();
904 }
905 };
906
907 let size = body.len() as u64;
908 let now = SystemTime::now()
909 .duration_since(UNIX_EPOCH)
910 .unwrap()
911 .as_secs();
912 let ext = mime_to_extension(&content_type_base(&content_type));
913 let descriptor = BlobDescriptor {
914 url: format!("/{}{}", sha256_hex, ext),
915 sha256: sha256_hex.clone(),
916 size,
917 mime_type: content_type,
918 uploaded: now,
919 };
920
921 if state.optimistic_blossom_uploads && optimistic_upload_is_inflight(&sha256_hex) {
922 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
923 }
924
925 if state.optimistic_blossom_uploads {
928 let queued_bytes = body.len().max(OPTIMISTIC_UPLOAD_MIN_QUEUE_CHARGE_BYTES);
929 if queued_bytes <= state.optimistic_upload_queue_bytes {
930 let permits = queued_bytes as u32;
931 let marked_inflight = mark_optimistic_upload_inflight(&sha256_hex);
932 if !marked_inflight {
933 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
934 }
935 let permit = match acquire_optimistic_upload_queue(&state, permits).await {
936 Ok(permit) => permit,
937 Err(_) => {
938 clear_optimistic_upload_inflight(&sha256_hex);
939 return blossom_retryable_json_error(
940 StatusCode::SERVICE_UNAVAILABLE,
941 "Optimistic upload queue is full",
942 2,
943 );
944 }
945 };
946 let state_for_write = state.clone();
947 let hash_for_log = sha256_hex.clone();
948 tokio::spawn(async move {
949 let _permit = permit;
950 if let Err(error) = store_blossom_blob_without_blocking_runtime(
951 &state_for_write,
952 body,
953 pubkey_bytes,
954 is_allowed_author,
955 )
956 .await
957 {
958 tracing::error!(
959 "Background Blossom storage failed for {}: {:#}",
960 hash_for_log,
961 error
962 );
963 }
964 clear_optimistic_upload_inflight(&hash_for_log);
965 });
966
967 return upload_descriptor_response(StatusCode::ACCEPTED, &descriptor);
968 }
969
970 tracing::warn!(
971 "Blossom upload {} is larger than optimistic queue budget {}; storing synchronously",
972 queued_bytes,
973 state.optimistic_upload_queue_bytes
974 );
975 }
976
977 match uploaded_blob_already_exists(&state, sha256_hash, &sha256_hex).await {
978 Ok(true) => {
979 if is_allowed_author {
980 if let Err(error) = set_existing_blob_owner_without_body_write(
981 state.clone(),
982 sha256_hash,
983 pubkey_bytes,
984 )
985 .await
986 {
987 return Response::builder()
988 .status(StatusCode::INTERNAL_SERVER_ERROR)
989 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
990 .header("X-Reason", "Storage error")
991 .header(header::CONTENT_TYPE, "application/json")
992 .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
993 .unwrap();
994 }
995 }
996 return upload_descriptor_response(StatusCode::CONFLICT, &descriptor);
997 }
998 Ok(false) => {}
999 Err(error) => {
1000 tracing::debug!(
1001 "Could not preflight Blossom upload {} before synchronous storage: {}",
1002 sha256_hex,
1003 error
1004 );
1005 }
1006 }
1007
1008 let store_result = store_blossom_blob_without_blocking_runtime(
1009 &state,
1010 body.clone(),
1011 pubkey_bytes,
1012 is_allowed_author,
1013 )
1014 .await;
1015
1016 match store_result {
1017 Ok(()) => upload_descriptor_response(StatusCode::OK, &descriptor),
1018 Err(e) => Response::builder()
1019 .status(StatusCode::INTERNAL_SERVER_ERROR)
1020 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1021 .header("X-Reason", "Storage error")
1022 .header(header::CONTENT_TYPE, "application/json")
1023 .body(Body::from(format!(r#"{{"error":"{}"}}"#, e)))
1024 .unwrap(),
1025 }
1026}
1027
1028pub async fn upload_blob_batch(
1030 State(state): State<AppState>,
1031 headers: HeaderMap,
1032 Json(payload): Json<BatchUploadRequest>,
1033) -> impl IntoResponse {
1034 let started_at = Instant::now();
1035 let slow_log_ms = slow_batch_upload_log_ms();
1036 let payload_blobs = payload.blobs.len();
1037 if payload.blobs.is_empty() {
1038 return blossom_json_error(StatusCode::BAD_REQUEST, "Batch is empty");
1039 }
1040 if payload.blobs.len() > MAX_BATCH_UPLOAD_BLOBS {
1041 return blossom_json_error(
1042 StatusCode::PAYLOAD_TOO_LARGE,
1043 "Batch contains too many blobs",
1044 );
1045 }
1046
1047 let auth = match verify_blossom_auth(&headers, "upload", None) {
1048 Ok(a) => a,
1049 Err((status, reason)) => {
1050 return Response::builder()
1051 .status(status)
1052 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1053 .header("X-Reason", reason)
1054 .header(header::CONTENT_TYPE, "application/json")
1055 .body(Body::from(format!(r#"{{"error":"{}"}}"#, reason)))
1056 .unwrap();
1057 }
1058 };
1059 let auth_ms = started_at.elapsed().as_millis();
1060
1061 let is_allowed_author = is_allowed_write_author(&state, &auth.pubkey);
1062 let can_upload = can_accept_upload_author(&state, &auth.pubkey);
1063 if !can_upload {
1064 let _ = check_write_access(&state, &auth.pubkey);
1065 return blossom_json_error(StatusCode::FORBIDDEN, "Write access denied");
1066 }
1067
1068 let pubkey_bytes = match from_hex(&auth.pubkey) {
1069 Ok(bytes) => bytes,
1070 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid pubkey format"),
1071 };
1072
1073 let now = SystemTime::now()
1074 .duration_since(UNIX_EPOCH)
1075 .unwrap()
1076 .as_secs();
1077 let mut total_bytes = 0usize;
1078 let mut items = Vec::with_capacity(payload.blobs.len());
1079 let mut descriptors = Vec::with_capacity(payload.blobs.len());
1080 let mut decode_hash_ms = 0u128;
1081 let mut validate_ms = 0u128;
1082
1083 for blob in payload.blobs {
1084 let decode_started = Instant::now();
1085 let sha256_hex = blob.sha256.to_lowercase();
1086 let expected_hash: [u8; 32] = match from_hex(&sha256_hex) {
1087 Ok(hash) => hash,
1088 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob hash"),
1089 };
1090 if !auth.blob_hashes.is_empty() && !auth.blob_hashes.contains(&sha256_hex) {
1091 return blossom_json_error(
1092 StatusCode::FORBIDDEN,
1093 "Uploaded blob hash does not match authorized hash",
1094 );
1095 }
1096
1097 let data = match base64::engine::general_purpose::STANDARD.decode(blob.data.as_bytes()) {
1098 Ok(data) => data,
1099 Err(_) => return blossom_json_error(StatusCode::BAD_REQUEST, "Invalid blob data"),
1100 };
1101 if data.len() > state.max_upload_bytes {
1102 return blossom_json_error(
1103 StatusCode::PAYLOAD_TOO_LARGE,
1104 "Blob exceeds maximum upload size",
1105 );
1106 }
1107 total_bytes = total_bytes.saturating_add(data.len());
1108 if total_bytes > MAX_BATCH_UPLOAD_BYTES {
1109 return blossom_json_error(
1110 StatusCode::PAYLOAD_TOO_LARGE,
1111 "Batch exceeds maximum upload size",
1112 );
1113 }
1114
1115 let mut hasher = Sha256::new();
1116 hasher.update(&data);
1117 let actual_hash: [u8; 32] = hasher.finalize().into();
1118 if actual_hash != expected_hash {
1119 return blossom_json_error(StatusCode::FORBIDDEN, "Hash mismatch");
1120 }
1121 decode_hash_ms += decode_started.elapsed().as_millis();
1122
1123 let validate_started = Instant::now();
1124 let content_type = blob
1125 .content_type
1126 .as_deref()
1127 .map(content_type_base)
1128 .unwrap_or_else(|| "application/octet-stream".to_string());
1129 if let Err((status, reason)) = validate_upload_payload(
1130 &data,
1131 &content_type,
1132 can_upload,
1133 state.require_random_untrusted_ingest,
1134 ) {
1135 return blossom_json_error(status, reason);
1136 }
1137 validate_ms += validate_started.elapsed().as_millis();
1138
1139 let ext = mime_to_extension(&content_type);
1140 descriptors.push(BlobDescriptor {
1141 url: format!("/{}{}", sha256_hex, ext),
1142 sha256: sha256_hex,
1143 size: data.len() as u64,
1144 mime_type: content_type,
1145 uploaded: now,
1146 });
1147 items.push((actual_hash, data));
1148 }
1149 let prepare_ms = started_at.elapsed().as_millis();
1150
1151 let store = state.store.clone();
1152 let store_started = Instant::now();
1153 let stored = tokio::task::spawn_blocking(move || {
1154 if is_allowed_author {
1155 store.put_owned_blobs(&items, &pubkey_bytes)
1156 } else {
1157 store.put_cached_blobs(&items)
1158 }
1159 })
1160 .await
1161 .map_err(|error| anyhow::anyhow!("blob batch write task failed: {}", error));
1162 let store_ms = store_started.elapsed().as_millis();
1163 let total_ms = started_at.elapsed().as_millis();
1164
1165 match stored {
1166 Ok(Ok(uploaded)) => {
1167 if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1168 tracing::warn!(
1169 blobs = payload_blobs,
1170 uploaded,
1171 total_bytes,
1172 total_ms,
1173 auth_ms,
1174 prepare_ms,
1175 decode_hash_ms,
1176 validate_ms,
1177 store_ms,
1178 allowed_author = is_allowed_author,
1179 "slow Blossom batch upload"
1180 );
1181 }
1182 Response::builder()
1183 .status(StatusCode::OK)
1184 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1185 .header(header::CONTENT_TYPE, "application/json")
1186 .body(Body::from(
1187 serde_json::to_string(&BatchUploadResponse {
1188 uploaded,
1189 blobs: descriptors,
1190 })
1191 .unwrap(),
1192 ))
1193 .unwrap()
1194 }
1195 Ok(Err(error)) | Err(error) => Response::builder()
1196 .status(StatusCode::INTERNAL_SERVER_ERROR)
1197 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1198 .header("X-Reason", "Storage error")
1199 .header(header::CONTENT_TYPE, "application/json")
1200 .body(Body::from(format!(r#"{{"error":"{}"}}"#, error)))
1201 .unwrap(),
1202 }
1203}
1204
1205pub async fn delete_blob(
1208 State(state): State<AppState>,
1209 Path(id): Path<String>,
1210 headers: HeaderMap,
1211) -> impl IntoResponse {
1212 let (hash_part, _) = parse_hash_and_extension(&id);
1213
1214 if !is_valid_sha256(hash_part) {
1215 return Response::builder()
1216 .status(StatusCode::BAD_REQUEST)
1217 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1218 .header("X-Reason", "Invalid SHA256 hash")
1219 .body(Body::empty())
1220 .unwrap();
1221 }
1222
1223 let sha256_hex = hash_part.to_lowercase();
1224
1225 let sha256_bytes = match from_hex(&sha256_hex) {
1227 Ok(b) => b,
1228 Err(_) => {
1229 return Response::builder()
1230 .status(StatusCode::BAD_REQUEST)
1231 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1232 .header("X-Reason", "Invalid SHA256 hash format")
1233 .body(Body::empty())
1234 .unwrap();
1235 }
1236 };
1237
1238 let auth = match verify_blossom_auth(&headers, "delete", Some(&sha256_hex)) {
1240 Ok(a) => a,
1241 Err((status, reason)) => {
1242 return Response::builder()
1243 .status(status)
1244 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1245 .header("X-Reason", reason)
1246 .body(Body::empty())
1247 .unwrap();
1248 }
1249 };
1250
1251 let pubkey_bytes = match from_hex(&auth.pubkey) {
1253 Ok(b) => b,
1254 Err(_) => {
1255 return Response::builder()
1256 .status(StatusCode::BAD_REQUEST)
1257 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1258 .header("X-Reason", "Invalid pubkey format")
1259 .body(Body::empty())
1260 .unwrap();
1261 }
1262 };
1263
1264 match state.store.is_blob_owner(&sha256_bytes, &pubkey_bytes) {
1266 Ok(true) => {
1267 }
1269 Ok(false) => {
1270 match state.store.blob_has_owners(&sha256_bytes) {
1272 Ok(true) => {
1273 return Response::builder()
1274 .status(StatusCode::FORBIDDEN)
1275 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1276 .header("X-Reason", "Not a blob owner")
1277 .body(Body::empty())
1278 .unwrap();
1279 }
1280 Ok(false) => {
1281 return Response::builder()
1282 .status(StatusCode::NOT_FOUND)
1283 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1284 .header(header::CACHE_CONTROL, NOT_FOUND_CACHE_CONTROL)
1285 .header("X-Reason", "Blob not found")
1286 .body(Body::empty())
1287 .unwrap();
1288 }
1289 Err(_) => {
1290 return Response::builder()
1291 .status(StatusCode::INTERNAL_SERVER_ERROR)
1292 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1293 .body(Body::empty())
1294 .unwrap();
1295 }
1296 }
1297 }
1298 Err(_) => {
1299 return Response::builder()
1300 .status(StatusCode::INTERNAL_SERVER_ERROR)
1301 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1302 .body(Body::empty())
1303 .unwrap();
1304 }
1305 }
1306
1307 match state
1309 .store
1310 .delete_blossom_blob(&sha256_bytes, &pubkey_bytes)
1311 {
1312 Ok(fully_deleted) => {
1313 Response::builder()
1316 .status(StatusCode::OK)
1317 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1318 .header(
1319 "X-Blob-Deleted",
1320 if fully_deleted { "true" } else { "false" },
1321 )
1322 .body(Body::empty())
1323 .unwrap()
1324 }
1325 Err(_) => Response::builder()
1326 .status(StatusCode::INTERNAL_SERVER_ERROR)
1327 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1328 .body(Body::empty())
1329 .unwrap(),
1330 }
1331}
1332
1333pub async fn list_blobs(
1335 State(state): State<AppState>,
1336 Path(pubkey): Path<String>,
1337 Query(query): Query<ListQuery>,
1338 headers: HeaderMap,
1339) -> impl IntoResponse {
1340 if pubkey.len() != 64 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
1342 return Response::builder()
1343 .status(StatusCode::BAD_REQUEST)
1344 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1345 .header("X-Reason", "Invalid pubkey format")
1346 .header(header::CONTENT_TYPE, "application/json")
1347 .body(Body::from("[]"))
1348 .unwrap();
1349 }
1350
1351 let pubkey_hex = pubkey.to_lowercase();
1352 let pubkey_bytes: [u8; 32] = match from_hex(&pubkey_hex) {
1353 Ok(b) => b,
1354 Err(_) => {
1355 return Response::builder()
1356 .status(StatusCode::BAD_REQUEST)
1357 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1358 .header("X-Reason", "Invalid pubkey format")
1359 .header(header::CONTENT_TYPE, "application/json")
1360 .body(Body::from("[]"))
1361 .unwrap();
1362 }
1363 };
1364
1365 let auth = match verify_blossom_auth(&headers, "list", None) {
1366 Ok(auth) => auth,
1367 Err((status, reason)) => {
1368 return Response::builder()
1369 .status(status)
1370 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1371 .header("X-Reason", reason)
1372 .header(header::CONTENT_TYPE, "application/json")
1373 .body(Body::from("[]"))
1374 .unwrap();
1375 }
1376 };
1377
1378 if !auth.pubkey.eq_ignore_ascii_case(&pubkey_hex) {
1379 return Response::builder()
1380 .status(StatusCode::FORBIDDEN)
1381 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1382 .header("X-Reason", "Pubkey mismatch")
1383 .header(header::CONTENT_TYPE, "application/json")
1384 .body(Body::from("[]"))
1385 .unwrap();
1386 }
1387
1388 match state.store.list_blobs_by_pubkey(&pubkey_bytes) {
1390 Ok(blobs) => {
1391 let mut filtered: Vec<_> = blobs
1393 .into_iter()
1394 .filter(|b| {
1395 if let Some(since) = query.since {
1396 if b.uploaded < since {
1397 return false;
1398 }
1399 }
1400 if let Some(until) = query.until {
1401 if b.uploaded > until {
1402 return false;
1403 }
1404 }
1405 true
1406 })
1407 .collect();
1408
1409 filtered.sort_by(|a, b| b.uploaded.cmp(&a.uploaded));
1411
1412 let limit = query.limit.unwrap_or(100).min(1000);
1414 filtered.truncate(limit);
1415
1416 Response::builder()
1417 .status(StatusCode::OK)
1418 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1419 .header(header::CONTENT_TYPE, "application/json")
1420 .body(Body::from(serde_json::to_string(&filtered).unwrap()))
1421 .unwrap()
1422 }
1423 Err(_) => Response::builder()
1424 .status(StatusCode::INTERNAL_SERVER_ERROR)
1425 .header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
1426 .header(header::CONTENT_TYPE, "application/json")
1427 .body(Body::from("[]"))
1428 .unwrap(),
1429 }
1430}
1431
/// Splits `id` at its last `.` into `(stem, Some(suffix))`, where the suffix keeps the dot.
/// Returns `(id, None)` when `id` contains no dot.
fn parse_hash_and_extension(id: &str) -> (&str, Option<&str>) {
    match id.rfind('.') {
        Some(dot_index) => {
            let (stem, suffix) = id.split_at(dot_index);
            (stem, Some(suffix))
        }
        None => (id, None),
    }
}
1441
/// Returns `true` when `s` is exactly 64 ASCII hex digits (either case).
fn is_valid_sha256(s: &str) -> bool {
    // Any byte of a multi-byte UTF-8 character is >= 0x80 and therefore never a hex digit,
    // so checking bytes accepts exactly the same strings as checking chars.
    s.len() == 64 && s.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1445
/// Test-only helper that writes `data` through `state.store`: as an owned blob for `pubkey`
/// when `track_ownership` is set, otherwise as a cached blob. `_sha256` is accepted but
/// not used.
#[cfg(test)]
fn store_blossom_blob(
    state: &AppState,
    data: &[u8],
    _sha256: &[u8; 32],
    pubkey: &[u8; 32],
    track_ownership: bool,
) -> anyhow::Result<()> {
    if !track_ownership {
        state.store.put_cached_blob(data)?;
        return Ok(());
    }

    state.store.put_owned_blob(data, pubkey)?;
    Ok(())
}
1462
/// Maps a MIME type to a file extension including the leading dot, or `""` when the type
/// is not in the table. Matching is exact and case-sensitive.
fn mime_to_extension(mime: &str) -> &'static str {
    const KNOWN_EXTENSIONS: &[(&str, &str)] = &[
        ("image/png", ".png"),
        ("image/jpeg", ".jpg"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("video/mp4", ".mp4"),
        ("video/webm", ".webm"),
        ("audio/mpeg", ".mp3"),
        ("audio/ogg", ".ogg"),
        ("application/pdf", ".pdf"),
        ("text/plain", ".txt"),
        ("text/html", ".html"),
        ("application/json", ".json"),
    ];

    KNOWN_EXTENSIONS
        .iter()
        .find(|&&(known, _)| known == mime)
        .map_or("", |&(_, extension)| extension)
}
1481
1482#[cfg(test)]
1483mod tests {
1484 use super::*;
1485 use crate::server::auth::WsRelayState;
1486 use crate::storage::HashtreeStore;
1487 use axum::response::IntoResponse;
1488 use base64::Engine;
1489 use hashtree_core::sha256;
1490 use std::collections::HashSet;
1491 use std::sync::{Arc, Mutex as StdMutex};
1492 use std::time::Duration;
1493 use tempfile::TempDir;
1494
    /// Builds an `AppState` around `store` with fixed defaults for the tests in this module.
    ///
    /// Tests that need different behaviour mutate the returned value afterwards, e.g. by
    /// setting `optimistic_blossom_uploads`, `public_writes` or replacing the optimistic
    /// upload queue semaphore.
    fn test_app_state(store: Arc<HashtreeStore>) -> AppState {
        AppState {
            store,
            auth: None,
            daemon_started_at: 1_700_000_000,
            peer_mode: crate::config::ServerMode::Normal,
            hash_get_enabled: true,
            http_webrtc_fetch: true,
            webrtc_peers: None,
            fips_transport: None,
            fetch_from_fips_peers: true,
            ws_relay: Arc::new(WsRelayState::new()),
            // Same value as `DEFAULT_MAX_UPLOAD_SIZE` (5 MiB).
            max_upload_bytes: 5 * 1024 * 1024,
            public_writes: true,
            public_plaintext_reads: true,
            require_random_untrusted_ingest: true,
            optimistic_blossom_uploads: false,
            // The byte budget and the semaphore permit count are kept equal (512 MiB).
            optimistic_upload_queue_bytes: 512 * 1024 * 1024,
            optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
            // No allow-listed authors: every test pubkey is "unlisted".
            allowed_pubkeys: HashSet::new(),
            upstream_blossom: Vec::new(),
            social_graph: None,
            social_graph_store: None,
            social_graph_root: None,
            socialgraph_snapshot_public: false,
            nostr_relay: None,
            nostr_relay_urls: Vec::new(),
            tree_root_cache: Arc::new(StdMutex::new(std::collections::HashMap::new())),
            inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
                std::collections::HashMap::new(),
            )),
            inflight_blob_reads: Arc::new(
                tokio::sync::Mutex::new(std::collections::HashMap::new()),
            ),
            blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
            directory_listing_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            resolved_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            thumbnail_path_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
            cid_size_cache: Arc::new(StdMutex::new(crate::server::new_lookup_cache())),
        }
    }
1536
1537 fn create_upload_auth_header(keys: &nostr::Keys) -> String {
1538 use nostr::{EventBuilder, Kind, Tag, TagKind, Timestamp};
1539
1540 let now = Timestamp::now();
1541 let event = EventBuilder::new(Kind::Custom(BLOSSOM_AUTH_KIND), "")
1542 .tags(vec![
1543 Tag::custom(TagKind::Custom("t".into()), vec!["upload".to_string()]),
1544 Tag::custom(
1545 TagKind::Custom("expiration".into()),
1546 vec![(now.as_secs() + 300).to_string()],
1547 ),
1548 ])
1549 .custom_created_at(now)
1550 .sign_with_keys(keys)
1551 .expect("sign blossom auth");
1552 let json = serde_json::to_vec(&event).expect("serialize auth event");
1553 format!(
1554 "Nostr {}",
1555 base64::engine::general_purpose::STANDARD.encode(json)
1556 )
1557 }
1558
1559 fn upload_check_bits(response: UploadCheckResponse) -> Vec<bool> {
1560 let bytes = base64::engine::general_purpose::STANDARD
1561 .decode(response.present)
1562 .expect("decode upload check bitset");
1563 (0..response.count)
1564 .map(|index| bytes[index / 8] & (1 << (index % 8)) != 0)
1565 .collect()
1566 }
1567
1568 #[test]
1569 fn test_is_valid_sha256() {
1570 assert!(is_valid_sha256(
1571 "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1572 ));
1573 assert!(is_valid_sha256(
1574 "0000000000000000000000000000000000000000000000000000000000000000"
1575 ));
1576
1577 assert!(!is_valid_sha256("e2bab35b5296ec2242ded0a01f6d6723"));
1579 assert!(!is_valid_sha256(
1581 "e2bab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6aa"
1582 ));
1583 assert!(!is_valid_sha256(
1585 "zzbab35b5296ec2242ded0a01f6d6723a5cd921239280c0a5f0b5589303336b6"
1586 ));
1587 assert!(!is_valid_sha256(""));
1589 }
1590
1591 #[test]
1592 fn test_parse_hash_and_extension() {
1593 let (hash, ext) = parse_hash_and_extension("abc123.png");
1594 assert_eq!(hash, "abc123");
1595 assert_eq!(ext, Some(".png"));
1596
1597 let (hash2, ext2) = parse_hash_and_extension("abc123");
1598 assert_eq!(hash2, "abc123");
1599 assert_eq!(ext2, None);
1600
1601 let (hash3, ext3) = parse_hash_and_extension("abc.123.jpg");
1602 assert_eq!(hash3, "abc.123");
1603 assert_eq!(ext3, Some(".jpg"));
1604 }
1605
1606 #[test]
1607 fn test_mime_to_extension() {
1608 assert_eq!(mime_to_extension("image/png"), ".png");
1609 assert_eq!(mime_to_extension("image/jpeg"), ".jpg");
1610 assert_eq!(mime_to_extension("video/mp4"), ".mp4");
1611 assert_eq!(mime_to_extension("application/octet-stream"), "");
1612 assert_eq!(mime_to_extension("unknown/type"), "");
1613 }
1614
1615 #[tokio::test]
1616 async fn upload_check_reports_present_hashes_in_request_order() {
1617 let temp_dir = TempDir::new().expect("temp dir");
1618 let store = Arc::new(
1619 HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1620 );
1621
1622 let present = b"present blob";
1623 let missing = b"missing blob";
1624 let present_hash = sha256(present);
1625 let missing_hash = sha256(missing);
1626 store.put_cached_blob(present).expect("seed blob");
1627
1628 let state = test_app_state(store);
1629 let response = upload_check(
1630 State(state),
1631 Json(UploadCheckRequest {
1632 hashes: vec![
1633 hex::encode(missing_hash),
1634 hex::encode(present_hash),
1635 hex::encode(present_hash),
1636 ],
1637 }),
1638 )
1639 .await
1640 .into_response();
1641
1642 assert_eq!(response.status(), StatusCode::OK);
1643 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
1644 .await
1645 .expect("read response body");
1646 let parsed: UploadCheckResponse =
1647 serde_json::from_slice(&body).expect("parse upload check response");
1648 assert_eq!(upload_check_bits(parsed), vec![false, true, true]);
1649 }
1650
1651 #[tokio::test]
1652 async fn upload_check_rejects_invalid_hash() {
1653 let temp_dir = TempDir::new().expect("temp dir");
1654 let store = Arc::new(
1655 HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1656 );
1657 let state = test_app_state(store);
1658 let response = upload_check(
1659 State(state),
1660 Json(UploadCheckRequest {
1661 hashes: vec!["not-a-sha256".to_string()],
1662 }),
1663 )
1664 .await
1665 .into_response();
1666
1667 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
1668 }
1669
1670 #[tokio::test]
1671 async fn upload_check_rejects_too_many_hashes() {
1672 let temp_dir = TempDir::new().expect("temp dir");
1673 let store = Arc::new(
1674 HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1675 );
1676 let state = test_app_state(store);
1677 let response = upload_check(
1678 State(state),
1679 Json(UploadCheckRequest {
1680 hashes: vec!["00".repeat(32); MAX_UPLOAD_CHECK_HASHES + 1],
1681 }),
1682 )
1683 .await
1684 .into_response();
1685
1686 assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
1687 }
1688
1689 #[tokio::test]
1690 async fn optimistic_uploads_return_accepted_and_store_in_background() {
1691 let temp_dir = TempDir::new().expect("temp dir");
1692 let store = Arc::new(
1693 HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1694 );
1695 let mut state = test_app_state(Arc::clone(&store));
1696 state.optimistic_blossom_uploads = true;
1697
1698 let keys = nostr::Keys::generate();
1699 let mut headers = HeaderMap::new();
1700 headers.insert(
1701 header::AUTHORIZATION,
1702 create_upload_auth_header(&keys)
1703 .parse()
1704 .expect("auth header value"),
1705 );
1706 headers.insert(
1707 header::CONTENT_TYPE,
1708 "application/octet-stream"
1709 .parse()
1710 .expect("content type header value"),
1711 );
1712
1713 let body = axum::body::Bytes::from((0u8..=255).map(|byte| byte ^ 0x55).collect::<Vec<_>>());
1714 let hash = sha256(&body);
1715 let response = upload_blob(State(state), headers, body)
1716 .await
1717 .into_response();
1718 assert_eq!(response.status(), StatusCode::ACCEPTED);
1719
1720 for _ in 0..50 {
1721 if store.blob_exists(&hash).expect("blob exists check") {
1722 return;
1723 }
1724 tokio::time::sleep(Duration::from_millis(10)).await;
1725 }
1726
1727 panic!("optimistic upload was not stored in the background");
1728 }
1729
1730 #[tokio::test]
1731 async fn optimistic_upload_existing_blob_skips_queue() {
1732 let temp_dir = TempDir::new().expect("temp dir");
1733 let store = Arc::new(
1734 HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1735 );
1736 let body = axum::body::Bytes::from(
1737 (0u16..=255)
1738 .map(|value| ((value * 73 + 19) % 256) as u8)
1739 .collect::<Vec<_>>(),
1740 );
1741 store.put_cached_blob(&body).expect("seed blob");
1742
1743 let mut state = test_app_state(Arc::clone(&store));
1744 state.optimistic_blossom_uploads = true;
1745 state.optimistic_upload_queue_bytes = 1;
1746 state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1747
1748 let keys = nostr::Keys::generate();
1749 let mut headers = HeaderMap::new();
1750 headers.insert(
1751 header::AUTHORIZATION,
1752 create_upload_auth_header(&keys)
1753 .parse()
1754 .expect("auth header value"),
1755 );
1756 headers.insert(
1757 header::CONTENT_TYPE,
1758 "application/octet-stream"
1759 .parse()
1760 .expect("content type header value"),
1761 );
1762
1763 let response = upload_blob(State(state), headers, body)
1764 .await
1765 .into_response();
1766 assert_eq!(response.status(), StatusCode::CONFLICT);
1767 }
1768
1769 #[tokio::test]
1770 async fn optimistic_upload_existing_blob_uses_queue_before_preflight_when_queue_has_room() {
1771 let temp_dir = TempDir::new().expect("temp dir");
1772 let store = Arc::new(
1773 HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1774 );
1775 let body = axum::body::Bytes::from((0u8..=255).rev().collect::<Vec<_>>());
1776 let hash_hex = hex::encode(sha256(&body));
1777 store.put_cached_blob(&body).expect("seed blob");
1778
1779 let mut state = test_app_state(store);
1780 state.optimistic_blossom_uploads = true;
1781
1782 let keys = nostr::Keys::generate();
1783 let mut headers = HeaderMap::new();
1784 headers.insert(
1785 header::AUTHORIZATION,
1786 create_upload_auth_header(&keys)
1787 .parse()
1788 .expect("auth header value"),
1789 );
1790 headers.insert(
1791 header::CONTENT_TYPE,
1792 "application/octet-stream"
1793 .parse()
1794 .expect("content type header value"),
1795 );
1796
1797 let response = upload_blob(State(state), headers, body)
1798 .await
1799 .into_response();
1800 assert_eq!(response.status(), StatusCode::ACCEPTED);
1801
1802 for _ in 0..50 {
1803 if !optimistic_upload_is_inflight(&hash_hex) {
1804 return;
1805 }
1806 tokio::time::sleep(Duration::from_millis(10)).await;
1807 }
1808
1809 clear_optimistic_upload_inflight(&hash_hex);
1810 panic!("optimistic upload in-flight marker was not cleared");
1811 }
1812
1813 #[tokio::test]
1814 async fn optimistic_upload_inflight_duplicate_skips_queue() {
1815 let temp_dir = TempDir::new().expect("temp dir");
1816 let store = Arc::new(
1817 HashtreeStore::with_options(temp_dir.path(), None, 128 * 1024 * 1024).expect("store"),
1818 );
1819 let body = axum::body::Bytes::from((0u8..=255).collect::<Vec<_>>());
1820 let hash_hex = hex::encode(sha256(&body));
1821 assert!(mark_optimistic_upload_inflight(&hash_hex));
1822
1823 let mut state = test_app_state(store);
1824 state.optimistic_blossom_uploads = true;
1825 state.optimistic_upload_queue_bytes = 1;
1826 state.optimistic_upload_queue = Arc::new(tokio::sync::Semaphore::new(1));
1827
1828 let keys = nostr::Keys::generate();
1829 let mut headers = HeaderMap::new();
1830 headers.insert(
1831 header::AUTHORIZATION,
1832 create_upload_auth_header(&keys)
1833 .parse()
1834 .expect("auth header value"),
1835 );
1836 headers.insert(
1837 header::CONTENT_TYPE,
1838 "application/octet-stream"
1839 .parse()
1840 .expect("content type header value"),
1841 );
1842
1843 let response = upload_blob(State(state), headers, body)
1844 .await
1845 .into_response();
1846 clear_optimistic_upload_inflight(&hash_hex);
1847 assert_eq!(response.status(), StatusCode::ACCEPTED);
1848 }
1849
1850 #[test]
1851 fn public_writes_accept_unlisted_authors_for_uploads() {
1852 let temp_dir = TempDir::new().expect("temp dir");
1853 let store =
1854 Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1855 let mut state = test_app_state(store);
1856 let pubkey = "ea4fe79e57f209309bffed2f92f0b95b59d3d1cb4e8892444398aeea7ee317ed";
1857
1858 state.public_writes = true;
1859 assert!(can_accept_upload_author(&state, pubkey));
1860 assert!(!is_allowed_write_author(&state, pubkey));
1861
1862 state.public_writes = false;
1863 assert!(!can_accept_upload_author(&state, pubkey));
1864 }
1865
1866 #[test]
1867 fn public_write_trust_allows_octet_stream_and_raw_media_payloads() {
1868 let encrypted_block: Vec<u8> = (0..=255).collect();
1869
1870 assert_eq!(
1871 validate_upload_payload(&encrypted_block, "application/octet-stream", false, true,),
1872 Ok(())
1873 );
1874
1875 assert_eq!(
1876 validate_upload_payload(b"audio bytes", "audio/mpeg", true, true,),
1877 Ok(())
1878 );
1879
1880 assert_eq!(
1881 validate_upload_payload(b"audio bytes", "audio/mpeg", false, true,),
1882 Err((
1883 StatusCode::FORBIDDEN,
1884 "Raw media uploads require write access".to_string(),
1885 ))
1886 );
1887 }
1888
1889 #[test]
1890 fn authenticated_chk_uploads_skip_entropy_heuristic() {
1891 let low_unique_block: Vec<u8> = (0..256).map(|i| (i % 139) as u8).collect();
1892
1893 assert_eq!(
1894 validate_upload_payload(&low_unique_block, "application/octet-stream", true, true,),
1895 Ok(())
1896 );
1897
1898 assert_eq!(
1899 validate_upload_payload(&low_unique_block, "application/octet-stream", false, true,),
1900 Err((
1901 StatusCode::UNSUPPORTED_MEDIA_TYPE,
1902 "Data not encrypted. Unique: 139 (min: 140)".to_string(),
1903 ))
1904 );
1905 }
1906
1907 #[test]
1908 fn unowned_public_uploads_use_cache_storage_semantics() {
1909 let temp_dir = TempDir::new().expect("temp dir");
1910 let store =
1911 Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 700).expect("store"));
1912 let state = test_app_state(Arc::clone(&store));
1913
1914 let owned = vec![1u8; 280];
1915 let owned_hash = sha256(&owned);
1916 store_blossom_blob(&state, &owned, &owned_hash, &[2u8; 32], true).expect("owned upload");
1917
1918 let public_upload = vec![3u8; 280];
1919 let public_hash = sha256(&public_upload);
1920 store_blossom_blob(&state, &public_upload, &public_hash, &[4u8; 32], false)
1921 .expect("public upload");
1922
1923 let replacement = vec![5u8; 280];
1924 let replacement_hash = sha256(&replacement);
1925 state
1926 .store
1927 .put_cached_blob(&replacement)
1928 .expect("replacement cached blob");
1929
1930 assert!(state.store.blob_exists(&owned_hash).expect("owned exists"));
1931 assert!(!state
1932 .store
1933 .blob_exists(&public_hash)
1934 .expect("public upload evicted"));
1935 assert!(state
1936 .store
1937 .blob_exists(&replacement_hash)
1938 .expect("replacement exists"));
1939 assert!(state
1940 .store
1941 .is_blob_owner(&owned_hash, &[2u8; 32])
1942 .expect("owned tracked"));
1943 assert!(!state
1944 .store
1945 .blob_has_owners(&public_hash)
1946 .expect("public upload unowned"));
1947 }
1948
1949 #[test]
1950 fn owned_blossom_uploads_are_rejected_when_storage_limit_is_full() {
1951 let temp_dir = TempDir::new().expect("temp dir");
1952 let store =
1953 Arc::new(HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store"));
1954 let state = test_app_state(Arc::clone(&store));
1955
1956 let first = vec![1u8; 300];
1957 let first_hash = sha256(&first);
1958 let owner = [2u8; 32];
1959 store_blossom_blob(&state, &first, &first_hash, &owner, true).expect("first upload");
1960
1961 let second = vec![3u8; 300];
1962 let second_hash = sha256(&second);
1963 let error = store_blossom_blob(&state, &second, &second_hash, &owner, true)
1964 .expect_err("second owned upload should exceed the storage limit");
1965
1966 assert!(
1967 error.to_string().contains("storage limit"),
1968 "unexpected error: {error}"
1969 );
1970 assert!(state
1971 .store
1972 .blob_exists(&first_hash)
1973 .expect("first blob remains"));
1974 assert!(!state
1975 .store
1976 .blob_exists(&second_hash)
1977 .expect("second blob rejected"));
1978 assert!(state
1979 .store
1980 .is_blob_owner(&first_hash, &owner)
1981 .expect("first owner tracked"));
1982 assert!(!state
1983 .store
1984 .is_blob_owner(&second_hash, &owner)
1985 .expect("second owner not tracked"));
1986 }
1987}