Skip to main content

hashtree_cli/
nostr_mirror.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::time::Duration;
6use std::time::Instant;
7
8use anyhow::{Context, Result};
9use hashtree_nostr::{
10    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
11};
12use nostr::{
13    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
14    Timestamp,
15};
16use nostr_sdk::{
17    pool::RelayLimits, prelude::RelayPoolNotification, Client, Keys, Options, RelayStatus,
18};
19use tokio::sync::{watch, Mutex as AsyncMutex};
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_incremental_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
27#[cfg(not(test))]
28const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
29#[cfg(test)]
30const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);
31
32#[cfg(not(test))]
33const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
34#[cfg(test)]
35const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);
36
37#[cfg(not(test))]
38const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
39#[cfg(test)]
40const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);
41
42#[cfg(not(test))]
43const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
44#[cfg(test)]
45const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);
46
47const KIND_LONG_FORM_CONTENT: u16 = 30_023;
48const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
49const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
50const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
51const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
52const MIRROR_UPLOAD_STATE_DIR: &str = "nostr-mirror";
53const MIRROR_UPLOADED_ROOT_SUFFIX: &str = ".uploaded-root";
54const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
55const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
56const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
57const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
58const FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE: u32 = 1;
59const FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT: usize = 32;
60const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
61const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
62const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;
63
64#[cfg(not(test))]
65const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
66#[cfg(test)]
67const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);
68
69#[cfg(not(test))]
70const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
71#[cfg(test)]
72const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);
73
74#[cfg(not(test))]
75const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
76#[cfg(test)]
77const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);
78
79#[cfg(not(test))]
80const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
81#[cfg(test)]
82const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);
83
84#[cfg(not(test))]
85const MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT: Duration = Duration::from_secs(2);
86#[cfg(test)]
87const MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT: Duration = Duration::from_millis(250);
88
89#[cfg(not(test))]
90const MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT: Duration = Duration::from_secs(5);
91#[cfg(test)]
92const MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT: Duration = Duration::from_secs(2);
93
94const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
95
96fn trim_transient_allocations() {
97    #[cfg(all(target_os = "linux", target_env = "gnu"))]
98    unsafe {
99        // SAFETY: malloc_trim asks glibc to return free heap pages to the OS.
100        // It does not touch live allocations and is safe to call after the
101        // large, synchronous mirror upload job has dropped its temporary data.
102        libc::malloc_trim(0);
103    }
104}
105
106fn decode_hex_pubkey(value: &str) -> Option<[u8; 32]> {
107    let bytes = hex::decode(value).ok()?;
108    <[u8; 32]>::try_from(bytes.as_slice()).ok()
109}
110
111#[derive(Debug, Clone)]
112pub struct NostrMirrorConfig {
113    pub relays: Vec<String>,
114    pub publish_relays: Vec<String>,
115    pub blossom_write_servers: Vec<String>,
116    pub max_follow_distance: u32,
117    pub overmute_threshold: f64,
118    pub author_batch_size: usize,
119    pub history_sync_author_chunk_size: usize,
120    pub history_sync_per_author_event_limit: usize,
121    pub missing_profile_backfill_batch_size: usize,
122    pub fetch_timeout: Duration,
123    pub relay_event_max_size: Option<u32>,
124    pub require_negentropy: bool,
125    pub kinds: Vec<u16>,
126    pub history_sync_on_start: bool,
127    pub history_sync_on_reconnect: bool,
128    pub full_text_note_history_follow_distance: Option<u32>,
129    pub full_text_note_history_max_relay_pages: usize,
130    pub published_event_tree_name: Option<String>,
131    pub published_profile_search_tree_name: Option<String>,
132    pub published_profiles_by_pubkey_tree_name: Option<String>,
133}
134
135impl Default for NostrMirrorConfig {
136    fn default() -> Self {
137        Self {
138            relays: Vec::new(),
139            publish_relays: Vec::new(),
140            blossom_write_servers: Vec::new(),
141            max_follow_distance: 2,
142            overmute_threshold: 1.0,
143            author_batch_size: 256,
144            history_sync_author_chunk_size: 5_000,
145            history_sync_per_author_event_limit: 256,
146            missing_profile_backfill_batch_size: 5_000,
147            fetch_timeout: Duration::from_secs(15),
148            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
149            require_negentropy: false,
150            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
151            history_sync_on_start: true,
152            history_sync_on_reconnect: true,
153            full_text_note_history_follow_distance: Some(
154                DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
155            ),
156            full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
157            published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
158            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
159            published_profiles_by_pubkey_tree_name: Some(
160                DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
161            ),
162        }
163    }
164}
165
166#[derive(Debug, Default)]
167struct RootPublishState {
168    pending_root: Option<hashtree_core::Cid>,
169    last_changed_at: Option<Instant>,
170    dirty_since: Option<Instant>,
171    last_published_root: Option<hashtree_core::Cid>,
172    last_published_at: Option<Instant>,
173    last_published_created_at: Option<Timestamp>,
174    last_uploaded_root: Option<hashtree_core::Cid>,
175    last_uploaded_at: Option<Instant>,
176    upload_in_progress_root: Option<hashtree_core::Cid>,
177    last_upload_failed_at: Option<Instant>,
178    last_upload_error: Option<String>,
179    missing_blob_rebuild_required: bool,
180}
181
182#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183struct HistorySyncPlan {
184    relay_fetch_mode: RelayFetchMode,
185    author_batch_size: usize,
186    per_author_event_limit: usize,
187    relay_page_size: usize,
188    max_relay_pages: usize,
189}
190
191pub struct BackgroundNostrMirror {
192    config: NostrMirrorConfig,
193    store: Arc<HashtreeStore>,
194    graph_store: Arc<SocialGraphStore>,
195    client: Client,
196    publish_client: Option<Client>,
197    event_publish_state: Arc<Mutex<RootPublishState>>,
198    profile_search_publish_state: Arc<Mutex<RootPublishState>>,
199    profiles_by_pubkey_publish_state: Arc<Mutex<RootPublishState>>,
200    pending_live_events: Mutex<BTreeMap<String, Event>>,
201    missing_profile_cursor: Mutex<usize>,
202    history_sync_lock: AsyncMutex<()>,
203    shutdown_tx: watch::Sender<bool>,
204    shutdown_rx: watch::Receiver<bool>,
205}
206
207impl BackgroundNostrMirror {
208    pub async fn new(
209        config: NostrMirrorConfig,
210        store: Arc<HashtreeStore>,
211        graph_store: Arc<SocialGraphStore>,
212        publish_keys: Option<Keys>,
213    ) -> Result<Self> {
214        let client = if let Some(max_size) = config.relay_event_max_size {
215            let mut limits = RelayLimits::default();
216            limits.events.max_size = Some(max_size);
217            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
218        } else {
219            Client::new(Keys::generate())
220        };
221        for relay in &config.relays {
222            client
223                .add_relay(relay)
224                .await
225                .with_context(|| format!("add mirror relay {relay}"))?;
226        }
227        client.connect().await;
228
229        let publish_client = if let Some(keys) = publish_keys {
230            if config.publish_relays.is_empty() {
231                None
232            } else {
233                let client = Client::with_opts(keys, Options::new().wait_for_send(false));
234                for relay in &config.publish_relays {
235                    client
236                        .add_relay(relay)
237                        .await
238                        .with_context(|| format!("add mirror publish relay {relay}"))?;
239                }
240                client.connect().await;
241                Some(client)
242            }
243        } else {
244            None
245        };
246
247        let (shutdown_tx, shutdown_rx) = watch::channel(false);
248        let event_publish_state = Self::root_publish_state_from_disk(
249            store.base_path(),
250            config.published_event_tree_name.as_deref(),
251            "event root",
252        );
253        let profile_search_publish_state = Self::root_publish_state_from_disk(
254            store.base_path(),
255            config.published_profile_search_tree_name.as_deref(),
256            "profile-search root",
257        );
258        let profiles_by_pubkey_publish_state = Self::root_publish_state_from_disk(
259            store.base_path(),
260            config.published_profiles_by_pubkey_tree_name.as_deref(),
261            "profiles-by-pubkey root",
262        );
263        Ok(Self {
264            config,
265            store,
266            graph_store,
267            client,
268            publish_client,
269            event_publish_state: Arc::new(Mutex::new(event_publish_state)),
270            profile_search_publish_state: Arc::new(Mutex::new(profile_search_publish_state)),
271            profiles_by_pubkey_publish_state: Arc::new(Mutex::new(
272                profiles_by_pubkey_publish_state,
273            )),
274            pending_live_events: Mutex::new(BTreeMap::new()),
275            missing_profile_cursor: Mutex::new(0),
276            history_sync_lock: AsyncMutex::new(()),
277            shutdown_tx,
278            shutdown_rx,
279        })
280    }
281
282    fn root_publish_state_from_disk(
283        base_path: &std::path::Path,
284        tree_name: Option<&str>,
285        log_label: &str,
286    ) -> RootPublishState {
287        let Some(tree_name) = tree_name else {
288            return RootPublishState::default();
289        };
290        let path = Self::uploaded_root_state_path(base_path, tree_name);
291        let Some(root) = Self::read_uploaded_root_state(&path, log_label) else {
292            return RootPublishState::default();
293        };
294
295        RootPublishState {
296            last_uploaded_root: Some(root),
297            ..RootPublishState::default()
298        }
299    }
300
301    fn uploaded_root_state_path(base_path: &std::path::Path, tree_name: &str) -> PathBuf {
302        let safe_tree_name = tree_name
303            .chars()
304            .map(|ch| {
305                if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
306                    ch
307                } else {
308                    '_'
309                }
310            })
311            .collect::<String>();
312        base_path
313            .join(MIRROR_UPLOAD_STATE_DIR)
314            .join(format!("{safe_tree_name}{MIRROR_UPLOADED_ROOT_SUFFIX}"))
315    }
316
317    fn read_uploaded_root_state(
318        path: &std::path::Path,
319        log_label: &str,
320    ) -> Option<hashtree_core::Cid> {
321        let raw = match std::fs::read_to_string(path) {
322            Ok(raw) => raw,
323            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return None,
324            Err(err) => {
325                warn!(
326                    "Nostr mirror failed to read uploaded {} state {}: {}",
327                    log_label,
328                    path.display(),
329                    err
330                );
331                return None;
332            }
333        };
334        let trimmed = raw.trim();
335        if trimmed.is_empty() {
336            return None;
337        }
338        match hashtree_core::Cid::parse(trimmed) {
339            Ok(root) => Some(root),
340            Err(err) => {
341                warn!(
342                    "Nostr mirror ignored invalid uploaded {} state {}: {}",
343                    log_label,
344                    path.display(),
345                    err
346                );
347                None
348            }
349        }
350    }
351
352    fn write_uploaded_root_state(
353        path: &std::path::Path,
354        root: &hashtree_core::Cid,
355        log_label: &str,
356    ) -> Result<()> {
357        if let Some(parent) = path.parent() {
358            std::fs::create_dir_all(parent)
359                .with_context(|| format!("create {}", parent.display()))?;
360        }
361        std::fs::write(path, format!("{root}\n"))
362            .with_context(|| format!("write uploaded {log_label} state {}", path.display()))
363    }
364
365    pub fn shutdown(&self) {
366        let _ = self.shutdown_tx.send(true);
367    }
368
369    fn sync_publish_roots_from_store(&self) -> Result<()> {
370        self.note_public_events_root_change()?;
371        self.note_profile_search_root_change()?;
372        self.note_profiles_by_pubkey_root_change()?;
373        Ok(())
374    }
375
376    async fn publish_pending_roots(
377        &self,
378        force_event: bool,
379        force_profile_search: bool,
380        force_profiles_by_pubkey: bool,
381    ) -> (Result<()>, Result<()>, Result<()>) {
382        tokio::join!(
383            self.maybe_publish_event_root(force_event),
384            self.maybe_publish_profile_search_root(force_profile_search),
385            self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
386        )
387    }
388
389    async fn publish_priority_roots(
390        &self,
391        force_event: bool,
392        force_profile_search: bool,
393        force_profiles_by_pubkey: bool,
394    ) -> (Result<()>, Result<()>, Result<()>) {
395        let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
396            async {
397                if force_profile_search {
398                    self.maybe_publish_profile_search_root(true).await
399                } else {
400                    Ok(())
401                }
402            },
403            async {
404                if force_profiles_by_pubkey {
405                    self.maybe_publish_profiles_by_pubkey_root(true).await
406                } else {
407                    Ok(())
408                }
409            },
410        );
411        let event_result = if force_event {
412            self.maybe_publish_event_root(true).await
413        } else {
414            Ok(())
415        };
416        (
417            event_result,
418            profile_search_result,
419            profiles_by_pubkey_result,
420        )
421    }
422
423    pub async fn run(self: Arc<Self>) -> Result<()> {
424        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
425            return Ok(());
426        }
427
428        info!(
429            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
430            self.config.relays.len(),
431            self.config.max_follow_distance,
432            self.config.require_negentropy,
433            self.config.kinds,
434            self.config.history_sync_author_chunk_size.max(1),
435            self.config.history_sync_on_start,
436            self.config.history_sync_on_reconnect
437        );
438
439        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
440        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
441        let live_since = Timestamp::now();
442        self.sync_publish_roots_from_store()?;
443        let (event_result, profile_search_result, profiles_by_pubkey_result) =
444            self.publish_priority_roots(true, true, true).await;
445        if let Err(err) = event_result {
446            warn!(
447                "Nostr mirror event-root publish failed on startup: {:#}",
448                err
449            );
450        }
451        if let Err(err) = profile_search_result {
452            warn!(
453                "Nostr mirror profile-search publish failed on startup: {:#}",
454                err
455            );
456        }
457        if let Err(err) = profiles_by_pubkey_result {
458            warn!(
459                "Nostr mirror profiles-by-pubkey publish failed on startup: {:#}",
460                err
461            );
462        }
463
464        let initial_authors = self.collect_authors()?;
465        if initial_authors.is_empty() {
466            info!("Nostr mirror: no social-graph authors to mirror yet");
467        }
468
469        let mut subscribed_authors = HashSet::new();
470        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
471            .await?;
472
473        if !initial_authors.is_empty() && self.config.history_sync_on_start {
474            self.spawn_startup_history_sync(initial_authors.clone());
475        }
476
477        let mut relay_statuses = self.capture_relay_statuses().await;
478        let mut last_reconnect_history_sync_at: Option<Instant> = None;
479        let mut last_missing_profile_backfill_at: Option<Instant> = None;
480        let mut notifications = self.client.notifications();
481        let mut shutdown_rx = self.shutdown_rx.clone();
482        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
483        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
484
485        loop {
486            tokio::select! {
487                _ = shutdown_rx.changed() => {
488                    if *shutdown_rx.borrow() {
489                        break;
490                    }
491                }
492                _ = refresh_interval.tick() => {
493                    let authors = self.collect_authors()?;
494                    let new_authors = authors
495                        .into_iter()
496                        .filter(|author| !subscribed_authors.contains(author))
497                        .collect::<Vec<_>>();
498                    if !new_authors.is_empty() {
499                        debug!(
500                            "Nostr mirror discovered {} newly reachable author(s)",
501                            new_authors.len()
502                        );
503                        self.subscribe_authors_since(
504                            &new_authors,
505                            Timestamp::now(),
506                            &mut subscribed_authors,
507                        )
508                        .await?;
509                        self.spawn_author_history_sync(
510                            "new-author catch-up",
511                            new_authors.clone(),
512                            true,
513                            true,
514                        );
515                    }
516                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
517                        let missing_profile_authors = self.collect_missing_profile_authors(
518                            self.config.missing_profile_backfill_batch_size,
519                        )?;
520                        if !missing_profile_authors.is_empty() {
521                            info!(
522                                "Nostr mirror missing-profile backfill starting: authors={}",
523                                missing_profile_authors.len()
524                            );
525                            self.spawn_missing_profile_backfill(missing_profile_authors);
526                            last_missing_profile_backfill_at = Some(Instant::now());
527                        }
528                    }
529                }
530                _ = publish_interval.tick() => {
531                    self.sync_publish_roots_from_store()?;
532                    if let Err(err) = self.flush_live_events().await {
533                        warn!("Nostr mirror live event flush failed: {:#}", err);
534                    }
535                    let (event_result, profile_search_result, profiles_by_pubkey_result) = self
536                        .publish_pending_roots(false, false, false)
537                        .await;
538                    if let Err(err) = event_result {
539                        warn!("Nostr mirror event-root publish failed: {:#}", err);
540                    }
541                    if let Err(err) = profile_search_result {
542                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
543                    }
544                    if let Err(err) = profiles_by_pubkey_result {
545                        warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
546                    }
547                }
548                notification = notifications.recv() => {
549                    match notification {
550                        Ok(RelayPoolNotification::Event { event, .. }) => {
551                            self.ingest_live_event(&event)?;
552                        }
553                        Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
554                            let relay_url = relay_url.to_string();
555                            let previous = relay_statuses.insert(relay_url.clone(), status);
556                            if Self::should_history_sync_on_reconnect(
557                                self.config.history_sync_on_reconnect,
558                                previous,
559                                status,
560                            ) && Self::should_run_reconnect_history_sync(
561                                    last_reconnect_history_sync_at.as_ref(),
562                                )
563                            {
564                                let authors = self.collect_authors()?;
565                                if !authors.is_empty() {
566                                    info!(
567                                        "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
568                                        relay_url,
569                                        authors.len(),
570                                        self.config.require_negentropy
571                                    );
572                                    self.spawn_author_history_sync(
573                                        "relay reconnect catch-up",
574                                        authors,
575                                        false,
576                                        false,
577                                    );
578                                    last_reconnect_history_sync_at = Some(Instant::now());
579                                }
580                            }
581                        }
582                        Ok(RelayPoolNotification::Shutdown) => break,
583                        Ok(_) => {}
584                        Err(err) => {
585                            warn!("Nostr mirror notification error: {}", err);
586                            break;
587                        }
588                    }
589                }
590            }
591        }
592
593        if let Err(err) = self.flush_live_events().await {
594            warn!(
595                "Nostr mirror live event flush failed during shutdown: {:#}",
596                err
597            );
598        }
599        if let Err(err) = self.sync_publish_roots_from_store() {
600            warn!(
601                "Nostr mirror root-state refresh failed during shutdown: {:#}",
602                err
603            );
604        }
605        let (event_result, profile_search_result, profiles_by_pubkey_result) =
606            self.publish_pending_roots(true, true, true).await;
607        if let Err(err) = event_result {
608            warn!(
609                "Nostr mirror event-root publish failed during shutdown: {:#}",
610                err
611            );
612        }
613        if let Err(err) = profile_search_result {
614            warn!(
615                "Nostr mirror profile-search publish failed during shutdown: {:#}",
616                err
617            );
618        }
619        if let Err(err) = profiles_by_pubkey_result {
620            warn!(
621                "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
622                err
623            );
624        }
625        let _ = self.client.disconnect().await;
626        if let Some(client) = self.publish_client.as_ref() {
627            let _ = client.disconnect().await;
628        }
629        Ok(())
630    }
631
632    fn spawn_startup_history_sync(self: &Arc<Self>, initial_authors: Vec<String>) {
633        let mirror = Arc::clone(self);
634        tokio::task::spawn_blocking(move || {
635            let runtime = tokio::runtime::Builder::new_current_thread()
636                .enable_all()
637                .build()
638                .expect("build nostr mirror startup history sync runtime");
639            runtime.block_on(async move {
640                let _guard = mirror.history_sync_lock.lock().await;
641                if let Err(err) = mirror.run_startup_history_sync(initial_authors).await {
642                    warn!("Nostr mirror startup history sync failed: {:#}", err);
643                }
644            });
645        });
646    }
647
648    async fn run_startup_history_sync(&self, initial_authors: Vec<String>) -> Result<()> {
649        self.history_sync_full_text_notes_for_reachable_authors()
650            .await?;
651        self.history_sync_authors(initial_authors).await?;
652        if self.should_backfill_missing_profiles(None) {
653            let missing_profile_authors = self
654                .collect_missing_profile_authors(self.config.missing_profile_backfill_batch_size)?;
655            if !missing_profile_authors.is_empty() {
656                info!(
657                    "Nostr mirror missing-profile backfill starting: authors={}",
658                    missing_profile_authors.len()
659                );
660                self.history_sync_authors_with_kinds(
661                    missing_profile_authors,
662                    &[Kind::Metadata.as_u16()],
663                )
664                .await?;
665            }
666        }
667        Ok(())
668    }
669
670    fn spawn_author_history_sync(
671        self: &Arc<Self>,
672        label: &'static str,
673        authors: Vec<String>,
674        include_full_text_notes: bool,
675        wait_for_existing_sync: bool,
676    ) {
677        let mirror = Arc::clone(self);
678        tokio::task::spawn_blocking(move || {
679            let runtime = tokio::runtime::Builder::new_current_thread()
680                .enable_all()
681                .build()
682                .expect("build nostr mirror author history sync runtime");
683            runtime.block_on(async move {
684                if wait_for_existing_sync {
685                    let _guard = mirror.history_sync_lock.lock().await;
686                    if let Err(err) = mirror
687                        .run_author_history_sync(authors, include_full_text_notes)
688                        .await
689                    {
690                        warn!("Nostr mirror {label} failed: {:#}", err);
691                    }
692                    return;
693                }
694
695                let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
696                    info!("Nostr mirror {label} skipped; another history sync is running");
697                    return;
698                };
699                if let Err(err) = mirror
700                    .run_author_history_sync(authors, include_full_text_notes)
701                    .await
702                {
703                    warn!("Nostr mirror {label} failed: {:#}", err);
704                }
705            });
706        });
707    }
708
709    async fn run_author_history_sync(
710        &self,
711        authors: Vec<String>,
712        include_full_text_notes: bool,
713    ) -> Result<()> {
714        if include_full_text_notes {
715            self.history_sync_full_text_notes_for_authors(authors.clone())
716                .await?;
717        }
718        self.history_sync_authors(authors).await
719    }
720
721    fn spawn_missing_profile_backfill(self: &Arc<Self>, authors: Vec<String>) {
722        let mirror = Arc::clone(self);
723        tokio::task::spawn_blocking(move || {
724            let runtime = tokio::runtime::Builder::new_current_thread()
725                .enable_all()
726                .build()
727                .expect("build nostr mirror missing profile runtime");
728            runtime.block_on(async move {
729                let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
730                    info!(
731                        "Nostr mirror missing-profile backfill skipped; another history sync is running"
732                    );
733                    return;
734                };
735                if let Err(err) = mirror
736                    .history_sync_authors_with_kinds(authors, &[Kind::Metadata.as_u16()])
737                    .await
738                {
739                    warn!("Nostr mirror missing-profile backfill failed: {:#}", err);
740                }
741            });
742        });
743    }
744
745    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
746        let mut statuses = HashMap::new();
747        for (relay_url, relay) in self.client.relays().await {
748            statuses.insert(relay_url.to_string(), relay.status().await);
749        }
750        statuses
751    }
752
753    async fn has_connected_publish_relay(&self) -> bool {
754        let Some(client) = self.publish_client.as_ref() else {
755            return false;
756        };
757        Self::client_has_connected_relay(client).await
758    }
759
760    async fn client_has_connected_relay(client: &Client) -> bool {
761        for (_relay_url, relay) in client.relays().await {
762            if relay.status().await == RelayStatus::Connected {
763                return true;
764            }
765        }
766        false
767    }
768
769    fn collect_authors(&self) -> Result<Vec<String>> {
770        self.collect_authors_with_max_distance(self.config.max_follow_distance)
771    }
772
773    fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
774        let mut authors = Vec::new();
775        let mut seen = HashSet::new();
776        for distance in 0..=max_distance {
777            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
778                self.graph_store.as_ref(),
779                distance,
780            )
781            .with_context(|| format!("load social-graph distance {distance}"))?
782            {
783                if self
784                    .graph_store
785                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
786                {
787                    continue;
788                }
789                let hex = hex::encode(pubkey);
790                if seen.insert(hex.clone()) {
791                    authors.push(hex);
792                }
793            }
794        }
795        Ok(authors)
796    }
797
798    async fn prioritize_full_text_note_history_authors(
799        &self,
800        authors: Vec<String>,
801    ) -> Result<Vec<String>> {
802        let Some(root) = self.graph_store.public_events_root()? else {
803            return Ok(authors);
804        };
805
806        let event_store = NostrEventStore::new(self.store.store_arc());
807        let mut prioritized = Vec::with_capacity(authors.len());
808        let mut sampled = 0usize;
809        for (index, author) in authors.into_iter().enumerate() {
810            let distance = match decode_hex_pubkey(&author) {
811                Some(pubkey) => self
812                    .graph_store
813                    .follow_distance(&pubkey)?
814                    .unwrap_or(u32::MAX),
815                None => u32::MAX,
816            };
817            let indexed_text_sample = if distance <= FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE {
818                sampled = sampled.saturating_add(1);
819                event_store
820                    .list_by_author_and_kind(
821                        Some(&root),
822                        &author,
823                        Kind::TextNote.as_u16() as u32,
824                        ListEventsOptions {
825                            limit: Some(FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT),
826                            ..ListEventsOptions::default()
827                        },
828                    )
829                    .await
830                    .with_context(|| {
831                        format!("sample indexed text-note history for author {author}")
832                    })?
833                    .len()
834            } else {
835                FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT
836            };
837            prioritized.push((distance, indexed_text_sample, index, author));
838        }
839
840        prioritized.sort_by(|left, right| {
841            left.0
842                .cmp(&right.0)
843                .then_with(|| left.1.cmp(&right.1))
844                .then_with(|| left.2.cmp(&right.2))
845        });
846        info!(
847            "Nostr mirror full text content history prioritized authors: authors={} sampled_distance_le_{}={}",
848            prioritized.len(),
849            FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE,
850            sampled
851        );
852        Ok(prioritized
853            .into_iter()
854            .map(|(_, _, _, author)| author)
855            .collect())
856    }
857
858    fn full_text_note_history_follow_distance(&self) -> Option<u32> {
859        let distance = self.config.full_text_note_history_follow_distance?;
860        if self
861            .config
862            .kinds
863            .iter()
864            .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
865        {
866            Some(distance.min(self.config.max_follow_distance))
867        } else {
868            None
869        }
870    }
871
872    fn full_text_note_history_max_relay_pages(&self) -> Option<usize> {
873        Self::full_text_note_history_max_relay_pages_for_config(&self.config)
874    }
875
876    fn full_text_note_history_max_relay_pages_for_config(
877        config: &NostrMirrorConfig,
878    ) -> Option<usize> {
879        let pages = config.full_text_note_history_max_relay_pages;
880        if pages == 0 {
881            None
882        } else {
883            Some(pages)
884        }
885    }
886
887    fn is_text_content_history_kind(kind: u16) -> bool {
888        kind == Kind::TextNote.as_u16() || kind == KIND_LONG_FORM_CONTENT
889    }
890
891    fn history_sync_kinds_for_config(config: &NostrMirrorConfig) -> Vec<u16> {
892        let mut kinds = config.kinds.clone();
893        kinds.retain(|kind| !Self::is_text_content_history_kind(*kind));
894        kinds
895    }
896
897    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
898        if limit == 0 {
899            return Ok(Vec::new());
900        }
901
902        let authors = self.collect_authors()?;
903        if authors.is_empty() {
904            return Ok(Vec::new());
905        }
906
907        let mut cursor = self
908            .missing_profile_cursor
909            .lock()
910            .expect("missing profile cursor");
911        let mut index = (*cursor).min(authors.len());
912        let mut scanned = 0usize;
913        let mut missing = Vec::new();
914
915        while scanned < authors.len() && missing.len() < limit {
916            let author = &authors[index];
917            if self.graph_store.latest_profile_event(author)?.is_none() {
918                missing.push(author.clone());
919            }
920            index += 1;
921            if index == authors.len() {
922                index = 0;
923            }
924            scanned += 1;
925        }
926
927        *cursor = index;
928        Ok(missing)
929    }
930
931    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
932        if self.config.missing_profile_backfill_batch_size == 0
933            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
934        {
935            return false;
936        }
937        match last_run {
938            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
939            None => true,
940        }
941    }
942
943    fn should_history_sync_on_reconnect(
944        history_sync_on_reconnect: bool,
945        previous: Option<RelayStatus>,
946        status: RelayStatus,
947    ) -> bool {
948        history_sync_on_reconnect
949            && status == RelayStatus::Connected
950            && matches!(
951                previous,
952                Some(
953                    RelayStatus::Initialized
954                        | RelayStatus::Pending
955                        | RelayStatus::Connecting
956                        | RelayStatus::Disconnected
957                        | RelayStatus::Terminated
958                )
959            )
960    }
961
962    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
963        match last_run {
964            None => true,
965            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
966        }
967    }
968
969    fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
970        !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
971    }
972
973    fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
974        kinds.is_empty()
975            || kinds.iter().any(|kind| {
976                *kind == Kind::Metadata.as_u16()
977                    || *kind == Kind::ContactList.as_u16()
978                    || *kind == Kind::MuteList.as_u16()
979            })
980    }
981
982    fn history_sync_plan_for(
983        config: &NostrMirrorConfig,
984        authors: usize,
985        kinds: &[u16],
986    ) -> HistorySyncPlan {
987        let author_batch_size = config.author_batch_size.max(1);
988        let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
989        let relay_page_size = 1_000;
990        let max_relay_pages = 10;
991
992        if Self::is_metadata_only_history_sync(kinds) {
993            return HistorySyncPlan {
994                relay_fetch_mode: RelayFetchMode::AuthorBatches,
995                author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
996                per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
997                relay_page_size,
998                max_relay_pages,
999            };
1000        }
1001
1002        if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
1003            return HistorySyncPlan {
1004                relay_fetch_mode: RelayFetchMode::GlobalRecent,
1005                author_batch_size,
1006                per_author_event_limit: per_author_event_limit
1007                    .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
1008                    .max(1),
1009                relay_page_size,
1010                max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
1011            };
1012        }
1013
1014        HistorySyncPlan {
1015            relay_fetch_mode: RelayFetchMode::AuthorBatches,
1016            author_batch_size,
1017            per_author_event_limit,
1018            relay_page_size,
1019            max_relay_pages,
1020        }
1021    }
1022
1023    fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
1024        Self::history_sync_plan_for(&self.config, authors, kinds)
1025    }
1026
1027    fn history_sync_chunk_size_for_config(
1028        config: &NostrMirrorConfig,
1029        authors: usize,
1030        kinds: &[u16],
1031        full_author_history: bool,
1032        chunk_size_override: Option<usize>,
1033    ) -> usize {
1034        let configured_chunk_size = chunk_size_override
1035            .unwrap_or(config.history_sync_author_chunk_size)
1036            .max(1);
1037        if full_author_history {
1038            return 1;
1039        }
1040        if !full_author_history
1041            && Self::history_sync_plan_for(config, authors, kinds).relay_fetch_mode
1042                == RelayFetchMode::GlobalRecent
1043        {
1044            return authors.max(1);
1045        }
1046        configured_chunk_size
1047    }
1048
1049    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
1050        let kinds = Self::history_sync_kinds_for_config(&self.config);
1051        if kinds.is_empty() {
1052            info!("Nostr mirror history sync skipped: no enabled history kinds");
1053            return Ok(());
1054        }
1055        self.history_sync_authors_with_kinds(authors, &kinds).await
1056    }
1057
1058    async fn history_sync_authors_with_kinds(
1059        &self,
1060        authors: Vec<String>,
1061        kinds: &[u16],
1062    ) -> Result<()> {
1063        self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
1064            .await
1065    }
1066
1067    async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
1068        let Some(distance) = self.full_text_note_history_follow_distance() else {
1069            return Ok(());
1070        };
1071        if self.full_text_note_history_max_relay_pages().is_none() {
1072            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1073            return Ok(());
1074        }
1075        info!(
1076            "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
1077        );
1078        let authors = self
1079            .prioritize_full_text_note_history_authors(
1080                self.collect_authors_with_max_distance(distance)?,
1081            )
1082            .await?;
1083        let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
1084            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1085            return Ok(());
1086        };
1087        self.history_sync_distance_filtered_full_text_notes(authors, distance, max_relay_pages)
1088            .await
1089    }
1090
1091    async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
1092        let Some(distance) = self.full_text_note_history_follow_distance() else {
1093            return Ok(());
1094        };
1095        let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
1096            info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1097            return Ok(());
1098        };
1099        let mut close_authors = Vec::new();
1100        for author in authors {
1101            let Some(pubkey) = decode_hex_pubkey(&author) else {
1102                continue;
1103            };
1104            if self
1105                .graph_store
1106                .follow_distance(&pubkey)?
1107                .is_some_and(|actual_distance| actual_distance <= distance)
1108            {
1109                close_authors.push(author);
1110            }
1111        }
1112        if close_authors.is_empty() {
1113            return Ok(());
1114        }
1115        let close_authors = self
1116            .prioritize_full_text_note_history_authors(close_authors)
1117            .await?;
1118
1119        self.history_sync_distance_filtered_full_text_notes(
1120            close_authors,
1121            distance,
1122            max_relay_pages,
1123        )
1124        .await
1125    }
1126
1127    async fn history_sync_distance_filtered_full_text_notes(
1128        &self,
1129        close_authors: Vec<String>,
1130        distance: u32,
1131        max_relay_pages: usize,
1132    ) -> Result<()> {
1133        if close_authors.is_empty() {
1134            return Ok(());
1135        }
1136
1137        info!(
1138            "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
1139            close_authors.len(),
1140            distance,
1141            max_relay_pages
1142        );
1143        let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
1144        self.history_sync_authors_with_kinds_and_mode(
1145            close_authors,
1146            &kinds,
1147            true,
1148            Some(max_relay_pages),
1149        )
1150        .await
1151    }
1152
1153    async fn history_sync_authors_with_kinds_and_mode(
1154        &self,
1155        authors: Vec<String>,
1156        kinds: &[u16],
1157        full_author_history: bool,
1158        max_relay_pages: Option<usize>,
1159    ) -> Result<()> {
1160        let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
1161        let chunk_size = Self::history_sync_chunk_size_for_config(
1162            &self.config,
1163            authors.len(),
1164            kinds,
1165            full_author_history,
1166            None,
1167        );
1168        self.history_sync_authors_chunked(
1169            authors,
1170            |current_root, author_chunk| async move {
1171                self.history_sync_author_chunk(
1172                    current_root,
1173                    author_chunk,
1174                    kinds,
1175                    full_author_history,
1176                    max_relay_pages,
1177                )
1178                .await
1179            },
1180            update_profile_and_graph,
1181            Some(chunk_size),
1182        )
1183        .await
1184    }
1185
1186    async fn history_sync_authors_chunked<F, Fut>(
1187        &self,
1188        authors: Vec<String>,
1189        mut run_chunk: F,
1190        update_profile_and_graph: bool,
1191        chunk_size_override: Option<usize>,
1192    ) -> Result<()>
1193    where
1194        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
1195        Fut: std::future::Future<Output = Result<CrawlReport>>,
1196    {
1197        if authors.is_empty() {
1198            return Ok(());
1199        }
1200
1201        info!(
1202            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
1203            authors.len(),
1204            self.config.relays.len(),
1205            self.config.require_negentropy
1206        );
1207
1208        let mut current_root = self.graph_store.public_events_root_for_write()?;
1209        let mut last_error = None;
1210        let mut applied_chunks = 0usize;
1211        let mut failed_chunks = 0usize;
1212        let chunk_size = chunk_size_override
1213            .unwrap_or(self.config.history_sync_author_chunk_size)
1214            .max(1);
1215        let total_chunks = authors.len().div_ceil(chunk_size);
1216
1217        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
1218            let author_chunk = author_chunk.to_vec();
1219            let author_count = author_chunk.len();
1220            info!(
1221                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
1222                chunk_index + 1,
1223                total_chunks,
1224                author_count
1225            );
1226            let mut report = match run_chunk(current_root.clone(), author_chunk.clone()).await {
1227                Ok(report) => report,
1228                Err(err) => {
1229                    failed_chunks = failed_chunks.saturating_add(1);
1230                    warn!(
1231                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
1232                        chunk_index + 1,
1233                        total_chunks,
1234                        author_count,
1235                        err
1236                    );
1237                    last_error = Some(err);
1238                    trim_transient_allocations();
1239                    continue;
1240                }
1241            };
1242
1243            let latest_root = self.graph_store.public_events_root_for_write()?;
1244            if latest_root != current_root {
1245                info!(
1246                    "Nostr mirror history sync root advanced while chunk was fetching; merging chunk into latest root: chunk={}/{} authors={} events_applied={}",
1247                    chunk_index + 1,
1248                    total_chunks,
1249                    author_count,
1250                    report.applied_events.len()
1251                );
1252                if report.applied_events.is_empty() {
1253                    report.root = latest_root.clone();
1254                } else {
1255                    let event_store = NostrEventStore::new(self.store.store_arc());
1256                    report.root = event_store
1257                        .build(latest_root.as_ref(), report.applied_events.clone())
1258                        .await
1259                        .context("merge history chunk into latest mirrored event root")?;
1260                }
1261                current_root = latest_root;
1262            }
1263
1264            if report.root != current_root {
1265                self.apply_history_root_with_options(
1266                    report.root.as_ref(),
1267                    update_profile_and_graph,
1268                    true,
1269                    Some(&report.applied_events),
1270                )
1271                .await?;
1272                current_root = report.root.clone();
1273                info!(
1274                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
1275                    chunk_index + 1,
1276                    total_chunks,
1277                    report.authors_processed,
1278                    report.events_selected,
1279                    report.events_seen
1280                );
1281            }
1282            applied_chunks = applied_chunks.saturating_add(1);
1283            trim_transient_allocations();
1284        }
1285
1286        if applied_chunks == 0 {
1287            return Err(last_error
1288                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
1289                .context("run mirror history sync"));
1290        }
1291        if failed_chunks > 0 {
1292            warn!(
1293                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
1294                applied_chunks, failed_chunks
1295            );
1296        }
1297        Ok(())
1298    }
1299
1300    async fn history_sync_author_chunk(
1301        &self,
1302        current_root: Option<hashtree_core::Cid>,
1303        authors: Vec<String>,
1304        kinds: &[u16],
1305        full_author_history: bool,
1306        max_relay_pages: Option<usize>,
1307    ) -> Result<CrawlReport> {
1308        let mut last_error = None;
1309        let mut report = None;
1310        let mut plan = self.history_sync_plan(authors.len(), kinds);
1311        if full_author_history {
1312            plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
1313            plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
1314        }
1315        for attempt in 0..3 {
1316            let mut last_logged_authors = 0usize;
1317            let bridge = NostrBridge::new(
1318                self.store.store_arc(),
1319                CrawlConfig {
1320                    relays: self.config.relays.clone(),
1321                    author_allowlist: Some(authors.clone()),
1322                    max_live_bytes: None,
1323                    max_events_seen: None,
1324                    max_authors: None,
1325                    max_follow_distance: None,
1326                    author_batch_size: plan.author_batch_size,
1327                    per_author_event_limit: plan.per_author_event_limit,
1328                    per_author_live_bytes: None,
1329                    fetch_timeout: self.config.fetch_timeout,
1330                    kinds: Some(kinds.to_vec()),
1331                    relay_fetch_mode: plan.relay_fetch_mode,
1332                    require_negentropy: self.config.require_negentropy,
1333                    relay_event_max_size: self.config.relay_event_max_size,
1334                    relay_page_size: plan.relay_page_size,
1335                    max_relay_pages: plan.max_relay_pages,
1336                    full_author_history,
1337                },
1338            );
1339
1340            let crawl = bridge.crawl_with_progress(
1341                self.graph_store.as_ref(),
1342                current_root.as_ref(),
1343                |progress| {
1344                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
1345                    let should_log = progress.authors_processed == progress.authors_considered
1346                        || progress.authors_processed == 0
1347                        || progress
1348                            .authors_processed
1349                            .saturating_sub(last_logged_authors)
1350                            >= log_interval;
1351                    if should_log {
1352                        last_logged_authors = progress.authors_processed;
1353                        info!(
1354                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
1355                            progress.authors_processed,
1356                            progress.authors_considered,
1357                            progress.events_selected,
1358                            progress.events_seen
1359                        );
1360                    }
1361                },
1362            );
1363            let crawl_result: Result<CrawlReport> = if full_author_history {
1364                let timeout = self.config.fetch_timeout.saturating_mul(4);
1365                match tokio::time::timeout(timeout, crawl).await {
1366                    Ok(result) => result.map_err(Into::into),
1367                    Err(_) => Err(anyhow::anyhow!(
1368                        "full author history crawl timed out after {:?} for {} author(s)",
1369                        timeout,
1370                        authors.len()
1371                    )),
1372                }
1373            } else {
1374                crawl.await.map_err(Into::into)
1375            };
1376
1377            match crawl_result {
1378                Ok(next_report) => {
1379                    report = Some(next_report);
1380                    break;
1381                }
1382                Err(err) => {
1383                    last_error = Some(err);
1384                    if full_author_history {
1385                        break;
1386                    }
1387                    if attempt < 2 {
1388                        tokio::time::sleep(Duration::from_millis(500)).await;
1389                    }
1390                }
1391            }
1392        }
1393        report
1394            .ok_or_else(|| last_error.expect("history sync retry captured error"))
1395            .context("run mirror history sync")
1396    }
1397
1398    #[cfg(test)]
1399    async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
1400        self.apply_history_root_with_options(root, true, true, None)
1401            .await
1402    }
1403
1404    async fn apply_history_root_with_options(
1405        &self,
1406        root: Option<&hashtree_core::Cid>,
1407        update_profile_and_graph: bool,
1408        publish_roots: bool,
1409        applied_events: Option<&[hashtree_nostr::StoredNostrEvent]>,
1410    ) -> Result<()> {
1411        self.graph_store.write_public_events_root(root)?;
1412        let Some(root) = root else {
1413            return Ok(());
1414        };
1415
1416        self.note_public_events_root_change()?;
1417        if update_profile_and_graph {
1418            let events = match applied_events {
1419                Some(events) => events
1420                    .iter()
1421                    .cloned()
1422                    .map(socialgraph::stored_event_to_nostr_event)
1423                    .collect::<Result<Vec<_>>>()?,
1424                None => {
1425                    let event_store = NostrEventStore::new(self.store.store_arc());
1426                    event_store
1427                        .list_recent_lossy(Some(root), ListEventsOptions::default())
1428                        .await
1429                        .context("list trusted mirrored events")?
1430                        .into_iter()
1431                        .map(socialgraph::stored_event_to_nostr_event)
1432                        .collect::<Result<Vec<_>>>()?
1433                }
1434            };
1435
1436            socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
1437                .context("sync mirrored social graph state")?;
1438            if applied_events.is_some() {
1439                self.graph_store
1440                    .sync_profile_index_for_events(&events)
1441                    .context("update mirrored profile search index")?;
1442            } else {
1443                self.graph_store
1444                    .rebuild_profile_index_for_events(&events)
1445                    .context("rebuild mirrored profile search index")?;
1446            }
1447            self.note_profile_search_root_change()?;
1448            self.note_profiles_by_pubkey_root_change()?;
1449        }
1450        if !publish_roots {
1451            return Ok(());
1452        }
1453        let (event_result, profile_search_result, profiles_by_pubkey_result) = self
1454            .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
1455            .await;
1456        if let Err(err) = event_result {
1457            warn!(
1458                "Nostr mirror event-root publish failed after root update: {:#}",
1459                err
1460            );
1461        }
1462        if let Err(err) = profile_search_result {
1463            warn!(
1464                "Nostr mirror profile-search publish failed after root update: {:#}",
1465                err
1466            );
1467        }
1468        if let Err(err) = profiles_by_pubkey_result {
1469            warn!(
1470                "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
1471                err
1472            );
1473        }
1474        Ok(())
1475    }
1476
1477    async fn subscribe_authors_since(
1478        &self,
1479        authors: &[String],
1480        since: Timestamp,
1481        subscribed_authors: &mut HashSet<String>,
1482    ) -> Result<()> {
1483        let new_authors = authors
1484            .iter()
1485            .filter(|author| !subscribed_authors.contains(*author))
1486            .cloned()
1487            .collect::<Vec<_>>();
1488        if new_authors.is_empty() {
1489            return Ok(());
1490        }
1491
1492        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1493            let pubkeys = chunk
1494                .iter()
1495                .filter_map(|author| PublicKey::from_hex(author).ok())
1496                .collect::<Vec<_>>();
1497            if pubkeys.is_empty() {
1498                continue;
1499            }
1500
1501            let filter = Filter::new()
1502                .authors(pubkeys)
1503                .kinds(self.config.kinds.iter().copied().map(Kind::from))
1504                .since(since);
1505
1506            if let Err(err) = self.client.subscribe(vec![filter], None).await {
1507                warn!(
1508                    "Nostr mirror author subscription failed: authors={} error={:#}",
1509                    chunk.len(),
1510                    err
1511                );
1512                continue;
1513            }
1514            subscribed_authors.extend(chunk.iter().cloned());
1515        }
1516        Ok(())
1517    }
1518
1519    fn ingest_live_event(&self, event: &Event) -> Result<()> {
1520        self.pending_live_events
1521            .lock()
1522            .expect("pending live events")
1523            .insert(event.id.to_hex(), event.clone());
1524        Ok(())
1525    }
1526
1527    async fn flush_live_events(&self) -> Result<()> {
1528        let pending = {
1529            let mut pending = self
1530                .pending_live_events
1531                .lock()
1532                .expect("pending live events");
1533            if pending.is_empty() {
1534                return Ok(());
1535            }
1536            std::mem::take(&mut *pending)
1537        };
1538        let events = pending.into_values().collect::<Vec<_>>();
1539        let event_count = events.len();
1540        let previous_event_root = self.graph_store.public_events_root()?;
1541        let previous_profile_search_root = self.graph_store.profile_search_root()?;
1542        let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1543
1544        socialgraph::ingest_parsed_events_with_storage_class(
1545            self.graph_store.as_ref(),
1546            &events,
1547            socialgraph::EventStorageClass::Public,
1548        )
1549        .context("ingest live mirrored event batch")?;
1550
1551        let next_event_root = self.graph_store.public_events_root()?;
1552        let next_profile_search_root = self.graph_store.profile_search_root()?;
1553        let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1554        let event_root_changed = next_event_root != previous_event_root;
1555        let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1556        let profiles_by_pubkey_root_changed =
1557            next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1558
1559        if event_root_changed {
1560            self.note_public_events_root_change()?;
1561        }
1562        if profile_search_root_changed {
1563            self.note_profile_search_root_change()?;
1564        }
1565        if profiles_by_pubkey_root_changed {
1566            self.note_profiles_by_pubkey_root_change()?;
1567        }
1568        if profile_search_root_changed {
1569            self.maybe_publish_profile_search_root(false).await?;
1570        }
1571        if profiles_by_pubkey_root_changed {
1572            self.maybe_publish_profiles_by_pubkey_root(false).await?;
1573        }
1574        if event_root_changed {
1575            self.maybe_publish_event_root(false).await?;
1576        }
1577        info!(
1578            "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1579            event_count,
1580            event_root_changed,
1581            profile_search_root_changed,
1582            profiles_by_pubkey_root_changed
1583        );
1584        Ok(())
1585    }
1586
1587    fn note_public_events_root_change(&self) -> Result<()> {
1588        let root = self.graph_store.public_events_root()?;
1589        Self::note_root_change(
1590            self.config.published_event_tree_name.as_deref(),
1591            &self.event_publish_state,
1592            root,
1593        )
1594    }
1595
1596    fn note_profile_search_root_change(&self) -> Result<()> {
1597        let root = self.graph_store.profile_search_root()?;
1598        Self::note_root_change(
1599            self.config.published_profile_search_tree_name.as_deref(),
1600            &self.profile_search_publish_state,
1601            root,
1602        )
1603    }
1604
1605    fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1606        let root = self.graph_store.profiles_by_pubkey_root()?;
1607        Self::note_root_change(
1608            self.config
1609                .published_profiles_by_pubkey_tree_name
1610                .as_deref(),
1611            &self.profiles_by_pubkey_publish_state,
1612            root,
1613        )
1614    }
1615
1616    fn note_root_change(
1617        tree_name: Option<&str>,
1618        publish_state: &Arc<Mutex<RootPublishState>>,
1619        root: Option<hashtree_core::Cid>,
1620    ) -> Result<()> {
1621        let Some(_tree_name) = tree_name else {
1622            return Ok(());
1623        };
1624
1625        let mut state = publish_state.lock().expect("root publish state");
1626        let now = Instant::now();
1627
1628        if state.pending_root == root {
1629            return Ok(());
1630        }
1631
1632        state.pending_root = root;
1633        state.last_upload_failed_at = None;
1634        state.last_upload_error = None;
1635        state.last_changed_at = Some(now);
1636        if state.dirty_since.is_none() {
1637            state.dirty_since = Some(now);
1638        }
1639        Ok(())
1640    }
1641
1642    async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1643        if self.take_missing_blob_event_upload_error() {
1644            return self.rebuild_event_indexes_after_missing_blobs(force).await;
1645        }
1646        let result = self
1647            .maybe_publish_root(
1648                self.config.published_event_tree_name.as_deref(),
1649                &self.event_publish_state,
1650                "event root",
1651                force,
1652                false,
1653            )
1654            .await;
1655        let Err(error) = result else {
1656            if self.take_missing_blob_event_upload_error() {
1657                return self.rebuild_event_indexes_after_missing_blobs(force).await;
1658            }
1659            return Ok(());
1660        };
1661        if !is_missing_local_blob_push_error(&error) {
1662            return Err(error);
1663        }
1664
1665        self.rebuild_event_indexes_after_missing_blobs(force).await
1666    }
1667
1668    fn take_missing_blob_event_upload_error(&self) -> bool {
1669        let mut state = self
1670            .event_publish_state
1671            .lock()
1672            .expect("event root publish state");
1673        if state.upload_in_progress_root.is_some() {
1674            return false;
1675        }
1676        if !state.missing_blob_rebuild_required {
1677            return false;
1678        }
1679        state.missing_blob_rebuild_required = false;
1680        state.last_upload_error = None;
1681        state.last_upload_failed_at = None;
1682        true
1683    }
1684
1685    async fn rebuild_event_indexes_after_missing_blobs(&self, force: bool) -> Result<()> {
1686        warn!(
1687            "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1688        );
1689        let (public_count, ambient_count) = self
1690            .graph_store
1691            .rebuild_event_indexes_from_stored_events_async()
1692            .await
1693            .context("rebuild event indexes after missing event blobs")?;
1694        info!(
1695            "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1696            public_count, ambient_count
1697        );
1698        self.sync_publish_roots_from_store()?;
1699
1700        self.maybe_publish_root(
1701            self.config.published_event_tree_name.as_deref(),
1702            &self.event_publish_state,
1703            "event root",
1704            force,
1705            false,
1706        )
1707        .await
1708    }
1709
1710    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1711        self.maybe_publish_root(
1712            self.config.published_profile_search_tree_name.as_deref(),
1713            &self.profile_search_publish_state,
1714            "profile search root",
1715            force,
1716            true,
1717        )
1718        .await
1719    }
1720
1721    async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1722        self.maybe_publish_root(
1723            self.config
1724                .published_profiles_by_pubkey_tree_name
1725                .as_deref(),
1726            &self.profiles_by_pubkey_publish_state,
1727            "profiles-by-pubkey root",
1728            force,
1729            true,
1730        )
1731        .await
1732    }
1733
1734    async fn maybe_publish_root(
1735        &self,
1736        tree_name: Option<&str>,
1737        publish_state: &Arc<Mutex<RootPublishState>>,
1738        log_label: &str,
1739        force: bool,
1740        publish_before_upload_ready_on_force: bool,
1741    ) -> Result<()> {
1742        let Some(tree_name) = tree_name else {
1743            return Ok(());
1744        };
1745
1746        let pending_root = {
1747            let state = publish_state.lock().expect("root publish state");
1748            let Some(pending_root) = state.pending_root.clone() else {
1749                return Ok(());
1750            };
1751
1752            let now = Instant::now();
1753            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1754                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1755            });
1756            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1757                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1758            });
1759            if !force && !debounce_ready && !stale_ready {
1760                return Ok(());
1761            }
1762
1763            pending_root
1764        };
1765
1766        let upload_started = self.maybe_start_background_root_upload(
1767            tree_name,
1768            &pending_root,
1769            publish_state,
1770            log_label,
1771        );
1772        let upload_required = !self.config.blossom_write_servers.is_empty();
1773        let (upload_ready, publish_root) = {
1774            let state = publish_state.lock().expect("root publish state");
1775            let upload_ready =
1776                !upload_required || state.last_uploaded_root.as_ref() == Some(&pending_root);
1777            let publish_root = if upload_ready {
1778                Some(pending_root.clone())
1779            } else {
1780                state.last_uploaded_root.clone().filter(|uploaded_root| {
1781                    state.last_published_root.as_ref() != Some(uploaded_root)
1782                })
1783            };
1784            (upload_ready, publish_root)
1785        };
1786        let publish_before_upload_ready =
1787            force && upload_required && !upload_ready && publish_before_upload_ready_on_force;
1788        let publish_root = if let Some(publish_root) = publish_root {
1789            publish_root
1790        } else if publish_before_upload_ready {
1791            pending_root.clone()
1792        } else {
1793            if upload_started {
1794                info!(
1795                    "Nostr mirror uploading {} DAG before publish: tree={} hash={}",
1796                    log_label,
1797                    tree_name,
1798                    hex::encode(pending_root.hash),
1799                );
1800            }
1801            return Ok(());
1802        };
1803
1804        let mut successful_relays = Vec::new();
1805        let mut failed_relays = Vec::new();
1806        let mut published_now = false;
1807        let publish_required =
1808            self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1809        if publish_required {
1810            let Some(publish_client) = self.publish_client.as_ref() else {
1811                unreachable!("publish_required implies publish_client");
1812            };
1813            if !self.has_connected_publish_relay().await {
1814                return Ok(());
1815            }
1816            if publish_before_upload_ready {
1817                info!(
1818                    "Nostr mirror publishing {} before Blossom upload completes: tree={} hash={}",
1819                    log_label,
1820                    tree_name,
1821                    hex::encode(pending_root.hash),
1822                );
1823            } else if !upload_ready {
1824                info!(
1825                    "Nostr mirror publishing uploaded {} while newer root is still uploading: tree={} published_hash={} pending_hash={}",
1826                    log_label,
1827                    tree_name,
1828                    hex::encode(publish_root.hash),
1829                    hex::encode(pending_root.hash),
1830                );
1831            }
1832
1833            let already_published = {
1834                let state = publish_state.lock().expect("root publish state");
1835                state.last_published_root.as_ref() == Some(&publish_root)
1836            };
1837            if !already_published {
1838                let publish_relays = self.config.publish_relays.clone();
1839                let latest_known_created_at = {
1840                    let state = publish_state.lock().expect("root publish state");
1841                    state.last_published_created_at
1842                };
1843                let publish_created_at =
1844                    next_replaceable_created_at(Timestamp::now(), latest_known_created_at);
1845                let event = publish_client
1846                    .sign_event_builder(Self::build_public_root_event(
1847                        tree_name,
1848                        &publish_root,
1849                        publish_created_at,
1850                    ))
1851                    .await
1852                    .with_context(|| format!("sign {log_label} event"))?;
1853                let publish_result = self
1854                    .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1855                    .await
1856                    .with_context(|| format!("publish {log_label} event"))?;
1857                successful_relays = publish_result.0;
1858                failed_relays = publish_result.1;
1859                if successful_relays.is_empty() {
1860                    let failure_summary = if failed_relays.is_empty() {
1861                        "no publish relays accepted the event".to_string()
1862                    } else {
1863                        failed_relays.join("; ")
1864                    };
1865                    anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1866                }
1867
1868                let mut state = publish_state.lock().expect("root publish state");
1869                if state.pending_root.as_ref() == Some(&pending_root)
1870                    || state.last_uploaded_root.as_ref() == Some(&publish_root)
1871                    || publish_before_upload_ready
1872                {
1873                    state.last_published_root = Some(publish_root.clone());
1874                    state.last_published_at = Some(Instant::now());
1875                    state.last_published_created_at = Some(event.created_at);
1876                }
1877                published_now = true;
1878            }
1879        }
1880
1881        {
1882            let mut state = publish_state.lock().expect("root publish state");
1883            if state.pending_root.as_ref() == Some(&pending_root) {
1884                let upload_satisfied = self.config.blossom_write_servers.is_empty()
1885                    || state.last_uploaded_root.as_ref() == Some(&pending_root);
1886                let publish_satisfied =
1887                    !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1888                if upload_satisfied && publish_satisfied {
1889                    state.dirty_since = None;
1890                }
1891            }
1892        }
1893
1894        if published_now {
1895            info!(
1896                "Nostr mirror published {}: tree={} hash={} relays={:?}",
1897                log_label,
1898                tree_name,
1899                hex::encode(publish_root.hash),
1900                successful_relays,
1901            );
1902        }
1903        if !failed_relays.is_empty() {
1904            warn!(
1905                "Nostr mirror publish had relay failures: tree={} failures={:?}",
1906                tree_name, failed_relays
1907            );
1908        }
1909        Ok(())
1910    }
1911
1912    fn maybe_start_background_root_upload(
1913        &self,
1914        tree_name: &str,
1915        pending_root: &hashtree_core::Cid,
1916        publish_state: &Arc<Mutex<RootPublishState>>,
1917        log_label: &str,
1918    ) -> bool {
1919        if self.config.blossom_write_servers.is_empty() {
1920            return false;
1921        }
1922
1923        let previous_uploaded_root = {
1924            let mut state = publish_state.lock().expect("root publish state");
1925            if state.last_uploaded_root.as_ref() == Some(pending_root)
1926                || state.upload_in_progress_root.is_some()
1927            {
1928                return false;
1929            }
1930            if state
1931                .last_upload_failed_at
1932                .is_some_and(|failed_at| failed_at.elapsed() < MIRROR_ROOT_UPLOAD_RETRY_INTERVAL)
1933            {
1934                return false;
1935            }
1936            state.upload_in_progress_root = Some(pending_root.clone());
1937            state.last_uploaded_root.clone()
1938        };
1939
1940        let store = Arc::clone(&self.store);
1941        let upload_state_path = Self::uploaded_root_state_path(store.base_path(), tree_name);
1942        let servers = self.config.blossom_write_servers.clone();
1943        let root = pending_root.clone();
1944        let publish_state = Arc::clone(publish_state);
1945        let log_label = log_label.to_string();
1946        tokio::task::spawn_blocking(move || {
1947            let runtime = tokio::runtime::Builder::new_current_thread()
1948                .enable_all()
1949                .build()
1950                .expect("build nostr mirror root upload runtime");
1951            runtime.block_on(async move {
1952                let result = background_blossom_push_incremental_with_store(
1953                    store,
1954                    root.clone(),
1955                    previous_uploaded_root,
1956                    &servers,
1957                )
1958                .await;
1959                let mut state = publish_state.lock().expect("root publish state");
1960                if state.upload_in_progress_root.as_ref() == Some(&root) {
1961                    state.upload_in_progress_root = None;
1962                }
1963                match result {
1964                    Ok(()) => {
1965                        if let Err(err) =
1966                            Self::write_uploaded_root_state(&upload_state_path, &root, &log_label)
1967                        {
1968                            warn!("Nostr mirror failed to persist uploaded root state: {err:#}");
1969                        }
1970                        state.last_uploaded_root = Some(root.clone());
1971                        state.last_uploaded_at = Some(Instant::now());
1972                        if state.pending_root.as_ref() == Some(&root) {
1973                            state.last_upload_failed_at = None;
1974                            state.last_upload_error = None;
1975                            state.missing_blob_rebuild_required = false;
1976                        }
1977                        info!(
1978                            "Nostr mirror uploaded {} DAG to Blossom: hash={}",
1979                            log_label,
1980                            hex::encode(root.hash)
1981                        );
1982                    }
1983                    Err(err) => {
1984                        if state.pending_root.as_ref() == Some(&root) {
1985                            state.last_upload_failed_at = Some(Instant::now());
1986                            state.last_upload_error = Some(format!("{err:#}"));
1987                        }
1988                        if is_missing_local_blob_message(&format!("{err:#}")) {
1989                            state.missing_blob_rebuild_required = true;
1990                        }
1991                        warn!(
1992                            "Nostr mirror {} DAG upload failed: hash={} error={:#}",
1993                            log_label,
1994                            hex::encode(root.hash),
1995                            err
1996                        );
1997                    }
1998                }
1999            });
2000            trim_transient_allocations();
2001        });
2002
2003        true
2004    }
2005
2006    async fn publish_root_event_to_relays(
2007        &self,
2008        publish_client: &Client,
2009        relays: &[String],
2010        event: &Event,
2011    ) -> Result<(Vec<String>, Vec<String>)> {
2012        let primary_send = Self::send_root_event_to_relays(publish_client, relays, event);
2013        let (mut successful_relays, mut failed_relays) =
2014            match tokio::time::timeout(MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT, primary_send).await {
2015                Ok(result) => result,
2016                Err(_) => (
2017                    Vec::new(),
2018                    vec![format!(
2019                        "publish relays: primary publish timed out after {:?}",
2020                        MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT
2021                    )],
2022                ),
2023            };
2024
2025        if successful_relays.is_empty() {
2026            let (retry_successes, retry_failures) = self
2027                .publish_root_event_with_fresh_client(relays, event)
2028                .await;
2029            successful_relays.extend(retry_successes);
2030            failed_relays.extend(retry_failures);
2031        }
2032
2033        Ok((successful_relays, failed_relays))
2034    }
2035
2036    async fn send_root_event_to_relays(
2037        publish_client: &Client,
2038        relays: &[String],
2039        event: &Event,
2040    ) -> (Vec<String>, Vec<String>) {
2041        let mut successful_relays = Vec::new();
2042        let mut failed_relays = Vec::new();
2043
2044        match publish_client
2045            .send_event_to(relays.iter().map(|relay| relay.as_str()), event.clone())
2046            .await
2047        {
2048            Ok(output) => {
2049                for relay in relays {
2050                    let relay_url = relay.trim_end_matches('/');
2051                    if output
2052                        .success
2053                        .iter()
2054                        .any(|url| url.as_str().trim_end_matches('/') == relay_url)
2055                    {
2056                        successful_relays.push(relay.clone());
2057                    }
2058                }
2059                failed_relays.extend(output.failed.into_iter().map(|(url, reason)| match reason {
2060                    Some(reason) => format!("{url}: {reason}"),
2061                    None => format!("{url}: relay rejected publish"),
2062                }));
2063            }
2064            Err(err) => {
2065                failed_relays.push(format!("publish relays: {err}"));
2066            }
2067        }
2068
2069        (successful_relays, failed_relays)
2070    }
2071
2072    async fn publish_root_event_with_fresh_client(
2073        &self,
2074        relays: &[String],
2075        event: &Event,
2076    ) -> (Vec<String>, Vec<String>) {
2077        let client = Client::with_opts(Keys::generate(), Options::new().wait_for_send(false));
2078        let mut setup_failures = Vec::new();
2079        for relay in relays {
2080            if let Err(err) = client.add_relay(relay).await {
2081                setup_failures.push(format!("{relay}: add relay failed: {err}"));
2082            }
2083        }
2084
2085        client.connect().await;
2086        let publish = Self::send_root_event_to_relays(&client, relays, event);
2087        let retry_timeout = self
2088            .config
2089            .fetch_timeout
2090            .min(MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT);
2091        let result = tokio::time::timeout(retry_timeout, publish).await;
2092        let _ = client.disconnect().await;
2093
2094        match result {
2095            Ok((successful_relays, mut failed_relays)) => {
2096                failed_relays.extend(setup_failures);
2097                (successful_relays, failed_relays)
2098            }
2099            Err(_) => {
2100                setup_failures.push(format!(
2101                    "fresh publish client timed out after {:?}",
2102                    retry_timeout
2103                ));
2104                (Vec::new(), setup_failures)
2105            }
2106        }
2107    }
2108
2109    fn build_public_root_event(
2110        tree_name: &str,
2111        cid: &hashtree_core::Cid,
2112        created_at: Timestamp,
2113    ) -> EventBuilder {
2114        let mut tags = vec![
2115            Tag::identifier(tree_name.to_string()),
2116            Tag::custom(
2117                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
2118                vec!["hashtree"],
2119            ),
2120            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
2121        ];
2122        if let Some(key) = cid.key {
2123            tags.push(Tag::custom(
2124                TagKind::Custom("key".into()),
2125                vec![hex::encode(key)],
2126            ));
2127        }
2128
2129        EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
2130    }
2131}
2132
2133fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
2134    error
2135        .chain()
2136        .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
2137}
2138
2139fn is_missing_local_blob_message(message: &str) -> bool {
2140    message.contains(MISSING_LOCAL_BLOB_PUSH_ERROR)
2141}
2142
2143fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
2144    match latest_existing {
2145        Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
2146        _ => now,
2147    }
2148}
2149
2150#[cfg(test)]
2151mod tests;