1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13 Timestamp,
14};
15use nostr_sdk::{
16 pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17 RelayStatus,
18};
19use tokio::sync::watch;
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// ---------------------------------------------------------------------------
// Timing. Every delay has a production value and a much shorter `cfg(test)`
// value so the loop in `run` can be exercised quickly in tests.
// ---------------------------------------------------------------------------

// Initial sleep at the start of `run`, before any relay work happens.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

// Extra wait after the startup delay, before the live-subscription start
// timestamp is taken.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

// Period of the tick that looks for newly reachable social-graph authors.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

// Minimum time between two catch-up history syncs triggered by relay reconnects.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

// Minimum time between two missing-profile backfill passes.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

// Period of the flush/publish tick, and the quiet time after the last root
// change before an unforced publish is allowed.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

// Longest a changed root may wait before an unforced publish is allowed even
// if it keeps changing.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);

// ---------------------------------------------------------------------------
// Defaults used by `NostrMirrorConfig::default`.
// ---------------------------------------------------------------------------

// Nostr kinds: metadata (0), text note (1), follow list (3), repost (6),
// reaction (7), zap receipt (9735).
const DEFAULT_HISTORY_KINDS: [u16; 6] = [0, 1, 3, 6, 7, 9735];
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";

// ---------------------------------------------------------------------------
// History-sync plan tuning (see `history_sync_plan_for`).
// ---------------------------------------------------------------------------

