// hashtree_cli/nostr_mirror.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9    CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12    Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13    Timestamp,
14};
15use nostr_sdk::{
16    pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17    RelayStatus,
18};
19use tokio::sync::watch;
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// ---------------------------------------------------------------------------
// Timing knobs. Each one has a production value and a much shorter value that
// is compiled in only for test builds.
// ---------------------------------------------------------------------------

/// First pause taken by `run` before it does any work.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// Second pause taken by `run`, directly after the startup delay.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

/// Period of the timer on which `run` re-collects the reachable authors.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

/// Minimum time between two reconnect-triggered catch-up history syncs.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

/// Minimum time between two missing-profile backfills once one has been recorded.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

/// Period of the timer on which `run` flushes live events and publishes roots.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

// NOTE(review): presumably the longest a changed root may stay unpublished;
// the code that reads this constant is not in this part of the file.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);

// ---------------------------------------------------------------------------
// Event kinds and default tree names.
// ---------------------------------------------------------------------------

/// Numeric kind treated as long-form content alongside text notes.
const KIND_LONG_FORM_CONTENT: u16 = 30_023;
/// Event kinds mirrored when the configuration does not override `kinds`.
const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
/// Default value of `NostrMirrorConfig::published_event_tree_name`.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
/// Default value of `NostrMirrorConfig::published_profile_search_tree_name`.
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
/// Default value of `NostrMirrorConfig::published_profiles_by_pubkey_tree_name`.
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";

// ---------------------------------------------------------------------------
// History-sync planning limits.
// ---------------------------------------------------------------------------

/// Per-author event limit used when a history sync requests only metadata.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
/// Upper bound on the author batch size for metadata-only history syncs.
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
/// Default follow distance up to which full text-note history is fetched.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
/// Default relay page cap for full text-note history syncs.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
/// A sync counts as "large" once the author count exceeds the author batch
/// size multiplied by this factor.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
/// Upper bound on the per-author event limit for large history syncs.
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
/// Relay page cap used for large history syncs.
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

