Skip to main content

hashtree_cli/server/
auth.rs

1use crate::blob_cache::BlobCache;
2use crate::fips_transport::DaemonFipsTransport;
3use crate::nostr_relay::NostrRelay;
4use crate::socialgraph;
5use crate::storage::HashtreeStore;
6use crate::webrtc::{PeerRootEvent, WebRTCState};
7use axum::{
8    body::Body,
9    extract::ws::Message,
10    extract::State,
11    http::{header, Request, Response, StatusCode},
12    middleware::Next,
13};
14use futures::future::{BoxFuture, Shared};
15use hashtree_core::{Cid, LinkType, TreeEntry};
16use lru::LruCache;
17use std::collections::{HashMap, HashSet};
18use std::hash::Hash;
19use std::num::NonZeroUsize;
20use std::sync::{
21    atomic::{AtomicU32, AtomicU64, Ordering},
22    Arc, Mutex as StdMutex,
23};
24use std::time::{Duration, Instant};
25use tokio::{
26    sync::{mpsc, watch, Mutex, Semaphore},
27    task::JoinHandle,
28};
29
30const LOOKUP_CACHE_CAPACITY: usize = 4096;
31const LOOKUP_CACHE_HIT_TTL: Duration = Duration::from_secs(300);
32const LOOKUP_CACHE_MISS_TTL: Duration = Duration::from_secs(1);
33
34#[derive(Debug, Clone)]
35pub enum LookupResult<T> {
36    Hit(T),
37    Miss,
38}
39
40impl<T> LookupResult<T> {
41    pub fn from_option(value: Option<T>) -> Self {
42        match value {
43            Some(value) => Self::Hit(value),
44            None => Self::Miss,
45        }
46    }
47
48    pub fn into_option(self) -> Option<T> {
49        match self {
50            Self::Hit(value) => Some(value),
51            Self::Miss => None,
52        }
53    }
54
55    pub fn ttl(&self) -> Duration {
56        match self {
57            Self::Hit(_) => LOOKUP_CACHE_HIT_TTL,
58            Self::Miss => LOOKUP_CACHE_MISS_TTL,
59        }
60    }
61}
62
63pub struct TimedLruCache<K, V> {
64    cache: LruCache<K, TimedValue<V>>,
65}
66
67#[derive(Clone)]
68struct TimedValue<V> {
69    value: V,
70    expires_at: Instant,
71}
72
73impl<K: Eq + Hash, V: Clone> TimedLruCache<K, V> {
74    pub fn new(capacity: usize) -> Self {
75        Self {
76            cache: LruCache::new(NonZeroUsize::new(capacity.max(1)).unwrap()),
77        }
78    }
79
80    pub fn get_cloned(&mut self, key: &K) -> Option<V> {
81        let now = Instant::now();
82        if let Some(entry) = self.cache.get(key) {
83            if entry.expires_at > now {
84                return Some(entry.value.clone());
85            }
86        }
87        self.cache.pop(key);
88        None
89    }
90
91    pub fn put(&mut self, key: K, value: V, ttl: Duration) {
92        self.cache.put(
93            key,
94            TimedValue {
95                value,
96                expires_at: Instant::now() + ttl,
97            },
98        );
99    }
100}
101
102pub fn new_lookup_cache<K: Eq + Hash, V: Clone>() -> TimedLruCache<K, V> {
103    TimedLruCache::new(LOOKUP_CACHE_CAPACITY)
104}
105
106#[derive(Debug, Clone)]
107pub struct CachedResolvedPathEntry {
108    pub cid: Cid,
109    pub link_type: LinkType,
110}
111
112#[derive(Debug, Clone)]
113pub struct CachedTreeRootEntry {
114    pub cid: Cid,
115    pub source: &'static str,
116    pub root_event: Option<PeerRootEvent>,
117    pub cached_at: Instant,
118}
119
120pub type SharedBlobFetch = Shared<BoxFuture<'static, bool>>;
121pub type SharedBlobRead = Shared<BoxFuture<'static, Result<Option<Vec<u8>>, String>>>;
122
123#[derive(Debug, Clone, Copy, PartialEq, Eq)]
124pub enum WsProtocol {
125    HashtreeJson,
126    HashtreeMsgpack,
127    Unknown,
128}
129
130pub struct PendingRequest {
131    pub origin_id: u64,
132    pub hash: String,
133    pub found: bool,
134    pub origin_protocol: WsProtocol,
135}
136
137pub struct UpstreamNostrSubscription {
138    pub close_tx: watch::Sender<bool>,
139    pub tasks: Vec<JoinHandle<()>>,
140}
141
142pub struct WsRelayState {
143    pub clients: Mutex<HashMap<u64, mpsc::UnboundedSender<Message>>>,
144    pub pending: Mutex<HashMap<(u64, u32), PendingRequest>>,
145    pub client_protocols: Mutex<HashMap<u64, WsProtocol>>,
146    pub upstream_nostr_subscriptions: Mutex<HashMap<(u64, String), UpstreamNostrSubscription>>,
147    pub upstream_seen_events: Mutex<HashMap<(u64, String), HashSet<String>>>,
148    pub upstream_pending_eose: Mutex<HashMap<(u64, String), usize>>,
149    pub next_client_id: AtomicU64,
150    pub next_request_id: AtomicU32,
151    pub upstream_relay_bytes_sent: AtomicU64,
152    pub upstream_relay_bytes_received: AtomicU64,
153}
154
155impl WsRelayState {
156    pub fn new() -> Self {
157        Self {
158            clients: Mutex::new(HashMap::new()),
159            pending: Mutex::new(HashMap::new()),
160            client_protocols: Mutex::new(HashMap::new()),
161            upstream_nostr_subscriptions: Mutex::new(HashMap::new()),
162            upstream_seen_events: Mutex::new(HashMap::new()),
163            upstream_pending_eose: Mutex::new(HashMap::new()),
164            next_client_id: AtomicU64::new(1),
165            next_request_id: AtomicU32::new(1),
166            upstream_relay_bytes_sent: AtomicU64::new(0),
167            upstream_relay_bytes_received: AtomicU64::new(0),
168        }
169    }
170
171    pub fn next_id(&self) -> u64 {
172        self.next_client_id.fetch_add(1, Ordering::SeqCst)
173    }
174
175    pub fn next_request_id(&self) -> u32 {
176        self.next_request_id.fetch_add(1, Ordering::SeqCst)
177    }
178
179    pub fn note_upstream_relay_send(&self, bytes: usize) {
180        self.upstream_relay_bytes_sent
181            .fetch_add(bytes as u64, Ordering::Relaxed);
182    }
183
184    pub fn note_upstream_relay_receive(&self, bytes: usize) {
185        self.upstream_relay_bytes_received
186            .fetch_add(bytes as u64, Ordering::Relaxed);
187    }
188
189    pub fn upstream_relay_bandwidth(&self) -> (u64, u64) {
190        (
191            self.upstream_relay_bytes_sent.load(Ordering::Relaxed),
192            self.upstream_relay_bytes_received.load(Ordering::Relaxed),
193        )
194    }
195}
196
197#[derive(Clone)]
198pub struct AppState {
199    pub store: Arc<HashtreeStore>,
200    pub auth: Option<AuthCredentials>,
201    /// Unix timestamp when this daemon state was created.
202    pub daemon_started_at: u64,
203    pub peer_mode: crate::config::ServerMode,
204    pub hash_get_enabled: bool,
205    /// Whether HTTP cache misses should ask connected WebRTC peers before
206    /// falling back to upstream Blossom.
207    pub http_webrtc_fetch: bool,
208    /// WebRTC peer state for forwarding requests to connected P2P peers
209    pub webrtc_peers: Option<Arc<WebRTCState>>,
210    /// FIPS-backed Hashtree blob transport for peer fetches and responses.
211    pub fips_transport: Option<Arc<DaemonFipsTransport>>,
212    pub fetch_from_fips_peers: bool,
213    /// WebSocket relay state for /ws clients
214    pub ws_relay: Arc<WsRelayState>,
215    /// Maximum upload size in bytes for Blossom uploads (default: 5 MB)
216    pub max_upload_bytes: usize,
217    /// Allow anyone with valid Nostr auth to write (default: true)
218    /// When false, only allowed_pubkeys can write
219    pub public_writes: bool,
220    /// Allow public plaintext reads from mutable npub routes (default: true)
221    /// When false, only allowed_pubkeys or social graph approved pubkeys can read.
222    pub public_plaintext_reads: bool,
223    /// Require untrusted cached blob ingress to look like encrypted CHK blobs.
224    pub require_random_untrusted_ingest: bool,
225    /// Return from Blossom upload after validation while storage writes finish in
226    /// a bounded background queue.
227    pub optimistic_blossom_uploads: bool,
228    /// Background upload queue byte budget. Each queued body holds one permit per
229    /// byte until the storage write completes.
230    pub optimistic_upload_queue_bytes: usize,
231    pub optimistic_upload_queue: Arc<Semaphore>,
232    /// Pubkeys allowed to write (hex format, from config allowed_npubs)
233    pub allowed_pubkeys: HashSet<String>,
234    /// Upstream Blossom servers for cascade fetching
235    pub upstream_blossom: Vec<String>,
236    /// Social graph access control
237    pub social_graph: Option<Arc<socialgraph::SocialGraphAccessControl>>,
238    /// Social graph store handle for snapshot export
239    pub social_graph_store: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
240    /// Social graph root pubkey bytes for snapshot export
241    pub social_graph_root: Option<[u8; 32]>,
242    /// Allow public access to social graph snapshot endpoint
243    pub socialgraph_snapshot_public: bool,
244    /// Nostr relay state for /ws and WebRTC Nostr messages
245    pub nostr_relay: Option<Arc<NostrRelay>>,
246    /// Active upstream Nostr relays for HTTP resolver operations.
247    pub nostr_relay_urls: Vec<String>,
248    /// In-process cache for resolved mutable tree roots, keyed by npub/tree(+key)
249    pub tree_root_cache: Arc<StdMutex<HashMap<String, CachedTreeRootEntry>>>,
250    /// Shared in-flight blob fetches so concurrent misses only hit upstream once per hash
251    pub inflight_blob_fetches: Arc<Mutex<HashMap<String, SharedBlobFetch>>>,
252    /// Shared in-flight local blob reads so request bursts for the same hash
253    /// only spend one blocking storage read.
254    pub inflight_blob_reads: Arc<Mutex<HashMap<String, SharedBlobRead>>>,
255    /// Bounded hot cache for immutable blob bodies and metadata probes.
256    pub(super) blob_cache: Arc<BlobCache>,
257    /// Immutable directory listings keyed by CID
258    pub directory_listing_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<Vec<TreeEntry>>>>>,
259    /// Immutable resolved paths keyed by root CID + path
260    pub resolved_path_cache:
261        Arc<StdMutex<TimedLruCache<String, LookupResult<CachedResolvedPathEntry>>>>,
262    /// Immutable thumbnail alias resolutions keyed by root CID + alias path
263    pub thumbnail_path_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<String>>>>,
264    /// Immutable file sizes keyed by CID
265    pub cid_size_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<u64>>>>,
266}
267
268#[derive(Clone)]
269pub struct AuthCredentials {
270    pub username: String,
271    pub password: String,
272}
273
274/// Auth middleware - validates HTTP Basic Auth
275pub async fn auth_middleware(
276    State(state): State<AppState>,
277    request: Request<Body>,
278    next: Next,
279) -> Result<Response<Body>, StatusCode> {
280    // If auth is not enabled, allow request
281    let Some(auth) = &state.auth else {
282        return Ok(next.run(request).await);
283    };
284
285    // Check Authorization header
286    let auth_header = request
287        .headers()
288        .get(header::AUTHORIZATION)
289        .and_then(|v| v.to_str().ok());
290
291    let authorized = if let Some(header_value) = auth_header {
292        if let Some(credentials) = header_value.strip_prefix("Basic ") {
293            use base64::Engine;
294            let engine = base64::engine::general_purpose::STANDARD;
295            if let Ok(decoded) = engine.decode(credentials) {
296                if let Ok(decoded_str) = String::from_utf8(decoded) {
297                    let expected = format!("{}:{}", auth.username, auth.password);
298                    decoded_str == expected
299                } else {
300                    false
301                }
302            } else {
303                false
304            }
305        } else {
306            false
307        }
308    } else {
309        false
310    };
311
312    if authorized {
313        Ok(next.run(request).await)
314    } else {
315        Ok(Response::builder()
316            .status(StatusCode::UNAUTHORIZED)
317            .header(header::WWW_AUTHENTICATE, "Basic realm=\"hashtree\"")
318            .body(Body::from("Unauthorized"))
319            .unwrap())
320    }
321}