// hashtree_cli/socialgraph/crawler.rs
1use futures::stream::{self, StreamExt};
2use std::collections::HashSet;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6
7use nostr::Timestamp;
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use super::{
12    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
13    LocalListFileState, SocialGraphBackend,
14};
15
// Default number of authors packed into a single relay REQ filter batch.
const DEFAULT_AUTHOR_BATCH_SIZE: usize = 500;
// Default number of author batches fetched concurrently.
const DEFAULT_CONCURRENT_BATCHES: usize = 4;
// How long to wait for a batch of graph events before giving up.
const GRAPH_FETCH_TIMEOUT: Duration = Duration::from_secs(15);
// Timeout for the initial relay connection attempt.
const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
// Maximum accepted relay event size for social-graph subscriptions.
pub(crate) const SOCIALGRAPH_RELAY_EVENT_MAX_SIZE: u32 = 512 * 1024;
// Delay before the crawler starts after spawn; shortened under test.
#[cfg(not(test))]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_secs(5);
#[cfg(test)]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_millis(50);
// Poll period for local list-file changes; shortened under test.
#[cfg(not(test))]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_secs(5);
#[cfg(test)]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_millis(100);
29
/// Join handles and the shared shutdown switch for the background
/// social-graph tasks spawned by `spawn_social_graph_tasks`.
pub struct SocialGraphTaskHandles {
    // Send `true` to request both background tasks stop.
    pub shutdown_tx: watch::Sender<bool>,
    // Handle of the relay crawler task.
    pub crawl_handle: JoinHandle<()>,
    // Handle of the local list-file polling task.
    pub local_list_handle: JoinHandle<()>,
}
35
/// Crawls nostr relays for contact/mute lists to build a social graph
/// rooted at `keys`, out to `max_depth` follow hops.
pub struct SocialGraphCrawler {
    // Primary backend holding the trusted social graph.
    graph_store: Arc<dyn SocialGraphBackend>,
    // Optional backend receiving events from untrusted authors.
    spambox: Option<Arc<dyn SocialGraphBackend>>,
    // Root identity the graph is centered on.
    keys: nostr::Keys,
    // Relay URLs to connect to.
    relays: Vec<String>,
    // Maximum follow distance still considered part of the graph.
    max_depth: u32,
    // Authors per REQ filter batch (clamped to >= 1 by the builder).
    author_batch_size: usize,
    // Concurrent in-flight batches (clamped to >= 1 by the builder).
    concurrent_batches: usize,
    // When true, re-fetch lists even for authors already stored.
    full_recrawl: bool,
    // When set, already-known authors are refreshed with events since this time.
    known_since: Option<Timestamp>,
}
47
48impl SocialGraphCrawler {
49    pub fn new(
50        graph_store: Arc<dyn SocialGraphBackend>,
51        keys: nostr::Keys,
52        relays: Vec<String>,
53        max_depth: u32,
54    ) -> Self {
55        Self {
56            graph_store,
57            spambox: None,
58            keys,
59            relays,
60            max_depth,
61            author_batch_size: DEFAULT_AUTHOR_BATCH_SIZE,
62            concurrent_batches: DEFAULT_CONCURRENT_BATCHES,
63            full_recrawl: false,
64            known_since: None,
65        }
66    }
67
68    pub fn with_spambox(mut self, spambox: Arc<dyn SocialGraphBackend>) -> Self {
69        self.spambox = Some(spambox);
70        self
71    }
72
73    pub fn with_author_batch_size(mut self, author_batch_size: usize) -> Self {
74        self.author_batch_size = author_batch_size.max(1);
75        self
76    }
77
78    pub fn with_concurrent_batches(mut self, concurrent_batches: usize) -> Self {
79        self.concurrent_batches = concurrent_batches.max(1);
80        self
81    }
82
83    pub fn with_full_recrawl(mut self, full_recrawl: bool) -> Self {
84        self.full_recrawl = full_recrawl;
85        self
86    }
87
88    pub fn with_known_since(mut self, known_since: Option<u64>) -> Self {
89        self.known_since = known_since.map(Timestamp::from);
90        self
91    }
92
93    fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
94        if pk_bytes == &self.keys.public_key().to_bytes() {
95            return true;
96        }
97
98        super::get_follow_distance(self.graph_store.as_ref(), pk_bytes)
99            .map(|distance| distance <= self.max_depth)
100            .unwrap_or(false)
101    }
102
103    fn ingest_events_into(
104        &self,
105        graph_store: &(impl SocialGraphBackend + ?Sized),
106        events: &[nostr::Event],
107    ) {
108        if let Err(err) = super::ingest_parsed_events(graph_store, events) {
109            tracing::debug!("Failed to ingest crawler event: {}", err);
110        }
111    }
112
113    #[allow(deprecated)]
114    fn collect_missing_root_follows(
115        &self,
116        event: &nostr::Event,
117        fetched_contact_lists: &mut HashSet<[u8; 32]>,
118    ) -> Vec<[u8; 32]> {
119        if self.max_depth < 2 || event.kind != nostr::Kind::ContactList {
120            return Vec::new();
121        }
122
123        let root_pk = self.keys.public_key().to_bytes();
124        if event.pubkey.to_bytes() != root_pk {
125            return Vec::new();
126        }
127
128        let mut missing = Vec::new();
129        for tag in event.tags.iter() {
130            if let Some(nostr::TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
131                let pk_bytes = public_key.to_bytes();
132                if fetched_contact_lists.contains(&pk_bytes) {
133                    continue;
134                }
135
136                let existing_follows = super::get_follows(self.graph_store.as_ref(), &pk_bytes);
137                if !existing_follows.is_empty() {
138                    fetched_contact_lists.insert(pk_bytes);
139                    continue;
140                }
141
142                fetched_contact_lists.insert(pk_bytes);
143                missing.push(pk_bytes);
144            }
145        }
146
147        missing
148    }
149
150    fn graph_filter_for_pubkeys(
151        pubkeys: &[[u8; 32]],
152        since: Option<Timestamp>,
153    ) -> Option<nostr::Filter> {
154        let authors = pubkeys
155            .iter()
156            .filter_map(|pk_bytes| nostr::PublicKey::from_slice(pk_bytes).ok())
157            .collect::<Vec<_>>();
158        if authors.is_empty() {
159            return None;
160        }
161
162        let mut filter = nostr::Filter::new()
163            .authors(authors)
164            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList]);
165        if let Some(since) = since {
166            filter = filter.since(since);
167        }
168
169        Some(filter)
170    }
171
172    async fn fetch_graph_events_for_pubkeys(
173        &self,
174        client: &nostr_sdk::Client,
175        pubkeys: &[[u8; 32]],
176        since: Option<Timestamp>,
177    ) -> Vec<nostr::Event> {
178        let Some(filter) = Self::graph_filter_for_pubkeys(pubkeys, since) else {
179            return Vec::new();
180        };
181
182        match client
183            .get_events_of(
184                vec![filter],
185                nostr_sdk::EventSource::relays(Some(GRAPH_FETCH_TIMEOUT)),
186            )
187            .await
188        {
189            Ok(events) => events,
190            Err(err) => {
191                tracing::debug!(
192                    "Failed to fetch graph events for {} authors: {}",
193                    pubkeys.len(),
194                    err
195                );
196                Vec::new()
197            }
198        }
199    }
200
    /// Fetch contact/mute lists for `pubkeys` in chunks of
    /// `author_batch_size`, keeping up to `concurrent_batches` relay
    /// requests in flight, and ingest each completed batch into the
    /// graph store as it arrives.
    async fn fetch_contact_lists_for_pubkeys(
        &self,
        client: &nostr_sdk::Client,
        pubkeys: &[[u8; 32]],
        since: Option<Timestamp>,
        shutdown_rx: &watch::Receiver<bool>,
    ) {
        // Each chunk is copied into an owned Vec so the per-chunk futures
        // do not borrow from the iterator.
        let chunk_futures = stream::iter(
            pubkeys
                .chunks(self.author_batch_size)
                .map(|chunk| chunk.to_vec())
                .collect::<Vec<_>>(),
        )
        .map(|chunk| async move {
            self.fetch_graph_events_for_pubkeys(client, &chunk, since)
                .await
        });

        // buffer_unordered yields batches as they complete, in arbitrary order.
        let mut in_flight = chunk_futures.buffer_unordered(self.concurrent_batches);
        while let Some(events) = in_flight.next().await {
            // NOTE(review): on shutdown the just-completed batch is dropped
            // without being ingested — presumably intentional best-effort.
            if *shutdown_rx.borrow() {
                break;
            }
            if let Err(err) = super::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
            {
                tracing::debug!("Failed to ingest crawler graph batch: {}", err);
            }
        }
    }
