mod auth;
mod blob_read;
pub mod blossom;
mod handlers;
mod ingest_filter;
mod mime;
mod nostr_query;
mod peer_status;
mod request_paths;
mod status_metrics;
#[cfg(feature = "p2p")]
pub mod stun;
mod ui;
pub mod ws_relay;
use crate::nostr_relay::NostrRelay;
use crate::socialgraph;
use crate::storage::HashtreeStore;
use crate::webrtc::WebRTCState;
use anyhow::Result;
use axum::{
body::Body,
extract::DefaultBodyLimit,
http::Request,
middleware,
response::Response,
routing::{get, post, put},
Router,
};
use futures::{future::poll_fn, pin_mut, FutureExt};
use hyper::body::Incoming;
use hyper_util::{
rt::{TokioExecutor, TokioIo, TokioTimer},
server::conn::auto::Builder as HyperBuilder,
service::TowerToHyperService,
};
use socket2::{SockRef, TcpKeepalive};
use std::collections::{HashMap, HashSet};
use std::convert::Infallible;
use std::future;
use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, OnceLock, RwLock};
use std::time::Duration;
use tokio::sync::watch;
use tower::{Service, ServiceExt as _};
use tower_http::cors::CorsLayer;
use tracing::{debug, error, trace};
pub use auth::{new_lookup_cache, AppState, AuthCredentials, CachedTreeRootEntry};
/// Process-wide table mapping a normalized virtual host name to the internal
/// tree root path registered for it. Initialized lazily by `virtual_tree_hosts`.
static VIRTUAL_TREE_HOSTS: OnceLock<RwLock<HashMap<String, String>>> = OnceLock::new();
/// Default size of the optimistic upload queue (512 MiB). Used as the initial
/// `optimistic_upload_queue_bytes` value and as the permit count of the
/// `optimistic_upload_queue` semaphore in `HashtreeServer::new`.
const DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_BYTES: usize = 512 * 1024 * 1024;
/// Value passed to hyper's HTTP/1 `header_read_timeout` in non-test builds.
#[cfg(not(test))]
const HTTP1_HEADER_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Much shorter HTTP/1 header-read timeout used when compiled for tests.
#[cfg(test)]
const HTTP1_HEADER_READ_TIMEOUT: Duration = Duration::from_millis(200);
/// Value passed to hyper's HTTP/2 `keep_alive_interval` for every connection.
const HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
/// Value passed to hyper's HTTP/2 `keep_alive_timeout` for every connection.
const HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10);
/// Value passed to `TcpKeepalive::with_time` on every accepted socket.
const TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
/// Value passed to `TcpKeepalive::with_interval` on every accepted socket.
const TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(15);
/// Returns the process-wide virtual host table, creating an empty one the
/// first time it is requested.
fn virtual_tree_hosts() -> &'static RwLock<HashMap<String, String>> {
    // `RwLock<HashMap<_, _>>::default()` is an unlocked lock around an empty map.
    VIRTUAL_TREE_HOSTS.get_or_init(RwLock::default)
}
/// Canonicalizes a host string (for example an HTTP `Host` header value) into
/// the key used by the virtual tree host table.
///
/// Normalization steps:
/// * surrounding whitespace is removed and the name is ASCII-lowercased;
/// * a bracketed IPv6 literal (`[::1]` or `[::1]:8080`) yields the text between
///   the brackets;
/// * a trailing `:<digits>` port is removed, but only when the remaining host
///   part contains no further `:` — an unbracketed IPv6 literal such as `::1`
///   is therefore kept intact instead of being truncated at its last colon;
/// * trailing root-label dots are removed both from the whole input and from
///   the host part left after port removal, so `example.com.`,
///   `example.com.:8080` and `example.com:8080` all map to `example.com`.
///
/// Returns `None` when nothing is left after normalization.
fn normalize_virtual_tree_host(host: &str) -> Option<String> {
    let lowered = host.trim().trim_end_matches('.').to_ascii_lowercase();
    if lowered.is_empty() {
        return None;
    }

    // Bracketed literal: everything after the closing bracket (port) is ignored.
    if let Some((inner, _after)) = lowered
        .strip_prefix('[')
        .and_then(|rest| rest.split_once(']'))
    {
        let inner = inner.trim();
        return (!inner.is_empty()).then(|| inner.to_string());
    }

    let name = match lowered.rsplit_once(':') {
        // A single colon followed by digits is a port separator. More than one
        // colon means a bare IPv6 literal, which must not be split.
        Some((name, port))
            if !name.is_empty()
                && !name.contains(':')
                && !port.is_empty()
                && port.bytes().all(|b| b.is_ascii_digit()) =>
        {
            // The dot of `example.com.:8080` only becomes trailing once the
            // port is gone, so it has to be trimmed again here.
            name.trim_end_matches('.')
        }
        _ => lowered.as_str(),
    };
    (!name.is_empty()).then(|| name.to_string())
}
/// Associates `host` with `internal_root` in the process-wide virtual host
/// table, replacing any earlier entry for the same normalized host.
///
/// The host is normalized with `normalize_virtual_tree_host`; the root has
/// surrounding whitespace and trailing `/` characters removed. The call is a
/// silent no-op when either value ends up empty or when the table lock cannot
/// be acquired for writing.
pub fn register_virtual_tree_host(host: &str, internal_root: &str) {
    let root = internal_root.trim().trim_end_matches('/');
    let key = match normalize_virtual_tree_host(host) {
        Some(key) if !root.is_empty() => key,
        _ => return,
    };

    let Ok(mut table) = virtual_tree_hosts().write() else {
        return;
    };
    table.insert(key, root.to_string());
}
/// Looks up the internal root registered for `host`.
///
/// Returns `None` when the host normalizes to nothing, when the table lock
/// cannot be acquired for reading, or when no entry exists for the host.
pub fn resolve_virtual_tree_host(host: &str) -> Option<String> {
    let key = normalize_virtual_tree_host(host)?;
    let table = virtual_tree_hosts().read().ok()?;
    table.get(&key).cloned()
}
/// Test-only helper that empties the process-wide virtual host table.
/// Does nothing when the table lock cannot be acquired for writing.
#[cfg(test)]
pub fn clear_virtual_tree_hosts_for_test() {
    let Ok(mut table) = virtual_tree_hosts().write() else {
        return;
    };
    table.clear();
}
/// HTTP server for a hashtree store, configured through the `with_*` builder
/// methods and started with one of the `run*` methods.
pub struct HashtreeServer {
// Shared state cloned into every router built by `run_with_listener_until`.
state: AppState,
// Address string bound by `run`; also exposed through `addr()`.
addr: String,
// Optional caller-supplied routes merged into the application router.
extra_routes: Option<Router<AppState>>,
// Optional CORS layer, applied after all routes have been merged.
cors: Option<CorsLayer>,
}
impl HashtreeServer {
/// Creates a server for `store` that binds to `addr` when `run` is called.
///
/// All optional integrations (auth, WebRTC peers, FIPS transport, social
/// graph, Nostr relay, extra routes, CORS) start out unset; use the `with_*`
/// builder methods to configure them.
pub fn new(store: Arc<HashtreeStore>, addr: String) -> Self {
Self {
state: AppState {
store,
auth: None,
// Wall-clock construction time in Unix seconds.
daemon_started_at: current_unix_secs(),
peer_mode: crate::config::ServerMode::Normal,
hash_get_enabled: true,
http_webrtc_fetch: true,
webrtc_peers: None,
fips_transport: None,
fetch_from_fips_peers: true,
ws_relay: Arc::new(auth::WsRelayState::new()),
// Defaults: 5 MiB upload cap; public writes and public plaintext reads enabled.
max_upload_bytes: 5 * 1024 * 1024, public_writes: true, public_plaintext_reads: true,
require_random_untrusted_ingest: true,
optimistic_blossom_uploads: false,
// The semaphore starts with one permit per byte of the default queue size.
optimistic_upload_queue_bytes: DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_BYTES,
optimistic_upload_queue: Arc::new(tokio::sync::Semaphore::new(
DEFAULT_OPTIMISTIC_UPLOAD_QUEUE_BYTES,
)),
allowed_pubkeys: HashSet::new(), upstream_blossom: Vec::new(),
social_graph: None,
social_graph_store: None,
social_graph_root: None,
socialgraph_snapshot_public: false,
nostr_relay: None,
nostr_relay_urls: Vec::new(),
// All caches and in-flight maps start empty.
tree_root_cache: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
inflight_blob_fetches: Arc::new(tokio::sync::Mutex::new(
std::collections::HashMap::new(),
)),
inflight_blob_reads: Arc::new(tokio::sync::Mutex::new(
std::collections::HashMap::new(),
)),
blob_cache: Arc::new(crate::blob_cache::BlobCache::from_env()),
directory_listing_cache: Arc::new(std::sync::Mutex::new(new_lookup_cache())),
resolved_path_cache: Arc::new(std::sync::Mutex::new(new_lookup_cache())),
thumbnail_path_cache: Arc::new(std::sync::Mutex::new(new_lookup_cache())),
cid_size_cache: Arc::new(std::sync::Mutex::new(new_lookup_cache())),
},
addr,
extra_routes: None,
cors: None,
}
}
/// Overrides `max_upload_bytes` (default 5 MiB).
pub fn with_max_upload_bytes(mut self, bytes: usize) -> Self {
self.state.max_upload_bytes = bytes;
self
}
/// Sets the `public_writes` flag (default `true`).
pub fn with_public_writes(mut self, public: bool) -> Self {
self.state.public_writes = public;
self
}
/// Sets the `public_plaintext_reads` flag (default `true`).
pub fn with_public_plaintext_reads(mut self, public: bool) -> Self {
self.state.public_plaintext_reads = public;
self
}
/// Sets the `require_random_untrusted_ingest` flag (default `true`).
pub fn with_require_random_untrusted_ingest(mut self, require: bool) -> Self {
self.state.require_random_untrusted_ingest = require;
self
}
/// Sets the `optimistic_blossom_uploads` flag (default `false`).
pub fn with_optimistic_blossom_uploads(mut self, enabled: bool) -> Self {
self.state.optimistic_blossom_uploads = enabled;
self
}
/// Sets the server mode stored as `peer_mode` (default `ServerMode::Normal`).
pub fn with_server_mode(mut self, mode: crate::config::ServerMode) -> Self {
self.state.peer_mode = mode;
self
}
/// Sets the `hash_get_enabled` flag (default `true`).
pub fn with_hash_get_enabled(mut self, enabled: bool) -> Self {
self.state.hash_get_enabled = enabled;
self
}
/// Sets the `http_webrtc_fetch` flag (default `true`).
pub fn with_http_webrtc_fetch(mut self, enabled: bool) -> Self {
self.state.http_webrtc_fetch = enabled;
self
}
/// Sets the `fetch_from_fips_peers` flag (default `true`).
pub fn with_fetch_from_fips_peers(mut self, enabled: bool) -> Self {
self.state.fetch_from_fips_peers = enabled;
self
}
/// Attaches a FIPS transport to the shared state.
pub fn with_fips_transport(
mut self,
transport: Arc<crate::fips_transport::DaemonFipsTransport>,
) -> Self {
self.state.fips_transport = Some(transport);
self
}
/// Attaches the WebRTC state to the shared state as `webrtc_peers`.
pub fn with_webrtc_peers(mut self, webrtc_state: Arc<WebRTCState>) -> Self {
self.state.webrtc_peers = Some(webrtc_state);
self
}
/// Stores a username/password pair as `AuthCredentials` in the shared state.
pub fn with_auth(mut self, username: String, password: String) -> Self {
self.state.auth = Some(AuthCredentials { username, password });
self
}
/// Replaces the set of allowed pubkeys (empty by default).
pub fn with_allowed_pubkeys(mut self, pubkeys: HashSet<String>) -> Self {
self.state.allowed_pubkeys = pubkeys;
self
}
/// Replaces the list of upstream Blossom servers (empty by default).
pub fn with_upstream_blossom(mut self, servers: Vec<String>) -> Self {
self.state.upstream_blossom = servers;
self
}
/// Attaches the social graph access-control object to the shared state.
pub fn with_social_graph(mut self, sg: Arc<socialgraph::SocialGraphAccessControl>) -> Self {
self.state.social_graph = Some(sg);
self
}
/// Configures the social graph snapshot: its backing store, its 32-byte
/// root, and the `socialgraph_snapshot_public` flag.
pub fn with_socialgraph_snapshot(
mut self,
store: Arc<dyn socialgraph::SocialGraphBackend>,
root: [u8; 32],
public: bool,
) -> Self {
self.state.social_graph_store = Some(store);
self.state.social_graph_root = Some(root);
self.state.socialgraph_snapshot_public = public;
self
}
/// Attaches a Nostr relay instance to the shared state.
pub fn with_nostr_relay(mut self, relay: Arc<NostrRelay>) -> Self {
self.state.nostr_relay = Some(relay);
self
}
/// Replaces the list of Nostr relay URLs (empty by default).
pub fn with_nostr_relay_urls(mut self, relays: Vec<String>) -> Self {
self.state.nostr_relay_urls = relays;
self
}
/// Supplies additional routes that are merged into the application router.
pub fn with_extra_routes(mut self, routes: Router<AppState>) -> Self {
self.extra_routes = Some(routes);
self
}
/// Supplies a CORS layer that is applied after all routes have been merged.
pub fn with_cors(mut self, cors: CorsLayer) -> Self {
self.cors = Some(cors);
self
}
/// Binds a TCP listener on the configured address and serves on it.
///
/// Returns an error if binding or serving fails. The port reported by
/// `run_with_listener` is discarded.
pub async fn run(self) -> Result<()> {
let listener = tokio::net::TcpListener::bind(&self.addr).await?;
let _ = self.run_with_listener(listener).await?;
Ok(())
}
/// Serves on an already-bound listener with a shutdown future that never
/// completes, so the accept loop is never asked to stop.
pub async fn run_with_listener(self, listener: tokio::net::TcpListener) -> Result<u16> {
self.run_with_listener_until(listener, future::pending::<()>())
.await
}
/// Builds the application router and serves it on `listener` until
/// `shutdown` completes and the serving loop has returned.
///
/// Returns the local port of `listener` once serving has stopped, or an
/// error if the local address cannot be read or serving fails.
pub async fn run_with_listener_until<F>(
self,
listener: tokio::net::TcpListener,
shutdown: F,
) -> Result<u16>
where
F: std::future::Future<Output = ()> + Send + 'static,
{
let local_addr = listener.local_addr()?;
let state = self.state.clone();
// Routes registered without the auth middleware.
let public_routes = Router::new()
.route("/", get(handlers::serve_root_or_virtual_host))
.route("/ws", get(ws_relay::ws_data))
.route("/ws/", get(ws_relay::ws_data))
.route(
"/htree/test",
get(handlers::htree_test).head(handlers::htree_test),
)
.route("/htree/nhash1:nhash", get(handlers::htree_nhash))
.route("/htree/nhash1:nhash/", get(handlers::htree_nhash))
.route("/htree/nhash1:nhash/*path", get(handlers::htree_nhash_path))
.route("/htree/npub1:npub/:treename", get(handlers::htree_npub))
.route("/htree/npub1:npub/:treename/", get(handlers::htree_npub))
.route(
"/htree/npub1:npub/:treename/*path",
get(handlers::htree_npub_path),
)
.route("/n/:pubkey/:treename", get(handlers::resolve_and_serve))
.route("/npub1:rest", get(handlers::serve_npub))
.route("/npub1:rest/*path", get(handlers::serve_npub))
// Single-segment paths: GET is handled by `serve_content_or_blob`, the
// other methods by the blossom module.
.route(
"/:id",
get(handlers::serve_content_or_blob)
.head(blossom::head_blob)
.delete(blossom::delete_blob)
.options(blossom::cors_preflight),
)
// PUT /upload lives here; POST /upload is registered on the protected router.
.route(
"/upload",
put(blossom::upload_blob).options(blossom::cors_preflight),
)
.route(
"/upload/batch",
post(blossom::upload_blob_batch).options(blossom::cors_preflight),
)
.route(
"/upload/check",
post(blossom::upload_check).options(blossom::cors_preflight),
)
.route(
"/list/:pubkey",
get(blossom::list_blobs).options(blossom::cors_preflight),
)
.route("/health", get(handlers::health_check))
.route("/api/pins", get(handlers::list_pins))
.route("/api/stats", get(handlers::storage_stats))
.route("/api/peers", get(handlers::webrtc_peers))
.route("/api/status", get(handlers::daemon_status))
.route("/api/p2p/signal", post(handlers::p2p_signal))
.route("/api/socialgraph", get(handlers::socialgraph_stats))
.route(
"/api/socialgraph/snapshot",
get(handlers::socialgraph_snapshot),
)
.route(
"/api/socialgraph/distance/:pubkey",
get(handlers::follow_distance),
)
// Both resolve paths are served by the same handler.
.route(
"/api/resolve/:pubkey/:treename",
get(handlers::resolve_to_hash),
)
.route(
"/api/nostr/resolve/:pubkey/:treename",
get(handlers::resolve_to_hash),
)
.route("/api/nostr/profile/:pubkey", get(handlers::nostr_profile))
.route("/api/cache-tree-root", post(handlers::cache_tree_root))
.route(
"/api/clear-tree-root-cache",
post(handlers::clear_tree_root_cache),
)
.route("/api/trees/:pubkey", get(handlers::list_trees))
// GET requests that match no route go to the virtual-host fallback handler.
.fallback(get(handlers::serve_virtual_host_fallback))
.with_state(state.clone());
// Routes wrapped in `auth::auth_middleware`.
let protected_routes = Router::new()
.route("/upload", post(handlers::upload_file))
.route("/api/pin/:cid", post(handlers::pin_cid))
.route("/api/unpin/:cid", post(handlers::unpin_cid))
.route("/api/gc", post(handlers::garbage_collect))
.layer(middleware::from_fn_with_state(
state.clone(),
auth::auth_middleware,
))
.with_state(state.clone());
// The body limit is raised to 10 GiB and HTTP status recording is attached
// to the merged public and protected routes.
let mut app = public_routes
.merge(protected_routes)
.layer(DefaultBodyLimit::max(10 * 1024 * 1024 * 1024)) .layer(middleware::from_fn(status_metrics::record_http_status));
// NOTE(review): extra routes are merged after the body-limit and status
// layers were added, so they are presumably not wrapped by them — confirm
// this is intended.
if let Some(extra) = self.extra_routes {
app = app.merge(extra.with_state(state));
}
// CORS is added last, after every route has been merged.
if let Some(cors) = self.cors {
app = app.layer(cors);
}
// Connect info makes the peer `SocketAddr` available to handlers.
let make_service = app.into_make_service_with_connect_info::<std::net::SocketAddr>();
serve_with_connection_limits(listener, make_service, shutdown).await?;
Ok(local_addr.port())
}
/// Returns the address string this server was constructed with.
pub fn addr(&self) -> &str {
&self.addr
}
}
/// Accepts TCP connections from `listener` and serves each one on its own
/// spawned task until `shutdown` completes.
///
/// Every accepted socket is tuned by `configure_tcp_stream` and then served
/// through hyper-util's auto connection builder with upgrades enabled, using
/// `HTTP1_HEADER_READ_TIMEOUT` for HTTP/1 header reads and the
/// `HTTP2_KEEPALIVE_*` constants for HTTP/2 keep-alive.
///
/// When `shutdown` completes the accept loop stops, every open connection is
/// asked to shut down gracefully, and the function returns only after all
/// connection tasks have finished.
async fn serve_with_connection_limits<M, S, F>(
listener: tokio::net::TcpListener,
mut make_service: M,
shutdown: F,
) -> io::Result<()>
where
M: Service<SocketAddr, Error = Infallible, Response = S> + Send + 'static,
M::Future: Send,
S: Service<Request<Body>, Response = Response, Error = Infallible> + Clone + Send + 'static,
S::Future: Send,
F: std::future::Future<Output = ()> + Send + 'static,
{
// Shutdown broadcast: the only receiver is owned by the task below, so
// dropping it once `shutdown` completes makes `signal_tx.closed()` resolve
// for the accept loop and for every connection task.
let (signal_tx, signal_rx) = watch::channel(());
let signal_tx = Arc::new(signal_tx);
tokio::spawn(async move {
shutdown.await;
trace!("received graceful shutdown signal; stopping daemon listener");
drop(signal_rx);
});
// Connection tracking: each connection task holds a clone of `close_rx`, so
// `close_tx.closed()` resolves only after all of them have been dropped.
let (close_tx, close_rx) = watch::channel(());
loop {
let (tcp_stream, remote_addr) = tokio::select! {
accepted = accept_tcp(&listener) => {
match accepted {
Some(connection) => connection,
// `accept_tcp` already handled the accept error; try again.
None => continue,
}
}
_ = signal_tx.closed() => {
trace!("shutdown signal received; no longer accepting daemon connections");
break;
}
};
configure_tcp_stream(&tcp_stream);
let tcp_stream = TokioIo::new(tcp_stream);
// The error type is `Infallible`, so the empty match covers every case.
poll_fn(|cx| make_service.poll_ready(cx))
.await
.unwrap_or_else(|err| match err {});
// Build the per-connection service and adapt hyper's `Incoming` request
// body to the `Body` type the service expects.
let tower_service = make_service
.call(remote_addr)
.await
.unwrap_or_else(|err| match err {})
.map_request(|req: Request<Incoming>| req.map(Body::new));
let hyper_service = TowerToHyperService::new(tower_service);
let signal_tx = Arc::clone(&signal_tx);
let close_rx = close_rx.clone();
tokio::spawn(async move {
let mut builder = HyperBuilder::new(TokioExecutor::new());
builder
.http1()
.timer(TokioTimer::new())
.header_read_timeout(HTTP1_HEADER_READ_TIMEOUT);
builder
.http2()
.timer(TokioTimer::new())
.keep_alive_interval(Some(HTTP2_KEEPALIVE_INTERVAL))
.keep_alive_timeout(HTTP2_KEEPALIVE_TIMEOUT);
let conn = builder.serve_connection_with_upgrades(tcp_stream, hyper_service);
pin_mut!(conn);
// Fused because the loop below polls this future again after it has
// completed once.
let signal_closed = signal_tx.closed().fuse();
pin_mut!(signal_closed);
loop {
tokio::select! {
result = conn.as_mut() => {
if let Err(err) = result {
trace!("daemon connection closed with error: {err:#}");
}
break;
}
_ = &mut signal_closed => {
trace!("shutdown signal received by connection task");
// Request a graceful shutdown, then keep looping so the
// connection future is driven to completion.
conn.as_mut().graceful_shutdown();
}
}
}
// Tell the accept side that this connection has finished.
drop(close_rx);
});
}
// Drop the accept side's own receiver so only connection tasks keep
// `close_tx` open, stop listening, then wait for those tasks to finish.
drop(close_rx);
drop(listener);
close_tx.closed().await;
Ok(())
}
/// Applies per-connection socket options to a freshly accepted stream:
/// enables `TCP_NODELAY` and turns on TCP keepalive using
/// `TCP_KEEPALIVE_TIME` and `TCP_KEEPALIVE_INTERVAL`.
///
/// Both settings are best effort: a failure is logged at debug level and the
/// connection is still used.
fn configure_tcp_stream(tcp_stream: &tokio::net::TcpStream) {
    let keepalive = TcpKeepalive::new()
        .with_time(TCP_KEEPALIVE_TIME)
        .with_interval(TCP_KEEPALIVE_INTERVAL);

    let nodelay_result = tcp_stream.set_nodelay(true);
    if let Err(err) = nodelay_result {
        debug!("failed to set TCP_NODELAY on daemon connection: {err:#}");
    }

    let keepalive_result = SockRef::from(tcp_stream).set_tcp_keepalive(&keepalive);
    if let Err(err) = keepalive_result {
        debug!("failed to set TCP keepalive on daemon connection: {err:#}");
    }
}
/// Accepts one connection from `listener`.
///
/// Returns `None` when the accept call fails so the caller can simply retry.
/// Errors classified by `is_connection_error` are ignored silently; any other
/// error is logged and followed by a one-second pause before returning.
async fn accept_tcp(
    listener: &tokio::net::TcpListener,
) -> Option<(tokio::net::TcpStream, SocketAddr)> {
    let err = match listener.accept().await {
        Ok(connection) => return Some(connection),
        Err(err) => err,
    };

    if !is_connection_error(&err) {
        error!("daemon accept error: {err}");
        // Back off before the caller retries the accept.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    None
}
/// Reports whether `err` is one of the error kinds the accept loop ignores
/// without logging or backing off: connection refused, aborted, or reset.
fn is_connection_error(err: &io::Error) -> bool {
    const IGNORED_KINDS: [io::ErrorKind; 3] = [
        io::ErrorKind::ConnectionRefused,
        io::ErrorKind::ConnectionAborted,
        io::ErrorKind::ConnectionReset,
    ];
    IGNORED_KINDS.contains(&err.kind())
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch,
/// or `0` when the system clock reports a time before the epoch.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
use crate::storage::HashtreeStore;
use hashtree_core::{from_hex, nhash_encode, DirEntry, HashTree, HashTreeConfig, LinkType};
use nostr::{EventBuilder, Keys, Kind, Timestamp};
use serde_json::json;
use tempfile::TempDir;
// Store-level round trip: a file uploaded to the store can be read back by
// its hash. No HTTP server is started here.
#[tokio::test]
async fn test_server_serve_file() -> Result<()> {
let temp_dir = TempDir::new()?;
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db"))?);
let test_file = temp_dir.path().join("test.txt");
std::fs::write(&test_file, b"Hello, Hashtree!")?;
let cid = store.upload_file(&test_file)?;
let hash = from_hex(&cid)?;
let content = store.get_file(&hash)?;
assert!(content.is_some());
assert_eq!(content.unwrap(), b"Hello, Hashtree!");
Ok(())
}
// Store-level check: after uploading one file, the raw pin list contains
// exactly that file's hash. No HTTP server is started here.
#[tokio::test]
async fn test_server_list_pins() -> Result<()> {
let temp_dir = TempDir::new()?;
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db"))?);
let test_file = temp_dir.path().join("test.txt");
std::fs::write(&test_file, b"Test")?;
let cid = store.upload_file(&test_file)?;
let hash = from_hex(&cid)?;
let pins = store.list_pins_raw()?;
assert_eq!(pins.len(), 1);
assert_eq!(pins[0], hash);
Ok(())
}
// Starts a default-configured server on an ephemeral loopback port and
// returns that port together with the handle of the serving task.
async fn spawn_test_server(
store: Arc<HashtreeStore>,
) -> Result<(u16, tokio::task::JoinHandle<Result<()>>)> {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let port = listener.local_addr()?.port();
// The address string is unused here because the listener is already bound.
let server = HashtreeServer::new(store, "127.0.0.1:0".to_string());
let handle =
tokio::spawn(async move { server.run_with_listener(listener).await.map(|_| ()) });
Ok((port, handle))
}
// Same as `spawn_test_server`, with a Nostr relay attached to the server.
async fn spawn_test_server_with_nostr_relay(
store: Arc<HashtreeStore>,
relay: Arc<NostrRelay>,
) -> Result<(u16, tokio::task::JoinHandle<Result<()>>)> {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let port = listener.local_addr()?.port();
let server = HashtreeServer::new(store, "127.0.0.1:0".to_string()).with_nostr_relay(relay);
let handle =
tokio::spawn(async move { server.run_with_listener(listener).await.map(|_| ()) });
Ok((port, handle))
}
// End-to-end check of virtual hosting: with a host registered for a tree,
// requests carrying that Host header get the tree's index at `/`, its files
// at their paths, and the index again for an unknown HTML path.
#[tokio::test]
async fn virtual_tree_hosts_serve_root_assets_and_spa_fallbacks() -> Result<()> {
// The host table is process-wide, so start from a clean slate.
clear_virtual_tree_hosts_for_test();
let temp_dir = TempDir::new()?;
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db"))?);
let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
// Build a small site: index.html, favicon.ico and assets/main.js.
let (index_cid, _) = tree
.put(b"<!doctype html><title>Virtual host ok</title>")
.await?;
let (favicon_cid, _) = tree.put(b"ico").await?;
let (main_js_cid, _) = tree.put(b"console.log('ok');").await?;
let assets_dir = tree
.put_directory(vec![
DirEntry::from_cid("main.js", &main_js_cid).with_link_type(LinkType::File)
])
.await?;
let root_cid = tree
.put_directory(vec![
DirEntry::from_cid("index.html", &index_cid).with_link_type(LinkType::File),
DirEntry::from_cid("favicon.ico", &favicon_cid).with_link_type(LinkType::File),
DirEntry::from_cid("assets", &assets_dir).with_link_type(LinkType::Dir),
])
.await?;
let nhash = nhash_encode(&root_cid.hash)?;
let host = "tree-test.htree.localhost";
register_virtual_tree_host(host, &format!("/htree/{nhash}"));
let (port, handle) = spawn_test_server(store).await?;
let base_url = format!("http://127.0.0.1:{port}");
// The Host header includes the port, which normalization strips on lookup.
let host_header = format!("{host}:{port}");
let client = reqwest::Client::new();
// `/` serves the tree's index.html.
let root_response = client
.get(format!("{base_url}/"))
.header("Host", &host_header)
.header("Accept", "text/html")
.send()
.await?;
assert_eq!(root_response.status(), reqwest::StatusCode::OK);
assert_eq!(
root_response.bytes().await?.as_ref(),
b"<!doctype html><title>Virtual host ok</title>"
);
// A top-level file is served from the tree root.
let favicon_response = client
.get(format!("{base_url}/favicon.ico"))
.header("Host", &host_header)
.send()
.await?;
assert_eq!(favicon_response.status(), reqwest::StatusCode::OK);
assert_eq!(favicon_response.bytes().await?.as_ref(), b"ico");
// A nested file is served from its subdirectory.
let js_response = client
.get(format!("{base_url}/assets/main.js"))
.header("Host", &host_header)
.send()
.await?;
assert_eq!(js_response.status(), reqwest::StatusCode::OK);
assert_eq!(js_response.bytes().await?.as_ref(), b"console.log('ok');");
// A path that does not exist in the tree falls back to index.html when
// HTML is requested.
let profile_response = client
.get(format!("{base_url}/users/npub1example"))
.header("Host", &host_header)
.header("Accept", "text/html")
.send()
.await?;
assert_eq!(profile_response.status(), reqwest::StatusCode::OK);
assert_eq!(
profile_response.bytes().await?.as_ref(),
b"<!doctype html><title>Virtual host ok</title>"
);
// Stop the server task and leave the process-wide table empty again.
handle.abort();
clear_virtual_tree_hosts_for_test();
Ok(())
}
// End-to-end check of `/api/nostr/profile/:pubkey`: with two metadata events
// ingested for one author, the response reflects the newer event.
#[tokio::test]
async fn nostr_profile_route_returns_latest_metadata_event() -> Result<()> {
let temp_dir = TempDir::new()?;
let store = Arc::new(HashtreeStore::new(temp_dir.path().join("db"))?);
// The test lock is held only while the social graph store is opened.
let graph_store = {
let _guard = crate::socialgraph::test_lock();
crate::socialgraph::open_social_graph_store_with_mapsize(
&temp_dir.path().join("relay-db"),
Some(128 * 1024 * 1024),
)?
};
let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store;
let relay = Arc::new(NostrRelay::new(
backend,
temp_dir.path().to_path_buf(),
HashSet::new(),
None,
NostrRelayConfig {
spambox_db_max_bytes: 0,
..Default::default()
},
)?);
let author = Keys::generate();
// Two metadata events from the same author, created at t=10 and t=20.
let older = EventBuilder::new(
Kind::Metadata,
json!({ "name": "older", "about": "before" }).to_string(),
)
.custom_created_at(Timestamp::from_secs(10))
.sign_with_keys(&author)?;
let newer = EventBuilder::new(
Kind::Metadata,
json!({ "name": "newer", "about": "after" }).to_string(),
)
.custom_created_at(Timestamp::from_secs(20))
.sign_with_keys(&author)?;
relay.ingest_trusted_event(older).await?;
relay.ingest_trusted_event(newer.clone()).await?;
let (port, handle) = spawn_test_server_with_nostr_relay(store, relay).await?;
let response = reqwest::get(format!(
"http://127.0.0.1:{port}/api/nostr/profile/{}",
author.public_key().to_hex()
))
.await?;
assert_eq!(response.status(), reqwest::StatusCode::OK);
let payload: serde_json::Value = response.json().await?;
// The payload must describe the newer event only.
assert_eq!(payload["profile"]["name"].as_str(), Some("newer"),);
assert_eq!(payload["profile"]["about"].as_str(), Some("after"));
assert_eq!(payload["created_at"].as_u64(), Some(20));
let expected_event_id = newer.id.to_hex();
assert_eq!(
payload["event_id"].as_str(),
Some(expected_event_id.as_str())
);
handle.abort();
Ok(())
}
}