1use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
2use std::sync::Arc;
3use std::time::Duration;
4
5use hashtree_core::{Cid, Store};
6use hashtree_nostr::{ListEventsOptions, NostrEventStore, NostrEventStoreError, StoredNostrEvent};
7use nostr_sdk::{
8 pool::RelayLimits, Client, EventId, Filter, Keys, Kind, NegentropyOptions, Options, PublicKey,
9 Timestamp,
10};
11use nostr_social_graph::SocialGraphBackend;
12
/// Number of event ids requested per relay query when backfilling events that
/// negentropy reconciliation reported as missing locally.
const NEGENTROPY_FETCH_CHUNK_SIZE: usize = 256;
/// Strategy used to pull events from relays during a crawl.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelayFetchMode {
    /// Query relays per batch of authors (using negentropy sync when available).
    AuthorBatches,
    /// Page backwards through each relay's most recent events.
    GlobalRecent,
}
19
/// Tunables controlling a crawl: which relays to hit, which authors and kinds
/// to fetch, and the caps that bound how much data is retained.
#[derive(Debug, Clone)]
pub struct CrawlConfig {
    /// Relay URLs to connect to; at least one is required.
    pub relays: Vec<String>,
    /// When set, crawl exactly these authors instead of walking the social graph.
    pub author_allowlist: Option<Vec<String>>,
    /// Global cap on total encoded bytes of selected events.
    pub max_live_bytes: Option<u64>,
    /// Stop crawling once this many events have been observed from relays.
    pub max_events_seen: Option<usize>,
    /// Cap on how many authors are considered.
    pub max_authors: Option<usize>,
    /// Maximum follow-graph distance from the root author to traverse.
    pub max_follow_distance: Option<u32>,
    /// Number of authors fetched per relay query batch (author-batches mode).
    pub author_batch_size: usize,
    /// Maximum number of events retained per author.
    pub per_author_event_limit: usize,
    /// Optional per-author cap on retained encoded bytes.
    pub per_author_live_bytes: Option<u64>,
    /// Timeout applied to each relay fetch.
    pub fetch_timeout: Duration,
    /// When set, only these event kinds are fetched/retained.
    pub kinds: Option<Vec<u16>>,
    /// Relay fetch strategy (per-author batches vs. global recent paging).
    pub relay_fetch_mode: RelayFetchMode,
    /// When true, do not fall back to full fetches on relays without negentropy.
    pub require_negentropy: bool,
    /// Optional relay-client limit on accepted event size in bytes.
    pub relay_event_max_size: Option<u32>,
    /// Page size used by the global-recent fetch mode.
    pub relay_page_size: usize,
    /// Maximum number of pages requested per relay in global-recent mode.
    pub max_relay_pages: usize,
}
39
impl Default for CrawlConfig {
    /// Conservative defaults: no relays configured, follow distance 1, batches
    /// of 64 authors retaining up to 256 events each, 10s fetch timeout, and
    /// global-recent paging of 1000 events for at most 10 pages.
    fn default() -> Self {
        Self {
            relays: Vec::new(),
            author_allowlist: None,
            max_live_bytes: None,
            max_events_seen: None,
            max_authors: None,
            max_follow_distance: Some(1),
            author_batch_size: 64,
            per_author_event_limit: 256,
            per_author_live_bytes: None,
            fetch_timeout: Duration::from_secs(10),
            kinds: None,
            relay_fetch_mode: RelayFetchMode::AuthorBatches,
            require_negentropy: false,
            relay_event_max_size: None,
            relay_page_size: 1_000,
            max_relay_pages: 10,
        }
    }
}
62
/// Summary of a crawl run; also emitted incrementally via progress callbacks.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CrawlReport {
    /// Root CID of the hashtree after applying the selected events, if any.
    pub root: Option<Cid>,
    /// Total number of authors discovered/considered for this crawl.
    pub authors_considered: usize,
    /// Authors whose events have been processed so far.
    pub authors_processed: usize,
    /// Raw count of events observed from relays (duplicates included).
    pub events_seen: usize,
    /// Events that passed filtering/selection and were retained.
    pub events_selected: usize,
    /// Total encoded size in bytes of the selected events.
    pub live_bytes_selected: u64,
}
72
/// Ranks events for retention; higher-priority events are kept first when
/// per-author or global limits force a selection.
pub trait EventSelectionPolicy: Send + Sync {
    /// Returns the relative priority of `event` (higher wins).
    fn priority(&self, event: &StoredNostrEvent) -> i32;
}
76
/// Selection policy that assigns a priority per event kind, with a fallback
/// priority for kinds without an explicit entry.
#[derive(Debug, Clone)]
pub struct KindPriorityPolicy {
    /// Priority used for kinds absent from `priorities`.
    default_priority: i32,
    /// Explicit per-kind priorities.
    priorities: BTreeMap<u32, i32>,
}
82
83impl Default for KindPriorityPolicy {
84 fn default() -> Self {
85 let mut priorities = BTreeMap::new();
86 priorities.insert(1, 1_000);
87 priorities.insert(0, 900);
88 priorities.insert(3, 800);
89 priorities.insert(10_000, 750);
90 priorities.insert(6, 600);
91 priorities.insert(7, 500);
92 Self {
93 default_priority: 100,
94 priorities,
95 }
96 }
97}
98
impl KindPriorityPolicy {
    /// Builder-style override: sets (or replaces) the priority for `kind`.
    pub fn with_priority(mut self, kind: u32, priority: i32) -> Self {
        self.priorities.insert(kind, priority);
        self
    }
}
105
106impl EventSelectionPolicy for KindPriorityPolicy {
107 fn priority(&self, event: &StoredNostrEvent) -> i32 {
108 self.priorities
109 .get(&event.kind)
110 .copied()
111 .unwrap_or(self.default_priority)
112 }
113}
114
/// Errors surfaced by the crawl pipeline. Configuration problems are reported
/// eagerly by `validate_config`; relay and graph failures are stringified
/// because the underlying error types come from external crates.
#[derive(Debug, thiserror::Error)]
pub enum CrawlError {
    #[error("event store error: {0}")]
    EventStore(#[from] NostrEventStoreError),
    #[error("crawl requires at least one relay")]
    MissingRelays,
    #[error("per-author event limit must be greater than zero")]
    InvalidPerAuthorLimit,
    #[error("per-author live byte cap must be greater than zero")]
    InvalidPerAuthorLiveBytes,
    #[error("author batch size must be greater than zero")]
    InvalidAuthorBatchSize,
    #[error("relay page size must be greater than zero")]
    InvalidRelayPageSize,
    #[error("max relay pages must be greater than zero")]
    InvalidMaxRelayPages,
    #[error("max events seen must be greater than zero")]
    InvalidMaxEventsSeen,
    #[error("relay event max size must be greater than zero")]
    InvalidRelayEventMaxSize,
    #[error("nostr error: {0}")]
    Nostr(String),
    #[error("social graph error: {0}")]
    SocialGraph(String),
}
140
/// Module-wide result alias defaulting the error type to [`CrawlError`].
pub type Result<T> = std::result::Result<T, CrawlError>;
142
/// Outcome of fetching events from a single relay.
#[derive(Debug, Default)]
struct RelayFetchResult {
    /// Raw number of events the relay returned (before dedup/filtering).
    events_seen: usize,
    /// Fetched events converted into stored form.
    events: Vec<StoredNostrEvent>,
    /// Whether the relay completed negentropy reconciliation; cached per relay.
    supports_negentropy: bool,
}
149
/// Result of crawling one batch of authors in author-batches mode.
#[derive(Debug, Default)]
struct BatchCrawlReport {
    /// Raw events observed across all relays for this batch.
    events_seen: usize,
    /// Newly selected events to apply to the tree.
    events: Vec<StoredNostrEvent>,
    /// Cumulative selected bytes including prior batches (running total).
    live_bytes_selected: u64,
}
156
/// Mutable accumulator for global-recent crawling, optionally resumed from an
/// existing tree root.
#[derive(Debug, Default)]
struct GlobalRecentState {
    /// Latest built root, updated as pages of events are applied.
    current_root: Option<Cid>,
    /// Currently retained (post-selection) events, keyed by author pubkey.
    retained_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
    /// Running count of selected events across all authors.
    events_selected: usize,
    /// Running encoded-byte total of selected events.
    live_bytes_selected: u64,
}
164
/// Crawls nostr relays for events authored by social-graph members and
/// persists the selected events into a hashtree-backed event store.
pub struct NostrBridge<S: Store> {
    /// Persistent event store the crawl builds roots in.
    event_store: NostrEventStore<S>,
    /// Crawl tunables.
    config: CrawlConfig,
    /// Policy ranking which events to keep when limits apply.
    policy: Arc<dyn EventSelectionPolicy>,
}
170
171impl<S: Store> NostrBridge<S> {
    /// Creates a bridge over `store` with the default kind-priority policy.
    pub fn new(store: Arc<S>, config: CrawlConfig) -> Self {
        Self {
            event_store: NostrEventStore::new(store),
            config,
            policy: Arc::new(KindPriorityPolicy::default()),
        }
    }
179
    /// Builder-style override of the event selection policy.
    pub fn with_policy(mut self, policy: Arc<dyn EventSelectionPolicy>) -> Self {
        self.policy = policy;
        self
    }
184
    /// Runs a crawl without progress reporting; see [`Self::crawl_with_progress`].
    pub async fn crawl<G: SocialGraphBackend>(
        &self,
        graph: &G,
        existing_root: Option<&Cid>,
    ) -> Result<CrawlReport> {
        self.crawl_with_progress(graph, existing_root, |_| {}).await
    }
192
    /// Runs a crawl, invoking `on_progress` with an updated [`CrawlReport`]
    /// after each unit of work (per author batch, or per relay page in
    /// global-recent mode).
    ///
    /// `existing_root` resumes from a previously built tree: events already
    /// retained under it are loaded and deduplicated against.
    pub async fn crawl_with_progress<G, F>(
        &self,
        graph: &G,
        existing_root: Option<&Cid>,
        mut on_progress: F,
    ) -> Result<CrawlReport>
    where
        G: SocialGraphBackend,
        F: FnMut(&CrawlReport),
    {
        self.validate_config()?;

        let authors = self.collect_authors(graph)?;
        if authors.is_empty() {
            // Nothing to crawl; report an empty run rather than erroring.
            return Ok(CrawlReport::default());
        }

        let client = self.connect_client().await?;

        if self.config.relay_fetch_mode == RelayFetchMode::AuthorBatches {
            return self
                .crawl_author_batches(&client, &authors, existing_root, &mut on_progress)
                .await;
        }

        // Global-recent mode: resume state from the existing root (if any) so
        // previously selected events count toward limits and deduplication.
        let state = self
            .load_existing_global_state(existing_root, &authors)
            .await?;

        let report = self
            .crawl_global_recent_incremental(&client, &authors, state, &mut on_progress)
            .await?;
        on_progress(&report);
        Ok(report)
    }
228
    /// Author-batches crawl loop: processes authors in chunks, applies each
    /// batch's newly selected events to the tree, and reports progress after
    /// every batch. Stops early once the events-seen cap is reached.
    async fn crawl_author_batches(
        &self,
        client: &Client,
        authors: &[String],
        existing_root: Option<&Cid>,
        on_progress: &mut impl FnMut(&CrawlReport),
    ) -> Result<CrawlReport> {
        // Cached per-relay negentropy support and relays that errored out,
        // shared across batches so each relay is probed/failed only once.
        let mut relay_negentropy_support = BTreeMap::<String, bool>::new();
        let mut failed_relays = BTreeSet::<String>::new();
        let mut current_root = existing_root.cloned();
        let mut events_seen = 0usize;
        let mut events_selected = 0usize;
        let mut live_bytes_selected = 0u64;
        let mut authors_processed = 0usize;

        for author_batch in authors.chunks(self.config.author_batch_size) {
            let batch = self
                .crawl_author_batch(
                    client,
                    author_batch,
                    current_root.as_ref(),
                    &mut relay_negentropy_support,
                    &mut failed_relays,
                    live_bytes_selected,
                )
                .await?;
            events_seen = events_seen.saturating_add(batch.events_seen);
            events_selected = events_selected.saturating_add(batch.events.len());
            // The batch returns the cumulative running byte total, not a delta.
            live_bytes_selected = batch.live_bytes_selected;
            authors_processed = authors_processed.saturating_add(author_batch.len());
            if !batch.events.is_empty() {
                current_root = self
                    .event_store
                    .build(current_root.as_ref(), batch.events)
                    .await?;
            }
            on_progress(&CrawlReport {
                root: current_root.clone(),
                authors_considered: authors.len(),
                authors_processed,
                events_seen,
                events_selected,
                live_bytes_selected,
            });
            if self.reached_events_seen_limit(events_seen) {
                break;
            }
        }

        Ok(CrawlReport {
            root: current_root,
            authors_considered: authors.len(),
            authors_processed,
            events_seen,
            events_selected,
            live_bytes_selected,
        })
    }
287
    /// Reconstructs global-recent crawl state from an existing root: loads all
    /// retained events, keeps those by crawl authors and allowed kinds, and
    /// re-runs per-author selection so counters match the current limits.
    ///
    /// If the store reports missing event blobs (stale index entries), falls
    /// back to a per-author load and rebuilds a fresh root from whatever
    /// survives.
    async fn load_existing_global_state(
        &self,
        root: Option<&Cid>,
        authors: &[String],
    ) -> Result<GlobalRecentState> {
        let Some(root) = root else {
            return Ok(GlobalRecentState::default());
        };

        match self
            .event_store
            .list_recent(Some(root), ListEventsOptions::default())
            .await
        {
            Ok(events) => {
                let author_set = authors.iter().map(String::as_str).collect::<BTreeSet<_>>();
                let mut retained_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
                for event in events {
                    // Drop events from non-crawl authors or disallowed kinds.
                    if !author_set.contains(event.pubkey.as_str()) || !self.kind_allowed(event.kind)
                    {
                        continue;
                    }
                    if !self.is_valid_stored_event(&event) {
                        continue;
                    }
                    retained_by_author
                        .entry(event.pubkey.clone())
                        .or_default()
                        .push(event);
                }

                let mut state = GlobalRecentState {
                    current_root: Some(root.clone()),
                    ..GlobalRecentState::default()
                };
                for (author, events) in retained_by_author {
                    // Re-apply selection so resumed state honors current limits.
                    let selected = self.select_author_events(events)?;
                    state.events_selected = state.events_selected.saturating_add(selected.len());
                    state.live_bytes_selected = state
                        .live_bytes_selected
                        .saturating_add(self.encoded_events_size(&selected)?);
                    state.retained_by_author.insert(author, selected);
                }
                Ok(state)
            }
            // NOTE(review): matching on the exact error message string is
            // brittle — confirm the store exposes no typed variant for this.
            Err(NostrEventStoreError::Validation(message))
                if message == "stored nostr event blob is missing" =>
            {
                eprintln!(
                    "Falling back to per-author resume for existing root due to missing event blobs"
                );
                let mut state = self
                    .load_existing_global_state_by_author(Some(root), authors)
                    .await?;
                state.current_root = self.rebuild_root_from_retained_state(&state).await?;
                Ok(state)
            }
            Err(err) => Err(err.into()),
        }
    }
348
    /// Fallback resume path: loads retained events author by author (tolerating
    /// stale index entries per author) and re-runs selection to rebuild the
    /// global-recent counters.
    async fn load_existing_global_state_by_author(
        &self,
        root: Option<&Cid>,
        authors: &[String],
    ) -> Result<GlobalRecentState> {
        let mut state = GlobalRecentState {
            current_root: root.cloned(),
            ..GlobalRecentState::default()
        };
        for author in authors {
            let retained = self
                .load_retained_events(root, author)
                .await?
                .into_iter()
                .filter(|event| self.kind_allowed(event.kind))
                .filter(|event| self.is_valid_stored_event(event))
                .collect::<Vec<_>>();
            let retained = self.select_author_events(retained)?;
            state.events_selected = state.events_selected.saturating_add(retained.len());
            state.live_bytes_selected = state
                .live_bytes_selected
                .saturating_add(self.encoded_events_size(&retained)?);
            state.retained_by_author.insert(author.clone(), retained);
        }
        Ok(state)
    }
375
376 async fn rebuild_root_from_retained_state(
377 &self,
378 state: &GlobalRecentState,
379 ) -> Result<Option<Cid>> {
380 let events = state
381 .retained_by_author
382 .values()
383 .flat_map(|events| events.iter().cloned())
384 .collect::<Vec<_>>();
385 self.event_store
386 .build(None, events)
387 .await
388 .map_err(Into::into)
389 }
390
    /// Rejects configurations that would make the crawl a no-op or divide work
    /// into zero-sized units; called once at the start of every crawl.
    fn validate_config(&self) -> Result<()> {
        if self.config.relays.is_empty() {
            return Err(CrawlError::MissingRelays);
        }
        if self.config.per_author_event_limit == 0 {
            return Err(CrawlError::InvalidPerAuthorLimit);
        }
        // Optional caps are allowed to be absent, but never explicitly zero.
        if self.config.per_author_live_bytes == Some(0) {
            return Err(CrawlError::InvalidPerAuthorLiveBytes);
        }
        if self.config.author_batch_size == 0 {
            return Err(CrawlError::InvalidAuthorBatchSize);
        }
        if self.config.relay_page_size == 0 {
            return Err(CrawlError::InvalidRelayPageSize);
        }
        if self.config.max_relay_pages == 0 {
            return Err(CrawlError::InvalidMaxRelayPages);
        }
        if self.config.max_events_seen == Some(0) {
            return Err(CrawlError::InvalidMaxEventsSeen);
        }
        if self.config.relay_event_max_size == Some(0) {
            return Err(CrawlError::InvalidRelayEventMaxSize);
        }
        Ok(())
    }
418
    /// Produces the ordered, deduplicated list of authors to crawl.
    ///
    /// If an allowlist is configured it wins outright (invalid pubkeys are
    /// skipped, order preserved, truncated to `max_authors`). Otherwise a BFS
    /// over the social graph from its root collects authors up to
    /// `max_follow_distance` hops and `max_authors` entries.
    fn collect_authors<G: SocialGraphBackend>(&self, graph: &G) -> Result<Vec<String>> {
        if let Some(author_allowlist) = &self.config.author_allowlist {
            let mut seen = HashSet::new();
            let mut authors = Vec::new();
            for author in author_allowlist {
                if !is_valid_hex_pubkey(author) {
                    continue;
                }
                // `seen` deduplicates while `authors` preserves first-seen order.
                if seen.insert(author.clone()) {
                    authors.push(author.clone());
                }
            }
            if let Some(max_authors) = self.config.max_authors {
                authors.truncate(max_authors);
            }
            return Ok(authors);
        }

        let root = graph
            .get_root()
            .map_err(|err| CrawlError::SocialGraph(err.to_string()))?;
        let mut visited = BTreeSet::new();
        let mut authors = Vec::new();
        let mut queue = VecDeque::from([(root.clone(), 0u32)]);
        visited.insert(root);

        while let Some((author, distance)) = queue.pop_front() {
            // An invalid root (or stray queue entry) is skipped entirely,
            // including its follows.
            if !is_valid_hex_pubkey(&author) {
                continue;
            }
            authors.push(author.clone());
            if self
                .config
                .max_authors
                .is_some_and(|max_authors| authors.len() >= max_authors)
            {
                break;
            }
            // At the distance limit we keep the author but stop expanding.
            if self
                .config
                .max_follow_distance
                .is_some_and(|max_distance| distance >= max_distance)
            {
                continue;
            }

            let mut follows = graph
                .get_followed_by_user(&author)
                .map_err(|err| CrawlError::SocialGraph(err.to_string()))?;
            follows.retain(|followed| is_valid_hex_pubkey(followed));
            // Sorting makes traversal order (and thus truncation) deterministic.
            follows.sort();
            for followed in follows {
                if visited.insert(followed.clone()) {
                    queue.push_back((followed, distance.saturating_add(1)));
                }
            }
        }

        Ok(authors)
    }
479
    /// Builds a nostr client with throwaway keys, registers all configured
    /// relays, and initiates connections.
    async fn connect_client(&self) -> Result<Client> {
        let client = if let Some(max_size) = self.config.relay_event_max_size {
            // Apply the configured client-side cap on accepted event sizes.
            let mut limits = RelayLimits::default();
            limits.events.max_size = Some(max_size);
            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
        } else {
            Client::new(Keys::generate())
        };
        for relay in &self.config.relays {
            client
                .add_relay(relay)
                .await
                .map_err(|err| CrawlError::Nostr(err.to_string()))?;
        }
        client.connect().await;
        // Fixed grace period for connections to establish before the first
        // query — TODO(review): consider an explicit readiness wait instead.
        tokio::time::sleep(Duration::from_millis(250)).await;
        Ok(client)
    }
498
    /// Crawls one batch of authors: loads what is already retained for them,
    /// fetches new events from every healthy relay (preferring negentropy
    /// sync), merges old + new per author, re-runs selection, and finally
    /// applies the global live-byte cap.
    ///
    /// `live_bytes_selected_so_far` is the running global byte total; the
    /// returned report carries the updated running total, not a delta.
    async fn crawl_author_batch(
        &self,
        client: &Client,
        author_batch: &[String],
        current_root: Option<&Cid>,
        relay_negentropy_support: &mut BTreeMap<String, bool>,
        failed_relays: &mut BTreeSet<String>,
        live_bytes_selected_so_far: u64,
    ) -> Result<BatchCrawlReport> {
        // `known` (id -> event) holds everything already retained for this
        // batch's authors; used both for negentropy local items and dedup.
        let mut existing_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
        let mut known = BTreeMap::<String, StoredNostrEvent>::new();
        for author in author_batch {
            let retained = self
                .load_retained_events(current_root, author)
                .await?
                .into_iter()
                .filter(|event| self.kind_allowed(event.kind))
                .filter(|event| self.is_valid_stored_event(event))
                .collect::<Vec<_>>();
            for event in &retained {
                known.insert(event.id.clone(), event.clone());
            }
            existing_by_author.insert(author.clone(), retained);
        }

        let pubkeys: Vec<PublicKey> = author_batch
            .iter()
            .filter_map(|author| author.parse::<PublicKey>().ok())
            .collect();
        if pubkeys.is_empty() {
            // No parseable pubkeys in this batch; nothing to fetch.
            return Ok(BatchCrawlReport {
                events_seen: 0,
                events: Vec::new(),
                live_bytes_selected: live_bytes_selected_so_far,
            });
        }

        let filter = self.batch_filter(pubkeys);
        let mut fetched = BTreeMap::<String, StoredNostrEvent>::new();
        let mut events_seen = 0usize;

        for relay in &self.config.relays {
            if failed_relays.contains(relay) {
                continue;
            }
            let local_items = self.local_items_for_batch(known.values(), author_batch);
            let relay_support = relay_negentropy_support.get(relay).copied();
            let fetched_from_relay = match self
                .fetch_events_from_relay(client, relay, filter.clone(), local_items, relay_support)
                .await
            {
                Ok(result) => result,
                Err(err) => {
                    // Relay failure disables it for the remainder of the crawl.
                    eprintln!("Skipping relay {relay}: {err}");
                    failed_relays.insert(relay.clone());
                    continue;
                }
            };
            relay_negentropy_support.insert(relay.clone(), fetched_from_relay.supports_negentropy);
            events_seen = events_seen.saturating_add(fetched_from_relay.events_seen);
            for event in fetched_from_relay.events {
                // Only events not already known (retained or fetched from an
                // earlier relay) are treated as new.
                if self.kind_allowed(event.kind)
                    && known.insert(event.id.clone(), event.clone()).is_none()
                {
                    fetched.insert(event.id.clone(), event);
                }
            }
        }

        let mut fetched_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
        for event in fetched.into_values() {
            fetched_by_author
                .entry(event.pubkey.clone())
                .or_default()
                .push(event);
        }

        // Merge retained + fetched per author (id-keyed to dedup), then re-run
        // per-author selection over the merged set.
        let mut selected = Vec::new();
        for author in author_batch {
            let mut merged: BTreeMap<String, StoredNostrEvent> = BTreeMap::new();
            if let Some(existing_events) = existing_by_author.remove(author) {
                for event in existing_events {
                    merged.insert(event.id.clone(), event);
                }
            }
            if let Some(events) = fetched_by_author.remove(author) {
                for event in events {
                    merged.insert(event.id.clone(), event);
                }
            }
            selected.extend(self.select_author_events(merged.into_values().collect())?);
        }

        let selected = selected
            .into_iter()
            .filter(|event| self.is_valid_stored_event(event))
            .collect::<Vec<_>>();
        let (selected, live_bytes_selected) =
            self.apply_live_byte_cap_from(selected, live_bytes_selected_so_far)?;

        Ok(BatchCrawlReport {
            events_seen,
            events: selected,
            live_bytes_selected,
        })
    }
605
    /// Global-recent crawl: pages backwards through each relay's most recent
    /// events, folding matching events into per-author retained sets,
    /// incrementally rebuilding the root after every page, and reporting
    /// progress per page.
    async fn crawl_global_recent_incremental(
        &self,
        client: &Client,
        authors: &[String],
        mut state: GlobalRecentState,
        on_progress: &mut impl FnMut(&CrawlReport),
    ) -> Result<CrawlReport> {
        // Set form of the author list for O(log n) membership checks.
        let authors = authors.iter().map(String::as_str).collect::<BTreeSet<_>>();
        // Ids already applied to the tree (from resumed state) — never re-apply.
        let mut known_ids = state
            .retained_by_author
            .values()
            .flat_map(|events| events.iter().map(|event| event.id.clone()))
            .collect::<BTreeSet<_>>();
        // Authors with at least one retained event already count as processed.
        let mut authors_processed = state
            .retained_by_author
            .values()
            .filter(|events| !events.is_empty())
            .count();
        let mut failed_relays = BTreeSet::<String>::new();
        let mut events_seen = 0usize;

        for relay in &self.config.relays {
            if failed_relays.contains(relay) {
                continue;
            }
            // `until` is the backwards paging cursor (seconds, inclusive).
            let mut until = None;
            for _ in 0..self.config.max_relay_pages {
                let filter = self.global_recent_filter(until);
                let events = match client
                    .get_events_from([relay], vec![filter], Some(self.config.fetch_timeout))
                    .await
                {
                    Ok(events) => events,
                    Err(err) => {
                        eprintln!("Skipping relay {relay}: {}", err);
                        failed_relays.insert(relay.clone());
                        break;
                    }
                };
                let fetched_count = events.len();
                events_seen = events_seen.saturating_add(fetched_count);
                if fetched_count == 0 {
                    break;
                }

                // Oldest timestamp on this page drives the next cursor value.
                let mut min_created_at = u64::MAX;
                let mut pending_apply = Vec::new();
                for event in events {
                    min_created_at = min_created_at.min(event.created_at.as_u64());
                    if event.kind.is_ephemeral() {
                        continue;
                    }

                    let stored = stored_event_from_nostr(&event);
                    if !authors.contains(stored.pubkey.as_str()) || !self.kind_allowed(stored.kind)
                    {
                        continue;
                    }

                    // Merge the new event into the author's retained set
                    // (id-keyed to dedup), re-run selection, and adjust the
                    // running counters by the before/after delta.
                    let retained = state
                        .retained_by_author
                        .entry(stored.pubkey.clone())
                        .or_default();
                    let was_empty = retained.is_empty();
                    let old_len = retained.len();
                    let old_live_bytes = self.encoded_events_size(retained)?;
                    let mut merged = BTreeMap::<String, StoredNostrEvent>::new();
                    for existing in retained.drain(..) {
                        merged.insert(existing.id.clone(), existing);
                    }
                    merged.insert(stored.id.clone(), stored);

                    let selected = self.select_author_events(merged.into_values().collect())?;
                    let selected_live_bytes = self.encoded_events_size(&selected)?;
                    // Only events never applied before are queued for the tree.
                    let mut newly_retained = Vec::new();
                    for selected_event in &selected {
                        if !known_ids.contains(&selected_event.id) {
                            known_ids.insert(selected_event.id.clone());
                            newly_retained.push(selected_event.clone());
                        }
                    }

                    state.events_selected = state
                        .events_selected
                        .saturating_sub(old_len)
                        .saturating_add(selected.len());
                    state.live_bytes_selected = state
                        .live_bytes_selected
                        .saturating_sub(old_live_bytes)
                        .saturating_add(selected_live_bytes);
                    if was_empty && !selected.is_empty() {
                        authors_processed = authors_processed.saturating_add(1);
                    }
                    *retained = selected;
                    pending_apply.extend(newly_retained);
                }

                // Apply this page's new events in a single build call.
                if !pending_apply.is_empty() {
                    state.current_root = self
                        .event_store
                        .build(state.current_root.as_ref(), pending_apply)
                        .await?;
                }

                on_progress(&CrawlReport {
                    root: state.current_root.clone(),
                    authors_considered: authors.len(),
                    authors_processed,
                    events_seen,
                    events_selected: state.events_selected,
                    live_bytes_selected: state.live_bytes_selected,
                });

                // Stop paging when the cursor cannot advance any further.
                if min_created_at == u64::MAX || min_created_at == 0 {
                    break;
                }
                let next_until = min_created_at.saturating_sub(1);
                if until == Some(next_until) {
                    break;
                }
                until = Some(next_until);
                if self.reached_events_seen_limit(events_seen) {
                    break;
                }
            }
            if self.reached_events_seen_limit(events_seen) {
                break;
            }
        }

        Ok(CrawlReport {
            root: state.current_root,
            authors_considered: authors.len(),
            authors_processed,
            events_seen,
            events_selected: state.events_selected,
            live_bytes_selected: state.live_bytes_selected,
        })
    }
745
    /// Builds the relay filter for an author batch: the batch's pubkeys, the
    /// configured kinds (if any), and a result limit sized so every author in
    /// the batch could hit its per-author event limit.
    fn batch_filter(&self, pubkeys: Vec<PublicKey>) -> Filter {
        let mut filter = Filter::new().authors(pubkeys);
        if let Some(kinds) = &self.config.kinds {
            filter = filter.kinds(kinds.iter().copied().map(Kind::from));
        }
        let relay_limit = self
            .config
            .author_batch_size
            .saturating_mul(self.config.per_author_event_limit);
        if relay_limit > 0 {
            filter = filter.limit(relay_limit);
        }
        filter
    }
760
    /// Builds one page's relay filter for global-recent mode: page-size limit,
    /// the configured kinds (if any), and the backwards-paging `until` cursor.
    fn global_recent_filter(&self, until: Option<u64>) -> Filter {
        let mut filter = Filter::new().limit(self.config.relay_page_size);
        if let Some(kinds) = &self.config.kinds {
            filter = filter.kinds(kinds.iter().copied().map(Kind::from));
        }
        if let Some(until) = until {
            filter = filter.until(Timestamp::from_secs(until));
        }
        filter
    }
771
772 fn reached_events_seen_limit(&self, events_seen: usize) -> bool {
773 self.config
774 .max_events_seen
775 .is_some_and(|limit| events_seen >= limit)
776 }
777
    /// An event is considered valid iff the store can encode it; used to drop
    /// malformed events before they enter selection or byte accounting.
    fn is_valid_stored_event(&self, event: &StoredNostrEvent) -> bool {
        self.event_store.encode_event(event).is_ok()
    }
781
782 fn encoded_events_size(&self, events: &[StoredNostrEvent]) -> Result<u64> {
783 let mut total = 0u64;
784 for event in events {
785 total = total.saturating_add(self.event_store.encode_event(event)?.len() as u64);
786 }
787 Ok(total)
788 }
789
    /// Builds the (id, timestamp) pairs describing locally held events for the
    /// batch's authors — the "local" side of a negentropy reconciliation.
    /// Events whose ids fail to parse are silently skipped.
    fn local_items_for_batch<'a, I>(
        &self,
        known_events: I,
        author_batch: &[String],
    ) -> Vec<(EventId, Timestamp)>
    where
        I: Iterator<Item = &'a StoredNostrEvent>,
    {
        let authors = author_batch
            .iter()
            .map(String::as_str)
            .collect::<BTreeSet<_>>();

        known_events
            .filter(|event| {
                authors.contains(event.pubkey.as_str()) && self.kind_allowed(event.kind)
            })
            .filter_map(|event| {
                let event_id = EventId::parse(&event.id).ok()?;
                Some((event_id, Timestamp::from_secs(event.created_at)))
            })
            .collect()
    }
813
    /// Loads the events retained for `author` under `root`. Stale index
    /// entries (missing event blobs) are tolerated and treated as "nothing
    /// retained" rather than failing the crawl.
    async fn load_retained_events(
        &self,
        root: Option<&Cid>,
        author: &str,
    ) -> Result<Vec<StoredNostrEvent>> {
        match self
            .event_store
            .list_by_author(root, author, ListEventsOptions::default())
            .await
        {
            Ok(events) => Ok(events),
            // NOTE(review): matches the store's error by exact message string;
            // confirm no typed variant exists for missing blobs.
            Err(NostrEventStoreError::Validation(message))
                if message == "stored nostr event blob is missing" =>
            {
                eprintln!(
                    "Ignoring stale indexed event references for author {}: {}",
                    author, message
                );
                Ok(Vec::new())
            }
            Err(err) => Err(err.into()),
        }
    }
837
    /// Fetches new events for `filter` from one relay, preferring negentropy
    /// reconciliation (a dry-run sync to learn which ids the relay has that we
    /// lack, then a targeted fetch of those ids). Relays known to lack
    /// negentropy fall back to a full filter fetch unless `require_negentropy`
    /// is set, in which case they yield nothing.
    async fn fetch_events_from_relay(
        &self,
        client: &Client,
        relay: &str,
        filter: Filter,
        local_items: Vec<(EventId, Timestamp)>,
        supports_negentropy: Option<bool>,
    ) -> Result<RelayFetchResult> {
        // Cached knowledge that this relay does not speak negentropy: skip the
        // reconciliation attempt entirely.
        if supports_negentropy == Some(false) {
            if self.config.require_negentropy {
                return Ok(RelayFetchResult {
                    events_seen: 0,
                    events: Vec::new(),
                    supports_negentropy: false,
                });
            }
            return self
                .fetch_full_filter(client, relay, filter)
                .await
                .map(|events| RelayFetchResult {
                    events_seen: events.len(),
                    events,
                    supports_negentropy: false,
                });
        }

        // Dry-run reconciliation: discover the missing ids without letting the
        // client auto-download, so we can fetch them ourselves in chunks.
        match client
            .reconcile_advanced(
                [relay],
                filter.clone(),
                local_items,
                NegentropyOptions::default().dry_run(),
            )
            .await
        {
            // A non-empty `success` set means the reconciliation completed.
            Ok(output) if !output.success.is_empty() => {
                let missing = output.remote.iter().cloned().collect::<Vec<_>>();
                self.fetch_missing_ids(client, relay, missing).await.map(
                    |RelayFetchResult {
                         events_seen,
                         events,
                         ..
                     }| RelayFetchResult {
                        events_seen,
                        events,
                        supports_negentropy: true,
                    },
                )
            }
            // Reconciliation failed or produced no successes: treat the relay
            // as not supporting negentropy for the rest of the crawl.
            Ok(_) | Err(_) => {
                if self.config.require_negentropy {
                    Ok(RelayFetchResult {
                        events_seen: 0,
                        events: Vec::new(),
                        supports_negentropy: false,
                    })
                } else {
                    self.fetch_full_filter(client, relay, filter)
                        .await
                        .map(|events| RelayFetchResult {
                            events_seen: events.len(),
                            events,
                            supports_negentropy: false,
                        })
                }
            }
        }
    }
906
    /// Fetches the events negentropy reported missing, in fixed-size id
    /// chunks, dropping ephemeral events and deduplicating by id.
    async fn fetch_missing_ids(
        &self,
        client: &Client,
        relay: &str,
        missing_ids: Vec<EventId>,
    ) -> Result<RelayFetchResult> {
        if missing_ids.is_empty() {
            return Ok(RelayFetchResult {
                events_seen: 0,
                events: Vec::new(),
                supports_negentropy: true,
            });
        }

        let mut out = BTreeMap::<String, StoredNostrEvent>::new();
        let mut events_seen = 0usize;
        // Chunked so each REQ stays a reasonable size for the relay.
        for chunk in missing_ids.chunks(NEGENTROPY_FETCH_CHUNK_SIZE) {
            let filter = Filter::new().ids(chunk.iter().cloned());
            let events = client
                .get_events_from([relay], vec![filter], Some(self.config.fetch_timeout))
                .await
                .map_err(|err| CrawlError::Nostr(err.to_string()))?;
            events_seen = events_seen.saturating_add(events.len());
            for event in events {
                if event.kind.is_ephemeral() {
                    continue;
                }
                let stored = stored_event_from_nostr(&event);
                out.insert(stored.id.clone(), stored);
            }
        }
        Ok(RelayFetchResult {
            events_seen,
            events: out.into_values().collect(),
            supports_negentropy: true,
        })
    }
944
945 async fn fetch_full_filter(
946 &self,
947 client: &Client,
948 relay: &str,
949 filter: Filter,
950 ) -> Result<Vec<StoredNostrEvent>> {
951 let mut out = Vec::new();
952 let events = client
953 .get_events_from([relay], vec![filter], Some(self.config.fetch_timeout))
954 .await
955 .map_err(|err| CrawlError::Nostr(err.to_string()))?;
956
957 for event in events {
958 if event.kind.is_ephemeral() {
959 continue;
960 }
961 out.push(stored_event_from_nostr(&event));
962 }
963
964 Ok(out)
965 }
966
    /// Runs per-author selection with the limits taken from the crawl config.
    fn select_author_events(&self, events: Vec<StoredNostrEvent>) -> Result<Vec<StoredNostrEvent>> {
        self.select_author_events_with_limits(
            events,
            self.config.per_author_event_limit,
            self.config.per_author_live_bytes,
        )
    }
974
    /// Selects one author's events: orders by policy priority (desc), then
    /// recency (desc), then id (asc, as a deterministic tiebreak), and applies
    /// the optional per-author byte cap followed by the event-count limit.
    fn select_author_events_with_limits(
        &self,
        mut events: Vec<StoredNostrEvent>,
        event_limit: usize,
        live_byte_limit: Option<u64>,
    ) -> Result<Vec<StoredNostrEvent>> {
        events.sort_by(|left, right| {
            self.policy
                .priority(right)
                .cmp(&self.policy.priority(left))
                .then_with(|| right.created_at.cmp(&left.created_at))
                .then_with(|| left.id.cmp(&right.id))
        });

        if let Some(max_live_bytes) = live_byte_limit {
            let mut selected = Vec::new();
            let mut live_bytes_selected = 0u64;
            for event in events {
                let encoded_len = self.event_store.encode_event(&event)?.len() as u64;
                // `continue` (not `break`): an oversized event is skipped but
                // smaller lower-ranked events may still fill remaining budget.
                if live_bytes_selected.saturating_add(encoded_len) > max_live_bytes {
                    continue;
                }
                live_bytes_selected = live_bytes_selected.saturating_add(encoded_len);
                selected.push(event);
            }
            selected.truncate(event_limit);
            return Ok(selected);
        }

        events.truncate(event_limit);
        Ok(events)
    }
1007
    /// Applies the global live-byte cap to a batch's selected events, starting
    /// from the bytes already consumed by earlier batches. Returns the capped
    /// selection and the new running byte total.
    fn apply_live_byte_cap_from(
        &self,
        mut events: Vec<StoredNostrEvent>,
        live_bytes_selected_so_far: u64,
    ) -> Result<(Vec<StoredNostrEvent>, u64)> {
        // Same ordering as per-author selection: priority, recency, id.
        events.sort_by(|left, right| {
            self.policy
                .priority(right)
                .cmp(&self.policy.priority(left))
                .then_with(|| right.created_at.cmp(&left.created_at))
                .then_with(|| left.id.cmp(&right.id))
        });

        let Some(max_live_bytes) = self.config.max_live_bytes else {
            // No cap: keep everything, just account for the bytes.
            let live_bytes_selected =
                events
                    .iter()
                    .try_fold(live_bytes_selected_so_far, |total, event| {
                        let encoded = self.event_store.encode_event(event)?;
                        Ok::<u64, NostrEventStoreError>(total.saturating_add(encoded.len() as u64))
                    })?;
            return Ok((events, live_bytes_selected));
        };

        let mut selected = Vec::new();
        let mut live_bytes_selected = live_bytes_selected_so_far;
        for event in events {
            let encoded_len = self.event_store.encode_event(&event)?.len() as u64;
            // Skip (not stop at) events that would overflow the budget.
            if live_bytes_selected.saturating_add(encoded_len) > max_live_bytes {
                continue;
            }
            live_bytes_selected = live_bytes_selected.saturating_add(encoded_len);
            selected.push(event);
        }

        Ok((selected, live_bytes_selected))
    }
1045
1046 fn kind_allowed(&self, kind: u32) -> bool {
1047 self.config.kinds.as_ref().is_none_or(|allowed| {
1048 allowed
1049 .iter()
1050 .any(|candidate| u32::from(*candidate) == kind)
1051 })
1052 }
1053}
1054
/// Converts an SDK event into the store's owned representation (hex strings
/// for id/pubkey/sig, plain vectors for tags).
fn stored_event_from_nostr(event: &nostr_sdk::Event) -> StoredNostrEvent {
    StoredNostrEvent {
        id: event.id.to_hex(),
        pubkey: event.pubkey.to_hex(),
        created_at: event.created_at.as_u64(),
        // Kind is widened to u32 to match the stored event's field type.
        kind: event.kind.as_u16() as u32,
        tags: event
            .tags
            .iter()
            .map(|tag| tag.as_slice().to_vec())
            .collect(),
        content: event.content.clone(),
        sig: event.sig.to_string(),
    }
}
1070
/// Returns true for a 64-character lowercase hex string. Uppercase hex is
/// deliberately rejected so pubkeys stay in canonical lowercase form (the
/// tests below assert this).
fn is_valid_hex_pubkey(value: &str) -> bool {
    value.len() == 64
        && value
            .bytes()
            .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
}
1077
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use hashtree_core::MemoryStore;
    use nostr_social_graph::{NostrEvent, SocialGraphBackend as NostrSocialGraphBackend};

    use super::{CrawlConfig, NostrBridge, StoredNostrEvent};

    /// Minimal social graph stub: a single root (all-zero pubkey) following a
    /// mix of valid and deliberately invalid pubkeys.
    #[derive(Default)]
    struct FakeGraphBackend;

    impl NostrSocialGraphBackend for FakeGraphBackend {
        type Error = std::io::Error;

        fn get_root(&self) -> std::result::Result<String, Self::Error> {
            Ok("0".repeat(64))
        }

        fn set_root(&mut self, _root: &str) -> std::result::Result<(), Self::Error> {
            Ok(())
        }

        fn handle_event(
            &mut self,
            _event: &NostrEvent,
            _allow_unknown_authors: bool,
            _overmute_threshold: f64,
        ) -> std::result::Result<(), Self::Error> {
            Ok(())
        }

        fn get_follow_distance(&self, _user: &str) -> std::result::Result<u32, Self::Error> {
            Ok(0)
        }

        fn is_following(
            &self,
            _follower: &str,
            _followed_user: &str,
        ) -> std::result::Result<bool, Self::Error> {
            Ok(false)
        }

        fn get_followed_by_user(
            &self,
            user: &str,
        ) -> std::result::Result<Vec<String>, Self::Error> {
            // The root follows one valid pubkey plus three invalid shapes
            // (non-hex, too short, uppercase) that collect_authors must skip.
            if user == "0".repeat(64) {
                return Ok(vec![
                    "1".repeat(64),
                    "NOT-HEX".to_string(),
                    "a".repeat(63),
                    "A".repeat(64),
                ]);
            }
            Ok(Vec::new())
        }

        fn get_followers_by_user(
            &self,
            _user: &str,
        ) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_muted_by_user(&self, _user: &str) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_user_muted_by(&self, _user: &str) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_follow_list_created_at(
            &self,
            _user: &str,
        ) -> std::result::Result<Option<u64>, Self::Error> {
            Ok(None)
        }

        fn get_mute_list_created_at(
            &self,
            _user: &str,
        ) -> std::result::Result<Option<u64>, Self::Error> {
            Ok(None)
        }

        fn is_overmuted(
            &self,
            _user: &str,
            _threshold: f64,
        ) -> std::result::Result<bool, Self::Error> {
            Ok(false)
        }
    }

    /// An event whose pubkey is not hex must fail store encoding and thus be
    /// rejected by the bridge's validity check.
    #[test]
    fn rejects_invalid_stored_event_shape() {
        let bridge = NostrBridge::new(Arc::new(MemoryStore::new()), CrawlConfig::default());
        let invalid = StoredNostrEvent {
            id: "f".repeat(64),
            pubkey: "not-hex".to_string(),
            created_at: 1,
            kind: 1,
            tags: Vec::new(),
            content: String::new(),
            sig: "f".repeat(128),
        };

        assert!(!bridge.is_valid_stored_event(&invalid));
    }

    /// Graph traversal keeps only canonical lowercase 64-char hex pubkeys.
    #[test]
    fn collect_authors_skips_invalid_graph_pubkeys() {
        let bridge = NostrBridge::new(Arc::new(MemoryStore::new()), CrawlConfig::default());
        let authors = bridge
            .collect_authors(&FakeGraphBackend)
            .expect("collect authors");

        assert_eq!(authors, vec!["0".repeat(64), "1".repeat(64)]);
    }

    /// Allowlist bypasses the graph, deduplicates while preserving order,
    /// drops invalid entries, and honors `max_authors`.
    #[test]
    fn collect_authors_prefers_allowlist_and_applies_limits() {
        let bridge = NostrBridge::new(
            Arc::new(MemoryStore::new()),
            CrawlConfig {
                author_allowlist: Some(vec![
                    "1".repeat(64),
                    "NOT-HEX".to_string(),
                    "0".repeat(64),
                    "1".repeat(64),
                ]),
                max_authors: Some(1),
                ..CrawlConfig::default()
            },
        );
        let authors = bridge
            .collect_authors(&FakeGraphBackend)
            .expect("collect authors");

        assert_eq!(authors, vec!["1".repeat(64)]);
    }
}