230
231    fn authors_to_fetch_at_distance(&self, distance: u32) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
232        let Ok(users) = self.graph_store.users_by_follow_distance(distance) else {
233            return (Vec::new(), Vec::new());
234        };
235        if self.full_recrawl {
236            return (users, Vec::new());
237        }
238
239        let mut new_authors = Vec::new();
240        let mut known_authors = Vec::new();
241        let refresh_known_authors = self.known_since.is_some();
242        for pk_bytes in users {
243            match self
244                .graph_store
245                .follow_list_created_at(&pk_bytes)
246                .ok()
247                .flatten()
248            {
249                Some(_) if refresh_known_authors => known_authors.push(pk_bytes),
250                Some(_) => {}
251                None => new_authors.push(pk_bytes),
252            }
253        }
254        (new_authors, known_authors)
255    }
256
    /// Build an SDK client for the configured keys, register all relays,
    /// and connect with a bounded timeout. Returns `None` only when the
    /// key conversion into `nostr_sdk` fails.
    async fn connect_client(&self) -> Option<nostr_sdk::Client> {
        use nostr::nips::nip19::ToBech32;

        // The `nostr` and `nostr_sdk` key types differ, so the secret key
        // is round-tripped through its bech32 encoding to convert it.
        let Ok(sdk_keys) =
            nostr_sdk::Keys::parse(self.keys.secret_key().to_bech32().unwrap_or_default())
        else {
            return None;
        };

        // Raise the per-event size cap for this client's relays.
        let mut relay_limits = nostr_sdk::pool::RelayLimits::default();
        relay_limits.events.max_size = Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE);
        let client = nostr_sdk::Client::with_opts(
            sdk_keys,
            nostr_sdk::Options::new().relay_limits(relay_limits),
        );
        // A relay that fails to register is logged and skipped; the client
        // is still returned with whatever relays succeeded.
        for relay in &self.relays {
            if let Err(err) = client.add_relay(relay).await {
                tracing::warn!("Failed to add relay {}: {}", relay, err);
            }
        }
        client.connect_with_timeout(RELAY_CONNECT_TIMEOUT).await;
        Some(client)
    }
