1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13 Timestamp,
14};
15use nostr_sdk::{
16 pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17 RelayStatus,
18};
19use tokio::sync::{watch, Mutex as AsyncMutex};
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// Timer constants come in pairs: the `not(test)` value is used in normal builds and a much
// shorter value is compiled in under `cfg(test)` so the mirror's timers elapse quickly in tests.

/// Initial sleep at the start of `BackgroundNostrMirror::run`, before any root publishing or
/// subscribing happens.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// Second sleep in `run`, taken right after `MIRROR_STARTUP_DELAY`; presumably gives the relay
/// connections opened in `new` time to settle (TODO confirm).
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

/// Period of the run loop's author refresh, which subscribes newly reachable authors and
/// checks whether a missing-profile backfill is due.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

/// Minimum time between two catch-up history syncs triggered by relay reconnects.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

/// Event kind 30023 (long-form content); synced together with kind-1 text notes by the
/// text-content history syncs.
const KIND_LONG_FORM_CONTENT: u16 = 30_023;
/// Event kinds mirrored by default (`NostrMirrorConfig::kinds`).
const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
/// Default names of the published trees (`NostrMirrorConfig::published_*_tree_name`).
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
/// Plan limits used by `history_sync_plan_for` when a sync asks for metadata (kind 0) only:
/// one event per author, and author batches capped at this size.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
/// Default follow distance up to which text-content history is synced.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
/// Default relay-page cap for the full text-content history sync; 0 disables that sync.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
/// When a sync covers more than `author_batch_size * LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER`
/// authors, `history_sync_plan_for` switches to `RelayFetchMode::GlobalRecent` with the
/// per-author event and relay-page limits below.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

/// Minimum time between two missing-profile backfills started from the run loop.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

/// Period of the run loop's flush-live-events-and-publish-roots tick.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

/// Not referenced in the visible part of this file; presumably an upper bound on how long a
/// changed root may stay unpublished — TODO confirm against its users.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);

