Skip to main content

hashtree_cli/
nostr_mirror.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13    Timestamp,
14};
15use nostr_sdk::{
16    pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17    RelayStatus,
18};
19use tokio::sync::watch;
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// Timing knobs for the background mirror. Each duration has a production value and a much
// shorter test-build value, so unit tests exercise the same code paths without real waits.

/// Initial sleep at the start of `BackgroundNostrMirror::run`.
const MIRROR_STARTUP_DELAY: Duration = if cfg!(test) {
    Duration::from_millis(50)
} else {
    Duration::from_secs(8)
};

/// Second sleep in `run`, taken right after `MIRROR_STARTUP_DELAY` and before the live-since
/// timestamp is recorded.
const MIRROR_CONNECT_SETTLE_DELAY: Duration = if cfg!(test) {
    Duration::from_millis(250)
} else {
    Duration::from_secs(1)
};

/// Period of the refresh tick that looks for newly reachable authors.
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};

/// Minimum gap between two reconnect-triggered catch-up history syncs.
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};

/// Default mirrored event kinds: metadata (0), text note (1), contact list (3), repost (6),
/// reaction (7) and zap receipt (9735).
const DEFAULT_HISTORY_KINDS: [u16; 6] = [0, 1, 3, 6, 7, 9735];
/// Default `NostrMirrorConfig::published_event_tree_name`.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
/// Default `NostrMirrorConfig::published_profile_search_tree_name`.
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
/// Default `NostrMirrorConfig::published_profiles_by_pubkey_tree_name`.
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
/// Metadata-only syncs keep only one event per author.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
/// Upper bound on the author batch size for metadata-only syncs.
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
/// A sync with more than `author_batch_size * this` authors switches to `GlobalRecent` fetching.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
/// Per-author event cap applied to large (`GlobalRecent`) syncs.
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
/// Relay page budget for large (`GlobalRecent`) syncs.
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

/// Minimum gap between missing-profile backfill passes.
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(300)
};

/// Period of the publish tick in `run`. NOTE(review): presumably also the debounce window the
/// root-publish helpers apply — confirm there.
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = if cfg!(test) {
    Duration::from_millis(20)
} else {
    Duration::from_secs(5)
};

