//! Background Nostr mirror for `hashtree_cli` (nostr_mirror.rs).

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13    Timestamp,
14};
15use nostr_sdk::{
16    pool::RelayLimits, prelude::RelayPoolNotification, Client, Keys, Options, RelayStatus,
17};
18use tokio::sync::watch;
19use tracing::{debug, info, warn};
20
21use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
22use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
23use crate::HashtreeStore;
24
// Timing constants are shortened under `cfg(test)` so the test suite can
// exercise the same code paths without real-time waits.

/// Delay before the mirror does any work after startup.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// Extra pause after `connect()` so relay sessions can settle.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

/// How often the reachable-author set is re-collected from the social graph.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

/// Minimum spacing between catch-up history syncs triggered by relay reconnects.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

/// Event kinds mirrored by default: 0 (metadata) and 3 (contact list).
const DEFAULT_HISTORY_KINDS: [u16; 2] = [0, 3];
/// Default `d`-tag tree name used when publishing the profile-search root.
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";

/// Minimum spacing between missing-profile backfill passes.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

/// Debounce window after the last root change before publishing it.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

/// Upper bound on how long a dirty root may wait before a forced publish,
/// even when changes keep arriving inside the debounce window.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);
62
/// Configuration for [`BackgroundNostrMirror`].
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relays the mirror reads/subscribes from. Empty disables the mirror.
    pub relays: Vec<String>,
    /// Relays the profile-search root event is published to.
    pub publish_relays: Vec<String>,
    /// Maximum social-graph follow distance to mirror. 0 disables the mirror.
    pub max_follow_distance: u32,
    /// Threshold passed to `is_overmuted_user` to skip overmuted authors.
    pub overmute_threshold: f64,
    /// Authors per relay subscription / crawl batch.
    pub author_batch_size: usize,
    /// Authors per history-sync chunk (clamped to at least 1 at use sites).
    pub history_sync_author_chunk_size: usize,
    /// Max authors examined per missing-profile backfill pass. 0 disables it.
    pub missing_profile_backfill_batch_size: usize,
    /// Timeout for relay fetches during crawls.
    pub fetch_timeout: Duration,
    /// Optional per-event size cap enforced via `RelayLimits`.
    pub relay_event_max_size: Option<u32>,
    /// When true, only sync with relays that support negentropy.
    pub require_negentropy: bool,
    /// Event kinds to mirror (u16 values of nostr kinds).
    pub kinds: Vec<u16>,
    /// Run a full history sync when `run` starts.
    pub history_sync_on_start: bool,
    /// Run a catch-up history sync when a relay reconnects.
    pub history_sync_on_reconnect: bool,
    /// Tree name for the published profile-search root; `None` disables publishing.
    pub published_profile_search_tree_name: Option<String>,
}
80
81impl Default for NostrMirrorConfig {
82    fn default() -> Self {
83        Self {
84            relays: Vec::new(),
85            publish_relays: Vec::new(),
86            max_follow_distance: 2,
87            overmute_threshold: 1.0,
88            author_batch_size: 256,
89            history_sync_author_chunk_size: 5_000,
90            missing_profile_backfill_batch_size: 5_000,
91            fetch_timeout: Duration::from_secs(15),
92            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
93            require_negentropy: false,
94            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
95            history_sync_on_start: true,
96            history_sync_on_reconnect: true,
97            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
98        }
99    }
100}
101
/// Debounce/staleness bookkeeping for publishing the profile-search root.
#[derive(Debug, Default)]
struct RootPublishState {
    // Latest root observed but not yet confirmed published.
    pending_root: Option<hashtree_core::Cid>,
    // When `pending_root` last changed (drives the debounce window).
    last_changed_at: Option<Instant>,
    // When the state first became dirty (drives the max-staleness override).
    dirty_since: Option<Instant>,
    // Last root successfully sent to a publish relay.
    last_published_root: Option<hashtree_core::Cid>,
    // When the last successful publish happened.
    last_published_at: Option<Instant>,
}
110
/// Long-running task that mirrors social-graph authors' events from Nostr
/// relays into the local hashtree/social-graph stores, and optionally
/// publishes the profile-search root back out.
pub struct BackgroundNostrMirror {
    config: NostrMirrorConfig,
    store: Arc<HashtreeStore>,
    graph_store: Arc<SocialGraphStore>,
    // Read-only client (throwaway keys) for subscriptions and crawls.
    client: Client,
    // Optional signing client used only to publish the root event.
    publish_client: Option<Client>,
    // Shared with the publish path; guarded by a std Mutex (never held across await).
    profile_search_publish_state: Mutex<RootPublishState>,
    // Round-robin scan position for missing-profile backfill.
    missing_profile_cursor: Mutex<usize>,
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}
122
123impl BackgroundNostrMirror {
    /// Builds a mirror with a read client connected to `config.relays` and,
    /// when both `publish_keys` and `config.publish_relays` are provided, a
    /// separate signing client connected to the publish relays.
    ///
    /// # Errors
    /// Fails if any relay URL cannot be added to its client.
    pub async fn new(
        config: NostrMirrorConfig,
        store: Arc<HashtreeStore>,
        graph_store: Arc<SocialGraphStore>,
        publish_keys: Option<Keys>,
    ) -> Result<Self> {
        // The read client uses throwaway keys; it only subscribes and fetches.
        // An optional per-event size cap guards against oversized relay events.
        let client = if let Some(max_size) = config.relay_event_max_size {
            let mut limits = RelayLimits::default();
            limits.events.max_size = Some(max_size);
            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
        } else {
            Client::new(Keys::generate())
        };
        for relay in &config.relays {
            client
                .add_relay(relay)
                .await
                .with_context(|| format!("add mirror relay {relay}"))?;
        }
        client.connect().await;

        // Publishing requires both signing keys and at least one publish relay.
        let publish_client = if let Some(keys) = publish_keys {
            if config.publish_relays.is_empty() {
                None
            } else {
                let client = Client::new(keys);
                for relay in &config.publish_relays {
                    client
                        .add_relay(relay)
                        .await
                        .with_context(|| format!("add mirror publish relay {relay}"))?;
                }
                client.connect().await;
                Some(client)
            }
        } else {
            None
        };

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        Ok(Self {
            config,
            store,
            graph_store,
            client,
            publish_client,
            profile_search_publish_state: Mutex::new(RootPublishState::default()),
            missing_profile_cursor: Mutex::new(0),
            shutdown_tx,
            shutdown_rx,
        })
    }
176
    /// Signals the `run` loop to exit; safe to call from any thread.
    /// The send result is ignored because `run` may have already returned.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(true);
    }
