1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13 Timestamp,
14};
15use nostr_sdk::{
16 pool::RelayLimits, prelude::RelayPoolNotification, Client, Keys, Options, RelayStatus,
17};
18use tokio::sync::watch;
19use tracing::{debug, info, warn};
20
21use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
22use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
23use crate::HashtreeStore;
24
// Default mirroring scope and published tree names.

/// Event kinds mirrored when the config does not override `kinds`.
const DEFAULT_HISTORY_KINDS: [u16; 6] = [0, 1, 3, 6, 7, 9735];
/// Default tree name under which the public-event index root is published.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
/// Default tree name under which the profile-search index root is published.
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";

// Timing knobs. Each one has a production value and a much shorter `cfg(test)` value, so
// unit tests exercise the same code paths without real-world waits.

/// Initial wait in `run` before any sync, giving relay connections time to open.
#[cfg(not(test))]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(8_000);
#[cfg(test)]
const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// Extra settle time added after `MIRROR_STARTUP_DELAY`.
#[cfg(not(test))]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(1_000);
#[cfg(test)]
const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);

/// Period at which the social graph is re-read for newly reachable authors.
#[cfg(not(test))]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(30_000);
#[cfg(test)]
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

/// Minimum gap between two reconnect-triggered catch-up history syncs.
#[cfg(not(test))]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(30_000);
#[cfg(test)]
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);

/// Minimum gap between two missing-profile backfill runs.
#[cfg(not(test))]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(300_000);
#[cfg(test)]
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);

/// How long a root must stay unchanged before a non-forced publish. This is also the
/// period of the publish/flush tick.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(5_000);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);

