Skip to main content

hashtree_nostr/
crawl.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::{ListEventsOptions, NostrEventStore, NostrEventStoreError, StoredNostrEvent};
6use futures::{stream, StreamExt};
7use hashtree_core::{Cid, Store};
8use nostr_sdk::{
9    pool::RelayLimits, Client, ClientOptions, EventId, Filter, Keys, Kind, PublicKey, SyncOptions,
10    Timestamp,
11};
12use nostr_social_graph::SocialGraphBackend;
13
// Tuning knobs for relay fetching. Their call sites are outside this excerpt; the
// descriptions below follow the names — NOTE(review): confirm against the fetch helpers.

/// Number of ids per chunk when fetching events identified by negentropy reconciliation.
const NEGENTROPY_FETCH_CHUNK_SIZE: usize = 256;
/// How many negentropy follow-up chunks may be in flight at once.
const NEGENTROPY_FETCH_CHUNK_CONCURRENCY: usize = 16;
/// Time allowed for the first negentropy attempt against a relay.
const NEGENTROPY_INITIAL_TIMEOUT: Duration = Duration::from_millis(1_000);
/// Concurrent page requests per relay while paging full author history.
const FULL_HISTORY_PAGING_CONCURRENCY_PER_RELAY: usize = 64;
/// Nostr kind 0: user metadata (profile), per NIP-01.
const METADATA_KIND: u32 = 0;

/// Strategy used to pull events from relays.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelayFetchMode {
    /// Query relays with filters covering chunks of `author_batch_size` authors.
    AuthorBatches,
    /// Page backwards through each relay's recent events (at most `max_relay_pages`
    /// pages) and keep only events whose author is in the crawl set.
    GlobalRecent,
}

/// Settings controlling which authors are crawled, how relays are queried and how many
/// events are kept.
#[derive(Debug, Clone)]
pub struct CrawlConfig {
    /// Relay URLs to query; at least one is required.
    pub relays: Vec<String>,
    /// Explicit author list (hex pubkeys). When set, it replaces social-graph traversal;
    /// invalid pubkeys and duplicates are dropped.
    pub author_allowlist: Option<Vec<String>>,
    /// Overall cap on encoded bytes of selected events (applied by the live-byte cap
    /// helper — NOTE(review): exact semantics live outside this excerpt).
    pub max_live_bytes: Option<u64>,
    /// Stop crawling once this many relay events have been seen; must be non-zero.
    pub max_events_seen: Option<usize>,
    /// Upper bound on the number of authors collected.
    pub max_authors: Option<usize>,
    /// Maximum follow hops from the social-graph root (ignored with an allowlist).
    pub max_follow_distance: Option<u32>,
    /// Authors per relay query batch; must be non-zero.
    pub author_batch_size: usize,
    /// Maximum events retained per author; must be non-zero.
    pub per_author_event_limit: usize,
    /// Optional per-author byte cap; `Some(0)` is rejected.
    pub per_author_live_bytes: Option<u64>,
    /// Timeout passed to relay fetch requests.
    pub fetch_timeout: Duration,
    /// Optional kind allowlist (presumably consulted by `kind_allowed` — confirm).
    pub kinds: Option<Vec<u16>>,
    /// How events are pulled from relays.
    pub relay_fetch_mode: RelayFetchMode,
    /// Whether relays must support negentropy (enforcement is outside this excerpt).
    pub require_negentropy: bool,
    /// Overrides the client's per-event size limit; `Some(0)` is rejected.
    pub relay_event_max_size: Option<u32>,
    /// Events requested per relay page; must be non-zero.
    pub relay_page_size: usize,
    /// Pages fetched per relay; may be zero only when `full_author_history` is set.
    pub max_relay_pages: usize,
    /// Fetch each author's complete history instead of a bounded recent window.
    pub full_author_history: bool,
}

