1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13 Timestamp,
14};
15use nostr_sdk::{
16 pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17 RelayStatus,
18};
19use tokio::sync::{watch, Mutex as AsyncMutex};
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// ---------------------------------------------------------------------------
// Timing knobs. Every duration has a production value and a much shorter
// `cfg(test)` value so tests exercise the same code paths without real waits.
// ---------------------------------------------------------------------------

/// Pause at the start of `run` before any relay work.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
/// Pause at the start of `run` before any relay work (test build).
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// Extra pause after the startup delay to let relay connections settle.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(1_000);
/// Extra pause after the startup delay to let relay connections settle (test build).
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

/// Period of the author-refresh tick in the main loop.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
/// Period of the author-refresh tick in the main loop (test build).
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

/// Minimum spacing between two reconnect-triggered catch-up history syncs.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
/// Minimum spacing between two reconnect-triggered catch-up history syncs (test build).
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

/// Minimum spacing between two periodic missing-profile backfills.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Minimum spacing between two periodic missing-profile backfills (test build).
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

/// Period of the live-event flush / root-publish tick in the main loop.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
/// Period of the live-event flush / root-publish tick in the main loop (test build).
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

/// Root-publish staleness bound (consumed outside this section).
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
/// Root-publish staleness bound (test build).
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);

/// Retry spacing for root uploads (consumed outside this section).
#[cfg(not(test))]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
/// Retry spacing for root uploads (test build).
#[cfg(test)]
const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);

// ---------------------------------------------------------------------------
// Event kinds and published tree names.
// ---------------------------------------------------------------------------

/// NIP-23 long-form content kind.
const KIND_LONG_FORM_CONTENT: u16 = 30_023;
/// Kinds mirrored by default: metadata, text notes, contacts, reposts, reactions,
/// zap receipts and long-form content.
const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";

// ---------------------------------------------------------------------------
// History-sync sizing.
// ---------------------------------------------------------------------------

/// Metadata-only syncs need just the newest profile event per author.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
/// Upper bound on the author batch for metadata-only syncs.
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
/// `0` disables full text-content history sync.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
/// A sync with more than `author_batch_size * this` authors switches to the GlobalRecent plan.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