180
    /// Main mirror loop: initial history sync, live subscriptions, periodic
    /// author refresh, missing-profile backfill, debounced root publishing,
    /// and reconnect-triggered catch-up syncs.
    ///
    /// Returns immediately when no relays are configured or the follow
    /// distance is zero. Exits on `shutdown()`, pool shutdown, or a
    /// notification channel error, disconnecting both clients on the way out.
    pub async fn run(&self) -> Result<()> {
        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
            return Ok(());
        }

        info!(
            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
            self.config.relays.len(),
            self.config.max_follow_distance,
            self.config.require_negentropy,
            self.config.kinds,
            self.config.history_sync_author_chunk_size.max(1),
            self.config.history_sync_on_start,
            self.config.history_sync_on_reconnect
        );

        // Give the rest of the process a head start, then let relay
        // connections settle before touching them.
        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
        // Live subscriptions start from this moment; history sync covers the past.
        let live_since = Timestamp::now();
        self.note_profile_search_root_change()?;

        let initial_authors = self.collect_authors()?;
        if initial_authors.is_empty() {
            info!("Nostr mirror: no social-graph authors to mirror yet");
        } else if self.config.history_sync_on_start {
            // Prioritize authors with no stored kind-0 profile before the
            // full-kind sync.
            if self.should_backfill_missing_profiles(None) {
                let missing_profile_authors = self.collect_missing_profile_authors(
                    self.config.missing_profile_backfill_batch_size,
                )?;
                if !missing_profile_authors.is_empty() {
                    info!(
                        "Nostr mirror missing-profile backfill starting: authors={}",
                        missing_profile_authors.len()
                    );
                    self.history_sync_authors_with_kinds(
                        missing_profile_authors,
                        &[Kind::Metadata.as_u16()],
                    )
                    .await?;
                }
            }
            self.history_sync_authors(initial_authors.clone()).await?;
        }

        let mut subscribed_authors = HashSet::new();
        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
            .await?;

        // Baseline statuses so reconnect detection sees the first transitions.
        let mut relay_statuses = self.capture_relay_statuses().await;
        let mut last_reconnect_history_sync_at: Option<Instant> = None;
        let mut last_missing_profile_backfill_at: Option<Instant> = None;
        let mut notifications = self.client.notifications();
        let mut shutdown_rx = self.shutdown_rx.clone();
        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);

        loop {
            tokio::select! {
                // Shutdown signal: exit the loop once the flag flips to true.
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                // Periodic refresh: pick up newly reachable authors and run
                // missing-profile backfill on its own cooldown.
                _ = refresh_interval.tick() => {
                    let authors = self.collect_authors()?;
                    let new_authors = authors
                        .into_iter()
                        .filter(|author| !subscribed_authors.contains(author))
                        .collect::<Vec<_>>();
                    if !new_authors.is_empty() {
                        debug!(
                            "Nostr mirror discovered {} newly reachable author(s)",
                            new_authors.len()
                        );
                        self.history_sync_authors(new_authors.clone()).await?;
                        self.subscribe_authors_since(
                            &new_authors,
                            Timestamp::now(),
                            &mut subscribed_authors,
                        )
                        .await?;
                    }
                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
                        let missing_profile_authors = self.collect_missing_profile_authors(
                            self.config.missing_profile_backfill_batch_size,
                        )?;
                        if !missing_profile_authors.is_empty() {
                            info!(
                                "Nostr mirror missing-profile backfill starting: authors={}",
                                missing_profile_authors.len()
                            );
                            self.history_sync_authors_with_kinds(
                                missing_profile_authors,
                                &[Kind::Metadata.as_u16()],
                            )
                            .await?;
                            last_missing_profile_backfill_at = Some(Instant::now());
                        }
                    }
                }
                // Debounced/staleness-driven publish of the profile-search root.
                _ = publish_interval.tick() => {
                    if let Err(err) = self.maybe_publish_profile_search_root(false).await {
                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.ingest_live_event(&event)?;
                        }
                        Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
                            // Track per-relay status so a not-connected ->
                            // Connected transition can trigger a catch-up sync.
                            let relay_url = relay_url.to_string();
                            let previous = relay_statuses.insert(relay_url.clone(), status);
                            if Self::should_history_sync_on_reconnect(
                                self.config.history_sync_on_reconnect,
                                previous,
                                status,
                            ) && Self::should_run_reconnect_history_sync(
                                    last_reconnect_history_sync_at.as_ref(),
                                )
                            {
                                let authors = self.collect_authors()?;
                                if !authors.is_empty() {
                                    info!(
                                        "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
                                        relay_url,
                                        authors.len(),
                                        self.config.require_negentropy
                                    );
                                    self.history_sync_authors(authors).await?;
                                    last_reconnect_history_sync_at = Some(Instant::now());
                                }
                            }
                        }
                        Ok(RelayPoolNotification::Shutdown) => break,
                        Ok(_) => {}
                        Err(err) => {
                            warn!("Nostr mirror notification error: {}", err);
                            break;
                        }
                    }
                }
            }
        }

        // Best-effort disconnects; errors on shutdown are not actionable.
        let _ = self.client.disconnect().await;
        if let Some(client) = self.publish_client.as_ref() {
            let _ = client.disconnect().await;
        }
        Ok(())
    }
