use super::*;
use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
use crate::socialgraph;
use crate::storage::HashtreeStore;
use crate::webrtc::{
ConnectionState, PeerDirection, PeerEntry, PeerPool, PeerRootEvent, PeerSignalPath,
PeerTransport, WebRTCState,
};
use axum::{
body::{to_bytes, Body},
extract::{Path as AxumPath, State as AxumState},
response::IntoResponse,
routing::get,
Router,
};
use futures::{SinkExt, StreamExt};
use hashtree_core::DirEntry;
use http_body_util::BodyExt;
use nostr::{
nips::nip19::ToBech32, Alphabet, ClientMessage as NostrClientMessage, EventBuilder,
JsonUtil as NostrJsonUtil, Keys, Kind, RelayMessage as NostrRelayMessage, SingleLetterTag, Tag,
TagKind, Timestamp,
};
use sha2::Digest;
use std::{
collections::{BTreeSet, HashSet},
net::SocketAddr,
time::Instant,
};
use tempfile::TempDir;
use tokio::time::timeout;
use tokio_tungstenite::{accept_async, tungstenite::Message as TungsteniteMessage};
#[derive(Clone)]
struct UpstreamBlobTestState {
store: Arc<HashtreeStore>,
requested_ids: Arc<std::sync::Mutex<Vec<String>>>,
}
async fn serve_blob_for_test(
AxumState(store): AxumState<Arc<HashtreeStore>>,
AxumPath(id): AxumPath<String>,
) -> Response<Body> {
let id = id.strip_suffix(".bin").unwrap_or(&id).to_string();
let Ok(hash) = from_hex(&id) else {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("invalid hash"))
.unwrap();
};
match store.get_blob(&hash) {
Ok(Some(data)) => Response::builder()
.status(StatusCode::OK)
.body(Body::from(data))
.unwrap(),
Ok(None) => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("missing"))
.unwrap(),
Err(err) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(err.to_string()))
.unwrap(),
}
}
async fn serve_blob_with_request_log_for_test(
AxumState(state): AxumState<UpstreamBlobTestState>,
AxumPath(id): AxumPath<String>,
) -> Response<Body> {
state.requested_ids.lock().unwrap().push(id.clone());
let id = id.strip_suffix(".bin").unwrap_or(&id).to_string();
let Ok(hash) = from_hex(&id) else {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("invalid hash"))
.unwrap();
};
match state.store.get_blob(&hash) {
Ok(Some(data)) => Response::builder()
.status(StatusCode::OK)
.body(Body::from(data))
.unwrap(),
Ok(None) => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("missing"))
.unwrap(),
Err(err) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(err.to_string()))
.unwrap(),
}
}
fn test_app_state(store: Arc<HashtreeStore>, upstream_blossom: Vec<String>) -> AppState {
AppState {
store,
auth: None,
daemon_started_at: 1_700_000_000,
peer_mode: crate::config::ServerMode::Normal,
hash_get_enabled: true,
http_webrtc_fetch: true,
webrtc_peers: None,
ws_relay: Arc::new(crate::server::auth::WsRelayState::new()),
max_upload_bytes: 5 * 1024 * 1024,
public_writes: true,
require_random_untrusted_ingest: false,
optimistic_blossom_uploads: false,
optimistic_upload_queue_bytes: 512 * 1024 * 1024,
optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(512 * 1024 * 1024)),
allowed_pubkeys: HashSet::new(),
upstream_blossom,
social_graph: None,
social_graph_store: None,
social_graph_root: None,
socialgraph_snapshot_public: false,
nostr_relay: None,
nostr_relay_urls: Vec::new(),
tree_root_cache: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
inflight_blob_reads: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
blob_cache: Arc::new(crate::blob_cache::BlobCache::for_tests()),
directory_listing_cache: Arc::new(std::sync::Mutex::new(crate::server::new_lookup_cache())),
resolved_path_cache: Arc::new(std::sync::Mutex::new(crate::server::new_lookup_cache())),
thumbnail_path_cache: Arc::new(std::sync::Mutex::new(crate::server::new_lookup_cache())),
cid_size_cache: Arc::new(std::sync::Mutex::new(crate::server::new_lookup_cache())),
}
}
async fn sample_webrtc_state() -> Arc<WebRTCState> {
let state = Arc::new(WebRTCState::new());
let peer_id = crate::webrtc::PeerId::new(
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
);
let peer_key = peer_id.to_string();
let signal_paths = BTreeSet::from([PeerSignalPath::Relay, PeerSignalPath::Multicast]);
state.runtime.peers.write().await.insert(
peer_key.clone(),
PeerEntry {
peer_id,
direction: PeerDirection::Outbound,
state: ConnectionState::Connected,
last_seen: Instant::now(),
peer: None,
pool: PeerPool::Follows,
transport: PeerTransport::WebRtc,
signal_paths,
bytes_sent: 64,
bytes_received: 128,
},
);
state.record_sent(&peer_key, 16).await;
state.record_received(&peer_key, 32).await;
state
}
async fn test_nostr_relay(dir: &TempDir, allowed_pubkey: String) -> Arc<NostrRelay> {
let graph_store =
socialgraph::open_social_graph_store_with_mapsize(dir.path(), Some(128 * 1024 * 1024))
.unwrap();
let backend: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
let mut allowed = HashSet::new();
allowed.insert(allowed_pubkey.clone());
let access = Arc::new(socialgraph::SocialGraphAccessControl::new(
Arc::clone(&backend),
0,
allowed,
));
Arc::new(
NostrRelay::new(
backend,
dir.path().join("relay"),
HashSet::from([allowed_pubkey]),
Some(access),
NostrRelayConfig {
spambox_db_max_bytes: 0,
..Default::default()
},
)
.unwrap(),
)
}
async fn spawn_mock_upstream_relay(events: Vec<nostr::Event>) -> String {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind relay");
let addr = listener.local_addr().expect("relay addr");
tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept relay");
let ws = accept_async(stream).await.expect("accept websocket");
let (mut write, mut read) = ws.split();
while let Some(Ok(message)) = read.next().await {
let TungsteniteMessage::Text(text) = message else {
continue;
};
let Ok(parsed) = NostrClientMessage::from_json(text.as_bytes()) else {
continue;
};
if let NostrClientMessage::Req {
subscription_id,
filters,
} = parsed
{
for event in events
.iter()
.filter(|event| filters.iter().any(|filter| filter.match_event(event)))
{
let _ = write
.send(TungsteniteMessage::Text(
NostrRelayMessage::event(subscription_id.clone(), event.clone())
.as_json()
.into(),
))
.await;
}
let _ = write
.send(TungsteniteMessage::Text(
NostrRelayMessage::eose(subscription_id).as_json().into(),
))
.await;
}
}
});
format!("ws://{}", addr)
}
#[tokio::test]
async fn test_query_upstream_blossom_no_servers() {
let servers: Vec<String> = vec![];
let result = query_upstream_blossom(&servers, "abc123").await;
assert!(result.is_none());
}
#[tokio::test]
async fn await_webrtc_peer_response_returns_success() {
let result = await_webrtc_peer_response(
async { Some((b"ok".to_vec(), "peer-a".to_string())) },
"abcd1234",
Duration::from_millis(10),
)
.await;
assert_eq!(result, Some((b"ok".to_vec(), "peer-a".to_string())));
}
#[tokio::test]
async fn webrtc_peers_reports_transport_and_signal_paths() {
let temp = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp.path()).unwrap());
let mut state = test_app_state(store, vec![]);
state.webrtc_peers = Some(sample_webrtc_state().await);
let response = webrtc_peers(AxumState(state)).await.into_response();
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["enabled"], true);
assert_eq!(json["transport_counts"]["webrtc"], 1);
assert_eq!(json["transport_counts"]["bluetooth"], 0);
assert_eq!(json["bytes_sent"], 16);
assert_eq!(json["bytes_received"], 32);
assert_eq!(json["peers"][0]["transport"], "webrtc");
assert_eq!(json["peers"][0]["bytes_sent"], 80);
assert_eq!(json["peers"][0]["bytes_received"], 160);
assert_eq!(
json["peers"][0]["signal_paths"],
json!(["relay", "multicast"])
);
}
#[tokio::test]
async fn daemon_status_exposes_mesh_alias_with_transport_metadata() {
let temp = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp.path()).unwrap());
let mut state = test_app_state(store, vec![]);
state.webrtc_peers = Some(sample_webrtc_state().await);
state.nostr_relay_urls = vec![
"wss://relay.damus.io".to_string(),
"wss://nos.lol".to_string(),
];
state.ws_relay.note_upstream_relay_send(512);
state.ws_relay.note_upstream_relay_receive(1024);
crate::server::status_metrics::record_http_status_for_test(StatusCode::SWITCHING_PROTOCOLS);
crate::server::status_metrics::record_http_status_for_test(StatusCode::OK);
crate::server::status_metrics::record_http_status_for_test(StatusCode::NOT_FOUND);
crate::server::status_metrics::record_http_status_for_test(StatusCode::SERVICE_UNAVAILABLE);
let response = daemon_status(
AxumState(state),
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 21417))),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["mesh"]["enabled"], true);
assert_eq!(json["mesh"]["transport_counts"]["webrtc"], 1);
assert_eq!(json["mesh"]["bytes_sent"], 16);
assert_eq!(json["mesh"]["bytes_received"], 32);
assert_eq!(json["mesh"]["peers"][0]["transport"], "webrtc");
assert_eq!(json["mesh"]["peers"][0]["capabilities"]["hash_get"], true);
assert_eq!(json["webrtc"], json["mesh"]);
assert_eq!(json["relay"]["enabled"], true);
assert_eq!(json["relay"]["bytes_sent"], 512);
assert_eq!(json["relay"]["bytes_received"], 1024);
assert_eq!(json["upstream"]["nostr_relays"], 2);
assert_eq!(json["mode"], "normal");
assert_eq!(json["capabilities"]["hash_get"], true);
assert_eq!(json["capabilities"]["http_webrtc_fetch"], true);
assert_eq!(json["daemon_started_at"], 1_700_000_000u64);
assert!(json["uptime_seconds"].as_u64().unwrap() > 0);
assert!(json["queues"]["blob_reads"]["limit"].as_u64().unwrap() > 0);
assert!(json["queues"]["blob_writes"]["limit"].as_u64().unwrap() > 0);
assert_eq!(
json["queues"]["optimistic_uploads"]["max_bytes"],
512 * 1024 * 1024u64
);
assert!(
json["http"]["status_classes"]["recent"]["1xx"]
.as_u64()
.unwrap()
>= 1
);
assert!(
json["http"]["status_classes"]["recent"]["2xx"]
.as_u64()
.unwrap()
>= 1
);
assert!(
json["http"]["status_classes"]["recent"]["4xx"]
.as_u64()
.unwrap()
>= 1
);
assert!(
json["http"]["status_classes"]["recent"]["5xx"]
.as_u64()
.unwrap()
>= 1
);
}
#[tokio::test]
async fn daemon_status_reports_assist_mode_and_disabled_hash_get() {
let temp = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp.path()).unwrap());
let mut state = test_app_state(store, vec![]);
state.peer_mode = crate::config::ServerMode::Assist;
state.hash_get_enabled = false;
let response = daemon_status(
AxumState(state),
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 21417))),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["mode"], "assist");
assert_eq!(json["capabilities"]["hash_get"], false);
}
#[tokio::test]
async fn await_webrtc_peer_response_times_out() {
let result = await_webrtc_peer_response(
std::future::pending::<Option<(Vec<u8>, String)>>(),
"abcd1234",
Duration::from_millis(10),
)
.await;
assert!(result.is_none());
}
#[tokio::test]
async fn first_available_fetch_prefers_fast_success() {
let result = first_available_fetch(vec![
async {
tokio::time::sleep(Duration::from_millis(20)).await;
Some("slow")
}
.boxed(),
async {
tokio::time::sleep(Duration::from_millis(5)).await;
Some("fast")
}
.boxed(),
])
.await;
assert_eq!(result, Some("fast"));
}
#[tokio::test]
async fn first_available_fetch_skips_empty_results() {
let result = first_available_fetch(vec![
async { None::<&'static str> }.boxed(),
async {
tokio::time::sleep(Duration::from_millis(5)).await;
Some("available")
}
.boxed(),
])
.await;
assert_eq!(result, Some("available"));
}
#[tokio::test]
async fn await_fetch_task_returns_result() {
let result = await_fetch_task("test", "abc123", async { Some(7usize) }).await;
assert_eq!(result, Some(7));
}
#[tokio::test]
async fn await_fetch_task_recovers_from_panic() {
let result: Option<usize> = await_fetch_task("test", "abc123", async move {
panic!("boom");
})
.await;
assert!(result.is_none());
}
#[tokio::test]
async fn test_query_upstream_blossom_invalid_server() {
let servers = vec!["http://localhost:99999".to_string()];
let result = query_upstream_blossom(&servers, "abc123").await;
assert!(result.is_none());
}
#[tokio::test]
async fn test_query_upstream_blossom_hash_format() {
let servers = vec!["http://localhost:99999".to_string()];
let hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
let result = query_upstream_blossom(&servers, hash).await;
assert!(result.is_none());
}
#[tokio::test]
async fn test_query_upstream_blossom_uses_bin_suffix() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let requested_ids = Arc::new(std::sync::Mutex::new(Vec::new()));
let data = b"hello blossom";
store.put_blob(data).unwrap();
let hash_hex = hex::encode(sha2::Sha256::digest(data));
let upstream_router = Router::new()
.route("/:id", get(serve_blob_with_request_log_for_test))
.with_state(UpstreamBlobTestState {
store: store.clone(),
requested_ids: requested_ids.clone(),
});
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_addr = listener.local_addr().unwrap();
let upstream_server =
tokio::spawn(async move { axum::serve(listener, upstream_router).await.unwrap() });
let result = query_upstream_blossom(&[format!("http://{}", upstream_addr)], &hash_hex)
.await
.expect("fetch blob");
assert_eq!(result.0, data);
assert_eq!(result.1, format!("http://{}", upstream_addr));
assert_eq!(
requested_ids.lock().unwrap().as_slice(),
&[format!("{}.bin", hash_hex)]
);
upstream_server.abort();
}
#[tokio::test]
async fn query_upstream_blossom_uses_first_server_that_responds() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let requested_ids = Arc::new(std::sync::Mutex::new(Vec::new()));
let data = b"parallel blossom";
store.put_blob(data).unwrap();
let hash_hex = hex::encode(sha2::Sha256::digest(data));
let slow_router = Router::new().route(
"/:id",
get(|| async {
tokio::time::sleep(Duration::from_secs(11)).await;
StatusCode::GATEWAY_TIMEOUT
}),
);
let slow_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let slow_addr = slow_listener.local_addr().unwrap();
let slow_server =
tokio::spawn(async move { axum::serve(slow_listener, slow_router).await.unwrap() });
let fast_router = Router::new()
.route("/:id", get(serve_blob_with_request_log_for_test))
.with_state(UpstreamBlobTestState {
store: store.clone(),
requested_ids: requested_ids.clone(),
});
let fast_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let fast_addr = fast_listener.local_addr().unwrap();
let fast_server =
tokio::spawn(async move { axum::serve(fast_listener, fast_router).await.unwrap() });
let result = timeout(
Duration::from_secs(3),
query_upstream_blossom(
&[
format!("http://{}", slow_addr),
format!("http://{}", fast_addr),
],
&hash_hex,
),
)
.await
.expect("parallel upstream query completed")
.expect("fetch blob");
assert_eq!(result.0, data);
assert_eq!(result.1, format!("http://{}", fast_addr));
assert_eq!(
requested_ids.lock().unwrap().as_slice(),
&[format!("{}.bin", hash_hex)]
);
slow_server.abort();
fast_server.abort();
}
#[tokio::test]
async fn ensure_blob_available_coalesces_concurrent_upstream_fetches() {
let source_dir = TempDir::new().unwrap();
let source_store = Arc::new(HashtreeStore::new(source_dir.path().join("source-db")).unwrap());
let requested_ids = Arc::new(std::sync::Mutex::new(Vec::new()));
let data = b"shared-upstream-blob";
source_store.put_blob(data).unwrap();
let hash = from_hex(&hex::encode(sha2::Sha256::digest(data))).unwrap();
let upstream_router = Router::new()
.route("/:id", get(serve_blob_with_request_log_for_test))
.with_state(UpstreamBlobTestState {
store: source_store.clone(),
requested_ids: requested_ids.clone(),
});
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_addr = listener.local_addr().unwrap();
let upstream_server =
tokio::spawn(async move { axum::serve(listener, upstream_router).await.unwrap() });
let local_dir = TempDir::new().unwrap();
let local_store = Arc::new(HashtreeStore::new(local_dir.path().join("local-db")).unwrap());
let state = test_app_state(
local_store.clone(),
vec![format!("http://{}", upstream_addr)],
);
let (first, second, third) = tokio::join!(
ensure_blob_available(&state, &hash),
ensure_blob_available(&state, &hash),
ensure_blob_available(&state, &hash),
);
assert_eq!(first.unwrap(), true);
assert_eq!(second.unwrap(), true);
assert_eq!(third.unwrap(), true);
assert_eq!(
requested_ids.lock().unwrap().as_slice(),
&[format!("{}.bin", hex::encode(hash))]
);
assert!(local_store.get_blob(&hash).unwrap().is_some());
upstream_server.abort();
}
#[tokio::test]
async fn resolve_thumbnail_path_prefers_root_thumbnail() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()));
let state = test_app_state(store.clone(), Vec::new());
let (thumb_cid, _size) = tree.put(b"thumb").await.unwrap();
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("thumbnail.jpg", &thumb_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let resolved = resolve_thumbnail_path(&state, &tree, &root_cid, "thumbnail")
.await
.unwrap();
assert_eq!(resolved.as_deref(), Some("thumbnail.jpg"));
}
#[tokio::test]
async fn resolve_thumbnail_path_accepts_generic_image_names() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()));
let state = test_app_state(store.clone(), Vec::new());
let (thumb_cid, _size) = tree.put(b"thumb").await.unwrap();
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("cover.jpeg", &thumb_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let resolved = resolve_thumbnail_path(&state, &tree, &root_cid, "thumbnail")
.await
.unwrap();
assert_eq!(resolved.as_deref(), Some("cover.jpeg"));
}
#[tokio::test]
async fn resolve_thumbnail_path_falls_back_to_subdir() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()));
let state = test_app_state(store.clone(), Vec::new());
let (thumb_cid, _size) = tree.put(b"thumb").await.unwrap();
let subdir_cid = tree
.put_directory(vec![
DirEntry::from_cid("thumbnail.png", &thumb_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let (meta_cid, _size) = tree.put(b"{}").await.unwrap();
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("clip", &subdir_cid).with_link_type(LinkType::Dir),
DirEntry::from_cid("meta.json", &meta_cid).with_link_type(LinkType::File),
])
.await
.unwrap();
let resolved = resolve_thumbnail_path(&state, &tree, &root_cid, "thumbnail")
.await
.unwrap();
assert_eq!(resolved.as_deref(), Some("clip/thumbnail.png"));
}
#[tokio::test]
async fn resolve_thumbnail_path_fetches_missing_subdir_from_upstream() {
let source_dir = TempDir::new().unwrap();
let source_store = Arc::new(HashtreeStore::new(source_dir.path().join("source-db")).unwrap());
let source_tree = HashTree::new(HashTreeConfig::new(source_store.store_arc()));
let (thumb_cid, _size) = source_tree.put(b"thumb").await.unwrap();
let subdir_cid = source_tree
.put_directory(vec![
DirEntry::from_cid("thumbnail.jpg", &thumb_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let root_cid = source_tree
.put_directory(vec![
DirEntry::from_cid("clip", &subdir_cid).with_link_type(LinkType::Dir)
])
.await
.unwrap();
let upstream_router = Router::new()
.route("/:id", get(serve_blob_for_test))
.with_state(source_store.clone());
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_addr = listener.local_addr().unwrap();
let upstream_server =
tokio::spawn(async move { axum::serve(listener, upstream_router).await.unwrap() });
let local_dir = TempDir::new().unwrap();
let local_store = Arc::new(HashtreeStore::new(local_dir.path().join("local-db")).unwrap());
let state = test_app_state(
local_store.clone(),
vec![format!("http://{}", upstream_addr)],
);
let local_tree = HashTree::new(HashTreeConfig::new(local_store.store_arc()));
let resolved = resolve_thumbnail_path(&state, &local_tree, &root_cid, "thumbnail")
.await
.unwrap();
assert_eq!(resolved.as_deref(), Some("clip/thumbnail.jpg"));
upstream_server.abort();
}
#[tokio::test]
async fn resolve_directory_target_prefers_root_index() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()));
let state = test_app_state(store.clone(), Vec::new());
let (index_cid, _size) = tree.put(b"<html>ok</html>").await.unwrap();
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("index.html", &index_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let target = resolve_directory_target(&state, &tree, &root_cid, None)
.await
.expect("resolve")
.expect("target");
match target {
DirectoryTarget::File { cid, path } => {
assert_eq!(cid, index_cid);
assert_eq!(path, "index.html");
}
DirectoryTarget::DirectoryListing { .. } => panic!("expected file target"),
}
}
#[tokio::test]
async fn resolve_directory_target_prefers_subdir_index() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()));
let state = test_app_state(store.clone(), Vec::new());
let (index_cid, _size) = tree.put(b"<html>nested</html>").await.unwrap();
let subdir_cid = tree
.put_directory(vec![
DirEntry::from_cid("index.html", &index_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("video", &subdir_cid).with_link_type(LinkType::Dir)
])
.await
.unwrap();
let target = resolve_directory_target(&state, &tree, &root_cid, Some("video".to_string()))
.await
.expect("resolve")
.expect("target");
match target {
DirectoryTarget::File { cid, path } => {
assert_eq!(cid, index_cid);
assert_eq!(path, "video/index.html");
}
DirectoryTarget::DirectoryListing { .. } => panic!("expected file target"),
}
}
#[tokio::test]
async fn resolve_directory_target_lists_directory_without_index() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()));
let state = test_app_state(store.clone(), Vec::new());
let (file_cid, _size) = tree.put(b"asset").await.unwrap();
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("asset.txt", &file_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let target = resolve_directory_target(&state, &tree, &root_cid, None)
.await
.expect("resolve")
.expect("target");
match target {
DirectoryTarget::DirectoryListing { cid } => assert_eq!(cid, root_cid),
DirectoryTarget::File { .. } => panic!("expected directory listing"),
}
}
#[test]
fn content_type_for_path_uses_extension() {
assert_eq!(content_type_for_path(Some("dir/video.mp4")), "video/mp4");
assert_eq!(content_type_for_path(Some("image.jpeg")), "image/jpeg");
assert_eq!(content_type_for_path(None), "application/octet-stream");
}
#[tokio::test]
async fn htree_nhash_path_fetches_nested_assets_from_upstream_tree() {
let source_dir = TempDir::new().unwrap();
let source_store = Arc::new(HashtreeStore::new(source_dir.path().join("source-db")).unwrap());
let site_dir = source_dir.path().join("site");
let assets_dir = site_dir.join("assets");
std::fs::create_dir_all(&assets_dir).unwrap();
let index_html = r#"
<!doctype html>
<html>
<head><script type="module" src="./assets/main.js"></script></head>
<body>ok</body>
</html>
"#;
let main_js = "export const big = '".to_string() + &"x".repeat(2_500_000) + "';\n";
std::fs::write(site_dir.join("index.html"), index_html).unwrap();
std::fs::write(assets_dir.join("main.js"), &main_js).unwrap();
let root_hash = source_store
.upload_dir_with_options(&site_dir, true)
.expect("upload site");
let root_hash_bytes = from_hex(&root_hash).expect("hex root hash");
let nhash = hashtree_core::nhash_encode(&root_hash_bytes).expect("encode nhash");
let route_nhash = nhash.strip_prefix("nhash1").expect("nhash prefix");
let upstream_router = Router::new()
.route("/:id", get(serve_blob_for_test))
.with_state(source_store.clone());
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_addr = listener.local_addr().unwrap();
let _server = tokio::spawn(async move {
axum::serve(listener, upstream_router).await.unwrap();
});
let target_dir = TempDir::new().unwrap();
let target_store = Arc::new(HashtreeStore::new(target_dir.path().join("target-db")).unwrap());
let state = test_app_state(target_store, vec![format!("http://{}", upstream_addr)]);
let response = htree_nhash_path(
State(state),
Path((route_nhash.to_string(), "assets/main.js".to_string())),
Query(HashMap::new()),
axum::http::HeaderMap::new(),
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 43123))),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response
.headers()
.get(CROSS_ORIGIN_RESOURCE_POLICY_HEADER)
.and_then(|value| value.to_str().ok()),
Some(CORP_CROSS_ORIGIN)
);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.as_ref(), main_js.as_bytes());
}
#[tokio::test]
async fn htree_nhash_path_resolves_thumbnail_alias() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
let thumb_bytes = vec![0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46];
let (thumb_cid, _) = tree.put(&thumb_bytes).await.unwrap();
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("thumbnail.jpg", &thumb_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let nhash = hashtree_core::nhash_encode(&root_cid.hash).expect("encode nhash");
let route_nhash = nhash.strip_prefix("nhash1").expect("nhash prefix");
let response = htree_nhash_path(
State(test_app_state(store, Vec::new())),
Path((route_nhash.to_string(), "thumbnail".to_string())),
Query(HashMap::new()),
axum::http::HeaderMap::new(),
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 43123))),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.as_ref(), thumb_bytes.as_slice());
}
#[test]
fn parse_byte_range_supports_suffix_requests() {
match parse_byte_range("bytes=-500", 1000) {
Some(ParsedByteRange::Satisfiable {
start,
end_inclusive,
}) => {
assert_eq!(start, 500);
assert_eq!(end_inclusive, 999);
}
_ => panic!("expected satisfiable suffix range"),
}
}
#[test]
fn parse_byte_range_clamps_large_suffix_requests() {
match parse_byte_range("bytes=-5000", 1000) {
Some(ParsedByteRange::Satisfiable {
start,
end_inclusive,
}) => {
assert_eq!(start, 0);
assert_eq!(end_inclusive, 999);
}
_ => panic!("expected satisfiable suffix range"),
}
}
#[tokio::test]
async fn serve_cid_with_range_honors_suffix_ranges() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let state = test_app_state(store.clone(), Vec::new());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
let data = b"0123456789";
let (cid, _) = tree.put(data).await.unwrap();
let mut headers = axum::http::HeaderMap::new();
headers.insert(header::RANGE, header::HeaderValue::from_static("bytes=-4"));
let response =
serve_cid_with_range(&state, &cid, headers, false, false, Some("clip.mp4")).await;
assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT);
assert_eq!(
response
.headers()
.get(header::CONTENT_RANGE)
.and_then(|value| value.to_str().ok()),
Some("bytes 6-9/10")
);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.as_ref(), b"6789");
}
#[tokio::test]
async fn serve_cid_with_range_streams_large_explicit_ranges() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let state = test_app_state(store.clone(), Vec::new());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
let data: Vec<u8> = (0..(5 * 1024 * 1024 + 17))
.map(|i| (i % 251) as u8)
.collect();
let (cid, _) = tree.put(&data).await.unwrap();
let mut headers = axum::http::HeaderMap::new();
headers.insert(
header::RANGE,
header::HeaderValue::from_str(&format!("bytes=0-{}", data.len() - 1)).unwrap(),
);
let response = serve_cid_with_range(&state, &cid, headers, true, false, Some("clip.mp4")).await;
assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT);
let mut body = response.into_body();
let first_frame = timeout(Duration::from_secs(1), body.frame())
.await
.expect("first body frame should arrive quickly")
.expect("body should yield a frame")
.expect("body frame should be ok");
let first_chunk = first_frame
.into_data()
.expect("first frame should contain bytes");
assert_eq!(first_chunk.len(), CID_RANGE_STREAM_CHUNK_SIZE as usize);
}
fn copy_blob_between_stores(
source_store: &Arc<HashtreeStore>,
target_store: &Arc<HashtreeStore>,
hash: &[u8; 32],
) {
let data = source_store
.get_blob(hash)
.unwrap()
.unwrap_or_else(|| panic!("missing blob {}", to_hex(hash)));
target_store.put_blob(&data).unwrap();
}
#[tokio::test]
async fn htree_npub_path_range_fetches_missing_nested_file_from_upstream() {
let source_dir = TempDir::new().unwrap();
let source_store = Arc::new(HashtreeStore::new(source_dir.path().join("source-db")).unwrap());
let source_tree = HashTree::new(HashTreeConfig::new(source_store.store_arc()));
let video_data: Vec<u8> = (0..(3 * 1024 * 1024 + 137))
.map(|i| (i % 251) as u8)
.collect();
let (video_cid, _) = source_tree.put(&video_data).await.unwrap();
let child_dir_cid = source_tree
.put_directory(vec![
DirEntry::from_cid("video.mp4", &video_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let root_cid = source_tree
.put_directory(vec![DirEntry::from_cid(
"video_1767136282070",
&child_dir_cid,
)
.with_link_type(LinkType::Dir)])
.await
.unwrap();
let upstream_router = Router::new()
.route("/:id", get(serve_blob_for_test))
.with_state(source_store.clone());
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_addr = listener.local_addr().unwrap();
let upstream_server =
tokio::spawn(async move { axum::serve(listener, upstream_router).await.unwrap() });
let local_dir = TempDir::new().unwrap();
let local_store = Arc::new(HashtreeStore::new(local_dir.path().join("local-db")).unwrap());
copy_blob_between_stores(&source_store, &local_store, &root_cid.hash);
copy_blob_between_stores(&source_store, &local_store, &child_dir_cid.hash);
let state = test_app_state(
local_store.clone(),
vec![format!("http://{}", upstream_addr)],
);
put_cached_tree_root(
&state,
tree_root_cache_key("npub1example", "videos/Music", None),
root_cid.clone(),
"cache",
None,
);
let mut headers = axum::http::HeaderMap::new();
headers.insert(
header::RANGE,
header::HeaderValue::from_static("bytes=0-1023"),
);
let response = htree_npub_impl(
State(state),
"npub1example".to_string(),
"videos/Music".to_string(),
Some("video_1767136282070/video.mp4".to_string()),
Query(HashMap::new()),
headers,
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 43123))),
)
.await;
assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.as_ref(), &video_data[..1024]);
upstream_server.abort();
}
#[tokio::test]
async fn htree_npub_path_range_fetches_missing_nested_file_chunks_from_upstream() {
let source_dir = TempDir::new().unwrap();
let source_store = Arc::new(HashtreeStore::new(source_dir.path().join("source-db")).unwrap());
let source_tree = HashTree::new(HashTreeConfig::new(source_store.store_arc()));
let video_data: Vec<u8> = (0..(5 * 1024 * 1024 + 17))
.map(|i| 255 - (i % 251) as u8)
.collect();
let (video_cid, _) = source_tree.put(&video_data).await.unwrap();
let child_dir_cid = source_tree
.put_directory(vec![
DirEntry::from_cid("video.mp4", &video_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let root_cid = source_tree
.put_directory(vec![DirEntry::from_cid(
"video_1767136255334",
&child_dir_cid,
)
.with_link_type(LinkType::Dir)])
.await
.unwrap();
let upstream_router = Router::new()
.route("/:id", get(serve_blob_for_test))
.with_state(source_store.clone());
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_addr = listener.local_addr().unwrap();
let upstream_server =
tokio::spawn(async move { axum::serve(listener, upstream_router).await.unwrap() });
let local_dir = TempDir::new().unwrap();
let local_store = Arc::new(HashtreeStore::new(local_dir.path().join("local-db")).unwrap());
copy_blob_between_stores(&source_store, &local_store, &root_cid.hash);
copy_blob_between_stores(&source_store, &local_store, &child_dir_cid.hash);
copy_blob_between_stores(&source_store, &local_store, &video_cid.hash);
let state = test_app_state(
local_store.clone(),
vec![format!("http://{}", upstream_addr)],
);
put_cached_tree_root(
&state,
tree_root_cache_key("npub1example", "videos/Music", None),
root_cid.clone(),
"cache",
None,
);
let mut headers = axum::http::HeaderMap::new();
headers.insert(
header::RANGE,
header::HeaderValue::from_static("bytes=0-1023"),
);
let response = htree_npub_impl(
State(state),
"npub1example".to_string(),
"videos/Music".to_string(),
Some("video_1767136255334/video.mp4".to_string()),
Query(HashMap::new()),
headers,
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 43123))),
)
.await;
assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.as_ref(), &video_data[..1024]);
upstream_server.abort();
}
#[tokio::test]
async fn htree_npub_path_uses_original_uri_for_encoded_tree_names() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
let asset_bytes = b"nostr-vpn-macos-zip".to_vec();
let (asset_cid, _) = tree.put(&asset_bytes).await.unwrap();
let assets_dir = tree
.put_directory(vec![DirEntry::from_cid(
"nostr-vpn-v0.3.0-macos-arm64.zip",
&asset_cid,
)
.with_link_type(LinkType::File)])
.await
.unwrap();
let version_dir = tree
.put_directory(vec![
DirEntry::from_cid("assets", &assets_dir).with_link_type(LinkType::Dir)
])
.await
.unwrap();
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("v0.3.0", &version_dir).with_link_type(LinkType::Dir)
])
.await
.unwrap();
let state = test_app_state(store, Vec::new());
put_cached_tree_root(
&state,
tree_root_cache_key("npub1example", "releases/nostr-vpn", None),
root_cid.clone(),
"cache",
None,
);
let response = htree_npub_path(
State(state),
OriginalUri(
"/htree/npub1example/releases%2Fnostr-vpn/v0.3.0/assets/nostr-vpn-v0.3.0-macos-arm64.zip"
.parse()
.unwrap(),
),
Path((
"example".to_string(),
"releases%2Fnostr-vpn".to_string(),
"v0.3.0/assets/nostr-vpn-v0.3.0-macos-arm64.zip".to_string(),
)),
Query(HashMap::new()),
axum::http::HeaderMap::new(),
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 43123))),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.as_ref(), asset_bytes.as_slice());
}
#[tokio::test]
async fn serve_content_internal_honors_suffix_ranges() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let state = test_app_state(store.clone(), Vec::new());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
let data = b"abcdefghij";
let (cid, _) = tree.put(data).await.unwrap();
let mut headers = axum::http::HeaderMap::new();
headers.insert(header::RANGE, header::HeaderValue::from_static("bytes=-3"));
let response = serve_content_internal(&state, &cid.hash, headers, true, false).await;
assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT);
assert_eq!(
response
.headers()
.get(header::CONTENT_RANGE)
.and_then(|value| value.to_str().ok()),
Some("bytes 7-9/10")
);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.as_ref(), b"hij");
}
#[tokio::test]
async fn serve_content_or_blob_honors_raw_blob_ranges() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let state = test_app_state(store.clone(), Vec::new());
let data = b"raw-blob-range";
let hash_hex = store.put_blob(data).unwrap();
let mut headers = axum::http::HeaderMap::new();
headers.insert(header::RANGE, header::HeaderValue::from_static("bytes=4-7"));
let response = serve_content_or_blob(
State(state),
Path(format!("{hash_hex}.bin")),
Query(HashMap::new()),
headers,
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 43123))),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT);
assert_eq!(
response
.headers()
.get(header::CONTENT_RANGE)
.and_then(|value| value.to_str().ok()),
Some("bytes 4-7/14")
);
assert_eq!(
response
.headers()
.get(header::ACCEPT_RANGES)
.and_then(|value| value.to_str().ok()),
Some("bytes")
);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.as_ref(), b"blob");
}
#[tokio::test]
async fn hot_blob_cache_serves_repeated_raw_blob_reads() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let state = test_app_state(store.clone(), Vec::new());
let data = b"hot-cache-blob";
let hash_hex = store.put_blob(data).unwrap();
let hash = from_hex(&hash_hex).unwrap();
assert_eq!(
get_blob_size_without_blocking_runtime(&state, hash)
.await
.unwrap(),
Some(data.len() as u64)
);
assert_eq!(
get_blob_without_blocking_runtime(&state, hash)
.await
.unwrap()
.as_deref(),
Some(data.as_slice())
);
assert!(store.router().delete_sync(&hash).unwrap());
assert_eq!(
get_blob_size_without_blocking_runtime(&state, hash)
.await
.unwrap(),
Some(data.len() as u64)
);
assert_eq!(
get_blob_without_blocking_runtime(&state, hash)
.await
.unwrap()
.as_deref(),
Some(data.as_slice())
);
}
#[tokio::test]
async fn raw_blob_miss_allows_short_edge_negative_cache() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("store")).unwrap());
let state = test_app_state(store, Vec::new());
let missing_hash = "0000000000000000000000000000000000000000000000000000000000000000";
let response = serve_content_or_blob(
State(state),
Path(format!("{missing_hash}.bin")),
Query(HashMap::new()),
axum::http::HeaderMap::new(),
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 43123))),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
assert_eq!(
response
.headers()
.get(header::CACHE_CONTROL)
.and_then(|value| value.to_str().ok()),
Some(IMMUTABLE_NOT_FOUND_CACHE_CONTROL)
);
}
#[tokio::test]
async fn generic_not_found_stays_uncacheable() {
let response = not_found_response("missing");
assert_eq!(response.status(), StatusCode::NOT_FOUND);
assert_eq!(
response
.headers()
.get(header::CACHE_CONTROL)
.and_then(|value| value.to_str().ok()),
Some(NOT_FOUND_CACHE_CONTROL)
);
}
#[tokio::test]
async fn cache_tree_root_seeds_mutable_root_cache() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let state = test_app_state(store, Vec::new());
let response = cache_tree_root(
State(state.clone()),
Json(CacheTreeRootRequest {
npub: "npub1example".to_string(),
tree_name: "video".to_string(),
hash: "988db3f24dc222715f1c1e1fa5876690d3147122243d72d85fd44283867cd61a".to_string(),
key: None,
visibility: Some("public".to_string()),
}),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
let cached = get_cached_tree_root(&state, "npub1example/video").expect("cached cid");
assert_eq!(
to_hex(&cached.cid.hash),
"988db3f24dc222715f1c1e1fa5876690d3147122243d72d85fd44283867cd61a"
);
assert!(cached.cid.key.is_none());
}
#[tokio::test]
async fn resolve_root_offline_accepts_npub_owner_for_local_relay_events() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let keys = Keys::generate();
let relay = test_nostr_relay(&temp_dir, keys.public_key().to_hex()).await;
let state = AppState {
nostr_relay: Some(relay.clone()),
..test_app_state(store, Vec::new())
};
let hash_hex = "ab".repeat(32);
let tree_name = "offline-tree";
let event = EventBuilder::new(
Kind::Custom(30078),
"",
[
Tag::identifier(tree_name.to_string()),
Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
vec!["hashtree".to_string()],
),
Tag::custom(TagKind::Custom("hash".into()), vec![hash_hex.clone()]),
],
)
.to_event(&keys)
.unwrap();
relay.ingest_trusted_event(event.clone()).await.unwrap();
let resolved = resolve_root_offline(
&state,
&keys.public_key().to_bech32().unwrap(),
tree_name,
None,
)
.await
.expect("offline root should resolve from local relay with npub");
assert_eq!(resolved.source, "local-relay");
assert_eq!(to_hex(&resolved.cid.hash), hash_hex);
assert_eq!(
resolved
.root_event
.as_ref()
.map(|root| root.event_id.as_str()),
Some(event.id.to_hex().as_str())
);
}
#[tokio::test]
async fn nostr_profile_queries_upstream_relays_after_local_miss() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let keys = Keys::generate();
let relay = test_nostr_relay(&temp_dir, keys.public_key().to_hex()).await;
let event = EventBuilder::new(
Kind::Metadata,
serde_json::json!({
"display_name": "Sirius Business",
"picture": "https://example.com/avatar.png",
})
.to_string(),
[],
)
.custom_created_at(Timestamp::from_secs(42))
.to_event(&keys)
.unwrap();
let upstream_url = spawn_mock_upstream_relay(vec![event.clone()]).await;
let mut state = test_app_state(store, Vec::new());
state.nostr_relay = Some(relay.clone());
state.nostr_relay_urls = vec![upstream_url];
let response = nostr_profile(AxumState(state), AxumPath(keys.public_key().to_hex()))
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let payload: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(
payload["profile"]["display_name"].as_str(),
Some("Sirius Business")
);
assert_eq!(payload["created_at"].as_u64(), Some(42));
let cached = relay
.query_events(
&nostr::Filter::new()
.author(keys.public_key())
.kind(Kind::Metadata)
.limit(10),
10,
)
.await;
assert_eq!(cached.len(), 1);
assert_eq!(cached[0].id, event.id);
}
#[test]
fn resolver_config_prefers_state_relay_urls() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let mut state = test_app_state(store, Vec::new());
state.nostr_relay_urls = vec![
"wss://temp.iris.to".to_string(),
"wss://upload.iris.to/nostr".to_string(),
];
let config = resolver_config(&state);
assert_eq!(config.relays, state.nostr_relay_urls);
assert_eq!(config.resolve_timeout, HTTP_RESOLVER_TIMEOUT);
}
#[tokio::test]
async fn resolve_to_hash_refresh_skips_cached_root() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let state = test_app_state(store, Vec::new());
let hash_hex = "11".repeat(32);
let cid = Cid::parse(&hash_hex).expect("valid cid");
put_cached_tree_root(
&state,
tree_root_cache_key("npub1example", "video", None),
cid,
"cache",
None,
);
let cached = resolve_to_hash(
State(state.clone()),
OriginalUri("/api/resolve/npub1example/video".parse().unwrap()),
Path(("npub1example".to_string(), "video".to_string())),
Query(HashMap::new()),
)
.await
.into_response();
let cached_body = to_bytes(cached.into_body(), usize::MAX).await.unwrap();
let cached_json: serde_json::Value = serde_json::from_slice(&cached_body).unwrap();
assert_eq!(cached_json["hash"], hash_hex);
assert_eq!(cached_json["source"], "cache");
let refresh = resolve_to_hash(
State(state),
OriginalUri("/api/resolve/npub1example/video".parse().unwrap()),
Path(("npub1example".to_string(), "video".to_string())),
Query(HashMap::from([("refresh".to_string(), "1".to_string())])),
)
.await
.into_response();
let refresh_body = to_bytes(refresh.into_body(), usize::MAX).await.unwrap();
let refresh_json: serde_json::Value = serde_json::from_slice(&refresh_body).unwrap();
assert!(refresh_json.get("error").is_some());
assert_eq!(refresh_json["key"], "npub1example/video");
}
#[tokio::test]
async fn resolve_to_hash_refresh_uses_local_relay_before_relays() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let keys = Keys::generate();
let relay = test_nostr_relay(&temp_dir, keys.public_key().to_hex()).await;
let tree_name = "video";
let cached_hash = "11".repeat(32);
let refreshed_hash = "22".repeat(32);
let event = EventBuilder::new(
Kind::Custom(30078),
"",
[
Tag::identifier(tree_name.to_string()),
Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
vec!["hashtree".to_string()],
),
Tag::custom(TagKind::Custom("hash".into()), vec![refreshed_hash.clone()]),
],
)
.to_event(&keys)
.unwrap();
relay.ingest_trusted_event(event.clone()).await.unwrap();
let state = AppState {
nostr_relay: Some(relay),
..test_app_state(store, Vec::new())
};
put_cached_tree_root(
&state,
tree_root_cache_key(&keys.public_key().to_bech32().unwrap(), tree_name, None),
Cid::parse(&cached_hash).expect("valid cached cid"),
"cache",
None,
);
let refresh = resolve_to_hash(
State(state),
OriginalUri(
format!(
"/api/resolve/{}/{}",
keys.public_key().to_bech32().unwrap(),
tree_name
)
.parse()
.unwrap(),
),
Path((
keys.public_key().to_bech32().unwrap(),
tree_name.to_string(),
)),
Query(HashMap::from([("refresh".to_string(), "1".to_string())])),
)
.await
.into_response();
let refresh_body = to_bytes(refresh.into_body(), usize::MAX).await.unwrap();
let refresh_json: serde_json::Value = serde_json::from_slice(&refresh_body).unwrap();
assert_eq!(refresh_json["hash"], refreshed_hash);
assert_eq!(refresh_json["source"], "local-relay");
assert_eq!(refresh_json["event_id"], event.id.to_hex());
}
#[tokio::test]
async fn resolve_to_hash_refresh_uses_upstream_relays_after_local_miss() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let keys = Keys::generate();
let relay = test_nostr_relay(&temp_dir, keys.public_key().to_hex()).await;
let tree_name = "video";
let refreshed_hash = "33".repeat(32);
let event = EventBuilder::new(
Kind::Custom(30078),
"",
[
Tag::identifier(tree_name.to_string()),
Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
vec!["hashtree".to_string()],
),
Tag::custom(TagKind::Custom("hash".into()), vec![refreshed_hash.clone()]),
],
)
.to_event(&keys)
.unwrap();
let upstream_url = spawn_mock_upstream_relay(vec![event.clone()]).await;
let mut state = test_app_state(store, Vec::new());
state.nostr_relay = Some(relay.clone());
state.nostr_relay_urls = vec![upstream_url];
let refresh = resolve_to_hash(
State(state),
OriginalUri(
format!(
"/api/resolve/{}/{}",
keys.public_key().to_bech32().unwrap(),
tree_name
)
.parse()
.unwrap(),
),
Path((
keys.public_key().to_bech32().unwrap(),
tree_name.to_string(),
)),
Query(HashMap::from([("refresh".to_string(), "1".to_string())])),
)
.await
.into_response();
let refresh_body = to_bytes(refresh.into_body(), usize::MAX).await.unwrap();
let refresh_json: serde_json::Value = serde_json::from_slice(&refresh_body).unwrap();
assert_eq!(refresh_json["hash"], refreshed_hash);
assert_eq!(refresh_json["source"], "nostr-relay");
assert_eq!(refresh_json["event_id"], event.id.to_hex());
let cached = relay
.query_events(
&nostr::Filter::new()
.author(keys.public_key())
.kind(Kind::Custom(30078))
.limit(10),
10,
)
.await;
assert_eq!(cached.len(), 1);
assert_eq!(cached[0].id, event.id);
}
#[tokio::test]
async fn htree_npub_path_thumbnail_does_not_fall_back_to_historical_root() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
let keys = Keys::generate();
let relay = test_nostr_relay(&temp_dir, keys.public_key().to_hex()).await;
let thumb_bytes = vec![0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46];
let (thumb_cid, _) = tree.put(&thumb_bytes).await.unwrap();
let historical_root = tree
.put_directory(vec![
DirEntry::from_cid("thumbnail.jpg", &thumb_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let (video_cid, _) = tree.put(b"video-data").await.unwrap();
let current_root = tree
.put_directory(vec![
DirEntry::from_cid("video.mp4", &video_cid).with_link_type(LinkType::File)
])
.await
.unwrap();
let tree_name = "videos/Mine Bombers in-game music";
let historical_event = EventBuilder::new(
Kind::Custom(30078),
"",
[
Tag::identifier(tree_name.to_string()),
Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
vec!["hashtree".to_string()],
),
Tag::custom(
TagKind::Custom("hash".into()),
vec![to_hex(&historical_root.hash)],
),
],
)
.custom_created_at(Timestamp::from(10))
.to_event(&keys)
.unwrap();
relay
.ingest_trusted_event(historical_event.clone())
.await
.unwrap();
let current_event = EventBuilder::new(
Kind::Custom(30078),
"",
[
Tag::identifier(tree_name.to_string()),
Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
vec!["hashtree".to_string()],
),
Tag::custom(
TagKind::Custom("hash".into()),
vec![to_hex(¤t_root.hash)],
),
],
)
.custom_created_at(Timestamp::from(20))
.to_event(&keys)
.unwrap();
relay.ingest_trusted_event(current_event).await.unwrap();
let state = AppState {
nostr_relay: Some(relay),
..test_app_state(store, Vec::new())
};
let npub = keys.public_key().to_bech32().unwrap();
put_cached_tree_root(
&state,
tree_root_cache_key(&npub, tree_name, None),
current_root.clone(),
"cache",
None,
);
let response = htree_npub_impl(
State(state),
npub,
tree_name.to_string(),
Some("thumbnail".to_string()),
Query(HashMap::new()),
axum::http::HeaderMap::new(),
axum::extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 43123))),
)
.await;
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn cache_tree_root_public_chk_uses_plain_mutable_cache_key() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let state = test_app_state(store, Vec::new());
let response = cache_tree_root(
State(state.clone()),
Json(CacheTreeRootRequest {
npub: "npub1example".to_string(),
tree_name: "video".to_string(),
hash: "be8f5da537f62d02d3ff113d213a7058116f790a8d0e158c2766543deda10e35".to_string(),
key: Some(
"34e24fadaddc60da2e761501aae44c1c2b6b8706b73dff736eb0fc7d803133bb".to_string(),
),
visibility: Some("public".to_string()),
}),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
let cached = get_cached_tree_root(&state, "npub1example/video").expect("cached cid");
assert_eq!(
to_hex(&cached.cid.hash),
"be8f5da537f62d02d3ff113d213a7058116f790a8d0e158c2766543deda10e35"
);
assert_eq!(
cached.cid.key.map(|key| to_hex(&key)).as_deref(),
Some("34e24fadaddc60da2e761501aae44c1c2b6b8706b73dff736eb0fc7d803133bb")
);
assert!(get_cached_tree_root(
&state,
"npub1example/video?k=34e24fadaddc60da2e761501aae44c1c2b6b8706b73dff736eb0fc7d803133bb"
)
.is_none());
}
#[tokio::test]
async fn clear_tree_root_cache_removes_seeded_mutable_root_cache() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let state = test_app_state(store, Vec::new());
let seed_response = cache_tree_root(
State(state.clone()),
Json(CacheTreeRootRequest {
npub: "npub1example".to_string(),
tree_name: "video".to_string(),
hash: "988db3f24dc222715f1c1e1fa5876690d3147122243d72d85fd44283867cd61a".to_string(),
key: None,
visibility: Some("public".to_string()),
}),
)
.await
.into_response();
assert_eq!(seed_response.status(), StatusCode::OK);
assert!(get_cached_tree_root(&state, "npub1example/video").is_some());
let clear_response = clear_tree_root_cache(
State(state.clone()),
Json(ClearTreeRootCacheRequest {
npub: "npub1example".to_string(),
tree_name: "video".to_string(),
key: None,
visibility: Some("public".to_string()),
}),
)
.await
.into_response();
assert_eq!(clear_response.status(), StatusCode::OK);
assert!(get_cached_tree_root(&state, "npub1example/video").is_none());
}
#[tokio::test]
async fn cached_root_preserves_encrypted_key_metadata_for_followup_resolves() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db")).unwrap());
let state = test_app_state(store, Vec::new());
let hash_hex = "cd".repeat(32);
let encrypted_key = "ef".repeat(32);
let cid = Cid::parse(&hash_hex).expect("valid cid");
let root_event = PeerRootEvent {
hash: hash_hex.clone(),
key: None,
encrypted_key: Some(encrypted_key.clone()),
self_encrypted_key: None,
event_id: "event-1".to_string(),
created_at: 1,
peer_id: "peer-a".to_string(),
};
put_cached_tree_root(
&state,
tree_root_cache_key("npub1example", "video", None),
cid.clone(),
"webrtc",
Some(root_event.clone()),
);
let resolved = resolve_root_offline(&state, "npub1example", "video", None)
.await
.expect("cached root should resolve");
assert_eq!(resolved.source, "cache");
assert_eq!(resolved.cid, cid);
assert_eq!(
resolved
.root_event
.as_ref()
.and_then(|root| root.encrypted_key.as_deref()),
Some(encrypted_key.as_str())
);
}