// NOTE(review): looks like an error-message fragment matched when a blossom
// push fails; the code that uses it is not in this part of the file.
const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob while pushing DAG";
76
/// Settings for the background Nostr mirror.
#[derive(Clone, Debug)]
pub struct NostrMirrorConfig {
    /// Relays the mirror reads from. The mirror does not run when this is empty.
    pub relays: Vec<String>,
    /// Relays used by the publishing client. No publishing client is created
    /// when this is empty.
    pub publish_relays: Vec<String>,
    // NOTE(review): presumably blossom servers that published trees are pushed
    // to; the code that reads this field is outside the visible section.
    pub blossom_write_servers: Vec<String>,
    /// Largest follow distance whose users are mirrored. The mirror does not
    /// run when this is `0`.
    pub max_follow_distance: u32,
    /// Threshold handed to the social graph's overmute check; users it flags
    /// are left out of the author list.
    pub overmute_threshold: f64,
    /// Author batch size for history syncs (values below 1 are treated as 1).
    pub author_batch_size: usize,
    /// Number of authors handled per history-sync chunk (values below 1 are
    /// treated as 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit for history syncs (values below 1 are treated
    /// as 1).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum number of authors fetched per missing-profile backfill.
    /// `0` disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    // NOTE(review): presumably the timeout for relay fetches; not read in the
    // visible section.
    pub fetch_timeout: Duration,
    /// When set, applied as the relay client's maximum event size.
    pub relay_event_max_size: Option<u32>,
    /// Reported as `negentropy_only` in the mirror's log lines.
    // NOTE(review): how it influences fetching is decided outside the visible
    // section.
    pub require_negentropy: bool,
    /// Event kinds requested by regular history syncs.
    pub kinds: Vec<u16>,
    /// Run the history sync once when the mirror starts.
    pub history_sync_on_start: bool,
    /// Run a catch-up history sync when a relay becomes connected again.
    pub history_sync_on_reconnect: bool,
    /// Follow distance up to which full text-note history is fetched; `None`
    /// turns that sync off. Capped at `max_follow_distance` when used.
    pub full_text_note_history_follow_distance: Option<u32>,
    /// Relay page cap passed to full text-note history syncs.
    pub full_text_note_history_max_relay_pages: usize,
    // NOTE(review): the three names below are presumably the tree names the
    // mirror publishes its roots under; they are not read in the visible section.
    pub published_event_tree_name: Option<String>,
    pub published_profile_search_tree_name: Option<String>,
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
100
101impl Default for NostrMirrorConfig {
102    fn default() -> Self {
103        Self {
104            relays: Vec::new(),
105            publish_relays: Vec::new(),
106            blossom_write_servers: Vec::new(),
107            max_follow_distance: 2,
108            overmute_threshold: 1.0,
109            author_batch_size: 256,
110            history_sync_author_chunk_size: 5_000,
111            history_sync_per_author_event_limit: 256,
112            missing_profile_backfill_batch_size: 5_000,
113            fetch_timeout: Duration::from_secs(15),
114            relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
115            require_negentropy: false,
116            kinds: DEFAULT_HISTORY_KINDS.to_vec(),
117            history_sync_on_start: true,
118            history_sync_on_reconnect: true,
119            full_text_note_history_follow_distance: Some(
120                DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
121            ),
122            full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
123            published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
124            published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
125            published_profiles_by_pubkey_tree_name: Some(
126                DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
127            ),
128        }
129    }
130}
131
132#[derive(Debug, Default)]
133struct RootPublishState {
134    pending_root: Option<hashtree_core::Cid>,
135    last_changed_at: Option<Instant>,
136    dirty_since: Option<Instant>,
137    last_published_root: Option<hashtree_core::Cid>,
138    last_published_at: Option<Instant>,
139    last_published_created_at: Option<Timestamp>,
140    last_uploaded_root: Option<hashtree_core::Cid>,
141    last_uploaded_at: Option<Instant>,
142}
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145struct HistorySyncPlan {
146    relay_fetch_mode: RelayFetchMode,
147    author_batch_size: usize,
148    per_author_event_limit: usize,
149    relay_page_size: usize,
150    max_relay_pages: usize,
151}
152
153pub struct BackgroundNostrMirror {
154    config: NostrMirrorConfig,
155    store: Arc<HashtreeStore>,
156    graph_store: Arc<SocialGraphStore>,
157    client: Client,
158    publish_client: Option<Client>,
159    publish_pubkey: Option<PublicKey>,
160    event_publish_state: Mutex<RootPublishState>,
161    profile_search_publish_state: Mutex<RootPublishState>,
162    profiles_by_pubkey_publish_state: Mutex<RootPublishState>,
163    pending_live_events: Mutex<BTreeMap<String, Event>>,
164    missing_profile_cursor: Mutex<usize>,
165    shutdown_tx: watch::Sender<bool>,
166    shutdown_rx: watch::Receiver<bool>,
167}
168
169impl BackgroundNostrMirror {
170    pub async fn new(
171        config: NostrMirrorConfig,
172        store: Arc<HashtreeStore>,
173        graph_store: Arc<SocialGraphStore>,
174        publish_keys: Option<Keys>,
175    ) -> Result<Self> {
176        let client = if let Some(max_size) = config.relay_event_max_size {
177            let mut limits = RelayLimits::default();
178            limits.events.max_size = Some(max_size);
179            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
180        } else {
181            Client::new(Keys::generate())
182        };
183        for relay in &config.relays {
184            client
185                .add_relay(relay)
186                .await
187                .with_context(|| format!("add mirror relay {relay}"))?;
188        }
189        client.connect().await;
190
191        let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
192        let publish_client = if let Some(keys) = publish_keys {
193            if config.publish_relays.is_empty() {
194                None
195            } else {
196                let client = Client::new(keys);
197                for relay in &config.publish_relays {
198                    client
199                        .add_relay(relay)
200                        .await
201                        .with_context(|| format!("add mirror publish relay {relay}"))?;
202                }
203                client.connect().await;
204                Some(client)
205            }
206        } else {
207            None
208        };
209
210        let (shutdown_tx, shutdown_rx) = watch::channel(false);
211        Ok(Self {
212            config,
213            store,
214            graph_store,
215            client,
216            publish_client,
217            publish_pubkey,
218            event_publish_state: Mutex::new(RootPublishState::default()),
219            profile_search_publish_state: Mutex::new(RootPublishState::default()),
220            profiles_by_pubkey_publish_state: Mutex::new(RootPublishState::default()),
221            pending_live_events: Mutex::new(BTreeMap::new()),
222            missing_profile_cursor: Mutex::new(0),
223            shutdown_tx,
224            shutdown_rx,
225        })
226    }
227
228    pub fn shutdown(&self) {
229        let _ = self.shutdown_tx.send(true);
230    }
231
232    fn sync_publish_roots_from_store(&self) -> Result<()> {
233        self.note_public_events_root_change()?;
234        self.note_profile_search_root_change()?;
235        self.note_profiles_by_pubkey_root_change()?;
236        Ok(())
237    }
238
239    async fn publish_pending_roots(
240        &self,
241        force_event: bool,
242        force_profile_search: bool,
243        force_profiles_by_pubkey: bool,
244    ) -> (Result<()>, Result<()>, Result<()>) {
245        tokio::join!(
246            self.maybe_publish_event_root(force_event),
247            self.maybe_publish_profile_search_root(force_profile_search),
248            self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
249        )
250    }
251
252    async fn publish_priority_roots(
253        &self,
254        force_event: bool,
255        force_profile_search: bool,
256        force_profiles_by_pubkey: bool,
257    ) -> (Result<()>, Result<()>, Result<()>) {
258        let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
259            async {
260                if force_profile_search {
261                    self.maybe_publish_profile_search_root(true).await
262                } else {
263                    Ok(())
264                }
265            },
266            async {
267                if force_profiles_by_pubkey {
268                    self.maybe_publish_profiles_by_pubkey_root(true).await
269                } else {
270                    Ok(())
271                }
272            },
273        );
274        let event_result = if force_event {
275            self.maybe_publish_event_root(true).await
276        } else {
277            Ok(())
278        };
279        (
280            event_result,
281            profile_search_result,
282            profiles_by_pubkey_result,
283        )
284    }
285
286    pub async fn run(&self) -> Result<()> {
287        if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
288            return Ok(());
289        }
290
291        info!(
292            "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
293            self.config.relays.len(),
294            self.config.max_follow_distance,
295            self.config.require_negentropy,
296            self.config.kinds,
297            self.config.history_sync_author_chunk_size.max(1),
298            self.config.history_sync_on_start,
299            self.config.history_sync_on_reconnect
300        );
301
302        tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
303        tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
304        let live_since = Timestamp::now();
305        self.sync_publish_roots_from_store()?;
306
307        let initial_authors = self.collect_authors()?;
308        if initial_authors.is_empty() {
309            info!("Nostr mirror: no social-graph authors to mirror yet");
310        }
311
312        let mut subscribed_authors = HashSet::new();
313        self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
314            .await?;
315
316        if !initial_authors.is_empty() && self.config.history_sync_on_start {
317            self.history_sync_full_text_notes_for_reachable_authors()
318                .await?;
319            if self.should_backfill_missing_profiles(None) {
320                let missing_profile_authors = self.collect_missing_profile_authors(
321                    self.config.missing_profile_backfill_batch_size,
322                )?;
323                if !missing_profile_authors.is_empty() {
324                    info!(
325                        "Nostr mirror missing-profile backfill starting: authors={}",
326                        missing_profile_authors.len()
327                    );
328                    self.history_sync_authors_with_kinds(
329                        missing_profile_authors,
330                        &[Kind::Metadata.as_u16()],
331                    )
332                    .await?;
333                }
334            }
335            self.history_sync_authors(initial_authors.clone()).await?;
336        }
337
338        let mut relay_statuses = self.capture_relay_statuses().await;
339        let mut last_reconnect_history_sync_at: Option<Instant> = None;
340        let mut last_missing_profile_backfill_at: Option<Instant> = None;
341        let mut notifications = self.client.notifications();
342        let mut shutdown_rx = self.shutdown_rx.clone();
343        let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
344        let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
345
346        loop {
347            tokio::select! {
348                _ = shutdown_rx.changed() => {
349                    if *shutdown_rx.borrow() {
350                        break;
351                    }
352                }
353                _ = refresh_interval.tick() => {
354                    let authors = self.collect_authors()?;
355                    let new_authors = authors
356                        .into_iter()
357                        .filter(|author| !subscribed_authors.contains(author))
358                        .collect::<Vec<_>>();
359                    if !new_authors.is_empty() {
360                        debug!(
361                            "Nostr mirror discovered {} newly reachable author(s)",
362                            new_authors.len()
363                        );
364                        self.history_sync_full_text_notes_for_authors(new_authors.clone())
365                            .await?;
366                        self.history_sync_authors(new_authors.clone()).await?;
367                        self.subscribe_authors_since(
368                            &new_authors,
369                            Timestamp::now(),
370                            &mut subscribed_authors,
371                        )
372                        .await?;
373                    }
374                    if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
375                        let missing_profile_authors = self.collect_missing_profile_authors(
376                            self.config.missing_profile_backfill_batch_size,
377                        )?;
378                        if !missing_profile_authors.is_empty() {
379                            info!(
380                                "Nostr mirror missing-profile backfill starting: authors={}",
381                                missing_profile_authors.len()
382                            );
383                            self.history_sync_authors_with_kinds(
384                                missing_profile_authors,
385                                &[Kind::Metadata.as_u16()],
386                            )
387                            .await?;
388                            last_missing_profile_backfill_at = Some(Instant::now());
389                        }
390                    }
391                }
392                _ = publish_interval.tick() => {
393                    self.sync_publish_roots_from_store()?;
394                    if let Err(err) = self.flush_live_events().await {
395                        warn!("Nostr mirror live event flush failed: {:#}", err);
396                    }
397                    let (event_result, profile_search_result, profiles_by_pubkey_result) = self
398                        .publish_pending_roots(false, false, false)
399                        .await;
400                    if let Err(err) = event_result {
401                        warn!("Nostr mirror event-root publish failed: {:#}", err);
402                    }
403                    if let Err(err) = profile_search_result {
404                        warn!("Nostr mirror profile-search publish failed: {:#}", err);
405                    }
406                    if let Err(err) = profiles_by_pubkey_result {
407                        warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
408                    }
409                }
410                notification = notifications.recv() => {
411                    match notification {
412                        Ok(RelayPoolNotification::Event { event, .. }) => {
413                            self.ingest_live_event(&event)?;
414                        }
415                        Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
416                            let relay_url = relay_url.to_string();
417                            let previous = relay_statuses.insert(relay_url.clone(), status);
418                            if Self::should_history_sync_on_reconnect(
419                                self.config.history_sync_on_reconnect,
420                                previous,
421                                status,
422                            ) && Self::should_run_reconnect_history_sync(
423                                    last_reconnect_history_sync_at.as_ref(),
424                                )
425                            {
426                                let authors = self.collect_authors()?;
427                                if !authors.is_empty() {
428                                    info!(
429                                        "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
430                                        relay_url,
431                                        authors.len(),
432                                        self.config.require_negentropy
433                                    );
434                                    self.history_sync_authors(authors).await?;
435                                    last_reconnect_history_sync_at = Some(Instant::now());
436                                }
437                            }
438                        }
439                        Ok(RelayPoolNotification::Shutdown) => break,
440                        Ok(_) => {}
441                        Err(err) => {
442                            warn!("Nostr mirror notification error: {}", err);
443                            break;
444                        }
445                    }
446                }
447            }
448        }
449
450        if let Err(err) = self.flush_live_events().await {
451            warn!(
452                "Nostr mirror live event flush failed during shutdown: {:#}",
453                err
454            );
455        }
456        if let Err(err) = self.sync_publish_roots_from_store() {
457            warn!(
458                "Nostr mirror root-state refresh failed during shutdown: {:#}",
459                err
460            );
461        }
462        let (event_result, profile_search_result, profiles_by_pubkey_result) =
463            self.publish_pending_roots(true, true, true).await;
464        if let Err(err) = event_result {
465            warn!(
466                "Nostr mirror event-root publish failed during shutdown: {:#}",
467                err
468            );
469        }
470        if let Err(err) = profile_search_result {
471            warn!(
472                "Nostr mirror profile-search publish failed during shutdown: {:#}",
473                err
474            );
475        }
476        if let Err(err) = profiles_by_pubkey_result {
477            warn!(
478                "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
479                err
480            );
481        }
482        let _ = self.client.disconnect().await;
483        if let Some(client) = self.publish_client.as_ref() {
484            let _ = client.disconnect().await;
485        }
486        Ok(())
487    }
488
489    async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
490        let mut statuses = HashMap::new();
491        for (relay_url, relay) in self.client.relays().await {
492            statuses.insert(relay_url.to_string(), relay.status().await);
493        }
494        statuses
495    }
496
497    async fn has_connected_publish_relay(&self) -> bool {
498        let Some(client) = self.publish_client.as_ref() else {
499            return false;
500        };
501        Self::client_has_connected_relay(client).await
502    }
503
504    async fn client_has_connected_relay(client: &Client) -> bool {
505        for (_relay_url, relay) in client.relays().await {
506            if relay.status().await == RelayStatus::Connected {
507                return true;
508            }
509        }
510        false
511    }
512
513    fn collect_authors(&self) -> Result<Vec<String>> {
514        self.collect_authors_with_max_distance(self.config.max_follow_distance)
515    }
516
517    fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
518        let mut authors = Vec::new();
519        let mut seen = HashSet::new();
520        for distance in 0..=max_distance {
521            for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
522                self.graph_store.as_ref(),
523                distance,
524            )
525            .with_context(|| format!("load social-graph distance {distance}"))?
526            {
527                if self
528                    .graph_store
529                    .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
530                {
531                    continue;
532                }
533                let hex = hex::encode(pubkey);
534                if seen.insert(hex.clone()) {
535                    authors.push(hex);
536                }
537            }
538        }
539        Ok(authors)
540    }
541
542    fn full_text_note_history_follow_distance(&self) -> Option<u32> {
543        let distance = self.config.full_text_note_history_follow_distance?;
544        if self
545            .config
546            .kinds
547            .iter()
548            .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
549        {
550            Some(distance.min(self.config.max_follow_distance))
551        } else {
552            None
553        }
554    }
555
556    fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
557        if limit == 0 {
558            return Ok(Vec::new());
559        }
560
561        let authors = self.collect_authors()?;
562        if authors.is_empty() {
563            return Ok(Vec::new());
564        }
565
566        let mut cursor = self
567            .missing_profile_cursor
568            .lock()
569            .expect("missing profile cursor");
570        let mut index = (*cursor).min(authors.len());
571        let mut scanned = 0usize;
572        let mut missing = Vec::new();
573
574        while scanned < authors.len() && missing.len() < limit {
575            let author = &authors[index];
576            if self.graph_store.latest_profile_event(author)?.is_none() {
577                missing.push(author.clone());
578            }
579            index += 1;
580            if index == authors.len() {
581                index = 0;
582            }
583            scanned += 1;
584        }
585
586        *cursor = index;
587        Ok(missing)
588    }
589
590    fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
591        if self.config.missing_profile_backfill_batch_size == 0
592            || !self.config.kinds.contains(&Kind::Metadata.as_u16())
593        {
594            return false;
595        }
596        match last_run {
597            Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
598            None => true,
599        }
600    }
601
602    fn should_history_sync_on_reconnect(
603        history_sync_on_reconnect: bool,
604        previous: Option<RelayStatus>,
605        status: RelayStatus,
606    ) -> bool {
607        history_sync_on_reconnect
608            && status == RelayStatus::Connected
609            && matches!(
610                previous,
611                Some(
612                    RelayStatus::Initialized
613                        | RelayStatus::Pending
614                        | RelayStatus::Connecting
615                        | RelayStatus::Disconnected
616                        | RelayStatus::Terminated
617                )
618            )
619    }
620
621    fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
622        match last_run {
623            None => true,
624            Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
625        }
626    }
627
628    fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
629        !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
630    }
631
632    fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
633        kinds.is_empty()
634            || kinds.iter().any(|kind| {
635                *kind == Kind::Metadata.as_u16()
636                    || *kind == Kind::ContactList.as_u16()
637                    || *kind == Kind::MuteList.as_u16()
638            })
639    }
640
641    fn history_sync_plan_for(
642        config: &NostrMirrorConfig,
643        authors: usize,
644        kinds: &[u16],
645    ) -> HistorySyncPlan {
646        let author_batch_size = config.author_batch_size.max(1);
647        let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
648        let relay_page_size = 1_000;
649        let max_relay_pages = 10;
650
651        if Self::is_metadata_only_history_sync(kinds) {
652            return HistorySyncPlan {
653                relay_fetch_mode: RelayFetchMode::AuthorBatches,
654                author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
655                per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
656                relay_page_size,
657                max_relay_pages,
658            };
659        }
660
661        if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
662            return HistorySyncPlan {
663                relay_fetch_mode: RelayFetchMode::GlobalRecent,
664                author_batch_size,
665                per_author_event_limit: per_author_event_limit
666                    .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
667                    .max(1),
668                relay_page_size,
669                max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
670            };
671        }
672
673        HistorySyncPlan {
674            relay_fetch_mode: RelayFetchMode::AuthorBatches,
675            author_batch_size,
676            per_author_event_limit,
677            relay_page_size,
678            max_relay_pages,
679        }
680    }
681
682    fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
683        Self::history_sync_plan_for(&self.config, authors, kinds)
684    }
685
686    async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
687        self.history_sync_authors_with_kinds(authors, &self.config.kinds)
688            .await
689    }
690
691    async fn history_sync_authors_with_kinds(
692        &self,
693        authors: Vec<String>,
694        kinds: &[u16],
695    ) -> Result<()> {
696        self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
697            .await
698    }
699
700    async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
701        let Some(distance) = self.full_text_note_history_follow_distance() else {
702            return Ok(());
703        };
704        info!(
705            "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
706        );
707        let authors = self.collect_authors_with_max_distance(distance)?;
708        self.history_sync_full_text_notes_for_authors(authors).await
709    }
710
711    async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
712        let Some(distance) = self.full_text_note_history_follow_distance() else {
713            return Ok(());
714        };
715        let mut close_authors = Vec::new();
716        for author in authors {
717            let Ok(pubkey) = hex::decode(&author) else {
718                continue;
719            };
720            let Ok(pubkey) = <[u8; 32]>::try_from(pubkey.as_slice()) else {
721                continue;
722            };
723            if self
724                .graph_store
725                .follow_distance(&pubkey)?
726                .is_some_and(|actual_distance| actual_distance <= distance)
727            {
728                close_authors.push(author);
729            }
730        }
731        if close_authors.is_empty() {
732            return Ok(());
733        }
734
735        info!(
736            "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
737            close_authors.len(),
738            distance,
739            self.config.full_text_note_history_max_relay_pages
740        );
741        let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
742        self.history_sync_authors_with_kinds_and_mode(
743            close_authors,
744            &kinds,
745            true,
746            Some(self.config.full_text_note_history_max_relay_pages),
747        )
748        .await
749    }
750
751    async fn history_sync_authors_with_kinds_and_mode(
752        &self,
753        authors: Vec<String>,
754        kinds: &[u16],
755        full_author_history: bool,
756        max_relay_pages: Option<usize>,
757    ) -> Result<()> {
758        let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
759        self.history_sync_authors_chunked(
760            authors,
761            |current_root, author_chunk| async move {
762                self.history_sync_author_chunk(
763                    current_root,
764                    author_chunk,
765                    kinds,
766                    full_author_history,
767                    max_relay_pages,
768                )
769                .await
770            },
771            update_profile_and_graph,
772        )
773        .await
774    }
775
776    async fn history_sync_authors_chunked<F, Fut>(
777        &self,
778        authors: Vec<String>,
779        mut run_chunk: F,
780        update_profile_and_graph: bool,
781    ) -> Result<()>
782    where
783        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
784        Fut: std::future::Future<Output = Result<CrawlReport>>,
785    {
786        if authors.is_empty() {
787            return Ok(());
788        }
789
790        info!(
791            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
792            authors.len(),
793            self.config.relays.len(),
794            self.config.require_negentropy
795        );
796
797        let mut current_root = self.graph_store.public_events_root_for_write()?;
798        let mut last_error = None;
799        let mut applied_chunks = 0usize;
800        let mut failed_chunks = 0usize;
801        let chunk_size = self.config.history_sync_author_chunk_size.max(1);
802        let total_chunks = authors.len().div_ceil(chunk_size);
803
804        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
805            let author_chunk = author_chunk.to_vec();
806            let author_count = author_chunk.len();
807            info!(
808                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
809                chunk_index + 1,
810                total_chunks,
811                author_count
812            );
813            let report = match run_chunk(current_root.clone(), author_chunk).await {
814                Ok(report) => report,
815                Err(err) => {
816                    failed_chunks = failed_chunks.saturating_add(1);
817                    warn!(
818                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
819                        chunk_index + 1,
820                        total_chunks,
821                        author_count,
822                        err
823                    );
824                    last_error = Some(err);
825                    continue;
826                }
827            };
828
829            if report.root != current_root {
830                self.apply_history_root_with_options(
831                    report.root.as_ref(),
832                    update_profile_and_graph,
833                )
834                .await?;
835                current_root = report.root.clone();
836                info!(
837                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
838                    chunk_index + 1,
839                    total_chunks,
840                    report.authors_processed,
841                    report.events_selected,
842                    report.events_seen
843                );
844            }
845            applied_chunks = applied_chunks.saturating_add(1);
846        }
847
848        if applied_chunks == 0 {
849            return Err(last_error
850                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
851                .context("run mirror history sync"));
852        }
853        if failed_chunks > 0 {
854            warn!(
855                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
856                applied_chunks, failed_chunks
857            );
858        }
859        Ok(())
860    }
861
862    async fn history_sync_author_chunk(
863        &self,
864        current_root: Option<hashtree_core::Cid>,
865        authors: Vec<String>,
866        kinds: &[u16],
867        full_author_history: bool,
868        max_relay_pages: Option<usize>,
869    ) -> Result<CrawlReport> {
870        let mut last_error = None;
871        let mut report = None;
872        let mut plan = self.history_sync_plan(authors.len(), kinds);
873        if full_author_history {
874            plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
875            plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
876        }
877        for attempt in 0..3 {
878            let mut last_logged_authors = 0usize;
879            let bridge = NostrBridge::new(
880                self.store.store_arc(),
881                CrawlConfig {
882                    relays: self.config.relays.clone(),
883                    author_allowlist: Some(authors.clone()),
884                    max_live_bytes: None,
885                    max_events_seen: None,
886                    max_authors: None,
887                    max_follow_distance: None,
888                    author_batch_size: plan.author_batch_size,
889                    per_author_event_limit: plan.per_author_event_limit,
890                    per_author_live_bytes: None,
891                    fetch_timeout: self.config.fetch_timeout,
892                    kinds: Some(kinds.to_vec()),
893                    relay_fetch_mode: plan.relay_fetch_mode,
894                    require_negentropy: self.config.require_negentropy,
895                    relay_event_max_size: self.config.relay_event_max_size,
896                    relay_page_size: plan.relay_page_size,
897                    max_relay_pages: plan.max_relay_pages,
898                    full_author_history,
899                },
900            );
901
902            match bridge
903                .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
904                    let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
905                    let should_log = progress.authors_processed == progress.authors_considered
906                        || progress.authors_processed == 0
907                        || progress
908                            .authors_processed
909                            .saturating_sub(last_logged_authors)
910                            >= log_interval;
911                    if should_log {
912                        last_logged_authors = progress.authors_processed;
913                        info!(
914                            "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
915                            progress.authors_processed,
916                            progress.authors_considered,
917                            progress.events_selected,
918                            progress.events_seen
919                        );
920                    }
921                })
922                .await
923            {
924                Ok(next_report) => {
925                    report = Some(next_report);
926                    break;
927                }
928                Err(err) => {
929                    last_error = Some(err);
930                    if attempt < 2 {
931                        tokio::time::sleep(Duration::from_millis(500)).await;
932                    }
933                }
934            }
935        }
936        report
937            .ok_or_else(|| last_error.expect("history sync retry captured error"))
938            .context("run mirror history sync")
939    }
940
941    #[cfg(test)]
942    async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
943        self.apply_history_root_with_options(root, true).await
944    }
945
946    async fn apply_history_root_with_options(
947        &self,
948        root: Option<&hashtree_core::Cid>,
949        update_profile_and_graph: bool,
950    ) -> Result<()> {
951        self.graph_store.write_public_events_root(root)?;
952        let Some(root) = root else {
953            return Ok(());
954        };
955
956        self.note_public_events_root_change()?;
957        if update_profile_and_graph {
958            let event_store = NostrEventStore::new(self.store.store_arc());
959            let events = event_store
960                .list_recent_lossy(Some(root), ListEventsOptions::default())
961                .await
962                .context("list trusted mirrored events")?
963                .into_iter()
964                .map(socialgraph::stored_event_to_nostr_event)
965                .collect::<Result<Vec<_>>>()?;
966
967            self.graph_store
968                .rebuild_profile_index_for_events(&events)
969                .context("rebuild mirrored profile search index")?;
970            socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
971                .context("sync mirrored social graph state")?;
972            self.note_profile_search_root_change()?;
973            self.note_profiles_by_pubkey_root_change()?;
974        }
975        let (event_result, profile_search_result, profiles_by_pubkey_result) = self
976            .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
977            .await;
978        if let Err(err) = event_result {
979            warn!(
980                "Nostr mirror event-root publish failed after root update: {:#}",
981                err
982            );
983        }
984        if let Err(err) = profile_search_result {
985            warn!(
986                "Nostr mirror profile-search publish failed after root update: {:#}",
987                err
988            );
989        }
990        if let Err(err) = profiles_by_pubkey_result {
991            warn!(
992                "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
993                err
994            );
995        }
996        Ok(())
997    }
998
999    async fn subscribe_authors_since(
1000        &self,
1001        authors: &[String],
1002        since: Timestamp,
1003        subscribed_authors: &mut HashSet<String>,
1004    ) -> Result<()> {
1005        let new_authors = authors
1006            .iter()
1007            .filter(|author| !subscribed_authors.contains(*author))
1008            .cloned()
1009            .collect::<Vec<_>>();
1010        if new_authors.is_empty() {
1011            return Ok(());
1012        }
1013
1014        for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1015            let pubkeys = chunk
1016                .iter()
1017                .filter_map(|author| PublicKey::from_hex(author).ok())
1018                .collect::<Vec<_>>();
1019            if pubkeys.is_empty() {
1020                continue;
1021            }
1022
1023            let filter = Filter::new()
1024                .authors(pubkeys)
1025                .kinds(self.config.kinds.iter().copied().map(Kind::from))
1026                .since(since);
1027
1028            self.client
1029                .subscribe(vec![filter], None)
1030                .await
1031                .context("subscribe mirror author batch")?;
1032        }
1033
1034        subscribed_authors.extend(new_authors);
1035        Ok(())
1036    }
1037
1038    fn ingest_live_event(&self, event: &Event) -> Result<()> {
1039        self.pending_live_events
1040            .lock()
1041            .expect("pending live events")
1042            .insert(event.id.to_hex(), event.clone());
1043        Ok(())
1044    }
1045
1046    async fn flush_live_events(&self) -> Result<()> {
1047        let pending = {
1048            let mut pending = self
1049                .pending_live_events
1050                .lock()
1051                .expect("pending live events");
1052            if pending.is_empty() {
1053                return Ok(());
1054            }
1055            std::mem::take(&mut *pending)
1056        };
1057        let events = pending.into_values().collect::<Vec<_>>();
1058        let event_count = events.len();
1059        let previous_event_root = self.graph_store.public_events_root()?;
1060        let previous_profile_search_root = self.graph_store.profile_search_root()?;
1061        let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1062
1063        socialgraph::ingest_parsed_events_with_storage_class(
1064            self.graph_store.as_ref(),
1065            &events,
1066            socialgraph::EventStorageClass::Public,
1067        )
1068        .context("ingest live mirrored event batch")?;
1069
1070        let next_event_root = self.graph_store.public_events_root()?;
1071        let next_profile_search_root = self.graph_store.profile_search_root()?;
1072        let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1073        let event_root_changed = next_event_root != previous_event_root;
1074        let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1075        let profiles_by_pubkey_root_changed =
1076            next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1077
1078        if event_root_changed {
1079            self.note_public_events_root_change()?;
1080        }
1081        if profile_search_root_changed {
1082            self.note_profile_search_root_change()?;
1083        }
1084        if profiles_by_pubkey_root_changed {
1085            self.note_profiles_by_pubkey_root_change()?;
1086        }
1087        if profile_search_root_changed {
1088            self.maybe_publish_profile_search_root(true).await?;
1089        }
1090        if profiles_by_pubkey_root_changed {
1091            self.maybe_publish_profiles_by_pubkey_root(true).await?;
1092        }
1093        if event_root_changed {
1094            self.maybe_publish_event_root(true).await?;
1095        }
1096        info!(
1097            "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1098            event_count,
1099            event_root_changed,
1100            profile_search_root_changed,
1101            profiles_by_pubkey_root_changed
1102        );
1103        Ok(())
1104    }
1105
1106    fn note_public_events_root_change(&self) -> Result<()> {
1107        let root = self.graph_store.public_events_root()?;
1108        Self::note_root_change(
1109            self.config.published_event_tree_name.as_deref(),
1110            &self.event_publish_state,
1111            root,
1112        )
1113    }
1114
1115    fn note_profile_search_root_change(&self) -> Result<()> {
1116        let root = self.graph_store.profile_search_root()?;
1117        Self::note_root_change(
1118            self.config.published_profile_search_tree_name.as_deref(),
1119            &self.profile_search_publish_state,
1120            root,
1121        )
1122    }
1123
1124    fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1125        let root = self.graph_store.profiles_by_pubkey_root()?;
1126        Self::note_root_change(
1127            self.config
1128                .published_profiles_by_pubkey_tree_name
1129                .as_deref(),
1130            &self.profiles_by_pubkey_publish_state,
1131            root,
1132        )
1133    }
1134
1135    fn note_root_change(
1136        tree_name: Option<&str>,
1137        publish_state: &Mutex<RootPublishState>,
1138        root: Option<hashtree_core::Cid>,
1139    ) -> Result<()> {
1140        let Some(_tree_name) = tree_name else {
1141            return Ok(());
1142        };
1143
1144        let mut state = publish_state.lock().expect("root publish state");
1145        let now = Instant::now();
1146
1147        if state.pending_root == root {
1148            return Ok(());
1149        }
1150
1151        state.pending_root = root;
1152        state.last_changed_at = Some(now);
1153        if state.dirty_since.is_none() {
1154            state.dirty_since = Some(now);
1155        }
1156        Ok(())
1157    }
1158
1159    async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1160        self.ensure_public_events_root_is_publishable().await?;
1161        let result = self
1162            .maybe_publish_root(
1163                self.config.published_event_tree_name.as_deref(),
1164                &self.event_publish_state,
1165                "event root",
1166                force,
1167            )
1168            .await;
1169        let Err(error) = result else {
1170            return Ok(());
1171        };
1172        if !is_missing_local_blob_push_error(&error) {
1173            return Err(error);
1174        }
1175
1176        warn!(
1177            "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1178        );
1179        let (public_count, ambient_count) = self
1180            .graph_store
1181            .rebuild_event_indexes_from_stored_events_async()
1182            .await
1183            .context("rebuild event indexes after missing event blobs")?;
1184        info!(
1185            "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1186            public_count, ambient_count
1187        );
1188        self.sync_publish_roots_from_store()?;
1189
1190        self.maybe_publish_root(
1191            self.config.published_event_tree_name.as_deref(),
1192            &self.event_publish_state,
1193            "event root",
1194            force,
1195        )
1196        .await
1197    }
1198
1199    async fn ensure_public_events_root_is_publishable(&self) -> Result<()> {
1200        let Some(root) = self.graph_store.public_events_root()? else {
1201            return Ok(());
1202        };
1203        let event_store = NostrEventStore::new(self.store.store_arc());
1204        if let Err(err) = event_store.validate_index_root(Some(&root)).await {
1205            warn!(
1206                "Nostr mirror refusing to publish invalid event index root {}; clearing trusted root: {}",
1207                hex::encode(root.hash),
1208                err
1209            );
1210            self.graph_store.write_public_events_root(None)?;
1211            self.note_public_events_root_change()?;
1212        }
1213        Ok(())
1214    }
1215
1216    async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1217        self.maybe_publish_root(
1218            self.config.published_profile_search_tree_name.as_deref(),
1219            &self.profile_search_publish_state,
1220            "profile search root",
1221            force,
1222        )
1223        .await
1224    }
1225
1226    async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1227        self.maybe_publish_root(
1228            self.config
1229                .published_profiles_by_pubkey_tree_name
1230                .as_deref(),
1231            &self.profiles_by_pubkey_publish_state,
1232            "profiles-by-pubkey root",
1233            force,
1234        )
1235        .await
1236    }
1237
1238    async fn maybe_publish_root(
1239        &self,
1240        tree_name: Option<&str>,
1241        publish_state: &Mutex<RootPublishState>,
1242        log_label: &str,
1243        force: bool,
1244    ) -> Result<()> {
1245        let Some(tree_name) = tree_name else {
1246            return Ok(());
1247        };
1248
1249        let pending_root = {
1250            let state = publish_state.lock().expect("root publish state");
1251            let Some(pending_root) = state.pending_root.clone() else {
1252                return Ok(());
1253            };
1254
1255            let now = Instant::now();
1256            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1257                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1258            });
1259            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1260                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1261            });
1262            if !force && !debounce_ready && !stale_ready {
1263                return Ok(());
1264            }
1265
1266            pending_root
1267        };
1268
1269        let needs_upload = {
1270            let state = publish_state.lock().expect("root publish state");
1271            !self.config.blossom_write_servers.is_empty()
1272                && state.last_uploaded_root.as_ref() != Some(&pending_root)
1273        };
1274        if needs_upload {
1275            background_blossom_push_with_store(
1276                Arc::clone(&self.store),
1277                &pending_root.to_string(),
1278                &self.config.blossom_write_servers,
1279            )
1280            .await
1281            .with_context(|| format!("upload {log_label} DAG to Blossom"))?;
1282
1283            let mut state = publish_state.lock().expect("root publish state");
1284            if state.pending_root.as_ref() == Some(&pending_root) {
1285                state.last_uploaded_root = Some(pending_root.clone());
1286                state.last_uploaded_at = Some(Instant::now());
1287            }
1288        }
1289
1290        let mut successful_relays = Vec::new();
1291        let mut failed_relays = Vec::new();
1292        let publish_required =
1293            self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1294        if publish_required {
1295            let Some(publish_client) = self.publish_client.as_ref() else {
1296                unreachable!("publish_required implies publish_client");
1297            };
1298            if !self.has_connected_publish_relay().await {
1299                return Ok(());
1300            }
1301
1302            let already_published = {
1303                let state = publish_state.lock().expect("root publish state");
1304                state.last_published_root.as_ref() == Some(&pending_root)
1305            };
1306            if !already_published {
1307                let publish_relays = self.config.publish_relays.clone();
1308                let latest_known_created_at = {
1309                    let state = publish_state.lock().expect("root publish state");
1310                    state.last_published_created_at
1311                };
1312                let publish_created_at = next_replaceable_created_at(
1313                    Timestamp::now(),
1314                    later_timestamp(
1315                        latest_known_created_at,
1316                        self.latest_root_event_created_at(tree_name).await,
1317                    ),
1318                );
1319                let event = publish_client
1320                    .sign_event_builder(Self::build_public_root_event(
1321                        tree_name,
1322                        &pending_root,
1323                        publish_created_at,
1324                    ))
1325                    .await
1326                    .with_context(|| format!("sign {log_label} event"))?;
1327                let publish_result = self
1328                    .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1329                    .await
1330                    .with_context(|| format!("publish {log_label} event"))?;
1331                successful_relays = publish_result.0;
1332                failed_relays = publish_result.1;
1333                if successful_relays.is_empty() {
1334                    let failure_summary = if failed_relays.is_empty() {
1335                        "no publish relays accepted the event".to_string()
1336                    } else {
1337                        failed_relays.join("; ")
1338                    };
1339                    anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1340                }
1341
1342                let mut state = publish_state.lock().expect("root publish state");
1343                if state.pending_root.as_ref() == Some(&pending_root) {
1344                    state.last_published_root = Some(pending_root.clone());
1345                    state.last_published_at = Some(Instant::now());
1346                    state.last_published_created_at = Some(event.created_at);
1347                }
1348            }
1349        }
1350
1351        {
1352            let mut state = publish_state.lock().expect("root publish state");
1353            if state.pending_root.as_ref() == Some(&pending_root) {
1354                let upload_satisfied = self.config.blossom_write_servers.is_empty()
1355                    || state.last_uploaded_root.as_ref() == Some(&pending_root);
1356                let publish_satisfied =
1357                    !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1358                if upload_satisfied && publish_satisfied {
1359                    state.dirty_since = None;
1360                }
1361            }
1362        }
1363
1364        info!(
1365            "Nostr mirror published {}: tree={} hash={} relays={:?}",
1366            log_label,
1367            tree_name,
1368            hex::encode(pending_root.hash),
1369            successful_relays,
1370        );
1371        if !failed_relays.is_empty() {
1372            warn!(
1373                "Nostr mirror publish had relay failures: tree={} failures={:?}",
1374                tree_name, failed_relays
1375            );
1376        }
1377        Ok(())
1378    }
1379
1380    async fn publish_root_event_to_relays(
1381        &self,
1382        publish_client: &Client,
1383        relays: &[String],
1384        event: &Event,
1385    ) -> Result<(Vec<String>, Vec<String>)> {
1386        let mut successful_relays = Vec::new();
1387        let mut failed_relays = Vec::new();
1388
1389        for relay in relays {
1390            match publish_client
1391                .send_event_to([relay.as_str()], event.clone())
1392                .await
1393            {
1394                Ok(output) => {
1395                    if output.success.is_empty() {
1396                        failed_relays.push(format!("{relay}: relay did not acknowledge publish"));
1397                        continue;
1398                    }
1399                    successful_relays.push(relay.clone());
1400                    failed_relays.extend(output.failed.into_iter().map(
1401                        |(url, reason)| match reason {
1402                            Some(reason) => format!("{url}: {reason}"),
1403                            None => format!("{url}: relay rejected publish"),
1404                        },
1405                    ));
1406                }
1407                Err(err) => {
1408                    failed_relays.push(format!("{relay}: {err}"));
1409                }
1410            }
1411        }
1412
1413        Ok((successful_relays, failed_relays))
1414    }
1415
1416    async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1417        let publish_client = self.publish_client.as_ref()?;
1418        let author = self.publish_pubkey?;
1419        let events = publish_client
1420            .get_events_of(
1421                vec![Self::build_public_root_filter(author, tree_name)],
1422                EventSource::relays(Some(self.config.fetch_timeout)),
1423            )
1424            .await
1425            .ok()?;
1426        events
1427            .iter()
1428            .filter(|event| Self::matches_public_root_event(event, tree_name))
1429            .max_by_key(|event| (event.created_at, event.id))
1430            .map(|event| event.created_at)
1431    }
1432
1433    fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1434        Filter::new()
1435            .kind(Kind::Custom(30078))
1436            .author(author)
1437            .custom_tag(
1438                SingleLetterTag::lowercase(Alphabet::D),
1439                vec![tree_name.to_string()],
1440            )
1441            .custom_tag(
1442                SingleLetterTag::lowercase(Alphabet::L),
1443                vec!["hashtree".to_string()],
1444            )
1445            .limit(50)
1446    }
1447
1448    fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1449        event.kind == Kind::Custom(30078)
1450            && event.tags.iter().any(|tag| {
1451                let values = tag.as_slice();
1452                values.first().is_some_and(|value| value == "d")
1453                    && values.get(1).is_some_and(|value| value == tree_name)
1454            })
1455            && event.tags.iter().any(|tag| {
1456                let values = tag.as_slice();
1457                values.first().is_some_and(|value| value == "l")
1458                    && values.get(1).is_some_and(|value| value == "hashtree")
1459            })
1460    }
1461
1462    fn build_public_root_event(
1463        tree_name: &str,
1464        cid: &hashtree_core::Cid,
1465        created_at: Timestamp,
1466    ) -> EventBuilder {
1467        let mut tags = vec![
1468            Tag::identifier(tree_name.to_string()),
1469            Tag::custom(
1470                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1471                vec!["hashtree"],
1472            ),
1473            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1474        ];
1475        if let Some(key) = cid.key {
1476            tags.push(Tag::custom(
1477                TagKind::Custom("key".into()),
1478                vec![hex::encode(key)],
1479            ));
1480        }
1481
1482        EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1483    }
1484}
1485
1486fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
1487    error
1488        .chain()
1489        .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
1490}
1491
1492fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1493    match (left, right) {
1494        (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1495        (Some(left), None) => Some(left),
1496        (None, Some(right)) => Some(right),
1497        (None, None) => None,
1498    }
1499}
1500
1501fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1502    match latest_existing {
1503        Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1504        _ => now,
1505    }
1506}
1507
1508#[cfg(test)]
1509mod tests;