Skip to main content

hashtree_cli/
nostr_mirror.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13    Timestamp,
14};
15use nostr_sdk::{
16    pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17    RelayStatus,
18};
19use tokio::sync::{watch, Mutex as AsyncMutex};
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// Timing and sizing knobs for the mirror. Every interval has a much shorter
// `cfg(test)` twin so tests run quickly.

/// Initial delay slept by `run` (after its startup log line) before it
/// publishes roots and subscribes to live events.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// Extra delay slept right after `MIRROR_STARTUP_DELAY`; the live
/// subscription's `since` timestamp is taken after it. Presumably gives relay
/// connections time to settle.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

/// Tick interval at which the run loop re-collects reachable authors,
/// subscribes newly reachable ones and checks for a missing-profile backfill.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

/// Minimum time between two reconnect-triggered catch-up history syncs.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

/// Nostr long-form content kind (NIP-23).
const KIND_LONG_FORM_CONTENT: u16 = 30_023;
/// Kinds mirrored by default: metadata (0), text notes (1), follow lists (3),
/// reposts (6), reactions (7), zap receipts (9735) and long-form content.
const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
/// Default name of the published event-index tree.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
/// Default name of the published profile-search tree.
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
/// Default name of the published profiles-by-pubkey tree.
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
/// Metadata-only history syncs fetch a single event per author...
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
/// ...in author batches of at most this size.
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
/// Default follow distance for full text-note history syncs.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
/// Default relay-page cap for full text-note history; `0` is mapped to `None`
/// (no cap passed on).
const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
/// A sync over more than `author_batch_size * LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER`
/// authors switches to `RelayFetchMode::GlobalRecent`...
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
/// ...with the per-author event limit capped at this value...
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
/// ...and up to this many relay pages.
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

/// Minimum time between missing-profile backfills started by the run loop.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

/// Interval of the run loop's "flush live events and publish roots" tick.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

// NOTE(review): not referenced in this part of the file; the name suggests an
// upper bound on how long a changed root may stay unpublished — confirm.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);

// NOTE(review): not referenced in this part of the file; presumably the
// back-off before retrying a failed root upload — confirm.
#[cfg(not(test))]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
#[cfg(test)]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);