impl Default for CrawlConfig {
    /// Direct follows of the graph root, author-batch fetching, 64 authors per batch,
    /// 256 events per author and a 10 second fetch timeout; no global caps.
    fn default() -> Self {
        Self {
            // Relay selection and fetch strategy.
            relays: vec![],
            relay_fetch_mode: RelayFetchMode::AuthorBatches,
            require_negentropy: false,
            relay_event_max_size: None,
            fetch_timeout: Duration::from_secs(10),
            relay_page_size: 1_000,
            max_relay_pages: 10,
            full_author_history: false,
            // Author discovery.
            author_allowlist: None,
            max_authors: None,
            max_follow_distance: Some(1),
            author_batch_size: 64,
            // Selection budgets.
            max_live_bytes: None,
            max_events_seen: None,
            per_author_event_limit: 256,
            per_author_live_bytes: None,
            kinds: None,
        }
    }
}
69
70#[derive(Debug, Clone, Default, PartialEq)]
71pub struct CrawlReport {
72    pub root: Option<Cid>,
73    pub authors_considered: usize,
74    pub authors_processed: usize,
75    pub events_seen: usize,
76    pub events_selected: usize,
77    pub live_bytes_selected: u64,
78    pub applied_events: Vec<StoredNostrEvent>,
79}
80
81pub trait EventSelectionPolicy: Send + Sync {
82    fn priority(&self, event: &StoredNostrEvent) -> i32;
83}
84
85#[derive(Debug, Clone)]
86pub struct KindPriorityPolicy {
87    default_priority: i32,
88    priorities: BTreeMap<u32, i32>,
89}
90
91impl Default for KindPriorityPolicy {
92    fn default() -> Self {
93        let mut priorities = BTreeMap::new();
94        priorities.insert(1, 1_000);
95        priorities.insert(0, 900);
96        priorities.insert(3, 800);
97        priorities.insert(10_000, 750);
98        priorities.insert(6, 600);
99        priorities.insert(7, 500);
100        Self {
101            default_priority: 100,
102            priorities,
103        }
104    }
105}
106
107impl KindPriorityPolicy {
108    pub fn with_priority(mut self, kind: u32, priority: i32) -> Self {
109        self.priorities.insert(kind, priority);
110        self
111    }
112}
113
114impl EventSelectionPolicy for KindPriorityPolicy {
115    fn priority(&self, event: &StoredNostrEvent) -> i32 {
116        self.priorities
117            .get(&event.kind)
118            .copied()
119            .unwrap_or(self.default_priority)
120    }
121}
122
123#[derive(Debug, thiserror::Error)]
124pub enum CrawlError {
125    #[error("event store error: {0}")]
126    EventStore(#[from] NostrEventStoreError),
127    #[error("crawl requires at least one relay")]
128    MissingRelays,
129    #[error("per-author event limit must be greater than zero")]
130    InvalidPerAuthorLimit,
131    #[error("per-author live byte cap must be greater than zero")]
132    InvalidPerAuthorLiveBytes,
133    #[error("author batch size must be greater than zero")]
134    InvalidAuthorBatchSize,
135    #[error("relay page size must be greater than zero")]
136    InvalidRelayPageSize,
137    #[error("max relay pages must be greater than zero")]
138    InvalidMaxRelayPages,
139    #[error("max events seen must be greater than zero")]
140    InvalidMaxEventsSeen,
141    #[error("relay event max size must be greater than zero")]
142    InvalidRelayEventMaxSize,
143    #[error("nostr error: {0}")]
144    Nostr(String),
145    #[error("social graph error: {0}")]
146    SocialGraph(String),
147}
148
149pub type Result<T> = std::result::Result<T, CrawlError>;
150
151#[derive(Debug, Default)]
152struct RelayFetchResult {
153    events_seen: usize,
154    events: Vec<StoredNostrEvent>,
155    supports_negentropy: bool,
156}
157
158#[derive(Debug, Default)]
159struct BatchCrawlReport {
160    events_seen: usize,
161    events_selected: usize,
162    events: Vec<StoredNostrEvent>,
163    live_bytes_selected: u64,
164}
165
166#[derive(Debug, Default)]
167struct ProfileBatchReport {
168    events_seen: usize,
169    events_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
170}
171
172#[derive(Debug, Default)]
173struct GlobalRecentState {
174    current_root: Option<Cid>,
175    retained_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
176    events_selected: usize,
177    live_bytes_selected: u64,
178}
179
180pub struct NostrBridge<S: Store> {
181    event_store: NostrEventStore<S>,
182    config: CrawlConfig,
183    policy: Arc<dyn EventSelectionPolicy>,
184}
185
186impl<S: Store> NostrBridge<S> {
187    pub fn new(store: Arc<S>, config: CrawlConfig) -> Self {
188        Self {
189            event_store: NostrEventStore::new(store),
190            config,
191            policy: Arc::new(KindPriorityPolicy::default()),
192        }
193    }
194
195    pub fn with_policy(mut self, policy: Arc<dyn EventSelectionPolicy>) -> Self {
196        self.policy = policy;
197        self
198    }
199
200    pub async fn crawl<G: SocialGraphBackend>(
201        &self,
202        graph: &G,
203        existing_root: Option<&Cid>,
204    ) -> Result<CrawlReport> {
205        self.crawl_with_progress(graph, existing_root, |_| {}).await
206    }
207
208    pub async fn crawl_with_progress<G, F>(
209        &self,
210        graph: &G,
211        existing_root: Option<&Cid>,
212        mut on_progress: F,
213    ) -> Result<CrawlReport>
214    where
215        G: SocialGraphBackend,
216        F: FnMut(&CrawlReport),
217    {
218        self.validate_config()?;
219
220        let authors = self.collect_authors(graph)?;
221        if authors.is_empty() {
222            return Ok(CrawlReport::default());
223        }
224
225        let existing_root = self.usable_existing_root(existing_root).await?;
226        let client = self.connect_client().await?;
227
228        let result = if self.config.relay_fetch_mode == RelayFetchMode::AuthorBatches {
229            self.crawl_author_batches(&client, &authors, existing_root.as_ref(), &mut on_progress)
230                .await
231        } else {
232            let state = self
233                .load_existing_global_state(existing_root.as_ref(), &authors)
234                .await?;
235
236            let report = self
237                .crawl_global_recent_incremental(&client, &authors, state, &mut on_progress)
238                .await?;
239            on_progress(&report);
240            Ok(report)
241        };
242
243        let _ = client.disconnect().await;
244        result
245    }
246
247    async fn usable_existing_root(&self, root: Option<&Cid>) -> Result<Option<Cid>> {
248        let Some(root) = root else {
249            return Ok(None);
250        };
251
252        match self.event_store.validate_index_root(Some(root)).await {
253            Ok(()) => Ok(Some(root.clone())),
254            Err(err) => {
255                eprintln!(
256                    "Ignoring invalid existing Nostr event index root {}: {err}",
257                    hex::encode(root.hash)
258                );
259                Ok(None)
260            }
261        }
262    }
263
264    async fn crawl_author_batches(
265        &self,
266        client: &Client,
267        authors: &[String],
268        existing_root: Option<&Cid>,
269        on_progress: &mut impl FnMut(&CrawlReport),
270    ) -> Result<CrawlReport> {
271        let mut relay_negentropy_support = BTreeMap::<String, bool>::new();
272        let mut failed_relays = BTreeSet::<String>::new();
273        let mut current_root = existing_root.cloned();
274        let mut events_seen = 0usize;
275        let mut events_selected = 0usize;
276        let mut live_bytes_selected = 0u64;
277        let mut authors_processed = 0usize;
278        let mut applied_events = Vec::new();
279
280        for author_batch in authors.chunks(self.config.author_batch_size) {
281            let batch = self
282                .crawl_author_batch(
283                    client,
284                    author_batch,
285                    current_root.as_ref(),
286                    &mut relay_negentropy_support,
287                    &mut failed_relays,
288                    live_bytes_selected,
289                )
290                .await?;
291            events_seen = events_seen.saturating_add(batch.events_seen);
292            events_selected = events_selected.saturating_add(batch.events_selected);
293            live_bytes_selected = batch.live_bytes_selected;
294            authors_processed = authors_processed.saturating_add(author_batch.len());
295            if !batch.events.is_empty() {
296                applied_events.extend(batch.events.clone());
297                current_root = self
298                    .event_store
299                    .build(current_root.as_ref(), batch.events)
300                    .await?;
301            }
302            on_progress(&CrawlReport {
303                root: current_root.clone(),
304                authors_considered: authors.len(),
305                authors_processed,
306                events_seen,
307                events_selected,
308                live_bytes_selected,
309                applied_events: Vec::new(),
310            });
311            if self.reached_events_seen_limit(events_seen) {
312                break;
313            }
314        }
315
316        Ok(CrawlReport {
317            root: current_root,
318            authors_considered: authors.len(),
319            authors_processed,
320            events_seen,
321            events_selected,
322            live_bytes_selected,
323            applied_events,
324        })
325    }
326
327    async fn load_existing_global_state(
328        &self,
329        root: Option<&Cid>,
330        authors: &[String],
331    ) -> Result<GlobalRecentState> {
332        let Some(root) = root else {
333            return Ok(GlobalRecentState::default());
334        };
335
336        match self
337            .event_store
338            .list_recent(Some(root), ListEventsOptions::default())
339            .await
340        {
341            Ok(events) => {
342                let author_set = authors.iter().map(String::as_str).collect::<BTreeSet<_>>();
343                let mut retained_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
344                for event in events {
345                    if !author_set.contains(event.pubkey.as_str()) || !self.kind_allowed(event.kind)
346                    {
347                        continue;
348                    }
349                    if !self.is_valid_stored_event(&event) {
350                        continue;
351                    }
352                    retained_by_author
353                        .entry(event.pubkey.clone())
354                        .or_default()
355                        .push(event);
356                }
357
358                let mut state = GlobalRecentState {
359                    current_root: Some(root.clone()),
360                    ..GlobalRecentState::default()
361                };
362                for (author, events) in retained_by_author {
363                    let selected = self.select_author_events(events)?;
364                    state.events_selected = state.events_selected.saturating_add(selected.len());
365                    state.live_bytes_selected = state
366                        .live_bytes_selected
367                        .saturating_add(self.encoded_events_size(&selected)?);
368                    state.retained_by_author.insert(author, selected);
369                }
370                Ok(state)
371            }
372            Err(NostrEventStoreError::Validation(message))
373                if message == "stored nostr event blob is missing" =>
374            {
375                eprintln!(
376                    "Falling back to per-author resume for existing root due to missing event blobs"
377                );
378                let mut state = self
379                    .load_existing_global_state_by_author(Some(root), authors)
380                    .await?;
381                state.current_root = self.rebuild_root_from_retained_state(&state).await?;
382                Ok(state)
383            }
384            Err(err) => Err(err.into()),
385        }
386    }
387
388    async fn load_existing_global_state_by_author(
389        &self,
390        root: Option<&Cid>,
391        authors: &[String],
392    ) -> Result<GlobalRecentState> {
393        let mut state = GlobalRecentState {
394            current_root: root.cloned(),
395            ..GlobalRecentState::default()
396        };
397        for author in authors {
398            let retained = self
399                .load_retained_events(root, author)
400                .await?
401                .into_iter()
402                .filter(|event| self.kind_allowed(event.kind))
403                .filter(|event| self.is_valid_stored_event(event))
404                .collect::<Vec<_>>();
405            let retained = self.select_author_events(retained)?;
406            state.events_selected = state.events_selected.saturating_add(retained.len());
407            state.live_bytes_selected = state
408                .live_bytes_selected
409                .saturating_add(self.encoded_events_size(&retained)?);
410            state.retained_by_author.insert(author.clone(), retained);
411        }
412        Ok(state)
413    }
414
415    async fn rebuild_root_from_retained_state(
416        &self,
417        state: &GlobalRecentState,
418    ) -> Result<Option<Cid>> {
419        let events = state
420            .retained_by_author
421            .values()
422            .flat_map(|events| events.iter().cloned())
423            .collect::<Vec<_>>();
424        self.event_store
425            .build(None, events)
426            .await
427            .map_err(Into::into)
428    }
429
430    fn validate_config(&self) -> Result<()> {
431        if self.config.relays.is_empty() {
432            return Err(CrawlError::MissingRelays);
433        }
434        if self.config.per_author_event_limit == 0 {
435            return Err(CrawlError::InvalidPerAuthorLimit);
436        }
437        if self.config.per_author_live_bytes == Some(0) {
438            return Err(CrawlError::InvalidPerAuthorLiveBytes);
439        }
440        if self.config.author_batch_size == 0 {
441            return Err(CrawlError::InvalidAuthorBatchSize);
442        }
443        if self.config.relay_page_size == 0 {
444            return Err(CrawlError::InvalidRelayPageSize);
445        }
446        if self.config.max_relay_pages == 0 && !self.config.full_author_history {
447            return Err(CrawlError::InvalidMaxRelayPages);
448        }
449        if self.config.max_events_seen == Some(0) {
450            return Err(CrawlError::InvalidMaxEventsSeen);
451        }
452        if self.config.relay_event_max_size == Some(0) {
453            return Err(CrawlError::InvalidRelayEventMaxSize);
454        }
455        Ok(())
456    }
457
458    fn collect_authors<G: SocialGraphBackend>(&self, graph: &G) -> Result<Vec<String>> {
459        if let Some(author_allowlist) = &self.config.author_allowlist {
460            let mut seen = HashSet::new();
461            let mut authors = Vec::new();
462            for author in author_allowlist {
463                if !is_valid_hex_pubkey(author) {
464                    continue;
465                }
466                if seen.insert(author.clone()) {
467                    authors.push(author.clone());
468                }
469            }
470            if let Some(max_authors) = self.config.max_authors {
471                authors.truncate(max_authors);
472            }
473            return Ok(authors);
474        }
475
476        let root = graph
477            .get_root()
478            .map_err(|err| CrawlError::SocialGraph(err.to_string()))?;
479        let mut visited = BTreeSet::new();
480        let mut authors = Vec::new();
481        let mut queue = VecDeque::from([(root.clone(), 0u32)]);
482        visited.insert(root);
483
484        while let Some((author, distance)) = queue.pop_front() {
485            if !is_valid_hex_pubkey(&author) {
486                continue;
487            }
488            authors.push(author.clone());
489            if self
490                .config
491                .max_authors
492                .is_some_and(|max_authors| authors.len() >= max_authors)
493            {
494                break;
495            }
496            if self
497                .config
498                .max_follow_distance
499                .is_some_and(|max_distance| distance >= max_distance)
500            {
501                continue;
502            }
503
504            let mut follows = graph
505                .get_followed_by_user(&author)
506                .map_err(|err| CrawlError::SocialGraph(err.to_string()))?;
507            follows.retain(|followed| is_valid_hex_pubkey(followed));
508            follows.sort();
509            for followed in follows {
510                if visited.insert(followed.clone()) {
511                    queue.push_back((followed, distance.saturating_add(1)));
512                }
513            }
514        }
515
516        Ok(authors)
517    }
518
519    async fn connect_client(&self) -> Result<Client> {
520        let client = if let Some(max_size) = self.config.relay_event_max_size {
521            let mut limits = RelayLimits::default();
522            limits.events.max_size = Some(max_size);
523            Client::builder()
524                .signer(Keys::generate())
525                .opts(ClientOptions::new().relay_limits(limits))
526                .build()
527        } else {
528            Client::new(Keys::generate())
529        };
530        for relay in &self.config.relays {
531            client
532                .add_relay(relay)
533                .await
534                .map_err(|err| CrawlError::Nostr(err.to_string()))?;
535        }
536        client.connect().await;
537        tokio::time::sleep(Duration::from_millis(250)).await;
538        Ok(client)
539    }
540
541    async fn crawl_author_batch(
542        &self,
543        client: &Client,
544        author_batch: &[String],
545        current_root: Option<&Cid>,
546        relay_negentropy_support: &mut BTreeMap<String, bool>,
547        failed_relays: &mut BTreeSet<String>,
548        live_bytes_selected_so_far: u64,
549    ) -> Result<BatchCrawlReport> {
550        let mut existing_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
551        let mut known = BTreeMap::<String, StoredNostrEvent>::new();
552        for author in author_batch {
553            let retained = self
554                .load_retained_events(current_root, author)
555                .await?
556                .into_iter()
557                .filter(|event| self.kind_allowed(event.kind))
558                .filter(|event| self.is_valid_stored_event(event))
559                .collect::<Vec<_>>();
560            for event in &retained {
561                known.insert(event.id.clone(), event.clone());
562            }
563            existing_by_author.insert(author.clone(), retained);
564        }
565
566        let pubkeys: Vec<PublicKey> = author_batch
567            .iter()
568            .filter_map(|author| author.parse::<PublicKey>().ok())
569            .collect();
570        if pubkeys.is_empty() {
571            return Ok(BatchCrawlReport {
572                events_seen: 0,
573                events_selected: 0,
574                events: Vec::new(),
575                live_bytes_selected: live_bytes_selected_so_far,
576            });
577        }
578
579        let initial_known_ids = known.keys().cloned().collect::<BTreeSet<_>>();
580        if self.config.full_author_history {
581            return self
582                .crawl_full_author_history_batch(
583                    client,
584                    author_batch,
585                    existing_by_author,
586                    known,
587                    initial_known_ids,
588                    relay_negentropy_support,
589                    failed_relays,
590                    live_bytes_selected_so_far,
591                )
592                .await;
593        }
594
595        let filter = self.batch_filter(pubkeys);
596        let mut fetched = BTreeMap::<String, StoredNostrEvent>::new();
597        let mut events_seen = 0usize;
598
599        for relay in &self.config.relays {
600            if failed_relays.contains(relay) {
601                continue;
602            }
603            let local_items = self.local_items_for_batch(known.values(), author_batch);
604            let relay_support = relay_negentropy_support.get(relay).copied();
605            let fetched_from_relay = match self
606                .fetch_events_from_relay(client, relay, filter.clone(), local_items, relay_support)
607                .await
608            {
609                Ok(result) => result,
610                Err(err) => {
611                    eprintln!("Skipping relay {relay}: {err}");
612                    failed_relays.insert(relay.clone());
613                    continue;
614                }
615            };
616            relay_negentropy_support.insert(relay.clone(), fetched_from_relay.supports_negentropy);
617            events_seen = events_seen.saturating_add(fetched_from_relay.events_seen);
618            for event in fetched_from_relay.events {
619                if self.kind_allowed(event.kind)
620                    && known.insert(event.id.clone(), event.clone()).is_none()
621                {
622                    fetched.insert(event.id.clone(), event);
623                }
624            }
625        }
626
627        let mut fetched_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
628        for event in fetched.into_values() {
629            fetched_by_author
630                .entry(event.pubkey.clone())
631                .or_default()
632                .push(event);
633        }
634
635        let mut selected = Vec::new();
636        for author in author_batch {
637            let mut merged: BTreeMap<String, StoredNostrEvent> = BTreeMap::new();
638            if let Some(existing_events) = existing_by_author.remove(author) {
639                for event in existing_events {
640                    merged.insert(event.id.clone(), event);
641                }
642            }
643            if let Some(events) = fetched_by_author.remove(author) {
644                for event in events {
645                    merged.insert(event.id.clone(), event);
646                }
647            }
648            selected.extend(self.select_author_events(merged.into_values().collect())?);
649        }
650
651        let selected = selected
652            .into_iter()
653            .filter(|event| self.is_valid_stored_event(event))
654            .collect::<Vec<_>>();
655        let (selected, live_bytes_selected) =
656            self.apply_live_byte_cap_from(selected, live_bytes_selected_so_far)?;
657        let events_selected = selected.len();
658        let events_to_apply = selected
659            .into_iter()
660            .filter(|event| !initial_known_ids.contains(&event.id))
661            .collect::<Vec<_>>();
662
663        Ok(BatchCrawlReport {
664            events_seen,
665            events_selected,
666            events: events_to_apply,
667            live_bytes_selected,
668        })
669    }
670
671    #[allow(clippy::too_many_arguments)]
672    async fn crawl_full_author_history_batch(
673        &self,
674        client: &Client,
675        author_batch: &[String],
676        mut existing_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
677        mut known: BTreeMap<String, StoredNostrEvent>,
678        initial_known_ids: BTreeSet<String>,
679        relay_negentropy_support: &mut BTreeMap<String, bool>,
680        failed_relays: &mut BTreeSet<String>,
681        live_bytes_selected_so_far: u64,
682    ) -> Result<BatchCrawlReport> {
683        let mut events_seen = 0usize;
684        let mut selected = Vec::new();
685        let author_set = author_batch
686            .iter()
687            .map(String::as_str)
688            .collect::<BTreeSet<_>>();
689        let pubkeys = author_batch
690            .iter()
691            .filter_map(|author| author.parse::<PublicKey>().ok())
692            .collect::<Vec<_>>();
693
694        if pubkeys.is_empty() {
695            return Ok(BatchCrawlReport {
696                events_seen: 0,
697                events_selected: 0,
698                events: Vec::new(),
699                live_bytes_selected: live_bytes_selected_so_far,
700            });
701        }
702
703        let local_items = self.local_items_for_batch(known.values(), author_batch);
704        let mut fetched_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
705
706        let relays_to_fetch = self
707            .config
708            .relays
709            .iter()
710            .filter(|relay| !failed_relays.contains(*relay))
711            .map(|relay| (relay.clone(), relay_negentropy_support.get(relay).copied()))
712            .collect::<Vec<_>>();
713        let relay_fetches = relays_to_fetch.into_iter().map(|(relay, relay_support)| {
714            let local_items = local_items.clone();
715            let pubkeys = &pubkeys;
716            async move {
717                let result = self
718                    .fetch_full_history_from_relay(
719                        client,
720                        &relay,
721                        pubkeys,
722                        local_items,
723                        relay_support,
724                    )
725                    .await;
726                (relay, result)
727            }
728        });
729        let mut relay_fetches =
730            stream::iter(relay_fetches).buffer_unordered(self.config.relays.len().max(1));
731
732        while let Some((relay, result)) = relay_fetches.next().await {
733            match result {
734                Ok(fetched_from_relay) => {
735                    relay_negentropy_support
736                        .insert(relay.clone(), fetched_from_relay.supports_negentropy);
737                    events_seen = events_seen.saturating_add(fetched_from_relay.events_seen);
738                    for event in fetched_from_relay.events {
739                        if self.kind_allowed(event.kind)
740                            && author_set.contains(event.pubkey.as_str())
741                            && known.insert(event.id.clone(), event.clone()).is_none()
742                        {
743                            fetched_by_author
744                                .entry(event.pubkey.clone())
745                                .or_default()
746                                .push(event);
747                        }
748                    }
749                }
750                Err(err) => {
751                    eprintln!("Skipping relay {relay}: {err}");
752                    failed_relays.insert(relay.clone());
753                }
754            }
755        }
756
757        for author in author_batch {
758            let mut merged = BTreeMap::<String, StoredNostrEvent>::new();
759            if let Some(existing_events) = existing_by_author.remove(author) {
760                for event in existing_events {
761                    merged.insert(event.id.clone(), event);
762                }
763            }
764            if let Some(events) = fetched_by_author.remove(author) {
765                for event in events {
766                    merged.insert(event.id.clone(), event);
767                }
768            }
769            let author_selected = self
770                .select_author_events_with_limits(
771                    merged.into_values().collect(),
772                    self.config.per_author_event_limit,
773                    self.config.per_author_live_bytes,
774                )?
775                .into_iter()
776                .filter(|event| self.is_valid_stored_event(event))
777                .collect::<Vec<_>>();
778            selected.extend(author_selected);
779        }
780
781        let (events, live_bytes_selected) =
782            self.apply_live_byte_cap_from(selected, live_bytes_selected_so_far)?;
783        let events_selected = events.len();
784        let events_to_apply = events
785            .into_iter()
786            .filter(|event| !initial_known_ids.contains(&event.id))
787            .collect::<Vec<_>>();
788        Ok(BatchCrawlReport {
789            events_seen,
790            events_selected,
791            events: events_to_apply,
792            live_bytes_selected,
793        })
794    }
795
    /// Incrementally crawl the relays' global recent feed, keeping only events written
    /// by `authors`.
    ///
    /// Starting from a previously saved `state`, this first hydrates kind-0 profiles for
    /// authors that have none retained (`hydrate_global_recent_profiles`). Then, relay
    /// by relay, it pages backwards through the global feed (`global_recent_filter`) for
    /// at most `max_relay_pages` pages. Each page is reduced to non-ephemeral events
    /// whose author is in `authors` and whose kind passes `kind_allowed`, merged into
    /// `state`, and any newly selected events are built into a new root.
    ///
    /// `on_progress` is called after every page with a snapshot report whose
    /// `applied_events` is empty; the returned report carries every applied event.
    ///
    /// A relay whose fetch fails is logged to stderr, added to the failed set, and skipped
    /// for the rest of the crawl. Crawling stops early once `max_events_seen` is reached;
    /// that counter includes every fetched event, even ones filtered out afterwards.
    ///
    /// # Errors
    /// Propagates errors from profile hydration, merging, and event-store builds.
    async fn crawl_global_recent_incremental(
        &self,
        client: &Client,
        authors: &[String],
        mut state: GlobalRecentState,
        on_progress: &mut impl FnMut(&CrawlReport),
    ) -> Result<CrawlReport> {
        // Set view of the author scope for per-event membership checks.
        let author_set = authors.iter().map(String::as_str).collect::<BTreeSet<_>>();
        // Ids already retained in `state`; merging only emits events not in this set.
        let mut known_ids = state
            .retained_by_author
            .values()
            .flat_map(|events| events.iter().map(|event| event.id.clone()))
            .collect::<BTreeSet<_>>();
        // Authors that already have at least one retained event count as processed.
        let mut authors_processed = state
            .retained_by_author
            .values()
            .filter(|events| !events.is_empty())
            .count();
        let mut failed_relays = BTreeSet::<String>::new();
        let mut relay_negentropy_support = BTreeMap::<String, bool>::new();
        let mut events_seen = 0usize;
        let mut applied_events = Vec::new();

        self.hydrate_global_recent_profiles(
            client,
            authors,
            &mut state,
            &mut known_ids,
            &mut authors_processed,
            &mut applied_events,
            &mut relay_negentropy_support,
            &mut failed_relays,
            &mut events_seen,
            on_progress,
        )
        .await?;
        if self.reached_events_seen_limit(events_seen) {
            return Ok(CrawlReport {
                root: state.current_root,
                authors_considered: authors.len(),
                authors_processed,
                events_seen,
                events_selected: state.events_selected,
                live_bytes_selected: state.live_bytes_selected,
                applied_events,
            });
        }

        for relay in &self.config.relays {
            if failed_relays.contains(relay) {
                continue;
            }
            // `None` on the first page means "newest events"; later pages page backwards.
            let mut until = None;
            for _ in 0..self.config.max_relay_pages {
                let filter = self.global_recent_filter(until);
                let events = match client
                    .fetch_events_from([relay], filter, self.config.fetch_timeout)
                    .await
                    .map(|events| events.to_vec())
                {
                    Ok(events) => events,
                    Err(err) => {
                        eprintln!("Skipping relay {relay}: {}", err);
                        failed_relays.insert(relay.clone());
                        break;
                    }
                };
                let fetched_count = events.len();
                events_seen = events_seen.saturating_add(fetched_count);
                if fetched_count == 0 {
                    break;
                }

                // The cursor is tracked over every returned event (ephemeral and
                // out-of-scope ones included) so paging still advances past them.
                let mut min_created_at = u64::MAX;
                let mut incoming_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
                for event in events {
                    min_created_at = min_created_at.min(event.created_at.as_secs());
                    if event.kind.is_ephemeral() {
                        continue;
                    }

                    let stored = stored_event_from_nostr(&event);
                    if !author_set.contains(stored.pubkey.as_str())
                        || !self.kind_allowed(stored.kind)
                    {
                        continue;
                    }
                    incoming_by_author
                        .entry(stored.pubkey.clone())
                        .or_default()
                        .push(stored);
                }

                let pending_apply = self.merge_author_events_into_state(
                    &mut state,
                    incoming_by_author,
                    &mut known_ids,
                    &mut authors_processed,
                )?;
                if !pending_apply.is_empty() {
                    applied_events.extend(pending_apply.clone());
                    state.current_root = self
                        .event_store
                        .build(state.current_root.as_ref(), pending_apply)
                        .await?;
                }

                on_progress(&CrawlReport {
                    root: state.current_root.clone(),
                    authors_considered: authors.len(),
                    authors_processed,
                    events_seen,
                    events_selected: state.events_selected,
                    live_bytes_selected: state.live_bytes_selected,
                    applied_events: Vec::new(),
                });

                if min_created_at == u64::MAX || min_created_at == 0 {
                    break;
                }
                // Step the cursor to one second before the oldest event on this page.
                // NOTE(review): if the relay's limit cut this page off partway through a
                // run of events sharing `min_created_at`, the rest of that run is never
                // requested. Confirm that this loss is acceptable.
                let next_until = min_created_at.saturating_sub(1);
                // The cursor did not move, so further pages would repeat this one.
                if until == Some(next_until) {
                    break;
                }
                until = Some(next_until);
                if self.reached_events_seen_limit(events_seen) {
                    break;
                }
            }
            if self.reached_events_seen_limit(events_seen) {
                break;
            }
        }

        Ok(CrawlReport {
            root: state.current_root,
            authors_considered: authors.len(),
            authors_processed,
            events_seen,
            events_selected: state.events_selected,
            live_bytes_selected: state.live_bytes_selected,
            applied_events,
        })
    }
940
941    fn batch_filter(&self, pubkeys: Vec<PublicKey>) -> Filter {
942        let mut filter = Filter::new().authors(pubkeys);
943        if let Some(kinds) = &self.config.kinds {
944            filter = filter.kinds(kinds.iter().copied().map(Kind::from));
945        }
946        let mut relay_limit = self
947            .config
948            .author_batch_size
949            .saturating_mul(self.config.per_author_event_limit);
950        let relay_page_budget = self
951            .config
952            .relay_page_size
953            .saturating_mul(self.config.max_relay_pages.max(1));
954        if relay_page_budget > 0 {
955            relay_limit = relay_limit.min(relay_page_budget);
956        }
957        if relay_limit > 0 {
958            filter = filter.limit(relay_limit);
959        }
960        filter
961    }
962
963    fn full_history_negentropy_filter(&self, pubkeys: Vec<PublicKey>) -> Filter {
964        let mut filter = Filter::new().authors(pubkeys);
965        if let Some(kinds) = &self.config.kinds {
966            filter = filter.kinds(kinds.iter().copied().map(Kind::from));
967        }
968        filter
969    }
970
971    fn global_recent_filter(&self, until: Option<u64>) -> Filter {
972        let mut filter = Filter::new().limit(self.config.relay_page_size);
973        if let Some(kinds) = &self.config.kinds {
974            filter = filter.kinds(kinds.iter().copied().map(Kind::from));
975        }
976        if let Some(until) = until {
977            filter = filter.until(Timestamp::from_secs(until));
978        }
979        filter
980    }
981
982    fn reached_events_seen_limit(&self, events_seen: usize) -> bool {
983        self.config
984            .max_events_seen
985            .is_some_and(|limit| events_seen >= limit)
986    }
987
988    fn is_valid_stored_event(&self, event: &StoredNostrEvent) -> bool {
989        self.event_store.encode_event(event).is_ok()
990    }
991
992    fn encoded_events_size(&self, events: &[StoredNostrEvent]) -> Result<u64> {
993        let mut total = 0u64;
994        for event in events {
995            total = total.saturating_add(self.event_store.encode_event(event)?.len() as u64);
996        }
997        Ok(total)
998    }
999
1000    fn local_items_for_batch<'a, I>(
1001        &self,
1002        known_events: I,
1003        author_batch: &[String],
1004    ) -> Vec<(EventId, Timestamp)>
1005    where
1006        I: Iterator<Item = &'a StoredNostrEvent>,
1007    {
1008        let authors = author_batch
1009            .iter()
1010            .map(String::as_str)
1011            .collect::<BTreeSet<_>>();
1012
1013        known_events
1014            .filter(|event| {
1015                authors.contains(event.pubkey.as_str()) && self.kind_allowed(event.kind)
1016            })
1017            .filter_map(|event| {
1018                let event_id = EventId::parse(&event.id).ok()?;
1019                Some((event_id, Timestamp::from_secs(event.created_at)))
1020            })
1021            .collect()
1022    }
1023
1024    fn local_items_for_batch_by_kind<'a, I>(
1025        &self,
1026        known_events: I,
1027        author_batch: &[String],
1028        kind: u32,
1029    ) -> Vec<(EventId, Timestamp)>
1030    where
1031        I: Iterator<Item = &'a StoredNostrEvent>,
1032    {
1033        let authors = author_batch
1034            .iter()
1035            .map(String::as_str)
1036            .collect::<BTreeSet<_>>();
1037
1038        known_events
1039            .filter(|event| {
1040                authors.contains(event.pubkey.as_str())
1041                    && event.kind == kind
1042                    && self.kind_allowed(event.kind)
1043            })
1044            .filter_map(|event| {
1045                let event_id = EventId::parse(&event.id).ok()?;
1046                Some((event_id, Timestamp::from_secs(event.created_at)))
1047            })
1048            .collect()
1049    }
1050
1051    async fn load_retained_events(
1052        &self,
1053        root: Option<&Cid>,
1054        author: &str,
1055    ) -> Result<Vec<StoredNostrEvent>> {
1056        self.event_store
1057            .list_by_author_lossy(root, author, ListEventsOptions::default())
1058            .await
1059            .map_err(Into::into)
1060    }
1061
    /// Merge newly fetched events into each author's retained set and re-run selection.
    ///
    /// For every author in `incoming_by_author`, the author's retained events and the
    /// incoming events are merged by id (incoming wins on an id collision). The result is
    /// then trimmed with `select_author_events`.
    ///
    /// The running `events_selected` and `live_bytes_selected` totals in `state` are
    /// adjusted by the difference between the old and new selection.
    /// `authors_processed` is incremented the first time an author goes from no retained
    /// events to some.
    ///
    /// Returns the selected events whose ids were not yet in `known_ids`; this function
    /// inserts those ids into `known_ids`. The callers in this file build the returned
    /// events into the event store.
    ///
    /// # Errors
    /// Propagates encoding errors from size accounting and selection.
    ///
    /// NOTE(review): `retained` is drained before selection runs, so an error from
    /// `select_author_events` leaves this author's retained set empty in `state`.
    fn merge_author_events_into_state(
        &self,
        state: &mut GlobalRecentState,
        incoming_by_author: BTreeMap<String, Vec<StoredNostrEvent>>,
        known_ids: &mut BTreeSet<String>,
        authors_processed: &mut usize,
    ) -> Result<Vec<StoredNostrEvent>> {
        let mut pending_apply = Vec::new();

        for (author, incoming) in incoming_by_author {
            let retained = state.retained_by_author.entry(author).or_default();
            let was_empty = retained.is_empty();
            // Snapshot the old contribution so the running totals can be adjusted below.
            let old_len = retained.len();
            let old_live_bytes = self.encoded_events_size(retained)?;

            // Union of retained and incoming events keyed by id; incoming overwrites on
            // collision.
            let mut merged = BTreeMap::<String, StoredNostrEvent>::new();
            for existing in retained.drain(..) {
                merged.insert(existing.id.clone(), existing);
            }
            for event in incoming {
                merged.insert(event.id.clone(), event);
            }

            let selected = self.select_author_events(merged.into_values().collect())?;
            let selected_live_bytes = self.encoded_events_size(&selected)?;

            // Only selected events whose id is new to `known_ids` are queued for the store
            // build. Ids are never removed from `known_ids`, so an event that selection
            // evicts and later re-selects is not queued again.
            for selected_event in &selected {
                if known_ids.insert(selected_event.id.clone()) {
                    pending_apply.push(selected_event.clone());
                }
            }

            // Swap this author's old contribution to the totals for the new one.
            state.events_selected = state
                .events_selected
                .saturating_sub(old_len)
                .saturating_add(selected.len());
            state.live_bytes_selected = state
                .live_bytes_selected
                .saturating_sub(old_live_bytes)
                .saturating_add(selected_live_bytes);
            if was_empty && !selected.is_empty() {
                *authors_processed = authors_processed.saturating_add(1);
            }
            *retained = selected;
        }

        Ok(pending_apply)
    }
