1use crate::{NostrOfferEvent, ZincError, OFFER_EVENT_KIND};
7use futures_util::{SinkExt, StreamExt};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::HashSet;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::{timeout, Duration};
13use tokio_tungstenite::{connect_async, tungstenite::Message};
14
/// Value placed in the `#z` tag filter of subscription requests, so that only
/// events tagged with this offer schema identifier are requested from a relay.
const OFFER_SCHEMA_TAG_VALUE: &str = "zinc-offer-v1";
16
17#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
19pub struct RelayPublishResult {
20 pub relay_url: String,
21 pub accepted: bool,
22 pub message: String,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct RelayQueryOptions {
28 pub limit: usize,
30 pub timeout_ms: u64,
32}
33
34impl Default for RelayQueryOptions {
35 fn default() -> Self {
36 Self {
37 limit: 256,
38 timeout_ms: 5_000,
39 }
40 }
41}
42
/// Stateless entry point for relay operations.
///
/// The type carries no data; everything is exposed as associated functions
/// (for example `NostrRelayClient::publish_offer`). The common traits are
/// derived so the type can be logged, copied and default-constructed like any
/// other public type.
#[derive(Debug, Clone, Copy, Default)]
pub struct NostrRelayClient;
45
46impl NostrRelayClient {
47 pub fn event_frame(event: &NostrOfferEvent) -> Result<String, ZincError> {
49 serde_json::to_string(&serde_json::json!(["EVENT", event]))
50 .map_err(|e| ZincError::SerializationError(e.to_string()))
51 }
52
53 pub fn req_frame(subscription_id: &str, limit: usize) -> Result<String, ZincError> {
55 serde_json::to_string(&serde_json::json!([
56 "REQ",
57 subscription_id,
58 {
59 "kinds": [OFFER_EVENT_KIND],
60 "#z": [OFFER_SCHEMA_TAG_VALUE],
61 "limit": limit
62 }
63 ]))
64 .map_err(|e| ZincError::SerializationError(e.to_string()))
65 }
66
67 pub fn close_frame(subscription_id: &str) -> Result<String, ZincError> {
69 serde_json::to_string(&serde_json::json!(["CLOSE", subscription_id]))
70 .map_err(|e| ZincError::SerializationError(e.to_string()))
71 }
72
73 pub fn parse_ok_frame(frame: &str, event_id: &str) -> Option<(bool, String)> {
75 let value: Value = serde_json::from_str(frame).ok()?;
76 let arr = value.as_array()?;
77 if arr.len() != 4 {
78 return None;
79 }
80 if arr.first()?.as_str()? != "OK" {
81 return None;
82 }
83 if arr.get(1)?.as_str()? != event_id {
84 return None;
85 }
86 let accepted = arr.get(2)?.as_bool()?;
87 let message = arr.get(3)?.as_str()?.to_string();
88 Some((accepted, message))
89 }
90
91 pub fn parse_event_frame(frame: &str, subscription_id: &str) -> Option<NostrOfferEvent> {
93 let value: Value = serde_json::from_str(frame).ok()?;
94 let arr = value.as_array()?;
95 if arr.len() != 3 {
96 return None;
97 }
98 if arr.first()?.as_str()? != "EVENT" {
99 return None;
100 }
101 if arr.get(1)?.as_str()? != subscription_id {
102 return None;
103 }
104
105 let event: NostrOfferEvent = serde_json::from_value(arr.get(2)?.clone()).ok()?;
106 event.verify().ok()?;
107 Some(event)
108 }
109
110 pub async fn publish_offer(
112 relay_url: &str,
113 event: &NostrOfferEvent,
114 timeout_ms: u64,
115 ) -> Result<RelayPublishResult, ZincError> {
116 event.verify()?;
117 let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
118 ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
119 })?;
120
121 let event_frame = Self::event_frame(event)?;
122 socket
123 .send(Message::Text(event_frame.into()))
124 .await
125 .map_err(|e| ZincError::OfferError(format!("failed to send event frame: {e}")))?;
126
127 let relay_url_owned = relay_url.to_string();
128 let event_id = event.id.clone();
129 let ack = timeout(Duration::from_millis(timeout_ms), async move {
130 while let Some(message) = socket.next().await {
131 match message {
132 Ok(Message::Text(text)) => {
133 if let Some((accepted, msg)) =
134 Self::parse_ok_frame(text.as_ref(), &event_id)
135 {
136 return Ok(RelayPublishResult {
137 relay_url: relay_url_owned.clone(),
138 accepted,
139 message: msg,
140 });
141 }
142 }
143 Ok(Message::Binary(bin)) => {
144 if let Ok(text) = std::str::from_utf8(&bin) {
145 if let Some((accepted, msg)) = Self::parse_ok_frame(text, &event_id) {
146 return Ok(RelayPublishResult {
147 relay_url: relay_url_owned.clone(),
148 accepted,
149 message: msg,
150 });
151 }
152 }
153 }
154 Ok(Message::Close(_)) => {
155 break;
156 }
157 Ok(_) => {}
158 Err(e) => {
159 return Err(ZincError::OfferError(format!(
160 "relay read error for {relay_url_owned}: {e}"
161 )));
162 }
163 }
164 }
165
166 Err(ZincError::OfferError(format!(
167 "relay {relay_url_owned} closed before acknowledging event"
168 )))
169 })
170 .await
171 .map_err(|_| {
172 ZincError::OfferError(format!("relay {relay_url} timed out waiting for OK"))
173 })?;
174
175 ack
176 }
177
178 pub async fn publish_offer_multi(
180 relay_urls: &[String],
181 event: &NostrOfferEvent,
182 timeout_ms: u64,
183 ) -> Vec<RelayPublishResult> {
184 let mut tasks = Vec::new();
185 for relay_url in relay_urls {
186 let relay = relay_url.clone();
187 let event = event.clone();
188 tasks.push(tokio::spawn(async move {
189 match Self::publish_offer(&relay, &event, timeout_ms).await {
190 Ok(result) => result,
191 Err(err) => RelayPublishResult {
192 relay_url: relay,
193 accepted: false,
194 message: err.to_string(),
195 },
196 }
197 }));
198 }
199
200 let mut results = Vec::new();
201 for task in tasks {
202 if let Ok(result) = task.await {
203 results.push(result);
204 }
205 }
206 results
207 }
208
209 pub async fn discover_offer_events(
211 relay_url: &str,
212 options: RelayQueryOptions,
213 ) -> Result<Vec<NostrOfferEvent>, ZincError> {
214 let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
215 ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
216 })?;
217
218 let subscription_id = format!(
219 "zinc-offers-{}",
220 SystemTime::now()
221 .duration_since(UNIX_EPOCH)
222 .unwrap_or_default()
223 .as_nanos()
224 );
225 let req_frame = Self::req_frame(&subscription_id, options.limit)?;
226 socket
227 .send(Message::Text(req_frame.into()))
228 .await
229 .map_err(|e| ZincError::OfferError(format!("failed to send req frame: {e}")))?;
230
231 let mut events = Vec::new();
232 let mut seen_ids = HashSet::new();
233 let sid = subscription_id.clone();
234 timeout(Duration::from_millis(options.timeout_ms), async {
235 while let Some(message) = socket.next().await {
236 match message {
237 Ok(Message::Text(text)) => {
238 if let Some(event) = Self::parse_event_frame(text.as_ref(), &sid) {
239 if seen_ids.insert(event.id.clone()) {
240 events.push(event);
241 }
242 continue;
243 }
244 if is_eose_frame(text.as_ref(), &sid) {
245 break;
246 }
247 }
248 Ok(Message::Binary(bin)) => {
249 if let Ok(text) = std::str::from_utf8(&bin) {
250 if let Some(event) = Self::parse_event_frame(text, &sid) {
251 if seen_ids.insert(event.id.clone()) {
252 events.push(event);
253 }
254 continue;
255 }
256 if is_eose_frame(text, &sid) {
257 break;
258 }
259 }
260 }
261 Ok(Message::Close(_)) => {
262 break;
263 }
264 Ok(_) => {}
265 Err(e) => {
266 return Err(ZincError::OfferError(format!(
267 "relay read error for {relay_url}: {e}"
268 )));
269 }
270 }
271 }
272
273 Ok::<(), ZincError>(())
274 })
275 .await
276 .map_err(|_| {
277 ZincError::OfferError(format!(
278 "relay {relay_url} timed out while discovering offers"
279 ))
280 })??;
281
282 let close = Self::close_frame(&subscription_id)?;
283 let _ = socket.send(Message::Text(close.into())).await;
284 Ok(events)
285 }
286
287 pub async fn discover_offer_events_multi(
289 relay_urls: &[String],
290 options: RelayQueryOptions,
291 ) -> Vec<NostrOfferEvent> {
292 let mut tasks = Vec::new();
293 for relay_url in relay_urls {
294 let relay = relay_url.clone();
295 let options = options.clone();
296 tasks.push(tokio::spawn(async move {
297 Self::discover_offer_events(&relay, options)
298 .await
299 .unwrap_or_default()
300 }));
301 }
302
303 let mut merged = Vec::new();
304 let mut seen_ids = HashSet::new();
305 for task in tasks {
306 if let Ok(events) = task.await {
307 for event in events {
308 if seen_ids.insert(event.id.clone()) {
309 merged.push(event);
310 }
311 }
312 }
313 }
314 merged
315 }
316}
317
318fn is_eose_frame(frame: &str, subscription_id: &str) -> bool {
319 let value: Value = match serde_json::from_str(frame) {
320 Ok(v) => v,
321 Err(_) => return false,
322 };
323 let arr = match value.as_array() {
324 Some(items) => items,
325 None => return false,
326 };
327 if arr.len() != 2 {
328 return false;
329 }
330 arr.first().and_then(Value::as_str) == Some("EOSE")
331 && arr.get(1).and_then(Value::as_str) == Some(subscription_id)
332}