/// NOTE(review): consumed by the root-publish helpers; presumably the longest a dirty root may
/// wait before being published regardless of debounce — confirm there.
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};
71
72#[derive(Debug, Clone)]
73pub struct NostrMirrorConfig {
74    pub relays: Vec<String>,
75    pub publish_relays: Vec<String>,
76    pub blossom_write_servers: Vec<String>,
77    pub max_follow_distance: u32,
78    pub overmute_threshold: f64,
79    pub author_batch_size: usize,
80    pub history_sync_author_chunk_size: usize,
81    pub history_sync_per_author_event_limit: usize,
82    pub missing_profile_backfill_batch_size: usize,
83    pub fetch_timeout: Duration,
84    pub relay_event_max_size: Option<u32>,
85    pub require_negentropy: bool,
86    pub kinds: Vec<u16>,
87    pub history_sync_on_start: bool,
88    pub history_sync_on_reconnect: bool,
89    pub published_event_tree_name: Option<String>,
90    pub published_profile_search_tree_name: Option<String>,
91    pub published_profiles_by_pubkey_tree_name: Option<String>,
92}
93
94impl Default for NostrMirrorConfig {
95    fn default() -> Self {
96        Self {
97            relays: Vec::new(),
98            publish_relays: Vec::new(),
99            blossom_write_servers: Vec::new(),
100            max_follow_distance: 2,
101            overmute_threshold: 1.0,
102            author_batch_size: 256,
103            history_sync_author_chunk_size: 5_000,
104            history_sync_per_author_event_limit: 256,
105            missing_profile_backfill_batch_size: 5_000,
106            fetch_timeout: Duration::from_secs(15),
107            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
108            require_negentropy: false,
109            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
110            history_sync_on_start: true,
111            history_sync_on_reconnect: true,
112            published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
113            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
114            published_profiles_by_pubkey_tree_name: Some(
115                DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
116            ),
117        }
118    }
119}
120
121#[derive(Debug, Default)]
122struct RootPublishState {
123    pending_root: Option<hashtree_core::Cid>,
124    last_changed_at: Option<Instant>,
125    dirty_since: Option<Instant>,
126    last_published_root: Option<hashtree_core::Cid>,
127    last_published_at: Option<Instant>,
128    last_published_created_at: Option<Timestamp>,
129    last_uploaded_root: Option<hashtree_core::Cid>,
130    last_uploaded_at: Option<Instant>,
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134struct HistorySyncPlan {
135    relay_fetch_mode: RelayFetchMode,
136    author_batch_size: usize,
137    per_author_event_limit: usize,
138    relay_page_size: usize,
139    max_relay_pages: usize,
140}
141
142pub struct BackgroundNostrMirror {
143    config: NostrMirrorConfig,
144    store: Arc<HashtreeStore>,
145    graph_store: Arc<SocialGraphStore>,
146    client: Client,
147    publish_client: Option<Client>,
148    publish_pubkey: Option<PublicKey>,
149    event_publish_state: Mutex<RootPublishState>,
150    profile_search_publish_state: Mutex<RootPublishState>,
151    profiles_by_pubkey_publish_state: Mutex<RootPublishState>,
152    pending_live_events: Mutex<BTreeMap<String, Event>>,
153    missing_profile_cursor: Mutex<usize>,
154    shutdown_tx: watch::Sender<bool>,
155    shutdown_rx: watch::Receiver<bool>,
156}
157
158impl BackgroundNostrMirror {
159    pub async fn new(
160        config: NostrMirrorConfig,
161        store: Arc<HashtreeStore>,
162        graph_store: Arc<SocialGraphStore>,
163        publish_keys: Option<Keys>,
164    ) -> Result<Self> {
165        let client = if let Some(max_size) = config.relay_event_max_size {
166            let mut limits = RelayLimits::default();
167            limits.events.max_size = Some(max_size);
168            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
169        } else {
170            Client::new(Keys::generate())
171        };
172        for relay in &config.relays {
173            client
174                .add_relay(relay)
175                .await
176                .with_context(|| format!("add mirror relay {relay}"))?;
177        }
178        client.connect().await;
179
180        let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
181        let publish_client = if let Some(keys) = publish_keys {
182            if config.publish_relays.is_empty() {
183                None
184            } else {
185                let client = Client::new(keys);
186                for relay in &config.publish_relays {
187                    client
188                        .add_relay(relay)
189                        .await
190                        .with_context(|| format!("add mirror publish relay {relay}"))?;
191                }
192                client.connect().await;
193                Some(client)
194            }
195        } else {
196            None
197        };
198
199        let (shutdown_tx, shutdown_rx) = watch::channel(false);
200        Ok(Self {
201            config,
202            store,
203            graph_store,
204            client,
205            publish_client,
206            publish_pubkey,
207            event_publish_state: Mutex::new(RootPublishState::default()),
208            profile_search_publish_state: Mutex::new(RootPublishState::default()),
209            profiles_by_pubkey_publish_state: Mutex::new(RootPublishState::default()),
210            pending_live_events: Mutex::new(BTreeMap::new()),
211            missing_profile_cursor: Mutex::new(0),
212            shutdown_tx,
213            shutdown_rx,
214        })
215    }
216
217    pub fn shutdown(&self) {
218        let _ = self.shutdown_tx.send(true);
219    }
220
221    fn sync_publish_roots_from_store(&self) -> Result<()> {
222        self.note_public_events_root_change()?;
223        self.note_profile_search_root_change()?;
224        self.note_profiles_by_pubkey_root_change()?;
225        Ok(())
226    }
227
228    async fn publish_pending_roots(
229        &self,
230        force_event: bool,
231        force_profile_search: bool,
232        force_profiles_by_pubkey: bool,
233    ) -> (Result<()>, Result<()>, Result<()>) {
234        tokio::join!(
235            self.maybe_publish_event_root(force_event),
236            self.maybe_publish_profile_search_root(force_profile_search),
237            self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
238        )
239    }
240
241    async fn publish_priority_roots(
242        &self,
243        force_event: bool,
244        force_profile_search: bool,
245        force_profiles_by_pubkey: bool,
246    ) -> (Result<()>, Result<()>, Result<()>) {
247        let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
248            async {
249                if force_profile_search {
250                    self.maybe_publish_profile_search_root(true).await
251                } else {
252                    Ok(())
253                }
254            },
255            async {
256                if force_profiles_by_pubkey {
257                    self.maybe_publish_profiles_by_pubkey_root(true).await
258                } else {
259                    Ok(())
260                }
261            },
262        );
263        let event_result = if force_event {
264            self.maybe_publish_event_root(true).await
265        } else {
266            Ok(())
267        };
268        (
269            event_result,
270            profile_search_result,
271            profiles_by_pubkey_result,
272        )
273    }
274
275    pub async fn run(&self) -> Result<()> {
276        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
277            return Ok(());
278        }
279
280        info!(
281            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
282            self.config.relays.len(),
283            self.config.max_follow_distance,
284            self.config.require_negentropy,
285            self.config.kinds,
286            self.config.history_sync_author_chunk_size.max(1),
287            self.config.history_sync_on_start,
288            self.config.history_sync_on_reconnect
289        );
290
291        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
292        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
293        let live_since = Timestamp::now();
294        self.sync_publish_roots_from_store()?;
295
296        let initial_authors = self.collect_authors()?;
297        if initial_authors.is_empty() {
298            info!("Nostr mirror: no social-graph authors to mirror yet");
299        } else if self.config.history_sync_on_start {
300            if self.should_backfill_missing_profiles(None) {
301                let missing_profile_authors = self.collect_missing_profile_authors(
302                    self.config.missing_profile_backfill_batch_size,
303                )?;
304                if !missing_profile_authors.is_empty() {
305                    info!(
306                        "Nostr mirror missing-profile backfill starting: authors={}",
307                        missing_profile_authors.len()
308                    );
309                    self.history_sync_authors_with_kinds(
310                        missing_profile_authors,
311                        &[Kind::Metadata.as_u16()],
312                    )
313                    .await?;
314                }
315            }
316            self.history_sync_authors(initial_authors.clone()).await?;
317        }
318
319        let mut subscribed_authors = HashSet::new();
320        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
321            .await?;
322
323        let mut relay_statuses = self.capture_relay_statuses().await;
324        let mut last_reconnect_history_sync_at: Option<Instant> = None;
325        let mut last_missing_profile_backfill_at: Option<Instant> = None;
326        let mut notifications = self.client.notifications();
327        let mut shutdown_rx = self.shutdown_rx.clone();
328        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
329        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
330
331        loop {
332            tokio::select! {
333                _ = shutdown_rx.changed() => {
334                    if *shutdown_rx.borrow() {
335                        break;
336                    }
337                }
338                _ = refresh_interval.tick() => {
339                    let authors = self.collect_authors()?;
340                    let new_authors = authors
341                        .into_iter()
342                        .filter(|author| !subscribed_authors.contains(author))
343                        .collect::<Vec<_>>();
344                    if !new_authors.is_empty() {
345                        debug!(
346                            "Nostr mirror discovered {} newly reachable author(s)",
347                            new_authors.len()
348                        );
349                        self.history_sync_authors(new_authors.clone()).await?;
350                        self.subscribe_authors_since(
351                            &new_authors,
352                            Timestamp::now(),
353                            &mut subscribed_authors,
354                        )
355                        .await?;
356                    }
357                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
358                        let missing_profile_authors = self.collect_missing_profile_authors(
359                            self.config.missing_profile_backfill_batch_size,
360                        )?;
361                        if !missing_profile_authors.is_empty() {
362                            info!(
363                                "Nostr mirror missing-profile backfill starting: authors={}",
364                                missing_profile_authors.len()
365                            );
366                            self.history_sync_authors_with_kinds(
367                                missing_profile_authors,
368                                &[Kind::Metadata.as_u16()],
369                            )
370                            .await?;
371                            last_missing_profile_backfill_at = Some(Instant::now());
372                        }
373                    }
374                }
375                _ = publish_interval.tick() => {
376                    self.sync_publish_roots_from_store()?;
377                    if let Err(err) = self.flush_live_events().await {
378                        warn!("Nostr mirror live event flush failed: {:#}", err);
379                    }
380                    let (event_result, profile_search_result, profiles_by_pubkey_result) = self
381                        .publish_pending_roots(false, false, false)
382                        .await;
383                    if let Err(err) = event_result {
384                        warn!("Nostr mirror event-root publish failed: {:#}", err);
385                    }
386                    if let Err(err) = profile_search_result {
387                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
388                    }
389                    if let Err(err) = profiles_by_pubkey_result {
390                        warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
391                    }
392                }
393                notification = notifications.recv() => {
394                    match notification {
395                        Ok(RelayPoolNotification::Event { event, .. }) => {
396                            self.ingest_live_event(&event)?;
397                        }
398                        Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
399                            let relay_url = relay_url.to_string();
400                            let previous = relay_statuses.insert(relay_url.clone(), status);
401                            if Self::should_history_sync_on_reconnect(
402                                self.config.history_sync_on_reconnect,
403                                previous,
404                                status,
405                            ) && Self::should_run_reconnect_history_sync(
406                                    last_reconnect_history_sync_at.as_ref(),
407                                )
408                            {
409                                let authors = self.collect_authors()?;
410                                if !authors.is_empty() {
411                                    info!(
412                                        "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
413                                        relay_url,
414                                        authors.len(),
415                                        self.config.require_negentropy
416                                    );
417                                    self.history_sync_authors(authors).await?;
418                                    last_reconnect_history_sync_at = Some(Instant::now());
419                                }
420                            }
421                        }
422                        Ok(RelayPoolNotification::Shutdown) => break,
423                        Ok(_) => {}
424                        Err(err) => {
425                            warn!("Nostr mirror notification error: {}", err);
426                            break;
427                        }
428                    }
429                }
430            }
431        }
432
433        if let Err(err) = self.flush_live_events().await {
434            warn!(
435                "Nostr mirror live event flush failed during shutdown: {:#}",
436                err
437            );
438        }
439        if let Err(err) = self.sync_publish_roots_from_store() {
440            warn!(
441                "Nostr mirror root-state refresh failed during shutdown: {:#}",
442                err
443            );
444        }
445        let (event_result, profile_search_result, profiles_by_pubkey_result) =
446            self.publish_pending_roots(true, true, true).await;
447        if let Err(err) = event_result {
448            warn!(
449                "Nostr mirror event-root publish failed during shutdown: {:#}",
450                err
451            );
452        }
453        if let Err(err) = profile_search_result {
454            warn!(
455                "Nostr mirror profile-search publish failed during shutdown: {:#}",
456                err
457            );
458        }
459        if let Err(err) = profiles_by_pubkey_result {
460            warn!(
461                "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
462                err
463            );
464        }
465        let _ = self.client.disconnect().await;
466        if let Some(client) = self.publish_client.as_ref() {
467            let _ = client.disconnect().await;
468        }
469        Ok(())
470    }
471
472    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
473        let mut statuses = HashMap::new();
474        for (relay_url, relay) in self.client.relays().await {
475            statuses.insert(relay_url.to_string(), relay.status().await);
476        }
477        statuses
478    }
479
480    async fn has_connected_publish_relay(&self) -> bool {
481        let Some(client) = self.publish_client.as_ref() else {
482            return false;
483        };
484        Self::client_has_connected_relay(client).await
485    }
486
487    async fn client_has_connected_relay(client: &Client) -> bool {
488        for (_relay_url, relay) in client.relays().await {
489            if relay.status().await == RelayStatus::Connected {
490                return true;
491            }
492        }
493        false
494    }
495
496    fn collect_authors(&self) -> Result<Vec<String>> {
497        let mut authors = Vec::new();
498        let mut seen = HashSet::new();
499        for distance in 0..=self.config.max_follow_distance {
500            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
501                self.graph_store.as_ref(),
502                distance,
503            )
504            .with_context(|| format!("load social-graph distance {distance}"))?
505            {
506                if self
507                    .graph_store
508                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
509                {
510                    continue;
511                }
512                let hex = hex::encode(pubkey);
513                if seen.insert(hex.clone()) {
514                    authors.push(hex);
515                }
516            }
517        }
518        Ok(authors)
519    }
520
521    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
522        if limit == 0 {
523            return Ok(Vec::new());
524        }
525
526        let authors = self.collect_authors()?;
527        if authors.is_empty() {
528            return Ok(Vec::new());
529        }
530
531        let mut cursor = self
532            .missing_profile_cursor
533            .lock()
534            .expect("missing profile cursor");
535        let mut index = (*cursor).min(authors.len());
536        let mut scanned = 0usize;
537        let mut missing = Vec::new();
538
539        while scanned < authors.len() && missing.len() < limit {
540            let author = &authors[index];
541            if self.graph_store.latest_profile_event(author)?.is_none() {
542                missing.push(author.clone());
543            }
544            index += 1;
545            if index == authors.len() {
546                index = 0;
547            }
548            scanned += 1;
549        }
550
551        *cursor = index;
552        Ok(missing)
553    }
554
555    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
556        if self.config.missing_profile_backfill_batch_size == 0
557            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
558        {
559            return false;
560        }
561        match last_run {
562            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
563            None => true,
564        }
565    }
566
567    fn should_history_sync_on_reconnect(
568        history_sync_on_reconnect: bool,
569        previous: Option<RelayStatus>,
570        status: RelayStatus,
571    ) -> bool {
572        history_sync_on_reconnect
573            && status == RelayStatus::Connected
574            && matches!(
575                previous,
576                Some(
577                    RelayStatus::Initialized
578                        | RelayStatus::Pending
579                        | RelayStatus::Connecting
580                        | RelayStatus::Disconnected
581                        | RelayStatus::Terminated
582                )
583            )
584    }
585
586    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
587        match last_run {
588            None => true,
589            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
590        }
591    }
592
593    fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
594        !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
595    }
596
597    fn history_sync_plan_for(
598        config: &NostrMirrorConfig,
599        authors: usize,
600        kinds: &[u16],
601    ) -> HistorySyncPlan {
602        let author_batch_size = config.author_batch_size.max(1);
603        let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
604        let relay_page_size = 1_000;
605        let max_relay_pages = 10;
606
607        if Self::is_metadata_only_history_sync(kinds) {
608            return HistorySyncPlan {
609                relay_fetch_mode: RelayFetchMode::AuthorBatches,
610                author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
611                per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
612                relay_page_size,
613                max_relay_pages,
614            };
615        }
616
617        if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
618            return HistorySyncPlan {
619                relay_fetch_mode: RelayFetchMode::GlobalRecent,
620                author_batch_size,
621                per_author_event_limit: per_author_event_limit
622                    .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
623                    .max(1),
624                relay_page_size,
625                max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
626            };
627        }
628
629        HistorySyncPlan {
630            relay_fetch_mode: RelayFetchMode::AuthorBatches,
631            author_batch_size,
632            per_author_event_limit,
633            relay_page_size,
634            max_relay_pages,
635        }
636    }
637
638    fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
639        Self::history_sync_plan_for(&self.config, authors, kinds)
640    }
641
642    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
643        self.history_sync_authors_with_kinds(authors, &self.config.kinds)
644            .await
645    }
646
647    async fn history_sync_authors_with_kinds(
648        &self,
649        authors: Vec<String>,
650        kinds: &[u16],
651    ) -> Result<()> {
652        self.history_sync_authors_chunked(authors, |current_root, author_chunk| async move {
653            self.history_sync_author_chunk(current_root, author_chunk, kinds)
654                .await
655        })
656        .await
657    }
658
659    async fn history_sync_authors_chunked<F, Fut>(
660        &self,
661        authors: Vec<String>,
662        mut run_chunk: F,
663    ) -> Result<()>
664    where
665        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
666        Fut: std::future::Future<Output = Result<CrawlReport>>,
667    {
668        if authors.is_empty() {
669            return Ok(());
670        }
671
672        info!(
673            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
674            authors.len(),
675            self.config.relays.len(),
676            self.config.require_negentropy
677        );
678
679        let mut current_root = self.graph_store.public_events_root()?;
680        let mut last_error = None;
681        let mut applied_chunks = 0usize;
682        let mut failed_chunks = 0usize;
683        let chunk_size = self.config.history_sync_author_chunk_size.max(1);
684        let total_chunks = authors.len().div_ceil(chunk_size);
685
686        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
687            let author_chunk = author_chunk.to_vec();
688            let author_count = author_chunk.len();
689            info!(
690                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
691                chunk_index + 1,
692                total_chunks,
693                author_count
694            );
695            let report = match run_chunk(current_root.clone(), author_chunk).await {
696                Ok(report) => report,
697                Err(err) => {
698                    failed_chunks = failed_chunks.saturating_add(1);
699                    warn!(
700                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
701                        chunk_index + 1,
702                        total_chunks,
703                        author_count,
704                        err
705                    );
706                    last_error = Some(err);
707                    continue;
708                }
709            };
710
711            if report.root != current_root {
712                self.apply_history_root(report.root.as_ref()).await?;
713                current_root = report.root.clone();
714                info!(
715                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
716                    chunk_index + 1,
717                    total_chunks,
718                    report.authors_processed,
719                    report.events_selected,
720                    report.events_seen
721                );
722            }
723            applied_chunks = applied_chunks.saturating_add(1);
724        }
725
726        if applied_chunks == 0 {
727            return Err(last_error
728                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
729                .context("run mirror history sync"));
730        }
731        if failed_chunks > 0 {
732            warn!(
733                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
734                applied_chunks,
735                failed_chunks
736            );
737        }
738        Ok(())
739    }
740
741    async fn history_sync_author_chunk(
742        &self,
743        current_root: Option<hashtree_core::Cid>,
744        authors: Vec<String>,
745        kinds: &[u16],
746    ) -> Result<CrawlReport> {
747        let mut last_error = None;
748        let mut report = None;
749        let plan = self.history_sync_plan(authors.len(), kinds);
750        for attempt in 0..3 {
751            let mut last_logged_authors = 0usize;
752            let bridge = NostrBridge::new(
753                self.store.store_arc(),
754                CrawlConfig {
755                    relays: self.config.relays.clone(),
756                    author_allowlist: Some(authors.clone()),
757                    max_live_bytes: None,
758                    max_events_seen: None,
759                    max_authors: None,
760                    max_follow_distance: None,
761                    author_batch_size: plan.author_batch_size,
762                    per_author_event_limit: plan.per_author_event_limit,
763                    per_author_live_bytes: None,
764                    fetch_timeout: self.config.fetch_timeout,
765                    kinds: Some(kinds.to_vec()),
766                    relay_fetch_mode: plan.relay_fetch_mode,
767                    require_negentropy: self.config.require_negentropy,
768                    relay_event_max_size: self.config.relay_event_max_size,
769                    relay_page_size: plan.relay_page_size,
770                    max_relay_pages: plan.max_relay_pages,
771                },
772            );
773
774            match bridge
775                .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
776                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
777                    let should_log = progress.authors_processed == progress.authors_considered
778                        || progress.authors_processed == 0
779                        || progress
780                            .authors_processed
781                            .saturating_sub(last_logged_authors)
782                            >= log_interval;
783                    if should_log {
784                        last_logged_authors = progress.authors_processed;
785                        info!(
786                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
787                            progress.authors_processed,
788                            progress.authors_considered,
789                            progress.events_selected,
790                            progress.events_seen
791                        );
792                    }
793                })
794                .await
795            {
796                Ok(next_report) => {
797                    report = Some(next_report);
798                    break;
799                }
800                Err(err) => {
801                    last_error = Some(err);
802                    if attempt < 2 {
803                        tokio::time::sleep(Duration::from_millis(500)).await;
804                    }
805                }
806            }
807        }
808        report
809            .ok_or_else(|| last_error.expect("history sync retry captured error"))
810            .context("run mirror history sync")
811    }
812
813    async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
814        self.graph_store.write_public_events_root(root)?;
815        let Some(root) = root else {
816            return Ok(());
817        };
818
819        let event_store = NostrEventStore::new(self.store.store_arc());
820        let events = event_store
821            .list_recent_lossy(Some(root), ListEventsOptions::default())
822            .await
823            .context("list trusted mirrored events")?
824            .into_iter()
825            .map(socialgraph::stored_event_to_nostr_event)
826            .collect::<Result<Vec<_>>>()?;
827
828        self.graph_store
829            .rebuild_profile_index_for_events(&events)
830            .context("rebuild mirrored profile search index")?;
831        socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
832            .context("sync mirrored social graph state")?;
833        self.note_public_events_root_change()?;
834        self.note_profile_search_root_change()?;
835        self.note_profiles_by_pubkey_root_change()?;
836        let (event_result, profile_search_result, profiles_by_pubkey_result) =
837            self.publish_priority_roots(true, true, true).await;
838        if let Err(err) = event_result {
839            warn!(
840                "Nostr mirror event-root publish failed after root update: {:#}",
841                err
842            );
843        }
844        if let Err(err) = profile_search_result {
845            warn!(
846                "Nostr mirror profile-search publish failed after root update: {:#}",
847                err
848            );
849        }
850        if let Err(err) = profiles_by_pubkey_result {
851            warn!(
852                "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
853                err
854            );
855        }
856        Ok(())
857    }
858
859    async fn subscribe_authors_since(
860        &self,
861        authors: &[String],
862        since: Timestamp,
863        subscribed_authors: &mut HashSet<String>,
864    ) -> Result<()> {
865        let new_authors = authors
866            .iter()
867            .filter(|author| !subscribed_authors.contains(*author))
868            .cloned()
869            .collect::<Vec<_>>();
870        if new_authors.is_empty() {
871            return Ok(());
872        }
873
874        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
875            let pubkeys = chunk
876                .iter()
877                .filter_map(|author| PublicKey::from_hex(author).ok())
878                .collect::<Vec<_>>();
879            if pubkeys.is_empty() {
880                continue;
881            }
882
883            let filter = Filter::new()
884                .authors(pubkeys)
885                .kinds(self.config.kinds.iter().copied().map(Kind::from))
886                .since(since);
887
888            self.client
889                .subscribe(vec![filter], None)
890                .await
891                .context("subscribe mirror author batch")?;
892        }
893
894        subscribed_authors.extend(new_authors);
895        Ok(())
896    }
897
898    fn ingest_live_event(&self, event: &Event) -> Result<()> {
899        self.pending_live_events
900            .lock()
901            .expect("pending live events")
902            .insert(event.id.to_hex(), event.clone());
903        Ok(())
904    }
905
906    async fn flush_live_events(&self) -> Result<()> {
907        let pending = {
908            let mut pending = self
909                .pending_live_events
910                .lock()
911                .expect("pending live events");
912            if pending.is_empty() {
913                return Ok(());
914            }
915            std::mem::take(&mut *pending)
916        };
917        let events = pending.into_values().collect::<Vec<_>>();
918        let event_count = events.len();
919        let previous_event_root = self.graph_store.public_events_root()?;
920        let previous_profile_search_root = self.graph_store.profile_search_root()?;
921        let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
922
923        socialgraph::ingest_parsed_events_with_storage_class(
924            self.graph_store.as_ref(),
925            &events,
926            socialgraph::EventStorageClass::Public,
927        )
928        .context("ingest live mirrored event batch")?;
929
930        let next_event_root = self.graph_store.public_events_root()?;
931        let next_profile_search_root = self.graph_store.profile_search_root()?;
932        let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
933        let event_root_changed = next_event_root != previous_event_root;
934        let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
935        let profiles_by_pubkey_root_changed =
936            next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
937
938        if event_root_changed {
939            self.note_public_events_root_change()?;
940        }
941        if profile_search_root_changed {
942            self.note_profile_search_root_change()?;
943        }
944        if profiles_by_pubkey_root_changed {
945            self.note_profiles_by_pubkey_root_change()?;
946        }
947        if profile_search_root_changed {
948            self.maybe_publish_profile_search_root(true).await?;
949        }
950        if profiles_by_pubkey_root_changed {
951            self.maybe_publish_profiles_by_pubkey_root(true).await?;
952        }
953        if event_root_changed {
954            self.maybe_publish_event_root(true).await?;
955        }
956        info!(
957            "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
958            event_count,
959            event_root_changed,
960            profile_search_root_changed,
961            profiles_by_pubkey_root_changed
962        );
963        Ok(())
964    }
965
966    fn note_public_events_root_change(&self) -> Result<()> {
967        let root = self.graph_store.public_events_root()?;
968        Self::note_root_change(
969            self.config.published_event_tree_name.as_deref(),
970            &self.event_publish_state,
971            root,
972        )
973    }
974
975    fn note_profile_search_root_change(&self) -> Result<()> {
976        let root = self.graph_store.profile_search_root()?;
977        Self::note_root_change(
978            self.config.published_profile_search_tree_name.as_deref(),
979            &self.profile_search_publish_state,
980            root,
981        )
982    }
983
984    fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
985        let root = self.graph_store.profiles_by_pubkey_root()?;
986        Self::note_root_change(
987            self.config
988                .published_profiles_by_pubkey_tree_name
989                .as_deref(),
990            &self.profiles_by_pubkey_publish_state,
991            root,
992        )
993    }
994
995    fn note_root_change(
996        tree_name: Option<&str>,
997        publish_state: &Mutex<RootPublishState>,
998        root: Option<hashtree_core::Cid>,
999    ) -> Result<()> {
1000        let Some(_tree_name) = tree_name else {
1001            return Ok(());
1002        };
1003
1004        let mut state = publish_state.lock().expect("root publish state");
1005        let now = Instant::now();
1006
1007        if state.pending_root == root {
1008            return Ok(());
1009        }
1010
1011        state.pending_root = root;
1012        state.last_changed_at = Some(now);
1013        if state.dirty_since.is_none() {
1014            state.dirty_since = Some(now);
1015        }
1016        Ok(())
1017    }
1018
1019    async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1020        self.maybe_publish_root(
1021            self.config.published_event_tree_name.as_deref(),
1022            &self.event_publish_state,
1023            "event root",
1024            force,
1025        )
1026        .await
1027    }
1028
1029    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1030        self.maybe_publish_root(
1031            self.config.published_profile_search_tree_name.as_deref(),
1032            &self.profile_search_publish_state,
1033            "profile search root",
1034            force,
1035        )
1036        .await
1037    }
1038
1039    async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1040        self.maybe_publish_root(
1041            self.config
1042                .published_profiles_by_pubkey_tree_name
1043                .as_deref(),
1044            &self.profiles_by_pubkey_publish_state,
1045            "profiles-by-pubkey root",
1046            force,
1047        )
1048        .await
1049    }
1050
1051    async fn maybe_publish_root(
1052        &self,
1053        tree_name: Option<&str>,
1054        publish_state: &Mutex<RootPublishState>,
1055        log_label: &str,
1056        force: bool,
1057    ) -> Result<()> {
1058        let Some(tree_name) = tree_name else {
1059            return Ok(());
1060        };
1061
1062        let pending_root = {
1063            let state = publish_state.lock().expect("root publish state");
1064            let Some(pending_root) = state.pending_root.clone() else {
1065                return Ok(());
1066            };
1067
1068            let now = Instant::now();
1069            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1070                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1071            });
1072            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1073                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1074            });
1075            if !force && !debounce_ready && !stale_ready {
1076                return Ok(());
1077            }
1078
1079            pending_root
1080        };
1081
1082        let needs_upload = {
1083            let state = publish_state.lock().expect("root publish state");
1084            !self.config.blossom_write_servers.is_empty()
1085                && state.last_uploaded_root.as_ref() != Some(&pending_root)
1086        };
1087        if needs_upload {
1088            background_blossom_push(
1089                self.store.base_path(),
1090                &pending_root.to_string(),
1091                &self.config.blossom_write_servers,
1092            )
1093            .await
1094            .with_context(|| format!("upload {log_label} DAG to Blossom"))?;
1095
1096            let mut state = publish_state.lock().expect("root publish state");
1097            if state.pending_root.as_ref() == Some(&pending_root) {
1098                state.last_uploaded_root = Some(pending_root.clone());
1099                state.last_uploaded_at = Some(Instant::now());
1100            }
1101        }
1102
1103        let mut successful_relays = Vec::new();
1104        let mut failed_relays = Vec::new();
1105        let publish_required =
1106            self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1107        if publish_required {
1108            let Some(publish_client) = self.publish_client.as_ref() else {
1109                unreachable!("publish_required implies publish_client");
1110            };
1111            if !self.has_connected_publish_relay().await {
1112                return Ok(());
1113            }
1114
1115            let already_published = {
1116                let state = publish_state.lock().expect("root publish state");
1117                state.last_published_root.as_ref() == Some(&pending_root)
1118            };
1119            if !already_published {
1120                let publish_relays = self.config.publish_relays.clone();
1121                let latest_known_created_at = {
1122                    let state = publish_state.lock().expect("root publish state");
1123                    state.last_published_created_at
1124                };
1125                let publish_created_at = next_replaceable_created_at(
1126                    Timestamp::now(),
1127                    later_timestamp(
1128                        latest_known_created_at,
1129                        self.latest_root_event_created_at(tree_name).await,
1130                    ),
1131                );
1132                let event = publish_client
1133                    .sign_event_builder(Self::build_public_root_event(
1134                        tree_name,
1135                        &pending_root,
1136                        publish_created_at,
1137                    ))
1138                    .await
1139                    .with_context(|| format!("sign {log_label} event"))?;
1140                let publish_result = self
1141                    .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1142                    .await
1143                    .with_context(|| format!("publish {log_label} event"))?;
1144                successful_relays = publish_result.0;
1145                failed_relays = publish_result.1;
1146                if successful_relays.is_empty() {
1147                    let failure_summary = if failed_relays.is_empty() {
1148                        "no publish relays accepted the event".to_string()
1149                    } else {
1150                        failed_relays.join("; ")
1151                    };
1152                    anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1153                }
1154
1155                let mut state = publish_state.lock().expect("root publish state");
1156                if state.pending_root.as_ref() == Some(&pending_root) {
1157                    state.last_published_root = Some(pending_root.clone());
1158                    state.last_published_at = Some(Instant::now());
1159                    state.last_published_created_at = Some(event.created_at);
1160                }
1161            }
1162        }
1163
1164        {
1165            let mut state = publish_state.lock().expect("root publish state");
1166            if state.pending_root.as_ref() == Some(&pending_root) {
1167                let upload_satisfied = self.config.blossom_write_servers.is_empty()
1168                    || state.last_uploaded_root.as_ref() == Some(&pending_root);
1169                let publish_satisfied =
1170                    !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1171                if upload_satisfied && publish_satisfied {
1172                    state.dirty_since = None;
1173                }
1174            }
1175        }
1176
1177        info!(
1178            "Nostr mirror published {}: tree={} hash={} relays={:?}",
1179            log_label,
1180            tree_name,
1181            hex::encode(pending_root.hash),
1182            successful_relays,
1183        );
1184        if !failed_relays.is_empty() {
1185            warn!(
1186                "Nostr mirror publish had relay failures: tree={} failures={:?}",
1187                tree_name, failed_relays
1188            );
1189        }
1190        Ok(())
1191    }
1192
1193    async fn publish_root_event_to_relays(
1194        &self,
1195        publish_client: &Client,
1196        relays: &[String],
1197        event: &Event,
1198    ) -> Result<(Vec<String>, Vec<String>)> {
1199        let mut successful_relays = Vec::new();
1200        let mut failed_relays = Vec::new();
1201
1202        for relay in relays {
1203            match publish_client
1204                .send_event_to([relay.as_str()], event.clone())
1205                .await
1206            {
1207                Ok(output) => {
1208                    if output.success.is_empty() {
1209                        failed_relays.push(format!("{relay}: relay did not acknowledge publish"));
1210                        continue;
1211                    }
1212                    successful_relays.push(relay.clone());
1213                    failed_relays.extend(output.failed.into_iter().map(
1214                        |(url, reason)| match reason {
1215                            Some(reason) => format!("{url}: {reason}"),
1216                            None => format!("{url}: relay rejected publish"),
1217                        },
1218                    ));
1219                }
1220                Err(err) => {
1221                    failed_relays.push(format!("{relay}: {err}"));
1222                }
1223            }
1224        }
1225
1226        Ok((successful_relays, failed_relays))
1227    }
1228
1229    async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1230        let publish_client = self.publish_client.as_ref()?;
1231        let author = self.publish_pubkey?;
1232        let events = publish_client
1233            .get_events_of(
1234                vec![Self::build_public_root_filter(author, tree_name)],
1235                EventSource::relays(Some(self.config.fetch_timeout)),
1236            )
1237            .await
1238            .ok()?;
1239        events
1240            .iter()
1241            .filter(|event| Self::matches_public_root_event(event, tree_name))
1242            .max_by_key(|event| (event.created_at, event.id))
1243            .map(|event| event.created_at)
1244    }
1245
1246    fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1247        Filter::new()
1248            .kind(Kind::Custom(30078))
1249            .author(author)
1250            .custom_tag(
1251                SingleLetterTag::lowercase(Alphabet::D),
1252                vec![tree_name.to_string()],
1253            )
1254            .custom_tag(
1255                SingleLetterTag::lowercase(Alphabet::L),
1256                vec!["hashtree".to_string()],
1257            )
1258            .limit(50)
1259    }
1260
1261    fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1262        event.kind == Kind::Custom(30078)
1263            && event.tags.iter().any(|tag| {
1264                let values = tag.as_slice();
1265                values.first().is_some_and(|value| value == "d")
1266                    && values.get(1).is_some_and(|value| value == tree_name)
1267            })
1268            && event.tags.iter().any(|tag| {
1269                let values = tag.as_slice();
1270                values.first().is_some_and(|value| value == "l")
1271                    && values.get(1).is_some_and(|value| value == "hashtree")
1272            })
1273    }
1274
1275    fn build_public_root_event(
1276        tree_name: &str,
1277        cid: &hashtree_core::Cid,
1278        created_at: Timestamp,
1279    ) -> EventBuilder {
1280        let mut tags = vec![
1281            Tag::identifier(tree_name.to_string()),
1282            Tag::custom(
1283                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1284                vec!["hashtree"],
1285            ),
1286            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1287        ];
1288        if let Some(key) = cid.key {
1289            tags.push(Tag::custom(
1290                TagKind::Custom("key".into()),
1291                vec![hex::encode(key)],
1292            ));
1293        }
1294
1295        EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1296    }
1297}
1298
1299fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1300    match (left, right) {
1301        (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1302        (Some(left), None) => Some(left),
1303        (None, Some(right)) => Some(right),
1304        (None, None) => None,
1305    }
1306}
1307
1308fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1309    match latest_existing {
1310        Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1311        _ => now,
1312    }
1313}
1314
1315#[cfg(test)]
1316mod tests;