use std::collections::{HashMap, VecDeque};
#[cfg(not(target_arch = "wasm32"))]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use nostr::url::Url;
use nostr::{ClientMessage, Event, EventId, Filter, RelayMessage};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{broadcast, Mutex};
use super::{Error as RelayError, Relay, RelayOptions};
use crate::thread;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Relay(#[from] RelayError),
#[error("no relay connected")]
NoRelayConnected,
#[error("relay not found")]
RelayNotFound,
#[error(transparent)]
Thread(#[from] thread::Error),
}
#[derive(Debug)]
pub enum RelayPoolMessage {
ReceivedMsg {
relay_url: Url,
msg: RelayMessage,
},
EventSent(Box<Event>),
Shutdown,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RelayPoolNotification {
Event(Url, Event),
Message(Url, RelayMessage),
Shutdown,
}
struct RelayPoolTask {
receiver: Receiver<RelayPoolMessage>,
notification_sender: broadcast::Sender<RelayPoolNotification>,
events: VecDeque<EventId>,
}
const MAX_EVENTS: usize = 100000;
impl RelayPoolTask {
pub fn new(
pool_task_receiver: Receiver<RelayPoolMessage>,
notification_sender: broadcast::Sender<RelayPoolNotification>,
) -> Self {
Self {
receiver: pool_task_receiver,
events: VecDeque::new(),
notification_sender,
}
}
pub async fn run(&mut self) {
log::debug!("RelayPoolTask Thread Started");
while let Some(msg) = self.receiver.recv().await {
match msg {
RelayPoolMessage::ReceivedMsg { relay_url, msg } => {
let _ = self
.notification_sender
.send(RelayPoolNotification::Message(
relay_url.clone(),
msg.clone(),
));
if let RelayMessage::Event { event, .. } = msg {
if event.verify().is_ok() {
if !self.events.contains(&event.id) {
self.add_event(event.id);
let notification =
RelayPoolNotification::Event(relay_url, event.as_ref().clone());
let _ = self.notification_sender.send(notification);
}
}
}
}
RelayPoolMessage::EventSent(event) => {
self.add_event(event.id);
}
RelayPoolMessage::Shutdown => {
if let Err(e) = self
.notification_sender
.send(RelayPoolNotification::Shutdown)
{
log::error!("Impossible to send shutdown notification: {}", e);
}
log::debug!("Exited from RelayPoolTask thread");
self.receiver.close();
break;
}
}
}
}
fn add_event(&mut self, event_id: EventId) {
while self.events.len() >= MAX_EVENTS {
self.events.pop_front();
}
self.events.push_back(event_id);
}
}
#[derive(Debug, Clone)]
pub struct RelayPool {
relays: Arc<Mutex<HashMap<Url, Relay>>>,
pool_task_sender: Sender<RelayPoolMessage>,
notification_sender: broadcast::Sender<RelayPoolNotification>,
filters: Arc<Mutex<Vec<Filter>>>,
}
impl Default for RelayPool {
fn default() -> Self {
Self::new()
}
}
impl RelayPool {
pub fn new() -> Self {
let (notification_sender, _) = broadcast::channel(1024);
let (pool_task_sender, pool_task_receiver) = mpsc::channel(1024);
let mut relay_pool_task =
RelayPoolTask::new(pool_task_receiver, notification_sender.clone());
thread::spawn(async move { relay_pool_task.run().await });
Self {
relays: Arc::new(Mutex::new(HashMap::new())),
pool_task_sender,
notification_sender,
filters: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn notifications(&self) -> broadcast::Receiver<RelayPoolNotification> {
self.notification_sender.subscribe()
}
pub async fn relays(&self) -> HashMap<Url, Relay> {
let relays = self.relays.lock().await;
relays.clone()
}
pub async fn relay(&self, url: &Url) -> Result<Relay, Error> {
let relays = self.relays.lock().await;
relays.get(url).cloned().ok_or(Error::RelayNotFound)
}
pub async fn subscription_filters(&self) -> Vec<Filter> {
self.filters.lock().await.clone()
}
async fn update_subscription_filters(&self, filters: Vec<Filter>) {
let mut f = self.filters.lock().await;
*f = filters;
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn add_relay(
&self,
url: Url,
proxy: Option<SocketAddr>,
opts: RelayOptions,
) -> Result<(), Error> {
let mut relays = self.relays.lock().await;
if !relays.contains_key(&url) {
let relay = Relay::new(
url,
self.pool_task_sender.clone(),
self.notification_sender.clone(),
proxy,
opts,
);
relays.insert(relay.url(), relay);
}
Ok(())
}
#[cfg(target_arch = "wasm32")]
pub async fn add_relay(&self, url: Url, opts: RelayOptions) -> Result<(), Error> {
let mut relays = self.relays.lock().await;
if !relays.contains_key(&url) {
let relay = Relay::new(
url,
self.pool_task_sender.clone(),
self.notification_sender.clone(),
opts,
);
relays.insert(relay.url(), relay);
}
Ok(())
}
pub async fn remove_relay(&self, url: Url) -> Result<(), Error> {
let mut relays = self.relays.lock().await;
if let Some(relay) = relays.remove(&url) {
self.disconnect_relay(&relay).await?;
}
Ok(())
}
pub async fn send_msg(&self, msg: ClientMessage, wait: bool) -> Result<(), Error> {
let relays = self.relays().await;
if relays.is_empty() {
return Err(Error::NoRelayConnected);
}
if let ClientMessage::Event(event) = &msg {
if let Err(e) = self
.pool_task_sender
.send(RelayPoolMessage::EventSent(event.clone()))
.await
{
log::error!("{e}");
};
}
for (url, relay) in relays.into_iter() {
if let Err(e) = relay.send_msg(msg.clone(), wait).await {
log::error!("Impossible to send msg to {url}: {e}");
}
}
Ok(())
}
pub async fn send_msg_to(&self, url: Url, msg: ClientMessage, wait: bool) -> Result<(), Error> {
let relays = self.relays().await;
if let Some(relay) = relays.get(&url) {
relay.send_msg(msg, wait).await?;
Ok(())
} else {
Err(Error::RelayNotFound)
}
}
pub async fn subscribe(&self, filters: Vec<Filter>, wait: bool) {
let relays = self.relays().await;
self.update_subscription_filters(filters.clone()).await;
for relay in relays.values() {
if let Err(e) = relay.subscribe(filters.clone(), wait).await {
log::error!("{e}");
}
}
}
pub async fn unsubscribe(&self, wait: bool) {
let relays = self.relays().await;
for relay in relays.values() {
if let Err(e) = relay.unsubscribe(wait).await {
log::error!("{e}");
}
}
}
pub async fn get_events_of(
&self,
filters: Vec<Filter>,
timeout: Option<Duration>,
) -> Result<Vec<Event>, Error> {
let events: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(Vec::new()));
let mut handles = Vec::new();
let relays = self.relays().await;
for (url, relay) in relays.into_iter() {
let filters = filters.clone();
let events = events.clone();
let handle = thread::spawn(async move {
if let Err(e) = relay
.get_events_of_with_callback(filters, timeout, |event| async {
events.lock().await.push(event);
})
.await
{
log::error!("Failed to get events from {url}: {e}");
}
});
handles.push(handle);
}
for handle in handles.into_iter().flatten() {
handle.join().await?;
}
Ok(events.lock_owned().await.clone())
}
pub async fn req_events_of(&self, filters: Vec<Filter>, timeout: Option<Duration>) {
let relays = self.relays().await;
for relay in relays.values() {
relay.req_events_of(filters.clone(), timeout);
}
}
pub async fn connect(&self, wait_for_connection: bool) {
let relays = self.relays().await;
for relay in relays.values() {
self.connect_relay(relay, wait_for_connection).await;
}
}
pub async fn disconnect(&self) -> Result<(), Error> {
let relays = self.relays().await;
for relay in relays.values() {
self.disconnect_relay(relay).await?;
}
Ok(())
}
pub async fn connect_relay(&self, relay: &Relay, wait_for_connection: bool) {
let filters: Vec<Filter> = self.subscription_filters().await;
relay.update_subscription_filters(filters).await;
relay.connect(wait_for_connection).await;
}
pub async fn disconnect_relay(&self, relay: &Relay) -> Result<(), Error> {
relay.terminate().await?;
Ok(())
}
pub async fn shutdown(self) -> Result<(), Error> {
self.disconnect().await?;
thread::spawn(async move {
thread::sleep(Duration::from_secs(3)).await;
let _ = self.pool_task_sender.send(RelayPoolMessage::Shutdown).await;
});
Ok(())
}
}