1use crate::blob_cache::BlobCache;
2use crate::fips_transport::DaemonFipsTransport;
3use crate::nostr_relay::NostrRelay;
4use crate::socialgraph;
5use crate::storage::HashtreeStore;
6use crate::webrtc::{PeerRootEvent, WebRTCState};
7use axum::{
8 body::Body,
9 extract::ws::Message,
10 extract::State,
11 http::{header, Request, Response, StatusCode},
12 middleware::Next,
13};
14use futures::future::{BoxFuture, Shared};
15use hashtree_core::{Cid, LinkType, TreeEntry};
16use lru::LruCache;
17use std::collections::{HashMap, HashSet};
18use std::hash::Hash;
19use std::num::NonZeroUsize;
20use std::sync::{
21 atomic::{AtomicU32, AtomicU64, Ordering},
22 Arc, Mutex as StdMutex,
23};
24use std::time::{Duration, Instant};
25use tokio::{
26 sync::{mpsc, watch, Mutex, Semaphore},
27 task::JoinHandle,
28};
29
/// Maximum number of entries held by a cache built with `new_lookup_cache`.
const LOOKUP_CACHE_CAPACITY: usize = 4096;
/// Time-to-live reported by `LookupResult::ttl` for a `Hit` (five minutes).
const LOOKUP_CACHE_HIT_TTL: Duration = Duration::from_secs(5 * 60);
/// Time-to-live reported by `LookupResult::ttl` for a `Miss` (one second).
const LOOKUP_CACHE_MISS_TTL: Duration = Duration::from_secs(1);
33
34#[derive(Debug, Clone)]
35pub enum LookupResult<T> {
36 Hit(T),
37 Miss,
38}
39
40impl<T> LookupResult<T> {
41 pub fn from_option(value: Option<T>) -> Self {
42 match value {
43 Some(value) => Self::Hit(value),
44 None => Self::Miss,
45 }
46 }
47
48 pub fn into_option(self) -> Option<T> {
49 match self {
50 Self::Hit(value) => Some(value),
51 Self::Miss => None,
52 }
53 }
54
55 pub fn ttl(&self) -> Duration {
56 match self {
57 Self::Hit(_) => LOOKUP_CACHE_HIT_TTL,
58 Self::Miss => LOOKUP_CACHE_MISS_TTL,
59 }
60 }
61}
62
63pub struct TimedLruCache<K, V> {
64 cache: LruCache<K, TimedValue<V>>,
65}
66
67#[derive(Clone)]
68struct TimedValue<V> {
69 value: V,
70 expires_at: Instant,
71}
72
73impl<K: Eq + Hash, V: Clone> TimedLruCache<K, V> {
74 pub fn new(capacity: usize) -> Self {
75 Self {
76 cache: LruCache::new(NonZeroUsize::new(capacity.max(1)).unwrap()),
77 }
78 }
79
80 pub fn get_cloned(&mut self, key: &K) -> Option<V> {
81 let now = Instant::now();
82 if let Some(entry) = self.cache.get(key) {
83 if entry.expires_at > now {
84 return Some(entry.value.clone());
85 }
86 }
87 self.cache.pop(key);
88 None
89 }
90
91 pub fn put(&mut self, key: K, value: V, ttl: Duration) {
92 self.cache.put(
93 key,
94 TimedValue {
95 value,
96 expires_at: Instant::now() + ttl,
97 },
98 );
99 }
100}
101
102pub fn new_lookup_cache<K: Eq + Hash, V: Clone>() -> TimedLruCache<K, V> {
103 TimedLruCache::new(LOOKUP_CACHE_CAPACITY)
104}
105
106#[derive(Debug, Clone)]
107pub struct CachedResolvedPathEntry {
108 pub cid: Cid,
109 pub link_type: LinkType,
110}
111
112#[derive(Debug, Clone)]
113pub struct CachedTreeRootEntry {
114 pub cid: Cid,
115 pub source: &'static str,
116 pub root_event: Option<PeerRootEvent>,
117 pub cached_at: Instant,
118}
119
120pub type SharedBlobFetch = Shared<BoxFuture<'static, bool>>;
121pub type SharedBlobRead = Shared<BoxFuture<'static, Result<Option<Vec<u8>>, String>>>;
122
/// Wire protocol associated with a WebSocket client.
///
/// Stored per client in `WsRelayState::client_protocols` and on each
/// `PendingRequest` as `origin_protocol`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WsProtocol {
    /// Hashtree messages in JSON form (per the variant name).
    HashtreeJson,
    /// Hashtree messages in MessagePack form (per the variant name).
    HashtreeMsgpack,
    /// Protocol not known.
    Unknown,
}
129
130pub struct PendingRequest {
131 pub origin_id: u64,
132 pub hash: String,
133 pub found: bool,
134 pub origin_protocol: WsProtocol,
135}
136
137pub struct UpstreamNostrSubscription {
138 pub close_tx: watch::Sender<bool>,
139 pub tasks: Vec<JoinHandle<()>>,
140}
141
142pub struct WsRelayState {
143 pub clients: Mutex<HashMap<u64, mpsc::UnboundedSender<Message>>>,
144 pub pending: Mutex<HashMap<(u64, u32), PendingRequest>>,
145 pub client_protocols: Mutex<HashMap<u64, WsProtocol>>,
146 pub upstream_nostr_subscriptions: Mutex<HashMap<(u64, String), UpstreamNostrSubscription>>,
147 pub upstream_seen_events: Mutex<HashMap<(u64, String), HashSet<String>>>,
148 pub upstream_pending_eose: Mutex<HashMap<(u64, String), usize>>,
149 pub next_client_id: AtomicU64,
150 pub next_request_id: AtomicU32,
151 pub upstream_relay_bytes_sent: AtomicU64,
152 pub upstream_relay_bytes_received: AtomicU64,
153}
154
155impl WsRelayState {
156 pub fn new() -> Self {
157 Self {
158 clients: Mutex::new(HashMap::new()),
159 pending: Mutex::new(HashMap::new()),
160 client_protocols: Mutex::new(HashMap::new()),
161 upstream_nostr_subscriptions: Mutex::new(HashMap::new()),
162 upstream_seen_events: Mutex::new(HashMap::new()),
163 upstream_pending_eose: Mutex::new(HashMap::new()),
164 next_client_id: AtomicU64::new(1),
165 next_request_id: AtomicU32::new(1),
166 upstream_relay_bytes_sent: AtomicU64::new(0),
167 upstream_relay_bytes_received: AtomicU64::new(0),
168 }
169 }
170
171 pub fn next_id(&self) -> u64 {
172 self.next_client_id.fetch_add(1, Ordering::SeqCst)
173 }
174
175 pub fn next_request_id(&self) -> u32 {
176 self.next_request_id.fetch_add(1, Ordering::SeqCst)
177 }
178
179 pub fn note_upstream_relay_send(&self, bytes: usize) {
180 self.upstream_relay_bytes_sent
181 .fetch_add(bytes as u64, Ordering::Relaxed);
182 }
183
184 pub fn note_upstream_relay_receive(&self, bytes: usize) {
185 self.upstream_relay_bytes_received
186 .fetch_add(bytes as u64, Ordering::Relaxed);
187 }
188
189 pub fn upstream_relay_bandwidth(&self) -> (u64, u64) {
190 (
191 self.upstream_relay_bytes_sent.load(Ordering::Relaxed),
192 self.upstream_relay_bytes_received.load(Ordering::Relaxed),
193 )
194 }
195}
196
197#[derive(Clone)]
198pub struct AppState {
199 pub store: Arc<HashtreeStore>,
200 pub auth: Option<AuthCredentials>,
201 pub daemon_started_at: u64,
203 pub peer_mode: crate::config::ServerMode,
204 pub hash_get_enabled: bool,
205 pub http_webrtc_fetch: bool,
208 pub webrtc_peers: Option<Arc<WebRTCState>>,
210 pub fips_transport: Option<Arc<DaemonFipsTransport>>,
212 pub fetch_from_fips_peers: bool,
213 pub ws_relay: Arc<WsRelayState>,
215 pub max_upload_bytes: usize,
217 pub public_writes: bool,
220 pub public_plaintext_reads: bool,
223 pub require_random_untrusted_ingest: bool,
225 pub optimistic_blossom_uploads: bool,
228 pub optimistic_upload_queue_bytes: usize,
231 pub optimistic_upload_queue: Arc<Semaphore>,
232 pub allowed_pubkeys: HashSet<String>,
234 pub upstream_blossom: Vec<String>,
236 pub social_graph: Option<Arc<socialgraph::SocialGraphAccessControl>>,
238 pub social_graph_store: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
240 pub social_graph_root: Option<[u8; 32]>,
242 pub socialgraph_snapshot_public: bool,
244 pub nostr_relay: Option<Arc<NostrRelay>>,
246 pub nostr_relay_urls: Vec<String>,
248 pub tree_root_cache: Arc<StdMutex<HashMap<String, CachedTreeRootEntry>>>,
250 pub inflight_blob_fetches: Arc<Mutex<HashMap<String, SharedBlobFetch>>>,
252 pub inflight_blob_reads: Arc<Mutex<HashMap<String, SharedBlobRead>>>,
255 pub(super) blob_cache: Arc<BlobCache>,
257 pub directory_listing_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<Vec<TreeEntry>>>>>,
259 pub resolved_path_cache:
261 Arc<StdMutex<TimedLruCache<String, LookupResult<CachedResolvedPathEntry>>>>,
262 pub thumbnail_path_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<String>>>>,
264 pub cid_size_cache: Arc<StdMutex<TimedLruCache<String, LookupResult<u64>>>>,
266}
267
/// Username/password pair that `auth_middleware` checks incoming requests against.
///
/// `Debug` is deliberately not derived, so the password cannot end up in logs via `{:?}`.
#[derive(Clone)]
pub struct AuthCredentials {
    /// Expected user name.
    pub username: String,
    /// Expected password, in plain text.
    pub password: String,
}
273
274pub async fn auth_middleware(
276 State(state): State<AppState>,
277 request: Request<Body>,
278 next: Next,
279) -> Result<Response<Body>, StatusCode> {
280 let Some(auth) = &state.auth else {
282 return Ok(next.run(request).await);
283 };
284
285 let auth_header = request
287 .headers()
288 .get(header::AUTHORIZATION)
289 .and_then(|v| v.to_str().ok());
290
291 let authorized = if let Some(header_value) = auth_header {
292 if let Some(credentials) = header_value.strip_prefix("Basic ") {
293 use base64::Engine;
294 let engine = base64::engine::general_purpose::STANDARD;
295 if let Ok(decoded) = engine.decode(credentials) {
296 if let Ok(decoded_str) = String::from_utf8(decoded) {
297 let expected = format!("{}:{}", auth.username, auth.password);
298 decoded_str == expected
299 } else {
300 false
301 }
302 } else {
303 false
304 }
305 } else {
306 false
307 }
308 } else {
309 false
310 };
311
312 if authorized {
313 Ok(next.run(request).await)
314 } else {
315 Ok(Response::builder()
316 .status(StatusCode::UNAUTHORIZED)
317 .header(header::WWW_AUTHENTICATE, "Basic realm=\"hashtree\"")
318 .body(Body::from("Unauthorized"))
319 .unwrap())
320 }
321}