zinc_core/
listing_relay.rs1use crate::{NostrListingEvent, ZincError, LISTING_EVENT_KIND};
4use futures_util::{SinkExt, StreamExt};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tokio::time::{timeout, Duration};
10use tokio_tungstenite::{connect_async, tungstenite::Message};
11
12const LISTING_SCHEMA_TAG_VALUE: &str = "zinc-listing-v1";
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
16pub struct ListingRelayPublishResult {
17 pub relay_url: String,
18 pub accepted: bool,
19 pub message: String,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub struct ListingRelayQueryOptions {
25 pub limit: usize,
27 pub timeout_ms: u64,
29}
30
31impl Default for ListingRelayQueryOptions {
32 fn default() -> Self {
33 Self {
34 limit: 256,
35 timeout_ms: 5_000,
36 }
37 }
38}
39
40pub struct NostrListingRelayClient;
42
43impl NostrListingRelayClient {
44 pub fn event_frame(event: &NostrListingEvent) -> Result<String, ZincError> {
46 serde_json::to_string(&serde_json::json!(["EVENT", event]))
47 .map_err(|e| ZincError::SerializationError(e.to_string()))
48 }
49
50 pub fn req_frame(subscription_id: &str, limit: usize) -> Result<String, ZincError> {
52 serde_json::to_string(&serde_json::json!([
53 "REQ",
54 subscription_id,
55 {
56 "kinds": [LISTING_EVENT_KIND],
57 "#z": [LISTING_SCHEMA_TAG_VALUE],
58 "limit": limit
59 }
60 ]))
61 .map_err(|e| ZincError::SerializationError(e.to_string()))
62 }
63
64 pub fn close_frame(subscription_id: &str) -> Result<String, ZincError> {
66 serde_json::to_string(&serde_json::json!(["CLOSE", subscription_id]))
67 .map_err(|e| ZincError::SerializationError(e.to_string()))
68 }
69
70 pub fn parse_ok_frame(frame: &str, event_id: &str) -> Option<(bool, String)> {
72 let value: Value = serde_json::from_str(frame).ok()?;
73 let arr = value.as_array()?;
74 if arr.len() != 4 {
75 return None;
76 }
77 if arr.first()?.as_str()? != "OK" {
78 return None;
79 }
80 if arr.get(1)?.as_str()? != event_id {
81 return None;
82 }
83 let accepted = arr.get(2)?.as_bool()?;
84 let message = arr.get(3)?.as_str()?.to_string();
85 Some((accepted, message))
86 }
87
88 pub fn parse_event_frame(frame: &str, subscription_id: &str) -> Option<NostrListingEvent> {
90 let value: Value = serde_json::from_str(frame).ok()?;
91 let arr = value.as_array()?;
92 if arr.len() != 3 {
93 return None;
94 }
95 if arr.first()?.as_str()? != "EVENT" {
96 return None;
97 }
98 if arr.get(1)?.as_str()? != subscription_id {
99 return None;
100 }
101
102 let event: NostrListingEvent = serde_json::from_value(arr.get(2)?.clone()).ok()?;
103 event.verify().ok()?;
104 Some(event)
105 }
106
107 pub async fn publish_listing(
109 relay_url: &str,
110 event: &NostrListingEvent,
111 timeout_ms: u64,
112 ) -> Result<ListingRelayPublishResult, ZincError> {
113 event.verify()?;
114 let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
115 ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
116 })?;
117
118 let event_frame = Self::event_frame(event)?;
119 socket
120 .send(Message::Text(event_frame))
121 .await
122 .map_err(|e| ZincError::OfferError(format!("failed to send event frame: {e}")))?;
123
124 let relay_url_owned = relay_url.to_string();
125 let event_id = event.id.clone();
126 let ack = timeout(Duration::from_millis(timeout_ms), async move {
127 while let Some(message) = socket.next().await {
128 match message {
129 Ok(Message::Text(text)) => {
130 if let Some((accepted, msg)) =
131 Self::parse_ok_frame(text.as_ref(), &event_id)
132 {
133 return Ok(ListingRelayPublishResult {
134 relay_url: relay_url_owned.clone(),
135 accepted,
136 message: msg,
137 });
138 }
139 }
140 Ok(Message::Binary(bin)) => {
141 if let Ok(text) = std::str::from_utf8(&bin) {
142 if let Some((accepted, msg)) = Self::parse_ok_frame(text, &event_id) {
143 return Ok(ListingRelayPublishResult {
144 relay_url: relay_url_owned.clone(),
145 accepted,
146 message: msg,
147 });
148 }
149 }
150 }
151 Ok(Message::Close(_)) => {
152 break;
153 }
154 Ok(_) => {}
155 Err(e) => {
156 return Err(ZincError::OfferError(format!(
157 "relay read error for {relay_url_owned}: {e}"
158 )));
159 }
160 }
161 }
162
163 Err(ZincError::OfferError(format!(
164 "relay {relay_url_owned} closed before acknowledging event"
165 )))
166 })
167 .await
168 .map_err(|_| {
169 ZincError::OfferError(format!("relay {relay_url} timed out waiting for OK"))
170 })?;
171
172 ack
173 }
174
175 pub async fn publish_listing_multi(
177 relay_urls: &[String],
178 event: &NostrListingEvent,
179 timeout_ms: u64,
180 ) -> Vec<ListingRelayPublishResult> {
181 let mut tasks = Vec::new();
182 for relay_url in relay_urls {
183 let relay = relay_url.clone();
184 let event = event.clone();
185 tasks.push(tokio::spawn(async move {
186 match Self::publish_listing(&relay, &event, timeout_ms).await {
187 Ok(result) => result,
188 Err(err) => ListingRelayPublishResult {
189 relay_url: relay,
190 accepted: false,
191 message: err.to_string(),
192 },
193 }
194 }));
195 }
196
197 let mut results = Vec::new();
198 for task in tasks {
199 if let Ok(result) = task.await {
200 results.push(result);
201 }
202 }
203 results
204 }
205
206 pub async fn discover_listing_events(
208 relay_url: &str,
209 options: ListingRelayQueryOptions,
210 ) -> Result<Vec<NostrListingEvent>, ZincError> {
211 let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
212 ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
213 })?;
214
215 let subscription_id = format!(
216 "zinc-listings-{}",
217 SystemTime::now()
218 .duration_since(UNIX_EPOCH)
219 .unwrap_or_default()
220 .as_nanos()
221 );
222 let req_frame = Self::req_frame(&subscription_id, options.limit)?;
223 socket
224 .send(Message::Text(req_frame))
225 .await
226 .map_err(|e| ZincError::OfferError(format!("failed to send req frame: {e}")))?;
227
228 let mut events = Vec::new();
229 let mut seen_ids = HashSet::new();
230 let sid = subscription_id.clone();
231 timeout(Duration::from_millis(options.timeout_ms), async {
232 while let Some(message) = socket.next().await {
233 match message {
234 Ok(Message::Text(text)) => {
235 if let Some(event) = Self::parse_event_frame(text.as_ref(), &sid) {
236 if seen_ids.insert(event.id.clone()) {
237 events.push(event);
238 }
239 continue;
240 }
241 if is_eose_frame(text.as_ref(), &sid) {
242 break;
243 }
244 }
245 Ok(Message::Binary(bin)) => {
246 if let Ok(text) = std::str::from_utf8(&bin) {
247 if let Some(event) = Self::parse_event_frame(text, &sid) {
248 if seen_ids.insert(event.id.clone()) {
249 events.push(event);
250 }
251 continue;
252 }
253 if is_eose_frame(text, &sid) {
254 break;
255 }
256 }
257 }
258 Ok(Message::Close(_)) => {
259 break;
260 }
261 Ok(_) => {}
262 Err(e) => {
263 return Err(ZincError::OfferError(format!(
264 "relay read error for {relay_url}: {e}"
265 )));
266 }
267 }
268 }
269
270 Ok::<(), ZincError>(())
271 })
272 .await
273 .map_err(|_| {
274 ZincError::OfferError(format!(
275 "relay {relay_url} timed out while discovering listings"
276 ))
277 })??;
278
279 let close = Self::close_frame(&subscription_id)?;
280 let _ = socket.send(Message::Text(close)).await;
281 Ok(events)
282 }
283
284 pub async fn discover_listing_events_multi(
286 relay_urls: &[String],
287 options: ListingRelayQueryOptions,
288 ) -> Vec<NostrListingEvent> {
289 let mut tasks = Vec::new();
290 for relay_url in relay_urls {
291 let relay = relay_url.clone();
292 let options = options.clone();
293 tasks.push(tokio::spawn(async move {
294 Self::discover_listing_events(&relay, options)
295 .await
296 .unwrap_or_default()
297 }));
298 }
299
300 let mut merged = Vec::new();
301 let mut seen_ids = HashSet::new();
302 for task in tasks {
303 if let Ok(events) = task.await {
304 for event in events {
305 if seen_ids.insert(event.id.clone()) {
306 merged.push(event);
307 }
308 }
309 }
310 }
311 merged
312 }
313}
314
315fn is_eose_frame(frame: &str, subscription_id: &str) -> bool {
316 let value: Value = match serde_json::from_str(frame) {
317 Ok(v) => v,
318 Err(_) => return false,
319 };
320 let arr = match value.as_array() {
321 Some(items) => items,
322 None => return false,
323 };
324 if arr.len() != 2 {
325 return false;
326 }
327 arr.first().and_then(Value::as_str) == Some("EOSE")
328 && arr.get(1).and_then(Value::as_str) == Some(subscription_id)
329}