1110
    /// Fetch kind-0 metadata for every author in `authors` that has no metadata event
    /// retained in `state`, and merge it into `state`.
    ///
    /// This does nothing when kind 0 is excluded by `kind_allowed` or when every author
    /// already has a retained metadata event. Otherwise the authors are processed in
    /// chunks of `author_batch_size` (at least 1) via `crawl_profile_batch`. Each chunk's
    /// events are merged with `merge_author_events_into_state`, and newly selected events
    /// are built into a new `state.current_root` and appended to `applied_events`.
    ///
    /// `on_progress` is called after each chunk with a snapshot report whose
    /// `applied_events` is empty. Processing stops after the chunk that reaches
    /// `max_events_seen`.
    ///
    /// # Errors
    /// Propagates errors from `crawl_profile_batch`, merging, and event-store builds.
    // The crawl's mutable bookkeeping is threaded through as separate `&mut` borrows,
    // which exceeds clippy's argument-count threshold.
    #[allow(clippy::too_many_arguments)]
    async fn hydrate_global_recent_profiles(
        &self,
        client: &Client,
        authors: &[String],
        state: &mut GlobalRecentState,
        known_ids: &mut BTreeSet<String>,
        authors_processed: &mut usize,
        applied_events: &mut Vec<StoredNostrEvent>,
        relay_negentropy_support: &mut BTreeMap<String, bool>,
        failed_relays: &mut BTreeSet<String>,
        events_seen: &mut usize,
        on_progress: &mut impl FnMut(&CrawlReport),
    ) -> Result<()> {
        if !self.kind_allowed(METADATA_KIND) {
            return Ok(());
        }

        // Authors with no retained events at all, or with retained events but no kind-0.
        let authors_missing_profiles = authors
            .iter()
            .filter(|author| {
                !state
                    .retained_by_author
                    .get(*author)
                    .is_some_and(|events| events.iter().any(|event| event.kind == METADATA_KIND))
            })
            .cloned()
            .collect::<Vec<_>>();
        if authors_missing_profiles.is_empty() {
            return Ok(());
        }

        for author_batch in authors_missing_profiles.chunks(self.config.author_batch_size.max(1)) {
            // All currently retained events are offered; `crawl_profile_batch` narrows them
            // to this batch's metadata events for negentropy.
            let batch = self
                .crawl_profile_batch(
                    client,
                    author_batch,
                    state
                        .retained_by_author
                        .values()
                        .flat_map(|events| events.iter()),
                    relay_negentropy_support,
                    failed_relays,
                )
                .await?;

            *events_seen = events_seen.saturating_add(batch.events_seen);
            let pending_apply = self.merge_author_events_into_state(
                state,
                batch.events_by_author,
                known_ids,
                authors_processed,
            )?;
            if !pending_apply.is_empty() {
                applied_events.extend(pending_apply.clone());
                state.current_root = self
                    .event_store
                    .build(state.current_root.as_ref(), pending_apply)
                    .await?;
            }

            on_progress(&CrawlReport {
                root: state.current_root.clone(),
                authors_considered: authors.len(),
                authors_processed: *authors_processed,
                events_seen: *events_seen,
                events_selected: state.events_selected,
                live_bytes_selected: state.live_bytes_selected,
                applied_events: Vec::new(),
            });

            if self.reached_events_seen_limit(*events_seen) {
                break;
            }
        }

        Ok(())
    }
