use crate::{NostrOfferEvent, ZincError, OFFER_EVENT_KIND};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{timeout, Duration};
use tokio_tungstenite::{connect_async, tungstenite::Message};
const OFFER_SCHEMA_TAG_VALUE: &str = "zinc-offer-v1";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RelayPublishResult {
pub relay_url: String,
pub accepted: bool,
pub message: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RelayQueryOptions {
pub limit: usize,
pub timeout_ms: u64,
}
impl Default for RelayQueryOptions {
fn default() -> Self {
Self {
limit: 256,
timeout_ms: 5_000,
}
}
}
pub struct NostrRelayClient;
impl NostrRelayClient {
pub fn event_frame(event: &NostrOfferEvent) -> Result<String, ZincError> {
serde_json::to_string(&serde_json::json!(["EVENT", event]))
.map_err(|e| ZincError::SerializationError(e.to_string()))
}
pub fn req_frame(subscription_id: &str, limit: usize) -> Result<String, ZincError> {
serde_json::to_string(&serde_json::json!([
"REQ",
subscription_id,
{
"kinds": [OFFER_EVENT_KIND],
"#z": [OFFER_SCHEMA_TAG_VALUE],
"limit": limit
}
]))
.map_err(|e| ZincError::SerializationError(e.to_string()))
}
pub fn close_frame(subscription_id: &str) -> Result<String, ZincError> {
serde_json::to_string(&serde_json::json!(["CLOSE", subscription_id]))
.map_err(|e| ZincError::SerializationError(e.to_string()))
}
pub fn parse_ok_frame(frame: &str, event_id: &str) -> Option<(bool, String)> {
let value: Value = serde_json::from_str(frame).ok()?;
let arr = value.as_array()?;
if arr.len() != 4 {
return None;
}
if arr.first()?.as_str()? != "OK" {
return None;
}
if arr.get(1)?.as_str()? != event_id {
return None;
}
let accepted = arr.get(2)?.as_bool()?;
let message = arr.get(3)?.as_str()?.to_string();
Some((accepted, message))
}
pub fn parse_event_frame(frame: &str, subscription_id: &str) -> Option<NostrOfferEvent> {
let value: Value = serde_json::from_str(frame).ok()?;
let arr = value.as_array()?;
if arr.len() != 3 {
return None;
}
if arr.first()?.as_str()? != "EVENT" {
return None;
}
if arr.get(1)?.as_str()? != subscription_id {
return None;
}
let event: NostrOfferEvent = serde_json::from_value(arr.get(2)?.clone()).ok()?;
event.verify().ok()?;
Some(event)
}
pub async fn publish_offer(
relay_url: &str,
event: &NostrOfferEvent,
timeout_ms: u64,
) -> Result<RelayPublishResult, ZincError> {
event.verify()?;
let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
})?;
let event_frame = Self::event_frame(event)?;
socket
.send(Message::Text(event_frame))
.await
.map_err(|e| ZincError::OfferError(format!("failed to send event frame: {e}")))?;
let relay_url_owned = relay_url.to_string();
let event_id = event.id.clone();
let ack = timeout(Duration::from_millis(timeout_ms), async move {
while let Some(message) = socket.next().await {
match message {
Ok(Message::Text(text)) => {
if let Some((accepted, msg)) =
Self::parse_ok_frame(text.as_ref(), &event_id)
{
return Ok(RelayPublishResult {
relay_url: relay_url_owned.clone(),
accepted,
message: msg,
});
}
}
Ok(Message::Binary(bin)) => {
if let Ok(text) = std::str::from_utf8(&bin) {
if let Some((accepted, msg)) = Self::parse_ok_frame(text, &event_id) {
return Ok(RelayPublishResult {
relay_url: relay_url_owned.clone(),
accepted,
message: msg,
});
}
}
}
Ok(Message::Close(_)) => {
break;
}
Ok(_) => {}
Err(e) => {
return Err(ZincError::OfferError(format!(
"relay read error for {relay_url_owned}: {e}"
)));
}
}
}
Err(ZincError::OfferError(format!(
"relay {relay_url_owned} closed before acknowledging event"
)))
})
.await
.map_err(|_| {
ZincError::OfferError(format!("relay {relay_url} timed out waiting for OK"))
})?;
ack
}
pub async fn publish_offer_multi(
relay_urls: &[String],
event: &NostrOfferEvent,
timeout_ms: u64,
) -> Vec<RelayPublishResult> {
let mut tasks = Vec::new();
for relay_url in relay_urls {
let relay = relay_url.clone();
let event = event.clone();
tasks.push(tokio::spawn(async move {
match Self::publish_offer(&relay, &event, timeout_ms).await {
Ok(result) => result,
Err(err) => RelayPublishResult {
relay_url: relay,
accepted: false,
message: err.to_string(),
},
}
}));
}
let mut results = Vec::new();
for task in tasks {
if let Ok(result) = task.await {
results.push(result);
}
}
results
}
pub async fn discover_offer_events(
relay_url: &str,
options: RelayQueryOptions,
) -> Result<Vec<NostrOfferEvent>, ZincError> {
let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
})?;
let subscription_id = format!(
"zinc-offers-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
);
let req_frame = Self::req_frame(&subscription_id, options.limit)?;
socket
.send(Message::Text(req_frame))
.await
.map_err(|e| ZincError::OfferError(format!("failed to send req frame: {e}")))?;
let mut events = Vec::new();
let mut seen_ids = HashSet::new();
let sid = subscription_id.clone();
timeout(Duration::from_millis(options.timeout_ms), async {
while let Some(message) = socket.next().await {
match message {
Ok(Message::Text(text)) => {
if let Some(event) = Self::parse_event_frame(text.as_ref(), &sid) {
if seen_ids.insert(event.id.clone()) {
events.push(event);
}
continue;
}
if is_eose_frame(text.as_ref(), &sid) {
break;
}
}
Ok(Message::Binary(bin)) => {
if let Ok(text) = std::str::from_utf8(&bin) {
if let Some(event) = Self::parse_event_frame(text, &sid) {
if seen_ids.insert(event.id.clone()) {
events.push(event);
}
continue;
}
if is_eose_frame(text, &sid) {
break;
}
}
}
Ok(Message::Close(_)) => {
break;
}
Ok(_) => {}
Err(e) => {
return Err(ZincError::OfferError(format!(
"relay read error for {relay_url}: {e}"
)));
}
}
}
Ok::<(), ZincError>(())
})
.await
.map_err(|_| {
ZincError::OfferError(format!(
"relay {relay_url} timed out while discovering offers"
))
})??;
let close = Self::close_frame(&subscription_id)?;
let _ = socket.send(Message::Text(close)).await;
Ok(events)
}
pub async fn discover_offer_events_multi(
relay_urls: &[String],
options: RelayQueryOptions,
) -> Vec<NostrOfferEvent> {
let mut tasks = Vec::new();
for relay_url in relay_urls {
let relay = relay_url.clone();
let options = options.clone();
tasks.push(tokio::spawn(async move {
Self::discover_offer_events(&relay, options)
.await
.unwrap_or_default()
}));
}
let mut merged = Vec::new();
let mut seen_ids = HashSet::new();
for task in tasks {
if let Ok(events) = task.await {
for event in events {
if seen_ids.insert(event.id.clone()) {
merged.push(event);
}
}
}
}
merged
}
}
fn is_eose_frame(frame: &str, subscription_id: &str) -> bool {
let value: Value = match serde_json::from_str(frame) {
Ok(v) => v,
Err(_) => return false,
};
let arr = match value.as_array() {
Some(items) => items,
None => return false,
};
if arr.len() != 2 {
return false;
}
arr.first().and_then(Value::as_str) == Some("EOSE")
&& arr.get(1).and_then(Value::as_str) == Some(subscription_id)
}