Skip to main content

hashtree_cli/server/
auth.rs

1use crate::blob_cache::BlobCache;
2use crate::nostr_relay::NostrRelay;
3use crate::socialgraph;
4use crate::storage::HashtreeStore;
5use crate::webrtc::{PeerRootEvent, WebRTCState};
6use axum::{
7    body::Body,
8    extract::ws::Message,
9    extract::State,
10    http::{header, Request, Response, StatusCode},
11    middleware::Next,
12};
13use futures::future::{BoxFuture, Shared};
14use hashtree_core::{Cid, LinkType, TreeEntry};
15use lru::LruCache;
16use std::collections::{HashMap, HashSet};
17use std::hash::Hash;
18use std::num::NonZeroUsize;
19use std::sync::{
20    atomic::{AtomicU32, AtomicU64, Ordering},
21    Arc, Mutex as StdMutex,
22};
23use std::time::{Duration, Instant};
24use tokio::{
25    sync::{mpsc, watch, Mutex, Semaphore},
26    task::JoinHandle,
27};
28
29const LOOKUP_CACHE_CAPACITY: usize = 4096;
30const LOOKUP_CACHE_HIT_TTL: Duration = Duration::from_secs(300);
31const LOOKUP_CACHE_MISS_TTL: Duration = Duration::from_secs(1);
32
33#[derive(Debug, Clone)]
34pub enum LookupResult<T> {
35    Hit(T),
36    Miss,
37}
38
39impl<T> LookupResult<T> {
40    pub fn from_option(value: Option<T>) -> Self {
41        match value {
42            Some(value) => Self::Hit(value),
43            None => Self::Miss,
44        }
45    }
46
47    pub fn into_option(self) -> Option<T> {
48        match self {
49            Self::Hit(value) => Some(value),
50            Self::Miss => None,
51        }
52    }
53
54    pub fn ttl(&self) -> Duration {
55        match self {
56            Self::Hit(_) => LOOKUP_CACHE_HIT_TTL,
57            Self::Miss => LOOKUP_CACHE_MISS_TTL,
58        }
59    }
60}
61
62pub struct TimedLruCache<K, V> {
63    cache: LruCache<K, TimedValue<V>>,
64}
65
66#[derive(Clone)]
67struct TimedValue<V> {
68    value: V,
69    expires_at: Instant,
70}
71
72impl<K: Eq + Hash, V: Clone> TimedLruCache<K, V> {
73    pub fn new(capacity: usize) -> Self {
74        Self {
75            cache: LruCache::new(NonZeroUsize::new(capacity.max(1)).unwrap()),
76        }
77    }
78
79    pub fn get_cloned(&mut self, key: &K) -> Option<V> {
80        let now = Instant::now();
81        if let Some(entry) = self.cache.get(key) {
82            if entry.expires_at > now {
83                return Some(entry.value.clone());
84            }
85        }
86        self.cache.pop(key);
87        None
88    }
89
90    pub fn put(&mut self, key: K, value: V, ttl: Duration) {
91        self.cache.put(
92            key,
93            TimedValue {
94                value,
95                expires_at: Instant::now() + ttl,
96            },
97        );
98    }
99}
100
101pub fn new_lookup_cache<K: Eq + Hash, V: Clone>() -> TimedLruCache<K, V> {
102    TimedLruCache::new(LOOKUP_CACHE_CAPACITY)
103}
104
105#[derive(Debug, Clone)]
106pub struct CachedResolvedPathEntry {
107    pub cid: Cid,
108    pub link_type: LinkType,
109}
110
111#[derive(Debug, Clone)]
112pub struct CachedTreeRootEntry {
113    pub cid: Cid,
114    pub source: &'static str,
115    pub root_event: Option<PeerRootEvent>,
116}
117
118pub type SharedBlobFetch = Shared<BoxFuture<'static, bool>>;
119pub type SharedBlobRead = Shared<BoxFuture<'static, Result<Option<Vec<u8>>, String>>>;
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq)]
122pub enum WsProtocol {
123    HashtreeJson,
124    HashtreeMsgpack,
125    Unknown,
126}
127
128pub struct PendingRequest {
129    pub origin_id: u64,
130    pub hash: String,
131    pub found: bool,
132    pub origin_protocol: WsProtocol,
133}
134
135pub struct UpstreamNostrSubscription {
136    pub close_tx: watch::Sender<bool>,
137    pub tasks: Vec<JoinHandle<()>>,
138}
139
140pub struct WsRelayState {
141    pub clients: Mutex<HashMap<u64, mpsc::UnboundedSender<Message>>>,
142    pub pending: Mutex<HashMap<(u64, u32), PendingRequest>>,
143    pub client_protocols: Mutex<HashMap<u64, WsProtocol>>,
144    pub upstream_nostr_subscriptions: Mutex<HashMap<(u64, String), UpstreamNostrSubscription>>,
145    pub upstream_seen_events: Mutex<HashMap<(u64, String), HashSet<String>>>,
146    pub upstream_pending_eose: Mutex<HashMap<(u64, String), usize>>,
147    pub next_client_id: AtomicU64,
148    pub next_request_id: AtomicU32,
149    pub upstream_relay_bytes_sent: AtomicU64,
150    pub upstream_relay_bytes_received: AtomicU64,
151}
152
153impl WsRelayState {
154    pub fn new() -> Self {
155        Self {
156            clients: Mutex::new(HashMap::new()),
157            pending: Mutex::new(HashMap::new()),
158            client_protocols: Mutex::new(HashMap::new()),
159            upstream_nostr_subscriptions: Mutex::new(HashMap::new()),
160            upstream_seen_events: Mutex::new(HashMap::new()),
161            upstream_pending_eose: Mutex::new(HashMap::new()),
162            next_client_id: AtomicU64::new(1),
163            next_request_id: AtomicU32::new(1),
164            upstream_relay_bytes_sent: AtomicU64::new(0),
165            upstream_relay_bytes_received: AtomicU64::new(0),
166        }
167    }
168
169    pub fn next_id(&self) -> u64 {
170        self.next_client_id.fetch_add(1, Ordering::SeqCst)
171    }
172
173    pub fn next_request_id(&self) -> u32 {
174        self.next_request_id.fetch_add(1, Ordering::SeqCst)
175    }
176
177    pub fn note_upstream_relay_send(&self, bytes: usize) {
178        self.upstream_relay_bytes_sent
179            .fetch_add(bytes as u64, Ordering::Relaxed);
180    }
181
182    pub fn note_upstream_relay_receive(&self, bytes: usize) {
183        self.upstream_relay_bytes_received
184            .fetch_add(bytes as u64, Ordering::Relaxed);
185    }
186
187    pub fn upstream_relay_bandwidth(&self) -> (u64, u64) {
188        (
189            self.upstream_relay_bytes_sent.load(Ordering::Relaxed),
190            self.upstream_relay_bytes_received.load(Ordering::Relaxed),
191        )
192    }
193}
194
195#[derive(Clone)]
196pub struct AppState {
197    pub store: Arc<HashtreeStore>,
198    pub auth: Option<AuthCredentials>,
199    /// Unix timestamp when this daemon state was created.
200    pub daemon_started_at: u64,
201    pub peer_mode: crate::config::ServerMode,
202    pub hash_get_enabled: bool,
203    /// Whether HTTP cache misses should ask connected WebRTC peers before
204    /// falling back to upstream Blossom.
205    pub http_webrtc_fetch: bool,
206    /// WebRTC peer state for forwarding requests to connected P2P peers
207    pub webrtc_peers: Option<Arc<WebRTCState>>,
208    /// WebSocket relay state for /ws clients
209    pub ws_relay: Arc<WsRelayState>,
210    /// Maximum upload size in bytes for Blossom uploads (default: 5 MB)
211    pub max_upload_bytes: usize,
212    /// Allow anyone with valid Nostr auth to write (default: true)
213    /// When false, only allowed_pubkeys can write
214    pub public_writes: bool,
215    /// Require untrusted cached blob ingress to look like encrypted CHK blobs.
216    pub require_random_untrusted_ingest: bool,
217    /// Return from Blossom upload after validation while storage writes finish in
218    /// a bounded background queue.
219    pub optimistic_blossom_uploads: bool,
220    /// Background upload queue byte budget. Each queued body holds one permit per
221    /// byte until the storage write completes.
222    pub optimistic_upload_queue_bytes: usize,
223    pub optimistic_upload_queue: Arc<Semaphore>,
224    /// Pubkeys allowed to write (hex format, from config allowed_npubs)
225    pub allowed_pubkeys: HashSet<String>,
226    /// Upstream Blossom servers for cascade fetching
227    pub upstream_blossom: Vec<String>,
228    /// Social graph access control
229    pub social_graph: Option<Arc<socialgraph::SocialGraphAccessControl>>,
230    /// Social graph store handle for snapshot export
231    pub social_graph_store: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
232    /// Social graph root pubkey bytes for snapshot export
233    pub social_graph_root: Option<[u8; 32]>,
234    /// Allow public access to social graph snapshot endpoint
235    pub socialgraph_snapshot_public: bool,
236    /// Nostr relay state for /ws and WebRTC Nostr messages
237    pub nostr_relay: Option<Arc<NostrRelay>>,
238    /// Active upstream Nostr relays for HTTP resolver operations.
239    pub nostr_relay_urls: Vec<String>,
240    /// In-process cache for resolved mutable tree roots, keyed by npub/tree(+key)
241    pub tree_root_cache: Arc<StdMutex<HashMap<String, CachedTreeRootEntry>>>,
242    /// Shared in-flight blob fetches so concurrent misses only hit upstream once per hash
243    pub inflight_blob_fetches: Arc<Mutex<HashMap<String, SharedBlobFetch>>>,
244    /// Shared in-flight local blob reads so request bursts for the same hash
245    /// only spend one blocking storage read.
246    pub inflight_blob_reads: Arc<Mutex<HashMap<String, SharedBlobRead>>>,
247    /// Bounded hot cache for immutable blob bodies and metadata probes.
248    pub(super) blob_cache: Arc<BlobCache>,
249    /// Immutable directory listings keyed by CID
250    pub directory_listing_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<Vec<TreeEntry>>>>>,
251    /// Immutable resolved paths keyed by root CID + path
252    pub resolved_path_cache:
253        Arc<StdMutex<TimedLruCache<String, LookupResult<CachedResolvedPathEntry>>>>,
254    /// Immutable thumbnail alias resolutions keyed by root CID + alias path
255    pub thumbnail_path_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<String>>>>,
256    /// Immutable file sizes keyed by CID
257    pub cid_size_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<u64>>>>,
258}
259
260#[derive(Clone)]
261pub struct AuthCredentials {
262    pub username: String,
263    pub password: String,
264}
265
266/// Auth middleware - validates HTTP Basic Auth
267pub async fn auth_middleware(
268    State(state): State<AppState>,
269    request: Request<Body>,
270    next: Next,
271) -> Result<Response<Body>, StatusCode> {
272    // If auth is not enabled, allow request
273    let Some(auth) = &state.auth else {
274        return Ok(next.run(request).await);
275    };
276
277    // Check Authorization header
278    let auth_header = request
279        .headers()
280        .get(header::AUTHORIZATION)
281        .and_then(|v| v.to_str().ok());
282
283    let authorized = if let Some(header_value) = auth_header {
284        if let Some(credentials) = header_value.strip_prefix("Basic ") {
285            use base64::Engine;
286            let engine = base64::engine::general_purpose::STANDARD;
287            if let Ok(decoded) = engine.decode(credentials) {
288                if let Ok(decoded_str) = String::from_utf8(decoded) {
289                    let expected = format!("{}:{}", auth.username, auth.password);
290                    decoded_str == expected
291                } else {
292                    false
293                }
294            } else {
295                false
296            }
297        } else {
298            false
299        }
300    } else {
301        false
302    };
303
304    if authorized {
305        Ok(next.run(request).await)
306    } else {
307        Ok(Response::builder()
308            .status(StatusCode::UNAUTHORIZED)
309            .header(header::WWW_AUTHENTICATE, "Basic realm=\"hashtree\"")
310            .body(Body::from("Unauthorized"))
311            .unwrap())
312    }
313}