Skip to main content

zinc_core/
offer_relay.rs

1//! Nostr relay transport helpers for decentralized offer publication/discovery.
2//!
3//! This module intentionally excludes PoW and relay auth for now. It focuses on
4//! baseline multi-relay fanout and discovery semantics.
5
6use crate::{NostrOfferEvent, ZincError, OFFER_EVENT_KIND};
7use futures_util::{SinkExt, StreamExt};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::HashSet;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::{timeout, Duration};
13use tokio_tungstenite::{connect_async, tungstenite::Message};
14
15const OFFER_SCHEMA_TAG_VALUE: &str = "zinc-offer-v1";
16
17/// Publish result for one relay.
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
19pub struct RelayPublishResult {
20    pub relay_url: String,
21    pub accepted: bool,
22    pub message: String,
23}
24
25/// Query options for relay discovery.
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct RelayQueryOptions {
28    /// Maximum number of events requested from each relay.
29    pub limit: usize,
30    /// Per-relay timeout in milliseconds.
31    pub timeout_ms: u64,
32}
33
34impl Default for RelayQueryOptions {
35    fn default() -> Self {
36        Self {
37            limit: 256,
38            timeout_ms: 5_000,
39        }
40    }
41}
42
43/// Stateless helper for Nostr relay framing, publishing, and discovery.
44pub struct NostrRelayClient;
45
46impl NostrRelayClient {
47    /// Build an `["EVENT", <event>]` frame.
48    pub fn event_frame(event: &NostrOfferEvent) -> Result<String, ZincError> {
49        serde_json::to_string(&serde_json::json!(["EVENT", event]))
50            .map_err(|e| ZincError::SerializationError(e.to_string()))
51    }
52
53    /// Build a `["REQ", sub_id, filter]` frame for offer discovery.
54    pub fn req_frame(subscription_id: &str, limit: usize) -> Result<String, ZincError> {
55        serde_json::to_string(&serde_json::json!([
56            "REQ",
57            subscription_id,
58            {
59                "kinds": [OFFER_EVENT_KIND],
60                "#z": [OFFER_SCHEMA_TAG_VALUE],
61                "limit": limit
62            }
63        ]))
64        .map_err(|e| ZincError::SerializationError(e.to_string()))
65    }
66
67    /// Build a `["CLOSE", sub_id]` frame.
68    pub fn close_frame(subscription_id: &str) -> Result<String, ZincError> {
69        serde_json::to_string(&serde_json::json!(["CLOSE", subscription_id]))
70            .map_err(|e| ZincError::SerializationError(e.to_string()))
71    }
72
73    /// Parse relay `["OK", event_id, accepted, message]` frames.
74    pub fn parse_ok_frame(frame: &str, event_id: &str) -> Option<(bool, String)> {
75        let value: Value = serde_json::from_str(frame).ok()?;
76        let arr = value.as_array()?;
77        if arr.len() != 4 {
78            return None;
79        }
80        if arr.first()?.as_str()? != "OK" {
81            return None;
82        }
83        if arr.get(1)?.as_str()? != event_id {
84            return None;
85        }
86        let accepted = arr.get(2)?.as_bool()?;
87        let message = arr.get(3)?.as_str()?.to_string();
88        Some((accepted, message))
89    }
90
91    /// Parse relay `["EVENT", sub_id, event]` frames for the specified subscription.
92    pub fn parse_event_frame(frame: &str, subscription_id: &str) -> Option<NostrOfferEvent> {
93        let value: Value = serde_json::from_str(frame).ok()?;
94        let arr = value.as_array()?;
95        if arr.len() != 3 {
96            return None;
97        }
98        if arr.first()?.as_str()? != "EVENT" {
99            return None;
100        }
101        if arr.get(1)?.as_str()? != subscription_id {
102            return None;
103        }
104
105        let event: NostrOfferEvent = serde_json::from_value(arr.get(2)?.clone()).ok()?;
106        event.verify().ok()?;
107        Some(event)
108    }
109
110    /// Publish one signed offer event to one relay and wait for relay `OK`.
111    pub async fn publish_offer(
112        relay_url: &str,
113        event: &NostrOfferEvent,
114        timeout_ms: u64,
115    ) -> Result<RelayPublishResult, ZincError> {
116        event.verify()?;
117        let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
118            ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
119        })?;
120
121        let event_frame = Self::event_frame(event)?;
122        socket
123            .send(Message::Text(event_frame.into()))
124            .await
125            .map_err(|e| ZincError::OfferError(format!("failed to send event frame: {e}")))?;
126
127        let relay_url_owned = relay_url.to_string();
128        let event_id = event.id.clone();
129        let ack = timeout(Duration::from_millis(timeout_ms), async move {
130            while let Some(message) = socket.next().await {
131                match message {
132                    Ok(Message::Text(text)) => {
133                        if let Some((accepted, msg)) =
134                            Self::parse_ok_frame(text.as_ref(), &event_id)
135                        {
136                            return Ok(RelayPublishResult {
137                                relay_url: relay_url_owned.clone(),
138                                accepted,
139                                message: msg,
140                            });
141                        }
142                    }
143                    Ok(Message::Binary(bin)) => {
144                        if let Ok(text) = std::str::from_utf8(&bin) {
145                            if let Some((accepted, msg)) = Self::parse_ok_frame(text, &event_id) {
146                                return Ok(RelayPublishResult {
147                                    relay_url: relay_url_owned.clone(),
148                                    accepted,
149                                    message: msg,
150                                });
151                            }
152                        }
153                    }
154                    Ok(Message::Close(_)) => {
155                        break;
156                    }
157                    Ok(_) => {}
158                    Err(e) => {
159                        return Err(ZincError::OfferError(format!(
160                            "relay read error for {relay_url_owned}: {e}"
161                        )));
162                    }
163                }
164            }
165
166            Err(ZincError::OfferError(format!(
167                "relay {relay_url_owned} closed before acknowledging event"
168            )))
169        })
170        .await
171        .map_err(|_| {
172            ZincError::OfferError(format!("relay {relay_url} timed out waiting for OK"))
173        })?;
174
175        ack
176    }
177
178    /// Publish one event to multiple relays concurrently.
179    pub async fn publish_offer_multi(
180        relay_urls: &[String],
181        event: &NostrOfferEvent,
182        timeout_ms: u64,
183    ) -> Vec<RelayPublishResult> {
184        let mut tasks = Vec::new();
185        for relay_url in relay_urls {
186            let relay = relay_url.clone();
187            let event = event.clone();
188            tasks.push(tokio::spawn(async move {
189                match Self::publish_offer(&relay, &event, timeout_ms).await {
190                    Ok(result) => result,
191                    Err(err) => RelayPublishResult {
192                        relay_url: relay,
193                        accepted: false,
194                        message: err.to_string(),
195                    },
196                }
197            }));
198        }
199
200        let mut results = Vec::new();
201        for task in tasks {
202            if let Ok(result) = task.await {
203                results.push(result);
204            }
205        }
206        results
207    }
208
209    /// Discover valid offer events from a single relay.
210    pub async fn discover_offer_events(
211        relay_url: &str,
212        options: RelayQueryOptions,
213    ) -> Result<Vec<NostrOfferEvent>, ZincError> {
214        let (mut socket, _) = connect_async(relay_url).await.map_err(|e| {
215            ZincError::OfferError(format!("failed to connect relay {relay_url}: {e}"))
216        })?;
217
218        let subscription_id = format!(
219            "zinc-offers-{}",
220            SystemTime::now()
221                .duration_since(UNIX_EPOCH)
222                .unwrap_or_default()
223                .as_nanos()
224        );
225        let req_frame = Self::req_frame(&subscription_id, options.limit)?;
226        socket
227            .send(Message::Text(req_frame.into()))
228            .await
229            .map_err(|e| ZincError::OfferError(format!("failed to send req frame: {e}")))?;
230
231        let mut events = Vec::new();
232        let mut seen_ids = HashSet::new();
233        let sid = subscription_id.clone();
234        timeout(Duration::from_millis(options.timeout_ms), async {
235            while let Some(message) = socket.next().await {
236                match message {
237                    Ok(Message::Text(text)) => {
238                        if let Some(event) = Self::parse_event_frame(text.as_ref(), &sid) {
239                            if seen_ids.insert(event.id.clone()) {
240                                events.push(event);
241                            }
242                            continue;
243                        }
244                        if is_eose_frame(text.as_ref(), &sid) {
245                            break;
246                        }
247                    }
248                    Ok(Message::Binary(bin)) => {
249                        if let Ok(text) = std::str::from_utf8(&bin) {
250                            if let Some(event) = Self::parse_event_frame(text, &sid) {
251                                if seen_ids.insert(event.id.clone()) {
252                                    events.push(event);
253                                }
254                                continue;
255                            }
256                            if is_eose_frame(text, &sid) {
257                                break;
258                            }
259                        }
260                    }
261                    Ok(Message::Close(_)) => {
262                        break;
263                    }
264                    Ok(_) => {}
265                    Err(e) => {
266                        return Err(ZincError::OfferError(format!(
267                            "relay read error for {relay_url}: {e}"
268                        )));
269                    }
270                }
271            }
272
273            Ok::<(), ZincError>(())
274        })
275        .await
276        .map_err(|_| {
277            ZincError::OfferError(format!(
278                "relay {relay_url} timed out while discovering offers"
279            ))
280        })??;
281
282        let close = Self::close_frame(&subscription_id)?;
283        let _ = socket.send(Message::Text(close.into())).await;
284        Ok(events)
285    }
286
287    /// Discover valid events across multiple relays and dedupe by event id.
288    pub async fn discover_offer_events_multi(
289        relay_urls: &[String],
290        options: RelayQueryOptions,
291    ) -> Vec<NostrOfferEvent> {
292        let mut tasks = Vec::new();
293        for relay_url in relay_urls {
294            let relay = relay_url.clone();
295            let options = options.clone();
296            tasks.push(tokio::spawn(async move {
297                Self::discover_offer_events(&relay, options)
298                    .await
299                    .unwrap_or_default()
300            }));
301        }
302
303        let mut merged = Vec::new();
304        let mut seen_ids = HashSet::new();
305        for task in tasks {
306            if let Ok(events) = task.await {
307                for event in events {
308                    if seen_ids.insert(event.id.clone()) {
309                        merged.push(event);
310                    }
311                }
312            }
313        }
314        merged
315    }
316}
317
318fn is_eose_frame(frame: &str, subscription_id: &str) -> bool {
319    let value: Value = match serde_json::from_str(frame) {
320        Ok(v) => v,
321        Err(_) => return false,
322    };
323    let arr = match value.as_array() {
324        Some(items) => items,
325        None => return false,
326    };
327    if arr.len() != 2 {
328        return false;
329    }
330    arr.first().and_then(Value::as_str) == Some("EOSE")
331        && arr.get(1).and_then(Value::as_str) == Some(subscription_id)
332}