1use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::{ListEventsOptions, NostrEventStore, NostrEventStoreError, StoredNostrEvent};
6use futures::{stream, StreamExt};
7use hashtree_core::{Cid, Store};
8use nostr_sdk::{
9 pool::RelayLimits, Client, ClientOptions, EventId, Filter, Keys, Kind, PublicKey, SyncOptions,
10 Timestamp,
11};
12use nostr_social_graph::SocialGraphBackend;
13
// Tuning constants for relay fetching. The NEGENTROPY_* and FULL_HISTORY_* values are
// consumed outside the part of the file reviewed here, so the notes on them are hedged
// readings of the names rather than verified behavior.

// Presumably the number of event ids requested per fetch chunk after a negentropy
// reconciliation — TODO confirm at the use site.
const NEGENTROPY_FETCH_CHUNK_SIZE: usize = 256;
// Presumably how many of those chunks are fetched concurrently — TODO confirm.
const NEGENTROPY_FETCH_CHUNK_CONCURRENCY: usize = 16;
// Presumably the timeout applied to the first negentropy attempt against a relay —
// TODO confirm.
const NEGENTROPY_INITIAL_TIMEOUT: Duration = Duration::from_secs(1);
// Presumably the per-relay concurrency used while paging full author history —
// TODO confirm.
const FULL_HISTORY_PAGING_CONCURRENCY_PER_RELAY: usize = 64;
// Event kind checked when deciding whether an author already has a retained profile
// event during global-recent profile hydration.
const METADATA_KIND: u32 = 0;
/// Strategy used by a crawl to pull events from relays.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelayFetchMode {
    /// Query relays per batch of authors (`crawl_author_batches`).
    AuthorBatches,
    /// Page through each relay's recent events without an author filter and keep only
    /// events from the selected authors (`crawl_global_recent_incremental`).
    GlobalRecent,
}
24
/// Settings that control which authors are crawled, which relays are queried and how
/// much data is retained.
#[derive(Debug, Clone)]
pub struct CrawlConfig {
    /// Relay URLs added to the client. Must not be empty.
    pub relays: Vec<String>,
    /// When set, authors are taken from this list instead of walking the social graph.
    /// Entries that are not valid hex pubkeys are skipped and duplicates are dropped.
    pub author_allowlist: Option<Vec<String>>,
    /// Presumably a cap on the total encoded size of selected events; it is enforced
    /// outside the part of the file reviewed here — TODO confirm.
    pub max_live_bytes: Option<u64>,
    /// Stops the crawl once at least this many events have been seen. `Some(0)` is
    /// rejected by validation.
    pub max_events_seen: Option<usize>,
    /// Upper bound on the number of authors collected.
    pub max_authors: Option<usize>,
    /// Authors at this distance from the graph root (or further) are not expanded into
    /// their follows. `None` leaves the walk unbounded by distance.
    pub max_follow_distance: Option<u32>,
    /// Number of authors handled per batch. Must be greater than zero.
    pub author_batch_size: usize,
    /// Per-author event limit handed to event selection. Must be greater than zero.
    pub per_author_event_limit: usize,
    /// Optional per-author byte cap handed to event selection. `Some(0)` is rejected.
    pub per_author_live_bytes: Option<u64>,
    /// Timeout passed to relay fetch calls.
    pub fetch_timeout: Duration,
    /// When set, relay filters are restricted to these event kinds.
    pub kinds: Option<Vec<u16>>,
    /// Which fetch strategy `crawl_with_progress` dispatches to.
    pub relay_fetch_mode: RelayFetchMode,
    /// Not read in the part of the file reviewed here; presumably makes relays without
    /// negentropy support an error — TODO confirm.
    pub require_negentropy: bool,
    /// When set, installed as the client's relay limit on event size. `Some(0)` is
    /// rejected by validation.
    pub relay_event_max_size: Option<u32>,
    /// Page size (`limit`) used for global-recent relay queries. Must be greater than
    /// zero.
    pub relay_page_size: usize,
    /// Maximum number of pages requested per relay in global-recent mode. Must be
    /// greater than zero unless `full_author_history` is set.
    pub max_relay_pages: usize,
    /// When true, author batches are crawled through the full-history path.
    pub full_author_history: bool,
}
45
impl Default for CrawlConfig {
    /// Returns a configuration with no relays, no caps, author-batch fetching and a
    /// follow distance of one hop.
    ///
    /// The result does not pass `validate_config` as-is because `relays` is empty;
    /// callers are expected to fill it in.
    fn default() -> Self {
        Self {
            relays: Vec::new(),
            author_allowlist: None,
            max_live_bytes: None,
            max_events_seen: None,
            max_authors: None,
            max_follow_distance: Some(1),
            author_batch_size: 64,
            per_author_event_limit: 256,
            per_author_live_bytes: None,
            fetch_timeout: Duration::from_secs(10),
            kinds: None,
            relay_fetch_mode: RelayFetchMode::AuthorBatches,
            require_negentropy: false,
            relay_event_max_size: None,
            relay_page_size: 1_000,
            max_relay_pages: 10,
            full_author_history: false,
        }
    }
}
69
/// Summary of a crawl, also used as the payload of progress callbacks.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CrawlReport {
    /// Root of the event index after the crawl, if one exists.
    pub root: Option<Cid>,
    /// Number of authors selected for crawling.
    pub authors_considered: usize,
    /// Number of authors counted as processed so far. The two crawl modes count this
    /// differently: author-batch mode adds the size of every finished batch, while
    /// global-recent mode counts authors that have at least one retained event.
    pub authors_processed: usize,
    /// Number of events reported by relays.
    pub events_seen: usize,
    /// Number of events that survived selection.
    pub events_selected: usize,
    /// Encoded size, in bytes, attributed to the selected events.
    pub live_bytes_selected: u64,
    /// Events written to the store during this crawl. Intermediate progress reports
    /// leave this empty; only the final report carries the full list.
    pub applied_events: Vec<StoredNostrEvent>,
}
80
/// Pluggable ranking of events used when deciding which events to retain.
pub trait EventSelectionPolicy: Send + Sync {
    /// Returns the priority assigned to `event`.
    ///
    /// NOTE(review): the code that consumes this value is outside the part of the file
    /// reviewed here; presumably larger values are preferred — confirm.
    fn priority(&self, event: &StoredNostrEvent) -> i32;
}
84
/// Selection policy that ranks events purely by their kind.
#[derive(Debug, Clone)]
pub struct KindPriorityPolicy {
    /// Priority returned for kinds that have no entry in `priorities`.
    default_priority: i32,
    /// Explicit priority per event kind.
    priorities: BTreeMap<u32, i32>,
}
90
91impl Default for KindPriorityPolicy {
92 fn default() -> Self {
93 let mut priorities = BTreeMap::new();
94 priorities.insert(1, 1_000);
95 priorities.insert(0, 900);
96 priorities.insert(3, 800);
97 priorities.insert(10_000, 750);
98 priorities.insert(6, 600);
99 priorities.insert(7, 500);
100 Self {
101 default_priority: 100,
102 priorities,
103 }
104 }
105}
106
107impl KindPriorityPolicy {
108 pub fn with_priority(mut self, kind: u32, priority: i32) -> Self {
109 self.priorities.insert(kind, priority);
110 self
111 }
112}
113
114impl EventSelectionPolicy for KindPriorityPolicy {
115 fn priority(&self, event: &StoredNostrEvent) -> i32 {
116 self.priorities
117 .get(&event.kind)
118 .copied()
119 .unwrap_or(self.default_priority)
120 }
121}
122
/// Errors produced while validating the crawl configuration or running a crawl.
#[derive(Debug, thiserror::Error)]
pub enum CrawlError {
    /// Failure reported by the underlying event store.
    #[error("event store error: {0}")]
    EventStore(#[from] NostrEventStoreError),
    /// `CrawlConfig::relays` is empty.
    #[error("crawl requires at least one relay")]
    MissingRelays,
    /// `CrawlConfig::per_author_event_limit` is zero.
    #[error("per-author event limit must be greater than zero")]
    InvalidPerAuthorLimit,
    /// `CrawlConfig::per_author_live_bytes` is `Some(0)`.
    #[error("per-author live byte cap must be greater than zero")]
    InvalidPerAuthorLiveBytes,
    /// `CrawlConfig::author_batch_size` is zero.
    #[error("author batch size must be greater than zero")]
    InvalidAuthorBatchSize,
    /// `CrawlConfig::relay_page_size` is zero.
    #[error("relay page size must be greater than zero")]
    InvalidRelayPageSize,
    /// `CrawlConfig::max_relay_pages` is zero while `full_author_history` is off.
    #[error("max relay pages must be greater than zero")]
    InvalidMaxRelayPages,
    /// `CrawlConfig::max_events_seen` is `Some(0)`.
    #[error("max events seen must be greater than zero")]
    InvalidMaxEventsSeen,
    /// `CrawlConfig::relay_event_max_size` is `Some(0)`.
    #[error("relay event max size must be greater than zero")]
    InvalidRelayEventMaxSize,
    /// Error reported by the Nostr client, carried as its display text.
    #[error("nostr error: {0}")]
    Nostr(String),
    /// Error reported by the social graph backend, carried as its display text.
    #[error("social graph error: {0}")]
    SocialGraph(String),
}
148
/// Result alias whose error type is [`CrawlError`].
pub type Result<T> = std::result::Result<T, CrawlError>;
150
/// Outcome of fetching from a single relay.
#[derive(Debug, Default)]
struct RelayFetchResult {
    /// Number of events the relay fetch counted; added to the crawl-wide total.
    events_seen: usize,
    /// Events returned by the fetch, already converted to the stored representation.
    events: Vec<StoredNostrEvent>,
    /// Negentropy support flag remembered per relay and passed back into later fetches
    /// against the same relay.
    supports_negentropy: bool,
}
157
/// Result of crawling one batch of authors.
#[derive(Debug, Default)]
struct BatchCrawlReport {
    /// Events reported by relays for this batch.
    events_seen: usize,
    /// Events that survived selection for this batch, including ones already stored.
    events_selected: usize,
    /// Selected events that were not known before the batch started and therefore still
    /// have to be written to the store.
    events: Vec<StoredNostrEvent>,
    /// Running live-byte total after this batch. The caller overwrites its own total
    /// with this value rather than adding to it.
    live_bytes_selected: u64,
}
165
/// Result of fetching profile events for one batch of authors.
#[derive(Debug, Default)]
struct ProfileBatchReport {
    /// Events reported by relays for this batch.
    events_seen: usize,
    /// Fetched events grouped by author pubkey, ready to be merged into the
    /// global-recent state.
    events_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
}
171
/// Mutable state carried through a global-recent crawl.
#[derive(Debug, Default)]
struct GlobalRecentState {
    /// Current root of the event index; replaced after every store build.
    current_root: Option<Cid>,
    /// Currently selected events for each author, keyed by pubkey.
    retained_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
    /// Total number of selected events across all authors.
    events_selected: usize,
    /// Total encoded size of the selected events across all authors.
    live_bytes_selected: u64,
}
179
/// Crawls Nostr relays for events from a set of authors and writes the selected events
/// into a [`NostrEventStore`].
pub struct NostrBridge<S: Store> {
    /// Store the selected events are written to and resumed from.
    event_store: NostrEventStore<S>,
    /// Crawl settings; validated at the start of every crawl.
    config: CrawlConfig,
    /// Policy used to rank events during selection.
    policy: Arc<dyn EventSelectionPolicy>,
}
185
186impl<S: Store> NostrBridge<S> {
187 pub fn new(store: Arc<S>, config: CrawlConfig) -> Self {
188 Self {
189 event_store: NostrEventStore::new(store),
190 config,
191 policy: Arc::new(KindPriorityPolicy::default()),
192 }
193 }
194
195 pub fn with_policy(mut self, policy: Arc<dyn EventSelectionPolicy>) -> Self {
196 self.policy = policy;
197 self
198 }
199
200 pub async fn crawl<G: SocialGraphBackend>(
201 &self,
202 graph: &G,
203 existing_root: Option<&Cid>,
204 ) -> Result<CrawlReport> {
205 self.crawl_with_progress(graph, existing_root, |_| {}).await
206 }
207
208 pub async fn crawl_with_progress<G, F>(
209 &self,
210 graph: &G,
211 existing_root: Option<&Cid>,
212 mut on_progress: F,
213 ) -> Result<CrawlReport>
214 where
215 G: SocialGraphBackend,
216 F: FnMut(&CrawlReport),
217 {
218 self.validate_config()?;
219
220 let authors = self.collect_authors(graph)?;
221 if authors.is_empty() {
222 return Ok(CrawlReport::default());
223 }
224
225 let existing_root = self.usable_existing_root(existing_root).await?;
226 let client = self.connect_client().await?;
227
228 let result = if self.config.relay_fetch_mode == RelayFetchMode::AuthorBatches {
229 self.crawl_author_batches(&client, &authors, existing_root.as_ref(), &mut on_progress)
230 .await
231 } else {
232 let state = self
233 .load_existing_global_state(existing_root.as_ref(), &authors)
234 .await?;
235
236 let report = self
237 .crawl_global_recent_incremental(&client, &authors, state, &mut on_progress)
238 .await?;
239 on_progress(&report);
240 Ok(report)
241 };
242
243 let _ = client.disconnect().await;
244 result
245 }
246
247 async fn usable_existing_root(&self, root: Option<&Cid>) -> Result<Option<Cid>> {
248 let Some(root) = root else {
249 return Ok(None);
250 };
251
252 match self.event_store.validate_index_root(Some(root)).await {
253 Ok(()) => Ok(Some(root.clone())),
254 Err(err) => {
255 eprintln!(
256 "Ignoring invalid existing Nostr event index root {}: {err}",
257 hex::encode(root.hash)
258 );
259 Ok(None)
260 }
261 }
262 }
263
264 async fn crawl_author_batches(
265 &self,
266 client: &Client,
267 authors: &[String],
268 existing_root: Option<&Cid>,
269 on_progress: &mut impl FnMut(&CrawlReport),
270 ) -> Result<CrawlReport> {
271 let mut relay_negentropy_support = BTreeMap::<String, bool>::new();
272 let mut failed_relays = BTreeSet::<String>::new();
273 let mut current_root = existing_root.cloned();
274 let mut events_seen = 0usize;
275 let mut events_selected = 0usize;
276 let mut live_bytes_selected = 0u64;
277 let mut authors_processed = 0usize;
278 let mut applied_events = Vec::new();
279
280 for author_batch in authors.chunks(self.config.author_batch_size) {
281 let batch = self
282 .crawl_author_batch(
283 client,
284 author_batch,
285 current_root.as_ref(),
286 &mut relay_negentropy_support,
287 &mut failed_relays,
288 live_bytes_selected,
289 )
290 .await?;
291 events_seen = events_seen.saturating_add(batch.events_seen);
292 events_selected = events_selected.saturating_add(batch.events_selected);
293 live_bytes_selected = batch.live_bytes_selected;
294 authors_processed = authors_processed.saturating_add(author_batch.len());
295 if !batch.events.is_empty() {
296 applied_events.extend(batch.events.clone());
297 current_root = self
298 .event_store
299 .build(current_root.as_ref(), batch.events)
300 .await?;
301 }
302 on_progress(&CrawlReport {
303 root: current_root.clone(),
304 authors_considered: authors.len(),
305 authors_processed,
306 events_seen,
307 events_selected,
308 live_bytes_selected,
309 applied_events: Vec::new(),
310 });
311 if self.reached_events_seen_limit(events_seen) {
312 break;
313 }
314 }
315
316 Ok(CrawlReport {
317 root: current_root,
318 authors_considered: authors.len(),
319 authors_processed,
320 events_seen,
321 events_selected,
322 live_bytes_selected,
323 applied_events,
324 })
325 }
326
    /// Rebuilds the global-recent crawl state from an existing index root.
    ///
    /// Without a root the default (empty) state is returned. Otherwise the stored
    /// events are listed, restricted to `authors`, allowed kinds and events that still
    /// encode, grouped by author and run through per-author selection to obtain the
    /// retained set and its counters.
    ///
    /// If listing fails with the specific "missing event blob" validation error, the
    /// state is reloaded author by author and a fresh root is built from whatever could
    /// be loaded. Any other store error is returned.
    async fn load_existing_global_state(
        &self,
        root: Option<&Cid>,
        authors: &[String],
    ) -> Result<GlobalRecentState> {
        let Some(root) = root else {
            return Ok(GlobalRecentState::default());
        };

        match self
            .event_store
            .list_recent(Some(root), ListEventsOptions::default())
            .await
        {
            Ok(events) => {
                let author_set = authors.iter().map(String::as_str).collect::<BTreeSet<_>>();
                let mut retained_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
                for event in events {
                    // Skip events from authors outside this crawl or of disallowed kinds.
                    if !author_set.contains(event.pubkey.as_str()) || !self.kind_allowed(event.kind)
                    {
                        continue;
                    }
                    // Skip stored events that can no longer be encoded.
                    if !self.is_valid_stored_event(&event) {
                        continue;
                    }
                    retained_by_author
                        .entry(event.pubkey.clone())
                        .or_default()
                        .push(event);
                }

                let mut state = GlobalRecentState {
                    current_root: Some(root.clone()),
                    ..GlobalRecentState::default()
                };
                for (author, events) in retained_by_author {
                    let selected = self.select_author_events(events)?;
                    state.events_selected = state.events_selected.saturating_add(selected.len());
                    state.live_bytes_selected = state
                        .live_bytes_selected
                        .saturating_add(self.encoded_events_size(&selected)?);
                    state.retained_by_author.insert(author, selected);
                }
                Ok(state)
            }
            // The fallback is keyed on the exact validation message text.
            Err(NostrEventStoreError::Validation(message))
                if message == "stored nostr event blob is missing" =>
            {
                eprintln!(
                    "Falling back to per-author resume for existing root due to missing event blobs"
                );
                let mut state = self
                    .load_existing_global_state_by_author(Some(root), authors)
                    .await?;
                // Replace the damaged root with one rebuilt from the events that loaded.
                state.current_root = self.rebuild_root_from_retained_state(&state).await?;
                Ok(state)
            }
            Err(err) => Err(err.into()),
        }
    }
387
388 async fn load_existing_global_state_by_author(
389 &self,
390 root: Option<&Cid>,
391 authors: &[String],
392 ) -> Result<GlobalRecentState> {
393 let mut state = GlobalRecentState {
394 current_root: root.cloned(),
395 ..GlobalRecentState::default()
396 };
397 for author in authors {
398 let retained = self
399 .load_retained_events(root, author)
400 .await?
401 .into_iter()
402 .filter(|event| self.kind_allowed(event.kind))
403 .filter(|event| self.is_valid_stored_event(event))
404 .collect::<Vec<_>>();
405 let retained = self.select_author_events(retained)?;
406 state.events_selected = state.events_selected.saturating_add(retained.len());
407 state.live_bytes_selected = state
408 .live_bytes_selected
409 .saturating_add(self.encoded_events_size(&retained)?);
410 state.retained_by_author.insert(author.clone(), retained);
411 }
412 Ok(state)
413 }
414
415 async fn rebuild_root_from_retained_state(
416 &self,
417 state: &GlobalRecentState,
418 ) -> Result<Option<Cid>> {
419 let events = state
420 .retained_by_author
421 .values()
422 .flat_map(|events| events.iter().cloned())
423 .collect::<Vec<_>>();
424 self.event_store
425 .build(None, events)
426 .await
427 .map_err(Into::into)
428 }
429
430 fn validate_config(&self) -> Result<()> {
431 if self.config.relays.is_empty() {
432 return Err(CrawlError::MissingRelays);
433 }
434 if self.config.per_author_event_limit == 0 {
435 return Err(CrawlError::InvalidPerAuthorLimit);
436 }
437 if self.config.per_author_live_bytes == Some(0) {
438 return Err(CrawlError::InvalidPerAuthorLiveBytes);
439 }
440 if self.config.author_batch_size == 0 {
441 return Err(CrawlError::InvalidAuthorBatchSize);
442 }
443 if self.config.relay_page_size == 0 {
444 return Err(CrawlError::InvalidRelayPageSize);
445 }
446 if self.config.max_relay_pages == 0 && !self.config.full_author_history {
447 return Err(CrawlError::InvalidMaxRelayPages);
448 }
449 if self.config.max_events_seen == Some(0) {
450 return Err(CrawlError::InvalidMaxEventsSeen);
451 }
452 if self.config.relay_event_max_size == Some(0) {
453 return Err(CrawlError::InvalidRelayEventMaxSize);
454 }
455 Ok(())
456 }
457
458 fn collect_authors<G: SocialGraphBackend>(&self, graph: &G) -> Result<Vec<String>> {
459 if let Some(author_allowlist) = &self.config.author_allowlist {
460 let mut seen = HashSet::new();
461 let mut authors = Vec::new();
462 for author in author_allowlist {
463 if !is_valid_hex_pubkey(author) {
464 continue;
465 }
466 if seen.insert(author.clone()) {
467 authors.push(author.clone());
468 }
469 }
470 if let Some(max_authors) = self.config.max_authors {
471 authors.truncate(max_authors);
472 }
473 return Ok(authors);
474 }
475
476 let root = graph
477 .get_root()
478 .map_err(|err| CrawlError::SocialGraph(err.to_string()))?;
479 let mut visited = BTreeSet::new();
480 let mut authors = Vec::new();
481 let mut queue = VecDeque::from([(root.clone(), 0u32)]);
482 visited.insert(root);
483
484 while let Some((author, distance)) = queue.pop_front() {
485 if !is_valid_hex_pubkey(&author) {
486 continue;
487 }
488 authors.push(author.clone());
489 if self
490 .config
491 .max_authors
492 .is_some_and(|max_authors| authors.len() >= max_authors)
493 {
494 break;
495 }
496 if self
497 .config
498 .max_follow_distance
499 .is_some_and(|max_distance| distance >= max_distance)
500 {
501 continue;
502 }
503
504 let mut follows = graph
505 .get_followed_by_user(&author)
506 .map_err(|err| CrawlError::SocialGraph(err.to_string()))?;
507 follows.retain(|followed| is_valid_hex_pubkey(followed));
508 follows.sort();
509 for followed in follows {
510 if visited.insert(followed.clone()) {
511 queue.push_back((followed, distance.saturating_add(1)));
512 }
513 }
514 }
515
516 Ok(authors)
517 }
518
519 async fn connect_client(&self) -> Result<Client> {
520 let client = if let Some(max_size) = self.config.relay_event_max_size {
521 let mut limits = RelayLimits::default();
522 limits.events.max_size = Some(max_size);
523 Client::builder()
524 .signer(Keys::generate())
525 .opts(ClientOptions::new().relay_limits(limits))
526 .build()
527 } else {
528 Client::new(Keys::generate())
529 };
530 for relay in &self.config.relays {
531 client
532 .add_relay(relay)
533 .await
534 .map_err(|err| CrawlError::Nostr(err.to_string()))?;
535 }
536 client.connect().await;
537 tokio::time::sleep(Duration::from_millis(250)).await;
538 Ok(client)
539 }
540
    /// Crawls one batch of authors and returns the events that still need to be written
    /// to the store.
    ///
    /// Steps:
    /// 1. Load each author's retained events from `current_root`, keeping only allowed
    ///    kinds and events that still encode.
    /// 2. If `full_author_history` is configured, hand the batch over to
    ///    `crawl_full_author_history_batch`.
    /// 3. Otherwise query every relay not yet marked as failed, one after another, with
    ///    a shared author filter. A relay whose fetch errors is logged, added to
    ///    `failed_relays` and skipped; successful relays have their negentropy support
    ///    recorded in `relay_negentropy_support`.
    /// 4. Merge retained and fetched events per author, run per-author selection, drop
    ///    events that do not encode and apply the live-byte cap starting from
    ///    `live_bytes_selected_so_far`.
    ///
    /// In the returned report `events_selected` counts everything that survived step 4,
    /// while `events` holds only the survivors that were not already stored when the
    /// batch started.
    async fn crawl_author_batch(
        &self,
        client: &Client,
        author_batch: &[String],
        current_root: Option<&Cid>,
        relay_negentropy_support: &mut BTreeMap<String, bool>,
        failed_relays: &mut BTreeSet<String>,
        live_bytes_selected_so_far: u64,
    ) -> Result<BatchCrawlReport> {
        let mut existing_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
        // Every event known so far for this batch, keyed by event id.
        let mut known = BTreeMap::<String, StoredNostrEvent>::new();
        for author in author_batch {
            let retained = self
                .load_retained_events(current_root, author)
                .await?
                .into_iter()
                .filter(|event| self.kind_allowed(event.kind))
                .filter(|event| self.is_valid_stored_event(event))
                .collect::<Vec<_>>();
            for event in &retained {
                known.insert(event.id.clone(), event.clone());
            }
            existing_by_author.insert(author.clone(), retained);
        }

        let pubkeys: Vec<PublicKey> = author_batch
            .iter()
            .filter_map(|author| author.parse::<PublicKey>().ok())
            .collect();
        // Nothing to ask relays for if no author parses as a public key.
        if pubkeys.is_empty() {
            return Ok(BatchCrawlReport {
                events_seen: 0,
                events_selected: 0,
                events: Vec::new(),
                live_bytes_selected: live_bytes_selected_so_far,
            });
        }

        // Ids that were already stored before any relay was contacted.
        let initial_known_ids = known.keys().cloned().collect::<BTreeSet<_>>();
        if self.config.full_author_history {
            return self
                .crawl_full_author_history_batch(
                    client,
                    author_batch,
                    existing_by_author,
                    known,
                    initial_known_ids,
                    relay_negentropy_support,
                    failed_relays,
                    live_bytes_selected_so_far,
                )
                .await;
        }

        let filter = self.batch_filter(pubkeys);
        let mut fetched = BTreeMap::<String, StoredNostrEvent>::new();
        let mut events_seen = 0usize;

        for relay in &self.config.relays {
            if failed_relays.contains(relay) {
                continue;
            }
            // Recomputed per relay because `known` grows as earlier relays answer.
            let local_items = self.local_items_for_batch(known.values(), author_batch);
            let relay_support = relay_negentropy_support.get(relay).copied();
            let fetched_from_relay = match self
                .fetch_events_from_relay(client, relay, filter.clone(), local_items, relay_support)
                .await
            {
                Ok(result) => result,
                Err(err) => {
                    eprintln!("Skipping relay {relay}: {err}");
                    failed_relays.insert(relay.clone());
                    continue;
                }
            };
            relay_negentropy_support.insert(relay.clone(), fetched_from_relay.supports_negentropy);
            events_seen = events_seen.saturating_add(fetched_from_relay.events_seen);
            for event in fetched_from_relay.events {
                // Only events of allowed kinds that were not known before count as fetched.
                if self.kind_allowed(event.kind)
                    && known.insert(event.id.clone(), event.clone()).is_none()
                {
                    fetched.insert(event.id.clone(), event);
                }
            }
        }

        let mut fetched_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
        for event in fetched.into_values() {
            fetched_by_author
                .entry(event.pubkey.clone())
                .or_default()
                .push(event);
        }

        let mut selected = Vec::new();
        for author in author_batch {
            let mut merged: BTreeMap<String, StoredNostrEvent> = BTreeMap::new();
            if let Some(existing_events) = existing_by_author.remove(author) {
                for event in existing_events {
                    merged.insert(event.id.clone(), event);
                }
            }
            if let Some(events) = fetched_by_author.remove(author) {
                for event in events {
                    merged.insert(event.id.clone(), event);
                }
            }
            selected.extend(self.select_author_events(merged.into_values().collect())?);
        }

        let selected = selected
            .into_iter()
            .filter(|event| self.is_valid_stored_event(event))
            .collect::<Vec<_>>();
        let (selected, live_bytes_selected) =
            self.apply_live_byte_cap_from(selected, live_bytes_selected_so_far)?;
        let events_selected = selected.len();
        // Events that were already stored do not need to be written again.
        let events_to_apply = selected
            .into_iter()
            .filter(|event| !initial_known_ids.contains(&event.id))
            .collect::<Vec<_>>();

        Ok(BatchCrawlReport {
            events_seen,
            events_selected,
            events: events_to_apply,
            live_bytes_selected,
        })
    }
670
    /// Full-history variant of the author batch crawl.
    ///
    /// All relays not yet marked as failed are queried concurrently through
    /// `fetch_full_history_from_relay`, each starting from the same snapshot of locally
    /// known items. Results are folded in as they complete: events of allowed kinds
    /// from authors in the batch that were not known before are grouped by author.
    /// Failing relays are logged and added to `failed_relays`.
    ///
    /// Afterwards retained and fetched events are merged per author, selected with the
    /// configured per-author limits, filtered to events that encode and passed through
    /// the live-byte cap starting from `live_bytes_selected_so_far`.
    ///
    /// `existing_by_author`, `known` and `initial_known_ids` are the retained events
    /// for the batch as prepared by the caller. The returned `events` exclude ids in
    /// `initial_known_ids`.
    #[allow(clippy::too_many_arguments)]
    async fn crawl_full_author_history_batch(
        &self,
        client: &Client,
        author_batch: &[String],
        mut existing_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
        mut known: BTreeMap<String, StoredNostrEvent>,
        initial_known_ids: BTreeSet<String>,
        relay_negentropy_support: &mut BTreeMap<String, bool>,
        failed_relays: &mut BTreeSet<String>,
        live_bytes_selected_so_far: u64,
    ) -> Result<BatchCrawlReport> {
        let mut events_seen = 0usize;
        let mut selected = Vec::new();
        let author_set = author_batch
            .iter()
            .map(String::as_str)
            .collect::<BTreeSet<_>>();
        let pubkeys = author_batch
            .iter()
            .filter_map(|author| author.parse::<PublicKey>().ok())
            .collect::<Vec<_>>();

        if pubkeys.is_empty() {
            return Ok(BatchCrawlReport {
                events_seen: 0,
                events_selected: 0,
                events: Vec::new(),
                live_bytes_selected: live_bytes_selected_so_far,
            });
        }

        let local_items = self.local_items_for_batch(known.values(), author_batch);
        let mut fetched_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();

        // Snapshot the relay list and support flags up front so the concurrent fetches
        // do not borrow the maps that are mutated while results are drained.
        let relays_to_fetch = self
            .config
            .relays
            .iter()
            .filter(|relay| !failed_relays.contains(*relay))
            .map(|relay| (relay.clone(), relay_negentropy_support.get(relay).copied()))
            .collect::<Vec<_>>();
        let relay_fetches = relays_to_fetch.into_iter().map(|(relay, relay_support)| {
            let local_items = local_items.clone();
            let pubkeys = &pubkeys;
            async move {
                let result = self
                    .fetch_full_history_from_relay(
                        client,
                        &relay,
                        pubkeys,
                        local_items,
                        relay_support,
                    )
                    .await;
                (relay, result)
            }
        });
        // Concurrency equals the number of configured relays, i.e. all at once.
        let mut relay_fetches =
            stream::iter(relay_fetches).buffer_unordered(self.config.relays.len().max(1));

        while let Some((relay, result)) = relay_fetches.next().await {
            match result {
                Ok(fetched_from_relay) => {
                    relay_negentropy_support
                        .insert(relay.clone(), fetched_from_relay.supports_negentropy);
                    events_seen = events_seen.saturating_add(fetched_from_relay.events_seen);
                    for event in fetched_from_relay.events {
                        if self.kind_allowed(event.kind)
                            && author_set.contains(event.pubkey.as_str())
                            && known.insert(event.id.clone(), event.clone()).is_none()
                        {
                            fetched_by_author
                                .entry(event.pubkey.clone())
                                .or_default()
                                .push(event);
                        }
                    }
                }
                Err(err) => {
                    eprintln!("Skipping relay {relay}: {err}");
                    failed_relays.insert(relay.clone());
                }
            }
        }

        for author in author_batch {
            let mut merged = BTreeMap::<String, StoredNostrEvent>::new();
            if let Some(existing_events) = existing_by_author.remove(author) {
                for event in existing_events {
                    merged.insert(event.id.clone(), event);
                }
            }
            if let Some(events) = fetched_by_author.remove(author) {
                for event in events {
                    merged.insert(event.id.clone(), event);
                }
            }
            let author_selected = self
                .select_author_events_with_limits(
                    merged.into_values().collect(),
                    self.config.per_author_event_limit,
                    self.config.per_author_live_bytes,
                )?
                .into_iter()
                .filter(|event| self.is_valid_stored_event(event))
                .collect::<Vec<_>>();
            selected.extend(author_selected);
        }

        let (events, live_bytes_selected) =
            self.apply_live_byte_cap_from(selected, live_bytes_selected_so_far)?;
        let events_selected = events.len();
        // Events that were already stored do not need to be written again.
        let events_to_apply = events
            .into_iter()
            .filter(|event| !initial_known_ids.contains(&event.id))
            .collect::<Vec<_>>();
        Ok(BatchCrawlReport {
            events_seen,
            events_selected,
            events: events_to_apply,
            live_bytes_selected,
        })
    }
795
    /// Crawls relays by paging through their recent events without an author filter.
    ///
    /// Starting from `state`, profile events are hydrated first
    /// (`hydrate_global_recent_profiles`). Then each relay not marked as failed is
    /// paged for up to `max_relay_pages` pages of `relay_page_size` events, walking
    /// backwards in time. From every page only non-ephemeral events from `authors` with
    /// an allowed kind are merged into the state; newly selected events are written to
    /// the store immediately and `on_progress` is called after each page with an empty
    /// `applied_events`.
    ///
    /// Paging on a relay stops when a page is empty, the fetch fails (the relay is then
    /// marked as failed), the oldest timestamp on the page is zero, the `until` bound
    /// stops moving, or the events-seen limit is reached. Reaching that limit also ends
    /// the whole crawl.
    async fn crawl_global_recent_incremental(
        &self,
        client: &Client,
        authors: &[String],
        mut state: GlobalRecentState,
        on_progress: &mut impl FnMut(&CrawlReport),
    ) -> Result<CrawlReport> {
        let author_set = authors.iter().map(String::as_str).collect::<BTreeSet<_>>();
        // Ids already present in the resumed state; used to detect genuinely new events.
        let mut known_ids = state
            .retained_by_author
            .values()
            .flat_map(|events| events.iter().map(|event| event.id.clone()))
            .collect::<BTreeSet<_>>();
        // In this mode an author counts as processed once it has any retained event.
        let mut authors_processed = state
            .retained_by_author
            .values()
            .filter(|events| !events.is_empty())
            .count();
        let mut failed_relays = BTreeSet::<String>::new();
        let mut relay_negentropy_support = BTreeMap::<String, bool>::new();
        let mut events_seen = 0usize;
        let mut applied_events = Vec::new();

        self.hydrate_global_recent_profiles(
            client,
            authors,
            &mut state,
            &mut known_ids,
            &mut authors_processed,
            &mut applied_events,
            &mut relay_negentropy_support,
            &mut failed_relays,
            &mut events_seen,
            on_progress,
        )
        .await?;
        // Profile hydration alone may already exhaust the events-seen budget.
        if self.reached_events_seen_limit(events_seen) {
            return Ok(CrawlReport {
                root: state.current_root,
                authors_considered: authors.len(),
                authors_processed,
                events_seen,
                events_selected: state.events_selected,
                live_bytes_selected: state.live_bytes_selected,
                applied_events,
            });
        }

        for relay in &self.config.relays {
            if failed_relays.contains(relay) {
                continue;
            }
            // Upper time bound for the next page; `None` requests the newest events.
            let mut until = None;
            for _ in 0..self.config.max_relay_pages {
                let filter = self.global_recent_filter(until);
                let events = match client
                    .fetch_events_from([relay], filter, self.config.fetch_timeout)
                    .await
                    .map(|events| events.to_vec())
                {
                    Ok(events) => events,
                    Err(err) => {
                        eprintln!("Skipping relay {relay}: {}", err);
                        failed_relays.insert(relay.clone());
                        break;
                    }
                };
                let fetched_count = events.len();
                events_seen = events_seen.saturating_add(fetched_count);
                if fetched_count == 0 {
                    break;
                }

                // Oldest timestamp on this page, taken over every event including the
                // ones filtered out below, so paging still advances past them.
                let mut min_created_at = u64::MAX;
                let mut incoming_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
                for event in events {
                    min_created_at = min_created_at.min(event.created_at.as_secs());
                    if event.kind.is_ephemeral() {
                        continue;
                    }

                    let stored = stored_event_from_nostr(&event);
                    if !author_set.contains(stored.pubkey.as_str())
                        || !self.kind_allowed(stored.kind)
                    {
                        continue;
                    }
                    incoming_by_author
                        .entry(stored.pubkey.clone())
                        .or_default()
                        .push(stored);
                }

                let pending_apply = self.merge_author_events_into_state(
                    &mut state,
                    incoming_by_author,
                    &mut known_ids,
                    &mut authors_processed,
                )?;
                if !pending_apply.is_empty() {
                    applied_events.extend(pending_apply.clone());
                    state.current_root = self
                        .event_store
                        .build(state.current_root.as_ref(), pending_apply)
                        .await?;
                }

                on_progress(&CrawlReport {
                    root: state.current_root.clone(),
                    authors_considered: authors.len(),
                    authors_processed,
                    events_seen,
                    events_selected: state.events_selected,
                    live_bytes_selected: state.live_bytes_selected,
                    applied_events: Vec::new(),
                });

                if min_created_at == u64::MAX || min_created_at == 0 {
                    break;
                }
                // Move the bound to one second before the oldest event on this page.
                let next_until = min_created_at.saturating_sub(1);
                // Guard against requesting the same page again.
                if until == Some(next_until) {
                    break;
                }
                until = Some(next_until);
                if self.reached_events_seen_limit(events_seen) {
                    break;
                }
            }
            if self.reached_events_seen_limit(events_seen) {
                break;
            }
        }

        Ok(CrawlReport {
            root: state.current_root,
            authors_considered: authors.len(),
            authors_processed,
            events_seen,
            events_selected: state.events_selected,
            live_bytes_selected: state.live_bytes_selected,
            applied_events,
        })
    }
940
941 fn batch_filter(&self, pubkeys: Vec<PublicKey>) -> Filter {
942 let mut filter = Filter::new().authors(pubkeys);
943 if let Some(kinds) = &self.config.kinds {
944 filter = filter.kinds(kinds.iter().copied().map(Kind::from));
945 }
946 let mut relay_limit = self
947 .config
948 .author_batch_size
949 .saturating_mul(self.config.per_author_event_limit);
950 let relay_page_budget = self
951 .config
952 .relay_page_size
953 .saturating_mul(self.config.max_relay_pages.max(1));
954 if relay_page_budget > 0 {
955 relay_limit = relay_limit.min(relay_page_budget);
956 }
957 if relay_limit > 0 {
958 filter = filter.limit(relay_limit);
959 }
960 filter
961 }
962
963 fn full_history_negentropy_filter(&self, pubkeys: Vec<PublicKey>) -> Filter {
964 let mut filter = Filter::new().authors(pubkeys);
965 if let Some(kinds) = &self.config.kinds {
966 filter = filter.kinds(kinds.iter().copied().map(Kind::from));
967 }
968 filter
969 }
970
971 fn global_recent_filter(&self, until: Option<u64>) -> Filter {
972 let mut filter = Filter::new().limit(self.config.relay_page_size);
973 if let Some(kinds) = &self.config.kinds {
974 filter = filter.kinds(kinds.iter().copied().map(Kind::from));
975 }
976 if let Some(until) = until {
977 filter = filter.until(Timestamp::from_secs(until));
978 }
979 filter
980 }
981
982 fn reached_events_seen_limit(&self, events_seen: usize) -> bool {
983 self.config
984 .max_events_seen
985 .is_some_and(|limit| events_seen >= limit)
986 }
987
988 fn is_valid_stored_event(&self, event: &StoredNostrEvent) -> bool {
989 self.event_store.encode_event(event).is_ok()
990 }
991
992 fn encoded_events_size(&self, events: &[StoredNostrEvent]) -> Result<u64> {
993 let mut total = 0u64;
994 for event in events {
995 total = total.saturating_add(self.event_store.encode_event(event)?.len() as u64);
996 }
997 Ok(total)
998 }
999
1000 fn local_items_for_batch<'a, I>(
1001 &self,
1002 known_events: I,
1003 author_batch: &[String],
1004 ) -> Vec<(EventId, Timestamp)>
1005 where
1006 I: Iterator<Item = &'a StoredNostrEvent>,
1007 {
1008 let authors = author_batch
1009 .iter()
1010 .map(String::as_str)
1011 .collect::<BTreeSet<_>>();
1012
1013 known_events
1014 .filter(|event| {
1015 authors.contains(event.pubkey.as_str()) && self.kind_allowed(event.kind)
1016 })
1017 .filter_map(|event| {
1018 let event_id = EventId::parse(&event.id).ok()?;
1019 Some((event_id, Timestamp::from_secs(event.created_at)))
1020 })
1021 .collect()
1022 }
1023
1024 fn local_items_for_batch_by_kind<'a, I>(
1025 &self,
1026 known_events: I,
1027 author_batch: &[String],
1028 kind: u32,
1029 ) -> Vec<(EventId, Timestamp)>
1030 where
1031 I: Iterator<Item = &'a StoredNostrEvent>,
1032 {
1033 let authors = author_batch
1034 .iter()
1035 .map(String::as_str)
1036 .collect::<BTreeSet<_>>();
1037
1038 known_events
1039 .filter(|event| {
1040 authors.contains(event.pubkey.as_str())
1041 && event.kind == kind
1042 && self.kind_allowed(event.kind)
1043 })
1044 .filter_map(|event| {
1045 let event_id = EventId::parse(&event.id).ok()?;
1046 Some((event_id, Timestamp::from_secs(event.created_at)))
1047 })
1048 .collect()
1049 }
1050
1051 async fn load_retained_events(
1052 &self,
1053 root: Option<&Cid>,
1054 author: &str,
1055 ) -> Result<Vec<StoredNostrEvent>> {
1056 self.event_store
1057 .list_by_author_lossy(root, author, ListEventsOptions::default())
1058 .await
1059 .map_err(Into::into)
1060 }
1061
    /// Merges freshly fetched events into the global-recent state and returns the
    /// events that have to be written to the store.
    ///
    /// For every author in `incoming_by_author` the currently retained events and the
    /// incoming ones are combined by event id (an incoming event replaces a retained
    /// one with the same id), per-author selection is re-run on the union, and the
    /// state's counters are adjusted by swapping the author's old contribution for the
    /// new one.
    ///
    /// A selected event is returned only the first time its id is seen, i.e. when it
    /// could be added to `known_ids`. `authors_processed` is incremented when an author
    /// goes from no retained events to at least one.
    fn merge_author_events_into_state(
        &self,
        state: &mut GlobalRecentState,
        incoming_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
        known_ids: &mut BTreeSet<String>,
        authors_processed: &mut usize,
    ) -> Result<Vec<StoredNostrEvent>> {
        let mut pending_apply = Vec::new();

        for (author, incoming) in incoming_by_author {
            let retained = state.retained_by_author.entry(author).or_default();
            // Capture the author's previous contribution before `retained` is drained.
            let was_empty = retained.is_empty();
            let old_len = retained.len();
            let old_live_bytes = self.encoded_events_size(retained)?;

            let mut merged = BTreeMap::<String, StoredNostrEvent>::new();
            for existing in retained.drain(..) {
                merged.insert(existing.id.clone(), existing);
            }
            for event in incoming {
                merged.insert(event.id.clone(), event);
            }

            let selected = self.select_author_events(merged.into_values().collect())?;
            let selected_live_bytes = self.encoded_events_size(&selected)?;

            for selected_event in &selected {
                // `insert` returns true only for ids that were not known before.
                if known_ids.insert(selected_event.id.clone()) {
                    pending_apply.push(selected_event.clone());
                }
            }

            // Replace the author's old share of the totals with the new one.
            state.events_selected = state
                .events_selected
                .saturating_sub(old_len)
                .saturating_add(selected.len());
            state.live_bytes_selected = state
                .live_bytes_selected
                .saturating_sub(old_live_bytes)
                .saturating_add(selected_live_bytes);
            if was_empty && !selected.is_empty() {
                *authors_processed = authors_processed.saturating_add(1);
            }
            *retained = selected;
        }

        Ok(pending_apply)
    }
