nostr_sdk/client/
mod.rs

1// Copyright (c) 2022-2023 Yuki Kishimoto
2// Copyright (c) 2023-2025 Rust Nostr Developers
3// Distributed under the MIT software license
4
5//! Client
6
7use std::collections::{HashMap, HashSet};
8use std::future::Future;
9use std::iter;
10use std::sync::Arc;
11use std::time::Duration;
12
13use nostr::prelude::*;
14use nostr_database::prelude::*;
15use nostr_relay_pool::prelude::*;
16use tokio::sync::broadcast;
17
18pub mod builder;
19mod error;
20pub mod options;
21
22pub use self::builder::ClientBuilder;
23pub use self::error::Error;
24pub use self::options::{ClientOptions, SleepWhenIdle};
25#[cfg(not(target_arch = "wasm32"))]
26pub use self::options::{Connection, ConnectionTarget};
27use crate::gossip::{BrokenDownFilters, Gossip};
28
29/// Nostr client
30#[derive(Debug, Clone)]
31pub struct Client {
32    pool: RelayPool,
33    gossip: Gossip,
34    opts: ClientOptions,
35}
36
37impl Default for Client {
38    #[inline]
39    fn default() -> Self {
40        Self::builder().build()
41    }
42}
43
44impl Client {
45    /// Construct client with signer
46    ///
47    /// To construct a client without signer use [`Client::default`].
48    ///
49    /// # Example
50    /// ```rust,no_run
51    /// use nostr_sdk::prelude::*;
52    ///
53    /// let keys = Keys::generate();
54    /// let client = Client::new(keys);
55    /// ```
56    #[inline]
57    pub fn new<T>(signer: T) -> Self
58    where
59        T: IntoNostrSigner,
60    {
61        Self::builder().signer(signer).build()
62    }
63
64    /// Construct client
65    ///
66    /// # Example
67    /// ```rust,no_run
68    /// use std::time::Duration;
69    ///
70    /// use nostr_sdk::prelude::*;
71    ///
72    /// let signer = Keys::generate();
73    /// let opts = ClientOptions::default().gossip(true);
74    /// let client: Client = Client::builder().signer(signer).opts(opts).build();
75    /// ```
76    #[inline]
77    pub fn builder() -> ClientBuilder {
78        ClientBuilder::default()
79    }
80
81    fn from_builder(builder: ClientBuilder) -> Self {
82        // Construct relay pool builder
83        let pool_builder: RelayPoolBuilder = RelayPoolBuilder {
84            websocket_transport: builder.websocket_transport,
85            admit_policy: builder.admit_policy,
86            monitor: builder.monitor,
87            opts: builder.opts.pool,
88            __database: builder.database,
89            __signer: builder.signer,
90        };
91
92        // Construct client
93        Self {
94            pool: pool_builder.build(),
95            gossip: Gossip::new(),
96            opts: builder.opts,
97        }
98    }
99
100    /// Update minimum POW difficulty for received events
101    ///
102    /// Events with a POW lower than the current value will be ignored to prevent resources exhaustion.
103    #[deprecated(
104        since = "0.40.0",
105        note = "This no longer works, please use `AdmitPolicy` instead."
106    )]
107    pub fn update_min_pow_difficulty(&self, _difficulty: u8) {}
108
109    /// Auto authenticate to relays (default: true)
110    ///
111    /// <https://github.com/nostr-protocol/nips/blob/master/42.md>
112    #[inline]
113    pub fn automatic_authentication(&self, enable: bool) {
114        self.pool.state().automatic_authentication(enable);
115    }
116
117    /// Check if signer is configured
118    #[inline]
119    pub async fn has_signer(&self) -> bool {
120        self.pool.state().has_signer().await
121    }
122
123    /// Get current nostr signer
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the signer isn't set.
128    #[inline]
129    pub async fn signer(&self) -> Result<Arc<dyn NostrSigner>, Error> {
130        Ok(self.pool.state().signer().await?)
131    }
132
133    /// Set nostr signer
134    #[inline]
135    pub async fn set_signer<T>(&self, signer: T)
136    where
137        T: IntoNostrSigner,
138    {
139        self.pool.state().set_signer(signer).await;
140    }
141
142    /// Unset nostr signer
143    #[inline]
144    pub async fn unset_signer(&self) {
145        self.pool.state().unset_signer().await;
146    }
147
148    /// Get [`RelayPool`]
149    #[inline]
150    pub fn pool(&self) -> &RelayPool {
151        &self.pool
152    }
153
154    /// Get database
155    #[inline]
156    pub fn database(&self) -> &Arc<dyn NostrDatabase> {
157        self.pool.database()
158    }
159
160    /// Get the relay monitor
161    #[inline]
162    pub fn monitor(&self) -> Option<&Monitor> {
163        self.pool.monitor()
164    }
165
166    /// Reset the client
167    ///
168    /// This method resets the client to simplify the switch to another account.
169    ///
170    /// This method will:
171    /// * unsubscribe from all subscriptions
172    /// * disconnect and force remove all relays
173    /// * unset the signer
174    ///
175    /// This method will NOT:
176    /// * reset [`ClientOptions`]
177    /// * remove the database
178    /// * clear the gossip graph
179    pub async fn reset(&self) {
180        self.unsubscribe_all().await;
181        self.force_remove_all_relays().await;
182        self.unset_signer().await;
183    }
184
185    /// Completely shutdown client
186    #[inline]
187    pub async fn shutdown(&self) {
188        self.pool.shutdown().await
189    }
190
191    /// Get new notification listener
192    ///
193    /// <div class="warning">When you call this method, you subscribe to the notifications channel from that precise moment. Anything received by relay/s before that moment is not included in the channel!</div>
194    #[inline]
195    pub fn notifications(&self) -> broadcast::Receiver<RelayPoolNotification> {
196        self.pool.notifications()
197    }
198
199    /// Get relays with [`RelayServiceFlags::READ`] or [`RelayServiceFlags::WRITE`] flags
200    ///
201    /// Call [`RelayPool::all_relays`] to get all relays
202    /// or [`RelayPool::relays_with_flag`] to get relays with specific [`RelayServiceFlags`].
203    #[inline]
204    pub async fn relays(&self) -> HashMap<RelayUrl, Relay> {
205        self.pool.relays().await
206    }
207
208    /// Get a previously added [`Relay`]
209    #[inline]
210    pub async fn relay<U>(&self, url: U) -> Result<Relay, Error>
211    where
212        U: TryIntoUrl,
213        pool::Error: From<<U as TryIntoUrl>::Err>,
214    {
215        Ok(self.pool.relay(url).await?)
216    }
217
218    async fn compose_relay_opts(&self, _url: &RelayUrl) -> RelayOptions {
219        let opts: RelayOptions = RelayOptions::new();
220
221        // Set connection mode
222        #[cfg(not(target_arch = "wasm32"))]
223        let opts: RelayOptions = match &self.opts.connection.mode {
224            ConnectionMode::Direct => opts,
225            ConnectionMode::Proxy(..) => match self.opts.connection.target {
226                ConnectionTarget::All => opts.connection_mode(self.opts.connection.mode.clone()),
227                ConnectionTarget::Onion => {
228                    if _url.is_onion() {
229                        opts.connection_mode(self.opts.connection.mode.clone())
230                    } else {
231                        opts
232                    }
233                }
234            },
235            #[cfg(feature = "tor")]
236            ConnectionMode::Tor { .. } => match self.opts.connection.target {
237                ConnectionTarget::All => opts.connection_mode(self.opts.connection.mode.clone()),
238                ConnectionTarget::Onion => {
239                    if _url.is_onion() {
240                        opts.connection_mode(self.opts.connection.mode.clone())
241                    } else {
242                        opts
243                    }
244                }
245            },
246        };
247
248        // Set sleep when idle
249        let opts: RelayOptions = match self.opts.sleep_when_idle {
250            // Do nothing
251            SleepWhenIdle::Disabled => opts,
252            // Enable: update relay options
253            SleepWhenIdle::Enabled { timeout } => opts.sleep_when_idle(true).idle_timeout(timeout),
254        };
255
256        // Set limits
257        opts.limits(self.opts.relay_limits.clone())
258            .max_avg_latency(self.opts.max_avg_latency)
259            .verify_subscriptions(self.opts.verify_subscriptions)
260            .ban_relay_on_mismatch(self.opts.ban_relay_on_mismatch)
261    }
262
263    /// If return `false` means that already existed
264    async fn get_or_add_relay_with_flag<U>(
265        &self,
266        url: U,
267        flag: RelayServiceFlags,
268    ) -> Result<bool, Error>
269    where
270        U: TryIntoUrl,
271        pool::Error: From<<U as TryIntoUrl>::Err>,
272    {
273        // Convert into url
274        let url: RelayUrl = url.try_into_url().map_err(pool::Error::from)?;
275
276        // Compose relay options
277        let opts: RelayOptions = self.compose_relay_opts(&url).await;
278
279        // Set flag
280        let opts: RelayOptions = opts.flags(flag);
281
282        // Add relay with opts or edit current one
283        // TODO: remove clone here
284        match self.pool.__get_or_add_relay(url.clone(), opts).await? {
285            Some(relay) => {
286                relay.flags().add(flag);
287                Ok(false)
288            }
289            None => {
290                // TODO: move autoconnect to `Relay`?
291                // Connect if `autoconnect` is enabled
292                if self.opts.autoconnect {
293                    self.connect_relay::<RelayUrl>(url).await?;
294                }
295
296                Ok(true)
297            }
298        }
299    }
300
301    /// Add relay
302    ///
303    /// Relays added with this method will have both [`RelayServiceFlags::READ`] and [`RelayServiceFlags::WRITE`] flags enabled.
304    ///
305    /// If the relay already exists, the flags will be updated and `false` returned.
306    ///
307    /// If are set pool subscriptions, the new added relay will inherit them. Use [`Client::subscribe_to`] method instead of [`Client::subscribe`],
308    /// to avoid to set pool subscriptions.
309    ///
310    /// This method use previously set or default [`ClientOptions`] to configure the [`Relay`] (ex. set proxy, set min POW, set relay limits, ...).
311    /// To use custom [`RelayOptions`] use [`RelayPool::add_relay`].
312    ///
313    /// Connection is **NOT** automatically started with relay, remember to call [`Client::connect`]!
314    #[inline]
315    pub async fn add_relay<U>(&self, url: U) -> Result<bool, Error>
316    where
317        U: TryIntoUrl,
318        pool::Error: From<<U as TryIntoUrl>::Err>,
319    {
320        self.get_or_add_relay_with_flag(url, RelayServiceFlags::default())
321            .await
322    }
323
324    /// Add discovery relay
325    ///
326    /// If relay already exists, this method automatically add the [`RelayServiceFlags::DISCOVERY`] flag to it and return `false`.
327    ///
328    /// <https://github.com/nostr-protocol/nips/blob/master/65.md>
329    #[inline]
330    pub async fn add_discovery_relay<U>(&self, url: U) -> Result<bool, Error>
331    where
332        U: TryIntoUrl,
333        pool::Error: From<<U as TryIntoUrl>::Err>,
334    {
335        self.get_or_add_relay_with_flag(url, RelayServiceFlags::PING | RelayServiceFlags::DISCOVERY)
336            .await
337    }
338
339    /// Add read relay
340    ///
341    /// If relay already exists, this method add the [`RelayServiceFlags::READ`] flag to it and return `false`.
342    ///
343    /// If are set pool subscriptions, the new added relay will inherit them. Use `subscribe_to` method instead of `subscribe`,
344    /// to avoid to set pool subscriptions.
345    #[inline]
346    pub async fn add_read_relay<U>(&self, url: U) -> Result<bool, Error>
347    where
348        U: TryIntoUrl,
349        pool::Error: From<<U as TryIntoUrl>::Err>,
350    {
351        self.get_or_add_relay_with_flag(url, RelayServiceFlags::PING | RelayServiceFlags::READ)
352            .await
353    }
354
355    /// Add write relay
356    ///
357    /// If relay already exists, this method add the [`RelayServiceFlags::WRITE`] flag to it and return `false`.
358    #[inline]
359    pub async fn add_write_relay<U>(&self, url: U) -> Result<bool, Error>
360    where
361        U: TryIntoUrl,
362        pool::Error: From<<U as TryIntoUrl>::Err>,
363    {
364        self.get_or_add_relay_with_flag(url, RelayServiceFlags::PING | RelayServiceFlags::WRITE)
365            .await
366    }
367
368    #[inline]
369    async fn add_gossip_relay<U>(&self, url: U) -> Result<bool, Error>
370    where
371        U: TryIntoUrl,
372        pool::Error: From<<U as TryIntoUrl>::Err>,
373    {
374        self.get_or_add_relay_with_flag(url, RelayServiceFlags::PING | RelayServiceFlags::GOSSIP)
375            .await
376    }
377
378    /// Remove and disconnect relay
379    ///
380    /// If the relay has [`RelayServiceFlags::GOSSIP`], it will not be removed from the pool and its
381    /// flags will be updated (remove [`RelayServiceFlags::READ`],
382    /// [`RelayServiceFlags::WRITE`] and [`RelayServiceFlags::DISCOVERY`] flags).
383    ///
384    /// To force remove the relay, use [`Client::force_remove_relay`].
385    #[inline]
386    pub async fn remove_relay<U>(&self, url: U) -> Result<(), Error>
387    where
388        U: TryIntoUrl,
389        pool::Error: From<<U as TryIntoUrl>::Err>,
390    {
391        Ok(self.pool.remove_relay(url).await?)
392    }
393
394    /// Force remove and disconnect relay
395    ///
396    /// Note: this method will remove the relay, also if it's in use for the gossip model or other service!
397    #[inline]
398    pub async fn force_remove_relay<U>(&self, url: U) -> Result<(), Error>
399    where
400        U: TryIntoUrl,
401        pool::Error: From<<U as TryIntoUrl>::Err>,
402    {
403        Ok(self.pool.force_remove_relay(url).await?)
404    }
405
406    /// Disconnect and remove all relays
407    ///
408    /// Some relays used by some services could not be disconnected with this method
409    /// (like the ones used for gossip).
410    /// Use [`Client::force_remove_all_relays`] to remove every relay.
411    #[inline]
412    pub async fn remove_all_relays(&self) {
413        self.pool.remove_all_relays().await
414    }
415
416    /// Disconnect and force remove all relays
417    #[inline]
418    pub async fn force_remove_all_relays(&self) {
419        self.pool.force_remove_all_relays().await
420    }
421
422    /// Connect to a previously added relay
423    ///
424    /// Check [`RelayPool::connect_relay`] docs to learn more.
425    #[inline]
426    pub async fn connect_relay<U>(&self, url: U) -> Result<(), Error>
427    where
428        U: TryIntoUrl,
429        pool::Error: From<<U as TryIntoUrl>::Err>,
430    {
431        Ok(self.pool.connect_relay(url).await?)
432    }
433
434    /// Try to connect to a previously added relay
435    ///
436    /// For further details, see the documentation of [`RelayPool::try_connect_relay`].
437    #[inline]
438    pub async fn try_connect_relay<U>(&self, url: U, timeout: Duration) -> Result<(), Error>
439    where
440        U: TryIntoUrl,
441        pool::Error: From<<U as TryIntoUrl>::Err>,
442    {
443        Ok(self.pool.try_connect_relay(url, timeout).await?)
444    }
445
446    /// Disconnect relay
447    #[inline]
448    pub async fn disconnect_relay<U>(&self, url: U) -> Result<(), Error>
449    where
450        U: TryIntoUrl,
451        pool::Error: From<<U as TryIntoUrl>::Err>,
452    {
453        Ok(self.pool.disconnect_relay(url).await?)
454    }
455
456    /// Connect to all added relays
457    ///
458    /// Attempts to initiate a connection for every relay currently in
459    /// [`RelayStatus::Initialized`] or [`RelayStatus::Terminated`].
460    /// A background connection task is spawned for each such relay, which then tries
461    /// to establish the connection.
462    /// Any relay not in one of these two statuses is skipped.
463    ///
464    /// For further details, see the documentation of [`Relay::connect`].
465    #[inline]
466    pub async fn connect(&self) {
467        self.pool.connect().await;
468    }
469
470    /// Waits for relays connections
471    ///
472    /// Wait for relays connections at most for the specified `timeout`.
473    /// The code continues when the relays are connected or the `timeout` is reached.
474    #[inline]
475    pub async fn wait_for_connection(&self, timeout: Duration) {
476        self.pool.wait_for_connection(timeout).await
477    }
478
479    /// Try to establish a connection with the relays.
480    ///
481    /// Attempts to establish a connection for every relay currently in
482    /// [`RelayStatus::Initialized`] or [`RelayStatus::Terminated`]
483    /// without spawning the connection task if it fails.
484    /// This means that if the connection fails, no automatic retries are scheduled.
485    /// Use [`Client::connect`] if you want to immediately spawn a connection task,
486    /// regardless of whether the initial connection succeeds.
487    ///
488    /// For further details, see the documentation of [`Relay::try_connect`].
489    #[inline]
490    pub async fn try_connect(&self, timeout: Duration) -> Output<()> {
491        self.pool.try_connect(timeout).await
492    }
493
494    /// Connect to all added relays
495    ///
496    /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`.
497    /// The code continues if the `timeout` is reached or if all relays connect.
498    #[deprecated(
499        since = "0.39.0",
500        note = "Use `connect` + `wait_for_connection` instead."
501    )]
502    pub async fn connect_with_timeout(&self, timeout: Duration) {
503        self.pool.try_connect(timeout).await;
504    }
505
506    /// Disconnect from all relays
507    #[inline]
508    pub async fn disconnect(&self) {
509        self.pool.disconnect().await
510    }
511
512    /// Get subscriptions
513    #[inline]
514    pub async fn subscriptions(&self) -> HashMap<SubscriptionId, HashMap<RelayUrl, Filter>> {
515        self.pool.subscriptions().await
516    }
517
518    /// Get subscription
519    #[inline]
520    pub async fn subscription(&self, id: &SubscriptionId) -> HashMap<RelayUrl, Filter> {
521        self.pool.subscription(id).await
522    }
523
524    /// Subscribe to filters
525    ///
526    /// This method create a new subscription. None of the previous subscriptions will be edited/closed when you call this!
527    /// So remember to unsubscribe when you no longer need it. You can get all your active (non-auto-closing) subscriptions
528    /// by calling `client.subscriptions().await`.
529    ///
530    /// If `gossip` is enabled (see [`ClientOptions::gossip`]) the events will be requested also to
531    /// NIP65 relays (automatically discovered) of public keys included in filters (if any).
532    ///
533    /// # Auto-closing subscription
534    ///
535    /// It's possible to automatically close a subscription by configuring the [`SubscribeAutoCloseOptions`].
536    ///
537    /// Note: auto-closing subscriptions aren't saved in subscriptions map!
538    ///
539    /// # Example
540    /// ```rust,no_run
541    /// # use nostr_sdk::prelude::*;
542    /// # #[tokio::main]
543    /// # async fn main() -> Result<()> {
544    /// #   let keys = Keys::generate();
545    /// #   let client = Client::new(keys.clone());
546    /// // Compose filter
547    /// let subscription = Filter::new()
548    ///     .pubkeys(vec![keys.public_key()])
549    ///     .since(Timestamp::now());
550    ///
551    /// // Subscribe
552    /// let output = client.subscribe(subscription, None).await?;
553    /// println!("Subscription ID: {}", output.val);
554    ///
555    /// // Auto-closing subscription
556    /// let id = SubscriptionId::generate();
557    /// let subscription = Filter::new().kind(Kind::TextNote).limit(10);
558    /// let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
559    /// let output = client.subscribe(subscription, Some(opts)).await?;
560    /// println!("Subscription ID: {} [auto-closing]", output.val);
561    /// # Ok(())
562    /// # }
563    /// ```
564    pub async fn subscribe(
565        &self,
566        filter: Filter,
567        opts: Option<SubscribeAutoCloseOptions>,
568    ) -> Result<Output<SubscriptionId>, Error> {
569        let id: SubscriptionId = SubscriptionId::generate();
570        let output: Output<()> = self.subscribe_with_id(id.clone(), filter, opts).await?;
571        Ok(Output {
572            val: id,
573            success: output.success,
574            failed: output.failed,
575        })
576    }
577
578    /// Subscribe to filters with custom [SubscriptionId]
579    ///
580    /// If `gossip` is enabled (see [`ClientOptions::gossip`]) the events will be requested also to
581    /// NIP65 relays (automatically discovered) of public keys included in filters (if any).
582    ///
583    /// # Auto-closing subscription
584    ///
585    /// It's possible to automatically close a subscription by configuring the [SubscribeAutoCloseOptions].
586    ///
587    /// Note: auto-closing subscriptions aren't saved in subscriptions map!
588    pub async fn subscribe_with_id(
589        &self,
590        id: SubscriptionId,
591        filter: Filter,
592        opts: Option<SubscribeAutoCloseOptions>,
593    ) -> Result<Output<()>, Error> {
594        let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts);
595
596        if self.opts.gossip {
597            self.gossip_subscribe(id, filter, opts).await
598        } else {
599            Ok(self.pool.subscribe_with_id(id, filter, opts).await?)
600        }
601    }
602
603    /// Subscribe to filters to specific relays
604    ///
605    /// This method create a new subscription. None of the previous subscriptions will be edited/closed when you call this!
606    /// So remember to unsubscribe when you no longer need it.
607    ///
608    /// ### Auto-closing subscription
609    ///
610    /// It's possible to automatically close a subscription by configuring the [SubscribeAutoCloseOptions].
611    #[inline]
612    pub async fn subscribe_to<I, U>(
613        &self,
614        urls: I,
615        filter: Filter,
616        opts: Option<SubscribeAutoCloseOptions>,
617    ) -> Result<Output<SubscriptionId>, Error>
618    where
619        I: IntoIterator<Item = U>,
620        U: TryIntoUrl,
621        pool::Error: From<<U as TryIntoUrl>::Err>,
622    {
623        let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts);
624        Ok(self.pool.subscribe_to(urls, filter, opts).await?)
625    }
626
627    /// Subscribe to filter with custom [SubscriptionId] to specific relays
628    ///
629    /// ### Auto-closing subscription
630    ///
631    /// It's possible to automatically close a subscription by configuring the [SubscribeAutoCloseOptions].
632    #[inline]
633    pub async fn subscribe_with_id_to<I, U>(
634        &self,
635        urls: I,
636        id: SubscriptionId,
637        filter: Filter,
638        opts: Option<SubscribeAutoCloseOptions>,
639    ) -> Result<Output<()>, Error>
640    where
641        I: IntoIterator<Item = U>,
642        U: TryIntoUrl,
643        pool::Error: From<<U as TryIntoUrl>::Err>,
644    {
645        let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts);
646        Ok(self
647            .pool
648            .subscribe_with_id_to(urls, id, filter, opts)
649            .await?)
650    }
651
652    /// Targeted subscription
653    ///
654    /// Subscribe to specific relays with specific filters
655    #[inline]
656    pub async fn subscribe_targeted<I, U>(
657        &self,
658        id: SubscriptionId,
659        targets: I,
660        opts: SubscribeOptions,
661    ) -> Result<Output<()>, Error>
662    where
663        I: IntoIterator<Item = (U, Filter)>,
664        U: TryIntoUrl,
665        pool::Error: From<<U as TryIntoUrl>::Err>,
666    {
667        Ok(self.pool.subscribe_targeted(id, targets, opts).await?)
668    }
669
670    /// Unsubscribe
671    #[inline]
672    pub async fn unsubscribe(&self, id: &SubscriptionId) {
673        self.pool.unsubscribe(id).await;
674    }
675
676    /// Unsubscribe from all subscriptions
677    #[inline]
678    pub async fn unsubscribe_all(&self) {
679        self.pool.unsubscribe_all().await;
680    }
681
682    /// Sync events with relays (negentropy reconciliation)
683    ///
684    /// If `gossip` is enabled (see [`ClientOptions::gossip`]) the events will be reconciled also from
685    /// NIP65 relays (automatically discovered) of public keys included in filters (if any).
686    ///
687    /// <https://github.com/hoytech/negentropy>
688    #[inline]
689    pub async fn sync(
690        &self,
691        filter: Filter,
692        opts: &SyncOptions,
693    ) -> Result<Output<Reconciliation>, Error> {
694        if self.opts.gossip {
695            return self.gossip_sync_negentropy(filter, opts).await;
696        }
697
698        Ok(self.pool.sync(filter, opts).await?)
699    }
700
701    /// Sync events with specific relays (negentropy reconciliation)
702    ///
703    /// <https://github.com/hoytech/negentropy>
704    pub async fn sync_with<I, U>(
705        &self,
706        urls: I,
707        filter: Filter,
708        opts: &SyncOptions,
709    ) -> Result<Output<Reconciliation>, Error>
710    where
711        I: IntoIterator<Item = U>,
712        U: TryIntoUrl,
713        pool::Error: From<<U as TryIntoUrl>::Err>,
714    {
715        Ok(self.pool.sync_with(urls, filter, opts).await?)
716    }
717
718    /// Fetch events from relays
719    ///
720    /// # Overview
721    ///
722    /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
723    /// To use another exit policy, check [`RelayPool::fetch_events`].
724    /// For long-lived subscriptions, check [`Client::subscribe`].
725    ///
726    /// # Gossip
727    ///
728    /// If `gossip` is enabled (see [`ClientOptions::gossip`]) the events will be requested also to
729    /// NIP65 relays (automatically discovered) of public keys included in filters (if any).
730    ///
731    /// # Example
732    /// ```rust,no_run
733    /// # use std::time::Duration;
734    /// # use nostr_sdk::prelude::*;
735    /// # #[tokio::main]
736    /// # async fn main() {
737    /// #   let keys = Keys::generate();
738    /// #   let client = Client::new(keys.clone());
739    /// let subscription = Filter::new()
740    ///     .pubkeys(vec![keys.public_key()])
741    ///     .since(Timestamp::now());
742    ///
743    /// let _events = client
744    ///     .fetch_events(subscription, Duration::from_secs(10))
745    ///     .await
746    ///     .unwrap();
747    /// # }
748    /// ```
749    pub async fn fetch_events(&self, filter: Filter, timeout: Duration) -> Result<Events, Error> {
750        if self.opts.gossip {
751            return self
752                .gossip_fetch_events(filter, timeout, ReqExitPolicy::ExitOnEOSE)
753                .await;
754        }
755
756        Ok(self
757            .pool
758            .fetch_events(filter, timeout, ReqExitPolicy::ExitOnEOSE)
759            .await?)
760    }
761
762    /// Fetch events from specific relays
763    ///
764    /// # Overview
765    ///
766    /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
767    /// To use another exit policy, check [`RelayPool::fetch_events_from`].
768    /// For long-lived subscriptions, check [`Client::subscribe_to`].
769    #[inline]
770    pub async fn fetch_events_from<I, U>(
771        &self,
772        urls: I,
773        filter: Filter,
774        timeout: Duration,
775    ) -> Result<Events, Error>
776    where
777        I: IntoIterator<Item = U>,
778        U: TryIntoUrl,
779        pool::Error: From<<U as TryIntoUrl>::Err>,
780    {
781        Ok(self
782            .pool
783            .fetch_events_from(urls, filter, timeout, ReqExitPolicy::ExitOnEOSE)
784            .await?)
785    }
786
787    /// Get events both from database and relays
788    ///
789    /// # Overview
790    ///
791    /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
792    /// For long-lived subscriptions, check [`Client::subscribe`].
793    ///
794    /// # Gossip
795    ///
796    /// If `gossip` is enabled (see [`ClientOptions::gossip`]) the events will be requested also to
797    /// NIP65 relays (automatically discovered) of public keys included in filters (if any).
798    ///
799    /// # Notes and alternative example
800    ///
801    /// This method will be deprecated in the future!
802    /// This is a temporary solution for who still want to query events both from database and relays and merge the result.
803    /// The optimal solution is to execute a [`Client::sync`] to reconcile missing events, [`Client::subscribe`] to get all
804    /// new future events, [`NostrDatabase::query`] to query stored events and [`Client::handle_notifications`] to listen-for/handle new events (i.e. to know when update the UI).
805    /// This will allow very fast queries, low bandwidth usage (depending on how many events the client have to reconcile) and a lower load on the relays.
806    ///
807    /// You can obtain the same result with:
808    /// ```rust,no_run
809    /// # use std::time::Duration;
810    /// # use nostr_sdk::prelude::*;
811    /// # #[tokio::main]
812    /// # async fn main() -> Result<()> {
813    /// # let client = Client::default();
814    /// # let filter = Filter::new().limit(1);
815    /// // Query database
816    /// let stored_events: Events = client.database().query(filter.clone()).await?;
817    ///
818    /// // Query relays
819    /// let fetched_events: Events = client.fetch_events(filter, Duration::from_secs(10)).await?;
820    ///
821    /// // Merge result
822    /// let events: Events = stored_events.merge(fetched_events);
823    ///
824    /// // Iter and print result
825    /// for event in events.into_iter() {
826    ///     println!("{}", event.as_json());
827    /// }
828    /// # Ok(())
829    /// # }
830    /// ```
831    pub async fn fetch_combined_events(
832        &self,
833        filter: Filter,
834        timeout: Duration,
835    ) -> Result<Events, Error> {
836        // Query database
837        let stored_events: Events = self.database().query(filter.clone()).await?;
838
839        // Query relays
840        let fetched_events: Events = self.fetch_events(filter, timeout).await?;
841
842        // Merge result
843        Ok(stored_events.merge(fetched_events))
844    }
845
846    /// Stream events from relays
847    ///
848    /// # Overview
849    ///
850    /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
851    /// To use another exit policy, check [`RelayPool::stream_events`].
852    /// For long-lived subscriptions, check [`Client::subscribe`].
853    ///
854    /// # Gossip
855    ///
856    /// If `gossip` is enabled (see [`ClientOptions::gossip`]) the events will be streamed also from
857    /// NIP65 relays (automatically discovered) of public keys included in filters (if any).
858    pub async fn stream_events(
859        &self,
860        filter: Filter,
861        timeout: Duration,
862    ) -> Result<ReceiverStream<Event>, Error> {
863        // Check if gossip is enabled
864        if self.opts.gossip {
865            self.gossip_stream_events(filter, timeout, ReqExitPolicy::ExitOnEOSE)
866                .await
867        } else {
868            Ok(self
869                .pool
870                .stream_events(filter, timeout, ReqExitPolicy::ExitOnEOSE)
871                .await?)
872        }
873    }
874
875    /// Stream events from specific relays
876    ///
877    /// # Overview
878    ///
879    /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
880    /// To use another exit policy, check [`RelayPool::stream_events_from`].
881    /// For long-lived subscriptions, check [`Client::subscribe_to`].
882    #[inline]
883    pub async fn stream_events_from<I, U>(
884        &self,
885        urls: I,
886        filter: Filter,
887        timeout: Duration,
888    ) -> Result<ReceiverStream<Event>, Error>
889    where
890        I: IntoIterator<Item = U>,
891        U: TryIntoUrl,
892        pool::Error: From<<U as TryIntoUrl>::Err>,
893    {
894        Ok(self
895            .pool
896            .stream_events_from(urls, filter, timeout, ReqExitPolicy::default())
897            .await?)
898    }
899
900    /// Stream events from specific relays with specific filters
901    ///
902    /// # Overview
903    ///
904    /// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
905    /// To use another exit policy, check [`RelayPool::stream_events_targeted`].
906    /// For long-lived subscriptions, check [`Client::subscribe_targeted`].
907    pub async fn stream_events_targeted(
908        &self,
909        targets: HashMap<RelayUrl, Filter>,
910        timeout: Duration,
911    ) -> Result<ReceiverStream<Event>, Error> {
912        Ok(self
913            .pool
914            .stream_events_targeted(targets, timeout, ReqExitPolicy::default())
915            .await?)
916    }
917
918    /// Send the client message to a **specific relays**
919    #[inline]
920    pub async fn send_msg_to<I, U>(
921        &self,
922        urls: I,
923        msg: ClientMessage<'_>,
924    ) -> Result<Output<()>, Error>
925    where
926        I: IntoIterator<Item = U>,
927        U: TryIntoUrl,
928        pool::Error: From<<U as TryIntoUrl>::Err>,
929    {
930        Ok(self.pool.send_msg_to(urls, msg).await?)
931    }
932
933    /// Batch send client messages to **specific relays**
934    #[inline]
935    pub async fn batch_msg_to<I, U>(
936        &self,
937        urls: I,
938        msgs: Vec<ClientMessage<'_>>,
939    ) -> Result<Output<()>, Error>
940    where
941        I: IntoIterator<Item = U>,
942        U: TryIntoUrl,
943        pool::Error: From<<U as TryIntoUrl>::Err>,
944    {
945        Ok(self.pool.batch_msg_to(urls, msgs).await?)
946    }
947
948    /// Send the event to relays
949    ///
950    /// # Overview
951    ///
952    /// Send the [`Event`] to all relays with [`RelayServiceFlags::WRITE`] flag.
953    ///
954    /// # Gossip
955    ///
956    /// If `gossip` is enabled (see [`ClientOptions::gossip`]):
957    /// - the [`Event`] will be sent also to NIP65 relays (automatically discovered);
958    /// - the gossip data will be updated, if the [`Event`] is a NIP17/NIP65 relay list.
959    #[inline]
960    pub async fn send_event(&self, event: &Event) -> Result<Output<EventId>, Error> {
961        // NOT gossip, send event to all relays
962        if !self.opts.gossip {
963            return Ok(self.pool.send_event(event).await?);
964        }
965
966        // Update gossip graph
967        self.gossip.process_event(event).await;
968
969        // Send event using gossip
970        self.gossip_send_event(event, false).await
971    }
972
973    /// Send event to specific relays
974    ///
975    /// # Gossip
976    ///
977    /// If `gossip` is enabled (see [`ClientOptions::gossip`]) and the [`Event`] is a NIP17/NIP65 relay list,
978    /// the gossip data will be updated.
979    #[inline]
980    pub async fn send_event_to<I, U>(
981        &self,
982        urls: I,
983        event: &Event,
984    ) -> Result<Output<EventId>, Error>
985    where
986        I: IntoIterator<Item = U>,
987        U: TryIntoUrl,
988        pool::Error: From<<U as TryIntoUrl>::Err>,
989    {
990        // If gossip is enabled, update the gossip graph
991        if self.opts.gossip {
992            self.gossip.process_event(event).await;
993        }
994
995        // Send event to relays
996        Ok(self.pool.send_event_to(urls, event).await?)
997    }
998
999    /// Build, sign and return [`Event`]
1000    ///
1001    /// This method requires a [`NostrSigner`].
1002    pub async fn sign_event_builder(&self, builder: EventBuilder) -> Result<Event, Error> {
1003        let signer = self.signer().await?;
1004        Ok(builder.sign(&signer).await?)
1005    }
1006
1007    /// Take an [`EventBuilder`], sign it by using the [`NostrSigner`] and broadcast to relays.
1008    ///
1009    /// This method requires a [`NostrSigner`].
1010    ///
1011    /// Check [`Client::send_event`] from more details.
1012    #[inline]
1013    pub async fn send_event_builder(
1014        &self,
1015        builder: EventBuilder,
1016    ) -> Result<Output<EventId>, Error> {
1017        let event: Event = self.sign_event_builder(builder).await?;
1018        self.send_event(&event).await
1019    }
1020
1021    /// Take an [`EventBuilder`], sign it by using the [`NostrSigner`] and broadcast to specific relays.
1022    ///
1023    /// This method requires a [`NostrSigner`].
1024    ///
1025    /// Check [`Client::send_event_to`] from more details.
1026    #[inline]
1027    pub async fn send_event_builder_to<I, U>(
1028        &self,
1029        urls: I,
1030        builder: EventBuilder,
1031    ) -> Result<Output<EventId>, Error>
1032    where
1033        I: IntoIterator<Item = U>,
1034        U: TryIntoUrl,
1035        pool::Error: From<<U as TryIntoUrl>::Err>,
1036    {
1037        let event: Event = self.sign_event_builder(builder).await?;
1038        self.send_event_to(urls, &event).await
1039    }
1040
1041    /// Fetch the newest public key metadata from relays.
1042    ///
1043    /// Returns [`None`] if the [`Metadata`] of the  [`PublicKey`] has not been found.
1044    ///
1045    /// Check [`Client::fetch_events`] for more details.
1046    ///
1047    /// If you only want to consult stored data,
1048    /// consider `client.database().profile(PUBKEY)`.
1049    ///
1050    /// <https://github.com/nostr-protocol/nips/blob/master/01.md>
1051    pub async fn fetch_metadata(
1052        &self,
1053        public_key: PublicKey,
1054        timeout: Duration,
1055    ) -> Result<Option<Metadata>, Error> {
1056        let filter: Filter = Filter::new()
1057            .author(public_key)
1058            .kind(Kind::Metadata)
1059            .limit(1);
1060        let events: Events = self.fetch_events(filter, timeout).await?;
1061        match events.first() {
1062            Some(event) => Ok(Some(Metadata::try_from(event)?)),
1063            None => Ok(None),
1064        }
1065    }
1066
1067    /// Update metadata
1068    ///
1069    /// This method requires a [`NostrSigner`].
1070    ///
1071    /// <https://github.com/nostr-protocol/nips/blob/master/01.md>
1072    ///
1073    /// # Example
1074    /// ```rust,no_run
1075    /// # use nostr_sdk::prelude::*;
1076    /// # #[tokio::main]
1077    /// # async fn main() {
1078    /// #   let keys = Keys::generate();
1079    /// #   let client = Client::new(keys);
1080    /// let metadata = Metadata::new()
1081    ///     .name("username")
1082    ///     .display_name("My Username")
1083    ///     .about("Description")
1084    ///     .picture(Url::parse("https://example.com/avatar.png").unwrap())
1085    ///     .nip05("username@example.com");
1086    ///
1087    /// client.set_metadata(&metadata).await.unwrap();
1088    /// # }
1089    /// ```
1090    #[inline]
1091    pub async fn set_metadata(&self, metadata: &Metadata) -> Result<Output<EventId>, Error> {
1092        let builder = EventBuilder::metadata(metadata);
1093        self.send_event_builder(builder).await
1094    }
1095
1096    async fn get_contact_list_filter(&self) -> Result<Filter, Error> {
1097        let signer = self.signer().await?;
1098        let public_key = signer.get_public_key().await?;
1099        let filter: Filter = Filter::new()
1100            .author(public_key)
1101            .kind(Kind::ContactList)
1102            .limit(1);
1103        Ok(filter)
1104    }
1105
1106    /// Get the contact list from relays.
1107    ///
1108    /// This method requires a [`NostrSigner`].
1109    ///
1110    /// <https://github.com/nostr-protocol/nips/blob/master/02.md>
1111    pub async fn get_contact_list(&self, timeout: Duration) -> Result<Vec<Contact>, Error> {
1112        let mut contact_list: Vec<Contact> = Vec::new();
1113        let filter: Filter = self.get_contact_list_filter().await?;
1114        let events: Events = self.fetch_events(filter, timeout).await?;
1115
1116        // Get first event (result of `fetch_events` is sorted DESC by timestamp)
1117        if let Some(event) = events.first_owned() {
1118            for tag in event.tags.into_iter() {
1119                if let Some(TagStandard::PublicKey {
1120                    public_key,
1121                    relay_url,
1122                    alias,
1123                    uppercase: false,
1124                }) = tag.to_standardized()
1125                {
1126                    contact_list.push(Contact {
1127                        public_key,
1128                        relay_url,
1129                        alias,
1130                    })
1131                }
1132            }
1133        }
1134
1135        Ok(contact_list)
1136    }
1137
1138    /// Get contact list public keys from relays.
1139    ///
1140    /// This method requires a [`NostrSigner`].
1141    ///
1142    /// <https://github.com/nostr-protocol/nips/blob/master/02.md>
1143    pub async fn get_contact_list_public_keys(
1144        &self,
1145        timeout: Duration,
1146    ) -> Result<Vec<PublicKey>, Error> {
1147        let mut pubkeys: Vec<PublicKey> = Vec::new();
1148        let filter: Filter = self.get_contact_list_filter().await?;
1149        let events: Events = self.fetch_events(filter, timeout).await?;
1150
1151        for event in events.into_iter() {
1152            pubkeys.extend(event.tags.public_keys());
1153        }
1154
1155        Ok(pubkeys)
1156    }
1157
1158    /// Get contact list [`Metadata`] from relays.
1159    ///
1160    /// This method requires a [`NostrSigner`].
1161    pub async fn get_contact_list_metadata(
1162        &self,
1163        timeout: Duration,
1164    ) -> Result<HashMap<PublicKey, Metadata>, Error> {
1165        let public_keys = self.get_contact_list_public_keys(timeout).await?;
1166        let mut contacts: HashMap<PublicKey, Metadata> =
1167            public_keys.iter().map(|p| (*p, Metadata::new())).collect();
1168
1169        let filter: Filter = Filter::new().authors(public_keys).kind(Kind::Metadata);
1170        let events: Events = self.fetch_events(filter, timeout).await?;
1171        for event in events.into_iter() {
1172            let metadata = Metadata::from_json(&event.content)?;
1173            if let Some(m) = contacts.get_mut(&event.pubkey) {
1174                *m = metadata
1175            };
1176        }
1177
1178        Ok(contacts)
1179    }
1180
1181    /// Send a private direct message
1182    ///
1183    /// If `gossip` is enabled (see [`ClientOptions::gossip`]) the message will be sent to the NIP17 relays (automatically discovered).
1184    /// If gossip is not enabled will be sent to all relays with [`RelayServiceFlags::WRITE`] flag.
1185    ///
1186    /// This method requires a [`NostrSigner`].
1187    ///
1188    /// # Errors
1189    ///
1190    /// Returns [`Error::PrivateMsgRelaysNotFound`] if the receiver hasn't set the NIP17 list,
1191    /// meaning that is not ready to receive private messages.
1192    ///
1193    /// <https://github.com/nostr-protocol/nips/blob/master/17.md>
1194    #[inline]
1195    #[cfg(feature = "nip59")]
1196    pub async fn send_private_msg<S, I>(
1197        &self,
1198        receiver: PublicKey,
1199        message: S,
1200        rumor_extra_tags: I,
1201    ) -> Result<Output<EventId>, Error>
1202    where
1203        S: Into<String>,
1204        I: IntoIterator<Item = Tag>,
1205    {
1206        let signer = self.signer().await?;
1207        let event: Event =
1208            EventBuilder::private_msg(&signer, receiver, message, rumor_extra_tags).await?;
1209
1210        // NOT gossip, send to all relays
1211        if !self.opts.gossip {
1212            return self.send_event(&event).await;
1213        }
1214
1215        self.gossip_send_event(&event, true).await
1216    }
1217
1218    /// Send a private direct message to specific relays
1219    ///
1220    /// This method requires a [`NostrSigner`].
1221    ///
1222    /// <https://github.com/nostr-protocol/nips/blob/master/17.md>
1223    #[inline]
1224    #[cfg(feature = "nip59")]
1225    pub async fn send_private_msg_to<I, S, U, IT>(
1226        &self,
1227        urls: I,
1228        receiver: PublicKey,
1229        message: S,
1230        rumor_extra_tags: IT,
1231    ) -> Result<Output<EventId>, Error>
1232    where
1233        I: IntoIterator<Item = U>,
1234        S: Into<String>,
1235        U: TryIntoUrl,
1236        IT: IntoIterator<Item = Tag>,
1237        pool::Error: From<<U as TryIntoUrl>::Err>,
1238    {
1239        let signer = self.signer().await?;
1240        let event: Event =
1241            EventBuilder::private_msg(&signer, receiver, message, rumor_extra_tags).await?;
1242        self.send_event_to(urls, &event).await
1243    }
1244
1245    /// Construct Gift Wrap and send to relays
1246    ///
1247    /// This method requires a [`NostrSigner`].
1248    ///
1249    /// Check [`Client::send_event`] to know how sending events works.
1250    ///
1251    /// <https://github.com/nostr-protocol/nips/blob/master/59.md>
1252    #[inline]
1253    #[cfg(feature = "nip59")]
1254    pub async fn gift_wrap<I>(
1255        &self,
1256        receiver: &PublicKey,
1257        rumor: UnsignedEvent,
1258        extra_tags: I,
1259    ) -> Result<Output<EventId>, Error>
1260    where
1261        I: IntoIterator<Item = Tag>,
1262    {
1263        // Acquire signer
1264        let signer = self.signer().await?;
1265
1266        // Build gift wrap
1267        let gift_wrap: Event =
1268            EventBuilder::gift_wrap(&signer, receiver, rumor, extra_tags).await?;
1269
1270        // Send
1271        self.send_event(&gift_wrap).await
1272    }
1273
1274    /// Construct Gift Wrap and send to specific relays
1275    ///
1276    /// This method requires a [`NostrSigner`].
1277    ///
1278    /// <https://github.com/nostr-protocol/nips/blob/master/59.md>
1279    #[inline]
1280    #[cfg(feature = "nip59")]
1281    pub async fn gift_wrap_to<I, U, IT>(
1282        &self,
1283        urls: I,
1284        receiver: &PublicKey,
1285        rumor: UnsignedEvent,
1286        extra_tags: IT,
1287    ) -> Result<Output<EventId>, Error>
1288    where
1289        I: IntoIterator<Item = U>,
1290        U: TryIntoUrl,
1291        IT: IntoIterator<Item = Tag>,
1292        pool::Error: From<<U as TryIntoUrl>::Err>,
1293    {
1294        // Acquire signer
1295        let signer = self.signer().await?;
1296
1297        // Build gift wrap
1298        let gift_wrap: Event =
1299            EventBuilder::gift_wrap(&signer, receiver, rumor, extra_tags).await?;
1300
1301        // Send
1302        self.send_event_to(urls, &gift_wrap).await
1303    }
1304
1305    /// Unwrap Gift Wrap event
1306    ///
1307    /// This method requires a [`NostrSigner`].
1308    ///
1309    /// Check [`UnwrappedGift::from_gift_wrap`] to learn more.
1310    ///
1311    /// <https://github.com/nostr-protocol/nips/blob/master/59.md>
1312    #[inline]
1313    #[cfg(feature = "nip59")]
1314    pub async fn unwrap_gift_wrap(&self, gift_wrap: &Event) -> Result<UnwrappedGift, Error> {
1315        let signer = self.signer().await?;
1316        Ok(UnwrappedGift::from_gift_wrap(&signer, gift_wrap).await?)
1317    }
1318
1319    /// Handle notifications
1320    ///
1321    /// The closure function expects a `bool` as output: return `true` to exit from the notification loop.
1322    #[inline]
1323    pub async fn handle_notifications<F, Fut>(&self, func: F) -> Result<(), Error>
1324    where
1325        F: Fn(RelayPoolNotification) -> Fut,
1326        Fut: Future<Output = Result<bool>>,
1327    {
1328        Ok(self.pool.handle_notifications(func).await?)
1329    }
1330}
1331
1332// Gossip
1333impl Client {
1334    /// Check if there are outdated public keys and update them
1335    async fn check_and_update_gossip<I>(&self, public_keys: I) -> Result<(), Error>
1336    where
1337        I: IntoIterator<Item = PublicKey>,
1338    {
1339        let outdated_public_keys: HashSet<PublicKey> =
1340            self.gossip.check_outdated(public_keys).await;
1341
1342        // No outdated public keys, immediately return.
1343        if outdated_public_keys.is_empty() {
1344            return Ok(());
1345        }
1346
1347        // Compose filters
1348        let filter: Filter = Filter::default()
1349            .authors(outdated_public_keys.clone())
1350            .kinds([Kind::RelayList, Kind::InboxRelays]);
1351
1352        // Query from database
1353        let stored_events: Events = self.database().query(filter.clone()).await?;
1354
1355        // Get DISCOVERY and READ relays
1356        let urls: Vec<RelayUrl> = self
1357            .pool
1358            .__relay_urls_with_flag(
1359                RelayServiceFlags::DISCOVERY | RelayServiceFlags::READ,
1360                FlagCheck::Any,
1361            )
1362            .await;
1363
1364        // Get events from discovery and read relays
1365        let events: Events = self
1366            .fetch_events_from(urls, filter, Duration::from_secs(10))
1367            .await?;
1368
1369        // Update last check for these public keys
1370        self.gossip.update_last_check(outdated_public_keys).await;
1371
1372        // Merge database and relays events
1373        let merged: Events = events.merge(stored_events);
1374
1375        // Update gossip graph
1376        self.gossip.update(merged).await;
1377
1378        Ok(())
1379    }
1380
1381    /// Break down filters for gossip and discovery relays
1382    async fn break_down_filter(&self, filter: Filter) -> Result<HashMap<RelayUrl, Filter>, Error> {
1383        // Extract all public keys from filters
1384        let public_keys = filter.extract_public_keys();
1385
1386        // Check and update outdated public keys
1387        self.check_and_update_gossip(public_keys).await?;
1388
1389        // Broken-down filters
1390        let filters: HashMap<RelayUrl, Filter> = match self.gossip.break_down_filter(filter).await {
1391            BrokenDownFilters::Filters(filters) => filters,
1392            BrokenDownFilters::Orphan(filter) | BrokenDownFilters::Other(filter) => {
1393                // Get read relays
1394                let read_relays: Vec<RelayUrl> = self.pool.__read_relay_urls().await;
1395
1396                let mut map = HashMap::with_capacity(read_relays.len());
1397                for url in read_relays.into_iter() {
1398                    map.insert(url, filter.clone());
1399                }
1400                map
1401            }
1402        };
1403
1404        // Add gossip (outbox and inbox) relays
1405        for url in filters.keys() {
1406            if self.add_gossip_relay(url).await? {
1407                self.connect_relay(url).await?;
1408            }
1409        }
1410
1411        // Check if filters are empty
1412        // TODO: this can't be empty, right?
1413        if filters.is_empty() {
1414            return Err(Error::GossipFiltersEmpty);
1415        }
1416
1417        Ok(filters)
1418    }
1419
1420    async fn gossip_send_event(
1421        &self,
1422        event: &Event,
1423        is_nip17: bool,
1424    ) -> Result<Output<EventId>, Error> {
1425        let is_gift_wrap: bool = event.kind == Kind::GiftWrap;
1426
1427        // Get involved public keys and check what are up to date in the gossip graph and which ones require an update.
1428        if is_gift_wrap {
1429            // Get only p tags since the author of a gift wrap is randomized
1430            let public_keys = event.tags.public_keys().copied();
1431            self.check_and_update_gossip(public_keys).await?;
1432        } else {
1433            // Get all public keys involved in the event: author + p tags
1434            let public_keys = event
1435                .tags
1436                .public_keys()
1437                .copied()
1438                .chain(iter::once(event.pubkey));
1439            self.check_and_update_gossip(public_keys).await?;
1440        };
1441
1442        // Check if NIP17 or NIP65
1443        let urls: HashSet<RelayUrl> = if is_nip17 && is_gift_wrap {
1444            // Get NIP17 relays
1445            // Get only for relays for p tags since gift wraps are signed with random key (random author)
1446            let relays = self
1447                .gossip
1448                .get_nip17_inbox_relays(event.tags.public_keys())
1449                .await;
1450
1451            // Clients SHOULD publish kind 14 events to the 10050-listed relays.
1452            // If that is not found, that indicates the user is not ready to receive messages under this NIP and clients shouldn't try.
1453            //
1454            // <https://github.com/nostr-protocol/nips/blob/6e7a618e7f873bb91e743caacc3b09edab7796a0/17.md>
1455            if relays.is_empty() {
1456                return Err(Error::PrivateMsgRelaysNotFound);
1457            }
1458
1459            // Add outbox and inbox relays
1460            for url in relays.iter() {
1461                if self.add_gossip_relay(url).await? {
1462                    self.connect_relay(url).await?;
1463                }
1464            }
1465
1466            relays
1467        } else {
1468            // Get NIP65 relays
1469            let mut outbox = self.gossip.get_nip65_outbox_relays(&[event.pubkey]).await;
1470            let inbox = self
1471                .gossip
1472                .get_nip65_inbox_relays(event.tags.public_keys())
1473                .await;
1474
1475            // Add outbox and inbox relays
1476            for url in outbox.iter().chain(inbox.iter()) {
1477                if self.add_gossip_relay(url).await? {
1478                    self.connect_relay(url).await?;
1479                }
1480            }
1481
1482            // Get WRITE relays
1483            let write_relays: Vec<RelayUrl> = self.pool.__write_relay_urls().await;
1484
1485            // Extend OUTBOX relays with WRITE ones
1486            outbox.extend(write_relays);
1487
1488            // Extend outbox relays with inbox ones
1489            outbox.extend(inbox);
1490
1491            // Return all relays
1492            outbox
1493        };
1494
1495        // Send event
1496        Ok(self.pool.send_event_to(urls, event).await?)
1497    }
1498
1499    async fn gossip_stream_events(
1500        &self,
1501        filter: Filter,
1502        timeout: Duration,
1503        policy: ReqExitPolicy,
1504    ) -> Result<ReceiverStream<Event>, Error> {
1505        let filters = self.break_down_filter(filter).await?;
1506
1507        // Stream events
1508        let stream: ReceiverStream<Event> = self
1509            .pool
1510            .stream_events_targeted(filters, timeout, policy)
1511            .await?;
1512
1513        Ok(stream)
1514    }
1515
1516    async fn gossip_fetch_events(
1517        &self,
1518        filter: Filter,
1519        timeout: Duration,
1520        policy: ReqExitPolicy,
1521    ) -> Result<Events, Error> {
1522        let mut events: Events = Events::new(&filter);
1523
1524        // Stream events
1525        let mut stream: ReceiverStream<Event> =
1526            self.gossip_stream_events(filter, timeout, policy).await?;
1527
1528        while let Some(event) = stream.next().await {
1529            // To find out more about why the `force_insert` was used, search for EVENTS_FORCE_INSERT ine the code.
1530            events.force_insert(event);
1531        }
1532
1533        Ok(events)
1534    }
1535
1536    async fn gossip_subscribe(
1537        &self,
1538        id: SubscriptionId,
1539        filter: Filter,
1540        opts: SubscribeOptions,
1541    ) -> Result<Output<()>, Error> {
1542        let filters = self.break_down_filter(filter).await?;
1543        Ok(self.pool.subscribe_targeted(id, filters, opts).await?)
1544    }
1545
1546    async fn gossip_sync_negentropy(
1547        &self,
1548        filter: Filter,
1549        opts: &SyncOptions,
1550    ) -> Result<Output<Reconciliation>, Error> {
1551        // Break down filter
1552        let temp_filters = self.break_down_filter(filter).await?;
1553
1554        let database = self.database();
1555        let mut filters: HashMap<RelayUrl, (Filter, Vec<_>)> =
1556            HashMap::with_capacity(temp_filters.len());
1557
1558        // Iterate broken down filters and compose new filters for targeted reconciliation
1559        for (url, filter) in temp_filters.into_iter() {
1560            // Get items
1561            let items: Vec<(EventId, Timestamp)> =
1562                database.negentropy_items(filter.clone()).await?;
1563
1564            filters.insert(url, (filter, items));
1565        }
1566
1567        // Reconciliation
1568        Ok(self.pool.sync_targeted(filters, opts).await?)
1569    }
1570}