Skip to main content

hashtree_cli/socialgraph/
crawler.rs

1use futures::stream::{self, StreamExt};
2use std::collections::HashSet;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6
7use nostr::Timestamp;
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use super::{
12    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
13    LocalListFileState, SocialGraphBackend,
14};
15
/// Default number of authors placed in a single graph-event fetch filter.
const DEFAULT_AUTHOR_BATCH_SIZE: usize = 500;
/// Default number of author batches fetched concurrently.
const DEFAULT_CONCURRENT_BATCHES: usize = 4;
/// Upper bound on how long a single `fetch_events` call may take.
const GRAPH_FETCH_TIMEOUT: Duration = Duration::from_millis(15_000);
/// Upper bound on how long the crawler waits for relay connections to be established.
const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Maximum accepted relay event size in bytes (512 KiB).
pub(crate) const SOCIALGRAPH_RELAY_EVENT_MAX_SIZE: u32 = 524_288;

/// Delay before the crawl task starts; shortened under test.
#[cfg(not(test))]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_millis(5_000);
#[cfg(test)]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// How often the local list files are checked for changes; shortened under test.
#[cfg(not(test))]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_millis(5_000);
#[cfg(test)]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_millis(100);
29
30pub struct SocialGraphTaskHandles {
31    pub shutdown_tx: watch::Sender<bool>,
32    pub crawl_handle: JoinHandle<()>,
33    pub local_list_handle: JoinHandle<()>,
34}
35
36pub struct SocialGraphCrawler {
37    graph_store: Arc<dyn SocialGraphBackend>,
38    spambox: Option<Arc<dyn SocialGraphBackend>>,
39    keys: nostr::Keys,
40    relays: Vec<String>,
41    max_depth: u32,
42    author_batch_size: usize,
43    concurrent_batches: usize,
44    full_recrawl: bool,
45    known_since: Option<Timestamp>,
46}
47
48impl SocialGraphCrawler {
49    pub fn new(
50        graph_store: Arc<dyn SocialGraphBackend>,
51        keys: nostr::Keys,
52        relays: Vec<String>,
53        max_depth: u32,
54    ) -> Self {
55        Self {
56            graph_store,
57            spambox: None,
58            keys,
59            relays,
60            max_depth,
61            author_batch_size: DEFAULT_AUTHOR_BATCH_SIZE,
62            concurrent_batches: DEFAULT_CONCURRENT_BATCHES,
63            full_recrawl: false,
64            known_since: None,
65        }
66    }
67
68    pub fn with_spambox(mut self, spambox: Arc<dyn SocialGraphBackend>) -> Self {
69        self.spambox = Some(spambox);
70        self
71    }
72
73    pub fn with_author_batch_size(mut self, author_batch_size: usize) -> Self {
74        self.author_batch_size = author_batch_size.max(1);
75        self
76    }
77
78    pub fn with_concurrent_batches(mut self, concurrent_batches: usize) -> Self {
79        self.concurrent_batches = concurrent_batches.max(1);
80        self
81    }
82
83    pub fn with_full_recrawl(mut self, full_recrawl: bool) -> Self {
84        self.full_recrawl = full_recrawl;
85        self
86    }
87
88    pub fn with_known_since(mut self, known_since: Option<u64>) -> Self {
89        self.known_since = known_since.map(Timestamp::from);
90        self
91    }
92
93    fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
94        if pk_bytes == &self.keys.public_key().to_bytes() {
95            return true;
96        }
97
98        super::get_follow_distance(self.graph_store.as_ref(), pk_bytes)
99            .map(|distance| distance <= self.max_depth)
100            .unwrap_or(false)
101    }
102
103    fn ingest_events_into(
104        &self,
105        graph_store: &(impl SocialGraphBackend + ?Sized),
106        events: &[nostr::Event],
107    ) {
108        if let Err(err) = super::ingest_parsed_events(graph_store, events) {
109            tracing::debug!("Failed to ingest crawler event: {}", err);
110        }
111    }
112
113    #[allow(deprecated)]
114    fn collect_missing_root_follows(
115        &self,
116        event: &nostr::Event,
117        fetched_contact_lists: &mut HashSet<[u8; 32]>,
118    ) -> Vec<[u8; 32]> {
119        if self.max_depth < 2 || event.kind != nostr::Kind::ContactList {
120            return Vec::new();
121        }
122
123        let root_pk = self.keys.public_key().to_bytes();
124        if event.pubkey.to_bytes() != root_pk {
125            return Vec::new();
126        }
127
128        let mut missing = Vec::new();
129        for tag in event.tags.iter() {
130            if let Some(nostr::TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
131                let pk_bytes = public_key.to_bytes();
132                if fetched_contact_lists.contains(&pk_bytes) {
133                    continue;
134                }
135
136                let existing_follows = super::get_follows(self.graph_store.as_ref(), &pk_bytes);
137                if !existing_follows.is_empty() {
138                    fetched_contact_lists.insert(pk_bytes);
139                    continue;
140                }
141
142                fetched_contact_lists.insert(pk_bytes);
143                missing.push(pk_bytes);
144            }
145        }
146
147        missing
148    }
149
150    fn graph_filter_for_pubkeys(
151        pubkeys: &[[u8; 32]],
152        since: Option<Timestamp>,
153    ) -> Option<nostr::Filter> {
154        let authors = pubkeys
155            .iter()
156            .filter_map(|pk_bytes| nostr::PublicKey::from_slice(pk_bytes).ok())
157            .collect::<Vec<_>>();
158        if authors.is_empty() {
159            return None;
160        }
161
162        let mut filter = nostr::Filter::new()
163            .authors(authors)
164            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList]);
165        if let Some(since) = since {
166            filter = filter.since(since);
167        }
168
169        Some(filter)
170    }
171
172    async fn fetch_graph_events_for_pubkeys(
173        &self,
174        client: &nostr_sdk::Client,
175        pubkeys: &[[u8; 32]],
176        since: Option<Timestamp>,
177    ) -> Vec<nostr::Event> {
178        let Some(filter) = Self::graph_filter_for_pubkeys(pubkeys, since) else {
179            return Vec::new();
180        };
181
182        match client.fetch_events(filter, GRAPH_FETCH_TIMEOUT).await {
183            Ok(events) => events.to_vec(),
184            Err(err) => {
185                tracing::debug!(
186                    "Failed to fetch graph events for {} authors: {}",
187                    pubkeys.len(),
188                    err
189                );
190                Vec::new()
191            }
192        }
193    }
194
195    async fn fetch_contact_lists_for_pubkeys(
196        &self,
197        client: &nostr_sdk::Client,
198        pubkeys: &[[u8; 32]],
199        since: Option<Timestamp>,
200        shutdown_rx: &watch::Receiver<bool>,
201    ) {
202        let chunk_futures = stream::iter(
203            pubkeys
204                .chunks(self.author_batch_size)
205                .map(|chunk| chunk.to_vec())
206                .collect::<Vec<_>>(),
207        )
208        .map(|chunk| async move {
209            self.fetch_graph_events_for_pubkeys(client, &chunk, since)
210                .await
211        });
212
213        let mut in_flight = chunk_futures.buffer_unordered(self.concurrent_batches);
214        while let Some(events) = in_flight.next().await {
215            if *shutdown_rx.borrow() {
216                break;
217            }
218            if let Err(err) = super::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
219            {
220                tracing::debug!("Failed to ingest crawler graph batch: {}", err);
221            }
222        }
223    }
224
225    fn authors_to_fetch_at_distance(&self, distance: u32) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
226        let Ok(users) = self.graph_store.users_by_follow_distance(distance) else {
227            return (Vec::new(), Vec::new());
228        };
229        if self.full_recrawl {
230            return (users, Vec::new());
231        }
232
233        let mut new_authors = Vec::new();
234        let mut known_authors = Vec::new();
235        let refresh_known_authors = self.known_since.is_some();
236        for pk_bytes in users {
237            match self
238                .graph_store
239                .follow_list_created_at(&pk_bytes)
240                .ok()
241                .flatten()
242            {
243                Some(_) if refresh_known_authors => known_authors.push(pk_bytes),
244                Some(_) => {}
245                None => new_authors.push(pk_bytes),
246            }
247        }
248        (new_authors, known_authors)
249    }
250
251    async fn connect_client(&self) -> Option<nostr_sdk::Client> {
252        use nostr::nips::nip19::ToBech32;
253
254        let secret = self.keys.secret_key().to_bech32().unwrap_or_default();
255        let Ok(sdk_keys) = nostr_sdk::Keys::parse(&secret) else {
256            return None;
257        };
258
259        let mut relay_limits = nostr_sdk::pool::RelayLimits::default();
260        relay_limits.events.max_size = Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE);
261        let client = nostr_sdk::Client::builder()
262            .signer(sdk_keys)
263            .opts(nostr_sdk::ClientOptions::new().relay_limits(relay_limits))
264            .build();
265        for relay in &self.relays {
266            if let Err(err) = client.add_relay(relay).await {
267                tracing::warn!("Failed to add relay {}: {}", relay, err);
268            }
269        }
270        let _ = tokio::time::timeout(RELAY_CONNECT_TIMEOUT, client.connect()).await;
271        Some(client)
272    }
273
274    async fn sync_graph_once(
275        &self,
276        client: &nostr_sdk::Client,
277        shutdown_rx: &watch::Receiver<bool>,
278        fetched_contact_lists: &mut HashSet<[u8; 32]>,
279    ) {
280        for distance in 0..=self.max_depth {
281            if *shutdown_rx.borrow() {
282                break;
283            }
284
285            let (new_authors, known_authors) = self.authors_to_fetch_at_distance(distance);
286            if new_authors.is_empty() && known_authors.is_empty() {
287                continue;
288            }
289
290            tracing::debug!(
291                "Social graph sync distance={} new_authors={} known_authors={}",
292                distance,
293                new_authors.len(),
294                known_authors.len()
295            );
296
297            if !new_authors.is_empty() {
298                for pk_bytes in &new_authors {
299                    fetched_contact_lists.insert(*pk_bytes);
300                }
301                self.fetch_contact_lists_for_pubkeys(client, &new_authors, None, shutdown_rx)
302                    .await;
303            }
304
305            if !known_authors.is_empty() {
306                for pk_bytes in &known_authors {
307                    fetched_contact_lists.insert(*pk_bytes);
308                }
309                self.fetch_contact_lists_for_pubkeys(
310                    client,
311                    &known_authors,
312                    self.known_since,
313                    shutdown_rx,
314                )
315                .await;
316            }
317        }
318    }
319
320    pub async fn warm_once(&self) {
321        if self.relays.is_empty() {
322            tracing::warn!("Social graph crawler: no relays configured, skipping");
323            return;
324        }
325
326        let Some(client) = self.connect_client().await else {
327            return;
328        };
329        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
330        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
331        self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
332            .await;
333        let _ = client.disconnect().await;
334    }
335
336    pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
337        let is_contact_list = event.kind == nostr::Kind::ContactList;
338        let is_mute_list = event.kind == nostr::Kind::MuteList;
339        if !is_contact_list && !is_mute_list {
340            return;
341        }
342
343        let pk_bytes = event.pubkey.to_bytes();
344        if self.is_within_social_graph(&pk_bytes) {
345            self.ingest_events_into(self.graph_store.as_ref(), std::slice::from_ref(event));
346            return;
347        }
348
349        if let Some(spambox) = &self.spambox {
350            self.ingest_events_into(spambox.as_ref(), std::slice::from_ref(event));
351        }
352    }
353
354    #[allow(deprecated)]
355    pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
356        use nostr_sdk::prelude::RelayPoolNotification;
357
358        if self.relays.is_empty() {
359            tracing::warn!("Social graph crawler: no relays configured, skipping");
360            return;
361        }
362
363        let mut shutdown_rx = shutdown_rx;
364        if *shutdown_rx.borrow() {
365            return;
366        }
367
368        let Some(client) = self.connect_client().await else {
369            return;
370        };
371
372        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
373        self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
374            .await;
375
376        let filter = nostr::Filter::new()
377            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList])
378            .since(nostr::Timestamp::now());
379
380        let _ = client.subscribe(filter, None).await;
381
382        let mut notifications = client.notifications();
383        loop {
384            tokio::select! {
385                _ = shutdown_rx.changed() => {
386                    if *shutdown_rx.borrow() {
387                        break;
388                    }
389                }
390                notification = notifications.recv() => {
391                    match notification {
392                        Ok(RelayPoolNotification::Event { event, .. }) => {
393                            self.handle_incoming_event(&event);
394                            let missing = self.collect_missing_root_follows(&event, &mut fetched_contact_lists);
395                            if !missing.is_empty() {
396                                self.fetch_contact_lists_for_pubkeys(&client, &missing, None, &shutdown_rx).await;
397                            }
398                        }
399                        Ok(_) => {}
400                        Err(err) => {
401                            tracing::warn!("Social graph crawler notification error: {}", err);
402                            break;
403                        }
404                    }
405                }
406            }
407        }
408
409        let _ = client.disconnect().await;
410    }
411}
412
413pub fn spawn_social_graph_tasks(
414    graph_store: Arc<dyn SocialGraphBackend>,
415    keys: nostr::Keys,
416    relays: Vec<String>,
417    max_depth: u32,
418    spambox: Option<Arc<dyn SocialGraphBackend>>,
419    data_dir: PathBuf,
420) -> SocialGraphTaskHandles {
421    let (shutdown_tx, crawl_shutdown_rx) = watch::channel(false);
422    let local_list_shutdown_rx = crawl_shutdown_rx.clone();
423    let crawl_data_dir = data_dir.clone();
424    let local_list_data_dir = data_dir;
425
426    let crawl_handle = tokio::spawn({
427        let graph_store = Arc::clone(&graph_store);
428        let keys = keys.clone();
429        let relays = relays.clone();
430        let spambox = spambox.clone();
431        async move {
432            tokio::time::sleep(CRAWLER_STARTUP_DELAY).await;
433            let mut crawler = SocialGraphCrawler::new(graph_store.clone(), keys, relays, max_depth);
434            if let Some(spambox) = spambox {
435                crawler = crawler.with_spambox(spambox);
436            }
437            if let Err(err) =
438                sync_local_list_files_force(graph_store.as_ref(), &crawl_data_dir, &crawler.keys)
439            {
440                tracing::warn!(
441                    "Failed to sync local social graph lists at startup: {}",
442                    err
443                );
444            }
445            crawler.crawl(crawl_shutdown_rx).await;
446        }
447    });
448
449    let local_list_handle = tokio::spawn({
450        let graph_store = Arc::clone(&graph_store);
451        let keys = keys.clone();
452        let relays = relays.clone();
453        let spambox = spambox.clone();
454        async move {
455            let mut shutdown_rx = local_list_shutdown_rx;
456            let mut state =
457                read_local_list_file_state(&local_list_data_dir).unwrap_or_else(|err| {
458                    tracing::warn!("Failed to read local social graph list state: {}", err);
459                    LocalListFileState::default()
460                });
461            let mut interval = tokio::time::interval(LOCAL_LIST_POLL_INTERVAL);
462            loop {
463                tokio::select! {
464                    _ = shutdown_rx.changed() => {
465                        if *shutdown_rx.borrow() {
466                            break;
467                        }
468                    }
469                    _ = interval.tick() => {
470                        let outcome = match sync_local_list_files_if_changed(
471                            graph_store.as_ref(),
472                            &local_list_data_dir,
473                            &keys,
474                            &mut state,
475                        ) {
476                            Ok(outcome) => outcome,
477                            Err(err) => {
478                                tracing::warn!("Failed to sync local social graph list files: {}", err);
479                                continue;
480                            }
481                        };
482
483                        if outcome.contacts_changed {
484                            let mut crawler =
485                                SocialGraphCrawler::new(graph_store.clone(), keys.clone(), relays.clone(), max_depth);
486                            if let Some(spambox) = spambox.clone() {
487                                crawler = crawler.with_spambox(spambox);
488                            }
489                            crawler.warm_once().await;
490                        }
491                    }
492                }
493            }
494        }
495    });
496
497    SocialGraphTaskHandles {
498        shutdown_tx,
499        crawl_handle,
500        local_list_handle,
501    }
502}
503
504#[cfg(test)]
505mod tests {
506    use super::*;
507    use std::net::TcpListener;
508    use std::sync::Mutex;
509    use std::time::Instant;
510
511    use futures::{SinkExt, StreamExt};
512    use nostr::{EventBuilder, JsonUtil, Kind, PublicKey, Tag};
513    use tempfile::TempDir;
514    use tokio::net::TcpStream;
515    use tokio::sync::broadcast;
516    use tokio_tungstenite::{accept_async, tungstenite::Message};
517
518    macro_rules! event_builder {
519        ($kind:expr, $content:expr $(,)?) => {
520            EventBuilder::new($kind, $content)
521        };
522        ($kind:expr, $content:expr, $tags:expr $(,)?) => {
523            EventBuilder::new($kind, $content).tags($tags)
524        };
525    }
526
527    #[derive(Debug, Default)]
528    struct RelayState {
529        events: Vec<nostr::Event>,
530        request_author_counts: Vec<usize>,
531        requested_authors: Vec<Vec<String>>,
532    }
533
534    struct TestRelay {
535        port: u16,
536        shutdown: broadcast::Sender<()>,
537        state: Arc<Mutex<RelayState>>,
538    }
539
540    impl TestRelay {
541        fn new(events: Vec<nostr::Event>) -> Self {
542            let state = Arc::new(Mutex::new(RelayState {
543                events,
544                request_author_counts: Vec::new(),
545                requested_authors: Vec::new(),
546            }));
547            let (shutdown, _) = broadcast::channel(1);
548
549            let std_listener = TcpListener::bind("127.0.0.1:0").expect("bind relay listener");
550            let port = std_listener.local_addr().expect("local addr").port();
551            std_listener
552                .set_nonblocking(true)
553                .expect("set listener nonblocking");
554
555            let state_for_thread = Arc::clone(&state);
556            let shutdown_for_thread = shutdown.clone();
557            std::thread::spawn(move || {
558                let runtime = tokio::runtime::Builder::new_multi_thread()
559                    .worker_threads(2)
560                    .enable_all()
561                    .build()
562                    .expect("build tokio runtime");
563                runtime.block_on(async move {
564                    let listener = tokio::net::TcpListener::from_std(std_listener)
565                        .expect("tokio listener from std");
566                    let mut shutdown_rx = shutdown_for_thread.subscribe();
567
568                    loop {
569                        tokio::select! {
570                            _ = shutdown_rx.recv() => break,
571                            accept = listener.accept() => {
572                                if let Ok((stream, _)) = accept {
573                                    let state = Arc::clone(&state_for_thread);
574                                    tokio::spawn(async move {
575                                        handle_connection(stream, state).await;
576                                    });
577                                }
578                            }
579                        }
580                    }
581                });
582            });
583
584            std::thread::sleep(Duration::from_millis(100));
585
586            Self {
587                port,
588                shutdown,
589                state,
590            }
591        }
592
593        fn url(&self) -> String {
594            format!("ws://127.0.0.1:{}", self.port)
595        }
596
597        fn request_author_counts(&self) -> Vec<usize> {
598            self.state
599                .lock()
600                .expect("relay state lock")
601                .request_author_counts
602                .clone()
603        }
604
605        fn requested_authors(&self) -> Vec<Vec<String>> {
606            self.state
607                .lock()
608                .expect("relay state lock")
609                .requested_authors
610                .clone()
611        }
612    }
613
614    impl Drop for TestRelay {
615        fn drop(&mut self) {
616            let _ = self.shutdown.send(());
617            std::thread::sleep(Duration::from_millis(50));
618        }
619    }
620
621    fn matching_events(
622        state: &Arc<Mutex<RelayState>>,
623        filters: &[nostr::Filter],
624    ) -> Vec<nostr::Event> {
625        let guard = state.lock().expect("relay state lock");
626        guard
627            .events
628            .iter()
629            .filter(|event| {
630                filters.is_empty()
631                    || filters
632                        .iter()
633                        .any(|filter| filter.match_event(event, Default::default()))
634            })
635            .cloned()
636            .collect()
637    }
638
639    async fn send_relay_message(
640        write: &mut futures::stream::SplitSink<
641            tokio_tungstenite::WebSocketStream<TcpStream>,
642            Message,
643        >,
644        message: nostr::RelayMessage<'_>,
645    ) {
646        let _ = write.send(Message::Text(message.as_json())).await;
647    }
648
649    async fn handle_connection(stream: TcpStream, state: Arc<Mutex<RelayState>>) {
650        let ws_stream = match accept_async(stream).await {
651            Ok(ws) => ws,
652            Err(_) => return,
653        };
654        let (mut write, mut read) = ws_stream.split();
655
656        while let Some(message) = read.next().await {
657            let text = match message {
658                Ok(Message::Text(text)) => text,
659                Ok(Message::Ping(data)) => {
660                    let _ = write.send(Message::Pong(data)).await;
661                    continue;
662                }
663                Ok(Message::Close(_)) => break,
664                _ => continue,
665            };
666
667            let parsed = match nostr::ClientMessage::from_json(text.as_bytes()) {
668                Ok(message) => message,
669                Err(_) => continue,
670            };
671
672            match parsed {
673                nostr::ClientMessage::Req {
674                    subscription_id,
675                    filters,
676                } => {
677                    let subscription_id = subscription_id.into_owned();
678                    let filters = filters
679                        .into_iter()
680                        .map(|filter| filter.into_owned())
681                        .collect::<Vec<_>>();
682                    let author_count = filters
683                        .iter()
684                        .filter_map(|filter| filter.authors.as_ref())
685                        .map(|authors| authors.len())
686                        .sum();
687                    let mut requested_authors = filters
688                        .iter()
689                        .filter_map(|filter| filter.authors.as_ref())
690                        .flat_map(|authors| authors.iter().map(|author| author.to_hex()))
691                        .collect::<Vec<_>>();
692                    requested_authors.sort();
693                    requested_authors.dedup();
694                    {
695                        let mut guard = state.lock().expect("relay state lock");
696                        guard.request_author_counts.push(author_count);
697                        guard.requested_authors.push(requested_authors);
698                    }
699
700                    for event in matching_events(&state, &filters) {
701                        send_relay_message(
702                            &mut write,
703                            nostr::RelayMessage::event(subscription_id.clone(), event),
704                        )
705                        .await;
706                    }
707                    send_relay_message(&mut write, nostr::RelayMessage::eose(subscription_id))
708                        .await;
709                }
710                nostr::ClientMessage::Close(subscription_id) => {
711                    send_relay_message(
712                        &mut write,
713                        nostr::RelayMessage::closed(subscription_id.into_owned(), ""),
714                    )
715                    .await;
716                }
717                _ => {}
718            }
719        }
720    }
721
722    async fn wait_until<F>(timeout: Duration, mut condition: F)
723    where
724        F: FnMut() -> bool,
725    {
726        let start = Instant::now();
727        while start.elapsed() < timeout {
728            if condition() {
729                return;
730            }
731            tokio::time::sleep(Duration::from_millis(50)).await;
732        }
733        panic!("condition not met within {:?}", timeout);
734    }
735
736    fn write_contacts_file(path: &std::path::Path, pubkeys: &[String]) {
737        std::fs::write(
738            path,
739            serde_json::to_string(pubkeys).expect("serialize contacts"),
740        )
741        .expect("write contacts file");
742    }
743
744    #[tokio::test]
745    async fn test_crawler_routes_untrusted_to_spambox() {
746        let _guard = crate::socialgraph::test_lock();
747        let tmp = TempDir::new().unwrap();
748        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
749        let spambox =
750            crate::socialgraph::open_social_graph_store_at_path(&tmp.path().join("spambox"), None)
751                .unwrap();
752
753        let root_keys = nostr::Keys::generate();
754        let root_pk = root_keys.public_key().to_bytes();
755        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
756        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
757        let spambox_backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = spambox.clone();
758
759        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![], 2)
760            .with_spambox(spambox_backend);
761
762        let unknown_keys = nostr::Keys::generate();
763        let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
764        let event = event_builder!(Kind::ContactList, "")
765            .tags(vec![follow_tag])
766            .sign_with_keys(&unknown_keys)
767            .unwrap();
768
769        crawler.handle_incoming_event(&event);
770
771        let unknown_pk = unknown_keys.public_key().to_bytes();
772        assert!(crate::socialgraph::get_follows(&graph_store, &unknown_pk).is_empty());
773        assert_eq!(
774            crate::socialgraph::get_follows(&spambox, &unknown_pk),
775            vec![root_pk]
776        );
777    }
778
779    #[tokio::test]
780    #[allow(clippy::await_holding_lock)]
781    async fn test_crawler_batches_graph_fetches_by_author_chunk() {
782        let _guard = crate::socialgraph::test_lock();
783        let tmp = TempDir::new().unwrap();
784        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
785
786        let root_keys = nostr::Keys::generate();
787        let root_pk = root_keys.public_key().to_bytes();
788        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
789        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
790
791        let alice_keys = nostr::Keys::generate();
792        let bob_keys = nostr::Keys::generate();
793        let carol_keys = nostr::Keys::generate();
794
795        let root_event = event_builder!(
796            Kind::ContactList,
797            "",
798            vec![
799                Tag::public_key(alice_keys.public_key()),
800                Tag::public_key(bob_keys.public_key()),
801            ],
802        )
803        .custom_created_at(nostr::Timestamp::from(10))
804        .sign_with_keys(&root_keys)
805        .unwrap();
806        let alice_event = event_builder!(
807            Kind::ContactList,
808            "",
809            vec![Tag::public_key(carol_keys.public_key())],
810        )
811        .custom_created_at(nostr::Timestamp::from(11))
812        .sign_with_keys(&alice_keys)
813        .unwrap();
814        let bob_event = event_builder!(Kind::ContactList, "")
815            .custom_created_at(nostr::Timestamp::from(12))
816            .sign_with_keys(&bob_keys)
817            .unwrap();
818
819        let relay = TestRelay::new(vec![root_event, alice_event, bob_event]);
820        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
821            .with_author_batch_size(2);
822
823        let (shutdown_tx, shutdown_rx) = watch::channel(false);
824        let handle = tokio::spawn(async move {
825            crawler.crawl(shutdown_rx).await;
826        });
827
828        let alice_pk = alice_keys.public_key().to_bytes();
829        let bob_pk = bob_keys.public_key().to_bytes();
830        let carol_pk = carol_keys.public_key().to_bytes();
831        wait_until(Duration::from_secs(5), || {
832            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
833            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
834            root_follows.contains(&alice_pk)
835                && root_follows.contains(&bob_pk)
836                && alice_follows.contains(&carol_pk)
837        })
838        .await;
839
840        let _ = shutdown_tx.send(true);
841        handle.await.unwrap();
842
843        let author_counts = relay.request_author_counts();
844        assert!(
845            author_counts.iter().any(|count| *count >= 2),
846            "expected batched author REQ, got {:?}",
847            author_counts
848        );
849    }
850
851    #[tokio::test]
852    #[allow(clippy::await_holding_lock)]
853    async fn test_crawler_expands_from_existing_graph_without_root_refetch() {
854        let _guard = crate::socialgraph::test_lock();
855        let tmp = TempDir::new().unwrap();
856        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
857
858        let root_keys = nostr::Keys::generate();
859        let root_pk = root_keys.public_key().to_bytes();
860        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
861        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
862
863        let alice_keys = nostr::Keys::generate();
864        let bob_keys = nostr::Keys::generate();
865        let carol_keys = nostr::Keys::generate();
866
867        let root_event = event_builder!(
868            Kind::ContactList,
869            "",
870            vec![
871                Tag::public_key(alice_keys.public_key()),
872                Tag::public_key(bob_keys.public_key()),
873            ],
874        )
875        .custom_created_at(nostr::Timestamp::from(10))
876        .sign_with_keys(&root_keys)
877        .unwrap();
878        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
879
880        let alice_event = event_builder!(
881            Kind::ContactList,
882            "",
883            vec![Tag::public_key(carol_keys.public_key())],
884        )
885        .custom_created_at(nostr::Timestamp::from(11))
886        .sign_with_keys(&alice_keys)
887        .unwrap();
888        let bob_event = event_builder!(Kind::ContactList, "")
889            .custom_created_at(nostr::Timestamp::from(12))
890            .sign_with_keys(&bob_keys)
891            .unwrap();
892
893        let relay = TestRelay::new(vec![alice_event, bob_event]);
894        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
895            .with_author_batch_size(2);
896
897        let (shutdown_tx, shutdown_rx) = watch::channel(false);
898        let handle = tokio::spawn(async move {
899            crawler.crawl(shutdown_rx).await;
900        });
901
902        let alice_pk = alice_keys.public_key().to_bytes();
903        let carol_pk = carol_keys.public_key().to_bytes();
904        wait_until(Duration::from_secs(5), || {
905            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&carol_pk)
906        })
907        .await;
908
909        let _ = shutdown_tx.send(true);
910        handle.await.unwrap();
911
912        let author_counts = relay.request_author_counts();
913        let requested_authors = relay.requested_authors();
914        assert!(
915            requested_authors
916                .iter()
917                .flatten()
918                .all(|author| author != &root_keys.public_key().to_hex()),
919            "expected incremental crawl to skip root refetch, got {:?}",
920            requested_authors
921        );
922        assert!(
923            author_counts.iter().any(|count| *count >= 2),
924            "expected batched distance-1 REQ, got {:?}",
925            author_counts
926        );
927    }
928
929    #[tokio::test]
930    #[allow(clippy::await_holding_lock)]
931    async fn test_crawler_full_recrawl_refetches_root() {
932        let _guard = crate::socialgraph::test_lock();
933        let tmp = TempDir::new().unwrap();
934        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
935
936        let root_keys = nostr::Keys::generate();
937        let root_pk = root_keys.public_key().to_bytes();
938        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
939        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
940
941        let alice_keys = nostr::Keys::generate();
942        let bob_keys = nostr::Keys::generate();
943
944        let root_event = event_builder!(
945            Kind::ContactList,
946            "",
947            vec![
948                Tag::public_key(alice_keys.public_key()),
949                Tag::public_key(bob_keys.public_key()),
950            ],
951        )
952        .custom_created_at(nostr::Timestamp::from(10))
953        .sign_with_keys(&root_keys)
954        .unwrap();
955        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
956
957        let relay = TestRelay::new(vec![root_event]);
958        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
959            .with_full_recrawl(true);
960
961        let (shutdown_tx, shutdown_rx) = watch::channel(false);
962        let handle = tokio::spawn(async move {
963            crawler.crawl(shutdown_rx).await;
964        });
965
966        wait_until(Duration::from_secs(5), || {
967            relay.request_author_counts().contains(&1)
968        })
969        .await;
970
971        let _ = shutdown_tx.send(true);
972        handle.await.unwrap();
973
974        let requested_authors = relay.requested_authors();
975        assert!(
976            requested_authors
977                .iter()
978                .flatten()
979                .any(|author| author == &root_keys.public_key().to_hex()),
980            "expected full recrawl to refetch root, got {:?}",
981            requested_authors
982        );
983    }
984
985    #[tokio::test]
986    #[allow(clippy::await_holding_lock)]
987    async fn test_crawler_revisits_known_authors_with_since_cursor() {
988        let _guard = crate::socialgraph::test_lock();
989        let tmp = TempDir::new().unwrap();
990        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
991
992        let root_keys = nostr::Keys::generate();
993        let root_pk = root_keys.public_key().to_bytes();
994        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
995        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
996
997        let alice_keys = nostr::Keys::generate();
998        let carol_keys = nostr::Keys::generate();
999        let dave_keys = nostr::Keys::generate();
1000
1001        let root_event = event_builder!(
1002            Kind::ContactList,
1003            "",
1004            vec![Tag::public_key(alice_keys.public_key())],
1005        )
1006        .custom_created_at(nostr::Timestamp::from(10))
1007        .sign_with_keys(&root_keys)
1008        .unwrap();
1009        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
1010
1011        let alice_old_event = event_builder!(
1012            Kind::ContactList,
1013            "",
1014            vec![Tag::public_key(carol_keys.public_key())],
1015        )
1016        .custom_created_at(nostr::Timestamp::from(11))
1017        .sign_with_keys(&alice_keys)
1018        .unwrap();
1019        crate::socialgraph::ingest_parsed_event(&graph_store, &alice_old_event).unwrap();
1020
1021        let alice_new_event = event_builder!(
1022            Kind::ContactList,
1023            "",
1024            vec![
1025                Tag::public_key(carol_keys.public_key()),
1026                Tag::public_key(dave_keys.public_key()),
1027            ],
1028        )
1029        .custom_created_at(nostr::Timestamp::from(20))
1030        .sign_with_keys(&alice_keys)
1031        .unwrap();
1032
1033        let relay = TestRelay::new(vec![alice_new_event]);
1034        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
1035            .with_author_batch_size(2)
1036            .with_known_since(Some(15));
1037
1038        let (shutdown_tx, shutdown_rx) = watch::channel(false);
1039        let handle = tokio::spawn(async move {
1040            crawler.crawl(shutdown_rx).await;
1041        });
1042
1043        let alice_pk = alice_keys.public_key().to_bytes();
1044        let dave_pk = dave_keys.public_key().to_bytes();
1045        wait_until(Duration::from_secs(5), || {
1046            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk)
1047        })
1048        .await;
1049
1050        let _ = shutdown_tx.send(true);
1051        handle.await.unwrap();
1052
1053        let requested_authors = relay.requested_authors();
1054        assert!(
1055            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk),
1056            "expected incremental crawl to refresh known author follow list"
1057        );
1058        assert!(
1059            requested_authors
1060                .iter()
1061                .flatten()
1062                .any(|author| author == &alice_keys.public_key().to_hex()),
1063            "expected crawler to refetch known author, got {:?}",
1064            requested_authors
1065        );
1066    }
1067
1068    #[tokio::test]
1069    #[allow(clippy::await_holding_lock)]
1070    async fn test_crawler_warm_once_completes_initial_sync_without_shutdown() {
1071        let _guard = crate::socialgraph::test_lock();
1072        let tmp = TempDir::new().unwrap();
1073        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1074
1075        let root_keys = nostr::Keys::generate();
1076        let root_pk = root_keys.public_key().to_bytes();
1077        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1078        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1079
1080        let alice_keys = nostr::Keys::generate();
1081        let bob_keys = nostr::Keys::generate();
1082
1083        let root_event = event_builder!(
1084            Kind::ContactList,
1085            "",
1086            vec![Tag::public_key(alice_keys.public_key())],
1087        )
1088        .custom_created_at(nostr::Timestamp::from(10))
1089        .sign_with_keys(&root_keys)
1090        .unwrap();
1091        let alice_event = event_builder!(
1092            Kind::ContactList,
1093            "",
1094            vec![Tag::public_key(bob_keys.public_key())],
1095        )
1096        .custom_created_at(nostr::Timestamp::from(11))
1097        .sign_with_keys(&alice_keys)
1098        .unwrap();
1099
1100        let relay = TestRelay::new(vec![root_event, alice_event]);
1101        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
1102            .with_author_batch_size(2);
1103
1104        let started = std::time::Instant::now();
1105        crawler.warm_once().await;
1106
1107        let alice_pk = alice_keys.public_key().to_bytes();
1108        let bob_pk = bob_keys.public_key().to_bytes();
1109        assert!(
1110            started.elapsed() < Duration::from_secs(5),
1111            "warm_once should complete finite sync promptly"
1112        );
1113        assert!(
1114            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&bob_pk),
1115            "expected warm_once to ingest distance-1 follow list"
1116        );
1117    }
1118
1119    #[tokio::test]
1120    async fn test_crawler_warm_once_accepts_large_contact_list_events() {
1121        let _guard = crate::socialgraph::test_lock();
1122        let tmp = TempDir::new().unwrap();
1123        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1124
1125        let root_keys = nostr::Keys::generate();
1126        let root_pk = root_keys.public_key().to_bytes();
1127        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1128        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1129
1130        let followed_keys = (0..1_600)
1131            .map(|_| nostr::Keys::generate())
1132            .collect::<Vec<_>>();
1133        let tags = followed_keys
1134            .iter()
1135            .map(|keys| Tag::public_key(keys.public_key()))
1136            .collect::<Vec<_>>();
1137        let root_event = event_builder!(Kind::ContactList, "")
1138            .tags(tags)
1139            .custom_created_at(nostr::Timestamp::from(10))
1140            .sign_with_keys(&root_keys)
1141            .unwrap();
1142        assert!(
1143            root_event.as_json().len() > 70_000,
1144            "test event should exceed nostr-sdk default size limit"
1145        );
1146
1147        let relay = TestRelay::new(vec![root_event]);
1148        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
1149            .with_author_batch_size(1);
1150
1151        crawler.warm_once().await;
1152
1153        let follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1154        let first_pk = followed_keys
1155            .first()
1156            .expect("first followed key")
1157            .public_key()
1158            .to_bytes();
1159        let last_pk = followed_keys
1160            .last()
1161            .expect("last followed key")
1162            .public_key()
1163            .to_bytes();
1164        assert!(
1165            follows.contains(&first_pk) && follows.contains(&last_pk),
1166            "expected warm_once to ingest oversized contact list event"
1167        );
1168    }
1169
1170    #[tokio::test]
1171    #[allow(clippy::await_holding_lock)]
1172    async fn test_spawned_social_graph_tasks_sync_local_contacts_on_startup() {
1173        let _guard = crate::socialgraph::test_lock();
1174        let tmp = TempDir::new().unwrap();
1175        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1176
1177        let root_keys = nostr::Keys::generate();
1178        let root_pk = root_keys.public_key().to_bytes();
1179        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1180
1181        let alice_keys = nostr::Keys::generate();
1182        let bob_keys = nostr::Keys::generate();
1183
1184        write_contacts_file(
1185            &tmp.path().join("contacts.json"),
1186            &[alice_keys.public_key().to_hex()],
1187        );
1188
1189        let alice_event = event_builder!(
1190            Kind::ContactList,
1191            "",
1192            vec![Tag::public_key(bob_keys.public_key())],
1193        )
1194        .custom_created_at(nostr::Timestamp::from(11))
1195        .sign_with_keys(&alice_keys)
1196        .unwrap();
1197
1198        let relay = TestRelay::new(vec![alice_event]);
1199        let tasks = spawn_social_graph_tasks(
1200            graph_store.clone(),
1201            root_keys.clone(),
1202            vec![relay.url()],
1203            2,
1204            None,
1205            tmp.path().to_path_buf(),
1206        );
1207
1208        let alice_pk = alice_keys.public_key().to_bytes();
1209        let bob_pk = bob_keys.public_key().to_bytes();
1210        wait_until(Duration::from_secs(5), || {
1211            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1212            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1213            root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1214        })
1215        .await;
1216
1217        let _ = tasks.shutdown_tx.send(true);
1218        tasks.crawl_handle.abort();
1219        tasks.local_list_handle.abort();
1220    }
1221
1222    #[tokio::test]
1223    #[allow(clippy::await_holding_lock)]
1224    async fn test_spawned_social_graph_tasks_refresh_when_contacts_change() {
1225        let _guard = crate::socialgraph::test_lock();
1226        let tmp = TempDir::new().unwrap();
1227        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1228
1229        let root_keys = nostr::Keys::generate();
1230        let root_pk = root_keys.public_key().to_bytes();
1231        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1232
1233        let alice_keys = nostr::Keys::generate();
1234        let bob_keys = nostr::Keys::generate();
1235
1236        let alice_event = event_builder!(
1237            Kind::ContactList,
1238            "",
1239            vec![Tag::public_key(bob_keys.public_key())],
1240        )
1241        .custom_created_at(nostr::Timestamp::from(11))
1242        .sign_with_keys(&alice_keys)
1243        .unwrap();
1244
1245        let relay = TestRelay::new(vec![alice_event]);
1246        let tasks = spawn_social_graph_tasks(
1247            graph_store.clone(),
1248            root_keys.clone(),
1249            vec![relay.url()],
1250            2,
1251            None,
1252            tmp.path().to_path_buf(),
1253        );
1254
1255        let alice_pk = alice_keys.public_key().to_bytes();
1256        let bob_pk = bob_keys.public_key().to_bytes();
1257        assert!(!crate::socialgraph::get_follows(&graph_store, &root_pk).contains(&alice_pk));
1258
1259        write_contacts_file(
1260            &tmp.path().join("contacts.json"),
1261            &[alice_keys.public_key().to_hex()],
1262        );
1263
1264        wait_until(Duration::from_secs(5), || {
1265            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1266            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1267            root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1268        })
1269        .await;
1270
1271        let _ = tasks.shutdown_tx.send(true);
1272        tasks.crawl_handle.abort();
1273        tasks.local_list_handle.abort();
1274    }
1275}