1110
1111 #[allow(clippy::too_many_arguments)]
1112 async fn hydrate_global_recent_profiles(
1113 &self,
1114 client: &Client,
1115 authors: &[String],
1116 state: &mut GlobalRecentState,
1117 known_ids: &mut BTreeSet<String>,
1118 authors_processed: &mut usize,
1119 applied_events: &mut Vec<StoredNostrEvent>,
1120 relay_negentropy_support: &mut BTreeMap<String, bool>,
1121 failed_relays: &mut BTreeSet<String>,
1122 events_seen: &mut usize,
1123 on_progress: &mut impl FnMut(&CrawlReport),
1124 ) -> Result<()> {
1125 if !self.kind_allowed(METADATA_KIND) {
1126 return Ok(());
1127 }
1128
1129 let authors_missing_profiles = authors
1130 .iter()
1131 .filter(|author| {
1132 !state
1133 .retained_by_author
1134 .get(*author)
1135 .is_some_and(|events| events.iter().any(|event| event.kind == METADATA_KIND))
1136 })
1137 .cloned()
1138 .collect::<Vec<_>>();
1139 if authors_missing_profiles.is_empty() {
1140 return Ok(());
1141 }
1142
1143 for author_batch in authors_missing_profiles.chunks(self.config.author_batch_size.max(1)) {
1144 let batch = self
1145 .crawl_profile_batch(
1146 client,
1147 author_batch,
1148 state
1149 .retained_by_author
1150 .values()
1151 .flat_map(|events| events.iter()),
1152 relay_negentropy_support,
1153 failed_relays,
1154 )
1155 .await?;
1156
1157 *events_seen = events_seen.saturating_add(batch.events_seen);
1158 let pending_apply = self.merge_author_events_into_state(
1159 state,
1160 batch.events_by_author,
1161 known_ids,
1162 authors_processed,
1163 )?;
1164 if !pending_apply.is_empty() {
1165 applied_events.extend(pending_apply.clone());
1166 state.current_root = self
1167 .event_store
1168 .build(state.current_root.as_ref(), pending_apply)
1169 .await?;
1170 }
1171
1172 on_progress(&CrawlReport {
1173 root: state.current_root.clone(),
1174 authors_considered: authors.len(),
1175 authors_processed: *authors_processed,
1176 events_seen: *events_seen,
1177 events_selected: state.events_selected,
1178 live_bytes_selected: state.live_bytes_selected,
1179 applied_events: Vec::new(),
1180 });
1181
1182 if self.reached_events_seen_limit(*events_seen) {
1183 break;
1184 }
1185 }
1186
1187 Ok(())
1188 }
1189
1190 async fn crawl_profile_batch<'a, I>(
1191 &self,
1192 client: &Client,
1193 author_batch: &[String],
1194 known_events: I,
1195 relay_negentropy_support: &mut BTreeMap<String, bool>,
1196 failed_relays: &mut BTreeSet<String>,
1197 ) -> Result<ProfileBatchReport>
1198 where
1199 I: Iterator<Item = &'a StoredNostrEvent>,
1200 {
1201 let pubkeys: Vec<PublicKey> = author_batch
1202 .iter()
1203 .filter_map(|author| author.parse::<PublicKey>().ok())
1204 .collect();
1205 if pubkeys.is_empty() {
1206 return Ok(ProfileBatchReport::default());
1207 }
1208
1209 let filter = Filter::new()
1210 .authors(pubkeys)
1211 .kind(Kind::Metadata)
1212 .limit(author_batch.len().saturating_mul(2).max(1));
1213 let local_items =
1214 self.local_items_for_batch_by_kind(known_events, author_batch, METADATA_KIND);
1215 let mut fetched_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
1216 let mut events_seen = 0usize;
1217
1218 for relay in &self.config.relays {
1219 if failed_relays.contains(relay) {
1220 continue;
1221 }
1222 let relay_support = relay_negentropy_support.get(relay).copied();
1223 let fetched_from_relay = match self
1224 .fetch_events_from_relay(
1225 client,
1226 relay,
1227 filter.clone(),
1228 local_items.clone(),
1229 relay_support,
1230 )
1231 .await
1232 {
1233 Ok(result) => result,
1234 Err(err) => {
1235 eprintln!("Skipping relay {relay}: {err}");
1236 failed_relays.insert(relay.clone());
1237 continue;
1238 }
1239 };
1240 relay_negentropy_support.insert(relay.clone(), fetched_from_relay.supports_negentropy);
1241 events_seen = events_seen.saturating_add(fetched_from_relay.events_seen);
1242 for event in fetched_from_relay.events {
1243 if event.kind == METADATA_KIND && self.kind_allowed(event.kind) {
1244 fetched_by_author
1245 .entry(event.pubkey.clone())
1246 .or_default()
1247 .push(event);
1248 }
1249 }
1250 }
1251
1252 Ok(ProfileBatchReport {
1253 events_seen,
1254 events_by_author: fetched_by_author,
1255 })
1256 }
1257
1258 async fn fetch_events_from_relay(
1259 &self,
1260 client: &Client,
1261 relay: &str,
1262 filter: Filter,
1263 local_items: Vec<(EventId, Timestamp)>,
1264 supports_negentropy: Option<bool>,
1265 ) -> Result<RelayFetchResult> {
1266 if supports_negentropy == Some(false) {
1267 if self.config.require_negentropy {
1268 return Ok(RelayFetchResult {
1269 events_seen: 0,
1270 events: Vec::new(),
1271 supports_negentropy: false,
1272 });
1273 }
1274 return self
1275 .fetch_full_filter(client, relay, filter)
1276 .await
1277 .map(|events| RelayFetchResult {
1278 events_seen: events.len(),
1279 events,
1280 supports_negentropy: false,
1281 });
1282 }
1283
1284 match self
1285 .reconcile_missing_ids(client, relay, filter.clone(), local_items)
1286 .await
1287 {
1288 Ok(Some(missing)) => self.fetch_missing_ids(client, relay, missing).await.map(
1289 |RelayFetchResult {
1290 events_seen,
1291 events,
1292 ..
1293 }| RelayFetchResult {
1294 events_seen,
1295 events,
1296 supports_negentropy: true,
1297 },
1298 ),
1299 Ok(None) | Err(_) => {
1300 if self.config.require_negentropy {
1301 Ok(RelayFetchResult {
1302 events_seen: 0,
1303 events: Vec::new(),
1304 supports_negentropy: false,
1305 })
1306 } else {
1307 self.fetch_full_filter(client, relay, filter)
1308 .await
1309 .map(|events| RelayFetchResult {
1310 events_seen: events.len(),
1311 events,
1312 supports_negentropy: false,
1313 })
1314 }
1315 }
1316 }
1317 }
1318
1319 async fn fetch_missing_ids(
1320 &self,
1321 client: &Client,
1322 relay: &str,
1323 missing_ids: Vec<EventId>,
1324 ) -> Result<RelayFetchResult> {
1325 if missing_ids.is_empty() {
1326 return Ok(RelayFetchResult {
1327 events_seen: 0,
1328 events: Vec::new(),
1329 supports_negentropy: true,
1330 });
1331 }
1332
1333 let mut out = BTreeMap::<String, StoredNostrEvent>::new();
1334 let mut events_seen = 0usize;
1335 let filters = missing_ids
1336 .chunks(NEGENTROPY_FETCH_CHUNK_SIZE)
1337 .map(|chunk| Filter::new().ids(chunk.iter().cloned()))
1338 .collect::<Vec<_>>();
1339 let fetches = filters.into_iter().map(|filter| async move {
1340 client
1341 .fetch_events_from([relay], filter, self.config.fetch_timeout)
1342 .await
1343 .map(|events| events.to_vec())
1344 .map_err(|err| CrawlError::Nostr(err.to_string()))
1345 });
1346 let mut fetches =
1347 stream::iter(fetches).buffer_unordered(NEGENTROPY_FETCH_CHUNK_CONCURRENCY);
1348 while let Some(result) = fetches.next().await {
1349 let events = result?;
1350 events_seen = events_seen.saturating_add(events.len());
1351 for event in events {
1352 if event.kind.is_ephemeral() {
1353 continue;
1354 }
1355 let stored = stored_event_from_nostr(&event);
1356 out.insert(stored.id.clone(), stored);
1357 }
1358 }
1359 Ok(RelayFetchResult {
1360 events_seen,
1361 events: out.into_values().collect(),
1362 supports_negentropy: true,
1363 })
1364 }
1365
1366 async fn fetch_full_filter(
1367 &self,
1368 client: &Client,
1369 relay: &str,
1370 filter: Filter,
1371 ) -> Result<Vec<StoredNostrEvent>> {
1372 let mut out = Vec::new();
1373 let events = client
1374 .fetch_events_from([relay], filter, self.config.fetch_timeout)
1375 .await
1376 .map(|events| events.to_vec())
1377 .map_err(|err| CrawlError::Nostr(err.to_string()))?;
1378
1379 for event in events {
1380 if event.kind.is_ephemeral() {
1381 continue;
1382 }
1383 out.push(stored_event_from_nostr(&event));
1384 }
1385
1386 Ok(out)
1387 }
1388
1389 async fn fetch_full_history_from_relay(
1390 &self,
1391 client: &Client,
1392 relay: &str,
1393 pubkeys: &[PublicKey],
1394 local_items: Vec<(EventId, Timestamp)>,
1395 supports_negentropy: Option<bool>,
1396 ) -> Result<RelayFetchResult> {
1397 if supports_negentropy != Some(false) {
1398 let filter = self.full_history_negentropy_filter(pubkeys.to_vec());
1399 match self
1400 .reconcile_missing_ids(client, relay, filter, local_items)
1401 .await
1402 {
1403 Ok(Some(missing)) => {
1404 return match self.fetch_missing_ids(client, relay, missing).await {
1405 Ok(RelayFetchResult {
1406 events_seen,
1407 events,
1408 ..
1409 }) => Ok(RelayFetchResult {
1410 events_seen,
1411 events,
1412 supports_negentropy: true,
1413 }),
1414 Err(err)
1415 if !self.config.require_negentropy
1416 && self.config.max_relay_pages > 0 =>
1417 {
1418 self.fetch_full_history_by_paging_from_relay(client, relay, pubkeys)
1419 .await
1420 .map_err(|_| err)
1421 }
1422 Err(err) => Err(err),
1423 };
1424 }
1425 Ok(None) | Err(_) if self.config.require_negentropy => {
1426 return Ok(RelayFetchResult {
1427 events_seen: 0,
1428 events: Vec::new(),
1429 supports_negentropy: false,
1430 });
1431 }
1432 Ok(None) | Err(_) => {}
1433 }
1434 }
1435
1436 if self.config.require_negentropy || self.config.max_relay_pages == 0 {
1437 return Ok(RelayFetchResult {
1438 events_seen: 0,
1439 events: Vec::new(),
1440 supports_negentropy: false,
1441 });
1442 }
1443 self.fetch_full_history_by_paging_from_relay(client, relay, pubkeys)
1444 .await
1445 }
1446
1447 async fn reconcile_missing_ids(
1448 &self,
1449 client: &Client,
1450 relay: &str,
1451 filter: Filter,
1452 local_items: Vec<(EventId, Timestamp)>,
1453 ) -> Result<Option<Vec<EventId>>> {
1454 let initial_timeout = self.config.fetch_timeout.min(NEGENTROPY_INITIAL_TIMEOUT);
1455 let opts = SyncOptions::default()
1456 .initial_timeout(initial_timeout)
1457 .dry_run();
1458 let targets = [(relay.to_owned(), (filter, local_items))];
1459 let sync = client.pool().sync_targeted(targets, &opts);
1460 let output = match tokio::time::timeout(self.config.fetch_timeout, sync).await {
1461 Ok(Ok(output)) => output,
1462 Ok(Err(_)) | Err(_) => return Ok(None),
1463 };
1464
1465 if output.success.is_empty() {
1466 return Ok(None);
1467 }
1468
1469 Ok(Some(output.remote.iter().cloned().collect()))
1470 }
1471
1472 async fn fetch_full_history_by_paging_from_relay(
1473 &self,
1474 client: &Client,
1475 relay: &str,
1476 pubkeys: &[PublicKey],
1477 ) -> Result<RelayFetchResult> {
1478 let mut out = BTreeMap::<String, StoredNostrEvent>::new();
1479 let mut events_seen = 0usize;
1480 let concurrency = FULL_HISTORY_PAGING_CONCURRENCY_PER_RELAY
1481 .min(pubkeys.len().max(1))
1482 .max(1);
1483 let fetches = pubkeys.iter().copied().map(|pubkey| async move {
1484 self.fetch_full_author_history_by_paging_from_relay(client, relay, pubkey)
1485 .await
1486 });
1487 let mut fetches = stream::iter(fetches).buffer_unordered(concurrency);
1488
1489 while let Some(result) = fetches.next().await {
1490 let fetched = result?;
1491 events_seen = events_seen.saturating_add(fetched.events_seen);
1492 for event in fetched.events {
1493 out.insert(event.id.clone(), event);
1494 }
1495 if self.reached_events_seen_limit(events_seen) {
1496 break;
1497 }
1498 }
1499
1500 Ok(RelayFetchResult {
1501 events_seen,
1502 events: out.into_values().collect(),
1503 supports_negentropy: false,
1504 })
1505 }
1506
    /// Pages backwards through one author's events on a single relay.
    ///
    /// Issues at most `max_relay_pages` requests, each limited to
    /// `relay_page_size` events (or fewer when the per-author event limit is
    /// nearly reached) and, after the first page, bounded by an `until`
    /// timestamp one second below the oldest `created_at` seen on the previous
    /// page. Non-ephemeral events are collected de-duplicated by id.
    ///
    /// Paging stops when a page is empty, when `per_author_event_limit`
    /// events have been collected, when the cursor cannot move further back,
    /// or when the events-seen limit is reached. `events_seen` counts every
    /// event received, including ephemeral ones. The result always reports
    /// `supports_negentropy: false`; a client error is returned as
    /// `CrawlError::Nostr`.
    async fn fetch_full_author_history_by_paging_from_relay(
        &self,
        client: &Client,
        relay: &str,
        pubkey: PublicKey,
    ) -> Result<RelayFetchResult> {
        let mut out = BTreeMap::<String, StoredNostrEvent>::new();
        let mut events_seen = 0usize;
        // Upper timestamp bound for the next page; `None` means no bound yet.
        let mut until = None;

        for _ in 0..self.config.max_relay_pages {
            // Never request more than what is still allowed for this author.
            let remaining = self.config.per_author_event_limit.saturating_sub(out.len());
            if remaining == 0 {
                break;
            }
            let mut filter = Filter::new()
                .author(pubkey)
                .limit(self.config.relay_page_size.min(remaining));
            if let Some(kinds) = &self.config.kinds {
                filter = filter.kinds(kinds.iter().copied().map(Kind::from));
            }
            if let Some(until) = until {
                filter = filter.until(Timestamp::from_secs(until));
            }

            let events = client
                .fetch_events_from([relay], filter, self.config.fetch_timeout)
                .await
                .map(|events| events.to_vec())
                .map_err(|err| CrawlError::Nostr(err.to_string()))?;
            let fetched_count = events.len();
            events_seen = events_seen.saturating_add(fetched_count);
            // An empty page means there is nothing older to fetch.
            if fetched_count == 0 {
                break;
            }

            // Oldest timestamp on this page; ephemeral events count towards it
            // even though they are not stored.
            let mut min_created_at = u64::MAX;
            for event in events {
                min_created_at = min_created_at.min(event.created_at.as_secs());
                if event.kind.is_ephemeral() {
                    continue;
                }
                let stored = stored_event_from_nostr(&event);
                out.insert(stored.id.clone(), stored);
                if out.len() >= self.config.per_author_event_limit {
                    break;
                }
            }

            if out.len() >= self.config.per_author_event_limit {
                break;
            }
            // Timestamp 0 leaves no room to move the cursor further back.
            if min_created_at == u64::MAX || min_created_at == 0 {
                break;
            }
            let next_until = min_created_at.saturating_sub(1);
            // Stop if the cursor did not move, to avoid re-requesting the same page.
            if until == Some(next_until) {
                break;
            }
            until = Some(next_until);
            if self.reached_events_seen_limit(events_seen) {
                break;
            }
        }

        Ok(RelayFetchResult {
            events_seen,
            events: out.into_values().collect(),
            supports_negentropy: false,
        })
    }