332
333    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
334        let mut statuses = HashMap::new();
335        for (relay_url, relay) in self.client.relays().await {
336            statuses.insert(relay_url.to_string(), relay.status().await);
337        }
338        statuses
339    }
340
341    async fn has_connected_publish_relay(&self) -> bool {
342        let Some(client) = self.publish_client.as_ref() else {
343            return false;
344        };
345        Self::client_has_connected_relay(client).await
346    }
347
348    async fn client_has_connected_relay(client: &Client) -> bool {
349        for (_relay_url, relay) in client.relays().await {
350            if relay.status().await == RelayStatus::Connected {
351                return true;
352            }
353        }
354        false
355    }
356
357    fn collect_authors(&self) -> Result<Vec<String>> {
358        let mut authors = Vec::new();
359        let mut seen = HashSet::new();
360        for distance in 0..=self.config.max_follow_distance {
361            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
362                self.graph_store.as_ref(),
363                distance,
364            )
365            .with_context(|| format!("load social-graph distance {distance}"))?
366            {
367                if self
368                    .graph_store
369                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
370                {
371                    continue;
372                }
373                let hex = hex::encode(pubkey);
374                if seen.insert(hex.clone()) {
375                    authors.push(hex);
376                }
377            }
378        }
379        Ok(authors)
380    }
381
382    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
383        if limit == 0 {
384            return Ok(Vec::new());
385        }
386
387        let authors = self.collect_authors()?;
388        if authors.is_empty() {
389            return Ok(Vec::new());
390        }
391
392        let mut cursor = self
393            .missing_profile_cursor
394            .lock()
395            .expect("missing profile cursor");
396        let mut index = (*cursor).min(authors.len());
397        let mut scanned = 0usize;
398        let mut missing = Vec::new();
399
400        while scanned < authors.len() && missing.len() < limit {
401            let author = &authors[index];
402            if self.graph_store.latest_profile_event(author)?.is_none() {
403                missing.push(author.clone());
404            }
405            index += 1;
406            if index == authors.len() {
407                index = 0;
408            }
409            scanned += 1;
410        }
411
412        *cursor = index;
413        Ok(missing)
414    }
415
416    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
417        if self.config.missing_profile_backfill_batch_size == 0
418            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
419        {
420            return false;
421        }
422        match last_run {
423            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
424            None => true,
425        }
426    }
427
428    fn should_history_sync_on_reconnect(
429        history_sync_on_reconnect: bool,
430        previous: Option<RelayStatus>,
431        status: RelayStatus,
432    ) -> bool {
433        history_sync_on_reconnect
434            && status == RelayStatus::Connected
435            && matches!(
436                previous,
437                Some(
438                    RelayStatus::Initialized
439                        | RelayStatus::Pending
440                        | RelayStatus::Connecting
441                        | RelayStatus::Disconnected
442                        | RelayStatus::Terminated
443                )
444            )
445    }
446
447    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
448        match last_run {
449            None => true,
450            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
451        }
452    }
453
    /// Runs a history sync for `authors` over all configured event kinds.
    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
        self.history_sync_authors_with_kinds(authors, &self.config.kinds)
            .await
    }
