1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::time::Duration;
6use std::time::Instant;
7
8use anyhow::{Context, Result};
9use hashtree_nostr::{
10 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
11};
12use nostr::{
13 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
14 Timestamp,
15};
16use nostr_sdk::{
17 pool::RelayLimits, prelude::RelayPoolNotification, Client, ClientOptions, Keys, RelayStatus,
18};
19use tokio::sync::{watch, Mutex as AsyncMutex};
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_incremental_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
27#[cfg(not(test))]
28const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
29#[cfg(test)]
30const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);
31
32#[cfg(not(test))]
33const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
34#[cfg(test)]
35const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);
36
37#[cfg(not(test))]
38const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
39#[cfg(test)]
40const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);
41
42#[cfg(not(test))]
43const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
44#[cfg(test)]
45const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);
46
47const KIND_LONG_FORM_CONTENT: u16 = 30_023;
48const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
49const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
50const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
51const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
52const MIRROR_UPLOAD_STATE_DIR: &str = "nostr-mirror";
53const MIRROR_UPLOADED_ROOT_SUFFIX: &str = ".uploaded-root";
54const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
55const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
56const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
57const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
58const FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE: u32 = 1;
59const FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT: usize = 32;
60const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
61const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
62const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;
63
64#[cfg(not(test))]
65const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
66#[cfg(test)]
67const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);
68
69#[cfg(not(test))]
70const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
71#[cfg(test)]
72const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);
73
74#[cfg(not(test))]
75const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
76#[cfg(test)]
77const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);
78
79#[cfg(not(test))]
80const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
81#[cfg(test)]
82const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);
83
84#[cfg(not(test))]
85const MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT: Duration = Duration::from_secs(2);
86#[cfg(test)]
87const MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT: Duration = Duration::from_millis(250);
88
89#[cfg(not(test))]
90const MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT: Duration = Duration::from_secs(5);
91#[cfg(test)]
92const MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT: Duration = Duration::from_secs(2);
93
94const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
95
96fn trim_transient_allocations() {
97 #[cfg(all(target_os = "linux", target_env = "gnu"))]
98 unsafe {
99 libc::malloc_trim(0);
103 }
104}
105
106fn decode_hex_pubkey(value: &str) -> Option<[u8; 32]> {
107 let bytes = hex::decode(value).ok()?;
108 <[u8; 32]>::try_from(bytes.as_slice()).ok()
109}
110
111#[derive(Debug, Clone)]
112pub struct NostrMirrorConfig {
113 pub relays: Vec<String>,
114 pub publish_relays: Vec<String>,
115 pub blossom_write_servers: Vec<String>,
116 pub max_follow_distance: u32,
117 pub overmute_threshold: f64,
118 pub author_batch_size: usize,
119 pub history_sync_author_chunk_size: usize,
120 pub history_sync_per_author_event_limit: usize,
121 pub missing_profile_backfill_batch_size: usize,
122 pub fetch_timeout: Duration,
123 pub relay_event_max_size: Option<u32>,
124 pub require_negentropy: bool,
125 pub kinds: Vec<u16>,
126 pub history_sync_on_start: bool,
127 pub history_sync_on_reconnect: bool,
128 pub full_text_note_history_follow_distance: Option<u32>,
129 pub full_text_note_history_max_relay_pages: usize,
130 pub published_event_tree_name: Option<String>,
131 pub published_profile_search_tree_name: Option<String>,
132 pub published_profiles_by_pubkey_tree_name: Option<String>,
133}
134
135impl Default for NostrMirrorConfig {
136 fn default() -> Self {
137 Self {
138 relays: Vec::new(),
139 publish_relays: Vec::new(),
140 blossom_write_servers: Vec::new(),
141 max_follow_distance: 2,
142 overmute_threshold: 1.0,
143 author_batch_size: 256,
144 history_sync_author_chunk_size: 5_000,
145 history_sync_per_author_event_limit: 256,
146 missing_profile_backfill_batch_size: 5_000,
147 fetch_timeout: Duration::from_secs(15),
148 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
149 require_negentropy: false,
150 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
151 history_sync_on_start: true,
152 history_sync_on_reconnect: true,
153 full_text_note_history_follow_distance: Some(
154 DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
155 ),
156 full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
157 published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
158 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
159 published_profiles_by_pubkey_tree_name: Some(
160 DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
161 ),
162 }
163 }
164}
165
166#[derive(Debug, Default)]
167struct RootPublishState {
168 pending_root: Option<hashtree_core::Cid>,
169 last_changed_at: Option<Instant>,
170 dirty_since: Option<Instant>,
171 last_published_root: Option<hashtree_core::Cid>,
172 last_published_at: Option<Instant>,
173 last_published_created_at: Option<Timestamp>,
174 last_uploaded_root: Option<hashtree_core::Cid>,
175 last_uploaded_at: Option<Instant>,
176 upload_in_progress_root: Option<hashtree_core::Cid>,
177 last_upload_failed_at: Option<Instant>,
178 last_upload_error: Option<String>,
179 missing_blob_rebuild_required: bool,
180}
181
182#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183struct HistorySyncPlan {
184 relay_fetch_mode: RelayFetchMode,
185 author_batch_size: usize,
186 per_author_event_limit: usize,
187 relay_page_size: usize,
188 max_relay_pages: usize,
189}
190
191pub struct BackgroundNostrMirror {
192 config: NostrMirrorConfig,
193 store: Arc<HashtreeStore>,
194 graph_store: Arc<SocialGraphStore>,
195 client: Client,
196 publish_client: Option<Client>,
197 event_publish_state: Arc<Mutex<RootPublishState>>,
198 profile_search_publish_state: Arc<Mutex<RootPublishState>>,
199 profiles_by_pubkey_publish_state: Arc<Mutex<RootPublishState>>,
200 pending_live_events: Mutex<BTreeMap<String, Event>>,
201 missing_profile_cursor: Mutex<usize>,
202 history_sync_lock: AsyncMutex<()>,
203 shutdown_tx: watch::Sender<bool>,
204 shutdown_rx: watch::Receiver<bool>,
205}
206
207impl BackgroundNostrMirror {
208 pub async fn new(
209 config: NostrMirrorConfig,
210 store: Arc<HashtreeStore>,
211 graph_store: Arc<SocialGraphStore>,
212 publish_keys: Option<Keys>,
213 ) -> Result<Self> {
214 let client = if let Some(max_size) = config.relay_event_max_size {
215 let mut limits = RelayLimits::default();
216 limits.events.max_size = Some(max_size);
217 Client::builder()
218 .signer(Keys::generate())
219 .opts(ClientOptions::new().relay_limits(limits))
220 .build()
221 } else {
222 Client::new(Keys::generate())
223 };
224 for relay in &config.relays {
225 client
226 .add_relay(relay)
227 .await
228 .with_context(|| format!("add mirror relay {relay}"))?;
229 }
230 client.connect().await;
231
232 let publish_client = if let Some(keys) = publish_keys {
233 if config.publish_relays.is_empty() {
234 None
235 } else {
236 let client = Client::new(keys);
237 for relay in &config.publish_relays {
238 client
239 .add_relay(relay)
240 .await
241 .with_context(|| format!("add mirror publish relay {relay}"))?;
242 }
243 client.connect().await;
244 Some(client)
245 }
246 } else {
247 None
248 };
249
250 let (shutdown_tx, shutdown_rx) = watch::channel(false);
251 let event_publish_state = Self::root_publish_state_from_disk(
252 store.base_path(),
253 config.published_event_tree_name.as_deref(),
254 "event root",
255 );
256 let profile_search_publish_state = Self::root_publish_state_from_disk(
257 store.base_path(),
258 config.published_profile_search_tree_name.as_deref(),
259 "profile-search root",
260 );
261 let profiles_by_pubkey_publish_state = Self::root_publish_state_from_disk(
262 store.base_path(),
263 config.published_profiles_by_pubkey_tree_name.as_deref(),
264 "profiles-by-pubkey root",
265 );
266 Ok(Self {
267 config,
268 store,
269 graph_store,
270 client,
271 publish_client,
272 event_publish_state: Arc::new(Mutex::new(event_publish_state)),
273 profile_search_publish_state: Arc::new(Mutex::new(profile_search_publish_state)),
274 profiles_by_pubkey_publish_state: Arc::new(Mutex::new(
275 profiles_by_pubkey_publish_state,
276 )),
277 pending_live_events: Mutex::new(BTreeMap::new()),
278 missing_profile_cursor: Mutex::new(0),
279 history_sync_lock: AsyncMutex::new(()),
280 shutdown_tx,
281 shutdown_rx,
282 })
283 }
284
285 fn root_publish_state_from_disk(
286 base_path: &std::path::Path,
287 tree_name: Option<&str>,
288 log_label: &str,
289 ) -> RootPublishState {
290 let Some(tree_name) = tree_name else {
291 return RootPublishState::default();
292 };
293 let path = Self::uploaded_root_state_path(base_path, tree_name);
294 let Some(root) = Self::read_uploaded_root_state(&path, log_label) else {
295 return RootPublishState::default();
296 };
297
298 RootPublishState {
299 last_uploaded_root: Some(root),
300 ..RootPublishState::default()
301 }
302 }
303
304 fn uploaded_root_state_path(base_path: &std::path::Path, tree_name: &str) -> PathBuf {
305 let safe_tree_name = tree_name
306 .chars()
307 .map(|ch| {
308 if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
309 ch
310 } else {
311 '_'
312 }
313 })
314 .collect::<String>();
315 base_path
316 .join(MIRROR_UPLOAD_STATE_DIR)
317 .join(format!("{safe_tree_name}{MIRROR_UPLOADED_ROOT_SUFFIX}"))
318 }
319
320 fn read_uploaded_root_state(
321 path: &std::path::Path,
322 log_label: &str,
323 ) -> Option<hashtree_core::Cid> {
324 let raw = match std::fs::read_to_string(path) {
325 Ok(raw) => raw,
326 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return None,
327 Err(err) => {
328 warn!(
329 "Nostr mirror failed to read uploaded {} state {}: {}",
330 log_label,
331 path.display(),
332 err
333 );
334 return None;
335 }
336 };
337 let trimmed = raw.trim();
338 if trimmed.is_empty() {
339 return None;
340 }
341 match hashtree_core::Cid::parse(trimmed) {
342 Ok(root) => Some(root),
343 Err(err) => {
344 warn!(
345 "Nostr mirror ignored invalid uploaded {} state {}: {}",
346 log_label,
347 path.display(),
348 err
349 );
350 None
351 }
352 }
353 }
354
355 fn write_uploaded_root_state(
356 path: &std::path::Path,
357 root: &hashtree_core::Cid,
358 log_label: &str,
359 ) -> Result<()> {
360 if let Some(parent) = path.parent() {
361 std::fs::create_dir_all(parent)
362 .with_context(|| format!("create {}", parent.display()))?;
363 }
364 std::fs::write(path, format!("{root}\n"))
365 .with_context(|| format!("write uploaded {log_label} state {}", path.display()))
366 }
367
368 pub fn shutdown(&self) {
369 let _ = self.shutdown_tx.send(true);
370 }
371
372 fn sync_publish_roots_from_store(&self) -> Result<()> {
373 self.note_public_events_root_change()?;
374 self.note_profile_search_root_change()?;
375 self.note_profiles_by_pubkey_root_change()?;
376 Ok(())
377 }
378
379 async fn publish_pending_roots(
380 &self,
381 force_event: bool,
382 force_profile_search: bool,
383 force_profiles_by_pubkey: bool,
384 ) -> (Result<()>, Result<()>, Result<()>) {
385 tokio::join!(
386 self.maybe_publish_event_root(force_event),
387 self.maybe_publish_profile_search_root(force_profile_search),
388 self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
389 )
390 }
391
392 async fn publish_priority_roots(
393 &self,
394 force_event: bool,
395 force_profile_search: bool,
396 force_profiles_by_pubkey: bool,
397 ) -> (Result<()>, Result<()>, Result<()>) {
398 let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
399 async {
400 if force_profile_search {
401 self.maybe_publish_profile_search_root(true).await
402 } else {
403 Ok(())
404 }
405 },
406 async {
407 if force_profiles_by_pubkey {
408 self.maybe_publish_profiles_by_pubkey_root(true).await
409 } else {
410 Ok(())
411 }
412 },
413 );
414 let event_result = if force_event {
415 self.maybe_publish_event_root(true).await
416 } else {
417 Ok(())
418 };
419 (
420 event_result,
421 profile_search_result,
422 profiles_by_pubkey_result,
423 )
424 }
425
426 pub async fn run(self: Arc<Self>) -> Result<()> {
427 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
428 return Ok(());
429 }
430
431 info!(
432 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
433 self.config.relays.len(),
434 self.config.max_follow_distance,
435 self.config.require_negentropy,
436 self.config.kinds,
437 self.config.history_sync_author_chunk_size.max(1),
438 self.config.history_sync_on_start,
439 self.config.history_sync_on_reconnect
440 );
441
442 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
443 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
444 let live_since = Timestamp::now();
445 self.sync_publish_roots_from_store()?;
446 let (event_result, profile_search_result, profiles_by_pubkey_result) =
447 self.publish_priority_roots(true, true, true).await;
448 if let Err(err) = event_result {
449 warn!(
450 "Nostr mirror event-root publish failed on startup: {:#}",
451 err
452 );
453 }
454 if let Err(err) = profile_search_result {
455 warn!(
456 "Nostr mirror profile-search publish failed on startup: {:#}",
457 err
458 );
459 }
460 if let Err(err) = profiles_by_pubkey_result {
461 warn!(
462 "Nostr mirror profiles-by-pubkey publish failed on startup: {:#}",
463 err
464 );
465 }
466
467 let initial_authors = self.collect_authors()?;
468 if initial_authors.is_empty() {
469 info!("Nostr mirror: no social-graph authors to mirror yet");
470 }
471
472 let mut subscribed_authors = HashSet::new();
473 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
474 .await?;
475
476 if !initial_authors.is_empty() && self.config.history_sync_on_start {
477 self.spawn_startup_history_sync(initial_authors.clone());
478 }
479
480 let mut relay_statuses = self.capture_relay_statuses().await;
481 let mut last_reconnect_history_sync_at: Option<Instant> = None;
482 let mut last_missing_profile_backfill_at: Option<Instant> = None;
483 let mut notifications = self.client.notifications();
484 let mut shutdown_rx = self.shutdown_rx.clone();
485 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
486 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
487
488 loop {
489 tokio::select! {
490 _ = shutdown_rx.changed() => {
491 if *shutdown_rx.borrow() {
492 break;
493 }
494 }
495 _ = refresh_interval.tick() => {
496 let authors = self.collect_authors()?;
497 let mut reconnected_relay = None;
498 for (relay_url, status) in self.capture_relay_statuses().await {
499 let previous = relay_statuses.insert(relay_url.clone(), status);
500 if reconnected_relay.is_none()
501 && Self::should_history_sync_on_reconnect(
502 self.config.history_sync_on_reconnect,
503 previous,
504 status,
505 )
506 {
507 reconnected_relay = Some(relay_url);
508 }
509 }
510 if let Some(relay_url) = reconnected_relay {
511 if Self::should_run_reconnect_history_sync(
512 last_reconnect_history_sync_at.as_ref(),
513 ) && !authors.is_empty()
514 {
515 info!(
516 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
517 relay_url,
518 authors.len(),
519 self.config.require_negentropy
520 );
521 self.spawn_author_history_sync(
522 "relay reconnect catch-up",
523 authors.clone(),
524 false,
525 false,
526 );
527 last_reconnect_history_sync_at = Some(Instant::now());
528 }
529 }
530 let new_authors = authors
531 .iter()
532 .cloned()
533 .filter(|author| !subscribed_authors.contains(author))
534 .collect::<Vec<_>>();
535 if !new_authors.is_empty() {
536 debug!(
537 "Nostr mirror discovered {} newly reachable author(s)",
538 new_authors.len()
539 );
540 self.subscribe_authors_since(
541 &new_authors,
542 Timestamp::now(),
543 &mut subscribed_authors,
544 )
545 .await?;
546 self.spawn_author_history_sync(
547 "new-author catch-up",
548 new_authors.clone(),
549 true,
550 true,
551 );
552 }
553 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
554 let missing_profile_authors = self.collect_missing_profile_authors(
555 self.config.missing_profile_backfill_batch_size,
556 )?;
557 if !missing_profile_authors.is_empty() {
558 info!(
559 "Nostr mirror missing-profile backfill starting: authors={}",
560 missing_profile_authors.len()
561 );
562 self.spawn_missing_profile_backfill(missing_profile_authors);
563 last_missing_profile_backfill_at = Some(Instant::now());
564 }
565 }
566 }
567 _ = publish_interval.tick() => {
568 self.sync_publish_roots_from_store()?;
569 if let Err(err) = self.flush_live_events().await {
570 warn!("Nostr mirror live event flush failed: {:#}", err);
571 }
572 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
573 .publish_pending_roots(false, false, false)
574 .await;
575 if let Err(err) = event_result {
576 warn!("Nostr mirror event-root publish failed: {:#}", err);
577 }
578 if let Err(err) = profile_search_result {
579 warn!("Nostr mirror profile-search publish failed: {:#}", err);
580 }
581 if let Err(err) = profiles_by_pubkey_result {
582 warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
583 }
584 }
585 notification = notifications.recv() => {
586 match notification {
587 Ok(RelayPoolNotification::Event { event, .. }) => {
588 self.ingest_live_event(&event)?;
589 }
590 Ok(RelayPoolNotification::Shutdown) => break,
591 Ok(_) => {}
592 Err(err) => {
593 warn!("Nostr mirror notification error: {}", err);
594 break;
595 }
596 }
597 }
598 }
599 }
600
601 if let Err(err) = self.flush_live_events().await {
602 warn!(
603 "Nostr mirror live event flush failed during shutdown: {:#}",
604 err
605 );
606 }
607 if let Err(err) = self.sync_publish_roots_from_store() {
608 warn!(
609 "Nostr mirror root-state refresh failed during shutdown: {:#}",
610 err
611 );
612 }
613 let (event_result, profile_search_result, profiles_by_pubkey_result) =
614 self.publish_pending_roots(true, true, true).await;
615 if let Err(err) = event_result {
616 warn!(
617 "Nostr mirror event-root publish failed during shutdown: {:#}",
618 err
619 );
620 }
621 if let Err(err) = profile_search_result {
622 warn!(
623 "Nostr mirror profile-search publish failed during shutdown: {:#}",
624 err
625 );
626 }
627 if let Err(err) = profiles_by_pubkey_result {
628 warn!(
629 "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
630 err
631 );
632 }
633 let _ = self.client.disconnect().await;
634 if let Some(client) = self.publish_client.as_ref() {
635 let _ = client.disconnect().await;
636 }
637 Ok(())
638 }
639
640 fn spawn_startup_history_sync(self: &Arc<Self>, initial_authors: Vec<String>) {
641 let mirror = Arc::clone(self);
642 tokio::task::spawn_blocking(move || {
643 let runtime = tokio::runtime::Builder::new_current_thread()
644 .enable_all()
645 .build()
646 .expect("build nostr mirror startup history sync runtime");
647 runtime.block_on(async move {
648 let _guard = mirror.history_sync_lock.lock().await;
649 if let Err(err) = mirror.run_startup_history_sync(initial_authors).await {
650 warn!("Nostr mirror startup history sync failed: {:#}", err);
651 }
652 });
653 });
654 }
655
656 async fn run_startup_history_sync(&self, initial_authors: Vec<String>) -> Result<()> {
657 self.history_sync_full_text_notes_for_reachable_authors()
658 .await?;
659 self.history_sync_authors(initial_authors).await?;
660 if self.should_backfill_missing_profiles(None) {
661 let missing_profile_authors = self
662 .collect_missing_profile_authors(self.config.missing_profile_backfill_batch_size)?;
663 if !missing_profile_authors.is_empty() {
664 info!(
665 "Nostr mirror missing-profile backfill starting: authors={}",
666 missing_profile_authors.len()
667 );
668 self.history_sync_authors_with_kinds(
669 missing_profile_authors,
670 &[Kind::Metadata.as_u16()],
671 )
672 .await?;
673 }
674 }
675 Ok(())
676 }
677
678 fn spawn_author_history_sync(
679 self: &Arc<Self>,
680 label: &'static str,
681 authors: Vec<String>,
682 include_full_text_notes: bool,
683 wait_for_existing_sync: bool,
684 ) {
685 let mirror = Arc::clone(self);
686 tokio::task::spawn_blocking(move || {
687 let runtime = tokio::runtime::Builder::new_current_thread()
688 .enable_all()
689 .build()
690 .expect("build nostr mirror author history sync runtime");
691 runtime.block_on(async move {
692 if wait_for_existing_sync {
693 let _guard = mirror.history_sync_lock.lock().await;
694 if let Err(err) = mirror
695 .run_author_history_sync(authors, include_full_text_notes)
696 .await
697 {
698 warn!("Nostr mirror {label} failed: {:#}", err);
699 }
700 return;
701 }
702
703 let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
704 info!("Nostr mirror {label} skipped; another history sync is running");
705 return;
706 };
707 if let Err(err) = mirror
708 .run_author_history_sync(authors, include_full_text_notes)
709 .await
710 {
711 warn!("Nostr mirror {label} failed: {:#}", err);
712 }
713 });
714 });
715 }
716
717 async fn run_author_history_sync(
718 &self,
719 authors: Vec<String>,
720 include_full_text_notes: bool,
721 ) -> Result<()> {
722 if include_full_text_notes {
723 self.history_sync_full_text_notes_for_authors(authors.clone())
724 .await?;
725 }
726 self.history_sync_authors(authors).await
727 }
728
729 fn spawn_missing_profile_backfill(self: &Arc<Self>, authors: Vec<String>) {
730 let mirror = Arc::clone(self);
731 tokio::task::spawn_blocking(move || {
732 let runtime = tokio::runtime::Builder::new_current_thread()
733 .enable_all()
734 .build()
735 .expect("build nostr mirror missing profile runtime");
736 runtime.block_on(async move {
737 let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
738 info!(
739 "Nostr mirror missing-profile backfill skipped; another history sync is running"
740 );
741 return;
742 };
743 if let Err(err) = mirror
744 .history_sync_authors_with_kinds(authors, &[Kind::Metadata.as_u16()])
745 .await
746 {
747 warn!("Nostr mirror missing-profile backfill failed: {:#}", err);
748 }
749 });
750 });
751 }
752
753 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
754 let mut statuses = HashMap::new();
755 for (relay_url, relay) in self.client.relays().await {
756 statuses.insert(relay_url.to_string(), relay.status());
757 }
758 statuses
759 }
760
761 async fn has_connected_publish_relay(&self) -> bool {
762 let Some(client) = self.publish_client.as_ref() else {
763 return false;
764 };
765 Self::client_has_connected_relay(client).await
766 }
767
768 async fn client_has_connected_relay(client: &Client) -> bool {
769 for (_relay_url, relay) in client.relays().await {
770 if relay.status() == RelayStatus::Connected {
771 return true;
772 }
773 }
774 false
775 }
776
777 fn collect_authors(&self) -> Result<Vec<String>> {
778 self.collect_authors_with_max_distance(self.config.max_follow_distance)
779 }
780
781 fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
782 let mut authors = Vec::new();
783 let mut seen = HashSet::new();
784 for distance in 0..=max_distance {
785 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
786 self.graph_store.as_ref(),
787 distance,
788 )
789 .with_context(|| format!("load social-graph distance {distance}"))?
790 {
791 if self
792 .graph_store
793 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
794 {
795 continue;
796 }
797 let hex = hex::encode(pubkey);
798 if seen.insert(hex.clone()) {
799 authors.push(hex);
800 }
801 }
802 }
803 Ok(authors)
804 }
805
806 async fn prioritize_full_text_note_history_authors(
807 &self,
808 authors: Vec<String>,
809 ) -> Result<Vec<String>> {
810 let Some(root) = self.graph_store.public_events_root()? else {
811 return Ok(authors);
812 };
813
814 let event_store = NostrEventStore::new(self.store.store_arc());
815 let mut prioritized = Vec::with_capacity(authors.len());
816 let mut sampled = 0usize;
817 for (index, author) in authors.into_iter().enumerate() {
818 let distance = match decode_hex_pubkey(&author) {
819 Some(pubkey) => self
820 .graph_store
821 .follow_distance(&pubkey)?
822 .unwrap_or(u32::MAX),
823 None => u32::MAX,
824 };
825 let indexed_text_sample = if distance <= FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE {
826 sampled = sampled.saturating_add(1);
827 event_store
828 .list_by_author_and_kind(
829 Some(&root),
830 &author,
831 Kind::TextNote.as_u16() as u32,
832 ListEventsOptions {
833 limit: Some(FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT),
834 ..ListEventsOptions::default()
835 },
836 )
837 .await
838 .with_context(|| {
839 format!("sample indexed text-note history for author {author}")
840 })?
841 .len()
842 } else {
843 FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT
844 };
845 prioritized.push((distance, indexed_text_sample, index, author));
846 }
847
848 prioritized.sort_by(|left, right| {
849 left.0
850 .cmp(&right.0)
851 .then_with(|| left.1.cmp(&right.1))
852 .then_with(|| left.2.cmp(&right.2))
853 });
854 info!(
855 "Nostr mirror full text content history prioritized authors: authors={} sampled_distance_le_{}={}",
856 prioritized.len(),
857 FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE,
858 sampled
859 );
860 Ok(prioritized
861 .into_iter()
862 .map(|(_, _, _, author)| author)
863 .collect())
864 }
865
866 fn full_text_note_history_follow_distance(&self) -> Option<u32> {
867 let distance = self.config.full_text_note_history_follow_distance?;
868 if self
869 .config
870 .kinds
871 .iter()
872 .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
873 {
874 Some(distance.min(self.config.max_follow_distance))
875 } else {
876 None
877 }
878 }
879
880 fn full_text_note_history_max_relay_pages(&self) -> Option<usize> {
881 Self::full_text_note_history_max_relay_pages_for_config(&self.config)
882 }
883
884 fn full_text_note_history_max_relay_pages_for_config(
885 config: &NostrMirrorConfig,
886 ) -> Option<usize> {
887 let pages = config.full_text_note_history_max_relay_pages;
888 if pages == 0 {
889 None
890 } else {
891 Some(pages)
892 }
893 }
894
895 fn is_text_content_history_kind(kind: u16) -> bool {
896 kind == Kind::TextNote.as_u16() || kind == KIND_LONG_FORM_CONTENT
897 }
898
899 fn history_sync_kinds_for_config(config: &NostrMirrorConfig) -> Vec<u16> {
900 let mut kinds = config.kinds.clone();
901 kinds.retain(|kind| !Self::is_text_content_history_kind(*kind));
902 kinds
903 }
904
905 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
906 if limit == 0 {
907 return Ok(Vec::new());
908 }
909
910 let authors = self.collect_authors()?;
911 if authors.is_empty() {
912 return Ok(Vec::new());
913 }
914
915 let mut cursor = self
916 .missing_profile_cursor
917 .lock()
918 .expect("missing profile cursor");
919 let mut index = (*cursor).min(authors.len());
920 let mut scanned = 0usize;
921 let mut missing = Vec::new();
922
923 while scanned < authors.len() && missing.len() < limit {
924 let author = &authors[index];
925 if self.graph_store.latest_profile_event(author)?.is_none() {
926 missing.push(author.clone());
927 }
928 index += 1;
929 if index == authors.len() {
930 index = 0;
931 }
932 scanned += 1;
933 }
934
935 *cursor = index;
936 Ok(missing)
937 }
938
939 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
940 if self.config.missing_profile_backfill_batch_size == 0
941 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
942 {
943 return false;
944 }
945 match last_run {
946 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
947 None => true,
948 }
949 }
950
951 fn should_history_sync_on_reconnect(
952 history_sync_on_reconnect: bool,
953 previous: Option<RelayStatus>,
954 status: RelayStatus,
955 ) -> bool {
956 history_sync_on_reconnect
957 && status == RelayStatus::Connected
958 && matches!(
959 previous,
960 Some(
961 RelayStatus::Initialized
962 | RelayStatus::Pending
963 | RelayStatus::Connecting
964 | RelayStatus::Disconnected
965 | RelayStatus::Terminated
966 )
967 )
968 }
969
970 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
971 match last_run {
972 None => true,
973 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
974 }
975 }
976
977 fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
978 !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
979 }
980
981 fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
982 kinds.is_empty()
983 || kinds.iter().any(|kind| {
984 *kind == Kind::Metadata.as_u16()
985 || *kind == Kind::ContactList.as_u16()
986 || *kind == Kind::MuteList.as_u16()
987 })
988 }
989
990 fn history_sync_plan_for(
991 config: &NostrMirrorConfig,
992 authors: usize,
993 kinds: &[u16],
994 ) -> HistorySyncPlan {
995 let author_batch_size = config.author_batch_size.max(1);
996 let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
997 let relay_page_size = 1_000;
998 let max_relay_pages = 10;
999
1000 if Self::is_metadata_only_history_sync(kinds) {
1001 return HistorySyncPlan {
1002 relay_fetch_mode: RelayFetchMode::AuthorBatches,
1003 author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
1004 per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
1005 relay_page_size,
1006 max_relay_pages,
1007 };
1008 }
1009
1010 if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
1011 return HistorySyncPlan {
1012 relay_fetch_mode: RelayFetchMode::GlobalRecent,
1013 author_batch_size,
1014 per_author_event_limit: per_author_event_limit
1015 .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
1016 .max(1),
1017 relay_page_size,
1018 max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
1019 };
1020 }
1021
1022 HistorySyncPlan {
1023 relay_fetch_mode: RelayFetchMode::AuthorBatches,
1024 author_batch_size,
1025 per_author_event_limit,
1026 relay_page_size,
1027 max_relay_pages,
1028 }
1029 }
1030
1031 fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
1032 Self::history_sync_plan_for(&self.config, authors, kinds)
1033 }
1034
1035 fn history_sync_chunk_size_for_config(
1036 config: &NostrMirrorConfig,
1037 authors: usize,
1038 kinds: &[u16],
1039 full_author_history: bool,
1040 chunk_size_override: Option<usize>,
1041 ) -> usize {
1042 let configured_chunk_size = chunk_size_override
1043 .unwrap_or(config.history_sync_author_chunk_size)
1044 .max(1);
1045 if full_author_history {
1046 return 1;
1047 }
1048 if !full_author_history
1049 && Self::history_sync_plan_for(config, authors, kinds).relay_fetch_mode
1050 == RelayFetchMode::GlobalRecent
1051 {
1052 return authors.max(1);
1053 }
1054 configured_chunk_size
1055 }
1056
1057 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
1058 let kinds = Self::history_sync_kinds_for_config(&self.config);
1059 if kinds.is_empty() {
1060 info!("Nostr mirror history sync skipped: no enabled history kinds");
1061 return Ok(());
1062 }
1063 self.history_sync_authors_with_kinds(authors, &kinds).await
1064 }
1065
1066 async fn history_sync_authors_with_kinds(
1067 &self,
1068 authors: Vec<String>,
1069 kinds: &[u16],
1070 ) -> Result<()> {
1071 self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
1072 .await
1073 }
1074
1075 async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
1076 let Some(distance) = self.full_text_note_history_follow_distance() else {
1077 return Ok(());
1078 };
1079 if self.full_text_note_history_max_relay_pages().is_none() {
1080 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1081 return Ok(());
1082 }
1083 info!(
1084 "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
1085 );
1086 let authors = self
1087 .prioritize_full_text_note_history_authors(
1088 self.collect_authors_with_max_distance(distance)?,
1089 )
1090 .await?;
1091 let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
1092 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1093 return Ok(());
1094 };
1095 self.history_sync_distance_filtered_full_text_notes(authors, distance, max_relay_pages)
1096 .await
1097 }
1098
1099 async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
1100 let Some(distance) = self.full_text_note_history_follow_distance() else {
1101 return Ok(());
1102 };
1103 let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
1104 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1105 return Ok(());
1106 };
1107 let mut close_authors = Vec::new();
1108 for author in authors {
1109 let Some(pubkey) = decode_hex_pubkey(&author) else {
1110 continue;
1111 };
1112 if self
1113 .graph_store
1114 .follow_distance(&pubkey)?
1115 .is_some_and(|actual_distance| actual_distance <= distance)
1116 {
1117 close_authors.push(author);
1118 }
1119 }
1120 if close_authors.is_empty() {
1121 return Ok(());
1122 }
1123 let close_authors = self
1124 .prioritize_full_text_note_history_authors(close_authors)
1125 .await?;
1126
1127 self.history_sync_distance_filtered_full_text_notes(
1128 close_authors,
1129 distance,
1130 max_relay_pages,
1131 )
1132 .await
1133 }
1134
1135 async fn history_sync_distance_filtered_full_text_notes(
1136 &self,
1137 close_authors: Vec<String>,
1138 distance: u32,
1139 max_relay_pages: usize,
1140 ) -> Result<()> {
1141 if close_authors.is_empty() {
1142 return Ok(());
1143 }
1144
1145 info!(
1146 "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
1147 close_authors.len(),
1148 distance,
1149 max_relay_pages
1150 );
1151 let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
1152 self.history_sync_authors_with_kinds_and_mode(
1153 close_authors,
1154 &kinds,
1155 true,
1156 Some(max_relay_pages),
1157 )
1158 .await
1159 }
1160
1161 async fn history_sync_authors_with_kinds_and_mode(
1162 &self,
1163 authors: Vec<String>,
1164 kinds: &[u16],
1165 full_author_history: bool,
1166 max_relay_pages: Option<usize>,
1167 ) -> Result<()> {
1168 let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
1169 let chunk_size = Self::history_sync_chunk_size_for_config(
1170 &self.config,
1171 authors.len(),
1172 kinds,
1173 full_author_history,
1174 None,
1175 );
1176 self.history_sync_authors_chunked(
1177 authors,
1178 |current_root, author_chunk| async move {
1179 self.history_sync_author_chunk(
1180 current_root,
1181 author_chunk,
1182 kinds,
1183 full_author_history,
1184 max_relay_pages,
1185 )
1186 .await
1187 },
1188 update_profile_and_graph,
1189 Some(chunk_size),
1190 )
1191 .await
1192 }
1193
1194 async fn history_sync_authors_chunked<F, Fut>(
1195 &self,
1196 authors: Vec<String>,
1197 mut run_chunk: F,
1198 update_profile_and_graph: bool,
1199 chunk_size_override: Option<usize>,
1200 ) -> Result<()>
1201 where
1202 F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
1203 Fut: std::future::Future<Output = Result<CrawlReport>>,
1204 {
1205 if authors.is_empty() {
1206 return Ok(());
1207 }
1208
1209 info!(
1210 "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
1211 authors.len(),
1212 self.config.relays.len(),
1213 self.config.require_negentropy
1214 );
1215
1216 let mut current_root = self.graph_store.public_events_root_for_write()?;
1217 let mut last_error = None;
1218 let mut applied_chunks = 0usize;
1219 let mut failed_chunks = 0usize;
1220 let chunk_size = chunk_size_override
1221 .unwrap_or(self.config.history_sync_author_chunk_size)
1222 .max(1);
1223 let total_chunks = authors.len().div_ceil(chunk_size);
1224
1225 for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
1226 let author_chunk = author_chunk.to_vec();
1227 let author_count = author_chunk.len();
1228 info!(
1229 "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
1230 chunk_index + 1,
1231 total_chunks,
1232 author_count
1233 );
1234 let mut report = match run_chunk(current_root.clone(), author_chunk.clone()).await {
1235 Ok(report) => report,
1236 Err(err) => {
1237 failed_chunks = failed_chunks.saturating_add(1);
1238 warn!(
1239 "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
1240 chunk_index + 1,
1241 total_chunks,
1242 author_count,
1243 err
1244 );
1245 last_error = Some(err);
1246 trim_transient_allocations();
1247 continue;
1248 }
1249 };
1250
1251 let latest_root = self.graph_store.public_events_root_for_write()?;
1252 if latest_root != current_root {
1253 info!(
1254 "Nostr mirror history sync root advanced while chunk was fetching; merging chunk into latest root: chunk={}/{} authors={} events_applied={}",
1255 chunk_index + 1,
1256 total_chunks,
1257 author_count,
1258 report.applied_events.len()
1259 );
1260 if report.applied_events.is_empty() {
1261 report.root = latest_root.clone();
1262 } else {
1263 let event_store = NostrEventStore::new(self.store.store_arc());
1264 report.root = event_store
1265 .build(latest_root.as_ref(), report.applied_events.clone())
1266 .await
1267 .context("merge history chunk into latest mirrored event root")?;
1268 }
1269 current_root = latest_root;
1270 }
1271
1272 if report.root != current_root {
1273 self.apply_history_root_with_options(
1274 report.root.as_ref(),
1275 update_profile_and_graph,
1276 true,
1277 Some(&report.applied_events),
1278 )
1279 .await?;
1280 current_root = report.root.clone();
1281 info!(
1282 "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
1283 chunk_index + 1,
1284 total_chunks,
1285 report.authors_processed,
1286 report.events_selected,
1287 report.events_seen
1288 );
1289 }
1290 applied_chunks = applied_chunks.saturating_add(1);
1291 trim_transient_allocations();
1292 }
1293
1294 if applied_chunks == 0 {
1295 return Err(last_error
1296 .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
1297 .context("run mirror history sync"));
1298 }
1299 if failed_chunks > 0 {
1300 warn!(
1301 "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
1302 applied_chunks, failed_chunks
1303 );
1304 }
1305 Ok(())
1306 }
1307
1308 async fn history_sync_author_chunk(
1309 &self,
1310 current_root: Option<hashtree_core::Cid>,
1311 authors: Vec<String>,
1312 kinds: &[u16],
1313 full_author_history: bool,
1314 max_relay_pages: Option<usize>,
1315 ) -> Result<CrawlReport> {
1316 let mut last_error = None;
1317 let mut report = None;
1318 let mut plan = self.history_sync_plan(authors.len(), kinds);
1319 if full_author_history {
1320 plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
1321 plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
1322 }
1323 for attempt in 0..3 {
1324 let mut last_logged_authors = 0usize;
1325 let bridge = NostrBridge::new(
1326 self.store.store_arc(),
1327 CrawlConfig {
1328 relays: self.config.relays.clone(),
1329 author_allowlist: Some(authors.clone()),
1330 max_live_bytes: None,
1331 max_events_seen: None,
1332 max_authors: None,
1333 max_follow_distance: None,
1334 author_batch_size: plan.author_batch_size,
1335 per_author_event_limit: plan.per_author_event_limit,
1336 per_author_live_bytes: None,
1337 fetch_timeout: self.config.fetch_timeout,
1338 kinds: Some(kinds.to_vec()),
1339 relay_fetch_mode: plan.relay_fetch_mode,
1340 require_negentropy: self.config.require_negentropy,
1341 relay_event_max_size: self.config.relay_event_max_size,
1342 relay_page_size: plan.relay_page_size,
1343 max_relay_pages: plan.max_relay_pages,
1344 full_author_history,
1345 },
1346 );
1347
1348 let crawl = bridge.crawl_with_progress(
1349 self.graph_store.as_ref(),
1350 current_root.as_ref(),
1351 |progress| {
1352 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
1353 let should_log = progress.authors_processed == progress.authors_considered
1354 || progress.authors_processed == 0
1355 || progress
1356 .authors_processed
1357 .saturating_sub(last_logged_authors)
1358 >= log_interval;
1359 if should_log {
1360 last_logged_authors = progress.authors_processed;
1361 info!(
1362 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
1363 progress.authors_processed,
1364 progress.authors_considered,
1365 progress.events_selected,
1366 progress.events_seen
1367 );
1368 }
1369 },
1370 );
1371 let crawl_result: Result<CrawlReport> = if full_author_history {
1372 let timeout = self.config.fetch_timeout.saturating_mul(4);
1373 match tokio::time::timeout(timeout, crawl).await {
1374 Ok(result) => result.map_err(Into::into),
1375 Err(_) => Err(anyhow::anyhow!(
1376 "full author history crawl timed out after {:?} for {} author(s)",
1377 timeout,
1378 authors.len()
1379 )),
1380 }
1381 } else {
1382 crawl.await.map_err(Into::into)
1383 };
1384
1385 match crawl_result {
1386 Ok(next_report) => {
1387 report = Some(next_report);
1388 break;
1389 }
1390 Err(err) => {
1391 last_error = Some(err);
1392 if full_author_history {
1393 break;
1394 }
1395 if attempt < 2 {
1396 tokio::time::sleep(Duration::from_millis(500)).await;
1397 }
1398 }
1399 }
1400 }
1401 report
1402 .ok_or_else(|| last_error.expect("history sync retry captured error"))
1403 .context("run mirror history sync")
1404 }
1405
1406 #[cfg(test)]
1407 async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
1408 self.apply_history_root_with_options(root, true, true, None)
1409 .await
1410 }
1411
1412 async fn apply_history_root_with_options(
1413 &self,
1414 root: Option<&hashtree_core::Cid>,
1415 update_profile_and_graph: bool,
1416 publish_roots: bool,
1417 applied_events: Option<&[hashtree_nostr::StoredNostrEvent]>,
1418 ) -> Result<()> {
1419 self.graph_store.write_public_events_root(root)?;
1420 let Some(root) = root else {
1421 return Ok(());
1422 };
1423
1424 self.note_public_events_root_change()?;
1425 if update_profile_and_graph {
1426 let events = match applied_events {
1427 Some(events) => events
1428 .iter()
1429 .cloned()
1430 .map(socialgraph::stored_event_to_nostr_event)
1431 .collect::<Result<Vec<_>>>()?,
1432 None => {
1433 let event_store = NostrEventStore::new(self.store.store_arc());
1434 event_store
1435 .list_recent_lossy(Some(root), ListEventsOptions::default())
1436 .await
1437 .context("list trusted mirrored events")?
1438 .into_iter()
1439 .map(socialgraph::stored_event_to_nostr_event)
1440 .collect::<Result<Vec<_>>>()?
1441 }
1442 };
1443
1444 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
1445 .context("sync mirrored social graph state")?;
1446 if applied_events.is_some() {
1447 self.graph_store
1448 .sync_profile_index_for_events(&events)
1449 .context("update mirrored profile search index")?;
1450 } else {
1451 self.graph_store
1452 .rebuild_profile_index_for_events(&events)
1453 .context("rebuild mirrored profile search index")?;
1454 }
1455 self.note_profile_search_root_change()?;
1456 self.note_profiles_by_pubkey_root_change()?;
1457 }
1458 if !publish_roots {
1459 return Ok(());
1460 }
1461 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
1462 .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
1463 .await;
1464 if let Err(err) = event_result {
1465 warn!(
1466 "Nostr mirror event-root publish failed after root update: {:#}",
1467 err
1468 );
1469 }
1470 if let Err(err) = profile_search_result {
1471 warn!(
1472 "Nostr mirror profile-search publish failed after root update: {:#}",
1473 err
1474 );
1475 }
1476 if let Err(err) = profiles_by_pubkey_result {
1477 warn!(
1478 "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
1479 err
1480 );
1481 }
1482 Ok(())
1483 }
1484
1485 async fn subscribe_authors_since(
1486 &self,
1487 authors: &[String],
1488 since: Timestamp,
1489 subscribed_authors: &mut HashSet<String>,
1490 ) -> Result<()> {
1491 let new_authors = authors
1492 .iter()
1493 .filter(|author| !subscribed_authors.contains(*author))
1494 .cloned()
1495 .collect::<Vec<_>>();
1496 if new_authors.is_empty() {
1497 return Ok(());
1498 }
1499
1500 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1501 let pubkeys = chunk
1502 .iter()
1503 .filter_map(|author| PublicKey::from_hex(author).ok())
1504 .collect::<Vec<_>>();
1505 if pubkeys.is_empty() {
1506 continue;
1507 }
1508
1509 let filter = Filter::new()
1510 .authors(pubkeys)
1511 .kinds(self.config.kinds.iter().copied().map(Kind::from))
1512 .since(since);
1513
1514 if let Err(err) = self.client.subscribe(filter, None).await {
1515 warn!(
1516 "Nostr mirror author subscription failed: authors={} error={:#}",
1517 chunk.len(),
1518 err
1519 );
1520 continue;
1521 }
1522 subscribed_authors.extend(chunk.iter().cloned());
1523 }
1524 Ok(())
1525 }
1526
1527 fn ingest_live_event(&self, event: &Event) -> Result<()> {
1528 self.pending_live_events
1529 .lock()
1530 .expect("pending live events")
1531 .insert(event.id.to_hex(), event.clone());
1532 Ok(())
1533 }
1534
1535 async fn flush_live_events(&self) -> Result<()> {
1536 let pending = {
1537 let mut pending = self
1538 .pending_live_events
1539 .lock()
1540 .expect("pending live events");
1541 if pending.is_empty() {
1542 return Ok(());
1543 }
1544 std::mem::take(&mut *pending)
1545 };
1546 let events = pending.into_values().collect::<Vec<_>>();
1547 let event_count = events.len();
1548 let previous_event_root = self.graph_store.public_events_root()?;
1549 let previous_profile_search_root = self.graph_store.profile_search_root()?;
1550 let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1551
1552 socialgraph::ingest_parsed_events_with_storage_class(
1553 self.graph_store.as_ref(),
1554 &events,
1555 socialgraph::EventStorageClass::Public,
1556 )
1557 .context("ingest live mirrored event batch")?;
1558
1559 let next_event_root = self.graph_store.public_events_root()?;
1560 let next_profile_search_root = self.graph_store.profile_search_root()?;
1561 let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1562 let event_root_changed = next_event_root != previous_event_root;
1563 let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1564 let profiles_by_pubkey_root_changed =
1565 next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1566
1567 if event_root_changed {
1568 self.note_public_events_root_change()?;
1569 }
1570 if profile_search_root_changed {
1571 self.note_profile_search_root_change()?;
1572 }
1573 if profiles_by_pubkey_root_changed {
1574 self.note_profiles_by_pubkey_root_change()?;
1575 }
1576 if profile_search_root_changed {
1577 self.maybe_publish_profile_search_root(false).await?;
1578 }
1579 if profiles_by_pubkey_root_changed {
1580 self.maybe_publish_profiles_by_pubkey_root(false).await?;
1581 }
1582 if event_root_changed {
1583 self.maybe_publish_event_root(false).await?;
1584 }
1585 info!(
1586 "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1587 event_count,
1588 event_root_changed,
1589 profile_search_root_changed,
1590 profiles_by_pubkey_root_changed
1591 );
1592 Ok(())
1593 }
1594
1595 fn note_public_events_root_change(&self) -> Result<()> {
1596 let root = self.graph_store.public_events_root()?;
1597 Self::note_root_change(
1598 self.config.published_event_tree_name.as_deref(),
1599 &self.event_publish_state,
1600 root,
1601 )
1602 }
1603
1604 fn note_profile_search_root_change(&self) -> Result<()> {
1605 let root = self.graph_store.profile_search_root()?;
1606 Self::note_root_change(
1607 self.config.published_profile_search_tree_name.as_deref(),
1608 &self.profile_search_publish_state,
1609 root,
1610 )
1611 }
1612
1613 fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1614 let root = self.graph_store.profiles_by_pubkey_root()?;
1615 Self::note_root_change(
1616 self.config
1617 .published_profiles_by_pubkey_tree_name
1618 .as_deref(),
1619 &self.profiles_by_pubkey_publish_state,
1620 root,
1621 )
1622 }
1623
1624 fn note_root_change(
1625 tree_name: Option<&str>,
1626 publish_state: &Arc<Mutex<RootPublishState>>,
1627 root: Option<hashtree_core::Cid>,
1628 ) -> Result<()> {
1629 let Some(_tree_name) = tree_name else {
1630 return Ok(());
1631 };
1632
1633 let mut state = publish_state.lock().expect("root publish state");
1634 let now = Instant::now();
1635
1636 if state.pending_root == root {
1637 return Ok(());
1638 }
1639
1640 state.pending_root = root;
1641 state.last_upload_failed_at = None;
1642 state.last_upload_error = None;
1643 state.last_changed_at = Some(now);
1644 if state.dirty_since.is_none() {
1645 state.dirty_since = Some(now);
1646 }
1647 Ok(())
1648 }
1649
1650 async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1651 if self.take_missing_blob_event_upload_error() {
1652 return self.rebuild_event_indexes_after_missing_blobs(force).await;
1653 }
1654 let result = self
1655 .maybe_publish_root(
1656 self.config.published_event_tree_name.as_deref(),
1657 &self.event_publish_state,
1658 "event root",
1659 force,
1660 false,
1661 )
1662 .await;
1663 let Err(error) = result else {
1664 if self.take_missing_blob_event_upload_error() {
1665 return self.rebuild_event_indexes_after_missing_blobs(force).await;
1666 }
1667 return Ok(());
1668 };
1669 if !is_missing_local_blob_push_error(&error) {
1670 return Err(error);
1671 }
1672
1673 self.rebuild_event_indexes_after_missing_blobs(force).await
1674 }
1675
1676 fn take_missing_blob_event_upload_error(&self) -> bool {
1677 let mut state = self
1678 .event_publish_state
1679 .lock()
1680 .expect("event root publish state");
1681 if state.upload_in_progress_root.is_some() {
1682 return false;
1683 }
1684 if !state.missing_blob_rebuild_required {
1685 return false;
1686 }
1687 state.missing_blob_rebuild_required = false;
1688 state.last_upload_error = None;
1689 state.last_upload_failed_at = None;
1690 true
1691 }
1692
1693 async fn rebuild_event_indexes_after_missing_blobs(&self, force: bool) -> Result<()> {
1694 warn!(
1695 "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1696 );
1697 let (public_count, ambient_count) = self
1698 .graph_store
1699 .rebuild_event_indexes_from_stored_events_async()
1700 .await
1701 .context("rebuild event indexes after missing event blobs")?;
1702 info!(
1703 "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1704 public_count, ambient_count
1705 );
1706 self.sync_publish_roots_from_store()?;
1707
1708 self.maybe_publish_root(
1709 self.config.published_event_tree_name.as_deref(),
1710 &self.event_publish_state,
1711 "event root",
1712 force,
1713 false,
1714 )
1715 .await
1716 }
1717
1718 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1719 self.maybe_publish_root(
1720 self.config.published_profile_search_tree_name.as_deref(),
1721 &self.profile_search_publish_state,
1722 "profile search root",
1723 force,
1724 true,
1725 )
1726 .await
1727 }
1728
1729 async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1730 self.maybe_publish_root(
1731 self.config
1732 .published_profiles_by_pubkey_tree_name
1733 .as_deref(),
1734 &self.profiles_by_pubkey_publish_state,
1735 "profiles-by-pubkey root",
1736 force,
1737 true,
1738 )
1739 .await
1740 }
1741
1742 async fn maybe_publish_root(
1743 &self,
1744 tree_name: Option<&str>,
1745 publish_state: &Arc<Mutex<RootPublishState>>,
1746 log_label: &str,
1747 force: bool,
1748 publish_before_upload_ready_on_force: bool,
1749 ) -> Result<()> {
1750 let Some(tree_name) = tree_name else {
1751 return Ok(());
1752 };
1753
1754 let pending_root = {
1755 let state = publish_state.lock().expect("root publish state");
1756 let Some(pending_root) = state.pending_root.clone() else {
1757 return Ok(());
1758 };
1759
1760 let now = Instant::now();
1761 let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1762 now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1763 });
1764 let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1765 now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1766 });
1767 if !force && !debounce_ready && !stale_ready {
1768 return Ok(());
1769 }
1770
1771 pending_root
1772 };
1773
1774 let upload_started = self.maybe_start_background_root_upload(
1775 tree_name,
1776 &pending_root,
1777 publish_state,
1778 log_label,
1779 );
1780 let upload_required = !self.config.blossom_write_servers.is_empty();
1781 let (upload_ready, publish_root) = {
1782 let state = publish_state.lock().expect("root publish state");
1783 let upload_ready =
1784 !upload_required || state.last_uploaded_root.as_ref() == Some(&pending_root);
1785 let publish_root = if upload_ready {
1786 Some(pending_root.clone())
1787 } else {
1788 state.last_uploaded_root.clone().filter(|uploaded_root| {
1789 state.last_published_root.as_ref() != Some(uploaded_root)
1790 })
1791 };
1792 (upload_ready, publish_root)
1793 };
1794 let publish_before_upload_ready =
1795 force && upload_required && !upload_ready && publish_before_upload_ready_on_force;
1796 let publish_root = if let Some(publish_root) = publish_root {
1797 publish_root
1798 } else if publish_before_upload_ready {
1799 pending_root.clone()
1800 } else {
1801 if upload_started {
1802 info!(
1803 "Nostr mirror uploading {} DAG before publish: tree={} hash={}",
1804 log_label,
1805 tree_name,
1806 hex::encode(pending_root.hash),
1807 );
1808 }
1809 return Ok(());
1810 };
1811
1812 let mut successful_relays = Vec::new();
1813 let mut failed_relays = Vec::new();
1814 let mut published_now = false;
1815 let publish_required =
1816 self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1817 if publish_required {
1818 let Some(publish_client) = self.publish_client.as_ref() else {
1819 unreachable!("publish_required implies publish_client");
1820 };
1821 if !self.has_connected_publish_relay().await {
1822 return Ok(());
1823 }
1824 if publish_before_upload_ready {
1825 info!(
1826 "Nostr mirror publishing {} before Blossom upload completes: tree={} hash={}",
1827 log_label,
1828 tree_name,
1829 hex::encode(pending_root.hash),
1830 );
1831 } else if !upload_ready {
1832 info!(
1833 "Nostr mirror publishing uploaded {} while newer root is still uploading: tree={} published_hash={} pending_hash={}",
1834 log_label,
1835 tree_name,
1836 hex::encode(publish_root.hash),
1837 hex::encode(pending_root.hash),
1838 );
1839 }
1840
1841 let already_published = {
1842 let state = publish_state.lock().expect("root publish state");
1843 state.last_published_root.as_ref() == Some(&publish_root)
1844 };
1845 if !already_published {
1846 let publish_relays = self.config.publish_relays.clone();
1847 let latest_known_created_at = {
1848 let state = publish_state.lock().expect("root publish state");
1849 state.last_published_created_at
1850 };
1851 let publish_created_at =
1852 next_replaceable_created_at(Timestamp::now(), latest_known_created_at);
1853 let event = publish_client
1854 .sign_event_builder(Self::build_public_root_event(
1855 tree_name,
1856 &publish_root,
1857 publish_created_at,
1858 ))
1859 .await
1860 .with_context(|| format!("sign {log_label} event"))?;
1861 let publish_result = self
1862 .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1863 .await
1864 .with_context(|| format!("publish {log_label} event"))?;
1865 successful_relays = publish_result.0;
1866 failed_relays = publish_result.1;
1867 if successful_relays.is_empty() {
1868 let failure_summary = if failed_relays.is_empty() {
1869 "no publish relays accepted the event".to_string()
1870 } else {
1871 failed_relays.join("; ")
1872 };
1873 anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1874 }
1875
1876 let mut state = publish_state.lock().expect("root publish state");
1877 if state.pending_root.as_ref() == Some(&pending_root)
1878 || state.last_uploaded_root.as_ref() == Some(&publish_root)
1879 || publish_before_upload_ready
1880 {
1881 state.last_published_root = Some(publish_root.clone());
1882 state.last_published_at = Some(Instant::now());
1883 state.last_published_created_at = Some(event.created_at);
1884 }
1885 published_now = true;
1886 }
1887 }
1888
1889 {
1890 let mut state = publish_state.lock().expect("root publish state");
1891 if state.pending_root.as_ref() == Some(&pending_root) {
1892 let upload_satisfied = self.config.blossom_write_servers.is_empty()
1893 || state.last_uploaded_root.as_ref() == Some(&pending_root);
1894 let publish_satisfied =
1895 !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1896 if upload_satisfied && publish_satisfied {
1897 state.dirty_since = None;
1898 }
1899 }
1900 }
1901
1902 if published_now {
1903 info!(
1904 "Nostr mirror published {}: tree={} hash={} relays={:?}",
1905 log_label,
1906 tree_name,
1907 hex::encode(publish_root.hash),
1908 successful_relays,
1909 );
1910 }
1911 if !failed_relays.is_empty() {
1912 warn!(
1913 "Nostr mirror publish had relay failures: tree={} failures={:?}",
1914 tree_name, failed_relays
1915 );
1916 }
1917 Ok(())
1918 }
1919
1920 fn maybe_start_background_root_upload(
1921 &self,
1922 tree_name: &str,
1923 pending_root: &hashtree_core::Cid,
1924 publish_state: &Arc<Mutex<RootPublishState>>,
1925 log_label: &str,
1926 ) -> bool {
1927 if self.config.blossom_write_servers.is_empty() {
1928 return false;
1929 }
1930
1931 let previous_uploaded_root = {
1932 let mut state = publish_state.lock().expect("root publish state");
1933 if state.last_uploaded_root.as_ref() == Some(pending_root)
1934 || state.upload_in_progress_root.is_some()
1935 {
1936 return false;
1937 }
1938 if state
1939 .last_upload_failed_at
1940 .is_some_and(|failed_at| failed_at.elapsed() < MIRROR_ROOT_UPLOAD_RETRY_INTERVAL)
1941 {
1942 return false;
1943 }
1944 state.upload_in_progress_root = Some(pending_root.clone());
1945 state.last_uploaded_root.clone()
1946 };
1947
1948 let store = Arc::clone(&self.store);
1949 let upload_state_path = Self::uploaded_root_state_path(store.base_path(), tree_name);
1950 let servers = self.config.blossom_write_servers.clone();
1951 let root = pending_root.clone();
1952 let publish_state = Arc::clone(publish_state);
1953 let log_label = log_label.to_string();
1954 tokio::task::spawn_blocking(move || {
1955 let runtime = tokio::runtime::Builder::new_current_thread()
1956 .enable_all()
1957 .build()
1958 .expect("build nostr mirror root upload runtime");
1959 runtime.block_on(async move {
1960 let result = background_blossom_push_incremental_with_store(
1961 store,
1962 root.clone(),
1963 previous_uploaded_root,
1964 &servers,
1965 )
1966 .await;
1967 let mut state = publish_state.lock().expect("root publish state");
1968 if state.upload_in_progress_root.as_ref() == Some(&root) {
1969 state.upload_in_progress_root = None;
1970 }
1971 match result {
1972 Ok(()) => {
1973 if let Err(err) =
1974 Self::write_uploaded_root_state(&upload_state_path, &root, &log_label)
1975 {
1976 warn!("Nostr mirror failed to persist uploaded root state: {err:#}");
1977 }
1978 state.last_uploaded_root = Some(root.clone());
1979 state.last_uploaded_at = Some(Instant::now());
1980 if state.pending_root.as_ref() == Some(&root) {
1981 state.last_upload_failed_at = None;
1982 state.last_upload_error = None;
1983 state.missing_blob_rebuild_required = false;
1984 }
1985 info!(
1986 "Nostr mirror uploaded {} DAG to Blossom: hash={}",
1987 log_label,
1988 hex::encode(root.hash)
1989 );
1990 }
1991 Err(err) => {
1992 if state.pending_root.as_ref() == Some(&root) {
1993 state.last_upload_failed_at = Some(Instant::now());
1994 state.last_upload_error = Some(format!("{err:#}"));
1995 }
1996 if is_missing_local_blob_message(&format!("{err:#}")) {
1997 state.missing_blob_rebuild_required = true;
1998 }
1999 warn!(
2000 "Nostr mirror {} DAG upload failed: hash={} error={:#}",
2001 log_label,
2002 hex::encode(root.hash),
2003 err
2004 );
2005 }
2006 }
2007 });
2008 trim_transient_allocations();
2009 });
2010
2011 true
2012 }
2013
2014 async fn publish_root_event_to_relays(
2015 &self,
2016 publish_client: &Client,
2017 relays: &[String],
2018 event: &Event,
2019 ) -> Result<(Vec<String>, Vec<String>)> {
2020 let primary_send = Self::send_root_event_to_relays(publish_client, relays, event);
2021 let (mut successful_relays, mut failed_relays) =
2022 match tokio::time::timeout(MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT, primary_send).await {
2023 Ok(result) => result,
2024 Err(_) => (
2025 Vec::new(),
2026 vec![format!(
2027 "publish relays: primary publish timed out after {:?}",
2028 MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT
2029 )],
2030 ),
2031 };
2032
2033 if successful_relays.is_empty() {
2034 let (retry_successes, retry_failures) = self
2035 .publish_root_event_with_fresh_client(relays, event)
2036 .await;
2037 successful_relays.extend(retry_successes);
2038 failed_relays.extend(retry_failures);
2039 }
2040
2041 Ok((successful_relays, failed_relays))
2042 }
2043
2044 async fn send_root_event_to_relays(
2045 publish_client: &Client,
2046 relays: &[String],
2047 event: &Event,
2048 ) -> (Vec<String>, Vec<String>) {
2049 let mut successful_relays = Vec::new();
2050 let mut failed_relays = Vec::new();
2051
2052 match publish_client
2053 .send_event_to(relays.iter().map(|relay| relay.as_str()), event)
2054 .await
2055 {
2056 Ok(output) => {
2057 for relay in relays {
2058 let relay_url = relay.trim_end_matches('/');
2059 if output
2060 .success
2061 .iter()
2062 .any(|url| url.as_str().trim_end_matches('/') == relay_url)
2063 {
2064 successful_relays.push(relay.clone());
2065 }
2066 }
2067 failed_relays.extend(
2068 output
2069 .failed
2070 .into_iter()
2071 .map(|(url, reason)| format!("{url}: {reason}")),
2072 );
2073 }
2074 Err(err) => {
2075 failed_relays.push(format!("publish relays: {err}"));
2076 }
2077 }
2078
2079 (successful_relays, failed_relays)
2080 }
2081
2082 async fn publish_root_event_with_fresh_client(
2083 &self,
2084 relays: &[String],
2085 event: &Event,
2086 ) -> (Vec<String>, Vec<String>) {
2087 let client = Client::new(Keys::generate());
2088 let mut setup_failures = Vec::new();
2089 for relay in relays {
2090 if let Err(err) = client.add_relay(relay).await {
2091 setup_failures.push(format!("{relay}: add relay failed: {err}"));
2092 }
2093 }
2094
2095 client.connect().await;
2096 let publish = Self::send_root_event_to_relays(&client, relays, event);
2097 let retry_timeout = self
2098 .config
2099 .fetch_timeout
2100 .min(MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT);
2101 let result = tokio::time::timeout(retry_timeout, publish).await;
2102 let _ = client.disconnect().await;
2103
2104 match result {
2105 Ok((successful_relays, mut failed_relays)) => {
2106 failed_relays.extend(setup_failures);
2107 (successful_relays, failed_relays)
2108 }
2109 Err(_) => {
2110 setup_failures.push(format!(
2111 "fresh publish client timed out after {:?}",
2112 retry_timeout
2113 ));
2114 (Vec::new(), setup_failures)
2115 }
2116 }
2117 }
2118
2119 fn build_public_root_event(
2120 tree_name: &str,
2121 cid: &hashtree_core::Cid,
2122 created_at: Timestamp,
2123 ) -> EventBuilder {
2124 let mut tags = vec![
2125 Tag::identifier(tree_name.to_string()),
2126 Tag::custom(
2127 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
2128 vec!["hashtree"],
2129 ),
2130 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
2131 ];
2132 if let Some(key) = cid.key {
2133 tags.push(Tag::custom(
2134 TagKind::Custom("key".into()),
2135 vec![hex::encode(key)],
2136 ));
2137 }
2138
2139 EventBuilder::new(Kind::Custom(30078), "")
2140 .tags(tags)
2141 .custom_created_at(created_at)
2142 }
2143}
2144
2145fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
2146 error
2147 .chain()
2148 .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
2149}
2150
2151fn is_missing_local_blob_message(message: &str) -> bool {
2152 message.contains(MISSING_LOCAL_BLOB_PUSH_ERROR)
2153}
2154
2155fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
2156 match latest_existing {
2157 Some(latest) if latest >= now => Timestamp::from_secs(latest.as_secs().saturating_add(1)),
2158 _ => now,
2159 }
2160}
2161
2162#[cfg(test)]
2163mod tests;