Skip to main content

hashtree_cli/
nostr_mirror.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::time::Duration;
6use std::time::Instant;
7
8use anyhow::{Context, Result};
9use hashtree_nostr::{
10    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
11};
12use nostr::{
13    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
14    Timestamp,
15};
16use nostr_sdk::{
17    pool::RelayLimits, prelude::RelayPoolNotification, Client, ClientOptions, Keys, RelayStatus,
18};
19use tokio::sync::{watch, Mutex as AsyncMutex};
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_incremental_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// Timing constants come in pairs: a production value and a much shorter
// `#[cfg(test)]` value so tests do not wait on real-world delays.

// Slept at the very start of `BackgroundNostrMirror::run` before any work.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

// Slept in `run` right after the startup delay, before roots are published
// and authors are subscribed.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

// Period of the `run` loop's author-refresh / relay-status tick.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

// NOTE(review): presumably the minimum spacing between reconnect-triggered
// history syncs; the consumer is not visible in this part of the file.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

// Event kind number appended to the default history kinds below.
const KIND_LONG_FORM_CONTENT: u16 = 30_023;
// Default value of `NostrMirrorConfig::kinds`.
const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
// Default values of the `NostrMirrorConfig::published_*_tree_name` fields.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
// Directory (under the store base path) and file suffix used by
// `uploaded_root_state_path` for the persisted uploaded-root files.
const MIRROR_UPLOAD_STATE_DIR: &str = "nostr-mirror";
const MIRROR_UPLOADED_ROOT_SUFFIX: &str = ".uploaded-root";
// NOTE(review): the history-sync tuning values below are consumed outside
// the portion of the file visible here; meanings are inferred from names only.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
// Default for `NostrMirrorConfig::full_text_note_history_follow_distance`.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
// Default for `NostrMirrorConfig::full_text_note_history_max_relay_pages`.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
const FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE: u32 = 1;
const FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT: usize = 32;
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

// NOTE(review): presumably the pause between missing-profile backfill runs;
// the check that reads it (`should_backfill_missing_profiles`) is not shown.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

// Period of the `run` loop's publish tick (root refresh, live-event flush,
// root publish attempt).
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

// NOTE(review): the remaining publish/upload timings are read by code
// outside this view; confirm their exact role there.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);

#[cfg(not(test))]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
#[cfg(test)]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);

#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT: Duration = Duration::from_secs(2);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT: Duration = Duration::from_millis(250);

#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT: Duration = Duration::from_secs(2);

