Skip to main content

hashtree_cli/
nostr_mirror.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13    Timestamp,
14};
15use nostr_sdk::{
16    pool::RelayLimits, prelude::RelayPoolNotification, Client, Keys, Options, RelayStatus,
17};
18use tokio::sync::watch;
19use tracing::{debug, info, warn};
20
21use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
22use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
23use crate::HashtreeStore;
24
/// First pause taken by the mirror's run loop before any relay work (shortened in tests).
const MIRROR_STARTUP_DELAY: Duration = if cfg!(test) {
    Duration::from_millis(50)
} else {
    Duration::from_secs(8)
};

/// Second pause taken right after the startup delay, before live collection begins.
const MIRROR_CONNECT_SETTLE_DELAY: Duration = if cfg!(test) {
    Duration::from_millis(250)
} else {
    Duration::from_secs(1)
};

/// Period of the timer that re-collects social-graph authors.
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};

/// Minimum time between two reconnect-triggered history syncs.
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};

/// Event kinds mirrored when the configuration does not override `kinds`.
const DEFAULT_HISTORY_KINDS: [u16; 6] = [0, 1, 3, 6, 7, 9735];
/// Default value of `NostrMirrorConfig::published_event_tree_name`.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
/// Default value of `NostrMirrorConfig::published_profile_search_tree_name`.
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";

/// Minimum time between two missing-profile backfill runs.
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(300)
};

/// Period of the timer that flushes live events and attempts root publication.
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = if cfg!(test) {
    Duration::from_millis(20)
} else {
    Duration::from_secs(5)
};

