Skip to main content

hashtree_nostr_bridge/
lib.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
2use std::sync::Arc;
3use std::time::Duration;
4
5use hashtree_core::{Cid, Store};
6use hashtree_nostr::{ListEventsOptions, NostrEventStore, NostrEventStoreError, StoredNostrEvent};
7use nostr_sdk::{
8    pool::RelayLimits, Client, EventId, Filter, Keys, Kind, NegentropyOptions, Options, PublicKey,
9    Timestamp,
10};
11use nostr_social_graph::SocialGraphBackend;
12
/// Batch size used when fetching event ids discovered via negentropy reconciliation.
// NOTE(review): not referenced anywhere in this chunk of the file; presumably
// consumed by `fetch_missing_ids` further down — confirm it is still used.
const NEGENTROPY_FETCH_CHUNK_SIZE: usize = 256;
/// Strategy used to pull events from relays during a crawl.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelayFetchMode {
    /// Query relays per batch of authors (see `crawl_author_batches`).
    AuthorBatches,
    /// Page backwards through each relay's most recent events and keep only
    /// those from crawled authors (see `crawl_global_recent_incremental`).
    GlobalRecent,
}
19
/// Tunables controlling a crawl: relay endpoints, author discovery limits,
/// per-author retention budgets, and relay paging behaviour.
#[derive(Debug, Clone)]
pub struct CrawlConfig {
    /// Relay URLs to connect to; must be non-empty (see `validate_config`).
    pub relays: Vec<String>,
    /// When set, crawl exactly these hex pubkeys instead of walking the social graph.
    pub author_allowlist: Option<Vec<String>>,
    /// Global cap on total encoded bytes of selected events, if any.
    pub max_live_bytes: Option<u64>,
    /// Stop crawling once this many events have been observed from relays.
    pub max_events_seen: Option<usize>,
    /// Cap on the number of authors considered.
    pub max_authors: Option<usize>,
    /// Maximum follow-graph distance from the root author to expand (BFS depth).
    pub max_follow_distance: Option<u32>,
    /// Number of authors fetched per relay query in `AuthorBatches` mode; must be > 0.
    pub author_batch_size: usize,
    /// Maximum events retained per author; must be > 0.
    pub per_author_event_limit: usize,
    /// Optional per-author encoded-byte budget; must be > 0 when set.
    pub per_author_live_bytes: Option<u64>,
    /// Timeout applied to relay fetches.
    pub fetch_timeout: Duration,
    /// Event kinds to request; `None` means all kinds.
    pub kinds: Option<Vec<u16>>,
    /// How events are pulled from relays.
    pub relay_fetch_mode: RelayFetchMode,
    /// If true, skip relays that do not support negentropy instead of
    /// falling back to a full filter fetch.
    pub require_negentropy: bool,
    /// Optional per-event size limit enforced at the relay client; must be > 0 when set.
    pub relay_event_max_size: Option<u32>,
    /// Page size for `GlobalRecent` paging; must be > 0.
    pub relay_page_size: usize,
    /// Maximum pages fetched per relay in `GlobalRecent` mode; must be > 0.
    pub max_relay_pages: usize,
}
39
40impl Default for CrawlConfig {
41    fn default() -> Self {
42        Self {
43            relays: Vec::new(),
44            author_allowlist: None,
45            max_live_bytes: None,
46            max_events_seen: None,
47            max_authors: None,
48            max_follow_distance: Some(1),
49            author_batch_size: 64,
50            per_author_event_limit: 256,
51            per_author_live_bytes: None,
52            fetch_timeout: Duration::from_secs(10),
53            kinds: None,
54            relay_fetch_mode: RelayFetchMode::AuthorBatches,
55            require_negentropy: false,
56            relay_event_max_size: None,
57            relay_page_size: 1_000,
58            max_relay_pages: 10,
59        }
60    }
61}
62
/// Summary of a crawl (also emitted incrementally via progress callbacks).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CrawlReport {
    /// Root CID of the event tree after this crawl, if any events were stored.
    pub root: Option<Cid>,
    /// Total authors discovered (allowlist or graph walk).
    pub authors_considered: usize,
    /// Authors whose events were actually processed so far.
    pub authors_processed: usize,
    /// Raw count of events observed from relays (pre-filtering).
    pub events_seen: usize,
    /// Events retained after per-author selection and byte caps.
    pub events_selected: usize,
    /// Total encoded size of selected events, in bytes.
    pub live_bytes_selected: u64,
}
72
/// Ranks events for retention when per-author limits force a choice.
// NOTE(review): the consumer (`select_author_events`) is not visible in this
// chunk — confirm that higher priority means "retained first".
pub trait EventSelectionPolicy: Send + Sync {
    /// Returns the retention priority for `event`.
    fn priority(&self, event: &StoredNostrEvent) -> i32;
}
76
/// Selection policy that assigns a priority per event kind, with a fallback
/// for kinds without an explicit entry.
// NOTE(review): keys here are u32 while `CrawlConfig::kinds` is Vec<u16> —
// confirm the intended kind width is consistent across the crate.
#[derive(Debug, Clone)]
pub struct KindPriorityPolicy {
    // Priority used for any kind not present in `priorities`.
    default_priority: i32,
    // Explicit per-kind priorities; higher values win.
    priorities: BTreeMap<u32, i32>,
}
82
83impl Default for KindPriorityPolicy {
84    fn default() -> Self {
85        let mut priorities = BTreeMap::new();
86        priorities.insert(1, 1_000);
87        priorities.insert(0, 900);
88        priorities.insert(3, 800);
89        priorities.insert(10_000, 750);
90        priorities.insert(6, 600);
91        priorities.insert(7, 500);
92        Self {
93            default_priority: 100,
94            priorities,
95        }
96    }
97}
98
impl KindPriorityPolicy {
    /// Builder-style override: sets (or replaces) the priority for `kind`
    /// and returns the updated policy.
    pub fn with_priority(mut self, kind: u32, priority: i32) -> Self {
        self.priorities.insert(kind, priority);
        self
    }
}
105
106impl EventSelectionPolicy for KindPriorityPolicy {
107    fn priority(&self, event: &StoredNostrEvent) -> i32 {
108        self.priorities
109            .get(&event.kind)
110            .copied()
111            .unwrap_or(self.default_priority)
112    }
113}
114
/// Errors produced by the crawl pipeline. Config-validation variants mirror
/// the checks in `validate_config`; relay and graph failures are carried as
/// stringified messages since the underlying error types are not exposed.
#[derive(Debug, thiserror::Error)]
pub enum CrawlError {
    /// Propagated from the underlying hashtree-backed event store.
    #[error("event store error: {0}")]
    EventStore(#[from] NostrEventStoreError),
    #[error("crawl requires at least one relay")]
    MissingRelays,
    #[error("per-author event limit must be greater than zero")]
    InvalidPerAuthorLimit,
    #[error("per-author live byte cap must be greater than zero")]
    InvalidPerAuthorLiveBytes,
    #[error("author batch size must be greater than zero")]
    InvalidAuthorBatchSize,
    #[error("relay page size must be greater than zero")]
    InvalidRelayPageSize,
    #[error("max relay pages must be greater than zero")]
    InvalidMaxRelayPages,
    #[error("max events seen must be greater than zero")]
    InvalidMaxEventsSeen,
    #[error("relay event max size must be greater than zero")]
    InvalidRelayEventMaxSize,
    /// Stringified nostr-sdk client error (relay add / fetch failures).
    #[error("nostr error: {0}")]
    Nostr(String),
    /// Stringified error from the social-graph backend.
    #[error("social graph error: {0}")]
    SocialGraph(String),
}
140
/// Crate-local result alias used throughout the bridge.
pub type Result<T> = std::result::Result<T, CrawlError>;
142
/// Outcome of fetching one filter from one relay.
#[derive(Debug, Default)]
struct RelayFetchResult {
    // Raw number of events returned by the relay (pre-dedup).
    events_seen: usize,
    // Events fetched from the relay.
    events: Vec<StoredNostrEvent>,
    // Whether the relay successfully completed negentropy reconciliation,
    // cached per-relay by the caller to skip future attempts.
    supports_negentropy: bool,
}
149
/// Result of crawling one author batch in `AuthorBatches` mode.
#[derive(Debug, Default)]
struct BatchCrawlReport {
    // Events observed from relays for this batch.
    events_seen: usize,
    // Newly selected events to merge into the tree.
    events: Vec<StoredNostrEvent>,
    // Cumulative selected bytes INCLUDING prior batches (the byte cap is
    // applied against the running total, not per batch).
    live_bytes_selected: u64,
}
156
/// Mutable state threaded through a `GlobalRecent` crawl, seeded from an
/// existing root so re-crawls are incremental.
#[derive(Debug, Default)]
struct GlobalRecentState {
    // Root CID of the event tree as it evolves during the crawl.
    current_root: Option<Cid>,
    // Per-author retained events (post selection/validation).
    retained_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
    // Running count of selected events across all authors.
    events_selected: usize,
    // Running encoded size of selected events, in bytes.
    live_bytes_selected: u64,
}
164
/// Crawls nostr relays and persists selected events into a hashtree-backed
/// event store, guided by a social graph and a selection policy.
pub struct NostrBridge<S: Store> {
    // Hashtree-backed store used for encoding, listing and building the tree.
    event_store: NostrEventStore<S>,
    // Crawl tunables; validated at crawl time.
    config: CrawlConfig,
    // Pluggable retention-priority policy (defaults to `KindPriorityPolicy`).
    policy: Arc<dyn EventSelectionPolicy>,
}
170
171impl<S: Store> NostrBridge<S> {
    /// Creates a bridge over `store` with the given crawl configuration and
    /// the default kind-based selection policy.
    pub fn new(store: Arc<S>, config: CrawlConfig) -> Self {
        Self {
            event_store: NostrEventStore::new(store),
            config,
            policy: Arc::new(KindPriorityPolicy::default()),
        }
    }
179
    /// Builder-style override of the event selection policy.
    pub fn with_policy(mut self, policy: Arc<dyn EventSelectionPolicy>) -> Self {
        self.policy = policy;
        self
    }
184
    /// Convenience wrapper around [`Self::crawl_with_progress`] that discards
    /// progress callbacks.
    pub async fn crawl<G: SocialGraphBackend>(
        &self,
        graph: &G,
        existing_root: Option<&Cid>,
    ) -> Result<CrawlReport> {
        self.crawl_with_progress(graph, existing_root, |_| {}).await
    }
192
    /// Runs a crawl, invoking `on_progress` with intermediate reports.
    ///
    /// Validates the config, expands the author set from `graph` (or the
    /// configured allowlist), connects to the configured relays, then
    /// dispatches on `relay_fetch_mode`:
    /// * `AuthorBatches` — per-author-batch sync via `crawl_author_batches`.
    /// * `GlobalRecent` — resumes retained state from `existing_root`, then
    ///   pages through relays' recent events incrementally. In this path
    ///   `on_progress` is additionally called once with the final report.
    ///
    /// Returns an empty default report when no authors are found.
    pub async fn crawl_with_progress<G, F>(
        &self,
        graph: &G,
        existing_root: Option<&Cid>,
        mut on_progress: F,
    ) -> Result<CrawlReport>
    where
        G: SocialGraphBackend,
        F: FnMut(&CrawlReport),
    {
        self.validate_config()?;

        let authors = self.collect_authors(graph)?;
        if authors.is_empty() {
            return Ok(CrawlReport::default());
        }

        let client = self.connect_client().await?;

        if self.config.relay_fetch_mode == RelayFetchMode::AuthorBatches {
            return self
                .crawl_author_batches(&client, &authors, existing_root, &mut on_progress)
                .await;
        }

        // GlobalRecent: rebuild per-author retained state from the existing
        // root so re-crawls are incremental rather than starting from zero.
        let state = self
            .load_existing_global_state(existing_root, &authors)
            .await?;

        let report = self
            .crawl_global_recent_incremental(&client, &authors, state, &mut on_progress)
            .await?;
        on_progress(&report);
        Ok(report)
    }
228
    /// `AuthorBatches` crawl loop: processes authors in chunks of
    /// `author_batch_size`, merging each batch's selected events into the
    /// tree and reporting progress after every batch.
    ///
    /// Relay negentropy support and relay failures are cached across batches
    /// so failing relays are only attempted once per crawl. Stops early once
    /// `max_events_seen` is reached.
    async fn crawl_author_batches(
        &self,
        client: &Client,
        authors: &[String],
        existing_root: Option<&Cid>,
        on_progress: &mut impl FnMut(&CrawlReport),
    ) -> Result<CrawlReport> {
        // Per-relay caches shared across batches.
        let mut relay_negentropy_support = BTreeMap::<String, bool>::new();
        let mut failed_relays = BTreeSet::<String>::new();
        let mut current_root = existing_root.cloned();
        let mut events_seen = 0usize;
        let mut events_selected = 0usize;
        let mut live_bytes_selected = 0u64;
        let mut authors_processed = 0usize;

        for author_batch in authors.chunks(self.config.author_batch_size) {
            let batch = self
                .crawl_author_batch(
                    client,
                    author_batch,
                    current_root.as_ref(),
                    &mut relay_negentropy_support,
                    &mut failed_relays,
                    live_bytes_selected,
                )
                .await?;
            events_seen = events_seen.saturating_add(batch.events_seen);
            events_selected = events_selected.saturating_add(batch.events.len());
            // The batch reports the cumulative byte total (the cap is applied
            // against the running sum), so assign rather than add.
            live_bytes_selected = batch.live_bytes_selected;
            authors_processed = authors_processed.saturating_add(author_batch.len());
            if !batch.events.is_empty() {
                current_root = self
                    .event_store
                    .build(current_root.as_ref(), batch.events)
                    .await?;
            }
            on_progress(&CrawlReport {
                root: current_root.clone(),
                authors_considered: authors.len(),
                authors_processed,
                events_seen,
                events_selected,
                live_bytes_selected,
            });
            if self.reached_events_seen_limit(events_seen) {
                break;
            }
        }

        Ok(CrawlReport {
            root: current_root,
            authors_considered: authors.len(),
            authors_processed,
            events_seen,
            events_selected,
            live_bytes_selected,
        })
    }
287
    /// Seeds `GlobalRecentState` from an existing tree root.
    ///
    /// Lists all events under `root`, keeps only those from crawled authors
    /// with allowed kinds that still encode cleanly, re-runs per-author
    /// selection on them, and totals the selected counts/bytes.
    ///
    /// If the root's event blobs are missing (the store reports the specific
    /// "stored nostr event blob is missing" validation error), falls back to
    /// per-author listing and rebuilds a fresh root from whatever survives.
    async fn load_existing_global_state(
        &self,
        root: Option<&Cid>,
        authors: &[String],
    ) -> Result<GlobalRecentState> {
        let Some(root) = root else {
            return Ok(GlobalRecentState::default());
        };

        match self
            .event_store
            .list_recent(Some(root), ListEventsOptions::default())
            .await
        {
            Ok(events) => {
                let author_set = authors.iter().map(String::as_str).collect::<BTreeSet<_>>();
                let mut retained_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
                for event in events {
                    if !author_set.contains(event.pubkey.as_str()) || !self.kind_allowed(event.kind)
                    {
                        continue;
                    }
                    // Drop events that no longer encode (would fail on rebuild).
                    if !self.is_valid_stored_event(&event) {
                        continue;
                    }
                    retained_by_author
                        .entry(event.pubkey.clone())
                        .or_default()
                        .push(event);
                }

                let mut state = GlobalRecentState {
                    current_root: Some(root.clone()),
                    ..GlobalRecentState::default()
                };
                // Re-apply selection per author so the resumed state honours
                // current limits/policy, then total counts and bytes.
                for (author, events) in retained_by_author {
                    let selected = self.select_author_events(events)?;
                    state.events_selected = state.events_selected.saturating_add(selected.len());
                    state.live_bytes_selected = state
                        .live_bytes_selected
                        .saturating_add(self.encoded_events_size(&selected)?);
                    state.retained_by_author.insert(author, selected);
                }
                Ok(state)
            }
            Err(NostrEventStoreError::Validation(message))
                if message == "stored nostr event blob is missing" =>
            {
                eprintln!(
                    "Falling back to per-author resume for existing root due to missing event blobs"
                );
                let mut state = self
                    .load_existing_global_state_by_author(Some(root), authors)
                    .await?;
                // The old root references missing blobs, so rebuild a clean
                // root from only the events we could still load.
                state.current_root = self.rebuild_root_from_retained_state(&state).await?;
                Ok(state)
            }
            Err(err) => Err(err.into()),
        }
    }
348
    /// Fallback resume path: loads each author's retained events individually
    /// (tolerating missing blobs per author via `load_retained_events`),
    /// filters by kind and encodability, re-selects, and totals counts/bytes.
    async fn load_existing_global_state_by_author(
        &self,
        root: Option<&Cid>,
        authors: &[String],
    ) -> Result<GlobalRecentState> {
        let mut state = GlobalRecentState {
            current_root: root.cloned(),
            ..GlobalRecentState::default()
        };
        for author in authors {
            let retained = self
                .load_retained_events(root, author)
                .await?
                .into_iter()
                .filter(|event| self.kind_allowed(event.kind))
                .filter(|event| self.is_valid_stored_event(event))
                .collect::<Vec<_>>();
            let retained = self.select_author_events(retained)?;
            state.events_selected = state.events_selected.saturating_add(retained.len());
            state.live_bytes_selected = state
                .live_bytes_selected
                .saturating_add(self.encoded_events_size(&retained)?);
            state.retained_by_author.insert(author.clone(), retained);
        }
        Ok(state)
    }
375
376    async fn rebuild_root_from_retained_state(
377        &self,
378        state: &GlobalRecentState,
379    ) -> Result<Option<Cid>> {
380        let events = state
381            .retained_by_author
382            .values()
383            .flat_map(|events| events.iter().cloned())
384            .collect::<Vec<_>>();
385        self.event_store
386            .build(None, events)
387            .await
388            .map_err(Into::into)
389    }
390
391    fn validate_config(&self) -> Result<()> {
392        if self.config.relays.is_empty() {
393            return Err(CrawlError::MissingRelays);
394        }
395        if self.config.per_author_event_limit == 0 {
396            return Err(CrawlError::InvalidPerAuthorLimit);
397        }
398        if self.config.per_author_live_bytes == Some(0) {
399            return Err(CrawlError::InvalidPerAuthorLiveBytes);
400        }
401        if self.config.author_batch_size == 0 {
402            return Err(CrawlError::InvalidAuthorBatchSize);
403        }
404        if self.config.relay_page_size == 0 {
405            return Err(CrawlError::InvalidRelayPageSize);
406        }
407        if self.config.max_relay_pages == 0 {
408            return Err(CrawlError::InvalidMaxRelayPages);
409        }
410        if self.config.max_events_seen == Some(0) {
411            return Err(CrawlError::InvalidMaxEventsSeen);
412        }
413        if self.config.relay_event_max_size == Some(0) {
414            return Err(CrawlError::InvalidRelayEventMaxSize);
415        }
416        Ok(())
417    }
418
    /// Produces the ordered author list for the crawl.
    ///
    /// With an allowlist configured: dedupes it (preserving order), drops
    /// invalid hex pubkeys, and truncates to `max_authors`.
    ///
    /// Otherwise: breadth-first walk of the social graph from its root,
    /// bounded by `max_follow_distance` (expansion depth) and `max_authors`
    /// (total count). Follows are sorted before enqueueing so traversal
    /// order is deterministic.
    fn collect_authors<G: SocialGraphBackend>(&self, graph: &G) -> Result<Vec<String>> {
        if let Some(author_allowlist) = &self.config.author_allowlist {
            let mut seen = HashSet::new();
            let mut authors = Vec::new();
            for author in author_allowlist {
                if !is_valid_hex_pubkey(author) {
                    continue;
                }
                if seen.insert(author.clone()) {
                    authors.push(author.clone());
                }
            }
            if let Some(max_authors) = self.config.max_authors {
                authors.truncate(max_authors);
            }
            return Ok(authors);
        }

        let root = graph
            .get_root()
            .map_err(|err| CrawlError::SocialGraph(err.to_string()))?;
        let mut visited = BTreeSet::new();
        let mut authors = Vec::new();
        let mut queue = VecDeque::from([(root.clone(), 0u32)]);
        visited.insert(root);

        while let Some((author, distance)) = queue.pop_front() {
            // An invalid pubkey is skipped entirely: not collected and, since
            // we `continue` before expansion, its follows are not explored.
            if !is_valid_hex_pubkey(&author) {
                continue;
            }
            authors.push(author.clone());
            if self
                .config
                .max_authors
                .is_some_and(|max_authors| authors.len() >= max_authors)
            {
                break;
            }
            // At the distance cap: keep the author but do not expand follows.
            if self
                .config
                .max_follow_distance
                .is_some_and(|max_distance| distance >= max_distance)
            {
                continue;
            }

            let mut follows = graph
                .get_followed_by_user(&author)
                .map_err(|err| CrawlError::SocialGraph(err.to_string()))?;
            follows.retain(|followed| is_valid_hex_pubkey(followed));
            follows.sort();
            for followed in follows {
                if visited.insert(followed.clone()) {
                    queue.push_back((followed, distance.saturating_add(1)));
                }
            }
        }

        Ok(authors)
    }
479
    /// Builds a nostr client with throwaway keys (the crawler never signs),
    /// applies the optional per-event size limit, adds all configured relays,
    /// and connects.
    async fn connect_client(&self) -> Result<Client> {
        let client = if let Some(max_size) = self.config.relay_event_max_size {
            let mut limits = RelayLimits::default();
            limits.events.max_size = Some(max_size);
            Client::with_opts(Keys::generate(), Options::new().relay_limits(limits))
        } else {
            Client::new(Keys::generate())
        };
        for relay in &self.config.relays {
            client
                .add_relay(relay)
                .await
                .map_err(|err| CrawlError::Nostr(err.to_string()))?;
        }
        client.connect().await;
        // `connect()` returns before the websockets are established; give the
        // relays a moment so the first fetch does not race the handshake.
        tokio::time::sleep(Duration::from_millis(250)).await;
        Ok(client)
    }
498
    /// Syncs one batch of authors against all (non-failed) relays.
    ///
    /// Steps:
    /// 1. Load each author's already-retained events from the current root
    ///    (kind-filtered, encodability-checked) as the dedup baseline.
    /// 2. Fetch new events per relay (negentropy-assisted where supported,
    ///    tracked in `relay_negentropy_support`); relays that error are added
    ///    to `failed_relays` and skipped for the rest of the crawl.
    /// 3. Merge existing + fetched events per author, re-run selection,
    ///    drop non-encodable events, and apply the global live-byte cap
    ///    against the running total `live_bytes_selected_so_far`.
    ///
    /// Returns the batch's seen count, the selected events, and the NEW
    /// cumulative byte total.
    async fn crawl_author_batch(
        &self,
        client: &Client,
        author_batch: &[String],
        current_root: Option<&Cid>,
        relay_negentropy_support: &mut BTreeMap<String, bool>,
        failed_relays: &mut BTreeSet<String>,
        live_bytes_selected_so_far: u64,
    ) -> Result<BatchCrawlReport> {
        // `known` maps event id -> event for everything we already hold; it
        // doubles as the dedup set for relay responses.
        let mut existing_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
        let mut known = BTreeMap::<String, StoredNostrEvent>::new();
        for author in author_batch {
            let retained = self
                .load_retained_events(current_root, author)
                .await?
                .into_iter()
                .filter(|event| self.kind_allowed(event.kind))
                .filter(|event| self.is_valid_stored_event(event))
                .collect::<Vec<_>>();
            for event in &retained {
                known.insert(event.id.clone(), event.clone());
            }
            existing_by_author.insert(author.clone(), retained);
        }

        // Unparseable pubkeys are silently dropped; an all-invalid batch is a
        // no-op that preserves the running byte total.
        let pubkeys: Vec<PublicKey> = author_batch
            .iter()
            .filter_map(|author| author.parse::<PublicKey>().ok())
            .collect();
        if pubkeys.is_empty() {
            return Ok(BatchCrawlReport {
                events_seen: 0,
                events: Vec::new(),
                live_bytes_selected: live_bytes_selected_so_far,
            });
        }

        let filter = self.batch_filter(pubkeys);
        let mut fetched = BTreeMap::<String, StoredNostrEvent>::new();
        let mut events_seen = 0usize;

        for relay in &self.config.relays {
            if failed_relays.contains(relay) {
                continue;
            }
            // Local (id, timestamp) pairs seed negentropy reconciliation so
            // the relay only sends what we are missing.
            let local_items = self.local_items_for_batch(known.values(), author_batch);
            let relay_support = relay_negentropy_support.get(relay).copied();
            let fetched_from_relay = match self
                .fetch_events_from_relay(client, relay, filter.clone(), local_items, relay_support)
                .await
            {
                Ok(result) => result,
                Err(err) => {
                    eprintln!("Skipping relay {relay}: {err}");
                    failed_relays.insert(relay.clone());
                    continue;
                }
            };
            relay_negentropy_support.insert(relay.clone(), fetched_from_relay.supports_negentropy);
            events_seen = events_seen.saturating_add(fetched_from_relay.events_seen);
            for event in fetched_from_relay.events {
                // Only genuinely-new, kind-allowed events enter `fetched`.
                if self.kind_allowed(event.kind)
                    && known.insert(event.id.clone(), event.clone()).is_none()
                {
                    fetched.insert(event.id.clone(), event);
                }
            }
        }

        let mut fetched_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
        for event in fetched.into_values() {
            fetched_by_author
                .entry(event.pubkey.clone())
                .or_default()
                .push(event);
        }

        // Re-run selection over the union of existing + fetched events per
        // author so limits/policy apply to the merged set, not just new data.
        let mut selected = Vec::new();
        for author in author_batch {
            let mut merged: BTreeMap<String, StoredNostrEvent> = BTreeMap::new();
            if let Some(existing_events) = existing_by_author.remove(author) {
                for event in existing_events {
                    merged.insert(event.id.clone(), event);
                }
            }
            if let Some(events) = fetched_by_author.remove(author) {
                for event in events {
                    merged.insert(event.id.clone(), event);
                }
            }
            selected.extend(self.select_author_events(merged.into_values().collect())?);
        }

        let selected = selected
            .into_iter()
            .filter(|event| self.is_valid_stored_event(event))
            .collect::<Vec<_>>();
        let (selected, live_bytes_selected) =
            self.apply_live_byte_cap_from(selected, live_bytes_selected_so_far)?;

        Ok(BatchCrawlReport {
            events_seen,
            events: selected,
            live_bytes_selected,
        })
    }
605
    /// `GlobalRecent` crawl loop: for each relay, pages backwards in time
    /// (up to `max_relay_pages` pages of `relay_page_size`) through recent
    /// events, keeping only events from crawled authors with allowed kinds.
    ///
    /// Each kept event is merged into that author's retained set, selection
    /// is re-run, running totals are adjusted by the delta, and any newly
    /// retained events are applied to the tree incrementally at the end of
    /// each page. Progress is reported per page. Paging stops when a page is
    /// empty, the `until` cursor stops advancing, or `max_events_seen` is hit.
    async fn crawl_global_recent_incremental(
        &self,
        client: &Client,
        authors: &[String],
        mut state: GlobalRecentState,
        on_progress: &mut impl FnMut(&CrawlReport),
    ) -> Result<CrawlReport> {
        // Shadowed as a set for O(log n) membership checks; `collect_authors`
        // dedupes, so len() still equals the original author count.
        let authors = authors.iter().map(String::as_str).collect::<BTreeSet<_>>();
        // Ids already retained anywhere — prevents re-applying an event the
        // tree already contains.
        let mut known_ids = state
            .retained_by_author
            .values()
            .flat_map(|events| events.iter().map(|event| event.id.clone()))
            .collect::<BTreeSet<_>>();
        let mut authors_processed = state
            .retained_by_author
            .values()
            .filter(|events| !events.is_empty())
            .count();
        let mut failed_relays = BTreeSet::<String>::new();
        let mut events_seen = 0usize;

        for relay in &self.config.relays {
            if failed_relays.contains(relay) {
                continue;
            }
            // `until` is the paging cursor: each page requests events strictly
            // older than the previous page's oldest event.
            let mut until = None;
            for _ in 0..self.config.max_relay_pages {
                let filter = self.global_recent_filter(until);
                let events = match client
                    .get_events_from([relay], vec![filter], Some(self.config.fetch_timeout))
                    .await
                {
                    Ok(events) => events,
                    Err(err) => {
                        eprintln!("Skipping relay {relay}: {}", err);
                        failed_relays.insert(relay.clone());
                        break;
                    }
                };
                let fetched_count = events.len();
                events_seen = events_seen.saturating_add(fetched_count);
                if fetched_count == 0 {
                    break;
                }

                let mut min_created_at = u64::MAX;
                let mut pending_apply = Vec::new();
                for event in events {
                    // Track the oldest timestamp on the page for the cursor,
                    // including events we end up skipping.
                    min_created_at = min_created_at.min(event.created_at.as_u64());
                    if event.kind.is_ephemeral() {
                        continue;
                    }

                    let stored = stored_event_from_nostr(&event);
                    if !authors.contains(stored.pubkey.as_str()) || !self.kind_allowed(stored.kind)
                    {
                        continue;
                    }

                    // Merge the new event into this author's retained set and
                    // re-run selection over the union.
                    let retained = state
                        .retained_by_author
                        .entry(stored.pubkey.clone())
                        .or_default();
                    let was_empty = retained.is_empty();
                    let old_len = retained.len();
                    let old_live_bytes = self.encoded_events_size(retained)?;
                    let mut merged = BTreeMap::<String, StoredNostrEvent>::new();
                    for existing in retained.drain(..) {
                        merged.insert(existing.id.clone(), existing);
                    }
                    merged.insert(stored.id.clone(), stored);

                    let selected = self.select_author_events(merged.into_values().collect())?;
                    let selected_live_bytes = self.encoded_events_size(&selected)?;
                    // Only events never applied before go to the tree.
                    let mut newly_retained = Vec::new();
                    for selected_event in &selected {
                        if !known_ids.contains(&selected_event.id) {
                            known_ids.insert(selected_event.id.clone());
                            newly_retained.push(selected_event.clone());
                        }
                    }

                    // Adjust running totals by the old-vs-new delta for this
                    // author (selection may have evicted older events).
                    state.events_selected = state
                        .events_selected
                        .saturating_sub(old_len)
                        .saturating_add(selected.len());
                    state.live_bytes_selected = state
                        .live_bytes_selected
                        .saturating_sub(old_live_bytes)
                        .saturating_add(selected_live_bytes);
                    if was_empty && !selected.is_empty() {
                        authors_processed = authors_processed.saturating_add(1);
                    }
                    *retained = selected;
                    pending_apply.extend(newly_retained);
                }

                // Apply the whole page's new events to the tree in one build.
                if !pending_apply.is_empty() {
                    state.current_root = self
                        .event_store
                        .build(state.current_root.as_ref(), pending_apply)
                        .await?;
                }

                on_progress(&CrawlReport {
                    root: state.current_root.clone(),
                    authors_considered: authors.len(),
                    authors_processed,
                    events_seen,
                    events_selected: state.events_selected,
                    live_bytes_selected: state.live_bytes_selected,
                });

                // Advance the cursor; bail if it cannot move (avoids looping
                // forever on a relay that keeps returning the same page).
                if min_created_at == u64::MAX || min_created_at == 0 {
                    break;
                }
                let next_until = min_created_at.saturating_sub(1);
                if until == Some(next_until) {
                    break;
                }
                until = Some(next_until);
                if self.reached_events_seen_limit(events_seen) {
                    break;
                }
            }
            if self.reached_events_seen_limit(events_seen) {
                break;
            }
        }

        Ok(CrawlReport {
            root: state.current_root,
            authors_considered: authors.len(),
            authors_processed,
            events_seen,
            events_selected: state.events_selected,
            live_bytes_selected: state.live_bytes_selected,
        })
    }
745
746    fn batch_filter(&self, pubkeys: Vec<PublicKey>) -> Filter {
747        let mut filter = Filter::new().authors(pubkeys);
748        if let Some(kinds) = &self.config.kinds {
749            filter = filter.kinds(kinds.iter().copied().map(Kind::from));
750        }
751        let relay_limit = self
752            .config
753            .author_batch_size
754            .saturating_mul(self.config.per_author_event_limit);
755        if relay_limit > 0 {
756            filter = filter.limit(relay_limit);
757        }
758        filter
759    }
760
761    fn global_recent_filter(&self, until: Option<u64>) -> Filter {
762        let mut filter = Filter::new().limit(self.config.relay_page_size);
763        if let Some(kinds) = &self.config.kinds {
764            filter = filter.kinds(kinds.iter().copied().map(Kind::from));
765        }
766        if let Some(until) = until {
767            filter = filter.until(Timestamp::from_secs(until));
768        }
769        filter
770    }
771
772    fn reached_events_seen_limit(&self, events_seen: usize) -> bool {
773        self.config
774            .max_events_seen
775            .is_some_and(|limit| events_seen >= limit)
776    }
777
    /// An event is considered storable iff the store can encode it; used to
    /// drop events up front that would otherwise fail during tree building.
    fn is_valid_stored_event(&self, event: &StoredNostrEvent) -> bool {
        self.event_store.encode_event(event).is_ok()
    }
781
782    fn encoded_events_size(&self, events: &[StoredNostrEvent]) -> Result<u64> {
783        let mut total = 0u64;
784        for event in events {
785            total = total.saturating_add(self.event_store.encode_event(event)?.len() as u64);
786        }
787        Ok(total)
788    }
789
790    fn local_items_for_batch<'a, I>(
791        &self,
792        known_events: I,
793        author_batch: &[String],
794    ) -> Vec<(EventId, Timestamp)>
795    where
796        I: Iterator<Item = &'a StoredNostrEvent>,
797    {
798        let authors = author_batch
799            .iter()
800            .map(String::as_str)
801            .collect::<BTreeSet<_>>();
802
803        known_events
804            .filter(|event| {
805                authors.contains(event.pubkey.as_str()) && self.kind_allowed(event.kind)
806            })
807            .filter_map(|event| {
808                let event_id = EventId::parse(&event.id).ok()?;
809                Some((event_id, Timestamp::from_secs(event.created_at)))
810            })
811            .collect()
812    }
813
    /// Loads the events retained for `author` under `root`.
    ///
    /// Treats the store's specific "stored nostr event blob is missing"
    /// validation error as stale index references and returns an empty list
    /// (with a warning) rather than failing the whole crawl; all other
    /// errors propagate.
    async fn load_retained_events(
        &self,
        root: Option<&Cid>,
        author: &str,
    ) -> Result<Vec<StoredNostrEvent>> {
        match self
            .event_store
            .list_by_author(root, author, ListEventsOptions::default())
            .await
        {
            Ok(events) => Ok(events),
            Err(NostrEventStoreError::Validation(message))
                if message == "stored nostr event blob is missing" =>
            {
                eprintln!(
                    "Ignoring stale indexed event references for author {}: {}",
                    author, message
                );
                Ok(Vec::new())
            }
            Err(err) => Err(err.into()),
        }
    }
837
    /// Fetches events matching `filter` from a single relay, preferring
    /// negentropy reconciliation (so only events we are missing get
    /// downloaded) and falling back to a full REQ when the relay cannot
    /// reconcile — unless `require_negentropy` is set, in which case a
    /// non-reconciling relay yields an empty result.
    ///
    /// `local_items` are the (id, created_at) pairs we already hold for this
    /// filter; `supports_negentropy` is a cached capability hint for the
    /// relay (`Some(false)` skips the reconcile attempt entirely).
    async fn fetch_events_from_relay(
        &self,
        client: &Client,
        relay: &str,
        filter: Filter,
        local_items: Vec<(EventId, Timestamp)>,
        supports_negentropy: Option<bool>,
    ) -> Result<RelayFetchResult> {
        // Cached hint says the relay cannot reconcile: either bail out
        // (strict mode) or go straight to a full fetch.
        if supports_negentropy == Some(false) {
            if self.config.require_negentropy {
                return Ok(RelayFetchResult {
                    events_seen: 0,
                    events: Vec::new(),
                    supports_negentropy: false,
                });
            }
            return self
                .fetch_full_filter(client, relay, filter)
                .await
                .map(|events| RelayFetchResult {
                    events_seen: events.len(),
                    events,
                    supports_negentropy: false,
                });
        }

        // Dry-run reconciliation: computes the id set difference without
        // transferring event payloads; the missing ids are fetched
        // explicitly below via fetch_missing_ids.
        match client
            .reconcile_advanced(
                [relay],
                filter.clone(),
                local_items,
                NegentropyOptions::default().dry_run(),
            )
            .await
        {
            // NOTE(review): a non-empty `success` set is taken to mean the
            // relay completed reconciliation — confirm against nostr-sdk docs.
            Ok(output) if !output.success.is_empty() => {
                let missing = output.remote.iter().cloned().collect::<Vec<_>>();
                self.fetch_missing_ids(client, relay, missing).await.map(
                    |RelayFetchResult {
                         events_seen,
                         events,
                         ..
                     }| RelayFetchResult {
                        events_seen,
                        events,
                        supports_negentropy: true,
                    },
                )
            }
            // Reconciliation errored or reported no successful relay: fall
            // back to a full fetch unless strict negentropy is required.
            Ok(_) | Err(_) => {
                if self.config.require_negentropy {
                    Ok(RelayFetchResult {
                        events_seen: 0,
                        events: Vec::new(),
                        supports_negentropy: false,
                    })
                } else {
                    self.fetch_full_filter(client, relay, filter)
                        .await
                        .map(|events| RelayFetchResult {
                            events_seen: events.len(),
                            events,
                            supports_negentropy: false,
                        })
                }
            }
        }
    }
906
907    async fn fetch_missing_ids(
908        &self,
909        client: &Client,
910        relay: &str,
911        missing_ids: Vec<EventId>,
912    ) -> Result<RelayFetchResult> {
913        if missing_ids.is_empty() {
914            return Ok(RelayFetchResult {
915                events_seen: 0,
916                events: Vec::new(),
917                supports_negentropy: true,
918            });
919        }
920
921        let mut out = BTreeMap::<String, StoredNostrEvent>::new();
922        let mut events_seen = 0usize;
923        for chunk in missing_ids.chunks(NEGENTROPY_FETCH_CHUNK_SIZE) {
924            let filter = Filter::new().ids(chunk.iter().cloned());
925            let events = client
926                .get_events_from([relay], vec![filter], Some(self.config.fetch_timeout))
927                .await
928                .map_err(|err| CrawlError::Nostr(err.to_string()))?;
929            events_seen = events_seen.saturating_add(events.len());
930            for event in events {
931                if event.kind.is_ephemeral() {
932                    continue;
933                }
934                let stored = stored_event_from_nostr(&event);
935                out.insert(stored.id.clone(), stored);
936            }
937        }
938        Ok(RelayFetchResult {
939            events_seen,
940            events: out.into_values().collect(),
941            supports_negentropy: true,
942        })
943    }
944
945    async fn fetch_full_filter(
946        &self,
947        client: &Client,
948        relay: &str,
949        filter: Filter,
950    ) -> Result<Vec<StoredNostrEvent>> {
951        let mut out = Vec::new();
952        let events = client
953            .get_events_from([relay], vec![filter], Some(self.config.fetch_timeout))
954            .await
955            .map_err(|err| CrawlError::Nostr(err.to_string()))?;
956
957        for event in events {
958            if event.kind.is_ephemeral() {
959                continue;
960            }
961            out.push(stored_event_from_nostr(&event));
962        }
963
964        Ok(out)
965    }
966
967    fn select_author_events(&self, events: Vec<StoredNostrEvent>) -> Result<Vec<StoredNostrEvent>> {
968        self.select_author_events_with_limits(
969            events,
970            self.config.per_author_event_limit,
971            self.config.per_author_live_bytes,
972        )
973    }
974
975    fn select_author_events_with_limits(
976        &self,
977        mut events: Vec<StoredNostrEvent>,
978        event_limit: usize,
979        live_byte_limit: Option<u64>,
980    ) -> Result<Vec<StoredNostrEvent>> {
981        events.sort_by(|left, right| {
982            self.policy
983                .priority(right)
984                .cmp(&self.policy.priority(left))
985                .then_with(|| right.created_at.cmp(&left.created_at))
986                .then_with(|| left.id.cmp(&right.id))
987        });
988
989        if let Some(max_live_bytes) = live_byte_limit {
990            let mut selected = Vec::new();
991            let mut live_bytes_selected = 0u64;
992            for event in events {
993                let encoded_len = self.event_store.encode_event(&event)?.len() as u64;
994                if live_bytes_selected.saturating_add(encoded_len) > max_live_bytes {
995                    continue;
996                }
997                live_bytes_selected = live_bytes_selected.saturating_add(encoded_len);
998                selected.push(event);
999            }
1000            selected.truncate(event_limit);
1001            return Ok(selected);
1002        }
1003
1004        events.truncate(event_limit);
1005        Ok(events)
1006    }
1007
1008    fn apply_live_byte_cap_from(
1009        &self,
1010        mut events: Vec<StoredNostrEvent>,
1011        live_bytes_selected_so_far: u64,
1012    ) -> Result<(Vec<StoredNostrEvent>, u64)> {
1013        events.sort_by(|left, right| {
1014            self.policy
1015                .priority(right)
1016                .cmp(&self.policy.priority(left))
1017                .then_with(|| right.created_at.cmp(&left.created_at))
1018                .then_with(|| left.id.cmp(&right.id))
1019        });
1020
1021        let Some(max_live_bytes) = self.config.max_live_bytes else {
1022            let live_bytes_selected =
1023                events
1024                    .iter()
1025                    .try_fold(live_bytes_selected_so_far, |total, event| {
1026                        let encoded = self.event_store.encode_event(event)?;
1027                        Ok::<u64, NostrEventStoreError>(total.saturating_add(encoded.len() as u64))
1028                    })?;
1029            return Ok((events, live_bytes_selected));
1030        };
1031
1032        let mut selected = Vec::new();
1033        let mut live_bytes_selected = live_bytes_selected_so_far;
1034        for event in events {
1035            let encoded_len = self.event_store.encode_event(&event)?.len() as u64;
1036            if live_bytes_selected.saturating_add(encoded_len) > max_live_bytes {
1037                continue;
1038            }
1039            live_bytes_selected = live_bytes_selected.saturating_add(encoded_len);
1040            selected.push(event);
1041        }
1042
1043        Ok((selected, live_bytes_selected))
1044    }
1045
1046    fn kind_allowed(&self, kind: u32) -> bool {
1047        self.config.kinds.as_ref().is_none_or(|allowed| {
1048            allowed
1049                .iter()
1050                .any(|candidate| u32::from(*candidate) == kind)
1051        })
1052    }
1053}
1054
1055fn stored_event_from_nostr(event: &nostr_sdk::Event) -> StoredNostrEvent {
1056    StoredNostrEvent {
1057        id: event.id.to_hex(),
1058        pubkey: event.pubkey.to_hex(),
1059        created_at: event.created_at.as_u64(),
1060        kind: event.kind.as_u16() as u32,
1061        tags: event
1062            .tags
1063            .iter()
1064            .map(|tag| tag.as_slice().to_vec())
1065            .collect(),
1066        content: event.content.clone(),
1067        sig: event.sig.to_string(),
1068    }
1069}
1070
/// Returns true for a 64-character lowercase hex string (the canonical hex
/// form of a nostr public key). Uppercase hex digits are deliberately
/// rejected.
fn is_valid_hex_pubkey(value: &str) -> bool {
    if value.len() != 64 {
        return false;
    }
    value
        .bytes()
        .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
}
1077
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use hashtree_core::MemoryStore;
    use nostr_social_graph::{NostrEvent, SocialGraphBackend as NostrSocialGraphBackend};

    use super::{CrawlConfig, NostrBridge, StoredNostrEvent};

    /// Minimal stub of the social-graph backend: a single root user
    /// ("0" * 64) whose follow list mixes one valid pubkey with malformed
    /// entries, so author collection and filtering can be exercised without
    /// a real graph.
    #[derive(Default)]
    struct FakeGraphBackend;

    impl NostrSocialGraphBackend for FakeGraphBackend {
        type Error = std::io::Error;

        // Fixed root user: 64 zeros.
        fn get_root(&self) -> std::result::Result<String, Self::Error> {
            Ok("0".repeat(64))
        }

        fn set_root(&mut self, _root: &str) -> std::result::Result<(), Self::Error> {
            Ok(())
        }

        // Events are accepted and discarded; the fake graph never changes.
        fn handle_event(
            &mut self,
            _event: &NostrEvent,
            _allow_unknown_authors: bool,
            _overmute_threshold: f64,
        ) -> std::result::Result<(), Self::Error> {
            Ok(())
        }

        // Every user reports distance 0, keeping all authors inside any
        // configured max_follow_distance.
        fn get_follow_distance(&self, _user: &str) -> std::result::Result<u32, Self::Error> {
            Ok(0)
        }

        fn is_following(
            &self,
            _follower: &str,
            _followed_user: &str,
        ) -> std::result::Result<bool, Self::Error> {
            Ok(false)
        }

        // The root follows one valid pubkey plus three malformed entries
        // (non-hex, too short, uppercase); everyone else follows nobody.
        fn get_followed_by_user(
            &self,
            user: &str,
        ) -> std::result::Result<Vec<String>, Self::Error> {
            if user == "0".repeat(64) {
                return Ok(vec![
                    "1".repeat(64),
                    "NOT-HEX".to_string(),
                    "a".repeat(63),
                    "A".repeat(64),
                ]);
            }
            Ok(Vec::new())
        }

        fn get_followers_by_user(
            &self,
            _user: &str,
        ) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_muted_by_user(&self, _user: &str) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_user_muted_by(&self, _user: &str) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_follow_list_created_at(
            &self,
            _user: &str,
        ) -> std::result::Result<Option<u64>, Self::Error> {
            Ok(None)
        }

        fn get_mute_list_created_at(
            &self,
            _user: &str,
        ) -> std::result::Result<Option<u64>, Self::Error> {
            Ok(None)
        }

        fn is_overmuted(
            &self,
            _user: &str,
            _threshold: f64,
        ) -> std::result::Result<bool, Self::Error> {
            Ok(false)
        }
    }

    /// A stored event with a non-hex pubkey must fail shape validation even
    /// when the other fields look plausible.
    #[test]
    fn rejects_invalid_stored_event_shape() {
        let bridge = NostrBridge::new(Arc::new(MemoryStore::new()), CrawlConfig::default());
        let invalid = StoredNostrEvent {
            id: "f".repeat(64),
            pubkey: "not-hex".to_string(),
            created_at: 1,
            kind: 1,
            tags: Vec::new(),
            content: String::new(),
            sig: "f".repeat(128),
        };

        assert!(!bridge.is_valid_stored_event(&invalid));
    }

    /// Authors gathered from the graph keep only well-formed lowercase hex
    /// pubkeys: the root plus its one valid followee survive; the malformed
    /// entries from FakeGraphBackend are dropped.
    #[test]
    fn collect_authors_skips_invalid_graph_pubkeys() {
        let bridge = NostrBridge::new(Arc::new(MemoryStore::new()), CrawlConfig::default());
        let authors = bridge
            .collect_authors(&FakeGraphBackend)
            .expect("collect authors");

        assert_eq!(authors, vec!["0".repeat(64), "1".repeat(64)]);
    }

    /// With an allowlist configured, the graph is ignored: invalid and
    /// duplicate allowlist entries are filtered out, and max_authors caps
    /// the result (here to the first valid entry).
    #[test]
    fn collect_authors_prefers_allowlist_and_applies_limits() {
        let bridge = NostrBridge::new(
            Arc::new(MemoryStore::new()),
            CrawlConfig {
                author_allowlist: Some(vec![
                    "1".repeat(64),
                    "NOT-HEX".to_string(),
                    "0".repeat(64),
                    "1".repeat(64),
                ]),
                max_authors: Some(1),
                ..CrawlConfig::default()
            },
        );
        let authors = bridge
            .collect_authors(&FakeGraphBackend)
            .expect("collect authors");

        assert_eq!(authors, vec!["1".repeat(64)]);
    }
}