use super::{Transport, TransportCapability, TransportMessage, MessageMetadata};
use crate::{Error, Result};
use async_trait::async_trait;
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
use serde::{Deserialize, Serialize};
use sha2::{Sha256, Digest};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Nostr event kind numbers referenced by this transport.
pub mod kind {
    /// Kind handled like a gift wrap by `NostrTransport::process_event`.
    // NOTE(review): NIP-59 lists 1059 as the gift-wrap kind (and 13 as the seal
    // kind) — confirm these numbers against the protocol this crate targets.
    pub const SEALED_DM: u16 = 1059;

    /// Kind stamped on events built by `NostrEvent::gift_wrap`.
    pub const GIFT_WRAP: u16 = 1060;

    /// Kind stamped on events built by `NostrEvent::pqc_message`.
    pub const PQC_MESSAGE: u16 = 30078;

    /// Presumably the kind used to announce post-quantum keys; nothing in the
    /// visible part of this file references it — confirm against callers.
    pub const PQC_KEY_ANNOUNCE: u16 = 30079;
}
/// A Nostr event as serialized to and deserialized from JSON.
///
/// The constructors in this file leave `id` and `sig` empty; `compute_id`
/// derives the identifier from the remaining fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NostrEvent {
    /// Event identifier. `compute_id` yields the 64-character hex SHA-256 that
    /// is presumably expected here — confirm where it gets assigned.
    pub id: String,

    /// Author public key (the constructors store the sender's key here).
    pub pubkey: String,

    /// Creation time in seconds since the Unix epoch.
    pub created_at: u64,

    /// Event kind number; see the `kind` module for the values used here.
    pub kind: u16,

    /// Tag list; each tag is a list of strings such as `["p", <recipient>]`.
    pub tags: Vec<Vec<String>>,

    /// Event body. PQC messages carry base64 text; gift wraps carry the
    /// content string they were given.
    pub content: String,

    /// Presumably the author's signature over `id`; never filled in by the
    /// visible part of this file — confirm where signing happens.
    pub sig: String,
}
impl NostrEvent {
    /// Returns the current time in whole seconds since the Unix epoch.
    ///
    /// Falls back to `0` instead of panicking when the system clock reports a
    /// time before the epoch, so building an event can never abort the caller.
    fn unix_now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs())
    }

    /// Computes the event identifier.
    ///
    /// Serializes `[0, pubkey, created_at, kind, tags, content]` as compact
    /// JSON, hashes it with SHA-256 and returns the digest as lowercase hex.
    /// The stored `id` and `sig` fields do not take part in the hash.
    pub fn compute_id(&self) -> String {
        let serialized = serde_json::json!([
            0,
            &self.pubkey,
            self.created_at,
            self.kind,
            &self.tags,
            &self.content
        ]);
        let hash = Sha256::digest(serialized.to_string().as_bytes());
        hex::encode(hash)
    }

    /// Builds an unsigned gift-wrap event addressed to `recipient_pubkey`.
    ///
    /// `content` is stored verbatim and the recipient is recorded in a single
    /// `["p", recipient]` tag. `id` and `sig` are left empty.
    ///
    /// `_timestamp` is currently ignored: `created_at` is always taken from
    /// the system clock.
    pub fn gift_wrap(
        sender_pubkey: &str,
        recipient_pubkey: &str,
        content: &str,
        _timestamp: u64,
    ) -> Self {
        Self {
            id: String::new(),
            pubkey: sender_pubkey.to_string(),
            created_at: Self::unix_now(),
            kind: kind::GIFT_WRAP,
            tags: vec![vec!["p".to_string(), recipient_pubkey.to_string()]],
            content: content.to_string(),
            sig: String::new(),
        }
    }

    /// Builds an unsigned post-quantum message event.
    ///
    /// `encrypted_content` becomes the base64-encoded event content. The tags
    /// record the recipient (`p`), the scheme label (`pqc` = `ml-kem-1024`)
    /// and the base64-encoded `kem_ciphertext` (`kem`). `id` and `sig` are
    /// left empty, and `created_at` is taken from the system clock.
    pub fn pqc_message(
        sender_pubkey: &str,
        recipient_pubkey: &str,
        encrypted_content: &[u8],
        kem_ciphertext: &[u8],
    ) -> Self {
        Self {
            id: String::new(),
            pubkey: sender_pubkey.to_string(),
            created_at: Self::unix_now(),
            kind: kind::PQC_MESSAGE,
            tags: vec![
                vec!["p".to_string(), recipient_pubkey.to_string()],
                vec!["pqc".to_string(), "ml-kem-1024".to_string()],
                vec!["kem".to_string(), BASE64.encode(kem_ciphertext)],
            ],
            content: BASE64.encode(encrypted_content),
            sig: String::new(),
        }
    }
}
/// Messages exchanged with a relay, (de)serialized as untagged tuples, i.e.
/// plain JSON arrays.
///
/// NOTE(review): with `#[serde(untagged)]` serde tries the variants in
/// declaration order and returns the first one that deserializes. `Close` and
/// `Eose` share the `(String, String)` shape, so any two-string array parses
/// as `Close` and `Eose` is never produced when deserializing. Likewise a
/// three-element array only becomes `Request` when its last element is not a
/// valid `NostrEvent`. The leading string is presumably the message label
/// ("EVENT", "REQ", ...) and must be inspected by the caller — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RelayMessage {
    /// Two strings followed by an event.
    Event(String, String, NostrEvent),

    /// Two strings followed by an arbitrary JSON value (presumably a filter).
    Request(String, String, serde_json::Value),

    /// Two strings.
    Close(String, String),

    /// Two strings, a flag and a trailing string (presumably an
    /// acknowledgement with a human-readable message).
    Ok(String, String, bool, String),

    /// Two strings; same shape as `Close`, see the note on the enum.
    Eose(String, String),
}
/// Bookkeeping for a single relay known to the transport.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so the state can be logged,
/// copied and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelayConnection {
    /// Relay URL.
    pub url: String,

    /// Whether the relay is currently marked as connected.
    pub connected: bool,

    /// Identifiers of the subscriptions registered for this relay.
    pub subscriptions: Vec<String>,
}
/// Transport that tracks a set of relays for one local public key.
pub struct NostrTransport {
    /// Relays known to this transport together with their connection flags.
    relays: Vec<RelayConnection>,

