Skip to main content

hashtree_cli/
nostr_mirror.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13    Timestamp,
14};
15use nostr_sdk::{
16    pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17    RelayStatus,
18};
19use tokio::sync::{watch, Mutex as AsyncMutex};
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// Timing constants come in pairs: a production value under `cfg(not(test))` and a much
// shorter value under `cfg(test)`.

/// Pause at the very start of `run`, before anything else happens.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// Second pause in `run`, taken right after the startup delay.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

/// Period of the author-refresh tick in the main loop.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

/// Minimum time between two reconnect-triggered history syncs.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

/// Event kind treated as text content alongside `Kind::TextNote`.
const KIND_LONG_FORM_CONTENT: u16 = 30_023;
/// Default value of `NostrMirrorConfig::kinds`.
const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
// Default names for the three published trees (see `NostrMirrorConfig::default`).
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
// Limits applied by `history_sync_plan_for` when only metadata is being synced.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
// Defaults for the full-text-note history settings in `NostrMirrorConfig`.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
// `history_sync_plan_for` compares the author count against
// `author_batch_size * LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER`.
// NOTE(review): the two limits below are presumably applied in that branch — confirm.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

/// Minimum time between two missing-profile backfill runs.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

/// Period of the publish tick in the main loop.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

// NOTE(review): the usage of the next two constants is outside this view; going by the
// names they bound root-publish staleness and root-upload retries — confirm.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);

#[cfg(not(test))]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
#[cfg(test)]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);