1189
    /// Fetch kind-0 metadata events for one batch of authors from every relay not yet
    /// marked as failed.
    ///
    /// Authors that do not parse as a `PublicKey` are dropped from the request. If none
    /// parse, an empty report is returned without contacting any relay.
    ///
    /// Each relay is queried through `fetch_events_from_relay` with a metadata-only
    /// filter whose limit is twice the batch size (at least 1). The batch's known
    /// metadata events are passed as negentropy local items. A relay that errors is
    /// logged and added to `failed_relays`; otherwise its reported negentropy support is
    /// recorded in `relay_negentropy_support`.
    ///
    /// The returned events are grouped by author pubkey string. They can include the same
    /// event once per relay that served it; `merge_author_events_into_state` de-duplicates
    /// by id.
    ///
    /// # Errors
    /// Relay failures are absorbed as described above; this function itself does not
    /// currently return an error.
    async fn crawl_profile_batch<'a, I>(
        &self,
        client: &Client,
        author_batch: &[String],
        known_events: I,
        relay_negentropy_support: &mut BTreeMap<String, bool>,
        failed_relays: &mut BTreeSet<String>,
    ) -> Result<ProfileBatchReport>
    where
        I: Iterator<Item = &'a StoredNostrEvent>,
    {
        let pubkeys: Vec<PublicKey> = author_batch
            .iter()
            .filter_map(|author| author.parse::<PublicKey>().ok())
            .collect();
        if pubkeys.is_empty() {
            return Ok(ProfileBatchReport::default());
        }

        // Roughly two metadata events per author (at least 1) when plain REQs are used.
        let filter = Filter::new()
            .authors(pubkeys)
            .kind(Kind::Metadata)
            .limit(author_batch.len().saturating_mul(2).max(1));
        let local_items =
            self.local_items_for_batch_by_kind(known_events, author_batch, METADATA_KIND);
        let mut fetched_by_author = BTreeMap::<String, Vec<StoredNostrEvent>>::new();
        let mut events_seen = 0usize;

        for relay in &self.config.relays {
            if failed_relays.contains(relay) {
                continue;
            }
            let relay_support = relay_negentropy_support.get(relay).copied();
            let fetched_from_relay = match self
                .fetch_events_from_relay(
                    client,
                    relay,
                    filter.clone(),
                    local_items.clone(),
                    relay_support,
                )
                .await
            {
                Ok(result) => result,
                Err(err) => {
                    eprintln!("Skipping relay {relay}: {err}");
                    failed_relays.insert(relay.clone());
                    continue;
                }
            };
            relay_negentropy_support.insert(relay.clone(), fetched_from_relay.supports_negentropy);
            events_seen = events_seen.saturating_add(fetched_from_relay.events_seen);
            // Events fetched by id after negentropy are not filtered by kind, so the
            // kind is re-checked here.
            for event in fetched_from_relay.events {
                if event.kind == METADATA_KIND && self.kind_allowed(event.kind) {
                    fetched_by_author
                        .entry(event.pubkey.clone())
                        .or_default()
                        .push(event);
                }
            }
        }

        Ok(ProfileBatchReport {
            events_seen,
            events_by_author: fetched_by_author,
        })
    }