// NOTE(review): presumably matched against Blossom push error text to detect
// missing local blobs — the matching code is outside this view.
const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
76const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
77#[cfg(test)]
78const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);
79
80const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
81
/// Configuration for `BackgroundNostrMirror`.
///
/// Default values come from the `Default` impl for this type.
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relays the mirror client subscribes to. `run` returns immediately when
    /// this is empty.
    pub relays: Vec<String>,
    /// Relays for the separate publish client. No publish client is created
    /// when this is empty, even if publish keys are supplied.
    pub publish_relays: Vec<String>,
    /// Blossom servers to write to. NOTE(review): not read in this part of the
    /// file; presumably used by the root-upload path.
    pub blossom_write_servers: Vec<String>,
    /// Largest follow distance (inclusive) whose authors are mirrored; `0`
    /// disables the mirror.
    pub max_follow_distance: u32,
    /// Threshold passed to `is_overmuted_user`; authors it flags are skipped.
    pub overmute_threshold: f64,
    /// Author batch size for history sync plans (clamped to at least 1). It also
    /// scales the author count above which a plan switches to global-recent
    /// fetching.
    pub author_batch_size: usize,
    /// Logged at startup (clamped to at least 1). NOTE(review): the chunking
    /// that uses it is outside this view.
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit for history sync plans (clamped to at least 1;
    /// capped further for very large syncs).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors per missing-profile backfill; `0` disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// NOTE(review): not read in this part of the file; presumably the relay
    /// fetch timeout.
    pub fetch_timeout: Duration,
    /// Maximum relay event size accepted by the mirror client; `None` keeps the
    /// client's default relay limits.
    pub relay_event_max_size: Option<u32>,
    /// Logged as `negentropy_only`. NOTE(review): presumably restricts history
    /// sync to negentropy — confirm in the sync path.
    pub require_negentropy: bool,
    /// Event kinds to mirror. Including kind 1 or 30023 enables full text-note
    /// history; including metadata enables the missing-profile backfill.
    pub kinds: Vec<u16>,
    /// Run a history sync for the initial authors when `run` starts.
    pub history_sync_on_start: bool,
    /// Run a catch-up history sync when a relay transitions to connected.
    pub history_sync_on_reconnect: bool,
    /// Follow distance for full text-note history, capped by
    /// `max_follow_distance`; `None` disables it.
    pub full_text_note_history_follow_distance: Option<u32>,
    /// Relay-page cap for full text-note history; `0` means no cap is passed.
    pub full_text_note_history_max_relay_pages: usize,
    /// Name of the published event-index tree. NOTE(review): `None` presumably
    /// disables publishing it — confirm.
    pub published_event_tree_name: Option<String>,
    /// Name of the published profile-search tree (same `None` caveat).
    pub published_profile_search_tree_name: Option<String>,
    /// Name of the published profiles-by-pubkey tree (same `None` caveat).
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
105
106impl Default for NostrMirrorConfig {
107    fn default() -> Self {
108        Self {
109            relays: Vec::new(),
110            publish_relays: Vec::new(),
111            blossom_write_servers: Vec::new(),
112            max_follow_distance: 2,
113            overmute_threshold: 1.0,
114            author_batch_size: 256,
115            history_sync_author_chunk_size: 5_000,
116            history_sync_per_author_event_limit: 256,
117            missing_profile_backfill_batch_size: 5_000,
118            fetch_timeout: Duration::from_secs(15),
119            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
120            require_negentropy: false,
121            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
122            history_sync_on_start: true,
123            history_sync_on_reconnect: true,
124            full_text_note_history_follow_distance: Some(
125                DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
126            ),
127            full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
128            published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
129            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
130            published_profiles_by_pubkey_tree_name: Some(
131                DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
132            ),
133        }
134    }
135}
136
/// Publish/upload bookkeeping for one published tree root.
///
/// The mirror keeps one instance each (behind `Arc<Mutex<_>>`) for the
/// event-index, profile-search and profiles-by-pubkey trees.
///
/// NOTE(review): the code that reads and writes these fields is outside this
/// view, so the per-field notes below are inferred from names. Confirm them
/// against the `maybe_publish_*_root` / `note_*_root_change` methods.
#[derive(Debug, Default)]
struct RootPublishState {
    // Presumably the newest root noticed in the store and not yet published.
    pending_root: Option<hashtree_core::Cid>,
    // Presumably when `pending_root` last changed (debounce input).
    last_changed_at: Option<Instant>,
    // Presumably when the state first became dirty (staleness input).
    dirty_since: Option<Instant>,
    // Presumably the last root announced to the publish relays, with times.
    last_published_root: Option<hashtree_core::Cid>,
    last_published_at: Option<Instant>,
    last_published_created_at: Option<Timestamp>,
    // Presumably the last root whose blobs were uploaded, and when.
    last_uploaded_root: Option<hashtree_core::Cid>,
    last_uploaded_at: Option<Instant>,
    // Presumably the root currently being uploaded, if any.
    upload_in_progress_root: Option<hashtree_core::Cid>,
    // Presumably the most recent upload failure, for retry back-off.
    last_upload_failed_at: Option<Instant>,
    last_upload_error: Option<String>,
    // Presumably set when an upload failed on a missing local blob.
    missing_blob_rebuild_required: bool,
}
152
/// Relay-fetch parameters for one history sync run.
///
/// Built by `history_sync_plan_for` from the config, the author count and the
/// requested kinds:
/// - Metadata-only syncs use small author batches with one event per author.
/// - Very large author sets switch to `RelayFetchMode::GlobalRecent`, with a
///   tighter per-author limit and more relay pages.
/// - Other syncs use author batches.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HistorySyncPlan {
    // How events are requested from relays (per author batch or globally recent).
    relay_fetch_mode: RelayFetchMode,
    // Authors per batch (always at least 1).
    author_batch_size: usize,
    // Per-author event limit (always at least 1).
    per_author_event_limit: usize,
    // Page size for relay queries (1 000 in the visible plans).
    relay_page_size: usize,
    // Relay page cap (10 by default, 20 for large syncs in the visible plans).
    max_relay_pages: usize,
}
161
/// Background task that mirrors Nostr events from social-graph authors into
/// the local hashtree store and publishes the resulting tree roots.
pub struct BackgroundNostrMirror {
    config: NostrMirrorConfig,
    store: Arc<HashtreeStore>,
    graph_store: Arc<SocialGraphStore>,
    // Read-side client (throwaway keys) connected to `config.relays`.
    client: Client,
    // Publishing client; only present with publish keys AND publish relays.
    publish_client: Option<Client>,
    // Public key of the publish keys, recorded even when no publish client exists.
    publish_pubkey: Option<PublicKey>,
    // One publish bookkeeping record per published tree.
    event_publish_state: Arc<Mutex<RootPublishState>>,
    profile_search_publish_state: Arc<Mutex<RootPublishState>>,
    profiles_by_pubkey_publish_state: Arc<Mutex<RootPublishState>>,
    // Live events awaiting a flush; NOTE(review): key presumably the event id.
    pending_live_events: Mutex<BTreeMap<String, Event>>,
    // Round-robin position for the missing-profile author scan.
    missing_profile_cursor: Mutex<usize>,
    // Serializes history syncs across the spawned blocking-thread runtimes.
    history_sync_lock: AsyncMutex<()>,
    // Shutdown flag; `shutdown()` sets it and the run loop watches it.
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}
178
179impl BackgroundNostrMirror {
180    pub async fn new(
181        config: NostrMirrorConfig,
182        store: Arc<HashtreeStore>,
183        graph_store: Arc<SocialGraphStore>,
184        publish_keys: Option<Keys>,
185    ) -> Result<Self> {
186        let client = if let Some(max_size) = config.relay_event_max_size {
187            let mut limits = RelayLimits::default();
188            limits.events.max_size = Some(max_size);
189            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
190        } else {
191            Client::new(Keys::generate())
192        };
193        for relay in &config.relays {
194            client
195                .add_relay(relay)
196                .await
197                .with_context(|| format!("add mirror relay {relay}"))?;
198        }
199        client.connect().await;
200
201        let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
202        let publish_client = if let Some(keys) = publish_keys {
203            if config.publish_relays.is_empty() {
204                None
205            } else {
206                let client = Client::with_opts(keys, Options::new().wait_for_send(false));
207                for relay in &config.publish_relays {
208                    client
209                        .add_relay(relay)
210                        .await
211                        .with_context(|| format!("add mirror publish relay {relay}"))?;
212                }
213                client.connect().await;
214                Some(client)
215            }
216        } else {
217            None
218        };
219
220        let (shutdown_tx, shutdown_rx) = watch::channel(false);
221        Ok(Self {
222            config,
223            store,
224            graph_store,
225            client,
226            publish_client,
227            publish_pubkey,
228            event_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
229            profile_search_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
230            profiles_by_pubkey_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
231            pending_live_events: Mutex::new(BTreeMap::new()),
232            missing_profile_cursor: Mutex::new(0),
233            history_sync_lock: AsyncMutex::new(()),
234            shutdown_tx,
235            shutdown_rx,
236        })
237    }
238
239    pub fn shutdown(&self) {
240        let _ = self.shutdown_tx.send(true);
241    }
242
243    fn sync_publish_roots_from_store(&self) -> Result<()> {
244        self.note_public_events_root_change()?;
245        self.note_profile_search_root_change()?;
246        self.note_profiles_by_pubkey_root_change()?;
247        Ok(())
248    }
249
250    async fn publish_pending_roots(
251        &self,
252        force_event: bool,
253        force_profile_search: bool,
254        force_profiles_by_pubkey: bool,
255    ) -> (Result<()>, Result<()>, Result<()>) {
256        tokio::join!(
257            self.maybe_publish_event_root(force_event),
258            self.maybe_publish_profile_search_root(force_profile_search),
259            self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
260        )
261    }
262
263    async fn publish_priority_roots(
264        &self,
265        force_event: bool,
266        force_profile_search: bool,
267        force_profiles_by_pubkey: bool,
268    ) -> (Result<()>, Result<()>, Result<()>) {
269        let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
270            async {
271                if force_profile_search {
272                    self.maybe_publish_profile_search_root(true).await
273                } else {
274                    Ok(())
275                }
276            },
277            async {
278                if force_profiles_by_pubkey {
279                    self.maybe_publish_profiles_by_pubkey_root(true).await
280                } else {
281                    Ok(())
282                }
283            },
284        );
285        let event_result = if force_event {
286            self.maybe_publish_event_root(true).await
287        } else {
288            Ok(())
289        };
290        (
291            event_result,
292            profile_search_result,
293            profiles_by_pubkey_result,
294        )
295    }
296
297    pub async fn run(self: Arc<Self>) -> Result<()> {
298        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
299            return Ok(());
300        }
301
302        info!(
303            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
304            self.config.relays.len(),
305            self.config.max_follow_distance,
306            self.config.require_negentropy,
307            self.config.kinds,
308            self.config.history_sync_author_chunk_size.max(1),
309            self.config.history_sync_on_start,
310            self.config.history_sync_on_reconnect
311        );
312
313        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
314        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
315        let live_since = Timestamp::now();
316        self.sync_publish_roots_from_store()?;
317        let (event_result, profile_search_result, profiles_by_pubkey_result) =
318            self.publish_priority_roots(true, true, true).await;
319        if let Err(err) = event_result {
320            warn!(
321                "Nostr mirror event-root publish failed on startup: {:#}",
322                err
323            );
324        }
325        if let Err(err) = profile_search_result {
326            warn!(
327                "Nostr mirror profile-search publish failed on startup: {:#}",
328                err
329            );
330        }
331        if let Err(err) = profiles_by_pubkey_result {
332            warn!(
333                "Nostr mirror profiles-by-pubkey publish failed on startup: {:#}",
334                err
335            );
336        }
337
338        let initial_authors = self.collect_authors()?;
339        if initial_authors.is_empty() {
340            info!("Nostr mirror: no social-graph authors to mirror yet");
341        }
342
343        let mut subscribed_authors = HashSet::new();
344        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
345            .await?;
346
347        if !initial_authors.is_empty() && self.config.history_sync_on_start {
348            self.spawn_startup_history_sync(initial_authors.clone());
349        }
350
351        let mut relay_statuses = self.capture_relay_statuses().await;
352        let mut last_reconnect_history_sync_at: Option<Instant> = None;
353        let mut last_missing_profile_backfill_at: Option<Instant> = None;
354        let mut notifications = self.client.notifications();
355        let mut shutdown_rx = self.shutdown_rx.clone();
356        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
357        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
358
359        loop {
360            tokio::select! {
361                _ = shutdown_rx.changed() => {
362                    if *shutdown_rx.borrow() {
363                        break;
364                    }
365                }
366                _ = refresh_interval.tick() => {
367                    let authors = self.collect_authors()?;
368                    let new_authors = authors
369                        .into_iter()
370                        .filter(|author| !subscribed_authors.contains(author))
371                        .collect::<Vec<_>>();
372                    if !new_authors.is_empty() {
373                        debug!(
374                            "Nostr mirror discovered {} newly reachable author(s)",
375                            new_authors.len()
376                        );
377                        self.subscribe_authors_since(
378                            &new_authors,
379                            Timestamp::now(),
380                            &mut subscribed_authors,
381                        )
382                        .await?;
383                        self.spawn_author_history_sync(
384                            "new-author catch-up",
385                            new_authors.clone(),
386                            true,
387                            true,
388                        );
389                    }
390                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
391                        let missing_profile_authors = self.collect_missing_profile_authors(
392                            self.config.missing_profile_backfill_batch_size,
393                        )?;
394                        if !missing_profile_authors.is_empty() {
395                            info!(
396                                "Nostr mirror missing-profile backfill starting: authors={}",
397                                missing_profile_authors.len()
398                            );
399                            self.spawn_missing_profile_backfill(missing_profile_authors);
400                            last_missing_profile_backfill_at = Some(Instant::now());
401                        }
402                    }
403                }
404                _ = publish_interval.tick() => {
405                    self.sync_publish_roots_from_store()?;
406                    if let Err(err) = self.flush_live_events().await {
407                        warn!("Nostr mirror live event flush failed: {:#}", err);
408                    }
409                    let (event_result, profile_search_result, profiles_by_pubkey_result) = self
410                        .publish_pending_roots(false, false, false)
411                        .await;
412                    if let Err(err) = event_result {
413                        warn!("Nostr mirror event-root publish failed: {:#}", err);
414                    }
415                    if let Err(err) = profile_search_result {
416                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
417                    }
418                    if let Err(err) = profiles_by_pubkey_result {
419                        warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
420                    }
421                }
422                notification = notifications.recv() => {
423                    match notification {
424                        Ok(RelayPoolNotification::Event { event, .. }) => {
425                            self.ingest_live_event(&event)?;
426                        }
427                        Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
428                            let relay_url = relay_url.to_string();
429                            let previous = relay_statuses.insert(relay_url.clone(), status);
430                            if Self::should_history_sync_on_reconnect(
431                                self.config.history_sync_on_reconnect,
432                                previous,
433                                status,
434                            ) && Self::should_run_reconnect_history_sync(
435                                    last_reconnect_history_sync_at.as_ref(),
436                                )
437                            {
438                                let authors = self.collect_authors()?;
439                                if !authors.is_empty() {
440                                    info!(
441                                        "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
442                                        relay_url,
443                                        authors.len(),
444                                        self.config.require_negentropy
445                                    );
446                                    self.spawn_author_history_sync(
447                                        "relay reconnect catch-up",
448                                        authors,
449                                        false,
450                                        false,
451                                    );
452                                    last_reconnect_history_sync_at = Some(Instant::now());
453                                }
454                            }
455                        }
456                        Ok(RelayPoolNotification::Shutdown) => break,
457                        Ok(_) => {}
458                        Err(err) => {
459                            warn!("Nostr mirror notification error: {}", err);
460                            break;
461                        }
462                    }
463                }
464            }
465        }
466
467        if let Err(err) = self.flush_live_events().await {
468            warn!(
469                "Nostr mirror live event flush failed during shutdown: {:#}",
470                err
471            );
472        }
473        if let Err(err) = self.sync_publish_roots_from_store() {
474            warn!(
475                "Nostr mirror root-state refresh failed during shutdown: {:#}",
476                err
477            );
478        }
479        let (event_result, profile_search_result, profiles_by_pubkey_result) =
480            self.publish_pending_roots(true, true, true).await;
481        if let Err(err) = event_result {
482            warn!(
483                "Nostr mirror event-root publish failed during shutdown: {:#}",
484                err
485            );
486        }
487        if let Err(err) = profile_search_result {
488            warn!(
489                "Nostr mirror profile-search publish failed during shutdown: {:#}",
490                err
491            );
492        }
493        if let Err(err) = profiles_by_pubkey_result {
494            warn!(
495                "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
496                err
497            );
498        }
499        let _ = self.client.disconnect().await;
500        if let Some(client) = self.publish_client.as_ref() {
501            let _ = client.disconnect().await;
502        }
503        Ok(())
504    }
505
506    fn spawn_startup_history_sync(self: &Arc<Self>, initial_authors: Vec<String>) {
507        let mirror = Arc::clone(self);
508        tokio::task::spawn_blocking(move || {
509            let runtime = tokio::runtime::Builder::new_current_thread()
510                .enable_all()
511                .build()
512                .expect("build nostr mirror startup history sync runtime");
513            runtime.block_on(async move {
514                let _guard = mirror.history_sync_lock.lock().await;
515                if let Err(err) = mirror.run_startup_history_sync(initial_authors).await {
516                    warn!("Nostr mirror startup history sync failed: {:#}", err);
517                }
518            });
519        });
520    }
521
522    async fn run_startup_history_sync(&self, initial_authors: Vec<String>) -> Result<()> {
523        self.history_sync_recent_text_notes_for_reachable_authors()
524            .await?;
525        self.history_sync_full_text_notes_for_reachable_authors()
526            .await?;
527        if self.should_backfill_missing_profiles(None) {
528            let missing_profile_authors = self
529                .collect_missing_profile_authors(self.config.missing_profile_backfill_batch_size)?;
530            if !missing_profile_authors.is_empty() {
531                info!(
532                    "Nostr mirror missing-profile backfill starting: authors={}",
533                    missing_profile_authors.len()
534                );
535                self.history_sync_authors_with_kinds(
536                    missing_profile_authors,
537                    &[Kind::Metadata.as_u16()],
538                )
539                .await?;
540            }
541        }
542        self.history_sync_authors(initial_authors).await
543    }
544
545    fn spawn_author_history_sync(
546        self: &Arc<Self>,
547        label: &'static str,
548        authors: Vec<String>,
549        include_full_text_notes: bool,
550        wait_for_existing_sync: bool,
551    ) {
552        let mirror = Arc::clone(self);
553        tokio::task::spawn_blocking(move || {
554            let runtime = tokio::runtime::Builder::new_current_thread()
555                .enable_all()
556                .build()
557                .expect("build nostr mirror author history sync runtime");
558            runtime.block_on(async move {
559                if wait_for_existing_sync {
560                    let _guard = mirror.history_sync_lock.lock().await;
561                    if let Err(err) = mirror
562                        .run_author_history_sync(authors, include_full_text_notes)
563                        .await
564                    {
565                        warn!("Nostr mirror {label} failed: {:#}", err);
566                    }
567                    return;
568                }
569
570                let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
571                    info!("Nostr mirror {label} skipped; another history sync is running");
572                    return;
573                };
574                if let Err(err) = mirror
575                    .run_author_history_sync(authors, include_full_text_notes)
576                    .await
577                {
578                    warn!("Nostr mirror {label} failed: {:#}", err);
579                }
580            });
581        });
582    }
583
584    async fn run_author_history_sync(
585        &self,
586        authors: Vec<String>,
587        include_full_text_notes: bool,
588    ) -> Result<()> {
589        if include_full_text_notes {
590            self.history_sync_recent_text_notes_for_authors(authors.clone())
591                .await?;
592            self.history_sync_full_text_notes_for_authors(authors.clone())
593                .await?;
594        }
595        self.history_sync_authors(authors).await
596    }
597
598    fn spawn_missing_profile_backfill(self: &Arc<Self>, authors: Vec<String>) {
599        let mirror = Arc::clone(self);
600        tokio::task::spawn_blocking(move || {
601            let runtime = tokio::runtime::Builder::new_current_thread()
602                .enable_all()
603                .build()
604                .expect("build nostr mirror missing profile runtime");
605            runtime.block_on(async move {
606                let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
607                    info!(
608                        "Nostr mirror missing-profile backfill skipped; another history sync is running"
609                    );
610                    return;
611                };
612                if let Err(err) = mirror
613                    .history_sync_authors_with_kinds(authors, &[Kind::Metadata.as_u16()])
614                    .await
615                {
616                    warn!("Nostr mirror missing-profile backfill failed: {:#}", err);
617                }
618            });
619        });
620    }
621
622    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
623        let mut statuses = HashMap::new();
624        for (relay_url, relay) in self.client.relays().await {
625            statuses.insert(relay_url.to_string(), relay.status().await);
626        }
627        statuses
628    }
629
630    async fn has_connected_publish_relay(&self) -> bool {
631        let Some(client) = self.publish_client.as_ref() else {
632            return false;
633        };
634        Self::client_has_connected_relay(client).await
635    }
636
637    async fn client_has_connected_relay(client: &Client) -> bool {
638        for (_relay_url, relay) in client.relays().await {
639            if relay.status().await == RelayStatus::Connected {
640                return true;
641            }
642        }
643        false
644    }
645
646    fn collect_authors(&self) -> Result<Vec<String>> {
647        self.collect_authors_with_max_distance(self.config.max_follow_distance)
648    }
649
650    fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
651        let mut authors = Vec::new();
652        let mut seen = HashSet::new();
653        for distance in 0..=max_distance {
654            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
655                self.graph_store.as_ref(),
656                distance,
657            )
658            .with_context(|| format!("load social-graph distance {distance}"))?
659            {
660                if self
661                    .graph_store
662                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
663                {
664                    continue;
665                }
666                let hex = hex::encode(pubkey);
667                if seen.insert(hex.clone()) {
668                    authors.push(hex);
669                }
670            }
671        }
672        Ok(authors)
673    }
674
675    fn full_text_note_history_follow_distance(&self) -> Option<u32> {
676        let distance = self.config.full_text_note_history_follow_distance?;
677        if self
678            .config
679            .kinds
680            .iter()
681            .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
682        {
683            Some(distance.min(self.config.max_follow_distance))
684        } else {
685            None
686        }
687    }
688
689    fn full_text_note_history_max_relay_pages(&self) -> Option<usize> {
690        Self::full_text_note_history_max_relay_pages_for_config(&self.config)
691    }
692
693    fn full_text_note_history_max_relay_pages_for_config(
694        config: &NostrMirrorConfig,
695    ) -> Option<usize> {
696        let pages = config.full_text_note_history_max_relay_pages;
697        if pages == 0 {
698            None
699        } else {
700            Some(pages)
701        }
702    }
703
704    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
705        if limit == 0 {
706            return Ok(Vec::new());
707        }
708
709        let authors = self.collect_authors()?;
710        if authors.is_empty() {
711            return Ok(Vec::new());
712        }
713
714        let mut cursor = self
715            .missing_profile_cursor
716            .lock()
717            .expect("missing profile cursor");
718        let mut index = (*cursor).min(authors.len());
719        let mut scanned = 0usize;
720        let mut missing = Vec::new();
721
722        while scanned < authors.len() && missing.len() < limit {
723            let author = &authors[index];
724            if self.graph_store.latest_profile_event(author)?.is_none() {
725                missing.push(author.clone());
726            }
727            index += 1;
728            if index == authors.len() {
729                index = 0;
730            }
731            scanned += 1;
732        }
733
734        *cursor = index;
735        Ok(missing)
736    }
737
738    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
739        if self.config.missing_profile_backfill_batch_size == 0
740            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
741        {
742            return false;
743        }
744        match last_run {
745            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
746            None => true,
747        }
748    }
749
750    fn should_history_sync_on_reconnect(
751        history_sync_on_reconnect: bool,
752        previous: Option<RelayStatus>,
753        status: RelayStatus,
754    ) -> bool {
755        history_sync_on_reconnect
756            && status == RelayStatus::Connected
757            && matches!(
758                previous,
759                Some(
760                    RelayStatus::Initialized
761                        | RelayStatus::Pending
762                        | RelayStatus::Connecting
763                        | RelayStatus::Disconnected
764                        | RelayStatus::Terminated
765                )
766            )
767    }
768
769    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
770        match last_run {
771            None => true,
772            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
773        }
774    }
775
776    fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
777        !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
778    }
779
780    fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
781        kinds.is_empty()
782            || kinds.iter().any(|kind| {
783                *kind == Kind::Metadata.as_u16()
784                    || *kind == Kind::ContactList.as_u16()
785                    || *kind == Kind::MuteList.as_u16()
786            })
787    }
788
789    fn history_sync_plan_for(
790        config: &NostrMirrorConfig,
791        authors: usize,
792        kinds: &[u16],
793    ) -> HistorySyncPlan {
794        let author_batch_size = config.author_batch_size.max(1);
795        let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
796        let relay_page_size = 1_000;
797        let max_relay_pages = 10;
798
799        if Self::is_metadata_only_history_sync(kinds) {
800            return HistorySyncPlan {
801                relay_fetch_mode: RelayFetchMode::AuthorBatches,
802                author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
803                per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
804                relay_page_size,
805                max_relay_pages,
806            };
807        }
808
809        if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
810            return HistorySyncPlan {
811                relay_fetch_mode: RelayFetchMode::GlobalRecent,
812                author_batch_size,
813                per_author_event_limit: per_author_event_limit
814                    .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
815                    .max(1),
816                relay_page_size,
817                max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
818            };
819        }
820
821        HistorySyncPlan {
822            relay_fetch_mode: RelayFetchMode::AuthorBatches,
823            author_batch_size,
824            per_author_event_limit,
825            relay_page_size,
826            max_relay_pages,
827        }
828    }
829
830    fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
831        Self::history_sync_plan_for(&self.config, authors, kinds)
832    }
833
834    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
835        self.history_sync_authors_with_kinds(authors, &self.config.kinds)
836            .await
837    }
838
839    async fn history_sync_authors_with_kinds(
840        &self,
841        authors: Vec<String>,
842        kinds: &[u16],
843    ) -> Result<()> {
844        self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
845            .await
846    }
847
848    async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
849        let Some(distance) = self.full_text_note_history_follow_distance() else {
850            return Ok(());
851        };
852        info!(
853            "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
854        );
855        let authors = self.collect_authors_with_max_distance(distance)?;
856        self.history_sync_full_text_notes_for_authors(authors).await
857    }
858
859    async fn history_sync_recent_text_notes_for_reachable_authors(&self) -> Result<()> {
860        let Some(distance) = self.full_text_note_history_follow_distance() else {
861            return Ok(());
862        };
863        info!(
864            "Nostr mirror recent text content catch-up author collection starting: max_follow_distance={distance}"
865        );
866        let authors = self.collect_authors_with_max_distance(distance)?;
867        self.history_sync_recent_text_notes_for_authors(authors)
868            .await
869    }
870
871    async fn history_sync_recent_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
872        if authors.is_empty() || self.full_text_note_history_follow_distance().is_none() {
873            return Ok(());
874        }
875
876        info!(
877            "Nostr mirror recent text content catch-up starting: authors={}",
878            authors.len()
879        );
880        let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
881        let chunk_size = self
882            .config
883            .author_batch_size
884            .max(1)
885            .saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER + 1);
886        self.history_sync_authors_chunked(
887            authors,
888            |current_root, author_chunk| async move {
889                self.history_sync_author_chunk(current_root, author_chunk, &kinds, false, None)
890                    .await
891            },
892            false,
893            Some(chunk_size),
894        )
895        .await
896    }
897
898    async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
899        let Some(distance) = self.full_text_note_history_follow_distance() else {
900            return Ok(());
901        };
902        let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
903            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
904            return Ok(());
905        };
906        let mut close_authors = Vec::new();
907        for author in authors {
908            let Ok(pubkey) = hex::decode(&author) else {
909                continue;
910            };
911            let Ok(pubkey) = <[u8; 32]>::try_from(pubkey.as_slice()) else {
912                continue;
913            };
914            if self
915                .graph_store
916                .follow_distance(&pubkey)?
917                .is_some_and(|actual_distance| actual_distance <= distance)
918            {
919                close_authors.push(author);
920            }
921        }
922        if close_authors.is_empty() {
923            return Ok(());
924        }
925
926        info!(
927            "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
928            close_authors.len(),
929            distance,
930            max_relay_pages
931        );
932        let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
933        self.history_sync_authors_with_kinds_and_mode(
934            close_authors,
935            &kinds,
936            true,
937            Some(max_relay_pages),
938        )
939        .await
940    }
941
942    async fn history_sync_authors_with_kinds_and_mode(
943        &self,
944        authors: Vec<String>,
945        kinds: &[u16],
946        full_author_history: bool,
947        max_relay_pages: Option<usize>,
948    ) -> Result<()> {
949        let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
950        self.history_sync_authors_chunked(
951            authors,
952            |current_root, author_chunk| async move {
953                self.history_sync_author_chunk(
954                    current_root,
955                    author_chunk,
956                    kinds,
957                    full_author_history,
958                    max_relay_pages,
959                )
960                .await
961            },
962            update_profile_and_graph,
963            None,
964        )
965        .await
966    }
967
968    async fn history_sync_authors_chunked<F, Fut>(
969        &self,
970        authors: Vec<String>,
971        mut run_chunk: F,
972        update_profile_and_graph: bool,
973        chunk_size_override: Option<usize>,
974    ) -> Result<()>
975    where
976        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
977        Fut: std::future::Future<Output = Result<CrawlReport>>,
978    {
979        if authors.is_empty() {
980            return Ok(());
981        }
982
983        info!(
984            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
985            authors.len(),
986            self.config.relays.len(),
987            self.config.require_negentropy
988        );
989
990        let mut current_root = self.graph_store.public_events_root_for_write()?;
991        let mut last_error = None;
992        let mut applied_chunks = 0usize;
993        let mut failed_chunks = 0usize;
994        let chunk_size = chunk_size_override
995            .unwrap_or(self.config.history_sync_author_chunk_size)
996            .max(1);
997        let total_chunks = authors.len().div_ceil(chunk_size);
998
999        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
1000            let author_chunk = author_chunk.to_vec();
1001            let author_count = author_chunk.len();
1002            info!(
1003                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
1004                chunk_index + 1,
1005                total_chunks,
1006                author_count
1007            );
1008            let mut report = match run_chunk(current_root.clone(), author_chunk.clone()).await {
1009                Ok(report) => report,
1010                Err(err) => {
1011                    failed_chunks = failed_chunks.saturating_add(1);
1012                    warn!(
1013                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
1014                        chunk_index + 1,
1015                        total_chunks,
1016                        author_count,
1017                        err
1018                    );
1019                    last_error = Some(err);
1020                    continue;
1021                }
1022            };
1023
1024            let latest_root = self.graph_store.public_events_root_for_write()?;
1025            if latest_root != current_root {
1026                info!(
1027                    "Nostr mirror history sync root advanced while chunk was fetching; merging chunk into latest root: chunk={}/{} authors={} events_applied={}",
1028                    chunk_index + 1,
1029                    total_chunks,
1030                    author_count,
1031                    report.applied_events.len()
1032                );
1033                if report.applied_events.is_empty() {
1034                    report.root = latest_root.clone();
1035                } else {
1036                    let event_store = NostrEventStore::new(self.store.store_arc());
1037                    report.root = event_store
1038                        .build(latest_root.as_ref(), report.applied_events.clone())
1039                        .await
1040                        .context("merge history chunk into latest mirrored event root")?;
1041                }
1042                current_root = latest_root;
1043            }
1044
1045            if report.root != current_root {
1046                self.apply_history_root_with_options(
1047                    report.root.as_ref(),
1048                    update_profile_and_graph,
1049                    true,
1050                )
1051                .await?;
1052                current_root = report.root.clone();
1053                info!(
1054                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
1055                    chunk_index + 1,
1056                    total_chunks,
1057                    report.authors_processed,
1058                    report.events_selected,
1059                    report.events_seen
1060                );
1061            }
1062            applied_chunks = applied_chunks.saturating_add(1);
1063        }
1064
1065        if applied_chunks == 0 {
1066            return Err(last_error
1067                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
1068                .context("run mirror history sync"));
1069        }
1070        if failed_chunks > 0 {
1071            warn!(
1072                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
1073                applied_chunks, failed_chunks
1074            );
1075        }
1076        Ok(())
1077    }
1078
1079    async fn history_sync_author_chunk(
1080        &self,
1081        current_root: Option<hashtree_core::Cid>,
1082        authors: Vec<String>,
1083        kinds: &[u16],
1084        full_author_history: bool,
1085        max_relay_pages: Option<usize>,
1086    ) -> Result<CrawlReport> {
1087        let mut last_error = None;
1088        let mut report = None;
1089        let mut plan = self.history_sync_plan(authors.len(), kinds);
1090        if full_author_history {
1091            plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
1092            plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
1093        }
1094        for attempt in 0..3 {
1095            let mut last_logged_authors = 0usize;
1096            let bridge = NostrBridge::new(
1097                self.store.store_arc(),
1098                CrawlConfig {
1099                    relays: self.config.relays.clone(),
1100                    author_allowlist: Some(authors.clone()),
1101                    max_live_bytes: None,
1102                    max_events_seen: None,
1103                    max_authors: None,
1104                    max_follow_distance: None,
1105                    author_batch_size: plan.author_batch_size,
1106                    per_author_event_limit: plan.per_author_event_limit,
1107                    per_author_live_bytes: None,
1108                    fetch_timeout: self.config.fetch_timeout,
1109                    kinds: Some(kinds.to_vec()),
1110                    relay_fetch_mode: plan.relay_fetch_mode,
1111                    require_negentropy: self.config.require_negentropy,
1112                    relay_event_max_size: self.config.relay_event_max_size,
1113                    relay_page_size: plan.relay_page_size,
1114                    max_relay_pages: plan.max_relay_pages,
1115                    full_author_history,
1116                },
1117            );
1118
1119            match bridge
1120                .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
1121                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
1122                    let should_log = progress.authors_processed == progress.authors_considered
1123                        || progress.authors_processed == 0
1124                        || progress
1125                            .authors_processed
1126                            .saturating_sub(last_logged_authors)
1127                            >= log_interval;
1128                    if should_log {
1129                        last_logged_authors = progress.authors_processed;
1130                        info!(
1131                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
1132                            progress.authors_processed,
1133                            progress.authors_considered,
1134                            progress.events_selected,
1135                            progress.events_seen
1136                        );
1137                    }
1138                })
1139                .await
1140            {
1141                Ok(next_report) => {
1142                    report = Some(next_report);
1143                    break;
1144                }
1145                Err(err) => {
1146                    last_error = Some(err);
1147                    if attempt < 2 {
1148                        tokio::time::sleep(Duration::from_millis(500)).await;
1149                    }
1150                }
1151            }
1152        }
1153        report
1154            .ok_or_else(|| last_error.expect("history sync retry captured error"))
1155            .context("run mirror history sync")
1156    }
1157
1158    #[cfg(test)]
1159    async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
1160        self.apply_history_root_with_options(root, true, true).await
1161    }
1162
1163    async fn apply_history_root_with_options(
1164        &self,
1165        root: Option<&hashtree_core::Cid>,
1166        update_profile_and_graph: bool,
1167        publish_roots: bool,
1168    ) -> Result<()> {
1169        self.graph_store.write_public_events_root(root)?;
1170        let Some(root) = root else {
1171            return Ok(());
1172        };
1173
1174        self.note_public_events_root_change()?;
1175        if update_profile_and_graph {
1176            let event_store = NostrEventStore::new(self.store.store_arc());
1177            let events = event_store
1178                .list_recent_lossy(Some(root), ListEventsOptions::default())
1179                .await
1180                .context("list trusted mirrored events")?
1181                .into_iter()
1182                .map(socialgraph::stored_event_to_nostr_event)
1183                .collect::<Result<Vec<_>>>()?;
1184
1185            self.graph_store
1186                .rebuild_profile_index_for_events(&events)
1187                .context("rebuild mirrored profile search index")?;
1188            socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
1189                .context("sync mirrored social graph state")?;
1190            self.note_profile_search_root_change()?;
1191            self.note_profiles_by_pubkey_root_change()?;
1192        }
1193        if !publish_roots {
1194            return Ok(());
1195        }
1196        let (event_result, profile_search_result, profiles_by_pubkey_result) = self
1197            .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
1198            .await;
1199        if let Err(err) = event_result {
1200            warn!(
1201                "Nostr mirror event-root publish failed after root update: {:#}",
1202                err
1203            );
1204        }
1205        if let Err(err) = profile_search_result {
1206            warn!(
1207                "Nostr mirror profile-search publish failed after root update: {:#}",
1208                err
1209            );
1210        }
1211        if let Err(err) = profiles_by_pubkey_result {
1212            warn!(
1213                "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
1214                err
1215            );
1216        }
1217        Ok(())
1218    }
1219
1220    async fn subscribe_authors_since(
1221        &self,
1222        authors: &[String],
1223        since: Timestamp,
1224        subscribed_authors: &mut HashSet<String>,
1225    ) -> Result<()> {
1226        let new_authors = authors
1227            .iter()
1228            .filter(|author| !subscribed_authors.contains(*author))
1229            .cloned()
1230            .collect::<Vec<_>>();
1231        if new_authors.is_empty() {
1232            return Ok(());
1233        }
1234
1235        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1236            let pubkeys = chunk
1237                .iter()
1238                .filter_map(|author| PublicKey::from_hex(author).ok())
1239                .collect::<Vec<_>>();
1240            if pubkeys.is_empty() {
1241                continue;
1242            }
1243
1244            let filter = Filter::new()
1245                .authors(pubkeys)
1246                .kinds(self.config.kinds.iter().copied().map(Kind::from))
1247                .since(since);
1248
1249            if let Err(err) = self.client.subscribe(vec![filter], None).await {
1250                warn!(
1251                    "Nostr mirror author subscription failed: authors={} error={:#}",
1252                    chunk.len(),
1253                    err
1254                );
1255                continue;
1256            }
1257            subscribed_authors.extend(chunk.iter().cloned());
1258        }
1259        Ok(())
1260    }
1261
1262    fn ingest_live_event(&self, event: &Event) -> Result<()> {
1263        self.pending_live_events
1264            .lock()
1265            .expect("pending live events")
1266            .insert(event.id.to_hex(), event.clone());
1267        Ok(())
1268    }
1269
1270    async fn flush_live_events(&self) -> Result<()> {
1271        let pending = {
1272            let mut pending = self
1273                .pending_live_events
1274                .lock()
1275                .expect("pending live events");
1276            if pending.is_empty() {
1277                return Ok(());
1278            }
1279            std::mem::take(&mut *pending)
1280        };
1281        let events = pending.into_values().collect::<Vec<_>>();
1282        let event_count = events.len();
1283        let previous_event_root = self.graph_store.public_events_root()?;
1284        let previous_profile_search_root = self.graph_store.profile_search_root()?;
1285        let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1286
1287        socialgraph::ingest_parsed_events_with_storage_class(
1288            self.graph_store.as_ref(),
1289            &events,
1290            socialgraph::EventStorageClass::Public,
1291        )
1292        .context("ingest live mirrored event batch")?;
1293
1294        let next_event_root = self.graph_store.public_events_root()?;
1295        let next_profile_search_root = self.graph_store.profile_search_root()?;
1296        let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1297        let event_root_changed = next_event_root != previous_event_root;
1298        let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1299        let profiles_by_pubkey_root_changed =
1300            next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1301
1302        if event_root_changed {
1303            self.note_public_events_root_change()?;
1304        }
1305        if profile_search_root_changed {
1306            self.note_profile_search_root_change()?;
1307        }
1308        if profiles_by_pubkey_root_changed {
1309            self.note_profiles_by_pubkey_root_change()?;
1310        }
1311        if profile_search_root_changed {
1312            self.maybe_publish_profile_search_root(true).await?;
1313        }
1314        if profiles_by_pubkey_root_changed {
1315            self.maybe_publish_profiles_by_pubkey_root(true).await?;
1316        }
1317        if event_root_changed {
1318            self.maybe_publish_event_root(true).await?;
1319        }
1320        info!(
1321            "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1322            event_count,
1323            event_root_changed,
1324            profile_search_root_changed,
1325            profiles_by_pubkey_root_changed
1326        );
1327        Ok(())
1328    }
1329
1330    fn note_public_events_root_change(&self) -> Result<()> {
1331        let root = self.graph_store.public_events_root()?;
1332        Self::note_root_change(
1333            self.config.published_event_tree_name.as_deref(),
1334            &self.event_publish_state,
1335            root,
1336        )
1337    }
1338
1339    fn note_profile_search_root_change(&self) -> Result<()> {
1340        let root = self.graph_store.profile_search_root()?;
1341        Self::note_root_change(
1342            self.config.published_profile_search_tree_name.as_deref(),
1343            &self.profile_search_publish_state,
1344            root,
1345        )
1346    }
1347
1348    fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1349        let root = self.graph_store.profiles_by_pubkey_root()?;
1350        Self::note_root_change(
1351            self.config
1352                .published_profiles_by_pubkey_tree_name
1353                .as_deref(),
1354            &self.profiles_by_pubkey_publish_state,
1355            root,
1356        )
1357    }
1358
1359    fn note_root_change(
1360        tree_name: Option<&str>,
1361        publish_state: &Arc<Mutex<RootPublishState>>,
1362        root: Option<hashtree_core::Cid>,
1363    ) -> Result<()> {
1364        let Some(_tree_name) = tree_name else {
1365            return Ok(());
1366        };
1367
1368        let mut state = publish_state.lock().expect("root publish state");
1369        let now = Instant::now();
1370
1371        if state.pending_root == root {
1372            return Ok(());
1373        }
1374
1375        state.pending_root = root;
1376        state.last_upload_failed_at = None;
1377        state.last_upload_error = None;
1378        state.last_changed_at = Some(now);
1379        if state.dirty_since.is_none() {
1380            state.dirty_since = Some(now);
1381        }
1382        Ok(())
1383    }
1384
1385    async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1386        self.ensure_public_events_root_is_publishable().await?;
1387        if self.take_missing_blob_event_upload_error() {
1388            return self.rebuild_event_indexes_after_missing_blobs(force).await;
1389        }
1390        let result = self
1391            .maybe_publish_root(
1392                self.config.published_event_tree_name.as_deref(),
1393                &self.event_publish_state,
1394                "event root",
1395                force,
1396            )
1397            .await;
1398        let Err(error) = result else {
1399            if self.take_missing_blob_event_upload_error() {
1400                return self.rebuild_event_indexes_after_missing_blobs(force).await;
1401            }
1402            return Ok(());
1403        };
1404        if !is_missing_local_blob_push_error(&error) {
1405            return Err(error);
1406        }
1407
1408        self.rebuild_event_indexes_after_missing_blobs(force).await
1409    }
1410
1411    fn take_missing_blob_event_upload_error(&self) -> bool {
1412        let mut state = self
1413            .event_publish_state
1414            .lock()
1415            .expect("event root publish state");
1416        if state.upload_in_progress_root.is_some() {
1417            return false;
1418        }
1419        if !state.missing_blob_rebuild_required {
1420            return false;
1421        }
1422        state.missing_blob_rebuild_required = false;
1423        state.last_upload_error = None;
1424        state.last_upload_failed_at = None;
1425        true
1426    }
1427
1428    async fn rebuild_event_indexes_after_missing_blobs(&self, force: bool) -> Result<()> {
1429        warn!(
1430            "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1431        );
1432        let (public_count, ambient_count) = self
1433            .graph_store
1434            .rebuild_event_indexes_from_stored_events_async()
1435            .await
1436            .context("rebuild event indexes after missing event blobs")?;
1437        info!(
1438            "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1439            public_count, ambient_count
1440        );
1441        self.sync_publish_roots_from_store()?;
1442
1443        self.maybe_publish_root(
1444            self.config.published_event_tree_name.as_deref(),
1445            &self.event_publish_state,
1446            "event root",
1447            force,
1448        )
1449        .await
1450    }
1451
1452    async fn ensure_public_events_root_is_publishable(&self) -> Result<()> {
1453        let Some(root) = self.graph_store.public_events_root()? else {
1454            return Ok(());
1455        };
1456        let event_store = NostrEventStore::new(self.store.store_arc());
1457        if let Err(err) = event_store.validate_index_root(Some(&root)).await {
1458            warn!(
1459                "Nostr mirror refusing to publish invalid event index root {}; clearing trusted root: {}",
1460                hex::encode(root.hash),
1461                err
1462            );
1463            self.graph_store.write_public_events_root(None)?;
1464            self.note_public_events_root_change()?;
1465        }
1466        Ok(())
1467    }
1468
1469    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1470        self.maybe_publish_root(
1471            self.config.published_profile_search_tree_name.as_deref(),
1472            &self.profile_search_publish_state,
1473            "profile search root",
1474            force,
1475        )
1476        .await
1477    }
1478
1479    async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1480        self.maybe_publish_root(
1481            self.config
1482                .published_profiles_by_pubkey_tree_name
1483                .as_deref(),
1484            &self.profiles_by_pubkey_publish_state,
1485            "profiles-by-pubkey root",
1486            force,
1487        )
1488        .await
1489    }
1490
1491    async fn maybe_publish_root(
1492        &self,
1493        tree_name: Option<&str>,
1494        publish_state: &Arc<Mutex<RootPublishState>>,
1495        log_label: &str,
1496        force: bool,
1497    ) -> Result<()> {
1498        let Some(tree_name) = tree_name else {
1499            return Ok(());
1500        };
1501
1502        let pending_root = {
1503            let state = publish_state.lock().expect("root publish state");
1504            let Some(pending_root) = state.pending_root.clone() else {
1505                return Ok(());
1506            };
1507
1508            let now = Instant::now();
1509            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1510                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1511            });
1512            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1513                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1514            });
1515            if !force && !debounce_ready && !stale_ready {
1516                return Ok(());
1517            }
1518
1519            pending_root
1520        };
1521
1522        let upload_started =
1523            self.maybe_start_background_root_upload(&pending_root, publish_state, log_label);
1524        let upload_required = !self.config.blossom_write_servers.is_empty();
1525        let upload_ready = {
1526            let state = publish_state.lock().expect("root publish state");
1527            !upload_required || state.last_uploaded_root.as_ref() == Some(&pending_root)
1528        };
1529        let publish_before_upload_ready = force && upload_required && !upload_ready;
1530
1531        let mut successful_relays = Vec::new();
1532        let mut failed_relays = Vec::new();
1533        let mut published_now = false;
1534        let publish_required =
1535            self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1536        if publish_required {
1537            let Some(publish_client) = self.publish_client.as_ref() else {
1538                unreachable!("publish_required implies publish_client");
1539            };
1540            if !self.has_connected_publish_relay().await {
1541                return Ok(());
1542            }
1543            if !upload_ready && !publish_before_upload_ready {
1544                if upload_started {
1545                    info!(
1546                        "Nostr mirror uploading {} DAG before publish: tree={} hash={}",
1547                        log_label,
1548                        tree_name,
1549                        hex::encode(pending_root.hash),
1550                    );
1551                }
1552                return Ok(());
1553            }
1554            if publish_before_upload_ready {
1555                info!(
1556                    "Nostr mirror publishing {} before Blossom upload completes: tree={} hash={}",
1557                    log_label,
1558                    tree_name,
1559                    hex::encode(pending_root.hash),
1560                );
1561            }
1562
1563            let already_published = {
1564                let state = publish_state.lock().expect("root publish state");
1565                state.last_published_root.as_ref() == Some(&pending_root)
1566            };
1567            if !already_published {
1568                let publish_relays = self.config.publish_relays.clone();
1569                let latest_known_created_at = {
1570                    let state = publish_state.lock().expect("root publish state");
1571                    state.last_published_created_at
1572                };
1573                let publish_created_at = next_replaceable_created_at(
1574                    Timestamp::now(),
1575                    later_timestamp(
1576                        latest_known_created_at,
1577                        self.latest_root_event_created_at(tree_name).await,
1578                    ),
1579                );
1580                let event = publish_client
1581                    .sign_event_builder(Self::build_public_root_event(
1582                        tree_name,
1583                        &pending_root,
1584                        publish_created_at,
1585                    ))
1586                    .await
1587                    .with_context(|| format!("sign {log_label} event"))?;
1588                let publish_result = self
1589                    .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1590                    .await
1591                    .with_context(|| format!("publish {log_label} event"))?;
1592                successful_relays = publish_result.0;
1593                failed_relays = publish_result.1;
1594                if successful_relays.is_empty() {
1595                    let failure_summary = if failed_relays.is_empty() {
1596                        "no publish relays accepted the event".to_string()
1597                    } else {
1598                        failed_relays.join("; ")
1599                    };
1600                    anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1601                }
1602
1603                let mut state = publish_state.lock().expect("root publish state");
1604                if state.pending_root.as_ref() == Some(&pending_root) {
1605                    state.last_published_root = Some(pending_root.clone());
1606                    state.last_published_at = Some(Instant::now());
1607                    state.last_published_created_at = Some(event.created_at);
1608                }
1609                published_now = true;
1610            }
1611        }
1612
1613        {
1614            let mut state = publish_state.lock().expect("root publish state");
1615            if state.pending_root.as_ref() == Some(&pending_root) {
1616                let upload_satisfied = self.config.blossom_write_servers.is_empty()
1617                    || state.last_uploaded_root.as_ref() == Some(&pending_root);
1618                let publish_satisfied =
1619                    !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1620                if upload_satisfied && publish_satisfied {
1621                    state.dirty_since = None;
1622                }
1623            }
1624        }
1625
1626        if published_now {
1627            info!(
1628                "Nostr mirror published {}: tree={} hash={} relays={:?}",
1629                log_label,
1630                tree_name,
1631                hex::encode(pending_root.hash),
1632                successful_relays,
1633            );
1634        }
1635        if !failed_relays.is_empty() {
1636            warn!(
1637                "Nostr mirror publish had relay failures: tree={} failures={:?}",
1638                tree_name, failed_relays
1639            );
1640        }
1641        Ok(())
1642    }
1643
1644    fn maybe_start_background_root_upload(
1645        &self,
1646        pending_root: &hashtree_core::Cid,
1647        publish_state: &Arc<Mutex<RootPublishState>>,
1648        log_label: &str,
1649    ) -> bool {
1650        if self.config.blossom_write_servers.is_empty() {
1651            return false;
1652        }
1653
1654        {
1655            let mut state = publish_state.lock().expect("root publish state");
1656            if state.last_uploaded_root.as_ref() == Some(pending_root)
1657                || state.upload_in_progress_root.is_some()
1658            {
1659                return false;
1660            }
1661            if state
1662                .last_upload_failed_at
1663                .is_some_and(|failed_at| failed_at.elapsed() < MIRROR_ROOT_UPLOAD_RETRY_INTERVAL)
1664            {
1665                return false;
1666            }
1667            state.upload_in_progress_root = Some(pending_root.clone());
1668        }
1669
1670        let store = Arc::clone(&self.store);
1671        let servers = self.config.blossom_write_servers.clone();
1672        let root = pending_root.clone();
1673        let root_string = pending_root.to_string();
1674        let publish_state = Arc::clone(publish_state);
1675        let log_label = log_label.to_string();
1676        tokio::task::spawn_blocking(move || {
1677            let runtime = tokio::runtime::Builder::new_current_thread()
1678                .enable_all()
1679                .build()
1680                .expect("build nostr mirror root upload runtime");
1681            runtime.block_on(async move {
1682                let result =
1683                    background_blossom_push_with_store(store, &root_string, &servers).await;
1684                let mut state = publish_state.lock().expect("root publish state");
1685                if state.upload_in_progress_root.as_ref() == Some(&root) {
1686                    state.upload_in_progress_root = None;
1687                }
1688                match result {
1689                    Ok(()) => {
1690                        if state.pending_root.as_ref() == Some(&root) {
1691                            state.last_uploaded_root = Some(root.clone());
1692                            state.last_uploaded_at = Some(Instant::now());
1693                            state.last_upload_failed_at = None;
1694                            state.last_upload_error = None;
1695                            state.missing_blob_rebuild_required = false;
1696                        }
1697                        info!(
1698                            "Nostr mirror uploaded {} DAG to Blossom: hash={}",
1699                            log_label,
1700                            hex::encode(root.hash)
1701                        );
1702                    }
1703                    Err(err) => {
1704                        if state.pending_root.as_ref() == Some(&root) {
1705                            state.last_upload_failed_at = Some(Instant::now());
1706                            state.last_upload_error = Some(format!("{err:#}"));
1707                        }
1708                        if is_missing_local_blob_message(&format!("{err:#}")) {
1709                            state.missing_blob_rebuild_required = true;
1710                        }
1711                        warn!(
1712                            "Nostr mirror {} DAG upload failed: hash={} error={:#}",
1713                            log_label,
1714                            hex::encode(root.hash),
1715                            err
1716                        );
1717                    }
1718                }
1719            });
1720        });
1721
1722        true
1723    }
1724
1725    async fn publish_root_event_to_relays(
1726        &self,
1727        publish_client: &Client,
1728        relays: &[String],
1729        event: &Event,
1730    ) -> Result<(Vec<String>, Vec<String>)> {
1731        let mut successful_relays = Vec::new();
1732        let mut failed_relays = Vec::new();
1733
1734        match publish_client
1735            .send_event_to(relays.iter().map(|relay| relay.as_str()), event.clone())
1736            .await
1737        {
1738            Ok(output) => {
1739                for relay in relays {
1740                    let relay_url = relay.trim_end_matches('/');
1741                    if output
1742                        .success
1743                        .iter()
1744                        .any(|url| url.as_str().trim_end_matches('/') == relay_url)
1745                    {
1746                        successful_relays.push(relay.clone());
1747                    }
1748                }
1749                failed_relays.extend(output.failed.into_iter().map(|(url, reason)| match reason {
1750                    Some(reason) => format!("{url}: {reason}"),
1751                    None => format!("{url}: relay rejected publish"),
1752                }));
1753            }
1754            Err(err) => {
1755                failed_relays.push(format!("publish relays: {err}"));
1756            }
1757        }
1758
1759        Ok((successful_relays, failed_relays))
1760    }
1761
1762    async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1763        let publish_client = self.publish_client.as_ref()?;
1764        let author = self.publish_pubkey?;
1765        let events = publish_client
1766            .get_events_of(
1767                vec![Self::build_public_root_filter(author, tree_name)],
1768                EventSource::relays(Some(self.config.fetch_timeout)),
1769            )
1770            .await
1771            .ok()?;
1772        events
1773            .iter()
1774            .filter(|event| Self::matches_public_root_event(event, tree_name))
1775            .max_by_key(|event| (event.created_at, event.id))
1776            .map(|event| event.created_at)
1777    }
1778
1779    fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1780        Filter::new()
1781            .kind(Kind::Custom(30078))
1782            .author(author)
1783            .custom_tag(
1784                SingleLetterTag::lowercase(Alphabet::D),
1785                vec![tree_name.to_string()],
1786            )
1787            .custom_tag(
1788                SingleLetterTag::lowercase(Alphabet::L),
1789                vec!["hashtree".to_string()],
1790            )
1791            .limit(50)
1792    }
1793
1794    fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1795        event.kind == Kind::Custom(30078)
1796            && event.tags.iter().any(|tag| {
1797                let values = tag.as_slice();
1798                values.first().is_some_and(|value| value == "d")
1799                    && values.get(1).is_some_and(|value| value == tree_name)
1800            })
1801            && event.tags.iter().any(|tag| {
1802                let values = tag.as_slice();
1803                values.first().is_some_and(|value| value == "l")
1804                    && values.get(1).is_some_and(|value| value == "hashtree")
1805            })
1806    }
1807
1808    fn build_public_root_event(
1809        tree_name: &str,
1810        cid: &hashtree_core::Cid,
1811        created_at: Timestamp,
1812    ) -> EventBuilder {
1813        let mut tags = vec![
1814            Tag::identifier(tree_name.to_string()),
1815            Tag::custom(
1816                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1817                vec!["hashtree"],
1818            ),
1819            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1820        ];
1821        if let Some(key) = cid.key {
1822            tags.push(Tag::custom(
1823                TagKind::Custom("key".into()),
1824                vec![hex::encode(key)],
1825            ));
1826        }
1827
1828        EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1829    }
1830}
1831
1832fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
1833    error
1834        .chain()
1835        .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
1836}
1837
1838fn is_missing_local_blob_message(message: &str) -> bool {
1839    message.contains(MISSING_LOCAL_BLOB_PUSH_ERROR)
1840}
1841
1842fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1843    match (left, right) {
1844        (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1845        (Some(left), None) => Some(left),
1846        (None, Some(right)) => Some(right),
1847        (None, None) => None,
1848    }
1849}
1850
1851fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1852    match latest_existing {
1853        Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1854        _ => now,
1855    }
1856}
1857
1858#[cfg(test)]
1859mod tests;