458
    /// Runs a chunked history sync for `authors` restricted to `kinds`.
    async fn history_sync_authors_with_kinds(
        &self,
        authors: Vec<String>,
        kinds: &[u16],
    ) -> Result<()> {
        // Each chunk crawls starting from the root produced by the previous
        // chunk, so progress accumulates across chunks.
        self.history_sync_authors_chunked(authors, |current_root, author_chunk| async move {
            self.history_sync_author_chunk(current_root, author_chunk, kinds)
                .await
        })
        .await
    }
470
    /// Splits `authors` into chunks and runs `run_chunk` on each, threading
    /// the trusted root from chunk to chunk and applying any root change.
    ///
    /// Failed chunks are logged and skipped; the sync only errors when *no*
    /// chunk succeeded (returning the last chunk error).
    async fn history_sync_authors_chunked<F, Fut>(
        &self,
        authors: Vec<String>,
        mut run_chunk: F,
    ) -> Result<()>
    where
        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
        Fut: std::future::Future<Output = Result<CrawlReport>>,
    {
        if authors.is_empty() {
            return Ok(());
        }

        info!(
            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
            authors.len(),
            self.config.relays.len(),
            self.config.require_negentropy
        );

        let mut current_root = self.graph_store.public_events_root()?;
        let mut last_error = None;
        let mut applied_chunks = 0usize;
        let mut failed_chunks = 0usize;
        let chunk_size = self.config.history_sync_author_chunk_size.max(1);
        let total_chunks = authors.len().div_ceil(chunk_size);

        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
            let author_chunk = author_chunk.to_vec();
            let author_count = author_chunk.len();
            info!(
                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
                chunk_index + 1,
                total_chunks,
                author_count
            );
            let report = match run_chunk(current_root.clone(), author_chunk).await {
                Ok(report) => report,
                Err(err) => {
                    // Skip this chunk but keep going; remember the error in
                    // case every chunk fails.
                    failed_chunks = failed_chunks.saturating_add(1);
                    warn!(
                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
                        chunk_index + 1,
                        total_chunks,
                        author_count,
                        err
                    );
                    last_error = Some(err);
                    continue;
                }
            };

            // Only persist/apply when the crawl actually moved the root.
            if report.root != current_root {
                self.apply_history_root(report.root.as_ref()).await?;
                current_root = report.root.clone();
                info!(
                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
                    chunk_index + 1,
                    total_chunks,
                    report.authors_processed,
                    report.events_selected,
                    report.events_seen
                );
            }
            applied_chunks = applied_chunks.saturating_add(1);
        }

        if applied_chunks == 0 {
            return Err(last_error
                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
                .context("run mirror history sync"));
        }
        if failed_chunks > 0 {
            warn!(
                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
                applied_chunks,
                failed_chunks
            );
        }
        Ok(())
    }
