Skip to main content

hashtree_cli/
nostr_mirror.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13    Timestamp,
14};
15use nostr_sdk::{
16    pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17    RelayStatus,
18};
19use tokio::sync::watch;
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// Pause at the start of `run`, before any relay work; much shorter under `cfg(test)`.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

// Second pause in `run`, after the startup delay and before the live-subscription
// cutoff timestamp is taken; presumably lets relay connections settle.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

// Period of the refresh tick in `run`, which looks for newly reachable authors and
// checks whether a missing-profile backfill is due.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

// Minimum time between catch-up history syncs triggered by relay reconnects.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

// Event kinds mirrored by default (`NostrMirrorConfig::kinds`). These are standard
// Nostr kind numbers (presumably metadata, notes, contacts, reposts, reactions,
// zap receipts — confirm against the NIPs); kind 0 must stay in the list for
// missing-profile backfill, which requires `Kind::Metadata`.
const DEFAULT_HISTORY_KINDS: [u16; 6] = [0, 1, 3, 6, 7, 9735];
// Default values of the `published_*_tree_name` config fields.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
// Metadata-only history syncs (see `history_sync_plan_for`) request one event per
// author and use an author batch size of at most 64.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
// A history sync over more than `author_batch_size * 8` authors switches to
// `RelayFetchMode::GlobalRecent`, capping per-author events at 16 and relay
// pages at 20.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

// Minimum time between missing-profile backfill runs in the refresh loop.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

// Period of the publish tick in `run` (live-event flush plus non-forced root publish).
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