// NOTE(review): presumably matched against blossom push error text — confirm at the use site.
const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
81
/// Settings for [`BackgroundNostrMirror`].
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relays added to the mirror's read client; `run` returns immediately when empty.
    pub relays: Vec<String>,
    /// Relays added to the publish client; no publish client is built when empty.
    pub publish_relays: Vec<String>,
    /// NOTE(review): usage is outside this view; presumably blossom upload targets.
    pub blossom_write_servers: Vec<String>,
    /// Largest follow distance (inclusive) whose users are mirrored; `0` makes `run` a no-op.
    pub max_follow_distance: u32,
    /// Threshold handed to the social graph's overmute check when collecting authors.
    pub overmute_threshold: f64,
    /// Author batch size for history sync plans (clamped to at least 1).
    pub author_batch_size: usize,
    /// Author chunk size for history sync (logged at startup clamped to at least 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit for history sync plans (clamped to at least 1).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors per missing-profile backfill; `0` disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// NOTE(review): usage is outside this view; presumably the relay fetch timeout.
    pub fetch_timeout: Duration,
    /// When set, applied as the read client's maximum relay event size.
    pub relay_event_max_size: Option<u32>,
    /// Logged as `negentropy_only`; enforcement happens outside this view.
    pub require_negentropy: bool,
    /// Event kinds to mirror.
    pub kinds: Vec<u16>,
    /// Whether `run` spawns a history sync for the initial author set.
    pub history_sync_on_start: bool,
    /// Whether a relay transitioning to connected triggers a catch-up history sync.
    pub history_sync_on_reconnect: bool,
    /// Follow distance for full text-note history, capped at `max_follow_distance`;
    /// `None` yields no distance.
    pub full_text_note_history_follow_distance: Option<u32>,
    /// Relay page cap for full text-note history; `0` is mapped to `None` by
    /// `full_text_note_history_max_relay_pages_for_config`.
    pub full_text_note_history_max_relay_pages: usize,
    /// Name for the published event tree.
    pub published_event_tree_name: Option<String>,
    /// Name for the published profile-search tree.
    pub published_profile_search_tree_name: Option<String>,
    /// Name for the published profiles-by-pubkey tree.
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
105
106impl Default for NostrMirrorConfig {
107    fn default() -> Self {
108        Self {
109            relays: Vec::new(),
110            publish_relays: Vec::new(),
111            blossom_write_servers: Vec::new(),
112            max_follow_distance: 2,
113            overmute_threshold: 1.0,
114            author_batch_size: 256,
115            history_sync_author_chunk_size: 5_000,
116            history_sync_per_author_event_limit: 256,
117            missing_profile_backfill_batch_size: 5_000,
118            fetch_timeout: Duration::from_secs(15),
119            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
120            require_negentropy: false,
121            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
122            history_sync_on_start: true,
123            history_sync_on_reconnect: true,
124            full_text_note_history_follow_distance: Some(
125                DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
126            ),
127            full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
128            published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
129            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
130            published_profiles_by_pubkey_tree_name: Some(
131                DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
132            ),
133        }
134    }
135}
136
/// Publish/upload bookkeeping for a single root.
///
/// `BackgroundNostrMirror` keeps one instance each for the event, profile-search and
/// profiles-by-pubkey roots. All fields start out empty/`false` via `Default`.
///
/// NOTE(review): the code that reads and writes these fields is outside this view; the
/// field names are the only description of their meaning available here.
#[derive(Debug, Default)]
struct RootPublishState {
    pending_root: Option<hashtree_core::Cid>,
    last_changed_at: Option<Instant>,
    dirty_since: Option<Instant>,
    last_published_root: Option<hashtree_core::Cid>,
    last_published_at: Option<Instant>,
    last_published_created_at: Option<Timestamp>,
    last_uploaded_root: Option<hashtree_core::Cid>,
    last_uploaded_at: Option<Instant>,
    upload_in_progress_root: Option<hashtree_core::Cid>,
    last_upload_failed_at: Option<Instant>,
    last_upload_error: Option<String>,
    missing_blob_rebuild_required: bool,
}
152
/// Fetch parameters for one history sync, produced by `history_sync_plan_for` from the
/// mirror configuration, the number of authors and the requested kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HistorySyncPlan {
    /// How relays are queried (e.g. `RelayFetchMode::AuthorBatches`).
    relay_fetch_mode: RelayFetchMode,
    /// Number of authors per batch.
    author_batch_size: usize,
    /// Event limit per author.
    per_author_event_limit: usize,
    /// Relay page size.
    relay_page_size: usize,
    /// Maximum number of relay pages.
    max_relay_pages: usize,
}
161
/// Background task that mirrors Nostr events for social-graph authors and publishes
/// roots through an optional publish client.
pub struct BackgroundNostrMirror {
    config: NostrMirrorConfig,
    store: Arc<HashtreeStore>,
    /// Social graph used to pick authors by follow distance and to look up profiles.
    graph_store: Arc<SocialGraphStore>,
    /// Read client connected to `config.relays` (created with freshly generated keys).
    client: Client,
    /// Client for `config.publish_relays`; `None` without publish keys or publish relays.
    publish_client: Option<Client>,
    /// Public key of the publish keys, when publish keys were supplied.
    publish_pubkey: Option<PublicKey>,
    // One publish-state tracker per published root.
    event_publish_state: Arc<Mutex<RootPublishState>>,
    profile_search_publish_state: Arc<Mutex<RootPublishState>>,
    profiles_by_pubkey_publish_state: Arc<Mutex<RootPublishState>>,
    /// Live events waiting for `flush_live_events`.
    /// NOTE(review): key is presumably the event id — confirm in `ingest_live_event`.
    pending_live_events: Mutex<BTreeMap<String, Event>>,
    /// Index into the author list where the next missing-profile scan resumes.
    missing_profile_cursor: Mutex<usize>,
    /// Serializes history syncs; spawned sync tasks either wait on it or skip when busy.
    history_sync_lock: AsyncMutex<()>,
    /// Shutdown flag channel: `shutdown` sends `true`, `run` watches the receiver.
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}
178
179impl BackgroundNostrMirror {
180    pub async fn new(
181        config: NostrMirrorConfig,
182        store: Arc<HashtreeStore>,
183        graph_store: Arc<SocialGraphStore>,
184        publish_keys: Option<Keys>,
185    ) -> Result<Self> {
186        let client = if let Some(max_size) = config.relay_event_max_size {
187            let mut limits = RelayLimits::default();
188            limits.events.max_size = Some(max_size);
189            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
190        } else {
191            Client::new(Keys::generate())
192        };
193        for relay in &config.relays {
194            client
195                .add_relay(relay)
196                .await
197                .with_context(|| format!("add mirror relay {relay}"))?;
198        }
199        client.connect().await;
200
201        let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
202        let publish_client = if let Some(keys) = publish_keys {
203            if config.publish_relays.is_empty() {
204                None
205            } else {
206                let client = Client::with_opts(keys, Options::new().wait_for_send(false));
207                for relay in &config.publish_relays {
208                    client
209                        .add_relay(relay)
210                        .await
211                        .with_context(|| format!("add mirror publish relay {relay}"))?;
212                }
213                client.connect().await;
214                Some(client)
215            }
216        } else {
217            None
218        };
219
220        let (shutdown_tx, shutdown_rx) = watch::channel(false);
221        Ok(Self {
222            config,
223            store,
224            graph_store,
225            client,
226            publish_client,
227            publish_pubkey,
228            event_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
229            profile_search_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
230            profiles_by_pubkey_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
231            pending_live_events: Mutex::new(BTreeMap::new()),
232            missing_profile_cursor: Mutex::new(0),
233            history_sync_lock: AsyncMutex::new(()),
234            shutdown_tx,
235            shutdown_rx,
236        })
237    }
238
239    pub fn shutdown(&self) {
240        let _ = self.shutdown_tx.send(true);
241    }
242
    /// Runs the three root-change checks in order: public events, profile search, then
    /// profiles-by-pubkey.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first check that fails.
    fn sync_publish_roots_from_store(&self) -> Result<()> {
        self.note_public_events_root_change()?;
        self.note_profile_search_root_change()?;
        self.note_profiles_by_pubkey_root_change()?;
        Ok(())
    }
249
250    async fn publish_pending_roots(
251        &self,
252        force_event: bool,
253        force_profile_search: bool,
254        force_profiles_by_pubkey: bool,
255    ) -> (Result<()>, Result<()>, Result<()>) {
256        tokio::join!(
257            self.maybe_publish_event_root(force_event),
258            self.maybe_publish_profile_search_root(force_profile_search),
259            self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
260        )
261    }
262
263    async fn publish_priority_roots(
264        &self,
265        force_event: bool,
266        force_profile_search: bool,
267        force_profiles_by_pubkey: bool,
268    ) -> (Result<()>, Result<()>, Result<()>) {
269        let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
270            async {
271                if force_profile_search {
272                    self.maybe_publish_profile_search_root(true).await
273                } else {
274                    Ok(())
275                }
276            },
277            async {
278                if force_profiles_by_pubkey {
279                    self.maybe_publish_profiles_by_pubkey_root(true).await
280                } else {
281                    Ok(())
282                }
283            },
284        );
285        let event_result = if force_event {
286            self.maybe_publish_event_root(true).await
287        } else {
288            Ok(())
289        };
290        (
291            event_result,
292            profile_search_result,
293            profiles_by_pubkey_result,
294        )
295    }
296
297    pub async fn run(self: Arc<Self>) -> Result<()> {
298        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
299            return Ok(());
300        }
301
302        info!(
303            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
304            self.config.relays.len(),
305            self.config.max_follow_distance,
306            self.config.require_negentropy,
307            self.config.kinds,
308            self.config.history_sync_author_chunk_size.max(1),
309            self.config.history_sync_on_start,
310            self.config.history_sync_on_reconnect
311        );
312
313        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
314        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
315        let live_since = Timestamp::now();
316        self.sync_publish_roots_from_store()?;
317        let (event_result, profile_search_result, profiles_by_pubkey_result) =
318            self.publish_priority_roots(true, true, true).await;
319        if let Err(err) = event_result {
320            warn!(
321                "Nostr mirror event-root publish failed on startup: {:#}",
322                err
323            );
324        }
325        if let Err(err) = profile_search_result {
326            warn!(
327                "Nostr mirror profile-search publish failed on startup: {:#}",
328                err
329            );
330        }
331        if let Err(err) = profiles_by_pubkey_result {
332            warn!(
333                "Nostr mirror profiles-by-pubkey publish failed on startup: {:#}",
334                err
335            );
336        }
337
338        let initial_authors = self.collect_authors()?;
339        if initial_authors.is_empty() {
340            info!("Nostr mirror: no social-graph authors to mirror yet");
341        }
342
343        let mut subscribed_authors = HashSet::new();
344        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
345            .await?;
346
347        if !initial_authors.is_empty() && self.config.history_sync_on_start {
348            self.spawn_startup_history_sync(initial_authors.clone());
349        }
350
351        let mut relay_statuses = self.capture_relay_statuses().await;
352        let mut last_reconnect_history_sync_at: Option<Instant> = None;
353        let mut last_missing_profile_backfill_at: Option<Instant> = None;
354        let mut notifications = self.client.notifications();
355        let mut shutdown_rx = self.shutdown_rx.clone();
356        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
357        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
358
359        loop {
360            tokio::select! {
361                _ = shutdown_rx.changed() => {
362                    if *shutdown_rx.borrow() {
363                        break;
364                    }
365                }
366                _ = refresh_interval.tick() => {
367                    let authors = self.collect_authors()?;
368                    let new_authors = authors
369                        .into_iter()
370                        .filter(|author| !subscribed_authors.contains(author))
371                        .collect::<Vec<_>>();
372                    if !new_authors.is_empty() {
373                        debug!(
374                            "Nostr mirror discovered {} newly reachable author(s)",
375                            new_authors.len()
376                        );
377                        self.subscribe_authors_since(
378                            &new_authors,
379                            Timestamp::now(),
380                            &mut subscribed_authors,
381                        )
382                        .await?;
383                        self.spawn_author_history_sync(
384                            "new-author catch-up",
385                            new_authors.clone(),
386                            true,
387                            true,
388                        );
389                    }
390                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
391                        let missing_profile_authors = self.collect_missing_profile_authors(
392                            self.config.missing_profile_backfill_batch_size,
393                        )?;
394                        if !missing_profile_authors.is_empty() {
395                            info!(
396                                "Nostr mirror missing-profile backfill starting: authors={}",
397                                missing_profile_authors.len()
398                            );
399                            self.spawn_missing_profile_backfill(missing_profile_authors);
400                            last_missing_profile_backfill_at = Some(Instant::now());
401                        }
402                    }
403                }
404                _ = publish_interval.tick() => {
405                    self.sync_publish_roots_from_store()?;
406                    if let Err(err) = self.flush_live_events().await {
407                        warn!("Nostr mirror live event flush failed: {:#}", err);
408                    }
409                    let (event_result, profile_search_result, profiles_by_pubkey_result) = self
410                        .publish_pending_roots(false, false, false)
411                        .await;
412                    if let Err(err) = event_result {
413                        warn!("Nostr mirror event-root publish failed: {:#}", err);
414                    }
415                    if let Err(err) = profile_search_result {
416                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
417                    }
418                    if let Err(err) = profiles_by_pubkey_result {
419                        warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
420                    }
421                }
422                notification = notifications.recv() => {
423                    match notification {
424                        Ok(RelayPoolNotification::Event { event, .. }) => {
425                            self.ingest_live_event(&event)?;
426                        }
427                        Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
428                            let relay_url = relay_url.to_string();
429                            let previous = relay_statuses.insert(relay_url.clone(), status);
430                            if Self::should_history_sync_on_reconnect(
431                                self.config.history_sync_on_reconnect,
432                                previous,
433                                status,
434                            ) && Self::should_run_reconnect_history_sync(
435                                    last_reconnect_history_sync_at.as_ref(),
436                                )
437                            {
438                                let authors = self.collect_authors()?;
439                                if !authors.is_empty() {
440                                    info!(
441                                        "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
442                                        relay_url,
443                                        authors.len(),
444                                        self.config.require_negentropy
445                                    );
446                                    self.spawn_author_history_sync(
447                                        "relay reconnect catch-up",
448                                        authors,
449                                        false,
450                                        false,
451                                    );
452                                    last_reconnect_history_sync_at = Some(Instant::now());
453                                }
454                            }
455                        }
456                        Ok(RelayPoolNotification::Shutdown) => break,
457                        Ok(_) => {}
458                        Err(err) => {
459                            warn!("Nostr mirror notification error: {}", err);
460                            break;
461                        }
462                    }
463                }
464            }
465        }
466
467        if let Err(err) = self.flush_live_events().await {
468            warn!(
469                "Nostr mirror live event flush failed during shutdown: {:#}",
470                err
471            );
472        }
473        if let Err(err) = self.sync_publish_roots_from_store() {
474            warn!(
475                "Nostr mirror root-state refresh failed during shutdown: {:#}",
476                err
477            );
478        }
479        let (event_result, profile_search_result, profiles_by_pubkey_result) =
480            self.publish_pending_roots(true, true, true).await;
481        if let Err(err) = event_result {
482            warn!(
483                "Nostr mirror event-root publish failed during shutdown: {:#}",
484                err
485            );
486        }
487        if let Err(err) = profile_search_result {
488            warn!(
489                "Nostr mirror profile-search publish failed during shutdown: {:#}",
490                err
491            );
492        }
493        if let Err(err) = profiles_by_pubkey_result {
494            warn!(
495                "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
496                err
497            );
498        }
499        let _ = self.client.disconnect().await;
500        if let Some(client) = self.publish_client.as_ref() {
501            let _ = client.disconnect().await;
502        }
503        Ok(())
504    }
505
506    fn spawn_startup_history_sync(self: &Arc<Self>, initial_authors: Vec<String>) {
507        let mirror = Arc::clone(self);
508        tokio::task::spawn_blocking(move || {
509            let runtime = tokio::runtime::Builder::new_current_thread()
510                .enable_all()
511                .build()
512                .expect("build nostr mirror startup history sync runtime");
513            runtime.block_on(async move {
514                let _guard = mirror.history_sync_lock.lock().await;
515                if let Err(err) = mirror.run_startup_history_sync(initial_authors).await {
516                    warn!("Nostr mirror startup history sync failed: {:#}", err);
517                }
518            });
519        });
520    }
521
522    async fn run_startup_history_sync(&self, initial_authors: Vec<String>) -> Result<()> {
523        if self.should_backfill_missing_profiles(None) {
524            let missing_profile_authors = self
525                .collect_missing_profile_authors(self.config.missing_profile_backfill_batch_size)?;
526            if !missing_profile_authors.is_empty() {
527                info!(
528                    "Nostr mirror missing-profile backfill starting: authors={}",
529                    missing_profile_authors.len()
530                );
531                self.history_sync_authors_with_kinds(
532                    missing_profile_authors,
533                    &[Kind::Metadata.as_u16()],
534                )
535                .await?;
536            }
537        }
538        self.history_sync_recent_text_notes_for_reachable_authors()
539            .await?;
540        self.history_sync_full_text_notes_for_reachable_authors()
541            .await?;
542        self.history_sync_authors(initial_authors).await
543    }
544
545    fn spawn_author_history_sync(
546        self: &Arc<Self>,
547        label: &'static str,
548        authors: Vec<String>,
549        include_full_text_notes: bool,
550        wait_for_existing_sync: bool,
551    ) {
552        let mirror = Arc::clone(self);
553        tokio::task::spawn_blocking(move || {
554            let runtime = tokio::runtime::Builder::new_current_thread()
555                .enable_all()
556                .build()
557                .expect("build nostr mirror author history sync runtime");
558            runtime.block_on(async move {
559                if wait_for_existing_sync {
560                    let _guard = mirror.history_sync_lock.lock().await;
561                    if let Err(err) = mirror
562                        .run_author_history_sync(authors, include_full_text_notes)
563                        .await
564                    {
565                        warn!("Nostr mirror {label} failed: {:#}", err);
566                    }
567                    return;
568                }
569
570                let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
571                    info!("Nostr mirror {label} skipped; another history sync is running");
572                    return;
573                };
574                if let Err(err) = mirror
575                    .run_author_history_sync(authors, include_full_text_notes)
576                    .await
577                {
578                    warn!("Nostr mirror {label} failed: {:#}", err);
579                }
580            });
581        });
582    }
583
584    async fn run_author_history_sync(
585        &self,
586        authors: Vec<String>,
587        include_full_text_notes: bool,
588    ) -> Result<()> {
589        if include_full_text_notes {
590            self.history_sync_recent_text_notes_for_authors(authors.clone())
591                .await?;
592            self.history_sync_full_text_notes_for_authors(authors.clone())
593                .await?;
594        }
595        self.history_sync_authors(authors).await
596    }
597
598    fn spawn_missing_profile_backfill(self: &Arc<Self>, authors: Vec<String>) {
599        let mirror = Arc::clone(self);
600        tokio::task::spawn_blocking(move || {
601            let runtime = tokio::runtime::Builder::new_current_thread()
602                .enable_all()
603                .build()
604                .expect("build nostr mirror missing profile runtime");
605            runtime.block_on(async move {
606                let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
607                    info!(
608                        "Nostr mirror missing-profile backfill skipped; another history sync is running"
609                    );
610                    return;
611                };
612                if let Err(err) = mirror
613                    .history_sync_authors_with_kinds(authors, &[Kind::Metadata.as_u16()])
614                    .await
615                {
616                    warn!("Nostr mirror missing-profile backfill failed: {:#}", err);
617                }
618            });
619        });
620    }
621
622    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
623        let mut statuses = HashMap::new();
624        for (relay_url, relay) in self.client.relays().await {
625            statuses.insert(relay_url.to_string(), relay.status().await);
626        }
627        statuses
628    }
629
630    async fn has_connected_publish_relay(&self) -> bool {
631        let Some(client) = self.publish_client.as_ref() else {
632            return false;
633        };
634        Self::client_has_connected_relay(client).await
635    }
636
637    async fn client_has_connected_relay(client: &Client) -> bool {
638        for (_relay_url, relay) in client.relays().await {
639            if relay.status().await == RelayStatus::Connected {
640                return true;
641            }
642        }
643        false
644    }
645
    /// Collects the hex pubkeys of all authors within `config.max_follow_distance`.
    ///
    /// See `collect_authors_with_max_distance` for ordering and filtering.
    fn collect_authors(&self) -> Result<Vec<String>> {
        self.collect_authors_with_max_distance(self.config.max_follow_distance)
    }