1578
1579 fn select_author_events(&self, events: Vec<StoredNostrEvent>) -> Result<Vec<StoredNostrEvent>> {
1580 self.select_author_events_with_limits(
1581 events,
1582 self.config.per_author_event_limit,
1583 self.config.per_author_live_bytes,
1584 )
1585 }
1586
1587 fn select_author_events_with_limits(
1588 &self,
1589 mut events: Vec<StoredNostrEvent>,
1590 event_limit: usize,
1591 live_byte_limit: Option<u64>,
1592 ) -> Result<Vec<StoredNostrEvent>> {
1593 let sticky_events = self.select_sticky_author_events(&events);
1594 let sticky_ids = sticky_events
1595 .iter()
1596 .map(|event| event.id.clone())
1597 .collect::<HashSet<_>>();
1598 events.retain(|event| !sticky_ids.contains(&event.id));
1599 events.sort_by(|left, right| {
1600 self.policy
1601 .priority(right)
1602 .cmp(&self.policy.priority(left))
1603 .then_with(|| right.created_at.cmp(&left.created_at))
1604 .then_with(|| left.id.cmp(&right.id))
1605 });
1606
1607 if let Some(max_live_bytes) = live_byte_limit {
1608 let mut selected = sticky_events.clone();
1609 let mut live_bytes_selected = self.encoded_events_size(&selected)?;
1610 if live_bytes_selected > max_live_bytes {
1611 return Ok(selected);
1612 }
1613 for event in events {
1614 let encoded_len = self.event_store.encode_event(&event)?.len() as u64;
1615 if live_bytes_selected.saturating_add(encoded_len) > max_live_bytes {
1616 continue;
1617 }
1618 live_bytes_selected = live_bytes_selected.saturating_add(encoded_len);
1619 selected.push(event);
1620 if selected.len().saturating_sub(sticky_events.len()) >= event_limit {
1621 break;
1622 }
1623 }
1624 return Ok(selected);
1625 }
1626
1627 let mut selected = sticky_events;
1628 selected.extend(events.into_iter().take(event_limit));
1629 Ok(selected)
1630 }
1631
1632 fn select_sticky_author_events(&self, events: &[StoredNostrEvent]) -> Vec<StoredNostrEvent> {
1633 let latest_metadata = events
1634 .iter()
1635 .filter(|event| event.kind == METADATA_KIND)
1636 .cloned()
1637 .max_by(|left, right| {
1638 left.created_at
1639 .cmp(&right.created_at)
1640 .then_with(|| right.id.cmp(&left.id))
1641 });
1642
1643 latest_metadata.into_iter().collect()
1644 }
1645
1646 fn apply_live_byte_cap_from(
1647 &self,
1648 mut events: Vec<StoredNostrEvent>,
1649 live_bytes_selected_so_far: u64,
1650 ) -> Result<(Vec<StoredNostrEvent>, u64)> {
1651 events.sort_by(|left, right| {
1652 self.policy
1653 .priority(right)
1654 .cmp(&self.policy.priority(left))
1655 .then_with(|| right.created_at.cmp(&left.created_at))
1656 .then_with(|| left.id.cmp(&right.id))
1657 });
1658
1659 let Some(max_live_bytes) = self.config.max_live_bytes else {
1660 let live_bytes_selected =
1661 events
1662 .iter()
1663 .try_fold(live_bytes_selected_so_far, |total, event| {
1664 let encoded = self.event_store.encode_event(event)?;
1665 Ok::<u64, NostrEventStoreError>(total.saturating_add(encoded.len() as u64))
1666 })?;
1667 return Ok((events, live_bytes_selected));
1668 };
1669
1670 let mut selected = Vec::new();
1671 let mut live_bytes_selected = live_bytes_selected_so_far;
1672 for event in events {
1673 let encoded_len = self.event_store.encode_event(&event)?.len() as u64;
1674 if live_bytes_selected.saturating_add(encoded_len) > max_live_bytes {
1675 continue;
1676 }
1677 live_bytes_selected = live_bytes_selected.saturating_add(encoded_len);
1678 selected.push(event);
1679 }
1680
1681 Ok((selected, live_bytes_selected))
1682 }
1683
1684 fn kind_allowed(&self, kind: u32) -> bool {
1685 self.config.kinds.as_ref().is_none_or(|allowed| {
1686 allowed
1687 .iter()
1688 .any(|candidate| u32::from(*candidate) == kind)
1689 })
1690 }
1691}
1692
1693fn stored_event_from_nostr(event: &nostr_sdk::Event) -> StoredNostrEvent {
1694 StoredNostrEvent {
1695 id: event.id.to_hex(),
1696 pubkey: event.pubkey.to_hex(),
1697 created_at: event.created_at.as_secs(),
1698 kind: event.kind.as_u16() as u32,
1699 tags: event
1700 .tags
1701 .iter()
1702 .map(|tag| tag.as_slice().to_vec())
1703 .collect(),
1704 content: event.content.clone(),
1705 sig: event.sig.to_string(),
1706 }
1707}
1708
/// Returns `true` when `value` is exactly 64 lowercase hexadecimal characters.
///
/// Uppercase hex digits are rejected.
fn is_valid_hex_pubkey(value: &str) -> bool {
    if value.len() != 64 {
        return false;
    }
    value
        .bytes()
        .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
}
1715
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use hashtree_core::MemoryStore;
    use nostr_social_graph::{NostrEvent, SocialGraphBackend as NostrSocialGraphBackend};

    use super::{CrawlConfig, NostrBridge, StoredNostrEvent};

    /// Result type returned by every `FakeGraphBackend` trait method.
    type GraphResult<T> = std::result::Result<T, std::io::Error>;

    /// Builds a 64-character string by repeating `digit`.
    fn hex64(digit: &str) -> String {
        digit.repeat(64)
    }

    /// Social-graph stub whose root is 64 zeros and whose root follows one
    /// valid pubkey plus three malformed ones; everything else is empty.
    #[derive(Default)]
    struct FakeGraphBackend;

    impl NostrSocialGraphBackend for FakeGraphBackend {
        type Error = std::io::Error;

        fn get_root(&self) -> GraphResult<String> {
            Ok(hex64("0"))
        }

        fn set_root(&mut self, _root: &str) -> GraphResult<()> {
            Ok(())
        }

        fn handle_event(
            &mut self,
            _event: &NostrEvent,
            _allow_unknown_authors: bool,
            _overmute_threshold: f64,
        ) -> GraphResult<()> {
            Ok(())
        }

        fn get_follow_distance(&self, _user: &str) -> GraphResult<u32> {
            Ok(0)
        }

        fn is_following(&self, _follower: &str, _followed_user: &str) -> GraphResult<bool> {
            Ok(false)
        }

        fn get_followed_by_user(&self, user: &str) -> GraphResult<Vec<String>> {
            if user != hex64("0") {
                return Ok(Vec::new());
            }
            // One well-formed key, then: non-hex text, a too-short key, and
            // an uppercase key.
            Ok(vec![
                hex64("1"),
                "NOT-HEX".to_string(),
                "a".repeat(63),
                hex64("A"),
            ])
        }

        fn get_followers_by_user(&self, _user: &str) -> GraphResult<Vec<String>> {
            Ok(Vec::new())
        }

        fn get_muted_by_user(&self, _user: &str) -> GraphResult<Vec<String>> {
            Ok(Vec::new())
        }

        fn get_user_muted_by(&self, _user: &str) -> GraphResult<Vec<String>> {
            Ok(Vec::new())
        }

        fn get_follow_list_created_at(&self, _user: &str) -> GraphResult<Option<u64>> {
            Ok(None)
        }

        fn get_mute_list_created_at(&self, _user: &str) -> GraphResult<Option<u64>> {
            Ok(None)
        }

        fn is_overmuted(&self, _user: &str, _threshold: f64) -> GraphResult<bool> {
            Ok(false)
        }
    }

    /// An event whose pubkey is not hex must fail stored-event validation.
    #[test]
    fn rejects_invalid_stored_event_shape() {
        let store = Arc::new(MemoryStore::new());
        let bridge = NostrBridge::new(store, CrawlConfig::default());
        let malformed = StoredNostrEvent {
            id: "f".repeat(64),
            pubkey: "not-hex".to_string(),
            created_at: 1,
            kind: 1,
            tags: Vec::new(),
            content: String::new(),
            sig: "f".repeat(128),
        };

        let accepted = bridge.is_valid_stored_event(&malformed);
        assert!(!accepted);
    }

    /// Malformed pubkeys reported by the graph are left out of the author list.
    #[test]
    fn collect_authors_skips_invalid_graph_pubkeys() {
        let store = Arc::new(MemoryStore::new());
        let bridge = NostrBridge::new(store, CrawlConfig::default());

        let authors = bridge
            .collect_authors(&FakeGraphBackend)
            .expect("collect authors");

        let expected = vec![hex64("0"), hex64("1")];
        assert_eq!(authors, expected);
    }

    /// A configured allowlist is used for the author list, and `max_authors`
    /// caps the result.
    #[test]
    fn collect_authors_prefers_allowlist_and_applies_limits() {
        let allowlist = vec![
            hex64("1"),
            "NOT-HEX".to_string(),
            hex64("0"),
            hex64("1"),
        ];
        let config = CrawlConfig {
            author_allowlist: Some(allowlist),
            max_authors: Some(1),
            ..CrawlConfig::default()
        };
        let bridge = NostrBridge::new(Arc::new(MemoryStore::new()), config);

        let authors = bridge
            .collect_authors(&FakeGraphBackend)
            .expect("collect authors");

        assert_eq!(authors, vec![hex64("1")]);
    }
}