280
281    async fn sync_graph_once(
282        &self,
283        client: &nostr_sdk::Client,
284        shutdown_rx: &watch::Receiver<bool>,
285        fetched_contact_lists: &mut HashSet<[u8; 32]>,
286    ) {
287        for distance in 0..=self.max_depth {
288            if *shutdown_rx.borrow() {
289                break;
290            }
291
292            let (new_authors, known_authors) = self.authors_to_fetch_at_distance(distance);
293            if new_authors.is_empty() && known_authors.is_empty() {
294                continue;
295            }
296
297            tracing::debug!(
298                "Social graph sync distance={} new_authors={} known_authors={}",
299                distance,
300                new_authors.len(),
301                known_authors.len()
302            );
303
304            if !new_authors.is_empty() {
305                for pk_bytes in &new_authors {
306                    fetched_contact_lists.insert(*pk_bytes);
307                }
308                self.fetch_contact_lists_for_pubkeys(client, &new_authors, None, shutdown_rx)
309                    .await;
310            }
311
312            if !known_authors.is_empty() {
313                for pk_bytes in &known_authors {
314                    fetched_contact_lists.insert(*pk_bytes);
315                }
316                self.fetch_contact_lists_for_pubkeys(
317                    client,
318                    &known_authors,
319                    self.known_since,
320                    shutdown_rx,
321                )
322                .await;
323            }
324        }
325    }
326
327    pub async fn warm_once(&self) {
328        if self.relays.is_empty() {
329            tracing::warn!("Social graph crawler: no relays configured, skipping");
330            return;
331        }
332
333        let Some(client) = self.connect_client().await else {
334            return;
335        };
336        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
337        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
338        self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
339            .await;
340        let _ = client.disconnect().await;
341    }
342
343    pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
344        let is_contact_list = event.kind == nostr::Kind::ContactList;
345        let is_mute_list = event.kind == nostr::Kind::MuteList;
346        if !is_contact_list && !is_mute_list {
347            return;
348        }
349
350        let pk_bytes = event.pubkey.to_bytes();
351        if self.is_within_social_graph(&pk_bytes) {
352            self.ingest_events_into(self.graph_store.as_ref(), std::slice::from_ref(event));
353            return;
354        }
355
356        if let Some(spambox) = &self.spambox {
357            self.ingest_events_into(spambox.as_ref(), std::slice::from_ref(event));
358        }
359    }
360
361    #[allow(deprecated)]
362    pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
363        use nostr_sdk::prelude::RelayPoolNotification;
364
365        if self.relays.is_empty() {
366            tracing::warn!("Social graph crawler: no relays configured, skipping");
367            return;
368        }
369
370        let mut shutdown_rx = shutdown_rx;
371        if *shutdown_rx.borrow() {
372            return;
373        }
374
375        let Some(client) = self.connect_client().await else {
376            return;
377        };
378
379        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
380        self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
381            .await;
382
383        let filter = nostr::Filter::new()
384            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList])
385            .since(nostr::Timestamp::now());
386
387        let _ = client.subscribe(vec![filter], None).await;
388
389        let mut notifications = client.notifications();
390        loop {
391            tokio::select! {
392                _ = shutdown_rx.changed() => {
393                    if *shutdown_rx.borrow() {
394                        break;
395                    }
396                }
397                notification = notifications.recv() => {
398                    match notification {
399                        Ok(RelayPoolNotification::Event { event, .. }) => {
400                            self.handle_incoming_event(&event);
401                            let missing = self.collect_missing_root_follows(&event, &mut fetched_contact_lists);
402                            if !missing.is_empty() {
403                                self.fetch_contact_lists_for_pubkeys(&client, &missing, None, &shutdown_rx).await;
404                            }
405                        }
406                        Ok(_) => {}
407                        Err(err) => {
408                            tracing::warn!("Social graph crawler notification error: {}", err);
409                            break;
410                        }
411                    }
412                }
413            }
414        }
415
416        let _ = client.disconnect().await;
417    }
418}
419
420pub fn spawn_social_graph_tasks(
421    graph_store: Arc<dyn SocialGraphBackend>,
422    keys: nostr::Keys,
423    relays: Vec<String>,
424    max_depth: u32,
425    spambox: Option<Arc<dyn SocialGraphBackend>>,
426    data_dir: PathBuf,
427) -> SocialGraphTaskHandles {
428    let (shutdown_tx, crawl_shutdown_rx) = watch::channel(false);
429    let local_list_shutdown_rx = crawl_shutdown_rx.clone();
430    let crawl_data_dir = data_dir.clone();
431    let local_list_data_dir = data_dir;
432
433    let crawl_handle = tokio::spawn({
434        let graph_store = Arc::clone(&graph_store);
435        let keys = keys.clone();
436        let relays = relays.clone();
437        let spambox = spambox.clone();
438        async move {
439            tokio::time::sleep(CRAWLER_STARTUP_DELAY).await;
440            let mut crawler = SocialGraphCrawler::new(graph_store.clone(), keys, relays, max_depth);
441            if let Some(spambox) = spambox {
442                crawler = crawler.with_spambox(spambox);
443            }
444            if let Err(err) =
445                sync_local_list_files_force(graph_store.as_ref(), &crawl_data_dir, &crawler.keys)
446            {
447                tracing::warn!(
448                    "Failed to sync local social graph lists at startup: {}",
449                    err
450                );
451            }
452            crawler.crawl(crawl_shutdown_rx).await;
453        }
454    });
455
456    let local_list_handle = tokio::spawn({
457        let graph_store = Arc::clone(&graph_store);
458        let keys = keys.clone();
459        let relays = relays.clone();
460        let spambox = spambox.clone();
461        async move {
462            let mut shutdown_rx = local_list_shutdown_rx;
463            let mut state =
464                read_local_list_file_state(&local_list_data_dir).unwrap_or_else(|err| {
465                    tracing::warn!("Failed to read local social graph list state: {}", err);
466                    LocalListFileState::default()
467                });
468            let mut interval = tokio::time::interval(LOCAL_LIST_POLL_INTERVAL);
469            loop {
470                tokio::select! {
471                    _ = shutdown_rx.changed() => {
472                        if *shutdown_rx.borrow() {
473                            break;
474                        }
475                    }
476                    _ = interval.tick() => {
477                        let outcome = match sync_local_list_files_if_changed(
478                            graph_store.as_ref(),
479                            &local_list_data_dir,
480                            &keys,
481                            &mut state,
482                        ) {
483                            Ok(outcome) => outcome,
484                            Err(err) => {
485                                tracing::warn!("Failed to sync local social graph list files: {}", err);
486                                continue;
487                            }
488                        };
489
490                        if outcome.contacts_changed {
491                            let mut crawler =
492                                SocialGraphCrawler::new(graph_store.clone(), keys.clone(), relays.clone(), max_depth);
493                            if let Some(spambox) = spambox.clone() {
494                                crawler = crawler.with_spambox(spambox);
495                            }
496                            crawler.warm_once().await;
497                        }
498                    }
499                }
500            }
501        }
502    });
503
504    SocialGraphTaskHandles {
505        shutdown_tx,
506        crawl_handle,
507        local_list_handle,
508    }
509}
510
511#[cfg(test)]
512mod tests {
513    use super::*;
514    use std::net::TcpListener;
515    use std::sync::Mutex;
516    use std::time::Instant;
517
518    use futures::{SinkExt, StreamExt};
519    use nostr::{EventBuilder, JsonUtil, Kind, PublicKey, Tag};
520    use tempfile::TempDir;
521    use tokio::net::TcpStream;
522    use tokio::sync::broadcast;
523    use tokio_tungstenite::{accept_async, tungstenite::Message};
524
    /// Shared, mutable state of the in-process test relay.
    #[derive(Debug, Default)]
    struct RelayState {
        // Events the relay serves in response to matching REQs.
        events: Vec<nostr::Event>,
        // Total author count observed per incoming REQ.
        request_author_counts: Vec<usize>,
        // Sorted, deduplicated author hex keys per incoming REQ.
        requested_authors: Vec<Vec<String>>,
    }
531
    /// Minimal in-process websocket relay used to observe crawler REQs.
    struct TestRelay {
        // Ephemeral localhost port the relay listens on.
        port: u16,
        // Broadcast channel used to stop the relay's accept loop.
        shutdown: broadcast::Sender<()>,
        // State shared with every connection handler.
        state: Arc<Mutex<RelayState>>,
    }
537
    impl TestRelay {
        /// Start a relay on an ephemeral localhost port serving `events`.
        ///
        /// The relay runs on its own OS thread with a private tokio
        /// runtime so it is independent of the test's runtime.
        fn new(events: Vec<nostr::Event>) -> Self {
            let state = Arc::new(Mutex::new(RelayState {
                events,
                request_author_counts: Vec::new(),
                requested_authors: Vec::new(),
            }));
            let (shutdown, _) = broadcast::channel(1);

            // Bind with std first so the chosen port is known before the
            // runtime thread starts.
            let std_listener = TcpListener::bind("127.0.0.1:0").expect("bind relay listener");
            let port = std_listener.local_addr().expect("local addr").port();
            // from_std below requires the listener in non-blocking mode.
            std_listener
                .set_nonblocking(true)
                .expect("set listener nonblocking");

            let state_for_thread = Arc::clone(&state);
            let shutdown_for_thread = shutdown.clone();
            std::thread::spawn(move || {
                let runtime = tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(2)
                    .enable_all()
                    .build()
                    .expect("build tokio runtime");
                runtime.block_on(async move {
                    let listener = tokio::net::TcpListener::from_std(std_listener)
                        .expect("tokio listener from std");
                    let mut shutdown_rx = shutdown_for_thread.subscribe();

                    // Accept connections until shutdown is broadcast; each
                    // connection is served by its own task.
                    loop {
                        tokio::select! {
                            _ = shutdown_rx.recv() => break,
                            accept = listener.accept() => {
                                if let Ok((stream, _)) = accept {
                                    let state = Arc::clone(&state_for_thread);
                                    tokio::spawn(async move {
                                        handle_connection(stream, state).await;
                                    });
                                }
                            }
                        }
                    }
                });
            });

            // Give the accept loop a moment to come up before returning.
            std::thread::sleep(Duration::from_millis(100));

            Self {
                port,
                shutdown,
                state,
            }
        }

        /// Websocket URL clients should connect to.
        fn url(&self) -> String {
            format!("ws://127.0.0.1:{}", self.port)
        }

        /// Author counts observed per REQ, in arrival order.
        fn request_author_counts(&self) -> Vec<usize> {
            self.state
                .lock()
                .expect("relay state lock")
                .request_author_counts
                .clone()
        }

        /// Sorted, deduplicated author hex keys per REQ, in arrival order.
        fn requested_authors(&self) -> Vec<Vec<String>> {
            self.state
                .lock()
                .expect("relay state lock")
                .requested_authors
                .clone()
        }
    }
611
    impl Drop for TestRelay {
        fn drop(&mut self) {
            // Signal the accept loop to stop, then give the relay thread a
            // moment to wind down before it is abandoned.
            let _ = self.shutdown.send(());
            std::thread::sleep(Duration::from_millis(50));
        }
    }
618
619    fn matching_events(
620        state: &Arc<Mutex<RelayState>>,
621        filters: &[nostr::Filter],
622    ) -> Vec<nostr::Event> {
623        let guard = state.lock().expect("relay state lock");
624        guard
625            .events
626            .iter()
627            .filter(|event| {
628                filters.is_empty() || filters.iter().any(|filter| filter.match_event(event))
629            })
630            .cloned()
631            .collect()
632    }
633
    /// Serialize a relay message as JSON and send it on the websocket,
    /// ignoring send failures (the peer may already be gone).
    async fn send_relay_message(
        write: &mut futures::stream::SplitSink<
            tokio_tungstenite::WebSocketStream<TcpStream>,
            Message,
        >,
        message: nostr::RelayMessage,
    ) {
        let _ = write.send(Message::Text(message.as_json())).await;
    }
643
    /// Serve a single websocket connection: record each REQ's authors,
    /// reply with matching stored events followed by EOSE, and
    /// acknowledge CLOSE with a CLOSED message.
    async fn handle_connection(stream: TcpStream, state: Arc<Mutex<RelayState>>) {
        let ws_stream = match accept_async(stream).await {
            Ok(ws) => ws,
            Err(_) => return,
        };
        let (mut write, mut read) = ws_stream.split();

        while let Some(message) = read.next().await {
            let text = match message {
                Ok(Message::Text(text)) => text,
                Ok(Message::Ping(data)) => {
                    // Keep the connection alive; pings need no relay logic.
                    let _ = write.send(Message::Pong(data)).await;
                    continue;
                }
                Ok(Message::Close(_)) => break,
                _ => continue,
            };

            // Silently skip anything that is not a valid nostr client message.
            let parsed = match nostr::ClientMessage::from_json(text.as_bytes()) {
                Ok(message) => message,
                Err(_) => continue,
            };

            match parsed {
                nostr::ClientMessage::Req {
                    subscription_id,
                    filters,
                } => {
                    // Record the total author count (summed across filters)
                    // and the sorted, deduplicated set of requested authors.
                    let author_count = filters
                        .iter()
                        .filter_map(|filter| filter.authors.as_ref())
                        .map(|authors| authors.len())
                        .sum();
                    let mut requested_authors = filters
                        .iter()
                        .filter_map(|filter| filter.authors.as_ref())
                        .flat_map(|authors| authors.iter().map(|author| author.to_hex()))
                        .collect::<Vec<_>>();
                    requested_authors.sort();
                    requested_authors.dedup();
                    {
                        // Scoped so the lock is released before awaiting below.
                        let mut guard = state.lock().expect("relay state lock");
                        guard.request_author_counts.push(author_count);
                        guard.requested_authors.push(requested_authors);
                    }

                    // Stream matching events, then end-of-stored-events.
                    for event in matching_events(&state, &filters) {
                        send_relay_message(
                            &mut write,
                            nostr::RelayMessage::event(subscription_id.clone(), event),
                        )
                        .await;
                    }
                    send_relay_message(&mut write, nostr::RelayMessage::eose(subscription_id))
                        .await;
                }
                nostr::ClientMessage::Close(subscription_id) => {
                    send_relay_message(
                        &mut write,
                        nostr::RelayMessage::closed(subscription_id, ""),
                    )
                    .await;
                }
                _ => {}
            }
        }
    }
711
712    async fn wait_until<F>(timeout: Duration, mut condition: F)
713    where
714        F: FnMut() -> bool,
715    {
716        let start = Instant::now();
717        while start.elapsed() < timeout {
718            if condition() {
719                return;
720            }
721            tokio::time::sleep(Duration::from_millis(50)).await;
722        }
723        panic!("condition not met within {:?}", timeout);
724    }
725
726    fn write_contacts_file(path: &std::path::Path, pubkeys: &[String]) {
727        std::fs::write(
728            path,
729            serde_json::to_string(pubkeys).expect("serialize contacts"),
730        )
731        .expect("write contacts file");
732    }
733
    /// An event signed by a key outside the social graph must be routed
    /// to the spambox, not the main graph store.
    #[tokio::test]
    async fn test_crawler_routes_untrusted_to_spambox() {
        // Serialize socialgraph tests that share process-global state.
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
        let spambox =
            crate::socialgraph::open_social_graph_store_at_path(&tmp.path().join("spambox"), None)
                .unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
        let spambox_backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = spambox.clone();

        // No relays: only the local routing logic is exercised.
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![], 2)
            .with_spambox(spambox_backend);

        // A contact list from an unknown key that follows the root.
        let unknown_keys = nostr::Keys::generate();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&unknown_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        // The untrusted author's follows must appear in the spambox only.
        let unknown_pk = unknown_keys.public_key().to_bytes();
        assert!(crate::socialgraph::get_follows(&graph_store, &unknown_pk).is_empty());
        assert_eq!(
            crate::socialgraph::get_follows(&spambox, &unknown_pk),
            vec![root_pk]
        );
    }
