nostr_rust/nostr_client.rs
1use crate::events::Event;
2use crate::req::{Req, ReqFilter};
3use crate::websocket::{self, SimplifiedWS};
4use crate::Message;
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use std::sync::Arc;
8use thiserror::Error;
9
10#[derive(Error, Debug)]
11pub enum ClientError {
12 #[error("Error while trying to connect to the websocket server")]
13 WSError(websocket::SimplifiedWSError),
14
15 #[error("Already subscribed to the event")]
16 AlreadySubscribed,
17
18 #[error("Relay does not exist")]
19 RelayDoesNotExist,
20
21 #[error("Serde Error: {}", _0)]
22 SerdeError(#[from] serde_json::Error),
23}
24
25impl From<websocket::SimplifiedWSError> for ClientError {
26 fn from(err: websocket::SimplifiedWSError) -> Self {
27 Self::WSError(err)
28 }
29}
30
31#[cfg(not(feature = "async"))]
32/// Nostr Client
33pub struct Client {
34 pub relays: HashMap<String, Arc<std::sync::Mutex<SimplifiedWS>>>,
35 pub subscriptions: HashMap<String, Vec<Message>>,
36}
37
#[cfg(feature = "async")]
/// Nostr client (build with the `async` feature).
pub struct Client {
    /// Relay connections, keyed by the string given to `add_relay`; each one
    /// sits behind a `tokio::sync::Mutex`.
    pub relays: HashMap<String, Arc<tokio::sync::Mutex<SimplifiedWS>>>,
    /// Messages buffered by `add_event`, keyed by subscription ID.
    pub subscriptions: HashMap<String, Vec<Message>>,
}
44
45impl Client {
46 #[cfg(not(feature = "async"))]
47 /// Create a new client with a list of default relays
48 ///
49 /// # Example
50 /// ```rust
51 /// use nostr_rust::nostr_client::Client;
52 /// let client = Client::new(vec![env!("RELAY_URL")]).unwrap();
53 /// ```
54 pub fn new(default_relays: Vec<&str>) -> Result<Self, ClientError> {
55 let mut client = Self {
56 relays: HashMap::new(),
57 subscriptions: HashMap::new(),
58 };
59
60 for relay in default_relays {
61 client.add_relay(relay)?;
62 }
63
64 Ok(client)
65 }
66
67 #[cfg(feature = "async")]
68 /// Create a new client with a list of default relays
69 ///
70 /// # Example
71 /// ```rust, async_await
72 /// use nostr_rust::nostr_client::Client;
73 ///
74 /// #[tokio::test]
75 /// async fn test_new_client() {
76 /// let client = Client::new(vec![env!("RELAY_URL")]).await.unwrap();
77 /// }
78 /// ```
79 pub async fn new(default_relays: Vec<&str>) -> Result<Self, ClientError> {
80 let mut client = Self {
81 relays: HashMap::new(),
82 subscriptions: HashMap::new(),
83 };
84
85 for relay in default_relays {
86 client.add_relay(relay).await?;
87 }
88
89 Ok(client)
90 }
91}
92
93impl Client {
94 #[cfg(not(feature = "async"))]
95 /// Add a relay to the client
96 /// # Example
97 /// ```rust
98 /// use nostr_rust::nostr_client::Client;
99 /// let mut client = Client::new(vec![]).unwrap();
100 /// client.add_relay(env!("RELAY_URL")).unwrap();
101 /// ```
102 pub fn add_relay(&mut self, relay: &str) -> Result<(), ClientError> {
103 let client = match SimplifiedWS::new(relay) {
104 Ok(client) => client,
105 Err(err) => return Err(ClientError::WSError(err)),
106 };
107
108 // Check if relay is already added
109 if self.relays.contains_key(relay) {
110 return Err(ClientError::AlreadySubscribed);
111 }
112
113 self.relays
114 .insert(relay.to_string(), Arc::new(std::sync::Mutex::new(client)));
115
116 Ok(())
117 }
118
#[cfg(feature = "async")]
/// Add a relay to the client
///
/// # Errors
/// Returns [`ClientError::AlreadySubscribed`] if `relay` is already registered
/// (checked before any connection is attempted), or [`ClientError::WSError`]
/// if `SimplifiedWS::new` fails.
///
/// # Example
/// ```rust
/// use nostr_rust::nostr_client::Client;
///
/// #[tokio::test]
/// async fn test_add_relay() {
///     let mut client = Client::new(vec![env!("RELAY_URL")]).await.unwrap();
///     client.add_relay("wss://relay.damus.io").await.unwrap();
/// }
/// ```
pub async fn add_relay(&mut self, relay: &str) -> Result<(), ClientError> {
    // Reject duplicates first, so a refused call never opens (and then
    // immediately throws away) a second websocket to the same relay.
    if self.relays.contains_key(relay) {
        return Err(ClientError::AlreadySubscribed);
    }

    let socket = SimplifiedWS::new(relay).await?;
    self.relays
        .insert(relay.to_string(), Arc::new(tokio::sync::Mutex::new(socket)));

    Ok(())
}
147
148 #[cfg(not(feature = "async"))]
149 /// Remove a relay from the client
150 /// # Example
151 /// ```rust
152 /// use nostr_rust::nostr_client::Client;
153 /// let mut client = Client::new(vec![env!("RELAY_URL")]).unwrap();
154 /// client.remove_relay(env!("RELAY_URL")).unwrap();
155 /// ```
156 pub fn remove_relay(&mut self, relay: &str) -> Result<(), ClientError> {
157 if !self.relays.contains_key(relay) {
158 return Err(ClientError::RelayDoesNotExist);
159 }
160
161 // Close the connection
162 self.relays
163 .remove(relay)
164 .unwrap()
165 .lock()
166 .unwrap()
167 .socket
168 .close(None)
169 .unwrap();
170
171 Ok(())
172 }
173
#[cfg(feature = "async")]
/// Forget `relay` and close its websocket.
///
/// # Errors
/// Returns [`ClientError::RelayDoesNotExist`] when `relay` is not registered.
///
/// # Panics
/// Panics if closing the socket fails.
///
/// # Example
/// ```rust
/// use nostr_rust::nostr_client::Client;
///
/// #[tokio::test]
/// async fn test_remove_relay() {
///     let mut client = Client::new(vec![env!("RELAY_URL")]).await.unwrap();
///     client.remove_relay(env!("RELAY_URL")).await.unwrap();
/// }
/// ```
pub async fn remove_relay(&mut self, relay: &str) -> Result<(), ClientError> {
    // Taking the entry out of the map doubles as the existence check.
    let connection = self
        .relays
        .remove(relay)
        .ok_or(ClientError::RelayDoesNotExist)?;

    // The relay is already unregistered at this point; now shut the socket down.
    let mut guard = connection.lock().await;
    guard.socket.close(None).await.unwrap();

    Ok(())
}
204
205 #[cfg(not(feature = "async"))]
206 /// Publish a Nostr event
207 pub fn publish_event(&mut self, event: &Event) -> Result<(), ClientError> {
208 let json_stringified = json!(["EVENT", event]).to_string();
209 let message = Message::text(json_stringified);
210
211 for relay in self.relays.values() {
212 let mut relay = relay.lock().unwrap();
213 relay.send_message(&message)?;
214 }
215
216 Ok(())
217 }
218
#[cfg(feature = "async")]
/// Send `event` to every registered relay as an `["EVENT", <event>]` message.
///
/// # Errors
/// Returns the first send error; relays not yet reached are not contacted.
pub async fn publish_event(&mut self, event: &Event) -> Result<(), ClientError> {
    let payload = json!(["EVENT", event]);
    let message = Message::text(payload.to_string());

    for socket in self.relays.values() {
        // The lock guard is a temporary: it is released as soon as the send completes.
        socket.lock().await.send_message(&message).await?;
    }

    Ok(())
}
232
233 #[cfg(not(feature = "async"))]
234 /// Get next data from the relays
235 /// # Example
236 /// ```rust
237 /// use std::{
238 /// sync::{Arc, Mutex},
239 /// thread,
240 /// };
241 /// use tungstenite::Message;
242 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
243 ///
244 /// fn handle_message(relay_url: &String, message: &Message) -> Result<(), String> {
245 /// println!("Received message: {:?}", message);
246 ///
247 /// Ok(())
248 /// }
249 ///
250 /// let mut client = Arc::new(Mutex::new(Client::new(vec![env!("RELAY_URL")]).unwrap()));
251 ///
252 /// // Run a new thread to listen
253 /// let nostr_clone = client.clone();
254 /// let nostr_thread = thread::spawn(move || loop {
255 /// let events = nostr_clone.lock().unwrap().next_data().unwrap();
256 ///
257 /// for (relay_url, message) in events.iter() {
258 /// handle_message(relay_url, message).unwrap();
259 /// }
260 /// });
261 ///
262 /// // Subscribe to the most beautiful Nostr profile event
263 /// client
264 /// .lock()
265 /// .unwrap()
266 /// .subscribe(vec![ReqFilter {
267 /// ids: None,
268 /// authors: Some(vec![
269 /// "884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string(),
270 /// ]),
271 /// kinds: None,
272 /// e: None,
273 /// p: None,
274 /// since: None,
275 /// until: None,
276 /// limit: Some(1),
277 /// }])
278 /// .unwrap();
279 ///
280 /// // Wait 3s for the thread to finish
281 /// std::thread::sleep(std::time::Duration::from_secs(3));
282 /// ```
283 pub fn next_data(&mut self) -> Result<Vec<(String, tungstenite::Message)>, ClientError> {
284 let mut events: Vec<(String, tungstenite::Message)> = Vec::new();
285
286 for (relay_name, socket) in self.relays.iter() {
287 let message = socket.lock().unwrap().read_message()?;
288 events.push((relay_name.clone(), message));
289 }
290
291 Ok(events)
292 }
293
294 #[cfg(feature = "async")]
295 /// Get next data from the relays
296 /// # Example
297 /// ```rust
298 /// use std::{
299 /// sync::{Arc, Mutex},
300 /// thread,
301 /// };
302 /// use tungstenite::Message;
303 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
304 ///
305 /// fn handle_message(relay_url: &String, message: &Message) -> Result<(), String> {
306 /// println!("Received message: {:?}", message);
307 ///
308 /// Ok(())
309 /// }
310 ///
311 /// #[tokio::test]
312 /// async fn test_next_data() {
313 /// let mut client = Arc::new(Mutex::new(Client::new(vec![env!("RELAY_URL")]).await.unwrap()));
314 ///
315 /// // Run a new thread to listen
316 /// let nostr_clone = client.clone();
317 /// let nostr_thread = thread::spawn(move || loop {
318 /// let events = nostr_clone.lock().unwrap().next_data().await.unwrap();
319 ///
320 /// for (relay_url, message) in events.iter() {
321 /// handle_message(relay_url, message).unwrap();
322 /// }
323 /// });
324 ///
325 /// // Subscribe to the most beautiful Nostr profile event
326 /// client
327 /// .lock()
328 /// .unwrap()
329 /// .subscribe(vec![ReqFilter {
330 /// ids: None,
331 /// authors: Some(vec![
332 /// "884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string(),
333 /// ]),
334 /// kinds: None,
335 /// e: None,
336 /// p: None,
337 /// since: None,
338 /// until: None,
339 /// limit: Some(1),
340 /// }])
341 /// .await
342 /// .unwrap();
343 ///
344 /// // Wait 3s for the thread to finish
345 /// std::thread::sleep(std::time::Duration::from_secs(3));
346 /// }
347 /// ```
348 pub async fn next_data(&mut self) -> Result<Vec<(String, tungstenite::Message)>, ClientError> {
349 let mut events: Vec<(String, tungstenite::Message)> = Vec::new();
350
351 for (relay_name, socket) in self.relays.iter() {
352 let message = socket.lock().await.read_message().await?;
353 events.push((relay_name.clone(), message));
354 }
355
356 Ok(events)
357 }
358
359 #[cfg(not(feature = "async"))]
360 /// Subscribe
361 /// # Example
362 /// ```rust
363 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
364 /// let mut client = Client::new(vec![env!("RELAY_URL")]).unwrap();
365 /// client
366 /// .subscribe(vec![ReqFilter { // None means generate a random ID
367 /// ids: None,
368 /// authors: Some(vec![
369 /// "884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string(),
370 /// ]),
371 /// kinds: None,
372 /// e: None,
373 /// p: None,
374 /// since: None,
375 /// until: None,
376 /// limit: Some(1),
377 /// }])
378 /// .unwrap();
379 /// ```
380 pub fn subscribe(&mut self, filters: Vec<ReqFilter>) -> Result<String, ClientError> {
381 let req = Req::new(None, filters);
382 let message = Message::text(req.to_string());
383
384 for relay in self.relays.values() {
385 let mut relay = relay.lock().unwrap();
386 relay.send_message(&message)?;
387 }
388
389 Ok(req.subscription_id)
390 }
391
392 #[cfg(feature = "async")]
393 /// Subscribe
394 /// # Example
395 /// ```rust
396 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
397 ///
398 /// #[tokio::test]
399 /// async fn test_subscribe() {
400 /// let mut client = Client::new(vec![env!("RELAY_URL")]).await.unwrap();
401 /// client
402 /// .subscribe(vec![ReqFilter { // None means generate a random ID
403 /// ids: None,
404 /// authors: Some(vec![
405 /// "884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string(),
406 /// ]),
407 /// kinds: None,
408 /// e: None,
409 /// p: None,
410 /// since: None,
411 /// until: None,
412 /// limit: Some(1),
413 /// }])
414 /// .await
415 /// .unwrap();
416 /// }
417 /// ```
418 pub async fn subscribe(&mut self, filters: Vec<ReqFilter>) -> Result<String, ClientError> {
419 let req = Req::new(None, filters);
420 let message = Message::text(req.to_string());
421
422 for relay in self.relays.values() {
423 let mut relay = relay.lock().await;
424 relay.send_message(&message).await?;
425 }
426
427 Ok(req.subscription_id)
428 }
429
430 #[cfg(not(feature = "async"))]
431 /// Subscribe with a specific ID
432 ///
433 /// # Example
434 /// ```rust
435 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
436 /// let mut client = Client::new(vec![env!("RELAY_URL")]).unwrap();
437 /// client
438 /// .subscribe_with_id("my_subscription_id", vec![ReqFilter {
439 /// ids: None,
440 /// authors: Some(vec![
441 /// "884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string(),
442 /// ]),
443 /// kinds: None,
444 /// e: None,
445 /// p: None,
446 /// since: None,
447 /// until: None,
448 /// limit: Some(1),
449 /// }])
450 /// .unwrap();
451 /// ```
452 pub fn subscribe_with_id(
453 &mut self,
454 subscription_id: &str,
455 filters: Vec<ReqFilter>,
456 ) -> Result<(), ClientError> {
457 let req = Req::new(Some(subscription_id), filters);
458 let message = Message::text(req.to_string());
459
460 for relay in self.relays.values() {
461 let mut relay = relay.lock().unwrap();
462 relay.send_message(&message)?;
463 }
464
465 Ok(())
466 }
467
468 #[cfg(feature = "async")]
469 /// Subscribe with a specific ID
470 ///
471 /// # Example
472 /// ```rust
473 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
474 ///
475 /// #[tokio::test]
476 /// async fn test_subscribe_with_id() {
477 /// let mut client = Client::new(vec![env!("RELAY_URL")]).await.unwrap();
478 /// client
479 /// .subscribe_with_id("my_subscription_id", vec![ReqFilter {
480 /// ids: None,
481 /// authors: Some(vec![
482 /// "884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string(),
483 /// ]),
484 /// kinds: None,
485 /// e: None,
486 /// p: None,
487 /// since: None,
488 /// until: None,
489 /// limit: Some(1),
490 /// }])
491 /// .await
492 /// .unwrap();
493 /// }
494 /// ```
495 pub async fn subscribe_with_id(
496 &mut self,
497 subscription_id: &str,
498 filters: Vec<ReqFilter>,
499 ) -> Result<(), ClientError> {
500 let req = Req::new(Some(subscription_id), filters);
501 let message = Message::text(req.to_string());
502
503 for relay in self.relays.values() {
504 let mut relay = relay.lock().await;
505 relay.send_message(&message).await?;
506 }
507
508 Ok(())
509 }
510
511 #[cfg(not(feature = "async"))]
512 /// Unsubscribe
513 /// # Example
514 /// ```rust
515 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
516 /// let mut client = Client::new(vec![env!("RELAY_URL")]).unwrap();
517 /// let subscription_id = client
518 /// .subscribe(vec![ReqFilter {
519 /// ids: None,
520 /// authors: Some(vec![
521 /// "884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string(),
522 /// ]),
523 /// kinds: None,
524 /// e: None,
525 /// p: None,
526 /// since: None,
527 /// until: None,
528 /// limit: Some(1),
529 /// }])
530 /// .unwrap();
531 /// client.unsubscribe(&subscription_id).unwrap();
532 /// ```
533 pub fn unsubscribe(&mut self, subscription_id: &str) -> Result<(), ClientError> {
534 let message = Message::text(json!(["CLOSE", subscription_id]).to_string());
535
536 for relay in self.relays.values() {
537 let mut relay = relay.lock().unwrap();
538 relay.send_message(&message)?;
539 }
540
541 Ok(())
542 }
543
544 #[cfg(feature = "async")]
545 /// Unsubscribe
546 /// # Example
547 /// ```rust
548 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
549 ///
550 /// #[tokio::test]
551 /// async fn test_unsubscribe() {
552 /// let mut client = Client::new(vec![env!("RELAY_URL")]).await.unwrap();
553 /// let subscription_id = client
554 /// .subscribe(vec![ReqFilter {
555 /// ids: None,
556 /// authors: Some(vec![
557 /// "884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string(),
558 /// ]),
559 /// kinds: None,
560 /// e: None,
561 /// p: None,
562 /// since: None,
563 /// until: None,
564 /// limit: Some(1),
565 /// }])
566 /// .await
567 /// .unwrap();
568 /// client.unsubscribe(&subscription_id).await.unwrap();
569 /// }
570 /// ```
571 pub async fn unsubscribe(&mut self, subscription_id: &str) -> Result<(), ClientError> {
572 let message = Message::text(json!(["CLOSE", subscription_id]).to_string());
573
574 for relay in self.relays.values() {
575 let mut relay = relay.lock().await;
576 relay.send_message(&message).await?;
577 }
578
579 Ok(())
580 }
581
582 /// Add event to a subscription
583 pub fn add_event(&mut self, subscription_id: &str, message: Message) {
584 // Check if the subscription exists
585 if !self.subscriptions.contains_key(subscription_id) {
586 self.subscriptions
587 .insert(subscription_id.to_string(), Vec::new());
588 }
589
590 // Check if the message is already in the subscription
591 if !self.subscriptions[subscription_id].contains(&message) {
592 // Add the message to the subscription
593 self.subscriptions
594 .get_mut(subscription_id)
595 .unwrap()
596 .push(message);
597 }
598 }
599
600 /// Get events and remove them from the subscription
601 pub fn get_events(&mut self, subscription_id: &str) -> Option<Vec<Message>> {
602 self.subscriptions.remove(subscription_id)
603 }
604
605 #[cfg(not(feature = "async"))]
606 /// Get events of a given filters
607 ///
608 /// # Example
609 /// ```rust
610 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
611 /// let mut client = Client::new(vec![env!("RELAY_URL")]).unwrap();
612 /// let events = client.get_events_of(vec![ReqFilter {
613 /// ids: None,
614 /// authors: Some(vec!["884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string()]),
615 /// kinds: Some(vec![3]),
616 /// e: None,
617 /// p: None,
618 /// since: None,
619 /// until: None,
620 /// limit: Some(1),
621 /// }]).unwrap();
622 /// ```
623 pub fn get_events_of(&mut self, filters: Vec<ReqFilter>) -> Result<Vec<Event>, ClientError> {
624 let mut events: Vec<Event> = Vec::new();
625
626 // Subscribe
627 let id = self.subscribe(filters)?;
628
629 let mut waiting_relays: Vec<String> = self.relays.keys().map(|k| k.to_string()).collect();
630
631 // Get the events
632 while !waiting_relays.is_empty() {
633 let data = self.next_data()?;
634 let mut break_loop = false;
635
636 for (relay, message) in data {
637 let event: Value = serde_json::from_str(&message.to_string())?;
638
639 if event[0] == "EOSE" && event[1].as_str() == Some(&id) {
640 let index = waiting_relays.iter().position(|r| r == &relay).unwrap();
641 waiting_relays.remove(index);
642
643 break_loop = true;
644 break;
645 }
646
647 self.add_event(&id, message);
648 }
649
650 if break_loop {
651 break;
652 }
653 }
654
655 // unsubscribe
656 self.unsubscribe(&id)?;
657
658 // Get the events
659 if let Some(messages) = self.get_events(&id) {
660 for message in messages {
661 if !message.is_text() {
662 continue;
663 }
664
665 let event: Value = serde_json::from_str(&message.to_string())?;
666
667 let event_object = serde_json::from_value::<Event>(event[2].clone());
668
669 if event_object.is_err() {
670 continue;
671 }
672
673 events.push(event_object.unwrap());
674 }
675 }
676 Ok(events)
677 }
678
679 #[cfg(feature = "async")]
680 /// Get events of a given filters
681 ///
682 /// # Example
683 /// ```rust
684 /// use nostr_rust::{nostr_client::Client, req::ReqFilter};
685 ///
686 /// #[tokio::test]
687 /// async fn test_get_events_of() {
688 /// let mut client = Client::new(vec![env!("RELAY_URL")]).await.unwrap();
689 /// let events = client.get_events_of(vec![ReqFilter {
690 /// ids: None,
691 /// authors: Some(vec!["884704bd421721e292edbff42eb77547fe115c6ff9825b08fc366be4cd69e9f6".to_string()]),
692 /// kinds: Some(vec![3]),
693 /// e: None,
694 /// p: None,
695 /// since: None,
696 /// until: None,
697 /// limit: Some(1),
698 /// }]).await
699 /// .unwrap();
700 /// }
701 /// ```
702 pub async fn get_events_of(
703 &mut self,
704 filters: Vec<ReqFilter>,
705 ) -> Result<Vec<Event>, ClientError> {
706 let mut events: Vec<Event> = Vec::new();
707
708 // Subscribe
709 let id = self.subscribe(filters).await?;
710
711 let mut waiting_relays: Vec<String> = self.relays.keys().map(|k| k.to_string()).collect();
712
713 // Get the events
714 while !waiting_relays.is_empty() {
715 let data = self.next_data().await?;
716 let mut break_loop = false;
717
718 for (relay, message) in data {
719 let event: Value = serde_json::from_str(&message.to_string()).unwrap();
720
721 if event[0] == "EOSE" && event[1].as_str() == Some(&id) {
722 let index = waiting_relays.iter().position(|r| r == &relay).unwrap();
723 waiting_relays.remove(index);
724
725 break_loop = true;
726 break;
727 }
728
729 self.add_event(&id, message);
730 }
731
732 if break_loop {
733 break;
734 }
735 }
736
737 // unsubscribe
738 self.unsubscribe(&id).await?;
739
740 // Get the events
741 if let Some(messages) = self.get_events(&id) {
742 for message in messages {
743 if !message.is_text() {
744 continue;
745 }
746
747 let event: Value = serde_json::from_str(&message.to_string())?;
748
749 let event_object = serde_json::from_value::<Event>(event[2].clone());
750
751 if event_object.is_err() {
752 continue;
753 }
754
755 events.push(event_object.unwrap());
756 }
757 }
758 Ok(events)
759 }
760}