1257
1258    async fn fetch_events_from_relay(
1259        &self,
1260        client: &Client,
1261        relay: &str,
1262        filter: Filter,
1263        local_items: Vec<(EventId, Timestamp)>,
1264        supports_negentropy: Option<bool>,
1265    ) -> Result<RelayFetchResult> {
1266        if supports_negentropy == Some(false) {
1267            if self.config.require_negentropy {
1268                return Ok(RelayFetchResult {
1269                    events_seen: 0,
1270                    events: Vec::new(),
1271                    supports_negentropy: false,
1272                });
1273            }
1274            return self
1275                .fetch_full_filter(client, relay, filter)
1276                .await
1277                .map(|events| RelayFetchResult {
1278                    events_seen: events.len(),
1279                    events,
1280                    supports_negentropy: false,
1281                });
1282        }
1283
1284        match self
1285            .reconcile_missing_ids(client, relay, filter.clone(), local_items)
1286            .await
1287        {
1288            Ok(Some(missing)) => self.fetch_missing_ids(client, relay, missing).await.map(
1289                |RelayFetchResult {
1290                     events_seen,
1291                     events,
1292                     ..
1293                 }| RelayFetchResult {
1294                    events_seen,
1295                    events,
1296                    supports_negentropy: true,
1297                },
1298            ),
1299            Ok(None) | Err(_) => {
1300                if self.config.require_negentropy {
1301                    Ok(RelayFetchResult {
1302                        events_seen: 0,
1303                        events: Vec::new(),
1304                        supports_negentropy: false,
1305                    })
1306                } else {
1307                    self.fetch_full_filter(client, relay, filter)
1308                        .await
1309                        .map(|events| RelayFetchResult {
1310                            events_seen: events.len(),
1311                            events,
1312                            supports_negentropy: false,
1313                        })
1314                }
1315            }
1316        }
1317    }
1318
1319    async fn fetch_missing_ids(
1320        &self,
1321        client: &Client,
1322        relay: &str,
1323        missing_ids: Vec<EventId>,
1324    ) -> Result<RelayFetchResult> {
1325        if missing_ids.is_empty() {
1326            return Ok(RelayFetchResult {
1327                events_seen: 0,
1328                events: Vec::new(),
1329                supports_negentropy: true,
1330            });
1331        }
1332
1333        let mut out = BTreeMap::<String, StoredNostrEvent>::new();
1334        let mut events_seen = 0usize;
1335        let filters = missing_ids
1336            .chunks(NEGENTROPY_FETCH_CHUNK_SIZE)
1337            .map(|chunk| Filter::new().ids(chunk.iter().cloned()))
1338            .collect::<Vec<_>>();
1339        let fetches = filters.into_iter().map(|filter| async move {
1340            client
1341                .fetch_events_from([relay], filter, self.config.fetch_timeout)
1342                .await
1343                .map(|events| events.to_vec())
1344                .map_err(|err| CrawlError::Nostr(err.to_string()))
1345        });
1346        let mut fetches =
1347            stream::iter(fetches).buffer_unordered(NEGENTROPY_FETCH_CHUNK_CONCURRENCY);
1348        while let Some(result) = fetches.next().await {
1349            let events = result?;
1350            events_seen = events_seen.saturating_add(events.len());
1351            for event in events {
1352                if event.kind.is_ephemeral() {
1353                    continue;
1354                }
1355                let stored = stored_event_from_nostr(&event);
1356                out.insert(stored.id.clone(), stored);
1357            }
1358        }
1359        Ok(RelayFetchResult {
1360            events_seen,
1361            events: out.into_values().collect(),
1362            supports_negentropy: true,
1363        })
1364    }
1365
1366    async fn fetch_full_filter(
1367        &self,
1368        client: &Client,
1369        relay: &str,
1370        filter: Filter,
1371    ) -> Result<Vec<StoredNostrEvent>> {
1372        let mut out = Vec::new();
1373        let events = client
1374            .fetch_events_from([relay], filter, self.config.fetch_timeout)
1375            .await
1376            .map(|events| events.to_vec())
1377            .map_err(|err| CrawlError::Nostr(err.to_string()))?;
1378
1379        for event in events {
1380            if event.kind.is_ephemeral() {
1381                continue;
1382            }
1383            out.push(stored_event_from_nostr(&event));
1384        }
1385
1386        Ok(out)
1387    }
1388
    /// Fetch the full history of a batch of authors from one relay.
    ///
    /// Unless the relay is known not to support negentropy, a dry-run reconciliation over
    /// `full_history_negentropy_filter` (authors plus configured kinds, no limit) is tried
    /// first, and the missing ids are downloaded.
    ///
    /// If that download fails and paging is permitted (`require_negentropy` unset and
    /// `max_relay_pages > 0`), per-author paging is used instead. If paging also fails,
    /// the original download error is returned.
    ///
    /// If reconciliation yields nothing or errors, an empty result is returned when
    /// `require_negentropy` is set. Otherwise, when `max_relay_pages > 0`, the function
    /// falls back to per-author paging.
    ///
    /// NOTE(review): a successful paging fallback after a failed id download reports
    /// `supports_negentropy: false`, even though reconciliation itself succeeded. If
    /// callers record this flag, confirm they should stop trying negentropy on this
    /// relay in that case.
    ///
    /// # Errors
    /// Propagates the id-download error (see above) or the paging error.
    async fn fetch_full_history_from_relay(
        &self,
        client: &Client,
        relay: &str,
        pubkeys: &[PublicKey],
        local_items: Vec<(EventId, Timestamp)>,
        supports_negentropy: Option<bool>,
    ) -> Result<RelayFetchResult> {
        if supports_negentropy != Some(false) {
            let filter = self.full_history_negentropy_filter(pubkeys.to_vec());
            match self
                .reconcile_missing_ids(client, relay, filter, local_items)
                .await
            {
                Ok(Some(missing)) => {
                    return match self.fetch_missing_ids(client, relay, missing).await {
                        Ok(RelayFetchResult {
                            events_seen,
                            events,
                            ..
                        }) => Ok(RelayFetchResult {
                            events_seen,
                            events,
                            supports_negentropy: true,
                        }),
                        // Id download failed: try paging, but surface the original
                        // error if paging fails too.
                        Err(err)
                            if !self.config.require_negentropy
                                && self.config.max_relay_pages > 0 =>
                        {
                            self.fetch_full_history_by_paging_from_relay(client, relay, pubkeys)
                                .await
                                .map_err(|_| err)
                        }
                        Err(err) => Err(err),
                    };
                }
                Ok(None) | Err(_) if self.config.require_negentropy => {
                    return Ok(RelayFetchResult {
                        events_seen: 0,
                        events: Vec::new(),
                        supports_negentropy: false,
                    });
                }
                // Reconciliation unavailable: fall through to the paging path below.
                Ok(None) | Err(_) => {}
            }
        }

        if self.config.require_negentropy || self.config.max_relay_pages == 0 {
            return Ok(RelayFetchResult {
                events_seen: 0,
                events: Vec::new(),
                supports_negentropy: false,
            });
        }
        self.fetch_full_history_by_paging_from_relay(client, relay, pubkeys)
            .await
    }