/// Error text used for blob pushes whose local blob is missing (consumed outside this section).
const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
81
/// Configuration for [`BackgroundNostrMirror`].
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relays the mirror reads from. An empty list makes `run` return immediately.
    pub relays: Vec<String>,
    /// Relays used by the publishing client. When empty, no publish client is created.
    pub publish_relays: Vec<String>,
    /// Blossom servers for uploads (consumed outside this section).
    pub blossom_write_servers: Vec<String>,
    /// Highest social-graph follow distance whose authors are mirrored; `0` disables `run`.
    pub max_follow_distance: u32,
    /// Threshold passed to the social graph's overmute check when collecting authors.
    pub overmute_threshold: f64,
    /// Base author batch size for history-sync plans (values below 1 are treated as 1).
    pub author_batch_size: usize,
    /// Authors per history-sync chunk (values below 1 are treated as 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit for history sync (values below 1 are treated as 1).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors per missing-profile backfill; `0` disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Relay fetch timeout (consumed outside this section).
    pub fetch_timeout: Duration,
    /// Optional maximum event size applied to the reading client's relay limits.
    pub relay_event_max_size: Option<u32>,
    /// Logged as `negentropy_only`; enforcement happens outside this section.
    pub require_negentropy: bool,
    /// Event kinds to mirror.
    pub kinds: Vec<u16>,
    /// Spawn a history sync for the initial authors when `run` starts.
    pub history_sync_on_start: bool,
    /// Run a catch-up history sync when a relay transitions back to `Connected`.
    pub history_sync_on_reconnect: bool,
    /// Follow distance for text-content history sync; `None` disables it.
    pub full_text_note_history_follow_distance: Option<u32>,
    /// Relay page cap for text-content history sync; `0` disables it.
    pub full_text_note_history_max_relay_pages: usize,
    /// Name of the published event-index tree (consumed outside this section).
    pub published_event_tree_name: Option<String>,
    /// Name of the published profile-search tree (consumed outside this section).
    pub published_profile_search_tree_name: Option<String>,
    /// Name of the published profiles-by-pubkey tree (consumed outside this section).
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
105
106impl Default for NostrMirrorConfig {
107 fn default() -> Self {
108 Self {
109 relays: Vec::new(),
110 publish_relays: Vec::new(),
111 blossom_write_servers: Vec::new(),
112 max_follow_distance: 2,
113 overmute_threshold: 1.0,
114 author_batch_size: 256,
115 history_sync_author_chunk_size: 5_000,
116 history_sync_per_author_event_limit: 256,
117 missing_profile_backfill_batch_size: 5_000,
118 fetch_timeout: Duration::from_secs(15),
119 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
120 require_negentropy: false,
121 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
122 history_sync_on_start: true,
123 history_sync_on_reconnect: true,
124 full_text_note_history_follow_distance: Some(
125 DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
126 ),
127 full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
128 published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
129 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
130 published_profiles_by_pubkey_tree_name: Some(
131 DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
132 ),
133 }
134 }
135}
136
137#[derive(Debug, Default)]
138struct RootPublishState {
139 pending_root: Option<hashtree_core::Cid>,
140 last_changed_at: Option<Instant>,
141 dirty_since: Option<Instant>,
142 last_published_root: Option<hashtree_core::Cid>,
143 last_published_at: Option<Instant>,
144 last_published_created_at: Option<Timestamp>,
145 last_uploaded_root: Option<hashtree_core::Cid>,
146 last_uploaded_at: Option<Instant>,
147 upload_in_progress_root: Option<hashtree_core::Cid>,
148 last_upload_failed_at: Option<Instant>,
149 last_upload_error: Option<String>,
150 missing_blob_rebuild_required: bool,
151}
152
153#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154struct HistorySyncPlan {
155 relay_fetch_mode: RelayFetchMode,
156 author_batch_size: usize,
157 per_author_event_limit: usize,
158 relay_page_size: usize,
159 max_relay_pages: usize,
160}
161
162pub struct BackgroundNostrMirror {
163 config: NostrMirrorConfig,
164 store: Arc<HashtreeStore>,
165 graph_store: Arc<SocialGraphStore>,
166 client: Client,
167 publish_client: Option<Client>,
168 publish_pubkey: Option<PublicKey>,
169 event_publish_state: Arc<Mutex<RootPublishState>>,
170 profile_search_publish_state: Arc<Mutex<RootPublishState>>,
171 profiles_by_pubkey_publish_state: Arc<Mutex<RootPublishState>>,
172 pending_live_events: Mutex<BTreeMap<String, Event>>,
173 missing_profile_cursor: Mutex<usize>,
174 history_sync_lock: AsyncMutex<()>,
175 shutdown_tx: watch::Sender<bool>,
176 shutdown_rx: watch::Receiver<bool>,
177}
178
179impl BackgroundNostrMirror {
180 pub async fn new(
181 config: NostrMirrorConfig,
182 store: Arc<HashtreeStore>,
183 graph_store: Arc<SocialGraphStore>,
184 publish_keys: Option<Keys>,
185 ) -> Result<Self> {
186 let client = if let Some(max_size) = config.relay_event_max_size {
187 let mut limits = RelayLimits::default();
188 limits.events.max_size = Some(max_size);
189 Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
190 } else {
191 Client::new(Keys::generate())
192 };
193 for relay in &config.relays {
194 client
195 .add_relay(relay)
196 .await
197 .with_context(|| format!("add mirror relay {relay}"))?;
198 }
199 client.connect().await;
200
201 let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
202 let publish_client = if let Some(keys) = publish_keys {
203 if config.publish_relays.is_empty() {
204 None
205 } else {
206 let client = Client::with_opts(keys, Options::new().wait_for_send(false));
207 for relay in &config.publish_relays {
208 client
209 .add_relay(relay)
210 .await
211 .with_context(|| format!("add mirror publish relay {relay}"))?;
212 }
213 client.connect().await;
214 Some(client)
215 }
216 } else {
217 None
218 };
219
220 let (shutdown_tx, shutdown_rx) = watch::channel(false);
221 Ok(Self {
222 config,
223 store,
224 graph_store,
225 client,
226 publish_client,
227 publish_pubkey,
228 event_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
229 profile_search_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
230 profiles_by_pubkey_publish_state: Arc::new(Mutex::new(RootPublishState::default())),
231 pending_live_events: Mutex::new(BTreeMap::new()),
232 missing_profile_cursor: Mutex::new(0),
233 history_sync_lock: AsyncMutex::new(()),
234 shutdown_tx,
235 shutdown_rx,
236 })
237 }
238
239 pub fn shutdown(&self) {
240 let _ = self.shutdown_tx.send(true);
241 }
242
243 fn sync_publish_roots_from_store(&self) -> Result<()> {
244 self.note_public_events_root_change()?;
245 self.note_profile_search_root_change()?;
246 self.note_profiles_by_pubkey_root_change()?;
247 Ok(())
248 }
249
250 async fn publish_pending_roots(
251 &self,
252 force_event: bool,
253 force_profile_search: bool,
254 force_profiles_by_pubkey: bool,
255 ) -> (Result<()>, Result<()>, Result<()>) {
256 tokio::join!(
257 self.maybe_publish_event_root(force_event),
258 self.maybe_publish_profile_search_root(force_profile_search),
259 self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
260 )
261 }
262
263 async fn publish_priority_roots(
264 &self,
265 force_event: bool,
266 force_profile_search: bool,
267 force_profiles_by_pubkey: bool,
268 ) -> (Result<()>, Result<()>, Result<()>) {
269 let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
270 async {
271 if force_profile_search {
272 self.maybe_publish_profile_search_root(true).await
273 } else {
274 Ok(())
275 }
276 },
277 async {
278 if force_profiles_by_pubkey {
279 self.maybe_publish_profiles_by_pubkey_root(true).await
280 } else {
281 Ok(())
282 }
283 },
284 );
285 let event_result = if force_event {
286 self.maybe_publish_event_root(true).await
287 } else {
288 Ok(())
289 };
290 (
291 event_result,
292 profile_search_result,
293 profiles_by_pubkey_result,
294 )
295 }
296
297 pub async fn run(self: Arc<Self>) -> Result<()> {
298 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
299 return Ok(());
300 }
301
302 info!(
303 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
304 self.config.relays.len(),
305 self.config.max_follow_distance,
306 self.config.require_negentropy,
307 self.config.kinds,
308 self.config.history_sync_author_chunk_size.max(1),
309 self.config.history_sync_on_start,
310 self.config.history_sync_on_reconnect
311 );
312
313 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
314 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
315 let live_since = Timestamp::now();
316 self.sync_publish_roots_from_store()?;
317 let (event_result, profile_search_result, profiles_by_pubkey_result) =
318 self.publish_priority_roots(true, true, true).await;
319 if let Err(err) = event_result {
320 warn!(
321 "Nostr mirror event-root publish failed on startup: {:#}",
322 err
323 );
324 }
325 if let Err(err) = profile_search_result {
326 warn!(
327 "Nostr mirror profile-search publish failed on startup: {:#}",
328 err
329 );
330 }
331 if let Err(err) = profiles_by_pubkey_result {
332 warn!(
333 "Nostr mirror profiles-by-pubkey publish failed on startup: {:#}",
334 err
335 );
336 }
337
338 let initial_authors = self.collect_authors()?;
339 if initial_authors.is_empty() {
340 info!("Nostr mirror: no social-graph authors to mirror yet");
341 }
342
343 let mut subscribed_authors = HashSet::new();
344 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
345 .await?;
346
347 if !initial_authors.is_empty() && self.config.history_sync_on_start {
348 self.spawn_startup_history_sync(initial_authors.clone());
349 }
350
351 let mut relay_statuses = self.capture_relay_statuses().await;
352 let mut last_reconnect_history_sync_at: Option<Instant> = None;
353 let mut last_missing_profile_backfill_at: Option<Instant> = None;
354 let mut notifications = self.client.notifications();
355 let mut shutdown_rx = self.shutdown_rx.clone();
356 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
357 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
358
359 loop {
360 tokio::select! {
361 _ = shutdown_rx.changed() => {
362 if *shutdown_rx.borrow() {
363 break;
364 }
365 }
366 _ = refresh_interval.tick() => {
367 let authors = self.collect_authors()?;
368 let new_authors = authors
369 .into_iter()
370 .filter(|author| !subscribed_authors.contains(author))
371 .collect::<Vec<_>>();
372 if !new_authors.is_empty() {
373 debug!(
374 "Nostr mirror discovered {} newly reachable author(s)",
375 new_authors.len()
376 );
377 self.subscribe_authors_since(
378 &new_authors,
379 Timestamp::now(),
380 &mut subscribed_authors,
381 )
382 .await?;
383 self.spawn_author_history_sync(
384 "new-author catch-up",
385 new_authors.clone(),
386 true,
387 true,
388 );
389 }
390 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
391 let missing_profile_authors = self.collect_missing_profile_authors(
392 self.config.missing_profile_backfill_batch_size,
393 )?;
394 if !missing_profile_authors.is_empty() {
395 info!(
396 "Nostr mirror missing-profile backfill starting: authors={}",
397 missing_profile_authors.len()
398 );
399 self.spawn_missing_profile_backfill(missing_profile_authors);
400 last_missing_profile_backfill_at = Some(Instant::now());
401 }
402 }
403 }
404 _ = publish_interval.tick() => {
405 self.sync_publish_roots_from_store()?;
406 if let Err(err) = self.flush_live_events().await {
407 warn!("Nostr mirror live event flush failed: {:#}", err);
408 }
409 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
410 .publish_pending_roots(false, false, false)
411 .await;
412 if let Err(err) = event_result {
413 warn!("Nostr mirror event-root publish failed: {:#}", err);
414 }
415 if let Err(err) = profile_search_result {
416 warn!("Nostr mirror profile-search publish failed: {:#}", err);
417 }
418 if let Err(err) = profiles_by_pubkey_result {
419 warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
420 }
421 }
422 notification = notifications.recv() => {
423 match notification {
424 Ok(RelayPoolNotification::Event { event, .. }) => {
425 self.ingest_live_event(&event)?;
426 }
427 Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
428 let relay_url = relay_url.to_string();
429 let previous = relay_statuses.insert(relay_url.clone(), status);
430 if Self::should_history_sync_on_reconnect(
431 self.config.history_sync_on_reconnect,
432 previous,
433 status,
434 ) && Self::should_run_reconnect_history_sync(
435 last_reconnect_history_sync_at.as_ref(),
436 )
437 {
438 let authors = self.collect_authors()?;
439 if !authors.is_empty() {
440 info!(
441 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
442 relay_url,
443 authors.len(),
444 self.config.require_negentropy
445 );
446 self.spawn_author_history_sync(
447 "relay reconnect catch-up",
448 authors,
449 false,
450 false,
451 );
452 last_reconnect_history_sync_at = Some(Instant::now());
453 }
454 }
455 }
456 Ok(RelayPoolNotification::Shutdown) => break,
457 Ok(_) => {}
458 Err(err) => {
459 warn!("Nostr mirror notification error: {}", err);
460 break;
461 }
462 }
463 }
464 }
465 }
466
467 if let Err(err) = self.flush_live_events().await {
468 warn!(
469 "Nostr mirror live event flush failed during shutdown: {:#}",
470 err
471 );
472 }
473 if let Err(err) = self.sync_publish_roots_from_store() {
474 warn!(
475 "Nostr mirror root-state refresh failed during shutdown: {:#}",
476 err
477 );
478 }
479 let (event_result, profile_search_result, profiles_by_pubkey_result) =
480 self.publish_pending_roots(true, true, true).await;
481 if let Err(err) = event_result {
482 warn!(
483 "Nostr mirror event-root publish failed during shutdown: {:#}",
484 err
485 );
486 }
487 if let Err(err) = profile_search_result {
488 warn!(
489 "Nostr mirror profile-search publish failed during shutdown: {:#}",
490 err
491 );
492 }
493 if let Err(err) = profiles_by_pubkey_result {
494 warn!(
495 "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
496 err
497 );
498 }
499 let _ = self.client.disconnect().await;
500 if let Some(client) = self.publish_client.as_ref() {
501 let _ = client.disconnect().await;
502 }
503 Ok(())
504 }
505
506 fn spawn_startup_history_sync(self: &Arc<Self>, initial_authors: Vec<String>) {
507 let mirror = Arc::clone(self);
508 tokio::task::spawn_blocking(move || {
509 let runtime = tokio::runtime::Builder::new_current_thread()
510 .enable_all()
511 .build()
512 .expect("build nostr mirror startup history sync runtime");
513 runtime.block_on(async move {
514 let _guard = mirror.history_sync_lock.lock().await;
515 if let Err(err) = mirror.run_startup_history_sync(initial_authors).await {
516 warn!("Nostr mirror startup history sync failed: {:#}", err);
517 }
518 });
519 });
520 }
521
522 async fn run_startup_history_sync(&self, initial_authors: Vec<String>) -> Result<()> {
523 if self.should_backfill_missing_profiles(None) {
524 let missing_profile_authors = self
525 .collect_missing_profile_authors(self.config.missing_profile_backfill_batch_size)?;
526 if !missing_profile_authors.is_empty() {
527 info!(
528 "Nostr mirror missing-profile backfill starting: authors={}",
529 missing_profile_authors.len()
530 );
531 self.history_sync_authors_with_kinds(
532 missing_profile_authors,
533 &[Kind::Metadata.as_u16()],
534 )
535 .await?;
536 }
537 }
538 self.history_sync_recent_text_notes_for_reachable_authors()
539 .await?;
540 self.history_sync_full_text_notes_for_reachable_authors()
541 .await?;
542 self.history_sync_authors(initial_authors).await
543 }
544
545 fn spawn_author_history_sync(
546 self: &Arc<Self>,
547 label: &'static str,
548 authors: Vec<String>,
549 include_full_text_notes: bool,
550 wait_for_existing_sync: bool,
551 ) {
552 let mirror = Arc::clone(self);
553 tokio::task::spawn_blocking(move || {
554 let runtime = tokio::runtime::Builder::new_current_thread()
555 .enable_all()
556 .build()
557 .expect("build nostr mirror author history sync runtime");
558 runtime.block_on(async move {
559 if wait_for_existing_sync {
560 let _guard = mirror.history_sync_lock.lock().await;
561 if let Err(err) = mirror
562 .run_author_history_sync(authors, include_full_text_notes)
563 .await
564 {
565 warn!("Nostr mirror {label} failed: {:#}", err);
566 }
567 return;
568 }
569
570 let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
571 info!("Nostr mirror {label} skipped; another history sync is running");
572 return;
573 };
574 if let Err(err) = mirror
575 .run_author_history_sync(authors, include_full_text_notes)
576 .await
577 {
578 warn!("Nostr mirror {label} failed: {:#}", err);
579 }
580 });
581 });
582 }
583
584 async fn run_author_history_sync(
585 &self,
586 authors: Vec<String>,
587 include_full_text_notes: bool,
588 ) -> Result<()> {
589 if include_full_text_notes {
590 self.history_sync_recent_text_notes_for_authors(authors.clone())
591 .await?;
592 self.history_sync_full_text_notes_for_authors(authors.clone())
593 .await?;
594 }
595 self.history_sync_authors(authors).await
596 }
597
598 fn spawn_missing_profile_backfill(self: &Arc<Self>, authors: Vec<String>) {
599 let mirror = Arc::clone(self);
600 tokio::task::spawn_blocking(move || {
601 let runtime = tokio::runtime::Builder::new_current_thread()
602 .enable_all()
603 .build()
604 .expect("build nostr mirror missing profile runtime");
605 runtime.block_on(async move {
606 let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
607 info!(
608 "Nostr mirror missing-profile backfill skipped; another history sync is running"
609 );
610 return;
611 };
612 if let Err(err) = mirror
613 .history_sync_authors_with_kinds(authors, &[Kind::Metadata.as_u16()])
614 .await
615 {
616 warn!("Nostr mirror missing-profile backfill failed: {:#}", err);
617 }
618 });
619 });
620 }
621
622 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
623 let mut statuses = HashMap::new();
624 for (relay_url, relay) in self.client.relays().await {
625 statuses.insert(relay_url.to_string(), relay.status().await);
626 }
627 statuses
628 }
629
630 async fn has_connected_publish_relay(&self) -> bool {
631 let Some(client) = self.publish_client.as_ref() else {
632 return false;
633 };
634 Self::client_has_connected_relay(client).await
635 }
636
637 async fn client_has_connected_relay(client: &Client) -> bool {
638 for (_relay_url, relay) in client.relays().await {
639 if relay.status().await == RelayStatus::Connected {
640 return true;
641 }
642 }
643 false
644 }
645
646 fn collect_authors(&self) -> Result<Vec<String>> {
647 self.collect_authors_with_max_distance(self.config.max_follow_distance)
648 }
649
650 fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
651 let mut authors = Vec::new();
652 let mut seen = HashSet::new();
653 for distance in 0..=max_distance {
654 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
655 self.graph_store.as_ref(),
656 distance,
657 )
658 .with_context(|| format!("load social-graph distance {distance}"))?
659 {
660 if self
661 .graph_store
662 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
663 {
664 continue;
665 }
666 let hex = hex::encode(pubkey);
667 if seen.insert(hex.clone()) {
668 authors.push(hex);
669 }
670 }
671 }
672 Ok(authors)
673 }
674
675 fn full_text_note_history_follow_distance(&self) -> Option<u32> {
676 let distance = self.config.full_text_note_history_follow_distance?;
677 if self
678 .config
679 .kinds
680 .iter()
681 .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
682 {
683 Some(distance.min(self.config.max_follow_distance))
684 } else {
685 None
686 }
687 }
688
689 fn full_text_note_history_max_relay_pages(&self) -> Option<usize> {
690 Self::full_text_note_history_max_relay_pages_for_config(&self.config)
691 }
692
693 fn full_text_note_history_max_relay_pages_for_config(
694 config: &NostrMirrorConfig,
695 ) -> Option<usize> {
696 let pages = config.full_text_note_history_max_relay_pages;
697 if pages == 0 {
698 None
699 } else {
700 Some(pages)
701 }
702 }
703
704 fn is_text_content_history_kind(kind: u16) -> bool {
705 kind == Kind::TextNote.as_u16() || kind == KIND_LONG_FORM_CONTENT
706 }
707
708 fn history_sync_kinds_for_config(config: &NostrMirrorConfig) -> Vec<u16> {
709 let mut kinds = config.kinds.clone();
710 if Self::full_text_note_history_max_relay_pages_for_config(config).is_none() {
711 kinds.retain(|kind| !Self::is_text_content_history_kind(*kind));
712 }
713 kinds
714 }
715
716 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
717 if limit == 0 {
718 return Ok(Vec::new());
719 }
720
721 let authors = self.collect_authors()?;
722 if authors.is_empty() {
723 return Ok(Vec::new());
724 }
725
726 let mut cursor = self
727 .missing_profile_cursor
728 .lock()
729 .expect("missing profile cursor");
730 let mut index = (*cursor).min(authors.len());
731 let mut scanned = 0usize;
732 let mut missing = Vec::new();
733
734 while scanned < authors.len() && missing.len() < limit {
735 let author = &authors[index];
736 if self.graph_store.latest_profile_event(author)?.is_none() {
737 missing.push(author.clone());
738 }
739 index += 1;
740 if index == authors.len() {
741 index = 0;
742 }
743 scanned += 1;
744 }
745
746 *cursor = index;
747 Ok(missing)
748 }
749
750 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
751 if self.config.missing_profile_backfill_batch_size == 0
752 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
753 {
754 return false;
755 }
756 match last_run {
757 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
758 None => true,
759 }
760 }
761
762 fn should_history_sync_on_reconnect(
763 history_sync_on_reconnect: bool,
764 previous: Option<RelayStatus>,
765 status: RelayStatus,
766 ) -> bool {
767 history_sync_on_reconnect
768 && status == RelayStatus::Connected
769 && matches!(
770 previous,
771 Some(
772 RelayStatus::Initialized
773 | RelayStatus::Pending
774 | RelayStatus::Connecting
775 | RelayStatus::Disconnected
776 | RelayStatus::Terminated
777 )
778 )
779 }
780
781 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
782 match last_run {
783 None => true,
784 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
785 }
786 }
787
788 fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
789 !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
790 }
791
792 fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
793 kinds.is_empty()
794 || kinds.iter().any(|kind| {
795 *kind == Kind::Metadata.as_u16()
796 || *kind == Kind::ContactList.as_u16()
797 || *kind == Kind::MuteList.as_u16()
798 })
799 }
800
801 fn history_sync_plan_for(
802 config: &NostrMirrorConfig,
803 authors: usize,
804 kinds: &[u16],
805 ) -> HistorySyncPlan {
806 let author_batch_size = config.author_batch_size.max(1);
807 let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
808 let relay_page_size = 1_000;
809 let max_relay_pages = 10;
810
811 if Self::is_metadata_only_history_sync(kinds) {
812 return HistorySyncPlan {
813 relay_fetch_mode: RelayFetchMode::AuthorBatches,
814 author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
815 per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
816 relay_page_size,
817 max_relay_pages,
818 };
819 }
820
821 if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
822 return HistorySyncPlan {
823 relay_fetch_mode: RelayFetchMode::GlobalRecent,
824 author_batch_size,
825 per_author_event_limit: per_author_event_limit
826 .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
827 .max(1),
828 relay_page_size,
829 max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
830 };
831 }
832
833 HistorySyncPlan {
834 relay_fetch_mode: RelayFetchMode::AuthorBatches,
835 author_batch_size,
836 per_author_event_limit,
837 relay_page_size,
838 max_relay_pages,
839 }
840 }
841
842 fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
843 Self::history_sync_plan_for(&self.config, authors, kinds)
844 }
845
846 fn history_sync_chunk_size_for_config(
847 config: &NostrMirrorConfig,
848 authors: usize,
849 kinds: &[u16],
850 full_author_history: bool,
851 chunk_size_override: Option<usize>,
852 ) -> usize {
853 let configured_chunk_size = chunk_size_override
854 .unwrap_or(config.history_sync_author_chunk_size)
855 .max(1);
856 if !full_author_history
857 && Self::history_sync_plan_for(config, authors, kinds).relay_fetch_mode
858 == RelayFetchMode::GlobalRecent
859 {
860 return authors.max(1);
861 }
862 configured_chunk_size
863 }
864
865 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
866 let kinds = Self::history_sync_kinds_for_config(&self.config);
867 if kinds.is_empty() {
868 info!("Nostr mirror history sync skipped: no enabled history kinds");
869 return Ok(());
870 }
871 self.history_sync_authors_with_kinds(authors, &kinds).await
872 }
873
874 async fn history_sync_authors_with_kinds(
875 &self,
876 authors: Vec<String>,
877 kinds: &[u16],
878 ) -> Result<()> {
879 self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
880 .await
881 }
882
883 async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
884 let Some(distance) = self.full_text_note_history_follow_distance() else {
885 return Ok(());
886 };
887 if self.full_text_note_history_max_relay_pages().is_none() {
888 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
889 return Ok(());
890 }
891 info!(
892 "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
893 );
894 let authors = self.collect_authors_with_max_distance(distance)?;
895 self.history_sync_full_text_notes_for_authors(authors).await
896 }
897
898 async fn history_sync_recent_text_notes_for_reachable_authors(&self) -> Result<()> {
899 let Some(distance) = self.full_text_note_history_follow_distance() else {
900 return Ok(());
901 };
902 if self.full_text_note_history_max_relay_pages().is_none() {
903 info!("Nostr mirror startup text content history sync skipped: max_relay_pages=0");
904 return Ok(());
905 }
906 info!(
907 "Nostr mirror recent text content catch-up author collection starting: max_follow_distance={distance}"
908 );
909 let authors = self.collect_authors_with_max_distance(distance)?;
910 self.history_sync_recent_text_notes_for_authors(authors)
911 .await
912 }
913
914 async fn history_sync_recent_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
915 if authors.is_empty()
916 || self.full_text_note_history_follow_distance().is_none()
917 || self.full_text_note_history_max_relay_pages().is_none()
918 {
919 return Ok(());
920 }
921
922 info!(
923 "Nostr mirror recent text content catch-up starting: authors={}",
924 authors.len()
925 );
926 let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
927 let chunk_size = self
928 .config
929 .author_batch_size
930 .max(1)
931 .saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER + 1);
932 self.history_sync_authors_chunked(
933 authors,
934 |current_root, author_chunk| async move {
935 self.history_sync_author_chunk(current_root, author_chunk, &kinds, false, None)
936 .await
937 },
938 false,
939 Some(chunk_size),
940 )
941 .await
942 }
943
944 async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
945 let Some(distance) = self.full_text_note_history_follow_distance() else {
946 return Ok(());
947 };
948 let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
949 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
950 return Ok(());
951 };
952 let mut close_authors = Vec::new();
953 for author in authors {
954 let Ok(pubkey) = hex::decode(&author) else {
955 continue;
956 };
957 let Ok(pubkey) = <[u8; 32]>::try_from(pubkey.as_slice()) else {
958 continue;
959 };
960 if self
961 .graph_store
962 .follow_distance(&pubkey)?
963 .is_some_and(|actual_distance| actual_distance <= distance)
964 {
965 close_authors.push(author);
966 }
967 }
968 if close_authors.is_empty() {
969 return Ok(());
970 }
971
972 info!(
973 "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
974 close_authors.len(),
975 distance,
976 max_relay_pages
977 );
978 let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
979 self.history_sync_authors_with_kinds_and_mode(
980 close_authors,
981 &kinds,
982 true,
983 Some(max_relay_pages),
984 )
985 .await
986 }
987
988 async fn history_sync_authors_with_kinds_and_mode(
989 &self,
990 authors: Vec<String>,
991 kinds: &[u16],
992 full_author_history: bool,
993 max_relay_pages: Option<usize>,
994 ) -> Result<()> {
995 let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
996 let chunk_size = Self::history_sync_chunk_size_for_config(
997 &self.config,
998 authors.len(),
999 kinds,
1000 full_author_history,
1001 None,
1002 );
1003 self.history_sync_authors_chunked(
1004 authors,
1005 |current_root, author_chunk| async move {
1006 self.history_sync_author_chunk(
1007 current_root,
1008 author_chunk,
1009 kinds,
1010 full_author_history,
1011 max_relay_pages,
1012 )
1013 .await
1014 },
1015 update_profile_and_graph,
1016 Some(chunk_size),
1017 )
1018 .await
1019 }
1020
1021 async fn history_sync_authors_chunked<F, Fut>(
1022 &self,
1023 authors: Vec<String>,
1024 mut run_chunk: F,
1025 update_profile_and_graph: bool,
1026 chunk_size_override: Option<usize>,
1027 ) -> Result<()>
1028 where
1029 F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
1030 Fut: std::future::Future<Output = Result<CrawlReport>>,
1031 {
1032 if authors.is_empty() {
1033 return Ok(());
1034 }
1035
1036 info!(
1037 "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
1038 authors.len(),
1039 self.config.relays.len(),
1040 self.config.require_negentropy
1041 );
1042
1043 let mut current_root = self.graph_store.public_events_root_for_write()?;
1044 let mut last_error = None;
1045 let mut applied_chunks = 0usize;
1046 let mut failed_chunks = 0usize;
1047 let chunk_size = chunk_size_override
1048 .unwrap_or(self.config.history_sync_author_chunk_size)
1049 .max(1);
1050 let total_chunks = authors.len().div_ceil(chunk_size);
1051
1052 for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
1053 let author_chunk = author_chunk.to_vec();
1054 let author_count = author_chunk.len();
1055 info!(
1056 "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
1057 chunk_index + 1,
1058 total_chunks,
1059 author_count
1060 );
1061 let mut report = match run_chunk(current_root.clone(), author_chunk.clone()).await {
1062 Ok(report) => report,
1063 Err(err) => {
1064 failed_chunks = failed_chunks.saturating_add(1);
1065 warn!(
1066 "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
1067 chunk_index + 1,
1068 total_chunks,
1069 author_count,
1070 err
1071 );
1072 last_error = Some(err);
1073 continue;
1074 }
1075 };
1076
1077 let latest_root = self.graph_store.public_events_root_for_write()?;
1078 if latest_root != current_root {
1079 info!(
1080 "Nostr mirror history sync root advanced while chunk was fetching; merging chunk into latest root: chunk={}/{} authors={} events_applied={}",
1081 chunk_index + 1,
1082 total_chunks,
1083 author_count,
1084 report.applied_events.len()
1085 );
1086 if report.applied_events.is_empty() {
1087 report.root = latest_root.clone();
1088 } else {
1089 let event_store = NostrEventStore::new(self.store.store_arc());
1090 report.root = event_store
1091 .build(latest_root.as_ref(), report.applied_events.clone())
1092 .await
1093 .context("merge history chunk into latest mirrored event root")?;
1094 }
1095 current_root = latest_root;
1096 }
1097
1098 if report.root != current_root {
1099 self.apply_history_root_with_options(
1100 report.root.as_ref(),
1101 update_profile_and_graph,
1102 true,
1103 Some(&report.applied_events),
1104 )
1105 .await?;
1106 current_root = report.root.clone();
1107 info!(
1108 "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
1109 chunk_index + 1,
1110 total_chunks,
1111 report.authors_processed,
1112 report.events_selected,
1113 report.events_seen
1114 );
1115 }
1116 applied_chunks = applied_chunks.saturating_add(1);
1117 }
1118
1119 if applied_chunks == 0 {
1120 return Err(last_error
1121 .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
1122 .context("run mirror history sync"));
1123 }
1124 if failed_chunks > 0 {
1125 warn!(
1126 "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
1127 applied_chunks, failed_chunks
1128 );
1129 }
1130 Ok(())
1131 }
1132
1133 async fn history_sync_author_chunk(
1134 &self,
1135 current_root: Option<hashtree_core::Cid>,
1136 authors: Vec<String>,
1137 kinds: &[u16],
1138 full_author_history: bool,
1139 max_relay_pages: Option<usize>,
1140 ) -> Result<CrawlReport> {
1141 let mut last_error = None;
1142 let mut report = None;
1143 let mut plan = self.history_sync_plan(authors.len(), kinds);
1144 if full_author_history {
1145 plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
1146 plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
1147 }
1148 for attempt in 0..3 {
1149 let mut last_logged_authors = 0usize;
1150 let bridge = NostrBridge::new(
1151 self.store.store_arc(),
1152 CrawlConfig {
1153 relays: self.config.relays.clone(),
1154 author_allowlist: Some(authors.clone()),
1155 max_live_bytes: None,
1156 max_events_seen: None,
1157 max_authors: None,
1158 max_follow_distance: None,
1159 author_batch_size: plan.author_batch_size,
1160 per_author_event_limit: plan.per_author_event_limit,
1161 per_author_live_bytes: None,
1162 fetch_timeout: self.config.fetch_timeout,
1163 kinds: Some(kinds.to_vec()),
1164 relay_fetch_mode: plan.relay_fetch_mode,
1165 require_negentropy: self.config.require_negentropy,
1166 relay_event_max_size: self.config.relay_event_max_size,
1167 relay_page_size: plan.relay_page_size,
1168 max_relay_pages: plan.max_relay_pages,
1169 full_author_history,
1170 },
1171 );
1172
1173 match bridge
1174 .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
1175 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
1176 let should_log = progress.authors_processed == progress.authors_considered
1177 || progress.authors_processed == 0
1178 || progress
1179 .authors_processed
1180 .saturating_sub(last_logged_authors)
1181 >= log_interval;
1182 if should_log {
1183 last_logged_authors = progress.authors_processed;
1184 info!(
1185 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
1186 progress.authors_processed,
1187 progress.authors_considered,
1188 progress.events_selected,
1189 progress.events_seen
1190 );
1191 }
1192 })
1193 .await
1194 {
1195 Ok(next_report) => {
1196 report = Some(next_report);
1197 break;
1198 }
1199 Err(err) => {
1200 last_error = Some(err);
1201 if attempt < 2 {
1202 tokio::time::sleep(Duration::from_millis(500)).await;
1203 }
1204 }
1205 }
1206 }
1207 report
1208 .ok_or_else(|| last_error.expect("history sync retry captured error"))
1209 .context("run mirror history sync")
1210 }
1211
1212 #[cfg(test)]
1213 async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
1214 self.apply_history_root_with_options(root, true, true, None)
1215 .await
1216 }
1217
1218 async fn apply_history_root_with_options(
1219 &self,
1220 root: Option<&hashtree_core::Cid>,
1221 update_profile_and_graph: bool,
1222 publish_roots: bool,
1223 applied_events: Option<&[hashtree_nostr::StoredNostrEvent]>,
1224 ) -> Result<()> {
1225 self.graph_store.write_public_events_root(root)?;
1226 let Some(root) = root else {
1227 return Ok(());
1228 };
1229
1230 self.note_public_events_root_change()?;
1231 if update_profile_and_graph {
1232 let events = match applied_events {
1233 Some(events) => events
1234 .iter()
1235 .cloned()
1236 .map(socialgraph::stored_event_to_nostr_event)
1237 .collect::<Result<Vec<_>>>()?,
1238 None => {
1239 let event_store = NostrEventStore::new(self.store.store_arc());
1240 event_store
1241 .list_recent_lossy(Some(root), ListEventsOptions::default())
1242 .await
1243 .context("list trusted mirrored events")?
1244 .into_iter()
1245 .map(socialgraph::stored_event_to_nostr_event)
1246 .collect::<Result<Vec<_>>>()?
1247 }
1248 };
1249
1250 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
1251 .context("sync mirrored social graph state")?;
1252 if applied_events.is_some() {
1253 self.graph_store
1254 .sync_profile_index_for_events(&events)
1255 .context("update mirrored profile search index")?;
1256 } else {
1257 self.graph_store
1258 .rebuild_profile_index_for_events(&events)
1259 .context("rebuild mirrored profile search index")?;
1260 }
1261 self.note_profile_search_root_change()?;
1262 self.note_profiles_by_pubkey_root_change()?;
1263 }
1264 if !publish_roots {
1265 return Ok(());
1266 }
1267 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
1268 .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
1269 .await;
1270 if let Err(err) = event_result {
1271 warn!(
1272 "Nostr mirror event-root publish failed after root update: {:#}",
1273 err
1274 );
1275 }
1276 if let Err(err) = profile_search_result {
1277 warn!(
1278 "Nostr mirror profile-search publish failed after root update: {:#}",
1279 err
1280 );
1281 }
1282 if let Err(err) = profiles_by_pubkey_result {
1283 warn!(
1284 "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
1285 err
1286 );
1287 }
1288 Ok(())
1289 }
1290
1291 async fn subscribe_authors_since(
1292 &self,
1293 authors: &[String],
1294 since: Timestamp,
1295 subscribed_authors: &mut HashSet<String>,
1296 ) -> Result<()> {
1297 let new_authors = authors
1298 .iter()
1299 .filter(|author| !subscribed_authors.contains(*author))
1300 .cloned()
1301 .collect::<Vec<_>>();
1302 if new_authors.is_empty() {
1303 return Ok(());
1304 }
1305
1306 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1307 let pubkeys = chunk
1308 .iter()
1309 .filter_map(|author| PublicKey::from_hex(author).ok())
1310 .collect::<Vec<_>>();
1311 if pubkeys.is_empty() {
1312 continue;
1313 }
1314
1315 let filter = Filter::new()
1316 .authors(pubkeys)
1317 .kinds(self.config.kinds.iter().copied().map(Kind::from))
1318 .since(since);
1319
1320 if let Err(err) = self.client.subscribe(vec![filter], None).await {
1321 warn!(
1322 "Nostr mirror author subscription failed: authors={} error={:#}",
1323 chunk.len(),
1324 err
1325 );
1326 continue;
1327 }
1328 subscribed_authors.extend(chunk.iter().cloned());
1329 }
1330 Ok(())
1331 }
1332
1333 fn ingest_live_event(&self, event: &Event) -> Result<()> {
1334 self.pending_live_events
1335 .lock()
1336 .expect("pending live events")
1337 .insert(event.id.to_hex(), event.clone());
1338 Ok(())
1339 }
1340
1341 async fn flush_live_events(&self) -> Result<()> {
1342 let pending = {
1343 let mut pending = self
1344 .pending_live_events
1345 .lock()
1346 .expect("pending live events");
1347 if pending.is_empty() {
1348 return Ok(());
1349 }
1350 std::mem::take(&mut *pending)
1351 };
1352 let events = pending.into_values().collect::<Vec<_>>();
1353 let event_count = events.len();
1354 let previous_event_root = self.graph_store.public_events_root()?;
1355 let previous_profile_search_root = self.graph_store.profile_search_root()?;
1356 let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1357
1358 socialgraph::ingest_parsed_events_with_storage_class(
1359 self.graph_store.as_ref(),
1360 &events,
1361 socialgraph::EventStorageClass::Public,
1362 )
1363 .context("ingest live mirrored event batch")?;
1364
1365 let next_event_root = self.graph_store.public_events_root()?;
1366 let next_profile_search_root = self.graph_store.profile_search_root()?;
1367 let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1368 let event_root_changed = next_event_root != previous_event_root;
1369 let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1370 let profiles_by_pubkey_root_changed =
1371 next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1372
1373 if event_root_changed {
1374 self.note_public_events_root_change()?;
1375 }
1376 if profile_search_root_changed {
1377 self.note_profile_search_root_change()?;
1378 }
1379 if profiles_by_pubkey_root_changed {
1380 self.note_profiles_by_pubkey_root_change()?;
1381 }
1382 if profile_search_root_changed {
1383 self.maybe_publish_profile_search_root(true).await?;
1384 }
1385 if profiles_by_pubkey_root_changed {
1386 self.maybe_publish_profiles_by_pubkey_root(true).await?;
1387 }
1388 if event_root_changed {
1389 self.maybe_publish_event_root(true).await?;
1390 }
1391 info!(
1392 "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1393 event_count,
1394 event_root_changed,
1395 profile_search_root_changed,
1396 profiles_by_pubkey_root_changed
1397 );
1398 Ok(())
1399 }
1400
1401 fn note_public_events_root_change(&self) -> Result<()> {
1402 let root = self.graph_store.public_events_root()?;
1403 Self::note_root_change(
1404 self.config.published_event_tree_name.as_deref(),
1405 &self.event_publish_state,
1406 root,
1407 )
1408 }
1409
1410 fn note_profile_search_root_change(&self) -> Result<()> {
1411 let root = self.graph_store.profile_search_root()?;
1412 Self::note_root_change(
1413 self.config.published_profile_search_tree_name.as_deref(),
1414 &self.profile_search_publish_state,
1415 root,
1416 )
1417 }
1418
1419 fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1420 let root = self.graph_store.profiles_by_pubkey_root()?;
1421 Self::note_root_change(
1422 self.config
1423 .published_profiles_by_pubkey_tree_name
1424 .as_deref(),
1425 &self.profiles_by_pubkey_publish_state,
1426 root,
1427 )
1428 }
1429
1430 fn note_root_change(
1431 tree_name: Option<&str>,
1432 publish_state: &Arc<Mutex<RootPublishState>>,
1433 root: Option<hashtree_core::Cid>,
1434 ) -> Result<()> {
1435 let Some(_tree_name) = tree_name else {
1436 return Ok(());
1437 };
1438
1439 let mut state = publish_state.lock().expect("root publish state");
1440 let now = Instant::now();
1441
1442 if state.pending_root == root {
1443 return Ok(());
1444 }
1445
1446 state.pending_root = root;
1447 state.last_upload_failed_at = None;
1448 state.last_upload_error = None;
1449 state.last_changed_at = Some(now);
1450 if state.dirty_since.is_none() {
1451 state.dirty_since = Some(now);
1452 }
1453 Ok(())
1454 }
1455
1456 async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1457 self.ensure_public_events_root_is_publishable().await?;
1458 if self.take_missing_blob_event_upload_error() {
1459 return self.rebuild_event_indexes_after_missing_blobs(force).await;
1460 }
1461 let result = self
1462 .maybe_publish_root(
1463 self.config.published_event_tree_name.as_deref(),
1464 &self.event_publish_state,
1465 "event root",
1466 force,
1467 )
1468 .await;
1469 let Err(error) = result else {
1470 if self.take_missing_blob_event_upload_error() {
1471 return self.rebuild_event_indexes_after_missing_blobs(force).await;
1472 }
1473 return Ok(());
1474 };
1475 if !is_missing_local_blob_push_error(&error) {
1476 return Err(error);
1477 }
1478
1479 self.rebuild_event_indexes_after_missing_blobs(force).await
1480 }
1481
1482 fn take_missing_blob_event_upload_error(&self) -> bool {
1483 let mut state = self
1484 .event_publish_state
1485 .lock()
1486 .expect("event root publish state");
1487 if state.upload_in_progress_root.is_some() {
1488 return false;
1489 }
1490 if !state.missing_blob_rebuild_required {
1491 return false;
1492 }
1493 state.missing_blob_rebuild_required = false;
1494 state.last_upload_error = None;
1495 state.last_upload_failed_at = None;
1496 true
1497 }
1498
1499 async fn rebuild_event_indexes_after_missing_blobs(&self, force: bool) -> Result<()> {
1500 warn!(
1501 "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1502 );
1503 let (public_count, ambient_count) = self
1504 .graph_store
1505 .rebuild_event_indexes_from_stored_events_async()
1506 .await
1507 .context("rebuild event indexes after missing event blobs")?;
1508 info!(
1509 "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1510 public_count, ambient_count
1511 );
1512 self.sync_publish_roots_from_store()?;
1513
1514 self.maybe_publish_root(
1515 self.config.published_event_tree_name.as_deref(),
1516 &self.event_publish_state,
1517 "event root",
1518 force,
1519 )
1520 .await
1521 }
1522
1523 async fn ensure_public_events_root_is_publishable(&self) -> Result<()> {
1524 let Some(root) = self.graph_store.public_events_root()? else {
1525 return Ok(());
1526 };
1527 let event_store = NostrEventStore::new(self.store.store_arc());
1528 if let Err(err) = event_store.validate_index_root(Some(&root)).await {
1529 warn!(
1530 "Nostr mirror refusing to publish invalid event index root {}; clearing trusted root: {}",
1531 hex::encode(root.hash),
1532 err
1533 );
1534 self.graph_store.write_public_events_root(None)?;
1535 self.note_public_events_root_change()?;
1536 }
1537 Ok(())
1538 }
1539
1540 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1541 self.maybe_publish_root(
1542 self.config.published_profile_search_tree_name.as_deref(),
1543 &self.profile_search_publish_state,
1544 "profile search root",
1545 force,
1546 )
1547 .await
1548 }
1549
1550 async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1551 self.maybe_publish_root(
1552 self.config
1553 .published_profiles_by_pubkey_tree_name
1554 .as_deref(),
1555 &self.profiles_by_pubkey_publish_state,
1556 "profiles-by-pubkey root",
1557 force,
1558 )
1559 .await
1560 }
1561
1562 async fn maybe_publish_root(
1563 &self,
1564 tree_name: Option<&str>,
1565 publish_state: &Arc<Mutex<RootPublishState>>,
1566 log_label: &str,
1567 force: bool,
1568 ) -> Result<()> {
1569 let Some(tree_name) = tree_name else {
1570 return Ok(());
1571 };
1572
1573 let pending_root = {
1574 let state = publish_state.lock().expect("root publish state");
1575 let Some(pending_root) = state.pending_root.clone() else {
1576 return Ok(());
1577 };
1578
1579 let now = Instant::now();
1580 let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1581 now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1582 });
1583 let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1584 now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1585 });
1586 if !force && !debounce_ready && !stale_ready {
1587 return Ok(());
1588 }
1589
1590 pending_root
1591 };
1592
1593 let upload_started =
1594 self.maybe_start_background_root_upload(&pending_root, publish_state, log_label);
1595 let upload_required = !self.config.blossom_write_servers.is_empty();
1596 let upload_ready = {
1597 let state = publish_state.lock().expect("root publish state");
1598 !upload_required || state.last_uploaded_root.as_ref() == Some(&pending_root)
1599 };
1600 let publish_before_upload_ready = force && upload_required && !upload_ready;
1601
1602 let mut successful_relays = Vec::new();
1603 let mut failed_relays = Vec::new();
1604 let mut published_now = false;
1605 let publish_required =
1606 self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1607 if publish_required {
1608 let Some(publish_client) = self.publish_client.as_ref() else {
1609 unreachable!("publish_required implies publish_client");
1610 };
1611 if !self.has_connected_publish_relay().await {
1612 return Ok(());
1613 }
1614 if !upload_ready && !publish_before_upload_ready {
1615 if upload_started {
1616 info!(
1617 "Nostr mirror uploading {} DAG before publish: tree={} hash={}",
1618 log_label,
1619 tree_name,
1620 hex::encode(pending_root.hash),
1621 );
1622 }
1623 return Ok(());
1624 }
1625 if publish_before_upload_ready {
1626 info!(
1627 "Nostr mirror publishing {} before Blossom upload completes: tree={} hash={}",
1628 log_label,
1629 tree_name,
1630 hex::encode(pending_root.hash),
1631 );
1632 }
1633
1634 let already_published = {
1635 let state = publish_state.lock().expect("root publish state");
1636 state.last_published_root.as_ref() == Some(&pending_root)
1637 };
1638 if !already_published {
1639 let publish_relays = self.config.publish_relays.clone();
1640 let latest_known_created_at = {
1641 let state = publish_state.lock().expect("root publish state");
1642 state.last_published_created_at
1643 };
1644 let publish_created_at = next_replaceable_created_at(
1645 Timestamp::now(),
1646 later_timestamp(
1647 latest_known_created_at,
1648 self.latest_root_event_created_at(tree_name).await,
1649 ),
1650 );
1651 let event = publish_client
1652 .sign_event_builder(Self::build_public_root_event(
1653 tree_name,
1654 &pending_root,
1655 publish_created_at,
1656 ))
1657 .await
1658 .with_context(|| format!("sign {log_label} event"))?;
1659 let publish_result = self
1660 .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1661 .await
1662 .with_context(|| format!("publish {log_label} event"))?;
1663 successful_relays = publish_result.0;
1664 failed_relays = publish_result.1;
1665 if successful_relays.is_empty() {
1666 let failure_summary = if failed_relays.is_empty() {
1667 "no publish relays accepted the event".to_string()
1668 } else {
1669 failed_relays.join("; ")
1670 };
1671 anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1672 }
1673
1674 let mut state = publish_state.lock().expect("root publish state");
1675 if state.pending_root.as_ref() == Some(&pending_root) {
1676 state.last_published_root = Some(pending_root.clone());
1677 state.last_published_at = Some(Instant::now());
1678 state.last_published_created_at = Some(event.created_at);
1679 }
1680 published_now = true;
1681 }
1682 }
1683
1684 {
1685 let mut state = publish_state.lock().expect("root publish state");
1686 if state.pending_root.as_ref() == Some(&pending_root) {
1687 let upload_satisfied = self.config.blossom_write_servers.is_empty()
1688 || state.last_uploaded_root.as_ref() == Some(&pending_root);
1689 let publish_satisfied =
1690 !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1691 if upload_satisfied && publish_satisfied {
1692 state.dirty_since = None;
1693 }
1694 }
1695 }
1696
1697 if published_now {
1698 info!(
1699 "Nostr mirror published {}: tree={} hash={} relays={:?}",
1700 log_label,
1701 tree_name,
1702 hex::encode(pending_root.hash),
1703 successful_relays,
1704 );
1705 }
1706 if !failed_relays.is_empty() {
1707 warn!(
1708 "Nostr mirror publish had relay failures: tree={} failures={:?}",
1709 tree_name, failed_relays
1710 );
1711 }
1712 Ok(())
1713 }
1714
1715 fn maybe_start_background_root_upload(
1716 &self,
1717 pending_root: &hashtree_core::Cid,
1718 publish_state: &Arc<Mutex<RootPublishState>>,
1719 log_label: &str,
1720 ) -> bool {
1721 if self.config.blossom_write_servers.is_empty() {
1722 return false;
1723 }
1724
1725 {
1726 let mut state = publish_state.lock().expect("root publish state");
1727 if state.last_uploaded_root.as_ref() == Some(pending_root)
1728 || state.upload_in_progress_root.is_some()
1729 {
1730 return false;
1731 }
1732 if state
1733 .last_upload_failed_at
1734 .is_some_and(|failed_at| failed_at.elapsed() < MIRROR_ROOT_UPLOAD_RETRY_INTERVAL)
1735 {
1736 return false;
1737 }
1738 state.upload_in_progress_root = Some(pending_root.clone());
1739 }
1740
1741 let store = Arc::clone(&self.store);
1742 let servers = self.config.blossom_write_servers.clone();
1743 let root = pending_root.clone();
1744 let root_string = pending_root.to_string();
1745 let publish_state = Arc::clone(publish_state);
1746 let log_label = log_label.to_string();
1747 tokio::task::spawn_blocking(move || {
1748 let runtime = tokio::runtime::Builder::new_current_thread()
1749 .enable_all()
1750 .build()
1751 .expect("build nostr mirror root upload runtime");
1752 runtime.block_on(async move {
1753 let result =
1754 background_blossom_push_with_store(store, &root_string, &servers).await;
1755 let mut state = publish_state.lock().expect("root publish state");
1756 if state.upload_in_progress_root.as_ref() == Some(&root) {
1757 state.upload_in_progress_root = None;
1758 }
1759 match result {
1760 Ok(()) => {
1761 if state.pending_root.as_ref() == Some(&root) {
1762 state.last_uploaded_root = Some(root.clone());
1763 state.last_uploaded_at = Some(Instant::now());
1764 state.last_upload_failed_at = None;
1765 state.last_upload_error = None;
1766 state.missing_blob_rebuild_required = false;
1767 }
1768 info!(
1769 "Nostr mirror uploaded {} DAG to Blossom: hash={}",
1770 log_label,
1771 hex::encode(root.hash)
1772 );
1773 }
1774 Err(err) => {
1775 if state.pending_root.as_ref() == Some(&root) {
1776 state.last_upload_failed_at = Some(Instant::now());
1777 state.last_upload_error = Some(format!("{err:#}"));
1778 }
1779 if is_missing_local_blob_message(&format!("{err:#}")) {
1780 state.missing_blob_rebuild_required = true;
1781 }
1782 warn!(
1783 "Nostr mirror {} DAG upload failed: hash={} error={:#}",
1784 log_label,
1785 hex::encode(root.hash),
1786 err
1787 );
1788 }
1789 }
1790 });
1791 });
1792
1793 true
1794 }
1795
1796 async fn publish_root_event_to_relays(
1797 &self,
1798 publish_client: &Client,
1799 relays: &[String],
1800 event: &Event,
1801 ) -> Result<(Vec<String>, Vec<String>)> {
1802 let mut successful_relays = Vec::new();
1803 let mut failed_relays = Vec::new();
1804
1805 match publish_client
1806 .send_event_to(relays.iter().map(|relay| relay.as_str()), event.clone())
1807 .await
1808 {
1809 Ok(output) => {
1810 for relay in relays {
1811 let relay_url = relay.trim_end_matches('/');
1812 if output
1813 .success
1814 .iter()
1815 .any(|url| url.as_str().trim_end_matches('/') == relay_url)
1816 {
1817 successful_relays.push(relay.clone());
1818 }
1819 }
1820 failed_relays.extend(output.failed.into_iter().map(|(url, reason)| match reason {
1821 Some(reason) => format!("{url}: {reason}"),
1822 None => format!("{url}: relay rejected publish"),
1823 }));
1824 }
1825 Err(err) => {
1826 failed_relays.push(format!("publish relays: {err}"));
1827 }
1828 }
1829
1830 Ok((successful_relays, failed_relays))
1831 }
1832
1833 async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1834 let publish_client = self.publish_client.as_ref()?;
1835 let author = self.publish_pubkey?;
1836 let events = publish_client
1837 .get_events_of(
1838 vec![Self::build_public_root_filter(author, tree_name)],
1839 EventSource::relays(Some(self.config.fetch_timeout)),
1840 )
1841 .await
1842 .ok()?;
1843 events
1844 .iter()
1845 .filter(|event| Self::matches_public_root_event(event, tree_name))
1846 .max_by_key(|event| (event.created_at, event.id))
1847 .map(|event| event.created_at)
1848 }
1849
1850 fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1851 Filter::new()
1852 .kind(Kind::Custom(30078))
1853 .author(author)
1854 .custom_tag(
1855 SingleLetterTag::lowercase(Alphabet::D),
1856 vec![tree_name.to_string()],
1857 )
1858 .custom_tag(
1859 SingleLetterTag::lowercase(Alphabet::L),
1860 vec!["hashtree".to_string()],
1861 )
1862 .limit(50)
1863 }
1864
1865 fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1866 event.kind == Kind::Custom(30078)
1867 && event.tags.iter().any(|tag| {
1868 let values = tag.as_slice();
1869 values.first().is_some_and(|value| value == "d")
1870 && values.get(1).is_some_and(|value| value == tree_name)
1871 })
1872 && event.tags.iter().any(|tag| {
1873 let values = tag.as_slice();
1874 values.first().is_some_and(|value| value == "l")
1875 && values.get(1).is_some_and(|value| value == "hashtree")
1876 })
1877 }
1878
1879 fn build_public_root_event(
1880 tree_name: &str,
1881 cid: &hashtree_core::Cid,
1882 created_at: Timestamp,
1883 ) -> EventBuilder {
1884 let mut tags = vec![
1885 Tag::identifier(tree_name.to_string()),
1886 Tag::custom(
1887 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1888 vec!["hashtree"],
1889 ),
1890 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1891 ];
1892 if let Some(key) = cid.key {
1893 tags.push(Tag::custom(
1894 TagKind::Custom("key".into()),
1895 vec![hex::encode(key)],
1896 ));
1897 }
1898
1899 EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1900 }
1901}
1902
1903fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
1904 error
1905 .chain()
1906 .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
1907}
1908
1909fn is_missing_local_blob_message(message: &str) -> bool {
1910 message.contains(MISSING_LOCAL_BLOB_PUSH_ERROR)
1911}
1912
1913fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1914 match (left, right) {
1915 (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1916 (Some(left), None) => Some(left),
1917 (None, Some(right)) => Some(right),
1918 (None, None) => None,
1919 }
1920}
1921
1922fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1923 match latest_existing {
1924 Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1925 _ => now,
1926 }
1927}
1928
1929#[cfg(test)]
1930mod tests;