1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13 Timestamp,
14};
15use nostr_sdk::{
16 pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17 RelayStatus,
18};
19use tokio::sync::watch;
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// Slept at the top of `run` before any mirroring work begins (shorter under test).
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

// Additional sleep in `run`, taken right after the startup delay and before
// the live-subscription timestamp is captured.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

// Period of the `run` loop timer that re-collects authors from the social graph.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

// Minimum time between two reconnect-triggered catch-up history syncs.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

// Event kinds used by `NostrMirrorConfig::default()`.
const DEFAULT_HISTORY_KINDS: [u16; 6] = [0, 1, 3, 6, 7, 9735];
// Default tree names used by `NostrMirrorConfig::default()` for the three published roots.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
// Plan overrides applied by `history_sync_plan_for` when only metadata events are requested.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
// A sync is treated as "large" when the author count exceeds
// `author_batch_size * LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER`; the two limits
// below are the plan values used in that case.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

// Minimum time between two missing-profile backfill runs in the `run` loop.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

// Quiet period a pending root must sit unchanged before it is published
// without `force`; also the period of the publish timer in `run`.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

// Upper bound on how long a root may stay dirty before it becomes eligible
// for publishing even if it keeps changing.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);
71
/// Configuration for [`BackgroundNostrMirror`].
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relays the mirror reads from; `run` is a no-op when this is empty.
    pub relays: Vec<String>,
    /// Relays added to the publishing client; no publishing client is created when empty.
    pub publish_relays: Vec<String>,
    /// Blossom servers handed to `background_blossom_push`; uploads are skipped when empty.
    pub blossom_write_servers: Vec<String>,
    /// Largest follow distance (inclusive, starting at 0) whose users are
    /// collected as authors; `run` is a no-op when this is zero.
    pub max_follow_distance: u32,
    /// Threshold passed to `is_overmuted_user`; users it reports as overmuted are skipped.
    pub overmute_threshold: f64,
    /// Authors per live-subscription filter and per crawl batch (values below 1 are treated as 1).
    pub author_batch_size: usize,
    /// Authors per history-sync chunk (values below 1 are treated as 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit used by the regular history-sync plan (values below 1 are treated as 1).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors returned per missing-profile backfill run; zero disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Fetch timeout forwarded to the crawler.
    pub fetch_timeout: Duration,
    /// Optional maximum event size applied to the relay client limits and forwarded to the crawler.
    pub relay_event_max_size: Option<u32>,
    /// Forwarded to the crawler as `require_negentropy`.
    pub require_negentropy: bool,
    /// Event kinds mirrored by history sync and live subscriptions.
    pub kinds: Vec<u16>,
    /// Whether `run` performs a history sync for the initial author set.
    pub history_sync_on_start: bool,
    /// Whether a relay transitioning back to connected triggers a catch-up history sync.
    pub history_sync_on_reconnect: bool,
    /// Tree name for the event root; `None` disables tracking and publishing of that root.
    pub published_event_tree_name: Option<String>,
    /// Tree name for the profile-search root; `None` disables tracking and publishing of that root.
    pub published_profile_search_tree_name: Option<String>,
    /// Tree name for the profiles-by-pubkey root; `None` disables tracking and publishing of that root.
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
93
94impl Default for NostrMirrorConfig {
95 fn default() -> Self {
96 Self {
97 relays: Vec::new(),
98 publish_relays: Vec::new(),
99 blossom_write_servers: Vec::new(),
100 max_follow_distance: 2,
101 overmute_threshold: 1.0,
102 author_batch_size: 256,
103 history_sync_author_chunk_size: 5_000,
104 history_sync_per_author_event_limit: 256,
105 missing_profile_backfill_batch_size: 5_000,
106 fetch_timeout: Duration::from_secs(15),
107 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
108 require_negentropy: false,
109 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
110 history_sync_on_start: true,
111 history_sync_on_reconnect: true,
112 published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
113 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
114 published_profiles_by_pubkey_tree_name: Some(
115 DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
116 ),
117 }
118 }
119}
120
/// Publish bookkeeping for one tracked root (event, profile-search, or profiles-by-pubkey).
#[derive(Debug, Default)]
struct RootPublishState {
    /// Most recent root recorded by `note_root_change`; the candidate for the next publish.
    pending_root: Option<hashtree_core::Cid>,
    /// When `pending_root` last changed; compared against the publish debounce.
    last_changed_at: Option<Instant>,
    /// Set on a root change when not already set; compared against the max-staleness bound.
    dirty_since: Option<Instant>,
    // NOTE(review): the three `last_published_*` fields are written outside the
    // visible part of this file; presumably they describe the last successful
    // relay publish — confirm against `maybe_publish_root`.
    last_published_root: Option<hashtree_core::Cid>,
    last_published_at: Option<Instant>,
    last_published_created_at: Option<Timestamp>,
    /// Root most recently pushed to Blossom while it was still the pending root.
    last_uploaded_root: Option<hashtree_core::Cid>,
    /// When `last_uploaded_root` was recorded.
    last_uploaded_at: Option<Instant>,
}
132
/// Crawl parameters chosen by `history_sync_plan_for` for a single history sync;
/// each field is copied into the matching `CrawlConfig` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HistorySyncPlan {
    /// Fetch strategy handed to the crawler.
    relay_fetch_mode: RelayFetchMode,
    /// Authors per crawl batch.
    author_batch_size: usize,
    /// Per-author event limit handed to the crawler.
    per_author_event_limit: usize,
    /// Relay page size handed to the crawler.
    relay_page_size: usize,
    /// Maximum relay pages handed to the crawler.
    max_relay_pages: usize,
}
141
/// Background task that mirrors Nostr events for social-graph authors into the
/// local store and publishes the resulting tree roots.
pub struct BackgroundNostrMirror {
    /// Mirror settings supplied at construction.
    config: NostrMirrorConfig,
    /// Backing hashtree store used by the crawler and for Blossom uploads.
    store: Arc<HashtreeStore>,
    /// Social-graph store that supplies authors and holds the tracked roots.
    graph_store: Arc<SocialGraphStore>,
    /// Read-side relay client (created with freshly generated keys).
    client: Client,
    /// Publishing client; present only when publish keys and publish relays were both provided.
    publish_client: Option<Client>,
    /// Public key of the publish keys, when publish keys were provided.
    publish_pubkey: Option<PublicKey>,
    /// Publish bookkeeping for the public events root.
    event_publish_state: Mutex<RootPublishState>,
    /// Publish bookkeeping for the profile-search root.
    profile_search_publish_state: Mutex<RootPublishState>,
    /// Publish bookkeeping for the profiles-by-pubkey root.
    profiles_by_pubkey_publish_state: Mutex<RootPublishState>,
    /// Live events received since the last flush, keyed by hex event id.
    pending_live_events: Mutex<BTreeMap<String, Event>>,
    /// Position in the author list where the next missing-profile scan resumes.
    missing_profile_cursor: Mutex<usize>,
    /// Sender half of the shutdown signal; `shutdown` sends `true`.
    shutdown_tx: watch::Sender<bool>,
    /// Receiver half of the shutdown signal; cloned by `run`.
    shutdown_rx: watch::Receiver<bool>,
}
157
158impl BackgroundNostrMirror {
159 pub async fn new(
160 config: NostrMirrorConfig,
161 store: Arc<HashtreeStore>,
162 graph_store: Arc<SocialGraphStore>,
163 publish_keys: Option<Keys>,
164 ) -> Result<Self> {
165 let client = if let Some(max_size) = config.relay_event_max_size {
166 let mut limits = RelayLimits::default();
167 limits.events.max_size = Some(max_size);
168 Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
169 } else {
170 Client::new(Keys::generate())
171 };
172 for relay in &config.relays {
173 client
174 .add_relay(relay)
175 .await
176 .with_context(|| format!("add mirror relay {relay}"))?;
177 }
178 client.connect().await;
179
180 let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
181 let publish_client = if let Some(keys) = publish_keys {
182 if config.publish_relays.is_empty() {
183 None
184 } else {
185 let client = Client::new(keys);
186 for relay in &config.publish_relays {
187 client
188 .add_relay(relay)
189 .await
190 .with_context(|| format!("add mirror publish relay {relay}"))?;
191 }
192 client.connect().await;
193 Some(client)
194 }
195 } else {
196 None
197 };
198
199 let (shutdown_tx, shutdown_rx) = watch::channel(false);
200 Ok(Self {
201 config,
202 store,
203 graph_store,
204 client,
205 publish_client,
206 publish_pubkey,
207 event_publish_state: Mutex::new(RootPublishState::default()),
208 profile_search_publish_state: Mutex::new(RootPublishState::default()),
209 profiles_by_pubkey_publish_state: Mutex::new(RootPublishState::default()),
210 pending_live_events: Mutex::new(BTreeMap::new()),
211 missing_profile_cursor: Mutex::new(0),
212 shutdown_tx,
213 shutdown_rx,
214 })
215 }
216
217 pub fn shutdown(&self) {
218 let _ = self.shutdown_tx.send(true);
219 }
220
221 fn sync_publish_roots_from_store(&self) -> Result<()> {
222 self.note_public_events_root_change()?;
223 self.note_profile_search_root_change()?;
224 self.note_profiles_by_pubkey_root_change()?;
225 Ok(())
226 }
227
228 async fn publish_pending_roots(
229 &self,
230 force_event: bool,
231 force_profile_search: bool,
232 force_profiles_by_pubkey: bool,
233 ) -> (Result<()>, Result<()>, Result<()>) {
234 tokio::join!(
235 self.maybe_publish_event_root(force_event),
236 self.maybe_publish_profile_search_root(force_profile_search),
237 self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
238 )
239 }
240
241 async fn publish_priority_roots(
242 &self,
243 force_event: bool,
244 force_profile_search: bool,
245 force_profiles_by_pubkey: bool,
246 ) -> (Result<()>, Result<()>, Result<()>) {
247 let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
248 async {
249 if force_profile_search {
250 self.maybe_publish_profile_search_root(true).await
251 } else {
252 Ok(())
253 }
254 },
255 async {
256 if force_profiles_by_pubkey {
257 self.maybe_publish_profiles_by_pubkey_root(true).await
258 } else {
259 Ok(())
260 }
261 },
262 );
263 let event_result = if force_event {
264 self.maybe_publish_event_root(true).await
265 } else {
266 Ok(())
267 };
268 (
269 event_result,
270 profile_search_result,
271 profiles_by_pubkey_result,
272 )
273 }
274
275 pub async fn run(&self) -> Result<()> {
276 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
277 return Ok(());
278 }
279
280 info!(
281 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
282 self.config.relays.len(),
283 self.config.max_follow_distance,
284 self.config.require_negentropy,
285 self.config.kinds,
286 self.config.history_sync_author_chunk_size.max(1),
287 self.config.history_sync_on_start,
288 self.config.history_sync_on_reconnect
289 );
290
291 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
292 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
293 let live_since = Timestamp::now();
294 self.sync_publish_roots_from_store()?;
295
296 let initial_authors = self.collect_authors()?;
297 if initial_authors.is_empty() {
298 info!("Nostr mirror: no social-graph authors to mirror yet");
299 } else if self.config.history_sync_on_start {
300 if self.should_backfill_missing_profiles(None) {
301 let missing_profile_authors = self.collect_missing_profile_authors(
302 self.config.missing_profile_backfill_batch_size,
303 )?;
304 if !missing_profile_authors.is_empty() {
305 info!(
306 "Nostr mirror missing-profile backfill starting: authors={}",
307 missing_profile_authors.len()
308 );
309 self.history_sync_authors_with_kinds(
310 missing_profile_authors,
311 &[Kind::Metadata.as_u16()],
312 )
313 .await?;
314 }
315 }
316 self.history_sync_authors(initial_authors.clone()).await?;
317 }
318
319 let mut subscribed_authors = HashSet::new();
320 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
321 .await?;
322
323 let mut relay_statuses = self.capture_relay_statuses().await;
324 let mut last_reconnect_history_sync_at: Option<Instant> = None;
325 let mut last_missing_profile_backfill_at: Option<Instant> = None;
326 let mut notifications = self.client.notifications();
327 let mut shutdown_rx = self.shutdown_rx.clone();
328 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
329 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
330
331 loop {
332 tokio::select! {
333 _ = shutdown_rx.changed() => {
334 if *shutdown_rx.borrow() {
335 break;
336 }
337 }
338 _ = refresh_interval.tick() => {
339 let authors = self.collect_authors()?;
340 let new_authors = authors
341 .into_iter()
342 .filter(|author| !subscribed_authors.contains(author))
343 .collect::<Vec<_>>();
344 if !new_authors.is_empty() {
345 debug!(
346 "Nostr mirror discovered {} newly reachable author(s)",
347 new_authors.len()
348 );
349 self.history_sync_authors(new_authors.clone()).await?;
350 self.subscribe_authors_since(
351 &new_authors,
352 Timestamp::now(),
353 &mut subscribed_authors,
354 )
355 .await?;
356 }
357 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
358 let missing_profile_authors = self.collect_missing_profile_authors(
359 self.config.missing_profile_backfill_batch_size,
360 )?;
361 if !missing_profile_authors.is_empty() {
362 info!(
363 "Nostr mirror missing-profile backfill starting: authors={}",
364 missing_profile_authors.len()
365 );
366 self.history_sync_authors_with_kinds(
367 missing_profile_authors,
368 &[Kind::Metadata.as_u16()],
369 )
370 .await?;
371 last_missing_profile_backfill_at = Some(Instant::now());
372 }
373 }
374 }
375 _ = publish_interval.tick() => {
376 self.sync_publish_roots_from_store()?;
377 if let Err(err) = self.flush_live_events().await {
378 warn!("Nostr mirror live event flush failed: {:#}", err);
379 }
380 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
381 .publish_pending_roots(false, false, false)
382 .await;
383 if let Err(err) = event_result {
384 warn!("Nostr mirror event-root publish failed: {:#}", err);
385 }
386 if let Err(err) = profile_search_result {
387 warn!("Nostr mirror profile-search publish failed: {:#}", err);
388 }
389 if let Err(err) = profiles_by_pubkey_result {
390 warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
391 }
392 }
393 notification = notifications.recv() => {
394 match notification {
395 Ok(RelayPoolNotification::Event { event, .. }) => {
396 self.ingest_live_event(&event)?;
397 }
398 Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
399 let relay_url = relay_url.to_string();
400 let previous = relay_statuses.insert(relay_url.clone(), status);
401 if Self::should_history_sync_on_reconnect(
402 self.config.history_sync_on_reconnect,
403 previous,
404 status,
405 ) && Self::should_run_reconnect_history_sync(
406 last_reconnect_history_sync_at.as_ref(),
407 )
408 {
409 let authors = self.collect_authors()?;
410 if !authors.is_empty() {
411 info!(
412 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
413 relay_url,
414 authors.len(),
415 self.config.require_negentropy
416 );
417 self.history_sync_authors(authors).await?;
418 last_reconnect_history_sync_at = Some(Instant::now());
419 }
420 }
421 }
422 Ok(RelayPoolNotification::Shutdown) => break,
423 Ok(_) => {}
424 Err(err) => {
425 warn!("Nostr mirror notification error: {}", err);
426 break;
427 }
428 }
429 }
430 }
431 }
432
433 if let Err(err) = self.flush_live_events().await {
434 warn!(
435 "Nostr mirror live event flush failed during shutdown: {:#}",
436 err
437 );
438 }
439 if let Err(err) = self.sync_publish_roots_from_store() {
440 warn!(
441 "Nostr mirror root-state refresh failed during shutdown: {:#}",
442 err
443 );
444 }
445 let (event_result, profile_search_result, profiles_by_pubkey_result) =
446 self.publish_pending_roots(true, true, true).await;
447 if let Err(err) = event_result {
448 warn!(
449 "Nostr mirror event-root publish failed during shutdown: {:#}",
450 err
451 );
452 }
453 if let Err(err) = profile_search_result {
454 warn!(
455 "Nostr mirror profile-search publish failed during shutdown: {:#}",
456 err
457 );
458 }
459 if let Err(err) = profiles_by_pubkey_result {
460 warn!(
461 "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
462 err
463 );
464 }
465 let _ = self.client.disconnect().await;
466 if let Some(client) = self.publish_client.as_ref() {
467 let _ = client.disconnect().await;
468 }
469 Ok(())
470 }
471
472 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
473 let mut statuses = HashMap::new();
474 for (relay_url, relay) in self.client.relays().await {
475 statuses.insert(relay_url.to_string(), relay.status().await);
476 }
477 statuses
478 }
479
480 async fn has_connected_publish_relay(&self) -> bool {
481 let Some(client) = self.publish_client.as_ref() else {
482 return false;
483 };
484 Self::client_has_connected_relay(client).await
485 }
486
487 async fn client_has_connected_relay(client: &Client) -> bool {
488 for (_relay_url, relay) in client.relays().await {
489 if relay.status().await == RelayStatus::Connected {
490 return true;
491 }
492 }
493 false
494 }
495
496 fn collect_authors(&self) -> Result<Vec<String>> {
497 let mut authors = Vec::new();
498 let mut seen = HashSet::new();
499 for distance in 0..=self.config.max_follow_distance {
500 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
501 self.graph_store.as_ref(),
502 distance,
503 )
504 .with_context(|| format!("load social-graph distance {distance}"))?
505 {
506 if self
507 .graph_store
508 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
509 {
510 continue;
511 }
512 let hex = hex::encode(pubkey);
513 if seen.insert(hex.clone()) {
514 authors.push(hex);
515 }
516 }
517 }
518 Ok(authors)
519 }
520
521 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
522 if limit == 0 {
523 return Ok(Vec::new());
524 }
525
526 let authors = self.collect_authors()?;
527 if authors.is_empty() {
528 return Ok(Vec::new());
529 }
530
531 let mut cursor = self
532 .missing_profile_cursor
533 .lock()
534 .expect("missing profile cursor");
535 let mut index = (*cursor).min(authors.len());
536 let mut scanned = 0usize;
537 let mut missing = Vec::new();
538
539 while scanned < authors.len() && missing.len() < limit {
540 let author = &authors[index];
541 if self.graph_store.latest_profile_event(author)?.is_none() {
542 missing.push(author.clone());
543 }
544 index += 1;
545 if index == authors.len() {
546 index = 0;
547 }
548 scanned += 1;
549 }
550
551 *cursor = index;
552 Ok(missing)
553 }
554
555 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
556 if self.config.missing_profile_backfill_batch_size == 0
557 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
558 {
559 return false;
560 }
561 match last_run {
562 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
563 None => true,
564 }
565 }
566
567 fn should_history_sync_on_reconnect(
568 history_sync_on_reconnect: bool,
569 previous: Option<RelayStatus>,
570 status: RelayStatus,
571 ) -> bool {
572 history_sync_on_reconnect
573 && status == RelayStatus::Connected
574 && matches!(
575 previous,
576 Some(
577 RelayStatus::Initialized
578 | RelayStatus::Pending
579 | RelayStatus::Connecting
580 | RelayStatus::Disconnected
581 | RelayStatus::Terminated
582 )
583 )
584 }
585
586 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
587 match last_run {
588 None => true,
589 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
590 }
591 }
592
593 fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
594 !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
595 }
596
597 fn history_sync_plan_for(
598 config: &NostrMirrorConfig,
599 authors: usize,
600 kinds: &[u16],
601 ) -> HistorySyncPlan {
602 let author_batch_size = config.author_batch_size.max(1);
603 let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
604 let relay_page_size = 1_000;
605 let max_relay_pages = 10;
606
607 if Self::is_metadata_only_history_sync(kinds) {
608 return HistorySyncPlan {
609 relay_fetch_mode: RelayFetchMode::AuthorBatches,
610 author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
611 per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
612 relay_page_size,
613 max_relay_pages,
614 };
615 }
616
617 if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
618 return HistorySyncPlan {
619 relay_fetch_mode: RelayFetchMode::GlobalRecent,
620 author_batch_size,
621 per_author_event_limit: per_author_event_limit
622 .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
623 .max(1),
624 relay_page_size,
625 max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
626 };
627 }
628
629 HistorySyncPlan {
630 relay_fetch_mode: RelayFetchMode::AuthorBatches,
631 author_batch_size,
632 per_author_event_limit,
633 relay_page_size,
634 max_relay_pages,
635 }
636 }
637
638 fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
639 Self::history_sync_plan_for(&self.config, authors, kinds)
640 }
641
642 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
643 self.history_sync_authors_with_kinds(authors, &self.config.kinds)
644 .await
645 }
646
647 async fn history_sync_authors_with_kinds(
648 &self,
649 authors: Vec<String>,
650 kinds: &[u16],
651 ) -> Result<()> {
652 self.history_sync_authors_chunked(authors, |current_root, author_chunk| async move {
653 self.history_sync_author_chunk(current_root, author_chunk, kinds)
654 .await
655 })
656 .await
657 }
658
659 async fn history_sync_authors_chunked<F, Fut>(
660 &self,
661 authors: Vec<String>,
662 mut run_chunk: F,
663 ) -> Result<()>
664 where
665 F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
666 Fut: std::future::Future<Output = Result<CrawlReport>>,
667 {
668 if authors.is_empty() {
669 return Ok(());
670 }
671
672 info!(
673 "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
674 authors.len(),
675 self.config.relays.len(),
676 self.config.require_negentropy
677 );
678
679 let mut current_root = self.graph_store.public_events_root()?;
680 let mut last_error = None;
681 let mut applied_chunks = 0usize;
682 let mut failed_chunks = 0usize;
683 let chunk_size = self.config.history_sync_author_chunk_size.max(1);
684 let total_chunks = authors.len().div_ceil(chunk_size);
685
686 for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
687 let author_chunk = author_chunk.to_vec();
688 let author_count = author_chunk.len();
689 info!(
690 "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
691 chunk_index + 1,
692 total_chunks,
693 author_count
694 );
695 let report = match run_chunk(current_root.clone(), author_chunk).await {
696 Ok(report) => report,
697 Err(err) => {
698 failed_chunks = failed_chunks.saturating_add(1);
699 warn!(
700 "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
701 chunk_index + 1,
702 total_chunks,
703 author_count,
704 err
705 );
706 last_error = Some(err);
707 continue;
708 }
709 };
710
711 if report.root != current_root {
712 self.apply_history_root(report.root.as_ref()).await?;
713 current_root = report.root.clone();
714 info!(
715 "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
716 chunk_index + 1,
717 total_chunks,
718 report.authors_processed,
719 report.events_selected,
720 report.events_seen
721 );
722 }
723 applied_chunks = applied_chunks.saturating_add(1);
724 }
725
726 if applied_chunks == 0 {
727 return Err(last_error
728 .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
729 .context("run mirror history sync"));
730 }
731 if failed_chunks > 0 {
732 warn!(
733 "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
734 applied_chunks,
735 failed_chunks
736 );
737 }
738 Ok(())
739 }
740
741 async fn history_sync_author_chunk(
742 &self,
743 current_root: Option<hashtree_core::Cid>,
744 authors: Vec<String>,
745 kinds: &[u16],
746 ) -> Result<CrawlReport> {
747 let mut last_error = None;
748 let mut report = None;
749 let plan = self.history_sync_plan(authors.len(), kinds);
750 for attempt in 0..3 {
751 let mut last_logged_authors = 0usize;
752 let bridge = NostrBridge::new(
753 self.store.store_arc(),
754 CrawlConfig {
755 relays: self.config.relays.clone(),
756 author_allowlist: Some(authors.clone()),
757 max_live_bytes: None,
758 max_events_seen: None,
759 max_authors: None,
760 max_follow_distance: None,
761 author_batch_size: plan.author_batch_size,
762 per_author_event_limit: plan.per_author_event_limit,
763 per_author_live_bytes: None,
764 fetch_timeout: self.config.fetch_timeout,
765 kinds: Some(kinds.to_vec()),
766 relay_fetch_mode: plan.relay_fetch_mode,
767 require_negentropy: self.config.require_negentropy,
768 relay_event_max_size: self.config.relay_event_max_size,
769 relay_page_size: plan.relay_page_size,
770 max_relay_pages: plan.max_relay_pages,
771 },
772 );
773
774 match bridge
775 .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
776 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
777 let should_log = progress.authors_processed == progress.authors_considered
778 || progress.authors_processed == 0
779 || progress
780 .authors_processed
781 .saturating_sub(last_logged_authors)
782 >= log_interval;
783 if should_log {
784 last_logged_authors = progress.authors_processed;
785 info!(
786 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
787 progress.authors_processed,
788 progress.authors_considered,
789 progress.events_selected,
790 progress.events_seen
791 );
792 }
793 })
794 .await
795 {
796 Ok(next_report) => {
797 report = Some(next_report);
798 break;
799 }
800 Err(err) => {
801 last_error = Some(err);
802 if attempt < 2 {
803 tokio::time::sleep(Duration::from_millis(500)).await;
804 }
805 }
806 }
807 }
808 report
809 .ok_or_else(|| last_error.expect("history sync retry captured error"))
810 .context("run mirror history sync")
811 }
812
813 async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
814 self.graph_store.write_public_events_root(root)?;
815 let Some(root) = root else {
816 return Ok(());
817 };
818
819 let event_store = NostrEventStore::new(self.store.store_arc());
820 let events = event_store
821 .list_recent_lossy(Some(root), ListEventsOptions::default())
822 .await
823 .context("list trusted mirrored events")?
824 .into_iter()
825 .map(socialgraph::stored_event_to_nostr_event)
826 .collect::<Result<Vec<_>>>()?;
827
828 self.graph_store
829 .rebuild_profile_index_for_events(&events)
830 .context("rebuild mirrored profile search index")?;
831 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
832 .context("sync mirrored social graph state")?;
833 self.note_public_events_root_change()?;
834 self.note_profile_search_root_change()?;
835 self.note_profiles_by_pubkey_root_change()?;
836 let (event_result, profile_search_result, profiles_by_pubkey_result) =
837 self.publish_priority_roots(true, true, true).await;
838 if let Err(err) = event_result {
839 warn!(
840 "Nostr mirror event-root publish failed after root update: {:#}",
841 err
842 );
843 }
844 if let Err(err) = profile_search_result {
845 warn!(
846 "Nostr mirror profile-search publish failed after root update: {:#}",
847 err
848 );
849 }
850 if let Err(err) = profiles_by_pubkey_result {
851 warn!(
852 "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
853 err
854 );
855 }
856 Ok(())
857 }
858
859 async fn subscribe_authors_since(
860 &self,
861 authors: &[String],
862 since: Timestamp,
863 subscribed_authors: &mut HashSet<String>,
864 ) -> Result<()> {
865 let new_authors = authors
866 .iter()
867 .filter(|author| !subscribed_authors.contains(*author))
868 .cloned()
869 .collect::<Vec<_>>();
870 if new_authors.is_empty() {
871 return Ok(());
872 }
873
874 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
875 let pubkeys = chunk
876 .iter()
877 .filter_map(|author| PublicKey::from_hex(author).ok())
878 .collect::<Vec<_>>();
879 if pubkeys.is_empty() {
880 continue;
881 }
882
883 let filter = Filter::new()
884 .authors(pubkeys)
885 .kinds(self.config.kinds.iter().copied().map(Kind::from))
886 .since(since);
887
888 self.client
889 .subscribe(vec![filter], None)
890 .await
891 .context("subscribe mirror author batch")?;
892 }
893
894 subscribed_authors.extend(new_authors);
895 Ok(())
896 }
897
898 fn ingest_live_event(&self, event: &Event) -> Result<()> {
899 self.pending_live_events
900 .lock()
901 .expect("pending live events")
902 .insert(event.id.to_hex(), event.clone());
903 Ok(())
904 }
905
906 async fn flush_live_events(&self) -> Result<()> {
907 let pending = {
908 let mut pending = self
909 .pending_live_events
910 .lock()
911 .expect("pending live events");
912 if pending.is_empty() {
913 return Ok(());
914 }
915 std::mem::take(&mut *pending)
916 };
917 let events = pending.into_values().collect::<Vec<_>>();
918 let event_count = events.len();
919 let previous_event_root = self.graph_store.public_events_root()?;
920 let previous_profile_search_root = self.graph_store.profile_search_root()?;
921 let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
922
923 socialgraph::ingest_parsed_events_with_storage_class(
924 self.graph_store.as_ref(),
925 &events,
926 socialgraph::EventStorageClass::Public,
927 )
928 .context("ingest live mirrored event batch")?;
929
930 let next_event_root = self.graph_store.public_events_root()?;
931 let next_profile_search_root = self.graph_store.profile_search_root()?;
932 let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
933 let event_root_changed = next_event_root != previous_event_root;
934 let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
935 let profiles_by_pubkey_root_changed =
936 next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
937
938 if event_root_changed {
939 self.note_public_events_root_change()?;
940 }
941 if profile_search_root_changed {
942 self.note_profile_search_root_change()?;
943 }
944 if profiles_by_pubkey_root_changed {
945 self.note_profiles_by_pubkey_root_change()?;
946 }
947 if profile_search_root_changed {
948 self.maybe_publish_profile_search_root(true).await?;
949 }
950 if profiles_by_pubkey_root_changed {
951 self.maybe_publish_profiles_by_pubkey_root(true).await?;
952 }
953 if event_root_changed {
954 self.maybe_publish_event_root(true).await?;
955 }
956 info!(
957 "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
958 event_count,
959 event_root_changed,
960 profile_search_root_changed,
961 profiles_by_pubkey_root_changed
962 );
963 Ok(())
964 }
965
966 fn note_public_events_root_change(&self) -> Result<()> {
967 let root = self.graph_store.public_events_root()?;
968 Self::note_root_change(
969 self.config.published_event_tree_name.as_deref(),
970 &self.event_publish_state,
971 root,
972 )
973 }
974
975 fn note_profile_search_root_change(&self) -> Result<()> {
976 let root = self.graph_store.profile_search_root()?;
977 Self::note_root_change(
978 self.config.published_profile_search_tree_name.as_deref(),
979 &self.profile_search_publish_state,
980 root,
981 )
982 }
983
984 fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
985 let root = self.graph_store.profiles_by_pubkey_root()?;
986 Self::note_root_change(
987 self.config
988 .published_profiles_by_pubkey_tree_name
989 .as_deref(),
990 &self.profiles_by_pubkey_publish_state,
991 root,
992 )
993 }
994
995 fn note_root_change(
996 tree_name: Option<&str>,
997 publish_state: &Mutex<RootPublishState>,
998 root: Option<hashtree_core::Cid>,
999 ) -> Result<()> {
1000 let Some(_tree_name) = tree_name else {
1001 return Ok(());
1002 };
1003
1004 let mut state = publish_state.lock().expect("root publish state");
1005 let now = Instant::now();
1006
1007 if state.pending_root == root {
1008 return Ok(());
1009 }
1010
1011 state.pending_root = root;
1012 state.last_changed_at = Some(now);
1013 if state.dirty_since.is_none() {
1014 state.dirty_since = Some(now);
1015 }
1016 Ok(())
1017 }
1018
1019 async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1020 self.maybe_publish_root(
1021 self.config.published_event_tree_name.as_deref(),
1022 &self.event_publish_state,
1023 "event root",
1024 force,
1025 )
1026 .await
1027 }
1028
1029 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1030 self.maybe_publish_root(
1031 self.config.published_profile_search_tree_name.as_deref(),
1032 &self.profile_search_publish_state,
1033 "profile search root",
1034 force,
1035 )
1036 .await
1037 }
1038
1039 async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1040 self.maybe_publish_root(
1041 self.config
1042 .published_profiles_by_pubkey_tree_name
1043 .as_deref(),
1044 &self.profiles_by_pubkey_publish_state,
1045 "profiles-by-pubkey root",
1046 force,
1047 )
1048 .await
1049 }
1050
1051 async fn maybe_publish_root(
1052 &self,
1053 tree_name: Option<&str>,
1054 publish_state: &Mutex<RootPublishState>,
1055 log_label: &str,
1056 force: bool,
1057 ) -> Result<()> {
1058 let Some(tree_name) = tree_name else {
1059 return Ok(());
1060 };
1061
1062 let pending_root = {
1063 let state = publish_state.lock().expect("root publish state");
1064 let Some(pending_root) = state.pending_root.clone() else {
1065 return Ok(());
1066 };
1067
1068 let now = Instant::now();
1069 let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1070 now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1071 });
1072 let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1073 now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1074 });
1075 if !force && !debounce_ready && !stale_ready {
1076 return Ok(());
1077 }
1078
1079 pending_root
1080 };
1081
1082 let needs_upload = {
1083 let state = publish_state.lock().expect("root publish state");
1084 !self.config.blossom_write_servers.is_empty()
1085 && state.last_uploaded_root.as_ref() != Some(&pending_root)
1086 };
1087 if needs_upload {
1088 background_blossom_push(
1089 self.store.base_path(),
1090 &pending_root.to_string(),
1091 &self.config.blossom_write_servers,
1092 )
1093 .await
1094 .with_context(|| format!("upload {log_label} DAG to Blossom"))?;
1095
1096 let mut state = publish_state.lock().expect("root publish state");
1097 if state.pending_root.as_ref() == Some(&pending_root) {
1098 state.last_uploaded_root = Some(pending_root.clone());
1099 state.last_uploaded_at = Some(Instant::now());
1100 }
1101 }
1102
1103 let mut successful_relays = Vec::new();
1104 let mut failed_relays = Vec::new();
1105 let publish_required =
1106 self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1107 if publish_required {
1108 let Some(publish_client) = self.publish_client.as_ref() else {
1109 unreachable!("publish_required implies publish_client");
1110 };
1111 if !self.has_connected_publish_relay().await {
1112 return Ok(());
1113 }
1114
1115 let already_published = {
1116 let state = publish_state.lock().expect("root publish state");
1117 state.last_published_root.as_ref() == Some(&pending_root)
1118 };
1119 if !already_published {
1120 let publish_relays = self.config.publish_relays.clone();
1121 let latest_known_created_at = {
1122 let state = publish_state.lock().expect("root publish state");
1123 state.last_published_created_at
1124 };
1125 let publish_created_at = next_replaceable_created_at(
1126 Timestamp::now(),
1127 later_timestamp(
1128 latest_known_created_at,
1129 self.latest_root_event_created_at(tree_name).await,
1130 ),
1131 );
1132 let event = publish_client
1133 .sign_event_builder(Self::build_public_root_event(
1134 tree_name,
1135 &pending_root,
1136 publish_created_at,
1137 ))
1138 .await
1139 .with_context(|| format!("sign {log_label} event"))?;
1140 let publish_result = self
1141 .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1142 .await
1143 .with_context(|| format!("publish {log_label} event"))?;
1144 successful_relays = publish_result.0;
1145 failed_relays = publish_result.1;
1146 if successful_relays.is_empty() {
1147 let failure_summary = if failed_relays.is_empty() {
1148 "no publish relays accepted the event".to_string()
1149 } else {
1150 failed_relays.join("; ")
1151 };
1152 anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1153 }
1154
1155 let mut state = publish_state.lock().expect("root publish state");
1156 if state.pending_root.as_ref() == Some(&pending_root) {
1157 state.last_published_root = Some(pending_root.clone());
1158 state.last_published_at = Some(Instant::now());
1159 state.last_published_created_at = Some(event.created_at);
1160 }
1161 }
1162 }
1163
1164 {
1165 let mut state = publish_state.lock().expect("root publish state");
1166 if state.pending_root.as_ref() == Some(&pending_root) {
1167 let upload_satisfied = self.config.blossom_write_servers.is_empty()
1168 || state.last_uploaded_root.as_ref() == Some(&pending_root);
1169 let publish_satisfied =
1170 !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1171 if upload_satisfied && publish_satisfied {
1172 state.dirty_since = None;
1173 }
1174 }
1175 }
1176
1177 info!(
1178 "Nostr mirror published {}: tree={} hash={} relays={:?}",
1179 log_label,
1180 tree_name,
1181 hex::encode(pending_root.hash),
1182 successful_relays,
1183 );
1184 if !failed_relays.is_empty() {
1185 warn!(
1186 "Nostr mirror publish had relay failures: tree={} failures={:?}",
1187 tree_name, failed_relays
1188 );
1189 }
1190 Ok(())
1191 }
1192
1193 async fn publish_root_event_to_relays(
1194 &self,
1195 publish_client: &Client,
1196 relays: &[String],
1197 event: &Event,
1198 ) -> Result<(Vec<String>, Vec<String>)> {
1199 let mut successful_relays = Vec::new();
1200 let mut failed_relays = Vec::new();
1201
1202 for relay in relays {
1203 match publish_client
1204 .send_event_to([relay.as_str()], event.clone())
1205 .await
1206 {
1207 Ok(output) => {
1208 if output.success.is_empty() {
1209 failed_relays.push(format!("{relay}: relay did not acknowledge publish"));
1210 continue;
1211 }
1212 successful_relays.push(relay.clone());
1213 failed_relays.extend(output.failed.into_iter().map(
1214 |(url, reason)| match reason {
1215 Some(reason) => format!("{url}: {reason}"),
1216 None => format!("{url}: relay rejected publish"),
1217 },
1218 ));
1219 }
1220 Err(err) => {
1221 failed_relays.push(format!("{relay}: {err}"));
1222 }
1223 }
1224 }
1225
1226 Ok((successful_relays, failed_relays))
1227 }
1228
1229 async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1230 let publish_client = self.publish_client.as_ref()?;
1231 let author = self.publish_pubkey?;
1232 let events = publish_client
1233 .get_events_of(
1234 vec![Self::build_public_root_filter(author, tree_name)],
1235 EventSource::relays(Some(self.config.fetch_timeout)),
1236 )
1237 .await
1238 .ok()?;
1239 events
1240 .iter()
1241 .filter(|event| Self::matches_public_root_event(event, tree_name))
1242 .max_by_key(|event| (event.created_at, event.id))
1243 .map(|event| event.created_at)
1244 }
1245
1246 fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1247 Filter::new()
1248 .kind(Kind::Custom(30078))
1249 .author(author)
1250 .custom_tag(
1251 SingleLetterTag::lowercase(Alphabet::D),
1252 vec![tree_name.to_string()],
1253 )
1254 .custom_tag(
1255 SingleLetterTag::lowercase(Alphabet::L),
1256 vec!["hashtree".to_string()],
1257 )
1258 .limit(50)
1259 }
1260
1261 fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1262 event.kind == Kind::Custom(30078)
1263 && event.tags.iter().any(|tag| {
1264 let values = tag.as_slice();
1265 values.first().is_some_and(|value| value == "d")
1266 && values.get(1).is_some_and(|value| value == tree_name)
1267 })
1268 && event.tags.iter().any(|tag| {
1269 let values = tag.as_slice();
1270 values.first().is_some_and(|value| value == "l")
1271 && values.get(1).is_some_and(|value| value == "hashtree")
1272 })
1273 }
1274
1275 fn build_public_root_event(
1276 tree_name: &str,
1277 cid: &hashtree_core::Cid,
1278 created_at: Timestamp,
1279 ) -> EventBuilder {
1280 let mut tags = vec![
1281 Tag::identifier(tree_name.to_string()),
1282 Tag::custom(
1283 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1284 vec!["hashtree"],
1285 ),
1286 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1287 ];
1288 if let Some(key) = cid.key {
1289 tags.push(Tag::custom(
1290 TagKind::Custom("key".into()),
1291 vec![hex::encode(key)],
1292 ));
1293 }
1294
1295 EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1296 }
1297}
1298
1299fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1300 match (left, right) {
1301 (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1302 (Some(left), None) => Some(left),
1303 (None, Some(right)) => Some(right),
1304 (None, None) => None,
1305 }
1306}
1307
1308fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1309 match latest_existing {
1310 Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1311 _ => now,
1312 }
1313}
1314
1315#[cfg(test)]
1316mod tests;