1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13 Timestamp,
14};
15use nostr_sdk::{
16 pool::RelayLimits, prelude::RelayPoolNotification, Client, Keys, Options, RelayStatus,
17};
18use tokio::sync::watch;
19use tracing::{debug, info, warn};
20
21use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
22use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
23use crate::HashtreeStore;
24
/// Selects a mirror timing value for the current build: `under_test` when the
/// crate is compiled with `cfg(test)`, `production` otherwise.
const fn mirror_timing(production: Duration, under_test: Duration) -> Duration {
    if cfg!(test) {
        under_test
    } else {
        production
    }
}

/// First pause taken by `BackgroundNostrMirror::run` before it does any work.
const MIRROR_STARTUP_DELAY: Duration =
    mirror_timing(Duration::from_secs(8), Duration::from_millis(50));

/// Second pause taken by `run`, immediately after the start-up delay.
const MIRROR_CONNECT_SETTLE_DELAY: Duration =
    mirror_timing(Duration::from_secs(1), Duration::from_millis(250));

/// Period of the timer that re-collects the author set in `run`.
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration =
    mirror_timing(Duration::from_secs(30), Duration::from_millis(100));

/// Minimum time between two reconnect-triggered history syncs.
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration =
    mirror_timing(Duration::from_secs(30), Duration::from_millis(100));

/// Event kinds mirrored by default: 0 (profile metadata) and 3 (contact list).
const DEFAULT_HISTORY_KINDS: [u16; 2] = [0, 3];
/// Default tree name used when publishing the profile search root.
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";

/// Minimum time between two missing-profile backfill runs in the main loop.
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration =
    mirror_timing(Duration::from_secs(300), Duration::from_millis(100));

/// Quiet period after the last root change before the root is published; also
/// the period of the publish timer in `run`.
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration =
    mirror_timing(Duration::from_secs(5), Duration::from_millis(20));

/// Longest time a changed root may stay unpublished before the debounce is bypassed.
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration =
    mirror_timing(Duration::from_secs(30), Duration::from_millis(100));