552
    /// Crawls relay history for one author chunk, retrying up to three times
    /// with a short pause between attempts.
    ///
    /// `current_root` is the trusted root the crawl extends. Progress is
    /// logged on a throttle derived from the author batch size.
    ///
    /// # Errors
    /// Returns the last crawl error when all three attempts fail.
    async fn history_sync_author_chunk(
        &self,
        current_root: Option<hashtree_core::Cid>,
        authors: Vec<String>,
        kinds: &[u16],
    ) -> Result<CrawlReport> {
        let mut last_error = None;
        let mut report = None;
        for attempt in 0..3 {
            let mut last_logged_authors = 0usize;
            // A fresh bridge per attempt; the crawl is restricted to exactly
            // these authors and kinds.
            let bridge = NostrBridge::new(
                self.store.store_arc(),
                CrawlConfig {
                    relays: self.config.relays.clone(),
                    author_allowlist: Some(authors.clone()),
                    max_live_bytes: None,
                    max_events_seen: None,
                    max_authors: None,
                    max_follow_distance: None,
                    author_batch_size: self.config.author_batch_size.max(1),
                    // One event per kind is enough for replaceable kinds 0/3.
                    per_author_event_limit: kinds.len().max(1),
                    per_author_live_bytes: None,
                    fetch_timeout: self.config.fetch_timeout,
                    kinds: Some(kinds.to_vec()),
                    relay_fetch_mode: RelayFetchMode::AuthorBatches,
                    require_negentropy: self.config.require_negentropy,
                    relay_event_max_size: self.config.relay_event_max_size,
                    relay_page_size: 1_000,
                    max_relay_pages: 10,
                },
            );

            match bridge
                .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
                    // Log at start, at completion, and every `log_interval`
                    // authors in between to bound log volume.
                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
                    let should_log = progress.authors_processed == progress.authors_considered
                        || progress.authors_processed == 0
                        || progress
                            .authors_processed
                            .saturating_sub(last_logged_authors)
                            >= log_interval;
                    if should_log {
                        last_logged_authors = progress.authors_processed;
                        info!(
                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
                            progress.authors_processed,
                            progress.authors_considered,
                            progress.events_selected,
                            progress.events_seen
                        );
                    }
                })
                .await
            {
                Ok(next_report) => {
                    report = Some(next_report);
                    break;
                }
                Err(err) => {
                    last_error = Some(err);
                    if attempt < 2 {
                        tokio::time::sleep(Duration::from_millis(500)).await;
                    }
                }
            }
        }
        // `last_error` is guaranteed Some when `report` is None: every failed
        // attempt stored an error.
        report
            .ok_or_else(|| last_error.expect("history sync retry captured error"))
            .context("run mirror history sync")
    }
623
    /// Persists a new trusted events root and rebuilds derived state
    /// (profile search index, social-graph state) from the events under it,
    /// then flags the profile-search root for publication.
    ///
    /// A `None` root is persisted and nothing else is rebuilt.
    async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
        self.graph_store.write_public_events_root(root)?;
        let Some(root) = root else {
            return Ok(());
        };

        // Re-read the mirrored events under the new root (lossy: undecodable
        // entries are skipped by the store) and convert them to nostr events.
        let event_store = NostrEventStore::new(self.store.store_arc());
        let events = event_store
            .list_recent_lossy(Some(root), ListEventsOptions::default())
            .await
            .context("list trusted mirrored events")?
            .into_iter()
            .map(socialgraph::stored_event_to_nostr_event)
            .collect::<Result<Vec<_>>>()?;

        self.graph_store
            .rebuild_profile_index_for_events(&events)
            .context("rebuild mirrored profile search index")?;
        socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
            .context("sync mirrored social graph state")?;
        self.note_profile_search_root_change()?;
        // Publish is best-effort here; the periodic publish tick will retry.
        if let Err(err) = self.maybe_publish_profile_search_root(false).await {
            warn!(
                "Nostr mirror profile-search publish failed after root update: {:#}",
                err
            );
        }
        Ok(())
    }