/// Not referenced in the visible part of this file; presumably the wait before retrying a
/// failed root upload — TODO confirm against its users.
#[cfg(not(test))]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
#[cfg(test)]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Not referenced in the visible part of this file; presumably matched against Blossom push
/// error text to detect missing local blobs — TODO confirm.
const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
81
/// Configuration for [`BackgroundNostrMirror`].
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relays the read client connects to; when empty, `run` returns immediately.
    pub relays: Vec<String>,
    /// Relays used by the separate publish client; no publish client is created when empty.
    pub publish_relays: Vec<String>,
    /// Blossom servers to write to (not used in the visible part of this file).
    pub blossom_write_servers: Vec<String>,
    /// Largest social-graph follow distance whose authors are mirrored; 0 disables `run`.
    pub max_follow_distance: u32,
    /// Threshold passed to `is_overmuted_user`; overmuted authors are never mirrored.
    pub overmute_threshold: f64,
    /// Authors per relay batch in a history sync plan (clamped to at least 1).
    pub author_batch_size: usize,
    /// Authors per history-sync chunk (clamped to at least 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit of a normal history sync plan (clamped to at least 1).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors per missing-profile backfill; 0 disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Relay fetch timeout (not used in the visible part of this file).
    pub fetch_timeout: Duration,
    /// Maximum accepted event size for the read client; `None` keeps the client's default limits.
    pub relay_event_max_size: Option<u32>,
    /// Reported as `negentropy_only` in the logs; its use is not in the visible part of this file.
    pub require_negentropy: bool,
    /// Event kinds mirrored by history syncs.
    pub kinds: Vec<u16>,
    /// Whether `run` spawns a history sync for the initial authors.
    pub history_sync_on_start: bool,
    /// Whether a relay reconnect triggers a catch-up history sync.
    pub history_sync_on_reconnect: bool,
    /// Follow distance up to which text-content history is synced; `None` disables it.
    pub full_text_note_history_follow_distance: Option<u32>,
    /// Relay-page cap for the full text-content history sync; 0 skips that sync.
    pub full_text_note_history_max_relay_pages: usize,
    /// Published tree names (their use is not in the visible part of this file; presumably
    /// `None` disables publishing that tree — TODO confirm).
    pub published_event_tree_name: Option<String>,
    pub published_profile_search_tree_name: Option<String>,
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
105
106impl Default for NostrMirrorConfig {
107 fn default() -> Self {
108 Self {
109 relays: Vec::new(),
110 publish_relays: Vec::new(),
111 blossom_write_servers: Vec::new(),
112 max_follow_distance: 2,
113 overmute_threshold: 1.0,
114 author_batch_size: 256,
115 history_sync_author_chunk_size: 5_000,
116 history_sync_per_author_event_limit: 256,
117 missing_profile_backfill_batch_size: 5_000,
118 fetch_timeout: Duration::from_secs(15),
119 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
120 require_negentropy: false,
121 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
122 history_sync_on_start: true,
123 history_sync_on_reconnect: true,
124 full_text_note_history_follow_distance: Some(
125 DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
126 ),
127 full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
128 published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
129 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
130 published_profiles_by_pubkey_tree_name: Some(
131 DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
132 ),
133 }
134 }
135}
136
/// Bookkeeping for one published tree root. The mirror keeps one instance each for the event
/// index, the profile-search tree and the profiles-by-pubkey tree. The fields are read and
/// written outside the visible part of this file; the notes below follow their names and
/// should be confirmed against those users.
#[derive(Debug, Default)]
struct RootPublishState {
    // Root waiting to be published, with when it last changed / first became dirty.
    pending_root: Option<hashtree_core::Cid>,
    last_changed_at: Option<Instant>,
    dirty_since: Option<Instant>,
    // Most recently published root and its publish time / event timestamp.
    last_published_root: Option<hashtree_core::Cid>,
    last_published_at: Option<Instant>,
    last_published_created_at: Option<Timestamp>,
    // Upload (presumably Blossom) progress for the root.
    last_uploaded_root: Option<hashtree_core::Cid>,
    last_uploaded_at: Option<Instant>,
    upload_in_progress_root: Option<hashtree_core::Cid>,
    last_upload_failed_at: Option<Instant>,
    last_upload_error: Option<String>,
    // Presumably set when an upload failed with a missing-local-blob error — TODO confirm.
    missing_blob_rebuild_required: bool,
}
152
/// Relay fetch parameters for one history-sync chunk, produced by `history_sync_plan_for`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HistorySyncPlan {
    /// Per-author batches, or `GlobalRecent` for very large author sets.
    relay_fetch_mode: RelayFetchMode,
    /// Authors per relay request batch.
    author_batch_size: usize,
    /// Maximum events kept per author.
    per_author_event_limit: usize,
    /// Events requested per relay page.
    relay_page_size: usize,
    /// Maximum pages fetched per relay request.
    max_relay_pages: usize,
}
161
/// Mirrors events from the social-graph authors into the local hashtree store, and optionally
/// publishes the resulting tree roots.
pub struct BackgroundNostrMirror {
    config: NostrMirrorConfig,
    store: Arc<HashtreeStore>,
    graph_store: Arc<SocialGraphStore>,
    /// Read client (throwaway keys) connected to `config.relays`.
    client: Client,
    /// Publishing client; only present with publish keys and non-empty `publish_relays`.
    publish_client: Option<Client>,
    /// Public key of the publish keys; set even when `publish_client` is `None`.
    publish_pubkey: Option<PublicKey>,
    /// Per-tree publish bookkeeping for the event index, profile search and profiles-by-pubkey.
    event_publish_state: Arc<Mutex<RootPublishState>>,
    profile_search_publish_state: Arc<Mutex<RootPublishState>>,
    profiles_by_pubkey_publish_state: Arc<Mutex<RootPublishState>>,
    /// Live events awaiting `flush_live_events`; presumably keyed by event id (TODO confirm).
    pending_live_events: Mutex<BTreeMap<String, Event>>,
    /// Rotating scan position for `collect_missing_profile_authors`.
    missing_profile_cursor: Mutex<usize>,
    /// Serializes history syncs; background syncs either wait for it or skip when busy.
    history_sync_lock: AsyncMutex<()>,
    /// Shutdown signal for `run`; `shutdown_rx` also keeps the channel open.
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}
178
179impl BackgroundNostrMirror {
180 pub async fn new(
181 config: NostrMirrorConfig,
182 store: Arc<HashtreeStore>,
183 graph_store: Arc<SocialGraphStore>,
184 publish_keys: Option<Keys>,
185 ) -> Result<Self> {
186 let client = if let Some(max_size) = config.relay_event_max_size {
187 let mut limits = RelayLimits::default();
188 limits.events.max_size = Some(max_size);
189 Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
190 } else {
191 Client::new(Keys::generate())
192 };
193 for relay in &config.relays {
194 client
195 .add_relay(relay)
196 .await
197 .with_context(|| format!("add mirror relay {relay}"))?;
198 }
199 client.connect().await;
200
201 let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
202 let publish_client = if let Some(keys) = publish_keys {
203 if config.publish_relays.is_empty() {
204 None
205 } else {
206 let client = Client::with_opts(keys, Options::new().wait_for_send(false));
207 for relay in &config.publish_relays {
208 client
209 .add_relay(relay)
210 .await
211 .with_context(|| format!("add mirror publish relay {relay}"))?;
212 }
213 client.connect().await;
214 Some(client)
215 }
216 } else {
217 None
218 };
219
220 let (shutdown_tx, shutdown_rx) = watch::channel(false);
221 Ok(Self {
222 config,
223 store,
224 graph_store,
225 client,
226 publish_client,
227 publish_pubkey,
228 event_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
229 profile_search_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
230 profiles_by_pubkey_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
231 pending_live_events: Mutex::new(BTreeMap::new()),
232 missing_profile_cursor: Mutex::new(0),
233 history_sync_lock: AsyncMutex::new(()),
234 shutdown_tx,
235 shutdown_rx,
236 })
237 }
238
239 pub fn shutdown(&self) {
240 let _ = self.shutdown_tx.send(true);
241 }
242
    /// Calls the `note_*_root_change` hook for each published tree, in this order: event
    /// index, profile search, profiles-by-pubkey.
    ///
    /// # Errors
    /// Stops at and returns the first hook error.
    fn sync_publish_roots_from_store(&self) -> Result<()> {
        self.note_public_events_root_change()?;
        self.note_profile_search_root_change()?;
        self.note_profiles_by_pubkey_root_change()?;
        Ok(())
    }
249
250 async fn publish_pending_roots(
251 &self,
252 force_event: bool,
253 force_profile_search: bool,
254 force_profiles_by_pubkey: bool,
255 ) -> (Result<()>, Result<()>, Result<()>) {
256 tokio::join!(
257 self.maybe_publish_event_root(force_event),
258 self.maybe_publish_profile_search_root(force_profile_search),
259 self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
260 )
261 }
262
263 async fn publish_priority_roots(
264 &self,
265 force_event: bool,
266 force_profile_search: bool,
267 force_profiles_by_pubkey: bool,
268 ) -> (Result<()>, Result<()>, Result<()>) {
269 let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
270 async {
271 if force_profile_search {
272 self.maybe_publish_profile_search_root(true).await
273 } else {
274 Ok(())
275 }
276 },
277 async {
278 if force_profiles_by_pubkey {
279 self.maybe_publish_profiles_by_pubkey_root(true).await
280 } else {
281 Ok(())
282 }
283 },
284 );
285 let event_result = if force_event {
286 self.maybe_publish_event_root(true).await
287 } else {
288 Ok(())
289 };
290 (
291 event_result,
292 profile_search_result,
293 profiles_by_pubkey_result,
294 )
295 }
296
297 pub async fn run(self: Arc<Self>) -> Result<()> {
298 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
299 return Ok(());
300 }
301
302 info!(
303 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
304 self.config.relays.len(),
305 self.config.max_follow_distance,
306 self.config.require_negentropy,
307 self.config.kinds,
308 self.config.history_sync_author_chunk_size.max(1),
309 self.config.history_sync_on_start,
310 self.config.history_sync_on_reconnect
311 );
312
313 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
314 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
315 let live_since = Timestamp::now();
316 self.sync_publish_roots_from_store()?;
317 let (event_result, profile_search_result, profiles_by_pubkey_result) =
318 self.publish_priority_roots(true, true, true).await;
319 if let Err(err) = event_result {
320 warn!(
321 "Nostr mirror event-root publish failed on startup: {:#}",
322 err
323 );
324 }
325 if let Err(err) = profile_search_result {
326 warn!(
327 "Nostr mirror profile-search publish failed on startup: {:#}",
328 err
329 );
330 }
331 if let Err(err) = profiles_by_pubkey_result {
332 warn!(
333 "Nostr mirror profiles-by-pubkey publish failed on startup: {:#}",
334 err
335 );
336 }
337
338 let initial_authors = self.collect_authors()?;
339 if initial_authors.is_empty() {
340 info!("Nostr mirror: no social-graph authors to mirror yet");
341 }
342
343 let mut subscribed_authors = HashSet::new();
344 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
345 .await?;
346
347 if !initial_authors.is_empty() && self.config.history_sync_on_start {
348 self.spawn_startup_history_sync(initial_authors.clone());
349 }
350
351 let mut relay_statuses = self.capture_relay_statuses().await;
352 let mut last_reconnect_history_sync_at: Option<Instant> = None;
353 let mut last_missing_profile_backfill_at: Option<Instant> = None;
354 let mut notifications = self.client.notifications();
355 let mut shutdown_rx = self.shutdown_rx.clone();
356 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
357 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
358
359 loop {
360 tokio::select! {
361 _ = shutdown_rx.changed() => {
362 if *shutdown_rx.borrow() {
363 break;
364 }
365 }
366 _ = refresh_interval.tick() => {
367 let authors = self.collect_authors()?;
368 let new_authors = authors
369 .into_iter()
370 .filter(|author| !subscribed_authors.contains(author))
371 .collect::<Vec<_>>();
372 if !new_authors.is_empty() {
373 debug!(
374 "Nostr mirror discovered {} newly reachable author(s)",
375 new_authors.len()
376 );
377 self.subscribe_authors_since(
378 &new_authors,
379 Timestamp::now(),
380 &mut subscribed_authors,
381 )
382 .await?;
383 self.spawn_author_history_sync(
384 "new-author catch-up",
385 new_authors.clone(),
386 true,
387 true,
388 );
389 }
390 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
391 let missing_profile_authors = self.collect_missing_profile_authors(
392 self.config.missing_profile_backfill_batch_size,
393 )?;
394 if !missing_profile_authors.is_empty() {
395 info!(
396 "Nostr mirror missing-profile backfill starting: authors={}",
397 missing_profile_authors.len()
398 );
399 self.spawn_missing_profile_backfill(missing_profile_authors);
400 last_missing_profile_backfill_at = Some(Instant::now());
401 }
402 }
403 }
404 _ = publish_interval.tick() => {
405 self.sync_publish_roots_from_store()?;
406 if let Err(err) = self.flush_live_events().await {
407 warn!("Nostr mirror live event flush failed: {:#}", err);
408 }
409 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
410 .publish_pending_roots(false, false, false)
411 .await;
412 if let Err(err) = event_result {
413 warn!("Nostr mirror event-root publish failed: {:#}", err);
414 }
415 if let Err(err) = profile_search_result {
416 warn!("Nostr mirror profile-search publish failed: {:#}", err);
417 }
418 if let Err(err) = profiles_by_pubkey_result {
419 warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
420 }
421 }
422 notification = notifications.recv() => {
423 match notification {
424 Ok(RelayPoolNotification::Event { event, .. }) => {
425 self.ingest_live_event(&event)?;
426 }
427 Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
428 let relay_url = relay_url.to_string();
429 let previous = relay_statuses.insert(relay_url.clone(), status);
430 if Self::should_history_sync_on_reconnect(
431 self.config.history_sync_on_reconnect,
432 previous,
433 status,
434 ) && Self::should_run_reconnect_history_sync(
435 last_reconnect_history_sync_at.as_ref(),
436 )
437 {
438 let authors = self.collect_authors()?;
439 if !authors.is_empty() {
440 info!(
441 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
442 relay_url,
443 authors.len(),
444 self.config.require_negentropy
445 );
446 self.spawn_author_history_sync(
447 "relay reconnect catch-up",
448 authors,
449 false,
450 false,
451 );
452 last_reconnect_history_sync_at = Some(Instant::now());
453 }
454 }
455 }
456 Ok(RelayPoolNotification::Shutdown) => break,
457 Ok(_) => {}
458 Err(err) => {
459 warn!("Nostr mirror notification error: {}", err);
460 break;
461 }
462 }
463 }
464 }
465 }
466
467 if let Err(err) = self.flush_live_events().await {
468 warn!(
469 "Nostr mirror live event flush failed during shutdown: {:#}",
470 err
471 );
472 }
473 if let Err(err) = self.sync_publish_roots_from_store() {
474 warn!(
475 "Nostr mirror root-state refresh failed during shutdown: {:#}",
476 err
477 );
478 }
479 let (event_result, profile_search_result, profiles_by_pubkey_result) =
480 self.publish_pending_roots(true, true, true).await;
481 if let Err(err) = event_result {
482 warn!(
483 "Nostr mirror event-root publish failed during shutdown: {:#}",
484 err
485 );
486 }
487 if let Err(err) = profile_search_result {
488 warn!(
489 "Nostr mirror profile-search publish failed during shutdown: {:#}",
490 err
491 );
492 }
493 if let Err(err) = profiles_by_pubkey_result {
494 warn!(
495 "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
496 err
497 );
498 }
499 let _ = self.client.disconnect().await;
500 if let Some(client) = self.publish_client.as_ref() {
501 let _ = client.disconnect().await;
502 }
503 Ok(())
504 }
505
506 fn spawn_startup_history_sync(self: &Arc<Self>, initial_authors: Vec<String>) {
507 let mirror = Arc::clone(self);
508 tokio::task::spawn_blocking(move || {
509 let runtime = tokio::runtime::Builder::new_current_thread()
510 .enable_all()
511 .build()
512 .expect("build nostr mirror startup history sync runtime");
513 runtime.block_on(async move {
514 let _guard = mirror.history_sync_lock.lock().await;
515 if let Err(err) = mirror.run_startup_history_sync(initial_authors).await {
516 warn!("Nostr mirror startup history sync failed: {:#}", err);
517 }
518 });
519 });
520 }
521
522 async fn run_startup_history_sync(&self, initial_authors: Vec<String>) -> Result<()> {
523 self.history_sync_recent_text_notes_for_reachable_authors()
524 .await?;
525 self.history_sync_full_text_notes_for_reachable_authors()
526 .await?;
527 if self.should_backfill_missing_profiles(None) {
528 let missing_profile_authors = self
529 .collect_missing_profile_authors(self.config.missing_profile_backfill_batch_size)?;
530 if !missing_profile_authors.is_empty() {
531 info!(
532 "Nostr mirror missing-profile backfill starting: authors={}",
533 missing_profile_authors.len()
534 );
535 self.history_sync_authors_with_kinds(
536 missing_profile_authors,
537 &[Kind::Metadata.as_u16()],
538 )
539 .await?;
540 }
541 }
542 self.history_sync_authors(initial_authors).await
543 }
544
545 fn spawn_author_history_sync(
546 self: &Arc<Self>,
547 label: &'static str,
548 authors: Vec<String>,
549 include_full_text_notes: bool,
550 wait_for_existing_sync: bool,
551 ) {
552 let mirror = Arc::clone(self);
553 tokio::task::spawn_blocking(move || {
554 let runtime = tokio::runtime::Builder::new_current_thread()
555 .enable_all()
556 .build()
557 .expect("build nostr mirror author history sync runtime");
558 runtime.block_on(async move {
559 if wait_for_existing_sync {
560 let _guard = mirror.history_sync_lock.lock().await;
561 if let Err(err) = mirror
562 .run_author_history_sync(authors, include_full_text_notes)
563 .await
564 {
565 warn!("Nostr mirror {label} failed: {:#}", err);
566 }
567 return;
568 }
569
570 let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
571 info!("Nostr mirror {label} skipped; another history sync is running");
572 return;
573 };
574 if let Err(err) = mirror
575 .run_author_history_sync(authors, include_full_text_notes)
576 .await
577 {
578 warn!("Nostr mirror {label} failed: {:#}", err);
579 }
580 });
581 });
582 }
583
584 async fn run_author_history_sync(
585 &self,
586 authors: Vec<String>,
587 include_full_text_notes: bool,
588 ) -> Result<()> {
589 if include_full_text_notes {
590 self.history_sync_recent_text_notes_for_authors(authors.clone())
591 .await?;
592 self.history_sync_full_text_notes_for_authors(authors.clone())
593 .await?;
594 }
595 self.history_sync_authors(authors).await
596 }
597
598 fn spawn_missing_profile_backfill(self: &Arc<Self>, authors: Vec<String>) {
599 let mirror = Arc::clone(self);
600 tokio::task::spawn_blocking(move || {
601 let runtime = tokio::runtime::Builder::new_current_thread()
602 .enable_all()
603 .build()
604 .expect("build nostr mirror missing profile runtime");
605 runtime.block_on(async move {
606 let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
607 info!(
608 "Nostr mirror missing-profile backfill skipped; another history sync is running"
609 );
610 return;
611 };
612 if let Err(err) = mirror
613 .history_sync_authors_with_kinds(authors, &[Kind::Metadata.as_u16()])
614 .await
615 {
616 warn!("Nostr mirror missing-profile backfill failed: {:#}", err);
617 }
618 });
619 });
620 }
621
622 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
623 let mut statuses = HashMap::new();
624 for (relay_url, relay) in self.client.relays().await {
625 statuses.insert(relay_url.to_string(), relay.status().await);
626 }
627 statuses
628 }
629
630 async fn has_connected_publish_relay(&self) -> bool {
631 let Some(client) = self.publish_client.as_ref() else {
632 return false;
633 };
634 Self::client_has_connected_relay(client).await
635 }
636
637 async fn client_has_connected_relay(client: &Client) -> bool {
638 for (_relay_url, relay) in client.relays().await {
639 if relay.status().await == RelayStatus::Connected {
640 return true;
641 }
642 }
643 false
644 }
645
646 fn collect_authors(&self) -> Result<Vec<String>> {
647 self.collect_authors_with_max_distance(self.config.max_follow_distance)
648 }
649
650 fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
651 let mut authors = Vec::new();
652 let mut seen = HashSet::new();
653 for distance in 0..=max_distance {
654 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
655 self.graph_store.as_ref(),
656 distance,
657 )
658 .with_context(|| format!("load social-graph distance {distance}"))?
659 {
660 if self
661 .graph_store
662 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
663 {
664 continue;
665 }
666 let hex = hex::encode(pubkey);
667 if seen.insert(hex.clone()) {
668 authors.push(hex);
669 }
670 }
671 }
672 Ok(authors)
673 }
674
675 fn full_text_note_history_follow_distance(&self) -> Option<u32> {
676 let distance = self.config.full_text_note_history_follow_distance?;
677 if self
678 .config
679 .kinds
680 .iter()
681 .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
682 {
683 Some(distance.min(self.config.max_follow_distance))
684 } else {
685 None
686 }
687 }
688
689 fn full_text_note_history_max_relay_pages(&self) -> Option<usize> {
690 Self::full_text_note_history_max_relay_pages_for_config(&self.config)
691 }
692
693 fn full_text_note_history_max_relay_pages_for_config(
694 config: &NostrMirrorConfig,
695 ) -> Option<usize> {
696 let pages = config.full_text_note_history_max_relay_pages;
697 if pages == 0 {
698 None
699 } else {
700 Some(pages)
701 }
702 }
703
704 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
705 if limit == 0 {
706 return Ok(Vec::new());
707 }
708
709 let authors = self.collect_authors()?;
710 if authors.is_empty() {
711 return Ok(Vec::new());
712 }
713
714 let mut cursor = self
715 .missing_profile_cursor
716 .lock()
717 .expect("missing profile cursor");
718 let mut index = (*cursor).min(authors.len());
719 let mut scanned = 0usize;
720 let mut missing = Vec::new();
721
722 while scanned < authors.len() && missing.len() < limit {
723 let author = &authors[index];
724 if self.graph_store.latest_profile_event(author)?.is_none() {
725 missing.push(author.clone());
726 }
727 index += 1;
728 if index == authors.len() {
729 index = 0;
730 }
731 scanned += 1;
732 }
733
734 *cursor = index;
735 Ok(missing)
736 }
737
738 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
739 if self.config.missing_profile_backfill_batch_size == 0
740 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
741 {
742 return false;
743 }
744 match last_run {
745 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
746 None => true,
747 }
748 }
749
750 fn should_history_sync_on_reconnect(
751 history_sync_on_reconnect: bool,
752 previous: Option<RelayStatus>,
753 status: RelayStatus,
754 ) -> bool {
755 history_sync_on_reconnect
756 && status == RelayStatus::Connected
757 && matches!(
758 previous,
759 Some(
760 RelayStatus::Initialized
761 | RelayStatus::Pending
762 | RelayStatus::Connecting
763 | RelayStatus::Disconnected
764 | RelayStatus::Terminated
765 )
766 )
767 }
768
769 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
770 match last_run {
771 None => true,
772 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
773 }
774 }
775
776 fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
777 !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
778 }
779
780 fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
781 kinds.is_empty()
782 || kinds.iter().any(|kind| {
783 *kind == Kind::Metadata.as_u16()
784 || *kind == Kind::ContactList.as_u16()
785 || *kind == Kind::MuteList.as_u16()
786 })
787 }
788
789 fn history_sync_plan_for(
790 config: &NostrMirrorConfig,
791 authors: usize,
792 kinds: &[u16],
793 ) -> HistorySyncPlan {
794 let author_batch_size = config.author_batch_size.max(1);
795 let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
796 let relay_page_size = 1_000;
797 let max_relay_pages = 10;
798
799 if Self::is_metadata_only_history_sync(kinds) {
800 return HistorySyncPlan {
801 relay_fetch_mode: RelayFetchMode::AuthorBatches,
802 author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
803 per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
804 relay_page_size,
805 max_relay_pages,
806 };
807 }
808
809 if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
810 return HistorySyncPlan {
811 relay_fetch_mode: RelayFetchMode::GlobalRecent,
812 author_batch_size,
813 per_author_event_limit: per_author_event_limit
814 .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
815 .max(1),
816 relay_page_size,
817 max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
818 };
819 }
820
821 HistorySyncPlan {
822 relay_fetch_mode: RelayFetchMode::AuthorBatches,
823 author_batch_size,
824 per_author_event_limit,
825 relay_page_size,
826 max_relay_pages,
827 }
828 }
829
830 fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
831 Self::history_sync_plan_for(&self.config, authors, kinds)
832 }
833
834 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
835 self.history_sync_authors_with_kinds(authors, &self.config.kinds)
836 .await
837 }
838
839 async fn history_sync_authors_with_kinds(
840 &self,
841 authors: Vec<String>,
842 kinds: &[u16],
843 ) -> Result<()> {
844 self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
845 .await
846 }
847
848 async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
849 let Some(distance) = self.full_text_note_history_follow_distance() else {
850 return Ok(());
851 };
852 info!(
853 "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
854 );
855 let authors = self.collect_authors_with_max_distance(distance)?;
856 self.history_sync_full_text_notes_for_authors(authors).await
857 }
858
859 async fn history_sync_recent_text_notes_for_reachable_authors(&self) -> Result<()> {
860 let Some(distance) = self.full_text_note_history_follow_distance() else {
861 return Ok(());
862 };
863 info!(
864 "Nostr mirror recent text content catch-up author collection starting: max_follow_distance={distance}"
865 );
866 let authors = self.collect_authors_with_max_distance(distance)?;
867 self.history_sync_recent_text_notes_for_authors(authors)
868 .await
869 }
870
871 async fn history_sync_recent_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
872 if authors.is_empty() || self.full_text_note_history_follow_distance().is_none() {
873 return Ok(());
874 }
875
876 info!(
877 "Nostr mirror recent text content catch-up starting: authors={}",
878 authors.len()
879 );
880 let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
881 let chunk_size = self
882 .config
883 .author_batch_size
884 .max(1)
885 .saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER + 1);
886 self.history_sync_authors_chunked(
887 authors,
888 |current_root, author_chunk| async move {
889 self.history_sync_author_chunk(current_root, author_chunk, &kinds, false, None)
890 .await
891 },
892 false,
893 Some(chunk_size),
894 )
895 .await
896 }
897
898 async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
899 let Some(distance) = self.full_text_note_history_follow_distance() else {
900 return Ok(());
901 };
902 let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
903 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
904 return Ok(());
905 };
906 let mut close_authors = Vec::new();
907 for author in authors {
908 let Ok(pubkey) = hex::decode(&author) else {
909 continue;
910 };
911 let Ok(pubkey) = <[u8; 32]>::try_from(pubkey.as_slice()) else {
912 continue;
913 };
914 if self
915 .graph_store
916 .follow_distance(&pubkey)?
917 .is_some_and(|actual_distance| actual_distance <= distance)
918 {
919 close_authors.push(author);
920 }
921 }
922 if close_authors.is_empty() {
923 return Ok(());
924 }
925
926 info!(
927 "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
928 close_authors.len(),
929 distance,
930 max_relay_pages
931 );
932 let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
933 self.history_sync_authors_with_kinds_and_mode(
934 close_authors,
935 &kinds,
936 true,
937 Some(max_relay_pages),
938 )
939 .await
940 }
941
942 async fn history_sync_authors_with_kinds_and_mode(
943 &self,
944 authors: Vec<String>,
945 kinds: &[u16],
946 full_author_history: bool,
947 max_relay_pages: Option<usize>,
948 ) -> Result<()> {
949 let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
950 self.history_sync_authors_chunked(
951 authors,
952 |current_root, author_chunk| async move {
953 self.history_sync_author_chunk(
954 current_root,
955 author_chunk,
956 kinds,
957 full_author_history,
958 max_relay_pages,
959 )
960 .await
961 },
962 update_profile_and_graph,
963 None,
964 )
965 .await
966 }
967
    /// Runs a history sync over `authors` in chunks and applies each chunk's result to the
    /// trusted public-events root.
    ///
    /// For each chunk, `run_chunk` receives the root current at that moment and the chunk's
    /// authors. If the store's root changed while the chunk ran (by some other writer — not
    /// visible here, TODO confirm which), the chunk's applied events are rebuilt on top of
    /// the latest root instead of replacing it. A resulting root that differs from the
    /// current one is applied via `apply_history_root_with_options`, with
    /// `update_profile_and_graph`.
    ///
    /// A chunk whose `run_chunk` fails is logged and skipped. The sync as a whole fails only
    /// when no chunk succeeded.
    ///
    /// * `chunk_size_override` – authors per chunk. Defaults to
    ///   `config.history_sync_author_chunk_size`; clamped to at least 1.
    ///
    /// # Errors
    /// Returns the last chunk error when every chunk failed. Store root reads, the merge
    /// build and root application errors are propagated immediately.
    async fn history_sync_authors_chunked<F, Fut>(
        &self,
        authors: Vec<String>,
        mut run_chunk: F,
        update_profile_and_graph: bool,
        chunk_size_override: Option<usize>,
    ) -> Result<()>
    where
        F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
        Fut: std::future::Future<Output = Result<CrawlReport>>,
    {
        if authors.is_empty() {
            return Ok(());
        }

        info!(
            "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
            authors.len(),
            self.config.relays.len(),
            self.config.require_negentropy
        );

        // Root the next chunk builds on; advanced after every applied chunk.
        let mut current_root = self.graph_store.public_events_root_for_write()?;
        let mut last_error = None;
        let mut applied_chunks = 0usize;
        let mut failed_chunks = 0usize;
        let chunk_size = chunk_size_override
            .unwrap_or(self.config.history_sync_author_chunk_size)
            .max(1);
        let total_chunks = authors.len().div_ceil(chunk_size);

        for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
            let author_chunk = author_chunk.to_vec();
            let author_count = author_chunk.len();
            info!(
                "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
                chunk_index + 1,
                total_chunks,
                author_count
            );
            let mut report = match run_chunk(current_root.clone(), author_chunk.clone()).await {
                Ok(report) => report,
                Err(err) => {
                    // Best-effort: skip this chunk and keep going with the next one.
                    failed_chunks = failed_chunks.saturating_add(1);
                    warn!(
                        "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
                        chunk_index + 1,
                        total_chunks,
                        author_count,
                        err
                    );
                    last_error = Some(err);
                    continue;
                }
            };

            // The chunk was built on `current_root`. If the stored root moved meanwhile,
            // rebuild the chunk's events on top of the latest root so those newer changes
            // are not overwritten.
            let latest_root = self.graph_store.public_events_root_for_write()?;
            if latest_root != current_root {
                info!(
                    "Nostr mirror history sync root advanced while chunk was fetching; merging chunk into latest root: chunk={}/{} authors={} events_applied={}",
                    chunk_index + 1,
                    total_chunks,
                    author_count,
                    report.applied_events.len()
                );
                if report.applied_events.is_empty() {
                    report.root = latest_root.clone();
                } else {
                    let event_store = NostrEventStore::new(self.store.store_arc());
                    report.root = event_store
                        .build(latest_root.as_ref(), report.applied_events.clone())
                        .await
                        .context("merge history chunk into latest mirrored event root")?;
                }
                current_root = latest_root;
            }

            // Only touch the trusted root when the chunk actually changed it.
            if report.root != current_root {
                self.apply_history_root_with_options(
                    report.root.as_ref(),
                    update_profile_and_graph,
                    true,
                )
                .await?;
                current_root = report.root.clone();
                info!(
                    "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
                    chunk_index + 1,
                    total_chunks,
                    report.authors_processed,
                    report.events_selected,
                    report.events_seen
                );
            }
            applied_chunks = applied_chunks.saturating_add(1);
        }

        // With a non-empty author list, zero applied chunks means every chunk failed.
        if applied_chunks == 0 {
            return Err(last_error
                .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
                .context("run mirror history sync"));
        }
        if failed_chunks > 0 {
            warn!(
                "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
                applied_chunks, failed_chunks
            );
        }
        Ok(())
    }
