1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3use std::sync::Mutex;
4use std::time::Duration;
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use hashtree_nostr::{
9 CrawlConfig, CrawlReport, ListEventsOptions, NostrBridge, NostrEventStore, RelayFetchMode,
10};
11use nostr::{
12 Alphabet, Event, EventBuilder, Filter, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
13 Timestamp,
14};
15use nostr_sdk::{
16 pool::RelayLimits, prelude::RelayPoolNotification, Client, EventSource, Keys, Options,
17 RelayStatus,
18};
19use tokio::sync::watch;
20use tracing::{debug, info, warn};
21
22use crate::blossom_push::background_blossom_push_with_store;
23use crate::socialgraph::crawler::SOCIALGRAPH_RELAY_EVENT_MAX_SIZE;
24use crate::socialgraph::{self, SocialGraphBackend, SocialGraphStore};
25use crate::HashtreeStore;
26
// Timing knobs for the mirror loop. Test builds use much shorter durations; the
// choice is made at compile time through `cfg!(test)`.

/// First pause at the top of `run`, before any mirroring work starts.
const MIRROR_STARTUP_DELAY: Duration = if cfg!(test) {
    Duration::from_millis(50)
} else {
    Duration::from_secs(8)
};

/// Second pause at the top of `run`, slept right after the startup delay.
const MIRROR_CONNECT_SETTLE_DELAY: Duration = if cfg!(test) {
    Duration::from_millis(250)
} else {
    Duration::from_secs(1)
};

/// Period of the timer that re-collects authors from the social graph.
const MIRROR_AUTHOR_REFRESH_INTERVAL: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};

/// Minimum time between two reconnect-triggered catch-up history syncs.
const MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};

/// Nostr kind number for long-form content.
const KIND_LONG_FORM_CONTENT: u16 = 30_023;
/// Kinds mirrored when the configuration does not override them.
const DEFAULT_HISTORY_KINDS: [u16; 7] = [0, 1, 3, 6, 7, 9735, KIND_LONG_FORM_CONTENT];
/// Default names for the three published trees.
const DEFAULT_EVENT_TREE_NAME: &str = "nostr-event-index";
const DEFAULT_PROFILE_SEARCH_TREE_NAME: &str = "profile-search";
const DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME: &str = "profiles-by-pubkey";
/// Per-author event limit used by metadata-only history syncs.
const METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 1;
/// Upper bound on the author batch size used by metadata-only history syncs.
const METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE: usize = 64;
/// Default follow distance for the full text-note history pass.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE: u32 = 2;
/// Default relay page budget for the full text-note history pass.
// NOTE(review): the meaning of 0 is decided by the crawler, not here — confirm.
const DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES: usize = 0;
/// A sync counts as "large" once it covers more than this many author batches.
const LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER: usize = 8;
/// Per-author event cap applied to large syncs.
const LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT: usize = 16;
/// Relay page budget applied to large syncs.
const LARGE_HISTORY_SYNC_MAX_RELAY_PAGES: usize = 20;

/// Minimum time between two periodic missing-profile backfills.
const MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(300)
};

/// Period of the timer that flushes live events and publishes pending roots.
const MIRROR_ROOT_PUBLISH_DEBOUNCE: Duration = if cfg!(test) {
    Duration::from_millis(20)
} else {
    Duration::from_secs(5)
};

// NOTE(review): presumably the longest a changed root may stay unpublished; the code
// that reads this constant is defined elsewhere — confirm there.
const MIRROR_ROOT_PUBLISH_MAX_STALENESS: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(30)
};

// NOTE(review): presumably matched against blossom push error text; the code that
// reads this constant is defined elsewhere — confirm there.
const MISSING_LOCAL_BLOB_PUSH_ERROR: &str = "missing local blob while pushing DAG";
76
/// Settings for [`BackgroundNostrMirror`].
#[derive(Clone, Debug)]
pub struct NostrMirrorConfig {
    // --- endpoints ---
    /// Relays the mirror reads from. `run` returns immediately when this is empty.
    pub relays: Vec<String>,
    /// Relays for the publish client; no publish client is built when this is empty.
    pub publish_relays: Vec<String>,
    // NOTE(review): presumably the blossom servers roots are uploaded to; the code that
    // reads this field is defined elsewhere — confirm there.
    pub blossom_write_servers: Vec<String>,

    // --- author selection ---
    /// Largest follow distance whose authors are mirrored. `run` returns immediately
    /// when this is 0.
    pub max_follow_distance: u32,
    /// Threshold handed to the social graph's overmute check; overmuted authors are skipped.
    pub overmute_threshold: f64,

    // --- batching and limits ---
    /// Authors per live subscription filter and per crawl batch (values below 1 act as 1).
    pub author_batch_size: usize,
    /// Authors handled per history-sync chunk (values below 1 act as 1).
    pub history_sync_author_chunk_size: usize,
    /// Per-author event limit for history syncs (values below 1 act as 1).
    pub history_sync_per_author_event_limit: usize,
    /// Maximum authors returned by one missing-profile scan; 0 disables the backfill.
    pub missing_profile_backfill_batch_size: usize,
    /// Fetch timeout passed to the history crawler.
    pub fetch_timeout: Duration,
    /// Optional maximum event size, applied to the read client's relay limits and
    /// passed to the history crawler.
    pub relay_event_max_size: Option<u32>,
    /// Passed to the history crawler as its `require_negentropy` setting.
    pub require_negentropy: bool,
    /// Event kinds used for live subscriptions and for regular history syncs.
    pub kinds: Vec<u16>,

    // --- history sync triggers ---
    /// Run the history syncs once at startup.
    pub history_sync_on_start: bool,
    /// Run a catch-up history sync when a relay transitions to connected.
    pub history_sync_on_reconnect: bool,
    /// Follow distance for the full text-note history pass, capped at
    /// `max_follow_distance`; `None` disables the pass.
    pub full_text_note_history_follow_distance: Option<u32>,
    /// Relay page budget used by the full text-note history pass.
    pub full_text_note_history_max_relay_pages: usize,

