1use crate::close::Close;
3use crate::close::CloseCmd;
4use crate::config::{Settings, VerifiedUsersMode};
5use crate::conn;
6use crate::db;
7use crate::db::SubmittedEvent;
8use crate::error::{Error, Result};
9use crate::event::Event;
10use crate::event::EventCmd;
11use crate::event::EventWrapper;
12use crate::info::RelayInfo;
13use crate::nip05;
14use crate::notice::Notice;
15use crate::payment;
16use crate::payment::InvoiceInfo;
17use crate::payment::PaymentMessage;
18use crate::repo::NostrRepo;
19use crate::server::Error::CommandUnknownError;
20use crate::server::EventWrapper::{WrappedAuth, WrappedEvent};
21use crate::subscription::Subscription;
22use futures::SinkExt;
23use futures::StreamExt;
24use governor::{Jitter, Quota, RateLimiter};
25use http::header::HeaderMap;
26use hyper::body::to_bytes;
27use hyper::header::ACCEPT;
28use hyper::service::{make_service_fn, service_fn};
29use hyper::upgrade::Upgraded;
30use hyper::{
31 header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
32};
33use prometheus::IntCounterVec;
34use prometheus::IntGauge;
35use prometheus::{Encoder, Histogram, HistogramOpts, IntCounter, Opts, Registry, TextEncoder};
36use qrcode::render::svg;
37use qrcode::QrCode;
38use serde::{Deserialize, Serialize};
39use serde_json::json;
40use std::collections::HashMap;
41use std::convert::Infallible;
42use std::fs::File;
43use std::io::BufReader;
44use std::io::Read;
45use std::net::SocketAddr;
46use std::path::Path;
47use std::sync::atomic::Ordering;
48use std::sync::mpsc::Receiver as MpscReceiver;
49use std::sync::Arc;
50use std::time::Duration;
51use std::time::Instant;
52use tokio::runtime::Builder;
53use tokio::sync::broadcast::{self, Receiver, Sender};
54use tokio::sync::mpsc;
55use tokio::sync::oneshot;
56use tokio_tungstenite::WebSocketStream;
57use tracing::{debug, error, info, trace, warn};
58use tungstenite::error::CapacityError::MessageTooLong;
59use tungstenite::error::Error as WsError;
60use tungstenite::handshake;
61use tungstenite::protocol::Message;
62use tungstenite::protocol::WebSocketConfig;
63use nostr::key::FromPkStr;
64use nostr::key::Keys;
65
66#[allow(clippy::too_many_arguments)]
68async fn handle_web_request(
69 mut request: Request<Body>,
70 repo: Arc<dyn NostrRepo>,
71 settings: Settings,
72 remote_addr: SocketAddr,
73 broadcast: Sender<Event>,
74 event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
75 payment_tx: tokio::sync::broadcast::Sender<PaymentMessage>,
76 shutdown: Receiver<()>,
77 favicon: Option<Vec<u8>>,
78 registry: Registry,
79 metrics: NostrMetrics,
80) -> Result<Response<Body>, Infallible> {
81 match (
82 request.uri().path(),
83 request.headers().contains_key(header::UPGRADE),
84 ) {
85 ("/", true) => {
87 trace!("websocket with upgrade request");
88 let response = match handshake::server::create_response_with_body(&request, || {
90 Body::empty()
91 }) {
92 Ok(response) => {
93 tokio::spawn(async move {
96 match upgrade::on(&mut request).await {
98 Ok(upgraded) => {
100 let config = WebSocketConfig {
102 max_send_queue: Some(1024),
103 max_message_size: settings.limits.max_ws_message_bytes,
104 max_frame_size: settings.limits.max_ws_frame_bytes,
105 ..Default::default()
106 };
107 let ws_stream = WebSocketStream::from_raw_socket(
109 upgraded,
112 tokio_tungstenite::tungstenite::protocol::Role::Server,
113 Some(config),
114 )
115 .await;
116 let origin = get_header_string("origin", request.headers());
117 let user_agent = get_header_string("user-agent", request.headers());
118 let header_ip = settings
120 .network
121 .remote_ip_header
122 .as_ref()
123 .and_then(|x| get_header_string(x, request.headers()));
124 let remote_ip =
126 header_ip.unwrap_or_else(|| remote_addr.ip().to_string());
127 let client_info = ClientInfo {
128 remote_ip,
129 user_agent,
130 origin,
131 };
132 tokio::spawn(nostr_server(
134 repo,
135 client_info,
136 settings,
137 ws_stream,
138 broadcast,
139 event_tx,
140 shutdown,
141 metrics,
142 ));
143 }
144 Err(e) => println!(
146 "error when trying to upgrade connection \
147 from address {remote_addr} to websocket connection. \
148 Error is: {e}",
149 ),
150 }
151 });
152 response
154 }
155 Err(error) => {
156 warn!("websocket response failed");
157 let mut res =
158 Response::new(Body::from(format!("Failed to create websocket: {error}")));
159 *res.status_mut() = StatusCode::BAD_REQUEST;
160 return Ok(res);
161 }
162 };
163 Ok::<_, Infallible>(response)
164 }
165 ("/", false) => {
167 let accept_header = &request.headers().get(ACCEPT);
170 if let Some(media_types) = accept_header {
172 if let Ok(mt_str) = media_types.to_str() {
173 if mt_str.contains("application/nostr+json") {
174 debug!("Responding to server info request");
176 let rinfo = RelayInfo::from(settings);
177 let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap());
178 return Ok(Response::builder()
179 .status(200)
180 .header("Content-Type", "application/nostr+json")
181 .header("Access-Control-Allow-Origin", "*")
182 .body(b)
183 .unwrap());
184 }
185 }
186 }
187
188 if settings.pay_to_relay.enabled {
190 return Ok(Response::builder()
191 .status(StatusCode::TEMPORARY_REDIRECT)
192 .header("location", "/join")
193 .body(Body::empty())
194 .unwrap());
195 }
196
197 Ok(Response::builder()
198 .status(200)
199 .header("Content-Type", "text/plain")
200 .body(Body::from("Please use a Nostr client to connect."))
201 .unwrap())
202 }
203 ("/metrics", false) => {
204 let mut buffer = vec![];
205 let encoder = TextEncoder::new();
206 let metric_families = registry.gather();
207 encoder.encode(&metric_families, &mut buffer).unwrap();
208
209 Ok(Response::builder()
210 .status(StatusCode::OK)
211 .header("Content-Type", "text/plain")
212 .body(Body::from(buffer))
213 .unwrap())
214 }
215 ("/favicon.ico", false) => {
216 if let Some(favicon_bytes) = favicon {
217 info!("returning favicon");
218 Ok(Response::builder()
219 .status(StatusCode::OK)
220 .header("Content-Type", "image/x-icon")
221 .header("Cache-Control", "public, max-age=2419200")
223 .body(Body::from(favicon_bytes))
224 .unwrap())
225 } else {
226 Ok(Response::builder()
227 .status(StatusCode::NOT_FOUND)
228 .body(Body::from(""))
229 .unwrap())
230 }
231 }
232 ("/lnbits", false) => {
234 let callback: payment::lnbits::LNBitsCallback =
235 serde_json::from_slice(&to_bytes(request.into_body()).await.unwrap()).unwrap();
236 debug!("LNBits callback: {callback:?}");
237
238 if let Err(e) = payment_tx.send(PaymentMessage::InvoicePaid(callback.payment_hash)) {
239 warn!("Could not send invoice update: {}", e);
240 return Ok(Response::builder()
241 .status(StatusCode::INTERNAL_SERVER_ERROR)
242 .body(Body::from("Error processing callback"))
243 .unwrap());
244 }
245
246 Ok(Response::builder()
247 .status(StatusCode::OK)
248 .body(Body::from("ok"))
249 .unwrap())
250 }
251 ("/terms", false) => Ok(Response::builder()
253 .status(200)
254 .header("Content-Type", "text/plain")
255 .body(Body::from(settings.pay_to_relay.terms_message))
256 .unwrap()),
257 ("/join", false) => {
259 if !settings.pay_to_relay.sign_ups {
261 return Ok(Response::builder()
262 .status(401)
263 .header("Content-Type", "text/plain")
264 .body(Body::from("Sorry, joining is not allowed at the moment"))
265 .unwrap());
266 }
267
268 let html = r#"
269<!doctype HTML>
270<head>
271 <meta charset="UTF-8">
272 <style>
273 body {
274 display: flex;
275 flex-direction: column;
276 align-items: center;
277 text-align: center;
278 font-family: Arial, sans-serif;
279 background-color: #6320a7;
280 color: white;
281 }
282
283 .container {
284 display: flex;
285 justify-content: center;
286 align-items: center;
287 height: 400px;
288 }
289
290 a {
291 color: pink;
292 }
293
294 input[type="text"] {
295 width: 100%;
296 max-width: 500px;
297 box-sizing: border-box;
298 overflow-x: auto;
299 white-space: nowrap;
300 }
301 </style>
302</head>
303<body>
304 <div style="width:75%;">
305 <h1>Enter your pubkey</h1>
306 <form action="/invoice" onsubmit="return checkForm(this);">
307 <input type="text" name="pubkey" id="pubkey-input"><br><br>
308 <input type="checkbox" id="terms" required>
309 <label for="terms">I agree to the <a href="/terms">terms and conditions</a></label><br><br>
310 <button type="submit">Submit</button>
311 </form>
312 <button id="get-public-key-btn">Get Public Key</button>
313 </div>
314 <script>
315 function checkForm(form) {
316 if (!form.terms.checked) {
317 alert("Please agree to the terms and conditions");
318 return false;
319 }
320 return true;
321 }
322
323 const pubkeyInput = document.getElementById('pubkey-input');
324 const getPublicKeyBtn = document.getElementById('get-public-key-btn');
325 getPublicKeyBtn.addEventListener('click', async function() {
326 try {
327 const publicKey = await window.nostr.getPublicKey();
328 pubkeyInput.value = publicKey;
329 } catch (error) {
330 console.error(error);
331 }
332 });
333 </script>
334</body>
335</html>
336 "#;
337 Ok(Response::builder()
338 .status(StatusCode::OK)
339 .body(Body::from(html))
340 .unwrap())
341 }
342 ("/invoice", false) => {
344 if !settings.pay_to_relay.sign_ups {
346 return Ok(Response::builder()
347 .status(401)
348 .header("Content-Type", "text/plain")
349 .body(Body::from("Sorry, joining is not allowed at the moment"))
350 .unwrap());
351 }
352
353 let pubkey = get_pubkey(request);
355
356 if pubkey.is_none() {
358 return Ok(Response::builder()
359 .status(404)
360 .header("location", "/join")
361 .body(Body::empty())
362 .unwrap());
363 }
364
365 let pubkey = pubkey.unwrap();
367 let key = Keys::from_pk_str(&pubkey);
368 if key.is_err() {
369 return Ok(Response::builder()
370 .status(401)
371 .header("Content-Type", "text/plain")
372 .body(Body::from("Looks like your key is invalid"))
373 .unwrap());
374 }
375
376 let payment_message;
378 if let Ok((admission_status, _)) = repo.get_account_balance(&key.unwrap()).await {
379 if admission_status {
380 return Ok(Response::builder()
381 .status(StatusCode::OK)
382 .body(Body::from("Already admitted"))
383 .unwrap());
384 } else {
385 payment_message = PaymentMessage::CheckAccount(pubkey.clone());
386 }
387 } else {
388 payment_message = PaymentMessage::NewAccount(pubkey.clone());
389 }
390
391 if payment_tx.send(payment_message).is_err() {
393 warn!("Could not send payment tx");
394 return Ok(Response::builder()
395 .status(501)
396 .header("Content-Type", "text/plain")
397 .body(Body::from("Sorry, something went wrong"))
398 .unwrap());
399 }
400
401 let mut invoice_info: Option<InvoiceInfo> = None;
403 while let Ok(msg) = payment_tx.subscribe().recv().await {
404 match msg {
405 PaymentMessage::Invoice(m_pubkey, m_invoice_info) => {
406 if m_pubkey == pubkey.clone() {
407 invoice_info = Some(m_invoice_info);
408 break;
409 }
410 }
411 PaymentMessage::AccountAdmitted(m_pubkey) => {
412 if m_pubkey == pubkey.clone() {
413 return Ok(Response::builder()
414 .status(StatusCode::OK)
415 .body(Body::from("Already admitted"))
416 .unwrap());
417 }
418 }
419 _ => (),
420 }
421 }
422
423 if invoice_info.is_none() {
425 return Ok(Response::builder()
426 .status(StatusCode::INTERNAL_SERVER_ERROR)
427 .body(Body::from("Sorry, could not get invoice"))
428 .unwrap());
429 }
430
431 let invoice_info = invoice_info.unwrap();
433
434 let qr_code: String;
435 if let Ok(code) = QrCode::new(invoice_info.bolt11.as_bytes()) {
436 qr_code = code
437 .render()
438 .min_dimensions(200, 200)
439 .dark_color(svg::Color("#800000"))
440 .light_color(svg::Color("#ffff80"))
441 .build();
442 } else {
443 qr_code = "Could not render image".to_string();
444 }
445
446 let html_result = format!(
447 r#"
448<!DOCTYPE html>
449<html>
450 <head>
451 <meta charset="UTF-8">
452 <style>
453 body {{
454 display: flex;
455 flex-direction: column;
456 align-items: center;
457 text-align: center;
458 font-family: Arial, sans-serif;
459 background-color: #6320a7 ;
460 color: white;
461 }}
462 #copy-button {{
463 background-color: #bb5f0d ;
464 color: white;
465 padding: 10px 20px;
466 border-radius: 5px;
467 border: none;
468 cursor: pointer;
469 }}
470 #copy-button:hover {{
471 background-color: #8f29f4;
472 }}
473 .container {{
474 display: flex;
475 justify-content: center;
476 align-items: center;
477 height: 400px;
478 }}
479 a {{
480 color: pink;
481 }}
482 </style>
483 </head>
484 <body>
485 <div style="width:75%;">
486 <h3>
487 To use this relay, an admission fee of {} sats is required. By paying the fee, you agree to the <a href='terms'>terms</a>.
488 </h3>
489 </div>
490 <div>
491 <div style="max-height: 300px;">
492 {}
493 </div>
494 </div>
495 <div>
496 <div style="width: 75%;">
497 <p style="overflow-wrap: break-word; width: 500px;">{}</p>
498 <button id="copy-button">Copy</button>
499 </div>
500 <div>
501 <p> This page will not refresh </p>
502 <p> Verify admission <a href=/account?pubkey={}>here</a> once you have paid</p>
503 </div>
504 </div>
505 </body>
506</html>
507
508
509<script>
510 const copyButton = document.getElementById("copy-button");
511 if (navigator.clipboard) {{
512 copyButton.addEventListener("click", function() {{
513 const textToCopy = "{}";
514 navigator.clipboard.writeText(textToCopy).then(function() {{
515 console.log("Text copied to clipboard");
516 }}, function(err) {{
517 console.error("Could not copy text: ", err);
518 }});
519 }});
520 }} else {{
521 copyButton.style.display = "none";
522 console.warn("Clipboard API is not supported in this browser");
523 }}
524</script>
525"#,
526 settings.pay_to_relay.admission_cost,
527 qr_code,
528 invoice_info.bolt11,
529 pubkey,
530 invoice_info.bolt11
531 );
532
533 Ok(Response::builder()
534 .status(StatusCode::OK)
535 .body(Body::from(html_result))
536 .unwrap())
537 }
538 ("/account", false) => {
539 if !settings.pay_to_relay.enabled {
541 return Ok(Response::builder()
542 .status(401)
543 .header("Content-Type", "text/plain")
544 .body(Body::from("This relay is not paid"))
545 .unwrap());
546 }
547
548 let pubkey = get_pubkey(request);
550
551 if pubkey.is_none() {
553 return Ok(Response::builder()
554 .status(404)
555 .header("location", "/join")
556 .body(Body::empty())
557 .unwrap());
558 }
559
560 let pubkey = pubkey.unwrap();
562 let key = Keys::from_pk_str(&pubkey);
563 if key.is_err() {
564 return Ok(Response::builder()
565 .status(401)
566 .header("Content-Type", "text/plain")
567 .body(Body::from("Looks like your key is invalid"))
568 .unwrap());
569 }
570
571 let text =
573 if let Ok((admission_status, _)) = repo.get_account_balance(&key.unwrap()).await {
574 if admission_status {
575 r#"<span style="color: green;">is</span>"#
576 } else {
577 r#"<span style="color: red;">is not</span>"#
578 }
579 } else {
580 "Could not get admission status"
581 };
582
583 let html_result = format!(
584 r#"
585 <!DOCTYPE html>
586<html>
587 <head>
588 <meta charset="UTF-8">
589 <style>
590 body {{
591 display: flex;
592 flex-direction: column;
593 align-items: center;
594 text-align: center;
595 font-family: Arial, sans-serif;
596 background-color: #6320a7;
597 color: white;
598 height: 100vh;
599 }}
600 </style>
601 </head>
602 <body>
603 <div>
604 <h5>{} {} admitted</h5>
605 </div>
606 </body>
607</html>
608
609
610 "#,
611 pubkey, text
612 );
613
614 Ok(Response::builder()
615 .status(StatusCode::OK)
616 .body(Body::from(html_result))
617 .unwrap())
618 }
619 (_, _) => {
621 Ok(Response::builder()
623 .status(StatusCode::NOT_FOUND)
624 .body(Body::from("Nothing here."))
625 .unwrap())
626 }
627 }
628}
629
630fn get_pubkey(request: Request<Body>) -> Option<String> {
632 let query = request.uri().query().unwrap_or("").to_string();
633
634 query.split('&').fold(None, |acc, pair| {
636 let mut parts = pair.splitn(2, '=');
637 let key = parts.next();
638 let value = parts.next();
639 if key == Some("pubkey") {
640 return value.map(|s| s.to_owned());
641 }
642 acc
643 })
644}
645
646fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> {
647 headers
648 .get(header)
649 .and_then(|x| x.to_str().ok().map(std::string::ToString::to_string))
650}
651
652async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
654 let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
655 .expect("could not define signal");
656 loop {
657 tokio::select! {
658 _ = shutdown_signal.recv() => {
659 info!("Shutting down webserver as requested");
660 break;
662 },
663 _ = tokio::signal::ctrl_c() => {
664 info!("Shutting down webserver due to SIGINT");
665 break;
666 },
667 _ = term_signal.recv() => {
668 info!("Shutting down webserver due to SIGTERM");
669 break;
670 },
671 }
672 }
673}
674
675fn create_metrics() -> (Registry, NostrMetrics) {
676 let registry = Registry::new();
678
679 let query_sub = Histogram::with_opts(HistogramOpts::new(
680 "nostr_query_seconds",
681 "Subscription response times",
682 ))
683 .unwrap();
684 let query_db = Histogram::with_opts(HistogramOpts::new(
685 "nostr_filter_seconds",
686 "Filter SQL query times",
687 ))
688 .unwrap();
689 let write_events = Histogram::with_opts(HistogramOpts::new(
690 "nostr_events_write_seconds",
691 "Event writing response times",
692 ))
693 .unwrap();
694 let sent_events = IntCounterVec::new(
695 Opts::new("nostr_events_sent_total", "Events sent to clients"),
696 vec!["source"].as_slice(),
697 )
698 .unwrap();
699 let connections =
700 IntCounter::with_opts(Opts::new("nostr_connections_total", "New connections")).unwrap();
701 let db_connections = IntGauge::with_opts(Opts::new(
702 "nostr_db_connections",
703 "Active database connections",
704 ))
705 .unwrap();
706 let query_aborts = IntCounterVec::new(
707 Opts::new("nostr_query_abort_total", "Aborted queries"),
708 vec!["reason"].as_slice(),
709 )
710 .unwrap();
711 let cmd_req = IntCounter::with_opts(Opts::new("nostr_cmd_req_total", "REQ commands")).unwrap();
712 let cmd_event =
713 IntCounter::with_opts(Opts::new("nostr_cmd_event_total", "EVENT commands")).unwrap();
714 let cmd_close =
715 IntCounter::with_opts(Opts::new("nostr_cmd_close_total", "CLOSE commands")).unwrap();
716 let cmd_auth =
717 IntCounter::with_opts(Opts::new("nostr_cmd_auth_total", "AUTH commands")).unwrap();
718 let disconnects = IntCounterVec::new(
719 Opts::new("nostr_disconnects_total", "Client disconnects"),
720 vec!["reason"].as_slice(),
721 )
722 .unwrap();
723 registry.register(Box::new(query_sub.clone())).unwrap();
724 registry.register(Box::new(query_db.clone())).unwrap();
725 registry.register(Box::new(write_events.clone())).unwrap();
726 registry.register(Box::new(sent_events.clone())).unwrap();
727 registry.register(Box::new(connections.clone())).unwrap();
728 registry.register(Box::new(db_connections.clone())).unwrap();
729 registry.register(Box::new(query_aborts.clone())).unwrap();
730 registry.register(Box::new(cmd_req.clone())).unwrap();
731 registry.register(Box::new(cmd_event.clone())).unwrap();
732 registry.register(Box::new(cmd_close.clone())).unwrap();
733 registry.register(Box::new(cmd_auth.clone())).unwrap();
734 registry.register(Box::new(disconnects.clone())).unwrap();
735 let metrics = NostrMetrics {
736 query_sub,
737 query_db,
738 write_events,
739 sent_events,
740 connections,
741 db_connections,
742 disconnects,
743 query_aborts,
744 cmd_req,
745 cmd_event,
746 cmd_close,
747 cmd_auth,
748 };
749 (registry, metrics)
750}
751
752fn file_bytes(path: &str) -> Result<Vec<u8>> {
753 let f = File::open(path)?;
754 let mut reader = BufReader::new(f);
755 let mut buffer = Vec::new();
756 reader.read_to_end(&mut buffer)?;
758 Ok(buffer)
759}
760
761pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> {
763 trace!("Config: {:?}", settings);
764 if !Path::new(&settings.database.data_directory).is_dir() {
766 error!("Database directory does not exist");
767 return Err(Error::DatabaseDirError);
768 }
769 let addr = format!(
770 "{}:{}",
771 settings.network.address.trim(),
772 settings.network.port
773 );
774 let socket_addr = addr.parse().expect("listening address not valid");
775 if let Some(addr_whitelist) = &settings.authorization.pubkey_whitelist {
777 info!(
778 "Event publishing restricted to {} pubkey(s)",
779 addr_whitelist.len()
780 );
781 }
782 if settings.verified_users.is_active() {
784 info!(
785 "NIP-05 user verification mode:{:?}",
786 settings.verified_users.mode
787 );
788 if let Some(d) = settings.verified_users.verify_update_duration() {
789 info!("NIP-05 check user verification every: {:?}", d);
790 }
791 if let Some(d) = settings.verified_users.verify_expiration_duration() {
792 info!("NIP-05 user verification expires after: {:?}", d);
793 }
794 if let Some(wl) = &settings.verified_users.domain_whitelist {
795 info!("NIP-05 domain whitelist: {:?}", wl);
796 }
797 if let Some(bl) = &settings.verified_users.domain_blacklist {
798 info!("NIP-05 domain blacklist: {:?}", bl);
799 }
800 }
801 let rt = Builder::new_multi_thread()
803 .enable_all()
804 .thread_name_fn(|| {
805 static ATOMIC_ID: std::sync::atomic::AtomicUsize =
807 std::sync::atomic::AtomicUsize::new(0);
808 let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
809 format!("tokio-ws-{id}")
810 })
811 .max_blocking_threads(settings.limits.max_blocking_threads)
813 .on_thread_start(|| {
814 trace!("started new thread: {:?}", std::thread::current().name());
815 })
816 .on_thread_stop(|| {
817 trace!("stopped thread: {:?}", std::thread::current().name());
818 })
819 .build()
820 .unwrap();
821 rt.block_on(async {
823 let broadcast_buffer_limit = settings.limits.broadcast_buffer;
824 let persist_buffer_limit = settings.limits.event_persist_buffer;
825 let verified_users_active = settings.verified_users.is_active();
826 let settings = settings.clone();
827 info!("listening on: {}", socket_addr);
828 let (bcast_tx, _) = broadcast::channel::<Event>(broadcast_buffer_limit);
833 let (event_tx, event_rx) = mpsc::channel::<SubmittedEvent>(persist_buffer_limit);
836 let (invoke_shutdown, shutdown_listen) = broadcast::channel::<()>(1);
839 let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);
848
849 let (payment_tx, payment_rx) = broadcast::channel::<PaymentMessage>(4096);
850
851 let (registry, metrics) = create_metrics();
852
853 let repo = db::build_repo(&settings, metrics.clone()).await;
855 tokio::task::spawn(db::db_writer(
859 repo.clone(),
860 settings.clone(),
861 event_rx,
862 bcast_tx.clone(),
863 metadata_tx.clone(),
864 payment_tx.clone(),
865 shutdown_listen,
866 ));
867 info!("db writer created");
868
869 if settings.verified_users.mode != VerifiedUsersMode::Disabled {
871 let verifier_opt = nip05::Verifier::new(
872 repo.clone(),
873 metadata_rx,
874 bcast_tx.clone(),
875 settings.clone(),
876 );
877 if let Ok(mut v) = verifier_opt {
878 if verified_users_active {
879 tokio::task::spawn(async move {
880 info!("starting up NIP-05 verifier...");
881 v.run().await;
882 });
883 }
884 }
885 }
886
887 if settings.pay_to_relay.enabled {
889 let payment_opt = payment::Payment::new(
890 repo.clone(),
891 payment_tx.clone(),
892 payment_rx,
893 bcast_tx.clone(),
894 settings.clone(),
895 );
896 if let Ok(mut p) = payment_opt {
897 tokio::task::spawn(async move {
898 info!("starting payment process ...");
899 p.run().await;
900 });
901 }
902 }
903
904 let controlled_shutdown = invoke_shutdown.clone();
906 tokio::spawn(async move {
907 info!("control message listener started");
908 match shutdown_rx.recv() {
909 Ok(()) => {
910 info!("control message requesting shutdown");
911 controlled_shutdown.send(()).ok();
912 }
913 Err(std::sync::mpsc::RecvError) => {
914 trace!("shutdown requestor is disconnected (this is normal)");
915 }
916 };
917 });
918 let ctrl_c_shutdown = invoke_shutdown.clone();
920 let webserver_shutdown_listen = invoke_shutdown.subscribe();
922
923 tokio::spawn(async move {
924 tokio::signal::ctrl_c().await.unwrap();
925 info!("shutting down due to SIGINT (main)");
926 ctrl_c_shutdown.send(()).ok();
927 });
928 let favicon = settings.info.favicon.as_ref().and_then(|x| {
934 info!("reading favicon...");
935 file_bytes(x).ok()
936 });
937
938 let make_svc = make_service_fn(|conn: &AddrStream| {
941 let repo = repo.clone();
942 let remote_addr = conn.remote_addr();
943 let bcast = bcast_tx.clone();
944 let event = event_tx.clone();
945 let payment_tx = payment_tx.clone();
946 let stop = invoke_shutdown.clone();
947 let settings = settings.clone();
948 let favicon = favicon.clone();
949 let registry = registry.clone();
950 let metrics = metrics.clone();
951 async move {
952 Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
954 handle_web_request(
955 request,
956 repo.clone(),
957 settings.clone(),
958 remote_addr,
959 bcast.clone(),
960 event.clone(),
961 payment_tx.clone(),
962 stop.subscribe(),
963 favicon.clone(),
964 registry.clone(),
965 metrics.clone(),
966 )
967 }))
968 }
969 });
970 let server = Server::bind(&socket_addr)
971 .serve(make_svc)
972 .with_graceful_shutdown(ctrl_c_or_signal(webserver_shutdown_listen));
973 if let Err(e) = server.await {
975 eprintln!("server error: {e}");
976 }
977 });
978 Ok(())
979}
980
981#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Debug)]
983#[serde(untagged)]
984pub enum NostrMessage {
985 EventMsg(EventCmd),
987 SubMsg(Subscription),
989 CloseMsg(CloseCmd),
991}
992
993fn convert_to_msg(msg: &str, max_bytes: Option<usize>) -> Result<NostrMessage> {
995 let parsed_res: Result<NostrMessage> =
996 serde_json::from_str(msg).map_err(std::convert::Into::into);
997 match parsed_res {
998 Ok(m) => {
999 if let NostrMessage::SubMsg(_) = m {
1000 trace!("REQ: {:?}", msg);
1002 };
1003 if let NostrMessage::EventMsg(_) = m {
1004 if let Some(max_size) = max_bytes {
1005 if msg.len() > max_size && max_size > 0 {
1007 return Err(Error::EventMaxLengthError(msg.len()));
1008 }
1009 }
1010 }
1011 Ok(m)
1012 }
1013 Err(e) => {
1014 trace!("proto parse error: {:?}", e);
1015 trace!("parse error on message: {:?}", msg.trim());
1016 Err(Error::ProtoParseError)
1017 }
1018 }
1019}
1020
1021fn make_notice_message(notice: &Notice) -> Message {
1023 let json = match notice {
1024 Notice::Message(ref msg) => json!(["NOTICE", msg]),
1025 Notice::EventResult(ref res) => json!(["OK", res.id, res.status.to_bool(), res.msg]),
1026 Notice::AuthChallenge(ref challenge) => json!(["AUTH", challenge]),
1027 };
1028
1029 Message::text(json.to_string())
1030}
1031
1032fn allowed_to_send(event_str: &String, conn: &conn::ClientConn, settings: &Settings) -> bool {
1033 if settings.authorization.nip42_dms {
1035 match serde_json::from_str::<Event>(event_str) {
1036 Ok(event) => {
1037 if event.kind == 4 {
1038 match (conn.auth_pubkey(), event.tag_values_by_name("p").first()) {
1039 (Some(auth_pubkey), Some(recipient_pubkey)) => {
1040 recipient_pubkey == auth_pubkey || &event.pubkey == auth_pubkey
1041 },
1042 (_, _) => {
1043 false
1044 },
1045 }
1046 } else {
1047 true
1048 }
1049 },
1050 Err(_) => false
1051 }
1052 } else {
1053 true
1054 }
1055}
1056
1057struct ClientInfo {
1058 remote_ip: String,
1059 user_agent: Option<String>,
1060 origin: Option<String>,
1061}
1062
1063#[allow(clippy::too_many_arguments)]
1066async fn nostr_server(
1067 repo: Arc<dyn NostrRepo>,
1068 client_info: ClientInfo,
1069 settings: Settings,
1070 mut ws_stream: WebSocketStream<Upgraded>,
1071 broadcast: Sender<Event>,
1072 event_tx: mpsc::Sender<SubmittedEvent>,
1073 mut shutdown: Receiver<()>,
1074 metrics: NostrMetrics,
1075) {
1076 let orig_start = Instant::now();
1078 let mut bcast_rx = broadcast.subscribe();
1080 let mut conn = conn::ClientConn::new(client_info.remote_ip);
1082 let mut sub_lim_opt = None;
1084 let jitter = Jitter::up_to(Duration::from_millis(100));
1086 let sub_per_min_setting = settings.limits.subscriptions_per_min;
1087 if let Some(sub_per_min) = sub_per_min_setting {
1088 if sub_per_min > 0 {
1089 trace!("Rate limits for sub creation ({}/min)", sub_per_min);
1090 let quota_time = core::num::NonZeroU32::new(sub_per_min).unwrap();
1091 let quota = Quota::per_minute(quota_time);
1092 sub_lim_opt = Some(RateLimiter::direct(quota));
1093 }
1094 }
1095 let cid = conn.get_client_prefix();
1097 let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(20_000);
1102 let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(128);
1104
1105 let mut last_message_time = Instant::now();
1107
1108 let default_ping_dur = Duration::from_secs(settings.network.ping_interval_seconds.into());
1110
1111 let max_quiet_time = Duration::from_secs(60 * 20);
1113
1114 let start = tokio::time::Instant::now() + default_ping_dur;
1115 let mut ping_interval = tokio::time::interval_at(start, default_ping_dur);
1116
1117 let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
1121 let mut client_published_event_count: usize = 0;
1124 let mut client_received_event_count: usize = 0;
1125
1126 let unspec = "<unspecified>".to_string();
1127 info!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
1128 let origin = client_info.origin.as_ref().unwrap_or(&unspec);
1129 let user_agent = client_info.user_agent.as_ref().unwrap_or(&unspec);
1130 info!(
1131 "cid: {}, origin: {:?}, user-agent: {:?}",
1132 cid, origin, user_agent
1133 );
1134
1135 metrics.connections.inc();
1137
1138 if settings.authorization.nip42_auth {
1139 conn.generate_auth_challenge();
1140 if let Some(challenge) = conn.auth_challenge() {
1141 ws_stream
1142 .send(make_notice_message(&Notice::AuthChallenge(
1143 challenge.to_string(),
1144 )))
1145 .await
1146 .ok();
1147 }
1148 }
1149
1150 loop {
1151 tokio::select! {
1152 _ = shutdown.recv() => {
1153 metrics.disconnects.with_label_values(&["shutdown"]).inc();
1154 info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed());
1155 break;
1157 },
1158 _ = ping_interval.tick() => {
1159 if last_message_time.elapsed() > max_quiet_time {
1162 debug!("ending connection due to lack of client ping response");
1163 metrics.disconnects.with_label_values(&["timeout"]).inc();
1164 break;
1165 }
1166 ws_stream.send(Message::Ping(Vec::new())).await.ok();
1168 },
1169 Some(notice_msg) = notice_rx.recv() => {
1170 ws_stream.send(make_notice_message(¬ice_msg)).await.ok();
1171 },
1172 Some(query_result) = query_rx.recv() => {
1173 let subesc = query_result.sub_id.replace('"', "");
1175 if query_result.event == "EOSE" {
1176 let send_str = format!("[\"EOSE\",\"{subesc}\"]");
1177 ws_stream.send(Message::Text(send_str)).await.ok();
1178 } else if allowed_to_send(&query_result.event, &conn, &settings) {
1179 metrics.sent_events.with_label_values(&["db"]).inc();
1180 client_received_event_count += 1;
1181 let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
1183 ws_stream.send(Message::Text(send_str)).await.ok();
1184 }
1185 },
1186 Ok(global_event) = bcast_rx.recv() => {
1188 for (s, sub) in conn.subscriptions() {
1191 if !sub.interested_in_event(&global_event) {
1192 continue;
1193 }
1194 if let Ok(event_str) = serde_json::to_string(&global_event) {
1197 if allowed_to_send(&event_str, &conn, &settings) {
1198 trace!("sub match for client: {}, sub: {:?}, event: {:?}",
1200 cid, s,
1201 global_event.get_event_id_prefix());
1202 let subesc = s.replace('"', "");
1203 metrics.sent_events.with_label_values(&["realtime"]).inc();
1204 ws_stream.send(Message::Text(format!("[\"EVENT\",\"{subesc}\",{event_str}]"))).await.ok();
1205 }
1206 } else {
1207 warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
1208 }
1209 }
1210 },
1211 ws_next = ws_stream.next() => {
1212 last_message_time = Instant::now();
1214 let nostr_msg = match ws_next {
1216 Some(Ok(Message::Text(m))) => {
1217 convert_to_msg(&m,settings.limits.max_event_bytes)
1218 },
1219 Some(Ok(Message::Binary(_))) => {
1220 ws_stream.send(
1221 make_notice_message(&Notice::message("binary messages are not accepted".into()))).await.ok();
1222 continue;
1223 },
1224 Some(Ok(Message::Ping(_) | Message::Pong(_))) => {
1225 continue;
1228 },
1229 Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
1230 ws_stream.send(
1231 make_notice_message(&Notice::message(format!("message too large ({size} > {max_size})")))).await.ok();
1232 continue;
1233 },
1234 None |
1235 Some(Ok(Message::Close(_)) |
1236 Err(WsError::AlreadyClosed | WsError::ConnectionClosed |
1237 WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
1238 => {
1239 debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip());
1240 metrics.disconnects.with_label_values(&["normal"]).inc();
1241 break;
1242 },
1243 Some(Err(WsError::Io(e))) => {
1244 warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e);
1246 metrics.disconnects.with_label_values(&["error"]).inc();
1247
1248 break;
1249 }
1250 x => {
1251 info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
1253 metrics.disconnects.with_label_values(&["error"]).inc();
1254
1255 break;
1256 }
1257 };
1258
1259 match nostr_msg {
1261 Ok(NostrMessage::EventMsg(ec)) => {
1262 let evid = ec.event_id().to_owned();
1265 let parsed : Result<EventWrapper> = Result::<EventWrapper>::from(ec);
1266 metrics.cmd_event.inc();
1267 match parsed {
1268 Ok(WrappedEvent(e)) => {
1269 metrics.cmd_event.inc();
1270 let id_prefix:String = e.id.chars().take(8).collect();
1271 debug!("successfully parsed/validated event: {:?} (cid: {}, kind: {})", id_prefix, cid, e.kind);
1272 if e.is_expired() {
1274 let notice = Notice::invalid(e.id, "The event has already expired");
1275 ws_stream.send(make_notice_message(¬ice)).await.ok();
1276 } else if e.is_valid_timestamp(settings.options.reject_future_seconds) {
1278 let auth_pubkey = conn.auth_pubkey().and_then(|pubkey| hex::decode(pubkey).ok());
1280 let submit_event = SubmittedEvent {
1281 event: e.clone(),
1282 notice_tx: notice_tx.clone(),
1283 source_ip: conn.ip().to_string(),
1284 origin: client_info.origin.clone(),
1285 user_agent: client_info.user_agent.clone(),
1286 auth_pubkey };
1287 event_tx.send(submit_event).await.ok();
1288 client_published_event_count += 1;
1289 } else {
1290 info!("client: {} sent a far future-dated event", cid);
1291 if let Some(fut_sec) = settings.options.reject_future_seconds {
1292 let msg = format!("The event created_at field is out of the acceptable range (+{fut_sec}sec) for this relay.");
1293 let notice = Notice::invalid(e.id, &msg);
1294 ws_stream.send(make_notice_message(¬ice)).await.ok();
1295 }
1296 }
1297 },
1298 Ok(WrappedAuth(event)) => {
1299 metrics.cmd_auth.inc();
1300 if settings.authorization.nip42_auth {
1301 let id_prefix:String = event.id.chars().take(8).collect();
1302 debug!("successfully parsed auth: {:?} (cid: {})", id_prefix, cid);
1303 match &settings.info.relay_url {
1304 None => {
1305 error!("AUTH command received, but relay_url is not set in the config file (cid: {})", cid);
1306 },
1307 Some(relay) => {
1308 match conn.authenticate(&event, relay) {
1309 Ok(_) => {
1310 let pubkey = match conn.auth_pubkey() {
1311 Some(k) => k.chars().take(8).collect(),
1312 None => "<unspecified>".to_string(),
1313 };
1314 info!("client is authenticated: (cid: {}, pubkey: {:?})", cid, pubkey);
1315 },
1316 Err(e) => {
1317 info!("authentication error: {} (cid: {})", e, cid);
1318 ws_stream.send(make_notice_message(&Notice::restricted(event.id, format!("authentication error: {e}").as_str()))).await.ok();
1319 },
1320 }
1321 }
1322 }
1323 } else {
1324 let e = CommandUnknownError;
1325 info!("client sent an invalid event (cid: {})", cid);
1326 ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
1327 }
1328 },
1329 Err(e) => {
1330 metrics.cmd_event.inc();
1331 info!("client sent an invalid event (cid: {})", cid);
1332 ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
1333 }
1334 }
1335 },
1336 Ok(NostrMessage::SubMsg(s)) => {
1337 debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id);
1338 if conn.has_subscription(&s) {
1345 info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
1346 } else {
1347 metrics.cmd_req.inc();
1348 if let Some(ref lim) = sub_lim_opt {
1349 lim.until_ready_with_jitter(jitter).await;
1350 }
1351 let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
1352 match conn.subscribe(s.clone()) {
1353 Ok(()) => {
1354 if let Some(previous_query) = running_queries.insert(s.id.clone(), abandon_query_tx) {
1356 previous_query.send(()).ok();
1357 }
1358 if s.needs_historical_events() {
1359 repo.query_subscription(s, cid.clone(), query_tx.clone(), abandon_query_rx).await.ok();
1361 }
1362 },
1363 Err(e) => {
1364 info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
1365 ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {e}")))).await.ok();
1366 }
1367 }
1368 }
1369 },
1370 Ok(NostrMessage::CloseMsg(cc)) => {
1371 let parsed : Result<Close> = Result::<Close>::from(cc);
1373 if let Ok(c) = parsed {
1374 metrics.cmd_close.inc();
1375 let stop_tx = running_queries.remove(&c.id);
1378 if let Some(tx) = stop_tx {
1379 tx.send(()).ok();
1380 }
1381 conn.unsubscribe(&c);
1384 } else {
1385 info!("invalid command ignored");
1386 ws_stream.send(make_notice_message(&Notice::message("could not parse command".into()))).await.ok();
1387 }
1388 },
1389 Err(Error::ConnError) => {
1390 debug!("got connection close/error, disconnecting cid: {}, ip: {:?}",cid, conn.ip());
1391 break;
1392 }
1393 Err(Error::EventMaxLengthError(s)) => {
1394 info!("client sent command larger ({} bytes) than max size (cid: {})", s, cid);
1395 ws_stream.send(make_notice_message(&Notice::message("event exceeded max size".into()))).await.ok();
1396 },
1397 Err(Error::ProtoParseError) => {
1398 info!("client sent command that could not be parsed (cid: {})", cid);
1399 ws_stream.send(make_notice_message(&Notice::message("could not parse command".into()))).await.ok();
1400 },
1401 Err(e) => {
1402 info!("got non-fatal error from client (cid: {}, error: {:?}", cid, e);
1403 },
1404 }
1405 },
1406 }
1407 }
1408 for (_, stop_tx) in running_queries {
1410 stop_tx.send(()).ok();
1411 }
1412 info!(
1413 "stopping client connection (cid: {}, ip: {:?}, sent: {} events, recv: {} events, connected: {:?})",
1414 cid,
1415 conn.ip(),
1416 client_published_event_count,
1417 client_received_event_count,
1418 orig_start.elapsed()
1419 );
1420}
1421
1422#[derive(Clone)]
1423pub struct NostrMetrics {
1424 pub query_sub: Histogram, pub query_db: Histogram, pub db_connections: IntGauge, pub write_events: Histogram, pub sent_events: IntCounterVec, pub connections: IntCounter, pub disconnects: IntCounterVec, pub query_aborts: IntCounterVec, pub cmd_req: IntCounter, pub cmd_event: IntCounter, pub cmd_close: IntCounter, pub cmd_auth: IntCounter, }