1446
1447    async fn reconcile_missing_ids(
1448        &self,
1449        client: &Client,
1450        relay: &str,
1451        filter: Filter,
1452        local_items: Vec<(EventId, Timestamp)>,
1453    ) -> Result<Option<Vec<EventId>>> {
1454        let initial_timeout = self.config.fetch_timeout.min(NEGENTROPY_INITIAL_TIMEOUT);
1455        let opts = SyncOptions::default()
1456            .initial_timeout(initial_timeout)
1457            .dry_run();
1458        let targets = [(relay.to_owned(), (filter, local_items))];
1459        let sync = client.pool().sync_targeted(targets, &opts);
1460        let output = match tokio::time::timeout(self.config.fetch_timeout, sync).await {
1461            Ok(Ok(output)) => output,
1462            Ok(Err(_)) | Err(_) => return Ok(None),
1463        };
1464
1465        if output.success.is_empty() {
1466            return Ok(None);
1467        }
1468
1469        Ok(Some(output.remote.iter().cloned().collect()))
1470    }
1471
1472    async fn fetch_full_history_by_paging_from_relay(
1473        &self,
1474        client: &Client,
1475        relay: &str,
1476        pubkeys: &[PublicKey],
1477    ) -> Result<RelayFetchResult> {
1478        let mut out = BTreeMap::<String, StoredNostrEvent>::new();
1479        let mut events_seen = 0usize;
1480        let concurrency = FULL_HISTORY_PAGING_CONCURRENCY_PER_RELAY
1481            .min(pubkeys.len().max(1))
1482            .max(1);
1483        let fetches = pubkeys.iter().copied().map(|pubkey| async move {
1484            self.fetch_full_author_history_by_paging_from_relay(client, relay, pubkey)
1485                .await
1486        });
1487        let mut fetches = stream::iter(fetches).buffer_unordered(concurrency);
1488
1489        while let Some(result) = fetches.next().await {
1490            let fetched = result?;
1491            events_seen = events_seen.saturating_add(fetched.events_seen);
1492            for event in fetched.events {
1493                out.insert(event.id.clone(), event);
1494            }
1495            if self.reached_events_seen_limit(events_seen) {
1496                break;
1497            }
1498        }
1499
1500        Ok(RelayFetchResult {
1501            events_seen,
1502            events: out.into_values().collect(),
1503            supports_negentropy: false,
1504        })
1505    }
1506
1507    async fn fetch_full_author_history_by_paging_from_relay(
1508        &self,
1509        client: &Client,
1510        relay: &str,
1511        pubkey: PublicKey,
1512    ) -> Result<RelayFetchResult> {
1513        let mut out = BTreeMap::<String, StoredNostrEvent>::new();
1514        let mut events_seen = 0usize;
1515        let mut until = None;
1516
1517        for _ in 0..self.config.max_relay_pages {
1518            let remaining = self.config.per_author_event_limit.saturating_sub(out.len());
1519            if remaining == 0 {
1520                break;
1521            }
1522            let mut filter = Filter::new()
1523                .author(pubkey)
1524                .limit(self.config.relay_page_size.min(remaining));
1525            if let Some(kinds) = &self.config.kinds {
1526                filter = filter.kinds(kinds.iter().copied().map(Kind::from));
1527            }
1528            if let Some(until) = until {
1529                filter = filter.until(Timestamp::from_secs(until));
1530            }
1531
1532            let events = client
1533                .fetch_events_from([relay], filter, self.config.fetch_timeout)
1534                .await
1535                .map(|events| events.to_vec())
1536                .map_err(|err| CrawlError::Nostr(err.to_string()))?;
1537            let fetched_count = events.len();
1538            events_seen = events_seen.saturating_add(fetched_count);
1539            if fetched_count == 0 {
1540                break;
1541            }
1542
1543            let mut min_created_at = u64::MAX;
1544            for event in events {
1545                min_created_at = min_created_at.min(event.created_at.as_secs());
1546                if event.kind.is_ephemeral() {
1547                    continue;
1548                }
1549                let stored = stored_event_from_nostr(&event);
1550                out.insert(stored.id.clone(), stored);
1551                if out.len() >= self.config.per_author_event_limit {
1552                    break;
1553                }
1554            }
1555
1556            if out.len() >= self.config.per_author_event_limit {
1557                break;
1558            }
1559            if min_created_at == u64::MAX || min_created_at == 0 {
1560                break;
1561            }
1562            let next_until = min_created_at.saturating_sub(1);
1563            if until == Some(next_until) {
1564                break;
1565            }
1566            until = Some(next_until);
1567            if self.reached_events_seen_limit(events_seen) {
1568                break;
1569            }
1570        }
1571
1572        Ok(RelayFetchResult {
1573            events_seen,
1574            events: out.into_values().collect(),
1575            supports_negentropy: false,
1576        })
1577    }
1578
1579    fn select_author_events(&self, events: Vec<StoredNostrEvent>) -> Result<Vec<StoredNostrEvent>> {
1580        self.select_author_events_with_limits(
1581            events,
1582            self.config.per_author_event_limit,
1583            self.config.per_author_live_bytes,
1584        )
1585    }
1586
1587    fn select_author_events_with_limits(
1588        &self,
1589        mut events: Vec<StoredNostrEvent>,
1590        event_limit: usize,
1591        live_byte_limit: Option<u64>,
1592    ) -> Result<Vec<StoredNostrEvent>> {
1593        let sticky_events = self.select_sticky_author_events(&events);
1594        let sticky_ids = sticky_events
1595            .iter()
1596            .map(|event| event.id.clone())
1597            .collect::<HashSet<_>>();
1598        events.retain(|event| !sticky_ids.contains(&event.id));
1599        events.sort_by(|left, right| {
1600            self.policy
1601                .priority(right)
1602                .cmp(&self.policy.priority(left))
1603                .then_with(|| right.created_at.cmp(&left.created_at))
1604                .then_with(|| left.id.cmp(&right.id))
1605        });
1606
1607        if let Some(max_live_bytes) = live_byte_limit {
1608            let mut selected = sticky_events.clone();
1609            let mut live_bytes_selected = self.encoded_events_size(&selected)?;
1610            if live_bytes_selected > max_live_bytes {
1611                return Ok(selected);
1612            }
1613            for event in events {
1614                let encoded_len = self.event_store.encode_event(&event)?.len() as u64;
1615                if live_bytes_selected.saturating_add(encoded_len) > max_live_bytes {
1616                    continue;
1617                }
1618                live_bytes_selected = live_bytes_selected.saturating_add(encoded_len);
1619                selected.push(event);
1620                if selected.len().saturating_sub(sticky_events.len()) >= event_limit {
1621                    break;
1622                }
1623            }
1624            return Ok(selected);
1625        }
1626
1627        let mut selected = sticky_events;
1628        selected.extend(events.into_iter().take(event_limit));
1629        Ok(selected)
1630    }
1631
1632    fn select_sticky_author_events(&self, events: &[StoredNostrEvent]) -> Vec<StoredNostrEvent> {
1633        let latest_metadata = events
1634            .iter()
1635            .filter(|event| event.kind == METADATA_KIND)
1636            .cloned()
1637            .max_by(|left, right| {
1638                left.created_at
1639                    .cmp(&right.created_at)
1640                    .then_with(|| right.id.cmp(&left.id))
1641            });
1642
1643        latest_metadata.into_iter().collect()
1644    }
1645
1646    fn apply_live_byte_cap_from(
1647        &self,
1648        mut events: Vec<StoredNostrEvent>,
1649        live_bytes_selected_so_far: u64,
1650    ) -> Result<(Vec<StoredNostrEvent>, u64)> {
1651        events.sort_by(|left, right| {
1652            self.policy
1653                .priority(right)
1654                .cmp(&self.policy.priority(left))
1655                .then_with(|| right.created_at.cmp(&left.created_at))
1656                .then_with(|| left.id.cmp(&right.id))
1657        });
1658
1659        let Some(max_live_bytes) = self.config.max_live_bytes else {
1660            let live_bytes_selected =
1661                events
1662                    .iter()
1663                    .try_fold(live_bytes_selected_so_far, |total, event| {
1664                        let encoded = self.event_store.encode_event(event)?;
1665                        Ok::<u64, NostrEventStoreError>(total.saturating_add(encoded.len() as u64))
1666                    })?;
1667            return Ok((events, live_bytes_selected));
1668        };
1669
1670        let mut selected = Vec::new();
1671        let mut live_bytes_selected = live_bytes_selected_so_far;
1672        for event in events {
1673            let encoded_len = self.event_store.encode_event(&event)?.len() as u64;
1674            if live_bytes_selected.saturating_add(encoded_len) > max_live_bytes {
1675                continue;
1676            }
1677            live_bytes_selected = live_bytes_selected.saturating_add(encoded_len);
1678            selected.push(event);
1679        }
1680
1681        Ok((selected, live_bytes_selected))
1682    }
1683
1684    fn kind_allowed(&self, kind: u32) -> bool {
1685        self.config.kinds.as_ref().is_none_or(|allowed| {
1686            allowed
1687                .iter()
1688                .any(|candidate| u32::from(*candidate) == kind)
1689        })
1690    }
1691}
1692
1693fn stored_event_from_nostr(event: &nostr_sdk::Event) -> StoredNostrEvent {
1694    StoredNostrEvent {
1695        id: event.id.to_hex(),
1696        pubkey: event.pubkey.to_hex(),
1697        created_at: event.created_at.as_secs(),
1698        kind: event.kind.as_u16() as u32,
1699        tags: event
1700            .tags
1701            .iter()
1702            .map(|tag| tag.as_slice().to_vec())
1703            .collect(),
1704        content: event.content.clone(),
1705        sig: event.sig.to_string(),
1706    }
1707}
1708
/// Returns `true` when `value` is exactly 64 lowercase hexadecimal characters.
///
/// Uppercase hex digits, other characters, and any other length are rejected.
/// Because the length check counts bytes, non-ASCII input always fails the
/// per-byte check.
fn is_valid_hex_pubkey(value: &str) -> bool {
    let bytes = value.as_bytes();
    bytes.len() == 64
        && bytes
            .iter()
            .all(|&b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
1715
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use hashtree_core::MemoryStore;
    use nostr_social_graph::{NostrEvent, SocialGraphBackend as NostrSocialGraphBackend};

    use super::{is_valid_hex_pubkey, CrawlConfig, NostrBridge, StoredNostrEvent};

    /// Minimal social-graph backend used by the author-collection tests.
    ///
    /// The root is `"0" * 64`. The root follows one valid pubkey (`"1" * 64`)
    /// and three malformed ones: non-hex, too short, and uppercase. Every
    /// other query returns an empty or neutral answer.
    #[derive(Default)]
    struct FakeGraphBackend;

    impl NostrSocialGraphBackend for FakeGraphBackend {
        type Error = std::io::Error;

        fn get_root(&self) -> std::result::Result<String, Self::Error> {
            Ok("0".repeat(64))
        }

        fn set_root(&mut self, _root: &str) -> std::result::Result<(), Self::Error> {
            Ok(())
        }

        fn handle_event(
            &mut self,
            _event: &NostrEvent,
            _allow_unknown_authors: bool,
            _overmute_threshold: f64,
        ) -> std::result::Result<(), Self::Error> {
            Ok(())
        }

        fn get_follow_distance(&self, _user: &str) -> std::result::Result<u32, Self::Error> {
            Ok(0)
        }

        fn is_following(
            &self,
            _follower: &str,
            _followed_user: &str,
        ) -> std::result::Result<bool, Self::Error> {
            Ok(false)
        }

        fn get_followed_by_user(
            &self,
            user: &str,
        ) -> std::result::Result<Vec<String>, Self::Error> {
            if user == "0".repeat(64) {
                return Ok(vec![
                    "1".repeat(64),
                    "NOT-HEX".to_string(),
                    "a".repeat(63),
                    "A".repeat(64),
                ]);
            }
            Ok(Vec::new())
        }

        fn get_followers_by_user(
            &self,
            _user: &str,
        ) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_muted_by_user(&self, _user: &str) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_user_muted_by(&self, _user: &str) -> std::result::Result<Vec<String>, Self::Error> {
            Ok(Vec::new())
        }

        fn get_follow_list_created_at(
            &self,
            _user: &str,
        ) -> std::result::Result<Option<u64>, Self::Error> {
            Ok(None)
        }

        fn get_mute_list_created_at(
            &self,
            _user: &str,
        ) -> std::result::Result<Option<u64>, Self::Error> {
            Ok(None)
        }

        fn is_overmuted(
            &self,
            _user: &str,
            _threshold: f64,
        ) -> std::result::Result<bool, Self::Error> {
            Ok(false)
        }
    }

    /// Builds a well-formed stored event with the given id, kind and timestamp.
    fn stored_event(id: &str, kind: u32, created_at: u64) -> StoredNostrEvent {
        StoredNostrEvent {
            id: id.to_string(),
            pubkey: "1".repeat(64),
            created_at,
            kind,
            tags: Vec::new(),
            content: String::new(),
            sig: "f".repeat(128),
        }
    }

    #[test]
    fn rejects_invalid_stored_event_shape() {
        let bridge = NostrBridge::new(Arc::new(MemoryStore::new()), CrawlConfig::default());
        let invalid = StoredNostrEvent {
            id: "f".repeat(64),
            pubkey: "not-hex".to_string(),
            created_at: 1,
            kind: 1,
            tags: Vec::new(),
            content: String::new(),
            sig: "f".repeat(128),
        };

        assert!(!bridge.is_valid_stored_event(&invalid));
    }

    #[test]
    fn collect_authors_skips_invalid_graph_pubkeys() {
        let bridge = NostrBridge::new(Arc::new(MemoryStore::new()), CrawlConfig::default());
        let authors = bridge
            .collect_authors(&FakeGraphBackend)
            .expect("collect authors");

        assert_eq!(authors, vec!["0".repeat(64), "1".repeat(64)]);
    }

    #[test]
    fn collect_authors_prefers_allowlist_and_applies_limits() {
        let bridge = NostrBridge::new(
            Arc::new(MemoryStore::new()),
            CrawlConfig {
                author_allowlist: Some(vec![
                    "1".repeat(64),
                    "NOT-HEX".to_string(),
                    "0".repeat(64),
                    "1".repeat(64),
                ]),
                max_authors: Some(1),
                ..CrawlConfig::default()
            },
        );
        let authors = bridge
            .collect_authors(&FakeGraphBackend)
            .expect("collect authors");

        assert_eq!(authors, vec!["1".repeat(64)]);
    }

    #[test]
    fn hex_pubkey_validation_requires_64_lowercase_hex_chars() {
        assert!(is_valid_hex_pubkey(&"0123456789abcdef".repeat(4)));
        assert!(!is_valid_hex_pubkey(&"A".repeat(64)));
        assert!(!is_valid_hex_pubkey(&"a".repeat(63)));
        assert!(!is_valid_hex_pubkey(&"a".repeat(65)));
        assert!(!is_valid_hex_pubkey(&"g".repeat(64)));
        assert!(!is_valid_hex_pubkey(""));
    }

    #[test]
    fn kind_allowed_respects_configured_kinds() {
        let unrestricted = NostrBridge::new(
            Arc::new(MemoryStore::new()),
            CrawlConfig {
                kinds: None,
                ..CrawlConfig::default()
            },
        );
        assert!(unrestricted.kind_allowed(7));

        let restricted = NostrBridge::new(
            Arc::new(MemoryStore::new()),
            CrawlConfig {
                kinds: Some(vec![0, 1]),
                ..CrawlConfig::default()
            },
        );
        assert!(restricted.kind_allowed(0));
        assert!(restricted.kind_allowed(1));
        assert!(!restricted.kind_allowed(7));
        assert!(!restricted.kind_allowed(65_536));
    }

    #[test]
    fn sticky_events_keep_newest_metadata_with_lowest_id_tiebreak() {
        let bridge = NostrBridge::new(Arc::new(MemoryStore::new()), CrawlConfig::default());
        let events = vec![
            stored_event(&"c".repeat(64), 0, 5),
            stored_event(&"b".repeat(64), 0, 10),
            stored_event(&"a".repeat(64), 0, 10),
            stored_event(&"d".repeat(64), 1, 20),
        ];

        let sticky = bridge.select_sticky_author_events(&events);
        assert_eq!(sticky.len(), 1);
        assert_eq!(sticky[0].id, "a".repeat(64));

        let no_metadata = vec![stored_event(&"d".repeat(64), 1, 20)];
        assert!(bridge.select_sticky_author_events(&no_metadata).is_empty());
    }
}