// Not referenced in the visible part of this file; presumably the longest a
// changed root may stay unpublished before a publish is forced — TODO confirm.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);
71
/// Settings for [`BackgroundNostrMirror`].
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relays the mirror reads from; `run` returns immediately when this is empty.
    pub relays: Vec<String>,
    /// Relays used for publishing; a publish client is only created when this is
    /// non-empty and publish keys are supplied to `BackgroundNostrMirror::new`.
    pub publish_relays: Vec<String>,
    /// Blossom servers; not read in the visible part of this file (presumably
    /// upload targets for `background_blossom_push` — TODO confirm).
    pub blossom_write_servers: Vec<String>,
    /// Largest social-graph follow distance mirrored (inclusive, starting at 0);
    /// a value of 0 makes `run` return immediately.
    pub max_follow_distance: u32,
    /// Threshold passed to `is_overmuted_user`; users it flags are not mirrored.
    pub overmute_threshold: f64,
    /// Author batch size for history crawls (clamped to at least 1); also scales
    /// the interval between history-sync progress log lines.
    pub author_batch_size: usize,
    /// Number of authors handed to each history-sync chunk (clamped to at least 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit for history crawls (clamped to at least 1; reduced
    /// for metadata-only and very large syncs).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors collected per missing-profile backfill run; 0 disables
    /// the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Relay fetch timeout passed to the crawler.
    pub fetch_timeout: Duration,
    /// Maximum relay event size, applied to the mirror client's relay limits and
    /// passed to the crawler; with `None` the mirror client uses default limits.
    pub relay_event_max_size: Option<u32>,
    /// Passed to the crawler as `require_negentropy` (logged as `negentropy_only`).
    pub require_negentropy: bool,
    /// Event kinds fetched by history sync; missing-profile backfill only runs
    /// when this contains `Kind::Metadata`.
    pub kinds: Vec<u16>,
    /// Run a history sync for every reachable author when `run` starts.
    pub history_sync_on_start: bool,
    /// Run a catch-up history sync when a relay transitions to connected.
    pub history_sync_on_reconnect: bool,
    /// Names of the published trees. Not read in the visible part of this file;
    /// presumably `None` disables publishing that tree — TODO confirm.
    pub published_event_tree_name: Option<String>,
    pub published_profile_search_tree_name: Option<String>,
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
93
94impl Default for NostrMirrorConfig {
95    fn default() -> Self {
96        Self {
97            relays: Vec::new(),
98            publish_relays: Vec::new(),
99            blossom_write_servers: Vec::new(),
100            max_follow_distance: 2,
101            overmute_threshold: 1.0,
102            author_batch_size: 256,
103            history_sync_author_chunk_size: 5_000,
104            history_sync_per_author_event_limit: 256,
105            missing_profile_backfill_batch_size: 5_000,
106            fetch_timeout: Duration::from_secs(15),
107            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
108            require_negentropy: false,
109            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
110            history_sync_on_start: true,
111            history_sync_on_reconnect: true,
112            published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
113            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
114            published_profiles_by_pubkey_tree_name: Some(
115                DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
116            ),
117        }
118    }
119}
120
/// Bookkeeping for publishing one tree's root; `BackgroundNostrMirror` keeps one
/// instance per published tree behind a `Mutex`. The fields are read and written
/// by code outside this view, so the notes below are inferred from names — TODO
/// confirm against the `maybe_publish_*` / `note_*_root_change` implementations.
#[derive(Debug, Default)]
struct RootPublishState {
    // Presumably the latest root observed but not yet published.
    pending_root: Option<hashtree_core::Cid>,
    // Presumably when `pending_root` last changed (debounce anchor).
    last_changed_at: Option<Instant>,
    // Presumably when the state first became dirty (staleness anchor).
    dirty_since: Option<Instant>,
    // Presumably the root and time of the most recent successful publish.
    last_published_root: Option<hashtree_core::Cid>,
    last_published_at: Option<Instant>,
    // Presumably the Nostr `created_at` of the last published root event.
    last_published_created_at: Option<Timestamp>,
    // Presumably the root and time of the most recent blob upload.
    last_uploaded_root: Option<hashtree_core::Cid>,
    last_uploaded_at: Option<Instant>,
}
132
/// Crawl parameters chosen by `history_sync_plan_for` for one history-sync chunk;
/// each field is copied into the matching `CrawlConfig` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HistorySyncPlan {
    /// `AuthorBatches`, except `GlobalRecent` for large non-metadata syncs.
    relay_fetch_mode: RelayFetchMode,
    author_batch_size: usize,
    per_author_event_limit: usize,
    /// Always 1_000 in the current planner.
    relay_page_size: usize,
    /// 10, or 20 for large non-metadata syncs.
    max_relay_pages: usize,
}
141
/// Background task that history-syncs and live-subscribes to Nostr events from
/// authors reachable in the social graph, stores them through the crawl bridge,
/// and publishes the resulting tree roots.
pub struct BackgroundNostrMirror {
    config: NostrMirrorConfig,
    /// Content store; `store_arc()` backs the crawl bridge and the event store.
    store: Arc<HashtreeStore>,
    /// Social graph: author discovery, overmute checks, profile lookups and the
    /// persisted public-events root.
    graph_store: Arc<SocialGraphStore>,
    /// Client connected to `config.relays`; source of subscriptions and notifications.
    client: Client,
    /// Client signing with the publish keys; only present when keys were given
    /// and `config.publish_relays` is non-empty.
    publish_client: Option<Client>,
    /// Public key of the publish keys, recorded whenever keys were given.
    publish_pubkey: Option<PublicKey>,
    /// Per-tree publish state (event index, profile search, profiles by pubkey).
    event_publish_state: Mutex<RootPublishState>,
    profile_search_publish_state: Mutex<RootPublishState>,
    profiles_by_pubkey_publish_state: Mutex<RootPublishState>,
    /// Live events awaiting `flush_live_events`; presumably keyed by event id —
    /// TODO confirm in `ingest_live_event`.
    pending_live_events: Mutex<BTreeMap<String, Event>>,
    /// Rotating index into the author list for missing-profile backfill.
    missing_profile_cursor: Mutex<usize>,
    /// Shutdown signal: `shutdown()` sends `true`; `run` waits on a clone of the receiver.
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}
157
158impl BackgroundNostrMirror {
159    pub async fn new(
160        config: NostrMirrorConfig,
161        store: Arc<HashtreeStore>,
162        graph_store: Arc<SocialGraphStore>,
163        publish_keys: Option<Keys>,
164    ) -> Result<Self> {
165        let client = if let Some(max_size) = config.relay_event_max_size {
166            let mut limits = RelayLimits::default();
167            limits.events.max_size = Some(max_size);
168            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
169        } else {
170            Client::new(Keys::generate())
171        };
172        for relay in &config.relays {
173            client
174                .add_relay(relay)
175                .await
176                .with_context(|| format!("add mirror relay {relay}"))?;
177        }
178        client.connect().await;
179
180        let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
181        let publish_client = if let Some(keys) = publish_keys {
182            if config.publish_relays.is_empty() {
183                None
184            } else {
185                let client = Client::new(keys);
186                for relay in &config.publish_relays {
187                    client
188                        .add_relay(relay)
189                        .await
190                        .with_context(|| format!("add mirror publish relay {relay}"))?;
191                }
192                client.connect().await;
193                Some(client)
194            }
195        } else {
196            None
197        };
198
199        let (shutdown_tx, shutdown_rx) = watch::channel(false);
200        Ok(Self {
201            config,
202            store,
203            graph_store,
204            client,
205            publish_client,
206            publish_pubkey,
207            event_publish_state: Mutex::new(RootPublishState::default()),
208            profile_search_publish_state: Mutex::new(RootPublishState::default()),
209            profiles_by_pubkey_publish_state: Mutex::new(RootPublishState::default()),
210            pending_live_events: Mutex::new(BTreeMap::new()),
211            missing_profile_cursor: Mutex::new(0),
212            shutdown_tx,
213            shutdown_rx,
214        })
215    }
216
217    pub fn shutdown(&self) {
218        let _ = self.shutdown_tx.send(true);
219    }
220
221    fn sync_publish_roots_from_store(&self) -> Result<()> {
222        self.note_public_events_root_change()?;
223        self.note_profile_search_root_change()?;
224        self.note_profiles_by_pubkey_root_change()?;
225        Ok(())
226    }
227
228    async fn publish_pending_roots(
229        &self,
230        force_event: bool,
231        force_profile_search: bool,
232        force_profiles_by_pubkey: bool,
233    ) -> (Result<()>, Result<()>, Result<()>) {
234        tokio::join!(
235            self.maybe_publish_event_root(force_event),
236            self.maybe_publish_profile_search_root(force_profile_search),
237            self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
238        )
239    }
240
241    pub async fn run(&self) -> Result<()> {
242        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
243            return Ok(());
244        }
245
246        info!(
247            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
248            self.config.relays.len(),
249            self.config.max_follow_distance,
250            self.config.require_negentropy,
251            self.config.kinds,
252            self.config.history_sync_author_chunk_size.max(1),
253            self.config.history_sync_on_start,
254            self.config.history_sync_on_reconnect
255        );
256
257        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
258        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
259        let live_since = Timestamp::now();
260        self.sync_publish_roots_from_store()?;
261
262        let initial_authors = self.collect_authors()?;
263        if initial_authors.is_empty() {
264            info!("Nostr mirror: no social-graph authors to mirror yet");
265        } else if self.config.history_sync_on_start {
266            if self.should_backfill_missing_profiles(None) {
267                let missing_profile_authors = self.collect_missing_profile_authors(
268                    self.config.missing_profile_backfill_batch_size,
269                )?;
270                if !missing_profile_authors.is_empty() {
271                    info!(
272                        "Nostr mirror missing-profile backfill starting: authors={}",
273                        missing_profile_authors.len()
274                    );
275                    self.history_sync_authors_with_kinds(
276                        missing_profile_authors,
277                        &[Kind::Metadata.as_u16()],
278                    )
279                    .await?;
280                }
281            }
282            self.history_sync_authors(initial_authors.clone()).await?;
283        }
284
285        let mut subscribed_authors = HashSet::new();
286        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
287            .await?;
288
289        let mut relay_statuses = self.capture_relay_statuses().await;
290        let mut last_reconnect_history_sync_at: Option<Instant> = None;
291        let mut last_missing_profile_backfill_at: Option<Instant> = None;
292        let mut notifications = self.client.notifications();
293        let mut shutdown_rx = self.shutdown_rx.clone();
294        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
295        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
296
297        loop {
298            tokio::select! {
299                _ = shutdown_rx.changed() => {
300                    if *shutdown_rx.borrow() {
301                        break;
302                    }
303                }
304                _ = refresh_interval.tick() => {
305                    let authors = self.collect_authors()?;
306                    let new_authors = authors
307                        .into_iter()
308                        .filter(|author| !subscribed_authors.contains(author))
309                        .collect::<Vec<_>>();
310                    if !new_authors.is_empty() {
311                        debug!(
312                            "Nostr mirror discovered {} newly reachable author(s)",
313                            new_authors.len()
314                        );
315                        self.history_sync_authors(new_authors.clone()).await?;
316                        self.subscribe_authors_since(
317                            &new_authors,
318                            Timestamp::now(),
319                            &mut subscribed_authors,
320                        )
321                        .await?;
322                    }
323                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
324                        let missing_profile_authors = self.collect_missing_profile_authors(
325                            self.config.missing_profile_backfill_batch_size,
326                        )?;
327                        if !missing_profile_authors.is_empty() {
328                            info!(
329                                "Nostr mirror missing-profile backfill starting: authors={}",
330                                missing_profile_authors.len()
331                            );
332                            self.history_sync_authors_with_kinds(
333                                missing_profile_authors,
334                                &[Kind::Metadata.as_u16()],
335                            )
336                            .await?;
337                            last_missing_profile_backfill_at = Some(Instant::now());
338                        }
339                    }
340                }
341                _ = publish_interval.tick() => {
342                    self.sync_publish_roots_from_store()?;
343                    if let Err(err) = self.flush_live_events().await {
344                        warn!("Nostr mirror live event flush failed: {:#}", err);
345                    }
346                    let (event_result, profile_search_result, profiles_by_pubkey_result) = self
347                        .publish_pending_roots(false, false, false)
348                        .await;
349                    if let Err(err) = event_result {
350                        warn!("Nostr mirror event-root publish failed: {:#}", err);
351                    }
352                    if let Err(err) = profile_search_result {
353                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
354                    }
355                    if let Err(err) = profiles_by_pubkey_result {
356                        warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
357                    }
358                }
359                notification = notifications.recv() => {
360                    match notification {
361                        Ok(RelayPoolNotification::Event { event, .. }) => {
362                            self.ingest_live_event(&event)?;
363                        }
364                        Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
365                            let relay_url = relay_url.to_string();
366                            let previous = relay_statuses.insert(relay_url.clone(), status);
367                            if Self::should_history_sync_on_reconnect(
368                                self.config.history_sync_on_reconnect,
369                                previous,
370                                status,
371                            ) && Self::should_run_reconnect_history_sync(
372                                    last_reconnect_history_sync_at.as_ref(),
373                                )
374                            {
375                                let authors = self.collect_authors()?;
376                                if !authors.is_empty() {
377                                    info!(
378                                        "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
379                                        relay_url,
380                                        authors.len(),
381                                        self.config.require_negentropy
382                                    );
383                                    self.history_sync_authors(authors).await?;
384                                    last_reconnect_history_sync_at = Some(Instant::now());
385                                }
386                            }
387                        }
388                        Ok(RelayPoolNotification::Shutdown) => break,
389                        Ok(_) => {}
390                        Err(err) => {
391                            warn!("Nostr mirror notification error: {}", err);
392                            break;
393                        }
394                    }
395                }
396            }
397        }
398
399        if let Err(err) = self.flush_live_events().await {
400            warn!(
401                "Nostr mirror live event flush failed during shutdown: {:#}",
402                err
403            );
404        }
405        if let Err(err) = self.sync_publish_roots_from_store() {
406            warn!(
407                "Nostr mirror root-state refresh failed during shutdown: {:#}",
408                err
409            );
410        }
411        let (event_result, profile_search_result, profiles_by_pubkey_result) =
412            self.publish_pending_roots(true, true, true).await;
413        if let Err(err) = event_result {
414            warn!(
415                "Nostr mirror event-root publish failed during shutdown: {:#}",
416                err
417            );
418        }
419        if let Err(err) = profile_search_result {
420            warn!(
421                "Nostr mirror profile-search publish failed during shutdown: {:#}",
422                err
423            );
424        }
425        if let Err(err) = profiles_by_pubkey_result {
426            warn!(
427                "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
428                err
429            );
430        }
431        let _ = self.client.disconnect().await;
432        if let Some(client) = self.publish_client.as_ref() {
433            let _ = client.disconnect().await;
434        }
435        Ok(())
436    }
437
438    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
439        let mut statuses = HashMap::new();
440        for (relay_url, relay) in self.client.relays().await {
441            statuses.insert(relay_url.to_string(), relay.status().await);
442        }
443        statuses
444    }
445
446    async fn has_connected_publish_relay(&self) -> bool {
447        let Some(client) = self.publish_client.as_ref() else {
448            return false;
449        };
450        Self::client_has_connected_relay(client).await
451    }
452
453    async fn client_has_connected_relay(client: &Client) -> bool {
454        for (_relay_url, relay) in client.relays().await {
455            if relay.status().await == RelayStatus::Connected {
456                return true;
457            }
458        }
459        false
460    }
461
462    fn collect_authors(&self) -> Result<Vec<String>> {
463        let mut authors = Vec::new();
464        let mut seen = HashSet::new();
465        for distance in 0..=self.config.max_follow_distance {
466            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
467                self.graph_store.as_ref(),
468                distance,
469            )
470            .with_context(|| format!("load social-graph distance {distance}"))?
471            {
472                if self
473                    .graph_store
474                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
475                {
476                    continue;
477                }
478                let hex = hex::encode(pubkey);
479                if seen.insert(hex.clone()) {
480                    authors.push(hex);
481                }
482            }
483        }
484        Ok(authors)
485    }
486
487    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
488        if limit == 0 {
489            return Ok(Vec::new());
490        }
491
492        let authors = self.collect_authors()?;
493        if authors.is_empty() {
494            return Ok(Vec::new());
495        }
496
497        let mut cursor = self
498            .missing_profile_cursor
499            .lock()
500            .expect("missing profile cursor");
501        let mut index = (*cursor).min(authors.len());
502        let mut scanned = 0usize;
503        let mut missing = Vec::new();
504
505        while scanned < authors.len() && missing.len() < limit {
506            let author = &authors[index];
507            if self.graph_store.latest_profile_event(author)?.is_none() {
508                missing.push(author.clone());
509            }
510            index += 1;
511            if index == authors.len() {
512                index = 0;
513            }
514            scanned += 1;
515        }
516
517        *cursor = index;
518        Ok(missing)
519    }
520
521    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
522        if self.config.missing_profile_backfill_batch_size == 0
523            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
524        {
525            return false;
526        }
527        match last_run {
528            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
529            None => true,
530        }
531    }
532
533    fn should_history_sync_on_reconnect(
534        history_sync_on_reconnect: bool,
535        previous: Option<RelayStatus>,
536        status: RelayStatus,
537    ) -> bool {
538        history_sync_on_reconnect
539            && status == RelayStatus::Connected
540            && matches!(
541                previous,
542                Some(
543                    RelayStatus::Initialized
544                        | RelayStatus::Pending
545                        | RelayStatus::Connecting
546                        | RelayStatus::Disconnected
547                        | RelayStatus::Terminated
548                )
549            )
550    }
551
552    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
553        match last_run {
554            None => true,
555            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
556        }
557    }
558
559    fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
560        !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
561    }
562
563    fn history_sync_plan_for(
564        config: &NostrMirrorConfig,
565        authors: usize,
566        kinds: &[u16],
567    ) -> HistorySyncPlan {
568        let author_batch_size = config.author_batch_size.max(1);
569        let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
570        let relay_page_size = 1_000;
571        let max_relay_pages = 10;
572
573        if Self::is_metadata_only_history_sync(kinds) {
574            return HistorySyncPlan {
575                relay_fetch_mode: RelayFetchMode::AuthorBatches,
576                author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
577                per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
578                relay_page_size,
579                max_relay_pages,
580            };
581        }
582
583        if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
584            return HistorySyncPlan {
585                relay_fetch_mode: RelayFetchMode::GlobalRecent,
586                author_batch_size,
587                per_author_event_limit: per_author_event_limit
588                    .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
589                    .max(1),
590                relay_page_size,
591                max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
592            };
593        }
594
595        HistorySyncPlan {
596            relay_fetch_mode: RelayFetchMode::AuthorBatches,
597            author_batch_size,
598            per_author_event_limit,
599            relay_page_size,
600            max_relay_pages,
601        }
602    }
603
604    fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
605        Self::history_sync_plan_for(&self.config, authors, kinds)
606    }
607
608    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
609        self.history_sync_authors_with_kinds(authors, &self.config.kinds)
610            .await
611    }
612
613    async fn history_sync_authors_with_kinds(
614        &self,
615        authors: Vec<String>,
616        kinds: &[u16],
617    ) -> Result<()> {
618        self.history_sync_authors_chunked(authors, |current_root, author_chunk| async move {
619            self.history_sync_author_chunk(current_root, author_chunk, kinds)
620                .await
621        })
622        .await
623    }
624
    /// Runs a history sync over `authors` in chunks of
    /// `config.history_sync_author_chunk_size` (at least 1), threading the
    /// public-events root from one chunk into the next.
    ///
    /// `run_chunk` receives the current root and the chunk's authors and returns
    /// the crawl report. A failed chunk is logged and skipped, and the root is
    /// left unchanged. When a report's root differs from the current root, it is
    /// applied via `apply_history_root` before the next chunk starts. An empty
    /// `authors` list is a no-op.
    ///
    /// # Errors
    /// Returns the last chunk error when no chunk succeeded. Any
    /// `apply_history_root` failure is propagated immediately.
    async fn history_sync_authors_chunked<F, Fut>(
        &self,
        authors: Vec<String>,
        mut run_chunk: F,
    ) -> Result<()>
    where
        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
        Fut: std::future::Future<Output = Result<CrawlReport>>,
    {
        if authors.is_empty() {
            return Ok(());
        }

        info!(
            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
            authors.len(),
            self.config.relays.len(),
            self.config.require_negentropy
        );

        // Each chunk crawls on top of the root produced by the previous chunk.
        let mut current_root = self.graph_store.public_events_root()?;
        let mut last_error = None;
        let mut applied_chunks = 0usize;
        let mut failed_chunks = 0usize;
        let chunk_size = self.config.history_sync_author_chunk_size.max(1);
        let total_chunks = authors.len().div_ceil(chunk_size);

        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
            let author_chunk = author_chunk.to_vec();
            let author_count = author_chunk.len();
            info!(
                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
                chunk_index + 1,
                total_chunks,
                author_count
            );
            let report = match run_chunk(current_root.clone(), author_chunk).await {
                Ok(report) => report,
                Err(err) => {
                    // Skip this chunk but keep going; only a run with zero
                    // successful chunks is reported as an error.
                    failed_chunks = failed_chunks.saturating_add(1);
                    warn!(
                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
                        chunk_index + 1,
                        total_chunks,
                        author_count,
                        err
                    );
                    last_error = Some(err);
                    continue;
                }
            };

            // Persist and propagate the new root only when the crawl changed it.
            if report.root != current_root {
                self.apply_history_root(report.root.as_ref()).await?;
                current_root = report.root.clone();
                info!(
                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
                    chunk_index + 1,
                    total_chunks,
                    report.authors_processed,
                    report.events_selected,
                    report.events_seen
                );
            }
            applied_chunks = applied_chunks.saturating_add(1);
        }

        if applied_chunks == 0 {
            return Err(last_error
                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
                .context("run mirror history sync"));
        }
        if failed_chunks > 0 {
            warn!(
                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
                applied_chunks,
                failed_chunks
            );
        }
        Ok(())
    }