    /// Local public key; used as the sender of outgoing events and as the
    /// recipient of processed incoming ones.
    pubkey: String,

    /// Queue of messages handed out by `receive` and `poll`.
    incoming: Arc<Mutex<VecDeque<TransportMessage>>>,

    /// Event IDs that have already been processed, used to drop duplicates.
    seen_events: Arc<Mutex<std::collections::HashSet<String>>>,
}
impl NostrTransport {
pub fn new(pubkey: String, relay_urls: Vec<String>) -> Self {
let relays = relay_urls
.into_iter()
.map(|url| RelayConnection {
url,
connected: false,
subscriptions: Vec::new(),
})
.collect();
Self {
relays,
pubkey,
incoming: Arc::new(Mutex::new(VecDeque::new())),
seen_events: Arc::new(Mutex::new(std::collections::HashSet::new())),
}
}
pub fn add_relay(&mut self, url: String) {
self.relays.push(RelayConnection {
url,
connected: false,
subscriptions: Vec::new(),
});
}
pub fn connected_relays(&self) -> Vec<&str> {
self.relays
.iter()
.filter(|r| r.connected)
.map(|r| r.url.as_str())
.collect()
}
async fn subscribe(&mut self, relay_idx: usize) -> Result<String> {
let sub_id = format!("qcomm_{}", rand::random::<u64>());
let filter = serde_json::json!({
"#p": [&self.pubkey],
"kinds": [kind::GIFT_WRAP, kind::PQC_MESSAGE],
"since": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() - 86400, });
if let Some(relay) = self.relays.get_mut(relay_idx) {
relay.subscriptions.push(sub_id.clone());
}
Ok(sub_id)
}
async fn publish(&self, event: NostrEvent) -> Result<()> {
let _message = serde_json::json!(["EVENT", event]);
for relay in &self.relays {
if relay.connected {
tracing::debug!("Publishing to relay: {}", relay.url);
}
}
Ok(())
}
async fn process_event(&self, event: NostrEvent) -> Result<Option<TransportMessage>> {
let mut seen = self.seen_events.lock().await;
if seen.contains(&event.id) {
return Ok(None);
}
seen.insert(event.id.clone());
drop(seen);
match event.kind {
kind::GIFT_WRAP | kind::SEALED_DM => {
Ok(Some(TransportMessage {
id: event.id.clone(),
from: event.pubkey.clone(),
to: self.pubkey.clone(),
payload: event.content.as_bytes().to_vec(),
timestamp: event.created_at * 1000,
metadata: MessageMetadata::Nostr {
event_id: event.id,
relay: String::new(), },
}))
}
kind::PQC_MESSAGE => {
let payload = BASE64.decode(&event.content)
.map_err(|e| Error::InvalidMessage(e.to_string()))?;
Ok(Some(TransportMessage {
id: event.id.clone(),
from: event.pubkey.clone(),
to: self.pubkey.clone(),
payload,
timestamp: event.created_at * 1000,
metadata: MessageMetadata::Nostr {
event_id: event.id,
relay: String::new(),
},
}))
}
_ => Ok(None),
}
}
}
#[async_trait]
impl Transport for NostrTransport {
    /// Human-readable transport name.
    fn name(&self) -> &str {
        "Nostr"
    }

