nostr_sdk/client/mod.rs
1// Copyright (c) 2022-2023 Yuki Kishimoto
2// Copyright (c) 2023-2025 Rust Nostr Developers
3// Distributed under the MIT software license
4
5//! Client
6
7use std::collections::{HashMap, HashSet};
8use std::future::Future;
9use std::iter;
10use std::sync::Arc;
11use std::time::Duration;
12
13use nostr::prelude::*;
14use nostr_database::prelude::*;
15use nostr_relay_pool::prelude::*;
16use tokio::sync::broadcast;
17
18pub mod builder;
19mod error;
20pub mod options;
21
22pub use self::builder::ClientBuilder;
23pub use self::error::Error;
24pub use self::options::{ClientOptions, SleepWhenIdle};
25#[cfg(not(target_arch = "wasm32"))]
26pub use self::options::{Connection, ConnectionTarget};
27use crate::gossip::{BrokenDownFilters, Gossip};
28
29/// Nostr client
30#[derive(Debug, Clone)]
31pub struct Client {
32 pool: RelayPool,
33 gossip: Gossip,
34 opts: ClientOptions,
35}
36
37impl Default for Client {
38 #[inline]
39 fn default() -> Self {
40 Self::builder().build()
41 }
42}
43
44impl Client {
    /// Construct client with a signer
    ///
    /// To construct a client without a signer, use [`Client::default`].
48 ///
49 /// # Example
50 /// ```rust,no_run
51 /// use nostr_sdk::prelude::*;
52 ///
53 /// let keys = Keys::generate();
54 /// let client = Client::new(keys);
55 /// ```
56 #[inline]
57 pub fn new<T>(signer: T) -> Self
58 where
59 T: IntoNostrSigner,
60 {
61 Self::builder().signer(signer).build()
62 }
63
    /// Construct a client builder
65 ///
66 /// # Example
67 /// ```rust,no_run
68 /// use std::time::Duration;
69 ///
70 /// use nostr_sdk::prelude::*;
71 ///
72 /// let signer = Keys::generate();
73 /// let opts = ClientOptions::default().gossip(true);
74 /// let client: Client = Client::builder().signer(signer).opts(opts).build();
75 /// ```
76 #[inline]
77 pub fn builder() -> ClientBuilder {
78 ClientBuilder::default()
79 }
80
81 fn from_builder(builder: ClientBuilder) -> Self {
82 // Construct relay pool builder
83 let pool_builder: RelayPoolBuilder = RelayPoolBuilder {
84 websocket_transport: builder.websocket_transport,
85 admit_policy: builder.admit_policy,
86 monitor: builder.monitor,
87 opts: builder.opts.pool,
88 __database: builder.database,
89 __signer: builder.signer,
90 };
91
92 // Construct client
93 Self {
94 pool: pool_builder.build(),
95 gossip: Gossip::new(),
96 opts: builder.opts,
97 }
98 }
99
100 /// Update minimum POW difficulty for received events
101 ///
    /// Events with a POW lower than the current value will be ignored to prevent resource exhaustion.
103 #[deprecated(
104 since = "0.40.0",
105 note = "This no longer works, please use `AdmitPolicy` instead."
106 )]
107 pub fn update_min_pow_difficulty(&self, _difficulty: u8) {}
108
    /// Automatically authenticate to relays (default: true)
110 ///
111 /// <https://github.com/nostr-protocol/nips/blob/master/42.md>
112 #[inline]
113 pub fn automatic_authentication(&self, enable: bool) {
114 self.pool.state().automatic_authentication(enable);
115 }
116
117 /// Check if signer is configured
118 #[inline]
119 pub async fn has_signer(&self) -> bool {
120 self.pool.state().has_signer().await
121 }
122
123 /// Get current nostr signer
124 ///
125 /// # Errors
126 ///
127 /// Returns an error if the signer isn't set.
128 #[inline]
129 pub async fn signer(&self) -> Result<Arc<dyn NostrSigner>, Error> {
130 Ok(self.pool.state().signer().await?)
131 }
132
133 /// Set nostr signer
134 #[inline]
135 pub async fn set_signer<T>(&self, signer: T)
136 where
137 T: IntoNostrSigner,
138 {
139 self.pool.state().set_signer(signer).await;
140 }
141
142 /// Unset nostr signer
143 #[inline]
144 pub async fn unset_signer(&self) {
145 self.pool.state().unset_signer().await;
146 }
147
148 /// Get [`RelayPool`]
149 #[inline]
150 pub fn pool(&self) -> &RelayPool {
151 &self.pool
152 }
153
154 /// Get database
155 #[inline]
156 pub fn database(&self) -> &Arc<dyn NostrDatabase> {
157 self.pool.database()
158 }
159
160 /// Get the relay monitor
161 #[inline]
162 pub fn monitor(&self) -> Option<&Monitor> {
163 self.pool.monitor()
164 }
165
166 /// Reset the client
167 ///
168 /// This method resets the client to simplify the switch to another account.
169 ///
170 /// This method will:
171 /// * unsubscribe from all subscriptions
172 /// * disconnect and force remove all relays
173 /// * unset the signer
174 ///
175 /// This method will NOT:
176 /// * reset [`ClientOptions`]
177 /// * remove the database
178 /// * clear the gossip graph
179 pub async fn reset(&self) {
180 self.unsubscribe_all().await;
181 self.force_remove_all_relays().await;
182 self.unset_signer().await;
183 }
184
    /// Completely shut down the client
186 #[inline]
187 pub async fn shutdown(&self) {
188 self.pool.shutdown().await
189 }
190
191 /// Get new notification listener
192 ///
    /// <div class="warning">When you call this method, you subscribe to the notifications channel from that precise moment. Anything received by the relays before that moment is not included in the channel!</div>
194 #[inline]
195 pub fn notifications(&self) -> broadcast::Receiver<RelayPoolNotification> {
196 self.pool.notifications()
197 }
198
199 /// Get relays with [`RelayServiceFlags::READ`] or [`RelayServiceFlags::WRITE`] flags
200 ///
201 /// Call [`RelayPool::all_relays`] to get all relays
202 /// or [`RelayPool::relays_with_flag`] to get relays with specific [`RelayServiceFlags`].
203 #[inline]
204 pub async fn relays(&self) -> HashMap<RelayUrl, Relay> {
205 self.pool.relays().await
206 }
207
208 /// Get a previously added [`Relay`]
209 #[inline]
210 pub async fn relay<U>(&self, url: U) -> Result<Relay, Error>
211 where
212 U: TryIntoUrl,
213 pool::Error: From<<U as TryIntoUrl>::Err>,
214 {
215 Ok(self.pool.relay(url).await?)
216 }
217
218 async fn compose_relay_opts(&self, _url: &RelayUrl) -> RelayOptions {
219 let opts: RelayOptions = RelayOptions::new();
220
221 // Set connection mode
222 #[cfg(not(target_arch = "wasm32"))]
223 let opts: RelayOptions = match &self.opts.connection.mode {
224 ConnectionMode::Direct => opts,
225 ConnectionMode::Proxy(..) => match self.opts.connection.target {
226 ConnectionTarget::All => opts.connection_mode(self.opts.connection.mode.clone()),
227 ConnectionTarget::Onion => {
228 if _url.is_onion() {
229 opts.connection_mode(self.opts.connection.mode.clone())
230 } else {
231 opts
232 }
233 }
234 },
235 #[cfg(feature = "tor")]
236 ConnectionMode::Tor { .. } => match self.opts.connection.target {
237 ConnectionTarget::All => opts.connection_mode(self.opts.connection.mode.clone()),
238 ConnectionTarget::Onion => {
239 if _url.is_onion() {
240 opts.connection_mode(self.opts.connection.mode.clone())
241 } else {
242 opts
243 }
244 }
245 },
246 };
247
248 // Set sleep when idle
249 let opts: RelayOptions = match self.opts.sleep_when_idle {
250 // Do nothing
251 SleepWhenIdle::Disabled => opts,
252 // Enable: update relay options
253 SleepWhenIdle::Enabled { timeout } => opts.sleep_when_idle(true).idle_timeout(timeout),
254 };
255
256 // Set limits
257 opts.limits(self.opts.relay_limits.clone())
258 .max_avg_latency(self.opts.max_avg_latency)
259 .verify_subscriptions(self.opts.verify_subscriptions)
260 .ban_relay_on_mismatch(self.opts.ban_relay_on_mismatch)
261 }
262
    /// Returns `false` if the relay already existed
264 async fn get_or_add_relay_with_flag<U>(
265 &self,
266 url: U,
267 flag: RelayServiceFlags,
268 ) -> Result<bool, Error>
269 where
270 U: TryIntoUrl,
271 pool::Error: From<<U as TryIntoUrl>::Err>,
272 {
273 // Convert into url
274 let url: RelayUrl = url.try_into_url().map_err(pool::Error::from)?;
275
276 // Compose relay options
277 let opts: RelayOptions = self.compose_relay_opts(&url).await;
278
279 // Set flag
280 let opts: RelayOptions = opts.flags(flag);
281
282 // Add relay with opts or edit current one
283 // TODO: remove clone here
284 match self.pool.__get_or_add_relay(url.clone(), opts).await? {
285 Some(relay) => {
286 relay.flags().add(flag);
287 Ok(false)
288 }
289 None => {
290 // TODO: move autoconnect to `Relay`?
291 // Connect if `autoconnect` is enabled
292 if self.opts.autoconnect {
293 self.connect_relay::<RelayUrl>(url).await?;
294 }
295
296 Ok(true)
297 }
298 }
299 }
300
301 /// Add relay
302 ///
303 /// Relays added with this method will have both [`RelayServiceFlags::READ`] and [`RelayServiceFlags::WRITE`] flags enabled.
304 ///
305 /// If the relay already exists, the flags will be updated and `false` returned.
306 ///
    /// If pool subscriptions are set, the newly added relay will inherit them. Use [`Client::subscribe_to`] instead of [`Client::subscribe`]
    /// to avoid setting pool subscriptions.
309 ///
    /// This method uses the previously set or default [`ClientOptions`] to configure the [`Relay`] (e.g. proxy, min POW, relay limits, ...).
311 /// To use custom [`RelayOptions`] use [`RelayPool::add_relay`].
312 ///
    /// The connection is **NOT** automatically started when adding a relay, remember to call [`Client::connect`]!
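    ///
    /// # Example
    ///
    /// A minimal sketch (the relay URLs below are placeholders):
    ///
    /// ```rust,no_run
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::default();
    /// // Add relays, then start the connections
    /// client.add_relay("wss://relay.damus.io").await?;
    /// client.add_relay("wss://nos.lol").await?;
    /// client.connect().await;
    /// # Ok(())
    /// # }
    /// ```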
314 #[inline]
315 pub async fn add_relay<U>(&self, url: U) -> Result<bool, Error>
316 where
317 U: TryIntoUrl,
318 pool::Error: From<<U as TryIntoUrl>::Err>,
319 {
320 self.get_or_add_relay_with_flag(url, RelayServiceFlags::default())
321 .await
322 }
323
324 /// Add discovery relay
325 ///
    /// If the relay already exists, this method automatically adds the [`RelayServiceFlags::DISCOVERY`] flag to it and returns `false`.
327 ///
328 /// <https://github.com/nostr-protocol/nips/blob/master/65.md>
329 #[inline]
330 pub async fn add_discovery_relay<U>(&self, url: U) -> Result<bool, Error>
331 where
332 U: TryIntoUrl,
333 pool::Error: From<<U as TryIntoUrl>::Err>,
334 {
335 self.get_or_add_relay_with_flag(url, RelayServiceFlags::PING | RelayServiceFlags::DISCOVERY)
336 .await
337 }
338
339 /// Add read relay
340 ///
    /// If the relay already exists, this method adds the [`RelayServiceFlags::READ`] flag to it and returns `false`.
    ///
    /// If pool subscriptions are set, the newly added relay will inherit them. Use the `subscribe_to` method instead of `subscribe`
    /// to avoid setting pool subscriptions.
345 #[inline]
346 pub async fn add_read_relay<U>(&self, url: U) -> Result<bool, Error>
347 where
348 U: TryIntoUrl,
349 pool::Error: From<<U as TryIntoUrl>::Err>,
350 {
351 self.get_or_add_relay_with_flag(url, RelayServiceFlags::PING | RelayServiceFlags::READ)
352 .await
353 }
354
355 /// Add write relay
356 ///
    /// If the relay already exists, this method adds the [`RelayServiceFlags::WRITE`] flag to it and returns `false`.
358 #[inline]
359 pub async fn add_write_relay<U>(&self, url: U) -> Result<bool, Error>
360 where
361 U: TryIntoUrl,
362 pool::Error: From<<U as TryIntoUrl>::Err>,
363 {
364 self.get_or_add_relay_with_flag(url, RelayServiceFlags::PING | RelayServiceFlags::WRITE)
365 .await
366 }
367
368 #[inline]
369 async fn add_gossip_relay<U>(&self, url: U) -> Result<bool, Error>
370 where
371 U: TryIntoUrl,
372 pool::Error: From<<U as TryIntoUrl>::Err>,
373 {
374 self.get_or_add_relay_with_flag(url, RelayServiceFlags::PING | RelayServiceFlags::GOSSIP)
375 .await
376 }
377
378 /// Remove and disconnect relay
379 ///
    /// If the relay has the [`RelayServiceFlags::GOSSIP`] flag, it will not be removed from the pool; instead, its
    /// flags will be updated (the [`RelayServiceFlags::READ`],
    /// [`RelayServiceFlags::WRITE`] and [`RelayServiceFlags::DISCOVERY`] flags are removed).
383 ///
384 /// To force remove the relay, use [`Client::force_remove_relay`].
385 #[inline]
386 pub async fn remove_relay<U>(&self, url: U) -> Result<(), Error>
387 where
388 U: TryIntoUrl,
389 pool::Error: From<<U as TryIntoUrl>::Err>,
390 {
391 Ok(self.pool.remove_relay(url).await?)
392 }
393
394 /// Force remove and disconnect relay
395 ///
    /// Note: this method removes the relay even if it's in use by the gossip model or another service!
397 #[inline]
398 pub async fn force_remove_relay<U>(&self, url: U) -> Result<(), Error>
399 where
400 U: TryIntoUrl,
401 pool::Error: From<<U as TryIntoUrl>::Err>,
402 {
403 Ok(self.pool.force_remove_relay(url).await?)
404 }
405
406 /// Disconnect and remove all relays
407 ///
    /// Relays used by certain services are not removed by this method
    /// (like the ones used for gossip).
410 /// Use [`Client::force_remove_all_relays`] to remove every relay.
411 #[inline]
412 pub async fn remove_all_relays(&self) {
413 self.pool.remove_all_relays().await
414 }
415
416 /// Disconnect and force remove all relays
417 #[inline]
418 pub async fn force_remove_all_relays(&self) {
419 self.pool.force_remove_all_relays().await
420 }
421
422 /// Connect to a previously added relay
423 ///
424 /// Check [`RelayPool::connect_relay`] docs to learn more.
425 #[inline]
426 pub async fn connect_relay<U>(&self, url: U) -> Result<(), Error>
427 where
428 U: TryIntoUrl,
429 pool::Error: From<<U as TryIntoUrl>::Err>,
430 {
431 Ok(self.pool.connect_relay(url).await?)
432 }
433
434 /// Try to connect to a previously added relay
435 ///
436 /// For further details, see the documentation of [`RelayPool::try_connect_relay`].
437 #[inline]
438 pub async fn try_connect_relay<U>(&self, url: U, timeout: Duration) -> Result<(), Error>
439 where
440 U: TryIntoUrl,
441 pool::Error: From<<U as TryIntoUrl>::Err>,
442 {
443 Ok(self.pool.try_connect_relay(url, timeout).await?)
444 }
445
446 /// Disconnect relay
447 #[inline]
448 pub async fn disconnect_relay<U>(&self, url: U) -> Result<(), Error>
449 where
450 U: TryIntoUrl,
451 pool::Error: From<<U as TryIntoUrl>::Err>,
452 {
453 Ok(self.pool.disconnect_relay(url).await?)
454 }
455
456 /// Connect to all added relays
457 ///
458 /// Attempts to initiate a connection for every relay currently in
459 /// [`RelayStatus::Initialized`] or [`RelayStatus::Terminated`].
460 /// A background connection task is spawned for each such relay, which then tries
461 /// to establish the connection.
462 /// Any relay not in one of these two statuses is skipped.
463 ///
464 /// For further details, see the documentation of [`Relay::connect`].
465 #[inline]
466 pub async fn connect(&self) {
467 self.pool.connect().await;
468 }
469
    /// Wait for relay connections
    ///
    /// Wait for relay connections for at most the specified `timeout`.
    /// Execution continues when the relays are connected or the `timeout` is reached.
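    ///
    /// # Example
    ///
    /// A minimal sketch (assumes some relays have already been added):
    ///
    /// ```rust,no_run
    /// # use std::time::Duration;
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() {
    /// # let client = Client::default();
    /// // Start the connections, then wait up to 5 seconds for them to be established
    /// client.connect().await;
    /// client.wait_for_connection(Duration::from_secs(5)).await;
    /// # }
    /// ```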
474 #[inline]
475 pub async fn wait_for_connection(&self, timeout: Duration) {
476 self.pool.wait_for_connection(timeout).await
477 }
478
479 /// Try to establish a connection with the relays.
480 ///
481 /// Attempts to establish a connection for every relay currently in
482 /// [`RelayStatus::Initialized`] or [`RelayStatus::Terminated`]
483 /// without spawning the connection task if it fails.
484 /// This means that if the connection fails, no automatic retries are scheduled.
485 /// Use [`Client::connect`] if you want to immediately spawn a connection task,
486 /// regardless of whether the initial connection succeeds.
487 ///
488 /// For further details, see the documentation of [`Relay::try_connect`].
489 #[inline]
490 pub async fn try_connect(&self, timeout: Duration) -> Output<()> {
491 self.pool.try_connect(timeout).await
492 }
493
494 /// Connect to all added relays
495 ///
496 /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`.
497 /// The code continues if the `timeout` is reached or if all relays connect.
498 #[deprecated(
499 since = "0.39.0",
500 note = "Use `connect` + `wait_for_connection` instead."
501 )]
502 pub async fn connect_with_timeout(&self, timeout: Duration) {
503 self.pool.try_connect(timeout).await;
504 }
505
506 /// Disconnect from all relays
507 #[inline]
508 pub async fn disconnect(&self) {
509 self.pool.disconnect().await
510 }
511
512 /// Get subscriptions
513 #[inline]
514 pub async fn subscriptions(&self) -> HashMap<SubscriptionId, HashMap<RelayUrl, Filter>> {
515 self.pool.subscriptions().await
516 }
517
518 /// Get subscription
519 #[inline]
520 pub async fn subscription(&self, id: &SubscriptionId) -> HashMap<RelayUrl, Filter> {
521 self.pool.subscription(id).await
522 }
523
524 /// Subscribe to filters
525 ///
    /// This method creates a new subscription. None of the previous subscriptions are edited or closed when you call this,
    /// so remember to unsubscribe when you no longer need it. You can get all your active (non-auto-closing) subscriptions
    /// by calling `client.subscriptions().await`.
529 ///
    /// If `gossip` is enabled (see [`ClientOptions::gossip`]), the events will also be requested from the
    /// NIP65 relays (automatically discovered) of the public keys included in the filter (if any).
532 ///
533 /// # Auto-closing subscription
534 ///
535 /// It's possible to automatically close a subscription by configuring the [`SubscribeAutoCloseOptions`].
536 ///
    /// Note: auto-closing subscriptions aren't saved in the subscriptions map!
538 ///
539 /// # Example
540 /// ```rust,no_run
541 /// # use nostr_sdk::prelude::*;
542 /// # #[tokio::main]
543 /// # async fn main() -> Result<()> {
544 /// # let keys = Keys::generate();
545 /// # let client = Client::new(keys.clone());
546 /// // Compose filter
547 /// let subscription = Filter::new()
548 /// .pubkeys(vec![keys.public_key()])
549 /// .since(Timestamp::now());
550 ///
551 /// // Subscribe
552 /// let output = client.subscribe(subscription, None).await?;
553 /// println!("Subscription ID: {}", output.val);
554 ///
555 /// // Auto-closing subscription
556 /// let id = SubscriptionId::generate();
557 /// let subscription = Filter::new().kind(Kind::TextNote).limit(10);
558 /// let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
559 /// let output = client.subscribe(subscription, Some(opts)).await?;
560 /// println!("Subscription ID: {} [auto-closing]", output.val);
561 /// # Ok(())
562 /// # }
563 /// ```
564 pub async fn subscribe(
565 &self,
566 filter: Filter,
567 opts: Option<SubscribeAutoCloseOptions>,
568 ) -> Result<Output<SubscriptionId>, Error> {
569 let id: SubscriptionId = SubscriptionId::generate();
570 let output: Output<()> = self.subscribe_with_id(id.clone(), filter, opts).await?;
571 Ok(Output {
572 val: id,
573 success: output.success,
574 failed: output.failed,
575 })
576 }
577
578 /// Subscribe to filters with custom [SubscriptionId]
579 ///
    /// If `gossip` is enabled (see [`ClientOptions::gossip`]), the events will also be requested from the
    /// NIP65 relays (automatically discovered) of the public keys included in the filter (if any).
582 ///
583 /// # Auto-closing subscription
584 ///
585 /// It's possible to automatically close a subscription by configuring the [SubscribeAutoCloseOptions].
586 ///
    /// Note: auto-closing subscriptions aren't saved in the subscriptions map!
588 pub async fn subscribe_with_id(
589 &self,
590 id: SubscriptionId,
591 filter: Filter,
592 opts: Option<SubscribeAutoCloseOptions>,
593 ) -> Result<Output<()>, Error> {
594 let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts);
595
596 if self.opts.gossip {
597 self.gossip_subscribe(id, filter, opts).await
598 } else {
599 Ok(self.pool.subscribe_with_id(id, filter, opts).await?)
600 }
601 }
602
603 /// Subscribe to filters to specific relays
604 ///
    /// This method creates a new subscription. None of the previous subscriptions are edited or closed when you call this,
    /// so remember to unsubscribe when you no longer need it.
607 ///
608 /// ### Auto-closing subscription
609 ///
610 /// It's possible to automatically close a subscription by configuring the [SubscribeAutoCloseOptions].
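    ///
    /// # Example
    ///
    /// A minimal sketch (the relay URL is a placeholder and must have been added beforehand):
    ///
    /// ```rust,no_run
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::default();
    /// let filter = Filter::new().kind(Kind::TextNote).limit(10);
    /// let output = client
    ///     .subscribe_to(["wss://relay.damus.io"], filter, None)
    ///     .await?;
    /// println!("Subscription ID: {}", output.val);
    /// # Ok(())
    /// # }
    /// ```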
611 #[inline]
612 pub async fn subscribe_to<I, U>(
613 &self,
614 urls: I,
615 filter: Filter,
616 opts: Option<SubscribeAutoCloseOptions>,
617 ) -> Result<Output<SubscriptionId>, Error>
618 where
619 I: IntoIterator<Item = U>,
620 U: TryIntoUrl,
621 pool::Error: From<<U as TryIntoUrl>::Err>,
622 {
623 let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts);
624 Ok(self.pool.subscribe_to(urls, filter, opts).await?)
625 }
626
627 /// Subscribe to filter with custom [SubscriptionId] to specific relays
628 ///
629 /// ### Auto-closing subscription
630 ///
631 /// It's possible to automatically close a subscription by configuring the [SubscribeAutoCloseOptions].
632 #[inline]
633 pub async fn subscribe_with_id_to<I, U>(
634 &self,
635 urls: I,
636 id: SubscriptionId,
637 filter: Filter,
638 opts: Option<SubscribeAutoCloseOptions>,
639 ) -> Result<Output<()>, Error>
640 where
641 I: IntoIterator<Item = U>,
642 U: TryIntoUrl,
643 pool::Error: From<<U as TryIntoUrl>::Err>,
644 {
645 let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts);
646 Ok(self
647 .pool
648 .subscribe_with_id_to(urls, id, filter, opts)
649 .await?)
650 }
651
652 /// Targeted subscription
653 ///
654 /// Subscribe to specific relays with specific filters
655 #[inline]
656 pub async fn subscribe_targeted<I, U>(
657 &self,
658 id: SubscriptionId,
659 targets: I,
660 opts: SubscribeOptions,
661 ) -> Result<Output<()>, Error>
662 where
663 I: IntoIterator<Item = (U, Filter)>,
664 U: TryIntoUrl,
665 pool::Error: From<<U as TryIntoUrl>::Err>,
666 {
667 Ok(self.pool.subscribe_targeted(id, targets, opts).await?)
668 }
669
670 /// Unsubscribe
671 #[inline]
672 pub async fn unsubscribe(&self, id: &SubscriptionId) {
673 self.pool.unsubscribe(id).await;
674 }
675
676 /// Unsubscribe from all subscriptions
677 #[inline]
678 pub async fn unsubscribe_all(&self) {
679 self.pool.unsubscribe_all().await;
680 }
681
682 /// Sync events with relays (negentropy reconciliation)
683 ///
    /// If `gossip` is enabled (see [`ClientOptions::gossip`]), the events will also be reconciled with the
    /// NIP65 relays (automatically discovered) of the public keys included in the filter (if any).
686 ///
687 /// <https://github.com/hoytech/negentropy>
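    ///
    /// # Example
    ///
    /// A minimal sketch using the default sync options:
    ///
    /// ```rust,no_run
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::default();
    /// let filter = Filter::new().kind(Kind::TextNote).limit(100);
    /// let output = client.sync(filter, &SyncOptions::default()).await?;
    /// println!("Reconciled with {} relays", output.success.len());
    /// # Ok(())
    /// # }
    /// ```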
688 #[inline]
689 pub async fn sync(
690 &self,
691 filter: Filter,
692 opts: &SyncOptions,
693 ) -> Result<Output<Reconciliation>, Error> {
694 if self.opts.gossip {
695 return self.gossip_sync_negentropy(filter, opts).await;
696 }
697
698 Ok(self.pool.sync(filter, opts).await?)
699 }
700
701 /// Sync events with specific relays (negentropy reconciliation)
702 ///
703 /// <https://github.com/hoytech/negentropy>
704 pub async fn sync_with<I, U>(
705 &self,
706 urls: I,
707 filter: Filter,
708 opts: &SyncOptions,
709 ) -> Result<Output<Reconciliation>, Error>
710 where
711 I: IntoIterator<Item = U>,
712 U: TryIntoUrl,
713 pool::Error: From<<U as TryIntoUrl>::Err>,
714 {
715 Ok(self.pool.sync_with(urls, filter, opts).await?)
716 }
717
718 /// Fetch events from relays
719 ///
720 /// # Overview
721 ///
722 /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
723 /// To use another exit policy, check [`RelayPool::fetch_events`].
724 /// For long-lived subscriptions, check [`Client::subscribe`].
725 ///
726 /// # Gossip
727 ///
    /// If `gossip` is enabled (see [`ClientOptions::gossip`]), the events will also be requested from the
    /// NIP65 relays (automatically discovered) of the public keys included in the filter (if any).
730 ///
731 /// # Example
732 /// ```rust,no_run
733 /// # use std::time::Duration;
734 /// # use nostr_sdk::prelude::*;
735 /// # #[tokio::main]
736 /// # async fn main() {
737 /// # let keys = Keys::generate();
738 /// # let client = Client::new(keys.clone());
739 /// let subscription = Filter::new()
740 /// .pubkeys(vec![keys.public_key()])
741 /// .since(Timestamp::now());
742 ///
743 /// let _events = client
744 /// .fetch_events(subscription, Duration::from_secs(10))
745 /// .await
746 /// .unwrap();
747 /// # }
748 /// ```
749 pub async fn fetch_events(&self, filter: Filter, timeout: Duration) -> Result<Events, Error> {
750 if self.opts.gossip {
751 return self
752 .gossip_fetch_events(filter, timeout, ReqExitPolicy::ExitOnEOSE)
753 .await;
754 }
755
756 Ok(self
757 .pool
758 .fetch_events(filter, timeout, ReqExitPolicy::ExitOnEOSE)
759 .await?)
760 }
761
762 /// Fetch events from specific relays
763 ///
764 /// # Overview
765 ///
766 /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
767 /// To use another exit policy, check [`RelayPool::fetch_events_from`].
768 /// For long-lived subscriptions, check [`Client::subscribe_to`].
769 #[inline]
770 pub async fn fetch_events_from<I, U>(
771 &self,
772 urls: I,
773 filter: Filter,
774 timeout: Duration,
775 ) -> Result<Events, Error>
776 where
777 I: IntoIterator<Item = U>,
778 U: TryIntoUrl,
779 pool::Error: From<<U as TryIntoUrl>::Err>,
780 {
781 Ok(self
782 .pool
783 .fetch_events_from(urls, filter, timeout, ReqExitPolicy::ExitOnEOSE)
784 .await?)
785 }
786
787 /// Get events both from database and relays
788 ///
789 /// # Overview
790 ///
791 /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
792 /// For long-lived subscriptions, check [`Client::subscribe`].
793 ///
794 /// # Gossip
795 ///
    /// If `gossip` is enabled (see [`ClientOptions::gossip`]), the events will also be requested from the
    /// NIP65 relays (automatically discovered) of the public keys included in the filter (if any).
798 ///
799 /// # Notes and alternative example
800 ///
801 /// This method will be deprecated in the future!
    /// It's a temporary solution for those who still want to query events from both the database and relays and merge the results.
    /// The optimal solution is to execute a [`Client::sync`] to reconcile missing events, [`Client::subscribe`] to get all
    /// new future events, [`NostrDatabase::query`] to query stored events and [`Client::handle_notifications`] to listen for and handle new events (i.e. to know when to update the UI).
    /// This allows very fast queries, low bandwidth usage (depending on how many events the client has to reconcile) and a lower load on the relays.
806 ///
807 /// You can obtain the same result with:
808 /// ```rust,no_run
809 /// # use std::time::Duration;
810 /// # use nostr_sdk::prelude::*;
811 /// # #[tokio::main]
812 /// # async fn main() -> Result<()> {
813 /// # let client = Client::default();
814 /// # let filter = Filter::new().limit(1);
815 /// // Query database
816 /// let stored_events: Events = client.database().query(filter.clone()).await?;
817 ///
818 /// // Query relays
819 /// let fetched_events: Events = client.fetch_events(filter, Duration::from_secs(10)).await?;
820 ///
821 /// // Merge result
822 /// let events: Events = stored_events.merge(fetched_events);
823 ///
824 /// // Iter and print result
825 /// for event in events.into_iter() {
826 /// println!("{}", event.as_json());
827 /// }
828 /// # Ok(())
829 /// # }
830 /// ```
831 pub async fn fetch_combined_events(
832 &self,
833 filter: Filter,
834 timeout: Duration,
835 ) -> Result<Events, Error> {
836 // Query database
837 let stored_events: Events = self.database().query(filter.clone()).await?;
838
839 // Query relays
840 let fetched_events: Events = self.fetch_events(filter, timeout).await?;
841
842 // Merge result
843 Ok(stored_events.merge(fetched_events))
844 }
845
846 /// Stream events from relays
847 ///
848 /// # Overview
849 ///
850 /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
851 /// To use another exit policy, check [`RelayPool::stream_events`].
852 /// For long-lived subscriptions, check [`Client::subscribe`].
853 ///
854 /// # Gossip
855 ///
    /// If `gossip` is enabled (see [`ClientOptions::gossip`]), the events will also be streamed from the
    /// NIP65 relays (automatically discovered) of the public keys included in the filter (if any).
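    ///
    /// # Example
    ///
    /// A minimal sketch (assumes `StreamExt` is in scope via the prelude):
    ///
    /// ```rust,no_run
    /// # use std::time::Duration;
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::default();
    /// let filter = Filter::new().kind(Kind::TextNote).limit(10);
    /// let mut stream = client.stream_events(filter, Duration::from_secs(10)).await?;
    /// while let Some(event) = stream.next().await {
    ///     println!("{}", event.id);
    /// }
    /// # Ok(())
    /// # }
    /// ```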
858 pub async fn stream_events(
859 &self,
860 filter: Filter,
861 timeout: Duration,
862 ) -> Result<ReceiverStream<Event>, Error> {
863 // Check if gossip is enabled
864 if self.opts.gossip {
865 self.gossip_stream_events(filter, timeout, ReqExitPolicy::ExitOnEOSE)
866 .await
867 } else {
868 Ok(self
869 .pool
870 .stream_events(filter, timeout, ReqExitPolicy::ExitOnEOSE)
871 .await?)
872 }
873 }
874
875 /// Stream events from specific relays
876 ///
877 /// # Overview
878 ///
879 /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
880 /// To use another exit policy, check [`RelayPool::stream_events_from`].
881 /// For long-lived subscriptions, check [`Client::subscribe_to`].
882 #[inline]
883 pub async fn stream_events_from<I, U>(
884 &self,
885 urls: I,
886 filter: Filter,
887 timeout: Duration,
888 ) -> Result<ReceiverStream<Event>, Error>
889 where
890 I: IntoIterator<Item = U>,
891 U: TryIntoUrl,
892 pool::Error: From<<U as TryIntoUrl>::Err>,
893 {
894 Ok(self
895 .pool
896 .stream_events_from(urls, filter, timeout, ReqExitPolicy::default())
897 .await?)
898 }
899
900 /// Stream events from specific relays with specific filters
901 ///
902 /// # Overview
903 ///
904 /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
905 /// To use another exit policy, check [`RelayPool::stream_events_targeted`].
906 /// For long-lived subscriptions, check [`Client::subscribe_targeted`].
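    ///
    /// # Example
    ///
    /// A minimal sketch (the relay URL is a placeholder and must have been added beforehand; assumes `StreamExt` is in scope via the prelude):
    ///
    /// ```rust,no_run
    /// # use std::collections::HashMap;
    /// # use std::time::Duration;
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::default();
    /// let url = RelayUrl::parse("wss://relay.damus.io")?;
    /// let filter = Filter::new().kind(Kind::Metadata).limit(10);
    /// let mut targets = HashMap::new();
    /// targets.insert(url, filter);
    /// let mut stream = client
    ///     .stream_events_targeted(targets, Duration::from_secs(10))
    ///     .await?;
    /// while let Some(event) = stream.next().await {
    ///     println!("{}", event.id);
    /// }
    /// # Ok(())
    /// # }
    /// ```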
907 pub async fn stream_events_targeted(
908 &self,
909 targets: HashMap<RelayUrl, Filter>,
910 timeout: Duration,
911 ) -> Result<ReceiverStream<Event>, Error> {
912 Ok(self
913 .pool
914 .stream_events_targeted(targets, timeout, ReqExitPolicy::default())
915 .await?)
916 }
917
    /// Send a client message to **specific relays**
919 #[inline]
920 pub async fn send_msg_to<I, U>(
921 &self,
922 urls: I,
923 msg: ClientMessage<'_>,
924 ) -> Result<Output<()>, Error>
925 where
926 I: IntoIterator<Item = U>,
927 U: TryIntoUrl,
928 pool::Error: From<<U as TryIntoUrl>::Err>,
929 {
930 Ok(self.pool.send_msg_to(urls, msg).await?)
931 }
932
933 /// Batch send client messages to **specific relays**
934 #[inline]
935 pub async fn batch_msg_to<I, U>(
936 &self,
937 urls: I,
938 msgs: Vec<ClientMessage<'_>>,
939 ) -> Result<Output<()>, Error>
940 where
941 I: IntoIterator<Item = U>,
942 U: TryIntoUrl,
943 pool::Error: From<<U as TryIntoUrl>::Err>,
944 {
945 Ok(self.pool.batch_msg_to(urls, msgs).await?)
946 }
947
948 /// Send the event to relays
949 ///
950 /// # Overview
951 ///
952 /// Send the [`Event`] to all relays with [`RelayServiceFlags::WRITE`] flag.
953 ///
954 /// # Gossip
955 ///
956 /// If `gossip` is enabled (see [`ClientOptions::gossip`]):
    /// - the [`Event`] will also be sent to the NIP65 relays (automatically discovered);
958 /// - the gossip data will be updated, if the [`Event`] is a NIP17/NIP65 relay list.
959 #[inline]
960 pub async fn send_event(&self, event: &Event) -> Result<Output<EventId>, Error> {
961 // NOT gossip, send event to all relays
962 if !self.opts.gossip {
963 return Ok(self.pool.send_event(event).await?);
964 }
965
966 // Update gossip graph
967 self.gossip.process_event(event).await;
968
969 // Send event using gossip
970 self.gossip_send_event(event, false).await
971 }
972
973 /// Send event to specific relays
974 ///
975 /// # Gossip
976 ///
977 /// If `gossip` is enabled (see [`ClientOptions::gossip`]) and the [`Event`] is a NIP17/NIP65 relay list,
978 /// the gossip data will be updated.
979 #[inline]
980 pub async fn send_event_to<I, U>(
981 &self,
982 urls: I,
983 event: &Event,
984 ) -> Result<Output<EventId>, Error>
985 where
986 I: IntoIterator<Item = U>,
987 U: TryIntoUrl,
988 pool::Error: From<<U as TryIntoUrl>::Err>,
989 {
990 // If gossip is enabled, update the gossip graph
991 if self.opts.gossip {
992 self.gossip.process_event(event).await;
993 }
994
995 // Send event to relays
996 Ok(self.pool.send_event_to(urls, event).await?)
997 }
998
999 /// Build, sign and return [`Event`]
1000 ///
1001 /// This method requires a [`NostrSigner`].
1002 pub async fn sign_event_builder(&self, builder: EventBuilder) -> Result<Event, Error> {
1003 let signer = self.signer().await?;
1004 Ok(builder.sign(&signer).await?)
1005 }
1006
1007 /// Take an [`EventBuilder`], sign it by using the [`NostrSigner`] and broadcast to relays.
1008 ///
1009 /// This method requires a [`NostrSigner`].
1010 ///
    /// Check [`Client::send_event`] for more details.
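    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a signer is set and relays have been added):
    ///
    /// ```rust,no_run
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::new(Keys::generate());
    /// let builder = EventBuilder::text_note("Hello from rust-nostr!");
    /// let output = client.send_event_builder(builder).await?;
    /// println!("Event ID: {}", output.val);
    /// # Ok(())
    /// # }
    /// ```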
1012 #[inline]
1013 pub async fn send_event_builder(
1014 &self,
1015 builder: EventBuilder,
1016 ) -> Result<Output<EventId>, Error> {
1017 let event: Event = self.sign_event_builder(builder).await?;
1018 self.send_event(&event).await
1019 }
1020
1021 /// Take an [`EventBuilder`], sign it by using the [`NostrSigner`] and broadcast to specific relays.
1022 ///
1023 /// This method requires a [`NostrSigner`].
1024 ///
    /// Check [`Client::send_event_to`] for more details.
1026 #[inline]
1027 pub async fn send_event_builder_to<I, U>(
1028 &self,
1029 urls: I,
1030 builder: EventBuilder,
1031 ) -> Result<Output<EventId>, Error>
1032 where
1033 I: IntoIterator<Item = U>,
1034 U: TryIntoUrl,
1035 pool::Error: From<<U as TryIntoUrl>::Err>,
1036 {
1037 let event: Event = self.sign_event_builder(builder).await?;
1038 self.send_event_to(urls, &event).await
1039 }
1040
1041 /// Fetch the newest public key metadata from relays.
1042 ///
1043 /// Returns [`None`] if the [`Metadata`] of the [`PublicKey`] has not been found.
1044 ///
1045 /// Check [`Client::fetch_events`] for more details.
1046 ///
1047 /// If you only want to consult stored data,
1048 /// consider `client.database().profile(PUBKEY)`.
1049 ///
1050 /// <https://github.com/nostr-protocol/nips/blob/master/01.md>
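    ///
    /// # Example
    ///
    /// A minimal sketch (the public key below is a freshly generated placeholder):
    ///
    /// ```rust,no_run
    /// # use std::time::Duration;
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::default();
    /// let public_key = Keys::generate().public_key();
    /// match client.fetch_metadata(public_key, Duration::from_secs(10)).await? {
    ///     Some(metadata) => println!("Name: {:?}", metadata.name),
    ///     None => println!("Metadata not found"),
    /// }
    /// # Ok(())
    /// # }
    /// ```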
1051 pub async fn fetch_metadata(
1052 &self,
1053 public_key: PublicKey,
1054 timeout: Duration,
1055 ) -> Result<Option<Metadata>, Error> {
1056 let filter: Filter = Filter::new()
1057 .author(public_key)
1058 .kind(Kind::Metadata)
1059 .limit(1);
1060 let events: Events = self.fetch_events(filter, timeout).await?;
1061 match events.first() {
1062 Some(event) => Ok(Some(Metadata::try_from(event)?)),
1063 None => Ok(None),
1064 }
1065 }
1066
1067 /// Update metadata
1068 ///
1069 /// This method requires a [`NostrSigner`].
1070 ///
1071 /// <https://github.com/nostr-protocol/nips/blob/master/01.md>
1072 ///
1073 /// # Example
1074 /// ```rust,no_run
1075 /// # use nostr_sdk::prelude::*;
1076 /// # #[tokio::main]
1077 /// # async fn main() {
1078 /// # let keys = Keys::generate();
1079 /// # let client = Client::new(keys);
1080 /// let metadata = Metadata::new()
1081 /// .name("username")
1082 /// .display_name("My Username")
1083 /// .about("Description")
1084 /// .picture(Url::parse("https://example.com/avatar.png").unwrap())
1085 /// .nip05("username@example.com");
1086 ///
1087 /// client.set_metadata(&metadata).await.unwrap();
1088 /// # }
1089 /// ```
1090 #[inline]
1091 pub async fn set_metadata(&self, metadata: &Metadata) -> Result<Output<EventId>, Error> {
1092 let builder = EventBuilder::metadata(metadata);
1093 self.send_event_builder(builder).await
1094 }
1095
1096 async fn get_contact_list_filter(&self) -> Result<Filter, Error> {
1097 let signer = self.signer().await?;
1098 let public_key = signer.get_public_key().await?;
1099 let filter: Filter = Filter::new()
1100 .author(public_key)
1101 .kind(Kind::ContactList)
1102 .limit(1);
1103 Ok(filter)
1104 }
1105
1106 /// Get the contact list from relays.
1107 ///
1108 /// This method requires a [`NostrSigner`].
1109 ///
1110 /// <https://github.com/nostr-protocol/nips/blob/master/02.md>
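    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a signer is set):
    ///
    /// ```rust,no_run
    /// # use std::time::Duration;
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::new(Keys::generate());
    /// let contacts = client.get_contact_list(Duration::from_secs(10)).await?;
    /// for contact in contacts.iter() {
    ///     println!("{}", contact.public_key);
    /// }
    /// # Ok(())
    /// # }
    /// ```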
1111 pub async fn get_contact_list(&self, timeout: Duration) -> Result<Vec<Contact>, Error> {
1112 let mut contact_list: Vec<Contact> = Vec::new();
1113 let filter: Filter = self.get_contact_list_filter().await?;
1114 let events: Events = self.fetch_events(filter, timeout).await?;
1115
1116 // Get first event (result of `fetch_events` is sorted DESC by timestamp)
1117 if let Some(event) = events.first_owned() {
1118 for tag in event.tags.into_iter() {
1119 if let Some(TagStandard::PublicKey {
1120 public_key,
1121 relay_url,
1122 alias,
1123 uppercase: false,
1124 }) = tag.to_standardized()
1125 {
1126 contact_list.push(Contact {
1127 public_key,
1128 relay_url,
1129 alias,
1130 })
1131 }
1132 }
1133 }
1134
1135 Ok(contact_list)
1136 }
1137
1138 /// Get contact list public keys from relays.
1139 ///
1140 /// This method requires a [`NostrSigner`].
1141 ///
1142 /// <https://github.com/nostr-protocol/nips/blob/master/02.md>
1143 pub async fn get_contact_list_public_keys(
1144 &self,
1145 timeout: Duration,
1146 ) -> Result<Vec<PublicKey>, Error> {
1147 let mut pubkeys: Vec<PublicKey> = Vec::new();
1148 let filter: Filter = self.get_contact_list_filter().await?;
1149 let events: Events = self.fetch_events(filter, timeout).await?;
1150
1151 for event in events.into_iter() {
1152 pubkeys.extend(event.tags.public_keys());
1153 }
1154
1155 Ok(pubkeys)
1156 }
1157
1158 /// Get contact list [`Metadata`] from relays.
1159 ///
1160 /// This method requires a [`NostrSigner`].
1161 pub async fn get_contact_list_metadata(
1162 &self,
1163 timeout: Duration,
1164 ) -> Result<HashMap<PublicKey, Metadata>, Error> {
1165 let public_keys = self.get_contact_list_public_keys(timeout).await?;
1166 let mut contacts: HashMap<PublicKey, Metadata> =
1167 public_keys.iter().map(|p| (*p, Metadata::new())).collect();
1168
1169 let filter: Filter = Filter::new().authors(public_keys).kind(Kind::Metadata);
1170 let events: Events = self.fetch_events(filter, timeout).await?;
1171 for event in events.into_iter() {
1172 let metadata = Metadata::from_json(&event.content)?;
1173 if let Some(m) = contacts.get_mut(&event.pubkey) {
1174 *m = metadata
1175 };
1176 }
1177
1178 Ok(contacts)
1179 }
1180
1181 /// Send a private direct message
1182 ///
1183 /// If `gossip` is enabled (see [`ClientOptions::gossip`]) the message will be sent to the NIP17 relays (automatically discovered).
    /// If gossip is not enabled, the message will be sent to all relays with the [`RelayServiceFlags::WRITE`] flag.
1185 ///
1186 /// This method requires a [`NostrSigner`].
1187 ///
1188 /// # Errors
1189 ///
    /// Returns [`Error::PrivateMsgRelaysNotFound`] if the receiver hasn't published a NIP17 relay list,
    /// meaning that they aren't ready to receive private messages.
1192 ///
1193 /// <https://github.com/nostr-protocol/nips/blob/master/17.md>
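    ///
    /// # Example
    ///
    /// A minimal sketch (the receiver key below is a freshly generated placeholder):
    ///
    /// ```rust,no_run
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::new(Keys::generate());
    /// let receiver = Keys::generate().public_key();
    /// client.send_private_msg(receiver, "Hello!", []).await?;
    /// # Ok(())
    /// # }
    /// ```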
1194 #[inline]
1195 #[cfg(feature = "nip59")]
1196 pub async fn send_private_msg<S, I>(
1197 &self,
1198 receiver: PublicKey,
1199 message: S,
1200 rumor_extra_tags: I,
1201 ) -> Result<Output<EventId>, Error>
1202 where
1203 S: Into<String>,
1204 I: IntoIterator<Item = Tag>,
1205 {
1206 let signer = self.signer().await?;
1207 let event: Event =
1208 EventBuilder::private_msg(&signer, receiver, message, rumor_extra_tags).await?;
1209
1210 // NOT gossip, send to all relays
1211 if !self.opts.gossip {
1212 return self.send_event(&event).await;
1213 }
1214
1215 self.gossip_send_event(&event, true).await
1216 }
1217
1218 /// Send a private direct message to specific relays
1219 ///
1220 /// This method requires a [`NostrSigner`].
1221 ///
1222 /// <https://github.com/nostr-protocol/nips/blob/master/17.md>
1223 #[inline]
1224 #[cfg(feature = "nip59")]
1225 pub async fn send_private_msg_to<I, S, U, IT>(
1226 &self,
1227 urls: I,
1228 receiver: PublicKey,
1229 message: S,
1230 rumor_extra_tags: IT,
1231 ) -> Result<Output<EventId>, Error>
1232 where
1233 I: IntoIterator<Item = U>,
1234 S: Into<String>,
1235 U: TryIntoUrl,
1236 IT: IntoIterator<Item = Tag>,
1237 pool::Error: From<<U as TryIntoUrl>::Err>,
1238 {
1239 let signer = self.signer().await?;
1240 let event: Event =
1241 EventBuilder::private_msg(&signer, receiver, message, rumor_extra_tags).await?;
1242 self.send_event_to(urls, &event).await
1243 }
1244
1245 /// Construct Gift Wrap and send to relays
1246 ///
1247 /// This method requires a [`NostrSigner`].
1248 ///
1249 /// Check [`Client::send_event`] to know how sending events works.
1250 ///
1251 /// <https://github.com/nostr-protocol/nips/blob/master/59.md>
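    ///
    /// # Example
    ///
    /// A minimal sketch (the receiver key is a placeholder; assumes the rumor is built with [`EventBuilder::build`]):
    ///
    /// ```rust,no_run
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let keys = Keys::generate();
    /// # let client = Client::new(keys.clone());
    /// let receiver = Keys::generate().public_key();
    /// let rumor: UnsignedEvent = EventBuilder::text_note("Top secret").build(keys.public_key());
    /// client.gift_wrap(&receiver, rumor, []).await?;
    /// # Ok(())
    /// # }
    /// ```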
1252 #[inline]
1253 #[cfg(feature = "nip59")]
1254 pub async fn gift_wrap<I>(
1255 &self,
1256 receiver: &PublicKey,
1257 rumor: UnsignedEvent,
1258 extra_tags: I,
1259 ) -> Result<Output<EventId>, Error>
1260 where
1261 I: IntoIterator<Item = Tag>,
1262 {
1263 // Acquire signer
1264 let signer = self.signer().await?;
1265
1266 // Build gift wrap
1267 let gift_wrap: Event =
1268 EventBuilder::gift_wrap(&signer, receiver, rumor, extra_tags).await?;
1269
1270 // Send
1271 self.send_event(&gift_wrap).await
1272 }
1273
1274 /// Construct Gift Wrap and send to specific relays
1275 ///
1276 /// This method requires a [`NostrSigner`].
1277 ///
1278 /// <https://github.com/nostr-protocol/nips/blob/master/59.md>
1279 #[inline]
1280 #[cfg(feature = "nip59")]
1281 pub async fn gift_wrap_to<I, U, IT>(
1282 &self,
1283 urls: I,
1284 receiver: &PublicKey,
1285 rumor: UnsignedEvent,
1286 extra_tags: IT,
1287 ) -> Result<Output<EventId>, Error>
1288 where
1289 I: IntoIterator<Item = U>,
1290 U: TryIntoUrl,
1291 IT: IntoIterator<Item = Tag>,
1292 pool::Error: From<<U as TryIntoUrl>::Err>,
1293 {
1294 // Acquire signer
1295 let signer = self.signer().await?;
1296
1297 // Build gift wrap
1298 let gift_wrap: Event =
1299 EventBuilder::gift_wrap(&signer, receiver, rumor, extra_tags).await?;
1300
1301 // Send
1302 self.send_event_to(urls, &gift_wrap).await
1303 }
1304
1305 /// Unwrap Gift Wrap event
1306 ///
1307 /// This method requires a [`NostrSigner`].
1308 ///
1309 /// Check [`UnwrappedGift::from_gift_wrap`] to learn more.
1310 ///
1311 /// <https://github.com/nostr-protocol/nips/blob/master/59.md>
1312 #[inline]
1313 #[cfg(feature = "nip59")]
1314 pub async fn unwrap_gift_wrap(&self, gift_wrap: &Event) -> Result<UnwrappedGift, Error> {
1315 let signer = self.signer().await?;
1316 Ok(UnwrappedGift::from_gift_wrap(&signer, gift_wrap).await?)
1317 }
1318
1319 /// Handle notifications
1320 ///
    /// The closure must return a `bool`: return `true` to exit the notification loop.
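    ///
    /// # Example
    ///
    /// A minimal sketch that prints incoming events and keeps listening:
    ///
    /// ```rust,no_run
    /// # use nostr_sdk::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # let client = Client::default();
    /// client
    ///     .handle_notifications(|notification| async {
    ///         if let RelayPoolNotification::Event { event, .. } = notification {
    ///             println!("{}", event.as_json());
    ///         }
    ///         Ok(false) // Return `true` to exit the loop
    ///     })
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```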
1322 #[inline]
1323 pub async fn handle_notifications<F, Fut>(&self, func: F) -> Result<(), Error>
1324 where
1325 F: Fn(RelayPoolNotification) -> Fut,
1326 Fut: Future<Output = Result<bool>>,
1327 {
1328 Ok(self.pool.handle_notifications(func).await?)
1329 }
1330}
1331
1332// Gossip
1333impl Client {
1334 /// Check if there are outdated public keys and update them
1335 async fn check_and_update_gossip<I>(&self, public_keys: I) -> Result<(), Error>
1336 where
1337 I: IntoIterator<Item = PublicKey>,
1338 {
1339 let outdated_public_keys: HashSet<PublicKey> =
1340 self.gossip.check_outdated(public_keys).await;
1341
1342 // No outdated public keys, immediately return.
1343 if outdated_public_keys.is_empty() {
1344 return Ok(());
1345 }
1346
1347 // Compose filters
1348 let filter: Filter = Filter::default()
1349 .authors(outdated_public_keys.clone())
1350 .kinds([Kind::RelayList, Kind::InboxRelays]);
1351
1352 // Query from database
1353 let stored_events: Events = self.database().query(filter.clone()).await?;
1354
1355 // Get DISCOVERY and READ relays
1356 let urls: Vec<RelayUrl> = self
1357 .pool
1358 .__relay_urls_with_flag(
1359 RelayServiceFlags::DISCOVERY | RelayServiceFlags::READ,
1360 FlagCheck::Any,
1361 )
1362 .await;
1363
1364 // Get events from discovery and read relays
1365 let events: Events = self
1366 .fetch_events_from(urls, filter, Duration::from_secs(10))
1367 .await?;
1368
1369 // Update last check for these public keys
1370 self.gossip.update_last_check(outdated_public_keys).await;
1371
1372 // Merge database and relays events
1373 let merged: Events = events.merge(stored_events);
1374
1375 // Update gossip graph
1376 self.gossip.update(merged).await;
1377
1378 Ok(())
1379 }
1380
1381 /// Break down filters for gossip and discovery relays
1382 async fn break_down_filter(&self, filter: Filter) -> Result<HashMap<RelayUrl, Filter>, Error> {
1383 // Extract all public keys from filters
1384 let public_keys = filter.extract_public_keys();
1385
1386 // Check and update outdated public keys
1387 self.check_and_update_gossip(public_keys).await?;
1388
1389 // Broken-down filters
1390 let filters: HashMap<RelayUrl, Filter> = match self.gossip.break_down_filter(filter).await {
1391 BrokenDownFilters::Filters(filters) => filters,
1392 BrokenDownFilters::Orphan(filter) | BrokenDownFilters::Other(filter) => {
1393 // Get read relays
1394 let read_relays: Vec<RelayUrl> = self.pool.__read_relay_urls().await;
1395
1396 let mut map = HashMap::with_capacity(read_relays.len());
1397 for url in read_relays.into_iter() {
1398 map.insert(url, filter.clone());
1399 }
1400 map
1401 }
1402 };
1403
1404 // Add gossip (outbox and inbox) relays
1405 for url in filters.keys() {
1406 if self.add_gossip_relay(url).await? {
1407 self.connect_relay(url).await?;
1408 }
1409 }
1410
1411 // Check if filters are empty
1412 // TODO: this can't be empty, right?
1413 if filters.is_empty() {
1414 return Err(Error::GossipFiltersEmpty);
1415 }
1416
1417 Ok(filters)
1418 }
1419
1420 async fn gossip_send_event(
1421 &self,
1422 event: &Event,
1423 is_nip17: bool,
1424 ) -> Result<Output<EventId>, Error> {
1425 let is_gift_wrap: bool = event.kind == Kind::GiftWrap;
1426
        // Get the involved public keys and check which are up to date in the gossip graph and which ones require an update.
1428 if is_gift_wrap {
1429 // Get only p tags since the author of a gift wrap is randomized
1430 let public_keys = event.tags.public_keys().copied();
1431 self.check_and_update_gossip(public_keys).await?;
1432 } else {
1433 // Get all public keys involved in the event: author + p tags
1434 let public_keys = event
1435 .tags
1436 .public_keys()
1437 .copied()
1438 .chain(iter::once(event.pubkey));
1439 self.check_and_update_gossip(public_keys).await?;
1440 };
1441
1442 // Check if NIP17 or NIP65
1443 let urls: HashSet<RelayUrl> = if is_nip17 && is_gift_wrap {
1444 // Get NIP17 relays
            // Get relays only for the p tags since gift wraps are signed with a random key (random author)
1446 let relays = self
1447 .gossip
1448 .get_nip17_inbox_relays(event.tags.public_keys())
1449 .await;
1450
1451 // Clients SHOULD publish kind 14 events to the 10050-listed relays.
1452 // If that is not found, that indicates the user is not ready to receive messages under this NIP and clients shouldn't try.
1453 //
1454 // <https://github.com/nostr-protocol/nips/blob/6e7a618e7f873bb91e743caacc3b09edab7796a0/17.md>
1455 if relays.is_empty() {
1456 return Err(Error::PrivateMsgRelaysNotFound);
1457 }
1458
1459 // Add outbox and inbox relays
1460 for url in relays.iter() {
1461 if self.add_gossip_relay(url).await? {
1462 self.connect_relay(url).await?;
1463 }
1464 }
1465
1466 relays
1467 } else {
1468 // Get NIP65 relays
1469 let mut outbox = self.gossip.get_nip65_outbox_relays(&[event.pubkey]).await;
1470 let inbox = self
1471 .gossip
1472 .get_nip65_inbox_relays(event.tags.public_keys())
1473 .await;
1474
1475 // Add outbox and inbox relays
1476 for url in outbox.iter().chain(inbox.iter()) {
1477 if self.add_gossip_relay(url).await? {
1478 self.connect_relay(url).await?;
1479 }
1480 }
1481
1482 // Get WRITE relays
1483 let write_relays: Vec<RelayUrl> = self.pool.__write_relay_urls().await;
1484
1485 // Extend OUTBOX relays with WRITE ones
1486 outbox.extend(write_relays);
1487
1488 // Extend outbox relays with inbox ones
1489 outbox.extend(inbox);
1490
1491 // Return all relays
1492 outbox
1493 };
1494
1495 // Send event
1496 Ok(self.pool.send_event_to(urls, event).await?)
1497 }
1498
1499 async fn gossip_stream_events(
1500 &self,
1501 filter: Filter,
1502 timeout: Duration,
1503 policy: ReqExitPolicy,
1504 ) -> Result<ReceiverStream<Event>, Error> {
1505 let filters = self.break_down_filter(filter).await?;
1506
1507 // Stream events
1508 let stream: ReceiverStream<Event> = self
1509 .pool
1510 .stream_events_targeted(filters, timeout, policy)
1511 .await?;
1512
1513 Ok(stream)
1514 }
1515
1516 async fn gossip_fetch_events(
1517 &self,
1518 filter: Filter,
1519 timeout: Duration,
1520 policy: ReqExitPolicy,
1521 ) -> Result<Events, Error> {
1522 let mut events: Events = Events::new(&filter);
1523
1524 // Stream events
1525 let mut stream: ReceiverStream<Event> =
1526 self.gossip_stream_events(filter, timeout, policy).await?;
1527
1528 while let Some(event) = stream.next().await {
            // To find out more about why `force_insert` was used, search for EVENTS_FORCE_INSERT in the code.
1530 events.force_insert(event);
1531 }
1532
1533 Ok(events)
1534 }
1535
1536 async fn gossip_subscribe(
1537 &self,
1538 id: SubscriptionId,
1539 filter: Filter,
1540 opts: SubscribeOptions,
1541 ) -> Result<Output<()>, Error> {
1542 let filters = self.break_down_filter(filter).await?;
1543 Ok(self.pool.subscribe_targeted(id, filters, opts).await?)
1544 }
1545
1546 async fn gossip_sync_negentropy(
1547 &self,
1548 filter: Filter,
1549 opts: &SyncOptions,
1550 ) -> Result<Output<Reconciliation>, Error> {
1551 // Break down filter
1552 let temp_filters = self.break_down_filter(filter).await?;
1553
1554 let database = self.database();
1555 let mut filters: HashMap<RelayUrl, (Filter, Vec<_>)> =
1556 HashMap::with_capacity(temp_filters.len());
1557
1558 // Iterate broken down filters and compose new filters for targeted reconciliation
1559 for (url, filter) in temp_filters.into_iter() {
1560 // Get items
1561 let items: Vec<(EventId, Timestamp)> =
1562 database.negentropy_items(filter.clone()).await?;
1563
1564 filters.insert(url, (filter, items));
1565 }
1566
1567 // Reconciliation
1568 Ok(self.pool.sync_targeted(filters, opts).await?)
1569 }
1570}