// NOTE(review): looks like a marker text for blob-push errors; where and how
// it is compared is not visible here.
const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
95
96fn trim_transient_allocations() {
97    #[cfg(all(target_os = "linux", target_env = "gnu"))]
98    unsafe {
99        // SAFETY: malloc_trim asks glibc to return free heap pages to the OS.
100        // It does not touch live allocations and is safe to call after the
101        // large, synchronous mirror upload job has dropped its temporary data.
102        libc::malloc_trim(0);
103    }
104}
105
106fn decode_hex_pubkey(value: &str) -> Option<[u8; 32]> {
107    let bytes = hex::decode(value).ok()?;
108    <[u8; 32]>::try_from(bytes.as_slice()).ok()
109}
110
/// Settings for [`BackgroundNostrMirror`].
///
/// NOTE(review): several fields are consumed by code outside the visible part
/// of this file; those are described only as far as their names and defaults
/// allow and should be confirmed against their readers.
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relay URLs added to the read-side client. `run` returns immediately
    /// when this is empty.
    pub relays: Vec<String>,
    /// Relay URLs added to the publish client. No publish client is created
    /// when this is empty, even if publish keys are supplied.
    pub publish_relays: Vec<String>,
    /// NOTE(review): presumably Blossom servers that roots are pushed to;
    /// usage is not visible here.
    pub blossom_write_servers: Vec<String>,
    /// Largest follow distance (inclusive) whose users are collected as
    /// authors. `run` returns immediately when this is `0`.
    pub max_follow_distance: u32,
    /// Threshold handed to the social graph's overmute check; users reported
    /// as overmuted are left out of the author list.
    pub overmute_threshold: f64,
    /// NOTE(review): batching knob for author requests; reader not visible.
    pub author_batch_size: usize,
    /// Logged at startup (clamped to at least 1 for the log line).
    /// NOTE(review): the chunking itself happens outside this view.
    pub history_sync_author_chunk_size: usize,
    /// NOTE(review): presumably caps events fetched per author during history
    /// sync; reader not visible.
    pub history_sync_per_author_event_limit: usize,
    /// Batch size passed to `collect_missing_profile_authors` for each
    /// missing-profile backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// NOTE(review): presumably the relay fetch timeout; reader not visible.
    pub fetch_timeout: Duration,
    /// When `Some`, applied as the maximum event size limit of the read-side
    /// relay client; when `None`, the client is built with default options.
    pub relay_event_max_size: Option<u32>,
    /// Logged as `negentropy_only`. NOTE(review): enforcement is outside this
    /// view.
    pub require_negentropy: bool,
    /// Event kinds to mirror (logged at startup). NOTE(review): the filters
    /// built from it are outside this view.
    pub kinds: Vec<u16>,
    /// When `true` and there are authors at startup, a startup history sync
    /// is spawned.
    pub history_sync_on_start: bool,
    /// Passed to `should_history_sync_on_reconnect` when relay statuses are
    /// compared on each refresh tick.
    pub history_sync_on_reconnect: bool,
    /// NOTE(review): presumably the follow distance up to which full
    /// text-note history is fetched; reader not visible.
    pub full_text_note_history_follow_distance: Option<u32>,
    /// NOTE(review): presumably a page cap for full text-note history (the
    /// default is `0`); its exact meaning must be confirmed at the reader.
    pub full_text_note_history_max_relay_pages: usize,
    /// Tree name for the event index; also selects the persisted
    /// uploaded-root state file. `None` means no persisted state is loaded.
    pub published_event_tree_name: Option<String>,
    /// Tree name for the profile-search index; same role as above.
    pub published_profile_search_tree_name: Option<String>,
    /// Tree name for the profiles-by-pubkey index; same role as above.
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
134
135impl Default for NostrMirrorConfig {
136    fn default() -> Self {
137        Self {
138            relays: Vec::new(),
139            publish_relays: Vec::new(),
140            blossom_write_servers: Vec::new(),
141            max_follow_distance: 2,
142            overmute_threshold: 1.0,
143            author_batch_size: 256,
144            history_sync_author_chunk_size: 5_000,
145            history_sync_per_author_event_limit: 256,
146            missing_profile_backfill_batch_size: 5_000,
147            fetch_timeout: Duration::from_secs(15),
148            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
149            require_negentropy: false,
150            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
151            history_sync_on_start: true,
152            history_sync_on_reconnect: true,
153            full_text_note_history_follow_distance: Some(
154                DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
155            ),
156            full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
157            published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
158            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
159            published_profiles_by_pubkey_tree_name: Some(
160                DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
161            ),
162        }
163    }
164}
165
/// Bookkeeping for publishing and uploading one tree root.
///
/// The mirror keeps one instance per published tree (event index,
/// profile-search, profiles-by-pubkey). At construction only
/// `last_uploaded_root` may be pre-filled, from the state file on disk; every
/// other field starts at its `Default` value.
///
/// NOTE(review): the code that updates these fields is outside the visible
/// part of the file, so the per-field notes below are inferred from names and
/// should be confirmed there.
#[derive(Debug, Default)]
struct RootPublishState {
    // Presumably the newest root waiting to be published.
    pending_root: Option<hashtree_core::Cid>,
    // Presumably when `pending_root` last changed.
    last_changed_at: Option<Instant>,
    // Presumably when the state first became unpublished ("dirty").
    dirty_since: Option<Instant>,
    // Presumably the root most recently published, with local and event timestamps.
    last_published_root: Option<hashtree_core::Cid>,
    last_published_at: Option<Instant>,
    last_published_created_at: Option<Timestamp>,
    // Root recorded as uploaded; restored from the on-disk state file at startup.
    last_uploaded_root: Option<hashtree_core::Cid>,
    last_uploaded_at: Option<Instant>,
    // Presumably the root whose upload is currently running, if any.
    upload_in_progress_root: Option<hashtree_core::Cid>,
    // Presumably details of the most recent failed upload.
    last_upload_failed_at: Option<Instant>,
    last_upload_error: Option<String>,
    // Presumably set when an upload failed because a local blob was missing.
    missing_blob_rebuild_required: bool,
}
181
/// Parameter bundle describing how one history sync should fetch from relays.
///
/// NOTE(review): neither the construction nor the consumption of this type is
/// visible in this part of the file; field meanings are inferred from their
/// names and types only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HistorySyncPlan {
    // Fetch strategy from `hashtree_nostr`.
    relay_fetch_mode: RelayFetchMode,
    // Presumably the number of authors per relay request.
    author_batch_size: usize,
    // Presumably the cap on events fetched per author.
    per_author_event_limit: usize,
    // Presumably the page size used for paged relay fetches.
    relay_page_size: usize,
    // Presumably the cap on pages fetched per relay — TODO confirm how `0` is treated.
    max_relay_pages: usize,
}
190
/// Background task that mirrors Nostr events for social-graph authors and
/// publishes the resulting tree roots.
///
/// Built with `new`, driven by `run`, and stopped with `shutdown`.
pub struct BackgroundNostrMirror {
    // Settings supplied by the caller.
    config: NostrMirrorConfig,
    // Local hashtree store; also provides the base path for persisted state.
    store: Arc<HashtreeStore>,
    // Social graph used to decide which authors to mirror.
    graph_store: Arc<SocialGraphStore>,
    // Read-side relay client (signed with freshly generated keys) connected
    // to `config.relays`; `run` consumes its notifications.
    client: Client,
    // Publish-side client; present only when publish keys were supplied and
    // `config.publish_relays` is non-empty.
    publish_client: Option<Client>,
    // Publish/upload bookkeeping, one per published tree, seeded from disk.
    event_publish_state: Arc<Mutex<RootPublishState>>,
    profile_search_publish_state: Arc<Mutex<RootPublishState>>,
    profiles_by_pubkey_publish_state: Arc<Mutex<RootPublishState>>,
    // Starts empty. NOTE(review): presumably live events buffered until the
    // next flush, keyed by event id — confirm in `ingest_live_event`.
    pending_live_events: Mutex<BTreeMap<String, Event>>,
    // Starts at 0. NOTE(review): presumably a position into the author list
    // for missing-profile backfill — confirm at its reader.
    missing_profile_cursor: Mutex<usize>,
    // Held by each background history-sync job while it runs, so the jobs
    // spawned from this file never overlap.
    history_sync_lock: AsyncMutex<()>,
    // Shutdown signal: `shutdown` sends `true`; `run` exits its loop once it
    // observes `true` on a clone of the receiver.
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}
206
207impl BackgroundNostrMirror {
208    pub async fn new(
209        config: NostrMirrorConfig,
210        store: Arc<HashtreeStore>,
211        graph_store: Arc<SocialGraphStore>,
212        publish_keys: Option<Keys>,
213    ) -> Result<Self> {
214        let client = if let Some(max_size) = config.relay_event_max_size {
215            let mut limits = RelayLimits::default();
216            limits.events.max_size = Some(max_size);
217            Client::builder()
218                .signer(Keys::generate())
219                .opts(ClientOptions::new().relay_limits(limits))
220                .build()
221        } else {
222            Client::new(Keys::generate())
223        };
224        for relay in &config.relays {
225            client
226                .add_relay(relay)
227                .await
228                .with_context(|| format!("add mirror relay {relay}"))?;
229        }
230        client.connect().await;
231
232        let publish_client = if let Some(keys) = publish_keys {
233            if config.publish_relays.is_empty() {
234                None
235            } else {
236                let client = Client::new(keys);
237                for relay in &config.publish_relays {
238                    client
239                        .add_relay(relay)
240                        .await
241                        .with_context(|| format!("add mirror publish relay {relay}"))?;
242                }
243                client.connect().await;
244                Some(client)
245            }
246        } else {
247            None
248        };
249
250        let (shutdown_tx, shutdown_rx) = watch::channel(false);
251        let event_publish_state = Self::root_publish_state_from_disk(
252            store.base_path(),
253            config.published_event_tree_name.as_deref(),
254            "event root",
255        );
256        let profile_search_publish_state = Self::root_publish_state_from_disk(
257            store.base_path(),
258            config.published_profile_search_tree_name.as_deref(),
259            "profile-search root",
260        );
261        let profiles_by_pubkey_publish_state = Self::root_publish_state_from_disk(
262            store.base_path(),
263            config.published_profiles_by_pubkey_tree_name.as_deref(),
264            "profiles-by-pubkey root",
265        );
266        Ok(Self {
267            config,
268            store,
269            graph_store,
270            client,
271            publish_client,
272            event_publish_state: Arc::new(Mutex::new(event_publish_state)),
273            profile_search_publish_state: Arc::new(Mutex::new(profile_search_publish_state)),
274            profiles_by_pubkey_publish_state: Arc::new(Mutex::new(
275                profiles_by_pubkey_publish_state,
276            )),
277            pending_live_events: Mutex::new(BTreeMap::new()),
278            missing_profile_cursor: Mutex::new(0),
279            history_sync_lock: AsyncMutex::new(()),
280            shutdown_tx,
281            shutdown_rx,
282        })
283    }
284
285    fn root_publish_state_from_disk(
286        base_path: &std::path::Path,
287        tree_name: Option<&str>,
288        log_label: &str,
289    ) -> RootPublishState {
290        let Some(tree_name) = tree_name else {
291            return RootPublishState::default();
292        };
293        let path = Self::uploaded_root_state_path(base_path, tree_name);
294        let Some(root) = Self::read_uploaded_root_state(&path, log_label) else {
295            return RootPublishState::default();
296        };
297
298        RootPublishState {
299            last_uploaded_root: Some(root),
300            ..RootPublishState::default()
301        }
302    }
303
304    fn uploaded_root_state_path(base_path: &std::path::Path, tree_name: &str) -> PathBuf {
305        let safe_tree_name = tree_name
306            .chars()
307            .map(|ch| {
308                if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
309                    ch
310                } else {
311                    '_'
312                }
313            })
314            .collect::<String>();
315        base_path
316            .join(MIRROR_UPLOAD_STATE_DIR)
317            .join(format!("{safe_tree_name}{MIRROR_UPLOADED_ROOT_SUFFIX}"))
318    }
319
320    fn read_uploaded_root_state(
321        path: &std::path::Path,
322        log_label: &str,
323    ) -> Option<hashtree_core::Cid> {
324        let raw = match std::fs::read_to_string(path) {
325            Ok(raw) => raw,
326            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return None,
327            Err(err) => {
328                warn!(
329                    "Nostr mirror failed to read uploaded {} state {}: {}",
330                    log_label,
331                    path.display(),
332                    err
333                );
334                return None;
335            }
336        };
337        let trimmed = raw.trim();
338        if trimmed.is_empty() {
339            return None;
340        }
341        match hashtree_core::Cid::parse(trimmed) {
342            Ok(root) => Some(root),
343            Err(err) => {
344                warn!(
345                    "Nostr mirror ignored invalid uploaded {} state {}: {}",
346                    log_label,
347                    path.display(),
348                    err
349                );
350                None
351            }
352        }
353    }
354
355    fn write_uploaded_root_state(
356        path: &std::path::Path,
357        root: &hashtree_core::Cid,
358        log_label: &str,
359    ) -> Result<()> {
360        if let Some(parent) = path.parent() {
361            std::fs::create_dir_all(parent)
362                .with_context(|| format!("create {}", parent.display()))?;
363        }
364        std::fs::write(path, format!("{root}\n"))
365            .with_context(|| format!("write uploaded {log_label} state {}", path.display()))
366    }
367
368    pub fn shutdown(&self) {
369        let _ = self.shutdown_tx.send(true);
370    }
371
    /// Refreshes the publish bookkeeping of all three trees from the store.
    ///
    /// Calls the three `note_*_root_change` helpers in order (public events,
    /// profile search, profiles by pubkey) and stops at the first one that
    /// fails.
    ///
    /// # Errors
    /// Returns the first error reported by any of the helpers.
    fn sync_publish_roots_from_store(&self) -> Result<()> {
        self.note_public_events_root_change()?;
        self.note_profile_search_root_change()?;
        self.note_profiles_by_pubkey_root_change()?;
        Ok(())
    }