653
    /// Subscribes to configured kinds for any of `authors` not yet in
    /// `subscribed_authors`, batching filters by the author batch size, and
    /// records them as subscribed.
    ///
    /// Authors that fail hex parsing are silently skipped by the filter
    /// construction but still recorded as subscribed (they will never parse).
    async fn subscribe_authors_since(
        &self,
        authors: &[String],
        since: Timestamp,
        subscribed_authors: &mut HashSet<String>,
    ) -> Result<()> {
        let new_authors = authors
            .iter()
            .filter(|author| !subscribed_authors.contains(*author))
            .cloned()
            .collect::<Vec<_>>();
        if new_authors.is_empty() {
            return Ok(());
        }

        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
            let pubkeys = chunk
                .iter()
                .filter_map(|author| PublicKey::from_hex(author).ok())
                .collect::<Vec<_>>();
            if pubkeys.is_empty() {
                continue;
            }

            // One filter per batch: these authors, configured kinds, from `since`.
            let filter = Filter::new()
                .authors(pubkeys)
                .kinds(self.config.kinds.iter().copied().map(Kind::from))
                .since(since);

            self.client
                .subscribe(vec![filter], None)
                .await
                .context("subscribe mirror author batch")?;
        }

        subscribed_authors.extend(new_authors);
        Ok(())
    }
692
693    fn ingest_live_event(&self, event: &Event) -> Result<()> {
694        socialgraph::ingest_parsed_event(self.graph_store.as_ref(), event)
695            .context("ingest live mirrored event")?;
696        if event.kind == Kind::Metadata {
697            self.note_profile_search_root_change()?;
698        }
699        Ok(())
700    }
701
702    fn note_profile_search_root_change(&self) -> Result<()> {
703        let Some(_tree_name) = self.config.published_profile_search_tree_name.as_deref() else {
704            return Ok(());
705        };
706
707        let root = self.graph_store.profile_search_root()?;
708        let mut state = self
709            .profile_search_publish_state
710            .lock()
711            .expect("profile search publish state");
712        let now = Instant::now();
713
714        if state.pending_root == root {
715            return Ok(());
716        }
717
718        state.pending_root = root;
719        state.last_changed_at = Some(now);
720        if state.dirty_since.is_none() {
721            state.dirty_since = Some(now);
722        }
723        Ok(())
724    }
725
726    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
727        let Some(tree_name) = self.config.published_profile_search_tree_name.as_deref() else {
728            return Ok(());
729        };
730        let Some(publish_client) = self.publish_client.as_ref() else {
731            return Ok(());
732        };
733        if !self.has_connected_publish_relay().await {
734            return Ok(());
735        }
736
737        let pending_root = {
738            let state = self
739                .profile_search_publish_state
740                .lock()
741                .expect("profile search publish state");
742            let Some(pending_root) = state.pending_root.clone() else {
743                return Ok(());
744            };
745            if state.last_published_root.as_ref() == Some(&pending_root) {
746                return Ok(());
747            }
748
749            let now = Instant::now();
750            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
751                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
752            });
753            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
754                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
755            });
756            if !force && !debounce_ready && !stale_ready {
757                return Ok(());
758            }
759
760            pending_root
761        };
762
763        let event = Self::build_public_root_event(tree_name, &pending_root);
764        let output = publish_client
765            .send_event_builder(event)
766            .await
767            .context("publish profile search root event")?;
768        if output.failed.is_empty() && output.success.is_empty() {
769            return Ok(());
770        }
771
772        {
773            let mut state = self
774                .profile_search_publish_state
775                .lock()
776                .expect("profile search publish state");
777            if state.pending_root.as_ref() == Some(&pending_root) {
778                state.last_published_root = Some(pending_root.clone());
779                state.last_published_at = Some(Instant::now());
780                state.dirty_since = None;
781            }
782        }
783
784        info!(
785            "Nostr mirror published profile search root: tree={} hash={}",
786            tree_name,
787            hex::encode(pending_root.hash)
788        );
789        Ok(())
790    }
791
792    fn build_public_root_event(tree_name: &str, cid: &hashtree_core::Cid) -> EventBuilder {
793        let mut tags = vec![
794            Tag::identifier(tree_name.to_string()),
795            Tag::custom(
796                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
797                vec!["hashtree"],
798            ),
799            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
800        ];
801        if let Some(key) = cid.key {
802            tags.push(Tag::custom(
803                TagKind::Custom("key".into()),
804                vec![hex::encode(key)],
805            ));
806        }
807
808        EventBuilder::new(Kind::Custom(30078), "", tags)
809    }
810}
811
812#[cfg(test)]
813mod tests;