use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use nostr_sdk::prelude::*;
use crate::core::error::{Error, Result};
use crate::relay::RelayPoolTrait;
struct MockRelayInner {
events: Vec<Event>,
subscriptions: HashMap<u32, Vec<Filter>>,
next_sub_id: u32,
}
impl MockRelayInner {
fn new() -> Self {
Self {
events: Vec::new(),
subscriptions: HashMap::new(),
next_sub_id: 0,
}
}
}
pub struct MockRelayPool {
inner: Arc<Mutex<MockRelayInner>>,
notification_tx: tokio::sync::broadcast::Sender<RelayPoolNotification>,
keys: Keys,
}
impl MockRelayPool {
pub fn new() -> Self {
let keys = Keys::generate();
let (tx, _rx) = tokio::sync::broadcast::channel(1024);
Self {
inner: Arc::new(Mutex::new(MockRelayInner::new())),
notification_tx: tx,
keys,
}
}
pub fn mock_public_key(&self) -> PublicKey {
self.keys.public_key()
}
pub fn mock_keys(&self) -> Keys {
self.keys.clone()
}
pub fn with_keys(keys: Keys) -> Self {
let (tx, _rx) = tokio::sync::broadcast::channel(1024);
Self {
inner: Arc::new(Mutex::new(MockRelayInner::new())),
notification_tx: tx,
keys,
}
}
pub fn create_pair() -> (Self, Self) {
let (tx, _rx) = tokio::sync::broadcast::channel(1024);
let inner = Arc::new(Mutex::new(MockRelayInner::new()));
let a = Self {
inner: Arc::clone(&inner),
notification_tx: tx.clone(),
keys: Keys::generate(),
};
let b = Self {
inner,
notification_tx: tx,
keys: Keys::generate(),
};
(a, b)
}
pub fn create_linked_group(n: usize) -> Vec<Self> {
assert!(n > 0, "group must have at least one pool");
let (tx, _rx) = tokio::sync::broadcast::channel(1024);
let inner = Arc::new(Mutex::new(MockRelayInner::new()));
(0..n)
.map(|_| Self {
inner: Arc::clone(&inner),
notification_tx: tx.clone(),
keys: Keys::generate(),
})
.collect()
}
pub async fn stored_events(&self) -> Vec<Event> {
self.inner.lock().await.events.clone()
}
}
impl Default for MockRelayPool {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl RelayPoolTrait for MockRelayPool {
async fn connect(&self, _relay_urls: &[String]) -> Result<()> {
Ok(())
}
async fn disconnect(&self) -> Result<()> {
Ok(())
}
async fn publish_event(&self, event: &Event) -> Result<EventId> {
let event_id = event.id;
{
let mut inner = self.inner.lock().await;
inner.events.push(event.clone());
}
let notification = make_notification(event.clone());
let _ = self.notification_tx.send(notification);
Ok(event_id)
}
async fn publish(&self, builder: EventBuilder) -> Result<EventId> {
let event = sign_with_keys(builder, &self.keys)?;
let id = event.id;
self.publish_event(&event).await?;
Ok(id)
}
async fn sign(&self, builder: EventBuilder) -> Result<Event> {
sign_with_keys(builder, &self.keys)
}
async fn signer(&self) -> Result<Arc<dyn NostrSigner>> {
Ok(Arc::new(self.keys.clone()) as Arc<dyn NostrSigner>)
}
fn notifications(&self) -> tokio::sync::broadcast::Receiver<RelayPoolNotification> {
self.notification_tx.subscribe()
}
async fn public_key(&self) -> Result<PublicKey> {
Ok(self.keys.public_key())
}
async fn subscribe(&self, filters: Vec<Filter>) -> Result<()> {
let replay = {
let mut inner = self.inner.lock().await;
let sub_id = inner.next_sub_id;
inner.next_sub_id += 1;
inner.subscriptions.insert(sub_id, filters);
let events_snapshot = inner.events.clone();
let stored = inner.subscriptions.get(&sub_id).expect("just inserted");
events_snapshot
.into_iter()
.filter(|e| {
stored
.iter()
.any(|f| f.match_event(e, MatchEventOptions::default()))
})
.collect::<Vec<_>>()
};
for event in replay {
let _ = self.notification_tx.send(make_notification(event));
}
Ok(())
}
}
fn sign_with_keys(builder: EventBuilder, keys: &Keys) -> Result<Event> {
builder
.sign_with_keys(keys)
.map_err(|e| Error::Transport(e.to_string()))
}
fn make_notification(event: Event) -> RelayPoolNotification {
RelayPoolNotification::Event {
relay_url: RelayUrl::parse("wss://mock.relay").expect("hardcoded URL"),
subscription_id: SubscriptionId::generate(),
event: Box::new(event),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn connect_and_disconnect_are_noops() {
let pool = MockRelayPool::new();
assert!(pool.connect(&["wss://unused".to_string()]).await.is_ok());
assert!(pool.disconnect().await.is_ok());
}
#[tokio::test]
async fn publish_event_stores_and_broadcasts() {
let pool = MockRelayPool::new();
let mut rx = pool.notifications();
let keys = Keys::generate();
let event = EventBuilder::new(Kind::TextNote, "hello")
.sign_with_keys(&keys)
.unwrap();
pool.publish_event(&event).await.unwrap();
assert_eq!(pool.stored_events().await.len(), 1);
let notif = rx.try_recv().unwrap();
if let RelayPoolNotification::Event { event: e, .. } = notif {
assert_eq!(e.id, event.id);
} else {
panic!("expected Event notification");
}
}
#[tokio::test]
async fn publish_signs_and_stores() {
let pool = MockRelayPool::new();
let builder = EventBuilder::new(Kind::TextNote, "signed");
pool.publish(builder).await.unwrap();
let stored = pool.stored_events().await;
assert_eq!(stored.len(), 1);
assert_eq!(stored[0].pubkey, pool.mock_public_key());
}
#[tokio::test]
async fn sign_does_not_publish() {
let pool = MockRelayPool::new();
let builder = EventBuilder::new(Kind::TextNote, "unsigned");
let event = pool.sign(builder).await.unwrap();
assert_eq!(event.pubkey, pool.mock_public_key());
assert!(pool.stored_events().await.is_empty());
}
#[tokio::test]
async fn signer_uses_same_key_as_publish() {
let pool = MockRelayPool::new();
let signer = pool.signer().await.unwrap();
let expected_pubkey = pool.mock_public_key();
assert_eq!(signer.get_public_key().await.unwrap(), expected_pubkey);
}
#[tokio::test]
async fn subscribe_replays_matching_stored_events() {
let pool = MockRelayPool::new();
let mut rx = pool.notifications();
let keys = Keys::generate();
let e1 = EventBuilder::new(Kind::TextNote, "one")
.sign_with_keys(&keys)
.unwrap();
let e2 = EventBuilder::new(Kind::Custom(9999), "two")
.sign_with_keys(&keys)
.unwrap();
pool.publish_event(&e1).await.unwrap();
pool.publish_event(&e2).await.unwrap();
rx.try_recv().unwrap();
rx.try_recv().unwrap();
let filter = Filter::new().kind(Kind::TextNote);
pool.subscribe(vec![filter]).await.unwrap();
let replayed = rx.try_recv().unwrap();
if let RelayPoolNotification::Event { event, .. } = replayed {
assert_eq!(event.id, e1.id);
} else {
panic!("expected replayed Event notification");
}
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn notifications_receives_future_publishes() {
let pool = MockRelayPool::new();
let mut rx = pool.notifications();
let keys = Keys::generate();
let event = EventBuilder::new(Kind::TextNote, "future")
.sign_with_keys(&keys)
.unwrap();
pool.publish_event(&event).await.unwrap();
let notif = rx.try_recv().unwrap();
assert!(matches!(notif, RelayPoolNotification::Event { .. }));
}
}