// Metadata-only syncs: one event per author, in small author batches.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
// A sync counts as "large" when it has more than
// `author_batch_size * LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER` authors. Large
// syncs switch to global-recent fetching with a capped per-author limit and
// more relay pages.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;
71
/// Configuration for [`BackgroundNostrMirror`].
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relays the mirror connects to for live subscriptions and history
    /// crawls. `run` returns immediately when this is empty.
    pub relays: Vec<String>,
    /// Relays used to publish roots. A publish client is created only when
    /// this is non-empty and publish keys are supplied to `new`.
    pub publish_relays: Vec<String>,
    /// Blossom servers that a root's DAG is uploaded to before relay
    /// publishing; empty disables uploads.
    pub blossom_write_servers: Vec<String>,
    /// Maximum social-graph follow distance (inclusive) of mirrored authors.
    /// `0` makes `run` return immediately.
    pub max_follow_distance: u32,
    /// Threshold passed to `is_overmuted_user`; authors it flags are skipped.
    pub overmute_threshold: f64,
    /// Authors per live-subscription filter and per crawler author batch
    /// (values below 1 are treated as 1).
    pub author_batch_size: usize,
    /// Authors per history-sync chunk; a chunk that changes the trusted root
    /// is applied before the next chunk starts (values below 1 act as 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit handed to the crawler (values below 1 act as 1).
    /// Metadata-only and large syncs use smaller limits.
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors per missing-profile backfill pass; `0` disables the
    /// backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Fetch timeout handed to the history crawler.
    pub fetch_timeout: Duration,
    /// Maximum relay event size applied to the mirror client and crawler;
    /// `None` keeps the client's default relay limits.
    pub relay_event_max_size: Option<u32>,
    /// Passed to the crawler as `require_negentropy` (logged as
    /// `negentropy_only`).
    pub require_negentropy: bool,
    /// Event kinds mirrored by history syncs and live subscriptions.
    pub kinds: Vec<u16>,
    /// Run the startup history sync (and missing-profile backfill, when
    /// enabled).
    pub history_sync_on_start: bool,
    /// Run a catch-up history sync when a relay transitions to connected.
    pub history_sync_on_reconnect: bool,
    /// Name of the published event-index tree; `None` disables publishing
    /// that root.
    pub published_event_tree_name: Option<String>,
    /// Name of the published profile-search tree; `None` disables publishing
    /// that root.
    pub published_profile_search_tree_name: Option<String>,
    /// Name of the published profiles-by-pubkey tree; `None` disables
    /// publishing that root.
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
93
94impl Default for NostrMirrorConfig {
95 fn default() -> Self {
96 Self {
97 relays: Vec::new(),
98 publish_relays: Vec::new(),
99 blossom_write_servers: Vec::new(),
100 max_follow_distance: 2,
101 overmute_threshold: 1.0,
102 author_batch_size: 256,
103 history_sync_author_chunk_size: 5_000,
104 history_sync_per_author_event_limit: 256,
105 missing_profile_backfill_batch_size: 5_000,
106 fetch_timeout: Duration::from_secs(15),
107 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
108 require_negentropy: false,
109 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
110 history_sync_on_start: true,
111 history_sync_on_reconnect: true,
112 published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
113 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
114 published_profiles_by_pubkey_tree_name: Some(
115 DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
116 ),
117 }
118 }
119}
120
/// Debounce, upload and publish bookkeeping for one published root.
#[derive(Debug, Default)]
struct RootPublishState {
    /// Most recent root recorded by `note_root_change`.
    pending_root: Option<hashtree_core::Cid>,
    /// When `pending_root` last changed; drives the publish debounce.
    last_changed_at: Option<Instant>,
    /// Set on a root change only if it was `None`; drives the max-staleness
    /// publish deadline. NOTE(review): presumably reset after a successful
    /// publish in code outside this view — confirm.
    dirty_since: Option<Instant>,
    /// Root last published to relays; a matching pending root is not
    /// re-published.
    last_published_root: Option<hashtree_core::Cid>,
    /// When the last relay publish happened (presumably written by the publish
    /// path, which is not fully visible here).
    last_published_at: Option<Instant>,
    /// `created_at` of the last published event; read before computing the
    /// next publish timestamp.
    last_published_created_at: Option<Timestamp>,
    /// Root whose DAG was last uploaded to Blossom; a matching pending root is
    /// not re-uploaded.
    last_uploaded_root: Option<hashtree_core::Cid>,
    /// When that Blossom upload completed.
    last_uploaded_at: Option<Instant>,
}
132
/// Crawler parameters chosen by `history_sync_plan_for` for one history-sync
/// chunk. Each field is copied into the matching `CrawlConfig` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HistorySyncPlan {
    /// `AuthorBatches` normally; `GlobalRecent` for large non-metadata syncs.
    relay_fetch_mode: RelayFetchMode,
    /// Value for `CrawlConfig::author_batch_size`.
    author_batch_size: usize,
    /// Value for `CrawlConfig::per_author_event_limit`.
    per_author_event_limit: usize,
    /// Value for `CrawlConfig::relay_page_size`.
    relay_page_size: usize,
    /// Value for `CrawlConfig::max_relay_pages`.
    max_relay_pages: usize,
}
141
/// Background task that mirrors Nostr events from social-graph authors into
/// the local stores and publishes the resulting index roots.
///
/// Create it with `new`, drive it with `run`, and stop it with `shutdown`.
pub struct BackgroundNostrMirror {
    config: NostrMirrorConfig,
    /// Hashtree store handed to the history crawler and the event store.
    store: Arc<HashtreeStore>,
    /// Source of mirrored authors; also holds the event and profile index roots.
    graph_store: Arc<SocialGraphStore>,
    /// Client connected to `config.relays` (with freshly generated keys).
    client: Client,
    /// Client used to publish roots; `None` without publish keys or relays.
    publish_client: Option<Client>,
    /// Public key of the publish keys passed to `new`, if any.
    publish_pubkey: Option<PublicKey>,
    // Publish bookkeeping for the event-index, profile-search and
    // profiles-by-pubkey roots respectively.
    event_publish_state: Mutex<RootPublishState>,
    profile_search_publish_state: Mutex<RootPublishState>,
    profiles_by_pubkey_publish_state: Mutex<RootPublishState>,
    /// Live events buffered by hex event id until the next flush.
    pending_live_events: Mutex<BTreeMap<String, Event>>,
    /// Round-robin start position for the missing-profile scan.
    missing_profile_cursor: Mutex<usize>,
    // Shutdown signal: `shutdown` sends `true`, the loop in `run` watches it.
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}
157
158impl BackgroundNostrMirror {
159 pub async fn new(
160 config: NostrMirrorConfig,
161 store: Arc<HashtreeStore>,
162 graph_store: Arc<SocialGraphStore>,
163 publish_keys: Option<Keys>,
164 ) -> Result<Self> {
165 let client = if let Some(max_size) = config.relay_event_max_size {
166 let mut limits = RelayLimits::default();
167 limits.events.max_size = Some(max_size);
168 Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
169 } else {
170 Client::new(Keys::generate())
171 };
172 for relay in &config.relays {
173 client
174 .add_relay(relay)
175 .await
176 .with_context(|| format!("add mirror relay {relay}"))?;
177 }
178 client.connect().await;
179
180 let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
181 let publish_client = if let Some(keys) = publish_keys {
182 if config.publish_relays.is_empty() {
183 None
184 } else {
185 let client = Client::new(keys);
186 for relay in &config.publish_relays {
187 client
188 .add_relay(relay)
189 .await
190 .with_context(|| format!("add mirror publish relay {relay}"))?;
191 }
192 client.connect().await;
193 Some(client)
194 }
195 } else {
196 None
197 };
198
199 let (shutdown_tx, shutdown_rx) = watch::channel(false);
200 Ok(Self {
201 config,
202 store,
203 graph_store,
204 client,
205 publish_client,
206 publish_pubkey,
207 event_publish_state: Mutex::new(RootPublishState::default()),
208 profile_search_publish_state: Mutex::new(RootPublishState::default()),
209 profiles_by_pubkey_publish_state: Mutex::new(RootPublishState::default()),
210 pending_live_events: Mutex::new(BTreeMap::new()),
211 missing_profile_cursor: Mutex::new(0),
212 shutdown_tx,
213 shutdown_rx,
214 })
215 }
216
217 pub fn shutdown(&self) {
218 let _ = self.shutdown_tx.send(true);
219 }
220
221 fn sync_publish_roots_from_store(&self) -> Result<()> {
222 self.note_public_events_root_change()?;
223 self.note_profile_search_root_change()?;
224 self.note_profiles_by_pubkey_root_change()?;
225 Ok(())
226 }
227
228 async fn publish_pending_roots(
229 &self,
230 force_event: bool,
231 force_profile_search: bool,
232 force_profiles_by_pubkey: bool,
233 ) -> (Result<()>, Result<()>, Result<()>) {
234 tokio::join!(
235 self.maybe_publish_event_root(force_event),
236 self.maybe_publish_profile_search_root(force_profile_search),
237 self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
238 )
239 }
240
241 pub async fn run(&self) -> Result<()> {
242 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
243 return Ok(());
244 }
245
246 info!(
247 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
248 self.config.relays.len(),
249 self.config.max_follow_distance,
250 self.config.require_negentropy,
251 self.config.kinds,
252 self.config.history_sync_author_chunk_size.max(1),
253 self.config.history_sync_on_start,
254 self.config.history_sync_on_reconnect
255 );
256
257 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
258 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
259 let live_since = Timestamp::now();
260 self.sync_publish_roots_from_store()?;
261
262 let initial_authors = self.collect_authors()?;
263 if initial_authors.is_empty() {
264 info!("Nostr mirror: no social-graph authors to mirror yet");
265 } else if self.config.history_sync_on_start {
266 if self.should_backfill_missing_profiles(None) {
267 let missing_profile_authors = self.collect_missing_profile_authors(
268 self.config.missing_profile_backfill_batch_size,
269 )?;
270 if !missing_profile_authors.is_empty() {
271 info!(
272 "Nostr mirror missing-profile backfill starting: authors={}",
273 missing_profile_authors.len()
274 );
275 self.history_sync_authors_with_kinds(
276 missing_profile_authors,
277 &[Kind::Metadata.as_u16()],
278 )
279 .await?;
280 }
281 }
282 self.history_sync_authors(initial_authors.clone()).await?;
283 }
284
285 let mut subscribed_authors = HashSet::new();
286 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
287 .await?;
288
289 let mut relay_statuses = self.capture_relay_statuses().await;
290 let mut last_reconnect_history_sync_at: Option<Instant> = None;
291 let mut last_missing_profile_backfill_at: Option<Instant> = None;
292 let mut notifications = self.client.notifications();
293 let mut shutdown_rx = self.shutdown_rx.clone();
294 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
295 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
296
297 loop {
298 tokio::select! {
299 _ = shutdown_rx.changed() => {
300 if *shutdown_rx.borrow() {
301 break;
302 }
303 }
304 _ = refresh_interval.tick() => {
305 let authors = self.collect_authors()?;
306 let new_authors = authors
307 .into_iter()
308 .filter(|author| !subscribed_authors.contains(author))
309 .collect::<Vec<_>>();
310 if !new_authors.is_empty() {
311 debug!(
312 "Nostr mirror discovered {} newly reachable author(s)",
313 new_authors.len()
314 );
315 self.history_sync_authors(new_authors.clone()).await?;
316 self.subscribe_authors_since(
317 &new_authors,
318 Timestamp::now(),
319 &mut subscribed_authors,
320 )
321 .await?;
322 }
323 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
324 let missing_profile_authors = self.collect_missing_profile_authors(
325 self.config.missing_profile_backfill_batch_size,
326 )?;
327 if !missing_profile_authors.is_empty() {
328 info!(
329 "Nostr mirror missing-profile backfill starting: authors={}",
330 missing_profile_authors.len()
331 );
332 self.history_sync_authors_with_kinds(
333 missing_profile_authors,
334 &[Kind::Metadata.as_u16()],
335 )
336 .await?;
337 last_missing_profile_backfill_at = Some(Instant::now());
338 }
339 }
340 }
341 _ = publish_interval.tick() => {
342 self.sync_publish_roots_from_store()?;
343 if let Err(err) = self.flush_live_events().await {
344 warn!("Nostr mirror live event flush failed: {:#}", err);
345 }
346 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
347 .publish_pending_roots(false, false, false)
348 .await;
349 if let Err(err) = event_result {
350 warn!("Nostr mirror event-root publish failed: {:#}", err);
351 }
352 if let Err(err) = profile_search_result {
353 warn!("Nostr mirror profile-search publish failed: {:#}", err);
354 }
355 if let Err(err) = profiles_by_pubkey_result {
356 warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
357 }
358 }
359 notification = notifications.recv() => {
360 match notification {
361 Ok(RelayPoolNotification::Event { event, .. }) => {
362 self.ingest_live_event(&event)?;
363 }
364 Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
365 let relay_url = relay_url.to_string();
366 let previous = relay_statuses.insert(relay_url.clone(), status);
367 if Self::should_history_sync_on_reconnect(
368 self.config.history_sync_on_reconnect,
369 previous,
370 status,
371 ) && Self::should_run_reconnect_history_sync(
372 last_reconnect_history_sync_at.as_ref(),
373 )
374 {
375 let authors = self.collect_authors()?;
376 if !authors.is_empty() {
377 info!(
378 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
379 relay_url,
380 authors.len(),
381 self.config.require_negentropy
382 );
383 self.history_sync_authors(authors).await?;
384 last_reconnect_history_sync_at = Some(Instant::now());
385 }
386 }
387 }
388 Ok(RelayPoolNotification::Shutdown) => break,
389 Ok(_) => {}
390 Err(err) => {
391 warn!("Nostr mirror notification error: {}", err);
392 break;
393 }
394 }
395 }
396 }
397 }
398
399 if let Err(err) = self.flush_live_events().await {
400 warn!(
401 "Nostr mirror live event flush failed during shutdown: {:#}",
402 err
403 );
404 }
405 if let Err(err) = self.sync_publish_roots_from_store() {
406 warn!(
407 "Nostr mirror root-state refresh failed during shutdown: {:#}",
408 err
409 );
410 }
411 let (event_result, profile_search_result, profiles_by_pubkey_result) =
412 self.publish_pending_roots(true, true, true).await;
413 if let Err(err) = event_result {
414 warn!(
415 "Nostr mirror event-root publish failed during shutdown: {:#}",
416 err
417 );
418 }
419 if let Err(err) = profile_search_result {
420 warn!(
421 "Nostr mirror profile-search publish failed during shutdown: {:#}",
422 err
423 );
424 }
425 if let Err(err) = profiles_by_pubkey_result {
426 warn!(
427 "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
428 err
429 );
430 }
431 let _ = self.client.disconnect().await;
432 if let Some(client) = self.publish_client.as_ref() {
433 let _ = client.disconnect().await;
434 }
435 Ok(())
436 }
437
438 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
439 let mut statuses = HashMap::new();
440 for (relay_url, relay) in self.client.relays().await {
441 statuses.insert(relay_url.to_string(), relay.status().await);
442 }
443 statuses
444 }
445
446 async fn has_connected_publish_relay(&self) -> bool {
447 let Some(client) = self.publish_client.as_ref() else {
448 return false;
449 };
450 Self::client_has_connected_relay(client).await
451 }
452
453 async fn client_has_connected_relay(client: &Client) -> bool {
454 for (_relay_url, relay) in client.relays().await {
455 if relay.status().await == RelayStatus::Connected {
456 return true;
457 }
458 }
459 false
460 }
461
462 fn collect_authors(&self) -> Result<Vec<String>> {
463 let mut authors = Vec::new();
464 let mut seen = HashSet::new();
465 for distance in 0..=self.config.max_follow_distance {
466 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
467 self.graph_store.as_ref(),
468 distance,
469 )
470 .with_context(|| format!("load social-graph distance {distance}"))?
471 {
472 if self
473 .graph_store
474 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
475 {
476 continue;
477 }
478 let hex = hex::encode(pubkey);
479 if seen.insert(hex.clone()) {
480 authors.push(hex);
481 }
482 }
483 }
484 Ok(authors)
485 }
486
487 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
488 if limit == 0 {
489 return Ok(Vec::new());
490 }
491
492 let authors = self.collect_authors()?;
493 if authors.is_empty() {
494 return Ok(Vec::new());
495 }
496
497 let mut cursor = self
498 .missing_profile_cursor
499 .lock()
500 .expect("missing profile cursor");
501 let mut index = (*cursor).min(authors.len());
502 let mut scanned = 0usize;
503 let mut missing = Vec::new();
504
505 while scanned < authors.len() && missing.len() < limit {
506 let author = &authors[index];
507 if self.graph_store.latest_profile_event(author)?.is_none() {
508 missing.push(author.clone());
509 }
510 index += 1;
511 if index == authors.len() {
512 index = 0;
513 }
514 scanned += 1;
515 }
516
517 *cursor = index;
518 Ok(missing)
519 }
520
521 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
522 if self.config.missing_profile_backfill_batch_size == 0
523 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
524 {
525 return false;
526 }
527 match last_run {
528 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
529 None => true,
530 }
531 }
532
533 fn should_history_sync_on_reconnect(
534 history_sync_on_reconnect: bool,
535 previous: Option<RelayStatus>,
536 status: RelayStatus,
537 ) -> bool {
538 history_sync_on_reconnect
539 && status == RelayStatus::Connected
540 && matches!(
541 previous,
542 Some(
543 RelayStatus::Initialized
544 | RelayStatus::Pending
545 | RelayStatus::Connecting
546 | RelayStatus::Disconnected
547 | RelayStatus::Terminated
548 )
549 )
550 }
551
552 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
553 match last_run {
554 None => true,
555 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
556 }
557 }
558
559 fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
560 !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
561 }
562
563 fn history_sync_plan_for(
564 config: &NostrMirrorConfig,
565 authors: usize,
566 kinds: &[u16],
567 ) -> HistorySyncPlan {
568 let author_batch_size = config.author_batch_size.max(1);
569 let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
570 let relay_page_size = 1_000;
571 let max_relay_pages = 10;
572
573 if Self::is_metadata_only_history_sync(kinds) {
574 return HistorySyncPlan {
575 relay_fetch_mode: RelayFetchMode::AuthorBatches,
576 author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
577 per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
578 relay_page_size,
579 max_relay_pages,
580 };
581 }
582
583 if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
584 return HistorySyncPlan {
585 relay_fetch_mode: RelayFetchMode::GlobalRecent,
586 author_batch_size,
587 per_author_event_limit: per_author_event_limit
588 .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
589 .max(1),
590 relay_page_size,
591 max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
592 };
593 }
594
595 HistorySyncPlan {
596 relay_fetch_mode: RelayFetchMode::AuthorBatches,
597 author_batch_size,
598 per_author_event_limit,
599 relay_page_size,
600 max_relay_pages,
601 }
602 }
603
604 fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
605 Self::history_sync_plan_for(&self.config, authors, kinds)
606 }
607
608 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
609 self.history_sync_authors_with_kinds(authors, &self.config.kinds)
610 .await
611 }
612
613 async fn history_sync_authors_with_kinds(
614 &self,
615 authors: Vec<String>,
616 kinds: &[u16],
617 ) -> Result<()> {
618 self.history_sync_authors_chunked(authors, |current_root, author_chunk| async move {
619 self.history_sync_author_chunk(current_root, author_chunk, kinds)
620 .await
621 })
622 .await
623 }
624
625 async fn history_sync_authors_chunked<F, Fut>(
626 &self,
627 authors: Vec<String>,
628 mut run_chunk: F,
629 ) -> Result<()>
630 where
631 F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
632 Fut: std::future::Future<Output = Result<CrawlReport>>,
633 {
634 if authors.is_empty() {
635 return Ok(());
636 }
637
638 info!(
639 "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
640 authors.len(),
641 self.config.relays.len(),
642 self.config.require_negentropy
643 );
644
645 let mut current_root = self.graph_store.public_events_root()?;
646 let mut last_error = None;
647 let mut applied_chunks = 0usize;
648 let mut failed_chunks = 0usize;
649 let chunk_size = self.config.history_sync_author_chunk_size.max(1);
650 let total_chunks = authors.len().div_ceil(chunk_size);
651
652 for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
653 let author_chunk = author_chunk.to_vec();
654 let author_count = author_chunk.len();
655 info!(
656 "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
657 chunk_index + 1,
658 total_chunks,
659 author_count
660 );
661 let report = match run_chunk(current_root.clone(), author_chunk).await {
662 Ok(report) => report,
663 Err(err) => {
664 failed_chunks = failed_chunks.saturating_add(1);
665 warn!(
666 "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
667 chunk_index + 1,
668 total_chunks,
669 author_count,
670 err
671 );
672 last_error = Some(err);
673 continue;
674 }
675 };
676
677 if report.root != current_root {
678 self.apply_history_root(report.root.as_ref()).await?;
679 current_root = report.root.clone();
680 info!(
681 "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
682 chunk_index + 1,
683 total_chunks,
684 report.authors_processed,
685 report.events_selected,
686 report.events_seen
687 );
688 }
689 applied_chunks = applied_chunks.saturating_add(1);
690 }
691
692 if applied_chunks == 0 {
693 return Err(last_error
694 .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
695 .context("run mirror history sync"));
696 }
697 if failed_chunks > 0 {
698 warn!(
699 "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
700 applied_chunks,
701 failed_chunks
702 );
703 }
704 Ok(())
705 }
706
707 async fn history_sync_author_chunk(
708 &self,
709 current_root: Option<hashtree_core::Cid>,
710 authors: Vec<String>,
711 kinds: &[u16],
712 ) -> Result<CrawlReport> {
713 let mut last_error = None;
714 let mut report = None;
715 let plan = self.history_sync_plan(authors.len(), kinds);
716 for attempt in 0..3 {
717 let mut last_logged_authors = 0usize;
718 let bridge = NostrBridge::new(
719 self.store.store_arc(),
720 CrawlConfig {
721 relays: self.config.relays.clone(),
722 author_allowlist: Some(authors.clone()),
723 max_live_bytes: None,
724 max_events_seen: None,
725 max_authors: None,
726 max_follow_distance: None,
727 author_batch_size: plan.author_batch_size,
728 per_author_event_limit: plan.per_author_event_limit,
729 per_author_live_bytes: None,
730 fetch_timeout: self.config.fetch_timeout,
731 kinds: Some(kinds.to_vec()),
732 relay_fetch_mode: plan.relay_fetch_mode,
733 require_negentropy: self.config.require_negentropy,
734 relay_event_max_size: self.config.relay_event_max_size,
735 relay_page_size: plan.relay_page_size,
736 max_relay_pages: plan.max_relay_pages,
737 },
738 );
739
740 match bridge
741 .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
742 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
743 let should_log = progress.authors_processed == progress.authors_considered
744 || progress.authors_processed == 0
745 || progress
746 .authors_processed
747 .saturating_sub(last_logged_authors)
748 >= log_interval;
749 if should_log {
750 last_logged_authors = progress.authors_processed;
751 info!(
752 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
753 progress.authors_processed,
754 progress.authors_considered,
755 progress.events_selected,
756 progress.events_seen
757 );
758 }
759 })
760 .await
761 {
762 Ok(next_report) => {
763 report = Some(next_report);
764 break;
765 }
766 Err(err) => {
767 last_error = Some(err);
768 if attempt < 2 {
769 tokio::time::sleep(Duration::from_millis(500)).await;
770 }
771 }
772 }
773 }
774 report
775 .ok_or_else(|| last_error.expect("history sync retry captured error"))
776 .context("run mirror history sync")
777 }
778
779 async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
780 self.graph_store.write_public_events_root(root)?;
781 let Some(root) = root else {
782 return Ok(());
783 };
784
785 let event_store = NostrEventStore::new(self.store.store_arc());
786 let events = event_store
787 .list_recent_lossy(Some(root), ListEventsOptions::default())
788 .await
789 .context("list trusted mirrored events")?
790 .into_iter()
791 .map(socialgraph::stored_event_to_nostr_event)
792 .collect::<Result<Vec<_>>>()?;
793
794 self.graph_store
795 .rebuild_profile_index_for_events(&events)
796 .context("rebuild mirrored profile search index")?;
797 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
798 .context("sync mirrored social graph state")?;
799 self.note_public_events_root_change()?;
800 self.note_profile_search_root_change()?;
801 self.note_profiles_by_pubkey_root_change()?;
802 let (event_result, profile_search_result, profiles_by_pubkey_result) =
803 self.publish_pending_roots(true, true, true).await;
804 if let Err(err) = event_result {
805 warn!(
806 "Nostr mirror event-root publish failed after root update: {:#}",
807 err
808 );
809 }
810 if let Err(err) = profile_search_result {
811 warn!(
812 "Nostr mirror profile-search publish failed after root update: {:#}",
813 err
814 );
815 }
816 if let Err(err) = profiles_by_pubkey_result {
817 warn!(
818 "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
819 err
820 );
821 }
822 Ok(())
823 }
824
825 async fn subscribe_authors_since(
826 &self,
827 authors: &[String],
828 since: Timestamp,
829 subscribed_authors: &mut HashSet<String>,
830 ) -> Result<()> {
831 let new_authors = authors
832 .iter()
833 .filter(|author| !subscribed_authors.contains(*author))
834 .cloned()
835 .collect::<Vec<_>>();
836 if new_authors.is_empty() {
837 return Ok(());
838 }
839
840 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
841 let pubkeys = chunk
842 .iter()
843 .filter_map(|author| PublicKey::from_hex(author).ok())
844 .collect::<Vec<_>>();
845 if pubkeys.is_empty() {
846 continue;
847 }
848
849 let filter = Filter::new()
850 .authors(pubkeys)
851 .kinds(self.config.kinds.iter().copied().map(Kind::from))
852 .since(since);
853
854 self.client
855 .subscribe(vec![filter], None)
856 .await
857 .context("subscribe mirror author batch")?;
858 }
859
860 subscribed_authors.extend(new_authors);
861 Ok(())
862 }
863
864 fn ingest_live_event(&self, event: &Event) -> Result<()> {
865 self.pending_live_events
866 .lock()
867 .expect("pending live events")
868 .insert(event.id.to_hex(), event.clone());
869 Ok(())
870 }
871
872 async fn flush_live_events(&self) -> Result<()> {
873 let pending = {
874 let mut pending = self
875 .pending_live_events
876 .lock()
877 .expect("pending live events");
878 if pending.is_empty() {
879 return Ok(());
880 }
881 std::mem::take(&mut *pending)
882 };
883 let events = pending.into_values().collect::<Vec<_>>();
884 let event_count = events.len();
885 let previous_event_root = self.graph_store.public_events_root()?;
886 let previous_profile_search_root = self.graph_store.profile_search_root()?;
887 let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
888
889 socialgraph::ingest_parsed_events_with_storage_class(
890 self.graph_store.as_ref(),
891 &events,
892 socialgraph::EventStorageClass::Public,
893 )
894 .context("ingest live mirrored event batch")?;
895
896 let next_event_root = self.graph_store.public_events_root()?;
897 let next_profile_search_root = self.graph_store.profile_search_root()?;
898 let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
899 let event_root_changed = next_event_root != previous_event_root;
900 let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
901 let profiles_by_pubkey_root_changed =
902 next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
903
904 if event_root_changed {
905 self.note_public_events_root_change()?;
906 }
907 if profile_search_root_changed {
908 self.note_profile_search_root_change()?;
909 }
910 if profiles_by_pubkey_root_changed {
911 self.note_profiles_by_pubkey_root_change()?;
912 }
913 if event_root_changed {
914 self.maybe_publish_event_root(true).await?;
915 }
916 if profile_search_root_changed {
917 self.maybe_publish_profile_search_root(true).await?;
918 }
919 if profiles_by_pubkey_root_changed {
920 self.maybe_publish_profiles_by_pubkey_root(true).await?;
921 }
922
923 info!(
924 "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
925 event_count,
926 event_root_changed,
927 profile_search_root_changed,
928 profiles_by_pubkey_root_changed
929 );
930 Ok(())
931 }
932
933 fn note_public_events_root_change(&self) -> Result<()> {
934 let root = self.graph_store.public_events_root()?;
935 Self::note_root_change(
936 self.config.published_event_tree_name.as_deref(),
937 &self.event_publish_state,
938 root,
939 )
940 }
941
942 fn note_profile_search_root_change(&self) -> Result<()> {
943 let root = self.graph_store.profile_search_root()?;
944 Self::note_root_change(
945 self.config.published_profile_search_tree_name.as_deref(),
946 &self.profile_search_publish_state,
947 root,
948 )
949 }
950
951 fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
952 let root = self.graph_store.profiles_by_pubkey_root()?;
953 Self::note_root_change(
954 self.config
955 .published_profiles_by_pubkey_tree_name
956 .as_deref(),
957 &self.profiles_by_pubkey_publish_state,
958 root,
959 )
960 }
961
962 fn note_root_change(
963 tree_name: Option<&str>,
964 publish_state: &Mutex<RootPublishState>,
965 root: Option<hashtree_core::Cid>,
966 ) -> Result<()> {
967 let Some(_tree_name) = tree_name else {
968 return Ok(());
969 };
970
971 let mut state = publish_state.lock().expect("root publish state");
972 let now = Instant::now();
973
974 if state.pending_root == root {
975 return Ok(());
976 }
977
978 state.pending_root = root;
979 state.last_changed_at = Some(now);
980 if state.dirty_since.is_none() {
981 state.dirty_since = Some(now);
982 }
983 Ok(())
984 }
985
986 async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
987 self.maybe_publish_root(
988 self.config.published_event_tree_name.as_deref(),
989 &self.event_publish_state,
990 "event root",
991 force,
992 )
993 .await
994 }
995
996 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
997 self.maybe_publish_root(
998 self.config.published_profile_search_tree_name.as_deref(),
999 &self.profile_search_publish_state,
1000 "profile search root",
1001 force,
1002 )
1003 .await
1004 }
1005
1006 async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1007 self.maybe_publish_root(
1008 self.config
1009 .published_profiles_by_pubkey_tree_name
1010 .as_deref(),
1011 &self.profiles_by_pubkey_publish_state,
1012 "profiles-by-pubkey root",
1013 force,
1014 )
1015 .await
1016 }
1017
1018 async fn maybe_publish_root(
1019 &self,
1020 tree_name: Option<&str>,
1021 publish_state: &Mutex<RootPublishState>,
1022 log_label: &str,
1023 force: bool,
1024 ) -> Result<()> {
1025 let Some(tree_name) = tree_name else {
1026 return Ok(());
1027 };
1028
1029 let pending_root = {
1030 let state = publish_state.lock().expect("root publish state");
1031 let Some(pending_root) = state.pending_root.clone() else {
1032 return Ok(());
1033 };
1034
1035 let now = Instant::now();
1036 let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1037 now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1038 });
1039 let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1040 now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1041 });
1042 if !force && !debounce_ready && !stale_ready {
1043 return Ok(());
1044 }
1045
1046 pending_root
1047 };
1048
1049 let needs_upload = {
1050 let state = publish_state.lock().expect("root publish state");
1051 !self.config.blossom_write_servers.is_empty()
1052 && state.last_uploaded_root.as_ref() != Some(&pending_root)
1053 };
1054 if needs_upload {
1055 background_blossom_push(
1056 self.store.base_path(),
1057 &pending_root.to_string(),
1058 &self.config.blossom_write_servers,
1059 )
1060 .await
1061 .with_context(|| format!("upload {log_label} DAG to Blossom"))?;
1062
1063 let mut state = publish_state.lock().expect("root publish state");
1064 if state.pending_root.as_ref() == Some(&pending_root) {
1065 state.last_uploaded_root = Some(pending_root.clone());
1066 state.last_uploaded_at = Some(Instant::now());
1067 }
1068 }
1069
1070 let mut successful_relays = Vec::new();
1071 let mut failed_relays = Vec::new();
1072 let publish_required =
1073 self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1074 if publish_required {
1075 let Some(publish_client) = self.publish_client.as_ref() else {
1076 unreachable!("publish_required implies publish_client");
1077 };
1078 if !self.has_connected_publish_relay().await {
1079 return Ok(());
1080 }
1081
1082 let already_published = {
1083 let state = publish_state.lock().expect("root publish state");
1084 state.last_published_root.as_ref() == Some(&pending_root)
1085 };
1086 if !already_published {
1087 let publish_relays = self.config.publish_relays.clone();
1088 let latest_known_created_at = {
1089 let state = publish_state.lock().expect("root publish state");
1090 state.last_published_created_at
1091 };
1092 let publish_created_at = next_replaceable_created_at(
1093 Timestamp::now(),
1094 later_timestamp(
1095 latest_known_created_at,
1096 self.latest_root_event_created_at(tree_name).await,
1097 ),
1098 );
1099 let event = publish_client
1100 .sign_event_builder(Self::build_public_root_event(
1101 tree_name,
1102 &pending_root,
1103 publish_created_at,
1104 ))
1105 .await
1106 .with_context(|| format!("sign {log_label} event"))?;
1107 let publish_result = self
1108 .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1109 .await
1110 .with_context(|| format!("publish {log_label} event"))?;
1111 successful_relays = publish_result.0;
1112 failed_relays = publish_result.1;
1113 if successful_relays.is_empty() {
1114 let failure_summary = if failed_relays.is_empty() {
1115 "no publish relays accepted the event".to_string()
1116 } else {
1117 failed_relays.join("; ")
1118 };
1119 anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1120 }
1121
1122 let mut state = publish_state.lock().expect("root publish state");
1123 if state.pending_root.as_ref() == Some(&pending_root) {
1124 state.last_published_root = Some(pending_root.clone());
1125 state.last_published_at = Some(Instant::now());
1126 state.last_published_created_at = Some(event.created_at);
1127 }
1128 }
1129 }
1130
1131 {
1132 let mut state = publish_state.lock().expect("root publish state");
1133 if state.pending_root.as_ref() == Some(&pending_root) {
1134 let upload_satisfied = self.config.blossom_write_servers.is_empty()
1135 || state.last_uploaded_root.as_ref() == Some(&pending_root);
1136 let publish_satisfied =
1137 !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1138 if upload_satisfied && publish_satisfied {
1139 state.dirty_since = None;
1140 }
1141 }
1142 }
1143
1144 info!(
1145 "Nostr mirror published {}: tree={} hash={} relays={:?}",
1146 log_label,
1147 tree_name,
1148 hex::encode(pending_root.hash),
1149 successful_relays,
1150 );
1151 if !failed_relays.is_empty() {
1152 warn!(
1153 "Nostr mirror publish had relay failures: tree={} failures={:?}",
1154 tree_name, failed_relays
1155 );
1156 }
1157 Ok(())
1158 }
1159
1160 async fn publish_root_event_to_relays(
1161 &self,
1162 publish_client: &Client,
1163 relays: &[String],
1164 event: &Event,
1165 ) -> Result<(Vec<String>, Vec<String>)> {
1166 let mut successful_relays = Vec::new();
1167 let mut failed_relays = Vec::new();
1168
1169 for relay in relays {
1170 match publish_client
1171 .send_event_to([relay.as_str()], event.clone())
1172 .await
1173 {
1174 Ok(output) => {
1175 if output.success.is_empty() {
1176 failed_relays.push(format!("{relay}: relay did not acknowledge publish"));
1177 continue;
1178 }
1179 successful_relays.push(relay.clone());
1180 failed_relays.extend(output.failed.into_iter().map(
1181 |(url, reason)| match reason {
1182 Some(reason) => format!("{url}: {reason}"),
1183 None => format!("{url}: relay rejected publish"),
1184 },
1185 ));
1186 }
1187 Err(err) => {
1188 failed_relays.push(format!("{relay}: {err}"));
1189 }
1190 }
1191 }
1192
1193 Ok((successful_relays, failed_relays))
1194 }
1195
1196 async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1197 let publish_client = self.publish_client.as_ref()?;
1198 let author = self.publish_pubkey?;
1199 let events = publish_client
1200 .get_events_of(
1201 vec![Self::build_public_root_filter(author, tree_name)],
1202 EventSource::relays(Some(self.config.fetch_timeout)),
1203 )
1204 .await
1205 .ok()?;
1206 events
1207 .iter()
1208 .filter(|event| Self::matches_public_root_event(event, tree_name))
1209 .max_by_key(|event| (event.created_at, event.id))
1210 .map(|event| event.created_at)
1211 }
1212
1213 fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1214 Filter::new()
1215 .kind(Kind::Custom(30078))
1216 .author(author)
1217 .custom_tag(
1218 SingleLetterTag::lowercase(Alphabet::D),
1219 vec![tree_name.to_string()],
1220 )
1221 .custom_tag(
1222 SingleLetterTag::lowercase(Alphabet::L),
1223 vec!["hashtree".to_string()],
1224 )
1225 .limit(50)
1226 }
1227
1228 fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1229 event.kind == Kind::Custom(30078)
1230 && event.tags.iter().any(|tag| {
1231 let values = tag.as_slice();
1232 values.first().is_some_and(|value| value == "d")
1233 && values.get(1).is_some_and(|value| value == tree_name)
1234 })
1235 && event.tags.iter().any(|tag| {
1236 let values = tag.as_slice();
1237 values.first().is_some_and(|value| value == "l")
1238 && values.get(1).is_some_and(|value| value == "hashtree")
1239 })
1240 }
1241
1242 fn build_public_root_event(
1243 tree_name: &str,
1244 cid: &hashtree_core::Cid,
1245 created_at: Timestamp,
1246 ) -> EventBuilder {
1247 let mut tags = vec![
1248 Tag::identifier(tree_name.to_string()),
1249 Tag::custom(
1250 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1251 vec!["hashtree"],
1252 ),
1253 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1254 ];
1255 if let Some(key) = cid.key {
1256 tags.push(Tag::custom(
1257 TagKind::Custom("key".into()),
1258 vec![hex::encode(key)],
1259 ));
1260 }
1261
1262 EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1263 }
1264}
1265
1266fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1267 match (left, right) {
1268 (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1269 (Some(left), None) => Some(left),
1270 (None, Some(right)) => Some(right),
1271 (None, None) => None,
1272 }
1273}
1274
1275fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1276 match latest_existing {
1277 Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1278 _ => now,
1279 }
1280}
1281
1282#[cfg(test)]
1283mod tests;