649
650    fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
651        let mut authors = Vec::new();
652        let mut seen = HashSet::new();
653        for distance in 0..=max_distance {
654            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
655                self.graph_store.as_ref(),
656                distance,
657            )
658            .with_context(|| format!("load social-graph distance {distance}"))?
659            {
660                if self
661                    .graph_store
662                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
663                {
664                    continue;
665                }
666                let hex = hex::encode(pubkey);
667                if seen.insert(hex.clone()) {
668                    authors.push(hex);
669                }
670            }
671        }
672        Ok(authors)
673    }
674
675    fn full_text_note_history_follow_distance(&self) -> Option<u32> {
676        let distance = self.config.full_text_note_history_follow_distance?;
677        if self
678            .config
679            .kinds
680            .iter()
681            .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
682        {
683            Some(distance.min(self.config.max_follow_distance))
684        } else {
685            None
686        }
687    }
688
    /// Returns the relay page cap for full text-note history from this mirror's
    /// configuration; `None` when the configured value is `0`.
    fn full_text_note_history_max_relay_pages(&self) -> Option<usize> {
        Self::full_text_note_history_max_relay_pages_for_config(&self.config)
    }
692
693    fn full_text_note_history_max_relay_pages_for_config(
694        config: &NostrMirrorConfig,
695    ) -> Option<usize> {
696        let pages = config.full_text_note_history_max_relay_pages;
697        if pages == 0 {
698            None
699        } else {
700            Some(pages)
701        }
702    }
703
704    fn is_text_content_history_kind(kind: u16) -> bool {
705        kind == Kind::TextNote.as_u16() || kind == KIND_LONG_FORM_CONTENT
706    }
707
708    fn history_sync_kinds_for_config(config: &NostrMirrorConfig) -> Vec<u16> {
709        let mut kinds = config.kinds.clone();
710        if Self::full_text_note_history_max_relay_pages_for_config(config).is_none() {
711            kinds.retain(|kind| !Self::is_text_content_history_kind(*kind));
712        }
713        kinds
714    }
715
716    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
717        if limit == 0 {
718            return Ok(Vec::new());
719        }
720
721        let authors = self.collect_authors()?;
722        if authors.is_empty() {
723            return Ok(Vec::new());
724        }
725
726        let mut cursor = self
727            .missing_profile_cursor
728            .lock()
729            .expect("missing profile cursor");
730        let mut index = (*cursor).min(authors.len());
731        let mut scanned = 0usize;
732        let mut missing = Vec::new();
733
734        while scanned < authors.len() && missing.len() < limit {
735            let author = &authors[index];
736            if self.graph_store.latest_profile_event(author)?.is_none() {
737                missing.push(author.clone());
738            }
739            index += 1;
740            if index == authors.len() {
741                index = 0;
742            }
743            scanned += 1;
744        }
745
746        *cursor = index;
747        Ok(missing)
748    }
749
750    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
751        if self.config.missing_profile_backfill_batch_size == 0
752            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
753        {
754            return false;
755        }
756        match last_run {
757            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
758            None => true,
759        }
760    }
761
762    fn should_history_sync_on_reconnect(
763        history_sync_on_reconnect: bool,
764        previous: Option<RelayStatus>,
765        status: RelayStatus,
766    ) -> bool {
767        history_sync_on_reconnect
768            && status == RelayStatus::Connected
769            && matches!(
770                previous,
771                Some(
772                    RelayStatus::Initialized
773                        | RelayStatus::Pending
774                        | RelayStatus::Connecting
775                        | RelayStatus::Disconnected
776                        | RelayStatus::Terminated
777                )
778            )
779    }
780
781    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
782        match last_run {
783            None => true,
784            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
785        }
786    }
787
788    fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
789        !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
790    }
791
792    fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
793        kinds.is_empty()
794            || kinds.iter().any(|kind| {
795                *kind == Kind::Metadata.as_u16()
796                    || *kind == Kind::ContactList.as_u16()
797                    || *kind == Kind::MuteList.as_u16()
798            })
799    }
800
801    fn history_sync_plan_for(
802        config: &NostrMirrorConfig,
803        authors: usize,
804        kinds: &[u16],
805    ) -> HistorySyncPlan {
806        let author_batch_size = config.author_batch_size.max(1);
807        let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
808        let relay_page_size = 1_000;
809        let max_relay_pages = 10;
810
811        if Self::is_metadata_only_history_sync(kinds) {
812            return HistorySyncPlan {
813                relay_fetch_mode: RelayFetchMode::AuthorBatches,
814                author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
815                per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
816                relay_page_size,
817                max_relay_pages,
818            };
819        }
820
821        if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
822            return HistorySyncPlan {
823                relay_fetch_mode: RelayFetchMode::GlobalRecent,
824                author_batch_size,
825                per_author_event_limit: per_author_event_limit
826                    .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
827                    .max(1),
828                relay_page_size,
829                max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
830            };
831        }
832
833        HistorySyncPlan {
834            relay_fetch_mode: RelayFetchMode::AuthorBatches,
835            author_batch_size,
836            per_author_event_limit,
837            relay_page_size,
838            max_relay_pages,
839        }
840    }
841
842    fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
843        Self::history_sync_plan_for(&self.config, authors, kinds)
844    }
845
846    fn history_sync_chunk_size_for_config(
847        config: &NostrMirrorConfig,
848        authors: usize,
849        kinds: &[u16],
850        full_author_history: bool,
851        chunk_size_override: Option<usize>,
852    ) -> usize {
853        let configured_chunk_size = chunk_size_override
854            .unwrap_or(config.history_sync_author_chunk_size)
855            .max(1);
856        if !full_author_history
857            && Self::history_sync_plan_for(config, authors, kinds).relay_fetch_mode
858                == RelayFetchMode::GlobalRecent
859        {
860            return authors.max(1);
861        }
862        configured_chunk_size
863    }
864
865    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
866        let kinds = Self::history_sync_kinds_for_config(&self.config);
867        if kinds.is_empty() {
868            info!("Nostr mirror history sync skipped: no enabled history kinds");
869            return Ok(());
870        }
871        self.history_sync_authors_with_kinds(authors, &kinds).await
872    }
873
874    async fn history_sync_authors_with_kinds(
875        &self,
876        authors: Vec<String>,
877        kinds: &[u16],
878    ) -> Result<()> {
879        self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
880            .await
881    }
882
883    async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
884        let Some(distance) = self.full_text_note_history_follow_distance() else {
885            return Ok(());
886        };
887        if self.full_text_note_history_max_relay_pages().is_none() {
888            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
889            return Ok(());
890        }
891        info!(
892            "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
893        );
894        let authors = self.collect_authors_with_max_distance(distance)?;
895        self.history_sync_full_text_notes_for_authors(authors).await
896    }
897
898    async fn history_sync_recent_text_notes_for_reachable_authors(&self) -> Result<()> {
899        let Some(distance) = self.full_text_note_history_follow_distance() else {
900            return Ok(());
901        };
902        if self.full_text_note_history_max_relay_pages().is_none() {
903            info!("Nostr mirror startup text content history sync skipped: max_relay_pages=0");
904            return Ok(());
905        }
906        info!(
907            "Nostr mirror recent text content catch-up author collection starting: max_follow_distance={distance}"
908        );
909        let authors = self.collect_authors_with_max_distance(distance)?;
910        self.history_sync_recent_text_notes_for_authors(authors)
911            .await
912    }
913
914    async fn history_sync_recent_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
915        if authors.is_empty()
916            || self.full_text_note_history_follow_distance().is_none()
917            || self.full_text_note_history_max_relay_pages().is_none()
918        {
919            return Ok(());
920        }
921
922        info!(
923            "Nostr mirror recent text content catch-up starting: authors={}",
924            authors.len()
925        );
926        let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
927        let chunk_size = self
928            .config
929            .author_batch_size
930            .max(1)
931            .saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER + 1);
932        self.history_sync_authors_chunked(
933            authors,
934            |current_root, author_chunk| async move {
935                self.history_sync_author_chunk(current_root, author_chunk, &kinds, false, None)
936                    .await
937            },
938            false,
939            Some(chunk_size),
940        )
941        .await
942    }
943
944    async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
945        let Some(distance) = self.full_text_note_history_follow_distance() else {
946            return Ok(());
947        };
948        let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
949            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
950            return Ok(());
951        };
952        let mut close_authors = Vec::new();
953        for author in authors {
954            let Ok(pubkey) = hex::decode(&author) else {
955                continue;
956            };
957            let Ok(pubkey) = <[u8; 32]>::try_from(pubkey.as_slice()) else {
958                continue;
959            };
960            if self
961                .graph_store
962                .follow_distance(&pubkey)?
963                .is_some_and(|actual_distance| actual_distance <= distance)
964            {
965                close_authors.push(author);
966            }
967        }
968        if close_authors.is_empty() {
969            return Ok(());
970        }
971
972        info!(
973            "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
974            close_authors.len(),
975            distance,
976            max_relay_pages
977        );
978        let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
979        self.history_sync_authors_with_kinds_and_mode(
980            close_authors,
981            &kinds,
982            true,
983            Some(max_relay_pages),
984        )
985        .await
986    }
987
988    async fn history_sync_authors_with_kinds_and_mode(
989        &self,
990        authors: Vec<String>,
991        kinds: &[u16],
992        full_author_history: bool,
993        max_relay_pages: Option<usize>,
994    ) -> Result<()> {
995        let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
996        let chunk_size = Self::history_sync_chunk_size_for_config(
997            &self.config,
998            authors.len(),
999            kinds,
1000            full_author_history,
1001            None,
1002        );
1003        self.history_sync_authors_chunked(
1004            authors,
1005            |current_root, author_chunk| async move {
1006                self.history_sync_author_chunk(
1007                    current_root,
1008                    author_chunk,
1009                    kinds,
1010                    full_author_history,
1011                    max_relay_pages,
1012                )
1013                .await
1014            },
1015            update_profile_and_graph,
1016            Some(chunk_size),
1017        )
1018        .await
1019    }
1020
1021    async fn history_sync_authors_chunked<F, Fut>(
1022        &self,
1023        authors: Vec<String>,
1024        mut run_chunk: F,
1025        update_profile_and_graph: bool,
1026        chunk_size_override: Option<usize>,
1027    ) -> Result<()>
1028    where
1029        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
1030        Fut: std::future::Future<Output = Result<CrawlReport>>,
1031    {
1032        if authors.is_empty() {
1033            return Ok(());
1034        }
1035
1036        info!(
1037            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
1038            authors.len(),
1039            self.config.relays.len(),
1040            self.config.require_negentropy
1041        );
1042
1043        let mut current_root = self.graph_store.public_events_root_for_write()?;
1044        let mut last_error = None;
1045        let mut applied_chunks = 0usize;
1046        let mut failed_chunks = 0usize;
1047        let chunk_size = chunk_size_override
1048            .unwrap_or(self.config.history_sync_author_chunk_size)
1049            .max(1);
1050        let total_chunks = authors.len().div_ceil(chunk_size);
1051
1052        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
1053            let author_chunk = author_chunk.to_vec();
1054            let author_count = author_chunk.len();
1055            info!(
1056                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
1057                chunk_index + 1,
1058                total_chunks,
1059                author_count
1060            );
1061            let mut report = match run_chunk(current_root.clone(), author_chunk.clone()).await {
1062                Ok(report) => report,
1063                Err(err) => {
1064                    failed_chunks = failed_chunks.saturating_add(1);
1065                    warn!(
1066                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
1067                        chunk_index + 1,
1068                        total_chunks,
1069                        author_count,
1070                        err
1071                    );
1072                    last_error = Some(err);
1073                    continue;
1074                }
1075            };
1076
1077            let latest_root = self.graph_store.public_events_root_for_write()?;
1078            if latest_root != current_root {
1079                info!(
1080                    "Nostr mirror history sync root advanced while chunk was fetching; merging chunk into latest root: chunk={}/{} authors={} events_applied={}",
1081                    chunk_index + 1,
1082                    total_chunks,
1083                    author_count,
1084                    report.applied_events.len()
1085                );
1086                if report.applied_events.is_empty() {
1087                    report.root = latest_root.clone();
1088                } else {
1089                    let event_store = NostrEventStore::new(self.store.store_arc());
1090                    report.root = event_store
1091                        .build(latest_root.as_ref(), report.applied_events.clone())
1092                        .await
1093                        .context("merge history chunk into latest mirrored event root")?;
1094                }
1095                current_root = latest_root;
1096            }
1097
1098            if report.root != current_root {
1099                self.apply_history_root_with_options(
1100                    report.root.as_ref(),
1101                    update_profile_and_graph,
1102                    true,
1103                    Some(&report.applied_events),
1104                )
1105                .await?;
1106                current_root = report.root.clone();
1107                info!(
1108                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
1109                    chunk_index + 1,
1110                    total_chunks,
1111                    report.authors_processed,
1112                    report.events_selected,
1113                    report.events_seen
1114                );
1115            }
1116            applied_chunks = applied_chunks.saturating_add(1);
1117        }
1118
1119        if applied_chunks == 0 {
1120            return Err(last_error
1121                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
1122                .context("run mirror history sync"));
1123        }
1124        if failed_chunks > 0 {
1125            warn!(
1126                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
1127                applied_chunks, failed_chunks
1128            );
1129        }
1130        Ok(())
1131    }
1132
1133    async fn history_sync_author_chunk(
1134        &self,
1135        current_root: Option<hashtree_core::Cid>,
1136        authors: Vec<String>,
1137        kinds: &[u16],
1138        full_author_history: bool,
1139        max_relay_pages: Option<usize>,
1140    ) -> Result<CrawlReport> {
1141        let mut last_error = None;
1142        let mut report = None;
1143        let mut plan = self.history_sync_plan(authors.len(), kinds);
1144        if full_author_history {
1145            plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
1146            plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
1147        }
1148        for attempt in 0..3 {
1149            let mut last_logged_authors = 0usize;
1150            let bridge = NostrBridge::new(
1151                self.store.store_arc(),
1152                CrawlConfig {
1153                    relays: self.config.relays.clone(),
1154                    author_allowlist: Some(authors.clone()),
1155                    max_live_bytes: None,
1156                    max_events_seen: None,
1157                    max_authors: None,
1158                    max_follow_distance: None,
1159                    author_batch_size: plan.author_batch_size,
1160                    per_author_event_limit: plan.per_author_event_limit,
1161                    per_author_live_bytes: None,
1162                    fetch_timeout: self.config.fetch_timeout,
1163                    kinds: Some(kinds.to_vec()),
1164                    relay_fetch_mode: plan.relay_fetch_mode,
1165                    require_negentropy: self.config.require_negentropy,
1166                    relay_event_max_size: self.config.relay_event_max_size,
1167                    relay_page_size: plan.relay_page_size,
1168                    max_relay_pages: plan.max_relay_pages,
1169                    full_author_history,
1170                },
1171            );
1172
1173            match bridge
1174                .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
1175                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
1176                    let should_log = progress.authors_processed == progress.authors_considered
1177                        || progress.authors_processed == 0
1178                        || progress
1179                            .authors_processed
1180                            .saturating_sub(last_logged_authors)
1181                            >= log_interval;
1182                    if should_log {
1183                        last_logged_authors = progress.authors_processed;
1184                        info!(
1185                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
1186                            progress.authors_processed,
1187                            progress.authors_considered,
1188                            progress.events_selected,
1189                            progress.events_seen
1190                        );
1191                    }
1192                })
1193                .await
1194            {
1195                Ok(next_report) => {
1196                    report = Some(next_report);
1197                    break;
1198                }
1199                Err(err) => {
1200                    last_error = Some(err);
1201                    if attempt < 2 {
1202                        tokio::time::sleep(Duration::from_millis(500)).await;
1203                    }
1204                }
1205            }
1206        }
1207        report
1208            .ok_or_else(|| last_error.expect("history sync retry captured error"))
1209            .context("run mirror history sync")
1210    }
1211
/// Test-only shortcut: applies `root` with profile/graph updates and root publishing enabled
/// and without a list of applied events.
#[cfg(test)]
async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
    let update_profile_and_graph = true;
    let publish_roots = true;
    self.apply_history_root_with_options(root, update_profile_and_graph, publish_roots, None)
        .await
}
1217
1218    async fn apply_history_root_with_options(
1219        &self,
1220        root: Option<&hashtree_core::Cid>,
1221        update_profile_and_graph: bool,
1222        publish_roots: bool,
1223        applied_events: Option<&[hashtree_nostr::StoredNostrEvent]>,
1224    ) -> Result<()> {
1225        self.graph_store.write_public_events_root(root)?;
1226        let Some(root) = root else {
1227            return Ok(());
1228        };
1229
1230        self.note_public_events_root_change()?;
1231        if update_profile_and_graph {
1232            let events = match applied_events {
1233                Some(events) => events
1234                    .iter()
1235                    .cloned()
1236                    .map(socialgraph::stored_event_to_nostr_event)
1237                    .collect::<Result<Vec<_>>>()?,
1238                None => {
1239                    let event_store = NostrEventStore::new(self.store.store_arc());
1240                    event_store
1241                        .list_recent_lossy(Some(root), ListEventsOptions::default())
1242                        .await
1243                        .context("list trusted mirrored events")?
1244                        .into_iter()
1245                        .map(socialgraph::stored_event_to_nostr_event)
1246                        .collect::<Result<Vec<_>>>()?
1247                }
1248            };
1249
1250            socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
1251                .context("sync mirrored social graph state")?;
1252            if applied_events.is_some() {
1253                self.graph_store
1254                    .sync_profile_index_for_events(&events)
1255                    .context("update mirrored profile search index")?;
1256            } else {
1257                self.graph_store
1258                    .rebuild_profile_index_for_events(&events)
1259                    .context("rebuild mirrored profile search index")?;
1260            }
1261            self.note_profile_search_root_change()?;
1262            self.note_profiles_by_pubkey_root_change()?;
1263        }
1264        if !publish_roots {
1265            return Ok(());
1266        }
1267        let (event_result, profile_search_result, profiles_by_pubkey_result) = self
1268            .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
1269            .await;
1270        if let Err(err) = event_result {
1271            warn!(
1272                "Nostr mirror event-root publish failed after root update: {:#}",
1273                err
1274            );
1275        }
1276        if let Err(err) = profile_search_result {
1277            warn!(
1278                "Nostr mirror profile-search publish failed after root update: {:#}",
1279                err
1280            );
1281        }
1282        if let Err(err) = profiles_by_pubkey_result {
1283            warn!(
1284                "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
1285                err
1286            );
1287        }
1288        Ok(())
1289    }
1290
1291    async fn subscribe_authors_since(
1292        &self,
1293        authors: &[String],
1294        since: Timestamp,
1295        subscribed_authors: &mut HashSet<String>,
1296    ) -> Result<()> {
1297        let new_authors = authors
1298            .iter()
1299            .filter(|author| !subscribed_authors.contains(*author))
1300            .cloned()
1301            .collect::<Vec<_>>();
1302        if new_authors.is_empty() {
1303            return Ok(());
1304        }
1305
1306        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1307            let pubkeys = chunk
1308                .iter()
1309                .filter_map(|author| PublicKey::from_hex(author).ok())
1310                .collect::<Vec<_>>();
1311            if pubkeys.is_empty() {
1312                continue;
1313            }
1314
1315            let filter = Filter::new()
1316                .authors(pubkeys)
1317                .kinds(self.config.kinds.iter().copied().map(Kind::from))
1318                .since(since);
1319
1320            if let Err(err) = self.client.subscribe(vec![filter], None).await {
1321                warn!(
1322                    "Nostr mirror author subscription failed: authors={} error={:#}",
1323                    chunk.len(),
1324                    err
1325                );
1326                continue;
1327            }
1328            subscribed_authors.extend(chunk.iter().cloned());
1329        }
1330        Ok(())
1331    }
1332
1333    fn ingest_live_event(&self, event: &Event) -> Result<()> {
1334        self.pending_live_events
1335            .lock()
1336            .expect("pending live events")
1337            .insert(event.id.to_hex(), event.clone());
1338        Ok(())
1339    }
1340
1341    async fn flush_live_events(&self) -> Result<()> {
1342        let pending = {
1343            let mut pending = self
1344                .pending_live_events
1345                .lock()
1346                .expect("pending live events");
1347            if pending.is_empty() {
1348                return Ok(());
1349            }
1350            std::mem::take(&mut *pending)
1351        };
1352        let events = pending.into_values().collect::<Vec<_>>();
1353        let event_count = events.len();
1354        let previous_event_root = self.graph_store.public_events_root()?;
1355        let previous_profile_search_root = self.graph_store.profile_search_root()?;
1356        let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1357
1358        socialgraph::ingest_parsed_events_with_storage_class(
1359            self.graph_store.as_ref(),
1360            &events,
1361            socialgraph::EventStorageClass::Public,
1362        )
1363        .context("ingest live mirrored event batch")?;
1364
1365        let next_event_root = self.graph_store.public_events_root()?;
1366        let next_profile_search_root = self.graph_store.profile_search_root()?;
1367        let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1368        let event_root_changed = next_event_root != previous_event_root;
1369        let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1370        let profiles_by_pubkey_root_changed =
1371            next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1372
1373        if event_root_changed {
1374            self.note_public_events_root_change()?;
1375        }
1376        if profile_search_root_changed {
1377            self.note_profile_search_root_change()?;
1378        }
1379        if profiles_by_pubkey_root_changed {
1380            self.note_profiles_by_pubkey_root_change()?;
1381        }
1382        if profile_search_root_changed {
1383            self.maybe_publish_profile_search_root(true).await?;
1384        }
1385        if profiles_by_pubkey_root_changed {
1386            self.maybe_publish_profiles_by_pubkey_root(true).await?;
1387        }
1388        if event_root_changed {
1389            self.maybe_publish_event_root(true).await?;
1390        }
1391        info!(
1392            "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1393            event_count,
1394            event_root_changed,
1395            profile_search_root_changed,
1396            profiles_by_pubkey_root_changed
1397        );
1398        Ok(())
1399    }
1400
1401    fn note_public_events_root_change(&self) -> Result<()> {
1402        let root = self.graph_store.public_events_root()?;
1403        Self::note_root_change(
1404            self.config.published_event_tree_name.as_deref(),
1405            &self.event_publish_state,
1406            root,
1407        )
1408    }
1409
1410    fn note_profile_search_root_change(&self) -> Result<()> {
1411        let root = self.graph_store.profile_search_root()?;
1412        Self::note_root_change(
1413            self.config.published_profile_search_tree_name.as_deref(),
1414            &self.profile_search_publish_state,
1415            root,
1416        )
1417    }
1418
1419    fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1420        let root = self.graph_store.profiles_by_pubkey_root()?;
1421        Self::note_root_change(
1422            self.config
1423                .published_profiles_by_pubkey_tree_name
1424                .as_deref(),
1425            &self.profiles_by_pubkey_publish_state,
1426            root,
1427        )
1428    }
1429
1430    fn note_root_change(
1431        tree_name: Option<&str>,
1432        publish_state: &Arc<Mutex<RootPublishState>>,
1433        root: Option<hashtree_core::Cid>,
1434    ) -> Result<()> {
1435        let Some(_tree_name) = tree_name else {
1436            return Ok(());
1437        };
1438
1439        let mut state = publish_state.lock().expect("root publish state");
1440        let now = Instant::now();
1441
1442        if state.pending_root == root {
1443            return Ok(());
1444        }
1445
1446        state.pending_root = root;
1447        state.last_upload_failed_at = None;
1448        state.last_upload_error = None;
1449        state.last_changed_at = Some(now);
1450        if state.dirty_since.is_none() {
1451            state.dirty_since = Some(now);
1452        }
1453        Ok(())
1454    }
1455
1456    async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1457        self.ensure_public_events_root_is_publishable().await?;
1458        if self.take_missing_blob_event_upload_error() {
1459            return self.rebuild_event_indexes_after_missing_blobs(force).await;
1460        }
1461        let result = self
1462            .maybe_publish_root(
1463                self.config.published_event_tree_name.as_deref(),
1464                &self.event_publish_state,
1465                "event root",
1466                force,
1467            )
1468            .await;
1469        let Err(error) = result else {
1470            if self.take_missing_blob_event_upload_error() {
1471                return self.rebuild_event_indexes_after_missing_blobs(force).await;
1472            }
1473            return Ok(());
1474        };
1475        if !is_missing_local_blob_push_error(&error) {
1476            return Err(error);
1477        }
1478
1479        self.rebuild_event_indexes_after_missing_blobs(force).await
1480    }
1481
1482    fn take_missing_blob_event_upload_error(&self) -> bool {
1483        let mut state = self
1484            .event_publish_state
1485            .lock()
1486            .expect("event root publish state");
1487        if state.upload_in_progress_root.is_some() {
1488            return false;
1489        }
1490        if !state.missing_blob_rebuild_required {
1491            return false;
1492        }
1493        state.missing_blob_rebuild_required = false;
1494        state.last_upload_error = None;
1495        state.last_upload_failed_at = None;
1496        true
1497    }
1498
1499    async fn rebuild_event_indexes_after_missing_blobs(&self, force: bool) -> Result<()> {
1500        warn!(
1501            "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1502        );
1503        let (public_count, ambient_count) = self
1504            .graph_store
1505            .rebuild_event_indexes_from_stored_events_async()
1506            .await
1507            .context("rebuild event indexes after missing event blobs")?;
1508        info!(
1509            "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1510            public_count, ambient_count
1511        );
1512        self.sync_publish_roots_from_store()?;
1513
1514        self.maybe_publish_root(
1515            self.config.published_event_tree_name.as_deref(),
1516            &self.event_publish_state,
1517            "event root",
1518            force,
1519        )
1520        .await
1521    }
1522
1523    async fn ensure_public_events_root_is_publishable(&self) -> Result<()> {
1524        let Some(root) = self.graph_store.public_events_root()? else {
1525            return Ok(());
1526        };
1527        let event_store = NostrEventStore::new(self.store.store_arc());
1528        if let Err(err) = event_store.validate_index_root(Some(&root)).await {
1529            warn!(
1530                "Nostr mirror refusing to publish invalid event index root {}; clearing trusted root: {}",
1531                hex::encode(root.hash),
1532                err
1533            );
1534            self.graph_store.write_public_events_root(None)?;
1535            self.note_public_events_root_change()?;
1536        }
1537        Ok(())
1538    }
1539
1540    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1541        self.maybe_publish_root(
1542            self.config.published_profile_search_tree_name.as_deref(),
1543            &self.profile_search_publish_state,
1544            "profile search root",
1545            force,
1546        )
1547        .await
1548    }
1549
1550    async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1551        self.maybe_publish_root(
1552            self.config
1553                .published_profiles_by_pubkey_tree_name
1554                .as_deref(),
1555            &self.profiles_by_pubkey_publish_state,
1556            "profiles-by-pubkey root",
1557            force,
1558        )
1559        .await
1560    }
1561
1562    async fn maybe_publish_root(
1563        &self,
1564        tree_name: Option<&str>,
1565        publish_state: &Arc<Mutex<RootPublishState>>,
1566        log_label: &str,
1567        force: bool,
1568    ) -> Result<()> {
1569        let Some(tree_name) = tree_name else {
1570            return Ok(());
1571        };
1572
1573        let pending_root = {
1574            let state = publish_state.lock().expect("root publish state");
1575            let Some(pending_root) = state.pending_root.clone() else {
1576                return Ok(());
1577            };
1578
1579            let now = Instant::now();
1580            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1581                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1582            });
1583            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1584                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1585            });
1586            if !force && !debounce_ready && !stale_ready {
1587                return Ok(());
1588            }
1589
1590            pending_root
1591        };
1592
1593        let upload_started =
1594            self.maybe_start_background_root_upload(&pending_root, publish_state, log_label);
1595        let upload_required = !self.config.blossom_write_servers.is_empty();
1596        let upload_ready = {
1597            let state = publish_state.lock().expect("root publish state");
1598            !upload_required || state.last_uploaded_root.as_ref() == Some(&pending_root)
1599        };
1600        let publish_before_upload_ready = force && upload_required && !upload_ready;
1601
1602        let mut successful_relays = Vec::new();
1603        let mut failed_relays = Vec::new();
1604        let mut published_now = false;
1605        let publish_required =
1606            self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1607        if publish_required {
1608            let Some(publish_client) = self.publish_client.as_ref() else {
1609                unreachable!("publish_required implies publish_client");
1610            };
1611            if !self.has_connected_publish_relay().await {
1612                return Ok(());
1613            }
1614            if !upload_ready && !publish_before_upload_ready {
1615                if upload_started {
1616                    info!(
1617                        "Nostr mirror uploading {} DAG before publish: tree={} hash={}",
1618                        log_label,
1619                        tree_name,
1620                        hex::encode(pending_root.hash),
1621                    );
1622                }
1623                return Ok(());
1624            }
1625            if publish_before_upload_ready {
1626                info!(
1627                    "Nostr mirror publishing {} before Blossom upload completes: tree={} hash={}",
1628                    log_label,
1629                    tree_name,
1630                    hex::encode(pending_root.hash),
1631                );
1632            }
1633
1634            let already_published = {
1635                let state = publish_state.lock().expect("root publish state");
1636                state.last_published_root.as_ref() == Some(&pending_root)
1637            };
1638            if !already_published {
1639                let publish_relays = self.config.publish_relays.clone();
1640                let latest_known_created_at = {
1641                    let state = publish_state.lock().expect("root publish state");
1642                    state.last_published_created_at
1643                };
1644                let publish_created_at = next_replaceable_created_at(
1645                    Timestamp::now(),
1646                    later_timestamp(
1647                        latest_known_created_at,
1648                        self.latest_root_event_created_at(tree_name).await,
1649                    ),
1650                );
1651                let event = publish_client
1652                    .sign_event_builder(Self::build_public_root_event(
1653                        tree_name,
1654                        &pending_root,
1655                        publish_created_at,
1656                    ))
1657                    .await
1658                    .with_context(|| format!("sign {log_label} event"))?;
1659                let publish_result = self
1660                    .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1661                    .await
1662                    .with_context(|| format!("publish {log_label} event"))?;
1663                successful_relays = publish_result.0;
1664                failed_relays = publish_result.1;
1665                if successful_relays.is_empty() {
1666                    let failure_summary = if failed_relays.is_empty() {
1667                        "no publish relays accepted the event".to_string()
1668                    } else {
1669                        failed_relays.join("; ")
1670                    };
1671                    anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1672                }
1673
1674                let mut state = publish_state.lock().expect("root publish state");
1675                if state.pending_root.as_ref() == Some(&pending_root) {
1676                    state.last_published_root = Some(pending_root.clone());
1677                    state.last_published_at = Some(Instant::now());
1678                    state.last_published_created_at = Some(event.created_at);
1679                }
1680                published_now = true;
1681            }
1682        }
1683
1684        {
1685            let mut state = publish_state.lock().expect("root publish state");
1686            if state.pending_root.as_ref() == Some(&pending_root) {
1687                let upload_satisfied = self.config.blossom_write_servers.is_empty()
1688                    || state.last_uploaded_root.as_ref() == Some(&pending_root);
1689                let publish_satisfied =
1690                    !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1691                if upload_satisfied && publish_satisfied {
1692                    state.dirty_since = None;
1693                }
1694            }
1695        }
1696
1697        if published_now {
1698            info!(
1699                "Nostr mirror published {}: tree={} hash={} relays={:?}",
1700                log_label,
1701                tree_name,
1702                hex::encode(pending_root.hash),
1703                successful_relays,
1704            );
1705        }
1706        if !failed_relays.is_empty() {
1707            warn!(
1708                "Nostr mirror publish had relay failures: tree={} failures={:?}",
1709                tree_name, failed_relays
1710            );
1711        }
1712        Ok(())
1713    }
1714
1715    fn maybe_start_background_root_upload(
1716        &self,
1717        pending_root: &hashtree_core::Cid,
1718        publish_state: &Arc<Mutex<RootPublishState>>,
1719        log_label: &str,
1720    ) -> bool {
1721        if self.config.blossom_write_servers.is_empty() {
1722            return false;
1723        }
1724
1725        {
1726            let mut state = publish_state.lock().expect("root publish state");
1727            if state.last_uploaded_root.as_ref() == Some(pending_root)
1728                || state.upload_in_progress_root.is_some()
1729            {
1730                return false;
1731            }
1732            if state
1733                .last_upload_failed_at
1734                .is_some_and(|failed_at| failed_at.elapsed() < MIRROR_ROOT_UPLOAD_RETRY_INTERVAL)
1735            {
1736                return false;
1737            }
1738            state.upload_in_progress_root = Some(pending_root.clone());
1739        }
1740
1741        let store = Arc::clone(&self.store);
1742        let servers = self.config.blossom_write_servers.clone();
1743        let root = pending_root.clone();
1744        let root_string = pending_root.to_string();
1745        let publish_state = Arc::clone(publish_state);
1746        let log_label = log_label.to_string();
1747        tokio::task::spawn_blocking(move || {
1748            let runtime = tokio::runtime::Builder::new_current_thread()
1749                .enable_all()
1750                .build()
1751                .expect("build nostr mirror root upload runtime");
1752            runtime.block_on(async move {
1753                let result =
1754                    background_blossom_push_with_store(store, &root_string, &servers).await;
1755                let mut state = publish_state.lock().expect("root publish state");
1756                if state.upload_in_progress_root.as_ref() == Some(&root) {
1757                    state.upload_in_progress_root = None;
1758                }
1759                match result {
1760                    Ok(()) => {
1761                        if state.pending_root.as_ref() == Some(&root) {
1762                            state.last_uploaded_root = Some(root.clone());
1763                            state.last_uploaded_at = Some(Instant::now());
1764                            state.last_upload_failed_at = None;
1765                            state.last_upload_error = None;
1766                            state.missing_blob_rebuild_required = false;
1767                        }
1768                        info!(
1769                            "Nostr mirror uploaded {} DAG to Blossom: hash={}",
1770                            log_label,
1771                            hex::encode(root.hash)
1772                        );
1773                    }
1774                    Err(err) => {
1775                        if state.pending_root.as_ref() == Some(&root) {
1776                            state.last_upload_failed_at = Some(Instant::now());
1777                            state.last_upload_error = Some(format!("{err:#}"));
1778                        }
1779                        if is_missing_local_blob_message(&format!("{err:#}")) {
1780                            state.missing_blob_rebuild_required = true;
1781                        }
1782                        warn!(
1783                            "Nostr mirror {} DAG upload failed: hash={} error={:#}",
1784                            log_label,
1785                            hex::encode(root.hash),
1786                            err
1787                        );
1788                    }
1789                }
1790            });
1791        });
1792
1793        true
1794    }
1795
1796    async fn publish_root_event_to_relays(
1797        &self,
1798        publish_client: &Client,
1799        relays: &[String],
1800        event: &Event,
1801    ) -> Result<(Vec<String>, Vec<String>)> {
1802        let mut successful_relays = Vec::new();
1803        let mut failed_relays = Vec::new();
1804
1805        match publish_client
1806            .send_event_to(relays.iter().map(|relay| relay.as_str()), event.clone())
1807            .await
1808        {
1809            Ok(output) => {
1810                for relay in relays {
1811                    let relay_url = relay.trim_end_matches('/');
1812                    if output
1813                        .success
1814                        .iter()
1815                        .any(|url| url.as_str().trim_end_matches('/') == relay_url)
1816                    {
1817                        successful_relays.push(relay.clone());
1818                    }
1819                }
1820                failed_relays.extend(output.failed.into_iter().map(|(url, reason)| match reason {
1821                    Some(reason) => format!("{url}: {reason}"),
1822                    None => format!("{url}: relay rejected publish"),
1823                }));
1824            }
1825            Err(err) => {
1826                failed_relays.push(format!("publish relays: {err}"));
1827            }
1828        }
1829
1830        Ok((successful_relays, failed_relays))
1831    }
1832
1833    async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1834        let publish_client = self.publish_client.as_ref()?;
1835        let author = self.publish_pubkey?;
1836        let events = publish_client
1837            .get_events_of(
1838                vec![Self::build_public_root_filter(author, tree_name)],
1839                EventSource::relays(Some(self.config.fetch_timeout)),
1840            )
1841            .await
1842            .ok()?;
1843        events
1844            .iter()
1845            .filter(|event| Self::matches_public_root_event(event, tree_name))
1846            .max_by_key(|event| (event.created_at, event.id))
1847            .map(|event| event.created_at)
1848    }
1849
1850    fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1851        Filter::new()
1852            .kind(Kind::Custom(30078))
1853            .author(author)
1854            .custom_tag(
1855                SingleLetterTag::lowercase(Alphabet::D),
1856                vec![tree_name.to_string()],
1857            )
1858            .custom_tag(
1859                SingleLetterTag::lowercase(Alphabet::L),
1860                vec!["hashtree".to_string()],
1861            )
1862            .limit(50)
1863    }
1864
1865    fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1866        event.kind == Kind::Custom(30078)
1867            && event.tags.iter().any(|tag| {
1868                let values = tag.as_slice();
1869                values.first().is_some_and(|value| value == "d")
1870                    && values.get(1).is_some_and(|value| value == tree_name)
1871            })
1872            && event.tags.iter().any(|tag| {
1873                let values = tag.as_slice();
1874                values.first().is_some_and(|value| value == "l")
1875                    && values.get(1).is_some_and(|value| value == "hashtree")
1876            })
1877    }
1878
1879    fn build_public_root_event(
1880        tree_name: &str,
1881        cid: &hashtree_core::Cid,
1882        created_at: Timestamp,
1883    ) -> EventBuilder {
1884        let mut tags = vec![
1885            Tag::identifier(tree_name.to_string()),
1886            Tag::custom(
1887                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1888                vec!["hashtree"],
1889            ),
1890            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1891        ];
1892        if let Some(key) = cid.key {
1893            tags.push(Tag::custom(
1894                TagKind::Custom("key".into()),
1895                vec![hex::encode(key)],
1896            ));
1897        }
1898
1899        EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1900    }
1901}
1902
1903fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
1904    error
1905        .chain()
1906        .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
1907}
1908
1909fn is_missing_local_blob_message(message: &str) -> bool {
1910    message.contains(MISSING_LOCAL_BLOB_PUSH_ERROR)
1911}
1912
1913fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1914    match (left, right) {
1915        (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1916        (Some(left), None) => Some(left),
1917        (None, Some(right)) => Some(right),
1918        (None, None) => None,
1919    }
1920}
1921
1922fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1923    match latest_existing {
1924        Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1925        _ => now,
1926    }
1927}
1928
1929#[cfg(test)]
1930mod tests;