1078
1079 async fn history_sync_author_chunk(
1080 &self,
1081 current_root: Option<hashtree_core::Cid>,
1082 authors: Vec<String>,
1083 kinds: &[u16],
1084 full_author_history: bool,
1085 max_relay_pages: Option<usize>,
1086 ) -> Result<CrawlReport> {
1087 let mut last_error = None;
1088 let mut report = None;
1089 let mut plan = self.history_sync_plan(authors.len(), kinds);
1090 if full_author_history {
1091 plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
1092 plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
1093 }
1094 for attempt in 0..3 {
1095 let mut last_logged_authors = 0usize;
1096 let bridge = NostrBridge::new(
1097 self.store.store_arc(),
1098 CrawlConfig {
1099 relays: self.config.relays.clone(),
1100 author_allowlist: Some(authors.clone()),
1101 max_live_bytes: None,
1102 max_events_seen: None,
1103 max_authors: None,
1104 max_follow_distance: None,
1105 author_batch_size: plan.author_batch_size,
1106 per_author_event_limit: plan.per_author_event_limit,
1107 per_author_live_bytes: None,
1108 fetch_timeout: self.config.fetch_timeout,
1109 kinds: Some(kinds.to_vec()),
1110 relay_fetch_mode: plan.relay_fetch_mode,
1111 require_negentropy: self.config.require_negentropy,
1112 relay_event_max_size: self.config.relay_event_max_size,
1113 relay_page_size: plan.relay_page_size,
1114 max_relay_pages: plan.max_relay_pages,
1115 full_author_history,
1116 },
1117 );
1118
1119 match bridge
1120 .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
1121 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
1122 let should_log = progress.authors_processed == progress.authors_considered
1123 || progress.authors_processed == 0
1124 || progress
1125 .authors_processed
1126 .saturating_sub(last_logged_authors)
1127 >= log_interval;
1128 if should_log {
1129 last_logged_authors = progress.authors_processed;
1130 info!(
1131 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
1132 progress.authors_processed,
1133 progress.authors_considered,
1134 progress.events_selected,
1135 progress.events_seen
1136 );
1137 }
1138 })
1139 .await
1140 {
1141 Ok(next_report) => {
1142 report = Some(next_report);
1143 break;
1144 }
1145 Err(err) => {
1146 last_error = Some(err);
1147 if attempt < 2 {
1148 tokio::time::sleep(Duration::from_millis(500)).await;
1149 }
1150 }
1151 }
1152 }
1153 report
1154 .ok_or_else(|| last_error.expect("history sync retry captured error"))
1155 .context("run mirror history sync")
1156 }
1157
1158 #[cfg(test)]
1159 async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
1160 self.apply_history_root_with_options(root, true, true).await
1161 }
1162
1163 async fn apply_history_root_with_options(
1164 &self,
1165 root: Option<&hashtree_core::Cid>,
1166 update_profile_and_graph: bool,
1167 publish_roots: bool,
1168 ) -> Result<()> {
1169 self.graph_store.write_public_events_root(root)?;
1170 let Some(root) = root else {
1171 return Ok(());
1172 };
1173
1174 self.note_public_events_root_change()?;
1175 if update_profile_and_graph {
1176 let event_store = NostrEventStore::new(self.store.store_arc());
1177 let events = event_store
1178 .list_recent_lossy(Some(root), ListEventsOptions::default())
1179 .await
1180 .context("list trusted mirrored events")?
1181 .into_iter()
1182 .map(socialgraph::stored_event_to_nostr_event)
1183 .collect::<Result<Vec<_>>>()?;
1184
1185 self.graph_store
1186 .rebuild_profile_index_for_events(&events)
1187 .context("rebuild mirrored profile search index")?;
1188 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
1189 .context("sync mirrored social graph state")?;
1190 self.note_profile_search_root_change()?;
1191 self.note_profiles_by_pubkey_root_change()?;
1192 }
1193 if !publish_roots {
1194 return Ok(());
1195 }
1196 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
1197 .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
1198 .await;
1199 if let Err(err) = event_result {
1200 warn!(
1201 "Nostr mirror event-root publish failed after root update: {:#}",
1202 err
1203 );
1204 }
1205 if let Err(err) = profile_search_result {
1206 warn!(
1207 "Nostr mirror profile-search publish failed after root update: {:#}",
1208 err
1209 );
1210 }
1211 if let Err(err) = profiles_by_pubkey_result {
1212 warn!(
1213 "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
1214 err
1215 );
1216 }
1217 Ok(())
1218 }
1219
1220 async fn subscribe_authors_since(
1221 &self,
1222 authors: &[String],
1223 since: Timestamp,
1224 subscribed_authors: &mut HashSet<String>,
1225 ) -> Result<()> {
1226 let new_authors = authors
1227 .iter()
1228 .filter(|author| !subscribed_authors.contains(*author))
1229 .cloned()
1230 .collect::<Vec<_>>();
1231 if new_authors.is_empty() {
1232 return Ok(());
1233 }
1234
1235 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1236 let pubkeys = chunk
1237 .iter()
1238 .filter_map(|author| PublicKey::from_hex(author).ok())
1239 .collect::<Vec<_>>();
1240 if pubkeys.is_empty() {
1241 continue;
1242 }
1243
1244 let filter = Filter::new()
1245 .authors(pubkeys)
1246 .kinds(self.config.kinds.iter().copied().map(Kind::from))
1247 .since(since);
1248
1249 if let Err(err) = self.client.subscribe(vec![filter], None).await {
1250 warn!(
1251 "Nostr mirror author subscription failed: authors={} error={:#}",
1252 chunk.len(),
1253 err
1254 );
1255 continue;
1256 }
1257 subscribed_authors.extend(chunk.iter().cloned());
1258 }
1259 Ok(())
1260 }
1261
1262 fn ingest_live_event(&self, event: &Event) -> Result<()> {
1263 self.pending_live_events
1264 .lock()
1265 .expect("pending live events")
1266 .insert(event.id.to_hex(), event.clone());
1267 Ok(())
1268 }
1269
1270 async fn flush_live_events(&self) -> Result<()> {
1271 let pending = {
1272 let mut pending = self
1273 .pending_live_events
1274 .lock()
1275 .expect("pending live events");
1276 if pending.is_empty() {
1277 return Ok(());
1278 }
1279 std::mem::take(&mut *pending)
1280 };
1281 let events = pending.into_values().collect::<Vec<_>>();
1282 let event_count = events.len();
1283 let previous_event_root = self.graph_store.public_events_root()?;
1284 let previous_profile_search_root = self.graph_store.profile_search_root()?;
1285 let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1286
1287 socialgraph::ingest_parsed_events_with_storage_class(
1288 self.graph_store.as_ref(),
1289 &events,
1290 socialgraph::EventStorageClass::Public,
1291 )
1292 .context("ingest live mirrored event batch")?;
1293
1294 let next_event_root = self.graph_store.public_events_root()?;
1295 let next_profile_search_root = self.graph_store.profile_search_root()?;
1296 let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1297 let event_root_changed = next_event_root != previous_event_root;
1298 let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1299 let profiles_by_pubkey_root_changed =
1300 next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1301
1302 if event_root_changed {
1303 self.note_public_events_root_change()?;
1304 }
1305 if profile_search_root_changed {
1306 self.note_profile_search_root_change()?;
1307 }
1308 if profiles_by_pubkey_root_changed {
1309 self.note_profiles_by_pubkey_root_change()?;
1310 }
1311 if profile_search_root_changed {
1312 self.maybe_publish_profile_search_root(true).await?;
1313 }
1314 if profiles_by_pubkey_root_changed {
1315 self.maybe_publish_profiles_by_pubkey_root(true).await?;
1316 }
1317 if event_root_changed {
1318 self.maybe_publish_event_root(true).await?;
1319 }
1320 info!(
1321 "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1322 event_count,
1323 event_root_changed,
1324 profile_search_root_changed,
1325 profiles_by_pubkey_root_changed
1326 );
1327 Ok(())
1328 }
1329
1330 fn note_public_events_root_change(&self) -> Result<()> {
1331 let root = self.graph_store.public_events_root()?;
1332 Self::note_root_change(
1333 self.config.published_event_tree_name.as_deref(),
1334 &self.event_publish_state,
1335 root,
1336 )
1337 }
1338
1339 fn note_profile_search_root_change(&self) -> Result<()> {
1340 let root = self.graph_store.profile_search_root()?;
1341 Self::note_root_change(
1342 self.config.published_profile_search_tree_name.as_deref(),
1343 &self.profile_search_publish_state,
1344 root,
1345 )
1346 }
1347
1348 fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1349 let root = self.graph_store.profiles_by_pubkey_root()?;
1350 Self::note_root_change(
1351 self.config
1352 .published_profiles_by_pubkey_tree_name
1353 .as_deref(),
1354 &self.profiles_by_pubkey_publish_state,
1355 root,
1356 )
1357 }
1358
1359 fn note_root_change(
1360 tree_name: Option<&str>,
1361 publish_state: &Arc<Mutex<RootPublishState>>,
1362 root: Option<hashtree_core::Cid>,
1363 ) -> Result<()> {
1364 let Some(_tree_name) = tree_name else {
1365 return Ok(());
1366 };
1367
1368 let mut state = publish_state.lock().expect("root publish state");
1369 let now = Instant::now();
1370
1371 if state.pending_root == root {
1372 return Ok(());
1373 }
1374
1375 state.pending_root = root;
1376 state.last_upload_failed_at = None;
1377 state.last_upload_error = None;
1378 state.last_changed_at = Some(now);
1379 if state.dirty_since.is_none() {
1380 state.dirty_since = Some(now);
1381 }
1382 Ok(())
1383 }
1384
1385 async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1386 self.ensure_public_events_root_is_publishable().await?;
1387 if self.take_missing_blob_event_upload_error() {
1388 return self.rebuild_event_indexes_after_missing_blobs(force).await;
1389 }
1390 let result = self
1391 .maybe_publish_root(
1392 self.config.published_event_tree_name.as_deref(),
1393 &self.event_publish_state,
1394 "event root",
1395 force,
1396 )
1397 .await;
1398 let Err(error) = result else {
1399 if self.take_missing_blob_event_upload_error() {
1400 return self.rebuild_event_indexes_after_missing_blobs(force).await;
1401 }
1402 return Ok(());
1403 };
1404 if !is_missing_local_blob_push_error(&error) {
1405 return Err(error);
1406 }
1407
1408 self.rebuild_event_indexes_after_missing_blobs(force).await
1409 }
1410
1411 fn take_missing_blob_event_upload_error(&self) -> bool {
1412 let mut state = self
1413 .event_publish_state
1414 .lock()
1415 .expect("event root publish state");
1416 if state.upload_in_progress_root.is_some() {
1417 return false;
1418 }
1419 if !state.missing_blob_rebuild_required {
1420 return false;
1421 }
1422 state.missing_blob_rebuild_required = false;
1423 state.last_upload_error = None;
1424 state.last_upload_failed_at = None;
1425 true
1426 }
1427
1428 async fn rebuild_event_indexes_after_missing_blobs(&self, force: bool) -> Result<()> {
1429 warn!(
1430 "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1431 );
1432 let (public_count, ambient_count) = self
1433 .graph_store
1434 .rebuild_event_indexes_from_stored_events_async()
1435 .await
1436 .context("rebuild event indexes after missing event blobs")?;
1437 info!(
1438 "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1439 public_count, ambient_count
1440 );
1441 self.sync_publish_roots_from_store()?;
1442
1443 self.maybe_publish_root(
1444 self.config.published_event_tree_name.as_deref(),
1445 &self.event_publish_state,
1446 "event root",
1447 force,
1448 )
1449 .await
1450 }
1451
1452 async fn ensure_public_events_root_is_publishable(&self) -> Result<()> {
1453 let Some(root) = self.graph_store.public_events_root()? else {
1454 return Ok(());
1455 };
1456 let event_store = NostrEventStore::new(self.store.store_arc());
1457 if let Err(err) = event_store.validate_index_root(Some(&root)).await {
1458 warn!(
1459 "Nostr mirror refusing to publish invalid event index root {}; clearing trusted root: {}",
1460 hex::encode(root.hash),
1461 err
1462 );
1463 self.graph_store.write_public_events_root(None)?;
1464 self.note_public_events_root_change()?;
1465 }
1466 Ok(())
1467 }
1468
1469 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1470 self.maybe_publish_root(
1471 self.config.published_profile_search_tree_name.as_deref(),
1472 &self.profile_search_publish_state,
1473 "profile search root",
1474 force,
1475 )
1476 .await
1477 }
1478
1479 async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1480 self.maybe_publish_root(
1481 self.config
1482 .published_profiles_by_pubkey_tree_name
1483 .as_deref(),
1484 &self.profiles_by_pubkey_publish_state,
1485 "profiles-by-pubkey root",
1486 force,
1487 )
1488 .await
1489 }
1490
    /// Publishes the pending root in `publish_state` as a kind-30078 root event for
    /// `tree_name`. When Blossom write servers are configured, its DAG is uploaded first.
    ///
    /// - Does nothing when `tree_name` is `None` or there is no pending root.
    /// - Unless `force` is set, waits until the root has been unchanged for
    ///   `MIRROR_ROOT_PUBLISH_DEBOUNCE`, or dirty for `MIRROR_ROOT_PUBLISH_MAX_STALENESS`.
    /// - Starts a background upload if needed. Without `force`, publishing waits for that
    ///   upload; with `force`, the event may be published before the upload completes.
    /// - When publishing is configured but no publish relay is connected, returns without
    ///   publishing.
    /// - Clears `dirty_since` once the pending root is uploaded (if required) and published
    ///   (if required).
    ///
    /// # Errors
    /// Fails if signing the event fails, if the relay-publish helper fails, or if no publish
    /// relay accepts the event.
    async fn maybe_publish_root(
        &self,
        tree_name: Option<&str>,
        publish_state: &Arc<Mutex<RootPublishState>>,
        log_label: &str,
        force: bool,
    ) -> Result<()> {
        // Roots without a configured tree name are never published.
        let Some(tree_name) = tree_name else {
            return Ok(());
        };

        // Decide under the lock whether the pending root is due. The guard is dropped at the end
        // of this block, before any `.await`.
        let pending_root = {
            let state = publish_state.lock().expect("root publish state");
            let Some(pending_root) = state.pending_root.clone() else {
                return Ok(());
            };

            let now = Instant::now();
            // Due once the root has stopped changing for the debounce window...
            let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
                now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
            });
            // ...or once it has been dirty longer than the max staleness, even if it keeps changing.
            let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
                now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
            });
            if !force && !debounce_ready && !stale_ready {
                return Ok(());
            }

            pending_root
        };

        let upload_started =
            self.maybe_start_background_root_upload(&pending_root, publish_state, log_label);
        let upload_required = !self.config.blossom_write_servers.is_empty();
        let upload_ready = {
            let state = publish_state.lock().expect("root publish state");
            !upload_required || state.last_uploaded_root.as_ref() == Some(&pending_root)
        };
        // A forced publish may announce the root before its Blossom upload has finished.
        let publish_before_upload_ready = force && upload_required && !upload_ready;

        let mut successful_relays = Vec::new();
        let mut failed_relays = Vec::new();
        let mut published_now = false;
        let publish_required =
            self.publish_client.is_some() && !self.config.publish_relays.is_empty();
        if publish_required {
            let Some(publish_client) = self.publish_client.as_ref() else {
                unreachable!("publish_required implies publish_client");
            };
            // No connected publish relay: skip for now; the root stays pending.
            if !self.has_connected_publish_relay().await {
                return Ok(());
            }
            // Not forced and the upload is unfinished: wait for the upload before publishing.
            if !upload_ready && !publish_before_upload_ready {
                if upload_started {
                    info!(
                        "Nostr mirror uploading {} DAG before publish: tree={} hash={}",
                        log_label,
                        tree_name,
                        hex::encode(pending_root.hash),
                    );
                }
                return Ok(());
            }
            if publish_before_upload_ready {
                info!(
                    "Nostr mirror publishing {} before Blossom upload completes: tree={} hash={}",
                    log_label,
                    tree_name,
                    hex::encode(pending_root.hash),
                );
            }

            let already_published = {
                let state = publish_state.lock().expect("root publish state");
                state.last_published_root.as_ref() == Some(&pending_root)
            };
            if !already_published {
                let publish_relays = self.config.publish_relays.clone();
                let latest_known_created_at = {
                    let state = publish_state.lock().expect("root publish state");
                    state.last_published_created_at
                };
                // Pick a created_at later than both the locally remembered publish and the newest
                // matching root event found on relays, so the new event supersedes them.
                let publish_created_at = next_replaceable_created_at(
                    Timestamp::now(),
                    later_timestamp(
                        latest_known_created_at,
                        self.latest_root_event_created_at(tree_name).await,
                    ),
                );
                let event = publish_client
                    .sign_event_builder(Self::build_public_root_event(
                        tree_name,
                        &pending_root,
                        publish_created_at,
                    ))
                    .await
                    .with_context(|| format!("sign {log_label} event"))?;
                let publish_result = self
                    .publish_root_event_to_relays(publish_client, &publish_relays, &event)
                    .await
                    .with_context(|| format!("publish {log_label} event"))?;
                successful_relays = publish_result.0;
                failed_relays = publish_result.1;
                if successful_relays.is_empty() {
                    let failure_summary = if failed_relays.is_empty() {
                        "no publish relays accepted the event".to_string()
                    } else {
                        failed_relays.join("; ")
                    };
                    anyhow::bail!("no publish relays accepted the event ({failure_summary})");
                }

                // Only record the publish if the pending root was not replaced meanwhile.
                let mut state = publish_state.lock().expect("root publish state");
                if state.pending_root.as_ref() == Some(&pending_root) {
                    state.last_published_root = Some(pending_root.clone());
                    state.last_published_at = Some(Instant::now());
                    state.last_published_created_at = Some(event.created_at);
                }
                published_now = true;
            }
        }

        // Clear the dirty marker only when every configured step (upload, publish) covers
        // the still-pending root.
        {
            let mut state = publish_state.lock().expect("root publish state");
            if state.pending_root.as_ref() == Some(&pending_root) {
                let upload_satisfied = self.config.blossom_write_servers.is_empty()
                    || state.last_uploaded_root.as_ref() == Some(&pending_root);
                let publish_satisfied =
                    !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
                if upload_satisfied && publish_satisfied {
                    state.dirty_since = None;
                }
            }
        }

        if published_now {
            info!(
                "Nostr mirror published {}: tree={} hash={} relays={:?}",
                log_label,
                tree_name,
                hex::encode(pending_root.hash),
                successful_relays,
            );
        }
        if !failed_relays.is_empty() {
            warn!(
                "Nostr mirror publish had relay failures: tree={} failures={:?}",
                tree_name, failed_relays
            );
        }
        Ok(())
    }