// NOTE(review): presumably the longest a changed root may stay unpublished; the code
// that reads this constant is outside the visible part of the file — confirm.
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};
63
/// Settings for [`BackgroundNostrMirror`].
#[derive(Clone, Debug)]
pub struct NostrMirrorConfig {
    /// Relays the mirror reads from; the run loop exits immediately when this is empty.
    pub relays: Vec<String>,
    /// Relays used by the publishing client; no publishing client is built when empty.
    pub publish_relays: Vec<String>,
    /// Largest follow distance (inclusive) whose users are mirrored; `0` disables the mirror.
    pub max_follow_distance: u32,
    /// Threshold handed to the social graph's overmute check when collecting authors.
    pub overmute_threshold: f64,
    /// Authors per live-subscription filter and per crawl batch (treated as at least 1).
    pub author_batch_size: usize,
    /// Authors per history-sync chunk (treated as at least 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit passed to the history crawl (treated as at least 1).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors returned per missing-profile backfill pass; `0` disables backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Fetch timeout passed to the history crawl.
    pub fetch_timeout: Duration,
    /// Optional maximum relay event size applied to the read client and the history crawl.
    pub relay_event_max_size: Option<u32>,
    /// Passed to the history crawl as its `require_negentropy` setting.
    pub require_negentropy: bool,
    /// Event kinds used for live subscriptions and history sync.
    pub kinds: Vec<u16>,
    /// Whether a history sync runs before the live subscriptions are opened.
    pub history_sync_on_start: bool,
    /// Whether a relay transitioning to connected triggers a catch-up history sync.
    pub history_sync_on_reconnect: bool,
    /// Tree name for the event root; root changes are not tracked when `None`.
    pub published_event_tree_name: Option<String>,
    /// Tree name for the profile-search root; root changes are not tracked when `None`.
    pub published_profile_search_tree_name: Option<String>,
}
83
84impl Default for NostrMirrorConfig {
85    fn default() -> Self {
86        Self {
87            relays: Vec::new(),
88            publish_relays: Vec::new(),
89            max_follow_distance: 2,
90            overmute_threshold: 1.0,
91            author_batch_size: 256,
92            history_sync_author_chunk_size: 5_000,
93            history_sync_per_author_event_limit: 256,
94            missing_profile_backfill_batch_size: 5_000,
95            fetch_timeout: Duration::from_secs(15),
96            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
97            require_negentropy: false,
98            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
99            history_sync_on_start: true,
100            history_sync_on_reconnect: true,
101            published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
102            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
103        }
104    }
105}
106
107#[derive(Debug, Default)]
108struct RootPublishState {
109    pending_root: Option<hashtree_core::Cid>,
110    last_changed_at: Option<Instant>,
111    dirty_since: Option<Instant>,
112    last_published_root: Option<hashtree_core::Cid>,
113    last_published_at: Option<Instant>,
114}
115
116pub struct BackgroundNostrMirror {
117    config: NostrMirrorConfig,
118    store: Arc<HashtreeStore>,
119    graph_store: Arc<SocialGraphStore>,
120    client: Client,
121    publish_client: Option<Client>,
122    event_publish_state: Mutex<RootPublishState>,
123    profile_search_publish_state: Mutex<RootPublishState>,
124    pending_live_events: Mutex<BTreeMap<String, Event>>,
125    missing_profile_cursor: Mutex<usize>,
126    shutdown_tx: watch::Sender<bool>,
127    shutdown_rx: watch::Receiver<bool>,
128}
129
130impl BackgroundNostrMirror {
131    pub async fn new(
132        config: NostrMirrorConfig,
133        store: Arc<HashtreeStore>,
134        graph_store: Arc<SocialGraphStore>,
135        publish_keys: Option<Keys>,
136    ) -> Result<Self> {
137        let client = if let Some(max_size) = config.relay_event_max_size {
138            let mut limits = RelayLimits::default();
139            limits.events.max_size = Some(max_size);
140            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
141        } else {
142            Client::new(Keys::generate())
143        };
144        for relay in &config.relays {
145            client
146                .add_relay(relay)
147                .await
148                .with_context(|| format!("add mirror relay {relay}"))?;
149        }
150        client.connect().await;
151
152        let publish_client = if let Some(keys) = publish_keys {
153            if config.publish_relays.is_empty() {
154                None
155            } else {
156                let client = Client::new(keys);
157                for relay in &config.publish_relays {
158                    client
159                        .add_relay(relay)
160                        .await
161                        .with_context(|| format!("add mirror publish relay {relay}"))?;
162                }
163                client.connect().await;
164                Some(client)
165            }
166        } else {
167            None
168        };
169
170        let (shutdown_tx, shutdown_rx) = watch::channel(false);
171        Ok(Self {
172            config,
173            store,
174            graph_store,
175            client,
176            publish_client,
177            event_publish_state: Mutex::new(RootPublishState::default()),
178            profile_search_publish_state: Mutex::new(RootPublishState::default()),
179            pending_live_events: Mutex::new(BTreeMap::new()),
180            missing_profile_cursor: Mutex::new(0),
181            shutdown_tx,
182            shutdown_rx,
183        })
184    }
185
186    pub fn shutdown(&self) {
187        let _ = self.shutdown_tx.send(true);
188    }
189
190    fn sync_publish_roots_from_store(&self) -> Result<()> {
191        self.note_public_events_root_change()?;
192        self.note_profile_search_root_change()?;
193        Ok(())
194    }
195
196    pub async fn run(&self) -> Result<()> {
197        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
198            return Ok(());
199        }
200
201        info!(
202            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
203            self.config.relays.len(),
204            self.config.max_follow_distance,
205            self.config.require_negentropy,
206            self.config.kinds,
207            self.config.history_sync_author_chunk_size.max(1),
208            self.config.history_sync_on_start,
209            self.config.history_sync_on_reconnect
210        );
211
212        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
213        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
214        let live_since = Timestamp::now();
215        self.sync_publish_roots_from_store()?;
216
217        let initial_authors = self.collect_authors()?;
218        if initial_authors.is_empty() {
219            info!("Nostr mirror: no social-graph authors to mirror yet");
220        } else if self.config.history_sync_on_start {
221            if self.should_backfill_missing_profiles(None) {
222                let missing_profile_authors = self.collect_missing_profile_authors(
223                    self.config.missing_profile_backfill_batch_size,
224                )?;
225                if !missing_profile_authors.is_empty() {
226                    info!(
227                        "Nostr mirror missing-profile backfill starting: authors={}",
228                        missing_profile_authors.len()
229                    );
230                    self.history_sync_authors_with_kinds(
231                        missing_profile_authors,
232                        &[Kind::Metadata.as_u16()],
233                    )
234                    .await?;
235                }
236            }
237            self.history_sync_authors(initial_authors.clone()).await?;
238        }
239
240        let mut subscribed_authors = HashSet::new();
241        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
242            .await?;
243
244        let mut relay_statuses = self.capture_relay_statuses().await;
245        let mut last_reconnect_history_sync_at: Option<Instant> = None;
246        let mut last_missing_profile_backfill_at: Option<Instant> = None;
247        let mut notifications = self.client.notifications();
248        let mut shutdown_rx = self.shutdown_rx.clone();
249        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
250        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
251
252        loop {
253            tokio::select! {
254                _ = shutdown_rx.changed() => {
255                    if *shutdown_rx.borrow() {
256                        break;
257                    }
258                }
259                _ = refresh_interval.tick() => {
260                    let authors = self.collect_authors()?;
261                    let new_authors = authors
262                        .into_iter()
263                        .filter(|author| !subscribed_authors.contains(author))
264                        .collect::<Vec<_>>();
265                    if !new_authors.is_empty() {
266                        debug!(
267                            "Nostr mirror discovered {} newly reachable author(s)",
268                            new_authors.len()
269                        );
270                        self.history_sync_authors(new_authors.clone()).await?;
271                        self.subscribe_authors_since(
272                            &new_authors,
273                            Timestamp::now(),
274                            &mut subscribed_authors,
275                        )
276                        .await?;
277                    }
278                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
279                        let missing_profile_authors = self.collect_missing_profile_authors(
280                            self.config.missing_profile_backfill_batch_size,
281                        )?;
282                        if !missing_profile_authors.is_empty() {
283                            info!(
284                                "Nostr mirror missing-profile backfill starting: authors={}",
285                                missing_profile_authors.len()
286                            );
287                            self.history_sync_authors_with_kinds(
288                                missing_profile_authors,
289                                &[Kind::Metadata.as_u16()],
290                            )
291                            .await?;
292                            last_missing_profile_backfill_at = Some(Instant::now());
293                        }
294                    }
295                }
296                _ = publish_interval.tick() => {
297                    self.sync_publish_roots_from_store()?;
298                    if let Err(err) = self.flush_live_events().await {
299                        warn!("Nostr mirror live event flush failed: {:#}", err);
300                    }
301                    if let Err(err) = self.maybe_publish_event_root(false).await {
302                        warn!("Nostr mirror event-root publish failed: {:#}", err);
303                    }
304                    if let Err(err) = self.maybe_publish_profile_search_root(false).await {
305                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
306                    }
307                }
308                notification = notifications.recv() => {
309                    match notification {
310                        Ok(RelayPoolNotification::Event { event, .. }) => {
311                            self.ingest_live_event(&event)?;
312                        }
313                        Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
314                            let relay_url = relay_url.to_string();
315                            let previous = relay_statuses.insert(relay_url.clone(), status);
316                            if Self::should_history_sync_on_reconnect(
317                                self.config.history_sync_on_reconnect,
318                                previous,
319                                status,
320                            ) && Self::should_run_reconnect_history_sync(
321                                    last_reconnect_history_sync_at.as_ref(),
322                                )
323                            {
324                                let authors = self.collect_authors()?;
325                                if !authors.is_empty() {
326                                    info!(
327                                        "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
328                                        relay_url,
329                                        authors.len(),
330                                        self.config.require_negentropy
331                                    );
332                                    self.history_sync_authors(authors).await?;
333                                    last_reconnect_history_sync_at = Some(Instant::now());
334                                }
335                            }
336                        }
337                        Ok(RelayPoolNotification::Shutdown) => break,
338                        Ok(_) => {}
339                        Err(err) => {
340                            warn!("Nostr mirror notification error: {}", err);
341                            break;
342                        }
343                    }
344                }
345            }
346        }
347
348        if let Err(err) = self.flush_live_events().await {
349            warn!(
350                "Nostr mirror live event flush failed during shutdown: {:#}",
351                err
352            );
353        }
354        if let Err(err) = self.sync_publish_roots_from_store() {
355            warn!(
356                "Nostr mirror root-state refresh failed during shutdown: {:#}",
357                err
358            );
359        }
360        if let Err(err) = self.maybe_publish_event_root(true).await {
361            warn!(
362                "Nostr mirror event-root publish failed during shutdown: {:#}",
363                err
364            );
365        }
366        if let Err(err) = self.maybe_publish_profile_search_root(true).await {
367            warn!(
368                "Nostr mirror profile-search publish failed during shutdown: {:#}",
369                err
370            );
371        }
372        let _ = self.client.disconnect().await;
373        if let Some(client) = self.publish_client.as_ref() {
374            let _ = client.disconnect().await;
375        }
376        Ok(())
377    }
378
379    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
380        let mut statuses = HashMap::new();
381        for (relay_url, relay) in self.client.relays().await {
382            statuses.insert(relay_url.to_string(), relay.status().await);
383        }
384        statuses
385    }
386
387    async fn has_connected_publish_relay(&self) -> bool {
388        let Some(client) = self.publish_client.as_ref() else {
389            return false;
390        };
391        Self::client_has_connected_relay(client).await
392    }
393
394    async fn client_has_connected_relay(client: &Client) -> bool {
395        for (_relay_url, relay) in client.relays().await {
396            if relay.status().await == RelayStatus::Connected {
397                return true;
398            }
399        }
400        false
401    }
402
403    fn collect_authors(&self) -> Result<Vec<String>> {
404        let mut authors = Vec::new();
405        let mut seen = HashSet::new();
406        for distance in 0..=self.config.max_follow_distance {
407            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
408                self.graph_store.as_ref(),
409                distance,
410            )
411            .with_context(|| format!("load social-graph distance {distance}"))?
412            {
413                if self
414                    .graph_store
415                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
416                {
417                    continue;
418                }
419                let hex = hex::encode(pubkey);
420                if seen.insert(hex.clone()) {
421                    authors.push(hex);
422                }
423            }
424        }
425        Ok(authors)
426    }
427
428    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
429        if limit == 0 {
430            return Ok(Vec::new());
431        }
432
433        let authors = self.collect_authors()?;
434        if authors.is_empty() {
435            return Ok(Vec::new());
436        }
437
438        let mut cursor = self
439            .missing_profile_cursor
440            .lock()
441            .expect("missing profile cursor");
442        let mut index = (*cursor).min(authors.len());
443        let mut scanned = 0usize;
444        let mut missing = Vec::new();
445
446        while scanned < authors.len() && missing.len() < limit {
447            let author = &authors[index];
448            if self.graph_store.latest_profile_event(author)?.is_none() {
449                missing.push(author.clone());
450            }
451            index += 1;
452            if index == authors.len() {
453                index = 0;
454            }
455            scanned += 1;
456        }
457
458        *cursor = index;
459        Ok(missing)
460    }
461
462    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
463        if self.config.missing_profile_backfill_batch_size == 0
464            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
465        {
466            return false;
467        }
468        match last_run {
469            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
470            None => true,
471        }
472    }
473
474    fn should_history_sync_on_reconnect(
475        history_sync_on_reconnect: bool,
476        previous: Option<RelayStatus>,
477        status: RelayStatus,
478    ) -> bool {
479        history_sync_on_reconnect
480            && status == RelayStatus::Connected
481            && matches!(
482                previous,
483                Some(
484                    RelayStatus::Initialized
485                        | RelayStatus::Pending
486                        | RelayStatus::Connecting
487                        | RelayStatus::Disconnected
488                        | RelayStatus::Terminated
489                )
490            )
491    }
492
493    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
494        match last_run {
495            None => true,
496            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
497        }
498    }
499
500    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
501        self.history_sync_authors_with_kinds(authors, &self.config.kinds)
502            .await
503    }
504
505    async fn history_sync_authors_with_kinds(
506        &self,
507        authors: Vec<String>,
508        kinds: &[u16],
509    ) -> Result<()> {
510        self.history_sync_authors_chunked(authors, |current_root, author_chunk| async move {
511            self.history_sync_author_chunk(current_root, author_chunk, kinds)
512                .await
513        })
514        .await
515    }
516
517    async fn history_sync_authors_chunked<F, Fut>(
518        &self,
519        authors: Vec<String>,
520        mut run_chunk: F,
521    ) -> Result<()>
522    where
523        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
524        Fut: std::future::Future<Output = Result<CrawlReport>>,
525    {
526        if authors.is_empty() {
527            return Ok(());
528        }
529
530        info!(
531            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
532            authors.len(),
533            self.config.relays.len(),
534            self.config.require_negentropy
535        );
536
537        let mut current_root = self.graph_store.public_events_root()?;
538        let mut last_error = None;
539        let mut applied_chunks = 0usize;
540        let mut failed_chunks = 0usize;
541        let chunk_size = self.config.history_sync_author_chunk_size.max(1);
542        let total_chunks = authors.len().div_ceil(chunk_size);
543
544        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
545            let author_chunk = author_chunk.to_vec();
546            let author_count = author_chunk.len();
547            info!(
548                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
549                chunk_index + 1,
550                total_chunks,
551                author_count
552            );
553            let report = match run_chunk(current_root.clone(), author_chunk).await {
554                Ok(report) => report,
555                Err(err) => {
556                    failed_chunks = failed_chunks.saturating_add(1);
557                    warn!(
558                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
559                        chunk_index + 1,
560                        total_chunks,
561                        author_count,
562                        err
563                    );
564                    last_error = Some(err);
565                    continue;
566                }
567            };
568
569            if report.root != current_root {
570                self.apply_history_root(report.root.as_ref()).await?;
571                current_root = report.root.clone();
572                info!(
573                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
574                    chunk_index + 1,
575                    total_chunks,
576                    report.authors_processed,
577                    report.events_selected,
578                    report.events_seen
579                );
580            }
581            applied_chunks = applied_chunks.saturating_add(1);
582        }
583
584        if applied_chunks == 0 {
585            return Err(last_error
586                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
587                .context("run mirror history sync"));
588        }
589        if failed_chunks > 0 {
590            warn!(
591                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
592                applied_chunks,
593                failed_chunks
594            );
595        }
596        Ok(())
597    }
598
599    async fn history_sync_author_chunk(
600        &self,
601        current_root: Option<hashtree_core::Cid>,
602        authors: Vec<String>,
603        kinds: &[u16],
604    ) -> Result<CrawlReport> {
605        let mut last_error = None;
606        let mut report = None;
607        for attempt in 0..3 {
608            let mut last_logged_authors = 0usize;
609            let bridge = NostrBridge::new(
610                self.store.store_arc(),
611                CrawlConfig {
612                    relays: self.config.relays.clone(),
613                    author_allowlist: Some(authors.clone()),
614                    max_live_bytes: None,
615                    max_events_seen: None,
616                    max_authors: None,
617                    max_follow_distance: None,
618                    author_batch_size: self.config.author_batch_size.max(1),
619                    per_author_event_limit: self.config.history_sync_per_author_event_limit.max(1),
620                    per_author_live_bytes: None,
621                    fetch_timeout: self.config.fetch_timeout,
622                    kinds: Some(kinds.to_vec()),
623                    relay_fetch_mode: RelayFetchMode::AuthorBatches,
624                    require_negentropy: self.config.require_negentropy,
625                    relay_event_max_size: self.config.relay_event_max_size,
626                    relay_page_size: 1_000,
627                    max_relay_pages: 10,
628                },
629            );
630
631            match bridge
632                .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
633                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
634                    let should_log = progress.authors_processed == progress.authors_considered
635                        || progress.authors_processed == 0
636                        || progress
637                            .authors_processed
638                            .saturating_sub(last_logged_authors)
639                            >= log_interval;
640                    if should_log {
641                        last_logged_authors = progress.authors_processed;
642                        info!(
643                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
644                            progress.authors_processed,
645                            progress.authors_considered,
646                            progress.events_selected,
647                            progress.events_seen
648                        );
649                    }
650                })
651                .await
652            {
653                Ok(next_report) => {
654                    report = Some(next_report);
655                    break;
656                }
657                Err(err) => {
658                    last_error = Some(err);
659                    if attempt < 2 {
660                        tokio::time::sleep(Duration::from_millis(500)).await;
661                    }
662                }
663            }
664        }
665        report
666            .ok_or_else(|| last_error.expect("history sync retry captured error"))
667            .context("run mirror history sync")
668    }
669
670    async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
671        self.graph_store.write_public_events_root(root)?;
672        let Some(root) = root else {
673            return Ok(());
674        };
675
676        let event_store = NostrEventStore::new(self.store.store_arc());
677        let events = event_store
678            .list_recent_lossy(Some(root), ListEventsOptions::default())
679            .await
680            .context("list trusted mirrored events")?
681            .into_iter()
682            .map(socialgraph::stored_event_to_nostr_event)
683            .collect::<Result<Vec<_>>>()?;
684
685        self.graph_store
686            .rebuild_profile_index_for_events(&events)
687            .context("rebuild mirrored profile search index")?;
688        socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
689            .context("sync mirrored social graph state")?;
690        self.note_public_events_root_change()?;
691        self.note_profile_search_root_change()?;
692        if let Err(err) = self.maybe_publish_event_root(true).await {
693            warn!(
694                "Nostr mirror event-root publish failed after root update: {:#}",
695                err
696            );
697        }
698        if let Err(err) = self.maybe_publish_profile_search_root(false).await {
699            warn!(
700                "Nostr mirror profile-search publish failed after root update: {:#}",
701                err
702            );
703        }
704        Ok(())
705    }
706
707    async fn subscribe_authors_since(
708        &self,
709        authors: &[String],
710        since: Timestamp,
711        subscribed_authors: &mut HashSet<String>,
712    ) -> Result<()> {
713        let new_authors = authors
714            .iter()
715            .filter(|author| !subscribed_authors.contains(*author))
716            .cloned()
717            .collect::<Vec<_>>();
718        if new_authors.is_empty() {
719            return Ok(());
720        }
721
722        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
723            let pubkeys = chunk
724                .iter()
725                .filter_map(|author| PublicKey::from_hex(author).ok())
726                .collect::<Vec<_>>();
727            if pubkeys.is_empty() {
728                continue;
729            }
730
731            let filter = Filter::new()
732                .authors(pubkeys)
733                .kinds(self.config.kinds.iter().copied().map(Kind::from))
734                .since(since);
735
736            self.client
737                .subscribe(vec![filter], None)
738                .await
739                .context("subscribe mirror author batch")?;
740        }
741
742        subscribed_authors.extend(new_authors);
743        Ok(())
744    }
745
746    fn ingest_live_event(&self, event: &Event) -> Result<()> {
747        self.pending_live_events
748            .lock()
749            .expect("pending live events")
750            .insert(event.id.to_hex(), event.clone());
751        Ok(())
752    }
753
754    async fn flush_live_events(&self) -> Result<()> {
755        let pending = {
756            let mut pending = self
757                .pending_live_events
758                .lock()
759                .expect("pending live events");
760            if pending.is_empty() {
761                return Ok(());
762            }
763            std::mem::take(&mut *pending)
764        };
765        let events = pending.into_values().collect::<Vec<_>>();
766        let event_count = events.len();
767        let previous_event_root = self.graph_store.public_events_root()?;
768        let previous_profile_search_root = self.graph_store.profile_search_root()?;
769
770        socialgraph::ingest_parsed_events_with_storage_class(
771            self.graph_store.as_ref(),
772            &events,
773            socialgraph::EventStorageClass::Public,
774        )
775        .context("ingest live mirrored event batch")?;
776
777        let next_event_root = self.graph_store.public_events_root()?;
778        let next_profile_search_root = self.graph_store.profile_search_root()?;
779        let event_root_changed = next_event_root != previous_event_root;
780        let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
781
782        if event_root_changed {
783            self.note_public_events_root_change()?;
784        }
785        if profile_search_root_changed {
786            self.note_profile_search_root_change()?;
787        }
788        if event_root_changed {
789            self.maybe_publish_event_root(true).await?;
790        }
791        if profile_search_root_changed {
792            self.maybe_publish_profile_search_root(true).await?;
793        }
794
795        info!(
796            "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={}",
797            event_count, event_root_changed, profile_search_root_changed
798        );
799        Ok(())
800    }
801
802    fn note_public_events_root_change(&self) -> Result<()> {
803        let root = self.graph_store.public_events_root()?;
804        Self::note_root_change(
805            self.config.published_event_tree_name.as_deref(),
806            &self.event_publish_state,
807            root,
808        )
809    }
810
811    fn note_profile_search_root_change(&self) -> Result<()> {
812        let root = self.graph_store.profile_search_root()?;
813        Self::note_root_change(
814            self.config.published_profile_search_tree_name.as_deref(),
815            &self.profile_search_publish_state,
816            root,
817        )
818    }
819
820    fn note_root_change(
821        tree_name: Option<&str>,
822        publish_state: &Mutex<RootPublishState>,
823        root: Option<hashtree_core::Cid>,
824    ) -> Result<()> {
825        let Some(_tree_name) = tree_name else {
826            return Ok(());
827        };
828
829        let mut state = publish_state.lock().expect("root publish state");
830        let now = Instant::now();
831
832        if state.pending_root == root {
833            return Ok(());
834        }
835
836        state.pending_root = root;
837        state.last_changed_at = Some(now);
838        if state.dirty_since.is_none() {
839            state.dirty_since = Some(now);
840        }
841        Ok(())
842    }
843
844    async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
845        self.maybe_publish_root(
846            self.config.published_event_tree_name.as_deref(),
847            &self.event_publish_state,
848            "event root",
849            force,
850        )
851        .await
852    }
853
854    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
855        self.maybe_publish_root(
856            self.config.published_profile_search_tree_name.as_deref(),
857            &self.profile_search_publish_state,
858            "profile search root",
859            force,
860        )
861        .await
862    }
863
864    async fn maybe_publish_root(
865        &self,
866        tree_name: Option<&str>,
867        publish_state: &Mutex<RootPublishState>,
868        log_label: &str,
869        force: bool,
870    ) -> Result<()> {
871        let Some(tree_name) = tree_name else {
872            return Ok(());
873        };
874        let Some(publish_client) = self.publish_client.as_ref() else {
875            return Ok(());
876        };
877        if !self.has_connected_publish_relay().await {
878            return Ok(());
879        }
880
881        let pending_root = {
882            let state = publish_state.lock().expect("root publish state");
883            let Some(pending_root) = state.pending_root.clone() else {
884                return Ok(());
885            };
886            if state.last_published_root.as_ref() == Some(&pending_root) {
887                return Ok(());
888            }
889
890            let now = Instant::now();
891            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
892                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
893            });
894            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
895                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
896            });
897            if !force && !debounce_ready && !stale_ready {
898                return Ok(());
899            }
900
901            pending_root
902        };
903
904        let event = Self::build_public_root_event(tree_name, &pending_root);
905        let output = publish_client
906            .send_event_builder(event)
907            .await
908            .with_context(|| format!("publish {log_label} event"))?;
909        if output.failed.is_empty() && output.success.is_empty() {
910            return Ok(());
911        }
912
913        {
914            let mut state = publish_state.lock().expect("root publish state");
915            if state.pending_root.as_ref() == Some(&pending_root) {
916                state.last_published_root = Some(pending_root.clone());
917                state.last_published_at = Some(Instant::now());
918                state.dirty_since = None;
919            }
920        }
921
922        info!(
923            "Nostr mirror published {}: tree={} hash={}",
924            log_label,
925            tree_name,
926            hex::encode(pending_root.hash)
927        );
928        Ok(())
929    }
930
931    fn build_public_root_event(tree_name: &str, cid: &hashtree_core::Cid) -> EventBuilder {
932        let mut tags = vec![
933            Tag::identifier(tree_name.to_string()),
934            Tag::custom(
935                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
936                vec!["hashtree"],
937            ),
938            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
939        ];
940        if let Some(key) = cid.key {
941            tags.push(Tag::custom(
942                TagKind::Custom("key".into()),
943                vec![hex::encode(key)],
944            ));
945        }
946
947        EventBuilder::new(Kind::Custom(30078), "", tags)
948    }
949}
950
951#[cfg(test)]
952mod tests;