/// Upper bound on how long a changed root may wait for a non-forced publish, even if it
/// keeps changing.
#[cfg(not(test))]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(30_000);
#[cfg(test)]
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);
/// Settings for [`BackgroundNostrMirror`].
#[derive(Clone, Debug)]
pub struct NostrMirrorConfig {
    /// Relays the mirror reads from, for both live subscriptions and history crawls.
    /// When this is empty, `run` returns immediately.
    pub relays: Vec<String>,
    /// Relays that receive published tree-root events. Publishing is disabled when this is
    /// empty or no publish keys are supplied.
    pub publish_relays: Vec<String>,
    /// Authors at social-graph follow distance `0..=max_follow_distance` are mirrored.
    /// `0` disables the mirror.
    pub max_follow_distance: u32,
    /// Threshold handed to the graph store's over-mute check; over-muted users are skipped.
    pub overmute_threshold: f64,
    /// Authors per live-subscription filter and per crawler author batch (clamped to >= 1).
    pub author_batch_size: usize,
    /// Authors per history-sync chunk. Each chunk's resulting root is applied before the
    /// next chunk starts (clamped to >= 1).
    pub history_sync_author_chunk_size: usize,
    /// Passed to the crawler as `per_author_event_limit` (clamped to >= 1).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum number of profile-less authors collected per missing-profile backfill run.
    /// `0` disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Fetch timeout handed to the history crawler.
    pub fetch_timeout: Duration,
    /// Optional event-size cap applied to the read client's relay limits and to the crawler.
    /// When `None`, the read client is built without custom limits.
    pub relay_event_max_size: Option<u32>,
    /// Passed to the crawler; reported in logs as `negentropy_only`.
    pub require_negentropy: bool,
    /// Event kinds mirrored by history syncs and live subscriptions.
    pub kinds: Vec<u16>,
    /// Run a missing-profile backfill and a full history sync at startup.
    pub history_sync_on_start: bool,
    /// Run a catch-up history sync when a relay transitions back to connected.
    pub history_sync_on_reconnect: bool,
    /// Tree name for publishing the public-event index root. `None` disables that publish.
    pub published_event_tree_name: Option<String>,
    /// Tree name for publishing the profile-search index root. `None` disables that publish.
    pub published_profile_search_tree_name: Option<String>,
}
83
84impl Default for NostrMirrorConfig {
85 fn default() -> Self {
86 Self {
87 relays: Vec::new(),
88 publish_relays: Vec::new(),
89 max_follow_distance: 2,
90 overmute_threshold: 1.0,
91 author_batch_size: 256,
92 history_sync_author_chunk_size: 5_000,
93 history_sync_per_author_event_limit: 256,
94 missing_profile_backfill_batch_size: 5_000,
95 fetch_timeout: Duration::from_secs(15),
96 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
97 require_negentropy: false,
98 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
99 history_sync_on_start: true,
100 history_sync_on_reconnect: true,
101 published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
102 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
103 }
104 }
105}
106
107#[derive(Debug, Default)]
108struct RootPublishState {
109 pending_root: Option<hashtree_core::Cid>,
110 last_changed_at: Option<Instant>,
111 dirty_since: Option<Instant>,
112 last_published_root: Option<hashtree_core::Cid>,
113 last_published_at: Option<Instant>,
114}
115
/// Long-running task that mirrors Nostr events from the configured relays for authors within
/// `max_follow_distance` in the social graph. It can optionally publish the resulting index
/// roots as Nostr events.
pub struct BackgroundNostrMirror {
    /// Settings supplied at construction.
    config: NostrMirrorConfig,
    /// Hashtree storage; its `store_arc()` is handed to the crawler and the event store.
    store: Arc<HashtreeStore>,
    /// Social-graph store: author discovery, over-mute checks, profiles, and the
    /// public-events / profile-search roots.
    graph_store: Arc<SocialGraphStore>,
    /// Read client (random key) connected to `config.relays`.
    client: Client,
    /// Signing client connected to `config.publish_relays`, when publishing is enabled.
    publish_client: Option<Client>,
    /// Publish debounce state for the public-event index root.
    event_publish_state: Mutex<RootPublishState>,
    /// Publish debounce state for the profile-search index root.
    profile_search_publish_state: Mutex<RootPublishState>,
    /// Live events awaiting the next flush, keyed by hex event id (duplicates collapse).
    pending_live_events: Mutex<BTreeMap<String, Event>>,
    /// Where the rotating missing-profile scan resumes in the author list.
    missing_profile_cursor: Mutex<usize>,
    /// Sends the shutdown signal (see `shutdown`).
    shutdown_tx: watch::Sender<bool>,
    /// Receiver cloned by `run` to observe the shutdown signal.
    shutdown_rx: watch::Receiver<bool>,
}
129
130impl BackgroundNostrMirror {
131 pub async fn new(
132 config: NostrMirrorConfig,
133 store: Arc<HashtreeStore>,
134 graph_store: Arc<SocialGraphStore>,
135 publish_keys: Option<Keys>,
136 ) -> Result<Self> {
137 let client = if let Some(max_size) = config.relay_event_max_size {
138 let mut limits = RelayLimits::default();
139 limits.events.max_size = Some(max_size);
140 Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
141 } else {
142 Client::new(Keys::generate())
143 };
144 for relay in &config.relays {
145 client
146 .add_relay(relay)
147 .await
148 .with_context(|| format!("add mirror relay {relay}"))?;
149 }
150 client.connect().await;
151
152 let publish_client = if let Some(keys) = publish_keys {
153 if config.publish_relays.is_empty() {
154 None
155 } else {
156 let client = Client::new(keys);
157 for relay in &config.publish_relays {
158 client
159 .add_relay(relay)
160 .await
161 .with_context(|| format!("add mirror publish relay {relay}"))?;
162 }
163 client.connect().await;
164 Some(client)
165 }
166 } else {
167 None
168 };
169
170 let (shutdown_tx, shutdown_rx) = watch::channel(false);
171 Ok(Self {
172 config,
173 store,
174 graph_store,
175 client,
176 publish_client,
177 event_publish_state: Mutex::new(RootPublishState::default()),
178 profile_search_publish_state: Mutex::new(RootPublishState::default()),
179 pending_live_events: Mutex::new(BTreeMap::new()),
180 missing_profile_cursor: Mutex::new(0),
181 shutdown_tx,
182 shutdown_rx,
183 })
184 }
185
186 pub fn shutdown(&self) {
187 let _ = self.shutdown_tx.send(true);
188 }
189
190 fn sync_publish_roots_from_store(&self) -> Result<()> {
191 self.note_public_events_root_change()?;
192 self.note_profile_search_root_change()?;
193 Ok(())
194 }
195
196 pub async fn run(&self) -> Result<()> {
197 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
198 return Ok(());
199 }
200
201 info!(
202 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
203 self.config.relays.len(),
204 self.config.max_follow_distance,
205 self.config.require_negentropy,
206 self.config.kinds,
207 self.config.history_sync_author_chunk_size.max(1),
208 self.config.history_sync_on_start,
209 self.config.history_sync_on_reconnect
210 );
211
212 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
213 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
214 let live_since = Timestamp::now();
215 self.sync_publish_roots_from_store()?;
216
217 let initial_authors = self.collect_authors()?;
218 if initial_authors.is_empty() {
219 info!("Nostr mirror: no social-graph authors to mirror yet");
220 } else if self.config.history_sync_on_start {
221 if self.should_backfill_missing_profiles(None) {
222 let missing_profile_authors = self.collect_missing_profile_authors(
223 self.config.missing_profile_backfill_batch_size,
224 )?;
225 if !missing_profile_authors.is_empty() {
226 info!(
227 "Nostr mirror missing-profile backfill starting: authors={}",
228 missing_profile_authors.len()
229 );
230 self.history_sync_authors_with_kinds(
231 missing_profile_authors,
232 &[Kind::Metadata.as_u16()],
233 )
234 .await?;
235 }
236 }
237 self.history_sync_authors(initial_authors.clone()).await?;
238 }
239
240 let mut subscribed_authors = HashSet::new();
241 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
242 .await?;
243
244 let mut relay_statuses = self.capture_relay_statuses().await;
245 let mut last_reconnect_history_sync_at: Option<Instant> = None;
246 let mut last_missing_profile_backfill_at: Option<Instant> = None;
247 let mut notifications = self.client.notifications();
248 let mut shutdown_rx = self.shutdown_rx.clone();
249 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
250 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
251
252 loop {
253 tokio::select! {
254 _ = shutdown_rx.changed() => {
255 if *shutdown_rx.borrow() {
256 break;
257 }
258 }
259 _ = refresh_interval.tick() => {
260 let authors = self.collect_authors()?;
261 let new_authors = authors
262 .into_iter()
263 .filter(|author| !subscribed_authors.contains(author))
264 .collect::<Vec<_>>();
265 if !new_authors.is_empty() {
266 debug!(
267 "Nostr mirror discovered {} newly reachable author(s)",
268 new_authors.len()
269 );
270 self.history_sync_authors(new_authors.clone()).await?;
271 self.subscribe_authors_since(
272 &new_authors,
273 Timestamp::now(),
274 &mut subscribed_authors,
275 )
276 .await?;
277 }
278 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
279 let missing_profile_authors = self.collect_missing_profile_authors(
280 self.config.missing_profile_backfill_batch_size,
281 )?;
282 if !missing_profile_authors.is_empty() {
283 info!(
284 "Nostr mirror missing-profile backfill starting: authors={}",
285 missing_profile_authors.len()
286 );
287 self.history_sync_authors_with_kinds(
288 missing_profile_authors,
289 &[Kind::Metadata.as_u16()],
290 )
291 .await?;
292 last_missing_profile_backfill_at = Some(Instant::now());
293 }
294 }
295 }
296 _ = publish_interval.tick() => {
297 self.sync_publish_roots_from_store()?;
298 if let Err(err) = self.flush_live_events().await {
299 warn!("Nostr mirror live event flush failed: {:#}", err);
300 }
301 if let Err(err) = self.maybe_publish_event_root(false).await {
302 warn!("Nostr mirror event-root publish failed: {:#}", err);
303 }
304 if let Err(err) = self.maybe_publish_profile_search_root(false).await {
305 warn!("Nostr mirror profile-search publish failed: {:#}", err);
306 }
307 }
308 notification = notifications.recv() => {
309 match notification {
310 Ok(RelayPoolNotification::Event { event, .. }) => {
311 self.ingest_live_event(&event)?;
312 }
313 Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
314 let relay_url = relay_url.to_string();
315 let previous = relay_statuses.insert(relay_url.clone(), status);
316 if Self::should_history_sync_on_reconnect(
317 self.config.history_sync_on_reconnect,
318 previous,
319 status,
320 ) && Self::should_run_reconnect_history_sync(
321 last_reconnect_history_sync_at.as_ref(),
322 )
323 {
324 let authors = self.collect_authors()?;
325 if !authors.is_empty() {
326 info!(
327 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
328 relay_url,
329 authors.len(),
330 self.config.require_negentropy
331 );
332 self.history_sync_authors(authors).await?;
333 last_reconnect_history_sync_at = Some(Instant::now());
334 }
335 }
336 }
337 Ok(RelayPoolNotification::Shutdown) => break,
338 Ok(_) => {}
339 Err(err) => {
340 warn!("Nostr mirror notification error: {}", err);
341 break;
342 }
343 }
344 }
345 }
346 }
347
348 if let Err(err) = self.flush_live_events().await {
349 warn!(
350 "Nostr mirror live event flush failed during shutdown: {:#}",
351 err
352 );
353 }
354 if let Err(err) = self.sync_publish_roots_from_store() {
355 warn!(
356 "Nostr mirror root-state refresh failed during shutdown: {:#}",
357 err
358 );
359 }
360 if let Err(err) = self.maybe_publish_event_root(true).await {
361 warn!(
362 "Nostr mirror event-root publish failed during shutdown: {:#}",
363 err
364 );
365 }
366 if let Err(err) = self.maybe_publish_profile_search_root(true).await {
367 warn!(
368 "Nostr mirror profile-search publish failed during shutdown: {:#}",
369 err
370 );
371 }
372 let _ = self.client.disconnect().await;
373 if let Some(client) = self.publish_client.as_ref() {
374 let _ = client.disconnect().await;
375 }
376 Ok(())
377 }
378
379 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
380 let mut statuses = HashMap::new();
381 for (relay_url, relay) in self.client.relays().await {
382 statuses.insert(relay_url.to_string(), relay.status().await);
383 }
384 statuses
385 }
386
387 async fn has_connected_publish_relay(&self) -> bool {
388 let Some(client) = self.publish_client.as_ref() else {
389 return false;
390 };
391 Self::client_has_connected_relay(client).await
392 }
393
394 async fn client_has_connected_relay(client: &Client) -> bool {
395 for (_relay_url, relay) in client.relays().await {
396 if relay.status().await == RelayStatus::Connected {
397 return true;
398 }
399 }
400 false
401 }
402
403 fn collect_authors(&self) -> Result<Vec<String>> {
404 let mut authors = Vec::new();
405 let mut seen = HashSet::new();
406 for distance in 0..=self.config.max_follow_distance {
407 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
408 self.graph_store.as_ref(),
409 distance,
410 )
411 .with_context(|| format!("load social-graph distance {distance}"))?
412 {
413 if self
414 .graph_store
415 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
416 {
417 continue;
418 }
419 let hex = hex::encode(pubkey);
420 if seen.insert(hex.clone()) {
421 authors.push(hex);
422 }
423 }
424 }
425 Ok(authors)
426 }
427
428 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
429 if limit == 0 {
430 return Ok(Vec::new());
431 }
432
433 let authors = self.collect_authors()?;
434 if authors.is_empty() {
435 return Ok(Vec::new());
436 }
437
438 let mut cursor = self
439 .missing_profile_cursor
440 .lock()
441 .expect("missing profile cursor");
442 let mut index = (*cursor).min(authors.len());
443 let mut scanned = 0usize;
444 let mut missing = Vec::new();
445
446 while scanned < authors.len() && missing.len() < limit {
447 let author = &authors[index];
448 if self.graph_store.latest_profile_event(author)?.is_none() {
449 missing.push(author.clone());
450 }
451 index += 1;
452 if index == authors.len() {
453 index = 0;
454 }
455 scanned += 1;
456 }
457
458 *cursor = index;
459 Ok(missing)
460 }
461
462 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
463 if self.config.missing_profile_backfill_batch_size == 0
464 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
465 {
466 return false;
467 }
468 match last_run {
469 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
470 None => true,
471 }
472 }
473
474 fn should_history_sync_on_reconnect(
475 history_sync_on_reconnect: bool,
476 previous: Option<RelayStatus>,
477 status: RelayStatus,
478 ) -> bool {
479 history_sync_on_reconnect
480 && status == RelayStatus::Connected
481 && matches!(
482 previous,
483 Some(
484 RelayStatus::Initialized
485 | RelayStatus::Pending
486 | RelayStatus::Connecting
487 | RelayStatus::Disconnected
488 | RelayStatus::Terminated
489 )
490 )
491 }
492
493 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
494 match last_run {
495 None => true,
496 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
497 }
498 }
499
500 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
501 self.history_sync_authors_with_kinds(authors, &self.config.kinds)
502 .await
503 }
504
505 async fn history_sync_authors_with_kinds(
506 &self,
507 authors: Vec<String>,
508 kinds: &[u16],
509 ) -> Result<()> {
510 self.history_sync_authors_chunked(authors, |current_root, author_chunk| async move {
511 self.history_sync_author_chunk(current_root, author_chunk, kinds)
512 .await
513 })
514 .await
515 }
516
517 async fn history_sync_authors_chunked<F, Fut>(
518 &self,
519 authors: Vec<String>,
520 mut run_chunk: F,
521 ) -> Result<()>
522 where
523 F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
524 Fut: std::future::Future<Output = Result<CrawlReport>>,
525 {
526 if authors.is_empty() {
527 return Ok(());
528 }
529
530 info!(
531 "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
532 authors.len(),
533 self.config.relays.len(),
534 self.config.require_negentropy
535 );
536
537 let mut current_root = self.graph_store.public_events_root()?;
538 let mut last_error = None;
539 let mut applied_chunks = 0usize;
540 let mut failed_chunks = 0usize;
541 let chunk_size = self.config.history_sync_author_chunk_size.max(1);
542 let total_chunks = authors.len().div_ceil(chunk_size);
543
544 for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
545 let author_chunk = author_chunk.to_vec();
546 let author_count = author_chunk.len();
547 info!(
548 "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
549 chunk_index + 1,
550 total_chunks,
551 author_count
552 );
553 let report = match run_chunk(current_root.clone(), author_chunk).await {
554 Ok(report) => report,
555 Err(err) => {
556 failed_chunks = failed_chunks.saturating_add(1);
557 warn!(
558 "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
559 chunk_index + 1,
560 total_chunks,
561 author_count,
562 err
563 );
564 last_error = Some(err);
565 continue;
566 }
567 };
568
569 if report.root != current_root {
570 self.apply_history_root(report.root.as_ref()).await?;
571 current_root = report.root.clone();
572 info!(
573 "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
574 chunk_index + 1,
575 total_chunks,
576 report.authors_processed,
577 report.events_selected,
578 report.events_seen
579 );
580 }
581 applied_chunks = applied_chunks.saturating_add(1);
582 }
583
584 if applied_chunks == 0 {
585 return Err(last_error
586 .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
587 .context("run mirror history sync"));
588 }
589 if failed_chunks > 0 {
590 warn!(
591 "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
592 applied_chunks,
593 failed_chunks
594 );
595 }
596 Ok(())
597 }
598
599 async fn history_sync_author_chunk(
600 &self,
601 current_root: Option<hashtree_core::Cid>,
602 authors: Vec<String>,
603 kinds: &[u16],
604 ) -> Result<CrawlReport> {
605 let mut last_error = None;
606 let mut report = None;
607 for attempt in 0..3 {
608 let mut last_logged_authors = 0usize;
609 let bridge = NostrBridge::new(
610 self.store.store_arc(),
611 CrawlConfig {
612 relays: self.config.relays.clone(),
613 author_allowlist: Some(authors.clone()),
614 max_live_bytes: None,
615 max_events_seen: None,
616 max_authors: None,
617 max_follow_distance: None,
618 author_batch_size: self.config.author_batch_size.max(1),
619 per_author_event_limit: self.config.history_sync_per_author_event_limit.max(1),
620 per_author_live_bytes: None,
621 fetch_timeout: self.config.fetch_timeout,
622 kinds: Some(kinds.to_vec()),
623 relay_fetch_mode: RelayFetchMode::AuthorBatches,
624 require_negentropy: self.config.require_negentropy,
625 relay_event_max_size: self.config.relay_event_max_size,
626 relay_page_size: 1_000,
627 max_relay_pages: 10,
628 },
629 );
630
631 match bridge
632 .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
633 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
634 let should_log = progress.authors_processed == progress.authors_considered
635 || progress.authors_processed == 0
636 || progress
637 .authors_processed
638 .saturating_sub(last_logged_authors)
639 >= log_interval;
640 if should_log {
641 last_logged_authors = progress.authors_processed;
642 info!(
643 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
644 progress.authors_processed,
645 progress.authors_considered,
646 progress.events_selected,
647 progress.events_seen
648 );
649 }
650 })
651 .await
652 {
653 Ok(next_report) => {
654 report = Some(next_report);
655 break;
656 }
657 Err(err) => {
658 last_error = Some(err);
659 if attempt < 2 {
660 tokio::time::sleep(Duration::from_millis(500)).await;
661 }
662 }
663 }
664 }
665 report
666 .ok_or_else(|| last_error.expect("history sync retry captured error"))
667 .context("run mirror history sync")
668 }
669
670 async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
671 self.graph_store.write_public_events_root(root)?;
672 let Some(root) = root else {
673 return Ok(());
674 };
675
676 let event_store = NostrEventStore::new(self.store.store_arc());
677 let events = event_store
678 .list_recent_lossy(Some(root), ListEventsOptions::default())
679 .await
680 .context("list trusted mirrored events")?
681 .into_iter()
682 .map(socialgraph::stored_event_to_nostr_event)
683 .collect::<Result<Vec<_>>>()?;
684
685 self.graph_store
686 .rebuild_profile_index_for_events(&events)
687 .context("rebuild mirrored profile search index")?;
688 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
689 .context("sync mirrored social graph state")?;
690 self.note_public_events_root_change()?;
691 self.note_profile_search_root_change()?;
692 if let Err(err) = self.maybe_publish_event_root(true).await {
693 warn!(
694 "Nostr mirror event-root publish failed after root update: {:#}",
695 err
696 );
697 }
698 if let Err(err) = self.maybe_publish_profile_search_root(false).await {
699 warn!(
700 "Nostr mirror profile-search publish failed after root update: {:#}",
701 err
702 );
703 }
704 Ok(())
705 }
706
707 async fn subscribe_authors_since(
708 &self,
709 authors: &[String],
710 since: Timestamp,
711 subscribed_authors: &mut HashSet<String>,
712 ) -> Result<()> {
713 let new_authors = authors
714 .iter()
715 .filter(|author| !subscribed_authors.contains(*author))
716 .cloned()
717 .collect::<Vec<_>>();
718 if new_authors.is_empty() {
719 return Ok(());
720 }
721
722 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
723 let pubkeys = chunk
724 .iter()
725 .filter_map(|author| PublicKey::from_hex(author).ok())
726 .collect::<Vec<_>>();
727 if pubkeys.is_empty() {
728 continue;
729 }
730
731 let filter = Filter::new()
732 .authors(pubkeys)
733 .kinds(self.config.kinds.iter().copied().map(Kind::from))
734 .since(since);
735
736 self.client
737 .subscribe(vec![filter], None)
738 .await
739 .context("subscribe mirror author batch")?;
740 }
741
742 subscribed_authors.extend(new_authors);
743 Ok(())
744 }
745
746 fn ingest_live_event(&self, event: &Event) -> Result<()> {
747 self.pending_live_events
748 .lock()
749 .expect("pending live events")
750 .insert(event.id.to_hex(), event.clone());
751 Ok(())
752 }
753
754 async fn flush_live_events(&self) -> Result<()> {
755 let pending = {
756 let mut pending = self
757 .pending_live_events
758 .lock()
759 .expect("pending live events");
760 if pending.is_empty() {
761 return Ok(());
762 }
763 std::mem::take(&mut *pending)
764 };
765 let events = pending.into_values().collect::<Vec<_>>();
766 let event_count = events.len();
767 let previous_event_root = self.graph_store.public_events_root()?;
768 let previous_profile_search_root = self.graph_store.profile_search_root()?;
769
770 socialgraph::ingest_parsed_events_with_storage_class(
771 self.graph_store.as_ref(),
772 &events,
773 socialgraph::EventStorageClass::Public,
774 )
775 .context("ingest live mirrored event batch")?;
776
777 let next_event_root = self.graph_store.public_events_root()?;
778 let next_profile_search_root = self.graph_store.profile_search_root()?;
779 let event_root_changed = next_event_root != previous_event_root;
780 let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
781
782 if event_root_changed {
783 self.note_public_events_root_change()?;
784 }
785 if profile_search_root_changed {
786 self.note_profile_search_root_change()?;
787 }
788 if event_root_changed {
789 self.maybe_publish_event_root(true).await?;
790 }
791 if profile_search_root_changed {
792 self.maybe_publish_profile_search_root(true).await?;
793 }
794
795 info!(
796 "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={}",
797 event_count, event_root_changed, profile_search_root_changed
798 );
799 Ok(())
800 }
801
802 fn note_public_events_root_change(&self) -> Result<()> {
803 let root = self.graph_store.public_events_root()?;
804 Self::note_root_change(
805 self.config.published_event_tree_name.as_deref(),
806 &self.event_publish_state,
807 root,
808 )
809 }
810
811 fn note_profile_search_root_change(&self) -> Result<()> {
812 let root = self.graph_store.profile_search_root()?;
813 Self::note_root_change(
814 self.config.published_profile_search_tree_name.as_deref(),
815 &self.profile_search_publish_state,
816 root,
817 )
818 }
819
820 fn note_root_change(
821 tree_name: Option<&str>,
822 publish_state: &Mutex<RootPublishState>,
823 root: Option<hashtree_core::Cid>,
824 ) -> Result<()> {
825 let Some(_tree_name) = tree_name else {
826 return Ok(());
827 };
828
829 let mut state = publish_state.lock().expect("root publish state");
830 let now = Instant::now();
831
832 if state.pending_root == root {
833 return Ok(());
834 }
835
836 state.pending_root = root;
837 state.last_changed_at = Some(now);
838 if state.dirty_since.is_none() {
839 state.dirty_since = Some(now);
840 }
841 Ok(())
842 }
843
844 async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
845 self.maybe_publish_root(
846 self.config.published_event_tree_name.as_deref(),
847 &self.event_publish_state,
848 "event root",
849 force,
850 )
851 .await
852 }
853
854 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
855 self.maybe_publish_root(
856 self.config.published_profile_search_tree_name.as_deref(),
857 &self.profile_search_publish_state,
858 "profile search root",
859 force,
860 )
861 .await
862 }
863
864 async fn maybe_publish_root(
865 &self,
866 tree_name: Option<&str>,
867 publish_state: &Mutex<RootPublishState>,
868 log_label: &str,
869 force: bool,
870 ) -> Result<()> {
871 let Some(tree_name) = tree_name else {
872 return Ok(());
873 };
874 let Some(publish_client) = self.publish_client.as_ref() else {
875 return Ok(());
876 };
877 if !self.has_connected_publish_relay().await {
878 return Ok(());
879 }
880
881 let pending_root = {
882 let state = publish_state.lock().expect("root publish state");
883 let Some(pending_root) = state.pending_root.clone() else {
884 return Ok(());
885 };
886 if state.last_published_root.as_ref() == Some(&pending_root) {
887 return Ok(());
888 }
889
890 let now = Instant::now();
891 let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
892 now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
893 });
894 let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
895 now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
896 });
897 if !force && !debounce_ready && !stale_ready {
898 return Ok(());
899 }
900
901 pending_root
902 };
903
904 let event = Self::build_public_root_event(tree_name, &pending_root);
905 let output = publish_client
906 .send_event_builder(event)
907 .await
908 .with_context(|| format!("publish {log_label} event"))?;
909 if output.failed.is_empty() && output.success.is_empty() {
910 return Ok(());
911 }
912
913 {
914 let mut state = publish_state.lock().expect("root publish state");
915 if state.pending_root.as_ref() == Some(&pending_root) {
916 state.last_published_root = Some(pending_root.clone());
917 state.last_published_at = Some(Instant::now());
918 state.dirty_since = None;
919 }
920 }
921
922 info!(
923 "Nostr mirror published {}: tree={} hash={}",
924 log_label,
925 tree_name,
926 hex::encode(pending_root.hash)
927 );
928 Ok(())
929 }
930
931 fn build_public_root_event(tree_name: &str, cid: &hashtree_core::Cid) -> EventBuilder {
932 let mut tags = vec![
933 Tag::identifier(tree_name.to_string()),
934 Tag::custom(
935 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
936 vec!["hashtree"],
937 ),
938 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
939 ];
940 if let Some(key) = cid.key {
941 tags.push(Tag::custom(
942 TagKind::Custom("key".into()),
943 vec![hex::encode(key)],
944 ));
945 }
946
947 EventBuilder::new(Kind::Custom(30078), "", tags)
948 }
949}
950
// Unit tests for this module live in a separate file-based module. They are compiled only
// for test builds, which also select the short `cfg(test)` timing constants.
#[cfg(test)]
mod tests;