62
/// Settings for [`BackgroundNostrMirror`].
#[derive(Debug, Clone)]
pub struct NostrMirrorConfig {
    /// Relay URLs the mirror reads from. `run` returns immediately when empty.
    pub relays: Vec<String>,
    /// Relay URLs the profile search root is published to. No publish client
    /// is created when this is empty.
    pub publish_relays: Vec<String>,
    /// Largest follow distance whose users are mirrored. `run` returns
    /// immediately when this is 0.
    pub max_follow_distance: u32,
    /// Threshold handed to the graph store's overmute check; users it reports
    /// as overmuted are not mirrored.
    pub overmute_threshold: f64,
    /// Authors per live subscription filter and per crawl batch (minimum 1).
    pub author_batch_size: usize,
    /// Authors per history-sync chunk (minimum 1).
    pub history_sync_author_chunk_size: usize,
    /// Maximum authors per missing-profile backfill run; 0 disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Fetch timeout forwarded to the history crawl.
    pub fetch_timeout: Duration,
    /// Optional relay event size limit, applied to the mirror client and
    /// forwarded to the history crawl.
    pub relay_event_max_size: Option<u32>,
    /// Forwarded to the history crawl as its `require_negentropy` setting.
    pub require_negentropy: bool,
    /// Event kinds requested by live subscriptions and history syncs.
    pub kinds: Vec<u16>,
    /// Whether `run` performs a history sync before entering its main loop.
    pub history_sync_on_start: bool,
    /// Whether a relay becoming connected again triggers a catch-up history sync.
    pub history_sync_on_reconnect: bool,
    /// Tree name used as the identifier tag of the published profile search
    /// root event; `None` disables root tracking and publishing.
    pub published_profile_search_tree_name: Option<String>,
}
80
81impl Default for NostrMirrorConfig {
82 fn default() -> Self {
83 Self {
84 relays: Vec::new(),
85 publish_relays: Vec::new(),
86 max_follow_distance: 2,
87 overmute_threshold: 1.0,
88 author_batch_size: 256,
89 history_sync_author_chunk_size: 5_000,
90 missing_profile_backfill_batch_size: 5_000,
91 fetch_timeout: Duration::from_secs(15),
92 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
93 require_negentropy: false,
94 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
95 history_sync_on_start: true,
96 history_sync_on_reconnect: true,
97 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
98 }
99 }
100}
101
102#[derive(Debug, Default)]
103struct RootPublishState {
104 pending_root: Option<hashtree_core::Cid>,
105 last_changed_at: Option<Instant>,
106 dirty_since: Option<Instant>,
107 last_published_root: Option<hashtree_core::Cid>,
108 last_published_at: Option<Instant>,
109}
110
111pub struct BackgroundNostrMirror {
112 config: NostrMirrorConfig,
113 store: Arc<HashtreeStore>,
114 graph_store: Arc<SocialGraphStore>,
115 client: Client,
116 publish_client: Option<Client>,
117 profile_search_publish_state: Mutex<RootPublishState>,
118 missing_profile_cursor: Mutex<usize>,
119 shutdown_tx: watch::Sender<bool>,
120 shutdown_rx: watch::Receiver<bool>,
121}
122
123impl BackgroundNostrMirror {
124 pub async fn new(
125 config: NostrMirrorConfig,
126 store: Arc<HashtreeStore>,
127 graph_store: Arc<SocialGraphStore>,
128 publish_keys: Option<Keys>,
129 ) -> Result<Self> {
130 let client = if let Some(max_size) = config.relay_event_max_size {
131 let mut limits = RelayLimits::default();
132 limits.events.max_size = Some(max_size);
133 Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
134 } else {
135 Client::new(Keys::generate())
136 };
137 for relay in &config.relays {
138 client
139 .add_relay(relay)
140 .await
141 .with_context(|| format!("add mirror relay {relay}"))?;
142 }
143 client.connect().await;
144
145 let publish_client = if let Some(keys) = publish_keys {
146 if config.publish_relays.is_empty() {
147 None
148 } else {
149 let client = Client::new(keys);
150 for relay in &config.publish_relays {
151 client
152 .add_relay(relay)
153 .await
154 .with_context(|| format!("add mirror publish relay {relay}"))?;
155 }
156 client.connect().await;
157 Some(client)
158 }
159 } else {
160 None
161 };
162
163 let (shutdown_tx, shutdown_rx) = watch::channel(false);
164 Ok(Self {
165 config,
166 store,
167 graph_store,
168 client,
169 publish_client,
170 profile_search_publish_state: Mutex::new(RootPublishState::default()),
171 missing_profile_cursor: Mutex::new(0),
172 shutdown_tx,
173 shutdown_rx,
174 })
175 }
176
177 pub fn shutdown(&self) {
178 let _ = self.shutdown_tx.send(true);
179 }
180
181 pub async fn run(&self) -> Result<()> {
182 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
183 return Ok(());
184 }
185
186 info!(
187 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
188 self.config.relays.len(),
189 self.config.max_follow_distance,
190 self.config.require_negentropy,
191 self.config.kinds,
192 self.config.history_sync_author_chunk_size.max(1),
193 self.config.history_sync_on_start,
194 self.config.history_sync_on_reconnect
195 );
196
197 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
198 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
199 let live_since = Timestamp::now();
200 self.note_profile_search_root_change()?;
201
202 let initial_authors = self.collect_authors()?;
203 if initial_authors.is_empty() {
204 info!("Nostr mirror: no social-graph authors to mirror yet");
205 } else if self.config.history_sync_on_start {
206 if self.should_backfill_missing_profiles(None) {
207 let missing_profile_authors = self.collect_missing_profile_authors(
208 self.config.missing_profile_backfill_batch_size,
209 )?;
210 if !missing_profile_authors.is_empty() {
211 info!(
212 "Nostr mirror missing-profile backfill starting: authors={}",
213 missing_profile_authors.len()
214 );
215 self.history_sync_authors_with_kinds(
216 missing_profile_authors,
217 &[Kind::Metadata.as_u16()],
218 )
219 .await?;
220 }
221 }
222 self.history_sync_authors(initial_authors.clone()).await?;
223 }
224
225 let mut subscribed_authors = HashSet::new();
226 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
227 .await?;
228
229 let mut relay_statuses = self.capture_relay_statuses().await;
230 let mut last_reconnect_history_sync_at: Option<Instant> = None;
231 let mut last_missing_profile_backfill_at: Option<Instant> = None;
232 let mut notifications = self.client.notifications();
233 let mut shutdown_rx = self.shutdown_rx.clone();
234 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
235 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
236
237 loop {
238 tokio::select! {
239 _ = shutdown_rx.changed() => {
240 if *shutdown_rx.borrow() {
241 break;
242 }
243 }
244 _ = refresh_interval.tick() => {
245 let authors = self.collect_authors()?;
246 let new_authors = authors
247 .into_iter()
248 .filter(|author| !subscribed_authors.contains(author))
249 .collect::<Vec<_>>();
250 if !new_authors.is_empty() {
251 debug!(
252 "Nostr mirror discovered {} newly reachable author(s)",
253 new_authors.len()
254 );
255 self.history_sync_authors(new_authors.clone()).await?;
256 self.subscribe_authors_since(
257 &new_authors,
258 Timestamp::now(),
259 &mut subscribed_authors,
260 )
261 .await?;
262 }
263 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
264 let missing_profile_authors = self.collect_missing_profile_authors(
265 self.config.missing_profile_backfill_batch_size,
266 )?;
267 if !missing_profile_authors.is_empty() {
268 info!(
269 "Nostr mirror missing-profile backfill starting: authors={}",
270 missing_profile_authors.len()
271 );
272 self.history_sync_authors_with_kinds(
273 missing_profile_authors,
274 &[Kind::Metadata.as_u16()],
275 )
276 .await?;
277 last_missing_profile_backfill_at = Some(Instant::now());
278 }
279 }
280 }
281 _ = publish_interval.tick() => {
282 if let Err(err) = self.maybe_publish_profile_search_root(false).await {
283 warn!("Nostr mirror profile-search publish failed: {:#}", err);
284 }
285 }
286 notification = notifications.recv() => {
287 match notification {
288 Ok(RelayPoolNotification::Event { event, .. }) => {
289 self.ingest_live_event(&event)?;
290 }
291 Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
292 let relay_url = relay_url.to_string();
293 let previous = relay_statuses.insert(relay_url.clone(), status);
294 if Self::should_history_sync_on_reconnect(
295 self.config.history_sync_on_reconnect,
296 previous,
297 status,
298 ) && Self::should_run_reconnect_history_sync(
299 last_reconnect_history_sync_at.as_ref(),
300 )
301 {
302 let authors = self.collect_authors()?;
303 if !authors.is_empty() {
304 info!(
305 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
306 relay_url,
307 authors.len(),
308 self.config.require_negentropy
309 );
310 self.history_sync_authors(authors).await?;
311 last_reconnect_history_sync_at = Some(Instant::now());
312 }
313 }
314 }
315 Ok(RelayPoolNotification::Shutdown) => break,
316 Ok(_) => {}
317 Err(err) => {
318 warn!("Nostr mirror notification error: {}", err);
319 break;
320 }
321 }
322 }
323 }
324 }
325
326 let _ = self.client.disconnect().await;
327 if let Some(client) = self.publish_client.as_ref() {
328 let _ = client.disconnect().await;
329 }
330 Ok(())
331 }
332
333 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
334 let mut statuses = HashMap::new();
335 for (relay_url, relay) in self.client.relays().await {
336 statuses.insert(relay_url.to_string(), relay.status().await);
337 }
338 statuses
339 }
340
341 async fn has_connected_publish_relay(&self) -> bool {
342 let Some(client) = self.publish_client.as_ref() else {
343 return false;
344 };
345 Self::client_has_connected_relay(client).await
346 }
347
348 async fn client_has_connected_relay(client: &Client) -> bool {
349 for (_relay_url, relay) in client.relays().await {
350 if relay.status().await == RelayStatus::Connected {
351 return true;
352 }
353 }
354 false
355 }
356
357 fn collect_authors(&self) -> Result<Vec<String>> {
358 let mut authors = Vec::new();
359 let mut seen = HashSet::new();
360 for distance in 0..=self.config.max_follow_distance {
361 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
362 self.graph_store.as_ref(),
363 distance,
364 )
365 .with_context(|| format!("load social-graph distance {distance}"))?
366 {
367 if self
368 .graph_store
369 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
370 {
371 continue;
372 }
373 let hex = hex::encode(pubkey);
374 if seen.insert(hex.clone()) {
375 authors.push(hex);
376 }
377 }
378 }
379 Ok(authors)
380 }
381
382 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
383 if limit == 0 {
384 return Ok(Vec::new());
385 }
386
387 let authors = self.collect_authors()?;
388 if authors.is_empty() {
389 return Ok(Vec::new());
390 }
391
392 let mut cursor = self
393 .missing_profile_cursor
394 .lock()
395 .expect("missing profile cursor");
396 let mut index = (*cursor).min(authors.len());
397 let mut scanned = 0usize;
398 let mut missing = Vec::new();
399
400 while scanned < authors.len() && missing.len() < limit {
401 let author = &authors[index];
402 if self.graph_store.latest_profile_event(author)?.is_none() {
403 missing.push(author.clone());
404 }
405 index += 1;
406 if index == authors.len() {
407 index = 0;
408 }
409 scanned += 1;
410 }
411
412 *cursor = index;
413 Ok(missing)
414 }
415
416 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
417 if self.config.missing_profile_backfill_batch_size == 0
418 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
419 {
420 return false;
421 }
422 match last_run {
423 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
424 None => true,
425 }
426 }
427
428 fn should_history_sync_on_reconnect(
429 history_sync_on_reconnect: bool,
430 previous: Option<RelayStatus>,
431 status: RelayStatus,
432 ) -> bool {
433 history_sync_on_reconnect
434 && status == RelayStatus::Connected
435 && matches!(
436 previous,
437 Some(
438 RelayStatus::Initialized
439 | RelayStatus::Pending
440 | RelayStatus::Connecting
441 | RelayStatus::Disconnected
442 | RelayStatus::Terminated
443 )
444 )
445 }
446
447 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
448 match last_run {
449 None => true,
450 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
451 }
452 }
453
454 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
455 self.history_sync_authors_with_kinds(authors, &self.config.kinds)
456 .await
457 }
458
459 async fn history_sync_authors_with_kinds(
460 &self,
461 authors: Vec<String>,
462 kinds: &[u16],
463 ) -> Result<()> {
464 self.history_sync_authors_chunked(authors, |current_root, author_chunk| async move {
465 self.history_sync_author_chunk(current_root, author_chunk, kinds)
466 .await
467 })
468 .await
469 }
470
471 async fn history_sync_authors_chunked<F, Fut>(
472 &self,
473 authors: Vec<String>,
474 mut run_chunk: F,
475 ) -> Result<()>
476 where
477 F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
478 Fut: std::future::Future<Output = Result<CrawlReport>>,
479 {
480 if authors.is_empty() {
481 return Ok(());
482 }
483
484 info!(
485 "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
486 authors.len(),
487 self.config.relays.len(),
488 self.config.require_negentropy
489 );
490
491 let mut current_root = self.graph_store.public_events_root()?;
492 let mut last_error = None;
493 let mut applied_chunks = 0usize;
494 let mut failed_chunks = 0usize;
495 let chunk_size = self.config.history_sync_author_chunk_size.max(1);
496 let total_chunks = authors.len().div_ceil(chunk_size);
497
498 for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
499 let author_chunk = author_chunk.to_vec();
500 let author_count = author_chunk.len();
501 info!(
502 "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
503 chunk_index + 1,
504 total_chunks,
505 author_count
506 );
507 let report = match run_chunk(current_root.clone(), author_chunk).await {
508 Ok(report) => report,
509 Err(err) => {
510 failed_chunks = failed_chunks.saturating_add(1);
511 warn!(
512 "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
513 chunk_index + 1,
514 total_chunks,
515 author_count,
516 err
517 );
518 last_error = Some(err);
519 continue;
520 }
521 };
522
523 if report.root != current_root {
524 self.apply_history_root(report.root.as_ref()).await?;
525 current_root = report.root.clone();
526 info!(
527 "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
528 chunk_index + 1,
529 total_chunks,
530 report.authors_processed,
531 report.events_selected,
532 report.events_seen
533 );
534 }
535 applied_chunks = applied_chunks.saturating_add(1);
536 }
537
538 if applied_chunks == 0 {
539 return Err(last_error
540 .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
541 .context("run mirror history sync"));
542 }
543 if failed_chunks > 0 {
544 warn!(
545 "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
546 applied_chunks,
547 failed_chunks
548 );
549 }
550 Ok(())
551 }
552
553 async fn history_sync_author_chunk(
554 &self,
555 current_root: Option<hashtree_core::Cid>,
556 authors: Vec<String>,
557 kinds: &[u16],
558 ) -> Result<CrawlReport> {
559 let mut last_error = None;
560 let mut report = None;
561 for attempt in 0..3 {
562 let mut last_logged_authors = 0usize;
563 let bridge = NostrBridge::new(
564 self.store.store_arc(),
565 CrawlConfig {
566 relays: self.config.relays.clone(),
567 author_allowlist: Some(authors.clone()),
568 max_live_bytes: None,
569 max_events_seen: None,
570 max_authors: None,
571 max_follow_distance: None,
572 author_batch_size: self.config.author_batch_size.max(1),
573 per_author_event_limit: kinds.len().max(1),
574 per_author_live_bytes: None,
575 fetch_timeout: self.config.fetch_timeout,
576 kinds: Some(kinds.to_vec()),
577 relay_fetch_mode: RelayFetchMode::AuthorBatches,
578 require_negentropy: self.config.require_negentropy,
579 relay_event_max_size: self.config.relay_event_max_size,
580 relay_page_size: 1_000,
581 max_relay_pages: 10,
582 },
583 );
584
585 match bridge
586 .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
587 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
588 let should_log = progress.authors_processed == progress.authors_considered
589 || progress.authors_processed == 0
590 || progress
591 .authors_processed
592 .saturating_sub(last_logged_authors)
593 >= log_interval;
594 if should_log {
595 last_logged_authors = progress.authors_processed;
596 info!(
597 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
598 progress.authors_processed,
599 progress.authors_considered,
600 progress.events_selected,
601 progress.events_seen
602 );
603 }
604 })
605 .await
606 {
607 Ok(next_report) => {
608 report = Some(next_report);
609 break;
610 }
611 Err(err) => {
612 last_error = Some(err);
613 if attempt < 2 {
614 tokio::time::sleep(Duration::from_millis(500)).await;
615 }
616 }
617 }
618 }
619 report
620 .ok_or_else(|| last_error.expect("history sync retry captured error"))
621 .context("run mirror history sync")
622 }
623
624 async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
625 self.graph_store.write_public_events_root(root)?;
626 let Some(root) = root else {
627 return Ok(());
628 };
629
630 let event_store = NostrEventStore::new(self.store.store_arc());
631 let events = event_store
632 .list_recent_lossy(Some(root), ListEventsOptions::default())
633 .await
634 .context("list trusted mirrored events")?
635 .into_iter()
636 .map(socialgraph::stored_event_to_nostr_event)
637 .collect::<Result<Vec<_>>>()?;
638
639 self.graph_store
640 .rebuild_profile_index_for_events(&events)
641 .context("rebuild mirrored profile search index")?;
642 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
643 .context("sync mirrored social graph state")?;
644 self.note_profile_search_root_change()?;
645 if let Err(err) = self.maybe_publish_profile_search_root(false).await {
646 warn!(
647 "Nostr mirror profile-search publish failed after root update: {:#}",
648 err
649 );
650 }
651 Ok(())
652 }
653
654 async fn subscribe_authors_since(
655 &self,
656 authors: &[String],
657 since: Timestamp,
658 subscribed_authors: &mut HashSet<String>,
659 ) -> Result<()> {
660 let new_authors = authors
661 .iter()
662 .filter(|author| !subscribed_authors.contains(*author))
663 .cloned()
664 .collect::<Vec<_>>();
665 if new_authors.is_empty() {
666 return Ok(());
667 }
668
669 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
670 let pubkeys = chunk
671 .iter()
672 .filter_map(|author| PublicKey::from_hex(author).ok())
673 .collect::<Vec<_>>();
674 if pubkeys.is_empty() {
675 continue;
676 }
677
678 let filter = Filter::new()
679 .authors(pubkeys)
680 .kinds(self.config.kinds.iter().copied().map(Kind::from))
681 .since(since);
682
683 self.client
684 .subscribe(vec![filter], None)
685 .await
686 .context("subscribe mirror author batch")?;
687 }
688
689 subscribed_authors.extend(new_authors);
690 Ok(())
691 }
692
693 fn ingest_live_event(&self, event: &Event) -> Result<()> {
694 socialgraph::ingest_parsed_event(self.graph_store.as_ref(), event)
695 .context("ingest live mirrored event")?;
696 if event.kind == Kind::Metadata {
697 self.note_profile_search_root_change()?;
698 }
699 Ok(())
700 }
701
702 fn note_profile_search_root_change(&self) -> Result<()> {
703 let Some(_tree_name) = self.config.published_profile_search_tree_name.as_deref() else {
704 return Ok(());
705 };
706
707 let root = self.graph_store.profile_search_root()?;
708 let mut state = self
709 .profile_search_publish_state
710 .lock()
711 .expect("profile search publish state");
712 let now = Instant::now();
713
714 if state.pending_root == root {
715 return Ok(());
716 }
717
718 state.pending_root = root;
719 state.last_changed_at = Some(now);
720 if state.dirty_since.is_none() {
721 state.dirty_since = Some(now);
722 }
723 Ok(())
724 }
725
726 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
727 let Some(tree_name) = self.config.published_profile_search_tree_name.as_deref() else {
728 return Ok(());
729 };
730 let Some(publish_client) = self.publish_client.as_ref() else {
731 return Ok(());
732 };
733 if !self.has_connected_publish_relay().await {
734 return Ok(());
735 }
736
737 let pending_root = {
738 let state = self
739 .profile_search_publish_state
740 .lock()
741 .expect("profile search publish state");
742 let Some(pending_root) = state.pending_root.clone() else {
743 return Ok(());
744 };
745 if state.last_published_root.as_ref() == Some(&pending_root) {
746 return Ok(());
747 }
748
749 let now = Instant::now();
750 let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
751 now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
752 });
753 let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
754 now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
755 });
756 if !force && !debounce_ready && !stale_ready {
757 return Ok(());
758 }
759
760 pending_root
761 };
762
763 let event = Self::build_public_root_event(tree_name, &pending_root);
764 let output = publish_client
765 .send_event_builder(event)
766 .await
767 .context("publish profile search root event")?;
768 if output.failed.is_empty() && output.success.is_empty() {
769 return Ok(());
770 }
771
772 {
773 let mut state = self
774 .profile_search_publish_state
775 .lock()
776 .expect("profile search publish state");
777 if state.pending_root.as_ref() == Some(&pending_root) {
778 state.last_published_root = Some(pending_root.clone());
779 state.last_published_at = Some(Instant::now());
780 state.dirty_since = None;
781 }
782 }
783
784 info!(
785 "Nostr mirror published profile search root: tree={} hash={}",
786 tree_name,
787 hex::encode(pending_root.hash)
788 );
789 Ok(())
790 }
791
792 fn build_public_root_event(tree_name: &str, cid: &hashtree_core::Cid) -> EventBuilder {
793 let mut tags = vec![
794 Tag::identifier(tree_name.to_string()),
795 Tag::custom(
796 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
797 vec!["hashtree"],
798 ),
799 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
800 ];
801 if let Some(key) = cid.key {
802 tags.push(Tag::custom(
803 TagKind::Custom("key".into()),
804 vec![hex::encode(key)],
805 ));
806 }
807
808 EventBuilder::new(Kind::Custom(30078), "", tags)
809 }
810}
811
812#[cfg(test)]
813mod tests;