    /// Capabilities advertised by this transport.
    fn capabilities(&self) -> Vec<TransportCapability> {
        vec![
            TransportCapability::Send,
            TransportCapability::Receive,
            TransportCapability::OfflineDelivery,
            TransportCapability::GroupChannel,
            TransportCapability::PostQuantum,
        ]
    }

    /// Returns `true` when at least one relay is marked as connected.
    async fn is_connected(&self) -> bool {
        self.relays.iter().any(|r| r.connected)
    }

    /// Marks every not-yet-connected relay as connected and subscribes to it.
    ///
    /// Relays that are already connected are left alone, so calling `connect`
    /// repeatedly does not pile up duplicate subscriptions.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `subscribe`.
    async fn connect(&mut self) -> Result<()> {
        let mut newly_connected = Vec::new();
        for (idx, relay) in self.relays.iter_mut().enumerate() {
            if relay.connected {
                continue;
            }
            tracing::info!("Connecting to relay: {}", relay.url);
            relay.connected = true;
            newly_connected.push(idx);
        }
        for idx in newly_connected {
            self.subscribe(idx).await?;
        }
        Ok(())
    }

    /// Marks every relay as disconnected and forgets its subscriptions.
    async fn disconnect(&mut self) -> Result<()> {
        for relay in &mut self.relays {
            relay.connected = false;
            relay.subscriptions.clear();
        }
        Ok(())
    }

    /// Wraps `message.payload` in a PQC event addressed to `message.to` and
    /// publishes it.
    ///
    /// The KEM ciphertext is passed as empty, and the event is not signed
    /// here. The event `id` is filled in from `compute_id` so the published
    /// event does not carry an empty identifier.
    async fn send(&self, message: TransportMessage) -> Result<()> {
        let mut event = NostrEvent::pqc_message(
            &self.pubkey,
            &message.to,
            &message.payload,
            &[],
        );
        event.id = event.compute_id();
        self.publish(event).await
    }

    /// Waits for the next queued message, checking the queue every 100 ms.
    ///
    /// # Errors
    ///
    /// Returns `Error::Nostr("Disconnected")` when the queue is empty and no
    /// relay is connected after a polling interval.
    async fn receive(&mut self) -> Result<TransportMessage> {
        loop {
            if let Some(msg) = self.incoming.lock().await.pop_front() {
                return Ok(msg);
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            if !self.is_connected().await {
                return Err(Error::Nostr("Disconnected".into()));
            }
        }
    }

    /// Returns the next queued message, if any, without waiting.
    async fn poll(&mut self) -> Result<Option<TransportMessage>> {
        Ok(self.incoming.lock().await.pop_front())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The computed ID must be a deterministic, lowercase 64-character hex
    /// digest that changes when the event content changes.
    #[test]
    fn test_event_id_computation() {
        let event = NostrEvent {
            id: String::new(),
            pubkey: "a".repeat(64),
            created_at: 1234567890,
            kind: 1,
            tags: vec![],
            content: "Hello".to_string(),
            sig: String::new(),
        };
        let id = event.compute_id();
        assert_eq!(id.len(), 64);
        assert!(id
            .chars()
            .all(|c| c.is_ascii_digit() || ('a'..='f').contains(&c)));
        assert_eq!(id, event.compute_id());

        let mut other = event.clone();
        other.content = "Goodbye".to_string();
        assert_ne!(id, other.compute_id());
    }

    /// A freshly built transport reports its name and has no connected relays.
    #[tokio::test]
    async fn test_nostr_transport() {
        let transport = NostrTransport::new(
            "a".repeat(64),
            vec!["wss://relay.example.com".into()],
        );
        assert!(!transport.is_connected().await);
        assert!(transport.connected_relays().is_empty());
        assert_eq!(transport.name(), "Nostr");
    }
}