Skip to main content

zinc_core/
listing_relay.rs

1//! Nostr relay transport helpers for decentralized listing publication/discovery.
2
3use crate::{NostrListingEvent, ZincError, LISTING_EVENT_KIND};
4use futures_util::{SinkExt, StreamExt};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tokio::time::{timeout, Duration};
10use tokio_tungstenite::{connect_async, tungstenite::Message};
11
12const LISTING_SCHEMA_TAG_VALUE: &str = "zinc-listing-v1";
13
14/// Publish result for one relay.
15#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
16pub struct ListingRelayPublishResult {
17    pub relay_url: String,
18    pub accepted: bool,
19    pub message: String,
20}
21
22/// Query options for listing relay discovery.
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub struct ListingRelayQueryOptions {
25    /// Maximum number of events requested from each relay.
26    pub limit: usize,
27    /// Per-relay timeout in milliseconds.
28    pub timeout_ms: u64,
29}
30
31impl Default for ListingRelayQueryOptions {
32    fn default() -> Self {
33        Self {
34            limit: 256,
35            timeout_ms: 5_000,
36        }
37    }
38}
39
40/// Stateless helper for Nostr listing relay framing, publishing, and discovery.
41pub struct NostrListingRelayClient;
42
43impl NostrListingRelayClient {
44    /// Build an `["EVENT", <event>]` frame.
45    pub fn event_frame(event: &NostrListingEvent) -> Result<String, ZincError> {
46        serde_json::to_string(&serde_json::json!(["EVENT", event]))
47            .map_err(|e| ZincError::SerializationError(e.to_string()))
48    }
49
50    /// Build a `["REQ", sub_id, filter]` frame for listing discovery.
51    pub fn req_frame(subscription_id: &str, limit: usize) -> Result<String, ZincError> {
52        serde_json::to_string(&serde_json::json!([
53            "REQ",
54            subscription_id,
55            {
56                "kinds": [LISTING_EVENT_KIND],
57                "#z": [LISTING_SCHEMA_TAG_VALUE],
58                "limit": limit
59            }
60        ]))
61        .map_err(|e| ZincError::SerializationError(e.to_string()))
62    }
63
64    /// Build a `["CLOSE", sub_id]` frame.
65    pub fn close_frame(subscription_id: &str) -> Result<String, ZincError> {
66        serde_json::to_string(&serde_json::json!(["CLOSE", subscription_id]))
67            .map_err(|e| ZincError::SerializationError(e.to_string()))
68    }
69
70    /// Parse relay `["OK", event_id, accepted, message]` frames.
71    pub fn parse_ok_frame(frame: &str, event_id: &str) -> Option<(bool, String)> {
72        let value: Value = serde_json::from_str(frame).ok()?;
73        let arr = value.as_array()?;
74        if arr.len() != 4 {
75            return None;
76        }
77        if arr.first()?.as_str()? != "OK" {
78            return None;
79        }
80        if arr.get(1)?.as_str()? != event_id {
81            return None;
82        }
83        let accepted = arr.get(2)?.as_bool()?;
84        let message = arr.get(3)?.as_str()?.to_string();
85        Some((accepted, message))
86    }
87
88    /// Parse relay `["EVENT", sub_id, event]` frames for the specified subscription.
89    pub fn parse_event_frame(frame: &str, subscription_id: &str) -> Option<NostrListingEvent> {
90        let value: Value = serde_json::from_str(frame).ok()?;
91        let arr = value.as_array()?;
92        if arr.len() != 3 {
93            return None;
94        }
95        if arr.first()?.as_str()? != "EVENT" {
96            return None;
97        }
98        if arr.get(1)?.as_str()? != subscription_id {
99            return None;
100        }
101
102        let event: NostrListingEvent = serde_json::from_value(arr.get(2)?.clone()).ok()?;
103        event.verify().ok()?;
104        Some(event)
105    }
106
107    /// Publish one signed listing event to one relay and wait for relay `OK`.
108    pub async fn publish_listing(
109        relay_url: &str,
110        event: &NostrListingEvent,
111        timeout_ms: u64,
112    ) -> Result<ListingRelayPublishResult, ZincError> {
113        event.verify()?;
114        let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
115            ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
116        })?;
117
118        let event_frame = Self::event_frame(event)?;
119        socket
120            .send(Message::Text(event_frame))
121            .await
122            .map_err(|e| ZincError::OfferError(format!("failed to send event frame: {e}")))?;
123
124        let relay_url_owned = relay_url.to_string();
125        let event_id = event.id.clone();
126        let ack = timeout(Duration::from_millis(timeout_ms), async move {
127            while let Some(message) = socket.next().await {
128                match message {
129                    Ok(Message::Text(text)) => {
130                        if let Some((accepted, msg)) =
131                            Self::parse_ok_frame(text.as_ref(), &event_id)
132                        {
133                            return Ok(ListingRelayPublishResult {
134                                relay_url: relay_url_owned.clone(),
135                                accepted,
136                                message: msg,
137                            });
138                        }
139                    }
140                    Ok(Message::Binary(bin)) => {
141                        if let Ok(text) = std::str::from_utf8(&bin) {
142                            if let Some((accepted, msg)) = Self::parse_ok_frame(text, &event_id) {
143                                return Ok(ListingRelayPublishResult {
144                                    relay_url: relay_url_owned.clone(),
145                                    accepted,
146                                    message: msg,
147                                });
148                            }
149                        }
150                    }
151                    Ok(Message::Close(_)) => {
152                        break;
153                    }
154                    Ok(_) => {}
155                    Err(e) => {
156                        return Err(ZincError::OfferError(format!(
157                            "relay read error for {relay_url_owned}: {e}"
158                        )));
159                    }
160                }
161            }
162
163            Err(ZincError::OfferError(format!(
164                "relay {relay_url_owned} closed before acknowledging event"
165            )))
166        })
167        .await
168        .map_err(|_| {
169            ZincError::OfferError(format!("relay {relay_url} timed out waiting for OK"))
170        })?;
171
172        ack
173    }
174
175    /// Publish one listing event to multiple relays concurrently.
176    pub async fn publish_listing_multi(
177        relay_urls: &[String],
178        event: &NostrListingEvent,
179        timeout_ms: u64,
180    ) -> Vec<ListingRelayPublishResult> {
181        let mut tasks = Vec::new();
182        for relay_url in relay_urls {
183            let relay = relay_url.clone();
184            let event = event.clone();
185            tasks.push(tokio::spawn(async move {
186                match Self::publish_listing(&relay, &event, timeout_ms).await {
187                    Ok(result) => result,
188                    Err(err) => ListingRelayPublishResult {
189                        relay_url: relay,
190                        accepted: false,
191                        message: err.to_string(),
192                    },
193                }
194            }));
195        }
196
197        let mut results = Vec::new();
198        for task in tasks {
199            if let Ok(result) = task.await {
200                results.push(result);
201            }
202        }
203        results
204    }
205
206    /// Discover valid listing events from a single relay.
207    pub async fn discover_listing_events(
208        relay_url: &str,
209        options: ListingRelayQueryOptions,
210    ) -> Result<Vec<NostrListingEvent>, ZincError> {
211        let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
212            ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
213        })?;
214
215        let subscription_id = format!(
216            "zinc-listings-{}",
217            SystemTime::now()
218                .duration_since(UNIX_EPOCH)
219                .unwrap_or_default()
220                .as_nanos()
221        );
222        let req_frame = Self::req_frame(&subscription_id, options.limit)?;
223        socket
224            .send(Message::Text(req_frame))
225            .await
226            .map_err(|e| ZincError::OfferError(format!("failed to send req frame: {e}")))?;
227
228        let mut events = Vec::new();
229        let mut seen_ids = HashSet::new();
230        let sid = subscription_id.clone();
231        timeout(Duration::from_millis(options.timeout_ms), async {
232            while let Some(message) = socket.next().await {
233                match message {
234                    Ok(Message::Text(text)) => {
235                        if let Some(event) = Self::parse_event_frame(text.as_ref(), &sid) {
236                            if seen_ids.insert(event.id.clone()) {
237                                events.push(event);
238                            }
239                            continue;
240                        }
241                        if is_eose_frame(text.as_ref(), &sid) {
242                            break;
243                        }
244                    }
245                    Ok(Message::Binary(bin)) => {
246                        if let Ok(text) = std::str::from_utf8(&bin) {
247                            if let Some(event) = Self::parse_event_frame(text, &sid) {
248                                if seen_ids.insert(event.id.clone()) {
249                                    events.push(event);
250                                }
251                                continue;
252                            }
253                            if is_eose_frame(text, &sid) {
254                                break;
255                            }
256                        }
257                    }
258                    Ok(Message::Close(_)) => {
259                        break;
260                    }
261                    Ok(_) => {}
262                    Err(e) => {
263                        return Err(ZincError::OfferError(format!(
264                            "relay read error for {relay_url}: {e}"
265                        )));
266                    }
267                }
268            }
269
270            Ok::<(), ZincError>(())
271        })
272        .await
273        .map_err(|_| {
274            ZincError::OfferError(format!(
275                "relay {relay_url} timed out while discovering listings"
276            ))
277        })??;
278
279        let close = Self::close_frame(&subscription_id)?;
280        let _ = socket.send(Message::Text(close)).await;
281        Ok(events)
282    }
283
284    /// Discover valid listing events across multiple relays and dedupe by event id.
285    pub async fn discover_listing_events_multi(
286        relay_urls: &[String],
287        options: ListingRelayQueryOptions,
288    ) -> Vec<NostrListingEvent> {
289        let mut tasks = Vec::new();
290        for relay_url in relay_urls {
291            let relay = relay_url.clone();
292            let options = options.clone();
293            tasks.push(tokio::spawn(async move {
294                Self::discover_listing_events(&relay, options)
295                    .await
296                    .unwrap_or_default()
297            }));
298        }
299
300        let mut merged = Vec::new();
301        let mut seen_ids = HashSet::new();
302        for task in tasks {
303            if let Ok(events) = task.await {
304                for event in events {
305                    if seen_ids.insert(event.id.clone()) {
306                        merged.push(event);
307                    }
308                }
309            }
310        }
311        merged
312    }
313}
314
315fn is_eose_frame(frame: &str, subscription_id: &str) -> bool {
316    let value: Value = match serde_json::from_str(frame) {
317        Ok(v) => v,
318        Err(_) => return false,
319    };
320    let arr = match value.as_array() {
321        Some(items) => items,
322        None => return false,
323    };
324    if arr.len() != 2 {
325        return false;
326    }
327    arr.first().and_then(Value::as_str) == Some("EOSE")
328        && arr.get(1).and_then(Value::as_str) == Some(subscription_id)
329}