1use crate::close::Close;
3use crate::close::CloseCmd;
4use crate::config::{Settings, VerifiedUsersMode};
5use crate::conn;
6use crate::db;
7use crate::db::SubmittedEvent;
8use crate::error::{Error, Result};
9use crate::event::Event;
10use crate::event::EventCmd;
11use crate::event::EventWrapper;
12use crate::info::RelayInfo;
13use crate::nip05;
14use crate::notice::Notice;
15use crate::payment;
16use crate::payment::InvoiceInfo;
17use crate::payment::PaymentMessage;
18use crate::repo::NostrRepo;
19use crate::server::Error::CommandUnknownError;
20use crate::server::EventWrapper::{WrappedAuth, WrappedEvent};
21use crate::subscription::Subscription;
22use futures::SinkExt;
23use futures::StreamExt;
24use governor::{Jitter, Quota, RateLimiter};
25use http::header::HeaderMap;
26use hyper::body::to_bytes;
27use hyper::header::ACCEPT;
28use hyper::service::{make_service_fn, service_fn};
29use hyper::upgrade::Upgraded;
30use hyper::{
31    header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
32};
33use prometheus::IntCounterVec;
34use prometheus::IntGauge;
35use prometheus::{Encoder, Histogram, HistogramOpts, IntCounter, Opts, Registry, TextEncoder};
36use qrcode::render::svg;
37use qrcode::QrCode;
38use serde::{Deserialize, Serialize};
39use serde_json::json;
40use std::collections::HashMap;
41use std::convert::Infallible;
42use std::fs::File;
43use std::io::BufReader;
44use std::io::Read;
45use std::net::SocketAddr;
46use std::path::Path;
47use std::sync::atomic::Ordering;
48use std::sync::mpsc::Receiver as MpscReceiver;
49use std::sync::Arc;
50use std::time::Duration;
51use std::time::Instant;
52use tokio::runtime::Builder;
53use tokio::sync::broadcast::{self, Receiver, Sender};
54use tokio::sync::mpsc;
55use tokio::sync::oneshot;
56use tokio_tungstenite::WebSocketStream;
57use tracing::{debug, error, info, trace, warn};
58use tungstenite::error::CapacityError::MessageTooLong;
59use tungstenite::error::Error as WsError;
60use tungstenite::handshake;
61use tungstenite::protocol::Message;
62use tungstenite::protocol::WebSocketConfig;
63use nostr::key::FromPkStr;
64use nostr::key::Keys;
65
66#[allow(clippy::too_many_arguments)]
68async fn handle_web_request(
69    mut request: Request<Body>,
70    repo: Arc<dyn NostrRepo>,
71    settings: Settings,
72    remote_addr: SocketAddr,
73    broadcast: Sender<Event>,
74    event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
75    payment_tx: tokio::sync::broadcast::Sender<PaymentMessage>,
76    shutdown: Receiver<()>,
77    favicon: Option<Vec<u8>>,
78    registry: Registry,
79    metrics: NostrMetrics,
80) -> Result<Response<Body>, Infallible> {
81    match (
82        request.uri().path(),
83        request.headers().contains_key(header::UPGRADE),
84    ) {
85        ("/", true) => {
87            trace!("websocket with upgrade request");
88            let response = match handshake::server::create_response_with_body(&request, || {
90                Body::empty()
91            }) {
92                Ok(response) => {
93                    tokio::spawn(async move {
96                        match upgrade::on(&mut request).await {
98                            Ok(upgraded) => {
100                                let config = WebSocketConfig {
102                                    max_send_queue: Some(1024),
103                                    max_message_size: settings.limits.max_ws_message_bytes,
104                                    max_frame_size: settings.limits.max_ws_frame_bytes,
105                                    ..Default::default()
106                                };
107                                let ws_stream = WebSocketStream::from_raw_socket(
109                                    upgraded,
112                                    tokio_tungstenite::tungstenite::protocol::Role::Server,
113                                    Some(config),
114                                )
115                                .await;
116                                let origin = get_header_string("origin", request.headers());
117                                let user_agent = get_header_string("user-agent", request.headers());
118                                let header_ip = settings
120                                    .network
121                                    .remote_ip_header
122                                    .as_ref()
123                                    .and_then(|x| get_header_string(x, request.headers()));
124                                let remote_ip =
126                                    header_ip.unwrap_or_else(|| remote_addr.ip().to_string());
127                                let client_info = ClientInfo {
128                                    remote_ip,
129                                    user_agent,
130                                    origin,
131                                };
132                                tokio::spawn(nostr_server(
134                                    repo,
135                                    client_info,
136                                    settings,
137                                    ws_stream,
138                                    broadcast,
139                                    event_tx,
140                                    shutdown,
141                                    metrics,
142                                ));
143                            }
144                            Err(e) => println!(
146                                "error when trying to upgrade connection \
147                                 from address {remote_addr} to websocket connection. \
148                                 Error is: {e}",
149                            ),
150                        }
151                    });
152                    response
154                }
155                Err(error) => {
156                    warn!("websocket response failed");
157                    let mut res =
158                        Response::new(Body::from(format!("Failed to create websocket: {error}")));
159                    *res.status_mut() = StatusCode::BAD_REQUEST;
160                    return Ok(res);
161                }
162            };
163            Ok::<_, Infallible>(response)
164        }
165        ("/", false) => {
167            let accept_header = &request.headers().get(ACCEPT);
170            if let Some(media_types) = accept_header {
172                if let Ok(mt_str) = media_types.to_str() {
173                    if mt_str.contains("application/nostr+json") {
174                        debug!("Responding to server info request");
176                        let rinfo = RelayInfo::from(settings);
177                        let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap());
178                        return Ok(Response::builder()
179                            .status(200)
180                            .header("Content-Type", "application/nostr+json")
181                            .header("Access-Control-Allow-Origin", "*")
182                            .body(b)
183                            .unwrap());
184                    }
185                }
186            }
187
188            if settings.pay_to_relay.enabled {
190                return Ok(Response::builder()
191                    .status(StatusCode::TEMPORARY_REDIRECT)
192                    .header("location", "/join")
193                    .body(Body::empty())
194                    .unwrap());
195            }
196
197            Ok(Response::builder()
198                .status(200)
199                .header("Content-Type", "text/plain")
200                .body(Body::from("Please use a Nostr client to connect."))
201                .unwrap())
202        }
203        ("/metrics", false) => {
204            let mut buffer = vec![];
205            let encoder = TextEncoder::new();
206            let metric_families = registry.gather();
207            encoder.encode(&metric_families, &mut buffer).unwrap();
208
209            Ok(Response::builder()
210                .status(StatusCode::OK)
211                .header("Content-Type", "text/plain")
212                .body(Body::from(buffer))
213                .unwrap())
214        }
215        ("/favicon.ico", false) => {
216            if let Some(favicon_bytes) = favicon {
217                info!("returning favicon");
218                Ok(Response::builder()
219                    .status(StatusCode::OK)
220                    .header("Content-Type", "image/x-icon")
221                    .header("Cache-Control", "public, max-age=2419200")
223                    .body(Body::from(favicon_bytes))
224                    .unwrap())
225            } else {
226                Ok(Response::builder()
227                    .status(StatusCode::NOT_FOUND)
228                    .body(Body::from(""))
229                    .unwrap())
230            }
231        }
232        ("/lnbits", false) => {
234            let callback: payment::lnbits::LNBitsCallback =
235                serde_json::from_slice(&to_bytes(request.into_body()).await.unwrap()).unwrap();
236            debug!("LNBits callback: {callback:?}");
237
238            if let Err(e) = payment_tx.send(PaymentMessage::InvoicePaid(callback.payment_hash)) {
239                warn!("Could not send invoice update: {}", e);
240                return Ok(Response::builder()
241                    .status(StatusCode::INTERNAL_SERVER_ERROR)
242                    .body(Body::from("Error processing callback"))
243                    .unwrap());
244            }
245
246            Ok(Response::builder()
247                .status(StatusCode::OK)
248                .body(Body::from("ok"))
249                .unwrap())
250        }
251        ("/terms", false) => Ok(Response::builder()
253            .status(200)
254            .header("Content-Type", "text/plain")
255            .body(Body::from(settings.pay_to_relay.terms_message))
256            .unwrap()),
257        ("/join", false) => {
259            if !settings.pay_to_relay.sign_ups {
261                return Ok(Response::builder()
262                    .status(401)
263                    .header("Content-Type", "text/plain")
264                    .body(Body::from("Sorry, joining is not allowed at the moment"))
265                    .unwrap());
266            }
267
268            let html = r#"
269<!doctype HTML>
270<head>
271  <meta charset="UTF-8">
272  <style>
273    body {
274      display: flex;
275      flex-direction: column;
276      align-items: center;
277      text-align: center;
278      font-family: Arial, sans-serif;
279      background-color: #6320a7;
280      color: white;
281    }
282
283    .container {
284      display: flex;
285      justify-content: center;
286      align-items: center;
287      height: 400px;
288    }
289
290    a {
291      color: pink;
292    }
293
294    input[type="text"] {
295        width: 100%;
296        max-width: 500px;
297        box-sizing: border-box;
298        overflow-x: auto;
299        white-space: nowrap;
300    }
301  </style>
302</head>
303<body>
304  <div style="width:75%;">
305    <h1>Enter your pubkey</h1>
306    <form action="/invoice" onsubmit="return checkForm(this);">
307      <input type="text" name="pubkey" id="pubkey-input"><br><br>
308      <input type="checkbox" id="terms" required>
309      <label for="terms">I agree to the <a href="/terms">terms and conditions</a></label><br><br>
310      <button type="submit">Submit</button>
311    </form>
312    <button id="get-public-key-btn">Get Public Key</button>
313  </div>
314  <script>
315    function checkForm(form) {
316      if (!form.terms.checked) {
317        alert("Please agree to the terms and conditions");
318        return false;
319      }
320      return true;
321    }
322
323    const pubkeyInput = document.getElementById('pubkey-input');
324      const getPublicKeyBtn = document.getElementById('get-public-key-btn');
325      getPublicKeyBtn.addEventListener('click', async function() {
326        try {
327          const publicKey = await window.nostr.getPublicKey();
328          pubkeyInput.value = publicKey;
329        } catch (error) {
330          console.error(error);
331        }
332      });
333  </script>
334</body>
335</html>
336            "#;
337            Ok(Response::builder()
338                .status(StatusCode::OK)
339                .body(Body::from(html))
340                .unwrap())
341        }
342        ("/invoice", false) => {
344            if !settings.pay_to_relay.sign_ups {
346                return Ok(Response::builder()
347                    .status(401)
348                    .header("Content-Type", "text/plain")
349                    .body(Body::from("Sorry, joining is not allowed at the moment"))
350                    .unwrap());
351            }
352
353            let pubkey = get_pubkey(request);
355
356            if pubkey.is_none() {
358                return Ok(Response::builder()
359                    .status(404)
360                    .header("location", "/join")
361                    .body(Body::empty())
362                    .unwrap());
363            }
364
365            let pubkey = pubkey.unwrap();
367            let key = Keys::from_pk_str(&pubkey);
368            if key.is_err() {
369                return Ok(Response::builder()
370                    .status(401)
371                    .header("Content-Type", "text/plain")
372                    .body(Body::from("Looks like your key is invalid"))
373                    .unwrap());
374            }
375
376            let payment_message;
378            if let Ok((admission_status, _)) = repo.get_account_balance(&key.unwrap()).await {
379                if admission_status {
380                    return Ok(Response::builder()
381                        .status(StatusCode::OK)
382                        .body(Body::from("Already admitted"))
383                        .unwrap());
384                } else {
385                    payment_message = PaymentMessage::CheckAccount(pubkey.clone());
386                }
387            } else {
388                payment_message = PaymentMessage::NewAccount(pubkey.clone());
389            }
390
391            if payment_tx.send(payment_message).is_err() {
393                warn!("Could not send payment tx");
394                return Ok(Response::builder()
395                    .status(501)
396                    .header("Content-Type", "text/plain")
397                    .body(Body::from("Sorry, something went wrong"))
398                    .unwrap());
399            }
400
401            let mut invoice_info: Option<InvoiceInfo> = None;
403            while let Ok(msg) = payment_tx.subscribe().recv().await {
404                match msg {
405                    PaymentMessage::Invoice(m_pubkey, m_invoice_info) => {
406                        if m_pubkey == pubkey.clone() {
407                            invoice_info = Some(m_invoice_info);
408                            break;
409                        }
410                    }
411                    PaymentMessage::AccountAdmitted(m_pubkey) => {
412                        if m_pubkey == pubkey.clone() {
413                            return Ok(Response::builder()
414                                .status(StatusCode::OK)
415                                .body(Body::from("Already admitted"))
416                                .unwrap());
417                        }
418                    }
419                    _ => (),
420                }
421            }
422
423            if invoice_info.is_none() {
425                return Ok(Response::builder()
426                    .status(StatusCode::INTERNAL_SERVER_ERROR)
427                    .body(Body::from("Sorry, could not get invoice"))
428                    .unwrap());
429            }
430
431            let invoice_info = invoice_info.unwrap();
433
434            let qr_code: String;
435            if let Ok(code) = QrCode::new(invoice_info.bolt11.as_bytes()) {
436                qr_code = code
437                    .render()
438                    .min_dimensions(200, 200)
439                    .dark_color(svg::Color("#800000"))
440                    .light_color(svg::Color("#ffff80"))
441                    .build();
442            } else {
443                qr_code = "Could not render image".to_string();
444            }
445
446            let html_result = format!(
447                r#"
448<!DOCTYPE html>
449<html>
450  <head>
451  <meta charset="UTF-8">
452    <style>
453      body {{
454        display: flex;
455        flex-direction: column;
456        align-items: center;
457        text-align: center;
458        font-family: Arial, sans-serif;
459        background-color:  #6320a7 ;
460        color: white;
461      }}
462      #copy-button {{
463        background-color: #bb5f0d ;
464        color: white;
465        padding: 10px 20px;
466        border-radius: 5px;
467        border: none;
468        cursor: pointer;
469      }}
470      #copy-button:hover {{
471        background-color: #8f29f4;
472      }}
473    .container {{
474        display: flex;
475        justify-content: center;
476        align-items: center;
477        height: 400px;
478    }}
479    a {{
480        color: pink;
481    }}
482    </style>
483  </head>
484  <body>
485    <div style="width:75%;">
486      <h3>
487        To use this relay, an admission fee of {} sats is required. By paying the fee, you agree to the <a href='terms'>terms</a>.
488      </h3>
489    </div>
490    <div>
491        <div style="max-height: 300px;">
492            {}
493        </div>
494    </div>
495    <div>
496    <div style="width: 75%;">
497        <p style="overflow-wrap: break-word; width: 500px;">{}</p>
498        <button id="copy-button">Copy</button>
499    </div>
500    <div>
501        <p> This page will not refresh </p>
502        <p> Verify admission <a href=/account?pubkey={}>here</a> once you have paid</p>
503    </div>
504    </div>
505  </body>
506</html>
507
508
509<script>
510  const copyButton = document.getElementById("copy-button");
511  if (navigator.clipboard) {{
512    copyButton.addEventListener("click", function() {{
513      const textToCopy = "{}";
514      navigator.clipboard.writeText(textToCopy).then(function() {{
515        console.log("Text copied to clipboard");
516      }}, function(err) {{
517        console.error("Could not copy text: ", err);
518      }});
519    }});
520  }} else {{
521    copyButton.style.display = "none";
522    console.warn("Clipboard API is not supported in this browser");
523  }}
524</script>
525"#,
526                settings.pay_to_relay.admission_cost,
527                qr_code,
528                invoice_info.bolt11,
529                pubkey,
530                invoice_info.bolt11
531            );
532
533            Ok(Response::builder()
534                .status(StatusCode::OK)
535                .body(Body::from(html_result))
536                .unwrap())
537        }
538        ("/account", false) => {
539            if !settings.pay_to_relay.enabled {
541                return Ok(Response::builder()
542                    .status(401)
543                    .header("Content-Type", "text/plain")
544                    .body(Body::from("This relay is not paid"))
545                    .unwrap());
546            }
547
548            let pubkey = get_pubkey(request);
550
551            if pubkey.is_none() {
553                return Ok(Response::builder()
554                    .status(404)
555                    .header("location", "/join")
556                    .body(Body::empty())
557                    .unwrap());
558            }
559
560            let pubkey = pubkey.unwrap();
562            let key = Keys::from_pk_str(&pubkey);
563            if key.is_err() {
564                return Ok(Response::builder()
565                    .status(401)
566                    .header("Content-Type", "text/plain")
567                    .body(Body::from("Looks like your key is invalid"))
568                    .unwrap());
569            }
570
571            let text =
573                if let Ok((admission_status, _)) = repo.get_account_balance(&key.unwrap()).await {
574                    if admission_status {
575                        r#"<span style="color: green;">is</span>"#
576                    } else {
577                        r#"<span style="color: red;">is not</span>"#
578                    }
579                } else {
580                    "Could not get admission status"
581                };
582
583            let html_result = format!(
584                r#"
585            <!DOCTYPE html>
586<html>
587  <head>
588    <meta charset="UTF-8">
589    <style>
590      body {{
591        display: flex;
592        flex-direction: column;
593        align-items: center;
594        text-align: center;
595        font-family: Arial, sans-serif;
596        background-color: #6320a7;
597        color: white;
598        height: 100vh;
599      }}
600    </style>
601  </head>
602  <body>
603    <div>
604      <h5>{} {} admitted</h5>
605    </div>
606  </body>
607</html>
608
609
610            "#,
611                pubkey, text
612            );
613
614            Ok(Response::builder()
615                .status(StatusCode::OK)
616                .body(Body::from(html_result))
617                .unwrap())
618        }
619        (_, _) => {
621            Ok(Response::builder()
623                .status(StatusCode::NOT_FOUND)
624                .body(Body::from("Nothing here."))
625                .unwrap())
626        }
627    }
628}
629
630fn get_pubkey(request: Request<Body>) -> Option<String> {
632    let query = request.uri().query().unwrap_or("").to_string();
633
634    query.split('&').fold(None, |acc, pair| {
636        let mut parts = pair.splitn(2, '=');
637        let key = parts.next();
638        let value = parts.next();
639        if key == Some("pubkey") {
640            return value.map(|s| s.to_owned());
641        }
642        acc
643    })
644}
645
646fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> {
647    headers
648        .get(header)
649        .and_then(|x| x.to_str().ok().map(std::string::ToString::to_string))
650}
651
652async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
654    let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
655        .expect("could not define signal");
656    loop {
657        tokio::select! {
658            _ = shutdown_signal.recv() => {
659                info!("Shutting down webserver as requested");
660                break;
662            },
663            _ = tokio::signal::ctrl_c() => {
664                info!("Shutting down webserver due to SIGINT");
665                break;
666            },
667            _ = term_signal.recv() => {
668                info!("Shutting down webserver due to SIGTERM");
669                break;
670            },
671        }
672    }
673}
674
675fn create_metrics() -> (Registry, NostrMetrics) {
676    let registry = Registry::new();
678
679    let query_sub = Histogram::with_opts(HistogramOpts::new(
680        "nostr_query_seconds",
681        "Subscription response times",
682    ))
683    .unwrap();
684    let query_db = Histogram::with_opts(HistogramOpts::new(
685        "nostr_filter_seconds",
686        "Filter SQL query times",
687    ))
688    .unwrap();
689    let write_events = Histogram::with_opts(HistogramOpts::new(
690        "nostr_events_write_seconds",
691        "Event writing response times",
692    ))
693    .unwrap();
694    let sent_events = IntCounterVec::new(
695        Opts::new("nostr_events_sent_total", "Events sent to clients"),
696        vec!["source"].as_slice(),
697    )
698    .unwrap();
699    let connections =
700        IntCounter::with_opts(Opts::new("nostr_connections_total", "New connections")).unwrap();
701    let db_connections = IntGauge::with_opts(Opts::new(
702        "nostr_db_connections",
703        "Active database connections",
704    ))
705    .unwrap();
706    let query_aborts = IntCounterVec::new(
707        Opts::new("nostr_query_abort_total", "Aborted queries"),
708        vec!["reason"].as_slice(),
709    )
710    .unwrap();
711    let cmd_req = IntCounter::with_opts(Opts::new("nostr_cmd_req_total", "REQ commands")).unwrap();
712    let cmd_event =
713        IntCounter::with_opts(Opts::new("nostr_cmd_event_total", "EVENT commands")).unwrap();
714    let cmd_close =
715        IntCounter::with_opts(Opts::new("nostr_cmd_close_total", "CLOSE commands")).unwrap();
716    let cmd_auth =
717        IntCounter::with_opts(Opts::new("nostr_cmd_auth_total", "AUTH commands")).unwrap();
718    let disconnects = IntCounterVec::new(
719        Opts::new("nostr_disconnects_total", "Client disconnects"),
720        vec!["reason"].as_slice(),
721    )
722    .unwrap();
723    registry.register(Box::new(query_sub.clone())).unwrap();
724    registry.register(Box::new(query_db.clone())).unwrap();
725    registry.register(Box::new(write_events.clone())).unwrap();
726    registry.register(Box::new(sent_events.clone())).unwrap();
727    registry.register(Box::new(connections.clone())).unwrap();
728    registry.register(Box::new(db_connections.clone())).unwrap();
729    registry.register(Box::new(query_aborts.clone())).unwrap();
730    registry.register(Box::new(cmd_req.clone())).unwrap();
731    registry.register(Box::new(cmd_event.clone())).unwrap();
732    registry.register(Box::new(cmd_close.clone())).unwrap();
733    registry.register(Box::new(cmd_auth.clone())).unwrap();
734    registry.register(Box::new(disconnects.clone())).unwrap();
735    let metrics = NostrMetrics {
736        query_sub,
737        query_db,
738        write_events,
739        sent_events,
740        connections,
741        db_connections,
742        disconnects,
743        query_aborts,
744        cmd_req,
745        cmd_event,
746        cmd_close,
747        cmd_auth,
748    };
749    (registry, metrics)
750}
751
/// Read the entire file at `path` into memory.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read.
fn file_bytes(path: &str) -> Result<Vec<u8>> {
    let mut contents = Vec::new();
    BufReader::new(File::open(path)?).read_to_end(&mut contents)?;
    Ok(contents)
}
760
761pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> {
763    trace!("Config: {:?}", settings);
764    if !Path::new(&settings.database.data_directory).is_dir() {
766        error!("Database directory does not exist");
767        return Err(Error::DatabaseDirError);
768    }
769    let addr = format!(
770        "{}:{}",
771        settings.network.address.trim(),
772        settings.network.port
773    );
774    let socket_addr = addr.parse().expect("listening address not valid");
775    if let Some(addr_whitelist) = &settings.authorization.pubkey_whitelist {
777        info!(
778            "Event publishing restricted to {} pubkey(s)",
779            addr_whitelist.len()
780        );
781    }
782    if settings.verified_users.is_active() {
784        info!(
785            "NIP-05 user verification mode:{:?}",
786            settings.verified_users.mode
787        );
788        if let Some(d) = settings.verified_users.verify_update_duration() {
789            info!("NIP-05 check user verification every:   {:?}", d);
790        }
791        if let Some(d) = settings.verified_users.verify_expiration_duration() {
792            info!("NIP-05 user verification expires after: {:?}", d);
793        }
794        if let Some(wl) = &settings.verified_users.domain_whitelist {
795            info!("NIP-05 domain whitelist: {:?}", wl);
796        }
797        if let Some(bl) = &settings.verified_users.domain_blacklist {
798            info!("NIP-05 domain blacklist: {:?}", bl);
799        }
800    }
801    let rt = Builder::new_multi_thread()
803        .enable_all()
804        .thread_name_fn(|| {
805            static ATOMIC_ID: std::sync::atomic::AtomicUsize =
807                std::sync::atomic::AtomicUsize::new(0);
808            let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
809            format!("tokio-ws-{id}")
810        })
811        .max_blocking_threads(settings.limits.max_blocking_threads)
813        .on_thread_start(|| {
814            trace!("started new thread: {:?}", std::thread::current().name());
815        })
816        .on_thread_stop(|| {
817            trace!("stopped thread: {:?}", std::thread::current().name());
818        })
819        .build()
820        .unwrap();
821    rt.block_on(async {
823        let broadcast_buffer_limit = settings.limits.broadcast_buffer;
824        let persist_buffer_limit = settings.limits.event_persist_buffer;
825        let verified_users_active = settings.verified_users.is_active();
826        let settings = settings.clone();
827        info!("listening on: {}", socket_addr);
828        let (bcast_tx, _) = broadcast::channel::<Event>(broadcast_buffer_limit);
833        let (event_tx, event_rx) = mpsc::channel::<SubmittedEvent>(persist_buffer_limit);
836        let (invoke_shutdown, shutdown_listen) = broadcast::channel::<()>(1);
839        let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);
848
849        let (payment_tx, payment_rx) = broadcast::channel::<PaymentMessage>(4096);
850
851        let (registry, metrics) = create_metrics();
852
853        let repo = db::build_repo(&settings, metrics.clone()).await;
855        tokio::task::spawn(db::db_writer(
859            repo.clone(),
860            settings.clone(),
861            event_rx,
862            bcast_tx.clone(),
863            metadata_tx.clone(),
864            payment_tx.clone(),
865            shutdown_listen,
866        ));
867        info!("db writer created");
868
869        if settings.verified_users.mode != VerifiedUsersMode::Disabled {
871            let verifier_opt = nip05::Verifier::new(
872                repo.clone(),
873                metadata_rx,
874                bcast_tx.clone(),
875                settings.clone(),
876            );
877            if let Ok(mut v) = verifier_opt {
878                if verified_users_active {
879                    tokio::task::spawn(async move {
880                        info!("starting up NIP-05 verifier...");
881                        v.run().await;
882                    });
883                }
884            }
885        }
886
887        if settings.pay_to_relay.enabled {
889            let payment_opt = payment::Payment::new(
890                repo.clone(),
891                payment_tx.clone(),
892                payment_rx,
893                bcast_tx.clone(),
894                settings.clone(),
895            );
896            if let Ok(mut p) = payment_opt {
897                tokio::task::spawn(async move {
898                    info!("starting payment process ...");
899                    p.run().await;
900                });
901            }
902        }
903
904        let controlled_shutdown = invoke_shutdown.clone();
906        tokio::spawn(async move {
907            info!("control message listener started");
908            match shutdown_rx.recv() {
909                Ok(()) => {
910                    info!("control message requesting shutdown");
911                    controlled_shutdown.send(()).ok();
912                }
913                Err(std::sync::mpsc::RecvError) => {
914                    trace!("shutdown requestor is disconnected (this is normal)");
915                }
916            };
917        });
918        let ctrl_c_shutdown = invoke_shutdown.clone();
920        let webserver_shutdown_listen = invoke_shutdown.subscribe();
922
923        tokio::spawn(async move {
924            tokio::signal::ctrl_c().await.unwrap();
925            info!("shutting down due to SIGINT (main)");
926            ctrl_c_shutdown.send(()).ok();
927        });
928        let favicon = settings.info.favicon.as_ref().and_then(|x| {
934            info!("reading favicon...");
935            file_bytes(x).ok()
936        });
937
938        let make_svc = make_service_fn(|conn: &AddrStream| {
941            let repo = repo.clone();
942            let remote_addr = conn.remote_addr();
943            let bcast = bcast_tx.clone();
944            let event = event_tx.clone();
945            let payment_tx = payment_tx.clone();
946            let stop = invoke_shutdown.clone();
947            let settings = settings.clone();
948            let favicon = favicon.clone();
949            let registry = registry.clone();
950            let metrics = metrics.clone();
951            async move {
952                Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
954                    handle_web_request(
955                        request,
956                        repo.clone(),
957                        settings.clone(),
958                        remote_addr,
959                        bcast.clone(),
960                        event.clone(),
961                        payment_tx.clone(),
962                        stop.subscribe(),
963                        favicon.clone(),
964                        registry.clone(),
965                        metrics.clone(),
966                    )
967                }))
968            }
969        });
970        let server = Server::bind(&socket_addr)
971            .serve(make_svc)
972            .with_graceful_shutdown(ctrl_c_or_signal(webserver_shutdown_listen));
973        if let Err(e) = server.await {
975            eprintln!("server error: {e}");
976        }
977    });
978    Ok(())
979}
980
/// A message received from a client over the websocket.
///
/// The enum is deserialized with `#[serde(untagged)]`, so serde tries the
/// variants in declaration order and keeps the first one that parses; do not
/// reorder them without checking the wire formats of the inner types.
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Debug)]
#[serde(untagged)]
pub enum NostrMessage {
    /// An event submission. After validation this becomes either a regular
    /// event or an authentication event (see the `EventWrapper` handling in
    /// `nostr_server`).
    EventMsg(EventCmd),
    /// A subscription request (logged as `REQ` by `convert_to_msg`).
    SubMsg(Subscription),
    /// A request to close a previously opened subscription.
    CloseMsg(CloseCmd),
}
992
993fn convert_to_msg(msg: &str, max_bytes: Option<usize>) -> Result<NostrMessage> {
995    let parsed_res: Result<NostrMessage> =
996        serde_json::from_str(msg).map_err(std::convert::Into::into);
997    match parsed_res {
998        Ok(m) => {
999            if let NostrMessage::SubMsg(_) = m {
1000                trace!("REQ: {:?}", msg);
1002            };
1003            if let NostrMessage::EventMsg(_) = m {
1004                if let Some(max_size) = max_bytes {
1005                    if msg.len() > max_size && max_size > 0 {
1007                        return Err(Error::EventMaxLengthError(msg.len()));
1008                    }
1009                }
1010            }
1011            Ok(m)
1012        }
1013        Err(e) => {
1014            trace!("proto parse error: {:?}", e);
1015            trace!("parse error on message: {:?}", msg.trim());
1016            Err(Error::ProtoParseError)
1017        }
1018    }
1019}
1020
1021fn make_notice_message(notice: &Notice) -> Message {
1023    let json = match notice {
1024        Notice::Message(ref msg) => json!(["NOTICE", msg]),
1025        Notice::EventResult(ref res) => json!(["OK", res.id, res.status.to_bool(), res.msg]),
1026        Notice::AuthChallenge(ref challenge) => json!(["AUTH", challenge]),
1027    };
1028
1029    Message::text(json.to_string())
1030}
1031
1032fn allowed_to_send(event_str: &String, conn: &conn::ClientConn, settings: &Settings) -> bool {
1033    if settings.authorization.nip42_dms {
1035        match serde_json::from_str::<Event>(event_str) {
1036            Ok(event) => {
1037                if event.kind == 4 {
1038                    match (conn.auth_pubkey(), event.tag_values_by_name("p").first()) {
1039                        (Some(auth_pubkey), Some(recipient_pubkey)) => {
1040                            recipient_pubkey == auth_pubkey || &event.pubkey == auth_pubkey
1041                        },
1042                        (_, _) => {
1043                            false
1044                        },
1045                    }
1046                } else {
1047                    true
1048                }
1049            },
1050            Err(_) => false
1051        }
1052    } else {
1053        true
1054    }
1055}
1056
/// Details about a connecting client that are handed to `nostr_server`.
struct ClientInfo {
    /// Address of the client; used to construct the connection's
    /// `conn::ClientConn`.
    remote_ip: String,
    /// Client user agent, if known; logged on connect and attached to every
    /// event the client submits. Presumably taken from the HTTP request
    /// headers - confirm at the construction site.
    user_agent: Option<String>,
    /// Client origin, if known; logged on connect and attached to every
    /// event the client submits. Presumably taken from the HTTP request
    /// headers - confirm at the construction site.
    origin: Option<String>,
}
1062
1063#[allow(clippy::too_many_arguments)]
1066async fn nostr_server(
1067    repo: Arc<dyn NostrRepo>,
1068    client_info: ClientInfo,
1069    settings: Settings,
1070    mut ws_stream: WebSocketStream<Upgraded>,
1071    broadcast: Sender<Event>,
1072    event_tx: mpsc::Sender<SubmittedEvent>,
1073    mut shutdown: Receiver<()>,
1074    metrics: NostrMetrics,
1075) {
1076    let orig_start = Instant::now();
1078    let mut bcast_rx = broadcast.subscribe();
1080    let mut conn = conn::ClientConn::new(client_info.remote_ip);
1082    let mut sub_lim_opt = None;
1084    let jitter = Jitter::up_to(Duration::from_millis(100));
1086    let sub_per_min_setting = settings.limits.subscriptions_per_min;
1087    if let Some(sub_per_min) = sub_per_min_setting {
1088        if sub_per_min > 0 {
1089            trace!("Rate limits for sub creation ({}/min)", sub_per_min);
1090            let quota_time = core::num::NonZeroU32::new(sub_per_min).unwrap();
1091            let quota = Quota::per_minute(quota_time);
1092            sub_lim_opt = Some(RateLimiter::direct(quota));
1093        }
1094    }
1095    let cid = conn.get_client_prefix();
1097    let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(20_000);
1102    let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(128);
1104
1105    let mut last_message_time = Instant::now();
1107
1108    let default_ping_dur = Duration::from_secs(settings.network.ping_interval_seconds.into());
1110
1111    let max_quiet_time = Duration::from_secs(60 * 20);
1113
1114    let start = tokio::time::Instant::now() + default_ping_dur;
1115    let mut ping_interval = tokio::time::interval_at(start, default_ping_dur);
1116
1117    let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
1121    let mut client_published_event_count: usize = 0;
1124    let mut client_received_event_count: usize = 0;
1125
1126    let unspec = "<unspecified>".to_string();
1127    info!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
1128    let origin = client_info.origin.as_ref().unwrap_or(&unspec);
1129    let user_agent = client_info.user_agent.as_ref().unwrap_or(&unspec);
1130    info!(
1131        "cid: {}, origin: {:?}, user-agent: {:?}",
1132        cid, origin, user_agent
1133    );
1134
1135    metrics.connections.inc();
1137
1138    if settings.authorization.nip42_auth {
1139        conn.generate_auth_challenge();
1140        if let Some(challenge) = conn.auth_challenge() {
1141            ws_stream
1142                .send(make_notice_message(&Notice::AuthChallenge(
1143                    challenge.to_string(),
1144                )))
1145                .await
1146                .ok();
1147        }
1148    }
1149
1150    loop {
1151        tokio::select! {
1152            _ = shutdown.recv() => {
1153        metrics.disconnects.with_label_values(&["shutdown"]).inc();
1154                info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed());
1155                break;
1157            },
1158            _ = ping_interval.tick() => {
1159                if last_message_time.elapsed() > max_quiet_time {
1162                    debug!("ending connection due to lack of client ping response");
1163            metrics.disconnects.with_label_values(&["timeout"]).inc();
1164                    break;
1165                }
1166                ws_stream.send(Message::Ping(Vec::new())).await.ok();
1168            },
1169            Some(notice_msg) = notice_rx.recv() => {
1170                ws_stream.send(make_notice_message(¬ice_msg)).await.ok();
1171            },
1172            Some(query_result) = query_rx.recv() => {
1173                let subesc = query_result.sub_id.replace('"', "");
1175                if query_result.event == "EOSE" {
1176                    let send_str = format!("[\"EOSE\",\"{subesc}\"]");
1177                    ws_stream.send(Message::Text(send_str)).await.ok();
1178                } else if allowed_to_send(&query_result.event, &conn, &settings) {
1179                    metrics.sent_events.with_label_values(&["db"]).inc();
1180                    client_received_event_count += 1;
1181                    let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
1183                    ws_stream.send(Message::Text(send_str)).await.ok();
1184                }
1185            },
1186            Ok(global_event) = bcast_rx.recv() => {
1188                for (s, sub) in conn.subscriptions() {
1191                    if !sub.interested_in_event(&global_event) {
1192                        continue;
1193                    }
1194                    if let Ok(event_str) = serde_json::to_string(&global_event) {
1197                        if allowed_to_send(&event_str, &conn, &settings) {
1198                            trace!("sub match for client: {}, sub: {:?}, event: {:?}",
1200                               cid, s,
1201                               global_event.get_event_id_prefix());
1202                            let subesc = s.replace('"', "");
1203                            metrics.sent_events.with_label_values(&["realtime"]).inc();
1204                            ws_stream.send(Message::Text(format!("[\"EVENT\",\"{subesc}\",{event_str}]"))).await.ok();
1205                        }
1206                    } else {
1207                        warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
1208                    }
1209                }
1210            },
1211            ws_next = ws_stream.next() => {
1212                last_message_time = Instant::now();
1214                let nostr_msg = match ws_next {
1216                    Some(Ok(Message::Text(m))) => {
1217                        convert_to_msg(&m,settings.limits.max_event_bytes)
1218                    },
1219                    Some(Ok(Message::Binary(_))) => {
1220                        ws_stream.send(
1221                            make_notice_message(&Notice::message("binary messages are not accepted".into()))).await.ok();
1222                        continue;
1223                    },
1224                    Some(Ok(Message::Ping(_) | Message::Pong(_))) => {
1225                        continue;
1228                    },
1229                    Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
1230                        ws_stream.send(
1231                            make_notice_message(&Notice::message(format!("message too large ({size} > {max_size})")))).await.ok();
1232                        continue;
1233                    },
1234                    None |
1235                    Some(Ok(Message::Close(_)) |
1236                         Err(WsError::AlreadyClosed | WsError::ConnectionClosed |
1237                             WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
1238                        => {
1239                            debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip());
1240                metrics.disconnects.with_label_values(&["normal"]).inc();
1241                            break;
1242                        },
1243                    Some(Err(WsError::Io(e))) => {
1244                        warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e);
1246            metrics.disconnects.with_label_values(&["error"]).inc();
1247
1248                        break;
1249                    }
1250                    x => {
1251                        info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
1253            metrics.disconnects.with_label_values(&["error"]).inc();
1254
1255                        break;
1256                    }
1257                };
1258
1259                match nostr_msg {
1261                    Ok(NostrMessage::EventMsg(ec)) => {
1262                        let evid = ec.event_id().to_owned();
1265                        let parsed : Result<EventWrapper> = Result::<EventWrapper>::from(ec);
1266                        metrics.cmd_event.inc();
1267                        match parsed {
1268                            Ok(WrappedEvent(e)) => {
1269                                metrics.cmd_event.inc();
1270                                let id_prefix:String = e.id.chars().take(8).collect();
1271                                debug!("successfully parsed/validated event: {:?} (cid: {}, kind: {})", id_prefix, cid, e.kind);
1272                                if e.is_expired() {
1274                                    let notice = Notice::invalid(e.id, "The event has already expired");
1275                                    ws_stream.send(make_notice_message(¬ice)).await.ok();
1276                                    } else if e.is_valid_timestamp(settings.options.reject_future_seconds) {
1278                                    let auth_pubkey = conn.auth_pubkey().and_then(|pubkey| hex::decode(pubkey).ok());
1280                                    let submit_event = SubmittedEvent {
1281                                        event: e.clone(),
1282                                        notice_tx: notice_tx.clone(),
1283                                        source_ip: conn.ip().to_string(),
1284                                        origin: client_info.origin.clone(),
1285                                        user_agent: client_info.user_agent.clone(),
1286                                        auth_pubkey };
1287                                    event_tx.send(submit_event).await.ok();
1288                                    client_published_event_count += 1;
1289                                } else {
1290                                    info!("client: {} sent a far future-dated event", cid);
1291                                    if let Some(fut_sec) = settings.options.reject_future_seconds {
1292                                        let msg = format!("The event created_at field is out of the acceptable range (+{fut_sec}sec) for this relay.");
1293                                        let notice = Notice::invalid(e.id, &msg);
1294                                        ws_stream.send(make_notice_message(¬ice)).await.ok();
1295                                    }
1296                                }
1297                            },
1298                            Ok(WrappedAuth(event)) => {
1299                                metrics.cmd_auth.inc();
1300                                if settings.authorization.nip42_auth {
1301                                    let id_prefix:String = event.id.chars().take(8).collect();
1302                                    debug!("successfully parsed auth: {:?} (cid: {})", id_prefix, cid);
1303                                    match &settings.info.relay_url {
1304                                        None => {
1305                                            error!("AUTH command received, but relay_url is not set in the config file (cid: {})", cid);
1306                                        },
1307                                        Some(relay) => {
1308                                            match conn.authenticate(&event, relay) {
1309                                                Ok(_) => {
1310                                                    let pubkey = match conn.auth_pubkey() {
1311                                                        Some(k) => k.chars().take(8).collect(),
1312                                                        None => "<unspecified>".to_string(),
1313                                                    };
1314                                                    info!("client is authenticated: (cid: {}, pubkey: {:?})", cid, pubkey);
1315                                                },
1316                                                Err(e) => {
1317                                                    info!("authentication error: {} (cid: {})", e, cid);
1318                                                    ws_stream.send(make_notice_message(&Notice::restricted(event.id, format!("authentication error: {e}").as_str()))).await.ok();
1319                                                },
1320                                            }
1321                                        }
1322                                    }
1323                                } else {
1324                                    let e = CommandUnknownError;
1325                                    info!("client sent an invalid event (cid: {})", cid);
1326                                    ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
1327                                }
1328                            },
1329                            Err(e) => {
1330                                metrics.cmd_event.inc();
1331                                info!("client sent an invalid event (cid: {})", cid);
1332                                ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
1333                            }
1334                        }
1335                    },
1336                    Ok(NostrMessage::SubMsg(s)) => {
1337                        debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id);
1338                        if conn.has_subscription(&s) {
1345                            info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
1346                        } else {
1347                metrics.cmd_req.inc();
1348                            if let Some(ref lim) = sub_lim_opt {
1349                                lim.until_ready_with_jitter(jitter).await;
1350                            }
1351                            let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
1352                            match conn.subscribe(s.clone()) {
1353                                Ok(()) => {
1354                                    if let Some(previous_query) = running_queries.insert(s.id.clone(), abandon_query_tx) {
1356                                        previous_query.send(()).ok();
1357                                    }
1358                                    if s.needs_historical_events() {
1359                                        repo.query_subscription(s, cid.clone(), query_tx.clone(), abandon_query_rx).await.ok();
1361                                    }
1362                                },
1363                                Err(e) => {
1364                                    info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
1365                                    ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {e}")))).await.ok();
1366                                }
1367                            }
1368                        }
1369                    },
1370                    Ok(NostrMessage::CloseMsg(cc)) => {
1371                        let parsed : Result<Close> = Result::<Close>::from(cc);
1373                        if let Ok(c) = parsed {
1374                metrics.cmd_close.inc();
1375                            let stop_tx = running_queries.remove(&c.id);
1378                            if let Some(tx) = stop_tx {
1379                                tx.send(()).ok();
1380                            }
1381                            conn.unsubscribe(&c);
1384                        } else {
1385                            info!("invalid command ignored");
1386                            ws_stream.send(make_notice_message(&Notice::message("could not parse command".into()))).await.ok();
1387                        }
1388                    },
1389                    Err(Error::ConnError) => {
1390                        debug!("got connection close/error, disconnecting cid: {}, ip: {:?}",cid, conn.ip());
1391                        break;
1392                    }
1393                    Err(Error::EventMaxLengthError(s)) => {
1394                        info!("client sent command larger ({} bytes) than max size (cid: {})", s, cid);
1395                        ws_stream.send(make_notice_message(&Notice::message("event exceeded max size".into()))).await.ok();
1396                    },
1397                    Err(Error::ProtoParseError) => {
1398                        info!("client sent command that could not be parsed (cid: {})", cid);
1399                        ws_stream.send(make_notice_message(&Notice::message("could not parse command".into()))).await.ok();
1400                    },
1401                    Err(e) => {
1402                        info!("got non-fatal error from client (cid: {}, error: {:?}", cid, e);
1403                    },
1404                }
1405            },
1406        }
1407    }
1408    for (_, stop_tx) in running_queries {
1410        stop_tx.send(()).ok();
1411    }
1412    info!(
1413        "stopping client connection (cid: {}, ip: {:?}, sent: {} events, recv: {} events, connected: {:?})",
1414        cid,
1415        conn.ip(),
1416        client_published_event_count,
1417        client_received_event_count,
1418        orig_start.elapsed()
1419    );
1420}
1421
1422#[derive(Clone)]
1423pub struct NostrMetrics {
1424    pub query_sub: Histogram,        pub query_db: Histogram,         pub db_connections: IntGauge,    pub write_events: Histogram,     pub sent_events: IntCounterVec,  pub connections: IntCounter,     pub disconnects: IntCounterVec,  pub query_aborts: IntCounterVec, pub cmd_req: IntCounter,         pub cmd_event: IntCounter,       pub cmd_close: IntCounter,       pub cmd_auth: IntCounter,        }