767
    /// With an author batch size of 2, the crawler should issue REQs that
    /// carry multiple authors per filter rather than one REQ per author.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_batches_graph_fetches_by_author_chunk() {
        // Serialize socialgraph tests that share process-global state.
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        // Graph: root -> {alice, bob}; alice -> carol.
        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();
        let carol_keys = nostr::Keys::generate();

        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(alice_keys.public_key()),
                Tag::public_key(bob_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        let alice_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(carol_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();
        let bob_event = EventBuilder::new(Kind::ContactList, "", vec![])
            .custom_created_at(nostr::Timestamp::from(12))
            .to_event(&bob_keys)
            .unwrap();

        let relay = TestRelay::new(vec![root_event, alice_event, bob_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2);

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        // Wait until both graph rings have been ingested.
        let alice_pk = alice_keys.public_key().to_bytes();
        let bob_pk = bob_keys.public_key().to_bytes();
        let carol_pk = carol_keys.public_key().to_bytes();
        wait_until(Duration::from_secs(5), || {
            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
            root_follows.contains(&alice_pk)
                && root_follows.contains(&bob_pk)
                && alice_follows.contains(&carol_pk)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        // At least one REQ must have packed two or more authors together.
        let author_counts = relay.request_author_counts();
        assert!(
            author_counts.iter().any(|count| *count >= 2),
            "expected batched author REQ, got {:?}",
            author_counts
        );
    }
839
840    #[tokio::test]
841    #[allow(clippy::await_holding_lock)]
842    async fn test_crawler_expands_from_existing_graph_without_root_refetch() {
843        let _guard = crate::socialgraph::test_lock();
844        let tmp = TempDir::new().unwrap();
845        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
846
847        let root_keys = nostr::Keys::generate();
848        let root_pk = root_keys.public_key().to_bytes();
849        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
850        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
851
852        let alice_keys = nostr::Keys::generate();
853        let bob_keys = nostr::Keys::generate();
854        let carol_keys = nostr::Keys::generate();
855
856        let root_event = EventBuilder::new(
857            Kind::ContactList,
858            "",
859            vec![
860                Tag::public_key(alice_keys.public_key()),
861                Tag::public_key(bob_keys.public_key()),
862            ],
863        )
864        .custom_created_at(nostr::Timestamp::from(10))
865        .to_event(&root_keys)
866        .unwrap();
867        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
868
869        let alice_event = EventBuilder::new(
870            Kind::ContactList,
871            "",
872            vec![Tag::public_key(carol_keys.public_key())],
873        )
874        .custom_created_at(nostr::Timestamp::from(11))
875        .to_event(&alice_keys)
876        .unwrap();
877        let bob_event = EventBuilder::new(Kind::ContactList, "", vec![])
878            .custom_created_at(nostr::Timestamp::from(12))
879            .to_event(&bob_keys)
880            .unwrap();
881
882        let relay = TestRelay::new(vec![alice_event, bob_event]);
883        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
884            .with_author_batch_size(2);
885
886        let (shutdown_tx, shutdown_rx) = watch::channel(false);
887        let handle = tokio::spawn(async move {
888            crawler.crawl(shutdown_rx).await;
889        });
890
891        let alice_pk = alice_keys.public_key().to_bytes();
892        let carol_pk = carol_keys.public_key().to_bytes();
893        wait_until(Duration::from_secs(5), || {
894            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&carol_pk)
895        })
896        .await;
897
898        let _ = shutdown_tx.send(true);
899        handle.await.unwrap();
900
901        let author_counts = relay.request_author_counts();
902        let requested_authors = relay.requested_authors();
903        assert!(
904            requested_authors
905                .iter()
906                .flatten()
907                .all(|author| author != &root_keys.public_key().to_hex()),
908            "expected incremental crawl to skip root refetch, got {:?}",
909            requested_authors
910        );
911        assert!(
912            author_counts.iter().any(|count| *count >= 2),
913            "expected batched distance-1 REQ, got {:?}",
914            author_counts
915        );
916    }
917
918    #[tokio::test]
919    #[allow(clippy::await_holding_lock)]
920    async fn test_crawler_full_recrawl_refetches_root() {
921        let _guard = crate::socialgraph::test_lock();
922        let tmp = TempDir::new().unwrap();
923        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
924
925        let root_keys = nostr::Keys::generate();
926        let root_pk = root_keys.public_key().to_bytes();
927        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
928        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
929
930        let alice_keys = nostr::Keys::generate();
931        let bob_keys = nostr::Keys::generate();
932
933        let root_event = EventBuilder::new(
934            Kind::ContactList,
935            "",
936            vec![
937                Tag::public_key(alice_keys.public_key()),
938                Tag::public_key(bob_keys.public_key()),
939            ],
940        )
941        .custom_created_at(nostr::Timestamp::from(10))
942        .to_event(&root_keys)
943        .unwrap();
944        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
945
946        let relay = TestRelay::new(vec![root_event]);
947        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
948            .with_full_recrawl(true);
949
950        let (shutdown_tx, shutdown_rx) = watch::channel(false);
951        let handle = tokio::spawn(async move {
952            crawler.crawl(shutdown_rx).await;
953        });
954
955        wait_until(Duration::from_secs(5), || {
956            relay.request_author_counts().contains(&1)
957        })
958        .await;
959
960        let _ = shutdown_tx.send(true);
961        handle.await.unwrap();
962
963        let requested_authors = relay.requested_authors();
964        assert!(
965            requested_authors
966                .iter()
967                .flatten()
968                .any(|author| author == &root_keys.public_key().to_hex()),
969            "expected full recrawl to refetch root, got {:?}",
970            requested_authors
971        );
972    }
973
974    #[tokio::test]
975    #[allow(clippy::await_holding_lock)]
976    async fn test_crawler_revisits_known_authors_with_since_cursor() {
977        let _guard = crate::socialgraph::test_lock();
978        let tmp = TempDir::new().unwrap();
979        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
980
981        let root_keys = nostr::Keys::generate();
982        let root_pk = root_keys.public_key().to_bytes();
983        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
984        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
985
986        let alice_keys = nostr::Keys::generate();
987        let carol_keys = nostr::Keys::generate();
988        let dave_keys = nostr::Keys::generate();
989
990        let root_event = EventBuilder::new(
991            Kind::ContactList,
992            "",
993            vec![Tag::public_key(alice_keys.public_key())],
994        )
995        .custom_created_at(nostr::Timestamp::from(10))
996        .to_event(&root_keys)
997        .unwrap();
998        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
999
1000        let alice_old_event = EventBuilder::new(
1001            Kind::ContactList,
1002            "",
1003            vec![Tag::public_key(carol_keys.public_key())],
1004        )
1005        .custom_created_at(nostr::Timestamp::from(11))
1006        .to_event(&alice_keys)
1007        .unwrap();
1008        crate::socialgraph::ingest_parsed_event(&graph_store, &alice_old_event).unwrap();
1009
1010        let alice_new_event = EventBuilder::new(
1011            Kind::ContactList,
1012            "",
1013            vec![
1014                Tag::public_key(carol_keys.public_key()),
1015                Tag::public_key(dave_keys.public_key()),
1016            ],
1017        )
1018        .custom_created_at(nostr::Timestamp::from(20))
1019        .to_event(&alice_keys)
1020        .unwrap();
1021
1022        let relay = TestRelay::new(vec![alice_new_event]);
1023        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
1024            .with_author_batch_size(2)
1025            .with_known_since(Some(15));
1026
1027        let (shutdown_tx, shutdown_rx) = watch::channel(false);
1028        let handle = tokio::spawn(async move {
1029            crawler.crawl(shutdown_rx).await;
1030        });
1031
1032        let alice_pk = alice_keys.public_key().to_bytes();
1033        let dave_pk = dave_keys.public_key().to_bytes();
1034        wait_until(Duration::from_secs(5), || {
1035            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk)
1036        })
1037        .await;
1038
1039        let _ = shutdown_tx.send(true);
1040        handle.await.unwrap();
1041
1042        let requested_authors = relay.requested_authors();
1043        assert!(
1044            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk),
1045            "expected incremental crawl to refresh known author follow list"
1046        );
1047        assert!(
1048            requested_authors
1049                .iter()
1050                .flatten()
1051                .any(|author| author == &alice_keys.public_key().to_hex()),
1052            "expected crawler to refetch known author, got {:?}",
1053            requested_authors
1054        );
1055    }
1056
1057    #[tokio::test]
1058    #[allow(clippy::await_holding_lock)]
1059    async fn test_crawler_warm_once_completes_initial_sync_without_shutdown() {
1060        let _guard = crate::socialgraph::test_lock();
1061        let tmp = TempDir::new().unwrap();
1062        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1063
1064        let root_keys = nostr::Keys::generate();
1065        let root_pk = root_keys.public_key().to_bytes();
1066        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1067        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1068
1069        let alice_keys = nostr::Keys::generate();
1070        let bob_keys = nostr::Keys::generate();
1071
1072        let root_event = EventBuilder::new(
1073            Kind::ContactList,
1074            "",
1075            vec![Tag::public_key(alice_keys.public_key())],
1076        )
1077        .custom_created_at(nostr::Timestamp::from(10))
1078        .to_event(&root_keys)
1079        .unwrap();
1080        let alice_event = EventBuilder::new(
1081            Kind::ContactList,
1082            "",
1083            vec![Tag::public_key(bob_keys.public_key())],
1084        )
1085        .custom_created_at(nostr::Timestamp::from(11))
1086        .to_event(&alice_keys)
1087        .unwrap();
1088
1089        let relay = TestRelay::new(vec![root_event, alice_event]);
1090        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
1091            .with_author_batch_size(2);
1092
1093        let started = std::time::Instant::now();
1094        crawler.warm_once().await;
1095
1096        let alice_pk = alice_keys.public_key().to_bytes();
1097        let bob_pk = bob_keys.public_key().to_bytes();
1098        assert!(
1099            started.elapsed() < Duration::from_secs(5),
1100            "warm_once should complete finite sync promptly"
1101        );
1102        assert!(
1103            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&bob_pk),
1104            "expected warm_once to ingest distance-1 follow list"
1105        );
1106    }
1107
1108    #[tokio::test]
1109    async fn test_crawler_warm_once_accepts_large_contact_list_events() {
1110        let _guard = crate::socialgraph::test_lock();
1111        let tmp = TempDir::new().unwrap();
1112        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1113
1114        let root_keys = nostr::Keys::generate();
1115        let root_pk = root_keys.public_key().to_bytes();
1116        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1117        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1118
1119        let followed_keys = (0..1_600)
1120            .map(|_| nostr::Keys::generate())
1121            .collect::<Vec<_>>();
1122        let tags = followed_keys
1123            .iter()
1124            .map(|keys| Tag::public_key(keys.public_key()))
1125            .collect::<Vec<_>>();
1126        let root_event = EventBuilder::new(Kind::ContactList, "", tags)
1127            .custom_created_at(nostr::Timestamp::from(10))
1128            .to_event(&root_keys)
1129            .unwrap();
1130        assert!(
1131            root_event.as_json().len() > 70_000,
1132            "test event should exceed nostr-sdk default size limit"
1133        );
1134
1135        let relay = TestRelay::new(vec![root_event]);
1136        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
1137            .with_author_batch_size(1);
1138
1139        crawler.warm_once().await;
1140
1141        let follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1142        let first_pk = followed_keys
1143            .first()
1144            .expect("first followed key")
1145            .public_key()
1146            .to_bytes();
1147        let last_pk = followed_keys
1148            .last()
1149            .expect("last followed key")
1150            .public_key()
1151            .to_bytes();
1152        assert!(
1153            follows.contains(&first_pk) && follows.contains(&last_pk),
1154            "expected warm_once to ingest oversized contact list event"
1155        );
1156    }
1157
1158    #[tokio::test]
1159    #[allow(clippy::await_holding_lock)]
1160    async fn test_spawned_social_graph_tasks_sync_local_contacts_on_startup() {
1161        let _guard = crate::socialgraph::test_lock();
1162        let tmp = TempDir::new().unwrap();
1163        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1164
1165        let root_keys = nostr::Keys::generate();
1166        let root_pk = root_keys.public_key().to_bytes();
1167        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1168
1169        let alice_keys = nostr::Keys::generate();
1170        let bob_keys = nostr::Keys::generate();
1171
1172        write_contacts_file(
1173            &tmp.path().join("contacts.json"),
1174            &[alice_keys.public_key().to_hex()],
1175        );
1176
1177        let alice_event = EventBuilder::new(
1178            Kind::ContactList,
1179            "",
1180            vec![Tag::public_key(bob_keys.public_key())],
1181        )
1182        .custom_created_at(nostr::Timestamp::from(11))
1183        .to_event(&alice_keys)
1184        .unwrap();
1185
1186        let relay = TestRelay::new(vec![alice_event]);
1187        let tasks = spawn_social_graph_tasks(
1188            graph_store.clone(),
1189            root_keys.clone(),
1190            vec![relay.url()],
1191            2,
1192            None,
1193            tmp.path().to_path_buf(),
1194        );
1195
1196        let alice_pk = alice_keys.public_key().to_bytes();
1197        let bob_pk = bob_keys.public_key().to_bytes();
1198        wait_until(Duration::from_secs(5), || {
1199            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1200            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1201            root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1202        })
1203        .await;
1204
1205        let _ = tasks.shutdown_tx.send(true);
1206        tasks.crawl_handle.abort();
1207        tasks.local_list_handle.abort();
1208    }
1209
1210    #[tokio::test]
1211    #[allow(clippy::await_holding_lock)]
1212    async fn test_spawned_social_graph_tasks_refresh_when_contacts_change() {
1213        let _guard = crate::socialgraph::test_lock();
1214        let tmp = TempDir::new().unwrap();
1215        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1216
1217        let root_keys = nostr::Keys::generate();
1218        let root_pk = root_keys.public_key().to_bytes();
1219        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1220
1221        let alice_keys = nostr::Keys::generate();
1222        let bob_keys = nostr::Keys::generate();
1223
1224        let alice_event = EventBuilder::new(
1225            Kind::ContactList,
1226            "",
1227            vec![Tag::public_key(bob_keys.public_key())],
1228        )
1229        .custom_created_at(nostr::Timestamp::from(11))
1230        .to_event(&alice_keys)
1231        .unwrap();
1232
1233        let relay = TestRelay::new(vec![alice_event]);
1234        let tasks = spawn_social_graph_tasks(
1235            graph_store.clone(),
1236            root_keys.clone(),
1237            vec![relay.url()],
1238            2,
1239            None,
1240            tmp.path().to_path_buf(),
1241        );
1242
1243        let alice_pk = alice_keys.public_key().to_bytes();
1244        let bob_pk = bob_keys.public_key().to_bytes();
1245        assert!(!crate::socialgraph::get_follows(&graph_store, &root_pk).contains(&alice_pk));
1246
1247        write_contacts_file(
1248            &tmp.path().join("contacts.json"),
1249            &[alice_keys.public_key().to_hex()],
1250        );
1251
1252        wait_until(Duration::from_secs(5), || {
1253            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1254            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1255            root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1256        })
1257        .await;
1258
1259        let _ = tasks.shutdown_tx.send(true);
1260        tasks.crawl_handle.abort();
1261        tasks.local_list_handle.abort();
1262    }
1263}