use std::{future::Future, pin::Pin};
use amico::{a2a::network::Network, resource::Resource};
use nostr::{
event::{EventBuilder, Kind, Tag},
types::{Filter, SingleLetterTag, Timestamp},
};
use nostr_sdk::{Client, RelayPoolNotification};
use solana_sdk::signer::Signer;
use crate::a2a::crypto;
use crate::web3::wallet::Wallet;
use super::{error::NetworkError, interface::A2aNetwork};
/// A2A network backed by a Nostr client, with peers addressed by Solana
/// public keys (see the `Network` implementation for this type).
#[derive(Clone)]
pub struct DephyNetwork {
/// Nostr client used to add relays, send events and receive notifications.
client: nostr_sdk::Client,
/// Wallet resource; its Solana keypair supplies this node's own address
/// and is handed to `crypto::decrypt_message` for incoming events.
wallet: Resource<Wallet>,
}
/// Single-letter `p` tag; published events carry the recipient's address in
/// it, and the subscription filter matches it against our own public key.
const MENTION_TAG: SingleLetterTag = SingleLetterTag::lowercase(nostr::Alphabet::P);
/// Single-letter `s` tag holding the session identifier.
const SESSION_TAG: SingleLetterTag = SingleLetterTag::lowercase(nostr::Alphabet::S);
/// Fixed session identifier attached to every published event and required
/// by the subscription filter.
const SESSION_ID: &str = "amico_dephy_session";
impl DephyNetwork {
    /// Builds a network handle from Nostr signing keys and a wallet resource.
    ///
    /// The Nostr client is created with default options and `keys` as its
    /// signer. No relay is contacted here; that happens in `connect`.
    pub fn new(keys: nostr::Keys, wallet: Resource<Wallet>) -> Self {
        Self {
            client: Client::builder()
                .opts(nostr_sdk::Options::default())
                .signer(keys)
                .build(),
            wallet,
        }
    }
}
impl Network for DephyNetwork {
type Message = String;
type Address = solana_sdk::pubkey::Pubkey;
type Error = NetworkError;
async fn connect(&self) -> Result<(), Self::Error> {
self.client
.add_relay("wss://canary-relay.dephy.dev")
.await?;
self.client.connect().await;
Ok(())
}
async fn publish(
&self,
address: Self::Address,
message: Self::Message,
) -> Result<(), Self::Error> {
let cipher_text = crypto::encrypt_message(&message, &address)?;
let event = EventBuilder::new(Kind::Custom(1573), cipher_text).tags([
Tag::parse([SESSION_TAG.to_string(), SESSION_ID.to_string()])?,
Tag::parse([MENTION_TAG.to_string(), address.to_string()])?,
]);
self.client.send_event_builder(event).await?;
let from_address = self.wallet.get().solana().pubkey().to_string();
tracing::info!("Published cipher text from {from_address} to {address}: {message}");
Ok(())
}
async fn subscribe(
&self,
on_message: Box<
dyn Fn(Self::Message) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
+ Send
+ Sync
+ 'static,
>,
) -> Result<(), Self::Error> {
let client = self.client.clone();
let pubkey = self.wallet.get().solana().pubkey().to_string();
let wallet = self.wallet.clone();
let filter = Filter::new()
.kind(Kind::Custom(1573))
.since(Timestamp::now())
.custom_tag(SESSION_TAG, [SESSION_ID])
.custom_tag(MENTION_TAG, [pubkey.as_str()]);
client.subscribe(vec![filter], None).await?;
tokio::spawn(async move {
if let Err(e) = client
.handle_notifications(|notification| async {
if let RelayPoolNotification::Event { event, .. } = notification {
tracing::info!("Received cipher text {}", event.content);
let keypair = wallet.get().solana();
if let Ok(plaintext) = crypto::decrypt_message(&event.content, keypair) {
tracing::info!("Decrypted message {}", plaintext);
on_message(plaintext).await;
} else {
tracing::info!("Failed to decrypt message");
}
}
Ok(false) })
.await
{
tracing::error!("Failed to handle notifications: {}", e);
}
});
Ok(())
}
}
impl From<DephyNetwork> for A2aNetwork {
fn from(value: DephyNetwork) -> Self {
Self::new(value)
}
}
#[cfg(test)]
mod tests {
    use amico::resource::IntoResource;
    use nostr::key::Keys;
    use tokio::sync::mpsc;
    use tokio::time::{Duration, timeout};

    use super::*;

    /// Round-trips two messages from a publisher node to a subscriber node.
    ///
    /// Ignored by default because `connect` talks to the real relay.
    #[ignore]
    #[tokio::test]
    async fn test_network_pubsub() {
        let sender_wallet = Wallet::new().unwrap().into_resource();
        let receiver_wallet = Wallet::new().unwrap().into_resource();

        let sender = DephyNetwork::new(Keys::generate(), sender_wallet.clone());
        let receiver = DephyNetwork::new(Keys::generate(), receiver_wallet.clone());

        sender.connect().await.unwrap();
        receiver.connect().await.unwrap();

        // Every decrypted message is forwarded into this channel so the test
        // body can await it with a timeout.
        let (inbox_tx, mut inbox_rx) = mpsc::channel(1);
        let forward = Box::new(move |message| {
            let inbox_tx = inbox_tx.clone();
            Box::pin(async move {
                tracing::info!("Received message: {}", message);
                inbox_tx.send(message).await.unwrap();
            })
        });
        receiver.subscribe(forward).await.unwrap();

        // Send the payloads one at a time, waiting for each to arrive before
        // publishing the next.
        for expected in ["test", "test2"] {
            sender
                .publish(receiver_wallet.get().solana().pubkey(), expected.to_string())
                .await
                .unwrap();

            let received_message = timeout(Duration::from_secs(10), inbox_rx.recv())
                .await
                .expect("Timeout waiting for message")
                .expect("Channel closed");
            assert_eq!(received_message, expected);
        }
    }
}