378
379    async fn publish_pending_roots(
380        &self,
381        force_event: bool,
382        force_profile_search: bool,
383        force_profiles_by_pubkey: bool,
384    ) -> (Result<()>, Result<()>, Result<()>) {
385        tokio::join!(
386            self.maybe_publish_event_root(force_event),
387            self.maybe_publish_profile_search_root(force_profile_search),
388            self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
389        )
390    }
391
392    async fn publish_priority_roots(
393        &self,
394        force_event: bool,
395        force_profile_search: bool,
396        force_profiles_by_pubkey: bool,
397    ) -> (Result<()>, Result<()>, Result<()>) {
398        let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
399            async {
400                if force_profile_search {
401                    self.maybe_publish_profile_search_root(true).await
402                } else {
403                    Ok(())
404                }
405            },
406            async {
407                if force_profiles_by_pubkey {
408                    self.maybe_publish_profiles_by_pubkey_root(true).await
409                } else {
410                    Ok(())
411                }
412            },
413        );
414        let event_result = if force_event {
415            self.maybe_publish_event_root(true).await
416        } else {
417            Ok(())
418        };
419        (
420            event_result,
421            profile_search_result,
422            profiles_by_pubkey_result,
423        )
424    }
425
426    pub async fn run(self: Arc<Self>) -> Result<()> {
427        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
428            return Ok(());
429        }
430
431        info!(
432            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
433            self.config.relays.len(),
434            self.config.max_follow_distance,
435            self.config.require_negentropy,
436            self.config.kinds,
437            self.config.history_sync_author_chunk_size.max(1),
438            self.config.history_sync_on_start,
439            self.config.history_sync_on_reconnect
440        );
441
442        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
443        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
444        let live_since = Timestamp::now();
445        self.sync_publish_roots_from_store()?;
446        let (event_result, profile_search_result, profiles_by_pubkey_result) =
447            self.publish_priority_roots(true, true, true).await;
448        if let Err(err) = event_result {
449            warn!(
450                "Nostr mirror event-root publish failed on startup: {:#}",
451                err
452            );
453        }
454        if let Err(err) = profile_search_result {
455            warn!(
456                "Nostr mirror profile-search publish failed on startup: {:#}",
457                err
458            );
459        }
460        if let Err(err) = profiles_by_pubkey_result {
461            warn!(
462                "Nostr mirror profiles-by-pubkey publish failed on startup: {:#}",
463                err
464            );
465        }
466
467        let initial_authors = self.collect_authors()?;
468        if initial_authors.is_empty() {
469            info!("Nostr mirror: no social-graph authors to mirror yet");
470        }
471
472        let mut subscribed_authors = HashSet::new();
473        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
474            .await?;
475
476        if !initial_authors.is_empty() && self.config.history_sync_on_start {
477            self.spawn_startup_history_sync(initial_authors.clone());
478        }
479
480        let mut relay_statuses = self.capture_relay_statuses().await;
481        let mut last_reconnect_history_sync_at: Option<Instant> = None;
482        let mut last_missing_profile_backfill_at: Option<Instant> = None;
483        let mut notifications = self.client.notifications();
484        let mut shutdown_rx = self.shutdown_rx.clone();
485        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
486        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
487
488        loop {
489            tokio::select! {
490                _ = shutdown_rx.changed() => {
491                    if *shutdown_rx.borrow() {
492                        break;
493                    }
494                }
495                _ = refresh_interval.tick() => {
496                    let authors = self.collect_authors()?;
497                    let mut reconnected_relay = None;
498                    for (relay_url, status) in self.capture_relay_statuses().await {
499                        let previous = relay_statuses.insert(relay_url.clone(), status);
500                        if reconnected_relay.is_none()
501                            && Self::should_history_sync_on_reconnect(
502                                self.config.history_sync_on_reconnect,
503                                previous,
504                                status,
505                            )
506                        {
507                            reconnected_relay = Some(relay_url);
508                        }
509                    }
510                    if let Some(relay_url) = reconnected_relay {
511                        if Self::should_run_reconnect_history_sync(
512                            last_reconnect_history_sync_at.as_ref(),
513                        ) && !authors.is_empty()
514                        {
515                            info!(
516                                "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
517                                relay_url,
518                                authors.len(),
519                                self.config.require_negentropy
520                            );
521                            self.spawn_author_history_sync(
522                                "relay reconnect catch-up",
523                                authors.clone(),
524                                false,
525                                false,
526                            );
527                            last_reconnect_history_sync_at = Some(Instant::now());
528                        }
529                    }
530                    let new_authors = authors
531                        .iter()
532                        .cloned()
533                        .filter(|author| !subscribed_authors.contains(author))
534                        .collect::<Vec<_>>();
535                    if !new_authors.is_empty() {
536                        debug!(
537                            "Nostr mirror discovered {} newly reachable author(s)",
538                            new_authors.len()
539                        );
540                        self.subscribe_authors_since(
541                            &new_authors,
542                            Timestamp::now(),
543                            &mut subscribed_authors,
544                        )
545                        .await?;
546                        self.spawn_author_history_sync(
547                            "new-author catch-up",
548                            new_authors.clone(),
549                            true,
550                            true,
551                        );
552                    }
553                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
554                        let missing_profile_authors = self.collect_missing_profile_authors(
555                            self.config.missing_profile_backfill_batch_size,
556                        )?;
557                        if !missing_profile_authors.is_empty() {
558                            info!(
559                                "Nostr mirror missing-profile backfill starting: authors={}",
560                                missing_profile_authors.len()
561                            );
562                            self.spawn_missing_profile_backfill(missing_profile_authors);
563                            last_missing_profile_backfill_at = Some(Instant::now());
564                        }
565                    }
566                }
567                _ = publish_interval.tick() => {
568                    self.sync_publish_roots_from_store()?;
569                    if let Err(err) = self.flush_live_events().await {
570                        warn!("Nostr mirror live event flush failed: {:#}", err);
571                    }
572                    let (event_result, profile_search_result, profiles_by_pubkey_result) = self
573                        .publish_pending_roots(false, false, false)
574                        .await;
575                    if let Err(err) = event_result {
576                        warn!("Nostr mirror event-root publish failed: {:#}", err);
577                    }
578                    if let Err(err) = profile_search_result {
579                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
580                    }
581                    if let Err(err) = profiles_by_pubkey_result {
582                        warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
583                    }
584                }
585                notification = notifications.recv() => {
586                    match notification {
587                        Ok(RelayPoolNotification::Event { event, .. }) => {
588                            self.ingest_live_event(&event)?;
589                        }
590                        Ok(RelayPoolNotification::Shutdown) => break,
591                        Ok(_) => {}
592                        Err(err) => {
593                            warn!("Nostr mirror notification error: {}", err);
594                            break;
595                        }
596                    }
597                }
598            }
599        }
600
601        if let Err(err) = self.flush_live_events().await {
602            warn!(
603                "Nostr mirror live event flush failed during shutdown: {:#}",
604                err
605            );
606        }
607        if let Err(err) = self.sync_publish_roots_from_store() {
608            warn!(
609                "Nostr mirror root-state refresh failed during shutdown: {:#}",
610                err
611            );
612        }
613        let (event_result, profile_search_result, profiles_by_pubkey_result) =
614            self.publish_pending_roots(true, true, true).await;
615        if let Err(err) = event_result {
616            warn!(
617                "Nostr mirror event-root publish failed during shutdown: {:#}",
618                err
619            );
620        }
621        if let Err(err) = profile_search_result {
622            warn!(
623                "Nostr mirror profile-search publish failed during shutdown: {:#}",
624                err
625            );
626        }
627        if let Err(err) = profiles_by_pubkey_result {
628            warn!(
629                "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
630                err
631            );
632        }
633        let _ = self.client.disconnect().await;
634        if let Some(client) = self.publish_client.as_ref() {
635            let _ = client.disconnect().await;
636        }
637        Ok(())
638    }
639
640    fn spawn_startup_history_sync(self: &Arc<Self>, initial_authors: Vec<String>) {
641        let mirror = Arc::clone(self);
642        tokio::task::spawn_blocking(move || {
643            let runtime = tokio::runtime::Builder::new_current_thread()
644                .enable_all()
645                .build()
646                .expect("build nostr mirror startup history sync runtime");
647            runtime.block_on(async move {
648                let _guard = mirror.history_sync_lock.lock().await;
649                if let Err(err) = mirror.run_startup_history_sync(initial_authors).await {
650                    warn!("Nostr mirror startup history sync failed: {:#}", err);
651                }
652            });
653        });
654    }
655
656    async fn run_startup_history_sync(&self, initial_authors: Vec<String>) -> Result<()> {
657        self.history_sync_full_text_notes_for_reachable_authors()
658            .await?;
659        self.history_sync_authors(initial_authors).await?;
660        if self.should_backfill_missing_profiles(None) {
661            let missing_profile_authors = self
662                .collect_missing_profile_authors(self.config.missing_profile_backfill_batch_size)?;
663            if !missing_profile_authors.is_empty() {
664                info!(
665                    "Nostr mirror missing-profile backfill starting: authors={}",
666                    missing_profile_authors.len()
667                );
668                self.history_sync_authors_with_kinds(
669                    missing_profile_authors,
670                    &[Kind::Metadata.as_u16()],
671                )
672                .await?;
673            }
674        }
675        Ok(())
676    }
677
678    fn spawn_author_history_sync(
679        self: &Arc<Self>,
680        label: &'static str,
681        authors: Vec<String>,
682        include_full_text_notes: bool,
683        wait_for_existing_sync: bool,
684    ) {
685        let mirror = Arc::clone(self);
686        tokio::task::spawn_blocking(move || {
687            let runtime = tokio::runtime::Builder::new_current_thread()
688                .enable_all()
689                .build()
690                .expect("build nostr mirror author history sync runtime");
691            runtime.block_on(async move {
692                if wait_for_existing_sync {
693                    let _guard = mirror.history_sync_lock.lock().await;
694                    if let Err(err) = mirror
695                        .run_author_history_sync(authors, include_full_text_notes)
696                        .await
697                    {
698                        warn!("Nostr mirror {label} failed: {:#}", err);
699                    }
700                    return;
701                }
702
703                let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
704                    info!("Nostr mirror {label} skipped; another history sync is running");
705                    return;
706                };
707                if let Err(err) = mirror
708                    .run_author_history_sync(authors, include_full_text_notes)
709                    .await
710                {
711                    warn!("Nostr mirror {label} failed: {:#}", err);
712                }
713            });
714        });
715    }
716
717    async fn run_author_history_sync(
718        &self,
719        authors: Vec<String>,
720        include_full_text_notes: bool,
721    ) -> Result<()> {
722        if include_full_text_notes {
723            self.history_sync_full_text_notes_for_authors(authors.clone())
724                .await?;
725        }
726        self.history_sync_authors(authors).await
727    }
728
729    fn spawn_missing_profile_backfill(self: &Arc<Self>, authors: Vec<String>) {
730        let mirror = Arc::clone(self);
731        tokio::task::spawn_blocking(move || {
732            let runtime = tokio::runtime::Builder::new_current_thread()
733                .enable_all()
734                .build()
735                .expect("build nostr mirror missing profile runtime");
736            runtime.block_on(async move {
737                let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
738                    info!(
739                        "Nostr mirror missing-profile backfill skipped; another history sync is running"
740                    );
741                    return;
742                };
743                if let Err(err) = mirror
744                    .history_sync_authors_with_kinds(authors, &[Kind::Metadata.as_u16()])
745                    .await
746                {
747                    warn!("Nostr mirror missing-profile backfill failed: {:#}", err);
748                }
749            });
750        });
751    }
752
753    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
754        let mut statuses = HashMap::new();
755        for (relay_url, relay) in self.client.relays().await {
756            statuses.insert(relay_url.to_string(), relay.status());
757        }
758        statuses
759    }
760
761    async fn has_connected_publish_relay(&self) -> bool {
762        let Some(client) = self.publish_client.as_ref() else {
763            return false;
764        };
765        Self::client_has_connected_relay(client).await
766    }
767
768    async fn client_has_connected_relay(client: &Client) -> bool {
769        for (_relay_url, relay) in client.relays().await {
770            if relay.status() == RelayStatus::Connected {
771                return true;
772            }
773        }
774        false
775    }
776
777    fn collect_authors(&self) -> Result<Vec<String>> {
778        self.collect_authors_with_max_distance(self.config.max_follow_distance)
779    }
780
781    fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
782        let mut authors = Vec::new();
783        let mut seen = HashSet::new();
784        for distance in 0..=max_distance {
785            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
786                self.graph_store.as_ref(),
787                distance,
788            )
789            .with_context(|| format!("load social-graph distance {distance}"))?
790            {
791                if self
792                    .graph_store
793                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
794                {
795                    continue;
796                }
797                let hex = hex::encode(pubkey);
798                if seen.insert(hex.clone()) {
799                    authors.push(hex);
800                }
801            }
802        }
803        Ok(authors)
804    }
805
806    async fn prioritize_full_text_note_history_authors(
807        &self,
808        authors: Vec<String>,
809    ) -> Result<Vec<String>> {
810        let Some(root) = self.graph_store.public_events_root()? else {
811            return Ok(authors);
812        };
813
814        let event_store = NostrEventStore::new(self.store.store_arc());
815        let mut prioritized = Vec::with_capacity(authors.len());
816        let mut sampled = 0usize;
817        for (index, author) in authors.into_iter().enumerate() {
818            let distance = match decode_hex_pubkey(&author) {
819                Some(pubkey) => self
820                    .graph_store
821                    .follow_distance(&pubkey)?
822                    .unwrap_or(u32::MAX),
823                None => u32::MAX,
824            };
825            let indexed_text_sample = if distance <= FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE {
826                sampled = sampled.saturating_add(1);
827                event_store
828                    .list_by_author_and_kind(
829                        Some(&root),
830                        &author,
831                        Kind::TextNote.as_u16() as u32,
832                        ListEventsOptions {
833                            limit: Some(FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT),
834                            ..ListEventsOptions::default()
835                        },
836                    )
837                    .await
838                    .with_context(|| {
839                        format!("sample indexed text-note history for author {author}")
840                    })?
841                    .len()
842            } else {
843                FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT
844            };
845            prioritized.push((distance, indexed_text_sample, index, author));
846        }
847
848        prioritized.sort_by(|left, right| {
849            left.0
850                .cmp(&right.0)
851                .then_with(|| left.1.cmp(&right.1))
852                .then_with(|| left.2.cmp(&right.2))
853        });
854        info!(
855            "Nostr mirror full text content history prioritized authors: authors={} sampled_distance_le_{}={}",
856            prioritized.len(),
857            FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE,
858            sampled
859        );
860        Ok(prioritized
861            .into_iter()
862            .map(|(_, _, _, author)| author)
863            .collect())
864    }
865
866    fn full_text_note_history_follow_distance(&self) -> Option<u32> {
867        let distance = self.config.full_text_note_history_follow_distance?;
868        if self
869            .config
870            .kinds
871            .iter()
872            .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
873        {
874            Some(distance.min(self.config.max_follow_distance))
875        } else {
876            None
877        }
878    }
879
880    fn full_text_note_history_max_relay_pages(&self) -> Option<usize> {
881        Self::full_text_note_history_max_relay_pages_for_config(&self.config)
882    }
883
884    fn full_text_note_history_max_relay_pages_for_config(
885        config: &NostrMirrorConfig,
886    ) -> Option<usize> {
887        let pages = config.full_text_note_history_max_relay_pages;
888        if pages == 0 {
889            None
890        } else {
891            Some(pages)
892        }
893    }
894
895    fn is_text_content_history_kind(kind: u16) -> bool {
896        kind == Kind::TextNote.as_u16() || kind == KIND_LONG_FORM_CONTENT
897    }
898
899    fn history_sync_kinds_for_config(config: &NostrMirrorConfig) -> Vec<u16> {
900        let mut kinds = config.kinds.clone();
901        kinds.retain(|kind| !Self::is_text_content_history_kind(*kind));
902        kinds
903    }
904
905    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
906        if limit == 0 {
907            return Ok(Vec::new());
908        }
909
910        let authors = self.collect_authors()?;
911        if authors.is_empty() {
912            return Ok(Vec::new());
913        }
914
915        let mut cursor = self
916            .missing_profile_cursor
917            .lock()
918            .expect("missing profile cursor");
919        let mut index = (*cursor).min(authors.len());
920        let mut scanned = 0usize;
921        let mut missing = Vec::new();
922
923        while scanned < authors.len() && missing.len() < limit {
924            let author = &authors[index];
925            if self.graph_store.latest_profile_event(author)?.is_none() {
926                missing.push(author.clone());
927            }
928            index += 1;
929            if index == authors.len() {
930                index = 0;
931            }
932            scanned += 1;
933        }
934
935        *cursor = index;
936        Ok(missing)
937    }
938
939    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
940        if self.config.missing_profile_backfill_batch_size == 0
941            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
942        {
943            return false;
944        }
945        match last_run {
946            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
947            None => true,
948        }
949    }
950
951    fn should_history_sync_on_reconnect(
952        history_sync_on_reconnect: bool,
953        previous: Option<RelayStatus>,
954        status: RelayStatus,
955    ) -> bool {
956        history_sync_on_reconnect
957            && status == RelayStatus::Connected
958            && matches!(
959                previous,
960                Some(
961                    RelayStatus::Initialized
962                        | RelayStatus::Pending
963                        | RelayStatus::Connecting
964                        | RelayStatus::Disconnected
965                        | RelayStatus::Terminated
966                )
967            )
968    }
969
970    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
971        match last_run {
972            None => true,
973            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
974        }
975    }
976
977    fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
978        !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
979    }
980
981    fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
982        kinds.is_empty()
983            || kinds.iter().any(|kind| {
984                *kind == Kind::Metadata.as_u16()
985                    || *kind == Kind::ContactList.as_u16()
986                    || *kind == Kind::MuteList.as_u16()
987            })
988    }
989
990    fn history_sync_plan_for(
991        config: &NostrMirrorConfig,
992        authors: usize,
993        kinds: &[u16],
994    ) -> HistorySyncPlan {
995        let author_batch_size = config.author_batch_size.max(1);
996        let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
997        let relay_page_size = 1_000;
998        let max_relay_pages = 10;
999
1000        if Self::is_metadata_only_history_sync(kinds) {
1001            return HistorySyncPlan {
1002                relay_fetch_mode: RelayFetchMode::AuthorBatches,
1003                author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
1004                per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
1005                relay_page_size,
1006                max_relay_pages,
1007            };
1008        }
1009
1010        if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
1011            return HistorySyncPlan {
1012                relay_fetch_mode: RelayFetchMode::GlobalRecent,
1013                author_batch_size,
1014                per_author_event_limit: per_author_event_limit
1015                    .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
1016                    .max(1),
1017                relay_page_size,
1018                max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
1019            };
1020        }
1021
1022        HistorySyncPlan {
1023            relay_fetch_mode: RelayFetchMode::AuthorBatches,
1024            author_batch_size,
1025            per_author_event_limit,
1026            relay_page_size,
1027            max_relay_pages,
1028        }
1029    }
1030
1031    fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
1032        Self::history_sync_plan_for(&self.config, authors, kinds)
1033    }
1034
1035    fn history_sync_chunk_size_for_config(
1036        config: &NostrMirrorConfig,
1037        authors: usize,
1038        kinds: &[u16],
1039        full_author_history: bool,
1040        chunk_size_override: Option<usize>,
1041    ) -> usize {
1042        let configured_chunk_size = chunk_size_override
1043            .unwrap_or(config.history_sync_author_chunk_size)
1044            .max(1);
1045        if full_author_history {
1046            return 1;
1047        }
1048        if !full_author_history
1049            && Self::history_sync_plan_for(config, authors, kinds).relay_fetch_mode
1050                == RelayFetchMode::GlobalRecent
1051        {
1052            return authors.max(1);
1053        }
1054        configured_chunk_size
1055    }
1056
1057    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
1058        let kinds = Self::history_sync_kinds_for_config(&self.config);
1059        if kinds.is_empty() {
1060            info!("Nostr mirror history sync skipped: no enabled history kinds");
1061            return Ok(());
1062        }
1063        self.history_sync_authors_with_kinds(authors, &kinds).await
1064    }
1065
1066    async fn history_sync_authors_with_kinds(
1067        &self,
1068        authors: Vec<String>,
1069        kinds: &[u16],
1070    ) -> Result<()> {
1071        self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
1072            .await
1073    }
1074
1075    async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
1076        let Some(distance) = self.full_text_note_history_follow_distance() else {
1077            return Ok(());
1078        };
1079        if self.full_text_note_history_max_relay_pages().is_none() {
1080            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1081            return Ok(());
1082        }
1083        info!(
1084            "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
1085        );
1086        let authors = self
1087            .prioritize_full_text_note_history_authors(
1088                self.collect_authors_with_max_distance(distance)?,
1089            )
1090            .await?;
1091        let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
1092            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1093            return Ok(());
1094        };
1095        self.history_sync_distance_filtered_full_text_notes(authors, distance, max_relay_pages)
1096            .await
1097    }
1098
1099    async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
1100        let Some(distance) = self.full_text_note_history_follow_distance() else {
1101            return Ok(());
1102        };
1103        let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
1104            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1105            return Ok(());
1106        };
1107        let mut close_authors = Vec::new();
1108        for author in authors {
1109            let Some(pubkey) = decode_hex_pubkey(&author) else {
1110                continue;
1111            };
1112            if self
1113                .graph_store
1114                .follow_distance(&pubkey)?
1115                .is_some_and(|actual_distance| actual_distance <= distance)
1116            {
1117                close_authors.push(author);
1118            }
1119        }
1120        if close_authors.is_empty() {
1121            return Ok(());
1122        }
1123        let close_authors = self
1124            .prioritize_full_text_note_history_authors(close_authors)
1125            .await?;
1126
1127        self.history_sync_distance_filtered_full_text_notes(
1128            close_authors,
1129            distance,
1130            max_relay_pages,
1131        )
1132        .await
1133    }
1134
1135    async fn history_sync_distance_filtered_full_text_notes(
1136        &self,
1137        close_authors: Vec<String>,
1138        distance: u32,
1139        max_relay_pages: usize,
1140    ) -> Result<()> {
1141        if close_authors.is_empty() {
1142            return Ok(());
1143        }
1144
1145        info!(
1146            "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
1147            close_authors.len(),
1148            distance,
1149            max_relay_pages
1150        );
1151        let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
1152        self.history_sync_authors_with_kinds_and_mode(
1153            close_authors,
1154            &kinds,
1155            true,
1156            Some(max_relay_pages),
1157        )
1158        .await
1159    }
1160
1161    async fn history_sync_authors_with_kinds_and_mode(
1162        &self,
1163        authors: Vec<String>,
1164        kinds: &[u16],
1165        full_author_history: bool,
1166        max_relay_pages: Option<usize>,
1167    ) -> Result<()> {
1168        let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
1169        let chunk_size = Self::history_sync_chunk_size_for_config(
1170            &self.config,
1171            authors.len(),
1172            kinds,
1173            full_author_history,
1174            None,
1175        );
1176        self.history_sync_authors_chunked(
1177            authors,
1178            |current_root, author_chunk| async move {
1179                self.history_sync_author_chunk(
1180                    current_root,
1181                    author_chunk,
1182                    kinds,
1183                    full_author_history,
1184                    max_relay_pages,
1185                )
1186                .await
1187            },
1188            update_profile_and_graph,
1189            Some(chunk_size),
1190        )
1191        .await
1192    }
1193
1194    async fn history_sync_authors_chunked<F, Fut>(
1195        &self,
1196        authors: Vec<String>,
1197        mut run_chunk: F,
1198        update_profile_and_graph: bool,
1199        chunk_size_override: Option<usize>,
1200    ) -> Result<()>
1201    where
1202        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
1203        Fut: std::future::Future<Output = Result<CrawlReport>>,
1204    {
1205        if authors.is_empty() {
1206            return Ok(());
1207        }
1208
1209        info!(
1210            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
1211            authors.len(),
1212            self.config.relays.len(),
1213            self.config.require_negentropy
1214        );
1215
1216        let mut current_root = self.graph_store.public_events_root_for_write()?;
1217        let mut last_error = None;
1218        let mut applied_chunks = 0usize;
1219        let mut failed_chunks = 0usize;
1220        let chunk_size = chunk_size_override
1221            .unwrap_or(self.config.history_sync_author_chunk_size)
1222            .max(1);
1223        let total_chunks = authors.len().div_ceil(chunk_size);
1224
1225        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
1226            let author_chunk = author_chunk.to_vec();
1227            let author_count = author_chunk.len();
1228            info!(
1229                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
1230                chunk_index + 1,
1231                total_chunks,
1232                author_count
1233            );
1234            let mut report = match run_chunk(current_root.clone(), author_chunk.clone()).await {
1235                Ok(report) => report,
1236                Err(err) => {
1237                    failed_chunks = failed_chunks.saturating_add(1);
1238                    warn!(
1239                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
1240                        chunk_index + 1,
1241                        total_chunks,
1242                        author_count,
1243                        err
1244                    );
1245                    last_error = Some(err);
1246                    trim_transient_allocations();
1247                    continue;
1248                }
1249            };
1250
1251            let latest_root = self.graph_store.public_events_root_for_write()?;
1252            if latest_root != current_root {
1253                info!(
1254                    "Nostr mirror history sync root advanced while chunk was fetching; merging chunk into latest root: chunk={}/{} authors={} events_applied={}",
1255                    chunk_index + 1,
1256                    total_chunks,
1257                    author_count,
1258                    report.applied_events.len()
1259                );
1260                if report.applied_events.is_empty() {
1261                    report.root = latest_root.clone();
1262                } else {
1263                    let event_store = NostrEventStore::new(self.store.store_arc());
1264                    report.root = event_store
1265                        .build(latest_root.as_ref(), report.applied_events.clone())
1266                        .await
1267                        .context("merge history chunk into latest mirrored event root")?;
1268                }
1269                current_root = latest_root;
1270            }
1271
1272            if report.root != current_root {
1273                self.apply_history_root_with_options(
1274                    report.root.as_ref(),
1275                    update_profile_and_graph,
1276                    true,
1277                    Some(&report.applied_events),
1278                )
1279                .await?;
1280                current_root = report.root.clone();
1281                info!(
1282                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
1283                    chunk_index + 1,
1284                    total_chunks,
1285                    report.authors_processed,
1286                    report.events_selected,
1287                    report.events_seen
1288                );
1289            }
1290            applied_chunks = applied_chunks.saturating_add(1);
1291            trim_transient_allocations();
1292        }
1293
1294        if applied_chunks == 0 {
1295            return Err(last_error
1296                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
1297                .context("run mirror history sync"));
1298        }
1299        if failed_chunks > 0 {
1300            warn!(
1301                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
1302                applied_chunks, failed_chunks
1303            );
1304        }
1305        Ok(())
1306    }
1307
1308    async fn history_sync_author_chunk(
1309        &self,
1310        current_root: Option<hashtree_core::Cid>,
1311        authors: Vec<String>,
1312        kinds: &[u16],
1313        full_author_history: bool,
1314        max_relay_pages: Option<usize>,
1315    ) -> Result<CrawlReport> {
1316        let mut last_error = None;
1317        let mut report = None;
1318        let mut plan = self.history_sync_plan(authors.len(), kinds);
1319        if full_author_history {
1320            plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
1321            plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
1322        }
1323        for attempt in 0..3 {
1324            let mut last_logged_authors = 0usize;
1325            let bridge = NostrBridge::new(
1326                self.store.store_arc(),
1327                CrawlConfig {
1328                    relays: self.config.relays.clone(),
1329                    author_allowlist: Some(authors.clone()),
1330                    max_live_bytes: None,
1331                    max_events_seen: None,
1332                    max_authors: None,
1333                    max_follow_distance: None,
1334                    author_batch_size: plan.author_batch_size,
1335                    per_author_event_limit: plan.per_author_event_limit,
1336                    per_author_live_bytes: None,
1337                    fetch_timeout: self.config.fetch_timeout,
1338                    kinds: Some(kinds.to_vec()),
1339                    relay_fetch_mode: plan.relay_fetch_mode,
1340                    require_negentropy: self.config.require_negentropy,
1341                    relay_event_max_size: self.config.relay_event_max_size,
1342                    relay_page_size: plan.relay_page_size,
1343                    max_relay_pages: plan.max_relay_pages,
1344                    full_author_history,
1345                },
1346            );
1347
1348            let crawl = bridge.crawl_with_progress(
1349                self.graph_store.as_ref(),
1350                current_root.as_ref(),
1351                |progress| {
1352                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
1353                    let should_log = progress.authors_processed == progress.authors_considered
1354                        || progress.authors_processed == 0
1355                        || progress
1356                            .authors_processed
1357                            .saturating_sub(last_logged_authors)
1358                            >= log_interval;
1359                    if should_log {
1360                        last_logged_authors = progress.authors_processed;
1361                        info!(
1362                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
1363                            progress.authors_processed,
1364                            progress.authors_considered,
1365                            progress.events_selected,
1366                            progress.events_seen
1367                        );
1368                    }
1369                },
1370            );
1371            let crawl_result: Result<CrawlReport> = if full_author_history {
1372                let timeout = self.config.fetch_timeout.saturating_mul(4);
1373                match tokio::time::timeout(timeout, crawl).await {
1374                    Ok(result) => result.map_err(Into::into),
1375                    Err(_) => Err(anyhow::anyhow!(
1376                        "full author history crawl timed out after {:?} for {} author(s)",
1377                        timeout,
1378                        authors.len()
1379                    )),
1380                }
1381            } else {
1382                crawl.await.map_err(Into::into)
1383            };
1384
1385            match crawl_result {
1386                Ok(next_report) => {
1387                    report = Some(next_report);
1388                    break;
1389                }
1390                Err(err) => {
1391                    last_error = Some(err);
1392                    if full_author_history {
1393                        break;
1394                    }
1395                    if attempt < 2 {
1396                        tokio::time::sleep(Duration::from_millis(500)).await;
1397                    }
1398                }
1399            }
1400        }
1401        report
1402            .ok_or_else(|| last_error.expect("history sync retry captured error"))
1403            .context("run mirror history sync")
1404    }
1405
1406    #[cfg(test)]
1407    async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
1408        self.apply_history_root_with_options(root, true, true, None)
1409            .await
1410    }
1411
1412    async fn apply_history_root_with_options(
1413        &self,
1414        root: Option<&hashtree_core::Cid>,
1415        update_profile_and_graph: bool,
1416        publish_roots: bool,
1417        applied_events: Option<&[hashtree_nostr::StoredNostrEvent]>,
1418    ) -> Result<()> {
1419        self.graph_store.write_public_events_root(root)?;
1420        let Some(root) = root else {
1421            return Ok(());
1422        };
1423
1424        self.note_public_events_root_change()?;
1425        if update_profile_and_graph {
1426            let events = match applied_events {
1427                Some(events) => events
1428                    .iter()
1429                    .cloned()
1430                    .map(socialgraph::stored_event_to_nostr_event)
1431                    .collect::<Result<Vec<_>>>()?,
1432                None => {
1433                    let event_store = NostrEventStore::new(self.store.store_arc());
1434                    event_store
1435                        .list_recent_lossy(Some(root), ListEventsOptions::default())
1436                        .await
1437                        .context("list trusted mirrored events")?
1438                        .into_iter()
1439                        .map(socialgraph::stored_event_to_nostr_event)
1440                        .collect::<Result<Vec<_>>>()?
1441                }
1442            };
1443
1444            socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
1445                .context("sync mirrored social graph state")?;
1446            if applied_events.is_some() {
1447                self.graph_store
1448                    .sync_profile_index_for_events(&events)
1449                    .context("update mirrored profile search index")?;
1450            } else {
1451                self.graph_store
1452                    .rebuild_profile_index_for_events(&events)
1453                    .context("rebuild mirrored profile search index")?;
1454            }
1455            self.note_profile_search_root_change()?;
1456            self.note_profiles_by_pubkey_root_change()?;
1457        }
1458        if !publish_roots {
1459            return Ok(());
1460        }
1461        let (event_result, profile_search_result, profiles_by_pubkey_result) = self
1462            .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
1463            .await;
1464        if let Err(err) = event_result {
1465            warn!(
1466                "Nostr mirror event-root publish failed after root update: {:#}",
1467                err
1468            );
1469        }
1470        if let Err(err) = profile_search_result {
1471            warn!(
1472                "Nostr mirror profile-search publish failed after root update: {:#}",
1473                err
1474            );
1475        }
1476        if let Err(err) = profiles_by_pubkey_result {
1477            warn!(
1478                "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
1479                err
1480            );
1481        }
1482        Ok(())
1483    }
1484
1485    async fn subscribe_authors_since(
1486        &self,
1487        authors: &[String],
1488        since: Timestamp,
1489        subscribed_authors: &mut HashSet<String>,
1490    ) -> Result<()> {
1491        let new_authors = authors
1492            .iter()
1493            .filter(|author| !subscribed_authors.contains(*author))
1494            .cloned()
1495            .collect::<Vec<_>>();
1496        if new_authors.is_empty() {
1497            return Ok(());
1498        }
1499
1500        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1501            let pubkeys = chunk
1502                .iter()
1503                .filter_map(|author| PublicKey::from_hex(author).ok())
1504                .collect::<Vec<_>>();
1505            if pubkeys.is_empty() {
1506                continue;
1507            }
1508
1509            let filter = Filter::new()
1510                .authors(pubkeys)
1511                .kinds(self.config.kinds.iter().copied().map(Kind::from))
1512                .since(since);
1513
1514            if let Err(err) = self.client.subscribe(filter, None).await {
1515                warn!(
1516                    "Nostr mirror author subscription failed: authors={} error={:#}",
1517                    chunk.len(),
1518                    err
1519                );
1520                continue;
1521            }
1522            subscribed_authors.extend(chunk.iter().cloned());
1523        }
1524        Ok(())
1525    }
1526
1527    fn ingest_live_event(&self, event: &Event) -> Result<()> {
1528        self.pending_live_events
1529            .lock()
1530            .expect("pending live events")
1531            .insert(event.id.to_hex(), event.clone());
1532        Ok(())
1533    }
1534
1535    async fn flush_live_events(&self) -> Result<()> {
1536        let pending = {
1537            let mut pending = self
1538                .pending_live_events
1539                .lock()
1540                .expect("pending live events");
1541            if pending.is_empty() {
1542                return Ok(());
1543            }
1544            std::mem::take(&mut *pending)
1545        };
1546        let events = pending.into_values().collect::<Vec<_>>();
1547        let event_count = events.len();
1548        let previous_event_root = self.graph_store.public_events_root()?;
1549        let previous_profile_search_root = self.graph_store.profile_search_root()?;
1550        let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1551
1552        socialgraph::ingest_parsed_events_with_storage_class(
1553            self.graph_store.as_ref(),
1554            &events,
1555            socialgraph::EventStorageClass::Public,
1556        )
1557        .context("ingest live mirrored event batch")?;
1558
1559        let next_event_root = self.graph_store.public_events_root()?;
1560        let next_profile_search_root = self.graph_store.profile_search_root()?;
1561        let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1562        let event_root_changed = next_event_root != previous_event_root;
1563        let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1564        let profiles_by_pubkey_root_changed =
1565            next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1566
1567        if event_root_changed {
1568            self.note_public_events_root_change()?;
1569        }
1570        if profile_search_root_changed {
1571            self.note_profile_search_root_change()?;
1572        }
1573        if profiles_by_pubkey_root_changed {
1574            self.note_profiles_by_pubkey_root_change()?;
1575        }
1576        if profile_search_root_changed {
1577            self.maybe_publish_profile_search_root(false).await?;
1578        }
1579        if profiles_by_pubkey_root_changed {
1580            self.maybe_publish_profiles_by_pubkey_root(false).await?;
1581        }
1582        if event_root_changed {
1583            self.maybe_publish_event_root(false).await?;
1584        }
1585        info!(
1586            "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1587            event_count,
1588            event_root_changed,
1589            profile_search_root_changed,
1590            profiles_by_pubkey_root_changed
1591        );
1592        Ok(())
1593    }
1594
1595    fn note_public_events_root_change(&self) -> Result<()> {
1596        let root = self.graph_store.public_events_root()?;
1597        Self::note_root_change(
1598            self.config.published_event_tree_name.as_deref(),
1599            &self.event_publish_state,
1600            root,
1601        )
1602    }
1603
1604    fn note_profile_search_root_change(&self) -> Result<()> {
1605        let root = self.graph_store.profile_search_root()?;
1606        Self::note_root_change(
1607            self.config.published_profile_search_tree_name.as_deref(),
1608            &self.profile_search_publish_state,
1609            root,
1610        )
1611    }
1612
1613    fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1614        let root = self.graph_store.profiles_by_pubkey_root()?;
1615        Self::note_root_change(
1616            self.config
1617                .published_profiles_by_pubkey_tree_name
1618                .as_deref(),
1619            &self.profiles_by_pubkey_publish_state,
1620            root,
1621        )
1622    }
1623
1624    fn note_root_change(
1625        tree_name: Option<&str>,
1626        publish_state: &Arc<Mutex<RootPublishState>>,
1627        root: Option<hashtree_core::Cid>,
1628    ) -> Result<()> {
1629        let Some(_tree_name) = tree_name else {
1630            return Ok(());
1631        };
1632
1633        let mut state = publish_state.lock().expect("root publish state");
1634        let now = Instant::now();
1635
1636        if state.pending_root == root {
1637            return Ok(());
1638        }
1639
1640        state.pending_root = root;
1641        state.last_upload_failed_at = None;
1642        state.last_upload_error = None;
1643        state.last_changed_at = Some(now);
1644        if state.dirty_since.is_none() {
1645            state.dirty_since = Some(now);
1646        }
1647        Ok(())
1648    }
1649
1650    async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1651        if self.take_missing_blob_event_upload_error() {
1652            return self.rebuild_event_indexes_after_missing_blobs(force).await;
1653        }
1654        let result = self
1655            .maybe_publish_root(
1656                self.config.published_event_tree_name.as_deref(),
1657                &self.event_publish_state,
1658                "event root",
1659                force,
1660                false,
1661            )
1662            .await;
1663        let Err(error) = result else {
1664            if self.take_missing_blob_event_upload_error() {
1665                return self.rebuild_event_indexes_after_missing_blobs(force).await;
1666            }
1667            return Ok(());
1668        };
1669        if !is_missing_local_blob_push_error(&error) {
1670            return Err(error);
1671        }
1672
1673        self.rebuild_event_indexes_after_missing_blobs(force).await
1674    }
1675
1676    fn take_missing_blob_event_upload_error(&self) -> bool {
1677        let mut state = self
1678            .event_publish_state
1679            .lock()
1680            .expect("event root publish state");
1681        if state.upload_in_progress_root.is_some() {
1682            return false;
1683        }
1684        if !state.missing_blob_rebuild_required {
1685            return false;
1686        }
1687        state.missing_blob_rebuild_required = false;
1688        state.last_upload_error = None;
1689        state.last_upload_failed_at = None;
1690        true
1691    }
1692
1693    async fn rebuild_event_indexes_after_missing_blobs(&self, force: bool) -> Result<()> {
1694        warn!(
1695            "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1696        );
1697        let (public_count, ambient_count) = self
1698            .graph_store
1699            .rebuild_event_indexes_from_stored_events_async()
1700            .await
1701            .context("rebuild event indexes after missing event blobs")?;
1702        info!(
1703            "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1704            public_count, ambient_count
1705        );
1706        self.sync_publish_roots_from_store()?;
1707
1708        self.maybe_publish_root(
1709            self.config.published_event_tree_name.as_deref(),
1710            &self.event_publish_state,
1711            "event root",
1712            force,
1713            false,
1714        )
1715        .await
1716    }
1717
1718    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1719        self.maybe_publish_root(
1720            self.config.published_profile_search_tree_name.as_deref(),
1721            &self.profile_search_publish_state,
1722            "profile search root",
1723            force,
1724            true,
1725        )
1726        .await
1727    }
1728
1729    async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1730        self.maybe_publish_root(
1731            self.config
1732                .published_profiles_by_pubkey_tree_name
1733                .as_deref(),
1734            &self.profiles_by_pubkey_publish_state,
1735            "profiles-by-pubkey root",
1736            force,
1737            true,
1738        )
1739        .await
1740    }
1741
1742    async fn maybe_publish_root(
1743        &self,
1744        tree_name: Option<&str>,
1745        publish_state: &Arc<Mutex<RootPublishState>>,
1746        log_label: &str,
1747        force: bool,
1748        publish_before_upload_ready_on_force: bool,
1749    ) -> Result<()> {
1750        let Some(tree_name) = tree_name else {
1751            return Ok(());
1752        };
1753
1754        let pending_root = {
1755            let state = publish_state.lock().expect("root publish state");
1756            let Some(pending_root) = state.pending_root.clone() else {
1757                return Ok(());
1758            };
1759
1760            let now = Instant::now();
1761            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1762                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1763            });
1764            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1765                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1766            });
1767            if !force && !debounce_ready && !stale_ready {
1768                return Ok(());
1769            }
1770
1771            pending_root
1772        };
1773
1774        let upload_started = self.maybe_start_background_root_upload(
1775            tree_name,
1776            &pending_root,
1777            publish_state,
1778            log_label,
1779        );
1780        let upload_required = !self.config.blossom_write_servers.is_empty();
1781        let (upload_ready, publish_root) = {
1782            let state = publish_state.lock().expect("root publish state");
1783            let upload_ready =
1784                !upload_required || state.last_uploaded_root.as_ref() == Some(&pending_root);
1785            let publish_root = if upload_ready {
1786                Some(pending_root.clone())
1787            } else {
1788                state.last_uploaded_root.clone().filter(|uploaded_root| {
1789                    state.last_published_root.as_ref() != Some(uploaded_root)
1790                })
1791            };
1792            (upload_ready, publish_root)
1793        };
1794        let publish_before_upload_ready =
1795            force && upload_required && !upload_ready && publish_before_upload_ready_on_force;
1796        let publish_root = if let Some(publish_root) = publish_root {
1797            publish_root
1798        } else if publish_before_upload_ready {
1799            pending_root.clone()
1800        } else {
1801            if upload_started {
1802                info!(
1803                    "Nostr mirror uploading {} DAG before publish: tree={} hash={}",
1804                    log_label,
1805                    tree_name,
1806                    hex::encode(pending_root.hash),
1807                );
1808            }
1809            return Ok(());
1810        };
1811
1812        let mut successful_relays = Vec::new();
1813        let mut failed_relays = Vec::new();
1814        let mut published_now = false;
1815        let publish_required =
1816            self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1817        if publish_required {
1818            let Some(publish_client) = self.publish_client.as_ref() else {
1819                unreachable!("publish_required implies publish_client");
1820            };
1821            if !self.has_connected_publish_relay().await {
1822                return Ok(());
1823            }
1824            if publish_before_upload_ready {
1825                info!(
1826                    "Nostr mirror publishing {} before Blossom upload completes: tree={} hash={}",
1827                    log_label,
1828                    tree_name,
1829                    hex::encode(pending_root.hash),
1830                );
1831            } else if !upload_ready {
1832                info!(
1833                    "Nostr mirror publishing uploaded {} while newer root is still uploading: tree={} published_hash={} pending_hash={}",
1834                    log_label,
1835                    tree_name,
1836                    hex::encode(publish_root.hash),
1837                    hex::encode(pending_root.hash),
1838                );
1839            }
1840
1841            let already_published = {
1842                let state = publish_state.lock().expect("root publish state");
1843                state.last_published_root.as_ref() == Some(&publish_root)
1844            };
1845            if !already_published {
1846                let publish_relays = self.config.publish_relays.clone();
1847                let latest_known_created_at = {
1848                    let state = publish_state.lock().expect("root publish state");
1849                    state.last_published_created_at
1850                };
1851                let publish_created_at =
1852                    next_replaceable_created_at(Timestamp::now(), latest_known_created_at);
1853                let event = publish_client
1854                    .sign_event_builder(Self::build_public_root_event(
1855                        tree_name,
1856                        &publish_root,
1857                        publish_created_at,
1858                    ))
1859                    .await
1860                    .with_context(|| format!("sign {log_label} event"))?;
1861                let publish_result = self
1862                    .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1863                    .await
1864                    .with_context(|| format!("publish {log_label} event"))?;
1865                successful_relays = publish_result.0;
1866                failed_relays = publish_result.1;
1867                if successful_relays.is_empty() {
1868                    let failure_summary = if failed_relays.is_empty() {
1869                        "no publish relays accepted the event".to_string()
1870                    } else {
1871                        failed_relays.join("; ")
1872                    };
1873                    anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1874                }
1875
1876                let mut state = publish_state.lock().expect("root publish state");
1877                if state.pending_root.as_ref() == Some(&pending_root)
1878                    || state.last_uploaded_root.as_ref() == Some(&publish_root)
1879                    || publish_before_upload_ready
1880                {
1881                    state.last_published_root = Some(publish_root.clone());
1882                    state.last_published_at = Some(Instant::now());
1883                    state.last_published_created_at = Some(event.created_at);
1884                }
1885                published_now = true;
1886            }
1887        }
1888
1889        {
1890            let mut state = publish_state.lock().expect("root publish state");
1891            if state.pending_root.as_ref() == Some(&pending_root) {
1892                let upload_satisfied = self.config.blossom_write_servers.is_empty()
1893                    || state.last_uploaded_root.as_ref() == Some(&pending_root);
1894                let publish_satisfied =
1895                    !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1896                if upload_satisfied && publish_satisfied {
1897                    state.dirty_since = None;
1898                }
1899            }
1900        }
1901
1902        if published_now {
1903            info!(
1904                "Nostr mirror published {}: tree={} hash={} relays={:?}",
1905                log_label,
1906                tree_name,
1907                hex::encode(publish_root.hash),
1908                successful_relays,
1909            );
1910        }
1911        if !failed_relays.is_empty() {
1912            warn!(
1913                "Nostr mirror publish had relay failures: tree={} failures={:?}",
1914                tree_name, failed_relays
1915            );
1916        }
1917        Ok(())
1918    }
1919
1920    fn maybe_start_background_root_upload(
1921        &self,
1922        tree_name: &str,
1923        pending_root: &hashtree_core::Cid,
1924        publish_state: &Arc<Mutex<RootPublishState>>,
1925        log_label: &str,
1926    ) -> bool {
1927        if self.config.blossom_write_servers.is_empty() {
1928            return false;
1929        }
1930
1931        let previous_uploaded_root = {
1932            let mut state = publish_state.lock().expect("root publish state");
1933            if state.last_uploaded_root.as_ref() == Some(pending_root)
1934                || state.upload_in_progress_root.is_some()
1935            {
1936                return false;
1937            }
1938            if state
1939                .last_upload_failed_at
1940                .is_some_and(|failed_at| failed_at.elapsed() < MIRROR_ROOT_UPLOAD_RETRY_INTERVAL)
1941            {
1942                return false;
1943            }
1944            state.upload_in_progress_root = Some(pending_root.clone());
1945            state.last_uploaded_root.clone()
1946        };
1947
1948        let store = Arc::clone(&self.store);
1949        let upload_state_path = Self::uploaded_root_state_path(store.base_path(), tree_name);
1950        let servers = self.config.blossom_write_servers.clone();
1951        let root = pending_root.clone();
1952        let publish_state = Arc::clone(publish_state);
1953        let log_label = log_label.to_string();
1954        tokio::task::spawn_blocking(move || {
1955            let runtime = tokio::runtime::Builder::new_current_thread()
1956                .enable_all()
1957                .build()
1958                .expect("build nostr mirror root upload runtime");
1959            runtime.block_on(async move {
1960                let result = background_blossom_push_incremental_with_store(
1961                    store,
1962                    root.clone(),
1963                    previous_uploaded_root,
1964                    &servers,
1965                )
1966                .await;
1967                let mut state = publish_state.lock().expect("root publish state");
1968                if state.upload_in_progress_root.as_ref() == Some(&root) {
1969                    state.upload_in_progress_root = None;
1970                }
1971                match result {
1972                    Ok(()) => {
1973                        if let Err(err) =
1974                            Self::write_uploaded_root_state(&upload_state_path, &root, &log_label)
1975                        {
1976                            warn!("Nostr mirror failed to persist uploaded root state: {err:#}");
1977                        }
1978                        state.last_uploaded_root = Some(root.clone());
1979                        state.last_uploaded_at = Some(Instant::now());
1980                        if state.pending_root.as_ref() == Some(&root) {
1981                            state.last_upload_failed_at = None;
1982                            state.last_upload_error = None;
1983                            state.missing_blob_rebuild_required = false;
1984                        }
1985                        info!(
1986                            "Nostr mirror uploaded {} DAG to Blossom: hash={}",
1987                            log_label,
1988                            hex::encode(root.hash)
1989                        );
1990                    }
1991                    Err(err) => {
1992                        if state.pending_root.as_ref() == Some(&root) {
1993                            state.last_upload_failed_at = Some(Instant::now());
1994                            state.last_upload_error = Some(format!("{err:#}"));
1995                        }
1996                        if is_missing_local_blob_message(&format!("{err:#}")) {
1997                            state.missing_blob_rebuild_required = true;
1998                        }
1999                        warn!(
2000                            "Nostr mirror {} DAG upload failed: hash={} error={:#}",
2001                            log_label,
2002                            hex::encode(root.hash),
2003                            err
2004                        );
2005                    }
2006                }
2007            });
2008            trim_transient_allocations();
2009        });
2010
2011        true
2012    }
2013
2014    async fn publish_root_event_to_relays(
2015        &self,
2016        publish_client: &Client,
2017        relays: &[String],
2018        event: &Event,
2019    ) -> Result<(Vec<String>, Vec<String>)> {
2020        let primary_send = Self::send_root_event_to_relays(publish_client, relays, event);
2021        let (mut successful_relays, mut failed_relays) =
2022            match tokio::time::timeout(MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT, primary_send).await {
2023                Ok(result) => result,
2024                Err(_) => (
2025                    Vec::new(),
2026                    vec![format!(
2027                        "publish relays: primary publish timed out after {:?}",
2028                        MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT
2029                    )],
2030                ),
2031            };
2032
2033        if successful_relays.is_empty() {
2034            let (retry_successes, retry_failures) = self
2035                .publish_root_event_with_fresh_client(relays, event)
2036                .await;
2037            successful_relays.extend(retry_successes);
2038            failed_relays.extend(retry_failures);
2039        }
2040
2041        Ok((successful_relays, failed_relays))
2042    }
2043
2044    async fn send_root_event_to_relays(
2045        publish_client: &Client,
2046        relays: &[String],
2047        event: &Event,
2048    ) -> (Vec<String>, Vec<String>) {
2049        let mut successful_relays = Vec::new();
2050        let mut failed_relays = Vec::new();
2051
2052        match publish_client
2053            .send_event_to(relays.iter().map(|relay| relay.as_str()), event)
2054            .await
2055        {
2056            Ok(output) => {
2057                for relay in relays {
2058                    let relay_url = relay.trim_end_matches('/');
2059                    if output
2060                        .success
2061                        .iter()
2062                        .any(|url| url.as_str().trim_end_matches('/') == relay_url)
2063                    {
2064                        successful_relays.push(relay.clone());
2065                    }
2066                }
2067                failed_relays.extend(
2068                    output
2069                        .failed
2070                        .into_iter()
2071                        .map(|(url, reason)| format!("{url}: {reason}")),
2072                );
2073            }
2074            Err(err) => {
2075                failed_relays.push(format!("publish relays: {err}"));
2076            }
2077        }
2078
2079        (successful_relays, failed_relays)
2080    }
2081
2082    async fn publish_root_event_with_fresh_client(
2083        &self,
2084        relays: &[String],
2085        event: &Event,
2086    ) -> (Vec<String>, Vec<String>) {
2087        let client = Client::new(Keys::generate());
2088        let mut setup_failures = Vec::new();
2089        for relay in relays {
2090            if let Err(err) = client.add_relay(relay).await {
2091                setup_failures.push(format!("{relay}: add relay failed: {err}"));
2092            }
2093        }
2094
2095        client.connect().await;
2096        let publish = Self::send_root_event_to_relays(&client, relays, event);
2097        let retry_timeout = self
2098            .config
2099            .fetch_timeout
2100            .min(MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT);
2101        let result = tokio::time::timeout(retry_timeout, publish).await;
2102        let _ = client.disconnect().await;
2103
2104        match result {
2105            Ok((successful_relays, mut failed_relays)) => {
2106                failed_relays.extend(setup_failures);
2107                (successful_relays, failed_relays)
2108            }
2109            Err(_) => {
2110                setup_failures.push(format!(
2111                    "fresh publish client timed out after {:?}",
2112                    retry_timeout
2113                ));
2114                (Vec::new(), setup_failures)
2115            }
2116        }
2117    }
2118
2119    fn build_public_root_event(
2120        tree_name: &str,
2121        cid: &hashtree_core::Cid,
2122        created_at: Timestamp,
2123    ) -> EventBuilder {
2124        let mut tags = vec![
2125            Tag::identifier(tree_name.to_string()),
2126            Tag::custom(
2127                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
2128                vec!["hashtree"],
2129            ),
2130            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
2131        ];
2132        if let Some(key) = cid.key {
2133            tags.push(Tag::custom(
2134                TagKind::Custom("key".into()),
2135                vec![hex::encode(key)],
2136            ));
2137        }
2138
2139        EventBuilder::new(Kind::Custom(30078), "")
2140            .tags(tags)
2141            .custom_created_at(created_at)
2142    }
2143}
2144
2145fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
2146    error
2147        .chain()
2148        .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
2149}
2150
2151fn is_missing_local_blob_message(message: &str) -> bool {
2152    message.contains(MISSING_LOCAL_BLOB_PUSH_ERROR)
2153}
2154
2155fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
2156    match latest_existing {
2157        Some(latest) if latest >= now => Timestamp::from_secs(latest.as_secs().saturating_add(1)),
2158        _ => now,
2159    }
2160}
2161
2162#[cfg(test)]
2163mod tests;