    // --- published tree names ---
    // NOTE(review): presumably `None` disables publishing of the corresponding tree; the
    // code that reads these fields is defined elsewhere — confirm there.
    pub published_event_tree_name: Option<String>,
    pub published_profile_search_tree_name: Option<String>,
    pub published_profiles_by_pubkey_tree_name: Option<String>,
}
100
101impl Default for NostrMirrorConfig {
102 fn default() -> Self {
103 Self {
104 relays: Vec::new(),
105 publish_relays: Vec::new(),
106 blossom_write_servers: Vec::new(),
107 max_follow_distance: 2,
108 overmute_threshold: 1.0,
109 author_batch_size: 256,
110 history_sync_author_chunk_size: 5_000,
111 history_sync_per_author_event_limit: 256,
112 missing_profile_backfill_batch_size: 5_000,
113 fetch_timeout: Duration::from_secs(15),
114 relay_event_max_size: Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE),
115 require_negentropy: false,
116 kinds: DEFAULT_HISTORY_KINDS.to_vec(),
117 history_sync_on_start: true,
118 history_sync_on_reconnect: true,
119 full_text_note_history_follow_distance: Some(
120 DEFAULT_FULL_TEXT_NOTE_HISTORY_FOLLOW_DISTANCE,
121 ),
122 full_text_note_history_max_relay_pages: DEFAULT_FULL_TEXT_NOTE_HISTORY_MAX_RELAY_PAGES,
123 published_event_tree_name: Some(DEFAULT_EVENT_TREE_NAME.to_string()),
124 published_profile_search_tree_name: Some(DEFAULT_PROFILE_SEARCH_TREE_NAME.to_string()),
125 published_profiles_by_pubkey_tree_name: Some(
126 DEFAULT_PROFILES_BY_PUBKEY_TREE_NAME.to_string(),
127 ),
128 }
129 }
130}
131
132#[derive(Debug, Default)]
133struct RootPublishState {
134 pending_root: Option<hashtree_core::Cid>,
135 last_changed_at: Option<Instant>,
136 dirty_since: Option<Instant>,
137 last_published_root: Option<hashtree_core::Cid>,
138 last_published_at: Option<Instant>,
139 last_published_created_at: Option<Timestamp>,
140 last_uploaded_root: Option<hashtree_core::Cid>,
141 last_uploaded_at: Option<Instant>,
142}
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145struct HistorySyncPlan {
146 relay_fetch_mode: RelayFetchMode,
147 author_batch_size: usize,
148 per_author_event_limit: usize,
149 relay_page_size: usize,
150 max_relay_pages: usize,
151}
152
153pub struct BackgroundNostrMirror {
154 config: NostrMirrorConfig,
155 store: Arc<HashtreeStore>,
156 graph_store: Arc<SocialGraphStore>,
157 client: Client,
158 publish_client: Option<Client>,
159 publish_pubkey: Option<PublicKey>,
160 event_publish_state: Mutex<RootPublishState>,
161 profile_search_publish_state: Mutex<RootPublishState>,
162 profiles_by_pubkey_publish_state: Mutex<RootPublishState>,
163 pending_live_events: Mutex<BTreeMap<String, Event>>,
164 missing_profile_cursor: Mutex<usize>,
165 shutdown_tx: watch::Sender<bool>,
166 shutdown_rx: watch::Receiver<bool>,
167}
168
169impl BackgroundNostrMirror {
170 pub async fn new(
171 config: NostrMirrorConfig,
172 store: Arc<HashtreeStore>,
173 graph_store: Arc<SocialGraphStore>,
174 publish_keys: Option<Keys>,
175 ) -> Result<Self> {
176 let client = if let Some(max_size) = config.relay_event_max_size {
177 let mut limits = RelayLimits::default();
178 limits.events.max_size = Some(max_size);
179 Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
180 } else {
181 Client::new(Keys::generate())
182 };
183 for relay in &config.relays {
184 client
185 .add_relay(relay)
186 .await
187 .with_context(|| format!("add mirror relay {relay}"))?;
188 }
189 client.connect().await;
190
191 let publish_pubkey = publish_keys.as_ref().map(Keys::public_key);
192 let publish_client = if let Some(keys) = publish_keys {
193 if config.publish_relays.is_empty() {
194 None
195 } else {
196 let client = Client::new(keys);
197 for relay in &config.publish_relays {
198 client
199 .add_relay(relay)
200 .await
201 .with_context(|| format!("add mirror publish relay {relay}"))?;
202 }
203 client.connect().await;
204 Some(client)
205 }
206 } else {
207 None
208 };
209
210 let (shutdown_tx, shutdown_rx) = watch::channel(false);
211 Ok(Self {
212 config,
213 store,
214 graph_store,
215 client,
216 publish_client,
217 publish_pubkey,
218 event_publish_state: Mutex::new(RootPublishState::default()),
219 profile_search_publish_state: Mutex::new(RootPublishState::default()),
220 profiles_by_pubkey_publish_state: Mutex::new(RootPublishState::default()),
221 pending_live_events: Mutex::new(BTreeMap::new()),
222 missing_profile_cursor: Mutex::new(0),
223 shutdown_tx,
224 shutdown_rx,
225 })
226 }
227
228 pub fn shutdown(&self) {
229 let _ = self.shutdown_tx.send(true);
230 }
231
232 fn sync_publish_roots_from_store(&self) -> Result<()> {
233 self.note_public_events_root_change()?;
234 self.note_profile_search_root_change()?;
235 self.note_profiles_by_pubkey_root_change()?;
236 Ok(())
237 }
238
239 async fn publish_pending_roots(
240 &self,
241 force_event: bool,
242 force_profile_search: bool,
243 force_profiles_by_pubkey: bool,
244 ) -> (Result<()>, Result<()>, Result<()>) {
245 tokio::join!(
246 self.maybe_publish_event_root(force_event),
247 self.maybe_publish_profile_search_root(force_profile_search),
248 self.maybe_publish_profiles_by_pubkey_root(force_profiles_by_pubkey),
249 )
250 }
251
252 async fn publish_priority_roots(
253 &self,
254 force_event: bool,
255 force_profile_search: bool,
256 force_profiles_by_pubkey: bool,
257 ) -> (Result<()>, Result<()>, Result<()>) {
258 let (profile_search_result, profiles_by_pubkey_result) = tokio::join!(
259 async {
260 if force_profile_search {
261 self.maybe_publish_profile_search_root(true).await
262 } else {
263 Ok(())
264 }
265 },
266 async {
267 if force_profiles_by_pubkey {
268 self.maybe_publish_profiles_by_pubkey_root(true).await
269 } else {
270 Ok(())
271 }
272 },
273 );
274 let event_result = if force_event {
275 self.maybe_publish_event_root(true).await
276 } else {
277 Ok(())
278 };
279 (
280 event_result,
281 profile_search_result,
282 profiles_by_pubkey_result,
283 )
284 }
285
286 pub async fn run(&self) -> Result<()> {
287 if self.config.relays.is_empty() || self.config.max_follow_distance == 0 {
288 return Ok(());
289 }
290
291 info!(
292 "Nostr mirror starting: relays={} max_follow_distance={} negentropy_only={} kinds={:?} history_sync_author_chunk_size={} history_sync_on_start={} history_sync_on_reconnect={}",
293 self.config.relays.len(),
294 self.config.max_follow_distance,
295 self.config.require_negentropy,
296 self.config.kinds,
297 self.config.history_sync_author_chunk_size.max(1),
298 self.config.history_sync_on_start,
299 self.config.history_sync_on_reconnect
300 );
301
302 tokio::time::sleep(MIRROR_STARTUP_DELAY).await;
303 tokio::time::sleep(MIRROR_CONNECT_SETTLE_DELAY).await;
304 let live_since = Timestamp::now();
305 self.sync_publish_roots_from_store()?;
306
307 let initial_authors = self.collect_authors()?;
308 if initial_authors.is_empty() {
309 info!("Nostr mirror: no social-graph authors to mirror yet");
310 }
311
312 let mut subscribed_authors = HashSet::new();
313 self.subscribe_authors_since(&initial_authors, live_since, &mut subscribed_authors)
314 .await?;
315
316 if !initial_authors.is_empty() && self.config.history_sync_on_start {
317 self.history_sync_full_text_notes_for_reachable_authors()
318 .await?;
319 if self.should_backfill_missing_profiles(None) {
320 let missing_profile_authors = self.collect_missing_profile_authors(
321 self.config.missing_profile_backfill_batch_size,
322 )?;
323 if !missing_profile_authors.is_empty() {
324 info!(
325 "Nostr mirror missing-profile backfill starting: authors={}",
326 missing_profile_authors.len()
327 );
328 self.history_sync_authors_with_kinds(
329 missing_profile_authors,
330 &[Kind::Metadata.as_u16()],
331 )
332 .await?;
333 }
334 }
335 self.history_sync_authors(initial_authors.clone()).await?;
336 }
337
338 let mut relay_statuses = self.capture_relay_statuses().await;
339 let mut last_reconnect_history_sync_at: Option<Instant> = None;
340 let mut last_missing_profile_backfill_at: Option<Instant> = None;
341 let mut notifications = self.client.notifications();
342 let mut shutdown_rx = self.shutdown_rx.clone();
343 let mut refresh_interval = tokio::time::interval(MIRROR_AUTHOR_REFRESH_INTERVAL);
344 let mut publish_interval = tokio::time::interval(MIRROR_ROOT_PUBLISH_DEBOUNCE);
345
346 loop {
347 tokio::select! {
348 _ = shutdown_rx.changed() => {
349 if *shutdown_rx.borrow() {
350 break;
351 }
352 }
353 _ = refresh_interval.tick() => {
354 let authors = self.collect_authors()?;
355 let new_authors = authors
356 .into_iter()
357 .filter(|author| !subscribed_authors.contains(author))
358 .collect::<Vec<_>>();
359 if !new_authors.is_empty() {
360 debug!(
361 "Nostr mirror discovered {} newly reachable author(s)",
362 new_authors.len()
363 );
364 self.history_sync_full_text_notes_for_authors(new_authors.clone())
365 .await?;
366 self.history_sync_authors(new_authors.clone()).await?;
367 self.subscribe_authors_since(
368 &new_authors,
369 Timestamp::now(),
370 &mut subscribed_authors,
371 )
372 .await?;
373 }
374 if self.should_backfill_missing_profiles(last_missing_profile_backfill_at) {
375 let missing_profile_authors = self.collect_missing_profile_authors(
376 self.config.missing_profile_backfill_batch_size,
377 )?;
378 if !missing_profile_authors.is_empty() {
379 info!(
380 "Nostr mirror missing-profile backfill starting: authors={}",
381 missing_profile_authors.len()
382 );
383 self.history_sync_authors_with_kinds(
384 missing_profile_authors,
385 &[Kind::Metadata.as_u16()],
386 )
387 .await?;
388 last_missing_profile_backfill_at = Some(Instant::now());
389 }
390 }
391 }
392 _ = publish_interval.tick() => {
393 self.sync_publish_roots_from_store()?;
394 if let Err(err) = self.flush_live_events().await {
395 warn!("Nostr mirror live event flush failed: {:#}", err);
396 }
397 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
398 .publish_pending_roots(false, false, false)
399 .await;
400 if let Err(err) = event_result {
401 warn!("Nostr mirror event-root publish failed: {:#}", err);
402 }
403 if let Err(err) = profile_search_result {
404 warn!("Nostr mirror profile-search publish failed: {:#}", err);
405 }
406 if let Err(err) = profiles_by_pubkey_result {
407 warn!("Nostr mirror profiles-by-pubkey publish failed: {:#}", err);
408 }
409 }
410 notification = notifications.recv() => {
411 match notification {
412 Ok(RelayPoolNotification::Event { event, .. }) => {
413 self.ingest_live_event(&event)?;
414 }
415 Ok(RelayPoolNotification::RelayStatus { relay_url, status }) => {
416 let relay_url = relay_url.to_string();
417 let previous = relay_statuses.insert(relay_url.clone(), status);
418 if Self::should_history_sync_on_reconnect(
419 self.config.history_sync_on_reconnect,
420 previous,
421 status,
422 ) && Self::should_run_reconnect_history_sync(
423 last_reconnect_history_sync_at.as_ref(),
424 )
425 {
426 let authors = self.collect_authors()?;
427 if !authors.is_empty() {
428 info!(
429 "Nostr mirror relay reconnected; running catch-up history sync: relay={} authors={} negentropy_only={}",
430 relay_url,
431 authors.len(),
432 self.config.require_negentropy
433 );
434 self.history_sync_authors(authors).await?;
435 last_reconnect_history_sync_at = Some(Instant::now());
436 }
437 }
438 }
439 Ok(RelayPoolNotification::Shutdown) => break,
440 Ok(_) => {}
441 Err(err) => {
442 warn!("Nostr mirror notification error: {}", err);
443 break;
444 }
445 }
446 }
447 }
448 }
449
450 if let Err(err) = self.flush_live_events().await {
451 warn!(
452 "Nostr mirror live event flush failed during shutdown: {:#}",
453 err
454 );
455 }
456 if let Err(err) = self.sync_publish_roots_from_store() {
457 warn!(
458 "Nostr mirror root-state refresh failed during shutdown: {:#}",
459 err
460 );
461 }
462 let (event_result, profile_search_result, profiles_by_pubkey_result) =
463 self.publish_pending_roots(true, true, true).await;
464 if let Err(err) = event_result {
465 warn!(
466 "Nostr mirror event-root publish failed during shutdown: {:#}",
467 err
468 );
469 }
470 if let Err(err) = profile_search_result {
471 warn!(
472 "Nostr mirror profile-search publish failed during shutdown: {:#}",
473 err
474 );
475 }
476 if let Err(err) = profiles_by_pubkey_result {
477 warn!(
478 "Nostr mirror profiles-by-pubkey publish failed during shutdown: {:#}",
479 err
480 );
481 }
482 let _ = self.client.disconnect().await;
483 if let Some(client) = self.publish_client.as_ref() {
484 let _ = client.disconnect().await;
485 }
486 Ok(())
487 }
488
489 async fn capture_relay_statuses(&self) -> HashMap<String, RelayStatus> {
490 let mut statuses = HashMap::new();
491 for (relay_url, relay) in self.client.relays().await {
492 statuses.insert(relay_url.to_string(), relay.status().await);
493 }
494 statuses
495 }
496
497 async fn has_connected_publish_relay(&self) -> bool {
498 let Some(client) = self.publish_client.as_ref() else {
499 return false;
500 };
501 Self::client_has_connected_relay(client).await
502 }
503
504 async fn client_has_connected_relay(client: &Client) -> bool {
505 for (_relay_url, relay) in client.relays().await {
506 if relay.status().await == RelayStatus::Connected {
507 return true;
508 }
509 }
510 false
511 }
512
513 fn collect_authors(&self) -> Result<Vec<String>> {
514 self.collect_authors_with_max_distance(self.config.max_follow_distance)
515 }
516
517 fn collect_authors_with_max_distance(&self, max_distance: u32) -> Result<Vec<String>> {
518 let mut authors = Vec::new();
519 let mut seen = HashSet::new();
520 for distance in 0..=max_distance {
521 for pubkey in socialgraph::SocialGraphBackend::users_by_follow_distance(
522 self.graph_store.as_ref(),
523 distance,
524 )
525 .with_context(|| format!("load social-graph distance {distance}"))?
526 {
527 if self
528 .graph_store
529 .is_overmuted_user(&pubkey, self.config.overmute_threshold)?
530 {
531 continue;
532 }
533 let hex = hex::encode(pubkey);
534 if seen.insert(hex.clone()) {
535 authors.push(hex);
536 }
537 }
538 }
539 Ok(authors)
540 }
541
542 fn full_text_note_history_follow_distance(&self) -> Option<u32> {
543 let distance = self.config.full_text_note_history_follow_distance?;
544 if self
545 .config
546 .kinds
547 .iter()
548 .any(|kind| *kind == Kind::TextNote.as_u16() || *kind == KIND_LONG_FORM_CONTENT)
549 {
550 Some(distance.min(self.config.max_follow_distance))
551 } else {
552 None
553 }
554 }
555
556 fn collect_missing_profile_authors(&self, limit: usize) -> Result<Vec<String>> {
557 if limit == 0 {
558 return Ok(Vec::new());
559 }
560
561 let authors = self.collect_authors()?;
562 if authors.is_empty() {
563 return Ok(Vec::new());
564 }
565
566 let mut cursor = self
567 .missing_profile_cursor
568 .lock()
569 .expect("missing profile cursor");
570 let mut index = (*cursor).min(authors.len());
571 let mut scanned = 0usize;
572 let mut missing = Vec::new();
573
574 while scanned < authors.len() && missing.len() < limit {
575 let author = &authors[index];
576 if self.graph_store.latest_profile_event(author)?.is_none() {
577 missing.push(author.clone());
578 }
579 index += 1;
580 if index == authors.len() {
581 index = 0;
582 }
583 scanned += 1;
584 }
585
586 *cursor = index;
587 Ok(missing)
588 }
589
590 fn should_backfill_missing_profiles(&self, last_run: Option<Instant>) -> bool {
591 if self.config.missing_profile_backfill_batch_size == 0
592 || !self.config.kinds.contains(&Kind::Metadata.as_u16())
593 {
594 return false;
595 }
596 match last_run {
597 Some(last_run) => last_run.elapsed() >= MIRROR_MISSING_PROFILE_BACKFILL_INTERVAL,
598 None => true,
599 }
600 }
601
602 fn should_history_sync_on_reconnect(
603 history_sync_on_reconnect: bool,
604 previous: Option<RelayStatus>,
605 status: RelayStatus,
606 ) -> bool {
607 history_sync_on_reconnect
608 && status == RelayStatus::Connected
609 && matches!(
610 previous,
611 Some(
612 RelayStatus::Initialized
613 | RelayStatus::Pending
614 | RelayStatus::Connecting
615 | RelayStatus::Disconnected
616 | RelayStatus::Terminated
617 )
618 )
619 }
620
621 fn should_run_reconnect_history_sync(last_run: Option<&Instant>) -> bool {
622 match last_run {
623 None => true,
624 Some(last_run) => last_run.elapsed() >= MIRROR_RECONNECT_HISTORY_SYNC_COOLDOWN,
625 }
626 }
627
628 fn is_metadata_only_history_sync(kinds: &[u16]) -> bool {
629 !kinds.is_empty() && kinds.iter().all(|kind| *kind == Kind::Metadata.as_u16())
630 }
631
632 fn history_sync_kinds_affect_profile_or_graph(kinds: &[u16]) -> bool {
633 kinds.is_empty()
634 || kinds.iter().any(|kind| {
635 *kind == Kind::Metadata.as_u16()
636 || *kind == Kind::ContactList.as_u16()
637 || *kind == Kind::MuteList.as_u16()
638 })
639 }
640
641 fn history_sync_plan_for(
642 config: &NostrMirrorConfig,
643 authors: usize,
644 kinds: &[u16],
645 ) -> HistorySyncPlan {
646 let author_batch_size = config.author_batch_size.max(1);
647 let per_author_event_limit = config.history_sync_per_author_event_limit.max(1);
648 let relay_page_size = 1_000;
649 let max_relay_pages = 10;
650
651 if Self::is_metadata_only_history_sync(kinds) {
652 return HistorySyncPlan {
653 relay_fetch_mode: RelayFetchMode::AuthorBatches,
654 author_batch_size: author_batch_size.min(METADATA_HISTORY_SYNC_AUTHOR_BATCH_SIZE),
655 per_author_event_limit: METADATA_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT,
656 relay_page_size,
657 max_relay_pages,
658 };
659 }
660
661 if authors > author_batch_size.saturating_mul(LARGE_HISTORY_SYNC_AUTHOR_MULTIPLIER) {
662 return HistorySyncPlan {
663 relay_fetch_mode: RelayFetchMode::GlobalRecent,
664 author_batch_size,
665 per_author_event_limit: per_author_event_limit
666 .min(LARGE_HISTORY_SYNC_PER_AUTHOR_EVENT_LIMIT)
667 .max(1),
668 relay_page_size,
669 max_relay_pages: LARGE_HISTORY_SYNC_MAX_RELAY_PAGES,
670 };
671 }
672
673 HistorySyncPlan {
674 relay_fetch_mode: RelayFetchMode::AuthorBatches,
675 author_batch_size,
676 per_author_event_limit,
677 relay_page_size,
678 max_relay_pages,
679 }
680 }
681
682 fn history_sync_plan(&self, authors: usize, kinds: &[u16]) -> HistorySyncPlan {
683 Self::history_sync_plan_for(&self.config, authors, kinds)
684 }
685
686 async fn history_sync_authors(&self, authors: Vec<String>) -> Result<()> {
687 self.history_sync_authors_with_kinds(authors, &self.config.kinds)
688 .await
689 }
690
691 async fn history_sync_authors_with_kinds(
692 &self,
693 authors: Vec<String>,
694 kinds: &[u16],
695 ) -> Result<()> {
696 self.history_sync_authors_with_kinds_and_mode(authors, kinds, false, None)
697 .await
698 }
699
700 async fn history_sync_full_text_notes_for_reachable_authors(&self) -> Result<()> {
701 let Some(distance) = self.full_text_note_history_follow_distance() else {
702 return Ok(());
703 };
704 info!(
705 "Nostr mirror full text content history author collection starting: max_follow_distance={distance}"
706 );
707 let authors = self.collect_authors_with_max_distance(distance)?;
708 self.history_sync_full_text_notes_for_authors(authors).await
709 }
710
711 async fn history_sync_full_text_notes_for_authors(&self, authors: Vec<String>) -> Result<()> {
712 let Some(distance) = self.full_text_note_history_follow_distance() else {
713 return Ok(());
714 };
715 let mut close_authors = Vec::new();
716 for author in authors {
717 let Ok(pubkey) = hex::decode(&author) else {
718 continue;
719 };
720 let Ok(pubkey) = <[u8; 32]>::try_from(pubkey.as_slice()) else {
721 continue;
722 };
723 if self
724 .graph_store
725 .follow_distance(&pubkey)?
726 .is_some_and(|actual_distance| actual_distance <= distance)
727 {
728 close_authors.push(author);
729 }
730 }
731 if close_authors.is_empty() {
732 return Ok(());
733 }
734
735 info!(
736 "Nostr mirror full text content history sync starting: authors={} max_follow_distance={} max_relay_pages={}",
737 close_authors.len(),
738 distance,
739 self.config.full_text_note_history_max_relay_pages
740 );
741 let kinds = [Kind::TextNote.as_u16(), KIND_LONG_FORM_CONTENT];
742 self.history_sync_authors_with_kinds_and_mode(
743 close_authors,
744 &kinds,
745 true,
746 Some(self.config.full_text_note_history_max_relay_pages),
747 )
748 .await
749 }
750
751 async fn history_sync_authors_with_kinds_and_mode(
752 &self,
753 authors: Vec<String>,
754 kinds: &[u16],
755 full_author_history: bool,
756 max_relay_pages: Option<usize>,
757 ) -> Result<()> {
758 let update_profile_and_graph = Self::history_sync_kinds_affect_profile_or_graph(kinds);
759 self.history_sync_authors_chunked(
760 authors,
761 |current_root, author_chunk| async move {
762 self.history_sync_author_chunk(
763 current_root,
764 author_chunk,
765 kinds,
766 full_author_history,
767 max_relay_pages,
768 )
769 .await
770 },
771 update_profile_and_graph,
772 )
773 .await
774 }
775
776 async fn history_sync_authors_chunked<F, Fut>(
777 &self,
778 authors: Vec<String>,
779 mut run_chunk: F,
780 update_profile_and_graph: bool,
781 ) -> Result<()>
782 where
783 F: FnMut(Option<hashtree_core::Cid>, Vec<String>) -> Fut,
784 Fut: std::future::Future<Output = Result<CrawlReport>>,
785 {
786 if authors.is_empty() {
787 return Ok(());
788 }
789
790 info!(
791 "Nostr mirror history sync starting: authors={} relays={} negentropy_only={}",
792 authors.len(),
793 self.config.relays.len(),
794 self.config.require_negentropy
795 );
796
797 let mut current_root = self.graph_store.public_events_root_for_write()?;
798 let mut last_error = None;
799 let mut applied_chunks = 0usize;
800 let mut failed_chunks = 0usize;
801 let chunk_size = self.config.history_sync_author_chunk_size.max(1);
802 let total_chunks = authors.len().div_ceil(chunk_size);
803
804 for (chunk_index, author_chunk) in authors.chunks(chunk_size).enumerate() {
805 let author_chunk = author_chunk.to_vec();
806 let author_count = author_chunk.len();
807 info!(
808 "Nostr mirror history sync chunk starting: chunk={}/{} authors={}",
809 chunk_index + 1,
810 total_chunks,
811 author_count
812 );
813 let report = match run_chunk(current_root.clone(), author_chunk).await {
814 Ok(report) => report,
815 Err(err) => {
816 failed_chunks = failed_chunks.saturating_add(1);
817 warn!(
818 "Nostr mirror history sync chunk failed: chunk={}/{} authors={} error={:#}",
819 chunk_index + 1,
820 total_chunks,
821 author_count,
822 err
823 );
824 last_error = Some(err);
825 continue;
826 }
827 };
828
829 if report.root != current_root {
830 self.apply_history_root_with_options(
831 report.root.as_ref(),
832 update_profile_and_graph,
833 )
834 .await?;
835 current_root = report.root.clone();
836 info!(
837 "Nostr mirror history sync updated trusted root: chunk={}/{} authors_processed={} events_selected={} events_seen={}",
838 chunk_index + 1,
839 total_chunks,
840 report.authors_processed,
841 report.events_selected,
842 report.events_seen
843 );
844 }
845 applied_chunks = applied_chunks.saturating_add(1);
846 }
847
848 if applied_chunks == 0 {
849 return Err(last_error
850 .unwrap_or_else(|| anyhow::anyhow!("mirror history sync made no progress"))
851 .context("run mirror history sync"));
852 }
853 if failed_chunks > 0 {
854 warn!(
855 "Nostr mirror history sync completed with skipped chunks: applied_chunks={} failed_chunks={}",
856 applied_chunks, failed_chunks
857 );
858 }
859 Ok(())
860 }
861
862 async fn history_sync_author_chunk(
863 &self,
864 current_root: Option<hashtree_core::Cid>,
865 authors: Vec<String>,
866 kinds: &[u16],
867 full_author_history: bool,
868 max_relay_pages: Option<usize>,
869 ) -> Result<CrawlReport> {
870 let mut last_error = None;
871 let mut report = None;
872 let mut plan = self.history_sync_plan(authors.len(), kinds);
873 if full_author_history {
874 plan.relay_fetch_mode = RelayFetchMode::AuthorBatches;
875 plan.max_relay_pages = max_relay_pages.unwrap_or(plan.max_relay_pages);
876 }
877 for attempt in 0..3 {
878 let mut last_logged_authors = 0usize;
879 let bridge = NostrBridge::new(
880 self.store.store_arc(),
881 CrawlConfig {
882 relays: self.config.relays.clone(),
883 author_allowlist: Some(authors.clone()),
884 max_live_bytes: None,
885 max_events_seen: None,
886 max_authors: None,
887 max_follow_distance: None,
888 author_batch_size: plan.author_batch_size,
889 per_author_event_limit: plan.per_author_event_limit,
890 per_author_live_bytes: None,
891 fetch_timeout: self.config.fetch_timeout,
892 kinds: Some(kinds.to_vec()),
893 relay_fetch_mode: plan.relay_fetch_mode,
894 require_negentropy: self.config.require_negentropy,
895 relay_event_max_size: self.config.relay_event_max_size,
896 relay_page_size: plan.relay_page_size,
897 max_relay_pages: plan.max_relay_pages,
898 full_author_history,
899 },
900 );
901
902 match bridge
903 .crawl_with_progress(self.graph_store.as_ref(), current_root.as_ref(), |progress| {
904 let log_interval = self.config.author_batch_size.saturating_mul(8).max(2_048);
905 let should_log = progress.authors_processed == progress.authors_considered
906 || progress.authors_processed == 0
907 || progress
908 .authors_processed
909 .saturating_sub(last_logged_authors)
910 >= log_interval;
911 if should_log {
912 last_logged_authors = progress.authors_processed;
913 info!(
914 "Nostr mirror history sync progress: authors_processed={}/{} events_selected={} events_seen={}",
915 progress.authors_processed,
916 progress.authors_considered,
917 progress.events_selected,
918 progress.events_seen
919 );
920 }
921 })
922 .await
923 {
924 Ok(next_report) => {
925 report = Some(next_report);
926 break;
927 }
928 Err(err) => {
929 last_error = Some(err);
930 if attempt < 2 {
931 tokio::time::sleep(Duration::from_millis(500)).await;
932 }
933 }
934 }
935 }
936 report
937 .ok_or_else(|| last_error.expect("history sync retry captured error"))
938 .context("run mirror history sync")
939 }
940
/// Test-only shortcut: applies `root` and always refreshes profile and graph state.
#[cfg(test)]
async fn apply_history_root(&self, root: Option<&hashtree_core::Cid>) -> Result<()> {
    let update_profile_and_graph = true;
    self.apply_history_root_with_options(root, update_profile_and_graph)
        .await
}
945
946 async fn apply_history_root_with_options(
947 &self,
948 root: Option<&hashtree_core::Cid>,
949 update_profile_and_graph: bool,
950 ) -> Result<()> {
951 self.graph_store.write_public_events_root(root)?;
952 let Some(root) = root else {
953 return Ok(());
954 };
955
956 self.note_public_events_root_change()?;
957 if update_profile_and_graph {
958 let event_store = NostrEventStore::new(self.store.store_arc());
959 let events = event_store
960 .list_recent_lossy(Some(root), ListEventsOptions::default())
961 .await
962 .context("list trusted mirrored events")?
963 .into_iter()
964 .map(socialgraph::stored_event_to_nostr_event)
965 .collect::<Result<Vec<_>>>()?;
966
967 self.graph_store
968 .rebuild_profile_index_for_events(&events)
969 .context("rebuild mirrored profile search index")?;
970 socialgraph::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
971 .context("sync mirrored social graph state")?;
972 self.note_profile_search_root_change()?;
973 self.note_profiles_by_pubkey_root_change()?;
974 }
975 let (event_result, profile_search_result, profiles_by_pubkey_result) = self
976 .publish_priority_roots(true, update_profile_and_graph, update_profile_and_graph)
977 .await;
978 if let Err(err) = event_result {
979 warn!(
980 "Nostr mirror event-root publish failed after root update: {:#}",
981 err
982 );
983 }
984 if let Err(err) = profile_search_result {
985 warn!(
986 "Nostr mirror profile-search publish failed after root update: {:#}",
987 err
988 );
989 }
990 if let Err(err) = profiles_by_pubkey_result {
991 warn!(
992 "Nostr mirror profiles-by-pubkey publish failed after root update: {:#}",
993 err
994 );
995 }
996 Ok(())
997 }
998
999 async fn subscribe_authors_since(
1000 &self,
1001 authors: &[String],
1002 since: Timestamp,
1003 subscribed_authors: &mut HashSet<String>,
1004 ) -> Result<()> {
1005 let new_authors = authors
1006 .iter()
1007 .filter(|author| !subscribed_authors.contains(*author))
1008 .cloned()
1009 .collect::<Vec<_>>();
1010 if new_authors.is_empty() {
1011 return Ok(());
1012 }
1013
1014 for chunk in new_authors.chunks(self.config.author_batch_size.max(1)) {
1015 let pubkeys = chunk
1016 .iter()
1017 .filter_map(|author| PublicKey::from_hex(author).ok())
1018 .collect::<Vec<_>>();
1019 if pubkeys.is_empty() {
1020 continue;
1021 }
1022
1023 let filter = Filter::new()
1024 .authors(pubkeys)
1025 .kinds(self.config.kinds.iter().copied().map(Kind::from))
1026 .since(since);
1027
1028 self.client
1029 .subscribe(vec![filter], None)
1030 .await
1031 .context("subscribe mirror author batch")?;
1032 }
1033
1034 subscribed_authors.extend(new_authors);
1035 Ok(())
1036 }
1037
1038 fn ingest_live_event(&self, event: &Event) -> Result<()> {
1039 self.pending_live_events
1040 .lock()
1041 .expect("pending live events")
1042 .insert(event.id.to_hex(), event.clone());
1043 Ok(())
1044 }
1045
1046 async fn flush_live_events(&self) -> Result<()> {
1047 let pending = {
1048 let mut pending = self
1049 .pending_live_events
1050 .lock()
1051 .expect("pending live events");
1052 if pending.is_empty() {
1053 return Ok(());
1054 }
1055 std::mem::take(&mut *pending)
1056 };
1057 let events = pending.into_values().collect::<Vec<_>>();
1058 let event_count = events.len();
1059 let previous_event_root = self.graph_store.public_events_root()?;
1060 let previous_profile_search_root = self.graph_store.profile_search_root()?;
1061 let previous_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1062
1063 socialgraph::ingest_parsed_events_with_storage_class(
1064 self.graph_store.as_ref(),
1065 &events,
1066 socialgraph::EventStorageClass::Public,
1067 )
1068 .context("ingest live mirrored event batch")?;
1069
1070 let next_event_root = self.graph_store.public_events_root()?;
1071 let next_profile_search_root = self.graph_store.profile_search_root()?;
1072 let next_profiles_by_pubkey_root = self.graph_store.profiles_by_pubkey_root()?;
1073 let event_root_changed = next_event_root != previous_event_root;
1074 let profile_search_root_changed = next_profile_search_root != previous_profile_search_root;
1075 let profiles_by_pubkey_root_changed =
1076 next_profiles_by_pubkey_root != previous_profiles_by_pubkey_root;
1077
1078 if event_root_changed {
1079 self.note_public_events_root_change()?;
1080 }
1081 if profile_search_root_changed {
1082 self.note_profile_search_root_change()?;
1083 }
1084 if profiles_by_pubkey_root_changed {
1085 self.note_profiles_by_pubkey_root_change()?;
1086 }
1087 if profile_search_root_changed {
1088 self.maybe_publish_profile_search_root(true).await?;
1089 }
1090 if profiles_by_pubkey_root_changed {
1091 self.maybe_publish_profiles_by_pubkey_root(true).await?;
1092 }
1093 if event_root_changed {
1094 self.maybe_publish_event_root(true).await?;
1095 }
1096 info!(
1097 "Nostr mirror flushed live events: events={} event_root_changed={} profile_search_root_changed={} profiles_by_pubkey_root_changed={}",
1098 event_count,
1099 event_root_changed,
1100 profile_search_root_changed,
1101 profiles_by_pubkey_root_changed
1102 );
1103 Ok(())
1104 }
1105
1106 fn note_public_events_root_change(&self) -> Result<()> {
1107 let root = self.graph_store.public_events_root()?;
1108 Self::note_root_change(
1109 self.config.published_event_tree_name.as_deref(),
1110 &self.event_publish_state,
1111 root,
1112 )
1113 }
1114
1115 fn note_profile_search_root_change(&self) -> Result<()> {
1116 let root = self.graph_store.profile_search_root()?;
1117 Self::note_root_change(
1118 self.config.published_profile_search_tree_name.as_deref(),
1119 &self.profile_search_publish_state,
1120 root,
1121 )
1122 }
1123
1124 fn note_profiles_by_pubkey_root_change(&self) -> Result<()> {
1125 let root = self.graph_store.profiles_by_pubkey_root()?;
1126 Self::note_root_change(
1127 self.config
1128 .published_profiles_by_pubkey_tree_name
1129 .as_deref(),
1130 &self.profiles_by_pubkey_publish_state,
1131 root,
1132 )
1133 }
1134
1135 fn note_root_change(
1136 tree_name: Option<&str>,
1137 publish_state: &Mutex<RootPublishState>,
1138 root: Option<hashtree_core::Cid>,
1139 ) -> Result<()> {
1140 let Some(_tree_name) = tree_name else {
1141 return Ok(());
1142 };
1143
1144 let mut state = publish_state.lock().expect("root publish state");
1145 let now = Instant::now();
1146
1147 if state.pending_root == root {
1148 return Ok(());
1149 }
1150
1151 state.pending_root = root;
1152 state.last_changed_at = Some(now);
1153 if state.dirty_since.is_none() {
1154 state.dirty_since = Some(now);
1155 }
1156 Ok(())
1157 }
1158
1159 async fn maybe_publish_event_root(&self, force: bool) -> Result<()> {
1160 self.ensure_public_events_root_is_publishable().await?;
1161 let result = self
1162 .maybe_publish_root(
1163 self.config.published_event_tree_name.as_deref(),
1164 &self.event_publish_state,
1165 "event root",
1166 force,
1167 )
1168 .await;
1169 let Err(error) = result else {
1170 return Ok(());
1171 };
1172 if !is_missing_local_blob_push_error(&error) {
1173 return Err(error);
1174 }
1175
1176 warn!(
1177 "Nostr mirror event root DAG references missing local blobs; rebuilding event indexes from stored events"
1178 );
1179 let (public_count, ambient_count) = self
1180 .graph_store
1181 .rebuild_event_indexes_from_stored_events_async()
1182 .await
1183 .context("rebuild event indexes after missing event blobs")?;
1184 info!(
1185 "Nostr mirror rebuilt event indexes after missing blobs: public={} ambient={}",
1186 public_count, ambient_count
1187 );
1188 self.sync_publish_roots_from_store()?;
1189
1190 self.maybe_publish_root(
1191 self.config.published_event_tree_name.as_deref(),
1192 &self.event_publish_state,
1193 "event root",
1194 force,
1195 )
1196 .await
1197 }
1198
1199 async fn ensure_public_events_root_is_publishable(&self) -> Result<()> {
1200 let Some(root) = self.graph_store.public_events_root()? else {
1201 return Ok(());
1202 };
1203 let event_store = NostrEventStore::new(self.store.store_arc());
1204 if let Err(err) = event_store.validate_index_root(Some(&root)).await {
1205 warn!(
1206 "Nostr mirror refusing to publish invalid event index root {}; clearing trusted root: {}",
1207 hex::encode(root.hash),
1208 err
1209 );
1210 self.graph_store.write_public_events_root(None)?;
1211 self.note_public_events_root_change()?;
1212 }
1213 Ok(())
1214 }
1215
1216 async fn maybe_publish_profile_search_root(&self, force: bool) -> Result<()> {
1217 self.maybe_publish_root(
1218 self.config.published_profile_search_tree_name.as_deref(),
1219 &self.profile_search_publish_state,
1220 "profile search root",
1221 force,
1222 )
1223 .await
1224 }
1225
1226 async fn maybe_publish_profiles_by_pubkey_root(&self, force: bool) -> Result<()> {
1227 self.maybe_publish_root(
1228 self.config
1229 .published_profiles_by_pubkey_tree_name
1230 .as_deref(),
1231 &self.profiles_by_pubkey_publish_state,
1232 "profiles-by-pubkey root",
1233 force,
1234 )
1235 .await
1236 }
1237
1238 async fn maybe_publish_root(
1239 &self,
1240 tree_name: Option<&str>,
1241 publish_state: &Mutex<RootPublishState>,
1242 log_label: &str,
1243 force: bool,
1244 ) -> Result<()> {
1245 let Some(tree_name) = tree_name else {
1246 return Ok(());
1247 };
1248
1249 let pending_root = {
1250 let state = publish_state.lock().expect("root publish state");
1251 let Some(pending_root) = state.pending_root.clone() else {
1252 return Ok(());
1253 };
1254
1255 let now = Instant::now();
1256 let debounce_ready = state.last_changed_at.is_some_and(|changed_at| {
1257 now.duration_since(changed_at) >= MIRROR_ROOT_PUBLISH_DEBOUNCE
1258 });
1259 let stale_ready = state.dirty_since.is_some_and(|dirty_since| {
1260 now.duration_since(dirty_since) >= MIRROR_ROOT_PUBLISH_MAX_STALENESS
1261 });
1262 if !force && !debounce_ready && !stale_ready {
1263 return Ok(());
1264 }
1265
1266 pending_root
1267 };
1268
1269 let needs_upload = {
1270 let state = publish_state.lock().expect("root publish state");
1271 !self.config.blossom_write_servers.is_empty()
1272 && state.last_uploaded_root.as_ref() != Some(&pending_root)
1273 };
1274 if needs_upload {
1275 background_blossom_push_with_store(
1276 Arc::clone(&self.store),
1277 &pending_root.to_string(),
1278 &self.config.blossom_write_servers,
1279 )
1280 .await
1281 .with_context(|| format!("upload {log_label} DAG to Blossom"))?;
1282
1283 let mut state = publish_state.lock().expect("root publish state");
1284 if state.pending_root.as_ref() == Some(&pending_root) {
1285 state.last_uploaded_root = Some(pending_root.clone());
1286 state.last_uploaded_at = Some(Instant::now());
1287 }
1288 }
1289
1290 let mut successful_relays = Vec::new();
1291 let mut failed_relays = Vec::new();
1292 let publish_required =
1293 self.publish_client.is_some() && !self.config.publish_relays.is_empty();
1294 if publish_required {
1295 let Some(publish_client) = self.publish_client.as_ref() else {
1296 unreachable!("publish_required implies publish_client");
1297 };
1298 if !self.has_connected_publish_relay().await {
1299 return Ok(());
1300 }
1301
1302 let already_published = {
1303 let state = publish_state.lock().expect("root publish state");
1304 state.last_published_root.as_ref() == Some(&pending_root)
1305 };
1306 if !already_published {
1307 let publish_relays = self.config.publish_relays.clone();
1308 let latest_known_created_at = {
1309 let state = publish_state.lock().expect("root publish state");
1310 state.last_published_created_at
1311 };
1312 let publish_created_at = next_replaceable_created_at(
1313 Timestamp::now(),
1314 later_timestamp(
1315 latest_known_created_at,
1316 self.latest_root_event_created_at(tree_name).await,
1317 ),
1318 );
1319 let event = publish_client
1320 .sign_event_builder(Self::build_public_root_event(
1321 tree_name,
1322 &pending_root,
1323 publish_created_at,
1324 ))
1325 .await
1326 .with_context(|| format!("sign {log_label} event"))?;
1327 let publish_result = self
1328 .publish_root_event_to_relays(publish_client, &publish_relays, &event)
1329 .await
1330 .with_context(|| format!("publish {log_label} event"))?;
1331 successful_relays = publish_result.0;
1332 failed_relays = publish_result.1;
1333 if successful_relays.is_empty() {
1334 let failure_summary = if failed_relays.is_empty() {
1335 "no publish relays accepted the event".to_string()
1336 } else {
1337 failed_relays.join("; ")
1338 };
1339 anyhow::bail!("no publish relays accepted the event ({failure_summary})");
1340 }
1341
1342 let mut state = publish_state.lock().expect("root publish state");
1343 if state.pending_root.as_ref() == Some(&pending_root) {
1344 state.last_published_root = Some(pending_root.clone());
1345 state.last_published_at = Some(Instant::now());
1346 state.last_published_created_at = Some(event.created_at);
1347 }
1348 }
1349 }
1350
1351 {
1352 let mut state = publish_state.lock().expect("root publish state");
1353 if state.pending_root.as_ref() == Some(&pending_root) {
1354 let upload_satisfied = self.config.blossom_write_servers.is_empty()
1355 || state.last_uploaded_root.as_ref() == Some(&pending_root);
1356 let publish_satisfied =
1357 !publish_required || state.last_published_root.as_ref() == Some(&pending_root);
1358 if upload_satisfied && publish_satisfied {
1359 state.dirty_since = None;
1360 }
1361 }
1362 }
1363
1364 info!(
1365 "Nostr mirror published {}: tree={} hash={} relays={:?}",
1366 log_label,
1367 tree_name,
1368 hex::encode(pending_root.hash),
1369 successful_relays,
1370 );
1371 if !failed_relays.is_empty() {
1372 warn!(
1373 "Nostr mirror publish had relay failures: tree={} failures={:?}",
1374 tree_name, failed_relays
1375 );
1376 }
1377 Ok(())
1378 }
1379
1380 async fn publish_root_event_to_relays(
1381 &self,
1382 publish_client: &Client,
1383 relays: &[String],
1384 event: &Event,
1385 ) -> Result<(Vec<String>, Vec<String>)> {
1386 let mut successful_relays = Vec::new();
1387 let mut failed_relays = Vec::new();
1388
1389 for relay in relays {
1390 match publish_client
1391 .send_event_to([relay.as_str()], event.clone())
1392 .await
1393 {
1394 Ok(output) => {
1395 if output.success.is_empty() {
1396 failed_relays.push(format!("{relay}: relay did not acknowledge publish"));
1397 continue;
1398 }
1399 successful_relays.push(relay.clone());
1400 failed_relays.extend(output.failed.into_iter().map(
1401 |(url, reason)| match reason {
1402 Some(reason) => format!("{url}: {reason}"),
1403 None => format!("{url}: relay rejected publish"),
1404 },
1405 ));
1406 }
1407 Err(err) => {
1408 failed_relays.push(format!("{relay}: {err}"));
1409 }
1410 }
1411 }
1412
1413 Ok((successful_relays, failed_relays))
1414 }
1415
1416 async fn latest_root_event_created_at(&self, tree_name: &str) -> Option<Timestamp> {
1417 let publish_client = self.publish_client.as_ref()?;
1418 let author = self.publish_pubkey?;
1419 let events = publish_client
1420 .get_events_of(
1421 vec![Self::build_public_root_filter(author, tree_name)],
1422 EventSource::relays(Some(self.config.fetch_timeout)),
1423 )
1424 .await
1425 .ok()?;
1426 events
1427 .iter()
1428 .filter(|event| Self::matches_public_root_event(event, tree_name))
1429 .max_by_key(|event| (event.created_at, event.id))
1430 .map(|event| event.created_at)
1431 }
1432
1433 fn build_public_root_filter(author: PublicKey, tree_name: &str) -> Filter {
1434 Filter::new()
1435 .kind(Kind::Custom(30078))
1436 .author(author)
1437 .custom_tag(
1438 SingleLetterTag::lowercase(Alphabet::D),
1439 vec![tree_name.to_string()],
1440 )
1441 .custom_tag(
1442 SingleLetterTag::lowercase(Alphabet::L),
1443 vec!["hashtree".to_string()],
1444 )
1445 .limit(50)
1446 }
1447
1448 fn matches_public_root_event(event: &Event, tree_name: &str) -> bool {
1449 event.kind == Kind::Custom(30078)
1450 && event.tags.iter().any(|tag| {
1451 let values = tag.as_slice();
1452 values.first().is_some_and(|value| value == "d")
1453 && values.get(1).is_some_and(|value| value == tree_name)
1454 })
1455 && event.tags.iter().any(|tag| {
1456 let values = tag.as_slice();
1457 values.first().is_some_and(|value| value == "l")
1458 && values.get(1).is_some_and(|value| value == "hashtree")
1459 })
1460 }
1461
1462 fn build_public_root_event(
1463 tree_name: &str,
1464 cid: &hashtree_core::Cid,
1465 created_at: Timestamp,
1466 ) -> EventBuilder {
1467 let mut tags = vec![
1468 Tag::identifier(tree_name.to_string()),
1469 Tag::custom(
1470 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
1471 vec!["hashtree"],
1472 ),
1473 Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(cid.hash)]),
1474 ];
1475 if let Some(key) = cid.key {
1476 tags.push(Tag::custom(
1477 TagKind::Custom("key".into()),
1478 vec![hex::encode(key)],
1479 ));
1480 }
1481
1482 EventBuilder::new(Kind::Custom(30078), "", tags).custom_created_at(created_at)
1483 }
1484}
1485
1486fn is_missing_local_blob_push_error(error: &anyhow::Error) -> bool {
1487 error
1488 .chain()
1489 .any(|cause| cause.to_string().contains(MISSING_LOCAL_BLOB_PUSH_ERROR))
1490}
1491
1492fn later_timestamp(left: Option<Timestamp>, right: Option<Timestamp>) -> Option<Timestamp> {
1493 match (left, right) {
1494 (Some(left), Some(right)) => Some(std::cmp::max(left, right)),
1495 (Some(left), None) => Some(left),
1496 (None, Some(right)) => Some(right),
1497 (None, None) => None,
1498 }
1499}
1500
1501fn next_replaceable_created_at(now: Timestamp, latest_existing: Option<Timestamp>) -> Timestamp {
1502 match latest_existing {
1503 Some(latest) if latest >= now => Timestamp::from_secs(latest.as_u64().saturating_add(1)),
1504 _ => now,
1505 }
1506}
1507
1508#[cfg(test)]
1509mod tests;