use std::fmt;
#[cfg(not(target_arch = "wasm32"))]
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "nip11")]
use nostr::nips::nip11::RelayInformationDocument;
use nostr::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, Timestamp, Url};
use nostr_sdk_net::futures_util::{Future, SinkExt, StreamExt};
use nostr_sdk_net::{self as net, WsMessage};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{broadcast, oneshot, Mutex};
pub mod pool;
pub use self::pool::{RelayPoolMessage, RelayPoolNotification};
#[cfg(feature = "blocking")]
use crate::RUNTIME;
use crate::{thread, time};
type Message = (RelayEvent, Option<oneshot::Sender<bool>>);
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("channel timeout")]
ChannelTimeout,
#[error("recv message response timeout")]
RecvTimeout,
#[error("timeout")]
Timeout,
#[error("message not sent")]
MessagetNotSent,
#[error("impossible to recv msg")]
OneShotRecvError,
#[error("read actions are disabled for this relay")]
ReadDisabled,
#[error("write actions are disabled for this relay")]
WriteDisabled,
#[error("filters empty")]
FiltersEmpty,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RelayStatus {
Initialized,
Connected,
Connecting,
Disconnected,
Terminated,
}
impl fmt::Display for RelayStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Initialized => write!(f, "Initialized"),
Self::Connected => write!(f, "Connected"),
Self::Connecting => write!(f, "Connecting"),
Self::Disconnected => write!(f, "Disconnected"),
Self::Terminated => write!(f, "Terminated"),
}
}
}
#[derive(Debug)]
pub enum RelayEvent {
SendMsg(Box<ClientMessage>),
Close,
Terminate,
}
#[derive(Debug, Clone)]
pub struct RelayOptions {
read: Arc<AtomicBool>,
write: Arc<AtomicBool>,
}
impl Default for RelayOptions {
fn default() -> Self {
Self::new(true, true)
}
}
impl RelayOptions {
pub fn new(read: bool, write: bool) -> Self {
Self {
read: Arc::new(AtomicBool::new(read)),
write: Arc::new(AtomicBool::new(write)),
}
}
pub fn read(&self) -> bool {
self.read.load(Ordering::SeqCst)
}
pub fn set_read(&self, read: bool) {
let _ = self
.read
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(read));
}
pub fn write(&self) -> bool {
self.write.load(Ordering::SeqCst)
}
pub fn set_write(&self, write: bool) {
let _ = self
.write
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(write));
}
}
#[derive(Debug, Clone)]
pub struct RelayConnectionStats {
attempts: Arc<AtomicUsize>,
success: Arc<AtomicUsize>,
connected_at: Arc<AtomicU64>,
}
impl Default for RelayConnectionStats {
fn default() -> Self {
Self::new()
}
}
impl RelayConnectionStats {
pub fn new() -> Self {
Self {
attempts: Arc::new(AtomicUsize::new(0)),
success: Arc::new(AtomicUsize::new(0)),
connected_at: Arc::new(AtomicU64::new(0)),
}
}
pub fn attempts(&self) -> usize {
self.attempts.load(Ordering::SeqCst)
}
pub fn success(&self) -> usize {
self.success.load(Ordering::SeqCst)
}
pub fn connected_at(&self) -> Timestamp {
Timestamp::from(self.connected_at.load(Ordering::SeqCst))
}
pub(crate) fn new_attempt(&self) {
self.attempts.fetch_add(1, Ordering::SeqCst);
}
pub(crate) fn new_success(&self) {
self.success.fetch_add(1, Ordering::SeqCst);
let _ = self
.connected_at
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| {
Some(Timestamp::now().as_u64())
});
}
}
#[derive(Debug, Clone)]
pub struct ActiveSubscription {
id: SubscriptionId,
filters: Vec<Filter>,
}
impl Default for ActiveSubscription {
fn default() -> Self {
Self::new()
}
}
impl ActiveSubscription {
pub fn new() -> Self {
Self {
id: SubscriptionId::generate(),
filters: Vec::new(),
}
}
pub fn id(&self) -> SubscriptionId {
self.id.clone()
}
pub fn filters(&self) -> Vec<Filter> {
self.filters.clone()
}
}
#[derive(Debug, Clone)]
pub struct Relay {
url: Url,
#[cfg(not(target_arch = "wasm32"))]
proxy: Option<SocketAddr>,
status: Arc<Mutex<RelayStatus>>,
#[cfg(feature = "nip11")]
document: Arc<Mutex<RelayInformationDocument>>,
opts: RelayOptions,
stats: RelayConnectionStats,
scheduled_for_termination: Arc<AtomicBool>,
pool_sender: Sender<RelayPoolMessage>,
relay_sender: Sender<Message>,
relay_receiver: Arc<Mutex<Receiver<Message>>>,
notification_sender: broadcast::Sender<RelayPoolNotification>,
subscription: Arc<Mutex<ActiveSubscription>>,
}
impl PartialEq for Relay {
fn eq(&self, other: &Self) -> bool {
self.url == other.url
}
}
impl Relay {
#[cfg(not(target_arch = "wasm32"))]
pub fn new(
url: Url,
pool_sender: Sender<RelayPoolMessage>,
notification_sender: broadcast::Sender<RelayPoolNotification>,
proxy: Option<SocketAddr>,
opts: RelayOptions,
) -> Self {
let (relay_sender, relay_receiver) = mpsc::channel::<Message>(1024);
Self {
url,
proxy,
status: Arc::new(Mutex::new(RelayStatus::Initialized)),
#[cfg(feature = "nip11")]
document: Arc::new(Mutex::new(RelayInformationDocument::new())),
opts,
stats: RelayConnectionStats::new(),
scheduled_for_termination: Arc::new(AtomicBool::new(false)),
pool_sender,
relay_sender,
relay_receiver: Arc::new(Mutex::new(relay_receiver)),
notification_sender,
subscription: Arc::new(Mutex::new(ActiveSubscription::new())),
}
}
#[cfg(target_arch = "wasm32")]
pub fn new(
url: Url,
pool_sender: Sender<RelayPoolMessage>,
notification_sender: broadcast::Sender<RelayPoolNotification>,
opts: RelayOptions,
) -> Self {
let (relay_sender, relay_receiver) = mpsc::channel::<Message>(1024);
Self {
url,
status: Arc::new(Mutex::new(RelayStatus::Initialized)),
#[cfg(feature = "nip11")]
document: Arc::new(Mutex::new(RelayInformationDocument::new())),
opts,
stats: RelayConnectionStats::new(),
scheduled_for_termination: Arc::new(AtomicBool::new(false)),
pool_sender,
relay_sender,
relay_receiver: Arc::new(Mutex::new(relay_receiver)),
notification_sender,
subscription: Arc::new(Mutex::new(ActiveSubscription::new())),
}
}
pub fn url(&self) -> Url {
self.url.clone()
}
#[cfg(not(target_arch = "wasm32"))]
pub fn proxy(&self) -> Option<SocketAddr> {
self.proxy
}
pub async fn status(&self) -> RelayStatus {
let status = self.status.lock().await;
status.clone()
}
#[cfg(feature = "blocking")]
pub fn status_blocking(&self) -> RelayStatus {
RUNTIME.block_on(async { self.status().await })
}
async fn set_status(&self, status: RelayStatus) {
let mut s = self.status.lock().await;
*s = status;
}
#[cfg(feature = "nip11")]
pub async fn document(&self) -> RelayInformationDocument {
let document = self.document.lock().await;
document.clone()
}
#[cfg(all(feature = "nip11", feature = "blocking"))]
pub fn document_blocking(&self) -> RelayInformationDocument {
RUNTIME.block_on(async { self.document().await })
}
#[cfg(feature = "nip11")]
async fn set_document(&self, document: RelayInformationDocument) {
let mut d = self.document.lock().await;
*d = document;
}
pub async fn subscription(&self) -> ActiveSubscription {
let subscription = self.subscription.lock().await;
subscription.clone()
}
pub async fn update_subscription_filters(&self, filters: Vec<Filter>) {
let mut s = self.subscription.lock().await;
s.filters = filters;
}
pub fn opts(&self) -> RelayOptions {
self.opts.clone()
}
pub fn stats(&self) -> RelayConnectionStats {
self.stats.clone()
}
fn is_scheduled_for_termination(&self) -> bool {
self.scheduled_for_termination.load(Ordering::SeqCst)
}
fn schedule_for_termination(&self, value: bool) {
let _ =
self.scheduled_for_termination
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(value));
}
pub async fn connect(&self, wait_for_connection: bool) {
if let RelayStatus::Initialized | RelayStatus::Terminated = self.status().await {
if wait_for_connection {
self.try_connect().await
} else {
self.set_status(RelayStatus::Disconnected).await;
}
let relay = self.clone();
thread::spawn(async move {
loop {
log::debug!(
"{} channel capacity: {}",
relay.url(),
relay.relay_sender.capacity()
);
if relay.is_scheduled_for_termination() {
relay.set_status(RelayStatus::Terminated).await;
relay.schedule_for_termination(false);
log::debug!("Auto connect loop terminated for {}", relay.url);
break;
}
match relay.status().await {
RelayStatus::Disconnected => relay.try_connect().await,
RelayStatus::Terminated => {
log::debug!("Auto connect loop terminated for {}", relay.url);
break;
}
_ => (),
};
thread::sleep(Duration::from_secs(20)).await;
}
});
}
}
async fn try_connect(&self) {
self.stats.new_attempt();
let url: String = self.url.to_string();
self.set_status(RelayStatus::Connecting).await;
log::debug!("Connecting to {}", url);
#[cfg(feature = "nip11")]
{
let relay = self.clone();
thread::spawn(async move {
#[cfg(not(target_arch = "wasm32"))]
let document = RelayInformationDocument::get(relay.url(), relay.proxy()).await;
#[cfg(target_arch = "wasm32")]
let document = RelayInformationDocument::get(relay.url()).await;
match document {
Ok(document) => relay.set_document(document).await,
Err(e) => log::error!(
"Impossible to get information document from {}: {}",
relay.url,
e
),
};
});
}
#[cfg(not(target_arch = "wasm32"))]
let connection = net::native::connect(&self.url, self.proxy, None).await;
#[cfg(target_arch = "wasm32")]
let connection = net::wasm::connect(&self.url).await;
match connection {
Ok((mut ws_tx, mut ws_rx)) => {
self.set_status(RelayStatus::Connected).await;
log::info!("Connected to {}", url);
self.stats.new_success();
let relay = self.clone();
thread::spawn(async move {
log::debug!("Relay Event Thread Started");
let mut rx = relay.relay_receiver.lock().await;
while let Some((relay_event, oneshot_sender)) = rx.recv().await {
match relay_event {
RelayEvent::SendMsg(msg) => {
log::debug!("Sending message {}", msg.as_json());
if let Err(e) = ws_tx.send(WsMessage::Text(msg.as_json())).await {
log::error!(
"Impossible to send msg to {}: {}",
relay.url(),
e.to_string()
);
if let Some(sender) = oneshot_sender {
if let Err(e) = sender.send(false) {
log::error!("Impossible to send oneshot msg: {}", e);
}
}
break;
};
if let Some(sender) = oneshot_sender {
if let Err(e) = sender.send(true) {
log::error!("Impossible to send oneshot msg: {}", e);
}
}
}
RelayEvent::Close => {
let _ = ws_tx.close().await;
relay.set_status(RelayStatus::Disconnected).await;
log::info!("Disconnected from {}", url);
break;
}
RelayEvent::Terminate => {
if let Err(e) = relay.unsubscribe(false).await {
log::error!(
"Impossible to unsubscribe from {}: {}",
relay.url(),
e.to_string()
)
}
let _ = ws_tx.close().await;
relay.set_status(RelayStatus::Terminated).await;
relay.schedule_for_termination(false);
log::info!("Completely disconnected from {}", url);
break;
}
}
}
});
let relay = self.clone();
thread::spawn(async move {
log::debug!("Relay Message Thread Started");
async fn func(relay: &Relay, data: Vec<u8>) {
match String::from_utf8(data) {
Ok(data) => match RelayMessage::from_json(&data) {
Ok(msg) => {
log::trace!("Received message to {}: {:?}", relay.url, msg);
if let Err(err) = relay
.pool_sender
.send(RelayPoolMessage::ReceivedMsg {
relay_url: relay.url(),
msg,
})
.await
{
log::error!(
"Impossible to send ReceivedMsg to pool: {}",
&err
);
};
}
Err(err) => {
log::error!("{}: {}", err, data);
}
},
Err(err) => log::error!("{}", err),
}
}
#[cfg(not(target_arch = "wasm32"))]
while let Some(msg_res) = ws_rx.next().await {
if let Ok(msg) = msg_res {
let data: Vec<u8> = msg.into_data();
func(&relay, data).await;
}
}
#[cfg(target_arch = "wasm32")]
while let Some(msg) = ws_rx.next().await {
let data: Vec<u8> = msg.as_ref().to_vec();
func(&relay, data).await;
}
log::debug!("Exited from Message Thread of {}", relay.url);
if relay.status().await != RelayStatus::Terminated {
if let Err(err) = relay.disconnect().await {
log::error!("Impossible to disconnect {}: {}", relay.url, err);
}
}
});
if self.opts.read() {
if let Err(e) = self.resubscribe(false).await {
match e {
Error::FiltersEmpty => log::debug!("Filters empty for {}", self.url()),
_ => log::error!(
"Impossible to subscribe to {}: {}",
self.url(),
e.to_string()
),
}
}
}
}
Err(err) => {
self.set_status(RelayStatus::Disconnected).await;
log::error!("Impossible to connect to {}: {}", url, err);
}
};
}
async fn send_relay_event(
&self,
relay_msg: RelayEvent,
sender: Option<oneshot::Sender<bool>>,
) -> Result<(), Error> {
time::timeout(Some(Duration::from_secs(60)), async {
self.relay_sender
.send((relay_msg, sender))
.await
.map_err(|_| Error::MessagetNotSent)
})
.await
.ok_or(Error::ChannelTimeout)??;
Ok(())
}
async fn disconnect(&self) -> Result<(), Error> {
let status = self.status().await;
if status.ne(&RelayStatus::Disconnected) && status.ne(&RelayStatus::Terminated) {
self.send_relay_event(RelayEvent::Close, None).await?;
}
Ok(())
}
pub async fn terminate(&self) -> Result<(), Error> {
self.schedule_for_termination(true);
let status = self.status().await;
if status.ne(&RelayStatus::Disconnected) && status.ne(&RelayStatus::Terminated) {
self.send_relay_event(RelayEvent::Terminate, None).await?;
}
Ok(())
}
pub async fn send_msg(&self, msg: ClientMessage, wait: bool) -> Result<(), Error> {
if !self.opts.write() {
if let ClientMessage::Event(_) = msg {
return Err(Error::WriteDisabled);
}
}
if !self.opts.read() {
if let ClientMessage::Req { .. } | ClientMessage::Close(_) = msg {
return Err(Error::ReadDisabled);
}
}
if wait {
let (tx, rx) = oneshot::channel::<bool>();
self.send_relay_event(RelayEvent::SendMsg(Box::new(msg)), Some(tx))
.await?;
match time::timeout(Some(Duration::from_secs(60)), rx).await {
Some(result) => match result {
Ok(val) => {
if val {
Ok(())
} else {
Err(Error::MessagetNotSent)
}
}
Err(_) => Err(Error::OneShotRecvError),
},
_ => Err(Error::RecvTimeout),
}
} else {
self.send_relay_event(RelayEvent::SendMsg(Box::new(msg)), None)
.await
}
}
async fn resubscribe(&self, wait: bool) -> Result<SubscriptionId, Error> {
if !self.opts.read() {
return Err(Error::ReadDisabled);
}
let subscription = self.subscription().await;
if subscription.filters.is_empty() {
return Err(Error::FiltersEmpty);
}
self.send_msg(
ClientMessage::new_req(subscription.id.clone(), subscription.filters),
wait,
)
.await?;
Ok(subscription.id)
}
pub async fn subscribe(
&self,
filters: Vec<Filter>,
wait: bool,
) -> Result<SubscriptionId, Error> {
if !self.opts.read() {
return Err(Error::ReadDisabled);
}
self.update_subscription_filters(filters).await;
self.resubscribe(wait).await
}
pub async fn unsubscribe(&self, wait: bool) -> Result<(), Error> {
if !self.opts.read() {
return Err(Error::ReadDisabled);
}
let subscription = self.subscription().await;
self.send_msg(ClientMessage::close(subscription.id), wait)
.await?;
Ok(())
}
pub async fn get_events_of_with_callback<F>(
&self,
filters: Vec<Filter>,
timeout: Option<Duration>,
callback: impl Fn(Event) -> F,
) -> Result<(), Error>
where
F: Future<Output = ()>,
{
if !self.opts.read() {
return Err(Error::ReadDisabled);
}
let id = SubscriptionId::generate();
self.send_msg(ClientMessage::new_req(id.clone(), filters), false)
.await?;
let mut notifications = self.notification_sender.subscribe();
time::timeout(timeout, async {
while let Ok(notification) = notifications.recv().await {
if let RelayPoolNotification::Message(_, msg) = notification {
match msg {
RelayMessage::Event {
subscription_id,
event,
} => {
if subscription_id.eq(&id) {
callback(*event).await;
}
}
RelayMessage::EndOfStoredEvents(subscription_id) => {
if subscription_id.eq(&id) {
break;
}
}
_ => log::debug!("Receive unhandled message {msg:?} on get_events_of"),
};
}
}
})
.await
.ok_or(Error::Timeout)?;
self.send_msg(ClientMessage::close(id), false).await?;
Ok(())
}
pub async fn get_events_of(
&self,
filters: Vec<Filter>,
timeout: Option<Duration>,
) -> Result<Vec<Event>, Error> {
let events: Mutex<Vec<Event>> = Mutex::new(Vec::new());
self.get_events_of_with_callback(filters, timeout, |event| async {
let mut events = events.lock().await;
events.push(event);
})
.await?;
Ok(events.into_inner())
}
pub fn req_events_of(&self, filters: Vec<Filter>, timeout: Option<Duration>) {
if !self.opts.read() {
log::error!("{}", Error::ReadDisabled);
}
let relay = self.clone();
thread::spawn(async move {
let id = SubscriptionId::generate();
if let Err(e) = relay
.send_msg(ClientMessage::new_req(id.clone(), filters), false)
.await
{
log::error!(
"Impossible to send REQ to {}: {}",
relay.url(),
e.to_string()
);
};
let mut notifications = relay.notification_sender.subscribe();
if time::timeout(timeout, async {
while let Ok(notification) = notifications.recv().await {
if let RelayPoolNotification::Message(
_,
RelayMessage::EndOfStoredEvents(subscription_id),
) = notification
{
if subscription_id.eq(&id) {
break;
}
}
}
})
.await
.is_none()
{
log::error!("{}", Error::Timeout);
}
if let Err(e) = relay.send_msg(ClientMessage::close(id), false).await {
log::error!(
"Impossible to close subscription with {}: {}",
relay.url(),
e.to_string()
);
}
});
}
}