706
707    async fn history_sync_author_chunk(
708        &self,
709        current_root: Option<hashtree_core::Cid>,
710        authors: Vec<String>,
711        kinds: &[u16],
712    ) -> Result<CrawlReport> {
713        let mut last_error = None;
714        let mut report = None;
715        let plan = self.history_sync_plan(authors.len(), kinds);
716        for attempt in 0..3 {
717            let mut last_logged_authors = 0usize;
718            let bridge = NostrBridge::new(
719                self.store.store_arc(),
720                CrawlConfig {
721                    relays: self.config.relays.clone(),
722                    author_allowlist: Some(authors.clone()),
723                    max_live_bytes: None,
724                    max_events_seen: None,
725                    max_authors: None,
726                    max_follow_distance: None,
727                    author_batch_size: plan.author_batch_size,
728                    per_author_event_limit: plan.per_author_event_limit,
729                    per_author_live_bytes: None,
730                    fetch_timeout: self.config.fetch_timeout,
731                    kinds: Some(kinds.to_vec()),
732                    relay_fetch_mode: plan.relay_fetch_mode,
733                    require_negentropy: self.config.require_negentropy,
734                    relay_event_max_size: self.config.relay_event_max_size,
735                    relay_page_size: plan.relay_page_size,
736                    max_relay_pages: plan.max_relay_pages,
737                },
738            );
739
740            match bridge
741                .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
742                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
743                    let should_log = progress.authors_processed == progress.authors_considered
744                        || progress.authors_processed == 0
745                        || progress
746                            .authors_processed
747                            .saturating_sub(last_logged_authors)
748                            >= log_interval;
749                    if should_log {
750                        last_logged_authors = progress.authors_processed;
751                        info!(
752                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
753                            progress.authors_processed,
754                            progress.authors_considered,
755                            progress.events_selected,
756                            progress.events_seen
757                        );
758                    }
759                })
760                .await
761            {
762                Ok(next_report) => {
763                    report = Some(next_report);
764                    break;
765                }
766                Err(err) => {
767                    last_error = Some(err);
768                    if attempt < 2 {
769                        tokio::time::sleep(Duration::from_millis(500)).await;
770                    }
771                }
772            }
773        }
774        report
775            .ok_or_else(|| last_error.expect("history sync retry captured error"))
776            .context("run mirror history sync")
777    }
778
779    async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
780        self.graph_store.write_public_events_root(root)?;
781        let Some(root) = root else {
782            return Ok(());
783        };
784
785        let event_store = NostrEventStore::new(self.store.store_arc());
786        let events = event_store
787            .list_recent_lossy(Some(root), ListEventsOptions::default())
788            .await
789            .context("list trusted mirrored events")?
790            .into_iter()
791            .map(socialgraph::stored_event_to_nostr_event)
792            .collect::<Result<Vec<_>>>()?;
793
794        self.graph_store
795            .rebuild_profile_index_for_events(&events)
796            .context("rebuild mirrored profile search index")?;
797        socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
798            .context("sync mirrored social graph state")?;
799        self.note_public_events_root_change()?;
800        self.note_profile_search_root_change()?;
801        self.note_profiles_by_pubkey_root_change()?;
802        let (event_result, profile_search_result, profiles_by_pubkey_result) =
803            self.publish_pending_roots(true, true, true).await;
804        if let Err(err) = event_result {
805            warn!(
806                "Nostr mirror event-root publish failed after root update: {:#}",
807                err
808            );
809        }
810        if let Err(err) = profile_search_result {
811            warn!(
812                "Nostr mirror profile-search publish failed after root update: {:#}",
813                err
814            );
815        }
816        if let Err(err) = profiles_by_pubkey_result {
817            warn!(
818                "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
819                err
820            );
821        }
822        Ok(())
823    }
824
825    async fn subscribe_authors_since(
826        &self,
827        authors: &[String],
828        since: Timestamp,
829        subscribed_authors: &mut HashSet<String>,
830    ) -> Result<()> {
831        let new_authors = authors
832            .iter()
833            .filter(|author| !subscribed_authors.contains(*author))
834            .cloned()
835            .collect::<Vec<_>>();
836        if new_authors.is_empty() {
837            return Ok(());
838        }
839
840        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
841            let pubkeys = chunk
842                .iter()
843                .filter_map(|author| PublicKey::from_hex(author).ok())
844                .collect::<Vec<_>>();
845            if pubkeys.is_empty() {
846                continue;
847            }
848
849            let filter = Filter::new()
850                .authors(pubkeys)
851                .kinds(self.config.kinds.iter().copied().map(Kind::from))
852                .since(since);
853
854            self.client
855                .subscribe(vec![filter], None)
856                .await
857                .context("subscribe mirror author batch")?;
858        }
859
860        subscribed_authors.extend(new_authors);
861        Ok(())
862    }
863
864    fn ingest_live_event(&self, event: &Event) -> Result<()> {
865        self.pending_live_events
866            .lock()
867            .expect("pending live events")
868            .insert(event.id.to_hex(), event.clone());
869        Ok(())
870    }
871
872    async fn flush_live_events(&self) -> Result<()> {
873        let pending = {
874            let mut pending = self
875                .pending_live_events
876                .lock()
877                .expect("pending live events");
878            if pending.is_empty() {
879                return Ok(());
880            }
881            std::mem::take(&mut *pending)
882        };
883        let events = pending.into_values().collect::<Vec<_>>();
884        let event_count = events.len();
885        let previous_event_root = self.graph_store.public_events_root()?;
886        let previous_profile_search_root = self.graph_store.profile_search_root()?;
887        let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
888
889        socialgraph::ingest_parsed_events_with_storage_class(
890            self.graph_store.as_ref(),
891            &events,
892            socialgraph::EventStorageClass::Public,
893        )
894        .context("ingest live mirrored event batch")?;
895
896        let next_event_root = self.graph_store.public_events_root()?;
897        let next_profile_search_root = self.graph_store.profile_search_root()?;
898        let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
899        let event_root_changed = next_event_root != previous_event_root;
900        let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
901        let profiles_by_pubkey_root_changed =
902            next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
903
904        if event_root_changed {
905            self.note_public_events_root_change()?;
906        }
907        if profile_search_root_changed {
908            self.note_profile_search_root_change()?;
909        }
910        if profiles_by_pubkey_root_changed {
911            self.note_profiles_by_pubkey_root_change()?;
912        }
913        if event_root_changed {
914            self.maybe_publish_event_root(true).await?;
915        }
916        if profile_search_root_changed {
917            self.maybe_publish_profile_search_root(true).await?;
918        }
919        if profiles_by_pubkey_root_changed {
920            self.maybe_publish_profiles_by_pubkey_root(true).await?;
921        }
922
923        info!(
924            "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
925            event_count,
926            event_root_changed,
927            profile_search_root_changed,
928            profiles_by_pubkey_root_changed
929        );
930        Ok(())
931    }
932
933    fn note_public_events_root_change(&self) -> Result<()> {
934        let root = self.graph_store.public_events_root()?;
935        Self::note_root_change(
936            self.config.published_event_tree_name.as_deref(),
937            &self.event_publish_state,
938            root,
939        )
940    }
941
942    fn note_profile_search_root_change(&self) -> Result<()> {
943        let root = self.graph_store.profile_search_root()?;
944        Self::note_root_change(
945            self.config.published_profile_search_tree_name.as_deref(),
946            &self.profile_search_publish_state,
947            root,
948        )
949    }
950
951    fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
952        let root = self.graph_store.profiles_by_pubkey_root()?;
953        Self::note_root_change(
954            self.config
955                .published_profiles_by_pubkey_tree_name
956                .as_deref(),
957            &self.profiles_by_pubkey_publish_state,
958            root,
959        )
960    }
961
962    fn note_root_change(
963        tree_name: Option<&str>,
964        publish_state: &Mutex<RootPublishState>,
965        root: Option<hashtree_core::Cid>,
966    ) -> Result<()> {
967        let Some(_tree_name) = tree_name else {
968            return Ok(());
969        };
970
971        let mut state = publish_state.lock().expect("root publish state");
972        let now = Instant::now();
973
974        if state.pending_root == root {
975            return Ok(());
976        }
977
978        state.pending_root = root;
979        state.last_changed_at = Some(now);
980        if state.dirty_since.is_none() {
981            state.dirty_since = Some(now);
982        }
983        Ok(())
984    }
985
986    async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
987        self.maybe_publish_root(
988            self.config.published_event_tree_name.as_deref(),
989            &self.event_publish_state,
990            "event root",
991            force,
992        )
993        .await
994    }
995
996    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
997        self.maybe_publish_root(
998            self.config.published_profile_search_tree_name.as_deref(),
999            &self.profile_search_publish_state,
1000            "profile search root",
1001            force,
1002        )
1003        .await
1004    }
1005
1006    async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1007        self.maybe_publish_root(
1008            self.config
1009                .published_profiles_by_pubkey_tree_name
1010                .as_deref(),
1011            &self.profiles_by_pubkey_publish_state,
1012            "profiles-by-pubkey root",
1013            force,
1014        )
1015        .await
1016    }
1017
1018    async fn maybe_publish_root(
1019        &self,
1020        tree_name: Option<&str>,
1021        publish_state: &Mutex<RootPublishState>,
1022        log_label: &str,
1023        force: bool,
1024    ) -> Result<()> {
1025        let Some(tree_name) = tree_name else {
1026            return Ok(());
1027        };
1028
1029        let pending_root = {
1030            let state = publish_state.lock().expect("root publish state");
1031            let Some(pending_root) = state.pending_root.clone() else {
1032                return Ok(());
1033            };
1034
1035            let now = Instant::now();
1036            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1037                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1038            });
1039            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1040                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1041            });
1042            if !force && !debounce_ready && !stale_ready {
1043                return Ok(());
1044            }
1045
1046            pending_root
1047        };
1048
1049        let needs_upload = {
1050            let state = publish_state.lock().expect("root publish state");
1051            !self.config.blossom_write_servers.is_empty()
1052                && state.last_uploaded_root.as_ref() != Some(&pending_root)
1053        };
1054        if needs_upload {
1055            background_blossom_push(
1056                self.store.base_path(),
1057                &pending_root.to_string(),
1058                &self.config.blossom_write_servers,
1059            )
1060            .await
1061            .with_context(|| format!("upload {log_label} DAG to Blossom"))?;
1062
1063            let mut state = publish_state.lock().expect("root publish state");
1064            if state.pending_root.as_ref() == Some(&pending_root) {
1065                state.last_uploaded_root = Some(pending_root.clone());
1066                state.last_uploaded_at = Some(Instant::now());
1067            }
1068        }
1069
1070        let mut successful_relays = Vec::new();
1071        let mut failed_relays = Vec::new();
1072        let publish_required =
1073            self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1074        if publish_required {
1075            let Some(publish_client) = self.publish_client.as_ref() else {
1076                unreachable!("publish_required implies publish_client");
1077            };
1078            if !self.has_connected_publish_relay().await {
1079                return Ok(());
1080            }
1081
1082            let already_published = {
1083                let state = publish_state.lock().expect("root publish state");
1084                state.last_published_root.as_ref() == Some(&pending_root)
1085            };
1086            if !already_published {
1087                let publish_relays = self.config.publish_relays.clone();
1088                let latest_known_created_at = {
1089                    let state = publish_state.lock().expect("root publish state");
1090                    state.last_published_created_at
1091                };
1092                let publish_created_at = next_replaceable_created_at(
1093                    Timestamp::now(),
1094                    later_timestamp(
1095                        latest_known_created_at,
1096                        self.latest_root_event_created_at(tree_name).await,
1097                    ),
1098                );
1099                let event = publish_client
1100                    .sign_event_builder(Self::build_public_root_event(
1101                        tree_name,
1102                        &pending_root,
1103                        publish_created_at,
1104                    ))
1105                    .await
1106                    .with_context(|| format!("sign {log_label} event"))?;
1107                let publish_result = self
1108                    .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1109                    .await
1110                    .with_context(|| format!("publish {log_label} event"))?;
1111                successful_relays = publish_result.0;
1112                failed_relays = publish_result.1;
1113                if successful_relays.is_empty() {
1114                    let failure_summary = if failed_relays.is_empty() {
1115                        "no publish relays accepted the event".to_string()
1116                    } else {
1117                        failed_relays.join("; ")
1118                    };
1119                    anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1120                }
1121
1122                let mut state = publish_state.lock().expect("root publish state");
1123                if state.pending_root.as_ref() == Some(&pending_root) {
1124                    state.last_published_root = Some(pending_root.clone());
1125                    state.last_published_at = Some(Instant::now());
1126                    state.last_published_created_at = Some(event.created_at);
1127                }
1128            }
1129        }
1130
1131        {
1132            let mut state = publish_state.lock().expect("root publish state");
1133            if state.pending_root.as_ref() == Some(&pending_root) {
1134                let upload_satisfied = self.config.blossom_write_servers.is_empty()
1135                    || state.last_uploaded_root.as_ref() == Some(&pending_root);
1136                let publish_satisfied =
1137                    !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1138                if upload_satisfied && publish_satisfied {
1139                    state.dirty_since = None;
1140                }
1141            }
1142        }
1143
1144        info!(
1145            "Nostr mirror published {}: tree={} hash={} relays={:?}",
1146            log_label,
1147            tree_name,
1148            hex::encode(pending_root.hash),
1149            successful_relays,
1150        );
1151        if !failed_relays.is_empty() {
1152            warn!(
1153                "Nostr mirror publish had relay failures: tree={} failures={:?}",
1154                tree_name, failed_relays
1155            );
1156        }
1157        Ok(())
1158    }
1159
1160    async fn publish_root_event_to_relays(
1161        &self,
1162        publish_client: &Client,
1163        relays: &[String],
1164        event: &Event,
1165    ) -> Result<(Vec<String>, Vec<String>)> {
1166        let mut successful_relays = Vec::new();
1167        let mut failed_relays = Vec::new();
1168
1169        for relay in relays {
1170            match publish_client
1171                .send_event_to([relay.as_str()], event.clone())
1172                .await
1173            {
1174                Ok(output) => {
1175                    if output.success.is_empty() {
1176                        failed_relays.push(format!("{relay}: relay did not acknowledge publish"));
1177                        continue;
1178                    }
1179                    successful_relays.push(relay.clone());
1180                    failed_relays.extend(output.failed.into_iter().map(
1181                        |(url, reason)| match reason {
1182                            Some(reason) => format!("{url}: {reason}"),
1183                            None => format!("{url}: relay rejected publish"),
1184                        },
1185                    ));
1186                }
1187                Err(err) => {
1188                    failed_relays.push(format!("{relay}: {err}"));
1189                }
1190            }
1191        }
1192
1193        Ok((successful_relays, failed_relays))
1194    }
1195
1196    async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1197        let publish_client = self.publish_client.as_ref()?;
1198        let author = self.publish_pubkey?;
1199        let events = publish_client
1200            .get_events_of(
1201                vec![Self::build_public_root_filter(author, tree_name)],
1202                EventSource::relays(Some(self.config.fetch_timeout)),
1203            )
1204            .await
1205            .ok()?;
1206        events
1207            .iter()
1208            .filter(|event| Self::matches_public_root_event(event, tree_name))
1209            .max_by_key(|event| (event.created_at, event.id))
1210            .map(|event| event.created_at)
1211    }
1212
1213    fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1214        Filter::new()
1215            .kind(Kind::Custom(30078))
1216            .author(author)
1217            .custom_tag(
1218                SingleLetterTag::lowercase(Alphabet::D),
1219                vec![tree_name.to_string()],
1220            )
1221            .custom_tag(
1222                SingleLetterTag::lowercase(Alphabet::L),
1223                vec!["hashtree".to_string()],
1224            )
1225            .limit(50)
1226    }
1227
1228    fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1229        event.kind == Kind::Custom(30078)
1230            && event.tags.iter().any(|tag| {
1231                let values = tag.as_slice();
1232                values.first().is_some_and(|value| value == "d")
1233                    && values.get(1).is_some_and(|value| value == tree_name)
1234            })
1235            && event.tags.iter().any(|tag| {
1236                let values = tag.as_slice();
1237                values.first().is_some_and(|value| value == "l")
1238                    && values.get(1).is_some_and(|value| value == "hashtree")
1239            })
1240    }
1241
1242    fn build_public_root_event(
1243        tree_name: &str,
1244        cid: &hashtree_core::Cid,
1245        created_at: Timestamp,
1246    ) -> EventBuilder {
1247        let mut tags = vec![
1248            Tag::identifier(tree_name.to_string()),
1249            Tag::custom(
1250                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1251                vec!["hashtree"],
1252            ),
1253            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1254        ];
1255        if let Some(key) = cid.key {
1256            tags.push(Tag::custom(
1257                TagKind::Custom("key".into()),
1258                vec![hex::encode(key)],
1259            ));
1260        }
1261
1262        EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1263    }
1264}
1265
1266fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1267    match (left, right) {
1268        (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1269        (Some(left), None) => Some(left),
1270        (None, Some(right)) => Some(right),
1271        (None, None) => None,
1272    }
1273}
1274
1275fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1276    match latest_existing {
1277        Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1278        _ => now,
1279    }
1280}
1281
1282#[cfg(test)]
1283mod tests;