1643
1644 fn maybe_start_background_root_upload(
1645 &self,
1646 pending_root: &hashtree_core::Cid,
1647 publish_state: &Arc<Mutex<RootPublishState>>,
1648 log_label: &str,
1649 ) -> bool {
1650 if self.config.blossom_write_servers.is_empty() {
1651 return false;
1652 }
1653
1654 {
1655 let mut state = publish_state.lock().expect("root publish state");
1656 if state.last_uploaded_root.as_ref() == Some(pending_root)
1657 || state.upload_in_progress_root.is_some()
1658 {
1659 return false;
1660 }
1661 if state
1662 .last_upload_failed_at
1663 .is_some_and(|failed_at| failed_at.elapsed() < MIRROR_ROOT_UPLOAD_RETRY_INTERVAL)
1664 {
1665 return false;
1666 }
1667 state.upload_in_progress_root = Some(pending_root.clone());
1668 }
1669
1670 let store = Arc::clone(&self.store);
1671 let servers = self.config.blossom_write_servers.clone();
1672 let root = pending_root.clone();
1673 let root_string = pending_root.to_string();
1674 let publish_state = Arc::clone(publish_state);
1675 let log_label = log_label.to_string();
1676 tokio::task::spawn_blocking(move || {
1677 let runtime = tokio::runtime::Builder::new_current_thread()
1678 .enable_all()
1679 .build()
1680 .expect("build nostr mirror root upload runtime");
1681 runtime.block_on(async move {
1682 let result =
1683 background_blossom_push_with_store(store, &root_string, &servers).await;
1684 let mut state = publish_state.lock().expect("root publish state");
1685 if state.upload_in_progress_root.as_ref() == Some(&root) {
1686 state.upload_in_progress_root = None;
1687 }
1688 match result {
1689 Ok(()) => {
1690 if state.pending_root.as_ref() == Some(&root) {
1691 state.last_uploaded_root = Some(root.clone());
1692 state.last_uploaded_at = Some(Instant::now());
1693 state.last_upload_failed_at = None;
1694 state.last_upload_error = None;
1695 state.missing_blob_rebuild_required = false;
1696 }
1697 info!(
1698 "Nostr mirror uploaded {} DAG to Blossom: hash={}",
1699 log_label,
1700 hex::encode(root.hash)
1701 );
1702 }
1703 Err(err) => {
1704 if state.pending_root.as_ref() == Some(&root) {
1705 state.last_upload_failed_at = Some(Instant::now());
1706 state.last_upload_error = Some(format!("{err:#}"));
1707 }
1708 if is_missing_local_blob_message(&format!("{err:#}")) {
1709 state.missing_blob_rebuild_required = true;
1710 }
1711 warn!(
1712 "Nostr mirror {} DAG upload failed: hash={} error={:#}",
1713 log_label,
1714 hex::encode(root.hash),
1715 err
1716 );
1717 }
1718 }
1719 });
1720 });
1721
1722 true
1723 }
1724
1725 async fn publish_root_event_to_relays(
1726 &self,
1727 publish_client: &Client,
1728 relays: &[String],
1729 event: &Event,
1730 ) -> Result<(Vec<String>, Vec<String>)> {
1731 let mut successful_relays = Vec::new();
1732 let mut failed_relays = Vec::new();
1733
1734 match publish_client
1735 .send_event_to(relays.iter().map(|relay| relay.as_str()), event.clone())
1736 .await
1737 {
1738 Ok(output) => {
1739 for relay in relays {
1740 let relay_url = relay.trim_end_matches('/');
1741 if output
1742 .success
1743 .iter()
1744 .any(|url| url.as_str().trim_end_matches('/') == relay_url)
1745 {
1746 successful_relays.push(relay.clone());
1747 }
1748 }
1749 failed_relays.extend(output.failed.into_iter().map(|(url, reason)| match reason {
1750 Some(reason) => format!("{url}: {reason}"),
1751 None => format!("{url}: relay rejected publish"),
1752 }));
1753 }
1754 Err(err) => {
1755 failed_relays.push(format!("publish relays: {err}"));
1756 }
1757 }
1758
1759 Ok((successful_relays, failed_relays))
1760 }
1761
1762 async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1763 let publish_client = self.publish_client.as_ref()?;
1764 let author = self.publish_pubkey?;
1765 let events = publish_client
1766 .get_events_of(
1767 vec![Self::build_public_root_filter(author, tree_name)],
1768 EventSource::relays(Some(self.config.fetch_timeout)),
1769 )
1770 .await
1771 .ok()?;
1772 events
1773 .iter()
1774 .filter(|event| Self::matches_public_root_event(event, tree_name))
1775 .max_by_key(|event| (event.created_at, event.id))
1776 .map(|event| event.created_at)
1777 }
1778
1779 fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1780 Filter::new()
1781 .kind(Kind::Custom(30078))
1782 .author(author)
1783 .custom_tag(
1784 SingleLetterTag::lowercase(Alphabet::D),
1785 vec![tree_name.to_string()],
1786 )
1787 .custom_tag(
1788 SingleLetterTag::lowercase(Alphabet::L),
1789 vec!["hashtree".to_string()],
1790 )
1791 .limit(50)
1792 }
1793
1794 fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1795 event.kind == Kind::Custom(30078)
1796 && event.tags.iter().any(|tag| {
1797 let values = tag.as_slice();
1798 values.first().is_some_and(|value| value == "d")
1799 && values.get(1).is_some_and(|value| value == tree_name)
1800 })
1801 && event.tags.iter().any(|tag| {
1802 let values = tag.as_slice();
1803 values.first().is_some_and(|value| value == "l")
1804 && values.get(1).is_some_and(|value| value == "hashtree")
1805 })
1806 }
1807
1808 fn build_public_root_event(
1809 tree_name: &str,
1810 cid: &hashtree_core::Cid,
1811 created_at: Timestamp,
1812 ) -> EventBuilder {
1813 let mut tags = vec![
1814 Tag::identifier(tree_name.to_string()),
1815 Tag::custom(
1816 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1817 vec!["hashtree"],
1818 ),
1819 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1820 ];
1821 if let Some(key) = cid.key {
1822 tags.push(Tag::custom(
1823 TagKind::Custom("key".into()),
1824 vec![hex::encode(key)],
1825 ));
1826 }
1827
1828 EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1829 }
1830}
1831
1832fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
1833 error
1834 .chain()
1835 .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
1836}
1837
1838fn is_missing_local_blob_message(message: &str) -> bool {
1839 message.contains(MISSING_LOCAL_BLOB_PUSH_ERROR)
1840}
1841
1842fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1843 match (left, right) {
1844 (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1845 (Some(left), None) => Some(left),
1846 (None, Some(right)) => Some(right),
1847 (None, None) => None,
1848 }
1849}
1850
1851fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1852 match latest_existing {
1853 Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1854 _ => now,
1855 }
1856}
1857
// Unit tests for this module, compiled only for test builds and loaded from the separate
// `tests` submodule file.
#[cfg(test)]
mod tests;