1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::time::Duration;
6use std::time::Instant;
7
8use anyhow::{Context, Result};
9use hashtree_nostr::{
10 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
11};
12use nostr::{
13 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
14 Timestamp,
15};
16use nostr_sdk::{
17 pool::RelayLimits, prelude::RelayPoolNotification, Client, Keys, Options, RelayStatus,
18};
19use tokio::sync::{watch, Mutex as AsyncMutex};
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_incremental_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
27#[cfg(not(test))]
28const MIRROR_STARTUP_DELAY: Duration = Duration::from_secs(8);
29#[cfg(test)]
30const MIRROR_STARTUP_DELAY: Duration = Duration::from_millis(50);
31
32#[cfg(not(test))]
33const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1);
34#[cfg(test)]
35const MIRROR_CONNECT_SETTLE_DELAY: Duration = Duration::from_millis(250);
36
37#[cfg(not(test))]
38const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
39#[cfg(test)]
40const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = Duration::from_millis(100);
41
42#[cfg(not(test))]
43const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_secs(30);
44#[cfg(test)]
45const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = Duration::from_millis(100);
46
47const KIND_LONG_FORM_CONTENT: u16 = 30_023;
48const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
49const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
50const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
51const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
52const MIRROR_UPLOAD_STATE_DIR: &str = "nostr-mirror";
53const MIRROR_UPLOADED_ROOT_SUFFIX: &str = ".uploaded-root";
54const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
55const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
56const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
57const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
58const FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE: u32 = 1;
59const FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT: usize = 32;
60const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
61const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
62const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;
63
64#[cfg(not(test))]
65const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_secs(300);
66#[cfg(test)]
67const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = Duration::from_millis(100);
68
69#[cfg(not(test))]
70const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_secs(5);
71#[cfg(test)]
72const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = Duration::from_millis(20);
73
74#[cfg(not(test))]
75const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_secs(30);
76#[cfg(test)]
77const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = Duration::from_millis(100);
78
79#[cfg(not(test))]
80const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_secs(60);
81#[cfg(test)]
82const MIRROR_ROOT_UPLOAD_RETRY_INTERVAL: Duration = Duration::from_millis(100);
83
84#[cfg(not(test))]
85const MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT: Duration = Duration::from_secs(2);
86#[cfg(test)]
87const MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT: Duration = Duration::from_millis(250);
88
89#[cfg(not(test))]
90const MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT: Duration = Duration::from_secs(5);
91#[cfg(test)]
92const MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT: Duration = Duration::from_secs(2);
93
94const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob";
95
96fn trim_transient_allocations() {
97 #[cfg(target_os = "linux")]
98 unsafe {
99 libc::malloc_trim(0);
103 }
104}
105
106fn decode_hex_pubkey(value: &str) -> Option<[u8; 32]> {
107 let bytes = hex::decode(value).ok()?;
108 <[u8; 32]>::try_from(bytes.as_slice()).ok()
109}
110
111#[derive(Debug, Clone)]
112pub struct NostrMirrorConfig {
113 pub relays: Vec<String>,
114 pub publish_relays: Vec<String>,
115 pub blossom_write_servers: Vec<String>,
116 pub max_follow_distance: u32,
117 pub overmute_threshold: f64,
118 pub author_batch_size: usize,
119 pub history_sync_author_chunk_size: usize,
120 pub history_sync_per_author_event_limit: usize,
121 pub missing_profile_backfill_batch_size: usize,
122 pub fetch_timeout: Duration,
123 pub relay_event_max_size: Option<u32>,
124 pub require_negentropy: bool,
125 pub kinds: Vec<u16>,
126 pub history_sync_on_start: bool,
127 pub history_sync_on_reconnect: bool,
128 pub full_text_note_history_follow_distance: Option<u32>,
129 pub full_text_note_history_max_relay_pages: usize,
130 pub published_event_tree_name: Option<String>,
131 pub published_profile_search_tree_name: Option<String>,
132 pub published_profiles_by_pubkey_tree_name: Option<String>,
133}
134
135impl Default for NostrMirrorConfig {
136 fn default() -> Self {
137 Self {
138 relays: Vec::new(),
139 publish_relays: Vec::new(),
140 blossom_write_servers: Vec::new(),
141 max_follow_distance: 2,
142 overmute_threshold: 1.0,
143 author_batch_size: 256,
144 history_sync_author_chunk_size: 5_000,
145 history_sync_per_author_event_limit: 256,
146 missing_profile_backfill_batch_size: 5_000,
147 fetch_timeout: Duration::from_secs(15),
148 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
149 require_negentropy: false,
150 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
151 history_sync_on_start: true,
152 history_sync_on_reconnect: true,
153 full_text_note_history_follow_distance: Some(
154 DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
155 ),
156 full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
157 published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
158 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
159 published_profiles_by_pubkey_tree_name: Some(
160 DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
161 ),
162 }
163 }
164}
165
166#[derive(Debug, Default)]
167struct RootPublishState {
168 pending_root: Option<hashtree_core::Cid>,
169 last_changed_at: Option<Instant>,
170 dirty_since: Option<Instant>,
171 last_published_root: Option<hashtree_core::Cid>,
172 last_published_at: Option<Instant>,
173 last_published_created_at: Option<Timestamp>,
174 last_uploaded_root: Option<hashtree_core::Cid>,
175 last_uploaded_at: Option<Instant>,
176 upload_in_progress_root: Option<hashtree_core::Cid>,
177 last_upload_failed_at: Option<Instant>,
178 last_upload_error: Option<String>,
179 missing_blob_rebuild_required: bool,
180}
181
182#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183struct HistorySyncPlan {
184 relay_fetch_mode: RelayFetchMode,
185 author_batch_size: usize,
186 per_author_event_limit: usize,
187 relay_page_size: usize,
188 max_relay_pages: usize,
189}
190
191pub struct BackgroundNostrMirror {
192 config: NostrMirrorConfig,
193 store: Arc<HashtreeStore>,
194 graph_store: Arc<SocialGraphStore>,
195 client: Client,
196 publish_client: Option<Client>,
197 event_publish_state: Arc<Mutex<RootPublishState>>,
198 profile_search_publish_state: Arc<Mutex<RootPublishState>>,
199 profiles_by_pubkey_publish_state: Arc<Mutex<RootPublishState>>,
200 pending_live_events: Mutex<BTreeMap<String, Event>>,
201 missing_profile_cursor: Mutex<usize>,
202 history_sync_lock: AsyncMutex<()>,
203 shutdown_tx: watch::Sender<bool>,
204 shutdown_rx: watch::Receiver<bool>,
205}
206
207impl BackgroundNostrMirror {
208 pub async fn new(
209 config: NostrMirrorConfig,
210 store: Arc<HashtreeStore>,
211 graph_store: Arc<SocialGraphStore>,
212 publish_keys: Option<Keys>,
213 ) -> Result<Self> {
214 let client = if let Some(max_size) = config.relay_event_max_size {
215 let mut limits = RelayLimits::default();
216 limits.events.max_size = Some(max_size);
217 Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
218 } else {
219 Client::new(Keys::generate())
220 };
221 for relay in &config.relays {
222 client
223 .add_relay(relay)
224 .await
225 .with_context(|| format!("add mirror relay {relay}"))?;
226 }
227 client.connect().await;
228
229 let publish_client = if let Some(keys) = publish_keys {
230 if config.publish_relays.is_empty() {
231 None
232 } else {
233 let client = Client::with_opts(keys, Options::new().wait_for_send(false));
234 for relay in &config.publish_relays {
235 client
236 .add_relay(relay)
237 .await
238 .with_context(|| format!("add mirror publish relay {relay}"))?;
239 }
240 client.connect().await;
241 Some(client)
242 }
243 } else {
244 None
245 };
246
247 let (shutdown_tx, shutdown_rx) = watch::channel(false);
248 let event_publish_state = Self::root_publish_state_from_disk(
249 store.base_path(),
250 config.published_event_tree_name.as_deref(),
251 "event root",
252 );
253 let profile_search_publish_state = Self::root_publish_state_from_disk(
254 store.base_path(),
255 config.published_profile_search_tree_name.as_deref(),
256 "profile-search root",
257 );
258 let profiles_by_pubkey_publish_state = Self::root_publish_state_from_disk(
259 store.base_path(),
260 config.published_profiles_by_pubkey_tree_name.as_deref(),
261 "profiles-by-pubkey root",
262 );
263 Ok(Self {
264 config,
265 store,
266 graph_store,
267 client,
268 publish_client,
269 event_publish_state: Arc::new(Mutex::new(event_publish_state)),
270 profile_search_publish_state: Arc::new(Mutex::new(profile_search_publish_state)),
271 profiles_by_pubkey_publish_state: Arc::new(Mutex::new(
272 profiles_by_pubkey_publish_state,
273 )),
274 pending_live_events: Mutex::new(BTreeMap::new()),
275 missing_profile_cursor: Mutex::new(0),
276 history_sync_lock: AsyncMutex::new(()),
277 shutdown_tx,
278 shutdown_rx,
279 })
280 }
281
282 fn root_publish_state_from_disk(
283 base_path: &std::path::Path,
284 tree_name: Option<&str>,
285 log_label: &str,
286 ) -> RootPublishState {
287 let Some(tree_name) = tree_name else {
288 return RootPublishState::default();
289 };
290 let path = Self::uploaded_root_state_path(base_path, tree_name);
291 let Some(root) = Self::read_uploaded_root_state(&path, log_label) else {
292 return RootPublishState::default();
293 };
294
295 RootPublishState {
296 last_uploaded_root: Some(root),
297 ..RootPublishState::default()
298 }
299 }
300
301 fn uploaded_root_state_path(base_path: &std::path::Path, tree_name: &str) -> PathBuf {
302 let safe_tree_name = tree_name
303 .chars()
304 .map(|ch| {
305 if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
306 ch
307 } else {
308 '_'
309 }
310 })
311 .collect::<String>();
312 base_path
313 .join(MIRROR_UPLOAD_STATE_DIR)
314 .join(format!("{safe_tree_name}{MIRROR_UPLOADED_ROOT_SUFFIX}"))
315 }
316
317 fn read_uploaded_root_state(
318 path: &std::path::Path,
319 log_label: &str,
320 ) -> Option<hashtree_core::Cid> {
321 let raw = match std::fs::read_to_string(path) {
322 Ok(raw) => raw,
323 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return None,
324 Err(err) => {
325 warn!(
326 "Nostr mirror failed to read uploaded {} state {}: {}",
327 log_label,
328 path.display(),
329 err
330 );
331 return None;
332 }
333 };
334 let trimmed = raw.trim();
335 if trimmed.is_empty() {
336 return None;
337 }
338 match hashtree_core::Cid::parse(trimmed) {
339 Ok(root) => Some(root),
340 Err(err) => {
341 warn!(
342 "Nostr mirror ignored invalid uploaded {} state {}: {}",
343 log_label,
344 path.display(),
345 err
346 );
347 None
348 }
349 }
350 }
351
352 fn write_uploaded_root_state(
353 path: &std::path::Path,
354 root: &hashtree_core::Cid,
355 log_label: &str,
356 ) -> Result<()> {
357 if let Some(parent) = path.parent() {
358 std::fs::create_dir_all(parent)
359 .with_context(|| format!("create {}", parent.display()))?;
360 }
361 std::fs::write(path, format!("{root}\n"))
362 .with_context(|| format!("write uploaded {log_label} state {}", path.display()))
363 }
364
365 pub fn shutdown(&self) {
366 let _ = self.shutdown_tx.send(true);
367 }
368
369 fn sync_publish_roots_from_store(&self) -> Result<()> {
370 self.note_public_events_root_change()?;
371 self.note_profile_search_root_change()?;
372 self.note_profiles_by_pubkey_root_change()?;
373 Ok(())
374 }
375
376 async fn publish_pending_roots(
377 &self,
378 force_event: bool,
379 force_profile_search: bool,
380 force_profiles_by_pubkey: bool,
381 ) -> (Result<()>, Result<()>, Result<()>) {
382 tokio::join!(
383 self.maybe_publish_event_root(force_event),
384 self.maybe_publish_profile_search_root(force_profile_search),
385 self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
386 )
387 }
388
389 async fn publish_priority_roots(
390 &self,
391 force_event: bool,
392 force_profile_search: bool,
393 force_profiles_by_pubkey: bool,
394 ) -> (Result<()>, Result<()>, Result<()>) {
395 let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
396 async {
397 if force_profile_search {
398 self.maybe_publish_profile_search_root(true).await
399 } else {
400 Ok(())
401 }
402 },
403 async {
404 if force_profiles_by_pubkey {
405 self.maybe_publish_profiles_by_pubkey_root(true).await
406 } else {
407 Ok(())
408 }
409 },
410 );
411 let event_result = if force_event {
412 self.maybe_publish_event_root(true).await
413 } else {
414 Ok(())
415 };
416 (
417 event_result,
418 profile_search_result,
419 profiles_by_pubkey_result,
420 )
421 }
422
423 pub async fn run(self: Arc<Self>) -> Result<()> {
424 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
425 return Ok(());
426 }
427
428 info!(
429 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
430 self.config.relays.len(),
431 self.config.max_follow_distance,
432 self.config.require_negentropy,
433 self.config.kinds,
434 self.config.history_sync_author_chunk_size.max(1),
435 self.config.history_sync_on_start,
436 self.config.history_sync_on_reconnect
437 );
438
439 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
440 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
441 let live_since = Timestamp::now();
442 self.sync_publish_roots_from_store()?;
443 let (event_result, profile_search_result, profiles_by_pubkey_result) =
444 self.publish_priority_roots(true, true, true).await;
445 if let Err(err) = event_result {
446 warn!(
447 "Nostr mirror event-root publish failed on startup: {:#}",
448 err
449 );
450 }
451 if let Err(err) = profile_search_result {
452 warn!(
453 "Nostr mirror profile-search publish failed on startup: {:#}",
454 err
455 );
456 }
457 if let Err(err) = profiles_by_pubkey_result {
458 warn!(
459 "Nostr mirror profiles-by-pubkey publish failed on startup: {:#}",
460 err
461 );
462 }
463
464 let initial_authors = self.collect_authors()?;
465 if initial_authors.is_empty() {
466 info!("Nostr mirror: no social-graph authors to mirror yet");
467 }
468
469 let mut subscribed_authors = HashSet::new();
470 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
471 .await?;
472
473 if !initial_authors.is_empty() && self.config.history_sync_on_start {
474 self.spawn_startup_history_sync(initial_authors.clone());
475 }
476
477 let mut relay_statuses = self.capture_relay_statuses().await;
478 let mut last_reconnect_history_sync_at: Option<Instant> = None;
479 let mut last_missing_profile_backfill_at: Option<Instant> = None;
480 let mut notifications = self.client.notifications();
481 let mut shutdown_rx = self.shutdown_rx.clone();
482 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
483 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
484
485 loop {
486 tokio::select! {
487 _ = shutdown_rx.changed() => {
488 if *shutdown_rx.borrow() {
489 break;
490 }
491 }
492 _ = refresh_interval.tick() => {
493 let authors = self.collect_authors()?;
494 let new_authors = authors
495 .into_iter()
496 .filter(|author| !subscribed_authors.contains(author))
497 .collect::<Vec<_>>();
498 if !new_authors.is_empty() {
499 debug!(
500 "Nostr mirror discovered {} newly reachable author(s)",
501 new_authors.len()
502 );
503 self.subscribe_authors_since(
504 &new_authors,
505 Timestamp::now(),
506 &mut subscribed_authors,
507 )
508 .await?;
509 self.spawn_author_history_sync(
510 "new-author catch-up",
511 new_authors.clone(),
512 true,
513 true,
514 );
515 }
516 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
517 let missing_profile_authors = self.collect_missing_profile_authors(
518 self.config.missing_profile_backfill_batch_size,
519 )?;
520 if !missing_profile_authors.is_empty() {
521 info!(
522 "Nostr mirror missing-profile backfill starting: authors={}",
523 missing_profile_authors.len()
524 );
525 self.spawn_missing_profile_backfill(missing_profile_authors);
526 last_missing_profile_backfill_at = Some(Instant::now());
527 }
528 }
529 }
530 _ = publish_interval.tick() => {
531 self.sync_publish_roots_from_store()?;
532 if let Err(err) = self.flush_live_events().await {
533 warn!("Nostr mirror live event flush failed: {:#}", err);
534 }
535 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
536 .publish_pending_roots(false, false, false)
537 .await;
538 if let Err(err) = event_result {
539 warn!("Nostr mirror event-root publish failed: {:#}", err);
540 }
541 if let Err(err) = profile_search_result {
542 warn!("Nostr mirror profile-search publish failed: {:#}", err);
543 }
544 if let Err(err) = profiles_by_pubkey_result {
545 warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
546 }
547 }
548 notification = notifications.recv() => {
549 match notification {
550 Ok(RelayPoolNotification::Event { event, .. }) => {
551 self.ingest_live_event(&event)?;
552 }
553 Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
554 let relay_url = relay_url.to_string();
555 let previous = relay_statuses.insert(relay_url.clone(), status);
556 if Self::should_history_sync_on_reconnect(
557 self.config.history_sync_on_reconnect,
558 previous,
559 status,
560 ) && Self::should_run_reconnect_history_sync(
561 last_reconnect_history_sync_at.as_ref(),
562 )
563 {
564 let authors = self.collect_authors()?;
565 if !authors.is_empty() {
566 info!(
567 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
568 relay_url,
569 authors.len(),
570 self.config.require_negentropy
571 );
572 self.spawn_author_history_sync(
573 "relay reconnect catch-up",
574 authors,
575 false,
576 false,
577 );
578 last_reconnect_history_sync_at = Some(Instant::now());
579 }
580 }
581 }
582 Ok(RelayPoolNotification::Shutdown) => break,
583 Ok(_) => {}
584 Err(err) => {
585 warn!("Nostr mirror notification error: {}", err);
586 break;
587 }
588 }
589 }
590 }
591 }
592
593 if let Err(err) = self.flush_live_events().await {
594 warn!(
595 "Nostr mirror live event flush failed during shutdown: {:#}",
596 err
597 );
598 }
599 if let Err(err) = self.sync_publish_roots_from_store() {
600 warn!(
601 "Nostr mirror root-state refresh failed during shutdown: {:#}",
602 err
603 );
604 }
605 let (event_result, profile_search_result, profiles_by_pubkey_result) =
606 self.publish_pending_roots(true, true, true).await;
607 if let Err(err) = event_result {
608 warn!(
609 "Nostr mirror event-root publish failed during shutdown: {:#}",
610 err
611 );
612 }
613 if let Err(err) = profile_search_result {
614 warn!(
615 "Nostr mirror profile-search publish failed during shutdown: {:#}",
616 err
617 );
618 }
619 if let Err(err) = profiles_by_pubkey_result {
620 warn!(
621 "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
622 err
623 );
624 }
625 let _ = self.client.disconnect().await;
626 if let Some(client) = self.publish_client.as_ref() {
627 let _ = client.disconnect().await;
628 }
629 Ok(())
630 }
631
632 fn spawn_startup_history_sync(self: &Arc<Self>, initial_authors: Vec<String>) {
633 let mirror = Arc::clone(self);
634 tokio::task::spawn_blocking(move || {
635 let runtime = tokio::runtime::Builder::new_current_thread()
636 .enable_all()
637 .build()
638 .expect("build nostr mirror startup history sync runtime");
639 runtime.block_on(async move {
640 let _guard = mirror.history_sync_lock.lock().await;
641 if let Err(err) = mirror.run_startup_history_sync(initial_authors).await {
642 warn!("Nostr mirror startup history sync failed: {:#}", err);
643 }
644 });
645 });
646 }
647
648 async fn run_startup_history_sync(&self, initial_authors: Vec<String>) -> Result<()> {
649 self.history_sync_full_text_notes_for_reachable_authors()
650 .await?;
651 self.history_sync_authors(initial_authors).await?;
652 if self.should_backfill_missing_profiles(None) {
653 let missing_profile_authors = self
654 .collect_missing_profile_authors(self.config.missing_profile_backfill_batch_size)?;
655 if !missing_profile_authors.is_empty() {
656 info!(
657 "Nostr mirror missing-profile backfill starting: authors={}",
658 missing_profile_authors.len()
659 );
660 self.history_sync_authors_with_kinds(
661 missing_profile_authors,
662 &[Kind::Metadata.as_u16()],
663 )
664 .await?;
665 }
666 }
667 Ok(())
668 }
669
670 fn spawn_author_history_sync(
671 self: &Arc<Self>,
672 label: &'static str,
673 authors: Vec<String>,
674 include_full_text_notes: bool,
675 wait_for_existing_sync: bool,
676 ) {
677 let mirror = Arc::clone(self);
678 tokio::task::spawn_blocking(move || {
679 let runtime = tokio::runtime::Builder::new_current_thread()
680 .enable_all()
681 .build()
682 .expect("build nostr mirror author history sync runtime");
683 runtime.block_on(async move {
684 if wait_for_existing_sync {
685 let _guard = mirror.history_sync_lock.lock().await;
686 if let Err(err) = mirror
687 .run_author_history_sync(authors, include_full_text_notes)
688 .await
689 {
690 warn!("Nostr mirror {label} failed: {:#}", err);
691 }
692 return;
693 }
694
695 let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
696 info!("Nostr mirror {label} skipped; another history sync is running");
697 return;
698 };
699 if let Err(err) = mirror
700 .run_author_history_sync(authors, include_full_text_notes)
701 .await
702 {
703 warn!("Nostr mirror {label} failed: {:#}", err);
704 }
705 });
706 });
707 }
708
709 async fn run_author_history_sync(
710 &self,
711 authors: Vec<String>,
712 include_full_text_notes: bool,
713 ) -> Result<()> {
714 if include_full_text_notes {
715 self.history_sync_full_text_notes_for_authors(authors.clone())
716 .await?;
717 }
718 self.history_sync_authors(authors).await
719 }
720
721 fn spawn_missing_profile_backfill(self: &Arc<Self>, authors: Vec<String>) {
722 let mirror = Arc::clone(self);
723 tokio::task::spawn_blocking(move || {
724 let runtime = tokio::runtime::Builder::new_current_thread()
725 .enable_all()
726 .build()
727 .expect("build nostr mirror missing profile runtime");
728 runtime.block_on(async move {
729 let Ok(_guard) = mirror.history_sync_lock.try_lock() else {
730 info!(
731 "Nostr mirror missing-profile backfill skipped; another history sync is running"
732 );
733 return;
734 };
735 if let Err(err) = mirror
736 .history_sync_authors_with_kinds(authors, &[Kind::Metadata.as_u16()])
737 .await
738 {
739 warn!("Nostr mirror missing-profile backfill failed: {:#}", err);
740 }
741 });
742 });
743 }
744
745 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
746 let mut statuses = HashMap::new();
747 for (relay_url, relay) in self.client.relays().await {
748 statuses.insert(relay_url.to_string(), relay.status().await);
749 }
750 statuses
751 }
752
753 async fn has_connected_publish_relay(&self) -> bool {
754 let Some(client) = self.publish_client.as_ref() else {
755 return false;
756 };
757 Self::client_has_connected_relay(client).await
758 }
759
760 async fn client_has_connected_relay(client: &Client) -> bool {
761 for (_relay_url, relay) in client.relays().await {
762 if relay.status().await == RelayStatus::Connected {
763 return true;
764 }
765 }
766 false
767 }
768
769 fn collect_authors(&self) -> Result<Vec<String>> {
770 self.collect_authors_with_max_distance(self.config.max_follow_distance)
771 }
772
773 fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
774 let mut authors = Vec::new();
775 let mut seen = HashSet::new();
776 for distance in 0..=max_distance {
777 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
778 self.graph_store.as_ref(),
779 distance,
780 )
781 .with_context(|| format!("load social-graph distance {distance}"))?
782 {
783 if self
784 .graph_store
785 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
786 {
787 continue;
788 }
789 let hex = hex::encode(pubkey);
790 if seen.insert(hex.clone()) {
791 authors.push(hex);
792 }
793 }
794 }
795 Ok(authors)
796 }
797
798 async fn prioritize_full_text_note_history_authors(
799 &self,
800 authors: Vec<String>,
801 ) -> Result<Vec<String>> {
802 let Some(root) = self.graph_store.public_events_root()? else {
803 return Ok(authors);
804 };
805
806 let event_store = NostrEventStore::new(self.store.store_arc());
807 let mut prioritized = Vec::with_capacity(authors.len());
808 let mut sampled = 0usize;
809 for (index, author) in authors.into_iter().enumerate() {
810 let distance = match decode_hex_pubkey(&author) {
811 Some(pubkey) => self
812 .graph_store
813 .follow_distance(&pubkey)?
814 .unwrap_or(u32::MAX),
815 None => u32::MAX,
816 };
817 let indexed_text_sample = if distance <= FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE {
818 sampled = sampled.saturating_add(1);
819 event_store
820 .list_by_author_and_kind(
821 Some(&root),
822 &author,
823 Kind::TextNote.as_u16() as u32,
824 ListEventsOptions {
825 limit: Some(FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT),
826 ..ListEventsOptions::default()
827 },
828 )
829 .await
830 .with_context(|| {
831 format!("sample indexed text-note history for author {author}")
832 })?
833 .len()
834 } else {
835 FULL_TEXT_HISTORY_PRIORITY_SAMPLE_LIMIT
836 };
837 prioritized.push((distance, indexed_text_sample, index, author));
838 }
839
840 prioritized.sort_by(|left, right| {
841 left.0
842 .cmp(&right.0)
843 .then_with(|| left.1.cmp(&right.1))
844 .then_with(|| left.2.cmp(&right.2))
845 });
846 info!(
847 "Nostr mirror full text content history prioritized authors: authors={} sampled_distance_le_{}={}",
848 prioritized.len(),
849 FULL_TEXT_HISTORY_PRIORITY_MAX_DISTANCE,
850 sampled
851 );
852 Ok(prioritized
853 .into_iter()
854 .map(|(_, _, _, author)| author)
855 .collect())
856 }
857
858 fn full_text_note_history_follow_distance(&self) -> Option<u32> {
859 let distance = self.config.full_text_note_history_follow_distance?;
860 if self
861 .config
862 .kinds
863 .iter()
864 .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
865 {
866 Some(distance.min(self.config.max_follow_distance))
867 } else {
868 None
869 }
870 }
871
872 fn full_text_note_history_max_relay_pages(&self) -> Option<usize> {
873 Self::full_text_note_history_max_relay_pages_for_config(&self.config)
874 }
875
876 fn full_text_note_history_max_relay_pages_for_config(
877 config: &NostrMirrorConfig,
878 ) -> Option<usize> {
879 let pages = config.full_text_note_history_max_relay_pages;
880 if pages == 0 {
881 None
882 } else {
883 Some(pages)
884 }
885 }
886
887 fn is_text_content_history_kind(kind: u16) -> bool {
888 kind == Kind::TextNote.as_u16() || kind == KIND_LONG_FORM_CONTENT
889 }
890
891 fn history_sync_kinds_for_config(config: &NostrMirrorConfig) -> Vec<u16> {
892 let mut kinds = config.kinds.clone();
893 kinds.retain(|kind| !Self::is_text_content_history_kind(*kind));
894 kinds
895 }
896
897 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
898 if limit == 0 {
899 return Ok(Vec::new());
900 }
901
902 let authors = self.collect_authors()?;
903 if authors.is_empty() {
904 return Ok(Vec::new());
905 }
906
907 let mut cursor = self
908 .missing_profile_cursor
909 .lock()
910 .expect("missing profile cursor");
911 let mut index = (*cursor).min(authors.len());
912 let mut scanned = 0usize;
913 let mut missing = Vec::new();
914
915 while scanned < authors.len() && missing.len() < limit {
916 let author = &authors[index];
917 if self.graph_store.latest_profile_event(author)?.is_none() {
918 missing.push(author.clone());
919 }
920 index += 1;
921 if index == authors.len() {
922 index = 0;
923 }
924 scanned += 1;
925 }
926
927 *cursor = index;
928 Ok(missing)
929 }
930
931 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
932 if self.config.missing_profile_backfill_batch_size == 0
933 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
934 {
935 return false;
936 }
937 match last_run {
938 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
939 None => true,
940 }
941 }
942
943 fn should_history_sync_on_reconnect(
944 history_sync_on_reconnect: bool,
945 previous: Option<RelayStatus>,
946 status: RelayStatus,
947 ) -> bool {
948 history_sync_on_reconnect
949 && status == RelayStatus::Connected
950 && matches!(
951 previous,
952 Some(
953 RelayStatus::Initialized
954 | RelayStatus::Pending
955 | RelayStatus::Connecting
956 | RelayStatus::Disconnected
957 | RelayStatus::Terminated
958 )
959 )
960 }
961
962 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
963 match last_run {
964 None => true,
965 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
966 }
967 }
968
969 fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
970 !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
971 }
972
973 fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
974 kinds.is_empty()
975 || kinds.iter().any(|kind| {
976 *kind == Kind::Metadata.as_u16()
977 || *kind == Kind::ContactList.as_u16()
978 || *kind == Kind::MuteList.as_u16()
979 })
980 }
981
982 fn history_sync_plan_for(
983 config: &NostrMirrorConfig,
984 authors: usize,
985 kinds: &[u16],
986 ) -> HistorySyncPlan {
987 let author_batch_size = config.author_batch_size.max(1);
988 let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
989 let relay_page_size = 1_000;
990 let max_relay_pages = 10;
991
992 if Self::is_metadata_only_history_sync(kinds) {
993 return HistorySyncPlan {
994 relay_fetch_mode: RelayFetchMode::AuthorBatches,
995 author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
996 per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
997 relay_page_size,
998 max_relay_pages,
999 };
1000 }
1001
1002 if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
1003 return HistorySyncPlan {
1004 relay_fetch_mode: RelayFetchMode::GlobalRecent,
1005 author_batch_size,
1006 per_author_event_limit: per_author_event_limit
1007 .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
1008 .max(1),
1009 relay_page_size,
1010 max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
1011 };
1012 }
1013
1014 HistorySyncPlan {
1015 relay_fetch_mode: RelayFetchMode::AuthorBatches,
1016 author_batch_size,
1017 per_author_event_limit,
1018 relay_page_size,
1019 max_relay_pages,
1020 }
1021 }
1022
1023 fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
1024 Self::history_sync_plan_for(&self.config, authors, kinds)
1025 }
1026
1027 fn history_sync_chunk_size_for_config(
1028 config: &NostrMirrorConfig,
1029 authors: usize,
1030 kinds: &[u16],
1031 full_author_history: bool,
1032 chunk_size_override: Option<usize>,
1033 ) -> usize {
1034 let configured_chunk_size = chunk_size_override
1035 .unwrap_or(config.history_sync_author_chunk_size)
1036 .max(1);
1037 if full_author_history {
1038 return 1;
1039 }
1040 if !full_author_history
1041 && Self::history_sync_plan_for(config, authors, kinds).relay_fetch_mode
1042 == RelayFetchMode::GlobalRecent
1043 {
1044 return authors.max(1);
1045 }
1046 configured_chunk_size
1047 }
1048
1049 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
1050 let kinds = Self::history_sync_kinds_for_config(&self.config);
1051 if kinds.is_empty() {
1052 info!("Nostr mirror history sync skipped: no enabled history kinds");
1053 return Ok(());
1054 }
1055 self.history_sync_authors_with_kinds(authors, &kinds).await
1056 }
1057
1058 async fn history_sync_authors_with_kinds(
1059 &self,
1060 authors: Vec<String>,
1061 kinds: &[u16],
1062 ) -> Result<()> {
1063 self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
1064 .await
1065 }
1066
1067 async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
1068 let Some(distance) = self.full_text_note_history_follow_distance() else {
1069 return Ok(());
1070 };
1071 if self.full_text_note_history_max_relay_pages().is_none() {
1072 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1073 return Ok(());
1074 }
1075 info!(
1076 "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
1077 );
1078 let authors = self
1079 .prioritize_full_text_note_history_authors(
1080 self.collect_authors_with_max_distance(distance)?,
1081 )
1082 .await?;
1083 let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
1084 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1085 return Ok(());
1086 };
1087 self.history_sync_distance_filtered_full_text_notes(authors, distance, max_relay_pages)
1088 .await
1089 }
1090
1091 async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
1092 let Some(distance) = self.full_text_note_history_follow_distance() else {
1093 return Ok(());
1094 };
1095 let Some(max_relay_pages) = self.full_text_note_history_max_relay_pages() else {
1096 info!("Nostr mirror full text content history sync skipped: max_relay_pages=0");
1097 return Ok(());
1098 };
1099 let mut close_authors = Vec::new();
1100 for author in authors {
1101 let Some(pubkey) = decode_hex_pubkey(&author) else {
1102 continue;
1103 };
1104 if self
1105 .graph_store
1106 .follow_distance(&pubkey)?
1107 .is_some_and(|actual_distance| actual_distance <= distance)
1108 {
1109 close_authors.push(author);
1110 }
1111 }
1112 if close_authors.is_empty() {
1113 return Ok(());
1114 }
1115 let close_authors = self
1116 .prioritize_full_text_note_history_authors(close_authors)
1117 .await?;
1118
1119 self.history_sync_distance_filtered_full_text_notes(
1120 close_authors,
1121 distance,
1122 max_relay_pages,
1123 )
1124 .await
1125 }
1126
1127 async fn history_sync_distance_filtered_full_text_notes(
1128 &self,
1129 close_authors: Vec<String>,
1130 distance: u32,
1131 max_relay_pages: usize,
1132 ) -> Result<()> {
1133 if close_authors.is_empty() {
1134 return Ok(());
1135 }
1136
1137 info!(
1138 "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
1139 close_authors.len(),
1140 distance,
1141 max_relay_pages
1142 );
1143 let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
1144 self.history_sync_authors_with_kinds_and_mode(
1145 close_authors,
1146 &kinds,
1147 true,
1148 Some(max_relay_pages),
1149 )
1150 .await
1151 }
1152
1153 async fn history_sync_authors_with_kinds_and_mode(
1154 &self,
1155 authors: Vec<String>,
1156 kinds: &[u16],
1157 full_author_history: bool,
1158 max_relay_pages: Option<usize>,
1159 ) -> Result<()> {
1160 let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
1161 let chunk_size = Self::history_sync_chunk_size_for_config(
1162 &self.config,
1163 authors.len(),
1164 kinds,
1165 full_author_history,
1166 None,
1167 );
1168 self.history_sync_authors_chunked(
1169 authors,
1170 |current_root, author_chunk| async move {
1171 self.history_sync_author_chunk(
1172 current_root,
1173 author_chunk,
1174 kinds,
1175 full_author_history,
1176 max_relay_pages,
1177 )
1178 .await
1179 },
1180 update_profile_and_graph,
1181 Some(chunk_size),
1182 )
1183 .await
1184 }
1185
1186 async fn history_sync_authors_chunked<F, Fut>(
1187 &self,
1188 authors: Vec<String>,
1189 mut run_chunk: F,
1190 update_profile_and_graph: bool,
1191 chunk_size_override: Option<usize>,
1192 ) -> Result<()>
1193 where
1194 F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
1195 Fut: std::future::Future<Output = Result<CrawlReport>>,
1196 {
1197 if authors.is_empty() {
1198 return Ok(());
1199 }
1200
1201 info!(
1202 "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
1203 authors.len(),
1204 self.config.relays.len(),
1205 self.config.require_negentropy
1206 );
1207
1208 let mut current_root = self.graph_store.public_events_root_for_write()?;
1209 let mut last_error = None;
1210 let mut applied_chunks = 0usize;
1211 let mut failed_chunks = 0usize;
1212 let chunk_size = chunk_size_override
1213 .unwrap_or(self.config.history_sync_author_chunk_size)
1214 .max(1);
1215 let total_chunks = authors.len().div_ceil(chunk_size);
1216
1217 for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
1218 let author_chunk = author_chunk.to_vec();
1219 let author_count = author_chunk.len();
1220 info!(
1221 "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
1222 chunk_index + 1,
1223 total_chunks,
1224 author_count
1225 );
1226 let mut report = match run_chunk(current_root.clone(), author_chunk.clone()).await {
1227 Ok(report) => report,
1228 Err(err) => {
1229 failed_chunks = failed_chunks.saturating_add(1);
1230 warn!(
1231 "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
1232 chunk_index + 1,
1233 total_chunks,
1234 author_count,
1235 err
1236 );
1237 last_error = Some(err);
1238 trim_transient_allocations();
1239 continue;
1240 }
1241 };
1242
1243 let latest_root = self.graph_store.public_events_root_for_write()?;
1244 if latest_root != current_root {
1245 info!(
1246 "Nostr mirror history sync root advanced while chunk was fetching; merging chunk into latest root: chunk={}/{} authors={} events_applied={}",
1247 chunk_index + 1,
1248 total_chunks,
1249 author_count,
1250 report.applied_events.len()
1251 );
1252 if report.applied_events.is_empty() {
1253 report.root = latest_root.clone();
1254 } else {
1255 let event_store = NostrEventStore::new(self.store.store_arc());
1256 report.root = event_store
1257 .build(latest_root.as_ref(), report.applied_events.clone())
1258 .await
1259 .context("merge history chunk into latest mirrored event root")?;
1260 }
1261 current_root = latest_root;
1262 }
1263
1264 if report.root != current_root {
1265 self.apply_history_root_with_options(
1266 report.root.as_ref(),
1267 update_profile_and_graph,
1268 true,
1269 Some(&report.applied_events),
1270 )
1271 .await?;
1272 current_root = report.root.clone();
1273 info!(
1274 "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
1275 chunk_index + 1,
1276 total_chunks,
1277 report.authors_processed,
1278 report.events_selected,
1279 report.events_seen
1280 );
1281 }
1282 applied_chunks = applied_chunks.saturating_add(1);
1283 trim_transient_allocations();
1284 }
1285
1286 if applied_chunks == 0 {
1287 return Err(last_error
1288 .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
1289 .context("run mirror history sync"));
1290 }
1291 if failed_chunks > 0 {
1292 warn!(
1293 "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
1294 applied_chunks, failed_chunks
1295 );
1296 }
1297 Ok(())
1298 }
1299
1300 async fn history_sync_author_chunk(
1301 &self,
1302 current_root: Option<hashtree_core::Cid>,
1303 authors: Vec<String>,
1304 kinds: &[u16],
1305 full_author_history: bool,
1306 max_relay_pages: Option<usize>,
1307 ) -> Result<CrawlReport> {
1308 let mut last_error = None;
1309 let mut report = None;
1310 let mut plan = self.history_sync_plan(authors.len(), kinds);
1311 if full_author_history {
1312 plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
1313 plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
1314 }
1315 for attempt in 0..3 {
1316 let mut last_logged_authors = 0usize;
1317 let bridge = NostrBridge::new(
1318 self.store.store_arc(),
1319 CrawlConfig {
1320 relays: self.config.relays.clone(),
1321 author_allowlist: Some(authors.clone()),
1322 max_live_bytes: None,
1323 max_events_seen: None,
1324 max_authors: None,
1325 max_follow_distance: None,
1326 author_batch_size: plan.author_batch_size,
1327 per_author_event_limit: plan.per_author_event_limit,
1328 per_author_live_bytes: None,
1329 fetch_timeout: self.config.fetch_timeout,
1330 kinds: Some(kinds.to_vec()),
1331 relay_fetch_mode: plan.relay_fetch_mode,
1332 require_negentropy: self.config.require_negentropy,
1333 relay_event_max_size: self.config.relay_event_max_size,
1334 relay_page_size: plan.relay_page_size,
1335 max_relay_pages: plan.max_relay_pages,
1336 full_author_history,
1337 },
1338 );
1339
1340 let crawl = bridge.crawl_with_progress(
1341 self.graph_store.as_ref(),
1342 current_root.as_ref(),
1343 |progress| {
1344 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
1345 let should_log = progress.authors_processed == progress.authors_considered
1346 || progress.authors_processed == 0
1347 || progress
1348 .authors_processed
1349 .saturating_sub(last_logged_authors)
1350 >= log_interval;
1351 if should_log {
1352 last_logged_authors = progress.authors_processed;
1353 info!(
1354 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
1355 progress.authors_processed,
1356 progress.authors_considered,
1357 progress.events_selected,
1358 progress.events_seen
1359 );
1360 }
1361 },
1362 );
1363 let crawl_result: Result<CrawlReport> = if full_author_history {
1364 let timeout = self.config.fetch_timeout.saturating_mul(4);
1365 match tokio::time::timeout(timeout, crawl).await {
1366 Ok(result) => result.map_err(Into::into),
1367 Err(_) => Err(anyhow::anyhow!(
1368 "full author history crawl timed out after {:?} for {} author(s)",
1369 timeout,
1370 authors.len()
1371 )),
1372 }
1373 } else {
1374 crawl.await.map_err(Into::into)
1375 };
1376
1377 match crawl_result {
1378 Ok(next_report) => {
1379 report = Some(next_report);
1380 break;
1381 }
1382 Err(err) => {
1383 last_error = Some(err);
1384 if full_author_history {
1385 break;
1386 }
1387 if attempt < 2 {
1388 tokio::time::sleep(Duration::from_millis(500)).await;
1389 }
1390 }
1391 }
1392 }
1393 report
1394 .ok_or_else(|| last_error.expect("history sync retry captured error"))
1395 .context("run mirror history sync")
1396 }
1397
1398 #[cfg(test)]
1399 async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
1400 self.apply_history_root_with_options(root, true, true, None)
1401 .await
1402 }
1403
1404 async fn apply_history_root_with_options(
1405 &self,
1406 root: Option<&hashtree_core::Cid>,
1407 update_profile_and_graph: bool,
1408 publish_roots: bool,
1409 applied_events: Option<&[hashtree_nostr::StoredNostrEvent]>,
1410 ) -> Result<()> {
1411 self.graph_store.write_public_events_root(root)?;
1412 let Some(root) = root else {
1413 return Ok(());
1414 };
1415
1416 self.note_public_events_root_change()?;
1417 if update_profile_and_graph {
1418 let events = match applied_events {
1419 Some(events) => events
1420 .iter()
1421 .cloned()
1422 .map(socialgraph::stored_event_to_nostr_event)
1423 .collect::<Result<Vec<_>>>()?,
1424 None => {
1425 let event_store = NostrEventStore::new(self.store.store_arc());
1426 event_store
1427 .list_recent_lossy(Some(root), ListEventsOptions::default())
1428 .await
1429 .context("list trusted mirrored events")?
1430 .into_iter()
1431 .map(socialgraph::stored_event_to_nostr_event)
1432 .collect::<Result<Vec<_>>>()?
1433 }
1434 };
1435
1436 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
1437 .context("sync mirrored social graph state")?;
1438 if applied_events.is_some() {
1439 self.graph_store
1440 .sync_profile_index_for_events(&events)
1441 .context("update mirrored profile search index")?;
1442 } else {
1443 self.graph_store
1444 .rebuild_profile_index_for_events(&events)
1445 .context("rebuild mirrored profile search index")?;
1446 }
1447 self.note_profile_search_root_change()?;
1448 self.note_profiles_by_pubkey_root_change()?;
1449 }
1450 if !publish_roots {
1451 return Ok(());
1452 }
1453 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
1454 .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
1455 .await;
1456 if let Err(err) = event_result {
1457 warn!(
1458 "Nostr mirror event-root publish failed after root update: {:#}",
1459 err
1460 );
1461 }
1462 if let Err(err) = profile_search_result {
1463 warn!(
1464 "Nostr mirror profile-search publish failed after root update: {:#}",
1465 err
1466 );
1467 }
1468 if let Err(err) = profiles_by_pubkey_result {
1469 warn!(
1470 "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
1471 err
1472 );
1473 }
1474 Ok(())
1475 }
1476
1477 async fn subscribe_authors_since(
1478 &self,
1479 authors: &[String],
1480 since: Timestamp,
1481 subscribed_authors: &mut HashSet<String>,
1482 ) -> Result<()> {
1483 let new_authors = authors
1484 .iter()
1485 .filter(|author| !subscribed_authors.contains(*author))
1486 .cloned()
1487 .collect::<Vec<_>>();
1488 if new_authors.is_empty() {
1489 return Ok(());
1490 }
1491
1492 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1493 let pubkeys = chunk
1494 .iter()
1495 .filter_map(|author| PublicKey::from_hex(author).ok())
1496 .collect::<Vec<_>>();
1497 if pubkeys.is_empty() {
1498 continue;
1499 }
1500
1501 let filter = Filter::new()
1502 .authors(pubkeys)
1503 .kinds(self.config.kinds.iter().copied().map(Kind::from))
1504 .since(since);
1505
1506 if let Err(err) = self.client.subscribe(vec![filter], None).await {
1507 warn!(
1508 "Nostr mirror author subscription failed: authors={} error={:#}",
1509 chunk.len(),
1510 err
1511 );
1512 continue;
1513 }
1514 subscribed_authors.extend(chunk.iter().cloned());
1515 }
1516 Ok(())
1517 }
1518
1519 fn ingest_live_event(&self, event: &Event) -> Result<()> {
1520 self.pending_live_events
1521 .lock()
1522 .expect("pending live events")
1523 .insert(event.id.to_hex(), event.clone());
1524 Ok(())
1525 }
1526
1527 async fn flush_live_events(&self) -> Result<()> {
1528 let pending = {
1529 let mut pending = self
1530 .pending_live_events
1531 .lock()
1532 .expect("pending live events");
1533 if pending.is_empty() {
1534 return Ok(());
1535 }
1536 std::mem::take(&mut *pending)
1537 };
1538 let events = pending.into_values().collect::<Vec<_>>();
1539 let event_count = events.len();
1540 let previous_event_root = self.graph_store.public_events_root()?;
1541 let previous_profile_search_root = self.graph_store.profile_search_root()?;
1542 let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1543
1544 socialgraph::ingest_parsed_events_with_storage_class(
1545 self.graph_store.as_ref(),
1546 &events,
1547 socialgraph::EventStorageClass::Public,
1548 )
1549 .context("ingest live mirrored event batch")?;
1550
1551 let next_event_root = self.graph_store.public_events_root()?;
1552 let next_profile_search_root = self.graph_store.profile_search_root()?;
1553 let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1554 let event_root_changed = next_event_root != previous_event_root;
1555 let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1556 let profiles_by_pubkey_root_changed =
1557 next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1558
1559 if event_root_changed {
1560 self.note_public_events_root_change()?;
1561 }
1562 if profile_search_root_changed {
1563 self.note_profile_search_root_change()?;
1564 }
1565 if profiles_by_pubkey_root_changed {
1566 self.note_profiles_by_pubkey_root_change()?;
1567 }
1568 if profile_search_root_changed {
1569 self.maybe_publish_profile_search_root(false).await?;
1570 }
1571 if profiles_by_pubkey_root_changed {
1572 self.maybe_publish_profiles_by_pubkey_root(false).await?;
1573 }
1574 if event_root_changed {
1575 self.maybe_publish_event_root(false).await?;
1576 }
1577 info!(
1578 "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1579 event_count,
1580 event_root_changed,
1581 profile_search_root_changed,
1582 profiles_by_pubkey_root_changed
1583 );
1584 Ok(())
1585 }
1586
1587 fn note_public_events_root_change(&self) -> Result<()> {
1588 let root = self.graph_store.public_events_root()?;
1589 Self::note_root_change(
1590 self.config.published_event_tree_name.as_deref(),
1591 &self.event_publish_state,
1592 root,
1593 )
1594 }
1595
1596 fn note_profile_search_root_change(&self) -> Result<()> {
1597 let root = self.graph_store.profile_search_root()?;
1598 Self::note_root_change(
1599 self.config.published_profile_search_tree_name.as_deref(),
1600 &self.profile_search_publish_state,
1601 root,
1602 )
1603 }
1604
1605 fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1606 let root = self.graph_store.profiles_by_pubkey_root()?;
1607 Self::note_root_change(
1608 self.config
1609 .published_profiles_by_pubkey_tree_name
1610 .as_deref(),
1611 &self.profiles_by_pubkey_publish_state,
1612 root,
1613 )
1614 }
1615
1616 fn note_root_change(
1617 tree_name: Option<&str>,
1618 publish_state: &Arc<Mutex<RootPublishState>>,
1619 root: Option<hashtree_core::Cid>,
1620 ) -> Result<()> {
1621 let Some(_tree_name) = tree_name else {
1622 return Ok(());
1623 };
1624
1625 let mut state = publish_state.lock().expect("root publish state");
1626 let now = Instant::now();
1627
1628 if state.pending_root == root {
1629 return Ok(());
1630 }
1631
1632 state.pending_root = root;
1633 state.last_upload_failed_at = None;
1634 state.last_upload_error = None;
1635 state.last_changed_at = Some(now);
1636 if state.dirty_since.is_none() {
1637 state.dirty_since = Some(now);
1638 }
1639 Ok(())
1640 }
1641
1642 async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1643 if self.take_missing_blob_event_upload_error() {
1644 return self.rebuild_event_indexes_after_missing_blobs(force).await;
1645 }
1646 let result = self
1647 .maybe_publish_root(
1648 self.config.published_event_tree_name.as_deref(),
1649 &self.event_publish_state,
1650 "event root",
1651 force,
1652 false,
1653 )
1654 .await;
1655 let Err(error) = result else {
1656 if self.take_missing_blob_event_upload_error() {
1657 return self.rebuild_event_indexes_after_missing_blobs(force).await;
1658 }
1659 return Ok(());
1660 };
1661 if !is_missing_local_blob_push_error(&error) {
1662 return Err(error);
1663 }
1664
1665 self.rebuild_event_indexes_after_missing_blobs(force).await
1666 }
1667
1668 fn take_missing_blob_event_upload_error(&self) -> bool {
1669 let mut state = self
1670 .event_publish_state
1671 .lock()
1672 .expect("event root publish state");
1673 if state.upload_in_progress_root.is_some() {
1674 return false;
1675 }
1676 if !state.missing_blob_rebuild_required {
1677 return false;
1678 }
1679 state.missing_blob_rebuild_required = false;
1680 state.last_upload_error = None;
1681 state.last_upload_failed_at = None;
1682 true
1683 }
1684
1685 async fn rebuild_event_indexes_after_missing_blobs(&self, force: bool) -> Result<()> {
1686 warn!(
1687 "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1688 );
1689 let (public_count, ambient_count) = self
1690 .graph_store
1691 .rebuild_event_indexes_from_stored_events_async()
1692 .await
1693 .context("rebuild event indexes after missing event blobs")?;
1694 info!(
1695 "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1696 public_count, ambient_count
1697 );
1698 self.sync_publish_roots_from_store()?;
1699
1700 self.maybe_publish_root(
1701 self.config.published_event_tree_name.as_deref(),
1702 &self.event_publish_state,
1703 "event root",
1704 force,
1705 false,
1706 )
1707 .await
1708 }
1709
1710 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1711 self.maybe_publish_root(
1712 self.config.published_profile_search_tree_name.as_deref(),
1713 &self.profile_search_publish_state,
1714 "profile search root",
1715 force,
1716 true,
1717 )
1718 .await
1719 }
1720
1721 async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1722 self.maybe_publish_root(
1723 self.config
1724 .published_profiles_by_pubkey_tree_name
1725 .as_deref(),
1726 &self.profiles_by_pubkey_publish_state,
1727 "profiles-by-pubkey root",
1728 force,
1729 true,
1730 )
1731 .await
1732 }
1733
1734 async fn maybe_publish_root(
1735 &self,
1736 tree_name: Option<&str>,
1737 publish_state: &Arc<Mutex<RootPublishState>>,
1738 log_label: &str,
1739 force: bool,
1740 publish_before_upload_ready_on_force: bool,
1741 ) -> Result<()> {
1742 let Some(tree_name) = tree_name else {
1743 return Ok(());
1744 };
1745
1746 let pending_root = {
1747 let state = publish_state.lock().expect("root publish state");
1748 let Some(pending_root) = state.pending_root.clone() else {
1749 return Ok(());
1750 };
1751
1752 let now = Instant::now();
1753 let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1754 now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1755 });
1756 let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1757 now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1758 });
1759 if !force && !debounce_ready && !stale_ready {
1760 return Ok(());
1761 }
1762
1763 pending_root
1764 };
1765
1766 let upload_started = self.maybe_start_background_root_upload(
1767 tree_name,
1768 &pending_root,
1769 publish_state,
1770 log_label,
1771 );
1772 let upload_required = !self.config.blossom_write_servers.is_empty();
1773 let (upload_ready, publish_root) = {
1774 let state = publish_state.lock().expect("root publish state");
1775 let upload_ready =
1776 !upload_required || state.last_uploaded_root.as_ref() == Some(&pending_root);
1777 let publish_root = if upload_ready {
1778 Some(pending_root.clone())
1779 } else {
1780 state.last_uploaded_root.clone().filter(|uploaded_root| {
1781 state.last_published_root.as_ref() != Some(uploaded_root)
1782 })
1783 };
1784 (upload_ready, publish_root)
1785 };
1786 let publish_before_upload_ready =
1787 force && upload_required && !upload_ready && publish_before_upload_ready_on_force;
1788 let publish_root = if let Some(publish_root) = publish_root {
1789 publish_root
1790 } else if publish_before_upload_ready {
1791 pending_root.clone()
1792 } else {
1793 if upload_started {
1794 info!(
1795 "Nostr mirror uploading {} DAG before publish: tree={} hash={}",
1796 log_label,
1797 tree_name,
1798 hex::encode(pending_root.hash),
1799 );
1800 }
1801 return Ok(());
1802 };
1803
1804 let mut successful_relays = Vec::new();
1805 let mut failed_relays = Vec::new();
1806 let mut published_now = false;
1807 let publish_required =
1808 self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1809 if publish_required {
1810 let Some(publish_client) = self.publish_client.as_ref() else {
1811 unreachable!("publish_required implies publish_client");
1812 };
1813 if !self.has_connected_publish_relay().await {
1814 return Ok(());
1815 }
1816 if publish_before_upload_ready {
1817 info!(
1818 "Nostr mirror publishing {} before Blossom upload completes: tree={} hash={}",
1819 log_label,
1820 tree_name,
1821 hex::encode(pending_root.hash),
1822 );
1823 } else if !upload_ready {
1824 info!(
1825 "Nostr mirror publishing uploaded {} while newer root is still uploading: tree={} published_hash={} pending_hash={}",
1826 log_label,
1827 tree_name,
1828 hex::encode(publish_root.hash),
1829 hex::encode(pending_root.hash),
1830 );
1831 }
1832
1833 let already_published = {
1834 let state = publish_state.lock().expect("root publish state");
1835 state.last_published_root.as_ref() == Some(&publish_root)
1836 };
1837 if !already_published {
1838 let publish_relays = self.config.publish_relays.clone();
1839 let latest_known_created_at = {
1840 let state = publish_state.lock().expect("root publish state");
1841 state.last_published_created_at
1842 };
1843 let publish_created_at =
1844 next_replaceable_created_at(Timestamp::now(), latest_known_created_at);
1845 let event = publish_client
1846 .sign_event_builder(Self::build_public_root_event(
1847 tree_name,
1848 &publish_root,
1849 publish_created_at,
1850 ))
1851 .await
1852 .with_context(|| format!("sign {log_label} event"))?;
1853 let publish_result = self
1854 .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1855 .await
1856 .with_context(|| format!("publish {log_label} event"))?;
1857 successful_relays = publish_result.0;
1858 failed_relays = publish_result.1;
1859 if successful_relays.is_empty() {
1860 let failure_summary = if failed_relays.is_empty() {
1861 "no publish relays accepted the event".to_string()
1862 } else {
1863 failed_relays.join("; ")
1864 };
1865 anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1866 }
1867
1868 let mut state = publish_state.lock().expect("root publish state");
1869 if state.pending_root.as_ref() == Some(&pending_root)
1870 || state.last_uploaded_root.as_ref() == Some(&publish_root)
1871 || publish_before_upload_ready
1872 {
1873 state.last_published_root = Some(publish_root.clone());
1874 state.last_published_at = Some(Instant::now());
1875 state.last_published_created_at = Some(event.created_at);
1876 }
1877 published_now = true;
1878 }
1879 }
1880
1881 {
1882 let mut state = publish_state.lock().expect("root publish state");
1883 if state.pending_root.as_ref() == Some(&pending_root) {
1884 let upload_satisfied = self.config.blossom_write_servers.is_empty()
1885 || state.last_uploaded_root.as_ref() == Some(&pending_root);
1886 let publish_satisfied =
1887 !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1888 if upload_satisfied && publish_satisfied {
1889 state.dirty_since = None;
1890 }
1891 }
1892 }
1893
1894 if published_now {
1895 info!(
1896 "Nostr mirror published {}: tree={} hash={} relays={:?}",
1897 log_label,
1898 tree_name,
1899 hex::encode(publish_root.hash),
1900 successful_relays,
1901 );
1902 }
1903 if !failed_relays.is_empty() {
1904 warn!(
1905 "Nostr mirror publish had relay failures: tree={} failures={:?}",
1906 tree_name, failed_relays
1907 );
1908 }
1909 Ok(())
1910 }
1911
1912 fn maybe_start_background_root_upload(
1913 &self,
1914 tree_name: &str,
1915 pending_root: &hashtree_core::Cid,
1916 publish_state: &Arc<Mutex<RootPublishState>>,
1917 log_label: &str,
1918 ) -> bool {
1919 if self.config.blossom_write_servers.is_empty() {
1920 return false;
1921 }
1922
1923 let previous_uploaded_root = {
1924 let mut state = publish_state.lock().expect("root publish state");
1925 if state.last_uploaded_root.as_ref() == Some(pending_root)
1926 || state.upload_in_progress_root.is_some()
1927 {
1928 return false;
1929 }
1930 if state
1931 .last_upload_failed_at
1932 .is_some_and(|failed_at| failed_at.elapsed() < MIRROR_ROOT_UPLOAD_RETRY_INTERVAL)
1933 {
1934 return false;
1935 }
1936 state.upload_in_progress_root = Some(pending_root.clone());
1937 state.last_uploaded_root.clone()
1938 };
1939
1940 let store = Arc::clone(&self.store);
1941 let upload_state_path = Self::uploaded_root_state_path(store.base_path(), tree_name);
1942 let servers = self.config.blossom_write_servers.clone();
1943 let root = pending_root.clone();
1944 let publish_state = Arc::clone(publish_state);
1945 let log_label = log_label.to_string();
1946 tokio::task::spawn_blocking(move || {
1947 let runtime = tokio::runtime::Builder::new_current_thread()
1948 .enable_all()
1949 .build()
1950 .expect("build nostr mirror root upload runtime");
1951 runtime.block_on(async move {
1952 let result = background_blossom_push_incremental_with_store(
1953 store,
1954 root.clone(),
1955 previous_uploaded_root,
1956 &servers,
1957 )
1958 .await;
1959 let mut state = publish_state.lock().expect("root publish state");
1960 if state.upload_in_progress_root.as_ref() == Some(&root) {
1961 state.upload_in_progress_root = None;
1962 }
1963 match result {
1964 Ok(()) => {
1965 if let Err(err) =
1966 Self::write_uploaded_root_state(&upload_state_path, &root, &log_label)
1967 {
1968 warn!("Nostr mirror failed to persist uploaded root state: {err:#}");
1969 }
1970 state.last_uploaded_root = Some(root.clone());
1971 state.last_uploaded_at = Some(Instant::now());
1972 if state.pending_root.as_ref() == Some(&root) {
1973 state.last_upload_failed_at = None;
1974 state.last_upload_error = None;
1975 state.missing_blob_rebuild_required = false;
1976 }
1977 info!(
1978 "Nostr mirror uploaded {} DAG to Blossom: hash={}",
1979 log_label,
1980 hex::encode(root.hash)
1981 );
1982 }
1983 Err(err) => {
1984 if state.pending_root.as_ref() == Some(&root) {
1985 state.last_upload_failed_at = Some(Instant::now());
1986 state.last_upload_error = Some(format!("{err:#}"));
1987 }
1988 if is_missing_local_blob_message(&format!("{err:#}")) {
1989 state.missing_blob_rebuild_required = true;
1990 }
1991 warn!(
1992 "Nostr mirror {} DAG upload failed: hash={} error={:#}",
1993 log_label,
1994 hex::encode(root.hash),
1995 err
1996 );
1997 }
1998 }
1999 });
2000 trim_transient_allocations();
2001 });
2002
2003 true
2004 }
2005
2006 async fn publish_root_event_to_relays(
2007 &self,
2008 publish_client: &Client,
2009 relays: &[String],
2010 event: &Event,
2011 ) -> Result<(Vec<String>, Vec<String>)> {
2012 let primary_send = Self::send_root_event_to_relays(publish_client, relays, event);
2013 let (mut successful_relays, mut failed_relays) =
2014 match tokio::time::timeout(MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT, primary_send).await {
2015 Ok(result) => result,
2016 Err(_) => (
2017 Vec::new(),
2018 vec![format!(
2019 "publish relays: primary publish timed out after {:?}",
2020 MIRROR_ROOT_PUBLISH_PRIMARY_TIMEOUT
2021 )],
2022 ),
2023 };
2024
2025 if successful_relays.is_empty() {
2026 let (retry_successes, retry_failures) = self
2027 .publish_root_event_with_fresh_client(relays, event)
2028 .await;
2029 successful_relays.extend(retry_successes);
2030 failed_relays.extend(retry_failures);
2031 }
2032
2033 Ok((successful_relays, failed_relays))
2034 }
2035
2036 async fn send_root_event_to_relays(
2037 publish_client: &Client,
2038 relays: &[String],
2039 event: &Event,
2040 ) -> (Vec<String>, Vec<String>) {
2041 let mut successful_relays = Vec::new();
2042 let mut failed_relays = Vec::new();
2043
2044 match publish_client
2045 .send_event_to(relays.iter().map(|relay| relay.as_str()), event.clone())
2046 .await
2047 {
2048 Ok(output) => {
2049 for relay in relays {
2050 let relay_url = relay.trim_end_matches('/');
2051 if output
2052 .success
2053 .iter()
2054 .any(|url| url.as_str().trim_end_matches('/') == relay_url)
2055 {
2056 successful_relays.push(relay.clone());
2057 }
2058 }
2059 failed_relays.extend(output.failed.into_iter().map(|(url, reason)| match reason {
2060 Some(reason) => format!("{url}: {reason}"),
2061 None => format!("{url}: relay rejected publish"),
2062 }));
2063 }
2064 Err(err) => {
2065 failed_relays.push(format!("publish relays: {err}"));
2066 }
2067 }
2068
2069 (successful_relays, failed_relays)
2070 }
2071
2072 async fn publish_root_event_with_fresh_client(
2073 &self,
2074 relays: &[String],
2075 event: &Event,
2076 ) -> (Vec<String>, Vec<String>) {
2077 let client = Client::with_opts(Keys::generate(), Options::new().wait_for_send(false));
2078 let mut setup_failures = Vec::new();
2079 for relay in relays {
2080 if let Err(err) = client.add_relay(relay).await {
2081 setup_failures.push(format!("{relay}: add relay failed: {err}"));
2082 }
2083 }
2084
2085 client.connect().await;
2086 let publish = Self::send_root_event_to_relays(&client, relays, event);
2087 let retry_timeout = self
2088 .config
2089 .fetch_timeout
2090 .min(MIRROR_ROOT_PUBLISH_RETRY_TIMEOUT);
2091 let result = tokio::time::timeout(retry_timeout, publish).await;
2092 let _ = client.disconnect().await;
2093
2094 match result {
2095 Ok((successful_relays, mut failed_relays)) => {
2096 failed_relays.extend(setup_failures);
2097 (successful_relays, failed_relays)
2098 }
2099 Err(_) => {
2100 setup_failures.push(format!(
2101 "fresh publish client timed out after {:?}",
2102 retry_timeout
2103 ));
2104 (Vec::new(), setup_failures)
2105 }
2106 }
2107 }
2108
2109 fn build_public_root_event(
2110 tree_name: &str,
2111 cid: &hashtree_core::Cid,
2112 created_at: Timestamp,
2113 ) -> EventBuilder {
2114 let mut tags = vec![
2115 Tag::identifier(tree_name.to_string()),
2116 Tag::custom(
2117 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
2118 vec!["hashtree"],
2119 ),
2120 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
2121 ];
2122 if let Some(key) = cid.key {
2123 tags.push(Tag::custom(
2124 TagKind::Custom("key".into()),
2125 vec![hex::encode(key)],
2126 ));
2127 }
2128
2129 EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
2130 }
2131}
2132
2133fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
2134 error
2135 .chain()
2136 .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
2137}
2138
2139fn is_missing_local_blob_message(message: &str) -> bool {
2140 message.contains(MISSING_LOCAL_BLOB_PUSH_ERROR)
2141}
2142
2143fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
2144 match latest_existing {
2145 Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
2146 _ => now,
2147 }
2148}
2149
2150#[cfg(test)]
2151mod tests;